/**
* Relational pipes
* Copyright © 2020 František Kučera (Frantovo.cz, GlobalCode.info)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <clocale>
#include <codecvt>
#include <exception>
#include <iostream>
#include <locale>
#include <regex>
#include <sstream>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>
#include "StreamletMsg.h"
/**
* Unlike the protocol and the message format,
* these helper classes and functions are not part of the public API.
* Thus, when writing custom streamlets, it is better to copy this file
* and review its changes when upgrading to a new upstream version.
*/
// Short alias for the protocol message codes (S::VERSION_SUPPORTED, S::RELATION_START, S::STREAMLET_ERROR, …) used below.
using S = relpipe::in::filesystem::StreamletMsg;
/**
 * Base class for streamlets: helper processes that talk to their parent through stdin/stdout.
 *
 * Wire format: every value is a UTF-8 string terminated by a '\0' byte (SEPARATOR).
 * A message is encoded as its numeric code, the number of parameters, and then the parameters.
 *
 * Subclasses implement getOutputAttributesMetadata() and getOutputAttributes();
 * the process*() handlers are virtual and may be overridden.
 */
class Streamlet {
protected:

	/** One protocol message: a numeric code plus a list of string parameters. */
	class Message {
	public:
		int code = 0;
		std::vector<std::wstring> parameters;

		Message() {
		}

		// Intentionally not explicit: allows the brace form write({S::SOME_CODE}).
		Message(int code) : code(code) {
		}

		Message(int code, std::vector<std::wstring> parameters) : code(code), parameters(std::move(parameters)) {
		}

		Message(int code, std::wstring p1) : code(code), parameters({p1}) {
		}

		Message(int code, std::wstring p1, std::wstring p2) : code(code), parameters({p1, p2}) {
		}
	};

private:

	/** Terminator of every string on the wire. */
	static const char SEPARATOR = '\0';

	/** Reads one string and parses it as an integer; std::stoi throws on malformed input. */
	int readInt() {
		return std::stoi(readString());
	}

	/**
	 * Reads one SEPARATOR-terminated string from stdin and decodes it from UTF-8.
	 * @throws std::runtime_error if the input fails or ends before the terminator is read.
	 */
	std::wstring readString() {
		std::string bytes;
		// getline() extracts and discards the terminator; reaching EOF means it was never seen,
		// i.e. the parent closed the stream in the middle of a message.
		if (!std::getline(std::cin, bytes, SEPARATOR) || std::cin.eof()) {
			throw std::runtime_error("Unable to read from parent process: unexpected end of input.");
		}
		return convertor.from_bytes(bytes);
	}

	/**
	 * Encodes s as UTF-8 and writes it to stdout followed by SEPARATOR (without flushing).
	 * @throws std::runtime_error if the stream is in a failed state afterwards.
	 */
	void writeString(const std::wstring& s) {
		// to_bytes(const wchar_t*) stops at the first embedded L'\0',
		// so a value can never inject an extra SEPARATOR into the framing.
		std::cout << convertor.to_bytes(s.c_str());
		std::cout.put(SEPARATOR);
		// !std::cout covers failbit as well as badbit.
		if (!std::cout) throw std::runtime_error("Unable to write to sub-process.");
	}

	void writeInt(int i) {
		writeString(std::to_wstring(i));
	}

	void flush() {
		std::cout.flush();
	}

	/** Reads one complete message: code, parameter count, then that many parameters. */
	Message read() {
		Message m;
		m.code = readInt();
		int count = readInt();
		// No reserve(): count comes from the peer and is not trusted.
		for (int i = 0; i < count; i++) m.parameters.push_back(readString());
		return m;
	}

	/**
	 * The std::wsmatch contains only references to the original string,
	 * so the groups are copied to make them independent of that string's lifetime.
	 */
	void copyMatches(const std::wsmatch& source, std::vector<std::wstring>& destination) {
		for (const auto& subMatch : source) destination.emplace_back(subMatch.str());
	}

	/** Dispatches incoming messages to the process*() handlers until RELATION_END arrives. */
	void processMessages() {
		while (true) {
			Message m = read();
			if (m.code == S::VERSION_SUPPORTED) processVersionSupported(m);
			else if (m.code == S::WAITING_FOR_VERSION) processWaitingForVersion(m);
			else if (m.code == S::RELATION_START) processRelationStart(m);
			else if (m.code == S::INPUT_ATTRIBUTE_METADATA) processInputAttributeMetadata(m);
			else if (m.code == S::OUTPUT_ATTRIBUTE_ALIAS) processOutputAttributeAlias(m);
			else if (m.code == S::OPTION) processOption(m);
			else if (m.code == S::INPUT_ATTRIBUTE) processInputAttribute(m);
			else if (m.code == S::WAITING_FOR_OUTPUT_ATTRIBUTES_METADATA) processWaitingForOutputAttributesMetadata(m);
			else if (m.code == S::WAITING_FOR_OUTPUT_ATTRIBUTES) processWaitingForOutputAttributes(m);
			else if (m.code == S::RELATION_END) break;
			else processUnsupportedMessage(m);
		}
	}

protected:

	/** Name and type of an attribute (types as in BOOLEAN / INTEGER / STRING below). */
	class AttributeMetadata {
	public:
		std::wstring name;
		std::wstring type;
	};

	/** One output value; isNull is sent as "true"/"false". */
	class OutputAttribute {
	public:
		std::wstring value;
		bool isNull;
	};

	/**
	 * A name/value option received from the parent.
	 * nameMatch / valueMatch are filled only by the regex overloads of getOptions().
	 */
	class Option {
	public:
		std::wstring name;
		std::wstring value;
		std::vector<std::wstring> nameMatch;
		std::vector<std::wstring> valueMatch;

		Option(std::wstring name, std::wstring value) : name(std::move(name)), value(std::move(value)) {
		}
	};

private:
	std::vector<std::wstring> versionsSupported;
	std::vector<AttributeMetadata> inputAttributes;
	std::vector<std::wstring> outputAttributeAliases;
	std::vector<Option> options;
	std::wstring currentRelation;
	std::wstring currentFile;
	std::wstring_convert < std::codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings. Or use always UTF-8 for communication with subprocesses.

protected:

	/**
	 * n.b. generic streamlet (later in relpipe-tr-streamler) will not have currentFile
	 * @return value of the most recent input attribute named "path"
	 */
	std::wstring getCurrentFile() const {
		return currentFile;
	}

	/** @return name received in the most recent RELATION_START message */
	std::wstring getCurrentRelation() const {
		return currentRelation;
	}

	/** Decodes UTF-8 bytes; std::wstring_convert throws std::range_error on invalid input. */
	std::wstring fromBytes(std::string s) {
		return convertor.from_bytes(s);
	}

	/** Encodes to UTF-8; std::wstring_convert throws std::range_error on unconvertible input. */
	std::string toBytes(std::wstring s) {
		return convertor.to_bytes(s);
	}

	static const std::wstring BOOLEAN;
	static const std::wstring INTEGER;
	static const std::wstring STRING;

	/** Sends one message to the parent and flushes stdout. */
	virtual void write(Message m) {
		writeInt(m.code);
		writeInt(static_cast<int>(m.parameters.size()));
		for (const auto& p : m.parameters) writeString(p);
		flush();
	}

	// The handlers below use at() so that a malformed message (missing parameters)
	// raises std::out_of_range, which run() reports, instead of reading out of bounds.

	virtual void processVersionSupported(Message& m) {
		versionsSupported.push_back(m.parameters.at(0));
	}

	/** Accepts protocol version 1 if the parent offered it; otherwise reports INCOMPATIBLE_VERSION. */
	virtual void processWaitingForVersion(Message& m) {
		for (const std::wstring& v : versionsSupported) if (v == L"1") return write({S::VERSION_ACCEPTED, L"1"});
		write({S::STREAMLET_ERROR, L"INCOMPATIBLE_VERSION", L"Only version 1 is supported by this streamlet."});
	}

	virtual void processRelationStart(Message& m) {
		currentRelation = m.parameters.at(0);
	}

	/** Parameters: attribute name, attribute type. */
	virtual void processInputAttributeMetadata(Message& m) {
		inputAttributes.push_back({m.parameters.at(0), m.parameters.at(1)});
	}

	virtual void processOutputAttributeAlias(Message& m) {
		outputAttributeAliases.push_back(m.parameters.at(0));
	}

	/** Parameters: option name, option value. */
	virtual void processOption(Message& m) {
		options.push_back({m.parameters.at(0), m.parameters.at(1)});
	}

	/**
	 * Parameters: attribute index, value, null flag.
	 * Only the "path" attribute is tracked here (as the current file); the null flag is not used.
	 */
	virtual void processInputAttribute(Message& m) {
		int index = std::stoi(m.parameters.at(0));
		// A negative index wraps to a huge size_t and is rejected by at() like any other out-of-range one.
		if (inputAttributes.at(static_cast<std::size_t>(index)).name == L"path") currentFile = m.parameters.at(1);
	}

	virtual void processWaitingForOutputAttributesMetadata(Message& m) {
		for (const AttributeMetadata& am : getOutputAttributesMetadata()) write({S::OUTPUT_ATTRIBUTE_METADATA, am.name, am.type});
		write({S::WAITING_FOR_INPUT_ATTRIBUTES});
	}

