first version v_0
authorFrantišek Kučera <franta-hg@frantovo.cz>
Sun, 08 May 2022 21:42:25 +0200
branchv_0
changeset 2 1a0fbd17db13
parent 1 cb9577442d3b
child 3 62ced7c41c67
first version
bash-completion.sh
src/CLIParser.h
src/Configuration.h
src/MQTTHandler.h
--- a/bash-completion.sh	Fri May 06 23:06:44 2022 +0200
+++ b/bash-completion.sh	Sun May 08 21:42:25 2022 +0200
@@ -34,13 +34,11 @@
 	)
 
 	if   [[ "$w1" == "--relation"                      && "x$w0" == "x" ]];    then COMPREPLY=("''")
-	elif [[ "$w1" == "--unlink-on-close"                                ]];    then COMPREPLY=($(compgen -W "${BOOLEAN_VALUES[*]}" -- "$w0"))
-	elif [[ "$w1" == "--queue"                         && "x$w0" == "x" ]];    then COMPREPLY=("''")
+	elif [[ "$w1" == "--stream"                        && "x$w0" == "x" ]];    then COMPREPLY=("''")
 	else
 		OPTIONS=(
 			"--relation"
-			"--unlink-on-close"
-			"--queue"
+			"--stream"
 		)
 		COMPREPLY=($(compgen -W "${OPTIONS[*]}" -- "$w0"))
 	fi
--- a/src/CLIParser.h	Fri May 06 23:06:44 2022 +0200
+++ b/src/CLIParser.h	Sun May 08 21:42:25 2022 +0200
@@ -37,20 +37,10 @@
 		else throw relpipe::cli::RelpipeCLIException(L"Missing CLI argument" + (i > 0 ? (L" after " + arguments[i - 1]) : L""), relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS);
 	}
 
-	/**
-	 * TODO: use a common method
-	 */
-	bool parseBoolean(const relpipe::common::type::StringX& value) {
-		if (value == L"true") return true;
-		else if (value == L"false") return false;
-		else throw relpipe::cli::RelpipeCLIException(L"Unable to parse boolean value: " + value + L" (expecting true or false)", relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS);
-	}
-
 public:
 
 	static const relpipe::common::type::StringX OPTION_RELATION;
-	static const relpipe::common::type::StringX OPTION_UNLINK_ON_CLOSE;
-	static const relpipe::common::type::StringX OPTION_QUEUE;
+	static const relpipe::common::type::StringX OPTION_STREAM;
 
 	Configuration parse(const std::vector<relpipe::common::type::StringX>& arguments) {
 		Configuration c;
@@ -60,10 +50,8 @@
 
 			if (option == OPTION_RELATION) {
 				c.relation = readNext(arguments, i);
-			} else if (option == OPTION_UNLINK_ON_CLOSE) {
-				c.unlinkOnClose = parseBoolean(readNext(arguments, i));
-			} else if (option == OPTION_QUEUE) {
-				c.queue = readNext(arguments, i);
+			} else if (option == OPTION_STREAM) {
+				c.stream = readNext(arguments, i);
 			} else throw relpipe::cli::RelpipeCLIException(L"Unsupported CLI option: " + option, relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS);
 		}
 
@@ -75,8 +63,7 @@
 };
 
 const relpipe::common::type::StringX CLIParser::OPTION_RELATION = L"--relation";
-const relpipe::common::type::StringX CLIParser::OPTION_UNLINK_ON_CLOSE = L"--unlink-on-close";
-const relpipe::common::type::StringX CLIParser::OPTION_QUEUE = L"--queue";
+const relpipe::common::type::StringX CLIParser::OPTION_STREAM = L"--stream";
 
 }
 }
--- a/src/Configuration.h	Fri May 06 23:06:44 2022 +0200
+++ b/src/Configuration.h	Sun May 08 21:42:25 2022 +0200
@@ -29,9 +29,8 @@
 class Configuration {
 public:
 
-	relpipe::common::type::StringX relation = L"mqtt";
-	relpipe::common::type::StringX queue = L"/relpipe";
-	relpipe::common::type::Boolean unlinkOnClose = false;
+	relpipe::common::type::StringX relation = L"message";
+	relpipe::common::type::StringX stream = L"relpipe";
 
 	virtual ~Configuration() {
 	}
--- a/src/MQTTHandler.h	Fri May 06 23:06:44 2022 +0200
+++ b/src/MQTTHandler.h	Sun May 08 21:42:25 2022 +0200
@@ -23,6 +23,7 @@
 #include <sstream>
 #include <locale>
 #include <codecvt>
+#include <random>
 
 #include <mosquittopp.h>
 
@@ -42,6 +43,25 @@
 private:
 	std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings.
 	Configuration configuration;
+	std::shared_ptr<mosqpp::mosquittopp> mq = std::make_shared<mosqpp::mosquittopp>(generateClientID().c_str());
+
+	/**
+	 * @return unique (random) client ID for MQTT to allow multiple simultaneous connections
+	 */
+	static std::string generateClientID() {
+		std::stringstream result;
+		// result << "relpipe-out-";
+		std::string symbols("0123456789abcdef");
+
+		std::random_device dev;
+		std::mt19937 rng(dev());
+		std::uniform_int_distribution<std::mt19937::result_type> dist6(0, symbols.size());
+
+		for (int i = 0; i < 8; i++) result << symbols[dist6(rng)];
+
+		// std::cerr << "generated clien ID = " << result.str() << std::endl;
+		return result.str();
+	}
 
 	struct CurrentRelation {
 		relpipe::common::type::StringX name;
@@ -53,16 +73,11 @@
 public:
 
 	MQTTHandler(Configuration configuration) : configuration(configuration) {
-
-		{
-			// TODO: remove
-			int major, minor, patch;
-			mosqpp::lib_version(&major, &minor, &patch);
-			std::cerr << "mosquitto version: " << major << "." << minor << "." << patch << std::endl;
-		}
+		mq->connect("localhost", 1883);
 	}
 
 	void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override {
+		// TODO: check relation name according to the configuration
 		currentRelation = CurrentRelation{name, attributes};
 	}
 
@@ -77,13 +92,15 @@
 		currentRelation.attributeIndex++;
 		if (currentRelation.attributeIndex == currentRelation.attributes.size()) {
 			currentRelation.attributeIndex = 0;
-			// FIXME: send the message
+			int mid = -1;
+			mq->publish(&mid, convertor.to_bytes(configuration.stream).c_str(), currentRelation.currentValue.size(), currentRelation.currentValue.c_str());
+			// std::cerr << "MQTT message enqueued: " << mid << std::endl;
 		}
 
 	}
 
 	void endOfPipe() {
-
+		mq->disconnect();
 	}
 
 };