36 |
40 |
37 namespace relpipe { |
41 namespace relpipe { |
38 namespace in { |
42 namespace in { |
39 namespace kafka { |
43 namespace kafka { |
40 |
44 |
|
45 static void check(RdKafka::Conf::ConfResult result, const std::string& errString) { |
|
46 if (result != RdKafka::Conf::CONF_OK) { |
|
47 throw std::logic_error("Unable to configure Kafka: " + errString); |
|
48 } |
|
49 } |
|
50 |
|
51 static void check(RdKafka::ErrorCode result, const std::string& errString) { |
|
52 if (result != RdKafka::ERR_NO_ERROR) { |
|
53 throw std::logic_error("Kafka error: " + errString); |
|
54 } |
|
55 } |
|
56 |
|
57 static void check(std::shared_ptr<RdKafka::KafkaConsumer> kafkaConsumer, const std::string& errString) { |
|
58 if (kafkaConsumer.get() == nullptr) { |
|
59 throw std::logic_error("Unable to create Kafka consumer: " + errString); |
|
60 } |
|
61 } |
|
62 |
41 void KafkaCommand::process(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration) { |
63 void KafkaCommand::process(std::shared_ptr<writer::RelationalWriter> writer, Configuration& configuration) { |
42 vector<AttributeMetadata> metadata; |
64 vector<AttributeMetadata> metadata; |
|
65 |
|
66 std::string errString; |
|
67 std::shared_ptr<RdKafka::Conf> consumerConf(RdKafka::Conf::create(RdKafka::Conf::ConfType::CONF_GLOBAL)); |
|
68 |
|
69 // TODO: configurable groupId, clientId and other parameters |
|
70 std::string groupId = "relpipe-in-kafka-group-" + std::to_string(getpid()); |
|
71 std::string clientId = "relpipe-in-kafka-client-" + std::to_string(getpid()); |
|
72 |
|
73 check(consumerConf->set("client.id", clientId, errString), errString); |
|
74 check(consumerConf->set("group.id", groupId, errString), errString); |
|
75 check(consumerConf->set("bootstrap.servers", "plaintext://127.0.0.1:9092", errString), errString); |
|
76 //check(consumerConf->set("auto.offset.reset", "earliest", errString), errString); |
|
77 //check(consumerConf->set("debug", "all", errString), errString); |
|
78 |
|
79 std::shared_ptr<RdKafka::KafkaConsumer> kafkaConsumer(RdKafka::KafkaConsumer::create(consumerConf.get(), errString)); |
|
80 check(kafkaConsumer, errString); |
|
81 |
|
82 check(kafkaConsumer->subscribe({"relpipe"}), errString); |
|
83 |
43 |
84 |
44 writer->startRelation(configuration.relation,{ |
85 writer->startRelation(configuration.relation,{ |
45 {L"queue", TypeId::STRING}, |
86 {L"queue", TypeId::STRING}, |
46 {L"text", TypeId::STRING}, |
87 {L"text", TypeId::STRING}, |
47 {L"data", TypeId::STRING} |
88 {L"data", TypeId::STRING} |
48 }, true); |
89 }, true); |
49 |
90 |
50 for (int i = configuration.messageCount; i > 0; i--) { |
91 for (int i = configuration.messageCount; continueProcessing && i > 0;) { |
51 std::string message = "TODO: read message from Kafka"; |
92 shared_ptr<RdKafka::Message> message(kafkaConsumer->consume(100)); |
52 writer->writeAttribute(configuration.queue); |
93 if (message.get() && message->err() == 0) { |
53 writer->writeAttribute(Hex::toTxt(message)); |
94 std::string payload = message->payload() ? std::string((const char*) message->payload(), message->len()) : std::string(""); |
54 writer->writeAttribute(Hex::toHex(message)); |
95 writer->writeAttribute(convertor.from_bytes(message->topic_name())); |
|
96 writer->writeAttribute(Hex::toTxt(payload)); |
|
97 writer->writeAttribute(Hex::toHex(payload)); |
|
98 i--; |
|
99 } else if (message->err() == RdKafka::ErrorCode::ERR__TIMED_OUT) { |
|
100 // timeout → try again |
|
101 } else if (message->err() == RdKafka::ErrorCode::ERR__PARTITION_EOF) { |
|
102 // reached the end of the topic/partition → try again |
|
103 } else { |
|
104 std::string m = "error while reading message: " + (message.get() ? message->errstr() : "message is missing"); |
|
105 writer->writeAttribute(configuration.queue); |
|
106 writer->writeAttribute(Hex::toTxt(m)); |
|
107 writer->writeAttribute(Hex::toHex(m)); |
|
108 break; |
|
109 } |
55 } |
110 } |
|
111 |
|
112 // TODO: wrap and close even on exception |
|
113 check(kafkaConsumer->close(), errString); |
56 |
114 |
57 } |
115 } |
58 |
116 |
59 KafkaCommand::~KafkaCommand() { |
117 KafkaCommand::~KafkaCommand() { |
60 } |
118 } |