# HG changeset patch # User František Kučera # Date 1652038945 -7200 # Node ID 1a0fbd17db1325cdee3844db7cda25db0f8b0c92 # Parent cb9577442d3b183f5345e3428a1ef5417d90c0db first version diff -r cb9577442d3b -r 1a0fbd17db13 bash-completion.sh --- a/bash-completion.sh Fri May 06 23:06:44 2022 +0200 +++ b/bash-completion.sh Sun May 08 21:42:25 2022 +0200 @@ -34,13 +34,11 @@ ) if [[ "$w1" == "--relation" && "x$w0" == "x" ]]; then COMPREPLY=("''") - elif [[ "$w1" == "--unlink-on-close" ]]; then COMPREPLY=($(compgen -W "${BOOLEAN_VALUES[*]}" -- "$w0")) - elif [[ "$w1" == "--queue" && "x$w0" == "x" ]]; then COMPREPLY=("''") + elif [[ "$w1" == "--stream" && "x$w0" == "x" ]]; then COMPREPLY=("''") else OPTIONS=( "--relation" - "--unlink-on-close" - "--queue" + "--stream" ) COMPREPLY=($(compgen -W "${OPTIONS[*]}" -- "$w0")) fi diff -r cb9577442d3b -r 1a0fbd17db13 src/CLIParser.h --- a/src/CLIParser.h Fri May 06 23:06:44 2022 +0200 +++ b/src/CLIParser.h Sun May 08 21:42:25 2022 +0200 @@ -37,20 +37,10 @@ else throw relpipe::cli::RelpipeCLIException(L"Missing CLI argument" + (i > 0 ? (L" after " + arguments[i - 1]) : L""), relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS); } - /** - * TODO: use a common method - */ - bool parseBoolean(const relpipe::common::type::StringX& value) { - if (value == L"true") return true; - else if (value == L"false") return false; - else throw relpipe::cli::RelpipeCLIException(L"Unable to parse boolean value: " + value + L" (expecting true or false)", relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS); - } - public: static const relpipe::common::type::StringX OPTION_RELATION; - static const relpipe::common::type::StringX OPTION_UNLINK_ON_CLOSE; - static const relpipe::common::type::StringX OPTION_QUEUE; + static const relpipe::common::type::StringX OPTION_STREAM; Configuration parse(const std::vector& arguments) { Configuration c; @@ -60,10 +50,8 @@ if (option == OPTION_RELATION) { c.relation = readNext(arguments, i); - } else if (option == OPTION_UNLINK_ON_CLOSE) { - c.unlinkOnClose = parseBoolean(readNext(arguments, i)); - } else if (option == OPTION_QUEUE) { - c.queue = readNext(arguments, i); + } else if (option == OPTION_STREAM) { + c.stream = readNext(arguments, i); } else throw relpipe::cli::RelpipeCLIException(L"Unsupported CLI option: " + option, relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS); } @@ -75,8 +63,7 @@ }; const relpipe::common::type::StringX CLIParser::OPTION_RELATION = L"--relation"; -const relpipe::common::type::StringX CLIParser::OPTION_UNLINK_ON_CLOSE = L"--unlink-on-close"; -const relpipe::common::type::StringX CLIParser::OPTION_QUEUE = L"--queue"; +const relpipe::common::type::StringX CLIParser::OPTION_STREAM = L"--stream"; } } diff -r cb9577442d3b -r 1a0fbd17db13 src/Configuration.h --- a/src/Configuration.h Fri May 06 23:06:44 2022 +0200 +++ b/src/Configuration.h Sun May 08 21:42:25 2022 +0200 @@ -29,9 +29,8 @@ class Configuration { public: - relpipe::common::type::StringX relation = L"mqtt"; - relpipe::common::type::StringX queue = L"/relpipe"; - relpipe::common::type::Boolean unlinkOnClose = false; + relpipe::common::type::StringX relation = L"message"; + relpipe::common::type::StringX stream = L"relpipe"; virtual ~Configuration() { } diff -r cb9577442d3b -r 1a0fbd17db13 src/MQTTHandler.h --- a/src/MQTTHandler.h Fri May 06 23:06:44 2022 +0200 +++ b/src/MQTTHandler.h Sun May 08 21:42:25 2022 +0200 @@ -23,6 +23,7 @@ #include #include #include +#include #include @@ -42,6 +43,25 @@ private: std::wstring_convert> convertor; // TODO: support also other encodings. Configuration configuration; + std::shared_ptr mq = std::make_shared(generateClientID().c_str()); + + /** + * @return unique (random) client ID for MQTT to allow multiple simultaneous connections + */ + static std::string generateClientID() { + std::stringstream result; + // result << "relpipe-out-"; + std::string symbols("0123456789abcdef"); + + std::random_device dev; + std::mt19937 rng(dev()); + std::uniform_int_distribution dist6(0, symbols.size()); + + for (int i = 0; i < 8; i++) result << symbols[dist6(rng)]; + + // std::cerr << "generated clien ID = " << result.str() << std::endl; + return result.str(); + } struct CurrentRelation { relpipe::common::type::StringX name; @@ -53,16 +73,11 @@ public: MQTTHandler(Configuration configuration) : configuration(configuration) { - - { - // TODO: remove - int major, minor, patch; - mosqpp::lib_version(&major, &minor, &patch); - std::cerr << "mosquitto version: " << major << "." << minor << "." << patch << std::endl; - } + mq->connect("localhost", 1883); } void startRelation(relpipe::common::type::StringX name, std::vector attributes) override { + // TODO: check relation name according to the configuration currentRelation = CurrentRelation{name, attributes}; } @@ -77,13 +92,15 @@ currentRelation.attributeIndex++; if (currentRelation.attributeIndex == currentRelation.attributes.size()) { currentRelation.attributeIndex = 0; - // FIXME: send the message + int mid = -1; + mq->publish(&mid, convertor.to_bytes(configuration.stream).c_str(), currentRelation.currentValue.size(), currentRelation.currentValue.c_str()); + // std::cerr << "MQTT message enqueued: " << mid << std::endl; } } void endOfPipe() { - + mq->disconnect(); } };