26 #include <relpipe/writer/AttributeMetadata.h> |
27 #include <relpipe/writer/AttributeMetadata.h> |
27 #include <relpipe/writer/Factory.h> |
28 #include <relpipe/writer/Factory.h> |
28 #include <relpipe/writer/TypeId.h> |
29 #include <relpipe/writer/TypeId.h> |
29 |
30 |
30 #include <relpipe/cli/CLI.h> |
31 #include <relpipe/cli/CLI.h> |
|
32 #include <condition_variable> |
31 |
33 |
32 #include "KafkaCommand.h" |
34 #include "KafkaCommand.h" |
33 #include "CLIParser.h" |
35 #include "CLIParser.h" |
34 #include "Configuration.h" |
36 #include "Configuration.h" |
35 |
37 |
36 using namespace relpipe::cli; |
38 using namespace relpipe::cli; |
37 using namespace relpipe::writer; |
39 using namespace relpipe::writer; |
38 using namespace relpipe::in::kafka; |
40 using namespace relpipe::in::kafka; |
39 |
41 |
|
// Handle to the running command, kept at file scope so that the finish()
// signal handler can reach it. It is null until main() assigns a freshly
// created KafkaCommand to it.
42 static std::shared_ptr<KafkaCommand> kafkaCommand = nullptr; |
|
43 |
|
/**
 * Signal handler that main() installs for SIGHUP and SIGINT.
 *
 * Passes the signal number on to KafkaCommand::finish() of the command stored
 * in kafkaCommand; does nothing when no command has been stored yet.
 *
 * NOTE(review): this runs in signal-handler context. Whether
 * KafkaCommand::finish() is safe to call from there cannot be determined from
 * this file -- confirm against its implementation.
 *
 * @param sig number of the delivered signal, forwarded unchanged
 */
44 void finish(int sig) { |

// main() registers this handler before it assigns kafkaCommand, so the
// pointer can still be null when a signal arrives; hence the guard.
45 if (kafkaCommand) kafkaCommand->finish(sig); |

46 } |
|
47 |
40 int main(int argc, char** argv) { |
48 int main(int argc, char** argv) { |
41 setlocale(LC_ALL, ""); |
49 setlocale(LC_ALL, ""); |
42 CLI::untieStdIO(); |
50 CLI::untieStdIO(); |
43 CLI cli(argc, argv); |
51 CLI cli(argc, argv); |
44 |
52 |
45 int resultCode = CLI::EXIT_CODE_UNEXPECTED_ERROR; |
53 int resultCode = CLI::EXIT_CODE_UNEXPECTED_ERROR; |
46 |
54 |
47 try { |
55 try { |
|
56 signal(SIGHUP, finish); |
|
57 signal(SIGINT, finish); |
48 CLIParser cliParser; |
58 CLIParser cliParser; |
49 Configuration configuration = cliParser.parse(cli.arguments()); |
59 Configuration configuration = cliParser.parse(cli.arguments()); |
50 KafkaCommand command; |
60 kafkaCommand.reset(new KafkaCommand()); |
51 std::shared_ptr<RelationalWriter> writer(Factory::create(std::cout)); |
61 std::shared_ptr<RelationalWriter> writer(Factory::create(std::cout)); |
52 command.process(writer, configuration); |
62 writer->setBufferingMode(BufferingMode::ENVIRONMENT, BufferingMode::RECORD); |
|
63 kafkaCommand->process(writer, configuration); |
53 resultCode = CLI::EXIT_CODE_SUCCESS; |
64 resultCode = CLI::EXIT_CODE_SUCCESS; |
54 } catch (RelpipeCLIException e) { |
65 } catch (RelpipeCLIException e) { |
55 fwprintf(stderr, L"Caught CLI exception: %ls\n", e.getMessage().c_str()); |
66 fwprintf(stderr, L"Caught CLI exception: %ls\n", e.getMessage().c_str()); |
56 fwprintf(stderr, L"Debug: Input stream: eof=%ls, lastRead=%d\n", (cin.eof() ? L"true" : L"false"), cin.gcount()); |
67 fwprintf(stderr, L"Debug: Input stream: eof=%ls, lastRead=%d\n", (cin.eof() ? L"true" : L"false"), cin.gcount()); |
57 resultCode = e.getExitCode(); |
68 resultCode = e.getExitCode(); |