src/SerializeHandler.h
branchv_0
changeset 1 c0a1a008982c
parent 0 47ab821dd9ca
child 2 d2ba14aa4e20
equal deleted inserted replaced
0:47ab821dd9ca 1:c0a1a008982c
    16  */
    16  */
    17 #pragma once
    17 #pragma once
    18 
    18 
    19 #include <regex>
    19 #include <regex>
    20 #include <stdexcept>
    20 #include <stdexcept>
       
    21 #include <sstream>
       
    22 #include <codecvt>
       
    23 #include <iomanip>
    21 
    24 
    22 #include <relpipe/common/type/typedefs.h>
    25 #include <relpipe/common/type/typedefs.h>
    23 #include <relpipe/reader/TypeId.h>
    26 #include <relpipe/reader/TypeId.h>
    24 #include <relpipe/reader/handlers/RelationalReaderStringHandler.h>
    27 #include <relpipe/reader/handlers/RelationalReaderStringHandler.h>
    25 #include <relpipe/reader/handlers/AttributeMetadata.h>
    28 #include <relpipe/reader/handlers/AttributeMetadata.h>
    35 namespace serialize {
    38 namespace serialize {
    36 
    39 
    37 class SerializeHandler : public relpipe::reader::handlers::RelationalReaderStringHandler {
    40 class SerializeHandler : public relpipe::reader::handlers::RelationalReaderStringHandler {
    38 private:
    41 private:
    39 	Configuration configuration;
    42 	Configuration configuration;
    40 	shared_ptr<relpipe::writer::RelationalWriter> relationalWriter;
    43 	shared_ptr<relpipe::writer::RelationalWriter> writer;
       
    44 	std::wstring_convert<codecvt_utf8<wchar_t>> convertor;
       
    45 
       
    46 	class RelationContext {
       
    47 	public:
       
    48 		relpipe::common::type::StringX name;
       
    49 		std::vector<relpipe::reader::handlers::AttributeMetadata> readerMetadata;
       
    50 		std::vector<relpipe::writer::AttributeMetadata> writerMetadata;
       
    51 	} relationContext;
       
    52 
       
    53 	class RecordContext {
       
    54 	public:
       
    55 		std::stringstream buffer;
       
    56 		shared_ptr<relpipe::writer::RelationalWriter> writer;
       
    57 		size_t attributeIndex = 0;
       
    58 	} recordContext;
       
    59 
       
    60 	relpipe::common::type::StringX toHex(const std::string& octets) {
       
    61 		std::stringstream hex;
       
    62 		hex << std::hex << std::setfill('0') << std::hex;
       
    63 		for (size_t i = 0, size = octets.size(); i < size; i++) hex << std::setw(2) << (0xff & octets[i]);
       
    64 		return convertor.from_bytes(hex.str());
       
    65 	}
       
    66 
    41 public:
    67 public:
    42 
    68 
    43 	SerializeHandler(shared_ptr<relpipe::writer::RelationalWriter> relationalWriter, Configuration configuration) : relationalWriter(relationalWriter), configuration(configuration) {
    69 	SerializeHandler(shared_ptr<relpipe::writer::RelationalWriter> writer, Configuration configuration) : writer(writer), configuration(configuration) {
       
    70 		// TODO: configurable relation name
       
    71 		// TODO: configurable attribute name
       
    72 		// TODO: optional custom attributes with constant value or ordinal number
       
    73 		// TODO: optional serialization of only certain relations? and certain fields?
       
    74 		// TODO: optional pass-through of certain relations?
       
    75 		// TODO: multiple modes? one output record per a) one input record, b) several input records, c) one input relation, … ?
       
    76 
       
    77 		writer->startRelation(L"message",{
       
    78 			{L"data", relpipe::writer::TypeId::STRING} // TODO: octet-string (when supported) instead of HEX
       
    79 		}, true);
    44 	}
    80 	}
    45 
    81 
    46 	virtual ~SerializeHandler() = default;
    82 	virtual ~SerializeHandler() = default;
    47 
    83 
    48 	void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override {
    84 	void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override {
       
    85 		relationContext = RelationContext();
       
    86 
       
    87 		relationContext.name = name;
       
    88 		relationContext.readerMetadata = attributes;
       
    89 
       
    90 		recordContext = RecordContext();
       
    91 		recordContext.writer.reset(relpipe::writer::Factory::create(recordContext.buffer));
       
    92 
       
    93 		for (relpipe::reader::handlers::AttributeMetadata readerMetadata : attributes) {
       
    94 			relationContext.writerMetadata.push_back({readerMetadata.getAttributeName(), writer->toTypeId(readerMetadata.getTypeName())});
       
    95 		}
    49 	}
    96 	}
    50 
    97 
    51 	void attribute(const relpipe::common::type::StringX& value) override {
    98 	void attribute(const relpipe::common::type::StringX& value) override {
       
    99 		if (recordContext.attributeIndex == 0) {
       
   100 			recordContext.writer->startRelation(relationContext.name, relationContext.writerMetadata, true);
       
   101 		}
    52 
   102 
       
   103 		recordContext.writer->writeAttribute(value); // TODO: typed values instead of strings
       
   104 		recordContext.attributeIndex++;
       
   105 
       
   106 		if (recordContext.attributeIndex % relationContext.readerMetadata.size() == 0) {
       
   107 			writer->writeAttribute(toHex(recordContext.buffer.str()));
       
   108 			recordContext = RecordContext();
       
   109 			recordContext.writer.reset(relpipe::writer::Factory::create(recordContext.buffer));
       
   110 		}
    53 	}
   111 	}
    54 
   112 
    55 	void endOfPipe() {
   113 	void endOfPipe() {
       
   114 		// TODO: check number of attribute values
    56 	}
   115 	}
    57 
   116 
    58 };
   117 };
    59 
   118 
    60 }
   119 }