TransactionalBuffer v_0
authorFrantišek Kučera <franta-hg@frantovo.cz>
Sat, 19 Jun 2021 12:59:07 +0200
branchv_0
changeset 10 6904e4448807
parent 9 7a6abdd00ab5
child 11 8fbe93f78e2b
TransactionalBuffer
nbproject/configurations.xml
src/XMLDocumentConstructor.h
src/lib/AbstractParser.cpp
src/lib/AbstractParser.h
src/lib/TransactionalBuffer.h
--- a/nbproject/configurations.xml	Sat Jun 12 22:37:44 2021 +0200
+++ b/nbproject/configurations.xml	Sat Jun 19 12:59:07 2021 +0200
@@ -50,6 +50,7 @@
           <in>BasicASN1Reader.h</in>
           <in>DOMBuildingXMLContentHandler.h</in>
           <in>GenericASN1ContentHandler.h</in>
+          <in>TransactionalBuffer.h</in>
           <in>XMLContentHandler.h</in>
         </df>
         <in>XMLDocumentConstructor.h</in>
@@ -132,6 +133,8 @@
             tool="3"
             flavor2="0">
       </item>
+      <item path="src/lib/TransactionalBuffer.h" ex="false" tool="3" flavor2="0">
+      </item>
       <item path="src/lib/XMLContentHandler.h" ex="false" tool="3" flavor2="0">
       </item>
       <item path="src/relpipe-in-xmltable.cpp" ex="false" tool="1" flavor2="0">
@@ -195,6 +198,8 @@
             tool="3"
             flavor2="0">
       </item>
+      <item path="src/lib/TransactionalBuffer.h" ex="false" tool="3" flavor2="0">
+      </item>
       <item path="src/lib/XMLContentHandler.h" ex="false" tool="3" flavor2="0">
       </item>
       <item path="src/relpipe-in-xmltable.cpp" ex="false" tool="1" flavor2="0">
--- a/src/XMLDocumentConstructor.h	Sat Jun 12 22:37:44 2021 +0200
+++ b/src/XMLDocumentConstructor.h	Sat Jun 19 12:59:07 2021 +0200
@@ -23,6 +23,7 @@
 #include "lib/BasicASN1Reader.h"
 #include "lib/GenericASN1ContentHandler.h"
 #include "lib/DOMBuildingXMLContentHandler.h"
+#include "lib/TransactionalBuffer.h"
 
 namespace relpipe {
 namespace in {
@@ -50,10 +51,17 @@
 		reader.addHandler(asn1handler);
 
 
-		// TODO: buffering? (reader itself also buffers)
-		for (char ch = input->get(); input->good(); ch = input->get()) reader.write(&ch, 1);
+		try {
+			// TODO: buffering? (reader itself also buffers)
+			for (char ch = input->get(); input->good(); ch = input->get()) reader.write(&ch, 1);
+		} catch (const relpipe::in::asn1::lib::TransactionalBuffer::WriteBufferOverflowException& e) {
+			// TODO: avoid leaky abstraction and use different exception
+			throw relpipe::writer::RelpipeWriterException(L"Transactional buffer for ASN.1 input is too small");
+		}
 
 		reader.close();
+
+		if (parser->get_document()->get_root_node() == nullptr) throw relpipe::writer::RelpipeWriterException(L"Empty ASN.1 input"); // TODO: move to common class
 	}
 };
 
--- a/src/lib/AbstractParser.cpp	Sat Jun 12 22:37:44 2021 +0200
+++ b/src/lib/AbstractParser.cpp	Sat Jun 19 12:59:07 2021 +0200
@@ -16,6 +16,7 @@
  */
 
 #include "AbstractParser.h"
