--- a/src/ZeroMQCommand.cpp Sun May 01 18:23:45 2022 +0200
+++ b/src/ZeroMQCommand.cpp Sun May 01 22:27:32 2022 +0200
@@ -22,7 +22,8 @@
#include <algorithm>
#include <unistd.h>
#include <sstream>
-#include <iomanip>
+
+#include <zmq.hpp>
#include <relpipe/writer/RelationalWriter.h>
#include <relpipe/writer/RelpipeWriterException.h>
@@ -33,7 +34,6 @@
#include <relpipe/cli/CLI.h>
#include "ZeroMQCommand.h"
-#include "ZeroMQ.h"
#include "Hex.h"
using namespace std;
@@ -45,21 +45,20 @@
namespace zeromq {
void ZeroMQCommand::process(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration) {
- vector<AttributeMetadata> metadata;
-
- std::shared_ptr<ZeroMQ> mq(ZeroMQ::open(convertor.to_bytes(configuration.queue), configuration.unlinkOnClose));
+ zmq::context_t zmqContext;
+ zmq::socket_t zmqSocket(zmqContext, zmq::socket_type::pull);
+ zmqSocket.bind(convertor.to_bytes(configuration.endpointUrl));
writer->startRelation(configuration.relation,{
- {L"queue", TypeId::STRING},
{L"text", TypeId::STRING},
{L"data", TypeId::STRING}
}, true);
for (int i = configuration.messageCount; continueProcessing && i > 0; i--) {
- // TODO: maybe rather call mq_timedreceive() inside and check continueProcessing (to be able to stop even when no messages are comming)
- std::string message = mq->receive();
+ zmq::message_t msg;
+ zmqSocket.recv(&msg, 0); // FIXME: check return value
+ std::string message(msg.data<char>(), msg.size());
- writer->writeAttribute(configuration.queue);
writer->writeAttribute(Hex::toTxt(message));
writer->writeAttribute(Hex::toHex(message));
}