src/SqlHandler.h
author František Kučera <franta-hg@frantovo.cz>
Thu, 04 Jun 2020 13:24:17 +0200
branchv_0
changeset 48 c83119110c7b
parent 47 428c278af4be
child 58 a4907b207f0c
permissions -rw-r--r--
support also boolean and integer data types, do not treat everything as mere strings

/**
 * Relational pipes
 * Copyright © 2019 František Kučera (Frantovo.cz, GlobalCode.info)
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, version 3.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#pragma once

#include <memory>
#include <string>
#include <sstream>
#include <regex>
#include <vector>
#include <set>
#include <utility>
#include <locale>
#include <codecvt>
#include <cassert>
#include <unistd.h>
#include <sys/stat.h>

#include <relpipe/reader/typedefs.h>
#include <relpipe/reader/TypeId.h>
#include <relpipe/reader/handlers/RelationalReaderValueHandler.h>
#include <relpipe/reader/handlers/AttributeMetadata.h>

#include <relpipe/writer/Factory.h>

#include "Configuration.h"
#include "SqlException.h"
#include "SqlInputScanner.h"
#include "PreparedStatement.h"
#include "Connection.h"
#include "DriverManager.h"

namespace relpipe {
namespace tr {
namespace sql {

using namespace std;
using namespace relpipe;
using namespace relpipe::reader;
using namespace relpipe::reader::handlers;

class SqlHandler : public RelationalReaderValueHandler {
private:
	Configuration configuration;
	boolean_t fileAlreadyExisted = false;
	writer::RelationalWriter* relationalWriter;
	DriverManager* driverManager;
	std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings
	vector<AttributeMetadata> currentReaderMetadata;
	integer_t currentAttributeIndex = 0;
	std::unique_ptr<Connection> connection;
	std::unique_ptr<PreparedStatement> currentInsert;

	bool readNextSqlStatement(std::wistream* input, std::wstringstream* sql) {
		sql->str(L"");
		sql->clear();

		SqlInputScanner scanner;

		for (wchar_t ch; *input >> ch;) {
			if (scanner.append(ch)) {
				*sql << scanner.getAndReset().c_str();
				return true;
			}
		}

		// TODO: support comments at the end of the script (after last ;)
		string_t remainingSql = scanner.getAndReset();
		for (wchar_t ch : remainingSql) if (ch != L' ' && ch != L'\n' && ch != L'\r' && ch != L'\t') throw SqlException(L"Unexpected EOF, missing „;“ after: „" + remainingSql + L"“");

		return false;
	}

	void processSqlInput(std::wistream* input) {
		if (input == nullptr) return;
		*input >> std::ws >> std::noskipws;
		for (std::wstringstream sql; readNextSqlStatement(input, &sql);) {
			std::unique_ptr<PreparedStatement> prepared(connection->prepareStatement(sql.str()));
			prepared->executeUpdate();
		}
	}

	relpipe::writer::TypeId findType(string_t columnName, const Statement& statement, relpipe::writer::TypeId typeFromResultSet) {
		for (TypeCast typeCast : statement.typeCasts) if (typeCast.name == columnName) return relationalWriter->toTypeId(typeCast.type);
		return typeFromResultSet;
	}

	void processStatement(const Statement& statement) {
		std::shared_ptr<PreparedStatement> prepared(connection->prepareStatement(statement.sql));
		int parameterCount = statement.parameters.size();

		for (int i = 0; i < parameterCount; i++) {
			prepared->setString(i + 1, statement.parameters[i].value);
		}

		std::shared_ptr<ResultSet> resultSet(prepared->executeQuery());
		std::shared_ptr<ResultSet::MetaData> metaData(resultSet->getMetaData());

		auto columnCount = metaData->getColumnCount();
		std::vector<relpipe::writer::AttributeMetadata> metadata;
		for (int columnNumber = 1; columnNumber <= columnCount; columnNumber++) {
			auto columnDescriptor = metaData->describeColumn(columnNumber);
			metadata.push_back({columnDescriptor.name, findType(columnDescriptor.name, statement, columnDescriptor.type)});
		}
		relationalWriter->startRelation(statement.relation, metadata, true);

		while (resultSet->next()) {
			for (int columnNumber = 1; columnNumber <= columnCount; columnNumber++) {
				// TODO: null values (when supported in the format)
				if (metadata[columnNumber - 1].typeId == relpipe::writer::TypeId::BOOLEAN) {
					auto booleanValue = resultSet->getBoolean(columnNumber);
					relationalWriter->writeAttribute(&booleanValue, typeid (booleanValue));
				} else if (metadata[columnNumber - 1].typeId == relpipe::writer::TypeId::INTEGER) {
					auto integerValue = resultSet->getInteger(columnNumber);
					relationalWriter->writeAttribute(&integerValue, typeid (integerValue));
				} else {
					relationalWriter->writeAttribute(resultSet->getString(columnNumber));
				}
			}
		}
	}

	void copyRelations(const CopyRelations& copy) {
		std::wregex pattern(copy.pattern);
		relpipe::writer::string_t userName = connection->getUserName();
		for (Connection::TablePrivilege tableMetaData : connection->getTablePrivileges()) {
			if (regex_match(tableMetaData.name, pattern) && tableMetaData.privilege == L"SELECT" && tableMetaData.grantee == userName) {
				// TODO: May we have multiple SELECT permissions for same table? Copy it only once.
				std::wstringstream select;
				select << L"SELECT * FROM ";
				if (tableMetaData.schema.size()) {
					// TODO: use qualified table name also for regex matching and for relation name
					writeIdentifier(select, tableMetaData.schema);
					select << L".";
				}
				writeIdentifier(select, tableMetaData.name);

				Statement statement;
				statement.relation = copy.replace ? regex_replace(tableMetaData.name, pattern, copy.replacement) : tableMetaData.name;
				statement.sql = select.str();
				processStatement(statement);
			}
		}
	}

	relpipe::writer::string_t toSQLType(relpipe::reader::TypeId typeId) {
		if (typeId == relpipe::reader::TypeId::BOOLEAN) return L"integer"; // TODO: bit type might fit better, but needs more testing (support in various DBMS and their drivers)
		else if (typeId == relpipe::reader::TypeId::INTEGER) return L"bigint";
		else return L"text";
	}

	void writeIdentifier(std::wstringstream& output, relpipe::writer::string_t identifier) {
		output << L'"';
		for (auto & ch : identifier) {
			if (ch == L'"') output << L"\"\"";
			else output << ch;
		}
		output << L'"';
	}

	Connection* getConnection() {
		if (configuration.dataSourceName.size()) return driverManager->getConnectionByDSN(configuration.dataSourceName);
		else if (configuration.dataSourceString.size()) return driverManager->getConnectionByString(configuration.dataSourceString);
		else return driverManager->getConnectionByString(L"Driver=SQLite3;Database=:memory:");
		// SQLite is default/fallback oprion
		// TODO: use environmental variable to allow setting a different default
	}

public:

	SqlHandler(writer::RelationalWriter* relationalWriter, DriverManager* driverManager, Configuration& configuration) : relationalWriter(relationalWriter), driverManager(driverManager), configuration(configuration) {
		connection.reset(getConnection());
		//connection->setAutoCommit(false);
	}

	virtual ~SqlHandler() {
	}

	void startRelation(string_t name, vector<AttributeMetadata> attributes) override {
		currentReaderMetadata = attributes;

		// CREATE TABLE:
		std::wstringstream sql;
		// TODO: if already exist just append new columns
		sql << L"CREATE TABLE ";
		writeIdentifier(sql, name);
		sql << L" (\n";
		for (int i = 0; i < attributes.size(); i++) {
			sql << L"\t";
			writeIdentifier(sql, attributes[i].getAttributeName());
			sql << L" " << toSQLType(attributes[i].getTypeId());
			if (i < attributes.size() - 1) sql << L",\n";
		}
		sql << L"\n)";

		std::unique_ptr<PreparedStatement> createTable(connection->prepareStatement(sql.str()));
		createTable->executeUpdate();

		// prepare INSERT:
		sql = wstringstream();
		sql << L"INSERT INTO ";
		writeIdentifier(sql, name);
		sql << L" VALUES (";
		for (int i = 0; i < attributes.size(); i++) {
			sql << L"?";
			if (i < attributes.size() - 1) sql << L",";
		}
		sql << L")";
		currentInsert.reset(connection->prepareStatement(sql.str()));
	}

	void attribute(const void* value, const std::type_info& typeInfo) override {
		relpipe::reader::TypeId type = currentReaderMetadata[currentAttributeIndex].getTypeId();
		currentAttributeIndex++;

		switch (type) {
			case relpipe::reader::TypeId::BOOLEAN:
			{
				assert(typeInfo == typeid (boolean_t));
				auto* typedValue = static_cast<const boolean_t*> (value);
				currentInsert->setBoolean(currentAttributeIndex, *typedValue);
				break;
			}
			case relpipe::reader::TypeId::INTEGER:
			{
				assert(typeInfo == typeid (integer_t));
				auto* typedValue = static_cast<const integer_t*> (value);
				currentInsert->setInteger(currentAttributeIndex, *typedValue);
				break;
			}
			case relpipe::reader::TypeId::STRING:
			{
				assert(typeInfo == typeid (string_t));
				auto* typedValue = static_cast<const string_t*> (value);
				currentInsert->setString(currentAttributeIndex, *typedValue);
				break;
			}
			default:
				throw SqlException(L"Unsupported type in attribute()");
		}

		if (currentAttributeIndex % currentReaderMetadata.size() == 0) {
			currentInsert->executeUpdate();
			currentInsert->reset();
			currentAttributeIndex = 0;
		}
	}

	void endOfPipe() {
		// process optional SQL input
		processSqlInput(configuration.sqlBeforeRelational);

		// run the transformation – process all statements:
		for (const Statement& statement : configuration.statements) processStatement(statement);

		// process optional SQL input
		processSqlInput(configuration.sqlAfterRelational);

		// pass-through some relations:
		for (const CopyRelations& copy : configuration.copyRelations) copyRelations(copy);

		connection->commit();
	}

	static void listDataSources(writer::RelationalWriter* relationalWriter, DriverManager* driverManager) {
		relationalWriter->startRelation(L"data_source",{
			{L"name", writer::TypeId::STRING},
			{L"description", writer::TypeId::STRING}
		}, true);

		for (DriverManager::DataSource ds : driverManager->getDataSources()) {
			relationalWriter->writeAttribute(ds.name);
			relationalWriter->writeAttribute(ds.description);
		}
	}

};

}
}
}