src/StreamRelationalReader.h
author František Kučera <franta-hg@frantovo.cz>
Sun, 16 Sep 2018 18:00:34 +0200
branchv_0
changeset 28 c9e03557c1e1
parent 26 019edca46769
child 29 755978b0935c
permissions -rw-r--r--
AttributeMetadata: call vector.clear() before vector.reserve(), fixes error (if there are multiple relations in the stream)

#pragma once

#include <string>
#include <iostream>
#include <vector>

#include <relpipe/protocol/constants.h>
#include <memory>

#include "../include/relpipe/reader/typedefs.h"
#include "../include/relpipe/reader/RelationalReader.h"
#include "../include/relpipe/reader/TypeId.h"
#include "../include/relpipe/reader/handlers/RelationalReaderBaseHandler.h"
#include "../include/relpipe/reader/handlers/RelationalReaderStringHandler.h"
#include "../include/relpipe/reader/handlers/RelationalReaderValueHandler.h"
#include "AttributeMetadataPrivate.h"
#include "DataTypeReaderBase.h"
#include "types/BooleanDataTypeReader.h"
#include "types/IntegerDataTypeReader.h"
#include "types/StringDataTypeReader.h"

namespace relpipe {
namespace reader {

using namespace relpipe::protocol;

using StringHandler = relpipe::reader::handlers::RelationalReaderStringHadler;
using ValuesHandler = relpipe::reader::handlers::RelationalReaderValueHadler;
using AttributeMetadata = relpipe::reader::handlers::AttributeMetadata;
using AttributeMetadataPrivate = relpipe::reader::handlers::AttributeMetadataPrivate;

class StreamRelationalReader : public RelationalReader {
private:
	std::istream &input;
	types::BooleanDataTypeReader booleanReader;
	types::IntegerDataTypeReader integerReader;
	types::StringDataTypeReader stringReader;
	std::vector<DataTypeReaderBase*> readers = {&booleanReader, &integerReader, &stringReader};

	std::vector<StringHandler*> stringHandlers;
	std::vector<ValuesHandler*> valuesHandlers;

	/**
	 * count of columns in the current table
	 */
	integer_t columnCount;
	/**
	 * number of column (0 = first) that will be written; after writing, the number is increased and prepared for next one
	 */
	integer_t currentColumn;

	/**
	 * types of columns in the current table
	 */
	std::vector<TypeId> columnTypes;
	std::vector<string_t> columnNames;
	std::vector<AttributeMetadata> columns;

	/**
	 * TODO: remove?
	 */
	string_t readString(std::istream &input, const TypeId typeId) {
		for (DataTypeReaderBase* reader : readers) if (reader->supports(typeId)) return reader->readString(input);
		throw RelpipeReaderException(L"Unsupported data type: " + (int) typeId);
	}

	void read(std::istream &input, std::function<void(const void *, const std::type_info&) > handler, const TypeId typeId) {
		for (DataTypeReaderBase* reader : readers) if (reader->supports(typeId)) return reader->read(input, handler);
		throw RelpipeReaderException(L"Unsupported data type: " + (int) typeId);
	}

	void read(std::istream &input, std::function<void(const string_t&, const void *, const std::type_info&) > handler, const TypeId typeId) {
		for (DataTypeReaderBase* reader : readers) if (reader->supports(typeId)) return reader->read(input, handler);
		throw RelpipeReaderException(L"Unsupported data type: " + (int) typeId);
	}

	void endOfPipe() {
		for (StringHandler* handler : stringHandlers) handler->endOfPipe();
		for (ValuesHandler* handler : valuesHandlers) handler->endOfPipe();
	}

public:

	StreamRelationalReader(std::istream &input) :
	input(input) {
	}

	string_t toTypeCode(const TypeId typeId) override {
		for (DataTypeReaderBase* reader : readers) if (reader->supports(typeId)) return reader->getTypeCode();
		throw RelpipeReaderException(L"Unsupported data type: " + static_cast<integer_t> (typeId));
	}

	void addHandler(StringHandler* handler) override {
		stringHandlers.push_back(handler);
	}

	void addHandler(ValuesHandler* handler) override {
		valuesHandlers.push_back(handler);
	}

	void process() override {

		while (true) {
			integer_t dataPart;
			try {
				dataPart = integerReader.readValue(input);
				// output << "dataPart: " << dataPart << endl;
			} catch (RelpipeReaderException e) {
				if (input.eof() && input.gcount() == 0) {
					if (dataPart == DATA_PART_ROW) {
						// last part was row
						// input was fully read
						// we are finished
						// TODO: printCachedData(output); ???
						endOfPipe();
						return;
					} else if (dataPart == DATA_PART_START) {
						// Empty relation might be weird but it is valid data.
						// Actually, it is not so weird as it looks.
						// fwprintf(stderr, L"Warning: The table has no rows. Weird… but OK.\n");
						endOfPipe();
						return;
					} else {
						// in current format, there is no other data part
						// so this will never happen
						// but maybe later…
						throw RelpipeReaderException(L"Unexpected EOF");
					}
				} else if (input.eof()) {
					fwprintf(stderr, L"Error: found some unexpected data on the input stream: %d\n", input.gcount());
					throw e;
				} else {
					// other error
					throw e;
				}
			}

			if (dataPart == DATA_PART_START) {
				// Print data of previous table
				// TODO: if (values.size() > 0) printCachedData(output); ???

				// Read table name
				string_t tableName = stringReader.readValue(input);

				// Read column count
				columnCount = integerReader.readValue(input);

				columnTypes.clear();
				columnNames.clear();
				columns.clear();
				columnTypes.reserve(columnCount);
				columnNames.reserve(columnCount);
				columns.reserve(columnCount);

				// Read column names
				for (int i = 0; i < columnCount; i++) {
					columnNames.push_back(stringReader.readValue(input));
				}

				// Read column types
				for (int i = 0; i < columnCount; i++) {
					TypeId typeId = (TypeId) integerReader.readValue(input); // TODO: přetypování OK?
					string_t typeCode = toTypeCode(typeId); // validate typeId TODO: je potřeba?
					columnTypes.push_back(typeId);

					// put together names, type ids and type codes:
					columns.push_back(std::shared_ptr<AttributeMetadataPrivate>(new AttributeMetadataPrivate({columnNames[i], columnTypes[i], typeCode})));
				}

				for (StringHandler* handler : stringHandlers) handler->startRelation(tableName, columns);
				for (ValuesHandler* handler : valuesHandlers) handler->startRelation(tableName, columns);

			} else if (dataPart == DATA_PART_ROW) {
				for (int i = 0; i < columnCount; i++) {
					TypeId typeId = columnTypes[i];

					if (stringHandlers.empty()) {
						read(input, [&](const void * rawValue, const std::type_info & typeInfo) {
							for (ValuesHandler* handler : valuesHandlers) handler->attribute(rawValue, typeInfo);
						}, typeId);
					} else {
						read(input, [&](const string_t& stringValue, const void * rawValue, const std::type_info & typeInfo) {
							for (StringHandler* handler : stringHandlers) handler->attribute(stringValue);
							for (ValuesHandler* handler : valuesHandlers) handler->attribute(rawValue, typeInfo);
						}, typeId);
					}
				}

			} else {
				throw RelpipeReaderException(L"Unknown data part");
			}
		}

		throw RelpipeReaderException(L"Unexpected exception"); // should never happen

	}

};

}
}