refactoring, syncrhonize code structure with JackHandler.cpp (relpipe-out-jack) v_0
authorFrantišek Kučera <franta-hg@frantovo.cz>
Tue, 06 Oct 2020 16:24:18 +0200
branchv_0
changeset 9 0d362165241e
parent 8 8ef1980db907
child 10 ded44e94147c
refactoring, syncrhonize code structure with JackHandler.cpp (relpipe-out-jack)
src/JackCommand.h
--- a/src/JackCommand.h	Tue Sep 29 22:53:08 2020 +0200
+++ b/src/JackCommand.h	Tue Oct 06 16:24:18 2020 +0200
@@ -16,21 +16,22 @@
  */
 #pragma once
 
+#include <memory>
+#include <atomic>
 #include <cstdlib>
 #include <cstring>
-#include <memory>
+#include <sstream>
+#include <sys/mman.h>
 #include <unistd.h>
 #include <pthread.h>
-#include <sys/mman.h>
-#include <atomic>
 #include <functional>
-#include <sstream>
 #include <iomanip>
 
 #include <jack/jack.h>
 #include <jack/midiport.h>
 #include <jack/ringbuffer.h>
 
+#include <relpipe/common/type/typedefs.h>
 #include <relpipe/writer/RelationalWriter.h>
 #include <relpipe/writer/RelpipeWriterException.h>
 #include <relpipe/writer/AttributeMetadata.h>
@@ -38,10 +39,9 @@
 #include <relpipe/writer/TypeId.h>
 #include <relpipe/cli/CLI.h>
 
+#include "Configuration.h"
 #include "JackException.h"
 
