src/SqlHandler.h
branchv_0
changeset 59 a1775ba6d056
parent 58 a4907b207f0c
equal deleted inserted replaced
58:a4907b207f0c 59:a1775ba6d056
   158 		if (typeId == relpipe::reader::TypeId::BOOLEAN) return L"integer"; // TODO: bit type might fit better, but needs more testing (support in various DBMS and their drivers)
   158 		if (typeId == relpipe::reader::TypeId::BOOLEAN) return L"integer"; // TODO: bit type might fit better, but needs more testing (support in various DBMS and their drivers)
   159 		else if (typeId == relpipe::reader::TypeId::INTEGER) return L"bigint";
   159 		else if (typeId == relpipe::reader::TypeId::INTEGER) return L"bigint";
   160 		else return L"text";
   160 		else return L"text";
   161 	}
   161 	}
   162 
   162 
       
   163 	bool isInsertable(const relpipe::writer::string_t& tableName) {
       
   164 		for (Connection::TablePrivilege tp : connection->getTablePrivileges()) if (tp.name == tableName && tp.privilege == L"INSERT") return true;
       
   165 		return false;
       
   166 	}
       
   167 
   163 	void writeIdentifier(std::wstringstream& output, relpipe::writer::string_t identifier) {
   168 	void writeIdentifier(std::wstringstream& output, relpipe::writer::string_t identifier) {
   164 		output << L'"';
   169 		output << L'"';
   165 		for (auto & ch : identifier) {
   170 		for (auto & ch : identifier) {
   166 			if (ch == L'"') output << L"\"\"";
   171 			if (ch == L'"') output << L"\"\"";
   167 			else output << ch;
   172 			else output << ch;
   168 		}
   173 		}
   169 		output << L'"';
   174 		output << L'"';
   170 	}
   175 	}
   171 
   176 
   172 	Connection* getConnection() {
   177 	void createTable(const string_t& name, const std::vector<AttributeMetadata>& attributes) {
   173 		if (configuration.dataSourceName.size()) return driverManager->getConnectionByDSN(configuration.dataSourceName);
       
   174 		else if (configuration.dataSourceString.size()) return driverManager->getConnectionByString(configuration.dataSourceString);
       
   175 		else return driverManager->getConnectionByString(L"Driver=SQLite3;Database=:memory:");
       
   176 		// SQLite is default/fallback oprion
       
   177 		// TODO: use environmental variable to allow setting a different default
       
   178 	}
       
   179 
       
   180 public:
       
   181 
       
   182 	SqlHandler(writer::RelationalWriter* relationalWriter, DriverManager* driverManager, Configuration& configuration) : relationalWriter(relationalWriter), driverManager(driverManager), configuration(configuration) {
       
   183 		connection.reset(getConnection());
       
   184 		//connection->setAutoCommit(false);
       
   185 	}
       
   186 
       
   187 	virtual ~SqlHandler() {
       
   188 	}
       
   189 
       
   190 	void startRelation(string_t name, vector<AttributeMetadata> attributes) override {
       
   191 		currentReaderMetadata = attributes;
       
   192 
       
   193 		// CREATE TABLE:
       
   194 		std::wstringstream sql;
   178 		std::wstringstream sql;
   195 		// TODO: if already exist just append new columns
       
   196 		sql << L"CREATE TABLE ";
   179 		sql << L"CREATE TABLE ";
   197 		writeIdentifier(sql, name);
   180 		writeIdentifier(sql, name);
   198 		sql << L" (\n";
   181 		sql << L" (\n";
   199 		for (int i = 0; i < attributes.size(); i++) {
   182 		for (int i = 0; i < attributes.size(); i++) {
   200 			sql << L"\t";
   183 			sql << L"\t";
   204 		}
   187 		}
   205 		sql << L"\n)";
   188 		sql << L"\n)";
   206 
   189 
   207 		std::unique_ptr<PreparedStatement> createTable(connection->prepareStatement(sql.str()));
   190 		std::unique_ptr<PreparedStatement> createTable(connection->prepareStatement(sql.str()));
   208 		createTable->executeUpdate();
   191 		createTable->executeUpdate();
       
   192 	}
       
   193 
       
   194 	Connection* getConnection() {
       
   195 		if (configuration.dataSourceName.size()) return driverManager->getConnectionByDSN(configuration.dataSourceName);
       
   196 		else if (configuration.dataSourceString.size()) return driverManager->getConnectionByString(configuration.dataSourceString);
       
   197 		else return driverManager->getConnectionByString(L"Driver=SQLite3;Database=:memory:");
       
   198 		// SQLite is default/fallback oprion
       
   199 		// TODO: use environmental variable to allow setting a different default
       
   200 	}
       
   201 
       
   202 public:
       
   203 
       
   204 	SqlHandler(writer::RelationalWriter* relationalWriter, DriverManager* driverManager, Configuration& configuration) : relationalWriter(relationalWriter), driverManager(driverManager), configuration(configuration) {
       
   205 		connection.reset(getConnection());
       
   206 		//connection->setAutoCommit(false);
       
   207 	}
       
   208 
       
   209 	virtual ~SqlHandler() {
       
   210 	}
       
   211 
       
   212 	void startRelation(string_t name, vector<AttributeMetadata> attributes) override {
       
   213 		currentReaderMetadata = attributes;
       
   214 
       
   215 		// CREATE TABLE:
       
   216 		if (configuration.onDuplicateRelation == OnDuplicateRelation::Fail) {
       
   217 			createTable(name, attributes); // duplicate will cause exception
       
   218 		} else if (configuration.onDuplicateRelation == OnDuplicateRelation::Insert) {
       
   219 			if (isInsertable(name)); // nothing needs to be created; we will just append new records to the existing table
       
   220 			else createTable(name, attributes);
       
   221 		} else {
       
   222 			throw SqlException(L"Unsupported OnDuplicateRelation mode: " + std::to_wstring((int) configuration.onDuplicateRelation));
       
   223 		}
   209 
   224 
   210 		// prepare INSERT:
   225 		// prepare INSERT:
   211 		sql = wstringstream();
   226 		std::wstringstream sql;
   212 		sql << L"INSERT INTO ";
   227 		sql << L"INSERT INTO ";
   213 		writeIdentifier(sql, name);
   228 		writeIdentifier(sql, name);
   214 		sql << L" VALUES (";
   229 		sql << L" (";
       
   230 		for (int i = 0; i < attributes.size(); i++) {
       
   231 			writeIdentifier(sql, attributes[i].getAttributeName());
       
   232 			if (i < attributes.size() - 1) sql << L",";
       
   233 		}
       
   234 		sql << L") VALUES (";
   215 		for (int i = 0; i < attributes.size(); i++) {
   235 		for (int i = 0; i < attributes.size(); i++) {
   216 			sql << L"?";
   236 			sql << L"?";
   217 			if (i < attributes.size() - 1) sql << L",";
   237 			if (i < attributes.size() - 1) sql << L",";
   218 		}
   238 		}
   219 		sql << L")";
   239 		sql << L")";