src/ParallelFilesystemCommand.h
author František Kučera <franta-hg@frantovo.cz>
Tue, 21 Jan 2020 00:19:56 +0100
branchv_0
changeset 57 c40a241d6e0c
parent 56 81a53e7cf0ab
child 58 4679f67a8324
permissions -rw-r--r--
parallel processing: use directly file descriptors (FD) instead of STDIO streams
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
52
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     1
/**
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     2
 * Relational pipes
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     3
 * Copyright © 2020 František Kučera (Frantovo.cz, GlobalCode.info)
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     4
 *
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     5
 * This program is free software: you can redistribute it and/or modify
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     6
 * it under the terms of the GNU General Public License as published by
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     7
 * the Free Software Foundation, version 3 of the License.
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     8
 *
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     9
 * This program is distributed in the hope that it will be useful,
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    10
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    11
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    12
 * GNU General Public License for more details.
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    13
 *
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    14
 * You should have received a copy of the GNU General Public License
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    15
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    16
 */
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    17
#pragma once
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    18
55
698836fc65b4 parallel processing: link to librt (POSIX MQ)
František Kučera <franta-hg@frantovo.cz>
parents: 54
diff changeset
    19
#include <mqueue.h>
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    20
#include <limits.h>
57
c40a241d6e0c parallel processing: use directly file descriptors (FD) instead of STDIO streams
František Kučera <franta-hg@frantovo.cz>
parents: 56
diff changeset
    21
#include <ext/stdio_filebuf.h>
55
698836fc65b4 parallel processing: link to librt (POSIX MQ)
František Kučera <franta-hg@frantovo.cz>
parents: 54
diff changeset
    22
54
ef726975c34b parallel processing: rename FilesystemCommandBase to FilesystemCommand
František Kučera <franta-hg@frantovo.cz>
parents: 52
diff changeset
    23
