Use small amount of memory on stack in PeekableReadBuffer

This commit is contained in:
avogar 2021-10-28 17:02:07 +03:00
parent d30aecbda8
commit 6e8c2ab28f
4 changed files with 74 additions and 17 deletions

View File

@ -9,8 +9,8 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
} }
PeekableReadBuffer::PeekableReadBuffer(ReadBuffer & sub_buf_, bool use_existing_memory /*= false*/, size_t start_size_ /*= DBMS_DEFAULT_BUFFER_SIZE*/) PeekableReadBuffer::PeekableReadBuffer(ReadBuffer & sub_buf_, size_t start_size_ /*= 0*/)
: BufferWithOwnMemory(use_existing_memory ? sizeof(existing_memory) : start_size_, use_existing_memory ? existing_memory : nullptr), sub_buf(sub_buf_) : BufferWithOwnMemory(start_size_), sub_buf(sub_buf_)
{ {
padded &= sub_buf.isPadded(); padded &= sub_buf.isPadded();
/// Read from sub-buffer /// Read from sub-buffer
@ -27,6 +27,7 @@ void PeekableReadBuffer::reset()
peeked_size = 0; peeked_size = 0;
checkpoint = std::nullopt; checkpoint = std::nullopt;
checkpoint_in_own_memory = false; checkpoint_in_own_memory = false;
use_stack_memory = true;
if (!currentlyReadFromOwnMemory()) if (!currentlyReadFromOwnMemory())
sub_buf.position() = pos; sub_buf.position() = pos;
@ -72,21 +73,23 @@ bool PeekableReadBuffer::peekNext()
sub_buf.position() = copy_from; sub_buf.position() = copy_from;
} }
char * memory_data = getMemoryData();
/// Save unread data from sub-buffer to own memory /// Save unread data from sub-buffer to own memory
memcpy(memory.data() + peeked_size, sub_buf.position(), bytes_to_copy); memcpy(memory_data + peeked_size, sub_buf.position(), bytes_to_copy);
/// If useSubbufferOnly() is false, then checkpoint is in own memory and it was updated in resizeOwnMemoryIfNecessary /// If useSubbufferOnly() is false, then checkpoint is in own memory and it was updated in resizeOwnMemoryIfNecessary
/// Otherwise, checkpoint now at the beginning of own memory /// Otherwise, checkpoint now at the beginning of own memory
if (checkpoint && useSubbufferOnly()) if (checkpoint && useSubbufferOnly())
{ {
checkpoint.emplace(memory.data()); checkpoint.emplace(memory_data);
checkpoint_in_own_memory = true; checkpoint_in_own_memory = true;
} }
if (currentlyReadFromOwnMemory()) if (currentlyReadFromOwnMemory())
{ {
/// Update buffer size /// Update buffer size
BufferBase::set(memory.data(), peeked_size + bytes_to_copy, offset()); BufferBase::set(memory_data, peeked_size + bytes_to_copy, offset());
} }
else else
{ {
@ -99,7 +102,7 @@ bool PeekableReadBuffer::peekNext()
else else
pos_offset = 0; pos_offset = 0;
} }
BufferBase::set(memory.data(), peeked_size + bytes_to_copy, pos_offset); BufferBase::set(memory_data, peeked_size + bytes_to_copy, pos_offset);
} }
peeked_size += bytes_to_copy; peeked_size += bytes_to_copy;
@ -125,8 +128,9 @@ void PeekableReadBuffer::rollbackToCheckpoint(bool drop)
/// Checkpoint is in own memory and position is not. /// Checkpoint is in own memory and position is not.
assert(checkpointInOwnMemory()); assert(checkpointInOwnMemory());
char * memory_data = getMemoryData();
/// Switch to reading from own memory. /// Switch to reading from own memory.
BufferBase::set(memory.data(), peeked_size, *checkpoint - memory.data()); BufferBase::set(memory_data, peeked_size, *checkpoint - memory_data);
} }
if (drop) if (drop)
@ -224,12 +228,31 @@ void PeekableReadBuffer::resizeOwnMemoryIfNecessary(size_t bytes_to_append)
bool need_update_pos = currentlyReadFromOwnMemory(); bool need_update_pos = currentlyReadFromOwnMemory();
size_t offset = 0; size_t offset = 0;
if (need_update_checkpoint) if (need_update_checkpoint)
offset = *checkpoint - memory.data(); {
char * memory_data = getMemoryData();
offset = *checkpoint - memory_data;
}
else if (need_update_pos) else if (need_update_pos)
offset = this->offset(); offset = this->offset();
size_t new_size = peeked_size + bytes_to_append; size_t new_size = peeked_size + bytes_to_append;
if (memory.size() < new_size)
if (use_stack_memory)
{
/// If stack memory is still enough, do nothing.
if (sizeof(stack_memory) >= new_size)
return;
/// Stack memory is not enough, allocate larger buffer.
use_stack_memory = false;
memory.resize(std::max(size_t(DBMS_DEFAULT_BUFFER_SIZE), new_size));
memcpy(memory.data(), stack_memory, sizeof(stack_memory));
if (need_update_checkpoint)
checkpoint.emplace(memory.data() + offset);
if (need_update_pos)
BufferBase::set(memory.data(), peeked_size, pos - stack_memory);
}
else if (memory.size() < new_size)
{ {
if (bytes_to_append < offset && 2 * (peeked_size - offset) <= memory.size()) if (bytes_to_append < offset && 2 * (peeked_size - offset) <= memory.size())
{ {
@ -273,10 +296,11 @@ void PeekableReadBuffer::makeContinuousMemoryFromCheckpointToPos()
size_t bytes_to_append = pos - sub_buf.position(); size_t bytes_to_append = pos - sub_buf.position();
resizeOwnMemoryIfNecessary(bytes_to_append); resizeOwnMemoryIfNecessary(bytes_to_append);
memcpy(memory.data() + peeked_size, sub_buf.position(), bytes_to_append); char * memory_data = getMemoryData();
memcpy(memory_data + peeked_size, sub_buf.position(), bytes_to_append);
sub_buf.position() = pos; sub_buf.position() = pos;
peeked_size += bytes_to_append; peeked_size += bytes_to_append;
BufferBase::set(memory.data(), peeked_size, peeked_size); BufferBase::set(memory_data, peeked_size, peeked_size);
} }
PeekableReadBuffer::~PeekableReadBuffer() PeekableReadBuffer::~PeekableReadBuffer()
@ -287,7 +311,7 @@ PeekableReadBuffer::~PeekableReadBuffer()
bool PeekableReadBuffer::hasUnreadData() const bool PeekableReadBuffer::hasUnreadData() const
{ {
return peeked_size && pos != memory.data() + peeked_size; return peeked_size && pos != getMemoryData() + peeked_size;
} }
} }

