/**
* Relational pipes
* Copyright © 2021 František Kučera (Frantovo.cz, GlobalCode.info)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <exception>
#include <string>
#include <vector>

#include "AbstractParser.h"
namespace relpipe {
namespace in {
namespace asn1 {
namespace lib {
/**
 * Fixed-size circular byte buffer with transactional reading.
 *
 * Data are appended by write() and consumed by read(). Consumed data stay in the buffer
 * until commitRead() is called, so rollbackRead() can rewind the reader to the last
 * committed position and the same bytes can be read again. The space occupied
 * by read-but-uncommitted data is therefore not available for writing.
 */
class TransactionalBuffer {
private:
	/** Storage of the ring buffer; owned by the vector, so copies of this object get their own storage. */
	std::vector<uint8_t> buffer;
	size_t bufferSize;
	/** Index in the buffer where the next written byte will be stored. */
	size_t writePosition = 0;
	// size_t writePositionCommited = 0; // TODO: transactions also on the writing side?
	/** Index in the buffer of the next byte to be read (including uncommitted reads). */
	size_t readPosition = 0;
	/** Value of readPosition at the time of the last commitRead(). */
	size_t readPositionCommited = 0;
	/**
	 * Running totals. The fill levels are derived from their differences, which stay
	 * correct even if the unsigned counters wrap around. A single „has data“ flag can not
	 * distinguish „full“ from „empty“ for both the current and the committed read position
	 * at the same time, so counters are used instead.
	 */
	size_t bytesWritten = 0;
	size_t bytesRead = 0;
	size_t bytesReadCommited = 0;

	/**
	 * @return number of bytes written but not yet read (uncommitted reads count as read)
	 */
	size_t availableForReading() const {
		return bytesWritten - bytesRead;
	}

	/**
	 * @return number of bytes that can be written without overwriting data
	 * that were not read yet or whose reading was not committed yet
	 */
	size_t availableForWriting() const {
		return bufferSize - (bytesWritten - bytesReadCommited);
	}

public:

	/**
	 * Is thrown from read() and peek() methods if there are not enough data.
	 * Interrupts current update() cycle and causes rollback.
	 */
	class ReadBufferUnderflowException {
		// TODO: common super-class for exceptions, hidden implementation
	};

	/**
	 * Is thrown from write() method of the buffer when there is not enough space.
	 * Interrupts whole process.
	 */
	class WriteBufferOverflowException {
	};

	/**
	 * @param initialSize capacity of the buffer in bytes
	 */
	TransactionalBuffer(size_t initialSize = 4096) : buffer(initialSize), bufferSize(initialSize) {
		// TODO: initial size + resize + hard upper limit
	}

	virtual ~TransactionalBuffer() = default;

	/**
	 * Appends data to the buffer.
	 *
	 * @param inputBuffer source of the data; at least length bytes must be readable
	 * @param length number of bytes to be written; zero is a no-op
	 * @throws WriteBufferOverflowException if length exceeds the free space;
	 * space occupied by uncommitted reads is not considered free; nothing is written in such case
	 */
	void write(const uint8_t* inputBuffer, const size_t length) {
		if (length == 0) return;
		if (length > availableForWriting()) throw WriteBufferOverflowException(); // TODO: optional resize

		// a = part that fits before the end of the storage, b = part that wraps to its beginning
		const size_t a = std::min(length, bufferSize - writePosition);
		const size_t b = length - a;
		// TODO: map buffer twice in the memory in order to create a continuous area and avoid the second memcpy()?
		std::memcpy(buffer.data() + writePosition, inputBuffer, a);
		if (b > 0) std::memcpy(buffer.data(), inputBuffer + a, b);
		// bufferSize is never zero here: a zero-sized buffer has already thrown above
		writePosition = (writePosition + length) % bufferSize;
		bytesWritten += length;
	}

	/**
	 * Copies data from the buffer and moves the read position forward.
	 * The read becomes permanent after commitRead() and can be undone by rollbackRead().
	 *
	 * @param outputBuffer destination; at least length bytes must be writable
	 * @param length number of bytes to be read; zero is a no-op
	 * @throws ReadBufferUnderflowException if less than length bytes are available; the state is not changed in such case
	 */
	void read(uint8_t* outputBuffer, const size_t length) {
		if (length == 0) return;
		peek(outputBuffer, length);
		readPosition = (readPosition + length) % bufferSize;
		bytesRead += length;
	}

	/**
	 * Copies data from the buffer without moving the read position.
	 *
	 * @param outputBuffer destination; at least length bytes must be writable
	 * @param length number of bytes to be copied; zero is a no-op
	 * @throws ReadBufferUnderflowException if less than length bytes are available
	 */
	void peek(uint8_t* outputBuffer, const size_t length) const {
		if (length == 0) return;
		if (length > availableForReading()) throw ReadBufferUnderflowException();

		// a = part stored before the end of the storage, b = part that wrapped to its beginning
		const size_t a = std::min(length, bufferSize - readPosition);
		const size_t b = length - a;
		std::memcpy(outputBuffer, buffer.data() + readPosition, a);
		if (b > 0) std::memcpy(outputBuffer + a, buffer.data(), b);
	}

	/**
	 * Makes all reads done so far permanent; the space they occupied becomes available for writing.
	 */
	void commitRead() {
		readPositionCommited = readPosition;
		bytesReadCommited = bytesRead;
	}

	/**
	 * Returns the read position and the read counter to the state of the last commitRead(),
	 * so the uncommitted data can be read again.
	 */
	void rollbackRead() {
		readPosition = readPositionCommited;
		bytesRead = bytesReadCommited;
	}

	/**
	 * @return total number of bytes read so far, including uncommitted reads
	 */
	size_t getBytesRead() const {
		return bytesRead;
	}
};
}
}
}
}