# HG changeset patch # User František Kučera # Date 1658870538 -7200 # Node ID e3265afd1111c3d81b81e6c3e57b95e7119c5443 # Parent 924e354948dff20eb9d870dbff20909abac81006 first version: TCP-only + fixed parameters diff -r 924e354948df -r e3265afd1111 src/CMakeLists.txt --- a/src/CMakeLists.txt Sun May 01 18:42:50 2022 +0200 +++ b/src/CMakeLists.txt Tue Jul 26 23:22:18 2022 +0200 @@ -34,7 +34,7 @@ ) # Link libraries: -target_link_libraries(${EXECUTABLE_FILE} ${RELPIPE_LIBS_LIBRARIES} rt) +target_link_libraries(${EXECUTABLE_FILE} ${RELPIPE_LIBS_LIBRARIES}) set_property(TARGET ${EXECUTABLE_FILE} PROPERTY INSTALL_RPATH_USE_LINK_PATH TRUE) install(TARGETS ${EXECUTABLE_FILE} DESTINATION bin) diff -r 924e354948df -r e3265afd1111 src/Socket.h --- a/src/Socket.h Sun May 01 18:42:50 2022 +0200 +++ b/src/Socket.h Tue Jul 26 23:22:18 2022 +0200 @@ -16,10 +16,14 @@ */ #pragma once -#include #include +#include +#include #include -#include +#include +#include +#include +#include namespace relpipe { namespace out { @@ -27,41 +31,28 @@ class Socket { private: - const static size_t MSG_SIZE = 8192; // TODO: configurable/dynamic - std::string queueName; - mqd_t handle = -2; - bool unlinkOnClose = false; - - Socket(std::string queueName, mqd_t handle, bool unlinkOnClose) : queueName(queueName), handle(handle), unlinkOnClose(unlinkOnClose) { - } - public: virtual ~Socket() { - if (handle >= 0) mq_close(handle); - if (unlinkOnClose) mq_unlink(queueName.c_str()); - } - - static Socket* open(std::string queueName, bool unlinkOnClose = false) { - mqd_t handle = mq_open(queueName.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); - if (handle >= 0) return new Socket(queueName, handle, unlinkOnClose); - else throw std::logic_error("Unable to open Socket: " + queueName + " error: " + strerror(errno)); } - void send(std::string message) { - int result = mq_send(handle, message.c_str(), message.size(), 0); - if (result) throw std::logic_error("Unable to send message to" + queueName + " error: " + strerror(errno)); - } + void send(const std::string& message) { + + struct sockaddr_in a; + memset((char *) &a, 0, sizeof (a)); + a.sin_family = AF_INET; + a.sin_addr.s_addr = inet_addr("127.0.0.1"); // TODO: use getaddrinfo() instead (because of error -1 = 255.255.255.255) + a.sin_port = htons(1234); - std::string receive() { - char buffer[MSG_SIZE + 1]; - memset(buffer, 0, MSG_SIZE + 1); - ssize_t msgSize = mq_receive(handle, buffer, MSG_SIZE, nullptr); + int s = ::socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + sendto(s, message.c_str(), message.size(), 0, (sockaddr*) & a, sizeof (a)); + + close(s); - if (msgSize > sizeof (buffer))throw std::logic_error("Invalid Socket message size."); - else if (msgSize >= 0) return std::string(buffer, msgSize); - else throw std::logic_error("Unable to receive Socket message from " + queueName + " error: " + strerror(errno)); + // TODO: send message } + + // virtual const std::string receive(); }; diff -r 924e354948df -r e3265afd1111 src/SocketHandler.h --- a/src/SocketHandler.h Sun May 01 18:42:50 2022 +0200 +++ b/src/SocketHandler.h Tue Jul 26 23:22:18 2022 +0200 @@ -41,7 +41,7 @@ private: std::wstring_convert> convertor; // TODO: support also other encodings. Configuration configuration; - shared_ptr mq; + shared_ptr socket; struct CurrentRelation { relpipe::common::type::StringX name; @@ -54,7 +54,7 @@ SocketHandler(Configuration configuration) : configuration(configuration) { // TODO: do not throw exception from the constructor: Socket::open() - mq.reset(Socket::open(convertor.to_bytes(configuration.queue), configuration.unlinkOnClose)); + socket = std::make_shared(); // TODO: create a TCP, UDP… socket } void startRelation(relpipe::common::type::StringX name, std::vector attributes) override { @@ -72,7 +72,7 @@ currentRelation.attributeIndex++; if (currentRelation.attributeIndex == currentRelation.attributes.size()) { currentRelation.attributeIndex = 0; - mq->send(currentRelation.currentValue); + socket->send(currentRelation.currentValue); } }