/**
* Relational pipes
* Copyright © 2019 František Kučera (Frantovo.cz, GlobalCode.info)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, version 3.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <memory>
#include <string>
#include <sstream>
#include <regex>
#include <vector>
#include <locale>
#include <codecvt>
#include <unistd.h>
#include <cassert>
#include <sys/stat.h>
#include <relpipe/reader/typedefs.h>
#include <relpipe/reader/TypeId.h>
#include <relpipe/reader/handlers/RelationalReaderValueHandler.h>
#include <relpipe/reader/handlers/AttributeMetadata.h>
#include <relpipe/writer/Factory.h>
#include "Configuration.h"
#include "SqlException.h"
#include "SqlInputScanner.h"
#include "PreparedStatement.h"
#include "Connection.h"
#include "DriverManager.h"
namespace relpipe {
namespace tr {
namespace sql {
using namespace std;
using namespace relpipe;
using namespace relpipe::reader;
using namespace relpipe::reader::handlers;
class SqlHandler : public RelationalReaderValueHandler {
private:
Configuration configuration;
boolean_t fileAlreadyExisted = false;
writer::RelationalWriter* relationalWriter;
DriverManager* driverManager;
std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings
vector<AttributeMetadata> currentReaderMetadata;
integer_t currentAttributeIndex = 0;
std::unique_ptr<Connection> connection;
std::unique_ptr<PreparedStatement> currentInsert;
bool readNextSqlStatement(std::wistream* input, std::wstringstream* sql) {
sql->str(L"");
sql->clear();
SqlInputScanner scanner;
for (wchar_t ch; *input >> ch;) {
if (scanner.append(ch)) {
*sql << scanner.getAndReset().c_str();
return true;
}
}
string_t remainingSql = scanner.getAndReset();
for (wchar_t ch : remainingSql) if (ch != L' ' && ch != L'\n' && ch != L'\r' && ch != L'\t') throw SqlException(L"Unexpected EOF, missing „;“ after: „" + remainingSql + L"“");
return false;
}
void processSqlInput(std::wistream* input) {
if (input == nullptr) return;
*input >> std::ws >> std::noskipws;
for (std::wstringstream sql; readNextSqlStatement(input, &sql);) {
std::unique_ptr<PreparedStatement> prepared(connection->prepareStatement(convertor.to_bytes(sql.str()).c_str()));
while (prepared->next());
}
}
relpipe::writer::TypeId findType(string_t columnName, int columnIndex, const Statement& statement, std::shared_ptr<PreparedStatement> preparedStatement) {
for (TypeCast typeCast : statement.typeCasts) if (typeCast.name == columnName) return relationalWriter->toTypeId(typeCast.type);
return preparedStatement->getColumType(columnIndex);
}
void processStatement(const Statement& statement) {
std::shared_ptr<PreparedStatement> prepared(connection->prepareStatement(convertor.to_bytes(statement.sql).c_str()));
int columnCount = prepared->getColumnCount();
int parameterCount = statement.parameters.size();
for (int i = 0; i < parameterCount; i++) {
prepared->setString(i + 1, convertor.to_bytes(statement.parameters[i].value));
}
std::vector<relpipe::writer::AttributeMetadata> metadata;
for (int i = 0; i < columnCount; i++) {
string_t columnName = convertor.from_bytes(prepared->getColumName(i).c_str());
metadata.push_back({columnName, findType(columnName, i, statement, prepared)});
}
relationalWriter->startRelation(statement.relation, metadata, true);
while (prepared->next()) {
for (int i = 0; i < columnCount; i++) {
relationalWriter->writeAttribute(convertor.from_bytes(prepared->getString(i)));
}
}
}
std::vector<string_t> getAllRelations() {
std::vector<string_t> relations;
std::unique_ptr<PreparedStatement> prepared(connection->prepareStatement("SELECT name FROM sqlite_master WHERE type IN ('table', 'view')"));
while (prepared->next()) relations.push_back(convertor.from_bytes(prepared->getString(0)));
return relations;
}
void copyRelations(const CopyRelations& copy) {
std::wregex pattern(copy.pattern);
for (string_t relation : getAllRelations()) {
if (regex_match(relation, pattern)) {
std::wstringstream select;
select << L"SELECT * FROM ";
writeIdentifier(select, relation);
Statement statement;
statement.relation = copy.replace ? regex_replace(relation, pattern, copy.replacement) : relation;
statement.sql = select.str();
processStatement(statement);
}
}
}
relpipe::writer::string_t toSQLType(relpipe::reader::TypeId typeId) {
if (typeId == relpipe::reader::TypeId::BOOLEAN) return L"integer"; // TODO: map selected values back to booleans or allow optional storage as string
else if (typeId == relpipe::reader::TypeId::INTEGER) return L"integer";
else return L"text";
}
void writeIdentifier(std::wstringstream& output, relpipe::writer::string_t identifier) {
output << L'"';
for (auto & ch : identifier) {
if (ch == L'"') output << L"\"\"";
else output << ch;
}
output << L'"';
}
public:
SqlHandler(writer::RelationalWriter* relationalWriter, DriverManager* driverManager, Configuration& configuration) : relationalWriter(relationalWriter), driverManager(driverManager), configuration(configuration) {
std::string file;
if (configuration.file.size()) {
file = convertor.to_bytes(configuration.file);
// in C++17 we can use: std::filesystem::exists()
struct stat fileStat;
fileAlreadyExisted = (stat(file.c_str(), &fileStat) == 0);
} else {
file = ":memory:";
}
connection.reset(new Connection(file.c_str()));
connection->setAutoCommit(false);
}
virtual ~SqlHandler() {
}
void startRelation(string_t name, vector<AttributeMetadata> attributes) override {
currentReaderMetadata = attributes;
// CREATE TABLE:
std::wstringstream sql;
// TODO: if already exist just append new columns
sql << L"CREATE TABLE ";
writeIdentifier(sql, name);
sql << L" (\n";
for (int i = 0; i < attributes.size(); i++) {
sql << L"\t";
writeIdentifier(sql, attributes[i].getAttributeName());
sql << L" " << toSQLType(attributes[i].getTypeId());
if (i < attributes.size() - 1) sql << L",\n";
}
sql << L"\n)";
std::unique_ptr<PreparedStatement> createTable(connection->prepareStatement(convertor.to_bytes(sql.str()).c_str()));
createTable->next();
// prepare INSERT:
sql = wstringstream();
sql << L"INSERT INTO ";
writeIdentifier(sql, name);
sql << L" VALUES (";
for (int i = 0; i < attributes.size(); i++) {
sql << L"?";
if (i < attributes.size() - 1) sql << L",";
}
sql << L")";
currentInsert.reset(connection->prepareStatement(convertor.to_bytes(sql.str()).c_str()));
}
void attribute(const void* value, const std::type_info& typeInfo) override {
relpipe::reader::TypeId type = currentReaderMetadata[currentAttributeIndex].getTypeId();
currentAttributeIndex++;
switch (type) {
case relpipe::reader::TypeId::BOOLEAN:
{
assert(typeInfo == typeid (boolean_t));
auto* typedValue = static_cast<const boolean_t*> (value);
currentInsert->setBoolean(currentAttributeIndex, *typedValue);
break;
}
case relpipe::reader::TypeId::INTEGER:
{
assert(typeInfo == typeid (integer_t));
auto* typedValue = static_cast<const integer_t*> (value);
currentInsert->setInteger(currentAttributeIndex, *typedValue);
break;
}
case relpipe::reader::TypeId::STRING:
{
assert(typeInfo == typeid (string_t));
auto* typedValue = static_cast<const string_t*> (value);
currentInsert->setString(currentAttributeIndex, convertor.to_bytes(*typedValue).c_str());
break;
}
default:
throw SqlException(L"Unsupported type in attribute()");
}
if (currentAttributeIndex % currentReaderMetadata.size() == 0) {
currentInsert->next();
currentInsert->reset();
currentAttributeIndex = 0;
}
}
void endOfPipe() {
// process optional SQL input
processSqlInput(configuration.sqlBeforeRelational);
// run the transformation – process all statements:
for (const Statement& statement : configuration.statements) processStatement(statement);
// process optional SQL input
processSqlInput(configuration.sqlAfterRelational);
// pass-through some relations:
for (const CopyRelations& copy : configuration.copyRelations) copyRelations(copy);
connection->commit();
// delete or keep the file:
if (configuration.file.size()) {
if (configuration.keepFile == KeepFile::Never || (configuration.keepFile == KeepFile::Automatic && !fileAlreadyExisted)) {
std::wcerr << L"will unlink file" << std::endl;
int result = unlink(convertor.to_bytes(configuration.file).c_str());
if (result) throw SqlException(L"Unable to delete SQLite file.");
}
} // else: we had no file, everything was in memory
}
static void listDataSources(writer::RelationalWriter* relationalWriter, DriverManager* driverManager) {
relationalWriter->startRelation(L"data_source",{
{L"name", writer::TypeId::STRING},
{L"description", writer::TypeId::STRING}
}, true);
for (DriverManager::DataSource ds : driverManager->getDataSources()) {
relationalWriter->writeAttribute(ds.name);
relationalWriter->writeAttribute(ds.description);
}
}
};
}
}
}