streamlet-examples/streamlet-common.h
branchv_0
changeset 63 8c6885543e2c
child 66 8a8b6434e4bb
equal deleted inserted replaced
62:a467e8cbd16b 63:8c6885543e2c
       
     1 /**
       
     2  * Relational pipes
       
     3  * Copyright © 2020 František Kučera (Frantovo.cz, GlobalCode.info)
       
     4  *
       
     5  * This program is free software: you can redistribute it and/or modify
       
     6  * it under the terms of the GNU General Public License as published by
       
     7  * the Free Software Foundation, version 3 of the License.
       
     8  *
       
     9  * This program is distributed in the hope that it will be useful,
       
    10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
       
    11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
       
    12  * GNU General Public License for more details.
       
    13  *
       
    14  * You should have received a copy of the GNU General Public License
       
    15  * along with this program. If not, see <http://www.gnu.org/licenses/>.
       
    16  */
       
    17 #pragma once
       
    18 
       
    19 #include <iostream>
       
    20 #include <exception>
       
    21 #include <vector>
       
    22 #include <string>
       
    23 #include <sstream>
       
    24 #include <codecvt>
       
    25 #include <locale>
       
    26 
       
    27 #include "../src/StreamletMsg.h"
       
    28 
       
    29 /**
       
    30  * Unlike the protocol and the message format,
       
    31  * these helper classes and functions are not part of the public API.
       
    32  * Thus when writing custom streamlets, it is better to copy this file
       
     33  * and review its changes when upgrading to a new upstream version.
       
    34  */
       
    35 
       
    36 using S = relpipe::in::filesystem::StreamletMsg;
       
    37 
       
    38 class Streamlet {
       
    39 private:
       
    40 
       
    41 	class Message {
       
    42 	public:
       
    43 		int code;
       
    44 		std::vector<std::wstring> parameters;
       
    45 
       
    46 		Message() {
       
    47 		}
       
    48 
       
    49 		Message(int code) : code(code) {
       
    50 		}
       
    51 
       
    52 		Message(int code, std::vector<std::wstring> parameters) : code(code), parameters(parameters) {
       
    53 		}
       
    54 
       
    55 		Message(int code, std::wstring p1) : code(code), parameters({p1}) {
       
    56 		}
       
    57 
       
    58 		Message(int code, std::wstring p1, std::wstring p2) : code(code), parameters({p1, p2}) {
       
    59 		}
       
    60 	};
       
    61 
       
    62 	static const char SEPARATOR = '\0';
       
    63 
       
    64 	int readInt() {
       
    65 		return std::stoi(readString());
       
    66 	}
       
    67 
       
    68 	std::wstring readString() {
       
    69 		std::stringstream s;
       
    70 		for (char ch; std::cin.read(&ch, 1).good() && ch != SEPARATOR;) s.put(ch);
       
    71 		return convertor.from_bytes(s.str());
       
    72 	}
       
    73 
       
    74 	void writeString(std::wstring s) {
       
    75 		std::cout << convertor.to_bytes(s.c_str());
       
    76 		std::cout.put(SEPARATOR);
       
    77 		if (std::cout.bad()) throw std::runtime_error("Unable to write to sub-process.");
       
    78 	}
       
    79 
       
    80 	void writeInt(int i) {
       
    81 		writeString(std::to_wstring(i));
       
    82 	}
       
    83 
       
    84 	void flush() {
       
    85 		std::cout.flush();
       
    86 	}
       
    87 
       
    88 	Message read() {
       
    89 		Message m;
       
    90 		m.code = readInt();
       
    91 		int count = readInt();
       
    92 		for (int i = 0; i < count; i++) m.parameters.push_back(readString());
       
    93 		return m;
       
    94 	}
       
    95 
       
    96 	void processMessages() {
       
    97 		while (true) {
       
    98 			Message m = read();
       
    99 			if (m.code == S::VERSION_SUPPORTED) processVersionSupported(m);
       
   100 			else if (m.code == S::WAITING_FOR_VERSION) processWaitingForVersion(m);
       
   101 			else if (m.code == S::RELATION_START) processRelationStart(m);
       
   102 			else if (m.code == S::INPUT_ATTRIBUTE_METADATA) processInputAttributeMetadata(m);
       
   103 			else if (m.code == S::OUTPUT_ATTRIBUTE_ALIAS) processOutputAttributeAlias(m);
       
   104 			else if (m.code == S::OPTION) processOption(m);
       
   105 			else if (m.code == S::INPUT_ATTRIBUTE) processInputAttribute(m);
       
   106 			else if (m.code == S::WAITING_FOR_OUTPUT_ATTRIBUTES_METADATA) processWaitingForOutputAttributesMetadata(m);
       
   107 			else if (m.code == S::WAITING_FOR_OUTPUT_ATTRIBUTES) processWaitingForOutputAttributes(m);
       
   108 			else if (m.code == S::RELATION_END) break;
       
   109 			else processUnsupportedMessage(m);
       
   110 		}
       
   111 	}
       
   112 
       
   113 protected:
       
   114 
       
   115 	class AttributeMetadata {
       
   116 	public:
       
   117 		std::wstring name;
       
   118 		std::wstring type;
       
   119 	};
       
   120 
       
   121 	class OutputAttribute {
       
   122 	public:
       
   123 		std::wstring value;
       
   124 		bool isNull;
       
   125 	};
       
   126 
       
   127 	class Option {
       
   128 	public:
       
   129 		std::wstring name;
       
   130 		std::wstring value;
       
   131 	};
       
   132 
       
   133 	std::vector<std::wstring> versionsSupported;
       
   134 	std::vector<AttributeMetadata> inputAttributes;
       
   135 	std::vector<std::wstring> outputAttributeAliases;
       
   136 	std::vector<Option> options;
       
   137 	std::wstring currentRelation;
       
   138 	std::wstring currentFile;
       
   139 
       
   140 	std::wstring_convert < std::codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings. Or use always UTF-8 for communication with subprocesses.
       
   141 
       
   142 	static const std::wstring BOOLEAN;
       
   143 	static const std::wstring INTEGER;
       
   144 	static const std::wstring STRING;
       
   145 
       
   146 	virtual void write(Message m) {
       
   147 		writeInt(m.code);
       
   148 		writeInt(m.parameters.size());
       
   149 		for (auto p : m.parameters) writeString(p);
       
   150 		flush();
       
   151 	}
       
   152 
       
   153 	virtual void processVersionSupported(Message& m) {
       
   154 		versionsSupported.push_back(m.parameters[0]);
       
   155 	}
       
   156 
       
   157 	virtual void processWaitingForVersion(Message& m) {
       
   158 		// TODO: check that 1 is supported
       
   159 		write({S::VERSION_ACCEPTED, L"1"});
       
   160 	}
       
   161 
       
   162 	virtual void processRelationStart(Message& m) {
       
   163 		currentRelation = m.parameters[0];
       
   164 	}
       
   165 
       
   166 	virtual void processInputAttributeMetadata(Message& m) {
       
   167 		inputAttributes.push_back({m.parameters[0], m.parameters[1]});
       
   168 	}
       
   169 
       
   170 	virtual void processOutputAttributeAlias(Message& m) {
       
   171 		outputAttributeAliases.push_back(m.parameters[0]);
       
   172 	}
       
   173 
       
   174 	virtual void processOption(Message& m) {
       
   175 		options.push_back({m.parameters[0], m.parameters[1]});
       
   176 	}
       
   177 
       
   178 	virtual void processInputAttribute(Message& m) {
       
   179 		int index = std::stoi(m.parameters[0]);
       
   180 		std::wstring value = m.parameters[1];
       
   181 		bool isNull = m.parameters[2] == L"true";
       
   182 		if (inputAttributes[index].name == L"path") currentFile = value;
       
   183 	}
       
   184 
       
   185 	virtual void processWaitingForOutputAttributesMetadata(Message& m) {
       
   186 		for (AttributeMetadata am : getOutputAttributesMetadata()) write({S::OUTPUT_ATTRIBUTE_METADATA, am.name, am.type});
       
   187 		write({S::WAITING_FOR_INPUT_ATTRIBUTES});
       
   188 	}
       
   189 
       
   190 	virtual void processWaitingForOutputAttributes(Message& m) {
       
   191 		for (OutputAttribute oa : getOutputAttributes()) write({S::OUTPUT_ATTRIBUTE, oa.value, oa.isNull ? L"true" : L"false"});
       
   192 		write({S::WAITING_FOR_INPUT_ATTRIBUTES});
       
   193 	}
       
   194 
       
   195 	virtual void processUnsupportedMessage(Message& m) {
       
   196 		write({S::STREAMLET_ERROR, L"UNSUPPORTED_MESSAGE"});
       
   197 	}
       
   198 
       
   199 	virtual std::wstring getAlias(int index, const std::wstring& defaultValue) {
       
   200 		if (outputAttributeAliases.size() > index) return outputAttributeAliases[index];
       
   201 		else return defaultValue;
       
   202 	}
       
   203 
       
   204 	virtual std::vector<AttributeMetadata> getOutputAttributesMetadata() = 0;
       
   205 	virtual std::vector<OutputAttribute> getOutputAttributes() = 0;
       
   206 
       
   207 public:
       
   208 
       
   209 	virtual ~Streamlet() {
       
   210 	}
       
   211 
       
   212 	int run() {
       
   213 		try {
       
   214 			processMessages();
       
   215 			return 0;
       
   216 		} catch (...) {
       
   217 			return 1;
       
   218 		}
       
   219 	}
       
   220 };
       
   221 
       
   222 const std::wstring Streamlet::BOOLEAN = L"boolean";
       
   223 const std::wstring Streamlet::INTEGER = L"integer";
       
   224 const std::wstring Streamlet::STRING = L"string";
       
   225 
       
   226 #define STREAMLET_RUN(clazz) \
       
   227 int main(int argc, char** argv) { \
       
   228 	clazz s; \
       
   229 	return s.run(); \
       
   230 }