Commit 6eca326e authored by 957dd's avatar 957dd

验证加入了res,开启验证和关闭验证都有校验了

parent 13cf7d94
...@@ -107,9 +107,7 @@ void scan_wifi_json() { ...@@ -107,9 +107,7 @@ void scan_wifi_json() {
if (json_str) { if (json_str) {
my_zlog_debug("%s", json_str); my_zlog_debug("%s", json_str);
for(int i=0;i<g_mqtt_cam_config_t->mqtt_count;i++){ mqtt_publish_to_all(mqtt_topic_pure_number(), json_str, 0);
mosquitto_publish(g_clients_t[i].mosq, NULL, mqtt_topic_pure_number(), strlen(json_str), json_str, 0, false);
}
free(json_str); free(json_str);
} }
...@@ -394,9 +392,7 @@ void wifichange_sendmqtt(int wifi_status) { //改WiFi发送 ...@@ -394,9 +392,7 @@ void wifichange_sendmqtt(int wifi_status) { //改WiFi发送
char *payload = cJSON_PrintUnformatted(root); char *payload = cJSON_PrintUnformatted(root);
my_zlog_debug("%s",payload); my_zlog_debug("%s",payload);
for(int i=0;i<g_mqtt_cam_config_t->mqtt_count;i++){ mqtt_publish_to_all(mqtt_topic_pure_number(), payload, 0);
mosquitto_publish(g_clients_t[i].mosq, NULL, mqtt_topic_pure_number(), strlen(payload), payload, 0, false);
}
free(payload); free(payload);
cJSON_Delete(root); cJSON_Delete(root);
......
...@@ -23,9 +23,7 @@ void device_wifi_public_sendmqtt(int wifi_status) { ...@@ -23,9 +23,7 @@ void device_wifi_public_sendmqtt(int wifi_status) {
char *payload = cJSON_PrintUnformatted(root); char *payload = cJSON_PrintUnformatted(root);
my_zlog_debug("%s",payload); my_zlog_debug("%s",payload);
for(int i=0;i<g_mqtt_cam_config_t->mqtt_count;i++){ mqtt_publish_to_all(mqtt_topic_pure_number(), payload, 0);
mosquitto_publish(g_clients_t[i].mosq, NULL, mqtt_topic_pure_number(), strlen(payload), payload, 0, false);
}
free(payload); free(payload);
cJSON_Delete(root); cJSON_Delete(root);
...@@ -173,9 +171,7 @@ void device_send_saved_wifi(){ ...@@ -173,9 +171,7 @@ void device_send_saved_wifi(){
if (json_str) { if (json_str) {
my_zlog_debug("%s", json_str); my_zlog_debug("%s", json_str);
for(int i=0;i<g_mqtt_cam_config_t->mqtt_count;i++){ mqtt_publish_to_all(mqtt_topic_pure_number(), json_str, 0);
mosquitto_publish(g_clients_t[i].mosq, NULL, mqtt_topic_pure_number(), strlen(json_str), json_str, 0, false);
}
free(json_str); free(json_str);
} }
......
...@@ -54,20 +54,25 @@ char *device_id_function(){ ...@@ -54,20 +54,25 @@ char *device_id_function(){
/*打开版本文件,版本文件是go语言进行创建,只需要读取就行了*/ /*打开版本文件,版本文件是go语言进行创建,只需要读取就行了*/
char *program_version() { char *program_version() {
FILE *file; FILE *file;
static char s_buffer_version[30];// 用于存储版本号内容 static char s_buffer_version[30] = "unknown";
file = fopen(FILE_VERSION, "r"); // 以只读模式打开文件
file = fopen(FILE_VERSION, "r");
if (file == NULL) { if (file == NULL) {
my_zlog_info("版本文件无"); my_zlog_debug("版本文件无");
} else { return s_buffer_version;
// 尝试读取文件内容 }
if (fgets(s_buffer_version, sizeof(s_buffer_version), file) != NULL) { if (fgets(s_buffer_version, sizeof(s_buffer_version), file) != NULL) {
// 如果文件内容不为空 size_t len = strlen(s_buffer_version);
if (len > 0 && s_buffer_version[len - 1] == '\n') {
s_buffer_version[len - 1] = '\0';
}
fclose(file); fclose(file);
my_zlog_info("读取到文件内容: %s", s_buffer_version); my_zlog_debug("读取到文件内容: %s", s_buffer_version);
return s_buffer_version; return s_buffer_version;
} else {
my_zlog_info("文件为空,等待中...");
} }
fclose(file); fclose(file);
} my_zlog_debug("文件为空,等待中...");
return s_buffer_version;
} }
\ No newline at end of file
...@@ -87,15 +87,26 @@ void *thread_mqtt_beat(void *arg) ...@@ -87,15 +87,26 @@ void *thread_mqtt_beat(void *arg)
{ {
my_zlog_info("thread_mqtt_beat start"); my_zlog_info("thread_mqtt_beat start");
s_webrtc_index = 1; s_webrtc_index = 1;
delay_s(3); delay_s(1);
int mqtt_wait_ms = 0;
while (!mqtt_has_connected() && mqtt_wait_ms < 15000)
{
delay_ms(100);
mqtt_wait_ms += 100;
}
/*在这步进行检测,等mqtt连接成功后检查*/ /*在这步进行检测,等mqtt连接成功后检查*/
// if(wifi_change_sendmqtt_init()!=0){ // if(wifi_change_sendmqtt_init()!=0){
// my_zlog_error("WIFI是否更改检查相关,不会到这一步,有问题会直接重启,可能会导致重启,不会因此造成程序停止"); // my_zlog_error("WIFI是否更改检查相关,不会到这一步,有问题会直接重启,可能会导致重启,不会因此造成程序停止");
// } // }
if (mqtt_has_connected()) {
if (verify_open_index_init() != 0) if (verify_open_index_init() != 0)
{ {
my_zlog_warn("mqtt send verify is NULL"); my_zlog_warn("mqtt send verify is NULL");
} }
} else {
my_zlog_warn("MQTT未就绪,跳过首次验证开关查询(3013)");
}
// 如果为相关的不需要心跳或者游览器设备,提早结束线程 // 如果为相关的不需要心跳或者游览器设备,提早结束线程
if (g_device_type == DEVICE_ROBOT_DOG0501) if (g_device_type == DEVICE_ROBOT_DOG0501)
...@@ -107,6 +118,7 @@ void *thread_mqtt_beat(void *arg) ...@@ -107,6 +118,7 @@ void *thread_mqtt_beat(void *arg)
while (1) while (1)
{ {
delay_ms(100); delay_ms(100);
s_mqtt_grc = mqtt_has_connected() ? 0 : 1;
verify_open_query_poll(); verify_open_query_poll();
...@@ -235,34 +247,30 @@ void *thread_open_browser(void *arg) ...@@ -235,34 +247,30 @@ void *thread_open_browser(void *arg)
// mqtt异常处理,断开自动重连,简单粗暴 // mqtt异常处理,断开自动重连,简单粗暴
void *thread_mqtt_reconnect(void *arg) void *thread_mqtt_reconnect(void *arg)
{ {
int retry_count = 0;
int rc = mqtt_init();
while (1) while (rc != 0 && retry_count < MQTT_INIT_RETRY_MAX)
{ {
static int reconnect_count = 0; retry_count++;
static int remqtt_index; my_zlog_warn("wait... mqtt reconect,retry:%d/%d", retry_count, MQTT_INIT_RETRY_MAX);
remqtt_index = mqtt_init(); delay_ms(300);
if (remqtt_index == 0) rc = mqtt_retry_failed_clients();
}
if (rc == 0)
{ {
s_mqtt_grc = 0;
my_zlog_info("mqtt success"); my_zlog_info("mqtt success");
break;
} }
else else
{ {
my_zlog_warn("wait... mqtt reconect,error:%d", remqtt_index); s_mqtt_grc = 1;
delay_ms(300); my_zlog_error("mqtt init failed after %d retries", MQTT_INIT_RETRY_MAX);
reconnect_count++;
if (reconnect_count > MAX_RECONNECT_ATTEMPTS)
{
send_fail_mqtt_conect(); send_fail_mqtt_conect();
break;
}
continue;
}
} }
mqtt_cycle(); mqtt_cycle();
mqtt_clean();
return NULL; return NULL;
} }
......
No preview for this file type
...@@ -307,10 +307,7 @@ void pg0403_serial_gpssend_mqtt_json(int id, uint16_t x, uint16_t y) { ...@@ -307,10 +307,7 @@ void pg0403_serial_gpssend_mqtt_json(int id, uint16_t x, uint16_t y) {
char mqtt_topic_pg[256]; char mqtt_topic_pg[256];
sprintf(mqtt_topic_pg,"positioning/%s",mqtt_topic_pure_number()); sprintf(mqtt_topic_pg,"positioning/%s",mqtt_topic_pure_number());
for(int i=0;i<g_mqtt_cam_config_t->mqtt_count;i++){ mqtt_publish_to_all(mqtt_topic_pg, json_str, 0);
mosquitto_publish(g_clients_t[i].mosq, NULL, mqtt_topic_pg ,strlen(json_str), json_str, 0, false);
}
free(json_str); free(json_str);
cJSON_Delete(root); cJSON_Delete(root);
......
...@@ -195,9 +195,7 @@ void send_self_contorl_mqtt(){ ...@@ -195,9 +195,7 @@ void send_self_contorl_mqtt(){
my_zlog_info("%s",json_str); my_zlog_info("%s",json_str);
for(int i=0;i<g_mqtt_cam_config_t->mqtt_count;i++){ mqtt_publish_to_all(mqtt_topic_pure_number(), json_str, 0);
mosquitto_publish(g_clients_t[i].mosq, NULL, mqtt_topic_pure_number(), strlen(json_str), json_str, 0, false);
}
free(json_str); free(json_str);
cJSON_Delete(root); cJSON_Delete(root);
...@@ -226,9 +224,7 @@ void send_self_contorl_date_mqtt(){ ...@@ -226,9 +224,7 @@ void send_self_contorl_date_mqtt(){
my_zlog_info("%s",json_str); my_zlog_info("%s",json_str);
for(int i=0;i<g_mqtt_cam_config_t->mqtt_count;i++){ mqtt_publish_to_all(mqtt_topic_pure_number(), json_str, 0);
mosquitto_publish(g_clients_t[i].mosq, NULL, mqtt_topic_pure_number(), strlen(json_str), json_str, 0, false);
}
free(json_str); free(json_str);
cJSON_Delete(root); cJSON_Delete(root);
......
...@@ -84,9 +84,7 @@ void audioplay_send_mqtt() { ...@@ -84,9 +84,7 @@ void audioplay_send_mqtt() {
char* json_string = cJSON_PrintUnformatted(root); char* json_string = cJSON_PrintUnformatted(root);
my_zlog_debug("%s",json_string); my_zlog_debug("%s",json_string);
for(int i=0;i<g_mqtt_cam_config_t->mqtt_count;i++){ mqtt_publish_to_all(mqtt_topic_pure_number(), json_string, 0);
mosquitto_publish(g_clients_t[i].mosq, NULL, mqtt_topic_pure_number(), strlen(json_string), json_string, 0, false);
}
free(json_string); free(json_string);
cJSON_Delete(root); cJSON_Delete(root);
} }
......
...@@ -419,9 +419,7 @@ int mqtt_audio_tts_send(int status){ ...@@ -419,9 +419,7 @@ int mqtt_audio_tts_send(int status){
char* json_str = cJSON_PrintUnformatted(root); char* json_str = cJSON_PrintUnformatted(root);
my_zlog_debug("%s",json_str); my_zlog_debug("%s",json_str);
for(int i=0;i<g_mqtt_cam_config_t->mqtt_count;i++){ mqtt_publish_to_all(mqtt_topic_pure_number(), json_str, 0);
mosquitto_publish(g_clients_t[i].mosq, NULL, mqtt_topic_pure_number(), strlen(json_str), json_str, 0, false);
}
free(json_str); free(json_str);
cJSON_Delete(root); cJSON_Delete(root);
......
...@@ -58,10 +58,10 @@ int alarm_control_close(float voltage) {//关闭警报 ...@@ -58,10 +58,10 @@ int alarm_control_close(float voltage) {//关闭警报
int alarm_control(float voltage) { int alarm_control(float voltage) {
#if WARN_MODE == 1 #if WARN_MODE == 1
my_zlog_info("Mode 1: 执行代码A"); my_zlog_debug("Mode 1: 执行代码A");
return alarm_control_open(voltage); return alarm_control_open(voltage);
#elif WARN_MODE == 2 #elif WARN_MODE == 2
my_zlog_info("Mode 2: 执行代码B"); my_zlog_debug("Mode 2: 执行代码B");
return alarm_control_close(voltage); return alarm_control_close(voltage);
#else #else
my_zlog_error("未知的 WARN_MODE: %d", WARN_MODE); my_zlog_error("未知的 WARN_MODE: %d", WARN_MODE);
......
...@@ -2,7 +2,6 @@ ...@@ -2,7 +2,6 @@
#define MQTT_AES_H__ #define MQTT_AES_H__
#define MQTT_ENCRYPT_KEY "Kp9mX2nQ7vRw4sLd" #define MQTT_ENCRYPT_KEY "Kp9mX2nQ7vRw4sLd"
#define MQTT_VERIFY_AUTH_KEY "fcrs-verify-off-2026"
/* AES/ECB/PKCS7 解密 Base64 密文,调用方 free(*plain_json_out)。成功返回 0。 */ /* AES/ECB/PKCS7 解密 Base64 密文,调用方 free(*plain_json_out)。成功返回 0。 */
int mqtt_aes_decrypt_body(const char *b64_cipher, const char *secret_key, char **plain_json_out); int mqtt_aes_decrypt_body(const char *b64_cipher, const char *secret_key, char **plain_json_out);
......
...@@ -66,11 +66,7 @@ void heartbeat_send() ...@@ -66,11 +66,7 @@ void heartbeat_send()
char *payload = cJSON_PrintUnformatted(root); char *payload = cJSON_PrintUnformatted(root);
my_zlog_debug("%s", payload); my_zlog_debug("%s", payload);
for (int i = 0; i < g_mqtt_cam_config_t->mqtt_count; i++) mqtt_publish_to_topics(mqtt_topic_dev2app_number(), mqtt_topic_pure_number(), payload, 0);
{
mosquitto_publish(g_clients_t[i].mosq, NULL, mqtt_topic_dev2app_number(), strlen(payload), payload, 0, false);
mosquitto_publish(g_clients_t[i].mosq, NULL, mqtt_topic_pure_number(), strlen(payload), payload, 0, false);
}
free(payload); free(payload);
cJSON_Delete(root); cJSON_Delete(root);
...@@ -110,10 +106,7 @@ void angle_mqtt_send() ...@@ -110,10 +106,7 @@ void angle_mqtt_send()
cJSON_Delete(root); cJSON_Delete(root);
return; return;
} }
for (int i = 0; i < g_mqtt_cam_config_t->mqtt_count; i++) mqtt_publish_to_all(TOPIC_send_angle, payload, 0);
{
mosquitto_publish(g_clients_t[i].mosq, NULL, TOPIC_send_angle, strlen(payload), payload, 0, false);
}
free(payload); free(payload);
cJSON_Delete(root); cJSON_Delete(root);
......
...@@ -8,61 +8,121 @@ ...@@ -8,61 +8,121 @@
mqttclient g_clients_t[MAX_SERVERS]; mqttclient g_clients_t[MAX_SERVERS];
static char s_uuid_mqtt_topic_id[MAX_SERVERS][56]; static char s_uuid_mqtt_topic_id[MAX_SERVERS][56];
static bool s_mosquitto_lib_inited = false;
// struct mosquitto *mosq; // struct mosquitto *mosq;
// 新增:用于向其他MQTT服务器发送通知的函数 // 新增:用于向其他MQTT服务器发送通知的函数
void send_server_failure_notification(const char *failed_host, int failed_port, const char *failed_client_id); void send_server_failure_notification(const char *failed_host, int failed_port, const char *failed_client_id);
int add_mqtt_create(const char *host, int port, int clients_count); int add_mqtt_create(const char *host, int port, int clients_count);
static int mqtt_init_internal(bool reset_all_slots, bool retry_failed_only);
//mqtt初始化 //mqtt初始化
int mqtt_init() { int mqtt_init() {
return mqtt_init_internal(true, false);
}
mosquitto_lib_init(); int mqtt_retry_failed_clients(void)
{
return mqtt_init_internal(false, true);
}
bool mqtt_has_connected(void)
{
int total_count = g_mqtt_cam_config_t ? g_mqtt_cam_config_t->mqtt_count : 0;
for (int i = 0; i < total_count; i++) {
if (g_clients_t[i].mosq && !g_clients_t[i].permanently_failed) {
return true;
}
}
return false;
}
int mqtt_publish_to_all(const char *topic, const char *payload, int qos)
{
int total_count = g_mqtt_cam_config_t ? g_mqtt_cam_config_t->mqtt_count : 0;
int sent_count = 0;
if (!topic || !payload) {
return 0;
}
for (int i = 0; i < total_count; i++) {
if (!g_clients_t[i].mosq || g_clients_t[i].permanently_failed) {
continue;
}
if (mosquitto_publish(g_clients_t[i].mosq, NULL, topic, strlen(payload), payload, qos, false) == MOSQ_ERR_SUCCESS) {
sent_count++;
}
}
return sent_count;
}
int mqtt_publish_to_topics(const char *topic1, const char *topic2, const char *payload, int qos)
{
int sent_count = 0;
if (topic1) {
sent_count += mqtt_publish_to_all(topic1, payload, qos);
}
if (topic2) {
sent_count += mqtt_publish_to_all(topic2, payload, qos);
}
return sent_count;
}
static int mqtt_init_internal(bool reset_all_slots, bool retry_failed_only)
{
int success_count = 0; int success_count = 0;
int failure_count = 0; int failure_count = 0;
// 在尝试重新初始化之前,重置所有客户端状态。 if (!s_mosquitto_lib_inited) {
// 如果 mqtt_init 被多次调用,这一点至关重要。 mosquitto_lib_init();
s_mosquitto_lib_inited = true;
}
if (!g_mqtt_cam_config_t) {
my_zlog_fatal("mqtt_init: g_mqtt_cam_config_t is NULL. Cannot initialize.");
return -1;
}
if (reset_all_slots) {
for (int i = 0; i < MAX_SERVERS; i++) { for (int i = 0; i < MAX_SERVERS; i++) {
if (g_clients_t[i].mosq) { // 如果已经存在,先销毁旧的 if (g_clients_t[i].mosq) {
mosquitto_disconnect(g_clients_t[i].mosq); mosquitto_disconnect(g_clients_t[i].mosq);
mosquitto_destroy(g_clients_t[i].mosq); mosquitto_destroy(g_clients_t[i].mosq);
} }
g_clients_t[i].mosq = NULL; g_clients_t[i].mosq = NULL;
g_clients_t[i].reconnect_attempts = 0;
g_clients_t[i].permanently_failed = false;
memset(g_clients_t[i].host, 0, sizeof(g_clients_t[i].host)); memset(g_clients_t[i].host, 0, sizeof(g_clients_t[i].host));
g_clients_t[i].port = 0; g_clients_t[i].port = 0;
// 清除 client_id 和 topic_id
memset(g_clients_t[i].client_id, 0, sizeof(g_clients_t[i].client_id)); memset(g_clients_t[i].client_id, 0, sizeof(g_clients_t[i].client_id));
memset(s_uuid_mqtt_topic_id[i], 0, sizeof(s_uuid_mqtt_topic_id[i])); memset(s_uuid_mqtt_topic_id[i], 0, sizeof(s_uuid_mqtt_topic_id[i]));
} }
// 假设 g_mqtt_cam_config_t 已经有效
if (!g_mqtt_cam_config_t) {
my_zlog_fatal("mqtt_init: g_mqtt_cam_config_t is NULL. Cannot initialize.");
return -1; // 表示严重错误
} }
for (int i = 0; i < g_mqtt_cam_config_t->mqtt_count; i++) { for (int i = 0; i < g_mqtt_cam_config_t->mqtt_count; i++) {
// 正确地将索引 'i' 传递给 add_mqtt_create if (retry_failed_only && (g_clients_t[i].mosq || g_clients_t[i].permanently_failed)) {
int erg = add_mqtt_create(g_mqtt_cam_config_t->mqtt_servers[i], BROKER_PORT, i); continue;
if (erg != 0) { }
int rc = add_mqtt_create(g_mqtt_cam_config_t->mqtt_servers[i], BROKER_PORT, i);
if (rc == 0) {
success_count++;
} else {
failure_count++; failure_count++;
my_zlog_error("未能为 %s:%d 添加 MQTT 客户端", g_mqtt_cam_config_t->mqtt_servers[i], BROKER_PORT); my_zlog_error("未能为 %s:%d 添加 MQTT 客户端", g_mqtt_cam_config_t->mqtt_servers[i], BROKER_PORT);
} else {
success_count++;
} }
} }
// 返回失败的数量,或 0 表示完全成功 my_zlog_info("MQTT初始化结果: success=%d failure=%d total=%d",
if (failure_count > 0) { success_count, failure_count, g_mqtt_cam_config_t->mqtt_count);
my_zlog_error("%d 个 MQTT 客户端初始化失败。", failure_count);
return failure_count; // 指示有多少失败 if (mqtt_has_connected()) {
return 0;
} }
my_zlog_info("所有 %d 个 MQTT 客户端初始化成功。", success_count); return -1;
return 0; // 成功
} }
//回调函数 //回调函数
...@@ -79,6 +139,8 @@ void on_connect(struct mosquitto *mosq, void *obj, int rc) { ...@@ -79,6 +139,8 @@ void on_connect(struct mosquitto *mosq, void *obj, int rc) {
my_zlog_info("Connected to broker"); my_zlog_info("Connected to broker");
my_zlog_info("[Connected] %s:%d (%s)", my_zlog_info("[Connected] %s:%d (%s)",
client_t->host, client_t->port, client_t->client_id); client_t->host, client_t->port, client_t->client_id);
client_t->reconnect_attempts = 0;
client_t->permanently_failed = false;
mosquitto_subscribe(mosq, NULL, mqtt_topic_app2dev_number(), 0); mosquitto_subscribe(mosq, NULL, mqtt_topic_app2dev_number(), 0);
mosquitto_subscribe(mosq, NULL, mqtt_topic_ser2dev_number(), 0); mosquitto_subscribe(mosq, NULL, mqtt_topic_ser2dev_number(), 0);
...@@ -164,10 +226,21 @@ int add_mqtt_create(const char *host, int port, int clients_idx) { ...@@ -164,10 +226,21 @@ int add_mqtt_create(const char *host, int port, int clients_idx) {
} }
mqttclient *client_t = &g_clients_t[clients_idx]; mqttclient *client_t = &g_clients_t[clients_idx];
if (client_t->mosq) {
if (strcmp(client_t->host, host) == 0 && client_t->port == port) {
return 0;
}
mosquitto_disconnect(client_t->mosq);
mosquitto_destroy(client_t->mosq);
client_t->mosq = NULL;
}
// 复制主机名,确保字符串末尾空终止 // 复制主机名,确保字符串末尾空终止
strncpy(client_t->host, host, sizeof(client_t->host) - 1); strncpy(client_t->host, host, sizeof(client_t->host) - 1);
client_t->host[sizeof(client_t->host) - 1] = '\0'; client_t->host[sizeof(client_t->host) - 1] = '\0';
client_t->port = port; client_t->port = port;
client_t->reconnect_attempts = 0;
client_t->permanently_failed = false;
// 生成 UUID 作为客户端 ID // 生成 UUID 作为客户端 ID
uuid_t uuid; uuid_t uuid;
...@@ -233,12 +306,18 @@ int add_mqtt_create(const char *host, int port, int clients_idx) { ...@@ -233,12 +306,18 @@ int add_mqtt_create(const char *host, int port, int clients_idx) {
void mqtt_clean(){ void mqtt_clean(){
// 清理 // 清理
for(int i=0;i<g_mqtt_cam_config_t->mqtt_count;i++){ for(int i=0;i<g_mqtt_cam_config_t->mqtt_count;i++){
if (!g_clients_t[i].mosq) {
continue;
}
mosquitto_disconnect(g_clients_t[i].mosq); mosquitto_disconnect(g_clients_t[i].mosq);
mosquitto_destroy(g_clients_t[i].mosq); mosquitto_destroy(g_clients_t[i].mosq);
g_clients_t[i].mosq = NULL; // 将指针置空,防止悬挂指针 g_clients_t[i].mosq = NULL; // 将指针置空,防止悬挂指针
} }
if (s_mosquitto_lib_inited) {
mosquitto_lib_cleanup(); mosquitto_lib_cleanup();
s_mosquitto_lib_inited = false;
}
my_zlog_info("清理成功"); my_zlog_info("清理成功");
} }
...@@ -294,20 +373,7 @@ void send_server_failure_notification(const char *failed_host, int failed_port, ...@@ -294,20 +373,7 @@ void send_server_failure_notification(const char *failed_host, int failed_port,
return; return;
} }
for (int i = 0; i < MAX_SERVERS; i++) { mqtt_publish_to_topics(mqtt_topic_dev2app_number(), mqtt_topic_pure_number(), json_string, 0);
// 遍历所有客户端,如果客户端是活跃的且不是出问题的那个,就发送通知
if (g_clients_t[i].mosq && !g_clients_t[i].permanently_failed) {
// 确保不是给自己发送通知(通常同一个客户端不需要知道自己的失败)
// 尽管理论上失败的客户端的 mosq 已经为 NULL,这里仍然做个双重检查
if (strcmp(g_clients_t[i].host, failed_host) != 0 || g_clients_t[i].port != failed_port) {
for(int i=0;i<g_mqtt_cam_config_t->mqtt_count;i++){
mosquitto_publish(g_clients_t[i].mosq, NULL, mqtt_topic_dev2app_number(), strlen(json_string), json_string, 0, false);
mosquitto_publish(g_clients_t[i].mosq, NULL, mqtt_topic_pure_number(), strlen(json_string), json_string, 0, false);
}
}
}
}
free(json_string); free(json_string);
cJSON_Delete(root); cJSON_Delete(root);
...@@ -330,10 +396,7 @@ void send_fail_mqtt_conect(){ ...@@ -330,10 +396,7 @@ void send_fail_mqtt_conect(){
return; return;
} }
for(int i=0;i<g_mqtt_cam_config_t->mqtt_count;i++){ mqtt_publish_to_topics(mqtt_topic_dev2app_number(), mqtt_topic_pure_number(), json_string, 0);
mosquitto_publish(g_clients_t[i].mosq, NULL, mqtt_topic_dev2app_number(), strlen(json_string), json_string, 0, false);
mosquitto_publish(g_clients_t[i].mosq, NULL, mqtt_topic_pure_number(), strlen(json_string), json_string, 0, false);
}
free(json_string); free(json_string);
cJSON_Delete(root); cJSON_Delete(root);
......
...@@ -9,6 +9,7 @@ ...@@ -9,6 +9,7 @@
#define MAX_SERVERS 15 #define MAX_SERVERS 15
#define MAX_RECONNECT_ATTEMPTS 10 #define MAX_RECONNECT_ATTEMPTS 10
#define MQTT_INIT_RETRY_MAX 3
typedef struct { typedef struct {
struct mosquitto *mosq; struct mosquitto *mosq;
...@@ -31,6 +32,10 @@ extern int gPwmCount; // 计数 ...@@ -31,6 +32,10 @@ extern int gPwmCount; // 计数
extern int gmessage_type;//message消息值 extern int gmessage_type;//message消息值
int mqtt_init();//mqtt初始化 int mqtt_init();//mqtt初始化
int mqtt_retry_failed_clients(void); // 启动阶段:仅重试失败槽位
bool mqtt_has_connected(void); // 是否至少有一个可用连接
int mqtt_publish_to_all(const char *topic, const char *payload, int qos);
int mqtt_publish_to_topics(const char *topic1, const char *topic2, const char *payload, int qos);
//void on_connect(struct mosquitto *mosq, void *obj, int rc);//回调函数 //void on_connect(struct mosquitto *mosq, void *obj, int rc);//回调函数
//void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message);//消息回调函数 //void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message);//消息回调函数
......
...@@ -84,9 +84,7 @@ int send_jwtser(const char *token){ ...@@ -84,9 +84,7 @@ int send_jwtser(const char *token){
char *payload = cJSON_PrintUnformatted(root); char *payload = cJSON_PrintUnformatted(root);
my_zlog_debug("%s",payload); my_zlog_debug("%s",payload);
for(int i=0;i<g_mqtt_cam_config_t->mqtt_count;i++){ mqtt_publish_to_all(mqtt_topic_pure_number(), payload, 0);
mosquitto_publish(g_clients_t[i].mosq, NULL, mqtt_topic_pure_number(), strlen(payload), payload, 0, false);
}
free(payload); free(payload);
free(only_Id); free(only_Id);
...@@ -257,7 +255,6 @@ int message2013_recverigy_open(cJSON *body_raw) ...@@ -257,7 +255,6 @@ int message2013_recverigy_open(cJSON *body_raw)
cJSON *body = NULL; cJSON *body = NULL;
char *plain_json = NULL; char *plain_json = NULL;
cJSON *verify_status = NULL; cJSON *verify_status = NULL;
cJSON *auth_key = NULL;
if (body_raw == NULL) { if (body_raw == NULL) {
my_zlog_warn("2013 body为空"); my_zlog_warn("2013 body为空");
...@@ -304,15 +301,8 @@ int message2013_recverigy_open(cJSON *body_raw) ...@@ -304,15 +301,8 @@ int message2013_recverigy_open(cJSON *body_raw)
} }
if (verify_status->valueint == 0) { if (verify_status->valueint == 0) {
auth_key = cJSON_GetObjectItem(body, "auth_key");
if (!cJSON_IsString(auth_key) || auth_key->valuestring == NULL
|| strcmp(auth_key->valuestring, MQTT_VERIFY_AUTH_KEY) != 0) {
my_zlog_warn("2013 关闭验证 auth_key校验失败");
cJSON_Delete(body);
return -1;
}
VERIFIED_MODE = FALSE; VERIFIED_MODE = FALSE;
my_zlog_info("关闭验证成功(加密+auth_key校验通过)"); my_zlog_info("关闭验证成功(加密校验通过), verify_status=0");
cJSON_Delete(body); cJSON_Delete(body);
return 0; return 0;
} }
...@@ -345,9 +335,7 @@ int message_sendopen_verify(){ ...@@ -345,9 +335,7 @@ int message_sendopen_verify(){
my_zlog_debug("发送验证查询: %s", payload); my_zlog_debug("发送验证查询: %s", payload);
for(int i=0;i<g_mqtt_cam_config_t->mqtt_count;i++){ mqtt_publish_to_all(mqtt_topic_pure_number(), payload, 0);
mosquitto_publish(g_clients_t[i].mosq, NULL, mqtt_topic_pure_number(), strlen(payload), payload, 0, false);
}
free(payload); free(payload);
cJSON_Delete(root); cJSON_Delete(root);
...@@ -423,9 +411,7 @@ int mqtt_verify_clear_send(char *status){ ...@@ -423,9 +411,7 @@ int mqtt_verify_clear_send(char *status){
my_zlog_debug("%s",payload); my_zlog_debug("%s",payload);
for(int i=0;i<g_mqtt_cam_config_t->mqtt_count;i++){ mqtt_publish_to_all(mqtt_topic_pure_number(), payload, 0);
mosquitto_publish(g_clients_t[i].mosq, NULL, mqtt_topic_pure_number(), strlen(payload), payload, 0, false);
}
free(payload); free(payload);
cJSON_Delete(root); cJSON_Delete(root);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment