Commit f49f554e authored by 957dd's avatar 957dd

Merge branch 'feature/add_mqtt_10s_send_1s' into 'master'

加入了开启发送自控,还需优化 See merge request !1
parents bdf222a9 8d9070e2
log/log.log log/
build/CMakeFiles/ build/CMakeFiles/
build/cmake_install.cmake build/cmake_install.cmake
build/CMakeCache.txt build/CMakeCache.txt
......
No preview for this file type
...@@ -37,6 +37,12 @@ public: ...@@ -37,6 +37,12 @@ public:
// 显示所有设备信息 (const 关键字表示该函数不会修改类成员变量) // 显示所有设备信息 (const 关键字表示该函数不会修改类成员变量)
void displayAllDevices() const; void displayAllDevices() const;
const DeviceNode* getHead() const {
return head;
}
// 根据设备ID查找并显示设备信息 (const) // 根据设备ID查找并显示设备信息 (const)
bool findDevice(const std::string& id) const; bool findDevice(const std::string& id) const;
}; };
\ No newline at end of file
...@@ -140,7 +140,7 @@ bool DeviceList::findDevice(const std::string& id) const { ...@@ -140,7 +140,7 @@ bool DeviceList::findDevice(const std::string& id) const {
DeviceNode* current = head; DeviceNode* current = head;
while (current != nullptr) { while (current != nullptr) {
if (current->deviceId == id) { if (current->deviceId == id) {
std::string header = "\n>> 找到设备 " + id + " 的信息:"; std::string header = ">> 找到设备 " + id + " 的信息:";
mylog_info(header.c_str()); mylog_info(header.c_str());
std::stringstream ss; std::stringstream ss;
......
...@@ -7,9 +7,12 @@ ...@@ -7,9 +7,12 @@
#include <iomanip> #include <iomanip>
#include <string> #include <string>
#include <cstdlib> #include <cstdlib>
#include "device_linked_list.hpp"// devicelist.cc
#include "networkInfo.hpp" #include "networkInfo.hpp"
#include "mqtt_used.hpp" // 注意:这里应该是 MqttTopicsHandler.hpp #include "mqtt_used.hpp" // 注意:这里应该是 MqttTopicsHandler.hpp
#include "mylog.hpp" #include "mylog.hpp"
#include "nlohmann/json.hpp"
#include "taskrunner.hpp"
// --- 配置信息 --- // --- 配置信息 ---
const std::string BROKER_ADDRESS = "tcp://119.45.167.177:1883"; const std::string BROKER_ADDRESS = "tcp://119.45.167.177:1883";
...@@ -25,8 +28,11 @@ std::string final_publish_topic; ...@@ -25,8 +28,11 @@ std::string final_publish_topic;
std::string g_device_id; std::string g_device_id;
std::shared_ptr<MqttTopicsHandler> g_mqtt_handler; // 改为 shared_ptr 并使用 MqttTopicsHandler 类型 std::shared_ptr<MqttTopicsHandler> g_mqtt_handler; // 改为 shared_ptr 并使用 MqttTopicsHandler 类型
NetworkInfo g_network_addr;//网络ip和mac地址获取的
DeviceList deviceTree;//设备排列和查找的
TaskRunner runner;
NetworkInfo g_network_addr; auto g_mqtt_send_count=0;//发送时间计数,100毫秒计数一次
// 信号处理函数,用于优雅地退出 // 信号处理函数,用于优雅地退出
void signalHandler(int signum) { void signalHandler(int signum) {
...@@ -77,16 +83,25 @@ std::shared_ptr<MqttTopicsHandler> mqtt_init_and_connect() { ...@@ -77,16 +83,25 @@ std::shared_ptr<MqttTopicsHandler> mqtt_init_and_connect() {
auto run_t= json.at("body").at("run_t").get<std::string>(); auto run_t= json.at("body").at("run_t").get<std::string>();
auto device_status = json.at("body").at("status").get<int>(); auto device_status = json.at("body").at("status").get<int>();
if(run_t.empty()||interval_t.empty()) return ;
auto msg_device= "app2dev/" + dev_id; auto msg_device= "app2dev/" + dev_id;
mylog_info("dev_id:{}",msg_device); mylog_info("dev_id:{}",msg_device);
mylog_info("interval_t:{}",interval_t); mylog_info("interval_t:{}",interval_t);
mylog_info("run_t:{}",run_t); mylog_info("run_t:{}",run_t);
mylog_info("device_status:{}",device_status); mylog_info("device_status:{}",device_status);
//g_mqtt_handler->publish( data_topic[topic_index], transfor_json(g_positioning_data_t)); //g_mqtt_handler->publish( data_topic[topic_index], transfor_json(g_positioning_data_t));
if(device_status==1){
} int interval_t_num = std::stoi(interval_t);
int run_t_num = std::stoi(run_t);
catch (const nlohmann::json::exception& e) { deviceTree.addDevice(msg_device, interval_t_num, run_t_num);
}else if(device_status==0){
deviceTree.findDevice(msg_device);
deviceTree.deleteDevice(msg_device);
}
}catch (const nlohmann::json::exception& e) {
mylog_error("处理指令时JSON解析出错:",e.what()); mylog_error("处理指令时JSON解析出错:",e.what());
} }
}); });
...@@ -95,6 +110,64 @@ std::shared_ptr<MqttTopicsHandler> mqtt_init_and_connect() { ...@@ -95,6 +110,64 @@ std::shared_ptr<MqttTopicsHandler> mqtt_init_and_connect() {
return handler; // 返回成功创建并配置好的客户端实例 return handler; // 返回成功创建并配置好的客户端实例
} }
auto json_send_mqtt(int message_type,int pin,int pwm_mode,int val){
// 构造 JSON 对象
nlohmann::json j;
j["head"]["message_type"] = message_type;
j["body"]["pin_setctrl"]["pin"] = pin;
j["body"]["pin_setctrl"]["val"] = val;
j["body"]["pwm_ctrl"]["mode"] = pwm_mode;
j["body"]["pwm_ctrl"]["type"] = 1;
j["body"]["pwm_ctrl"]["val"] = val;
// 转换成字符串(默认紧凑格式,适合网络传输)
std::string payload = j.dump();
return payload;
}
void longRunningTask1() {
mylog_info("[Task 1] Starting a long task...");
// ... 在这里填充 myList 的数据 ...
// 例如: myList.addDevice(...);
const DeviceNode* traverser = deviceTree.getHead();
while (!g_quit) {
if (g_mqtt_send_count > 100) {
//mylog_info("Starting to send MQTT messages for all devices...");
// 每次需要遍历时,都从 myList 获取头节点
traverser = deviceTree.getHead();
while (traverser != nullptr) {
// 在循环中,你可以访问当前节点的任何数据
std::string currentId = traverser->deviceId;
g_mqtt_handler->publish(currentId, json_send_mqtt(3, 26, 1, 80));
mylog_info("已发送设备号:{}", currentId);
// 移动到下一个节点
traverser = traverser->next;
//std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
//mylog_info("Finished sending MQTT messages for all devices.");
// 重要的逻辑:重置计数器,否则这个if会一直满足,导致CPU空转和消息风暴
if(g_mqtt_send_count > 102) g_mqtt_send_count = 0; // 或者设置为一个不会立即触发下一次发送的值
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
g_mqtt_send_count++;
// if(g_mqtt_send_count > 110) g_mqtt_send_count = 0; // 这行逻辑可以和上面的重置合并
}
}
int main() { int main() {
signal(SIGINT, signalHandler); signal(SIGINT, signalHandler);
...@@ -103,14 +176,17 @@ int main() { ...@@ -103,14 +176,17 @@ int main() {
g_client_id = g_network_addr.getip(); g_client_id = g_network_addr.getip();
g_mqtt_handler = mqtt_init_and_connect(); g_mqtt_handler = mqtt_init_and_connect();
if (!g_mqtt_handler) { if (!g_mqtt_handler) {
return 1; // 如果 MQTT 连接失败,则退出 return 1; // 如果 MQTT 连接失败,则退出
} }
runner.addTask(longRunningTask1);
while(!g_quit){ while(!g_quit){
std::this_thread::sleep_for(std::chrono::milliseconds(300)); std::this_thread::sleep_for(std::chrono::milliseconds(300));
} }
return 0; return 0;
} }
...@@ -160,7 +160,7 @@ void MqttController::publish(const std::string& topic, const std::string& payloa ...@@ -160,7 +160,7 @@ void MqttController::publish(const std::string& topic, const std::string& payloa
// --- 内部回调处理类的实现 --- // --- 内部回调处理类的实现 ---
void MqttController::CallbackHandler::connected(const std::string& /*cause*/) { void MqttController::CallbackHandler::connected(const std::string& /*cause*/) {
std::cout << "连接成功!" << std::endl; mylog_info("连接成功!");
controller_.is_connected_ = true; controller_.is_connected_ = true;
if (controller_.on_connect_success_cb_) { if (controller_.on_connect_success_cb_) {
controller_.on_connect_success_cb_(); controller_.on_connect_success_cb_();
......
...@@ -70,7 +70,7 @@ void MqttTopicsHandler::AddSubscription(const std::string& topic, JsonCallback c ...@@ -70,7 +70,7 @@ void MqttTopicsHandler::AddSubscription(const std::string& topic, JsonCallback c
// 将真正的 subscribe 调用封装成一个任务,交给工作线程执行 // 将真正的 subscribe 调用封装成一个任务,交给工作线程执行
post_task([this, topic, wrapped_callback]() { post_task([this, topic, wrapped_callback]() {
std::cout << "工作线程:正在执行订阅主题: " << topic << std::endl; mylog_info("工作线程:正在执行订阅主题: {}", topic);
// 假设 QoS 为 1 // 假设 QoS 为 1
subscribe(topic, 1, wrapped_callback); subscribe(topic, 1, wrapped_callback);
}); });
...@@ -86,10 +86,10 @@ void MqttTopicsHandler::RemoveSubscription(const std::string& topic) { ...@@ -86,10 +86,10 @@ void MqttTopicsHandler::RemoveSubscription(const std::string& topic) {
if (active_subscriptions_.erase(topic) > 0) { if (active_subscriptions_.erase(topic) > 0) {
post_task([this, topic]() { post_task([this, topic]() {
unsubscribe(topic); unsubscribe(topic);
std::cout << "工作线程:已取消订阅主题: " << topic << std::endl; mylog_info("工作线程:已取消订阅主题: {}",topic);
}); });
} else { } else {
std::cerr << "无法移除订阅:主题 \"" << topic << "\" 未被订阅。" << std::endl; mylog_warn("无法移除订阅:主题 {} 未被订阅。" ,topic);
} }
} }
...@@ -97,9 +97,9 @@ void MqttTopicsHandler::RemoveSubscription(const std::string& topic) { ...@@ -97,9 +97,9 @@ void MqttTopicsHandler::RemoveSubscription(const std::string& topic) {
void MqttTopicsHandler::InitBaseSubscription() { void MqttTopicsHandler::InitBaseSubscription() {
AddSubscription("ser/dev", [](const nlohmann::json& json) { AddSubscription("ser/dev", [](const nlohmann::json& json) {
try { try {
std::cout << "[收到系统状态] " << json.dump(2) << std::endl; mylog_info( "[收到系统状态] {}" ,json.dump(2) );
} catch (const std::exception& e) { } catch (const std::exception& e) {
std::cerr << "处理系统状态时出错: " << e.what() << std::endl; mylog_warn( "处理系统状态时出错: {}" , e.what() );
} }
}); });
} }
...@@ -112,7 +112,7 @@ void MqttTopicsHandler::PublishJson( ...@@ -112,7 +112,7 @@ void MqttTopicsHandler::PublishJson(
try { try {
publish(topic, json_payload.dump(), qos, retained); publish(topic, json_payload.dump(), qos, retained);
} catch (const std::exception& e) { } catch (const std::exception& e) {
std::cerr << "向主题 \"" << topic << "\" 发布 JSON 失败: " << e.what() << std::endl; mylog_warn( "向主题 {} 发布 JSON 失败: {}" ,topic,e.what() );
} }
} }
void MqttTopicsHandler::AddSubscription(const std::string& topic, SimpleJsonCallback callback) { void MqttTopicsHandler::AddSubscription(const std::string& topic, SimpleJsonCallback callback) {
......
...@@ -32,7 +32,7 @@ void TaskRunner::addTask(const std::function<void()>& task) { ...@@ -32,7 +32,7 @@ void TaskRunner::addTask(const std::function<void()>& task) {
void TaskRunner::run() { void TaskRunner::run() {
while (true) { while (true) {
std::function<void()> task; std::function<void()> current_task; // 使用不会引起宏冲突的变量名
{ {
std::unique_lock<std::mutex> lock(mtx); std::unique_lock<std::mutex> lock(mtx);
...@@ -42,11 +42,17 @@ void TaskRunner::run() { ...@@ -42,11 +42,17 @@ void TaskRunner::run() {
break; // 退出线程 break; // 退出线程
} }
task = tasks.front(); // 从队列中取出任务
current_task = std::move(tasks.front());
tasks.pop(); tasks.pop();
} }
// 执行任务 // --- 这是关键的修改部分 ---
task(); // 使用 if 语句来检查任务是否有效,并在有效时执行它。
// 这一个代码块就替代了原来简单的 "task();"
if (current_task) {
current_task(); // 在 if 内部调用,确保 current_task 不是空的
}
// -------------------------
} }
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment