# HG changeset patch # User František Kučera # Date 1650151888 -7200 # Node ID c0a1a008982c8ee301283558d07ff7faaef0930d # Parent 47ab821dd9ca2544aa778db9f5aa7da8ba213cdf first version of serializer diff -r 47ab821dd9ca -r c0a1a008982c src/SerializeHandler.h --- a/src/SerializeHandler.h Sat Apr 16 22:19:37 2022 +0200 +++ b/src/SerializeHandler.h Sun Apr 17 01:31:28 2022 +0200 @@ -18,6 +18,9 @@ #include #include +#include +#include +#include #include #include @@ -37,22 +40,78 @@ class SerializeHandler : public relpipe::reader::handlers::RelationalReaderStringHandler { private: Configuration configuration; - shared_ptr relationalWriter; + shared_ptr writer; + std::wstring_convert> convertor; + + class RelationContext { + public: + relpipe::common::type::StringX name; + std::vector readerMetadata; + std::vector writerMetadata; + } relationContext; + + class RecordContext { + public: + std::stringstream buffer; + shared_ptr writer; + size_t attributeIndex = 0; + } recordContext; + + relpipe::common::type::StringX toHex(const std::string& octets) { + std::stringstream hex; + hex << std::hex << std::setfill('0') << std::hex; + for (size_t i = 0, size = octets.size(); i < size; i++) hex << std::setw(2) << (0xff & octets[i]); + return convertor.from_bytes(hex.str()); + } + public: - SerializeHandler(shared_ptr relationalWriter, Configuration configuration) : relationalWriter(relationalWriter), configuration(configuration) { + SerializeHandler(shared_ptr writer, Configuration configuration) : writer(writer), configuration(configuration) { + // TODO: configurable relation name + // TODO: configurable attribute name + // TODO: optional custom attributes with constant value or ordinal number + // TODO: optional serialization of only certain relations? and certain fields? + // TODO: optional pass-through of certain relations? + // TODO: multiple modes? one output record per a) one input record, b) several input records, c) one input relation, … ? + + writer->startRelation(L"message",{ + {L"data", relpipe::writer::TypeId::STRING} // TODO: octet-string (when supported) instead of HEX + }, true); } virtual ~SerializeHandler() = default; void startRelation(relpipe::common::type::StringX name, std::vector attributes) override { + relationContext = RelationContext(); + + relationContext.name = name; + relationContext.readerMetadata = attributes; + + recordContext = RecordContext(); + recordContext.writer.reset(relpipe::writer::Factory::create(recordContext.buffer)); + + for (relpipe::reader::handlers::AttributeMetadata readerMetadata : attributes) { + relationContext.writerMetadata.push_back({readerMetadata.getAttributeName(), writer->toTypeId(readerMetadata.getTypeName())}); + } } void attribute(const relpipe::common::type::StringX& value) override { + if (recordContext.attributeIndex == 0) { + recordContext.writer->startRelation(relationContext.name, relationContext.writerMetadata, true); + } + recordContext.writer->writeAttribute(value); // TODO: typed values instead of strings + recordContext.attributeIndex++; + + if (recordContext.attributeIndex % relationContext.readerMetadata.size() == 0) { + writer->writeAttribute(toHex(recordContext.buffer.str())); + recordContext = RecordContext(); + recordContext.writer.reset(relpipe::writer::Factory::create(recordContext.buffer)); + } } void endOfPipe() { + // TODO: check number of attribute values } };