14 * You should have received a copy of the GNU General Public License |
14 * You should have received a copy of the GNU General Public License |
15 * along with this program. If not, see <http://www.gnu.org/licenses/>. |
15 * along with this program. If not, see <http://www.gnu.org/licenses/>. |
16 */ |
16 */ |
17 #pragma once |
17 #pragma once |
18 |
18 |
|
19 #include <memory> |
|
20 #include <atomic> |
19 #include <cstdlib> |
21 #include <cstdlib> |
20 #include <cstring> |
22 #include <cstring> |
21 #include <memory> |
23 #include <sstream> |
|
24 #include <sys/mman.h> |
22 #include <unistd.h> |
25 #include <unistd.h> |
23 #include <pthread.h> |
26 #include <pthread.h> |
24 #include <sys/mman.h> |
|
25 #include <atomic> |
|
26 #include <functional> |
27 #include <functional> |
27 #include <sstream> |
|
28 #include <iomanip> |
28 #include <iomanip> |
29 |
29 |
30 #include <jack/jack.h> |
30 #include <jack/jack.h> |
31 #include <jack/midiport.h> |
31 #include <jack/midiport.h> |
32 #include <jack/ringbuffer.h> |
32 #include <jack/ringbuffer.h> |
33 |
33 |
|
34 #include <relpipe/common/type/typedefs.h> |
34 #include <relpipe/writer/RelationalWriter.h> |
35 #include <relpipe/writer/RelationalWriter.h> |
35 #include <relpipe/writer/RelpipeWriterException.h> |
36 #include <relpipe/writer/RelpipeWriterException.h> |
36 #include <relpipe/writer/AttributeMetadata.h> |
37 #include <relpipe/writer/AttributeMetadata.h> |
37 #include <relpipe/writer/Factory.h> |
38 #include <relpipe/writer/Factory.h> |
38 #include <relpipe/writer/TypeId.h> |
39 #include <relpipe/writer/TypeId.h> |
39 #include <relpipe/cli/CLI.h> |
40 #include <relpipe/cli/CLI.h> |
40 |
41 |
|
42 #include "Configuration.h" |
41 #include "JackException.h" |
43 #include "JackException.h" |
42 |
|
43 using namespace relpipe::writer; |
|
44 |
44 |
45 namespace relpipe { |
45 namespace relpipe { |
46 namespace in { |
46 namespace in { |
47 namespace jack { |
47 namespace jack { |
48 |
48 |
51 class JackCommand { |
51 class JackCommand { |
52 private: |
52 private: |
53 Configuration& configuration; |
53 Configuration& configuration; |
54 std::wstring_convert<std::codecvt_utf8<wchar_t>> convertor; // TODO: local system encoding |
54 std::wstring_convert<std::codecvt_utf8<wchar_t>> convertor; // TODO: local system encoding |
55 |
55 |
56 jack_port_t* jackPort = nullptr; |
|
57 jack_ringbuffer_t* ringBuffer = nullptr; |
|
58 pthread_mutex_t messageThreadLock = PTHREAD_MUTEX_INITIALIZER; |
|
59 pthread_cond_t dataReady = PTHREAD_COND_INITIALIZER; |
|
60 |
|
61 std::atomic<bool> continueProcessing{true}; |
56 std::atomic<bool> continueProcessing{true}; |
62 |
57 |
63 const int RING_BUFFER_SIZE = 100; |
58 /** |
64 |
59 * Is passed through the ring buffer |
|
60 * from the jack-writing thread (callback) to the relpipe-writing thread. |
|
61 */ |
65 struct MidiMessage { |
62 struct MidiMessage { |
66 uint8_t buffer[4096]; |
63 uint8_t buffer[4096] = {0}; |
67 uint32_t size; |
64 uint32_t size; |
68 uint32_t time; |
65 uint32_t time; |
69 }; |
66 }; |
70 |
67 |
71 public: |
68 /** |
72 |
69 * JACK callbacks (called from the real-time thread) |
73 int enqueueMessage(jack_nframes_t frames) { |
70 */ |
74 void* buffer = jack_port_get_buffer(jackPort, frames); |
71 class RealTimeContext { |
75 if (buffer == nullptr) throw JackException(L"Unable to get port buffer."); // TODO: exception in RT callback? |
72 public: |
76 |
73 jack_client_t* jackClient = nullptr; |
77 for (jack_nframes_t i = 0, eventCount = jack_midi_get_event_count(buffer); i < eventCount; i++) { |
74 jack_port_t* jackPort = nullptr; |
78 jack_midi_event_t event; |
75 jack_ringbuffer_t* ringBuffer = nullptr; |
79 int noData = jack_midi_event_get(&event, buffer, i); |
76 |
80 if (noData) continue; |
77 pthread_mutex_t processingLock = PTHREAD_MUTEX_INITIALIZER; |
81 |
78 pthread_cond_t processingDone = PTHREAD_COND_INITIALIZER; |
82 if (event.size > sizeof (MidiMessage::buffer)) { |
79 |
83 // TODO: should not printf in RT callback: |
80 const int RING_BUFFER_SIZE = 100; |
84 fwprintf(stderr, L"Error: MIDI message was too large → skipping event. Maximum allowed size: %lu bytes.\n", sizeof (MidiMessage::buffer)); |
81 |
85 } else if (jack_ringbuffer_write_space(ringBuffer) >= sizeof (MidiMessage)) { |
82 int processCallback(jack_nframes_t frames) { |
86 MidiMessage m; |
83 void* buffer = jack_port_get_buffer(jackPort, frames); |
87 m.time = event.time; |
84 if (buffer == nullptr) throw JackException(L"Unable to get port buffer."); // TODO: exception in RT callback? |
88 m.size = event.size; |
85 |
89 memcpy(m.buffer, event.buffer, event.size); |
86 for (jack_nframes_t i = 0, eventCount = jack_midi_get_event_count(buffer); i < eventCount; i++) { |
90 jack_ringbuffer_write(ringBuffer, (const char *) &m, sizeof (MidiMessage)); |
87 jack_midi_event_t event; |
91 } else { |
88 int noData = jack_midi_event_get(&event, buffer, i); |
92 // TODO: should not printf in RT callback: |
89 if (noData) continue; |
93 fwprintf(stderr, L"Error: ring buffer is full → skipping event.\n"); |
90 |
94 } |
91 if (event.size > sizeof (MidiMessage::buffer)) { |
95 } |
92 // TODO: should not printf in RT callback: |
96 |
93 fwprintf(stderr, L"Error: MIDI message was too large → skipping event. Maximum allowed size: %lu bytes.\n", sizeof (MidiMessage::buffer)); |
97 // TODO: just count skipped events and bytes and report them in next successful message instead of printing to STDERR |
94 } else if (jack_ringbuffer_write_space(ringBuffer) >= sizeof (MidiMessage)) { |
98 |
95 MidiMessage m; |
99 if (pthread_mutex_trylock(&messageThreadLock) == 0) { |
96 m.time = event.time; |
100 pthread_cond_signal(&dataReady); |
97 m.size = event.size; |
101 pthread_mutex_unlock(&messageThreadLock); |
98 memcpy(m.buffer, event.buffer, event.size); |
102 } |
99 jack_ringbuffer_write(ringBuffer, (const char *) &m, sizeof (MidiMessage)); |
103 |
100 } else { |
104 return 0; |
101 // TODO: should not printf in RT callback: |
105 } |
102 fwprintf(stderr, L"Error: ring buffer is full → skipping event.\n"); |
106 |
103 } |
107 private: |
104 } |
108 |
105 |
109 static void writeRecord(std::shared_ptr<RelationalWriter> writer, |
106 // TODO: just count skipped events and bytes and report them in next successful message instead of printing to STDERR |
110 string_t eventType, integer_t channel, |
107 |
111 boolean_t noteOn, integer_t pitch, integer_t velocity, |
108 if (pthread_mutex_trylock(&processingLock) == 0) { |
112 integer_t controllerId, integer_t value, |
109 pthread_cond_signal(&processingDone); |
113 string_t raw) { |
110 pthread_mutex_unlock(&processingLock); |
|
111 } |
|
112 |
|
113 return 0; |
|
114 } |
|
115 |
|
116 static int processCallback(jack_nframes_t frames, void* instance) { |
|
117 return static_cast<RealTimeContext*> (instance)->processCallback(frames); |
|
118 } |
|
119 |
|
120 } realTimeContext; |
|
121 |
|
122 static void writeRecord(std::shared_ptr<relpipe::writer::RelationalWriter> writer, |
|
123 relpipe::common::type::StringX eventType, relpipe::common::type::Integer channel, |
|
124 relpipe::common::type::Boolean noteOn, relpipe::common::type::Integer pitch, relpipe::common::type::Integer velocity, |
|
125 relpipe::common::type::Integer controllerId, relpipe::common::type::Integer value, |
|
126 relpipe::common::type::StringX raw) { |
114 writer->writeAttribute(eventType); |
127 writer->writeAttribute(eventType); |
115 writer->writeAttribute(&channel, typeid (channel)); |
128 writer->writeAttribute(&channel, typeid (channel)); |
116 writer->writeAttribute(¬eOn, typeid (noteOn)); |
129 writer->writeAttribute(¬eOn, typeid (noteOn)); |
117 writer->writeAttribute(&pitch, typeid (pitch)); |
130 writer->writeAttribute(&pitch, typeid (pitch)); |
118 writer->writeAttribute(&velocity, typeid (velocity)); |
131 writer->writeAttribute(&velocity, typeid (velocity)); |
119 writer->writeAttribute(&controllerId, typeid (controllerId)); |
132 writer->writeAttribute(&controllerId, typeid (controllerId)); |
120 writer->writeAttribute(&value, typeid (value)); |
133 writer->writeAttribute(&value, typeid (value)); |
121 writer->writeAttribute(&raw, typeid (raw)); |
134 writer->writeAttribute(&raw, typeid (raw)); |
122 } |
135 } |
123 |
136 |
124 void processMessage(std::shared_ptr<RelationalWriter> writer, MidiMessage* event) { |
137 void processMessage(std::shared_ptr<relpipe::writer::RelationalWriter> writer, MidiMessage* event) { |
125 if (event->size == 0) { |
138 if (event->size == 0) { |
126 return; |
139 return; |
127 } else { |
140 } else { |
128 uint8_t type = event->buffer[0] & 0xF0; |
141 uint8_t type = event->buffer[0] & 0xF0; |
129 uint8_t channel = event->buffer[0] & 0x0F; |
142 uint8_t channel = event->buffer[0] & 0x0F; |
154 } |
167 } |
155 |
168 |
156 return result.str(); |
169 return result.str(); |
157 } |
170 } |
158 |
171 |
|
/**
 * Message callback handed to jack_set_error_function() and jack_set_info_function().
 * Prints the message to STDERR (std::wcerr) prefixed with "JACK: ".
 *
 * This function is invoked from C code, so it must not let an exception escape:
 * a message that can not be decoded (from_bytes() throws on invalid input)
 * is replaced by a placeholder text instead of propagating the exception.
 *
 * @param message narrow-character message; a null pointer is treated as an empty message
 */
static void jackErrorCallback(const char * message) {
	std::wstring text;
	try {
		std::wstring_convert < std::codecvt_utf8<wchar_t>> convertor; // TODO: local system encoding
		// Convert first, print later: a failed conversion must not leave a half-written line behind.
		text = convertor.from_bytes(message == nullptr ? "" : message);
	} catch (...) {
		text = L"(message could not be decoded)";
	}
	std::wcerr << L"JACK: " << text << std::endl;
}
|
176 |
|
177 void finalize() { |
|
178 // Close JACK connection: |
|
179 jack_deactivate(realTimeContext.jackClient); |
|
180 jack_client_close(realTimeContext.jackClient); |
|
181 jack_ringbuffer_free(realTimeContext.ringBuffer); |
|
182 pthread_mutex_unlock(&realTimeContext.processingLock); |
|
183 } |
|
184 |
|
185 void failInConstructor(const relpipe::common::type::StringX& errorMessage) { |
|
186 finalize(); |
|
187 throw JackException(errorMessage); |
|
188 } |
|
189 |
|
190 /** |
|
191 * Wait for the signal that is emitted at the end of the real-time processCallback() cycle. |
|
192 */ |
|
193 void waitForRTCycle() { |
|
194 pthread_cond_wait(&realTimeContext.processingDone, &realTimeContext.processingLock); |
|
195 } |
|
196 |
159 public: |
197 public: |
160 |
198 |
161 JackCommand(Configuration& configuration) : configuration(configuration) { |
199 JackCommand(Configuration& configuration) : configuration(configuration) { |
162 } |
200 pthread_mutex_lock(&realTimeContext.processingLock); |
163 |
201 |
164 void finish(int sig) { |
202 // Initialize JACK connection: |
165 continueProcessing = false; |
203 std::string clientName = convertor.to_bytes(configuration.jackClientName); |
166 } |
204 realTimeContext.jackClient = jack_client_open(clientName.c_str(), JackNullOption, nullptr); |
167 |
205 if (realTimeContext.jackClient == nullptr) failInConstructor(L"Could not create JACK client."); |
168 void processJackStream(std::shared_ptr<writer::RelationalWriter> writer, std::function<void() > relationalWriterFlush) { |
206 |
|
207 realTimeContext.ringBuffer = jack_ringbuffer_create(realTimeContext.RING_BUFFER_SIZE * sizeof (MidiMessage)); |
|
208 |
|
209 jack_set_process_callback(realTimeContext.jackClient, RealTimeContext::processCallback, &realTimeContext); |
|
210 // TODO: report also other events (connections etc.) |
|
211 jack_set_error_function(jackErrorCallback); |
|
212 jack_set_info_function(jackErrorCallback); |
|
213 |
|
214 realTimeContext.jackPort = jack_port_register(realTimeContext.jackClient, "input", JACK_DEFAULT_MIDI_TYPE, JackPortIsInput, 0); |
|
215 if (realTimeContext.jackPort == nullptr) failInConstructor(L"Could not register the JACK port."); |
|
216 |
|
217 if (mlockall(MCL_CURRENT | MCL_FUTURE)) fwprintf(stderr, L"Warning: Can not lock memory.\n"); |
|
218 |
|
219 int jackError = jack_activate(realTimeContext.jackClient); |
|
220 if (jackError) failInConstructor(L"Could not activate the JACK client."); |
|
221 } |
|
222 |
|
223 void processJackStream(std::shared_ptr<relpipe::writer::RelationalWriter> writer, std::function<void() > relationalWriterFlush) { |
169 // Relation headers: |
224 // Relation headers: |
|
225 using namespace relpipe::writer; |
170 vector<AttributeMetadata> metadata; |
226 vector<AttributeMetadata> metadata; |
171 metadata.push_back({L"event", TypeId::STRING}); |
227 metadata.push_back({L"event", TypeId::STRING}); |
172 metadata.push_back({L"channel", TypeId::INTEGER}); |
228 metadata.push_back({L"channel", TypeId::INTEGER}); |
173 metadata.push_back({L"note_on", TypeId::BOOLEAN}); |
229 metadata.push_back({L"note_on", TypeId::BOOLEAN}); |
174 metadata.push_back({L"note_pitch", TypeId::INTEGER}); |
230 metadata.push_back({L"note_pitch", TypeId::INTEGER}); |
177 metadata.push_back({L"controller_value", TypeId::INTEGER}); |
233 metadata.push_back({L"controller_value", TypeId::INTEGER}); |
178 metadata.push_back({L"raw", TypeId::STRING}); |
234 metadata.push_back({L"raw", TypeId::STRING}); |
179 writer->startRelation(L"midi", metadata, true); |
235 writer->startRelation(L"midi", metadata, true); |
180 relationalWriterFlush(); |
236 relationalWriterFlush(); |
181 |
237 |
182 // Initialize JACK connection: |
|
183 std::string clientName = convertor.to_bytes(configuration.jackClientName); |
|
184 jack_client_t* client = jack_client_open(clientName.c_str(), JackNullOption, nullptr); |
|
185 if (client == nullptr) throw JackException(L"Could not create JACK client."); |
|
186 |
|
187 ringBuffer = jack_ringbuffer_create(RING_BUFFER_SIZE * sizeof (MidiMessage)); |
|
188 |
|
189 jack_set_process_callback(client, relpipe::in::jack::enqueueMessage, this); |
|
190 // TODO: report also other events (connections etc.) |
|
191 |
|
192 jackPort = jack_port_register(client, "input", JACK_DEFAULT_MIDI_TYPE, JackPortIsInput, 0); |
|
193 if (jackPort == nullptr) throw JackException(L"Could not register port."); |
|
194 |
|
195 if (mlockall(MCL_CURRENT | MCL_FUTURE)) fwprintf(stderr, L"Warning: Can not lock memory.\n"); |
|
196 |
|
197 int jackError = jack_activate(client); |
|
198 if (jackError) throw JackException(L"Could not activate client."); |
|
199 |
|
200 // Process messages from the ring buffer queue: |
238 // Process messages from the ring buffer queue: |
201 pthread_mutex_lock(&messageThreadLock); |
|
202 while (continueProcessing) { |
239 while (continueProcessing) { |
203 const size_t queuedMessages = jack_ringbuffer_read_space(ringBuffer) / sizeof (MidiMessage); |
240 while (jack_ringbuffer_read_space(realTimeContext.ringBuffer) >= sizeof (MidiMessage)) { |
204 for (size_t i = 0; i < queuedMessages; ++i) { |
|
205 MidiMessage m; |
241 MidiMessage m; |
206 jack_ringbuffer_read(ringBuffer, (char*) &m, sizeof (MidiMessage)); |
242 jack_ringbuffer_read(realTimeContext.ringBuffer, (char*) &m, sizeof (MidiMessage)); |
207 processMessage(writer, &m); |
243 processMessage(writer, &m); |
208 relationalWriterFlush(); |
244 relationalWriterFlush(); |
209 } |
245 } |
210 pthread_cond_wait(&dataReady, &messageThreadLock); |
246 waitForRTCycle(); |
211 } |
247 } |
212 pthread_mutex_unlock(&messageThreadLock); |
248 } |
213 |
249 |
214 // Close JACK connection: |
250 void finish(int sig) { |
215 jack_deactivate(client); |
251 continueProcessing = false; |
216 jack_client_close(client); |
252 } |
217 jack_ringbuffer_free(ringBuffer); |
253 |
|
254 virtual ~JackCommand() { |
|
255 finalize(); |
218 } |
256 } |
219 |
257 |
220 }; |
258 }; |
221 |
|
222 int enqueueMessage(jack_nframes_t frames, void* arg) { |
|
223 JackCommand* instance = (JackCommand*) arg; |
|
224 return instance->enqueueMessage(frames); |
|
225 } |
|
226 |
259 |
227 } |
260 } |
228 } |
261 } |
229 } |
262 } |