src/relpipe-in-kafka.cpp
branchv_0
changeset 1 6a2ae23c53c4
parent 0 5499cbd842ab
child 2 c6b1ae438ab3
--- a/src/relpipe-in-kafka.cpp	Sun Apr 24 22:21:02 2022 +0200
+++ b/src/relpipe-in-kafka.cpp	Wed Apr 27 01:47:35 2022 +0200
@@ -15,6 +15,7 @@
  * along with this program. If not, see <http://www.gnu.org/licenses/>.
  */
 #include <cstdlib>
+#include <csignal>
 #include <vector>
 #include <memory>
 #include <regex>
@@ -28,6 +29,7 @@
 #include <relpipe/writer/TypeId.h>
 
 #include <relpipe/cli/CLI.h>
+#include <condition_variable>
 
 #include "KafkaCommand.h"
 #include "CLIParser.h"
@@ -37,6 +39,12 @@
 using namespace relpipe::writer;
 using namespace relpipe::in::kafka;
 
+static std::shared_ptr<KafkaCommand> kafkaCommand = nullptr;
+
+void finish(int sig) {
+	if (kafkaCommand) kafkaCommand->finish(sig);
+}
+
 int main(int argc, char** argv) {
 	setlocale(LC_ALL, "");
 	CLI::untieStdIO();
@@ -45,11 +53,14 @@
 	int resultCode = CLI::EXIT_CODE_UNEXPECTED_ERROR;
 
 	try {
+		signal(SIGHUP, finish);
+		signal(SIGINT, finish);
 		CLIParser cliParser;
 		Configuration configuration = cliParser.parse(cli.arguments());
-		KafkaCommand command;
+		kafkaCommand.reset(new KafkaCommand());
 		std::shared_ptr<RelationalWriter> writer(Factory::create(std::cout));
-		command.process(writer, configuration);
+		writer->setBufferingMode(BufferingMode::ENVIRONMENT, BufferingMode::RECORD);
+		kafkaCommand->process(writer, configuration);
 		resultCode = CLI::EXIT_CODE_SUCCESS;
 	} catch (RelpipeCLIException e) {
 		fwprintf(stderr, L"Caught CLI exception: %ls\n", e.getMessage().c_str());