parallel processing: first working version v_0
authorFrantišek Kučera <franta-hg@frantovo.cz>
Fri, 24 Jan 2020 21:05:10 +0100
branchv_0
changeset 59 7471529c0d11
parent 58 4679f67a8324
child 60 bb7ca5891755
parallel processing: first working version
nbproject/configurations.xml
src/FilesystemCommand.h
src/ParallelFilesystemCommand.h
src/PlainFilesystemCommand.h
--- a/nbproject/configurations.xml	Fri Jan 24 16:53:31 2020 +0100
+++ b/nbproject/configurations.xml	Fri Jan 24 21:05:10 2020 +0100
@@ -69,6 +69,8 @@
       </toolsSet>
       <flagsDictionary>
         <element flagsID="0" commonFlags="-fsanitize=address -std=gnu++1z"/>
+        <element flagsID="1"
+                 commonFlags="-mtune=generic -march=x86-64 -std=gnu++17 -fsanitize=address -fstack-protector-strong"/>
       </flagsDictionary>
       <codeAssistance>
       </codeAssistance>
@@ -78,13 +80,6 @@
           <buildCommand>${MAKE} -f Makefile</buildCommand>
           <cleanCommand>${MAKE} -f Makefile clean</cleanCommand>
           <executablePath>build/Debug/src/relpipe-in-filesystem</executablePath>
-          <ccTool>
-            <incDir>
-              <pElem>../relpipe-lib-writer.cpp/include</pElem>
-              <pElem>../relpipe-lib-cli.cpp/include</pElem>
-              <pElem>build/Debug/src</pElem>
-            </incDir>
-          </ccTool>
         </makeTool>
         <preBuild>
           <preBuildCommandWorkingDir>build/Debug</preBuildCommandWorkingDir>
@@ -93,11 +88,22 @@
         </preBuild>
       </makefileType>
       <item path="src/SubProcess.cpp" ex="false" tool="1" flavor2="11">
-        <ccTool flags="0">
+        <ccTool flags="1">
+          <incDir>
+            <pElem>../relpipe-lib-writer.cpp/include/relpipe/writer</pElem>
+            <pElem>src</pElem>
+            <pElem>../relpipe-lib-writer.cpp/include</pElem>
+            <pElem>build/Debug/src</pElem>
+          </incDir>
         </ccTool>
       </item>
       <item path="src/relpipe-in-filesystem.cpp" ex="false" tool="1" flavor2="11">
         <ccTool flags="0">
+          <incDir>
+            <pElem>../relpipe-lib-writer.cpp/include</pElem>
+            <pElem>../relpipe-lib-cli.cpp/include</pElem>
+            <pElem>build/Debug/src</pElem>
+          </incDir>
         </ccTool>
       </item>
     </conf>
--- a/src/FilesystemCommand.h	Fri Jan 24 16:53:31 2020 +0100
+++ b/src/FilesystemCommand.h	Fri Jan 24 21:05:10 2020 +0100
@@ -75,7 +75,7 @@
 		return configuration->relation.empty() ? L"filesystem" : configuration->relation;
 	}
 
