parse connection string, credentials, check return values v_0 tip
authorFrantišek Kučera <franta-hg@frantovo.cz>
Sun, 05 Jun 2022 00:08:52 +0200
branchv_0
changeset 3 62ced7c41c67
parent 2 1a0fbd17db13
parse connection string, credentials, check return values
bash-completion.sh
src/CLIParser.h
src/Configuration.h
src/MQTTHandler.h
src/relpipe-out-mqtt.cpp
--- a/bash-completion.sh	Sun May 08 21:42:25 2022 +0200
+++ b/bash-completion.sh	Sun Jun 05 00:08:52 2022 +0200
@@ -22,22 +22,22 @@
 	w2=${COMP_WORDS[COMP_CWORD-2]}
 	w3=${COMP_WORDS[COMP_CWORD-3]}
 
-	DATA_TYPE=(
-		"string"
-		"integer"
-		"boolean"
-	)
-
-	BOOLEAN_VALUES=(
-		"true"
-		"false"
+	CONNECTION_OPTIONS=(
+		"username"
+		"password"
 	)
 
 	if   [[ "$w1" == "--relation"                      && "x$w0" == "x" ]];    then COMPREPLY=("''")
+	elif [[ "$w1" == "--connection-string"             && "x$w0" == "x" ]];    then COMPREPLY=("'mqtt://localhost:1883'")
+	elif [[ "$w1" == "--connection-option"                              ]];    then COMPREPLY=($(compgen -W "${CONNECTION_OPTIONS[*]}" -- "$w0"))
+	elif [[ "$w2" == "--connection-option"             && "x$w0" == "x" ]];    then COMPREPLY=("''")
 	elif [[ "$w1" == "--stream"                        && "x$w0" == "x" ]];    then COMPREPLY=("''")
 	else
 		OPTIONS=(
 			"--relation"
+			#"--connection-name"
+			"--connection-string"
+			"--connection-option"
 			"--stream"
 		)
 		COMPREPLY=($(compgen -W "${OPTIONS[*]}" -- "$w0"))
--- a/src/CLIParser.h	Sun May 08 21:42:25 2022 +0200
+++ b/src/CLIParser.h	Sun Jun 05 00:08:52 2022 +0200
@@ -41,6 +41,8 @@
 
 	static const relpipe::common::type::StringX OPTION_RELATION;
 	static const relpipe::common::type::StringX OPTION_STREAM;
+	static const relpipe::common::type::StringX OPTION_CONNECTION_STRING;
+	static const relpipe::common::type::StringX OPTION_CONNECTION_OPTION;
 
 	Configuration parse(const std::vector<relpipe::common::type::StringX>& arguments) {
 		Configuration c;
@@ -52,6 +54,12 @@
 				c.relation = readNext(arguments, i);
 			} else if (option == OPTION_STREAM) {
 				c.stream = readNext(arguments, i);
+			} else if (option == OPTION_CONNECTION_STRING) {
+				c.connectionString = readNext(arguments, i);
+			} else if (option == OPTION_CONNECTION_OPTION) {
+				auto name = readNext(arguments, i);
+				auto value = readNext(arguments, i);
+				c.connectionOptions.push_back({name, value});
 			} else throw relpipe::cli::RelpipeCLIException(L"Unsupported CLI option: " + option, relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS);
 		}
 
@@ -64,6 +72,8 @@
 
 const relpipe::common::type::StringX CLIParser::OPTION_RELATION = L"--relation";
 const relpipe::common::type::StringX CLIParser::OPTION_STREAM = L"--stream";
+const relpipe::common::type::StringX CLIParser::OPTION_CONNECTION_STRING = L"--connection-string";
+const relpipe::common::type::StringX CLIParser::OPTION_CONNECTION_OPTION = L"--connection-option";
 
 }
 }
--- a/src/Configuration.h	Sun May 08 21:42:25 2022 +0200
+++ b/src/Configuration.h	Sun Jun 05 00:08:52 2022 +0200
@@ -29,9 +29,24 @@
 class Configuration {
 public:
 
+	class ConnectionOption {
+	public:
+		relpipe::common::type::StringX name;
+		relpipe::common::type::StringX value;
+
+		ConnectionOption() = default;
+
+		ConnectionOption(relpipe::common::type::StringX name, relpipe::common::type::StringX value) : name(name), value(value) {
+		}
+
+
+	};
+
 	relpipe::common::type::StringX relation = L"message";
 	relpipe::common::type::StringX stream = L"relpipe";
-
+	relpipe::common::type::StringX connectionString = L"mqtt://localhost:1883";
+	std::vector<ConnectionOption> connectionOptions;
+	
 	virtual ~Configuration() {
 	}
 };
