equal
deleted
inserted
replaced
20 #include <string> |
20 #include <string> |
21 #include <stdexcept> |
21 #include <stdexcept> |
22 #include <cstring> |
22 #include <cstring> |
23 |
23 |
24 namespace relpipe { |
24 namespace relpipe { |
25 namespace in { |
25 namespace out { |
26 namespace posixmq { |
26 namespace posixmq { |
27 |
27 |
28 class PosixMQ { |
28 class PosixMQ { |
29 private: |
29 private: |
30 size_t MSG_SIZE = 8192; // TODO: configurable/dynamic |
30 size_t MSG_SIZE = 8192; // TODO: configurable/dynamic |
39 virtual ~PosixMQ() { |
39 virtual ~PosixMQ() { |
40 if (handle >= 0) mq_close(handle); |
40 if (handle >= 0) mq_close(handle); |
41 } |
41 } |
42 |
42 |
43 static PosixMQ* open(std::string queueName) { |
43 static PosixMQ* open(std::string queueName) { |
44 mqd_t handle = mq_open(queueName.c_str(), O_RDONLY | O_CREAT); |
44 mqd_t handle = mq_open(queueName.c_str(), O_RDWR | O_CREAT); |
45 if (handle >= 0) return new PosixMQ(queueName, handle); |
45 if (handle >= 0) return new PosixMQ(queueName, handle); |
46 else throw std::logic_error("Unable to open PosixMQ: " + queueName + " error: " + strerror(errno)); |
46 else throw std::logic_error("Unable to open PosixMQ: " + queueName + " error: " + strerror(errno)); |
|
47 } |
|
48 |
|
49 void send(std::string message) { |
|
50 int result = mq_send(handle, message.c_str(), message.size(), 0); |
|
51 if (result) throw std::logic_error("mq_send() = " + std::to_string(result) + " error: " + strerror(errno)); |
47 } |
52 } |
48 |
53 |
49 std::string receive() { |
54 std::string receive() { |
50 char buffer[MSG_SIZE + 1]; |
55 char buffer[MSG_SIZE + 1]; |
51 memset(buffer, 0, MSG_SIZE + 1); |
56 memset(buffer, 0, MSG_SIZE + 1); |