src/MQTTCommand.cpp
branchv_0
changeset 3 610783d70ae9
parent 2 0799eaf338b9
equal deleted inserted replaced
2:0799eaf338b9 3:610783d70ae9
    22 #include <algorithm>
    22 #include <algorithm>
    23 #include <unistd.h>
    23 #include <unistd.h>
    24 #include <sstream>
    24 #include <sstream>
    25 #include <iomanip>
    25 #include <iomanip>
    26 #include <random>
    26 #include <random>
       
    27 #include <regex>
    27 
    28 
    28 #include <mosquittopp.h>
    29 #include <mosquittopp.h>
    29 
    30 
    30 #include <relpipe/writer/RelationalWriter.h>
    31 #include <relpipe/writer/RelationalWriter.h>
    31 #include <relpipe/writer/RelpipeWriterException.h>
    32 #include <relpipe/writer/RelpipeWriterException.h>
    55 	/**
    56 	/**
    56 	 * @return unique (random) client ID for MQTT to allow multiple simultaneous connections
    57 	 * @return unique (random) client ID for MQTT to allow multiple simultaneous connections
    57 	 */
    58 	 */
    58 	static std::string generateClientID() {
    59 	static std::string generateClientID() {
    59 		std::stringstream result;
    60 		std::stringstream result;
    60 		// result << "relpipe-in-";
       
    61 		std::string symbols("0123456789abcdef");
    61 		std::string symbols("0123456789abcdef");
    62 
    62 
    63 		std::random_device dev;
    63 		std::random_device dev;
    64 		std::mt19937 rng(dev());
    64 		std::mt19937 rng(dev());
    65 		std::uniform_int_distribution<std::mt19937::result_type> dist6(0, symbols.size());
    65 		std::uniform_int_distribution<std::mt19937::result_type> dist(0, symbols.size());
    66 
    66 
    67 		for (int i = 0; i < 8; i++) result << symbols[dist6(rng)];
    67 		for (int i = 0; i < 8; i++) result << symbols[dist(rng)];
    68 
    68 
    69 		// std::cerr << "generated clien ID = " << result.str() << std::endl;
       
    70 		return result.str();
    69 		return result.str();
    71 	}
    70 	}
    72 public:
    71 public:
    73 
    72 
    74 	MQTTClient(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration) : mosqpp::mosquittopp((generateClientID()).c_str()), writer(writer), configuration(configuration) {
    73 	MQTTClient(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration) : mosqpp::mosquittopp((generateClientID()).c_str()), writer(writer), configuration(configuration) {
    89 		return count;
    88 		return count;
    90 	}
    89 	}
    91 
    90 
    92 };
    91 };
    93 
    92 
       
    93 void MQTTCommand::parseConnectionString(const std::string& connectionString, std::string& hostname, int& port) {
       
    94 	std::regex pattern("mqtt:(//)?([^:]+)(:([0-9]+))?");
       
    95 	std::smatch match;
       
    96 	if (std::regex_match(connectionString, match, pattern)) {
       
    97 		hostname = match[2];
       
    98 		port = stoi(match[4]);
       
    99 	} else {
       
   100 		throw std::invalid_argument("Invalid connection string format. Expecting something like: mqtt://localhost:1883");
       
   101 	}
       
   102 }
       
   103 
       
   104 void MQTTCommand::check(std::string operation, int result) {
       
   105 	if (result) throw std::logic_error("mosquitto operation failed: " + operation + " = " + std::to_string(result));
       
   106 }
       
   107 
    94 void MQTTCommand::process(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration) {
   108 void MQTTCommand::process(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration) {
    95 	std::shared_ptr<MQTTClient> mq = std::make_shared<MQTTClient>(writer, configuration);
   109 	std::shared_ptr<MQTTClient> mq = std::make_shared<MQTTClient>(writer, configuration);
    96 
   110 
    97 	writer->startRelation(configuration.relation,{
   111 	writer->startRelation(configuration.relation,{
    98 		{L"stream", TypeId::STRING},
   112 		{L"stream", TypeId::STRING},
    99 		{L"text", TypeId::STRING},
   113 		{L"text", TypeId::STRING},
   100 		{L"data", TypeId::STRING}
   114 		{L"data", TypeId::STRING}
   101 	}, true);
   115 	}, true);
   102 
   116 
   103 	mq->max_inflight_messages_set(1);
   117 	std::string connectionString = convertor.to_bytes(configuration.connectionString);
   104 	mq->connect("localhost", 1883);
   118 	std::string username;
       
   119 	std::string password;
       
   120 	std::string hostname;
       
   121 	int port;
       
   122 
       
   123 	parseConnectionString(connectionString, hostname, port);
       
   124 
       
   125 	for (auto o : configuration.connectionOptions) {
       
   126 		if (o.name == L"username") username = convertor.to_bytes(o.value);
       
   127 		else if (o.name == L"password") password = convertor.to_bytes(o.value);
       
   128 		else throw std::invalid_argument("Unsupported connection option: " + convertor.to_bytes(o.name));
       
   129 	}
       
   130 
       
   131 	if (username.size()) check("set credentials", mq->username_pw_set(username.c_str(), password.c_str()));
       
   132 
       
   133 	check("set maximum inflight messages", mq->max_inflight_messages_set(1));
       
   134 	check("connect", mq->connect(hostname.c_str(), port));
   105 	int mid;
   135 	int mid;
   106 	mq->subscribe(&mid, convertor.to_bytes(configuration.stream).c_str());
   136 	check("substcribe", mq->subscribe(&mid, convertor.to_bytes(configuration.stream).c_str()));
   107 
   137 
   108 	//for (int i = configuration.messageCount; continueProcessing && i > 0; i--) {
   138 	//for (int i = configuration.messageCount; continueProcessing && i > 0; i--) {
   109 	for (int i = configuration.messageCount; continueProcessing && i > 0; i = i - mq->popMessageCount()) {
   139 	for (int i = configuration.messageCount; continueProcessing && i > 0; i = i - mq->popMessageCount()) {
   110 		// std::cerr << "loop(): i=" << i << std::endl;
   140 		// std::cerr << "loop(): i=" << i << std::endl;
   111 
   141 
   112 		//mq->loop();
   142 		//mq->loop();
   113 		mq->loop(1000, 1);
   143 		check("loop", mq->loop(1000, 1));
   114 		//mq->loop(1000, -1);
   144 		//mq->loop(1000, -1);
   115 		//mq->loop_forever();
   145 		//mq->loop_forever();
   116 		//mq->loop_write();
   146 		//mq->loop_write();
   117 	}
   147 	}
   118 
   148