author | František Kučera <franta-hg@frantovo.cz> |
Mon, 20 Jan 2020 23:47:54 +0100 | |
branch | v_0 |
changeset 56 | 81a53e7cf0ab |
parent 55 | 698836fc65b4 |
child 57 | c40a241d6e0c |
permissions | -rw-r--r-- |
52
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
1 |
/** |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
2 |
* Relational pipes |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
3 |
* Copyright © 2020 František Kučera (Frantovo.cz, GlobalCode.info) |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
4 |
* |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
5 |
* This program is free software: you can redistribute it and/or modify |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
6 |
* it under the terms of the GNU General Public License as published by |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
7 |
* the Free Software Foundation, version 3 of the License. |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
8 |
* |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
9 |
* This program is distributed in the hope that it will be useful, |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
10 |
* but WITHOUT ANY WARRANTY; without even the implied warranty of |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
11 |
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
12 |
* GNU General Public License for more details. |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
13 |
* |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
14 |
* You should have received a copy of the GNU General Public License |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
15 |
* along with this program. If not, see <http://www.gnu.org/licenses/>. |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
16 |
*/ |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
17 |
#pragma once |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
18 |
|
55
698836fc65b4
parallel processing: link to librt (POSIX MQ)
František Kučera <franta-hg@frantovo.cz>
parents:
54
diff
changeset
|
19 |
#include <mqueue.h> |
56
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
20 |
#include <limits.h> |
55
698836fc65b4
parallel processing: link to librt (POSIX MQ)
František Kučera <franta-hg@frantovo.cz>
parents:
54
diff
changeset
|
21 |
|
54
ef726975c34b
parallel processing: rename FilesystemCommandBase to FilesystemCommand
František Kučera <franta-hg@frantovo.cz>
parents:
52
diff
changeset
|
22 |
#include "FilesystemCommand.h" |
52
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
23 |
|
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
24 |
namespace relpipe { |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
25 |
namespace in { |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
26 |
namespace filesystem { |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
27 |
|
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
28 |
namespace fs = std::filesystem; |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
29 |
using namespace relpipe::writer; |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
30 |
|
56
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
31 |
class MQ { |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
32 |
protected: |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
33 |
std::string name; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
34 |
mqd_t handle; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
35 |
bool unlinkAfterClose; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
36 |
static const size_t MAX_DATA_LENGTH = PATH_MAX; // TODO: maybe more or configurable |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
37 |
static const __syscall_slong_t MAX_MESSAGES = 10; // TODO: maybe configurable or derived from the process count |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
38 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
39 |
public: |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
40 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
41 |
class Message { |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
42 |
public: |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
43 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
44 |
enum class Type { |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
45 |
FILENAME, |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
46 |
END |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
47 |
}; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
48 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
49 |
Type type; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
50 |
size_t dataLength; // TODO: maybe uint16_t from #include <cstdint> would be enough (and shorten the message minimum size from 16 to 4) |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
51 |
char data[MQ::MAX_DATA_LENGTH]; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
52 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
53 |
void checkDataLength() const { |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
54 |
if (dataLength > sizeof (data) || dataLength < 0) throw RelpipeWriterException(L"Invalid POSIX MQ message size."); |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
55 |
} |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
56 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
57 |
size_t getMessageLength() const { |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
58 |
return sizeof (*this) - sizeof (data) + dataLength; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
59 |
} |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
60 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
61 |
std::string getStringData() { |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
62 |
return std::string(data, dataLength); |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
63 |
} |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
64 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
65 |
void setStringData(const std::string& s) { |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
66 |
if (s.size() > sizeof (data)) throw RelpipeWriterException(L"Unable set message data: string too long."); |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
67 |
::memcpy(data, s.c_str(), s.size()); |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
68 |
dataLength = s.size(); |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
69 |
} |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
70 |
}; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
71 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
72 |
MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) { |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
73 |
if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ."); |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
74 |
} |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
75 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
76 |
virtual ~MQ() { |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
77 |
mq_close(handle); |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
78 |
if (unlinkAfterClose) mq_unlink(name.c_str()); |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
79 |
} |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
80 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
81 |
MQ(const MQ& other) = delete; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
82 |
void operator=(const MQ& right) = delete; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
83 |
}; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
84 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
85 |
class MQReader : public MQ { |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
86 |
public: |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
87 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
88 |
MQReader(std::string name) : MQ(name, mq_open(name.c_str(), O_RDONLY)) { |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
89 |
} |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
90 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
91 |
void receive(Message* m) { |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
92 |
int result = mq_receive(handle, (char*) m, sizeof (*m), nullptr); |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
93 |
if (result < 0) throw RelpipeWriterException(L"Unable to receive POSIX MQ message."); |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
94 |
m->checkDataLength(); |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
95 |
} |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
96 |
}; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
97 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
98 |
class MQWriter : public MQ { |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
99 |
private: |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
100 |
mq_attr attributes; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
101 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
102 |
mq_attr* getAttributes() { |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
103 |
attributes.mq_maxmsg = MQ::MAX_MESSAGES; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
104 |
attributes.mq_msgsize = sizeof (Message); |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
105 |
return &attributes; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
106 |
} |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
107 |
public: |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
108 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
109 |
MQWriter(std::string name) : MQ(name, mq_open(name.c_str(), O_RDWR | O_CREAT, S_IRWXU, getAttributes()), true) { |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
110 |
} |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
111 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
112 |
void send(const Message* m, unsigned int priority = 0) { |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
113 |
m->checkDataLength(); |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
114 |
int result = mq_send(handle, (const char*) m, m->getMessageLength(), priority); |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
115 |
if (result) throw RelpipeWriterException(L"Unable to send POSIX MQ message."); |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
116 |
} |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
117 |
}; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
118 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
119 |
class ParallelFilesystemWorker { |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
120 |
}; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
121 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
122 |
class ParallelFilesystemProcess { |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
123 |
}; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
124 |
|
54
ef726975c34b
parallel processing: rename FilesystemCommandBase to FilesystemCommand
František Kučera <franta-hg@frantovo.cz>
parents:
52
diff
changeset
|
125 |
class ParallelFilesystemCommand : public FilesystemCommand { |
52
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
126 |
public: |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
127 |
|
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
128 |
void process(std::istream& input, std::ostream& output, Configuration& configuration) { |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
129 |
// TODO: ParallelFilesystemCommand |
56
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
130 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
131 |
{ // TODO: demo code – remove: |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
132 |
std::string queueName = "/relpipe-in-filesystem_parallel_" + std::to_string(getpid()); |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
133 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
134 |
MQWriter mqWriter(queueName.c_str()); |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
135 |
MQReader mqReader(queueName.c_str()); |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
136 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
137 |
MQ::Message writeBuffer; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
138 |
MQ::Message readBuffer; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
139 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
140 |
// ::memset(&writeBuffer, 0, sizeof (writeBuffer)); |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
141 |
// ::memset(&readBuffer, 0, sizeof (readBuffer)); |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
142 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
143 |
writeBuffer.type = MQ::Message::Type::END; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
144 |
writeBuffer.setStringData("ahoj"); |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
145 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
146 |
mqWriter.send(&writeBuffer); |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
147 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
148 |
mqReader.receive(&readBuffer); |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
149 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
150 |
std::string readData(readBuffer.data, readBuffer.dataLength); |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
151 |
std::wstring_convert < codecvt_utf8<wchar_t>> convertor; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
152 |
|
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
153 |
std::wcerr << L"Zpráva „" << convertor.from_bytes(readData).c_str() << L"“ typu " << (int) readBuffer.type << L" o celkové délce " << readBuffer.getMessageLength() << L" a délce dat " << readBuffer.dataLength << L" byla přijata." << std::endl; |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
154 |
} |
81a53e7cf0ab
parallel processing: POSIX MQ helper classes + some demo code
František Kučera <franta-hg@frantovo.cz>
parents:
55
diff
changeset
|
155 |
|
52
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
156 |
throw RelpipeWriterException(L"ParallelFilesystemCommand is not yet implemented"); |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
157 |
} |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
158 |
}; |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
159 |
|
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
160 |
} |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
161 |
} |
fea625f0a096
parallel processing: prepare infrastructure
František Kučera <franta-hg@frantovo.cz>
parents:
diff
changeset
|
162 |
} |