src/ParallelFilesystemCommand.h
branchv_0
changeset 59 7471529c0d11
parent 58 4679f67a8324
child 60 bb7ca5891755
equal deleted inserted replaced
58:4679f67a8324 59:7471529c0d11
    79 		}
    79 		}
    80 	};
    80 	};
    81 
    81 
    82 	MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : originalPid(getpid()), name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) {
    82 	MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : originalPid(getpid()), name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) {
    83 		if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ.");
    83 		if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ.");
    84 		// FIXME: we sometimes get this error, especially with higher process counts, e.g. --parallel 50
    84 		// TODO: factory method
    85 	}
    85 	}
    86 
    86 
    87 	virtual ~MQ() {
    87 	virtual ~MQ() {
    88 		mq_close(handle);
    88 		mq_close(handle);
    89 		if (unlinkAfterClose && originalPid == getpid()) mq_unlink(name.c_str());
    89 		if (unlinkAfterClose && originalPid == getpid()) mq_unlink(name.c_str());
   138 	 * Using this variable we can detect the copy.
   138 	 * Using this variable we can detect the copy.
   139 	 */
   139 	 */
   140 	__pid_t originalPid;
   140 	__pid_t originalPid;
   141 	sem_t* handle;
   141 	sem_t* handle;
   142 	std::string name;
   142 	std::string name;
   143 	bool owner;
       
   144 public:
   143 public:
   145 
   144 
   146 	NamedMutex(std::string name) : originalPid(getpid()), name(name) {
   145 	NamedMutex(std::string name) : originalPid(getpid()), name(name) {
   147 		handle = sem_open(name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR);
   146 		handle = sem_open(name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR);
   148 		sem_post(handle);
   147 		sem_post(handle);
       
   148 		// TODO: factory method, check errors
   149 	}
   149 	}
   150 
   150 
   151 	~NamedMutex() {
   151 	~NamedMutex() {
   152 		sem_close(handle);
   152 		sem_close(handle);
   153 		if (originalPid == getpid()) sem_unlink(name.c_str());
   153 		if (originalPid == getpid()) sem_unlink(name.c_str());
   161 	}
   161 	}
   162 
   162 
   163 	void unlock() {
   163 	void unlock() {
   164 		sem_post(handle);
   164 		sem_post(handle);
   165 	}
   165 	}
   166 
   166 };
   167 	void disown() {
   167 
   168 		owner = false;
   168 class ParallelFilesystemWorker : FilesystemCommand {
   169 	}
       
   170 };
       
   171 
       
   172 class ParallelFilesystemWorker {
       
   173 private:
   169 private:
   174 	std::string queueName;
   170 	std::string queueName;
   175 	NamedMutex& stdoutMutex;
   171 	NamedMutex& stdoutMutex;
   176 	string_t relationName;
   172 	string_t relationName;
   177 	std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders;
   173 	std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders;
   184 
   180 
   185 	void run() {
   181 	void run() {
   186 		MQ::Message readBuffer;
   182 		MQ::Message readBuffer;
   187 		MQReader mq(queueName.c_str());
   183 		MQReader mq(queueName.c_str());
   188 
   184 
   189 		for (bool running = true; running;) {
   185 		std::stringstream writeBuffer;
       
   186 		std::shared_ptr<RelationalWriter> writer(Factory::create(writeBuffer));
       
   187 
       
   188 		writeHeader(writer.get(), attributeFinders, relationName, &configuration.fields, false);
       
   189 
       
   190 		while (true) {
   190 			mq.receive(&readBuffer);
   191 			mq.receive(&readBuffer);
   191 			std::wstringstream debugLog;
   192 			if (readBuffer.type == MQ::Message::Type::FILENAME) {
   192 			if (readBuffer.type == MQ::Message::Type::END) {
   193 				std::stringstream originalName(readBuffer.getStringData());
   193 				debugLog << L"PID: " << getpid() << L" received END message: „" << convertor.from_bytes(readBuffer.getStringData()) << L"“" << std::endl;
   194 				processSingleFile(writer, originalName, attributeFinders, configuration, relationName);
   194 				running = false;
   195 
   195 			} else if (readBuffer.type == MQ::Message::Type::FILENAME) {
   196 				{
   196 				debugLog << L"PID: " << getpid() << L" received FILENAME message: „" << convertor.from_bytes(readBuffer.getStringData()) << L"“" << std::endl;
   197 					std::lock_guard lock(stdoutMutex);
       
   198 					std::cout << writeBuffer.rdbuf() << std::flush;
       
   199 					// TODO: optional (configurable) buffering: write multiple records in a single batch
       
   200 				}
       
   201 				writeBuffer.str("");
       
   202 				writeBuffer.clear();
       
   203 			} else if (readBuffer.type == MQ::Message::Type::END) {
       
   204 				break;
   197 			} else {
   205 			} else {
   198 				debugLog << L"PID: " << getpid() << L" received unexpected message: „" << convertor.from_bytes(readBuffer.getStringData()) << L"“ of type: " << ((int) readBuffer.type) << std::endl;
   206 				throw RelpipeWriterException(L"ParallelFilesystemWorker recieved message of unsupported type: " + std::to_wstring((int) readBuffer.type)); // TODO: better exception
   199 			}
   207 			}
   200 
   208 		}
   201 
   209 
   202 			{
   210 	}
   203 				std::lock_guard lock(stdoutMutex);
   211 
   204 				std::wcerr << debugLog.str() << std::flush;
   212 	void process(int inputFD, int outputFD, Configuration& configuration) override {
   205 			}
   213 		// TODO: refactoring, not used
   206 			debugLog.str(L"");
       
   207 			debugLog.clear();
       
   208 		}
       
   209 
       
   210 	}
   214 	}
   211 
   215 
   212 };
   216 };
   213 
   217 
   214 class ParallelFilesystemProcess {
   218 class ParallelFilesystemProcess {