parallel processing: put some common code in FilesystemCommand + use POSIX semaphores for STDOUT synchronization across sub-processes
--- a/src/CMakeLists.txt Tue Jan 21 00:19:56 2020 +0100
+++ b/src/CMakeLists.txt Fri Jan 24 16:53:31 2020 +0100
@@ -34,7 +34,7 @@
)
# Link libraries:
-target_link_libraries(${EXECUTABLE_FILE} ${RELPIPE_LIBS_LIBRARIES} stdc++fs rt)
+target_link_libraries(${EXECUTABLE_FILE} ${RELPIPE_LIBS_LIBRARIES} stdc++fs rt -pthread)
set_property(TARGET ${EXECUTABLE_FILE} PROPERTY CXX_STANDARD 17)
set_property(TARGET ${EXECUTABLE_FILE} PROPERTY INSTALL_RPATH_USE_LINK_PATH TRUE)
--- a/src/FilesystemCommand.h Tue Jan 21 00:19:56 2020 +0100
+++ b/src/FilesystemCommand.h Fri Jan 24 16:53:31 2020 +0100
@@ -49,7 +49,7 @@
class FilesystemCommand {
protected:
- std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings.
+ std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings.
std::map<string_t, std::shared_ptr<AttributeFinder>> createAttributeFinders() {
return {
@@ -71,6 +71,21 @@
return originalName.tellp();
}
+	/**
+	 * Picks the name of the relation to be written.
+	 *
+	 * @param configuration configuration whose "relation" value is consulted
+	 * @return the configured relation name, or L"filesystem" when none was configured (empty value)
+	 */
+	string_t fetchRelationName(Configuration* configuration) {
+		if (configuration->relation.empty()) return L"filesystem";
+		return configuration->relation;
+	}
+
+	/**
+	 * Collects the attribute metadata of all requested fields and starts the relation on the writer.
+	 *
+	 * @param writer writer on which startRelation() is called
+	 * @param attributeFinders finders indexed by field group
+	 * @param relationName name of the relation being started
+	 * @param fields requested fields; each one is resolved through the finder of its group
+	 * @throws RelpipeWriterException if a field refers to a group without a finder
+	 */
+	void writeHeader(RelationalWriter* writer, std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders, string_t relationName, std::vector<RequestedField>* fields) {
+		std::vector<AttributeMetadata> header;
+
+		for (RequestedField requested : *fields) {
+			std::shared_ptr<AttributeFinder> groupFinder = attributeFinders[requested.group];
+			if (!groupFinder) throw RelpipeWriterException(L"Unsupported field group: " + requested.group);
+
+			// Append everything this finder contributes for the requested field.
+			for (AttributeMetadata metadata : groupFinder->toMetadata(writer, relationName, requested)) {
+				header.push_back(metadata);
+			}
+		}
+
+		writer->startRelation(relationName, header, true);
+	}
+
public:
virtual ~FilesystemCommand() = default;
--- a/src/ParallelFilesystemCommand.h Tue Jan 21 00:19:56 2020 +0100
+++ b/src/ParallelFilesystemCommand.h Fri Jan 24 16:53:31 2020 +0100
@@ -16,9 +16,12 @@
*/
#pragma once
+#include <mutex>
#include <mqueue.h>
#include <limits.h>
#include <ext/stdio_filebuf.h>
+#include <sys/wait.h>
+#include <semaphore.h>
#include "FilesystemCommand.h"
@@ -31,6 +34,12 @@
class MQ {
protected:
+ /**
+ * Process where this object was created.
+ * During fork() this object is copied.
+ * Using this variable we can detect the copy.
+ */
+ __pid_t originalPid;
std::string name;
mqd_t handle;
bool unlinkAfterClose;
@@ -70,13 +79,14 @@
}
};
- MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) {
+ MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : originalPid(getpid()), name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) {
if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ.");
+ // FIXME: the error above sometimes occurs, especially with higher process counts, e.g. --parallel 50
}
virtual ~MQ() {
mq_close(handle);
- if (unlinkAfterClose) mq_unlink(name.c_str());
+ if (unlinkAfterClose && originalPid == getpid()) mq_unlink(name.c_str());
}
MQ(const MQ& other) = delete;
@@ -107,7 +117,7 @@
}
public:
- MQWriter(std::string name) : MQ(name, mq_open(name.c_str(), O_RDWR | O_CREAT, S_IRWXU, getAttributes()), true) {
+ MQWriter(std::string name) : MQ(name, mq_open(name.c_str(), O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, getAttributes()), true) {
}
void send(const Message* m, unsigned int priority = 0) {
@@ -117,44 +127,220 @@
}
};
+/**
+ * Inter-process mutex backed by a named POSIX semaphore.
+ *
+ * The semaphore is created exclusively (O_EXCL) with the initial value 1, i.e. unlocked.
+ * lock() and unlock() map to sem_wait() and sem_post(), so the object can be used with std::lock_guard.
+ * Every process holding a (forked) copy closes its handle in the destructor;
+ * the name is unlinked only by the creating process and only if disown() was not called.
+ *
+ * TODO: move to a common/streamlet library
+ */
+class NamedMutex {
+private:
+	/**
+	 * Process where this object was created.
+	 * During fork() this object is copied.
+	 * Using this variable we can detect the copy.
+	 */
+	__pid_t originalPid;
+	sem_t* handle;
+	std::string name;
+	/**
+	 * Whether this object is still responsible for unlinking the semaphore name.
+	 */
+	bool owner = true;
+public:
+
+	/**
+	 * Creates a new named semaphore in the unlocked state.
+	 *
+	 * @param name name passed to sem_open(); a semaphore of this name must not exist yet
+	 * @throws RelpipeWriterException if the semaphore cannot be created
+	 */
+	NamedMutex(std::string name) : originalPid(getpid()), name(name) {
+		// With O_CREAT, sem_open() requires both the mode and the initial value.
+		// Passing 1 here creates the semaphore unlocked, so no extra sem_post() is needed.
+		handle = sem_open(this->name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, 1);
+		if (handle == SEM_FAILED) throw RelpipeWriterException(L"Unable to open POSIX semaphore.");
+	}
+
+	~NamedMutex() {
+		sem_close(handle);
+		// Forked copies must not remove the name; neither must an object that gave up ownership.
+		if (owner && originalPid == getpid()) sem_unlink(name.c_str());
+	}
+
+	NamedMutex(const NamedMutex&) = delete;
+	NamedMutex& operator=(const NamedMutex&) = delete;
+
+	/**
+	 * Waits on the semaphore.
+	 */
+	void lock() {
+		sem_wait(handle);
+	}
+
+	/**
+	 * Posts the semaphore.
+	 */
+	void unlock() {
+		sem_post(handle);
+	}
+
+	/**
+	 * Gives up the responsibility for unlinking the semaphore name in the destructor.
+	 */
+	void disown() {
+		owner = false;
+	}
+};
+
class ParallelFilesystemWorker {
+private:
+	std::string queueName;
+	NamedMutex& stdoutMutex;
+	string_t relationName;
+	std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders;
+	Configuration& configuration;
+	std::wstring_convert < codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings.
+
+	/**
+	 * Formats the debug line describing a single received message.
+	 */
+	std::wstring describe(MQ::Message& message) {
+		std::wstringstream line;
+		if (message.type == MQ::Message::Type::END) {
+			line << L"PID: " << getpid() << L" received END message: „" << convertor.from_bytes(message.getStringData()) << L"“" << std::endl;
+		} else if (message.type == MQ::Message::Type::FILENAME) {
+			line << L"PID: " << getpid() << L" received FILENAME message: „" << convertor.from_bytes(message.getStringData()) << L"“" << std::endl;
+		} else {
+			line << L"PID: " << getpid() << L" received unexpected message: „" << convertor.from_bytes(message.getStringData()) << L"“ of type: " << ((int) message.type) << std::endl;
+		}
+		return line.str();
+	}
+
+public:
+
+	ParallelFilesystemWorker(std::string queueName, NamedMutex& stdoutMutex, string_t relationName, std::map<string_t, std::shared_ptr<AttributeFinder> > attributeFinders, Configuration& configuration)
+	: queueName(queueName),
+	stdoutMutex(stdoutMutex),
+	relationName(relationName),
+	attributeFinders(attributeFinders),
+	configuration(configuration) {
+	}
+
+	/**
+	 * Receives messages from the queue until an END message arrives.
+	 * Each received message (including the END one) is reported on the error output while holding the shared mutex.
+	 */
+	void run() {
+		MQReader mq(queueName.c_str());
+		MQ::Message readBuffer;
+
+		bool endReceived = false;
+		while (!endReceived) {
+			mq.receive(&readBuffer);
+			endReceived = readBuffer.type == MQ::Message::Type::END;
+
+			// Build the text before taking the lock, so the lock is held only while writing.
+			std::wstring logLine = describe(readBuffer);
+
+			std::lock_guard lock(stdoutMutex);
+			std::wcerr << logLine << std::flush;
+		}
+	}
+
};
class ParallelFilesystemProcess {
+private:
+	/**
+	 * PID of the forked worker process, as returned by fork() in the parent.
+	 */
+	__pid_t subPid;
+
+	ParallelFilesystemProcess(__pid_t subPid) : subPid(subPid) {
+	}
+
+	/**
+	 * Duplicates oldfd onto newfd using dup2().
+	 * TODO: move to a common library (copied from the AWK module)
+	 *
+	 * @throws ParallelFilesystemProcess::Exception if dup2() fails
+	 */
+	static void redirectFD(int oldfd, int newfd) {
+		int result = dup2(oldfd, newfd);
+		if (result < 0) throw ParallelFilesystemProcess::Exception(L"Unable redirect FD.");
+	}
+
+	/**
+	 * Closes the given file descriptor.
+	 * TODO: move to a common library (copied from the AWK module)
+	 *
+	 * @throws ParallelFilesystemProcess::Exception if close() reports an error
+	 */
+	static void closeOrThrow(int fd) {
+		int error = close(fd);
+		if (error) throw ParallelFilesystemProcess::Exception(L"Unable to close FD: " + std::to_wstring(fd) + L" from PID: " + std::to_wstring(getpid()));
+	}
+
+public:
+
+	/**
+	 * Error raised by the process-handling code of this class.
+	 */
+	class Exception : public relpipe::writer::RelpipeWriterException {
+	public:
+
+		Exception(std::wstring message) : relpipe::writer::RelpipeWriterException(message) {
+		}
+
+	};
+
+	/**
+	 * Forks a worker process.
+	 *
+	 * In the child: closes STDIN, redirects outputFD to STDOUT (if they differ),
+	 * runs a ParallelFilesystemWorker until it finishes and then returns nullptr.
+	 * In the parent: returns a newly allocated object describing the child; the caller takes ownership of it.
+	 *
+	 * @return handle of the forked process (in the parent) or nullptr (in the child, after the worker has finished)
+	 * @throws ParallelFilesystemProcess::Exception if fork() fails or the child cannot set up its file descriptors
+	 */
+	static ParallelFilesystemProcess* create(std::string queueName, int outputFD, NamedMutex& stdoutMutex, string_t relationName, std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders, Configuration& configuration) {
+		__pid_t subPid = fork();
+
+		if (subPid < 0) {
+			// Use the exception type of this class (same as redirectFD() and closeOrThrow()) and a message that names the worker.
+			throw ParallelFilesystemProcess::Exception(L"Unable to fork the worker process.");
+		} else if (subPid == 0) {
+			// Child process
+			closeOrThrow(STDIN_FILENO); // strace -cf will show failed close() calls (same as number of processes)
+			if (outputFD != STDOUT_FILENO) redirectFD(outputFD, STDOUT_FILENO);
+			ParallelFilesystemWorker w(queueName, stdoutMutex, relationName, attributeFinders, configuration);
+			w.run();
+			return nullptr;
+		} else {
+			// Parent process
+			return new ParallelFilesystemProcess(subPid);
+		}
+	}
+
+	/**
+	 * Blocks until the worker process terminates.
+	 *
+	 * @return the raw status filled in by waitpid(); stays -1 if waitpid() did not set it
+	 */
+	int wait() {
+		int status = -1;
+		::waitpid(subPid, &status, 0);
+		return status;
+	}
+
+	/**
+	 * @return PID of the worker process
+	 */
+	__pid_t getPid() const {
+		return subPid;
+	}
+
};
class ParallelFilesystemCommand : public FilesystemCommand {
public:
void process(int inputFD, int outputFD, Configuration& configuration) {
- // TODO: ParallelFilesystemCommand
-
- { // TODO: demo code – remove:
- std::string queueName = "/relpipe-in-filesystem_parallel_" + std::to_string(getpid());
+ __gnu_cxx::stdio_filebuf<char> inputBuffer(inputFD, std::ios::in); // wrap the raw file descriptors in C++ streams
+ __gnu_cxx::stdio_filebuf<char> outputBuffer(outputFD, std::ios::out);
+ std::istream input(&inputBuffer);
+ std::ostream output(&outputBuffer);
- MQWriter mqWriter(queueName.c_str());
- MQReader mqReader(queueName.c_str());
-
- MQ::Message writeBuffer;
- MQ::Message readBuffer;
+ // Write relation header (done once, before any worker process is forked):
+ string_t relationName = fetchRelationName(&configuration);
+ std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders = createAttributeFinders();
+ std::shared_ptr<RelationalWriter> writer(Factory::create(output));
+ writeHeader(writer.get(), attributeFinders, relationName, &configuration.fields);
+ output.flush(); // flushed before the workers are forked – presumably so the buffered header is not copied into the children (TODO confirm)
- // ::memset(&writeBuffer, 0, sizeof (writeBuffer));
- // ::memset(&readBuffer, 0, sizeof (readBuffer));
+ // Create queue (its name contains the PID of this process):
+ std::string queueName = "/relpipe-in-filesystem_parallel_" + std::to_string(getpid());
+ MQWriter mq(queueName.c_str());
+ MQ::Message writeBuffer;
- writeBuffer.type = MQ::Message::Type::END;
- writeBuffer.setStringData("ahoj");
+ // Create lock for STDOUT synchronization (the semaphore is given the same name as the queue):
+ NamedMutex stdoutMutex(queueName);
- mqWriter.send(&writeBuffer);
-
- mqReader.receive(&readBuffer);
-
- std::string readData(readBuffer.data, readBuffer.dataLength);
- std::wstring_convert < codecvt_utf8<wchar_t>> convertor;
-
- std::wcerr << L"Zpráva „" << convertor.from_bytes(readData).c_str() << L"“ typu " << (int) readBuffer.type << L" o celkové délce " << readBuffer.getMessageLength() << L" a délce dat " << readBuffer.dataLength << L" byla přijata." << std::endl;
+ // Start workers; create() returns nullptr inside a forked child once its worker has finished:
+ std::vector<std::shared_ptr < ParallelFilesystemProcess>> workerProcesses;
+ bool inMainProcess = true;
+ for (int i = 0; i < configuration.parallelism; i++) {
+ std::shared_ptr<ParallelFilesystemProcess> workerProcess(ParallelFilesystemProcess::create(queueName, outputFD, stdoutMutex, relationName, createAttributeFinders(), configuration));
+ if (workerProcess) {
+ workerProcesses.push_back(workerProcess);
+ } else {
+ inMainProcess = false; // this is a child process: stop forking and skip the distribution phase below
+ break;
+ }
}
- throw RelpipeWriterException(L"ParallelFilesystemCommand is not yet implemented");
+ if (inMainProcess) {
+ // Distribute file names to the workers (one FILENAME message per name read by readNext()):
+ for (std::stringstream originalName; readNext(input, originalName); reset(originalName)) {
+ writeBuffer.type = MQ::Message::Type::FILENAME;
+ writeBuffer.setStringData(originalName.str());
+ mq.send(&writeBuffer);
+ }
+
+ // Tell workers that everything is done (one END message is sent per configured worker):
+ writeBuffer.type = MQ::Message::Type::END;
+ writeBuffer.setStringData("");
+ for (int i = 0; i < configuration.parallelism; i++) mq.send(&writeBuffer);
+
+ // Wait for workers exit and remember every worker that reported a non-zero status:
+ std::map<__pid_t, int> failedProcesses;
+ for (std::shared_ptr<ParallelFilesystemProcess> p : workerProcesses) {
+ int result = p->wait();
+ if (result) failedProcesses[p->getPid()] = result; // result is the raw status returned by wait(), keyed by the worker PID
+ }
+
+ if (failedProcesses.size()) {
+ std::wstringstream errorMessage;
+ errorMessage << L"One or more processes failed: ";
+ for (auto failed : failedProcesses) errorMessage << failed.first << L":" << failed.second << L", ";
+ throw ParallelFilesystemProcess::Exception(errorMessage.str());
+ }
+
+ } else {
+ // we are in a worker process → do nothing, finished
+ }
}
};
--- a/src/PlainFilesystemCommand.h Tue Jan 21 00:19:56 2020 +0100
+++ b/src/PlainFilesystemCommand.h Fri Jan 24 16:53:31 2020 +0100
@@ -28,9 +28,6 @@
using namespace relpipe::writer;
class PlainFilesystemCommand : public FilesystemCommand {
-private:
- std::map<string_t, std::shared_ptr<AttributeFinder>> attributeFinders = createAttributeFinders();
-
public:
void process(int inputFD, int outputFD, Configuration& configuration) {
@@ -40,18 +37,11 @@
std::ostream output(&outputBuffer);
std::shared_ptr<RelationalWriter> writer(Factory::create(output));
-
- string_t relationName = configuration.relation.empty() ? L"filesystem" : configuration.relation;
+ std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders = createAttributeFinders();
- std::vector<AttributeMetadata> attributesMetadata;
- for (RequestedField field : configuration.fields) {
- std::shared_ptr<AttributeFinder> finder = attributeFinders[field.group];
- if (finder) for (AttributeMetadata m : finder->toMetadata(writer.get(), relationName, field)) attributesMetadata.push_back(m);
- else throw RelpipeWriterException(L"Unsupported field group: " + field.group);
- }
-
- writer->startRelation(relationName, attributesMetadata, true);
-
+ string_t relationName = fetchRelationName(&configuration);
+ writeHeader(writer.get(), attributeFinders, relationName, &configuration.fields);
+
for (std::stringstream originalName; readNext(input, originalName); reset(originalName)) {
fs::path file(originalName.str().empty() ? "." : originalName.str()); // interpret empty string as current directory (e.g. result of: find -printf '%P\0')