fix checkpoint in PeekableReadBuffer over ConcatReadBuffer

This commit is contained in:
Alexander Tokmakov 2021-01-14 17:31:35 +03:00
parent c97469773d
commit 547c7af1b1
6 changed files with 31 additions and 17 deletions

View File

@ -25,7 +25,7 @@ void PeekableReadBuffer::reset()
checkStateCorrect();
peeked_size = 0;
checkpoint = nullptr;
checkpoint = std::nullopt;
checkpoint_in_own_memory = false;
if (!currentlyReadFromOwnMemory())
@ -47,7 +47,7 @@ bool PeekableReadBuffer::peekNext()
{
/// Don't have to copy all data from sub-buffer if there is no data in own memory (checkpoint and pos are in sub-buffer)
if (checkpoint)
copy_from = checkpoint;
copy_from = *checkpoint;
bytes_to_copy = sub_buf.buffer().end() - copy_from;
if (!bytes_to_copy)
{
@ -57,7 +57,7 @@ bool PeekableReadBuffer::peekNext()
bool res = sub_buf.next();
BufferBase::set(sub_buf.buffer().begin(), sub_buf.buffer().size(), sub_buf.offset());
if (checkpoint)
checkpoint = pos;
checkpoint.emplace(pos);
checkStateCorrect();
return res;
@ -79,7 +79,7 @@ bool PeekableReadBuffer::peekNext()
/// Otherwise, checkpoint now at the beginning of own memory
if (checkpoint && useSubbufferOnly())
{
checkpoint = memory.data();
checkpoint.emplace(memory.data());
checkpoint_in_own_memory = true;
}
if (currentlyReadFromOwnMemory())
@ -115,9 +115,9 @@ void PeekableReadBuffer::rollbackToCheckpoint()
if (!checkpoint)
throw DB::Exception("There is no checkpoint", ErrorCodes::LOGICAL_ERROR);
else if (checkpointInOwnMemory() == currentlyReadFromOwnMemory())
pos = checkpoint;
pos = *checkpoint;
else /// Checkpoint is in own memory and pos is not. Switch to reading from own memory
BufferBase::set(memory.data(), peeked_size, checkpoint - memory.data());
BufferBase::set(memory.data(), peeked_size, *checkpoint - memory.data());
checkStateCorrect();
}
@ -169,7 +169,7 @@ void PeekableReadBuffer::checkStateCorrect() const
{
if (!peeked_size)
throw DB::Exception("Checkpoint in empty own buffer", ErrorCodes::LOGICAL_ERROR);
if (currentlyReadFromOwnMemory() && pos < checkpoint)
if (currentlyReadFromOwnMemory() && pos < *checkpoint)
throw DB::Exception("Current position in own buffer before checkpoint in own buffer", ErrorCodes::LOGICAL_ERROR);
if (!currentlyReadFromOwnMemory() && pos < sub_buf.position())
throw DB::Exception("Current position in subbuffer less than sub_buf.position()", ErrorCodes::LOGICAL_ERROR);
@ -180,7 +180,7 @@ void PeekableReadBuffer::checkStateCorrect() const
throw DB::Exception("Own buffer is not empty", ErrorCodes::LOGICAL_ERROR);
if (currentlyReadFromOwnMemory())
throw DB::Exception("Current position in own buffer before checkpoint in subbuffer", ErrorCodes::LOGICAL_ERROR);
if (pos < checkpoint)
if (pos < *checkpoint)
throw DB::Exception("Current position in subbuffer before checkpoint in subbuffer", ErrorCodes::LOGICAL_ERROR);
}
}
@ -202,7 +202,7 @@ void PeekableReadBuffer::resizeOwnMemoryIfNecessary(size_t bytes_to_append)
bool need_update_pos = currentlyReadFromOwnMemory();
size_t offset = 0;
if (need_update_checkpoint)
offset = checkpoint - memory.data();
offset = *checkpoint - memory.data();
else if (need_update_pos)
offset = this->offset();
@ -216,7 +216,7 @@ void PeekableReadBuffer::resizeOwnMemoryIfNecessary(size_t bytes_to_append)
memmove(memory.data(), memory.data() + offset, peeked_size);
if (need_update_checkpoint)
checkpoint -= offset;
*checkpoint -= offset;
if (need_update_pos)
pos -= offset;
}
@ -235,7 +235,7 @@ void PeekableReadBuffer::resizeOwnMemoryIfNecessary(size_t bytes_to_append)
memory.resize(new_size_amortized);
if (need_update_checkpoint)
checkpoint = memory.data() + offset;
checkpoint.emplace(memory.data() + offset);
if (need_update_pos)
{
BufferBase::set(memory.data(), peeked_size, pos_offset);
@ -252,7 +252,7 @@ void PeekableReadBuffer::makeContinuousMemoryFromCheckpointToPos()
checkStateCorrect();
if (!checkpointInOwnMemory() || currentlyReadFromOwnMemory())
return; /// is't already continuous
return; /// it's already continuous
size_t bytes_to_append = pos - sub_buf.position();
resizeOwnMemoryIfNecessary(bytes_to_append);

View File

@ -38,7 +38,7 @@ public:
/// Don't need to store unread data anymore
peeked_size = 0;
}
checkpoint = pos;
checkpoint.emplace(pos);
// FIXME: we are checking checkpoint existence in few places (rollbackToCheckpoint/dropCheckpoint)
// by simple if(checkpoint) but checkpoint can be nullptr after
@ -58,7 +58,7 @@ public:
/// Don't need to store unread data anymore
peeked_size = 0;
}
checkpoint = nullptr;
checkpoint = std::nullopt;
checkpoint_in_own_memory = false;
}
@ -97,7 +97,7 @@ private:
ReadBuffer & sub_buf;
const size_t unread_limit;
size_t peeked_size = 0;
Position checkpoint = nullptr;
std::optional<Position> checkpoint = std::nullopt;
bool checkpoint_in_own_memory = false;
};

