src/PosixMQ.h
branchv_0
changeset 9 2f116bd15f27
parent 6 b0b7b6f1bc88
equal deleted inserted replaced
8:9f89ecc071bf 9:2f116bd15f27
    25 namespace out {
    25 namespace out {
    26 namespace posixmq {
    26 namespace posixmq {
    27 
    27 
    28 class PosixMQ {
    28 class PosixMQ {
    29 private:
    29 private:
       
    30 	const static size_t MSG_COUNT = 10; // TODO: configurable/dynamic
    30 	const static size_t MSG_SIZE = 8192; // TODO: configurable/dynamic
    31 	const static size_t MSG_SIZE = 8192; // TODO: configurable/dynamic
    31 	std::string queueName;
    32 	std::string queueName;
    32 	mqd_t handle = -2;
    33 	mqd_t handle = -2;
    33 	bool unlinkOnClose = false;
    34 	bool unlinkOnClose = false;
    34 
    35 
    41 		if (handle >= 0) mq_close(handle);
    42 		if (handle >= 0) mq_close(handle);
    42 		if (unlinkOnClose) mq_unlink(queueName.c_str());
    43 		if (unlinkOnClose) mq_unlink(queueName.c_str());
    43 	}
    44 	}
    44 
    45 
    45 	static PosixMQ* open(std::string queueName, bool unlinkOnClose = false) {
    46 	static PosixMQ* open(std::string queueName, bool unlinkOnClose = false) {
    46 		mqd_t handle = mq_open(queueName.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
    47 		struct mq_attr attr = (struct mq_attr){0};
       
    48 		attr.mq_maxmsg = MSG_COUNT;
       
    49 		attr.mq_msgsize = MSG_SIZE;
       
    50 
       
    51 		mqd_t handle = mq_open(queueName.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP, &attr);
    47 		if (handle >= 0) return new PosixMQ(queueName, handle, unlinkOnClose);
    52 		if (handle >= 0) return new PosixMQ(queueName, handle, unlinkOnClose);
    48 		else throw std::logic_error("Unable to open PosixMQ: " + queueName + " error: " + strerror(errno));
    53 		else throw std::logic_error("Unable to open PosixMQ: " + queueName + " error: " + strerror(errno));
    49 	}
    54 	}
    50 
    55 
    51 	void send(std::string message) {
    56 	void send(std::string message) {