src/ParallelFilesystemCommand.h
author František Kučera <franta-hg@frantovo.cz>
Sun, 25 Apr 2021 18:47:57 +0200
branchv_0
changeset 89 25a11859975b
parent 61 640ba8948d69
permissions -rw-r--r--
streamlet examples: QR: rename qr to qr-decode + simplify Makefile
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
52
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     1
/**
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     2
 * Relational pipes
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     3
 * Copyright © 2020 František Kučera (Frantovo.cz, GlobalCode.info)
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     4
 *
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     5
 * This program is free software: you can redistribute it and/or modify
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     6
 * it under the terms of the GNU General Public License as published by
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     7
 * the Free Software Foundation, version 3 of the License.
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     8
 *
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
     9
 * This program is distributed in the hope that it will be useful,
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    10
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    11
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    12
 * GNU General Public License for more details.
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    13
 *
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    14
 * You should have received a copy of the GNU General Public License
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    15
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    16
 */
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    17
#pragma once
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    18
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
    19
#include <mutex>
55
698836fc65b4 parallel processing: link to librt (POSIX MQ)
František Kučera <franta-hg@frantovo.cz>
parents: 54
diff changeset
    20
#include <mqueue.h>
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    21
#include <limits.h>
57
c40a241d6e0c parallel processing: use directly file descriptors (FD) instead of STDIO streams
František Kučera <franta-hg@frantovo.cz>
parents: 56
diff changeset
    22
#include <ext/stdio_filebuf.h>
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
    23
#include <sys/wait.h>
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
    24
#include <semaphore.h>
55
698836fc65b4 parallel processing: link to librt (POSIX MQ)
František Kučera <franta-hg@frantovo.cz>
parents: 54
diff changeset
    25
54
ef726975c34b parallel processing: rename FilesystemCommandBase to FilesystemCommand
František Kučera <franta-hg@frantovo.cz>
parents: 52
diff changeset
    26
