# HG changeset patch # User František Kučera # Date 1579520362 -3600 # Node ID fea625f0a096acb7b20b18cf52f73785249788b9 # Parent 841845ccf06d26773678eb51d6df424ad41e208f parallel processing: prepare infrastructure diff -r 841845ccf06d -r fea625f0a096 bash-completion.sh --- a/bash-completion.sh Sun Jan 19 18:41:32 2020 +0100 +++ b/bash-completion.sh Mon Jan 20 12:39:22 2020 +0100 @@ -54,6 +54,7 @@ elif [[ "$w1" == "--as" && "x$w0" == "x" ]]; then COMPREPLY=("''") elif [[ "$w1" == "--option" && "x$w0" == "x" ]]; then COMPREPLY=("''") elif [[ "$w2" == "--option" && "x$w0" == "x" ]]; then COMPREPLY=("''") + elif [[ "$w1" == "--parallel" && "x$w0" == "x" ]]; then COMPREPLY=($(nproc --all)) elif [[ "$w1" == "--file" ]]; then COMPREPLY=($(compgen -W "${FILE_FIELDS[*]}" -- "$w0")) elif [[ "$w1" == "--xattr" ]]; then COMPREPLY=($(compgen -W "${XATTR_FIELDS[*]}" -- "$w0")) elif [[ "$w1" == "--streamlet" ]]; then COMPREPLY=($(while read c; do PATH="$RELPIPE_IN_FILESYSTEM_STREAMLET_PATH" type -P "$c" &>/dev/null && echo "$c"; done < <(PATH="$RELPIPE_IN_FILESYSTEM_STREAMLET_PATH" compgen -A command -- "$w0"))) @@ -65,6 +66,7 @@ "--streamlet" "--as" "--option" + "--parallel" ) COMPREPLY=($(compgen -W "${OPTIONS[*]}" -- "$w0")) fi diff -r 841845ccf06d -r fea625f0a096 nbproject/configurations.xml --- a/nbproject/configurations.xml Sun Jan 19 18:41:32 2020 +0100 +++ b/nbproject/configurations.xml Mon Jan 20 12:39:22 2020 +0100 @@ -42,14 +42,7 @@ - AttributeFinder.h - CLIParser.h - Configuration.h - FileAttributeFinder.h - RequestedField.h SubProcess.cpp - SubProcess.h - XattrAttributeFinder.h relpipe-in-filesystem.cpp @@ -115,7 +108,7 @@ false - + @@ -128,6 +121,7 @@ ../relpipe-lib-writer.cpp/include + ../relpipe-lib-common.cpp/include ../relpipe-lib-cli.cpp/include build/Release/src @@ -142,23 +136,11 @@ true - - - - - - - + + + - - - - - - - - - + diff -r 841845ccf06d -r fea625f0a096 src/CLIParser.h --- a/src/CLIParser.h Sun Jan 19 18:41:32 2020 +0100 +++ b/src/CLIParser.h Mon Jan 20 12:39:22 2020 +0100 @@ -55,6 +55,7 @@ static const string_t OPTION_AS; static const string_t OPTION_OPTION; static const string_t OPTION_RELATION; + static const string_t OPTION_PARALLEL; Configuration parse(const std::vector& arguments) { Configuration c; @@ -79,6 +80,9 @@ currentOptions.push_back(readNext(arguments, i)); } else if (option == OPTION_RELATION) { c.relation = readNext(arguments, i); + } else if (option == OPTION_PARALLEL) { + c.parallelism = std::stoi(readNext(arguments, i)); + if (c.parallelism < 1) throw relpipe::cli::RelpipeCLIException(L"Number of parallel processes must be 1 or more.", relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS); } else { throw relpipe::cli::RelpipeCLIException(L"Unsupported CLI option: " + option, relpipe::cli::CLI::EXIT_CODE_BAD_CLI_ARGUMENTS); } @@ -119,6 +123,7 @@ const string_t CLIParser::OPTION_AS = L"--as"; const string_t CLIParser::OPTION_OPTION = L"--option"; const string_t CLIParser::OPTION_RELATION = L"--relation"; +const string_t CLIParser::OPTION_PARALLEL = L"--parallel"; } } diff -r 841845ccf06d -r fea625f0a096 src/Configuration.h --- a/src/Configuration.h Sun Jan 19 18:41:32 2020 +0100 +++ b/src/Configuration.h Mon Jan 20 12:39:22 2020 +0100 @@ -30,6 +30,7 @@ public: string_t relation; std::vector fields; + int parallelism = 1; virtual ~Configuration() { } diff -r 841845ccf06d -r fea625f0a096 src/FilesystemCommand.h --- a/src/FilesystemCommand.h Sun Jan 19 18:41:32 2020 +0100 +++ b/src/FilesystemCommand.h Mon Jan 20 12:39:22 2020 +0100 @@ -16,28 +16,7 @@ */ #pragma once -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include - -#include - -#include - -#include "Configuration.h" -#include "AttributeFinder.h" -#include "FileAttributeFinder.h" -#include "XattrAttributeFinder.h" -#include "StreamletAttributeFinder.h" +#include "FilesystemCommandBase.h" namespace relpipe { namespace in { @@ -46,31 +25,9 @@ namespace fs = std::filesystem; using namespace relpipe::writer; -class FilesystemCommand { +class FilesystemCommand : public FilesystemCommandBase { private: - std::wstring_convert> convertor; // TODO: support also other encodings. - - FileAttributeFinder fileAttributeFinder; - StreamletAttributeFinder execAttributeFinder; - XattrAttributeFinder xattrAttributeFinder; - - std::map attributeFinders{ - {RequestedField::GROUP_FILE, &fileAttributeFinder}, - {RequestedField::GROUP_STREAMLET, &execAttributeFinder}, - {RequestedField::GROUP_XATTR, &xattrAttributeFinder}}; - - void reset(std::stringstream& stream) { - stream.str(""); - stream.clear(); - } - - bool readNext(std::istream& input, std::stringstream& originalName) { - for (char ch; input.get(ch);) { - if (ch == 0) return true; - else originalName << ch; - } - return originalName.tellp(); - } + std::map> attributeFinders = createAttributeFinders(); public: @@ -81,7 +38,7 @@ std::vector attributesMetadata; for (RequestedField field : configuration.fields) { - AttributeFinder* finder = attributeFinders[field.group]; + std::shared_ptr finder = attributeFinders[field.group]; if (finder) for (AttributeMetadata m : finder->toMetadata(writer.get(), relationName, field)) attributesMetadata.push_back(m); else throw RelpipeWriterException(L"Unsupported field group: " + field.group); } @@ -102,7 +59,7 @@ for (auto& finder : attributeFinders) finder.second->startFile(file, originalName.str(), exists); for (RequestedField field : configuration.fields) { - AttributeFinder* finder = attributeFinders[field.group]; // should not be nullptr, because already checked while writing the relation metadata + std::shared_ptr finder = attributeFinders[field.group]; // should not be nullptr, because already checked while writing the relation metadata finder->writeField(writer.get(), relationName, field); } diff -r 841845ccf06d -r fea625f0a096 src/FilesystemCommandBase.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/FilesystemCommandBase.h Mon Jan 20 12:39:22 2020 +0100 @@ -0,0 +1,83 @@ +/** + * Relational pipes + * Copyright © 2019 František Kučera (Frantovo.cz, GlobalCode.info) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 3 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include + +#include + +#include "Configuration.h" +#include "AttributeFinder.h" +#include "FileAttributeFinder.h" +#include "XattrAttributeFinder.h" +#include "StreamletAttributeFinder.h" + +namespace relpipe { +namespace in { +namespace filesystem { + +namespace fs = std::filesystem; +using namespace relpipe::writer; + +class FilesystemCommandBase { +protected: + std::wstring_convert> convertor; // TODO: support also other encodings. + + std::map> createAttributeFinders() { + return { + {RequestedField::GROUP_FILE, std::make_shared()}, + {RequestedField::GROUP_STREAMLET, std::make_shared()}, + {RequestedField::GROUP_XATTR, std::make_shared()}}; + } + + void reset(std::stringstream& stream) { + stream.str(""); + stream.clear(); + } + + bool readNext(std::istream& input, std::stringstream& originalName) { + for (char ch; input.get(ch);) { + if (ch == 0) return true; + else originalName << ch; + } + return originalName.tellp(); + } + +public: + + virtual ~FilesystemCommandBase() = default; + + virtual void process(std::istream& input, std::ostream& output, Configuration& configuration) = 0; +}; + +} +} +} diff -r 841845ccf06d -r fea625f0a096 src/ParallelFilesystemCommand.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/ParallelFilesystemCommand.h Mon Jan 20 12:39:22 2020 +0100 @@ -0,0 +1,39 @@ +/** + * Relational pipes + * Copyright © 2020 František Kučera (Frantovo.cz, GlobalCode.info) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 3 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +#pragma once + +#include "FilesystemCommandBase.h" + +namespace relpipe { +namespace in { +namespace filesystem { + +namespace fs = std::filesystem; +using namespace relpipe::writer; + +class ParallelFilesystemCommand : public FilesystemCommandBase { +public: + + void process(std::istream& input, std::ostream& output, Configuration& configuration) { + // TODO: ParallelFilesystemCommand + throw RelpipeWriterException(L"ParallelFilesystemCommand is not yet implemented"); + } +}; + +} +} +} diff -r 841845ccf06d -r fea625f0a096 src/relpipe-in-filesystem.cpp --- a/src/relpipe-in-filesystem.cpp Sun Jan 19 18:41:32 2020 +0100 +++ b/src/relpipe-in-filesystem.cpp Mon Jan 20 12:39:22 2020 +0100 @@ -30,6 +30,7 @@ #include #include "FilesystemCommand.h" +#include "ParallelFilesystemCommand.h" #include "CLIParser.h" using namespace relpipe::cli; @@ -46,8 +47,10 @@ try { CLIParser cliParser; Configuration configuration = cliParser.parse(cli.arguments()); - FilesystemCommand command; - command.process(cin, cout, configuration); + std::unique_ptr command; + if (configuration.parallelism == 1) command = std::make_unique(); + else command = std::make_unique(); + command->process(cin, cout, configuration); resultCode = CLI::EXIT_CODE_SUCCESS; } catch (RelpipeWriterException e) { fwprintf(stderr, L"Caught Writer exception: %ls\n", e.getMessge().c_str());