--- a/bash-completion.sh Sat Feb 26 01:21:14 2022 +0100
+++ b/bash-completion.sh Tue Mar 01 00:47:49 2022 +0100
@@ -35,14 +35,13 @@
)
if [[ "$w1" == "--relation" && "x$w0" == "x" ]]; then COMPREPLY=("''")
- elif [[ "$w1" == "--attribute" && "x$w0" == "x" ]]; then COMPREPLY=("''")
- elif [[ "$w2" == "--attribute" ]]; then COMPREPLY=($(compgen -W "${DATA_TYPE[*]}" -- "$w0"))
- elif [[ "$w1" == "--read-types" ]]; then COMPREPLY=($(compgen -W "${READ_TYPES[*]}" -- "$w0"))
+ elif [[ "$w1" == "--queue" && "x$w0" == "x" ]]; then COMPREPLY=("''")
+ elif [[ "$w1" == "--message-count" && "x$w0" == "x" ]]; then COMPREPLY=("1")
else
OPTIONS=(
"--relation"
- "--attribute"
- "--read-types"
+ "--queue"
+ "--message-count"
)
COMPREPLY=($(compgen -W "${OPTIONS[*]}" -- "$w0"))
fi
--- a/nbproject/configurations.xml Sat Feb 26 01:21:14 2022 +0100
+++ b/nbproject/configurations.xml Tue Mar 01 00:47:49 2022 +0100
@@ -42,6 +42,7 @@
<logicalFolder name="root" displayName="root" projectFiles="true" kind="ROOT">
<df root="." name="0">
<df name="src">
+ <in>PosixMQ.h</in>
<in>PosixMQCommand.cpp</in>
<in>relpipe-in-posixmq.cpp</in>
</df>
@@ -93,6 +94,8 @@
<preBuildFirst>true</preBuildFirst>
</preBuild>
</makefileType>
+ <item path="src/PosixMQ.h" ex="false" tool="3" flavor2="0">
+ </item>
<item path="src/PosixMQCommand.cpp" ex="false" tool="1" flavor2="0">
<ccTool flags="0">
</ccTool>
@@ -131,6 +134,8 @@
<preBuildFirst>true</preBuildFirst>
</preBuild>
</makefileType>
+ <item path="src/PosixMQ.h" ex="false" tool="3" flavor2="0">
+ </item>
</conf>
</confs>
</configurationDescriptor>
--- a/src/CLIParser.h Sat Feb 26 01:21:14 2022 +0100
+++ b/src/CLIParser.h Tue Mar 01 00:47:49 2022 +0100
@@ -37,38 +37,11 @@
else throw relpipe::cli::RelpipeCLIException(L"Missing CLI argument" + (i > 0 ? (L" after " + arguments[i - 1]) : L""), relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS);
}
- /**
- * TODO: use a common method
- */
- bool parseBoolean(const relpipe::writer::string_t& value) {
- if (value == L"true") return true;
- else if (value == L"false") return false;
- else throw relpipe::cli::RelpipeCLIException(L"Unable to parse boolean value: " + value + L" (expecting true or false)", relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS);
- }
-
- /**
- * TODO: use a common method
- */
- relpipe::writer::TypeId parseTypeId(const relpipe::writer::string_t& value) {
- using t = relpipe::writer::TypeId;
- if (value == L"string") return t::STRING;
- else if (value == L"integer") return t::INTEGER;
- else if (value == L"boolean") return t::BOOLEAN;
- else throw relpipe::cli::RelpipeCLIException(L"Unable to parse TypeId: " + value, relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS);
- }
-
- Configuration::ReadTypes parseReadTypes(const relpipe::writer::string_t& value) {
- if (value == L"auto") return Configuration::ReadTypes::AUTO;
- else if (value == L"true") return Configuration::ReadTypes::TRUE;
- else if (value == L"false") return Configuration::ReadTypes::FALSE;
- else throw relpipe::cli::RelpipeCLIException(L"Unable to parse ReadTypes: " + value, relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS);
- }
-
public:
static const relpipe::writer::string_t OPTION_RELATION;
- static const relpipe::writer::string_t OPTION_ATTRIBUTE;
- static const relpipe::writer::string_t OPTION_READ_TYPES;
+ static const relpipe::writer::string_t OPTION_QUEUE;
+ static const relpipe::writer::string_t OPTION_MESSAGE_COUNT;
Configuration parse(const std::vector<relpipe::writer::string_t>& arguments) {
Configuration c;
@@ -78,13 +51,10 @@
if (option == OPTION_RELATION) {
c.relation = readNext(arguments, i);
- } else if (option == OPTION_ATTRIBUTE) {
- AttributeRecipe attribute;
- attribute.name = readNext(arguments, i);
- attribute.type = parseTypeId(readNext(arguments, i));
- c.attributes.push_back(attribute);
- } else if (option == OPTION_READ_TYPES) {
- c.readTypes = parseReadTypes(readNext(arguments, i));
+ } else if (option == OPTION_QUEUE) {
+ c.queue = readNext(arguments, i);
+ } else if (option == OPTION_MESSAGE_COUNT) {
+ c.messageCount = std::stoull(readNext(arguments, i));
} else throw relpipe::cli::RelpipeCLIException(L"Unsupported CLI option: " + option, relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS);
}
@@ -96,8 +66,8 @@
};
const relpipe::writer::string_t CLIParser::OPTION_RELATION = L"--relation";
-const relpipe::writer::string_t CLIParser::OPTION_ATTRIBUTE = L"--attribute";
-const relpipe::writer::string_t CLIParser::OPTION_READ_TYPES = L"--read-types";
+const relpipe::writer::string_t CLIParser::OPTION_QUEUE = L"--queue";
+const relpipe::writer::string_t CLIParser::OPTION_MESSAGE_COUNT = L"--message-count";
}
}
--- a/src/CMakeLists.txt Sat Feb 26 01:21:14 2022 +0100
+++ b/src/CMakeLists.txt Tue Mar 01 00:47:49 2022 +0100
@@ -34,7 +34,7 @@
)
# Link libraries:
-target_link_libraries(${EXECUTABLE_FILE} ${RELPIPE_LIBS_LIBRARIES})
+target_link_libraries(${EXECUTABLE_FILE} ${RELPIPE_LIBS_LIBRARIES} rt)
set_property(TARGET ${EXECUTABLE_FILE} PROPERTY INSTALL_RPATH_USE_LINK_PATH TRUE)
install(TARGETS ${EXECUTABLE_FILE} DESTINATION bin)
--- a/src/Configuration.h Sat Feb 26 01:21:14 2022 +0100
+++ b/src/Configuration.h Tue Mar 01 00:47:49 2022 +0100
@@ -26,29 +26,12 @@
namespace in {
namespace posixmq {
-class AttributeRecipe {
-public:
-
- virtual ~AttributeRecipe() {
- }
-
- relpipe::writer::string_t name;
- relpipe::writer::TypeId type;
-
-};
-
class Configuration {
public:
- enum class ReadTypes {
- AUTO,
- TRUE,
- FALSE,
- };
-
- ReadTypes readTypes = ReadTypes::AUTO;
+ relpipe::writer::integer_t messageCount = 1;
relpipe::writer::string_t relation = L"posixmq";
- std::vector<AttributeRecipe> attributes;
+ relpipe::writer::string_t queue = L"relpipe";
virtual ~Configuration() {
}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/PosixMQ.h Tue Mar 01 00:47:49 2022 +0100
@@ -0,0 +1,66 @@
+/**
+ * Relational pipes
+ * Copyright © 2022 František Kučera (Frantovo.cz, GlobalCode.info)
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, version 3 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#pragma once
+
+#include <mqueue.h>
+#include <string>
+#include <stdexcept>
+#include <cstring>
+
+namespace relpipe {
+namespace in {
+namespace posixmq {
+
+class PosixMQ {
+private:
+ size_t MSG_SIZE = 8192; // TODO: configurable/dynamic
+ std::string queueName;
+ mqd_t handle = -2;
+
+ PosixMQ(std::string queueName, mqd_t handle) : queueName(queueName), handle(handle) {
+ }
+
+public:
+
+ virtual ~PosixMQ() {
+ if (handle >= 0) mq_close(handle);
+ }
+
+ static PosixMQ* open(std::string queueName) {
+ mqd_t handle = mq_open(queueName.c_str(), O_RDONLY | O_CREAT);
+ if (handle >= 0) return new PosixMQ(queueName, handle);
+ else throw std::logic_error("Unable to open PosixMQ: " + queueName + " error: " + strerror(errno));
+ }
+
+ std::string receive() {
+ char buffer[MSG_SIZE + 1];
+ memset(buffer, 0, MSG_SIZE + 1);
+ ssize_t msgSize = mq_receive(handle, buffer, MSG_SIZE, nullptr);
+
+ if (msgSize >= 0) return std::string(buffer);
+ else throw std::logic_error("Unable to receive PosixMQ message from " + queueName + " error: " + strerror(errno));
+ }
+
+ void unlink() {
+ mq_unlink(queueName.c_str());
+ }
+
+};
+
+}
+}
+}
--- a/src/PosixMQCommand.cpp Sat Feb 26 01:21:14 2022 +0100
+++ b/src/PosixMQCommand.cpp Tue Mar 01 00:47:49 2022 +0100
@@ -31,6 +31,7 @@
#include <relpipe/cli/CLI.h>
#include "PosixMQCommand.h"
+#include "PosixMQ.h"
using namespace std;
using namespace relpipe::cli;
@@ -41,18 +42,25 @@
namespace posixmq {
void PosixMQCommand::process(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration) {
- wstring_convert < codecvt_utf8<wchar_t>> convertor; // UTF-8 is required for PosixMQ
vector<AttributeMetadata> metadata;
- writer->startRelation(L"posix_mq",{
+ std::shared_ptr<PosixMQ> mq(PosixMQ::open(convertor.to_bytes(configuration.queue)));
+
+ writer->startRelation(configuration.relation,{
+ {L"queue", TypeId::STRING},
{L"message", TypeId::STRING}
}, true);
-
- writer->writeAttribute(L"TODO: read messages from POSIX MQ");
+ for (int i = configuration.messageCount; i > 0; i--) {
+ writer->writeAttribute(configuration.queue);
+ writer->writeAttribute(convertor.from_bytes(mq->receive()));
+ }
}
+PosixMQCommand::~PosixMQCommand() {
+}
+
}
}
}
\ No newline at end of file
--- a/src/PosixMQCommand.h Sat Feb 26 01:21:14 2022 +0100
+++ b/src/PosixMQCommand.h Tue Mar 01 00:47:49 2022 +0100
@@ -30,7 +30,11 @@
namespace posixmq {
class PosixMQCommand {
+private:
+ std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings.
public:
+ virtual ~PosixMQCommand();
+
void process(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration);
};