# HG changeset patch # User František Kučera # Date 1651016855 -7200 # Node ID 6a2ae23c53c4a93bc7c3ec0a13ee0cea71a13571 # Parent 5499cbd842ab288c4952f198997beffd9b2d1ca6 first version of Kafka consumer diff -r 5499cbd842ab -r 6a2ae23c53c4 src/KafkaCommand.cpp --- a/src/KafkaCommand.cpp Sun Apr 24 22:21:02 2022 +0200 +++ b/src/KafkaCommand.cpp Wed Apr 27 01:47:35 2022 +0200 @@ -19,6 +19,8 @@ #include #include +#include + #include #include #include @@ -26,6 +28,8 @@ #include #include +#include +#include #include "KafkaCommand.h" #include "Hex.h" @@ -38,22 +42,76 @@ namespace in { namespace kafka { +static void check(RdKafka::Conf::ConfResult result, const std::string& errString) { + if (result != RdKafka::Conf::CONF_OK) { + throw std::logic_error("Unable to configure Kafka: " + errString); + } +} + +static void check(RdKafka::ErrorCode result, const std::string& errString) { + if (result != RdKafka::ERR_NO_ERROR) { + throw std::logic_error("Kafka error: " + errString); + } +} + +static void check(std::shared_ptr kafkaConsumer, const std::string& errString) { + if (kafkaConsumer.get() == nullptr) { + throw std::logic_error("Unable to create Kafka consumer: " + errString); + } +} + void KafkaCommand::process(std::shared_ptr writer, Configuration& configuration) { vector metadata; + std::string errString; + std::shared_ptr consumerConf(RdKafka::Conf::create(RdKafka::Conf::ConfType::CONF_GLOBAL)); + + // TODO: configurable groupId, clientId and other parameters + std::string groupId = "relpipe-in-kafka-group-" + std::to_string(getpid()); + std::string clientId = "relpipe-in-kafka-client-" + std::to_string(getpid()); + + check(consumerConf->set("client.id", clientId, errString), errString); + check(consumerConf->set("group.id", groupId, errString), errString); + check(consumerConf->set("bootstrap.servers", "plaintext://127.0.0.1:9092", errString), errString); + //check(consumerConf->set("auto.offset.reset", "earliest", errString), errString); + //check(consumerConf->set("debug", "all", errString), errString); + + std::shared_ptr kafkaConsumer(RdKafka::KafkaConsumer::create(consumerConf.get(), errString)); + check(kafkaConsumer, errString); + + check(kafkaConsumer->subscribe({"relpipe"}), errString); + + writer->startRelation(configuration.relation,{ {L"queue", TypeId::STRING}, {L"text", TypeId::STRING}, {L"data", TypeId::STRING} }, true); - for (int i = configuration.messageCount; i > 0; i--) { - std::string message = "TODO: read message from Kafka"; - writer->writeAttribute(configuration.queue); - writer->writeAttribute(Hex::toTxt(message)); - writer->writeAttribute(Hex::toHex(message)); + for (int i = configuration.messageCount; continueProcessing && i > 0;) { + shared_ptr message(kafkaConsumer->consume(100)); + if (message.get() && message->err() == 0) { + std::string payload = message->payload() ? std::string((const char*) message->payload(), message->len()) : std::string(""); + writer->writeAttribute(convertor.from_bytes(message->topic_name())); + writer->writeAttribute(Hex::toTxt(payload)); + writer->writeAttribute(Hex::toHex(payload)); + i--; + } else if (message->err() == RdKafka::ErrorCode::ERR__TIMED_OUT) { + // timeout → try again + } else if (message->err() == RdKafka::ErrorCode::ERR__PARTITION_EOF) { + // reached the end of the topic/partition → try again + } else { + std::string m = "error while reading message: " + (message.get() ? message->errstr() : "message is missing"); + writer->writeAttribute(configuration.queue); + writer->writeAttribute(Hex::toTxt(m)); + writer->writeAttribute(Hex::toHex(m)); + break; + } } + // TODO: wrap and close even on exception + check(kafkaConsumer->close(), errString); + } KafkaCommand::~KafkaCommand() { diff -r 5499cbd842ab -r 6a2ae23c53c4 src/KafkaCommand.h --- a/src/KafkaCommand.h Sun Apr 24 22:21:02 2022 +0200 +++ b/src/KafkaCommand.h Wed Apr 27 01:47:35 2022 +0200 @@ -20,6 +20,7 @@ #include #include #include +#include #include @@ -32,11 +33,16 @@ class KafkaCommand { private: std::wstring_convert> convertor; // TODO: support also other encodings. + std::atomic continueProcessing{true}; public: virtual ~KafkaCommand(); void process(std::shared_ptr writer, Configuration& configuration); + void finish(int sig) { + continueProcessing = false; + } + }; } diff -r 5499cbd842ab -r 6a2ae23c53c4 src/relpipe-in-kafka.cpp --- a/src/relpipe-in-kafka.cpp Sun Apr 24 22:21:02 2022 +0200 +++ b/src/relpipe-in-kafka.cpp Wed Apr 27 01:47:35 2022 +0200 @@ -15,6 +15,7 @@ * along with this program. If not, see . */ #include +#include #include #include #include @@ -28,6 +29,7 @@ #include #include +#include #include "KafkaCommand.h" #include "CLIParser.h" @@ -37,6 +39,12 @@ using namespace relpipe::writer; using namespace relpipe::in::kafka; +static std::shared_ptr kafkaCommand = nullptr; + +void finish(int sig) { + if (kafkaCommand) kafkaCommand->finish(sig); +} + int main(int argc, char** argv) { setlocale(LC_ALL, ""); CLI::untieStdIO(); @@ -45,11 +53,14 @@ int resultCode = CLI::EXIT_CODE_UNEXPECTED_ERROR; try { + signal(SIGHUP, finish); + signal(SIGINT, finish); CLIParser cliParser; Configuration configuration = cliParser.parse(cli.arguments()); - KafkaCommand command; + kafkaCommand.reset(new KafkaCommand()); std::shared_ptr writer(Factory::create(std::cout)); - command.process(writer, configuration); + writer->setBufferingMode(BufferingMode::ENVIRONMENT, BufferingMode::RECORD); + kafkaCommand->process(writer, configuration); resultCode = CLI::EXIT_CODE_SUCCESS; } catch (RelpipeCLIException e) { fwprintf(stderr, L"Caught CLI exception: %ls\n", e.getMessage().c_str());