src/ParallelFilesystemCommand.h
branchv_0
changeset 59 7471529c0d11
parent 58 4679f67a8324
child 60 bb7ca5891755
--- a/src/ParallelFilesystemCommand.h	Fri Jan 24 16:53:31 2020 +0100
+++ b/src/ParallelFilesystemCommand.h	Fri Jan 24 21:05:10 2020 +0100
@@ -81,7 +81,7 @@
 
 	MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : originalPid(getpid()), name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) {
 		if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ.");
-		// FIXME: sometimes we got this error, especially with higher process counts like: --parallel 50
+		// TODO: factory method
 	}
 
 	virtual ~MQ() {
@@ -140,12 +140,12 @@
 	__pid_t originalPid;
 	sem_t* handle;
 	std::string name;
-	bool owner;
 public:
 
 	NamedMutex(std::string name) : originalPid(getpid()), name(name) {
 		handle = sem_open(name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR);
 		sem_post(handle);
+		// TODO: factory method, check errors
 	}
 
 	~NamedMutex() {
@@ -163,13 +163,9 @@
 	void unlock() {
 		sem_post(handle);
 	}
-
-	void disown() {
-		owner = false;
-	}
 };
 
-class ParallelFilesystemWorker {
+class ParallelFilesystemWorker : FilesystemCommand {
 private:
 	std::string queueName;
 	NamedMutex& stdoutMutex;
@@ -186,29 +182,37 @@
 		MQ::Message readBuffer;
 		MQReader mq(queueName.c_str());
 
-		for (bool running = true; running;) {
+		std::stringstream writeBuffer;
+		std::shared_ptr<RelationalWriter> writer(Factory::create(writeBuffer));
+
+		writeHeader(writer.get(), attributeFinders, relationName, &configuration.fields, false);
+
+		while (true) {
 			mq.receive(&readBuffer);
-			std::wstringstream debugLog;
-			if (readBuffer.type == MQ::Message::Type::END) {
-				debugLog << L"PID: " << getpid() << L" received END message: „" << convertor.from_bytes(readBuffer.getStringData()) << L"“" << std::endl;
-				running = false;
-			} else if (readBuffer.type == MQ::Message::Type::FILENAME) {
-				debugLog << L"PID: " << getpid() << L" received FILENAME message: „" << convertor.from_bytes(readBuffer.getStringData()) << L"“" << std::endl;
+			if (readBuffer.type == MQ::Message::Type::FILENAME) {
+				std::stringstream originalName(readBuffer.getStringData());
+				processSingleFile(writer, originalName, attributeFinders, configuration, relationName);
+
+				{
+					std::lock_guard lock(stdoutMutex);
+					std::cout << writeBuffer.rdbuf() << std::flush;
+					// TODO: optional (configurable) buffering: write multiple records in a single batch
+				}
+				writeBuffer.str("");
+				writeBuffer.clear();
+			} else if (readBuffer.type == MQ::Message::Type::END) {
+				break;
 			} else {
-				debugLog << L"PID: " << getpid() << L" received unexpected message: „" << convertor.from_bytes(readBuffer.getStringData()) << L"“ of type: " << ((int) readBuffer.type) << std::endl;
+				throw RelpipeWriterException(L"ParallelFilesystemWorker recieved message of unsupported type: " + std::to_wstring((int) readBuffer.type)); // TODO: better exception
 			}
-
-
-			{
-				std::lock_guard lock(stdoutMutex);
-				std::wcerr << debugLog.str() << std::flush;
-			}
-			debugLog.str(L"");
-			debugLog.clear();
 		}
 
 	}
 
+	void process(int inputFD, int outputFD, Configuration& configuration) override {
+		// TODO: refactoring, not used
+	}
+
 };
 
 class ParallelFilesystemProcess {