ClickHouse/base/poco/Foundation/include/Poco/FIFOBuffer.h

569 lines
16 KiB
C++

//
// FIFOBuffer.h
//
// Library: Foundation
// Package: Core
// Module: FIFOBuffer
//
// Definition of the FIFOBuffer class.
//
// Copyright (c) 2006, Applied Informatics Software Engineering GmbH.
// and Contributors.
//
// SPDX-License-Identifier: BSL-1.0
//
#ifndef Foundation_FIFOBuffer_INCLUDED
#define Foundation_FIFOBuffer_INCLUDED
#include "Poco/BasicEvent.h"
#include "Poco/Buffer.h"
#include "Poco/Exception.h"
#include "Poco/Format.h"
#include "Poco/Foundation.h"
#include "Poco/Mutex.h"
namespace Poco
{
template <class T>
class BasicFIFOBuffer
/// A simple buffer class with support for re-entrant,
/// FIFO-style read/write operations, as well as (optional)
/// empty/non-empty/full (i.e. writable/readable) transition
/// notifications. Buffer can be flagged with end-of-file and
/// error flags, which renders it un-readable/writable.
///
/// Critical portions of code are protected by a recursive mutex.
/// However, to achieve thread-safety in cases where multiple
/// member function calls are involved and have to be atomic,
/// the mutex must be locked externally.
///
/// Buffer size, as well as amount of unread data and
/// available space introspections are supported as well.
///
/// This class is useful anywhere where a FIFO functionality
/// is needed.
{
public:
typedef T Type;
mutable Poco::BasicEvent<bool> writable;
/// Event indicating "writability" of the buffer,
/// triggered as follows:
///
/// * when buffer transitions from non-full to full,
/// Writable event observers are notified, with
/// false value as the argument
///
/// * when buffer transitions from full to non-full,
/// Writable event observers are notified, with
/// true value as the argument
mutable Poco::BasicEvent<bool> readable;
/// Event indicating "readability" of the buffer,
/// triggered as follows:
///
/// * when buffer transitions from non-empty to empty,
/// Readable event observers are notified, with false
/// value as the argument
///
/// * when FIFOBuffer transitions from empty to non-empty,
/// Readable event observers are notified, with true value
/// as the argument
BasicFIFOBuffer(std::size_t size, bool notify = false) : _buffer(size), _begin(0), _used(0), _notify(notify), _eof(false), _error(false)
/// Creates the FIFOBuffer.
{
}
BasicFIFOBuffer(T * pBuffer, std::size_t size, bool notify = false)
: _buffer(pBuffer, size), _begin(0), _used(0), _notify(notify), _eof(false), _error(false)
/// Creates the FIFOBuffer.
{
}
BasicFIFOBuffer(const T * pBuffer, std::size_t size, bool notify = false)
: _buffer(pBuffer, size), _begin(0), _used(size), _notify(notify), _eof(false), _error(false)
/// Creates the FIFOBuffer.
{
}
~BasicFIFOBuffer()
/// Destroys the FIFOBuffer.
{
}
void resize(std::size_t newSize, bool preserveContent = true)
/// Resizes the buffer. If preserveContent is true,
/// the content of the old buffer is preserved.
/// New size can be larger or smaller than
/// the current size, but it must not be 0.
/// Additionally, if the new length is smaller
/// than currently used length and preserveContent
/// is true, InvalidAccessException is thrown.
{
Mutex::ScopedLock lock(_mutex);
if (preserveContent && (newSize < _used))
throw InvalidAccessException("Can not resize FIFO without data loss.");
std::size_t usedBefore = _used;
_buffer.resize(newSize, preserveContent);
if (!preserveContent)
_used = 0;
if (_notify)
notify(usedBefore);
}
std::size_t peek(T * pBuffer, std::size_t length) const
/// Peeks into the data currently in the FIFO
/// without actually extracting it.
/// If length is zero, the return is immediate.
/// If length is greater than used length,
/// it is substituted with the the current FIFO
/// used length.
///
/// Returns the number of elements copied in the
/// supplied buffer.
{
if (0 == length)
return 0;
Mutex::ScopedLock lock(_mutex);
if (!isReadable())
return 0;
if (length > _used)
length = _used;
std::memcpy(pBuffer, _buffer.begin() + _begin, length * sizeof(T));
return length;
}
std::size_t peek(Poco::Buffer<T> & buffer, std::size_t length = 0) const
/// Peeks into the data currently in the FIFO
/// without actually extracting it.
/// Resizes the supplied buffer to the size of
/// data written to it. If length is not
/// supplied by the caller or is greater than length
/// of currently used data, the current FIFO used
/// data length is substituted for it.
///
/// Returns the number of elements copied in the
/// supplied buffer.
{
Mutex::ScopedLock lock(_mutex);
if (!isReadable())
return 0;
if (0 == length || length > _used)
length = _used;
buffer.resize(length);
return peek(buffer.begin(), length);
}
std::size_t read(T * pBuffer, std::size_t length)
/// Copies the data currently in the FIFO
/// into the supplied buffer, which must be
/// preallocated to at least the length size
/// before calling this function.
///
/// Returns the size of the copied data.
{
if (0 == length)
return 0;
Mutex::ScopedLock lock(_mutex);
if (!isReadable())
return 0;
std::size_t usedBefore = _used;
std::size_t readLen = peek(pBuffer, length);
poco_assert(_used >= readLen);
_used -= readLen;
if (0 == _used)
_begin = 0;
else
_begin += length;
if (_notify)
notify(usedBefore);
return readLen;
}
std::size_t read(Poco::Buffer<T> & buffer, std::size_t length = 0)
/// Copies the data currently in the FIFO
/// into the supplied buffer.
/// Resizes the supplied buffer to the size of
/// data written to it.
///
/// Returns the size of the copied data.
{
Mutex::ScopedLock lock(_mutex);
if (!isReadable())
return 0;
std::size_t usedBefore = _used;
std::size_t readLen = peek(buffer, length);
poco_assert(_used >= readLen);
_used -= readLen;
if (0 == _used)
_begin = 0;
else
_begin += length;
if (_notify)
notify(usedBefore);
return readLen;
}
std::size_t write(const T * pBuffer, std::size_t length)
/// Writes data from supplied buffer to the FIFO buffer.
/// If there is no sufficient space for the whole
/// buffer to be written, data up to available
/// length is written.
/// The length of data to be written is determined from the
/// length argument. Function does nothing and returns zero
/// if length argument is equal to zero.
///
/// Returns the length of data written.
{
if (0 == length)
return 0;
Mutex::ScopedLock lock(_mutex);
if (!isWritable())
return 0;
if (_buffer.size() - (_begin + _used) < length)
{
std::memmove(_buffer.begin(), begin(), _used * sizeof(T));
_begin = 0;
}
std::size_t usedBefore = _used;
std::size_t available = _buffer.size() - _used - _begin;
std::size_t len = length > available ? available : length;
std::memcpy(begin() + _used, pBuffer, len * sizeof(T));
_used += len;
poco_assert(_used <= _buffer.size());
if (_notify)
notify(usedBefore);
return len;
}
std::size_t write(const Buffer<T> & buffer, std::size_t length = 0)
/// Writes data from supplied buffer to the FIFO buffer.
/// If there is no sufficient space for the whole
/// buffer to be written, data up to available
/// length is written.
/// The length of data to be written is determined from the
/// length argument or buffer size (when length argument is
/// default zero or greater than buffer size).
///
/// Returns the length of data written.
{
if (length == 0 || length > buffer.size())
length = buffer.size();
return write(buffer.begin(), length);
}
std::size_t size() const
/// Returns the size of the buffer.
{
return _buffer.size();
}
std::size_t used() const
/// Returns the size of the used portion of the buffer.
{
return _used;
}
std::size_t available() const
/// Returns the size of the available portion of the buffer.
{
return size() - _used;
}
void drain(std::size_t length = 0)
/// Drains length number of elements from the buffer.
/// If length is zero or greater than buffer current
/// content length, buffer is emptied.
{
Mutex::ScopedLock lock(_mutex);
std::size_t usedBefore = _used;
if (0 == length || length >= _used)
{
_begin = 0;
_used = 0;
}
else
{
_begin += length;
_used -= length;
}
if (_notify)
notify(usedBefore);
}
void copy(const T * ptr, std::size_t length)
/// Copies the supplied data to the buffer and adjusts
/// the used buffer size.
{
poco_check_ptr(ptr);
if (0 == length)
return;
Mutex::ScopedLock lock(_mutex);
if (length > available())
throw Poco::InvalidAccessException("Cannot extend buffer.");
if (!isWritable())
throw Poco::InvalidAccessException("Buffer not writable.");
std::memcpy(begin() + _used, ptr, length * sizeof(T));
std::size_t usedBefore = _used;
_used += length;
if (_notify)
notify(usedBefore);
}
void advance(std::size_t length)
/// Advances buffer by length elements.
/// Should be called AFTER the data
/// was copied into the buffer.
{
Mutex::ScopedLock lock(_mutex);
if (length > available())
throw Poco::InvalidAccessException("Cannot extend buffer.");
if (!isWritable())
throw Poco::InvalidAccessException("Buffer not writable.");
if (_buffer.size() - (_begin + _used) < length)
{
std::memmove(_buffer.begin(), begin(), _used * sizeof(T));
_begin = 0;
}
std::size_t usedBefore = _used;
_used += length;
if (_notify)
notify(usedBefore);
}
T * begin()
/// Returns the pointer to the beginning of the buffer.
{
Mutex::ScopedLock lock(_mutex);
if (_begin != 0)
{
// Move the data to the start of the buffer so begin() and next()
// always return consistent pointers with each other and allow writing
// to the end of the buffer.
std::memmove(_buffer.begin(), _buffer.begin() + _begin, _used * sizeof(T));
_begin = 0;
}
return _buffer.begin();
}
T * next()
/// Returns the pointer to the next available position in the buffer.
{
Mutex::ScopedLock lock(_mutex);
return begin() + _used;
}
T & operator[](std::size_t index)
/// Returns value at index position.
/// Throws InvalidAccessException if index is larger than
/// the last valid (used) buffer position.
{
Mutex::ScopedLock lock(_mutex);
if (index >= _used)
throw InvalidAccessException(format("Index out of bounds: %z (max index allowed: %z)", index, _used - 1));
return _buffer[_begin + index];
}
const T & operator[](std::size_t index) const
/// Returns value at index position.
/// Throws InvalidAccessException if index is larger than
/// the last valid (used) buffer position.
{
Mutex::ScopedLock lock(_mutex);
if (index >= _used)
throw InvalidAccessException(format("Index out of bounds: %z (max index allowed: %z)", index, _used - 1));
return _buffer[_begin + index];
}
const Buffer<T> & buffer() const
/// Returns const reference to the underlying buffer.
{
return _buffer;
}
void setError(bool error = true)
/// Sets the error flag on the buffer and empties it.
/// If notifications are enabled, they will be triggered
/// if appropriate.
///
/// Setting error flag to true prevents reading and writing
/// to the buffer; to re-enable FIFOBuffer for reading/writing,
/// the error flag must be set to false.
{
if (error)
{
bool f = false;
Mutex::ScopedLock lock(_mutex);
if (error && isReadable() && _notify)
readable.notify(this, f);
if (error && isWritable() && _notify)
writable.notify(this, f);
_error = error;
_used = 0;
}
else
{
bool t = true;
Mutex::ScopedLock lock(_mutex);
_error = false;
if (_notify && !_eof)
writable.notify(this, t);
}
}
bool isValid() const
/// Returns true if error flag is not set on the buffer,
/// otherwise returns false.
{
return !_error;
}
void setEOF(bool eof = true)
/// Sets end-of-file flag on the buffer.
///
/// Setting EOF flag to true prevents writing to the
/// buffer; reading from the buffer will still be
/// allowed until all data present in the buffer at the
/// EOF set time is drained. After that, to re-enable
/// FIFOBuffer for reading/writing, EOF must be
/// set to false.
///
/// Setting EOF flag to false clears EOF state if it
/// was previously set. If EOF was not set, it has no
/// effect.
{
Mutex::ScopedLock lock(_mutex);
bool flag = !eof;
if (_notify)
writable.notify(this, flag);
_eof = eof;
}
bool hasEOF() const
/// Returns true if EOF flag has been set.
{
return _eof;
}
bool isEOF() const
/// Returns true if EOF flag has been set and buffer is empty.
{
return isEmpty() && _eof;
}
bool isEmpty() const
/// Returns true is buffer is empty, false otherwise.
{
return 0 == _used;
}
bool isFull() const
/// Returns true is buffer is full, false otherwise.
{
return size() == _used;
}
bool isReadable() const
/// Returns true if buffer contains data and is not
/// in error state.
{
return !isEmpty() && isValid();
}
bool isWritable() const
/// Returns true if buffer is not full and is not
/// in error state.
{
return !isFull() && isValid() && !_eof;
}
void setNotify(bool notify = true)
/// Enables/disables notifications.
{
_notify = notify;
}
bool getNotify() const
/// Returns true if notifications are enabled, false otherwise.
{
return _notify;
}
Mutex & mutex()
/// Returns reference to mutex.
{
return _mutex;
}
private:
void notify(std::size_t usedBefore)
{
bool t = true, f = false;
if (usedBefore == 0 && _used > 0)
readable.notify(this, t);
else if (usedBefore > 0 && 0 == _used)
readable.notify(this, f);
if (usedBefore == _buffer.size() && _used < _buffer.size())
writable.notify(this, t);
else if (usedBefore < _buffer.size() && _used == _buffer.size())
writable.notify(this, f);
}
BasicFIFOBuffer();
BasicFIFOBuffer(const BasicFIFOBuffer &);
BasicFIFOBuffer & operator=(const BasicFIFOBuffer &);
Buffer<T> _buffer;
std::size_t _begin;
std::size_t _used;
bool _notify;
mutable Mutex _mutex;
bool _eof;
bool _error;
};
//
// We provide an instantiation for char
//
typedef BasicFIFOBuffer<char> FIFOBuffer;
} // namespace Poco
#endif // Foundation_FIFOBuffer_INCLUDED