src/StreamRelationalWriter.h
branchv_0
changeset 59 4fce579bed22
parent 58 25c1ff79297c
child 60 1b20c1e03065
equal deleted inserted replaced
58:25c1ff79297c 59:4fce579bed22
    18  * along with this program. If not, see <http://www.gnu.org/licenses/>.
    18  * along with this program. If not, see <http://www.gnu.org/licenses/>.
    19  */
    19  */
    20 #pragma once
    20 #pragma once
    21 
    21 
    22 #include <string>
    22 #include <string>
       
    23 #include <cstring>
    23 #include <iostream>
    24 #include <iostream>
    24 #include <vector>
    25 #include <vector>
       
    26 #include <stdexcept>
    25 
    27 
    26 #include <relpipe/protocol/constants.h>
    28 #include <relpipe/protocol/constants.h>
    27 
    29 
    28 #include "../include/relpipe/writer/typedefs.h"
    30 #include "../include/relpipe/writer/typedefs.h"
    29 #include "../include/relpipe/writer/RelationalWriter.h"
    31 #include "../include/relpipe/writer/RelationalWriter.h"
    44 	std::ostream &output;
    46 	std::ostream &output;
    45 	types::BooleanDataTypeWriter booleanWriter;
    47 	types::BooleanDataTypeWriter booleanWriter;
    46 	types::IntegerDataTypeWriter integerWriter;
    48 	types::IntegerDataTypeWriter integerWriter;
    47 	types::StringDataTypeWriter stringWriter;
    49 	types::StringDataTypeWriter stringWriter;
    48 	std::vector<DataTypeWriterBase*> writers = {&booleanWriter, &integerWriter, &stringWriter};
    50 	std::vector<DataTypeWriterBase*> writers = {&booleanWriter, &integerWriter, &stringWriter};
       
    51 	BufferingMode bufferingMode = BufferingMode::ENVIRONMENT;
       
    52 	BufferingMode bufferingModeEnvDefault = BufferingMode::AUTO;
    49 
    53 
    50 	/**
    54 	/**
    51 	 * count of columns in the current table
    55 	 * count of columns in the current table
    52 	 */
    56 	 */
    53 	integer_t columnCount;
    57 	integer_t columnCount;
    71 		// TODO: cache writers at given positions
    75 		// TODO: cache writers at given positions
    72 		for (DataTypeWriterBase* writer : writers) if (writer->supports(typeId)) return writer->writeRaw(output, value, typeInfo);
    76 		for (DataTypeWriterBase* writer : writers) if (writer->supports(typeId)) return writer->writeRaw(output, value, typeInfo);
    73 		throw RelpipeWriterException(L"Unsupported data type: " + static_cast<integer_t> (typeId));
    77 		throw RelpipeWriterException(L"Unsupported data type: " + static_cast<integer_t> (typeId));
    74 	}
    78 	}
    75 
    79 
       
    80 	BufferingMode decodeBufferingMode(std::string modeName) {
       
    81 		if (modeName == "AUTO") return BufferingMode::AUTO;
       
    82 		else if (modeName == "ENVIRONMENT") return BufferingMode::ENVIRONMENT;
       
    83 		else if (modeName == "RELATION") return BufferingMode::RELATION;
       
    84 		else if (modeName == "RECORD") return BufferingMode::RECORD;
       
    85 		else if (modeName == "ATTRIBUTE") return BufferingMode::ATTRIBUTE;
       
    86 		else throw std::invalid_argument("Invalid value of BufferingMode.");
       
    87 	}
       
    88 
       
    89 	void updateBufferingMode() {
       
    90 		if (bufferingMode == BufferingMode::ENVIRONMENT) {
       
    91 			bufferingMode = bufferingModeEnvDefault;
       
    92 			try {
       
    93 				char* modeName = getenv("RELPIPE_WRITER_BUFFERING_MODE");
       
    94 				if (modeName && strlen(modeName)) bufferingMode = decodeBufferingMode(modeName);
       
    95 			} catch (...) {
       
    96 				throw RelpipeWriterException(L"Invalid value of the RELPIPE_WRITER_BUFFERING_MODE environmental variable.");
       
    97 			}
       
    98 			if (bufferingMode == BufferingMode::ENVIRONMENT) throw RelpipeWriterException(L"RELPIPE_WRITER_BUFFERING_MODE must not be set to ENVIRONMENT (infinite recursion)");
       
    99 		}
       
   100 	}
       
   101 
    76 public:
   102 public:
    77 
   103 
    78 	StreamRelationalWriter(std::ostream &output) :
   104 	StreamRelationalWriter(std::ostream &output) :
    79 	output(output) {
   105 	output(output) {
    80 	}
   106 	}
    86 
   112 
    87 	void startRelation(string_t name, std::vector<AttributeMetadata> attributes, boolean_t writeHeader) override {
   113 	void startRelation(string_t name, std::vector<AttributeMetadata> attributes, boolean_t writeHeader) override {
    88 		string_t tableName = name;
   114 		string_t tableName = name;
    89 		columnCount = attributes.size();
   115 		columnCount = attributes.size();
    90 		currentColumn = 0;
   116 		currentColumn = 0;
       
   117 
       
   118 		updateBufferingMode();
    91 
   119 
    92 		// Write table name and column count:
   120 		// Write table name and column count:
    93 		if (writeHeader) {
   121 		if (writeHeader) {
    94 			integerWriter.writeValue(output, DATA_PART_START);
   122 			integerWriter.writeValue(output, DATA_PART_START);
    95 			stringWriter.writeValue(output, tableName);
   123 			stringWriter.writeValue(output, tableName);
   112 			TypeId typeId = attributes[c].typeId;
   140 			TypeId typeId = attributes[c].typeId;
   113 			if (writeHeader) integerWriter.writeValue(output, static_cast<integer_t> (typeId));
   141 			if (writeHeader) integerWriter.writeValue(output, static_cast<integer_t> (typeId));
   114 			columnTypes[c] = typeId;
   142 			columnTypes[c] = typeId;
   115 		}
   143 		}
   116 
   144 
   117 		// TODO: configurable buffer control
   145 		if (bufferingMode >= BufferingMode::RELATION && bufferingMode <= BufferingMode::ATTRIBUTE) output.flush();
   118 		output.flush();
       
   119 	}
   146 	}
   120 
   147 
   121 	void writeAttribute(const string_t& value) override {
   148 	void writeAttribute(const string_t& value) override {
   122 		if (currentColumn == 0) integerWriter.writeValue(output, DATA_PART_ROW);
   149 		if (currentColumn == 0) integerWriter.writeValue(output, DATA_PART_ROW);
       
   150 
   123 		// TODO: select writer for each attribute just once in startRelation() instead of looking it each time here
   151 		// TODO: select writer for each attribute just once in startRelation() instead of looking it each time here
   124 		writeString(value, columnTypes[currentColumn]);
   152 		writeString(value, columnTypes[currentColumn]);
   125 		if (++currentColumn == columnCount) currentColumn = 0;
   153 
   126 		// TODO: configurable buffer control
   154 		bool endOfRedord = ++currentColumn == columnCount;
   127 		output.flush();
   155 		if (endOfRedord) currentColumn = 0;
       
   156 
       
   157 		if (bufferingMode == BufferingMode::ATTRIBUTE || (endOfRedord && bufferingMode == BufferingMode::RECORD)) output.flush();
   128 	}
   158 	}
   129 
   159 
   130 	void writeAttribute(const void* value, const std::type_info& type) override {
   160 	void writeAttribute(const void* value, const std::type_info& type) override {
   131 		if (currentColumn == 0) integerWriter.writeValue(output, DATA_PART_ROW);
   161 		if (currentColumn == 0) integerWriter.writeValue(output, DATA_PART_ROW);
       
   162 
   132 		// TODO: select writer for each attribute just once in startRelation() instead of looking it each time here
   163 		// TODO: select writer for each attribute just once in startRelation() instead of looking it each time here
   133 		writeRaw(value, type, columnTypes[currentColumn]);
   164 		writeRaw(value, type, columnTypes[currentColumn]);
   134 		if (++currentColumn == columnCount) currentColumn = 0;
   165 
   135 		// TODO: configurable buffer control
   166 		bool endOfRedord = ++currentColumn == columnCount;
   136 		output.flush();
   167 		if (endOfRedord) currentColumn = 0;
       
   168 
       
   169 		if (bufferingMode == BufferingMode::ATTRIBUTE || (endOfRedord && bufferingMode == BufferingMode::RECORD)) output.flush();
       
   170 	}
       
   171 
       
   172 	void setBufferingMode(BufferingMode mode, BufferingMode envDefault) override {
       
   173 		bufferingMode = mode;
       
   174 		bufferingModeEnvDefault = envDefault;
   137 	}
   175 	}
   138 
   176 
   139 };
   177 };
   140 
   178 
   141 }
   179 }