src/ParallelFilesystemCommand.h
author František Kučera <franta-hg@frantovo.cz>
Sun, 25 Apr 2021 18:47:57 +0200
branch v_0
changeset 89 25a11859975b
parent 61 640ba8948d69
permissions -rw-r--r--
streamlet examples: QR: rename qr to qr-decode + simplify Makefile

/**
 * Relational pipes
 * Copyright © 2020 František Kučera (Frantovo.cz, GlobalCode.info)
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, version 3 of the License.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#pragma once

#include <mutex>
#include <mqueue.h>
#include <limits.h>
#include <ext/stdio_filebuf.h>
#include <sys/wait.h>
#include <semaphore.h>

#include "FilesystemCommand.h"

namespace relpipe {
namespace in {
namespace filesystem {

namespace fs = std::filesystem;
using namespace relpipe::writer;

/**
 * Common base of the POSIX message queue wrappers.
 * Holds the queue descriptor, closes it on destruction and – when asked to –
 * also unlinks the queue name, but only in the process that created the object.
 */
class MQ {
protected:
	/**
	 * PID of the process in which this object was constructed.
	 * fork() duplicates the object into the child process;
	 * comparing this value with getpid() reveals such a copy.
	 */
	__pid_t originalPid;
	std::string name;
	mqd_t handle;
	bool unlinkAfterClose;
	static const size_t MAX_DATA_LENGTH = PATH_MAX; // TODO: maybe more or configurable
	static const __syscall_slong_t MAX_MESSAGES = 10; // TODO: maybe configurable or derived from the process count

	/**
	 * Forwards all arguments to mq_open() and converts a failure into an exception.
	 *
	 * @return a valid (non-negative) queue descriptor
	 * @throws RelpipeWriterException if mq_open() returned a negative value
	 */
	template<typename... Args> static mqd_t mqOpen(const char *__name, int __oflag, Args... args) {
		const mqd_t descriptor = mq_open(__name, __oflag, args...);
		if (descriptor >= 0) return descriptor;
		throw RelpipeWriterException(L"Unable to open POSIX MQ.");
	}

	/**
	 * @param name name of the queue (used for mq_unlink())
	 * @param handle descriptor of an already opened queue; obtain it through MQ::mqOpen(), not through a bare mq_open()
	 * @param unlinkAfterClose whether the destructor should remove the queue name
	 */
	MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false)
	: originalPid(getpid()),
	name(name),
	handle(handle),
	unlinkAfterClose(unlinkAfterClose) {
	}

public:

	/**
	 * Fixed-layout record transferred through the queue as raw bytes:
	 * a type tag, the payload length and the payload itself.
	 */
	class Message {
	public:

		enum class Type {
			FILENAME,
			END
		};

		Type type;
		size_t dataLength; // TODO: maybe uint16_t from #include <cstdint> would be enough (and shorten the message minimum size from 16 to 4)
		char data[MQ::MAX_DATA_LENGTH];

		/**
		 * @throws RelpipeWriterException if dataLength exceeds the capacity of the data array
		 */
		void checkDataLength() const {
			// dataLength is unsigned, hence only the upper bound can be violated
			const bool fits = dataLength <= sizeof (data);
			if (!fits) throw RelpipeWriterException(L"Invalid POSIX MQ message size.");
		}

		/**
		 * @return number of bytes occupied by the fields preceding the payload plus the used part of the payload
		 */
		size_t getMessageLength() const {
			const size_t headerLength = sizeof (*this) - sizeof (data);
			return headerLength + dataLength;
		}

		/**
		 * @return copy of the first dataLength bytes of the payload
		 */
		std::string getStringData() {
			std::string result;
			result.assign(data, dataLength);
			return result;
		}

		/**
		 * Copies the string (without any terminator) into the payload and records its length.
		 *
		 * @throws RelpipeWriterException if the string does not fit into the payload
		 */
		void setStringData(const std::string& s) {
			const size_t length = s.size();
			if (length > sizeof (data)) throw RelpipeWriterException(L"Unable set message data: string too long.");
			s.copy(data, length);
			dataLength = length;
		}
	};

	/**
	 * Closes the descriptor; the queue name is removed only if requested
	 * and only in the process that originally constructed this object.
	 */
	virtual ~MQ() {
		mq_close(handle);
		if (unlinkAfterClose) {
			if (getpid() == originalPid) mq_unlink(name.c_str());
		}
	}

	MQ(const MQ& other) = delete;
	void operator=(const MQ& right) = delete;
};

