streamlet-examples/streamlet-common.sh
branchv_0
changeset 33 f9cada1d46a4
child 38 4191af89968a
equal deleted inserted replaced
32:bccda5688d71 33:f9cada1d46a4
       
     1 #!/bin/bash
       
     2 
       
     3 # Relational pipes
       
     4 # Copyright © 2020 František Kučera (Frantovo.cz, GlobalCode.info)
       
     5 #
       
     6 # This program is free software: you can redistribute it and/or modify
       
     7 # it under the terms of the GNU General Public License as published by
       
     8 # the Free Software Foundation, version 3 of the License.
       
     9 #
       
    10 # This program is distributed in the hope that it will be useful,
       
    11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
       
    12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
       
    13 # GNU General Public License for more details.
       
    14 #
       
    15 # You should have received a copy of the GNU General Public License
       
    16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
       
    17 
       
    18 
       
    19 # Unlike the protocol and the message format,
       
    20 # these helper functions are not part of the public API.
       
    21 # Thus when writing custom streamlets, it is better to copy this file
       
     22 # and review its changes while upgrading to a new upstream version.
       
    23 
       
    24 
       
    25 send() {
       
    26 	local msgVar="EXEC_MSG_$1";
       
    27 	printf '%s\0' "${!msgVar}"
       
    28 	shift;
       
    29 	printf '%s\0' "$#"
       
    30 	for param in "$@"; do printf '%s\0' "$param"; done 
       
    31 }
       
    32 
       
    33 read_nullbyte() { local IFS=; for v in "$@"; do export "$v"; read -r -d '' "$v"; done }
       
    34 
       
    35 processMessages() {
       
    36 	while read_nullbyte code length; do
       
    37 		local msgVar="EXEC_MSG_$code";
       
    38 		local functionName="processMessage_${!msgVar}";
       
    39 		local parameters=();
       
    40 		for (( i=0; i<$length; i++)); do
       
    41 			read_nullbyte parameter;
       
    42 			parameters+=("$parameter")
       
    43 		done
       
    44 		type "$functionName" &>/dev/null || functionName="processUnsupportedMessage";
       
    45 		"$functionName" "${parameters[@]}";
       
    46 	done
       
    47 }
       
    48 
       
    49 processUnsupportedMessage() {
       
    50 	echo "Unsupported message: $*" >&2
       
    51 	# TODO: send error, no debug
       
    52 	# send STREAMLET_ERROR "UNSUPPORTED_MESSAGE"
       
    53 }
       
    54 
       
    55 initialize() {
       
    56 	versionsSupported=();
       
    57 	optionNames=();
       
    58 	optionValues=();
       
    59 	inputAttributeNames=();
       
    60 	inputAttributeTypes=();
       
    61 	outputAttributeAliases=();
       
    62 	currentInputAttributeIndex=0;
       
    63 	currentOutputAttributeIndex=0;
       
    64 	currentRelation="";
       
    65 	currentFile="";
       
    66 }
       
    67 
       
    68 processMessage_VERSION_SUPPORTED() {
       
    69 	versionsSupported+=("$1");
       
    70 }
       
    71 
       
    72 processMessage_WAITING_FOR_VERSION() {
       
    73 	for v in "${versionsSupported[@]}"; do
       
    74 		if [[ "x$v" == "x1" ]]; then
       
    75 			send VERSION_ACCEPTED 1;
       
    76 			return;
       
    77 		fi
       
    78 	done
       
    79 	send STREAMLET_ERROR "INCOMPATIBLE_VERSION" "Only version 1 is supported by this streamlet."
       
    80 }
       
    81 
       
    82 processMessage_RELATION_START() {
       
    83 	currentRelation="$1";
       
    84 	return;
       
    85 }
       
    86 
       
    87 processMessage_INPUT_ATTRIBUTE_METADATA() {
       
    88 	inputAttributeNames+=("$1");
       
    89 	inputAttributeTypes+=("$2");
       
    90 }
       
    91 
       
    92 processMessage_OUTPUT_ATTRIBUTE_ALIAS() {
       
    93 	outputAttributeAliases+=("$1");
       
    94 }
       
    95 
       
    96 processMessage_OPTION() {
       
    97 	optionNames+=("$1");
       
    98 	optionValues+=("$2");
       
    99 }
       
   100 
       
   101 processMessage_INPUT_ATTRIBUTE() {
       
   102 	local index="$1";
       
   103 	local value="$2";
       
   104 	local isNull="$3";
       
   105 	if [[ "_${inputAttributeNames[$index]}" == "_path" ]]; then
       
   106 		currentFile="$value";
       
   107 	fi
       
   108 }
       
   109 
       
   110 processMessage_RELATION_END() {
       
   111 	# Now we should stop reading messages,
       
   112 	# but the executor also closes our STDIN,
       
   113 	# so the loop in processMessages() terminates.
       
   114 	# Override this function to do some clean-up.
       
   115 	return;
       
   116 }