--- a/nbproject/configurations.xml Fri Jan 24 16:53:31 2020 +0100
+++ b/nbproject/configurations.xml Fri Jan 24 21:05:10 2020 +0100
@@ -69,6 +69,8 @@
</toolsSet>
<flagsDictionary>
<element flagsID="0" commonFlags="-fsanitize=address -std=gnu++1z"/>
+ <element flagsID="1"
+ commonFlags="-mtune=generic -march=x86-64 -std=gnu++17 -fsanitize=address -fstack-protector-strong"/>
</flagsDictionary>
<codeAssistance>
</codeAssistance>
@@ -78,13 +80,6 @@
<buildCommand>${MAKE} -f Makefile</buildCommand>
<cleanCommand>${MAKE} -f Makefile clean</cleanCommand>
<executablePath>build/Debug/src/relpipe-in-filesystem</executablePath>
- <ccTool>
- <incDir>
- <pElem>../relpipe-lib-writer.cpp/include</pElem>
- <pElem>../relpipe-lib-cli.cpp/include</pElem>
- <pElem>build/Debug/src</pElem>
- </incDir>
- </ccTool>
</makeTool>
<preBuild>
<preBuildCommandWorkingDir>build/Debug</preBuildCommandWorkingDir>
@@ -93,11 +88,22 @@
</preBuild>
</makefileType>
<item path="src/SubProcess.cpp" ex="false" tool="1" flavor2="11">
- <ccTool flags="0">
+ <ccTool flags="1">
+ <incDir>
+ <pElem>../relpipe-lib-writer.cpp/include/relpipe/writer</pElem>
+ <pElem>src</pElem>
+ <pElem>../relpipe-lib-writer.cpp/include</pElem>
+ <pElem>build/Debug/src</pElem>
+ </incDir>
</ccTool>
</item>
<item path="src/relpipe-in-filesystem.cpp" ex="false" tool="1" flavor2="11">
<ccTool flags="0">
+ <incDir>
+ <pElem>../relpipe-lib-writer.cpp/include</pElem>
+ <pElem>../relpipe-lib-cli.cpp/include</pElem>
+ <pElem>build/Debug/src</pElem>
+ </incDir>
</ccTool>
</item>
</conf>
--- a/src/FilesystemCommand.h Fri Jan 24 16:53:31 2020 +0100
+++ b/src/FilesystemCommand.h Fri Jan 24 21:05:10 2020 +0100
@@ -75,7 +75,7 @@
return configuration->relation.empty() ? L"filesystem" : configuration->relation;
}
- void writeHeader(RelationalWriter* writer, std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders, string_t relationName, std::vector<RequestedField>* fields) {
+ void writeHeader(RelationalWriter* writer, std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders, string_t relationName, std::vector<RequestedField>* fields, bool writeHeader = true) {
std::vector<AttributeMetadata> attributesMetadata;
for (RequestedField field : *fields) {
std::shared_ptr<AttributeFinder> finder = attributeFinders[field.group];
@@ -83,7 +83,27 @@
else throw RelpipeWriterException(L"Unsupported field group: " + field.group);
}
- writer->startRelation(relationName, attributesMetadata, true);
+ writer->startRelation(relationName, attributesMetadata, writeHeader);
+ }
+
+ /**
+  * Processes a single input file name: resolves it to a path, checks whether the path exists
+  * and lets the attribute finders write the requested fields for it.
+  *
+  * Every finder gets startFile() before and endFile() after the fields are written.
+  *
+  * @param writer relational writer that receives the field values
+  * @param originalName stream holding the file name as it was read from the input; an empty name means the current directory
+  * @param attributeFinders attribute finders keyed by field group
+  * @param configuration configuration whose requested fields are written, in the given order
+  * @param relationName name of the relation the fields are written to
+  */
+ void processSingleFile(std::shared_ptr<RelationalWriter> writer, std::stringstream& originalName, std::map<string_t, std::shared_ptr < AttributeFinder>>&attributeFinders, Configuration& configuration, string_t relationName) {
+     const std::string name = originalName.str(); // str() returns a copy of the buffer → take it once and reuse it
+     fs::path file(name.empty() ? "." : name); // interpret empty string as current directory (e.g. result of: find -printf '%P\0')
+     bool exists = false;
+
+     try {
+         exists = fs::exists(file);
+     } catch (const fs::filesystem_error&) {
+         // we probably do not have permissions to given directory → pretend that the file does not exist
+     }
+
+     for (auto& finder : attributeFinders) finder.second->startFile(file, name, exists);
+
+     for (RequestedField field : configuration.fields) {
+         std::shared_ptr<AttributeFinder> finder = attributeFinders[field.group]; // should not be nullptr, because already checked while writing the relation metadata
+         finder->writeField(writer.get(), relationName, field);
+     }
+
+     for (auto& finder : attributeFinders) finder.second->endFile();
}
public:
--- a/src/ParallelFilesystemCommand.h Fri Jan 24 16:53:31 2020 +0100
+++ b/src/ParallelFilesystemCommand.h Fri Jan 24 21:05:10 2020 +0100
@@ -81,7 +81,7 @@
MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : originalPid(getpid()), name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) {
if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ.");
- // FIXME: sometimes we got this error, especially with higher process counts like: --parallel 50
+ // TODO: replace this throwing constructor (it rejects a negative handle) with a factory method
}
virtual ~MQ() {
@@ -140,12 +140,12 @@
__pid_t originalPid;
sem_t* handle;
std::string name;
- bool owner;
public:
+ /**
+  * Creates a new named POSIX semaphore that is used as a mutex and starts unlocked.
+  *
+  * @param name name of the semaphore; creation fails if a semaphore of that name already exists (O_EXCL)
+  * @throws RelpipeWriterException if the semaphore cannot be created
+  */
NamedMutex(std::string name) : originalPid(getpid()), name(name) {
- handle = sem_open(name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR);
- sem_post(handle);
+ // With O_CREAT, sem_open() requires two more arguments: the mode and the initial value.
+ // Create the semaphore with value 1 (unlocked) instead of posting it afterwards.
+ handle = sem_open(name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, 1);
+ if (handle == SEM_FAILED) throw RelpipeWriterException(L"Unable to open POSIX semaphore.");
+ // TODO: factory method
}
~NamedMutex() {
@@ -163,13 +163,9 @@
void unlock() {
sem_post(handle);
}
-
- void disown() {
- owner = false;
- }
};
-class ParallelFilesystemWorker {
+class ParallelFilesystemWorker : FilesystemCommand {
private:
std::string queueName;
NamedMutex& stdoutMutex;
@@ -186,29 +182,37 @@
MQ::Message readBuffer;
MQReader mq(queueName.c_str());
- for (bool running = true; running;) {
+ std::stringstream writeBuffer;
+ std::shared_ptr<RelationalWriter> writer(Factory::create(writeBuffer));
+
+ writeHeader(writer.get(), attributeFinders, relationName, &configuration.fields, false);
+
+ // Receive messages until END arrives; every FILENAME message is processed into writeBuffer and then flushed to stdout under the mutex.
+ while (true) {
mq.receive(&readBuffer);
- std::wstringstream debugLog;
- if (readBuffer.type == MQ::Message::Type::END) {
- debugLog << L"PID: " << getpid() << L" received END message: „" << convertor.from_bytes(readBuffer.getStringData()) << L"“" << std::endl;
- running = false;
- } else if (readBuffer.type == MQ::Message::Type::FILENAME) {
- debugLog << L"PID: " << getpid() << L" received FILENAME message: „" << convertor.from_bytes(readBuffer.getStringData()) << L"“" << std::endl;
+ if (readBuffer.type == MQ::Message::Type::FILENAME) {
+ std::stringstream originalName(readBuffer.getStringData());
+ processSingleFile(writer, originalName, attributeFinders, configuration, relationName);
+
+ {
+ std::lock_guard lock(stdoutMutex);
+ // Insert the buffered string, not the rdbuf(): inserting a stream buffer that yields no characters sets failbit on std::cout and all later output would be lost.
+ std::cout << writeBuffer.str() << std::flush;
+ // TODO: optional (configurable) buffering: write multiple records in a single batch
+ }
+ writeBuffer.str("");
+ writeBuffer.clear();
+ } else if (readBuffer.type == MQ::Message::Type::END) {
+ break;
} else {
- debugLog << L"PID: " << getpid() << L" received unexpected message: „" << convertor.from_bytes(readBuffer.getStringData()) << L"“ of type: " << ((int) readBuffer.type) << std::endl;
+ throw RelpipeWriterException(L"ParallelFilesystemWorker received message of unsupported type: " + std::to_wstring(static_cast<int>(readBuffer.type))); // TODO: better exception
}
-
-
- {
- std::lock_guard lock(stdoutMutex);
- std::wcerr << debugLog.str() << std::flush;
- }
- debugLog.str(L"");
- debugLog.clear();
}
}
+ void process(int inputFD, int outputFD, Configuration& configuration) override {
+ // TODO: refactoring – this override is not used, hence the empty body
+ }
+
};
class ParallelFilesystemProcess {
--- a/src/PlainFilesystemCommand.h Fri Jan 24 16:53:31 2020 +0100
+++ b/src/PlainFilesystemCommand.h Fri Jan 24 21:05:10 2020 +0100
@@ -41,26 +41,10 @@
string_t relationName = fetchRelationName(&configuration);
writeHeader(writer.get(), attributeFinders, relationName, &configuration.fields);
-
+
for (std::stringstream originalName; readNext(input, originalName); reset(originalName)) {
- fs::path file(originalName.str().empty() ? "." : originalName.str()); // interpret empty string as current directory (e.g. result of: find -printf '%P\0')
- bool exists = false;
-
- try {
- exists = fs::exists(file);
- } catch (const fs::filesystem_error& e) {
- // we probably do not have permissions to given directory → pretend that the file does not exist
- }
-
- for (auto& finder : attributeFinders) finder.second->startFile(file, originalName.str(), exists);
-
- for (RequestedField field : configuration.fields) {
- std::shared_ptr<AttributeFinder> finder = attributeFinders[field.group]; // should not be nullptr, because already checked while writing the relation metadata
- finder->writeField(writer.get(), relationName, field);
- }
-
- for (auto& finder : attributeFinders) finder.second->endFile();
+ processSingleFile(writer, originalName, attributeFinders, configuration, relationName);
}
}
};