src/UnionAllHandler.h
branchv_0
changeset 1 d53041bb781b
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/UnionAllHandler.h	Tue Apr 19 23:17:15 2022 +0200
@@ -0,0 +1,111 @@
+/**
+ * Relational pipes
+ * Copyright © 2022 František Kučera (Frantovo.cz, GlobalCode.info)
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, version 3 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#pragma once
+
+#include <regex>
+#include <stdexcept>
+#include <sstream>
+#include <codecvt>
+#include <iomanip>
+
+#include <relpipe/common/type/typedefs.h>
+#include <relpipe/reader/TypeId.h>
+#include <relpipe/reader/handlers/RelationalReaderStringHandler.h>
+#include <relpipe/reader/handlers/AttributeMetadata.h>
+
+#include <relpipe/writer/Factory.h>
+
+#include <relpipe/cli/RelpipeCLIException.h>
+
+#include "Configuration.h"
+
+namespace relpipe {
+namespace tr {
+namespace deserialize {
+
+/**
+ * Forwards all attributes to given writer.
+ * If an relation has same name and attributes as the previous one, records are just appended to it.
+ * If name or attributes differ, new relation is started and records are written to it.
+ * 
+ * So it does something like UNION ALL in SQL for sequences of relations that have same name and attributes.
+ */
+class UnionAllHandler : public relpipe::reader::handlers::RelationalReaderValueHandler {
+private:
+	Configuration configuration;
+	shared_ptr<relpipe::writer::RelationalWriter> writer;
+
+	class RelationContext {
+	public:
+		relpipe::common::type::StringX name;
+		std::vector<relpipe::reader::handlers::AttributeMetadata> readerMetadata;
+		std::vector<relpipe::writer::AttributeMetadata> writerMetadata;
+	} relationContext;
+
+	void convert(const std::vector<relpipe::reader::handlers::AttributeMetadata>& r, std::vector<relpipe::writer::AttributeMetadata>& w) {
+		w.clear();
+		for (const relpipe::reader::handlers::AttributeMetadata& a : r) {
+			w.push_back({a.getAttributeName(), writer->toTypeId(a.getTypeName())});
+		}
+	}
+
+	bool equals(const std::vector<relpipe::reader::handlers::AttributeMetadata>& a, const std::vector<relpipe::reader::handlers::AttributeMetadata>& b) {
+		if (a.size() != b.size()) return false;
+
+		for (size_t i = 0, limit = a.size(); i < limit; i++) {
+			if (a[i].getAttributeName() != b[i].getAttributeName()) return false;
+			if (a[i].getTypeId() != b[i].getTypeId()) return false;
+		}
+
+		return true;
+	}
+
+public:
+
+	UnionAllHandler(shared_ptr<relpipe::writer::RelationalWriter> writer, Configuration configuration) : writer(writer), configuration(configuration) {
+	}
+
+	virtual ~UnionAllHandler() = default;
+
+	void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override {
+
+		bool continuation = name == relationContext.name && equals(attributes, relationContext.readerMetadata);
+
+		relationContext = RelationContext();
+		relationContext.name = name;
+		relationContext.readerMetadata = attributes;
+
+		if (continuation) {
+			// same relation → just append records
+		} else {
+			convert(attributes, relationContext.writerMetadata);
+			writer->startRelation(name, relationContext.writerMetadata, true);
+		}
+	}
+
+	void attribute(const void* value, const std::type_info& type) override {
+		writer->writeAttribute(value, type);
+	}
+
+	void endOfPipe() {
+	}
+
+};
+
+}
+}
+}