diff --git a/src/Storages/MergeTree/MergeTreeIndexReader.cpp b/src/Storages/MergeTree/MergeTreeIndexReader.cpp index 7d7024a8ac2..88fbc8c2488 100644 --- a/src/Storages/MergeTree/MergeTreeIndexReader.cpp +++ b/src/Storages/MergeTree/MergeTreeIndexReader.cpp @@ -1,5 +1,6 @@ #include #include +#include namespace { @@ -20,7 +21,7 @@ std::unique_ptr makeIndexReader( auto * load_marks_threadpool = settings.read_settings.load_marks_asynchronously ? &context->getLoadMarksThreadpool() : nullptr; return std::make_unique( - part->getDataPartStoragePtr(), + std::make_shared(part), index->getFileName(), extension, marks_count, all_mark_ranges, std::move(settings), mark_cache, uncompressed_cache, diff --git a/src/Storages/MergeTree/MergeTreeMarksLoader.cpp b/src/Storages/MergeTree/MergeTreeMarksLoader.cpp index c6bb021e80f..9a5576f0ad2 100644 --- a/src/Storages/MergeTree/MergeTreeMarksLoader.cpp +++ b/src/Storages/MergeTree/MergeTreeMarksLoader.cpp @@ -30,7 +30,7 @@ namespace ErrorCodes } MergeTreeMarksLoader::MergeTreeMarksLoader( - DataPartStoragePtr data_part_storage_, + MergeTreeDataPartInfoForReaderPtr data_part_reader_, MarkCache * mark_cache_, const String & mrk_path_, size_t marks_count_, @@ -39,7 +39,7 @@ MergeTreeMarksLoader::MergeTreeMarksLoader( const ReadSettings & read_settings_, ThreadPool * load_marks_threadpool_, size_t columns_in_mark_) - : data_part_storage(std::move(data_part_storage_)) + : data_part_reader(data_part_reader_) , mark_cache(mark_cache_) , mrk_path(mrk_path_) , marks_count(marks_count_) @@ -98,6 +98,8 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl() /// Memory for marks must not be accounted as memory usage for query, because they are stored in shared cache. MemoryTrackerBlockerInThread temporarily_disable_memory_tracker; + auto data_part_storage = data_part_reader->getDataPartStorage(); + size_t file_size = data_part_storage->getFileSize(mrk_path); size_t mark_size = index_granularity_info.getMarkSizeInBytes(columns_in_mark); size_t expected_uncompressed_size = mark_size * marks_count; @@ -177,6 +179,8 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarks() { MarkCache::MappedPtr loaded_marks; + auto data_part_storage = data_part_reader->getDataPartStorage(); + if (mark_cache) { auto key = mark_cache->hash(fs::path(data_part_storage->getFullPath()) / mrk_path); diff --git a/src/Storages/MergeTree/MergeTreeMarksLoader.h b/src/Storages/MergeTree/MergeTreeMarksLoader.h index 17e52939d3f..0889da0cb85 100644 --- a/src/Storages/MergeTree/MergeTreeMarksLoader.h +++ b/src/Storages/MergeTree/MergeTreeMarksLoader.h @@ -1,9 +1,9 @@ #pragma once -#include #include #include #include +#include namespace DB @@ -18,7 +18,7 @@ public: using MarksPtr = MarkCache::MappedPtr; MergeTreeMarksLoader( - DataPartStoragePtr data_part_storage_, + MergeTreeDataPartInfoForReaderPtr data_part_reader_, MarkCache * mark_cache_, const String & mrk_path, size_t marks_count_, @@ -33,7 +33,7 @@ public: MarkInCompressedFile getMark(size_t row_index, size_t column_index = 0); private: - DataPartStoragePtr data_part_storage; + MergeTreeDataPartInfoForReaderPtr data_part_reader; MarkCache * mark_cache = nullptr; String mrk_path; size_t marks_count; diff --git a/src/Storages/MergeTree/MergeTreeReaderCompact.cpp b/src/Storages/MergeTree/MergeTreeReaderCompact.cpp index d1796dac6cc..26a7cb2b50b 100644 --- a/src/Storages/MergeTree/MergeTreeReaderCompact.cpp +++ b/src/Storages/MergeTree/MergeTreeReaderCompact.cpp @@ -36,7 +36,7 @@ MergeTreeReaderCompact::MergeTreeReaderCompact( settings_, avg_value_size_hints_) , marks_loader( - data_part_info_for_read_->getDataPartStorage(), + data_part_info_for_read_, mark_cache, data_part_info_for_read_->getIndexGranularityInfo().getMarksFilePath(MergeTreeDataPartCompact::DATA_FILE_NAME), data_part_info_for_read_->getMarksCount(), diff --git a/src/Storages/MergeTree/MergeTreeReaderStream.cpp b/src/Storages/MergeTree/MergeTreeReaderStream.cpp index cdca5aa1247..6d80dc5522c 100644 --- a/src/Storages/MergeTree/MergeTreeReaderStream.cpp +++ b/src/Storages/MergeTree/MergeTreeReaderStream.cpp @@ -15,7 +15,7 @@ namespace ErrorCodes } MergeTreeReaderStream::MergeTreeReaderStream( - DataPartStoragePtr data_part_storage_, + MergeTreeDataPartInfoForReaderPtr data_part_reader_, const String & path_prefix_, const String & data_file_extension_, size_t marks_count_, @@ -35,7 +35,7 @@ MergeTreeReaderStream::MergeTreeReaderStream( , all_mark_ranges(all_mark_ranges_) , file_size(file_size_) , uncompressed_cache(uncompressed_cache_) - , data_part_storage(std::move(data_part_storage_)) + , data_part_storage(data_part_reader_->getDataPartStorage()) , path_prefix(path_prefix_) , data_file_extension(data_file_extension_) , is_low_cardinality_dictionary(is_low_cardinality_dictionary_) @@ -44,7 +44,7 @@ MergeTreeReaderStream::MergeTreeReaderStream( , save_marks_in_cache(settings.save_marks_in_cache) , index_granularity_info(index_granularity_info_) , marks_loader( - data_part_storage, + data_part_reader_, mark_cache, index_granularity_info->getMarksFilePath(path_prefix), marks_count, diff --git a/src/Storages/MergeTree/MergeTreeReaderStream.h b/src/Storages/MergeTree/MergeTreeReaderStream.h index f3785e175df..baf8ec713f9 100644 --- a/src/Storages/MergeTree/MergeTreeReaderStream.h +++ b/src/Storages/MergeTree/MergeTreeReaderStream.h @@ -9,6 +9,7 @@ #include #include #include +#include namespace DB @@ -19,7 +20,7 @@ class MergeTreeReaderStream { public: MergeTreeReaderStream( - DataPartStoragePtr data_part_storage_, + MergeTreeDataPartInfoForReaderPtr data_part_reader_, const String & path_prefix_, const String & data_file_extension_, size_t marks_count_, diff --git a/src/Storages/MergeTree/MergeTreeReaderWide.cpp b/src/Storages/MergeTree/MergeTreeReaderWide.cpp index 05af33da20a..69617fdf9e3 100644 --- a/src/Storages/MergeTree/MergeTreeReaderWide.cpp +++ b/src/Storages/MergeTree/MergeTreeReaderWide.cpp @@ -242,7 +242,7 @@ void MergeTreeReaderWide::addStreams( auto * load_marks_threadpool = settings.read_settings.load_marks_asynchronously ? &context->getLoadMarksThreadpool() : nullptr; streams.emplace(stream_name, std::make_unique( - data_part_info_for_read->getDataPartStorage(), stream_name, DATA_FILE_EXTENSION, + data_part_info_for_read, stream_name, DATA_FILE_EXTENSION, data_part_info_for_read->getMarksCount(), all_mark_ranges, settings, mark_cache, uncompressed_cache, data_part_info_for_read->getFileSizeOrZero(stream_name + DATA_FILE_EXTENSION), &data_part_info_for_read->getIndexGranularityInfo(), diff --git a/tests/queries/0_stateless/00993_system_parts_race_condition_drop_zookeeper.sh b/tests/queries/0_stateless/00993_system_parts_race_condition_drop_zookeeper.sh index bceda77c7f8..f4f38ad9c83 100755 --- a/tests/queries/0_stateless/00993_system_parts_race_condition_drop_zookeeper.sh +++ b/tests/queries/0_stateless/00993_system_parts_race_condition_drop_zookeeper.sh @@ -63,7 +63,6 @@ function thread6() done } - # https://stackoverflow.com/questions/9954794/execute-a-shell-function-with-timeout export -f thread1; export -f thread2;