src/StreamRelationalWriter.h
author František Kučera <franta-hg@frantovo.cz>
Sat, 23 Apr 2022 23:23:29 +0200
branch v_0
changeset 59 4fce579bed22
parent 58 25c1ff79297c
child 60 1b20c1e03065
permissions -rw-r--r--
BufferingMode: configurable modes that control when flush() is called; the mode is set through the environment variable RELPIPE_WRITER_BUFFERING_MODE or through the API method setBufferingMode().

/**
 * Relational pipes (library)
 * Copyright © 2018 František Kučera (Frantovo.cz, GlobalCode.info)
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the:
 *  - GNU Lesser General Public License as published by the Free Software Foundation;
 *    version 3 of the License or (at your option)
 *  - GNU General Public License as published by the Free Software Foundation;
 *    version 2 of the License.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#pragma once

#include <string>
#include <cstring>
#include <iostream>
#include <vector>
#include <stdexcept>

#include <relpipe/protocol/constants.h>

#include "../include/relpipe/writer/typedefs.h"
#include "../include/relpipe/writer/RelationalWriter.h"
#include "../include/relpipe/writer/TypeId.h"
#include "../include/relpipe/writer/AttributeMetadata.h"
#include "DataTypeWriterBase.h"
#include "types/BooleanDataTypeWriter.h"
#include "types/IntegerDataTypeWriter.h"
#include "types/StringDataTypeWriter.h"

namespace relpipe {
namespace writer {

using namespace relpipe::protocol;

class StreamRelationalWriter : public RelationalWriter {
private:
	std::ostream &output;
	types::BooleanDataTypeWriter booleanWriter;
	types::IntegerDataTypeWriter integerWriter;
	types::StringDataTypeWriter stringWriter;
	std::vector<DataTypeWriterBase*> writers = {&booleanWriter, &integerWriter, &stringWriter};
	BufferingMode bufferingMode = BufferingMode::ENVIRONMENT;
	BufferingMode bufferingModeEnvDefault = BufferingMode::AUTO;

	/**
	 * count of columns in the current table
	 */
	integer_t columnCount;
	/**
	 * number of column (0 = first) that will be written; after writing, the number is increased and prepared for next one
	 */
	integer_t currentColumn;

	/**
	 * types of columns in the current table
	 */
	std::vector<TypeId> columnTypes;

	void writeString(const string_t &stringValue, const TypeId typeId) {
		// TODO: cache writers at given positions
		for (DataTypeWriterBase* writer : writers) if (writer->supports(typeId)) return writer->writeString(output, stringValue);
		throw RelpipeWriterException(L"Unsupported data type: " + static_cast<integer_t> (typeId));
	}

	void writeRaw(const void* value, const type_info& typeInfo, const TypeId typeId) {
		// TODO: cache writers at given positions
		for (DataTypeWriterBase* writer : writers) if (writer->supports(typeId)) return writer->writeRaw(output, value, typeInfo);
		throw RelpipeWriterException(L"Unsupported data type: " + static_cast<integer_t> (typeId));
	}

	BufferingMode decodeBufferingMode(std::string modeName) {
		if (modeName == "AUTO") return BufferingMode::AUTO;
		else if (modeName == "ENVIRONMENT") return BufferingMode::ENVIRONMENT;
		else if (modeName == "RELATION") return BufferingMode::RELATION;
		else if (modeName == "RECORD") return BufferingMode::RECORD;
		else if (modeName == "ATTRIBUTE") return BufferingMode::ATTRIBUTE;
		else throw std::invalid_argument("Invalid value of BufferingMode.");
	}

	void updateBufferingMode() {
		if (bufferingMode == BufferingMode::ENVIRONMENT) {
			bufferingMode = bufferingModeEnvDefault;
			try {
				char* modeName = getenv("RELPIPE_WRITER_BUFFERING_MODE");
				if (modeName && strlen(modeName)) bufferingMode = decodeBufferingMode(modeName);
			} catch (...) {
				throw RelpipeWriterException(L"Invalid value of the RELPIPE_WRITER_BUFFERING_MODE environmental variable.");
			}
			if (bufferingMode == BufferingMode::ENVIRONMENT) throw RelpipeWriterException(L"RELPIPE_WRITER_BUFFERING_MODE must not be set to ENVIRONMENT (infinite recursion)");
		}
	}

public:

	StreamRelationalWriter(std::ostream &output) :
	output(output) {
	}

	TypeId toTypeId(const string_t typeCode) override {
		for (DataTypeWriterBase* writer : writers) if (writer->supports(typeCode)) return writer->getTypeId();
		throw RelpipeWriterException(L"Unsupported data type: " + typeCode);
	}

	void startRelation(string_t name, std::vector<AttributeMetadata> attributes, boolean_t writeHeader) override {
		string_t tableName = name;
		columnCount = attributes.size();
		currentColumn = 0;

		updateBufferingMode();

		// Write table name and column count:
		if (writeHeader) {
			integerWriter.writeValue(output, DATA_PART_START);
			stringWriter.writeValue(output, tableName);
			integerWriter.writeValue(output, columnCount);
		}

		columnTypes.clear();
		columnTypes.resize(columnCount);

		// Write column names:
		if (writeHeader) {
			for (size_t c = 0; c < columnCount; c++) {
				wstring columnName = attributes[c].attributeName;
				stringWriter.writeValue(output, columnName);
			}
		}

		// Write column types:
		for (size_t c = 0; c < columnCount; c++) {
			TypeId typeId = attributes[c].typeId;
			if (writeHeader) integerWriter.writeValue(output, static_cast<integer_t> (typeId));
			columnTypes[c] = typeId;
		}

		if (bufferingMode >= BufferingMode::RELATION && bufferingMode <= BufferingMode::ATTRIBUTE) output.flush();
	}

	void writeAttribute(const string_t& value) override {
		if (currentColumn == 0) integerWriter.writeValue(output, DATA_PART_ROW);

		// TODO: select writer for each attribute just once in startRelation() instead of looking it each time here
		writeString(value, columnTypes[currentColumn]);

		bool endOfRedord = ++currentColumn == columnCount;
		if (endOfRedord) currentColumn = 0;

		if (bufferingMode == BufferingMode::ATTRIBUTE || (endOfRedord && bufferingMode == BufferingMode::RECORD)) output.flush();
	}

	void writeAttribute(const void* value, const std::type_info& type) override {
		if (currentColumn == 0) integerWriter.writeValue(output, DATA_PART_ROW);

		// TODO: select writer for each attribute just once in startRelation() instead of looking it each time here
		writeRaw(value, type, columnTypes[currentColumn]);

		bool endOfRedord = ++currentColumn == columnCount;
		if (endOfRedord) currentColumn = 0;

		if (bufferingMode == BufferingMode::ATTRIBUTE || (endOfRedord && bufferingMode == BufferingMode::RECORD)) output.flush();
	}

	void setBufferingMode(BufferingMode mode, BufferingMode envDefault) override {
		bufferingMode = mode;
		bufferingModeEnvDefault = envDefault;
	}

};

}
}