--- a/src/MQTTCommand.cpp Sun May 08 21:42:14 2022 +0200
+++ b/src/MQTTCommand.cpp Sun Jun 05 22:51:45 2022 +0200
@@ -24,6 +24,7 @@
#include <sstream>
#include <iomanip>
#include <random>
+#include <regex>
#include <mosquittopp.h>
@@ -57,16 +58,14 @@
*/
static std::string generateClientID() {
std::stringstream result;
- // result << "relpipe-in-";
std::string symbols("0123456789abcdef");
std::random_device dev;
std::mt19937 rng(dev());
- std::uniform_int_distribution<std::mt19937::result_type> dist6(0, symbols.size());
+ std::uniform_int_distribution<std::mt19937::result_type> dist(0, symbols.size());
- for (int i = 0; i < 8; i++) result << symbols[dist6(rng)];
+ for (int i = 0; i < 8; i++) result << symbols[dist(rng)];
- // std::cerr << "generated clien ID = " << result.str() << std::endl;
return result.str();
}
public:
@@ -91,6 +90,21 @@
};
+void MQTTCommand::parseConnectionString(const std::string& connectionString, std::string& hostname, int& port) {
+ std::regex pattern("mqtt:(//)?([^:]+)(:([0-9]+))?");
+ std::smatch match;
+ if (std::regex_match(connectionString, match, pattern)) {
+ hostname = match[2];
+ port = stoi(match[4]);
+ } else {
+ throw std::invalid_argument("Invalid connection string format. Expecting something like: mqtt://localhost:1883");
+ }
+}
+
+void MQTTCommand::check(std::string operation, int result) {
+ if (result) throw std::logic_error("mosquitto operation failed: " + operation + " = " + std::to_string(result));
+}
+
void MQTTCommand::process(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration) {
std::shared_ptr<MQTTClient> mq = std::make_shared<MQTTClient>(writer, configuration);
@@ -100,17 +114,33 @@
{L"data", TypeId::STRING}
}, true);
- mq->max_inflight_messages_set(1);
- mq->connect("localhost", 1883);
+ std::string connectionString = convertor.to_bytes(configuration.connectionString);
+ std::string username;
+ std::string password;
+ std::string hostname;
+ int port;
+
+ parseConnectionString(connectionString, hostname, port);
+
+ for (auto o : configuration.connectionOptions) {
+ if (o.name == L"username") username = convertor.to_bytes(o.value);
+ else if (o.name == L"password") password = convertor.to_bytes(o.value);
+ else throw std::invalid_argument("Unsupported connection option: " + convertor.to_bytes(o.name));
+ }
+
+ if (username.size()) check("set credentials", mq->username_pw_set(username.c_str(), password.c_str()));
+
+ check("set maximum inflight messages", mq->max_inflight_messages_set(1));
+ check("connect", mq->connect(hostname.c_str(), port));
int mid;
- mq->subscribe(&mid, convertor.to_bytes(configuration.stream).c_str());
+ check("substcribe", mq->subscribe(&mid, convertor.to_bytes(configuration.stream).c_str()));
//for (int i = configuration.messageCount; continueProcessing && i > 0; i--) {
for (int i = configuration.messageCount; continueProcessing && i > 0; i = i - mq->popMessageCount()) {
// std::cerr << "loop(): i=" << i << std::endl;
//mq->loop();
- mq->loop(1000, 1);
+ check("loop", mq->loop(1000, 1));
//mq->loop(1000, -1);
//mq->loop_forever();
//mq->loop_write();