41 private: |
42 private: |
// Converter between UTF-8 encoded byte strings and wide (wchar_t) strings;
// used in this class via from_bytes() on values read from environment variables.
42 std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings. |
43 std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings. |
// Sub-processes created through SubProcess::create(), stored under the id of the requested field.
43 std::map<int, std::shared_ptr<SubProcess>> subProcesses; |
44 std::map<int, std::shared_ptr<SubProcess>> subProcesses; |
// Attribute metadata already obtained for a field, looked up by field id
// so that a cached entry is returned instead of starting another sub-process.
44 std::map<int, std::vector<AttributeMetadata>> cachedMetadata; |
45 std::map<int, std::vector<AttributeMetadata>> cachedMetadata; |
45 |
46 |
46 string_t getStreamletPath() { |
47 string_t getStreamletCommand(const RequestedField& field) { |
47 const char* originalPath = getenv("PATH"); |
48 const char* streamletPathChars = getenv("RELPIPE_IN_FILESYSTEM_STREAMLET_PATH"); |
48 const char* streamletPath = getenv("RELPIPE_IN_FILESYSTEM_STREAMLET_PATH"); |
49 if (streamletPathChars) { |
|
50 std::wstringstream current; |
|
51 string_t streamletPath = convertor.from_bytes(streamletPathChars); |
|
52 for (int i = 0, streamletPathSize = streamletPath.size(); i < streamletPathSize; i++) { |
|
53 if (streamletPath[i] == ':' || i == streamletPathSize - 1) { // FIXME: support \: and \\ escaping |
|
54 current << L"/" << field.name; |
|
55 fs::path streamletFile(current.str()); |
|
56 if (fs::exists(streamletFile)) { // FIXME: check executable bit |
|
57 return current.str(); |
|
58 } else { |
|
59 current.str(L""); |
|
60 current.clear(); |
|
61 } |
|
62 } else { |
|
63 current.put(streamletPath[i]); |
|
64 } |
|
65 } |
|
66 throw RelpipeWriterException(L"Streamlet „" + field.name + L"“ was not found at $RELPIPE_IN_FILESYSTEM_STREAMLET_PATH"); |
|
67 } else { |
|
68 throw RelpipeWriterException(L"Missing environment variable RELPIPE_IN_FILESYSTEM_STREAMLET_PATH → unable to find streamlet."); |
|
69 } |
|
70 } |
49 |
71 |
50 if (originalPath && streamletPath) return convertor.from_bytes(std::string(streamletPath) + ":" + originalPath); |
|
51 else if (originalPath) return convertor.from_bytes(std::string(originalPath)); |
|
52 else if (streamletPath) return convertor.from_bytes(std::string(streamletPath)); |
|
53 else return L""; |
|
54 } |
|
55 protected: |
72 protected: |
56 |
73 |
57 void startFile(const fs::path& file, const string& fileRaw, bool exists) override { |
74 void startFile(const fs::path& file, const string& fileRaw, bool exists) override { |
58 AttributeFinder::startFile(file, fileRaw, exists); |
75 AttributeFinder::startFile(file, fileRaw, exists); |
59 if (exists) { |
76 if (exists) { |
88 |
105 |
89 if (cachedMetadata.count(field.id)) { |
106 if (cachedMetadata.count(field.id)) { |
90 return cachedMetadata[field.id]; |
107 return cachedMetadata[field.id]; |
91 } else { |
108 } else { |
92 |
109 |
93 std::vector<string_t> commandLine = {field.name}; |
110 std::vector<string_t> commandLine = {getStreamletCommand(field)}; |
94 std::map<string_t, string_t> environment; |
111 std::map<string_t, string_t> environment; |
95 |
112 |
96 for (auto mn : StreamletMsg::getMessageNames()) { |
113 for (auto mn : StreamletMsg::getMessageNames()) { |
97 environment[L"EXEC_MSG_" + mn.second] = std::to_wstring(mn.first); |
114 environment[L"EXEC_MSG_" + mn.second] = std::to_wstring(mn.first); |
98 environment[L"EXEC_MSG_" + std::to_wstring(mn.first)] = mn.second; |
115 environment[L"EXEC_MSG_" + std::to_wstring(mn.first)] = mn.second; |
99 } |
116 } |
100 |
|
101 environment[L"PATH"] = getStreamletPath(); |
|
102 |
117 |
103 shared_ptr<SubProcess> subProcess(SubProcess::create(commandLine, environment)); |
118 shared_ptr<SubProcess> subProcess(SubProcess::create(commandLine, environment)); |
104 subProcesses[field.id] = subProcess; |
119 subProcesses[field.id] = subProcess; |
105 |
120 |
106 string_t version = L"1"; |
121 string_t version = L"1"; |