201 } |
201 } |
202 |
202 |
203 void startRelation(string_t name, vector<AttributeMetadata> attributes) override { |
203 void startRelation(string_t name, vector<AttributeMetadata> attributes) override { |
204 cleanUp(); |
204 cleanUp(); |
205 |
205 |
206 currentReaderMetadata = attributes; |
|
207 |
|
208 for (int i = 0; i < configuration.relationConfigurations.size(); i++) { |
206 for (int i = 0; i < configuration.relationConfigurations.size(); i++) { |
209 if (regex_match(name, wregex(configuration.relationConfigurations[i].relation))) { |
207 if (regex_match(name, wregex(configuration.relationConfigurations[i].relation))) { |
210 currentRelationConfiguration = &configuration.relationConfigurations[i]; |
208 currentRelationConfiguration = &configuration.relationConfigurations[i]; |
211 break; // if there are multiple matches, only the first configuration is used |
209 break; // if there are multiple matches, only the first configuration is used |
212 } |
210 } |
|
211 } |
|
212 |
|
213 currentReaderMetadata = attributes; |
|
214 // TODO: move to a reusable method (or use same metadata on both reader and writer side?) |
|
215 if (currentRelationConfiguration && currentRelationConfiguration->writerMetadata.size()) { |
|
216 if (currentRelationConfiguration->inputAttributesPrepend) add(currentReaderMetadata, currentWriterMetadata); |
|
217 currentWriterMetadata.insert(currentWriterMetadata.end(), currentRelationConfiguration->writerMetadata.begin(), currentRelationConfiguration->writerMetadata.end()); |
|
218 if (currentRelationConfiguration->inputAttributesAppend) add(currentReaderMetadata, currentWriterMetadata); |
|
219 } else { |
|
220 add(currentReaderMetadata, currentWriterMetadata); |
213 } |
221 } |
214 |
222 |
215 if (currentRelationConfiguration) { |
223 if (currentRelationConfiguration) { |
216 int awkInputReaderFD; |
224 int awkInputReaderFD; |
217 int awkOutputReaderFD; |
225 int awkOutputReaderFD; |
259 awkScript << L"END {" << std::endl; |
267 awkScript << L"END {" << std::endl; |
260 awkScript << currentRelationConfiguration->awkAfterRecords << std::endl; |
268 awkScript << currentRelationConfiguration->awkAfterRecords << std::endl; |
261 awkScript << L"};" << std::endl; |
269 awkScript << L"};" << std::endl; |
262 awkScript << std::endl; |
270 awkScript << std::endl; |
263 |
271 |
264 awkScript << currentRelationConfiguration->awkForEach << std::endl; |
272 awkScript << L"function _escape(value) {" << std::endl; |
|
273 // TODO: escape function |
|
274 awkScript << L"return value;" << std::endl; |
|
275 awkScript << L"};" << std::endl; |
|
276 awkScript << std::endl; |
|
277 |
|
278 awkScript << L"function _unescape(value) {" << std::endl; |
|
279 // TODO: unescape function |
|
280 awkScript << L"return value;" << std::endl; |
|
281 awkScript << L"};" << std::endl; |
|
282 awkScript << std::endl; |
|
283 |
|
284 awkScript << L"function _readVariables() {" << std::endl; |
|
285 for (int i = 0; i < currentReaderMetadata.size(); i++) awkScript << a2v(currentReaderMetadata[i].getAttributeName()) << L"=_unescape($" << (i + 1) << L");" << std::endl; |
|
286 awkScript << L"};" << std::endl; |
|
287 awkScript << std::endl; |
|
288 |
|
289 awkScript << L"function _writeVariables() {" << std::endl; |
|
290 awkScript << L"NF=" << currentWriterMetadata.size() << ";" << std::endl; |
|
291 for (int i = 0; i < currentWriterMetadata.size(); i++) awkScript << L"$" << (i + 1) << L"=_escape(" << a2v(currentWriterMetadata[i].attributeName) << L");" << std::endl; |
|
292 awkScript << L"};" << std::endl; |
|
293 awkScript << std::endl; |
|
294 |
|
295 awkScript << L"function record() {" << std::endl; |
|
296 awkScript << L"_writeVariables();" << std::endl; |
|
297 awkScript << L"print;" << std::endl; |
|
298 awkScript << L"};" << std::endl; |
|
299 awkScript << std::endl; |
|
300 |
|
301 awkScript << L"{ _readVariables(); }" << std::endl; // read line (input attributes) into AWK variables |
|
302 awkScript << L"{ _writeVariables(); }" << std::endl; // write AWK variables to the line (so it matches the output attributes and can be implicitly printed without explicit record() call) |
|
303 awkScript << std::endl; |
|
304 |
|
305 awkScript << currentRelationConfiguration->awkForEach << std::endl; // user's code – can modify variables, filter results or explicitly call record() (can generate additional records or duplicate them) |
265 |
306 |
266 // CLI arguments: |
307 // CLI arguments: |
267 std::vector<std::string> args; |
308 std::vector<std::string> args; |
268 args.push_back("awk"); |
309 args.push_back("awk"); |
269 |
310 |
288 closeOrThrow(awkInputWriterFD); |
329 closeOrThrow(awkInputWriterFD); |
289 |
330 |
290 if (currentRelationConfiguration->drop) { |
331 if (currentRelationConfiguration->drop) { |
291 // TODO: omit this whole process and pipe AWK output to /dev/null? |
332 // TODO: omit this whole process and pipe AWK output to /dev/null? |
292 } else { |
333 } else { |
293 // FIXME: currentWriterMetadata |
334 relationalWriter->startRelation(name, currentWriterMetadata, true); |
294 relationalWriter->startRelation(name,{ |
|
295 {L"message", writer::TypeId::STRING}, |
|
296 }, true); |
|
297 } |
335 } |
298 |
336 |
299 processAwkOutput(awkOutputReaderFD); |
337 processAwkOutput(awkOutputReaderFD); |
300 exit(0); |
338 exit(0); |
301 } else { |
339 } else { |
302 // Parent process |
340 // Parent process |
303 closeOrThrow(awkOutputReaderFD); |
341 closeOrThrow(awkOutputReaderFD); |
304 } |
342 } |
305 } |
343 } |
306 } else { |
344 } else { |
307 add(currentReaderMetadata, currentWriterMetadata); |
|
308 relationalWriter->startRelation(name, currentWriterMetadata, true); |
345 relationalWriter->startRelation(name, currentWriterMetadata, true); |
309 } |
346 } |
310 |
347 |
311 } |
348 } |
312 |
349 |