29 #include <relpipe/writer/TypeId.h> |
29 #include <relpipe/writer/TypeId.h> |
30 |
30 |
31 #include <relpipe/cli/CLI.h> |
31 #include <relpipe/cli/CLI.h> |
32 |
32 |
33 #include "PosixMQCommand.h" |
33 #include "PosixMQCommand.h" |
|
34 #include "PosixMQ.h" |
34 |
35 |
35 using namespace std; |
36 using namespace std; |
36 using namespace relpipe::cli; |
37 using namespace relpipe::cli; |
37 using namespace relpipe::writer; |
38 using namespace relpipe::writer; |
38 |
39 |
39 namespace relpipe { |
40 namespace relpipe { |
40 namespace in { |
41 namespace in { |
41 namespace posixmq { |
42 namespace posixmq { |
42 |
43 |
43 void PosixMQCommand::process(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration) { |
44 void PosixMQCommand::process(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration) { |
44 wstring_convert < codecvt_utf8<wchar_t>> convertor; // UTF-8 is required for PosixMQ |
|
45 vector<AttributeMetadata> metadata; |
45 vector<AttributeMetadata> metadata; |
46 |
46 |
47 writer->startRelation(L"posix_mq",{ |
47 std::shared_ptr<PosixMQ> mq(PosixMQ::open(convertor.to_bytes(configuration.queue))); |
|
48 |
|
49 writer->startRelation(configuration.relation,{ |
|
50 {L"queue", TypeId::STRING}, |
48 {L"message", TypeId::STRING} |
51 {L"message", TypeId::STRING} |
49 }, true); |
52 }, true); |
50 |
|
51 writer->writeAttribute(L"TODO: read messages from POSIX MQ"); |
|
52 |
53 |
|
54 for (int i = configuration.messageCount; i > 0; i--) { |
|
55 writer->writeAttribute(configuration.queue); |
|
56 writer->writeAttribute(convertor.from_bytes(mq->receive())); |
|
57 } |
53 |
58 |
|
59 } |
|
60 |
|
61 PosixMQCommand::~PosixMQCommand() { |
54 } |
62 } |
55 |
63 |
56 } |
64 } |
57 } |
65 } |
58 } |
66 } |