53 class AwkHandler : public RelationalReaderStringHandler { |
53 class AwkHandler : public RelationalReaderStringHandler { |
54 private: |
54 private: |
55 Configuration configuration; |
55 Configuration configuration; |
56 writer::RelationalWriter* relationalWriter; |
56 writer::RelationalWriter* relationalWriter; |
57 |
57 |
|
58 int awkInputWriterFD = -1; |
|
59 |
// Creates a pipe with pipe() and hands both ends back through the reference
// parameters: readerFD receives fds[0], writerFD receives fds[1].
// Each statement below appears twice with different embedded numbers
// (two revisions of the same text).
58 void createPipe(int& readerFD, int& writerFD) { |
60 void createPipe(int& readerFD, int& writerFD) { |
59 int fds[2]; |
61 int fds[2]; |
60 int result = pipe(fds); |
62 int result = pipe(fds); |
61 readerFD = fds[0]; |
63 readerFD = fds[0]; |
62 writerFD = fds[1]; |
64 writerFD = fds[1]; |
// NOTE(review): the embedded numbering jumps from 62/64 to 68/70 here, so several
// lines are not visible. The message below talks about redirecting an FD rather
// than creating a pipe, so this check and the closing brace may belong to a
// different helper (startRelation calls redirectFD(), whose definition is not
// visible) — confirm against the complete file.
68 if (result < 0) throw cli::RelpipeCLIException(L"Unable redirect FD.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions? |
70 if (result < 0) throw cli::RelpipeCLIException(L"Unable redirect FD.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions? |
69 } |
71 } |
70 |
72 |
71 void closeOrThrow(int fd) { |
73 void closeOrThrow(int fd) { |
72 int error = close(fd); |
74 int error = close(fd); |
73 if (error) throw cli::RelpipeCLIException(L"Unable to close FD: ", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions? |
75 if (error) throw cli::RelpipeCLIException(L"Unable to close FD: " + to_wstring(fd) + L" from PID: " + to_wstring(getpid()), cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions? |
|
76 } |
|
77 |
|
78 void cleanUp() { |
|
79 if (awkInputWriterFD >= 0) { |
|
80 closeOrThrow(awkInputWriterFD); |
|
81 fwprintf(stderr, L"writing done and closed\n"); |
|
82 __pid_t waitResult1 = wait(NULL); |
|
83 fwprintf(stderr, L"wait 1 done: %d\n", waitResult1); |
|
84 __pid_t waitResult2 = wait(NULL); |
|
85 fwprintf(stderr, L"wait 2 done: %d\n", waitResult2); |
|
86 awkInputWriterFD = -1; |
|
87 } |
74 } |
88 } |
75 |
89 |
76 public: |
90 public: |
77 |
91 |
78 AwkHandler(writer::RelationalWriter* relationalWriter, Configuration& configuration) : relationalWriter(relationalWriter), configuration(configuration) { |
92 AwkHandler(writer::RelationalWriter* relationalWriter, Configuration& configuration) : relationalWriter(relationalWriter), configuration(configuration) { |
79 } |
93 } |
80 |
94 |
// Handles the start of a relation: creates two pipes (AWK input and AWK output)
// and branches into three roles — an AWK child that exec()s awk, a Writer child
// that copies AWK's output to std::cout, and the parent that writes into AWK's
// input. The name and attributes parameters are not used in the visible body.
// Each statement appears twice with different embedded numbers (two revisions
// of the same text); where the two copies differ, the comments say so.
81 void startRelation(string_t name, vector<AttributeMetadata> attributes) override { |
95 void startRelation(string_t name, vector<AttributeMetadata> attributes) override { |

// Only the higher-numbered copy calls cleanUp() before setting up new pipes.
96 cleanUp(); |
82 |
97 |
83 |
98 |
84 int awkInputReaderFD; |
99 int awkInputReaderFD; |
// This local declaration exists only in the lower-numbered copy; without it the
// name refers to the awkInputWriterFD member, which cleanUp() later closes.
85 int awkInputWriterFD; |

86 int awkOutputReaderFD; |
100 int awkOutputReaderFD; |
87 int awkOutputWriterFD; |
101 int awkOutputWriterFD; |
88 |
102 |
89 createPipe(awkInputReaderFD, awkInputWriterFD); |
103 createPipe(awkInputReaderFD, awkInputWriterFD); |
90 createPipe(awkOutputReaderFD, awkOutputWriterFD); |
104 createPipe(awkOutputReaderFD, awkOutputWriterFD); |
// NOTE(review): awkPid is tested below, but its declaration is not visible — the
// embedded numbering skips 91-92 / 105-106. Presumably it holds the result of
// fork(); confirm against the complete file.
93 |
107 |
94 if (awkPid < 0) { |
108 if (awkPid < 0) { |
95 throw cli::RelpipeCLIException(L"Unable to fork AWK process.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions? |
109 throw cli::RelpipeCLIException(L"Unable to fork AWK process.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions? |
96 } else if (awkPid == 0) { |
110 } else if (awkPid == 0) { |
97 // AWK child process |
111 // AWK child process |
// Closes the two pipe ends this branch does not use. The lower-numbered copy
// ignores the result of close(); the higher-numbered one throws on failure.
98 close(awkInputWriterFD); |
112 closeOrThrow(awkInputWriterFD); |
99 close(awkOutputReaderFD); |
113 closeOrThrow(awkOutputReaderFD); |
100 |
114 |
// Passes the remaining pipe ends to redirectFD() together with the stdin/stdout
// descriptor numbers (redirectFD is defined outside the visible text).
101 redirectFD(awkInputReaderFD, 0); |
115 redirectFD(awkInputReaderFD, STDIN_FILENO); |
102 redirectFD(awkOutputWriterFD, 1); |
116 redirectFD(awkOutputWriterFD, STDOUT_FILENO); |
103 |
117 |
104 |
118 |
105 fwprintf(stderr, L"I am child AWK with PID: %d\n", getpid()); |
119 fwprintf(stderr, L"I am child AWK with PID: %d\n", getpid()); |
// NOTE(review): nothing follows execlp(). It returns only when the exec failed;
// in that case control leaves this branch and the child carries on after the
// if/else chain — confirm whether a failure exit is needed here.
106 execlp("awk", "awk", "{print \"AWK says: line \" NR \" = \" $0;}", nullptr); |
120 execlp("awk", "awk", "{print \"AWK says: line \" NR \" = \" $0;}", nullptr); |
107 |
121 |
108 } else { |
122 } else { |
109 // Parent process |
123 // Parent process |
// The parent closes the two pipe ends that the AWK branch redirects.
110 close(awkInputReaderFD); |
124 closeOrThrow(awkInputReaderFD); |
111 close(awkOutputWriterFD); |
125 closeOrThrow(awkOutputWriterFD); |
112 fwprintf(stderr, L"Forked AWK has PID: %d\n", awkPid); |
126 fwprintf(stderr, L"Forked AWK has PID: %d\n", awkPid); |
113 |
127 |
// Second fork: a separate process reads AWK's output.
114 __pid_t writerPid = fork(); |
128 __pid_t writerPid = fork(); |
115 |
129 |
116 if (writerPid < 0) { |
130 if (writerPid < 0) { |
117 throw cli::RelpipeCLIException(L"Unable to fork Writer process.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions? |
131 throw cli::RelpipeCLIException(L"Unable to fork Writer process.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions? |
118 } else if (writerPid == 0) { |
132 } else if (writerPid == 0) { |
119 // Writer child process |
133 // Writer child process |
// The Writer only reads, so it closes its copy of the AWK input write end.
120 close(awkInputWriterFD); |
134 closeOrThrow(awkInputWriterFD); |
121 fwprintf(stderr, L"I am child Writer with PID: %d\n", getpid()); |
135 fwprintf(stderr, L"I am child Writer with PID: %d\n", getpid()); |
122 |
136 |
// Wraps the read end of the AWK output pipe in an std::istream.
123 __gnu_cxx::stdio_filebuf<char> awkOutputReaderBuffer(awkOutputReaderFD, std::ios::in); |
137 __gnu_cxx::stdio_filebuf<char> awkOutputReaderBuffer(awkOutputReaderFD, std::ios::in); |
124 std::istream awkOutputReader(&awkOutputReaderBuffer); |
138 std::istream awkOutputReader(&awkOutputReaderBuffer); |
// NOTE(review): one line is missing here (embedded numbers 126 / 140 are skipped).
125 |
139 |
// Copies the stream to std::cout one character at a time; the loop ends as soon
// as the stream is no longer good() after a get().
127 for (char ch = awkOutputReader.get(); awkOutputReader.good(); ch = awkOutputReader.get()) { |
141 for (char ch = awkOutputReader.get(); awkOutputReader.good(); ch = awkOutputReader.get()) { |
128 std::cout << ch; |
142 std::cout << ch; |
129 } |
143 } |
130 std::cout << "--- PIPE end ----------" << std::endl; |
144 std::cout << "--- PIPE end ----------" << std::endl; |
131 |
145 |
132 close(awkOutputReaderFD); |
146 closeOrThrow(awkOutputReaderFD); |

// Only the higher-numbered copy terminates the Writer child here; in the other
// copy the child leaves this branch and continues after the if/else chain.
147 exit(0); |
133 } else { |
148 } else { |
134 // Parent process |
149 // Parent process |
// The parent does not read AWK's output itself, so it closes that read end.
135 close(awkOutputReaderFD); |
150 closeOrThrow(awkOutputReaderFD); |
136 fwprintf(stderr, L"Forked Writer has PID: %d\n", writerPid); |
151 fwprintf(stderr, L"Forked Writer has PID: %d\n", writerPid); |
137 |
152 |
// Writes a fixed test line into AWK's input. The lower-numbered copy closes the
// write end right away; in the higher-numbered copy the close is commented out,
// leaving the member descriptor open for attribute() and cleanUp().
138 dprintf(awkInputWriterFD, "hello world :-)\n"); |
153 dprintf(awkInputWriterFD, "hello world :-)\n"); |
139 close(awkInputWriterFD); |
154 //closeOrThrow(awkInputWriterFD); |
140 |
155 |
141 /* |
156 /* |
142 __gnu_cxx::stdio_filebuf<char> awkInputWriterBuffer(awkInputWriterFD, std::ios::out); |
157 __gnu_cxx::stdio_filebuf<char> awkInputWriterBuffer(awkInputWriterFD, std::ios::out); |
143 std::ostream awkInputWriter(&awkInputWriterBuffer); |
158 std::ostream awkInputWriter(&awkInputWriterBuffer); |
144 awkInputWriter << "hello world :-)" << std::endl; |
159 awkInputWriter << "hello world :-)" << std::endl; |
145 awkInputWriter.flush(); |
160 awkInputWriter.flush(); |
146 close(awkInputWriterFD); |
161 closeOrThrow(awkInputWriterFD); |
147 */ |
162 */ |
148 |
163 |
// The two wait() calls below exist only in the lower-numbered copy; the same
// statements appear in cleanUp() with the higher numbering.
149 fwprintf(stderr, L"writing done and closed\n"); |

150 __pid_t waitResult1 = wait(NULL); |

151 fwprintf(stderr, L"wait 1 done: %d\n", waitResult1); |

152 __pid_t waitResult2 = wait(NULL); |

153 fwprintf(stderr, L"wait 2 done: %d\n", waitResult2); |

154 } |
164 } |
155 } |
165 } |
156 |
166 |
157 } |
167 } |
158 |
168 |
159 void attribute(const string_t& value) override { |
169 void attribute(const string_t& value) override { |
160 |
170 dprintf(awkInputWriterFD, "attribute!\n"); |
161 } |
171 } |
162 |
172 |
163 void endOfPipe() { |
173 void endOfPipe() { |
164 |
174 cleanUp(); |
165 } |
175 } |
166 |
176 |
167 }; |
177 }; |
168 |
178 |
169 } |
179 } |