35 #include <relpipe/cli/CLI.h> |
36 #include <relpipe/cli/CLI.h> |
36 |
37 |
37 #include "MQTTCommand.h" |
38 #include "MQTTCommand.h" |
38 #include "Hex.h" |
39 #include "Hex.h" |
39 |
40 |
40 using namespace std; |
|
41 using namespace relpipe::cli; |
41 using namespace relpipe::cli; |
42 using namespace relpipe::writer; |
42 using namespace relpipe::writer; |
43 |
43 |
44 namespace relpipe { |
44 namespace relpipe { |
45 namespace in { |
45 namespace in { |
46 namespace mqtt { |
46 namespace mqtt { |
47 |
47 |
48 void MQTTCommand::process(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration) { |
48 class MQTTClient : public mosqpp::mosquittopp { |
49 vector<AttributeMetadata> metadata; |
49 private: |
|
50 std::shared_ptr<writer::RelationalWriter> writer; |
|
51 Configuration& configuration; |
|
52 int messageCount = 0; |
|
53 std::string clientId; |
50 |
54 |
51 { |
55 /** |
52 // TODO: remove |
56 * @return unique (random) client ID for MQTT to allow multiple simultaneous connections |
53 int major, minor, patch; |
57 */ |
54 mosqpp::lib_version(&major, &minor, &patch); |
58 static std::string generateClientID() { |
55 std::cerr << "mosquitto version: " << major << "." << minor << "." << patch << std::endl; |
59 std::stringstream result; |
|
60 // result << "relpipe-in-"; |
|
61 std::string symbols("0123456789abcdef"); |
|
62 |
|
63 std::random_device dev; |
|
64 std::mt19937 rng(dev()); |
|
65 std::uniform_int_distribution<std::mt19937::result_type> dist6(0, symbols.size()); |
|
66 |
|
67 for (int i = 0; i < 8; i++) result << symbols[dist6(rng)]; |
|
68 |
|
69 // std::cerr << "generated clien ID = " << result.str() << std::endl; |
|
70 return result.str(); |
|
71 } |
|
72 public: |
|
73 |
|
74 MQTTClient(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration) : mosqpp::mosquittopp((generateClientID()).c_str()), writer(writer), configuration(configuration) { |
56 } |
75 } |
57 |
76 |
|
77 void on_message(const mosquitto_message* message) override { |
|
78 // std::cerr << "got MQTT message: length=" << message->payloadlen << std::endl; |
|
79 std::string payload = std::string((const char*) message->payload, message->payloadlen); |
|
80 writer->writeAttribute(configuration.stream); |
|
81 writer->writeAttribute(Hex::toTxt(payload)); |
|
82 writer->writeAttribute(Hex::toHex(payload)); |
|
83 messageCount++; |
|
84 } |
|
85 |
|
86 int popMessageCount() { |
|
87 int count = messageCount; |
|
88 messageCount = 0; |
|
89 return count; |
|
90 } |
|
91 |
|
92 }; |
|
93 |
|
94 void MQTTCommand::process(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration) { |
|
95 std::shared_ptr<MQTTClient> mq = std::make_shared<MQTTClient>(writer, configuration); |
|
96 |
58 writer->startRelation(configuration.relation,{ |
97 writer->startRelation(configuration.relation,{ |
59 {L"queue", TypeId::STRING}, |
98 {L"stream", TypeId::STRING}, |
60 {L"text", TypeId::STRING}, |
99 {L"text", TypeId::STRING}, |
61 {L"data", TypeId::STRING} |
100 {L"data", TypeId::STRING} |
62 }, true); |
101 }, true); |
63 |
102 |
64 for (int i = configuration.messageCount; continueProcessing && i > 0; i--) { |
103 mq->max_inflight_messages_set(1); |
65 // TODO: maybe rather call mq_timedreceive() inside and check continueProcessing (to be able to stop even when no messages are comming) |
104 mq->connect("localhost", 1883); |
66 std::string message = "TODO"; // FIXME: receive message |
105 int mid; |
|
106 mq->subscribe(&mid, convertor.to_bytes(configuration.stream).c_str()); |
67 |
107 |
68 writer->writeAttribute(configuration.queue); |
108 //for (int i = configuration.messageCount; continueProcessing && i > 0; i--) { |
69 writer->writeAttribute(Hex::toTxt(message)); |
109 for (int i = configuration.messageCount; continueProcessing && i > 0; i = i - mq->popMessageCount()) { |
70 writer->writeAttribute(Hex::toHex(message)); |
110 // std::cerr << "loop(): i=" << i << std::endl; |
|
111 |
|
112 //mq->loop(); |
|
113 mq->loop(1000, 1); |
|
114 //mq->loop(1000, -1); |
|
115 //mq->loop_forever(); |
|
116 //mq->loop_write(); |
71 } |
117 } |
72 |
118 |
|
119 // FIXME: move do destructor |
|
120 mq->disconnect(); |
73 } |
121 } |
74 |
122 |
75 MQTTCommand::~MQTTCommand() { |
123 MQTTCommand::~MQTTCommand() { |
76 } |
124 } |
77 |
125 |