Commit 0f075104 authored by 957dd's avatar 957dd

加入了动态订阅,发布还没有

parent e179dc11
This diff is collapsed.
......@@ -227,6 +227,7 @@ CMakeFiles/main.dir/src/main.cc.o: /home/orangepi/gps/master/src/main.cc \
/usr/include/c++/11/bits/locale_conv.h \
/usr/include/c++/11/bits/quoted_string.h /usr/include/c++/11/sstream \
/usr/include/c++/11/bits/sstream.tcc \
/home/orangepi/gps/master/include/device_tag_bind.hpp \
/home/orangepi/gps/master/include/serial_port.hpp \
/usr/include/c++/11/functional /usr/include/c++/11/bits/std_function.h \
/usr/include/c++/11/unordered_map /usr/include/c++/11/bits/hashtable.h \
......@@ -370,4 +371,9 @@ CMakeFiles/main.dir/src/main.cc.o: /home/orangepi/gps/master/src/main.cc \
/usr/include/MQTTSubscribeOpts.h /usr/include/mqtt/will_options.h \
/usr/include/mqtt/ssl_options.h /usr/include/mqtt/disconnect_options.h \
/usr/include/c++/11/list /usr/include/c++/11/bits/stl_list.h \
/usr/include/c++/11/bits/list.tcc
/usr/include/c++/11/bits/list.tcc \
/home/orangepi/gps/master/include/file_operations.hpp \
/usr/include/c++/11/fstream \
/usr/include/aarch64-linux-gnu/c++/11/bits/basic_file.h \
/usr/include/aarch64-linux-gnu/c++/11/bits/c++io.h \
/usr/include/c++/11/bits/fstream.tcc
No preview for this file type
This source diff could not be displayed because it is too large. You can view the blob instead.
#pragma once
#include <iostream>
#include <string>
#include <vector>
struct deviceTable {
struct Entry {
std::string tag_id;
std::string device_id;
};
std::vector<Entry> table;
// 插入或更新
void set(const std::string& tag_id, const std::string& device_id) {
// 先查找是否已存在相同的 tag_id
for (auto& e : table) {
if (e.tag_id == tag_id) {
e.device_id = device_id; // 覆盖
return;
}
}
// 如果不存在并且没超 20 个,就插入
if (table.size() < 20) {
table.push_back({tag_id, device_id});
} else {
std::cerr << "设备表已满,无法插入更多数据!" << std::endl;
}
}
// 打印全部
void print() const {
std::cout << "设备表:" << std::endl;
for (const auto& e : table) {
std::cout << "Tag ID: " << e.tag_id
<< " -> Device ID: " << e.device_id << std::endl;
}
}
std::string find(std::string find_tag_id) const {
for (const auto& e : table) {
if(find_tag_id == e.device_id){
return e.tag_id;
}
}
return "empty";
}
};
\ No newline at end of file
......@@ -4,6 +4,8 @@
#include <sstream>
#include <string>
#define DEVICE_FILE_PATH "/home/orangepi/car/master/Deviceld.txt"
class FileReader{
private:
std::string _filePath;
......
......@@ -5,9 +5,11 @@
#include <thread>
#include <chrono>
#include <iomanip>
#include "device_tag_bind.hpp"
#include "serial_port.hpp"
#include "tank_data.hpp"
#include "mqtt_used.hpp" // 注意:这里应该是 MqttTopicsHandler.hpp
#include "file_operations.hpp"
// --- 配置信息 ---
const std::string BROKER_ADDRESS = "tcp://119.45.167.177:1883";
......@@ -16,14 +18,18 @@ const std::string USERNAME = "admin";
const std::string PASSWORD = "admin";
// 主题定义
const std::string DYNAMIC_CMD_TOPIC = "ser2dev/CN040300000002"; // 用于接收动态订阅指令的主题
const std::string PUBLISH_TOPIC_PREFIX = "positioning/"; // 发布数据的主题前缀
const std::string g_base_topic_prefix = "ser2dev/"; // 用于接收动态订阅指令的主题
const std::string publish_topic_prefix = "positioning/"; // 发布数据的主题前缀
std::string g_device_id;
// --- 全局变量 ---
// 使用智能指针管理串口和MQTT客户端的生命周期
std::unique_ptr<SerialPort> g_serial_port;
std::shared_ptr<MqttTopicsHandler> g_mqtt_handler; // 改为 shared_ptr 并使用 MqttTopicsHandler 类型
std::shared_ptr<FileReader> g_file_read;
deviceTable g_device_tag_t;
// 串口数据相关的全局变量
std::vector<uint16_t> x_local(21);
std::vector<uint16_t> y_local(21);
......@@ -68,6 +74,14 @@ void onSerialMessageReceived(const std::vector<uint8_t>& message) {
}
}
std::string file_open_read(){
g_file_read->open();
return g_device_id=g_file_read->readAll();
}
// MQTT 初始化和配置函数
// **关键改动**: 此函数现在负责创建、配置并返回一个可用的 MQTT 客户端实例
std::shared_ptr<MqttTopicsHandler> mqtt_init_and_connect() {
......@@ -85,24 +99,39 @@ std::shared_ptr<MqttTopicsHandler> mqtt_init_and_connect() {
std::cout << "MQTT连接成功。" << std::endl;
handler->AddSubscription(DYNAMIC_CMD_TOPIC,
std::string g_base_topic = g_base_topic_prefix + g_device_id;
// 构造发布主题,例如:positioning/CN040300000002
std::string final_publish_topic = publish_topic_prefix + g_device_id;
handler->AddSubscription(g_base_topic,
[weak_handler = std::weak_ptr<MqttTopicsHandler>(handler)](const std::string& topic, const nlohmann::json& json) {
std::cout << "收到来自指令主题 [" << topic << "] 的消息。" << std::endl;
try {
if (json.value("/head/message_type"_json_pointer, 0) != 5000) return;
auto dev_id = json.at("body").at("dev_id").get<std::string>();
auto data_topic = "ser2dev/" + dev_id;
auto data_topic = "app2dev/" + dev_id;
g_device_tag_t.set(dev_id, data_topic);
if (auto shared_handler = weak_handler.lock()) {
std::cout << "指令解析成功,准备动态订阅主题: " << data_topic << std::endl;
// 调用 AddSubscription,它现在会将订阅任务 post 到工作线程,避免死锁
shared_handler->AddSubscription(data_topic,
[](const std::string& sub_topic, const nlohmann::json& sub_json) {
if (sub_json.contains("value")) {
std::cout << "动态订阅消息 -> 来自 [" << sub_topic
<< "] 的传感器数据: "
<< sub_json["value"] << std::endl;
std::cout<<"发送成功"<<std::endl;
if (sub_json.contains("head") &&
sub_json["head"].is_object() &&
sub_json["head"].contains("message_type") &&
sub_json["head"]["message_type"].is_number_integer() &&
sub_json["head"]["message_type"].get<int>() == 3){
std::cout<<"发送成功123"<<std::endl;
std::string device_local_label_id=g_device_tag_t.find(sub_topic);
if(device_local_label_id!="empty"){
std::cout<<"发送成123功"<<std::endl;
// 使用全局的 g_mqtt_handler 来发布
//mqtt_handler->PublishJson(final_publish_topic, root_json);
}
}
});
}
......@@ -114,6 +143,8 @@ std::shared_ptr<MqttTopicsHandler> mqtt_init_and_connect() {
return handler; // 返回成功创建并配置好的客户端实例
}
int main(int argc, char* argv[]) {
// 注册信号处理
signal(SIGINT, signalHandler);
......@@ -123,6 +154,10 @@ int main(int argc, char* argv[]) {
tag_id.push_back(i);
}
g_file_read = std::make_shared<FileReader>(DEVICE_FILE_PATH);
g_device_id = file_open_read();
// --- 1. 初始化 MQTT ---
g_mqtt_handler = mqtt_init_and_connect();
if (!g_mqtt_handler) {
......@@ -168,11 +203,7 @@ int main(int argc, char* argv[]) {
root_json["body"] = tank_data;
root_json["head"] = {{"message_type", 1}};
// 构造发布主题,例如:positioning/CN040300000002
std::string final_publish_topic = PUBLISH_TOPIC_PREFIX + CLIENT_ID;
// 使用全局的 g_mqtt_handler 来发布
g_mqtt_handler->PublishJson(final_publish_topic, root_json);
}
// 每秒发布一次所有标签的数据
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment