src/ParallelFilesystemCommand.h
branchv_0
changeset 56 81a53e7cf0ab
parent 55 698836fc65b4
child 57 c40a241d6e0c
equal deleted inserted replaced
55:698836fc65b4 56:81a53e7cf0ab
    15  * along with this program. If not, see <http://www.gnu.org/licenses/>.
    15  * along with this program. If not, see <http://www.gnu.org/licenses/>.
    16  */
    16  */
    17 #pragma once
    17 #pragma once
    18 
    18 
    19 #include <mqueue.h>
    19 #include <mqueue.h>
       
    20 #include <limits.h>
    20 
    21 
    21 #include "FilesystemCommand.h"
    22 #include "FilesystemCommand.h"
    22 
    23 
    23 namespace relpipe {
    24 namespace relpipe {
    24 namespace in {
    25 namespace in {
    25 namespace filesystem {
    26 namespace filesystem {
    26 
    27 
    27 namespace fs = std::filesystem;
    28 namespace fs = std::filesystem;
    28 using namespace relpipe::writer;
    29 using namespace relpipe::writer;
    29 
    30 
       
    31 class MQ {
       
    32 protected:
       
    33 	std::string name;
       
    34 	mqd_t handle;
       
    35 	bool unlinkAfterClose;
       
    36 	static const size_t MAX_DATA_LENGTH = PATH_MAX; // TODO: maybe more or configurable
       
    37 	static const __syscall_slong_t MAX_MESSAGES = 10; // TODO: maybe configurable or derived from the process count
       
    38 
       
    39 public:
       
    40 
       
    41 	class Message {
       
    42 	public:
       
    43 
       
    44 		enum class Type {
       
    45 			FILENAME,
       
    46 			END
       
    47 		};
       
    48 
       
    49 		Type type;
       
    50 		size_t dataLength; // TODO: maybe uint16_t from #include <cstdint> would be enough (and shorten the message minimum size from 16 to 4)
       
    51 		char data[MQ::MAX_DATA_LENGTH];
       
    52 
       
    53 		void checkDataLength() const {
       
    54 			if (dataLength > sizeof (data) || dataLength < 0) throw RelpipeWriterException(L"Invalid POSIX MQ message size.");
       
    55 		}
       
    56 
       
    57 		size_t getMessageLength() const {
       
    58 			return sizeof (*this) - sizeof (data) + dataLength;
       
    59 		}
       
    60 
       
    61 		std::string getStringData() {
       
    62 			return std::string(data, dataLength);
       
    63 		}
       
    64 
       
    65 		void setStringData(const std::string& s) {
       
    66 			if (s.size() > sizeof (data)) throw RelpipeWriterException(L"Unable set message data: string too long.");
       
    67 			::memcpy(data, s.c_str(), s.size());
       
    68 			dataLength = s.size();
       
    69 		}
       
    70 	};
       
    71 
       
    72 	MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) {
       
    73 		if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ.");
       
    74 	}
       
    75 
       
    76 	virtual ~MQ() {
       
    77 		mq_close(handle);
       
    78 		if (unlinkAfterClose) mq_unlink(name.c_str());
       
    79 	}
       
    80 
       
    81 	MQ(const MQ& other) = delete;
       
    82 	void operator=(const MQ& right) = delete;
       
    83 };
       
    84 
       
    85 class MQReader : public MQ {
       
    86 public:
       
    87 
       
    88 	MQReader(std::string name) : MQ(name, mq_open(name.c_str(), O_RDONLY)) {
       
    89 	}
       
    90 
       
    91 	void receive(Message* m) {
       
    92 		int result = mq_receive(handle, (char*) m, sizeof (*m), nullptr);
       
    93 		if (result < 0) throw RelpipeWriterException(L"Unable to receive POSIX MQ message.");
       
    94 		m->checkDataLength();
       
    95 	}
       
    96 };
       
    97 
       
    98 class MQWriter : public MQ {
       
    99 private:
       
   100 	mq_attr attributes;
       
   101 
       
   102 	mq_attr* getAttributes() {
       
   103 		attributes.mq_maxmsg = MQ::MAX_MESSAGES;
       
   104 		attributes.mq_msgsize = sizeof (Message);
       
   105 		return &attributes;
       
   106 	}
       
   107 public:
       
   108 
       
   109 	MQWriter(std::string name) : MQ(name, mq_open(name.c_str(), O_RDWR | O_CREAT, S_IRWXU, getAttributes()), true) {
       
   110 	}
       
   111 
       
   112 	void send(const Message* m, unsigned int priority = 0) {
       
   113 		m->checkDataLength();
       
   114 		int result = mq_send(handle, (const char*) m, m->getMessageLength(), priority);
       
   115 		if (result) throw RelpipeWriterException(L"Unable to send POSIX MQ message.");
       
   116 	}
       
   117 };
       
   118 
       
   119 class ParallelFilesystemWorker { // Empty placeholder: declares no members in this revision. NOTE(review): presumably reserved for the worker side of the parallel processing — confirm.
       
   120 };
       
   121 
       
   122 class ParallelFilesystemProcess { // Empty placeholder: declares no members in this revision. NOTE(review): presumably reserved for managing a worker process — confirm.
       
   123 };
       
   124 
    30 class ParallelFilesystemCommand : public FilesystemCommand {
   125 class ParallelFilesystemCommand : public FilesystemCommand {
    31 public:
   126 public:
    32 
   127 
    33 	void process(std::istream& input, std::ostream& output, Configuration& configuration) {
   128 	void process(std::istream& input, std::ostream& output, Configuration& configuration) {
    34 		// TODO: ParallelFilesystemCommand
   129 		// TODO: ParallelFilesystemCommand
    35 		mq_close(0); // FIXME: remove (this line just tests that linking to librt worked well)
   130 
       
   131 		{ // TODO: demo code – remove:
       
   132 			std::string queueName = "/relpipe-in-filesystem_parallel_" + std::to_string(getpid());
       
   133 
       
   134 			MQWriter mqWriter(queueName.c_str());
       
   135 			MQReader mqReader(queueName.c_str());
       
   136 
       
   137 			MQ::Message writeBuffer;
       
   138 			MQ::Message readBuffer;
       
   139 
       
   140 			// ::memset(&writeBuffer, 0, sizeof (writeBuffer));
       
   141 			// ::memset(&readBuffer, 0, sizeof (readBuffer));
       
   142 
       
   143 			writeBuffer.type = MQ::Message::Type::END;
       
   144 			writeBuffer.setStringData("ahoj");
       
   145 
       
   146 			mqWriter.send(&writeBuffer);
       
   147 
       
   148 			mqReader.receive(&readBuffer);
       
   149 			
       
   150 			std::string readData(readBuffer.data, readBuffer.dataLength);
       
   151 			std::wstring_convert < codecvt_utf8<wchar_t>> convertor;
       
   152 
       
   153 			std::wcerr << L"Zpráva „" << convertor.from_bytes(readData).c_str() << L"“ typu " << (int) readBuffer.type << L" o celkové délce " << readBuffer.getMessageLength() << L" a délce dat " << readBuffer.dataLength << L" byla přijata." << std::endl;
       
   154 		}
       
   155 
    36 		throw RelpipeWriterException(L"ParallelFilesystemCommand is not yet implemented");
   156 		throw RelpipeWriterException(L"ParallelFilesystemCommand is not yet implemented");
    37 	}
   157 	}
    38 };
   158 };
    39 
   159 
    40 }
   160 }