# HG changeset patch # User František Kučera # Date 1556897580 -7200 # Node ID 05d969cd90d47d9e5f17fde52733f3440b4875e3 # Parent 644fd2ce2580042496560c9722551c077a0d0462 fork() processes diff -r 644fd2ce2580 -r 05d969cd90d4 src/AwkHandler.h --- a/src/AwkHandler.h Wed May 01 22:11:52 2019 +0200 +++ b/src/AwkHandler.h Fri May 03 17:33:00 2019 +0200 @@ -26,6 +26,10 @@ #include #include +#include +#include +#include + #include #include #include @@ -51,6 +55,24 @@ Configuration configuration; writer::RelationalWriter* relationalWriter; + void createPipe(int& readerFD, int& writerFD) { + int fds[2]; + int result = pipe(fds); + readerFD = fds[0]; + writerFD = fds[1]; + if (result < 0) throw cli::RelpipeCLIException(L"Unable to create a pipe.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions? + } + + void redirectFD(int oldfd, int newfd) { + int result = dup2(oldfd, newfd); + if (result < 0) throw cli::RelpipeCLIException(L"Unable redirect FD.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions? + } + + void closeOrThrow(int fd) { + int error = close(fd); + if (error) throw cli::RelpipeCLIException(L"Unable to close FD: ", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions? + } + public: AwkHandler(writer::RelationalWriter* relationalWriter, Configuration& configuration) : relationalWriter(relationalWriter), configuration(configuration) { @@ -58,6 +80,80 @@ void startRelation(string_t name, vector attributes) override { + + int awkInputReaderFD; + int awkInputWriterFD; + int awkOutputReaderFD; + int awkOutputWriterFD; + + createPipe(awkInputReaderFD, awkInputWriterFD); + createPipe(awkOutputReaderFD, awkOutputWriterFD); + + __pid_t awkPid = fork(); + + if (awkPid < 0) { + throw cli::RelpipeCLIException(L"Unable to fork AWK process.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions? + } else if (awkPid == 0) { + // AWK child process + close(awkInputWriterFD); + close(awkOutputReaderFD); + + redirectFD(awkInputReaderFD, 0); + redirectFD(awkOutputWriterFD, 1); + + + fwprintf(stderr, L"I am child AWK with PID: %d\n", getpid()); + execlp("awk", "awk", "{print \"AWK says: line \" NR \" = \" $0;}", nullptr); + + } else { + // Parent process + close(awkInputReaderFD); + close(awkOutputWriterFD); + fwprintf(stderr, L"Forked AWK has PID: %d\n", awkPid); + + __pid_t writerPid = fork(); + + if (writerPid < 0) { + throw cli::RelpipeCLIException(L"Unable to fork Writer process.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions? + } else if (writerPid == 0) { + // Writer child process + close(awkInputWriterFD); + fwprintf(stderr, L"I am child Writer with PID: %d\n", getpid()); + + __gnu_cxx::stdio_filebuf awkOutputReaderBuffer(awkOutputReaderFD, std::ios::in); + std::istream awkOutputReader(&awkOutputReaderBuffer); + + std::cout << "--- PIPE start --------" << std::endl; + for (char ch = awkOutputReader.get(); awkOutputReader.good(); ch = awkOutputReader.get()) { + std::cout << ch; + } + std::cout << "--- PIPE end ----------" << std::endl; + + close(awkOutputReaderFD); + } else { + // Parent process + close(awkOutputReaderFD); + fwprintf(stderr, L"Forked Writer has PID: %d\n", writerPid); + + dprintf(awkInputWriterFD, "hello world :-)\n"); + close(awkInputWriterFD); + + /* + __gnu_cxx::stdio_filebuf awkInputWriterBuffer(awkInputWriterFD, std::ios::out); + std::ostream awkInputWriter(&awkInputWriterBuffer); + awkInputWriter << "hello world :-)" << std::endl; + awkInputWriter.flush(); + close(awkInputWriterFD); + */ + + fwprintf(stderr, L"writing done and closed\n"); + __pid_t waitResult1 = wait(NULL); + fwprintf(stderr, L"wait 1 done: %d\n", waitResult1); + __pid_t waitResult2 = wait(NULL); + fwprintf(stderr, L"wait 2 done: %d\n", waitResult2); + } + } + } void attribute(const string_t& value) override {