streamlet-examples/streamlet-common.h
author František Kučera <franta-hg@frantovo.cz>
Fri, 13 May 2022 21:35:30 +0200
branchv_0
changeset 96 c34106244a54
parent 95 f2f2b81079a5
permissions -rw-r--r--
portable order of (i++) parameters

/**
 * Relational pipes
 * Copyright © 2020 František Kučera (Frantovo.cz, GlobalCode.info)
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, version 3 of the License.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#pragma once

#include <codecvt>
#include <cstddef>
#include <exception>
#include <iostream>
#include <locale>
#include <regex>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>

#include "StreamletMsg.h"

/**
 * Unlike the protocol and the message format,
 * these helper classes and functions are not part of the public API.
 * Thus, when writing custom streamlets, it is better to copy this file
 * and review its changes when upgrading to a new upstream version.
 */

using S = relpipe::in::filesystem::StreamletMsg;

class Streamlet {
protected:

	class Message {
	public:
		int code;
		std::vector<std::wstring> parameters;

		Message() {
		}

		Message(int code) : code(code) {
		}

		Message(int code, std::vector<std::wstring> parameters) : code(code), parameters(parameters) {
		}

		Message(int code, std::wstring p1) : code(code), parameters({p1}) {
		}

		Message(int code, std::wstring p1, std::wstring p2) : code(code), parameters({p1, p2}) {
		}
	};

private:

	static const char SEPARATOR = '\0';

	int readInt() {
		return std::stoi(readString());
	}

	std::wstring readString() {
		std::stringstream s;
		for (char ch; std::cin.read(&ch, 1).good() && ch != SEPARATOR;) s.put(ch);
		return convertor.from_bytes(s.str());
	}

	void writeString(std::wstring s) {
		// if the value contains a null byte 0x00, it will be trimmed (passing 0x00 through would break the protocol)
		std::cout << convertor.to_bytes(s.c_str());
		std::cout.put(SEPARATOR);
		if (std::cout.bad()) throw std::runtime_error("Unable to write to sub-process.");
	}

	void writeInt(int i) {
		writeString(std::to_wstring(i));
	}

	void flush() {
		std::cout.flush();
	}

	Message read() {
		Message m;
		m.code = readInt();
		int count = readInt();
		for (int i = 0; i < count; i++) m.parameters.push_back(readString());
		return m;
	}

	/**
	 * The std::wsmatch contains only references to original string,
	 * so we need to copy it in order to make it persistent and independent from variables that may evaporate.
	 */
	void copyMatches(std::wsmatch& source, std::vector<std::wstring>& destination) {
		for (std::wstring s : source) destination.emplace_back(s);
	}

	void processMessages() {
		while (true) {
			Message m = read();
			if (m.code == S::VERSION_SUPPORTED) processVersionSupported(m);
			else if (m.code == S::WAITING_FOR_VERSION) processWaitingForVersion(m);
			else if (m.code == S::RELATION_START) processRelationStart(m);
			else if (m.code == S::INPUT_ATTRIBUTE_METADATA) processInputAttributeMetadata(m);
			else if (m.code == S::OUTPUT_ATTRIBUTE_ALIAS) processOutputAttributeAlias(m);
			else if (m.code == S::OPTION) processOption(m);
			else if (m.code == S::INPUT_ATTRIBUTE) processInputAttribute(m);
			else if (m.code == S::WAITING_FOR_OUTPUT_ATTRIBUTES_METADATA) processWaitingForOutputAttributesMetadata(m);
			else if (m.code == S::WAITING_FOR_OUTPUT_ATTRIBUTES) processWaitingForOutputAttributes(m);
			else if (m.code == S::RELATION_END) break;
			else processUnsupportedMessage(m);
		}
	}

protected:

	class AttributeMetadata {
	public:
		std::wstring name;
		std::wstring type;
	};

	class OutputAttribute {
	public:
		std::wstring value;
		bool isNull;
	};

	class Option {
	public:
		std::wstring name;
		std::wstring value;
		std::vector<std::wstring> nameMatch;
		std::vector<std::wstring> valueMatch;

		Option(std::wstring name, std::wstring value) : name(name), value(value) {
		}
	};

private:
	std::vector<std::wstring> versionsSupported;
	std::vector<AttributeMetadata> inputAttributes;
	std::vector<std::wstring> outputAttributeAliases;
	std::vector<Option> options;
	std::wstring currentRelation;
	std::wstring currentFile;
	std::wstring_convert < std::codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings. Or use always UTF-8 for communication with subprocesses.

protected:

	/**
	 * n.b. generic streamlet (later in relpipe-tr-streamler) will not have currentFile
	 */
	std::wstring getCurrentFile() {
		return currentFile;
	}

	std::wstring getCurrentRelation() {
		return currentRelation;
	}

	std::wstring fromBytes(std::string s) {
		return convertor.from_bytes(s);
	}

	std::string toBytes(std::wstring s) {
		return convertor.to_bytes(s);
	}

	static const std::wstring BOOLEAN;
	static const std::wstring INTEGER;
	static const std::wstring STRING;

