74 if (result == SQLITE_ROW) return true; |
74 if (result == SQLITE_ROW) return true; |
75 else if (result == SQLITE_DONE) return false; |
75 else if (result == SQLITE_DONE) return false; |
76 else throw SqlException(L"Error while iterating over SQLite result."); |
76 else throw SqlException(L"Error while iterating over SQLite result."); |
77 } |
77 } |
78 |
78 |
|
79 void reset() { |
|
80 int result = sqlite3_reset(stmt); |
|
81 if (result != SQLITE_OK) throw SqlException(L"Unable to reset SQLite prepared statement."); |
|
82 } |
|
83 |
79 int getColumnCount() { |
84 int getColumnCount() { |
80 return sqlite3_column_count(stmt); |
85 return sqlite3_column_count(stmt); |
81 } |
86 } |
82 |
87 |
83 std::string getColumName(int columnIndex) { |
88 std::string getColumName(int columnIndex) { |
110 |
115 |
111 virtual ~Connection() { |
116 virtual ~Connection() { |
112 sqlite3_close(db); |
117 sqlite3_close(db); |
113 } |
118 } |
114 |
119 |
115 PreparedStatement prepareStatement(const char* sql) { |
120 PreparedStatement* prepareStatement(const char* sql) { |
116 const char* remaining; |
121 const char* remaining; |
117 sqlite3_stmt *stmt; |
122 sqlite3_stmt *stmt; |
118 int result = sqlite3_prepare(db, sql, -1, &stmt, &remaining); |
123 int result = sqlite3_prepare(db, sql, -1, &stmt, &remaining); |
119 if (result == SQLITE_OK) return PreparedStatement(stmt); |
124 if (result == SQLITE_OK) return new PreparedStatement(stmt); |
120 else throw SqlException(L"Unable to prepare SQLite statement."); |
125 else throw SqlException(L"Unable to prepare SQLite statement."); |
121 } |
126 } |
122 |
127 |
123 }; |
128 }; |
124 |
129 |
125 class SqlHandler : public RelationalReaderStringHandler { |
130 class SqlHandler : public RelationalReaderStringHandler { |
126 private: |
131 private: |
127 Configuration configuration; |
132 Configuration configuration; |
128 writer::RelationalWriter* relationalWriter; |
133 writer::RelationalWriter* relationalWriter; |
129 std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings |
134 std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings |
|
135 vector<AttributeMetadata> currentReaderMetadata; |
|
136 integer_t currentAttributeIndex = 0; |
130 std::unique_ptr<Connection> connection; |
137 std::unique_ptr<Connection> connection; |
131 relpipe::writer::string_t currentInsert; |
138 std::unique_ptr<PreparedStatement> currentInsert; |
132 |
139 |
133 void processStatement(const Statement& statement) { |
140 void processStatement(const Statement& statement) { |
134 PreparedStatement prepared = connection->prepareStatement(convertor.to_bytes(statement.sql).c_str()); |
141 std::unique_ptr<PreparedStatement> prepared(connection->prepareStatement(convertor.to_bytes(statement.sql).c_str())); |
135 int columnCount = prepared.getColumnCount(); |
142 int columnCount = prepared->getColumnCount(); |
136 int parameterCount = statement.parameters.size(); |
143 int parameterCount = statement.parameters.size(); |
137 |
144 |
138 for (int i = 0; i < parameterCount; i++) { |
145 for (int i = 0; i < parameterCount; i++) { |
139 prepared.setString(i + 1, convertor.to_bytes(statement.parameters[i].value)); |
146 prepared->setString(i + 1, convertor.to_bytes(statement.parameters[i].value)); |
140 } |
147 } |
141 |
148 |
142 std::vector<relpipe::writer::AttributeMetadata> metadata; |
149 std::vector<relpipe::writer::AttributeMetadata> metadata; |
143 // TODO: support also other data types |
150 // TODO: support also other data types |
144 for (int i = 0; i < columnCount; i++) metadata.push_back({convertor.from_bytes(prepared.getColumName(i).c_str()), relpipe::writer::TypeId::STRING}); |
151 for (int i = 0; i < columnCount; i++) metadata.push_back({convertor.from_bytes(prepared->getColumName(i).c_str()), relpipe::writer::TypeId::STRING}); |
145 relationalWriter->startRelation(statement.relation, metadata, true); |
152 relationalWriter->startRelation(statement.relation, metadata, true); |
146 |
153 |
147 while (prepared.next()) { |
154 while (prepared->next()) { |
148 for (int i = 0; i < columnCount; i++) { |
155 for (int i = 0; i < columnCount; i++) { |
149 relationalWriter->writeAttribute(convertor.from_bytes(prepared.getString(i))); |
156 relationalWriter->writeAttribute(convertor.from_bytes(prepared->getString(i))); |
150 } |
157 } |
151 } |
158 } |
152 } |
159 } |
153 |
160 |
154 relpipe::writer::string_t toSQLType(relpipe::reader::TypeId typeId) { |
161 relpipe::writer::string_t toSQLType(relpipe::reader::TypeId typeId) { |
173 |
180 |
174 virtual ~SqlHandler() { |
181 virtual ~SqlHandler() { |
175 } |
182 } |
176 |
183 |
177 void startRelation(string_t name, vector<AttributeMetadata> attributes) override { |
184 void startRelation(string_t name, vector<AttributeMetadata> attributes) override { |
|
185 currentReaderMetadata = attributes; |
|
186 |
|
187 // CREATE TABLE: |
178 std::wstringstream sql; |
188 std::wstringstream sql; |
|
189 // TODO: if the table already exists, just append the new columns |
179 sql << L"CREATE TABLE "; |
190 sql << L"CREATE TABLE "; |
180 writeIdentifier(sql, name); |
191 writeIdentifier(sql, name); |
181 sql << L" (\n"; |
192 sql << L" (\n"; |
182 for (int i = 0; i < attributes.size(); i++) { |
193 for (int i = 0; i < attributes.size(); i++) { |
183 sql << L"\t"; |
194 sql << L"\t"; |
185 sql << L" " << toSQLType(attributes[i].getTypeId()); |
196 sql << L" " << toSQLType(attributes[i].getTypeId()); |
186 if (i < attributes.size() - 1) sql << L",\n"; |
197 if (i < attributes.size() - 1) sql << L",\n"; |
187 } |
198 } |
188 sql << L"\n)"; |
199 sql << L"\n)"; |
189 |
200 |
190 PreparedStatement createTable = connection->prepareStatement(convertor.to_bytes(sql.str()).c_str()); |
201 std::unique_ptr<PreparedStatement> createTable(connection->prepareStatement(convertor.to_bytes(sql.str()).c_str())); |
191 createTable.next(); |
202 createTable->next(); |
|
203 |
|
204 // prepare INSERT: |
|
205 sql = wstringstream(); |
|
206 sql << L"INSERT INTO "; |
|
207 writeIdentifier(sql, name); |
|
208 sql << L" VALUES ("; |
|
209 for (int i = 0; i < attributes.size(); i++) { |
|
210 sql << L"?"; |
|
211 if (i < attributes.size() - 1) sql << L","; |
|
212 } |
|
213 sql << L")"; |
|
214 currentInsert.reset(connection->prepareStatement(convertor.to_bytes(sql.str()).c_str())); |
192 } |
215 } |
193 |
216 |
194 void attribute(const string_t& value) override { |
217 void attribute(const string_t& value) override { |
195 |
218 currentAttributeIndex++; |
|
219 if (currentAttributeIndex % currentReaderMetadata.size()) { |
|
220 currentInsert->setString(currentAttributeIndex, convertor.to_bytes(value).c_str()); |
|
221 } else { |
|
222 currentInsert->next(); |
|
223 currentInsert->reset(); |
|
224 currentAttributeIndex = 0; |
|
225 } |
196 } |
226 } |
197 |
227 |
198 void endOfPipe() { |
228 void endOfPipe() { |
199 for (const Statement& statement : configuration.statements) processStatement(statement); |
229 for (const Statement& statement : configuration.statements) processStatement(statement); |
200 |
230 |