# HG changeset patch # User František Kučera # Date 1661033810 -7200 # Node ID 2b57c8683ffe0b09522ccb003ee752103190ced8 # Parent 2665ab0bcf445d0c690e9fbb2f5b998a632749ab synchronize interface and common classes with relpipe-out-socket + implement basic UDPServerSocket::receive() diff -r 2665ab0bcf44 -r 2b57c8683ffe bash-completion.sh --- a/bash-completion.sh Sun Aug 07 10:45:05 2022 +0200 +++ b/bash-completion.sh Sun Aug 21 00:16:50 2022 +0200 @@ -22,27 +22,54 @@ w2=${COMP_WORDS[COMP_CWORD-2]} w3=${COMP_WORDS[COMP_CWORD-3]} - DATA_TYPE=( - "string" - "integer" - "boolean" + CONNECTION_STRINGS=( + "udp://127.0.0.1:64000" + "tcp://127.0.0.1:64000" + "sctp://127.0.0.1:64000" + "uds:///tmp/relpipe.socket" ) - BOOLEAN_VALUES=( - "true" - "false" + OPTIONS=( + "protocol" + "role" + "mode" + "host" + "port" + "path" + "delay" + ) + + PROTOCOLS=( + "tcp" + "udp" + "uds" + "sctp" + ) + + ROLES=( + "client" + "server" + ) + + MODES=( + "stream" + "datagram" ) if [[ "$w1" == "--relation" && "x$w0" == "x" ]]; then COMPREPLY=("''") - elif [[ "$w1" == "--unlink-on-close" ]]; then COMPREPLY=($(compgen -W "${BOOLEAN_VALUES[*]}" -- "$w0")) - elif [[ "$w1" == "--queue" && "x$w0" == "x" ]]; then COMPREPLY=("''") elif [[ "$w1" == "--message-count" && "x$w0" == "x" ]]; then COMPREPLY=("1") + elif [[ "$w1" == "--connection-string" ]]; then COMPREPLY=($(compgen -W "${CONNECTION_STRINGS[*]}" -- "$w0")) + elif [[ "$w1" == "--connection-option" ]]; then COMPREPLY=($(compgen -W "${OPTIONS[*]}" -- "$w0")) + elif [[ "$w2" == "--connection-option" && "$w1" == "protocol" ]]; then COMPREPLY=($(compgen -W "${PROTOCOLS[*]}" -- "$w0")) + elif [[ "$w2" == "--connection-option" && "$w1" == "role" ]]; then COMPREPLY=($(compgen -W "${ROLES[*]}" -- "$w0")) + elif [[ "$w2" == "--connection-option" && "$w1" == "mode" ]]; then COMPREPLY=($(compgen -W "${MODES[*]}" -- "$w0")) + elif [[ "$w2" == "--connection-option" && "$w1" == "path" ]]; then COMPREPLY=($(compgen -f -- "$w0")) else OPTIONS=( "--relation" - "--unlink-on-close" - "--queue" "--message-count" + "--connection-string" + "--connection-option" ) COMPREPLY=($(compgen -W "${OPTIONS[*]}" -- "$w0")) fi diff -r 2665ab0bcf44 -r 2b57c8683ffe nbproject/configurations.xml --- a/nbproject/configurations.xml Sun Aug 07 10:45:05 2022 +0200 +++ b/nbproject/configurations.xml Sun Aug 21 00:16:50 2022 +0200 @@ -42,6 +42,7 @@ + Socket.cpp Socket.h SocketCommand.cpp relpipe-in-socket.cpp @@ -94,7 +95,9 @@ true - + + + diff -r 2665ab0bcf44 -r 2b57c8683ffe src/CLIParser.h --- a/src/CLIParser.h Sun Aug 07 10:45:05 2022 +0200 +++ b/src/CLIParser.h Sun Aug 21 00:16:50 2022 +0200 @@ -18,8 +18,10 @@ #include #include +#include +#include -#include +#include #include #include @@ -31,8 +33,9 @@ class CLIParser { private: + std::wstring_convert> convertor; // TODO: support also other encodings. - relpipe::writer::string_t readNext(const std::vector& arguments, int& i) { + relpipe::common::type::StringX readNext(const std::vector& arguments, int& i) { if (i < arguments.size()) return arguments[i++]; else throw relpipe::cli::RelpipeCLIException(L"Missing CLI argument" + (i > 0 ? (L" after " + arguments[i - 1]) : L""), relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS); } @@ -46,30 +49,68 @@ else throw relpipe::cli::RelpipeCLIException(L"Unable to parse boolean value: " + value + L" (expecting true or false)", relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS); } + void setIfMissing(std::vector& options, const std::string& name, const std::string& value, bool includeEmptyValues = true) { + auto n = convertor.from_bytes(name); + auto v = convertor.from_bytes(value); + for (auto o : options) if (o.name == n) return; + if (value.size() || includeEmptyValues) options.push_back({n, v}); + } + public: - static const relpipe::writer::string_t OPTION_RELATION; - static const relpipe::writer::string_t OPTION_UNLINK_ON_CLOSE; - static const relpipe::writer::string_t OPTION_QUEUE; + static const relpipe::common::type::StringX OPTION_RELATION; + static const relpipe::common::type::StringX OPTION_CONNECTION_STRING; + static const relpipe::common::type::StringX OPTION_CONNECTION_OPTION; static const relpipe::writer::string_t OPTION_MESSAGE_COUNT; - Configuration parse(const std::vector& arguments) { + Configuration parse(const std::vector& arguments) { Configuration c; + relpipe::common::type::StringX connectionString; for (int i = 0; i < arguments.size();) { - relpipe::writer::string_t option = readNext(arguments, i); + relpipe::common::type::StringX option = readNext(arguments, i); if (option == OPTION_RELATION) { c.relation = readNext(arguments, i); - } else if (option == OPTION_UNLINK_ON_CLOSE) { - c.unlinkOnClose = parseBoolean(readNext(arguments, i)); - } else if (option == OPTION_QUEUE) { - c.queue = readNext(arguments, i); } else if (option == OPTION_MESSAGE_COUNT) { c.messageCount = std::stoull(readNext(arguments, i)); + } else if (option == OPTION_CONNECTION_STRING) { + connectionString = readNext(arguments, i); + } else if (option == OPTION_CONNECTION_OPTION) { + auto name = readNext(arguments, i); + auto value = readNext(arguments, i); + c.options.push_back({name, value}); } else throw relpipe::cli::RelpipeCLIException(L"Unsupported CLI option: " + option, relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS); } + using namespace options; + + // Parse the connection string and convert it to options: + if (connectionString.size()) { + std::string connectionStringBytes = convertor.to_bytes(connectionString); + std::regex pattern("(tcp|udp|sctp)://(([^:]+)|\\[([0-9a-fA-F:]+(%[a-zA-Z0-9]+)?)\\]):([0-9]+)|(uds)://(.*)"); + // 1 23 4 5 6 7 8 + std::smatch match; + if (std::regex_match(connectionStringBytes, match, pattern)) { + setIfMissing(c.options, OPTION_PROTOCOL, match[1], false); + setIfMissing(c.options, OPTION_PROTOCOL, match[7], false); + setIfMissing(c.options, OPTION_HOST, match[3], false); + setIfMissing(c.options, OPTION_HOST, match[4], false); + setIfMissing(c.options, OPTION_PORT, match[6], false); + setIfMissing(c.options, OPTION_PATH, match[8], false); + if (match[1] == PROTOCOL_TCP) setIfMissing(c.options, OPTION_MODE, MODE_STREAM); + } else { + throw relpipe::cli::RelpipeCLIException(L"Invalid connection string: " + connectionString, relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS); + } + } + + // Set defaults when values are missing: + setIfMissing(c.options, OPTION_PROTOCOL, PROTOCOL_UDP); + setIfMissing(c.options, OPTION_ROLE, ROLE_SERVER); + setIfMissing(c.options, OPTION_MODE, MODE_DATAGRAM); + setIfMissing(c.options, OPTION_HOST, "127.0.0.1"); + setIfMissing(c.options, OPTION_PORT, "64000"); + return c; } @@ -77,9 +118,9 @@ } }; -const relpipe::writer::string_t CLIParser::OPTION_RELATION = L"--relation"; -const relpipe::writer::string_t CLIParser::OPTION_UNLINK_ON_CLOSE = L"--unlink-on-close"; -const relpipe::writer::string_t CLIParser::OPTION_QUEUE = L"--queue"; +const relpipe::common::type::StringX CLIParser::OPTION_RELATION = L"--relation"; +const relpipe::common::type::StringX CLIParser::OPTION_CONNECTION_STRING = L"--connection-string"; +const relpipe::common::type::StringX CLIParser::OPTION_CONNECTION_OPTION = L"--connection-option"; const relpipe::writer::string_t CLIParser::OPTION_MESSAGE_COUNT = L"--message-count"; } diff -r 2665ab0bcf44 -r 2b57c8683ffe src/CMakeLists.txt --- a/src/CMakeLists.txt Sun Aug 07 10:45:05 2022 +0200 +++ b/src/CMakeLists.txt Sun Aug 21 00:16:50 2022 +0200 @@ -30,6 +30,7 @@ add_executable( ${EXECUTABLE_FILE} SocketCommand.cpp + Socket.cpp relpipe-in-socket.cpp ) diff -r 2665ab0bcf44 -r 2b57c8683ffe src/Configuration.h --- a/src/Configuration.h Sun Aug 07 10:45:05 2022 +0200 +++ b/src/Configuration.h Sun Aug 21 00:16:50 2022 +0200 @@ -21,6 +21,7 @@ #include +#include "Socket.h" namespace relpipe { namespace in { @@ -29,10 +30,20 @@ class Configuration { public: + class SocketOption { + public: + const relpipe::common::type::StringX name; + const relpipe::common::type::StringX value; + + SocketOption(const relpipe::common::type::StringX name, const relpipe::common::type::StringX value) : name(name), value(value) { + } + + virtual ~SocketOption() = default; + }; + relpipe::common::type::Integer messageCount = 1; relpipe::common::type::StringX relation = L"message"; - relpipe::common::type::StringX queue = L"/relpipe"; - relpipe::common::type::Boolean unlinkOnClose = false; + std::vector options; virtual ~Configuration() { } diff -r 2665ab0bcf44 -r 2b57c8683ffe src/Socket.cpp --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/Socket.cpp Sun Aug 21 00:16:50 2022 +0200 @@ -0,0 +1,412 @@ +/** + * Relational pipes + * Copyright © 2022 František Kučera (Frantovo.cz, GlobalCode.info) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 3 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "Socket.h" + +namespace relpipe { +namespace in { +namespace socket { + +using namespace relpipe::in::socket::options; + +static const std::string findOption(SocketOptions options, std::string name, bool required = false, const std::string defaultValue = "") { + for (auto o : options) if (o.name == name) return o.value; + if (required) throw std::invalid_argument("Option " + name + " is required but was not found"); + else return defaultValue; +} + +class FD { +private: + int fd; +public: + + FD(int fd) : fd(fd) { + }; + + virtual ~FD() { + close(fd); + } + + int getFD() { + return fd; + } +}; + +static void check(int result, std::string message) { + if (result == 0); // OK + else throw std::logic_error("Got error result: " + message + ": " + strerror(errno) + " (result=" + std::to_string(result) + ", errno=" + std::to_string(errno) + ")"); +} + +class AddressInfos { +private: + std::shared_ptr addrInfo; + + AddressInfos(addrinfo* addrInfo) : addrInfo(std::shared_ptr(addrInfo, freeaddrinfo)) { + } + +public: + + virtual ~AddressInfos() { + } + + class AddressInfo { + private: + std::shared_ptr parent; // to avoid premature deletion of the whole structure + public: + const addrinfo * const ai; + + AddressInfo(const addrinfo * const ai, std::shared_ptr parent) : ai(ai), parent(parent) { + } + + const std::string toString() const { + char buffer[INET6_ADDRSTRLEN] = {0}; + if (ai->ai_family == AF_INET) return inet_ntop(ai->ai_family, &((sockaddr_in const *) ai->ai_addr)->sin_addr, buffer, sizeof (buffer)); // TODO: check 0 result + else if (ai->ai_family == AF_INET6) return inet_ntop(ai->ai_family, &((sockaddr_in6 const *) ai->ai_addr)->sin6_addr, buffer, sizeof (buffer)); // TODO: check 0 result + else return "unknown address family: " + std::to_string(ai->ai_family); + } + }; + + static AddressInfos getAddressInfos(const std::string& host, const std::string& port, int socketType = SOCK_STREAM, int protocol = IPPROTO_TCP) { + struct addrinfo query; + memset(&query, sizeof (query), 0); + query.ai_family = AF_UNSPEC; + query.ai_socktype = socketType; + query.ai_protocol = protocol; + query.ai_flags = AI_ALL; + + struct addrinfo* addrInfo; + check(getaddrinfo(host.c_str(), port.c_str(), &query, &addrInfo), "getaddrinfo"); + + return AddressInfos(addrInfo); + } + + const std::size_t size() const { + std::size_t size = 0; + for (addrinfo* ai = addrInfo.get(); ai; ai = ai->ai_next) size++; + return size; + } + + const AddressInfo operator[](std::size_t index) const { + for (addrinfo* ai = addrInfo.get(); ai; index--) { + if (index == 0) return AddressInfo(ai, addrInfo); + else ai = ai->ai_next; + } + + throw std::out_of_range("invalid index for AddressInfo: " + std::to_string(index)); + } + +}; + +template static std::shared_ptr openClientSocket(const SocketOptions& options, int socketType, int protocol, MoreArgs... moreArgs) { + AddressInfos remoteAddresses = AddressInfos::getAddressInfos( + findOption(options, OPTION_HOST, true), + findOption(options, OPTION_PORT, true), + socketType, + protocol); + + return std::shared_ptr(new SocketClass(remoteAddresses[0], moreArgs...)); +} + +/** + * abstract class for sockets that use sendmsg() / recvmsg() + */ +class MSGSocket : public Socket { +protected: + FD socket; + + void sendmsg(const std::string& message) { + iovec iov[1]; + msghdr msg = {}; + msg.msg_iov = iov; + msg.msg_iov[0].iov_base = (void*) message.c_str(); + msg.msg_iov[0].iov_len = message.size(); + msg.msg_iovlen = sizeof (iov) / sizeof (iov[0]); + ssize_t written = ::sendmsg(socket.getFD(), &msg, 0); + if (written != message.size()) throw std::logic_error("writing to the socket failed"); + } +public: + + MSGSocket(int socket) : socket(socket) { + } + + void send(const OutgoingMessage& message) { + sendmsg(message.data); + } + +}; + +class UDPClientSocket : public Socket { +private: + AddressInfos::AddressInfo remoteAddress; + useconds_t delay = 0; + FD socket; + +public: + + UDPClientSocket(AddressInfos::AddressInfo remoteAddress) : remoteAddress(remoteAddress), socket(::socket(remoteAddress.ai->ai_family, remoteAddress.ai->ai_socktype, remoteAddress.ai->ai_protocol)) { + } + + static std::shared_ptr open(const SocketOptions& options) { + auto socket = openClientSocket(options, SOCK_DGRAM, IPPROTO_UDP); + socket->delay = std::stol(findOption(options, OPTION_DELAY, false, "0")); // TODO: Move to SocketHandler? Or delete. + return socket; + } + + void send(const OutgoingMessage& message) override { + auto ai = remoteAddress.ai; + sendto(socket.getFD(), message.data.c_str(), message.data.size(), 0, ai->ai_addr, ai->ai_addrlen); + if (delay) usleep(delay); + } + + const IncomingMessage receive() override { + // TODO: UDP receive() + return IncomingMessage("TODO: receive() a message"); + } +}; + +class UDPServerSocket : public Socket { +private: + AddressInfos::AddressInfo localAddress; + sockaddr_in6 remoteAddressForSending = {}; + useconds_t delay = 0; + FD socket; + +public: + + UDPServerSocket(AddressInfos::AddressInfo remoteAddress) : localAddress(remoteAddress), socket(::socket(remoteAddress.ai->ai_family, remoteAddress.ai->ai_socktype, remoteAddress.ai->ai_protocol)) { + } + + static std::shared_ptr open(const SocketOptions& options) { + auto socket = openClientSocket(options, SOCK_DGRAM, IPPROTO_UDP); + auto ai = socket->localAddress.ai; + int reuseAddr = true; + check(::setsockopt(socket->socket.getFD(), SOL_SOCKET, SO_REUSEADDR, &reuseAddr, sizeof (reuseAddr)), "setsockopt SO_REUSEADDR"); + check(::bind(socket->socket.getFD(), ai->ai_addr, ai->ai_addrlen), "bind"); + return socket; + } + + void send(const OutgoingMessage& message) override { + // receive a message first to get a remote address: + char buffer[8192]; + memset((char *) &remoteAddressForSending, 0, sizeof (remoteAddressForSending)); + socklen_t remoteAddressSize = sizeof (remoteAddressForSending); + ssize_t msgSize = recvfrom(socket.getFD(), buffer, sizeof (buffer), 0, (sockaddr*) & remoteAddressForSending, &remoteAddressSize); + + // respond with our messsage: + sendto(socket.getFD(), message.data.c_str(), message.data.size(), 0, (sockaddr*) & remoteAddressForSending, remoteAddressSize); + } + + const IncomingMessage receive() override { + char buffer[8192]; + sockaddr_in6 remoteAddress; + memset((char *) &remoteAddress, 0, sizeof (remoteAddress)); + socklen_t remoteAddressSize = sizeof (remoteAddress); + ssize_t msgSize = recvfrom(socket.getFD(), buffer, sizeof (buffer), 0, (sockaddr*) & remoteAddress, &remoteAddressSize); + check(msgSize < 0, "recvfrom"); + + IncomingMessage message(std::string(buffer, std::min(sizeof (buffer), (size_t) msgSize))); + + // TODO: move to a common method + char hostBuffer[INET6_ADDRSTRLEN] = {0}; + if (remoteAddress.sin6_family == AF_INET) { + sockaddr_in* remoteAddress4 = (sockaddr_in*) & remoteAddress; + message.remoteHost = inet_ntop(remoteAddress4->sin_family, &remoteAddress4->sin_addr, hostBuffer, sizeof (hostBuffer)); // TODO: check 0 result + message.remotePort = remoteAddress4->sin_port; + } else if (remoteAddress.sin6_family == AF_INET6) { + message.remoteHost = inet_ntop(remoteAddress.sin6_family, &remoteAddress.sin6_addr, hostBuffer, sizeof (hostBuffer)); // TODO: check 0 result + message.remotePort = remoteAddress.sin6_port; + } + + return message; + } +}; + +class TCPClientSocket : public Socket { +private: + AddressInfos::AddressInfo remoteAddress; + +public: + + TCPClientSocket(AddressInfos::AddressInfo remoteAddress) : remoteAddress(remoteAddress) { + } + + static std::shared_ptr open(const SocketOptions& options) { + return openClientSocket(options, SOCK_STREAM, IPPROTO_TCP); + } + + void send(const OutgoingMessage& message) override { + auto ai = remoteAddress.ai; + FD s(::socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)); + check(::connect(s.getFD(), ai->ai_addr, ai->ai_addrlen), "connect socket"); + ssize_t written = ::write(s.getFD(), message.data.c_str(), message.data.size()); + if (written != message.data.size()) throw std::logic_error("writing to the socket failed"); + // TODO: partial writes, repeat + } + + const IncomingMessage receive() override { + // TODO: TCP receive() + return IncomingMessage("TODO: receive() a message"); + } +}; + +class TCPServerSocket : public Socket { +private: + AddressInfos::AddressInfo localAddress; + FD socket; +public: + + TCPServerSocket(AddressInfos::AddressInfo localAddress) : localAddress(localAddress), socket(::socket(localAddress.ai->ai_family, localAddress.ai->ai_socktype, localAddress.ai->ai_protocol)) { + } + + static std::shared_ptr open(const SocketOptions& options) { + auto socket = openClientSocket(options, SOCK_STREAM, IPPROTO_TCP); + auto ai = socket->localAddress.ai; + int reuseAddr = true; + check(::setsockopt(socket->socket.getFD(), SOL_SOCKET, SO_REUSEADDR, &reuseAddr, sizeof (reuseAddr)), "setsockopt SO_REUSEADDR"); + check(::bind(socket->socket.getFD(), ai->ai_addr, ai->ai_addrlen), "bind"); + check(::listen(socket->socket.getFD(), 10), "listen"); // TODO: configurable backlog connection count? + return socket; + } + + void send(const OutgoingMessage& message) override { + FD clientSocket(accept(socket.getFD(), nullptr, 0)); + ssize_t written = ::write(clientSocket.getFD(), message.data.c_str(), message.data.size()); + if (written != message.data.size()) throw std::logic_error("writing to the socket failed"); + // TODO: partial writes, repeat + } + + const IncomingMessage receive() override { + // TODO: TCP receive() + return IncomingMessage("TODO: receive() a message"); + } +}; + +class SCTPClientSocket : public MSGSocket { +private: + AddressInfos::AddressInfo remoteAddress; +public: + + SCTPClientSocket(AddressInfos::AddressInfo remoteAddress) : remoteAddress(remoteAddress), MSGSocket(::socket(remoteAddress.ai->ai_family, remoteAddress.ai->ai_socktype, remoteAddress.ai->ai_protocol)) { + } + + static std::shared_ptr open(const SocketOptions& options) { + auto socket = openClientSocket(options, SOCK_STREAM, IPPROTO_SCTP); + check(::connect(socket->socket.getFD(), socket->remoteAddress.ai->ai_addr, socket->remoteAddress.ai->ai_addrlen), "connect socket"); + return socket; + } + + const IncomingMessage receive() override { + // TODO: SCTP receive() + return IncomingMessage("TODO: receive() a message"); + } +}; + +class UDSClientSocket : public MSGSocket { +public: + + UDSClientSocket(int fd) : MSGSocket(fd) { + } + + static std::shared_ptr open(const SocketOptions& options) { + struct sockaddr_un address; + std::string path = findOption(options, OPTION_PATH); + + memset(&address, 0x00, sizeof (address)); + address.sun_family = AF_UNIX; + strncpy(address.sun_path, path.c_str(), path.size()); + + int fd = ::socket(AF_UNIX, SOCK_STREAM, 0); + + auto socket = std::make_shared(fd); + check(::connect(socket->socket.getFD(), (const sockaddr*) &address, sizeof (address)), "connect socket"); + + + if (findOption(options, "debug") == "true") { // TODO: undocumented feature → standardize or remove + struct ucred credentials; + socklen_t credentialsLength = sizeof (credentials); + memset(&credentials, 0x00, credentialsLength); + getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &credentials, &credentialsLength); + + printf("uds.fd = %d\n", fd); + printf("uds.path = %s\n", address.sun_path); + printf("uds.server.pid = %d\n", credentials.pid); + printf("uds.server.uid = %d\n", credentials.uid); + printf("uds.server.gid = %d\n", credentials.gid); + } + + + return socket; + } + + const IncomingMessage receive() override { + // TODO: UDS receive() + return IncomingMessage("TODO: receive() a message"); + } +}; + +template +class TemplateSocketFactory : public SocketFactory { +public: + + bool canHandle(const SocketOptions& options) override { + return findOption(options, OPTION_PROTOCOL) == protocol + && findOption(options, OPTION_ROLE) == role + && findOption(options, OPTION_MODE) == mode; + } + + std::shared_ptr open(const SocketOptions& options) override { + return SocketClass::open(options); + } +}; + +static std::vector> factories +{ + std::make_shared> (), + std::make_shared> (), + std::make_shared> (), + std::make_shared> (), + std::make_shared> (), + std::make_shared> (), // TODO: correct class + std::make_shared> (), + std::make_shared> (), // TODO: correct class + std::make_shared> (), // TODO: correct class + std::make_shared> (), // TODO: correct class +}; + +std::shared_ptr SocketFactory::find(const SocketOptions& options) { + for (auto f : factories) if (f->canHandle(options)) return f; + throw std::logic_error("Unable to find a SocketFactory"); // TODO: add relevant options? +} + + +} +} +} diff -r 2665ab0bcf44 -r 2b57c8683ffe src/Socket.h --- a/src/Socket.h Sun Aug 07 10:45:05 2022 +0200 +++ b/src/Socket.h Sun Aug 21 00:16:50 2022 +0200 @@ -17,6 +17,7 @@ #pragma once #include +#include #include #include #include @@ -29,55 +30,92 @@ namespace in { namespace socket { -class Socket { -private: - const static size_t MSG_SIZE = 8192; // TODO: configurable/dynamic - int s = ::socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); +namespace options { +static const char OPTION_PROTOCOL[] = "protocol"; +static const char OPTION_ROLE[] = "role"; +static const char OPTION_MODE[] = "mode"; +static const char OPTION_HOST[] = "host"; +static const char OPTION_PORT[] = "port"; +static const char OPTION_PATH[] = "path"; +static const char OPTION_DELAY[] = "delay"; + +static const char PROTOCOL_TCP[] = "tcp"; +static const char PROTOCOL_UDP[] = "udp"; +static const char PROTOCOL_UDS[] = "uds"; +static const char PROTOCOL_SCTP[] = "sctp"; + +static const char ROLE_CLIENT[] = "client"; +static const char ROLE_SERVER[] = "server"; +static const char MODE_STREAM[] = "stream"; +static const char MODE_DATAGRAM[] = "datagram"; +} + +class Message { +public: + std::string data; + + Message() { + } + + Message(const std::string& data) : data(data) { + } + + virtual ~Message() = default; + + +}; + +class IncomingMessage : public Message { public: - Socket() { - s = ::socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); - - struct sockaddr_in a; - memset((char *) &a, 0, sizeof (a)); - a.sin_family = AF_INET; - a.sin_addr.s_addr = inet_addr("127.0.0.1"); // TODO: use getaddrinfo() instead (because of error -1 = 255.255.255.255) - a.sin_port = htons(1234); - ::bind(s, (sockaddr*) & a, sizeof (a)); - - // int soBufferSize = 1024 * 1024; - // setsockopt(s, SOL_SOCKET, SO_RCVBUF, &soBufferSize, sizeof (soBufferSize)); - // soBufferSize = 0; - // socklen_t soBufferSizeLength = sizeof (soBufferSize); - - // std::cerr << "soBufferSize=" << soBufferSize << std::endl; - // getsockopt(s, SOL_SOCKET, SO_RCVBUF, &soBufferSize, &soBufferSizeLength); - // std::cerr << "soBufferSize=" << soBufferSize << " length=" << soBufferSizeLength << std::endl; + IncomingMessage(const std::string& data) : Message(data) { } - virtual ~Socket() { - close(s); - } - - std::string receive() { - char buffer[MSG_SIZE + 1]; - memset(buffer, 0, MSG_SIZE + 1); + std::string remoteHost; + in_port_t remotePort = 0; + pid_t remotePID = 0; + uid_t remoteUID = 0; + gid_t remoteGID = 0; +}; - struct sockaddr_in remoteAddress; - memset((char *) &remoteAddress, 0, sizeof (remoteAddress)); - socklen_t remoteAddressSize = sizeof (remoteAddress); +class OutgoingMessage : public Message { +public: - ssize_t msgSize = recvfrom(s, buffer, sizeof (buffer), 0, (sockaddr*) & remoteAddress, &remoteAddressSize); - - - if (msgSize > sizeof (buffer))throw std::logic_error("Invalid Socket message size."); - else if (msgSize >= 0) return std::string(buffer, msgSize); - else throw std::logic_error("Unable to receive Socket message the socket; error: " + std::string(strerror(errno))); + OutgoingMessage(const std::string& data) : Message(data) { } }; +class Socket { +public: + virtual ~Socket() = default; + virtual void send(const OutgoingMessage& message) = 0; + virtual const IncomingMessage receive() = 0; +}; + +class SocketOption { +public: + const std::string name; + const std::string value; + + SocketOption(const std::string name, const std::string value) : name(name), value(value) { + } + + virtual ~SocketOption() = default; +}; + +using SocketOptions = std::vector; + +class SocketFactory { +public: + virtual ~SocketFactory() = default; + virtual bool canHandle(const SocketOptions& options) = 0; + virtual std::shared_ptr open(const SocketOptions& options) = 0; + static std::shared_ptr find(const SocketOptions& options); +}; + + } } } diff -r 2665ab0bcf44 -r 2b57c8683ffe src/SocketCommand.cpp --- a/src/SocketCommand.cpp Sun Aug 07 10:45:05 2022 +0200 +++ b/src/SocketCommand.cpp Sun Aug 21 00:16:50 2022 +0200 @@ -47,21 +47,31 @@ void SocketCommand::process(std::shared_ptr writer, Configuration& configuration) { vector metadata; - std::shared_ptr socket = std::make_shared(); // TODO: create a TCP, UDP… socket + std::vector options; + for (auto o : configuration.options) options.push_back({convertor.to_bytes(o.name), convertor.to_bytes(o.value)}); + std::shared_ptr socket = SocketFactory::find(options)->open(options); writer->startRelation(configuration.relation,{ - {L"queue", TypeId::STRING}, + {L"host", TypeId::STRING}, + {L"port", TypeId::INTEGER}, + {L"pid", TypeId::INTEGER}, + {L"gid", TypeId::INTEGER}, + {L"uid", TypeId::INTEGER}, {L"text", TypeId::STRING}, {L"data", TypeId::STRING} }, true); for (int i = configuration.messageCount; continueProcessing && i > 0; i--) { // TODO: maybe rather call mq_timedreceive() inside and check continueProcessing (to be able to stop even when no messages are comming) - std::string message = socket->receive(); + IncomingMessage message = socket->receive(); - writer->writeAttribute(configuration.queue); - writer->writeAttribute(Hex::toTxt(message)); - writer->writeAttribute(Hex::toHex(message)); + writer->writeAttribute(convertor.from_bytes(message.remoteHost)); + writer->writeAttribute(std::to_wstring(message.remotePort)); + writer->writeAttribute(std::to_wstring(message.remotePID)); + writer->writeAttribute(std::to_wstring(message.remoteGID)); + writer->writeAttribute(std::to_wstring(message.remoteUID)); + writer->writeAttribute(Hex::toTxt(message.data)); + writer->writeAttribute(Hex::toHex(message.data)); } }