diff -r bccda5688d71 -r f9cada1d46a4 streamlet-examples/streamlet-common.sh --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/streamlet-examples/streamlet-common.sh Sat Jan 11 19:15:58 2020 +0100 @@ -0,0 +1,116 @@ +#!/bin/bash + +# Relational pipes +# Copyright © 2020 František Kučera (Frantovo.cz, GlobalCode.info) +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, version 3 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +# Unlike the protocol and the message format, +# these helper functions are not part of the public API. +# Thus when writing custom streamlets, it is better to copy this file +# and review its changes while upgrading to new upstream version. + + +send() { + local msgVar="EXEC_MSG_$1"; + printf '%s\0' "${!msgVar}" + shift; + printf '%s\0' "$#" + for param in "$@"; do printf '%s\0' "$param"; done +} + +read_nullbyte() { local IFS=; for v in "$@"; do export "$v"; read -r -d '' "$v"; done } + +processMessages() { + while read_nullbyte code length; do + local msgVar="EXEC_MSG_$code"; + local functionName="processMessage_${!msgVar}"; + local parameters=(); + for (( i=0; i<$length; i++)); do + read_nullbyte parameter; + parameters+=("$parameter") + done + type "$functionName" &>/dev/null || functionName="processUnsupportedMessage"; + "$functionName" "${parameters[@]}"; + done +} + +processUnsupportedMessage() { + echo "Unsupported message: $*" >&2 + # TODO: send error, no debug + # send STREAMLET_ERROR "UNSUPPORTED_MESSAGE" +} + +initialize() { + versionsSupported=(); + optionNames=(); + optionValues=(); + inputAttributeNames=(); + inputAttributeTypes=(); + outputAttributeAliases=(); + currentInputAttributeIndex=0; + currentOutputAttributeIndex=0; + currentRelation=""; + currentFile=""; +} + +processMessage_VERSION_SUPPORTED() { + versionsSupported+=("$1"); +} + +processMessage_WAITING_FOR_VERSION() { + for v in "${versionsSupported[@]}"; do + if [[ "x$v" == "x1" ]]; then + send VERSION_ACCEPTED 1; + return; + fi + done + send STREAMLET_ERROR "INCOMPATIBLE_VERSION" "Only version 1 is supported by this streamlet." +} + +processMessage_RELATION_START() { + currentRelation="$1"; + return; +} + +processMessage_INPUT_ATTRIBUTE_METADATA() { + inputAttributeNames+=("$1"); + inputAttributeTypes+=("$2"); +} + +processMessage_OUTPUT_ATTRIBUTE_ALIAS() { + outputAttributeAliases+=("$1"); +} + +processMessage_OPTION() { + optionNames+=("$1"); + optionValues+=("$2"); +} + +processMessage_INPUT_ATTRIBUTE() { + local index="$1"; + local value="$2"; + local isNull="$3"; + if [[ "_${inputAttributeNames[$index]}" == "_path" ]]; then + currentFile="$value"; + fi +} + +processMessage_RELATION_END() { + # Now we should stop reading messages, + # but the executor also closes our STDIN, + # so the loop in processMessages() terminates. + # Override this function to do some clean-up. + return; +}