35 namespace serialize { |
38 namespace serialize { |
36 |
39 |
37 class SerializeHandler : public relpipe::reader::handlers::RelationalReaderStringHandler { |
40 class SerializeHandler : public relpipe::reader::handlers::RelationalReaderStringHandler { |
38 private: |
41 private: |
39 Configuration configuration; |
42 Configuration configuration; |
40 shared_ptr<relpipe::writer::RelationalWriter> relationalWriter; |
43 shared_ptr<relpipe::writer::RelationalWriter> writer; |
|
44 std::wstring_convert<codecvt_utf8<wchar_t>> convertor; |
|
45 |
|
46 class RelationContext { |
|
47 public: |
|
48 relpipe::common::type::StringX name; |
|
49 std::vector<relpipe::reader::handlers::AttributeMetadata> readerMetadata; |
|
50 std::vector<relpipe::writer::AttributeMetadata> writerMetadata; |
|
51 } relationContext; |
|
52 |
|
53 class RecordContext { |
|
54 public: |
|
55 std::stringstream buffer; |
|
56 shared_ptr<relpipe::writer::RelationalWriter> writer; |
|
57 size_t attributeIndex = 0; |
|
58 } recordContext; |
|
59 |
|
60 relpipe::common::type::StringX toHex(const std::string& octets) { |
|
61 std::stringstream hex; |
|
62 hex << std::hex << std::setfill('0') << std::hex; |
|
63 for (size_t i = 0, size = octets.size(); i < size; i++) hex << std::setw(2) << (0xff & octets[i]); |
|
64 return convertor.from_bytes(hex.str()); |
|
65 } |
|
66 |
41 public: |
67 public: |
42 |
68 |
43 SerializeHandler(shared_ptr<relpipe::writer::RelationalWriter> relationalWriter, Configuration configuration) : relationalWriter(relationalWriter), configuration(configuration) { |
69 SerializeHandler(shared_ptr<relpipe::writer::RelationalWriter> writer, Configuration configuration) : writer(writer), configuration(configuration) { |
|
70 // TODO: configurable relation name |
|
71 // TODO: configurable attribute name |
|
72 // TODO: optional custom attributes with constant value or ordinal number |
|
73 // TODO: optional serialization of only certain relations? and certain fields? |
|
74 // TODO: optional pass-through of certain relations? |
|
75 // TODO: multiple modes? one output record per a) one input record, b) several input records, c) one input relation, … ? |
|
76 |
|
77 writer->startRelation(L"message",{ |
|
78 {L"data", relpipe::writer::TypeId::STRING} // TODO: octet-string (when supported) instead of HEX |
|
79 }, true); |
44 } |
80 } |
45 |
81 |
46 virtual ~SerializeHandler() = default; |
82 virtual ~SerializeHandler() = default; |
47 |
83 |
48 void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override { |
84 void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override { |
|
85 relationContext = RelationContext(); |
|
86 |
|
87 relationContext.name = name; |
|
88 relationContext.readerMetadata = attributes; |
|
89 |
|
90 recordContext = RecordContext(); |
|
91 recordContext.writer.reset(relpipe::writer::Factory::create(recordContext.buffer)); |
|
92 |
|
93 for (relpipe::reader::handlers::AttributeMetadata readerMetadata : attributes) { |
|
94 relationContext.writerMetadata.push_back({readerMetadata.getAttributeName(), writer->toTypeId(readerMetadata.getTypeName())}); |
|
95 } |
49 } |
96 } |
50 |
97 |
51 void attribute(const relpipe::common::type::StringX& value) override { |
98 void attribute(const relpipe::common::type::StringX& value) override { |
|
99 if (recordContext.attributeIndex == 0) { |
|
100 recordContext.writer->startRelation(relationContext.name, relationContext.writerMetadata, true); |
|
101 } |
52 |
102 |
|
103 recordContext.writer->writeAttribute(value); // TODO: typed values instead of strings |
|
104 recordContext.attributeIndex++; |
|
105 |
|
106 if (recordContext.attributeIndex % relationContext.readerMetadata.size() == 0) { |
|
107 writer->writeAttribute(toHex(recordContext.buffer.str())); |
|
108 recordContext = RecordContext(); |
|
109 recordContext.writer.reset(relpipe::writer::Factory::create(recordContext.buffer)); |
|
110 } |
53 } |
111 } |
54 |
112 |
55 void endOfPipe() { |
113 void endOfPipe() { |
|
114 // TODO: check number of attribute values |
56 } |
115 } |
57 |
116 |
58 }; |
117 }; |
59 |
118 |
60 } |
119 } |