# HG changeset patch # User František Kučera # Date 1650403035 -7200 # Node ID d53041bb781bed074086221d6628d3d90d8f216c # Parent 7e986fcf0d8fda932d207222a89a363b55ffe588 first version of deserializer diff -r 7e986fcf0d8f -r d53041bb781b nbproject/configurations.xml --- a/nbproject/configurations.xml Sun Apr 17 21:24:32 2022 +0200 +++ b/nbproject/configurations.xml Tue Apr 19 23:17:15 2022 +0200 @@ -42,6 +42,7 @@ + UnionAllHandler.h relpipe-tr-deserialize.cpp @@ -93,6 +94,8 @@ true + + @@ -133,6 +136,8 @@ true + + diff -r 7e986fcf0d8f -r d53041bb781b src/DeserializeHandler.h --- a/src/DeserializeHandler.h Sun Apr 17 21:24:32 2022 +0200 +++ b/src/DeserializeHandler.h Tue Apr 19 23:17:15 2022 +0200 @@ -32,6 +32,7 @@ #include #include "Configuration.h" +#include "UnionAllHandler.h" namespace relpipe { namespace tr { @@ -41,18 +42,84 @@ private: Configuration configuration; shared_ptr writer; + std::wstring_convert> convertor; + UnionAllHandler unionAllHandler; + class RelationContext { + public: + relpipe::common::type::StringX name; + std::vector readerMetadata; + std::vector writerMetadata; + } relationContext; + + class RecordContext { + public: + std::stringstream buffer; + size_t attributeIndex = 0; + } recordContext; + + char fromHex(wchar_t ch) { + if (L'0' <= ch && ch <= L'9') return ch - L'0'; + else if (L'a' <= ch && ch <= L'f') return ch - L'a' + 10; + else throw relpipe::reader::RelpipeReaderException(L"Unable to decode hexadeximal string."); + } + + std::stringstream fromHex(const relpipe::common::type::StringX& hex) { + std::stringstream octets; + + char octet = 0; + + for (size_t i = 0, limit = hex.size(); i < limit; i++) { + if (i % 2 == 0) { + octet = fromHex(hex[i]) << 4; + } else { + octet += fromHex(hex[i]); + octets.put(octet); + } + } + + return octets; + } + public: - DeserializeHandler(shared_ptr writer, Configuration configuration) : writer(writer), configuration(configuration) { + DeserializeHandler(shared_ptr writer, Configuration 
configuration) : writer(writer), configuration(configuration), unionAllHandler(writer, configuration) { + // TODO: configurable relation name? + // TODO: configurable attribute name? + // TODO: optional custom attributes with constant value or ordinal number? + // TODO: optional deserialization of only certain relations? and certain fields? + // TODO: optional pass-through of certain relations? } virtual ~DeserializeHandler() = default; void startRelation(relpipe::common::type::StringX name, std::vector attributes) override { + relationContext = RelationContext(); + + relationContext.name = name; + relationContext.readerMetadata = attributes; + + for (relpipe::reader::handlers::AttributeMetadata readerMetadata : attributes) { + relationContext.writerMetadata.push_back({readerMetadata.getAttributeName(), writer->toTypeId(readerMetadata.getTypeName())}); + } } void attribute(const relpipe::common::type::StringX& value) override { + auto attributeName = relationContext.readerMetadata[recordContext.attributeIndex].getAttributeName(); + + if (recordContext.attributeIndex == 0) recordContext = RecordContext(); + + if (attributeName == L"data") recordContext.buffer = fromHex(value); + + recordContext.attributeIndex++; + + if (recordContext.attributeIndex % relationContext.readerMetadata.size() == 0) { + std::stringstream octets = fromHex(value); // TODO: skip this step once we have octet-string data type + typed values instead of strings + std::shared_ptr reader(relpipe::reader::Factory::create(octets)); + reader->addHandler(&unionAllHandler); + reader->process(); + recordContext.attributeIndex = 0; + } } void endOfPipe() { diff -r 7e986fcf0d8f -r d53041bb781b src/UnionAllHandler.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/UnionAllHandler.h Tue Apr 19 23:17:15 2022 +0200 @@ -0,0 +1,111 @@ +/** + * Relational pipes + * Copyright © 2022 František Kučera (Frantovo.cz, GlobalCode.info) + * + * This program is free software: you can redistribute it and/or modify + 
* it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 3 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + */ +#pragma once + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include + +#include + +#include "Configuration.h" + +namespace relpipe { +namespace tr { +namespace deserialize { + +/** + * Forwards all attributes to the given writer. + * If a relation has the same name and attributes as the previous one, records are just appended to it. + * If the name or attributes differ, a new relation is started and records are written to it. + * + * So it does something like UNION ALL in SQL for sequences of relations that have the same name and attributes. 
+ */ +class UnionAllHandler : public relpipe::reader::handlers::RelationalReaderValueHandler { +private: + Configuration configuration; + shared_ptr writer; + + class RelationContext { + public: + relpipe::common::type::StringX name; + std::vector readerMetadata; + std::vector writerMetadata; + } relationContext; + + void convert(const std::vector& r, std::vector& w) { + w.clear(); + for (const relpipe::reader::handlers::AttributeMetadata& a : r) { + w.push_back({a.getAttributeName(), writer->toTypeId(a.getTypeName())}); + } + } + + bool equals(const std::vector& a, const std::vector& b) { + if (a.size() != b.size()) return false; + + for (size_t i = 0, limit = a.size(); i < limit; i++) { + if (a[i].getAttributeName() != b[i].getAttributeName()) return false; + if (a[i].getTypeId() != b[i].getTypeId()) return false; + } + + return true; + } + +public: + + UnionAllHandler(shared_ptr writer, Configuration configuration) : writer(writer), configuration(configuration) { + } + + virtual ~UnionAllHandler() = default; + + void startRelation(relpipe::common::type::StringX name, std::vector attributes) override { + + bool continuation = name == relationContext.name && equals(attributes, relationContext.readerMetadata); + + relationContext = RelationContext(); + relationContext.name = name; + relationContext.readerMetadata = attributes; + + if (continuation) { + // same relation → just append records + } else { + convert(attributes, relationContext.writerMetadata); + writer->startRelation(name, relationContext.writerMetadata, true); + } + } + + void attribute(const void* value, const std::type_info& type) override { + writer->writeAttribute(value, type); + } + + void endOfPipe() { + } + +}; + +} +} +}