streamlet-examples/Streamlet.java
author František Kučera <franta-hg@frantovo.cz>
Thu, 30 Jan 2020 23:27:49 +0100
branch v_0
changeset 78 5a63bf594f53
parent 75 ecbf6504915c
permissions -rw-r--r--
streamlet examples: xpath: support XInclude (like in relpipe-tr-xmltable)


/**
 * Relational pipes
 * Copyright © 2020 František Kučera (Frantovo.cz, GlobalCode.info)
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, version 3 of the License.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Unlike the protocol and the message format, these helper classes are not part of the public API. Thus when writing
 * custom streamlets, it is better to copy this file and review its changes while upgrading to new upstream version.
 */
public abstract class Streamlet {

	// TODO: use generated constants
	private static final int VERSION_SUPPORTED = 100;
	private static final int WAITING_FOR_VERSION = 101;
	private static final int VERSION_ACCEPTED = 102;
	private static final int RELATION_START = 103;
	private static final int INPUT_ATTRIBUTE_METADATA = 104;
	private static final int OUTPUT_ATTRIBUTE_ALIAS = 105;
	private static final int OPTION = 106;
	private static final int COMPLETION_REQUEST = 107;
	private static final int COMPLETION = 108;
	private static final int COMPLETION_END = 109;
	private static final int WAITING_FOR_OUTPUT_ATTRIBUTES_METADATA = 110;
	private static final int OUTPUT_ATTRIBUTE_METADATA = 111;
	private static final int WAITING_FOR_INPUT_ATTRIBUTES = 112;
	private static final int INPUT_ATTRIBUTE = 113;
	private static final int WAITING_FOR_OUTPUT_ATTRIBUTES = 114;
	private static final int OUTPUT_ATTRIBUTE = 115;
	private static final int EXECUTOR_ERROR = 116;
	private static final int STREAMLET_ERROR = 117;
	private static final int STREAMLET_WARNING = 118;
	private static final int RELATION_END = 120;

	/**
	 * Single protocol message: a numeric code followed by a list of string parameters.
	 */
	protected static class Message {

		public int code;
		public List<String> parameters;

		/**
		 * Creates an empty message; both fields have to be filled in by the caller.
		 */
		public Message() {
		}

		/**
		 * @param code message code
		 * @param parameters message parameters; exposed as a fixed-size list view of the given array
		 */
		public Message(int code, String... parameters) {
			this.code = code;
			this.parameters = Arrays.asList(parameters);
		}

	}

	/**
	 * Every field (message code, parameter count, parameter value) is terminated by this byte.
	 */
	private static final char SEPARATOR = '\0';

	/**
	 * Reads one field from the standard input and parses it as a decimal integer.
	 *
	 * @throws IOException if reading fails or the input ends prematurely
	 * @throws NumberFormatException if the field is not a valid integer
	 */
	private int readInt() throws IOException {
		return Integer.parseInt(readString());
	}

	/**
	 * Reads bytes from the standard input up to the next separator (which is consumed but not returned).
	 *
	 * @return the field decoded as UTF-8 (assumed to be the protocol encoding – the platform default charset is
	 * deliberately not used, because it depends on the environment of the process)
	 * @throws EOFException if the input ends before the separator was found
	 * @throws IOException if reading fails
	 */
	private String readString() throws IOException {
		ByteArrayOutputStream buffer = new ByteArrayOutputStream();
		while (true) {
			int b = System.in.read();
			if (b == SEPARATOR) {
				return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
			} else if (b < 0) {
				// A truncated field must not be silently treated as a complete one.
				throw new EOFException("Unexpected end of input while reading a message field.");
			}
			buffer.write(b);
		}
	}

	/**
	 * Writes one field followed by the separator to the standard output (without flushing).
	 *
	 * @param s value to be written; {@code null} is written as the text "null"
	 */
	private void writeString(String s) {
		// Raw bytes are written in order to bypass the platform-dependent encoding of System.out.
		byte[] bytes = String.valueOf(s).getBytes(StandardCharsets.UTF_8);
		System.out.write(bytes, 0, bytes.length);
		System.out.write(SEPARATOR);
	}

