src/MQTTCommand.cpp
branchv_0
changeset 2 0799eaf338b9
parent 1 4993a084b8ba
child 3 610783d70ae9
equal deleted inserted replaced
1:4993a084b8ba 2:0799eaf338b9
    21 #include <regex>
    21 #include <regex>
    22 #include <algorithm>
    22 #include <algorithm>
    23 #include <unistd.h>
    23 #include <unistd.h>
    24 #include <sstream>
    24 #include <sstream>
    25 #include <iomanip>
    25 #include <iomanip>
       
    26 #include <random>
    26 
    27 
    27 #include <mosquittopp.h>
    28 #include <mosquittopp.h>
    28 
    29 
    29 #include <relpipe/writer/RelationalWriter.h>
    30 #include <relpipe/writer/RelationalWriter.h>
    30 #include <relpipe/writer/RelpipeWriterException.h>
    31 #include <relpipe/writer/RelpipeWriterException.h>
    35 #include <relpipe/cli/CLI.h>
    36 #include <relpipe/cli/CLI.h>
    36 
    37 
    37 #include "MQTTCommand.h"
    38 #include "MQTTCommand.h"
    38 #include "Hex.h"
    39 #include "Hex.h"
    39 
    40 
    40 using namespace std;
       
    41 using namespace relpipe::cli;
    41 using namespace relpipe::cli;
    42 using namespace relpipe::writer;
    42 using namespace relpipe::writer;
    43 
    43 
    44 namespace relpipe {
    44 namespace relpipe {
    45 namespace in {
    45 namespace in {
    46 namespace mqtt {
    46 namespace mqtt {
    47 
    47 
    48 void MQTTCommand::process(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration) {
    48 class MQTTClient : public mosqpp::mosquittopp {
    49 	vector<AttributeMetadata> metadata;
    49 private:
       
    50 	std::shared_ptr<writer::RelationalWriter> writer;
       
    51 	Configuration& configuration;
       
    52 	int messageCount = 0;
       
    53 	std::string clientId;
    50 
    54 
    51 	{
    55 	/**
    52 		// TODO: remove
    56 	 * @return unique (random) client ID for MQTT to allow multiple simultaneous connections
    53 		int major, minor, patch;
    57 	 */
    54 		mosqpp::lib_version(&major, &minor, &patch);
    58 	static std::string generateClientID() {
    55 		std::cerr << "mosquitto version: " << major << "." << minor << "." << patch << std::endl;
    59 		std::stringstream result;
       
    60 		// result << "relpipe-in-";
       
    61 		std::string symbols("0123456789abcdef");
       
    62 
       
    63 		std::random_device dev;
       
    64 		std::mt19937 rng(dev());
       
    65 		std::uniform_int_distribution<std::mt19937::result_type> dist6(0, symbols.size());
       
    66 
       
    67 		for (int i = 0; i < 8; i++) result << symbols[dist6(rng)];
       
    68 
       
    69 		// std::cerr << "generated client ID = " << result.str() << std::endl;
       
    70 		return result.str();
       
    71 	}
       
    72 public:
       
    73 
       
    74 	MQTTClient(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration) : mosqpp::mosquittopp((generateClientID()).c_str()), writer(writer), configuration(configuration) {
    56 	}
    75 	}
    57 
    76 
       
    77 	void on_message(const mosquitto_message* message) override {
       
    78 		// std::cerr << "got MQTT message: length=" << message->payloadlen << std::endl;
       
    79 		std::string payload = std::string((const char*) message->payload, message->payloadlen);
       
    80 		writer->writeAttribute(configuration.stream);
       
    81 		writer->writeAttribute(Hex::toTxt(payload));
       
    82 		writer->writeAttribute(Hex::toHex(payload));
       
    83 		messageCount++;
       
    84 	}
       
    85 
       
    86 	int popMessageCount() {
       
    87 		int count = messageCount;
       
    88 		messageCount = 0;
       
    89 		return count;
       
    90 	}
       
    91 
       
    92 };
       
    93 
       
    94 void MQTTCommand::process(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration) {
       
    95 	std::shared_ptr<MQTTClient> mq = std::make_shared<MQTTClient>(writer, configuration);
       
    96 
    58 	writer->startRelation(configuration.relation,{
    97 	writer->startRelation(configuration.relation,{
    59 		{L"queue", TypeId::STRING},
    98 		{L"stream", TypeId::STRING},
    60 		{L"text", TypeId::STRING},
    99 		{L"text", TypeId::STRING},
    61 		{L"data", TypeId::STRING}
   100 		{L"data", TypeId::STRING}
    62 	}, true);
   101 	}, true);
    63 
   102 
    64 	for (int i = configuration.messageCount; continueProcessing && i > 0; i--) {
   103 	mq->max_inflight_messages_set(1);
    65 		// TODO: maybe rather call mq_timedreceive() inside and check continueProcessing (to be able to stop even when no messages are coming)
   104 	mq->connect("localhost", 1883);
    66 		std::string message = "TODO"; // FIXME: receive message
   105 	int mid;
       
   106 	mq->subscribe(&mid, convertor.to_bytes(configuration.stream).c_str());
    67 
   107 
    68 		writer->writeAttribute(configuration.queue);
   108 	//for (int i = configuration.messageCount; continueProcessing && i > 0; i--) {
    69 		writer->writeAttribute(Hex::toTxt(message));
   109 	for (int i = configuration.messageCount; continueProcessing && i > 0; i = i - mq->popMessageCount()) {
    70 		writer->writeAttribute(Hex::toHex(message));
   110 		// std::cerr << "loop(): i=" << i << std::endl;
       
   111 
       
   112 		//mq->loop();
       
   113 		mq->loop(1000, 1);
       
   114 		//mq->loop(1000, -1);
       
   115 		//mq->loop_forever();
       
   116 		//mq->loop_write();
    71 	}
   117 	}
    72 
   118 
       
   119 	// FIXME: move to destructor
       
   120 	mq->disconnect();
    73 }
   121 }
    74 
   122 
    75 MQTTCommand::~MQTTCommand() {
   123 MQTTCommand::~MQTTCommand() {
    76 }
   124 }
    77 
   125