class MQReader : public MQ {
public:

	MQReader(std::string name) : MQ(name, mqOpen(name.c_str(), O_RDONLY)) {
	}

	void receive(Message* m) {
		int result = mq_receive(handle, (char*) m, sizeof (*m), nullptr);
		if (result < 0) throw RelpipeWriterException(L"Unable to receive POSIX MQ message.");
		m->checkDataLength();
	}
};

/**
 * Creating and writing end of a POSIX message queue.
 * The queue is created exclusively (must not exist yet) and its name
 * is unlinked when the object is destroyed in the creating process.
 */
class MQWriter : public MQ {
private:

	/**
	 * Creates the queue and returns its descriptor.
	 *
	 * This is a static function working on a local, zero-initialized mq_attr
	 * because it is invoked from the constructor's initializer list before the
	 * MQ base is constructed – at that point no non-static member may be used.
	 *
	 * @throws RelpipeWriterException if the queue cannot be created
	 */
	static mqd_t createQueue(const std::string& name) {
		mq_attr attributes{};
		attributes.mq_maxmsg = MQ::MAX_MESSAGES;
		attributes.mq_msgsize = sizeof (Message);
		return mqOpen(name.c_str(), O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, &attributes);
	}
public:

	/**
	 * Creates a new queue of the given name, accessible to the owner only.
	 *
	 * @throws RelpipeWriterException if the queue cannot be created (e.g. it already exists)
	 */
	MQWriter(std::string name) : MQ(name, createQueue(name), true) {
	}

	/**
	 * Sends the header and the used part of the payload of the given message.
	 *
	 * @param m message to be sent
	 * @param priority passed to mq_send() unchanged
	 * @throws RelpipeWriterException if the message announces an invalid payload length or if mq_send() fails
	 */
	void send(const Message* m, unsigned int priority = 0) {
		m->checkDataLength();
		const int result = mq_send(handle, reinterpret_cast<const char*> (m), m->getMessageLength(), priority);
		if (result != 0) throw RelpipeWriterException(L"Unable to send POSIX MQ message.");
	}
};

/**
 * Mutex shared among processes, implemented as a named POSIX semaphore
 * with the initial value 1. Provides lock()/unlock(), so it can be used
 * with std::lock_guard.
 *
 * TODO: move to a common/streamlet library
 */
class NamedMutex {
private:
	/**
	 * Process where this object was created. 
	 * During fork() this object is copied.
	 * Using this variable we can detect the copy.
	 */
	__pid_t originalPid;
	sem_t* handle;
	std::string name;

	NamedMutex(const std::string& name, sem_t* handle) : originalPid(getpid()), handle(handle), name(name) {
	}

public:

	/**
	 * Creates a new named semaphore (it must not exist yet) in the unlocked state.
	 *
	 * @param name name of the semaphore
	 * @return heap-allocated object owned by the caller
	 * @throws RelpipeWriterException if the semaphore cannot be created
	 */
	static NamedMutex* create(std::string name) {
		// With O_CREAT, sem_open() reads two additional arguments: mode and the initial value.
		// Both must be passed; the value 1 means that the mutex starts unlocked.
		const unsigned int initialValue = 1;
		sem_t* handle = sem_open(name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, initialValue);
		if (handle == SEM_FAILED) throw RelpipeWriterException(L"Unable to open POSIX semaphore.");

		try {
			return new NamedMutex(name, handle);
		} catch (...) {
			// do not leave the just created semaphore behind
			sem_close(handle);
			sem_unlink(name.c_str());
			throw;
		}
	}

	/**
	 * Closes the semaphore; its name is removed only in the process that created this object.
	 */
	~NamedMutex() {
		sem_close(handle);
		if (originalPid == getpid()) sem_unlink(name.c_str());
	}

	NamedMutex(const NamedMutex&) = delete;
	NamedMutex& operator=(const NamedMutex&) = delete;

	/**
	 * Blocks until the semaphore can be decremented.
	 *
	 * @throws RelpipeWriterException if sem_wait() reports an error
	 */
	void lock() {
		int error = sem_wait(handle);
		if (error) throw RelpipeWriterException(L"Unable to lock POSIX semaphore.");
	}

