# HG changeset patch # User František Kučera # Date 1646352687 -3600 # Node ID 67898f122f53545bcda47d4a771e0877b49ffab1 # Parent a64022f9684e3e453c694336537841351fb2606a import PosixMQ.h from relpipe-in-posixmq diff -r a64022f9684e -r 67898f122f53 src/PosixMQ.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/PosixMQ.h Fri Mar 04 01:11:27 2022 +0100 @@ -0,0 +1,66 @@ +/** + * Relational pipes + * Copyright © 2022 František Kučera (Frantovo.cz, GlobalCode.info) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 3 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +#pragma once + +#include +#include +#include +#include + +namespace relpipe { +namespace in { +namespace posixmq { + +class PosixMQ { +private: + size_t MSG_SIZE = 8192; // TODO: configurable/dynamic + std::string queueName; + mqd_t handle = -2; + + PosixMQ(std::string queueName, mqd_t handle) : queueName(queueName), handle(handle) { + } + +public: + + virtual ~PosixMQ() { + if (handle >= 0) mq_close(handle); + } + + static PosixMQ* open(std::string queueName) { + mqd_t handle = mq_open(queueName.c_str(), O_RDONLY | O_CREAT); + if (handle >= 0) return new PosixMQ(queueName, handle); + else throw std::logic_error("Unable to open PosixMQ: " + queueName + " error: " + strerror(errno)); + } + + std::string receive() { + char buffer[MSG_SIZE + 1]; + memset(buffer, 0, MSG_SIZE + 1); + ssize_t msgSize = mq_receive(handle, buffer, MSG_SIZE, nullptr); + + if (msgSize >= 0) return std::string(buffer); + else throw std::logic_error("Unable to receive PosixMQ message from " + queueName + " error: " + strerror(errno)); + } + + void unlink() { + mq_unlink(queueName.c_str()); + } + +}; + +} +} +}