author | František Kučera <franta-hg@frantovo.cz> |
Sun, 25 Apr 2021 20:30:53 +0200 | |
branch | v_0 |
changeset 91 | cb1adcd17d0c |
parent 61 | 640ba8948d69 |
permissions | -rw-r--r-- |
52
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
1 |
/** |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
2 |
* Relational pipes |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
3 |
* Copyright © 2020 František Kučera (Frantovo.cz, GlobalCode.info) |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
4 |
* |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
5 |
* This program is free software: you can redistribute it and/or modify |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
6 |
* it under the terms of the GNU General Public License as published by |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
7 |
* the Free Software Foundation, version 3 of the License. |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
8 |
* |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
9 |
* This program is distributed in the hope that it will be useful, |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
10 |
* but WITHOUT ANY WARRANTY; without even the implied warranty of |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
11 |
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
12 |
* GNU General Public License for more details. |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
13 |
* |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
14 |
* You should have received a copy of the GNU General Public License |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
15 |
* along with this program. If not, see <http://www.gnu.org/licenses/>. |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
16 |
*/ |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
17 |
#pragma once |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
18 |
|
58
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
19 |
#include <mutex> |
55
698836fc65b4
parallel processing: link to librt (POSIX MQ)
František Kučera <franta-hg@frantovo.cz>
parents:
54
diff
changeset
|
20 |
#include <mqueue.h> |
56
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
21 |
#include <limits.h> |
57
c40a241d6e0c
parallel processing: use directly file descriptors (FD) instead of STDIO streams
František Kučera <franta-hg@frantovo.cz>
parents:
56
diff
changeset
|
22 |
#include <ext/stdio_filebuf.h> |
58
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
23 |
#include <sys/wait.h> |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
24 |
#include <semaphore.h> |
55
698836fc65b4
parallel processing: link to librt (POSIX MQ)
František Kučera <franta-hg@frantovo.cz>
parents:
54
diff
changeset
|
25 |
|
54
ef726975c34b
parallel processing: rename FilesystemCommandBase to FilesystemCommand
František Kučera <franta-hg@frantovo.cz>
parents:
52
diff
changeset
|
26 |
#include "FilesystemCommand.h" |
52
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
27 |
|
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
28 |
namespace relpipe { |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
29 |
namespace in { |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
30 |
namespace filesystem { |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
31 |
|
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
32 |
namespace fs = std::filesystem; |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
33 |
using namespace relpipe::writer; |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
34 |
|
56
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
35 |
class MQ { |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
36 |
protected: |
58
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
37 |
/** |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
38 |
* Process where this object was created. |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
39 |
* During fork() this object is copied. |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
40 |
* Using this variable we can detect the copy. |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
41 |
*/ |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
42 |
__pid_t originalPid; |
56
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
43 |
std::string name; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
44 |
mqd_t handle; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
45 |
bool unlinkAfterClose; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
46 |
static const size_t MAX_DATA_LENGTH = PATH_MAX; // TODO: maybe more or configurable |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
47 |
static const __syscall_slong_t MAX_MESSAGES = 10; // TODO: maybe configurable or derived from the process count |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
48 |
|
60
bb7ca5891755
parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents:
59
diff
changeset
|
49 |
template<typename... Args> static mqd_t mqOpen(const char *__name, int __oflag, Args... args) { |
bb7ca5891755
parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents:
59
diff
changeset
|
50 |
mqd_t handle = mq_open(__name, __oflag, args...); |
bb7ca5891755
parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents:
59
diff
changeset
|
51 |
if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ."); |
bb7ca5891755
parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents:
59
diff
changeset
|
52 |
else return handle; |
bb7ca5891755
parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents:
59
diff
changeset
|
53 |
} |
bb7ca5891755
parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents:
59
diff
changeset
|
54 |
|
bb7ca5891755
parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents:
59
diff
changeset
|
55 |
/** |
bb7ca5891755
parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents:
59
diff
changeset
|
56 |
* @param name |
bb7ca5891755
parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents:
59
diff
changeset
|
57 |
* @param handle do not call mq_open() directly, use MQ:mqOpen() instead. |
bb7ca5891755
parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents:
59
diff
changeset
|
58 |
* @param unlinkAfterClose |
bb7ca5891755
parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents:
59
diff
changeset
|
59 |
*/ |
bb7ca5891755
parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents:
59
diff
changeset
|
60 |
MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : originalPid(getpid()), name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) { |
bb7ca5891755
parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents:
59
diff
changeset
|
61 |
} |
bb7ca5891755
parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents:
59
diff
changeset
|
62 |
|
56
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
63 |
public: |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
64 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
65 |
class Message { |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
66 |
public: |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
67 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
68 |
enum class Type { |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
69 |
FILENAME, |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
70 |
END |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
71 |
}; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
72 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
73 |
Type type; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
74 |
size_t dataLength; // TODO: maybe uint16_t from #include <cstdint> would be enough (and shorten the message minimum size from 16 to 4) |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
75 |
char data[MQ::MAX_DATA_LENGTH]; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
76 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
77 |
void checkDataLength() const { |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
78 |
if (dataLength > sizeof (data) || dataLength < 0) throw RelpipeWriterException(L"Invalid POSIX MQ message size."); |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
79 |
} |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
80 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
81 |
size_t getMessageLength() const { |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
82 |
return sizeof (*this) - sizeof (data) + dataLength; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
83 |
} |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
84 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
85 |
std::string getStringData() { |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
86 |
return std::string(data, dataLength); |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
87 |
} |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
88 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
89 |
void setStringData(const std::string& s) { |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
90 |
if (s.size() > sizeof (data)) throw RelpipeWriterException(L"Unable set message data: string too long."); |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
91 |
::memcpy(data, s.c_str(), s.size()); |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
92 |
dataLength = s.size(); |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
93 |
} |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
94 |
}; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
95 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
96 |
virtual ~MQ() { |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
97 |
mq_close(handle); |
58
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
98 |
if (unlinkAfterClose && originalPid == getpid()) mq_unlink(name.c_str()); |
56
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
99 |
} |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
100 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
101 |
MQ(const MQ& other) = delete; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
102 |
void operator=(const MQ& right) = delete; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
103 |
}; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
104 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
105 |
class MQReader : public MQ { |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
106 |
public: |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
107 |
|
60
bb7ca5891755
parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents:
59
diff
changeset
|
108 |
MQReader(std::string name) : MQ(name, mqOpen(name.c_str(), O_RDONLY)) { |
56
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
109 |
} |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
110 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
111 |
void receive(Message* m) { |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
112 |
int result = mq_receive(handle, (char*) m, sizeof (*m), nullptr); |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
113 |
if (result < 0) throw RelpipeWriterException(L"Unable to receive POSIX MQ message."); |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
114 |
m->checkDataLength(); |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
115 |
} |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
116 |
}; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
117 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
118 |
class MQWriter : public MQ { |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
119 |
private: |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
120 |
mq_attr attributes; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
121 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
122 |
mq_attr* getAttributes() { |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
123 |
attributes.mq_maxmsg = MQ::MAX_MESSAGES; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
124 |
attributes.mq_msgsize = sizeof (Message); |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
125 |
return &attributes; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
126 |
} |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
127 |
public: |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
128 |
|
60
bb7ca5891755
parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents:
59
diff
changeset
|
129 |
MQWriter(std::string name) : MQ(name, mqOpen(name.c_str(), O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, getAttributes()), true) { |
56
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
130 |
} |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
131 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
132 |
void send(const Message* m, unsigned int priority = 0) { |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
133 |
m->checkDataLength(); |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
134 |
int result = mq_send(handle, (const char*) m, m->getMessageLength(), priority); |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
135 |
if (result) throw RelpipeWriterException(L"Unable to send POSIX MQ message."); |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
136 |
} |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
137 |
}; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
138 |
|
58
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
139 |
/** |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
140 |
* TODO: move to a common/streamlet library |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
141 |
*/ |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
142 |
class NamedMutex { |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
143 |
private: |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
144 |
/** |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
145 |
* Process where this object was created. |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
146 |
* During fork() this object is copied. |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
147 |
* Using this variable we can detect the copy. |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
148 |
*/ |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
149 |
__pid_t originalPid; |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
150 |
sem_t* handle; |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
151 |
std::string name; |
60
bb7ca5891755
parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents:
59
diff
changeset
|
152 |
|
bb7ca5891755
parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents:
59
diff
changeset
|
153 |
NamedMutex() { |
bb7ca5891755
parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents:
59
diff
changeset
|
154 |
} |
bb7ca5891755
parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents:
59
diff
changeset
|
155 |
|
58
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
156 |
public: |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
157 |
|
60
bb7ca5891755
parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents:
59
diff
changeset
|
158 |
static NamedMutex* create(std::string name) { |
bb7ca5891755
parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents:
59
diff
changeset
|
159 |
sem_t* handle = sem_open(name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR); |
bb7ca5891755
parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents:
59
diff
changeset
|
160 |
if (handle == SEM_FAILED) throw RelpipeWriterException(L"Unable to open POSIX semaphore."); |
bb7ca5891755
parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents:
59
diff
changeset
|
161 |
|
bb7ca5891755
parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents:
59
diff
changeset
|
162 |
NamedMutex* result = new NamedMutex(); |
bb7ca5891755
parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents:
59
diff
changeset
|
163 |
result->name = name; |
bb7ca5891755
parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents:
59
diff
changeset
|
164 |
result->handle = handle; |
bb7ca5891755
parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents:
59
diff
changeset
|
165 |
result->originalPid = getpid(); |
bb7ca5891755
parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents:
59
diff
changeset
|
166 |
result->unlock(); |
bb7ca5891755
parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents:
59
diff
changeset
|
167 |
|
bb7ca5891755
parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents:
59
diff
changeset
|
168 |
return result; |
58
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
169 |
} |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
170 |
|
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
171 |
~NamedMutex() { |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
172 |
sem_close(handle); |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
173 |
if (originalPid == getpid()) sem_unlink(name.c_str()); |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
174 |
} |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
175 |
|
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
176 |
NamedMutex(const NamedMutex&) = delete; |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
177 |
NamedMutex& operator=(const NamedMutex&) = delete; |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
178 |
|
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
179 |
void lock() { |
60
bb7ca5891755
parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents:
59
diff
changeset
|
180 |
int error = sem_wait(handle); |
bb7ca5891755
parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents:
59
diff
changeset
|
181 |
if (error) throw RelpipeWriterException(L"Unable to lock POSIX semaphore."); |
58
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
182 |
} |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
183 |
|
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
184 |
void unlock() { |
60
bb7ca5891755
parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents:
59
diff
changeset
|
185 |
int error = sem_post(handle); |
bb7ca5891755
parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents:
59
diff
changeset
|
186 |
if (error) throw RelpipeWriterException(L"Unable to unlock POSIX semaphore."); |
58
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
187 |
} |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
188 |
}; |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
189 |
|
61
640ba8948d69
parallel processing: refactoring: ParallelFilesystemWorker inherits FilesystemWorker
František Kučera <franta-hg@frantovo.cz>
parents:
60
diff
changeset
|
190 |
class ParallelFilesystemWorker : FilesystemWorker { |
58
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
191 |
private: |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
192 |
std::string queueName; |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
193 |
NamedMutex& stdoutMutex; |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
194 |
string_t relationName; |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
195 |
std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders; |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
196 |
Configuration& configuration; |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
197 |
std::wstring_convert < codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings. |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
198 |
public: |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
199 |
|
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
200 |
ParallelFilesystemWorker(std::string queueName, NamedMutex& stdoutMutex, string_t relationName, std::map<string_t, std::shared_ptr<AttributeFinder> > attributeFinders, Configuration& configuration) : queueName(queueName), stdoutMutex(stdoutMutex), relationName(relationName), attributeFinders(attributeFinders), configuration(configuration) { |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
201 |
} |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
202 |
|
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
203 |
void run() { |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
204 |
MQ::Message readBuffer; |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
205 |
MQReader mq(queueName.c_str()); |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
206 |
|
59
7471529c0d11
parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents:
58
diff
changeset
|
207 |
std::stringstream writeBuffer; |
7471529c0d11
parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents:
58
diff
changeset
|
208 |
std::shared_ptr<RelationalWriter> writer(Factory::create(writeBuffer)); |
7471529c0d11
parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents:
58
diff
changeset
|
209 |
|
7471529c0d11
parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents:
58
diff
changeset
|
210 |
writeHeader(writer.get(), attributeFinders, relationName, &configuration.fields, false); |
7471529c0d11
parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents:
58
diff
changeset
|
211 |
|
7471529c0d11
parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents:
58
diff
changeset
|
212 |
while (true) { |
58
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
213 |
mq.receive(&readBuffer); |
59
7471529c0d11
parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents:
58
diff
changeset
|
214 |
if (readBuffer.type == MQ::Message::Type::FILENAME) { |
7471529c0d11
parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents:
58
diff
changeset
|
215 |
std::stringstream originalName(readBuffer.getStringData()); |
7471529c0d11
parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents:
58
diff
changeset
|
216 |
processSingleFile(writer, originalName, attributeFinders, configuration, relationName); |
7471529c0d11
parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents:
58
diff
changeset
|
217 |
|
7471529c0d11
parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents:
58
diff
changeset
|
218 |
{ |
7471529c0d11
parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents:
58
diff
changeset
|
219 |
std::lock_guard lock(stdoutMutex); |
7471529c0d11
parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents:
58
diff
changeset
|
220 |
std::cout << writeBuffer.rdbuf() << std::flush; |
7471529c0d11
parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents:
58
diff
changeset
|
221 |
// TODO: optional (configurable) buffering: write multiple records in a single batch |
7471529c0d11
parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents:
58
diff
changeset
|
222 |
} |
7471529c0d11
parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents:
58
diff
changeset
|
223 |
writeBuffer.str(""); |
7471529c0d11
parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents:
58
diff
changeset
|
224 |
writeBuffer.clear(); |
7471529c0d11
parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents:
58
diff
changeset
|
225 |
} else if (readBuffer.type == MQ::Message::Type::END) { |
7471529c0d11
parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents:
58
diff
changeset
|
226 |
break; |
58
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
227 |
} else { |
59
7471529c0d11
parallel processing: first working version
František Kučera <franta-hg@frantovo.cz>
parents:
58
diff
changeset
|
228 |
throw RelpipeWriterException(L"ParallelFilesystemWorker recieved message of unsupported type: " + std::to_wstring((int) readBuffer.type)); // TODO: better exception |
58
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
229 |
} |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
230 |
} |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
231 |
|
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
232 |
} |
56
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
233 |
}; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
234 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
235 |
class ParallelFilesystemProcess { |
58
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
236 |
private: |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
237 |
__pid_t subPid; |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
238 |
|
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
239 |
ParallelFilesystemProcess(__pid_t subPid) : subPid(subPid) { |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
240 |
} |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
241 |
|
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
242 |
/** |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
243 |
* TODO: move to a common library (copied from the AWK module) |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
244 |
*/ |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
245 |
static void redirectFD(int oldfd, int newfd) { |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
246 |
int result = dup2(oldfd, newfd); |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
247 |
if (result < 0) throw ParallelFilesystemProcess::Exception(L"Unable redirect FD."); |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
248 |
} |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
249 |
|
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
250 |
/** |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
251 |
* TODO: move to a common library (copied from the AWK module) |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
252 |
*/ |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
253 |
static void closeOrThrow(int fd) { |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
254 |
int error = close(fd); |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
255 |
if (error) throw ParallelFilesystemProcess::Exception(L"Unable to close FD: " + std::to_wstring(fd) + L" from PID: " + std::to_wstring(getpid())); |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
256 |
} |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
257 |
|
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
258 |
public: |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
259 |
|
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
260 |
class Exception : public relpipe::writer::RelpipeWriterException { |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
261 |
public: |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
262 |
|
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
263 |
Exception(std::wstring message) : relpipe::writer::RelpipeWriterException(message) { |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
264 |
} |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
265 |
|
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
266 |
}; |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
267 |
|
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
268 |
static ParallelFilesystemProcess* create(std::string queueName, int outputFD, NamedMutex& stdoutMutex, string_t relationName, std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders, Configuration& configuration) { |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
269 |
__pid_t subPid = fork(); |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
270 |
|
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
271 |
if (subPid < 0) { |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
272 |
throw SubProcess::Exception(L"Unable to fork the hash process."); |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
273 |
} else if (subPid == 0) { |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
274 |
// Child process |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
275 |
closeOrThrow(STDIN_FILENO); // strace -cf will show failed close() calls (same as number of processes) |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
276 |
if (outputFD != STDOUT_FILENO) redirectFD(outputFD, STDOUT_FILENO); |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
277 |
ParallelFilesystemWorker w(queueName, stdoutMutex, relationName, attributeFinders, configuration); |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
278 |
w.run(); |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
279 |
return nullptr; |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
280 |
} else { |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
281 |
// Parent process |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
282 |
return new ParallelFilesystemProcess(subPid); |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
283 |
} |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
284 |
} |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
285 |
|
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
286 |
int wait() { |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
287 |
int status = -1; |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
288 |
::waitpid(subPid, &status, 0); |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
289 |
return status; |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
290 |
} |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
291 |
|
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
292 |
__pid_t getPid() const { |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
293 |
return subPid; |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
294 |
} |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
295 |
|
56
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
296 |
}; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
297 |
|
54
ef726975c34b
parallel processing: rename FilesystemCommandBase to FilesystemCommand
František Kučera <franta-hg@frantovo.cz>
parents:
52
diff
changeset
|
298 |
class ParallelFilesystemCommand : public FilesystemCommand { |
52
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
299 |
public: |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
300 |
|
57
c40a241d6e0c
parallel processing: use directly file descriptors (FD) instead of STDIO streams
František Kučera <franta-hg@frantovo.cz>
parents:
56
diff
changeset
|
301 |
void process(int inputFD, int outputFD, Configuration& configuration) { |
58
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
302 |
__gnu_cxx::stdio_filebuf<char> inputBuffer(inputFD, std::ios::in); |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
303 |
__gnu_cxx::stdio_filebuf<char> outputBuffer(outputFD, std::ios::out); |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
304 |
std::istream input(&inputBuffer); |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
305 |
std::ostream output(&outputBuffer); |
56
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
306 |
|
58
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
307 |
// Write relation header: |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
308 |
string_t relationName = fetchRelationName(&configuration); |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
309 |
std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders = createAttributeFinders(); |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
310 |
std::shared_ptr<RelationalWriter> writer(Factory::create(output)); |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
311 |
writeHeader(writer.get(), attributeFinders, relationName, &configuration.fields); |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
312 |
output.flush(); |
56
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
313 |
|
58
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
314 |
// Create queue: |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
315 |
std::string queueName = "/relpipe-in-filesystem_parallel_" + std::to_string(getpid()); |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
316 |
MQWriter mq(queueName.c_str()); |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
317 |
MQ::Message writeBuffer; |
56
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
318 |
|
58
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
319 |
// Create lock for STDOUT synchronization: |
60
bb7ca5891755
parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents:
59
diff
changeset
|
320 |
std::unique_ptr<NamedMutex> stdoutMutex(NamedMutex::create(queueName)); |
56
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
321 |
|
58
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
322 |
// Start workers: |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
323 |
std::vector<std::shared_ptr < ParallelFilesystemProcess>> workerProcesses; |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
324 |
bool inMainProcess = true; |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
325 |
for (int i = 0; i < configuration.parallelism; i++) { |
60
bb7ca5891755
parallel processing: refactoring and clean-up
František Kučera <franta-hg@frantovo.cz>
parents:
59
diff
changeset
|
326 |
std::shared_ptr<ParallelFilesystemProcess> workerProcess(ParallelFilesystemProcess::create(queueName, outputFD, *stdoutMutex, relationName, createAttributeFinders(), configuration)); |
58
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
327 |
if (workerProcess) { |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
328 |
workerProcesses.push_back(workerProcess); |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
329 |
} else { |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
330 |
inMainProcess = false; |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
331 |
break; |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
332 |
} |
56
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
333 |
} |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
334 |
|
58
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
335 |
if (inMainProcess) { |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
336 |
// Distribute file names to the workers: |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
337 |
for (std::stringstream originalName; readNext(input, originalName); reset(originalName)) { |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
338 |
writeBuffer.type = MQ::Message::Type::FILENAME; |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
339 |
writeBuffer.setStringData(originalName.str()); |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
340 |
mq.send(&writeBuffer); |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
341 |
} |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
342 |
|
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
343 |
// Tell workers that everything is done: |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
344 |
writeBuffer.type = MQ::Message::Type::END; |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
345 |
writeBuffer.setStringData(""); |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
346 |
for (int i = 0; i < configuration.parallelism; i++) mq.send(&writeBuffer); |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
347 |
|
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
348 |
// Wait for workers to exit: |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
349 |
std::map<__pid_t, int> failedProcesses; |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
350 |
for (std::shared_ptr<ParallelFilesystemProcess> p : workerProcesses) { |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
351 |
int result = p->wait(); |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
352 |
if (result) failedProcesses[p->getPid()] = result; |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
353 |
} |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
354 |
|
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
355 |
if (failedProcesses.size()) { |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
356 |
std::wstringstream errorMessage; |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
357 |
errorMessage << L"One or more processes failed: "; |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
358 |
for (auto failed : failedProcesses) errorMessage << failed.first << L":" << failed.second << L", "; |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
359 |
throw ParallelFilesystemProcess::Exception(errorMessage.str()); |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
360 |
} |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
361 |
|
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
362 |
} else { |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
363 |
// we are in a worker process → do nothing, finished |
4679f67a8324
parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
František Kučera <franta-hg@frantovo.cz>
parents:
57
diff
changeset
|
364 |
} |
52
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
365 |
} |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
366 |
}; |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
367 |
|
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
368 |
} |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
369 |
} |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
370 |
} |