Merge pull request #48515 from ClickHouse/Clear_marks_cache_for_outdated_parts

MergeTreeMarksLoader holds DataPart instead of DataPartStorage
This commit is contained in:
SmitaRKulkarni 2023-05-02 15:11:14 +02:00 committed by GitHub
commit 997210c782
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 18 additions and 13 deletions

View File

@ -1,5 +1,6 @@
#include <Storages/MergeTree/MergeTreeIndexReader.h>
#include <Interpreters/Context.h>
#include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
namespace
{
@ -20,7 +21,7 @@ std::unique_ptr<MergeTreeReaderStream> makeIndexReader(
auto * load_marks_threadpool = settings.read_settings.load_marks_asynchronously ? &context->getLoadMarksThreadpool() : nullptr;
return std::make_unique<MergeTreeReaderStream>(
part->getDataPartStoragePtr(),
std::make_shared<LoadedMergeTreeDataPartInfoForReader>(part),
index->getFileName(), extension, marks_count,
all_mark_ranges,
std::move(settings), mark_cache, uncompressed_cache,

View File

@ -30,7 +30,7 @@ namespace ErrorCodes
}
MergeTreeMarksLoader::MergeTreeMarksLoader(
DataPartStoragePtr data_part_storage_,
MergeTreeDataPartInfoForReaderPtr data_part_reader_,
MarkCache * mark_cache_,
const String & mrk_path_,
size_t marks_count_,
@ -39,7 +39,7 @@ MergeTreeMarksLoader::MergeTreeMarksLoader(
const ReadSettings & read_settings_,
ThreadPool * load_marks_threadpool_,
size_t columns_in_mark_)
: data_part_storage(std::move(data_part_storage_))
: data_part_reader(data_part_reader_)
, mark_cache(mark_cache_)
, mrk_path(mrk_path_)
, marks_count(marks_count_)
@ -98,6 +98,8 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
/// Memory for marks must not be accounted as memory usage for query, because they are stored in shared cache.
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker;
auto data_part_storage = data_part_reader->getDataPartStorage();
size_t file_size = data_part_storage->getFileSize(mrk_path);
size_t mark_size = index_granularity_info.getMarkSizeInBytes(columns_in_mark);
size_t expected_uncompressed_size = mark_size * marks_count;
@ -177,6 +179,8 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarks()
{
MarkCache::MappedPtr loaded_marks;
auto data_part_storage = data_part_reader->getDataPartStorage();
if (mark_cache)
{
auto key = mark_cache->hash(fs::path(data_part_storage->getFullPath()) / mrk_path);

View File

@ -1,9 +1,9 @@
#pragma once
#include <Storages/MergeTree/IDataPartStorage.h>
#include <Storages/MarkCache.h>
#include <IO/ReadSettings.h>
#include <Common/ThreadPool_fwd.h>
#include <Storages/MergeTree/IMergeTreeDataPartInfoForReader.h>
namespace DB
@ -18,7 +18,7 @@ public:
using MarksPtr = MarkCache::MappedPtr;
MergeTreeMarksLoader(
DataPartStoragePtr data_part_storage_,
MergeTreeDataPartInfoForReaderPtr data_part_reader_,
MarkCache * mark_cache_,
const String & mrk_path,
size_t marks_count_,
@ -33,7 +33,7 @@ public:
MarkInCompressedFile getMark(size_t row_index, size_t column_index = 0);
private:
DataPartStoragePtr data_part_storage;
MergeTreeDataPartInfoForReaderPtr data_part_reader;
MarkCache * mark_cache = nullptr;
String mrk_path;
size_t marks_count;

View File

@ -36,7 +36,7 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
settings_,
avg_value_size_hints_)
, marks_loader(
data_part_info_for_read_->getDataPartStorage(),
data_part_info_for_read_,
mark_cache,
data_part_info_for_read_->getIndexGranularityInfo().getMarksFilePath(MergeTreeDataPartCompact::DATA_FILE_NAME),
data_part_info_for_read_->getMarksCount(),

View File

@ -15,7 +15,7 @@ namespace ErrorCodes
}
MergeTreeReaderStream::MergeTreeReaderStream(
DataPartStoragePtr data_part_storage_,
MergeTreeDataPartInfoForReaderPtr data_part_reader_,
const String & path_prefix_,
const String & data_file_extension_,
size_t marks_count_,
@ -35,7 +35,7 @@ MergeTreeReaderStream::MergeTreeReaderStream(
, all_mark_ranges(all_mark_ranges_)
, file_size(file_size_)
, uncompressed_cache(uncompressed_cache_)
, data_part_storage(std::move(data_part_storage_))
, data_part_storage(data_part_reader_->getDataPartStorage())
, path_prefix(path_prefix_)
, data_file_extension(data_file_extension_)
, is_low_cardinality_dictionary(is_low_cardinality_dictionary_)
@ -44,7 +44,7 @@ MergeTreeReaderStream::MergeTreeReaderStream(
, save_marks_in_cache(settings.save_marks_in_cache)
, index_granularity_info(index_granularity_info_)
, marks_loader(
data_part_storage,
data_part_reader_,
mark_cache,
index_granularity_info->getMarksFilePath(path_prefix),
marks_count,

View File

@ -9,6 +9,7 @@
#include <Compression/CompressedReadBufferFromFile.h>
#include <Storages/MergeTree/MergeTreeIOSettings.h>
#include <Storages/MergeTree/MergeTreeMarksLoader.h>
#include <Storages/MergeTree/IMergeTreeDataPartInfoForReader.h>
namespace DB
@ -19,7 +20,7 @@ class MergeTreeReaderStream
{
public:
MergeTreeReaderStream(
DataPartStoragePtr data_part_storage_,
MergeTreeDataPartInfoForReaderPtr data_part_reader_,
const String & path_prefix_,
const String & data_file_extension_,
size_t marks_count_,

View File

@ -242,7 +242,7 @@ void MergeTreeReaderWide::addStreams(
auto * load_marks_threadpool = settings.read_settings.load_marks_asynchronously ? &context->getLoadMarksThreadpool() : nullptr;
streams.emplace(stream_name, std::make_unique<MergeTreeReaderStream>(
data_part_info_for_read->getDataPartStorage(), stream_name, DATA_FILE_EXTENSION,
data_part_info_for_read, stream_name, DATA_FILE_EXTENSION,
data_part_info_for_read->getMarksCount(), all_mark_ranges, settings, mark_cache,
uncompressed_cache, data_part_info_for_read->getFileSizeOrZero(stream_name + DATA_FILE_EXTENSION),
&data_part_info_for_read->getIndexGranularityInfo(),

View File

@ -63,7 +63,6 @@ function thread6()
done
}
# https://stackoverflow.com/questions/9954794/execute-a-shell-function-with-timeout
export -f thread1;
export -f thread2;