src/StreamRelationalWriter.h
author František Kučera <franta-hg@frantovo.cz>
Fri, 24 Jan 2020 20:49:46 +0100
branchv_0
changeset 47 77fbaccdeea4
parent 41 744b61559eb2
child 52 785fc49b0a55
permissions -rw-r--r--
support „writeHeader = false“ to allow appending records to an existing relation

/**
 * Relational pipes (library)
 * Copyright © 2018 František Kučera (Frantovo.cz, GlobalCode.info)
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the:
 *  - GNU Lesser General Public License as published by the Free Software Foundation;
 *    version 3 of the License or (at your option)
 *  - GNU General Public License as published by the Free Software Foundation;
 *    version 2 of the License.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#pragma once

#include <string>
#include <iostream>
#include <vector>

#include <relpipe/protocol/constants.h>

#include "../include/relpipe/writer/typedefs.h"
#include "../include/relpipe/writer/RelationalWriter.h"
#include "../include/relpipe/writer/TypeId.h"
#include "../include/relpipe/writer/AttributeMetadata.h"
#include "DataTypeWriterBase.h"
#include "types/BooleanDataTypeWriter.h"
#include "types/IntegerDataTypeWriter.h"
#include "types/StringDataTypeWriter.h"

namespace relpipe {
namespace writer {

using namespace relpipe::protocol;

class StreamRelationalWriter : public RelationalWriter {
private:
	std::ostream &output;
	types::BooleanDataTypeWriter booleanWriter;
	types::IntegerDataTypeWriter integerWriter;
	types::StringDataTypeWriter stringWriter;
	std::vector<DataTypeWriterBase*> writers = {&booleanWriter, &integerWriter, &stringWriter};

	/**
	 * count of columns in the current table
	 */
	integer_t columnCount;
	/**
	 * number of column (0 = first) that will be written; after writing, the number is increased and prepared for next one
	 */
	integer_t currentColumn;

	/**
	 * types of columns in the current table
	 */
	std::vector<TypeId> columnTypes;

	void writeString(const string_t &stringValue, const TypeId typeId) {
		for (DataTypeWriterBase* writer : writers) if (writer->supports(typeId)) return writer->writeString(output, stringValue);
		throw RelpipeWriterException(L"Unsupported data type: " + static_cast<integer_t> (typeId));
	}

	void writeRaw(const void* value, const type_info& typeInfo, const TypeId typeId) {
		for (DataTypeWriterBase* writer : writers) if (writer->supports(typeId)) return writer->writeRaw(output, value, typeInfo);
		throw RelpipeWriterException(L"Unsupported data type: " + static_cast<integer_t> (typeId));
	}

public:

	StreamRelationalWriter(std::ostream &output) :
	output(output) {
	}

	TypeId toTypeId(const string_t typeCode) override {
		for (DataTypeWriterBase* writer : writers) if (writer->supports(typeCode)) return writer->getTypeId();
		throw RelpipeWriterException(L"Unsupported data type: " + typeCode);
	}

	void startRelation(string_t name, std::vector<AttributeMetadata> attributes, boolean_t writeHeader) override {
		string_t tableName = name;
		columnCount = attributes.size();
		currentColumn = 0;

		// Write table name and column count:
		if (writeHeader) {
			integerWriter.writeValue(output, DATA_PART_START);
			stringWriter.writeValue(output, tableName);
			integerWriter.writeValue(output, columnCount);
		}

		columnTypes.clear();
		columnTypes.resize(columnCount);

		// Write column names:
		if (writeHeader) {
			for (size_t c = 0; c < columnCount; c++) {
				wstring columnName = attributes[c].attributeName;
				stringWriter.writeValue(output, columnName);
			}
		}

		// Write column types:
		for (size_t c = 0; c < columnCount; c++) {
			TypeId typeId = attributes[c].typeId;
			if (writeHeader) integerWriter.writeValue(output, static_cast<integer_t> (typeId));
			columnTypes[c] = typeId;
		}

	}

	void writeAttribute(const string_t& value) override {
		if (currentColumn == 0) integerWriter.writeValue(output, DATA_PART_ROW);
		writeString(value, columnTypes[currentColumn]);
		if (++currentColumn == columnCount) currentColumn = 0;
	}

	void writeAttribute(const void* value, const std::type_info& type) override {
		if (currentColumn == 0) integerWriter.writeValue(output, DATA_PART_ROW);
		writeRaw(value, type, columnTypes[currentColumn]);
		if (++currentColumn == columnCount) currentColumn = 0;
	}

};

}
}