--- a/bash-completion.sh Fri May 06 23:09:56 2022 +0200
+++ b/bash-completion.sh Sun May 08 21:42:14 2022 +0200
@@ -34,14 +34,12 @@
)
if [[ "$w1" == "--relation" && "x$w0" == "x" ]]; then COMPREPLY=("''")
- elif [[ "$w1" == "--unlink-on-close" ]]; then COMPREPLY=($(compgen -W "${BOOLEAN_VALUES[*]}" -- "$w0"))
- elif [[ "$w1" == "--queue" && "x$w0" == "x" ]]; then COMPREPLY=("''")
+ elif [[ "$w1" == "--stream" && "x$w0" == "x" ]]; then COMPREPLY=("''")
elif [[ "$w1" == "--message-count" && "x$w0" == "x" ]]; then COMPREPLY=("1")
else
OPTIONS=(
"--relation"
- "--unlink-on-close"
- "--queue"
+ "--stream"
"--message-count"
)
COMPREPLY=($(compgen -W "${OPTIONS[*]}" -- "$w0"))
--- a/nbproject/configurations.xml Fri May 06 23:09:56 2022 +0200
+++ b/nbproject/configurations.xml Sun May 08 21:42:14 2022 +0200
@@ -42,7 +42,6 @@
<logicalFolder name="root" displayName="root" projectFiles="true" kind="ROOT">
<df root="." name="0">
<df name="src">
- <in>MQTT.h</in>
<in>MQTTCommand.cpp</in>
<in>relpipe-in-mqtt.cpp</in>
</df>
@@ -94,8 +93,6 @@
<preBuildFirst>true</preBuildFirst>
</preBuild>
</makefileType>
- <item path="src/MQTT.h" ex="false" tool="3" flavor2="0">
- </item>
<item path="src/MQTTCommand.cpp" ex="false" tool="1" flavor2="0">
<ccTool flags="0">
</ccTool>
@@ -134,8 +131,6 @@
<preBuildFirst>true</preBuildFirst>
</preBuild>
</makefileType>
- <item path="src/MQTT.h" ex="false" tool="3" flavor2="0">
- </item>
</conf>
</confs>
</configurationDescriptor>
--- a/src/CLIParser.h Fri May 06 23:09:56 2022 +0200
+++ b/src/CLIParser.h Sun May 08 21:42:14 2022 +0200
@@ -37,20 +37,10 @@
else throw relpipe::cli::RelpipeCLIException(L"Missing CLI argument" + (i > 0 ? (L" after " + arguments[i - 1]) : L""), relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS);
}
- /**
- * TODO: use a common method
- */
- bool parseBoolean(const relpipe::common::type::StringX& value) {
- if (value == L"true") return true;
- else if (value == L"false") return false;
- else throw relpipe::cli::RelpipeCLIException(L"Unable to parse boolean value: " + value + L" (expecting true or false)", relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS);
- }
-
public:
static const relpipe::writer::string_t OPTION_RELATION;
- static const relpipe::writer::string_t OPTION_UNLINK_ON_CLOSE;
- static const relpipe::writer::string_t OPTION_QUEUE;
+ static const relpipe::writer::string_t OPTION_STREAM;
static const relpipe::writer::string_t OPTION_MESSAGE_COUNT;
Configuration parse(const std::vector<relpipe::writer::string_t>& arguments) {
@@ -61,10 +51,8 @@
if (option == OPTION_RELATION) {
c.relation = readNext(arguments, i);
- } else if (option == OPTION_UNLINK_ON_CLOSE) {
- c.unlinkOnClose = parseBoolean(readNext(arguments, i));
- } else if (option == OPTION_QUEUE) {
- c.queue = readNext(arguments, i);
+ } else if (option == OPTION_STREAM) {
+ c.stream = readNext(arguments, i);
} else if (option == OPTION_MESSAGE_COUNT) {
c.messageCount = std::stoull(readNext(arguments, i));
} else throw relpipe::cli::RelpipeCLIException(L"Unsupported CLI option: " + option, relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS);
@@ -78,8 +66,7 @@
};
const relpipe::writer::string_t CLIParser::OPTION_RELATION = L"--relation";
-const relpipe::writer::string_t CLIParser::OPTION_UNLINK_ON_CLOSE = L"--unlink-on-close";
-const relpipe::writer::string_t CLIParser::OPTION_QUEUE = L"--queue";
+const relpipe::writer::string_t CLIParser::OPTION_STREAM = L"--stream";
const relpipe::writer::string_t CLIParser::OPTION_MESSAGE_COUNT = L"--message-count";
}
--- a/src/Configuration.h Fri May 06 23:09:56 2022 +0200
+++ b/src/Configuration.h Sun May 08 21:42:14 2022 +0200
@@ -30,9 +30,8 @@
public:
relpipe::common::type::Integer messageCount = 1;
- relpipe::common::type::StringX relation = L"mqtt";
- relpipe::common::type::StringX queue = L"/relpipe";
- relpipe::common::type::Boolean unlinkOnClose = false;
+ relpipe::common::type::StringX relation = L"message";
+ relpipe::common::type::StringX stream = L"relpipe";
virtual ~Configuration() {
}
--- a/src/MQTTCommand.cpp Fri May 06 23:09:56 2022 +0200
+++ b/src/MQTTCommand.cpp Sun May 08 21:42:14 2022 +0200
@@ -23,6 +23,7 @@
#include <unistd.h>
#include <sstream>
#include <iomanip>
+#include <random>
#include <mosquittopp.h>
@@ -37,7 +38,6 @@
#include "MQTTCommand.h"
#include "Hex.h"
-using namespace std;
using namespace relpipe::cli;
using namespace relpipe::writer;
@@ -45,31 +45,79 @@
namespace in {
namespace mqtt {
-void MQTTCommand::process(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration) {
- vector<AttributeMetadata> metadata;
+class MQTTClient : public mosqpp::mosquittopp {
+private:
+ std::shared_ptr<writer::RelationalWriter> writer;
+ Configuration& configuration;
+ int messageCount = 0;
+ std::string clientId;
- {
- // TODO: remove
- int major, minor, patch;
- mosqpp::lib_version(&major, &minor, &patch);
- std::cerr << "mosquitto version: " << major << "." << minor << "." << patch << std::endl;
+ /**
+ * @return unique (random) client ID for MQTT to allow multiple simultaneous connections
+ */
+ static std::string generateClientID() {
+ std::stringstream result;
+ // result << "relpipe-in-";
+ std::string symbols("0123456789abcdef");
+
+ std::random_device dev;
+ std::mt19937 rng(dev());
+ std::uniform_int_distribution<std::mt19937::result_type> dist6(0, symbols.size());
+
+ for (int i = 0; i < 8; i++) result << symbols[dist6(rng)];
+
+ // std::cerr << "generated clien ID = " << result.str() << std::endl;
+ return result.str();
+ }
+public:
+
+ MQTTClient(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration) : mosqpp::mosquittopp((generateClientID()).c_str()), writer(writer), configuration(configuration) {
}
+ void on_message(const mosquitto_message* message) override {
+ // std::cerr << "got MQTT message: length=" << message->payloadlen << std::endl;
+ std::string payload = std::string((const char*) message->payload, message->payloadlen);
+ writer->writeAttribute(configuration.stream);
+ writer->writeAttribute(Hex::toTxt(payload));
+ writer->writeAttribute(Hex::toHex(payload));
+ messageCount++;
+ }
+
+ int popMessageCount() {
+ int count = messageCount;
+ messageCount = 0;
+ return count;
+ }
+
+};
+
+void MQTTCommand::process(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration) {
+ std::shared_ptr<MQTTClient> mq = std::make_shared<MQTTClient>(writer, configuration);
+
writer->startRelation(configuration.relation,{
- {L"queue", TypeId::STRING},
+ {L"stream", TypeId::STRING},
{L"text", TypeId::STRING},
{L"data", TypeId::STRING}
}, true);
- for (int i = configuration.messageCount; continueProcessing && i > 0; i--) {
- // TODO: maybe rather call mq_timedreceive() inside and check continueProcessing (to be able to stop even when no messages are comming)
- std::string message = "TODO"; // FIXME: receive message
+ mq->max_inflight_messages_set(1);
+ mq->connect("localhost", 1883);
+ int mid;
+ mq->subscribe(&mid, convertor.to_bytes(configuration.stream).c_str());
- writer->writeAttribute(configuration.queue);
- writer->writeAttribute(Hex::toTxt(message));
- writer->writeAttribute(Hex::toHex(message));
+ //for (int i = configuration.messageCount; continueProcessing && i > 0; i--) {
+ for (int i = configuration.messageCount; continueProcessing && i > 0; i = i - mq->popMessageCount()) {
+ // std::cerr << "loop(): i=" << i << std::endl;
+
+ //mq->loop();
+ mq->loop(1000, 1);
+ //mq->loop(1000, -1);
+ //mq->loop_forever();
+ //mq->loop_write();
}
+ // FIXME: move do destructor
+ mq->disconnect();
}
MQTTCommand::~MQTTCommand() {