--- a/bash-completion.sh Thu Jul 28 02:45:11 2022 +0200
+++ b/bash-completion.sh Fri Jul 29 18:03:49 2022 +0200
@@ -22,25 +22,38 @@
w2=${COMP_WORDS[COMP_CWORD-2]}
w3=${COMP_WORDS[COMP_CWORD-3]}
- DATA_TYPE=(
- "string"
- "integer"
- "boolean"
+ CONNECTION_STRINGS=(
+ "udp://127.0.0.1:64000"
+ "tcp://127.0.0.1:64000"
+ "sctp://127.0.0.1:64000"
+ "uds:///tmp/relpipe.socket"
)
- BOOLEAN_VALUES=(
- "true"
- "false"
+ CONNECTION_OPTIONS=(
+ "role"
+ "mode"
+ )
+
+ CONNECTION_ROLES=(
+ "client"
+ "server"
+ )
+
+ CONNECTION_MODES=(
+ "stream"
+ "datagram"
)
if [[ "$w1" == "--relation" && "x$w0" == "x" ]]; then COMPREPLY=("''")
- elif [[ "$w1" == "--unlink-on-close" ]]; then COMPREPLY=($(compgen -W "${BOOLEAN_VALUES[*]}" -- "$w0"))
- elif [[ "$w1" == "--queue" && "x$w0" == "x" ]]; then COMPREPLY=("''")
+ elif [[ "$w1" == "--connection-string" ]]; then COMPREPLY=($(compgen -W "${CONNECTION_STRINGS[*]}" -- "$w0"))
+ elif [[ "$w1" == "--connection-option" ]]; then COMPREPLY=($(compgen -W "${CONNECTION_OPTIONS[*]}" -- "$w0"))
+ elif [[ "$w2" == "--connection-option" && "$w1" == "role" ]]; then COMPREPLY=($(compgen -W "${CONNECTION_ROLES[*]}" -- "$w0"))
+ elif [[ "$w2" == "--connection-option" && "$w1" == "mode" ]]; then COMPREPLY=($(compgen -W "${CONNECTION_MODES[*]}" -- "$w0"))
else
OPTIONS=(
"--relation"
- "--unlink-on-close"
- "--queue"
+ "--connection-string"
+ "--connection-option"
)
COMPREPLY=($(compgen -W "${OPTIONS[*]}" -- "$w0"))
fi
--- a/src/CLIParser.h Thu Jul 28 02:45:11 2022 +0200
+++ b/src/CLIParser.h Fri Jul 29 18:03:49 2022 +0200
@@ -49,8 +49,8 @@
public:
static const relpipe::common::type::StringX OPTION_RELATION;
- static const relpipe::common::type::StringX OPTION_UNLINK_ON_CLOSE;
- static const relpipe::common::type::StringX OPTION_QUEUE;
+ static const relpipe::common::type::StringX OPTION_CONNECTION_STRING;
+ static const relpipe::common::type::StringX OPTION_CONNECTION_OPTION;
Configuration parse(const std::vector<relpipe::common::type::StringX>& arguments) {
Configuration c;
@@ -60,10 +60,12 @@
if (option == OPTION_RELATION) {
c.relation = readNext(arguments, i);
- } else if (option == OPTION_UNLINK_ON_CLOSE) {
- c.unlinkOnClose = parseBoolean(readNext(arguments, i));
- } else if (option == OPTION_QUEUE) {
- c.queue = readNext(arguments, i);
+ } else if (option == OPTION_CONNECTION_STRING) {
+ c.connectionString = readNext(arguments, i);
+ } else if (option == OPTION_CONNECTION_OPTION) {
+ auto name = readNext(arguments, i);
+ auto value = readNext(arguments, i);
+ c.options.push_back({name, value});
} else throw relpipe::cli::RelpipeCLIException(L"Unsupported CLI option: " + option, relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS);
}
@@ -75,8 +77,8 @@
};
const relpipe::common::type::StringX CLIParser::OPTION_RELATION = L"--relation";
-const relpipe::common::type::StringX CLIParser::OPTION_UNLINK_ON_CLOSE = L"--unlink-on-close";
-const relpipe::common::type::StringX CLIParser::OPTION_QUEUE = L"--queue";
+const relpipe::common::type::StringX CLIParser::OPTION_CONNECTION_STRING = L"--connection-string";
+const relpipe::common::type::StringX CLIParser::OPTION_CONNECTION_OPTION = L"--connection-option";
}
}
--- a/src/Configuration.h Thu Jul 28 02:45:11 2022 +0200
+++ b/src/Configuration.h Fri Jul 29 18:03:49 2022 +0200
@@ -29,9 +29,21 @@
class Configuration {
public:
+ class SocketOption {
+ public:
+ const relpipe::common::type::StringX name;
+ const relpipe::common::type::StringX value;
+
+ SocketOption(const relpipe::common::type::StringX name, const relpipe::common::type::StringX value) : name(name), value(value) {
+ }
+
+ virtual ~SocketOption() = default;
+ };
+
+
relpipe::common::type::StringX relation = L"socket";
- relpipe::common::type::StringX queue = L"/relpipe";
- relpipe::common::type::Boolean unlinkOnClose = false;
+ relpipe::common::type::StringX connectionString = L"udp://127.0.0.1:64000";
+ std::vector<SocketOption> options;
virtual ~Configuration() {
}
--- a/src/Socket.cpp Thu Jul 28 02:45:11 2022 +0200
+++ b/src/Socket.cpp Fri Jul 29 18:03:49 2022 +0200
@@ -24,6 +24,7 @@
#include <netinet/in.h>
#include <vector>
#include <memory>
+#include <regex>
#include "Socket.h"
@@ -40,21 +41,50 @@
static const char PROTOCOL_UDS_LISTEN[] = "uds-listen://";
static const char PROTOCOL_SCTP_LISTEN[] = "sctp-listen://";
-class TCPSocket : public Socket {
+class FD {
+private:
+ int fd;
public:
+ FD(int fd) : fd(fd) {
+ };
+
+ virtual ~FD() {
+ close(fd);
+ }
+
+ int getFD() {
+ return fd;
+ }
+};
+
+/**
+ * @param connectionString
+ * @return protocol, hostname or path, port (optional)
+ */
+static std::tuple<std::string, std::string, uint16_t> parseConnectionString(const std::string& connectionString) {
+ // TODO: support „:“ in domain socket paths?
+ std::regex pattern("([^:]+)://([^:]+)(:([0-9]+))?");
+
+}
+
+class UDPSocket : public Socket {
+private:
+ struct sockaddr_in remoteAddress;
+public:
+
+ static std::shared_ptr<Socket> open(const std::string& connectionString) {
+ std::shared_ptr<UDPSocket> s = std::make_shared<UDPSocket>();
+ memset((char *) &s->remoteAddress, 0, sizeof (s->remoteAddress));
+ s->remoteAddress.sin_family = AF_INET;
+ s->remoteAddress.sin_addr.s_addr = inet_addr("127.0.0.1"); // TODO: use getaddrinfo() instead (because of error -1 = 255.255.255.255)
+ s->remoteAddress.sin_port = htons(1234);
+ return s;
+ }
+
void send(const std::string& message) override {
- // TODO: TCP send()
- struct sockaddr_in a;
- memset((char *) &a, 0, sizeof (a));
- a.sin_family = AF_INET;
- a.sin_addr.s_addr = inet_addr("127.0.0.1"); // TODO: use getaddrinfo() instead (because of error -1 = 255.255.255.255)
- a.sin_port = htons(1234);
-
- int s = ::socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
- sendto(s, message.c_str(), message.size(), 0, (sockaddr*) & a, sizeof (a));
-
- close(s);
+ FD s(::socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP));
+ sendto(s.getFD(), message.c_str(), message.size(), 0, (sockaddr*) & remoteAddress, sizeof (remoteAddress));
}
const std::string receive() override {
@@ -72,24 +102,25 @@
return connectionString.rfind(protocol, 0) == 0;
}
- Socket* open(const std::string& connectionString) override {
+ std::shared_ptr<Socket> open(const std::string& connectionString) override {
// TODO: pass string to constructor
// TODO: return shared_ptr?
- return new SocketClass();
+ return SocketClass::open(connectionString);
}
};
-static std::vector<std::shared_ptr<SocketFactory>> factories{
+static std::vector<std::shared_ptr<SocketFactory>> factories
+{
// FIXME: different classes than TCPSocket
- std::make_shared<TemplateSocketFactory<PROTOCOL_TCP, TCPSocket>>(),
- std::make_shared<TemplateSocketFactory<PROTOCOL_UDP, TCPSocket>>(),
- std::make_shared<TemplateSocketFactory<PROTOCOL_UDS, TCPSocket>>(),
- std::make_shared<TemplateSocketFactory<PROTOCOL_SCTP, TCPSocket>>(),
+ std::make_shared<TemplateSocketFactory<PROTOCOL_TCP, UDPSocket >> (),
+ std::make_shared<TemplateSocketFactory<PROTOCOL_UDP, UDPSocket >> (),
+ std::make_shared<TemplateSocketFactory<PROTOCOL_UDS, UDPSocket >> (),
+ std::make_shared<TemplateSocketFactory<PROTOCOL_SCTP, UDPSocket >> (),
};
std::shared_ptr<SocketFactory> SocketFactory::find(const std::string& connectionString) {
for (auto f : factories) if (f->canHandle(connectionString)) return f;
- return std::shared_ptr<SocketFactory>();
+ throw std::logic_error("Unable to find a SocketFactory for connection string: " + connectionString);
}
--- a/src/Socket.h Thu Jul 28 02:45:11 2022 +0200
+++ b/src/Socket.h Fri Jul 29 18:03:49 2022 +0200
@@ -32,17 +32,26 @@
class Socket {
public:
virtual ~Socket() = default;
- // virtual void setOption(const std::string& uri, const std::string& value) = 0;
virtual void send(const std::string& message) = 0;
virtual const std::string receive() = 0;
};
+class SocketOption {
+public:
+ const std::string name;
+ const std::string value;
+
+ SocketOption(const std::string name, const std::string value) : name(name), value(value) {
+ }
+
+ virtual ~SocketOption() = default;
+};
+
class SocketFactory {
public:
virtual ~SocketFactory() = default;
virtual bool canHandle(const std::string& connectionString) = 0;
- // virtual void setOption(const std::string& uri, const std::string& value) = 0;
- virtual Socket* open(const std::string& connectionString) = 0;
+ virtual std::shared_ptr<Socket> open(const std::string& connectionString) = 0;
static std::shared_ptr<SocketFactory> find(const std::string& connectionString);
};
--- a/src/SocketHandler.h Thu Jul 28 02:45:11 2022 +0200
+++ b/src/SocketHandler.h Fri Jul 29 18:03:49 2022 +0200
@@ -50,15 +50,24 @@
std::string currentValue;
} currentRelation;
+ void configureSocket() {
+ if (socket) {
+ // already configured, reuse
+ } else {
+ std::vector<SocketOption> options;
+ for (auto o : configuration.options) options.push_back({convertor.to_bytes(o.name), convertor.to_bytes(o.value)});
+ const std::string connectionString = convertor.to_bytes(configuration.connectionString);
+ if (auto f = SocketFactory::find(connectionString)) socket = f->open(connectionString);
+ }
+ }
+
public:
SocketHandler(Configuration configuration) : configuration(configuration) {
- // TODO: do not throw exception from the constructor: Socket::open()
- // socket = std::make_shared<TCPSocket>(); // TODO: create a TCP, UDP… socket
- socket.reset(SocketFactory::find("tcp://TODO:connectionString")->open("tcp://TODO:connectionString2")); // TODO: connection string + check nullptr
}
void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override {
+ configureSocket();
currentRelation = CurrentRelation{name, attributes};
}