--- a/src/SqlHandler.h Tue Jul 30 18:40:35 2019 +0200
+++ b/src/SqlHandler.h Tue Jul 30 23:04:33 2019 +0200
@@ -76,6 +76,11 @@
else throw SqlException(L"Error while iterating over SQLite result.");
}
+ void reset() {
+ int result = sqlite3_reset(stmt);
+ if (result != SQLITE_OK) throw SqlException(L"Unable to reset SQLite prepared statement.");
+ }
+
int getColumnCount() {
return sqlite3_column_count(stmt);
}
@@ -112,11 +117,11 @@
sqlite3_close(db);
}
- PreparedStatement prepareStatement(const char* sql) {
+ PreparedStatement* prepareStatement(const char* sql) {
const char* remaining;
sqlite3_stmt *stmt;
int result = sqlite3_prepare(db, sql, -1, &stmt, &remaining);
- if (result == SQLITE_OK) return PreparedStatement(stmt);
+ if (result == SQLITE_OK) return new PreparedStatement(stmt);
else throw SqlException(L"Unable to prepare SQLite statement.");
}
@@ -127,26 +132,28 @@
Configuration configuration;
writer::RelationalWriter* relationalWriter;
std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings
+ vector<AttributeMetadata> currentReaderMetadata;
+ integer_t currentAttributeIndex = 0;
std::unique_ptr<Connection> connection;
- relpipe::writer::string_t currentInsert;
+ std::unique_ptr<PreparedStatement> currentInsert;
void processStatement(const Statement& statement) {
- PreparedStatement prepared = connection->prepareStatement(convertor.to_bytes(statement.sql).c_str());
- int columnCount = prepared.getColumnCount();
+ std::unique_ptr<PreparedStatement> prepared(connection->prepareStatement(convertor.to_bytes(statement.sql).c_str()));
+ int columnCount = prepared->getColumnCount();
int parameterCount = statement.parameters.size();
for (int i = 0; i < parameterCount; i++) {
- prepared.setString(i + 1, convertor.to_bytes(statement.parameters[i].value));
+ prepared->setString(i + 1, convertor.to_bytes(statement.parameters[i].value));
}
std::vector<relpipe::writer::AttributeMetadata> metadata;
// TODO: support also other data types
- for (int i = 0; i < columnCount; i++) metadata.push_back({convertor.from_bytes(prepared.getColumName(i).c_str()), relpipe::writer::TypeId::STRING});
+ for (int i = 0; i < columnCount; i++) metadata.push_back({convertor.from_bytes(prepared->getColumName(i).c_str()), relpipe::writer::TypeId::STRING});
relationalWriter->startRelation(statement.relation, metadata, true);
- while (prepared.next()) {
+ while (prepared->next()) {
for (int i = 0; i < columnCount; i++) {
- relationalWriter->writeAttribute(convertor.from_bytes(prepared.getString(i)));
+ relationalWriter->writeAttribute(convertor.from_bytes(prepared->getString(i)));
}
}
}
@@ -175,7 +182,11 @@
}
void startRelation(string_t name, vector<AttributeMetadata> attributes) override {
+ currentReaderMetadata = attributes;
+
+ // CREATE TABLE:
std::wstringstream sql;
+ // TODO: if already exist just append new columns
sql << L"CREATE TABLE ";
writeIdentifier(sql, name);
sql << L" (\n";
@@ -187,12 +198,31 @@
}
sql << L"\n)";
- PreparedStatement createTable = connection->prepareStatement(convertor.to_bytes(sql.str()).c_str());
- createTable.next();
+ std::unique_ptr<PreparedStatement> createTable(connection->prepareStatement(convertor.to_bytes(sql.str()).c_str()));
+ createTable->next();
+
+ // prepare INSERT:
+ sql = wstringstream();
+ sql << L"INSERT INTO ";
+ writeIdentifier(sql, name);
+ sql << L" VALUES (";
+ for (int i = 0; i < attributes.size(); i++) {
+ sql << L"?";
+ if (i < attributes.size() - 1) sql << L",";
+ }
+ sql << L")";
+ currentInsert.reset(connection->prepareStatement(convertor.to_bytes(sql.str()).c_str()));
}
void attribute(const string_t& value) override {
-
+ currentAttributeIndex++;
+ if (currentAttributeIndex % currentReaderMetadata.size()) {
+ currentInsert->setString(currentAttributeIndex, convertor.to_bytes(value).c_str());
+ } else {
+ currentInsert->next();
+ currentInsert->reset();
+ currentAttributeIndex = 0;
+ }
}
void endOfPipe() {