streamlet-examples/Streamlet.java
branchv_0
changeset 72 f7b9db6fc32b
parent 69 52f837fbb216
child 75 ecbf6504915c
equal deleted inserted replaced
71:f8fe085c1c9f 72:f7b9db6fc32b
       
     1 
       
     2 /**
       
     3  * Relational pipes
       
     4  * Copyright © 2020 František Kučera (Frantovo.cz, GlobalCode.info)
       
     5  *
       
     6  * This program is free software: you can redistribute it and/or modify
       
     7  * it under the terms of the GNU General Public License as published by
       
     8  * the Free Software Foundation, version 3 of the License.
       
     9  *
       
    10  * This program is distributed in the hope that it will be useful,
       
    11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
       
    12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
       
    13  * GNU General Public License for more details.
       
    14  *
       
    15  * You should have received a copy of the GNU General Public License
       
    16  * along with this program. If not, see <http://www.gnu.org/licenses/>.
       
    17  */
       
    18 import java.io.ByteArrayOutputStream;
       
    19 import java.io.IOException;
       
    20 import java.io.PrintWriter;
       
    21 import java.io.StringWriter;
       
    22 import java.util.ArrayList;
       
    23 import java.util.Arrays;
       
    24 import java.util.LinkedList;
       
    25 import java.util.List;
       
    26 import java.util.Objects;
       
    27 import java.util.regex.Matcher;
       
    28 import java.util.regex.Pattern;
       
    29 
       
    30 /**
       
    31  * Unlike the protocol and the message format, these helper classes are not part of the public API. Thus when writing
       
    32  * custom streamlets, it is better to copy this file and review its changes while upgrading to new upstream version.
       
    33  */
       
    34 public abstract class Streamlet {
       
    35 
       
    36 	// TODO: use generated constants
       
    37 	private static final int VERSION_SUPPORTED = 100;
       
    38 	private static final int WAITING_FOR_VERSION = 101;
       
    39 	private static final int VERSION_ACCEPTED = 102;
       
    40 	private static final int RELATION_START = 103;
       
    41 	private static final int INPUT_ATTRIBUTE_METADATA = 104;
       
    42 	private static final int OUTPUT_ATTRIBUTE_ALIAS = 105;
       
    43 	private static final int OPTION = 106;
       
    44 	private static final int COMPLETION_REQUEST = 107;
       
    45 	private static final int COMPLETION = 108;
       
    46 	private static final int COMPLETION_END = 109;
       
    47 	private static final int WAITING_FOR_OUTPUT_ATTRIBUTES_METADATA = 110;
       
    48 	private static final int OUTPUT_ATTRIBUTE_METADATA = 111;
       
    49 	private static final int WAITING_FOR_INPUT_ATTRIBUTES = 112;
       
    50 	private static final int INPUT_ATTRIBUTE = 113;
       
    51 	private static final int WAITING_FOR_OUTPUT_ATTRIBUTES = 114;
       
    52 	private static final int OUTPUT_ATTRIBUTE = 115;
       
    53 	private static final int EXECUTOR_ERROR = 116;
       
    54 	private static final int STREAMLET_ERROR = 117;
       
    55 	private static final int STREAMLET_WARNING = 118;
       
    56 	private static final int RELATION_END = 120;
       
    57 
       
    58 	protected static class Message {
       
    59 
       
    60 		public int code;
       
    61 		public List<String> parameters;
       
    62 
       
    63 		public Message() {
       
    64 		}
       
    65 
       
    66 		public Message(int code, String... parameters) {
       
    67 			this.code = code;
       
    68 			this.parameters = Arrays.asList(parameters);
       
    69 		}
       
    70 
       
    71 	}
       
    72 
       
    73 	private static final char SEPARATOR = '\0';
       
    74 
       
    75 	private int readInt() throws IOException {
       
    76 		return Integer.valueOf(readString());
       
    77 	}
       
    78 
       
    79 	private String readString() throws IOException {
       
    80 		ByteArrayOutputStream buffer = new ByteArrayOutputStream();
       
    81 		for (int b; (b = System.in.read()) > 0;) {
       
    82 			buffer.write(b);
       
    83 		}
       
    84 		return new String(buffer.toByteArray());
       
    85 	}
       
    86 
       
    87 	private void writeString(String s) {
       
    88 		System.out.print(s);
       
    89 		System.out.print(SEPARATOR);
       
    90 	}
       
    91 
       
    92 	private void writeInt(int i) {
       
    93 		writeString(String.valueOf(i));
       
    94 	}
       
    95 
       
    96 	private void flush() {
       
    97 		System.out.flush();
       
    98 	}
       
    99 
       
   100 	private Message read() throws IOException {
       
   101 		Message m = new Message();
       
   102 		m.code = readInt();
       
   103 		int count = readInt();
       
   104 		m.parameters = new ArrayList<>(count);
       
   105 		for (int i = 0; i < count; i++) {
       
   106 			m.parameters.add(readString());
       
   107 		}
       
   108 		return m;
       
   109 	}
       
   110 
       
   111 	private void copyMatches(Matcher source, List<String> destination) {
       
   112 		for (int i = 1; i < source.groupCount(); i++) {
       
   113 			destination.add(source.group(i));
       
   114 		}
       
   115 	}
       
   116 
       
   117 	private void processMessages() throws IOException {
       
   118 		OUTER:
       
   119 		while (true) {
       
   120 			Message m = read();
       
   121 			switch (m.code) {
       
   122 				case VERSION_SUPPORTED:
       
   123 					processVersionSupported(m);
       
   124 					break;
       
   125 				case WAITING_FOR_VERSION:
       
   126 					processWaitingForVersion(m);
       
   127 					break;
       
   128 				case RELATION_START:
       
   129 					processRelationStart(m);
       
   130 					break;
       
   131 				case INPUT_ATTRIBUTE_METADATA:
       
   132 					processInputAttributeMetadata(m);
       
   133 					break;
       
   134 				case OUTPUT_ATTRIBUTE_ALIAS:
       
   135 					processOutputAttributeAlias(m);
       
   136 					break;
       
   137 				case OPTION:
       
   138 					processOption(m);
       
   139 					break;
       
   140 				case INPUT_ATTRIBUTE:
       
   141 					processInputAttribute(m);
       
   142 					break;
       
   143 				case WAITING_FOR_OUTPUT_ATTRIBUTES_METADATA:
       
   144 					processWaitingForOutputAttributesMetadata(m);
       
   145 					break;
       
   146 				case WAITING_FOR_OUTPUT_ATTRIBUTES:
       
   147 					processWaitingForOutputAttributes(m);
       
   148 					break;
       
   149 				case RELATION_END:
       
   150 					break OUTER;
       
   151 				default:
       
   152 					processUnsupportedMessage(m);
       
   153 					break;
       
   154 			}
       
   155 		}
       
   156 	}
       
   157 
       
   158 	protected static class AttributeMetadata {
       
   159 
       
   160 		public String name;
       
   161 		public Type type;
       
   162 
       
   163 		public AttributeMetadata(String name, Type type) {
       
   164 			this.name = name;
       
   165 			this.type = type;
       
   166 		}
       
   167 	};
       
   168 
       
   169 	protected static class Option {
       
   170 
       
   171 		public String name;
       
   172 		public String value;
       
   173 		public List<String> nameMatch;
       
   174 		public List<String> valueMatch;
       
   175 
       
   176 		Option(String name, String value) {
       
   177 			this.name = name;
       
   178 			this.value = value;
       
   179 		}
       
   180 	}
       
   181 
       
   182 	protected List<String> versionsSupported = new LinkedList<>();
       
   183 	protected List<AttributeMetadata> inputAttributes = new ArrayList<>();
       
   184 	protected List<String> outputAttributeAliases = new ArrayList<>();
       
   185 	protected List<Option> options = new LinkedList<>();
       
   186 	protected String currentRelation;
       
   187 	protected String currentFile;
       
   188 
       
   189 	protected static enum Type {
       
   190 		BOOLEAN,
       
   191 		INTEGER,
       
   192 		STRING
       
   193 	}
       
   194 
       
   195 	protected void write(Message m) {
       
   196 		writeInt(m.code);
       
   197 		writeInt(m.parameters.size());
       
   198 		for (String p : m.parameters) {
       
   199 			writeString(p);
       
   200 		}
       
   201 		flush();
       
   202 	}
       
   203 
       
   204 	protected void processVersionSupported(Message m) {
       
   205 		versionsSupported.add(m.parameters.get(0));
       
   206 	}
       
   207 
       
   208 	protected void processWaitingForVersion(Message m) {
       
   209 		for (String v : versionsSupported) {
       
   210 			if ("1".equals(v)) {
       
   211 				write(new Message(VERSION_ACCEPTED, "1"));
       
   212 				return;
       
   213 			}
       
   214 		}
       
   215 		write(new Message(STREAMLET_ERROR, "INCOMPATIBLE_VERSION", "Only version 1 is supported by this streamlet."));
       
   216 	}
       
   217 
       
   218 	protected void processRelationStart(Message m) {
       
   219 		currentRelation = m.parameters.get(0);
       
   220 	}
       
   221 
       
   222 	protected void processInputAttributeMetadata(Message m) {
       
   223 		inputAttributes.add(new AttributeMetadata(m.parameters.get(0), Type.valueOf(m.parameters.get(1).toUpperCase())));
       
   224 	}
       
   225 
       
   226 	protected void processOutputAttributeAlias(Message m) {
       
   227 		outputAttributeAliases.add(m.parameters.get(0));
       
   228 	}
       
   229 
       
   230 	protected void processOption(Message m) {
       
   231 		options.add(new Option(m.parameters.get(0), m.parameters.get(1)));
       
   232 	}
       
   233 
       
   234 	protected void processInputAttribute(Message m) {
       
   235 		int index = Integer.valueOf(m.parameters.get(0));
       
   236 		String value = m.parameters.get(1);
       
   237 		boolean isNull = "true".equals(m.parameters.get(2));
       
   238 		if ("path".equals(inputAttributes.get(index).name)) {
       
   239 			currentFile = value;
       
   240 		}
       
   241 	}
       
   242 
       
   243 	protected void processWaitingForOutputAttributesMetadata(Message m) {
       
   244 		for (AttributeMetadata am : getOutputAttributesMetadata()) {
       
   245 			write(new Message(OUTPUT_ATTRIBUTE_METADATA, am.name, am.type.name().toLowerCase()));
       
   246 		}
       
   247 		write(new Message(WAITING_FOR_INPUT_ATTRIBUTES));
       
   248 	}
       
   249 
       
   250 	protected void processWaitingForOutputAttributes(Message m) {
       
   251 		for (Object oa : getOutputAttributes()) {
       
   252 			write(new Message(OUTPUT_ATTRIBUTE, String.valueOf(oa), oa == null ? "true" : "false"));
       
   253 		}
       
   254 		write(new Message(WAITING_FOR_INPUT_ATTRIBUTES));
       
   255 	}
       
   256 
       
   257 	protected void processUnsupportedMessage(Message m) {
       
   258 		write(new Message(STREAMLET_ERROR, "UNSUPPORTED_MESSAGE"));
       
   259 	}
       
   260 
       
   261 	protected String getAlias(int index, String defaultValue) {
       
   262 		if (outputAttributeAliases.size() > index) {
       
   263 			return outputAttributeAliases.get(index);
       
   264 		} else {
       
   265 			return defaultValue;
       
   266 		}
       
   267 	}
       
   268 
       
   269 	protected List<Option> getOptions(String name) {
       
   270 		List<Option> result = new ArrayList<>();
       
   271 		for (Option o : options) {
       
   272 			if (Objects.equals(o.name, name)) {
       
   273 				result.add(o);
       
   274 			}
       
   275 		}
       
   276 		return result;
       
   277 	}
       
   278 
       
   279 	protected List<Option> getOptions(Pattern namePattern) {
       
   280 		List<Option> result = new ArrayList<>();
       
   281 		for (Option o : options) {
       
   282 			Matcher nameMatcher = namePattern.matcher(o.name);
       
   283 			if (nameMatcher.matches()) {
       
   284 				o.nameMatch = o.nameMatch == null ? new ArrayList<>() : o.nameMatch;
       
   285 				copyMatches(nameMatcher, o.nameMatch);
       
   286 				result.add(o);
       
   287 			}
       
   288 		}
       
   289 		return result;
       
   290 	}
       
   291 
       
   292 	protected List<Option> getOptions(Pattern namePattern, Pattern valuePattern) {
       
   293 		// TODO: support multiple modes: 
       
   294 		//   a) throw an exception if valuePattern does not match
       
   295 		//   b) return option even if valuePattern does not match (valueMatch will be empty)
       
   296 		//   c) skip options with value not matching (current behavior)
       
   297 		List<Option> result = new ArrayList<>();
       
   298 		for (Option o : options) {
       
   299 			Matcher nameMatcher = namePattern.matcher(o.name);
       
   300 			Matcher valueMatcher = valuePattern.matcher(o.value);
       
   301 			if (nameMatcher.matches() && valueMatcher.matches()) {
       
   302 				o.nameMatch = o.nameMatch == null ? new ArrayList<>() : o.nameMatch;
       
   303 				o.valueMatch = o.valueMatch == null ? new ArrayList<>() : o.valueMatch;
       
   304 				copyMatches(nameMatcher, o.nameMatch);
       
   305 				copyMatches(valueMatcher, o.valueMatch);
       
   306 				result.add(o);
       
   307 			}
       
   308 		}
       
   309 		return result;
       
   310 	}
       
   311 
       
   312 	protected abstract List<AttributeMetadata> getOutputAttributesMetadata();
       
   313 
       
   314 	protected abstract List<Object> getOutputAttributes();
       
   315 
       
   316 	public int run() {
       
   317 		try {
       
   318 			processMessages();
       
   319 			return 0;
       
   320 		} catch (Exception e) {
       
   321 			StringWriter stackTraceStringWriter = new StringWriter();
       
   322 			PrintWriter stackTracePrintWriter = new PrintWriter(stackTraceStringWriter);
       
   323 			e.printStackTrace(stackTracePrintWriter);
       
   324 			write(new Message(STREAMLET_ERROR, "xxxx", "Exception in streamlet: " + e.getClass().getName() + ": " + e.getLocalizedMessage() + "\n" + stackTraceStringWriter.toString())); // FIXME: correct error codes
       
   325 			return 1;
       
   326 		}
       
   327 	}
       
   328 }