first version v_0
authorFrantišek Kučera <franta-hg@frantovo.cz>
Sun, 08 May 2022 21:42:14 +0200
branchv_0
changeset 2 0799eaf338b9
parent 1 4993a084b8ba
child 3 610783d70ae9
first version
bash-completion.sh
nbproject/configurations.xml
src/CLIParser.h
src/Configuration.h
src/MQTTCommand.cpp
--- a/bash-completion.sh	Fri May 06 23:09:56 2022 +0200
+++ b/bash-completion.sh	Sun May 08 21:42:14 2022 +0200
@@ -34,14 +34,12 @@
 	)
 
 	if   [[ "$w1" == "--relation"                      && "x$w0" == "x" ]];    then COMPREPLY=("''")
-	elif [[ "$w1" == "--unlink-on-close"                                ]];    then COMPREPLY=($(compgen -W "${BOOLEAN_VALUES[*]}" -- "$w0"))
-	elif [[ "$w1" == "--queue"                         && "x$w0" == "x" ]];    then COMPREPLY=("''")
+	elif [[ "$w1" == "--stream"                        && "x$w0" == "x" ]];    then COMPREPLY=("''")
 	elif [[ "$w1" == "--message-count"                 && "x$w0" == "x" ]];    then COMPREPLY=("1")
 	else
 		OPTIONS=(
 			"--relation"
-			"--unlink-on-close"
-			"--queue"
+			"--stream"
 			"--message-count"
 		)
 		COMPREPLY=($(compgen -W "${OPTIONS[*]}" -- "$w0"))
--- a/nbproject/configurations.xml	Fri May 06 23:09:56 2022 +0200
+++ b/nbproject/configurations.xml	Sun May 08 21:42:14 2022 +0200
@@ -42,7 +42,6 @@
   <logicalFolder name="root" displayName="root" projectFiles="true" kind="ROOT">
     <df root="." name="0">
       <df name="src">
-        <in>MQTT.h</in>
         <in>MQTTCommand.cpp</in>
         <in>relpipe-in-mqtt.cpp</in>
       </df>
@@ -94,8 +93,6 @@
           <preBuildFirst>true</preBuildFirst>
         </preBuild>
       </makefileType>
-      <item path="src/MQTT.h" ex="false" tool="3" flavor2="0">
-      </item>
       <item path="src/MQTTCommand.cpp" ex="false" tool="1" flavor2="0">
         <ccTool flags="0">
         </ccTool>
@@ -134,8 +131,6 @@
           <preBuildFirst>true</preBuildFirst>
         </preBuild>
       </makefileType>
-      <item path="src/MQTT.h" ex="false" tool="3" flavor2="0">
-      </item>
     </conf>
   </confs>
 </configurationDescriptor>
--- a/src/CLIParser.h	Fri May 06 23:09:56 2022 +0200
+++ b/src/CLIParser.h	Sun May 08 21:42:14 2022 +0200
@@ -37,20 +37,10 @@
 		else throw relpipe::cli::RelpipeCLIException(L"Missing CLI argument" + (i > 0 ? (L" after " + arguments[i - 1]) : L""), relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS);
 	}
 
-	/**
-	 * TODO: use a common method
-	 */
-	bool parseBoolean(const relpipe::common::type::StringX& value) {
-		if (value == L"true") return true;
-		else if (value == L"false") return false;
-		else throw relpipe::cli::RelpipeCLIException(L"Unable to parse boolean value: " + value + L" (expecting true or false)", relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS);
-	}
-
 public:
 
 	static const relpipe::writer::string_t OPTION_RELATION;
-	static const relpipe::writer::string_t OPTION_UNLINK_ON_CLOSE;
-	static const relpipe::writer::string_t OPTION_QUEUE;
+	static const relpipe::writer::string_t OPTION_STREAM;
 	static const relpipe::writer::string_t OPTION_MESSAGE_COUNT;
 
 	Configuration parse(const std::vector<relpipe::writer::string_t>& arguments) {
@@ -61,10 +51,8 @@
 
 			if (option == OPTION_RELATION) {
 				c.relation = readNext(arguments, i);
-			} else if (option == OPTION_UNLINK_ON_CLOSE) {
-				c.unlinkOnClose = parseBoolean(readNext(arguments, i));
-			} else if (option == OPTION_QUEUE) {
-				c.queue = readNext(arguments, i);
+			} else if (option == OPTION_STREAM) {
+				c.stream = readNext(arguments, i);
 			} else if (option == OPTION_MESSAGE_COUNT) {
 				c.messageCount = std::stoull(readNext(arguments, i));
 			} else throw relpipe::cli::RelpipeCLIException(L"Unsupported CLI option: " + option, relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS);
@@ -78,8 +66,7 @@
 };
 
 const relpipe::writer::string_t CLIParser::OPTION_RELATION = L"--relation";
