# HG changeset patch # User František Kučera # Date 1578762810 -3600 # Node ID c64e1588f428708a3847e79206a52703e647caa8 # Parent 56409232e1a14132d8d33b184c892454099a81dd rename --exec to --streamlet diff -r 56409232e1a1 -r c64e1588f428 bash-completion.sh --- a/bash-completion.sh Sat Jan 11 00:58:35 2020 +0100 +++ b/bash-completion.sh Sat Jan 11 18:13:30 2020 +0100 @@ -60,13 +60,13 @@ elif [[ "$w2" == "--option" && "x$w0" == "x" ]]; then COMPREPLY=("''") elif [[ "$w1" == "--file" ]]; then COMPREPLY=($(compgen -W "${FILE_FIELDS[*]}" -- "$w0")) elif [[ "$w1" == "--xattr" ]]; then COMPREPLY=($(compgen -W "${XATTR_FIELDS[*]}" -- "$w0")) - elif [[ "$w1" == "--exec" ]]; then COMPREPLY=($(compgen -W "$(_relpipe_in_filesystem_scripts)" -- "$w0")) + elif [[ "$w1" == "--streamlet" ]]; then COMPREPLY=($(compgen -W "$(_relpipe_in_filesystem_scripts)" -- "$w0")) else OPTIONS=( "--relation" "--file" "--xattr" - "--exec" + "--streamlet" "--as" "--option" ) diff -r 56409232e1a1 -r c64e1588f428 src/CLIParser.h --- a/src/CLIParser.h Sat Jan 11 00:58:35 2020 +0100 +++ b/src/CLIParser.h Sat Jan 11 18:13:30 2020 +0100 @@ -51,7 +51,7 @@ static const string_t OPTION_FILE; static const string_t OPTION_XATTR; - static const string_t OPTION_EXEC; + static const string_t OPTION_STREAMLET; static const string_t OPTION_AS; static const string_t OPTION_OPTION; static const string_t OPTION_RELATION; @@ -68,7 +68,7 @@ for (int i = 0; i < arguments.size();) { string_t option = readNext(arguments, i); - if (option == CLIParser::OPTION_FILE || option == CLIParser::OPTION_XATTR || option == CLIParser::OPTION_EXEC) { + if (option == CLIParser::OPTION_FILE || option == CLIParser::OPTION_XATTR || option == CLIParser::OPTION_STREAMLET) { addField(c, currentGroup, currentName, currentAliases, currentOptions); // previous field currentGroup = option.substr(2); // cut off -- currentName = readNext(arguments, i); @@ -115,7 +115,7 @@ const string_t CLIParser::OPTION_FILE = L"--" + RequestedField::GROUP_FILE; const string_t CLIParser::OPTION_XATTR = L"--" + RequestedField::GROUP_XATTR; -const string_t CLIParser::OPTION_EXEC = L"--" + RequestedField::GROUP_EXEC; +const string_t CLIParser::OPTION_STREAMLET = L"--" + RequestedField::GROUP_STREAMLET; const string_t CLIParser::OPTION_AS = L"--as"; const string_t CLIParser::OPTION_OPTION = L"--option"; const string_t CLIParser::OPTION_RELATION = L"--relation"; diff -r 56409232e1a1 -r c64e1588f428 src/ExecAttributeFinder.h --- a/src/ExecAttributeFinder.h Sat Jan 11 00:58:35 2020 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,139 +0,0 @@ -/** - * Relational pipes - * Copyright © 2019 František Kučera (Frantovo.cz, GlobalCode.info) - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, version 3 of the License. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -#pragma once - -#include -#include -#include -#include - -#include -#include -#include - -#include "RequestedField.h" -#include "SubProcess.h" -#include "AttributeFinder.h" -#include "ExecMsg.h" - -namespace relpipe { -namespace in { -namespace filesystem { - -namespace fs = std::filesystem; -using namespace relpipe::writer; - -class ExecAttributeFinder : public AttributeFinder { -private: - std::wstring_convert> convertor; // TODO: support also other encodings. - std::map> subProcesses; - std::map> cachedMetadata; - - string_t getExecCommand(const RequestedField& field) { - // TODO: move to another directory, exec, not script + use custom $PATH with no prefix - return SCRIPT_PREFIX + field.name; - } - -protected: - - virtual void writeFieldOfExistingFile(RelationalWriter* writer, const RequestedField& field) override { - // TODO: paralelize also over records → fork multiple processes and distribute records across them; then collect results (with a lock) - if (field.group == RequestedField::GROUP_EXEC) { - - subProcesses[field.id]->write({ExecMsg::INPUT_ATTRIBUTE, L"0", convertor.from_bytes(currentFileRaw), L"false"}); // index, value, isNull - subProcesses[field.id]->write({ExecMsg::WAITING_FOR_OUTPUT_ATTRIBUTES}); - - for (auto metadata : cachedMetadata[field.id]) { - SubProcess::Message m = subProcesses[field.id]->read(); - if (m.code == ExecMsg::OUTPUT_ATTRIBUTE) writer->writeAttribute(m.parameters[0]); - else throw RelpipeWriterException(L"Protocol violation from exec sub-process while reading: „" + metadata.attributeName + L"“. Expected OUTPUT_ATTRIBUTE but got: " + m.toString()); - } - - SubProcess::Message m = subProcesses[field.id]->read(); - if (m.code != ExecMsg::WAITING_FOR_INPUT_ATTRIBUTES) throw RelpipeWriterException(L"Protocol violation from exec sub-process. Expected WAITING_FOR_INPUT_ATTRIBUTES but got: " + m.toString()); - // TODO: generic protocol violation error messages / method for checking responses - } - } - -public: - - static const string_t SCRIPT_PREFIX; - - virtual vector toMetadata(RelationalWriter* writer, const RequestedField& field) override { - if (field.group == RequestedField::GROUP_EXEC) { - - if (cachedMetadata.count(field.id)) { - return cachedMetadata[field.id]; - } else { - - std::vector commandLine = {getExecCommand(field)}; - std::map environment; - - for (auto mn : ExecMsg::getMessageNames()) { - environment[L"EXEC_MSG_" + mn.second] = std::to_wstring(mn.first); - environment[L"EXEC_MSG_" + std::to_wstring(mn.first)] = mn.second; - } - - shared_ptr subProcess(SubProcess::create(commandLine, environment, false)); - subProcesses[field.id] = subProcess; - - string_t version = L"1"; - subProcess->write({ExecMsg::VERSION_SUPPORTED, version}); - subProcess->write({ExecMsg::WAITING_FOR_VERSION}); - SubProcess::Message versionMessage = subProcess->read(); - if (versionMessage.code == ExecMsg::VERSION_ACCEPTED && versionMessage.parameters[0] == version) { - subProcess->write({ExecMsg::RELATION_START}); - subProcess->write({ExecMsg::INPUT_ATTRIBUTE_METADATA, L"path", L"string"}); - for (string_t alias : field.getAliases()) subProcess->write({ExecMsg::OUTPUT_ATTRIBUTE_ALIAS, alias}); - for (int i = 0; i < field.options.size();) subProcess->write({ExecMsg::OPTION, field.options[i++], field.options[i++]}); - subProcess->write({ExecMsg::WAITING_FOR_OUTPUT_ATTRIBUTES_METADATA}); - - vector metadata; - while (true) { - SubProcess::Message m = subProcess->read(); - if (m.code == ExecMsg::OUTPUT_ATTRIBUTE_METADATA) metadata.push_back({m.parameters[0], writer->toTypeId(m.parameters[1])}); - else if (m.code == ExecMsg::WAITING_FOR_INPUT_ATTRIBUTES) break; - } - - cachedMetadata[field.id] = metadata; - return metadata; - } else { - throw RelpipeWriterException(L"Incompatible exec sub-process version or message: " + versionMessage.toString()); - } - } - } else { - return {}; - } - } - - virtual ~ExecAttributeFinder() override { - for (auto s : subProcesses) { - try { - s.second->write({ExecMsg::RELATION_END}); - s.second->wait(); - } catch (...) { - std::wcerr << L"Exception caught during closing sub-process #" + std::to_wstring(s.first) + L" and waiting for its end." << std::endl; - } - } - } -}; - -const relpipe::writer::string_t ExecAttributeFinder::SCRIPT_PREFIX = L"__relpipe_in_filesystem_script_"; - -} -} -} diff -r 56409232e1a1 -r c64e1588f428 src/ExecMsg.h --- a/src/ExecMsg.h Sat Jan 11 00:58:35 2020 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,86 +0,0 @@ -// This file was generated from the specification. - -#include -#include - -namespace relpipe { -namespace in { -namespace filesystem { - -class ExecMsg { -public: - - static const int VERSION_SUPPORTED; - static const int WAITING_FOR_VERSION; - static const int VERSION_ACCEPTED; - static const int RELATION_START; - static const int INPUT_ATTRIBUTE_METADATA; - static const int OUTPUT_ATTRIBUTE_ALIAS; - static const int OPTION; - static const int COMPLETION_REQUEST; - static const int COMPLETION; - static const int COMPLETION_END; - static const int WAITING_FOR_OUTPUT_ATTRIBUTES_METADATA; - static const int OUTPUT_ATTRIBUTE_METADATA; - static const int WAITING_FOR_INPUT_ATTRIBUTES; - static const int INPUT_ATTRIBUTE; - static const int WAITING_FOR_OUTPUT_ATTRIBUTES; - static const int OUTPUT_ATTRIBUTE; - static const int EXECUTOR_ERROR; - static const int PROCESS_ERROR; - static const int PROCESS_WARNING; - static const int RELATION_END; - - static std::map getMessageNames() { - std::map m; - - m[VERSION_SUPPORTED] = L"VERSION_SUPPORTED"; - m[WAITING_FOR_VERSION] = L"WAITING_FOR_VERSION"; - m[VERSION_ACCEPTED] = L"VERSION_ACCEPTED"; - m[RELATION_START] = L"RELATION_START"; - m[INPUT_ATTRIBUTE_METADATA] = L"INPUT_ATTRIBUTE_METADATA"; - m[OUTPUT_ATTRIBUTE_ALIAS] = L"OUTPUT_ATTRIBUTE_ALIAS"; - m[OPTION] = L"OPTION"; - m[COMPLETION_REQUEST] = L"COMPLETION_REQUEST"; - m[COMPLETION] = L"COMPLETION"; - m[COMPLETION_END] = L"COMPLETION_END"; - m[WAITING_FOR_OUTPUT_ATTRIBUTES_METADATA] = L"WAITING_FOR_OUTPUT_ATTRIBUTES_METADATA"; - m[OUTPUT_ATTRIBUTE_METADATA] = L"OUTPUT_ATTRIBUTE_METADATA"; - m[WAITING_FOR_INPUT_ATTRIBUTES] = L"WAITING_FOR_INPUT_ATTRIBUTES"; - m[INPUT_ATTRIBUTE] = L"INPUT_ATTRIBUTE"; - m[WAITING_FOR_OUTPUT_ATTRIBUTES] = L"WAITING_FOR_OUTPUT_ATTRIBUTES"; - m[OUTPUT_ATTRIBUTE] = L"OUTPUT_ATTRIBUTE"; - m[EXECUTOR_ERROR] = L"EXECUTOR_ERROR"; - m[PROCESS_ERROR] = L"PROCESS_ERROR"; - m[PROCESS_WARNING] = L"PROCESS_WARNING"; - m[RELATION_END] = L"RELATION_END"; - - return m; - } - -}; - -const int ExecMsg::VERSION_SUPPORTED = 100; -const int ExecMsg::WAITING_FOR_VERSION = 101; -const int ExecMsg::VERSION_ACCEPTED = 102; -const int ExecMsg::RELATION_START = 103; -const int ExecMsg::INPUT_ATTRIBUTE_METADATA = 104; -const int ExecMsg::OUTPUT_ATTRIBUTE_ALIAS = 105; -const int ExecMsg::OPTION = 106; -const int ExecMsg::COMPLETION_REQUEST = 107; -const int ExecMsg::COMPLETION = 108; -const int ExecMsg::COMPLETION_END = 109; -const int ExecMsg::WAITING_FOR_OUTPUT_ATTRIBUTES_METADATA = 110; -const int ExecMsg::OUTPUT_ATTRIBUTE_METADATA = 111; -const int ExecMsg::WAITING_FOR_INPUT_ATTRIBUTES = 112; -const int ExecMsg::INPUT_ATTRIBUTE = 113; -const int ExecMsg::WAITING_FOR_OUTPUT_ATTRIBUTES = 114; -const int ExecMsg::OUTPUT_ATTRIBUTE = 115; -const int ExecMsg::EXECUTOR_ERROR = 116; -const int ExecMsg::PROCESS_ERROR = 117; -const int ExecMsg::PROCESS_WARNING = 118; -const int ExecMsg::RELATION_END = 120; - -} -} -} diff -r 56409232e1a1 -r c64e1588f428 src/FilesystemCommand.h --- a/src/FilesystemCommand.h Sat Jan 11 00:58:35 2020 +0100 +++ b/src/FilesystemCommand.h Sat Jan 11 18:13:30 2020 +0100 @@ -37,7 +37,7 @@ #include "AttributeFinder.h" #include "FileAttributeFinder.h" #include "XattrAttributeFinder.h" -#include "ExecAttributeFinder.h" +#include "StreamletAttributeFinder.h" namespace relpipe { namespace in { @@ -51,12 +51,12 @@ std::wstring_convert> convertor; // TODO: support also other encodings. FileAttributeFinder fileAttributeFinder; - ExecAttributeFinder execAttributeFinder; + StreamletAttributeFinder execAttributeFinder; XattrAttributeFinder xattrAttributeFinder; std::map attributeFinders{ {RequestedField::GROUP_FILE, &fileAttributeFinder}, - {RequestedField::GROUP_EXEC, &execAttributeFinder}, + {RequestedField::GROUP_STREAMLET, &execAttributeFinder}, {RequestedField::GROUP_XATTR, &xattrAttributeFinder}}; void reset(std::stringstream& stream) { diff -r 56409232e1a1 -r c64e1588f428 src/RequestedField.h --- a/src/RequestedField.h Sat Jan 11 00:58:35 2020 +0100 +++ b/src/RequestedField.h Sat Jan 11 18:13:30 2020 +0100 @@ -30,7 +30,7 @@ public: static const string_t GROUP_FILE; static const string_t GROUP_XATTR; - static const string_t GROUP_EXEC; + static const string_t GROUP_STREAMLET; integer_t id; string_t group; string_t name; @@ -58,7 +58,7 @@ const string_t RequestedField::GROUP_FILE = L"file"; const string_t RequestedField::GROUP_XATTR = L"xattr"; -const string_t RequestedField::GROUP_EXEC = L"exec"; +const string_t RequestedField::GROUP_STREAMLET = L"streamlet"; } } diff -r 56409232e1a1 -r c64e1588f428 src/StreamletAttributeFinder.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/StreamletAttributeFinder.h Sat Jan 11 18:13:30 2020 +0100 @@ -0,0 +1,139 @@ +/** + * Relational pipes + * Copyright © 2019 František Kučera (Frantovo.cz, GlobalCode.info) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 3 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +#pragma once + +#include +#include +#include +#include + +#include +#include +#include + +#include "RequestedField.h" +#include "SubProcess.h" +#include "AttributeFinder.h" +#include "StreamletMsg.h" + +namespace relpipe { +namespace in { +namespace filesystem { + +namespace fs = std::filesystem; +using namespace relpipe::writer; + +class StreamletAttributeFinder : public AttributeFinder { +private: + std::wstring_convert> convertor; // TODO: support also other encodings. + std::map> subProcesses; + std::map> cachedMetadata; + + string_t getExecCommand(const RequestedField& field) { + // TODO: move to another directory, exec, not script + use custom $PATH with no prefix + return SCRIPT_PREFIX + field.name; + } + +protected: + + virtual void writeFieldOfExistingFile(RelationalWriter* writer, const RequestedField& field) override { + // TODO: paralelize also over records → fork multiple processes and distribute records across them; then collect results (with a lock) + if (field.group == RequestedField::GROUP_STREAMLET) { + + subProcesses[field.id]->write({StreamletMsg::INPUT_ATTRIBUTE, L"0", convertor.from_bytes(currentFileRaw), L"false"}); // index, value, isNull + subProcesses[field.id]->write({StreamletMsg::WAITING_FOR_OUTPUT_ATTRIBUTES}); + + for (auto metadata : cachedMetadata[field.id]) { + SubProcess::Message m = subProcesses[field.id]->read(); + if (m.code == StreamletMsg::OUTPUT_ATTRIBUTE) writer->writeAttribute(m.parameters[0]); + else throw RelpipeWriterException(L"Protocol violation from exec sub-process while reading: „" + metadata.attributeName + L"“. Expected OUTPUT_ATTRIBUTE but got: " + m.toString()); + } + + SubProcess::Message m = subProcesses[field.id]->read(); + if (m.code != StreamletMsg::WAITING_FOR_INPUT_ATTRIBUTES) throw RelpipeWriterException(L"Protocol violation from exec sub-process. Expected WAITING_FOR_INPUT_ATTRIBUTES but got: " + m.toString()); + // TODO: generic protocol violation error messages / method for checking responses + } + } + +public: + + static const string_t SCRIPT_PREFIX; + + virtual vector toMetadata(RelationalWriter* writer, const RequestedField& field) override { + if (field.group == RequestedField::GROUP_STREAMLET) { + + if (cachedMetadata.count(field.id)) { + return cachedMetadata[field.id]; + } else { + + std::vector commandLine = {getExecCommand(field)}; + std::map environment; + + for (auto mn : StreamletMsg::getMessageNames()) { + environment[L"EXEC_MSG_" + mn.second] = std::to_wstring(mn.first); + environment[L"EXEC_MSG_" + std::to_wstring(mn.first)] = mn.second; + } + + shared_ptr subProcess(SubProcess::create(commandLine, environment)); + subProcesses[field.id] = subProcess; + + string_t version = L"1"; + subProcess->write({StreamletMsg::VERSION_SUPPORTED, version}); + subProcess->write({StreamletMsg::WAITING_FOR_VERSION}); + SubProcess::Message versionMessage = subProcess->read(); + if (versionMessage.code == StreamletMsg::VERSION_ACCEPTED && versionMessage.parameters[0] == version) { + subProcess->write({StreamletMsg::RELATION_START}); + subProcess->write({StreamletMsg::INPUT_ATTRIBUTE_METADATA, L"path", L"string"}); + for (string_t alias : field.getAliases()) subProcess->write({StreamletMsg::OUTPUT_ATTRIBUTE_ALIAS, alias}); + for (int i = 0; i < field.options.size();) subProcess->write({StreamletMsg::OPTION, field.options[i++], field.options[i++]}); + subProcess->write({StreamletMsg::WAITING_FOR_OUTPUT_ATTRIBUTES_METADATA}); + + vector metadata; + while (true) { + SubProcess::Message m = subProcess->read(); + if (m.code == StreamletMsg::OUTPUT_ATTRIBUTE_METADATA) metadata.push_back({m.parameters[0], writer->toTypeId(m.parameters[1])}); + else if (m.code == StreamletMsg::WAITING_FOR_INPUT_ATTRIBUTES) break; + } + + cachedMetadata[field.id] = metadata; + return metadata; + } else { + throw RelpipeWriterException(L"Incompatible exec sub-process version or message: " + versionMessage.toString()); + } + } + } else { + return {}; + } + } + + virtual ~StreamletAttributeFinder() override { + for (auto s : subProcesses) { + try { + s.second->write({StreamletMsg::RELATION_END}); + s.second->wait(); + } catch (...) { + std::wcerr << L"Exception caught during closing sub-process #" + std::to_wstring(s.first) + L" and waiting for its end." << std::endl; + } + } + } +}; + +const relpipe::writer::string_t StreamletAttributeFinder::SCRIPT_PREFIX = L"__relpipe_in_filesystem_script_"; + +} +} +} diff -r 56409232e1a1 -r c64e1588f428 src/StreamletMsg.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/StreamletMsg.h Sat Jan 11 18:13:30 2020 +0100 @@ -0,0 +1,88 @@ +// This file was generated from the specification. + +#pragma once + +#include +#include + +namespace relpipe { +namespace in { +namespace filesystem { + +class StreamletMsg { +public: + + static const int VERSION_SUPPORTED; + static const int WAITING_FOR_VERSION; + static const int VERSION_ACCEPTED; + static const int RELATION_START; + static const int INPUT_ATTRIBUTE_METADATA; + static const int OUTPUT_ATTRIBUTE_ALIAS; + static const int OPTION; + static const int COMPLETION_REQUEST; + static const int COMPLETION; + static const int COMPLETION_END; + static const int WAITING_FOR_OUTPUT_ATTRIBUTES_METADATA; + static const int OUTPUT_ATTRIBUTE_METADATA; + static const int WAITING_FOR_INPUT_ATTRIBUTES; + static const int INPUT_ATTRIBUTE; + static const int WAITING_FOR_OUTPUT_ATTRIBUTES; + static const int OUTPUT_ATTRIBUTE; + static const int EXECUTOR_ERROR; + static const int STREAMLET_ERROR; + static const int STREAMLET_WARNING; + static const int RELATION_END; + + static std::map getMessageNames() { + std::map m; + + m[VERSION_SUPPORTED] = L"VERSION_SUPPORTED"; + m[WAITING_FOR_VERSION] = L"WAITING_FOR_VERSION"; + m[VERSION_ACCEPTED] = L"VERSION_ACCEPTED"; + m[RELATION_START] = L"RELATION_START"; + m[INPUT_ATTRIBUTE_METADATA] = L"INPUT_ATTRIBUTE_METADATA"; + m[OUTPUT_ATTRIBUTE_ALIAS] = L"OUTPUT_ATTRIBUTE_ALIAS"; + m[OPTION] = L"OPTION"; + m[COMPLETION_REQUEST] = L"COMPLETION_REQUEST"; + m[COMPLETION] = L"COMPLETION"; + m[COMPLETION_END] = L"COMPLETION_END"; + m[WAITING_FOR_OUTPUT_ATTRIBUTES_METADATA] = L"WAITING_FOR_OUTPUT_ATTRIBUTES_METADATA"; + m[OUTPUT_ATTRIBUTE_METADATA] = L"OUTPUT_ATTRIBUTE_METADATA"; + m[WAITING_FOR_INPUT_ATTRIBUTES] = L"WAITING_FOR_INPUT_ATTRIBUTES"; + m[INPUT_ATTRIBUTE] = L"INPUT_ATTRIBUTE"; + m[WAITING_FOR_OUTPUT_ATTRIBUTES] = L"WAITING_FOR_OUTPUT_ATTRIBUTES"; + m[OUTPUT_ATTRIBUTE] = L"OUTPUT_ATTRIBUTE"; + m[EXECUTOR_ERROR] = L"EXECUTOR_ERROR"; + m[STREAMLET_ERROR] = L"STREAMLET_ERROR"; + m[STREAMLET_WARNING] = L"STREAMLET_WARNING"; + m[RELATION_END] = L"RELATION_END"; + + return m; + } + +}; + +const int StreamletMsg::VERSION_SUPPORTED = 100; +const int StreamletMsg::WAITING_FOR_VERSION = 101; +const int StreamletMsg::VERSION_ACCEPTED = 102; +const int StreamletMsg::RELATION_START = 103; +const int StreamletMsg::INPUT_ATTRIBUTE_METADATA = 104; +const int StreamletMsg::OUTPUT_ATTRIBUTE_ALIAS = 105; +const int StreamletMsg::OPTION = 106; +const int StreamletMsg::COMPLETION_REQUEST = 107; +const int StreamletMsg::COMPLETION = 108; +const int StreamletMsg::COMPLETION_END = 109; +const int StreamletMsg::WAITING_FOR_OUTPUT_ATTRIBUTES_METADATA = 110; +const int StreamletMsg::OUTPUT_ATTRIBUTE_METADATA = 111; +const int StreamletMsg::WAITING_FOR_INPUT_ATTRIBUTES = 112; +const int StreamletMsg::INPUT_ATTRIBUTE = 113; +const int StreamletMsg::WAITING_FOR_OUTPUT_ATTRIBUTES = 114; +const int StreamletMsg::OUTPUT_ATTRIBUTE = 115; +const int StreamletMsg::EXECUTOR_ERROR = 116; +const int StreamletMsg::STREAMLET_ERROR = 117; +const int StreamletMsg::STREAMLET_WARNING = 118; +const int StreamletMsg::RELATION_END = 120; + +} +} +}