Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
C
car-controlserver
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
wenzhongjian
car-controlserver
Commits
a2d093f8
Commit
a2d093f8
authored
Aug 05, 2025
by
957dd
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
加入了程序限位
parent
85f391b7
Hide whitespace changes
Inline
Side-by-side
Showing
8 changed files
with
153 additions
and
174 deletions
+153
-174
main
build/main
+0
-0
tank_common.c
drivers/devicecontrol/tank_common.c
+8
-10
gpio_control.c
drivers/gpio/gpio_control.c
+38
-23
gpio_init.c
drivers/gpio/gpio_init.c
+0
-2
tank_angle.c
drivers/sensors/tank_angle.c
+2
-2
http_request.h
modules/http/http_request.h
+1
-1
pthrpoll.c
modules/thread_pool/pthrpoll.c
+52
-109
pthrpoll.h
modules/thread_pool/pthrpoll.h
+52
-27
No files found.
build/main
View file @
a2d093f8
No preview for this file type
drivers/devicecontrol/tank_common.c
View file @
a2d093f8
...
@@ -55,7 +55,10 @@ const tank_common_back tank_common_config_t[]={
...
@@ -55,7 +55,10 @@ const tank_common_back tank_common_config_t[]={
};
};
void
tank_shot_back_stop_task_function
(
void
*
arg
)
{
//多线程处理坦克发射后退线程池
void
tank_shot_back_stop_task_function
(
void
*
arg
)
{
//多线程处理坦克发射后退线程池
if
(
arg
!=
NULL
){
free
(
arg
);
}
while
(
1
){
while
(
1
){
long
long
interval
=
shot_device_time_start
-
shot_device_time_end
;
long
long
interval
=
shot_device_time_start
-
shot_device_time_end
;
if
(
g_device_delay_count
>
g_tank_common_config_t
->
back_time
&&
g_device_delay_count
<
(
g_tank_common_config_t
->
back_time
+
30
))
if
(
g_device_delay_count
>
g_tank_common_config_t
->
back_time
&&
g_device_delay_count
<
(
g_tank_common_config_t
->
back_time
+
30
))
...
@@ -69,7 +72,7 @@ void tank_shot_back_stop_task_function(void *arg) {//多线程处理坦克发射
...
@@ -69,7 +72,7 @@ void tank_shot_back_stop_task_function(void *arg) {//多线程处理坦克发射
}
}
}
}
free
(
arg
);
}
}
ThreadPool_t
*
pool_tank_t
;
ThreadPool_t
*
pool_tank_t
;
...
@@ -78,7 +81,7 @@ void tank_shot_pthrpoll_task_init(){
...
@@ -78,7 +81,7 @@ void tank_shot_pthrpoll_task_init(){
int
*
arg
=
malloc
(
sizeof
(
int
));
int
*
arg
=
malloc
(
sizeof
(
int
));
*
arg
=
1
;
*
arg
=
1
;
pool_tank_t
=
thread_pool_init
(
1
,
1
);
pool_tank_t
=
thread_pool_init
(
1
,
1
);
thread_pool_add_task
(
pool_tank_t
,
tank_shot_back_stop_task_function
,
&
arg
);
thread_pool_add_task
(
pool_tank_t
,
tank_shot_back_stop_task_function
,
arg
);
my_zlog_debug
(
"线程池打开"
);
my_zlog_debug
(
"线程池打开"
);
}
}
...
@@ -110,13 +113,8 @@ int tank_shot_back_stop(unsigned char pin,unsigned char val){
...
@@ -110,13 +113,8 @@ int tank_shot_back_stop(unsigned char pin,unsigned char val){
/*销毁坦克使用的线程池,让其正常销毁,只有在tank相关设备号下才有用,最后销毁都会到device——common.h中*/
/*销毁坦克使用的线程池,让其正常销毁,只有在tank相关设备号下才有用,最后销毁都会到device——common.h中*/
void
tank_thread_close
(){
void
tank_thread_close
(){
if
(
pool_tank_t
!=
NULL
){
thread_pool_destroy
(
pool_tank_t
);
thread_pool_destroy
(
pool_tank_t
);
thread_pool_destroy
(
g_pool_device_gpio_control_t
);
}
if
(
g_pool_device_gpio_control_t
!=
NULL
){
thread_pool_destroy
(
g_pool_device_gpio_control_t
);
}
}
}
void
tank_shot_stop_control
(
int
device_id
,
unsigned
char
pin
,
unsigned
char
val
)
{
void
tank_shot_stop_control
(
int
device_id
,
unsigned
char
pin
,
unsigned
char
val
)
{
...
...
drivers/gpio/gpio_control.c
View file @
a2d093f8
...
@@ -10,24 +10,49 @@
...
@@ -10,24 +10,49 @@
const
gpiocontrol_t
*
gpio_control_config_t
=
NULL
;
const
gpiocontrol_t
*
gpio_control_config_t
=
NULL
;
ThreadPool_t
*
g_pool_device_gpio_control_t
=
NULL
;
ThreadPool_t
*
g_pool_device_gpio_control_t
;
void
tank_angle_limit_function
(
void
*
arg
);
static
bool
s_poll_tank_index
=
0
;
static
bool
s_poll_tank_index
=
0
;
void
public_pin_value
(
int
pin
,
int
value
);
void
public_pin_value
(
int
pin
,
int
value
);
void
public_pwm_value
(
int
pin
,
int
value
);
void
public_pwm_value
(
int
pin
,
int
value
);
void
tank0202_pwm_value
(
int
pin
,
int
value
);
void
tank0202_pwm_value
(
int
pin
,
int
value
);
void
tank0203_pwm_value
(
int
pin
,
int
value
);
void
tank0203_pwm_value
(
int
pin
,
int
value
);
void
tank0206_pwm_value
(
int
pin
,
int
value
);
void
tank0206_pwm_value
(
int
pin
,
int
value
);
void
device_gpio_control_threadpoll_init
(){
void
tank_angle_limit_function
(
void
*
arg_gpio
){
int
*
arg
=
malloc
(
sizeof
(
int
));
*
arg
=
1
;
g_pool_device_gpio_control_t
=
thread_pool_init
(
1
,
1
);
thread_pool_add_task
(
g_pool_device_gpio_control_t
,
tank_angle_limit_function
,
&
arg
);
if
(
arg_gpio
!=
NULL
)
{
free
(
arg_gpio
);
}
printf
(
"limit task started.
\n
"
);
while
(
1
){
int
limit_status
=
angle_limit
();
if
(
limit_status
==
1
)
{
device_gpio_control
(
g_device_type
,
5
,
0
);
my_zlog_debug
(
"lift limit stop"
);
}
else
if
(
limit_status
==
2
)
{
device_gpio_control
(
g_device_type
,
7
,
0
);
my_zlog_debug
(
"right limit stop"
);
}
else
if
(
limit_status
==
0
)
{
delay_ms
(
5
);
my_zlog_debug
(
"limit stop"
);
}
}
free
(
arg_gpio
);
}
void
device_gpio_control_threadpoll_init
(){
int
*
arg_gpio
=
malloc
(
sizeof
(
int
));
my_zlog_info
(
"device_gpio_control_threadpoll_init start
\n
"
);
*
arg_gpio
=
2
;
g_pool_device_gpio_control_t
=
thread_pool_init
(
1
,
1
);
thread_pool_add_task
(
g_pool_device_gpio_control_t
,
tank_angle_limit_function
,
arg_gpio
);
}
}
...
@@ -91,9 +116,10 @@ void device_gpio_control(int device_id,int pin,int val) {
...
@@ -91,9 +116,10 @@ void device_gpio_control(int device_id,int pin,int val) {
}
}
if
(
gpio_control_config_t
&&
if
(
gpio_control_config_t
&&
(
gpio_control_config_t
->
device_id
==
DEVICE_TANK0202
||
gpio_control_config_t
->
device_id
==
DEVICE_TANK0203
))
(
device_id
==
DEVICE_TANK0202
||
device_id
==
DEVICE_TANK0203
))
{
{
gpio_control_config_t
->
device_gpio_pthread_create
;
//创建线程,线程关闭在tank.common.h中何tank需要的其他线程关闭
my_zlog_info
(
"线程函数:%d
\n
"
,
device_id
);
gpio_control_config_t
->
device_gpio_pthread_create
();
//创建线程,线程关闭在tank.common.h中何tank需要的其他线程关闭
}
}
}
}
...
@@ -167,8 +193,7 @@ void tank0202_pwm_value(int pin,int value) { //软件陪我们控制调速
...
@@ -167,8 +193,7 @@ void tank0202_pwm_value(int pin,int value) { //软件陪我们控制调速
if
(
pin
==
27
){
if
(
pin
==
27
){
softPwmWrite
(
pin
,
50
);
softPwmWrite
(
pin
,
50
);
}
else
{
}
else
{
if
(
angle_limit
()
==
0
&&
(
pin
==
7
||
pin
==
5
))
softPwmWrite
(
pin
,
30
);
softPwmWrite
(
pin
,
30
);
if
(
pin
!=
7
&&
pin
!=
5
)
softPwmWrite
(
pin
,
30
);
my_zlog_debug
(
"pwm:%d"
,
pin
);
my_zlog_debug
(
"pwm:%d"
,
pin
);
}
}
...
@@ -193,8 +218,7 @@ void tank0203_pwm_value(int pin,int value) { //软件陪我们控制调速
...
@@ -193,8 +218,7 @@ void tank0203_pwm_value(int pin,int value) { //软件陪我们控制调速
if
(
pin
==
27
){
if
(
pin
==
27
){
softPwmWrite
(
pin
,
45
);
softPwmWrite
(
pin
,
45
);
}
else
{
}
else
{
if
(
angle_limit
()
==
0
&&
(
pin
==
7
||
pin
==
5
))
softPwmWrite
(
pin
,
30
);
softPwmWrite
(
pin
,
30
);
if
(
pin
!=
7
&&
pin
!=
5
)
softPwmWrite
(
pin
,
30
);
my_zlog_debug
(
"pwm:%d"
,
pin
);
my_zlog_debug
(
"pwm:%d"
,
pin
);
}
}
...
@@ -230,11 +254,3 @@ void tank0206_pwm_value(int pin,int value) { //软件陪我们控制调速
...
@@ -230,11 +254,3 @@ void tank0206_pwm_value(int pin,int value) { //软件陪我们控制调速
my_zlog_debug
(
"tank0206 pwm"
);
my_zlog_debug
(
"tank0206 pwm"
);
}
}
void
tank_angle_limit_function
(
void
*
arg
){
while
(
1
){
if
(
angle_limit
()
==
1
)
device_gpio_control
(
g_device_type
,
5
,
0
);
else
if
(
angle_limit
()
==
2
)
device_gpio_control
(
g_device_type
,
7
,
0
);
else
if
(
angle_limit
()
==
0
)
delay_ms
(
5
);
}
}
\ No newline at end of file
drivers/gpio/gpio_init.c
View file @
a2d093f8
...
@@ -54,8 +54,6 @@ void pwm_all_default() {//全部至低电平,车和坦克共用
...
@@ -54,8 +54,6 @@ void pwm_all_default() {//全部至低电平,车和坦克共用
}
}
/*物理pwm初始化*/
/*物理pwm初始化*/
void
physics_pwm_init
()
{
void
physics_pwm_init
()
{
int
pwm_clock
=
24000000
/
(
50
*
1000
);
// 定义 PWM 频率为 50Hz
int
pwm_clock
=
24000000
/
(
50
*
1000
);
// 定义 PWM 频率为 50Hz
...
...
drivers/sensors/tank_angle.c
View file @
a2d093f8
...
@@ -2,9 +2,9 @@
...
@@ -2,9 +2,9 @@
#include "common.h"
#include "common.h"
#include "ads1115.h"
#include "ads1115.h"
#define LIFT_LIMIT 1
65
#define LIFT_LIMIT 1
70
#define MIDDLE_LIMIT 180
#define MIDDLE_LIMIT 180
#define RIGHT_LIMIT
19
5
#define RIGHT_LIMIT
20
5
double
tank_angle
(){
double
tank_angle
(){
double
angle
=
0
;
double
angle
=
0
;
...
...
modules/http/http_request.h
View file @
a2d093f8
...
@@ -4,7 +4,7 @@
...
@@ -4,7 +4,7 @@
#include"common.h"// 用于存储HTTP响应数据的结构体
#include"common.h"// 用于存储HTTP响应数据的结构体
/*2为关闭请求,1为打开*/
/*2为关闭请求,1为打开*/
#define HTTP_REQUEST_INDEX
2
#define HTTP_REQUEST_INDEX
1
struct
MemoryStruct
{
struct
MemoryStruct
{
char
*
memory
;
char
*
memory
;
...
...
modules/thread_pool/pthrpoll.c
View file @
a2d093f8
#include "pthrpoll.h"
#include "pthrpoll.h"
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <errno.h>
// 初始化任务队列
// 初始化任务队列
static
int
task_queue_init
(
TaskQueue
*
queue
)
{
static
int
task_queue_init
(
TaskQueue
*
queue
)
{
...
@@ -16,6 +20,7 @@ static int task_queue_init(TaskQueue *queue) {
...
@@ -16,6 +20,7 @@ static int task_queue_init(TaskQueue *queue) {
}
}
// 向任务队列中添加任务
// 向任务队列中添加任务
// 注意:现在不负责复制参数,调用者需要保证 argument 的生命周期
static
int
task_queue_add
(
TaskQueue
*
queue
,
void
(
*
function
)(
void
*
),
void
*
argument
)
{
static
int
task_queue_add
(
TaskQueue
*
queue
,
void
(
*
function
)(
void
*
),
void
*
argument
)
{
Task
*
task
=
(
Task
*
)
malloc
(
sizeof
(
Task
));
Task
*
task
=
(
Task
*
)
malloc
(
sizeof
(
Task
));
if
(
task
==
NULL
)
{
if
(
task
==
NULL
)
{
...
@@ -36,6 +41,7 @@ static int task_queue_add(TaskQueue *queue, void (*function)(void *), void *argu
...
@@ -36,6 +41,7 @@ static int task_queue_add(TaskQueue *queue, void (*function)(void *), void *argu
}
}
queue
->
size
++
;
queue
->
size
++
;
// 唤醒一个等待的线程
pthread_cond_signal
(
&
queue
->
cond
);
pthread_cond_signal
(
&
queue
->
cond
);
pthread_mutex_unlock
(
&
queue
->
mutex
);
pthread_mutex_unlock
(
&
queue
->
mutex
);
return
0
;
return
0
;
...
@@ -45,10 +51,12 @@ static int task_queue_add(TaskQueue *queue, void (*function)(void *), void *argu
...
@@ -45,10 +51,12 @@ static int task_queue_add(TaskQueue *queue, void (*function)(void *), void *argu
static
Task
*
task_queue_remove
(
TaskQueue
*
queue
,
ThreadPool_t
*
pool
)
{
static
Task
*
task_queue_remove
(
TaskQueue
*
queue
,
ThreadPool_t
*
pool
)
{
pthread_mutex_lock
(
&
queue
->
mutex
);
pthread_mutex_lock
(
&
queue
->
mutex
);
// 如果队列为空且线程池未关闭,则等待
while
(
queue
->
size
==
0
&&
!
pool
->
shutdown
)
{
while
(
queue
->
size
==
0
&&
!
pool
->
shutdown
)
{
pthread_cond_wait
(
&
queue
->
cond
,
&
queue
->
mutex
);
pthread_cond_wait
(
&
queue
->
cond
,
&
queue
->
mutex
);
}
}
// 如果线程池已关闭且队列为空,则线程可以退出了
if
(
pool
->
shutdown
&&
queue
->
size
==
0
)
{
if
(
pool
->
shutdown
&&
queue
->
size
==
0
)
{
pthread_mutex_unlock
(
&
queue
->
mutex
);
pthread_mutex_unlock
(
&
queue
->
mutex
);
return
NULL
;
return
NULL
;
...
@@ -70,65 +78,29 @@ static Task *task_queue_remove(TaskQueue *queue, ThreadPool_t *pool) {
...
@@ -70,65 +78,29 @@ static Task *task_queue_remove(TaskQueue *queue, ThreadPool_t *pool) {
static
void
*
worker_thread
(
void
*
arg
)
{
static
void
*
worker_thread
(
void
*
arg
)
{
ThreadPool_t
*
pool
=
(
ThreadPool_t
*
)
arg
;
ThreadPool_t
*
pool
=
(
ThreadPool_t
*
)
arg
;
pthread_mutex_lock
(
&
pool
->
mutex
);
pool
->
active_threads
++
;
pthread_mutex_unlock
(
&
pool
->
mutex
);
while
(
1
)
{
while
(
1
)
{
// 从队列获取任务,如果返回 NULL,说明需要退出
Task
*
task
=
task_queue_remove
(
&
pool
->
task_queue
,
pool
);
Task
*
task
=
task_queue_remove
(
&
pool
->
task_queue
,
pool
);
if
(
task
==
NULL
)
{
if
(
task
==
NULL
)
{
break
;
// 收到关闭信号且队列为空
break
;
}
}
// 执行任务
task
->
function
(
task
->
argument
);
task
->
function
(
task
->
argument
);
free
(
task
->
argument
);
// 释放任务参数
// 释放任务结构体本身
free
(
task
);
// 释放任务本身
free
(
task
);
// 注意:task->argument 的释放由任务函数内部或调用者负责
// 通知回收线程可能有空闲线程
pthread_cond_signal
(
&
pool
->
reaper_cond
);
}
}
// 线程退出前,减少存活线程计数
pthread_mutex_lock
(
&
pool
->
mutex
);
pthread_mutex_lock
(
&
pool
->
mutex
);
pool
->
act
ive_threads
--
;
pool
->
l
ive_threads
--
;
pthread_mutex_unlock
(
&
pool
->
mutex
);
pthread_mutex_unlock
(
&
pool
->
mutex
);
return
NULL
;
return
NULL
;
}
}
// 回收线程函数
// 初始化线程池 (修正版,非单例)
static
void
*
reaper_thread
(
void
*
arg
)
{
ThreadPool_t
*
pool
=
(
ThreadPool_t
*
)
arg
;
while
(
!
pool
->
shutdown
)
{
pthread_mutex_lock
(
&
pool
->
mutex
);
// 检查是否需要回收线程
while
(
pool
->
active_threads
<=
pool
->
min_threads
||
(
pool
->
task_queue
.
size
>
0
&&
pool
->
active_threads
<=
pool
->
task_queue
.
size
))
{
pthread_cond_wait
(
&
pool
->
reaper_cond
,
&
pool
->
mutex
);
if
(
pool
->
shutdown
)
{
pthread_mutex_unlock
(
&
pool
->
mutex
);
return
NULL
;
}
}
// 计算可以回收的线程数量
int
excess_threads
=
pool
->
active_threads
-
pool
->
min_threads
;
if
(
excess_threads
>
0
&&
pool
->
active_threads
>
pool
->
min_threads
)
{
// 通过添加空任务来让工作线程退出
for
(
int
i
=
0
;
i
<
excess_threads
;
i
++
)
{
task_queue_add
(
&
pool
->
task_queue
,
NULL
,
NULL
);
}
}
pthread_mutex_unlock
(
&
pool
->
mutex
);
sleep
(
1
);
// 避免过于频繁检查
}
return
NULL
;
}
// 初始化线程池
ThreadPool_t
*
thread_pool_init
(
int
min_threads
,
int
max_threads
)
{
ThreadPool_t
*
thread_pool_init
(
int
min_threads
,
int
max_threads
)
{
if
(
min_threads
<=
0
||
max_threads
<=
0
||
min_threads
>
max_threads
)
{
if
(
min_threads
<=
0
||
max_threads
<=
0
||
min_threads
>
max_threads
)
{
errno
=
EINVAL
;
errno
=
EINVAL
;
...
@@ -140,26 +112,26 @@ ThreadPool_t *thread_pool_init(int min_threads, int max_threads) {
...
@@ -140,26 +112,26 @@ ThreadPool_t *thread_pool_init(int min_threads, int max_threads) {
return
NULL
;
return
NULL
;
}
}
pool
->
threads
=
(
pthread_t
*
)
malloc
(
max_threads
*
sizeof
(
pthread_t
));
if
(
task_queue_init
(
&
pool
->
task_queue
)
!=
0
)
{
if
(
pool
->
threads
==
NULL
)
{
free
(
pool
);
free
(
pool
);
return
NULL
;
return
NULL
;
}
}
if
(
task_queue_init
(
&
pool
->
task_queue
)
!=
0
)
{
pool
->
threads
=
(
pthread_t
*
)
malloc
(
max_threads
*
sizeof
(
pthread_t
));
free
(
pool
->
threads
);
if
(
pool
->
threads
==
NULL
)
{
pthread_mutex_destroy
(
&
pool
->
task_queue
.
mutex
);
pthread_cond_destroy
(
&
pool
->
task_queue
.
cond
);
free
(
pool
);
free
(
pool
);
return
NULL
;
return
NULL
;
}
}
pool
->
min_threads
=
min_threads
;
pool
->
min_threads
=
min_threads
;
pool
->
max_threads
=
max_threads
;
pool
->
max_threads
=
max_threads
;
pool
->
thread_count
=
min_threads
;
pool
->
live_threads
=
0
;
// 初始为0,创建一个增加一个
pool
->
active_threads
=
0
;
pool
->
busy_threads
=
0
;
// 可以增加这个来跟踪繁忙线程(本次简化实现中未使用)
pool
->
shutdown
=
0
;
pool
->
shutdown
=
0
;
if
(
pthread_mutex_init
(
&
pool
->
mutex
,
NULL
)
!=
0
||
if
(
pthread_mutex_init
(
&
pool
->
mutex
,
NULL
)
!=
0
)
{
pthread_cond_init
(
&
pool
->
reaper_cond
,
NULL
)
!=
0
)
{
pthread_mutex_destroy
(
&
pool
->
task_queue
.
mutex
);
pthread_mutex_destroy
(
&
pool
->
task_queue
.
mutex
);
pthread_cond_destroy
(
&
pool
->
task_queue
.
cond
);
pthread_cond_destroy
(
&
pool
->
task_queue
.
cond
);
free
(
pool
->
threads
);
free
(
pool
->
threads
);
...
@@ -167,43 +139,14 @@ ThreadPool_t *thread_pool_init(int min_threads, int max_threads) {
...
@@ -167,43 +139,14 @@ ThreadPool_t *thread_pool_init(int min_threads, int max_threads) {
return
NULL
;
return
NULL
;
}
}
// 创建工作线程
// 创建
核心(最小)数量的
工作线程
for
(
int
i
=
0
;
i
<
min_threads
;
i
++
)
{
for
(
int
i
=
0
;
i
<
min_threads
;
i
++
)
{
if
(
pthread_create
(
&
pool
->
threads
[
i
],
NULL
,
worker_thread
,
pool
)
!=
0
)
{
if
(
pthread_create
(
&
pool
->
threads
[
i
],
NULL
,
worker_thread
,
pool
)
!=
0
)
{
// 创建线程失败,关闭已创建的线程
// 如果创建失败,销毁已创建的资源并返回
pool
->
shutdown
=
1
;
thread_pool_destroy
(
pool
);
pthread_cond_broadcast
(
&
pool
->
task_queue
.
cond
);
for
(
int
j
=
0
;
j
<
i
;
j
++
)
{
pthread_join
(
pool
->
threads
[
j
],
NULL
);
}
pthread_mutex_destroy
(
&
pool
->
mutex
);
pthread_cond_destroy
(
&
pool
->
reaper_cond
);
pthread_mutex_destroy
(
&
pool
->
task_queue
.
mutex
);
pthread_cond_destroy
(
&
pool
->
task_queue
.
cond
);
free
(
pool
->
threads
);
free
(
pool
);
return
NULL
;
return
NULL
;
}
}
}
pool
->
live_threads
++
;
// 创建回收线程
if
(
pthread_create
(
&
pool
->
reaper_thread
,
NULL
,
reaper_thread
,
pool
)
!=
0
)
{
pool
->
shutdown
=
1
;
pthread_cond_broadcast
(
&
pool
->
task_queue
.
cond
);
for
(
int
i
=
0
;
i
<
min_threads
;
i
++
)
{
pthread_join
(
pool
->
threads
[
i
],
NULL
);
}
pthread_mutex_destroy
(
&
pool
->
mutex
);
pthread_cond_destroy
(
&
pool
->
reaper_cond
);
pthread_mutex_destroy
(
&
pool
->
task_queue
.
mutex
);
pthread_cond_destroy
(
&
pool
->
task_queue
.
cond
);
free
(
pool
->
threads
);
free
(
pool
);
return
NULL
;
}
}
return
pool
;
return
pool
;
...
@@ -221,23 +164,19 @@ int thread_pool_add_task(ThreadPool_t *pool, void (*function)(void *), void *arg
...
@@ -221,23 +164,19 @@ int thread_pool_add_task(ThreadPool_t *pool, void (*function)(void *), void *arg
return
-
1
;
return
-
1
;
}
}
// 如果任务队列过长且可以创建更多线程,则创建新线程
// 动态扩容:如果当前任务数大于存活线程数,且尚未达到最大线程数
if
(
pool
->
task_queue
.
size
>
pool
->
active_threads
&&
// 这是一个简单的扩容策略,可以根据需要调整
pool
->
thread_count
<
pool
->
max_threads
)
{
if
(
pool
->
task_queue
.
size
>
0
&&
pool
->
live_threads
<
pool
->
max_threads
)
{
if
(
pthread_create
(
&
pool
->
threads
[
pool
->
thread_count
],
NULL
,
worker_thread
,
pool
)
==
0
)
{
// 尝试创建一个新线程
pool
->
thread_count
++
;
if
(
pthread_create
(
&
pool
->
threads
[
pool
->
live_threads
],
NULL
,
worker_thread
,
pool
)
==
0
)
{
pool
->
live_threads
++
;
}
}
// 如果创建失败,也没关系,现有线程会处理任务
}
}
pthread_mutex_unlock
(
&
pool
->
mutex
);
pthread_mutex_unlock
(
&
pool
->
mutex
);
// 复制参数,确保生命周期
// 直接添加任务,不再复制参数
void
*
arg_copy
=
malloc
(
sizeof
(
*
argument
));
return
task_queue_add
(
&
pool
->
task_queue
,
function
,
argument
);
if
(
arg_copy
==
NULL
)
{
return
-
1
;
}
*
(
int
*
)
arg_copy
=
*
(
int
*
)
argument
;
return
task_queue_add
(
&
pool
->
task_queue
,
function
,
arg_copy
);
}
}
// 销毁线程池
// 销毁线程池
...
@@ -247,33 +186,38 @@ void thread_pool_destroy(ThreadPool_t *pool) {
...
@@ -247,33 +186,38 @@ void thread_pool_destroy(ThreadPool_t *pool) {
}
}
pthread_mutex_lock
(
&
pool
->
mutex
);
pthread_mutex_lock
(
&
pool
->
mutex
);
// 如果已经调用过销毁,直接返回
if
(
pool
->
shutdown
)
{
pthread_mutex_unlock
(
&
pool
->
mutex
);
return
;
}
pool
->
shutdown
=
1
;
pool
->
shutdown
=
1
;
pthread_mutex_unlock
(
&
pool
->
mutex
);
pthread_mutex_unlock
(
&
pool
->
mutex
);
// 唤醒所有
线程
// 唤醒所有
可能在等待任务的线程,让他们检查 shutdown 标志并退出
pthread_cond_broadcast
(
&
pool
->
task_queue
.
cond
);
pthread_cond_broadcast
(
&
pool
->
task_queue
.
cond
);
pthread_cond_signal
(
&
pool
->
reaper_cond
);
// 等待所有线程退出
// 等待所有存活的线程退出
for
(
int
i
=
0
;
i
<
pool
->
thread_count
;
i
++
)
{
// live_threads 在这里是创建过的线程数,因为我们去掉了回收功能
for
(
int
i
=
0
;
i
<
pool
->
live_threads
;
i
++
)
{
pthread_join
(
pool
->
threads
[
i
],
NULL
);
pthread_join
(
pool
->
threads
[
i
],
NULL
);
}
}
pthread_join
(
pool
->
reaper_thread
,
NULL
);
// 清理
剩余
任务
// 清理
任务队列中未执行的
任务
Task
*
task
=
pool
->
task_queue
.
head
;
Task
*
task
=
pool
->
task_queue
.
head
;
while
(
task
!=
NULL
)
{
while
(
task
!=
NULL
)
{
Task
*
next
=
task
->
next
;
Task
*
next
=
task
->
next
;
if
(
task
->
argument
)
free
(
task
->
argument
);
// 注意:这里我们只释放任务结构体
// 参数 argument 的内存应该由调用者在确认线程池销毁后自己管理
// 或者设计一个统一的清理回调
free
(
task
);
free
(
task
);
task
=
next
;
task
=
next
;
}
}
// 释放资源
// 释放
所有
资源
free
(
pool
->
threads
);
free
(
pool
->
threads
);
pthread_mutex_destroy
(
&
pool
->
task_queue
.
mutex
);
pthread_mutex_destroy
(
&
pool
->
task_queue
.
mutex
);
pthread_cond_destroy
(
&
pool
->
task_queue
.
cond
);
pthread_cond_destroy
(
&
pool
->
task_queue
.
cond
);
pthread_mutex_destroy
(
&
pool
->
mutex
);
pthread_mutex_destroy
(
&
pool
->
mutex
);
pthread_cond_destroy
(
&
pool
->
reaper_cond
);
free
(
pool
);
free
(
pool
);
}
}
\ No newline at end of file
modules/thread_pool/pthrpoll.h
View file @
a2d093f8
#ifndef
THREAD_POO
L_H
#ifndef
PTHRPOL
L_H
#define
THREAD_POO
L_H
#define
PTHRPOL
L_H
#include "common.h"
// 为了让头文件自给自足,直接包含它所需要的依赖
#include <common.h>
// 任务结构体
// 任务结构体
typedef
struct
Task
{
typedef
struct
Task
{
void
(
*
function
)(
void
*
);
void
(
*
function
)(
void
*
);
// 函数指针,指向要执行的任务函数
void
*
argument
;
void
*
argument
;
// 传递给任务函数的参数
struct
Task
*
next
;
struct
Task
*
next
;
// 指向下一个任务的指针,构成链表
}
Task
;
}
Task
;
// 任务队列结构体
// 任务队列结构体
typedef
struct
{
typedef
struct
{
Task
*
head
;
Task
*
head
;
// 队列头部
Task
*
tail
;
Task
*
tail
;
// 队列尾部
int
size
;
int
size
;
// 队列中任务的数量
pthread_mutex_t
mutex
;
pthread_mutex_t
mutex
;
// 用于保护任务队列的互斥锁
pthread_cond_t
cond
;
pthread_cond_t
cond
;
// 用于线程同步的条件变量
}
TaskQueue
;
}
TaskQueue
;
// 线程池结构体
// 线程池结构体
(此结构体已被简化和明确化)
typedef
struct
{
typedef
struct
{
pthread_t
*
threads
;
pthread_t
*
threads
;
// 存放线程ID的数组
pthread_t
reaper_thread
;
// 新增:回收线程
int
min_threads
;
// 线程池中最小线程数
int
thread_count
;
int
max_threads
;
// 线程池中最大线程数
int
min_threads
;
// 新增:最小线程数
int
max_threads
;
// 新增:最大线程数
// --- 字段已重命名和简化 ---
int
active_threads
;
// 新增:活跃线程数
int
live_threads
;
// 当前存活的线程数
TaskQueue
task_queue
;
int
busy_threads
;
// 当前正在忙碌(执行任务)的线程数(为未来功能增强预留)
int
shutdown
;
pthread_mutex_t
mutex
;
TaskQueue
task_queue
;
// 任务队列
pthread_cond_t
reaper_cond
;
// 新增:回收线程条件变量
int
shutdown
;
// 线程池关闭标志(1表示关闭,0表示运行)
pthread_mutex_t
mutex
;
// 用于保护线程池级别变量(如线程计数)的互斥锁
// --- 已移除的字段 ---
// pthread_t reaper_thread; // 已移除:回收线程的逻辑复杂且存在缺陷。
// pthread_cond_t reaper_cond; // 已移除:没有回收线程后不再需要。
// 'thread_count' 和 'active_threads' 的概念现在由 'live_threads' 更清晰地表示。
}
ThreadPool_t
;
}
ThreadPool_t
;
// 初始化线程池
/**
* @brief 初始化线程池。
* @param min_threads 线程池中保持存活的最小线程数。
* @param max_threads 线程池可以扩展到的最大线程数。
* @return 成功时返回新创建的线程池指针,失败时返回 NULL。
*/
ThreadPool_t
*
thread_pool_init
(
int
min_threads
,
int
max_threads
);
ThreadPool_t
*
thread_pool_init
(
int
min_threads
,
int
max_threads
);
// 向线程池添加任务
/**
* @brief 向线程池的任务队列中添加一个新任务。
* @param pool 要添加任务的线程池。
* @param function 要执行的任务函数指针。
* @param argument 传递给任务函数的参数。
* 注意:调用者负责管理此参数的内存。
* 如果参数是动态分配的,任务函数内部应负责释放它。
* @return 成功返回 0,失败返回 -1。
*/
int
thread_pool_add_task
(
ThreadPool_t
*
pool
,
void
(
*
function
)(
void
*
),
void
*
argument
);
int
thread_pool_add_task
(
ThreadPool_t
*
pool
,
void
(
*
function
)(
void
*
),
void
*
argument
);
// 销毁线程池
/**
* @brief 关闭并清理线程池资源。
* 此函数会等待所有当前正在运行的任务执行完毕,但不会执行队列中剩余的任务。
* @param pool 要销毁的线程池。
*/
void
thread_pool_destroy
(
ThreadPool_t
*
pool
);
void
thread_pool_destroy
(
ThreadPool_t
*
pool
);
#endif //THREAD_POOL_H
#endif // PTHRPOLL_H
\ No newline at end of file
\ No newline at end of file
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment