src/ZeroMQHandler.h
branch v_0
changeset 1 ccaed729f8d3
parent 0 452d06d24ac2
child 2 e6294ba5017f
equal deleted inserted replaced
0:452d06d24ac2 1:ccaed729f8d3
    22 #include <iostream>
    22 #include <iostream>
    23 #include <sstream>
    23 #include <sstream>
    24 #include <locale>
    24 #include <locale>
    25 #include <codecvt>
    25 #include <codecvt>
    26 
    26 
       
    27 #include <zmq.hpp>
       
    28 
    27 #include <relpipe/common/type/typedefs.h>
    29 #include <relpipe/common/type/typedefs.h>
    28 #include <relpipe/reader/TypeId.h>
    30 #include <relpipe/reader/TypeId.h>
    29 #include <relpipe/reader/handlers/RelationalReaderStringHandler.h>
    31 #include <relpipe/reader/handlers/RelationalReaderStringHandler.h>
    30 #include <relpipe/reader/handlers/AttributeMetadata.h>
    32 #include <relpipe/reader/handlers/AttributeMetadata.h>
    31 
    33 
    32 #include "ZeroMQ.h"
       
    33 #include "Configuration.h"
    34 #include "Configuration.h"
    34 #include "Hex.h"
    35 #include "Hex.h"
    35 
    36 
    36 namespace relpipe {
    37 namespace relpipe {
    37 namespace out {
    38 namespace out {
    39 
    40 
    40 class ZeroMQHandler : public relpipe::reader::handlers::RelationalReaderStringHandler {
    41 class ZeroMQHandler : public relpipe::reader::handlers::RelationalReaderStringHandler {
    41 private:
    42 private:
    42 	std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings.
    43 	std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings.
    43 	Configuration configuration;
    44 	Configuration configuration;
    44 	shared_ptr<ZeroMQ> mq;
    45 	zmq::context_t zmqContext;
       
    46 	zmq::socket_t zmqSocket;
    45 
    47 
    46 	struct CurrentRelation {
    48 	struct CurrentRelation {
    47 		relpipe::common::type::StringX name;
    49 		relpipe::common::type::StringX name;
    48 		std::vector<relpipe::reader::handlers::AttributeMetadata> attributes;
    50 		std::vector<relpipe::reader::handlers::AttributeMetadata> attributes;
    49 		relpipe::common::type::Integer attributeIndex = 0;
    51 		relpipe::common::type::Integer attributeIndex = 0;
    50 		std::string currentValue;
    52 		std::string currentValue;
    51 	} currentRelation;
    53 	} currentRelation;
    52 
    54 
    53 public:
    55 public:
    54 
    56 
    55 	ZeroMQHandler(Configuration configuration) : configuration(configuration) {
    57 	ZeroMQHandler(Configuration configuration) : configuration(configuration), zmqSocket(zmqContext, zmq::socket_type::push) {
    56 		// TODO: do not throw exception from the constructor: ZeroMQ::open()
    58 		zmqSocket.connect(convertor.to_bytes(configuration.endpointUrl));
    57 		mq.reset(ZeroMQ::open(convertor.to_bytes(configuration.queue), configuration.unlinkOnClose));
       
    58 	}
    59 	}
    59 
    60 
    60 	void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override {
    61 	void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override {
    61 		currentRelation = CurrentRelation{name, attributes};
    62 		currentRelation = CurrentRelation{name, attributes};
    62 	}
    63 	}
    70 		else if (attributeName == L"data"); // keep empty or value from 'text'
    71 		else if (attributeName == L"data"); // keep empty or value from 'text'
    71 
    72 
    72 		currentRelation.attributeIndex++;
    73 		currentRelation.attributeIndex++;
    73 		if (currentRelation.attributeIndex == currentRelation.attributes.size()) {
    74 		if (currentRelation.attributeIndex == currentRelation.attributes.size()) {
    74 			currentRelation.attributeIndex = 0;
    75 			currentRelation.attributeIndex = 0;
    75 			mq->send(currentRelation.currentValue);
    76 			zmqSocket.send(currentRelation.currentValue.c_str(), currentRelation.currentValue.size(), 0); // FIXME: check return value
    76 		}
    77 		}
    77 
    78 
    78 	}
    79 	}
    79 
    80 
    80 	void endOfPipe() {
    81 	void endOfPipe() {