diff -r 1a0fbd17db13 -r 62ced7c41c67 src/MQTTHandler.h --- a/src/MQTTHandler.h Sun May 08 21:42:25 2022 +0200 +++ b/src/MQTTHandler.h Sun Jun 05 00:08:52 2022 +0200 @@ -24,6 +24,7 @@ #include #include #include +#include #include @@ -50,16 +51,14 @@ */ static std::string generateClientID() { std::stringstream result; - // result << "relpipe-out-"; std::string symbols("0123456789abcdef"); std::random_device dev; std::mt19937 rng(dev()); - std::uniform_int_distribution dist6(0, symbols.size()); + std::uniform_int_distribution dist(0, symbols.size()); - for (int i = 0; i < 8; i++) result << symbols[dist6(rng)]; + for (int i = 0; i < 8; i++) result << symbols[dist(rng)]; - // std::cerr << "generated clien ID = " << result.str() << std::endl; return result.str(); } @@ -70,10 +69,45 @@ std::string currentValue; } currentRelation; + static void parseConnectionString(const std::string& connectionString, std::string& hostname, int& port) { + std::regex pattern("mqtt:(//)?([^:]+)(:([0-9]+))?"); + std::smatch match; + if (std::regex_match(connectionString, match, pattern)) { + hostname = match[2]; + port = stoi(match[4]); + } else { + throw std::invalid_argument("Invalid connection string format. Expecting something like: mqtt://localhost:1883"); + } + } + + static void check(std::string operation, int result) { + if (result) throw std::logic_error("mosquitto operation failed: " + operation + " = " + std::to_string(result)); + } + public: - MQTTHandler(Configuration configuration) : configuration(configuration) { - mq->connect("localhost", 1883); + static MQTTHandler* create(Configuration configuration) { + MQTTHandler* h = new MQTTHandler(); + + std::string connectionString = h->convertor.to_bytes(configuration.connectionString); + std::string username; + std::string password; + std::string hostname; + int port; + + parseConnectionString(connectionString, hostname, port); + + for (auto o : configuration.connectionOptions) { + if (o.name == L"username") username = h->convertor.to_bytes(o.value); + else if (o.name == L"password") password = h->convertor.to_bytes(o.value); + else throw std::invalid_argument("Unsupported connection option: " + h->convertor.to_bytes(o.name)); + } + + if (username.size()) check("set credentials", h->mq->username_pw_set(username.c_str(), password.c_str())); + + check("connect", h->mq->connect(hostname.c_str(), port)); + + return h; } void startRelation(relpipe::common::type::StringX name, std::vector attributes) override { @@ -93,14 +127,14 @@ if (currentRelation.attributeIndex == currentRelation.attributes.size()) { currentRelation.attributeIndex = 0; int mid = -1; - mq->publish(&mid, convertor.to_bytes(configuration.stream).c_str(), currentRelation.currentValue.size(), currentRelation.currentValue.c_str()); + check("publish", mq->publish(&mid, convertor.to_bytes(configuration.stream).c_str(), currentRelation.currentValue.size(), currentRelation.currentValue.c_str())); // std::cerr << "MQTT message enqueued: " << mid << std::endl; } } void endOfPipe() { - mq->disconnect(); + check("disconnect", mq->disconnect()); } };