Merge pull request #19072 from ClickHouse/fix_19021

Fix checkpoint in PeekableReadBuffer over ConcatReadBuffer
This commit is contained in:
tavplubix 2021-01-22 21:00:04 +03:00 committed by GitHub
commit 49e1321df4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 31 additions and 17 deletions

View File

@ -23,7 +23,7 @@ void PeekableReadBuffer::reset()
checkStateCorrect(); checkStateCorrect();
peeked_size = 0; peeked_size = 0;
checkpoint = nullptr; checkpoint = std::nullopt;
checkpoint_in_own_memory = false; checkpoint_in_own_memory = false;
if (!currentlyReadFromOwnMemory()) if (!currentlyReadFromOwnMemory())
@ -45,7 +45,7 @@ bool PeekableReadBuffer::peekNext()
{ {
/// Don't have to copy all data from sub-buffer if there is no data in own memory (checkpoint and pos are in sub-buffer) /// Don't have to copy all data from sub-buffer if there is no data in own memory (checkpoint and pos are in sub-buffer)
if (checkpoint) if (checkpoint)
copy_from = checkpoint; copy_from = *checkpoint;
bytes_to_copy = sub_buf.buffer().end() - copy_from; bytes_to_copy = sub_buf.buffer().end() - copy_from;
if (!bytes_to_copy) if (!bytes_to_copy)
{ {
@ -55,7 +55,7 @@ bool PeekableReadBuffer::peekNext()
bool res = sub_buf.next(); bool res = sub_buf.next();
BufferBase::set(sub_buf.buffer().begin(), sub_buf.buffer().size(), sub_buf.offset()); BufferBase::set(sub_buf.buffer().begin(), sub_buf.buffer().size(), sub_buf.offset());
if (checkpoint) if (checkpoint)
checkpoint = pos; checkpoint.emplace(pos);
checkStateCorrect(); checkStateCorrect();
return res; return res;
@ -77,7 +77,7 @@ bool PeekableReadBuffer::peekNext()
/// Otherwise, checkpoint now at the beginning of own memory /// Otherwise, checkpoint now at the beginning of own memory
if (checkpoint && useSubbufferOnly()) if (checkpoint && useSubbufferOnly())
{ {
checkpoint = memory.data(); checkpoint.emplace(memory.data());
checkpoint_in_own_memory = true; checkpoint_in_own_memory = true;
} }
if (currentlyReadFromOwnMemory()) if (currentlyReadFromOwnMemory())
@ -113,9 +113,9 @@ void PeekableReadBuffer::rollbackToCheckpoint()
if (!checkpoint) if (!checkpoint)
throw DB::Exception("There is no checkpoint", ErrorCodes::LOGICAL_ERROR); throw DB::Exception("There is no checkpoint", ErrorCodes::LOGICAL_ERROR);
else if (checkpointInOwnMemory() == currentlyReadFromOwnMemory()) else if (checkpointInOwnMemory() == currentlyReadFromOwnMemory())
pos = checkpoint; pos = *checkpoint;
else /// Checkpoint is in own memory and pos is not. Switch to reading from own memory else /// Checkpoint is in own memory and pos is not. Switch to reading from own memory
BufferBase::set(memory.data(), peeked_size, checkpoint - memory.data()); BufferBase::set(memory.data(), peeked_size, *checkpoint - memory.data());
checkStateCorrect(); checkStateCorrect();
} }
@ -167,7 +167,7 @@ void PeekableReadBuffer::checkStateCorrect() const
{ {
if (!peeked_size) if (!peeked_size)
throw DB::Exception("Checkpoint in empty own buffer", ErrorCodes::LOGICAL_ERROR); throw DB::Exception("Checkpoint in empty own buffer", ErrorCodes::LOGICAL_ERROR);
if (currentlyReadFromOwnMemory() && pos < checkpoint) if (currentlyReadFromOwnMemory() && pos < *checkpoint)
throw DB::Exception("Current position in own buffer before checkpoint in own buffer", ErrorCodes::LOGICAL_ERROR); throw DB::Exception("Current position in own buffer before checkpoint in own buffer", ErrorCodes::LOGICAL_ERROR);
if (!currentlyReadFromOwnMemory() && pos < sub_buf.position()) if (!currentlyReadFromOwnMemory() && pos < sub_buf.position())
throw DB::Exception("Current position in subbuffer less than sub_buf.position()", ErrorCodes::LOGICAL_ERROR); throw DB::Exception("Current position in subbuffer less than sub_buf.position()", ErrorCodes::LOGICAL_ERROR);
@ -178,7 +178,7 @@ void PeekableReadBuffer::checkStateCorrect() const
throw DB::Exception("Own buffer is not empty", ErrorCodes::LOGICAL_ERROR); throw DB::Exception("Own buffer is not empty", ErrorCodes::LOGICAL_ERROR);
if (currentlyReadFromOwnMemory()) if (currentlyReadFromOwnMemory())
throw DB::Exception("Current position in own buffer before checkpoint in subbuffer", ErrorCodes::LOGICAL_ERROR); throw DB::Exception("Current position in own buffer before checkpoint in subbuffer", ErrorCodes::LOGICAL_ERROR);
if (pos < checkpoint) if (pos < *checkpoint)
throw DB::Exception("Current position in subbuffer before checkpoint in subbuffer", ErrorCodes::LOGICAL_ERROR); throw DB::Exception("Current position in subbuffer before checkpoint in subbuffer", ErrorCodes::LOGICAL_ERROR);
} }
} }
@ -198,7 +198,7 @@ void PeekableReadBuffer::resizeOwnMemoryIfNecessary(size_t bytes_to_append)
bool need_update_pos = currentlyReadFromOwnMemory(); bool need_update_pos = currentlyReadFromOwnMemory();
size_t offset = 0; size_t offset = 0;
if (need_update_checkpoint) if (need_update_checkpoint)
offset = checkpoint - memory.data(); offset = *checkpoint - memory.data();
else if (need_update_pos) else if (need_update_pos)
offset = this->offset(); offset = this->offset();
@ -212,7 +212,7 @@ void PeekableReadBuffer::resizeOwnMemoryIfNecessary(size_t bytes_to_append)
memmove(memory.data(), memory.data() + offset, peeked_size); memmove(memory.data(), memory.data() + offset, peeked_size);
if (need_update_checkpoint) if (need_update_checkpoint)
checkpoint -= offset; *checkpoint -= offset;
if (need_update_pos) if (need_update_pos)
pos -= offset; pos -= offset;
} }
@ -226,7 +226,7 @@ void PeekableReadBuffer::resizeOwnMemoryIfNecessary(size_t bytes_to_append)
memory.resize(new_size_amortized); memory.resize(new_size_amortized);
if (need_update_checkpoint) if (need_update_checkpoint)
checkpoint = memory.data() + offset; checkpoint.emplace(memory.data() + offset);
if (need_update_pos) if (need_update_pos)
{ {
BufferBase::set(memory.data(), peeked_size, pos_offset); BufferBase::set(memory.data(), peeked_size, pos_offset);
@ -243,7 +243,7 @@ void PeekableReadBuffer::makeContinuousMemoryFromCheckpointToPos()
checkStateCorrect(); checkStateCorrect();
if (!checkpointInOwnMemory() || currentlyReadFromOwnMemory()) if (!checkpointInOwnMemory() || currentlyReadFromOwnMemory())
return; /// is't already continuous return; /// it's already continuous
size_t bytes_to_append = pos - sub_buf.position(); size_t bytes_to_append = pos - sub_buf.position();
resizeOwnMemoryIfNecessary(bytes_to_append); resizeOwnMemoryIfNecessary(bytes_to_append);

View File

@ -37,7 +37,7 @@ public:
/// Don't need to store unread data anymore /// Don't need to store unread data anymore
peeked_size = 0; peeked_size = 0;
} }
checkpoint = pos; checkpoint.emplace(pos);
// FIXME: we are checking checkpoint existence in few places (rollbackToCheckpoint/dropCheckpoint) // FIXME: we are checking checkpoint existence in few places (rollbackToCheckpoint/dropCheckpoint)
// by simple if(checkpoint) but checkpoint can be nullptr after // by simple if(checkpoint) but checkpoint can be nullptr after
@ -57,7 +57,7 @@ public:
/// Don't need to store unread data anymore /// Don't need to store unread data anymore
peeked_size = 0; peeked_size = 0;
} }
checkpoint = nullptr; checkpoint = std::nullopt;
checkpoint_in_own_memory = false; checkpoint_in_own_memory = false;
} }
@ -95,7 +95,7 @@ private:
ReadBuffer & sub_buf; ReadBuffer & sub_buf;
size_t peeked_size = 0; size_t peeked_size = 0;
Position checkpoint = nullptr; std::optional<Position> checkpoint = std::nullopt;
bool checkpoint_in_own_memory = false; bool checkpoint_in_own_memory = false;
}; };

