Lazy marks loading

This commit is contained in:
Anton Kozlov 2022-04-29 17:39:06 +00:00
parent ea93ab51d5
commit 23e6792898
6 changed files with 84 additions and 12 deletions

View File

@ -68,7 +68,7 @@ void MergeTreeIndexReader::seek(size_t mark)
MergeTreeIndexGranulePtr MergeTreeIndexReader::read()
{
auto granule = index->createIndexGranule();
granule->deserializeBinary(*stream->data_buffer, version);
granule->deserializeBinary(*stream->getDataBuffer(), version);
return granule;
}

View File

@ -17,25 +17,35 @@ namespace ErrorCodes
MergeTreeReaderStream::MergeTreeReaderStream(
DiskPtr disk_,
const String & path_prefix_, const String & data_file_extension_, size_t marks_count_,
const MarkRanges & all_mark_ranges,
const MergeTreeReaderSettings & settings,
const MarkRanges & all_mark_ranges_,
const MergeTreeReaderSettings & settings_,
MarkCache * mark_cache_,
UncompressedCache * uncompressed_cache, size_t file_size_,
UncompressedCache * uncompressed_cache_, size_t file_size_,
const MergeTreeIndexGranularityInfo * index_granularity_info_,
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type,
const ReadBufferFromFileBase::ProfileCallback & profile_callback_, clockid_t clock_type_,
bool is_low_cardinality_dictionary_)
: disk(std::move(disk_))
: settings(settings_)
, profile_callback(profile_callback_)
, clock_type(clock_type_)
, all_mark_ranges(all_mark_ranges_)
, file_size(file_size_)
, uncompressed_cache(uncompressed_cache_)
, disk(std::move(disk_))
, path_prefix(path_prefix_)
, data_file_extension(data_file_extension_)
, is_low_cardinality_dictionary(is_low_cardinality_dictionary_)
, marks_count(marks_count_)
, file_size(file_size_)
, mark_cache(mark_cache_)
, save_marks_in_cache(settings.save_marks_in_cache)
, index_granularity_info(index_granularity_info_)
, marks_loader(disk, mark_cache, index_granularity_info->getMarksFilePath(path_prefix),
marks_count, *index_granularity_info, save_marks_in_cache)
marks_count, *index_granularity_info, save_marks_in_cache) {}
void MergeTreeReaderStream::init()
{
if (initialized)
return;
initialized = true;
/// Compute the size of the buffer.
size_t max_mark_range_bytes = 0;
size_t sum_mark_range_bytes = 0;
@ -193,6 +203,7 @@ size_t MergeTreeReaderStream::getRightOffset(size_t right_mark_non_included)
void MergeTreeReaderStream::seekToMark(size_t index)
{
init();
MarkInCompressedFile mark = marks_loader.getMark(index);
try
@ -215,6 +226,7 @@ void MergeTreeReaderStream::seekToMark(size_t index)
void MergeTreeReaderStream::seekToStart()
{
init();
try
{
compressed_data_buffer->seek(0, 0);
@ -237,6 +249,7 @@ void MergeTreeReaderStream::adjustRightMark(size_t right_mark)
* read from stream, but we must update last_right_offset only if it is bigger than
* the last one to avoid redundantly cancelling prefetches.
*/
init();
auto right_offset = getRightOffset(right_mark);
if (!right_offset)
{
@ -256,4 +269,16 @@ void MergeTreeReaderStream::adjustRightMark(size_t right_mark)
}
}
ReadBuffer * MergeTreeReaderStream::getDataBuffer()
{
init();
return data_buffer;
}
CompressedReadBufferBase * MergeTreeReaderStream::getCompressedDataBuffer()
{
init();
return compressed_data_buffer;
}
}

View File

@ -1,4 +1,5 @@
#pragma once
#include <tuple>
#include <Storages/MarkCache.h>
#include <Storages/MergeTree/MarkRange.h>
#include <Storages/MergeTree/MergeTreeData.h>
@ -37,12 +38,20 @@ public:
*/
void adjustRightMark(size_t right_mark);
ReadBuffer * data_buffer;
CompressedReadBufferBase * compressed_data_buffer;
ReadBuffer * getDataBuffer();
CompressedReadBufferBase * getCompressedDataBuffer();
private:
void init();
size_t getRightOffset(size_t right_mark_non_included);
const MergeTreeReaderSettings settings;
const ReadBufferFromFileBase::ProfileCallback profile_callback;
clockid_t clock_type;
const MarkRanges all_mark_ranges;
size_t file_size;
UncompressedCache * uncompressed_cache;
DiskPtr disk;
std::string path_prefix;
std::string data_file_extension;
@ -50,10 +59,13 @@ private:
bool is_low_cardinality_dictionary = false;
size_t marks_count;
size_t file_size;
ReadBuffer * data_buffer;
CompressedReadBufferBase * compressed_data_buffer;
MarkCache * mark_cache;
bool save_marks_in_cache;
bool initialized = false;
std::optional<size_t> last_right_offset;

View File

@ -221,7 +221,7 @@ static ReadBuffer * getStream(
else if (seek_to_mark)
stream.seekToMark(from_mark);
return stream.data_buffer;
return stream.getDataBuffer();
}
void MergeTreeReaderWide::deserializePrefix(

View File

@ -0,0 +1 @@
2

View File

@ -0,0 +1,34 @@
#!/usr/bin/env bash
set -eo pipefail
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
QUERY_ID=$(${CLICKHOUSE_CLIENT} -q "select lower(hex(reverse(reinterpretAsString(generateUUIDv4()))))")
${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS lazy_mark_test;"
${CLICKHOUSE_CLIENT} <<EOF
CREATE TABLE lazy_mark_test
(
n0 UInt64,
n1 UInt64,
n2 UInt64,
n3 UInt64,
n4 UInt64,
n5 UInt64,
n6 UInt64,
n7 UInt64,
n8 UInt64,
n9 UInt64
)
ENGINE = MergeTree
ORDER BY n0 SETTINGS min_bytes_for_wide_part = 0;
EOF
${CLICKHOUSE_CLIENT} -q "INSERT INTO lazy_mark_test select number, number % 3, number % 5, number % 10, number % 13, number % 15, number % 17, number % 18, number % 22, number % 25 from numbers(1000000)"
${CLICKHOUSE_CLIENT} -q "SYSTEM DROP MARK CACHE"
${CLICKHOUSE_CLIENT} --log_queries=1 --query_id "${QUERY_ID}" -q "SELECT * FROM lazy_mark_test WHERE n3==11"
${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
${CLICKHOUSE_CLIENT} -q "select ProfileEvents['FileOpen'] from system.query_log where query_id = '${QUERY_ID}' and type = 'QueryFinish' and current_database = currentDatabase()"