-const relpipe::writer::string_t CLIParser::OPTION_UNLINK_ON_CLOSE = L"--unlink-on-close";
-const relpipe::writer::string_t CLIParser::OPTION_QUEUE = L"--queue";
+const relpipe::writer::string_t CLIParser::OPTION_STREAM = L"--stream";
 const relpipe::writer::string_t CLIParser::OPTION_MESSAGE_COUNT = L"--message-count";
 
 }
--- a/src/Configuration.h	Fri May 06 23:09:56 2022 +0200
+++ b/src/Configuration.h	Sun May 08 21:42:14 2022 +0200
@@ -30,9 +30,8 @@
 public:
 
 	relpipe::common::type::Integer messageCount = 1;
-	relpipe::common::type::StringX relation = L"mqtt";
-	relpipe::common::type::StringX queue = L"/relpipe";
-	relpipe::common::type::Boolean unlinkOnClose = false;
+	relpipe::common::type::StringX relation = L"message";
+	relpipe::common::type::StringX stream = L"relpipe";
 
 	virtual ~Configuration() {
 	}
--- a/src/MQTTCommand.cpp	Fri May 06 23:09:56 2022 +0200
+++ b/src/MQTTCommand.cpp	Sun May 08 21:42:14 2022 +0200
@@ -23,6 +23,7 @@
 #include <unistd.h>
 #include <sstream>
 #include <iomanip>
+#include <random>
 
 #include <mosquittopp.h>
 
@@ -37,7 +38,6 @@
 #include "MQTTCommand.h"
 #include "Hex.h"
 
-using namespace std;
 using namespace relpipe::cli;
 using namespace relpipe::writer;
 
@@ -45,31 +45,79 @@
 namespace in {
 namespace mqtt {
 
-void MQTTCommand::process(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration) {
-	vector<AttributeMetadata> metadata;
+class MQTTClient : public mosqpp::mosquittopp {
+private:
+	std::shared_ptr<writer::RelationalWriter> writer;
+	Configuration& configuration;
+	int messageCount = 0;
+	std::string clientId;
 
-	{
-		// TODO: remove
-		int major, minor, patch;
-		mosqpp::lib_version(&major, &minor, &patch);
-		std::cerr << "mosquitto version: " << major << "." << minor << "." << patch << std::endl;
+	/**
+	 * @return unique (random) client ID for MQTT to allow multiple simultaneous connections
+	 */
+	static std::string generateClientID() {
+		std::stringstream result;
+		// result << "relpipe-in-";
+		std::string symbols("0123456789abcdef");
+
+		std::random_device dev;
+		std::mt19937 rng(dev());
+		std::uniform_int_distribution<std::mt19937::result_type> dist6(0, symbols.size());
+
+		for (int i = 0; i < 8; i++) result << symbols[dist6(rng)];
+
+		// std::cerr << "generated clien ID = " << result.str() << std::endl;
+		return result.str();
+	}
+public:
+
+	MQTTClient(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration) : mosqpp::mosquittopp((generateClientID()).c_str()), writer(writer), configuration(configuration) {
 	}
 
+	void on_message(const mosquitto_message* message) override {
+		// std::cerr << "got MQTT message: length=" << message->payloadlen << std::endl;
+		std::string payload = std::string((const char*) message->payload, message->payloadlen);
+		writer->writeAttribute(configuration.stream);
+		writer->writeAttribute(Hex::toTxt(payload));
+		writer->writeAttribute(Hex::toHex(payload));
+		messageCount++;
+	}
+
+	int popMessageCount() {
+		int count = messageCount;
+		messageCount = 0;
+		return count;
+	}
+
+};
+
+void MQTTCommand::process(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration) {
+	std::shared_ptr<MQTTClient> mq = std::make_shared<MQTTClient>(writer, configuration);
+
 	writer->startRelation(configuration.relation,{
-		{L"queue", TypeId::STRING},
+		{L"stream", TypeId::STRING},
 		{L"text", TypeId::STRING},
 		{L"data", TypeId::STRING}
 	}, true);
 
-	for (int i = configuration.messageCount; continueProcessing && i > 0; i--) {
-		// TODO: maybe rather call mq_timedreceive() inside and check continueProcessing (to be able to stop even when no messages are comming)
-		std::string message = "TODO"; // FIXME: receive message
+	mq->max_inflight_messages_set(1);
+	mq->connect("localhost", 1883);
+	int mid;
+	mq->subscribe(&mid, convertor.to_bytes(configuration.stream).c_str());
 
-		writer->writeAttribute(configuration.queue);
-		writer->writeAttribute(Hex::toTxt(message));
-		writer->writeAttribute(Hex::toHex(message));
+	//for (int i = configuration.messageCount; continueProcessing && i > 0; i--) {
+	for (int i = configuration.messageCount; continueProcessing && i > 0; i = i - mq->popMessageCount()) {
+		// std::cerr << "loop(): i=" << i << std::endl;
+
+		//mq->loop();
+		mq->loop(1000, 1);
+		//mq->loop(1000, -1);
+		//mq->loop_forever();
+		//mq->loop_write();
 	}
 
+	// FIXME: move do destructor
+	mq->disconnect();
 }
 
 MQTTCommand::~MQTTCommand() {