/**
* Relational pipes
* Copyright © 2020 František Kučera (Frantovo.cz, GlobalCode.info)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <mutex>
#include <mqueue.h>
#include <limits.h>
#include <ext/stdio_filebuf.h>
#include <sys/wait.h>
#include <semaphore.h>
#include "FilesystemCommand.h"
namespace relpipe {
namespace in {
namespace filesystem {
namespace fs = std::filesystem;
using namespace relpipe::writer;
/**
 * Common base of the POSIX message queue reader and writer.
 *
 * Owns the queue descriptor: the destructor always closes it and – if requested –
 * also unlinks the queue name, but only in the process that created this object
 * (a fork()ed copy just closes its descriptor).
 */
class MQ {
protected:
    /**
     * Process where this object was created.
     * During fork() this object is copied.
     * Using this variable we can detect the copy.
     */
    __pid_t originalPid;
    std::string name;
    mqd_t handle;
    bool unlinkAfterClose;
    static const size_t MAX_DATA_LENGTH = PATH_MAX; // TODO: maybe more or configurable
    static const __syscall_slong_t MAX_MESSAGES = 10; // TODO: maybe configurable or derived from the process count

    /**
     * Calls mq_open() and converts a failure into an exception.
     *
     * @param queueName name passed to mq_open()
     * @param flags open flags passed to mq_open()
     * @param args optional additional arguments of mq_open() (mode and attributes)
     * @return the opened queue descriptor
     * @throws RelpipeWriterException if mq_open() fails
     */
    template<typename... Args> static mqd_t mqOpen(const char* queueName, int flags, Args... args) {
        mqd_t handle = mq_open(queueName, flags, args...);
        if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ.");
        return handle;
    }

    /**
     * @param name name of the queue (used for mq_unlink() in the destructor)
     * @param handle do not call mq_open() directly, use MQ::mqOpen() instead.
     * @param unlinkAfterClose whether the destructor should remove the queue name
     */
    MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : originalPid(getpid()), name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) {
    }

public:

    /**
     * Fixed-layout message that is sent through the queue as raw bytes.
     * Only the header and the first dataLength bytes of data are meaningful.
     */
    class Message {
    public:

        enum class Type {
            FILENAME,
            END
        };

        // The header fields have defaults, so a freshly constructed message never carries indeterminate values.
        Type type = Type::FILENAME;
        size_t dataLength = 0; // TODO: maybe uint16_t from #include <cstdint> would be enough (and shorten the message minimum size from 16 to 4)
        char data[MQ::MAX_DATA_LENGTH];

        /**
         * @throws RelpipeWriterException if dataLength does not fit into the data buffer
         */
        void checkDataLength() const {
            // dataLength is unsigned, so only the upper bound can be violated.
            if (dataLength > sizeof (data)) throw RelpipeWriterException(L"Invalid POSIX MQ message size.");
        }

        /**
         * @return number of bytes occupied by the header and the used part of data
         */
        size_t getMessageLength() const {
            return sizeof (*this) - sizeof (data) + dataLength;
        }

        /**
         * @return copy of the first dataLength bytes of data
         */
        std::string getStringData() const {
            return std::string(data, dataLength);
        }

        /**
         * Copies the string (without any terminator) into data and sets dataLength.
         *
         * @throws RelpipeWriterException if the string is longer than the data buffer
         */
        void setStringData(const std::string& s) {
            if (s.size() > sizeof (data)) throw RelpipeWriterException(L"Unable set message data: string too long.");
            ::memcpy(data, s.data(), s.size());
            dataLength = s.size();
        }
    };

    virtual ~MQ() {
        mq_close(handle);
        // Only the creating process removes the queue name; forked copies must leave it in place.
        if (unlinkAfterClose && originalPid == getpid()) mq_unlink(name.c_str());
    }

    MQ(const MQ& other) = delete;
    void operator=(const MQ& right) = delete;
};
class MQReader : public MQ {
public:
MQReader(std::string name) : MQ(name, mqOpen(name.c_str(), O_RDONLY)) {
}
void receive(Message* m) {
int result = mq_receive(handle, (char*) m, sizeof (*m), nullptr);
if (result < 0) throw RelpipeWriterException(L"Unable to receive POSIX MQ message.");
m->checkDataLength();
}
};
/**
 * Creates a new POSIX message queue and sends messages to it.
 * The queue name is unlinked when the creating process destroys this object
 * (unlinkAfterClose is passed as true to the base class).
 */
class MQWriter : public MQ {
private:

    /**
     * Creates the queue exclusively (fails if the name already exists).
     *
     * This is a static function on purpose: it is evaluated while the arguments of the
     * base class constructor are being computed, i.e. before the MQ base is initialized,
     * and calling a non-static member function at that point would be undefined behaviour.
     */
    static mqd_t createQueue(const std::string& name) {
        mq_attr attributes{}; // zero-initialized, then only the fields relevant for mq_open() are set
        attributes.mq_maxmsg = MQ::MAX_MESSAGES;
        attributes.mq_msgsize = sizeof (Message);
        return mqOpen(name.c_str(), O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, &attributes);
    }

public:

    /**
     * @param name name of the queue to be created
     * @throws RelpipeWriterException if the queue can not be created
     */
    MQWriter(std::string name) : MQ(name, createQueue(name), true) {
    }

    /**
     * Sends the header and the used part of the message data.
     *
     * @param m message to be sent
     * @param priority priority passed to mq_send()
     * @throws RelpipeWriterException if the data length is invalid or mq_send() fails
     */
    void send(const Message* m, unsigned int priority = 0) {
        m->checkDataLength();
        int result = mq_send(handle, reinterpret_cast<const char*> (m), m->getMessageLength(), priority);
        if (result) throw RelpipeWriterException(L"Unable to send POSIX MQ message.");
    }
};
/**
 * Mutex backed by a named POSIX semaphore.
 * Provides lock() and unlock(), so it can be used with std::lock_guard.
 *
 * TODO: move to a common/streamlet library
 */
class NamedMutex {
private:
    /**
     * Process where this object was created.
     * During fork() this object is copied.
     * Using this variable we can detect the copy.
     */
    __pid_t originalPid;
    /** SEM_FAILED until the semaphore is successfully opened. */
    sem_t* handle = SEM_FAILED;
    std::string name;

    NamedMutex() : originalPid(getpid()) {
    }

public:

    /**
     * Creates a new named semaphore (fails if the name already exists) in the unlocked state.
     *
     * @param name name of the semaphore
     * @return new object; the caller becomes its owner
     * @throws RelpipeWriterException if the semaphore can not be created
     */
    static NamedMutex* create(std::string name) {
        // The object is allocated first, so nothing can throw after the semaphore exists
        // and the guard releases the object if sem_open() fails.
        std::unique_ptr<NamedMutex> result(new NamedMutex());
        result->name = name;
        // With O_CREAT, sem_open() requires both the mode and the initial value.
        // Initial value 1 = unlocked.
        result->handle = sem_open(result->name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, 1u);
        if (result->handle == SEM_FAILED) throw RelpipeWriterException(L"Unable to open POSIX semaphore.");
        return result.release();
    }

    ~NamedMutex() {
        if (handle == SEM_FAILED) return; // creation failed, nothing to release
        sem_close(handle);
        // Only the creating process removes the name; forked copies just close their handle.
        if (originalPid == getpid()) sem_unlink(name.c_str());
    }

    NamedMutex(const NamedMutex&) = delete;
    NamedMutex& operator=(const NamedMutex&) = delete;

    /**
     * Waits on the semaphore.
     *
     * @throws RelpipeWriterException if sem_wait() fails
     */
    void lock() {
        int error = sem_wait(handle);
        if (error) throw RelpipeWriterException(L"Unable to lock POSIX semaphore.");
    }

    /**
     * Posts the semaphore.
     *
     * @throws RelpipeWriterException if sem_post() fails
     */
    void unlock() {
        int error = sem_post(handle);
        if (error) throw RelpipeWriterException(L"Unable to unlock POSIX semaphore.");
    }
};
class ParallelFilesystemWorker : FilesystemWorker {
private:
std::string queueName;
NamedMutex& stdoutMutex;
string_t relationName;
std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders;
Configuration& configuration;
std::wstring_convert < codecvt_utf8<wchar_t>> convertor; // TODO: support also other encodings.
public:
ParallelFilesystemWorker(std::string queueName, NamedMutex& stdoutMutex, string_t relationName, std::map<string_t, std::shared_ptr<AttributeFinder> > attributeFinders, Configuration& configuration) : queueName(queueName), stdoutMutex(stdoutMutex), relationName(relationName), attributeFinders(attributeFinders), configuration(configuration) {
}
void run() {
MQ::Message readBuffer;
MQReader mq(queueName.c_str());
std::stringstream writeBuffer;
std::shared_ptr<RelationalWriter> writer(Factory::create(writeBuffer));
writeHeader(writer.get(), attributeFinders, relationName, &configuration.fields, false);
while (true) {
mq.receive(&readBuffer);
if (readBuffer.type == MQ::Message::Type::FILENAME) {
std::stringstream originalName(readBuffer.getStringData());
processSingleFile(writer, originalName, attributeFinders, configuration, relationName);
{
std::lock_guard lock(stdoutMutex);
std::cout << writeBuffer.rdbuf() << std::flush;
// TODO: optional (configurable) buffering: write multiple records in a single batch
}
writeBuffer.str("");
writeBuffer.clear();
} else if (readBuffer.type == MQ::Message::Type::END) {
break;
} else {
throw RelpipeWriterException(L"ParallelFilesystemWorker recieved message of unsupported type: " + std::to_wstring((int) readBuffer.type)); // TODO: better exception
}
}
}
};
class ParallelFilesystemProcess {
private:
__pid_t subPid;
ParallelFilesystemProcess(__pid_t subPid) : subPid(subPid) {
}
/**
* TODO: move to a common library (copied from the AWK module)
*/
static void redirectFD(int oldfd, int newfd) {
int result = dup2(oldfd, newfd);
if (result < 0) throw ParallelFilesystemProcess::Exception(L"Unable redirect FD.");
}
/**
* TODO: move to a common library (copied from the AWK module)
*/
static void closeOrThrow(int fd) {
int error = close(fd);
if (error) throw ParallelFilesystemProcess::Exception(L"Unable to close FD: " + std::to_wstring(fd) + L" from PID: " + std::to_wstring(getpid()));
}
public:
class Exception : public relpipe::writer::RelpipeWriterException {
public:
Exception(std::wstring message) : relpipe::writer::RelpipeWriterException(message) {
}
};
static ParallelFilesystemProcess* create(std::string queueName, int outputFD, NamedMutex& stdoutMutex, string_t relationName, std::map<string_t, std::shared_ptr < AttributeFinder>> attributeFinders, Configuration& configuration) {
__pid_t subPid = fork();
if (subPid < 0) {
throw SubProcess::Exception(L"Unable to fork the hash process.");
} else if (subPid == 0) {
// Child process
closeOrThrow(STDIN_FILENO); // strace -cf will show failed close() calls (same as number of processes)
if (outputFD != STDOUT_FILENO) redirectFD(outputFD, STDOUT_FILENO);
ParallelFilesystemWorker w(queueName, stdoutMutex, relationName, attributeFinders, configuration);
w.run();
return nullptr;
} else {
// Parent process
return new ParallelFilesystemProcess(subPid);
}
}
int wait() {
int status = -1;
::waitpid(subPid, &status, 0);
return status;
}
__pid_t getPid() const {
return subPid;
}
};
/**
 * Variant of the filesystem command that distributes the file names among
 * several forked worker processes through a POSIX message queue.
 */
