BufferingMode: configurable modes that control when flush() is called v_0
author František Kučera <franta-hg@frantovo.cz>
Sat, 23 Apr 2022 23:23:29 +0200
branch v_0
changeset 59 4fce579bed22
parent 58 25c1ff79297c
child 60 1b20c1e03065
BufferingMode: configurable modes that control when flush() is called set through the environmental variable RELPIPE_WRITER_BUFFERING_MODE or through the API method setBufferingMode()
include/relpipe/writer/BufferingMode.h
include/relpipe/writer/RelationalWriter.h
src/StreamRelationalWriter.h
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/include/relpipe/writer/BufferingMode.h	Sat Apr 23 23:23:29 2022 +0200
@@ -0,0 +1,62 @@
+/**
+ * Relational pipes (library)
+ * Copyright © 2022 František Kučera (Frantovo.cz, GlobalCode.info)
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the:
+ *  - GNU Lesser General Public License as published by the Free Software Foundation;
+ *    version 3 of the License or (at your option)
+ *  - GNU General Public License as published by the Free Software Foundation;
+ *    version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#pragma once
+
+namespace relpipe {
+namespace writer {
+
+/**
+ * Relational output might be – and usually is – buffered
+ * i.e. relational data are written to a buffer and flushed later and sent to the actual output stream.
+ * 
+ * Through these modes we can control when the flush is done.
+ * 
+ * Recommendation:
+ *  - batch processing: AUTO
+ *  - interactive processing: RECORD
+ *  - tools should not set the mode explicitly unless asked by the user or unless a specific mode is obvious for the given task;
+ *    then ENVIRONMENT mode is the initial one and it defaults to AUTO when user has not set the environmental variable
+ */
+enum class BufferingMode { // order matters: StreamRelationalWriter compares modes by range (RELATION..ATTRIBUTE)
+	/**
+	 * Output buffer is not explicitly flushed, it is written when full.
+	 */
+	AUTO,
+	/**
+	 * Mode is determined by the environmental variable RELPIPE_WRITER_BUFFERING_MODE.
+	 * If the variable is missing or empty, the default mode (AUTO unless overridden through setBufferingMode()) is used; an unrecognized value is reported as an error.
+	 */
+	ENVIRONMENT,
+	/**
+	 * Output buffer is flushed at least at the end of each relation.
+	 */
+	RELATION,
+	/**
+	 * Output buffer is flushed at least at the end of each record.
+	 */
+	RECORD,
+	/**
+	 * Output buffer is flushed at least at the end of each attribute.
+	 */
+	ATTRIBUTE
+};
+
+}
+}
--- a/include/relpipe/writer/RelationalWriter.h	Sat Apr 23 21:09:24 2022 +0200
+++ b/include/relpipe/writer/RelationalWriter.h	Sat Apr 23 23:23:29 2022 +0200
@@ -25,6 +25,7 @@
 #include "typedefs.h"
 #include "TypeId.h"
 #include "AttributeMetadata.h"
+#include "BufferingMode.h"
 
 namespace relpipe {
 namespace writer {
@@ -71,6 +72,13 @@
 	 */
 	virtual void writeAttribute(const void* value, const std::type_info& type) = 0;
 
+
+	/**
+	 * @param mode buffering mode to be set
+	 * @param envDefault if mode is ENVIRONMENT but given environmental variable is not set, this default is used
+	 */
+	virtual void setBufferingMode(BufferingMode mode, BufferingMode envDefault = BufferingMode::AUTO) = 0;
+
 };
 
 }
--- a/src/StreamRelationalWriter.h	Sat Apr 23 21:09:24 2022 +0200
+++ b/src/StreamRelationalWriter.h	Sat Apr 23 23:23:29 2022 +0200
@@ -20,8 +20,10 @@
 #pragma once
 
 #include <string>
+#include <cstring>
 #include <iostream>
 #include <vector>
+#include <stdexcept>
 
 #include <relpipe/protocol/constants.h>
 
@@ -46,6 +48,8 @@
 	types::IntegerDataTypeWriter integerWriter;
 	types::StringDataTypeWriter stringWriter;
 	std::vector<DataTypeWriterBase*> writers = {&booleanWriter, &integerWriter, &stringWriter};
