#pragma once
#include <string>
#include <iostream>
#include <vector>
#include <relpipe/protocol/constants.h>
#include "../include/relpipe/writer/typedefs.h"
#include "../include/relpipe/writer/RelationalWriter.h"
#include "../include/relpipe/writer/TypeId.h"
#include "DataTypeWriterBase.h"
#include "types/BooleanDataTypeWriter.h"
#include "types/IntegerDataTypeWriter.h"
#include "types/StringDataTypeWriter.h"
namespace relpipe {
namespace writer {
using namespace relpipe::protocol;
class StreamRelationalWriter : public RelationalWriter {
private:
std::ostream &output;
types::BooleanDataTypeWriter booleanWriter;
types::IntegerDataTypeWriter integerWriter;
types::StringDataTypeWriter stringWriter;
std::vector<DataTypeWriterBase*> writers = {&booleanWriter, &integerWriter, &stringWriter};
/**
* count of columns in the current table
*/
integer_t columnCount;
/**
* number of column (0 = first) that will be written; after writing, the number is increased and prepared for next one
*/
integer_t currentColumn;
/**
* types of columns in the current table
*/
std::vector<TypeId> columnTypes;
void writeString(const string_t &stringValue, const TypeId typeId) {
for (DataTypeWriterBase* writer : writers) if (writer->supports(typeId)) return writer->writeString(output, stringValue);
throw RelpipeWriterException(L"Unsupported data type: " + static_cast<integer_t> (typeId));
}
void writeRaw(const void* value, const type_info& typeInfo, const TypeId typeId) {
for (DataTypeWriterBase* writer : writers) if (writer->supports(typeId)) return writer->writeRaw(output, value, typeInfo);
throw RelpipeWriterException(L"Unsupported data type: " + static_cast<integer_t> (typeId));
}
public:
StreamRelationalWriter(std::ostream &output) :
output(output) {
}
TypeId toTypeId(const string_t typeCode) override {
for (DataTypeWriterBase* writer : writers) if (writer->supports(typeCode)) return writer->getTypeId();
throw RelpipeWriterException(L"Unsupported data type: " + typeCode);
}
void startRelation(string_t name, std::vector<std::pair<string_t, TypeId> > attributes, boolean_t writeHeader) override {
string_t tableName = name;
columnCount = attributes.size();
currentColumn = 0;
// Write table name and column count:
integerWriter.writeValue(output, DATA_PART_START);
stringWriter.writeValue(output, tableName);
integerWriter.writeValue(output, columnCount);
columnTypes.clear();
columnTypes.resize(columnCount);
// Write column names:
for (size_t c = 0; c < columnCount; c++) {
wstring columnName = attributes[c].first;
stringWriter.writeValue(output, columnName);
}
// Write column types:
for (size_t c = 0; c < columnCount; c++) {
TypeId typeId = attributes[c].second;
integerWriter.writeValue(output, static_cast<integer_t> (typeId));
columnTypes[c] = typeId;
}
}
void writeAttribute(const string_t& value) override {
if (currentColumn == 0) integerWriter.writeValue(output, DATA_PART_ROW);
writeString(value, columnTypes[currentColumn]);
if (++currentColumn == columnCount) currentColumn = 0;
}
void writeAttribute(const void* value, const std::type_info& type) override {
if (currentColumn == 0) integerWriter.writeValue(output, DATA_PART_ROW);
writeRaw(value, type, columnTypes[currentColumn]);
if (++currentColumn == columnCount) currentColumn = 0;
}
};
}
}