# HG changeset patch # User František Kučera # Date 1652038934 -7200 # Node ID 0799eaf338b9064d47b9596e13180c86bbc106c8 # Parent 4993a084b8ba0bb6fc32f753a2bf87f285571b3f first version diff -r 4993a084b8ba -r 0799eaf338b9 bash-completion.sh --- a/bash-completion.sh Fri May 06 23:09:56 2022 +0200 +++ b/bash-completion.sh Sun May 08 21:42:14 2022 +0200 @@ -34,14 +34,12 @@ ) if [[ "$w1" == "--relation" && "x$w0" == "x" ]]; then COMPREPLY=("''") - elif [[ "$w1" == "--unlink-on-close" ]]; then COMPREPLY=($(compgen -W "${BOOLEAN_VALUES[*]}" -- "$w0")) - elif [[ "$w1" == "--queue" && "x$w0" == "x" ]]; then COMPREPLY=("''") + elif [[ "$w1" == "--stream" && "x$w0" == "x" ]]; then COMPREPLY=("''") elif [[ "$w1" == "--message-count" && "x$w0" == "x" ]]; then COMPREPLY=("1") else OPTIONS=( "--relation" - "--unlink-on-close" - "--queue" + "--stream" "--message-count" ) COMPREPLY=($(compgen -W "${OPTIONS[*]}" -- "$w0")) diff -r 4993a084b8ba -r 0799eaf338b9 nbproject/configurations.xml --- a/nbproject/configurations.xml Fri May 06 23:09:56 2022 +0200 +++ b/nbproject/configurations.xml Sun May 08 21:42:14 2022 +0200 @@ -42,7 +42,6 @@ - MQTT.h MQTTCommand.cpp relpipe-in-mqtt.cpp @@ -94,8 +93,6 @@ true - - @@ -134,8 +131,6 @@ true - - diff -r 4993a084b8ba -r 0799eaf338b9 src/CLIParser.h --- a/src/CLIParser.h Fri May 06 23:09:56 2022 +0200 +++ b/src/CLIParser.h Sun May 08 21:42:14 2022 +0200 @@ -37,20 +37,10 @@ else throw relpipe::cli::RelpipeCLIException(L"Missing CLI argument" + (i > 0 ? (L" after " + arguments[i - 1]) : L""), relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS); } - /** - * TODO: use a common method - */ - bool parseBoolean(const relpipe::common::type::StringX& value) { - if (value == L"true") return true; - else if (value == L"false") return false; - else throw relpipe::cli::RelpipeCLIException(L"Unable to parse boolean value: " + value + L" (expecting true or false)", relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS); - } - public: static const relpipe::writer::string_t OPTION_RELATION; - static const relpipe::writer::string_t OPTION_UNLINK_ON_CLOSE; - static const relpipe::writer::string_t OPTION_QUEUE; + static const relpipe::writer::string_t OPTION_STREAM; static const relpipe::writer::string_t OPTION_MESSAGE_COUNT; Configuration parse(const std::vector& arguments) { @@ -61,10 +51,8 @@ if (option == OPTION_RELATION) { c.relation = readNext(arguments, i); - } else if (option == OPTION_UNLINK_ON_CLOSE) { - c.unlinkOnClose = parseBoolean(readNext(arguments, i)); - } else if (option == OPTION_QUEUE) { - c.queue = readNext(arguments, i); + } else if (option == OPTION_STREAM) { + c.stream = readNext(arguments, i); } else if (option == OPTION_MESSAGE_COUNT) { c.messageCount = std::stoull(readNext(arguments, i)); } else throw relpipe::cli::RelpipeCLIException(L"Unsupported CLI option: " + option, relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS); @@ -78,8 +66,7 @@ }; const relpipe::writer::string_t CLIParser::OPTION_RELATION = L"--relation"; -const relpipe::writer::string_t CLIParser::OPTION_UNLINK_ON_CLOSE = L"--unlink-on-close"; -const relpipe::writer::string_t CLIParser::OPTION_QUEUE = L"--queue"; +const relpipe::writer::string_t CLIParser::OPTION_STREAM = L"--stream"; const relpipe::writer::string_t CLIParser::OPTION_MESSAGE_COUNT = L"--message-count"; } diff -r 4993a084b8ba -r 0799eaf338b9 src/Configuration.h --- a/src/Configuration.h Fri May 06 23:09:56 2022 +0200 +++ b/src/Configuration.h Sun May 08 21:42:14 2022 +0200 @@ -30,9 +30,8 @@ public: relpipe::common::type::Integer messageCount = 1; - relpipe::common::type::StringX relation = L"mqtt"; - relpipe::common::type::StringX queue = L"/relpipe"; - relpipe::common::type::Boolean unlinkOnClose = false; + relpipe::common::type::StringX relation = L"message"; + relpipe::common::type::StringX stream = L"relpipe"; virtual ~Configuration() { } diff -r 4993a084b8ba -r 0799eaf338b9 src/MQTTCommand.cpp --- a/src/MQTTCommand.cpp Fri May 06 23:09:56 2022 +0200 +++ b/src/MQTTCommand.cpp Sun May 08 21:42:14 2022 +0200 @@ -23,6 +23,7 @@ #include #include #include +#include #include @@ -37,7 +38,6 @@ #include "MQTTCommand.h" #include "Hex.h" -using namespace std; using namespace relpipe::cli; using namespace relpipe::writer; @@ -45,31 +45,79 @@ namespace in { namespace mqtt { -void MQTTCommand::process(std::shared_ptr writer, Configuration& configuration) { - vector metadata; +class MQTTClient : public mosqpp::mosquittopp { +private: + std::shared_ptr writer; + Configuration& configuration; + int messageCount = 0; + std::string clientId; - { - // TODO: remove - int major, minor, patch; - mosqpp::lib_version(&major, &minor, &patch); - std::cerr << "mosquitto version: " << major << "." << minor << "." << patch << std::endl; + /** + * @return unique (random) client ID for MQTT to allow multiple simultaneous connections + */ + static std::string generateClientID() { + std::stringstream result; + // result << "relpipe-in-"; + std::string symbols("0123456789abcdef"); + + std::random_device dev; + std::mt19937 rng(dev()); + std::uniform_int_distribution dist6(0, symbols.size()); + + for (int i = 0; i < 8; i++) result << symbols[dist6(rng)]; + + // std::cerr << "generated clien ID = " << result.str() << std::endl; + return result.str(); + } +public: + + MQTTClient(std::shared_ptr writer, Configuration& configuration) : mosqpp::mosquittopp((generateClientID()).c_str()), writer(writer), configuration(configuration) { } + void on_message(const mosquitto_message* message) override { + // std::cerr << "got MQTT message: length=" << message->payloadlen << std::endl; + std::string payload = std::string((const char*) message->payload, message->payloadlen); + writer->writeAttribute(configuration.stream); + writer->writeAttribute(Hex::toTxt(payload)); + writer->writeAttribute(Hex::toHex(payload)); + messageCount++; + } + + int popMessageCount() { + int count = messageCount; + messageCount = 0; + return count; + } + +}; + +void MQTTCommand::process(std::shared_ptr writer, Configuration& configuration) { + std::shared_ptr mq = std::make_shared(writer, configuration); + writer->startRelation(configuration.relation,{ - {L"queue", TypeId::STRING}, + {L"stream", TypeId::STRING}, {L"text", TypeId::STRING}, {L"data", TypeId::STRING} }, true); - for (int i = configuration.messageCount; continueProcessing && i > 0; i--) { - // TODO: maybe rather call mq_timedreceive() inside and check continueProcessing (to be able to stop even when no messages are comming) - std::string message = "TODO"; // FIXME: receive message + mq->max_inflight_messages_set(1); + mq->connect("localhost", 1883); + int mid; + mq->subscribe(&mid, convertor.to_bytes(configuration.stream).c_str()); - writer->writeAttribute(configuration.queue); - writer->writeAttribute(Hex::toTxt(message)); - writer->writeAttribute(Hex::toHex(message)); + //for (int i = configuration.messageCount; continueProcessing && i > 0; i--) { + for (int i = configuration.messageCount; continueProcessing && i > 0; i = i - mq->popMessageCount()) { + // std::cerr << "loop(): i=" << i << std::endl; + + //mq->loop(); + mq->loop(1000, 1); + //mq->loop(1000, -1); + //mq->loop_forever(); + //mq->loop_write(); } + // FIXME: move do destructor + mq->disconnect(); } MQTTCommand::~MQTTCommand() {