# HG changeset patch # User František Kučera # Date 1579966640 -3600 # Node ID bb7ca589175570e25674f06a35c3c97aa1e5d756 # Parent 7471529c0d113098013d640917372acea880100f parallel processing: refactoring and clean-up diff -r 7471529c0d11 -r bb7ca5891755 nbproject/configurations.xml --- a/nbproject/configurations.xml Fri Jan 24 21:05:10 2020 +0100 +++ b/nbproject/configurations.xml Sat Jan 25 16:37:20 2020 +0100 @@ -69,8 +69,6 @@ - @@ -80,6 +78,13 @@ ${MAKE} -f Makefile ${MAKE} -f Makefile clean build/Debug/src/relpipe-in-filesystem + + + ../relpipe-lib-writer.cpp/include + ../relpipe-lib-cli.cpp/include + build/Debug/src + + build/Debug @@ -88,22 +93,11 @@ - - - ../relpipe-lib-writer.cpp/include/relpipe/writer - src - ../relpipe-lib-writer.cpp/include - build/Debug/src - + - - ../relpipe-lib-writer.cpp/include - ../relpipe-lib-cli.cpp/include - build/Debug/src - diff -r 7471529c0d11 -r bb7ca5891755 src/FileAttributeFinder.h --- a/src/FileAttributeFinder.h Fri Jan 24 21:05:10 2020 +0100 +++ b/src/FileAttributeFinder.h Sat Jan 25 16:37:20 2020 +0100 @@ -55,10 +55,9 @@ } void fetchOwner(const fs::path& file, string_t& owner, string_t& group) { - // TODO: throw exception on error - // TODO: get user and group in C++ way? struct stat info; - stat(file.c_str(), &info); + int result = ::stat(file.c_str(), &info); + if (result) throw RelpipeWriterException(L"Unable to stat() file „" + file.wstring() + L"“ in fetchOwner(). Result: " + std::to_wstring(result)); /** * The return value may point to a static area, and may be * overwritten by subsequent calls to getpwent(3), getpw‐ diff -r 7471529c0d11 -r bb7ca5891755 src/ParallelFilesystemCommand.h --- a/src/ParallelFilesystemCommand.h Fri Jan 24 21:05:10 2020 +0100 +++ b/src/ParallelFilesystemCommand.h Sat Jan 25 16:37:20 2020 +0100 @@ -46,6 +46,20 @@ static const size_t MAX_DATA_LENGTH = PATH_MAX; // TODO: maybe more or configurable static const __syscall_slong_t MAX_MESSAGES = 10; // TODO: maybe configurable or derived from the process count + template static mqd_t mqOpen(const char *__name, int __oflag, Args... args) { + mqd_t handle = mq_open(__name, __oflag, args...); + if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ."); + else return handle; + } + + /** + * @param name + * @param handle do not call mq_open() directly, use MQ:mqOpen() instead. + * @param unlinkAfterClose + */ + MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : originalPid(getpid()), name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) { + } + public: class Message { @@ -79,11 +93,6 @@ } }; - MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : originalPid(getpid()), name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) { - if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ."); - // TODO: factory method - } - virtual ~MQ() { mq_close(handle); if (unlinkAfterClose && originalPid == getpid()) mq_unlink(name.c_str()); @@ -96,7 +105,7 @@ class MQReader : public MQ { public: - MQReader(std::string name) : MQ(name, mq_open(name.c_str(), O_RDONLY)) { + MQReader(std::string name) : MQ(name, mqOpen(name.c_str(), O_RDONLY)) { } void receive(Message* m) { @@ -117,7 +126,7 @@ } public: - MQWriter(std::string name) : MQ(name, mq_open(name.c_str(), O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, getAttributes()), true) { + MQWriter(std::string name) : MQ(name, mqOpen(name.c_str(), O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, getAttributes()), true) { } void send(const Message* m, unsigned int priority = 0) { @@ -140,12 +149,23 @@ __pid_t originalPid; sem_t* handle; std::string name; + + NamedMutex() { + } + public: - NamedMutex(std::string name) : originalPid(getpid()), name(name) { - handle = sem_open(name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR); - sem_post(handle); - // TODO: factory method, check errors + static NamedMutex* create(std::string name) { + sem_t* handle = sem_open(name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR); + if (handle == SEM_FAILED) throw RelpipeWriterException(L"Unable to open POSIX semaphore."); + + NamedMutex* result = new NamedMutex(); + result->name = name; + result->handle = handle; + result->originalPid = getpid(); + result->unlock(); + + return result; } ~NamedMutex() { @@ -157,11 +177,13 @@ NamedMutex& operator=(const NamedMutex&) = delete; void lock() { - sem_wait(handle); + int error = sem_wait(handle); + if (error) throw RelpipeWriterException(L"Unable to lock POSIX semaphore."); } void unlock() { - sem_post(handle); + int error = sem_post(handle); + if (error) throw RelpipeWriterException(L"Unable to unlock POSIX semaphore."); } }; @@ -210,7 +232,7 @@ } void process(int inputFD, int outputFD, Configuration& configuration) override { - // TODO: refactoring, not used + // FIXME: refactoring, not used } }; @@ -300,13 +322,13 @@ MQ::Message writeBuffer; // Create lock for STDOUT synchronization: - NamedMutex stdoutMutex(queueName); + std::unique_ptr stdoutMutex(NamedMutex::create(queueName)); // Start workers: std::vector> workerProcesses; bool inMainProcess = true; for (int i = 0; i < configuration.parallelism; i++) { - std::shared_ptr workerProcess(ParallelFilesystemProcess::create(queueName, outputFD, stdoutMutex, relationName, createAttributeFinders(), configuration)); + std::shared_ptr workerProcess(ParallelFilesystemProcess::create(queueName, outputFD, *stdoutMutex, relationName, createAttributeFinders(), configuration)); if (workerProcess) { workerProcesses.push_back(workerProcess); } else { diff -r 7471529c0d11 -r bb7ca5891755 src/StreamletAttributeFinder.h --- a/src/StreamletAttributeFinder.h Fri Jan 24 21:05:10 2020 +0100 +++ b/src/StreamletAttributeFinder.h Sat Jan 25 16:37:20 2020 +0100 @@ -95,7 +95,6 @@ } virtual void writeFieldOfExistingFile(RelationalWriter* writer, const string_t& relationName, const RequestedField & field) override { - // TODO: paralelize also over records → fork multiple processes and distribute records across them; then collect results (with a lock) if (field.group == RequestedField::GROUP_STREAMLET) { for (auto metadata : cachedMetadata[field.id]) { SubProcess::Message m = subProcesses[field.id]->read(); @@ -105,7 +104,7 @@ SubProcess::Message m = subProcesses[field.id]->read(); if (m.code != StreamletMsg::WAITING_FOR_INPUT_ATTRIBUTES) throw RelpipeWriterException(L"Protocol violation from exec sub-process. Expected WAITING_FOR_INPUT_ATTRIBUTES but got: " + m.toString()); - // TODO: generic protocol violation error messages / method for checking responses + // FIXME: generic protocol violation error messages / method for checking responses } } diff -r 7471529c0d11 -r bb7ca5891755 src/SubProcess.h --- a/src/SubProcess.h Fri Jan 24 21:05:10 2020 +0100 +++ b/src/SubProcess.h Sat Jan 25 16:37:20 2020 +0100 @@ -25,7 +25,7 @@ #include /** - * TODO: move to a separate library → can be used later also in relpipe-tr-exec + * TODO: move to a separate library → can be used later also in relpipe-tr-streamlet */ class SubProcess { public: