src/AwkHandler.h
branchv_0
changeset 1 05d969cd90d4
parent 0 644fd2ce2580
child 2 580ccb511301
equal deleted inserted replaced
0:644fd2ce2580 1:05d969cd90d4
    24 #include <sstream>
    24 #include <sstream>
    25 #include <locale>
    25 #include <locale>
    26 #include <codecvt>
    26 #include <codecvt>
    27 #include <regex>
    27 #include <regex>
    28 
    28 
       
    29 #include <unistd.h>
       
    30 #include <wait.h>
       
    31 #include <ext/stdio_filebuf.h>
       
    32 
    29 #include <relpipe/reader/typedefs.h>
    33 #include <relpipe/reader/typedefs.h>
    30 #include <relpipe/reader/TypeId.h>
    34 #include <relpipe/reader/TypeId.h>
    31 #include <relpipe/reader/handlers/RelationalReaderStringHandler.h>
    35 #include <relpipe/reader/handlers/RelationalReaderStringHandler.h>
    32 #include <relpipe/reader/handlers/AttributeMetadata.h>
    36 #include <relpipe/reader/handlers/AttributeMetadata.h>
    33 
    37 
    49 class AwkHandler : public RelationalReaderStringHandler {
    53 class AwkHandler : public RelationalReaderStringHandler {
    50 private:
    54 private:
    51 	Configuration configuration;
    55 	Configuration configuration;
    52 	writer::RelationalWriter* relationalWriter;
    56 	writer::RelationalWriter* relationalWriter;
    53 
    57 
       
    58 	void createPipe(int& readerFD, int& writerFD) {
       
    59 		int fds[2];
       
    60 		int result = pipe(fds);
       
    61 		readerFD = fds[0];
       
    62 		writerFD = fds[1];
       
    63 		if (result < 0) throw cli::RelpipeCLIException(L"Unable to create a pipe.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions?
       
    64 	}
       
    65 
       
    66 	void redirectFD(int oldfd, int newfd) {
       
    67 		int result = dup2(oldfd, newfd);
       
    68 		if (result < 0) throw cli::RelpipeCLIException(L"Unable redirect FD.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions?
       
    69 	}
       
    70 
       
    71 	void closeOrThrow(int fd) {
       
    72 		int error = close(fd);
       
    73 		if (error) throw cli::RelpipeCLIException(L"Unable to close FD: ", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions?
       
    74 	}
       
    75 
    54 public:
    76 public:
    55 
    77 
    56 	AwkHandler(writer::RelationalWriter* relationalWriter, Configuration& configuration) : relationalWriter(relationalWriter), configuration(configuration) {
    78 	AwkHandler(writer::RelationalWriter* relationalWriter, Configuration& configuration) : relationalWriter(relationalWriter), configuration(configuration) {
    57 	}
    79 	}
    58 
    80 
    59 	void startRelation(string_t name, vector<AttributeMetadata> attributes) override {
    81 	void startRelation(string_t name, vector<AttributeMetadata> attributes) override {
       
    82 
       
    83 
       
    84 		int awkInputReaderFD;
       
    85 		int awkInputWriterFD;
       
    86 		int awkOutputReaderFD;
       
    87 		int awkOutputWriterFD;
       
    88 
       
    89 		createPipe(awkInputReaderFD, awkInputWriterFD);
       
    90 		createPipe(awkOutputReaderFD, awkOutputWriterFD);
       
    91 
       
    92 		__pid_t awkPid = fork();
       
    93 
       
    94 		if (awkPid < 0) {
       
    95 			throw cli::RelpipeCLIException(L"Unable to fork AWK process.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions?
       
    96 		} else if (awkPid == 0) {
       
    97 			// AWK child process
       
    98 			close(awkInputWriterFD);
       
    99 			close(awkOutputReaderFD);
       
   100 
       
   101 			redirectFD(awkInputReaderFD, 0);
       
   102 			redirectFD(awkOutputWriterFD, 1);
       
   103 
       
   104 
       
   105 			fwprintf(stderr, L"I am child AWK with PID: %d\n", getpid());
       
   106 			execlp("awk", "awk", "{print \"AWK says: line \" NR \" = \" $0;}", nullptr);
       
   107 
       
   108 		} else {
       
   109 			// Parent process
       
   110 			close(awkInputReaderFD);
       
   111 			close(awkOutputWriterFD);
       
   112 			fwprintf(stderr, L"Forked AWK has PID: %d\n", awkPid);
       
   113 
       
   114 			__pid_t writerPid = fork();
       
   115 
       
   116 			if (writerPid < 0) {
       
   117 				throw cli::RelpipeCLIException(L"Unable to fork Writer process.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions?
       
   118 			} else if (writerPid == 0) {
       
   119 				// Writer child process
       
   120 				close(awkInputWriterFD);
       
   121 				fwprintf(stderr, L"I am child Writer with PID: %d\n", getpid());
       
   122 
       
   123 				__gnu_cxx::stdio_filebuf<char> awkOutputReaderBuffer(awkOutputReaderFD, std::ios::in);
       
   124 				std::istream awkOutputReader(&awkOutputReaderBuffer);
       
   125 
       
   126 				std::cout << "--- PIPE start --------" << std::endl;
       
   127 				for (char ch = awkOutputReader.get(); awkOutputReader.good(); ch = awkOutputReader.get()) {
       
   128 					std::cout << ch;
       
   129 				}
       
   130 				std::cout << "--- PIPE end ----------" << std::endl;
       
   131 
       
   132 				close(awkOutputReaderFD);
       
   133 			} else {
       
   134 				// Parent process
       
   135 				close(awkOutputReaderFD);
       
   136 				fwprintf(stderr, L"Forked Writer has PID: %d\n", writerPid);
       
   137 
       
   138 				dprintf(awkInputWriterFD, "hello world :-)\n");
       
   139 				close(awkInputWriterFD);
       
   140 
       
   141 				/*
       
   142 				__gnu_cxx::stdio_filebuf<char> awkInputWriterBuffer(awkInputWriterFD, std::ios::out);
       
   143 				std::ostream awkInputWriter(&awkInputWriterBuffer);
       
   144 				awkInputWriter << "hello world :-)" << std::endl;
       
   145 				awkInputWriter.flush();
       
   146 				close(awkInputWriterFD);
       
   147 				 */
       
   148 
       
   149 				fwprintf(stderr, L"writing done and closed\n");
       
   150 				__pid_t waitResult1 = wait(NULL);
       
   151 				fwprintf(stderr, L"wait 1 done: %d\n", waitResult1);
       
   152 				__pid_t waitResult2 = wait(NULL);
       
   153 				fwprintf(stderr, L"wait 2 done: %d\n", waitResult2);
       
   154 			}
       
   155 		}
    60 
   156 
    61 	}
   157 	}
    62 
   158 
    63 	void attribute(const string_t& value) override {
   159 	void attribute(const string_t& value) override {
    64 
   160