# HG changeset patch # User František Kučera # Date 1651436852 -7200 # Node ID 27c11cea34deaa44c7aa0347240dd82e84b2f34c # Parent e5d547ab0c519ac62e10aab81f74b67f59a43eb5 first version diff -r e5d547ab0c51 -r 27c11cea34de bash-completion.sh --- a/bash-completion.sh Sun May 01 18:23:45 2022 +0200 +++ b/bash-completion.sh Sun May 01 22:27:32 2022 +0200 @@ -22,26 +22,13 @@ w2=${COMP_WORDS[COMP_CWORD-2]} w3=${COMP_WORDS[COMP_CWORD-3]} - DATA_TYPE=( - "string" - "integer" - "boolean" - ) - - BOOLEAN_VALUES=( - "true" - "false" - ) - if [[ "$w1" == "--relation" && "x$w0" == "x" ]]; then COMPREPLY=("''") - elif [[ "$w1" == "--unlink-on-close" ]]; then COMPREPLY=($(compgen -W "${BOOLEAN_VALUES[*]}" -- "$w0")) - elif [[ "$w1" == "--queue" && "x$w0" == "x" ]]; then COMPREPLY=("''") + elif [[ "$w1" == "--endpoint-url" && "x$w0" == "x" ]]; then COMPREPLY=("'tcp://*:1234'") elif [[ "$w1" == "--message-count" && "x$w0" == "x" ]]; then COMPREPLY=("1") else OPTIONS=( "--relation" - "--unlink-on-close" - "--queue" + "--endpoint-url" "--message-count" ) COMPREPLY=($(compgen -W "${OPTIONS[*]}" -- "$w0")) diff -r e5d547ab0c51 -r 27c11cea34de nbproject/configurations.xml --- a/nbproject/configurations.xml Sun May 01 18:23:45 2022 +0200 +++ b/nbproject/configurations.xml Sun May 01 22:27:32 2022 +0200 @@ -42,7 +42,6 @@ - ZeroMQ.h ZeroMQCommand.cpp relpipe-in-zeromq.cpp @@ -94,8 +93,6 @@ true - - @@ -134,8 +131,6 @@ true - - diff -r e5d547ab0c51 -r 27c11cea34de src/CLIParser.h --- a/src/CLIParser.h Sun May 01 18:23:45 2022 +0200 +++ b/src/CLIParser.h Sun May 01 22:27:32 2022 +0200 @@ -37,20 +37,10 @@ else throw relpipe::cli::RelpipeCLIException(L"Missing CLI argument" + (i > 0 ? (L" after " + arguments[i - 1]) : L""), relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS); } - /** - * TODO: use a common method - */ - bool parseBoolean(const relpipe::common::type::StringX& value) { - if (value == L"true") return true; - else if (value == L"false") return false; - else throw relpipe::cli::RelpipeCLIException(L"Unable to parse boolean value: " + value + L" (expecting true or false)", relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS); - } - public: static const relpipe::writer::string_t OPTION_RELATION; - static const relpipe::writer::string_t OPTION_UNLINK_ON_CLOSE; - static const relpipe::writer::string_t OPTION_QUEUE; + static const relpipe::writer::string_t OPTION_ENDPOINT_URL; static const relpipe::writer::string_t OPTION_MESSAGE_COUNT; Configuration parse(const std::vector& arguments) { @@ -58,13 +48,11 @@ for (int i = 0; i < arguments.size();) { relpipe::writer::string_t option = readNext(arguments, i); - + if (option == OPTION_RELATION) { c.relation = readNext(arguments, i); - } else if (option == OPTION_UNLINK_ON_CLOSE) { - c.unlinkOnClose = parseBoolean(readNext(arguments, i)); - } else if (option == OPTION_QUEUE) { - c.queue = readNext(arguments, i); + } else if (option == OPTION_ENDPOINT_URL) { + c.endpointUrl = readNext(arguments, i); } else if (option == OPTION_MESSAGE_COUNT) { c.messageCount = std::stoull(readNext(arguments, i)); } else throw relpipe::cli::RelpipeCLIException(L"Unsupported CLI option: " + option, relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS); @@ -78,8 +66,7 @@ }; const relpipe::writer::string_t CLIParser::OPTION_RELATION = L"--relation"; -const relpipe::writer::string_t CLIParser::OPTION_UNLINK_ON_CLOSE = L"--unlink-on-close"; -const relpipe::writer::string_t CLIParser::OPTION_QUEUE = L"--queue"; +const relpipe::writer::string_t CLIParser::OPTION_ENDPOINT_URL = L"--endpoint-url"; const relpipe::writer::string_t CLIParser::OPTION_MESSAGE_COUNT = L"--message-count"; } diff -r e5d547ab0c51 -r 27c11cea34de src/CMakeLists.txt --- a/src/CMakeLists.txt Sun May 01 18:23:45 2022 +0200 +++ b/src/CMakeLists.txt Sun May 01 22:27:32 2022 +0200 @@ -17,7 +17,7 @@ # Relpipe libraries: INCLUDE(FindPkgConfig) -pkg_check_modules (RELPIPE_LIBS relpipe-lib-writer.cpp relpipe-lib-cli.cpp) +pkg_check_modules (RELPIPE_LIBS relpipe-lib-writer.cpp relpipe-lib-cli.cpp libzmq) include_directories(${RELPIPE_LIBS_INCLUDE_DIRS}) link_directories(${RELPIPE_LIBS_LIBRARY_DIRS}) @@ -34,7 +34,7 @@ ) # Link libraries: -target_link_libraries(${EXECUTABLE_FILE} ${RELPIPE_LIBS_LIBRARIES} rt) +target_link_libraries(${EXECUTABLE_FILE} ${RELPIPE_LIBS_LIBRARIES}) set_property(TARGET ${EXECUTABLE_FILE} PROPERTY INSTALL_RPATH_USE_LINK_PATH TRUE) install(TARGETS ${EXECUTABLE_FILE} DESTINATION bin) diff -r e5d547ab0c51 -r 27c11cea34de src/Configuration.h --- a/src/Configuration.h Sun May 01 18:23:45 2022 +0200 +++ b/src/Configuration.h Sun May 01 22:27:32 2022 +0200 @@ -29,10 +29,9 @@ class Configuration { public: + relpipe::common::type::StringX relation = L"message"; relpipe::common::type::Integer messageCount = 1; - relpipe::common::type::StringX relation = L"zeromq"; - relpipe::common::type::StringX queue = L"/relpipe"; - relpipe::common::type::Boolean unlinkOnClose = false; + relpipe::common::type::StringX endpointUrl = L"tcp://*:1234"; virtual ~Configuration() { } diff -r e5d547ab0c51 -r 27c11cea34de src/ZeroMQ.h --- a/src/ZeroMQ.h Sun May 01 18:23:45 2022 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,70 +0,0 @@ -/** - * Relational pipes - * Copyright © 2022 František Kučera (Frantovo.cz, GlobalCode.info) - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, version 3 of the License. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -#pragma once - -#include -#include -#include -#include - -namespace relpipe { -namespace in { -namespace zeromq { - -class ZeroMQ { -private: - const static size_t MSG_SIZE = 8192; // TODO: configurable/dynamic - std::string queueName; - mqd_t handle = -2; - bool unlinkOnClose = false; - - ZeroMQ(std::string queueName, mqd_t handle, bool unlinkOnClose) : queueName(queueName), handle(handle), unlinkOnClose(unlinkOnClose) { - } - -public: - - virtual ~ZeroMQ() { - if (handle >= 0) mq_close(handle); - if (unlinkOnClose) mq_unlink(queueName.c_str()); - } - - static ZeroMQ* open(std::string queueName, bool unlinkOnClose = false) { - mqd_t handle = mq_open(queueName.c_str(), O_RDONLY | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); - if (handle >= 0) return new ZeroMQ(queueName, handle, unlinkOnClose); - else throw std::logic_error("Unable to open ZeroMQ: " + queueName + " error: " + strerror(errno)); - } - - void send(std::string message) { - int result = mq_send(handle, message.c_str(), message.size(), 0); - if (result) throw std::logic_error("Unable to send message to" + queueName + " error: " + strerror(errno)); - } - - std::string receive() { - char buffer[MSG_SIZE + 1]; - memset(buffer, 0, MSG_SIZE + 1); - ssize_t msgSize = mq_receive(handle, buffer, MSG_SIZE, nullptr); - - if (msgSize > sizeof (buffer))throw std::logic_error("Invalid ZeroMQ message size."); - else if (msgSize >= 0) return std::string(buffer, msgSize); - else throw std::logic_error("Unable to receive ZeroMQ message from " + queueName + " error: " + strerror(errno)); - } - -}; - -} -} -} diff -r e5d547ab0c51 -r 27c11cea34de src/ZeroMQCommand.cpp --- a/src/ZeroMQCommand.cpp Sun May 01 18:23:45 2022 +0200 +++ b/src/ZeroMQCommand.cpp Sun May 01 22:27:32 2022 +0200 @@ -22,7 +22,8 @@ #include #include #include -#include + +#include #include #include @@ -33,7 +34,6 @@ #include #include "ZeroMQCommand.h" -#include "ZeroMQ.h" #include "Hex.h" using namespace std; @@ -45,21 +45,20 @@ namespace zeromq { void ZeroMQCommand::process(std::shared_ptr writer, Configuration& configuration) { - vector metadata; - - std::shared_ptr mq(ZeroMQ::open(convertor.to_bytes(configuration.queue), configuration.unlinkOnClose)); + zmq::context_t zmqContext; + zmq::socket_t zmqSocket(zmqContext, zmq::socket_type::pull); + zmqSocket.bind(convertor.to_bytes(configuration.endpointUrl)); writer->startRelation(configuration.relation,{ - {L"queue", TypeId::STRING}, {L"text", TypeId::STRING}, {L"data", TypeId::STRING} }, true); for (int i = configuration.messageCount; continueProcessing && i > 0; i--) { - // TODO: maybe rather call mq_timedreceive() inside and check continueProcessing (to be able to stop even when no messages are comming) - std::string message = mq->receive(); + zmq::message_t msg; + zmqSocket.recv(&msg, 0); // FIXME: check return value + std::string message(msg.data(), msg.size()); - writer->writeAttribute(configuration.queue); writer->writeAttribute(Hex::toTxt(message)); writer->writeAttribute(Hex::toHex(message)); }