src/ZeroMQCommand.cpp
branchv_0
changeset 1 27c11cea34de
parent 0 e5d547ab0c51
child 2 f724d805c34a
--- a/src/ZeroMQCommand.cpp	Sun May 01 18:23:45 2022 +0200
+++ b/src/ZeroMQCommand.cpp	Sun May 01 22:27:32 2022 +0200
@@ -22,7 +22,8 @@
 #include <algorithm>
 #include <unistd.h>
 #include <sstream>
-#include <iomanip>
+
+#include <zmq.hpp>
 
 #include <relpipe/writer/RelationalWriter.h>
 #include <relpipe/writer/RelpipeWriterException.h>
@@ -33,7 +34,6 @@
 #include <relpipe/cli/CLI.h>
 
 #include "ZeroMQCommand.h"
-#include "ZeroMQ.h"
 #include "Hex.h"
 
 using namespace std;
@@ -45,21 +45,20 @@
 namespace zeromq {
 
 void ZeroMQCommand::process(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration) {
-	vector<AttributeMetadata> metadata;
-
-	std::shared_ptr<ZeroMQ> mq(ZeroMQ::open(convertor.to_bytes(configuration.queue), configuration.unlinkOnClose));
+	zmq::context_t zmqContext;
+	zmq::socket_t zmqSocket(zmqContext, zmq::socket_type::pull);
+	zmqSocket.bind(convertor.to_bytes(configuration.endpointUrl));
 
 	writer->startRelation(configuration.relation,{
-		{L"queue", TypeId::STRING},
 		{L"text", TypeId::STRING},
 		{L"data", TypeId::STRING}
 	}, true);
 
 	for (int i = configuration.messageCount; continueProcessing && i > 0; i--) {
-		// TODO: maybe rather call mq_timedreceive() inside and check continueProcessing (to be able to stop even when no messages are comming)
-		std::string message = mq->receive();
+		zmq::message_t msg;
+		zmqSocket.recv(&msg, 0); // FIXME: check return value
+		std::string message(msg.data<char>(), msg.size());
 
-		writer->writeAttribute(configuration.queue);
 		writer->writeAttribute(Hex::toTxt(message));
 		writer->writeAttribute(Hex::toHex(message));
 	}