#pragma once
#include <string>
#include <iostream>
#include <vector>
#include <relpipe/protocol/constants.h>
#include "../include/relpipe/reader/typedefs.h"
#include "../include/relpipe/reader/RelationalReader.h"
#include "../include/relpipe/reader/TypeId.h"
#include "../include/relpipe/reader/handlers/RelationalReaderBaseHandler.h"
#include "../include/relpipe/reader/handlers/RelationalReaderStringHandler.h"
#include "../include/relpipe/reader/handlers/RelationalReaderValueHandler.h"
#include "DataTypeReaderBase.h"
#include "types/BooleanDataTypeReader.h"
#include "types/IntegerDataTypeReader.h"
#include "types/StringDataTypeReader.h"
namespace relpipe {
namespace reader {
// Makes the names of relpipe::protocol usable without qualification in this header
// (DATA_PART_START and DATA_PART_ROW are used unqualified below and are presumably declared there — TODO confirm).
// NOTE(review): a using-directive in a header is also in effect in every file that includes this header.
using namespace relpipe::protocol;
class StreamRelationalReader : public RelationalReader {
private:
std::istream &input;
types::BooleanDataTypeReader booleanReader;
types::IntegerDataTypeReader integerReader;
types::StringDataTypeReader stringReader;
std::vector<DataTypeReaderBase*> readers = {&booleanReader, &integerReader, &stringReader};
std::vector<handlers::RelationalReaderStringHadler*> stringHandlers;
std::vector<handlers::RelationalReaderValueHadler*> valueHandlers;
/**
* count of columns in the current table
*/
integer_t columnCount;
/**
* number of column (0 = first) that will be written; after writing, the number is increased and prepared for next one
*/
integer_t currentColumn;
/**
* types of columns in the current table
*/
std::vector<TypeId> columnTypes;
std::vector<string_t> columnNames;
std::vector<std::pair<string_t, TypeId>> columns;
/**
* TODO: remove?
*/
string_t readString(std::istream &input, const TypeId typeId) {
for (DataTypeReaderBase* reader : readers) if (reader->supports(typeId)) return reader->readString(input);
throw RelpipeReaderException(L"Unsupported data type: " + (int) typeId);
}
void read(std::istream &input, std::function<void(const void *, const std::type_info&) > handler, const TypeId typeId) {
for (DataTypeReaderBase* reader : readers) if (reader->supports(typeId)) return reader->read(input, handler);
throw RelpipeReaderException(L"Unsupported data type: " + (int) typeId);
}
void read(std::istream &input, std::function<void(const string_t&, const void *, const std::type_info&) > handler, const TypeId typeId) {
for (DataTypeReaderBase* reader : readers) if (reader->supports(typeId)) return reader->read(input, handler);
throw RelpipeReaderException(L"Unsupported data type: " + (int) typeId);
}
public:
StreamRelationalReader(std::istream &input) :
input(input) {
}
string_t toTypeCode(const TypeId typeId) override {
for (DataTypeReaderBase* reader : readers) if (reader->supports(typeId)) return reader->getTypeCode();
throw RelpipeReaderException(L"Unsupported data type: " + static_cast<integer_t> (typeId));
}
void addHandler(handlers::RelationalReaderStringHadler* handler) override {
stringHandlers.push_back(handler);
}
void addHandler(handlers::RelationalReaderValueHadler* handler) override {
valueHandlers.push_back(handler);
}
void process() override {
while (true) {
integer_t dataPart;
try {
dataPart = integerReader.readValue(input);
// output << "dataPart: " << dataPart << endl;
} catch (RelpipeReaderException e) {
if (input.eof() && input.gcount() == 0) {
if (dataPart == DATA_PART_ROW) {
// last part was row
// input was fully read
// we are finished
// TODO: printCachedData(output); ???
return;
} else if (dataPart == DATA_PART_START) {
// Empty relation might be weird but it is valid data.
// Actually, it is not so weird as it looks.
// fwprintf(stderr, L"Warning: The table has no rows. Weird… but OK.\n");
return;
} else {
// in current format, there is no other data part
// so this will never happen
// but maybe later…
throw RelpipeReaderException(L"Unexpected EOF");
}
} else if (input.eof()) {
fwprintf(stderr, L"Error: found some unexpected data on the input stream: %d\n", input.gcount());
throw e;
} else {
// other error
throw e;
}
}
if (dataPart == DATA_PART_START) {
// Print data of previous table
// TODO: if (values.size() > 0) printCachedData(output); ???
// Read table name
string_t tableName = stringReader.readValue(input);
// Read column count
columnCount = integerReader.readValue(input);
columnTypes.resize(columnCount);
columnNames.resize(columnCount);
columns.resize(columnCount);
// Read column names
for (int i = 0; i < columnCount; i++) {
columnNames[i] = stringReader.readValue(input);
}
// Read column types
for (int i = 0; i < columnCount; i++) {
TypeId typeId = (TypeId) integerReader.readValue(input); // TODO: přetypování OK?
string_t typeCode = toTypeCode(typeId); // validate typeId TODO: je potřeba?
columnTypes[i] = typeId;
}
for (int i = 0; i < columnCount; i++) {
columns[i] = {columnNames[i], columnTypes[i]};
}
for (int i = 0; i < stringHandlers.size(); i++) stringHandlers[i]->startRelation(tableName, columns);
for (int i = 0; i < valueHandlers.size(); i++) valueHandlers[i]->startRelation(tableName, columns);
} else if (dataPart == DATA_PART_ROW) {
for (int i = 0; i < columnCount; i++) {
TypeId typeId = columnTypes[i];
if (stringHandlers.empty()) {
read(input, [&](const void * rawValue, const std::type_info & typeInfo) {
for (int i = 0; i < valueHandlers.size(); i++) valueHandlers[i]->attribute(rawValue, typeInfo);
}, typeId);
} else {
read(input, [&](const string_t& stringValue, const void * rawValue, const std::type_info & typeInfo) {
for (int i = 0; i < stringHandlers.size(); i++) stringHandlers[i]->attribute(stringValue);
for (int i = 0; i < valueHandlers.size(); i++) valueHandlers[i]->attribute(rawValue, typeInfo);
}, typeId);
}
}
} else {
throw RelpipeReaderException(L"Unknown data part");
}
}
throw RelpipeReaderException(L"Unexpected exception"); // should never happen
}
};
}
}