diff -r 0799eaf338b9 -r 610783d70ae9 src/MQTTCommand.cpp --- a/src/MQTTCommand.cpp Sun May 08 21:42:14 2022 +0200 +++ b/src/MQTTCommand.cpp Sun Jun 05 22:51:45 2022 +0200 @@ -24,6 +24,7 @@ #include #include #include +#include #include @@ -57,16 +58,14 @@ */ static std::string generateClientID() { std::stringstream result; - // result << "relpipe-in-"; std::string symbols("0123456789abcdef"); std::random_device dev; std::mt19937 rng(dev()); - std::uniform_int_distribution dist6(0, symbols.size()); + std::uniform_int_distribution dist(0, symbols.size()); - for (int i = 0; i < 8; i++) result << symbols[dist6(rng)]; + for (int i = 0; i < 8; i++) result << symbols[dist(rng)]; - // std::cerr << "generated clien ID = " << result.str() << std::endl; return result.str(); } public: @@ -91,6 +90,21 @@ }; +void MQTTCommand::parseConnectionString(const std::string& connectionString, std::string& hostname, int& port) { + std::regex pattern("mqtt:(//)?([^:]+)(:([0-9]+))?"); + std::smatch match; + if (std::regex_match(connectionString, match, pattern)) { + hostname = match[2]; + port = stoi(match[4]); + } else { + throw std::invalid_argument("Invalid connection string format. Expecting something like: mqtt://localhost:1883"); + } +} + +void MQTTCommand::check(std::string operation, int result) { + if (result) throw std::logic_error("mosquitto operation failed: " + operation + " = " + std::to_string(result)); +} + void MQTTCommand::process(std::shared_ptr writer, Configuration& configuration) { std::shared_ptr mq = std::make_shared(writer, configuration); @@ -100,17 +114,33 @@ {L"data", TypeId::STRING} }, true); - mq->max_inflight_messages_set(1); - mq->connect("localhost", 1883); + std::string connectionString = convertor.to_bytes(configuration.connectionString); + std::string username; + std::string password; + std::string hostname; + int port; + + parseConnectionString(connectionString, hostname, port); + + for (auto o : configuration.connectionOptions) { + if (o.name == L"username") username = convertor.to_bytes(o.value); + else if (o.name == L"password") password = convertor.to_bytes(o.value); + else throw std::invalid_argument("Unsupported connection option: " + convertor.to_bytes(o.name)); + } + + if (username.size()) check("set credentials", mq->username_pw_set(username.c_str(), password.c_str())); + + check("set maximum inflight messages", mq->max_inflight_messages_set(1)); + check("connect", mq->connect(hostname.c_str(), port)); int mid; - mq->subscribe(&mid, convertor.to_bytes(configuration.stream).c_str()); + check("substcribe", mq->subscribe(&mid, convertor.to_bytes(configuration.stream).c_str())); //for (int i = configuration.messageCount; continueProcessing && i > 0; i--) { for (int i = configuration.messageCount; continueProcessing && i > 0; i = i - mq->popMessageCount()) { // std::cerr << "loop(): i=" << i << std::endl; //mq->loop(); - mq->loop(1000, 1); + check("loop", mq->loop(1000, 1)); //mq->loop(1000, -1); //mq->loop_forever(); //mq->loop_write();