src/ParallelFilesystemCommand.h
author František Kučera <franta-hg@frantovo.cz>
Fri, 24 Jan 2020 16:53:31 +0100
branchv_0
changeset 58 4679f67a8324
parent 57 c40a241d6e0c
child 59 7471529c0d11
permissions -rw-r--r--
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
52
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     1
/**
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     2
 * Relational pipes
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     3
 * Copyright © 2020 František Kučera (Frantovo.cz, GlobalCode.info)
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     4
 *
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     5
 * This program is free software: you can redistribute it and/or modify
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     6
 * it under the terms of the GNU General Public License as published by
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     7
 * the Free Software Foundation, version 3 of the License.
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     8
 *
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     9
 * This program is distributed in the hope that it will be useful,
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    10
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    11
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    12
 * GNU General Public License for more details.
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    13
 *
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    14
 * You should have received a copy of the GNU General Public License
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    15
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    16
 */
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    17
#pragma once
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    18
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
    19
#include <mutex>
55
698836fc65b4 parallel processing: link to librt (POSIX MQ)
František Kučera <franta-hg@frantovo.cz>
parents: 54
diff changeset
    20
#include <mqueue.h>
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    21
#include <limits.h>
57
c40a241d6e0c parallel processing: use directly file descriptors (FD) instead of STDIO streams
František Kučera <franta-hg@frantovo.cz>
parents: 56
diff changeset
    22
#include <ext/stdio_filebuf.h>
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
    23
#include <sys/wait.h>
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
    24
#include <semaphore.h>
55
698836fc65b4 parallel processing: link to librt (POSIX MQ)
František Kučera <franta-hg@frantovo.cz>
parents: 54
diff changeset
    25
54
ef726975c34b parallel processing: rename FilesystemCommandBase to FilesystemCommand
František Kučera <franta-hg@frantovo.cz>
parents: 52
diff changeset
    26
