diff -r 7e986fcf0d8f -r d53041bb781b src/UnionAllHandler.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/UnionAllHandler.h Tue Apr 19 23:17:15 2022 +0200 @@ -0,0 +1,111 @@ +/** + * Relational pipes + * Copyright © 2022 František Kučera (Frantovo.cz, GlobalCode.info) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 3 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +#pragma once + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include + +#include + +#include "Configuration.h" + +namespace relpipe { +namespace tr { +namespace deserialize { + +/** + * Forwards all attributes to given writer. + * If an relation has same name and attributes as the previous one, records are just appended to it. + * If name or attributes differ, new relation is started and records are written to it. + * + * So it does something like UNION ALL in SQL for sequences of relations that have same name and attributes. + */ +class UnionAllHandler : public relpipe::reader::handlers::RelationalReaderValueHandler { +private: + Configuration configuration; + shared_ptr writer; + + class RelationContext { + public: + relpipe::common::type::StringX name; + std::vector readerMetadata; + std::vector writerMetadata; + } relationContext; + + void convert(const std::vector& r, std::vector& w) { + w.clear(); + for (const relpipe::reader::handlers::AttributeMetadata& a : r) { + w.push_back({a.getAttributeName(), writer->toTypeId(a.getTypeName())}); + } + } + + bool equals(const std::vector& a, const std::vector& b) { + if (a.size() != b.size()) return false; + + for (size_t i = 0, limit = a.size(); i < limit; i++) { + if (a[i].getAttributeName() != b[i].getAttributeName()) return false; + if (a[i].getTypeId() != b[i].getTypeId()) return false; + } + + return true; + } + +public: + + UnionAllHandler(shared_ptr writer, Configuration configuration) : writer(writer), configuration(configuration) { + } + + virtual ~UnionAllHandler() = default; + + void startRelation(relpipe::common::type::StringX name, std::vector attributes) override { + + bool continuation = name == relationContext.name && equals(attributes, relationContext.readerMetadata); + + relationContext = RelationContext(); + relationContext.name = name; + relationContext.readerMetadata = attributes; + + if (continuation) { + // same relation → just append records + } else { + convert(attributes, relationContext.writerMetadata); + writer->startRelation(name, relationContext.writerMetadata, true); + } + } + + void attribute(const void* value, const std::type_info& type) override { + writer->writeAttribute(value, type); + } + + void endOfPipe() { + } + +}; + +} +} +}