	/**
	 * Increments the semaphore.
	 *
	 * @throws RelpipeWriterException if sem_post() reports an error
	 */
	void unlock() {
		int error = sem_post(handle);
		if (error) throw RelpipeWriterException(L"Unable to unlock POSIX semaphore.");
	}
};

/**
 * Body of a single worker: takes file names from the message queue,
 * processes each of them into a private buffer and then copies the buffer
 * to the standard output while holding the shared mutex.
 */
class ParallelFilesystemWorker : FilesystemWorker {
private:
	std::string queueName;
	NamedMutex& stdoutMutex;
	string_t relationName;
	std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders;
	Configuration& configuration;
	std::wstring_convert < codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings.
public:

	/**
	 * @param queueName name of the message queue to read the file names from
	 * @param stdoutMutex guards writes to the standard output; only a reference is stored
	 * @param relationName passed to writeHeader() and processSingleFile()
	 * @param attributeFinders passed to writeHeader() and processSingleFile()
	 * @param configuration only a reference is stored
	 */
	ParallelFilesystemWorker(std::string queueName, NamedMutex& stdoutMutex, string_t relationName, std::map<string_t, std::shared_ptr<AttributeFinder> > attributeFinders, Configuration& configuration) : queueName(queueName), stdoutMutex(stdoutMutex), relationName(relationName), attributeFinders(attributeFinders), configuration(configuration) {
	}

	/**
	 * Processes FILENAME messages until an END message arrives.
	 *
	 * @throws RelpipeWriterException on queue errors or when a message of an unknown type is received
	 */
	void run() {
		MQ::Message readBuffer;
		MQReader mq(queueName.c_str());

		std::stringstream writeBuffer;
		std::shared_ptr<RelationalWriter> writer(Factory::create(writeBuffer));

		writeHeader(writer.get(), attributeFinders, relationName, &configuration.fields, false);

		while (true) {
			mq.receive(&readBuffer);
			if (readBuffer.type == MQ::Message::Type::FILENAME) {
				std::stringstream originalName(readBuffer.getStringData());
				processSingleFile(writer, originalName, attributeFinders, configuration, relationName);

				// The buffered bytes are written explicitly instead of inserting writeBuffer.rdbuf():
				// inserting an empty stream buffer sets failbit on std::cout and every later write
				// from this process would then be silently dropped.
				// NOTE(review): whether processSingleFile() can leave the buffer empty is not visible here.
				const std::string output = writeBuffer.str();
				if (!output.empty()) {
					std::lock_guard lock(stdoutMutex);
					std::cout.write(output.data(), static_cast<std::streamsize> (output.size()));
					std::cout.flush();
					// TODO: optional (configurable) buffering: write multiple records in a single batch
				}
				writeBuffer.str("");
				writeBuffer.clear();
			} else if (readBuffer.type == MQ::Message::Type::END) {
				break;
			} else {
				throw RelpipeWriterException(L"ParallelFilesystemWorker recieved message of unsupported type: " + std::to_wstring(static_cast<int> (readBuffer.type))); // TODO: better exception
			}
		}

	}
};

class ParallelFilesystemProcess {
private:
	__pid_t subPid;

	ParallelFilesystemProcess(__pid_t subPid) : subPid(subPid) {
	}

	/**
	 * TODO: move to a common library (copied from the AWK module) 
	 */
	static void redirectFD(int oldfd, int newfd) {
		int result = dup2(oldfd, newfd);
		if (result < 0) throw ParallelFilesystemProcess::Exception(L"Unable redirect FD.");
	}

	/**
	 * TODO: move to a common library (copied from the AWK module) 
	 */
	static void closeOrThrow(int fd) {
		int error = close(fd);
		if (error) throw ParallelFilesystemProcess::Exception(L"Unable to close FD: " + std::to_wstring(fd) + L" from PID: " + std::to_wstring(getpid()));
	}

public:

	class Exception : public relpipe::writer::RelpipeWriterException {
	public:

		Exception(std::wstring message) : relpipe::writer::RelpipeWriterException(message) {
		}

	};