--- a/src/MQTTHandler.h	Sun May 08 21:42:25 2022 +0200
+++ b/src/MQTTHandler.h	Sun Jun 05 00:08:52 2022 +0200
@@ -24,6 +24,7 @@
 #include <locale>
 #include <codecvt>
 #include <random>
+#include <regex>
 
 #include <mosquittopp.h>
 
@@ -50,16 +51,14 @@
 	 */
 	static std::string generateClientID() {
 		std::stringstream result;
-		// result << "relpipe-out-";
 		std::string symbols("0123456789abcdef");
 
 		std::random_device dev;
 		std::mt19937 rng(dev());
-		std::uniform_int_distribution<std::mt19937::result_type> dist6(0, symbols.size());
+		std::uniform_int_distribution<std::mt19937::result_type> dist(0, symbols.size());
 
-		for (int i = 0; i < 8; i++) result << symbols[dist6(rng)];
+		for (int i = 0; i < 8; i++) result << symbols[dist(rng)];
 
-		// std::cerr << "generated clien ID = " << result.str() << std::endl;
 		return result.str();
 	}
 
@@ -70,10 +69,45 @@
 		std::string currentValue;
 	} currentRelation;
 
+	static void parseConnectionString(const std::string& connectionString, std::string& hostname, int& port) {
+		std::regex pattern("mqtt:(//)?([^:]+)(:([0-9]+))?");
+		std::smatch match;
+		if (std::regex_match(connectionString, match, pattern)) {
+			hostname = match[2];
+			port = stoi(match[4]);
+		} else {
+			throw std::invalid_argument("Invalid connection string format. Expecting something like: mqtt://localhost:1883");
+		}
+	}
+
+	static void check(std::string operation, int result) {
+		if (result) throw std::logic_error("mosquitto operation failed: " + operation + " = " + std::to_string(result));
+	}
+
 public:
 
-	MQTTHandler(Configuration configuration) : configuration(configuration) {
-		mq->connect("localhost", 1883);
+	static MQTTHandler* create(Configuration configuration) {
+		MQTTHandler* h = new MQTTHandler();
+
+		std::string connectionString = h->convertor.to_bytes(configuration.connectionString);
+		std::string username;
+		std::string password;
+		std::string hostname;
+		int port;
+
+		parseConnectionString(connectionString, hostname, port);
+
+		for (auto o : configuration.connectionOptions) {
+			if (o.name == L"username") username = h->convertor.to_bytes(o.value);
+			else if (o.name == L"password") password = h->convertor.to_bytes(o.value);
+			else throw std::invalid_argument("Unsupported connection option: " + h->convertor.to_bytes(o.name));
+		}
+
+		if (username.size()) check("set credentials", h->mq->username_pw_set(username.c_str(), password.c_str()));
+
+		check("connect", h->mq->connect(hostname.c_str(), port));
+
+		return h;
 	}
 
 	void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override {
@@ -93,14 +127,14 @@
 		if (currentRelation.attributeIndex == currentRelation.attributes.size()) {
 			currentRelation.attributeIndex = 0;
 			int mid = -1;
-			mq->publish(&mid, convertor.to_bytes(configuration.stream).c_str(), currentRelation.currentValue.size(), currentRelation.currentValue.c_str());
+			check("publish", mq->publish(&mid, convertor.to_bytes(configuration.stream).c_str(), currentRelation.currentValue.size(), currentRelation.currentValue.c_str()));
 			// std::cerr << "MQTT message enqueued: " << mid << std::endl;
 		}
 
 	}
 
 	void endOfPipe() {
-		mq->disconnect();
+		check("disconnect", mq->disconnect());
 	}
 
 };
--- a/src/relpipe-out-mqtt.cpp	Sun May 08 21:42:25 2022 +0200
+++ b/src/relpipe-out-mqtt.cpp	Sun Jun 05 00:08:52 2022 +0200
@@ -44,8 +44,8 @@
 		CLIParser cliParser;
 		Configuration configuration = cliParser.parse(cli.arguments());
 		std::shared_ptr<RelationalReader> reader(Factory::create(std::cin));
-		MQTTHandler handler(configuration);
-		reader->addHandler(&handler);
+		std::shared_ptr<MQTTHandler> handler(MQTTHandler::create(configuration));
+		reader->addHandler(handler.get());
 		reader->process();
 		resultCode = CLI::EXIT_CODE_SUCCESS;
 	} catch (RelpipeCLIException e) {