parallel processing: refactoring and clean-up v_0
author František Kučera <franta-hg@frantovo.cz>
Sat, 25 Jan 2020 16:37:20 +0100
branch v_0
changeset 60 bb7ca5891755
parent 59 7471529c0d11
child 61 640ba8948d69
parallel processing: refactoring and clean-up
nbproject/configurations.xml
src/FileAttributeFinder.h
src/ParallelFilesystemCommand.h
src/StreamletAttributeFinder.h
src/SubProcess.h
--- a/nbproject/configurations.xml	Fri Jan 24 21:05:10 2020 +0100
+++ b/nbproject/configurations.xml	Sat Jan 25 16:37:20 2020 +0100
@@ -69,8 +69,6 @@
       </toolsSet>
       <flagsDictionary>
         <element flagsID="0" commonFlags="-fsanitize=address -std=gnu++1z"/>
-        <element flagsID="1"
-                 commonFlags="-mtune=generic -march=x86-64 -std=gnu++17 -fsanitize=address -fstack-protector-strong"/>
       </flagsDictionary>
       <codeAssistance>
       </codeAssistance>
@@ -80,6 +78,13 @@
           <buildCommand>${MAKE} -f Makefile</buildCommand>
           <cleanCommand>${MAKE} -f Makefile clean</cleanCommand>
           <executablePath>build/Debug/src/relpipe-in-filesystem</executablePath>
+          <ccTool>
+            <incDir>
+              <pElem>../relpipe-lib-writer.cpp/include</pElem>
+              <pElem>../relpipe-lib-cli.cpp/include</pElem>
+              <pElem>build/Debug/src</pElem>
+            </incDir>
+          </ccTool>
         </makeTool>
         <preBuild>
           <preBuildCommandWorkingDir>build/Debug</preBuildCommandWorkingDir>
@@ -88,22 +93,11 @@
         </preBuild>
       </makefileType>
       <item path="src/SubProcess.cpp" ex="false" tool="1" flavor2="11">
-        <ccTool flags="1">
-          <incDir>
-            <pElem>../relpipe-lib-writer.cpp/include/relpipe/writer</pElem>
-            <pElem>src</pElem>
-            <pElem>../relpipe-lib-writer.cpp/include</pElem>
-            <pElem>build/Debug/src</pElem>
-          </incDir>
+        <ccTool flags="0">
         </ccTool>
       </item>
       <item path="src/relpipe-in-filesystem.cpp" ex="false" tool="1" flavor2="11">
         <ccTool flags="0">
-          <incDir>
-            <pElem>../relpipe-lib-writer.cpp/include</pElem>
-            <pElem>../relpipe-lib-cli.cpp/include</pElem>
-            <pElem>build/Debug/src</pElem>
-          </incDir>
         </ccTool>
       </item>
     </conf>
--- a/src/FileAttributeFinder.h	Fri Jan 24 21:05:10 2020 +0100
+++ b/src/FileAttributeFinder.h	Sat Jan 25 16:37:20 2020 +0100
@@ -55,10 +55,9 @@
 	}
 
 	void fetchOwner(const fs::path& file, string_t& owner, string_t& group) {
-		// TODO: throw exception on error
-		// TODO: get user and group in C++ way?
 		struct stat info;
-		stat(file.c_str(), &info);
+		int result = ::stat(file.c_str(), &info);
+		if (result) throw RelpipeWriterException(L"Unable to stat() file „" + file.wstring() + L"“ in fetchOwner(). Result: " + std::to_wstring(result));
 		/**
 		 * The return value may point to a static area, and may  be
 		 * overwritten  by  subsequent calls to getpwent(3), getpw‐
--- a/src/ParallelFilesystemCommand.h	Fri Jan 24 21:05:10 2020 +0100
+++ b/src/ParallelFilesystemCommand.h	Sat Jan 25 16:37:20 2020 +0100
@@ -46,6 +46,20 @@
 	static const size_t MAX_DATA_LENGTH = PATH_MAX; // TODO: maybe more or configurable
 	static const __syscall_slong_t MAX_MESSAGES = 10; // TODO: maybe configurable or derived from the process count
 
+	template<typename... Args> static mqd_t mqOpen(const char *__name, int __oflag, Args... args) {
+		mqd_t handle = mq_open(__name, __oflag, args...);
+		if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ.");
+		else return handle;
+	}
+
+	/**
+	 * @param name
+	 * @param handle do not call mq_open() directly, use MQ::mqOpen() instead.
+	 * @param unlinkAfterClose
+	 */
+	MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : originalPid(getpid()), name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) {
+	}
+
 public:
 
 	class Message {
@@ -79,11 +93,6 @@
 		}
 	};
 