View File

@ -20,7 +20,7 @@ class PeekableReadBuffer : public BufferWithOwnMemory<ReadBuffer>
{ {
friend class PeekableReadBufferCheckpoint; friend class PeekableReadBufferCheckpoint;
public: public:
explicit PeekableReadBuffer(ReadBuffer & sub_buf_, bool use_existing_memory = false, size_t start_size_ = DBMS_DEFAULT_BUFFER_SIZE); explicit PeekableReadBuffer(ReadBuffer & sub_buf_, size_t start_size_ = 0);
~PeekableReadBuffer() override; ~PeekableReadBuffer() override;
@ -84,16 +84,21 @@ private:
/// Updates all invalidated pointers and sizes. /// Updates all invalidated pointers and sizes.
void resizeOwnMemoryIfNecessary(size_t bytes_to_append); void resizeOwnMemoryIfNecessary(size_t bytes_to_append);
char * getMemoryData() { return use_stack_memory ? stack_memory : memory.data(); }
const char * getMemoryData() const { return use_stack_memory ? stack_memory : memory.data(); }
ReadBuffer & sub_buf; ReadBuffer & sub_buf;
size_t peeked_size = 0; size_t peeked_size = 0;
std::optional<Position> checkpoint = std::nullopt; std::optional<Position> checkpoint = std::nullopt;
bool checkpoint_in_own_memory = false; bool checkpoint_in_own_memory = false;
/// Small amount of memory on stack to use in BufferWithOwnMemory on /// To prevent expensive and in some cases unnecessary memory allocations on PeekableReadBuffer
/// it's creation to prevent unnecessary allocation if PeekableReadBuffer /// creation (for example if PeekableReadBuffer is often created or if we need to remember small amount of
/// is often created. /// data after checkpoint), at the beginning we will use small amount of memory on stack and allocate
char existing_memory[16]; /// larger buffer only if reserved memory is not enough.
char stack_memory[16];
bool use_stack_memory = true;
}; };

View File

@ -53,3 +53,24 @@ Some text \N
Some text CustomNull Some text Some text CustomNull Some text
OK OK
OK OK
Large custom NULL
\N
\N
\N
\N
\N
\N
\N
\N
\N
\N
0000000000Custom NULL representation0000000000
0000000000Custom NULL representation0000000000
0000000000Custom NULL representation0000000000
0000000000Custom NULL representation0000000000
0000000000Custom NULL representation0000000000
0000000000Custom NULL representation0000000000
0000000000Custom NULL representation0000000000
0000000000Custom NULL representation0000000000
0000000000Custom NULL representation0000000000
0000000000Custom NULL representation0000000000

View File

@ -121,5 +121,12 @@ $CLICKHOUSE_CLIENT -q "SELECT * FROM file('test_02103_null.data', 'CSV', 's Stri
echo -e "Some text,NU,LL" > $DATA_FILE echo -e "Some text,NU,LL" > $DATA_FILE
$CLICKHOUSE_CLIENT -q "SELECT * FROM file('test_02103_null.data', 'CSV', 's String, n Nullable(String)') settings max_read_buffer_size=13, format_csv_null_representation='NU,L', input_format_parallel_parsing=0" 2>&1 | grep -F -q "CANNOT_READ_ALL_DATA" && echo 'OK' || echo 'FAIL' $CLICKHOUSE_CLIENT -q "SELECT * FROM file('test_02103_null.data', 'CSV', 's String, n Nullable(String)') settings max_read_buffer_size=13, format_csv_null_representation='NU,L', input_format_parallel_parsing=0" 2>&1 | grep -F -q "CANNOT_READ_ALL_DATA" && echo 'OK' || echo 'FAIL'
echo 'Large custom NULL'
$CLICKHOUSE_CLIENT -q "select '0000000000Custom NULL representation0000000000' FROM numbers(10)" > $DATA_FILE
$CLICKHOUSE_CLIENT -q "SELECT * FROM file('test_02103_null.data', 'TSV', 's Nullable(String)') SETTINGS max_read_buffer_size=5, input_format_parallel_parsing=0, format_tsv_null_representation='0000000000Custom NULL representation0000000000'"
$CLICKHOUSE_CLIENT -q "SELECT * FROM file('test_02103_null.data', 'TSV', 's Nullable(String)') SETTINGS max_read_buffer_size=5, input_format_parallel_parsing=0, format_tsv_null_representation='0000000000Custom NULL representation000000000'"
rm $DATA_FILE rm $DATA_FILE