src/SqlHandler.h
author František Kučera <franta-hg@frantovo.cz>
Thu, 04 Jun 2020 00:46:00 +0200
branchv_0
changeset 47 428c278af4be
parent 37 3de41719d7eb
child 48 c83119110c7b
permissions -rw-r--r--
rename option --data-source-url to --data-source-string. In some implementations like JDBC, the connection string is a URL, but in ODBC the string is not formally a URL, so it is better to use the more general term „data source string“ instead of URL. - data source name (DSN) = name of a pre-configured database connection that should be looked up in the configuration and used - data source string (connection string) = arbitrary string containing (in a certain encoding which might or might not be a URL) all needed parameters (e.g. server name + port + user name + password). The name and the string might sometimes also be combined: in ODBC we can e.g. connect to a string: DSN=relpipe;someParameter=foo;someOther=bar which will look up the configuration for the „relpipe“ data source and combine it with the given parameters.

/**
 * Relational pipes
 * Copyright © 2019 František Kučera (Frantovo.cz, GlobalCode.info)
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, version 3.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#pragma once

#include <cassert>
#include <codecvt>
#include <locale>
#include <memory>
#include <regex>
#include <set>
#include <sstream>
#include <string>
#include <vector>
#include <sys/stat.h>
#include <unistd.h>

#include <relpipe/reader/typedefs.h>
#include <relpipe/reader/TypeId.h>
#include <relpipe/reader/handlers/RelationalReaderValueHandler.h>
#include <relpipe/reader/handlers/AttributeMetadata.h>

#include <relpipe/writer/Factory.h>

#include "Configuration.h"
#include "SqlException.h"
#include "SqlInputScanner.h"
#include "PreparedStatement.h"
#include "Connection.h"
#include "DriverManager.h"

namespace relpipe {
namespace tr {
namespace sql {

using namespace std;
using namespace relpipe;
using namespace relpipe::reader;
using namespace relpipe::reader::handlers;

class SqlHandler : public RelationalReaderValueHandler {
private:
	Configuration configuration;
	boolean_t fileAlreadyExisted = false;
	writer::RelationalWriter* relationalWriter;
	DriverManager* driverManager;
	std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings
	vector<AttributeMetadata> currentReaderMetadata;
	integer_t currentAttributeIndex = 0;
	std::unique_ptr<Connection> connection;
	std::unique_ptr<PreparedStatement> currentInsert;

	bool readNextSqlStatement(std::wistream* input, std::wstringstream* sql) {
		sql->str(L"");
		sql->clear();

		SqlInputScanner scanner;

		for (wchar_t ch; *input >> ch;) {
			if (scanner.append(ch)) {
				*sql << scanner.getAndReset().c_str();
				return true;
			}
		}

		// TODO: support comments at the end of the script (after last ;)
		string_t remainingSql = scanner.getAndReset();
		for (wchar_t ch : remainingSql) if (ch != L' ' && ch != L'\n' && ch != L'\r' && ch != L'\t') throw SqlException(L"Unexpected EOF, missing „;“ after: „" + remainingSql + L"“");

		return false;
	}

	void processSqlInput(std::wistream* input) {
		if (input == nullptr) return;
		*input >> std::ws >> std::noskipws;
		for (std::wstringstream sql; readNextSqlStatement(input, &sql);) {
			std::unique_ptr<PreparedStatement> prepared(connection->prepareStatement(sql.str()));
			prepared->executeUpdate();
		}
	}

	relpipe::writer::TypeId findType(string_t columnName, const Statement& statement, relpipe::writer::TypeId typeFromResultSet) {
		for (TypeCast typeCast : statement.typeCasts) if (typeCast.name == columnName) return relationalWriter->toTypeId(typeCast.type);
		return typeFromResultSet;
	}

	void processStatement(const Statement& statement) {
		std::shared_ptr<PreparedStatement> prepared(connection->prepareStatement(statement.sql));
		int parameterCount = statement.parameters.size();

		for (int i = 0; i < parameterCount; i++) {
			prepared->setString(i + 1, statement.parameters[i].value);
		}

		std::shared_ptr<ResultSet> resultSet(prepared->executeQuery());
		std::shared_ptr<ResultSet::MetaData> metaData(resultSet->getMetaData());

		auto columnCount = metaData->getColumnCount();
		std::vector<relpipe::writer::AttributeMetadata> metadata;
		for (int columnNumber = 1; columnNumber <= columnCount; columnNumber++) {
			auto columnDescriptor = metaData->describeColumn(columnNumber);
			metadata.push_back({columnDescriptor.name, findType(columnDescriptor.name, statement, columnDescriptor.type)});
		}
		relationalWriter->startRelation(statement.relation, metadata, true);

		while (resultSet->next()) {
			for (int columnNumber = 1; columnNumber <= columnCount; columnNumber++) {
				relationalWriter->writeAttribute(resultSet->getString(columnNumber));
			}
		}
	}

	void copyRelations(const CopyRelations& copy) {
		std::wregex pattern(copy.pattern);
		relpipe::writer::string_t userName = connection->getUserName();
		for (Connection::TablePrivilege tableMetaData : connection->getTablePrivileges()) {
			if (regex_match(tableMetaData.name, pattern) && tableMetaData.privilege == L"SELECT" && tableMetaData.grantee == userName) {
				// TODO: May we have multiple SELECT permissions for same table? Copy it only once.
				std::wstringstream select;
				select << L"SELECT * FROM ";
				if (tableMetaData.schema.size()) {
					// TODO: use qualified table name also for regex matching and for relation name
					writeIdentifier(select, tableMetaData.schema);
					select << L".";
				}
				writeIdentifier(select, tableMetaData.name);

				Statement statement;
				statement.relation = copy.replace ? regex_replace(tableMetaData.name, pattern, copy.replacement) : tableMetaData.name;
				statement.sql = select.str();
				processStatement(statement);
			}
		}
	}

	relpipe::writer::string_t toSQLType(relpipe::reader::TypeId typeId) {
		if (typeId == relpipe::reader::TypeId::BOOLEAN) return L"integer"; // TODO: map selected values back to booleans or allow optional storage as string 
		else if (typeId == relpipe::reader::TypeId::INTEGER) return L"integer";
		else return L"text";
	}

	void writeIdentifier(std::wstringstream& output, relpipe::writer::string_t identifier) {
		output << L'"';
		for (auto & ch : identifier) {
			if (ch == L'"') output << L"\"\"";
			else output << ch;
		}
		output << L'"';
	}

	Connection* getConnection() {
		if (configuration.dataSourceName.size()) return driverManager->getConnectionByDSN(configuration.dataSourceName);
		else if (configuration.dataSourceString.size()) return driverManager->getConnectionByString(configuration.dataSourceString);
		else return driverManager->getConnectionByString(L"Driver=SQLite3;Database=:memory:");
		// SQLite is default/fallback oprion
		// TODO: use environmental variable to allow setting a different default
	}

public:

	SqlHandler(writer::RelationalWriter* relationalWriter, DriverManager* driverManager, Configuration& configuration) : relationalWriter(relationalWriter), driverManager(driverManager), configuration(configuration) {
		connection.reset(getConnection());
		//connection->setAutoCommit(false);
	}

	virtual ~SqlHandler() {
	}

	void startRelation(string_t name, vector<AttributeMetadata> attributes) override {
		currentReaderMetadata = attributes;

		// CREATE TABLE:
		std::wstringstream sql;
		// TODO: if already exist just append new columns
		sql << L"CREATE TABLE ";
		writeIdentifier(sql, name);
		sql << L" (\n";
		for (int i = 0; i < attributes.size(); i++) {
			sql << L"\t";
			writeIdentifier(sql, attributes[i].getAttributeName());
			sql << L" " << toSQLType(attributes[i].getTypeId());
			if (i < attributes.size() - 1) sql << L",\n";
		}
		sql << L"\n)";

		std::unique_ptr<PreparedStatement> createTable(connection->prepareStatement(sql.str()));
		createTable->executeUpdate();

		// prepare INSERT:
		sql = wstringstream();
		sql << L"INSERT INTO ";
		writeIdentifier(sql, name);
		sql << L" VALUES (";
		for (int i = 0; i < attributes.size(); i++) {
			sql << L"?";
			if (i < attributes.size() - 1) sql << L",";
		}
		sql << L")";
		currentInsert.reset(connection->prepareStatement(sql.str()));
	}

	void attribute(const void* value, const std::type_info& typeInfo) override {
		relpipe::reader::TypeId type = currentReaderMetadata[currentAttributeIndex].getTypeId();
		currentAttributeIndex++;

		switch (type) {
			case relpipe::reader::TypeId::BOOLEAN:
			{
				assert(typeInfo == typeid (boolean_t));
				auto* typedValue = static_cast<const boolean_t*> (value);
				currentInsert->setBoolean(currentAttributeIndex, *typedValue);
				break;
			}
			case relpipe::reader::TypeId::INTEGER:
			{
				assert(typeInfo == typeid (integer_t));
				auto* typedValue = static_cast<const integer_t*> (value);
				currentInsert->setInteger(currentAttributeIndex, *typedValue);
				break;
			}
			case relpipe::reader::TypeId::STRING:
			{
				assert(typeInfo == typeid (string_t));
				auto* typedValue = static_cast<const string_t*> (value);
				currentInsert->setString(currentAttributeIndex, *typedValue);
				break;
			}
			default:
				throw SqlException(L"Unsupported type in attribute()");
		}

		if (currentAttributeIndex % currentReaderMetadata.size() == 0) {
			currentInsert->executeUpdate();
			currentInsert->reset();
			currentAttributeIndex = 0;
		}
	}

	void endOfPipe() {
		// process optional SQL input
		processSqlInput(configuration.sqlBeforeRelational);

		// run the transformation – process all statements:
		for (const Statement& statement : configuration.statements) processStatement(statement);

		// process optional SQL input
		processSqlInput(configuration.sqlAfterRelational);

		// pass-through some relations:
		for (const CopyRelations& copy : configuration.copyRelations) copyRelations(copy);

		connection->commit();
	}

	static void listDataSources(writer::RelationalWriter* relationalWriter, DriverManager* driverManager) {
		relationalWriter->startRelation(L"data_source",{
			{L"name", writer::TypeId::STRING},
			{L"description", writer::TypeId::STRING}
		}, true);

		for (DriverManager::DataSource ds : driverManager->getDataSources()) {
			relationalWriter->writeAttribute(ds.name);
			relationalWriter->writeAttribute(ds.description);
		}
	}

};

}
}
}