--- a/nbproject/configurations.xml Fri Jan 24 21:05:10 2020 +0100
+++ b/nbproject/configurations.xml Sat Jan 25 16:37:20 2020 +0100
@@ -69,8 +69,6 @@
</toolsSet>
<flagsDictionary>
<element flagsID="0" commonFlags="-fsanitize=address -std=gnu++1z"/>
- <element flagsID="1"
- commonFlags="-mtune=generic -march=x86-64 -std=gnu++17 -fsanitize=address -fstack-protector-strong"/>
</flagsDictionary>
<codeAssistance>
</codeAssistance>
@@ -80,6 +78,13 @@
<buildCommand>${MAKE} -f Makefile</buildCommand>
<cleanCommand>${MAKE} -f Makefile clean</cleanCommand>
<executablePath>build/Debug/src/relpipe-in-filesystem</executablePath>
+ <ccTool>
+ <incDir>
+ <pElem>../relpipe-lib-writer.cpp/include</pElem>
+ <pElem>../relpipe-lib-cli.cpp/include</pElem>
+ <pElem>build/Debug/src</pElem>
+ </incDir>
+ </ccTool>
</makeTool>
<preBuild>
<preBuildCommandWorkingDir>build/Debug</preBuildCommandWorkingDir>
@@ -88,22 +93,11 @@
</preBuild>
</makefileType>
<item path="src/SubProcess.cpp" ex="false" tool="1" flavor2="11">
- <ccTool flags="1">
- <incDir>
- <pElem>../relpipe-lib-writer.cpp/include/relpipe/writer</pElem>
- <pElem>src</pElem>
- <pElem>../relpipe-lib-writer.cpp/include</pElem>
- <pElem>build/Debug/src</pElem>
- </incDir>
+ <ccTool flags="0">
</ccTool>
</item>
<item path="src/relpipe-in-filesystem.cpp" ex="false" tool="1" flavor2="11">
<ccTool flags="0">
- <incDir>
- <pElem>../relpipe-lib-writer.cpp/include</pElem>
- <pElem>../relpipe-lib-cli.cpp/include</pElem>
- <pElem>build/Debug/src</pElem>
- </incDir>
</ccTool>
</item>
</conf>
--- a/src/FileAttributeFinder.h Fri Jan 24 21:05:10 2020 +0100
+++ b/src/FileAttributeFinder.h Sat Jan 25 16:37:20 2020 +0100
@@ -55,10 +55,9 @@
}
void fetchOwner(const fs::path& file, string_t& owner, string_t& group) {
- // TODO: throw exception on error
- // TODO: get user and group in C++ way?
struct stat info;
- stat(file.c_str(), &info);
+ int result = ::stat(file.c_str(), &info);
+ if (result) throw RelpipeWriterException(L"Unable to stat() file „" + file.wstring() + L"“ in fetchOwner(). Result: " + std::to_wstring(result));
/**
* The return value may point to a static area, and may be
* overwritten by subsequent calls to getpwent(3), getpw‐
--- a/src/ParallelFilesystemCommand.h Fri Jan 24 21:05:10 2020 +0100
+++ b/src/ParallelFilesystemCommand.h Sat Jan 25 16:37:20 2020 +0100
@@ -46,6 +46,20 @@
static const size_t MAX_DATA_LENGTH = PATH_MAX; // TODO: maybe more or configurable
static const __syscall_slong_t MAX_MESSAGES = 10; // TODO: maybe configurable or derived from the process count
+ template<typename... Args> static mqd_t mqOpen(const char *__name, int __oflag, Args... args) {
+ mqd_t handle = mq_open(__name, __oflag, args...);
+ if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ.");
+ else return handle;
+ }
+
+ /**
+ * @param name
+ * @param handle do not call mq_open() directly, use MQ:mqOpen() instead.
+ * @param unlinkAfterClose
+ */
+ MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : originalPid(getpid()), name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) {
+ }
+
public:
class Message {
@@ -79,11 +93,6 @@
}
};
- MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : originalPid(getpid()), name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) {
- if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ.");
- // TODO: factory method
- }
-
virtual ~MQ() {
mq_close(handle);
if (unlinkAfterClose && originalPid == getpid()) mq_unlink(name.c_str());
@@ -96,7 +105,7 @@
class MQReader : public MQ {
public:
- MQReader(std::string name) : MQ(name, mq_open(name.c_str(), O_RDONLY)) {
+ MQReader(std::string name) : MQ(name, mqOpen(name.c_str(), O_RDONLY)) {
}
void receive(Message* m) {
@@ -117,7 +126,7 @@
}
public:
- MQWriter(std::string name) : MQ(name, mq_open(name.c_str(), O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, getAttributes()), true) {
+ MQWriter(std::string name) : MQ(name, mqOpen(name.c_str(), O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, getAttributes()), true) {
}
void send(const Message* m, unsigned int priority = 0) {
@@ -140,12 +149,23 @@
__pid_t originalPid;
sem_t* handle;
std::string name;
+
+ NamedMutex() {
+ }
+
public:
- NamedMutex(std::string name) : originalPid(getpid()), name(name) {
- handle = sem_open(name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR);
- sem_post(handle);
- // TODO: factory method, check errors
+ static NamedMutex* create(std::string name) {
+ sem_t* handle = sem_open(name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR);
+ if (handle == SEM_FAILED) throw RelpipeWriterException(L"Unable to open POSIX semaphore.");
+
+ NamedMutex* result = new NamedMutex();
+ result->name = name;
+ result->handle = handle;
+ result->originalPid = getpid();
+ result->unlock();
+
+ return result;
}
~NamedMutex() {
@@ -157,11 +177,13 @@
NamedMutex& operator=(const NamedMutex&) = delete;
void lock() {
- sem_wait(handle);
+ int error = sem_wait(handle);
+ if (error) throw RelpipeWriterException(L"Unable to lock POSIX semaphore.");
}
void unlock() {
- sem_post(handle);
+ int error = sem_post(handle);
+ if (error) throw RelpipeWriterException(L"Unable to unlock POSIX semaphore.");
}
};
@@ -210,7 +232,7 @@
}
void process(int inputFD, int outputFD, Configuration& configuration) override {
- // TODO: refactoring, not used
+ // FIXME: refactoring, not used
}
};
@@ -300,13 +322,13 @@
MQ::Message writeBuffer;
// Create lock for STDOUT synchronization:
- NamedMutex stdoutMutex(queueName);
+ std::unique_ptr<NamedMutex> stdoutMutex(NamedMutex::create(queueName));
// Start workers:
std::vector<std::shared_ptr < ParallelFilesystemProcess>> workerProcesses;
bool inMainProcess = true;
for (int i = 0; i < configuration.parallelism; i++) {
- std::shared_ptr<ParallelFilesystemProcess> workerProcess(ParallelFilesystemProcess::create(queueName, outputFD, stdoutMutex, relationName, createAttributeFinders(), configuration));
+ std::shared_ptr<ParallelFilesystemProcess> workerProcess(ParallelFilesystemProcess::create(queueName, outputFD, *stdoutMutex, relationName, createAttributeFinders(), configuration));
if (workerProcess) {
workerProcesses.push_back(workerProcess);
} else {
--- a/src/StreamletAttributeFinder.h Fri Jan 24 21:05:10 2020 +0100
+++ b/src/StreamletAttributeFinder.h Sat Jan 25 16:37:20 2020 +0100
@@ -95,7 +95,6 @@
}
virtual void writeFieldOfExistingFile(RelationalWriter* writer, const string_t& relationName, const RequestedField & field) override {
- // TODO: paralelize also over records → fork multiple processes and distribute records across them; then collect results (with a lock)
if (field.group == RequestedField::GROUP_STREAMLET) {
for (auto metadata : cachedMetadata[field.id]) {
SubProcess::Message m = subProcesses[field.id]->read();
@@ -105,7 +104,7 @@
SubProcess::Message m = subProcesses[field.id]->read();
if (m.code != StreamletMsg::WAITING_FOR_INPUT_ATTRIBUTES) throw RelpipeWriterException(L"Protocol violation from exec sub-process. Expected WAITING_FOR_INPUT_ATTRIBUTES but got: " + m.toString());
- // TODO: generic protocol violation error messages / method for checking responses
+ // FIXME: generic protocol violation error messages / method for checking responses
}
}
--- a/src/SubProcess.h Fri Jan 24 21:05:10 2020 +0100
+++ b/src/SubProcess.h Sat Jan 25 16:37:20 2020 +0100
@@ -25,7 +25,7 @@
#include <relpipe/writer/RelpipeWriterException.h>
/**
- * TODO: move to a separate library → can be used later also in relpipe-tr-exec
+ * TODO: move to a separate library → can be used later also in relpipe-tr-streamlet
*/
class SubProcess {
public: