/**
 * Builds a client ID consisting of eight random lowercase hexadecimal digits.
 * The random generator is seeded from std::random_device on every call.
 *
 * @return unique (random) client ID for MQTT to allow multiple simultaneous connections
 */
static std::string generateClientID() {
	const std::string symbols("0123456789abcdef");

	std::random_device dev;
	std::mt19937 rng(dev());
	// Both bounds of uniform_int_distribution are inclusive, so the upper bound has to be the last valid index.
	// Using symbols.size() here would occasionally pick the terminating '\0' and cut the ID short in c_str().
	std::uniform_int_distribution<std::mt19937::result_type> dist(0, symbols.size() - 1);

	std::string result;
	result.reserve(8);
	for (int i = 0; i < 8; i++) result += symbols[dist(rng)];

	return result;
}
72 public: |
71 public: |
73 |
72 |
74 MQTTClient(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration) : mosqpp::mosquittopp((generateClientID()).c_str()), writer(writer), configuration(configuration) { |
73 MQTTClient(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration) : mosqpp::mosquittopp((generateClientID()).c_str()), writer(writer), configuration(configuration) { |
89 return count; |
88 return count; |
90 } |
89 } |
91 |
90 |
92 }; |
91 }; |
93 |
92 |
|
93 void MQTTCommand::parseConnectionString(const std::string& connectionString, std::string& hostname, int& port) { |
|
94 std::regex pattern("mqtt:(//)?([^:]+)(:([0-9]+))?"); |
|
95 std::smatch match; |
|
96 if (std::regex_match(connectionString, match, pattern)) { |
|
97 hostname = match[2]; |
|
98 port = stoi(match[4]); |
|
99 } else { |
|
100 throw std::invalid_argument("Invalid connection string format. Expecting something like: mqtt://localhost:1883"); |
|
101 } |
|
102 } |
|
103 |
|
104 void MQTTCommand::check(std::string operation, int result) { |
|
105 if (result) throw std::logic_error("mosquitto operation failed: " + operation + " = " + std::to_string(result)); |
|
106 } |
|
107 |
94 void MQTTCommand::process(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration) { |
108 void MQTTCommand::process(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration) { |
95 std::shared_ptr<MQTTClient> mq = std::make_shared<MQTTClient>(writer, configuration); |
109 std::shared_ptr<MQTTClient> mq = std::make_shared<MQTTClient>(writer, configuration); |
96 |
110 |
97 writer->startRelation(configuration.relation,{ |
111 writer->startRelation(configuration.relation,{ |
98 {L"stream", TypeId::STRING}, |
112 {L"stream", TypeId::STRING}, |
99 {L"text", TypeId::STRING}, |
113 {L"text", TypeId::STRING}, |
100 {L"data", TypeId::STRING} |
114 {L"data", TypeId::STRING} |
101 }, true); |
115 }, true); |
102 |
116 |
103 mq->max_inflight_messages_set(1); |
117 std::string connectionString = convertor.to_bytes(configuration.connectionString); |
104 mq->connect("localhost", 1883); |
118 std::string username; |
|
119 std::string password; |
|
120 std::string hostname; |
|
121 int port; |
|
122 |
|
123 parseConnectionString(connectionString, hostname, port); |
|
124 |
|
125 for (auto o : configuration.connectionOptions) { |
|
126 if (o.name == L"username") username = convertor.to_bytes(o.value); |
|
127 else if (o.name == L"password") password = convertor.to_bytes(o.value); |
|
128 else throw std::invalid_argument("Unsupported connection option: " + convertor.to_bytes(o.name)); |
|
129 } |
|
130 |
|
131 if (username.size()) check("set credentials", mq->username_pw_set(username.c_str(), password.c_str())); |
|
132 |
|
133 check("set maximum inflight messages", mq->max_inflight_messages_set(1)); |
|
134 check("connect", mq->connect(hostname.c_str(), port)); |
105 int mid; |
135 int mid; |
106 mq->subscribe(&mid, convertor.to_bytes(configuration.stream).c_str()); |
136 check("substcribe", mq->subscribe(&mid, convertor.to_bytes(configuration.stream).c_str())); |
107 |
137 |
108 //for (int i = configuration.messageCount; continueProcessing && i > 0; i--) { |
138 //for (int i = configuration.messageCount; continueProcessing && i > 0; i--) { |
109 for (int i = configuration.messageCount; continueProcessing && i > 0; i = i - mq->popMessageCount()) { |
139 for (int i = configuration.messageCount; continueProcessing && i > 0; i = i - mq->popMessageCount()) { |
110 // std::cerr << "loop(): i=" << i << std::endl; |
140 // std::cerr << "loop(): i=" << i << std::endl; |
111 |
141 |
112 //mq->loop(); |
142 //mq->loop(); |
113 mq->loop(1000, 1); |
143 check("loop", mq->loop(1000, 1)); |
114 //mq->loop(1000, -1); |
144 //mq->loop(1000, -1); |
115 //mq->loop_forever(); |
145 //mq->loop_forever(); |
116 //mq->loop_write(); |
146 //mq->loop_write(); |
117 } |
147 } |
118 |
148 |