src/ParallelFilesystemCommand.h
author František Kučera <franta-hg@frantovo.cz>
Fri, 24 Jan 2020 21:05:10 +0100
branchv_0
changeset 59 7471529c0d11
parent 58 4679f67a8324
child 60 bb7ca5891755
permissions -rw-r--r--
parallel processing: first working version
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
52
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     1
/**
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     2
 * Relational pipes
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     3
 * Copyright © 2020 František Kučera (Frantovo.cz, GlobalCode.info)
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     4
 *
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     5
 * This program is free software: you can redistribute it and/or modify
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     6
 * it under the terms of the GNU General Public License as published by
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     7
 * the Free Software Foundation, version 3 of the License.
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     8
 *
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     9
 * This program is distributed in the hope that it will be useful,
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    10
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    11
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    12
 * GNU General Public License for more details.
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    13
 *
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    14
 * You should have received a copy of the GNU General Public License
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    15
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    16
 */
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    17
#pragma once
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    18
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
    19
#include <mutex>
55
698836fc65b4 parallel processing: link to librt (POSIX MQ)
František Kučera <franta-hg@frantovo.cz>
parents: 54
diff changeset
    20
#include <mqueue.h>
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    21
#include <limits.h>
57
c40a241d6e0c parallel processing: use directly file descriptors (FD) instead of STDIO streams
František Kučera <franta-hg@frantovo.cz>
parents: 56
diff changeset
    22
#include <ext/stdio_filebuf.h>
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
    23
#include <sys/wait.h>
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
    24
#include <semaphore.h>
55
698836fc65b4 parallel processing: link to librt (POSIX MQ)
František Kučera <franta-hg@frantovo.cz>
parents: 54
diff changeset
    25
54
ef726975c34b parallel processing: rename FilesystemCommandBase to FilesystemCommand
František Kučera <franta-hg@frantovo.cz>
parents: 52
diff changeset
    26