	static ParallelFilesystemProcess* create(std::string queueName, int outputFD, NamedMutex& stdoutMutex, string_t relationName, std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders, Configuration& configuration) {
		__pid_t subPid = fork();

		if (subPid < 0) {
			throw SubProcess::Exception(L"Unable to fork the hash process.");
		} else if (subPid == 0) {
			// Child process
			closeOrThrow(STDIN_FILENO); // strace -cf will show failed close() calls (same as number of processes)
			if (outputFD != STDOUT_FILENO) redirectFD(outputFD, STDOUT_FILENO);
			ParallelFilesystemWorker w(queueName, stdoutMutex, relationName, attributeFinders, configuration);
			w.run();
			return nullptr;
		} else {
			// Parent process
			return new ParallelFilesystemProcess(subPid);
		}
	}

	int wait() {
		int status = -1;
		::waitpid(subPid, &status, 0);
		return status;
	}

	__pid_t getPid() const {
		return subPid;
	}

};

/**
 * Variant of the filesystem command that distributes the work among forked worker processes.
 */
class ParallelFilesystemCommand : public FilesystemCommand {
public:

	/**
	 * Writes the relation header, creates the message queue and the output mutex,
	 * forks configuration.parallelism workers, sends them the file names read from the input
	 * followed by one END message per worker and finally waits for all of them.
	 *
	 * The function returns in the parent and – after the worker has finished – also in each child.
	 *
	 * @param inputFD descriptor to read the file names from
	 * @param outputFD descriptor to write the header to; also handed over to the workers
	 * @param configuration passed to the helpers and to the workers
	 * @throws ParallelFilesystemProcess::Exception if any worker ended with a non-zero wait status
	 */
	void process(int inputFD, int outputFD, Configuration& configuration) {
		__gnu_cxx::stdio_filebuf<char> inputBuffer(inputFD, std::ios::in);
		__gnu_cxx::stdio_filebuf<char> outputBuffer(outputFD, std::ios::out);
		std::istream input(&inputBuffer);
		std::ostream output(&outputBuffer);

		// Relation header is written (and flushed) by the parent before any worker exists:
		string_t relationName = fetchRelationName(&configuration);
		std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders = createAttributeFinders();
		std::shared_ptr<RelationalWriter> writer(Factory::create(output));
		writeHeader(writer.get(), attributeFinders, relationName, &configuration.fields);
		output.flush();

		// Queue named after the PID of this process:
		std::string queueName("/relpipe-in-filesystem_parallel_");
		queueName += std::to_string(getpid());
		MQWriter mq(queueName);
		MQ::Message writeBuffer;

		// Lock for STDOUT synchronization (same name as the queue):
		std::unique_ptr<NamedMutex> stdoutMutex(NamedMutex::create(queueName));

		// Workers; create() yields nullptr inside a child once its work is done:
		std::vector<std::shared_ptr < ParallelFilesystemProcess>> workerProcesses;
		for (int started = 0; started < configuration.parallelism; started++) {
			std::shared_ptr<ParallelFilesystemProcess> workerProcess(ParallelFilesystemProcess::create(queueName, outputFD, *stdoutMutex, relationName, createAttributeFinders(), configuration));
			if (!workerProcess) {
				// we are in a worker process → do nothing, finished
				return;
			}
			workerProcesses.push_back(workerProcess);
		}

		// Only the main process gets here.

		// Distribute file names to the workers:
		{
			std::stringstream originalName;
			while (readNext(input, originalName)) {
				writeBuffer.type = MQ::Message::Type::FILENAME;
				writeBuffer.setStringData(originalName.str());
				mq.send(&writeBuffer);
				reset(originalName);
			}
		}

		// One END message per worker:
		writeBuffer.type = MQ::Message::Type::END;
		writeBuffer.setStringData("");
		for (int notified = 0; notified < configuration.parallelism; notified++) {
			mq.send(&writeBuffer);
		}

		// Collect the wait status of every worker:
		std::map<__pid_t, int> failedProcesses;
		for (const auto& worker : workerProcesses) {
			const int status = worker->wait();
			if (status != 0) failedProcesses[worker->getPid()] = status;
		}

		if (failedProcesses.empty()) return;

		std::wstringstream errorMessage;
		errorMessage << L"One or more processes failed: ";
		for (const auto& [pid, status] : failedProcesses) {
			errorMessage << pid << L":" << status << L", ";
		}
		throw ParallelFilesystemProcess::Exception(errorMessage.str());
	}
};

}
}
}