	virtual void processWaitingForOutputAttributes(Message& m) {
		for (const OutputAttribute& oa : getOutputAttributes()) write({S::OUTPUT_ATTRIBUTE, oa.value, oa.isNull ? L"true" : L"false"});
		write({S::WAITING_FOR_INPUT_ATTRIBUTES});
	}

	virtual void processUnsupportedMessage(Message& m) {
		write({S::STREAMLET_ERROR, L"UNSUPPORTED_MESSAGE"});
	}

	/** @return the alias received for the output attribute at index, or defaultValue if there is none */
	virtual std::wstring getAlias(int index, const std::wstring& defaultValue) {
		if (index >= 0 && static_cast<std::size_t>(index) < outputAttributeAliases.size()) return outputAttributeAliases[index];
		return defaultValue;
	}

	/** @return all options whose name equals name, in the order received */
	virtual std::vector<Option> getOptions(std::wstring name) {
		std::vector<Option> result;
		for (const Option& o : options) if (o.name == name) result.push_back(o);
		return result;
	}

	/** @return copies of options whose whole name matches namePattern, with capture groups in nameMatch */
	virtual std::vector<Option> getOptions(std::wregex namePattern) {
		std::vector<Option> result;
		std::wsmatch nameMatch;
		for (const Option& o : options) {
			if (!std::regex_match(o.name, nameMatch, namePattern)) continue;
			Option matched = o;
			copyMatches(nameMatch, matched.nameMatch);
			result.push_back(std::move(matched));
		}
		return result;
	}

	/**
	 * @return copies of options whose name matches namePattern AND whose value matches valuePattern,
	 * with capture groups in nameMatch and valueMatch
	 */
	virtual std::vector<Option> getOptions(std::wregex namePattern, std::wregex valuePattern) {
		// TODO: support multiple modes:
		// a) throw an exception if valuePattern does not match
		// b) return option even if valuePattern does not match (valueMatch will be empty)
		// c) skip options with value not matching (current behavior)
		std::wsmatch nameMatch;
		std::wsmatch valueMatch;
		std::vector<Option> result;
		for (const Option& o : options) {
			if (!std::regex_match(o.name, nameMatch, namePattern) || !std::regex_match(o.value, valueMatch, valuePattern)) continue;
			Option matched = o;
			copyMatches(nameMatch, matched.nameMatch);
			copyMatches(valueMatch, matched.valueMatch);
			result.push_back(std::move(matched));
		}
		return result;
	}

	/** Attributes this streamlet produces; sent in reply to WAITING_FOR_OUTPUT_ATTRIBUTES_METADATA. */
	virtual std::vector<AttributeMetadata> getOutputAttributesMetadata() = 0;

	/** Output values; called each time the parent sends WAITING_FOR_OUTPUT_ATTRIBUTES. */
	virtual std::vector<OutputAttribute> getOutputAttributes() = 0;

private:

	/**
	 * Best-effort report of a fatal error to the parent.
	 * Failures are deliberately ignored: the output channel may itself be what failed,
	 * and an exception thrown here would propagate out of run() instead of returning 1.
	 */
	void reportError(const std::wstring& description) {
		try {
			write({S::STREAMLET_ERROR, L"xxxx", description}); // FIXME: correct error codes
		} catch (...) {
		}
	}

public:

	virtual ~Streamlet() {
	}

	/**
	 * Runs the message loop until RELATION_END.
	 * @return 0 on success, 1 if an exception occurred (it is reported to the parent if possible)
	 */
	int run() {
		try {
			processMessages();
			return 0;
		} catch (const std::exception& e) {
			std::wstring message;
			try {
				// from_bytes() throws if what() is not valid UTF-8.
				message = convertor.from_bytes(e.what());
			} catch (...) {
				message = L"(exception message is not valid UTF-8)";
			}
			reportError(L"Exception in streamlet: " + message);
			return 1;
		} catch (...) {
			reportError(L"Unknown exception in streamlet.");
			return 1;
		}
	}
};
// Attribute type names used in OUTPUT_ATTRIBUTE_METADATA messages.
// NOTE(review): these are non-inline definitions in a header, so it presumably must be
// included by only one translation unit (the streamlet's main file) — confirm.
const std::wstring Streamlet::BOOLEAN{L"boolean"};
const std::wstring Streamlet::INTEGER{L"integer"};
const std::wstring Streamlet::STRING{L"string"};
/**
 * Defines main() for a streamlet executable: initializes the C locale from the environment,
 * default-constructs the given Streamlet subclass and returns the exit code of its run().
 *
 * Usage (at file scope, once per program): STREAMLET_RUN(MyStreamlet)
 */
#define STREAMLET_RUN(clazz) \
int main() { \
	std::setlocale(LC_ALL, ""); \
	clazz s; \
	return s.run(); \
}