# HG changeset patch # User František Kučera # Date 1589817852 -7200 # Node ID 001b956610ca88dd85d38a39a556b10f7565b30f # Parent c8c8ec34120fd3f18996b4518fe6295dcd116f3e separate JackCommand class diff -r c8c8ec34120f -r 001b956610ca src/JackCommand.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/JackCommand.h Mon May 18 18:04:12 2020 +0200 @@ -0,0 +1,201 @@ +/** + * Relational pipes + * Copyright © 2020 František Kučera (Frantovo.cz, GlobalCode.info) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 3 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "JackException.h" + +using namespace relpipe::writer; + +namespace relpipe { +namespace in { +namespace jack { + +int enqueueMessage(jack_nframes_t frames, void* arg); + +class JackCommand { +private: + jack_port_t* jackPort = nullptr; + jack_ringbuffer_t* ringBuffer = nullptr; + pthread_mutex_t messageThreadLock = PTHREAD_MUTEX_INITIALIZER; + pthread_cond_t dataReady = PTHREAD_COND_INITIALIZER; + + std::atomic continueProcessing{true}; + + const int RING_BUFFER_SIZE = 100; + + struct MidiMessage { + uint8_t buffer[4096]; + uint32_t size; + uint32_t time; + }; + +public: + + int enqueueMessage(jack_nframes_t frames) { + void* buffer = jack_port_get_buffer(jackPort, frames); + if (buffer == nullptr) throw JackException(L"Unable to get port buffer."); // TODO: exception in RT callback? + + for (jack_nframes_t i = 0, eventCount = jack_midi_get_event_count(buffer); i < eventCount; i++) { + jack_midi_event_t event; + int noData = jack_midi_event_get(&event, buffer, i); + if (noData) continue; + + if (event.size > sizeof (MidiMessage::buffer)) { + // TODO: should not printf in RT callback: + fwprintf(stderr, L"Error: MIDI message was too large → skipping event. Maximum allowed size: %lu bytes.\n", sizeof (MidiMessage::buffer)); + } else if (jack_ringbuffer_write_space(ringBuffer) >= sizeof (MidiMessage)) { + MidiMessage m; + m.time = event.time; + m.size = event.size; + memcpy(m.buffer, event.buffer, event.size); + jack_ringbuffer_write(ringBuffer, (const char *) &m, sizeof (MidiMessage)); + } else { + // TODO: should not printf in RT callback: + fwprintf(stderr, L"Error: ring buffer is full → skipping event.\n"); + } + } + + // TODO: just count skipped events and bytes and report them in next successful message instead of printing to STDERR + + if (pthread_mutex_trylock(&messageThreadLock) == 0) { + pthread_cond_signal(&dataReady); + pthread_mutex_unlock(&messageThreadLock); + } + + return 0; + } + +private: + + static void writeRecord(std::shared_ptr writer, + string_t eventType, integer_t channel, + boolean_t noteOn, integer_t pitch, integer_t velocity, + integer_t controllerId, integer_t value) { + writer->writeAttribute(eventType); + writer->writeAttribute(&channel, typeid (channel)); + writer->writeAttribute(¬eOn, typeid (noteOn)); + writer->writeAttribute(&pitch, typeid (pitch)); + writer->writeAttribute(&velocity, typeid (velocity)); + writer->writeAttribute(&controllerId, typeid (controllerId)); + writer->writeAttribute(&value, typeid (value)); + } + + void processMessage(std::shared_ptr writer, MidiMessage* event) { + if (event->size == 0) { + return; + } else { + uint8_t type = event->buffer[0] & 0xF0; + uint8_t channel = event->buffer[0] & 0x0F; + + // TODO: write timestamp, message number + // TODO: write raw buffer in hex + + if ((type == 0x90 || type == 0x80) && event->size == 3) { + writeRecord(writer, L"note", channel, type == 0x90, event->buffer[1], event->buffer[2], 0, 0); + } else if (type == 0xB0 && event->size == 3) { + writeRecord(writer, L"control", channel, false, 0, 0, event->buffer[1], event->buffer[2]); + } + } + } + +public: + + void finish(int sig) { + continueProcessing = false; + } + + void processJackStream(ostream &output) { + // Relation headers: + std::shared_ptr writer(Factory::create(output)); + vector metadata; + metadata.push_back({L"event", TypeId::STRING}); + metadata.push_back({L"channel", TypeId::INTEGER}); + metadata.push_back({L"note_on", TypeId::BOOLEAN}); + metadata.push_back({L"note_pitch", TypeId::INTEGER}); + metadata.push_back({L"note_velocity", TypeId::INTEGER}); + metadata.push_back({L"controller_id", TypeId::INTEGER}); + metadata.push_back({L"controller_value", TypeId::INTEGER}); + writer->startRelation(L"midi", metadata, true); + output.flush(); + + // Initialize JACK connection: + std::string clientName = "relpipe-in-jack"; + jack_client_t* client = jack_client_open(clientName.c_str(), JackNullOption, nullptr); + if (client == nullptr) throw JackException(L"Could not create JACK client."); + + ringBuffer = jack_ringbuffer_create(RING_BUFFER_SIZE * sizeof (MidiMessage)); + + jack_set_process_callback(client, relpipe::in::jack::enqueueMessage, this); + // TODO: report also other events (connections etc.) + + jackPort = jack_port_register(client, "input", JACK_DEFAULT_MIDI_TYPE, JackPortIsInput, 0); + if (jackPort == nullptr) throw JackException(L"Could not register port."); + + if (mlockall(MCL_CURRENT | MCL_FUTURE)) fwprintf(stderr, L"Warning: Can not lock memory.\n"); + + int jackError = jack_activate(client); + if (jackError) throw JackException(L"Could not activate client."); + + // Process messages from the ring buffer queue: + pthread_mutex_lock(&messageThreadLock); + while (continueProcessing) { + const size_t queuedMessages = jack_ringbuffer_read_space(ringBuffer) / sizeof (MidiMessage); + for (size_t i = 0; i < queuedMessages; ++i) { + MidiMessage m; + jack_ringbuffer_read(ringBuffer, (char*) &m, sizeof (MidiMessage)); + processMessage(writer, &m); + output.flush(); + } + pthread_cond_wait(&dataReady, &messageThreadLock); + } + pthread_mutex_unlock(&messageThreadLock); + + // Close JACK connection: + jack_deactivate(client); + jack_client_close(client); + jack_ringbuffer_free(ringBuffer); + } + +}; + +int enqueueMessage(jack_nframes_t frames, void* arg) { + JackCommand* instance = (JackCommand*) arg; + return instance->enqueueMessage(frames); +} + +} +} +} diff -r c8c8ec34120f -r 001b956610ca src/relpipe-in-jack.cpp --- a/src/relpipe-in-jack.cpp Mon May 11 20:50:54 2020 +0200 +++ b/src/relpipe-in-jack.cpp Mon May 18 18:04:12 2020 +0200 @@ -14,169 +14,24 @@ * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ -#include -#include #include -#include -#include -#include -#include +#include -#include -#include -#include - -#include #include -#include #include -#include #include #include "JackException.h" +#include "JackCommand.h" using namespace relpipe::cli; using namespace relpipe::writer; using namespace relpipe::in::jack; -static jack_port_t* jackPort = nullptr; -static jack_ringbuffer_t* ringBuffer = nullptr; -static pthread_mutex_t messageThreadLock = PTHREAD_MUTEX_INITIALIZER; -static pthread_cond_t dataReady = PTHREAD_COND_INITIALIZER; - -static bool continueProcessing = true; - -const int RING_BUFFER_SIZE = 100; - -struct MidiMessage { - uint8_t buffer[4096]; - uint32_t size; - uint32_t time; -}; - -int enqueueMessage(jack_nframes_t frames, void* arg) { - void* buffer = jack_port_get_buffer(jackPort, frames); - if (buffer == nullptr) throw JackException(L"Unable to get port buffer."); // TODO: exception in RT callback? - - for (jack_nframes_t i = 0, eventCount = jack_midi_get_event_count(buffer); i < eventCount; i++) { - jack_midi_event_t event; - int noData = jack_midi_event_get(&event, buffer, i); - if (noData) continue; - - if (event.size > sizeof (MidiMessage::buffer)) { - // TODO: should not printf in RT callback: - fwprintf(stderr, L"Error: MIDI message was too large → skipping event. Maximum allowed size: %lu bytes.\n", sizeof (MidiMessage::buffer)); - } else if (jack_ringbuffer_write_space(ringBuffer) >= sizeof (MidiMessage)) { - MidiMessage m; - m.time = event.time; - m.size = event.size; - memcpy(m.buffer, event.buffer, event.size); - jack_ringbuffer_write(ringBuffer, (const char *) &m, sizeof (MidiMessage)); - } else { - // TODO: should not printf in RT callback: - fwprintf(stderr, L"Error: ring buffer is full → skipping event.\n"); - } - } - - // TODO: just count skipped events and bytes and report them in next successful message instead of printing to STDERR - - if (pthread_mutex_trylock(&messageThreadLock) == 0) { - pthread_cond_signal(&dataReady); - pthread_mutex_unlock(&messageThreadLock); - } - - return 0; -} - -static void writeRecord(std::shared_ptr writer, - string_t eventType, integer_t channel, - boolean_t noteOn, integer_t pitch, integer_t velocity, - integer_t controllerId, integer_t value) { - writer->writeAttribute(eventType); - writer->writeAttribute(&channel, typeid (channel)); - writer->writeAttribute(¬eOn, typeid (noteOn)); - writer->writeAttribute(&pitch, typeid (pitch)); - writer->writeAttribute(&velocity, typeid (velocity)); - writer->writeAttribute(&controllerId, typeid (controllerId)); - writer->writeAttribute(&value, typeid (value)); -} - -static void processMessage(std::shared_ptr writer, MidiMessage* event) { - if (event->size == 0) { - return; - } else { - uint8_t type = event->buffer[0] & 0xF0; - uint8_t channel = event->buffer[0] & 0x0F; +static std::shared_ptr jackCommand = nullptr; - // TODO: write timestamp, message number - // TODO: write raw buffer in hex - - if ((type == 0x90 || type == 0x80) && event->size == 3) { - writeRecord(writer, L"note", channel, type == 0x90, event->buffer[1], event->buffer[2], 0, 0); - } else if (type == 0xB0 && event->size == 3) { - writeRecord(writer, L"control", channel, false, 0, 0, event->buffer[1], event->buffer[2]); - } - } -} - -static void finish(int sig) { - continueProcessing = false; -} - -void processJackStream(ostream &output) { - // Relation headers: - std::shared_ptr writer(Factory::create(output)); - vector metadata; - metadata.push_back({L"event", TypeId::STRING}); - metadata.push_back({L"channel", TypeId::INTEGER}); - metadata.push_back({L"note_on", TypeId::BOOLEAN}); - metadata.push_back({L"note_pitch", TypeId::INTEGER}); - metadata.push_back({L"note_velocity", TypeId::INTEGER}); - metadata.push_back({L"controller_id", TypeId::INTEGER}); - metadata.push_back({L"controller_value", TypeId::INTEGER}); - writer->startRelation(L"midi", metadata, true); - output.flush(); - - // Initialize JACK connection: - std::string clientName = "relpipe-in-jack"; - jack_client_t* client = jack_client_open(clientName.c_str(), JackNullOption, nullptr); - if (client == nullptr) throw JackException(L"Could not create JACK client."); - - ringBuffer = jack_ringbuffer_create(RING_BUFFER_SIZE * sizeof (MidiMessage)); - - jack_set_process_callback(client, enqueueMessage, nullptr); - // TODO: report also other events (connections etc.) - - jackPort = jack_port_register(client, "input", JACK_DEFAULT_MIDI_TYPE, JackPortIsInput, 0); - if (jackPort == nullptr) throw JackException(L"Could not register port."); - - if (mlockall(MCL_CURRENT | MCL_FUTURE)) fwprintf(stderr, L"Warning: Can not lock memory.\n"); - - int jackError = jack_activate(client); - if (jackError) throw JackException(L"Could not activate client."); - - signal(SIGHUP, finish); - signal(SIGINT, finish); - - - // Process messages from the ring buffer queue: - pthread_mutex_lock(&messageThreadLock); - while (continueProcessing) { - const size_t queuedMessages = jack_ringbuffer_read_space(ringBuffer) / sizeof (MidiMessage); - for (size_t i = 0; i < queuedMessages; ++i) { - MidiMessage m; - jack_ringbuffer_read(ringBuffer, (char*) &m, sizeof (MidiMessage)); - processMessage(writer, &m); - output.flush(); - } - pthread_cond_wait(&dataReady, &messageThreadLock); - } - pthread_mutex_unlock(&messageThreadLock); - - // Close JACK connection: - jack_deactivate(client); - jack_client_close(client); - jack_ringbuffer_free(ringBuffer); +void finish(int sig) { + if (jackCommand) jackCommand->finish(sig); } int main(int argc, char** argv) { @@ -184,12 +39,13 @@ CLI::untieStdIO(); CLI cli(argc, argv); // TODO: options, CLI parsing, configurable attributes - // TODO: separate handler class - int resultCode = CLI::EXIT_CODE_UNEXPECTED_ERROR; try { - processJackStream(cout); + signal(SIGHUP, finish); + signal(SIGINT, finish); + jackCommand.reset(new JackCommand()); + jackCommand->processJackStream(cout); resultCode = CLI::EXIT_CODE_SUCCESS; } catch (JackException e) { fwprintf(stderr, L"Caught JACK exception: %ls\n", e.getMessge().c_str());