#!/bin/bash
# Relational pipes
# Copyright © 2020 František Kučera (Frantovo.cz, GlobalCode.info)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# Unlike the protocol and the message format,
# these helper functions are not part of the public API.
# Thus when writing custom streamlets, it is better to copy this file
# and review its changes while upgrading to new upstream version.
send() {
local msgVar="EXEC_MSG_$1";
printf '%s\0' "${!msgVar}"
shift;
printf '%s\0' "$#"
for param in "$@"; do printf '%s\0' "$param"; done
}
read_nullbyte() { local IFS=; for v in "$@"; do export "$v"; read -r -d '' "$v"; done }
processMessages() {
while read_nullbyte code length; do
local msgVar="EXEC_MSG_$code";
local functionName="processMessage_${!msgVar}";
local parameters=();
for (( i=0; i<$length; i++)); do
read_nullbyte parameter;
parameters+=("$parameter")
done
type "$functionName" &>/dev/null || functionName="processUnsupportedMessage";
"$functionName" "${parameters[@]}";
done
}
processUnsupportedMessage() {
echo "Unsupported message: $*" >&2
# TODO: send error, no debug
# send STREAMLET_ERROR "UNSUPPORTED_MESSAGE"
}
initialize() {
versionsSupported=();
optionNames=();
optionValues=();
inputAttributeNames=();
inputAttributeTypes=();
outputAttributeAliases=();
currentRelation="";
currentFile="";
}
processMessage_VERSION_SUPPORTED() {
versionsSupported+=("$1");
}
processMessage_WAITING_FOR_VERSION() {
for v in "${versionsSupported[@]}"; do
if [[ "x$v" == "x1" ]]; then
send VERSION_ACCEPTED 1;
return;
fi
done
send STREAMLET_ERROR "INCOMPATIBLE_VERSION" "Only version 1 is supported by this streamlet."
}
processMessage_RELATION_START() {
currentRelation="$1";
return;
}
processMessage_INPUT_ATTRIBUTE_METADATA() {
inputAttributeNames+=("$1");
inputAttributeTypes+=("$2");
}
processMessage_OUTPUT_ATTRIBUTE_ALIAS() {
outputAttributeAliases+=("$1");
}
processMessage_OPTION() {
optionNames+=("$1");
optionValues+=("$2");
}
processMessage_INPUT_ATTRIBUTE() {
local index="$1";
local value="$2";
local isNull="$3";
if [[ "_${inputAttributeNames[$index]}" == "_path" ]]; then
currentFile="$value";
fi
}
processMessage_RELATION_END() {
# Now we should stop reading messages,
# but the executor also closes our STDIN,
# so the loop in processMessages() terminates.
# Override this function to do some clean-up.
return;
}