diff -r 7471529c0d11 -r bb7ca5891755 src/ParallelFilesystemCommand.h --- a/src/ParallelFilesystemCommand.h Fri Jan 24 21:05:10 2020 +0100 +++ b/src/ParallelFilesystemCommand.h Sat Jan 25 16:37:20 2020 +0100 @@ -46,6 +46,20 @@ static const size_t MAX_DATA_LENGTH = PATH_MAX; // TODO: maybe more or configurable static const __syscall_slong_t MAX_MESSAGES = 10; // TODO: maybe configurable or derived from the process count + template static mqd_t mqOpen(const char *__name, int __oflag, Args... args) { + mqd_t handle = mq_open(__name, __oflag, args...); + if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ."); + else return handle; + } + + /** + * @param name + * @param handle do not call mq_open() directly, use MQ:mqOpen() instead. + * @param unlinkAfterClose + */ + MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : originalPid(getpid()), name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) { + } + public: class Message { @@ -79,11 +93,6 @@ } }; - MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : originalPid(getpid()), name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) { - if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ."); - // TODO: factory method - } - virtual ~MQ() { mq_close(handle); if (unlinkAfterClose && originalPid == getpid()) mq_unlink(name.c_str()); @@ -96,7 +105,7 @@ class MQReader : public MQ { public: - MQReader(std::string name) : MQ(name, mq_open(name.c_str(), O_RDONLY)) { + MQReader(std::string name) : MQ(name, mqOpen(name.c_str(), O_RDONLY)) { } void receive(Message* m) { @@ -117,7 +126,7 @@ } public: - MQWriter(std::string name) : MQ(name, mq_open(name.c_str(), O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, getAttributes()), true) { + MQWriter(std::string name) : MQ(name, mqOpen(name.c_str(), O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, getAttributes()), true) { } void send(const Message* m, unsigned int priority = 0) { @@ -140,12 +149,23 @@ __pid_t originalPid; sem_t* handle; std::string name; + + NamedMutex() { + } + public: - NamedMutex(std::string name) : originalPid(getpid()), name(name) { - handle = sem_open(name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR); - sem_post(handle); - // TODO: factory method, check errors + static NamedMutex* create(std::string name) { + sem_t* handle = sem_open(name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR); + if (handle == SEM_FAILED) throw RelpipeWriterException(L"Unable to open POSIX semaphore."); + + NamedMutex* result = new NamedMutex(); + result->name = name; + result->handle = handle; + result->originalPid = getpid(); + result->unlock(); + + return result; } ~NamedMutex() { @@ -157,11 +177,13 @@ NamedMutex& operator=(const NamedMutex&) = delete; void lock() { - sem_wait(handle); + int error = sem_wait(handle); + if (error) throw RelpipeWriterException(L"Unable to lock POSIX semaphore."); } void unlock() { - sem_post(handle); + int error = sem_post(handle); + if (error) throw RelpipeWriterException(L"Unable to unlock POSIX semaphore."); } }; @@ -210,7 +232,7 @@ } void process(int inputFD, int outputFD, Configuration& configuration) override { - // TODO: refactoring, not used + // FIXME: refactoring, not used } }; @@ -300,13 +322,13 @@ MQ::Message writeBuffer; // Create lock for STDOUT synchronization: - NamedMutex stdoutMutex(queueName); + std::unique_ptr stdoutMutex(NamedMutex::create(queueName)); // Start workers: std::vector> workerProcesses; bool inMainProcess = true; for (int i = 0; i < configuration.parallelism; i++) { - std::shared_ptr workerProcess(ParallelFilesystemProcess::create(queueName, outputFD, stdoutMutex, relationName, createAttributeFinders(), configuration)); + std::shared_ptr workerProcess(ParallelFilesystemProcess::create(queueName, outputFD, *stdoutMutex, relationName, createAttributeFinders(), configuration)); if (workerProcess) { workerProcesses.push_back(workerProcess); } else {