# HG changeset patch # User František Kučera # Date 1557179542 -7200 # Node ID b74001992ec3f10354f43c890e79680f518f08cc # Parent 8844ebce8fb49a29294f03c0c5eb383475998170 implement --relation option (thus some relations might pass unmodified by AWK), support per-relation variables diff -r 8844ebce8fb4 -r b74001992ec3 src/AwkHandler.h --- a/src/AwkHandler.h Mon May 06 21:57:16 2019 +0200 +++ b/src/AwkHandler.h Mon May 06 23:52:22 2019 +0200 @@ -17,6 +17,7 @@ */ #pragma once +#include #include #include #include @@ -62,10 +63,13 @@ private: Configuration configuration; writer::RelationalWriter* relationalWriter; + std::function relationalWriterFlush; std::wstring_convert> convertor; // TODO: support also other encodings int awkInputWriterFD = -1; + RelationConfiguration* currentRelationConfiguration = nullptr; std::vector currentReaderMetadata; + vector currentWriterMetadata; integer_t currentAttributeIndex = 0; void createPipe(int& readerFD, int& writerFD) { @@ -97,6 +101,19 @@ throw cli::RelpipeCLIException(L"Unable to do execvp().", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions? 
} + void addDefinition(std::vector& awkCliArgs, DefinitionRecipe& d) { + awkCliArgs.push_back("-v"); + awkCliArgs.push_back(convertor.to_bytes(a2v(d.name) + L"=" + d.value)); + } + + void add(vector& readerAttributes, vector& writerAttributes) { + for (AttributeMetadata readerAttributes : readerAttributes) + writerAttributes.push_back({ + readerAttributes.getAttributeName(), + relationalWriter->toTypeId(readerAttributes.getTypeName()) + }); + } + void cleanUp() { if (awkInputWriterFD >= 0) { closeOrThrow(awkInputWriterFD); @@ -108,6 +125,7 @@ currentAttributeIndex = 0; currentReaderMetadata.clear(); + currentWriterMetadata.clear(); } string_t a2v(const string_t& attributeName) { @@ -124,7 +142,16 @@ public: - AwkHandler(writer::RelationalWriter* relationalWriter, Configuration& configuration) : relationalWriter(relationalWriter), configuration(configuration) { + /** + * @param relationalWriter + * @param relationalWriterFlush the writer must be flushed before fork() in order to + * avoid duplicate output (otherwise single relation might be written from two processes); + * This is a little hack – if it stops working, we should reconnect the pipes + * and use the writer only from a single process and avoid its effective duplication, + * or use different writers for each relation (or process). 
+ * @param configuration + */ + AwkHandler(writer::RelationalWriter* relationalWriter, std::function relationalWriterFlush, Configuration& configuration) : relationalWriter(relationalWriter), relationalWriterFlush(relationalWriterFlush), configuration(configuration) { } void startRelation(string_t name, vector attributes) override { @@ -132,100 +159,118 @@ currentReaderMetadata = attributes; - int awkInputReaderFD; - int awkOutputReaderFD; - int awkOutputWriterFD; - - createPipe(awkInputReaderFD, awkInputWriterFD); - createPipe(awkOutputReaderFD, awkOutputWriterFD); + currentRelationConfiguration = nullptr; + for (int i = 0; i < configuration.relationConfigurations.size(); i++) { + if (regex_match(name, wregex(configuration.relationConfigurations[i].relation))) { + currentRelationConfiguration = &configuration.relationConfigurations[i]; + break; // if there are multiple matches, only the first configuration is used + } + } - __pid_t awkPid = fork(); - - if (awkPid < 0) { - throw cli::RelpipeCLIException(L"Unable to fork AWK process.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions? 
- } else if (awkPid == 0) { - // AWK child process - closeOrThrow(awkInputWriterFD); - closeOrThrow(awkOutputReaderFD); + if (currentRelationConfiguration) { + int awkInputReaderFD; + int awkOutputReaderFD; + int awkOutputWriterFD; - redirectFD(awkInputReaderFD, STDIN_FILENO); - redirectFD(awkOutputWriterFD, STDOUT_FILENO); + createPipe(awkInputReaderFD, awkInputWriterFD); + createPipe(awkOutputReaderFD, awkOutputWriterFD); - // AWK script: - std::wstringstream awkScript; - awkScript << L"BEGIN {" << std::endl; - awkScript << L"FS=\"\\t\";" << std::endl; - awkScript << L"};" << std::endl; + relationalWriterFlush(); + __pid_t awkPid = fork(); - awkScript << L"END {" << std::endl; - // awkScript << … << std::endl; - awkScript << L"};" << std::endl; - - awkScript << L"{print \"AWK says: line \" NR \" '\" $0 \"' has \" NF \" fields; first field is '\" $1 \"'\";}" << std::endl; - - // CLI arguments: - std::vector args; - args.push_back("awk"); + if (awkPid < 0) { + throw cli::RelpipeCLIException(L"Unable to fork AWK process.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions? + } else if (awkPid == 0) { + // AWK child process + closeOrThrow(awkInputWriterFD); + closeOrThrow(awkOutputReaderFD); - for (auto d : configuration.definitions) { - args.push_back("-v"); - args.push_back(convertor.to_bytes(a2v(d.name) + L"=" + d.value)); - } - args.push_back(convertor.to_bytes(awkScript.str())); + redirectFD(awkInputReaderFD, STDIN_FILENO); + redirectFD(awkOutputWriterFD, STDOUT_FILENO); - // Runs AWK program found on $PATH → user can plug-in a custom implementation or a wrapper, but this can be also bit dangerous (however AWK itself is dangerous). 
- execp(args); - } else { - // Parent process - closeOrThrow(awkInputReaderFD); - closeOrThrow(awkOutputWriterFD); + // AWK script: + std::wstringstream awkScript; + awkScript << L"BEGIN {" << std::endl; + awkScript << L"FS=\"\\t\";" << std::endl; + awkScript << L"};" << std::endl; - __pid_t writerPid = fork(); + awkScript << L"END {" << std::endl; + // awkScript << … << std::endl; + awkScript << L"};" << std::endl; - if (writerPid < 0) { - throw cli::RelpipeCLIException(L"Unable to fork Writer process.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions? - } else if (writerPid == 0) { - // Writer child process - closeOrThrow(awkInputWriterFD); + awkScript << L"{print \"AWK says: line \" NR \" '\" $0 \"' has \" NF \" fields; first field is '\" $1 \"'\";}" << std::endl; - locale::global(locale("")); // needed for processing unicode texts, otherwise getline() stopped working on first line with non-ascii characters; TODO: move somewhere else? - - __gnu_cxx::stdio_filebuf awkOutputReaderBuffer(awkOutputReaderFD, std::ios::in); - std::wistream awkOutputReader(&awkOutputReaderBuffer); + // CLI arguments: + std::vector args; + args.push_back("awk"); - relationalWriter->startRelation(L"writer_debug",{ - {L"message", writer::TypeId::STRING}, - }, true); + for (auto d : configuration.definitions) addDefinition(args, d); + for (auto d : currentRelationConfiguration->definitions) addDefinition(args, d); - for (string_t line; getline(awkOutputReader, line).good();) { - relationalWriter->writeAttribute(line); - } + args.push_back(convertor.to_bytes(awkScript.str())); - closeOrThrow(awkOutputReaderFD); - exit(0); + // Runs AWK program found on $PATH → user can plug-in a custom implementation or a wrapper, but this can be also bit dangerous (however AWK itself is dangerous). 
+ execp(args); } else { // Parent process - closeOrThrow(awkOutputReaderFD); + closeOrThrow(awkInputReaderFD); + closeOrThrow(awkOutputWriterFD); + + __pid_t writerPid = fork(); + + if (writerPid < 0) { + throw cli::RelpipeCLIException(L"Unable to fork Writer process.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions? + } else if (writerPid == 0) { + // Writer child process + closeOrThrow(awkInputWriterFD); + + locale::global(locale("")); // needed for processing unicode texts, otherwise getline() stopped working on first line with non-ascii characters; TODO: move somewhere else? + + __gnu_cxx::stdio_filebuf awkOutputReaderBuffer(awkOutputReaderFD, std::ios::in); + std::wistream awkOutputReader(&awkOutputReaderBuffer); + + // FIXME: currentWriterMetadata + relationalWriter->startRelation(name,{ + {L"message", writer::TypeId::STRING}, + }, true); + + for (string_t line; getline(awkOutputReader, line).good();) { + relationalWriter->writeAttribute(line); + } + + closeOrThrow(awkOutputReaderFD); + exit(0); + } else { + // Parent process + closeOrThrow(awkOutputReaderFD); + } } + } else { + add(currentReaderMetadata, currentWriterMetadata); + relationalWriter->startRelation(name, currentWriterMetadata, true); } } void attribute(const string_t& value) override { - string_t variableName = a2v(currentReaderMetadata[currentAttributeIndex].getAttributeName()); - string_t variableValue = escapeAwkValue(value); + if (currentRelationConfiguration) { + string_t variableName = a2v(currentReaderMetadata[currentAttributeIndex].getAttributeName()); + string_t variableValue = escapeAwkValue(value); - currentAttributeIndex++; - currentAttributeIndex = currentAttributeIndex % currentReaderMetadata.size(); + currentAttributeIndex++; + currentAttributeIndex = currentAttributeIndex % currentReaderMetadata.size(); - // TODO: just the value – move name to the AWK function - std::string variablePair = convertor.to_bytes(variableName + L"=" + variableValue); + // TODO: just 
the value – move name to the AWK function + std::string variablePair = convertor.to_bytes(variableName + L"=" + variableValue); + + if (currentAttributeIndex == 0) variablePair += "\n"; + else variablePair += "\t"; - if (currentAttributeIndex == 0) variablePair += "\n"; - else variablePair += "\t"; + write(awkInputWriterFD, variablePair.c_str(), variablePair.length()); - write(awkInputWriterFD, variablePair.c_str(), variablePair.length()); - + } else { + relationalWriter->writeAttribute(value); + } } void endOfPipe() { diff -r 8844ebce8fb4 -r b74001992ec3 src/relpipe-tr-awk.cpp --- a/src/relpipe-tr-awk.cpp Mon May 06 21:57:16 2019 +0200 +++ b/src/relpipe-tr-awk.cpp Mon May 06 23:52:22 2019 +0200 @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -52,10 +53,10 @@ Configuration configuration = cliParser.parse(cli.arguments()); std::shared_ptr reader(reader::Factory::create(std::cin)); std::shared_ptr writer(writer::Factory::create(std::cout)); - AwkHandler handler(writer.get(), configuration); + AwkHandler handler(writer.get(), std::bind(fflush, stdout), configuration); // std::bind(fflush, XXX) writer::Factory::create(XXX) must be the same stream XXX reader->addHandler(&handler); reader->process(); - + resultCode = CLI::EXIT_CODE_SUCCESS; } catch (RelpipeCLIException& e) {