49 class AwkHandler : public RelationalReaderStringHandler { |
53 class AwkHandler : public RelationalReaderStringHandler { |
50 private: |
54 private: |
51 Configuration configuration; |
55 Configuration configuration; |
52 writer::RelationalWriter* relationalWriter; |
56 writer::RelationalWriter* relationalWriter; |
53 |
57 |
|
58 void createPipe(int& readerFD, int& writerFD) { |
|
59 int fds[2]; |
|
60 int result = pipe(fds); |
|
61 readerFD = fds[0]; |
|
62 writerFD = fds[1]; |
|
63 if (result < 0) throw cli::RelpipeCLIException(L"Unable to create a pipe.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions? |
|
64 } |
|
65 |
|
66 void redirectFD(int oldfd, int newfd) { |
|
67 int result = dup2(oldfd, newfd); |
|
68 if (result < 0) throw cli::RelpipeCLIException(L"Unable redirect FD.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions? |
|
69 } |
|
70 |
|
71 void closeOrThrow(int fd) { |
|
72 int error = close(fd); |
|
73 if (error) throw cli::RelpipeCLIException(L"Unable to close FD: ", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions? |
|
74 } |
|
75 |
54 public: |
76 public: |
55 |
77 |
56 AwkHandler(writer::RelationalWriter* relationalWriter, Configuration& configuration) : relationalWriter(relationalWriter), configuration(configuration) { |
78 AwkHandler(writer::RelationalWriter* relationalWriter, Configuration& configuration) : relationalWriter(relationalWriter), configuration(configuration) { |
57 } |
79 } |
58 |
80 |
59 void startRelation(string_t name, vector<AttributeMetadata> attributes) override { |
81 void startRelation(string_t name, vector<AttributeMetadata> attributes) override { |
|
82 |
|
83 |
|
84 int awkInputReaderFD; |
|
85 int awkInputWriterFD; |
|
86 int awkOutputReaderFD; |
|
87 int awkOutputWriterFD; |
|
88 |
|
89 createPipe(awkInputReaderFD, awkInputWriterFD); |
|
90 createPipe(awkOutputReaderFD, awkOutputWriterFD); |
|
91 |
|
92 __pid_t awkPid = fork(); |
|
93 |
|
94 if (awkPid < 0) { |
|
95 throw cli::RelpipeCLIException(L"Unable to fork AWK process.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions? |
|
96 } else if (awkPid == 0) { |
|
97 // AWK child process |
|
98 close(awkInputWriterFD); |
|
99 close(awkOutputReaderFD); |
|
100 |
|
101 redirectFD(awkInputReaderFD, 0); |
|
102 redirectFD(awkOutputWriterFD, 1); |
|
103 |
|
104 |
|
105 fwprintf(stderr, L"I am child AWK with PID: %d\n", getpid()); |
|
106 execlp("awk", "awk", "{print \"AWK says: line \" NR \" = \" $0;}", nullptr); |
|
107 |
|
108 } else { |
|
109 // Parent process |
|
110 close(awkInputReaderFD); |
|
111 close(awkOutputWriterFD); |
|
112 fwprintf(stderr, L"Forked AWK has PID: %d\n", awkPid); |
|
113 |
|
114 __pid_t writerPid = fork(); |
|
115 |
|
116 if (writerPid < 0) { |
|
117 throw cli::RelpipeCLIException(L"Unable to fork Writer process.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions? |
|
118 } else if (writerPid == 0) { |
|
119 // Writer child process |
|
120 close(awkInputWriterFD); |
|
121 fwprintf(stderr, L"I am child Writer with PID: %d\n", getpid()); |
|
122 |
|
123 __gnu_cxx::stdio_filebuf<char> awkOutputReaderBuffer(awkOutputReaderFD, std::ios::in); |
|
124 std::istream awkOutputReader(&awkOutputReaderBuffer); |
|
125 |
|
126 std::cout << "--- PIPE start --------" << std::endl; |
|
127 for (char ch = awkOutputReader.get(); awkOutputReader.good(); ch = awkOutputReader.get()) { |
|
128 std::cout << ch; |
|
129 } |
|
130 std::cout << "--- PIPE end ----------" << std::endl; |
|
131 |
|
132 close(awkOutputReaderFD); |
|
133 } else { |
|
134 // Parent process |
|
135 close(awkOutputReaderFD); |
|
136 fwprintf(stderr, L"Forked Writer has PID: %d\n", writerPid); |
|
137 |
|
138 dprintf(awkInputWriterFD, "hello world :-)\n"); |
|
139 close(awkInputWriterFD); |
|
140 |
|
141 /* |
|
142 __gnu_cxx::stdio_filebuf<char> awkInputWriterBuffer(awkInputWriterFD, std::ios::out); |
|
143 std::ostream awkInputWriter(&awkInputWriterBuffer); |
|
144 awkInputWriter << "hello world :-)" << std::endl; |
|
145 awkInputWriter.flush(); |
|
146 close(awkInputWriterFD); |
|
147 */ |
|
148 |
|
149 fwprintf(stderr, L"writing done and closed\n"); |
|
150 __pid_t waitResult1 = wait(NULL); |
|
151 fwprintf(stderr, L"wait 1 done: %d\n", waitResult1); |
|
152 __pid_t waitResult2 = wait(NULL); |
|
153 fwprintf(stderr, L"wait 2 done: %d\n", waitResult2); |
|
154 } |
|
155 } |
60 |
156 |
61 } |
157 } |
62 |
158 |
63 void attribute(const string_t& value) override { |
159 void attribute(const string_t& value) override { |
64 |
160 |