# HG changeset patch # User František Kučera # Date 1658870544 -7200 # Node ID d93ea7346b66d9529133bcb5fcdf73b33fe4e50c # Parent e8f15f432efca7109c3e54b0600ff64a287dec25 first version: TCP-only + fixed parameters diff -r e8f15f432efc -r d93ea7346b66 src/Configuration.h --- a/src/Configuration.h Sun May 01 18:42:53 2022 +0200 +++ b/src/Configuration.h Tue Jul 26 23:22:24 2022 +0200 @@ -30,7 +30,7 @@ public: relpipe::common::type::Integer messageCount = 1; - relpipe::common::type::StringX relation = L"socket"; + relpipe::common::type::StringX relation = L"message"; relpipe::common::type::StringX queue = L"/relpipe"; relpipe::common::type::Boolean unlinkOnClose = false; diff -r e8f15f432efc -r d93ea7346b66 src/Socket.h --- a/src/Socket.h Sun May 01 18:42:53 2022 +0200 +++ b/src/Socket.h Tue Jul 26 23:22:24 2022 +0200 @@ -16,10 +16,14 @@ */ #pragma once -#include #include +#include +#include #include -#include +#include +#include +#include +#include namespace relpipe { namespace in { @@ -28,39 +32,36 @@ class Socket { private: const static size_t MSG_SIZE = 8192; // TODO: configurable/dynamic - std::string queueName; - mqd_t handle = -2; - bool unlinkOnClose = false; - - Socket(std::string queueName, mqd_t handle, bool unlinkOnClose) : queueName(queueName), handle(handle), unlinkOnClose(unlinkOnClose) { - } public: virtual ~Socket() { - if (handle >= 0) mq_close(handle); - if (unlinkOnClose) mq_unlink(queueName.c_str()); - } - - static Socket* open(std::string queueName, bool unlinkOnClose = false) { - mqd_t handle = mq_open(queueName.c_str(), O_RDONLY | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); - if (handle >= 0) return new Socket(queueName, handle, unlinkOnClose); - else throw std::logic_error("Unable to open Socket: " + queueName + " error: " + strerror(errno)); - } - - void send(std::string message) { - int result = mq_send(handle, message.c_str(), message.size(), 0); - if (result) throw std::logic_error("Unable to send message to" + queueName + " error: " + strerror(errno)); } std::string receive() { char buffer[MSG_SIZE + 1]; memset(buffer, 0, MSG_SIZE + 1); - ssize_t msgSize = mq_receive(handle, buffer, MSG_SIZE, nullptr); + + int s = ::socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + + struct sockaddr_in a; + memset((char *) &a, 0, sizeof (a)); + a.sin_family = AF_INET; + a.sin_addr.s_addr = inet_addr("127.0.0.1"); // TODO: use getaddrinfo() instead (because of error -1 = 255.255.255.255) + a.sin_port = htons(1234); + ::bind(s, (sockaddr*) & a, sizeof (a)); + + struct sockaddr_in remoteAddress; + memset((char *) &remoteAddress, 0, sizeof (remoteAddress)); + socklen_t remoteAddressSize = sizeof(remoteAddress); + + ssize_t msgSize = recvfrom(s, buffer, sizeof (buffer), 0, (sockaddr*) & remoteAddress, &remoteAddressSize); + + close(s); if (msgSize > sizeof (buffer))throw std::logic_error("Invalid Socket message size."); else if (msgSize >= 0) return std::string(buffer, msgSize); - else throw std::logic_error("Unable to receive Socket message from " + queueName + " error: " + strerror(errno)); + else throw std::logic_error("Unable to receive Socket message the socket; error: " + std::string(strerror(errno))); } }; diff -r e8f15f432efc -r d93ea7346b66 src/SocketCommand.cpp --- a/src/SocketCommand.cpp Sun May 01 18:42:53 2022 +0200 +++ b/src/SocketCommand.cpp Tue Jul 26 23:22:24 2022 +0200 @@ -47,7 +47,7 @@ void SocketCommand::process(std::shared_ptr writer, Configuration& configuration) { vector metadata; - std::shared_ptr mq(Socket::open(convertor.to_bytes(configuration.queue), configuration.unlinkOnClose)); + std::shared_ptr socket = std::make_shared(); // TODO: create a TCP, UDP… socket writer->startRelation(configuration.relation,{ {L"queue", TypeId::STRING}, @@ -57,7 +57,7 @@ for (int i = configuration.messageCount; continueProcessing && i > 0; i--) { // TODO: maybe rather call mq_timedreceive() inside and check continueProcessing (to be able to stop even when no messages are comming) - std::string message = mq->receive(); + std::string message = socket->receive(); writer->writeAttribute(configuration.queue); writer->writeAttribute(Hex::toTxt(message));