diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index 4515db11067..0672d7ed040 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -43,7 +44,6 @@ struct ReadFromMergeTree::AnalysisResult { RangesInDataParts parts_with_ranges; MergeTreeDataSelectSamplingData sampling; - String query_id; IndexStats index_stats; Names column_names_to_read; ReadFromMergeTree::ReadType read_type = ReadFromMergeTree::ReadType::Default; @@ -830,14 +830,14 @@ ReadFromMergeTree::AnalysisResult ReadFromMergeTree::selectRangesToRead(MergeTre LOG_DEBUG(log, "Key condition: {}", key_condition.toString()); const auto & select = query_info.query->as(); - auto query_context = context->hasQueryContext() ? context->getQueryContext() : context; MergeTreeDataSelectExecutor::filterPartsByPartition( - metadata_snapshot_base, data, query_info, context, query_context, parts, part_values, max_block_numbers_to_read.get(), log, result.index_stats); + parts, part_values, metadata_snapshot_base, data, query_info, context, + max_block_numbers_to_read.get(), log, result.index_stats); result.sampling = MergeTreeDataSelectExecutor::getSampling( - select, parts, metadata_snapshot, key_condition, - data, log, sample_factor_column_queried, metadata_snapshot->getColumns().getAllPhysical(), context); + select, metadata_snapshot->getColumns().getAllPhysical(), parts, key_condition, + data, metadata_snapshot, context, sample_factor_column_queried, log); if (result.sampling.read_nothing) return result; @@ -885,8 +885,6 @@ ReadFromMergeTree::AnalysisResult ReadFromMergeTree::selectRangesToRead(MergeTre sum_marks, sum_ranges); - result.query_id = MergeTreeDataSelectExecutor::checkLimits(data, result.parts_with_ranges, context); - ProfileEvents::increment(ProfileEvents::SelectedParts, result.parts_with_ranges.size()); ProfileEvents::increment(ProfileEvents::SelectedRanges, sum_ranges); ProfileEvents::increment(ProfileEvents::SelectedMarks, sum_marks); @@ -905,6 +903,8 @@ ReadFromMergeTree::AnalysisResult ReadFromMergeTree::selectRangesToRead(MergeTre void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &) { auto result = selectRangesToRead(prepared_parts); + auto query_id_holder = MergeTreeDataSelectExecutor::checkLimits(data, result.parts_with_ranges, context); + if (result.parts_with_ranges.empty()) { pipeline.init(Pipe(std::make_shared(getOutputStream().header))); @@ -1048,8 +1048,8 @@ void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const Build processors.emplace_back(processor); // Attach QueryIdHolder if needed - if (!result.query_id.empty()) - pipe.addQueryIdHolder(std::make_shared(result.query_id, data)); + if (query_id_holder) + pipe.addQueryIdHolder(std::move(query_id_holder)); pipeline.init(std::move(pipe)); } diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.h b/src/Processors/QueryPlan/ReadFromMergeTree.h index 07d45a71e0a..6e1efffdb02 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.h +++ b/src/Processors/QueryPlan/ReadFromMergeTree.h @@ -1,15 +1,14 @@ #pragma once #include -#include #include -#include -//#include namespace DB { using PartitionIdToMaxBlock = std::unordered_map; +class Pipe; + /// This step is created to read from MergeTree* table. /// For now, it takes a list of parts and creates source from it. class ReadFromMergeTree final : public ISourceStep diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index c12759bac97..caf69b8ecfb 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -381,14 +381,14 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( MergeTreeDataSelectSamplingData MergeTreeDataSelectExecutor::getSampling( const ASTSelectQuery & select, + NamesAndTypesList available_real_columns, const MergeTreeData::DataPartsVector & parts, - const StorageMetadataPtr & metadata_snapshot, KeyCondition & key_condition, const MergeTreeData & data, - Poco::Logger * log, + const StorageMetadataPtr & metadata_snapshot, + ContextPtr context, bool sample_factor_column_queried, - NamesAndTypesList available_real_columns, - ContextPtr context) + Poco::Logger * log) { const Settings & settings = context->getSettingsRef(); /// Sampling. @@ -643,7 +643,7 @@ MergeTreeDataSelectSamplingData MergeTreeDataSelectExecutor::getSampling( std::optional> MergeTreeDataSelectExecutor::filterPartsByVirtualColumns( const MergeTreeData & data, - MergeTreeData::DataPartsVector & parts, + const MergeTreeData::DataPartsVector & parts, const ASTPtr & query, ContextPtr context) { @@ -666,13 +666,12 @@ std::optional> MergeTreeDataSelectExecutor::filterPar } void MergeTreeDataSelectExecutor::filterPartsByPartition( + MergeTreeData::DataPartsVector & parts, + const std::optional> & part_values, const StorageMetadataPtr & metadata_snapshot, const MergeTreeData & data, const SelectQueryInfo & query_info, const ContextPtr & context, - const ContextPtr & query_context, - MergeTreeData::DataPartsVector & parts, - const std::optional> & part_values, const PartitionIdToMaxBlock * max_block_numbers_to_read, Poco::Logger * log, ReadFromMergeTree::IndexStats & index_stats) @@ -709,6 +708,7 @@ void MergeTreeDataSelectExecutor::filterPartsByPartition( } } + auto query_context = context->hasQueryContext() ? context->getQueryContext() : context; PartFilterCounters part_filter_counters; if (query_context->getSettingsRef().allow_experimental_query_deduplication) selectPartsToReadWithUUIDFilter( @@ -766,7 +766,7 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd StorageMetadataPtr metadata_snapshot, const SelectQueryInfo & query_info, const ContextPtr & context, - KeyCondition & key_condition, + const KeyCondition & key_condition, const MergeTreeReaderSettings & reader_settings, Poco::Logger * log, size_t num_streams, @@ -992,7 +992,7 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd return parts_with_ranges; } -String MergeTreeDataSelectExecutor::checkLimits( +std::shared_ptr MergeTreeDataSelectExecutor::checkLimits( const MergeTreeData & data, const RangesInDataParts & parts_with_ranges, const ContextPtr & context) @@ -1032,7 +1032,10 @@ String MergeTreeDataSelectExecutor::checkLimits( } } - return query_id; + if (!query_id.empty()) + return std::make_shared(query_id, data); + + return nullptr; } static void selectColumnNames( @@ -1135,15 +1138,15 @@ size_t MergeTreeDataSelectExecutor::estimateNumMarksToRead( } const auto & select = query_info.query->as(); - auto query_context = context->hasQueryContext() ? context->getQueryContext() : context; ReadFromMergeTree::IndexStats index_stats; filterPartsByPartition( - metadata_snapshot_base, data, query_info, context, query_context, parts, part_values, max_block_numbers_to_read.get(), log, index_stats); + parts, part_values, metadata_snapshot_base, data, query_info, + context, max_block_numbers_to_read.get(), log, index_stats); auto sampling = MergeTreeDataSelectExecutor::getSampling( - select, parts, metadata_snapshot, key_condition, - data, log, sample_factor_column_queried, metadata_snapshot->getColumns().getAllPhysical(), context); + select, metadata_snapshot->getColumns().getAllPhysical(), parts, key_condition, + data, metadata_snapshot, context, sample_factor_column_queried, log); if (sampling.read_nothing) return 0; diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h index b43fb785573..bd2a79f0aee 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h @@ -45,16 +45,7 @@ public: QueryProcessingStage::Enum processed_stage, std::shared_ptr max_block_numbers_to_read = nullptr) const; - size_t estimateNumMarksToRead( - MergeTreeData::DataPartsVector parts, - const Names & column_names, - const StorageMetadataPtr & metadata_snapshot_base, - const StorageMetadataPtr & metadata_snapshot, - const SelectQueryInfo & query_info, - ContextPtr context, - unsigned num_streams, - std::shared_ptr max_block_numbers_to_read = nullptr) const; - + /// The same as read, but with specified set of parts. QueryPlanPtr readFromParts( MergeTreeData::DataPartsVector parts, const Names & column_names, @@ -66,6 +57,19 @@ public: unsigned num_streams, std::shared_ptr max_block_numbers_to_read = nullptr) const; + /// Get an estimation for the number of marks we are going to read. + /// Reads nothing. Secondary indexes are not used. + /// This method is used to select best projection for table. + size_t estimateNumMarksToRead( + MergeTreeData::DataPartsVector parts, + const Names & column_names, + const StorageMetadataPtr & metadata_snapshot_base, + const StorageMetadataPtr & metadata_snapshot, + const SelectQueryInfo & query_info, + ContextPtr context, + unsigned num_streams, + std::shared_ptr max_block_numbers_to_read = nullptr) const; + private: const MergeTreeData & data; Poco::Logger * log; @@ -131,12 +135,15 @@ private: Poco::Logger * log); public: + /// For given number rows and bytes, get the number of marks to read. + /// It is a minimal number of marks which contain so many rows and bytes. static size_t roundRowsOrBytesToMarks( size_t rows_setting, size_t bytes_setting, size_t rows_granularity, size_t bytes_granularity); + /// The same as roundRowsOrBytesToMarks, but return no more than max_marks. static size_t minMarksForConcurrentRead( size_t rows_setting, size_t bytes_setting, @@ -144,48 +151,58 @@ public: size_t bytes_granularity, size_t max_marks); + /// If possible, filter using expression on virtual columns. + /// Example: SELECT count() FROM table WHERE _part = 'part_name' + /// If expression found, return a set with allowed part names (std::nullopt otherwise). static std::optional> filterPartsByVirtualColumns( const MergeTreeData & data, - MergeTreeData::DataPartsVector & parts, + const MergeTreeData::DataPartsVector & parts, const ASTPtr & query, ContextPtr context); + /// Filter parts using minmax index and partition key. static void filterPartsByPartition( + MergeTreeData::DataPartsVector & parts, + const std::optional> & part_values, const StorageMetadataPtr & metadata_snapshot, const MergeTreeData & data, const SelectQueryInfo & query_info, const ContextPtr & context, - const ContextPtr & query_context, - MergeTreeData::DataPartsVector & parts, - const std::optional> & part_values, const PartitionIdToMaxBlock * max_block_numbers_to_read, Poco::Logger * log, ReadFromMergeTree::IndexStats & index_stats); + /// Filter parts using primary key and secondary indexes. + /// For every part, select mark ranges to read. static RangesInDataParts filterPartsByPrimaryKeyAndSkipIndexes( MergeTreeData::DataPartsVector && parts, StorageMetadataPtr metadata_snapshot, const SelectQueryInfo & query_info, const ContextPtr & context, - KeyCondition & key_condition, + const KeyCondition & key_condition, const MergeTreeReaderSettings & reader_settings, Poco::Logger * log, size_t num_streams, ReadFromMergeTree::IndexStats & index_stats, bool use_skip_indexes); + /// Create expression for sampling. + /// Also, calculate _sample_factor if needed. + /// Also, update key condition with selected sampling range. static MergeTreeDataSelectSamplingData getSampling( const ASTSelectQuery & select, + NamesAndTypesList available_real_columns, const MergeTreeData::DataPartsVector & parts, - const StorageMetadataPtr & metadata_snapshot, KeyCondition & key_condition, const MergeTreeData & data, - Poco::Logger * log, + const StorageMetadataPtr & metadata_snapshot, + ContextPtr context, bool sample_factor_column_queried, - NamesAndTypesList available_real_columns, - ContextPtr context); + Poco::Logger * log); - static String checkLimits( + /// Check query limits: max_partitions_to_read, max_concurrent_queries. + /// Also, return QueryIdHolder. If not null, we should keep it until query finishes. + static std::shared_ptr checkLimits( const MergeTreeData & data, const RangesInDataParts & parts_with_ranges, const ContextPtr & context);