--- a/src/SerializeHandler.h Sat Apr 16 22:19:37 2022 +0200
+++ b/src/SerializeHandler.h Sun Apr 17 01:31:28 2022 +0200
@@ -18,6 +18,9 @@
#include <regex>
#include <stdexcept>
+#include <sstream>
+#include <codecvt>
+#include <iomanip>
#include <relpipe/common/type/typedefs.h>
#include <relpipe/reader/TypeId.h>
@@ -37,22 +40,78 @@
class SerializeHandler : public relpipe::reader::handlers::RelationalReaderStringHandler {
private:
Configuration configuration;
- shared_ptr<relpipe::writer::RelationalWriter> relationalWriter;
+ shared_ptr<relpipe::writer::RelationalWriter> writer;
+ std::wstring_convert<codecvt_utf8<wchar_t>> convertor;
+
+ class RelationContext {
+ public:
+ relpipe::common::type::StringX name;
+ std::vector<relpipe::reader::handlers::AttributeMetadata> readerMetadata;
+ std::vector<relpipe::writer::AttributeMetadata> writerMetadata;
+ } relationContext;
+
+ class RecordContext {
+ public:
+ std::stringstream buffer;
+ shared_ptr<relpipe::writer::RelationalWriter> writer;
+ size_t attributeIndex = 0;
+ } recordContext;
+
+ relpipe::common::type::StringX toHex(const std::string& octets) {
+ std::stringstream hex;
+ hex << std::hex << std::setfill('0') << std::hex;
+ for (size_t i = 0, size = octets.size(); i < size; i++) hex << std::setw(2) << (0xff & octets[i]);
+ return convertor.from_bytes(hex.str());
+ }
+
public:
- SerializeHandler(shared_ptr<relpipe::writer::RelationalWriter> relationalWriter, Configuration configuration) : relationalWriter(relationalWriter), configuration(configuration) {
+ SerializeHandler(shared_ptr<relpipe::writer::RelationalWriter> writer, Configuration configuration) : writer(writer), configuration(configuration) {
+ // TODO: configurable relation name
+ // TODO: configurable attribute name
+ // TODO: optional custom attributes with constant value or ordinal number
+ // TODO: optional serialization of only certain relations? and certain fields?
+ // TODO: optional pass-through of certain relations?
+ // TODO: multiple modes? one output record per a) one input record, b) several input records, c) one input relation, … ?
+
+ writer->startRelation(L"message",{
+ {L"data", relpipe::writer::TypeId::STRING} // TODO: octet-string (when supported) instead of HEX
+ }, true);
}
virtual ~SerializeHandler() = default;
void startRelation(relpipe::common::type::StringX name, std::vector<relpipe::reader::handlers::AttributeMetadata> attributes) override {
+ relationContext = RelationContext();
+
+ relationContext.name = name;
+ relationContext.readerMetadata = attributes;
+
+ recordContext = RecordContext();
+ recordContext.writer.reset(relpipe::writer::Factory::create(recordContext.buffer));
+
+ for (relpipe::reader::handlers::AttributeMetadata readerMetadata : attributes) {
+ relationContext.writerMetadata.push_back({readerMetadata.getAttributeName(), writer->toTypeId(readerMetadata.getTypeName())});
+ }
}
void attribute(const relpipe::common::type::StringX& value) override {
+ if (recordContext.attributeIndex == 0) {
+ recordContext.writer->startRelation(relationContext.name, relationContext.writerMetadata, true);
+ }
+ recordContext.writer->writeAttribute(value); // TODO: typed values instead of strings
+ recordContext.attributeIndex++;
+
+ if (recordContext.attributeIndex % relationContext.readerMetadata.size() == 0) {
+ writer->writeAttribute(toHex(recordContext.buffer.str()));
+ recordContext = RecordContext();
+ recordContext.writer.reset(relpipe::writer::Factory::create(recordContext.buffer));
+ }
}
void endOfPipe() {
+ // TODO: check number of attribute values
}
};