/**
* Relational pipes
* Copyright © 2020 František Kučera (Frantovo.cz, GlobalCode.info)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Unlike the protocol and the message format, these helper classes are not part of the public API. Thus, when writing
* custom streamlets, it is better to copy this file and review its changes when upgrading to a new upstream version.
*/
public abstract class Streamlet {

    // TODO: use generated constants
    // Numeric message codes exchanged over standard input/output.
    private static final int VERSION_SUPPORTED = 100;
    private static final int WAITING_FOR_VERSION = 101;
    private static final int VERSION_ACCEPTED = 102;
    private static final int RELATION_START = 103;
    private static final int INPUT_ATTRIBUTE_METADATA = 104;
    private static final int OUTPUT_ATTRIBUTE_ALIAS = 105;
    private static final int OPTION = 106;
    private static final int COMPLETION_REQUEST = 107;
    private static final int COMPLETION = 108;
    private static final int COMPLETION_END = 109;
    private static final int WAITING_FOR_OUTPUT_ATTRIBUTES_METADATA = 110;
    private static final int OUTPUT_ATTRIBUTE_METADATA = 111;
    private static final int WAITING_FOR_INPUT_ATTRIBUTES = 112;
    private static final int INPUT_ATTRIBUTE = 113;
    private static final int WAITING_FOR_OUTPUT_ATTRIBUTES = 114;
    private static final int OUTPUT_ATTRIBUTE = 115;
    private static final int EXECUTOR_ERROR = 116;
    private static final int STREAMLET_ERROR = 117;
    private static final int STREAMLET_WARNING = 118;
    private static final int RELATION_END = 120;

    /** Terminator of every field (code, parameter count, parameter) on the wire. */
    private static final char SEPARATOR = '\0';

    /**
     * One protocol message: a numeric code followed by a list of string parameters.
     */
    protected static class Message {

        public int code;
        public List<String> parameters;

        /**
         * Creates a message with code 0 and an empty, mutable parameter list.
         */
        public Message() {
            // An empty list (instead of null) keeps write() safe for messages built field by field.
            this.parameters = new ArrayList<>();
        }

        /**
         * @param code message code
         * @param parameters message parameters; wrapped as a fixed-size list
         */
        public Message(int code, String... parameters) {
            this.code = code;
            this.parameters = Arrays.asList(parameters);
        }
    }

    /**
     * Name and type of a single attribute.
     */
    protected static class AttributeMetadata {

        public String name;
        public Type type;

        public AttributeMetadata(String name, Type type) {
            this.name = name;
            this.type = type;
        }
    }

    /**
     * An option received through an OPTION message. The {@code nameMatch} and {@code valueMatch} lists hold the
     * capturing groups of the most recent pattern-based {@code getOptions(...)} call that matched this option;
     * they are {@code null} until such a call matches.
     */
    protected static class Option {

        public String name;
        public String value;
        public List<String> nameMatch;
        public List<String> valueMatch;

        Option(String name, String value) {
            this.name = name;
            this.value = value;
        }
    }

    /**
     * Attribute data types; sent and received as lower-case names.
     */
    protected enum Type {
        BOOLEAN,
        INTEGER,
        STRING
    }

    private final List<String> versionsSupported = new LinkedList<>();
    private final List<AttributeMetadata> inputAttributes = new ArrayList<>();
    private final List<String> outputAttributeAliases = new ArrayList<>();
    private final List<Option> options = new LinkedList<>();
    private String currentRelation;
    private String currentFile;

    /**
     * Reads one field and parses it as a decimal integer.
     *
     * @throws IOException if reading fails or the input ends inside the field
     * @throws NumberFormatException if the field is not a valid integer
     */
    private int readInt() throws IOException {
        return Integer.parseInt(readString());
    }

