1 /** |
|
2 * Relational pipes |
|
3 * Copyright © 2019 František Kučera (Frantovo.cz, GlobalCode.info) |
|
4 * |
|
5 * This program is free software: you can redistribute it and/or modify |
|
6 * it under the terms of the GNU General Public License as published by |
|
7 * the Free Software Foundation, version 3 of the License. |
|
8 * |
|
9 * This program is distributed in the hope that it will be useful, |
|
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 * GNU General Public License for more details. |
|
13 * |
|
14 * You should have received a copy of the GNU General Public License |
|
15 * along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 */ |
|
17 #pragma once |
|
18 |
|
19 #include <vector> |
|
20 #include <filesystem> |
|
21 #include <regex> |
|
22 #include <memory> |
|
23 |
|
24 #include <relpipe/writer/typedefs.h> |
|
25 #include <relpipe/writer/AttributeMetadata.h> |
|
26 #include <relpipe/writer/RelationalWriter.h> |
|
27 |
|
28 #include "RequestedField.h" |
|
29 #include "SubProcess.h" |
|
30 #include "AttributeFinder.h" |
|
31 #include "ExecMsg.h" |
|
32 |
|
33 namespace relpipe { |
|
34 namespace in { |
|
35 namespace filesystem { |
|
36 |
|
37 namespace fs = std::filesystem; |
|
38 using namespace relpipe::writer; |
|
39 |
|
40 class ExecAttributeFinder : public AttributeFinder { |
|
41 private: |
|
42 std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings. |
|
43 std::map<int, std::shared_ptr<SubProcess>> subProcesses; |
|
44 std::map<int, std::vector<AttributeMetadata>> cachedMetadata; |
|
45 |
|
46 string_t getExecCommand(const RequestedField& field) { |
|
47 // TODO: move to another directory, exec, not script + use custom $PATH with no prefix |
|
48 return SCRIPT_PREFIX + field.name; |
|
49 } |
|
50 |
|
51 protected: |
|
52 |
|
53 virtual void writeFieldOfExistingFile(RelationalWriter* writer, const RequestedField& field) override { |
|
54 // TODO: paralelize also over records → fork multiple processes and distribute records across them; then collect results (with a lock) |
|
55 if (field.group == RequestedField::GROUP_EXEC) { |
|
56 |
|
57 subProcesses[field.id]->write({ExecMsg::INPUT_ATTRIBUTE, L"0", convertor.from_bytes(currentFileRaw), L"false"}); // index, value, isNull |
|
58 subProcesses[field.id]->write({ExecMsg::WAITING_FOR_OUTPUT_ATTRIBUTES}); |
|
59 |
|
60 for (auto metadata : cachedMetadata[field.id]) { |
|
61 SubProcess::Message m = subProcesses[field.id]->read(); |
|
62 if (m.code == ExecMsg::OUTPUT_ATTRIBUTE) writer->writeAttribute(m.parameters[0]); |
|
63 else throw RelpipeWriterException(L"Protocol violation from exec sub-process while reading: „" + metadata.attributeName + L"“. Expected OUTPUT_ATTRIBUTE but got: " + m.toString()); |
|
64 } |
|
65 |
|
66 SubProcess::Message m = subProcesses[field.id]->read(); |
|
67 if (m.code != ExecMsg::WAITING_FOR_INPUT_ATTRIBUTES) throw RelpipeWriterException(L"Protocol violation from exec sub-process. Expected WAITING_FOR_INPUT_ATTRIBUTES but got: " + m.toString()); |
|
68 // TODO: generic protocol violation error messages / method for checking responses |
|
69 } |
|
70 } |
|
71 |
|
72 public: |
|
73 |
|
74 static const string_t SCRIPT_PREFIX; |
|
75 |
|
76 virtual vector<AttributeMetadata> toMetadata(RelationalWriter* writer, const RequestedField& field) override { |
|
77 if (field.group == RequestedField::GROUP_EXEC) { |
|
78 |
|
79 if (cachedMetadata.count(field.id)) { |
|
80 return cachedMetadata[field.id]; |
|
81 } else { |
|
82 |
|
83 std::vector<string_t> commandLine = {getExecCommand(field)}; |
|
84 std::map<string_t, string_t> environment; |
|
85 |
|
86 for (auto mn : ExecMsg::getMessageNames()) { |
|
87 environment[L"EXEC_MSG_" + mn.second] = std::to_wstring(mn.first); |
|
88 environment[L"EXEC_MSG_" + std::to_wstring(mn.first)] = mn.second; |
|
89 } |
|
90 |
|
91 shared_ptr<SubProcess> subProcess(SubProcess::create(commandLine, environment, false)); |
|
92 subProcesses[field.id] = subProcess; |
|
93 |
|
94 string_t version = L"1"; |
|
95 subProcess->write({ExecMsg::VERSION_SUPPORTED, version}); |
|
96 subProcess->write({ExecMsg::WAITING_FOR_VERSION}); |
|
97 SubProcess::Message versionMessage = subProcess->read(); |
|
98 if (versionMessage.code == ExecMsg::VERSION_ACCEPTED && versionMessage.parameters[0] == version) { |
|
99 subProcess->write({ExecMsg::RELATION_START}); |
|
100 subProcess->write({ExecMsg::INPUT_ATTRIBUTE_METADATA, L"path", L"string"}); |
|
101 for (string_t alias : field.getAliases()) subProcess->write({ExecMsg::OUTPUT_ATTRIBUTE_ALIAS, alias}); |
|
102 for (int i = 0; i < field.options.size();) subProcess->write({ExecMsg::OPTION, field.options[i++], field.options[i++]}); |
|
103 subProcess->write({ExecMsg::WAITING_FOR_OUTPUT_ATTRIBUTES_METADATA}); |
|
104 |
|
105 vector<AttributeMetadata> metadata; |
|
106 while (true) { |
|
107 SubProcess::Message m = subProcess->read(); |
|
108 if (m.code == ExecMsg::OUTPUT_ATTRIBUTE_METADATA) metadata.push_back({m.parameters[0], writer->toTypeId(m.parameters[1])}); |
|
109 else if (m.code == ExecMsg::WAITING_FOR_INPUT_ATTRIBUTES) break; |
|
110 } |
|
111 |
|
112 cachedMetadata[field.id] = metadata; |
|
113 return metadata; |
|
114 } else { |
|
115 throw RelpipeWriterException(L"Incompatible exec sub-process version or message: " + versionMessage.toString()); |
|
116 } |
|
117 } |
|
118 } else { |
|
119 return {}; |
|
120 } |
|
121 } |
|
122 |
|
123 virtual ~ExecAttributeFinder() override { |
|
124 for (auto s : subProcesses) { |
|
125 try { |
|
126 s.second->write({ExecMsg::RELATION_END}); |
|
127 s.second->wait(); |
|
128 } catch (...) { |
|
129 std::wcerr << L"Exception caught during closing sub-process #" + std::to_wstring(s.first) + L" and waiting for its end." << std::endl; |
|
130 } |
|
131 } |
|
132 } |
|
133 }; |
|
134 |
|
135 const relpipe::writer::string_t ExecAttributeFinder::SCRIPT_PREFIX = L"__relpipe_in_filesystem_script_"; |
|
136 |
|
137 } |
|
138 } |
|
139 } |
|