# HG changeset patch # User František Kučera # Date 1654462305 -7200 # Node ID 610783d70ae97442488cad15986170faee616fe5 # Parent 0799eaf338b9064d47b9596e13180c86bbc106c8 parse connection string, credentials, check return values diff -r 0799eaf338b9 -r 610783d70ae9 bash-completion.sh --- a/bash-completion.sh Sun May 08 21:42:14 2022 +0200 +++ b/bash-completion.sh Sun Jun 05 22:51:45 2022 +0200 @@ -22,23 +22,23 @@ w2=${COMP_WORDS[COMP_CWORD-2]} w3=${COMP_WORDS[COMP_CWORD-3]} - DATA_TYPE=( - "string" - "integer" - "boolean" - ) - - BOOLEAN_VALUES=( - "true" - "false" + CONNECTION_OPTIONS=( + "username" + "password" ) if [[ "$w1" == "--relation" && "x$w0" == "x" ]]; then COMPREPLY=("''") + elif [[ "$w1" == "--connection-string" && "x$w0" == "x" ]]; then COMPREPLY=("'mqtt://localhost:1883'") + elif [[ "$w1" == "--connection-option" ]]; then COMPREPLY=($(compgen -W "${CONNECTION_OPTIONS[*]}" -- "$w0")) + elif [[ "$w2" == "--connection-option" && "x$w0" == "x" ]]; then COMPREPLY=("''") elif [[ "$w1" == "--stream" && "x$w0" == "x" ]]; then COMPREPLY=("''") elif [[ "$w1" == "--message-count" && "x$w0" == "x" ]]; then COMPREPLY=("1") else OPTIONS=( "--relation" + #"--connection-name" + "--connection-string" + "--connection-option" "--stream" "--message-count" ) diff -r 0799eaf338b9 -r 610783d70ae9 src/CLIParser.h --- a/src/CLIParser.h Sun May 08 21:42:14 2022 +0200 +++ b/src/CLIParser.h Sun Jun 05 22:51:45 2022 +0200 @@ -19,7 +19,7 @@ #include #include -#include +#include #include #include @@ -32,27 +32,35 @@ class CLIParser { private: - relpipe::writer::string_t readNext(const std::vector& arguments, int& i) { + relpipe::common::type::StringX readNext(const std::vector& arguments, int& i) { if (i < arguments.size()) return arguments[i++]; else throw relpipe::cli::RelpipeCLIException(L"Missing CLI argument" + (i > 0 ? (L" after " + arguments[i - 1]) : L""), relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS); } public: - static const relpipe::writer::string_t OPTION_RELATION; - static const relpipe::writer::string_t OPTION_STREAM; - static const relpipe::writer::string_t OPTION_MESSAGE_COUNT; + static const relpipe::common::type::StringX OPTION_RELATION; + static const relpipe::common::type::StringX OPTION_STREAM; + static const relpipe::common::type::StringX OPTION_CONNECTION_STRING; + static const relpipe::common::type::StringX OPTION_CONNECTION_OPTION; + static const relpipe::common::type::StringX OPTION_MESSAGE_COUNT; - Configuration parse(const std::vector& arguments) { + Configuration parse(const std::vector& arguments) { Configuration c; for (int i = 0; i < arguments.size();) { - relpipe::writer::string_t option = readNext(arguments, i); + relpipe::common::type::StringX option = readNext(arguments, i); if (option == OPTION_RELATION) { c.relation = readNext(arguments, i); } else if (option == OPTION_STREAM) { c.stream = readNext(arguments, i); + } else if (option == OPTION_CONNECTION_STRING) { + c.connectionString = readNext(arguments, i); + } else if (option == OPTION_CONNECTION_OPTION) { + auto name = readNext(arguments, i); + auto value = readNext(arguments, i); + c.connectionOptions.push_back({name, value}); } else if (option == OPTION_MESSAGE_COUNT) { c.messageCount = std::stoull(readNext(arguments, i)); } else throw relpipe::cli::RelpipeCLIException(L"Unsupported CLI option: " + option, relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS); @@ -65,9 +73,11 @@ } }; -const relpipe::writer::string_t CLIParser::OPTION_RELATION = L"--relation"; -const relpipe::writer::string_t CLIParser::OPTION_STREAM = L"--stream"; -const relpipe::writer::string_t CLIParser::OPTION_MESSAGE_COUNT = L"--message-count"; +const relpipe::common::type::StringX CLIParser::OPTION_RELATION = L"--relation"; +const relpipe::common::type::StringX CLIParser::OPTION_STREAM = L"--stream"; +const relpipe::common::type::StringX CLIParser::OPTION_CONNECTION_STRING = L"--connection-string"; +const relpipe::common::type::StringX CLIParser::OPTION_CONNECTION_OPTION = L"--connection-option"; +const relpipe::common::type::StringX CLIParser::OPTION_MESSAGE_COUNT = L"--message-count"; } } diff -r 0799eaf338b9 -r 610783d70ae9 src/Configuration.h --- a/src/Configuration.h Sun May 08 21:42:14 2022 +0200 +++ b/src/Configuration.h Sun Jun 05 22:51:45 2022 +0200 @@ -29,14 +29,29 @@ class Configuration { public: + class ConnectionOption { + public: + relpipe::common::type::StringX name; + relpipe::common::type::StringX value; + + ConnectionOption() = default; + + ConnectionOption(relpipe::common::type::StringX name, relpipe::common::type::StringX value) : name(name), value(value) { + } + + + }; + relpipe::common::type::Integer messageCount = 1; relpipe::common::type::StringX relation = L"message"; relpipe::common::type::StringX stream = L"relpipe"; - + relpipe::common::type::StringX connectionString = L"mqtt://localhost:1883"; + std::vector connectionOptions; + virtual ~Configuration() { } }; } } -} \ No newline at end of file +} diff -r 0799eaf338b9 -r 610783d70ae9 src/MQTTCommand.cpp --- a/src/MQTTCommand.cpp Sun May 08 21:42:14 2022 +0200 +++ b/src/MQTTCommand.cpp Sun Jun 05 22:51:45 2022 +0200 @@ -24,6 +24,7 @@ #include #include #include +#include #include @@ -57,16 +58,14 @@ */ static std::string generateClientID() { std::stringstream result; - // result << "relpipe-in-"; std::string symbols("0123456789abcdef"); std::random_device dev; std::mt19937 rng(dev()); - std::uniform_int_distribution dist6(0, symbols.size()); + std::uniform_int_distribution dist(0, symbols.size()); - for (int i = 0; i < 8; i++) result << symbols[dist6(rng)]; + for (int i = 0; i < 8; i++) result << symbols[dist(rng)]; - // std::cerr << "generated clien ID = " << result.str() << std::endl; return result.str(); } public: @@ -91,6 +90,21 @@ }; +void MQTTCommand::parseConnectionString(const std::string& connectionString, std::string& hostname, int& port) { + std::regex pattern("mqtt:(//)?([^:]+)(:([0-9]+))?"); + std::smatch match; + if (std::regex_match(connectionString, match, pattern)) { + hostname = match[2]; + port = stoi(match[4]); + } else { + throw std::invalid_argument("Invalid connection string format. Expecting something like: mqtt://localhost:1883"); + } +} + +void MQTTCommand::check(std::string operation, int result) { + if (result) throw std::logic_error("mosquitto operation failed: " + operation + " = " + std::to_string(result)); +} + void MQTTCommand::process(std::shared_ptr writer, Configuration& configuration) { std::shared_ptr mq = std::make_shared(writer, configuration); @@ -100,17 +114,33 @@ {L"data", TypeId::STRING} }, true); - mq->max_inflight_messages_set(1); - mq->connect("localhost", 1883); + std::string connectionString = convertor.to_bytes(configuration.connectionString); + std::string username; + std::string password; + std::string hostname; + int port; + + parseConnectionString(connectionString, hostname, port); + + for (auto o : configuration.connectionOptions) { + if (o.name == L"username") username = convertor.to_bytes(o.value); + else if (o.name == L"password") password = convertor.to_bytes(o.value); + else throw std::invalid_argument("Unsupported connection option: " + convertor.to_bytes(o.name)); + } + + if (username.size()) check("set credentials", mq->username_pw_set(username.c_str(), password.c_str())); + + check("set maximum inflight messages", mq->max_inflight_messages_set(1)); + check("connect", mq->connect(hostname.c_str(), port)); int mid; - mq->subscribe(&mid, convertor.to_bytes(configuration.stream).c_str()); + check("substcribe", mq->subscribe(&mid, convertor.to_bytes(configuration.stream).c_str())); //for (int i = configuration.messageCount; continueProcessing && i > 0; i--) { for (int i = configuration.messageCount; continueProcessing && i > 0; i = i - mq->popMessageCount()) { // std::cerr << "loop(): i=" << i << std::endl; //mq->loop(); - mq->loop(1000, 1); + check("loop", mq->loop(1000, 1)); //mq->loop(1000, -1); //mq->loop_forever(); //mq->loop_write(); diff -r 0799eaf338b9 -r 610783d70ae9 src/MQTTCommand.h --- a/src/MQTTCommand.h Sun May 08 21:42:14 2022 +0200 +++ b/src/MQTTCommand.h Sun Jun 05 22:51:45 2022 +0200 @@ -34,6 +34,8 @@ private: std::wstring_convert> convertor; // TODO: support also other encodings. std::atomic continueProcessing{true}; + static void parseConnectionString(const std::string& connectionString, std::string& hostname, int& port); + static void check(std::string operation, int result); public: virtual ~MQTTCommand();