	/**
	 * Writes a decimal integer as one field.
	 */
	private void writeInt(int i) {
		writeString(String.valueOf(i));
	}

	private void flush() {
		System.out.flush();
	}

	/**
	 * Reads one whole message: code, number of parameters and then the parameters themselves.
	 *
	 * @throws IOException if reading fails or the input ends in the middle of the message
	 */
	private Message read() throws IOException {
		Message m = new Message();
		m.code = readInt();
		int count = readInt();
		m.parameters = new ArrayList<>(count);
		for (int i = 0; i < count; i++) {
			m.parameters.add(readString());
		}
		return m;
	}

	/**
	 * Appends values of all capturing groups of an already matched matcher to the destination list.
	 *
	 * @param source matcher on which a match operation has succeeded
	 * @param destination list to which the captured values are added (group 0 – the whole match – is omitted)
	 */
	private void copyMatches(Matcher source, List<String> destination) {
		// Capturing groups are numbered from 1 to groupCount() inclusive.
		for (int i = 1; i <= source.groupCount(); i++) {
			destination.add(source.group(i));
		}
	}

	/**
	 * Reads messages from the standard input and dispatches them to the particular process*() methods until the
	 * RELATION_END message arrives.
	 *
	 * @throws IOException if reading fails or the input ends before RELATION_END
	 */
	private void processMessages() throws IOException {
		OUTER:
		while (true) {
			Message m = read();
			switch (m.code) {
				case VERSION_SUPPORTED:
					processVersionSupported(m);
					break;
				case WAITING_FOR_VERSION:
					processWaitingForVersion(m);
					break;
				case RELATION_START:
					processRelationStart(m);
					break;
				case INPUT_ATTRIBUTE_METADATA:
					processInputAttributeMetadata(m);
					break;
				case OUTPUT_ATTRIBUTE_ALIAS:
					processOutputAttributeAlias(m);
					break;
				case OPTION:
					processOption(m);
					break;
				case INPUT_ATTRIBUTE:
					processInputAttribute(m);
					break;
				case WAITING_FOR_OUTPUT_ATTRIBUTES_METADATA:
					processWaitingForOutputAttributesMetadata(m);
					break;
				case WAITING_FOR_OUTPUT_ATTRIBUTES:
					processWaitingForOutputAttributes(m);
					break;
				case RELATION_END:
					break OUTER;
				default:
					processUnsupportedMessage(m);
					break;
			}
		}
	}

	/**
	 * Name and data type of a single attribute.
	 */
	protected static class AttributeMetadata {

		public String name;
		public Type type;

		public AttributeMetadata(String name, Type type) {
			this.name = name;
			this.type = type;
		}
	}

	/**
	 * Option received in an OPTION message. The nameMatch and valueMatch lists stay {@code null} until the option is
	 * returned from one of the pattern-based getOptions() methods.
	 */
	protected static class Option {

		public String name;
		public String value;
		public List<String> nameMatch;
		public List<String> valueMatch;

		Option(String name, String value) {
			this.name = name;
			this.value = value;
		}
	}

	private final List<String> versionsSupported = new LinkedList<>();
	private final List<AttributeMetadata> inputAttributes = new ArrayList<>();
	private final List<String> outputAttributeAliases = new ArrayList<>();
	private final List<Option> options = new LinkedList<>();
	private String currentRelation;
	private String currentFile;

	/**
	 * @return value of the last input attribute named "path", or {@code null} if there was none so far;
	 * n.b. generic streamlet (later in relpipe-tr-streamlet) will not have currentFile
	 */
	public String getCurrentFile() {
		return currentFile;
	}

	/**
	 * @return first parameter of the last RELATION_START message, or {@code null} if there was none so far
	 */
	public String getCurrentRelation() {
		return currentRelation;
	}

