configuration + option: --unlink-on-close v_0
authorFrantišek Kučera <franta-hg@frantovo.cz>
Fri, 04 Mar 2022 21:30:08 +0100
branchv_0
changeset 3 be6f2e307a65
parent 2 fc9911b1d295
child 4 8a5b86415d80
configuration + option: --unlink-on-close
bash-completion.sh
src/CLIParser.h
src/Configuration.h
src/PosixMQ.h
src/PosixMQHandler.h
src/relpipe-out-posixmq.cpp
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/bash-completion.sh	Fri Mar 04 21:30:08 2022 +0100
@@ -0,0 +1,49 @@
+# Relational pipes
+# Copyright © 2022 František Kučera (Frantovo.cz, GlobalCode.info)
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, version 3 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+_relpipe_out_posixmq_completion() {
+	local w0 w1 w2 w3
+
+	COMPREPLY=()
+	w0=${COMP_WORDS[COMP_CWORD]}
+	w1=${COMP_WORDS[COMP_CWORD-1]}
+	w2=${COMP_WORDS[COMP_CWORD-2]}
+	w3=${COMP_WORDS[COMP_CWORD-3]}
+
+	DATA_TYPE=(
+		"string"
+		"integer"
+		"boolean"
+	)
+
+	BOOLEAN_VALUES=(
+		"true"
+		"false"
+	)
+
+	if   [[ "$w1" == "--relation"                      && "x$w0" == "x" ]];    then COMPREPLY=("''")
+	elif [[ "$w1" == "--unlink-on-close"                                ]];    then COMPREPLY=($(compgen -W "${BOOLEAN_VALUES[*]}" -- "$w0"))
+	elif [[ "$w1" == "--queue"                         && "x$w0" == "x" ]];    then COMPREPLY=("''")
+	else
+		OPTIONS=(
+			"--relation"
+			"--unlink-on-close"
+			"--queue"
+		)
+		COMPREPLY=($(compgen -W "${OPTIONS[*]}" -- "$w0"))
+	fi
+}
+
+complete -F _relpipe_out_posixmq_completion relpipe-out-posixmq
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/CLIParser.h	Fri Mar 04 21:30:08 2022 +0100
@@ -0,0 +1,83 @@
+/**
+ * Relational pipes
+ * Copyright © 2022 František Kučera (Frantovo.cz, GlobalCode.info)
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, version 3 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#pragma once
+
+#include <vector>
+#include <iostream>
+
+#include <relpipe/common/type/typedefs.h>
+#include <relpipe/cli/CLI.h>
+#include <relpipe/cli/RelpipeCLIException.h>
+
+#include "Configuration.h"
+
+namespace relpipe {
+namespace out {
+namespace posixmq {
+
+class CLIParser {
+private:
+
+	relpipe::common::type::StringX readNext(const std::vector<relpipe::common::type::StringX>& arguments, int& i) {
+		if (i < arguments.size()) return arguments[i++];
+		else throw relpipe::cli::RelpipeCLIException(L"Missing CLI argument" + (i > 0 ? (L" after " + arguments[i - 1]) : L""), relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS);
+	}
+
+	/**
+	 * TODO: use a common method
+	 */
+	bool parseBoolean(const relpipe::common::type::StringX& value) {
+		if (value == L"true") return true;
+		else if (value == L"false") return false;
+		else throw relpipe::cli::RelpipeCLIException(L"Unable to parse boolean value: " + value + L" (expecting true or false)", relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS);
+	}
+
+public:
+
+	static const relpipe::common::type::StringX OPTION_RELATION;
+	static const relpipe::common::type::StringX OPTION_UNLINK_ON_CLOSE;
+	static const relpipe::common::type::StringX OPTION_QUEUE;
+
+	Configuration parse(const std::vector<relpipe::common::type::StringX>& arguments) {
+		Configuration c;
+
+		for (int i = 0; i < arguments.size();) {
+			relpipe::common::type::StringX option = readNext(arguments, i);
+
+			if (option == OPTION_RELATION) {
+				c.relation = readNext(arguments, i);
+			} else if (option == OPTION_UNLINK_ON_CLOSE) {
+				c.unlinkOnClose = parseBoolean(readNext(arguments, i));
+			} else if (option == OPTION_QUEUE) {
+				c.queue = readNext(arguments, i);
+			} else throw relpipe::cli::RelpipeCLIException(L"Unsupported CLI option: " + option, relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS);
+		}
+
+		return c;
+	}
+
+	virtual ~CLIParser() {
+	}
+};
+
+const relpipe::common::type::StringX CLIParser::OPTION_RELATION = L"--relation";
+const relpipe::common::type::StringX CLIParser::OPTION_UNLINK_ON_CLOSE = L"--unlink-on-close";
+const relpipe::common::type::StringX CLIParser::OPTION_QUEUE = L"--queue";
+
+}
+}
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/Configuration.h	Fri Mar 04 21:30:08 2022 +0100
@@ -0,0 +1,42 @@
+/**
+ * Relational pipes
+ * Copyright © 2022 František Kučera (Frantovo.cz, GlobalCode.info)
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, version 3 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#pragma once
+
+#include <vector>
+#include <iostream>
+
+#include <relpipe/common/type/typedefs.h>
+
+
+namespace relpipe {
+namespace out {
+namespace posixmq {
+
+class Configuration {
+public:
+
+	relpipe::common::type::StringX relation = L"posixmq";
+	relpipe::common::type::StringX queue = L"/relpipe";
+	relpipe::common::type::Boolean unlinkOnClose = false;
+
+	virtual ~Configuration() {
+	}
+};
+
+}
+}
+}
\ No newline at end of file
--- a/src/PosixMQ.h	Fri Mar 04 01:40:50 2022 +0100
+++ b/src/PosixMQ.h	Fri Mar 04 21:30:08 2022 +0100
@@ -27,28 +27,30 @@
 
 class PosixMQ {
 private:
-	size_t MSG_SIZE = 8192; // TODO: configurable/dynamic
+	const static size_t MSG_SIZE = 8192; // TODO: configurable/dynamic
 	std::string queueName;
 	mqd_t handle = -2;
+	bool unlinkOnClose = false;
 
-	PosixMQ(std::string queueName, mqd_t handle) : queueName(queueName), handle(handle) {
+	PosixMQ(std::string queueName, mqd_t handle, bool unlinkOnClose) : queueName(queueName), handle(handle), unlinkOnClose(unlinkOnClose) {
 	}
 
 public:
 
 	virtual ~PosixMQ() {
 		if (handle >= 0) mq_close(handle);
+		if (unlinkOnClose) mq_unlink(queueName.c_str());
 	}
 
-	static PosixMQ* open(std::string queueName) {
-		mqd_t handle = mq_open(queueName.c_str(), O_RDWR | O_CREAT);
-		if (handle >= 0) return new PosixMQ(queueName, handle);
+	static PosixMQ* open(std::string queueName, bool unlinkOnClose = false) {
+		mqd_t handle = mq_open(queueName.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
+		if (handle >= 0) return new PosixMQ(queueName, handle, unlinkOnClose);
 		else throw std::logic_error("Unable to open PosixMQ: " + queueName + " error: " + strerror(errno));
 	}
 
 	void send(std::string message) {
 		int result = mq_send(handle, message.c_str(), message.size(), 0);
-		if (result) throw std::logic_error("mq_send() = " + std::to_string(result) + " error: " + strerror(errno));
+		if (result) throw std::logic_error("Unable to send message to" + queueName + " error: " + strerror(errno));
 	}
 
 	std::string receive() {
@@ -60,10 +62,6 @@
 		else throw std::logic_error("Unable to receive PosixMQ message from " + queueName + " error: " + strerror(errno));
 	}
 
-	void unlink() {
-		mq_unlink(queueName.c_str());
-	}
-
 };
 
 }
--- a/src/PosixMQHandler.h	Fri Mar 04 01:40:50 2022 +0100
+++ b/src/PosixMQHandler.h	Fri Mar 04 21:30:08 2022 +0100
@@ -30,6 +30,7 @@
 #include <relpipe/reader/handlers/AttributeMetadata.h>
 
 #include "PosixMQ.h"
+#include "Configuration.h"
 
 namespace relpipe {
 namespace out {
@@ -38,14 +39,13 @@
 class PosixMQHandler : public relpipe::reader::handlers::RelationalReaderStringHandler {
 private:
 	std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings.
+	Configuration configuration;
 	shared_ptr<PosixMQ> mq;
 
 public:
 
-	PosixMQHandler(std::ostream& output) {
-		relpipe::common::type::StringX queueName = L"/relpipe";
-		mq.reset(PosixMQ::open(convertor.to_bytes(queueName)));
-
+	PosixMQHandler(Configuration configuration) : configuration(configuration) {
+		mq.reset(PosixMQ::open(convertor.to_bytes(configuration.queue), configuration.unlinkOnClose));
 	}
 
 	void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override {
--- a/src/relpipe-out-posixmq.cpp	Fri Mar 04 01:40:50 2022 +0100
+++ b/src/relpipe-out-posixmq.cpp	Fri Mar 04 21:30:08 2022 +0100
@@ -26,25 +26,28 @@
 #include <relpipe/reader/RelpipeReaderException.h>
 
 #include "PosixMQHandler.h"
+#include "CLIParser.h"
+#include "Configuration.h"
 
 using namespace relpipe::cli;
 using namespace relpipe::reader;
 using namespace relpipe::out::posixmq;
 
 int main(int argc, char**argv) {
-	CLI cli(argc, argv);
+	setlocale(LC_ALL, "");
 	CLI::untieStdIO();
-	
+	CLI cli(argc, argv);
+
 	int resultCode = CLI::EXIT_CODE_UNEXPECTED_ERROR;
 
 	try {
+		CLIParser cliParser;
+		Configuration configuration = cliParser.parse(cli.arguments());
 		std::shared_ptr<RelationalReader> reader(Factory::create(std::cin));
-		PosixMQHandler handler(std::cout);
+		PosixMQHandler handler(configuration);
 		reader->addHandler(&handler);
 		reader->process();
-
 		resultCode = CLI::EXIT_CODE_SUCCESS;
-
 	} catch (RelpipeCLIException e) {
 		fwprintf(stderr, L"Caught CLI exception: %ls\n", e.getMessage().c_str());
 		fwprintf(stderr, L"Debug: Input stream: eof=%ls, lastRead=%d\n", (cin.eof() ? L"true" : L"false"), cin.gcount());