diff -r 4679f67a8324 -r 7471529c0d11 src/ParallelFilesystemCommand.h --- a/src/ParallelFilesystemCommand.h Fri Jan 24 16:53:31 2020 +0100 +++ b/src/ParallelFilesystemCommand.h Fri Jan 24 21:05:10 2020 +0100 @@ -81,7 +81,7 @@ MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : originalPid(getpid()), name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) { if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ."); - // FIXME: sometimes we got this error, especially with higher process counts like: --parallel 50 + // TODO: factory method } virtual ~MQ() { @@ -140,12 +140,12 @@ __pid_t originalPid; sem_t* handle; std::string name; - bool owner; public: NamedMutex(std::string name) : originalPid(getpid()), name(name) { handle = sem_open(name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR); sem_post(handle); + // TODO: factory method, check errors } ~NamedMutex() { @@ -163,13 +163,9 @@ void unlock() { sem_post(handle); } - - void disown() { - owner = false; - } }; -class ParallelFilesystemWorker { +class ParallelFilesystemWorker : FilesystemCommand { private: std::string queueName; NamedMutex& stdoutMutex; @@ -186,29 +182,37 @@ MQ::Message readBuffer; MQReader mq(queueName.c_str()); - for (bool running = true; running;) { + std::stringstream writeBuffer; + std::shared_ptr writer(Factory::create(writeBuffer)); + + writeHeader(writer.get(), attributeFinders, relationName, &configuration.fields, false); + + while (true) { mq.receive(&readBuffer); - std::wstringstream debugLog; - if (readBuffer.type == MQ::Message::Type::END) { - debugLog << L"PID: " << getpid() << L" received END message: „" << convertor.from_bytes(readBuffer.getStringData()) << L"“" << std::endl; - running = false; - } else if (readBuffer.type == MQ::Message::Type::FILENAME) { - debugLog << L"PID: " << getpid() << L" received FILENAME message: „" << convertor.from_bytes(readBuffer.getStringData()) << L"“" << std::endl; + if (readBuffer.type == MQ::Message::Type::FILENAME) { + std::stringstream originalName(readBuffer.getStringData()); + processSingleFile(writer, originalName, attributeFinders, configuration, relationName); + + { + std::lock_guard lock(stdoutMutex); + std::cout << writeBuffer.rdbuf() << std::flush; + // TODO: optional (configurable) buffering: write multiple records in a single batch + } + writeBuffer.str(""); + writeBuffer.clear(); + } else if (readBuffer.type == MQ::Message::Type::END) { + break; } else { - debugLog << L"PID: " << getpid() << L" received unexpected message: „" << convertor.from_bytes(readBuffer.getStringData()) << L"“ of type: " << ((int) readBuffer.type) << std::endl; + throw RelpipeWriterException(L"ParallelFilesystemWorker recieved message of unsupported type: " + std::to_wstring((int) readBuffer.type)); // TODO: better exception } - - - { - std::lock_guard lock(stdoutMutex); - std::wcerr << debugLog.str() << std::flush; - } - debugLog.str(L""); - debugLog.clear(); } } + void process(int inputFD, int outputFD, Configuration& configuration) override { + // TODO: refactoring, not used + } + }; class ParallelFilesystemProcess {