// src/ParallelFilesystemCommand.h
// author František Kučera <franta-hg@frantovo.cz>
// Mon, 20 Jan 2020 23:47:54 +0100
// branch v_0
// changeset 56 81a53e7cf0ab
// parent 55 698836fc65b4
// child 57 c40a241d6e0c
// permissions -rw-r--r--
// parallel processing: POSIX MQ helper classes + some demo code

/**
 * Relational pipes
 * Copyright © 2020 František Kučera (Frantovo.cz, GlobalCode.info)
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, version 3 of the License.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
#pragma once

#include <mqueue.h>
#include <limits.h>

#include "FilesystemCommand.h"

namespace relpipe {
namespace in {
namespace filesystem {

namespace fs = std::filesystem;
using namespace relpipe::writer;

/**
 * Owner of a POSIX message queue descriptor.
 *
 * The descriptor is closed in the destructor and, when requested, the queue name is unlinked afterwards.
 * Instances can not be copied, so the descriptor is never closed twice.
 */
class MQ {
protected:
	/** Name of the queue, kept for the optional mq_unlink() in the destructor. */
	std::string name;
	/** Descriptor obtained from mq_open(). */
	mqd_t handle;
	/** Whether the queue name should be unlinked when this object is destroyed. */
	bool unlinkAfterClose;
	static const size_t MAX_DATA_LENGTH = PATH_MAX; // TODO: maybe more or configurable
	static const __syscall_slong_t MAX_MESSAGES = 10; // TODO: maybe configurable or derived from the process count

public:

	/**
	 * Message transferred through the queue: a fixed header (type + dataLength) followed by dataLength bytes of data.
	 * Only the header and the used part of the data buffer are counted by getMessageLength().
	 */
	class Message {
	public:

		enum class Type {
			FILENAME,
			END
		};

		// Both header fields have defaults, so a freshly constructed message never carries indeterminate values
		// (checkDataLength() on an unset dataLength would otherwise read garbage).
		Type type = Type::FILENAME;
		size_t dataLength = 0; // TODO: maybe uint16_t from #include <cstdint> would be enough (and shorten the message minimum size from 16 to 4)
		char data[MQ::MAX_DATA_LENGTH];

		/**
		 * Verifies that dataLength fits into the data buffer.
		 *
		 * @throws RelpipeWriterException if dataLength is bigger than the buffer
		 */
		void checkDataLength() const {
			// dataLength is unsigned, so only the upper bound needs to be checked
			if (dataLength > sizeof (data)) throw RelpipeWriterException(L"Invalid POSIX MQ message size.");
		}

		/**
		 * @return number of bytes occupied by the header and the used part of the data buffer
		 */
		size_t getMessageLength() const {
			return sizeof (*this) - sizeof (data) + dataLength;
		}

		/**
		 * @return copy of the first dataLength bytes of the data buffer
		 */
		std::string getStringData() const {
			return std::string(data, dataLength);
		}

		/**
		 * Copies the string (without a terminating zero) into the data buffer and sets dataLength accordingly.
		 *
		 * @param s data to be stored in this message
		 * @throws RelpipeWriterException if the string does not fit into the data buffer
		 */
		void setStringData(const std::string& s) {
			if (s.size() > sizeof (data)) throw RelpipeWriterException(L"Unable set message data: string too long.");
			::memcpy(data, s.c_str(), s.size());
			dataLength = s.size();
		}
	};

	/**
	 * @param name name of the queue
	 * @param handle descriptor returned by mq_open()
	 * @param unlinkAfterClose whether the queue name should be unlinked in the destructor
	 * @throws RelpipeWriterException if the handle is negative (mq_open() failed)
	 */
	MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) {
		if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ.");
	}

	/**
	 * Closes the descriptor and optionally unlinks the queue name.
	 * Return values are ignored, because a destructor must not throw.
	 */
	virtual ~MQ() {
		mq_close(handle);
		if (unlinkAfterClose) mq_unlink(name.c_str());
	}

	MQ(const MQ& other) = delete;
	void operator=(const MQ& right) = delete;
};

