/**
 * Builds a random identifier so that several clients can be connected to the broker at the same time.
 *
 * @return random client ID for MQTT: eight lowercase hexadecimal digits
 */
static std::string generateClientID() {
	const std::string symbols("0123456789abcdef");
	const int length = 8;

	std::random_device dev;
	std::mt19937 rng(dev());
	// Both bounds of uniform_int_distribution are inclusive, so the upper bound has to be the last valid index.
	// With symbols.size() as the bound, roughly one draw in 17 picked the terminating '\0' instead of a hex digit,
	// which would cut the ID short wherever it is later used as a C string.
	std::uniform_int_distribution<std::size_t> dist(0, symbols.size() - 1);

	std::string result;
	result.reserve(length);
	for (int i = 0; i < length; i++) result.push_back(symbols[dist(rng)]);

	return result;
}
65 |
64 |
66 struct CurrentRelation { |
65 struct CurrentRelation { |
67 relpipe::common::type::StringX name; |
66 relpipe::common::type::StringX name; |
68 std::vector<relpipe::reader::handlers::AttributeMetadata> attributes; |
67 std::vector<relpipe::reader::handlers::AttributeMetadata> attributes; |
69 relpipe::common::type::Integer attributeIndex = 0; |
68 relpipe::common::type::Integer attributeIndex = 0; |
70 std::string currentValue; |
69 std::string currentValue; |
71 } currentRelation; |
70 } currentRelation; |
72 |
71 |
|
/**
 * Splits an MQTT connection string into the host name and the TCP port.
 *
 * Accepted forms: mqtt://host:port, mqtt://host, mqtt:host:port and mqtt:host
 *
 * @param connectionString text to be parsed, e.g. mqtt://localhost:1883
 * @param hostname output: the host part of the connection string
 * @param port output: the port part, or 1883 when the connection string contains no port
 * @throws std::invalid_argument if the text does not match the expected format or the port is outside 1-65535;
 *         neither output parameter is modified in that case
 */
static void parseConnectionString(const std::string& connectionString, std::string& hostname, int& port) {
	const std::regex pattern("mqtt:(//)?([^:]+)(:([0-9]+))?");
	std::smatch match;

	if (!std::regex_match(connectionString, match, pattern)) {
		throw std::invalid_argument("Invalid connection string format. Expecting something like: mqtt://localhost:1883");
	}

	int parsedPort = 1883;

	// The port group is optional in the pattern. Converting the unmatched (empty) group
	// with stoi() would throw, so the conversion is done only if the group participated in the match.
	if (match[4].matched) {
		const std::string digits = match[4].str();
		// At most five digits: keeps stoi() far away from integer overflow.
		if (digits.size() > 5) throw std::invalid_argument("Invalid port in connection string: " + digits);
		parsedPort = std::stoi(digits);
		if (parsedPort < 1 || parsedPort > 65535) throw std::invalid_argument("Invalid port in connection string: " + digits);
	}

	hostname = match[2].str();
	port = parsedPort;
}
|
82 |
|
/**
 * Converts a non-zero result code of a mosquitto call into an exception.
 *
 * @param operation label of the attempted operation; becomes part of the error message
 * @param result value returned by the call; zero means that nothing is thrown
 * @throws std::logic_error if the result is non-zero
 */
static void check(std::string operation, int result) {
	if (result == 0) return;

	std::string message("mosquitto operation failed: ");
	message += operation;
	message += " = ";
	message += std::to_string(result);
	throw std::logic_error(message);
}
|
86 |
73 public: |
87 public: |
74 |
88 |
75 MQTTHandler(Configuration configuration) : configuration(configuration) { |
89 static MQTTHandler* create(Configuration configuration) { |
76 mq->connect("localhost", 1883); |
90 MQTTHandler* h = new MQTTHandler(); |
|
91 |
|
92 std::string connectionString = h->convertor.to_bytes(configuration.connectionString); |
|
93 std::string username; |
|
94 std::string password; |
|
95 std::string hostname; |
|
96 int port; |
|
97 |
|
98 parseConnectionString(connectionString, hostname, port); |
|
99 |
|
100 for (auto o : configuration.connectionOptions) { |
|
101 if (o.name == L"username") username = h->convertor.to_bytes(o.value); |
|
102 else if (o.name == L"password") password = h->convertor.to_bytes(o.value); |
|
103 else throw std::invalid_argument("Unsupported connection option: " + h->convertor.to_bytes(o.name)); |
|
104 } |
|
105 |
|
106 if (username.size()) check("set credentials", h->mq->username_pw_set(username.c_str(), password.c_str())); |
|
107 |
|
108 check("connect", h->mq->connect(hostname.c_str(), port)); |
|
109 |
|
110 return h; |
77 } |
111 } |
78 |
112 |
79 void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override { |
113 void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override { |
80 // TODO: check relation name according to the configuration |
114 // TODO: check relation name according to the configuration |
81 currentRelation = CurrentRelation{name, attributes}; |
115 currentRelation = CurrentRelation{name, attributes}; |
91 |
125 |
92 currentRelation.attributeIndex++; |
126 currentRelation.attributeIndex++; |
93 if (currentRelation.attributeIndex == currentRelation.attributes.size()) { |
127 if (currentRelation.attributeIndex == currentRelation.attributes.size()) { |
94 currentRelation.attributeIndex = 0; |
128 currentRelation.attributeIndex = 0; |
95 int mid = -1; |
129 int mid = -1; |
96 mq->publish(&mid, convertor.to_bytes(configuration.stream).c_str(), currentRelation.currentValue.size(), currentRelation.currentValue.c_str()); |
130 check("publish", mq->publish(&mid, convertor.to_bytes(configuration.stream).c_str(), currentRelation.currentValue.size(), currentRelation.currentValue.c_str())); |
97 // std::cerr << "MQTT message enqueued: " << mid << std::endl; |
131 // std::cerr << "MQTT message enqueued: " << mid << std::endl; |
98 } |
132 } |
99 |
133 |
100 } |
134 } |
101 |
135 |
102 void endOfPipe() { |
136 void endOfPipe() { |
103 mq->disconnect(); |
137 check("disconnect", mq->disconnect()); |
104 } |
138 } |
105 |
139 |
106 }; |
140 }; |
107 |
141 |
108 } |
142 } |