Commit 8dbb5dd1 authored by 957dd's avatar 957dd

Merge branch 'feature/mqtt_bug_repair' into 'master'

Feature/mqtt bug repair See merge request !73
parents 1218c51a 6d50e9e8
cmake_minimum_required(VERSION 3.10)
project(car
VERSION 1.2.15
VERSION 1.2.17
LANGUAGES C
)
......
......@@ -3,7 +3,6 @@
#include "common.h"
#include "device_id_change.h"
#include "delay.h"
#define BUFFER_SIZE 1024
......
#include "device_identity.h"
#include "mqtt_init.h"
#include "common.h"
#include "delay.h"
#include "device_wifi_change.h"
#include "http_config_mqtt.h"
#include "modules_common.h"
char g_current_ssid[SSID_MAX_LEN] = {0};//用于存现在已连接WiFi
char wifi_Last_ssid[SSID_MAX_LEN]= {0} ;
......
#ifndef DEVICE_WIFI_CHANGE_H__
#define DEVICE_WIFI_CHANGE_H__
#include "cjson/cJSON.h"
#include "common.h"
#define SSID_MAX_LEN 128
#define PASS_MAX_LEN 128
......
#include "device_identity.h"
#include "mqtt_init.h"
#include "delay.h"
#include "device_wifi_change.h"
#include "device_wifi_manager.h"
#include "http_config_mqtt.h"
#include "modules_common.h"
#define MAX_WIFI_ENTRIES 50
#define SSID_MAX_LENGTH 64
......
#include "device_fileopen.h"
#include "device_id_change.h"
#include "common.h"
#include "delay.h"
#define FILENAME "/home/orangepi/car/master/Deviceld.txt"
......
#include "common.h"
#include "devcontrol_common.h"
#include "device_identity.h"
#include "device_fileopen.h"
#include "device_init.h"
#include "delay.h"
#include "common.h"
#include "drivers_common.h"
char g_app2dev_topic[23];
char g_dev2app_topic[23];
char g_pure_topic[15];
char g_ser2dev_topic[23];
bool g_device_name_exists=0;//查找设备号状态机,如果id是不属于任何类型
static bool g_device_name_exists_s=0;//查找设备号状态机,如果id是不属于任何类型
/*提早声明*/
int device_mqtt_topic_init();
......@@ -96,7 +94,7 @@ int hash_insert_init(HashTable_t *HashTable_t) {
insert(HashTable_t, "0201", TANK_0201);
insert(HashTable_t, "0202", TANK_0202);
insert(HashTable_t, "0203", TANK_0203);
insert(HashTable_t, "0204", TANK_0203);
insert(HashTable_t, "0204", TANK_0204);
insert(HashTable_t, "0206", TANK_0206);
insert(HashTable_t, "0301", SHIP_0301);
insert(HashTable_t, "0401", PAO_0401);
......@@ -139,7 +137,7 @@ int device_judg(CodeEnum_t code,char *sub_str) {
my_zlog_info("使用型号%s",sub_str);
}else {
my_zlog_error("没有找到设备号,尝试启用备用mqtt,topic进行改名");
g_device_name_exists=1;
g_device_name_exists_s=1;
}
return 0;
......@@ -172,7 +170,7 @@ int device_mqtt_topic_init() {
free(sub_str);
delay_s(10);
if(g_device_name_exists == 0) {
if(g_device_name_exists_s == 0) {
sprintf(g_app2dev_topic,"app2dev/%s",readbuf);
sprintf(g_dev2app_topic,"dev2app/%s",readbuf);
sprintf(g_pure_topic,"%s",readbuf);
......
......@@ -2,15 +2,10 @@
#define MAIN_H__
#include "common.h"
#include "device_fileopen.h"
#include "device_identity.h"
#include "app_device_common.h"
#include "drivers_common.h"
#include "modules_common.h"
#include "pthread_open.h"
#include "http_request.h"
#include "softiic.h"
#include "INA226.h"
#include "devcontrol_common.h"
#include "gpio_init.h"
#include "http_config_mqtt.h"
/*设备id读取初始化*/
int device_id_file_init();
......
#include "common.h"
#include "delay.h"
#include "app_device_common.h"
#include "drivers_common.h"
#include "modules_common.h"
#include "pthread_open.h"
#include "mqtt_init.h"
#include "mqtt_infor_handle.h"
#include "browser_open.h"
#include "mqtt_verify.h"
#include "device_wifi_change.h"
#include "devcontrol_common.h"
#include "browser_open.h"
#include "device_init.h"
#include "audioplay.h"
#include "gpio_init.h"
#define PTHREAD_MAX 6
......@@ -124,11 +116,20 @@ void *thread_open_browser(void *arg) {
void *thread_mqtt_reconnect(void *arg) {
while(1){
if(mqtt_init() == 0){
my_zlog_warn("mqtt success");
static int reconnect_count=0;
static int remqtt_index;
remqtt_index=mqtt_init();
if(remqtt_index == 0){
my_zlog_warn("mqtt success",remqtt_index);
break;
}else {
my_zlog_warn("wait... mqtt reconect,error:%d",remqtt_index);
delay_ms(300);
reconnect_count++;
if(reconnect_count>MAX_RECONNECT_ATTEMPTS){
send_fail_mqtt_conect();
break;
}
continue;
}
}
......
#define PROJECT_VERSION_MAJOR 1
#define PROJECT_VERSION_MINOR 2
#define PROJECT_VERSION_PATCH 14
#define PROJECT_VERSION_PATCH 17
#define GIT_HASH ""
#define BUILD_TIMESTAMP ""
#define BUILD_USER ""
No preview for this file type
#include "common.h"
#include "car0101_control.h"
#include "gpio_init.h"
#include "modules_common.h"
#include "gpio_common.h"
/*将角度转化为对应的舵机pwm值*/
void car_calculate_L_R(int angle) {
......
#include "car0102_control.h"
#include "common.h"
#include "gpio_init.h"
#include "device_init.h"
#include "gpio_control.h"
#include "modules_common.h"
#include "gpio_common.h"
//将角度转化为对应的舵机pwm值
void car0102_calculate_L_R(int angle) {
......
#include "car0103_control.h"
#include "gpio_init.h"
#include "modules_common.h"
#include "common.h"
#include "gpio_common.h"
void car0103_middle() {
pwmWrite(PWM_PIN_SPEED,75);
......
#include "car0104_control.h"
#include "gpio_init.h"
#include "modules_common.h"
#include "common.h"
#include "gpio_common.h"
void car0104_stop() {
pwmWrite(PWM_PIN_SPEED,75);
......
#include "devcontrol_common.h"
#include "gpio_init.h"
#include "modules_common.h"
#include "gpio_common.h"
const device_didrive device_didrive_control_config_t[]={
{
......
#include "common.h"
#include "gpio_init.h"
#include "modules_common.h"
#include "ptz0401_control.h"
#include "gpio_common.h"
#define PWM_PIN_up 21
#define PWM_PIN_down 2
......
#include "common.h"
#include "gpio_init.h"
#include "modules_common.h"
#include "ship0301_control.h"
#include "gpio_common.h"
void ship0301_stop() {
pwmWrite(PWM_PIN_SPEED,75);
......
#include "common.h"
#include "gpio_init.h"
#include "modules_common.h"
#include "tank0202_control.h"
#include "gpio_common.h"
int g_modecount_tank0202=0;
......
#include "common.h"
#include "gpio_init.h"
#include "pthrpoll.h"
#include "modules_common.h"
#include "tank0203_control.h"
#include "gpio_common.h"
#define PWM_MIN 55 // 最小PWM值
#define PWM_MAX 95 // 最大PWM值
......
#ifndef TANK0203_CONTROL_H__
#define TANK0203_CONTROL_H__
void tank0203_middle();
void tank0203_change(unsigned char *buf);
......
#include "common.h"
#include "gpio_init.h"
#include "pthrpoll.h"
#include "modules_common.h"
#include "tank0204_control.h"
#include "gpio_common.h"
void tank0204_stop() {
pwmWrite(PWM_PIN_SPEED,75);
......@@ -70,76 +69,89 @@ void tank0204_mode_right_flont(unsigned char gval) {
}
}
int tank0204_change_grading(int mode){
int mode_val=0;
switch(mode){
case 2:
mode_val=10;
break;
case 3:
mode_val=20;
break;
case 4:
mode_val=30;
break;
case 5:
mode_val=50;
break;
default:
break;
}
return mode_val;
}
void tank0204_change(unsigned char *buf) {
unsigned char type = buf[0];
unsigned char mode = buf[1];
unsigned char val = buf[2];
static int modecount_tank0203=0;
static int tank0203_index =0;
// static int s_val_1=0;
// static int s_val_2=0;
if(mode == 1 ) {
tank0204_mode_lift_flont(val);
tank0204_mode_right_flont(val);
modecount_tank0203=0;
//s_val_1 = val;
}else if(mode == 2 ) {
tank0204_mode_lift_back(val);
tank0204_mode_right_back(val);
modecount_tank0203=1;
//s_val_2 = val;
}
if((mode == 1||mode == 2)&&val == 0) {
modecount_tank0203 = 0;
tank0203_index = 0;
}
if((mode == 1||mode == 2)&&val != 0) tank0203_index = 1;
if(mode == 3) {
if(modecount_tank0203 == 0){
if(tank0203_index == 1) {
tank0204_mode_lift_flont(0);
//tank0204_mode_right_flont(s_val_1-20);
}
else {
tank0204_mode_lift_back(val+30);
tank0204_mode_right_flont(val+30);
}
}
if(modecount_tank0203 == 1) {
if(tank0203_index == 1) {
tank0204_mode_lift_back(0);
//tank0204_mode_lift_back(s_val_2-20);
}else {
tank0204_mode_lift_back(val+30);
tank0204_mode_right_flont(val+30);
}
static int tank0204_steering_t=0;
static int tank0204_front_t =0;
}
static int tank0204_front_val=0;
static int tank0204_steering_val=0;
}else if(mode == 4) {
if(modecount_tank0203 == 1){
if(tank0203_index == 1) {
//tank0204_mode_right_back(s_val_1);
tank0204_mode_right_back(0);
}
else {
tank0204_mode_lift_flont(val+30);
tank0204_mode_right_back(val+30);
int tank0204_count=40;
if(type != 1) tank0204_count=tank0204_change_grading(type);
if((mode == 1 ||mode == 2)&&val == 0) tank0204_front_t =0;
if((mode == 3 ||mode == 4)&&val == 0) tank0204_steering_t=0;
if(mode == 1&&val != 0) {
tank0204_front_val=val;
tank0204_front_t =1;
}else if(mode == 2&&val != 0) {
tank0204_front_t =2;
tank0204_front_val=val;
}
if(mode == 3&&val != 0) {
tank0204_steering_t =1;
tank0204_steering_val = val;
}else if(mode == 4&&val != 0) {
tank0204_steering_t =2;
tank0204_steering_val = val;
}
if(modecount_tank0203 == 0) {
if(tank0203_index == 1) {
//tank0204_mode_lift_flont(s_val_1-20);
if(tank0204_front_t ==0&&tank0204_steering_t==0){
tank0204_mode_lift_flont(0);
tank0204_mode_right_flont(0);
}else if(tank0204_front_t ==1&&tank0204_steering_t==0){
tank0204_mode_right_flont(tank0204_front_val+10);
tank0204_mode_lift_flont(0);
}else if(tank0204_front_t ==2&&tank0204_steering_t==0){
tank0204_mode_right_back(tank0204_front_val+10);
tank0204_mode_lift_back(0);
}else if(tank0204_front_t ==0&&tank0204_steering_t==1){
tank0204_mode_lift_back(tank0204_steering_val + tank0204_count+20);
tank0204_mode_right_back(0);
}else if(tank0204_front_t ==0&&tank0204_steering_t==2){
tank0204_mode_lift_flont(tank0204_steering_val + tank0204_count+20);
tank0204_mode_right_back(0);
}
else {
tank0204_mode_lift_flont(val+30);
tank0204_mode_right_back(val+30);
}
}
else if(tank0204_front_t ==1&&tank0204_steering_t==1){
tank0204_mode_lift_back(tank0204_steering_val + 10);
tank0204_mode_right_flont(tank0204_steering_val + 20);
}else if(tank0204_front_t ==1&&tank0204_steering_t==2){
tank0204_mode_lift_flont(tank0204_steering_val +10);
tank0204_mode_right_flont(tank0204_steering_val + 20);
}else if(tank0204_front_t ==2&&tank0204_steering_t==1){
tank0204_mode_lift_back(tank0204_steering_val + 10);
tank0204_mode_right_back(tank0204_steering_val + 20);
}else if(tank0204_front_t ==2&&tank0204_steering_t==2){
tank0204_mode_lift_flont(tank0204_steering_val + 10);
tank0204_mode_right_back(tank0204_steering_val + 20);
}
}
......
#ifndef TANK0204_CONTROL_H__
#define TANK0204_CONTROL_H__
void tank0204_stop();
void tank0204_change(unsigned char *buf);
......
#include "common.h"
#include "gpio_init.h"
#include "pthrpoll.h"
#include "modules_common.h"
#include "tank0206_control.h"
#include "gpio_common.h"
void tank0206_middle() {
pwmWrite(PWM_PIN_SPEED,75);
......
#ifndef TANK0206_CONTROL_H__
#define TANK0206_CONTROL_H__
void tank0206_middle();
void tank0206_change(unsigned char *buf);
......
#include "tank_common.h"
#include "devcontrol_common.h"
#include "gpio_init.h"
#include "pthrpoll.h"
#include "tank_angle.h"
#include "gpio_control.h"
#include "modules_common.h"
#include "gpio_common.h"
int tank_shot_back_stop(unsigned char pin,unsigned char val);
......
......@@ -106,8 +106,8 @@ const deviceconfig_t device_configs[] = {
{
.device_id = DEVICE_ROBOT_DOG0501,
.device_name = "dog0501",
.gpio_pins = {5, 6, 7, 10, 16, 20, 22, 23, 24, 25, 26,-1},/* 补充GPIO引脚 */
.gpio_pwms = { 27,-1},
.gpio_pins = {6, 10, 16, 20, 22, 23, 25,-1},/* 补充GPIO引脚 */
.gpio_pwms = { 5, 7,24, 26,27,-1},
.device_pwm_init = physics_pwm_init,
.device_control_stop = car0101_middle_pwm,/* 补充速度控制函数 */
.emergency_code = 501
......
#ifndef GPIO_COMMON_H
#define GPIO_COMMON_H
#include "device_init.h"
#include "gpio_init.h"
#include "gpio_control.h"
#include "softiic.h"
#endif
\ No newline at end of file
......@@ -5,7 +5,6 @@
#include "gpio_init.h"
#include "devcontrol_common.h"
#include "http_request.h"
#include "delay.h"
#define GPIO_ID_THREAD_COUNT 3
......@@ -353,7 +352,7 @@ void tank0202_pwm_value(int pin,int value) { //软件陪我们控制调速
if(pin == 27){
device_shoting_check(27,30);
} else {
softPwmWrite(pin, 40);
softPwmWrite(pin, 30);
my_zlog_info("pwm:%d",pin);
}
......@@ -378,7 +377,7 @@ void tank0203_pwm_value(int pin,int value) { //软件陪我们控制调速
if(pin == 27){
device_shoting_check(27,30);
} else {
softPwmWrite(pin, 35);
softPwmWrite(pin, 30);
my_zlog_info("pwm:%d",pin);
}
......@@ -403,7 +402,7 @@ void tank0204_pwm_value(int pin,int value){
if(pin == 27){
device_shoting_check(27,30);
} else {
softPwmWrite(pin, 35);
softPwmWrite(pin, 30);
my_zlog_info("pwm:%d",pin);
}
......@@ -479,14 +478,18 @@ void dog0501_pwm_value(int pin,int value) { //软件陪我们控制调速
if(value==1) {
if(pin == 27){
softPwmWrite(pin, 90);
softPwmWrite(27, 40);
softPwmWrite(7, 80);
} else {
softPwmWrite(pin, 35);
my_zlog_info("pwm:%d",pin);
}
}else if(value==0) {
softPwmWrite(pin, 0);
if(pin==27){
softPwmWrite(27, 0);
softPwmWrite(7, 0);
}else softPwmWrite(pin, 0);
my_zlog_info("pwm:%d,0",pin);
}
my_zlog_info("dog0501 pwm");
......
#include <math.h>
#include "common.h"
#include "softiic.h"
......
#include <sys/socket.h>
#include <netinet/in.h>
#include <ifaddrs.h>
#include "common.h"
#include "ip_reader.h"
#include "mylog.h"
struct ifaddrs *ifap, *ifa;
struct sockaddr_in *sa;
......
#ifndef SERSORS_COMMON_H
#define SERSORS_COMMON_H
#include "ads1115.h"
#include "audioplay.h"
#include "INA226.h"
#include "infrared_gun.h"
#include "tank_angle.h"
#include "temperature.h"
#include "warn.h"
#endif
\ No newline at end of file
......@@ -2,11 +2,15 @@
#include "common.h"
#include "ads1115.h"
#ifndef M_PI
#define M_PI 3.14159265358979323846
#endif
double tank_angle(){
double angle=0;
float angle_shot=ads1115_read_channel(2);
if(angle_shot>0){
angle_shot=angle_shot*360/5;
angle_shot=angle_shot*360/5.05;
angle = round(angle_shot * 100) / 100;
}
......@@ -24,3 +28,48 @@ int angle_limit(){
}else return 0;
}
/*
角度 EMA 滤波
- filter : 上次滤波值(度)
- raw : 当前原始角度(度)
- alpha : 衰减因子(0..1),越大越跟随原始值(推荐 0.05~0.5)
- deadband : 死区(度),小于该值认为是抖动
*/
// EN滤波结构体
// typedef struct {
// double alpha; // 滤波系数 (0~1)
// double filtered; // 上一次滤波角度
// } ENFilter;
// static ENFilter filter = {.alpha = 0.5, .filtered = 0.05};
// // 支持360°→0°跳变的滤波函数
// double en_filter_update(double new_angle)
// {
// double diff = new_angle - filter.filtered;
// // 处理角度环绕(支持359→0或0→359平滑过渡)
// if (diff > 180.0)
// diff -= 360.0;
// else if (diff < -180.0)
// diff += 360.0;
// // 一阶低通滤波
// filter.filtered += filter.alpha * diff;
// // 结果保持在[0, 360)
// if (filter.filtered < 0)
// filter.filtered += 360.0;
// else if (filter.filtered >= 360.0)
// filter.filtered -= 360.0;
// return filter.filtered;
// }
// //上次的角度滤波后的值
// double return_en_filter_last_angle(){
// return filter.filtered;
// }
......@@ -11,4 +11,9 @@ double tank_angle();
int angle_limit();
// //角度滤波
// double en_filter_update(double new_angle);
// //上次角度滤波
// double return_en_filter_last_angle();
#endif
\ No newline at end of file
#include "common.h"
#include "warn.h"
#include "http_request.h"
#include "mylog.h"
#include "gpio_control.h"
#include "device_init.h"
#include "devcontrol_common.h"
......
......@@ -25,12 +25,14 @@
- tank0204 M1A2最大为200,更据电池电压具体调速。原电池为7.6v,大概为140左右
- tank0205 M1A2最大为200,更据电池电压具体调速。原电池为7.6v,大概为140左右
- tank0202~0205为眯眼潮玩坦克。
- 0202 豹2 ,0203 M1A2 ,0204 99A主站坦克
- tank0206 为可以发射水枪的水坦克.原电池为3.3v,大概为80左右
- car0101 最大为150,尽量不能这么大
- car0102 为小车 最大速度为200
- car0103 为挖机 最大速度为200,更据电池电压具体调速。原电池为7.6v,大概为140左右
- car0104 为推土机 最大速度为200,更据电池电压具体调速。原电池为7.6v,大概为140左右
- ptz0401 为炮台,有限位。
- 0501 为机械狗
## 国内国外二进制介绍
- mqtt_init.h改变里面的一个MQTT_IPMODE,游览器改变browser_open.h中的BROWSER_MODE
\ No newline at end of file
#ifndef APP_DEVICE_COMMON_H
#define APP_DEVICE_COMMON_H
#include "device_id_change.h"
#include "device_wifi_change.h"
#include "device_wifi_manager.h"
#include "device_fileopen.h"
#include "device_identity.h"
#endif
\ No newline at end of file
......@@ -8,6 +8,8 @@
#include <stdbool.h>
#include <string.h>
#include <errno.h>
#include <stddef.h>
#include <errno.h>
// ========== 硬件相关 ==========
#include <wiringPi.h> // GPIO控制
......@@ -18,32 +20,31 @@
#include <unistd.h> // POSIX接口
#include <sys/stat.h> // 文件状态
#include <dirent.h> // 目录操作
#include <sys/types.h>
#include <sys/select.h>
#include <pty.h> // for openpty
// ========== 网络通信 ==========
#include <curl/curl.h> // HTTP客户端
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <ifaddrs.h>
// ========== 高级功能 ==========
#include <termios.h> // 串口通信
#include <pthread.h> // 多线程
#include <math.h> // 数学函数
#include <time.h> // 时间处理
#include <stddef.h>
#include <uuid/uuid.h>
#include <signal.h>
// ========== 第三方库 ==========
#include "mosquitto.h"
#include "cjson/cJSON.h"
// ========== 自己封装的库 ==========
#include "delay.h"
#include "mylog.h"
// ========== 香橙派优化宏 ==========
// #define ORANGE_GPIO_SET(pin, val) { \
// pinMode(pin, OUTPUT); \
// digitalWrite(pin, val); \
// }
#endif // COMMON_H
\ No newline at end of file
#ifndef DRIVERS_COMMON_H
#define DRIVERS_COMMON_H
#include "devcontrol_common.h"
#include "gpio_common.h"
#include "ip_reader.h"
#include "sensors_common.h"
#endif
\ No newline at end of file
#ifndef MODULES_COMMON_H
#define MODULES_COMMON_H
#include "browser_open.h"
#include "http_common.h"
#include "delay.h"
#include "mylog.h"
#include "mqtt_common.h"
#include "pthrpoll.h"
#endif
\ No newline at end of file
#include "common.h"
#include "browser_open.h"
#include "device_fileopen.h"
#include "device_identity.h"
......
#ifndef BROWSER_OPEN_H__
#define BROWSER_OPEN_H__
#include "common.h"
/*打开摄像头函数,当MODE为1时候为国内网址,2时为泰国网址,3为中东网址*/
#define BROWSER_MODE 1
......
#ifndef HTTP_COMMON_H
#define HTTP_COMMON_H
#include "http_config_mqtt.h"
#include "http_consolepush.h"
#include "http_request.h"
#endif
\ No newline at end of file
#include "device_identity.h"
#include "http_config_mqtt.h"
#include<curl/curl.h>
#include "cjson/cJSON.h"
#include "common.h"
#include "http_request.h"
DEVICE_MQTTCONFIG g_mqtt_cam_config;
......@@ -62,7 +61,7 @@ int parse_device_config(const char *json_str) {
}
// 4. 提取 api 和 videoUrl
cJSON *videoUrl = cJSON_GetObjectItemCaseSensitive(data, "videoUrl");
cJSON *videoUrl = cJSON_GetObjectItemCaseSensitive(data, "videoUrlForDevice");
if (cJSON_IsString(videoUrl)) {
my_zlog_debug("视频地址: %s", videoUrl->valuestring);
......
#include "http_consolepush.h"
#include "common.h"
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
#include <sys/select.h>
#include <pty.h> // for openpty
#include <termios.h>
#include <sys/types.h>
#include <signal.h>
#include "pthrpoll.h"
void contril_pthread_open();
......
#include<curl/curl.h>
#include "cjson/cJSON.h"
#include "http_request.h"
#include "device_identity.h"
#include "common.h"
......
#ifndef _MQTT_COMMON_H
#define _MQTT_COMMON_H
#include "mqtt_infor_handle.h"
#include "mqtt_verify.h"
#include "mqtt_init.h"
#endif
\ No newline at end of file
#include "mqtt_infor_handle.h"
#include "mqtt_init.h"
#include "ip_reader.h"
#include "mqtt_verify.h"
#include "device_fileopen.h"
#include "device_identity.h"
#include "browser_open.h"
#include "temperature.h"
#include "mqtt_common.h"
#include "common.h"
#include "INA226.h"
#include "ads1115.h"
#include "warn.h"
#include "device_fileopen.h"
#include "devcontrol_common.h"
#include "device_wifi_change.h"
#include "device_wifi_manager.h"
#include "device_id_change.h"
#include "audioplay.h"
#include "gpio_init.h"
#include "device_init.h"
#include "tank_angle.h"
#include "gpio_control.h"
#include "http_consolepush.h"
#include "http_config_mqtt.h"
#include "app_device_common.h"
#include "drivers_common.h"
#include "modules_common.h"
int g_heartbeat_count=0;
......@@ -83,14 +65,12 @@ void heartbeat_send() {
mosquitto_publish(g_clients_t[i].mosq, NULL, mqtt_topic_pure_number(), strlen(payload), payload, 0, false);
}
cJSON_Delete(root); // 释放 cJSON 对象
}
//角度发送
void angle_mqtt_send() {
static int angle_i=0;
static double record_angle_t=0;
cJSON *root = cJSON_CreateObject();
char TOPIC_send_angle[26];
double rounded_angle = tank_angle();
......@@ -103,15 +83,12 @@ void angle_mqtt_send() {
my_zlog_debug("%s",payload);
angle_i=0;
}
if(fabs(rounded_angle - record_angle_t) >3){
for(int i=0;i<g_mqtt_cam_config_t->mqtt_count;i++){
mosquitto_publish(g_clients_t[i].mosq, NULL, TOPIC_send_angle, strlen(payload), payload, 0, false);
}
}
record_angle_t=rounded_angle;
cJSON_Delete(root); // 释放 cJSON 对象
}
//心跳格式,每5s一次心跳
......@@ -131,7 +108,6 @@ void mqtt_beat_wirte(){
default:
break;
}
}
//message_type为3,控制pwm
......
......@@ -3,7 +3,6 @@
#include "mqtt_init.h"
#include "device_identity.h"
#include "mqtt_infor_handle.h"
#include "mylog.h"
#include "http_config_mqtt.h"
mqttclient g_clients_t[MAX_SERVERS];
......@@ -11,26 +10,59 @@ mqttclient g_clients_t[MAX_SERVERS];
char g_uuid_mqtt_topic_id[MAX_SERVERS][56];
// struct mosquitto *mosq;
int add_mqtt_create(const char *host, int port, int clients_count);
// 新增:用于向其他MQTT服务器发送通知的函数
void send_server_failure_notification(const char *failed_host, int failed_port, const char *failed_client_id);
int add_mqtt_create(const char *host, int port, int clients_count);
//mqtt初始化
int mqtt_init() {
mosquitto_lib_init();
// 这里创建mosq可以保留,也可以删除这一行,让创建放到 Mqtt_onnect 里
for(int i=0;i<g_mqtt_cam_config_t->mqtt_count;i++){
int erg= add_mqtt_create(g_mqtt_cam_config_t->mqtt_servers[i],BROKER_PORT,i);
if(erg !=0){
return -1;
int success_count = 0;
int failure_count = 0;
// 在尝试重新初始化之前,重置所有客户端状态。
// 如果 mqtt_init 被多次调用,这一点至关重要。
for (int i = 0; i < MAX_SERVERS; i++) {
if (g_clients_t[i].mosq) { // 如果已经存在,先销毁旧的
mosquitto_disconnect(g_clients_t[i].mosq);
mosquitto_destroy(g_clients_t[i].mosq);
}
g_clients_t[i].mosq = NULL;
memset(g_clients_t[i].host, 0, sizeof(g_clients_t[i].host));
g_clients_t[i].port = 0;
// 清除 client_id 和 topic_id
memset(g_clients_t[i].client_id, 0, sizeof(g_clients_t[i].client_id));
memset(g_uuid_mqtt_topic_id[i], 0, sizeof(g_uuid_mqtt_topic_id[i]));
}
if (!g_clients_t) {
my_zlog_error("Failed to create Mosquitto client");
return -1;
// 假设 g_mqtt_cam_config_t 已经有效
if (!g_mqtt_cam_config_t) {
my_zlog_fatal("mqtt_init: g_mqtt_cam_config_t is NULL. Cannot initialize.");
return -1; // 表示严重错误
}
return 0;
for (int i = 0; i < g_mqtt_cam_config_t->mqtt_count; i++) {
// 正确地将索引 'i' 传递给 add_mqtt_create
int erg = add_mqtt_create(g_mqtt_cam_config_t->mqtt_servers[i], BROKER_PORT, i);
if (erg != 0) {
failure_count++;
my_zlog_error("未能为 %s:%d 添加 MQTT 客户端", g_mqtt_cam_config_t->mqtt_servers[i], BROKER_PORT);
} else {
success_count++;
}
}
// 返回失败的数量,或 0 表示完全成功
if (failure_count > 0) {
my_zlog_error("%d 个 MQTT 客户端初始化失败。", failure_count);
return failure_count; // 指示有多少失败
}
my_zlog_info("所有 %d 个 MQTT 客户端初始化成功。", success_count);
return 0; // 成功
}
//回调函数
......@@ -38,6 +70,11 @@ void on_connect(struct mosquitto *mosq, void *obj, int rc) {
mqttclient *client_t = (mqttclient*)obj;
if (!client_t) { // 安全检查
my_zlog_error("on_connect: client_t 为 NULL");
return;
}
if (rc == 0) {
my_zlog_info("Connected to broker");
my_zlog_info("[Connected] %s:%d (%s)",
......@@ -46,9 +83,29 @@ void on_connect(struct mosquitto *mosq, void *obj, int rc) {
mosquitto_subscribe(mosq, NULL, mqtt_topic_app2dev_number(), 0);
mosquitto_subscribe(mosq, NULL, mqtt_topic_ser2dev_number(), 0);
} else {
fprintf(stderr, "Connection failed with code %d\n", rc);
my_zlog_fatal("[Connection Failed] %s:%d rc=%d",
client_t->host, client_t->port, rc);
client_t->reconnect_attempts++;
my_zlog_warn("客户端 %s:%d 连接失败。重试次数: %d/%d",
client_t->host, client_t->port, client_t->reconnect_attempts, MAX_RECONNECT_ATTEMPTS);
if (client_t->reconnect_attempts >= MAX_RECONNECT_ATTEMPTS) {
my_zlog_error("客户端 %s:%d 达到最大重试次数,不再尝试连接。标记为永久失败。",
client_t->host, client_t->port);
client_t->permanently_failed = true;
// 停止 mosquitto 循环并销毁客户端
// 注意:在回调中直接销毁当前正在操作的 mosq 可能会导致问题,
// 更好的做法是设置一个标志,让外部循环(如果存在)或专门的清理线程来处理。
// 但因为 mosquitto_loop_start 是独立的线程,这里尝试停止并销毁它。
// mosquitto_loop_stop 可能需要等待线程结束,这里是同步调用
mosquitto_loop_stop(mosq, false); // false 表示不强制,等待线程自行结束
mosquitto_disconnect(mosq);
mosquitto_destroy(mosq);
client_t->mosq = NULL; // 清除指针
// 向其他活跃的 MQTT 服务器发送通知
send_server_failure_notification(client_t->host, client_t->port, client_t->client_id);
}
}
}
......@@ -58,6 +115,11 @@ void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_messag
mqttclient *client_t = (mqttclient*)obj;
if (!client_t || !message || !message->topic) { // 安全检查
my_zlog_error("on_message: 无效参数 (client_t, message 或 topic 为 NULL)");
return;
}
my_zlog_info("[Message from %s:%d] topic=%s payload=%s",
client_t->host, client_t->port,
message->topic, (char*)message->payload);
......@@ -84,53 +146,87 @@ void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_messag
}
}
int add_mqtt_create(const char *host, int port, int clients_count){
static int uuid_index =0;
// 添加 MQTT 客户端并尝试连接
// clients_idx 是 g_clients_t 和 g_uuid_mqtt_topic_id 数组的索引
int add_mqtt_create(const char *host, int port, int clients_idx) {
// 移除了 'static int uuid_index = 0;'
// 不再需要独立的 uuid_index,直接使用传入的 clients_idx 作为索引
if (clients_idx >= MAX_SERVERS) {
my_zlog_error("达到最大服务器限制,索引 %d。", clients_idx);
return -1;
}
if(clients_count >= MAX_SERVERS) {
my_zlog_error("Max server limit reached.");
// 检查主机名是否为空或无效
if (!host || strlen(host) == 0) {
my_zlog_error("主机名为空,索引 %d。", clients_idx);
return -1;
}
mqttclient *client_t = &g_clients_t[clients_count];
strncpy(client_t->host, host, sizeof(client_t->host)-1);
mqttclient *client_t = &g_clients_t[clients_idx];
// 复制主机名,确保字符串末尾空终止
strncpy(client_t->host, host, sizeof(client_t->host) - 1);
client_t->host[sizeof(client_t->host) - 1] = '\0';
client_t->port = port;
// 生成 UUID 作为客户端 ID
uuid_t uuid;
uuid_generate_time(uuid); // 获取UUIDv1
uuid_generate_time(uuid); // 获取 UUIDv1
// 组合 UUID 的一部分作为短 ID,并存储在对应 clients_idx 的位置
// 这将作为 mosquitto 客户端的 ID
unsigned short time_part = (uuid[0] << 8) | uuid[1];
unsigned short mac_part = (uuid[10] << 8) | uuid[11];
snprintf(g_uuid_mqtt_topic_id[clients_idx], sizeof(g_uuid_mqtt_topic_id[clients_idx]), "%04x%04x", time_part, mac_part);
g_uuid_mqtt_topic_id[clients_idx][sizeof(g_uuid_mqtt_topic_id[clients_idx])-1] = '\0'; // 确保空终止
// 组合成32位值(8字符十六进制)
snprintf(g_uuid_mqtt_topic_id[uuid_index], 9, "%04x%04x", time_part, mac_part);
client_t->mosq = mosquitto_new(g_uuid_mqtt_topic_id[uuid_index], true, client_t);
// 将生成的 Client ID 也存储到 client_t 结构体中
strncpy(client_t->client_id, g_uuid_mqtt_topic_id[clients_idx], sizeof(client_t->client_id) -1);
client_t->client_id[sizeof(client_t->client_id) -1] = '\0'; // 确保空终止
uuid_index++;
// 创建 mosquitto 客户端实例
client_t->mosq = mosquitto_new(g_uuid_mqtt_topic_id[clients_idx], true, client_t);
if(!client_t->mosq) {
my_zlog_error("mosquitto_new failed");
if (!client_t->mosq) {
my_zlog_error("mosquitto_new 失败,客户端索引 %d", clients_idx);
return -1;
}
// 设置 MQTT 协议版本
mosquitto_int_option(client_t->mosq, MOSQ_OPT_PROTOCOL_VERSION, MQTT_PROTOCOL_V5);
// 设置重连延迟策略
mosquitto_reconnect_delay_set(client_t->mosq, 2, 10, true);
// 设置回调函数
mosquitto_connect_callback_set(client_t->mosq, on_connect);
mosquitto_message_callback_set(client_t->mosq, on_message);
// 设置用户名和密码
mosquitto_username_pw_set(client_t->mosq, USERNAME, PASSWORD);
// 尝试异步连接到 MQTT Broker
int rc = mosquitto_connect_async(client_t->mosq, host, port, 60);
if( rc != MOSQ_ERR_SUCCESS) {
my_zlog_error("mosquitto_connect failed");
my_zlog_warn("Failed to connect to broker: %s", mosquitto_strerror(rc));
if (rc != MOSQ_ERR_SUCCESS) {
my_zlog_error("mosquitto_connect_async 失败,主机 %s:%d (rc=%d): %s", host, port, rc, mosquitto_strerror(rc));
mosquitto_destroy(client_t->mosq); // 如果连接失败,立即销毁 mosquitto 客户端实例
client_t->mosq = NULL; // 将指针置空,以防止在清理时操作无效指针
return -1;
}
// 启动 mosquitto 循环线程,处理网络 I/O 和回调
int loop_rc = mosquitto_loop_start(client_t->mosq);
if (loop_rc != MOSQ_ERR_SUCCESS) {
my_zlog_error("未能为客户端 %s 启动 mosquitto_loop_start: %s", client_t->client_id, mosquitto_strerror(loop_rc));
// 如果 loop_start 失败,也应断开连接并销毁
mosquitto_disconnect(client_t->mosq);
mosquitto_destroy(client_t->mosq);
client_t->mosq = NULL; // 确保指针置空
return -1;
}
my_zlog_info("add %s mqttserver success",host);
return 0;
my_zlog_info("成功为主机 %s 添加 MQTT 客户端", host);
return 0; // 成功
}
//mqtt清理
......@@ -139,6 +235,7 @@ void mqtt_clean(){
for(int i=0;i<g_mqtt_cam_config_t->mqtt_count;i++){
mosquitto_disconnect(g_clients_t[i].mosq);
mosquitto_destroy(g_clients_t[i].mosq);
g_clients_t[i].mosq = NULL; // 将指针置空,防止悬挂指针
}
mosquitto_lib_cleanup();
......@@ -146,18 +243,101 @@ void mqtt_clean(){
}
int mqtt_cycle() {//非阻塞型
while (1) {
for(int i=0;i<g_mqtt_cam_config_t->mqtt_count;i++){
int rc= mosquitto_loop(g_clients_t[i].mosq, 17, 1); // 每秒检查一次
if(rc != MOSQ_ERR_SUCCESS){
my_zlog_warn("服务器 %s:%d 断开,尝试重连...",
g_clients_t[i].host, g_clients_t[i].port);
mosquitto_reconnect(g_clients_t[i].mosq);
// bool all_connected = true;
// for (int i = 0; i < g_mqtt_cam_config_t->mqtt_count; i++) {
// if (!g_clients_t[i].mosq) continue;
// int state = mosquitto_socket(g_clients_t[i].mosq);
// if (state == -1) {
// all_connected = false;
// my_zlog_warn("检测到 MQTT [%s:%d] 已断开,尝试重连...",
// g_clients_t[i].host, g_clients_t[i].port);
// //mosquitto_reconnect_async(g_clients_t[i].mosq);
// }
// }
delay_ms(1000); // 每 1 秒检测一次连接状态
}
// while (1) {
// for(int i=0;i<g_mqtt_cam_config_t->mqtt_count;i++){
// int rc= mosquitto_loop(g_clients_t[i].mosq, 17, 1); // 每秒检查一次
// if(rc != MOSQ_ERR_SUCCESS){
// my_zlog_warn("服务器 %s:%d 断开,尝试重连...",
// g_clients_t[i].host, g_clients_t[i].port);
// mosquitto_reconnect(g_clients_t[i].mosq);
// }
// }
// delay_us(50);
// }
return 0;
}
// 向其他活跃的 MQTT 服务器发送通知
void send_server_failure_notification(const char *failed_host, int failed_port, const char *failed_client_id) {
my_zlog_info("准备发送服务器故障通知: %s:%d (%s)", failed_host, failed_port, failed_client_id);
cJSON *root = cJSON_CreateObject();
if (!root) {
my_zlog_error("创建 JSON 对象失败。");
return;
}
cJSON_AddStringToObject(root, "event_type", "server_failure");
cJSON_AddStringToObject(root, "failed_host", failed_host);
cJSON_AddNumberToObject(root, "failed_port", failed_port);
cJSON_AddStringToObject(root, "failed_client_id", failed_client_id);
cJSON_AddStringToObject(root, "message", "MQTT server permanently disconnected after multiple retries.");
cJSON_AddNumberToObject(root, "message_type", 9999);
char *json_string = cJSON_PrintUnformatted(root);
if (!json_string) {
my_zlog_error("将 JSON 打印到字符串失败。");
cJSON_Delete(root);
return;
}
for (int i = 0; i < MAX_SERVERS; i++) {
// 遍历所有客户端,如果客户端是活跃的且不是出问题的那个,就发送通知
if (g_clients_t[i].mosq && !g_clients_t[i].permanently_failed) {
// 确保不是给自己发送通知(通常同一个客户端不需要知道自己的失败)
// 尽管理论上失败的客户端的 mosq 已经为 NULL,这里仍然做个双重检查
if (strcmp(g_clients_t[i].host, failed_host) != 0 || g_clients_t[i].port != failed_port) {
for(int i=0;i<g_mqtt_cam_config_t->mqtt_count;i++){
mosquitto_publish(g_clients_t[i].mosq, NULL, mqtt_topic_dev2app_number(), strlen(json_string), json_string, 0, false);
mosquitto_publish(g_clients_t[i].mosq, NULL, mqtt_topic_pure_number(), strlen(json_string), json_string, 0, false);
}
}
}
}
return 0;
free(json_string);
cJSON_Delete(root);
}
void send_fail_mqtt_conect(){
cJSON *root = cJSON_CreateObject();
if (!root) {
my_zlog_error("创建 JSON 对象失败。");
return;
}
cJSON_AddStringToObject(root, "event_type", "server_connect_failure");
cJSON_AddNumberToObject(root, "message_type", 9999);
char *json_string = cJSON_PrintUnformatted(root);
if (!json_string) {
my_zlog_error("将 JSON 打印到字符串失败。");
cJSON_Delete(root);
return;
}
for(int i=0;i<g_mqtt_cam_config_t->mqtt_count;i++){
mosquitto_publish(g_clients_t[i].mosq, NULL, mqtt_topic_dev2app_number(), strlen(json_string), json_string, 0, false);
mosquitto_publish(g_clients_t[i].mosq, NULL, mqtt_topic_pure_number(), strlen(json_string), json_string, 0, false);
}
}
\ No newline at end of file
......@@ -8,11 +8,15 @@
#define MAX_SERVERS 10
#define MAX_RECONNECT_ATTEMPTS 10
typedef struct {
struct mosquitto *mosq;
char host[128];
int port;
char client_id[64];
int reconnect_attempts; // 新增:重连尝试次数
bool permanently_failed; // 新增:标记是否永久失败
} mqttclient;
extern mqttclient g_clients_t[MAX_SERVERS];
......@@ -34,4 +38,7 @@ int mqtt_cycle();//循环
//mqtt清理
void mqtt_clean();
//莫一个服务器连接失败发送
void send_fail_mqtt_conect();
#endif
\ No newline at end of file
#include "pthrpoll.h"
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <errno.h>
#include "common.h"
// 初始化任务队列
static int task_queue_init(TaskQueue *queue) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment