src/ParallelFilesystemCommand.h
author František Kučera <franta-hg@frantovo.cz>
Mon, 20 Jan 2020 23:47:54 +0100
branchv_0
changeset 56 81a53e7cf0ab
parent 55 698836fc65b4
child 57 c40a241d6e0c
permissions -rw-r--r--
parallel processing: POSIX MQ helper classes + some demo code
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
52
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     1
/**
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     2
 * Relational pipes
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     3
 * Copyright © 2020 František Kučera (Frantovo.cz, GlobalCode.info)
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     4
 *
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     5
 * This program is free software: you can redistribute it and/or modify
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     6
 * it under the terms of the GNU General Public License as published by
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     7
 * the Free Software Foundation, version 3 of the License.
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     8
 *
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     9
 * This program is distributed in the hope that it will be useful,
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    10
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    11
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    12
 * GNU General Public License for more details.
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    13
 *
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    14
 * You should have received a copy of the GNU General Public License
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    15
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    16
 */
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    17
#pragma once
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    18
55
698836fc65b4 parallel processing: link to librt (POSIX MQ)
František Kučera <franta-hg@frantovo.cz>
parents: 54
diff changeset
    19
#include <mqueue.h>
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    20
#include <limits.h>
55
698836fc65b4 parallel processing: link to librt (POSIX MQ)
František Kučera <franta-hg@frantovo.cz>
parents: 54
diff changeset
    21
54
ef726975c34b parallel processing: rename FilesystemCommandBase to FilesystemCommand
František Kučera <franta-hg@frantovo.cz>
parents: 52
diff changeset
    22
#include "FilesystemCommand.h"
52
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    23
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    24
namespace relpipe {
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    25
namespace in {
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    26
namespace filesystem {
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    27
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    28
namespace fs = std::filesystem;
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    29
using namespace relpipe::writer;
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    30
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    31
class MQ {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    32
protected:
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    33
	std::string name;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    34
	mqd_t handle;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    35
	bool unlinkAfterClose;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    36
	static const size_t MAX_DATA_LENGTH = PATH_MAX; // TODO: maybe more or configurable
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    37
	static const __syscall_slong_t MAX_MESSAGES = 10; // TODO: maybe configurable or derived from the process count
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    38
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    39
public:
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    40
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    41
	class Message {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    42
	public:
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    43
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    44
		enum class Type {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    45
			FILENAME,
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    46
			END
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    47
		};
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    48
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    49
		Type type;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    50
		size_t dataLength; // TODO: maybe uint16_t from #include <cstdint> would be enough (and shorten the message minimum size from 16 to 4)
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    51
		char data[MQ::MAX_DATA_LENGTH];
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    52
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    53
		void checkDataLength() const {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    54
			if (dataLength > sizeof (data) || dataLength < 0) throw RelpipeWriterException(L"Invalid POSIX MQ message size.");
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    55
		}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    57
		size_t getMessageLength() const {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    58
			return sizeof (*this) - sizeof (data) + dataLength;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    59
		}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    60
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    61
		std::string getStringData() {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    62
			return std::string(data, dataLength);
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    63
		}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    64
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    65
		void setStringData(const std::string& s) {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    66
			if (s.size() > sizeof (data)) throw RelpipeWriterException(L"Unable set message data: string too long.");
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    67
			::memcpy(data, s.c_str(), s.size());
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    68
			dataLength = s.size();
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    69
		}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    70
	};
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    71
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    72
	MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    73
		if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ.");
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    74
	}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    75
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    76
	virtual ~MQ() {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    77
		mq_close(handle);
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    78
		if (unlinkAfterClose) mq_unlink(name.c_str());
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    79
	}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    80
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    81
	MQ(const MQ& other) = delete;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    82
	void operator=(const MQ& right) = delete;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    83
};
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    84
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    85
class MQReader : public MQ {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    86
public:
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    87
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    88
	MQReader(std::string name) : MQ(name, mq_open(name.c_str(), O_RDONLY)) {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    89
	}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    90
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    91
	void receive(Message* m) {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    92
		int result = mq_receive(handle, (char*) m, sizeof (*m), nullptr);
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    93
		if (result < 0) throw RelpipeWriterException(L"Unable to receive POSIX MQ message.");
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    94
		m->checkDataLength();
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    95
	}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    96
};
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    97
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    98
class MQWriter : public MQ {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    99
private:
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   100
	mq_attr attributes;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   101
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   102
	mq_attr* getAttributes() {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   103
		attributes.mq_maxmsg = MQ::MAX_MESSAGES;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   104
		attributes.mq_msgsize = sizeof (Message);
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   105
		return &attributes;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   106
	}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   107
public:
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   108
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   109
	MQWriter(std::string name) : MQ(name, mq_open(name.c_str(), O_RDWR | O_CREAT, S_IRWXU, getAttributes()), true) {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   110
	}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   111
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   112
	void send(const Message* m, unsigned int priority = 0) {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   113
		m->checkDataLength();
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   114
		int result = mq_send(handle, (const char*) m, m->getMessageLength(), priority);
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   115
		if (result) throw RelpipeWriterException(L"Unable to send POSIX MQ message.");
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   116
	}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   117
};
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   118
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   119
/**
 * Placeholder: the class is declared with an empty body, no members exist yet.
 */
class ParallelFilesystemWorker {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   120
};
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   121
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   122
/**
 * Placeholder: the class is declared with an empty body, no members exist yet.
 */
class ParallelFilesystemProcess {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   123
};
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   124
54
ef726975c34b parallel processing: rename FilesystemCommandBase to FilesystemCommand
František Kučera <franta-hg@frantovo.cz>
parents: 52
diff changeset
   125
class ParallelFilesystemCommand : public FilesystemCommand {
52
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   126
public:
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   127
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   128
	void process(std::istream& input, std::ostream& output, Configuration& configuration) {
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   129
		// TODO: ParallelFilesystemCommand
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   130
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   131
		{ // TODO: demo code – remove:
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   132
			std::string queueName = "/relpipe-in-filesystem_parallel_" + std::to_string(getpid());
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   133
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   134
			MQWriter mqWriter(queueName.c_str());
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   135
			MQReader mqReader(queueName.c_str());
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   136
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   137
			MQ::Message writeBuffer;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   138
			MQ::Message readBuffer;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   139
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   140
			// ::memset(&writeBuffer, 0, sizeof (writeBuffer));
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   141
			// ::memset(&readBuffer, 0, sizeof (readBuffer));
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   142
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   143
			writeBuffer.type = MQ::Message::Type::END;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   144
			writeBuffer.setStringData("ahoj");
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   145
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   146
			mqWriter.send(&writeBuffer);
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   147
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   148
			mqReader.receive(&readBuffer);
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   149
			
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   150
			std::string readData(readBuffer.data, readBuffer.dataLength);
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   151
			std::wstring_convert < codecvt_utf8<wchar_t>> convertor;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   152
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   153
			std::wcerr << L"Zpráva „" << convertor.from_bytes(readData).c_str() << L"“ typu " << (int) readBuffer.type << L" o celkové délce " << readBuffer.getMessageLength() << L" a délce dat " << readBuffer.dataLength << L" byla přijata." << std::endl;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   154
		}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   155
52
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   156
		throw RelpipeWriterException(L"ParallelFilesystemCommand is not yet implemented");
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   157
	}
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   158
};
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   159
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   160
}
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   161
}
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   162
}