--- a/src/relpipe-in-kafka.cpp Sun Apr 24 22:21:02 2022 +0200
+++ b/src/relpipe-in-kafka.cpp Wed Apr 27 01:47:35 2022 +0200
@@ -15,6 +15,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <cstdlib>
+#include <csignal>
#include <vector>
#include <memory>
#include <regex>
@@ -28,6 +29,7 @@
#include <relpipe/writer/TypeId.h>
#include <relpipe/cli/CLI.h>
+#include <condition_variable>
#include "KafkaCommand.h"
#include "CLIParser.h"
@@ -37,6 +39,12 @@
using namespace relpipe::writer;
using namespace relpipe::in::kafka;
+static std::shared_ptr<KafkaCommand> kafkaCommand = nullptr;
+
+void finish(int sig) {
+ if (kafkaCommand) kafkaCommand->finish(sig);
+}
+
int main(int argc, char** argv) {
setlocale(LC_ALL, "");
CLI::untieStdIO();
@@ -45,11 +53,14 @@
int resultCode = CLI::EXIT_CODE_UNEXPECTED_ERROR;
try {
+ signal(SIGHUP, finish);
+ signal(SIGINT, finish);
CLIParser cliParser;
Configuration configuration = cliParser.parse(cli.arguments());
- KafkaCommand command;
+ kafkaCommand.reset(new KafkaCommand());
std::shared_ptr<RelationalWriter> writer(Factory::create(std::cout));
- command.process(writer, configuration);
+ writer->setBufferingMode(BufferingMode::ENVIRONMENT, BufferingMode::RECORD);
+ kafkaCommand->process(writer, configuration);
resultCode = CLI::EXIT_CODE_SUCCESS;
} catch (RelpipeCLIException e) {
fwprintf(stderr, L"Caught CLI exception: %ls\n", e.getMessage().c_str());