	/**
	 * Attribute data types; they are transferred in messages as lower-case names of these constants.
	 */
	protected enum Type {
		BOOLEAN,
		INTEGER,
		STRING
	}

	/**
	 * Writes the message (code, number of parameters, parameters) to the standard output and flushes it.
	 *
	 * @param m message to be sent; its parameters must not be {@code null}
	 */
	protected void write(Message m) {
		writeInt(m.code);
		writeInt(m.parameters.size());
		for (String p : m.parameters) {
			writeString(p);
		}
		flush();
	}

	/**
	 * Remembers a version offered by the other side.
	 */
	protected void processVersionSupported(Message m) {
		versionsSupported.add(m.parameters.get(0));
	}

	/**
	 * Accepts version "1" if it was offered; otherwise reports the INCOMPATIBLE_VERSION error.
	 */
	protected void processWaitingForVersion(Message m) {
		for (String v : versionsSupported) {
			if ("1".equals(v)) {
				write(new Message(VERSION_ACCEPTED, "1"));
				return;
			}
		}
		write(new Message(STREAMLET_ERROR, "INCOMPATIBLE_VERSION", "Only version 1 is supported by this streamlet."));
	}

	/**
	 * Remembers the name of the current relation.
	 */
	protected void processRelationStart(Message m) {
		currentRelation = m.parameters.get(0);
	}

	/**
	 * Remembers metadata (name, type) of the next input attribute.
	 *
	 * @throws IllegalArgumentException if the type name does not correspond to any {@link Type} constant
	 */
	protected void processInputAttributeMetadata(Message m) {
		// Locale.ROOT: the conversion must not depend on the default locale (e.g. Turkish "i" → "İ").
		Type type = Type.valueOf(m.parameters.get(1).toUpperCase(Locale.ROOT));
		inputAttributes.add(new AttributeMetadata(m.parameters.get(0), type));
	}

	/**
	 * Remembers the alias of the next output attribute.
	 */
	protected void processOutputAttributeAlias(Message m) {
		outputAttributeAliases.add(m.parameters.get(0));
	}

	/**
	 * Remembers an option (name, value).
	 */
	protected void processOption(Message m) {
		options.add(new Option(m.parameters.get(0), m.parameters.get(1)));
	}

	/**
	 * Processes a value of an input attribute: parameter 0 is the index of the attribute, parameter 1 is its value.
	 * The value of the attribute named "path" is remembered as the current file. Further parameters of the message
	 * are not used here.
	 */
	protected void processInputAttribute(Message m) {
		int index = Integer.parseInt(m.parameters.get(0));
		String value = m.parameters.get(1);
		if ("path".equals(inputAttributes.get(index).name)) {
			currentFile = value;
		}
	}

	/**
	 * Sends metadata of all output attributes and then asks for input attributes.
	 */
	protected void processWaitingForOutputAttributesMetadata(Message m) {
		for (AttributeMetadata am : getOutputAttributesMetadata()) {
			// Locale.ROOT: see processInputAttributeMetadata()
			write(new Message(OUTPUT_ATTRIBUTE_METADATA, am.name, am.type.name().toLowerCase(Locale.ROOT)));
		}
		write(new Message(WAITING_FOR_INPUT_ATTRIBUTES));
	}

	/**
	 * Sends values of all output attributes (each with a flag saying whether the value is null) and then asks for
	 * input attributes.
	 */
	protected void processWaitingForOutputAttributes(Message m) {
		for (Object oa : getOutputAttributes()) {
			write(new Message(OUTPUT_ATTRIBUTE, String.valueOf(oa), oa == null ? "true" : "false"));
		}
		write(new Message(WAITING_FOR_INPUT_ATTRIBUTES));
	}

	/**
	 * Reports the UNSUPPORTED_MESSAGE error.
	 */
	protected void processUnsupportedMessage(Message m) {
		write(new Message(STREAMLET_ERROR, "UNSUPPORTED_MESSAGE"));
	}

