# HG changeset patch # User František Kučera # Date 1564520673 -7200 # Node ID 9119b29d1e7cf2b0b375564ad24e139e11ec5004 # Parent 32b4293307f47aa14ef011d477fe305e31677242 insert records diff -r 32b4293307f4 -r 9119b29d1e7c src/SqlHandler.h --- a/src/SqlHandler.h Tue Jul 30 18:40:35 2019 +0200 +++ b/src/SqlHandler.h Tue Jul 30 23:04:33 2019 +0200 @@ -76,6 +76,11 @@ else throw SqlException(L"Error while iterating over SQLite result."); } + void reset() { + int result = sqlite3_reset(stmt); + if (result != SQLITE_OK) throw SqlException(L"Unable to reset SQLite prepared statement."); + } + int getColumnCount() { return sqlite3_column_count(stmt); } @@ -112,11 +117,11 @@ sqlite3_close(db); } - PreparedStatement prepareStatement(const char* sql) { + PreparedStatement* prepareStatement(const char* sql) { const char* remaining; sqlite3_stmt *stmt; int result = sqlite3_prepare(db, sql, -1, &stmt, &remaining); - if (result == SQLITE_OK) return PreparedStatement(stmt); + if (result == SQLITE_OK) return new PreparedStatement(stmt); else throw SqlException(L"Unable to prepare SQLite statement."); } @@ -127,26 +132,28 @@ Configuration configuration; writer::RelationalWriter* relationalWriter; std::wstring_convert> convertor; // TODO: support also other encodings + vector currentReaderMetadata; + integer_t currentAttributeIndex = 0; std::unique_ptr connection; - relpipe::writer::string_t currentInsert; + std::unique_ptr currentInsert; void processStatement(const Statement& statement) { - PreparedStatement prepared = connection->prepareStatement(convertor.to_bytes(statement.sql).c_str()); - int columnCount = prepared.getColumnCount(); + std::unique_ptr prepared(connection->prepareStatement(convertor.to_bytes(statement.sql).c_str())); + int columnCount = prepared->getColumnCount(); int parameterCount = statement.parameters.size(); for (int i = 0; i < parameterCount; i++) { - prepared.setString(i + 1, convertor.to_bytes(statement.parameters[i].value)); + prepared->setString(i + 1, convertor.to_bytes(statement.parameters[i].value)); } std::vector metadata; // TODO: support also other data types - for (int i = 0; i < columnCount; i++) metadata.push_back({convertor.from_bytes(prepared.getColumName(i).c_str()), relpipe::writer::TypeId::STRING}); + for (int i = 0; i < columnCount; i++) metadata.push_back({convertor.from_bytes(prepared->getColumName(i).c_str()), relpipe::writer::TypeId::STRING}); relationalWriter->startRelation(statement.relation, metadata, true); - while (prepared.next()) { + while (prepared->next()) { for (int i = 0; i < columnCount; i++) { - relationalWriter->writeAttribute(convertor.from_bytes(prepared.getString(i))); + relationalWriter->writeAttribute(convertor.from_bytes(prepared->getString(i))); } } } @@ -175,7 +182,11 @@ } void startRelation(string_t name, vector attributes) override { + currentReaderMetadata = attributes; + + // CREATE TABLE: std::wstringstream sql; + // TODO: if already exist just append new columns sql << L"CREATE TABLE "; writeIdentifier(sql, name); sql << L" (\n"; @@ -187,12 +198,31 @@ } sql << L"\n)"; - PreparedStatement createTable = connection->prepareStatement(convertor.to_bytes(sql.str()).c_str()); - createTable.next(); + std::unique_ptr createTable(connection->prepareStatement(convertor.to_bytes(sql.str()).c_str())); + createTable->next(); + + // prepare INSERT: + sql = wstringstream(); + sql << L"INSERT INTO "; + writeIdentifier(sql, name); + sql << L" VALUES ("; + for (int i = 0; i < attributes.size(); i++) { + sql << L"?"; + if (i < attributes.size() - 1) sql << L","; + } + sql << L")"; + currentInsert.reset(connection->prepareStatement(convertor.to_bytes(sql.str()).c_str())); } void attribute(const string_t& value) override { - + currentAttributeIndex++; + if (currentAttributeIndex % currentReaderMetadata.size()) { + currentInsert->setString(currentAttributeIndex, convertor.to_bytes(value).c_str()); + } else { + currentInsert->next(); + currentInsert->reset(); + currentAttributeIndex = 0; + } } void endOfPipe() {