--- a/src/AwkHandler.h Mon May 06 21:57:16 2019 +0200
+++ b/src/AwkHandler.h Mon May 06 23:52:22 2019 +0200
@@ -17,6 +17,7 @@
*/
#pragma once
+#include<functional>
#include <memory>
#include <string>
#include <vector>
@@ -62,10 +63,13 @@
private:
Configuration configuration;
writer::RelationalWriter* relationalWriter;
+ std::function<void() > relationalWriterFlush;
std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings
int awkInputWriterFD = -1;
+ RelationConfiguration* currentRelationConfiguration = nullptr;
std::vector<AttributeMetadata> currentReaderMetadata;
+ vector<writer::AttributeMetadata> currentWriterMetadata;
integer_t currentAttributeIndex = 0;
void createPipe(int& readerFD, int& writerFD) {
@@ -97,6 +101,19 @@
throw cli::RelpipeCLIException(L"Unable to do execvp().", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions?
}
+ void addDefinition(std::vector<std::string>& awkCliArgs, DefinitionRecipe& d) { // appends one definition to the AWK command line as the argument pair: -v name=value
+ awkCliArgs.push_back("-v");
+ awkCliArgs.push_back(convertor.to_bytes(a2v(d.name) + L"=" + d.value)); // name is mapped through a2v(); the wide string is converted to bytes by the convertor member
+ }
+
+ void add(vector<AttributeMetadata>& readerAttributes, vector<writer::AttributeMetadata>& writerAttributes) { // appends one writer attribute (same name, type id resolved by relationalWriter) per reader attribute
+ for (AttributeMetadata& readerAttribute : readerAttributes) // distinct name: the loop variable no longer shadows the parameter; bound by reference: no per-element copy
+ writerAttributes.push_back({
+ readerAttribute.getAttributeName(),
+ relationalWriter->toTypeId(readerAttribute.getTypeName())
+ });
+ }
+
void cleanUp() {
if (awkInputWriterFD >= 0) {
closeOrThrow(awkInputWriterFD);
@@ -108,6 +125,7 @@
currentAttributeIndex = 0;
currentReaderMetadata.clear();
+ currentWriterMetadata.clear();
}
string_t a2v(const string_t& attributeName) {
@@ -124,7 +142,16 @@
public:
- AwkHandler(writer::RelationalWriter* relationalWriter, Configuration& configuration) : relationalWriter(relationalWriter), configuration(configuration) {
+ /**
+ * @param relationalWriter
+ * @param relationalWriterFlush the writer must be flushed before fork() in order to
+ * avoid duplicate output (otherwise single relation might be written from two processes);
+ * This is a little hack – if it stops working, we should reconnect the pipes
+ * and use the writer only from a single process and avoid its effective duplication,
+ * or use different writers for each relation (or process).
+ * @param configuration
+ */
+ AwkHandler(writer::RelationalWriter* relationalWriter, std::function<void() > relationalWriterFlush, Configuration& configuration) : relationalWriter(relationalWriter), relationalWriterFlush(relationalWriterFlush), configuration(configuration) {
}
void startRelation(string_t name, vector<AttributeMetadata> attributes) override {
@@ -132,100 +159,118 @@
currentReaderMetadata = attributes;
- int awkInputReaderFD;
- int awkOutputReaderFD;
- int awkOutputWriterFD;
-
- createPipe(awkInputReaderFD, awkInputWriterFD);
- createPipe(awkOutputReaderFD, awkOutputWriterFD);
+ currentRelationConfiguration = nullptr;
+ for (int i = 0; i < configuration.relationConfigurations.size(); i++) {
+ if (regex_match(name, wregex(configuration.relationConfigurations[i].relation))) {
+ currentRelationConfiguration = &configuration.relationConfigurations[i];
+ break; // if there are multiple matches, only the first configuration is used
+ }
+ }
- __pid_t awkPid = fork();
-
- if (awkPid < 0) {
- throw cli::RelpipeCLIException(L"Unable to fork AWK process.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions?
- } else if (awkPid == 0) {
- // AWK child process
- closeOrThrow(awkInputWriterFD);
- closeOrThrow(awkOutputReaderFD);
+ if (currentRelationConfiguration) {
+ int awkInputReaderFD;
+ int awkOutputReaderFD;
+ int awkOutputWriterFD;
- redirectFD(awkInputReaderFD, STDIN_FILENO);
- redirectFD(awkOutputWriterFD, STDOUT_FILENO);
+ createPipe(awkInputReaderFD, awkInputWriterFD);
+ createPipe(awkOutputReaderFD, awkOutputWriterFD);
- // AWK script:
- std::wstringstream awkScript;
- awkScript << L"BEGIN {" << std::endl;
- awkScript << L"FS=\"\\t\";" << std::endl;
- awkScript << L"};" << std::endl;
+ relationalWriterFlush();
+ __pid_t awkPid = fork();
- awkScript << L"END {" << std::endl;
- // awkScript << … << std::endl;
- awkScript << L"};" << std::endl;
-
- awkScript << L"{print \"AWK says: line \" NR \" '\" $0 \"' has \" NF \" fields; first field is '\" $1 \"'\";}" << std::endl;
-
- // CLI arguments:
- std::vector<std::string> args;
- args.push_back("awk");
+ if (awkPid < 0) {
+ throw cli::RelpipeCLIException(L"Unable to fork AWK process.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions?
+ } else if (awkPid == 0) {
+ // AWK child process
+ closeOrThrow(awkInputWriterFD);
+ closeOrThrow(awkOutputReaderFD);
- for (auto d : configuration.definitions) {
- args.push_back("-v");
- args.push_back(convertor.to_bytes(a2v(d.name) + L"=" + d.value));
- }
- args.push_back(convertor.to_bytes(awkScript.str()));
+ redirectFD(awkInputReaderFD, STDIN_FILENO);
+ redirectFD(awkOutputWriterFD, STDOUT_FILENO);
- // Runs AWK program found on $PATH → user can plug-in a custom implementation or a wrapper, but this can be also bit dangerous (however AWK itself is dangerous).
- execp(args);
- } else {
- // Parent process
- closeOrThrow(awkInputReaderFD);
- closeOrThrow(awkOutputWriterFD);
+ // AWK script:
+ std::wstringstream awkScript;
+ awkScript << L"BEGIN {" << std::endl;
+ awkScript << L"FS=\"\\t\";" << std::endl;
+ awkScript << L"};" << std::endl;
- __pid_t writerPid = fork();
+ awkScript << L"END {" << std::endl;
+ // awkScript << … << std::endl;
+ awkScript << L"};" << std::endl;
- if (writerPid < 0) {
- throw cli::RelpipeCLIException(L"Unable to fork Writer process.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions?
- } else if (writerPid == 0) {
- // Writer child process
- closeOrThrow(awkInputWriterFD);
+ awkScript << L"{print \"AWK says: line \" NR \" '\" $0 \"' has \" NF \" fields; first field is '\" $1 \"'\";}" << std::endl;
- locale::global(locale("")); // needed for processing unicode texts, otherwise getline() stopped working on first line with non-ascii characters; TODO: move somewhere else?
-
- __gnu_cxx::stdio_filebuf<wchar_t> awkOutputReaderBuffer(awkOutputReaderFD, std::ios::in);
- std::wistream awkOutputReader(&awkOutputReaderBuffer);
+ // CLI arguments:
+ std::vector<std::string> args;
+ args.push_back("awk");
- relationalWriter->startRelation(L"writer_debug",{
- {L"message", writer::TypeId::STRING},
- }, true);
+ for (auto d : configuration.definitions) addDefinition(args, d);
+ for (auto d : currentRelationConfiguration->definitions) addDefinition(args, d);
- for (string_t line; getline(awkOutputReader, line).good();) {
- relationalWriter->writeAttribute(line);
- }
+ args.push_back(convertor.to_bytes(awkScript.str()));
- closeOrThrow(awkOutputReaderFD);
- exit(0);
+ // Runs AWK program found on $PATH → user can plug-in a custom implementation or a wrapper, but this can be also bit dangerous (however AWK itself is dangerous).
+ execp(args);
} else {
// Parent process
- closeOrThrow(awkOutputReaderFD);
+ closeOrThrow(awkInputReaderFD);
+ closeOrThrow(awkOutputWriterFD);
+
+ __pid_t writerPid = fork();
+
+ if (writerPid < 0) {
+ throw cli::RelpipeCLIException(L"Unable to fork Writer process.", cli::CLI::EXIT_CODE_UNEXPECTED_ERROR); // TODO: better exceptions?
+ } else if (writerPid == 0) {
+ // Writer child process
+ closeOrThrow(awkInputWriterFD);
+
+ locale::global(locale("")); // needed for processing unicode texts, otherwise getline() stopped working on first line with non-ascii characters; TODO: move somewhere else?
+
+ __gnu_cxx::stdio_filebuf<wchar_t> awkOutputReaderBuffer(awkOutputReaderFD, std::ios::in);
+ std::wistream awkOutputReader(&awkOutputReaderBuffer);
+
+ // FIXME: currentWriterMetadata
+ relationalWriter->startRelation(name,{
+ {L"message", writer::TypeId::STRING},
+ }, true);
+
+ for (string_t line; getline(awkOutputReader, line).good();) {
+ relationalWriter->writeAttribute(line);
+ }
+
+ closeOrThrow(awkOutputReaderFD);
+ exit(0);
+ } else {
+ // Parent process
+ closeOrThrow(awkOutputReaderFD);
+ }
}
+ } else {
+ add(currentReaderMetadata, currentWriterMetadata);
+ relationalWriter->startRelation(name, currentWriterMetadata, true);
}
}
void attribute(const string_t& value) override {
- string_t variableName = a2v(currentReaderMetadata[currentAttributeIndex].getAttributeName());
- string_t variableValue = escapeAwkValue(value);
+ if (currentRelationConfiguration) {
+ string_t variableName = a2v(currentReaderMetadata[currentAttributeIndex].getAttributeName());
+ string_t variableValue = escapeAwkValue(value);
- currentAttributeIndex++;
- currentAttributeIndex = currentAttributeIndex % currentReaderMetadata.size();
+ currentAttributeIndex++;
+ currentAttributeIndex = currentAttributeIndex % currentReaderMetadata.size();
- // TODO: just the value – move name to the AWK function
- std::string variablePair = convertor.to_bytes(variableName + L"=" + variableValue);
+ // TODO: just the value – move name to the AWK function
+ std::string variablePair = convertor.to_bytes(variableName + L"=" + variableValue);
+
+ if (currentAttributeIndex == 0) variablePair += "\n";
+ else variablePair += "\t";
- if (currentAttributeIndex == 0) variablePair += "\n";
- else variablePair += "\t";
+ write(awkInputWriterFD, variablePair.c_str(), variablePair.length());
- write(awkInputWriterFD, variablePair.c_str(), variablePair.length());
-
+ } else {
+ relationalWriter->writeAttribute(value);
+ }
}
void endOfPipe() {
--- a/src/relpipe-tr-awk.cpp Mon May 06 21:57:16 2019 +0200
+++ b/src/relpipe-tr-awk.cpp Mon May 06 23:52:22 2019 +0200
@@ -19,6 +19,7 @@
#include <cstdio>
#include <cstdlib>
#include <memory>
+#include <functional>
#include <relpipe/cli/CLI.h>
#include <relpipe/cli/RelpipeCLIException.h>
@@ -52,10 +53,10 @@
Configuration configuration = cliParser.parse(cli.arguments());
std::shared_ptr<reader::RelationalReader> reader(reader::Factory::create(std::cin));
std::shared_ptr<writer::RelationalWriter> writer(writer::Factory::create(std::cout));
- AwkHandler handler(writer.get(), configuration);
+ AwkHandler handler(writer.get(), std::bind(fflush, stdout), configuration); // the XXX in std::bind(fflush, XXX) and in writer::Factory::create(XXX) must be the same stream
reader->addHandler(&handler);
reader->process();
-
+
resultCode = CLI::EXIT_CODE_SUCCESS;
} catch (RelpipeCLIException& e) {