equal
deleted
inserted
replaced
22 #include <iostream> |
22 #include <iostream> |
23 #include <sstream> |
23 #include <sstream> |
24 #include <locale> |
24 #include <locale> |
25 #include <codecvt> |
25 #include <codecvt> |
26 |
26 |
|
27 #include <mosquittopp.h> |
|
28 |
27 #include <relpipe/common/type/typedefs.h> |
29 #include <relpipe/common/type/typedefs.h> |
28 #include <relpipe/reader/TypeId.h> |
30 #include <relpipe/reader/TypeId.h> |
29 #include <relpipe/reader/handlers/RelationalReaderStringHandler.h> |
31 #include <relpipe/reader/handlers/RelationalReaderStringHandler.h> |
30 #include <relpipe/reader/handlers/AttributeMetadata.h> |
32 #include <relpipe/reader/handlers/AttributeMetadata.h> |
31 |
33 |
32 #include "MQTT.h" |
|
33 #include "Configuration.h" |
34 #include "Configuration.h" |
34 #include "Hex.h" |
35 #include "Hex.h" |
35 |
36 |
36 namespace relpipe { |
37 namespace relpipe { |
37 namespace out { |
38 namespace out { |
39 |
40 |
40 class MQTTHandler : public relpipe::reader::handlers::RelationalReaderStringHandler { |
41 class MQTTHandler : public relpipe::reader::handlers::RelationalReaderStringHandler { |
41 private: |
42 private: |
42 std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings. |
43 std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings. |
43 Configuration configuration; |
44 Configuration configuration; |
44 shared_ptr<MQTT> mq; |
|
45 |
45 |
46 struct CurrentRelation { |
46 struct CurrentRelation { |
47 relpipe::common::type::StringX name; |
47 relpipe::common::type::StringX name; |
48 std::vector<relpipe::reader::handlers::AttributeMetadata> attributes; |
48 std::vector<relpipe::reader::handlers::AttributeMetadata> attributes; |
49 relpipe::common::type::Integer attributeIndex = 0; |
49 relpipe::common::type::Integer attributeIndex = 0; |
51 } currentRelation; |
51 } currentRelation; |
52 |
52 |
53 public: |
53 public: |
54 |
54 |
55 MQTTHandler(Configuration configuration) : configuration(configuration) { |
55 MQTTHandler(Configuration configuration) : configuration(configuration) { |
56 // TODO: do not throw exception from the constructor: MQTT::open() |
56 |
57 mq.reset(MQTT::open(convertor.to_bytes(configuration.queue), configuration.unlinkOnClose)); |
57 { |
|
58 // TODO: remove |
|
59 int major, minor, patch; |
|
60 mosqpp::lib_version(&major, &minor, &patch); |
|
61 std::cerr << "mosquitto version: " << major << "." << minor << "." << patch << std::endl; |
|
62 } |
58 } |
63 } |
59 |
64 |
60 void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override { |
65 void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override { |
61 currentRelation = CurrentRelation{name, attributes}; |
66 currentRelation = CurrentRelation{name, attributes}; |
62 } |
67 } |
70 else if (attributeName == L"data"); // keep empty or value from 'text' |
75 else if (attributeName == L"data"); // keep empty or value from 'text' |
71 |
76 |
72 currentRelation.attributeIndex++; |
77 currentRelation.attributeIndex++; |
73 if (currentRelation.attributeIndex == currentRelation.attributes.size()) { |
78 if (currentRelation.attributeIndex == currentRelation.attributes.size()) { |
74 currentRelation.attributeIndex = 0; |
79 currentRelation.attributeIndex = 0; |
75 mq->send(currentRelation.currentValue); |
80 // FIXME: send the message |
76 } |
81 } |
77 |
82 |
78 } |
83 } |
79 |
84 |
80 void endOfPipe() { |
85 void endOfPipe() { |