parallel processing: POSIX MQ helper classes + some demo code v_0
authorFrantišek Kučera <>
Mon, 20 Jan 2020 23:47:54 +0100
changeset 56 81a53e7cf0ab
parent 55 698836fc65b4
child 57 c40a241d6e0c
parallel processing: POSIX MQ helper classes + some demo code
--- a/src/ParallelFilesystemCommand.h	Mon Jan 20 15:48:39 2020 +0100
+++ b/src/ParallelFilesystemCommand.h	Mon Jan 20 23:47:54 2020 +0100
@@ -17,6 +17,7 @@
 #pragma once
 #include <mqueue.h>
+#include <limits.h>
 #include "FilesystemCommand.h"
@@ -27,12 +28,131 @@
 namespace fs = std::filesystem;
 using namespace relpipe::writer;
+class MQ {
+	std::string name;
+	mqd_t handle;
+	bool unlinkAfterClose;
+	static const size_t MAX_DATA_LENGTH = PATH_MAX; // TODO: maybe more or configurable
+	static const __syscall_slong_t MAX_MESSAGES = 10; // TODO: maybe configurable or derived from the process count
+	class Message {
+	public:
+		enum class Type {
+			END
+		};
+		Type type;
+		size_t dataLength; // TODO: maybe uint16_t from #include <cstdint> would be enough (and shorten the message minimum size from 16 to 4)
+		char data[MQ::MAX_DATA_LENGTH];
+		void checkDataLength() const {
+			if (dataLength > sizeof (data) || dataLength < 0) throw RelpipeWriterException(L"Invalid POSIX MQ message size.");
+		}
+		size_t getMessageLength() const {
+			return sizeof (*this) - sizeof (data) + dataLength;
+		}
+		std::string getStringData() {
+			return std::string(data, dataLength);
+		}
+		void setStringData(const std::string& s) {
+			if (s.size() > sizeof (data)) throw RelpipeWriterException(L"Unable set message data: string too long.");
+			::memcpy(data, s.c_str(), s.size());
+			dataLength = s.size();
+		}
+	};
+	MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) {
+		if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ.");
+	}
+	virtual ~MQ() {
+		mq_close(handle);
+		if (unlinkAfterClose) mq_unlink(name.c_str());
+	}
+	MQ(const MQ& other) = delete;
+	void operator=(const MQ& right) = delete;
+class MQReader : public MQ {
+	MQReader(std::string name) : MQ(name, mq_open(name.c_str(), O_RDONLY)) {
+	}
+	void receive(Message* m) {
+		int result = mq_receive(handle, (char*) m, sizeof (*m), nullptr);
+		if (result < 0) throw RelpipeWriterException(L"Unable to receive POSIX MQ message.");
+		m->checkDataLength();
+	}
+class MQWriter : public MQ {
+	mq_attr attributes;
+	mq_attr* getAttributes() {
+		attributes.mq_maxmsg = MQ::MAX_MESSAGES;
+		attributes.mq_msgsize = sizeof (Message);
+		return &attributes;
+	}
+	MQWriter(std::string name) : MQ(name, mq_open(name.c_str(), O_RDWR | O_CREAT, S_IRWXU, getAttributes()), true) {
+	}
+	void send(const Message* m, unsigned int priority = 0) {
+		m->checkDataLength();
+		int result = mq_send(handle, (const char*) m, m->getMessageLength(), priority);
+		if (result) throw RelpipeWriterException(L"Unable to send POSIX MQ message.");
+	}
+class ParallelFilesystemWorker {
+class ParallelFilesystemProcess {
 class ParallelFilesystemCommand : public FilesystemCommand {
 	void process(std::istream& input, std::ostream& output, Configuration& configuration) {
 		// TODO: ParallelFilesystemCommand
-		mq_close(0); // FIXME: remove (this line just tests that linking to librt worked well)
+		{ // TODO: demo code – remove:
+			std::string queueName = "/relpipe-in-filesystem_parallel_" + std::to_string(getpid());
+			MQWriter mqWriter(queueName.c_str());
+			MQReader mqReader(queueName.c_str());
+			MQ::Message writeBuffer;
+			MQ::Message readBuffer;
+			// ::memset(&writeBuffer, 0, sizeof (writeBuffer));
+			// ::memset(&readBuffer, 0, sizeof (readBuffer));
+			writeBuffer.type = MQ::Message::Type::END;
+			writeBuffer.setStringData("ahoj");
+			mqWriter.send(&writeBuffer);
+			mqReader.receive(&readBuffer);
+			std::string readData(, readBuffer.dataLength);
+			std::wstring_convert < codecvt_utf8<wchar_t>> convertor;
+			std::wcerr << L"Zpráva „" << convertor.from_bytes(readData).c_str() << L"“ typu " << (int) readBuffer.type << L" o celkové délce " << readBuffer.getMessageLength() << L" a délce dat " << readBuffer.dataLength << L" byla přijata." << std::endl;
+		}
 		throw RelpipeWriterException(L"ParallelFilesystemCommand is not yet implemented");