src/ParallelFilesystemCommand.h
branchv_0
changeset 60 bb7ca5891755
parent 59 7471529c0d11
child 61 640ba8948d69
equal deleted inserted replaced
59:7471529c0d11 60:bb7ca5891755
    44 	mqd_t handle;
    44 	mqd_t handle;
    45 	bool unlinkAfterClose;
    45 	bool unlinkAfterClose;
    46 	static const size_t MAX_DATA_LENGTH = PATH_MAX; // TODO: maybe more or configurable
    46 	static const size_t MAX_DATA_LENGTH = PATH_MAX; // TODO: maybe more or configurable
    47 	static const __syscall_slong_t MAX_MESSAGES = 10; // TODO: maybe configurable or derived from the process count
    47 	static const __syscall_slong_t MAX_MESSAGES = 10; // TODO: maybe configurable or derived from the process count
    48 
    48 
       
    49 	template<typename... Args> static mqd_t mqOpen(const char *__name, int __oflag, Args... args) {
       
    50 		mqd_t handle = mq_open(__name, __oflag, args...);
       
    51 		if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ.");
       
    52 		else return handle;
       
    53 	}
       
    54 
       
    55 	/**
       
    56 	 * @param name
       
    57 	 * @param handle do not call mq_open() directly, use MQ:mqOpen() instead.
       
    58 	 * @param unlinkAfterClose
       
    59 	 */
       
    60 	MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : originalPid(getpid()), name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) {
       
    61 	}
       
    62 
    49 public:
    63 public:
    50 
    64 
    51 	class Message {
    65 	class Message {
    52 	public:
    66 	public:
    53 
    67 
    77 			::memcpy(data, s.c_str(), s.size());
    91 			::memcpy(data, s.c_str(), s.size());
    78 			dataLength = s.size();
    92 			dataLength = s.size();
    79 		}
    93 		}
    80 	};
    94 	};
    81 
    95 
    82 	MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : originalPid(getpid()), name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) {
       
    83 		if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ.");
       
    84 		// TODO: factory method
       
    85 	}
       
    86 
       
    87 	virtual ~MQ() {
    96 	virtual ~MQ() {
    88 		mq_close(handle);
    97 		mq_close(handle);
    89 		if (unlinkAfterClose && originalPid == getpid()) mq_unlink(name.c_str());
    98 		if (unlinkAfterClose && originalPid == getpid()) mq_unlink(name.c_str());
    90 	}
    99 	}
    91 
   100 
    94 };
   103 };
    95 
   104 
    96 class MQReader : public MQ {
   105 class MQReader : public MQ {
    97 public:
   106 public:
    98 
   107 
    99 	MQReader(std::string name) : MQ(name, mq_open(name.c_str(), O_RDONLY)) {
   108 	MQReader(std::string name) : MQ(name, mqOpen(name.c_str(), O_RDONLY)) {
   100 	}
   109 	}
   101 
   110 
   102 	void receive(Message* m) {
   111 	void receive(Message* m) {
   103 		int result = mq_receive(handle, (char*) m, sizeof (*m), nullptr);
   112 		int result = mq_receive(handle, (char*) m, sizeof (*m), nullptr);
   104 		if (result < 0) throw RelpipeWriterException(L"Unable to receive POSIX MQ message.");
   113 		if (result < 0) throw RelpipeWriterException(L"Unable to receive POSIX MQ message.");
   115 		attributes.mq_msgsize = sizeof (Message);
   124 		attributes.mq_msgsize = sizeof (Message);
   116 		return &attributes;
   125 		return &attributes;
   117 	}
   126 	}
   118 public:
   127 public:
   119 
   128 
   120 	MQWriter(std::string name) : MQ(name, mq_open(name.c_str(), O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, getAttributes()), true) {
   129 	MQWriter(std::string name) : MQ(name, mqOpen(name.c_str(), O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, getAttributes()), true) {
   121 	}
   130 	}
   122 
   131 
   123 	void send(const Message* m, unsigned int priority = 0) {
   132 	void send(const Message* m, unsigned int priority = 0) {
   124 		m->checkDataLength();
   133 		m->checkDataLength();
   125 		int result = mq_send(handle, (const char*) m, m->getMessageLength(), priority);
   134 		int result = mq_send(handle, (const char*) m, m->getMessageLength(), priority);
   138 	 * Using this variable we can detect the copy.
   147 	 * Using this variable we can detect the copy.
   139 	 */
   148 	 */
   140 	__pid_t originalPid;
   149 	__pid_t originalPid;
   141 	sem_t* handle;
   150 	sem_t* handle;
   142 	std::string name;
   151 	std::string name;
   143 public:
   152 
   144 
   153 	NamedMutex() {
   145 	NamedMutex(std::string name) : originalPid(getpid()), name(name) {
   154 	}
   146 		handle = sem_open(name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR);
   155 
   147 		sem_post(handle);
   156 public:
   148 		// TODO: factory method, check errors
   157 
       
   158 	static NamedMutex* create(std::string name) {
       
   159 		sem_t* handle = sem_open(name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR);
       
   160 		if (handle == SEM_FAILED) throw RelpipeWriterException(L"Unable to open POSIX semaphore.");
       
   161 
       
   162 		NamedMutex* result = new NamedMutex();
       
   163 		result->name = name;
       
   164 		result->handle = handle;
       
   165 		result->originalPid = getpid();
       
   166 		result->unlock();
       
   167 
       
   168 		return result;
   149 	}
   169 	}
   150 
   170 
   151 	~NamedMutex() {
   171 	~NamedMutex() {
   152 		sem_close(handle);
   172 		sem_close(handle);
   153 		if (originalPid == getpid()) sem_unlink(name.c_str());
   173 		if (originalPid == getpid()) sem_unlink(name.c_str());
   155 
   175 
   156 	NamedMutex(const NamedMutex&) = delete;
   176 	NamedMutex(const NamedMutex&) = delete;
   157 	NamedMutex& operator=(const NamedMutex&) = delete;
   177 	NamedMutex& operator=(const NamedMutex&) = delete;
   158 
   178 
   159 	void lock() {
   179 	void lock() {
   160 		sem_wait(handle);
   180 		int error = sem_wait(handle);
       
   181 		if (error) throw RelpipeWriterException(L"Unable to lock POSIX semaphore.");
   161 	}
   182 	}
   162 
   183 
   163 	void unlock() {
   184 	void unlock() {
   164 		sem_post(handle);
   185 		int error = sem_post(handle);
       
   186 		if (error) throw RelpipeWriterException(L"Unable to unlock POSIX semaphore.");
   165 	}
   187 	}
   166 };
   188 };
   167 
   189 
   168 class ParallelFilesystemWorker : FilesystemCommand {
   190 class ParallelFilesystemWorker : FilesystemCommand {
   169 private:
   191 private:
   208 		}
   230 		}
   209 
   231 
   210 	}
   232 	}
   211 
   233 
   212 	void process(int inputFD, int outputFD, Configuration& configuration) override {
   234 	void process(int inputFD, int outputFD, Configuration& configuration) override {
   213 		// TODO: refactoring, not used
   235 		// FIXME: refactoring, not used
   214 	}
   236 	}
   215 
   237 
   216 };
   238 };
   217 
   239 
   218 class ParallelFilesystemProcess {
   240 class ParallelFilesystemProcess {
   298 		std::string queueName = "/relpipe-in-filesystem_parallel_" + std::to_string(getpid());
   320 		std::string queueName = "/relpipe-in-filesystem_parallel_" + std::to_string(getpid());
   299 		MQWriter mq(queueName.c_str());
   321 		MQWriter mq(queueName.c_str());
   300 		MQ::Message writeBuffer;
   322 		MQ::Message writeBuffer;
   301 
   323 
   302 		// Create lock for STDOUT synchronization:
   324 		// Create lock for STDOUT synchronization:
   303 		NamedMutex stdoutMutex(queueName);
   325 		std::unique_ptr<NamedMutex> stdoutMutex(NamedMutex::create(queueName));
   304 
   326 
   305 		// Start workers:
   327 		// Start workers:
   306 		std::vector<std::shared_ptr < ParallelFilesystemProcess>> workerProcesses;
   328 		std::vector<std::shared_ptr < ParallelFilesystemProcess>> workerProcesses;
   307 		bool inMainProcess = true;
   329 		bool inMainProcess = true;
   308 		for (int i = 0; i < configuration.parallelism; i++) {
   330 		for (int i = 0; i < configuration.parallelism; i++) {
   309 			std::shared_ptr<ParallelFilesystemProcess> workerProcess(ParallelFilesystemProcess::create(queueName, outputFD, stdoutMutex, relationName, createAttributeFinders(), configuration));
   331 			std::shared_ptr<ParallelFilesystemProcess> workerProcess(ParallelFilesystemProcess::create(queueName, outputFD, *stdoutMutex, relationName, createAttributeFinders(), configuration));
   310 			if (workerProcess) {
   332 			if (workerProcess) {
   311 				workerProcesses.push_back(workerProcess);
   333 				workerProcesses.push_back(workerProcess);
   312 			} else {
   334 			} else {
   313 				inMainProcess = false;
   335 				inMainProcess = false;
   314 				break;
   336 				break;