streamlet examples: dirty implementation of Java helper classes + demo code v_0
authorFrantišek Kučera <franta-hg@frantovo.cz>
Wed, 29 Jan 2020 18:05:13 +0100
branchv_0
changeset 72 f7b9db6fc32b
parent 71 f8fe085c1c9f
child 73 1a067a217454
streamlet examples: dirty implementation of Java helper classes + demo code
.hgignore
streamlet-examples/JarInfo.java
streamlet-examples/Makefile
streamlet-examples/Streamlet.java
--- a/.hgignore	Wed Jan 29 12:40:43 2020 +0100
+++ b/.hgignore	Wed Jan 29 18:05:13 2020 +0100
@@ -2,6 +2,7 @@
 
 *~
 CMakeLists.txt.user
+*.class
 
 syntax: regexp
 
@@ -12,4 +13,4 @@
 ^build/
 ^nbproject/private/
 
-^streamlet-examples/(xpath|pid)$
\ No newline at end of file
+^streamlet-examples/(xpath|pid|jar_info|zip_info)$
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/streamlet-examples/JarInfo.java	Wed Jan 29 18:05:13 2020 +0100
@@ -0,0 +1,51 @@
+
+/**
+ * Relational pipes
+ * Copyright © 2020 František Kučera (Frantovo.cz, GlobalCode.info)
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, version 3 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+import java.io.File;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.jar.JarFile;
+
+public class JarInfo extends Streamlet {
+
+	public static void main(String[] args) throws IOException {
+		JarInfo s = new JarInfo();
+		int status = s.run();
+		System.exit(status);
+
+		// TODO: return real values:
+		JarFile jar = new JarFile(new File(args[0]));
+		String mainClass = jar.getManifest() == null ? null : jar.getManifest().getMainAttributes().getValue("Main-Class");
+		System.out.println("Name:       " + jar.getName());
+		System.out.println("Comment:    " + jar.getComment());
+		System.out.println("Entries:    " + jar.stream().count());
+		System.out.println("Main class: " + mainClass);
+	}
+
+	protected List<Streamlet.AttributeMetadata> getOutputAttributesMetadata() {
+		List<Streamlet.AttributeMetadata> result = new LinkedList<>();
+		result.add(new Streamlet.AttributeMetadata("main_class", Streamlet.Type.STRING));
+		return result;
+	}
+
+	protected List<Object> getOutputAttributes() {
+		List<Object> result = new LinkedList<>();
+		result.add("TODO: main class");
+		return result;
+	}
+}
--- a/streamlet-examples/Makefile	Wed Jan 29 12:40:43 2020 +0100
+++ b/streamlet-examples/Makefile	Wed Jan 29 18:05:13 2020 +0100
@@ -1,4 +1,4 @@
-all: xpath pid
+all: xpath pid jar_info zip_info
 	
 .PHONY: all clean
 
@@ -8,6 +8,17 @@
 pid: streamlet-common.h pid.cpp
 	g++ -g -fno-omit-frame-pointer -fsanitize=address pid.cpp -o pid
 
+jar_info: Streamlet.java JarInfo.java
+	javac JarInfo.java
+	jar cfe jar_info JarInfo Streamlet*.class JarInfo.class
+	chmod +x jar_info
+
+zip_info: jar_info
+	ln -s jar_info zip_info
+
 clean:
 	rm -f xpath
 	rm -f pid
+	rm -f jar_info
+	rm -f zip_info
+	rm -f *.class
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/streamlet-examples/Streamlet.java	Wed Jan 29 18:05:13 2020 +0100
@@ -0,0 +1,328 @@
+
+/**
+ * Relational pipes
+ * Copyright © 2020 František Kučera (Frantovo.cz, GlobalCode.info)
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, version 3 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Unlike the protocol and the message format, these helper classes are not part of the public API. Thus when writing
+ * custom streamlets, it is better to copy this file and review its changes while upgrading to new upstream version.
+ */
+public abstract class Streamlet {
+
+	// TODO: use generated constants
+	private static final int VERSION_SUPPORTED = 100;
+	private static final int WAITING_FOR_VERSION = 101;
+	private static final int VERSION_ACCEPTED = 102;
+	private static final int RELATION_START = 103;
+	private static final int INPUT_ATTRIBUTE_METADATA = 104;
+	private static final int OUTPUT_ATTRIBUTE_ALIAS = 105;
+	private static final int OPTION = 106;
+	private static final int COMPLETION_REQUEST = 107;
+	private static final int COMPLETION = 108;
+	private static final int COMPLETION_END = 109;
+	private static final int WAITING_FOR_OUTPUT_ATTRIBUTES_METADATA = 110;
+	private static final int OUTPUT_ATTRIBUTE_METADATA = 111;
+	private static final int WAITING_FOR_INPUT_ATTRIBUTES = 112;
+	private static final int INPUT_ATTRIBUTE = 113;
+	private static final int WAITING_FOR_OUTPUT_ATTRIBUTES = 114;
+	private static final int OUTPUT_ATTRIBUTE = 115;
+	private static final int EXECUTOR_ERROR = 116;
+	private static final int STREAMLET_ERROR = 117;
+	private static final int STREAMLET_WARNING = 118;
+	private static final int RELATION_END = 120;
+
+	protected static class Message {
+
+		public int code;
+		public List<String> parameters;
+
+		public Message() {
+		}
+
+		public Message(int code, String... parameters) {
+			this.code = code;
+			this.parameters = Arrays.asList(parameters);
+		}
+
+	}
+
+	private static final char SEPARATOR = '\0';
+
+	private int readInt() throws IOException {
+		return Integer.valueOf(readString());
+	}
+
+	private String readString() throws IOException {
+		ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+		for (int b; (b = System.in.read()) > 0;) {
+			buffer.write(b);
+		}
+		return new String(buffer.toByteArray());
+	}
+
+	private void writeString(String s) {
+		System.out.print(s);
+		System.out.print(SEPARATOR);
+	}
+
+	private void writeInt(int i) {
+		writeString(String.valueOf(i));
+	}
+
+	private void flush() {
+		System.out.flush();
+	}
+
+	private Message read() throws IOException {
+		Message m = new Message();
+		m.code = readInt();
+		int count = readInt();
+		m.parameters = new ArrayList<>(count);
+		for (int i = 0; i < count; i++) {
+			m.parameters.add(readString());
+		}
+		return m;
+	}
+
+	private void copyMatches(Matcher source, List<String> destination) {
+		for (int i = 1; i < source.groupCount(); i++) {
+			destination.add(source.group(i));
+		}
+	}
+
+	private void processMessages() throws IOException {
+		OUTER:
+		while (true) {
+			Message m = read();
+			switch (m.code) {
+				case VERSION_SUPPORTED:
+					processVersionSupported(m);
+					break;
+				case WAITING_FOR_VERSION:
+					processWaitingForVersion(m);
+					break;
+				case RELATION_START:
+					processRelationStart(m);
+					break;
+				case INPUT_ATTRIBUTE_METADATA:
+					processInputAttributeMetadata(m);
+					break;
+				case OUTPUT_ATTRIBUTE_ALIAS:
+					processOutputAttributeAlias(m);
+					break;
+				case OPTION:
+					processOption(m);
+					break;
+				case INPUT_ATTRIBUTE:
+					processInputAttribute(m);
+					break;
+				case WAITING_FOR_OUTPUT_ATTRIBUTES_METADATA:
+					processWaitingForOutputAttributesMetadata(m);
+					break;
+				case WAITING_FOR_OUTPUT_ATTRIBUTES:
+					processWaitingForOutputAttributes(m);
+					break;
+				case RELATION_END:
+					break OUTER;
+				default:
+					processUnsupportedMessage(m);
+					break;
+			}
+		}
+	}
+
+	protected static class AttributeMetadata {
+
+		public String name;
+		public Type type;
+
+		public AttributeMetadata(String name, Type type) {
+			this.name = name;
+			this.type = type;
+		}
+	};
+
+	protected static class Option {
+
+		public String name;
+		public String value;
+		public List<String> nameMatch;
+		public List<String> valueMatch;
+
+		Option(String name, String value) {
+			this.name = name;
+			this.value = value;
+		}
+	}
+
+	protected List<String> versionsSupported = new LinkedList<>();
+	protected List<AttributeMetadata> inputAttributes = new ArrayList<>();
+	protected List<String> outputAttributeAliases = new ArrayList<>();
+	protected List<Option> options = new LinkedList<>();
+	protected String currentRelation;
+	protected String currentFile;
+
+	protected static enum Type {
+		BOOLEAN,
+		INTEGER,
+		STRING
+	}
+
+	protected void write(Message m) {
+		writeInt(m.code);
+		writeInt(m.parameters.size());
+		for (String p : m.parameters) {
+			writeString(p);
+		}
+		flush();
+	}
+
+	protected void processVersionSupported(Message m) {
+		versionsSupported.add(m.parameters.get(0));
+	}
+
+	protected void processWaitingForVersion(Message m) {
+		for (String v : versionsSupported) {
+			if ("1".equals(v)) {
+				write(new Message(VERSION_ACCEPTED, "1"));
+				return;
+			}
+		}
+		write(new Message(STREAMLET_ERROR, "INCOMPATIBLE_VERSION", "Only version 1 is supported by this streamlet."));
+	}
+
+	protected void processRelationStart(Message m) {
+		currentRelation = m.parameters.get(0);
+	}
+
+	protected void processInputAttributeMetadata(Message m) {
+		inputAttributes.add(new AttributeMetadata(m.parameters.get(0), Type.valueOf(m.parameters.get(1).toUpperCase())));
+	}
+
+	protected void processOutputAttributeAlias(Message m) {
+		outputAttributeAliases.add(m.parameters.get(0));
+	}
+
+	protected void processOption(Message m) {
+		options.add(new Option(m.parameters.get(0), m.parameters.get(1)));
+	}
+
+	protected void processInputAttribute(Message m) {
+		int index = Integer.valueOf(m.parameters.get(0));
+		String value = m.parameters.get(1);
+		boolean isNull = "true".equals(m.parameters.get(2));
+		if ("path".equals(inputAttributes.get(index).name)) {
+			currentFile = value;
+		}
+	}
+
+	protected void processWaitingForOutputAttributesMetadata(Message m) {
+		for (AttributeMetadata am : getOutputAttributesMetadata()) {
+			write(new Message(OUTPUT_ATTRIBUTE_METADATA, am.name, am.type.name().toLowerCase()));
+		}
+		write(new Message(WAITING_FOR_INPUT_ATTRIBUTES));
+	}
+
+	protected void processWaitingForOutputAttributes(Message m) {
+		for (Object oa : getOutputAttributes()) {
+			write(new Message(OUTPUT_ATTRIBUTE, String.valueOf(oa), oa == null ? "true" : "false"));
+		}
+		write(new Message(WAITING_FOR_INPUT_ATTRIBUTES));
+	}
+
+	protected void processUnsupportedMessage(Message m) {
+		write(new Message(STREAMLET_ERROR, "UNSUPPORTED_MESSAGE"));
+	}
+
+	protected String getAlias(int index, String defaultValue) {
+		if (outputAttributeAliases.size() > index) {
+			return outputAttributeAliases.get(index);
+		} else {
+			return defaultValue;
+		}
+	}
+
+	protected List<Option> getOptions(String name) {
+		List<Option> result = new ArrayList<>();
+		for (Option o : options) {
+			if (Objects.equals(o.name, name)) {
+				result.add(o);
+			}
+		}
+		return result;
+	}
+
+	protected List<Option> getOptions(Pattern namePattern) {
+		List<Option> result = new ArrayList<>();
+		for (Option o : options) {
+			Matcher nameMatcher = namePattern.matcher(o.name);
+			if (nameMatcher.matches()) {
+				o.nameMatch = o.nameMatch == null ? new ArrayList<>() : o.nameMatch;
+				copyMatches(nameMatcher, o.nameMatch);
+				result.add(o);
+			}
+		}
+		return result;
+	}
+
+	protected List<Option> getOptions(Pattern namePattern, Pattern valuePattern) {
+		// TODO: support multiple modes: 
+		//   a) throw an exception if valuePattern does not match
+		//   b) return option even if valuePattern does not match (valueMatch will be empty)
+		//   c) skip options with value not matching (current behavior)
+		List<Option> result = new ArrayList<>();
+		for (Option o : options) {
+			Matcher nameMatcher = namePattern.matcher(o.name);
+			Matcher valueMatcher = valuePattern.matcher(o.value);
+			if (nameMatcher.matches() && valueMatcher.matches()) {
+				o.nameMatch = o.nameMatch == null ? new ArrayList<>() : o.nameMatch;
+				o.valueMatch = o.valueMatch == null ? new ArrayList<>() : o.valueMatch;
+				copyMatches(nameMatcher, o.nameMatch);
+				copyMatches(valueMatcher, o.valueMatch);
+				result.add(o);
+			}
+		}
+		return result;
+	}
+
+	protected abstract List<AttributeMetadata> getOutputAttributesMetadata();
+
+	protected abstract List<Object> getOutputAttributes();
+
+	public int run() {
+		try {
+			processMessages();
+			return 0;
+		} catch (Exception e) {
+			StringWriter stackTraceStringWriter = new StringWriter();
+			PrintWriter stackTracePrintWriter = new PrintWriter(stackTraceStringWriter);
+			e.printStackTrace(stackTracePrintWriter);
+			write(new Message(STREAMLET_ERROR, "xxxx", "Exception in streamlet: " + e.getClass().getName() + ": " + e.getLocalizedMessage() + "\n" + stackTraceStringWriter.toString())); // FIXME: correct error codes
+			return 1;
+		}
+	}
+}