src/MQTTCommand.cpp
branchv_0
changeset 1 4993a084b8ba
parent 0 7fdf75648c0a
child 2 0799eaf338b9
equal deleted inserted replaced
0:7fdf75648c0a 1:4993a084b8ba
    22 #include <algorithm>
    22 #include <algorithm>
    23 #include <unistd.h>
    23 #include <unistd.h>
    24 #include <sstream>
    24 #include <sstream>
    25 #include <iomanip>
    25 #include <iomanip>
    26 
    26 
       
    27 #include <mosquittopp.h>
       
    28 
    27 #include <relpipe/writer/RelationalWriter.h>
    29 #include <relpipe/writer/RelationalWriter.h>
    28 #include <relpipe/writer/RelpipeWriterException.h>
    30 #include <relpipe/writer/RelpipeWriterException.h>
    29 #include <relpipe/writer/AttributeMetadata.h>
    31 #include <relpipe/writer/AttributeMetadata.h>
    30 #include <relpipe/writer/Factory.h>
    32 #include <relpipe/writer/Factory.h>
    31 #include <relpipe/writer/TypeId.h>
    33 #include <relpipe/writer/TypeId.h>
    32 
    34 
    33 #include <relpipe/cli/CLI.h>
    35 #include <relpipe/cli/CLI.h>
    34 
    36 
    35 #include "MQTTCommand.h"
    37 #include "MQTTCommand.h"
    36 #include "MQTT.h"
       
    37 #include "Hex.h"
    38 #include "Hex.h"
    38 
    39 
    39 using namespace std;
    40 using namespace std;
    40 using namespace relpipe::cli;
    41 using namespace relpipe::cli;
    41 using namespace relpipe::writer;
    42 using namespace relpipe::writer;
    45 namespace mqtt {
    46 namespace mqtt {
    46 
    47 
    47 void MQTTCommand::process(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration) {
    48 void MQTTCommand::process(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration) {
    48 	vector<AttributeMetadata> metadata;
    49 	vector<AttributeMetadata> metadata;
    49 
    50 
    50 	std::shared_ptr<MQTT> mq(MQTT::open(convertor.to_bytes(configuration.queue), configuration.unlinkOnClose));
    51 	{
       
    52 		// TODO: remove
       
    53 		int major, minor, patch;
       
    54 		mosqpp::lib_version(&major, &minor, &patch);
       
    55 		std::cerr << "mosquitto version: " << major << "." << minor << "." << patch << std::endl;
       
    56 	}
    51 
    57 
    52 	writer->startRelation(configuration.relation,{
    58 	writer->startRelation(configuration.relation,{
    53 		{L"queue", TypeId::STRING},
    59 		{L"queue", TypeId::STRING},
    54 		{L"text", TypeId::STRING},
    60 		{L"text", TypeId::STRING},
    55 		{L"data", TypeId::STRING}
    61 		{L"data", TypeId::STRING}
    56 	}, true);
    62 	}, true);
    57 
    63 
    58 	for (int i = configuration.messageCount; continueProcessing && i > 0; i--) {
    64 	for (int i = configuration.messageCount; continueProcessing && i > 0; i--) {
    59 		// TODO: maybe rather call mq_timedreceive() inside and check continueProcessing (to be able to stop even when no messages are comming)
    65 		// TODO: maybe rather call mq_timedreceive() inside and check continueProcessing (to be able to stop even when no messages are comming)
    60 		std::string message = mq->receive();
    66 		std::string message = "TODO"; // FIXME: receive message
    61 
    67 
    62 		writer->writeAttribute(configuration.queue);
    68 		writer->writeAttribute(configuration.queue);
    63 		writer->writeAttribute(Hex::toTxt(message));
    69 		writer->writeAttribute(Hex::toTxt(message));
    64 		writer->writeAttribute(Hex::toHex(message));
    70 		writer->writeAttribute(Hex::toHex(message));
    65 	}
    71 	}