author František Kučera <>
Tue, 22 Oct 2019 16:05:36 +0200
changeset 13 19580b27ade2
parent 12 0b38339b871b
child 14 eacacf060755
permissions -rw-r--r--
relpipe-in-sql mode: read .sqlite file and generate relational data

/**
 * Relational pipes
 * Copyright © 2019 František Kučera
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, version 3.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#pragma once

#include <memory>
#include <string>
#include <sstream>
#include <vector>
#include <locale>
#include <codecvt>
#include <unistd.h>
#include <cassert>
#include <sys/stat.h>

#include <sqlite3.h>

#include <relpipe/reader/typedefs.h>
#include <relpipe/reader/TypeId.h>
#include <relpipe/reader/handlers/RelationalReaderValueHandler.h>
#include <relpipe/reader/handlers/AttributeMetadata.h>

#include <relpipe/writer/Factory.h>

#include "Configuration.h"
#include "SqlException.h"

// Opens the nested namespace relpipe::tr::sql for the declarations that follow.
namespace relpipe {
namespace tr {
namespace sql {

// NOTE(review): this file starts with #pragma once, i.e. it is used as a header, so these
// using-directives become visible in every translation unit that includes it.
// The code below relies on them for unqualified names such as vector, boolean_t, integer_t,
// string_t, AttributeMetadata and RelationalReaderValueHandler.
using namespace std;
using namespace relpipe;
using namespace relpipe::reader;
using namespace relpipe::reader::handlers;

/**
 * Thin wrapper around an SQLite prepared statement handle (sqlite3_stmt*).
 *
 * Provides parameter binding (set*), row iteration (next/reset) and access to result columns.
 * SQLite result codes that signal failure are translated into SqlException.
 *
 * NOTE(review): in this copy of the file the closing braces, the access specifiers and possibly
 * some statements are not visible; the comments below describe only what can be seen here.
 */
class PreparedStatement {
	// Underlying SQLite statement handle; received in the constructor.
	sqlite3_stmt* stmt;