class MQReader : public MQ {
public:

	MQReader(std::string name) : MQ(name, mq_open(name.c_str(), O_RDONLY)) {
	}

	void receive(Message* m) {
		int result = mq_receive(handle, (char*) m, sizeof (*m), nullptr);
		if (result < 0) throw RelpipeWriterException(L"Unable to receive POSIX MQ message.");
		m->checkDataLength();
	}
};

/**
 * Writing end of the queue: opens the queue in the read-write mode, creating it if it does not exist,
 * and unlinks the queue name when destroyed.
 */
class MQWriter : public MQ {
private:

	/**
	 * Opens (and creates if needed) the queue with capacity of MQ::MAX_MESSAGES messages, each up to sizeof(Message) bytes.
	 *
	 * This is a static function working with a local mq_attr, because it is called while the base class is being initialized
	 * and non-static members of the object under construction must not be used at that point.
	 *
	 * @param name name of the queue
	 * @return descriptor returned by mq_open(); its validity is checked by the MQ constructor
	 */
	static mqd_t openQueue(const std::string& name) {
		mq_attr attributes = {}; // zero the fields that are not set explicitly
		attributes.mq_maxmsg = MQ::MAX_MESSAGES;
		attributes.mq_msgsize = sizeof (Message);
		return mq_open(name.c_str(), O_RDWR | O_CREAT, S_IRWXU, &attributes);
	}
public:

	/**
	 * @param name name of the queue
	 * @throws RelpipeWriterException if the queue can not be opened
	 */
	MQWriter(std::string name) : MQ(name, openQueue(name), true) {
	}

	/**
	 * Sends the header and the used part of the data buffer (see Message::getMessageLength()).
	 *
	 * @param m message to be sent
	 * @param priority priority passed to mq_send()
	 * @throws RelpipeWriterException if the message has invalid size or if the send fails
	 */
	void send(const Message* m, unsigned int priority = 0) {
		m->checkDataLength();
		const int result = mq_send(handle, reinterpret_cast<const char*> (m), m->getMessageLength(), priority);
		if (result != 0) throw RelpipeWriterException(L"Unable to send POSIX MQ message.");
	}
};

/**
 * Placeholder: this class has no members yet.
 */
class ParallelFilesystemWorker {};

/**
 * Placeholder: this class has no members yet.
 */
class ParallelFilesystemProcess {};

class ParallelFilesystemCommand : public FilesystemCommand {
public:

	void process(std::istream& input, std::ostream& output, Configuration& configuration) {
		// TODO: ParallelFilesystemCommand

		{ // TODO: demo code – remove:
			std::string queueName = "/relpipe-in-filesystem_parallel_" + std::to_string(getpid());

			MQWriter mqWriter(queueName.c_str());
			MQReader mqReader(queueName.c_str());

			MQ::Message writeBuffer;
			MQ::Message readBuffer;

			// ::memset(&writeBuffer, 0, sizeof (writeBuffer));
			// ::memset(&readBuffer, 0, sizeof (readBuffer));

			writeBuffer.type = MQ::Message::Type::END;
			writeBuffer.setStringData("ahoj");

			mqWriter.send(&writeBuffer);

			mqReader.receive(&readBuffer);
			
			std::string readData(readBuffer.data, readBuffer.dataLength);
			std::wstring_convert < codecvt_utf8<wchar_t>> convertor;

			std::wcerr << L"Zpráva „" << convertor.from_bytes(readData).c_str() << L"“ typu " << (int) readBuffer.type << L" o celkové délce " << readBuffer.getMessageLength() << L" a délce dat " << readBuffer.dataLength << L" byla přijata." << std::endl;
		}

		throw RelpipeWriterException(L"ParallelFilesystemCommand is not yet implemented");
	}
};

}
}
}