diff -r c40a241d6e0c -r 4679f67a8324 src/ParallelFilesystemCommand.h --- a/src/ParallelFilesystemCommand.h Tue Jan 21 00:19:56 2020 +0100 +++ b/src/ParallelFilesystemCommand.h Fri Jan 24 16:53:31 2020 +0100 @@ -16,9 +16,12 @@ */ #pragma once +#include #include #include #include +#include +#include #include "FilesystemCommand.h" @@ -31,6 +34,12 @@ class MQ { protected: + /** + * Process where this object was created. + * During fork() this object is copied. + * Using this variable we can detect the copy. + */ + __pid_t originalPid; std::string name; mqd_t handle; bool unlinkAfterClose; @@ -70,13 +79,14 @@ } }; - MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) { + MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : originalPid(getpid()), name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) { if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ."); + // FIXME: sometimes we got this error, especially with higher process counts like: --parallel 50 } virtual ~MQ() { mq_close(handle); - if (unlinkAfterClose) mq_unlink(name.c_str()); + if (unlinkAfterClose && originalPid == getpid()) mq_unlink(name.c_str()); } MQ(const MQ& other) = delete; @@ -107,7 +117,7 @@ } public: - MQWriter(std::string name) : MQ(name, mq_open(name.c_str(), O_RDWR | O_CREAT, S_IRWXU, getAttributes()), true) { + MQWriter(std::string name) : MQ(name, mq_open(name.c_str(), O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, getAttributes()), true) { } void send(const Message* m, unsigned int priority = 0) { @@ -117,44 +127,220 @@ } }; +/** + * TODO: move to a common/streamlet library + */ +class NamedMutex { +private: + /** + * Process where this object was created. + * During fork() this object is copied. + * Using this variable we can detect the copy. + */ + __pid_t originalPid; + sem_t* handle; + std::string name; + bool owner; +public: + + NamedMutex(std::string name) : originalPid(getpid()), name(name) { + handle = sem_open(name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR); + sem_post(handle); + } + + ~NamedMutex() { + sem_close(handle); + if (originalPid == getpid()) sem_unlink(name.c_str()); + } + + NamedMutex(const NamedMutex&) = delete; + NamedMutex& operator=(const NamedMutex&) = delete; + + void lock() { + sem_wait(handle); + } + + void unlock() { + sem_post(handle); + } + + void disown() { + owner = false; + } +}; + class ParallelFilesystemWorker { +private: + std::string queueName; + NamedMutex& stdoutMutex; + string_t relationName; + std::map> attributeFinders; + Configuration& configuration; + std::wstring_convert < codecvt_utf8> convertor; // TODO: support also other encodings. +public: + + ParallelFilesystemWorker(std::string queueName, NamedMutex& stdoutMutex, string_t relationName, std::map > attributeFinders, Configuration& configuration) : queueName(queueName), stdoutMutex(stdoutMutex), relationName(relationName), attributeFinders(attributeFinders), configuration(configuration) { + } + + void run() { + MQ::Message readBuffer; + MQReader mq(queueName.c_str()); + + for (bool running = true; running;) { + mq.receive(&readBuffer); + std::wstringstream debugLog; + if (readBuffer.type == MQ::Message::Type::END) { + debugLog << L"PID: " << getpid() << L" received END message: „" << convertor.from_bytes(readBuffer.getStringData()) << L"“" << std::endl; + running = false; + } else if (readBuffer.type == MQ::Message::Type::FILENAME) { + debugLog << L"PID: " << getpid() << L" received FILENAME message: „" << convertor.from_bytes(readBuffer.getStringData()) << L"“" << std::endl; + } else { + debugLog << L"PID: " << getpid() << L" received unexpected message: „" << convertor.from_bytes(readBuffer.getStringData()) << L"“ of type: " << ((int) readBuffer.type) << std::endl; + } + + + { + std::lock_guard lock(stdoutMutex); + std::wcerr << debugLog.str() << std::flush; + } + debugLog.str(L""); + debugLog.clear(); + } + + } + }; class ParallelFilesystemProcess { +private: + __pid_t subPid; + + ParallelFilesystemProcess(__pid_t subPid) : subPid(subPid) { + } + + /** + * TODO: move to a common library (copied from the AWK module) + */ + static void redirectFD(int oldfd, int newfd) { + int result = dup2(oldfd, newfd); + if (result < 0) throw ParallelFilesystemProcess::Exception(L"Unable redirect FD."); + } + + /** + * TODO: move to a common library (copied from the AWK module) + */ + static void closeOrThrow(int fd) { + int error = close(fd); + if (error) throw ParallelFilesystemProcess::Exception(L"Unable to close FD: " + std::to_wstring(fd) + L" from PID: " + std::to_wstring(getpid())); + } + +public: + + class Exception : public relpipe::writer::RelpipeWriterException { + public: + + Exception(std::wstring message) : relpipe::writer::RelpipeWriterException(message) { + } + + }; + + static ParallelFilesystemProcess* create(std::string queueName, int outputFD, NamedMutex& stdoutMutex, string_t relationName, std::map> attributeFinders, Configuration& configuration) { + __pid_t subPid = fork(); + + if (subPid < 0) { + throw SubProcess::Exception(L"Unable to fork the hash process."); + } else if (subPid == 0) { + // Child process + closeOrThrow(STDIN_FILENO); // strace -cf will show failed close() calls (same as number of processes) + if (outputFD != STDOUT_FILENO) redirectFD(outputFD, STDOUT_FILENO); + ParallelFilesystemWorker w(queueName, stdoutMutex, relationName, attributeFinders, configuration); + w.run(); + return nullptr; + } else { + // Parent process + return new ParallelFilesystemProcess(subPid); + } + } + + int wait() { + int status = -1; + ::waitpid(subPid, &status, 0); + return status; + } + + __pid_t getPid() const { + return subPid; + } + }; class ParallelFilesystemCommand : public FilesystemCommand { public: void process(int inputFD, int outputFD, Configuration& configuration) { - // TODO: ParallelFilesystemCommand - - { // TODO: demo code – remove: - std::string queueName = "/relpipe-in-filesystem_parallel_" + std::to_string(getpid()); + __gnu_cxx::stdio_filebuf inputBuffer(inputFD, std::ios::in); + __gnu_cxx::stdio_filebuf outputBuffer(outputFD, std::ios::out); + std::istream input(&inputBuffer); + std::ostream output(&outputBuffer); - MQWriter mqWriter(queueName.c_str()); - MQReader mqReader(queueName.c_str()); - - MQ::Message writeBuffer; - MQ::Message readBuffer; + // Write relation header: + string_t relationName = fetchRelationName(&configuration); + std::map> attributeFinders = createAttributeFinders(); + std::shared_ptr writer(Factory::create(output)); + writeHeader(writer.get(), attributeFinders, relationName, &configuration.fields); + output.flush(); - // ::memset(&writeBuffer, 0, sizeof (writeBuffer)); - // ::memset(&readBuffer, 0, sizeof (readBuffer)); + // Create queue: + std::string queueName = "/relpipe-in-filesystem_parallel_" + std::to_string(getpid()); + MQWriter mq(queueName.c_str()); + MQ::Message writeBuffer; - writeBuffer.type = MQ::Message::Type::END; - writeBuffer.setStringData("ahoj"); + // Create lock for STDOUT synchronization: + NamedMutex stdoutMutex(queueName); - mqWriter.send(&writeBuffer); - - mqReader.receive(&readBuffer); - - std::string readData(readBuffer.data, readBuffer.dataLength); - std::wstring_convert < codecvt_utf8> convertor; - - std::wcerr << L"Zpráva „" << convertor.from_bytes(readData).c_str() << L"“ typu " << (int) readBuffer.type << L" o celkové délce " << readBuffer.getMessageLength() << L" a délce dat " << readBuffer.dataLength << L" byla přijata." << std::endl; + // Start workers: + std::vector> workerProcesses; + bool inMainProcess = true; + for (int i = 0; i < configuration.parallelism; i++) { + std::shared_ptr workerProcess(ParallelFilesystemProcess::create(queueName, outputFD, stdoutMutex, relationName, createAttributeFinders(), configuration)); + if (workerProcess) { + workerProcesses.push_back(workerProcess); + } else { + inMainProcess = false; + break; + } } - throw RelpipeWriterException(L"ParallelFilesystemCommand is not yet implemented"); + if (inMainProcess) { + // Distribute file names to the workers: + for (std::stringstream originalName; readNext(input, originalName); reset(originalName)) { + writeBuffer.type = MQ::Message::Type::FILENAME; + writeBuffer.setStringData(originalName.str()); + mq.send(&writeBuffer); + } + + // Tell workers that everything is done: + writeBuffer.type = MQ::Message::Type::END; + writeBuffer.setStringData(""); + for (int i = 0; i < configuration.parallelism; i++) mq.send(&writeBuffer); + + // Wait for workers exit: + std::map<__pid_t, int> failedProcesses; + for (std::shared_ptr p : workerProcesses) { + int result = p->wait(); + if (result) failedProcesses[p->getPid()] = result; + } + + if (failedProcesses.size()) { + std::wstringstream errorMessage; + errorMessage << L"One or more processes failed: "; + for (auto failed : failedProcesses) errorMessage << failed.first << L":" << failed.second << L", "; + throw ParallelFilesystemProcess::Exception(errorMessage.str()); + } + + } else { + // we are in a worker process → do nothing, finished + } } };