--- a/bash-completion.sh Sun May 08 21:42:25 2022 +0200
+++ b/bash-completion.sh Sun Jun 05 00:08:52 2022 +0200
@@ -22,22 +22,22 @@
w2=${COMP_WORDS[COMP_CWORD-2]}
w3=${COMP_WORDS[COMP_CWORD-3]}
- DATA_TYPE=(
- "string"
- "integer"
- "boolean"
- )
-
- BOOLEAN_VALUES=(
- "true"
- "false"
+ CONNECTION_OPTIONS=(
+ "username"
+ "password"
)
if [[ "$w1" == "--relation" && "x$w0" == "x" ]]; then COMPREPLY=("''")
+ elif [[ "$w1" == "--connection-string" && "x$w0" == "x" ]]; then COMPREPLY=("'mqtt://localhost:1883'")
+ elif [[ "$w1" == "--connection-option" ]]; then COMPREPLY=($(compgen -W "${CONNECTION_OPTIONS[*]}" -- "$w0"))
+ elif [[ "$w2" == "--connection-option" && "x$w0" == "x" ]]; then COMPREPLY=("''")
elif [[ "$w1" == "--stream" && "x$w0" == "x" ]]; then COMPREPLY=("''")
else
OPTIONS=(
"--relation"
+ #"--connection-name"
+ "--connection-string"
+ "--connection-option"
"--stream"
)
COMPREPLY=($(compgen -W "${OPTIONS[*]}" -- "$w0"))
--- a/src/CLIParser.h Sun May 08 21:42:25 2022 +0200
+++ b/src/CLIParser.h Sun Jun 05 00:08:52 2022 +0200
@@ -41,6 +41,8 @@
static const relpipe::common::type::StringX OPTION_RELATION;
static const relpipe::common::type::StringX OPTION_STREAM;
+ static const relpipe::common::type::StringX OPTION_CONNECTION_STRING;
+ static const relpipe::common::type::StringX OPTION_CONNECTION_OPTION;
Configuration parse(const std::vector<relpipe::common::type::StringX>& arguments) {
Configuration c;
@@ -52,6 +54,12 @@
c.relation = readNext(arguments, i);
} else if (option == OPTION_STREAM) {
c.stream = readNext(arguments, i);
+ } else if (option == OPTION_CONNECTION_STRING) {
+ c.connectionString = readNext(arguments, i);
+ } else if (option == OPTION_CONNECTION_OPTION) {
+ auto name = readNext(arguments, i);
+ auto value = readNext(arguments, i);
+ c.connectionOptions.push_back({name, value});
} else throw relpipe::cli::RelpipeCLIException(L"Unsupported CLI option: " + option, relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS);
}
@@ -64,6 +72,8 @@
const relpipe::common::type::StringX CLIParser::OPTION_RELATION = L"--relation";
const relpipe::common::type::StringX CLIParser::OPTION_STREAM = L"--stream";
+const relpipe::common::type::StringX CLIParser::OPTION_CONNECTION_STRING = L"--connection-string";
+const relpipe::common::type::StringX CLIParser::OPTION_CONNECTION_OPTION = L"--connection-option";
}
}
--- a/src/Configuration.h Sun May 08 21:42:25 2022 +0200
+++ b/src/Configuration.h Sun Jun 05 00:08:52 2022 +0200
@@ -29,9 +29,24 @@
class Configuration {
public:
+ class ConnectionOption {
+ public:
+ relpipe::common::type::StringX name;
+ relpipe::common::type::StringX value;
+
+ ConnectionOption() = default;
+
+ ConnectionOption(relpipe::common::type::StringX name, relpipe::common::type::StringX value) : name(name), value(value) {
+ }
+
+
+ };
+
relpipe::common::type::StringX relation = L"message";
relpipe::common::type::StringX stream = L"relpipe";
-
+ relpipe::common::type::StringX connectionString = L"mqtt://localhost:1883";
+ std::vector<ConnectionOption> connectionOptions;
+
virtual ~Configuration() {
}
};
--- a/src/MQTTHandler.h Sun May 08 21:42:25 2022 +0200
+++ b/src/MQTTHandler.h Sun Jun 05 00:08:52 2022 +0200
@@ -24,6 +24,7 @@
#include <locale>
#include <codecvt>
#include <random>
+#include <regex>
#include <mosquittopp.h>
@@ -50,16 +51,14 @@
*/
static std::string generateClientID() {
std::stringstream result;
- // result << "relpipe-out-";
std::string symbols("0123456789abcdef");
std::random_device dev;
std::mt19937 rng(dev());
- std::uniform_int_distribution<std::mt19937::result_type> dist6(0, symbols.size());
+ std::uniform_int_distribution<std::mt19937::result_type> dist(0, symbols.size());
- for (int i = 0; i < 8; i++) result << symbols[dist6(rng)];
+ for (int i = 0; i < 8; i++) result << symbols[dist(rng)];
- // std::cerr << "generated clien ID = " << result.str() << std::endl;
return result.str();
}
@@ -70,10 +69,45 @@
std::string currentValue;
} currentRelation;
+ static void parseConnectionString(const std::string& connectionString, std::string& hostname, int& port) {
+ std::regex pattern("mqtt:(//)?([^:]+)(:([0-9]+))?");
+ std::smatch match;
+ if (std::regex_match(connectionString, match, pattern)) {
+ hostname = match[2];
+ port = stoi(match[4]);
+ } else {
+ throw std::invalid_argument("Invalid connection string format. Expecting something like: mqtt://localhost:1883");
+ }
+ }
+
+ static void check(std::string operation, int result) {
+ if (result) throw std::logic_error("mosquitto operation failed: " + operation + " = " + std::to_string(result));
+ }
+
public:
- MQTTHandler(Configuration configuration) : configuration(configuration) {
- mq->connect("localhost", 1883);
+ static MQTTHandler* create(Configuration configuration) {
+ MQTTHandler* h = new MQTTHandler();
+
+ std::string connectionString = h->convertor.to_bytes(configuration.connectionString);
+ std::string username;
+ std::string password;
+ std::string hostname;
+ int port;
+
+ parseConnectionString(connectionString, hostname, port);
+
+ for (auto o : configuration.connectionOptions) {
+ if (o.name == L"username") username = h->convertor.to_bytes(o.value);
+ else if (o.name == L"password") password = h->convertor.to_bytes(o.value);
+ else throw std::invalid_argument("Unsupported connection option: " + h->convertor.to_bytes(o.name));
+ }
+
+ if (username.size()) check("set credentials", h->mq->username_pw_set(username.c_str(), password.c_str()));
+
+ check("connect", h->mq->connect(hostname.c_str(), port));
+
+ return h;
}
void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override {
@@ -93,14 +127,14 @@
if (currentRelation.attributeIndex == currentRelation.attributes.size()) {
currentRelation.attributeIndex = 0;
int mid = -1;
- mq->publish(&mid, convertor.to_bytes(configuration.stream).c_str(), currentRelation.currentValue.size(), currentRelation.currentValue.c_str());
+ check("publish", mq->publish(&mid, convertor.to_bytes(configuration.stream).c_str(), currentRelation.currentValue.size(), currentRelation.currentValue.c_str()));
// std::cerr << "MQTT message enqueued: " << mid << std::endl;
}
}
void endOfPipe() {
- mq->disconnect();
+ check("disconnect", mq->disconnect());
}
};
--- a/src/relpipe-out-mqtt.cpp Sun May 08 21:42:25 2022 +0200
+++ b/src/relpipe-out-mqtt.cpp Sun Jun 05 00:08:52 2022 +0200
@@ -44,8 +44,8 @@
CLIParser cliParser;
Configuration configuration = cliParser.parse(cli.arguments());
std::shared_ptr<RelationalReader> reader(Factory::create(std::cin));
- MQTTHandler handler(configuration);
- reader->addHandler(&handler);
+ std::shared_ptr<MQTTHandler> handler(MQTTHandler::create(configuration));
+ reader->addHandler(handler.get());
reader->process();
resultCode = CLI::EXIT_CODE_SUCCESS;
} catch (RelpipeCLIException e) {