79 } |
79 } |
80 }; |
80 }; |
81 |
81 |
82 MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : originalPid(getpid()), name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) { |
82 MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : originalPid(getpid()), name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) { |
83 if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ."); |
83 if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ."); |
84 // FIXME: sometimes we got this error, especially with higher process counts like: --parallel 50 |
84 // TODO: factory method |
85 } |
85 } |
86 |
86 |
87 virtual ~MQ() { |
87 virtual ~MQ() { |
88 mq_close(handle); |
88 mq_close(handle); |
89 if (unlinkAfterClose && originalPid == getpid()) mq_unlink(name.c_str()); |
89 if (unlinkAfterClose && originalPid == getpid()) mq_unlink(name.c_str()); |
138 * Using this variable we can detect the copy. |
138 * Using this variable we can detect the copy. |
139 */ |
139 */ |
140 __pid_t originalPid; |
140 __pid_t originalPid; |
141 sem_t* handle; |
141 sem_t* handle; |
142 std::string name; |
142 std::string name; |
143 bool owner; |
|
144 public: |
143 public: |
145 |
144 |
146 NamedMutex(std::string name) : originalPid(getpid()), name(name) { |
145 NamedMutex(std::string name) : originalPid(getpid()), name(name) { |
147 handle = sem_open(name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR); |
146 handle = sem_open(name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR); |
148 sem_post(handle); |
147 sem_post(handle); |
|
148 // TODO: factory method, check errors |
149 } |
149 } |
150 |
150 |
151 ~NamedMutex() { |
151 ~NamedMutex() { |
152 sem_close(handle); |
152 sem_close(handle); |
153 if (originalPid == getpid()) sem_unlink(name.c_str()); |
153 if (originalPid == getpid()) sem_unlink(name.c_str()); |
161 } |
161 } |
162 |
162 |
163 void unlock() { |
163 void unlock() { |
164 sem_post(handle); |
164 sem_post(handle); |
165 } |
165 } |
166 |
166 }; |
167 void disown() { |
167 |
168 owner = false; |
168 class ParallelFilesystemWorker : FilesystemCommand { |
169 } |
|
170 }; |
|
171 |
|
172 class ParallelFilesystemWorker { |
|
173 private: |
169 private: |
174 std::string queueName; |
170 std::string queueName; |
175 NamedMutex& stdoutMutex; |
171 NamedMutex& stdoutMutex; |
176 string_t relationName; |
172 string_t relationName; |
177 std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders; |
173 std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders; |
184 |
180 |
185 void run() { |
181 void run() { |
186 MQ::Message readBuffer; |
182 MQ::Message readBuffer; |
187 MQReader mq(queueName.c_str()); |
183 MQReader mq(queueName.c_str()); |
188 |
184 |
189 for (bool running = true; running;) { |
185 std::stringstream writeBuffer; |
|
186 std::shared_ptr<RelationalWriter> writer(Factory::create(writeBuffer)); |
|
187 |
|
188 writeHeader(writer.get(), attributeFinders, relationName, &configuration.fields, false); |
|
189 |
|
190 while (true) { |
190 mq.receive(&readBuffer); |
191 mq.receive(&readBuffer); |
191 std::wstringstream debugLog; |
192 if (readBuffer.type == MQ::Message::Type::FILENAME) { |
192 if (readBuffer.type == MQ::Message::Type::END) { |
193 std::stringstream originalName(readBuffer.getStringData()); |
193 debugLog << L"PID: " << getpid() << L" received END message: „" << convertor.from_bytes(readBuffer.getStringData()) << L"“" << std::endl; |
194 processSingleFile(writer, originalName, attributeFinders, configuration, relationName); |
194 running = false; |
195 |
195 } else if (readBuffer.type == MQ::Message::Type::FILENAME) { |
196 { |
196 debugLog << L"PID: " << getpid() << L" received FILENAME message: „" << convertor.from_bytes(readBuffer.getStringData()) << L"“" << std::endl; |
197 std::lock_guard lock(stdoutMutex); |
|
198 std::cout << writeBuffer.rdbuf() << std::flush; |
|
199 // TODO: optional (configurable) buffering: write multiple records in a single batch |
|
200 } |
|
201 writeBuffer.str(""); |
|
202 writeBuffer.clear(); |
|
203 } else if (readBuffer.type == MQ::Message::Type::END) { |
|
204 break; |
197 } else { |
205 } else { |
198 debugLog << L"PID: " << getpid() << L" received unexpected message: „" << convertor.from_bytes(readBuffer.getStringData()) << L"“ of type: " << ((int) readBuffer.type) << std::endl; |
206 throw RelpipeWriterException(L"ParallelFilesystemWorker recieved message of unsupported type: " + std::to_wstring((int) readBuffer.type)); // TODO: better exception |
199 } |
207 } |
200 |
208 } |
201 |
209 |
202 { |
210 } |
203 std::lock_guard lock(stdoutMutex); |
211 |
204 std::wcerr << debugLog.str() << std::flush; |
212 void process(int inputFD, int outputFD, Configuration& configuration) override { |
205 } |
213 // TODO: refactoring, not used |
206 debugLog.str(L""); |
|
207 debugLog.clear(); |
|
208 } |
|
209 |
|
210 } |
214 } |
211 |
215 |
212 }; |
216 }; |
213 |
217 |
214 class ParallelFilesystemProcess { |
218 class ParallelFilesystemProcess { |