# HG changeset patch # User František Kučera # Date 1654380532 -7200 # Node ID 62ced7c41c67952894425c06c6b042c500639a3d # Parent 1a0fbd17db1325cdee3844db7cda25db0f8b0c92 parse connection string, credentials, check return values diff -r 1a0fbd17db13 -r 62ced7c41c67 bash-completion.sh --- a/bash-completion.sh Sun May 08 21:42:25 2022 +0200 +++ b/bash-completion.sh Sun Jun 05 00:08:52 2022 +0200 @@ -22,22 +22,22 @@ w2=${COMP_WORDS[COMP_CWORD-2]} w3=${COMP_WORDS[COMP_CWORD-3]} - DATA_TYPE=( - "string" - "integer" - "boolean" - ) - - BOOLEAN_VALUES=( - "true" - "false" + CONNECTION_OPTIONS=( + "username" + "password" ) if [[ "$w1" == "--relation" && "x$w0" == "x" ]]; then COMPREPLY=("''") + elif [[ "$w1" == "--connection-string" && "x$w0" == "x" ]]; then COMPREPLY=("'mqtt://localhost:1883'") + elif [[ "$w1" == "--connection-option" ]]; then COMPREPLY=($(compgen -W "${CONNECTION_OPTIONS[*]}" -- "$w0")) + elif [[ "$w2" == "--connection-option" && "x$w0" == "x" ]]; then COMPREPLY=("''") elif [[ "$w1" == "--stream" && "x$w0" == "x" ]]; then COMPREPLY=("''") else OPTIONS=( "--relation" + #"--connection-name" + "--connection-string" + "--connection-option" "--stream" ) COMPREPLY=($(compgen -W "${OPTIONS[*]}" -- "$w0")) diff -r 1a0fbd17db13 -r 62ced7c41c67 src/CLIParser.h --- a/src/CLIParser.h Sun May 08 21:42:25 2022 +0200 +++ b/src/CLIParser.h Sun Jun 05 00:08:52 2022 +0200 @@ -41,6 +41,8 @@ static const relpipe::common::type::StringX OPTION_RELATION; static const relpipe::common::type::StringX OPTION_STREAM; + static const relpipe::common::type::StringX OPTION_CONNECTION_STRING; + static const relpipe::common::type::StringX OPTION_CONNECTION_OPTION; Configuration parse(const std::vector& arguments) { Configuration c; @@ -52,6 +54,12 @@ c.relation = readNext(arguments, i); } else if (option == OPTION_STREAM) { c.stream = readNext(arguments, i); + } else if (option == OPTION_CONNECTION_STRING) { + c.connectionString = readNext(arguments, i); + } else if (option == OPTION_CONNECTION_OPTION) { + auto name = readNext(arguments, i); + auto value = readNext(arguments, i); + c.connectionOptions.push_back({name, value}); } else throw relpipe::cli::RelpipeCLIException(L"Unsupported CLI option: " + option, relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS); } @@ -64,6 +72,8 @@ const relpipe::common::type::StringX CLIParser::OPTION_RELATION = L"--relation"; const relpipe::common::type::StringX CLIParser::OPTION_STREAM = L"--stream"; +const relpipe::common::type::StringX CLIParser::OPTION_CONNECTION_STRING = L"--connection-string"; +const relpipe::common::type::StringX CLIParser::OPTION_CONNECTION_OPTION = L"--connection-option"; } } diff -r 1a0fbd17db13 -r 62ced7c41c67 src/Configuration.h --- a/src/Configuration.h Sun May 08 21:42:25 2022 +0200 +++ b/src/Configuration.h Sun Jun 05 00:08:52 2022 +0200 @@ -29,9 +29,24 @@ class Configuration { public: + class ConnectionOption { + public: + relpipe::common::type::StringX name; + relpipe::common::type::StringX value; + + ConnectionOption() = default; + + ConnectionOption(relpipe::common::type::StringX name, relpipe::common::type::StringX value) : name(name), value(value) { + } + + + }; + relpipe::common::type::StringX relation = L"message"; relpipe::common::type::StringX stream = L"relpipe"; - + relpipe::common::type::StringX connectionString = L"mqtt://localhost:1883"; + std::vector connectionOptions; + virtual ~Configuration() { } }; diff -r 1a0fbd17db13 -r 62ced7c41c67 src/MQTTHandler.h --- a/src/MQTTHandler.h Sun May 08 21:42:25 2022 +0200 +++ b/src/MQTTHandler.h Sun Jun 05 00:08:52 2022 +0200 @@ -24,6 +24,7 @@ #include #include #include +#include #include @@ -50,16 +51,14 @@ */ static std::string generateClientID() { std::stringstream result; - // result << "relpipe-out-"; std::string symbols("0123456789abcdef"); std::random_device dev; std::mt19937 rng(dev()); - std::uniform_int_distribution dist6(0, symbols.size()); + std::uniform_int_distribution dist(0, symbols.size()); - for (int i = 0; i < 8; i++) result << symbols[dist6(rng)]; + for (int i = 0; i < 8; i++) result << symbols[dist(rng)]; - // std::cerr << "generated clien ID = " << result.str() << std::endl; return result.str(); } @@ -70,10 +69,45 @@ std::string currentValue; } currentRelation; + static void parseConnectionString(const std::string& connectionString, std::string& hostname, int& port) { + std::regex pattern("mqtt:(//)?([^:]+)(:([0-9]+))?"); + std::smatch match; + if (std::regex_match(connectionString, match, pattern)) { + hostname = match[2]; + port = stoi(match[4]); + } else { + throw std::invalid_argument("Invalid connection string format. Expecting something like: mqtt://localhost:1883"); + } + } + + static void check(std::string operation, int result) { + if (result) throw std::logic_error("mosquitto operation failed: " + operation + " = " + std::to_string(result)); + } + public: - MQTTHandler(Configuration configuration) : configuration(configuration) { - mq->connect("localhost", 1883); + static MQTTHandler* create(Configuration configuration) { + MQTTHandler* h = new MQTTHandler(); + + std::string connectionString = h->convertor.to_bytes(configuration.connectionString); + std::string username; + std::string password; + std::string hostname; + int port; + + parseConnectionString(connectionString, hostname, port); + + for (auto o : configuration.connectionOptions) { + if (o.name == L"username") username = h->convertor.to_bytes(o.value); + else if (o.name == L"password") password = h->convertor.to_bytes(o.value); + else throw std::invalid_argument("Unsupported connection option: " + h->convertor.to_bytes(o.name)); + } + + if (username.size()) check("set credentials", h->mq->username_pw_set(username.c_str(), password.c_str())); + + check("connect", h->mq->connect(hostname.c_str(), port)); + + return h; } void startRelation(relpipe::common::type::StringX name, std::vector attributes) override { @@ -93,14 +127,14 @@ if (currentRelation.attributeIndex == currentRelation.attributes.size()) { currentRelation.attributeIndex = 0; int mid = -1; - mq->publish(&mid, convertor.to_bytes(configuration.stream).c_str(), currentRelation.currentValue.size(), currentRelation.currentValue.c_str()); + check("publish", mq->publish(&mid, convertor.to_bytes(configuration.stream).c_str(), currentRelation.currentValue.size(), currentRelation.currentValue.c_str())); // std::cerr << "MQTT message enqueued: " << mid << std::endl; } } void endOfPipe() { - mq->disconnect(); + check("disconnect", mq->disconnect()); } }; diff -r 1a0fbd17db13 -r 62ced7c41c67 src/relpipe-out-mqtt.cpp --- a/src/relpipe-out-mqtt.cpp Sun May 08 21:42:25 2022 +0200 +++ b/src/relpipe-out-mqtt.cpp Sun Jun 05 00:08:52 2022 +0200 @@ -44,8 +44,8 @@ CLIParser cliParser; Configuration configuration = cliParser.parse(cli.arguments()); std::shared_ptr reader(Factory::create(std::cin)); - MQTTHandler handler(configuration); - reader->addHandler(&handler); + std::shared_ptr handler(MQTTHandler::create(configuration)); + reader->addHandler(handler.get()); reader->process(); resultCode = CLI::EXIT_CODE_SUCCESS; } catch (RelpipeCLIException e) {