--- a/src/MQTTHandler.h Sun May 08 21:42:25 2022 +0200
+++ b/src/MQTTHandler.h Sun Jun 05 00:08:52 2022 +0200
@@ -24,6 +24,7 @@
#include <locale>
#include <codecvt>
#include <random>
+#include <regex>
#include <mosquittopp.h>
@@ -50,16 +51,14 @@
*/
static std::string generateClientID() {
std::stringstream result;
- // result << "relpipe-out-";
std::string symbols("0123456789abcdef");
std::random_device dev;
std::mt19937 rng(dev());
- std::uniform_int_distribution<std::mt19937::result_type> dist6(0, symbols.size());
+ std::uniform_int_distribution<std::mt19937::result_type> dist(0, symbols.size());
- for (int i = 0; i < 8; i++) result << symbols[dist6(rng)];
+ for (int i = 0; i < 8; i++) result << symbols[dist(rng)];
- // std::cerr << "generated clien ID = " << result.str() << std::endl;
return result.str();
}
@@ -70,10 +69,45 @@
std::string currentValue;
} currentRelation;
+ static void parseConnectionString(const std::string& connectionString, std::string& hostname, int& port) {
+ std::regex pattern("mqtt:(//)?([^:]+)(:([0-9]+))?");
+ std::smatch match;
+ if (std::regex_match(connectionString, match, pattern)) {
+ hostname = match[2];
+ port = stoi(match[4]);
+ } else {
+ throw std::invalid_argument("Invalid connection string format. Expecting something like: mqtt://localhost:1883");
+ }
+ }
+
+ static void check(std::string operation, int result) {
+ if (result) throw std::logic_error("mosquitto operation failed: " + operation + " = " + std::to_string(result));
+ }
+
public:
- MQTTHandler(Configuration configuration) : configuration(configuration) {
- mq->connect("localhost", 1883);
+ static MQTTHandler* create(Configuration configuration) {
+ MQTTHandler* h = new MQTTHandler();
+
+ std::string connectionString = h->convertor.to_bytes(configuration.connectionString);
+ std::string username;
+ std::string password;
+ std::string hostname;
+ int port;
+
+ parseConnectionString(connectionString, hostname, port);
+
+ for (auto o : configuration.connectionOptions) {
+ if (o.name == L"username") username = h->convertor.to_bytes(o.value);
+ else if (o.name == L"password") password = h->convertor.to_bytes(o.value);
+ else throw std::invalid_argument("Unsupported connection option: " + h->convertor.to_bytes(o.name));
+ }
+
+ if (username.size()) check("set credentials", h->mq->username_pw_set(username.c_str(), password.c_str()));
+
+ check("connect", h->mq->connect(hostname.c_str(), port));
+
+ return h;
}
void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override {
@@ -93,14 +127,14 @@
if (currentRelation.attributeIndex == currentRelation.attributes.size()) {
currentRelation.attributeIndex = 0;
int mid = -1;
- mq->publish(&mid, convertor.to_bytes(configuration.stream).c_str(), currentRelation.currentValue.size(), currentRelation.currentValue.c_str());
+ check("publish", mq->publish(&mid, convertor.to_bytes(configuration.stream).c_str(), currentRelation.currentValue.size(), currentRelation.currentValue.c_str()));
// std::cerr << "MQTT message enqueued: " << mid << std::endl;
}
}
void endOfPipe() {
- mq->disconnect();
+ check("disconnect", mq->disconnect());
}
};