#include "FilesystemCommand.h"
52
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    27
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    28
namespace relpipe {
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    29
namespace in {
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    30
namespace filesystem {
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    31
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    32
namespace fs = std::filesystem;
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    33
using namespace relpipe::writer;
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
    34
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    35
class MQ {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    36
protected:
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
    37
	/**
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
    38
	 * Process where this object was created. 
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
    39
	 * During fork() this object is copied.
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
    40
	 * Using this variable we can detect the copy.
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
    41
	 */
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
    42
	__pid_t originalPid;
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    43
	std::string name;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    44
	mqd_t handle;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    45
	bool unlinkAfterClose;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    46
	static const size_t MAX_DATA_LENGTH = PATH_MAX; // TODO: maybe more or configurable
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    47
	static const __syscall_slong_t MAX_MESSAGES = 10; // TODO: maybe configurable or derived from the process count
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    48
60
bb7ca5891755 parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents: 59
diff changeset
    49
	template<typename... Args> static mqd_t mqOpen(const char *__name, int __oflag, Args... args) {
bb7ca5891755 parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents: 59
diff changeset
    50
		mqd_t handle = mq_open(__name, __oflag, args...);
bb7ca5891755 parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents: 59
diff changeset
    51
		if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ.");
bb7ca5891755 parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents: 59
diff changeset
    52
		else return handle;
bb7ca5891755 parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents: 59
diff changeset
    53
	}
bb7ca5891755 parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents: 59
diff changeset
    54
bb7ca5891755 parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents: 59
diff changeset
    55
	/**
bb7ca5891755 parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents: 59
diff changeset
    56
	 * @param name
bb7ca5891755 parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents: 59
diff changeset
    57
	 * @param handle do not call mq_open() directly, use MQ:mqOpen() instead.
bb7ca5891755 parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents: 59
diff changeset
    58
	 * @param unlinkAfterClose
bb7ca5891755 parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents: 59
diff changeset
    59
	 */
bb7ca5891755 parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents: 59
diff changeset
    60
	MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : originalPid(getpid()), name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) {
bb7ca5891755 parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents: 59
diff changeset
    61
	}
bb7ca5891755 parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents: 59
diff changeset
    62
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    63
public:
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    64
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    65
	class Message {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    66
	public:
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    67
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    68
		enum class Type {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    69
			FILENAME,
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    70
			END
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    71
		};
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    72
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    73
		Type type;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    74
		size_t dataLength; // TODO: maybe uint16_t from #include <cstdint> would be enough (and shorten the message minimum size from 16 to 4)
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    75
		char data[MQ::MAX_DATA_LENGTH];
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    76
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    77
		void checkDataLength() const {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    78
			if (dataLength > sizeof (data) || dataLength < 0) throw RelpipeWriterException(L"Invalid POSIX MQ message size.");
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    79
		}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    80
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    81
		size_t getMessageLength() const {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    82
			return sizeof (*this) - sizeof (data) + dataLength;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    83
		}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    84
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    85
		std::string getStringData() {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    86
			return std::string(data, dataLength);
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    87
		}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    88
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    89
		void setStringData(const std::string& s) {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    90
			if (s.size() > sizeof (data)) throw RelpipeWriterException(L"Unable set message data: string too long.");
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    91
			::memcpy(data, s.c_str(), s.size());
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    92
			dataLength = s.size();
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    93
		}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    94
	};
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    95
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    96
	virtual ~MQ() {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    97
		mq_close(handle);
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
    98
		if (unlinkAfterClose && originalPid == getpid()) mq_unlink(name.c_str());
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
    99
	}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   100
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   101
	MQ(const MQ& other) = delete;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   102
	void operator=(const MQ& right) = delete;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   103
};
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   104
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   105
class MQReader : public MQ {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   106
public:
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   107
60
bb7ca5891755 parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents: 59
diff changeset
   108
	MQReader(std::string name) : MQ(name, mqOpen(name.c_str(), O_RDONLY)) {
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   109
	}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   110
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   111
	void receive(Message* m) {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   112
		int result = mq_receive(handle, (char*) m, sizeof (*m), nullptr);
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   113
		if (result < 0) throw RelpipeWriterException(L"Unable to receive POSIX MQ message.");
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   114
		m->checkDataLength();
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   115
	}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   116
};
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   117
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   118
class MQWriter : public MQ {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   119
private:
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   120
	mq_attr attributes;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   121
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   122
	mq_attr* getAttributes() {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   123
		attributes.mq_maxmsg = MQ::MAX_MESSAGES;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   124
		attributes.mq_msgsize = sizeof (Message);
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   125
		return &attributes;
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   126
	}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   127
public:
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   128
60
bb7ca5891755 parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents: 59
diff changeset
   129
	MQWriter(std::string name) : MQ(name, mqOpen(name.c_str(), O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, getAttributes()), true) {
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   130
	}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   131
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   132
	void send(const Message* m, unsigned int priority = 0) {
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   133
		m->checkDataLength();
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   134
		int result = mq_send(handle, (const char*) m, m->getMessageLength(), priority);
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   135
		if (result) throw RelpipeWriterException(L"Unable to send POSIX MQ message.");
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   136
	}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   137
};
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   138
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   139
/**
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   140
 * TODO: move to a common/streamlet library
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   141
 */
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   142
class NamedMutex {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   143
private:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   144
	/**
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   145
	 * Process where this object was created. 
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   146
	 * During fork() this object is copied.
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   147
	 * Using this variable we can detect the copy.
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   148
	 */
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   149
	__pid_t originalPid;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   150
	sem_t* handle;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   151
	std::string name;
60
bb7ca5891755 parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents: 59
diff changeset
   152
bb7ca5891755 parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents: 59
diff changeset
   153
	NamedMutex() {
bb7ca5891755 parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents: 59
diff changeset
   154
	}
bb7ca5891755 parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents: 59
diff changeset
   155
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   156
public:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   157
60
bb7ca5891755 parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents: 59
diff changeset
   158
	static NamedMutex* create(std::string name) {
bb7ca5891755 parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents: 59
diff changeset
   159
		sem_t* handle = sem_open(name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR);
bb7ca5891755 parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents: 59
diff changeset
   160
		if (handle == SEM_FAILED) throw RelpipeWriterException(L"Unable to open POSIX semaphore.");
bb7ca5891755 parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents: 59
diff changeset
   161
bb7ca5891755 parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents: 59
diff changeset
   162
		NamedMutex* result = new NamedMutex();
bb7ca5891755 parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents: 59
diff changeset
   163
		result->name = name;
bb7ca5891755 parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents: 59
diff changeset
   164
		result->handle = handle;
bb7ca5891755 parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents: 59
diff changeset
   165
		result->originalPid = getpid();
bb7ca5891755 parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents: 59
diff changeset
   166
		result->unlock();
bb7ca5891755 parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents: 59
diff changeset
   167
bb7ca5891755 parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents: 59
diff changeset
   168
		return result;
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   169
	}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   170
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   171
	~NamedMutex() {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   172
		sem_close(handle);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   173
		if (originalPid == getpid()) sem_unlink(name.c_str());
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   174
	}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   175
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   176
	NamedMutex(const NamedMutex&) = delete;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   177
	NamedMutex& operator=(const NamedMutex&) = delete;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   178
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   179
	void lock() {
60
bb7ca5891755 parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents: 59
diff changeset
   180
		int error = sem_wait(handle);
bb7ca5891755 parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents: 59
diff changeset
   181
		if (error) throw RelpipeWriterException(L"Unable to lock POSIX semaphore.");
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   182
	}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   183
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   184
	void unlock() {
60
bb7ca5891755 parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents: 59
diff changeset
   185
		int error = sem_post(handle);
bb7ca5891755 parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents: 59
diff changeset
   186
		if (error) throw RelpipeWriterException(L"Unable to unlock POSIX semaphore.");
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   187
	}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   188
};
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   189
61
640ba8948d69 parallel processing: refactoring: ParallelFilesystemWorker inherits FilesystemWorker
František Kučera <franta-hg@frantovo.cz>
parents: 60
diff changeset
   190
class ParallelFilesystemWorker : FilesystemWorker {
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   191
private:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   192
	std::string queueName;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   193
	NamedMutex& stdoutMutex;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   194
	string_t relationName;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   195
	std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   196
	Configuration& configuration;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   197
	std::wstring_convert < codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings.
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   198
public:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   199
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   200
	ParallelFilesystemWorker(std::string queueName, NamedMutex& stdoutMutex, string_t relationName, std::map<string_t, std::shared_ptr<AttributeFinder> > attributeFinders, Configuration& configuration) : queueName(queueName), stdoutMutex(stdoutMutex), relationName(relationName), attributeFinders(attributeFinders), configuration(configuration) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   201
	}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   202
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   203
	void run() {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   204
		MQ::Message readBuffer;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   205
		MQReader mq(queueName.c_str());
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   206
59
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   207
		std::stringstream writeBuffer;
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   208
		std::shared_ptr<RelationalWriter> writer(Factory::create(writeBuffer));
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   209
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   210
		writeHeader(writer.get(), attributeFinders, relationName, &configuration.fields, false);
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   211
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   212
		while (true) {
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   213
			mq.receive(&readBuffer);
59
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   214
			if (readBuffer.type == MQ::Message::Type::FILENAME) {
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   215
				std::stringstream originalName(readBuffer.getStringData());
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   216
				processSingleFile(writer, originalName, attributeFinders, configuration, relationName);
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   217
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   218
				{
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   219
					std::lock_guard lock(stdoutMutex);
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   220
					std::cout << writeBuffer.rdbuf() << std::flush;
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   221
					// TODO: optional (configurable) buffering: write multiple records in a single batch
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   222
				}
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   223
				writeBuffer.str("");
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   224
				writeBuffer.clear();
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   225
			} else if (readBuffer.type == MQ::Message::Type::END) {
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   226
				break;
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   227
			} else {
59
7471529c0d11 parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents: 58
diff changeset
   228
				throw RelpipeWriterException(L"ParallelFilesystemWorker recieved message of unsupported type: " + std::to_wstring((int) readBuffer.type)); // TODO: better exception
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   229
			}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   230
		}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   231
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   232
	}
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   233
};
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   234
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   235
class ParallelFilesystemProcess {
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   236
private:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   237
	__pid_t subPid;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   238
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   239
	ParallelFilesystemProcess(__pid_t subPid) : subPid(subPid) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   240
	}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   241
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   242
	/**
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   243
	 * TODO: move to a common library (copied from the AWK module) 
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   244
	 */
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   245
	static void redirectFD(int oldfd, int newfd) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   246
		int result = dup2(oldfd, newfd);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   247
		if (result < 0) throw ParallelFilesystemProcess::Exception(L"Unable redirect FD.");
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   248
	}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   249
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   250
	/**
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   251
	 * TODO: move to a common library (copied from the AWK module) 
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   252
	 */
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   253
	static void closeOrThrow(int fd) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   254
		int error = close(fd);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   255
		if (error) throw ParallelFilesystemProcess::Exception(L"Unable to close FD: " + std::to_wstring(fd) + L" from PID: " + std::to_wstring(getpid()));
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   256
	}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   257
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   258
public:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   259
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   260
	class Exception : public relpipe::writer::RelpipeWriterException {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   261
	public:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   262
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   263
		Exception(std::wstring message) : relpipe::writer::RelpipeWriterException(message) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   264
		}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   265
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   266
	};
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   267
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   268
	static ParallelFilesystemProcess* create(std::string queueName, int outputFD, NamedMutex& stdoutMutex, string_t relationName, std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders, Configuration& configuration) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   269
		__pid_t subPid = fork();
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   270
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   271
		if (subPid < 0) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   272
			throw SubProcess::Exception(L"Unable to fork the hash process.");
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   273
		} else if (subPid == 0) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   274
			// Child process
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   275
			closeOrThrow(STDIN_FILENO); // strace -cf will show failed close() calls (same as number of processes)
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   276
			if (outputFD != STDOUT_FILENO) redirectFD(outputFD, STDOUT_FILENO);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   277
			ParallelFilesystemWorker w(queueName, stdoutMutex, relationName, attributeFinders, configuration);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   278
			w.run();
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   279
			return nullptr;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   280
		} else {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   281
			// Parent process
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   282
			return new ParallelFilesystemProcess(subPid);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   283
		}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   284
	}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   285
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   286
	int wait() {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   287
		int status = -1;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   288
		::waitpid(subPid, &status, 0);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   289
		return status;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   290
	}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   291
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   292
	__pid_t getPid() const {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   293
		return subPid;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   294
	}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   295
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   296
};
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   297
54
ef726975c34b parallel processing: rename FilesystemCommandBase to FilesystemCommand
František Kučera <franta-hg@frantovo.cz>
parents: 52
diff changeset
   298
