Commit d789dfb9 authored by 957dd's avatar 957dd

增加了船的逻辑

parent e7bdceae
......@@ -14,7 +14,7 @@ void pin_all_default();//拉低车的控制引脚
void pin_value(int pin,int value);//控制引脚高低
void pin_ship_init();
void ship_speed_change(unsigned char *buf);
void ship_speed_change(void *buf);
void ship_stop_pwm();//船pwm停止函数
void pin_all_ship_default();//拉低船引脚
......
......@@ -16,6 +16,10 @@
#include "delay.h"
#include "opensh.h"
#include "INA226.h"
#include "pthrpoll.h"
#define filename "/home/orangepi/car/master/Deviceld.txt"
pthread_t thread[4];
......@@ -34,11 +38,12 @@ int thread_start(void *AppExit(void *arg),void *Mqttbeat(void *arg),void *opensh
if(pthread_create(&thread[2],NULL,opensh,NULL)!=0){
perror("Failed to create thread 2");
return 2;
}if(pthread_create(&thread[3],NULL,Mqtt_onnect,NULL)!=0) {
}
if(pthread_create(&thread[3],NULL,Mqtt_onnect,NULL)!=0) {
perror("Failed to create thread 3");
return 3;
}
return 0;
return 10086;
}
......@@ -46,8 +51,7 @@ void *AppExit(void *arg){ //出现意外自动停止
while(1){
Delay_Ms(0,100);
gPwmCount++;
if(gPwmCount>=5)
{
if(gPwmCount>=5) {
if(AppExit_pin_pwm==1) {//车异常问题处理
midde_pwm();
pin_all_default();
......
......@@ -15,11 +15,14 @@
#include "opensh.h"
#include "gpio_pwm.h"
#include "INA226.h"
#include "pthrpoll.h"
extern char* TOPIC ;//="app2dev/controlcar0004"
extern char* TOPIC2 ;//="dev2app/controlcar0004"
extern char* TOPIC3;//= "controlcar0004"
extern ThreadPool *pool;
#define BROKER_ADDRESS "119.45.167.177"
#define BROKER_PORT 1883
......
#ifndef PTHRPOLL_H
#define PTHRPOLL_H
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <unistd.h>
// 任务结构体
typedef struct Task {
void (*function)(void *);
......@@ -27,12 +25,24 @@ typedef struct {
// 线程池结构体
typedef struct {
pthread_t *threads;
pthread_t reaper_thread; // 新增:回收线程
int thread_count;
int min_threads; // 新增:最小线程数
int max_threads; // 新增:最大线程数
int active_threads; // 新增:活跃线程数
TaskQueue task_queue;
int shutdown;
pthread_mutex_t mutex;
pthread_cond_t reaper_cond; // 新增:回收线程条件变量
} ThreadPool;
// 初始化线程池
ThreadPool *thread_pool_init(int min_threads, int max_threads);
// 向线程池添加任务
int thread_pool_add_task(ThreadPool *pool, void (*function)(void *), void *argument);
// 销毁线程池
void thread_pool_destroy(ThreadPool *pool);
#endif
\ No newline at end of file
#endif // THREAD_POOL_H
\ No newline at end of file
No preview for this file type
This diff is collapsed.
......@@ -13,6 +13,8 @@ uint8_t AppExit_pin_pwm=0;//判断坦克或者车的退出
int rc=0;//判断mqtt是否成功创建
ThreadPool *pool;//线程池线程函数
unsigned char modeTemp=0;
unsigned char typeTemp=0;
unsigned char pinTemp=0;
......@@ -95,7 +97,13 @@ void message_3(cJSON *body,cJSON *pwm_ctrl){//message_type为3,控制pwm
printf("modeTemp:%d\n",gvalt[1]);
printf("valTemp:%d\n",gvalt[2]);
if(AppExit_pin_pwm==1)speed_change(gvalt);
if(AppExit_pin_pwm==3)ship_speed_change(gvalt);
if(AppExit_pin_pwm==3) {
if (thread_pool_add_task(pool, ship_speed_change, gvalt) != 0) {
printf("Failed to add task\n");
}
}
}
void message_4(cJSON *body, cJSON *pin_setctrl){//message 为4时候
......
#include "pthrpoll.h"
// 初始化任务队列
int task_queue_init(TaskQueue *queue) {
static int task_queue_init(TaskQueue *queue) {
queue->head = NULL;
queue->tail = NULL;
queue->size = 0;
......@@ -16,7 +17,7 @@ int task_queue_init(TaskQueue *queue) {
}
// 向任务队列中添加任务
int task_queue_add(TaskQueue *queue, void (*function)(void *), void *argument) {
static int task_queue_add(TaskQueue *queue, void (*function)(void *), void *argument) {
Task *task = (Task *)malloc(sizeof(Task));
if (task == NULL) {
return -1;
......@@ -42,7 +43,7 @@ int task_queue_add(TaskQueue *queue, void (*function)(void *), void *argument) {
}
// 从任务队列中取出任务
Task *task_queue_remove(TaskQueue *queue, ThreadPool *pool) {
static Task *task_queue_remove(TaskQueue *queue, ThreadPool *pool) {
pthread_mutex_lock(&queue->mutex);
while (queue->size == 0 && !pool->shutdown) {
......@@ -67,9 +68,13 @@ Task *task_queue_remove(TaskQueue *queue, ThreadPool *pool) {
}
// 工作线程函数
void *worker_thread(void *arg) {
static void *worker_thread(void *arg) {
ThreadPool *pool = (ThreadPool *)arg;
pthread_mutex_lock(&pool->mutex);
pool->active_threads++;
pthread_mutex_unlock(&pool->mutex);
while (1) {
Task *task = task_queue_remove(&pool->task_queue, pool);
if (task == NULL) {
......@@ -79,14 +84,54 @@ void *worker_thread(void *arg) {
task->function(task->argument);
free(task->argument); // 释放任务参数
free(task); // 释放任务本身
// 通知回收线程可能有空闲线程
pthread_cond_signal(&pool->reaper_cond);
}
pthread_mutex_lock(&pool->mutex);
pool->active_threads--;
pthread_mutex_unlock(&pool->mutex);
return NULL;
}
// 回收线程函数
static void *reaper_thread(void *arg) {
ThreadPool *pool = (ThreadPool *)arg;
while (!pool->shutdown) {
pthread_mutex_lock(&pool->mutex);
// 检查是否需要回收线程
while (pool->active_threads <= pool->min_threads ||
(pool->task_queue.size > 0 && pool->active_threads <= pool->task_queue.size)) {
pthread_cond_wait(&pool->reaper_cond, &pool->mutex);
if (pool->shutdown) {
pthread_mutex_unlock(&pool->mutex);
return NULL;
}
}
// 计算可以回收的线程数量
int excess_threads = pool->active_threads - pool->min_threads;
if (excess_threads > 0 && pool->active_threads > pool->min_threads) {
// 通过添加空任务来让工作线程退出
for (int i = 0; i < excess_threads; i++) {
task_queue_add(&pool->task_queue, NULL, NULL);
}
}
pthread_mutex_unlock(&pool->mutex);
sleep(1); // 避免过于频繁检查
}
return NULL;
}
// 初始化线程池
ThreadPool *thread_pool_init(int thread_count) {
if (thread_count <= 0) {
ThreadPool *thread_pool_init(int min_threads, int max_threads) {
if (min_threads <= 0 || max_threads <= 0 || min_threads > max_threads) {
errno = EINVAL;
return NULL;
}
......@@ -96,7 +141,7 @@ ThreadPool *thread_pool_init(int thread_count) {
return NULL;
}
pool->threads = (pthread_t *)malloc(thread_count * sizeof(pthread_t));
pool->threads = (pthread_t *)malloc(max_threads * sizeof(pthread_t));
if (pool->threads == NULL) {
free(pool);
return NULL;
......@@ -108,9 +153,14 @@ ThreadPool *thread_pool_init(int thread_count) {
return NULL;
}
pool->thread_count = thread_count;
pool->min_threads = min_threads;
pool->max_threads = max_threads;
pool->thread_count = min_threads;
pool->active_threads = 0;
pool->shutdown = 0;
if (pthread_mutex_init(&pool->mutex, NULL) != 0) {
if (pthread_mutex_init(&pool->mutex, NULL) != 0 ||
pthread_cond_init(&pool->reaper_cond, NULL) != 0) {
pthread_mutex_destroy(&pool->task_queue.mutex);
pthread_cond_destroy(&pool->task_queue.cond);
free(pool->threads);
......@@ -118,7 +168,8 @@ ThreadPool *thread_pool_init(int thread_count) {
return NULL;
}
for (int i = 0; i < thread_count; i++) {
// 创建工作线程
for (int i = 0; i < min_threads; i++) {
if (pthread_create(&pool->threads[i], NULL, worker_thread, pool) != 0) {
// 创建线程失败,关闭已创建的线程
pool->shutdown = 1;
......@@ -129,6 +180,7 @@ ThreadPool *thread_pool_init(int thread_count) {
}
pthread_mutex_destroy(&pool->mutex);
pthread_cond_destroy(&pool->reaper_cond);
pthread_mutex_destroy(&pool->task_queue.mutex);
pthread_cond_destroy(&pool->task_queue.cond);
free(pool->threads);
......@@ -137,6 +189,24 @@ ThreadPool *thread_pool_init(int thread_count) {
}
}
// 创建回收线程
if (pthread_create(&pool->reaper_thread, NULL, reaper_thread, pool) != 0) {
pool->shutdown = 1;
pthread_cond_broadcast(&pool->task_queue.cond);
for (int i = 0; i < min_threads; i++) {
pthread_join(pool->threads[i], NULL);
}
pthread_mutex_destroy(&pool->mutex);
pthread_cond_destroy(&pool->reaper_cond);
pthread_mutex_destroy(&pool->task_queue.mutex);
pthread_cond_destroy(&pool->task_queue.cond);
free(pool->threads);
free(pool);
return NULL;
}
return pool;
}
......@@ -151,6 +221,14 @@ int thread_pool_add_task(ThreadPool *pool, void (*function)(void *), void *argum
pthread_mutex_unlock(&pool->mutex);
return -1;
}
// 如果任务队列过长且可以创建更多线程,则创建新线程
if (pool->task_queue.size > pool->active_threads &&
pool->thread_count < pool->max_threads) {
if (pthread_create(&pool->threads[pool->thread_count], NULL, worker_thread, pool) == 0) {
pool->thread_count++;
}
}
pthread_mutex_unlock(&pool->mutex);
// 复制参数,确保生命周期
......@@ -173,19 +251,21 @@ void thread_pool_destroy(ThreadPool *pool) {
pool->shutdown = 1;
pthread_mutex_unlock(&pool->mutex);
// 唤醒所有等待的线程
// 唤醒所有线程
pthread_cond_broadcast(&pool->task_queue.cond);
pthread_cond_signal(&pool->reaper_cond);
// 等待所有线程退出
for (int i = 0; i < pool->thread_count; i++) {
pthread_join(pool->threads[i], NULL);
}
pthread_join(pool->reaper_thread, NULL);
// 清理剩余任务
Task *task = pool->task_queue.head;
while (task != NULL) {
Task *next = task->next;
free(task->argument);
if (task->argument) free(task->argument);
free(task);
task = next;
}
......@@ -195,8 +275,6 @@ void thread_pool_destroy(ThreadPool *pool) {
pthread_mutex_destroy(&pool->task_queue.mutex);
pthread_cond_destroy(&pool->task_queue.cond);
pthread_mutex_destroy(&pool->mutex);
pthread_cond_destroy(&pool->reaper_cond);
free(pool);
}
\ No newline at end of file
......@@ -24,6 +24,7 @@ int Device_File_Init() {
}else if(strcmp(sub_str,"03")==0){//船的编码
pwm_speed();
pin_ship_init(); //pwm初始化,车为停止
pool = thread_pool_init(1, 1);
AppExit_pin_pwm=3;//车的异常停止值
free(sub_str);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment