# HG changeset patch # User František Kučera # Date 1601994258 -7200 # Node ID 0d362165241e2e6078e664e0897400bf7287cfcf # Parent 8ef1980db90793903a320809888a714a1638618f refactoring, syncrhonize code structure with JackHandler.cpp (relpipe-out-jack) diff -r 8ef1980db907 -r 0d362165241e src/JackCommand.h --- a/src/JackCommand.h Tue Sep 29 22:53:08 2020 +0200 +++ b/src/JackCommand.h Tue Oct 06 16:24:18 2020 +0200 @@ -16,21 +16,22 @@ */ #pragma once +#include +#include #include #include -#include +#include +#include #include #include -#include -#include #include -#include #include #include #include #include +#include #include #include #include @@ -38,10 +39,9 @@ #include #include +#include "Configuration.h" #include "JackException.h" -using namespace relpipe::writer; - namespace relpipe { namespace in { namespace jack { @@ -53,64 +53,77 @@ Configuration& configuration; std::wstring_convert> convertor; // TODO: local system encoding - jack_port_t* jackPort = nullptr; - jack_ringbuffer_t* ringBuffer = nullptr; - pthread_mutex_t messageThreadLock = PTHREAD_MUTEX_INITIALIZER; - pthread_cond_t dataReady = PTHREAD_COND_INITIALIZER; - std::atomic continueProcessing{true}; - const int RING_BUFFER_SIZE = 100; - + /** + * Is passed through the ring buffer + * from the the jack-writing thread (callback) to the relpipe-writing thread. + */ struct MidiMessage { - uint8_t buffer[4096]; + uint8_t buffer[4096] = {0}; uint32_t size; uint32_t time; }; -public: + /** + * JACK callbacks (called from the real-time thread) + */ + class RealTimeContext { + public: + jack_client_t* jackClient = nullptr; + jack_port_t* jackPort = nullptr; + jack_ringbuffer_t* ringBuffer = nullptr; - int enqueueMessage(jack_nframes_t frames) { - void* buffer = jack_port_get_buffer(jackPort, frames); - if (buffer == nullptr) throw JackException(L"Unable to get port buffer."); // TODO: exception in RT callback? + pthread_mutex_t processingLock = PTHREAD_MUTEX_INITIALIZER; + pthread_cond_t processingDone = PTHREAD_COND_INITIALIZER; + + const int RING_BUFFER_SIZE = 100; - for (jack_nframes_t i = 0, eventCount = jack_midi_get_event_count(buffer); i < eventCount; i++) { - jack_midi_event_t event; - int noData = jack_midi_event_get(&event, buffer, i); - if (noData) continue; + int processCallback(jack_nframes_t frames) { + void* buffer = jack_port_get_buffer(jackPort, frames); + if (buffer == nullptr) throw JackException(L"Unable to get port buffer."); // TODO: exception in RT callback? + + for (jack_nframes_t i = 0, eventCount = jack_midi_get_event_count(buffer); i < eventCount; i++) { + jack_midi_event_t event; + int noData = jack_midi_event_get(&event, buffer, i); + if (noData) continue; - if (event.size > sizeof (MidiMessage::buffer)) { - // TODO: should not printf in RT callback: - fwprintf(stderr, L"Error: MIDI message was too large → skipping event. Maximum allowed size: %lu bytes.\n", sizeof (MidiMessage::buffer)); - } else if (jack_ringbuffer_write_space(ringBuffer) >= sizeof (MidiMessage)) { - MidiMessage m; - m.time = event.time; - m.size = event.size; - memcpy(m.buffer, event.buffer, event.size); - jack_ringbuffer_write(ringBuffer, (const char *) &m, sizeof (MidiMessage)); - } else { - // TODO: should not printf in RT callback: - fwprintf(stderr, L"Error: ring buffer is full → skipping event.\n"); + if (event.size > sizeof (MidiMessage::buffer)) { + // TODO: should not printf in RT callback: + fwprintf(stderr, L"Error: MIDI message was too large → skipping event. Maximum allowed size: %lu bytes.\n", sizeof (MidiMessage::buffer)); + } else if (jack_ringbuffer_write_space(ringBuffer) >= sizeof (MidiMessage)) { + MidiMessage m; + m.time = event.time; + m.size = event.size; + memcpy(m.buffer, event.buffer, event.size); + jack_ringbuffer_write(ringBuffer, (const char *) &m, sizeof (MidiMessage)); + } else { + // TODO: should not printf in RT callback: + fwprintf(stderr, L"Error: ring buffer is full → skipping event.\n"); + } } + + // TODO: just count skipped events and bytes and report them in next successful message instead of printing to STDERR + + if (pthread_mutex_trylock(&processingLock) == 0) { + pthread_cond_signal(&processingDone); + pthread_mutex_unlock(&processingLock); + } + + return 0; } - // TODO: just count skipped events and bytes and report them in next successful message instead of printing to STDERR - - if (pthread_mutex_trylock(&messageThreadLock) == 0) { - pthread_cond_signal(&dataReady); - pthread_mutex_unlock(&messageThreadLock); + static int processCallback(jack_nframes_t frames, void* instance) { + return static_cast (instance)->processCallback(frames); } - return 0; - } - -private: + } realTimeContext; - static void writeRecord(std::shared_ptr writer, - string_t eventType, integer_t channel, - boolean_t noteOn, integer_t pitch, integer_t velocity, - integer_t controllerId, integer_t value, - string_t raw) { + static void writeRecord(std::shared_ptr writer, + relpipe::common::type::StringX eventType, relpipe::common::type::Integer channel, + relpipe::common::type::Boolean noteOn, relpipe::common::type::Integer pitch, relpipe::common::type::Integer velocity, + relpipe::common::type::Integer controllerId, relpipe::common::type::Integer value, + relpipe::common::type::StringX raw) { writer->writeAttribute(eventType); writer->writeAttribute(&channel, typeid (channel)); writer->writeAttribute(¬eOn, typeid (noteOn)); @@ -121,7 +134,7 @@ writer->writeAttribute(&raw, typeid (raw)); } - void processMessage(std::shared_ptr writer, MidiMessage* event) { + void processMessage(std::shared_ptr writer, MidiMessage* event) { if (event->size == 0) { return; } else { @@ -142,7 +155,7 @@ } } - string_t toHex(MidiMessage* event) { + relpipe::common::type::StringX toHex(MidiMessage* event) { std::wstringstream result; result << std::hex << std::setfill(L'0'); @@ -156,17 +169,60 @@ return result.str(); } + static void jackErrorCallback(const char * message) { + std::wstring_convert < std::codecvt_utf8> convertor; // TODO: local system encoding + std::wcerr << L"JACK: " << convertor.from_bytes(message) << std::endl; + } + + void finalize() { + // Close JACK connection: + jack_deactivate(realTimeContext.jackClient); + jack_client_close(realTimeContext.jackClient); + jack_ringbuffer_free(realTimeContext.ringBuffer); + pthread_mutex_unlock(&realTimeContext.processingLock); + } + + void failInConstructor(const relpipe::common::type::StringX& errorMessage) { + finalize(); + throw JackException(errorMessage); + } + + /** + * Wait for the signal that is emitted at the end of the real-time processCallback() cycle. + */ + void waitForRTCycle() { + pthread_cond_wait(&realTimeContext.processingDone, &realTimeContext.processingLock); + } + public: JackCommand(Configuration& configuration) : configuration(configuration) { + pthread_mutex_lock(&realTimeContext.processingLock); + + // Initialize JACK connection: + std::string clientName = convertor.to_bytes(configuration.jackClientName); + realTimeContext.jackClient = jack_client_open(clientName.c_str(), JackNullOption, nullptr); + if (realTimeContext.jackClient == nullptr) failInConstructor(L"Could not create JACK client."); + + realTimeContext.ringBuffer = jack_ringbuffer_create(realTimeContext.RING_BUFFER_SIZE * sizeof (MidiMessage)); + + jack_set_process_callback(realTimeContext.jackClient, RealTimeContext::processCallback, &realTimeContext); + // TODO: report also other events (connections etc.) + jack_set_error_function(jackErrorCallback); + jack_set_info_function(jackErrorCallback); + + realTimeContext.jackPort = jack_port_register(realTimeContext.jackClient, "input", JACK_DEFAULT_MIDI_TYPE, JackPortIsInput, 0); + if (realTimeContext.jackPort == nullptr) failInConstructor(L"Could not register the JACK port."); + + if (mlockall(MCL_CURRENT | MCL_FUTURE)) fwprintf(stderr, L"Warning: Can not lock memory.\n"); + + int jackError = jack_activate(realTimeContext.jackClient); + if (jackError) failInConstructor(L"Could not activate the JACK client."); } - void finish(int sig) { - continueProcessing = false; - } - - void processJackStream(std::shared_ptr writer, std::function relationalWriterFlush) { + void processJackStream(std::shared_ptr writer, std::function relationalWriterFlush) { // Relation headers: + using namespace relpipe::writer; vector metadata; metadata.push_back({L"event", TypeId::STRING}); metadata.push_back({L"channel", TypeId::INTEGER}); @@ -179,51 +235,28 @@ writer->startRelation(L"midi", metadata, true); relationalWriterFlush(); - // Initialize JACK connection: - std::string clientName = convertor.to_bytes(configuration.jackClientName); - jack_client_t* client = jack_client_open(clientName.c_str(), JackNullOption, nullptr); - if (client == nullptr) throw JackException(L"Could not create JACK client."); - - ringBuffer = jack_ringbuffer_create(RING_BUFFER_SIZE * sizeof (MidiMessage)); - - jack_set_process_callback(client, relpipe::in::jack::enqueueMessage, this); - // TODO: report also other events (connections etc.) - - jackPort = jack_port_register(client, "input", JACK_DEFAULT_MIDI_TYPE, JackPortIsInput, 0); - if (jackPort == nullptr) throw JackException(L"Could not register port."); - - if (mlockall(MCL_CURRENT | MCL_FUTURE)) fwprintf(stderr, L"Warning: Can not lock memory.\n"); - - int jackError = jack_activate(client); - if (jackError) throw JackException(L"Could not activate client."); - // Process messages from the ring buffer queue: - pthread_mutex_lock(&messageThreadLock); while (continueProcessing) { - const size_t queuedMessages = jack_ringbuffer_read_space(ringBuffer) / sizeof (MidiMessage); - for (size_t i = 0; i < queuedMessages; ++i) { + while (jack_ringbuffer_read_space(realTimeContext.ringBuffer) >= sizeof (MidiMessage)) { MidiMessage m; - jack_ringbuffer_read(ringBuffer, (char*) &m, sizeof (MidiMessage)); + jack_ringbuffer_read(realTimeContext.ringBuffer, (char*) &m, sizeof (MidiMessage)); processMessage(writer, &m); relationalWriterFlush(); } - pthread_cond_wait(&dataReady, &messageThreadLock); + waitForRTCycle(); } - pthread_mutex_unlock(&messageThreadLock); + } - // Close JACK connection: - jack_deactivate(client); - jack_client_close(client); - jack_ringbuffer_free(ringBuffer); + void finish(int sig) { + continueProcessing = false; + } + + virtual ~JackCommand() { + finalize(); } }; -int enqueueMessage(jack_nframes_t frames, void* arg) { - JackCommand* instance = (JackCommand*) arg; - return instance->enqueueMessage(frames); -} - } } }