class ParallelFilesystemCommand : public FilesystemCommand {
public:

    /**
     * Writes the relation header, starts configuration.parallelism workers, sends them
     * all file names read from the input followed by one END message per worker,
     * and finally waits for the workers.
     *
     * Note: this method also returns in each forked worker process, once that worker has finished.
     *
     * @throws ParallelFilesystemProcess::Exception if any worker ends with a non-zero status
     */
    void process(int inputFD, int outputFD, Configuration& configuration) {
        __gnu_cxx::stdio_filebuf<char> inputBuffer(inputFD, std::ios::in);
        __gnu_cxx::stdio_filebuf<char> outputBuffer(outputFD, std::ios::out);
        std::istream input(&inputBuffer);
        std::ostream output(&outputBuffer);

        // The header is written (and flushed) once, before any worker is started.
        string_t relationName = fetchRelationName(&configuration);
        std::map<string_t, std::shared_ptr<AttributeFinder>> attributeFinders = createAttributeFinders();
        std::shared_ptr<RelationalWriter> writer(Factory::create(output));
        writeHeader(writer.get(), attributeFinders, relationName, &configuration.fields);
        output.flush();

        // Queue for passing the file names to the workers:
        std::string queueName = "/relpipe-in-filesystem_parallel_" + std::to_string(getpid());
        MQWriter mq(queueName.c_str());
        MQ::Message writeBuffer;

        // Lock for STDOUT synchronization among the workers:
        std::unique_ptr<NamedMutex> stdoutMutex(NamedMutex::create(queueName));

        // Start the workers:
        std::vector<std::shared_ptr<ParallelFilesystemProcess>> workerProcesses;
        for (int i = 0; i < configuration.parallelism; i++) {
            std::shared_ptr<ParallelFilesystemProcess> workerProcess(ParallelFilesystemProcess::create(queueName, outputFD, *stdoutMutex, relationName, createAttributeFinders(), configuration));
            // An empty pointer means that we are in a worker process that has already done its job → nothing more to do.
            if (!workerProcess) return;
            workerProcesses.push_back(workerProcess);
        }

        // Only the main process gets here.
        // Distribute the file names to the workers:
        std::stringstream originalName;
        while (readNext(input, originalName)) {
            writeBuffer.type = MQ::Message::Type::FILENAME;
            writeBuffer.setStringData(originalName.str());
            mq.send(&writeBuffer);
            reset(originalName);
        }

        // Tell the workers that everything is done (one END message per worker):
        writeBuffer.type = MQ::Message::Type::END;
        writeBuffer.setStringData("");
        for (int i = 0; i < configuration.parallelism; i++) mq.send(&writeBuffer);

        // Wait for the workers and collect those that ended with a non-zero status:
        std::map<__pid_t, int> failedProcesses;
        for (const auto& worker : workerProcesses) {
            const int status = worker->wait();
            if (status != 0) failedProcesses[worker->getPid()] = status;
        }

        if (!failedProcesses.empty()) {
            std::wstringstream errorMessage;
            errorMessage << L"One or more processes failed: ";
            for (const auto& [pid, status] : failedProcesses) errorMessage << pid << L":" << status << L", ";
            throw ParallelFilesystemProcess::Exception(errorMessage.str());
        }
    }
};
}
}
}