From b68857d08685d771acff539e8c5c19feefd681a4 Mon Sep 17 00:00:00 2001 From: Amos Bird Date: Sat, 28 Aug 2021 02:35:13 +0800 Subject: [PATCH] Simplify projection, add minmax_count projection. --- src/Storages/MergeTree/IMergeTreeDataPart.h | 1 - src/Storages/MergeTree/MergeTreeData.cpp | 176 +++++++++++++++--- src/Storages/MergeTree/MergeTreeData.h | 8 +- .../MergeTree/MergeTreeDataMergerMutator.cpp | 38 ++-- .../MergeTree/MergeTreeDataMergerMutator.h | 12 +- .../MergeTree/MergeTreeDataSelectExecutor.cpp | 61 +++--- .../MergeTree/MergeTreeProjections.cpp | 78 -------- src/Storages/MergeTree/MergeTreeProjections.h | 88 --------- .../MergeTree/MergedBlockOutputStream.h | 2 +- .../MergeTree/registerStorageMergeTree.cpp | 7 + src/Storages/ProjectionsDescription.cpp | 52 ++++++ src/Storages/ProjectionsDescription.h | 11 ++ src/Storages/SelectQueryInfo.h | 3 +- src/Storages/StorageInMemoryMetadata.cpp | 6 + src/Storages/StorageInMemoryMetadata.h | 3 + src/Storages/ya.make | 1 - .../01710_minmax_count_projection.reference | 4 + .../01710_minmax_count_projection.sql | 14 ++ 18 files changed, 314 insertions(+), 251 deletions(-) delete mode 100644 src/Storages/MergeTree/MergeTreeProjections.cpp delete mode 100644 src/Storages/MergeTree/MergeTreeProjections.h create mode 100644 tests/queries/0_stateless/01710_minmax_count_projection.reference create mode 100644 tests/queries/0_stateless/01710_minmax_count_projection.sql diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.h b/src/Storages/MergeTree/IMergeTreeDataPart.h index 8b7a15e5da0..f4493adf0b6 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.h +++ b/src/Storages/MergeTree/IMergeTreeDataPart.h @@ -7,7 +7,6 @@ #include #include #include -#include #include #include #include diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index bdb3471fb01..d907005da86 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -57,6 +57,7 @@ #include #include #include +#include #include #include @@ -447,8 +448,6 @@ void MergeTreeData::checkProperties( for (const auto & projection : new_metadata.projections) { - MergeTreeProjectionFactory::instance().validate(projection); - if (projections_names.find(projection.name) != projections_names.end()) throw Exception( "Projection with name " + backQuote(projection.name) + " already exists", @@ -768,7 +767,7 @@ Block MergeTreeData::getSampleBlockWithVirtualColumns() const } -Block MergeTreeData::getBlockWithVirtualPartColumns(const MergeTreeData::DataPartsVector & parts, bool one_part) const +Block MergeTreeData::getBlockWithVirtualPartColumns(const MergeTreeData::DataPartsVector & parts, bool one_part, bool ignore_empty) const { auto block = getSampleBlockWithVirtualColumns(); MutableColumns columns = block.mutateColumns(); @@ -781,6 +780,8 @@ Block MergeTreeData::getBlockWithVirtualPartColumns(const MergeTreeData::DataPar bool has_partition_value = typeid_cast(partition_value_column.get()); for (const auto & part_or_projection : parts) { + if (ignore_empty && part_or_projection->isEmpty()) + continue; const auto * part = part_or_projection->isProjectionPart() ? part_or_projection->getParentPart() : part_or_projection.get(); part_column->insert(part->name); partition_id_column->insert(part->info.partition_id); @@ -4143,6 +4144,110 @@ static void selectBestProjection( } +Block MergeTreeData::getMinMaxCountProjectionBlock( + const StorageMetadataPtr & metadata_snapshot, + const Names & required_columns, + const SelectQueryInfo & query_info, + ContextPtr query_context) const +{ + if (!metadata_snapshot->minmax_count_projection) + throw Exception( + "Cannot find the definition of minmax_count projection but it's used in current query. It's a bug", + ErrorCodes::LOGICAL_ERROR); + + auto block = metadata_snapshot->minmax_count_projection->sample_block; + auto minmax_count_columns = block.mutateColumns(); + + auto insert = [](ColumnAggregateFunction & column, const Field & value) + { + auto func = column.getAggregateFunction(); + Arena & arena = column.createOrGetArena(); + size_t size_of_state = func->sizeOfData(); + size_t align_of_state = func->alignOfData(); + auto * place = arena.alignedAlloc(size_of_state, align_of_state); + func->create(place); + auto value_column = func->getReturnType()->createColumnConst(1, value)->convertToFullColumnIfConst(); + const auto * value_column_ptr = value_column.get(); + func->add(place, &value_column_ptr, 0, &arena); + column.insertFrom(place); + }; + + auto parts = getDataPartsVector(); + ASTPtr expression_ast; + Block virtual_columns_block = getBlockWithVirtualPartColumns(parts, false /* one_part */, true /* ignore_empty */); + if (virtual_columns_block.rows() == 0) + return {}; + + // Generate valid expressions for filtering + VirtualColumnUtils::prepareFilterBlockWithQuery(query_info.query, query_context, virtual_columns_block, expression_ast); + if (expression_ast) + VirtualColumnUtils::filterBlockWithQuery(query_info.query, virtual_columns_block, query_context, expression_ast); + + size_t rows = virtual_columns_block.rows(); + const ColumnString & part_name_column = typeid_cast(*virtual_columns_block.getByName("_part").column); + size_t part_idx = 0; + for (size_t row = 0; row < rows; ++row) + { + while (parts[part_idx]->name != part_name_column.getDataAt(row)) + ++part_idx; + + const auto & part = parts[part_idx]; + + if (!part->minmax_idx.initialized) + throw Exception("Found a non-empty part with uninitialized minmax_idx. It's a bug", ErrorCodes::LOGICAL_ERROR); + + size_t minmax_idx_size = part->minmax_idx.hyperrectangle.size(); + if (2 * minmax_idx_size + 1 != minmax_count_columns.size()) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "minmax_count projection should have twice plus one the number of ranges in minmax_idx. 2 * minmax_idx_size + 1 = {}, " + "minmax_count_columns.size() = {}. It's a bug", + 2 * minmax_idx_size + 1, + minmax_count_columns.size()); + + for (size_t i = 0; i < minmax_idx_size; ++i) + { + size_t min_pos = i * 2; + size_t max_pos = i * 2 + 1; + auto & min_column = assert_cast(*minmax_count_columns[min_pos]); + auto & max_column = assert_cast(*minmax_count_columns[max_pos]); + const auto & range = part->minmax_idx.hyperrectangle[i]; + insert(min_column, range.left); + insert(max_column, range.right); + } + + { + auto & column = assert_cast(*minmax_count_columns.back()); + auto func = column.getAggregateFunction(); + Arena & arena = column.createOrGetArena(); + size_t size_of_state = func->sizeOfData(); + size_t align_of_state = func->alignOfData(); + auto * place = arena.alignedAlloc(size_of_state, align_of_state); + func->create(place); + const AggregateFunctionCount & agg_count = assert_cast(*func); + agg_count.set(place, part->rows_count); + column.insertFrom(place); + } + } + block.setColumns(std::move(minmax_count_columns)); + + Block res; + for (const auto & name : required_columns) + { + if (virtual_columns_block.has(name)) + res.insert(virtual_columns_block.getByName(name)); + else if (block.has(name)) + res.insert(block.getByName(name)); + else + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Cannot find column {} in minmax_count projection but query analysis still selects this projection. It's a bug", + name); + } + return res; +} + + bool MergeTreeData::getQueryProcessingStageWithAggregateProjection( ContextPtr query_context, const StorageMetadataPtr & metadata_snapshot, SelectQueryInfo & query_info) const { @@ -4356,11 +4461,27 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection( } }; - for (const auto & projection : metadata_snapshot->projections) - add_projection_candidate(projection); + ProjectionCandidate * selected_candidate = nullptr; + size_t min_sum_marks = std::numeric_limits::max(); + if (metadata_snapshot->minmax_count_projection) + add_projection_candidate(*metadata_snapshot->minmax_count_projection); + + // Only add more projection candidates if minmax_count_projection cannot match. + if (candidates.empty()) + { + for (const auto & projection : metadata_snapshot->projections) + add_projection_candidate(projection); + } + else + { + selected_candidate = &candidates.front(); + query_info.minmax_count_projection_block + = getMinMaxCountProjectionBlock(metadata_snapshot, selected_candidate->required_columns, query_info, query_context); + min_sum_marks = query_info.minmax_count_projection_block.rows(); + } // Let's select the best projection to execute the query. - if (!candidates.empty()) + if (!candidates.empty() && !selected_candidate) { std::shared_ptr max_added_blocks; if (settings.select_sequential_consistency) @@ -4381,7 +4502,6 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection( settings.max_threads, max_added_blocks); - size_t min_sum_marks = std::numeric_limits::max(); if (!query_info.merge_tree_select_result_ptr->error()) { // Add 1 to base sum_marks so that we prefer projections even when they have equal number of marks to read. @@ -4389,7 +4509,6 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection( min_sum_marks = query_info.merge_tree_select_result_ptr->marks() + 1; } - ProjectionCandidate * selected_candidate = nullptr; /// Favor aggregate projections for (auto & candidate : candidates) { @@ -4429,28 +4548,27 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection( min_sum_marks); } } - - if (!selected_candidate) - return false; - else if (min_sum_marks == 0) - { - /// If selected_projection indicated an empty result set. Remember it in query_info but - /// don't use projection to run the query, because projection pipeline with empty result - /// set will not work correctly with empty_result_for_aggregation_by_empty_set. - query_info.merge_tree_empty_result = true; - return false; - } - - if (selected_candidate->desc->type == ProjectionDescription::Type::Aggregate) - { - selected_candidate->aggregation_keys = select.getQueryAnalyzer()->aggregationKeys(); - selected_candidate->aggregate_descriptions = select.getQueryAnalyzer()->aggregates(); - } - - query_info.projection = std::move(*selected_candidate); - return true; } - return false; + + if (!selected_candidate) + return false; + else if (min_sum_marks == 0) + { + /// If selected_projection indicated an empty result set. Remember it in query_info but + /// don't use projection to run the query, because projection pipeline with empty result + /// set will not work correctly with empty_result_for_aggregation_by_empty_set. + query_info.merge_tree_empty_result = true; + return false; + } + + if (selected_candidate->desc->type == ProjectionDescription::Type::Aggregate) + { + selected_candidate->aggregation_keys = select.getQueryAnalyzer()->aggregationKeys(); + selected_candidate->aggregate_descriptions = select.getQueryAnalyzer()->aggregates(); + } + + query_info.projection = std::move(*selected_candidate); + return true; } diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index ef5f22ed096..d06fa598b2d 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -362,6 +362,12 @@ public: bool attach, BrokenPartCallback broken_part_callback_ = [](const String &){}); + Block getMinMaxCountProjectionBlock( + const StorageMetadataPtr & metadata_snapshot, + const Names & required_columns, + const SelectQueryInfo & query_info, + ContextPtr query_context) const; + bool getQueryProcessingStageWithAggregateProjection( ContextPtr query_context, const StorageMetadataPtr & metadata_snapshot, SelectQueryInfo & query_info) const; @@ -800,7 +806,7 @@ public: /// Construct a block consisting only of possible virtual columns for part pruning. /// If one_part is true, fill in at most one part. - Block getBlockWithVirtualPartColumns(const MergeTreeData::DataPartsVector & parts, bool one_part) const; + Block getBlockWithVirtualPartColumns(const MergeTreeData::DataPartsVector & parts, bool one_part, bool ignore_empty = false) const; /// Limiting parallel sends per one table, used in DataPartsExchange std::atomic_uint current_table_sends {0}; diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 00a599af9c3..4948a5884d3 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -1362,7 +1362,10 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor String destination = new_part_tmp_path; String file_name = it->name(); - auto rename_it = std::find_if(files_to_rename.begin(), files_to_rename.end(), [&file_name](const auto & rename_pair) { return rename_pair.first == file_name; }); + auto rename_it = std::find_if(files_to_rename.begin(), files_to_rename.end(), [&file_name](const auto & rename_pair) + { + return rename_pair.first == file_name; + }); if (rename_it != files_to_rename.end()) { if (rename_it->second.empty()) @@ -1723,7 +1726,7 @@ NameSet MergeTreeDataMergerMutator::collectFilesToSkip( const Block & updated_header, const std::set & indices_to_recalc, const String & mrk_extension, - const std::set & projections_to_recalc) + const std::set & projections_to_recalc) { NameSet files_to_skip = source_part->getFileNamesWithoutChecksums(); @@ -1740,16 +1743,16 @@ NameSet MergeTreeDataMergerMutator::collectFilesToSkip( auto serialization = source_part->getSerializationForColumn({entry.name, entry.type}); serialization->enumerateStreams(callback); } + for (const auto & index : indices_to_recalc) { files_to_skip.insert(index->getFileName() + ".idx"); files_to_skip.insert(index->getFileName() + ".idx2"); files_to_skip.insert(index->getFileName() + mrk_extension); } + for (const auto & projection : projections_to_recalc) - { files_to_skip.insert(projection->getDirectoryName()); - } return files_to_skip; } @@ -1853,7 +1856,7 @@ MergeTreeIndices MergeTreeDataMergerMutator::getIndicesForNewDataPart( return new_indices; } -MergeTreeProjections MergeTreeDataMergerMutator::getProjectionsForNewDataPart( +std::vector MergeTreeDataMergerMutator::getProjectionsForNewDataPart( const ProjectionsDescription & all_projections, const MutationCommands & commands_for_removes) { @@ -1862,10 +1865,10 @@ MergeTreeProjections MergeTreeDataMergerMutator::getProjectionsForNewDataPart( if (command.type == MutationCommand::DROP_PROJECTION) removed_projections.insert(command.column_name); - MergeTreeProjections new_projections; + std::vector new_projections; for (const auto & projection : all_projections) if (!removed_projections.count(projection.name)) - new_projections.push_back(MergeTreeProjectionFactory::instance().get(projection)); + new_projections.push_back(&projection); return new_projections; } @@ -1941,21 +1944,20 @@ std::set MergeTreeDataMergerMutator::getIndicesToRecalculate( return indices_to_recalc; } -std::set MergeTreeDataMergerMutator::getProjectionsToRecalculate( +std::set MergeTreeDataMergerMutator::getProjectionsToRecalculate( const NameSet & updated_columns, const StorageMetadataPtr & metadata_snapshot, const NameSet & materialized_projections, const MergeTreeData::DataPartPtr & source_part) { /// Checks if columns used in projections modified. - const auto & projection_factory = MergeTreeProjectionFactory::instance(); - std::set projections_to_recalc; + std::set projections_to_recalc; for (const auto & projection : metadata_snapshot->getProjections()) { // If we ask to materialize and it doesn't exist if (!source_part->checksums.has(projection.name + ".proj") && materialized_projections.count(projection.name)) { - projections_to_recalc.insert(projection_factory.get(projection)); + projections_to_recalc.insert(&projection); } else { @@ -1971,7 +1973,7 @@ std::set MergeTreeDataMergerMutator::getProjectionsToRec } } if (mutate) - projections_to_recalc.insert(projection_factory.get(projection)); + projections_to_recalc.insert(&projection); } } return projections_to_recalc; @@ -2001,7 +2003,7 @@ ExecuteTTLType MergeTreeDataMergerMutator::shouldExecuteTTL(const StorageMetadat void MergeTreeDataMergerMutator::writeWithProjections( MergeTreeData::MutableDataPartPtr new_data_part, const StorageMetadataPtr & metadata_snapshot, - const MergeTreeProjections & projections_to_build, + const std::vector & projections_to_build, BlockInputStreamPtr mutating_stream, IMergedBlockOutputStream & out, time_t time_of_mutation, @@ -2037,7 +2039,7 @@ void MergeTreeDataMergerMutator::writeWithProjections( for (size_t i = 0, size = projections_to_build.size(); i < size; ++i) { - const auto & projection = projections_to_build[i]->projection; + const auto & projection = *projections_to_build[i]; auto projection_block = projection_squashes[i].add(projection.calculate(block, context)); if (projection_block) projection_parts[projection.name].emplace_back(MergeTreeDataWriter::writeTempProjectionPart( @@ -2051,7 +2053,7 @@ void MergeTreeDataMergerMutator::writeWithProjections( // Write the last block for (size_t i = 0, size = projections_to_build.size(); i < size; ++i) { - const auto & projection = projections_to_build[i]->projection; + const auto & projection = *projections_to_build[i]; auto & projection_squash = projection_squashes[i]; auto projection_block = projection_squash.add({}); if (projection_block) @@ -2154,7 +2156,7 @@ void MergeTreeDataMergerMutator::mutateAllPartColumns( MergeTreeData::MutableDataPartPtr new_data_part, const StorageMetadataPtr & metadata_snapshot, const MergeTreeIndices & skip_indices, - const MergeTreeProjections & projections_to_build, + const std::vector & projections_to_build, BlockInputStreamPtr mutating_stream, time_t time_of_mutation, const CompressionCodecPtr & compression_codec, @@ -2212,7 +2214,7 @@ void MergeTreeDataMergerMutator::mutateSomePartColumns( const MergeTreeDataPartPtr & source_part, const StorageMetadataPtr & metadata_snapshot, const std::set & indices_to_recalc, - const std::set & projections_to_recalc, + const std::set & projections_to_recalc, const Block & mutation_header, MergeTreeData::MutableDataPartPtr new_data_part, BlockInputStreamPtr mutating_stream, @@ -2249,7 +2251,7 @@ void MergeTreeDataMergerMutator::mutateSomePartColumns( mutating_stream->readPrefix(); out.writePrefix(); - std::vector projections_to_build(projections_to_recalc.begin(), projections_to_recalc.end()); + std::vector projections_to_build(projections_to_recalc.begin(), projections_to_recalc.end()); writeWithProjections( new_data_part, metadata_snapshot, diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.h b/src/Storages/MergeTree/MergeTreeDataMergerMutator.h index 3a0041e4a37..b78d2364f2a 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.h +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.h @@ -189,7 +189,7 @@ private: const Block & updated_header, const std::set & indices_to_recalc, const String & mrk_extension, - const std::set & projections_to_recalc); + const std::set & projections_to_recalc); /// Get the columns list of the resulting part in the same order as storage_columns. static NamesAndTypesList getColumnsForNewDataPart( @@ -203,7 +203,7 @@ private: const IndicesDescription & all_indices, const MutationCommands & commands_for_removes); - static MergeTreeProjections getProjectionsForNewDataPart( + static std::vector getProjectionsForNewDataPart( const ProjectionsDescription & all_projections, const MutationCommands & commands_for_removes); @@ -219,7 +219,7 @@ private: const NameSet & materialized_indices, const MergeTreeData::DataPartPtr & source_part); - static std::set getProjectionsToRecalculate( + static std::set getProjectionsToRecalculate( const NameSet & updated_columns, const StorageMetadataPtr & metadata_snapshot, const NameSet & materialized_projections, @@ -228,7 +228,7 @@ private: void writeWithProjections( MergeTreeData::MutableDataPartPtr new_data_part, const StorageMetadataPtr & metadata_snapshot, - const MergeTreeProjections & projections_to_build, + const std::vector & projections_to_build, BlockInputStreamPtr mutating_stream, IMergedBlockOutputStream & out, time_t time_of_mutation, @@ -243,7 +243,7 @@ private: MergeTreeData::MutableDataPartPtr new_data_part, const StorageMetadataPtr & metadata_snapshot, const MergeTreeIndices & skip_indices, - const MergeTreeProjections & projections_to_build, + const std::vector & projections_to_build, BlockInputStreamPtr mutating_stream, time_t time_of_mutation, const CompressionCodecPtr & compression_codec, @@ -259,7 +259,7 @@ private: const MergeTreeDataPartPtr & source_part, const StorageMetadataPtr & metadata_snapshot, const std::set & indices_to_recalc, - const std::set & projections_to_recalc, + const std::set & projections_to_recalc, const Block & mutation_header, MergeTreeData::MutableDataPartPtr new_data_part, BlockInputStreamPtr mutating_stream, diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 004eaa6254c..729d6813c7c 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include @@ -169,10 +170,19 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( Pipe projection_pipe; Pipe ordinary_pipe; - if (query_info.projection->merge_tree_projection_select_result_ptr) + auto projection_plan = std::make_unique(); + if (query_info.projection->desc->is_minmax_count_projection) + { + Pipe pipe(std::make_shared( + query_info.minmax_count_projection_block, + Chunk(query_info.minmax_count_projection_block.getColumns(), query_info.minmax_count_projection_block.rows()))); + auto read_from_pipe = std::make_unique(std::move(pipe)); + projection_plan->addStep(std::move(read_from_pipe)); + } + else if (query_info.projection->merge_tree_projection_select_result_ptr) { LOG_DEBUG(log, "projection required columns: {}", fmt::join(query_info.projection->required_columns, ", ")); - auto plan = readFromParts( + projection_plan = readFromParts( {}, query_info.projection->required_columns, metadata_snapshot, @@ -183,35 +193,32 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( num_streams, max_block_numbers_to_read, query_info.projection->merge_tree_projection_select_result_ptr); + } - if (plan->isInitialized()) + if (projection_plan->isInitialized()) + { + if (query_info.projection->before_where) { - // If `before_where` is not empty, transform input blocks by adding needed columns - // originated from key columns. We already project the block at the end, using - // projection_block, so we can just add more columns here without worrying - // NOTE: prewhere is executed inside readFromParts - if (query_info.projection->before_where) - { - auto where_step = std::make_unique( - plan->getCurrentDataStream(), - query_info.projection->before_where, - query_info.projection->where_column_name, - query_info.projection->remove_where_filter); + auto where_step = std::make_unique( + projection_plan->getCurrentDataStream(), + query_info.projection->before_where, + query_info.projection->where_column_name, + query_info.projection->remove_where_filter); - where_step->setStepDescription("WHERE"); - plan->addStep(std::move(where_step)); - } - - if (query_info.projection->before_aggregation) - { - auto expression_before_aggregation - = std::make_unique(plan->getCurrentDataStream(), query_info.projection->before_aggregation); - expression_before_aggregation->setStepDescription("Before GROUP BY"); - plan->addStep(std::move(expression_before_aggregation)); - } - projection_pipe = plan->convertToPipe( - QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context)); + where_step->setStepDescription("WHERE"); + projection_plan->addStep(std::move(where_step)); } + + if (query_info.projection->before_aggregation) + { + auto expression_before_aggregation + = std::make_unique(projection_plan->getCurrentDataStream(), query_info.projection->before_aggregation); + expression_before_aggregation->setStepDescription("Before GROUP BY"); + projection_plan->addStep(std::move(expression_before_aggregation)); + } + + projection_pipe = projection_plan->convertToPipe( + QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context)); } if (query_info.projection->merge_tree_normal_select_result_ptr) diff --git a/src/Storages/MergeTree/MergeTreeProjections.cpp b/src/Storages/MergeTree/MergeTreeProjections.cpp deleted file mode 100644 index b20aa07b70c..00000000000 --- a/src/Storages/MergeTree/MergeTreeProjections.cpp +++ /dev/null @@ -1,78 +0,0 @@ -#include -#include -#include -#include -#include - -#include - -#include - - -namespace DB -{ -namespace ErrorCodes -{ - extern const int LOGICAL_ERROR; - extern const int INCORRECT_QUERY; -} - -void MergeTreeProjectionFactory::registerCreator(ProjectionDescription::Type projection_type, Creator creator) -{ - if (!creators.emplace(projection_type, std::move(creator)).second) - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "MergeTreeProjectionFactory: the Projection creator name '{}' is not unique", - ProjectionDescription::typeToString(projection_type)); -} - -MergeTreeProjectionPtr MergeTreeProjectionFactory::get(const ProjectionDescription & projection) const -{ - auto it = creators.find(projection.type); - if (it == creators.end()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Projection type {} is not registered", - ProjectionDescription::typeToString(projection.type)); - - return it->second(projection); -} - - -MergeTreeProjections MergeTreeProjectionFactory::getMany(const std::vector & projections) const -{ - MergeTreeProjections result; - for (const auto & projection : projections) - result.emplace_back(get(projection)); - return result; -} - -void MergeTreeProjectionFactory::validate(const ProjectionDescription & projection) const -{ - if (startsWith(projection.name, "tmp_")) - throw Exception("Projection's name cannot start with 'tmp_'", ErrorCodes::INCORRECT_QUERY); - - get(projection); -} - -MergeTreeProjectionPtr normalProjectionCreator(const ProjectionDescription & projection) -{ - return std::make_shared(projection); -} - -MergeTreeProjectionPtr aggregateProjectionCreator(const ProjectionDescription & projection) -{ - return std::make_shared(projection); -} - -MergeTreeProjectionFactory::MergeTreeProjectionFactory() -{ - registerCreator(ProjectionDescription::Type::Normal, normalProjectionCreator); - registerCreator(ProjectionDescription::Type::Aggregate, aggregateProjectionCreator); -} - -MergeTreeProjectionFactory & MergeTreeProjectionFactory::instance() -{ - static MergeTreeProjectionFactory instance; - return instance; -} - -} diff --git a/src/Storages/MergeTree/MergeTreeProjections.h b/src/Storages/MergeTree/MergeTreeProjections.h deleted file mode 100644 index 434e0390845..00000000000 --- a/src/Storages/MergeTree/MergeTreeProjections.h +++ /dev/null @@ -1,88 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace DB -{ -/// Condition on the projection. -class IMergeTreeProjectionCondition -{ -public: - virtual ~IMergeTreeProjectionCondition() = default; - /// Checks if this projection is useful for query. - virtual bool canHandleQuery() const = 0; -}; - -using MergeTreeProjectionConditionPtr = std::shared_ptr; - -struct IMergeTreeProjection -{ - IMergeTreeProjection(const ProjectionDescription & projection_) : projection(projection_) { } - - virtual ~IMergeTreeProjection() = default; - - /// gets directory name - String getDirectoryName() const { return projection.name + ".proj"; } - - const String & getName() const { return projection.name; } - - Names getColumnsRequiredForProjectionCalc() const { return projection.required_columns; } - - const ProjectionDescription & projection; -}; - -using MergeTreeProjectionPtr = std::shared_ptr; -using MergeTreeProjections = std::vector; - -class MergeTreeProjectionNormal : public IMergeTreeProjection -{ -public: - explicit MergeTreeProjectionNormal(const ProjectionDescription & projection_) : IMergeTreeProjection(projection_) { } - - ~MergeTreeProjectionNormal() override = default; -}; - -class MergeTreeProjectionAggregate : public IMergeTreeProjection -{ -public: - explicit MergeTreeProjectionAggregate(const ProjectionDescription & projection_) : IMergeTreeProjection(projection_) { } - - ~MergeTreeProjectionAggregate() override = default; -}; - -class MergeTreeProjectionFactory : private boost::noncopyable -{ -public: - static MergeTreeProjectionFactory & instance(); - - using Creator = std::function; - - void validate(const ProjectionDescription & projection) const; - - MergeTreeProjectionPtr get(const ProjectionDescription & projection) const; - - MergeTreeProjections getMany(const std::vector & projections) const; - - void registerCreator(ProjectionDescription::Type projection_type, Creator creator); - -protected: - MergeTreeProjectionFactory(); - -private: - using Creators = std::unordered_map; - Creators creators; -}; - -} diff --git a/src/Storages/MergeTree/MergedBlockOutputStream.h b/src/Storages/MergeTree/MergedBlockOutputStream.h index 4c36508ebf5..9b76574ac9a 100644 --- a/src/Storages/MergeTree/MergedBlockOutputStream.h +++ b/src/Storages/MergeTree/MergedBlockOutputStream.h @@ -33,7 +33,7 @@ public: void writeSuffix() override; - /// Finilize writing part and fill inner structures + /// Finalize writing part and fill inner structures /// If part is new and contains projections, they should be added before invoking this method. void writeSuffixAndFinalizePart( MergeTreeData::MutableDataPartPtr & new_part, diff --git a/src/Storages/MergeTree/registerStorageMergeTree.cpp b/src/Storages/MergeTree/registerStorageMergeTree.cpp index 910492d2467..06e04d065dd 100644 --- a/src/Storages/MergeTree/registerStorageMergeTree.cpp +++ b/src/Storages/MergeTree/registerStorageMergeTree.cpp @@ -651,6 +651,10 @@ static StoragePtr create(const StorageFactory::Arguments & args) /// single default partition with name "all". metadata.partition_key = KeyDescription::getKeyFromAST(partition_by_key, metadata.columns, args.getContext()); + auto minmax_columns = metadata.getColumnsRequiredForPartitionKey(); + metadata.minmax_count_projection.emplace( + ProjectionDescription::getMinMaxCountProjection(args.columns, minmax_columns, args.getContext())); + /// PRIMARY KEY without ORDER BY is allowed and considered as ORDER BY. if (!args.storage_def->order_by && args.storage_def->primary_key) args.storage_def->set(args.storage_def->order_by, args.storage_def->primary_key->clone()); @@ -732,6 +736,9 @@ static StoragePtr create(const StorageFactory::Arguments & args) metadata.partition_key = KeyDescription::getKeyFromAST(partition_by_ast, metadata.columns, args.getContext()); + auto minmax_columns = metadata.getColumnsRequiredForPartitionKey(); + metadata.minmax_count_projection.emplace( + ProjectionDescription::getMinMaxCountProjection(args.columns, minmax_columns, args.getContext())); ++arg_num; diff --git a/src/Storages/ProjectionsDescription.cpp b/src/Storages/ProjectionsDescription.cpp index 5fc44bc044f..2d40ddfc99d 100644 --- a/src/Storages/ProjectionsDescription.cpp +++ b/src/Storages/ProjectionsDescription.cpp @@ -74,6 +74,7 @@ ProjectionDescription ProjectionDescription::clone() const other.sample_block_for_keys = sample_block_for_keys; other.metadata = metadata; other.key_size = key_size; + other.is_minmax_count_projection = is_minmax_count_projection; return other; } @@ -103,6 +104,9 @@ ProjectionDescription::getProjectionFromAST(const ASTPtr & definition_ast, const if (projection_definition->name.empty()) throw Exception("Projection must have name in definition.", ErrorCodes::INCORRECT_QUERY); + if (startsWith(projection_definition->name, "tmp_")) + throw Exception("Projection's name cannot start with 'tmp_'", ErrorCodes::INCORRECT_QUERY); + if (!projection_definition->query) throw Exception("QUERY is required for projection", ErrorCodes::INCORRECT_QUERY); @@ -189,6 +193,54 @@ ProjectionDescription::getProjectionFromAST(const ASTPtr & definition_ast, const return result; } +ProjectionDescription +ProjectionDescription::getMinMaxCountProjection(const ColumnsDescription & columns, const Names & minmax_columns, ContextPtr query_context) +{ + auto select_query = std::make_shared(); + ASTPtr select_expression_list = std::make_shared(); + for (const auto & column : minmax_columns) + { + select_expression_list->children.push_back(makeASTFunction("min", std::make_shared(column))); + select_expression_list->children.push_back(makeASTFunction("max", std::make_shared(column))); + } + select_expression_list->children.push_back(makeASTFunction("count")); + select_query->setExpression(ASTProjectionSelectQuery::Expression::SELECT, std::move(select_expression_list)); + + ProjectionDescription result; + result.definition_ast = select_query; + result.name = MINMAX_COUNT_PROJECTION_NAME; + result.query_ast = select_query->cloneToASTSelect(); + + auto external_storage_holder = std::make_shared(query_context, columns, ConstraintsDescription{}); + StoragePtr storage = external_storage_holder->getTable(); + InterpreterSelectQuery select( + result.query_ast, query_context, storage, {}, SelectQueryOptions{QueryProcessingStage::WithMergeableState}.modify().ignoreAlias()); + result.required_columns = select.getRequiredColumns(); + result.sample_block = select.getSampleBlock(); + + for (size_t i = 0; i < result.sample_block.columns(); ++i) + { + const auto & column_with_type_name = result.sample_block.getByPosition(i); + + if (column_with_type_name.column && isColumnConst(*column_with_type_name.column)) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Projections cannot contain constant columns: {}", column_with_type_name.name); + + result.column_names.emplace_back(column_with_type_name.name); + result.data_types.emplace_back(column_with_type_name.type); + } + result.type = ProjectionDescription::Type::Aggregate; + StorageInMemoryMetadata metadata; + metadata.setColumns(ColumnsDescription(result.sample_block.getNamesAndTypesList())); + metadata.partition_key = KeyDescription::getSortingKeyFromAST({}, metadata.columns, query_context, {}); + metadata.sorting_key = KeyDescription::getSortingKeyFromAST({}, metadata.columns, query_context, {}); + metadata.primary_key = KeyDescription::getKeyFromAST({}, metadata.columns, query_context); + metadata.primary_key.definition_ast = nullptr; + result.metadata = std::make_shared(metadata); + result.is_minmax_count_projection = true; + return result; +} + + void ProjectionDescription::recalculateWithNewColumns(const ColumnsDescription & new_columns, ContextPtr query_context) { *this = getProjectionFromAST(definition_ast, new_columns, query_context); diff --git a/src/Storages/ProjectionsDescription.h b/src/Storages/ProjectionsDescription.h index 2b279c711fe..e35fbd72f22 100644 --- a/src/Storages/ProjectionsDescription.h +++ b/src/Storages/ProjectionsDescription.h @@ -28,6 +28,8 @@ struct ProjectionDescription Aggregate, }; + static constexpr const char * MINMAX_COUNT_PROJECTION_NAME = "_minmax_count_projection"; + static const char * typeToString(Type type); /// Definition AST of projection @@ -62,10 +64,15 @@ struct ProjectionDescription size_t key_size = 0; + bool is_minmax_count_projection = false; + /// Parse projection from definition AST static ProjectionDescription getProjectionFromAST(const ASTPtr & definition_ast, const ColumnsDescription & columns, ContextPtr query_context); + static ProjectionDescription + getMinMaxCountProjection(const ColumnsDescription & columns, const Names & minmax_columns, ContextPtr query_context); + ProjectionDescription() = default; /// We need custom copy constructors because we don't want @@ -87,8 +94,12 @@ struct ProjectionDescription bool isPrimaryKeyColumnPossiblyWrappedInFunctions(const ASTPtr & node) const; Block calculate(const Block & block, ContextPtr context) const; + + String getDirectoryName() const { return name + ".proj"; } }; +using ProjectionDescriptionRawPtr = const ProjectionDescription *; + /// All projections in storage struct ProjectionsDescription { diff --git a/src/Storages/SelectQueryInfo.h b/src/Storages/SelectQueryInfo.h index a2db6655223..b08818a2baa 100644 --- a/src/Storages/SelectQueryInfo.h +++ b/src/Storages/SelectQueryInfo.h @@ -106,7 +106,7 @@ using ManyExpressionActions = std::vector; // The projection selected to execute current query struct ProjectionCandidate { - const ProjectionDescription * desc{}; + ProjectionDescriptionRawPtr desc{}; PrewhereInfoPtr prewhere_info; ActionsDAGPtr before_where; String where_column_name; @@ -164,6 +164,7 @@ struct SelectQueryInfo bool ignore_projections = false; bool is_projection_query = false; bool merge_tree_empty_result = false; + Block minmax_count_projection_block; MergeTreeDataSelectAnalysisResultPtr merge_tree_select_result_ptr; }; diff --git a/src/Storages/StorageInMemoryMetadata.cpp b/src/Storages/StorageInMemoryMetadata.cpp index cbd27afe106..dcdf3a097e6 100644 --- a/src/Storages/StorageInMemoryMetadata.cpp +++ b/src/Storages/StorageInMemoryMetadata.cpp @@ -29,6 +29,8 @@ StorageInMemoryMetadata::StorageInMemoryMetadata(const StorageInMemoryMetadata & , secondary_indices(other.secondary_indices) , constraints(other.constraints) , projections(other.projections.clone()) + , minmax_count_projection( + other.minmax_count_projection ? std::optional(other.minmax_count_projection->clone()) : std::nullopt) , partition_key(other.partition_key) , primary_key(other.primary_key) , sorting_key(other.sorting_key) @@ -50,6 +52,10 @@ StorageInMemoryMetadata & StorageInMemoryMetadata::operator=(const StorageInMemo secondary_indices = other.secondary_indices; constraints = other.constraints; projections = other.projections.clone(); + if (other.minmax_count_projection) + minmax_count_projection = other.minmax_count_projection->clone(); + else + minmax_count_projection = std::nullopt; partition_key = other.partition_key; primary_key = other.primary_key; sorting_key = other.sorting_key; diff --git a/src/Storages/StorageInMemoryMetadata.h b/src/Storages/StorageInMemoryMetadata.h index 9accdb9b3b6..795bb8f7131 100644 --- a/src/Storages/StorageInMemoryMetadata.h +++ b/src/Storages/StorageInMemoryMetadata.h @@ -28,6 +28,8 @@ struct StorageInMemoryMetadata ConstraintsDescription constraints; /// Table projections. Currently supported for MergeTree only. ProjectionsDescription projections; + /// Table minmax_count projection. Currently supported for MergeTree only. + std::optional minmax_count_projection; /// PARTITION BY expression. Currently supported for MergeTree only. KeyDescription partition_key; /// PRIMARY KEY expression. If absent, than equal to order_by_ast. @@ -107,6 +109,7 @@ struct StorageInMemoryMetadata const ConstraintsDescription & getConstraints() const; const ProjectionsDescription & getProjections() const; + /// Has at least one projection bool hasProjections() const; diff --git a/src/Storages/ya.make b/src/Storages/ya.make index b3494849441..244b5fedae2 100644 --- a/src/Storages/ya.make +++ b/src/Storages/ya.make @@ -76,7 +76,6 @@ SRCS( MergeTree/MergeTreePartInfo.cpp MergeTree/MergeTreePartition.cpp MergeTree/MergeTreePartsMover.cpp - MergeTree/MergeTreeProjections.cpp MergeTree/MergeTreeRangeReader.cpp MergeTree/MergeTreeReadPool.cpp MergeTree/MergeTreeReaderCompact.cpp diff --git a/tests/queries/0_stateless/01710_minmax_count_projection.reference b/tests/queries/0_stateless/01710_minmax_count_projection.reference new file mode 100644 index 00000000000..882d808069e --- /dev/null +++ b/tests/queries/0_stateless/01710_minmax_count_projection.reference @@ -0,0 +1,4 @@ +0 9999 10000 +0 9998 5000 +1 9999 5000 +0 9998 5000 diff --git a/tests/queries/0_stateless/01710_minmax_count_projection.sql b/tests/queries/0_stateless/01710_minmax_count_projection.sql new file mode 100644 index 00000000000..3ee19fe8c2e --- /dev/null +++ b/tests/queries/0_stateless/01710_minmax_count_projection.sql @@ -0,0 +1,14 @@ +drop table if exists d; + +create table d (i int, j int) engine MergeTree partition by i % 2 order by tuple() settings index_granularity = 1; + +insert into d select number, number from numbers(10000); + +set max_rows_to_read = 2, allow_experimental_projection_optimization = 1; + +select min(i), max(i), count() from d; +select min(i), max(i), count() from d group by _partition_id order by _partition_id; +select min(i), max(i), count() from d where _partition_value.1 = 0 group by _partition_id order by _partition_id; +select min(i), max(i), count() from d where _partition_value.1 = 10 group by _partition_id order by _partition_id; + +drop table d;