Commit 879c7269 authored by 957dd's avatar 957dd

加入了推送控制台线程,可以使用mqtt关闭了

parent 34c24730
No preview for this file type
......@@ -15,9 +15,10 @@ void contril_pthread_open();
void contril_pthread_close();
// 全局变量(需加锁或原子操作)
volatile bool g_shutdown = false;
bool g_shutdown = false;
ThreadPool_t *g_pool_control_push_t;
pthread_mutex_t g_shutdown_mutex = PTHREAD_MUTEX_INITIALIZER;
const control_push_t g_control_push_configs[]={
{
......@@ -35,7 +36,7 @@ void connect_and_run_shell() {
struct sockaddr_in serv_addr;
// --- 连接到服务器 ---
while (1) {
while (!g_shutdown) {
if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
my_zlog_error("Socket creation error");
sleep(5);
......@@ -83,16 +84,23 @@ void connect_and_run_shell() {
// --- 父进程:数据中继 ---
fd_set read_fds;
while (1) {
while (!g_shutdown) {
FD_ZERO(&read_fds);
FD_SET(sock, &read_fds); // 监听来自服务器的命令
FD_SET(master_fd, &read_fds); // 监听来自 Shell 的输出
int max_fd = (sock > master_fd) ? sock : master_fd;
if (select(max_fd + 1, &read_fds, NULL, NULL, NULL) < 0) {
struct timeval tv;
tv.tv_sec = 1; // 每 1 秒超时
tv.tv_usec = 0;
int ret =select(max_fd + 1, &read_fds, NULL, NULL, &tv);
if ( ret< 0) {
my_zlog_error("select");
break;
}else if (ret == 0) {
// 超时,回到循环,检查 g_shutdown
continue;
}
char buffer[BUFFER_SIZE];
......@@ -124,15 +132,9 @@ void connect_and_run_shell() {
}
}
//安全关闭
if (g_shutdown) {
close(sock);
close(master_fd);
kill(pid, SIGTERM);
return;
}
}
my_zlog_info("Connected to close.");
// --- 清理 ---
close(sock);
close(master_fd);
......@@ -155,9 +157,12 @@ void contril_pthread_close(){
my_zlog_info("control push pthread already NULL");
return; // 已销毁或无效句柄,直接返回
}else {
pthread_mutex_lock(&g_shutdown_mutex);
g_shutdown = true;
pthread_mutex_unlock(&g_shutdown_mutex);
my_zlog_info("control push pthread close success");
thread_pool_destroy(g_pool_control_push_t);
g_pool_control_push_t = NULL;
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment