# streamlet-examples/streamlet-common.sh
# author František Kučera <franta-hg@frantovo.cz>
# Fri, 13 May 2022 21:35:30 +0200
# branch v_0
# changeset 96 c34106244a54
# parent 38 4191af89968a
# permissions -rw-r--r--
# portable order of (i++) parameters

#!/bin/bash

# Relational pipes
# Copyright © 2020 František Kučera (Frantovo.cz, GlobalCode.info)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.


# Unlike the protocol and the message format,
# these helper functions are not part of the public API.
# Thus, when writing custom streamlets, it is better to copy this file
# and review its changes when upgrading to a new upstream version.


# Writes one message to STDOUT in the NUL-delimited format:
#   <code> NUL <parameter count> NUL { <parameter> NUL }
#
# Arguments:
#   $1   - symbolic message name; the code that is actually written is the
#          value of the variable EXEC_MSG_<name> (defined outside this
#          function; empty if that variable is not set)
#   $2.. - message parameters, written verbatim (whitespace and newlines are
#          preserved because NUL is the only delimiter)
# Outputs:
#   the encoded message on STDOUT
# Returns:
#   the status of the last printf executed
send() {
	local msgVar="EXEC_MSG_$1"
	shift

	# Header: message code followed by the number of parameters.
	printf '%s\0' "${!msgVar}" "$#"

	# printf reuses its format for every argument, so all parameters can be
	# written at once. With no arguments it would still emit one empty
	# field, hence the guard. No loop variable is needed, so nothing leaks
	# into the caller's scope.
	if (( $# > 0 )); then
		printf '%s\0' "$@"
	fi
}

# Reads NUL-terminated fields from STDIN, one per given variable name.
#
# Arguments:
#   $@ - names of the variables to fill; each one is exported and then
#        assigned the next field from STDIN
# Returns:
#   the status of the last read (non-zero at end of input);
#   0 when called without arguments
read_nullbyte() {
	# An empty IFS keeps leading/trailing whitespace of each field intact.
	local IFS=
	# The loop variable is local and deliberately long-named: a short local
	# name (e.g. "v") would shadow a caller's variable of the same name and
	# silently swallow the value read into it.
	local _read_nullbyte_name
	for _read_nullbyte_name in "$@"; do
		export "$_read_nullbyte_name"
		read -r -d '' "$_read_nullbyte_name"
	done
}

# Main loop: reads messages from STDIN and dispatches each one to a handler.
#
# Each message is read as: <code> NUL <parameter count> NUL { <parameter> NUL }
# The code is mapped to a message name through the variable EXEC_MSG_<code>
# and the handler processMessage_<name> is called with the parameters as its
# arguments. If no such command exists, processUnsupportedMessage is called
# with the same arguments instead.
#
# Globals:
#   code, length, parameter - written (and exported) by read_nullbyte
# Returns:
#   1 if a parameter count is not a non-negative decimal integer;
#   otherwise the status of the last handler called (0 if there was none)
processMessages() {
	# Keep the loop counter out of the global scope.
	local i
	while read_nullbyte code length; do
		local msgVar="EXEC_MSG_$code"
		local functionName="processMessage_${!msgVar}"
		local parameters=()

		# The count comes from the input stream and ends up in an arithmetic
		# expression, so accept plain decimal digits only. After a malformed
		# count the message boundaries are unknown, so stop reading.
		if [[ ! "$length" =~ ^[0-9]+$ ]]; then
			printf 'Invalid parameter count: %s\n' "$length" >&2
			return 1
		fi

		# 10# forces base 10, so a count with leading zeros is not taken as octal.
		for (( i = 0; i < 10#$length; i++ )); do
			read_nullbyte parameter
			parameters+=("$parameter")
		done

		type "$functionName" &>/dev/null || functionName="processUnsupportedMessage"
		"$functionName" "${parameters[@]}"
	done
}

# Fallback handler used by processMessages() when no processMessage_<name>
# command exists for a received message.
#
# Arguments:
#   $@ - parameters of the unsupported message
# Outputs:
#   a one-line diagnostic on STDERR; nothing on STDOUT
processUnsupportedMessage() {
	printf 'Unsupported message: %s\n' "$*" >&2
	# TODO: send error, no debug
	# send STREAMLET_ERROR "UNSUPPORTED_MESSAGE"
}

# Resets the state that the processMessage_* handlers accumulate.
#
# Globals (all written):
#   versionsSupported, optionNames, optionValues, inputAttributeNames,
#   inputAttributeTypes, outputAttributeAliases - set to empty arrays
#   currentRelation, currentFile                - set to empty strings
initialize() {
	# Scalars holding the current position in the input.
	currentFile=""
	currentRelation=""

	# Protocol versions collected by processMessage_VERSION_SUPPORTED.
	versionsSupported=()

	# Option names and values collected by processMessage_OPTION.
	optionNames=()
	optionValues=()

	# Attribute metadata and aliases collected by the *_ATTRIBUTE_* handlers.
	inputAttributeNames=()
	inputAttributeTypes=()
	outputAttributeAliases=()
}

# Handler for the VERSION_SUPPORTED message.
# Appends the given version to the versionsSupported array.
#
# Arguments:
#   $1 - version
processMessage_VERSION_SUPPORTED() {
	local offeredVersion="$1"
	versionsSupported+=("$offeredVersion")
}

# Handler for the WAITING_FOR_VERSION message.
# Answers with VERSION_ACCEPTED 1 if version 1 is among the versions
# collected in versionsSupported; otherwise sends a STREAMLET_ERROR.
#
# Globals:
#   versionsSupported - read
# Returns:
#   the status of the send call that was made
processMessage_WAITING_FOR_VERSION() {
	# Local, so the loop does not overwrite a caller's variable.
	local version
	for version in "${versionsSupported[@]}"; do
		if [[ "$version" == "1" ]]; then
			send VERSION_ACCEPTED 1
			return
		fi
	done
	send STREAMLET_ERROR "INCOMPATIBLE_VERSION" "Only version 1 is supported by this streamlet."
}

# Handler for the RELATION_START message.
# Stores its first argument in the global currentRelation.
#
# Arguments:
#   $1 - value to remember as the current relation
processMessage_RELATION_START() {
	local relation="$1"
	currentRelation="$relation"
}

# Handler for the INPUT_ATTRIBUTE_METADATA message.
# Appends one entry to each of the arrays inputAttributeNames and
# inputAttributeTypes, so both arrays share the same indices.
#
# Arguments:
#   $1 - appended to inputAttributeNames
#   $2 - appended to inputAttributeTypes
processMessage_INPUT_ATTRIBUTE_METADATA() {
	local attributeName="$1" attributeType="$2"
	inputAttributeNames+=("$attributeName")
	inputAttributeTypes+=("$attributeType")
}

# Handler for the OUTPUT_ATTRIBUTE_ALIAS message.
# Appends its first argument to the outputAttributeAliases array.
#
# Arguments:
#   $1 - alias to append
processMessage_OUTPUT_ATTRIBUTE_ALIAS() {
	local attributeAlias="$1"
	outputAttributeAliases+=("$attributeAlias")
}

# Handler for the OPTION message.
# Appends one entry to each of the arrays optionNames and optionValues,
# so both arrays share the same indices.
#
# Arguments:
#   $1 - appended to optionNames
#   $2 - appended to optionValues
processMessage_OPTION() {
	local optionName="$1" optionValue="$2"
	optionNames+=("$optionName")
	optionValues+=("$optionValue")
}

# Handler for the INPUT_ATTRIBUTE message.
# When the attribute at the given index of inputAttributeNames is called
# "path", its value is stored in the global currentFile; values of all other
# attributes are ignored here.
#
# Arguments:
#   $1 - index into inputAttributeNames
#   $2 - attribute value
#   $3 - null flag (accepted but not used by this implementation)
processMessage_INPUT_ATTRIBUTE() {
	local attributeIndex="$1" attributeValue="$2"

	if [[ "${inputAttributeNames[$attributeIndex]}" == "path" ]]; then
		currentFile="$attributeValue"
	fi
}

# Handler for the RELATION_END message.
# Does nothing by default; it exists as a hook for streamlets.
processMessage_RELATION_END() {
	# Strictly speaking, we should stop reading messages at this point.
	# However, the executor also closes our STDIN,
	# so the loop in processMessages() terminates on its own.
	# Override this function in a streamlet to do some clean-up.
	return;
}