src/MQTTHandler.h
branchv_0
changeset 3 62ced7c41c67
parent 2 1a0fbd17db13
equal deleted inserted replaced
2:1a0fbd17db13 3:62ced7c41c67
    22 #include <iostream>
    22 #include <iostream>
    23 #include <sstream>
    23 #include <sstream>
    24 #include <locale>
    24 #include <locale>
    25 #include <codecvt>
    25 #include <codecvt>
    26 #include <random>
    26 #include <random>
       
    27 #include <regex>
    27 
    28 
    28 #include <mosquittopp.h>
    29 #include <mosquittopp.h>
    29 
    30 
    30 #include <relpipe/common/type/typedefs.h>
    31 #include <relpipe/common/type/typedefs.h>
    31 #include <relpipe/reader/TypeId.h>
    32 #include <relpipe/reader/TypeId.h>
    48 	/**
    49 	/**
    49 	 * @return unique (random) client ID for MQTT to allow multiple simultaneous connections
    50 	 * @return unique (random) client ID for MQTT to allow multiple simultaneous connections
    50 	 */
    51 	 */
    51 	static std::string generateClientID() {
    52 	static std::string generateClientID() {
    52 		std::stringstream result;
    53 		std::stringstream result;
    53 		// result << "relpipe-out-";
       
    54 		std::string symbols("0123456789abcdef");
    54 		std::string symbols("0123456789abcdef");
    55 
    55 
    56 		std::random_device dev;
    56 		std::random_device dev;
    57 		std::mt19937 rng(dev());
    57 		std::mt19937 rng(dev());
    58 		std::uniform_int_distribution<std::mt19937::result_type> dist6(0, symbols.size());
    58 		std::uniform_int_distribution<std::mt19937::result_type> dist(0, symbols.size());
    59 
    59 
    60 		for (int i = 0; i < 8; i++) result << symbols[dist6(rng)];
    60 		for (int i = 0; i < 8; i++) result << symbols[dist(rng)];
    61 
    61 
    62 		// std::cerr << "generated clien ID = " << result.str() << std::endl;
       
    63 		return result.str();
    62 		return result.str();
    64 	}
    63 	}
    65 
    64 
	/**
	 * Working state for the relation that is currently being processed.
	 * It is replaced by a fresh value each time startRelation() is called.
	 */
	struct CurrentRelation {
		// relation name as received in startRelation()
		relpipe::common::type::StringX name;
		// attribute metadata as received in startRelation()
		std::vector<relpipe::reader::handlers::AttributeMetadata> attributes;
		// position within the current record; reset to 0 once it reaches attributes.size()
		relpipe::common::type::Integer attributeIndex = 0;
		// payload that is published as the MQTT message when a record is complete
		std::string currentValue;
	} currentRelation;
    72 
    71 
       
    72 	static void parseConnectionString(const std::string& connectionString, std::string& hostname, int& port) {
       
    73 		std::regex pattern("mqtt:(//)?([^:]+)(:([0-9]+))?");
       
    74 		std::smatch match;
       
    75 		if (std::regex_match(connectionString, match, pattern)) {
       
    76 			hostname = match[2];
       
    77 			port = stoi(match[4]);
       
    78 		} else {
       
    79 			throw std::invalid_argument("Invalid connection string format. Expecting something like: mqtt://localhost:1883");
       
    80 		}
       
    81 	}
       
    82 
       
    83 	static void check(std::string operation, int result) {
       
    84 		if (result) throw std::logic_error("mosquitto operation failed: " + operation + " = " + std::to_string(result));
       
    85 	}
       
    86 
    73 public:
    87 public:
    74 
    88 
    75 	MQTTHandler(Configuration configuration) : configuration(configuration) {
    89 	static MQTTHandler* create(Configuration configuration) {
    76 		mq->connect("localhost", 1883);
    90 		MQTTHandler* h = new MQTTHandler();
       
    91 
       
    92 		std::string connectionString = h->convertor.to_bytes(configuration.connectionString);
       
    93 		std::string username;
       
    94 		std::string password;
       
    95 		std::string hostname;
       
    96 		int port;
       
    97 
       
    98 		parseConnectionString(connectionString, hostname, port);
       
    99 
       
   100 		for (auto o : configuration.connectionOptions) {
       
   101 			if (o.name == L"username") username = h->convertor.to_bytes(o.value);
       
   102 			else if (o.name == L"password") password = h->convertor.to_bytes(o.value);
       
   103 			else throw std::invalid_argument("Unsupported connection option: " + h->convertor.to_bytes(o.name));
       
   104 		}
       
   105 
       
   106 		if (username.size()) check("set credentials", h->mq->username_pw_set(username.c_str(), password.c_str()));
       
   107 
       
   108 		check("connect", h->mq->connect(hostname.c_str(), port));
       
   109 
       
   110 		return h;
    77 	}
   111 	}
    78 
   112 
    79 	void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override {
   113 	void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override {
    80 		// TODO: check relation name according to the configuration
   114 		// TODO: check relation name according to the configuration
    81 		currentRelation = CurrentRelation{name, attributes};
   115 		currentRelation = CurrentRelation{name, attributes};
    91 
   125 
    92 		currentRelation.attributeIndex++;
   126 		currentRelation.attributeIndex++;
    93 		if (currentRelation.attributeIndex == currentRelation.attributes.size()) {
   127 		if (currentRelation.attributeIndex == currentRelation.attributes.size()) {
    94 			currentRelation.attributeIndex = 0;
   128 			currentRelation.attributeIndex = 0;
    95 			int mid = -1;
   129 			int mid = -1;
    96 			mq->publish(&mid, convertor.to_bytes(configuration.stream).c_str(), currentRelation.currentValue.size(), currentRelation.currentValue.c_str());
   130 			check("publish", mq->publish(&mid, convertor.to_bytes(configuration.stream).c_str(), currentRelation.currentValue.size(), currentRelation.currentValue.c_str()));
    97 			// std::cerr << "MQTT message enqueued: " << mid << std::endl;
   131 			// std::cerr << "MQTT message enqueued: " << mid << std::endl;
    98 		}
   132 		}
    99 
   133 
   100 	}
   134 	}
   101 
   135 
   102 	void endOfPipe() {
   136 	void endOfPipe() {
   103 		mq->disconnect();
   137 		check("disconnect", mq->disconnect());
   104 	}
   138 	}
   105 
   139 
   106 };
   140 };
   107 
   141 
   108 }
   142 }