class ParallelFilesystemCommand : public FilesystemCommand {
52
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   299
public:
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   300
57
c40a241d6e0c parallel processing: use directly file descriptors (FD) instead of STDIO streams
František Kučera <franta-hg@frantovo.cz>
parents: 56
diff changeset
   301
	void process(int inputFD, int outputFD, Configuration& configuration) {
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   302
		__gnu_cxx::stdio_filebuf<char> inputBuffer(inputFD, std::ios::in);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   303
		__gnu_cxx::stdio_filebuf<char> outputBuffer(outputFD, std::ios::out);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   304
		std::istream input(&inputBuffer);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   305
		std::ostream output(&outputBuffer);
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   306
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   307
		// Write relation header:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   308
		string_t relationName = fetchRelationName(&configuration);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   309
		std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders = createAttributeFinders();
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   310
		std::shared_ptr<RelationalWriter> writer(Factory::create(output));
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   311
		writeHeader(writer.get(), attributeFinders, relationName, &configuration.fields);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   312
		output.flush();
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   313
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   314
		// Create queue:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   315
		std::string queueName = "/relpipe-in-filesystem_parallel_" + std::to_string(getpid());
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   316
		MQWriter mq(queueName.c_str());
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   317
		MQ::Message writeBuffer;
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   318
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   319
		// Create lock for STDOUT synchronization:
60
bb7ca5891755 parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents: 59
diff changeset
   320
		std::unique_ptr<NamedMutex> stdoutMutex(NamedMutex::create(queueName));
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   321
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   322
		// Start workers:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   323
		std::vector<std::shared_ptr < ParallelFilesystemProcess>> workerProcesses;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   324
		bool inMainProcess = true;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   325
		for (int i = 0; i < configuration.parallelism; i++) {
60
bb7ca5891755 parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents: 59
diff changeset
   326
			std::shared_ptr<ParallelFilesystemProcess> workerProcess(ParallelFilesystemProcess::create(queueName, outputFD, *stdoutMutex, relationName, createAttributeFinders(), configuration));
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   327
			if (workerProcess) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   328
				workerProcesses.push_back(workerProcess);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   329
			} else {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   330
				inMainProcess = false;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   331
				break;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   332
			}
56
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   333
		}
81a53e7cf0ab parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents: 55
diff changeset
   334
58
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   335
		if (inMainProcess) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   336
			// Distribute file names to the workers:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   337
			for (std::stringstream originalName; readNext(input, originalName); reset(originalName)) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   338
				writeBuffer.type = MQ::Message::Type::FILENAME;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   339
				writeBuffer.setStringData(originalName.str());
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   340
				mq.send(&writeBuffer);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   341
			}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   342
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   343
			// Tell workers that everything is done:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   344
			writeBuffer.type = MQ::Message::Type::END;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   345
			writeBuffer.setStringData("");
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   346
			for (int i = 0; i < configuration.parallelism; i++) mq.send(&writeBuffer);
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   347
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   348
			// Wait for workers exit:
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   349
			std::map<__pid_t, int> failedProcesses;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   350
			for (std::shared_ptr<ParallelFilesystemProcess> p : workerProcesses) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   351
				int result = p->wait();
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   352
				if (result) failedProcesses[p->getPid()] = result;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   353
			}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   354
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   355
			if (failedProcesses.size()) {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   356
				std::wstringstream errorMessage;
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   357
				errorMessage << L"One or more processes failed: ";
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   358
				for (auto failed : failedProcesses) errorMessage << failed.first << L":" << failed.second << L", ";
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   359
				throw ParallelFilesystemProcess::Exception(errorMessage.str());
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   360
			}
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   361
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   362
		} else {
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   363
			// we are in a worker process → do nothing, finished
4679f67a8324 parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents: 57
diff changeset
   364
		}
52
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   365
	}
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   366
};
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   367
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   368
}
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   369
}
fea625f0a096 parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff changeset
   370
}