parse connection string, credentials, check return values v_0 tip
authorFrantišek Kučera <franta-hg@frantovo.cz>
Sun, 05 Jun 2022 22:51:45 +0200
branchv_0
changeset 3 610783d70ae9
parent 2 0799eaf338b9
parse connection string, credentials, check return values
bash-completion.sh
src/CLIParser.h
src/Configuration.h
src/MQTTCommand.cpp
src/MQTTCommand.h
--- a/bash-completion.sh	Sun May 08 21:42:14 2022 +0200
+++ b/bash-completion.sh	Sun Jun 05 22:51:45 2022 +0200
@@ -22,23 +22,23 @@
 	w2=${COMP_WORDS[COMP_CWORD-2]}
 	w3=${COMP_WORDS[COMP_CWORD-3]}
 
-	DATA_TYPE=(
-		"string"
-		"integer"
-		"boolean"
-	)
-
-	BOOLEAN_VALUES=(
-		"true"
-		"false"
+	CONNECTION_OPTIONS=(
+		"username"
+		"password"
 	)
 
 	if   [[ "$w1" == "--relation"                      && "x$w0" == "x" ]];    then COMPREPLY=("''")
+	elif [[ "$w1" == "--connection-string"             && "x$w0" == "x" ]];    then COMPREPLY=("'mqtt://localhost:1883'")
+	elif [[ "$w1" == "--connection-option"                              ]];    then COMPREPLY=($(compgen -W "${CONNECTION_OPTIONS[*]}" -- "$w0"))
+	elif [[ "$w2" == "--connection-option"             && "x$w0" == "x" ]];    then COMPREPLY=("''")
 	elif [[ "$w1" == "--stream"                        && "x$w0" == "x" ]];    then COMPREPLY=("''")
 	elif [[ "$w1" == "--message-count"                 && "x$w0" == "x" ]];    then COMPREPLY=("1")
 	else
 		OPTIONS=(
 			"--relation"
+			#"--connection-name"
+			"--connection-string"
+			"--connection-option"
 			"--stream"
 			"--message-count"
 		)
--- a/src/CLIParser.h	Sun May 08 21:42:14 2022 +0200
+++ b/src/CLIParser.h	Sun Jun 05 22:51:45 2022 +0200
@@ -19,7 +19,7 @@
 #include <vector>
 #include <iostream>
 
-#include <relpipe/writer/typedefs.h>
+#include <relpipe/common/type/typedefs.h>
 #include <relpipe/cli/CLI.h>
 #include <relpipe/cli/RelpipeCLIException.h>
 
@@ -32,27 +32,35 @@
 class CLIParser {
 private:
 
-	relpipe::writer::string_t readNext(const std::vector<relpipe::writer::string_t>& arguments, int& i) {
+	relpipe::common::type::StringX readNext(const std::vector<relpipe::common::type::StringX>& arguments, int& i) {
 		if (i < arguments.size()) return arguments[i++];
 		else throw relpipe::cli::RelpipeCLIException(L"Missing CLI argument" + (i > 0 ? (L" after " + arguments[i - 1]) : L""), relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS);
 	}
 
 public:
 
-	static const relpipe::writer::string_t OPTION_RELATION;
-	static const relpipe::writer::string_t OPTION_STREAM;
-	static const relpipe::writer::string_t OPTION_MESSAGE_COUNT;
+	static const relpipe::common::type::StringX OPTION_RELATION;
+	static const relpipe::common::type::StringX OPTION_STREAM;
+	static const relpipe::common::type::StringX OPTION_CONNECTION_STRING;
+	static const relpipe::common::type::StringX OPTION_CONNECTION_OPTION;
+	static const relpipe::common::type::StringX OPTION_MESSAGE_COUNT;
 
-	Configuration parse(const std::vector<relpipe::writer::string_t>& arguments) {
+	Configuration parse(const std::vector<relpipe::common::type::StringX>& arguments) {
 		Configuration c;
 
 		for (int i = 0; i < arguments.size();) {
-			relpipe::writer::string_t option = readNext(arguments, i);
+			relpipe::common::type::StringX option = readNext(arguments, i);
 
 			if (option == OPTION_RELATION) {
 				c.relation = readNext(arguments, i);
 			} else if (option == OPTION_STREAM) {
 				c.stream = readNext(arguments, i);
+			} else if (option == OPTION_CONNECTION_STRING) {
+				c.connectionString = readNext(arguments, i);
+			} else if (option == OPTION_CONNECTION_OPTION) {
+				auto name = readNext(arguments, i);
+				auto value = readNext(arguments, i);
+				c.connectionOptions.push_back({name, value});
 			} else if (option == OPTION_MESSAGE_COUNT) {
 				c.messageCount = std::stoull(readNext(arguments, i));
 			} else throw relpipe::cli::RelpipeCLIException(L"Unsupported CLI option: " + option, relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS);
@@ -65,9 +73,11 @@
 	}
 };
 
-const relpipe::writer::string_t CLIParser::OPTION_RELATION = L"--relation";
-const relpipe::writer::string_t CLIParser::OPTION_STREAM = L"--stream";
-const relpipe::writer::string_t CLIParser::OPTION_MESSAGE_COUNT = L"--message-count";
+const relpipe::common::type::StringX CLIParser::OPTION_RELATION = L"--relation";
+const relpipe::common::type::StringX CLIParser::OPTION_STREAM = L"--stream";
+const relpipe::common::type::StringX CLIParser::OPTION_CONNECTION_STRING = L"--connection-string";
+const relpipe::common::type::StringX CLIParser::OPTION_CONNECTION_OPTION = L"--connection-option";
+const relpipe::common::type::StringX CLIParser::OPTION_MESSAGE_COUNT = L"--message-count";
 
 }
 }
--- a/src/Configuration.h	Sun May 08 21:42:14 2022 +0200
+++ b/src/Configuration.h	Sun Jun 05 22:51:45 2022 +0200
@@ -29,14 +29,29 @@
 class Configuration {
 public:
 
+	class ConnectionOption {
+	public:
+		relpipe::common::type::StringX name;
+		relpipe::common::type::StringX value;
+
+		ConnectionOption() = default;
+
+		ConnectionOption(relpipe::common::type::StringX name, relpipe::common::type::StringX value) : name(name), value(value) {
+		}
+
+
+	};
+
 	relpipe::common::type::Integer messageCount = 1;
 	relpipe::common::type::StringX relation = L"message";
 	relpipe::common::type::StringX stream = L"relpipe";
-
+	relpipe::common::type::StringX connectionString = L"mqtt://localhost:1883";
+	std::vector<ConnectionOption> connectionOptions;
+	
 	virtual ~Configuration() {
 	}
 };
 
 }
 }
