158 if (typeId == relpipe::reader::TypeId::BOOLEAN) return L"integer"; // TODO: bit type might fit better, but needs more testing (support in various DBMS and their drivers) |
158 if (typeId == relpipe::reader::TypeId::BOOLEAN) return L"integer"; // TODO: bit type might fit better, but needs more testing (support in various DBMS and their drivers) |
159 else if (typeId == relpipe::reader::TypeId::INTEGER) return L"bigint"; |
159 else if (typeId == relpipe::reader::TypeId::INTEGER) return L"bigint"; |
160 else return L"text"; |
160 else return L"text"; |
161 } |
161 } |
162 |
162 |
|
163 bool isInsertable(const relpipe::writer::string_t& tableName) { |
|
164 for (Connection::TablePrivilege tp : connection->getTablePrivileges()) if (tp.name == tableName && tp.privilege == L"INSERT") return true; |
|
165 return false; |
|
166 } |
|
167 |
163 void writeIdentifier(std::wstringstream& output, relpipe::writer::string_t identifier) { |
168 void writeIdentifier(std::wstringstream& output, relpipe::writer::string_t identifier) { |
164 output << L'"'; |
169 output << L'"'; |
165 for (auto & ch : identifier) { |
170 for (auto & ch : identifier) { |
166 if (ch == L'"') output << L"\"\""; |
171 if (ch == L'"') output << L"\"\""; |
167 else output << ch; |
172 else output << ch; |
168 } |
173 } |
169 output << L'"'; |
174 output << L'"'; |
170 } |
175 } |
171 |
176 |
172 Connection* getConnection() { |
177 void createTable(const string_t& name, const std::vector<AttributeMetadata>& attributes) { |
173 if (configuration.dataSourceName.size()) return driverManager->getConnectionByDSN(configuration.dataSourceName); |
|
174 else if (configuration.dataSourceString.size()) return driverManager->getConnectionByString(configuration.dataSourceString); |
|
175 else return driverManager->getConnectionByString(L"Driver=SQLite3;Database=:memory:"); |
|
176 // SQLite is default/fallback oprion |
|
177 // TODO: use environmental variable to allow setting a different default |
|
178 } |
|
179 |
|
180 public: |
|
181 |
|
182 SqlHandler(writer::RelationalWriter* relationalWriter, DriverManager* driverManager, Configuration& configuration) : relationalWriter(relationalWriter), driverManager(driverManager), configuration(configuration) { |
|
183 connection.reset(getConnection()); |
|
184 //connection->setAutoCommit(false); |
|
185 } |
|
186 |
|
187 virtual ~SqlHandler() { |
|
188 } |
|
189 |
|
190 void startRelation(string_t name, vector<AttributeMetadata> attributes) override { |
|
191 currentReaderMetadata = attributes; |
|
192 |
|
193 // CREATE TABLE: |
|
194 std::wstringstream sql; |
178 std::wstringstream sql; |
195 // TODO: if already exist just append new columns |
|
196 sql << L"CREATE TABLE "; |
179 sql << L"CREATE TABLE "; |
197 writeIdentifier(sql, name); |
180 writeIdentifier(sql, name); |
198 sql << L" (\n"; |
181 sql << L" (\n"; |
199 for (int i = 0; i < attributes.size(); i++) { |
182 for (int i = 0; i < attributes.size(); i++) { |
200 sql << L"\t"; |
183 sql << L"\t"; |
204 } |
187 } |
205 sql << L"\n)"; |
188 sql << L"\n)"; |
206 |
189 |
207 std::unique_ptr<PreparedStatement> createTable(connection->prepareStatement(sql.str())); |
190 std::unique_ptr<PreparedStatement> createTable(connection->prepareStatement(sql.str())); |
208 createTable->executeUpdate(); |
191 createTable->executeUpdate(); |
|
192 } |
|
193 |
|
194 Connection* getConnection() { |
|
195 if (configuration.dataSourceName.size()) return driverManager->getConnectionByDSN(configuration.dataSourceName); |
|
196 else if (configuration.dataSourceString.size()) return driverManager->getConnectionByString(configuration.dataSourceString); |
|
197 else return driverManager->getConnectionByString(L"Driver=SQLite3;Database=:memory:"); |
|
198 // SQLite is default/fallback oprion |
|
199 // TODO: use environmental variable to allow setting a different default |
|
200 } |
|
201 |
|
202 public: |
|
203 |
|
204 SqlHandler(writer::RelationalWriter* relationalWriter, DriverManager* driverManager, Configuration& configuration) : relationalWriter(relationalWriter), driverManager(driverManager), configuration(configuration) { |
|
205 connection.reset(getConnection()); |
|
206 //connection->setAutoCommit(false); |
|
207 } |
|
208 |
|
209 virtual ~SqlHandler() { |
|
210 } |
|
211 |
|
212 void startRelation(string_t name, vector<AttributeMetadata> attributes) override { |
|
213 currentReaderMetadata = attributes; |
|
214 |
|
215 // CREATE TABLE: |
|
216 if (configuration.onDuplicateRelation == OnDuplicateRelation::Fail) { |
|
217 createTable(name, attributes); // duplicate will cause exception |
|
218 } else if (configuration.onDuplicateRelation == OnDuplicateRelation::Insert) { |
|
219 if (isInsertable(name)); // nothing needs to be created; we will just append new records to the existing table |
|
220 else createTable(name, attributes); |
|
221 } else { |
|
222 throw SqlException(L"Unsupported OnDuplicateRelation mode: " + std::to_wstring((int) configuration.onDuplicateRelation)); |
|
223 } |
209 |
224 |
210 // prepare INSERT: |
225 // prepare INSERT: |
211 sql = wstringstream(); |
226 std::wstringstream sql; |
212 sql << L"INSERT INTO "; |
227 sql << L"INSERT INTO "; |
213 writeIdentifier(sql, name); |
228 writeIdentifier(sql, name); |
214 sql << L" VALUES ("; |
229 sql << L" ("; |
|
230 for (int i = 0; i < attributes.size(); i++) { |
|
231 writeIdentifier(sql, attributes[i].getAttributeName()); |
|
232 if (i < attributes.size() - 1) sql << L","; |
|
233 } |
|
234 sql << L") VALUES ("; |
215 for (int i = 0; i < attributes.size(); i++) { |
235 for (int i = 0; i < attributes.size(); i++) { |
216 sql << L"?"; |
236 sql << L"?"; |
217 if (i < attributes.size() - 1) sql << L","; |
237 if (i < attributes.size() - 1) sql << L","; |
218 } |
238 } |
219 sql << L")"; |
239 sql << L")"; |