	virtual void write(Message m) {
		writeInt(m.code);
		writeInt(m.parameters.size());
		for (auto p : m.parameters) writeString(p);
		flush();
	}

	virtual void processVersionSupported(Message& m) {
		versionsSupported.push_back(m.parameters[0]);
	}

	virtual void processWaitingForVersion(Message& m) {
		for (std::wstring v : versionsSupported) if (v == L"1") return write({S::VERSION_ACCEPTED, L"1"});
		write({S::STREAMLET_ERROR, L"INCOMPATIBLE_VERSION", L"Only version 1 is supported by this streamlet."});
	}

	virtual void processRelationStart(Message& m) {
		currentRelation = m.parameters[0];
	}

	virtual void processInputAttributeMetadata(Message& m) {
		inputAttributes.push_back({m.parameters[0], m.parameters[1]});
	}

	virtual void processOutputAttributeAlias(Message& m) {
		outputAttributeAliases.push_back(m.parameters[0]);
	}

	virtual void processOption(Message& m) {
		options.push_back({m.parameters[0], m.parameters[1]});
	}

	virtual void processInputAttribute(Message& m) {
		int index = std::stoi(m.parameters[0]);
		std::wstring value = m.parameters[1];
		bool isNull = m.parameters[2] == L"true";
		if (inputAttributes[index].name == L"path") currentFile = value;
	}

	virtual void processWaitingForOutputAttributesMetadata(Message& m) {
		for (AttributeMetadata am : getOutputAttributesMetadata()) write({S::OUTPUT_ATTRIBUTE_METADATA, am.name, am.type});
		write({S::WAITING_FOR_INPUT_ATTRIBUTES});
	}

	virtual void processWaitingForOutputAttributes(Message& m) {
		for (OutputAttribute oa : getOutputAttributes()) write({S::OUTPUT_ATTRIBUTE, oa.value, oa.isNull ? L"true" : L"false"});
		write({S::WAITING_FOR_INPUT_ATTRIBUTES});
	}

	virtual void processUnsupportedMessage(Message& m) {
		write({S::STREAMLET_ERROR, L"UNSUPPORTED_MESSAGE"});
	}

	virtual std::wstring getAlias(int index, const std::wstring& defaultValue) {
		if (outputAttributeAliases.size() > index) return outputAttributeAliases[index];
		else return defaultValue;
	}

	virtual std::vector<Option> getOptions(std::wstring name) {
		std::vector<Option> result;
		for (Option o : options) if (o.name == name) result.push_back(o);
		return result;
	}

	virtual std::vector<Option> getOptions(std::wregex namePattern) {
		std::vector<Option> result;
		std::wsmatch nameMatch;
		for (Option o : options) if (std::regex_match(o.name, nameMatch, namePattern)) {
				copyMatches(nameMatch, o.nameMatch);
				result.push_back(o);
			}
		return result;
	}

	virtual std::vector<Option> getOptions(std::wregex namePattern, std::wregex valuePattern) {
		// TODO: support multiple modes: 
		//   a) throw an exception if valuePattern does not match
		//   b) return option even if valuePattern does not match (valueMatch will be empty)
		//   c) skip options with value not matching (current behavior)
		std::wsmatch nameMatch;
		std::wsmatch valueMatch;
		std::vector<Option> result;
		for (Option o : options) if (std::regex_match(o.name, nameMatch, namePattern) && std::regex_match(o.value, valueMatch, valuePattern)) {
				copyMatches(nameMatch, o.nameMatch);
				copyMatches(valueMatch, o.valueMatch);
				result.push_back(o);
			}
		return result;
	}

	virtual std::vector<AttributeMetadata> getOutputAttributesMetadata() = 0;
	virtual std::vector<OutputAttribute> getOutputAttributes() = 0;

public:

	virtual ~Streamlet() {
	}

	int run() {
		try {
			processMessages();
			return 0;
		} catch (std::exception& e) {
			write({S::STREAMLET_ERROR, L"xxxx", L"Exception in streamlet: " + convertor.from_bytes(e.what())}); // FIXME: correct error codes
			return 1;
		} catch (...) {
			write({S::STREAMLET_ERROR, L"xxxx", L"Unknown exception in streamlet."}); // FIXME: correct error codes
			return 1;
		}
	}
};

// Out-of-class definitions of the static constants declared in Streamlet.
// NOTE(review): presumably meant as values for AttributeMetadata::type – confirm against the protocol.
const std::wstring Streamlet::BOOLEAN(L"boolean");
const std::wstring Streamlet::INTEGER(L"integer");
const std::wstring Streamlet::STRING(L"string");

/**
 * Generates the main() function of a streamlet program:
 * it sets the locale from the environment, creates one default-constructed
 * instance of the given class and uses the result of its run() method as the exit code.
 */
#define STREAMLET_RUN(clazz) \
int main(int argc, char** argv) { \
	setlocale(LC_ALL, ""); \
	clazz streamlet; \
	const int exitCode = streamlet.run(); \
	return exitCode; \
}