--- a/src/StreamRelationalWriter.h Sat Apr 23 21:09:24 2022 +0200
+++ b/src/StreamRelationalWriter.h Sat Apr 23 23:23:29 2022 +0200
@@ -20,8 +20,10 @@
#pragma once
#include <string>
+#include <cstring>
#include <iostream>
#include <vector>
+#include <stdexcept>
#include <relpipe/protocol/constants.h>
@@ -46,6 +48,8 @@
types::IntegerDataTypeWriter integerWriter;
types::StringDataTypeWriter stringWriter;
std::vector<DataTypeWriterBase*> writers = {&booleanWriter, &integerWriter, &stringWriter};
+ BufferingMode bufferingMode = BufferingMode::ENVIRONMENT;
+ BufferingMode bufferingModeEnvDefault = BufferingMode::AUTO;
/**
* count of columns in the current table
@@ -73,6 +77,28 @@
throw RelpipeWriterException(L"Unsupported data type: " + static_cast<integer_t> (typeId));
}
+ BufferingMode decodeBufferingMode(std::string modeName) {
+ if (modeName == "AUTO") return BufferingMode::AUTO;
+ else if (modeName == "ENVIRONMENT") return BufferingMode::ENVIRONMENT;
+ else if (modeName == "RELATION") return BufferingMode::RELATION;
+ else if (modeName == "RECORD") return BufferingMode::RECORD;
+ else if (modeName == "ATTRIBUTE") return BufferingMode::ATTRIBUTE;
+ else throw std::invalid_argument("Invalid value of BufferingMode.");
+ }
+
+ void updateBufferingMode() {
+ if (bufferingMode == BufferingMode::ENVIRONMENT) {
+ bufferingMode = bufferingModeEnvDefault;
+ try {
+ char* modeName = getenv("RELPIPE_WRITER_BUFFERING_MODE");
+ if (modeName && strlen(modeName)) bufferingMode = decodeBufferingMode(modeName);
+ } catch (...) {
+ throw RelpipeWriterException(L"Invalid value of the RELPIPE_WRITER_BUFFERING_MODE environmental variable.");
+ }
+ if (bufferingMode == BufferingMode::ENVIRONMENT) throw RelpipeWriterException(L"RELPIPE_WRITER_BUFFERING_MODE must not be set to ENVIRONMENT (infinite recursion)");
+ }
+ }
+
public:
StreamRelationalWriter(std::ostream &output) :
@@ -89,6 +115,8 @@
columnCount = attributes.size();
currentColumn = 0;
+ updateBufferingMode();
+
// Write table name and column count:
if (writeHeader) {
integerWriter.writeValue(output, DATA_PART_START);
@@ -114,26 +142,36 @@
columnTypes[c] = typeId;
}
- // TODO: configurable buffer control
- output.flush();
+ if (bufferingMode >= BufferingMode::RELATION && bufferingMode <= BufferingMode::ATTRIBUTE) output.flush();
}
void writeAttribute(const string_t& value) override {
if (currentColumn == 0) integerWriter.writeValue(output, DATA_PART_ROW);
+
// TODO: select writer for each attribute just once in startRelation() instead of looking it each time here
writeString(value, columnTypes[currentColumn]);
- if (++currentColumn == columnCount) currentColumn = 0;
- // TODO: configurable buffer control
- output.flush();
+
+ bool endOfRedord = ++currentColumn == columnCount;
+ if (endOfRedord) currentColumn = 0;
+
+ if (bufferingMode == BufferingMode::ATTRIBUTE || (endOfRedord && bufferingMode == BufferingMode::RECORD)) output.flush();
}
void writeAttribute(const void* value, const std::type_info& type) override {
if (currentColumn == 0) integerWriter.writeValue(output, DATA_PART_ROW);
+
// TODO: select writer for each attribute just once in startRelation() instead of looking it each time here
writeRaw(value, type, columnTypes[currentColumn]);
- if (++currentColumn == columnCount) currentColumn = 0;
- // TODO: configurable buffer control
- output.flush();
+
+ bool endOfRedord = ++currentColumn == columnCount;
+ if (endOfRedord) currentColumn = 0;
+
+ if (bufferingMode == BufferingMode::ATTRIBUTE || (endOfRedord && bufferingMode == BufferingMode::RECORD)) output.flush();
+ }
+
+ void setBufferingMode(BufferingMode mode, BufferingMode envDefault) override {
+ bufferingMode = mode;
+ bufferingModeEnvDefault = envDefault;
}
};