src/StreamletAttributeFinder.h
branchv_0
changeset 46 b5ae61996281
parent 45 f466b4c7d9b1
child 47 beefddde951e
equal deleted inserted replaced
45:f466b4c7d9b1 46:b5ae61996281
    18 
    18 
    19 #include <vector>
    19 #include <vector>
    20 #include <filesystem>
    20 #include <filesystem>
    21 #include <regex>
    21 #include <regex>
    22 #include <memory>
    22 #include <memory>
       
    23 #include <sstream>
    23 
    24 
    24 #include <relpipe/writer/typedefs.h>
    25 #include <relpipe/writer/typedefs.h>
    25 #include <relpipe/writer/AttributeMetadata.h>
    26 #include <relpipe/writer/AttributeMetadata.h>
    26 #include <relpipe/writer/RelationalWriter.h>
    27 #include <relpipe/writer/RelationalWriter.h>
    27 
    28 
    41 private:
    42 private:
        // UTF-8 <-> wide string converter; from_bytes() is used in this class to turn
        // values obtained from getenv() into wide strings.
    42 	std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings.
    43 	std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings.
        // Sub-processes started for streamlets; entries are stored under field.id.
    43 	std::map<int, std::shared_ptr<SubProcess>> subProcesses;
    44 	std::map<int, std::shared_ptr<SubProcess>> subProcesses;
        // Attribute metadata looked up by field.id; when an entry exists it is returned
        // instead of starting the streamlet again.
    44 	std::map<int, std::vector<AttributeMetadata>> cachedMetadata;
    45 	std::map<int, std::vector<AttributeMetadata>> cachedMetadata;
    45 
    46 
    46 	string_t getStreamletPath() {
    47 	string_t getStreamletCommand(const RequestedField& field) {
    47 		const char* originalPath = getenv("PATH");
    48 		const char* streamletPathChars = getenv("RELPIPE_IN_FILESYSTEM_STREAMLET_PATH");
    48 		const char* streamletPath = getenv("RELPIPE_IN_FILESYSTEM_STREAMLET_PATH");
    49 		if (streamletPathChars) {
       
    50 			std::wstringstream current;
       
    51 			string_t streamletPath = convertor.from_bytes(streamletPathChars);
       
    52 			for (int i = 0, streamletPathSize = streamletPath.size(); i < streamletPathSize; i++) {
       
    53 				if (streamletPath[i] == ':' || i == streamletPathSize - 1) { // FIXME: support \: and \\ escaping
       
    54 					current << L"/" << field.name;
       
    55 					fs::path streamletFile(current.str());
       
    56 					if (fs::exists(streamletFile)) { // FIXME: check executable bit
       
    57 						return current.str();
       
    58 					} else {
       
    59 						current.str(L"");
       
    60 						current.clear();
       
    61 					}
       
    62 				} else {
       
    63 					current.put(streamletPath[i]);
       
    64 				}
       
    65 			}
       
    66 			throw RelpipeWriterException(L"Streamlet „" + field.name + L"“ was not found at $RELPIPE_IN_FILESYSTEM_STREAMLET_PATH");
       
    67 		} else {
       
    68 			throw RelpipeWriterException(L"Missing environment variable RELPIPE_IN_FILESYSTEM_STREAMLET_PATH → unable to find streamlet.");
       
    69 		}
       
    70 	}
    49 
    71 
    50 		if (originalPath && streamletPath) return convertor.from_bytes(std::string(streamletPath) + ":" + originalPath);
       
    51 		else if (originalPath) return convertor.from_bytes(std::string(originalPath));
       
    52 		else if (streamletPath) return convertor.from_bytes(std::string(streamletPath));
       
    53 		else return L"";
       
    54 	}
       
    55 protected:
    72 protected:
    56 
    73 
    57 	void startFile(const fs::path& file, const string& fileRaw, bool exists) override {
    74 	void startFile(const fs::path& file, const string& fileRaw, bool exists) override {
    58 		AttributeFinder::startFile(file, fileRaw, exists);
    75 		AttributeFinder::startFile(file, fileRaw, exists);
    59 		if (exists) {
    76 		if (exists) {
    88 
   105 
    89 			if (cachedMetadata.count(field.id)) {
   106 			if (cachedMetadata.count(field.id)) {
    90 				return cachedMetadata[field.id];
   107 				return cachedMetadata[field.id];
    91 			} else {
   108 			} else {
    92 
   109 
    93 				std::vector<string_t> commandLine = {field.name};
   110 				std::vector<string_t> commandLine = {getStreamletCommand(field)};
    94 				std::map<string_t, string_t> environment;
   111 				std::map<string_t, string_t> environment;
    95 
   112 
    96 				for (auto mn : StreamletMsg::getMessageNames()) {
   113 				for (auto mn : StreamletMsg::getMessageNames()) {
    97 					environment[L"EXEC_MSG_" + mn.second] = std::to_wstring(mn.first);
   114 					environment[L"EXEC_MSG_" + mn.second] = std::to_wstring(mn.first);
    98 					environment[L"EXEC_MSG_" + std::to_wstring(mn.first)] = mn.second;
   115 					environment[L"EXEC_MSG_" + std::to_wstring(mn.first)] = mn.second;
    99 				}
   116 				}
   100 
       
   101 				environment[L"PATH"] = getStreamletPath();
       
   102 
   117 
   103 				shared_ptr<SubProcess> subProcess(SubProcess::create(commandLine, environment));
   118 				shared_ptr<SubProcess> subProcess(SubProcess::create(commandLine, environment));
   104 				subProcesses[field.id] = subProcess;
   119 				subProcesses[field.id] = subProcess;
   105 
   120 
   106 				string_t version = L"1";
   121 				string_t version = L"1";