--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/UnionAllHandler.h Tue Apr 19 23:17:15 2022 +0200
@@ -0,0 +1,111 @@
+/**
+ * Relational pipes
+ * Copyright © 2022 František Kučera (Frantovo.cz, GlobalCode.info)
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, version 3 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#pragma once
+
+#include <regex>
+#include <stdexcept>
+#include <sstream>
+#include <codecvt>
+#include <iomanip>
+
+#include <relpipe/common/type/typedefs.h>
+#include <relpipe/reader/TypeId.h>
+#include <relpipe/reader/handlers/RelationalReaderStringHandler.h>
+#include <relpipe/reader/handlers/AttributeMetadata.h>
+
+#include <relpipe/writer/Factory.h>
+
+#include <relpipe/cli/RelpipeCLIException.h>
+
+#include "Configuration.h"
+
+namespace relpipe {
+namespace tr {
+namespace deserialize {
+
+/**
+ * Forwards all attributes to given writer.
+ * If an relation has same name and attributes as the previous one, records are just appended to it.
+ * If name or attributes differ, new relation is started and records are written to it.
+ *
+ * So it does something like UNION ALL in SQL for sequences of relations that have same name and attributes.
+ */
+class UnionAllHandler : public relpipe::reader::handlers::RelationalReaderValueHandler {
+private:
+ Configuration configuration;
+ shared_ptr<relpipe::writer::RelationalWriter> writer;
+
+ class RelationContext {
+ public:
+ relpipe::common::type::StringX name;
+ std::vector<relpipe::reader::handlers::AttributeMetadata> readerMetadata;
+ std::vector<relpipe::writer::AttributeMetadata> writerMetadata;
+ } relationContext;
+
+ void convert(const std::vector<relpipe::reader::handlers::AttributeMetadata>& r, std::vector<relpipe::writer::AttributeMetadata>& w) {
+ w.clear();
+ for (const relpipe::reader::handlers::AttributeMetadata& a : r) {
+ w.push_back({a.getAttributeName(), writer->toTypeId(a.getTypeName())});
+ }
+ }
+
+ bool equals(const std::vector<relpipe::reader::handlers::AttributeMetadata>& a, const std::vector<relpipe::reader::handlers::AttributeMetadata>& b) {
+ if (a.size() != b.size()) return false;
+
+ for (size_t i = 0, limit = a.size(); i < limit; i++) {
+ if (a[i].getAttributeName() != b[i].getAttributeName()) return false;
+ if (a[i].getTypeId() != b[i].getTypeId()) return false;
+ }
+
+ return true;
+ }
+
+public:
+
+ UnionAllHandler(shared_ptr<relpipe::writer::RelationalWriter> writer, Configuration configuration) : writer(writer), configuration(configuration) {
+ }
+
+ virtual ~UnionAllHandler() = default;
+
+ void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override {
+
+ bool continuation = name == relationContext.name && equals(attributes, relationContext.readerMetadata);
+
+ relationContext = RelationContext();
+ relationContext.name = name;
+ relationContext.readerMetadata = attributes;
+
+ if (continuation) {
+ // same relation → just append records
+ } else {
+ convert(attributes, relationContext.writerMetadata);
+ writer->startRelation(name, relationContext.writerMetadata, true);
+ }
+ }
+
+ void attribute(const void* value, const std::type_info& type) override {
+ writer->writeAttribute(value, type);
+ }
+
+ void endOfPipe() {
+ }
+
+};
+
+}
+}
+}