BufferingMode: configurable modes that control when flush() is called
set through the environmental variable RELPIPE_WRITER_BUFFERING_MODE or through the API method setBufferingMode()
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/include/relpipe/writer/BufferingMode.h Sat Apr 23 23:23:29 2022 +0200
@@ -0,0 +1,62 @@
+/**
+ * Relational pipes (library)
+ * Copyright © 2022 František Kučera (Frantovo.cz, GlobalCode.info)
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the:
+ * - GNU Lesser General Public License as published by the Free Software Foundation;
+ * version 3 of the License or (at your option)
+ * - GNU General Public License as published by the Free Software Foundation;
+ * version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#pragma once
+
+namespace relpipe {
+namespace writer {
+
+/**
+ * Relational output might be – and usually is – buffered,
+ * i.e. relational data are written to a buffer first and flushed to the actual output stream later.
+ *
+ * These modes control when the flush is done; their declaration order is significant (StreamRelationalWriter compares them with >= and <=).
+ *
+ * Recommendation:
+ * - batch processing: AUTO
+ * - interactive processing: RECORD
+ * - tools should not set the mode explicitly unless asked by the user or unless a specific mode is obvious for the given task;
+ * then ENVIRONMENT mode is the initial one and it defaults to AUTO when the user has not set the environmental variable
+ */
+enum class BufferingMode {
+ /**
+ * Output buffer is not explicitly flushed, it is written when full.
+ */
+ AUTO,
+ /**
+ * Mode is determined by the environmental variable RELPIPE_WRITER_BUFFERING_MODE; if it is missing or empty, the default passed to setBufferingMode() is used (AUTO unless specified).
+ * An invalid value (including ENVIRONMENT itself) is not silently replaced: StreamRelationalWriter rejects it with a RelpipeWriterException.
+ */
+ ENVIRONMENT,
+ /**
+ * Output buffer is flushed at least at the end of each relation.
+ */
+ RELATION,
+ /**
+ * Output buffer is flushed at least at the end of each record.
+ */
+ RECORD,
+ /**
+ * Output buffer is flushed at least at the end of each attribute.
+ */
+ ATTRIBUTE
+};
+
+}
+}
--- a/include/relpipe/writer/RelationalWriter.h Sat Apr 23 21:09:24 2022 +0200
+++ b/include/relpipe/writer/RelationalWriter.h Sat Apr 23 23:23:29 2022 +0200
@@ -25,6 +25,7 @@
#include "typedefs.h"
#include "TypeId.h"
#include "AttributeMetadata.h"
+#include "BufferingMode.h"
namespace relpipe {
namespace writer {
@@ -71,6 +72,13 @@
*/
virtual void writeAttribute(const void* value, const std::type_info& type) = 0;
+
+ /**
+ * @param mode buffering mode to be set, i.e. when the writer flushes its output
+ * @param envDefault if mode is ENVIRONMENT but the environmental variable is not set, this default is used
+ */
+ virtual void setBufferingMode(BufferingMode mode, BufferingMode envDefault = BufferingMode::AUTO) = 0;
+
};
}
--- a/src/StreamRelationalWriter.h Sat Apr 23 21:09:24 2022 +0200
+++ b/src/StreamRelationalWriter.h Sat Apr 23 23:23:29 2022 +0200
@@ -20,8 +20,10 @@
#pragma once
#include <string>
+#include <cstring>
#include <iostream>
#include <vector>
+#include <stdexcept>
#include <relpipe/protocol/constants.h>
@@ -46,6 +48,8 @@
types::IntegerDataTypeWriter integerWriter;
types::StringDataTypeWriter stringWriter;
std::vector<DataTypeWriterBase*> writers = {&booleanWriter, &integerWriter, &stringWriter};
+ BufferingMode bufferingMode = BufferingMode::ENVIRONMENT; // ENVIRONMENT is a placeholder that updateBufferingMode() replaces with a concrete mode
+ BufferingMode bufferingModeEnvDefault = BufferingMode::AUTO; // used by updateBufferingMode() when the environmental variable is missing or empty
/**
* count of columns in the current table
@@ -73,6 +77,28 @@
throw RelpipeWriterException(L"Unsupported data type: " + static_cast<integer_t> (typeId));
}
+ BufferingMode decodeBufferingMode(std::string modeName) { // maps the textual mode name to its enumerator; throws std::invalid_argument for an unknown name
+ static const struct { const char* name; BufferingMode mode; } knownModes[] = {
+ {"AUTO", BufferingMode::AUTO}, {"ENVIRONMENT", BufferingMode::ENVIRONMENT},
+ {"RELATION", BufferingMode::RELATION}, {"RECORD", BufferingMode::RECORD}, {"ATTRIBUTE", BufferingMode::ATTRIBUTE}
+ };
+ for (const auto& candidate : knownModes) if (modeName == candidate.name) return candidate.mode; // names are matched exactly (case-sensitive)
+ throw std::invalid_argument("Invalid value of BufferingMode.");
+ }
+
+ void updateBufferingMode() { // resolves the ENVIRONMENT placeholder into a concrete mode; any other mode is left untouched
+ if (bufferingMode != BufferingMode::ENVIRONMENT) return;
+ BufferingMode resolved = bufferingModeEnvDefault; // fallback for a missing or empty variable
+ try {
+ const char* modeName = getenv("RELPIPE_WRITER_BUFFERING_MODE");
+ if (modeName && modeName[0] != '\0') resolved = decodeBufferingMode(modeName);
+ } catch (const std::invalid_argument&) { // only the decoder's own error means "invalid value"; unrelated failures propagate unchanged
+ throw RelpipeWriterException(L"Invalid value of the RELPIPE_WRITER_BUFFERING_MODE environmental variable.");
+ }
+ if (resolved == BufferingMode::ENVIRONMENT) throw RelpipeWriterException(L"RELPIPE_WRITER_BUFFERING_MODE must not be set to ENVIRONMENT (infinite recursion)");
+ bufferingMode = resolved; // committed only after validation, so a failed call leaves the writer in ENVIRONMENT mode and a retry validates again
+ }
+
public:
StreamRelationalWriter(std::ostream &output) :
@@ -89,6 +115,8 @@
columnCount = attributes.size();
currentColumn = 0;
+ updateBufferingMode();
+
// Write table name and column count:
if (writeHeader) {
integerWriter.writeValue(output, DATA_PART_START);
@@ -114,26 +142,36 @@
columnTypes[c] = typeId;
}
- // TODO: configurable buffer control
- output.flush();
+ if (bufferingMode >= BufferingMode::RELATION && bufferingMode <= BufferingMode::ATTRIBUTE) output.flush();
}
void writeAttribute(const string_t& value) override {
if (currentColumn == 0) integerWriter.writeValue(output, DATA_PART_ROW);
+
// TODO: select writer for each attribute just once in startRelation() instead of looking it each time here
writeString(value, columnTypes[currentColumn]);
- if (++currentColumn == columnCount) currentColumn = 0;
- // TODO: configurable buffer control
- output.flush();
+
+ const bool recordComplete = (++currentColumn == columnCount);
+ if (recordComplete) currentColumn = 0; // the next attribute starts a new record
+ const bool flushNeeded = bufferingMode == BufferingMode::ATTRIBUTE || (recordComplete && bufferingMode == BufferingMode::RECORD);
+ if (flushNeeded) output.flush();
}
void writeAttribute(const void* value, const std::type_info& type) override {
if (currentColumn == 0) integerWriter.writeValue(output, DATA_PART_ROW);
+
// TODO: select writer for each attribute just once in startRelation() instead of looking it each time here
writeRaw(value, type, columnTypes[currentColumn]);
- if (++currentColumn == columnCount) currentColumn = 0;
- // TODO: configurable buffer control
- output.flush();
+
+ const bool recordComplete = (++currentColumn == columnCount);
+ if (recordComplete) currentColumn = 0; // the next attribute starts a new record
+ const bool flushNeeded = bufferingMode == BufferingMode::ATTRIBUTE || (recordComplete && bufferingMode == BufferingMode::RECORD);
+ if (flushNeeded) output.flush();
+ }
+
+ void setBufferingMode(BufferingMode mode, BufferingMode envDefault) override {
+ this->bufferingModeEnvDefault = envDefault; // fallback consulted by updateBufferingMode() only while the mode is ENVIRONMENT
+ this->bufferingMode = mode;
}
};