src/StreamletAttributeFinder.h
branchv_0
changeset 48 26a8b1a14889
parent 47 beefddde951e
child 51 841845ccf06d
equal deleted inserted replaced
47:beefddde951e 48:26a8b1a14889
    43 private:
    43 private:
    44 	std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings.
    44 	std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings.
    45 	std::map<int, std::shared_ptr<SubProcess>> subProcesses;
    45 	std::map<int, std::shared_ptr<SubProcess>> subProcesses;
    46 	std::map<int, std::vector<AttributeMetadata>> cachedMetadata;
    46 	std::map<int, std::vector<AttributeMetadata>> cachedMetadata;
    47 
    47 
       
    48 	std::vector<string_t> splitBySeparator(const string_t& originalString, const wchar_t separator = L':', const wchar_t escape = L'\\') {
       
    49 		std::vector<string_t> result;
       
    50 		std::wstringstream current;
       
    51 		for (int i = 0, size = originalString.size(); i < size; i++) {
       
    52 			wchar_t ch = originalString[i];
       
    53 			if (ch == escape) {
       
    54 				if (i + 1 < size) {
       
    55 					ch = originalString[++i];
       
    56 					if (ch == separator || ch == escape) current.put(ch);
       
    57 					else RelpipeWriterException(L"Invalid escape sequence at position " + std::to_wstring(i) + L" of: " + originalString);
       
    58 				} else {
       
    59 					throw RelpipeWriterException(L"Invalid use of escape character at the end of: " + originalString);
       
    60 				}
       
    61 			} else if (ch == separator || i + 1 == size) {
       
    62 				if (current.str().size()) result.push_back(current.str());
       
    63 				current.str(L"");
       
    64 				current.clear();
       
    65 			} else {
       
    66 				current.put(ch);
       
    67 			}
       
    68 		}
       
    69 		return result;
       
    70 	}
       
    71 
    48 	string_t getStreamletCommand(const RequestedField& field) {
    72 	string_t getStreamletCommand(const RequestedField& field) {
    49 		const char* streamletPathChars = getenv("RELPIPE_IN_FILESYSTEM_STREAMLET_PATH");
    73 		const char* streamletPathChars = getenv("RELPIPE_IN_FILESYSTEM_STREAMLET_PATH");
    50 		if (streamletPathChars) {
    74 		if (streamletPathChars) {
    51 			std::wstringstream current;
    75 			for (string_t path : splitBySeparator(convertor.from_bytes(streamletPathChars))) {
    52 			string_t streamletPath = convertor.from_bytes(streamletPathChars);
    76 				fs::path file = fs::path(path) / fs::path(field.name);
    53 			for (int i = 0, streamletPathSize = streamletPath.size(); i < streamletPathSize; i++) {
    77 				if (fs::exists(file) && ::access(file.c_str(), X_OK) == 0) return file.wstring();
    54 				if (streamletPath[i] == ':' || i == streamletPathSize - 1) { // FIXME: support \: and \\ escaping
       
    55 					current << L"/" << field.name;
       
    56 					fs::path streamletFile(current.str());
       
    57 					if (fs::exists(streamletFile) && ::access(streamletFile.c_str(), X_OK) == 0) { // n.b. must be set executable using e.g. chmod – files executable through only ACL, are actually not executable
       
    58 						return current.str();
       
    59 					} else {
       
    60 						current.str(L"");
       
    61 						current.clear();
       
    62 					}
       
    63 				} else {
       
    64 					current.put(streamletPath[i]);
       
    65 				}
       
    66 			}
    78 			}
    67 			throw RelpipeWriterException(L"Streamlet „" + field.name + L"“ was not found at $RELPIPE_IN_FILESYSTEM_STREAMLET_PATH");
    79 			throw RelpipeWriterException(L"Streamlet „" + field.name + L"“ was not found at $RELPIPE_IN_FILESYSTEM_STREAMLET_PATH");
    68 		} else {
    80 		} else {
    69 			throw RelpipeWriterException(L"Missing environment variable RELPIPE_IN_FILESYSTEM_STREAMLET_PATH → unable to find streamlet.");
    81 			throw RelpipeWriterException(L"Missing environment variable RELPIPE_IN_FILESYSTEM_STREAMLET_PATH → unable to find streamlet.");
    70 		}
    82 		}
    97 		}
   109 		}
    98 	}
   110 	}
    99 
   111 
   100 public:
   112 public:
   101 
   113 
   102 	static const string_t SCRIPT_PREFIX;
   114 	virtual vector<AttributeMetadata> toMetadata(RelationalWriter* writer, const string_t& relationName, const RequestedField & field) override {
   103 
       
   104 	virtual vector<AttributeMetadata> toMetadata(RelationalWriter* writer, const string_t& relationName, const RequestedField& field) override {
       
   105 		if (field.group == RequestedField::GROUP_STREAMLET) {
   115 		if (field.group == RequestedField::GROUP_STREAMLET) {
   106 
   116 
   107 			if (cachedMetadata.count(field.id)) {
   117 			if (cachedMetadata.count(field.id)) {
   108 				return cachedMetadata[field.id];
   118 				return cachedMetadata[field.id];
   109 			} else {
   119 			} else {
   158 			}
   168 			}
   159 		}
   169 		}
   160 	}
   170 	}
   161 };
   171 };
   162 
   172 
   163 const relpipe::writer::string_t StreamletAttributeFinder::SCRIPT_PREFIX = L"__relpipe_in_filesystem_script_";
       
   164 
       
   165 }
   173 }
   166 }
   174 }
   167 }
   175 }