parallel processing: prepare infrastructure v_0
authorFrantišek Kučera <franta-hg@frantovo.cz>
Mon, 20 Jan 2020 12:39:22 +0100
branchv_0
changeset 52 fea625f0a096
parent 51 841845ccf06d
child 53 170a993745be
parallel processing: prepare infrastructure
bash-completion.sh
nbproject/configurations.xml
src/CLIParser.h
src/Configuration.h
src/FilesystemCommand.h
src/FilesystemCommandBase.h
src/ParallelFilesystemCommand.h
src/relpipe-in-filesystem.cpp
--- a/bash-completion.sh	Sun Jan 19 18:41:32 2020 +0100
+++ b/bash-completion.sh	Mon Jan 20 12:39:22 2020 +0100
@@ -54,6 +54,7 @@
 	elif [[ "$w1" == "--as"            && "x$w0" == "x" ]];    then COMPREPLY=("''")
 	elif [[ "$w1" == "--option"        && "x$w0" == "x" ]];    then COMPREPLY=("''")
 	elif [[ "$w2" == "--option"        && "x$w0" == "x" ]];    then COMPREPLY=("''")
+	elif [[ "$w1" == "--parallel"      && "x$w0" == "x" ]];    then COMPREPLY=($(nproc --all))
 	elif [[ "$w1" == "--file"                           ]];    then COMPREPLY=($(compgen -W "${FILE_FIELDS[*]}"  -- "$w0"))
 	elif [[ "$w1" == "--xattr"                          ]];    then COMPREPLY=($(compgen -W "${XATTR_FIELDS[*]}" -- "$w0"))
 	elif [[ "$w1" == "--streamlet"                      ]];    then COMPREPLY=($(while read c; do PATH="$RELPIPE_IN_FILESYSTEM_STREAMLET_PATH" type -P "$c" &>/dev/null && echo "$c"; done < <(PATH="$RELPIPE_IN_FILESYSTEM_STREAMLET_PATH" compgen -A command -- "$w0")))
@@ -65,6 +66,7 @@
 			"--streamlet"
 			"--as"
 			"--option"
+			"--parallel"
 		)
 		COMPREPLY=($(compgen -W "${OPTIONS[*]}" -- "$w0"))
 	fi
--- a/nbproject/configurations.xml	Sun Jan 19 18:41:32 2020 +0100
+++ b/nbproject/configurations.xml	Mon Jan 20 12:39:22 2020 +0100
@@ -42,14 +42,7 @@
   <logicalFolder name="root" displayName="root" projectFiles="true" kind="ROOT">
     <df root="." name="0">
       <df name="src">
-        <in>AttributeFinder.h</in>
-        <in>CLIParser.h</in>
-        <in>Configuration.h</in>
-        <in>FileAttributeFinder.h</in>
-        <in>RequestedField.h</in>
         <in>SubProcess.cpp</in>
-        <in>SubProcess.h</in>
-        <in>XattrAttributeFinder.h</in>
         <in>relpipe-in-filesystem.cpp</in>
       </df>
     </df>
@@ -115,7 +108,7 @@
         <rebuildPropChanged>false</rebuildPropChanged>
       </toolsSet>
       <flagsDictionary>
-        <element flagsID="0" commonFlags="-O3"/>
+        <element flagsID="0" commonFlags="-O3 -std=gnu++1z"/>
       </flagsDictionary>
       <codeAssistance>
       </codeAssistance>
@@ -128,6 +121,7 @@
           <ccTool>
             <incDir>
               <pElem>../relpipe-lib-writer.cpp/include</pElem>
+              <pElem>../relpipe-lib-common.cpp/include</pElem>
               <pElem>../relpipe-lib-cli.cpp/include</pElem>
               <pElem>build/Release/src</pElem>
             </incDir>
@@ -142,23 +136,11 @@
           <preBuildFirst>true</preBuildFirst>
         </preBuild>
       </makefileType>
-      <item path="src/AttributeFinder.h" ex="false" tool="3" flavor2="0">
-      </item>
-      <item path="src/CLIParser.h" ex="false" tool="3" flavor2="0">
-      </item>
-      <item path="src/Configuration.h" ex="false" tool="3" flavor2="0">
-      </item>
-      <item path="src/FileAttributeFinder.h" ex="false" tool="3" flavor2="0">
+      <item path="src/SubProcess.cpp" ex="false" tool="1" flavor2="11">
+        <ccTool flags="0">
+        </ccTool>
       </item>
