diff --git a/src/IO/PeekableReadBuffer.cpp b/src/IO/PeekableReadBuffer.cpp index e2b1873283f..40929acd848 100644 --- a/src/IO/PeekableReadBuffer.cpp +++ b/src/IO/PeekableReadBuffer.cpp @@ -9,8 +9,8 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -PeekableReadBuffer::PeekableReadBuffer(ReadBuffer & sub_buf_, bool use_existing_memory /*= false*/, size_t start_size_ /*= DBMS_DEFAULT_BUFFER_SIZE*/) - : BufferWithOwnMemory(use_existing_memory ? sizeof(existing_memory) : start_size_, use_existing_memory ? existing_memory : nullptr), sub_buf(sub_buf_) +PeekableReadBuffer::PeekableReadBuffer(ReadBuffer & sub_buf_, size_t start_size_ /*= 0*/) + : BufferWithOwnMemory(start_size_), sub_buf(sub_buf_) { padded &= sub_buf.isPadded(); /// Read from sub-buffer @@ -27,6 +27,7 @@ void PeekableReadBuffer::reset() peeked_size = 0; checkpoint = std::nullopt; checkpoint_in_own_memory = false; + use_stack_memory = true; if (!currentlyReadFromOwnMemory()) sub_buf.position() = pos; @@ -72,21 +73,23 @@ bool PeekableReadBuffer::peekNext() sub_buf.position() = copy_from; } + char * memory_data = getMemoryData(); + /// Save unread data from sub-buffer to own memory - memcpy(memory.data() + peeked_size, sub_buf.position(), bytes_to_copy); + memcpy(memory_data + peeked_size, sub_buf.position(), bytes_to_copy); /// If useSubbufferOnly() is false, then checkpoint is in own memory and it was updated in resizeOwnMemoryIfNecessary /// Otherwise, checkpoint now at the beginning of own memory if (checkpoint && useSubbufferOnly()) { - checkpoint.emplace(memory.data()); + checkpoint.emplace(memory_data); checkpoint_in_own_memory = true; } if (currentlyReadFromOwnMemory()) { /// Update buffer size - BufferBase::set(memory.data(), peeked_size + bytes_to_copy, offset()); + BufferBase::set(memory_data, peeked_size + bytes_to_copy, offset()); } else { @@ -99,7 +102,7 @@ bool PeekableReadBuffer::peekNext() else pos_offset = 0; } - BufferBase::set(memory.data(), peeked_size + bytes_to_copy, pos_offset); + BufferBase::set(memory_data, peeked_size + bytes_to_copy, pos_offset); } peeked_size += bytes_to_copy; @@ -125,8 +128,9 @@ void PeekableReadBuffer::rollbackToCheckpoint(bool drop) /// Checkpoint is in own memory and position is not. assert(checkpointInOwnMemory()); + char * memory_data = getMemoryData(); /// Switch to reading from own memory. - BufferBase::set(memory.data(), peeked_size, *checkpoint - memory.data()); + BufferBase::set(memory_data, peeked_size, *checkpoint - memory_data); } if (drop) @@ -224,12 +228,31 @@ void PeekableReadBuffer::resizeOwnMemoryIfNecessary(size_t bytes_to_append) bool need_update_pos = currentlyReadFromOwnMemory(); size_t offset = 0; if (need_update_checkpoint) - offset = *checkpoint - memory.data(); + { + char * memory_data = getMemoryData(); + offset = *checkpoint - memory_data; + } else if (need_update_pos) offset = this->offset(); size_t new_size = peeked_size + bytes_to_append; - if (memory.size() < new_size) + + if (use_stack_memory) + { + /// If stack memory is still enough, do nothing. + if (sizeof(stack_memory) >= new_size) + return; + + /// Stack memory is not enough, allocate larger buffer. + use_stack_memory = false; + memory.resize(std::max(size_t(DBMS_DEFAULT_BUFFER_SIZE), new_size)); + memcpy(memory.data(), stack_memory, sizeof(stack_memory)); + if (need_update_checkpoint) + checkpoint.emplace(memory.data() + offset); + if (need_update_pos) + BufferBase::set(memory.data(), peeked_size, pos - stack_memory); + } + else if (memory.size() < new_size) { if (bytes_to_append < offset && 2 * (peeked_size - offset) <= memory.size()) { @@ -273,10 +296,11 @@ void PeekableReadBuffer::makeContinuousMemoryFromCheckpointToPos() size_t bytes_to_append = pos - sub_buf.position(); resizeOwnMemoryIfNecessary(bytes_to_append); - memcpy(memory.data() + peeked_size, sub_buf.position(), bytes_to_append); + char * memory_data = getMemoryData(); + memcpy(memory_data + peeked_size, sub_buf.position(), bytes_to_append); sub_buf.position() = pos; peeked_size += bytes_to_append; - BufferBase::set(memory.data(), peeked_size, peeked_size); + BufferBase::set(memory_data, peeked_size, peeked_size); } PeekableReadBuffer::~PeekableReadBuffer() @@ -287,7 +311,7 @@ PeekableReadBuffer::~PeekableReadBuffer() bool PeekableReadBuffer::hasUnreadData() const { - return peeked_size && pos != memory.data() + peeked_size; + return peeked_size && pos != getMemoryData() + peeked_size; } } diff --git a/src/IO/PeekableReadBuffer.h b/src/IO/PeekableReadBuffer.h index a8eff09c4f2..f22987d9daa 100644 --- a/src/IO/PeekableReadBuffer.h +++ b/src/IO/PeekableReadBuffer.h @@ -20,7 +20,7 @@ class PeekableReadBuffer : public BufferWithOwnMemory { friend class PeekableReadBufferCheckpoint; public: - explicit PeekableReadBuffer(ReadBuffer & sub_buf_, bool use_existing_memory = false, size_t start_size_ = DBMS_DEFAULT_BUFFER_SIZE); + explicit PeekableReadBuffer(ReadBuffer & sub_buf_, size_t start_size_ = 0); ~PeekableReadBuffer() override; @@ -84,16 +84,21 @@ private: /// Updates all invalidated pointers and sizes. void resizeOwnMemoryIfNecessary(size_t bytes_to_append); + char * getMemoryData() { return use_stack_memory ? stack_memory : memory.data(); } + const char * getMemoryData() const { return use_stack_memory ? stack_memory : memory.data(); } + ReadBuffer & sub_buf; size_t peeked_size = 0; std::optional checkpoint = std::nullopt; bool checkpoint_in_own_memory = false; - /// Small amount of memory on stack to use in BufferWithOwnMemory on - /// it's creation to prevent unnecessary allocation if PeekableReadBuffer - /// is often created. - char existing_memory[16]; + /// To prevent expensive and in some cases unnecessary memory allocations on PeekableReadBuffer + /// creation (for example if PeekableReadBuffer is often created or if we need to remember small amount of + /// data after checkpoint), at the beginning we will use small amount of memory on stack and allocate + /// larger buffer only if reserved memory is not enough. + char stack_memory[16]; + bool use_stack_memory = true; }; diff --git a/tests/queries/0_stateless/02103_tsv_csv_custom_null_representation.reference b/tests/queries/0_stateless/02103_tsv_csv_custom_null_representation.reference index 06618cc63b1..a89bc46acfb 100644 --- a/tests/queries/0_stateless/02103_tsv_csv_custom_null_representation.reference +++ b/tests/queries/0_stateless/02103_tsv_csv_custom_null_representation.reference @@ -53,3 +53,24 @@ Some text \N Some text CustomNull Some text OK OK +Large custom NULL +\N +\N +\N +\N +\N +\N +\N +\N +\N +\N +0000000000Custom NULL representation0000000000 +0000000000Custom NULL representation0000000000 +0000000000Custom NULL representation0000000000 +0000000000Custom NULL representation0000000000 +0000000000Custom NULL representation0000000000 +0000000000Custom NULL representation0000000000 +0000000000Custom NULL representation0000000000 +0000000000Custom NULL representation0000000000 +0000000000Custom NULL representation0000000000 +0000000000Custom NULL representation0000000000 diff --git a/tests/queries/0_stateless/02103_tsv_csv_custom_null_representation.sh b/tests/queries/0_stateless/02103_tsv_csv_custom_null_representation.sh index 1d8e080c7b6..676e8cb867f 100755 --- a/tests/queries/0_stateless/02103_tsv_csv_custom_null_representation.sh +++ b/tests/queries/0_stateless/02103_tsv_csv_custom_null_representation.sh @@ -121,5 +121,12 @@ $CLICKHOUSE_CLIENT -q "SELECT * FROM file('test_02103_null.data', 'CSV', 's Stri echo -e "Some text,NU,LL" > $DATA_FILE $CLICKHOUSE_CLIENT -q "SELECT * FROM file('test_02103_null.data', 'CSV', 's String, n Nullable(String)') settings max_read_buffer_size=13, format_csv_null_representation='NU,L', input_format_parallel_parsing=0" 2>&1 | grep -F -q "CANNOT_READ_ALL_DATA" && echo 'OK' || echo 'FAIL' + +echo 'Large custom NULL' + +$CLICKHOUSE_CLIENT -q "select '0000000000Custom NULL representation0000000000' FROM numbers(10)" > $DATA_FILE +$CLICKHOUSE_CLIENT -q "SELECT * FROM file('test_02103_null.data', 'TSV', 's Nullable(String)') SETTINGS max_read_buffer_size=5, input_format_parallel_parsing=0, format_tsv_null_representation='0000000000Custom NULL representation0000000000'" +$CLICKHOUSE_CLIENT -q "SELECT * FROM file('test_02103_null.data', 'TSV', 's Nullable(String)') SETTINGS max_read_buffer_size=5, input_format_parallel_parsing=0, format_tsv_null_representation='0000000000Custom NULL representation000000000'" + rm $DATA_FILE