+	BufferingMode bufferingMode = BufferingMode::ENVIRONMENT;
+	BufferingMode bufferingModeEnvDefault = BufferingMode::AUTO;
 
 	/**
 	 * count of columns in the current table
@@ -73,6 +77,28 @@
 		throw RelpipeWriterException(L"Unsupported data type: " + static_cast<integer_t> (typeId));
 	}
 
+	BufferingMode decodeBufferingMode(std::string modeName) { // maps the textual mode name to the enumerator of the same name
+		if (modeName == "AUTO") return BufferingMode::AUTO;
+		if (modeName == "ENVIRONMENT") return BufferingMode::ENVIRONMENT;
+		if (modeName == "RELATION") return BufferingMode::RELATION;
+		if (modeName == "RECORD") return BufferingMode::RECORD;
+		if (modeName == "ATTRIBUTE") return BufferingMode::ATTRIBUTE;
+		throw std::invalid_argument("Invalid value of BufferingMode."); // any other name (the match is case-sensitive) is rejected
+	}
+
+	void updateBufferingMode() { // resolves ENVIRONMENT to a concrete mode; throws RelpipeWriterException and leaves the mode untouched on failure
+		if (bufferingMode != BufferingMode::ENVIRONMENT) return; // an explicitly selected mode is kept as is
+		BufferingMode resolved = bufferingModeEnvDefault; // used when the variable is missing or empty
+		try {
+			const char* modeName = getenv("RELPIPE_WRITER_BUFFERING_MODE");
+			if (modeName && strlen(modeName)) resolved = decodeBufferingMode(modeName);
+		} catch (...) {
+			throw RelpipeWriterException(L"Invalid value of the RELPIPE_WRITER_BUFFERING_MODE environmental variable.");
+		}
+		if (resolved == BufferingMode::ENVIRONMENT) throw RelpipeWriterException(L"RELPIPE_WRITER_BUFFERING_MODE must not be set to ENVIRONMENT (infinite recursion)");
+		bufferingMode = resolved; // assigned only after validation, so a failed call is not silently ignored by the next one
+	}
+
 public:
 
 	StreamRelationalWriter(std::ostream &output) :
@@ -89,6 +115,8 @@
 		columnCount = attributes.size();
 		currentColumn = 0;
 
+		updateBufferingMode();
+
 		// Write table name and column count:
 		if (writeHeader) {
 			integerWriter.writeValue(output, DATA_PART_START);
@@ -114,26 +142,36 @@
 			columnTypes[c] = typeId;
 		}
 
-		// TODO: configurable buffer control
-		output.flush();
+		if (bufferingMode >= BufferingMode::RELATION && bufferingMode <= BufferingMode::ATTRIBUTE) output.flush();
 	}
 
 	void writeAttribute(const string_t& value) override {
 		if (currentColumn == 0) integerWriter.writeValue(output, DATA_PART_ROW);
+
 		// TODO: select writer for each attribute just once in startRelation() instead of looking it each time here
 		writeString(value, columnTypes[currentColumn]);
-		if (++currentColumn == columnCount) currentColumn = 0;
-		// TODO: configurable buffer control
-		output.flush();
+
+		bool endOfRedord = ++currentColumn == columnCount;
+		if (endOfRedord) currentColumn = 0;
+
+		if (bufferingMode == BufferingMode::ATTRIBUTE || (endOfRedord && bufferingMode == BufferingMode::RECORD)) output.flush();
 	}
 
 	void writeAttribute(const void* value, const std::type_info& type) override {
 		if (currentColumn == 0) integerWriter.writeValue(output, DATA_PART_ROW);
+
 		// TODO: select writer for each attribute just once in startRelation() instead of looking it each time here
 		writeRaw(value, type, columnTypes[currentColumn]);
-		if (++currentColumn == columnCount) currentColumn = 0;
-		// TODO: configurable buffer control
-		output.flush();
+
+		bool endOfRedord = ++currentColumn == columnCount;
+		if (endOfRedord) currentColumn = 0;
+
+		if (bufferingMode == BufferingMode::ATTRIBUTE || (endOfRedord && bufferingMode == BufferingMode::RECORD)) output.flush();
+	}
+
+	void setBufferingMode(BufferingMode mode, BufferingMode envDefault) override { // only stores both values; ENVIRONMENT is resolved in updateBufferingMode()
+		bufferingModeEnvDefault = envDefault;
+		bufferingMode = mode;
 	}
 
 };