105 attributes.mq_msgsize = sizeof (Message); |
115 attributes.mq_msgsize = sizeof (Message); |
106 return &attributes; |
116 return &attributes; |
107 } |
117 } |
108 public: |
118 public: |
109 |
119 |
110 MQWriter(std::string name) : MQ(name, mq_open(name.c_str(), O_RDWR | O_CREAT, S_IRWXU, getAttributes()), true) { |
120 MQWriter(std::string name) : MQ(name, mq_open(name.c_str(), O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, getAttributes()), true) { |
111 } |
121 } |
112 |
122 |
113 void send(const Message* m, unsigned int priority = 0) { |
123 void send(const Message* m, unsigned int priority = 0) { |
114 m->checkDataLength(); |
124 m->checkDataLength(); |
115 int result = mq_send(handle, (const char*) m, m->getMessageLength(), priority); |
125 int result = mq_send(handle, (const char*) m, m->getMessageLength(), priority); |
116 if (result) throw RelpipeWriterException(L"Unable to send POSIX MQ message."); |
126 if (result) throw RelpipeWriterException(L"Unable to send POSIX MQ message."); |
117 } |
127 } |
118 }; |
128 }; |
119 |
129 |
|
130 /** |
|
131 * TODO: move to a common/streamlet library |
|
132 */ |
|
133 class NamedMutex { |
|
134 private: |
|
135 /** |
|
136 * Process where this object was created. |
|
137 * During fork() this object is copied. |
|
138 * Using this variable we can detect the copy. |
|
139 */ |
|
140 __pid_t originalPid; |
|
141 sem_t* handle; |
|
142 std::string name; |
|
143 bool owner; |
|
144 public: |
|
145 |
|
146 NamedMutex(std::string name) : originalPid(getpid()), name(name) { |
|
147 handle = sem_open(name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR); |
|
148 sem_post(handle); |
|
149 } |
|
150 |
|
151 ~NamedMutex() { |
|
152 sem_close(handle); |
|
153 if (originalPid == getpid()) sem_unlink(name.c_str()); |
|
154 } |
|
155 |
|
156 NamedMutex(const NamedMutex&) = delete; |
|
157 NamedMutex& operator=(const NamedMutex&) = delete; |
|
158 |
|
159 void lock() { |
|
160 sem_wait(handle); |
|
161 } |
|
162 |
|
163 void unlock() { |
|
164 sem_post(handle); |
|
165 } |
|
166 |
|
167 void disown() { |
|
168 owner = false; |
|
169 } |
|
170 }; |
|
171 |
120 class ParallelFilesystemWorker { |
172 class ParallelFilesystemWorker { |
|
173 private: |
|
174 std::string queueName; |
|
175 NamedMutex& stdoutMutex; |
|
176 string_t relationName; |
|
177 std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders; |
|
178 Configuration& configuration; |
|
179 std::wstring_convert < codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings. |
|
180 public: |
|
181 |
|
182 ParallelFilesystemWorker(std::string queueName, NamedMutex& stdoutMutex, string_t relationName, std::map<string_t, std::shared_ptr<AttributeFinder> > attributeFinders, Configuration& configuration) : queueName(queueName), stdoutMutex(stdoutMutex), relationName(relationName), attributeFinders(attributeFinders), configuration(configuration) { |
|
183 } |
|
184 |
|
185 void run() { |
|
186 MQ::Message readBuffer; |
|
187 MQReader mq(queueName.c_str()); |
|
188 |
|
189 for (bool running = true; running;) { |
|
190 mq.receive(&readBuffer); |
|
191 std::wstringstream debugLog; |
|
192 if (readBuffer.type == MQ::Message::Type::END) { |
|
193 debugLog << L"PID: " << getpid() << L" received END message: „" << convertor.from_bytes(readBuffer.getStringData()) << L"“" << std::endl; |
|
194 running = false; |
|
195 } else if (readBuffer.type == MQ::Message::Type::FILENAME) { |
|
196 debugLog << L"PID: " << getpid() << L" received FILENAME message: „" << convertor.from_bytes(readBuffer.getStringData()) << L"“" << std::endl; |
|
197 } else { |
|
198 debugLog << L"PID: " << getpid() << L" received unexpected message: „" << convertor.from_bytes(readBuffer.getStringData()) << L"“ of type: " << ((int) readBuffer.type) << std::endl; |
|
199 } |
|
200 |
|
201 |
|
202 { |
|
203 std::lock_guard lock(stdoutMutex); |
|
204 std::wcerr << debugLog.str() << std::flush; |
|
205 } |
|
206 debugLog.str(L""); |
|
207 debugLog.clear(); |
|
208 } |
|
209 |
|
210 } |
|
211 |
121 }; |
212 }; |
122 |
213 |
123 class ParallelFilesystemProcess { |
214 class ParallelFilesystemProcess { |
|
215 private: |
|
216 __pid_t subPid; |
|
217 |
|
218 ParallelFilesystemProcess(__pid_t subPid) : subPid(subPid) { |
|
219 } |
|
220 |
|
221 /** |
|
222 * TODO: move to a common library (copied from the AWK module) |
|
223 */ |
|
224 static void redirectFD(int oldfd, int newfd) { |
|
225 int result = dup2(oldfd, newfd); |
|
226 if (result < 0) throw ParallelFilesystemProcess::Exception(L"Unable redirect FD."); |
|
227 } |
|
228 |
|
229 /** |
|
230 * TODO: move to a common library (copied from the AWK module) |
|
231 */ |
|
232 static void closeOrThrow(int fd) { |
|
233 int error = close(fd); |
|
234 if (error) throw ParallelFilesystemProcess::Exception(L"Unable to close FD: " + std::to_wstring(fd) + L" from PID: " + std::to_wstring(getpid())); |
|
235 } |
|
236 |
|
237 public: |
|
238 |
|
239 class Exception : public relpipe::writer::RelpipeWriterException { |
|
240 public: |
|
241 |
|
242 Exception(std::wstring message) : relpipe::writer::RelpipeWriterException(message) { |
|
243 } |
|
244 |
|
245 }; |
|
246 |
|
247 static ParallelFilesystemProcess* create(std::string queueName, int outputFD, NamedMutex& stdoutMutex, string_t relationName, std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders, Configuration& configuration) { |
|
248 __pid_t subPid = fork(); |
|
249 |
|
250 if (subPid < 0) { |
|
251 throw SubProcess::Exception(L"Unable to fork the hash process."); |
|
252 } else if (subPid == 0) { |
|
253 // Child process |
|
254 closeOrThrow(STDIN_FILENO); // strace -cf will show failed close() calls (same as number of processes) |
|
255 if (outputFD != STDOUT_FILENO) redirectFD(outputFD, STDOUT_FILENO); |
|
256 ParallelFilesystemWorker w(queueName, stdoutMutex, relationName, attributeFinders, configuration); |
|
257 w.run(); |
|
258 return nullptr; |
|
259 } else { |
|
260 // Parent process |
|
261 return new ParallelFilesystemProcess(subPid); |
|
262 } |
|
263 } |
|
264 |
|
265 int wait() { |
|
266 int status = -1; |
|
267 ::waitpid(subPid, &status, 0); |
|
268 return status; |
|
269 } |
|
270 |
|
271 __pid_t getPid() const { |
|
272 return subPid; |
|
273 } |
|
274 |
124 }; |
275 }; |
125 |
276 |
126 class ParallelFilesystemCommand : public FilesystemCommand { |
277 class ParallelFilesystemCommand : public FilesystemCommand { |
127 public: |
278 public: |
128 |
279 |
129 void process(int inputFD, int outputFD, Configuration& configuration) { |
280 void process(int inputFD, int outputFD, Configuration& configuration) { |
130 // TODO: ParallelFilesystemCommand |
281 __gnu_cxx::stdio_filebuf<char> inputBuffer(inputFD, std::ios::in); |
131 |
282 __gnu_cxx::stdio_filebuf<char> outputBuffer(outputFD, std::ios::out); |
132 { // TODO: demo code – remove: |
283 std::istream input(&inputBuffer); |
133 std::string queueName = "/relpipe-in-filesystem_parallel_" + std::to_string(getpid()); |
284 std::ostream output(&outputBuffer); |
134 |
285 |
135 MQWriter mqWriter(queueName.c_str()); |
286 // Write relation header: |
136 MQReader mqReader(queueName.c_str()); |
287 string_t relationName = fetchRelationName(&configuration); |
137 |
288 std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders = createAttributeFinders(); |
138 MQ::Message writeBuffer; |
289 std::shared_ptr<RelationalWriter> writer(Factory::create(output)); |
139 MQ::Message readBuffer; |
290 writeHeader(writer.get(), attributeFinders, relationName, &configuration.fields); |
140 |
291 output.flush(); |
141 // ::memset(&writeBuffer, 0, sizeof (writeBuffer)); |
292 |
142 // ::memset(&readBuffer, 0, sizeof (readBuffer)); |
293 // Create queue: |
143 |
294 std::string queueName = "/relpipe-in-filesystem_parallel_" + std::to_string(getpid()); |
|
295 MQWriter mq(queueName.c_str()); |
|
296 MQ::Message writeBuffer; |
|
297 |
|
298 // Create lock for STDOUT synchronization: |
|
299 NamedMutex stdoutMutex(queueName); |
|
300 |
|
301 // Start workers: |
|
302 std::vector<std::shared_ptr < ParallelFilesystemProcess>> workerProcesses; |
|
303 bool inMainProcess = true; |
|
304 for (int i = 0; i < configuration.parallelism; i++) { |
|
305 std::shared_ptr<ParallelFilesystemProcess> workerProcess(ParallelFilesystemProcess::create(queueName, outputFD, stdoutMutex, relationName, createAttributeFinders(), configuration)); |
|
306 if (workerProcess) { |
|
307 workerProcesses.push_back(workerProcess); |
|
308 } else { |
|
309 inMainProcess = false; |
|
310 break; |
|
311 } |
|
312 } |
|
313 |
|
314 if (inMainProcess) { |
|
315 // Distribute file names to the workers: |
|
316 for (std::stringstream originalName; readNext(input, originalName); reset(originalName)) { |
|
317 writeBuffer.type = MQ::Message::Type::FILENAME; |
|
318 writeBuffer.setStringData(originalName.str()); |
|
319 mq.send(&writeBuffer); |
|
320 } |
|
321 |
|
322 // Tell workers that everything is done: |
144 writeBuffer.type = MQ::Message::Type::END; |
323 writeBuffer.type = MQ::Message::Type::END; |
145 writeBuffer.setStringData("ahoj"); |
324 writeBuffer.setStringData(""); |
146 |
325 for (int i = 0; i < configuration.parallelism; i++) mq.send(&writeBuffer); |
147 mqWriter.send(&writeBuffer); |
326 |
148 |
327 // Wait for workers exit: |
149 mqReader.receive(&readBuffer); |
328 std::map<__pid_t, int> failedProcesses; |
150 |
329 for (std::shared_ptr<ParallelFilesystemProcess> p : workerProcesses) { |
151 std::string readData(readBuffer.data, readBuffer.dataLength); |
330 int result = p->wait(); |
152 std::wstring_convert < codecvt_utf8<wchar_t>> convertor; |
331 if (result) failedProcesses[p->getPid()] = result; |
153 |
332 } |
154 std::wcerr << L"Zpráva „" << convertor.from_bytes(readData).c_str() << L"“ typu " << (int) readBuffer.type << L" o celkové délce " << readBuffer.getMessageLength() << L" a délce dat " << readBuffer.dataLength << L" byla přijata." << std::endl; |
333 |
155 } |
334 if (failedProcesses.size()) { |
156 |
335 std::wstringstream errorMessage; |
157 throw RelpipeWriterException(L"ParallelFilesystemCommand is not yet implemented"); |
336 errorMessage << L"One or more processes failed: "; |
|
337 for (auto failed : failedProcesses) errorMessage << failed.first << L":" << failed.second << L", "; |
|
338 throw ParallelFilesystemProcess::Exception(errorMessage.str()); |
|
339 } |
|
340 |
|
341 } else { |
|
342 // we are in a worker process → do nothing, finished |
|
343 } |
158 } |
344 } |
159 }; |
345 }; |
160 |
346 |
161 } |
347 } |
162 } |
348 } |