src/MQTTHandler.h
branchv_0
changeset 3 62ced7c41c67
parent 2 1a0fbd17db13
--- a/src/MQTTHandler.h	Sun May 08 21:42:25 2022 +0200
+++ b/src/MQTTHandler.h	Sun Jun 05 00:08:52 2022 +0200
@@ -24,6 +24,7 @@
 #include <locale>
 #include <codecvt>
 #include <random>
+#include <regex>
 
 #include <mosquittopp.h>
 
@@ -50,16 +51,14 @@
 	 */
 	static std::string generateClientID() {
 		std::stringstream result;
-		// result << "relpipe-out-";
 		std::string symbols("0123456789abcdef");
 
 		std::random_device dev;
 		std::mt19937 rng(dev());
-		std::uniform_int_distribution<std::mt19937::result_type> dist6(0, symbols.size());
+		std::uniform_int_distribution<std::mt19937::result_type> dist(0, symbols.size());
 
-		for (int i = 0; i < 8; i++) result << symbols[dist6(rng)];
+		for (int i = 0; i < 8; i++) result << symbols[dist(rng)];
 
-		// std::cerr << "generated clien ID = " << result.str() << std::endl;
 		return result.str();
 	}
 
@@ -70,10 +69,45 @@
 		std::string currentValue;
 	} currentRelation;
 
+	static void parseConnectionString(const std::string& connectionString, std::string& hostname, int& port) {
+		std::regex pattern("mqtt:(//)?([^:]+)(:([0-9]+))?");
+		std::smatch match;
+		if (std::regex_match(connectionString, match, pattern)) {
+			hostname = match[2];
+			port = stoi(match[4]);
+		} else {
+			throw std::invalid_argument("Invalid connection string format. Expecting something like: mqtt://localhost:1883");
+		}
+	}
+
+	static void check(std::string operation, int result) {
+		if (result) throw std::logic_error("mosquitto operation failed: " + operation + " = " + std::to_string(result));
+	}
+
 public:
 
-	MQTTHandler(Configuration configuration) : configuration(configuration) {
-		mq->connect("localhost", 1883);
+	static MQTTHandler* create(Configuration configuration) {
+		MQTTHandler* h = new MQTTHandler();
+
+		std::string connectionString = h->convertor.to_bytes(configuration.connectionString);
+		std::string username;
+		std::string password;
+		std::string hostname;
+		int port;
+
+		parseConnectionString(connectionString, hostname, port);
+
+		for (auto o : configuration.connectionOptions) {
+			if (o.name == L"username") username = h->convertor.to_bytes(o.value);
+			else if (o.name == L"password") password = h->convertor.to_bytes(o.value);
+			else throw std::invalid_argument("Unsupported connection option: " + h->convertor.to_bytes(o.name));
+		}
+
+		if (username.size()) check("set credentials", h->mq->username_pw_set(username.c_str(), password.c_str()));
+
+		check("connect", h->mq->connect(hostname.c_str(), port));
+
+		return h;
 	}
 
 	void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override {
@@ -93,14 +127,14 @@
 		if (currentRelation.attributeIndex == currentRelation.attributes.size()) {
 			currentRelation.attributeIndex = 0;
 			int mid = -1;
-			mq->publish(&mid, convertor.to_bytes(configuration.stream).c_str(), currentRelation.currentValue.size(), currentRelation.currentValue.c_str());
+			check("publish", mq->publish(&mid, convertor.to_bytes(configuration.stream).c_str(), currentRelation.currentValue.size(), currentRelation.currentValue.c_str()));
 			// std::cerr << "MQTT message enqueued: " << mid << std::endl;
 		}
 
 	}
 
 	void endOfPipe() {
-		mq->disconnect();
+		check("disconnect", mq->disconnect());
 	}
 
 };