src/ZeroMQCommand.cpp
branchv_0
changeset 1 27c11cea34de
parent 0 e5d547ab0c51
child 2 f724d805c34a
equal deleted inserted replaced
0:e5d547ab0c51 1:27c11cea34de
    20 #include <locale>
    20 #include <locale>
    21 #include <regex>
    21 #include <regex>
    22 #include <algorithm>
    22 #include <algorithm>
    23 #include <unistd.h>
    23 #include <unistd.h>
    24 #include <sstream>
    24 #include <sstream>
    25 #include <iomanip>
    25 
       
    26 #include <zmq.hpp>
    26 
    27 
    27 #include <relpipe/writer/RelationalWriter.h>
    28 #include <relpipe/writer/RelationalWriter.h>
    28 #include <relpipe/writer/RelpipeWriterException.h>
    29 #include <relpipe/writer/RelpipeWriterException.h>
    29 #include <relpipe/writer/AttributeMetadata.h>
    30 #include <relpipe/writer/AttributeMetadata.h>
    30 #include <relpipe/writer/Factory.h>
    31 #include <relpipe/writer/Factory.h>
    31 #include <relpipe/writer/TypeId.h>
    32 #include <relpipe/writer/TypeId.h>
    32 
    33 
    33 #include <relpipe/cli/CLI.h>
    34 #include <relpipe/cli/CLI.h>
    34 
    35 
    35 #include "ZeroMQCommand.h"
    36 #include "ZeroMQCommand.h"
    36 #include "ZeroMQ.h"
       
    37 #include "Hex.h"
    37 #include "Hex.h"
    38 
    38 
    39 using namespace std;
    39 using namespace std;
    40 using namespace relpipe::cli;
    40 using namespace relpipe::cli;
    41 using namespace relpipe::writer;
    41 using namespace relpipe::writer;
    43 namespace relpipe {
    43 namespace relpipe {
    44 namespace in {
    44 namespace in {
    45 namespace zeromq {
    45 namespace zeromq {
    46 
    46 
    47 void ZeroMQCommand::process(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration) {
    47 void ZeroMQCommand::process(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration) {
    48 	vector<AttributeMetadata> metadata;
    48 	zmq::context_t zmqContext;
    49 
    49 	zmq::socket_t zmqSocket(zmqContext, zmq::socket_type::pull);
    50 	std::shared_ptr<ZeroMQ> mq(ZeroMQ::open(convertor.to_bytes(configuration.queue), configuration.unlinkOnClose));
    50 	zmqSocket.bind(convertor.to_bytes(configuration.endpointUrl));
    51 
    51 
    52 	writer->startRelation(configuration.relation,{
    52 	writer->startRelation(configuration.relation,{
    53 		{L"queue", TypeId::STRING},
       
    54 		{L"text", TypeId::STRING},
    53 		{L"text", TypeId::STRING},
    55 		{L"data", TypeId::STRING}
    54 		{L"data", TypeId::STRING}
    56 	}, true);
    55 	}, true);
    57 
    56 
    58 	for (int i = configuration.messageCount; continueProcessing && i > 0; i--) {
    57 	for (int i = configuration.messageCount; continueProcessing && i > 0; i--) {
    59 		// TODO: maybe rather call mq_timedreceive() inside and check continueProcessing (to be able to stop even when no messages are comming)
    58 		zmq::message_t msg;
    60 		std::string message = mq->receive();
    59 		zmqSocket.recv(&msg, 0); // FIXME: check return value
       
    60 		std::string message(msg.data<char>(), msg.size());
    61 
    61 
    62 		writer->writeAttribute(configuration.queue);
       
    63 		writer->writeAttribute(Hex::toTxt(message));
    62 		writer->writeAttribute(Hex::toTxt(message));
    64 		writer->writeAttribute(Hex::toHex(message));
    63 		writer->writeAttribute(Hex::toHex(message));
    65 	}
    64 	}
    66 
    65 
    67 }
    66 }