21 #include <vector> |
21 #include <vector> |
22 #include <iostream> |
22 #include <iostream> |
23 #include <sstream> |
23 #include <sstream> |
24 #include <locale> |
24 #include <locale> |
25 #include <codecvt> |
25 #include <codecvt> |
|
26 #include <unistd.h> |
|
27 |
|
28 #include <librdkafka/rdkafkacpp.h> |
26 |
29 |
27 #include <relpipe/common/type/typedefs.h> |
30 #include <relpipe/common/type/typedefs.h> |
28 #include <relpipe/reader/TypeId.h> |
31 #include <relpipe/reader/TypeId.h> |
29 #include <relpipe/reader/handlers/RelationalReaderStringHandler.h> |
32 #include <relpipe/reader/handlers/RelationalReaderStringHandler.h> |
30 #include <relpipe/reader/handlers/AttributeMetadata.h> |
33 #include <relpipe/reader/handlers/AttributeMetadata.h> |
31 |
34 |
32 #include "Kafka.h" |
|
33 #include "Configuration.h" |
35 #include "Configuration.h" |
34 #include "Hex.h" |
36 #include "Hex.h" |
35 |
37 |
36 namespace relpipe { |
38 namespace relpipe { |
37 namespace out { |
39 namespace out { |
38 namespace kafka { |
40 namespace kafka { |
39 |
41 |
|
42 static void check(RdKafka::Conf::ConfResult result, const std::string& errString) { |
|
43 if (result != RdKafka::Conf::CONF_OK) { |
|
44 throw std::logic_error("Unable to configure Kafka: " + errString); |
|
45 } |
|
46 } |
|
47 |
|
48 static void check(RdKafka::ErrorCode result, const std::string& errString) { |
|
49 if (result != RdKafka::ERR_NO_ERROR) { |
|
50 throw std::logic_error("Kafka error: " + errString); |
|
51 } |
|
52 } |
|
53 |
|
54 static void check(std::shared_ptr<RdKafka::Producer> kafkaProducer, const std::string& errString) { |
|
55 if (kafkaProducer.get() == nullptr) { |
|
56 throw std::logic_error("Unable to create Kafka producer: " + errString); |
|
57 } |
|
58 } |
|
59 |
40 class KafkaHandler : public relpipe::reader::handlers::RelationalReaderStringHandler { |
60 class KafkaHandler : public relpipe::reader::handlers::RelationalReaderStringHandler { |
41 private: |
61 private: |
42 std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings. |
62 std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings. |
43 Configuration configuration; |
63 Configuration configuration; |
44 shared_ptr<Kafka> mq; |
64 std::shared_ptr<RdKafka::Producer> kafkaProducer; |
45 |
65 |
46 struct CurrentRelation { |
66 struct CurrentRelation { |
47 relpipe::common::type::StringX name; |
67 relpipe::common::type::StringX name; |
48 std::vector<relpipe::reader::handlers::AttributeMetadata> attributes; |
68 std::vector<relpipe::reader::handlers::AttributeMetadata> attributes; |
49 relpipe::common::type::Integer attributeIndex = 0; |
69 relpipe::common::type::Integer attributeIndex = 0; |
50 std::string currentValue; |
70 std::string currentValue; |
51 } currentRelation; |
71 } currentRelation; |
52 |
72 |
|
73 void connect() { |
|
74 if (!kafkaProducer) { |
|
75 std::string errString; |
|
76 std::shared_ptr<RdKafka::Conf> producerConf(RdKafka::Conf::create(RdKafka::Conf::ConfType::CONF_GLOBAL)); |
|
77 |
|
78 // TODO: configurable groupId, clientId and other parameters |
|
79 std::string groupId = "relpipe-in-kafka-group-" + std::to_string(getpid()); |
|
80 std::string clientId = "relpipe-in-kafka-client-" + std::to_string(getpid()); |
|
81 |
|
82 check(producerConf->set("client.id", clientId, errString), errString); |
|
83 check(producerConf->set("group.id", groupId, errString), errString); |
|
84 check(producerConf->set("bootstrap.servers", "plaintext://127.0.0.1:9092", errString), errString); // "host1:9092,host2:9092" "192.168.1.56:9092" |
|
85 //check(consumerConf->set("debug", "all", errString), errString); |
|
86 |
|
87 kafkaProducer.reset(RdKafka::Producer::create(producerConf.get(), errString)); |
|
88 check(kafkaProducer, errString); |
|
89 } |
|
90 } |
|
91 |
|
92 void sendMessage(const std::string& payload) { |
|
93 std::string errString; |
|
94 check(kafkaProducer->produce( |
|
95 convertor.to_bytes(configuration.queue), |
|
96 RdKafka::Topic::PARTITION_UA, |
|
97 RdKafka::Producer::RK_MSG_COPY, |
|
98 const_cast<char *> (payload.c_str()), payload.size(), |
|
99 nullptr, 0, |
|
100 0, |
|
101 nullptr), errString); |
|
102 } |
|
103 |
53 public: |
104 public: |
54 |
105 |
55 KafkaHandler(Configuration configuration) : configuration(configuration) { |
106 KafkaHandler(Configuration configuration) : configuration(configuration) { |
56 mq.reset(Kafka::open(convertor.to_bytes(configuration.queue), configuration.unlinkOnClose)); |
|
57 } |
107 } |
58 |
108 |
59 void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override { |
109 void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override { |
60 currentRelation = CurrentRelation{name, attributes}; |
110 currentRelation = CurrentRelation{name, attributes}; |
|
111 connect(); |
61 } |
112 } |
62 |
113 |
63 void attribute(const relpipe::common::type::StringX& value) override { |
114 void attribute(const relpipe::common::type::StringX& value) override { |
64 |
115 |
65 auto attributeName = currentRelation.attributes[currentRelation.attributeIndex].getAttributeName(); |
116 auto attributeName = currentRelation.attributes[currentRelation.attributeIndex].getAttributeName(); |