src/SqlHandler.h
branchv_0
changeset 36 91cb012d779a
parent 34 24c05e69d68f
child 37 3de41719d7eb
equal deleted inserted replaced
35:cd9db43db120 36:91cb012d779a
    73 				*sql << scanner.getAndReset().c_str();
    73 				*sql << scanner.getAndReset().c_str();
    74 				return true;
    74 				return true;
    75 			}
    75 			}
    76 		}
    76 		}
    77 
    77 
       
    78 		// TODO: support comments at the end of the script (after last ;)
    78 		string_t remainingSql = scanner.getAndReset();
    79 		string_t remainingSql = scanner.getAndReset();
    79 		for (wchar_t ch : remainingSql) if (ch != L' ' && ch != L'\n' && ch != L'\r' && ch != L'\t') throw SqlException(L"Unexpected EOF, missing „;“ after: „" + remainingSql + L"“");
    80 		for (wchar_t ch : remainingSql) if (ch != L' ' && ch != L'\n' && ch != L'\r' && ch != L'\t') throw SqlException(L"Unexpected EOF, missing „;“ after: „" + remainingSql + L"“");
    80 
    81 
    81 		return false;
    82 		return false;
    82 	}
    83 	}
    83 
    84 
    84 	void processSqlInput(std::wistream* input) {
    85 	void processSqlInput(std::wistream* input) {
    85 		if (input == nullptr) return;
    86 		if (input == nullptr) return;
    86 		*input >> std::ws >> std::noskipws;
    87 		*input >> std::ws >> std::noskipws;
    87 		for (std::wstringstream sql; readNextSqlStatement(input, &sql);) {
    88 		for (std::wstringstream sql; readNextSqlStatement(input, &sql);) {
    88 			std::unique_ptr<PreparedStatement> prepared(connection->prepareStatement(convertor.to_bytes(sql.str()).c_str()));
    89 			std::unique_ptr<PreparedStatement> prepared(connection->prepareStatement(sql.str()));
    89 			while (prepared->next());
    90 			prepared->executeUpdate();
    90 		}
    91 		}
    91 	}
    92 	}
    92 
    93 
    93 	relpipe::writer::TypeId findType(string_t columnName, int columnIndex, const Statement& statement, std::shared_ptr<PreparedStatement> preparedStatement) {
    94 	relpipe::writer::TypeId findType(string_t columnName, const Statement& statement, relpipe::writer::TypeId typeFromResultSet) {
    94 		for (TypeCast typeCast : statement.typeCasts) if (typeCast.name == columnName) return relationalWriter->toTypeId(typeCast.type);
    95 		for (TypeCast typeCast : statement.typeCasts) if (typeCast.name == columnName) return relationalWriter->toTypeId(typeCast.type);
    95 		return preparedStatement->getColumType(columnIndex);
    96 		return typeFromResultSet;
    96 	}
    97 	}
    97 
    98 
    98 	void processStatement(const Statement& statement) {
    99 	void processStatement(const Statement& statement) {
    99 		std::shared_ptr<PreparedStatement> prepared(connection->prepareStatement(convertor.to_bytes(statement.sql).c_str()));
   100 		std::shared_ptr<PreparedStatement> prepared(connection->prepareStatement(statement.sql));
   100 		int columnCount = prepared->getColumnCount();
       
   101 		int parameterCount = statement.parameters.size();
   101 		int parameterCount = statement.parameters.size();
   102 
   102 
   103 		for (int i = 0; i < parameterCount; i++) {
   103 		for (int i = 0; i < parameterCount; i++) {
   104 			prepared->setString(i + 1, convertor.to_bytes(statement.parameters[i].value));
   104 			prepared->setString(i + 1, statement.parameters[i].value);
   105 		}
   105 		}
   106 
   106 
       
   107 		std::shared_ptr<ResultSet> resultSet(prepared->executeQuery());
       
   108 		std::shared_ptr<ResultSet::MetaData> metaData(resultSet->getMetaData());
       
   109 
       
   110 		auto columnCount = metaData->getColumnCount();
   107 		std::vector<relpipe::writer::AttributeMetadata> metadata;
   111 		std::vector<relpipe::writer::AttributeMetadata> metadata;
   108 		for (int i = 0; i < columnCount; i++) {
   112 		for (int columnNumber = 1; columnNumber <= columnCount; columnNumber++) {
   109 			string_t columnName = convertor.from_bytes(prepared->getColumName(i).c_str());
   113 			auto columnDescriptor = metaData->describeColumn(columnNumber);
   110 			metadata.push_back({columnName, findType(columnName, i, statement, prepared)});
   114 			metadata.push_back({columnDescriptor.name, findType(columnDescriptor.name, statement, columnDescriptor.type)});
   111 		}
   115 		}
   112 		relationalWriter->startRelation(statement.relation, metadata, true);
   116 		relationalWriter->startRelation(statement.relation, metadata, true);
   113 
   117 
   114 		while (prepared->next()) {
   118 		while (resultSet->next()) {
   115 			for (int i = 0; i < columnCount; i++) {
   119 			for (int columnNumber = 1; columnNumber <= columnCount; columnNumber++) {
   116 				relationalWriter->writeAttribute(convertor.from_bytes(prepared->getString(i)));
   120 				relationalWriter->writeAttribute(resultSet->getString(columnNumber));
   117 			}
   121 			}
   118 		}
   122 		}
   119 	}
       
   120 
       
   121 	std::vector<string_t> getAllRelations() {
       
   122 		std::vector<string_t> relations;
       
   123 		std::unique_ptr<PreparedStatement> prepared(connection->prepareStatement("SELECT name FROM sqlite_master WHERE type IN ('table', 'view')"));
       
   124 		while (prepared->next()) relations.push_back(convertor.from_bytes(prepared->getString(0)));
       
   125 		return relations;
       
   126 	}
   123 	}
   127 
   124 
   128 	void copyRelations(const CopyRelations& copy) {
   125 	void copyRelations(const CopyRelations& copy) {
   129 		std::wregex pattern(copy.pattern);
   126 		std::wregex pattern(copy.pattern);
   130 		for (string_t relation : getAllRelations()) {
   127 		relpipe::writer::string_t userName = connection->getUserName();
   131 			if (regex_match(relation, pattern)) {
   128 		for (Connection::TablePrivilege tableMetaData : connection->getTablePrivileges()) {
       
   129 			if (regex_match(tableMetaData.name, pattern) && tableMetaData.privilege == L"SELECT" && tableMetaData.grantee == userName) {
       
   130 				// TODO: May we have multiple SELECT permissions for same table? Copy it only once.
   132 				std::wstringstream select;
   131 				std::wstringstream select;
   133 				select << L"SELECT * FROM ";
   132 				select << L"SELECT * FROM ";
   134 				writeIdentifier(select, relation);
   133 				if (tableMetaData.schema.size()) {
       
   134 					// TODO: use qualified table name also for regex matching and for relation name
       
   135 					writeIdentifier(select, tableMetaData.schema);
       
   136 					select << L".";
       
   137 				}
       
   138 				writeIdentifier(select, tableMetaData.name);
   135 
   139 
   136 				Statement statement;
   140 				Statement statement;
   137 				statement.relation = copy.replace ? regex_replace(relation, pattern, copy.replacement) : relation;
   141 				statement.relation = copy.replace ? regex_replace(tableMetaData.name, pattern, copy.replacement) : tableMetaData.name;
   138 				statement.sql = select.str();
   142 				statement.sql = select.str();
   139 				processStatement(statement);
   143 				processStatement(statement);
   140 			}
   144 			}
   141 		}
   145 		}
   142 	}
   146 	}
   168 			fileAlreadyExisted = (stat(file.c_str(), &fileStat) == 0);
   172 			fileAlreadyExisted = (stat(file.c_str(), &fileStat) == 0);
   169 		} else {
   173 		} else {
   170 			file = ":memory:";
   174 			file = ":memory:";
   171 		}
   175 		}
   172 
   176 
   173 		connection.reset(new Connection(file.c_str()));
   177 		connection.reset(driverManager->getConnectionByDSN(L"sqlite-memory")); // FIXME: custom DSN and files
   174 		connection->setAutoCommit(false);
   178 		connection.reset(driverManager->getConnectionByDSN(L"relpipe")); // FIXME: custom DSN and files
       
   179 		//connection->setAutoCommit(false);
   175 	}
   180 	}
   176 
   181 
   177 	virtual ~SqlHandler() {
   182 	virtual ~SqlHandler() {
   178 	}
   183 	}
   179 
   184 
   192 			sql << L" " << toSQLType(attributes[i].getTypeId());
   197 			sql << L" " << toSQLType(attributes[i].getTypeId());
   193 			if (i < attributes.size() - 1) sql << L",\n";
   198 			if (i < attributes.size() - 1) sql << L",\n";
   194 		}
   199 		}
   195 		sql << L"\n)";
   200 		sql << L"\n)";
   196 
   201 
   197 		std::unique_ptr<PreparedStatement> createTable(connection->prepareStatement(convertor.to_bytes(sql.str()).c_str()));
   202 		std::unique_ptr<PreparedStatement> createTable(connection->prepareStatement(sql.str()));
   198 		createTable->next();
   203 		createTable->executeUpdate();
   199 
   204 
   200 		// prepare INSERT:
   205 		// prepare INSERT:
   201 		sql = wstringstream();
   206 		sql = wstringstream();
   202 		sql << L"INSERT INTO ";
   207 		sql << L"INSERT INTO ";
   203 		writeIdentifier(sql, name);
   208 		writeIdentifier(sql, name);
   205 		for (int i = 0; i < attributes.size(); i++) {
   210 		for (int i = 0; i < attributes.size(); i++) {
   206 			sql << L"?";
   211 			sql << L"?";
   207 			if (i < attributes.size() - 1) sql << L",";
   212 			if (i < attributes.size() - 1) sql << L",";
   208 		}
   213 		}
   209 		sql << L")";
   214 		sql << L")";
   210 		currentInsert.reset(connection->prepareStatement(convertor.to_bytes(sql.str()).c_str()));
   215 		currentInsert.reset(connection->prepareStatement(sql.str()));
   211 	}
   216 	}
   212 
   217 
   213 	void attribute(const void* value, const std::type_info& typeInfo) override {
   218 	void attribute(const void* value, const std::type_info& typeInfo) override {
   214 		relpipe::reader::TypeId type = currentReaderMetadata[currentAttributeIndex].getTypeId();
   219 		relpipe::reader::TypeId type = currentReaderMetadata[currentAttributeIndex].getTypeId();
   215 		currentAttributeIndex++;
   220 		currentAttributeIndex++;
   231 			}
   236 			}
   232 			case relpipe::reader::TypeId::STRING:
   237 			case relpipe::reader::TypeId::STRING:
   233 			{
   238 			{
   234 				assert(typeInfo == typeid (string_t));
   239 				assert(typeInfo == typeid (string_t));
   235 				auto* typedValue = static_cast<const string_t*> (value);
   240 				auto* typedValue = static_cast<const string_t*> (value);
   236 				currentInsert->setString(currentAttributeIndex, convertor.to_bytes(*typedValue).c_str());
   241 				currentInsert->setString(currentAttributeIndex, *typedValue);
   237 				break;
   242 				break;
   238 			}
   243 			}
   239 			default:
   244 			default:
   240 				throw SqlException(L"Unsupported type in attribute()");
   245 				throw SqlException(L"Unsupported type in attribute()");
   241 		}
   246 		}
   242 
   247 
   243 		if (currentAttributeIndex % currentReaderMetadata.size() == 0) {
   248 		if (currentAttributeIndex % currentReaderMetadata.size() == 0) {
   244 			currentInsert->next();
   249 			currentInsert->executeUpdate();
   245 			currentInsert->reset();
   250 			currentInsert->reset();
   246 			currentAttributeIndex = 0;
   251 			currentAttributeIndex = 0;
   247 		}
   252 		}
   248 	}
   253 	}
   249 
   254