src/KafkaHandler.h
branchv_0
changeset 1 d7824971fd9e
parent 0 f4d7e0965055
equal deleted inserted replaced
0:f4d7e0965055 1:d7824971fd9e
    21 #include <vector>
    21 #include <vector>
    22 #include <iostream>
    22 #include <iostream>
    23 #include <sstream>
    23 #include <sstream>
    24 #include <locale>
    24 #include <locale>
    25 #include <codecvt>
    25 #include <codecvt>
       
    26 #include <unistd.h>
       
    27 
       
    28 #include <librdkafka/rdkafkacpp.h>
    26 
    29 
    27 #include <relpipe/common/type/typedefs.h>
    30 #include <relpipe/common/type/typedefs.h>
    28 #include <relpipe/reader/TypeId.h>
    31 #include <relpipe/reader/TypeId.h>
    29 #include <relpipe/reader/handlers/RelationalReaderStringHandler.h>
    32 #include <relpipe/reader/handlers/RelationalReaderStringHandler.h>
    30 #include <relpipe/reader/handlers/AttributeMetadata.h>
    33 #include <relpipe/reader/handlers/AttributeMetadata.h>
    31 
    34 
    32 #include "Kafka.h"
       
    33 #include "Configuration.h"
    35 #include "Configuration.h"
    34 #include "Hex.h"
    36 #include "Hex.h"
    35 
    37 
    36 namespace relpipe {
    38 namespace relpipe {
    37 namespace out {
    39 namespace out {
    38 namespace kafka {
    40 namespace kafka {
    39 
    41 
       
    42 static void check(RdKafka::Conf::ConfResult result, const std::string& errString) {
       
    43 	if (result != RdKafka::Conf::CONF_OK) {
       
    44 		throw std::logic_error("Unable to configure Kafka: " + errString);
       
    45 	}
       
    46 }
       
    47 
       
    48 static void check(RdKafka::ErrorCode result, const std::string& errString) {
       
    49 	if (result != RdKafka::ERR_NO_ERROR) {
       
    50 		throw std::logic_error("Kafka error: " + errString);
       
    51 	}
       
    52 }
       
    53 
       
    54 static void check(std::shared_ptr<RdKafka::Producer> kafkaProducer, const std::string& errString) {
       
    55 	if (kafkaProducer.get() == nullptr) {
       
    56 		throw std::logic_error("Unable to create Kafka producer: " + errString);
       
    57 	}
       
    58 }
       
    59 
    40 class KafkaHandler : public relpipe::reader::handlers::RelationalReaderStringHandler {
    60 class KafkaHandler : public relpipe::reader::handlers::RelationalReaderStringHandler {
    41 private:
    61 private:
    42 	std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings.
    62 	std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings.
    43 	Configuration configuration;
    63 	Configuration configuration;
    44 	shared_ptr<Kafka> mq;
    64 	std::shared_ptr<RdKafka::Producer> kafkaProducer;
    45 
    65 
    46 	struct CurrentRelation {
    66 	struct CurrentRelation {
    47 		relpipe::common::type::StringX name;
    67 		relpipe::common::type::StringX name;
    48 		std::vector<relpipe::reader::handlers::AttributeMetadata> attributes;
    68 		std::vector<relpipe::reader::handlers::AttributeMetadata> attributes;
    49 		relpipe::common::type::Integer attributeIndex = 0;
    69 		relpipe::common::type::Integer attributeIndex = 0;
    50 		std::string currentValue;
    70 		std::string currentValue;
    51 	} currentRelation;
    71 	} currentRelation;
    52 
    72 
       
    73 	void connect() {
       
    74 		if (!kafkaProducer) {
       
    75 			std::string errString;
       
    76 			std::shared_ptr<RdKafka::Conf> producerConf(RdKafka::Conf::create(RdKafka::Conf::ConfType::CONF_GLOBAL));
       
    77 
       
    78 			// TODO: configurable groupId, clientId and other parameters
       
    79 			std::string groupId = "relpipe-in-kafka-group-" + std::to_string(getpid());
       
    80 			std::string clientId = "relpipe-in-kafka-client-" + std::to_string(getpid());
       
    81 
       
    82 			check(producerConf->set("client.id", clientId, errString), errString);
       
    83 			check(producerConf->set("group.id", groupId, errString), errString);
       
    84 			check(producerConf->set("bootstrap.servers", "plaintext://127.0.0.1:9092", errString), errString); // "host1:9092,host2:9092"    "192.168.1.56:9092"
       
    85 			//check(consumerConf->set("debug", "all", errString), errString);
       
    86 
       
    87 			kafkaProducer.reset(RdKafka::Producer::create(producerConf.get(), errString));
       
    88 			check(kafkaProducer, errString);
       
    89 		}
       
    90 	}
       
    91 
       
    92 	void sendMessage(const std::string& payload) {
       
    93 		std::string errString;
       
    94 		check(kafkaProducer->produce(
       
    95 				convertor.to_bytes(configuration.queue),
       
    96 				RdKafka::Topic::PARTITION_UA,
       
    97 				RdKafka::Producer::RK_MSG_COPY,
       
    98 				const_cast<char *> (payload.c_str()), payload.size(),
       
    99 				nullptr, 0,
       
   100 				0,
       
   101 				nullptr), errString);
       
   102 	}
       
   103 
    53 public:
   104 public:
    54 
   105 
    55 	KafkaHandler(Configuration configuration) : configuration(configuration) {
   106 	KafkaHandler(Configuration configuration) : configuration(configuration) {
    56 		mq.reset(Kafka::open(convertor.to_bytes(configuration.queue), configuration.unlinkOnClose));
       
    57 	}
   107 	}
    58 
   108 
    59 	void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override {
   109 	void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override {
    60 		currentRelation = CurrentRelation{name, attributes};
   110 		currentRelation = CurrentRelation{name, attributes};
       
   111 		connect();
    61 	}
   112 	}
    62 
   113 
    63 	void attribute(const relpipe::common::type::StringX& value) override {
   114 	void attribute(const relpipe::common::type::StringX& value) override {
    64 
   115 
    65 		auto attributeName = currentRelation.attributes[currentRelation.attributeIndex].getAttributeName();
   116 		auto attributeName = currentRelation.attributes[currentRelation.attributeIndex].getAttributeName();
    67 		else if (attributeName == L"data") currentRelation.currentValue = Hex::fromHex(value).str();
   118 		else if (attributeName == L"data") currentRelation.currentValue = Hex::fromHex(value).str();
    68 
   119 
    69 		currentRelation.attributeIndex++;
   120 		currentRelation.attributeIndex++;
    70 		if (currentRelation.attributeIndex == currentRelation.attributes.size()) {
   121 		if (currentRelation.attributeIndex == currentRelation.attributes.size()) {
    71 			currentRelation.attributeIndex = 0;
   122 			currentRelation.attributeIndex = 0;
    72 			mq->send(currentRelation.currentValue);
   123 			sendMessage(currentRelation.currentValue);
    73 		}
   124 		}
    74 
   125 
    75 	}
   126 	}
    76 
   127 
    77 	void endOfPipe() {
   128 	void endOfPipe() {
    78 
   129 		std::string errString;
       
   130 		check(kafkaProducer->flush(60000), errString);
    79 	}
   131 	}
    80 
   132 
    81 };
   133 };
    82 
   134 
    83 }
   135 }