parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes v_0
authorFrantišek Kučera <franta-hg@frantovo.cz>
Fri, 24 Jan 2020 16:53:31 +0100
branchv_0
changeset 58 4679f67a8324
parent 57 c40a241d6e0c
child 59 7471529c0d11
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
src/CMakeLists.txt
src/FilesystemCommand.h
src/ParallelFilesystemCommand.h
src/PlainFilesystemCommand.h
--- a/src/CMakeLists.txt	Tue Jan 21 00:19:56 2020 +0100
+++ b/src/CMakeLists.txt	Fri Jan 24 16:53:31 2020 +0100
@@ -34,7 +34,7 @@
 )
 
 # Link libraries:
-target_link_libraries(${EXECUTABLE_FILE} ${RELPIPE_LIBS_LIBRARIES} stdc++fs rt)
+target_link_libraries(${EXECUTABLE_FILE} ${RELPIPE_LIBS_LIBRARIES} stdc++fs rt -pthread)
 set_property(TARGET ${EXECUTABLE_FILE} PROPERTY CXX_STANDARD 17)
 set_property(TARGET ${EXECUTABLE_FILE} PROPERTY INSTALL_RPATH_USE_LINK_PATH TRUE)
 
--- a/src/FilesystemCommand.h	Tue Jan 21 00:19:56 2020 +0100
+++ b/src/FilesystemCommand.h	Fri Jan 24 16:53:31 2020 +0100
@@ -49,7 +49,7 @@
 
 class FilesystemCommand {
 protected:
-	std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings.
+	std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings.	
 
 	std::map<string_t, std::shared_ptr<AttributeFinder>> createAttributeFinders() {
 		return {
@@ -71,6 +71,21 @@
 		return originalName.tellp();
 	}
 
+	string_t fetchRelationName(Configuration* configuration) {
+		return configuration->relation.empty() ? L"filesystem" : configuration->relation;
+	}
+
+	void writeHeader(RelationalWriter* writer, std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders, string_t relationName, std::vector<RequestedField>* fields) {
+		std::vector<AttributeMetadata> attributesMetadata;
+		for (RequestedField field : *fields) {
+			std::shared_ptr<AttributeFinder> finder = attributeFinders[field.group];
+			if (finder) for (AttributeMetadata m : finder->toMetadata(writer, relationName, field)) attributesMetadata.push_back(m);
+			else throw RelpipeWriterException(L"Unsupported field group: " + field.group);
+		}
+
+		writer->startRelation(relationName, attributesMetadata, true);
+	}
+
 public:
 
 	virtual ~FilesystemCommand() = default;
--- a/src/ParallelFilesystemCommand.h	Tue Jan 21 00:19:56 2020 +0100
+++ b/src/ParallelFilesystemCommand.h	Fri Jan 24 16:53:31 2020 +0100
@@ -16,9 +16,12 @@
  */
 #pragma once
 
+#include <mutex>
 #include <mqueue.h>
 #include <limits.h>
 #include <ext/stdio_filebuf.h>
+#include <sys/wait.h>
+#include <semaphore.h>
 
 #include "FilesystemCommand.h"
 
@@ -31,6 +34,12 @@
 
 class MQ {
 protected:
+	/**
+	 * Process where this object was created. 
+	 * During fork() this object is copied.
+	 * Using this variable we can detect the copy.
+	 */
+	__pid_t originalPid;
 	std::string name;
 	mqd_t handle;
 	bool unlinkAfterClose;
@@ -70,13 +79,14 @@
 		}
 	};
 
-	MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) {
+	MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : originalPid(getpid()), name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) {
 		if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ.");
+		// FIXME: sometimes we got this error, especially with higher process counts like: --parallel 50
 	}
 
 	virtual ~MQ() {
 		mq_close(handle);
-		if (unlinkAfterClose) mq_unlink(name.c_str());
+		if (unlinkAfterClose && originalPid == getpid()) mq_unlink(name.c_str());
 	}
 
 	MQ(const MQ& other) = delete;
@@ -107,7 +117,7 @@
 	}
 public:
 
-	MQWriter(std::string name) : MQ(name, mq_open(name.c_str(), O_RDWR | O_CREAT, S_IRWXU, getAttributes()), true) {
+	MQWriter(std::string name) : MQ(name, mq_open(name.c_str(), O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, getAttributes()), true) {
 	}
 
 	void send(const Message* m, unsigned int priority = 0) {
@@ -117,44 +127,220 @@
 	}
 };
 
