src/AwkHandler.h
branchv_0
changeset 13 b74001992ec3
parent 12 8844ebce8fb4
child 15 ba91a464d2b3
equal deleted inserted replaced
12:8844ebce8fb4 13:b74001992ec3
    15  * You should have received a copy of the GNU General Public License
    15  * You should have received a copy of the GNU General Public License
    16  * along with this program. If not, see <http://www.gnu.org/licenses/>.
    16  * along with this program. If not, see <http://www.gnu.org/licenses/>.
    17  */
    17  */
    18 #pragma once
    18 #pragma once
    19 
    19 
       
    20 #include<functional>
    20 #include <memory>
    21 #include <memory>
    21 #include <string>
    22 #include <string>
    22 #include <vector>
    23 #include <vector>
    23 #include <iostream>
    24 #include <iostream>
    24 #include <sstream>
    25 #include <sstream>
    60  */
    61  */
    61 class AwkHandler : public RelationalReaderStringHandler {
    62 class AwkHandler : public RelationalReaderStringHandler {
    62 private:
    63 private:
    63 	Configuration configuration;
    64 	Configuration configuration;
    64 	writer::RelationalWriter* relationalWriter;
    65 	writer::RelationalWriter* relationalWriter;
       
    66 	std::function<void() > relationalWriterFlush;
    65 	std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings
    67 	std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings
    66 
    68 
    67 	int awkInputWriterFD = -1;
    69 	int awkInputWriterFD = -1;
       
    70 	RelationConfiguration* currentRelationConfiguration = nullptr;
    68 	std::vector<AttributeMetadata> currentReaderMetadata;
    71 	std::vector<AttributeMetadata> currentReaderMetadata;
       
    72 	vector<writer::AttributeMetadata> currentWriterMetadata;
    69 	integer_t currentAttributeIndex = 0;
    73 	integer_t currentAttributeIndex = 0;
    70 
    74 
    71 	void createPipe(int& readerFD, int& writerFD) {
    75 	void createPipe(int& readerFD, int& writerFD) {
    72 		int fds[2];
    76 		int fds[2];
    73 		int result = pipe(fds);
    77 		int result = pipe(fds);
    93 
    97 
    94 		execvp(a[0], (char*const*) a);
    98 		execvp(a[0], (char*const*) a);
    95 
    99 
    96 		delete[] a;
   100 		delete[] a;
    97 		throw cli::RelpipeCLIException(L"Unable to do execvp().", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions?
   101 		throw cli::RelpipeCLIException(L"Unable to do execvp().", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions?
       
   102 	}
       
   103 
       
   104 	void addDefinition(std::vector<std::string>& awkCliArgs, DefinitionRecipe& d) {
       
   105 		awkCliArgs.push_back("-v");
       
   106 		awkCliArgs.push_back(convertor.to_bytes(a2v(d.name) + L"=" + d.value));
       
   107 	}
       
   108 
       
   109 	void add(vector<AttributeMetadata>& readerAttributes, vector<writer::AttributeMetadata>& writerAttributes) {
       
   110 		for (AttributeMetadata readerAttributes : readerAttributes)
       
   111 			writerAttributes.push_back({
       
   112 				readerAttributes.getAttributeName(),
       
   113 				relationalWriter->toTypeId(readerAttributes.getTypeName())
       
   114 			});
    98 	}
   115 	}
    99 
   116 
   100 	void cleanUp() {
   117 	void cleanUp() {
   101 		if (awkInputWriterFD >= 0) {
   118 		if (awkInputWriterFD >= 0) {
   102 			closeOrThrow(awkInputWriterFD);
   119 			closeOrThrow(awkInputWriterFD);
   106 			awkInputWriterFD = -1;
   123 			awkInputWriterFD = -1;
   107 		}
   124 		}
   108 
   125 
   109 		currentAttributeIndex = 0;
   126 		currentAttributeIndex = 0;
   110 		currentReaderMetadata.clear();
   127 		currentReaderMetadata.clear();
       
   128 		currentWriterMetadata.clear();
   111 	}
   129 	}
   112 
   130 
   113 	string_t a2v(const string_t& attributeName) {
   131 	string_t a2v(const string_t& attributeName) {
   114 		// FIXME: escape reserved names; prefix with _ ?
   132 		// FIXME: escape reserved names; prefix with _ ?
   115 		// cat awkgram.y | awk -v FS='\\{"|",' -v ORS='|' '/static const struct token tokentab/, /\};/ { if (/^\{/) { print $2} }'
   133 		// cat awkgram.y | awk -v FS='\\{"|",' -v ORS='|' '/static const struct token tokentab/, /\};/ { if (/^\{/) { print $2} }'
   122 		return value;
   140 		return value;
   123 	}
   141 	}
   124 
   142 
   125 public:
   143 public:
   126 
   144 
   127 	AwkHandler(writer::RelationalWriter* relationalWriter, Configuration& configuration) : relationalWriter(relationalWriter), configuration(configuration) {
   145 	/**
       
   146 	 * @param relationalWriter
       
   147 	 * @param relationalWriterFlush the writer must be flushed before fork() in order to 
       
   148 	 * avoid duplicate output (otherwise single relation might be written from two processes); 
       
   149 	 * This is a little hack – if it stops working, we should reconnect the pipes 
       
   150 	 * and use the writer only from a single process and avoid its effective duplication,
       
   151 	 * or use different writers for each relation (or process).
       
   152 	 * @param configuration
       
   153 	 */
       
   154 	AwkHandler(writer::RelationalWriter* relationalWriter, std::function<void() > relationalWriterFlush, Configuration& configuration) : relationalWriter(relationalWriter), relationalWriterFlush(relationalWriterFlush), configuration(configuration) {
   128 	}
   155 	}
   129 
   156 
   130 	void startRelation(string_t name, vector<AttributeMetadata> attributes) override {
   157 	void startRelation(string_t name, vector<AttributeMetadata> attributes) override {
   131 		cleanUp();
   158 		cleanUp();
   132 
   159 
   133 		currentReaderMetadata = attributes;
   160 		currentReaderMetadata = attributes;
   134 
   161 
   135 		int awkInputReaderFD;
   162 		currentRelationConfiguration = nullptr;
   136 		int awkOutputReaderFD;
   163 		for (int i = 0; i < configuration.relationConfigurations.size(); i++) {
   137 		int awkOutputWriterFD;
   164 			if (regex_match(name, wregex(configuration.relationConfigurations[i].relation))) {
   138 
   165 				currentRelationConfiguration = &configuration.relationConfigurations[i];
   139 		createPipe(awkInputReaderFD, awkInputWriterFD);
   166 				break; // it there are multiple matches, only the first configuration is used
   140 		createPipe(awkOutputReaderFD, awkOutputWriterFD);
       
   141 
       
   142 		__pid_t awkPid = fork();
       
   143 
       
   144 		if (awkPid < 0) {
       
   145 			throw cli::RelpipeCLIException(L"Unable to fork AWK process.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions?
       
   146 		} else if (awkPid == 0) {
       
   147 			// AWK child process
       
   148 			closeOrThrow(awkInputWriterFD);
       
   149 			closeOrThrow(awkOutputReaderFD);
       
   150 
       
   151 			redirectFD(awkInputReaderFD, STDIN_FILENO);
       
   152 			redirectFD(awkOutputWriterFD, STDOUT_FILENO);
       
   153 
       
   154 			// AWK script:
       
   155 			std::wstringstream awkScript;
       
   156 			awkScript << L"BEGIN {" << std::endl;
       
   157 			awkScript << L"FS=\"\\t\";" << std::endl;
       
   158 			awkScript << L"};" << std::endl;
       
   159 
       
   160 			awkScript << L"END {" << std::endl;
       
   161 			// awkScript << … << std::endl;
       
   162 			awkScript << L"};" << std::endl;
       
   163 
       
   164 			awkScript << L"{print \"AWK says: line \" NR \" '\" $0 \"' has \" NF \" fields; first field is '\" $1 \"'\";}" << std::endl;
       
   165 
       
   166 			// CLI arguments:
       
   167 			std::vector<std::string> args;
       
   168 			args.push_back("awk");
       
   169 
       
   170 			for (auto d : configuration.definitions) {
       
   171 				args.push_back("-v");
       
   172 				args.push_back(convertor.to_bytes(a2v(d.name) + L"=" + d.value));
       
   173 			}
   167 			}
   174 			args.push_back(convertor.to_bytes(awkScript.str()));
   168 		}
   175 
   169 
   176 			// Runs AWK program found on $PATH → user can plug-in a custom implementation or a wrapper, but this can be also bit dangerous (however AWK itself is dangerous).
   170 		if (currentRelationConfiguration) {
   177 			execp(args);
   171 			int awkInputReaderFD;
   178 		} else {
   172 			int awkOutputReaderFD;
   179 			// Parent process
   173 			int awkOutputWriterFD;
   180 			closeOrThrow(awkInputReaderFD);
   174 
   181 			closeOrThrow(awkOutputWriterFD);
   175 			createPipe(awkInputReaderFD, awkInputWriterFD);
   182 
   176 			createPipe(awkOutputReaderFD, awkOutputWriterFD);
   183 			__pid_t writerPid = fork();
   177 
   184 
   178 			relationalWriterFlush();
   185 			if (writerPid < 0) {
   179 			__pid_t awkPid = fork();
   186 				throw cli::RelpipeCLIException(L"Unable to fork Writer process.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions?
   180 
   187 			} else if (writerPid == 0) {
   181 			if (awkPid < 0) {
   188 				// Writer child process
   182 				throw cli::RelpipeCLIException(L"Unable to fork AWK process.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions?
       
   183 			} else if (awkPid == 0) {
       
   184 				// AWK child process
   189 				closeOrThrow(awkInputWriterFD);
   185 				closeOrThrow(awkInputWriterFD);
   190 
       
   191 				locale::global(locale("")); // needed for processing unicode texts, otherwise getline() stopped working on first line with non-ascii characters; TODO: move somewhere else?
       
   192 
       
   193 				__gnu_cxx::stdio_filebuf<wchar_t> awkOutputReaderBuffer(awkOutputReaderFD, std::ios::in);
       
   194 				std::wistream awkOutputReader(&awkOutputReaderBuffer);
       
   195 
       
   196 				relationalWriter->startRelation(L"writer_debug",{
       
   197 					{L"message", writer::TypeId::STRING},
       
   198 				}, true);
       
   199 
       
   200 				for (string_t line; getline(awkOutputReader, line).good();) {
       
   201 					relationalWriter->writeAttribute(line);
       
   202 				}
       
   203 
       
   204 				closeOrThrow(awkOutputReaderFD);
   186 				closeOrThrow(awkOutputReaderFD);
   205 				exit(0);
   187 
       
   188 				redirectFD(awkInputReaderFD, STDIN_FILENO);
       
   189 				redirectFD(awkOutputWriterFD, STDOUT_FILENO);
       
   190 
       
   191 				// AWK script:
       
   192 				std::wstringstream awkScript;
       
   193 				awkScript << L"BEGIN {" << std::endl;
       
   194 				awkScript << L"FS=\"\\t\";" << std::endl;
       
   195 				awkScript << L"};" << std::endl;
       
   196 
       
   197 				awkScript << L"END {" << std::endl;
       
   198 				// awkScript << … << std::endl;
       
   199 				awkScript << L"};" << std::endl;
       
   200 
       
   201 				awkScript << L"{print \"AWK says: line \" NR \" '\" $0 \"' has \" NF \" fields; first field is '\" $1 \"'\";}" << std::endl;
       
   202 
       
   203 				// CLI arguments:
       
   204 				std::vector<std::string> args;
       
   205 				args.push_back("awk");
       
   206 
       
   207 				for (auto d : configuration.definitions) addDefinition(args, d);
       
   208 				for (auto d : currentRelationConfiguration->definitions) addDefinition(args, d);
       
   209 
       
   210 				args.push_back(convertor.to_bytes(awkScript.str()));
       
   211 
       
   212 				// Runs AWK program found on $PATH → user can plug-in a custom implementation or a wrapper, but this can be also bit dangerous (however AWK itself is dangerous).
       
   213 				execp(args);
   206 			} else {
   214 			} else {
   207 				// Parent process
   215 				// Parent process
   208 				closeOrThrow(awkOutputReaderFD);
   216 				closeOrThrow(awkInputReaderFD);
       
   217 				closeOrThrow(awkOutputWriterFD);
       
   218 
       
   219 				__pid_t writerPid = fork();
       
   220 
       
   221 				if (writerPid < 0) {
       
   222 					throw cli::RelpipeCLIException(L"Unable to fork Writer process.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions?
       
   223 				} else if (writerPid == 0) {
       
   224 					// Writer child process
       
   225 					closeOrThrow(awkInputWriterFD);
       
   226 
       
   227 					locale::global(locale("")); // needed for processing unicode texts, otherwise getline() stopped working on first line with non-ascii characters; TODO: move somewhere else?
       
   228 
       
   229 					__gnu_cxx::stdio_filebuf<wchar_t> awkOutputReaderBuffer(awkOutputReaderFD, std::ios::in);
       
   230 					std::wistream awkOutputReader(&awkOutputReaderBuffer);
       
   231 
       
   232 					// FIXME: currentWriterMetadata
       
   233 					relationalWriter->startRelation(name,{
       
   234 						{L"message", writer::TypeId::STRING},
       
   235 					}, true);
       
   236 
       
   237 					for (string_t line; getline(awkOutputReader, line).good();) {
       
   238 						relationalWriter->writeAttribute(line);
       
   239 					}
       
   240 
       
   241 					closeOrThrow(awkOutputReaderFD);
       
   242 					exit(0);
       
   243 				} else {
       
   244 					// Parent process
       
   245 					closeOrThrow(awkOutputReaderFD);
       
   246 				}
   209 			}
   247 			}
       
   248 		} else {
       
   249 			add(currentReaderMetadata, currentWriterMetadata);
       
   250 			relationalWriter->startRelation(name, currentWriterMetadata, true);
   210 		}
   251 		}
   211 
   252 
   212 	}
   253 	}
   213 
   254 
   214 	void attribute(const string_t& value) override {
   255 	void attribute(const string_t& value) override {
   215 		string_t variableName = a2v(currentReaderMetadata[currentAttributeIndex].getAttributeName());
   256 		if (currentRelationConfiguration) {
   216 		string_t variableValue = escapeAwkValue(value);
   257 			string_t variableName = a2v(currentReaderMetadata[currentAttributeIndex].getAttributeName());
   217 
   258 			string_t variableValue = escapeAwkValue(value);
   218 		currentAttributeIndex++;
   259 
   219 		currentAttributeIndex = currentAttributeIndex % currentReaderMetadata.size();
   260 			currentAttributeIndex++;
   220 
   261 			currentAttributeIndex = currentAttributeIndex % currentReaderMetadata.size();
   221 		// TODO: just the value – move name to the AWK function
   262 
   222 		std::string variablePair = convertor.to_bytes(variableName + L"=" + variableValue);
   263 			// TODO: just the value – move name to the AWK function
   223 
   264 			std::string variablePair = convertor.to_bytes(variableName + L"=" + variableValue);
   224 		if (currentAttributeIndex == 0) variablePair += "\n";
   265 
   225 		else variablePair += "\t";
   266 			if (currentAttributeIndex == 0) variablePair += "\n";
   226 
   267 			else variablePair += "\t";
   227 		write(awkInputWriterFD, variablePair.c_str(), variablePair.length());
   268 
   228 
   269 			write(awkInputWriterFD, variablePair.c_str(), variablePair.length());
       
   270 
       
   271 		} else {
       
   272 			relationalWriter->writeAttribute(value);
       
   273 		}
   229 	}
   274 	}
   230 
   275 
   231 	void endOfPipe() {
   276 	void endOfPipe() {
   232 		cleanUp();
   277 		cleanUp();
   233 	}
   278 	}