--- a/src/ParallelFilesystemCommand.h Fri Jan 24 21:05:10 2020 +0100
+++ b/src/ParallelFilesystemCommand.h Sat Jan 25 16:37:20 2020 +0100
@@ -46,6 +46,20 @@
static const size_t MAX_DATA_LENGTH = PATH_MAX; // TODO: maybe more or configurable
static const __syscall_slong_t MAX_MESSAGES = 10; // TODO: maybe configurable or derived from the process count
+ template<typename... Args> static mqd_t mqOpen(const char *__name, int __oflag, Args... args) {
+ mqd_t handle = mq_open(__name, __oflag, args...);
+ if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ.");
+ else return handle;
+ }
+
+ /**
+ * @param name
+ * @param handle do not call mq_open() directly, use MQ:mqOpen() instead.
+ * @param unlinkAfterClose
+ */
+ MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : originalPid(getpid()), name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) {
+ }
+
public:
class Message {
@@ -79,11 +93,6 @@
}
};
- MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : originalPid(getpid()), name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) {
- if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ.");
- // TODO: factory method
- }
-
virtual ~MQ() {
mq_close(handle);
if (unlinkAfterClose && originalPid == getpid()) mq_unlink(name.c_str());
@@ -96,7 +105,7 @@
class MQReader : public MQ {
public:
- MQReader(std::string name) : MQ(name, mq_open(name.c_str(), O_RDONLY)) {
+ MQReader(std::string name) : MQ(name, mqOpen(name.c_str(), O_RDONLY)) {
}
void receive(Message* m) {
@@ -117,7 +126,7 @@
}
public:
- MQWriter(std::string name) : MQ(name, mq_open(name.c_str(), O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, getAttributes()), true) {
+ MQWriter(std::string name) : MQ(name, mqOpen(name.c_str(), O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, getAttributes()), true) {
}
void send(const Message* m, unsigned int priority = 0) {
@@ -140,12 +149,23 @@
__pid_t originalPid;
sem_t* handle;
std::string name;
+
+ NamedMutex() {
+ }
+
public:
- NamedMutex(std::string name) : originalPid(getpid()), name(name) {
- handle = sem_open(name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR);
- sem_post(handle);
- // TODO: factory method, check errors
+ static NamedMutex* create(std::string name) {
+ sem_t* handle = sem_open(name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR);
+ if (handle == SEM_FAILED) throw RelpipeWriterException(L"Unable to open POSIX semaphore.");
+
+ NamedMutex* result = new NamedMutex();
+ result->name = name;
+ result->handle = handle;
+ result->originalPid = getpid();
+ result->unlock();
+
+ return result;
}
~NamedMutex() {
@@ -157,11 +177,13 @@
NamedMutex& operator=(const NamedMutex&) = delete;
void lock() {
- sem_wait(handle);
+ int error = sem_wait(handle);
+ if (error) throw RelpipeWriterException(L"Unable to lock POSIX semaphore.");
}
void unlock() {
- sem_post(handle);
+ int error = sem_post(handle);
+ if (error) throw RelpipeWriterException(L"Unable to unlock POSIX semaphore.");
}
};
@@ -210,7 +232,7 @@
}
void process(int inputFD, int outputFD, Configuration& configuration) override {
- // TODO: refactoring, not used
+ // FIXME: refactoring, not used
}
};
@@ -300,13 +322,13 @@
MQ::Message writeBuffer;
// Create lock for STDOUT synchronization:
- NamedMutex stdoutMutex(queueName);
+ std::unique_ptr<NamedMutex> stdoutMutex(NamedMutex::create(queueName));
// Start workers:
std::vector<std::shared_ptr < ParallelFilesystemProcess>> workerProcesses;
bool inMainProcess = true;
for (int i = 0; i < configuration.parallelism; i++) {
- std::shared_ptr<ParallelFilesystemProcess> workerProcess(ParallelFilesystemProcess::create(queueName, outputFD, stdoutMutex, relationName, createAttributeFinders(), configuration));
+ std::shared_ptr<ParallelFilesystemProcess> workerProcess(ParallelFilesystemProcess::create(queueName, outputFD, *stdoutMutex, relationName, createAttributeFinders(), configuration));
if (workerProcess) {
workerProcesses.push_back(workerProcess);
} else {