-      <item path="src/RequestedField.h" ex="false" tool="3" flavor2="0">
-      </item>
-      <item path="src/SubProcess.cpp" ex="false" tool="1" flavor2="0">
-      </item>
-      <item path="src/SubProcess.h" ex="false" tool="3" flavor2="0">
-      </item>
-      <item path="src/XattrAttributeFinder.h" ex="false" tool="3" flavor2="0">
-      </item>
-      <item path="src/relpipe-in-filesystem.cpp" ex="false" tool="1" flavor2="0">
+      <item path="src/relpipe-in-filesystem.cpp" ex="false" tool="1" flavor2="11">
         <ccTool flags="0">
         </ccTool>
       </item>
--- a/src/CLIParser.h	Sun Jan 19 18:41:32 2020 +0100
+++ b/src/CLIParser.h	Mon Jan 20 12:39:22 2020 +0100
@@ -55,6 +55,7 @@
 	static const string_t OPTION_AS;
 	static const string_t OPTION_OPTION;
 	static const string_t OPTION_RELATION;
+	static const string_t OPTION_PARALLEL;
 
 	Configuration parse(const std::vector<string_t>& arguments) {
 		Configuration c;
@@ -79,6 +80,9 @@
 					currentOptions.push_back(readNext(arguments, i));
 				} else if (option == OPTION_RELATION) {
 					c.relation = readNext(arguments, i);
+				} else if (option == OPTION_PARALLEL) {
+					c.parallelism = std::stoi(readNext(arguments, i));
+					if (c.parallelism < 1) throw relpipe::cli::RelpipeCLIException(L"Number of parallel processes must be 1 or more.", relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS);
 				} else {
 					throw relpipe::cli::RelpipeCLIException(L"Unsupported CLI option: " + option, relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS);
 				}
@@ -119,6 +123,7 @@
 const string_t CLIParser::OPTION_AS = L"--as";
 const string_t CLIParser::OPTION_OPTION = L"--option";
 const string_t CLIParser::OPTION_RELATION = L"--relation";
+const string_t CLIParser::OPTION_PARALLEL = L"--parallel";
 
 }
 }
--- a/src/Configuration.h	Sun Jan 19 18:41:32 2020 +0100
+++ b/src/Configuration.h	Mon Jan 20 12:39:22 2020 +0100
@@ -30,6 +30,7 @@
 public:
 	string_t relation;
 	std::vector<RequestedField> fields;
+	int parallelism = 1;
 
 	virtual ~Configuration() {
 	}
--- a/src/FilesystemCommand.h	Sun Jan 19 18:41:32 2020 +0100
+++ b/src/FilesystemCommand.h	Mon Jan 20 12:39:22 2020 +0100
@@ -16,28 +16,7 @@
  */
 #pragma once
 