-using namespace relpipe::writer;
-
 namespace relpipe {
 namespace in {
 namespace jack {
@@ -53,64 +53,77 @@
 	Configuration& configuration;
 	std::wstring_convert<std::codecvt_utf8<wchar_t>> convertor; // TODO: local system encoding
 
-	jack_port_t* jackPort = nullptr;
-	jack_ringbuffer_t* ringBuffer = nullptr;
-	pthread_mutex_t messageThreadLock = PTHREAD_MUTEX_INITIALIZER;
-	pthread_cond_t dataReady = PTHREAD_COND_INITIALIZER;
-
 	std::atomic<bool> continueProcessing{true};
 
-	const int RING_BUFFER_SIZE = 100;
-
+	/**
+	 * Is passed through the ring buffer
+	 * from the the jack-writing thread (callback) to the relpipe-writing thread.
+	 */
 	struct MidiMessage {
-		uint8_t buffer[4096];
+		uint8_t buffer[4096] = {0};
 		uint32_t size;
 		uint32_t time;
 	};
 
-public:
+	/**
+	 * JACK callbacks (called from the real-time thread)
+	 */
+	class RealTimeContext {
+	public:
+		jack_client_t* jackClient = nullptr;
+		jack_port_t* jackPort = nullptr;
+		jack_ringbuffer_t* ringBuffer = nullptr;
 
-	int enqueueMessage(jack_nframes_t frames) {
-		void* buffer = jack_port_get_buffer(jackPort, frames);
-		if (buffer == nullptr) throw JackException(L"Unable to get port buffer."); // TODO: exception in RT callback?
+		pthread_mutex_t processingLock = PTHREAD_MUTEX_INITIALIZER;
+		pthread_cond_t processingDone = PTHREAD_COND_INITIALIZER;
+
+		const int RING_BUFFER_SIZE = 100;
 
-		for (jack_nframes_t i = 0, eventCount = jack_midi_get_event_count(buffer); i < eventCount; i++) {
-			jack_midi_event_t event;
-			int noData = jack_midi_event_get(&event, buffer, i);
-			if (noData) continue;
+		int processCallback(jack_nframes_t frames) {
+			void* buffer = jack_port_get_buffer(jackPort, frames);
+			if (buffer == nullptr) throw JackException(L"Unable to get port buffer."); // TODO: exception in RT callback?
+
+			for (jack_nframes_t i = 0, eventCount = jack_midi_get_event_count(buffer); i < eventCount; i++) {
+				jack_midi_event_t event;
+				int noData = jack_midi_event_get(&event, buffer, i);
+				if (noData) continue;
 
-			if (event.size > sizeof (MidiMessage::buffer)) {
-				// TODO: should not printf in RT callback:
-				fwprintf(stderr, L"Error: MIDI message was too large → skipping event. Maximum allowed size: %lu bytes.\n", sizeof (MidiMessage::buffer));
-			} else if (jack_ringbuffer_write_space(ringBuffer) >= sizeof (MidiMessage)) {
-				MidiMessage m;
-				m.time = event.time;
-				m.size = event.size;
-				memcpy(m.buffer, event.buffer, event.size);
-				jack_ringbuffer_write(ringBuffer, (const char *) &m, sizeof (MidiMessage));
-			} else {
-				// TODO: should not printf in RT callback:
-				fwprintf(stderr, L"Error: ring buffer is full → skipping event.\n");
+				if (event.size > sizeof (MidiMessage::buffer)) {
+					// TODO: should not printf in RT callback:
+					fwprintf(stderr, L"Error: MIDI message was too large → skipping event. Maximum allowed size: %lu bytes.\n", sizeof (MidiMessage::buffer));
+				} else if (jack_ringbuffer_write_space(ringBuffer) >= sizeof (MidiMessage)) {
+					MidiMessage m;
+					m.time = event.time;
+					m.size = event.size;
+					memcpy(m.buffer, event.buffer, event.size);
+					jack_ringbuffer_write(ringBuffer, (const char *) &m, sizeof (MidiMessage));
+				} else {
+					// TODO: should not printf in RT callback:
+					fwprintf(stderr, L"Error: ring buffer is full → skipping event.\n");
+				}
 			}
+
+			// TODO: just count skipped events and bytes and report them in next successful message instead of printing to STDERR
+
+			if (pthread_mutex_trylock(&processingLock) == 0) {
+				pthread_cond_signal(&processingDone);
+				pthread_mutex_unlock(&processingLock);
+			}
+
+			return 0;
 		}
 
-		// TODO: just count skipped events and bytes and report them in next successful message instead of printing to STDERR
-
-		if (pthread_mutex_trylock(&messageThreadLock) == 0) {
-			pthread_cond_signal(&dataReady);
-			pthread_mutex_unlock(&messageThreadLock);
+		static int processCallback(jack_nframes_t frames, void* instance) {
+			return static_cast<RealTimeContext*> (instance)->processCallback(frames);
 		}
 
-		return 0;
-	}
-
-private:
+	} realTimeContext;
 
-	static void writeRecord(std::shared_ptr<RelationalWriter> writer,
-			string_t eventType, integer_t channel,
-			boolean_t noteOn, integer_t pitch, integer_t velocity,
-			integer_t controllerId, integer_t value,
-			string_t raw) {
+	static void writeRecord(std::shared_ptr<relpipe::writer::RelationalWriter> writer,
+			relpipe::common::type::StringX eventType, relpipe::common::type::Integer channel,
+			relpipe::common::type::Boolean noteOn, relpipe::common::type::Integer pitch, relpipe::common::type::Integer velocity,
+			relpipe::common::type::Integer controllerId, relpipe::common::type::Integer value,
+			relpipe::common::type::StringX raw) {
 		writer->writeAttribute(eventType);
 		writer->writeAttribute(&channel, typeid (channel));
 		writer->writeAttribute(&noteOn, typeid (noteOn));
@@ -121,7 +134,7 @@
 		writer->writeAttribute(&raw, typeid (raw));
 	}
 
-	void processMessage(std::shared_ptr<RelationalWriter> writer, MidiMessage* event) {
+	void processMessage(std::shared_ptr<relpipe::writer::RelationalWriter> writer, MidiMessage* event) {
 		if (event->size == 0) {
 			return;
 		} else {
@@ -142,7 +155,7 @@
 		}
 	}
 
-	string_t toHex(MidiMessage* event) {
+	relpipe::common::type::StringX toHex(MidiMessage* event) {
 		std::wstringstream result;
 		result << std::hex << std::setfill(L'0');
 
@@ -156,17 +169,60 @@
 		return result.str();
 	}
 
+	static void jackErrorCallback(const char * message) {
+		std::wstring_convert < std::codecvt_utf8<wchar_t>> convertor; // TODO: local system encoding
+		std::wcerr << L"JACK: " << convertor.from_bytes(message) << std::endl;
+	}
+
+	void finalize() {
+		// Close JACK connection:
+		jack_deactivate(realTimeContext.jackClient);
+		jack_client_close(realTimeContext.jackClient);
+		jack_ringbuffer_free(realTimeContext.ringBuffer);
+		pthread_mutex_unlock(&realTimeContext.processingLock);
+	}
+
+	void failInConstructor(const relpipe::common::type::StringX& errorMessage) {
+		finalize();
+		throw JackException(errorMessage);
+	}
+
+	/**
+	 * Wait for the signal that is emitted at the end of the real-time processCallback() cycle.
+	 */
+	void waitForRTCycle() {
+		pthread_cond_wait(&realTimeContext.processingDone, &realTimeContext.processingLock);
+	}
+
 public:
 
 	JackCommand(Configuration& configuration) : configuration(configuration) {
+		pthread_mutex_lock(&realTimeContext.processingLock);
+
+		// Initialize JACK connection:
+		std::string clientName = convertor.to_bytes(configuration.jackClientName);
+		realTimeContext.jackClient = jack_client_open(clientName.c_str(), JackNullOption, nullptr);
+		if (realTimeContext.jackClient == nullptr) failInConstructor(L"Could not create JACK client.");
+
+		realTimeContext.ringBuffer = jack_ringbuffer_create(realTimeContext.RING_BUFFER_SIZE * sizeof (MidiMessage));
+
+		jack_set_process_callback(realTimeContext.jackClient, RealTimeContext::processCallback, &realTimeContext);
+		// TODO: report also other events (connections etc.)
+		jack_set_error_function(jackErrorCallback);
+		jack_set_info_function(jackErrorCallback);
+
+		realTimeContext.jackPort = jack_port_register(realTimeContext.jackClient, "input", JACK_DEFAULT_MIDI_TYPE, JackPortIsInput, 0);
+		if (realTimeContext.jackPort == nullptr) failInConstructor(L"Could not register the JACK port.");
+
+		if (mlockall(MCL_CURRENT | MCL_FUTURE)) fwprintf(stderr, L"Warning: Can not lock memory.\n");
+
+		int jackError = jack_activate(realTimeContext.jackClient);
+		if (jackError) failInConstructor(L"Could not activate the JACK client.");
 	}
 
-	void finish(int sig) {
-		continueProcessing = false;
-	}
-
-	void processJackStream(std::shared_ptr<writer::RelationalWriter> writer, std::function<void() > relationalWriterFlush) {
+	void processJackStream(std::shared_ptr<relpipe::writer::RelationalWriter> writer, std::function<void() > relationalWriterFlush) {
 		// Relation headers:
+		using namespace relpipe::writer;
 		vector<AttributeMetadata> metadata;
 		metadata.push_back({L"event", TypeId::STRING});
 		metadata.push_back({L"channel", TypeId::INTEGER});
@@ -179,51 +235,28 @@
 		writer->startRelation(L"midi", metadata, true);
 		relationalWriterFlush();
 
-		// Initialize JACK connection:
-		std::string clientName = convertor.to_bytes(configuration.jackClientName);
-		jack_client_t* client = jack_client_open(clientName.c_str(), JackNullOption, nullptr);
-		if (client == nullptr) throw JackException(L"Could not create JACK client.");
-
-		ringBuffer = jack_ringbuffer_create(RING_BUFFER_SIZE * sizeof (MidiMessage));
-
-		jack_set_process_callback(client, relpipe::in::jack::enqueueMessage, this);
-		// TODO: report also other events (connections etc.)
-
-		jackPort = jack_port_register(client, "input", JACK_DEFAULT_MIDI_TYPE, JackPortIsInput, 0);
-		if (jackPort == nullptr) throw JackException(L"Could not register port.");
-
-		if (mlockall(MCL_CURRENT | MCL_FUTURE)) fwprintf(stderr, L"Warning: Can not lock memory.\n");
-
-		int jackError = jack_activate(client);
-		if (jackError) throw JackException(L"Could not activate client.");
-
 		// Process messages from the ring buffer queue:
-		pthread_mutex_lock(&messageThreadLock);
 		while (continueProcessing) {
-			const size_t queuedMessages = jack_ringbuffer_read_space(ringBuffer) / sizeof (MidiMessage);
-			for (size_t i = 0; i < queuedMessages; ++i) {
+			while (jack_ringbuffer_read_space(realTimeContext.ringBuffer) >= sizeof (MidiMessage)) {
 				MidiMessage m;
-				jack_ringbuffer_read(ringBuffer, (char*) &m, sizeof (MidiMessage));
+				jack_ringbuffer_read(realTimeContext.ringBuffer, (char*) &m, sizeof (MidiMessage));
 				processMessage(writer, &m);
 				relationalWriterFlush();
 			}
-			pthread_cond_wait(&dataReady, &messageThreadLock);
+			waitForRTCycle();
 		}
-		pthread_mutex_unlock(&messageThreadLock);
+	}
 
-		// Close JACK connection:
-		jack_deactivate(client);
-		jack_client_close(client);
-		jack_ringbuffer_free(ringBuffer);
+	void finish(int sig) {
+		continueProcessing = false;
+	}
+
+	virtual ~JackCommand() {
+		finalize();
 	}
 
 };
 
-int enqueueMessage(jack_nframes_t frames, void* arg) {
-	JackCommand* instance = (JackCommand*) arg;
-	return instance->enqueueMessage(frames);
-}
-
 }
 }
 }