# HG changeset patch # User František Kučera # Date 1651436862 -7200 # Node ID ccaed729f8d30ec07204e512098911aa6f9e5e0f # Parent 452d06d24ac27ceea9611c3f228a23b8480cb584 first version diff -r 452d06d24ac2 -r ccaed729f8d3 bash-completion.sh --- a/bash-completion.sh Sun May 01 18:24:24 2022 +0200 +++ b/bash-completion.sh Sun May 01 22:27:42 2022 +0200 @@ -22,25 +22,10 @@ w2=${COMP_WORDS[COMP_CWORD-2]} w3=${COMP_WORDS[COMP_CWORD-3]} - DATA_TYPE=( - "string" - "integer" - "boolean" - ) - - BOOLEAN_VALUES=( - "true" - "false" - ) - - if [[ "$w1" == "--relation" && "x$w0" == "x" ]]; then COMPREPLY=("''") - elif [[ "$w1" == "--unlink-on-close" ]]; then COMPREPLY=($(compgen -W "${BOOLEAN_VALUES[*]}" -- "$w0")) - elif [[ "$w1" == "--queue" && "x$w0" == "x" ]]; then COMPREPLY=("''") + if [[ "$w1" == "--endpoint-url" && "x$w0" == "x" ]]; then COMPREPLY=("'tcp://localhost:1234'") else OPTIONS=( - "--relation" - "--unlink-on-close" - "--queue" + "--endpoint-url" ) COMPREPLY=($(compgen -W "${OPTIONS[*]}" -- "$w0")) fi diff -r 452d06d24ac2 -r ccaed729f8d3 src/CLIParser.h --- a/src/CLIParser.h Sun May 01 18:24:24 2022 +0200 +++ b/src/CLIParser.h Sun May 01 22:27:42 2022 +0200 @@ -37,20 +37,9 @@ else throw relpipe::cli::RelpipeCLIException(L"Missing CLI argument" + (i > 0 ? (L" after " + arguments[i - 1]) : L""), relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS); } - /** - * TODO: use a common method - */ - bool parseBoolean(const relpipe::common::type::StringX& value) { - if (value == L"true") return true; - else if (value == L"false") return false; - else throw relpipe::cli::RelpipeCLIException(L"Unable to parse boolean value: " + value + L" (expecting true or false)", relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS); - } - public: - static const relpipe::common::type::StringX OPTION_RELATION; - static const relpipe::common::type::StringX OPTION_UNLINK_ON_CLOSE; - static const relpipe::common::type::StringX OPTION_QUEUE; + static const relpipe::common::type::StringX OPTION_ENDPOINT_URL; Configuration parse(const std::vector& arguments) { Configuration c; @@ -58,12 +47,8 @@ for (int i = 0; i < arguments.size();) { relpipe::common::type::StringX option = readNext(arguments, i); - if (option == OPTION_RELATION) { - c.relation = readNext(arguments, i); - } else if (option == OPTION_UNLINK_ON_CLOSE) { - c.unlinkOnClose = parseBoolean(readNext(arguments, i)); - } else if (option == OPTION_QUEUE) { - c.queue = readNext(arguments, i); + if (option == OPTION_ENDPOINT_URL) { + c.endpointUrl = readNext(arguments, i); } else throw relpipe::cli::RelpipeCLIException(L"Unsupported CLI option: " + option, relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS); } @@ -74,9 +59,7 @@ } }; -const relpipe::common::type::StringX CLIParser::OPTION_RELATION = L"--relation"; -const relpipe::common::type::StringX CLIParser::OPTION_UNLINK_ON_CLOSE = L"--unlink-on-close"; -const relpipe::common::type::StringX CLIParser::OPTION_QUEUE = L"--queue"; +const relpipe::common::type::StringX CLIParser::OPTION_ENDPOINT_URL = L"--endpoint-url"; } } diff -r 452d06d24ac2 -r ccaed729f8d3 src/CMakeLists.txt --- a/src/CMakeLists.txt Sun May 01 18:24:24 2022 +0200 +++ b/src/CMakeLists.txt Sun May 01 22:27:42 2022 +0200 @@ -17,7 +17,7 @@ # Relpipe libraries: INCLUDE(FindPkgConfig) -pkg_check_modules (RELPIPE_LIBS relpipe-lib-reader.cpp relpipe-lib-cli.cpp) +pkg_check_modules (RELPIPE_LIBS relpipe-lib-reader.cpp relpipe-lib-cli.cpp libzmq) include_directories(${RELPIPE_LIBS_INCLUDE_DIRS}) link_directories(${RELPIPE_LIBS_LIBRARY_DIRS}) @@ -34,7 +34,7 @@ ) # Link libraries: -target_link_libraries(${EXECUTABLE_FILE} ${RELPIPE_LIBS_LIBRARIES} rt) +target_link_libraries(${EXECUTABLE_FILE} ${RELPIPE_LIBS_LIBRARIES}) set_property(TARGET ${EXECUTABLE_FILE} PROPERTY INSTALL_RPATH_USE_LINK_PATH TRUE) install(TARGETS ${EXECUTABLE_FILE} DESTINATION bin) diff -r 452d06d24ac2 -r ccaed729f8d3 src/Configuration.h --- a/src/Configuration.h Sun May 01 18:24:24 2022 +0200 +++ b/src/Configuration.h Sun May 01 22:27:42 2022 +0200 @@ -29,9 +29,7 @@ class Configuration { public: - relpipe::common::type::StringX relation = L"zeromq"; - relpipe::common::type::StringX queue = L"/relpipe"; - relpipe::common::type::Boolean unlinkOnClose = false; + relpipe::common::type::StringX endpointUrl = L"tcp://localhost:1234"; virtual ~Configuration() { } diff -r 452d06d24ac2 -r ccaed729f8d3 src/ZeroMQ.h --- a/src/ZeroMQ.h Sun May 01 18:24:24 2022 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,70 +0,0 @@ -/** - * Relational pipes - * Copyright © 2022 František Kučera (Frantovo.cz, GlobalCode.info) - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, version 3 of the License. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -#pragma once - -#include -#include -#include -#include - -namespace relpipe { -namespace out { -namespace zeromq { - -class ZeroMQ { -private: - const static size_t MSG_SIZE = 8192; // TODO: configurable/dynamic - std::string queueName; - mqd_t handle = -2; - bool unlinkOnClose = false; - - ZeroMQ(std::string queueName, mqd_t handle, bool unlinkOnClose) : queueName(queueName), handle(handle), unlinkOnClose(unlinkOnClose) { - } - -public: - - virtual ~ZeroMQ() { - if (handle >= 0) mq_close(handle); - if (unlinkOnClose) mq_unlink(queueName.c_str()); - } - - static ZeroMQ* open(std::string queueName, bool unlinkOnClose = false) { - mqd_t handle = mq_open(queueName.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); - if (handle >= 0) return new ZeroMQ(queueName, handle, unlinkOnClose); - else throw std::logic_error("Unable to open ZeroMQ: " + queueName + " error: " + strerror(errno)); - } - - void send(std::string message) { - int result = mq_send(handle, message.c_str(), message.size(), 0); - if (result) throw std::logic_error("Unable to send message to" + queueName + " error: " + strerror(errno)); - } - - std::string receive() { - char buffer[MSG_SIZE + 1]; - memset(buffer, 0, MSG_SIZE + 1); - ssize_t msgSize = mq_receive(handle, buffer, MSG_SIZE, nullptr); - - if (msgSize > sizeof (buffer))throw std::logic_error("Invalid ZeroMQ message size."); - else if (msgSize >= 0) return std::string(buffer, msgSize); - else throw std::logic_error("Unable to receive ZeroMQ message from " + queueName + " error: " + strerror(errno)); - } - -}; - -} -} -} diff -r 452d06d24ac2 -r ccaed729f8d3 src/ZeroMQHandler.h --- a/src/ZeroMQHandler.h Sun May 01 18:24:24 2022 +0200 +++ b/src/ZeroMQHandler.h Sun May 01 22:27:42 2022 +0200 @@ -24,12 +24,13 @@ #include #include +#include + #include #include #include #include -#include "ZeroMQ.h" #include "Configuration.h" #include "Hex.h" @@ -41,7 +42,8 @@ private: std::wstring_convert> convertor; // TODO: support also other encodings. Configuration configuration; - shared_ptr mq; + zmq::context_t zmqContext; + zmq::socket_t zmqSocket; struct CurrentRelation { relpipe::common::type::StringX name; @@ -52,9 +54,8 @@ public: - ZeroMQHandler(Configuration configuration) : configuration(configuration) { - // TODO: do not throw exception from the constructor: ZeroMQ::open() - mq.reset(ZeroMQ::open(convertor.to_bytes(configuration.queue), configuration.unlinkOnClose)); + ZeroMQHandler(Configuration configuration) : configuration(configuration), zmqSocket(zmqContext, zmq::socket_type::push) { + zmqSocket.connect(convertor.to_bytes(configuration.endpointUrl)); } void startRelation(relpipe::common::type::StringX name, std::vector attributes) override { @@ -72,7 +73,7 @@ currentRelation.attributeIndex++; if (currentRelation.attributeIndex == currentRelation.attributes.size()) { currentRelation.attributeIndex = 0; - mq->send(currentRelation.currentValue); + zmqSocket.send(currentRelation.currentValue.c_str(), currentRelation.currentValue.size(), 0); // FIXME: check return value } }