#include "FilesystemCommand.h"
52
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    27
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    28
namespace relpipe {
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    29
namespace in {
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    30
namespace filesystem {
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    31
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    32
namespace fs = std::filesystem;
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    33
using namespace relpipe::writer;
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    34
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    35
class MQ {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    36
protected:
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
    37
	/**
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
    38
	 * Process where this object was created. 
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
    39
	 * During fork() this object is copied.
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
    40
	 * Using this variable we can detect the copy.
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
    41
	 */
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
    42
	__pid_t originalPid;
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    43
	std::string name;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    44
	mqd_t handle;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    45
	bool unlinkAfterClose;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    46
	static const size_t MAX_DATA_LENGTH = PATH_MAX; // TODO: maybe more or configurable
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    47
	static const __syscall_slong_t MAX_MESSAGES = 10; // TODO: maybe configurable or derived from the process count
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    48
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    49
public:
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    50
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    51
	class Message {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    52
	public:
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    53
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    54
		enum class Type {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    55
			FILENAME,
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    56
			END
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    57
		};
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    58
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    59
		Type type;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    60
		size_t dataLength; // TODO: maybe uint16_t from #include <cstdint> would be enough (and shorten the message minimum size from 16 to 4)
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    61
		char data[MQ::MAX_DATA_LENGTH];
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    62
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    63
		void checkDataLength() const {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    64
			if (dataLength > sizeof (data) || dataLength < 0) throw RelpipeWriterException(L"Invalid POSIX MQ message size.");
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    65
		}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    66
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    67
		size_t getMessageLength() const {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    68
			return sizeof (*this) - sizeof (data) + dataLength;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    69
		}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    70
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    71
		std::string getStringData() {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    72
			return std::string(data, dataLength);
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    73
		}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    74
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    75
		void setStringData(const std::string& s) {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    76
			if (s.size() > sizeof (data)) throw RelpipeWriterException(L"Unable set message data: string too long.");
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    77
			::memcpy(data, s.c_str(), s.size());
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    78
			dataLength = s.size();
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    79
		}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    80
	};
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    81
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
    82
	MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : originalPid(getpid()), name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) {
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    83
		if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ.");
59
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
    84
		// TODO: factory method
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    85
	}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    86
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    87
	virtual ~MQ() {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    88
		mq_close(handle);
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
    89
		if (unlinkAfterClose && originalPid == getpid()) mq_unlink(name.c_str());
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    90
	}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    91
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    92
	MQ(const MQ& other) = delete;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    93
	void operator=(const MQ& right) = delete;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    94
};
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    95
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    96
class MQReader : public MQ {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    97
public:
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    98
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    99
	MQReader(std::string name) : MQ(name, mq_open(name.c_str(), O_RDONLY)) {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   100
	}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   101
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   102
	void receive(Message* m) {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   103
		int result = mq_receive(handle, (char*) m, sizeof (*m), nullptr);
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   104
		if (result < 0) throw RelpipeWriterException(L"Unable to receive POSIX MQ message.");
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   105
		m->checkDataLength();
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   106
	}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   107
};
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   108
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   109
class MQWriter : public MQ {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   110
private:
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   111
	mq_attr attributes;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   112
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   113
	mq_attr* getAttributes() {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   114
		attributes.mq_maxmsg = MQ::MAX_MESSAGES;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   115
		attributes.mq_msgsize = sizeof (Message);
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   116
		return &attributes;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   117
	}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   118
public:
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   119
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   120
	MQWriter(std::string name) : MQ(name, mq_open(name.c_str(), O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, getAttributes()), true) {
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   121
	}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   122
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   123
	void send(const Message* m, unsigned int priority = 0) {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   124
		m->checkDataLength();
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   125
		int result = mq_send(handle, (const char*) m, m->getMessageLength(), priority);
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   126
		if (result) throw RelpipeWriterException(L"Unable to send POSIX MQ message.");
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   127
	}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   128
};
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   129
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   130
/**
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   131
 * TODO: move to a common/streamlet library
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   132
 */
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   133
class NamedMutex {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   134
private:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   135
	/**
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   136
	 * Process where this object was created. 
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   137
	 * During fork() this object is copied.
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   138
	 * Using this variable we can detect the copy.
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   139
	 */
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   140
	__pid_t originalPid;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   141
	sem_t* handle;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   142
	std::string name;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   143
public:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   144
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   145
	NamedMutex(std::string name) : originalPid(getpid()), name(name) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   146
		handle = sem_open(name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   147
		sem_post(handle);
59
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   148
		// TODO: factory method, check errors
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   149
	}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   150
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   151
	~NamedMutex() {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   152
		sem_close(handle);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   153
		if (originalPid == getpid()) sem_unlink(name.c_str());
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   154
	}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   155
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   156
	NamedMutex(const NamedMutex&) = delete;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   157
	NamedMutex& operator=(const NamedMutex&) = delete;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   158
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   159
	void lock() {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   160
		sem_wait(handle);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   161
	}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   162
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   163
	void unlock() {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   164
		sem_post(handle);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   165
	}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   166
};
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   167
59
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   168
class ParallelFilesystemWorker : FilesystemCommand {
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   169
private:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   170
	std::string queueName;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   171
	NamedMutex& stdoutMutex;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   172
	string_t relationName;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   173
	std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   174
	Configuration& configuration;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   175
	std::wstring_convert < codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings.
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   176
public:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   177
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   178
	ParallelFilesystemWorker(std::string queueName, NamedMutex& stdoutMutex, string_t relationName, std::map<string_t, std::shared_ptr<AttributeFinder> > attributeFinders, Configuration& configuration) : queueName(queueName), stdoutMutex(stdoutMutex), relationName(relationName), attributeFinders(attributeFinders), configuration(configuration) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   179
	}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   180
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   181
	void run() {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   182
		MQ::Message readBuffer;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   183
		MQReader mq(queueName.c_str());
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   184
59
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   185
		std::stringstream writeBuffer;
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   186
		std::shared_ptr<RelationalWriter> writer(Factory::create(writeBuffer));
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   187
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   188
		writeHeader(writer.get(), attributeFinders, relationName, &configuration.fields, false);
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   189
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   190
		while (true) {
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   191
			mq.receive(&readBuffer);
59
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   192
			if (readBuffer.type == MQ::Message::Type::FILENAME) {
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   193
				std::stringstream originalName(readBuffer.getStringData());
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   194
				processSingleFile(writer, originalName, attributeFinders, configuration, relationName);
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   195
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   196
				{
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   197
					std::lock_guard lock(stdoutMutex);
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   198
					std::cout << writeBuffer.rdbuf() << std::flush;
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   199
					// TODO: optional (configurable) buffering: write multiple records in a single batch
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   200
				}
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   201
				writeBuffer.str("");
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   202
				writeBuffer.clear();
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   203
			} else if (readBuffer.type == MQ::Message::Type::END) {
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   204
				break;
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   205
			} else {
59
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   206
				throw RelpipeWriterException(L"ParallelFilesystemWorker recieved message of unsupported type: " + std::to_wstring((int) readBuffer.type)); // TODO: better exception
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   207
			}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   208
		}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   209
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   210
	}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   211
59
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   212
	void process(int inputFD, int outputFD, Configuration& configuration) override {
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   213
		// TODO: refactoring, not used
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   214
	}
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   215
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   216
};
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   217
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   218
class ParallelFilesystemProcess {
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   219
private:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   220
	__pid_t subPid;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   221
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   222
	ParallelFilesystemProcess(__pid_t subPid) : subPid(subPid) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   223
	}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   224
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   225
	/**
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   226
	 * TODO: move to a common library (copied from the AWK module) 
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   227
	 */
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   228
	static void redirectFD(int oldfd, int newfd) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   229
		int result = dup2(oldfd, newfd);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   230
		if (result < 0) throw ParallelFilesystemProcess::Exception(L"Unable redirect FD.");
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   231
	}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   232
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   233
	/**
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   234
	 * TODO: move to a common library (copied from the AWK module) 
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   235
	 */
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   236
	static void closeOrThrow(int fd) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   237
		int error = close(fd);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   238
		if (error) throw ParallelFilesystemProcess::Exception(L"Unable to close FD: " + std::to_wstring(fd) + L" from PID: " + std::to_wstring(getpid()));
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   239
	}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   240
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   241
public:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   242
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   243
	class Exception : public relpipe::writer::RelpipeWriterException {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   244
	public:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   245
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   246
		Exception(std::wstring message) : relpipe::writer::RelpipeWriterException(message) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   247
		}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   248
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   249
	};
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   250
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   251
	static ParallelFilesystemProcess* create(std::string queueName, int outputFD, NamedMutex& stdoutMutex, string_t relationName, std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders, Configuration& configuration) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   252
		__pid_t subPid = fork();
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   253
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   254
		if (subPid < 0) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   255
			throw SubProcess::Exception(L"Unable to fork the hash process.");
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   256
		} else if (subPid == 0) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   257
			// Child process
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   258
			closeOrThrow(STDIN_FILENO); // strace -cf will show failed close() calls (same as number of processes)
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   259
			if (outputFD != STDOUT_FILENO) redirectFD(outputFD, STDOUT_FILENO);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   260
			ParallelFilesystemWorker w(queueName, stdoutMutex, relationName, attributeFinders, configuration);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   261
			w.run();
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   262
			return nullptr;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   263
		} else {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   264
			// Parent process
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   265
			return new ParallelFilesystemProcess(subPid);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   266
		}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   267
	}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   268
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   269
	int wait() {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   270
		int status = -1;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   271
		::waitpid(subPid, &status, 0);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   272
		return status;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   273
	}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   274
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   275
	__pid_t getPid() const {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   276
		return subPid;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   277
	}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   278
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   279
};
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   280
54
ef726975c34b parallel processing: rename FilesystemCommandBase to FilesystemCommand
František Kučera <franta-hg@frantovo.cz>
parents: 52
diff changeset
   281
