/**
* Relational pipes
* Copyright © 2022 František Kučera (Frantovo.cz, GlobalCode.info)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <regex>
#include <stdexcept>
#include <sstream>
#include <codecvt>
#include <iomanip>
#include <relpipe/common/type/typedefs.h>
#include <relpipe/reader/TypeId.h>
#include <relpipe/reader/handlers/RelationalReaderStringHandler.h>
#include <relpipe/reader/handlers/AttributeMetadata.h>
#include <relpipe/writer/Factory.h>
#include <relpipe/cli/RelpipeCLIException.h>
#include "Configuration.h"
namespace relpipe {
namespace tr {
namespace deserialize {
/**
* Forwards all attributes to given writer.
* If an relation has same name and attributes as the previous one, records are just appended to it.
* If name or attributes differ, new relation is started and records are written to it.
*
* So it does something like UNION ALL in SQL for sequences of relations that have same name and attributes.
*/
class UnionAllHandler : public relpipe::reader::handlers::RelationalReaderValueHandler {
private:
Configuration configuration;
shared_ptr<relpipe::writer::RelationalWriter> writer;
class RelationContext {
public:
relpipe::common::type::StringX name;
std::vector<relpipe::reader::handlers::AttributeMetadata> readerMetadata;
std::vector<relpipe::writer::AttributeMetadata> writerMetadata;
} relationContext;
void convert(const std::vector<relpipe::reader::handlers::AttributeMetadata>& r, std::vector<relpipe::writer::AttributeMetadata>& w) {
w.clear();
for (const relpipe::reader::handlers::AttributeMetadata& a : r) {
w.push_back({a.getAttributeName(), writer->toTypeId(a.getTypeName())});
}
}
bool equals(const std::vector<relpipe::reader::handlers::AttributeMetadata>& a, const std::vector<relpipe::reader::handlers::AttributeMetadata>& b) {
if (a.size() != b.size()) return false;
for (size_t i = 0, limit = a.size(); i < limit; i++) {
if (a[i].getAttributeName() != b[i].getAttributeName()) return false;
if (a[i].getTypeId() != b[i].getTypeId()) return false;
}
return true;
}
public:
UnionAllHandler(shared_ptr<relpipe::writer::RelationalWriter> writer, Configuration configuration) : writer(writer), configuration(configuration) {
}
virtual ~UnionAllHandler() = default;
void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override {
bool continuation = name == relationContext.name && equals(attributes, relationContext.readerMetadata);
relationContext = RelationContext();
relationContext.name = name;
relationContext.readerMetadata = attributes;
if (continuation) {
// same relation → just append records
} else {
convert(attributes, relationContext.writerMetadata);
writer->startRelation(name, relationContext.writerMetadata, true);
}
}
void attribute(const void* value, const std::type_info& type) override {
writer->writeAttribute(value, type);
}
void endOfPipe() {
}
};
}
}
}