--- a/src/MQTTHandler.h Fri May 06 23:06:44 2022 +0200
+++ b/src/MQTTHandler.h Sun May 08 21:42:25 2022 +0200
@@ -23,6 +23,7 @@
#include <sstream>
#include <locale>
#include <codecvt>
+#include <random>
#include <mosquittopp.h>
@@ -42,6 +43,25 @@
private:
std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings.
Configuration configuration;
+ std::shared_ptr<mosqpp::mosquittopp> mq = std::make_shared<mosqpp::mosquittopp>(generateClientID().c_str());
+
+ /**
+ * @return unique (random) client ID for MQTT to allow multiple simultaneous connections
+ */
+ static std::string generateClientID() {
+ std::stringstream result;
+ // result << "relpipe-out-";
+ std::string symbols("0123456789abcdef");
+
+ std::random_device dev;
+ std::mt19937 rng(dev());
+ std::uniform_int_distribution<std::mt19937::result_type> dist6(0, symbols.size());
+
+ for (int i = 0; i < 8; i++) result << symbols[dist6(rng)];
+
+ // std::cerr << "generated clien ID = " << result.str() << std::endl;
+ return result.str();
+ }
struct CurrentRelation {
relpipe::common::type::StringX name;
@@ -53,16 +73,11 @@
public:
MQTTHandler(Configuration configuration) : configuration(configuration) {
-
- {
- // TODO: remove
- int major, minor, patch;
- mosqpp::lib_version(&major, &minor, &patch);
- std::cerr << "mosquitto version: " << major << "." << minor << "." << patch << std::endl;
- }
+ mq->connect("localhost", 1883);
}
void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override {
+ // TODO: check relation name according to the configuration
currentRelation = CurrentRelation{name, attributes};
}
@@ -77,13 +92,15 @@
currentRelation.attributeIndex++;
if (currentRelation.attributeIndex == currentRelation.attributes.size()) {
currentRelation.attributeIndex = 0;
- // FIXME: send the message
+ int mid = -1;
+ mq->publish(&mid, convertor.to_bytes(configuration.stream).c_str(), currentRelation.currentValue.size(), currentRelation.currentValue.c_str());
+ // std::cerr << "MQTT message enqueued: " << mid << std::endl;
}
}
void endOfPipe() {
-
+ mq->disconnect();
}
};