src/ParallelFilesystemCommand.h
branchv_0
changeset 60 bb7ca5891755
parent 59 7471529c0d11
child 61 640ba8948d69
--- a/src/ParallelFilesystemCommand.h	Fri Jan 24 21:05:10 2020 +0100
+++ b/src/ParallelFilesystemCommand.h	Sat Jan 25 16:37:20 2020 +0100
@@ -46,6 +46,20 @@
 	static const size_t MAX_DATA_LENGTH = PATH_MAX; // TODO: maybe more or configurable
 	static const __syscall_slong_t MAX_MESSAGES = 10; // TODO: maybe configurable or derived from the process count
 
+	template<typename... Args> static mqd_t mqOpen(const char *__name, int __oflag, Args... args) {
+		mqd_t handle = mq_open(__name, __oflag, args...);
+		if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ.");
+		else return handle;
+	}
+
+	/**
+	 * @param name
+	 * @param handle do not call mq_open() directly, use MQ:mqOpen() instead.
+	 * @param unlinkAfterClose
+	 */
+	MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : originalPid(getpid()), name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) {
+	}
+
 public:
 
 	class Message {
@@ -79,11 +93,6 @@
 		}
 	};
 
-	MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : originalPid(getpid()), name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) {
-		if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ.");
-		// TODO: factory method
-	}
-
 	virtual ~MQ() {
 		mq_close(handle);
 		if (unlinkAfterClose && originalPid == getpid()) mq_unlink(name.c_str());
@@ -96,7 +105,7 @@
 class MQReader : public MQ {
 public:
 
-	MQReader(std::string name) : MQ(name, mq_open(name.c_str(), O_RDONLY)) {
+	MQReader(std::string name) : MQ(name, mqOpen(name.c_str(), O_RDONLY)) {
 	}
 
 	void receive(Message* m) {
@@ -117,7 +126,7 @@
 	}
 public:
 
-	MQWriter(std::string name) : MQ(name, mq_open(name.c_str(), O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, getAttributes()), true) {
+	MQWriter(std::string name) : MQ(name, mqOpen(name.c_str(), O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, getAttributes()), true) {
 	}
 
 	void send(const Message* m, unsigned int priority = 0) {
@@ -140,12 +149,23 @@
 	__pid_t originalPid;
 	sem_t* handle;
 	std::string name;
+
+	NamedMutex() {
+	}
+
 public:
 
-	NamedMutex(std::string name) : originalPid(getpid()), name(name) {
-		handle = sem_open(name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR);
-		sem_post(handle);
-		// TODO: factory method, check errors
+	static NamedMutex* create(std::string name) {
+		sem_t* handle = sem_open(name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR);
+		if (handle == SEM_FAILED) throw RelpipeWriterException(L"Unable to open POSIX semaphore.");
+
+		NamedMutex* result = new NamedMutex();
+		result->name = name;
+		result->handle = handle;
+		result->originalPid = getpid();
+		result->unlock();
+
+		return result;
 	}
 
 	~NamedMutex() {
@@ -157,11 +177,13 @@
 	NamedMutex& operator=(const NamedMutex&) = delete;
 
 	void lock() {
-		sem_wait(handle);
+		int error = sem_wait(handle);
+		if (error) throw RelpipeWriterException(L"Unable to lock POSIX semaphore.");
 	}
 
 	void unlock() {
-		sem_post(handle);
+		int error = sem_post(handle);
+		if (error) throw RelpipeWriterException(L"Unable to unlock POSIX semaphore.");
 	}
 };
 
@@ -210,7 +232,7 @@
 	}
 
 	void process(int inputFD, int outputFD, Configuration& configuration) override {
-		// TODO: refactoring, not used
+		// FIXME: refactoring, not used
 	}
 
 };
@@ -300,13 +322,13 @@
 		MQ::Message writeBuffer;
 
 		// Create lock for STDOUT synchronization:
-		NamedMutex stdoutMutex(queueName);
+		std::unique_ptr<NamedMutex> stdoutMutex(NamedMutex::create(queueName));
 
 		// Start workers:
 		std::vector<std::shared_ptr < ParallelFilesystemProcess>> workerProcesses;
 		bool inMainProcess = true;
 		for (int i = 0; i < configuration.parallelism; i++) {
-			std::shared_ptr<ParallelFilesystemProcess> workerProcess(ParallelFilesystemProcess::create(queueName, outputFD, stdoutMutex, relationName, createAttributeFinders(), configuration));
+			std::shared_ptr<ParallelFilesystemProcess> workerProcess(ParallelFilesystemProcess::create(queueName, outputFD, *stdoutMutex, relationName, createAttributeFinders(), configuration));
 			if (workerProcess) {
 				workerProcesses.push_back(workerProcess);
 			} else {