View File

@ -12,7 +12,6 @@
#include <DataStreams/RemoteBlockInputStream.h> #include <DataStreams/RemoteBlockInputStream.h>
#include <DataStreams/SquashingBlockOutputStream.h> #include <DataStreams/SquashingBlockOutputStream.h>
#include <DataStreams/copyData.h> #include <DataStreams/copyData.h>
#include <IO/ConcatReadBuffer.h>
#include <IO/ConnectionTimeoutsContext.h> #include <IO/ConnectionTimeoutsContext.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h> #include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/InterpreterWatchQuery.h> #include <Interpreters/InterpreterWatchQuery.h>

View File

@ -3,7 +3,6 @@
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
#include <Common/ThreadProfileEvents.h> #include <Common/ThreadProfileEvents.h>
#include <IO/ConcatReadBuffer.h>
#include <IO/WriteBufferFromFile.h> #include <IO/WriteBufferFromFile.h>
#include <IO/WriteBufferFromVector.h> #include <IO/WriteBufferFromVector.h>
#include <IO/LimitReadBuffer.h> #include <IO/LimitReadBuffer.h>

View File

@ -0,0 +1,2 @@
2021-Jan d1 d2
1000000 1

View File

@ -0,0 +1,14 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
echo 'DROP TABLE IF EXISTS mydb' | ${CLICKHOUSE_CURL} -sSg "${CLICKHOUSE_URL}" -d @-
echo 'CREATE TABLE mydb (datetime String, d1 String, d2 String ) ENGINE=Memory' | ${CLICKHOUSE_CURL} -sSg "${CLICKHOUSE_URL}" -d @-
echo "2021-Jan^d1^d2" | ${CLICKHOUSE_CURL} -sSg "${CLICKHOUSE_URL}&query=INSERT%20INTO%20mydb%20FORMAT%20CustomSeparated%20SETTINGS%20format_custom_escaping_rule%3D%27CSV%27%2C%20format_custom_field_delimiter%20%3D%20%27%5E%27" --data-binary @-
echo -n "" | ${CLICKHOUSE_CURL} -sSg "${CLICKHOUSE_URL}&query=INSERT%20INTO%20mydb%20FORMAT%20CustomSeparated%20SETTINGS%20format_custom_escaping_rule%3D%27CSV%27%2C%20format_custom_field_delimiter%20%3D%20%27%5E%27" --data-binary @-
echo 'SELECT * FROM mydb' | ${CLICKHOUSE_CURL} -sSg "${CLICKHOUSE_URL}" -d @-
printf "2021-Jan^d1^d2\n%.0s" {1..999999} | ${CLICKHOUSE_CURL} -sSg "${CLICKHOUSE_URL}&query=INSERT%20INTO%20mydb%20FORMAT%20CustomSeparated%20SETTINGS%20format_custom_escaping_rule%3D%27CSV%27%2C%20format_custom_field_delimiter%20%3D%20%27%5E%27" --data-binary @-
echo 'SELECT count(*), countDistinct(datetime, d1, d2) FROM mydb' | ${CLICKHOUSE_CURL} -sSg "${CLICKHOUSE_URL}" -d @-
echo 'DROP TABLE mydb' | ${CLICKHOUSE_CURL} -sSg "${CLICKHOUSE_URL}" -d @-