fork() processes v_0
authorFrantišek Kučera <franta-hg@frantovo.cz>
Fri, 03 May 2019 17:33:00 +0200
branchv_0
changeset 1 05d969cd90d4
parent 0 644fd2ce2580
child 2 580ccb511301
fork() processes
src/AwkHandler.h
--- a/src/AwkHandler.h	Wed May 01 22:11:52 2019 +0200
+++ b/src/AwkHandler.h	Fri May 03 17:33:00 2019 +0200
@@ -26,6 +26,10 @@
 #include <codecvt>
 #include <regex>
 
+#include <unistd.h>
+#include <wait.h>
+#include <ext/stdio_filebuf.h>
+
 #include <relpipe/reader/typedefs.h>
 #include <relpipe/reader/TypeId.h>
 #include <relpipe/reader/handlers/RelationalReaderStringHandler.h>
@@ -51,6 +55,24 @@
 	Configuration configuration;
 	writer::RelationalWriter* relationalWriter;
 
+	void createPipe(int& readerFD, int& writerFD) {
+		int fds[2];
+		int result = pipe(fds);
+		readerFD = fds[0];
+		writerFD = fds[1];
+		if (result < 0) throw cli::RelpipeCLIException(L"Unable to create a pipe.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions?
+	}
+
+	void redirectFD(int oldfd, int newfd) {
+		int result = dup2(oldfd, newfd);
+		if (result < 0) throw cli::RelpipeCLIException(L"Unable redirect FD.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions?
+	}
+
+	void closeOrThrow(int fd) {
+		int error = close(fd);
+		if (error) throw cli::RelpipeCLIException(L"Unable to close FD: ", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions?
+	}
+
 public:
 
 	AwkHandler(writer::RelationalWriter* relationalWriter, Configuration& configuration) : relationalWriter(relationalWriter), configuration(configuration) {
@@ -58,6 +80,80 @@
 
 	void startRelation(string_t name, vector<AttributeMetadata> attributes) override {
 
+
+		int awkInputReaderFD;
+		int awkInputWriterFD;
+		int awkOutputReaderFD;
+		int awkOutputWriterFD;
+
+		createPipe(awkInputReaderFD, awkInputWriterFD);
+		createPipe(awkOutputReaderFD, awkOutputWriterFD);
+
+		__pid_t awkPid = fork();
+
+		if (awkPid < 0) {
+			throw cli::RelpipeCLIException(L"Unable to fork AWK process.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions?
+		} else if (awkPid == 0) {
+			// AWK child process
+			close(awkInputWriterFD);
+			close(awkOutputReaderFD);
+
+			redirectFD(awkInputReaderFD, 0);
+			redirectFD(awkOutputWriterFD, 1);
+
+
+			fwprintf(stderr, L"I am child AWK with PID: %d\n", getpid());
+			execlp("awk", "awk", "{print \"AWK says: line \" NR \" = \" $0;}", nullptr);
+
+		} else {
+			// Parent process
+			close(awkInputReaderFD);
+			close(awkOutputWriterFD);
+			fwprintf(stderr, L"Forked AWK has PID: %d\n", awkPid);
+
+			__pid_t writerPid = fork();
+
+			if (writerPid < 0) {
+				throw cli::RelpipeCLIException(L"Unable to fork Writer process.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions?
+			} else if (writerPid == 0) {
+				// Writer child process
+				close(awkInputWriterFD);
+				fwprintf(stderr, L"I am child Writer with PID: %d\n", getpid());
+
+				__gnu_cxx::stdio_filebuf<char> awkOutputReaderBuffer(awkOutputReaderFD, std::ios::in);
+				std::istream awkOutputReader(&awkOutputReaderBuffer);
+
+				std::cout << "--- PIPE start --------" << std::endl;
+				for (char ch = awkOutputReader.get(); awkOutputReader.good(); ch = awkOutputReader.get()) {
+					std::cout << ch;
+				}
+				std::cout << "--- PIPE end ----------" << std::endl;
+
+				close(awkOutputReaderFD);
+			} else {
+				// Parent process
+				close(awkOutputReaderFD);
+				fwprintf(stderr, L"Forked Writer has PID: %d\n", writerPid);
+
+				dprintf(awkInputWriterFD, "hello world :-)\n");
+				close(awkInputWriterFD);
+
+				/*
+				__gnu_cxx::stdio_filebuf<char> awkInputWriterBuffer(awkInputWriterFD, std::ios::out);
+				std::ostream awkInputWriter(&awkInputWriterBuffer);
+				awkInputWriter << "hello world :-)" << std::endl;
+				awkInputWriter.flush();
+				close(awkInputWriterFD);
+				 */
+
+				fwprintf(stderr, L"writing done and closed\n");
+				__pid_t waitResult1 = wait(NULL);
+				fwprintf(stderr, L"wait 1 done: %d\n", waitResult1);
+				__pid_t waitResult2 = wait(NULL);
+				fwprintf(stderr, L"wait 2 done: %d\n", waitResult2);
+			}
+		}
+
 	}
 
 	void attribute(const string_t& value) override {