148 vector<AttributeMetadata> currentReaderMetadata; |
148 vector<AttributeMetadata> currentReaderMetadata; |
149 integer_t currentAttributeIndex = 0; |
149 integer_t currentAttributeIndex = 0; |
150 std::unique_ptr<Connection> connection; |
150 std::unique_ptr<Connection> connection; |
151 std::unique_ptr<PreparedStatement> currentInsert; |
151 std::unique_ptr<PreparedStatement> currentInsert; |
152 |
152 |
|
153 bool readNextSqlStatement(std::wistream* input, std::wstringstream* sql) { |
|
154 sql->str(L""); |
|
155 sql->clear(); |
|
156 |
|
157 for (wchar_t ch; *input >> ch;) { |
|
158 *sql << ch; |
|
159 if (ch == L';' && sqlite3_complete(convertor.to_bytes(sql->str()).c_str())) return true; |
|
160 } |
|
161 |
|
162 string_t remainingSql = sql->str(); |
|
163 for (wchar_t ch : remainingSql) if (ch != L' ' && ch != L'\n' && ch != L'\r' && ch != L'\t') throw SqlException(L"Unexpected EOF, missing „;“ after: „" + remainingSql + L"“"); |
|
164 |
|
165 return false; |
|
166 } |
|
167 |
|
168 void processSqlInput(std::wistream* input) { |
|
169 if (input == nullptr) return; |
|
170 *input >> std::ws >> std::noskipws; |
|
171 for (std::wstringstream sql; readNextSqlStatement(input, &sql);) { |
|
172 std::unique_ptr<PreparedStatement> prepared(connection->prepareStatement(convertor.to_bytes(sql.str()).c_str())); |
|
173 while (prepared->next()); |
|
174 } |
|
175 } |
|
176 |
153 void processStatement(const Statement& statement) { |
177 void processStatement(const Statement& statement) { |
154 std::unique_ptr<PreparedStatement> prepared(connection->prepareStatement(convertor.to_bytes(statement.sql).c_str())); |
178 std::unique_ptr<PreparedStatement> prepared(connection->prepareStatement(convertor.to_bytes(statement.sql).c_str())); |
155 int columnCount = prepared->getColumnCount(); |
179 int columnCount = prepared->getColumnCount(); |
156 int parameterCount = statement.parameters.size(); |
180 int parameterCount = statement.parameters.size(); |
157 |
181 |
298 currentAttributeIndex = 0; |
322 currentAttributeIndex = 0; |
299 } |
323 } |
300 } |
324 } |
301 |
325 |
302 void endOfPipe() { |
326 void endOfPipe() { |
|
327 // process optional SQL input |
|
328 processSqlInput(configuration.sqlBeforeRelational); |
|
329 |
303 // run the transformation – process all statements: |
330 // run the transformation – process all statements: |
304 for (const Statement& statement : configuration.statements) processStatement(statement); |
331 for (const Statement& statement : configuration.statements) processStatement(statement); |
|
332 |
|
333 // process optional SQL input |
|
334 processSqlInput(configuration.sqlAfterRelational); |
305 |
335 |
306 // pass-through some relations: |
336 // pass-through some relations: |
307 if (configuration.dumpRelations.size()) dumpRelations(); |
337 if (configuration.dumpRelations.size()) dumpRelations(); |
308 |
338 |
309 // delete or keep the file: |
339 // delete or keep the file: |