+/**
+ * TODO: move to a common/streamlet library
+ */
+class NamedMutex {
+private:
+	/**
+	 * Process where this object was created. 
+	 * During fork() this object is copied.
+	 * Using this variable we can detect the copy.
+	 */
+	__pid_t originalPid;
+	sem_t* handle;
+	std::string name;
+	bool owner;
+public:
+
+	NamedMutex(std::string name) : originalPid(getpid()), name(name) {
+		handle = sem_open(name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR);
+		sem_post(handle);
+	}
+
+	~NamedMutex() {
+		sem_close(handle);
+		if (originalPid == getpid()) sem_unlink(name.c_str());
+	}
+
+	NamedMutex(const NamedMutex&) = delete;
+	NamedMutex& operator=(const NamedMutex&) = delete;
+
+	void lock() {
+		sem_wait(handle);
+	}
+
+	void unlock() {
+		sem_post(handle);
+	}
+
+	void disown() {
+		owner = false;
+	}
+};
+
 class ParallelFilesystemWorker {
+private:
+	std::string queueName;
+	NamedMutex& stdoutMutex;
+	string_t relationName;
+	std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders;
+	Configuration& configuration;
+	std::wstring_convert < codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings.
+public:
+
+	ParallelFilesystemWorker(std::string queueName, NamedMutex& stdoutMutex, string_t relationName, std::map<string_t, std::shared_ptr<AttributeFinder> > attributeFinders, Configuration& configuration) : queueName(queueName), stdoutMutex(stdoutMutex), relationName(relationName), attributeFinders(attributeFinders), configuration(configuration) {
+	}
+
+	void run() {
+		MQ::Message readBuffer;
+		MQReader mq(queueName.c_str());
+
+		for (bool running = true; running;) {
+			mq.receive(&readBuffer);
+			std::wstringstream debugLog;
+			if (readBuffer.type == MQ::Message::Type::END) {
+				debugLog << L"PID: " << getpid() << L" received END message: „" << convertor.from_bytes(readBuffer.getStringData()) << L"“" << std::endl;
+				running = false;
+			} else if (readBuffer.type == MQ::Message::Type::FILENAME) {
+				debugLog << L"PID: " << getpid() << L" received FILENAME message: „" << convertor.from_bytes(readBuffer.getStringData()) << L"“" << std::endl;
+			} else {
+				debugLog << L"PID: " << getpid() << L" received unexpected message: „" << convertor.from_bytes(readBuffer.getStringData()) << L"“ of type: " << ((int) readBuffer.type) << std::endl;
+			}
+
+
+			{
+				std::lock_guard lock(stdoutMutex);
+				std::wcerr << debugLog.str() << std::flush;
+			}
+			debugLog.str(L"");
+			debugLog.clear();
+		}
+
+	}
+
 };
 
 class ParallelFilesystemProcess {
+private:
+	__pid_t subPid;
+
+	ParallelFilesystemProcess(__pid_t subPid) : subPid(subPid) {
+	}
+
+	/**
+	 * TODO: move to a common library (copied from the AWK module) 
+	 */
+	static void redirectFD(int oldfd, int newfd) {
+		int result = dup2(oldfd, newfd);
+		if (result < 0) throw ParallelFilesystemProcess::Exception(L"Unable redirect FD.");
+	}
+
+	/**
+	 * TODO: move to a common library (copied from the AWK module) 
+	 */
+	static void closeOrThrow(int fd) {
+		int error = close(fd);
+		if (error) throw ParallelFilesystemProcess::Exception(L"Unable to close FD: " + std::to_wstring(fd) + L" from PID: " + std::to_wstring(getpid()));
+	}
+
+public:
+
+	class Exception : public relpipe::writer::RelpipeWriterException {
+	public:
+
+		Exception(std::wstring message) : relpipe::writer::RelpipeWriterException(message) {
+		}
+
+	};
+
+	static ParallelFilesystemProcess* create(std::string queueName, int outputFD, NamedMutex& stdoutMutex, string_t relationName, std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders, Configuration& configuration) {
+		__pid_t subPid = fork();
+
+		if (subPid < 0) {
+			throw SubProcess::Exception(L"Unable to fork the hash process.");
+		} else if (subPid == 0) {
+			// Child process
+			closeOrThrow(STDIN_FILENO); // strace -cf will show failed close() calls (same as number of processes)
+			if (outputFD != STDOUT_FILENO) redirectFD(outputFD, STDOUT_FILENO);
+			ParallelFilesystemWorker w(queueName, stdoutMutex, relationName, attributeFinders, configuration);
+			w.run();
+			return nullptr;
+		} else {
+			// Parent process
+			return new ParallelFilesystemProcess(subPid);
+		}
+	}
+
+	int wait() {
+		int status = -1;
+		::waitpid(subPid, &status, 0);
+		return status;
+	}
+
+	__pid_t getPid() const {
+		return subPid;
+	}
+
 };
 
 class ParallelFilesystemCommand : public FilesystemCommand {
 public:
 
 	void process(int inputFD, int outputFD, Configuration& configuration) {
-		// TODO: ParallelFilesystemCommand
-
-		{ // TODO: demo code – remove:
-			std::string queueName = "/relpipe-in-filesystem_parallel_" + std::to_string(getpid());
+		__gnu_cxx::stdio_filebuf<char> inputBuffer(inputFD, std::ios::in);
+		__gnu_cxx::stdio_filebuf<char> outputBuffer(outputFD, std::ios::out);
+		std::istream input(&inputBuffer);
+		std::ostream output(&outputBuffer);
 
-			MQWriter mqWriter(queueName.c_str());
-			MQReader mqReader(queueName.c_str());
-
-			MQ::Message writeBuffer;
-			MQ::Message readBuffer;
+		// Write relation header:
+		string_t relationName = fetchRelationName(&configuration);
+		std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders = createAttributeFinders();
+		std::shared_ptr<RelationalWriter> writer(Factory::create(output));
+		writeHeader(writer.get(), attributeFinders, relationName, &configuration.fields);
+		output.flush();
 
-			// ::memset(&writeBuffer, 0, sizeof (writeBuffer));
-			// ::memset(&readBuffer, 0, sizeof (readBuffer));
+		// Create queue:
+		std::string queueName = "/relpipe-in-filesystem_parallel_" + std::to_string(getpid());
+		MQWriter mq(queueName.c_str());
+		MQ::Message writeBuffer;
 
-			writeBuffer.type = MQ::Message::Type::END;
-			writeBuffer.setStringData("ahoj");
+		// Create lock for STDOUT synchronization:
+		NamedMutex stdoutMutex(queueName);
 
-			mqWriter.send(&writeBuffer);
-
-			mqReader.receive(&readBuffer);
-
-			std::string readData(readBuffer.data, readBuffer.dataLength);
-			std::wstring_convert < codecvt_utf8<wchar_t>> convertor;
-
-			std::wcerr << L"Zpráva „" << convertor.from_bytes(readData).c_str() << L"“ typu " << (int) readBuffer.type << L" o celkové délce " << readBuffer.getMessageLength() << L" a délce dat " << readBuffer.dataLength << L" byla přijata." << std::endl;
+		// Start workers:
+		std::vector<std::shared_ptr < ParallelFilesystemProcess>> workerProcesses;
+		bool inMainProcess = true;
+		for (int i = 0; i < configuration.parallelism; i++) {
+			std::shared_ptr<ParallelFilesystemProcess> workerProcess(ParallelFilesystemProcess::create(queueName, outputFD, stdoutMutex, relationName, createAttributeFinders(), configuration));
+			if (workerProcess) {
+				workerProcesses.push_back(workerProcess);
+			} else {
+				inMainProcess = false;
+				break;
+			}
 		}
 
-		throw RelpipeWriterException(L"ParallelFilesystemCommand is not yet implemented");
+		if (inMainProcess) {
+			// Distribute file names to the workers:
+			for (std::stringstream originalName; readNext(input, originalName); reset(originalName)) {
+				writeBuffer.type = MQ::Message::Type::FILENAME;
+				writeBuffer.setStringData(originalName.str());
+				mq.send(&writeBuffer);
+			}
+
+			// Tell workers that everything is done:
+			writeBuffer.type = MQ::Message::Type::END;
+			writeBuffer.setStringData("");
+			for (int i = 0; i < configuration.parallelism; i++) mq.send(&writeBuffer);
+
+			// Wait for workers exit:
+			std::map<__pid_t, int> failedProcesses;
+			for (std::shared_ptr<ParallelFilesystemProcess> p : workerProcesses) {
+				int result = p->wait();
+				if (result) failedProcesses[p->getPid()] = result;
+			}
+
+			if (failedProcesses.size()) {
+				std::wstringstream errorMessage;
+				errorMessage << L"One or more processes failed: ";
+				for (auto failed : failedProcesses) errorMessage << failed.first << L":" << failed.second << L", ";
+				throw ParallelFilesystemProcess::Exception(errorMessage.str());
+			}
+
+		} else {
+			// we are in a worker process → do nothing, finished
+		}
 	}
 };
 
--- a/src/PlainFilesystemCommand.h	Tue Jan 21 00:19:56 2020 +0100
+++ b/src/PlainFilesystemCommand.h	Fri Jan 24 16:53:31 2020 +0100
@@ -28,9 +28,6 @@
 using namespace relpipe::writer;
 
 class PlainFilesystemCommand : public FilesystemCommand {
-private:
-	std::map<string_t, std::shared_ptr<AttributeFinder>> attributeFinders = createAttributeFinders();
-
 public:
 
 	void process(int inputFD, int outputFD, Configuration& configuration) {
@@ -40,18 +37,11 @@
 		std::ostream output(&outputBuffer);
 
 		std::shared_ptr<RelationalWriter> writer(Factory::create(output));
-
-		string_t relationName = configuration.relation.empty() ? L"filesystem" : configuration.relation;
+		std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders = createAttributeFinders();
 
-		std::vector<AttributeMetadata> attributesMetadata;
-		for (RequestedField field : configuration.fields) {
-			std::shared_ptr<AttributeFinder> finder = attributeFinders[field.group];
-			if (finder) for (AttributeMetadata m : finder->toMetadata(writer.get(), relationName, field)) attributesMetadata.push_back(m);
-			else throw RelpipeWriterException(L"Unsupported field group: " + field.group);
-		}
-
-		writer->startRelation(relationName, attributesMetadata, true);
-
+		string_t relationName = fetchRelationName(&configuration);
+		writeHeader(writer.get(), attributeFinders, relationName, &configuration.fields);
+		
 
 		for (std::stringstream originalName; readNext(input, originalName); reset(originalName)) {
 			fs::path file(originalName.str().empty() ? "." : originalName.str()); // interpret empty string as current directory (e.g. result of: find -printf '%P\0')