-	void writeHeader(RelationalWriter* writer, std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders, string_t relationName, std::vector<RequestedField>* fields) {
+	void writeHeader(RelationalWriter* writer, std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders, string_t relationName, std::vector<RequestedField>* fields, bool writeHeader = true) {
 		std::vector<AttributeMetadata> attributesMetadata;
 		for (RequestedField field : *fields) {
 			std::shared_ptr<AttributeFinder> finder = attributeFinders[field.group];
@@ -83,7 +83,27 @@
 			else throw RelpipeWriterException(L"Unsupported field group: " + field.group);
 		}
 
-		writer->startRelation(relationName, attributesMetadata, true);
+		writer->startRelation(relationName, attributesMetadata, writeHeader);
+	}
+
+	void processSingleFile(std::shared_ptr<RelationalWriter> writer, std::stringstream& originalName, std::map<string_t, std::shared_ptr < AttributeFinder>>&attributeFinders, Configuration& configuration, string_t relationName) {
+		fs::path file(originalName.str().empty() ? "." : originalName.str()); // interpret empty string as current directory (e.g. result of: find -printf '%P\0')
+		bool exists = false;
+
+		try {
+			exists = fs::exists(file);
+		} catch (const fs::filesystem_error& e) {
+			// we probably do not have permissions to given directory → pretend that the file does not exist
+		}
+
+		for (auto& finder : attributeFinders) finder.second->startFile(file, originalName.str(), exists);
+
+		for (RequestedField field : configuration.fields) {
+			std::shared_ptr<AttributeFinder> finder = attributeFinders[field.group]; // should not be nullptr, because already checked while writing the relation metadata
+			finder->writeField(writer.get(), relationName, field);
+		}
+
+		for (auto& finder : attributeFinders) finder.second->endFile();
 	}
 
 public:
--- a/src/ParallelFilesystemCommand.h	Fri Jan 24 16:53:31 2020 +0100
+++ b/src/ParallelFilesystemCommand.h	Fri Jan 24 21:05:10 2020 +0100
@@ -81,7 +81,7 @@
 
 	MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : originalPid(getpid()), name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) {
 		if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ.");
-		// FIXME: sometimes we got this error, especially with higher process counts like: --parallel 50
+		// TODO: factory method
 	}
 
 	virtual ~MQ() {
@@ -140,12 +140,12 @@
 	__pid_t originalPid;
 	sem_t* handle;
 	std::string name;
-	bool owner;
 public:
 
 	NamedMutex(std::string name) : originalPid(getpid()), name(name) {
 		handle = sem_open(name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR);
 		sem_post(handle);
+		// TODO: factory method, check errors
 	}
 
 	~NamedMutex() {
@@ -163,13 +163,9 @@
 	void unlock() {
 		sem_post(handle);
 	}
-
-	void disown() {
-		owner = false;
-	}
 };
 
-class ParallelFilesystemWorker {
+class ParallelFilesystemWorker : FilesystemCommand {
 private:
 	std::string queueName;
 	NamedMutex& stdoutMutex;
@@ -186,29 +182,37 @@
 		MQ::Message readBuffer;
 		MQReader mq(queueName.c_str());
 
-		for (bool running = true; running;) {
+		std::stringstream writeBuffer;
+		std::shared_ptr<RelationalWriter> writer(Factory::create(writeBuffer));
+
+		writeHeader(writer.get(), attributeFinders, relationName, &configuration.fields, false);
+
+		while (true) {
 			mq.receive(&readBuffer);
-			std::wstringstream debugLog;
-			if (readBuffer.type == MQ::Message::Type::END) {
-				debugLog << L"PID: " << getpid() << L" received END message: „" << convertor.from_bytes(readBuffer.getStringData()) << L"“" << std::endl;
-				running = false;
-			} else if (readBuffer.type == MQ::Message::Type::FILENAME) {
-				debugLog << L"PID: " << getpid() << L" received FILENAME message: „" << convertor.from_bytes(readBuffer.getStringData()) << L"“" << std::endl;
+			if (readBuffer.type == MQ::Message::Type::FILENAME) {
+				std::stringstream originalName(readBuffer.getStringData());
+				processSingleFile(writer, originalName, attributeFinders, configuration, relationName);
+
+				{
+					std::lock_guard lock(stdoutMutex);
+					std::cout << writeBuffer.rdbuf() << std::flush;
+					// TODO: optional (configurable) buffering: write multiple records in a single batch
+				}
+				writeBuffer.str("");
+				writeBuffer.clear();
+			} else if (readBuffer.type == MQ::Message::Type::END) {
+				break;
 			} else {
-				debugLog << L"PID: " << getpid() << L" received unexpected message: „" << convertor.from_bytes(readBuffer.getStringData()) << L"“ of type: " << ((int) readBuffer.type) << std::endl;
+				throw RelpipeWriterException(L"ParallelFilesystemWorker recieved message of unsupported type: " + std::to_wstring((int) readBuffer.type)); // TODO: better exception
 			}
-
-
-			{
-				std::lock_guard lock(stdoutMutex);
-				std::wcerr << debugLog.str() << std::flush;
-			}
-			debugLog.str(L"");
-			debugLog.clear();
 		}
 
 	}
 
+	void process(int inputFD, int outputFD, Configuration& configuration) override {
+		// TODO: refactoring, not used
+	}
+
 };
 
 class ParallelFilesystemProcess {
--- a/src/PlainFilesystemCommand.h	Fri Jan 24 16:53:31 2020 +0100
+++ b/src/PlainFilesystemCommand.h	Fri Jan 24 21:05:10 2020 +0100
@@ -41,26 +41,10 @@
 
 		string_t relationName = fetchRelationName(&configuration);
 		writeHeader(writer.get(), attributeFinders, relationName, &configuration.fields);
-		
+
 
 		for (std::stringstream originalName; readNext(input, originalName); reset(originalName)) {
-			fs::path file(originalName.str().empty() ? "." : originalName.str()); // interpret empty string as current directory (e.g. result of: find -printf '%P\0')
-			bool exists = false;
-
-			try {
-				exists = fs::exists(file);
-			} catch (const fs::filesystem_error& e) {
-				// we probably do not have permissions to given directory → pretend that the file does not exist
-			}
-
-			for (auto& finder : attributeFinders) finder.second->startFile(file, originalName.str(), exists);
-
-			for (RequestedField field : configuration.fields) {
-				std::shared_ptr<AttributeFinder> finder = attributeFinders[field.group]; // should not be nullptr, because already checked while writing the relation metadata
-				finder->writeField(writer.get(), relationName, field);
-			}
-
-			for (auto& finder : attributeFinders) finder.second->endFile();
+			processSingleFile(writer, originalName, attributeFinders, configuration, relationName);
 		}
 	}
 };