# HG changeset patch # User František Kučera # Date 1579896310 -3600 # Node ID 7471529c0d113098013d640917372acea880100f # Parent 4679f67a832492e623f5e23ed84f78bcf42792d7 parallel processing: first working version diff -r 4679f67a8324 -r 7471529c0d11 nbproject/configurations.xml --- a/nbproject/configurations.xml Fri Jan 24 16:53:31 2020 +0100 +++ b/nbproject/configurations.xml Fri Jan 24 21:05:10 2020 +0100 @@ -69,6 +69,8 @@ + @@ -78,13 +80,6 @@ ${MAKE} -f Makefile ${MAKE} -f Makefile clean build/Debug/src/relpipe-in-filesystem - - - ../relpipe-lib-writer.cpp/include - ../relpipe-lib-cli.cpp/include - build/Debug/src - - build/Debug @@ -93,11 +88,22 @@ - + + + ../relpipe-lib-writer.cpp/include/relpipe/writer + src + ../relpipe-lib-writer.cpp/include + build/Debug/src + + + ../relpipe-lib-writer.cpp/include + ../relpipe-lib-cli.cpp/include + build/Debug/src + diff -r 4679f67a8324 -r 7471529c0d11 src/FilesystemCommand.h --- a/src/FilesystemCommand.h Fri Jan 24 16:53:31 2020 +0100 +++ b/src/FilesystemCommand.h Fri Jan 24 21:05:10 2020 +0100 @@ -75,7 +75,7 @@ return configuration->relation.empty() ? L"filesystem" : configuration->relation; } - void writeHeader(RelationalWriter* writer, std::map> attributeFinders, string_t relationName, std::vector* fields) { + void writeHeader(RelationalWriter* writer, std::map> attributeFinders, string_t relationName, std::vector* fields, bool writeHeader = true) { std::vector attributesMetadata; for (RequestedField field : *fields) { std::shared_ptr finder = attributeFinders[field.group]; @@ -83,7 +83,27 @@ else throw RelpipeWriterException(L"Unsupported field group: " + field.group); } - writer->startRelation(relationName, attributesMetadata, true); + writer->startRelation(relationName, attributesMetadata, writeHeader); + } + + void processSingleFile(std::shared_ptr writer, std::stringstream& originalName, std::map>&attributeFinders, Configuration& configuration, string_t relationName) { + fs::path file(originalName.str().empty() ? "." : originalName.str()); // interpret empty string as current directory (e.g. result of: find -printf '%P\0') + bool exists = false; + + try { + exists = fs::exists(file); + } catch (const fs::filesystem_error& e) { + // we probably do not have permissions to given directory → pretend that the file does not exist + } + + for (auto& finder : attributeFinders) finder.second->startFile(file, originalName.str(), exists); + + for (RequestedField field : configuration.fields) { + std::shared_ptr finder = attributeFinders[field.group]; // should not be nullptr, because already checked while writing the relation metadata + finder->writeField(writer.get(), relationName, field); + } + + for (auto& finder : attributeFinders) finder.second->endFile(); } public: diff -r 4679f67a8324 -r 7471529c0d11 src/ParallelFilesystemCommand.h --- a/src/ParallelFilesystemCommand.h Fri Jan 24 16:53:31 2020 +0100 +++ b/src/ParallelFilesystemCommand.h Fri Jan 24 21:05:10 2020 +0100 @@ -81,7 +81,7 @@ MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : originalPid(getpid()), name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) { if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ."); - // FIXME: sometimes we got this error, especially with higher process counts like: --parallel 50 + // TODO: factory method } virtual ~MQ() { @@ -140,12 +140,12 @@ __pid_t originalPid; sem_t* handle; std::string name; - bool owner; public: NamedMutex(std::string name) : originalPid(getpid()), name(name) { handle = sem_open(name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR); sem_post(handle); + // TODO: factory method, check errors } ~NamedMutex() { @@ -163,13 +163,9 @@ void unlock() { sem_post(handle); } - - void disown() { - owner = false; - } }; -class ParallelFilesystemWorker { +class ParallelFilesystemWorker : FilesystemCommand { private: std::string queueName; NamedMutex& stdoutMutex; @@ -186,29 +182,37 @@ MQ::Message readBuffer; MQReader mq(queueName.c_str()); - for (bool running = true; running;) { + std::stringstream writeBuffer; + std::shared_ptr writer(Factory::create(writeBuffer)); + + writeHeader(writer.get(), attributeFinders, relationName, &configuration.fields, false); + + while (true) { mq.receive(&readBuffer); - std::wstringstream debugLog; - if (readBuffer.type == MQ::Message::Type::END) { - debugLog << L"PID: " << getpid() << L" received END message: „" << convertor.from_bytes(readBuffer.getStringData()) << L"“" << std::endl; - running = false; - } else if (readBuffer.type == MQ::Message::Type::FILENAME) { - debugLog << L"PID: " << getpid() << L" received FILENAME message: „" << convertor.from_bytes(readBuffer.getStringData()) << L"“" << std::endl; + if (readBuffer.type == MQ::Message::Type::FILENAME) { + std::stringstream originalName(readBuffer.getStringData()); + processSingleFile(writer, originalName, attributeFinders, configuration, relationName); + + { + std::lock_guard lock(stdoutMutex); + std::cout << writeBuffer.rdbuf() << std::flush; + // TODO: optional (configurable) buffering: write multiple records in a single batch + } + writeBuffer.str(""); + writeBuffer.clear(); + } else if (readBuffer.type == MQ::Message::Type::END) { + break; } else { - debugLog << L"PID: " << getpid() << L" received unexpected message: „" << convertor.from_bytes(readBuffer.getStringData()) << L"“ of type: " << ((int) readBuffer.type) << std::endl; + throw RelpipeWriterException(L"ParallelFilesystemWorker recieved message of unsupported type: " + std::to_wstring((int) readBuffer.type)); // TODO: better exception } - - - { - std::lock_guard lock(stdoutMutex); - std::wcerr << debugLog.str() << std::flush; - } - debugLog.str(L""); - debugLog.clear(); } } + void process(int inputFD, int outputFD, Configuration& configuration) override { + // TODO: refactoring, not used + } + }; class ParallelFilesystemProcess { diff -r 4679f67a8324 -r 7471529c0d11 src/PlainFilesystemCommand.h --- a/src/PlainFilesystemCommand.h Fri Jan 24 16:53:31 2020 +0100 +++ b/src/PlainFilesystemCommand.h Fri Jan 24 21:05:10 2020 +0100 @@ -41,26 +41,10 @@ string_t relationName = fetchRelationName(&configuration); writeHeader(writer.get(), attributeFinders, relationName, &configuration.fields); - + for (std::stringstream originalName; readNext(input, originalName); reset(originalName)) { - fs::path file(originalName.str().empty() ? "." : originalName.str()); // interpret empty string as current directory (e.g. result of: find -printf '%P\0') - bool exists = false; - - try { - exists = fs::exists(file); - } catch (const fs::filesystem_error& e) { - // we probably do not have permissions to given directory → pretend that the file does not exist - } - - for (auto& finder : attributeFinders) finder.second->startFile(file, originalName.str(), exists); - - for (RequestedField field : configuration.fields) { - std::shared_ptr finder = attributeFinders[field.group]; // should not be nullptr, because already checked while writing the relation metadata - finder->writeField(writer.get(), relationName, field); - } - - for (auto& finder : attributeFinders) finder.second->endFile(); + processSingleFile(writer, originalName, attributeFinders, configuration, relationName); } } };