src/KafkaCommand.cpp
branchv_0
changeset 1 6a2ae23c53c4
parent 0 5499cbd842ab
equal deleted inserted replaced
0:5499cbd842ab 1:6a2ae23c53c4
    17 #include <vector>
    17 #include <vector>
    18 #include <memory>
    18 #include <memory>
    19 #include <locale>
    19 #include <locale>
    20 #include <algorithm>
    20 #include <algorithm>
    21 
    21 
       
    22 #include <librdkafka/rdkafkacpp.h>
       
    23 
    22 #include <relpipe/writer/RelationalWriter.h>
    24 #include <relpipe/writer/RelationalWriter.h>
    23 #include <relpipe/writer/RelpipeWriterException.h>
    25 #include <relpipe/writer/RelpipeWriterException.h>
    24 #include <relpipe/writer/AttributeMetadata.h>
    26 #include <relpipe/writer/AttributeMetadata.h>
    25 #include <relpipe/writer/Factory.h>
    27 #include <relpipe/writer/Factory.h>
    26 #include <relpipe/writer/TypeId.h>
    28 #include <relpipe/writer/TypeId.h>
    27 
    29 
    28 #include <relpipe/cli/CLI.h>
    30 #include <relpipe/cli/CLI.h>
       
    31 #include <condition_variable>
       
    32 #include <unistd.h>
    29 
    33 
    30 #include "KafkaCommand.h"
    34 #include "KafkaCommand.h"
    31 #include "Hex.h"
    35 #include "Hex.h"
    32 
    36 
    33 using namespace std;
    37 using namespace std;
    36 
    40 
    37 namespace relpipe {
    41 namespace relpipe {
    38 namespace in {
    42 namespace in {
    39 namespace kafka {
    43 namespace kafka {
    40 
    44 
       
    45 static void check(RdKafka::Conf::ConfResult result, const std::string& errString) {
       
    46 	if (result != RdKafka::Conf::CONF_OK) {
       
    47 		throw std::logic_error("Unable to configure Kafka: " + errString);
       
    48 	}
       
    49 }
       
    50 
       
    51 static void check(RdKafka::ErrorCode result, const std::string& errString) {
       
    52 	if (result != RdKafka::ERR_NO_ERROR) {
       
    53 		throw std::logic_error("Kafka error: " + errString);
       
    54 	}
       
    55 }
       
    56 
       
    57 static void check(std::shared_ptr<RdKafka::KafkaConsumer> kafkaConsumer, const std::string& errString) {
       
    58 	if (kafkaConsumer.get() == nullptr) {
       
    59 		throw std::logic_error("Unable to create Kafka consumer: " + errString);
       
    60 	}
       
    61 }
       
    62 
    41 void KafkaCommand::process(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration) {
    63 void KafkaCommand::process(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration) {
    42 	vector<AttributeMetadata> metadata;
    64 	vector<AttributeMetadata> metadata;
       
    65 
       
    66 	std::string errString;
       
    67 	std::shared_ptr<RdKafka::Conf> consumerConf(RdKafka::Conf::create(RdKafka::Conf::ConfType::CONF_GLOBAL));
       
    68 
       
    69 	// TODO: configurable groupId, clientId and other parameters
       
    70 	std::string groupId = "relpipe-in-kafka-group-" + std::to_string(getpid());
       
    71 	std::string clientId = "relpipe-in-kafka-client-" + std::to_string(getpid());
       
    72 
       
    73 	check(consumerConf->set("client.id", clientId, errString), errString);
       
    74 	check(consumerConf->set("group.id", groupId, errString), errString);
       
    75 	check(consumerConf->set("bootstrap.servers", "plaintext://127.0.0.1:9092", errString), errString);
       
    76 	//check(consumerConf->set("auto.offset.reset", "earliest", errString), errString);
       
    77 	//check(consumerConf->set("debug", "all", errString), errString);
       
    78 
       
    79 	std::shared_ptr<RdKafka::KafkaConsumer> kafkaConsumer(RdKafka::KafkaConsumer::create(consumerConf.get(), errString));
       
    80 	check(kafkaConsumer, errString);
       
    81 
       
    82 	check(kafkaConsumer->subscribe({"relpipe"}), errString);
       
    83 
    43 
    84 
    44 	writer->startRelation(configuration.relation,{
    85 	writer->startRelation(configuration.relation,{
    45 		{L"queue", TypeId::STRING},
    86 		{L"queue", TypeId::STRING},
    46 		{L"text", TypeId::STRING},
    87 		{L"text", TypeId::STRING},
    47 		{L"data", TypeId::STRING}
    88 		{L"data", TypeId::STRING}
    48 	}, true);
    89 	}, true);
    49 
    90 
    50 	for (int i = configuration.messageCount; i > 0; i--) {
    91 	for (int i = configuration.messageCount; continueProcessing && i > 0;) {
    51 		std::string message = "TODO: read message from Kafka";
    92 		shared_ptr<RdKafka::Message> message(kafkaConsumer->consume(100));
    52 		writer->writeAttribute(configuration.queue);
    93 		if (message.get() && message->err() == 0) {
    53 		writer->writeAttribute(Hex::toTxt(message));
    94 			std::string payload = message->payload() ? std::string((const char*) message->payload(), message->len()) : std::string("");
    54 		writer->writeAttribute(Hex::toHex(message));
    95 			writer->writeAttribute(convertor.from_bytes(message->topic_name()));
       
    96 			writer->writeAttribute(Hex::toTxt(payload));
       
    97 			writer->writeAttribute(Hex::toHex(payload));
       
    98 			i--;
       
    99 		} else if (message->err() == RdKafka::ErrorCode::ERR__TIMED_OUT) {
       
   100 			// timeout → try again
       
   101 		} else if (message->err() == RdKafka::ErrorCode::ERR__PARTITION_EOF) {
       
   102 			// reached the end of the topic/partition → try again
       
   103 		} else {
       
   104 			std::string m = "error while reading message: " + (message.get() ? message->errstr() : "message is missing");
       
   105 			writer->writeAttribute(configuration.queue);
       
   106 			writer->writeAttribute(Hex::toTxt(m));
       
   107 			writer->writeAttribute(Hex::toHex(m));
       
   108 			break;
       
   109 		}
    55 	}
   110 	}
       
   111 
       
   112 	// TODO: wrap and close even on exception
       
   113 	check(kafkaConsumer->close(), errString);
    56 
   114 
    57 }
   115 }
    58 
   116 
    59 KafkaCommand::~KafkaCommand() {
   117 KafkaCommand::~KafkaCommand() {
    60 }
   118 }