# HG changeset patch # User František Kučera # Date 1651093776 -7200 # Node ID d7824971fd9ed4ef0b2d2ca0878dc945c18ec319 # Parent f4d7e0965055891e6d50735ae29a78babbfd3152 first version of Kafka producer diff -r f4d7e0965055 -r d7824971fd9e src/CMakeLists.txt --- a/src/CMakeLists.txt Wed Apr 27 21:10:10 2022 +0200 +++ b/src/CMakeLists.txt Wed Apr 27 23:09:36 2022 +0200 @@ -17,7 +17,7 @@ # Relpipe libraries: INCLUDE(FindPkgConfig) -pkg_check_modules (RELPIPE_LIBS relpipe-lib-reader.cpp relpipe-lib-cli.cpp) +pkg_check_modules (RELPIPE_LIBS relpipe-lib-reader.cpp relpipe-lib-cli.cpp rdkafka++) include_directories(${RELPIPE_LIBS_INCLUDE_DIRS}) link_directories(${RELPIPE_LIBS_LIBRARY_DIRS}) @@ -34,7 +34,7 @@ ) # Link libraries: -target_link_libraries(${EXECUTABLE_FILE} ${RELPIPE_LIBS_LIBRARIES} rt) +target_link_libraries(${EXECUTABLE_FILE} ${RELPIPE_LIBS_LIBRARIES}) set_property(TARGET ${EXECUTABLE_FILE} PROPERTY INSTALL_RPATH_USE_LINK_PATH TRUE) install(TARGETS ${EXECUTABLE_FILE} DESTINATION bin) diff -r f4d7e0965055 -r d7824971fd9e src/Configuration.h --- a/src/Configuration.h Wed Apr 27 21:10:10 2022 +0200 +++ b/src/Configuration.h Wed Apr 27 23:09:36 2022 +0200 @@ -30,7 +30,7 @@ public: relpipe::common::type::StringX relation = L"kafka"; - relpipe::common::type::StringX queue = L"/relpipe"; + relpipe::common::type::StringX queue = L"relpipe"; relpipe::common::type::Boolean unlinkOnClose = false; virtual ~Configuration() { diff -r f4d7e0965055 -r d7824971fd9e src/Kafka.h --- a/src/Kafka.h Wed Apr 27 21:10:10 2022 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,70 +0,0 @@ -/** - * Relational pipes - * Copyright © 2022 František Kučera (Frantovo.cz, GlobalCode.info) - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, version 3 of the License. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -#pragma once - -#include -#include -#include -#include - -namespace relpipe { -namespace out { -namespace kafka { - -class Kafka { -private: - const static size_t MSG_SIZE = 8192; // TODO: configurable/dynamic - std::string queueName; - mqd_t handle = -2; - bool unlinkOnClose = false; - - Kafka(std::string queueName, mqd_t handle, bool unlinkOnClose) : queueName(queueName), handle(handle), unlinkOnClose(unlinkOnClose) { - } - -public: - - virtual ~Kafka() { - if (handle >= 0) mq_close(handle); - if (unlinkOnClose) mq_unlink(queueName.c_str()); - } - - static Kafka* open(std::string queueName, bool unlinkOnClose = false) { - mqd_t handle = mq_open(queueName.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); - if (handle >= 0) return new Kafka(queueName, handle, unlinkOnClose); - else throw std::logic_error("Unable to open Kafka: " + queueName + " error: " + strerror(errno)); - } - - void send(std::string message) { - int result = mq_send(handle, message.c_str(), message.size(), 0); - if (result) throw std::logic_error("Unable to send message to" + queueName + " error: " + strerror(errno)); - } - - std::string receive() { - char buffer[MSG_SIZE + 1]; - memset(buffer, 0, MSG_SIZE + 1); - ssize_t msgSize = mq_receive(handle, buffer, MSG_SIZE, nullptr); - - if (msgSize > sizeof (buffer))throw std::logic_error("Invalid Kafka message size."); - else if (msgSize >= 0) return std::string(buffer, msgSize); - else throw std::logic_error("Unable to receive Kafka message from " + queueName + " error: " + strerror(errno)); - } - -}; - -} -} -} diff -r f4d7e0965055 -r d7824971fd9e src/KafkaHandler.h --- a/src/KafkaHandler.h Wed Apr 27 21:10:10 2022 +0200 +++ b/src/KafkaHandler.h Wed Apr 27 23:09:36 2022 +0200 @@ -23,13 +23,15 @@ #include #include #include +#include + +#include #include #include #include #include -#include "Kafka.h" #include "Configuration.h" #include "Hex.h" @@ -37,11 +39,29 @@ namespace out { namespace kafka { +static void check(RdKafka::Conf::ConfResult result, const std::string& errString) { + if (result != RdKafka::Conf::CONF_OK) { + throw std::logic_error("Unable to configure Kafka: " + errString); + } +} + +static void check(RdKafka::ErrorCode result, const std::string& errString) { + if (result != RdKafka::ERR_NO_ERROR) { + throw std::logic_error("Kafka error: " + errString); + } +} + +static void check(std::shared_ptr kafkaProducer, const std::string& errString) { + if (kafkaProducer.get() == nullptr) { + throw std::logic_error("Unable to create Kafka producer: " + errString); + } +} + class KafkaHandler : public relpipe::reader::handlers::RelationalReaderStringHandler { private: std::wstring_convert> convertor; // TODO: support also other encodings. Configuration configuration; - shared_ptr mq; + std::shared_ptr kafkaProducer; struct CurrentRelation { relpipe::common::type::StringX name; @@ -50,14 +70,45 @@ std::string currentValue; } currentRelation; + void connect() { + if (!kafkaProducer) { + std::string errString; + std::shared_ptr producerConf(RdKafka::Conf::create(RdKafka::Conf::ConfType::CONF_GLOBAL)); + + // TODO: configurable groupId, clientId and other parameters + std::string groupId = "relpipe-in-kafka-group-" + std::to_string(getpid()); + std::string clientId = "relpipe-in-kafka-client-" + std::to_string(getpid()); + + check(producerConf->set("client.id", clientId, errString), errString); + check(producerConf->set("group.id", groupId, errString), errString); + check(producerConf->set("bootstrap.servers", "plaintext://127.0.0.1:9092", errString), errString); // "host1:9092,host2:9092" "192.168.1.56:9092" + //check(consumerConf->set("debug", "all", errString), errString); + + kafkaProducer.reset(RdKafka::Producer::create(producerConf.get(), errString)); + check(kafkaProducer, errString); + } + } + + void sendMessage(const std::string& payload) { + std::string errString; + check(kafkaProducer->produce( + convertor.to_bytes(configuration.queue), + RdKafka::Topic::PARTITION_UA, + RdKafka::Producer::RK_MSG_COPY, + const_cast (payload.c_str()), payload.size(), + nullptr, 0, + 0, + nullptr), errString); + } + public: KafkaHandler(Configuration configuration) : configuration(configuration) { - mq.reset(Kafka::open(convertor.to_bytes(configuration.queue), configuration.unlinkOnClose)); } void startRelation(relpipe::common::type::StringX name, std::vector attributes) override { currentRelation = CurrentRelation{name, attributes}; + connect(); } void attribute(const relpipe::common::type::StringX& value) override { @@ -69,13 +120,14 @@ currentRelation.attributeIndex++; if (currentRelation.attributeIndex == currentRelation.attributes.size()) { currentRelation.attributeIndex = 0; - mq->send(currentRelation.currentValue); + sendMessage(currentRelation.currentValue); } } void endOfPipe() { - + std::string errString; + check(kafkaProducer->flush(60000), errString); } };