Commit 6d50e9e8 authored by 957dd's avatar 957dd

优化了mqtt的问题,使用了异步检查重连机制,并且添加了重连10次将不会重连

parent 05e5f38d
......@@ -116,12 +116,20 @@ void *thread_open_browser(void *arg) {
void *thread_mqtt_reconnect(void *arg) {
while(1){
if(mqtt_init() == 0){
my_zlog_warn("mqtt success");
static int reconnect_count=0;
static int remqtt_index;
remqtt_index=mqtt_init();
if(remqtt_index == 0){
my_zlog_warn("mqtt success",remqtt_index);
break;
}else {
my_zlog_warn("wait... mqtt reconect");
my_zlog_warn("wait... mqtt reconect,error:%d",remqtt_index);
delay_ms(300);
reconnect_count++;
if(reconnect_count>MAX_RECONNECT_ATTEMPTS){
send_fail_mqtt_conect();
break;
}
continue;
}
}
......
No preview for this file type
......@@ -10,25 +10,59 @@ mqttclient g_clients_t[MAX_SERVERS];
char g_uuid_mqtt_topic_id[MAX_SERVERS][56];
// struct mosquitto *mosq;
// 新增:用于向其他MQTT服务器发送通知的函数
void send_server_failure_notification(const char *failed_host, int failed_port, const char *failed_client_id);
int add_mqtt_create(const char *host, int port, int clients_count);
//mqtt初始化
int mqtt_init() {
mosquitto_lib_init();
// 这里创建mosq可以保留,也可以删除这一行,让创建放到 Mqtt_onnect 里
for(int i=0;i<g_mqtt_cam_config_t->mqtt_count;i++){
int erg= add_mqtt_create(g_mqtt_cam_config_t->mqtt_servers[i],BROKER_PORT,i);
if(erg !=0){
return -1;
int success_count = 0;
int failure_count = 0;
// 在尝试重新初始化之前,重置所有客户端状态。
// 如果 mqtt_init 被多次调用,这一点至关重要。
for (int i = 0; i < MAX_SERVERS; i++) {
if (g_clients_t[i].mosq) { // 如果已经存在,先销毁旧的
mosquitto_disconnect(g_clients_t[i].mosq);
mosquitto_destroy(g_clients_t[i].mosq);
}
g_clients_t[i].mosq = NULL;
memset(g_clients_t[i].host, 0, sizeof(g_clients_t[i].host));
g_clients_t[i].port = 0;
// 清除 client_id 和 topic_id
memset(g_clients_t[i].client_id, 0, sizeof(g_clients_t[i].client_id));
memset(g_uuid_mqtt_topic_id[i], 0, sizeof(g_uuid_mqtt_topic_id[i]));
}
if (!g_clients_t) {
my_zlog_error("Failed to create Mosquitto client");
return -1;
// 假设 g_mqtt_cam_config_t 已经有效
if (!g_mqtt_cam_config_t) {
my_zlog_fatal("mqtt_init: g_mqtt_cam_config_t is NULL. Cannot initialize.");
return -1; // 表示严重错误
}
return 0;
for (int i = 0; i < g_mqtt_cam_config_t->mqtt_count; i++) {
// 正确地将索引 'i' 传递给 add_mqtt_create
int erg = add_mqtt_create(g_mqtt_cam_config_t->mqtt_servers[i], BROKER_PORT, i);
if (erg != 0) {
failure_count++;
my_zlog_error("未能为 %s:%d 添加 MQTT 客户端", g_mqtt_cam_config_t->mqtt_servers[i], BROKER_PORT);
} else {
success_count++;
}
}
// 返回失败的数量,或 0 表示完全成功
if (failure_count > 0) {
my_zlog_error("%d 个 MQTT 客户端初始化失败。", failure_count);
return failure_count; // 指示有多少失败
}
my_zlog_info("所有 %d 个 MQTT 客户端初始化成功。", success_count);
return 0; // 成功
}
//回调函数
......@@ -36,6 +70,11 @@ void on_connect(struct mosquitto *mosq, void *obj, int rc) {
mqttclient *client_t = (mqttclient*)obj;
if (!client_t) { // 安全检查
my_zlog_error("on_connect: client_t 为 NULL");
return;
}
if (rc == 0) {
my_zlog_info("Connected to broker");
my_zlog_info("[Connected] %s:%d (%s)",
......@@ -44,9 +83,29 @@ void on_connect(struct mosquitto *mosq, void *obj, int rc) {
mosquitto_subscribe(mosq, NULL, mqtt_topic_app2dev_number(), 0);
mosquitto_subscribe(mosq, NULL, mqtt_topic_ser2dev_number(), 0);
} else {
fprintf(stderr, "Connection failed with code %d\n", rc);
my_zlog_fatal("[Connection Failed] %s:%d rc=%d",
client_t->host, client_t->port, rc);
client_t->reconnect_attempts++;
my_zlog_warn("客户端 %s:%d 连接失败。重试次数: %d/%d",
client_t->host, client_t->port, client_t->reconnect_attempts, MAX_RECONNECT_ATTEMPTS);
if (client_t->reconnect_attempts >= MAX_RECONNECT_ATTEMPTS) {
my_zlog_error("客户端 %s:%d 达到最大重试次数,不再尝试连接。标记为永久失败。",
client_t->host, client_t->port);
client_t->permanently_failed = true;
// 停止 mosquitto 循环并销毁客户端
// 注意:在回调中直接销毁当前正在操作的 mosq 可能会导致问题,
// 更好的做法是设置一个标志,让外部循环(如果存在)或专门的清理线程来处理。
// 但因为 mosquitto_loop_start 是独立的线程,这里尝试停止并销毁它。
// mosquitto_loop_stop 可能需要等待线程结束,这里是同步调用
mosquitto_loop_stop(mosq, false); // false 表示不强制,等待线程自行结束
mosquitto_disconnect(mosq);
mosquitto_destroy(mosq);
client_t->mosq = NULL; // 清除指针
// 向其他活跃的 MQTT 服务器发送通知
send_server_failure_notification(client_t->host, client_t->port, client_t->client_id);
}
}
}
......@@ -56,6 +115,11 @@ void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_messag
mqttclient *client_t = (mqttclient*)obj;
if (!client_t || !message || !message->topic) { // 安全检查
my_zlog_error("on_message: 无效参数 (client_t, message 或 topic 为 NULL)");
return;
}
my_zlog_info("[Message from %s:%d] topic=%s payload=%s",
client_t->host, client_t->port,
message->topic, (char*)message->payload);
......@@ -82,54 +146,87 @@ void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_messag
}
}
int add_mqtt_create(const char *host, int port, int clients_count){
static int uuid_index =0;
// 添加 MQTT 客户端并尝试连接
// clients_idx 是 g_clients_t 和 g_uuid_mqtt_topic_id 数组的索引
int add_mqtt_create(const char *host, int port, int clients_idx) {
// 移除了 'static int uuid_index = 0;'
// 不再需要独立的 uuid_index,直接使用传入的 clients_idx 作为索引
if(clients_count >= MAX_SERVERS) {
my_zlog_error("Max server limit reached.");
if (clients_idx >= MAX_SERVERS) {
my_zlog_error("达到最大服务器限制,索引 %d。", clients_idx);
return -1;
}
mqttclient *client_t = &g_clients_t[clients_count];
strncpy(client_t->host, host, sizeof(client_t->host)-1);
// 检查主机名是否为空或无效
if (!host || strlen(host) == 0) {
my_zlog_error("主机名为空,索引 %d。", clients_idx);
return -1;
}
mqttclient *client_t = &g_clients_t[clients_idx];
// 复制主机名,确保字符串末尾空终止
strncpy(client_t->host, host, sizeof(client_t->host) - 1);
client_t->host[sizeof(client_t->host) - 1] = '\0';
client_t->port = port;
// 生成 UUID 作为客户端 ID
uuid_t uuid;
uuid_generate_time(uuid); // 获取UUIDv1
uuid_generate_time(uuid); // 获取 UUIDv1
// 组合 UUID 的一部分作为短 ID,并存储在对应 clients_idx 的位置
// 这将作为 mosquitto 客户端的 ID
unsigned short time_part = (uuid[0] << 8) | uuid[1];
unsigned short mac_part = (uuid[10] << 8) | uuid[11];
snprintf(g_uuid_mqtt_topic_id[clients_idx], sizeof(g_uuid_mqtt_topic_id[clients_idx]), "%04x%04x", time_part, mac_part);
g_uuid_mqtt_topic_id[clients_idx][sizeof(g_uuid_mqtt_topic_id[clients_idx])-1] = '\0'; // 确保空终止
// 组合成32位值(8字符十六进制)
snprintf(g_uuid_mqtt_topic_id[uuid_index], 9, "%04x%04x", time_part, mac_part);
// 将生成的 Client ID 也存储到 client_t 结构体中
strncpy(client_t->client_id, g_uuid_mqtt_topic_id[clients_idx], sizeof(client_t->client_id) -1);
client_t->client_id[sizeof(client_t->client_id) -1] = '\0'; // 确保空终止
client_t->mosq = mosquitto_new(g_uuid_mqtt_topic_id[uuid_index], true, client_t);
// 创建 mosquitto 客户端实例
client_t->mosq = mosquitto_new(g_uuid_mqtt_topic_id[clients_idx], true, client_t);
uuid_index++;
if(!client_t->mosq) {
my_zlog_error("mosquitto_new failed");
if (!client_t->mosq) {
my_zlog_error("mosquitto_new 失败,客户端索引 %d", clients_idx);
return -1;
}
// 设置 MQTT 协议版本
mosquitto_int_option(client_t->mosq, MOSQ_OPT_PROTOCOL_VERSION, MQTT_PROTOCOL_V5);
// 设置重连延迟策略
mosquitto_reconnect_delay_set(client_t->mosq, 2, 10, true);
// 设置回调函数
mosquitto_connect_callback_set(client_t->mosq, on_connect);
mosquitto_message_callback_set(client_t->mosq, on_message);
// 设置用户名和密码
mosquitto_username_pw_set(client_t->mosq, USERNAME, PASSWORD);
// 尝试异步连接到 MQTT Broker
int rc = mosquitto_connect_async(client_t->mosq, host, port, 60);
if( rc != MOSQ_ERR_SUCCESS) {
my_zlog_error("mosquitto_connect failed");
my_zlog_warn("Failed to connect to broker: %s", mosquitto_strerror(rc));
if (rc != MOSQ_ERR_SUCCESS) {
my_zlog_error("mosquitto_connect_async 失败,主机 %s:%d (rc=%d): %s", host, port, rc, mosquitto_strerror(rc));
mosquitto_destroy(client_t->mosq); // 如果连接失败,立即销毁 mosquitto 客户端实例
client_t->mosq = NULL; // 将指针置空,以防止在清理时操作无效指针
return -1;
}
// 启动 mosquitto 循环线程,处理网络 I/O 和回调
int loop_rc = mosquitto_loop_start(client_t->mosq);
if (loop_rc != MOSQ_ERR_SUCCESS) {
my_zlog_error("未能为客户端 %s 启动 mosquitto_loop_start: %s", client_t->client_id, mosquitto_strerror(loop_rc));
// 如果 loop_start 失败,也应断开连接并销毁
mosquitto_disconnect(client_t->mosq);
mosquitto_destroy(client_t->mosq);
client_t->mosq = NULL; // 确保指针置空
return -1;
}
my_zlog_info("add %s mqttserver success",host);
mosquitto_loop_start(client_t->mosq);
return 0;
my_zlog_info("成功为主机 %s 添加 MQTT 客户端", host);
return 0; // 成功
}
//mqtt清理
......@@ -138,6 +235,7 @@ void mqtt_clean(){
for(int i=0;i<g_mqtt_cam_config_t->mqtt_count;i++){
mosquitto_disconnect(g_clients_t[i].mosq);
mosquitto_destroy(g_clients_t[i].mosq);
g_clients_t[i].mosq = NULL; // 将指针置空,防止悬挂指针
}
mosquitto_lib_cleanup();
......@@ -148,19 +246,19 @@ int mqtt_cycle() {//非阻塞型
while (1) {
bool all_connected = true;
// bool all_connected = true;
for (int i = 0; i < g_mqtt_cam_config_t->mqtt_count; i++) {
if (!g_clients_t[i].mosq) continue;
// for (int i = 0; i < g_mqtt_cam_config_t->mqtt_count; i++) {
// if (!g_clients_t[i].mosq) continue;
int state = mosquitto_socket(g_clients_t[i].mosq);
if (state == -1) {
all_connected = false;
my_zlog_warn("检测到 MQTT [%s:%d] 已断开,尝试重连...",
g_clients_t[i].host, g_clients_t[i].port);
mosquitto_reconnect_async(g_clients_t[i].mosq);
}
}
// int state = mosquitto_socket(g_clients_t[i].mosq);
// if (state == -1) {
// all_connected = false;
// my_zlog_warn("检测到 MQTT [%s:%d] 已断开,尝试重连...",
// g_clients_t[i].host, g_clients_t[i].port);
// //mosquitto_reconnect_async(g_clients_t[i].mosq);
// }
// }
delay_ms(1000); // 每 1 秒检测一次连接状态
}
......@@ -179,3 +277,67 @@ int mqtt_cycle() {//非阻塞型
return 0;
}
// 向其他活跃的 MQTT 服务器发送通知
void send_server_failure_notification(const char *failed_host, int failed_port, const char *failed_client_id) {
my_zlog_info("准备发送服务器故障通知: %s:%d (%s)", failed_host, failed_port, failed_client_id);
cJSON *root = cJSON_CreateObject();
if (!root) {
my_zlog_error("创建 JSON 对象失败。");
return;
}
cJSON_AddStringToObject(root, "event_type", "server_failure");
cJSON_AddStringToObject(root, "failed_host", failed_host);
cJSON_AddNumberToObject(root, "failed_port", failed_port);
cJSON_AddStringToObject(root, "failed_client_id", failed_client_id);
cJSON_AddStringToObject(root, "message", "MQTT server permanently disconnected after multiple retries.");
cJSON_AddNumberToObject(root, "message_type", 9999);
char *json_string = cJSON_PrintUnformatted(root);
if (!json_string) {
my_zlog_error("将 JSON 打印到字符串失败。");
cJSON_Delete(root);
return;
}
for (int i = 0; i < MAX_SERVERS; i++) {
// 遍历所有客户端,如果客户端是活跃的且不是出问题的那个,就发送通知
if (g_clients_t[i].mosq && !g_clients_t[i].permanently_failed) {
// 确保不是给自己发送通知(通常同一个客户端不需要知道自己的失败)
// 尽管理论上失败的客户端的 mosq 已经为 NULL,这里仍然做个双重检查
if (strcmp(g_clients_t[i].host, failed_host) != 0 || g_clients_t[i].port != failed_port) {
for(int i=0;i<g_mqtt_cam_config_t->mqtt_count;i++){
mosquitto_publish(g_clients_t[i].mosq, NULL, mqtt_topic_dev2app_number(), strlen(json_string), json_string, 0, false);
mosquitto_publish(g_clients_t[i].mosq, NULL, mqtt_topic_pure_number(), strlen(json_string), json_string, 0, false);
}
}
}
}
free(json_string);
cJSON_Delete(root);
}
void send_fail_mqtt_conect(){
cJSON *root = cJSON_CreateObject();
if (!root) {
my_zlog_error("创建 JSON 对象失败。");
return;
}
cJSON_AddStringToObject(root, "event_type", "server_connect_failure");
cJSON_AddNumberToObject(root, "message_type", 9999);
char *json_string = cJSON_PrintUnformatted(root);
if (!json_string) {
my_zlog_error("将 JSON 打印到字符串失败。");
cJSON_Delete(root);
return;
}
for(int i=0;i<g_mqtt_cam_config_t->mqtt_count;i++){
mosquitto_publish(g_clients_t[i].mosq, NULL, mqtt_topic_dev2app_number(), strlen(json_string), json_string, 0, false);
mosquitto_publish(g_clients_t[i].mosq, NULL, mqtt_topic_pure_number(), strlen(json_string), json_string, 0, false);
}
}
\ No newline at end of file
......@@ -8,11 +8,15 @@
#define MAX_SERVERS 10
#define MAX_RECONNECT_ATTEMPTS 10
typedef struct {
struct mosquitto *mosq;
char host[128];
int port;
char client_id[64];
int reconnect_attempts; // 新增:重连尝试次数
bool permanently_failed; // 新增:标记是否永久失败
} mqttclient;
extern mqttclient g_clients_t[MAX_SERVERS];
......@@ -34,4 +38,7 @@ int mqtt_cycle();//循环
//mqtt清理
void mqtt_clean();
//莫一个服务器连接失败发送
void send_fail_mqtt_conect();
#endif
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment