20 namespace relpipe { |
20 namespace relpipe { |
21 namespace reader { |
21 namespace reader { |
22 |
22 |
23 using namespace relpipe::protocol; |
23 using namespace relpipe::protocol; |
24 |
24 |
|
25 using StringHandler = relpipe::reader::handlers::RelationalReaderStringHadler; |
|
26 using ValuesHandler = relpipe::reader::handlers::RelationalReaderValueHadler; |
|
27 |
25 class StreamRelationalReader : public RelationalReader { |
28 class StreamRelationalReader : public RelationalReader { |
26 private: |
29 private: |
27 std::istream &input; |
30 std::istream &input; |
28 types::BooleanDataTypeReader booleanReader; |
31 types::BooleanDataTypeReader booleanReader; |
29 types::IntegerDataTypeReader integerReader; |
32 types::IntegerDataTypeReader integerReader; |
30 types::StringDataTypeReader stringReader; |
33 types::StringDataTypeReader stringReader; |
31 std::vector<DataTypeReaderBase*> readers = {&booleanReader, &integerReader, &stringReader}; |
34 std::vector<DataTypeReaderBase*> readers = {&booleanReader, &integerReader, &stringReader}; |
32 |
35 |
33 std::vector<handlers::RelationalReaderStringHadler*> stringHandlers; |
36 std::vector<StringHandler*> stringHandlers; |
34 std::vector<handlers::RelationalReaderValueHadler*> valueHandlers; |
37 std::vector<ValuesHandler*> valuesHandlers; |
35 |
38 |
36 /** |
39 /** |
37 * count of columns in the current table |
40 * count of columns in the current table |
38 */ |
41 */ |
39 integer_t columnCount; |
42 integer_t columnCount; |
66 for (DataTypeReaderBase* reader : readers) if (reader->supports(typeId)) return reader->read(input, handler); |
69 for (DataTypeReaderBase* reader : readers) if (reader->supports(typeId)) return reader->read(input, handler); |
67 throw RelpipeReaderException(L"Unsupported data type: " + (int) typeId); |
70 throw RelpipeReaderException(L"Unsupported data type: " + (int) typeId); |
68 } |
71 } |
69 |
72 |
70 void endOfPipe() { |
73 void endOfPipe() { |
71 for (int i = 0; i < stringHandlers.size(); i++) stringHandlers[i]->endOfPipe(); |
74 for (StringHandler* handler : stringHandlers) handler->endOfPipe(); |
72 for (int i = 0; i < valueHandlers.size(); i++) valueHandlers[i]->endOfPipe(); |
75 for (ValuesHandler* handler : valuesHandlers) handler->endOfPipe(); |
73 } |
76 } |
74 |
77 |
75 public: |
78 public: |
76 |
79 |
77 StreamRelationalReader(std::istream &input) : |
80 StreamRelationalReader(std::istream &input) : |
81 string_t toTypeCode(const TypeId typeId) override { |
84 string_t toTypeCode(const TypeId typeId) override { |
82 for (DataTypeReaderBase* reader : readers) if (reader->supports(typeId)) return reader->getTypeCode(); |
85 for (DataTypeReaderBase* reader : readers) if (reader->supports(typeId)) return reader->getTypeCode(); |
83 throw RelpipeReaderException(L"Unsupported data type: " + static_cast<integer_t> (typeId)); |
86 throw RelpipeReaderException(L"Unsupported data type: " + static_cast<integer_t> (typeId)); |
84 } |
87 } |
85 |
88 |
86 void addHandler(handlers::RelationalReaderStringHadler* handler) override { |
89 void addHandler(StringHandler* handler) override { |
87 stringHandlers.push_back(handler); |
90 stringHandlers.push_back(handler); |
88 } |
91 } |
89 |
92 |
90 void addHandler(handlers::RelationalReaderValueHadler* handler) override { |
93 void addHandler(ValuesHandler* handler) override { |
91 valueHandlers.push_back(handler); |
94 valuesHandlers.push_back(handler); |
92 } |
95 } |
93 |
96 |
94 void process() override { |
97 void process() override { |
95 |
98 |
96 while (true) { |
99 while (true) { |
156 |
159 |
157 for (int i = 0; i < columnCount; i++) { |
160 for (int i = 0; i < columnCount; i++) { |
158 columns[i] = {columnNames[i], columnTypes[i]}; |
161 columns[i] = {columnNames[i], columnTypes[i]}; |
159 } |
162 } |
160 |
163 |
161 for (int i = 0; i < stringHandlers.size(); i++) stringHandlers[i]->startRelation(tableName, columns); |
164 for (StringHandler* handler : stringHandlers) handler->startRelation(tableName, columns); |
162 for (int i = 0; i < valueHandlers.size(); i++) valueHandlers[i]->startRelation(tableName, columns); |
165 for (ValuesHandler* handler : valuesHandlers) handler->startRelation(tableName, columns); |
163 |
166 |
164 } else if (dataPart == DATA_PART_ROW) { |
167 } else if (dataPart == DATA_PART_ROW) { |
165 for (int i = 0; i < columnCount; i++) { |
168 for (int i = 0; i < columnCount; i++) { |
166 TypeId typeId = columnTypes[i]; |
169 TypeId typeId = columnTypes[i]; |
167 |
170 |
168 if (stringHandlers.empty()) { |
171 if (stringHandlers.empty()) { |
169 read(input, [&](const void * rawValue, const std::type_info & typeInfo) { |
172 read(input, [&](const void * rawValue, const std::type_info & typeInfo) { |
170 for (int i = 0; i < valueHandlers.size(); i++) valueHandlers[i]->attribute(rawValue, typeInfo); |
173 for (ValuesHandler* handler : valuesHandlers) handler->attribute(rawValue, typeInfo); |
171 }, typeId); |
174 }, typeId); |
172 } else { |
175 } else { |
173 read(input, [&](const string_t& stringValue, const void * rawValue, const std::type_info & typeInfo) { |
176 read(input, [&](const string_t& stringValue, const void * rawValue, const std::type_info & typeInfo) { |
174 for (int i = 0; i < stringHandlers.size(); i++) stringHandlers[i]->attribute(stringValue); |
177 for (StringHandler* handler : stringHandlers) handler->attribute(stringValue); |
175 for (int i = 0; i < valueHandlers.size(); i++) valueHandlers[i]->attribute(rawValue, typeInfo); |
178 for (ValuesHandler* handler : valuesHandlers) handler->attribute(rawValue, typeInfo); |
176 }, typeId); |
179 }, typeId); |
177 } |
180 } |
178 } |
181 } |
179 |
182 |
180 } else { |
183 } else { |