45 namespace socket { |
45 namespace socket { |
46 |
46 |
47 void SocketCommand::process(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration) { |
47 void SocketCommand::process(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration) { |
48 vector<AttributeMetadata> metadata; |
48 vector<AttributeMetadata> metadata; |
49 |
49 |
50 std::shared_ptr<Socket> mq(Socket::open(convertor.to_bytes(configuration.queue), configuration.unlinkOnClose)); |
50 std::shared_ptr<Socket> socket = std::make_shared<Socket>(); // TODO: create a TCP, UDP… socket |
51 |
51 |
52 writer->startRelation(configuration.relation,{ |
52 writer->startRelation(configuration.relation,{ |
53 {L"queue", TypeId::STRING}, |
53 {L"queue", TypeId::STRING}, |
54 {L"text", TypeId::STRING}, |
54 {L"text", TypeId::STRING}, |
55 {L"data", TypeId::STRING} |
55 {L"data", TypeId::STRING} |
56 }, true); |
56 }, true); |
57 |
57 |
58 for (int i = configuration.messageCount; continueProcessing && i > 0; i--) { |
58 for (int i = configuration.messageCount; continueProcessing && i > 0; i--) { |
59 // TODO: consider calling mq_timedreceive() inside and checking continueProcessing (to be able to stop even when no messages are coming) |
59 // TODO: consider calling mq_timedreceive() inside and checking continueProcessing (to be able to stop even when no messages are coming) |
60 std::string message = mq->receive(); |
60 std::string message = socket->receive(); |
61 |
61 |
62 writer->writeAttribute(configuration.queue); |
62 writer->writeAttribute(configuration.queue); |
63 writer->writeAttribute(Hex::toTxt(message)); |
63 writer->writeAttribute(Hex::toTxt(message)); |
64 writer->writeAttribute(Hex::toHex(message)); |
64 writer->writeAttribute(Hex::toHex(message)); |
65 } |
65 } |