# HG changeset patch # User František Kučera # Date 1579560474 -3600 # Node ID 81a53e7cf0abaab0f4d85e125d8ab8c87e132253 # Parent 698836fc65b4b7a5cdb96e8085fcc8c37e3181ab parallel processing: POSIX MQ helper classes + some demo code diff -r 698836fc65b4 -r 81a53e7cf0ab src/ParallelFilesystemCommand.h --- a/src/ParallelFilesystemCommand.h Mon Jan 20 15:48:39 2020 +0100 +++ b/src/ParallelFilesystemCommand.h Mon Jan 20 23:47:54 2020 +0100 @@ -17,6 +17,7 @@ #pragma once #include +#include #include "FilesystemCommand.h" @@ -27,12 +28,131 @@ namespace fs = std::filesystem; using namespace relpipe::writer; +class MQ { +protected: + std::string name; + mqd_t handle; + bool unlinkAfterClose; + static const size_t MAX_DATA_LENGTH = PATH_MAX; // TODO: maybe more or configurable + static const __syscall_slong_t MAX_MESSAGES = 10; // TODO: maybe configurable or derived from the process count + +public: + + class Message { + public: + + enum class Type { + FILENAME, + END + }; + + Type type; + size_t dataLength; // TODO: maybe uint16_t from #include would be enough (and shorten the message minimum size from 16 to 4) + char data[MQ::MAX_DATA_LENGTH]; + + void checkDataLength() const { + if (dataLength > sizeof (data) || dataLength < 0) throw RelpipeWriterException(L"Invalid POSIX MQ message size."); + } + + size_t getMessageLength() const { + return sizeof (*this) - sizeof (data) + dataLength; + } + + std::string getStringData() { + return std::string(data, dataLength); + } + + void setStringData(const std::string& s) { + if (s.size() > sizeof (data)) throw RelpipeWriterException(L"Unable set message data: string too long."); + ::memcpy(data, s.c_str(), s.size()); + dataLength = s.size(); + } + }; + + MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) { + if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ."); + } + + virtual ~MQ() { + mq_close(handle); + if (unlinkAfterClose) mq_unlink(name.c_str()); + } + + MQ(const MQ& other) = delete; + void operator=(const MQ& right) = delete; +}; + +class MQReader : public MQ { +public: + + MQReader(std::string name) : MQ(name, mq_open(name.c_str(), O_RDONLY)) { + } + + void receive(Message* m) { + int result = mq_receive(handle, (char*) m, sizeof (*m), nullptr); + if (result < 0) throw RelpipeWriterException(L"Unable to receive POSIX MQ message."); + m->checkDataLength(); + } +}; + +class MQWriter : public MQ { +private: + mq_attr attributes; + + mq_attr* getAttributes() { + attributes.mq_maxmsg = MQ::MAX_MESSAGES; + attributes.mq_msgsize = sizeof (Message); + return &attributes; + } +public: + + MQWriter(std::string name) : MQ(name, mq_open(name.c_str(), O_RDWR | O_CREAT, S_IRWXU, getAttributes()), true) { + } + + void send(const Message* m, unsigned int priority = 0) { + m->checkDataLength(); + int result = mq_send(handle, (const char*) m, m->getMessageLength(), priority); + if (result) throw RelpipeWriterException(L"Unable to send POSIX MQ message."); + } +}; + +class ParallelFilesystemWorker { +}; + +class ParallelFilesystemProcess { +}; + class ParallelFilesystemCommand : public FilesystemCommand { public: void process(std::istream& input, std::ostream& output, Configuration& configuration) { // TODO: ParallelFilesystemCommand - mq_close(0); // FIXME: remove (this line just tests that linking to librt worked well) + + { // TODO: demo code – remove: + std::string queueName = "/relpipe-in-filesystem_parallel_" + std::to_string(getpid()); + + MQWriter mqWriter(queueName.c_str()); + MQReader mqReader(queueName.c_str()); + + MQ::Message writeBuffer; + MQ::Message readBuffer; + + // ::memset(&writeBuffer, 0, sizeof (writeBuffer)); + // ::memset(&readBuffer, 0, sizeof (readBuffer)); + + writeBuffer.type = MQ::Message::Type::END; + writeBuffer.setStringData("ahoj"); + + mqWriter.send(&writeBuffer); + + mqReader.receive(&readBuffer); + + std::string readData(readBuffer.data, readBuffer.dataLength); + std::wstring_convert < codecvt_utf8> convertor; + + std::wcerr << L"Zpráva „" << convertor.from_bytes(readData).c_str() << L"“ typu " << (int) readBuffer.type << L" o celkové délce " << readBuffer.getMessageLength() << L" a délce dat " << readBuffer.dataLength << L" byla přijata." << std::endl; + } + throw RelpipeWriterException(L"ParallelFilesystemCommand is not yet implemented"); } };