	/**
	 * Wraps an already prepared statement.
	 *
	 * @param stmt SQLite statement handle; stored as-is, no validation is visible here
	 */
	PreparedStatement(sqlite3_stmt* stmt) : stmt(stmt) {

	/**
	 * NOTE(review): the destructor body is not visible in this copy.
	 * Presumably it releases the statement handle (sqlite3_finalize) – confirm against the repository.
	 */
	virtual ~PreparedStatement() {

	/**
	 * Binds a boolean value through sqlite3_bind_int, i.e. the value is stored as an SQLite integer.
	 *
	 * @param parameterIndex index handed straight to SQLite (SQLite numbers bind parameters from 1)
	 * @param value value to bind
	 * @throws SqlException if SQLite does not return SQLITE_OK
	 */
	void setBoolean(int parameterIndex, relpipe::reader::boolean_t value) {
		int result = sqlite3_bind_int(stmt, parameterIndex, value);
		if (result != SQLITE_OK) throw SqlException(L"Unable to set SQLite parameter.");

	/**
	 * Binds an integer value through sqlite3_bind_int64.
	 *
	 * @param parameterIndex index handed straight to SQLite (SQLite numbers bind parameters from 1)
	 * @param value value to bind
	 * @throws SqlException if SQLite does not return SQLITE_OK
	 */
	void setInteger(int parameterIndex, relpipe::reader::integer_t value) {
		int result = sqlite3_bind_int64(stmt, parameterIndex, value);
		if (result != SQLITE_OK) throw SqlException(L"Unable to set SQLite parameter.");

	/**
	 * Binds a text value through sqlite3_bind_text.
	 *
	 * The length argument -1 makes SQLite read the text up to its first NUL terminator, and
	 * SQLITE_TRANSIENT asks SQLite to make its own copy, so the by-value parameter
	 * may safely go out of scope when this method returns.
	 *
	 * @param parameterIndex index handed straight to SQLite (SQLite numbers bind parameters from 1)
	 * @param value text to bind; callers in this file pass UTF-8 bytes produced by the converter
	 * @throws SqlException if SQLite does not return SQLITE_OK
	 */
	void setString(int parameterIndex, std::string value) {
		int result = sqlite3_bind_text(stmt, parameterIndex, value.c_str(), -1, SQLITE_TRANSIENT);
		if (result != SQLITE_OK) throw SqlException(L"Unable to set SQLite parameter.");

	/**
	 * Binds SQL NULL through sqlite3_bind_null.
	 *
	 * @param parameterIndex index handed straight to SQLite (SQLite numbers bind parameters from 1)
	 * @throws SqlException if SQLite does not return SQLITE_OK
	 */
	void setNull(int parameterIndex) {
		int result = sqlite3_bind_null(stmt, parameterIndex);
		if (result != SQLITE_OK) throw SqlException(L"Unable to set SQLite parameter.");

	/**
	 * Advances the statement by one step (sqlite3_step).
	 *
	 * @return true if a result row is available (SQLITE_ROW), false if the statement has finished (SQLITE_DONE)
	 * @throws SqlException for any other result code
	 */
	bool next() {
		int result = sqlite3_step(stmt);
		if (result == SQLITE_ROW) return true;
		else if (result == SQLITE_DONE) return false;
		else throw SqlException(L"Error while iterating over SQLite result.");

	/**
	 * Resets the statement through sqlite3_reset.
	 *
	 * @throws SqlException if SQLite does not return SQLITE_OK
	 */
	void reset() {
		int result = sqlite3_reset(stmt);
		if (result != SQLITE_OK) throw SqlException(L"Unable to reset SQLite prepared statement.");

	/**
	 * @return number of columns reported by sqlite3_column_count for this statement
	 */
	int getColumnCount() {
		return sqlite3_column_count(stmt);

	/**
	 * Returns the name of a result column (sqlite3_column_name).
	 * The method name is spelled "getColumName" and is used under this spelling elsewhere in this file.
	 *
	 * @param columnIndex column index handed straight to SQLite; the caller in this file counts from 0
	 * @return column name copied into a std::string
	 * @throws SqlException if SQLite returns a null pointer
	 */
	std::string getColumName(int columnIndex) {
		const char* name = sqlite3_column_name(stmt, columnIndex);
		if (name) return name;
		else throw SqlException(L"Unable to get SQLite column name.");

	// TODO: sqlite3_column_type

	/**
	 * Returns the value of a column of the current row as text (sqlite3_column_text).
	 *
	 * @param columnIndex column index handed straight to SQLite
	 * @return the text value; an empty string if SQLite returns a null pointer, so a NULL value
	 *         and an empty text are indistinguishable for the caller
	 */
	std::string getString(int columnIndex) {
		const char* value = (const char*) sqlite3_column_text(stmt, columnIndex);
		return value ? value : ""; // TODO: support NULL values (when supported in relpipe format)


/**
 * Wrapper around an SQLite database handle (sqlite3*).
 * Opens the database in the constructor and creates PreparedStatement objects from SQL text.
 *
 * NOTE(review): in this copy of the file the closing braces, the access specifiers and possibly
 * some statements are not visible; the comments below describe only what can be seen here.
 */
class Connection {
	// SQLite database handle filled in by sqlite3_open in the constructor.
	sqlite3* db;

	/**
	 * Opens the database with sqlite3_open.
	 *
	 * @param filename name handed straight to sqlite3_open; SqlHandler in this file passes either
	 *                 a file path or ":memory:"
	 * @throws SqlException if sqlite3_open does not return SQLITE_OK
	 *
	 * NOTE(review): the SQLite documentation says the handle should be passed to sqlite3_close
	 * even when opening fails; no such call is visible before the throw – confirm.
	 */
	Connection(const char* filename) {
		int result = sqlite3_open(filename, &db);
		if (result != SQLITE_OK) {
			throw SqlException(L"Unable to open SQLite database.");

	/**
	 * NOTE(review): the destructor body is not visible in this copy.
	 * Presumably it closes the database handle (sqlite3_close) – confirm against the repository.
	 */
	virtual ~Connection() {

	/**
	 * Compiles SQL text into a prepared statement (sqlite3_prepare).
	 *
	 * @param sql SQL text; the length argument -1 makes SQLite read it up to the NUL terminator
	 * @return newly allocated PreparedStatement (raw pointer created with new); the callers in this
	 *         file wrap the result in std::unique_ptr
	 * @throws SqlException if sqlite3_prepare does not return SQLITE_OK
	 *
	 * NOTE(review): sqlite3_prepare is the legacy interface; the SQLite documentation recommends
	 * sqlite3_prepare_v2 for new code – consider switching.
	 */
	PreparedStatement* prepareStatement(const char* sql) {
		// receives the pointer to the unparsed rest of the SQL text; not used afterwards
		const char* remaining;
		sqlite3_stmt *stmt;
		int result = sqlite3_prepare(db, sql, -1, &stmt, &remaining);
		if (result == SQLITE_OK) return new PreparedStatement(stmt);
		else throw SqlException(L"Unable to prepare SQLite statement.");


/**
 * Reader-side handler that maps incoming relations to an SQLite database and, at the end of the
 * input, runs the configured SQL statements and writes their results to the relational writer.
 *
 * Visible flow: startRelation() builds CREATE TABLE and INSERT statements for the incoming relation,
 * attribute() binds incoming values to the current INSERT, and endOfPipe() processes
 * configuration.statements and then decides whether the database file is deleted.
 *
 * NOTE(review): in this copy of the file the closing braces, the access specifiers and several
 * statements are not visible; the comments below describe only what can be seen here.
 */
class SqlHandler : public RelationalReaderValueHandler {
	// Copy of the configuration passed to the constructor (file, keepFile and statements are read from it).
	Configuration configuration;
	// Set in the constructor: true if stat() succeeded on the configured file before the connection was opened.
	boolean_t fileAlreadyExisted = false;
	// Output writer supplied by the caller as a raw pointer; its ownership is not visible in this file.
	writer::RelationalWriter* relationalWriter;
	// Converts between wide strings and UTF-8 encoded byte strings.
	std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings
	// Attribute metadata of the relation most recently passed to startRelation().
	vector<AttributeMetadata> currentReaderMetadata;
	// Position within the current record; used by attribute() for the type lookup and as the bind index.
	integer_t currentAttributeIndex = 0;
	// Database connection created in the constructor.
	std::unique_ptr<Connection> connection;
	// INSERT statement used by attribute().
	// NOTE(review): no assignment to this member is visible in this copy – confirm where it is set.
	std::unique_ptr<PreparedStatement> currentInsert;

	/**
	 * Runs one configured statement and emits its result as a relation.
	 *
	 * Prepares statement.sql, binds every entry of statement.parameters as text, starts the output
	 * relation statement.relation with one STRING attribute per result column and then iterates
	 * over the result rows.
	 *
	 * @param statement configured statement (sql, parameters and relation are read)
	 *
	 * NOTE(review): the body of the innermost loop is not visible in this copy; presumably each
	 * column value is written to relationalWriter – confirm.
	 */
	void processStatement(const Statement& statement) {
		std::unique_ptr<PreparedStatement> prepared(connection->prepareStatement(convertor.to_bytes(statement.sql).c_str()));
		int columnCount = prepared->getColumnCount();
		int parameterCount = statement.parameters.size();

		// the vector is indexed from 0, the bind index passed to setString() is i + 1
		for (int i = 0; i < parameterCount; i++) {
			prepared->setString(i + 1, convertor.to_bytes(statement.parameters[i].value));

		std::vector<relpipe::writer::AttributeMetadata> metadata;
		// TODO: support also other data types
		for (int i = 0; i < columnCount; i++) metadata.push_back({convertor.from_bytes(prepared->getColumName(i).c_str()), relpipe::writer::TypeId::STRING});
		relationalWriter->startRelation(statement.relation, metadata, true);

		while (prepared->next()) {
			for (int i = 0; i < columnCount; i++) {

	/**
	 * Maps a relpipe attribute type to the SQL column type used in CREATE TABLE.
	 *
	 * @param typeId relpipe type of the attribute
	 * @return L"integer" for BOOLEAN and INTEGER, L"text" for everything else
	 */
	relpipe::writer::string_t toSQLType(relpipe::reader::TypeId typeId) {
		if (typeId == relpipe::reader::TypeId::BOOLEAN) return L"integer"; // TODO: map selected values back to booleans or allow optional storage as string 
		else if (typeId == relpipe::reader::TypeId::INTEGER) return L"integer";
		else return L"text";

	/**
	 * Writes an identifier enclosed in double quotes to the output stream;
	 * each double quote inside the identifier is written twice.
	 *
	 * @param output stream that receives the quoted identifier
	 * @param identifier table or column name to be quoted
	 */
	void writeIdentifier(std::wstringstream& output, relpipe::writer::string_t identifier) {
		output << L'"';
		for (auto & ch : identifier) {
			if (ch == L'"') output << L"\"\"";
			else output << ch;
		output << L'"';


	/**
	 * Stores the writer and a copy of the configuration and opens the database connection.
	 *
	 * If configuration.file is non-empty, it is converted to bytes and used as the database file and
	 * fileAlreadyExisted records whether stat() succeeded on it; otherwise ":memory:" is used.
	 *
	 * @param relationalWriter writer used for the output relations
	 * @param configuration settings; copied into this object
	 */
	SqlHandler(writer::RelationalWriter* relationalWriter, Configuration& configuration) : relationalWriter(relationalWriter), configuration(configuration) {
		std::string file;
		if (configuration.file.size()) {
			file = convertor.to_bytes(configuration.file);

			// in C++17 we can use: std::filesystem::exists()
			struct stat fileStat;
			fileAlreadyExisted = (stat(file.c_str(), &fileStat) == 0);
		} else {
			file = ":memory:";

		connection.reset(new Connection(file.c_str()));

	/**
	 * NOTE(review): the destructor body is not visible in this copy.
	 */
	virtual ~SqlHandler() {

	/**
	 * Called when a new input relation starts: remembers its attribute metadata and builds the SQL
	 * for a table of the same name.
	 *
	 * The CREATE TABLE text has one column per attribute, named after the attribute and typed
	 * through toSQLType(); the INSERT text has one "?" placeholder per attribute.
	 *
	 * @param name relation name, used as the table name
	 * @param attributes attribute metadata of the relation
	 *
	 * NOTE(review): in this copy the CREATE TABLE statement is only prepared (no step is visible)
	 * and the INSERT text is built but not prepared into currentInsert – presumably those statements
	 * were lost; confirm against the repository.
	 */
	void startRelation(string_t name, vector<AttributeMetadata> attributes) override {
		currentReaderMetadata = attributes;

		std::wstringstream sql;
		// TODO: if already exist just append new columns
		sql << L"CREATE TABLE ";
		writeIdentifier(sql, name);
		sql << L" (\n";
		for (int i = 0; i < attributes.size(); i++) {
			sql << L"\t";
			writeIdentifier(sql, attributes[i].getAttributeName());
			sql << L" " << toSQLType(attributes[i].getTypeId());
			// separator after every column except the last one
			if (i < attributes.size() - 1) sql << L",\n";
		sql << L"\n)";

		std::unique_ptr<PreparedStatement> createTable(connection->prepareStatement(convertor.to_bytes(sql.str()).c_str()));

		// prepare INSERT:
		sql = wstringstream();
		sql << L"INSERT INTO ";
		writeIdentifier(sql, name);
		sql << L" VALUES (";
		for (int i = 0; i < attributes.size(); i++) {
			sql << L"?";
			// separator after every placeholder except the last one
			if (i < attributes.size() - 1) sql << L",";
		sql << L")";

	/**
	 * Called for each incoming attribute value: looks up the attribute type in currentReaderMetadata
	 * at currentAttributeIndex and binds the value to currentInsert with the matching setter
	 * (setBoolean, setInteger, or setString after conversion to bytes).
	 *
	 * @param value pointer to the value; cast to boolean_t, integer_t or string_t according to the attribute type
	 * @param typeInfo runtime type of the value; only checked through assert
	 * @throws SqlException with "Unsupported type in attribute()" – presumably for any other type
	 *
	 * NOTE(review): in this copy the switch has no braces, break statements or default label, no
	 * increment of currentAttributeIndex is visible and the record-completion branch only resets
	 * the index – presumably those lines were lost; confirm against the repository.
	 */
	void attribute(const void* value, const std::type_info& typeInfo) override {
		relpipe::reader::TypeId type = currentReaderMetadata[currentAttributeIndex].getTypeId();

		switch (type) {
			case relpipe::reader::TypeId::BOOLEAN:
				assert(typeInfo == typeid (boolean_t));
				auto* typedValue = static_cast<const boolean_t*> (value);
				currentInsert->setBoolean(currentAttributeIndex, *typedValue);
			case relpipe::reader::TypeId::INTEGER:
				assert(typeInfo == typeid (integer_t));
				auto* typedValue = static_cast<const integer_t*> (value);
				currentInsert->setInteger(currentAttributeIndex, *typedValue);
			case relpipe::reader::TypeId::STRING:
				assert(typeInfo == typeid (string_t));
				auto* typedValue = static_cast<const string_t*> (value);
				currentInsert->setString(currentAttributeIndex, convertor.to_bytes(*typedValue).c_str());
				throw SqlException(L"Unsupported type in attribute()");

		// the index is a multiple of the attribute count: start over at the first attribute
		if (currentAttributeIndex % currentReaderMetadata.size() == 0) {
			currentAttributeIndex = 0;

	/**
	 * Called at the end of the input: processes every configured statement and then deletes or
	 * keeps the database file.
	 *
	 * The file is unlinked only if configuration.file is non-empty and keepFile is either Never, or
	 * Automatic while the file did not exist when this handler was constructed.
	 *
	 * @throws SqlException if unlink() reports a failure
	 */
	void endOfPipe() {
		// run the transformation – process all statements:
		for (const Statement& statement : configuration.statements) processStatement(statement);

		// delete or keep the file:
		if (configuration.file.size()) {
			if (configuration.keepFile == KeepFile::Never || (configuration.keepFile == KeepFile::Automatic && !fileAlreadyExisted)) {
				// NOTE(review): this message goes to std::wcerr on every deletion – looks like leftover debug output; confirm.
				std::wcerr << L"will unlink file" << std::endl;
				int result = unlink(convertor.to_bytes(configuration.file).c_str());
				if (result) throw SqlException(L"Unable to delete SQLite file.");
		} // else: we had no file, everything was in memory

