src/StreamRelationalWriter.h
branchv_0
changeset 58 25c1ff79297c
parent 52 785fc49b0a55
child 59 4fce579bed22
equal deleted inserted replaced
57:08c392fc66e4 58:25c1ff79297c
    60 	 * types of columns in the current table
    60 	 * types of columns in the current table
    61 	 */
    61 	 */
    62 	std::vector<TypeId> columnTypes;
    62 	std::vector<TypeId> columnTypes;
    63 
    63 
    64 	void writeString(const string_t &stringValue, const TypeId typeId) {
    64 	void writeString(const string_t &stringValue, const TypeId typeId) {
       
    65 		// TODO: cache writers at given positions
    65 		for (DataTypeWriterBase* writer : writers) if (writer->supports(typeId)) return writer->writeString(output, stringValue);
    66 		for (DataTypeWriterBase* writer : writers) if (writer->supports(typeId)) return writer->writeString(output, stringValue);
    66 		throw RelpipeWriterException(L"Unsupported data type: " + static_cast<integer_t> (typeId));
    67 		throw RelpipeWriterException(L"Unsupported data type: " + static_cast<integer_t> (typeId));
    67 	}
    68 	}
    68 
    69 
    69 	void writeRaw(const void* value, const type_info& typeInfo, const TypeId typeId) {
    70 	void writeRaw(const void* value, const type_info& typeInfo, const TypeId typeId) {
       
    71 		// TODO: cache writers at given positions
    70 		for (DataTypeWriterBase* writer : writers) if (writer->supports(typeId)) return writer->writeRaw(output, value, typeInfo);
    72 		for (DataTypeWriterBase* writer : writers) if (writer->supports(typeId)) return writer->writeRaw(output, value, typeInfo);
    71 		throw RelpipeWriterException(L"Unsupported data type: " + static_cast<integer_t> (typeId));
    73 		throw RelpipeWriterException(L"Unsupported data type: " + static_cast<integer_t> (typeId));
    72 	}
    74 	}
    73 
    75 
    74 public:
    76 public:
   110 			TypeId typeId = attributes[c].typeId;
   112 			TypeId typeId = attributes[c].typeId;
   111 			if (writeHeader) integerWriter.writeValue(output, static_cast<integer_t> (typeId));
   113 			if (writeHeader) integerWriter.writeValue(output, static_cast<integer_t> (typeId));
   112 			columnTypes[c] = typeId;
   114 			columnTypes[c] = typeId;
   113 		}
   115 		}
   114 
   116 
       
   117 		// TODO: configurable buffer control
       
   118 		output.flush();
   115 	}
   119 	}
   116 
   120 
   117 	void writeAttribute(const string_t& value) override {
   121 	void writeAttribute(const string_t& value) override {
   118 		if (currentColumn == 0) integerWriter.writeValue(output, DATA_PART_ROW);
   122 		if (currentColumn == 0) integerWriter.writeValue(output, DATA_PART_ROW);
   119 		// TODO: select writer for each attribute just once in startRelation() instead of looking it each time here
   123 		// TODO: select writer for each attribute just once in startRelation() instead of looking it each time here
   120 		writeString(value, columnTypes[currentColumn]);
   124 		writeString(value, columnTypes[currentColumn]);
   121 		if (++currentColumn == columnCount) currentColumn = 0;
   125 		if (++currentColumn == columnCount) currentColumn = 0;
       
   126 		// TODO: configurable buffer control
       
   127 		output.flush();
   122 	}
   128 	}
   123 
   129 
   124 	void writeAttribute(const void* value, const std::type_info& type) override {
   130 	void writeAttribute(const void* value, const std::type_info& type) override {
   125 		if (currentColumn == 0) integerWriter.writeValue(output, DATA_PART_ROW);
   131 		if (currentColumn == 0) integerWriter.writeValue(output, DATA_PART_ROW);
   126 		// TODO: select writer for each attribute just once in startRelation() instead of looking it each time here
   132 		// TODO: select writer for each attribute just once in startRelation() instead of looking it each time here
   127 		writeRaw(value, type, columnTypes[currentColumn]);
   133 		writeRaw(value, type, columnTypes[currentColumn]);
   128 		if (++currentColumn == columnCount) currentColumn = 0;
   134 		if (++currentColumn == columnCount) currentColumn = 0;
       
   135 		// TODO: configurable buffer control
       
   136 		output.flush();
   129 	}
   137 	}
   130 
   138 
   131 };
   139 };
   132 
   140 
   133 }
   141 }