diff -r 25c1ff79297c -r 4fce579bed22 src/StreamRelationalWriter.h --- a/src/StreamRelationalWriter.h Sat Apr 23 21:09:24 2022 +0200 +++ b/src/StreamRelationalWriter.h Sat Apr 23 23:23:29 2022 +0200 @@ -20,8 +20,10 @@ #pragma once #include +#include #include #include +#include #include @@ -46,6 +48,8 @@ types::IntegerDataTypeWriter integerWriter; types::StringDataTypeWriter stringWriter; std::vector writers = {&booleanWriter, &integerWriter, &stringWriter}; + BufferingMode bufferingMode = BufferingMode::ENVIRONMENT; + BufferingMode bufferingModeEnvDefault = BufferingMode::AUTO; /** * count of columns in the current table @@ -73,6 +77,28 @@ throw RelpipeWriterException(L"Unsupported data type: " + static_cast (typeId)); } + BufferingMode decodeBufferingMode(std::string modeName) { + if (modeName == "AUTO") return BufferingMode::AUTO; + else if (modeName == "ENVIRONMENT") return BufferingMode::ENVIRONMENT; + else if (modeName == "RELATION") return BufferingMode::RELATION; + else if (modeName == "RECORD") return BufferingMode::RECORD; + else if (modeName == "ATTRIBUTE") return BufferingMode::ATTRIBUTE; + else throw std::invalid_argument("Invalid value of BufferingMode."); + } + + void updateBufferingMode() { + if (bufferingMode == BufferingMode::ENVIRONMENT) { + bufferingMode = bufferingModeEnvDefault; + try { + char* modeName = getenv("RELPIPE_WRITER_BUFFERING_MODE"); + if (modeName && strlen(modeName)) bufferingMode = decodeBufferingMode(modeName); + } catch (...) { + throw RelpipeWriterException(L"Invalid value of the RELPIPE_WRITER_BUFFERING_MODE environmental variable."); + } + if (bufferingMode == BufferingMode::ENVIRONMENT) throw RelpipeWriterException(L"RELPIPE_WRITER_BUFFERING_MODE must not be set to ENVIRONMENT (infinite recursion)"); + } + } + public: StreamRelationalWriter(std::ostream &output) : @@ -89,6 +115,8 @@ columnCount = attributes.size(); currentColumn = 0; + updateBufferingMode(); + // Write table name and column count: if (writeHeader) { integerWriter.writeValue(output, DATA_PART_START); @@ -114,26 +142,36 @@ columnTypes[c] = typeId; } - // TODO: configurable buffer control - output.flush(); + if (bufferingMode >= BufferingMode::RELATION && bufferingMode <= BufferingMode::ATTRIBUTE) output.flush(); } void writeAttribute(const string_t& value) override { if (currentColumn == 0) integerWriter.writeValue(output, DATA_PART_ROW); + // TODO: select writer for each attribute just once in startRelation() instead of looking it each time here writeString(value, columnTypes[currentColumn]); - if (++currentColumn == columnCount) currentColumn = 0; - // TODO: configurable buffer control - output.flush(); + + bool endOfRedord = ++currentColumn == columnCount; + if (endOfRedord) currentColumn = 0; + + if (bufferingMode == BufferingMode::ATTRIBUTE || (endOfRedord && bufferingMode == BufferingMode::RECORD)) output.flush(); } void writeAttribute(const void* value, const std::type_info& type) override { if (currentColumn == 0) integerWriter.writeValue(output, DATA_PART_ROW); + // TODO: select writer for each attribute just once in startRelation() instead of looking it each time here writeRaw(value, type, columnTypes[currentColumn]); - if (++currentColumn == columnCount) currentColumn = 0; - // TODO: configurable buffer control - output.flush(); + + bool endOfRedord = ++currentColumn == columnCount; + if (endOfRedord) currentColumn = 0; + + if (bufferingMode == BufferingMode::ATTRIBUTE || (endOfRedord && bufferingMode == BufferingMode::RECORD)) output.flush(); + } + + void setBufferingMode(BufferingMode mode, BufferingMode envDefault) override { + bufferingMode = mode; + bufferingModeEnvDefault = envDefault; } };