src/JackCommand.h
branchv_0
changeset 9 0d362165241e
parent 8 8ef1980db907
child 10 ded44e94147c
equal deleted inserted replaced
8:8ef1980db907 9:0d362165241e
    14  * You should have received a copy of the GNU General Public License
    14  * You should have received a copy of the GNU General Public License
    15  * along with this program. If not, see <http://www.gnu.org/licenses/>.
    15  * along with this program. If not, see <http://www.gnu.org/licenses/>.
    16  */
    16  */
    17 #pragma once
    17 #pragma once
    18 
    18 
       
    19 #include <memory>
       
    20 #include <atomic>
    19 #include <cstdlib>
    21 #include <cstdlib>
    20 #include <cstring>
    22 #include <cstring>
    21 #include <memory>
    23 #include <sstream>
       
    24 #include <sys/mman.h>
    22 #include <unistd.h>
    25 #include <unistd.h>
    23 #include <pthread.h>
    26 #include <pthread.h>
    24 #include <sys/mman.h>
       
    25 #include <atomic>
       
    26 #include <functional>
    27 #include <functional>
    27 #include <sstream>
       
    28 #include <iomanip>
    28 #include <iomanip>
    29 
    29 
    30 #include <jack/jack.h>
    30 #include <jack/jack.h>
    31 #include <jack/midiport.h>
    31 #include <jack/midiport.h>
    32 #include <jack/ringbuffer.h>
    32 #include <jack/ringbuffer.h>
    33 
    33 
       
    34 #include <relpipe/common/type/typedefs.h>
    34 #include <relpipe/writer/RelationalWriter.h>
    35 #include <relpipe/writer/RelationalWriter.h>
    35 #include <relpipe/writer/RelpipeWriterException.h>
    36 #include <relpipe/writer/RelpipeWriterException.h>
    36 #include <relpipe/writer/AttributeMetadata.h>
    37 #include <relpipe/writer/AttributeMetadata.h>
    37 #include <relpipe/writer/Factory.h>
    38 #include <relpipe/writer/Factory.h>
    38 #include <relpipe/writer/TypeId.h>
    39 #include <relpipe/writer/TypeId.h>
    39 #include <relpipe/cli/CLI.h>
    40 #include <relpipe/cli/CLI.h>
    40 
    41 
       
    42 #include "Configuration.h"
    41 #include "JackException.h"
    43 #include "JackException.h"
    42 
       
    43 using namespace relpipe::writer;
       
    44 
    44 
    45 namespace relpipe {
    45 namespace relpipe {
    46 namespace in {
    46 namespace in {
    47 namespace jack {
    47 namespace jack {
    48 
    48 
    51 class JackCommand {
    51 class JackCommand {
    52 private:
    52 private:
    53 	Configuration& configuration;
    53 	Configuration& configuration;
    54 	std::wstring_convert<std::codecvt_utf8<wchar_t>> convertor; // TODO: local system encoding
    54 	std::wstring_convert<std::codecvt_utf8<wchar_t>> convertor; // TODO: local system encoding
    55 
    55 
    56 	jack_port_t* jackPort = nullptr;
       
    57 	jack_ringbuffer_t* ringBuffer = nullptr;
       
    58 	pthread_mutex_t messageThreadLock = PTHREAD_MUTEX_INITIALIZER;
       
    59 	pthread_cond_t dataReady = PTHREAD_COND_INITIALIZER;
       
    60 
       
    61 	std::atomic<bool> continueProcessing{true};
    56 	std::atomic<bool> continueProcessing{true};
    62 
    57 
    63 	const int RING_BUFFER_SIZE = 100;
    58 	/**
    64 
    59 	 * Is passed through the ring buffer
       
    60 	 * from the jack-writing thread (callback) to the relpipe-writing thread.
       
    61 	 */
    65 	struct MidiMessage {
    62 	struct MidiMessage {
    66 		uint8_t buffer[4096];
    63 		uint8_t buffer[4096] = {0};
    67 		uint32_t size;
    64 		uint32_t size;
    68 		uint32_t time;
    65 		uint32_t time;
    69 	};
    66 	};
    70 
    67 
    71 public:
    68 	/**
    72 
    69 	 * JACK callbacks (called from the real-time thread)
    73 	int enqueueMessage(jack_nframes_t frames) {
    70 	 */
    74 		void* buffer = jack_port_get_buffer(jackPort, frames);
    71 	class RealTimeContext {
    75 		if (buffer == nullptr) throw JackException(L"Unable to get port buffer."); // TODO: exception in RT callback?
    72 	public:
    76 
    73 		jack_client_t* jackClient = nullptr;
    77 		for (jack_nframes_t i = 0, eventCount = jack_midi_get_event_count(buffer); i < eventCount; i++) {
    74 		jack_port_t* jackPort = nullptr;
    78 			jack_midi_event_t event;
    75 		jack_ringbuffer_t* ringBuffer = nullptr;
    79 			int noData = jack_midi_event_get(&event, buffer, i);
    76 
    80 			if (noData) continue;
    77 		pthread_mutex_t processingLock = PTHREAD_MUTEX_INITIALIZER;
    81 
    78 		pthread_cond_t processingDone = PTHREAD_COND_INITIALIZER;
    82 			if (event.size > sizeof (MidiMessage::buffer)) {
    79 
    83 				// TODO: should not printf in RT callback:
    80 		const int RING_BUFFER_SIZE = 100;
    84 				fwprintf(stderr, L"Error: MIDI message was too large → skipping event. Maximum allowed size: %lu bytes.\n", sizeof (MidiMessage::buffer));
    81 
    85 			} else if (jack_ringbuffer_write_space(ringBuffer) >= sizeof (MidiMessage)) {
    82 		int processCallback(jack_nframes_t frames) {
    86 				MidiMessage m;
    83 			void* buffer = jack_port_get_buffer(jackPort, frames);
    87 				m.time = event.time;
    84 			if (buffer == nullptr) throw JackException(L"Unable to get port buffer."); // TODO: exception in RT callback?
    88 				m.size = event.size;
    85 
    89 				memcpy(m.buffer, event.buffer, event.size);
    86 			for (jack_nframes_t i = 0, eventCount = jack_midi_get_event_count(buffer); i < eventCount; i++) {
    90 				jack_ringbuffer_write(ringBuffer, (const char *) &m, sizeof (MidiMessage));
    87 				jack_midi_event_t event;
    91 			} else {
    88 				int noData = jack_midi_event_get(&event, buffer, i);
    92 				// TODO: should not printf in RT callback:
    89 				if (noData) continue;
    93 				fwprintf(stderr, L"Error: ring buffer is full → skipping event.\n");
    90 
    94 			}
    91 				if (event.size > sizeof (MidiMessage::buffer)) {
    95 		}
    92 					// TODO: should not printf in RT callback:
    96 
    93 					fwprintf(stderr, L"Error: MIDI message was too large → skipping event. Maximum allowed size: %lu bytes.\n", sizeof (MidiMessage::buffer));
    97 		// TODO: just count skipped events and bytes and report them in next successful message instead of printing to STDERR
    94 				} else if (jack_ringbuffer_write_space(ringBuffer) >= sizeof (MidiMessage)) {
    98 
    95 					MidiMessage m;
    99 		if (pthread_mutex_trylock(&messageThreadLock) == 0) {
    96 					m.time = event.time;
   100 			pthread_cond_signal(&dataReady);
    97 					m.size = event.size;
   101 			pthread_mutex_unlock(&messageThreadLock);
    98 					memcpy(m.buffer, event.buffer, event.size);
   102 		}
    99 					jack_ringbuffer_write(ringBuffer, (const char *) &m, sizeof (MidiMessage));
   103 
   100 				} else {
   104 		return 0;
   101 					// TODO: should not printf in RT callback:
   105 	}
   102 					fwprintf(stderr, L"Error: ring buffer is full → skipping event.\n");
   106 
   103 				}
   107 private:
   104 			}
   108 
   105 
   109 	static void writeRecord(std::shared_ptr<RelationalWriter> writer,
   106 			// TODO: just count skipped events and bytes and report them in next successful message instead of printing to STDERR
   110 			string_t eventType, integer_t channel,
   107 
   111 			boolean_t noteOn, integer_t pitch, integer_t velocity,
   108 			if (pthread_mutex_trylock(&processingLock) == 0) {
   112 			integer_t controllerId, integer_t value,
   109 				pthread_cond_signal(&processingDone);
   113 			string_t raw) {
   110 				pthread_mutex_unlock(&processingLock);
       
   111 			}
       
   112 
       
   113 			return 0;
       
   114 		}
       
   115 
       
   116 		static int processCallback(jack_nframes_t frames, void* instance) {
       
   117 			return static_cast<RealTimeContext*> (instance)->processCallback(frames);
       
   118 		}
       
   119 
       
   120 	} realTimeContext;
       
   121 
       
   122 	static void writeRecord(std::shared_ptr<relpipe::writer::RelationalWriter> writer,
       
   123 			relpipe::common::type::StringX eventType, relpipe::common::type::Integer channel,
       
   124 			relpipe::common::type::Boolean noteOn, relpipe::common::type::Integer pitch, relpipe::common::type::Integer velocity,
       
   125 			relpipe::common::type::Integer controllerId, relpipe::common::type::Integer value,
       
   126 			relpipe::common::type::StringX raw) {
   114 		writer->writeAttribute(eventType);
   127 		writer->writeAttribute(eventType);
   115 		writer->writeAttribute(&channel, typeid (channel));
   128 		writer->writeAttribute(&channel, typeid (channel));
   116 		writer->writeAttribute(&noteOn, typeid (noteOn));
   129 		writer->writeAttribute(&noteOn, typeid (noteOn));
   117 		writer->writeAttribute(&pitch, typeid (pitch));
   130 		writer->writeAttribute(&pitch, typeid (pitch));
   118 		writer->writeAttribute(&velocity, typeid (velocity));
   131 		writer->writeAttribute(&velocity, typeid (velocity));
   119 		writer->writeAttribute(&controllerId, typeid (controllerId));
   132 		writer->writeAttribute(&controllerId, typeid (controllerId));
   120 		writer->writeAttribute(&value, typeid (value));
   133 		writer->writeAttribute(&value, typeid (value));
   121 		writer->writeAttribute(&raw, typeid (raw));
   134 		writer->writeAttribute(&raw, typeid (raw));
   122 	}
   135 	}
   123 
   136 
   124 	void processMessage(std::shared_ptr<RelationalWriter> writer, MidiMessage* event) {
   137 	void processMessage(std::shared_ptr<relpipe::writer::RelationalWriter> writer, MidiMessage* event) {
   125 		if (event->size == 0) {
   138 		if (event->size == 0) {
   126 			return;
   139 			return;
   127 		} else {
   140 		} else {
   128 			uint8_t type = event->buffer[0] & 0xF0;
   141 			uint8_t type = event->buffer[0] & 0xF0;
   129 			uint8_t channel = event->buffer[0] & 0x0F;
   142 			uint8_t channel = event->buffer[0] & 0x0F;
   140 				writeRecord(writer, L"unknown", channel, false, 0, 0, 0, 0, toHex(event));
   153 				writeRecord(writer, L"unknown", channel, false, 0, 0, 0, 0, toHex(event));
   141 			}
   154 			}
   142 		}
   155 		}
   143 	}
   156 	}
   144 
   157 
   145 	string_t toHex(MidiMessage* event) {
   158 	relpipe::common::type::StringX toHex(MidiMessage* event) {
   146 		std::wstringstream result;
   159 		std::wstringstream result;
   147 		result << std::hex << std::setfill(L'0');
   160 		result << std::hex << std::setfill(L'0');
   148 
   161 
   149 		for (size_t i = 0; i < event->size && i < sizeof (event->buffer); i++) {
   162 		for (size_t i = 0; i < event->size && i < sizeof (event->buffer); i++) {
   150 			if (i > 0) result << L' ';
   163 			if (i > 0) result << L' ';
   154 		}
   167 		}
   155 
   168 
   156 		return result.str();
   169 		return result.str();
   157 	}
   170 	}
   158 
   171 
       
   172 	static void jackErrorCallback(const char * message) {
       
   173 		std::wstring_convert < std::codecvt_utf8<wchar_t>> convertor; // TODO: local system encoding
       
   174 		std::wcerr << L"JACK: " << convertor.from_bytes(message) << std::endl;
       
   175 	}
       
   176 
       
   177 	void finalize() {
       
   178 		// Close JACK connection:
       
   179 		jack_deactivate(realTimeContext.jackClient);
       
   180 		jack_client_close(realTimeContext.jackClient);
       
   181 		jack_ringbuffer_free(realTimeContext.ringBuffer);
       
   182 		pthread_mutex_unlock(&realTimeContext.processingLock);
       
   183 	}
       
   184 
       
   185 	void failInConstructor(const relpipe::common::type::StringX& errorMessage) {
       
   186 		finalize();
       
   187 		throw JackException(errorMessage);
       
   188 	}
       
   189 
       
   190 	/**
       
   191 	 * Wait for the signal that is emitted at the end of the real-time processCallback() cycle.
       
   192 	 */
       
   193 	void waitForRTCycle() {
       
   194 		pthread_cond_wait(&realTimeContext.processingDone, &realTimeContext.processingLock);
       
   195 	}
       
   196 
   159 public:
   197 public:
   160 
   198 
   161 	JackCommand(Configuration& configuration) : configuration(configuration) {
   199 	JackCommand(Configuration& configuration) : configuration(configuration) {
   162 	}
   200 		pthread_mutex_lock(&realTimeContext.processingLock);
   163 
   201 
   164 	void finish(int sig) {
   202 		// Initialize JACK connection:
   165 		continueProcessing = false;
   203 		std::string clientName = convertor.to_bytes(configuration.jackClientName);
   166 	}
   204 		realTimeContext.jackClient = jack_client_open(clientName.c_str(), JackNullOption, nullptr);
   167 
   205 		if (realTimeContext.jackClient == nullptr) failInConstructor(L"Could not create JACK client.");
   168 	void processJackStream(std::shared_ptr<writer::RelationalWriter> writer, std::function<void() > relationalWriterFlush) {
   206 
       
   207 		realTimeContext.ringBuffer = jack_ringbuffer_create(realTimeContext.RING_BUFFER_SIZE * sizeof (MidiMessage));
       
   208 
       
   209 		jack_set_process_callback(realTimeContext.jackClient, RealTimeContext::processCallback, &realTimeContext);
       
   210 		// TODO: report also other events (connections etc.)
       
   211 		jack_set_error_function(jackErrorCallback);
       
   212 		jack_set_info_function(jackErrorCallback);
       
   213 
       
   214 		realTimeContext.jackPort = jack_port_register(realTimeContext.jackClient, "input", JACK_DEFAULT_MIDI_TYPE, JackPortIsInput, 0);
       
   215 		if (realTimeContext.jackPort == nullptr) failInConstructor(L"Could not register the JACK port.");
       
   216 
       
   217 		if (mlockall(MCL_CURRENT | MCL_FUTURE)) fwprintf(stderr, L"Warning: Can not lock memory.\n");
       
   218 
       
   219 		int jackError = jack_activate(realTimeContext.jackClient);
       
   220 		if (jackError) failInConstructor(L"Could not activate the JACK client.");
       
   221 	}
       
   222 
       
   223 	void processJackStream(std::shared_ptr<relpipe::writer::RelationalWriter> writer, std::function<void() > relationalWriterFlush) {
   169 		// Relation headers:
   224 		// Relation headers:
       
   225 		using namespace relpipe::writer;
   170 		vector<AttributeMetadata> metadata;
   226 		vector<AttributeMetadata> metadata;
   171 		metadata.push_back({L"event", TypeId::STRING});
   227 		metadata.push_back({L"event", TypeId::STRING});
   172 		metadata.push_back({L"channel", TypeId::INTEGER});
   228 		metadata.push_back({L"channel", TypeId::INTEGER});
   173 		metadata.push_back({L"note_on", TypeId::BOOLEAN});
   229 		metadata.push_back({L"note_on", TypeId::BOOLEAN});
   174 		metadata.push_back({L"note_pitch", TypeId::INTEGER});
   230 		metadata.push_back({L"note_pitch", TypeId::INTEGER});
   177 		metadata.push_back({L"controller_value", TypeId::INTEGER});
   233 		metadata.push_back({L"controller_value", TypeId::INTEGER});
   178 		metadata.push_back({L"raw", TypeId::STRING});
   234 		metadata.push_back({L"raw", TypeId::STRING});
   179 		writer->startRelation(L"midi", metadata, true);
   235 		writer->startRelation(L"midi", metadata, true);
   180 		relationalWriterFlush();
   236 		relationalWriterFlush();
   181 
   237 
   182 		// Initialize JACK connection:
       
   183 		std::string clientName = convertor.to_bytes(configuration.jackClientName);
       
   184 		jack_client_t* client = jack_client_open(clientName.c_str(), JackNullOption, nullptr);
       
   185 		if (client == nullptr) throw JackException(L"Could not create JACK client.");
       
   186 
       
   187 		ringBuffer = jack_ringbuffer_create(RING_BUFFER_SIZE * sizeof (MidiMessage));
       
   188 
       
   189 		jack_set_process_callback(client, relpipe::in::jack::enqueueMessage, this);
       
   190 		// TODO: report also other events (connections etc.)
       
   191 
       
   192 		jackPort = jack_port_register(client, "input", JACK_DEFAULT_MIDI_TYPE, JackPortIsInput, 0);
       
   193 		if (jackPort == nullptr) throw JackException(L"Could not register port.");
       
   194 
       
   195 		if (mlockall(MCL_CURRENT | MCL_FUTURE)) fwprintf(stderr, L"Warning: Can not lock memory.\n");
       
   196 
       
   197 		int jackError = jack_activate(client);
       
   198 		if (jackError) throw JackException(L"Could not activate client.");
       
   199 
       
   200 		// Process messages from the ring buffer queue:
   238 		// Process messages from the ring buffer queue:
   201 		pthread_mutex_lock(&messageThreadLock);
       
   202 		while (continueProcessing) {
   239 		while (continueProcessing) {
   203 			const size_t queuedMessages = jack_ringbuffer_read_space(ringBuffer) / sizeof (MidiMessage);
   240 			while (jack_ringbuffer_read_space(realTimeContext.ringBuffer) >= sizeof (MidiMessage)) {
   204 			for (size_t i = 0; i < queuedMessages; ++i) {
       
   205 				MidiMessage m;
   241 				MidiMessage m;
   206 				jack_ringbuffer_read(ringBuffer, (char*) &m, sizeof (MidiMessage));
   242 				jack_ringbuffer_read(realTimeContext.ringBuffer, (char*) &m, sizeof (MidiMessage));
   207 				processMessage(writer, &m);
   243 				processMessage(writer, &m);
   208 				relationalWriterFlush();
   244 				relationalWriterFlush();
   209 			}
   245 			}
   210 			pthread_cond_wait(&dataReady, &messageThreadLock);
   246 			waitForRTCycle();
   211 		}
   247 		}
   212 		pthread_mutex_unlock(&messageThreadLock);
   248 	}
   213 
   249 
   214 		// Close JACK connection:
   250 	void finish(int sig) {
   215 		jack_deactivate(client);
   251 		continueProcessing = false;
   216 		jack_client_close(client);
   252 	}
   217 		jack_ringbuffer_free(ringBuffer);
   253 
       
   254 	virtual ~JackCommand() {
       
   255 		finalize();
   218 	}
   256 	}
   219 
   257 
   220 };
   258 };
   221 
       
   222 int enqueueMessage(jack_nframes_t frames, void* arg) {
       
   223 	JackCommand* instance = (JackCommand*) arg;
       
   224 	return instance->enqueueMessage(frames);
       
   225 }
       
   226 
   259 
   227 }
   260 }
   228 }
   261 }
   229 }
   262 }