--- a/bash-completion.sh Sun May 08 21:42:14 2022 +0200
+++ b/bash-completion.sh Sun Jun 05 22:51:45 2022 +0200
@@ -22,23 +22,23 @@
w2=${COMP_WORDS[COMP_CWORD-2]}
w3=${COMP_WORDS[COMP_CWORD-3]}
- DATA_TYPE=(
- "string"
- "integer"
- "boolean"
- )
-
- BOOLEAN_VALUES=(
- "true"
- "false"
+ CONNECTION_OPTIONS=(
+ "username"
+ "password"
)
if [[ "$w1" == "--relation" && "x$w0" == "x" ]]; then COMPREPLY=("''")
+ elif [[ "$w1" == "--connection-string" && "x$w0" == "x" ]]; then COMPREPLY=("'mqtt://localhost:1883'")
+ elif [[ "$w1" == "--connection-option" ]]; then COMPREPLY=($(compgen -W "${CONNECTION_OPTIONS[*]}" -- "$w0"))
+ elif [[ "$w2" == "--connection-option" && "x$w0" == "x" ]]; then COMPREPLY=("''")
elif [[ "$w1" == "--stream" && "x$w0" == "x" ]]; then COMPREPLY=("''")
elif [[ "$w1" == "--message-count" && "x$w0" == "x" ]]; then COMPREPLY=("1")
else
OPTIONS=(
"--relation"
+ #"--connection-name"
+ "--connection-string"
+ "--connection-option"
"--stream"
"--message-count"
)
--- a/src/CLIParser.h Sun May 08 21:42:14 2022 +0200
+++ b/src/CLIParser.h Sun Jun 05 22:51:45 2022 +0200
@@ -19,7 +19,7 @@
#include <vector>
#include <iostream>
-#include <relpipe/writer/typedefs.h>
+#include <relpipe/common/type/typedefs.h>
#include <relpipe/cli/CLI.h>
#include <relpipe/cli/RelpipeCLIException.h>
@@ -32,27 +32,35 @@
class CLIParser {
private:
- relpipe::writer::string_t readNext(const std::vector<relpipe::writer::string_t>& arguments, int& i) {
+ relpipe::common::type::StringX readNext(const std::vector<relpipe::common::type::StringX>& arguments, int& i) {
if (i < arguments.size()) return arguments[i++];
else throw relpipe::cli::RelpipeCLIException(L"Missing CLI argument" + (i > 0 ? (L" after " + arguments[i - 1]) : L""), relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS);
}
public:
- static const relpipe::writer::string_t OPTION_RELATION;
- static const relpipe::writer::string_t OPTION_STREAM;
- static const relpipe::writer::string_t OPTION_MESSAGE_COUNT;
+ static const relpipe::common::type::StringX OPTION_RELATION;
+ static const relpipe::common::type::StringX OPTION_STREAM;
+ static const relpipe::common::type::StringX OPTION_CONNECTION_STRING;
+ static const relpipe::common::type::StringX OPTION_CONNECTION_OPTION;
+ static const relpipe::common::type::StringX OPTION_MESSAGE_COUNT;
- Configuration parse(const std::vector<relpipe::writer::string_t>& arguments) {
+ Configuration parse(const std::vector<relpipe::common::type::StringX>& arguments) {
Configuration c;
for (int i = 0; i < arguments.size();) {
- relpipe::writer::string_t option = readNext(arguments, i);
+ relpipe::common::type::StringX option = readNext(arguments, i);
if (option == OPTION_RELATION) {
c.relation = readNext(arguments, i);
} else if (option == OPTION_STREAM) {
c.stream = readNext(arguments, i);
+ } else if (option == OPTION_CONNECTION_STRING) {
+ c.connectionString = readNext(arguments, i);
+ } else if (option == OPTION_CONNECTION_OPTION) {
+ auto name = readNext(arguments, i);
+ auto value = readNext(arguments, i);
+ c.connectionOptions.push_back({name, value});
} else if (option == OPTION_MESSAGE_COUNT) {
c.messageCount = std::stoull(readNext(arguments, i));
} else throw relpipe::cli::RelpipeCLIException(L"Unsupported CLI option: " + option, relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS);
@@ -65,9 +73,11 @@
}
};
-const relpipe::writer::string_t CLIParser::OPTION_RELATION = L"--relation";
-const relpipe::writer::string_t CLIParser::OPTION_STREAM = L"--stream";
-const relpipe::writer::string_t CLIParser::OPTION_MESSAGE_COUNT = L"--message-count";
+const relpipe::common::type::StringX CLIParser::OPTION_RELATION = L"--relation";
+const relpipe::common::type::StringX CLIParser::OPTION_STREAM = L"--stream";
+const relpipe::common::type::StringX CLIParser::OPTION_CONNECTION_STRING = L"--connection-string";
+const relpipe::common::type::StringX CLIParser::OPTION_CONNECTION_OPTION = L"--connection-option";
+const relpipe::common::type::StringX CLIParser::OPTION_MESSAGE_COUNT = L"--message-count";
}
}
--- a/src/Configuration.h Sun May 08 21:42:14 2022 +0200
+++ b/src/Configuration.h Sun Jun 05 22:51:45 2022 +0200
@@ -29,14 +29,29 @@
class Configuration {
public:
+ class ConnectionOption {
+ public:
+ relpipe::common::type::StringX name;
+ relpipe::common::type::StringX value;
+
+ ConnectionOption() = default;
+
+ ConnectionOption(relpipe::common::type::StringX name, relpipe::common::type::StringX value) : name(name), value(value) {
+ }
+
+
+ };
+
relpipe::common::type::Integer messageCount = 1;
relpipe::common::type::StringX relation = L"message";
relpipe::common::type::StringX stream = L"relpipe";
-
+ relpipe::common::type::StringX connectionString = L"mqtt://localhost:1883";
+ std::vector<ConnectionOption> connectionOptions;
+
virtual ~Configuration() {
}
};
}
}
-}
\ No newline at end of file
+}
--- a/src/MQTTCommand.cpp Sun May 08 21:42:14 2022 +0200
+++ b/src/MQTTCommand.cpp Sun Jun 05 22:51:45 2022 +0200
@@ -24,6 +24,7 @@
#include <sstream>
#include <iomanip>
#include <random>
+#include <regex>
#include <mosquittopp.h>
@@ -57,16 +58,14 @@
*/
static std::string generateClientID() {
std::stringstream result;
- // result << "relpipe-in-";
std::string symbols("0123456789abcdef");
std::random_device dev;
std::mt19937 rng(dev());
- std::uniform_int_distribution<std::mt19937::result_type> dist6(0, symbols.size());
+ std::uniform_int_distribution<std::mt19937::result_type> dist(0, symbols.size());
- for (int i = 0; i < 8; i++) result << symbols[dist6(rng)];
+ for (int i = 0; i < 8; i++) result << symbols[dist(rng)];
- // std::cerr << "generated clien ID = " << result.str() << std::endl;
return result.str();
}
public:
@@ -91,6 +90,21 @@
};
+void MQTTCommand::parseConnectionString(const std::string& connectionString, std::string& hostname, int& port) {
+ std::regex pattern("mqtt:(//)?([^:]+)(:([0-9]+))?");
+ std::smatch match;
+ if (std::regex_match(connectionString, match, pattern)) {
+ hostname = match[2];
+ port = stoi(match[4]);
+ } else {
+ throw std::invalid_argument("Invalid connection string format. Expecting something like: mqtt://localhost:1883");
+ }
+}
+
+void MQTTCommand::check(std::string operation, int result) {
+ if (result) throw std::logic_error("mosquitto operation failed: " + operation + " = " + std::to_string(result));
+}
+
void MQTTCommand::process(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration) {
std::shared_ptr<MQTTClient> mq = std::make_shared<MQTTClient>(writer, configuration);
@@ -100,17 +114,33 @@
{L"data", TypeId::STRING}
}, true);
- mq->max_inflight_messages_set(1);
- mq->connect("localhost", 1883);
+ std::string connectionString = convertor.to_bytes(configuration.connectionString);
+ std::string username;
+ std::string password;
+ std::string hostname;
+ int port;
+
+ parseConnectionString(connectionString, hostname, port);
+
+ for (auto o : configuration.connectionOptions) {
+ if (o.name == L"username") username = convertor.to_bytes(o.value);
+ else if (o.name == L"password") password = convertor.to_bytes(o.value);
+ else throw std::invalid_argument("Unsupported connection option: " + convertor.to_bytes(o.name));
+ }
+
+ if (username.size()) check("set credentials", mq->username_pw_set(username.c_str(), password.c_str()));
+
+ check("set maximum inflight messages", mq->max_inflight_messages_set(1));
+ check("connect", mq->connect(hostname.c_str(), port));
int mid;
- mq->subscribe(&mid, convertor.to_bytes(configuration.stream).c_str());
+ check("substcribe", mq->subscribe(&mid, convertor.to_bytes(configuration.stream).c_str()));
//for (int i = configuration.messageCount; continueProcessing && i > 0; i--) {
for (int i = configuration.messageCount; continueProcessing && i > 0; i = i - mq->popMessageCount()) {
// std::cerr << "loop(): i=" << i << std::endl;
//mq->loop();
- mq->loop(1000, 1);
+ check("loop", mq->loop(1000, 1));
//mq->loop(1000, -1);
//mq->loop_forever();
//mq->loop_write();
--- a/src/MQTTCommand.h Sun May 08 21:42:14 2022 +0200
+++ b/src/MQTTCommand.h Sun Jun 05 22:51:45 2022 +0200
@@ -34,6 +34,8 @@
private:
std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings.
std::atomic<bool> continueProcessing{true};
+ static void parseConnectionString(const std::string& connectionString, std::string& hostname, int& port);
+ static void check(std::string operation, int result);
public:
virtual ~MQTTCommand();