View File

@ -12,7 +12,6 @@
#include <DataStreams/RemoteBlockInputStream.h>
#include <DataStreams/SquashingBlockOutputStream.h>
#include <DataStreams/copyData.h>
#include <IO/ConcatReadBuffer.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/InterpreterWatchQuery.h>

View File

@ -3,7 +3,6 @@
#include <Common/typeid_cast.h>
#include <Common/ThreadProfileEvents.h>
#include <IO/ConcatReadBuffer.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteBufferFromVector.h>
#include <IO/LimitReadBuffer.h>

View File

@ -0,0 +1,2 @@
2021-Jan d1 d2
1000000 1

View File

@ -0,0 +1,14 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
echo 'DROP TABLE IF EXISTS mydb' | ${CLICKHOUSE_CURL} -sSg "${CLICKHOUSE_URL}" -d @-
echo 'CREATE TABLE mydb (datetime String, d1 String, d2 String ) ENGINE=Memory' | ${CLICKHOUSE_CURL} -sSg "${CLICKHOUSE_URL}" -d @-
echo "2021-Jan^d1^d2" | ${CLICKHOUSE_CURL} -sSg "${CLICKHOUSE_URL}&query=INSERT%20INTO%20mydb%20FORMAT%20CustomSeparated%20SETTINGS%20format_custom_escaping_rule%3D%27CSV%27%2C%20format_custom_field_delimiter%20%3D%20%27%5E%27" --data-binary @-
echo -n "" | ${CLICKHOUSE_CURL} -sSg "${CLICKHOUSE_URL}&query=INSERT%20INTO%20mydb%20FORMAT%20CustomSeparated%20SETTINGS%20format_custom_escaping_rule%3D%27CSV%27%2C%20format_custom_field_delimiter%20%3D%20%27%5E%27" --data-binary @-
echo 'SELECT * FROM mydb' | ${CLICKHOUSE_CURL} -sSg "${CLICKHOUSE_URL}" -d @-
printf "2021-Jan^d1^d2\n%.0s" {1..999999} | ${CLICKHOUSE_CURL} -sSg "${CLICKHOUSE_URL}&query=INSERT%20INTO%20mydb%20FORMAT%20CustomSeparated%20SETTINGS%20format_custom_escaping_rule%3D%27CSV%27%2C%20format_custom_field_delimiter%20%3D%20%27%5E%27" --data-binary @-
echo 'SELECT count(*), countDistinct(datetime, d1, d2) FROM mydb' | ${CLICKHOUSE_CURL} -sSg "${CLICKHOUSE_URL}" -d @-
echo 'DROP TABLE mydb' | ${CLICKHOUSE_CURL} -sSg "${CLICKHOUSE_URL}" -d @-