diff -r c8c8ec34120f -r 001b956610ca src/JackCommand.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/JackCommand.h Mon May 18 18:04:12 2020 +0200 @@ -0,0 +1,201 @@ +/** + * Relational pipes + * Copyright © 2020 František Kučera (Frantovo.cz, GlobalCode.info) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 3 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "JackException.h" + +using namespace relpipe::writer; + +namespace relpipe { +namespace in { +namespace jack { + +int enqueueMessage(jack_nframes_t frames, void* arg); + +class JackCommand { +private: + jack_port_t* jackPort = nullptr; + jack_ringbuffer_t* ringBuffer = nullptr; + pthread_mutex_t messageThreadLock = PTHREAD_MUTEX_INITIALIZER; + pthread_cond_t dataReady = PTHREAD_COND_INITIALIZER; + + std::atomic continueProcessing{true}; + + const int RING_BUFFER_SIZE = 100; + + struct MidiMessage { + uint8_t buffer[4096]; + uint32_t size; + uint32_t time; + }; + +public: + + int enqueueMessage(jack_nframes_t frames) { + void* buffer = jack_port_get_buffer(jackPort, frames); + if (buffer == nullptr) throw JackException(L"Unable to get port buffer."); // TODO: exception in RT callback? + + for (jack_nframes_t i = 0, eventCount = jack_midi_get_event_count(buffer); i < eventCount; i++) { + jack_midi_event_t event; + int noData = jack_midi_event_get(&event, buffer, i); + if (noData) continue; + + if (event.size > sizeof (MidiMessage::buffer)) { + // TODO: should not printf in RT callback: + fwprintf(stderr, L"Error: MIDI message was too large → skipping event. Maximum allowed size: %lu bytes.\n", sizeof (MidiMessage::buffer)); + } else if (jack_ringbuffer_write_space(ringBuffer) >= sizeof (MidiMessage)) { + MidiMessage m; + m.time = event.time; + m.size = event.size; + memcpy(m.buffer, event.buffer, event.size); + jack_ringbuffer_write(ringBuffer, (const char *) &m, sizeof (MidiMessage)); + } else { + // TODO: should not printf in RT callback: + fwprintf(stderr, L"Error: ring buffer is full → skipping event.\n"); + } + } + + // TODO: just count skipped events and bytes and report them in next successful message instead of printing to STDERR + + if (pthread_mutex_trylock(&messageThreadLock) == 0) { + pthread_cond_signal(&dataReady); + pthread_mutex_unlock(&messageThreadLock); + } + + return 0; + } + +private: + + static void writeRecord(std::shared_ptr writer, + string_t eventType, integer_t channel, + boolean_t noteOn, integer_t pitch, integer_t velocity, + integer_t controllerId, integer_t value) { + writer->writeAttribute(eventType); + writer->writeAttribute(&channel, typeid (channel)); + writer->writeAttribute(¬eOn, typeid (noteOn)); + writer->writeAttribute(&pitch, typeid (pitch)); + writer->writeAttribute(&velocity, typeid (velocity)); + writer->writeAttribute(&controllerId, typeid (controllerId)); + writer->writeAttribute(&value, typeid (value)); + } + + void processMessage(std::shared_ptr writer, MidiMessage* event) { + if (event->size == 0) { + return; + } else { + uint8_t type = event->buffer[0] & 0xF0; + uint8_t channel = event->buffer[0] & 0x0F; + + // TODO: write timestamp, message number + // TODO: write raw buffer in hex + + if ((type == 0x90 || type == 0x80) && event->size == 3) { + writeRecord(writer, L"note", channel, type == 0x90, event->buffer[1], event->buffer[2], 0, 0); + } else if (type == 0xB0 && event->size == 3) { + writeRecord(writer, L"control", channel, false, 0, 0, event->buffer[1], event->buffer[2]); + } + } + } + +public: + + void finish(int sig) { + continueProcessing = false; + } + + void processJackStream(ostream &output) { + // Relation headers: + std::shared_ptr writer(Factory::create(output)); + vector metadata; + metadata.push_back({L"event", TypeId::STRING}); + metadata.push_back({L"channel", TypeId::INTEGER}); + metadata.push_back({L"note_on", TypeId::BOOLEAN}); + metadata.push_back({L"note_pitch", TypeId::INTEGER}); + metadata.push_back({L"note_velocity", TypeId::INTEGER}); + metadata.push_back({L"controller_id", TypeId::INTEGER}); + metadata.push_back({L"controller_value", TypeId::INTEGER}); + writer->startRelation(L"midi", metadata, true); + output.flush(); + + // Initialize JACK connection: + std::string clientName = "relpipe-in-jack"; + jack_client_t* client = jack_client_open(clientName.c_str(), JackNullOption, nullptr); + if (client == nullptr) throw JackException(L"Could not create JACK client."); + + ringBuffer = jack_ringbuffer_create(RING_BUFFER_SIZE * sizeof (MidiMessage)); + + jack_set_process_callback(client, relpipe::in::jack::enqueueMessage, this); + // TODO: report also other events (connections etc.) + + jackPort = jack_port_register(client, "input", JACK_DEFAULT_MIDI_TYPE, JackPortIsInput, 0); + if (jackPort == nullptr) throw JackException(L"Could not register port."); + + if (mlockall(MCL_CURRENT | MCL_FUTURE)) fwprintf(stderr, L"Warning: Can not lock memory.\n"); + + int jackError = jack_activate(client); + if (jackError) throw JackException(L"Could not activate client."); + + // Process messages from the ring buffer queue: + pthread_mutex_lock(&messageThreadLock); + while (continueProcessing) { + const size_t queuedMessages = jack_ringbuffer_read_space(ringBuffer) / sizeof (MidiMessage); + for (size_t i = 0; i < queuedMessages; ++i) { + MidiMessage m; + jack_ringbuffer_read(ringBuffer, (char*) &m, sizeof (MidiMessage)); + processMessage(writer, &m); + output.flush(); + } + pthread_cond_wait(&dataReady, &messageThreadLock); + } + pthread_mutex_unlock(&messageThreadLock); + + // Close JACK connection: + jack_deactivate(client); + jack_client_close(client); + jack_ringbuffer_free(ringBuffer); + } + +}; + +int enqueueMessage(jack_nframes_t frames, void* arg) { + JackCommand* instance = (JackCommand*) arg; + return instance->enqueueMessage(frames); +} + +} +} +}