class ParallelFilesystemCommand : public FilesystemCommand {
52
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   282
public:
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   283
57
c40a241d6e0c parallel processing: use directly file descriptors (FD) instead of STDIO streams
František Kučera <franta-hg@frantovo.cz>
parents: 56
diff changeset
   284
	void process(int inputFD, int outputFD, Configuration& configuration) {
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   285
		__gnu_cxx::stdio_filebuf<char> inputBuffer(inputFD, std::ios::in);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   286
		__gnu_cxx::stdio_filebuf<char> outputBuffer(outputFD, std::ios::out);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   287
		std::istream input(&inputBuffer);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   288
		std::ostream output(&outputBuffer);
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   289
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   290
		// Write relation header:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   291
		string_t relationName = fetchRelationName(&configuration);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   292
		std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders = createAttributeFinders();
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   293
		std::shared_ptr<RelationalWriter> writer(Factory::create(output));
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   294
		writeHeader(writer.get(), attributeFinders, relationName, &configuration.fields);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   295
		output.flush();
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   296
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   297
		// Create queue:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   298
		std::string queueName = "/relpipe-in-filesystem_parallel_" + std::to_string(getpid());
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   299
		MQWriter mq(queueName.c_str());
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   300
		MQ::Message writeBuffer;
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   301
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   302
		// Create lock for STDOUT synchronization:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   303
		NamedMutex stdoutMutex(queueName);
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   304
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   305
		// Start workers:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   306
		std::vector<std::shared_ptr < ParallelFilesystemProcess>> workerProcesses;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   307
		bool inMainProcess = true;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   308
		for (int i = 0; i < configuration.parallelism; i++) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   309
			std::shared_ptr<ParallelFilesystemProcess> workerProcess(ParallelFilesystemProcess::create(queueName, outputFD, stdoutMutex, relationName, createAttributeFinders(), configuration));
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   310
			if (workerProcess) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   311
				workerProcesses.push_back(workerProcess);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   312
			} else {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   313
				inMainProcess = false;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   314
				break;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   315
			}
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   316
		}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   317
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   318
		if (inMainProcess) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   319
			// Distribute file names to the workers:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   320
			for (std::stringstream originalName; readNext(input, originalName); reset(originalName)) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   321
				writeBuffer.type = MQ::Message::Type::FILENAME;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   322
				writeBuffer.setStringData(originalName.str());
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   323
				mq.send(&writeBuffer);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   324
			}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   325
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   326
			// Tell workers that everything is done:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   327
			writeBuffer.type = MQ::Message::Type::END;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   328
			writeBuffer.setStringData("");
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   329
			for (int i = 0; i < configuration.parallelism; i++) mq.send(&writeBuffer);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   330
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   331
			// Wait for workers exit:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   332
			std::map<__pid_t, int> failedProcesses;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   333
			for (std::shared_ptr<ParallelFilesystemProcess> p : workerProcesses) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   334
				int result = p->wait();
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   335
				if (result) failedProcesses[p->getPid()] = result;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   336
			}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   337
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   338
			if (failedProcesses.size()) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   339
				std::wstringstream errorMessage;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   340
				errorMessage << L"One or more processes failed: ";
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   341
				for (auto failed : failedProcesses) errorMessage << failed.first << L":" << failed.second << L", ";
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   342
				throw ParallelFilesystemProcess::Exception(errorMessage.str());
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   343
			}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   344
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   345
		} else {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   346
			// we are in a worker process → do nothing, finished
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   347
		}
52
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   348
	}
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   349
};
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   350
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   351
}
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   352
}
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   353
}