/**
* Relational pipes
* Copyright © 2019 František Kučera (Frantovo.cz, GlobalCode.info)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <memory>
#include <string>
#include <vector>
#include <locale>
#include <codecvt>
#include <sqlite3.h>
#include <relpipe/reader/typedefs.h>
#include <relpipe/reader/TypeId.h>
#include <relpipe/reader/handlers/RelationalReaderStringHandler.h>
#include <relpipe/reader/handlers/AttributeMetadata.h>
#include <relpipe/writer/Factory.h>
#include "Configuration.h"
#include "SqlException.h"
namespace relpipe {
namespace tr {
namespace sql {
using namespace std;
using namespace relpipe;
using namespace relpipe::reader;
using namespace relpipe::reader::handlers;
class PreparedStatement {
private:
sqlite3_stmt* stmt;
public:
PreparedStatement(sqlite3_stmt* stmt) : stmt(stmt) {
}
virtual ~PreparedStatement() {
sqlite3_finalize(stmt);
}
void setString(int parameterIndex, std::string value) {
int result = sqlite3_bind_text(stmt, parameterIndex, value.c_str(), -1, SQLITE_TRANSIENT);
if (result != SQLITE_OK) throw SqlException(L"Unable to set SQLite parameter.");
}
void setNull(int parameterIndex) {
int result = sqlite3_bind_null(stmt, parameterIndex);
if (result != SQLITE_OK) throw SqlException(L"Unable to set SQLite parameter.");
}
bool next() {
int result = sqlite3_step(stmt);
if (result == SQLITE_ROW) return true;
else if (result == SQLITE_DONE) return false;
else throw SqlException(L"Error while iterating over SQLite result.");
}
int getColumnCount() {
return sqlite3_column_count(stmt);
}
std::string getColumName(int columnIndex) {
const char* name = sqlite3_column_name(stmt, columnIndex);
if (name) return name;
else throw SqlException(L"Unable to get SQLite column name.");
}
// TODO: sqlite3_column_type
std::string getString(int columnIndex) {
const char* value = (const char*) sqlite3_column_text(stmt, columnIndex);
return value ? value : ""; // TODO: support NULL values (when supported in relpipe format)
}
};
class Connection {
private:
sqlite3* db;
public:
Connection(const char* filename) {
int result = sqlite3_open(filename, &db);
if (result != SQLITE_OK) throw SqlException(L"Unable to open SQLite database.");
}
virtual ~Connection() {
sqlite3_close(db);
}
PreparedStatement prepareStatement(const char* sql) {
const char* remaining;
sqlite3_stmt *stmt;
int result = sqlite3_prepare(db, sql, -1, &stmt, &remaining);
if (result == SQLITE_OK) return PreparedStatement(stmt);
else throw SqlException(L"Unable to prepare SQLite statement.");
}
};
class SqlHandler : public RelationalReaderStringHandler {
private:
Configuration configuration;
writer::RelationalWriter* relationalWriter;
std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings
std::unique_ptr<Connection> connection;
void processStatement(const Statement& statement) {
PreparedStatement prepared = connection->prepareStatement(convertor.to_bytes(statement.sql).c_str());
int columnCount = prepared.getColumnCount();
int parameterCount = statement.parameters.size();
for (int i = 0; i < parameterCount; i++) {
prepared.setString(i + 1, convertor.to_bytes(statement.parameters[i].value));
}
std::vector<relpipe::writer::AttributeMetadata> metadata;
for (int i = 0; i < columnCount; i++) metadata.push_back({convertor.from_bytes(prepared.getColumName(i).c_str()), relpipe::writer::TypeId::STRING});
relationalWriter->startRelation(statement.relation, metadata, true);
while (prepared.next()) {
for (int i = 0; i < columnCount; i++) {
relationalWriter->writeAttribute(convertor.from_bytes(prepared.getString(i)));
}
}
}
public:
SqlHandler(writer::RelationalWriter* relationalWriter, Configuration& configuration) : relationalWriter(relationalWriter), configuration(configuration) {
connection.reset(new Connection(":memory:"));
}
virtual ~SqlHandler() {
}
void startRelation(string_t name, vector<AttributeMetadata> attributes) override {
}
void attribute(const string_t& value) override {
}
void endOfPipe() {
for (const Statement& statement : configuration.statements) processStatement(statement);
}
};
}
}
}