40 |
41 |
41 class MQTTHandler : public relpipe::reader::handlers::RelationalReaderStringHandler { |
42 class MQTTHandler : public relpipe::reader::handlers::RelationalReaderStringHandler { |
42 private: |
43 private: |
43 std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings. |
44 std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings. |
44 Configuration configuration; |
45 Configuration configuration; |
|
46 std::shared_ptr<mosqpp::mosquittopp> mq = std::make_shared<mosqpp::mosquittopp>(generateClientID().c_str()); |
|
47 |
|
48 /** |
|
49 * @return unique (random) client ID for MQTT to allow multiple simultaneous connections |
|
50 */ |
|
51 static std::string generateClientID() { |
|
52 std::stringstream result; |
|
53 // result << "relpipe-out-"; |
|
54 std::string symbols("0123456789abcdef"); |
|
55 |
|
56 std::random_device dev; |
|
57 std::mt19937 rng(dev()); |
|
58 std::uniform_int_distribution<std::mt19937::result_type> dist6(0, symbols.size()); |
|
59 |
|
60 for (int i = 0; i < 8; i++) result << symbols[dist6(rng)]; |
|
61 |
|
62 // std::cerr << "generated clien ID = " << result.str() << std::endl; |
|
63 return result.str(); |
|
64 } |
45 |
65 |
46 struct CurrentRelation { |
66 struct CurrentRelation { |
47 relpipe::common::type::StringX name; |
67 relpipe::common::type::StringX name; |
48 std::vector<relpipe::reader::handlers::AttributeMetadata> attributes; |
68 std::vector<relpipe::reader::handlers::AttributeMetadata> attributes; |
49 relpipe::common::type::Integer attributeIndex = 0; |
69 relpipe::common::type::Integer attributeIndex = 0; |
51 } currentRelation; |
71 } currentRelation; |
52 |
72 |
53 public: |
73 public: |
54 |
74 |
55 MQTTHandler(Configuration configuration) : configuration(configuration) { |
75 MQTTHandler(Configuration configuration) : configuration(configuration) { |
56 |
76 mq->connect("localhost", 1883); |
57 { |
|
58 // TODO: remove |
|
59 int major, minor, patch; |
|
60 mosqpp::lib_version(&major, &minor, &patch); |
|
61 std::cerr << "mosquitto version: " << major << "." << minor << "." << patch << std::endl; |
|
62 } |
|
63 } |
77 } |
64 |
78 |
65 void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override { |
79 void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override { |
|
80 // TODO: check relation name according to the configuration |
66 currentRelation = CurrentRelation{name, attributes}; |
81 currentRelation = CurrentRelation{name, attributes}; |
67 } |
82 } |
68 |
83 |
69 void attribute(const relpipe::common::type::StringX& value) override { |
84 void attribute(const relpipe::common::type::StringX& value) override { |
70 |
85 |
75 else if (attributeName == L"data"); // keep empty or value from 'text' |
90 else if (attributeName == L"data"); // keep empty or value from 'text' |
76 |
91 |
77 currentRelation.attributeIndex++; |
92 currentRelation.attributeIndex++; |
78 if (currentRelation.attributeIndex == currentRelation.attributes.size()) { |
93 if (currentRelation.attributeIndex == currentRelation.attributes.size()) { |
79 currentRelation.attributeIndex = 0; |
94 currentRelation.attributeIndex = 0; |
80 // FIXME: send the message |
95 int mid = -1; |
|
96 mq->publish(&mid, convertor.to_bytes(configuration.stream).c_str(), currentRelation.currentValue.size(), currentRelation.currentValue.c_str()); |
|
97 // std::cerr << "MQTT message enqueued: " << mid << std::endl; |
81 } |
98 } |
82 |
99 |
83 } |
100 } |
84 |
101 |
85 void endOfPipe() { |
102 void endOfPipe() { |
86 |
103 mq->disconnect(); |
87 } |
104 } |
88 |
105 |
89 }; |
106 }; |
90 |
107 |
91 } |
108 } |