#include "FilesystemCommand.h"
52
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    27
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    28
namespace relpipe {
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    29
namespace in {
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    30
namespace filesystem {
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    31
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    32
namespace fs = std::filesystem;
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    33
using namespace relpipe::writer;
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    34
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    35
class MQ {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    36
protected:
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
    37
	/**
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
    38
	 * Process where this object was created. 
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
    39
	 * During fork() this object is copied.
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
    40
	 * Using this variable we can detect the copy.
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
    41
	 */
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
    42
	__pid_t originalPid;
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    43
	std::string name;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    44
	mqd_t handle;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    45
	bool unlinkAfterClose;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    46
	static const size_t MAX_DATA_LENGTH = PATH_MAX; // TODO: maybe more or configurable
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    47
	static const __syscall_slong_t MAX_MESSAGES = 10; // TODO: maybe configurable or derived from the process count
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    48
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    49
public:
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    50
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    51
	class Message {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    52
	public:
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    53
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    54
		enum class Type {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    55
			FILENAME,
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    56
			END
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    57
		};
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    58
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    59
		Type type;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    60
		size_t dataLength; // TODO: maybe uint16_t from #include <cstdint> would be enough (and shorten the message minimum size from 16 to 4)
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    61
		char data[MQ::MAX_DATA_LENGTH];
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    62
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    63
		void checkDataLength() const {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    64
			if (dataLength > sizeof (data) || dataLength < 0) throw RelpipeWriterException(L"Invalid POSIX MQ message size.");
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    65
		}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    66
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    67
		size_t getMessageLength() const {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    68
			return sizeof (*this) - sizeof (data) + dataLength;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    69
		}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    70
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    71
		std::string getStringData() {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    72
			return std::string(data, dataLength);
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    73
		}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    74
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    75
		void setStringData(const std::string& s) {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    76
			if (s.size() > sizeof (data)) throw RelpipeWriterException(L"Unable set message data: string too long.");
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    77
			::memcpy(data, s.c_str(), s.size());
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    78
			dataLength = s.size();
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    79
		}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    80
	};
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    81
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
    82
	MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : originalPid(getpid()), name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) {
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    83
		if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ.");
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
    84
		// FIXME: sometimes we got this error, especially with higher process counts like: --parallel 50
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    85
	}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    86
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    87
	virtual ~MQ() {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    88
		mq_close(handle);
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
    89
		if (unlinkAfterClose && originalPid == getpid()) mq_unlink(name.c_str());
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    90
	}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    91
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    92
	MQ(const MQ& other) = delete;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    93
	void operator=(const MQ& right) = delete;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    94
};
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    95
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    96
class MQReader : public MQ {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    97
public:
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    98
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    99
	MQReader(std::string name) : MQ(name, mq_open(name.c_str(), O_RDONLY)) {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   100
	}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   101
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   102
	void receive(Message* m) {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   103
		int result = mq_receive(handle, (char*) m, sizeof (*m), nullptr);
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   104
		if (result < 0) throw RelpipeWriterException(L"Unable to receive POSIX MQ message.");
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   105
		m->checkDataLength();
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   106
	}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   107
};
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   108
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   109
class MQWriter : public MQ {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   110
private:
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   111
	mq_attr attributes;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   112
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   113
	mq_attr* getAttributes() {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   114
		attributes.mq_maxmsg = MQ::MAX_MESSAGES;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   115
		attributes.mq_msgsize = sizeof (Message);
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   116
		return &attributes;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   117
	}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   118
public:
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   119
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   120
	MQWriter(std::string name) : MQ(name, mq_open(name.c_str(), O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, getAttributes()), true) {
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   121
	}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   122
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   123
	void send(const Message* m, unsigned int priority = 0) {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   124
		m->checkDataLength();
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   125
		int result = mq_send(handle, (const char*) m, m->getMessageLength(), priority);
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   126
		if (result) throw RelpipeWriterException(L"Unable to send POSIX MQ message.");
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   127
	}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   128
};
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   129
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   130
/**
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   131
 * TODO: move to a common/streamlet library
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   132
 */
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   133
class NamedMutex {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   134
private:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   135
	/**
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   136
	 * Process where this object was created. 
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   137
	 * During fork() this object is copied.
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   138
	 * Using this variable we can detect the copy.
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   139
	 */
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   140
	__pid_t originalPid;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   141
	sem_t* handle;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   142
	std::string name;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   143
	bool owner;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   144
public:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   145
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   146
	NamedMutex(std::string name) : originalPid(getpid()), name(name) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   147
		handle = sem_open(name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   148
		sem_post(handle);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   149
	}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   150
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   151
	~NamedMutex() {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   152
		sem_close(handle);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   153
		if (originalPid == getpid()) sem_unlink(name.c_str());
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   154
	}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   155
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   156
	NamedMutex(const NamedMutex&) = delete;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   157
	NamedMutex& operator=(const NamedMutex&) = delete;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   158
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   159
	void lock() {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   160
		sem_wait(handle);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   161
	}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   162
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   163
	void unlock() {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   164
		sem_post(handle);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   165
	}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   166
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   167
	void disown() {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   168
		owner = false;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   169
	}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   170
};
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   171
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   172
class ParallelFilesystemWorker {
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   173
private:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   174
	std::string queueName;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   175
	NamedMutex& stdoutMutex;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   176
	string_t relationName;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   177
	std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   178
	Configuration& configuration;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   179
	std::wstring_convert < codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings.
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   180
public:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   181
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   182
	ParallelFilesystemWorker(std::string queueName, NamedMutex& stdoutMutex, string_t relationName, std::map<string_t, std::shared_ptr<AttributeFinder> > attributeFinders, Configuration& configuration) : queueName(queueName), stdoutMutex(stdoutMutex), relationName(relationName), attributeFinders(attributeFinders), configuration(configuration) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   183
	}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   184
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   185
	void run() {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   186
		MQ::Message readBuffer;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   187
		MQReader mq(queueName.c_str());
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   188
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   189
		for (bool running = true; running;) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   190
			mq.receive(&readBuffer);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   191
			std::wstringstream debugLog;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   192
			if (readBuffer.type == MQ::Message::Type::END) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   193
				debugLog << L"PID: " << getpid() << L" received END message: „" << convertor.from_bytes(readBuffer.getStringData()) << L"“" << std::endl;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   194
				running = false;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   195
			} else if (readBuffer.type == MQ::Message::Type::FILENAME) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   196
				debugLog << L"PID: " << getpid() << L" received FILENAME message: „" << convertor.from_bytes(readBuffer.getStringData()) << L"“" << std::endl;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   197
			} else {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   198
				debugLog << L"PID: " << getpid() << L" received unexpected message: „" << convertor.from_bytes(readBuffer.getStringData()) << L"“ of type: " << ((int) readBuffer.type) << std::endl;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   199
			}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   200
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   201
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   202
			{
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   203
				std::lock_guard lock(stdoutMutex);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   204
				std::wcerr << debugLog.str() << std::flush;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   205
			}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   206
			debugLog.str(L"");
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   207
			debugLog.clear();
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   208
		}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   209
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   210
	}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   211
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   212
};
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   213
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   214
class ParallelFilesystemProcess {
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   215
private:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   216
	__pid_t subPid;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   217
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   218
	ParallelFilesystemProcess(__pid_t subPid) : subPid(subPid) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   219
	}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   220
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   221
	/**
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   222
	 * TODO: move to a common library (copied from the AWK module) 
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   223
	 */
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   224
	static void redirectFD(int oldfd, int newfd) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   225
		int result = dup2(oldfd, newfd);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   226
		if (result < 0) throw ParallelFilesystemProcess::Exception(L"Unable redirect FD.");
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   227
	}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   228
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   229
	/**
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   230
	 * TODO: move to a common library (copied from the AWK module) 
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   231
	 */
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   232
	static void closeOrThrow(int fd) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   233
		int error = close(fd);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   234
		if (error) throw ParallelFilesystemProcess::Exception(L"Unable to close FD: " + std::to_wstring(fd) + L" from PID: " + std::to_wstring(getpid()));
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   235
	}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   236
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   237
public:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   238
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   239
	class Exception : public relpipe::writer::RelpipeWriterException {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   240
	public:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   241
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   242
		Exception(std::wstring message) : relpipe::writer::RelpipeWriterException(message) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   243
		}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   244
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   245
	};
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   246
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   247
	static ParallelFilesystemProcess* create(std::string queueName, int outputFD, NamedMutex& stdoutMutex, string_t relationName, std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders, Configuration& configuration) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   248
		__pid_t subPid = fork();
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   249
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   250
		if (subPid < 0) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   251
			throw SubProcess::Exception(L"Unable to fork the hash process.");
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   252
		} else if (subPid == 0) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   253
			// Child process
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   254
			closeOrThrow(STDIN_FILENO); // strace -cf will show failed close() calls (same as number of processes)
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   255
			if (outputFD != STDOUT_FILENO) redirectFD(outputFD, STDOUT_FILENO);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   256
			ParallelFilesystemWorker w(queueName, stdoutMutex, relationName, attributeFinders, configuration);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   257
			w.run();
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   258
			return nullptr;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   259
		} else {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   260
			// Parent process
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   261
			return new ParallelFilesystemProcess(subPid);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   262
		}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   263
	}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   264
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   265
	int wait() {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   266
		int status = -1;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   267
		::waitpid(subPid, &status, 0);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   268
		return status;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   269
	}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   270
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   271
	__pid_t getPid() const {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   272
		return subPid;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   273
	}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   274
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   275
};
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   276
54
ef726975c34b parallel processing: rename FilesystemCommandBase to FilesystemCommand
František Kučera <franta-hg@frantovo.cz>
parents: 52
diff changeset
   277
