# HG changeset patch # User František Kučera # Date 1646092069 -3600 # Node ID 291bdd97fcff54762256f169bd6fd69bfc415089 # Parent e8205d9206fb20b4f87b6a1b079be92e8b7dc935 early version, support text messages only diff -r e8205d9206fb -r 291bdd97fcff bash-completion.sh --- a/bash-completion.sh Sat Feb 26 01:21:14 2022 +0100 +++ b/bash-completion.sh Tue Mar 01 00:47:49 2022 +0100 @@ -35,14 +35,13 @@ ) if [[ "$w1" == "--relation" && "x$w0" == "x" ]]; then COMPREPLY=("''") - elif [[ "$w1" == "--attribute" && "x$w0" == "x" ]]; then COMPREPLY=("''") - elif [[ "$w2" == "--attribute" ]]; then COMPREPLY=($(compgen -W "${DATA_TYPE[*]}" -- "$w0")) - elif [[ "$w1" == "--read-types" ]]; then COMPREPLY=($(compgen -W "${READ_TYPES[*]}" -- "$w0")) + elif [[ "$w1" == "--queue" && "x$w0" == "x" ]]; then COMPREPLY=("''") + elif [[ "$w1" == "--message-count" && "x$w0" == "x" ]]; then COMPREPLY=("1") else OPTIONS=( "--relation" - "--attribute" - "--read-types" + "--queue" + "--message-count" ) COMPREPLY=($(compgen -W "${OPTIONS[*]}" -- "$w0")) fi diff -r e8205d9206fb -r 291bdd97fcff nbproject/configurations.xml --- a/nbproject/configurations.xml Sat Feb 26 01:21:14 2022 +0100 +++ b/nbproject/configurations.xml Tue Mar 01 00:47:49 2022 +0100 @@ -42,6 +42,7 @@ + PosixMQ.h PosixMQCommand.cpp relpipe-in-posixmq.cpp @@ -93,6 +94,8 @@ true + + @@ -131,6 +134,8 @@ true + + diff -r e8205d9206fb -r 291bdd97fcff src/CLIParser.h --- a/src/CLIParser.h Sat Feb 26 01:21:14 2022 +0100 +++ b/src/CLIParser.h Tue Mar 01 00:47:49 2022 +0100 @@ -37,38 +37,11 @@ else throw relpipe::cli::RelpipeCLIException(L"Missing CLI argument" + (i > 0 ? (L" after " + arguments[i - 1]) : L""), relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS); } - /** - * TODO: use a common method - */ - bool parseBoolean(const relpipe::writer::string_t& value) { - if (value == L"true") return true; - else if (value == L"false") return false; - else throw relpipe::cli::RelpipeCLIException(L"Unable to parse boolean value: " + value + L" (expecting true or false)", relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS); - } - - /** - * TODO: use a common method - */ - relpipe::writer::TypeId parseTypeId(const relpipe::writer::string_t& value) { - using t = relpipe::writer::TypeId; - if (value == L"string") return t::STRING; - else if (value == L"integer") return t::INTEGER; - else if (value == L"boolean") return t::BOOLEAN; - else throw relpipe::cli::RelpipeCLIException(L"Unable to parse TypeId: " + value, relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS); - } - - Configuration::ReadTypes parseReadTypes(const relpipe::writer::string_t& value) { - if (value == L"auto") return Configuration::ReadTypes::AUTO; - else if (value == L"true") return Configuration::ReadTypes::TRUE; - else if (value == L"false") return Configuration::ReadTypes::FALSE; - else throw relpipe::cli::RelpipeCLIException(L"Unable to parse ReadTypes: " + value, relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS); - } - public: static const relpipe::writer::string_t OPTION_RELATION; - static const relpipe::writer::string_t OPTION_ATTRIBUTE; - static const relpipe::writer::string_t OPTION_READ_TYPES; + static const relpipe::writer::string_t OPTION_QUEUE; + static const relpipe::writer::string_t OPTION_MESSAGE_COUNT; Configuration parse(const std::vector& arguments) { Configuration c; @@ -78,13 +51,10 @@ if (option == OPTION_RELATION) { c.relation = readNext(arguments, i); - } else if (option == OPTION_ATTRIBUTE) { - AttributeRecipe attribute; - attribute.name = readNext(arguments, i); - attribute.type = parseTypeId(readNext(arguments, i)); - c.attributes.push_back(attribute); - } else if (option == OPTION_READ_TYPES) { - c.readTypes = parseReadTypes(readNext(arguments, i)); + } else if (option == OPTION_QUEUE) { + c.queue = readNext(arguments, i); + } else if (option == OPTION_MESSAGE_COUNT) { + c.messageCount = std::stoull(readNext(arguments, i)); } else throw relpipe::cli::RelpipeCLIException(L"Unsupported CLI option: " + option, relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS); } @@ -96,8 +66,8 @@ }; const relpipe::writer::string_t CLIParser::OPTION_RELATION = L"--relation"; -const relpipe::writer::string_t CLIParser::OPTION_ATTRIBUTE = L"--attribute"; -const relpipe::writer::string_t CLIParser::OPTION_READ_TYPES = L"--read-types"; +const relpipe::writer::string_t CLIParser::OPTION_QUEUE = L"--queue"; +const relpipe::writer::string_t CLIParser::OPTION_MESSAGE_COUNT = L"--message-count"; } } diff -r e8205d9206fb -r 291bdd97fcff src/CMakeLists.txt --- a/src/CMakeLists.txt Sat Feb 26 01:21:14 2022 +0100 +++ b/src/CMakeLists.txt Tue Mar 01 00:47:49 2022 +0100 @@ -34,7 +34,7 @@ ) # Link libraries: -target_link_libraries(${EXECUTABLE_FILE} ${RELPIPE_LIBS_LIBRARIES}) +target_link_libraries(${EXECUTABLE_FILE} ${RELPIPE_LIBS_LIBRARIES} rt) set_property(TARGET ${EXECUTABLE_FILE} PROPERTY INSTALL_RPATH_USE_LINK_PATH TRUE) install(TARGETS ${EXECUTABLE_FILE} DESTINATION bin) diff -r e8205d9206fb -r 291bdd97fcff src/Configuration.h --- a/src/Configuration.h Sat Feb 26 01:21:14 2022 +0100 +++ b/src/Configuration.h Tue Mar 01 00:47:49 2022 +0100 @@ -26,29 +26,12 @@ namespace in { namespace posixmq { -class AttributeRecipe { -public: - - virtual ~AttributeRecipe() { - } - - relpipe::writer::string_t name; - relpipe::writer::TypeId type; - -}; - class Configuration { public: - enum class ReadTypes { - AUTO, - TRUE, - FALSE, - }; - - ReadTypes readTypes = ReadTypes::AUTO; + relpipe::writer::integer_t messageCount = 1; relpipe::writer::string_t relation = L"posixmq"; - std::vector attributes; + relpipe::writer::string_t queue = L"relpipe"; virtual ~Configuration() { } diff -r e8205d9206fb -r 291bdd97fcff src/PosixMQ.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/PosixMQ.h Tue Mar 01 00:47:49 2022 +0100 @@ -0,0 +1,66 @@ +/** + * Relational pipes + * Copyright © 2022 František Kučera (Frantovo.cz, GlobalCode.info) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 3 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +#pragma once + +#include +#include +#include +#include + +namespace relpipe { +namespace in { +namespace posixmq { + +class PosixMQ { +private: + size_t MSG_SIZE = 8192; // TODO: configurable/dynamic + std::string queueName; + mqd_t handle = -2; + + PosixMQ(std::string queueName, mqd_t handle) : queueName(queueName), handle(handle) { + } + +public: + + virtual ~PosixMQ() { + if (handle >= 0) mq_close(handle); + } + + static PosixMQ* open(std::string queueName) { + mqd_t handle = mq_open(queueName.c_str(), O_RDONLY | O_CREAT); + if (handle >= 0) return new PosixMQ(queueName, handle); + else throw std::logic_error("Unable to open PosixMQ: " + queueName + " error: " + strerror(errno)); + } + + std::string receive() { + char buffer[MSG_SIZE + 1]; + memset(buffer, 0, MSG_SIZE + 1); + ssize_t msgSize = mq_receive(handle, buffer, MSG_SIZE, nullptr); + + if (msgSize >= 0) return std::string(buffer); + else throw std::logic_error("Unable to receive PosixMQ message from " + queueName + " error: " + strerror(errno)); + } + + void unlink() { + mq_unlink(queueName.c_str()); + } + +}; + +} +} +} diff -r e8205d9206fb -r 291bdd97fcff src/PosixMQCommand.cpp --- a/src/PosixMQCommand.cpp Sat Feb 26 01:21:14 2022 +0100 +++ b/src/PosixMQCommand.cpp Tue Mar 01 00:47:49 2022 +0100 @@ -31,6 +31,7 @@ #include #include "PosixMQCommand.h" +#include "PosixMQ.h" using namespace std; using namespace relpipe::cli; @@ -41,18 +42,25 @@ namespace posixmq { void PosixMQCommand::process(std::shared_ptr writer, Configuration& configuration) { - wstring_convert < codecvt_utf8> convertor; // UTF-8 is required for PosixMQ vector metadata; - writer->startRelation(L"posix_mq",{ + std::shared_ptr mq(PosixMQ::open(convertor.to_bytes(configuration.queue))); + + writer->startRelation(configuration.relation,{ + {L"queue", TypeId::STRING}, {L"message", TypeId::STRING} }, true); - - writer->writeAttribute(L"TODO: read messages from POSIX MQ"); + for (int i = configuration.messageCount; i > 0; i--) { + writer->writeAttribute(configuration.queue); + writer->writeAttribute(convertor.from_bytes(mq->receive())); + } } +PosixMQCommand::~PosixMQCommand() { +} + } } } \ No newline at end of file diff -r e8205d9206fb -r 291bdd97fcff src/PosixMQCommand.h --- a/src/PosixMQCommand.h Sat Feb 26 01:21:14 2022 +0100 +++ b/src/PosixMQCommand.h Tue Mar 01 00:47:49 2022 +0100 @@ -30,7 +30,11 @@ namespace posixmq { class PosixMQCommand { +private: + std::wstring_convert> convertor; // TODO: support also other encodings. public: + virtual ~PosixMQCommand(); + void process(std::shared_ptr writer, Configuration& configuration); };