src/SqlHandler.h
branchv_0
changeset 7 9119b29d1e7c
parent 6 32b4293307f4
child 8 3e076cc76c89
equal deleted inserted replaced
6:32b4293307f4 7:9119b29d1e7c
    74 		if (result == SQLITE_ROW) return true;
    74 		if (result == SQLITE_ROW) return true;
    75 		else if (result == SQLITE_DONE) return false;
    75 		else if (result == SQLITE_DONE) return false;
    76 		else throw SqlException(L"Error while iterating over SQLite result.");
    76 		else throw SqlException(L"Error while iterating over SQLite result.");
    77 	}
    77 	}
    78 
    78 
       
    79 	void reset() {
       
    80 		int result = sqlite3_reset(stmt);
       
    81 		if (result != SQLITE_OK) throw SqlException(L"Unable to reset SQLite prepared statement.");
       
    82 	}
       
    83 
    79 	int getColumnCount() {
    84 	int getColumnCount() {
    80 		return sqlite3_column_count(stmt);
    85 		return sqlite3_column_count(stmt);
    81 	}
    86 	}
    82 
    87 
    83 	std::string getColumName(int columnIndex) {
    88 	std::string getColumName(int columnIndex) {
   110 
   115 
   111 	virtual ~Connection() {
   116 	virtual ~Connection() {
   112 		sqlite3_close(db);
   117 		sqlite3_close(db);
   113 	}
   118 	}
   114 
   119 
   115 	PreparedStatement prepareStatement(const char* sql) {
   120 	PreparedStatement* prepareStatement(const char* sql) {
   116 		const char* remaining;
   121 		const char* remaining;
   117 		sqlite3_stmt *stmt;
   122 		sqlite3_stmt *stmt;
   118 		int result = sqlite3_prepare(db, sql, -1, &stmt, &remaining);
   123 		int result = sqlite3_prepare(db, sql, -1, &stmt, &remaining);
   119 		if (result == SQLITE_OK) return PreparedStatement(stmt);
   124 		if (result == SQLITE_OK) return new PreparedStatement(stmt);
   120 		else throw SqlException(L"Unable to prepare SQLite statement.");
   125 		else throw SqlException(L"Unable to prepare SQLite statement.");
   121 	}
   126 	}
   122 
   127 
   123 };
   128 };
   124 
   129 
   125 class SqlHandler : public RelationalReaderStringHandler {
   130 class SqlHandler : public RelationalReaderStringHandler {
   126 private:
   131 private:
   127 	Configuration configuration;
   132 	Configuration configuration;
   128 	writer::RelationalWriter* relationalWriter;
   133 	writer::RelationalWriter* relationalWriter;
   129 	std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings
   134 	std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings
       
   135 	vector<AttributeMetadata> currentReaderMetadata;
       
   136 	integer_t currentAttributeIndex = 0;
   130 	std::unique_ptr<Connection> connection;
   137 	std::unique_ptr<Connection> connection;
   131 	relpipe::writer::string_t currentInsert;
   138 	std::unique_ptr<PreparedStatement> currentInsert;
   132 
   139 
   133 	void processStatement(const Statement& statement) {
   140 	void processStatement(const Statement& statement) {
   134 		PreparedStatement prepared = connection->prepareStatement(convertor.to_bytes(statement.sql).c_str());
   141 		std::unique_ptr<PreparedStatement> prepared(connection->prepareStatement(convertor.to_bytes(statement.sql).c_str()));
   135 		int columnCount = prepared.getColumnCount();
   142 		int columnCount = prepared->getColumnCount();
   136 		int parameterCount = statement.parameters.size();
   143 		int parameterCount = statement.parameters.size();
   137 
   144 
   138 		for (int i = 0; i < parameterCount; i++) {
   145 		for (int i = 0; i < parameterCount; i++) {
   139 			prepared.setString(i + 1, convertor.to_bytes(statement.parameters[i].value));
   146 			prepared->setString(i + 1, convertor.to_bytes(statement.parameters[i].value));
   140 		}
   147 		}
   141 
   148 
   142 		std::vector<relpipe::writer::AttributeMetadata> metadata;
   149 		std::vector<relpipe::writer::AttributeMetadata> metadata;
   143 		// TODO: support also other data types
   150 		// TODO: support also other data types
   144 		for (int i = 0; i < columnCount; i++) metadata.push_back({convertor.from_bytes(prepared.getColumName(i).c_str()), relpipe::writer::TypeId::STRING});
   151 		for (int i = 0; i < columnCount; i++) metadata.push_back({convertor.from_bytes(prepared->getColumName(i).c_str()), relpipe::writer::TypeId::STRING});
   145 		relationalWriter->startRelation(statement.relation, metadata, true);
   152 		relationalWriter->startRelation(statement.relation, metadata, true);
   146 
   153 
   147 		while (prepared.next()) {
   154 		while (prepared->next()) {
   148 			for (int i = 0; i < columnCount; i++) {
   155 			for (int i = 0; i < columnCount; i++) {
   149 				relationalWriter->writeAttribute(convertor.from_bytes(prepared.getString(i)));
   156 				relationalWriter->writeAttribute(convertor.from_bytes(prepared->getString(i)));
   150 			}
   157 			}
   151 		}
   158 		}
   152 	}
   159 	}
   153 
   160 
   154 	relpipe::writer::string_t toSQLType(relpipe::reader::TypeId typeId) {
   161 	relpipe::writer::string_t toSQLType(relpipe::reader::TypeId typeId) {
   173 
   180 
   174 	virtual ~SqlHandler() {
   181 	virtual ~SqlHandler() {
   175 	}
   182 	}
   176 
   183 
   177 	void startRelation(string_t name, vector<AttributeMetadata> attributes) override {
   184 	void startRelation(string_t name, vector<AttributeMetadata> attributes) override {
       
   185 		currentReaderMetadata = attributes;
       
   186 
       
   187 		// CREATE TABLE:
   178 		std::wstringstream sql;
   188 		std::wstringstream sql;
       
   189 		// TODO: if already exist just append new columns
   179 		sql << L"CREATE TABLE ";
   190 		sql << L"CREATE TABLE ";
   180 		writeIdentifier(sql, name);
   191 		writeIdentifier(sql, name);
   181 		sql << L" (\n";
   192 		sql << L" (\n";
   182 		for (int i = 0; i < attributes.size(); i++) {
   193 		for (int i = 0; i < attributes.size(); i++) {
   183 			sql << L"\t";
   194 			sql << L"\t";
   185 			sql << L" " << toSQLType(attributes[i].getTypeId());
   196 			sql << L" " << toSQLType(attributes[i].getTypeId());
   186 			if (i < attributes.size() - 1) sql << L",\n";
   197 			if (i < attributes.size() - 1) sql << L",\n";
   187 		}
   198 		}
   188 		sql << L"\n)";
   199 		sql << L"\n)";
   189 
   200 
   190 		PreparedStatement createTable = connection->prepareStatement(convertor.to_bytes(sql.str()).c_str());
   201 		std::unique_ptr<PreparedStatement> createTable(connection->prepareStatement(convertor.to_bytes(sql.str()).c_str()));
   191 		createTable.next();
   202 		createTable->next();
       
   203 
       
   204 		// prepare INSERT:
       
   205 		sql = wstringstream();
       
   206 		sql << L"INSERT INTO ";
       
   207 		writeIdentifier(sql, name);
       
   208 		sql << L" VALUES (";
       
   209 		for (int i = 0; i < attributes.size(); i++) {
       
   210 			sql << L"?";
       
   211 			if (i < attributes.size() - 1) sql << L",";
       
   212 		}
       
   213 		sql << L")";
       
   214 		currentInsert.reset(connection->prepareStatement(convertor.to_bytes(sql.str()).c_str()));
   192 	}
   215 	}
   193 
   216 
   194 	void attribute(const string_t& value) override {
   217 	void attribute(const string_t& value) override {
   195 
   218 		currentAttributeIndex++;
       
   219 		if (currentAttributeIndex % currentReaderMetadata.size()) {
       
   220 			currentInsert->setString(currentAttributeIndex, convertor.to_bytes(value).c_str());
       
   221 		} else {
       
   222 			currentInsert->next();
       
   223 			currentInsert->reset();
       
   224 			currentAttributeIndex = 0;
       
   225 		}
   196 	}
   226 	}
   197 
   227 
   198 	void endOfPipe() {
   228 	void endOfPipe() {
   199 		for (const Statement& statement : configuration.statements) processStatement(statement);
   229 		for (const Statement& statement : configuration.statements) processStatement(statement);
   200 
   230