# HG changeset patch # User František Kučera # Date 1646419396 -3600 # Node ID b71fc3b5e56b4bc539418ebb7267a4db158fa039 # Parent 1eef3d46586354769f4ac629f4e7962efed5cc2c new option: --unlink-on-close (delete the MQ file at the end) diff -r 1eef3d465863 -r b71fc3b5e56b bash-completion.sh --- a/bash-completion.sh Tue Mar 01 00:49:46 2022 +0100 +++ b/bash-completion.sh Fri Mar 04 19:43:16 2022 +0100 @@ -28,18 +28,19 @@ "boolean" ) - READ_TYPES=( - "auto" + BOOLEAN_VALUES=( "true" "false" ) if [[ "$w1" == "--relation" && "x$w0" == "x" ]]; then COMPREPLY=("''") + elif [[ "$w1" == "--unlink-on-close" ]]; then COMPREPLY=($(compgen -W "${BOOLEAN_VALUES[*]}" -- "$w0")) elif [[ "$w1" == "--queue" && "x$w0" == "x" ]]; then COMPREPLY=("''") elif [[ "$w1" == "--message-count" && "x$w0" == "x" ]]; then COMPREPLY=("1") else OPTIONS=( "--relation" + "--unlink-on-close" "--queue" "--message-count" ) diff -r 1eef3d465863 -r b71fc3b5e56b src/CLIParser.h --- a/src/CLIParser.h Tue Mar 01 00:49:46 2022 +0100 +++ b/src/CLIParser.h Fri Mar 04 19:43:16 2022 +0100 @@ -37,9 +37,19 @@ else throw relpipe::cli::RelpipeCLIException(L"Missing CLI argument" + (i > 0 ? (L" after " + arguments[i - 1]) : L""), relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS); } + /** + * TODO: use a common method + */ + bool parseBoolean(const relpipe::common::type::StringX& value) { + if (value == L"true") return true; + else if (value == L"false") return false; + else throw relpipe::cli::RelpipeCLIException(L"Unable to parse boolean value: " + value + L" (expecting true or false)", relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS); + } + public: static const relpipe::writer::string_t OPTION_RELATION; + static const relpipe::writer::string_t OPTION_UNLINK_ON_CLOSE; static const relpipe::writer::string_t OPTION_QUEUE; static const relpipe::writer::string_t OPTION_MESSAGE_COUNT; @@ -51,6 +61,8 @@ if (option == OPTION_RELATION) { c.relation = readNext(arguments, i); + } else if (option == OPTION_UNLINK_ON_CLOSE) { + c.unlinkOnClose = parseBoolean(readNext(arguments, i)); } else if (option == OPTION_QUEUE) { c.queue = readNext(arguments, i); } else if (option == OPTION_MESSAGE_COUNT) { @@ -66,6 +78,7 @@ }; const relpipe::writer::string_t CLIParser::OPTION_RELATION = L"--relation"; +const relpipe::writer::string_t CLIParser::OPTION_UNLINK_ON_CLOSE = L"--unlink-on-close"; const relpipe::writer::string_t CLIParser::OPTION_QUEUE = L"--queue"; const relpipe::writer::string_t CLIParser::OPTION_MESSAGE_COUNT = L"--message-count"; diff -r 1eef3d465863 -r b71fc3b5e56b src/Configuration.h --- a/src/Configuration.h Tue Mar 01 00:49:46 2022 +0100 +++ b/src/Configuration.h Fri Mar 04 19:43:16 2022 +0100 @@ -19,7 +19,7 @@ #include #include -#include +#include namespace relpipe { @@ -29,9 +29,10 @@ class Configuration { public: - relpipe::writer::integer_t messageCount = 1; - relpipe::writer::string_t relation = L"posixmq"; - relpipe::writer::string_t queue = L"relpipe"; + relpipe::common::type::Integer messageCount = 1; + relpipe::common::type::StringX relation = L"posixmq"; + relpipe::common::type::StringX queue = L"/relpipe"; + relpipe::common::type::Boolean unlinkOnClose = false; virtual ~Configuration() { } diff -r 1eef3d465863 -r b71fc3b5e56b src/PosixMQ.h --- a/src/PosixMQ.h Tue Mar 01 00:49:46 2022 +0100 +++ b/src/PosixMQ.h Fri Mar 04 19:43:16 2022 +0100 @@ -27,25 +27,32 @@ class PosixMQ { private: - size_t MSG_SIZE = 8192; // TODO: configurable/dynamic + const static size_t MSG_SIZE = 8192; // TODO: configurable/dynamic std::string queueName; mqd_t handle = -2; + bool unlinkOnClose = false; - PosixMQ(std::string queueName, mqd_t handle) : queueName(queueName), handle(handle) { + PosixMQ(std::string queueName, mqd_t handle, bool unlinkOnClose) : queueName(queueName), handle(handle), unlinkOnClose(unlinkOnClose) { } public: virtual ~PosixMQ() { if (handle >= 0) mq_close(handle); + if (unlinkOnClose) mq_unlink(queueName.c_str()); } - static PosixMQ* open(std::string queueName) { - mqd_t handle = mq_open(queueName.c_str(), O_RDONLY | O_CREAT); - if (handle >= 0) return new PosixMQ(queueName, handle); + static PosixMQ* open(std::string queueName, bool unlinkOnClose = false) { + mqd_t handle = mq_open(queueName.c_str(), O_RDONLY | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); + if (handle >= 0) return new PosixMQ(queueName, handle, unlinkOnClose); else throw std::logic_error("Unable to open PosixMQ: " + queueName + " error: " + strerror(errno)); } + void send(std::string message) { + int result = mq_send(handle, message.c_str(), message.size(), 0); + if (result) throw std::logic_error("Unable to send message to" + queueName + " error: " + strerror(errno)); + } + std::string receive() { char buffer[MSG_SIZE + 1]; memset(buffer, 0, MSG_SIZE + 1); @@ -55,10 +62,6 @@ else throw std::logic_error("Unable to receive PosixMQ message from " + queueName + " error: " + strerror(errno)); } - void unlink() { - mq_unlink(queueName.c_str()); - } - }; } diff -r 1eef3d465863 -r b71fc3b5e56b src/PosixMQCommand.cpp --- a/src/PosixMQCommand.cpp Tue Mar 01 00:49:46 2022 +0100 +++ b/src/PosixMQCommand.cpp Fri Mar 04 19:43:16 2022 +0100 @@ -44,7 +44,7 @@ void PosixMQCommand::process(std::shared_ptr writer, Configuration& configuration) { vector metadata; - std::shared_ptr mq(PosixMQ::open(convertor.to_bytes(configuration.queue))); + std::shared_ptr mq(PosixMQ::open(convertor.to_bytes(configuration.queue), configuration.unlinkOnClose)); writer->startRelation(configuration.relation,{ {L"queue", TypeId::STRING}, diff -r 1eef3d465863 -r b71fc3b5e56b src/relpipe-in-posixmq.cpp --- a/src/relpipe-in-posixmq.cpp Tue Mar 01 00:49:46 2022 +0100 +++ b/src/relpipe-in-posixmq.cpp Fri Mar 04 19:43:16 2022 +0100 @@ -33,7 +33,6 @@ #include "CLIParser.h" #include "Configuration.h" -using namespace std; using namespace relpipe::cli; using namespace relpipe::writer; using namespace relpipe::in::posixmq;