--- a/src/KafkaHandler.h Wed Apr 27 21:10:10 2022 +0200
+++ b/src/KafkaHandler.h Wed Apr 27 23:09:36 2022 +0200
@@ -23,13 +23,15 @@
#include <sstream>
#include <locale>
#include <codecvt>
+#include <unistd.h>
+
+#include <librdkafka/rdkafkacpp.h>
#include <relpipe/common/type/typedefs.h>
#include <relpipe/reader/TypeId.h>
#include <relpipe/reader/handlers/RelationalReaderStringHandler.h>
#include <relpipe/reader/handlers/AttributeMetadata.h>
-#include "Kafka.h"
#include "Configuration.h"
#include "Hex.h"
@@ -37,11 +39,29 @@
namespace out {
namespace kafka {
+static void check(RdKafka::Conf::ConfResult result, const std::string& errString) {
+ if (result != RdKafka::Conf::CONF_OK) {
+ throw std::logic_error("Unable to configure Kafka: " + errString);
+ }
+}
+
+static void check(RdKafka::ErrorCode result, const std::string& errString) {
+ if (result != RdKafka::ERR_NO_ERROR) {
+ throw std::logic_error("Kafka error: " + errString);
+ }
+}
+
+static void check(std::shared_ptr<RdKafka::Producer> kafkaProducer, const std::string& errString) {
+ if (kafkaProducer.get() == nullptr) {
+ throw std::logic_error("Unable to create Kafka producer: " + errString);
+ }
+}
+
class KafkaHandler : public relpipe::reader::handlers::RelationalReaderStringHandler {
private:
std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings.
Configuration configuration;
- shared_ptr<Kafka> mq;
+ std::shared_ptr<RdKafka::Producer> kafkaProducer;
struct CurrentRelation {
relpipe::common::type::StringX name;
@@ -50,14 +70,45 @@
std::string currentValue;
} currentRelation;
+ void connect() {
+ if (!kafkaProducer) {
+ std::string errString;
+ std::shared_ptr<RdKafka::Conf> producerConf(RdKafka::Conf::create(RdKafka::Conf::ConfType::CONF_GLOBAL));
+
+ // TODO: configurable groupId, clientId and other parameters
+ std::string groupId = "relpipe-in-kafka-group-" + std::to_string(getpid());
+ std::string clientId = "relpipe-in-kafka-client-" + std::to_string(getpid());
+
+ check(producerConf->set("client.id", clientId, errString), errString);
+ check(producerConf->set("group.id", groupId, errString), errString);
+ check(producerConf->set("bootstrap.servers", "plaintext://127.0.0.1:9092", errString), errString); // "host1:9092,host2:9092" "192.168.1.56:9092"
+ //check(consumerConf->set("debug", "all", errString), errString);
+
+ kafkaProducer.reset(RdKafka::Producer::create(producerConf.get(), errString));
+ check(kafkaProducer, errString);
+ }
+ }
+
+ void sendMessage(const std::string& payload) {
+ std::string errString;
+ check(kafkaProducer->produce(
+ convertor.to_bytes(configuration.queue),
+ RdKafka::Topic::PARTITION_UA,
+ RdKafka::Producer::RK_MSG_COPY,
+ const_cast<char *> (payload.c_str()), payload.size(),
+ nullptr, 0,
+ 0,
+ nullptr), errString);
+ }
+
public:
KafkaHandler(Configuration configuration) : configuration(configuration) {
- mq.reset(Kafka::open(convertor.to_bytes(configuration.queue), configuration.unlinkOnClose));
}
void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override {
currentRelation = CurrentRelation{name, attributes};
+ connect();
}
void attribute(const relpipe::common::type::StringX& value) override {
@@ -69,13 +120,14 @@
currentRelation.attributeIndex++;
if (currentRelation.attributeIndex == currentRelation.attributes.size()) {
currentRelation.attributeIndex = 0;
- mq->send(currentRelation.currentValue);
+ sendMessage(currentRelation.currentValue);
}
}
void endOfPipe() {
-
+ std::string errString;
+ check(kafkaProducer->flush(60000), errString);
}
};