src/MQTTHandler.h
branchv_0
changeset 2 1a0fbd17db13
parent 1 cb9577442d3b
child 3 62ced7c41c67
equal deleted inserted replaced
1:cb9577442d3b 2:1a0fbd17db13
    21 #include <vector>
    21 #include <vector>
    22 #include <iostream>
    22 #include <iostream>
    23 #include <sstream>
    23 #include <sstream>
    24 #include <locale>
    24 #include <locale>
    25 #include <codecvt>
    25 #include <codecvt>
       
    26 #include <random>
    26 
    27 
    27 #include <mosquittopp.h>
    28 #include <mosquittopp.h>
    28 
    29 
    29 #include <relpipe/common/type/typedefs.h>
    30 #include <relpipe/common/type/typedefs.h>
    30 #include <relpipe/reader/TypeId.h>
    31 #include <relpipe/reader/TypeId.h>
    40 
    41 
    41 class MQTTHandler : public relpipe::reader::handlers::RelationalReaderStringHandler {
    42 class MQTTHandler : public relpipe::reader::handlers::RelationalReaderStringHandler {
    42 private:
    43 private:
    43 	std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings.
    44 	std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings.
    44 	Configuration configuration;
    45 	Configuration configuration;
       
    46 	std::shared_ptr<mosqpp::mosquittopp> mq = std::make_shared<mosqpp::mosquittopp>(generateClientID().c_str());
       
    47 
       
    48 	/**
       
    49 	 * @return unique (random) client ID for MQTT to allow multiple simultaneous connections
       
    50 	 */
       
    51 	static std::string generateClientID() {
       
    52 		std::stringstream result;
       
    53 		// result << "relpipe-out-";
       
    54 		std::string symbols("0123456789abcdef");
       
    55 
       
    56 		std::random_device dev;
       
    57 		std::mt19937 rng(dev());
       
    58 		std::uniform_int_distribution<std::mt19937::result_type> dist6(0, symbols.size());
       
    59 
       
    60 		for (int i = 0; i < 8; i++) result << symbols[dist6(rng)];
       
    61 
       
    62 		// std::cerr << "generated clien ID = " << result.str() << std::endl;
       
    63 		return result.str();
       
    64 	}
    45 
    65 
    46 	struct CurrentRelation {
    66 	struct CurrentRelation {
    47 		relpipe::common::type::StringX name;
    67 		relpipe::common::type::StringX name;
    48 		std::vector<relpipe::reader::handlers::AttributeMetadata> attributes;
    68 		std::vector<relpipe::reader::handlers::AttributeMetadata> attributes;
    49 		relpipe::common::type::Integer attributeIndex = 0;
    69 		relpipe::common::type::Integer attributeIndex = 0;
    51 	} currentRelation;
    71 	} currentRelation;
    52 
    72 
    53 public:
    73 public:
    54 
    74 
    55 	MQTTHandler(Configuration configuration) : configuration(configuration) {
    75 	MQTTHandler(Configuration configuration) : configuration(configuration) {
    56 
    76 		mq->connect("localhost", 1883);
    57 		{
       
    58 			// TODO: remove
       
    59 			int major, minor, patch;
       
    60 			mosqpp::lib_version(&major, &minor, &patch);
       
    61 			std::cerr << "mosquitto version: " << major << "." << minor << "." << patch << std::endl;
       
    62 		}
       
    63 	}
    77 	}
    64 
    78 
    65 	void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override {
    79 	void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override {
       
    80 		// TODO: check relation name according to the configuration
    66 		currentRelation = CurrentRelation{name, attributes};
    81 		currentRelation = CurrentRelation{name, attributes};
    67 	}
    82 	}
    68 
    83 
    69 	void attribute(const relpipe::common::type::StringX& value) override {
    84 	void attribute(const relpipe::common::type::StringX& value) override {
    70 
    85 
    75 		else if (attributeName == L"data"); // keep empty or value from 'text'
    90 		else if (attributeName == L"data"); // keep empty or value from 'text'
    76 
    91 
    77 		currentRelation.attributeIndex++;
    92 		currentRelation.attributeIndex++;
    78 		if (currentRelation.attributeIndex == currentRelation.attributes.size()) {
    93 		if (currentRelation.attributeIndex == currentRelation.attributes.size()) {
    79 			currentRelation.attributeIndex = 0;
    94 			currentRelation.attributeIndex = 0;
    80 			// FIXME: send the message
    95 			int mid = -1;
       
    96 			mq->publish(&mid, convertor.to_bytes(configuration.stream).c_str(), currentRelation.currentValue.size(), currentRelation.currentValue.c_str());
       
    97 			// std::cerr << "MQTT message enqueued: " << mid << std::endl;
    81 		}
    98 		}
    82 
    99 
    83 	}
   100 	}
    84 
   101 
    85 	void endOfPipe() {
   102 	void endOfPipe() {
    86 
   103 		mq->disconnect();
    87 	}
   104 	}
    88 
   105 
    89 };
   106 };
    90 
   107 
    91 }
   108 }