src/StreamRelationalWriter.h
branchv_0
changeset 59 4fce579bed22
parent 58 25c1ff79297c
child 60 1b20c1e03065
--- a/src/StreamRelationalWriter.h	Sat Apr 23 21:09:24 2022 +0200
+++ b/src/StreamRelationalWriter.h	Sat Apr 23 23:23:29 2022 +0200
@@ -20,8 +20,10 @@
 #pragma once
 
 #include <string>
+#include <cstring>
 #include <iostream>
 #include <vector>
+#include <stdexcept>
 
 #include <relpipe/protocol/constants.h>
 
@@ -46,6 +48,8 @@
 	types::IntegerDataTypeWriter integerWriter;
 	types::StringDataTypeWriter stringWriter;
 	std::vector<DataTypeWriterBase*> writers = {&booleanWriter, &integerWriter, &stringWriter};
+	BufferingMode bufferingMode = BufferingMode::ENVIRONMENT;
+	BufferingMode bufferingModeEnvDefault = BufferingMode::AUTO;
 
 	/**
 	 * count of columns in the current table
@@ -73,6 +77,28 @@
 		throw RelpipeWriterException(L"Unsupported data type: " + static_cast<integer_t> (typeId));
 	}
 
+	BufferingMode decodeBufferingMode(std::string modeName) {
+		if (modeName == "AUTO") return BufferingMode::AUTO;
+		else if (modeName == "ENVIRONMENT") return BufferingMode::ENVIRONMENT;
+		else if (modeName == "RELATION") return BufferingMode::RELATION;
+		else if (modeName == "RECORD") return BufferingMode::RECORD;
+		else if (modeName == "ATTRIBUTE") return BufferingMode::ATTRIBUTE;
+		else throw std::invalid_argument("Invalid value of BufferingMode.");
+	}
+
+	void updateBufferingMode() {
+		if (bufferingMode == BufferingMode::ENVIRONMENT) {
+			bufferingMode = bufferingModeEnvDefault;
+			try {
+				char* modeName = getenv("RELPIPE_WRITER_BUFFERING_MODE");
+				if (modeName && strlen(modeName)) bufferingMode = decodeBufferingMode(modeName);
+			} catch (...) {
+				throw RelpipeWriterException(L"Invalid value of the RELPIPE_WRITER_BUFFERING_MODE environmental variable.");
+			}
+			if (bufferingMode == BufferingMode::ENVIRONMENT) throw RelpipeWriterException(L"RELPIPE_WRITER_BUFFERING_MODE must not be set to ENVIRONMENT (infinite recursion)");
+		}
+	}
+
 public:
 
 	StreamRelationalWriter(std::ostream &output) :
@@ -89,6 +115,8 @@
 		columnCount = attributes.size();
 		currentColumn = 0;
 
+		updateBufferingMode();
+
 		// Write table name and column count:
 		if (writeHeader) {
 			integerWriter.writeValue(output, DATA_PART_START);
@@ -114,26 +142,36 @@
 			columnTypes[c] = typeId;
 		}
 
-		// TODO: configurable buffer control
-		output.flush();
+		if (bufferingMode >= BufferingMode::RELATION && bufferingMode <= BufferingMode::ATTRIBUTE) output.flush();
 	}
 
 	void writeAttribute(const string_t& value) override {
 		if (currentColumn == 0) integerWriter.writeValue(output, DATA_PART_ROW);
+
 		// TODO: select writer for each attribute just once in startRelation() instead of looking it each time here
 		writeString(value, columnTypes[currentColumn]);
-		if (++currentColumn == columnCount) currentColumn = 0;
-		// TODO: configurable buffer control
-		output.flush();
+
+		bool endOfRedord = ++currentColumn == columnCount;
+		if (endOfRedord) currentColumn = 0;
+
+		if (bufferingMode == BufferingMode::ATTRIBUTE || (endOfRedord && bufferingMode == BufferingMode::RECORD)) output.flush();
 	}
 
 	void writeAttribute(const void* value, const std::type_info& type) override {
 		if (currentColumn == 0) integerWriter.writeValue(output, DATA_PART_ROW);
+
 		// TODO: select writer for each attribute just once in startRelation() instead of looking it each time here
 		writeRaw(value, type, columnTypes[currentColumn]);
-		if (++currentColumn == columnCount) currentColumn = 0;
-		// TODO: configurable buffer control
-		output.flush();
+
+		bool endOfRedord = ++currentColumn == columnCount;
+		if (endOfRedord) currentColumn = 0;
+
+		if (bufferingMode == BufferingMode::ATTRIBUTE || (endOfRedord && bufferingMode == BufferingMode::RECORD)) output.flush();
+	}
+
+	void setBufferingMode(BufferingMode mode, BufferingMode envDefault) override {
+		bufferingMode = mode;
+		bufferingModeEnvDefault = envDefault;
 	}
 
 };