-#include <cstdlib>
-#include <iostream>
-#include <sstream>
-#include <string>
-#include <vector>
-#include <map>
-#include <algorithm>
-#include <filesystem>
-
-#include <pwd.h>
-#include <grp.h>
-#include <sys/stat.h>
-
-#include <sys/xattr.h>
-
-#include <relpipe/writer/typedefs.h>
-
-#include "Configuration.h"
-#include "AttributeFinder.h"
-#include "FileAttributeFinder.h"
-#include "XattrAttributeFinder.h"
-#include "StreamletAttributeFinder.h"
+#include "FilesystemCommandBase.h"
 
 namespace relpipe {
 namespace in {
@@ -46,31 +25,9 @@
 namespace fs = std::filesystem;
 using namespace relpipe::writer;
 
-class FilesystemCommand {
+class FilesystemCommand : public FilesystemCommandBase {
 private:
-	std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings.
-
-	FileAttributeFinder fileAttributeFinder;
-	StreamletAttributeFinder execAttributeFinder;
-	XattrAttributeFinder xattrAttributeFinder;
-
-	std::map<string_t, AttributeFinder*> attributeFinders{
-		{RequestedField::GROUP_FILE, &fileAttributeFinder},
-		{RequestedField::GROUP_STREAMLET, &execAttributeFinder},
-		{RequestedField::GROUP_XATTR, &xattrAttributeFinder}};
-
-	void reset(std::stringstream& stream) {
-		stream.str("");
-		stream.clear();
-	}
-
-	bool readNext(std::istream& input, std::stringstream& originalName) {
-		for (char ch; input.get(ch);) {
-			if (ch == 0) return true;
-			else originalName << ch;
-		}
-		return originalName.tellp();
-	}
+	std::map<string_t, std::shared_ptr<AttributeFinder>> attributeFinders = createAttributeFinders();
 
 public:
 
@@ -81,7 +38,7 @@
 
 		std::vector<AttributeMetadata> attributesMetadata;
 		for (RequestedField field : configuration.fields) {
-			AttributeFinder* finder = attributeFinders[field.group];
+			std::shared_ptr<AttributeFinder> finder = attributeFinders[field.group];
 			if (finder) for (AttributeMetadata m : finder->toMetadata(writer.get(), relationName, field)) attributesMetadata.push_back(m);
 			else throw RelpipeWriterException(L"Unsupported field group: " + field.group);
 		}
@@ -102,7 +59,7 @@
 			for (auto& finder : attributeFinders) finder.second->startFile(file, originalName.str(), exists);
 
 			for (RequestedField field : configuration.fields) {
-				AttributeFinder* finder = attributeFinders[field.group]; // should not be nullptr, because already checked while writing the relation metadata
+				std::shared_ptr<AttributeFinder> finder = attributeFinders[field.group]; // should not be nullptr, because already checked while writing the relation metadata
 				finder->writeField(writer.get(), relationName, field);
 			}
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/FilesystemCommandBase.h	Mon Jan 20 12:39:22 2020 +0100
@@ -0,0 +1,83 @@
+/**
+ * Relational pipes
+ * Copyright © 2019 František Kučera (Frantovo.cz, GlobalCode.info)
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, version 3 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#pragma once
+
+#include <cstdlib>
+#include <iostream>
+#include <sstream>
+#include <string>
+#include <vector>
+#include <map>
+#include <memory>
+#include <algorithm>
+#include <filesystem>
+
+#include <pwd.h>
+#include <grp.h>
+#include <sys/stat.h>
+
+#include <sys/xattr.h>
+
+#include <relpipe/writer/typedefs.h>
+
+#include "Configuration.h"
+#include "AttributeFinder.h"
+#include "FileAttributeFinder.h"
+#include "XattrAttributeFinder.h"
+#include "StreamletAttributeFinder.h"
+
+namespace relpipe {
+namespace in {
+namespace filesystem {
+
+namespace fs = std::filesystem;
+using namespace relpipe::writer;
+
+class FilesystemCommandBase {
+protected:
+	std::wstring_convert<codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings.
+
+	std::map<string_t, std::shared_ptr<AttributeFinder>> createAttributeFinders() {
+		return {
+			{RequestedField::GROUP_FILE, std::make_shared<FileAttributeFinder>()},
+			{RequestedField::GROUP_STREAMLET, std::make_shared<StreamletAttributeFinder>()},
+			{RequestedField::GROUP_XATTR, std::make_shared<XattrAttributeFinder>()}};
+	}
+
+	void reset(std::stringstream& stream) {
+		stream.str("");
+		stream.clear();
+	}
+
+	bool readNext(std::istream& input, std::stringstream& originalName) {
+		for (char ch; input.get(ch);) {
+			if (ch == 0) return true;
+			else originalName << ch;
+		}
+		return originalName.tellp();
+	}
+
+public:
+
+	virtual ~FilesystemCommandBase() = default;
+
+	virtual void process(std::istream& input, std::ostream& output, Configuration& configuration) = 0;
+};
+
+}
+}
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/ParallelFilesystemCommand.h	Mon Jan 20 12:39:22 2020 +0100
@@ -0,0 +1,39 @@
+/**
+ * Relational pipes
+ * Copyright © 2020 František Kučera (Frantovo.cz, GlobalCode.info)
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, version 3 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#pragma once
+
+#include "FilesystemCommandBase.h"
+
+namespace relpipe {
+namespace in {
+namespace filesystem {
+
+namespace fs = std::filesystem;
+using namespace relpipe::writer;
+
+class ParallelFilesystemCommand : public FilesystemCommandBase {
+public:
+
+	void process(std::istream& input, std::ostream& output, Configuration& configuration) {
+		// TODO: ParallelFilesystemCommand
+		throw RelpipeWriterException(L"ParallelFilesystemCommand is not yet implemented");
+	}
+};
+
+}
+}
+}
--- a/src/relpipe-in-filesystem.cpp	Sun Jan 19 18:41:32 2020 +0100
+++ b/src/relpipe-in-filesystem.cpp	Mon Jan 20 12:39:22 2020 +0100
@@ -30,6 +30,7 @@
 #include <relpipe/cli/RelpipeCLIException.h>
 
 #include "FilesystemCommand.h"
+#include "ParallelFilesystemCommand.h"
 #include "CLIParser.h"
 
 using namespace relpipe::cli;
@@ -46,8 +47,10 @@
 	try {
 		CLIParser cliParser;
 		Configuration configuration = cliParser.parse(cli.arguments());
-		FilesystemCommand command;
-		command.process(cin, cout, configuration);
+		std::unique_ptr<FilesystemCommandBase> command;
+		if (configuration.parallelism == 1) command = std::make_unique<FilesystemCommand>();
+		else command = std::make_unique<ParallelFilesystemCommand>();
+		command->process(cin, cout, configuration);
 		resultCode = CLI::EXIT_CODE_SUCCESS;
 	} catch (RelpipeWriterException e) {
 		fwprintf(stderr, L"Caught Writer exception: %ls\n", e.getMessge().c_str());