-	MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : originalPid(getpid()), name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) {
-		if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ.");
-		// TODO: factory method
-	}
-
 	virtual ~MQ() {
 		mq_close(handle);
 		if (unlinkAfterClose && originalPid == getpid()) mq_unlink(name.c_str());
@@ -96,7 +105,7 @@
 class MQReader : public MQ {
 public:
 
-	MQReader(std::string name) : MQ(name, mq_open(name.c_str(), O_RDONLY)) {
+	MQReader(std::string name) : MQ(name, mqOpen(name.c_str(), O_RDONLY)) {
 	}
 
 	void receive(Message* m) {
@@ -117,7 +126,7 @@
 	}
 public:
 
-	MQWriter(std::string name) : MQ(name, mq_open(name.c_str(), O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, getAttributes()), true) {
+	MQWriter(std::string name) : MQ(name, mqOpen(name.c_str(), O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, getAttributes()), true) {
 	}
 
 	void send(const Message* m, unsigned int priority = 0) {
@@ -140,12 +149,23 @@
 	__pid_t originalPid;
 	sem_t* handle;
 	std::string name;
+
+	NamedMutex() {
+	}
+
 public:
 
-	NamedMutex(std::string name) : originalPid(getpid()), name(name) {
-		handle = sem_open(name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR);
-		sem_post(handle);
-		// TODO: factory method, check errors
+	static NamedMutex* create(std::string name) {
+		sem_t* handle = sem_open(name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR);
+		if (handle == SEM_FAILED) throw RelpipeWriterException(L"Unable to open POSIX semaphore.");
+
+		NamedMutex* result = new NamedMutex();
+		result->name = name;
+		result->handle = handle;
+		result->originalPid = getpid();
+		result->unlock();
+
+		return result;
 	}
 
 	~NamedMutex() {
@@ -157,11 +177,13 @@
 	NamedMutex& operator=(const NamedMutex&) = delete;
 
 	void lock() {
-		sem_wait(handle);
+		int error = sem_wait(handle);
+		if (error) throw RelpipeWriterException(L"Unable to lock POSIX semaphore.");
 	}
 
 	void unlock() {
-		sem_post(handle);
+		int error = sem_post(handle);
+		if (error) throw RelpipeWriterException(L"Unable to unlock POSIX semaphore.");
 	}
 };
 
@@ -210,7 +232,7 @@
 	}
 
 	void process(int inputFD, int outputFD, Configuration& configuration) override {
-		// TODO: refactoring, not used
+		// FIXME: refactoring, not used
 	}
 
 };
@@ -300,13 +322,13 @@
 		MQ::Message writeBuffer;
 
 		// Create lock for STDOUT synchronization:
-		NamedMutex stdoutMutex(queueName);
+		std::unique_ptr<NamedMutex> stdoutMutex(NamedMutex::create(queueName));
 
 		// Start workers:
 		std::vector<std::shared_ptr < ParallelFilesystemProcess>> workerProcesses;
 		bool inMainProcess = true;
 		for (int i = 0; i < configuration.parallelism; i++) {
-			std::shared_ptr<ParallelFilesystemProcess> workerProcess(ParallelFilesystemProcess::create(queueName, outputFD, stdoutMutex, relationName, createAttributeFinders(), configuration));
+			std::shared_ptr<ParallelFilesystemProcess> workerProcess(ParallelFilesystemProcess::create(queueName, outputFD, *stdoutMutex, relationName, createAttributeFinders(), configuration));
 			if (workerProcess) {
 				workerProcesses.push_back(workerProcess);
 			} else {
--- a/src/StreamletAttributeFinder.h	Fri Jan 24 21:05:10 2020 +0100
+++ b/src/StreamletAttributeFinder.h	Sat Jan 25 16:37:20 2020 +0100
@@ -95,7 +95,6 @@
 	}
 
 	virtual void writeFieldOfExistingFile(RelationalWriter* writer, const string_t& relationName, const RequestedField & field) override {
-		// TODO: paralelize also over records → fork multiple processes and distribute records across them; then collect results (with a lock)
 		if (field.group == RequestedField::GROUP_STREAMLET) {
 			for (auto metadata : cachedMetadata[field.id]) {
 				SubProcess::Message m = subProcesses[field.id]->read();
@@ -105,7 +104,7 @@
 
 			SubProcess::Message m = subProcesses[field.id]->read();
 			if (m.code != StreamletMsg::WAITING_FOR_INPUT_ATTRIBUTES) throw RelpipeWriterException(L"Protocol violation from exec sub-process. Expected WAITING_FOR_INPUT_ATTRIBUTES but got: " + m.toString());
-			// TODO: generic protocol violation error messages / method for checking responses
+			// FIXME: generic protocol violation error messages / method for checking responses
 		}
 	}
 
--- a/src/SubProcess.h	Fri Jan 24 21:05:10 2020 +0100
+++ b/src/SubProcess.h	Sat Jan 25 16:37:20 2020 +0100
@@ -25,7 +25,7 @@
 #include <relpipe/writer/RelpipeWriterException.h>
 
 /**
- * TODO: move to a separate library → can be used later also in relpipe-tr-exec
+ * TODO: move to a separate library → can be used later also in relpipe-tr-streamlet
  */
 class SubProcess {
 public: