--- a/.hgignore Wed Jan 29 12:40:43 2020 +0100
+++ b/.hgignore Wed Jan 29 18:05:13 2020 +0100
@@ -2,6 +2,7 @@
*~
CMakeLists.txt.user
+*.class
syntax: regexp
@@ -12,4 +13,4 @@
^build/
^nbproject/private/
-^streamlet-examples/(xpath|pid)$
\ No newline at end of file
+^streamlet-examples/(xpath|pid|jar_info|zip_info)$
\ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/streamlet-examples/JarInfo.java Wed Jan 29 18:05:13 2020 +0100
@@ -0,0 +1,51 @@
+
+/**
+ * Relational pipes
+ * Copyright © 2020 František Kučera (Frantovo.cz, GlobalCode.info)
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, version 3 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+import java.io.File;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.jar.JarFile;
+
+public class JarInfo extends Streamlet {
+
+ public static void main(String[] args) throws IOException {
+ JarInfo s = new JarInfo();
+ int status = s.run();
+ System.exit(status);
+
+ // TODO: return real values:
+ JarFile jar = new JarFile(new File(args[0]));
+ String mainClass = jar.getManifest() == null ? null : jar.getManifest().getMainAttributes().getValue("Main-Class");
+ System.out.println("Name: " + jar.getName());
+ System.out.println("Comment: " + jar.getComment());
+ System.out.println("Entries: " + jar.stream().count());
+ System.out.println("Main class: " + mainClass);
+ }
+
+ protected List<Streamlet.AttributeMetadata> getOutputAttributesMetadata() {
+ List<Streamlet.AttributeMetadata> result = new LinkedList<>();
+ result.add(new Streamlet.AttributeMetadata("main_class", Streamlet.Type.STRING));
+ return result;
+ }
+
+ protected List<Object> getOutputAttributes() {
+ List<Object> result = new LinkedList<>();
+ result.add("TODO: main class");
+ return result;
+ }
+}
--- a/streamlet-examples/Makefile Wed Jan 29 12:40:43 2020 +0100
+++ b/streamlet-examples/Makefile Wed Jan 29 18:05:13 2020 +0100
@@ -1,4 +1,4 @@
-all: xpath pid
+all: xpath pid jar_info zip_info
.PHONY: all clean
@@ -8,6 +8,17 @@
pid: streamlet-common.h pid.cpp
g++ -g -fno-omit-frame-pointer -fsanitize=address pid.cpp -o pid
+jar_info: Streamlet.java JarInfo.java
+ javac JarInfo.java
+ jar cfe jar_info JarInfo Streamlet*.class JarInfo.class
+ chmod +x jar_info
+
+zip_info: jar_info
+ ln -s jar_info zip_info
+
clean:
rm -f xpath
rm -f pid
+ rm -f jar_info
+ rm -f zip_info
+ rm -f *.class
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/streamlet-examples/Streamlet.java Wed Jan 29 18:05:13 2020 +0100
@@ -0,0 +1,328 @@
+
+/**
+ * Relational pipes
+ * Copyright © 2020 František Kučera (Frantovo.cz, GlobalCode.info)
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, version 3 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Unlike the protocol and the message format, these helper classes are not part of the public API. Thus when writing
+ * custom streamlets, it is better to copy this file and review its changes while upgrading to new upstream version.
+ */
+public abstract class Streamlet {
+
+ // TODO: use generated constants
+ private static final int VERSION_SUPPORTED = 100;
+ private static final int WAITING_FOR_VERSION = 101;
+ private static final int VERSION_ACCEPTED = 102;
+ private static final int RELATION_START = 103;
+ private static final int INPUT_ATTRIBUTE_METADATA = 104;
+ private static final int OUTPUT_ATTRIBUTE_ALIAS = 105;
+ private static final int OPTION = 106;
+ private static final int COMPLETION_REQUEST = 107;
+ private static final int COMPLETION = 108;
+ private static final int COMPLETION_END = 109;
+ private static final int WAITING_FOR_OUTPUT_ATTRIBUTES_METADATA = 110;
+ private static final int OUTPUT_ATTRIBUTE_METADATA = 111;
+ private static final int WAITING_FOR_INPUT_ATTRIBUTES = 112;
+ private static final int INPUT_ATTRIBUTE = 113;
+ private static final int WAITING_FOR_OUTPUT_ATTRIBUTES = 114;
+ private static final int OUTPUT_ATTRIBUTE = 115;
+ private static final int EXECUTOR_ERROR = 116;
+ private static final int STREAMLET_ERROR = 117;
+ private static final int STREAMLET_WARNING = 118;
+ private static final int RELATION_END = 120;
+
+ protected static class Message {
+
+ public int code;
+ public List<String> parameters;
+
+ public Message() {
+ }
+
+ public Message(int code, String... parameters) {
+ this.code = code;
+ this.parameters = Arrays.asList(parameters);
+ }
+
+ }
+
+ private static final char SEPARATOR = '\0';
+
+ private int readInt() throws IOException {
+ return Integer.valueOf(readString());
+ }
+
+ private String readString() throws IOException {
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+ for (int b; (b = System.in.read()) > 0;) {
+ buffer.write(b);
+ }
+ return new String(buffer.toByteArray());
+ }
+
+ private void writeString(String s) {
+ System.out.print(s);
+ System.out.print(SEPARATOR);
+ }
+
+ private void writeInt(int i) {
+ writeString(String.valueOf(i));
+ }
+
+ private void flush() {
+ System.out.flush();
+ }
+
+ private Message read() throws IOException {
+ Message m = new Message();
+ m.code = readInt();
+ int count = readInt();
+ m.parameters = new ArrayList<>(count);
+ for (int i = 0; i < count; i++) {
+ m.parameters.add(readString());
+ }
+ return m;
+ }
+
+ private void copyMatches(Matcher source, List<String> destination) {
+ for (int i = 1; i < source.groupCount(); i++) {
+ destination.add(source.group(i));
+ }
+ }
+
+ private void processMessages() throws IOException {
+ OUTER:
+ while (true) {
+ Message m = read();
+ switch (m.code) {
+ case VERSION_SUPPORTED:
+ processVersionSupported(m);
+ break;
+ case WAITING_FOR_VERSION:
+ processWaitingForVersion(m);
+ break;
+ case RELATION_START:
+ processRelationStart(m);
+ break;
+ case INPUT_ATTRIBUTE_METADATA:
+ processInputAttributeMetadata(m);
+ break;
+ case OUTPUT_ATTRIBUTE_ALIAS:
+ processOutputAttributeAlias(m);
+ break;
+ case OPTION:
+ processOption(m);
+ break;
+ case INPUT_ATTRIBUTE:
+ processInputAttribute(m);
+ break;
+ case WAITING_FOR_OUTPUT_ATTRIBUTES_METADATA:
+ processWaitingForOutputAttributesMetadata(m);
+ break;
+ case WAITING_FOR_OUTPUT_ATTRIBUTES:
+ processWaitingForOutputAttributes(m);
+ break;
+ case RELATION_END:
+ break OUTER;
+ default:
+ processUnsupportedMessage(m);
+ break;
+ }
+ }
+ }
+
+ protected static class AttributeMetadata {
+
+ public String name;
+ public Type type;
+
+ public AttributeMetadata(String name, Type type) {
+ this.name = name;
+ this.type = type;
+ }
+ };
+
+ protected static class Option {
+
+ public String name;
+ public String value;
+ public List<String> nameMatch;
+ public List<String> valueMatch;
+
+ Option(String name, String value) {
+ this.name = name;
+ this.value = value;
+ }
+ }
+
+ protected List<String> versionsSupported = new LinkedList<>();
+ protected List<AttributeMetadata> inputAttributes = new ArrayList<>();
+ protected List<String> outputAttributeAliases = new ArrayList<>();
+ protected List<Option> options = new LinkedList<>();
+ protected String currentRelation;
+ protected String currentFile;
+
+ protected static enum Type {
+ BOOLEAN,
+ INTEGER,
+ STRING
+ }
+
+ protected void write(Message m) {
+ writeInt(m.code);
+ writeInt(m.parameters.size());
+ for (String p : m.parameters) {
+ writeString(p);
+ }
+ flush();
+ }
+
+ protected void processVersionSupported(Message m) {
+ versionsSupported.add(m.parameters.get(0));
+ }
+
+ protected void processWaitingForVersion(Message m) {
+ for (String v : versionsSupported) {
+ if ("1".equals(v)) {
+ write(new Message(VERSION_ACCEPTED, "1"));
+ return;
+ }
+ }
+ write(new Message(STREAMLET_ERROR, "INCOMPATIBLE_VERSION", "Only version 1 is supported by this streamlet."));
+ }
+
+ protected void processRelationStart(Message m) {
+ currentRelation = m.parameters.get(0);
+ }
+
+ protected void processInputAttributeMetadata(Message m) {
+ inputAttributes.add(new AttributeMetadata(m.parameters.get(0), Type.valueOf(m.parameters.get(1).toUpperCase())));
+ }
+
+ protected void processOutputAttributeAlias(Message m) {
+ outputAttributeAliases.add(m.parameters.get(0));
+ }
+
+ protected void processOption(Message m) {
+ options.add(new Option(m.parameters.get(0), m.parameters.get(1)));
+ }
+
+ protected void processInputAttribute(Message m) {
+ int index = Integer.valueOf(m.parameters.get(0));
+ String value = m.parameters.get(1);
+ boolean isNull = "true".equals(m.parameters.get(2));
+ if ("path".equals(inputAttributes.get(index).name)) {
+ currentFile = value;
+ }
+ }
+
+ protected void processWaitingForOutputAttributesMetadata(Message m) {
+ for (AttributeMetadata am : getOutputAttributesMetadata()) {
+ write(new Message(OUTPUT_ATTRIBUTE_METADATA, am.name, am.type.name().toLowerCase()));
+ }
+ write(new Message(WAITING_FOR_INPUT_ATTRIBUTES));
+ }
+
+ protected void processWaitingForOutputAttributes(Message m) {
+ for (Object oa : getOutputAttributes()) {
+ write(new Message(OUTPUT_ATTRIBUTE, String.valueOf(oa), oa == null ? "true" : "false"));
+ }
+ write(new Message(WAITING_FOR_INPUT_ATTRIBUTES));
+ }
+
+ protected void processUnsupportedMessage(Message m) {
+ write(new Message(STREAMLET_ERROR, "UNSUPPORTED_MESSAGE"));
+ }
+
+ protected String getAlias(int index, String defaultValue) {
+ if (outputAttributeAliases.size() > index) {
+ return outputAttributeAliases.get(index);
+ } else {
+ return defaultValue;
+ }
+ }
+
+ protected List<Option> getOptions(String name) {
+ List<Option> result = new ArrayList<>();
+ for (Option o : options) {
+ if (Objects.equals(o.name, name)) {
+ result.add(o);
+ }
+ }
+ return result;
+ }
+
+ protected List<Option> getOptions(Pattern namePattern) {
+ List<Option> result = new ArrayList<>();
+ for (Option o : options) {
+ Matcher nameMatcher = namePattern.matcher(o.name);
+ if (nameMatcher.matches()) {
+ o.nameMatch = o.nameMatch == null ? new ArrayList<>() : o.nameMatch;
+ copyMatches(nameMatcher, o.nameMatch);
+ result.add(o);
+ }
+ }
+ return result;
+ }
+
+ protected List<Option> getOptions(Pattern namePattern, Pattern valuePattern) {
+ // TODO: support multiple modes:
+ // a) throw an exception if valuePattern does not match
+ // b) return option even if valuePattern does not match (valueMatch will be empty)
+ // c) skip options with value not matching (current behavior)
+ List<Option> result = new ArrayList<>();
+ for (Option o : options) {
+ Matcher nameMatcher = namePattern.matcher(o.name);
+ Matcher valueMatcher = valuePattern.matcher(o.value);
+ if (nameMatcher.matches() && valueMatcher.matches()) {
+ o.nameMatch = o.nameMatch == null ? new ArrayList<>() : o.nameMatch;
+ o.valueMatch = o.valueMatch == null ? new ArrayList<>() : o.valueMatch;
+ copyMatches(nameMatcher, o.nameMatch);
+ copyMatches(valueMatcher, o.valueMatch);
+ result.add(o);
+ }
+ }
+ return result;
+ }
+
+ protected abstract List<AttributeMetadata> getOutputAttributesMetadata();
+
+ protected abstract List<Object> getOutputAttributes();
+
+ public int run() {
+ try {
+ processMessages();
+ return 0;
+ } catch (Exception e) {
+ StringWriter stackTraceStringWriter = new StringWriter();
+ PrintWriter stackTracePrintWriter = new PrintWriter(stackTraceStringWriter);
+ e.printStackTrace(stackTracePrintWriter);
+ write(new Message(STREAMLET_ERROR, "xxxx", "Exception in streamlet: " + e.getClass().getName() + ": " + e.getLocalizedMessage() + "\n" + stackTraceStringWriter.toString())); // FIXME: correct error codes
+ return 1;
+ }
+ }
+}