diff --git a/src/Interpreters/Aggregator.cpp b/src/Interpreters/Aggregator.cpp index 6beb767c603..db047c0fc54 100644 --- a/src/Interpreters/Aggregator.cpp +++ b/src/Interpreters/Aggregator.cpp @@ -1928,8 +1928,6 @@ bool Aggregator::mergeBlock(Block block, AggregatedDataVariants & result, bool & else if (result.type != AggregatedDataVariants::Type::without_key) throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT); - - size_t result_size = result.sizeWithoutOverflowRow(); Int64 current_memory_usage = 0; if (auto * memory_tracker_child = CurrentThread::getMemoryTracker()) diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index aaeabbefc4d..0a297a9db1d 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -396,7 +396,7 @@ InterpreterSelectQuery::InterpreterSelectQuery( } // TODO Check if we can have prewhere work for projections, also need to allow it in TreeRewriter - if (try_move_to_prewhere && storage && query.where() && !query.prewhere() && !query.final()) + if (try_move_to_prewhere && storage && query.where() && !query.prewhere()) { /// PREWHERE optimization: transfer some condition from WHERE to PREWHERE if enabled and viable if (const auto & column_sizes = storage->getColumnSizes(); !column_sizes.empty()) @@ -590,7 +590,7 @@ Block InterpreterSelectQuery::getSampleBlockImpl() from_stage = storage->getQueryProcessingStage(context, options.to_stage, metadata_snapshot, query_info); /// XXX Used for IN set index analysis. Is this a proper way? - metadata_snapshot->selected_projection = query_info.aggregate_projection; + metadata_snapshot->selected_projection = query_info.projection; } /// Do I need to perform the first part of the pipeline? @@ -1822,7 +1822,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc /// Create optimizer with prepared actions. 
/// Maybe we will need to calc input_order_info later, e.g. while reading from StorageMerge. - if ((analysis_result.optimize_read_in_order || analysis_result.optimize_aggregation_in_order) && !query_info.aggregate_projection) + if ((analysis_result.optimize_read_in_order || analysis_result.optimize_aggregation_in_order) && !query_info.projection) { if (analysis_result.optimize_read_in_order) query_info.order_optimizer = std::make_shared( @@ -1864,7 +1864,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc backQuoteIfNeed(local_storage_id.getDatabaseName()), local_storage_id.getFullTableName(), required_columns, - query_info.aggregate_projection ? query_info.aggregate_projection->name : ""); + query_info.projection ? query_info.projection->name : ""); } /// Create step which reads from empty source if storage has no data. @@ -1988,7 +1988,7 @@ void InterpreterSelectQuery::executeMergeAggregated(QueryPlan & query_plan, bool /// It is already added by storage (because of performance issues). /// TODO: We should probably add another one processing stage for storage? /// WithMergeableStateAfterAggregation is not ok because, e.g., it skips sorting after aggregation. 
- if (query_info.aggregate_projection) + if (query_info.projection && query_info.projection->type == "aggregate") return; const auto & header_before_merge = query_plan.getCurrentDataStream().header; diff --git a/src/Interpreters/TreeRewriter.cpp b/src/Interpreters/TreeRewriter.cpp index b125d6e21e4..92cfba1bcb1 100644 --- a/src/Interpreters/TreeRewriter.cpp +++ b/src/Interpreters/TreeRewriter.cpp @@ -951,12 +951,6 @@ TreeRewriterResultPtr TreeRewriter::analyzeSelect( !select_query->sampleSize() && !select_query->sampleOffset() && !select_query->final() && (tables_with_columns.size() < 2 || isLeft(result.analyzed_join->kind())); - // TODO Perhaps we can support distinct or other group by variants for projections - result.can_use_projection = !result.optimize_trivial_count && settings.allow_experimental_projection_optimization - && !select_query->prewhere() && !select_query->sampleSize() && !select_query->sampleOffset() && !select_query->final() - && (tables_with_columns.size() < 2) && !select_query->distinct && !select_query->group_by_with_totals - && !select_query->group_by_with_rollup && !select_query->group_by_with_cube; - return std::make_shared(result); } diff --git a/src/Interpreters/TreeRewriter.h b/src/Interpreters/TreeRewriter.h index 223ba57fc8c..32826bcc61d 100644 --- a/src/Interpreters/TreeRewriter.h +++ b/src/Interpreters/TreeRewriter.h @@ -62,8 +62,6 @@ struct TreeRewriterResult /// instead of actual retrieving columns and counting rows. bool optimize_trivial_count = false; - bool can_use_projection = false; - /// Cache isRemote() call for storage, because it may be too heavy. 
bool is_remote_storage = false; diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index 57490061c37..7e856192a43 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -19,7 +19,7 @@ ReadFromMergeTree::ReadFromMergeTree( RangesInDataParts parts_, IndexStatPtr index_stats_, PrewhereInfoPtr prewhere_info_, - const ProjectionDescription * aggregate_projection_, + const ProjectionDescription * projection_, Names virt_column_names_, Settings settings_, size_t num_streams_, @@ -36,7 +36,7 @@ ReadFromMergeTree::ReadFromMergeTree( , parts(std::move(parts_)) , index_stats(std::move(index_stats_)) , prewhere_info(std::move(prewhere_info_)) - , aggregate_projection(aggregate_projection_) + , projection(projection_) , virt_column_names(std::move(virt_column_names_)) , settings(std::move(settings_)) , num_streams(num_streams_) @@ -79,7 +79,7 @@ Pipe ReadFromMergeTree::readFromPool() i, pool, settings.min_marks_for_concurrent_read, settings.max_block_size, settings.preferred_block_size_bytes, settings.preferred_max_column_in_block_size_bytes, storage, metadata_snapshot, settings.use_uncompressed_cache, - prewhere_info, aggregate_projection, settings.reader_settings, virt_column_names); + prewhere_info, projection, settings.reader_settings, virt_column_names); if (i == 0) { @@ -99,7 +99,7 @@ ProcessorPtr ReadFromMergeTree::createSource(const RangesInDataPart & part) return std::make_shared( storage, metadata_snapshot, part.data_part, settings.max_block_size, settings.preferred_block_size_bytes, settings.preferred_max_column_in_block_size_bytes, required_columns, part.ranges, settings.use_uncompressed_cache, - prewhere_info, aggregate_projection, true, settings.reader_settings, virt_column_names, part.part_index_in_query); + prewhere_info, projection, true, settings.reader_settings, virt_column_names, part.part_index_in_query); } Pipe 
ReadFromMergeTree::readInOrder() diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.h b/src/Processors/QueryPlan/ReadFromMergeTree.h index 14279945326..bc4f509c704 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.h +++ b/src/Processors/QueryPlan/ReadFromMergeTree.h @@ -74,7 +74,7 @@ public: RangesInDataParts parts_, IndexStatPtr index_stats_, PrewhereInfoPtr prewhere_info_, - const ProjectionDescription * aggregate_projection_, + const ProjectionDescription * projection_, Names virt_column_names_, Settings settings_, size_t num_streams_, @@ -100,7 +100,7 @@ private: RangesInDataParts parts; IndexStatPtr index_stats; PrewhereInfoPtr prewhere_info; - const ProjectionDescription * aggregate_projection; + const ProjectionDescription * projection; Names virt_column_names; Settings settings; diff --git a/src/Storages/IStorage.cpp b/src/Storages/IStorage.cpp index e82c80761d8..b93e0a30a15 100644 --- a/src/Storages/IStorage.cpp +++ b/src/Storages/IStorage.cpp @@ -105,7 +105,7 @@ void IStorage::read( auto pipe = read(column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams); if (pipe.empty()) { - auto header = (query_info.aggregate_projection ? query_info.aggregate_projection->metadata : metadata_snapshot) + auto header = (query_info.projection ? 
query_info.projection->metadata : metadata_snapshot) ->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID()); InterpreterSelectQuery::addEmptySourceToQueryPlan(query_plan, header, query_info, context); } diff --git a/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp b/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp index d05c0d56d82..66561c38749 100644 --- a/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp +++ b/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp @@ -26,7 +26,7 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor( const MergeTreeData & storage_, const StorageMetadataPtr & metadata_snapshot_, const PrewhereInfoPtr & prewhere_info_, - const ProjectionDescription * aggregate_projection_, + const ProjectionDescription * projection_, UInt64 max_block_size_rows_, UInt64 preferred_block_size_bytes_, UInt64 preferred_max_column_in_block_size_bytes_, @@ -37,7 +37,7 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor( , storage(storage_) , metadata_snapshot(metadata_snapshot_) , prewhere_info(prewhere_info_) - , aggregate_projection(aggregate_projection_) + , projection(projection_) , max_block_size_rows(max_block_size_rows_) , preferred_block_size_bytes(preferred_block_size_bytes_) , preferred_max_column_in_block_size_bytes(preferred_max_column_in_block_size_bytes_) @@ -66,7 +66,7 @@ Chunk MergeTreeBaseSelectProcessor::generate() if (res.hasRows()) { injectVirtualColumns(res, task.get(), partition_value_type, virt_column_names); - if (aggregate_projection) + if (projection) { auto info = std::make_shared(); info->bucket_num = -1; diff --git a/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.h b/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.h index 99d16efd5c1..751e0ca2ad9 100644 --- a/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.h +++ b/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.h @@ -24,7 +24,7 @@ public: const MergeTreeData & storage_, const StorageMetadataPtr & 
metadata_snapshot_, const PrewhereInfoPtr & prewhere_info_, - const ProjectionDescription * aggregate_projection_, + const ProjectionDescription * projection_, UInt64 max_block_size_rows_, UInt64 preferred_block_size_bytes_, UInt64 preferred_max_column_in_block_size_bytes_, @@ -62,7 +62,7 @@ protected: StorageMetadataPtr metadata_snapshot; PrewhereInfoPtr prewhere_info; - const ProjectionDescription * aggregate_projection; + const ProjectionDescription * projection; UInt64 max_block_size_rows; UInt64 preferred_block_size_bytes; diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 5dfbffc4b38..61891575e29 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -16,9 +16,7 @@ #include #include #include -#include #include -#include #include #include #include @@ -3816,12 +3814,26 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection( InterpreterSelectQuery select( query_ptr, query_context, SelectQueryOptions{QueryProcessingStage::WithMergeableState}.ignoreProjections().ignoreAlias()); const auto & analysis_result = select.getAnalysisResult(); - /// If first staging query pipeline is more complex than Aggregating - Expression - Filter - ReadFromStorage, return false + + bool can_use_aggregate_projection = true; + /// If the first stage of the query pipeline is more complex than Aggregating - Expression - Filter - ReadFromStorage, + /// we cannot use aggregate projection. 
if (analysis_result.join != nullptr || analysis_result.array_join != nullptr) - return false; + can_use_aggregate_projection = false; auto query_block = select.getSampleBlock(); - auto required_query = select.getQuery(); + const auto & required_query = select.getQuery()->as(); + auto required_predicates = [&required_query]() -> ASTPtr + { + if (required_query.prewhere() && required_query.where()) + return makeASTFunction("and", required_query.prewhere()->clone(), required_query.where()->clone()); + else if (required_query.prewhere()) + return required_query.prewhere()->clone(); + else if (required_query.where()) + return required_query.where()->clone(); + else + return nullptr; + }(); /// Check if all needed columns can be provided by some aggregate projection. Here we also try /// to find expression matches. For example, suppose an aggregate projection contains a column @@ -3833,11 +3845,18 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection( /// The ownership of ProjectionDescription is hold in metadata_snapshot which lives along with /// InterpreterSelect, thus we can store the raw pointer here. 
- std::vector> candidates; + struct ProjectionCandidate + { + const ProjectionDescription * desc; + ProjectionKeyActions key_actions; + Names required_columns; + NameSet required_columns_in_predicate; + }; + std::vector candidates; ParserFunction parse_function; for (const auto & projection : metadata_snapshot->projections) { - if (projection.type == "aggregate" && !analysis_result.need_aggregate) + if (projection.type == "aggregate" && (!analysis_result.need_aggregate || !can_use_aggregate_projection)) continue; if (projection.type == "normal" && !(analysis_result.hasWhere() || analysis_result.hasPrewhere())) @@ -3898,18 +3917,13 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection( } ProjectionCondition projection_condition(projection.column_names, required_column_names); - const auto & where = query_ptr->as().where(); - if (where && !projection_condition.check(where)) + if (required_predicates && !projection_condition.check(required_predicates)) continue; - candidates.push_back({&projection, std::move(key_actions)}); - // A candidate is found, setup needed info but only once. 
- if (query_info.projection_names.empty()) - { - query_info.projection_names = projection_condition.getRequiredColumns(); - query_info.projection_block = query_block; - query_info.aggregation_keys = select.getQueryAnalyzer()->aggregationKeys(); - query_info.aggregate_descriptions = select.getQueryAnalyzer()->aggregates(); - } + candidates.push_back( + {&projection, + std::move(key_actions), + projection_condition.getRequiredColumns(), + projection_condition.getRequiredColumnsInPredicate()}); } } @@ -3917,16 +3931,54 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection( if (!candidates.empty()) { size_t min_key_size = std::numeric_limits::max(); - for (auto & [candidate, key_actions] : candidates) + const ProjectionCandidate * selected_candidate = nullptr; + /// Favor aggregate projections + for (const auto & candidate : candidates) { // TODO We choose the projection with least key_size. Perhaps we can do better? (key rollups) - if (candidate->key_size < min_key_size) + if (candidate.desc->type == "aggregate" && candidate.desc->key_size < min_key_size) { - query_info.aggregate_projection = candidate; - query_info.key_actions = std::move(key_actions); - min_key_size = candidate->key_size; + selected_candidate = &candidate; + min_key_size = candidate.desc->key_size; } } + + /// Select the best normal projection if no aggregate projection is available + if (!selected_candidate) + { + size_t max_num_of_matched_key_columns = 0; + for (const auto & candidate : candidates) + { + NameSet column_names( + candidate.desc->metadata->sorting_key.column_names.begin(), candidate.desc->metadata->sorting_key.column_names.end()); + + size_t num_of_matched_key_columns = 0; + for (const auto & name : candidate.desc->metadata->sorting_key.column_names) + { + if (candidate.required_columns_in_predicate.find(name) != candidate.required_columns_in_predicate.end()) + ++num_of_matched_key_columns; + } + + /// Select the normal projection that has the most matched key columns 
in predicate + /// TODO What's the best strategy here? + if (num_of_matched_key_columns > max_num_of_matched_key_columns) + { + selected_candidate = &candidate; + max_num_of_matched_key_columns = num_of_matched_key_columns; + } + } + } + + if (!selected_candidate) + return false; + + query_info.projection = selected_candidate->desc; + query_info.key_actions = std::move(selected_candidate->key_actions); + query_info.projection_names = std::move(selected_candidate->required_columns); + query_info.projection_block = query_block; + query_info.aggregation_keys = select.getQueryAnalyzer()->aggregationKeys(); + query_info.aggregate_descriptions = select.getQueryAnalyzer()->aggregates(); + return true; } return false; } @@ -3943,7 +3995,7 @@ QueryProcessingStage::Enum MergeTreeData::getQueryProcessingStage( { if (getQueryProcessingStageWithAggregateProjection(query_context, metadata_snapshot, query_info)) { - if (query_info.aggregate_projection->type == "aggregate") + if (query_info.projection->type == "aggregate") return QueryProcessingStage::Enum::WithMergeableState; } } diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index edcfb4111b0..bce60697712 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -142,7 +142,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( const PartitionIdToMaxBlock * max_block_numbers_to_read) const { const auto & settings = context->getSettingsRef(); - if (query_info.aggregate_projection == nullptr) + if (query_info.projection == nullptr) { if (settings.allow_experimental_projection_optimization && settings.force_optimize_projection && !metadata_snapshot->projections.empty()) @@ -165,7 +165,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( size_t no_projection_granules = 0; size_t with_projection_granules = 0; -
if (query_info.aggregate_projection->type == "normal") + if (query_info.projection->type == "normal") plan_no_projections = readFromParts( data.getDataPartsVector(), column_names_to_return, @@ -177,7 +177,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( max_block_numbers_to_read, &no_projection_granules); - LOG_DEBUG(log, "Choose projection {}", query_info.aggregate_projection->name); + LOG_DEBUG(log, "Choose projection {}", query_info.projection->name); Pipes pipes; auto parts = data.getDataPartsVector(); @@ -187,7 +187,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( for (auto & part : parts) { const auto & projections = part->getProjectionParts(); - auto it = projections.find(query_info.aggregate_projection->name); + auto it = projections.find(query_info.projection->name); if (it != projections.end()) { projection_parts.push_back(it->second); @@ -210,45 +210,20 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( throw Exception( ErrorCodes::PROJECTION_NOT_USED, "No projection is used because there are more than 50% rows don't have projection {}. {} normal rows, {} projection rows", - query_info.aggregate_projection->name, + query_info.projection->name, rows_without_projection, rows_with_projection); - // std::cerr << "========== Normal parts " << normal_parts.size() << std::endl; - // std::cerr << "========== Projec parts " << projection_parts.size() << std::endl; - Pipe projection_pipe; Pipe ordinary_pipe; const auto & given_select = query_info.query->as(); if (!projection_parts.empty()) { - auto projection_names = query_info.projection_names; - - if (query_info.prewhere_info) - { - const auto prewhere_columns = - (query_info.prewhere_info->alias_actions ? - query_info.prewhere_info->alias_actions : - (query_info.prewhere_info->row_level_filter ? 
- query_info.prewhere_info->row_level_filter : - query_info.prewhere_info->prewhere_actions))->getRequiredColumns(); - - NameSet added_column(projection_names.begin(), projection_names.end()); - for (const auto & col : prewhere_columns) - { - if (added_column.count(col) == 0) - { - added_column.insert(col); - projection_names.push_back(col); - } - } - } - auto plan = readFromParts( std::move(projection_parts), - projection_names, // raw columns without key transformation - query_info.aggregate_projection->metadata, + query_info.projection_names, // raw columns without key transformation + query_info.projection->metadata, query_info, context, max_block_size, @@ -258,7 +233,8 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( true); if (plan) - projection_pipe = plan->convertToPipe(QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context)); + projection_pipe = plan->convertToPipe( + QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context)); if (!projection_pipe.empty()) { @@ -277,16 +253,15 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( auto syntax_result = TreeRewriter(context).analyze(expr, columns); auto expression = ExpressionAnalyzer(expr, syntax_result, context).getActions(false); - projection_pipe.addSimpleTransform([&expression](const Block & header) - { - return std::make_shared(header, expression); - }); + projection_pipe.addSimpleTransform( + [&expression](const Block & header) { return std::make_shared(header, expression); }); } /// In sample block we use just key columns if (given_select.where()) { - Block filter_block = projection_pipe.getHeader(); // we can use the previous pipeline's sample block here + // we can use the previous pipeline's sample block here + Block filter_block = projection_pipe.getHeader(); ASTPtr where = given_select.where()->clone(); ProjectionCondition projection_condition(filter_block.getNames(), {}); @@ -313,14 +288,16 @@ QueryPlanPtr 
MergeTreeDataSelectExecutor::read( if (!normal_parts.empty()) { auto storage_from_source_part = StorageFromMergeTreeDataPart::create(std::move(normal_parts)); - auto ast = query_info.aggregate_projection->query_ast->clone(); + auto ast = query_info.projection->query_ast->clone(); auto & select = ast->as(); if (given_select.where()) select.setExpression(ASTSelectQuery::Expression::WHERE, given_select.where()->clone()); + // After overriding the group by clause, we finish the possible aggregations directly if (processed_stage >= QueryProcessingStage::Enum::WithMergeableState && given_select.groupBy()) select.setExpression(ASTSelectQuery::Expression::GROUP_BY, given_select.groupBy()->clone()); - auto interpreter = InterpreterSelectQuery(ast, context, storage_from_source_part, nullptr, SelectQueryOptions{processed_stage}.ignoreAggregation()); + auto interpreter = InterpreterSelectQuery( + ast, context, storage_from_source_part, nullptr, SelectQueryOptions{processed_stage}.ignoreAggregation()); ordinary_pipe = QueryPipeline::getPipe(interpreter.execute().pipeline); with_projection_granules += storage_from_source_part->getNumGranulesFromLastRead(); @@ -328,19 +305,17 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( if (!ordinary_pipe.empty() && processed_stage < QueryProcessingStage::Enum::WithMergeableState) { // projection and set bucket number to -1 - ordinary_pipe.addSimpleTransform([&](const Block & header) - { - return std::make_shared(header, query_info.projection_block); - }); + ordinary_pipe.addSimpleTransform( + [&](const Block & header) { return std::make_shared(header, query_info.projection_block); }); } } /// Use normal projection only if we read less granules then without it. /// TODO: check if read-in-order optimization possible for normal projection. 
- if (query_info.aggregate_projection->type == "normal" && with_projection_granules > no_projection_granules) + if (query_info.projection->type == "normal" && with_projection_granules > no_projection_granules) return plan_no_projections; - if (query_info.aggregate_projection->type == "aggregate") + if (query_info.projection->type == "aggregate") { /// Here we create shared ManyAggregatedData for both projection and ordinary data. /// For ordinary data, AggregatedData is filled in a usual way. @@ -356,17 +331,14 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( auto many_data = std::make_shared(projection_pipe.numOutputPorts() + ordinary_pipe.numOutputPorts()); size_t counter = 0; - bool overflow_row = - given_select.group_by_with_totals && - settings.max_rows_to_group_by && - settings.group_by_overflow_mode == OverflowMode::ANY && - settings.totals_mode != TotalsMode::AFTER_HAVING_EXCLUSIVE; + bool overflow_row = given_select.group_by_with_totals && settings.max_rows_to_group_by + && settings.group_by_overflow_mode == OverflowMode::ANY && settings.totals_mode != TotalsMode::AFTER_HAVING_EXCLUSIVE; if (!projection_pipe.empty()) { const auto & header_before_merge = projection_pipe.getHeader(); - // std::cerr << "============ header before merge" << std::endl; - // std::cerr << header_before_merge.dumpStructure() << std::endl; + // std::cerr << "============ header before merge" << std::endl; + // std::cerr << header_before_merge.dumpStructure() << std::endl; ColumnNumbers keys; for (const auto & key : query_info.aggregation_keys) keys.push_back(header_before_merge.getPositionByName(key.name)); @@ -376,15 +348,20 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( AggregateDescriptions aggregates = query_info.aggregate_descriptions; /// Aggregator::Params params(header_before_merge, keys, query_info.aggregate_descriptions, overflow_row, settings.max_threads); - Aggregator::Params params(header_before_merge, keys, aggregates, - overflow_row, settings.max_rows_to_group_by,
settings.group_by_overflow_mode, - settings.group_by_two_level_threshold, - settings.group_by_two_level_threshold_bytes, - settings.max_bytes_before_external_group_by, - settings.empty_result_for_aggregation_by_empty_set, - context->getTemporaryVolume(), - settings.max_threads, - settings.min_free_disk_space_for_temporary_data); + Aggregator::Params params( + header_before_merge, + keys, + aggregates, + overflow_row, + settings.max_rows_to_group_by, + settings.group_by_overflow_mode, + settings.group_by_two_level_threshold, + settings.group_by_two_level_threshold_bytes, + settings.max_bytes_before_external_group_by, + settings.empty_result_for_aggregation_by_empty_set, + context->getTemporaryVolume(), + settings.max_threads, + settings.min_free_disk_space_for_temporary_data); auto transform_params = std::make_shared(std::move(params), /*final*/ true); /// This part is hacky. @@ -393,31 +370,32 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( /// /// It is needed because data in projection: /// * is not merged completely (we may have states with the same key in different parts) - /// * is not splitted into buckets (so if we just use MergingAggregated, it will use single thread) + /// * is not split into buckets (so if we just use MergingAggregated, it will use single thread) transform_params->only_merge = true; projection_pipe.resize(projection_pipe.numOutputPorts(), true, true); auto merge_threads = num_streams; auto temporary_data_merge_threads = settings.aggregation_memory_efficient_merge_threads - ? static_cast(settings.aggregation_memory_efficient_merge_threads) - : static_cast(settings.max_threads); + ? 
static_cast(settings.aggregation_memory_efficient_merge_threads) + : static_cast(settings.max_threads); projection_pipe.addSimpleTransform([&](const Block & header) { - return std::make_shared(header, transform_params, many_data, counter++, merge_threads, temporary_data_merge_threads); + return std::make_shared( + header, transform_params, many_data, counter++, merge_threads, temporary_data_merge_threads); }); - // std::cerr << "========== header after merge" << std::endl; - // std::cerr << projection_pipe.getHeader().dumpStructure() << std::endl; + // std::cerr << "========== header after merge" << std::endl; + // std::cerr << projection_pipe.getHeader().dumpStructure() << std::endl; } if (!ordinary_pipe.empty()) { const auto & header_before_aggregation = ordinary_pipe.getHeader(); - // std::cerr << "============ header before aggregation" << std::endl; - // std::cerr << header_before_aggregation.dumpStructure() << std::endl; + // std::cerr << "============ header before aggregation" << std::endl; + // std::cerr << header_before_aggregation.dumpStructure() << std::endl; ColumnNumbers keys; for (const auto & key : query_info.aggregation_keys) @@ -429,15 +407,20 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( for (const auto & name : descr.argument_names) descr.arguments.push_back(header_before_aggregation.getPositionByName(name)); - Aggregator::Params params(header_before_aggregation, keys, aggregates, - overflow_row, settings.max_rows_to_group_by, settings.group_by_overflow_mode, - settings.group_by_two_level_threshold, - settings.group_by_two_level_threshold_bytes, - settings.max_bytes_before_external_group_by, - settings.empty_result_for_aggregation_by_empty_set, - context->getTemporaryVolume(), - settings.max_threads, - settings.min_free_disk_space_for_temporary_data); + Aggregator::Params params( + header_before_aggregation, + keys, + aggregates, + overflow_row, + settings.max_rows_to_group_by, + settings.group_by_overflow_mode, + settings.group_by_two_level_threshold,
+ settings.group_by_two_level_threshold_bytes, + settings.max_bytes_before_external_group_by, + settings.empty_result_for_aggregation_by_empty_set, + context->getTemporaryVolume(), + settings.max_threads, + settings.min_free_disk_space_for_temporary_data); auto transform_params = std::make_shared(std::move(params), /*final*/ true); @@ -445,16 +428,17 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( auto merge_threads = num_streams; auto temporary_data_merge_threads = settings.aggregation_memory_efficient_merge_threads - ? static_cast(settings.aggregation_memory_efficient_merge_threads) - : static_cast(settings.max_threads); + ? static_cast(settings.aggregation_memory_efficient_merge_threads) + : static_cast(settings.max_threads); ordinary_pipe.addSimpleTransform([&](const Block & header) { - return std::make_shared(header, transform_params, many_data, counter++, merge_threads, temporary_data_merge_threads); + return std::make_shared( + header, transform_params, many_data, counter++, merge_threads, temporary_data_merge_threads); }); - // std::cerr << "============ header after aggregation" << std::endl; - // std::cerr << ordinary_pipe.getHeader().dumpStructure() << std::endl; + // std::cerr << "============ header after aggregation" << std::endl; + // std::cerr << ordinary_pipe.getHeader().dumpStructure() << std::endl; } } @@ -478,11 +462,11 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts( const UInt64 max_block_size, const unsigned num_streams, const PartitionIdToMaxBlock * max_block_numbers_to_read, - size_t * num_granules_are_to_read, + size_t * num_granules_to_read, bool use_projection_metadata) const { const StorageMetadataPtr & metadata_snapshot - = (query_info.aggregate_projection && use_projection_metadata) ? query_info.aggregate_projection->metadata : metadata_snapshot_base; + = (query_info.projection && use_projection_metadata) ?
query_info.projection->metadata : metadata_snapshot_base; /// If query contains restrictions on the virtual column `_part` or `_part_index`, select only parts suitable for it. /// The virtual column `_sample_factor` (which is equal to 1 / used sample rate) can be requested in the query. @@ -1140,8 +1124,8 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts( total_marks_pk.load(std::memory_order_relaxed), sum_marks, sum_ranges); - if (num_granules_are_to_read) - *num_granules_are_to_read = sum_marks_pk.load(std::memory_order_relaxed); + if (num_granules_to_read) + *num_granules_to_read = sum_marks_pk.load(std::memory_order_relaxed); if (parts_with_ranges.empty()) return std::make_unique(); @@ -1436,7 +1420,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams( std::move(parts), std::move(index_stats), query_info.prewhere_info, - query_info.aggregate_projection, + query_info.projection, virt_columns, step_settings, num_streams, @@ -1646,7 +1630,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder( std::move(new_parts), std::move(index_stats), query_info.prewhere_info, - query_info.aggregate_projection, + query_info.projection, virt_columns, step_settings, num_streams, @@ -1830,7 +1814,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( std::move(new_parts), std::move(index_stats), query_info.prewhere_info, - query_info.aggregate_projection, + query_info.projection, virt_columns, step_settings, num_streams, @@ -1922,7 +1906,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( std::move(lonely_parts), std::move(index_stats), query_info.prewhere_info, - query_info.aggregate_projection, + query_info.projection, virt_columns, step_settings, num_streams_for_lonely_parts, diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h index 7cbd4340c25..08225558d0d 100644 --- 
a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h @@ -46,7 +46,7 @@ public: UInt64 max_block_size, unsigned num_streams, const PartitionIdToMaxBlock * max_block_numbers_to_read = nullptr, - size_t * num_granules_are_to_read = nullptr, + size_t * num_granules_to_read = nullptr, bool use_projection_metadata = false) const; private: diff --git a/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.cpp b/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.cpp index 668f340d52a..ea2c00fe577 100644 --- a/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.cpp +++ b/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.cpp @@ -23,7 +23,7 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor( MarkRanges mark_ranges_, bool use_uncompressed_cache_, const PrewhereInfoPtr & prewhere_info_, - const ProjectionDescription * aggregate_projection_, + const ProjectionDescription * projection_, bool check_columns, const MergeTreeReaderSettings & reader_settings_, const Names & virt_column_names_, @@ -32,7 +32,7 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor( : MergeTreeBaseSelectProcessor{ metadata_snapshot_->getSampleBlockForColumns(required_columns_, storage_.getVirtuals(), storage_.getStorageID()), - storage_, metadata_snapshot_, prewhere_info_, aggregate_projection_, max_block_size_rows_, + storage_, metadata_snapshot_, prewhere_info_, projection_, max_block_size_rows_, preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, reader_settings_, use_uncompressed_cache_, virt_column_names_}, required_columns{std::move(required_columns_)}, diff --git a/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.h b/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.h index d34a6e65cbc..d8ae71f3cce 100644 --- a/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.h +++ b/src/Storages/MergeTree/MergeTreeReverseSelectProcessor.h @@ -27,7 +27,7 @@ public: MarkRanges 
mark_ranges, bool use_uncompressed_cache, const PrewhereInfoPtr & prewhere_info, - const ProjectionDescription * aggregate_projection_, + const ProjectionDescription * projection_, bool check_columns, const MergeTreeReaderSettings & reader_settings, const Names & virt_column_names = {}, diff --git a/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp b/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp index b912fefa4ab..33e75a21255 100644 --- a/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp +++ b/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp @@ -23,7 +23,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor( MarkRanges mark_ranges_, bool use_uncompressed_cache_, const PrewhereInfoPtr & prewhere_info_, - const ProjectionDescription * aggregate_projection_, + const ProjectionDescription * projection_, bool check_columns_, const MergeTreeReaderSettings & reader_settings_, const Names & virt_column_names_, @@ -32,7 +32,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor( : MergeTreeBaseSelectProcessor{ metadata_snapshot_->getSampleBlockForColumns(required_columns_, storage_.getVirtuals(), storage_.getStorageID()), - storage_, metadata_snapshot_, prewhere_info_, aggregate_projection_, max_block_size_rows_, + storage_, metadata_snapshot_, prewhere_info_, projection_, max_block_size_rows_, preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, reader_settings_, use_uncompressed_cache_, virt_column_names_}, required_columns{std::move(required_columns_)}, diff --git a/src/Storages/MergeTree/MergeTreeSelectProcessor.h b/src/Storages/MergeTree/MergeTreeSelectProcessor.h index 7488d4bda4c..82dd3ac6493 100644 --- a/src/Storages/MergeTree/MergeTreeSelectProcessor.h +++ b/src/Storages/MergeTree/MergeTreeSelectProcessor.h @@ -27,7 +27,7 @@ public: MarkRanges mark_ranges, bool use_uncompressed_cache, const PrewhereInfoPtr & prewhere_info, - const ProjectionDescription * aggregate_projection_, + const ProjectionDescription * projection_, bool 
check_columns, const MergeTreeReaderSettings & reader_settings, const Names & virt_column_names = {}, diff --git a/src/Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.cpp b/src/Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.cpp index 1ca77ff8207..1c922202740 100644 --- a/src/Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.cpp +++ b/src/Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.cpp @@ -19,12 +19,12 @@ MergeTreeThreadSelectBlockInputProcessor::MergeTreeThreadSelectBlockInputProcess const StorageMetadataPtr & metadata_snapshot_, const bool use_uncompressed_cache_, const PrewhereInfoPtr & prewhere_info_, - const ProjectionDescription * aggregate_projection_, + const ProjectionDescription * projection_, const MergeTreeReaderSettings & reader_settings_, const Names & virt_column_names_) : MergeTreeBaseSelectProcessor{ - pool_->getHeader(), storage_, metadata_snapshot_, prewhere_info_, aggregate_projection_, + pool_->getHeader(), storage_, metadata_snapshot_, prewhere_info_, projection_, max_block_size_rows_, preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, reader_settings_, use_uncompressed_cache_, virt_column_names_}, diff --git a/src/Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.h b/src/Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.h index 9075bb04226..18364308ae3 100644 --- a/src/Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.h +++ b/src/Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.h @@ -25,7 +25,7 @@ public: const StorageMetadataPtr & metadata_snapshot_, const bool use_uncompressed_cache_, const PrewhereInfoPtr & prewhere_info_, - const ProjectionDescription * aggregate_projection_, + const ProjectionDescription * projection_, const MergeTreeReaderSettings & reader_settings_, const Names & virt_column_names_); diff --git a/src/Storages/MergeTree/ProjectionCondition.cpp b/src/Storages/MergeTree/ProjectionCondition.cpp index 
4d7440838e2..0f36869584a 100644 --- a/src/Storages/MergeTree/ProjectionCondition.cpp +++ b/src/Storages/MergeTree/ProjectionCondition.cpp @@ -26,6 +26,7 @@ bool ProjectionCondition::check(const ASTPtr & node) if (key_columns.end() != it) { ++it->second; + required_columns_in_predicate.insert(name); return true; } else @@ -55,7 +56,7 @@ bool ProjectionCondition::check(const ASTPtr & node) return false; } - // TODO zhentianqi other special functions such as joinGet/dictGet + // TODO Need to check other special functions such as joinGet/dictGet auto name = node->getColumnNameWithoutAlias(); auto it = key_columns.find(name); if (key_columns.end() != it) diff --git a/src/Storages/MergeTree/ProjectionCondition.h b/src/Storages/MergeTree/ProjectionCondition.h index eeb7d0583c0..77d2a05c933 100644 --- a/src/Storages/MergeTree/ProjectionCondition.h +++ b/src/Storages/MergeTree/ProjectionCondition.h @@ -16,10 +16,13 @@ public: Names getRequiredColumns() const; + NameSet getRequiredColumnsInPredicate() const { return required_columns_in_predicate; } + void rewrite(ASTPtr & node) const; private: std::unordered_map<std::string, size_t> key_columns; + NameSet required_columns_in_predicate; }; } diff --git a/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h b/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h index cd094c47fc6..2ad2dde1fef 100644 --- a/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h +++ b/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h @@ -31,9 +31,17 @@ public: size_t max_block_size, unsigned num_streams) override { - QueryPlan query_plan = - std::move(*MergeTreeDataSelectExecutor(parts.front()->storage) - .readFromParts(parts, column_names, metadata_snapshot, query_info, context, max_block_size, num_streams, nullptr, &num_granules_from_last_read)); + QueryPlan query_plan = std::move(*MergeTreeDataSelectExecutor(parts.front()->storage) + .readFromParts( + parts, + column_names, + metadata_snapshot, + query_info, + context, + max_block_size, + num_streams, + 
nullptr, + &num_granules_from_last_read)); return query_plan.convertToPipe(QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context)); } diff --git a/src/Storages/SelectQueryInfo.h b/src/Storages/SelectQueryInfo.h index 8eaf185df39..922659d125a 100644 --- a/src/Storages/SelectQueryInfo.h +++ b/src/Storages/SelectQueryInfo.h @@ -147,8 +147,8 @@ struct SelectQueryInfo ClusterPtr getCluster() const { return !optimized_cluster ? cluster : optimized_cluster; } - /// If not null, it means we choose an aggregate projection to execute current query. - const ProjectionDescription * aggregate_projection{}; + /// If not null, it means we choose a projection to execute current query. + const ProjectionDescription * projection{}; ProjectionKeyActions key_actions; Names projection_names; Block projection_block; diff --git a/tests/queries/0_stateless/01604_explain_ast_of_nonselect_query.reference b/tests/queries/0_stateless/01604_explain_ast_of_nonselect_query.reference index 6ae67d7d9ad..8863d3b57c7 100644 --- a/tests/queries/0_stateless/01604_explain_ast_of_nonselect_query.reference +++ b/tests/queries/0_stateless/01604_explain_ast_of_nonselect_query.reference @@ -1,6 +1,6 @@ AlterQuery t1 (children 1) ExpressionList (children 1) - AlterCommand 27 (children 1) + AlterCommand 30 (children 1) Function equals (children 1) ExpressionList (children 2) Identifier date