src/MQTTHandler.h
branchv_0
changeset 1 cb9577442d3b
parent 0 7ef5ce9477c8
child 2 1a0fbd17db13
equal deleted inserted replaced
0:7ef5ce9477c8 1:cb9577442d3b
    22 #include <iostream>
    22 #include <iostream>
    23 #include <sstream>
    23 #include <sstream>
    24 #include <locale>
    24 #include <locale>
    25 #include <codecvt>
    25 #include <codecvt>
    26 
    26 
       
    27 #include <mosquittopp.h>
       
    28 
    27 #include <relpipe/common/type/typedefs.h>
    29 #include <relpipe/common/type/typedefs.h>
    28 #include <relpipe/reader/TypeId.h>
    30 #include <relpipe/reader/TypeId.h>
    29 #include <relpipe/reader/handlers/RelationalReaderStringHandler.h>
    31 #include <relpipe/reader/handlers/RelationalReaderStringHandler.h>
    30 #include <relpipe/reader/handlers/AttributeMetadata.h>
    32 #include <relpipe/reader/handlers/AttributeMetadata.h>
    31 
    33 
    32 #include "MQTT.h"
       
    33 #include "Configuration.h"
    34 #include "Configuration.h"
    34 #include "Hex.h"
    35 #include "Hex.h"
    35 
    36 
    36 namespace relpipe {
    37 namespace relpipe {
    37 namespace out {
    38 namespace out {
    39 
    40 
    40 class MQTTHandler : public relpipe::reader::handlers::RelationalReaderStringHandler {
    41 class MQTTHandler : public relpipe::reader::handlers::RelationalReaderStringHandler {
    41 private:
    42 private:
    42 	std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings.
    43 	std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings.
    43 	Configuration configuration;
    44 	Configuration configuration;
    44 	shared_ptr<MQTT> mq;
       
    45 
    45 
    46 	struct CurrentRelation {
    46 	struct CurrentRelation {
    47 		relpipe::common::type::StringX name;
    47 		relpipe::common::type::StringX name;
    48 		std::vector<relpipe::reader::handlers::AttributeMetadata> attributes;
    48 		std::vector<relpipe::reader::handlers::AttributeMetadata> attributes;
    49 		relpipe::common::type::Integer attributeIndex = 0;
    49 		relpipe::common::type::Integer attributeIndex = 0;
    51 	} currentRelation;
    51 	} currentRelation;
    52 
    52 
    53 public:
    53 public:
    54 
    54 
    55 	MQTTHandler(Configuration configuration) : configuration(configuration) {
    55 	MQTTHandler(Configuration configuration) : configuration(configuration) {
    56 		// TODO: do not throw exception from the constructor: MQTT::open()
    56 
    57 		mq.reset(MQTT::open(convertor.to_bytes(configuration.queue), configuration.unlinkOnClose));
    57 		{
       
    58 			// TODO: remove
       
    59 			int major, minor, patch;
       
    60 			mosqpp::lib_version(&major, &minor, &patch);
       
    61 			std::cerr << "mosquitto version: " << major << "." << minor << "." << patch << std::endl;
       
    62 		}
    58 	}
    63 	}
    59 
    64 
    60 	void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override {
    65 	void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override {
    61 		currentRelation = CurrentRelation{name, attributes};
    66 		currentRelation = CurrentRelation{name, attributes};
    62 	}
    67 	}
    70 		else if (attributeName == L"data"); // keep empty or value from 'text'
    75 		else if (attributeName == L"data"); // keep empty or value from 'text'
    71 
    76 
    72 		currentRelation.attributeIndex++;
    77 		currentRelation.attributeIndex++;
    73 		if (currentRelation.attributeIndex == currentRelation.attributes.size()) {
    78 		if (currentRelation.attributeIndex == currentRelation.attributes.size()) {
    74 			currentRelation.attributeIndex = 0;
    79 			currentRelation.attributeIndex = 0;
    75 			mq->send(currentRelation.currentValue);
    80 			// FIXME: send the message
    76 		}
    81 		}
    77 
    82 
    78 	}
    83 	}
    79 
    84 
    80 	void endOfPipe() {
    85 	void endOfPipe() {