src/AwkHandler.h
branchv_0
changeset 21 d46a727b7965
parent 20 f937ad57351f
child 22 98acfdc4c20b
equal deleted inserted replaced
20:f937ad57351f 21:d46a727b7965
   201 	}
   201 	}
   202 
   202 
   203 	void startRelation(string_t name, vector<AttributeMetadata> attributes) override {
   203 	void startRelation(string_t name, vector<AttributeMetadata> attributes) override {
   204 		cleanUp();
   204 		cleanUp();
   205 
   205 
   206 		currentReaderMetadata = attributes;
       
   207 
       
   208 		for (int i = 0; i < configuration.relationConfigurations.size(); i++) {
   206 		for (int i = 0; i < configuration.relationConfigurations.size(); i++) {
   209 			if (regex_match(name, wregex(configuration.relationConfigurations[i].relation))) {
   207 			if (regex_match(name, wregex(configuration.relationConfigurations[i].relation))) {
   210 				currentRelationConfiguration = &configuration.relationConfigurations[i];
   208 				currentRelationConfiguration = &configuration.relationConfigurations[i];
   211 				break; // it there are multiple matches, only the first configuration is used
   209 				break; // it there are multiple matches, only the first configuration is used
   212 			}
   210 			}
       
   211 		}
       
   212 
       
   213 		currentReaderMetadata = attributes;
       
   214 		// TODO: move to a reusable method (or use same metadata on both reader and writer side?)		
       
   215 		if (currentRelationConfiguration && currentRelationConfiguration->writerMetadata.size()) {
       
   216 			if (currentRelationConfiguration->inputAttributesPrepend) add(currentReaderMetadata, currentWriterMetadata);
       
   217 			currentWriterMetadata.insert(currentWriterMetadata.end(), currentRelationConfiguration->writerMetadata.begin(), currentRelationConfiguration->writerMetadata.end());
       
   218 			if (currentRelationConfiguration->inputAttributesAppend) add(currentReaderMetadata, currentWriterMetadata);
       
   219 		} else {
       
   220 			add(currentReaderMetadata, currentWriterMetadata);
   213 		}
   221 		}
   214 
   222 
   215 		if (currentRelationConfiguration) {
   223 		if (currentRelationConfiguration) {
   216 			int awkInputReaderFD;
   224 			int awkInputReaderFD;
   217 			int awkOutputReaderFD;
   225 			int awkOutputReaderFD;
   259 				awkScript << L"END {" << std::endl;
   267 				awkScript << L"END {" << std::endl;
   260 				awkScript << currentRelationConfiguration->awkAfterRecords << std::endl;
   268 				awkScript << currentRelationConfiguration->awkAfterRecords << std::endl;
   261 				awkScript << L"};" << std::endl;
   269 				awkScript << L"};" << std::endl;
   262 				awkScript << std::endl;
   270 				awkScript << std::endl;
   263 
   271 
   264 				awkScript << currentRelationConfiguration->awkForEach << std::endl;
   272 				awkScript << L"function _escape(value) {" << std::endl;
       
   273 				// TODO: escape function
       
   274 				awkScript << L"return value;" << std::endl;
       
   275 				awkScript << L"};" << std::endl;
       
   276 				awkScript << std::endl;
       
   277 
       
   278 				awkScript << L"function _unescape(value) {" << std::endl;
       
   279 				// TODO: unescape function
       
   280 				awkScript << L"return value;" << std::endl;
       
   281 				awkScript << L"};" << std::endl;
       
   282 				awkScript << std::endl;
       
   283 
       
   284 				awkScript << L"function _readVariables() {" << std::endl;
       
   285 				for (int i = 0; i < currentReaderMetadata.size(); i++) awkScript << a2v(currentReaderMetadata[i].getAttributeName()) << L"=_unescape($" << (i + 1) << L");" << std::endl;
       
   286 				awkScript << L"};" << std::endl;
       
   287 				awkScript << std::endl;
       
   288 
       
   289 				awkScript << L"function _writeVariables() {" << std::endl;
       
   290 				awkScript << L"NF=" << currentWriterMetadata.size() << ";" << std::endl;
       
   291 				for (int i = 0; i < currentWriterMetadata.size(); i++) awkScript << L"$" << (i + 1) << L"=_escape(" << a2v(currentWriterMetadata[i].attributeName) << L");" << std::endl;
       
   292 				awkScript << L"};" << std::endl;
       
   293 				awkScript << std::endl;
       
   294 
       
   295 				awkScript << L"function record() {" << std::endl;
       
   296 				awkScript << L"_writeVariables();" << std::endl;
       
   297 				awkScript << L"print;" << std::endl;
       
   298 				awkScript << L"};" << std::endl;
       
   299 				awkScript << std::endl;
       
   300 
       
   301 				awkScript << L"{ _readVariables();  }" << std::endl; // read line (input attributes) into AWK variables
       
   302 				awkScript << L"{ _writeVariables(); }" << std::endl; // write AWK variables to the line (so it matches the output attributes and can be implicitly printed without explicit record() call)
       
   303 				awkScript << std::endl;
       
   304 
       
   305 				awkScript << currentRelationConfiguration->awkForEach << std::endl; // user's code – can modify variables, filter results or explicitly call record() (can generate additional records or duplicate them)
   265 
   306 
   266 				// CLI arguments:
   307 				// CLI arguments:
   267 				std::vector<std::string> args;
   308 				std::vector<std::string> args;
   268 				args.push_back("awk");
   309 				args.push_back("awk");
   269 
   310 
   288 					closeOrThrow(awkInputWriterFD);
   329 					closeOrThrow(awkInputWriterFD);
   289 
   330 
   290 					if (currentRelationConfiguration->drop) {
   331 					if (currentRelationConfiguration->drop) {
   291 						// TODO: omit whole this process and pipe AWK output to /dev/null?
   332 						// TODO: omit whole this process and pipe AWK output to /dev/null?
   292 					} else {
   333 					} else {
   293 						// FIXME: currentWriterMetadata
   334 						relationalWriter->startRelation(name, currentWriterMetadata, true);
   294 						relationalWriter->startRelation(name,{
       
   295 							{L"message", writer::TypeId::STRING},
       
   296 						}, true);
       
   297 					}
   335 					}
   298 
   336 
   299 					processAwkOutput(awkOutputReaderFD);
   337 					processAwkOutput(awkOutputReaderFD);
   300 					exit(0);
   338 					exit(0);
   301 				} else {
   339 				} else {
   302 					// Parent process
   340 					// Parent process
   303 					closeOrThrow(awkOutputReaderFD);
   341 					closeOrThrow(awkOutputReaderFD);
   304 				}
   342 				}
   305 			}
   343 			}
   306 		} else {
   344 		} else {
   307 			add(currentReaderMetadata, currentWriterMetadata);
       
   308 			relationalWriter->startRelation(name, currentWriterMetadata, true);
   345 			relationalWriter->startRelation(name, currentWriterMetadata, true);
   309 		}
   346 		}
   310 
   347 
   311 	}
   348 	}
   312 
   349