AttributeMetadata: call vector.clear() before vector.reserve(), fixes error (if there are multiple relations in the stream)
#pragma once
#include <string>
#include <iostream>
#include <vector>
#include <relpipe/protocol/constants.h>
#include <memory>
#include "../include/relpipe/reader/typedefs.h"
#include "../include/relpipe/reader/RelationalReader.h"
#include "../include/relpipe/reader/TypeId.h"
#include "../include/relpipe/reader/handlers/RelationalReaderBaseHandler.h"
#include "../include/relpipe/reader/handlers/RelationalReaderStringHandler.h"
#include "../include/relpipe/reader/handlers/RelationalReaderValueHandler.h"
#include "AttributeMetadataPrivate.h"
#include "DataTypeReaderBase.h"
#include "types/BooleanDataTypeReader.h"
#include "types/IntegerDataTypeReader.h"
#include "types/StringDataTypeReader.h"
namespace relpipe {
namespace reader {
using namespace relpipe::protocol;
using StringHandler = relpipe::reader::handlers::RelationalReaderStringHadler;
using ValuesHandler = relpipe::reader::handlers::RelationalReaderValueHadler;
using AttributeMetadata = relpipe::reader::handlers::AttributeMetadata;
using AttributeMetadataPrivate = relpipe::reader::handlers::AttributeMetadataPrivate;
class StreamRelationalReader : public RelationalReader {
private:
std::istream &input;
types::BooleanDataTypeReader booleanReader;
types::IntegerDataTypeReader integerReader;
types::StringDataTypeReader stringReader;
std::vector<DataTypeReaderBase*> readers = {&booleanReader, &integerReader, &stringReader};
std::vector<StringHandler*> stringHandlers;
std::vector<ValuesHandler*> valuesHandlers;
/**
* count of columns in the current table
*/
integer_t columnCount;
/**
* number of column (0 = first) that will be written; after writing, the number is increased and prepared for next one
*/
integer_t currentColumn;
/**
* types of columns in the current table
*/
std::vector<TypeId> columnTypes;
std::vector<string_t> columnNames;
std::vector<AttributeMetadata> columns;
/**
* TODO: remove?
*/
string_t readString(std::istream &input, const TypeId typeId) {
for (DataTypeReaderBase* reader : readers) if (reader->supports(typeId)) return reader->readString(input);
throw RelpipeReaderException(L"Unsupported data type: " + (int) typeId);
}
void read(std::istream &input, std::function<void(const void *, const std::type_info&) > handler, const TypeId typeId) {
for (DataTypeReaderBase* reader : readers) if (reader->supports(typeId)) return reader->read(input, handler);
throw RelpipeReaderException(L"Unsupported data type: " + (int) typeId);
}
void read(std::istream &input, std::function<void(const string_t&, const void *, const std::type_info&) > handler, const TypeId typeId) {
for (DataTypeReaderBase* reader : readers) if (reader->supports(typeId)) return reader->read(input, handler);
throw RelpipeReaderException(L"Unsupported data type: " + (int) typeId);
}
void endOfPipe() {
for (StringHandler* handler : stringHandlers) handler->endOfPipe();
for (ValuesHandler* handler : valuesHandlers) handler->endOfPipe();
}
public:
StreamRelationalReader(std::istream &input) :
input(input) {
}
string_t toTypeCode(const TypeId typeId) override {
for (DataTypeReaderBase* reader : readers) if (reader->supports(typeId)) return reader->getTypeCode();
throw RelpipeReaderException(L"Unsupported data type: " + static_cast<integer_t> (typeId));
}
void addHandler(StringHandler* handler) override {
stringHandlers.push_back(handler);
}
void addHandler(ValuesHandler* handler) override {
valuesHandlers.push_back(handler);
}
void process() override {
while (true) {
integer_t dataPart;
try {
dataPart = integerReader.readValue(input);
// output << "dataPart: " << dataPart << endl;
} catch (RelpipeReaderException e) {
if (input.eof() && input.gcount() == 0) {
if (dataPart == DATA_PART_ROW) {
// last part was row
// input was fully read
// we are finished
// TODO: printCachedData(output); ???
endOfPipe();
return;
} else if (dataPart == DATA_PART_START) {
// Empty relation might be weird but it is valid data.
// Actually, it is not so weird as it looks.
// fwprintf(stderr, L"Warning: The table has no rows. Weird… but OK.\n");
endOfPipe();
return;
} else {
// in current format, there is no other data part
// so this will never happen
// but maybe later…
throw RelpipeReaderException(L"Unexpected EOF");
}
} else if (input.eof()) {
fwprintf(stderr, L"Error: found some unexpected data on the input stream: %d\n", input.gcount());
throw e;
} else {
// other error
throw e;
}
}
if (dataPart == DATA_PART_START) {
// Print data of previous table
// TODO: if (values.size() > 0) printCachedData(output); ???
// Read table name
string_t tableName = stringReader.readValue(input);
// Read column count
columnCount = integerReader.readValue(input);
columnTypes.clear();
columnNames.clear();
columns.clear();
columnTypes.reserve(columnCount);
columnNames.reserve(columnCount);
columns.reserve(columnCount);
// Read column names
for (int i = 0; i < columnCount; i++) {
columnNames.push_back(stringReader.readValue(input));
}
// Read column types
for (int i = 0; i < columnCount; i++) {
TypeId typeId = (TypeId) integerReader.readValue(input); // TODO: přetypování OK?
string_t typeCode = toTypeCode(typeId); // validate typeId TODO: je potřeba?
columnTypes.push_back(typeId);
// put together names, type ids and type codes:
columns.push_back(std::shared_ptr<AttributeMetadataPrivate>(new AttributeMetadataPrivate({columnNames[i], columnTypes[i], typeCode})));
}
for (StringHandler* handler : stringHandlers) handler->startRelation(tableName, columns);
for (ValuesHandler* handler : valuesHandlers) handler->startRelation(tableName, columns);
} else if (dataPart == DATA_PART_ROW) {
for (int i = 0; i < columnCount; i++) {
TypeId typeId = columnTypes[i];
if (stringHandlers.empty()) {
read(input, [&](const void * rawValue, const std::type_info & typeInfo) {
for (ValuesHandler* handler : valuesHandlers) handler->attribute(rawValue, typeInfo);
}, typeId);
} else {
read(input, [&](const string_t& stringValue, const void * rawValue, const std::type_info & typeInfo) {
for (StringHandler* handler : stringHandlers) handler->attribute(stringValue);
for (ValuesHandler* handler : valuesHandlers) handler->attribute(rawValue, typeInfo);
}, typeId);
}
}
} else {
throw RelpipeReaderException(L"Unknown data part");
}
}
throw RelpipeReaderException(L"Unexpected exception"); // should never happen
}
};
}
}