--- a/nbproject/configurations.xml Sun Apr 17 21:24:32 2022 +0200
+++ b/nbproject/configurations.xml Tue Apr 19 23:17:15 2022 +0200
@@ -42,6 +42,7 @@
<logicalFolder name="root" displayName="root" projectFiles="true" kind="ROOT">
<df root="." name="0">
<df name="src">
+ <in>UnionAllHandler.h</in>
<in>relpipe-tr-deserialize.cpp</in>
</df>
</df>
@@ -93,6 +94,8 @@
<preBuildFirst>true</preBuildFirst>
</preBuild>
</makefileType>
+ <item path="src/UnionAllHandler.h" ex="false" tool="3" flavor2="0">
+ </item>
<item path="src/relpipe-tr-deserialize.cpp" ex="false" tool="1" flavor2="0">
<ccTool flags="0">
</ccTool>
@@ -133,6 +136,8 @@
<preBuildFirst>true</preBuildFirst>
</preBuild>
</makefileType>
+ <item path="src/UnionAllHandler.h" ex="false" tool="3" flavor2="0">
+ </item>
<item path="src/relpipe-tr-deserialize.cpp" ex="false" tool="1" flavor2="0">
<ccTool flags="0">
</ccTool>
--- a/src/DeserializeHandler.h Sun Apr 17 21:24:32 2022 +0200
+++ b/src/DeserializeHandler.h Tue Apr 19 23:17:15 2022 +0200
@@ -32,6 +32,7 @@
#include <relpipe/cli/RelpipeCLIException.h>
#include "Configuration.h"
+#include "UnionAllHandler.h"
namespace relpipe {
namespace tr {
@@ -41,18 +42,84 @@
private:
Configuration configuration;
shared_ptr<relpipe::writer::RelationalWriter> writer;
+ std::wstring_convert<codecvt_utf8<wchar_t>> convertor;
+ UnionAllHandler unionAllHandler;
+ /** Name and attribute metadata (reader-side and writer-side) of the relation currently being read; replaced in startRelation(). */
+ struct RelationContext {
+ relpipe::common::type::StringX name;
+ std::vector<relpipe::reader::handlers::AttributeMetadata> readerMetadata;
+ std::vector<relpipe::writer::AttributeMetadata> writerMetadata;
+ } relationContext;
+
+ /** State of the record currently being read: octets decoded from its "data" attribute and the index of the next attribute. */
+ struct RecordContext {
+ std::stringstream buffer;
+ size_t attributeIndex = 0;
+ } recordContext;
+
+ char fromHex(wchar_t ch) { // numeric value (0–15) of one hex digit; only 0-9 and lower-case a-f are accepted, anything else throws
+ if (L'0' <= ch && ch <= L'9') return ch - L'0';
+ else if (L'a' <= ch && ch <= L'f') return ch - L'a' + 10;
+ else throw relpipe::reader::RelpipeReaderException(L"Unable to decode hexadecimal string.");
+ }
+
+ /**
+  * Decodes a string of lower-case hexadecimal digit pairs into raw octets (high nibble first).
+  * Throws RelpipeReaderException on a non-hex character or on an odd number of digits.
+  */
+ std::stringstream fromHex(const relpipe::common::type::StringX& hex) {
+ // an odd digit count cannot form whole octets; reject it rather than silently dropping the last digit
+ if (hex.size() % 2 != 0) throw relpipe::reader::RelpipeReaderException(L"Unable to decode hexadecimal string: odd number of digits.");
+
+ std::stringstream octets;
+ for (size_t i = 0, limit = hex.size(); i < limit; i += 2) {
+ const char high = fromHex(hex[i]);
+ const char low = fromHex(hex[i + 1]);
+ octets.put(static_cast<char>((high << 4) | low));
+ }
+ return octets;
+ }
+
public:
- DeserializeHandler(shared_ptr<relpipe::writer::RelationalWriter> writer, Configuration configuration) : writer(writer), configuration(configuration) {
+ DeserializeHandler(shared_ptr<relpipe::writer::RelationalWriter> writer, Configuration configuration) : writer(writer), configuration(configuration), unionAllHandler(writer, configuration) { // unionAllHandler is given the same writer and configuration
+ // TODO: configurable relation name?
+ // TODO: configurable attribute name?
+ // TODO: optional custom attributes with constant value or ordinal number?
+ // TODO: optional deserialization of only certain relations? and certain fields?
+ // TODO: optional pass-through of certain relations?
}
virtual ~DeserializeHandler() = default;
void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override {
+ // Start from a clean context so nothing from the previous relation carries over.
+ relationContext = RelationContext();
+ relationContext.name = name;
+ relationContext.readerMetadata = attributes;
+
+ // Translate each reader-side attribute into its writer-side counterpart (same name, type id resolved by the writer).
+ auto& translated = relationContext.writerMetadata;
+ for (const auto& source : attributes) translated.push_back({source.getAttributeName(), writer->toTypeId(source.getTypeName())});
}
void attribute(const relpipe::common::type::StringX& value) override {
+ // Called once per attribute value. The serialized stream is taken from the attribute named "data"
+ // and replayed through unionAllHandler as soon as the last attribute of the record has been seen.
+ auto attributeName = relationContext.readerMetadata[recordContext.attributeIndex].getAttributeName();
+ if (recordContext.attributeIndex == 0) recordContext = RecordContext(); // first attribute of a record → fresh per-record state
+
+ if (attributeName == L"data") recordContext.buffer = fromHex(value); // TODO: skip this step once we have octet-string data type + typed values instead of strings
+
+ recordContext.attributeIndex++;
+ if (recordContext.attributeIndex % relationContext.readerMetadata.size() == 0) {
+ // replay the octets decoded from the "data" attribute, not the value of whichever attribute happens to come last
+ std::shared_ptr<relpipe::reader::RelationalReader> reader(relpipe::reader::Factory::create(recordContext.buffer));
+ reader->addHandler(&unionAllHandler);
+ reader->process();
+ recordContext.attributeIndex = 0;
+ }
}
void endOfPipe() {
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/UnionAllHandler.h Tue Apr 19 23:17:15 2022 +0200
@@ -0,0 +1,111 @@
+/**
+ * Relational pipes
+ * Copyright © 2022 František Kučera (Frantovo.cz, GlobalCode.info)
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, version 3 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#pragma once
+
+#include <regex>
+#include <stdexcept>
+#include <sstream>
+#include <codecvt>
+#include <iomanip>
+
+#include <relpipe/common/type/typedefs.h>
+#include <relpipe/reader/TypeId.h>
+#include <relpipe/reader/handlers/RelationalReaderStringHandler.h>
+#include <relpipe/reader/handlers/AttributeMetadata.h>
+
+#include <relpipe/writer/Factory.h>
+
+#include <relpipe/cli/RelpipeCLIException.h>
+
+#include "Configuration.h"
+
+namespace relpipe {
+namespace tr {
+namespace deserialize {
+
+/**
+ * Forwards all attributes to the given writer.
+ * If a relation has the same name and attributes as the previous one, its records are just appended to it.
+ * If the name or attributes differ, a new relation is started and records are written to it.
+ *
+ * So it does something like UNION ALL in SQL for sequences of relations that have the same name and attributes.
+ */
+class UnionAllHandler : public relpipe::reader::handlers::RelationalReaderValueHandler {
+private:
+ Configuration configuration;
+ shared_ptr<relpipe::writer::RelationalWriter> writer;
+
+ /** Name and metadata of the most recently started relation; writerMetadata is filled only when a new output relation is opened. */
+ class RelationContext {
+ public:
+ relpipe::common::type::StringX name;
+ std::vector<relpipe::reader::handlers::AttributeMetadata> readerMetadata;
+ std::vector<relpipe::writer::AttributeMetadata> writerMetadata;
+ } relationContext;
+
+ /** Replaces the content of w with writer-side metadata (same names, type ids resolved by the writer) for every attribute in r. */
+ void convert(const std::vector<relpipe::reader::handlers::AttributeMetadata>& r, std::vector<relpipe::writer::AttributeMetadata>& w) {
+ w.clear();
+ for (size_t i = 0, count = r.size(); i < count; i++) {
+ w.push_back({r[i].getAttributeName(), writer->toTypeId(r[i].getTypeName())});
+ }
+ }
+
+ /** Returns true if both lists have the same length and agree pairwise in attribute name and type id. */
+ bool equals(const std::vector<relpipe::reader::handlers::AttributeMetadata>& a, const std::vector<relpipe::reader::handlers::AttributeMetadata>& b) {
+ if (a.size() != b.size()) return false;
+ for (size_t i = 0; i < a.size(); i++) {
+ const bool sameName = a[i].getAttributeName() == b[i].getAttributeName();
+ if (!sameName || a[i].getTypeId() != b[i].getTypeId()) return false;
+ }
+ return true;
+ }
+
+public:
+ /** Keeps the writer that all relations and attribute values are forwarded to, plus a copy of the configuration. */
+ UnionAllHandler(shared_ptr<relpipe::writer::RelationalWriter> writer, Configuration configuration) : writer(writer), configuration(configuration) {
+ }
+
+ virtual ~UnionAllHandler() = default;
+
+ /** Opens a new relation on the writer unless name and attributes match the previously started relation. */
+ void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override {
+ // decide before the stored context is overwritten below
+ const bool sameAsPrevious = relationContext.name == name && equals(relationContext.readerMetadata, attributes);
+
+ relationContext = RelationContext();
+ relationContext.name = name;
+ relationContext.readerMetadata = attributes;
+
+ if (sameAsPrevious) return; // same relation → just append records
+
+ convert(attributes, relationContext.writerMetadata);
+ writer->startRelation(name, relationContext.writerMetadata, true);
+ }
+
+ /** Passes the value and its type through to the writer unchanged. */
+ void attribute(const void* value, const std::type_info& type) override {
+ writer->writeAttribute(value, type);
+ }
+
+ void endOfPipe() { // no action at end of input
+ }
+};
+
+}
+}
+}