src/ParallelFilesystemCommand.h
branchv_0
changeset 58 4679f67a8324
parent 57 c40a241d6e0c
child 59 7471529c0d11
equal deleted inserted replaced
57:c40a241d6e0c 58:4679f67a8324
    14  * You should have received a copy of the GNU General Public License
    14  * You should have received a copy of the GNU General Public License
    15  * along with this program. If not, see <http://www.gnu.org/licenses/>.
    15  * along with this program. If not, see <http://www.gnu.org/licenses/>.
    16  */
    16  */
    17 #pragma once
    17 #pragma once
    18 
    18 
       
#include <cerrno>
#include <mutex>
#include <mqueue.h>
#include <limits.h>
#include <ext/stdio_filebuf.h>
#include <sys/wait.h>
#include <semaphore.h>

#include "FilesystemCommand.h"
    24 
    27 
    25 namespace relpipe {
    28 namespace relpipe {
    26 namespace in {
    29 namespace in {
    29 namespace fs = std::filesystem;
    32 namespace fs = std::filesystem;
    30 using namespace relpipe::writer;
    33 using namespace relpipe::writer;
    31 
    34 
    32 class MQ {
    35 class MQ {
    33 protected:
    36 protected:
       
    37 	/**
       
    38 	 * Process where this object was created. 
       
    39 	 * During fork() this object is copied.
       
    40 	 * Using this variable we can detect the copy.
       
    41 	 */
       
    42 	__pid_t originalPid;
    34 	std::string name;
    43 	std::string name;
    35 	mqd_t handle;
    44 	mqd_t handle;
    36 	bool unlinkAfterClose;
    45 	bool unlinkAfterClose;
    37 	static const size_t MAX_DATA_LENGTH = PATH_MAX; // TODO: maybe more or configurable
    46 	static const size_t MAX_DATA_LENGTH = PATH_MAX; // TODO: maybe more or configurable
    38 	static const __syscall_slong_t MAX_MESSAGES = 10; // TODO: maybe configurable or derived from the process count
    47 	static const __syscall_slong_t MAX_MESSAGES = 10; // TODO: maybe configurable or derived from the process count
    68 			::memcpy(data, s.c_str(), s.size());
    77 			::memcpy(data, s.c_str(), s.size());
    69 			dataLength = s.size();
    78 			dataLength = s.size();
    70 		}
    79 		}
    71 	};
    80 	};
    72 
    81 
    73 	MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) {
    82 	MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : originalPid(getpid()), name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) {
    74 		if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ.");
    83 		if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ.");
       
    84 		// FIXME: sometimes we got this error, especially with higher process counts like: --parallel 50
    75 	}
    85 	}
    76 
    86 
    77 	virtual ~MQ() {
    87 	virtual ~MQ() {
    78 		mq_close(handle);
    88 		mq_close(handle);
    79 		if (unlinkAfterClose) mq_unlink(name.c_str());
    89 		if (unlinkAfterClose && originalPid == getpid()) mq_unlink(name.c_str());
    80 	}
    90 	}
    81 
    91 
    82 	MQ(const MQ& other) = delete;
    92 	MQ(const MQ& other) = delete;
    83 	void operator=(const MQ& right) = delete;
    93 	void operator=(const MQ& right) = delete;
    84 };
    94 };
   105 		attributes.mq_msgsize = sizeof (Message);
   115 		attributes.mq_msgsize = sizeof (Message);
   106 		return &attributes;
   116 		return &attributes;
   107 	}
   117 	}
   108 public:
   118 public:
   109 
   119 
   110 	MQWriter(std::string name) : MQ(name, mq_open(name.c_str(), O_RDWR | O_CREAT, S_IRWXU, getAttributes()), true) {
   120 	MQWriter(std::string name) : MQ(name, mq_open(name.c_str(), O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, getAttributes()), true) {
   111 	}
   121 	}
   112 
   122 
   113 	void send(const Message* m, unsigned int priority = 0) {
   123 	void send(const Message* m, unsigned int priority = 0) {
   114 		m->checkDataLength();
   124 		m->checkDataLength();
   115 		int result = mq_send(handle, (const char*) m, m->getMessageLength(), priority);
   125 		int result = mq_send(handle, (const char*) m, m->getMessageLength(), priority);
   116 		if (result) throw RelpipeWriterException(L"Unable to send POSIX MQ message.");
   126 		if (result) throw RelpipeWriterException(L"Unable to send POSIX MQ message.");
   117 	}
   127 	}
   118 };
   128 };
   119 
   129 
       
   130 /**
       
   131  * TODO: move to a common/streamlet library
       
   132  */
       
   133 class NamedMutex {
       
   134 private:
       
   135 	/**
       
   136 	 * Process where this object was created. 
       
   137 	 * During fork() this object is copied.
       
   138 	 * Using this variable we can detect the copy.
       
   139 	 */
       
   140 	__pid_t originalPid;
       
   141 	sem_t* handle;
       
   142 	std::string name;
       
   143 	bool owner;
       
   144 public:
       
   145 
       
   146 	NamedMutex(std::string name) : originalPid(getpid()), name(name) {
       
   147 		handle = sem_open(name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR);
       
   148 		sem_post(handle);
       
   149 	}
       
   150 
       
   151 	~NamedMutex() {
       
   152 		sem_close(handle);
       
   153 		if (originalPid == getpid()) sem_unlink(name.c_str());
       
   154 	}
       
   155 
       
   156 	NamedMutex(const NamedMutex&) = delete;
       
   157 	NamedMutex& operator=(const NamedMutex&) = delete;
       
   158 
       
   159 	void lock() {
       
   160 		sem_wait(handle);
       
   161 	}
       
   162 
       
   163 	void unlock() {
       
   164 		sem_post(handle);
       
   165 	}
       
   166 
       
   167 	void disown() {
       
   168 		owner = false;
       
   169 	}
       
   170 };
       
   171 
   120 class ParallelFilesystemWorker {
   172 class ParallelFilesystemWorker {
       
   173 private:
       
   174 	std::string queueName;
       
   175 	NamedMutex& stdoutMutex;
       
   176 	string_t relationName;
       
   177 	std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders;
       
   178 	Configuration& configuration;
       
   179 	std::wstring_convert < codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings.
       
   180 public:
       
   181 
       
   182 	ParallelFilesystemWorker(std::string queueName, NamedMutex& stdoutMutex, string_t relationName, std::map<string_t, std::shared_ptr<AttributeFinder> > attributeFinders, Configuration& configuration) : queueName(queueName), stdoutMutex(stdoutMutex), relationName(relationName), attributeFinders(attributeFinders), configuration(configuration) {
       
   183 	}
       
   184 
       
   185 	void run() {
       
   186 		MQ::Message readBuffer;
       
   187 		MQReader mq(queueName.c_str());
       
   188 
       
   189 		for (bool running = true; running;) {
       
   190 			mq.receive(&readBuffer);
       
   191 			std::wstringstream debugLog;
       
   192 			if (readBuffer.type == MQ::Message::Type::END) {
       
   193 				debugLog << L"PID: " << getpid() << L" received END message: „" << convertor.from_bytes(readBuffer.getStringData()) << L"“" << std::endl;
       
   194 				running = false;
       
   195 			} else if (readBuffer.type == MQ::Message::Type::FILENAME) {
       
   196 				debugLog << L"PID: " << getpid() << L" received FILENAME message: „" << convertor.from_bytes(readBuffer.getStringData()) << L"“" << std::endl;
       
   197 			} else {
       
   198 				debugLog << L"PID: " << getpid() << L" received unexpected message: „" << convertor.from_bytes(readBuffer.getStringData()) << L"“ of type: " << ((int) readBuffer.type) << std::endl;
       
   199 			}
       
   200 
       
   201 
       
   202 			{
       
   203 				std::lock_guard lock(stdoutMutex);
       
   204 				std::wcerr << debugLog.str() << std::flush;
       
   205 			}
       
   206 			debugLog.str(L"");
       
   207 			debugLog.clear();
       
   208 		}
       
   209 
       
   210 	}
       
   211 
   121 };
   212 };
   122 
   213 
   123 class ParallelFilesystemProcess {
   214 class ParallelFilesystemProcess {
       
   215 private:
       
   216 	__pid_t subPid;
       
   217 
       
   218 	ParallelFilesystemProcess(__pid_t subPid) : subPid(subPid) {
       
   219 	}
       
   220 
       
   221 	/**
       
   222 	 * TODO: move to a common library (copied from the AWK module) 
       
   223 	 */
       
   224 	static void redirectFD(int oldfd, int newfd) {
       
   225 		int result = dup2(oldfd, newfd);
       
   226 		if (result < 0) throw ParallelFilesystemProcess::Exception(L"Unable redirect FD.");
       
   227 	}
       
   228 
       
   229 	/**
       
   230 	 * TODO: move to a common library (copied from the AWK module) 
       
   231 	 */
       
   232 	static void closeOrThrow(int fd) {
       
   233 		int error = close(fd);
       
   234 		if (error) throw ParallelFilesystemProcess::Exception(L"Unable to close FD: " + std::to_wstring(fd) + L" from PID: " + std::to_wstring(getpid()));
       
   235 	}
       
   236 
       
   237 public:
       
   238 
       
   239 	class Exception : public relpipe::writer::RelpipeWriterException {
       
   240 	public:
       
   241 
       
   242 		Exception(std::wstring message) : relpipe::writer::RelpipeWriterException(message) {
       
   243 		}
       
   244 
       
   245 	};
       
   246 
       
   247 	static ParallelFilesystemProcess* create(std::string queueName, int outputFD, NamedMutex& stdoutMutex, string_t relationName, std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders, Configuration& configuration) {
       
   248 		__pid_t subPid = fork();
       
   249 
       
   250 		if (subPid < 0) {
       
   251 			throw SubProcess::Exception(L"Unable to fork the hash process.");
       
   252 		} else if (subPid == 0) {
       
   253 			// Child process
       
   254 			closeOrThrow(STDIN_FILENO); // strace -cf will show failed close() calls (same as number of processes)
       
   255 			if (outputFD != STDOUT_FILENO) redirectFD(outputFD, STDOUT_FILENO);
       
   256 			ParallelFilesystemWorker w(queueName, stdoutMutex, relationName, attributeFinders, configuration);
       
   257 			w.run();
       
   258 			return nullptr;
       
   259 		} else {
       
   260 			// Parent process
       
   261 			return new ParallelFilesystemProcess(subPid);
       
   262 		}
       
   263 	}
       
   264 
       
   265 	int wait() {
       
   266 		int status = -1;
       
   267 		::waitpid(subPid, &status, 0);
       
   268 		return status;
       
   269 	}
       
   270 
       
   271 	__pid_t getPid() const {
       
   272 		return subPid;
       
   273 	}
       
   274 
   124 };
   275 };
   125 
   276 
   126 class ParallelFilesystemCommand : public FilesystemCommand {
   277 class ParallelFilesystemCommand : public FilesystemCommand {
   127 public:
   278 public:
   128 
   279 
   129 	void process(int inputFD, int outputFD, Configuration& configuration) {
   280 	void process(int inputFD, int outputFD, Configuration& configuration) {
   130 		// TODO: ParallelFilesystemCommand
   281 		__gnu_cxx::stdio_filebuf<char> inputBuffer(inputFD, std::ios::in);
   131 
   282 		__gnu_cxx::stdio_filebuf<char> outputBuffer(outputFD, std::ios::out);
   132 		{ // TODO: demo code – remove:
   283 		std::istream input(&inputBuffer);
   133 			std::string queueName = "/relpipe-in-filesystem_parallel_" + std::to_string(getpid());
   284 		std::ostream output(&outputBuffer);
   134 
   285 
   135 			MQWriter mqWriter(queueName.c_str());
   286 		// Write relation header:
   136 			MQReader mqReader(queueName.c_str());
   287 		string_t relationName = fetchRelationName(&configuration);
   137 
   288 		std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders = createAttributeFinders();
   138 			MQ::Message writeBuffer;
   289 		std::shared_ptr<RelationalWriter> writer(Factory::create(output));
   139 			MQ::Message readBuffer;
   290 		writeHeader(writer.get(), attributeFinders, relationName, &configuration.fields);
   140 
   291 		output.flush();
   141 			// ::memset(&writeBuffer, 0, sizeof (writeBuffer));
   292 
   142 			// ::memset(&readBuffer, 0, sizeof (readBuffer));
   293 		// Create queue:
   143 
   294 		std::string queueName = "/relpipe-in-filesystem_parallel_" + std::to_string(getpid());
       
   295 		MQWriter mq(queueName.c_str());
       
   296 		MQ::Message writeBuffer;
       
   297 
       
   298 		// Create lock for STDOUT synchronization:
       
   299 		NamedMutex stdoutMutex(queueName);
       
   300 
       
   301 		// Start workers:
       
   302 		std::vector<std::shared_ptr < ParallelFilesystemProcess>> workerProcesses;
       
   303 		bool inMainProcess = true;
       
   304 		for (int i = 0; i < configuration.parallelism; i++) {
       
   305 			std::shared_ptr<ParallelFilesystemProcess> workerProcess(ParallelFilesystemProcess::create(queueName, outputFD, stdoutMutex, relationName, createAttributeFinders(), configuration));
       
   306 			if (workerProcess) {
       
   307 				workerProcesses.push_back(workerProcess);
       
   308 			} else {
       
   309 				inMainProcess = false;
       
   310 				break;
       
   311 			}
       
   312 		}
       
   313 
       
   314 		if (inMainProcess) {
       
   315 			// Distribute file names to the workers:
       
   316 			for (std::stringstream originalName; readNext(input, originalName); reset(originalName)) {
       
   317 				writeBuffer.type = MQ::Message::Type::FILENAME;
       
   318 				writeBuffer.setStringData(originalName.str());
       
   319 				mq.send(&writeBuffer);
       
   320 			}
       
   321 
       
   322 			// Tell workers that everything is done:
   144 			writeBuffer.type = MQ::Message::Type::END;
   323 			writeBuffer.type = MQ::Message::Type::END;
   145 			writeBuffer.setStringData("ahoj");
   324 			writeBuffer.setStringData("");
   146 
   325 			for (int i = 0; i < configuration.parallelism; i++) mq.send(&writeBuffer);
   147 			mqWriter.send(&writeBuffer);
   326 
   148 
   327 			// Wait for workers exit:
   149 			mqReader.receive(&readBuffer);
   328 			std::map<__pid_t, int> failedProcesses;
   150 
   329 			for (std::shared_ptr<ParallelFilesystemProcess> p : workerProcesses) {
   151 			std::string readData(readBuffer.data, readBuffer.dataLength);
   330 				int result = p->wait();
   152 			std::wstring_convert < codecvt_utf8<wchar_t>> convertor;
   331 				if (result) failedProcesses[p->getPid()] = result;
   153 
   332 			}
   154 			std::wcerr << L"Zpráva „" << convertor.from_bytes(readData).c_str() << L"“ typu " << (int) readBuffer.type << L" o celkové délce " << readBuffer.getMessageLength() << L" a délce dat " << readBuffer.dataLength << L" byla přijata." << std::endl;
   333 
   155 		}
   334 			if (failedProcesses.size()) {
   156 
   335 				std::wstringstream errorMessage;
   157 		throw RelpipeWriterException(L"ParallelFilesystemCommand is not yet implemented");
   336 				errorMessage << L"One or more processes failed: ";
       
   337 				for (auto failed : failedProcesses) errorMessage << failed.first << L":" << failed.second << L", ";
       
   338 				throw ParallelFilesystemProcess::Exception(errorMessage.str());
       
   339 			}
       
   340 
       
   341 		} else {
       
   342 			// we are in a worker process → do nothing, finished
       
   343 		}
   158 	}
   344 	}
   159 };
   345 };
   160 
   346 
   161 }
   347 }
   162 }
   348 }