+#include "TransactionalBuffer.h"
 
 namespace relpipe {
 namespace in {
@@ -24,7 +25,9 @@
 
 class AbstractParserImpl {
 private:
+	friend AbstractParser;
 	AbstractParser* interface;
+	TransactionalBuffer buffer;
 public:
 
 	AbstractParserImpl(AbstractParser* interface) : interface(interface) {
@@ -43,12 +46,15 @@
 }
 
 void AbstractParser::write(const char* buffer, const size_t length) {
-	// TODO: update pointers and positions
 	try {
+		// TODO: do not write to the buffer, just append in read()/peek() and write just the part that was not read during this cycle
+		implementation->buffer.write(buffer, length);
+
 		// TODO: call an overridable method to get preferred minimum block size and run cycle only if we have enough data or EOF
+		// and/or remember the length of last failed (rollbacked) read() call
 		update();
 		commit();
-	} catch (const AbstractParser::ReadBufferUnderflowException& e) {
+	} catch (const TransactionalBuffer::ReadBufferUnderflowException& e) {
 		rollback();
 	} catch (const AbstractParser::ExplicitRollbackException& e) {
 		rollback();
@@ -56,20 +62,24 @@
 }
 
 void AbstractParser::close() {
+	// TODO: check remaining data + call update() or just let parsers override this method?
 }
 
 void AbstractParser::rollback() {
-	// FIXME: store content of the current buffer + update pointers and positions
 	// TODO: notify rollback listeners? (they can monitor the performance / frequency of rollbacks)
+	implementation->buffer.rollbackRead();
 }
 
 void AbstractParser::commit() {
+	implementation->buffer.commitRead();
 }
 
 void AbstractParser::read(char* buffer, const size_t length) {
+	implementation->buffer.read(buffer, length);
 }
 
 void AbstractParser::peek(char* buffer, const size_t length) {
+	implementation->buffer.read(buffer, length);
 }
 
 }
--- a/src/lib/AbstractParser.h	Sat Jun 12 22:37:44 2021 +0200
+++ b/src/lib/AbstractParser.h	Sat Jun 19 12:59:07 2021 +0200
@@ -64,14 +64,6 @@
 	AbstractParser();
 
 	/**
-	 * Is thrown from read() and peak() methods if there are not enough data.
-	 * Interrupts current update() cycle and causes rollback.
-	 */
-	class ReadBufferUnderflowException {
-		// TODO: common super-class for exceptions, hidden implementation
-	};
-
-	/**
 	 * May be thrown from the update() method in order to cancel currenty cycle and do explicit rollback.
 	 * Same data will be processed in the next cycle.
 	 */
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/lib/TransactionalBuffer.h	Sat Jun 19 12:59:07 2021 +0200
@@ -0,0 +1,128 @@
+/**
+ * Relational pipes
+ * Copyright © 2021 František Kučera (Frantovo.cz, GlobalCode.info)
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, version 3 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#pragma once
+
+#include <string>
+#include <exception>
+#include <cstring>
+
+#include "AbstractParser.h"
+
+namespace relpipe {
+namespace in {
+namespace asn1 {
+namespace lib {
+
+class TransactionalBuffer {
+private:
+	char* buffer;
+	size_t bufferSize;
+	bool hasData = false;
+	size_t writePosition = 0;
+	// size_t writePositionCommited = 0; // TODO: transactions also on the writing side?
+	size_t readPosition = 0;
+	size_t readPositionCommited = 0;
+
+	size_t availableForReading() {
+		if (readPosition < writePosition) return writePosition - readPosition;
+		else if (readPosition > writePosition) return bufferSize - readPosition + writePosition;
+		else if (hasData) return bufferSize;
+		else return 0;
+	}
+
+	size_t availableForWriting() {
+		if (readPositionCommited < writePosition) return bufferSize - writePosition + readPositionCommited;
+		else if (readPositionCommited > writePosition) return readPositionCommited - writePosition;
+		else if (hasData) return 0;
+		else return bufferSize;
+	}
+
+public:
+
+		/**
+	 * Is thrown from read() and peak() methods if there are not enough data.
+	 * Interrupts current update() cycle and causes rollback.
+	 */
+	class ReadBufferUnderflowException {
+		// TODO: common super-class for exceptions, hidden implementation
+	};
+
+	/**
+	 * Is thrown from write() method of the buffer when there is not enough space.
+	 * Interrupts whole process.
+	 */
+	class WriteBufferOverflowException {
+	};
+
+	TransactionalBuffer(size_t initialSize = 4096) : bufferSize(initialSize) {
+		// TODO: initial size + resize + hard upper limit
+		buffer = (char*) malloc(bufferSize);
+	}
+
+	virtual ~TransactionalBuffer() {
+		free(buffer);
+	}
+
+	void write(const char* inputBuffer, const size_t length) {
+		if (length == 0) return;
+		if (length > availableForWriting()) throw WriteBufferOverflowException(); // TODO: optional resize
+
+		hasData = true;
+
+		const size_t a = std::min(length, bufferSize - writePosition);
+		const size_t b = length - a;
+
+		// TODO: map buffer twice in the memory in order to create a continuous area and avoid the second memcpy()?
+		memcpy(buffer + writePosition, inputBuffer, a);
+		memcpy(buffer, inputBuffer + a, b);
+
+		writePosition = (writePosition + length) % bufferSize;
+	}
+
+	void read(char* outputBuffer, const size_t length) {
+		if (length == 0) return;
+
+		peek(outputBuffer, length);
+		readPosition = (readPosition + length) % bufferSize;
+		hasData = readPosition != writePosition;
+	}
+
+	void peek(char* outputBuffer, const size_t length) {
+		if (length == 0) return;
+		if (length > availableForReading()) throw ReadBufferUnderflowException();
+
+		const size_t a = std::min(length, bufferSize - readPosition);
+		const size_t b = length - a;
+
+		memcpy(outputBuffer, buffer + readPosition, a);
+		memcpy(outputBuffer + a, buffer, b);
+	}
+
+	void commitRead() {
+		readPositionCommited = readPosition;
+	}
+
+	void rollbackRead() {
+		readPosition = readPositionCommited;
+	}
+};
+
+
+}
+}
+}
+}