equal
deleted
inserted
replaced
27 #include <relpipe/common/type/typedefs.h> |
27 #include <relpipe/common/type/typedefs.h> |
28 #include <relpipe/reader/TypeId.h> |
28 #include <relpipe/reader/TypeId.h> |
29 #include <relpipe/reader/handlers/RelationalReaderStringHandler.h> |
29 #include <relpipe/reader/handlers/RelationalReaderStringHandler.h> |
30 #include <relpipe/reader/handlers/AttributeMetadata.h> |
30 #include <relpipe/reader/handlers/AttributeMetadata.h> |
31 |
31 |
|
32 #include "PosixMQ.h" |
|
33 |
32 namespace relpipe { |
34 namespace relpipe { |
33 namespace out { |
35 namespace out { |
34 namespace posixmq { |
36 namespace posixmq { |
35 |
37 |
36 class PosixMQHandler : public relpipe::reader::handlers::RelationalReaderStringHandler { |
38 class PosixMQHandler : public relpipe::reader::handlers::RelationalReaderStringHandler { |
37 private: |
39 private: |
38 |
40 std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings. |
|
41 shared_ptr<PosixMQ> mq; |
|
42 |
39 public: |
43 public: |
40 |
44 |
41 PosixMQHandler(std::ostream& output) { |
45 PosixMQHandler(std::ostream& output) { |
|
46 relpipe::common::type::StringX queueName = L"/relpipe"; |
|
47 mq.reset(PosixMQ::open(convertor.to_bytes(queueName))); |
|
48 |
42 } |
49 } |
43 |
50 |
44 void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override { |
51 void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override { |
45 |
52 |
46 } |
53 } |
47 |
54 |
48 void attribute(const relpipe::common::type::StringX& value) override { |
55 void attribute(const relpipe::common::type::StringX& value) override { |
49 |
56 // TODO: send only certain attributes |
|
57 mq->send(convertor.to_bytes(value)); |
50 } |
58 } |
51 |
59 |
52 void endOfPipe() { |
60 void endOfPipe() { |
53 |
61 |
54 } |
62 } |
55 |
63 |
56 }; |
64 }; |
57 |
65 |
58 } |
66 } |