44 mqd_t handle; |
44 mqd_t handle; |
45 bool unlinkAfterClose; |
45 bool unlinkAfterClose; |
46 static const size_t MAX_DATA_LENGTH = PATH_MAX; // TODO: maybe more or configurable |
46 static const size_t MAX_DATA_LENGTH = PATH_MAX; // TODO: maybe more or configurable |
47 static const __syscall_slong_t MAX_MESSAGES = 10; // TODO: maybe configurable or derived from the process count |
47 static const __syscall_slong_t MAX_MESSAGES = 10; // TODO: maybe configurable or derived from the process count |
48 |
48 |
|
49 template<typename... Args> static mqd_t mqOpen(const char *__name, int __oflag, Args... args) { |
|
50 mqd_t handle = mq_open(__name, __oflag, args...); |
|
51 if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ."); |
|
52 else return handle; |
|
53 } |
|
54 |
|
55 /** |
|
56 * @param name |
|
57 * @param handle do not call mq_open() directly, use MQ:mqOpen() instead. |
|
58 * @param unlinkAfterClose |
|
59 */ |
|
60 MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : originalPid(getpid()), name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) { |
|
61 } |
|
62 |
49 public: |
63 public: |
50 |
64 |
51 class Message { |
65 class Message { |
52 public: |
66 public: |
53 |
67 |
77 ::memcpy(data, s.c_str(), s.size()); |
91 ::memcpy(data, s.c_str(), s.size()); |
78 dataLength = s.size(); |
92 dataLength = s.size(); |
79 } |
93 } |
80 }; |
94 }; |
81 |
95 |
82 MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : originalPid(getpid()), name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) { |
|
83 if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ."); |
|
84 // TODO: factory method |
|
85 } |
|
86 |
|
87 virtual ~MQ() { |
96 virtual ~MQ() { |
88 mq_close(handle); |
97 mq_close(handle); |
89 if (unlinkAfterClose && originalPid == getpid()) mq_unlink(name.c_str()); |
98 if (unlinkAfterClose && originalPid == getpid()) mq_unlink(name.c_str()); |
90 } |
99 } |
91 |
100 |
94 }; |
103 }; |
95 |
104 |
96 class MQReader : public MQ { |
105 class MQReader : public MQ { |
97 public: |
106 public: |
98 |
107 |
99 MQReader(std::string name) : MQ(name, mq_open(name.c_str(), O_RDONLY)) { |
108 MQReader(std::string name) : MQ(name, mqOpen(name.c_str(), O_RDONLY)) { |
100 } |
109 } |
101 |
110 |
102 void receive(Message* m) { |
111 void receive(Message* m) { |
103 int result = mq_receive(handle, (char*) m, sizeof (*m), nullptr); |
112 int result = mq_receive(handle, (char*) m, sizeof (*m), nullptr); |
104 if (result < 0) throw RelpipeWriterException(L"Unable to receive POSIX MQ message."); |
113 if (result < 0) throw RelpipeWriterException(L"Unable to receive POSIX MQ message."); |
115 attributes.mq_msgsize = sizeof (Message); |
124 attributes.mq_msgsize = sizeof (Message); |
116 return &attributes; |
125 return &attributes; |
117 } |
126 } |
118 public: |
127 public: |
119 |
128 |
120 MQWriter(std::string name) : MQ(name, mq_open(name.c_str(), O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, getAttributes()), true) { |
129 MQWriter(std::string name) : MQ(name, mqOpen(name.c_str(), O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, getAttributes()), true) { |
121 } |
130 } |
122 |
131 |
123 void send(const Message* m, unsigned int priority = 0) { |
132 void send(const Message* m, unsigned int priority = 0) { |
124 m->checkDataLength(); |
133 m->checkDataLength(); |
125 int result = mq_send(handle, (const char*) m, m->getMessageLength(), priority); |
134 int result = mq_send(handle, (const char*) m, m->getMessageLength(), priority); |
138 * Using this variable we can detect the copy. |
147 * Using this variable we can detect the copy. |
139 */ |
148 */ |
140 __pid_t originalPid; |
149 __pid_t originalPid; |
141 sem_t* handle; |
150 sem_t* handle; |
142 std::string name; |
151 std::string name; |
143 public: |
152 |
144 |
153 NamedMutex() { |
145 NamedMutex(std::string name) : originalPid(getpid()), name(name) { |
154 } |
146 handle = sem_open(name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR); |
155 |
147 sem_post(handle); |
156 public: |
148 // TODO: factory method, check errors |
157 |
|
158 static NamedMutex* create(std::string name) { |
|
159 sem_t* handle = sem_open(name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR); |
|
160 if (handle == SEM_FAILED) throw RelpipeWriterException(L"Unable to open POSIX semaphore."); |
|
161 |
|
162 NamedMutex* result = new NamedMutex(); |
|
163 result->name = name; |
|
164 result->handle = handle; |
|
165 result->originalPid = getpid(); |
|
166 result->unlock(); |
|
167 |
|
168 return result; |
149 } |
169 } |
150 |
170 |
151 ~NamedMutex() { |
171 ~NamedMutex() { |
152 sem_close(handle); |
172 sem_close(handle); |
153 if (originalPid == getpid()) sem_unlink(name.c_str()); |
173 if (originalPid == getpid()) sem_unlink(name.c_str()); |
155 |
175 |
156 NamedMutex(const NamedMutex&) = delete; |
176 NamedMutex(const NamedMutex&) = delete; |
157 NamedMutex& operator=(const NamedMutex&) = delete; |
177 NamedMutex& operator=(const NamedMutex&) = delete; |
158 |
178 |
159 void lock() { |
179 void lock() { |
160 sem_wait(handle); |
180 int error = sem_wait(handle); |
|
181 if (error) throw RelpipeWriterException(L"Unable to lock POSIX semaphore."); |
161 } |
182 } |
162 |
183 |
163 void unlock() { |
184 void unlock() { |
164 sem_post(handle); |
185 int error = sem_post(handle); |
|
186 if (error) throw RelpipeWriterException(L"Unable to unlock POSIX semaphore."); |
165 } |
187 } |
166 }; |
188 }; |
167 |
189 |
168 class ParallelFilesystemWorker : FilesystemCommand { |
190 class ParallelFilesystemWorker : FilesystemCommand { |
169 private: |
191 private: |
298 std::string queueName = "/relpipe-in-filesystem_parallel_" + std::to_string(getpid()); |
320 std::string queueName = "/relpipe-in-filesystem_parallel_" + std::to_string(getpid()); |
299 MQWriter mq(queueName.c_str()); |
321 MQWriter mq(queueName.c_str()); |
300 MQ::Message writeBuffer; |
322 MQ::Message writeBuffer; |
301 |
323 |
302 // Create lock for STDOUT synchronization: |
324 // Create lock for STDOUT synchronization: |
303 NamedMutex stdoutMutex(queueName); |
325 std::unique_ptr<NamedMutex> stdoutMutex(NamedMutex::create(queueName)); |
304 |
326 |
305 // Start workers: |
327 // Start workers: |
306 std::vector<std::shared_ptr < ParallelFilesystemProcess>> workerProcesses; |
328 std::vector<std::shared_ptr < ParallelFilesystemProcess>> workerProcesses; |
307 bool inMainProcess = true; |
329 bool inMainProcess = true; |
308 for (int i = 0; i < configuration.parallelism; i++) { |
330 for (int i = 0; i < configuration.parallelism; i++) { |
309 std::shared_ptr<ParallelFilesystemProcess> workerProcess(ParallelFilesystemProcess::create(queueName, outputFD, stdoutMutex, relationName, createAttributeFinders(), configuration)); |
331 std::shared_ptr<ParallelFilesystemProcess> workerProcess(ParallelFilesystemProcess::create(queueName, outputFD, *stdoutMutex, relationName, createAttributeFinders(), configuration)); |
310 if (workerProcess) { |
332 if (workerProcess) { |
311 workerProcesses.push_back(workerProcess); |
333 workerProcesses.push_back(workerProcess); |
312 } else { |
334 } else { |
313 inMainProcess = false; |
335 inMainProcess = false; |
314 break; |
336 break; |