implement --relation option (thus some relations might pass unmodified by AWK), support per-relation variables v_0
authorFrantišek Kučera <franta-hg@frantovo.cz>
Mon, 06 May 2019 23:52:22 +0200
branchv_0
changeset 13 b74001992ec3
parent 12 8844ebce8fb4
child 14 f407f2a2871d
implement --relation option (thus some relations might pass unmodified by AWK), support per-relation variables
src/AwkHandler.h
src/relpipe-tr-awk.cpp
--- a/src/AwkHandler.h	Mon May 06 21:57:16 2019 +0200
+++ b/src/AwkHandler.h	Mon May 06 23:52:22 2019 +0200
@@ -17,6 +17,7 @@
  */
 #pragma once
 
+#include<functional>
 #include <memory>
 #include <string>
 #include <vector>
@@ -62,10 +63,13 @@
 private:
 	Configuration configuration;
 	writer::RelationalWriter* relationalWriter;
+	std::function<void() > relationalWriterFlush;
 	std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings
 
 	int awkInputWriterFD = -1;
+	RelationConfiguration* currentRelationConfiguration = nullptr;
 	std::vector<AttributeMetadata> currentReaderMetadata;
+	vector<writer::AttributeMetadata> currentWriterMetadata;
 	integer_t currentAttributeIndex = 0;
 
 	void createPipe(int& readerFD, int& writerFD) {
@@ -97,6 +101,19 @@
 		throw cli::RelpipeCLIException(L"Unable to do execvp().", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions?
 	}
 
+	void addDefinition(std::vector<std::string>& awkCliArgs, DefinitionRecipe& d) {
+		awkCliArgs.push_back("-v");
+		awkCliArgs.push_back(convertor.to_bytes(a2v(d.name) + L"=" + d.value));
+	}
+
+	void add(vector<AttributeMetadata>& readerAttributes, vector<writer::AttributeMetadata>& writerAttributes) {
+		for (AttributeMetadata readerAttributes : readerAttributes)
+			writerAttributes.push_back({
+				readerAttributes.getAttributeName(),
+				relationalWriter->toTypeId(readerAttributes.getTypeName())
+			});
+	}
+
 	void cleanUp() {
 		if (awkInputWriterFD >= 0) {
 			closeOrThrow(awkInputWriterFD);
@@ -108,6 +125,7 @@
 
 		currentAttributeIndex = 0;
 		currentReaderMetadata.clear();
+		currentWriterMetadata.clear();
 	}
 
 	string_t a2v(const string_t& attributeName) {
@@ -124,7 +142,16 @@
 
 public:
 
-	AwkHandler(writer::RelationalWriter* relationalWriter, Configuration& configuration) : relationalWriter(relationalWriter), configuration(configuration) {
+	/**
+	 * @param relationalWriter
+	 * @param relationalWriterFlush the writer must be flushed before fork() in order to 
+	 * avoid duplicate output (otherwise single relation might be written from two processes); 
+	 * This is a little hack – if it stops working, we should reconnect the pipes 
+	 * and use the writer only from a single process and avoid its effective duplication,
+	 * or use different writers for each relation (or process).
+	 * @param configuration
+	 */
+	AwkHandler(writer::RelationalWriter* relationalWriter, std::function<void() > relationalWriterFlush, Configuration& configuration) : relationalWriter(relationalWriter), relationalWriterFlush(relationalWriterFlush), configuration(configuration) {
 	}
 
 	void startRelation(string_t name, vector<AttributeMetadata> attributes) override {
@@ -132,100 +159,118 @@
 
 		currentReaderMetadata = attributes;
 
-		int awkInputReaderFD;
-		int awkOutputReaderFD;
-		int awkOutputWriterFD;
-
-		createPipe(awkInputReaderFD, awkInputWriterFD);
-		createPipe(awkOutputReaderFD, awkOutputWriterFD);
+		currentRelationConfiguration = nullptr;
+		for (int i = 0; i < configuration.relationConfigurations.size(); i++) {
+			if (regex_match(name, wregex(configuration.relationConfigurations[i].relation))) {
+				currentRelationConfiguration = &configuration.relationConfigurations[i];
+				break; // it there are multiple matches, only the first configuration is used
+			}
+		}
 
-		__pid_t awkPid = fork();
-
-		if (awkPid < 0) {
-			throw cli::RelpipeCLIException(L"Unable to fork AWK process.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions?
-		} else if (awkPid == 0) {
-			// AWK child process
-			closeOrThrow(awkInputWriterFD);
-			closeOrThrow(awkOutputReaderFD);
+		if (currentRelationConfiguration) {
+			int awkInputReaderFD;
+			int awkOutputReaderFD;
+			int awkOutputWriterFD;
 
-			redirectFD(awkInputReaderFD, STDIN_FILENO);
-			redirectFD(awkOutputWriterFD, STDOUT_FILENO);
+			createPipe(awkInputReaderFD, awkInputWriterFD);
+			createPipe(awkOutputReaderFD, awkOutputWriterFD);
 
-			// AWK script:
-			std::wstringstream awkScript;
-			awkScript << L"BEGIN {" << std::endl;
-			awkScript << L"FS=\"\\t\";" << std::endl;
-			awkScript << L"};" << std::endl;
+			relationalWriterFlush();
+			__pid_t awkPid = fork();
 
-			awkScript << L"END {" << std::endl;
-			// awkScript << … << std::endl;
-			awkScript << L"};" << std::endl;
-
-			awkScript << L"{print \"AWK says: line \" NR \" '\" $0 \"' has \" NF \" fields; first field is '\" $1 \"'\";}" << std::endl;
-
-			// CLI arguments:
-			std::vector<std::string> args;
-			args.push_back("awk");
+			if (awkPid < 0) {
+				throw cli::RelpipeCLIException(L"Unable to fork AWK process.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions?
+			} else if (awkPid == 0) {
+				// AWK child process
+				closeOrThrow(awkInputWriterFD);
+				closeOrThrow(awkOutputReaderFD);
 
-			for (auto d : configuration.definitions) {
-				args.push_back("-v");
-				args.push_back(convertor.to_bytes(a2v(d.name) + L"=" + d.value));
-			}
-			args.push_back(convertor.to_bytes(awkScript.str()));
+				redirectFD(awkInputReaderFD, STDIN_FILENO);
+				redirectFD(awkOutputWriterFD, STDOUT_FILENO);
 
-			// Runs AWK program found on $PATH → user can plug-in a custom implementation or a wrapper, but this can be also bit dangerous (however AWK itself is dangerous).
-			execp(args);
-		} else {
-			// Parent process
-			closeOrThrow(awkInputReaderFD);
-			closeOrThrow(awkOutputWriterFD);
+				// AWK script:
+				std::wstringstream awkScript;
+				awkScript << L"BEGIN {" << std::endl;
+				awkScript << L"FS=\"\\t\";" << std::endl;
+				awkScript << L"};" << std::endl;
 
-			__pid_t writerPid = fork();
+				awkScript << L"END {" << std::endl;
+				// awkScript << … << std::endl;
+				awkScript << L"};" << std::endl;
 
-			if (writerPid < 0) {
-				throw cli::RelpipeCLIException(L"Unable to fork Writer process.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions?
-			} else if (writerPid == 0) {
-				// Writer child process
-				closeOrThrow(awkInputWriterFD);
+				awkScript << L"{print \"AWK says: line \" NR \" '\" $0 \"' has \" NF \" fields; first field is '\" $1 \"'\";}" << std::endl;
 
-				locale::global(locale("")); // needed for processing unicode texts, otherwise getline() stopped working on first line with non-ascii characters; TODO: move somewhere else?
-
-				__gnu_cxx::stdio_filebuf<wchar_t> awkOutputReaderBuffer(awkOutputReaderFD, std::ios::in);
-				std::wistream awkOutputReader(&awkOutputReaderBuffer);
+				// CLI arguments:
+				std::vector<std::string> args;
+				args.push_back("awk");
 
-				relationalWriter->startRelation(L"writer_debug",{
-					{L"message", writer::TypeId::STRING},
-				}, true);
+				for (auto d : configuration.definitions) addDefinition(args, d);
+				for (auto d : currentRelationConfiguration->definitions) addDefinition(args, d);
 
-				for (string_t line; getline(awkOutputReader, line).good();) {
-					relationalWriter->writeAttribute(line);
-				}
+				args.push_back(convertor.to_bytes(awkScript.str()));
 
-				closeOrThrow(awkOutputReaderFD);
-				exit(0);
+				// Runs AWK program found on $PATH → user can plug-in a custom implementation or a wrapper, but this can be also bit dangerous (however AWK itself is dangerous).
+				execp(args);
 			} else {
 				// Parent process
-				closeOrThrow(awkOutputReaderFD);
+				closeOrThrow(awkInputReaderFD);
+				closeOrThrow(awkOutputWriterFD);
+
+				__pid_t writerPid = fork();
+
+				if (writerPid < 0) {
+					throw cli::RelpipeCLIException(L"Unable to fork Writer process.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions?
+				} else if (writerPid == 0) {
+					// Writer child process
+					closeOrThrow(awkInputWriterFD);
+
+					locale::global(locale("")); // needed for processing unicode texts, otherwise getline() stopped working on first line with non-ascii characters; TODO: move somewhere else?
+
+					__gnu_cxx::stdio_filebuf<wchar_t> awkOutputReaderBuffer(awkOutputReaderFD, std::ios::in);
+					std::wistream awkOutputReader(&awkOutputReaderBuffer);
+
+					// FIXME: currentWriterMetadata
+					relationalWriter->startRelation(name,{
+						{L"message", writer::TypeId::STRING},
+					}, true);
+
+					for (string_t line; getline(awkOutputReader, line).good();) {
+						relationalWriter->writeAttribute(line);
+					}
+
+					closeOrThrow(awkOutputReaderFD);
+					exit(0);
+				} else {
+					// Parent process
+					closeOrThrow(awkOutputReaderFD);
+				}
 			}
+		} else {
+			add(currentReaderMetadata, currentWriterMetadata);
+			relationalWriter->startRelation(name, currentWriterMetadata, true);
 		}
 
 	}
 
 	void attribute(const string_t& value) override {
-		string_t variableName = a2v(currentReaderMetadata[currentAttributeIndex].getAttributeName());
-		string_t variableValue = escapeAwkValue(value);
+		if (currentRelationConfiguration) {
+			string_t variableName = a2v(currentReaderMetadata[currentAttributeIndex].getAttributeName());
+			string_t variableValue = escapeAwkValue(value);
 
-		currentAttributeIndex++;
-		currentAttributeIndex = currentAttributeIndex % currentReaderMetadata.size();
+			currentAttributeIndex++;
+			currentAttributeIndex = currentAttributeIndex % currentReaderMetadata.size();
 
-		// TODO: just the value – move name to the AWK function
-		std::string variablePair = convertor.to_bytes(variableName + L"=" + variableValue);
+			// TODO: just the value – move name to the AWK function
+			std::string variablePair = convertor.to_bytes(variableName + L"=" + variableValue);
+
+			if (currentAttributeIndex == 0) variablePair += "\n";
+			else variablePair += "\t";
 
-		if (currentAttributeIndex == 0) variablePair += "\n";
-		else variablePair += "\t";
+			write(awkInputWriterFD, variablePair.c_str(), variablePair.length());
 
-		write(awkInputWriterFD, variablePair.c_str(), variablePair.length());
-
+		} else {
+			relationalWriter->writeAttribute(value);
+		}
 	}
 
 	void endOfPipe() {
--- a/src/relpipe-tr-awk.cpp	Mon May 06 21:57:16 2019 +0200
+++ b/src/relpipe-tr-awk.cpp	Mon May 06 23:52:22 2019 +0200
@@ -19,6 +19,7 @@
 #include <cstdio>
 #include <cstdlib>
 #include <memory>
+#include <functional>
 
 #include <relpipe/cli/CLI.h>
 #include <relpipe/cli/RelpipeCLIException.h>
@@ -52,10 +53,10 @@
 		Configuration configuration = cliParser.parse(cli.arguments());
 		std::shared_ptr<reader::RelationalReader> reader(reader::Factory::create(std::cin));
 		std::shared_ptr<writer::RelationalWriter> writer(writer::Factory::create(std::cout));
-		AwkHandler handler(writer.get(), configuration);
+		AwkHandler handler(writer.get(), std::bind(fflush, stdout), configuration); // std::bind(fflush, XXX) writer::Factory::create(XXX) must be the same stream XXX
 		reader->addHandler(&handler);
 		reader->process();
-		
+
 		resultCode = CLI::EXIT_CODE_SUCCESS;
 
 	} catch (RelpipeCLIException& e) {