diff -r f8fe085c1c9f -r f7b9db6fc32b streamlet-examples/Streamlet.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/streamlet-examples/Streamlet.java Wed Jan 29 18:05:13 2020 +0100 @@ -0,0 +1,328 @@ + +/** + * Relational pipes + * Copyright © 2020 František Kučera (Frantovo.cz, GlobalCode.info) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 3 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Unlike the protocol and the message format, these helper classes are not part of the public API. Thus when writing + * custom streamlets, it is better to copy this file and review its changes while upgrading to new upstream version. + */ +public abstract class Streamlet { + + // TODO: use generated constants + private static final int VERSION_SUPPORTED = 100; + private static final int WAITING_FOR_VERSION = 101; + private static final int VERSION_ACCEPTED = 102; + private static final int RELATION_START = 103; + private static final int INPUT_ATTRIBUTE_METADATA = 104; + private static final int OUTPUT_ATTRIBUTE_ALIAS = 105; + private static final int OPTION = 106; + private static final int COMPLETION_REQUEST = 107; + private static final int COMPLETION = 108; + private static final int COMPLETION_END = 109; + private static final int WAITING_FOR_OUTPUT_ATTRIBUTES_METADATA = 110; + private static final int OUTPUT_ATTRIBUTE_METADATA = 111; + private static final int WAITING_FOR_INPUT_ATTRIBUTES = 112; + private static final int INPUT_ATTRIBUTE = 113; + private static final int WAITING_FOR_OUTPUT_ATTRIBUTES = 114; + private static final int OUTPUT_ATTRIBUTE = 115; + private static final int EXECUTOR_ERROR = 116; + private static final int STREAMLET_ERROR = 117; + private static final int STREAMLET_WARNING = 118; + private static final int RELATION_END = 120; + + protected static class Message { + + public int code; + public List parameters; + + public Message() { + } + + public Message(int code, String... parameters) { + this.code = code; + this.parameters = Arrays.asList(parameters); + } + + } + + private static final char SEPARATOR = '\0'; + + private int readInt() throws IOException { + return Integer.valueOf(readString()); + } + + private String readString() throws IOException { + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + for (int b; (b = System.in.read()) > 0;) { + buffer.write(b); + } + return new String(buffer.toByteArray()); + } + + private void writeString(String s) { + System.out.print(s); + System.out.print(SEPARATOR); + } + + private void writeInt(int i) { + writeString(String.valueOf(i)); + } + + private void flush() { + System.out.flush(); + } + + private Message read() throws IOException { + Message m = new Message(); + m.code = readInt(); + int count = readInt(); + m.parameters = new ArrayList<>(count); + for (int i = 0; i < count; i++) { + m.parameters.add(readString()); + } + return m; + } + + private void copyMatches(Matcher source, List destination) { + for (int i = 1; i < source.groupCount(); i++) { + destination.add(source.group(i)); + } + } + + private void processMessages() throws IOException { + OUTER: + while (true) { + Message m = read(); + switch (m.code) { + case VERSION_SUPPORTED: + processVersionSupported(m); + break; + case WAITING_FOR_VERSION: + processWaitingForVersion(m); + break; + case RELATION_START: + processRelationStart(m); + break; + case INPUT_ATTRIBUTE_METADATA: + processInputAttributeMetadata(m); + break; + case OUTPUT_ATTRIBUTE_ALIAS: + processOutputAttributeAlias(m); + break; + case OPTION: + processOption(m); + break; + case INPUT_ATTRIBUTE: + processInputAttribute(m); + break; + case WAITING_FOR_OUTPUT_ATTRIBUTES_METADATA: + processWaitingForOutputAttributesMetadata(m); + break; + case WAITING_FOR_OUTPUT_ATTRIBUTES: + processWaitingForOutputAttributes(m); + break; + case RELATION_END: + break OUTER; + default: + processUnsupportedMessage(m); + break; + } + } + } + + protected static class AttributeMetadata { + + public String name; + public Type type; + + public AttributeMetadata(String name, Type type) { + this.name = name; + this.type = type; + } + }; + + protected static class Option { + + public String name; + public String value; + public List nameMatch; + public List valueMatch; + + Option(String name, String value) { + this.name = name; + this.value = value; + } + } + + protected List versionsSupported = new LinkedList<>(); + protected List inputAttributes = new ArrayList<>(); + protected List outputAttributeAliases = new ArrayList<>(); + protected List