diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeReadPool.h b/dbms/include/DB/Storages/MergeTree/MergeTreeReadPool.h index 1759029ef3f..d4ea74c7a4d 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeReadPool.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeReadPool.h @@ -20,14 +20,16 @@ struct MergeTreeReadTask const NamesAndTypesList & columns; const NamesAndTypesList & pre_columns; const bool remove_prewhere_column; + const MarkRanges & all_ranges; MergeTreeReadTask(const MergeTreeData::DataPartPtr & data_part, const MarkRanges & ranges, const std::size_t part_index_in_query, const Names & ordered_names, const NameSet & column_name_set, const NamesAndTypesList & columns, - const NamesAndTypesList & pre_columns, const bool remove_prewhere_column) + const NamesAndTypesList & pre_columns, const bool remove_prewhere_column, + const MarkRanges & all_ranges) : data_part{data_part}, mark_ranges{ranges}, part_index_in_query{part_index_in_query}, ordered_names{ordered_names}, column_name_set{column_name_set}, columns{columns}, pre_columns{pre_columns}, - remove_prewhere_column{remove_prewhere_column} + remove_prewhere_column{remove_prewhere_column}, all_ranges{all_ranges} {} }; @@ -115,7 +117,7 @@ public: return std::make_unique( part.data_part, ranges_to_get_from_part, part.part_index_in_query, ordered_names, column_name_set, columns, - pre_columns, remove_prewhere_column); + pre_columns, remove_prewhere_column, per_part_all_ranges[part_id]); } public: @@ -124,6 +126,8 @@ public: { for (const auto & part : parts) { + per_part_all_ranges.push_back(part.ranges); + per_part_columns_lock.push_back(std::make_unique( part.data_part->columns_lock)); @@ -238,6 +242,7 @@ public: std::vector> per_part_columns_lock; RangesInDataParts parts; + std::vector per_part_all_ranges; std::vector per_part_sum_marks; std::size_t sum_marks; MergeTreeData & data; diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeReader.h b/dbms/include/DB/Storages/MergeTree/MergeTreeReader.h index c55e67412f1..64aeb1076d5 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeReader.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeReader.h @@ -203,13 +203,15 @@ public: fillMissingColumnsImpl(res, ordered_names, true); } + const MergeTreeData::DataPartPtr & getDataPart() const { return data_part; } + private: struct Stream { MarkCache::MappedPtr marks; ReadBuffer * data_buffer; - Poco::SharedPtr cached_buffer; - Poco::SharedPtr non_cached_buffer; + std::unique_ptr cached_buffer; + std::unique_ptr non_cached_buffer; std::string path_prefix; size_t max_mark_range; @@ -269,15 +271,15 @@ private: if (uncompressed_cache) { - cached_buffer = new CachedCompressedReadBuffer(path_prefix + ".bin", uncompressed_cache, - estimated_size, aio_threshold, buffer_size); - data_buffer = &*cached_buffer; + cached_buffer = std::make_unique( + path_prefix + ".bin", uncompressed_cache, estimated_size, aio_threshold, buffer_size); + data_buffer = cached_buffer.get(); } else { - non_cached_buffer = new CompressedReadBufferFromFile(path_prefix + ".bin", estimated_size, - aio_threshold, buffer_size); - data_buffer = &*non_cached_buffer; + non_cached_buffer = std::make_unique( + path_prefix + ".bin", estimated_size, aio_threshold, buffer_size); + data_buffer = non_cached_buffer.get(); } } diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeThreadBlockInputStream.h b/dbms/include/DB/Storages/MergeTree/MergeTreeThreadBlockInputStream.h index 6cc887ebf0c..a5330cfaf46 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeThreadBlockInputStream.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeThreadBlockInputStream.h @@ -77,16 +77,18 @@ protected: const auto path = storage.getFullPath() + task->data_part->name + '/'; - reader = std::make_unique( - path, task->data_part, task->columns, owned_uncompressed_cache.get(), owned_mark_cache.get(), - storage, - task->mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size); + if (!reader || reader->getDataPart() != task->data_part) + { + reader = std::make_unique( + path, task->data_part, task->columns, owned_uncompressed_cache.get(), owned_mark_cache.get(), + storage, task->all_ranges, min_bytes_to_use_direct_io, max_read_buffer_size); - if (prewhere_actions) - pre_reader = std::make_unique( - path, task->data_part, task->pre_columns, owned_uncompressed_cache.get(), - owned_mark_cache.get(), - storage, task->mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size); + if (prewhere_actions) + pre_reader = std::make_unique( + path, task->data_part, task->pre_columns, owned_uncompressed_cache.get(), + owned_mark_cache.get(), storage, task->all_ranges, min_bytes_to_use_direct_io, + max_read_buffer_size); + } } res = readFromPart();