	/**
	 * @param index position of the output attribute
	 * @param defaultValue value returned if no alias was received for this position
	 * @return alias received for the given position or the default value
	 */
	protected String getAlias(int index, String defaultValue) {
		if (outputAttributeAliases.size() > index) {
			return outputAttributeAliases.get(index);
		} else {
			return defaultValue;
		}
	}

	/**
	 * @param name exact name of the option
	 * @return all options with the given name, in the order in which they were received
	 */
	protected List<Option> getOptions(String name) {
		List<Option> result = new ArrayList<>();
		for (Option o : options) {
			if (Objects.equals(o.name, name)) {
				result.add(o);
			}
		}
		return result;
	}

	/**
	 * Finds options whose whole name matches the pattern. Values of the capturing groups are appended to the
	 * nameMatch list of each returned option; the list is created on first use and repeated calls keep appending to
	 * it.
	 *
	 * @param namePattern pattern for the option name
	 * @return matching options, in the order in which they were received
	 */
	protected List<Option> getOptions(Pattern namePattern) {
		List<Option> result = new ArrayList<>();
		for (Option o : options) {
			Matcher nameMatcher = namePattern.matcher(o.name);
			if (nameMatcher.matches()) {
				o.nameMatch = o.nameMatch == null ? new ArrayList<>() : o.nameMatch;
				copyMatches(nameMatcher, o.nameMatch);
				result.add(o);
			}
		}
		return result;
	}

	/**
	 * Finds options whose whole name and whole value match the given patterns. Values of the capturing groups are
	 * appended to the nameMatch and valueMatch lists of each returned option; the lists are created on first use and
	 * repeated calls keep appending to them.
	 *
	 * @param namePattern pattern for the option name
	 * @param valuePattern pattern for the option value
	 * @return matching options, in the order in which they were received
	 */
	protected List<Option> getOptions(Pattern namePattern, Pattern valuePattern) {
		// TODO: support multiple modes:
		//   a) throw an exception if valuePattern does not match
		//   b) return option even if valuePattern does not match (valueMatch will be empty)
		//   c) skip options with value not matching (current behavior)
		List<Option> result = new ArrayList<>();
		for (Option o : options) {
			Matcher nameMatcher = namePattern.matcher(o.name);
			Matcher valueMatcher = valuePattern.matcher(o.value);
			if (nameMatcher.matches() && valueMatcher.matches()) {
				o.nameMatch = o.nameMatch == null ? new ArrayList<>() : o.nameMatch;
				o.valueMatch = o.valueMatch == null ? new ArrayList<>() : o.valueMatch;
				copyMatches(nameMatcher, o.nameMatch);
				copyMatches(valueMatcher, o.valueMatch);
				result.add(o);
			}
		}
		return result;
	}

	/**
	 * @return metadata of the attributes produced by this streamlet; sent in reply to the
	 * WAITING_FOR_OUTPUT_ATTRIBUTES_METADATA message
	 */
	protected abstract List<AttributeMetadata> getOutputAttributesMetadata();

	/**
	 * @return values of the output attributes; sent in reply to the WAITING_FOR_OUTPUT_ATTRIBUTES message, a
	 * {@code null} element is flagged as a null value
	 */
	protected abstract List<Object> getOutputAttributes();

	/**
	 * Processes messages from the standard input until RELATION_END. Any exception is reported to the other side as
	 * a STREAMLET_ERROR message containing the stack trace.
	 *
	 * @return 0 on success, 1 if an exception occurred
	 */
	public int run() {
		try {
			processMessages();
			return 0;
		} catch (Exception e) {
			StringWriter stackTraceStringWriter = new StringWriter();
			PrintWriter stackTracePrintWriter = new PrintWriter(stackTraceStringWriter);
			e.printStackTrace(stackTracePrintWriter);
			write(new Message(STREAMLET_ERROR, "xxxx", "Exception in streamlet: " + e.getClass().getName() + ": " + e.getLocalizedMessage() + "\n" + stackTraceStringWriter.toString())); // FIXME: correct error codes
			return 1;
		}
	}
}