mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 16:50:48 +00:00
Merge pull request #18979 from ClickHouse/fix_18690
Remove unread data limit from PeekableReadBuffer
This commit is contained in:
commit
fb6d1dc18e
@ -4,13 +4,11 @@ namespace DB
|
||||
{
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int MEMORY_LIMIT_EXCEEDED;
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
PeekableReadBuffer::PeekableReadBuffer(ReadBuffer & sub_buf_, size_t start_size_ /*= DBMS_DEFAULT_BUFFER_SIZE*/,
|
||||
size_t unread_limit_ /* = default_limit*/)
|
||||
: BufferWithOwnMemory(start_size_), sub_buf(sub_buf_), unread_limit(unread_limit_)
|
||||
PeekableReadBuffer::PeekableReadBuffer(ReadBuffer & sub_buf_, size_t start_size_ /*= DBMS_DEFAULT_BUFFER_SIZE*/)
|
||||
: BufferWithOwnMemory(start_size_), sub_buf(sub_buf_)
|
||||
{
|
||||
padded &= sub_buf.isPadded();
|
||||
/// Read from sub-buffer
|
||||
@ -191,8 +189,6 @@ void PeekableReadBuffer::checkStateCorrect() const
|
||||
}
|
||||
if (currentlyReadFromOwnMemory() && !peeked_size)
|
||||
throw DB::Exception("Pos in empty own buffer", ErrorCodes::LOGICAL_ERROR);
|
||||
if (unread_limit < memory.size())
|
||||
throw DB::Exception("Size limit exceed", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
|
||||
void PeekableReadBuffer::resizeOwnMemoryIfNecessary(size_t bytes_to_append)
|
||||
@ -222,16 +218,11 @@ void PeekableReadBuffer::resizeOwnMemoryIfNecessary(size_t bytes_to_append)
|
||||
}
|
||||
else
|
||||
{
|
||||
if (unread_limit < new_size)
|
||||
throw DB::Exception("PeekableReadBuffer: Memory limit exceed", ErrorCodes::MEMORY_LIMIT_EXCEEDED);
|
||||
|
||||
size_t pos_offset = pos - memory.data();
|
||||
|
||||
size_t new_size_amortized = memory.size() * 2;
|
||||
if (new_size_amortized < new_size)
|
||||
new_size_amortized = new_size;
|
||||
else if (unread_limit < new_size_amortized)
|
||||
new_size_amortized = unread_limit;
|
||||
memory.resize(new_size_amortized);
|
||||
|
||||
if (need_update_checkpoint)
|
||||
|
@ -20,8 +20,7 @@ class PeekableReadBuffer : public BufferWithOwnMemory<ReadBuffer>
|
||||
{
|
||||
friend class PeekableReadBufferCheckpoint;
|
||||
public:
|
||||
explicit PeekableReadBuffer(ReadBuffer & sub_buf_, size_t start_size_ = DBMS_DEFAULT_BUFFER_SIZE,
|
||||
size_t unread_limit_ = 16 * DBMS_DEFAULT_BUFFER_SIZE);
|
||||
explicit PeekableReadBuffer(ReadBuffer & sub_buf_, size_t start_size_ = DBMS_DEFAULT_BUFFER_SIZE);
|
||||
|
||||
~PeekableReadBuffer() override;
|
||||
|
||||
@ -95,7 +94,6 @@ private:
|
||||
|
||||
|
||||
ReadBuffer & sub_buf;
|
||||
const size_t unread_limit;
|
||||
size_t peeked_size = 0;
|
||||
Position checkpoint = nullptr;
|
||||
bool checkpoint_in_own_memory = false;
|
||||
|
@ -9,7 +9,6 @@
|
||||
namespace DB::ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int MEMORY_LIMIT_EXCEEDED;
|
||||
}
|
||||
|
||||
static void readAndAssert(DB::ReadBuffer & buf, const char * str)
|
||||
@ -40,7 +39,7 @@ try
|
||||
DB::ReadBufferFromString b4(s4);
|
||||
|
||||
DB::ConcatReadBuffer concat({&b1, &b2, &b3, &b4});
|
||||
DB::PeekableReadBuffer peekable(concat, 0, 16);
|
||||
DB::PeekableReadBuffer peekable(concat, 0);
|
||||
|
||||
ASSERT_TRUE(!peekable.eof());
|
||||
assertAvailable(peekable, "0123456789");
|
||||
@ -48,6 +47,8 @@ try
|
||||
DB::PeekableReadBufferCheckpoint checkpoint{peekable};
|
||||
readAndAssert(peekable, "01234");
|
||||
}
|
||||
|
||||
#ifndef ABORT_ON_LOGICAL_ERROR
|
||||
bool exception = false;
|
||||
try
|
||||
{
|
||||
@ -60,6 +61,7 @@ try
|
||||
exception = true;
|
||||
}
|
||||
ASSERT_TRUE(exception);
|
||||
#endif
|
||||
assertAvailable(peekable, "56789");
|
||||
|
||||
readAndAssert(peekable, "56");
|
||||
@ -70,19 +72,10 @@ try
|
||||
peekable.dropCheckpoint();
|
||||
assertAvailable(peekable, "789");
|
||||
|
||||
exception = false;
|
||||
try
|
||||
{
|
||||
DB::PeekableReadBufferCheckpoint checkpoint{peekable, true};
|
||||
peekable.ignore(30);
|
||||
peekable.ignore(20);
|
||||
}
|
||||
catch (DB::Exception & e)
|
||||
{
|
||||
if (e.code() != DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED)
|
||||
throw;
|
||||
exception = true;
|
||||
}
|
||||
ASSERT_TRUE(exception);
|
||||
assertAvailable(peekable, "789qwertyuiop");
|
||||
|
||||
readAndAssert(peekable, "789qwertyu");
|
||||
|
@ -0,0 +1,3 @@
|
||||
1000100
|
||||
1000100
|
||||
1000100
|
20
tests/queries/0_stateless/01184_insert_values_huge_strings.sh
Executable file
20
tests/queries/0_stateless/01184_insert_values_huge_strings.sh
Executable file
@ -0,0 +1,20 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CURDIR"/../shell_config.sh
|
||||
|
||||
$CLICKHOUSE_CLIENT -q "drop table if exists huge_strings"
|
||||
$CLICKHOUSE_CLIENT -q "create table huge_strings (n UInt64, l UInt64, s String, h UInt64) engine=MergeTree order by n"
|
||||
|
||||
for _ in {1..10}; do
|
||||
$CLICKHOUSE_CLIENT -q "select number, (rand() % 100*1000*1000) as l, repeat(randomString(l/1000/1000), 1000*1000) as s, cityHash64(s) from numbers(10) format Values" | $CLICKHOUSE_CLIENT -q "insert into huge_strings values" &
|
||||
$CLICKHOUSE_CLIENT -q "select number % 10, (rand() % 100) as l, randomString(l) as s, cityHash64(s) from numbers(100000)" | $CLICKHOUSE_CLIENT -q "insert into huge_strings format TSV" &
|
||||
done;
|
||||
wait
|
||||
|
||||
$CLICKHOUSE_CLIENT -q "select count() from huge_strings"
|
||||
$CLICKHOUSE_CLIENT -q "select sum(l = length(s)) from huge_strings"
|
||||
$CLICKHOUSE_CLIENT -q "select sum(h = cityHash64(s)) from huge_strings"
|
||||
|
||||
$CLICKHOUSE_CLIENT -q "drop table huge_strings"
|
Loading…
Reference in New Issue
Block a user