# HG changeset patch # User František Kučera # Date 1650749009 -7200 # Node ID 4fce579bed224eb4e3cc3f60a5b1d30bd3b35828 # Parent 25c1ff79297c75f02ac5c9b31e0f36523e503366 BufferingMode: configurable modes that control when flush() is called set through the environmental variable RELPIPE_WRITER_BUFFERING_MODE or through the API method setBufferingMode() diff -r 25c1ff79297c -r 4fce579bed22 include/relpipe/writer/BufferingMode.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/include/relpipe/writer/BufferingMode.h Sat Apr 23 23:23:29 2022 +0200 @@ -0,0 +1,62 @@ +/** + * Relational pipes (library) + * Copyright © 2022 František Kučera (Frantovo.cz, GlobalCode.info) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the: + * - GNU Lesser General Public License as published by the Free Software Foundation; + * version 3 of the License or (at your option) + * - GNU General Public License as published by the Free Software Foundation; + * version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +#pragma once + +namespace relpipe { +namespace writer { + +/** + * Relational output might be – and usually is – buffered + * i.e. relational data are written to a buffer and flushed later and sent to the actual output stream. + * + * Through these modes we can control when the flush is done. + * + * Recommendation: + * - batch processing: AUTO + * - interactive processing: RECORD + * - tools should not set the mode explicitly unless asked by the user or unles specific mode is obvious for given task; + * then ENVIRONMENT mode is the initial one and it defaults to AUTO when user has not set the environmental variable + */ +enum class BufferingMode { + /** + * Output buffer is not explicitly flushed, it is written when full. + */ + AUTO, + /** + * Mode is determined by the environmental variable RELPIPE_WRITER_BUFFERING_MODE. + * If value is missing or has invalid value, AUTO is used as default. + */ + ENVIRONMENT, + /** + * Output buffer is flushed at least at the end of each relation. + */ + RELATION, + /** + * Output buffer is flushed at least at the end of each record. + */ + RECORD, + /** + * Output buffer is flushed at least at the end of each attribute. + */ + ATTRIBUTE +}; + +} +} diff -r 25c1ff79297c -r 4fce579bed22 include/relpipe/writer/RelationalWriter.h --- a/include/relpipe/writer/RelationalWriter.h Sat Apr 23 21:09:24 2022 +0200 +++ b/include/relpipe/writer/RelationalWriter.h Sat Apr 23 23:23:29 2022 +0200 @@ -25,6 +25,7 @@ #include "typedefs.h" #include "TypeId.h" #include "AttributeMetadata.h" +#include "BufferingMode.h" namespace relpipe { namespace writer { @@ -71,6 +72,13 @@ */ virtual void writeAttribute(const void* value, const std::type_info& type) = 0; + + /** + * @param mode buffering mode to be set + * @param envDefault if mode is ENVIRONMENT but given environmental variable is not set, this defaul is used + */ + virtual void setBufferingMode(BufferingMode mode, BufferingMode envDefault = BufferingMode::AUTO) = 0; + }; } diff -r 25c1ff79297c -r 4fce579bed22 src/StreamRelationalWriter.h --- a/src/StreamRelationalWriter.h Sat Apr 23 21:09:24 2022 +0200 +++ b/src/StreamRelationalWriter.h Sat Apr 23 23:23:29 2022 +0200 @@ -20,8 +20,10 @@ #pragma once #include +#include #include #include +#include #include @@ -46,6 +48,8 @@ types::IntegerDataTypeWriter integerWriter; types::StringDataTypeWriter stringWriter; std::vector writers = {&booleanWriter, &integerWriter, &stringWriter}; + BufferingMode bufferingMode = BufferingMode::ENVIRONMENT; + BufferingMode bufferingModeEnvDefault = BufferingMode::AUTO; /** * count of columns in the current table @@ -73,6 +77,28 @@ throw RelpipeWriterException(L"Unsupported data type: " + static_cast (typeId)); } + BufferingMode decodeBufferingMode(std::string modeName) { + if (modeName == "AUTO") return BufferingMode::AUTO; + else if (modeName == "ENVIRONMENT") return BufferingMode::ENVIRONMENT; + else if (modeName == "RELATION") return BufferingMode::RELATION; + else if (modeName == "RECORD") return BufferingMode::RECORD; + else if (modeName == "ATTRIBUTE") return BufferingMode::ATTRIBUTE; + else throw std::invalid_argument("Invalid value of BufferingMode."); + } + + void updateBufferingMode() { + if (bufferingMode == BufferingMode::ENVIRONMENT) { + bufferingMode = bufferingModeEnvDefault; + try { + char* modeName = getenv("RELPIPE_WRITER_BUFFERING_MODE"); + if (modeName && strlen(modeName)) bufferingMode = decodeBufferingMode(modeName); + } catch (...) { + throw RelpipeWriterException(L"Invalid value of the RELPIPE_WRITER_BUFFERING_MODE environmental variable."); + } + if (bufferingMode == BufferingMode::ENVIRONMENT) throw RelpipeWriterException(L"RELPIPE_WRITER_BUFFERING_MODE must not be set to ENVIRONMENT (infinite recursion)"); + } + } + public: StreamRelationalWriter(std::ostream &output) : @@ -89,6 +115,8 @@ columnCount = attributes.size(); currentColumn = 0; + updateBufferingMode(); + // Write table name and column count: if (writeHeader) { integerWriter.writeValue(output, DATA_PART_START); @@ -114,26 +142,36 @@ columnTypes[c] = typeId; } - // TODO: configurable buffer control - output.flush(); + if (bufferingMode >= BufferingMode::RELATION && bufferingMode <= BufferingMode::ATTRIBUTE) output.flush(); } void writeAttribute(const string_t& value) override { if (currentColumn == 0) integerWriter.writeValue(output, DATA_PART_ROW); + // TODO: select writer for each attribute just once in startRelation() instead of looking it each time here writeString(value, columnTypes[currentColumn]); - if (++currentColumn == columnCount) currentColumn = 0; - // TODO: configurable buffer control - output.flush(); + + bool endOfRedord = ++currentColumn == columnCount; + if (endOfRedord) currentColumn = 0; + + if (bufferingMode == BufferingMode::ATTRIBUTE || (endOfRedord && bufferingMode == BufferingMode::RECORD)) output.flush(); } void writeAttribute(const void* value, const std::type_info& type) override { if (currentColumn == 0) integerWriter.writeValue(output, DATA_PART_ROW); + // TODO: select writer for each attribute just once in startRelation() instead of looking it each time here writeRaw(value, type, columnTypes[currentColumn]); - if (++currentColumn == columnCount) currentColumn = 0; - // TODO: configurable buffer control - output.flush(); + + bool endOfRedord = ++currentColumn == columnCount; + if (endOfRedord) currentColumn = 0; + + if (bufferingMode == BufferingMode::ATTRIBUTE || (endOfRedord && bufferingMode == BufferingMode::RECORD)) output.flush(); + } + + void setBufferingMode(BufferingMode mode, BufferingMode envDefault) override { + bufferingMode = mode; + bufferingModeEnvDefault = envDefault; } };