class ParallelFilesystemCommand : public FilesystemCommand {
52
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   278
public:
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   279
57
c40a241d6e0c parallel processing: use directly file descriptors (FD) instead of STDIO streams
František Kučera <franta-hg@frantovo.cz>
parents: 56
diff changeset
   280
	void process(int inputFD, int outputFD, Configuration& configuration) {
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   281
		__gnu_cxx::stdio_filebuf<char> inputBuffer(inputFD, std::ios::in);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   282
		__gnu_cxx::stdio_filebuf<char> outputBuffer(outputFD, std::ios::out);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   283
		std::istream input(&inputBuffer);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   284
		std::ostream output(&outputBuffer);
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   285
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   286
		// Write relation header:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   287
		string_t relationName = fetchRelationName(&configuration);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   288
		std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders = createAttributeFinders();
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   289
		std::shared_ptr<RelationalWriter> writer(Factory::create(output));
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   290
		writeHeader(writer.get(), attributeFinders, relationName, &configuration.fields);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   291
		output.flush();
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   292
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   293
		// Create queue:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   294
		std::string queueName = "/relpipe-in-filesystem_parallel_" + std::to_string(getpid());
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   295
		MQWriter mq(queueName.c_str());
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   296
		MQ::Message writeBuffer;
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   297
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   298
		// Create lock for STDOUT synchronization:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   299
		NamedMutex stdoutMutex(queueName);
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   300
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   301
		// Start workers:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   302
		std::vector<std::shared_ptr < ParallelFilesystemProcess>> workerProcesses;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   303
		bool inMainProcess = true;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   304
		for (int i = 0; i < configuration.parallelism; i++) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   305
			std::shared_ptr<ParallelFilesystemProcess> workerProcess(ParallelFilesystemProcess::create(queueName, outputFD, stdoutMutex, relationName, createAttributeFinders(), configuration));
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   306
			if (workerProcess) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   307
				workerProcesses.push_back(workerProcess);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   308
			} else {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   309
				inMainProcess = false;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   310
				break;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   311
			}
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   312
		}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   313
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   314
		if (inMainProcess) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   315
			// Distribute file names to the workers:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   316
			for (std::stringstream originalName; readNext(input, originalName); reset(originalName)) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   317
				writeBuffer.type = MQ::Message::Type::FILENAME;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   318
				writeBuffer.setStringData(originalName.str());
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   319
				mq.send(&writeBuffer);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   320
			}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   321
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   322
			// Tell workers that everything is done:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   323
			writeBuffer.type = MQ::Message::Type::END;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   324
			writeBuffer.setStringData("");
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   325
			for (int i = 0; i < configuration.parallelism; i++) mq.send(&writeBuffer);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   326
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   327
			// Wait for workers exit:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   328
			std::map<__pid_t, int> failedProcesses;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   329
			for (std::shared_ptr<ParallelFilesystemProcess> p : workerProcesses) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   330
				int result = p->wait();
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   331
				if (result) failedProcesses[p->getPid()] = result;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   332
			}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   333
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   334
			if (failedProcesses.size()) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   335
				std::wstringstream errorMessage;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   336
				errorMessage << L"One or more processes failed: ";
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   337
				for (auto failed : failedProcesses) errorMessage << failed.first << L":" << failed.second << L", ";
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   338
				throw ParallelFilesystemProcess::Exception(errorMessage.str());
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   339
			}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   340
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   341
		} else {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   342
			// we are in a worker process → do nothing, finished
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   343
		}
52
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   344
	}
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   345
};
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   346
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   347
}
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   348
}
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   349
}