-}
\ No newline at end of file
+}
--- a/src/MQTTCommand.cpp	Sun May 08 21:42:14 2022 +0200
+++ b/src/MQTTCommand.cpp	Sun Jun 05 22:51:45 2022 +0200
@@ -24,6 +24,7 @@
 #include <sstream>
 #include <iomanip>
 #include <random>
+#include <regex>
 
 #include <mosquittopp.h>
 
@@ -57,16 +58,14 @@
 	 */
 	static std::string generateClientID() {
 		std::stringstream result;
-		// result << "relpipe-in-";
 		std::string symbols("0123456789abcdef");
 
 		std::random_device dev;
 		std::mt19937 rng(dev());
-		std::uniform_int_distribution<std::mt19937::result_type> dist6(0, symbols.size());
+		std::uniform_int_distribution<std::mt19937::result_type> dist(0, symbols.size());
 
-		for (int i = 0; i < 8; i++) result << symbols[dist6(rng)];
+		for (int i = 0; i < 8; i++) result << symbols[dist(rng)];
 
-		// std::cerr << "generated clien ID = " << result.str() << std::endl;
 		return result.str();
 	}
 public:
@@ -91,6 +90,21 @@
 
 };
 
+void MQTTCommand::parseConnectionString(const std::string& connectionString, std::string& hostname, int& port) {
+	std::regex pattern("mqtt:(//)?([^:]+)(:([0-9]+))?");
+	std::smatch match;
+	if (std::regex_match(connectionString, match, pattern)) {
+		hostname = match[2];
+		port = stoi(match[4]);
+	} else {
+		throw std::invalid_argument("Invalid connection string format. Expecting something like: mqtt://localhost:1883");
+	}
+}
+
+void MQTTCommand::check(std::string operation, int result) {
+	if (result) throw std::logic_error("mosquitto operation failed: " + operation + " = " + std::to_string(result));
+}
+
 void MQTTCommand::process(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration) {
 	std::shared_ptr<MQTTClient> mq = std::make_shared<MQTTClient>(writer, configuration);
 
@@ -100,17 +114,33 @@
 		{L"data", TypeId::STRING}
 	}, true);
 
-	mq->max_inflight_messages_set(1);
-	mq->connect("localhost", 1883);
+	std::string connectionString = convertor.to_bytes(configuration.connectionString);
+	std::string username;
+	std::string password;
+	std::string hostname;
+	int port;
+
+	parseConnectionString(connectionString, hostname, port);
+
+	for (auto o : configuration.connectionOptions) {
+		if (o.name == L"username") username = convertor.to_bytes(o.value);
+		else if (o.name == L"password") password = convertor.to_bytes(o.value);
+		else throw std::invalid_argument("Unsupported connection option: " + convertor.to_bytes(o.name));
+	}
+
+	if (username.size()) check("set credentials", mq->username_pw_set(username.c_str(), password.c_str()));
+
+	check("set maximum inflight messages", mq->max_inflight_messages_set(1));
+	check("connect", mq->connect(hostname.c_str(), port));
 	int mid;
-	mq->subscribe(&mid, convertor.to_bytes(configuration.stream).c_str());
+	check("substcribe", mq->subscribe(&mid, convertor.to_bytes(configuration.stream).c_str()));
 
 	//for (int i = configuration.messageCount; continueProcessing && i > 0; i--) {
 	for (int i = configuration.messageCount; continueProcessing && i > 0; i = i - mq->popMessageCount()) {
 		// std::cerr << "loop(): i=" << i << std::endl;
 
 		//mq->loop();
-		mq->loop(1000, 1);
+		check("loop", mq->loop(1000, 1));
 		//mq->loop(1000, -1);
 		//mq->loop_forever();
 		//mq->loop_write();
--- a/src/MQTTCommand.h	Sun May 08 21:42:14 2022 +0200
+++ b/src/MQTTCommand.h	Sun Jun 05 22:51:45 2022 +0200
@@ -34,6 +34,8 @@
 private:
 	std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings.
 	std::atomic<bool> continueProcessing{true};
+	static void parseConnectionString(const std::string& connectionString, std::string& hostname, int& port);
+	static void check(std::string operation, int result);
 public:
 	virtual ~MQTTCommand();