--- a/src/ParallelFilesystemCommand.h Fri Jan 24 16:53:31 2020 +0100
+++ b/src/ParallelFilesystemCommand.h Fri Jan 24 21:05:10 2020 +0100
@@ -81,7 +81,7 @@
MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : originalPid(getpid()), name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) {
if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ.");
- // FIXME: sometimes we got this error, especially with higher process counts like: --parallel 50
+ // TODO: factory method
}
virtual ~MQ() {
@@ -140,12 +140,12 @@
__pid_t originalPid;
sem_t* handle;
std::string name;
- bool owner;
public:
NamedMutex(std::string name) : originalPid(getpid()), name(name) {
handle = sem_open(name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR);
sem_post(handle);
+ // TODO: factory method, check errors
}
~NamedMutex() {
@@ -163,13 +163,9 @@
void unlock() {
sem_post(handle);
}
-
- void disown() {
- owner = false;
- }
};
-class ParallelFilesystemWorker {
+class ParallelFilesystemWorker : FilesystemCommand {
private:
std::string queueName;
NamedMutex& stdoutMutex;
@@ -186,29 +182,37 @@
MQ::Message readBuffer;
MQReader mq(queueName.c_str());
- for (bool running = true; running;) {
+ std::stringstream writeBuffer;
+ std::shared_ptr<RelationalWriter> writer(Factory::create(writeBuffer));
+
+ writeHeader(writer.get(), attributeFinders, relationName, &configuration.fields, false);
+
+ while (true) {
mq.receive(&readBuffer);
- std::wstringstream debugLog;
- if (readBuffer.type == MQ::Message::Type::END) {
- debugLog << L"PID: " << getpid() << L" received END message: „" << convertor.from_bytes(readBuffer.getStringData()) << L"“" << std::endl;
- running = false;
- } else if (readBuffer.type == MQ::Message::Type::FILENAME) {
- debugLog << L"PID: " << getpid() << L" received FILENAME message: „" << convertor.from_bytes(readBuffer.getStringData()) << L"“" << std::endl;
+ if (readBuffer.type == MQ::Message::Type::FILENAME) {
+ std::stringstream originalName(readBuffer.getStringData());
+ processSingleFile(writer, originalName, attributeFinders, configuration, relationName);
+
+ {
+ std::lock_guard lock(stdoutMutex);
+ std::cout << writeBuffer.rdbuf() << std::flush;
+ // TODO: optional (configurable) buffering: write multiple records in a single batch
+ }
+ writeBuffer.str("");
+ writeBuffer.clear();
+ } else if (readBuffer.type == MQ::Message::Type::END) {
+ break;
} else {
- debugLog << L"PID: " << getpid() << L" received unexpected message: „" << convertor.from_bytes(readBuffer.getStringData()) << L"“ of type: " << ((int) readBuffer.type) << std::endl;
+ throw RelpipeWriterException(L"ParallelFilesystemWorker recieved message of unsupported type: " + std::to_wstring((int) readBuffer.type)); // TODO: better exception
}
-
-
- {
- std::lock_guard lock(stdoutMutex);
- std::wcerr << debugLog.str() << std::flush;
- }
- debugLog.str(L"");
- debugLog.clear();
}
}
+ void process(int inputFD, int outputFD, Configuration& configuration) override {
+ // TODO: refactoring, not used
+ }
+
};
class ParallelFilesystemProcess {