44 std::ostream &output; |
46 std::ostream &output; |
45 types::BooleanDataTypeWriter booleanWriter; |
47 types::BooleanDataTypeWriter booleanWriter; |
46 types::IntegerDataTypeWriter integerWriter; |
48 types::IntegerDataTypeWriter integerWriter; |
47 types::StringDataTypeWriter stringWriter; |
49 types::StringDataTypeWriter stringWriter; |
48 std::vector<DataTypeWriterBase*> writers = {&booleanWriter, &integerWriter, &stringWriter}; |
50 std::vector<DataTypeWriterBase*> writers = {&booleanWriter, &integerWriter, &stringWriter}; |
|
51 BufferingMode bufferingMode = BufferingMode::ENVIRONMENT; |
|
52 BufferingMode bufferingModeEnvDefault = BufferingMode::AUTO; |
49 |
53 |
50 /** |
54 /** |
51 * count of columns in the current table |
55 * count of columns in the current table |
52 */ |
56 */ |
53 integer_t columnCount; |
57 integer_t columnCount; |
71 // TODO: cache writers at given positions |
75 // TODO: cache writers at given positions |
72 for (DataTypeWriterBase* writer : writers) if (writer->supports(typeId)) return writer->writeRaw(output, value, typeInfo); |
76 for (DataTypeWriterBase* writer : writers) if (writer->supports(typeId)) return writer->writeRaw(output, value, typeInfo); |
73 throw RelpipeWriterException(L"Unsupported data type: " + static_cast<integer_t> (typeId)); |
77 throw RelpipeWriterException(L"Unsupported data type: " + static_cast<integer_t> (typeId)); |
74 } |
78 } |
75 |
79 |
|
80 BufferingMode decodeBufferingMode(std::string modeName) { |
|
81 if (modeName == "AUTO") return BufferingMode::AUTO; |
|
82 else if (modeName == "ENVIRONMENT") return BufferingMode::ENVIRONMENT; |
|
83 else if (modeName == "RELATION") return BufferingMode::RELATION; |
|
84 else if (modeName == "RECORD") return BufferingMode::RECORD; |
|
85 else if (modeName == "ATTRIBUTE") return BufferingMode::ATTRIBUTE; |
|
86 else throw std::invalid_argument("Invalid value of BufferingMode."); |
|
87 } |
|
88 |
|
89 void updateBufferingMode() { |
|
90 if (bufferingMode == BufferingMode::ENVIRONMENT) { |
|
91 bufferingMode = bufferingModeEnvDefault; |
|
92 try { |
|
93 char* modeName = getenv("RELPIPE_WRITER_BUFFERING_MODE"); |
|
94 if (modeName && strlen(modeName)) bufferingMode = decodeBufferingMode(modeName); |
|
95 } catch (...) { |
|
96 throw RelpipeWriterException(L"Invalid value of the RELPIPE_WRITER_BUFFERING_MODE environmental variable."); |
|
97 } |
|
98 if (bufferingMode == BufferingMode::ENVIRONMENT) throw RelpipeWriterException(L"RELPIPE_WRITER_BUFFERING_MODE must not be set to ENVIRONMENT (infinite recursion)"); |
|
99 } |
|
100 } |
|
101 |
76 public: |
102 public: |
77 |
103 |
78 StreamRelationalWriter(std::ostream &output) : |
104 StreamRelationalWriter(std::ostream &output) : |
79 output(output) { |
105 output(output) { |
80 } |
106 } |
86 |
112 |
87 void startRelation(string_t name, std::vector<AttributeMetadata> attributes, boolean_t writeHeader) override { |
113 void startRelation(string_t name, std::vector<AttributeMetadata> attributes, boolean_t writeHeader) override { |
88 string_t tableName = name; |
114 string_t tableName = name; |
89 columnCount = attributes.size(); |
115 columnCount = attributes.size(); |
90 currentColumn = 0; |
116 currentColumn = 0; |
|
117 |
|
118 updateBufferingMode(); |
91 |
119 |
92 // Write table name and column count: |
120 // Write table name and column count: |
93 if (writeHeader) { |
121 if (writeHeader) { |
94 integerWriter.writeValue(output, DATA_PART_START); |
122 integerWriter.writeValue(output, DATA_PART_START); |
95 stringWriter.writeValue(output, tableName); |
123 stringWriter.writeValue(output, tableName); |
112 TypeId typeId = attributes[c].typeId; |
140 TypeId typeId = attributes[c].typeId; |
113 if (writeHeader) integerWriter.writeValue(output, static_cast<integer_t> (typeId)); |
141 if (writeHeader) integerWriter.writeValue(output, static_cast<integer_t> (typeId)); |
114 columnTypes[c] = typeId; |
142 columnTypes[c] = typeId; |
115 } |
143 } |
116 |
144 |
117 // TODO: configurable buffer control |
145 if (bufferingMode >= BufferingMode::RELATION && bufferingMode <= BufferingMode::ATTRIBUTE) output.flush(); |
118 output.flush(); |
|
119 } |
146 } |
120 |
147 |
121 void writeAttribute(const string_t& value) override { |
148 void writeAttribute(const string_t& value) override { |
122 if (currentColumn == 0) integerWriter.writeValue(output, DATA_PART_ROW); |
149 if (currentColumn == 0) integerWriter.writeValue(output, DATA_PART_ROW); |
|
150 |
123 // TODO: select writer for each attribute just once in startRelation() instead of looking it each time here |
151 // TODO: select writer for each attribute just once in startRelation() instead of looking it each time here |
124 writeString(value, columnTypes[currentColumn]); |
152 writeString(value, columnTypes[currentColumn]); |
125 if (++currentColumn == columnCount) currentColumn = 0; |
153 |
126 // TODO: configurable buffer control |
154 bool endOfRedord = ++currentColumn == columnCount; |
127 output.flush(); |
155 if (endOfRedord) currentColumn = 0; |
|
156 |
|
157 if (bufferingMode == BufferingMode::ATTRIBUTE || (endOfRedord && bufferingMode == BufferingMode::RECORD)) output.flush(); |
128 } |
158 } |
129 |
159 |
130 void writeAttribute(const void* value, const std::type_info& type) override { |
160 void writeAttribute(const void* value, const std::type_info& type) override { |
131 if (currentColumn == 0) integerWriter.writeValue(output, DATA_PART_ROW); |
161 if (currentColumn == 0) integerWriter.writeValue(output, DATA_PART_ROW); |
|
162 |
132 // TODO: select writer for each attribute just once in startRelation() instead of looking it each time here |
163 // TODO: select writer for each attribute just once in startRelation() instead of looking it each time here |
133 writeRaw(value, type, columnTypes[currentColumn]); |
164 writeRaw(value, type, columnTypes[currentColumn]); |
134 if (++currentColumn == columnCount) currentColumn = 0; |
165 |
135 // TODO: configurable buffer control |
166 bool endOfRedord = ++currentColumn == columnCount; |
136 output.flush(); |
167 if (endOfRedord) currentColumn = 0; |
|
168 |
|
169 if (bufferingMode == BufferingMode::ATTRIBUTE || (endOfRedord && bufferingMode == BufferingMode::RECORD)) output.flush(); |
|
170 } |
|
171 |
|
172 void setBufferingMode(BufferingMode mode, BufferingMode envDefault) override { |
|
173 bufferingMode = mode; |
|
174 bufferingModeEnvDefault = envDefault; |
137 } |
175 } |
138 |
176 |
139 }; |
177 }; |
140 |
178 |
141 } |
179 } |