    /**
     * Reads bytes from standard input up to (and excluding) the next separator byte.
     *
     * @return the field content decoded with the platform default charset
     * @throws EOFException if the input ends before the separator is seen
     * @throws IOException if reading from standard input fails
     */
    private String readString() throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        while (true) {
            int b = System.in.read();
            if (b == SEPARATOR) {
                // NOTE(review): decoding (and System.out in writeString) uses the platform default charset;
                // confirm whether the protocol mandates a fixed encoding.
                return new String(buffer.toByteArray());
            }
            if (b < 0) {
                // End of stream is not a field terminator: report it instead of returning a truncated field.
                throw new EOFException("Unexpected end of input while reading a message field.");
            }
            buffer.write(b);
        }
    }

    /** Writes one field followed by the separator (no flush). */
    private void writeString(String s) {
        System.out.print(s);
        System.out.print(SEPARATOR);
    }

    /** Writes an integer as a decimal string field. */
    private void writeInt(int i) {
        writeString(String.valueOf(i));
    }

    private void flush() {
        System.out.flush();
    }

    /**
     * Reads one complete message from standard input: code, parameter count and then that many parameters.
     */
    private Message read() throws IOException {
        Message m = new Message();
        m.code = readInt();
        int count = readInt();
        m.parameters = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            m.parameters.add(readString());
        }
        return m;
    }

    /**
     * Appends all capturing groups of a successful match to the destination list.
     * Group 0 (the whole match) is skipped.
     */
    private void copyMatches(Matcher source, List<String> destination) {
        // Capturing groups are numbered 1..groupCount() inclusive.
        for (int i = 1; i <= source.groupCount(); i++) {
            destination.add(source.group(i));
        }
    }

    /**
     * Reads messages and dispatches them to the {@code process*} methods until RELATION_END arrives.
     * Codes without a dedicated handler go to {@link #processUnsupportedMessage(Message)}.
     */
    private void processMessages() throws IOException {
        while (true) {
            Message m = read();
            switch (m.code) {
                case VERSION_SUPPORTED:
                    processVersionSupported(m);
                    break;
                case WAITING_FOR_VERSION:
                    processWaitingForVersion(m);
                    break;
                case RELATION_START:
                    processRelationStart(m);
                    break;
                case INPUT_ATTRIBUTE_METADATA:
                    processInputAttributeMetadata(m);
                    break;
                case OUTPUT_ATTRIBUTE_ALIAS:
                    processOutputAttributeAlias(m);
                    break;
                case OPTION:
                    processOption(m);
                    break;
                case INPUT_ATTRIBUTE:
                    processInputAttribute(m);
                    break;
                case WAITING_FOR_OUTPUT_ATTRIBUTES_METADATA:
                    processWaitingForOutputAttributesMetadata(m);
                    break;
                case WAITING_FOR_OUTPUT_ATTRIBUTES:
                    processWaitingForOutputAttributes(m);
                    break;
                case RELATION_END:
                    return;
                default:
                    processUnsupportedMessage(m);
                    break;
            }
        }
    }

    /**
     * @return value of the most recently received input attribute named "path", or {@code null} if none was
     * received yet; n.b. generic streamlet (later in relpipe-tr-streamlet) will not have currentFile
     */
    public String getCurrentFile() {
        return currentFile;
    }

    /**
     * @return first parameter of the most recent RELATION_START message, or {@code null} if none was received yet
     */
    public String getCurrentRelation() {
        return currentRelation;
    }

    /**
     * Writes a message (code, parameter count, parameters) to standard output and flushes it.
     */
    protected void write(Message m) {
        writeInt(m.code);
        writeInt(m.parameters.size());
        for (String p : m.parameters) {
            writeString(p);
        }
        flush();
    }

    /** Remembers a version offered by the other side (first parameter). */
    protected void processVersionSupported(Message m) {
        versionsSupported.add(m.parameters.get(0));
    }

    /**
     * Answers with VERSION_ACCEPTED "1" if version "1" was offered, otherwise with a STREAMLET_ERROR.
     */
    protected void processWaitingForVersion(Message m) {
        if (versionsSupported.contains("1")) {
            write(new Message(VERSION_ACCEPTED, "1"));
        } else {
            write(new Message(STREAMLET_ERROR, "INCOMPATIBLE_VERSION", "Only version 1 is supported by this streamlet."));
        }
    }

    /** Stores the relation name (first parameter). */
    protected void processRelationStart(Message m) {
        currentRelation = m.parameters.get(0);
    }

    /**
     * Registers an input attribute: parameter 0 is its name, parameter 1 its type name (case-insensitive).
     *
     * @throws IllegalArgumentException if the type name does not correspond to any {@link Type}
     */
    protected void processInputAttributeMetadata(Message m) {
        // Locale.ROOT: the default locale must not influence parsing (e.g. Turkish maps "i" to dotted "İ").
        String typeName = m.parameters.get(1).toUpperCase(Locale.ROOT);
        inputAttributes.add(new AttributeMetadata(m.parameters.get(0), Type.valueOf(typeName)));
    }

    /** Stores an alias for the next output attribute (first parameter). */
    protected void processOutputAttributeAlias(Message m) {
        outputAttributeAliases.add(m.parameters.get(0));
    }

    /** Stores an option: parameter 0 is its name, parameter 1 its value. */
    protected void processOption(Message m) {
        options.add(new Option(m.parameters.get(0), m.parameters.get(1)));
    }

    /**
     * Handles an input attribute value: parameter 0 is the index into the registered input attributes and
     * parameter 1 is the value. If the attribute at that index is named "path", the value becomes the current file.
     * Any further parameter (such as a null flag) is not evaluated by this default implementation.
     */
    protected void processInputAttribute(Message m) {
        int index = Integer.parseInt(m.parameters.get(0));
        String value = m.parameters.get(1);
        if ("path".equals(inputAttributes.get(index).name)) {
            currentFile = value;
        }
    }

    /**
     * Sends one OUTPUT_ATTRIBUTE_METADATA message (name, lower-case type name) per output attribute and then
     * WAITING_FOR_INPUT_ATTRIBUTES.
     */
    protected void processWaitingForOutputAttributesMetadata(Message m) {
        for (AttributeMetadata am : getOutputAttributesMetadata()) {
            // Locale.ROOT keeps the emitted type name stable regardless of the default locale.
            write(new Message(OUTPUT_ATTRIBUTE_METADATA, am.name, am.type.name().toLowerCase(Locale.ROOT)));
        }
        write(new Message(WAITING_FOR_INPUT_ATTRIBUTES));
    }

    /**
     * Sends one OUTPUT_ATTRIBUTE message per output value (its string form plus "true"/"false" telling whether
     * the value is {@code null}) and then WAITING_FOR_INPUT_ATTRIBUTES.
     */
    protected void processWaitingForOutputAttributes(Message m) {
        for (Object oa : getOutputAttributes()) {
            write(new Message(OUTPUT_ATTRIBUTE, String.valueOf(oa), oa == null ? "true" : "false"));
        }
        write(new Message(WAITING_FOR_INPUT_ATTRIBUTES));
    }

    /** Reports a message code that has no handler. */
    protected void processUnsupportedMessage(Message m) {
        write(new Message(STREAMLET_ERROR, "UNSUPPORTED_MESSAGE"));
    }

    /**
     * @param index position of the output attribute
     * @param defaultValue name to use when no alias was received for that position
     * @return the alias received for the given position, or {@code defaultValue}
     */
    protected String getAlias(int index, String defaultValue) {
        return index < outputAttributeAliases.size() ? outputAttributeAliases.get(index) : defaultValue;
    }

    /**
     * @param name exact option name (may be {@code null})
     * @return options whose name equals the given name, in the order they were received
     */
    protected List<Option> getOptions(String name) {
        List<Option> result = new ArrayList<>();
        for (Option o : options) {
            if (Objects.equals(o.name, name)) {
                result.add(o);
            }
        }
        return result;
    }

    /**
     * Finds options whose whole name matches the pattern. For each returned option, {@code nameMatch} is replaced
     * by the capturing groups of this match.
     *
     * @param namePattern pattern that must match the entire option name
     * @return matching options, in the order they were received
     */
    protected List<Option> getOptions(Pattern namePattern) {
        List<Option> result = new ArrayList<>();
        for (Option o : options) {
            Matcher nameMatcher = namePattern.matcher(o.name);
            if (nameMatcher.matches()) {
                // Fresh list per call: the Option objects are shared, so appending would pile up
                // groups from earlier calls.
                o.nameMatch = new ArrayList<>();
                copyMatches(nameMatcher, o.nameMatch);
                result.add(o);
            }
        }
        return result;
    }

    /**
     * Finds options whose whole name and whole value match the respective patterns. For each returned option,
     * {@code nameMatch} and {@code valueMatch} are replaced by the capturing groups of these matches.
     *
     * @param namePattern pattern that must match the entire option name
     * @param valuePattern pattern that must match the entire option value
     * @return matching options, in the order they were received
     */
    protected List<Option> getOptions(Pattern namePattern, Pattern valuePattern) {
        // TODO: support multiple modes:
        // a) throw an exception if valuePattern does not match
        // b) return option even if valuePattern does not match (valueMatch will be empty)
        // c) skip options with value not matching (current behavior)
        List<Option> result = new ArrayList<>();
        for (Option o : options) {
            Matcher nameMatcher = namePattern.matcher(o.name);
            Matcher valueMatcher = valuePattern.matcher(o.value);
            if (nameMatcher.matches() && valueMatcher.matches()) {
                // Fresh lists per call, see getOptions(Pattern).
                o.nameMatch = new ArrayList<>();
                o.valueMatch = new ArrayList<>();
                copyMatches(nameMatcher, o.nameMatch);
                copyMatches(valueMatcher, o.valueMatch);
                result.add(o);
            }
        }
        return result;
    }

    /**
     * @return metadata of the attributes this streamlet produces, in output order
     */
    protected abstract List<AttributeMetadata> getOutputAttributesMetadata();

    /**
     * @return current values of the output attributes; a {@code null} element is reported as a null value
     */
    protected abstract List<Object> getOutputAttributes();

    /**
     * Processes messages from standard input until RELATION_END is received.
     *
     * @return 0 on regular end; 1 if any exception occurred (a STREAMLET_ERROR message containing the exception
     * and its stack trace is written first)
     */
    public int run() {
        try {
            processMessages();
            return 0;
        } catch (Exception e) {
            StringWriter stackTraceStringWriter = new StringWriter();
            PrintWriter stackTracePrintWriter = new PrintWriter(stackTraceStringWriter);
            e.printStackTrace(stackTracePrintWriter);
            write(new Message(STREAMLET_ERROR, "xxxx", "Exception in streamlet: " + e.getClass().getName() + ": " + e.getLocalizedMessage() + "\n" + stackTraceStringWriter.toString())); // FIXME: correct error codes
            return 1;
        }
    }
}