#include "FilesystemCommand.h"
52
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    24
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    25
namespace relpipe {
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    26
namespace in {
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    27
namespace filesystem {
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    28
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    29
namespace fs = std::filesystem;
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    30
using namespace relpipe::writer;
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    31
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    32
class MQ {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    33
protected:
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    34
	std::string name;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    35
	mqd_t handle;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    36
	bool unlinkAfterClose;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    37
	static const size_t MAX_DATA_LENGTH = PATH_MAX; // TODO: maybe more or configurable
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    38
	static const __syscall_slong_t MAX_MESSAGES = 10; // TODO: maybe configurable or derived from the process count
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    39
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    40
public:
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    41
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    42
	class Message {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    43
	public:
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    44
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    45
		enum class Type {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    46
			FILENAME,
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    47
			END
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    48
		};
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    49
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    50
		Type type;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    51
		size_t dataLength; // TODO: maybe uint16_t from #include <cstdint> would be enough (and shorten the message minimum size from 16 to 4)
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    52
		char data[MQ::MAX_DATA_LENGTH];
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    53
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    54
		void checkDataLength() const {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    55
			if (dataLength > sizeof (data) || dataLength < 0) throw RelpipeWriterException(L"Invalid POSIX MQ message size.");
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    56
		}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    57
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    58
		size_t getMessageLength() const {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    59
			return sizeof (*this) - sizeof (data) + dataLength;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    60
		}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    61
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    62
		std::string getStringData() {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    63
			return std::string(data, dataLength);
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    64
		}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    65
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    66
		void setStringData(const std::string& s) {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    67
			if (s.size() > sizeof (data)) throw RelpipeWriterException(L"Unable set message data: string too long.");
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    68
			::memcpy(data, s.c_str(), s.size());
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    69
			dataLength = s.size();
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    70
		}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    71
	};
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    72
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    73
	MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    74
		if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ.");
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    75
	}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    76
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    77
	virtual ~MQ() {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    78
		mq_close(handle);
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    79
		if (unlinkAfterClose) mq_unlink(name.c_str());
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    80
	}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    81
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    82
	MQ(const MQ& other) = delete;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    83
	void operator=(const MQ& right) = delete;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    84
};
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    85
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    86
class MQReader : public MQ {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    87
public:
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    88
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    89
	MQReader(std::string name) : MQ(name, mq_open(name.c_str(), O_RDONLY)) {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    90
	}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    91
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    92
	void receive(Message* m) {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    93
		int result = mq_receive(handle, (char*) m, sizeof (*m), nullptr);
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    94
		if (result < 0) throw RelpipeWriterException(L"Unable to receive POSIX MQ message.");
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    95
		m->checkDataLength();
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    96
	}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    97
};
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    98
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    99
class MQWriter : public MQ {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   100
private:
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   101
	mq_attr attributes;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   102
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   103
	mq_attr* getAttributes() {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   104
		attributes.mq_maxmsg = MQ::MAX_MESSAGES;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   105
		attributes.mq_msgsize = sizeof (Message);
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   106
		return &attributes;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   107
	}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   108
public:
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   109
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   110
	MQWriter(std::string name) : MQ(name, mq_open(name.c_str(), O_RDWR | O_CREAT, S_IRWXU, getAttributes()), true) {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   111
	}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   112
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   113
	void send(const Message* m, unsigned int priority = 0) {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   114
		m->checkDataLength();
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   115
		int result = mq_send(handle, (const char*) m, m->getMessageLength(), priority);
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   116
		if (result) throw RelpipeWriterException(L"Unable to send POSIX MQ message.");
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   117
	}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   118
};
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   119
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   120
/**
 * Placeholder: this class has no members yet.
 */
class ParallelFilesystemWorker {
};
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   122
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   123
/**
 * Placeholder: this class has no members yet.
 */
class ParallelFilesystemProcess {
};
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   125
54
ef726975c34b parallel processing: rename FilesystemCommandBase to FilesystemCommand
František Kučera <franta-hg@frantovo.cz>
parents: 52
diff changeset
   126
class ParallelFilesystemCommand : public FilesystemCommand {
52
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   127
public:
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   128
57
c40a241d6e0c parallel processing: use directly file descriptors (FD) instead of STDIO streams
František Kučera <franta-hg@frantovo.cz>
parents: 56
diff changeset
   129
	void process(int inputFD, int outputFD, Configuration& configuration) {
52
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   130
		// TODO: ParallelFilesystemCommand
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   131
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   132
		{ // TODO: demo code – remove:
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   133
			std::string queueName = "/relpipe-in-filesystem_parallel_" + std::to_string(getpid());
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   134
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   135
			MQWriter mqWriter(queueName.c_str());
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   136
			MQReader mqReader(queueName.c_str());
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   137
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   138
			MQ::Message writeBuffer;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   139
			MQ::Message readBuffer;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   140
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   141
			// ::memset(&writeBuffer, 0, sizeof (writeBuffer));
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   142
			// ::memset(&readBuffer, 0, sizeof (readBuffer));
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   143
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   144
			writeBuffer.type = MQ::Message::Type::END;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   145
			writeBuffer.setStringData("ahoj");
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   146
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   147
			mqWriter.send(&writeBuffer);
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   148
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   149
			mqReader.receive(&readBuffer);
57
c40a241d6e0c parallel processing: use directly file descriptors (FD) instead of STDIO streams
František Kučera <franta-hg@frantovo.cz>
parents: 56
diff changeset
   150
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   151
			std::string readData(readBuffer.data, readBuffer.dataLength);
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   152
			std::wstring_convert < codecvt_utf8<wchar_t>> convertor;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   153
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   154
			std::wcerr << L"Zpráva „" << convertor.from_bytes(readData).c_str() << L"“ typu " << (int) readBuffer.type << L" o celkové délce " << readBuffer.getMessageLength() << L" a délce dat " << readBuffer.dataLength << L" byla přijata." << std::endl;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   155
		}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   156
52
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   157
		throw RelpipeWriterException(L"ParallelFilesystemCommand is not yet implemented");
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   158
	}
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   159
};
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   160
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   161
}
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   162
}
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   163
}