--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/bash-completion.sh Fri Mar 04 21:30:08 2022 +0100
@@ -0,0 +1,49 @@
+# Relational pipes
+# Copyright © 2022 František Kučera (Frantovo.cz, GlobalCode.info)
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, version 3 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+_relpipe_out_posixmq_completion() {
+ local w0 w1 w2 w3
+
+ COMPREPLY=()
+ w0=${COMP_WORDS[COMP_CWORD]}
+ w1=${COMP_WORDS[COMP_CWORD-1]}
+ w2=${COMP_WORDS[COMP_CWORD-2]}
+ w3=${COMP_WORDS[COMP_CWORD-3]}
+
+ DATA_TYPE=(
+ "string"
+ "integer"
+ "boolean"
+ )
+
+ BOOLEAN_VALUES=(
+ "true"
+ "false"
+ )
+
+ if [[ "$w1" == "--relation" && "x$w0" == "x" ]]; then COMPREPLY=("''")
+ elif [[ "$w1" == "--unlink-on-close" ]]; then COMPREPLY=($(compgen -W "${BOOLEAN_VALUES[*]}" -- "$w0"))
+ elif [[ "$w1" == "--queue" && "x$w0" == "x" ]]; then COMPREPLY=("''")
+ else
+ OPTIONS=(
+ "--relation"
+ "--unlink-on-close"
+ "--queue"
+ )
+ COMPREPLY=($(compgen -W "${OPTIONS[*]}" -- "$w0"))
+ fi
+}
+
+complete -F _relpipe_out_posixmq_completion relpipe-out-posixmq
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/CLIParser.h Fri Mar 04 21:30:08 2022 +0100
@@ -0,0 +1,83 @@
+/**
+ * Relational pipes
+ * Copyright © 2022 František Kučera (Frantovo.cz, GlobalCode.info)
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, version 3 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#pragma once
+
+#include <vector>
+#include <iostream>
+
+#include <relpipe/common/type/typedefs.h>
+#include <relpipe/cli/CLI.h>
+#include <relpipe/cli/RelpipeCLIException.h>
+
+#include "Configuration.h"
+
+namespace relpipe {
+namespace out {
+namespace posixmq {
+
+class CLIParser {
+private:
+
+ relpipe::common::type::StringX readNext(const std::vector<relpipe::common::type::StringX>& arguments, int& i) {
+ if (i < arguments.size()) return arguments[i++];
+ else throw relpipe::cli::RelpipeCLIException(L"Missing CLI argument" + (i > 0 ? (L" after " + arguments[i - 1]) : L""), relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS);
+ }
+
+ /**
+ * TODO: use a common method
+ */
+ bool parseBoolean(const relpipe::common::type::StringX& value) {
+ if (value == L"true") return true;
+ else if (value == L"false") return false;
+ else throw relpipe::cli::RelpipeCLIException(L"Unable to parse boolean value: " + value + L" (expecting true or false)", relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS);
+ }
+
+public:
+
+ static const relpipe::common::type::StringX OPTION_RELATION;
+ static const relpipe::common::type::StringX OPTION_UNLINK_ON_CLOSE;
+ static const relpipe::common::type::StringX OPTION_QUEUE;
+
+ Configuration parse(const std::vector<relpipe::common::type::StringX>& arguments) {
+ Configuration c;
+
+ for (int i = 0; i < arguments.size();) {
+ relpipe::common::type::StringX option = readNext(arguments, i);
+
+ if (option == OPTION_RELATION) {
+ c.relation = readNext(arguments, i);
+ } else if (option == OPTION_UNLINK_ON_CLOSE) {
+ c.unlinkOnClose = parseBoolean(readNext(arguments, i));
+ } else if (option == OPTION_QUEUE) {
+ c.queue = readNext(arguments, i);
+ } else throw relpipe::cli::RelpipeCLIException(L"Unsupported CLI option: " + option, relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS);
+ }
+
+ return c;
+ }
+
+ virtual ~CLIParser() {
+ }
+};
+
+const relpipe::common::type::StringX CLIParser::OPTION_RELATION = L"--relation";
+const relpipe::common::type::StringX CLIParser::OPTION_UNLINK_ON_CLOSE = L"--unlink-on-close";
+const relpipe::common::type::StringX CLIParser::OPTION_QUEUE = L"--queue";
+
+}
+}
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/Configuration.h Fri Mar 04 21:30:08 2022 +0100
@@ -0,0 +1,42 @@
+/**
+ * Relational pipes
+ * Copyright © 2022 František Kučera (Frantovo.cz, GlobalCode.info)
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, version 3 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#pragma once
+
+#include <vector>
+#include <iostream>
+
+#include <relpipe/common/type/typedefs.h>
+
+
+namespace relpipe {
+namespace out {
+namespace posixmq {
+
+class Configuration {
+public:
+
+ relpipe::common::type::StringX relation = L"posixmq";
+ relpipe::common::type::StringX queue = L"/relpipe";
+ relpipe::common::type::Boolean unlinkOnClose = false;
+
+ virtual ~Configuration() {
+ }
+};
+
+}
+}
+}
\ No newline at end of file
--- a/src/PosixMQ.h Fri Mar 04 01:40:50 2022 +0100
+++ b/src/PosixMQ.h Fri Mar 04 21:30:08 2022 +0100
@@ -27,28 +27,30 @@
class PosixMQ {
private:
- size_t MSG_SIZE = 8192; // TODO: configurable/dynamic
+ const static size_t MSG_SIZE = 8192; // TODO: configurable/dynamic
std::string queueName;
mqd_t handle = -2;
+ bool unlinkOnClose = false;
- PosixMQ(std::string queueName, mqd_t handle) : queueName(queueName), handle(handle) {
+ PosixMQ(std::string queueName, mqd_t handle, bool unlinkOnClose) : queueName(queueName), handle(handle), unlinkOnClose(unlinkOnClose) {
}
public:
virtual ~PosixMQ() {
if (handle >= 0) mq_close(handle);
+ if (unlinkOnClose) mq_unlink(queueName.c_str());
}
- static PosixMQ* open(std::string queueName) {
- mqd_t handle = mq_open(queueName.c_str(), O_RDWR | O_CREAT);
- if (handle >= 0) return new PosixMQ(queueName, handle);
+ static PosixMQ* open(std::string queueName, bool unlinkOnClose = false) {
+ mqd_t handle = mq_open(queueName.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
+ if (handle >= 0) return new PosixMQ(queueName, handle, unlinkOnClose);
else throw std::logic_error("Unable to open PosixMQ: " + queueName + " error: " + strerror(errno));
}
void send(std::string message) {
int result = mq_send(handle, message.c_str(), message.size(), 0);
- if (result) throw std::logic_error("mq_send() = " + std::to_string(result) + " error: " + strerror(errno));
+ if (result) throw std::logic_error("Unable to send message to" + queueName + " error: " + strerror(errno));
}
std::string receive() {
@@ -60,10 +62,6 @@
else throw std::logic_error("Unable to receive PosixMQ message from " + queueName + " error: " + strerror(errno));
}
- void unlink() {
- mq_unlink(queueName.c_str());
- }
-
};
}
--- a/src/PosixMQHandler.h Fri Mar 04 01:40:50 2022 +0100
+++ b/src/PosixMQHandler.h Fri Mar 04 21:30:08 2022 +0100
@@ -30,6 +30,7 @@
#include <relpipe/reader/handlers/AttributeMetadata.h>
#include "PosixMQ.h"
+#include "Configuration.h"
namespace relpipe {
namespace out {
@@ -38,14 +39,13 @@
class PosixMQHandler : public relpipe::reader::handlers::RelationalReaderStringHandler {
private:
std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings.
+ Configuration configuration;
shared_ptr<PosixMQ> mq;
public:
- PosixMQHandler(std::ostream& output) {
- relpipe::common::type::StringX queueName = L"/relpipe";
- mq.reset(PosixMQ::open(convertor.to_bytes(queueName)));
-
+ PosixMQHandler(Configuration configuration) : configuration(configuration) {
+ mq.reset(PosixMQ::open(convertor.to_bytes(configuration.queue), configuration.unlinkOnClose));
}
void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override {
--- a/src/relpipe-out-posixmq.cpp Fri Mar 04 01:40:50 2022 +0100
+++ b/src/relpipe-out-posixmq.cpp Fri Mar 04 21:30:08 2022 +0100
@@ -26,25 +26,28 @@
#include <relpipe/reader/RelpipeReaderException.h>
#include "PosixMQHandler.h"
+#include "CLIParser.h"
+#include "Configuration.h"
using namespace relpipe::cli;
using namespace relpipe::reader;
using namespace relpipe::out::posixmq;
int main(int argc, char**argv) {
- CLI cli(argc, argv);
+ setlocale(LC_ALL, "");
CLI::untieStdIO();
-
+ CLI cli(argc, argv);
+
int resultCode = CLI::EXIT_CODE_UNEXPECTED_ERROR;
try {
+ CLIParser cliParser;
+ Configuration configuration = cliParser.parse(cli.arguments());
std::shared_ptr<RelationalReader> reader(Factory::create(std::cin));
- PosixMQHandler handler(std::cout);
+ PosixMQHandler handler(configuration);
reader->addHandler(&handler);
reader->process();
-
resultCode = CLI::EXIT_CODE_SUCCESS;
-
} catch (RelpipeCLIException e) {
fwprintf(stderr, L"Caught CLI exception: %ls\n", e.getMessage().c_str());
fwprintf(stderr, L"Debug: Input stream: eof=%ls, lastRead=%d\n", (cin.eof() ? L"true" : L"false"), cin.gcount());