22 #include <iostream> |
22 #include <iostream> |
23 #include <sstream> |
23 #include <sstream> |
24 #include <locale> |
24 #include <locale> |
25 #include <codecvt> |
25 #include <codecvt> |
26 |
26 |
|
27 #include <zmq.hpp> |
|
28 |
27 #include <relpipe/common/type/typedefs.h> |
29 #include <relpipe/common/type/typedefs.h> |
28 #include <relpipe/reader/TypeId.h> |
30 #include <relpipe/reader/TypeId.h> |
29 #include <relpipe/reader/handlers/RelationalReaderStringHandler.h> |
31 #include <relpipe/reader/handlers/RelationalReaderStringHandler.h> |
30 #include <relpipe/reader/handlers/AttributeMetadata.h> |
32 #include <relpipe/reader/handlers/AttributeMetadata.h> |
31 |
33 |
32 #include "ZeroMQ.h" |
|
33 #include "Configuration.h" |
34 #include "Configuration.h" |
34 #include "Hex.h" |
35 #include "Hex.h" |
35 |
36 |
36 namespace relpipe { |
37 namespace relpipe { |
37 namespace out { |
38 namespace out { |
39 |
40 |
40 class ZeroMQHandler : public relpipe::reader::handlers::RelationalReaderStringHandler { |
41 class ZeroMQHandler : public relpipe::reader::handlers::RelationalReaderStringHandler { |
41 private: |
42 private: |
42 std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings. |
43 std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings. |
43 Configuration configuration; |
44 Configuration configuration; |
44 shared_ptr<ZeroMQ> mq; |
45 zmq::context_t zmqContext; |
|
46 zmq::socket_t zmqSocket; |
45 |
47 |
// State of the relation currently being processed (identical in both diff columns).
//   name           - relation name as passed to startRelation().
//   attributes     - attribute metadata as passed to startRelation().
//   attributeIndex - counter advanced once per handled attribute; the attribute handler
//                    resets it to 0 when it reaches attributes.size().
//   currentValue   - payload handed to the message queue / socket when attributeIndex wraps;
//                    it is filled in by code that is not visible in this excerpt.
46 struct CurrentRelation { |
48 struct CurrentRelation { |
47 relpipe::common::type::StringX name; |
49 relpipe::common::type::StringX name; |
48 std::vector<relpipe::reader::handlers::AttributeMetadata> attributes; |
50 std::vector<relpipe::reader::handlers::AttributeMetadata> attributes; |
49 relpipe::common::type::Integer attributeIndex = 0; |
51 relpipe::common::type::Integer attributeIndex = 0; |
50 std::string currentValue; |
52 std::string currentValue; |
51 } currentRelation; |
53 } currentRelation; |
52 |
54 |
53 public: |
55 public: |
54 |
56 |
// Constructor; two revisions are interleaved here (side-by-side diff).
//   Old revision (lines numbered 55-58): copies the configuration and stores the result of
//     ZeroMQ::open(queue name converted to bytes, configuration.unlinkOnClose) in `mq`.
//   New revision (lines numbered 57-59): copies the configuration, constructs `zmqSocket`
//     from `zmqContext` with socket type push, then connects it to configuration.endpointUrl
//     (converted from wide string to bytes by `convertor`).
// `zmqContext` is declared before `zmqSocket`, so it is initialized first and may be used
// in the member-initializer list.
// NOTE(review): connect() presumably reports failure by throwing, so the old TODO about not
// throwing from the constructor likely still applies to the new revision - confirm.
55 ZeroMQHandler(Configuration configuration) : configuration(configuration) { |
57 ZeroMQHandler(Configuration configuration) : configuration(configuration), zmqSocket(zmqContext, zmq::socket_type::push) { |
56 // TODO: do not throw exception from the constructor: ZeroMQ::open() |
58 zmqSocket.connect(convertor.to_bytes(configuration.endpointUrl)); |
57 mq.reset(ZeroMQ::open(convertor.to_bytes(configuration.queue), configuration.unlinkOnClose)); |

58 } |
59 } |
59 |
60 |
// Called at the start of each relation (identical in both diff columns).
// Replaces `currentRelation` with a fresh CurrentRelation holding the given name and
// attribute metadata; the remaining members take their defaults (attributeIndex = 0,
// currentValue empty), so no state carries over from the previous relation.
60 void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override { |
61 void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override { |
61 currentRelation = CurrentRelation{name, attributes}; |
62 currentRelation = CurrentRelation{name, attributes}; |
62 } |
63 } |
70 else if (attributeName == L"data"); // keep empty or value from 'text' |
71 else if (attributeName == L"data"); // keep empty or value from 'text' |
71 |
72 |
72 currentRelation.attributeIndex++; |
73 currentRelation.attributeIndex++; |
73 if (currentRelation.attributeIndex == currentRelation.attributes.size()) { |
74 if (currentRelation.attributeIndex == currentRelation.attributes.size()) { |
74 currentRelation.attributeIndex = 0; |
75 currentRelation.attributeIndex = 0; |
75 mq->send(currentRelation.currentValue); |
76 zmqSocket.send(currentRelation.currentValue.c_str(), currentRelation.currentValue.size(), 0); // FIXME: check return value |
76 } |
77 } |
77 |
78 |
78 } |
79 } |
79 |
80 |
80 void endOfPipe() { |
81 void endOfPipe() { |