Commit 47699a2f authored by 957dd's avatar 957dd

拉取接口绑定mqtt服务器,还需测试

parent 2f81009a
...@@ -91,6 +91,7 @@ target_link_libraries(main PRIVATE ...@@ -91,6 +91,7 @@ target_link_libraries(main PRIVATE
crypto crypto
pthread pthread
dl dl
uuid
) )
# 安装规则 # 安装规则
......
...@@ -124,20 +124,16 @@ void *thread_open_browser(void *arg) { ...@@ -124,20 +124,16 @@ void *thread_open_browser(void *arg) {
void *thread_mqtt_reconnect(void *arg) { void *thread_mqtt_reconnect(void *arg) {
while(1){ while(1){
if (mqtt_init() != 0) { if(mqtt_init() == 0){
my_zlog_fatal("mqtt_init failed"); my_zlog_warn("mqtt success");
mqtt_clean(); break;
continue; }else {
} delay_ms(300);
continue;
if (g_clients_t == NULL) {
if (!g_clients_t) {
my_zlog_error("Failed to create mosquitto client");
} }
} }
mqtt_cycle(); mqtt_cycle();
}
mqtt_clean(); mqtt_clean();
return NULL; return NULL;
......
No preview for this file type
...@@ -29,6 +29,8 @@ ...@@ -29,6 +29,8 @@
#include <time.h> // 时间处理 #include <time.h> // 时间处理
#include <stddef.h> #include <stddef.h>
#include <uuid/uuid.h>
#include "mosquitto.h" #include "mosquitto.h"
#include "cjson/cJSON.h" #include "cjson/cJSON.h"
......
...@@ -8,7 +8,7 @@ char gwebcam[2048];//存放启动火狐网站命令 ...@@ -8,7 +8,7 @@ char gwebcam[2048];//存放启动火狐网站命令
int opencamsh() { int opencamsh() {
char urls[50]; char urls[512];
sprintf(urls,"%s%s",g_mqtt_cam_config_t->videourl,mqtt_topic_pure_number()); sprintf(urls,"%s%s",g_mqtt_cam_config_t->videourl,mqtt_topic_pure_number());
......
...@@ -52,12 +52,13 @@ int parse_device_config(const char *json_str) { ...@@ -52,12 +52,13 @@ int parse_device_config(const char *json_str) {
int i = 0; int i = 0;
cJSON_ArrayForEach(item, mqtt) { cJSON_ArrayForEach(item, mqtt) {
if (cJSON_IsString(item)) { if (cJSON_IsString(item)) {
my_zlog_info(" [%d] %s", i++, item->valuestring);
strncpy(g_mqtt_cam_config_t->mqtt_servers[i], item->valuestring, sizeof(g_mqtt_cam_config_t->mqtt_servers[i]) - 1); strncpy(g_mqtt_cam_config_t->mqtt_servers[i], item->valuestring, sizeof(g_mqtt_cam_config_t->mqtt_servers[i]) - 1);
g_mqtt_cam_config_t->mqtt_servers[i][sizeof(g_mqtt_cam_config_t->mqtt_servers[i]) - 1] = '\0'; g_mqtt_cam_config_t->mqtt_servers[i][sizeof(g_mqtt_cam_config_t->mqtt_servers[i]) - 1] = '\0';
my_zlog_info(" [%d] %s", i, g_mqtt_cam_config_t->mqtt_servers[i]);
i = i + 1;
} }
g_mqtt_cam_config_t->mqtt_count = i;
} }
g_mqtt_cam_config_t->mqtt_count = i;
} }
// 4. 提取 api 和 videoUrl // 4. 提取 api 和 videoUrl
......
...@@ -12,8 +12,7 @@ typedef struct{ ...@@ -12,8 +12,7 @@ typedef struct{
int mqtt_count; int mqtt_count;
}DEVICE_MQTTCONFIG; }DEVICE_MQTTCONFIG;
extern DEVICE_MQTTCONFIG *g_mqtt_cam_config_t ; extern DEVICE_MQTTCONFIG *g_mqtt_cam_config_t;
/*mqtt服务器和视频流地址初始化*/ /*mqtt服务器和视频流地址初始化*/
int http_mqtt_video_init(); int http_mqtt_video_init();
......
...@@ -8,16 +8,17 @@ ...@@ -8,16 +8,17 @@
mqttclient g_clients_t[MAX_SERVERS]; mqttclient g_clients_t[MAX_SERVERS];
char g_uuid_mqtt_topic_id[MAX_SERVERS][56];
// struct mosquitto *mosq; // struct mosquitto *mosq;
int add_mqtt_create(const char *host, int port, int clients_count); int add_mqtt_create(const char *host, int port, int clients_count);
//mqtt初始化 //mqtt初始化
int mqtt_init() { int mqtt_init() {
mosquitto_lib_init(); mosquitto_lib_init();
// 这里创建mosq可以保留,也可以删除这一行,让创建放到 Mqtt_onnect 里 // 这里创建mosq可以保留,也可以删除这一行,让创建放到 Mqtt_onnect 里
for(int i=0;i<g_mqtt_cam_config_t->mqtt_count;i++){ for(int i=0;i<g_mqtt_cam_config_t->mqtt_count;i++){
int erg= add_mqtt_create(g_mqtt_cam_config_t->mqtt_servers[i],BROKER_PORT,i); int erg= add_mqtt_create(g_mqtt_cam_config_t->mqtt_servers[i],BROKER_PORT,i);
if(erg !=0){ if(erg !=0){
...@@ -103,6 +104,8 @@ void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_messag ...@@ -103,6 +104,8 @@ void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_messag
// } // }
int add_mqtt_create(const char *host, int port, int clients_count){ int add_mqtt_create(const char *host, int port, int clients_count){
static int uuid_index =0;
if(clients_count >= MAX_SERVERS) { if(clients_count >= MAX_SERVERS) {
my_zlog_error("Max server limit reached."); my_zlog_error("Max server limit reached.");
return -1; return -1;
...@@ -110,9 +113,19 @@ int add_mqtt_create(const char *host, int port, int clients_count){ ...@@ -110,9 +113,19 @@ int add_mqtt_create(const char *host, int port, int clients_count){
mqttclient *client_t = &g_clients_t[clients_count]; mqttclient *client_t = &g_clients_t[clients_count];
strncpy(client_t->host, host, sizeof(client_t->host)-1); strncpy(client_t->host, host, sizeof(client_t->host)-1);
client_t->port = port; client_t->port = port;
//strncpy(client_t->client_id, client_id, sizeof(client_t->client_id)-1);
client_t->mosq = mosquitto_new(NULL, true, client_t); uuid_t uuid;
uuid_generate_time(uuid); // 获取UUIDv1
unsigned short time_part = (uuid[0] << 8) | uuid[1];
unsigned short mac_part = (uuid[10] << 8) | uuid[11];
// 组合成32位值(8字符十六进制)
snprintf(g_uuid_mqtt_topic_id[uuid_index], 9, "%04x%04x", time_part, mac_part);
client_t->mosq = mosquitto_new(g_uuid_mqtt_topic_id[uuid_index], true, client_t);
uuid_index++;
if(!client_t->mosq) { if(!client_t->mosq) {
my_zlog_error("mosquitto_new failed"); my_zlog_error("mosquitto_new failed");
...@@ -121,15 +134,20 @@ int add_mqtt_create(const char *host, int port, int clients_count){ ...@@ -121,15 +134,20 @@ int add_mqtt_create(const char *host, int port, int clients_count){
mosquitto_int_option(client_t->mosq, MOSQ_OPT_PROTOCOL_VERSION, MQTT_PROTOCOL_V5); mosquitto_int_option(client_t->mosq, MOSQ_OPT_PROTOCOL_VERSION, MQTT_PROTOCOL_V5);
mosquitto_reconnect_delay_set(client_t->mosq, 2, 10, true);
mosquitto_connect_callback_set(client_t->mosq, on_connect); mosquitto_connect_callback_set(client_t->mosq, on_connect);
mosquitto_message_callback_set(client_t->mosq, on_message); mosquitto_message_callback_set(client_t->mosq, on_message);
mosquitto_username_pw_set(client_t->mosq, USERNAME, PASSWORD); mosquitto_username_pw_set(client_t->mosq, USERNAME, PASSWORD);
if(mosquitto_connect(client_t->mosq, host, port, 60) != MOSQ_ERR_SUCCESS) { int rc = mosquitto_connect_async(client_t->mosq, host, port, 60);
if( rc != MOSQ_ERR_SUCCESS) {
my_zlog_error("mosquitto_connect failed"); my_zlog_error("mosquitto_connect failed");
my_zlog_warn("Failed to connect to broker: %s", mosquitto_strerror(rc));
return -1; return -1;
} }
my_zlog_info("add %s mqttserver success",host); my_zlog_info("add %s mqttserver success",host);
return 0; return 0;
} }
...@@ -159,11 +177,11 @@ void mqtt_clean(){ ...@@ -159,11 +177,11 @@ void mqtt_clean(){
int mqtt_cycle() {//非阻塞型 int mqtt_cycle() {//非阻塞型
while (1) { while (1) {
for(int i=0;i<g_mqtt_cam_config_t->mqtt_count;i++){ for(int i=0;i<g_mqtt_cam_config_t->mqtt_count;i++){
int rc= mosquitto_loop(g_clients_t[i].mosq, 0, 1); // 每秒检查一次 int rc= mosquitto_loop(g_clients_t[i].mosq, 100, 1); // 每秒检查一次
if(rc != MOSQ_ERR_SUCCESS){ if(rc != MOSQ_ERR_SUCCESS){
my_zlog_warn("服务器 %s:%d 断开,尝试重连...\n", my_zlog_warn("服务器 %s:%d 断开,尝试重连...",
g_clients_t[i].host, g_clients_t[i].port); g_clients_t[i].host, g_clients_t[i].port);
mosquitto_reconnect(g_clients_t[i].mosq); mosquitto_reconnect(g_clients_t[i].mosq);
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment