122 return value; |
140 return value; |
123 } |
141 } |
124 |
142 |
125 public: |
143 public: |
126 |
144 |
127 AwkHandler(writer::RelationalWriter* relationalWriter, Configuration& configuration) : relationalWriter(relationalWriter), configuration(configuration) { |
145 /** |
|
146 * @param relationalWriter |
|
147 * @param relationalWriterFlush the writer must be flushed before fork() in order to |
|
148 * avoid duplicate output (otherwise single relation might be written from two processes); |
|
149 * This is a little hack – if it stops working, we should reconnect the pipes |
|
150 * and use the writer only from a single process and avoid its effective duplication, |
|
151 * or use different writers for each relation (or process). |
|
152 * @param configuration |
|
153 */ |
|
154 AwkHandler(writer::RelationalWriter* relationalWriter, std::function<void() > relationalWriterFlush, Configuration& configuration) : relationalWriter(relationalWriter), relationalWriterFlush(relationalWriterFlush), configuration(configuration) { |
128 } |
155 } |
129 |
156 |
130 void startRelation(string_t name, vector<AttributeMetadata> attributes) override { |
157 void startRelation(string_t name, vector<AttributeMetadata> attributes) override { |
131 cleanUp(); |
158 cleanUp(); |
132 |
159 |
133 currentReaderMetadata = attributes; |
160 currentReaderMetadata = attributes; |
134 |
161 |
135 int awkInputReaderFD; |
162 currentRelationConfiguration = nullptr; |
136 int awkOutputReaderFD; |
163 for (int i = 0; i < configuration.relationConfigurations.size(); i++) { |
137 int awkOutputWriterFD; |
164 if (regex_match(name, wregex(configuration.relationConfigurations[i].relation))) { |
138 |
165 currentRelationConfiguration = &configuration.relationConfigurations[i]; |
139 createPipe(awkInputReaderFD, awkInputWriterFD); |
166 break; // it there are multiple matches, only the first configuration is used |
140 createPipe(awkOutputReaderFD, awkOutputWriterFD); |
|
141 |
|
142 __pid_t awkPid = fork(); |
|
143 |
|
144 if (awkPid < 0) { |
|
145 throw cli::RelpipeCLIException(L"Unable to fork AWK process.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions? |
|
146 } else if (awkPid == 0) { |
|
147 // AWK child process |
|
148 closeOrThrow(awkInputWriterFD); |
|
149 closeOrThrow(awkOutputReaderFD); |
|
150 |
|
151 redirectFD(awkInputReaderFD, STDIN_FILENO); |
|
152 redirectFD(awkOutputWriterFD, STDOUT_FILENO); |
|
153 |
|
154 // AWK script: |
|
155 std::wstringstream awkScript; |
|
156 awkScript << L"BEGIN {" << std::endl; |
|
157 awkScript << L"FS=\"\\t\";" << std::endl; |
|
158 awkScript << L"};" << std::endl; |
|
159 |
|
160 awkScript << L"END {" << std::endl; |
|
161 // awkScript << … << std::endl; |
|
162 awkScript << L"};" << std::endl; |
|
163 |
|
164 awkScript << L"{print \"AWK says: line \" NR \" '\" $0 \"' has \" NF \" fields; first field is '\" $1 \"'\";}" << std::endl; |
|
165 |
|
166 // CLI arguments: |
|
167 std::vector<std::string> args; |
|
168 args.push_back("awk"); |
|
169 |
|
170 for (auto d : configuration.definitions) { |
|
171 args.push_back("-v"); |
|
172 args.push_back(convertor.to_bytes(a2v(d.name) + L"=" + d.value)); |
|
173 } |
167 } |
174 args.push_back(convertor.to_bytes(awkScript.str())); |
168 } |
175 |
169 |
176 // Runs AWK program found on $PATH → user can plug-in a custom implementation or a wrapper, but this can be also bit dangerous (however AWK itself is dangerous). |
170 if (currentRelationConfiguration) { |
177 execp(args); |
171 int awkInputReaderFD; |
178 } else { |
172 int awkOutputReaderFD; |
179 // Parent process |
173 int awkOutputWriterFD; |
180 closeOrThrow(awkInputReaderFD); |
174 |
181 closeOrThrow(awkOutputWriterFD); |
175 createPipe(awkInputReaderFD, awkInputWriterFD); |
182 |
176 createPipe(awkOutputReaderFD, awkOutputWriterFD); |
183 __pid_t writerPid = fork(); |
177 |
184 |
178 relationalWriterFlush(); |
185 if (writerPid < 0) { |
179 __pid_t awkPid = fork(); |
186 throw cli::RelpipeCLIException(L"Unable to fork Writer process.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions? |
180 |
187 } else if (writerPid == 0) { |
181 if (awkPid < 0) { |
188 // Writer child process |
182 throw cli::RelpipeCLIException(L"Unable to fork AWK process.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions? |
|
183 } else if (awkPid == 0) { |
|
184 // AWK child process |
189 closeOrThrow(awkInputWriterFD); |
185 closeOrThrow(awkInputWriterFD); |
190 |
|
191 locale::global(locale("")); // needed for processing unicode texts, otherwise getline() stopped working on first line with non-ascii characters; TODO: move somewhere else? |
|
192 |
|
193 __gnu_cxx::stdio_filebuf<wchar_t> awkOutputReaderBuffer(awkOutputReaderFD, std::ios::in); |
|
194 std::wistream awkOutputReader(&awkOutputReaderBuffer); |
|
195 |
|
196 relationalWriter->startRelation(L"writer_debug",{ |
|
197 {L"message", writer::TypeId::STRING}, |
|
198 }, true); |
|
199 |
|
200 for (string_t line; getline(awkOutputReader, line).good();) { |
|
201 relationalWriter->writeAttribute(line); |
|
202 } |
|
203 |
|
204 closeOrThrow(awkOutputReaderFD); |
186 closeOrThrow(awkOutputReaderFD); |
205 exit(0); |
187 |
|
188 redirectFD(awkInputReaderFD, STDIN_FILENO); |
|
189 redirectFD(awkOutputWriterFD, STDOUT_FILENO); |
|
190 |
|
191 // AWK script: |
|
192 std::wstringstream awkScript; |
|
193 awkScript << L"BEGIN {" << std::endl; |
|
194 awkScript << L"FS=\"\\t\";" << std::endl; |
|
195 awkScript << L"};" << std::endl; |
|
196 |
|
197 awkScript << L"END {" << std::endl; |
|
198 // awkScript << … << std::endl; |
|
199 awkScript << L"};" << std::endl; |
|
200 |
|
201 awkScript << L"{print \"AWK says: line \" NR \" '\" $0 \"' has \" NF \" fields; first field is '\" $1 \"'\";}" << std::endl; |
|
202 |
|
203 // CLI arguments: |
|
204 std::vector<std::string> args; |
|
205 args.push_back("awk"); |
|
206 |
|
207 for (auto d : configuration.definitions) addDefinition(args, d); |
|
208 for (auto d : currentRelationConfiguration->definitions) addDefinition(args, d); |
|
209 |
|
210 args.push_back(convertor.to_bytes(awkScript.str())); |
|
211 |
|
212 // Runs AWK program found on $PATH → user can plug-in a custom implementation or a wrapper, but this can be also bit dangerous (however AWK itself is dangerous). |
|
213 execp(args); |
206 } else { |
214 } else { |
207 // Parent process |
215 // Parent process |
208 closeOrThrow(awkOutputReaderFD); |
216 closeOrThrow(awkInputReaderFD); |
|
217 closeOrThrow(awkOutputWriterFD); |
|
218 |
|
219 __pid_t writerPid = fork(); |
|
220 |
|
221 if (writerPid < 0) { |
|
222 throw cli::RelpipeCLIException(L"Unable to fork Writer process.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions? |
|
223 } else if (writerPid == 0) { |
|
224 // Writer child process |
|
225 closeOrThrow(awkInputWriterFD); |
|
226 |
|
227 locale::global(locale("")); // needed for processing unicode texts, otherwise getline() stopped working on first line with non-ascii characters; TODO: move somewhere else? |
|
228 |
|
229 __gnu_cxx::stdio_filebuf<wchar_t> awkOutputReaderBuffer(awkOutputReaderFD, std::ios::in); |
|
230 std::wistream awkOutputReader(&awkOutputReaderBuffer); |
|
231 |
|
232 // FIXME: currentWriterMetadata |
|
233 relationalWriter->startRelation(name,{ |
|
234 {L"message", writer::TypeId::STRING}, |
|
235 }, true); |
|
236 |
|
237 for (string_t line; getline(awkOutputReader, line).good();) { |
|
238 relationalWriter->writeAttribute(line); |
|
239 } |
|
240 |
|
241 closeOrThrow(awkOutputReaderFD); |
|
242 exit(0); |
|
243 } else { |
|
244 // Parent process |
|
245 closeOrThrow(awkOutputReaderFD); |
|
246 } |
209 } |
247 } |
|
248 } else { |
|
249 add(currentReaderMetadata, currentWriterMetadata); |
|
250 relationalWriter->startRelation(name, currentWriterMetadata, true); |
210 } |
251 } |
211 |
252 |
212 } |
253 } |
213 |
254 |
214 void attribute(const string_t& value) override { |
255 void attribute(const string_t& value) override { |
215 string_t variableName = a2v(currentReaderMetadata[currentAttributeIndex].getAttributeName()); |
256 if (currentRelationConfiguration) { |
216 string_t variableValue = escapeAwkValue(value); |
257 string_t variableName = a2v(currentReaderMetadata[currentAttributeIndex].getAttributeName()); |
217 |
258 string_t variableValue = escapeAwkValue(value); |
218 currentAttributeIndex++; |
259 |
219 currentAttributeIndex = currentAttributeIndex % currentReaderMetadata.size(); |
260 currentAttributeIndex++; |
220 |
261 currentAttributeIndex = currentAttributeIndex % currentReaderMetadata.size(); |
221 // TODO: just the value – move name to the AWK function |
262 |
222 std::string variablePair = convertor.to_bytes(variableName + L"=" + variableValue); |
263 // TODO: just the value – move name to the AWK function |
223 |
264 std::string variablePair = convertor.to_bytes(variableName + L"=" + variableValue); |
224 if (currentAttributeIndex == 0) variablePair += "\n"; |
265 |
225 else variablePair += "\t"; |
266 if (currentAttributeIndex == 0) variablePair += "\n"; |
226 |
267 else variablePair += "\t"; |
227 write(awkInputWriterFD, variablePair.c_str(), variablePair.length()); |
268 |
228 |
269 write(awkInputWriterFD, variablePair.c_str(), variablePair.length()); |
|
270 |
|
271 } else { |
|
272 relationalWriter->writeAttribute(value); |
|
273 } |
229 } |
274 } |
230 |
275 |
231 void endOfPipe() { |
276 void endOfPipe() { |
232 cleanUp(); |
277 cleanUp(); |
233 } |
278 } |