# HG changeset patch # User František Kučera # Date 1646425808 -3600 # Node ID be6f2e307a651c32f1015eb20aa175925f5fab28 # Parent fc9911b1d29518ec931c2cefbd130560d35c8818 configuration + option: --unlink-on-close diff -r fc9911b1d295 -r be6f2e307a65 bash-completion.sh --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/bash-completion.sh Fri Mar 04 21:30:08 2022 +0100 @@ -0,0 +1,49 @@ +# Relational pipes +# Copyright © 2022 František Kučera (Frantovo.cz, GlobalCode.info) +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, version 3 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +_relpipe_out_posixmq_completion() { + local w0 w1 w2 w3 + + COMPREPLY=() + w0=${COMP_WORDS[COMP_CWORD]} + w1=${COMP_WORDS[COMP_CWORD-1]} + w2=${COMP_WORDS[COMP_CWORD-2]} + w3=${COMP_WORDS[COMP_CWORD-3]} + + DATA_TYPE=( + "string" + "integer" + "boolean" + ) + + BOOLEAN_VALUES=( + "true" + "false" + ) + + if [[ "$w1" == "--relation" && "x$w0" == "x" ]]; then COMPREPLY=("''") + elif [[ "$w1" == "--unlink-on-close" ]]; then COMPREPLY=($(compgen -W "${BOOLEAN_VALUES[*]}" -- "$w0")) + elif [[ "$w1" == "--queue" && "x$w0" == "x" ]]; then COMPREPLY=("''") + else + OPTIONS=( + "--relation" + "--unlink-on-close" + "--queue" + ) + COMPREPLY=($(compgen -W "${OPTIONS[*]}" -- "$w0")) + fi +} + +complete -F _relpipe_out_posixmq_completion relpipe-out-posixmq diff -r fc9911b1d295 -r be6f2e307a65 src/CLIParser.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/CLIParser.h Fri Mar 04 21:30:08 2022 +0100 @@ -0,0 +1,83 @@ +/** + * Relational pipes + * Copyright © 2022 František Kučera (Frantovo.cz, GlobalCode.info) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 3 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +#pragma once + +#include +#include + +#include +#include +#include + +#include "Configuration.h" + +namespace relpipe { +namespace out { +namespace posixmq { + +class CLIParser { +private: + + relpipe::common::type::StringX readNext(const std::vector& arguments, int& i) { + if (i < arguments.size()) return arguments[i++]; + else throw relpipe::cli::RelpipeCLIException(L"Missing CLI argument" + (i > 0 ? (L" after " + arguments[i - 1]) : L""), relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS); + } + + /** + * TODO: use a common method + */ + bool parseBoolean(const relpipe::common::type::StringX& value) { + if (value == L"true") return true; + else if (value == L"false") return false; + else throw relpipe::cli::RelpipeCLIException(L"Unable to parse boolean value: " + value + L" (expecting true or false)", relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS); + } + +public: + + static const relpipe::common::type::StringX OPTION_RELATION; + static const relpipe::common::type::StringX OPTION_UNLINK_ON_CLOSE; + static const relpipe::common::type::StringX OPTION_QUEUE; + + Configuration parse(const std::vector& arguments) { + Configuration c; + + for (int i = 0; i < arguments.size();) { + relpipe::common::type::StringX option = readNext(arguments, i); + + if (option == OPTION_RELATION) { + c.relation = readNext(arguments, i); + } else if (option == OPTION_UNLINK_ON_CLOSE) { + c.unlinkOnClose = parseBoolean(readNext(arguments, i)); + } else if (option == OPTION_QUEUE) { + c.queue = readNext(arguments, i); + } else throw relpipe::cli::RelpipeCLIException(L"Unsupported CLI option: " + option, relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS); + } + + return c; + } + + virtual ~CLIParser() { + } +}; + +const relpipe::common::type::StringX CLIParser::OPTION_RELATION = L"--relation"; +const relpipe::common::type::StringX CLIParser::OPTION_UNLINK_ON_CLOSE = L"--unlink-on-close"; +const relpipe::common::type::StringX CLIParser::OPTION_QUEUE = L"--queue"; + +} +} +} diff -r fc9911b1d295 -r be6f2e307a65 src/Configuration.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/Configuration.h Fri Mar 04 21:30:08 2022 +0100 @@ -0,0 +1,42 @@ +/** + * Relational pipes + * Copyright © 2022 František Kučera (Frantovo.cz, GlobalCode.info) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 3 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +#pragma once + +#include +#include + +#include + + +namespace relpipe { +namespace out { +namespace posixmq { + +class Configuration { +public: + + relpipe::common::type::StringX relation = L"posixmq"; + relpipe::common::type::StringX queue = L"/relpipe"; + relpipe::common::type::Boolean unlinkOnClose = false; + + virtual ~Configuration() { + } +}; + +} +} +} \ No newline at end of file diff -r fc9911b1d295 -r be6f2e307a65 src/PosixMQ.h --- a/src/PosixMQ.h Fri Mar 04 01:40:50 2022 +0100 +++ b/src/PosixMQ.h Fri Mar 04 21:30:08 2022 +0100 @@ -27,28 +27,30 @@ class PosixMQ { private: - size_t MSG_SIZE = 8192; // TODO: configurable/dynamic + const static size_t MSG_SIZE = 8192; // TODO: configurable/dynamic std::string queueName; mqd_t handle = -2; + bool unlinkOnClose = false; - PosixMQ(std::string queueName, mqd_t handle) : queueName(queueName), handle(handle) { + PosixMQ(std::string queueName, mqd_t handle, bool unlinkOnClose) : queueName(queueName), handle(handle), unlinkOnClose(unlinkOnClose) { } public: virtual ~PosixMQ() { if (handle >= 0) mq_close(handle); + if (unlinkOnClose) mq_unlink(queueName.c_str()); } - static PosixMQ* open(std::string queueName) { - mqd_t handle = mq_open(queueName.c_str(), O_RDWR | O_CREAT); - if (handle >= 0) return new PosixMQ(queueName, handle); + static PosixMQ* open(std::string queueName, bool unlinkOnClose = false) { + mqd_t handle = mq_open(queueName.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); + if (handle >= 0) return new PosixMQ(queueName, handle, unlinkOnClose); else throw std::logic_error("Unable to open PosixMQ: " + queueName + " error: " + strerror(errno)); } void send(std::string message) { int result = mq_send(handle, message.c_str(), message.size(), 0); - if (result) throw std::logic_error("mq_send() = " + std::to_string(result) + " error: " + strerror(errno)); + if (result) throw std::logic_error("Unable to send message to" + queueName + " error: " + strerror(errno)); } std::string receive() { @@ -60,10 +62,6 @@ else throw std::logic_error("Unable to receive PosixMQ message from " + queueName + " error: " + strerror(errno)); } - void unlink() { - mq_unlink(queueName.c_str()); - } - }; } diff -r fc9911b1d295 -r be6f2e307a65 src/PosixMQHandler.h --- a/src/PosixMQHandler.h Fri Mar 04 01:40:50 2022 +0100 +++ b/src/PosixMQHandler.h Fri Mar 04 21:30:08 2022 +0100 @@ -30,6 +30,7 @@ #include #include "PosixMQ.h" +#include "Configuration.h" namespace relpipe { namespace out { @@ -38,14 +39,13 @@ class PosixMQHandler : public relpipe::reader::handlers::RelationalReaderStringHandler { private: std::wstring_convert> convertor; // TODO: support also other encodings. + Configuration configuration; shared_ptr mq; public: - PosixMQHandler(std::ostream& output) { - relpipe::common::type::StringX queueName = L"/relpipe"; - mq.reset(PosixMQ::open(convertor.to_bytes(queueName))); - + PosixMQHandler(Configuration configuration) : configuration(configuration) { + mq.reset(PosixMQ::open(convertor.to_bytes(configuration.queue), configuration.unlinkOnClose)); } void startRelation(relpipe::common::type::StringX name, std::vector attributes) override { diff -r fc9911b1d295 -r be6f2e307a65 src/relpipe-out-posixmq.cpp --- a/src/relpipe-out-posixmq.cpp Fri Mar 04 01:40:50 2022 +0100 +++ b/src/relpipe-out-posixmq.cpp Fri Mar 04 21:30:08 2022 +0100 @@ -26,25 +26,28 @@ #include #include "PosixMQHandler.h" +#include "CLIParser.h" +#include "Configuration.h" using namespace relpipe::cli; using namespace relpipe::reader; using namespace relpipe::out::posixmq; int main(int argc, char**argv) { - CLI cli(argc, argv); + setlocale(LC_ALL, ""); CLI::untieStdIO(); - + CLI cli(argc, argv); + int resultCode = CLI::EXIT_CODE_UNEXPECTED_ERROR; try { + CLIParser cliParser; + Configuration configuration = cliParser.parse(cli.arguments()); std::shared_ptr reader(Factory::create(std::cin)); - PosixMQHandler handler(std::cout); + PosixMQHandler handler(configuration); reader->addHandler(&handler); reader->process(); - resultCode = CLI::EXIT_CODE_SUCCESS; - } catch (RelpipeCLIException e) { fwprintf(stderr, L"Caught CLI exception: %ls\n", e.getMessage().c_str()); fwprintf(stderr, L"Debug: Input stream: eof=%ls, lastRead=%d\n", (cin.eof() ? L"true" : L"false"), cin.gcount());