diff -r 56409232e1a1 -r c64e1588f428 src/StreamletAttributeFinder.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/StreamletAttributeFinder.h Sat Jan 11 18:13:30 2020 +0100 @@ -0,0 +1,139 @@ +/** + * Relational pipes + * Copyright © 2019 František Kučera (Frantovo.cz, GlobalCode.info) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 3 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +#pragma once + +#include +#include +#include +#include + +#include +#include +#include + +#include "RequestedField.h" +#include "SubProcess.h" +#include "AttributeFinder.h" +#include "StreamletMsg.h" + +namespace relpipe { +namespace in { +namespace filesystem { + +namespace fs = std::filesystem; +using namespace relpipe::writer; + +class StreamletAttributeFinder : public AttributeFinder { +private: + std::wstring_convert> convertor; // TODO: support also other encodings. + std::map> subProcesses; + std::map> cachedMetadata; + + string_t getExecCommand(const RequestedField& field) { + // TODO: move to another directory, exec, not script + use custom $PATH with no prefix + return SCRIPT_PREFIX + field.name; + } + +protected: + + virtual void writeFieldOfExistingFile(RelationalWriter* writer, const RequestedField& field) override { + // TODO: paralelize also over records → fork multiple processes and distribute records across them; then collect results (with a lock) + if (field.group == RequestedField::GROUP_STREAMLET) { + + subProcesses[field.id]->write({StreamletMsg::INPUT_ATTRIBUTE, L"0", convertor.from_bytes(currentFileRaw), L"false"}); // index, value, isNull + subProcesses[field.id]->write({StreamletMsg::WAITING_FOR_OUTPUT_ATTRIBUTES}); + + for (auto metadata : cachedMetadata[field.id]) { + SubProcess::Message m = subProcesses[field.id]->read(); + if (m.code == StreamletMsg::OUTPUT_ATTRIBUTE) writer->writeAttribute(m.parameters[0]); + else throw RelpipeWriterException(L"Protocol violation from exec sub-process while reading: „" + metadata.attributeName + L"“. Expected OUTPUT_ATTRIBUTE but got: " + m.toString()); + } + + SubProcess::Message m = subProcesses[field.id]->read(); + if (m.code != StreamletMsg::WAITING_FOR_INPUT_ATTRIBUTES) throw RelpipeWriterException(L"Protocol violation from exec sub-process. Expected WAITING_FOR_INPUT_ATTRIBUTES but got: " + m.toString()); + // TODO: generic protocol violation error messages / method for checking responses + } + } + +public: + + static const string_t SCRIPT_PREFIX; + + virtual vector toMetadata(RelationalWriter* writer, const RequestedField& field) override { + if (field.group == RequestedField::GROUP_STREAMLET) { + + if (cachedMetadata.count(field.id)) { + return cachedMetadata[field.id]; + } else { + + std::vector commandLine = {getExecCommand(field)}; + std::map environment; + + for (auto mn : StreamletMsg::getMessageNames()) { + environment[L"EXEC_MSG_" + mn.second] = std::to_wstring(mn.first); + environment[L"EXEC_MSG_" + std::to_wstring(mn.first)] = mn.second; + } + + shared_ptr subProcess(SubProcess::create(commandLine, environment)); + subProcesses[field.id] = subProcess; + + string_t version = L"1"; + subProcess->write({StreamletMsg::VERSION_SUPPORTED, version}); + subProcess->write({StreamletMsg::WAITING_FOR_VERSION}); + SubProcess::Message versionMessage = subProcess->read(); + if (versionMessage.code == StreamletMsg::VERSION_ACCEPTED && versionMessage.parameters[0] == version) { + subProcess->write({StreamletMsg::RELATION_START}); + subProcess->write({StreamletMsg::INPUT_ATTRIBUTE_METADATA, L"path", L"string"}); + for (string_t alias : field.getAliases()) subProcess->write({StreamletMsg::OUTPUT_ATTRIBUTE_ALIAS, alias}); + for (int i = 0; i < field.options.size();) subProcess->write({StreamletMsg::OPTION, field.options[i++], field.options[i++]}); + subProcess->write({StreamletMsg::WAITING_FOR_OUTPUT_ATTRIBUTES_METADATA}); + + vector metadata; + while (true) { + SubProcess::Message m = subProcess->read(); + if (m.code == StreamletMsg::OUTPUT_ATTRIBUTE_METADATA) metadata.push_back({m.parameters[0], writer->toTypeId(m.parameters[1])}); + else if (m.code == StreamletMsg::WAITING_FOR_INPUT_ATTRIBUTES) break; + } + + cachedMetadata[field.id] = metadata; + return metadata; + } else { + throw RelpipeWriterException(L"Incompatible exec sub-process version or message: " + versionMessage.toString()); + } + } + } else { + return {}; + } + } + + virtual ~StreamletAttributeFinder() override { + for (auto s : subProcesses) { + try { + s.second->write({StreamletMsg::RELATION_END}); + s.second->wait(); + } catch (...) { + std::wcerr << L"Exception caught during closing sub-process #" + std::to_wstring(s.first) + L" and waiting for its end." << std::endl; + } + } + } +}; + +const relpipe::writer::string_t StreamletAttributeFinder::SCRIPT_PREFIX = L"__relpipe_in_filesystem_script_"; + +} +} +}