15 * along with this program. If not, see <http://www.gnu.org/licenses/>. |
15 * along with this program. If not, see <http://www.gnu.org/licenses/>. |
16 */ |
16 */ |
17 #pragma once |
17 #pragma once |
18 |
18 |
19 #include <mqueue.h> |
19 #include <mqueue.h> |
|
20 #include <limits.h> |
20 |
21 |
21 #include "FilesystemCommand.h" |
22 #include "FilesystemCommand.h" |
22 |
23 |
23 namespace relpipe { |
24 namespace relpipe { |
24 namespace in { |
25 namespace in { |
25 namespace filesystem { |
26 namespace filesystem { |
26 |
27 |
27 namespace fs = std::filesystem; |
28 namespace fs = std::filesystem; |
28 using namespace relpipe::writer; |
29 using namespace relpipe::writer; |
29 |
30 |
|
31 class MQ { |
|
32 protected: |
|
33 std::string name; |
|
34 mqd_t handle; |
|
35 bool unlinkAfterClose; |
|
36 static const size_t MAX_DATA_LENGTH = PATH_MAX; // TODO: maybe more or configurable |
|
37 static const __syscall_slong_t MAX_MESSAGES = 10; // TODO: maybe configurable or derived from the process count |
|
38 |
|
39 public: |
|
40 |
|
41 class Message { |
|
42 public: |
|
43 |
|
44 enum class Type { |
|
45 FILENAME, |
|
46 END |
|
47 }; |
|
48 |
|
49 Type type; |
|
50 size_t dataLength; // TODO: maybe uint16_t from #include <cstdint> would be enough (and shorten the message minimum size from 16 to 4) |
|
51 char data[MQ::MAX_DATA_LENGTH]; |
|
52 |
|
53 void checkDataLength() const { |
|
54 if (dataLength > sizeof (data) || dataLength < 0) throw RelpipeWriterException(L"Invalid POSIX MQ message size."); |
|
55 } |
|
56 |
|
57 size_t getMessageLength() const { |
|
58 return sizeof (*this) - sizeof (data) + dataLength; |
|
59 } |
|
60 |
|
61 std::string getStringData() { |
|
62 return std::string(data, dataLength); |
|
63 } |
|
64 |
|
65 void setStringData(const std::string& s) { |
|
66 if (s.size() > sizeof (data)) throw RelpipeWriterException(L"Unable set message data: string too long."); |
|
67 ::memcpy(data, s.c_str(), s.size()); |
|
68 dataLength = s.size(); |
|
69 } |
|
70 }; |
|
71 |
|
72 MQ(const std::string& name, mqd_t handle, bool unlinkAfterClose = false) : name(name), handle(handle), unlinkAfterClose(unlinkAfterClose) { |
|
73 if (handle < 0) throw RelpipeWriterException(L"Unable to open POSIX MQ."); |
|
74 } |
|
75 |
|
76 virtual ~MQ() { |
|
77 mq_close(handle); |
|
78 if (unlinkAfterClose) mq_unlink(name.c_str()); |
|
79 } |
|
80 |
|
81 MQ(const MQ& other) = delete; |
|
82 void operator=(const MQ& right) = delete; |
|
83 }; |
|
84 |
|
85 class MQReader : public MQ { |
|
86 public: |
|
87 |
|
88 MQReader(std::string name) : MQ(name, mq_open(name.c_str(), O_RDONLY)) { |
|
89 } |
|
90 |
|
91 void receive(Message* m) { |
|
92 int result = mq_receive(handle, (char*) m, sizeof (*m), nullptr); |
|
93 if (result < 0) throw RelpipeWriterException(L"Unable to receive POSIX MQ message."); |
|
94 m->checkDataLength(); |
|
95 } |
|
96 }; |
|
97 |
|
98 class MQWriter : public MQ { |
|
99 private: |
|
100 mq_attr attributes; |
|
101 |
|
102 mq_attr* getAttributes() { |
|
103 attributes.mq_maxmsg = MQ::MAX_MESSAGES; |
|
104 attributes.mq_msgsize = sizeof (Message); |
|
105 return &attributes; |
|
106 } |
|
107 public: |
|
108 |
|
109 MQWriter(std::string name) : MQ(name, mq_open(name.c_str(), O_RDWR | O_CREAT, S_IRWXU, getAttributes()), true) { |
|
110 } |
|
111 |
|
112 void send(const Message* m, unsigned int priority = 0) { |
|
113 m->checkDataLength(); |
|
114 int result = mq_send(handle, (const char*) m, m->getMessageLength(), priority); |
|
115 if (result) throw RelpipeWriterException(L"Unable to send POSIX MQ message."); |
|
116 } |
|
117 }; |
|
118 |
|
119 class ParallelFilesystemWorker { |
|
120 }; |
|
121 |
|
122 class ParallelFilesystemProcess { |
|
123 }; |
|
124 |
30 class ParallelFilesystemCommand : public FilesystemCommand { |
125 class ParallelFilesystemCommand : public FilesystemCommand { |
31 public: |
126 public: |
32 |
127 |
33 void process(std::istream& input, std::ostream& output, Configuration& configuration) { |
128 void process(std::istream& input, std::ostream& output, Configuration& configuration) { |
34 // TODO: ParallelFilesystemCommand |
129 // TODO: ParallelFilesystemCommand |
35 mq_close(0); // FIXME: remove (this line just tests that linking to librt worked well) |
130 |
|
131 { // TODO: demo code – remove: |
|
132 std::string queueName = "/relpipe-in-filesystem_parallel_" + std::to_string(getpid()); |
|
133 |
|
134 MQWriter mqWriter(queueName.c_str()); |
|
135 MQReader mqReader(queueName.c_str()); |
|
136 |
|
137 MQ::Message writeBuffer; |
|
138 MQ::Message readBuffer; |
|
139 |
|
140 // ::memset(&writeBuffer, 0, sizeof (writeBuffer)); |
|
141 // ::memset(&readBuffer, 0, sizeof (readBuffer)); |
|
142 |
|
143 writeBuffer.type = MQ::Message::Type::END; |
|
144 writeBuffer.setStringData("ahoj"); |
|
145 |
|
146 mqWriter.send(&writeBuffer); |
|
147 |
|
148 mqReader.receive(&readBuffer); |
|
149 |
|
150 std::string readData(readBuffer.data, readBuffer.dataLength); |
|
151 std::wstring_convert < codecvt_utf8<wchar_t>> convertor; |
|
152 |
|
153 std::wcerr << L"Zpráva „" << convertor.from_bytes(readData).c_str() << L"“ typu " << (int) readBuffer.type << L" o celkové délce " << readBuffer.getMessageLength() << L" a délce dat " << readBuffer.dataLength << L" byla přijata." << std::endl; |
|
154 } |
|
155 |
36 throw RelpipeWriterException(L"ParallelFilesystemCommand is not yet implemented"); |
156 throw RelpipeWriterException(L"ParallelFilesystemCommand is not yet implemented"); |
37 } |
157 } |
38 }; |
158 }; |
39 |
159 |
40 } |
160 } |