src/AwkHandler.h
branchv_0
changeset 2 580ccb511301
parent 1 05d969cd90d4
child 3 e086ae6a19c3
equal deleted inserted replaced
1:05d969cd90d4 2:580ccb511301
    53 class AwkHandler : public RelationalReaderStringHandler {
    53 class AwkHandler : public RelationalReaderStringHandler {
    54 private:
    54 private:
    55 	Configuration configuration;
    55 	Configuration configuration;
    56 	writer::RelationalWriter* relationalWriter;
    56 	writer::RelationalWriter* relationalWriter;
    57 
    57 
       
    58 	int awkInputWriterFD = -1;
       
    59 
    58 	void createPipe(int& readerFD, int& writerFD) {
    60 	void createPipe(int& readerFD, int& writerFD) {
    59 		int fds[2];
    61 		int fds[2];
    60 		int result = pipe(fds);
    62 		int result = pipe(fds);
    61 		readerFD = fds[0];
    63 		readerFD = fds[0];
    62 		writerFD = fds[1];
    64 		writerFD = fds[1];
    68 		if (result < 0) throw cli::RelpipeCLIException(L"Unable redirect FD.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions?
    70 		if (result < 0) throw cli::RelpipeCLIException(L"Unable redirect FD.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions?
    69 	}
    71 	}
    70 
    72 
    71 	void closeOrThrow(int fd) {
    73 	void closeOrThrow(int fd) {
    72 		int error = close(fd);
    74 		int error = close(fd);
    73 		if (error) throw cli::RelpipeCLIException(L"Unable to close FD: ", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions?
    75 		if (error) throw cli::RelpipeCLIException(L"Unable to close FD: " + to_wstring(fd) + L" from PID: " + to_wstring(getpid()), cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions?
       
    76 	}
       
    77 
       
    78 	void cleanUp() {
       
    79 		if (awkInputWriterFD >= 0) {
       
    80 			closeOrThrow(awkInputWriterFD);
       
    81 			fwprintf(stderr, L"writing done and closed\n");
       
    82 			__pid_t waitResult1 = wait(NULL);
       
    83 			fwprintf(stderr, L"wait 1 done: %d\n", waitResult1);
       
    84 			__pid_t waitResult2 = wait(NULL);
       
    85 			fwprintf(stderr, L"wait 2 done: %d\n", waitResult2);
       
    86 			awkInputWriterFD = -1;
       
    87 		}
    74 	}
    88 	}
    75 
    89 
    76 public:
    90 public:
    77 
    91 
    78 	AwkHandler(writer::RelationalWriter* relationalWriter, Configuration& configuration) : relationalWriter(relationalWriter), configuration(configuration) {
    92 	AwkHandler(writer::RelationalWriter* relationalWriter, Configuration& configuration) : relationalWriter(relationalWriter), configuration(configuration) {
    79 	}
    93 	}
    80 
    94 
    81 	void startRelation(string_t name, vector<AttributeMetadata> attributes) override {
    95 	void startRelation(string_t name, vector<AttributeMetadata> attributes) override {
       
    96 		cleanUp();
    82 
    97 
    83 
    98 
    84 		int awkInputReaderFD;
    99 		int awkInputReaderFD;
    85 		int awkInputWriterFD;
       
    86 		int awkOutputReaderFD;
   100 		int awkOutputReaderFD;
    87 		int awkOutputWriterFD;
   101 		int awkOutputWriterFD;
    88 
   102 
    89 		createPipe(awkInputReaderFD, awkInputWriterFD);
   103 		createPipe(awkInputReaderFD, awkInputWriterFD);
    90 		createPipe(awkOutputReaderFD, awkOutputWriterFD);
   104 		createPipe(awkOutputReaderFD, awkOutputWriterFD);
    93 
   107 
    94 		if (awkPid < 0) {
   108 		if (awkPid < 0) {
    95 			throw cli::RelpipeCLIException(L"Unable to fork AWK process.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions?
   109 			throw cli::RelpipeCLIException(L"Unable to fork AWK process.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions?
    96 		} else if (awkPid == 0) {
   110 		} else if (awkPid == 0) {
    97 			// AWK child process
   111 			// AWK child process
    98 			close(awkInputWriterFD);
   112 			closeOrThrow(awkInputWriterFD);
    99 			close(awkOutputReaderFD);
   113 			closeOrThrow(awkOutputReaderFD);
   100 
   114 
   101 			redirectFD(awkInputReaderFD, 0);
   115 			redirectFD(awkInputReaderFD, STDIN_FILENO);
   102 			redirectFD(awkOutputWriterFD, 1);
   116 			redirectFD(awkOutputWriterFD, STDOUT_FILENO);
   103 
   117 
   104 
   118 
   105 			fwprintf(stderr, L"I am child AWK with PID: %d\n", getpid());
   119 			fwprintf(stderr, L"I am child AWK with PID: %d\n", getpid());
   106 			execlp("awk", "awk", "{print \"AWK says: line \" NR \" = \" $0;}", nullptr);
   120 			execlp("awk", "awk", "{print \"AWK says: line \" NR \" = \" $0;}", nullptr);
   107 
   121 
   108 		} else {
   122 		} else {
   109 			// Parent process
   123 			// Parent process
   110 			close(awkInputReaderFD);
   124 			closeOrThrow(awkInputReaderFD);
   111 			close(awkOutputWriterFD);
   125 			closeOrThrow(awkOutputWriterFD);
   112 			fwprintf(stderr, L"Forked AWK has PID: %d\n", awkPid);
   126 			fwprintf(stderr, L"Forked AWK has PID: %d\n", awkPid);
   113 
   127 
   114 			__pid_t writerPid = fork();
   128 			__pid_t writerPid = fork();
   115 
   129 
   116 			if (writerPid < 0) {
   130 			if (writerPid < 0) {
   117 				throw cli::RelpipeCLIException(L"Unable to fork Writer process.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions?
   131 				throw cli::RelpipeCLIException(L"Unable to fork Writer process.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions?
   118 			} else if (writerPid == 0) {
   132 			} else if (writerPid == 0) {
   119 				// Writer child process
   133 				// Writer child process
   120 				close(awkInputWriterFD);
   134 				closeOrThrow(awkInputWriterFD);
   121 				fwprintf(stderr, L"I am child Writer with PID: %d\n", getpid());
   135 				fwprintf(stderr, L"I am child Writer with PID: %d\n", getpid());
   122 
   136 
   123 				__gnu_cxx::stdio_filebuf<char> awkOutputReaderBuffer(awkOutputReaderFD, std::ios::in);
   137 				__gnu_cxx::stdio_filebuf<char> awkOutputReaderBuffer(awkOutputReaderFD, std::ios::in);
   124 				std::istream awkOutputReader(&awkOutputReaderBuffer);
   138 				std::istream awkOutputReader(&awkOutputReaderBuffer);
   125 
   139 
   127 				for (char ch = awkOutputReader.get(); awkOutputReader.good(); ch = awkOutputReader.get()) {
   141 				for (char ch = awkOutputReader.get(); awkOutputReader.good(); ch = awkOutputReader.get()) {
   128 					std::cout << ch;
   142 					std::cout << ch;
   129 				}
   143 				}
   130 				std::cout << "--- PIPE end ----------" << std::endl;
   144 				std::cout << "--- PIPE end ----------" << std::endl;
   131 
   145 
   132 				close(awkOutputReaderFD);
   146 				closeOrThrow(awkOutputReaderFD);
       
   147 				exit(0);
   133 			} else {
   148 			} else {
   134 				// Parent process
   149 				// Parent process
   135 				close(awkOutputReaderFD);
   150 				closeOrThrow(awkOutputReaderFD);
   136 				fwprintf(stderr, L"Forked Writer has PID: %d\n", writerPid);
   151 				fwprintf(stderr, L"Forked Writer has PID: %d\n", writerPid);
   137 
   152 
   138 				dprintf(awkInputWriterFD, "hello world :-)\n");
   153 				dprintf(awkInputWriterFD, "hello world :-)\n");
   139 				close(awkInputWriterFD);
   154 				//closeOrThrow(awkInputWriterFD);
   140 
   155 
   141 				/*
   156 				/*
   142 				__gnu_cxx::stdio_filebuf<char> awkInputWriterBuffer(awkInputWriterFD, std::ios::out);
   157 				__gnu_cxx::stdio_filebuf<char> awkInputWriterBuffer(awkInputWriterFD, std::ios::out);
   143 				std::ostream awkInputWriter(&awkInputWriterBuffer);
   158 				std::ostream awkInputWriter(&awkInputWriterBuffer);
   144 				awkInputWriter << "hello world :-)" << std::endl;
   159 				awkInputWriter << "hello world :-)" << std::endl;
   145 				awkInputWriter.flush();
   160 				awkInputWriter.flush();
   146 				close(awkInputWriterFD);
   161 				closeOrThrow(awkInputWriterFD);
   147 				 */
   162 				 */
   148 
   163 
   149 				fwprintf(stderr, L"writing done and closed\n");
       
   150 				__pid_t waitResult1 = wait(NULL);
       
   151 				fwprintf(stderr, L"wait 1 done: %d\n", waitResult1);
       
   152 				__pid_t waitResult2 = wait(NULL);
       
   153 				fwprintf(stderr, L"wait 2 done: %d\n", waitResult2);
       
   154 			}
   164 			}
   155 		}
   165 		}
   156 
   166 
   157 	}
   167 	}
   158 
   168 
   159 	void attribute(const string_t& value) override {
   169 	void attribute(const string_t& value) override {
   160 
   170 		dprintf(awkInputWriterFD, "attribute!\n");
   161 	}
   171 	}
   162 
   172 
   163 	void endOfPipe() {
   173 	void endOfPipe() {
   164 
   174 		cleanUp();
   165 	}
   175 	}
   166 
   176 
   167 };
   177 };
   168 
   178 
   169 }
   179 }