diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index 67377a54c34..6d6ee43acb3 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -3,6 +3,9 @@ #include #include #include +#include +#include +#include #include #include #include @@ -11,8 +14,6 @@ #include #include #include -#include -#include #include #include #include @@ -88,38 +89,35 @@ size_t minMarksForConcurrentRead( } ReadFromMergeTree::ReadFromMergeTree( - SelectQueryInfo query_info_, - const MergeTreeDataSelectExecutor::PartitionIdToMaxBlock * max_block_numbers_to_read_, + const SelectQueryInfo & query_info_, + const PartitionIdToMaxBlock * max_block_numbers_to_read_, ContextPtr context_, const MergeTreeData & data_, - const MergeTreeData & storage_, StorageMetadataPtr metadata_snapshot_, StorageMetadataPtr metadata_snapshot_base_, Names real_column_names_, MergeTreeData::DataPartsVector parts_, - //IndexStatPtr index_stats_, PrewhereInfoPtr prewhere_info_, Names virt_column_names_, Settings settings_, - size_t num_streams_) + Poco::Logger * log_) : ISourceStep(DataStream{.header = MergeTreeBaseSelectProcessor::transformHeader( - metadata_snapshot_->getSampleBlockForColumns(real_column_names_, storage_.getVirtuals(), storage_.getStorageID()), + metadata_snapshot_->getSampleBlockForColumns(real_column_names_, data_.getVirtuals(), data_.getStorageID()), prewhere_info_, - storage_.getPartitionValueType(), + data_.getPartitionValueType(), virt_column_names_)}) , query_info(std::move(query_info_)) , max_block_numbers_to_read(max_block_numbers_to_read_) , context(std::move(context_)) , data(data_) - , storage(storage_) , metadata_snapshot(std::move(metadata_snapshot_)) , metadata_snapshot_base(std::move(metadata_snapshot_base_)) , real_column_names(std::move(real_column_names_)) - , parts(std::move(parts_)) + , prepared_parts(std::move(parts_)) , prewhere_info(std::move(prewhere_info_)) , virt_column_names(std::move(virt_column_names_)) , settings(std::move(settings_)) - , num_streams(num_streams_) + , log(log_) { } @@ -142,7 +140,7 @@ Pipe ReadFromMergeTree::readFromPool( sum_marks, min_marks_for_concurrent_read, std::move(parts_with_range), - storage, + data, metadata_snapshot, prewhere_info, true, @@ -151,7 +149,7 @@ Pipe ReadFromMergeTree::readFromPool( settings.preferred_block_size_bytes, false); - auto * logger = &Poco::Logger::get(storage.getLogName() + " (SelectExecutor)"); + auto * logger = &Poco::Logger::get(data.getLogName() + " (SelectExecutor)"); LOG_DEBUG(logger, "Reading approx. 
{} rows with {} streams", total_rows, used_max_streams); for (size_t i = 0; i < used_max_streams; ++i) @@ -159,7 +157,7 @@ Pipe ReadFromMergeTree::readFromPool( auto source = std::make_shared( i, pool, min_marks_for_concurrent_read, settings.max_block_size, settings.preferred_block_size_bytes, settings.preferred_max_column_in_block_size_bytes, - storage, metadata_snapshot, use_uncompressed_cache, + data, metadata_snapshot, use_uncompressed_cache, prewhere_info, settings.reader_settings, virt_column_names); if (i == 0) @@ -178,7 +176,7 @@ template ProcessorPtr ReadFromMergeTree::createSource(const RangesInDataPart & part, const Names & required_columns, bool use_uncompressed_cache) { return std::make_shared( - storage, metadata_snapshot, part.data_part, settings.max_block_size, settings.preferred_block_size_bytes, + data, metadata_snapshot, part.data_part, settings.max_block_size, settings.preferred_block_size_bytes, settings.preferred_max_column_in_block_size_bytes, required_columns, part.ranges, use_uncompressed_cache, prewhere_info, true, settings.reader_settings, virt_column_names, part.part_index_in_query); } @@ -212,7 +210,7 @@ Pipe ReadFromMergeTree::read( RangesInDataParts parts_with_range, Names required_columns, ReadType read_type, size_t used_max_streams, size_t min_marks_for_concurrent_read, bool use_uncompressed_cache) { - if (read_type == ReadType::Default && num_streams > 1) + if (read_type == ReadType::Default && used_max_streams > 1) return readFromPool(parts_with_range, required_columns, used_max_streams, min_marks_for_concurrent_read, use_uncompressed_cache); auto pipe = readInOrder(parts_with_range, required_columns, read_type, use_uncompressed_cache); @@ -225,126 +223,6 @@ Pipe ReadFromMergeTree::read( return pipe; } -static std::optional> filterPartsByVirtualColumns( - const MergeTreeData & data, - MergeTreeData::DataPartsVector & parts, - ASTPtr & query, - ContextPtr context) -{ - std::unordered_set part_values; - ASTPtr expression_ast; - auto virtual_columns_block = data.getBlockWithVirtualPartColumns(parts, true /* one_part */); - - // Generate valid expressions for filtering - VirtualColumnUtils::prepareFilterBlockWithQuery(query, context, virtual_columns_block, expression_ast); - - // If there is still something left, fill the virtual block and do the filtering. 
- if (expression_ast) - { - virtual_columns_block = data.getBlockWithVirtualPartColumns(parts, false /* one_part */); - VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, context, expression_ast); - return VirtualColumnUtils::extractSingleValueFromBlock(virtual_columns_block, "_part"); - } - - return {}; -} - -static void filterPartsByPartition( - StorageMetadataPtr & metadata_snapshot, - const MergeTreeData & data, - SelectQueryInfo & query_info, - ContextPtr & context, - ContextPtr & query_context, - MergeTreeData::DataPartsVector & parts, - const std::optional> & part_values, - const MergeTreeDataSelectExecutor::PartitionIdToMaxBlock * max_block_numbers_to_read, - Poco::Logger * log, - ReadFromMergeTree::IndexStats & index_stats) -{ - const Settings & settings = context->getSettingsRef(); - std::optional partition_pruner; - std::optional minmax_idx_condition; - DataTypes minmax_columns_types; - if (metadata_snapshot->hasPartitionKey()) - { - const auto & partition_key = metadata_snapshot->getPartitionKey(); - auto minmax_columns_names = data.getMinMaxColumnsNames(partition_key); - minmax_columns_types = data.getMinMaxColumnsTypes(partition_key); - - minmax_idx_condition.emplace( - query_info, context, minmax_columns_names, data.getMinMaxExpr(partition_key, ExpressionActionsSettings::fromContext(context))); - partition_pruner.emplace(metadata_snapshot->getPartitionKey(), query_info, context, false /* strict */); - - if (settings.force_index_by_date && (minmax_idx_condition->alwaysUnknownOrTrue() && partition_pruner->isUseless())) - { - String msg = "Neither MinMax index by columns ("; - bool first = true; - for (const String & col : minmax_columns_names) - { - if (first) - first = false; - else - msg += ", "; - msg += col; - } - msg += ") nor partition expr is used and setting 'force_index_by_date' is set"; - - throw Exception(msg, ErrorCodes::INDEX_NOT_USED); - } - } - - MergeTreeDataSelectExecutor::PartFilterCounters part_filter_counters; - if (query_context->getSettingsRef().allow_experimental_query_deduplication) - MergeTreeDataSelectExecutor::selectPartsToReadWithUUIDFilter( - parts, - part_values, - data.getPinnedPartUUIDs(), - minmax_idx_condition, - minmax_columns_types, - partition_pruner, - max_block_numbers_to_read, - query_context, - part_filter_counters, - log); - else - MergeTreeDataSelectExecutor::selectPartsToRead( - parts, - part_values, - minmax_idx_condition, - minmax_columns_types, - partition_pruner, - max_block_numbers_to_read, - part_filter_counters); - - index_stats.emplace_back(ReadFromMergeTree::IndexStat{ - .type = ReadFromMergeTree::IndexType::None, - .num_parts_after = part_filter_counters.num_initial_selected_parts, - .num_granules_after = part_filter_counters.num_initial_selected_granules}); - - if (minmax_idx_condition) - { - auto description = minmax_idx_condition->getDescription(); - index_stats.emplace_back(ReadFromMergeTree::IndexStat{ - .type = ReadFromMergeTree::IndexType::MinMax, - .condition = std::move(description.condition), - .used_keys = std::move(description.used_keys), - .num_parts_after = part_filter_counters.num_parts_after_minmax, - .num_granules_after = part_filter_counters.num_granules_after_minmax}); - LOG_DEBUG(log, "MinMax index condition: {}", minmax_idx_condition->toString()); - } - - if (partition_pruner) - { - auto description = partition_pruner->getKeyCondition().getDescription(); - index_stats.emplace_back(ReadFromMergeTree::IndexStat{ - .type = ReadFromMergeTree::IndexType::Partition, - .condition = 
std::move(description.condition), - .used_keys = std::move(description.used_keys), - .num_parts_after = part_filter_counters.num_parts_after_partition_pruner, - .num_granules_after = part_filter_counters.num_granules_after_partition_pruner}); - } -} - Pipe ReadFromMergeTree::spreadMarkRangesAmongStreams( RangesInDataParts && parts_with_ranges, const Names & column_names) @@ -392,7 +270,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreams( if (0 == sum_marks) return {}; - size_t used_num_streams = num_streams; + size_t used_num_streams = settings.num_streams; if (used_num_streams > 1) { /// Reduce the number of num_streams if the data is small. @@ -506,12 +384,12 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder( return new_ranges; }; - const size_t min_marks_per_stream = (sum_marks - 1) / num_streams + 1; + const size_t min_marks_per_stream = (sum_marks - 1) / settings.num_streams + 1; bool need_preliminary_merge = (parts_with_ranges.size() > q_settings.read_in_order_two_level_merge_threshold); Pipes pipes; - for (size_t i = 0; i < num_streams && !parts_with_ranges.empty(); ++i) + for (size_t i = 0; i < settings.num_streams && !parts_with_ranges.empty(); ++i) { size_t need_marks = min_marks_per_stream; RangesInDataParts new_parts; @@ -577,7 +455,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder( : ReadFromMergeTree::ReadType::InReverseOrder; pipes.emplace_back(read(std::move(new_parts), column_names, read_type, - num_streams, min_marks_for_concurrent_read, use_uncompressed_cache)); + settings.num_streams, min_marks_for_concurrent_read, use_uncompressed_cache)); } if (need_preliminary_merge) @@ -755,7 +633,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal( if (sum_marks > max_marks_to_use_cache) use_uncompressed_cache = false; - size_t used_num_streams = num_streams; + size_t used_num_streams = settings.num_streams; if (used_num_streams > q_settings.max_final_threads) used_num_streams = q_settings.max_final_threads; @@ -966,9 +844,10 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal( void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &) { + auto parts = std::move(prepared_parts); size_t total_parts = parts.size(); - auto part_values = filterPartsByVirtualColumns(data, parts, query_info.query, context); + auto part_values = MergeTreeDataSelectExecutor::filterPartsByVirtualColumns(data, parts, query_info.query, context); if (part_values && part_values->empty()) { pipeline.init(Pipe(std::make_shared(getOutputStream().header))); @@ -1001,7 +880,7 @@ void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const Build const auto & select = query_info.query->as(); auto query_context = context->hasQueryContext() ? 
context->getQueryContext() : context; - filterPartsByPartition( + MergeTreeDataSelectExecutor::filterPartsByPartition( metadata_snapshot_base, data, query_info, context, query_context, parts, part_values, max_block_numbers_to_read, log, index_stats); bool sample_factor_column_queried = false; @@ -1023,16 +902,17 @@ void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const Build for (const auto & part : parts) total_marks_pk += part->index_granularity.getMarksCountWithoutFinal(); - auto parts_with_ranges = MergeTreeDataSelectExecutor::filterParts( - parts, + auto parts_with_ranges = MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipIndexes( + std::move(parts), metadata_snapshot, query_info, context, key_condition, settings.reader_settings, log, - num_streams, - index_stats); + settings.num_streams, + index_stats, + true); size_t sum_marks_pk = total_marks_pk; for (const auto & stat : index_stats) @@ -1132,12 +1012,69 @@ void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const Build column_names_to_read); } + if (pipe.empty()) + { + pipeline.init(Pipe(std::make_shared(getOutputStream().header))); + return; + } + + if (sampling.use_sampling) + { + auto sampling_actions = std::make_shared(sampling.filter_expression); + pipe.addSimpleTransform([&](const Block & header) + { + return std::make_shared( + header, + sampling_actions, + sampling.filter_function->getColumnName(), + false); + }); + } + + if (result_projection) + { + auto projection_actions = std::make_shared(result_projection); + pipe.addSimpleTransform([&](const Block & header) + { + return std::make_shared(header, projection_actions); + }); + } + + /// By the way, if a distributed query or query to a Merge table is made, then the `_sample_factor` column can have different values. 
+ if (sample_factor_column_queried) + { + ColumnWithTypeAndName column; + column.name = "_sample_factor"; + column.type = std::make_shared(); + column.column = column.type->createColumnConst(0, Field(sampling.used_sample_factor)); + + auto adding_column_dag = ActionsDAG::makeAddingColumnActions(std::move(column)); + auto adding_column_action = std::make_shared(adding_column_dag); + + pipe.addSimpleTransform([&](const Block & header) + { + return std::make_shared(header, adding_column_action); + }); + } + + // TODO There seems to be no place initializing remove_columns_actions + if (query_info.prewhere_info && query_info.prewhere_info->remove_columns_actions) + { + auto remove_columns_action = std::make_shared( + query_info.prewhere_info->remove_columns_actions->getActionsDAG().clone()); + + pipe.addSimpleTransform([&](const Block & header) + { + return std::make_shared(header, remove_columns_action); + }); + } + for (const auto & processor : pipe.getProcessors()) processors.emplace_back(processor); // Attach QueryIdHolder if needed if (!query_id.empty()) - pipe.addQueryIdHolder(std::make_shared(query_id, storage)); + pipe.addQueryIdHolder(std::make_shared(query_id, data)); pipeline.init(std::move(pipe)); } @@ -1161,20 +1098,20 @@ static const char * indexTypeToString(ReadFromMergeTree::IndexType type) __builtin_unreachable(); } -static const char * readTypeToString(ReadFromMergeTree::ReadType type) -{ - switch (type) - { - case ReadFromMergeTree::ReadType::Default: - return "Default"; - case ReadFromMergeTree::ReadType::InOrder: - return "InOrder"; - case ReadFromMergeTree::ReadType::InReverseOrder: - return "InReverseOrder"; - } +// static const char * readTypeToString(ReadFromMergeTree::ReadType type) +// { +// switch (type) +// { +// case ReadFromMergeTree::ReadType::Default: +// return "Default"; +// case ReadFromMergeTree::ReadType::InOrder: +// return "InOrder"; +// case ReadFromMergeTree::ReadType::InReverseOrder: +// return "InReverseOrder"; +// } - __builtin_unreachable(); -} +// __builtin_unreachable(); +// } void ReadFromMergeTree::describeActions(FormatSettings & format_settings) const { diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.h b/src/Processors/QueryPlan/ReadFromMergeTree.h index b3366afb118..e9341e46770 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.h +++ b/src/Processors/QueryPlan/ReadFromMergeTree.h @@ -3,11 +3,13 @@ #include #include #include -#include +//#include namespace DB { +using PartitionIdToMaxBlock = std::unordered_map; + /// This step is created to read from MergeTree* table. /// For now, it takes a list of parts and creates source from it. 
class ReadFromMergeTree final : public ISourceStep @@ -42,9 +44,10 @@ public: struct Settings { UInt64 max_block_size; + size_t num_streams; size_t preferred_block_size_bytes; size_t preferred_max_column_in_block_size_bytes; - size_t min_marks_for_concurrent_read; + //size_t min_marks_for_concurrent_read; bool use_uncompressed_cache; bool force_primary_key; @@ -68,21 +71,18 @@ public: }; ReadFromMergeTree( - SelectQueryInfo query_info_, - const MergeTreeDataSelectExecutor::PartitionIdToMaxBlock * max_block_numbers_to_read_, + const SelectQueryInfo & query_info_, + const PartitionIdToMaxBlock * max_block_numbers_to_read_, ContextPtr context_, const MergeTreeData & data_, - const MergeTreeData & storage_, StorageMetadataPtr metadata_snapshot_, StorageMetadataPtr metadata_snapshot_base_, Names real_column_names_, MergeTreeData::DataPartsVector parts_, - //IndexStatPtr index_stats_, PrewhereInfoPtr prewhere_info_, Names virt_column_names_, Settings settings_, - size_t num_streams_, - //ReadType read_type_ + Poco::Logger * log_ ); String getName() const override { return "ReadFromMergeTree"; } @@ -97,23 +97,19 @@ public: private: SelectQueryInfo query_info; - const MergeTreeDataSelectExecutor::PartitionIdToMaxBlock * max_block_numbers_to_read; + const PartitionIdToMaxBlock * max_block_numbers_to_read; ContextPtr context; const MergeTreeData & data; - const MergeTreeData & storage; StorageMetadataPtr metadata_snapshot; StorageMetadataPtr metadata_snapshot_base; Names real_column_names; - MergeTreeData::DataPartsVector parts; - IndexStats index_stats; + MergeTreeData::DataPartsVector prepared_parts; PrewhereInfoPtr prewhere_info; + IndexStats index_stats; Names virt_column_names; Settings settings; - size_t num_streams; - //ReadType read_type; - Poco::Logger * log; Pipe read(RangesInDataParts parts_with_range, Names required_columns, ReadType read_type, size_t used_max_streams, size_t min_marks_for_concurrent_read, bool use_uncompressed_cache); diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 41adca37c60..e16bbb640e2 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -3839,21 +3839,18 @@ static void selectBestProjection( if (projection_parts.empty()) return; - candidate.merge_tree_data_select_base_cache = std::make_unique(); - candidate.merge_tree_data_select_projection_cache = std::make_unique(); - reader.readFromParts( + //candidate.merge_tree_data_select_base_cache = std::make_unique(); + //candidate.merge_tree_data_select_projection_cache = std::make_unique(); + auto sum_marks = reader.estimateNumMarksToRead( projection_parts, candidate.required_columns, metadata_snapshot, candidate.desc->metadata, query_info, // TODO syntax_analysis_result set in index query_context, - 0, // max_block_size is unused when getting cache settings.max_threads, - max_added_blocks, - candidate.merge_tree_data_select_projection_cache.get()); + max_added_blocks); - size_t sum_marks = candidate.merge_tree_data_select_projection_cache->sum_marks; if (normal_parts.empty()) { // All parts are projection parts which allows us to use in_order_optimization. 
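Context for the MergeTreeData.cpp hunks below: the cache-based readFromParts() dry run is replaced by estimateNumMarksToRead(), so choosing a projection reduces to comparing estimated mark counts per candidate. What follows is a minimal, self-contained sketch of that comparison pattern, with a hypothetical Candidate struct and selectCheapest() helper (not the ClickHouse API; the real code additionally favors aggregate projections over normal ones before comparing marks):

#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

struct Candidate
{
    std::string name;
    size_t estimated_marks = 0; // dry-run estimate; nothing is actually read
};

// Return the candidate with the fewest estimated marks, or nullptr if reading
// the base table is at least as cheap. Starting min_marks at base_marks + 1
// mirrors the "Add 1 to base sum_marks" bias toward projections in the diff.
const Candidate * selectCheapest(const std::vector<Candidate> & candidates, size_t base_marks)
{
    const Candidate * best = nullptr;
    size_t min_marks = base_marks + 1;
    for (const auto & candidate : candidates)
    {
        if (candidate.estimated_marks < min_marks)
        {
            best = &candidate;
            min_marks = candidate.estimated_marks;
        }
    }
    return best;
}

int main()
{
    std::vector<Candidate> candidates{{"agg_projection", 120}, {"normal_projection", 80}};
    if (const auto * best = selectCheapest(candidates, /*base_marks=*/100))
        std::cout << "use projection " << best->name << '\n';
    else
        std::cout << "read base table\n";
}

Because the estimate is marks-only, a tie now goes to the projection, which is why the base count is biased by one rather than compared directly.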
@@ -3862,18 +3859,15 @@ static void selectBestProjection( } else { - reader.readFromParts( + sum_marks += reader.estimateNumMarksToRead( normal_parts, required_columns, metadata_snapshot, metadata_snapshot, query_info, // TODO syntax_analysis_result set in index query_context, - 0, // max_block_size is unused when getting cache settings.max_threads, - max_added_blocks, - candidate.merge_tree_data_select_base_cache.get()); - sum_marks += candidate.merge_tree_data_select_base_cache->sum_marks; + max_added_blocks); } // We choose the projection with least sum_marks to read. @@ -4101,7 +4095,7 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection( if (!candidates.empty()) { // First build a MergeTreeDataSelectCache to check if a projection is indeed better than base - query_info.merge_tree_data_select_cache = std::make_unique(); + // query_info.merge_tree_data_select_cache = std::make_unique(); std::unique_ptr max_added_blocks; if (settings.select_sequential_consistency) @@ -4112,21 +4106,10 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection( auto parts = getDataPartsVector(); MergeTreeDataSelectExecutor reader(*this); - reader.readFromParts( - parts, - analysis_result.required_columns, - metadata_snapshot, - metadata_snapshot, - query_info, // TODO syntax_analysis_result set in index - query_context, - 0, // max_block_size is unused when getting cache - settings.max_threads, - max_added_blocks.get(), - query_info.merge_tree_data_select_cache.get()); - // Add 1 to base sum_marks so that we prefer projections even when they have equal number of marks to read. - size_t min_sum_marks = query_info.merge_tree_data_select_cache->sum_marks + 1; ProjectionCandidate * selected_candidate = nullptr; + size_t min_sum_marks = std::numeric_limits::max(); + bool has_ordinary_projection = false; /// Favor aggregate projections for (auto & candidate : candidates) { @@ -4145,11 +4128,27 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection( selected_candidate, min_sum_marks); } + else + has_ordinary_projection = true; } /// Select the best normal projection if no aggregate projection is available - if (!selected_candidate) + if (!selected_candidate && has_ordinary_projection) { + min_sum_marks = reader.estimateNumMarksToRead( + parts, + analysis_result.required_columns, + metadata_snapshot, + metadata_snapshot, + query_info, // TODO syntax_analysis_result set in index + query_context, + settings.max_threads, + max_added_blocks.get()); + + // Add 1 to base sum_marks so that we prefer projections even when they have equal number of marks to read. + // NOTE: It is not clear if we need it. E.g. projections do not support skip index for now. 
+ min_sum_marks += 1; + for (auto & candidate : candidates) { if (candidate.desc->type == ProjectionDescription::Type::Normal) diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 49bcf751bd1..8a3550fc511 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -47,7 +47,6 @@ #include #include #include -#include #include namespace ProfileEvents @@ -143,6 +142,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( const PartitionIdToMaxBlock * max_block_numbers_to_read) const { const auto & settings = context->getSettingsRef(); + auto parts = data.getDataPartsVector(); if (!query_info.projection) { if (settings.allow_experimental_projection_optimization && settings.force_optimize_projection @@ -150,7 +150,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( throw Exception("No projection is used when allow_experimental_projection_optimization = 1", ErrorCodes::PROJECTION_NOT_USED); return readFromParts( - data.getDataPartsVector(), + parts, column_names_to_return, metadata_snapshot, metadata_snapshot, @@ -158,8 +158,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( context, max_block_size, num_streams, - max_block_numbers_to_read, - query_info.merge_tree_data_select_cache.get()); + max_block_numbers_to_read); } LOG_DEBUG( @@ -168,21 +167,33 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( ProjectionDescription::typeToString(query_info.projection->desc->type), query_info.projection->desc->name); - if (query_info.projection->merge_tree_data_select_base_cache->sum_marks - + query_info.projection->merge_tree_data_select_projection_cache->sum_marks - == 0) - return std::make_unique(); + // if (query_info.projection->merge_tree_data_select_base_cache->sum_marks + // + query_info.projection->merge_tree_data_select_projection_cache->sum_marks + // == 0) + // return std::make_unique(); + + MergeTreeData::DataPartsVector projection_parts; + MergeTreeData::DataPartsVector normal_parts; + for (const auto & part : parts) + { + const auto & projections = part->getProjectionParts(); + auto it = projections.find(query_info.projection->desc->name); + if (it != projections.end()) + projection_parts.push_back(it->second); + else + normal_parts.push_back(part); + } Pipes pipes; Pipe projection_pipe; Pipe ordinary_pipe; const auto & given_select = query_info.query->as(); - if (query_info.projection->merge_tree_data_select_projection_cache->sum_marks > 0) + if (!projection_parts.empty()) { LOG_DEBUG(log, "projection required columns: {}", fmt::join(query_info.projection->required_columns, ", ")); auto plan = readFromParts( - {}, + projection_parts, query_info.projection->required_columns, metadata_snapshot, query_info.projection->desc->metadata, @@ -190,8 +201,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( context, max_block_size, num_streams, - max_block_numbers_to_read, - query_info.projection->merge_tree_data_select_projection_cache.get()); + max_block_numbers_to_read); if (plan) { @@ -225,9 +235,9 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( } } - if (query_info.projection->merge_tree_data_select_base_cache->sum_marks > 0) + if (!normal_parts.empty()) { - auto storage_from_base_parts_of_projection = StorageFromBasePartsOfProjection::create(data, metadata_snapshot); + auto storage_from_base_parts_of_projection = StorageFromMergeTreeDataPart::create(std::move(normal_parts)); auto ast = query_info.projection->desc->query_ast->clone(); auto & 
select = ast->as(); if (given_select.where()) @@ -382,7 +392,6 @@ MergeTreeDataSelectSamplingData MergeTreeDataSelectExecutor::getSampling( ContextPtr context) { const Settings & settings = context->getSettingsRef(); - Float64 used_sample_factor = 1; /// Sampling. MergeTreeDataSelectSamplingData sampling; @@ -485,7 +494,7 @@ MergeTreeDataSelectSamplingData MergeTreeDataSelectExecutor::getSampling( if (sampling.use_sampling) { if (sample_factor_column_queried && relative_sample_size != RelativeSize(0)) - used_sample_factor = 1.0 / boost::rational_cast(relative_sample_size); + sampling.used_sample_factor = 1.0 / boost::rational_cast(relative_sample_size); RelativeSize size_of_universum = 0; const auto & sampling_key = metadata_snapshot->getSamplingKey(); @@ -633,16 +642,137 @@ MergeTreeDataSelectSamplingData MergeTreeDataSelectExecutor::getSampling( return sampling; } -RangesInDataParts MergeTreeDataSelectExecutor::filterParts( +std::optional> MergeTreeDataSelectExecutor::filterPartsByVirtualColumns( + const MergeTreeData & data, MergeTreeData::DataPartsVector & parts, + const ASTPtr & query, + ContextPtr context) +{ + std::unordered_set part_values; + ASTPtr expression_ast; + auto virtual_columns_block = data.getBlockWithVirtualPartColumns(parts, true /* one_part */); + + // Generate valid expressions for filtering + VirtualColumnUtils::prepareFilterBlockWithQuery(query, context, virtual_columns_block, expression_ast); + + // If there is still something left, fill the virtual block and do the filtering. + if (expression_ast) + { + virtual_columns_block = data.getBlockWithVirtualPartColumns(parts, false /* one_part */); + VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, context, expression_ast); + return VirtualColumnUtils::extractSingleValueFromBlock(virtual_columns_block, "_part"); + } + + return {}; +} + +void MergeTreeDataSelectExecutor::filterPartsByPartition( + const StorageMetadataPtr & metadata_snapshot, + const MergeTreeData & data, + const SelectQueryInfo & query_info, + ContextPtr & context, + ContextPtr & query_context, + MergeTreeData::DataPartsVector & parts, + const std::optional> & part_values, + const PartitionIdToMaxBlock * max_block_numbers_to_read, + Poco::Logger * log, + ReadFromMergeTree::IndexStats & index_stats) +{ + const Settings & settings = context->getSettingsRef(); + std::optional partition_pruner; + std::optional minmax_idx_condition; + DataTypes minmax_columns_types; + if (metadata_snapshot->hasPartitionKey()) + { + const auto & partition_key = metadata_snapshot->getPartitionKey(); + auto minmax_columns_names = data.getMinMaxColumnsNames(partition_key); + minmax_columns_types = data.getMinMaxColumnsTypes(partition_key); + + minmax_idx_condition.emplace( + query_info, context, minmax_columns_names, data.getMinMaxExpr(partition_key, ExpressionActionsSettings::fromContext(context))); + partition_pruner.emplace(metadata_snapshot->getPartitionKey(), query_info, context, false /* strict */); + + if (settings.force_index_by_date && (minmax_idx_condition->alwaysUnknownOrTrue() && partition_pruner->isUseless())) + { + String msg = "Neither MinMax index by columns ("; + bool first = true; + for (const String & col : minmax_columns_names) + { + if (first) + first = false; + else + msg += ", "; + msg += col; + } + msg += ") nor partition expr is used and setting 'force_index_by_date' is set"; + + throw Exception(msg, ErrorCodes::INDEX_NOT_USED); + } + } + + PartFilterCounters part_filter_counters; + if 
(query_context->getSettingsRef().allow_experimental_query_deduplication) + selectPartsToReadWithUUIDFilter( + parts, + part_values, + data.getPinnedPartUUIDs(), + minmax_idx_condition, + minmax_columns_types, + partition_pruner, + max_block_numbers_to_read, + query_context, + part_filter_counters, + log); + else + selectPartsToRead( + parts, + part_values, + minmax_idx_condition, + minmax_columns_types, + partition_pruner, + max_block_numbers_to_read, + part_filter_counters); + + index_stats.emplace_back(ReadFromMergeTree::IndexStat{ + .type = ReadFromMergeTree::IndexType::None, + .num_parts_after = part_filter_counters.num_initial_selected_parts, + .num_granules_after = part_filter_counters.num_initial_selected_granules}); + + if (minmax_idx_condition) + { + auto description = minmax_idx_condition->getDescription(); + index_stats.emplace_back(ReadFromMergeTree::IndexStat{ + .type = ReadFromMergeTree::IndexType::MinMax, + .condition = std::move(description.condition), + .used_keys = std::move(description.used_keys), + .num_parts_after = part_filter_counters.num_parts_after_minmax, + .num_granules_after = part_filter_counters.num_granules_after_minmax}); + LOG_DEBUG(log, "MinMax index condition: {}", minmax_idx_condition->toString()); + } + + if (partition_pruner) + { + auto description = partition_pruner->getKeyCondition().getDescription(); + index_stats.emplace_back(ReadFromMergeTree::IndexStat{ + .type = ReadFromMergeTree::IndexType::Partition, + .condition = std::move(description.condition), + .used_keys = std::move(description.used_keys), + .num_parts_after = part_filter_counters.num_parts_after_partition_pruner, + .num_granules_after = part_filter_counters.num_granules_after_partition_pruner}); + } +} + +RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipIndexes( + MergeTreeData::DataPartsVector && parts, StorageMetadataPtr metadata_snapshot, - SelectQueryInfo & query_info, + const SelectQueryInfo & query_info, ContextPtr & context, KeyCondition & key_condition, const MergeTreeReaderSettings & reader_settings, Poco::Logger * log, size_t num_streams, - ReadFromMergeTree::IndexStats & index_stats) + ReadFromMergeTree::IndexStats & index_stats, + bool use_skip_indexes) { RangesInDataParts parts_with_ranges(parts.size()); const Settings & settings = context->getSettingsRef(); @@ -665,15 +795,18 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterParts( }; std::list useful_indices; - for (const auto & index : metadata_snapshot->getSecondaryIndices()) + if (use_skip_indexes) { - auto index_helper = MergeTreeIndexFactory::instance().get(index); - auto condition = index_helper->createIndexCondition(query_info, context); - if (!condition->alwaysUnknownOrTrue()) - useful_indices.emplace_back(index_helper, condition); + for (const auto & index : metadata_snapshot->getSecondaryIndices()) + { + auto index_helper = MergeTreeIndexFactory::instance().get(index); + auto condition = index_helper->createIndexCondition(query_info, context); + if (!condition->alwaysUnknownOrTrue()) + useful_indices.emplace_back(index_helper, condition); + } } - if (settings.force_data_skipping_indices.changed) + if (use_skip_indexes && settings.force_data_skipping_indices.changed) { const auto & indices = settings.force_data_skipping_indices.toString(); @@ -860,7 +993,7 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterParts( return parts_with_ranges; } -void checkLimits(MergeTreeData & data, const RangesInDataParts & parts_with_range, ContextPtr & context) +String 
MergeTreeDataSelectExecutor::checkLimits(const MergeTreeData & data, const RangesInDataParts & parts_with_ranges, ContextPtr & context) { const auto & settings = context->getSettingsRef(); // Check limitations. query_id is used as the quota RAII's resource key. @@ -872,7 +1005,7 @@ void checkLimits(MergeTreeData & data, const RangesInDataParts & parts_with_rang if (max_partitions_to_read > 0) { std::set partitions; - for (auto & part_with_ranges : parts_with_ranges) + for (const auto & part_with_ranges : parts_with_ranges) partitions.insert(part_with_ranges.data_part->info.partition_id); if (partitions.size() > size_t(max_partitions_to_read)) throw Exception( @@ -896,33 +1029,18 @@ void checkLimits(MergeTreeData & data, const RangesInDataParts & parts_with_rang } } } + + return query_id; } -QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts( - MergeTreeData::DataPartsVector parts, +static void selectColumnNames( const Names & column_names_to_return, - const StorageMetadataPtr & metadata_snapshot_base, - const StorageMetadataPtr & metadata_snapshot, - const SelectQueryInfo & query_info, - ContextPtr context, - const UInt64 max_block_size, - const unsigned num_streams, - const PartitionIdToMaxBlock * max_block_numbers_to_read, - MergeTreeDataSelectCache * cache) const + const MergeTreeData & data, + Names & real_column_names, + Names & virt_column_names, + bool & sample_factor_column_queried) { - bool use_cache = cache && cache->use_cache; - - /// If query contains restrictions on the virtual column `_part` or `_part_index`, select only parts suitable for it. - /// The virtual column `_sample_factor` (which is equal to 1 / used sample rate) can be requested in the query. - Names virt_column_names; - Names real_column_names; - - size_t total_parts = parts.size(); - if (!use_cache && total_parts == 0) - return std::make_unique(); - - bool sample_factor_column_queried = false; - Float64 used_sample_factor = 1; + sample_factor_column_queried = false; for (const String & name : column_names_to_return) { @@ -964,395 +1082,112 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts( real_column_names.push_back(name); } } +} - // Filter parts by virtual columns. - std::unordered_set part_values; - if (!use_cache) - { - ASTPtr expression_ast; - auto virtual_columns_block = data.getBlockWithVirtualPartColumns(parts, true /* one_part */); +size_t MergeTreeDataSelectExecutor::estimateNumMarksToRead( + MergeTreeData::DataPartsVector parts, + const Names & column_names_to_return, + const StorageMetadataPtr & metadata_snapshot_base, + const StorageMetadataPtr & metadata_snapshot, + const SelectQueryInfo & query_info, + ContextPtr context, + unsigned num_streams, + const PartitionIdToMaxBlock * max_block_numbers_to_read) const +{ + size_t total_parts = parts.size(); + if (total_parts == 0) + return 0; - // Generate valid expressions for filtering - VirtualColumnUtils::prepareFilterBlockWithQuery(query_info.query, context, virtual_columns_block, expression_ast); + Names real_column_names; + Names virt_column_names; + /// If query contains restrictions on the virtual column `_part` or `_part_index`, select only parts suitable for it. + /// The virtual column `_sample_factor` (which is equal to 1 / used sample rate) can be requested in the query. + bool sample_factor_column_queried = false; - // If there is still something left, fill the virtual block and do the filtering. 
- if (expression_ast) - { - virtual_columns_block = data.getBlockWithVirtualPartColumns(parts, false /* one_part */); - VirtualColumnUtils::filterBlockWithQuery(query_info.query, virtual_columns_block, context, expression_ast); - part_values = VirtualColumnUtils::extractSingleValueFromBlock(virtual_columns_block, "_part"); - if (part_values.empty()) - return std::make_unique(); - } - } - // At this point, empty `part_values` means all parts. + selectColumnNames(column_names_to_return, data, real_column_names, virt_column_names, sample_factor_column_queried); - const Settings & settings = context->getSettingsRef(); - NamesAndTypesList available_real_columns = metadata_snapshot->getColumns().getAllPhysical(); + auto part_values = filterPartsByVirtualColumns(data, parts, query_info.query, context); + if (part_values && part_values->empty()) + return 0; /// If there are only virtual columns in the query, you must request at least one non-virtual one. if (real_column_names.empty()) + { + NamesAndTypesList available_real_columns = metadata_snapshot->getColumns().getAllPhysical(); real_column_names.push_back(ExpressionActions::getSmallestColumn(available_real_columns)); + } metadata_snapshot->check(real_column_names, data.getVirtuals(), data.getStorageID()); - // Build and check if primary key is used when necessary - std::optional key_condition; - if (!use_cache) - { - const auto & primary_key = metadata_snapshot->getPrimaryKey(); - Names primary_key_columns = primary_key.column_names; - key_condition.emplace(query_info, context, primary_key_columns, primary_key.expression); + const auto & primary_key = metadata_snapshot->getPrimaryKey(); + Names primary_key_columns = primary_key.column_names; + KeyCondition key_condition(query_info, context, primary_key_columns, primary_key.expression); - if (settings.force_primary_key && key_condition->alwaysUnknownOrTrue()) - { - throw Exception( - ErrorCodes::INDEX_NOT_USED, - "Primary key ({}) is not used and setting 'force_primary_key' is set.", - fmt::join(primary_key_columns, ", ")); - } - LOG_DEBUG(log, "Key condition: {}", key_condition->toString()); + if (key_condition.alwaysUnknownOrTrue()) + { + size_t total_marks = 0; + for (const auto & part : parts) + total_marks += part->index_granularity.getMarksCountWithoutFinal(); + + return total_marks; } const auto & select = query_info.query->as(); auto query_context = context->hasQueryContext() ? context->getQueryContext() : context; - auto index_stats = use_cache ? 
std::move(cache->index_stats) : std::make_unique(); + ReadFromMergeTree::IndexStats index_stats; - // Select parts to read and do partition pruning via partition value and minmax indices - if (!use_cache) - { - std::optional partition_pruner; - std::optional minmax_idx_condition; - DataTypes minmax_columns_types; - if (metadata_snapshot_base->hasPartitionKey()) - { - const auto & partition_key = metadata_snapshot_base->getPartitionKey(); - auto minmax_columns_names = data.getMinMaxColumnsNames(partition_key); - minmax_columns_types = data.getMinMaxColumnsTypes(partition_key); - minmax_idx_condition.emplace( - query_info, context, minmax_columns_names, data.getMinMaxExpr(partition_key, ExpressionActionsSettings::fromContext(context))); - partition_pruner.emplace(metadata_snapshot_base->getPartitionKey(), query_info, context, false /* strict */); - if (settings.force_index_by_date && (minmax_idx_condition->alwaysUnknownOrTrue() && partition_pruner->isUseless())) - { - String msg = "Neither MinMax index by columns ("; - bool first = true; - for (const String & col : minmax_columns_names) - { - if (first) - first = false; - else - msg += ", "; - msg += col; - } - msg += ") nor partition expr is used and setting 'force_index_by_date' is set"; - throw Exception(msg, ErrorCodes::INDEX_NOT_USED); - } - } - PartFilterCounters part_filter_counters; - if (query_context->getSettingsRef().allow_experimental_query_deduplication) - selectPartsToReadWithUUIDFilter( - parts, - part_values, - data.getPinnedPartUUIDs(), - minmax_idx_condition, - minmax_columns_types, - partition_pruner, - max_block_numbers_to_read, - query_context, - part_filter_counters, - log); - else - selectPartsToRead( - parts, - part_values, - minmax_idx_condition, - minmax_columns_types, - partition_pruner, - max_block_numbers_to_read, - part_filter_counters); - index_stats->emplace_back(ReadFromMergeTree::IndexStat{ - .type = ReadFromMergeTree::IndexType::None, - .num_parts_after = part_filter_counters.num_initial_selected_parts, - .num_granules_after = part_filter_counters.num_initial_selected_granules}); - if (minmax_idx_condition) - { - auto description = minmax_idx_condition->getDescription(); - index_stats->emplace_back(ReadFromMergeTree::IndexStat{ - .type = ReadFromMergeTree::IndexType::MinMax, - .condition = std::move(description.condition), - .used_keys = std::move(description.used_keys), - .num_parts_after = part_filter_counters.num_parts_after_minmax, - .num_granules_after = part_filter_counters.num_granules_after_minmax}); - LOG_DEBUG(log, "MinMax index condition: {}", minmax_idx_condition->toString()); - } - if (partition_pruner) - { - auto description = partition_pruner->getKeyCondition().getDescription(); - index_stats->emplace_back(ReadFromMergeTree::IndexStat{ - .type = ReadFromMergeTree::IndexType::Partition, - .condition = std::move(description.condition), - .used_keys = std::move(description.used_keys), - .num_parts_after = part_filter_counters.num_parts_after_partition_pruner, - .num_granules_after = part_filter_counters.num_granules_after_partition_pruner}); - } - } + filterPartsByPartition( + metadata_snapshot_base, data, query_info, context, query_context, parts, part_values, max_block_numbers_to_read, log, index_stats); + auto sampling = MergeTreeDataSelectExecutor::getSampling( + select, parts, metadata_snapshot, key_condition, + data, log, sample_factor_column_queried, metadata_snapshot->getColumns().getAllPhysical(), context); + if (sampling.read_nothing) + return 0; + /// Do not initialize. They are not used, because skip indexes are ignored here. + MergeTreeReaderSettings reader_settings; + auto parts_with_ranges = filterPartsByPrimaryKeyAndSkipIndexes( + std::move(parts), + metadata_snapshot, + query_info, + context, + key_condition, + reader_settings, + log, + num_streams, + index_stats, + false); + return index_stats.back().num_granules_after; +} QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts( MergeTreeData::DataPartsVector parts, const Names & column_names_to_return, + 
const StorageMetadataPtr & metadata_snapshot_base, + const StorageMetadataPtr & metadata_snapshot, + const SelectQueryInfo & query_info, + ContextPtr context, + const UInt64 max_block_size, + const unsigned num_streams, + const PartitionIdToMaxBlock * max_block_numbers_to_read) const +{ + size_t total_parts = parts.size(); + if (total_parts == 0) + return std::make_unique(); - if (partition_pruner) - { - auto description = partition_pruner->getKeyCondition().getDescription(); - index_stats->emplace_back(ReadFromMergeTree::IndexStat{ - .type = ReadFromMergeTree::IndexType::Partition, - .condition = std::move(description.condition), - .used_keys = std::move(description.used_keys), - .num_parts_after = part_filter_counters.num_parts_after_partition_pruner, - .num_granules_after = part_filter_counters.num_granules_after_partition_pruner}); - } - } + Names real_column_names; + Names virt_column_names; + /// If query contains restrictions on the virtual column `_part` or `_part_index`, select only parts suitable for it. + /// The virtual column `_sample_factor` (which is equal to 1 / used sample rate) can be requested in the query. + bool sample_factor_column_queried = false; - /// Sampling. - MergeTreeDataSelectSamplingData sampling = use_cache ? std::move(cache->sampling) : MergeTreeDataSelectSamplingData{}; - if (!use_cache) - { - assert(key_condition.has_value()); + selectColumnNames(column_names_to_return, data, real_column_names, virt_column_names, sample_factor_column_queried); - RelativeSize relative_sample_size = 0; - RelativeSize relative_sample_offset = 0; - - auto select_sample_size = select.sampleSize(); - auto select_sample_offset = select.sampleOffset(); - - if (select_sample_size) - { - relative_sample_size.assign( - select_sample_size->as().ratio.numerator, - select_sample_size->as().ratio.denominator); - - if (relative_sample_size < 0) - throw Exception("Negative sample size", ErrorCodes::ARGUMENT_OUT_OF_BOUND); - - relative_sample_offset = 0; - if (select_sample_offset) - relative_sample_offset.assign( - select_sample_offset->as().ratio.numerator, - select_sample_offset->as().ratio.denominator); - - if (relative_sample_offset < 0) - throw Exception("Negative sample offset", ErrorCodes::ARGUMENT_OUT_OF_BOUND); - - /// Convert absolute value of the sampling (in form `SAMPLE 1000000` - how many rows to - /// read) into the relative `SAMPLE 0.1` (how much data to read). - size_t approx_total_rows = 0; - if (relative_sample_size > 1 || relative_sample_offset > 1) - approx_total_rows = getApproximateTotalRowsToRead(parts, metadata_snapshot, *key_condition, settings); //-V1007 - - if (relative_sample_size > 1) - { - relative_sample_size = convertAbsoluteSampleSizeToRelative(select_sample_size, approx_total_rows); - LOG_DEBUG(log, "Selected relative sample size: {}", toString(relative_sample_size)); - } - - /// SAMPLE 1 is the same as the absence of SAMPLE. - if (relative_sample_size == RelativeSize(1)) - relative_sample_size = 0; - - if (relative_sample_offset > 0 && RelativeSize(0) == relative_sample_size) - throw Exception("Sampling offset is incorrect because no sampling", ErrorCodes::ARGUMENT_OUT_OF_BOUND); - - if (relative_sample_offset > 1) - { - relative_sample_offset = convertAbsoluteSampleSizeToRelative(select_sample_offset, approx_total_rows); - LOG_DEBUG(log, "Selected relative sample offset: {}", toString(relative_sample_offset)); - } - } - - /** Which range of sampling key values do I need to read? 
- * First, in the whole range ("universe") we select the interval - * of relative `relative_sample_size` size, offset from the beginning by `relative_sample_offset`. - * - * Example: SAMPLE 0.4 OFFSET 0.3 - * - * [------********------] - * ^ - offset - * <------> - size - * - * If the interval passes through the end of the universe, then cut its right side. - * - * Example: SAMPLE 0.4 OFFSET 0.8 - * - * [----------------****] - * ^ - offset - * <------> - size - * - * Next, if the `parallel_replicas_count`, `parallel_replica_offset` settings are set, - * then it is necessary to break the received interval into pieces of the number `parallel_replicas_count`, - * and select a piece with the number `parallel_replica_offset` (from zero). - * - * Example: SAMPLE 0.4 OFFSET 0.3, parallel_replicas_count = 2, parallel_replica_offset = 1 - * - * [----------****------] - * ^ - offset - * <------> - size - * <--><--> - pieces for different `parallel_replica_offset`, select the second one. - * - * It is very important that the intervals for different `parallel_replica_offset` cover the entire range without gaps and overlaps. - * It is also important that the entire universe can be covered using SAMPLE 0.1 OFFSET 0, ... OFFSET 0.9 and similar decimals. - */ - - /// Parallel replicas has been requested but there is no way to sample data. - /// Select all data from first replica and no data from other replicas. - if (settings.parallel_replicas_count > 1 && !data.supportsSampling() && settings.parallel_replica_offset > 0) - { - LOG_DEBUG(log, "Will use no data on this replica because parallel replicas processing has been requested" - " (the setting 'max_parallel_replicas') but the table does not support sampling and this replica is not the first."); - return std::make_unique(); - } - - sampling.use_sampling = relative_sample_size > 0 || (settings.parallel_replicas_count > 1 && data.supportsSampling()); - bool no_data = false; /// There is nothing left after sampling. - - if (sampling.use_sampling) - { - if (sample_factor_column_queried && relative_sample_size != RelativeSize(0)) - used_sample_factor = 1.0 / boost::rational_cast(relative_sample_size); - - RelativeSize size_of_universum = 0; - const auto & sampling_key = metadata_snapshot->getSamplingKey(); - DataTypePtr sampling_column_type = sampling_key.data_types[0]; - - if (sampling_key.data_types.size() == 1) - { - if (typeid_cast(sampling_column_type.get())) - size_of_universum = RelativeSize(std::numeric_limits::max()) + RelativeSize(1); - else if (typeid_cast(sampling_column_type.get())) - size_of_universum = RelativeSize(std::numeric_limits::max()) + RelativeSize(1); - else if (typeid_cast(sampling_column_type.get())) - size_of_universum = RelativeSize(std::numeric_limits::max()) + RelativeSize(1); - else if (typeid_cast(sampling_column_type.get())) - size_of_universum = RelativeSize(std::numeric_limits::max()) + RelativeSize(1); - } - - if (size_of_universum == RelativeSize(0)) - throw Exception( - "Invalid sampling column type in storage parameters: " + sampling_column_type->getName() - + ". 
Must be one unsigned integer type", - ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER); - - if (settings.parallel_replicas_count > 1) - { - if (relative_sample_size == RelativeSize(0)) - relative_sample_size = 1; - - relative_sample_size /= settings.parallel_replicas_count.value; - relative_sample_offset += relative_sample_size * RelativeSize(settings.parallel_replica_offset.value); - } - - if (relative_sample_offset >= RelativeSize(1)) - no_data = true; - - /// Calculate the half-interval of `[lower, upper)` column values. - bool has_lower_limit = false; - bool has_upper_limit = false; - - RelativeSize lower_limit_rational = relative_sample_offset * size_of_universum; - RelativeSize upper_limit_rational = (relative_sample_offset + relative_sample_size) * size_of_universum; - - UInt64 lower = boost::rational_cast(lower_limit_rational); - UInt64 upper = boost::rational_cast(upper_limit_rational); - - if (lower > 0) - has_lower_limit = true; - - if (upper_limit_rational < size_of_universum) - has_upper_limit = true; - - /*std::cerr << std::fixed << std::setprecision(100) - << "relative_sample_size: " << relative_sample_size << "\n" - << "relative_sample_offset: " << relative_sample_offset << "\n" - << "lower_limit_float: " << lower_limit_rational << "\n" - << "upper_limit_float: " << upper_limit_rational << "\n" - << "lower: " << lower << "\n" - << "upper: " << upper << "\n";*/ - - if ((has_upper_limit && upper == 0) - || (has_lower_limit && has_upper_limit && lower == upper)) - no_data = true; - - if (no_data || (!has_lower_limit && !has_upper_limit)) - { - sampling.use_sampling = false; - } - else - { - /// Let's add the conditions to cut off something else when the index is scanned again and when the request is processed. - - std::shared_ptr lower_function; - std::shared_ptr upper_function; - - /// If sample and final are used together no need to calculate sampling expression twice. - /// The first time it was calculated for final, because sample key is a part of the PK. - /// So, assume that we already have calculated column. - ASTPtr sampling_key_ast = metadata_snapshot->getSamplingKeyAST(); - - if (select.final()) - { - sampling_key_ast = std::make_shared(sampling_key.column_names[0]); - /// We do spoil available_real_columns here, but it is not used later. 
- available_real_columns.emplace_back(sampling_key.column_names[0], std::move(sampling_column_type)); - } - - if (has_lower_limit) - { - if (!key_condition->addCondition(sampling_key.column_names[0], Range::createLeftBounded(lower, true))) //-V1007 - throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN); - - ASTPtr args = std::make_shared(); - args->children.push_back(sampling_key_ast); - args->children.push_back(std::make_shared(lower)); - - lower_function = std::make_shared(); - lower_function->name = "greaterOrEquals"; - lower_function->arguments = args; - lower_function->children.push_back(lower_function->arguments); - - sampling.filter_function = lower_function; - } - - if (has_upper_limit) - { - if (!key_condition->addCondition(sampling_key.column_names[0], Range::createRightBounded(upper, false))) //-V1007 - throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN); - - ASTPtr args = std::make_shared(); - args->children.push_back(sampling_key_ast); - args->children.push_back(std::make_shared(upper)); - - upper_function = std::make_shared(); - upper_function->name = "less"; - upper_function->arguments = args; - upper_function->children.push_back(upper_function->arguments); - - sampling.filter_function = upper_function; - } - - if (has_lower_limit && has_upper_limit) - { - ASTPtr args = std::make_shared(); - args->children.push_back(lower_function); - args->children.push_back(upper_function); - - sampling.filter_function = std::make_shared(); - sampling.filter_function->name = "and"; - sampling.filter_function->arguments = args; - sampling.filter_function->children.push_back(sampling.filter_function->arguments); - } - - ASTPtr query = sampling.filter_function; - auto syntax_result = TreeRewriter(context).analyze(query, available_real_columns); - sampling.filter_expression = ExpressionAnalyzer(sampling.filter_function, syntax_result, context).getActionsDAG(false); - } - } - - if (no_data) - { - LOG_DEBUG(log, "Sampling yields no data."); - return std::make_unique(); - } - } + const auto & settings = context->getSettingsRef(); MergeTreeReaderSettings reader_settings = { @@ -1364,443 +1199,48 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts( .checksum_on_read = settings.checksum_on_read, }; - RangesInDataParts parts_with_ranges(parts.size()); - size_t sum_marks = 0; - size_t sum_ranges = 0; - - /// Let's start analyzing all useful indices - if (!use_cache) + ReadFromMergeTree::Settings step_settings { - struct DataSkippingIndexAndCondition - { - MergeTreeIndexPtr index; - MergeTreeIndexConditionPtr condition; - std::atomic total_granules{0}; - std::atomic granules_dropped{0}; - std::atomic total_parts{0}; - std::atomic parts_dropped{0}; - - DataSkippingIndexAndCondition(MergeTreeIndexPtr index_, MergeTreeIndexConditionPtr condition_) - : index(index_), condition(condition_) - { - } - }; - std::list useful_indices; - - for (const auto & index : metadata_snapshot->getSecondaryIndices()) - { - auto index_helper = MergeTreeIndexFactory::instance().get(index); - auto condition = index_helper->createIndexCondition(query_info, context); - if (!condition->alwaysUnknownOrTrue()) - useful_indices.emplace_back(index_helper, condition); - } - - if (settings.force_data_skipping_indices.changed) - { - const auto & indices = settings.force_data_skipping_indices.toString(); - - Strings forced_indices; - { - Tokens tokens(&indices[0], &indices[indices.size()], settings.max_query_size); - IParser::Pos pos(tokens, settings.max_parser_depth); - 
Expected expected; - if (!parseIdentifiersOrStringLiterals(pos, expected, forced_indices)) - throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "Cannot parse force_data_skipping_indices ('{}')", indices); - } - - if (forced_indices.empty()) - throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "No indices parsed from force_data_skipping_indices ('{}')", indices); - - std::unordered_set useful_indices_names; - for (const auto & useful_index : useful_indices) - useful_indices_names.insert(useful_index.index->index.name); - - for (const auto & index_name : forced_indices) - { - if (!useful_indices_names.count(index_name)) - { - throw Exception( - ErrorCodes::INDEX_NOT_USED, - "Index {} is not used and setting 'force_data_skipping_indices' contains it", - backQuote(index_name)); - } - } - } - - std::atomic sum_marks_pk = 0; - std::atomic sum_parts_pk = 0; - std::atomic total_marks_pk = 0; - - /// Let's find what range to read from each part. - { - std::atomic total_rows{0}; - - SizeLimits limits; - if (settings.read_overflow_mode == OverflowMode::THROW && settings.max_rows_to_read) - limits = SizeLimits(settings.max_rows_to_read, 0, settings.read_overflow_mode); - - SizeLimits leaf_limits; - if (settings.read_overflow_mode_leaf == OverflowMode::THROW && settings.max_rows_to_read_leaf) - leaf_limits = SizeLimits(settings.max_rows_to_read_leaf, 0, settings.read_overflow_mode_leaf); - - auto process_part = [&](size_t part_index) - { - auto & part = parts[part_index]; - - RangesInDataPart ranges(part, part_index); - - size_t total_marks_count = part->getMarksCount(); - if (total_marks_count && part->index_granularity.hasFinalMark()) - --total_marks_count; - - total_marks_pk.fetch_add(total_marks_count, std::memory_order_relaxed); - - if (metadata_snapshot->hasPrimaryKey()) - ranges.ranges = markRangesFromPKRange(part, metadata_snapshot, *key_condition, settings, log); - else if (total_marks_count) - ranges.ranges = MarkRanges{MarkRange{0, total_marks_count}}; - - sum_marks_pk.fetch_add(ranges.getMarksCount(), std::memory_order_relaxed); - - if (!ranges.ranges.empty()) - sum_parts_pk.fetch_add(1, std::memory_order_relaxed); - - for (auto & index_and_condition : useful_indices) - { - if (ranges.ranges.empty()) - break; - - index_and_condition.total_parts.fetch_add(1, std::memory_order_relaxed); - - size_t total_granules = 0; - size_t granules_dropped = 0; - ranges.ranges = filterMarksUsingIndex( - index_and_condition.index, - index_and_condition.condition, - part, - ranges.ranges, - settings, - reader_settings, - total_granules, - granules_dropped, - log); - - index_and_condition.total_granules.fetch_add(total_granules, std::memory_order_relaxed); - index_and_condition.granules_dropped.fetch_add(granules_dropped, std::memory_order_relaxed); - - if (ranges.ranges.empty()) - index_and_condition.parts_dropped.fetch_add(1, std::memory_order_relaxed); - } - - if (!ranges.ranges.empty()) - { - if (limits.max_rows || leaf_limits.max_rows) - { - /// Fail fast if estimated number of rows to read exceeds the limit - auto current_rows_estimate = ranges.getRowsCount(); - size_t prev_total_rows_estimate = total_rows.fetch_add(current_rows_estimate); - size_t total_rows_estimate = current_rows_estimate + prev_total_rows_estimate; - limits.check(total_rows_estimate, 0, "rows (controlled by 'max_rows_to_read' setting)", ErrorCodes::TOO_MANY_ROWS); - leaf_limits.check( - total_rows_estimate, 0, "rows (controlled by 'max_rows_to_read_leaf' setting)", ErrorCodes::TOO_MANY_ROWS); - } - - parts_with_ranges[part_index] = 
std::move(ranges); - } - }; - - size_t num_threads = std::min(size_t(num_streams), parts.size()); - - if (num_threads <= 1) - { - for (size_t part_index = 0; part_index < parts.size(); ++part_index) - process_part(part_index); - } - else - { - /// Parallel loading of data parts. - ThreadPool pool(num_threads); - - for (size_t part_index = 0; part_index < parts.size(); ++part_index) - pool.scheduleOrThrowOnError([&, part_index, thread_group = CurrentThread::getGroup()] - { - SCOPE_EXIT_SAFE(if (thread_group) CurrentThread::detachQueryIfNotDetached();); - if (thread_group) - CurrentThread::attachTo(thread_group); - - process_part(part_index); - }); - - pool.wait(); - } - - /// Skip empty ranges. - size_t next_part = 0; - for (size_t part_index = 0; part_index < parts.size(); ++part_index) - { - auto & part = parts_with_ranges[part_index]; - if (!part.data_part) - continue; - - sum_ranges += part.ranges.size(); - sum_marks += part.getMarksCount(); - - if (next_part != part_index) - std::swap(parts_with_ranges[next_part], part); - - ++next_part; - } - - parts_with_ranges.resize(next_part); - } - - if (metadata_snapshot->hasPrimaryKey()) - { - auto description = key_condition->getDescription(); - - index_stats->emplace_back(ReadFromMergeTree::IndexStat{ - .type = ReadFromMergeTree::IndexType::PrimaryKey, - .condition = std::move(description.condition), - .used_keys = std::move(description.used_keys), - .num_parts_after = sum_parts_pk.load(std::memory_order_relaxed), - .num_granules_after = sum_marks_pk.load(std::memory_order_relaxed)}); - } - - for (const auto & index_and_condition : useful_indices) - { - const auto & index_name = index_and_condition.index->index.name; - LOG_DEBUG( - log, - "Index {} has dropped {}/{} granules.", - backQuote(index_name), - index_and_condition.granules_dropped, - index_and_condition.total_granules); - - std::string description - = index_and_condition.index->index.type + " GRANULARITY " + std::to_string(index_and_condition.index->index.granularity); - - index_stats->emplace_back(ReadFromMergeTree::IndexStat{ - .type = ReadFromMergeTree::IndexType::Skip, - .name = index_name, - .description = std::move(description), - .num_parts_after = index_and_condition.total_parts - index_and_condition.parts_dropped, - .num_granules_after = index_and_condition.total_granules - index_and_condition.granules_dropped}); - } - - LOG_DEBUG( - log, - "Selected {}/{} parts by partition key, {} parts by primary key, {}/{} marks by primary key, {} marks to read from {} ranges", - parts.size(), - total_parts, - parts_with_ranges.size(), - sum_marks_pk.load(std::memory_order_relaxed), - total_marks_pk.load(std::memory_order_relaxed), - sum_marks, - sum_ranges); - } - - if (cache) - { - if (cache->use_cache) - { - parts_with_ranges = std::move(cache->parts_with_ranges); - sum_marks = cache->sum_marks; - sum_ranges = cache->sum_ranges; - } - else - { - // We are asking for ranges_to_read. Return immediately without further planning. - cache->parts_with_ranges = std::move(parts_with_ranges); - cache->sampling = std::move(sampling); - cache->index_stats = std::move(index_stats); - cache->sum_marks = sum_marks; - cache->sum_ranges = sum_ranges; - cache->use_cache = true; - return std::make_unique(); - } - } - - if (parts_with_ranges.empty()) - return std::make_unique(); - - // Check limitations. query_id is used as the quota RAII's resource key. - String query_id; - { - const auto data_settings = data.getSettings(); - auto max_partitions_to_read - = settings.max_partitions_to_read.changed ? 
settings.max_partitions_to_read : data_settings->max_partitions_to_read; - if (max_partitions_to_read > 0) - { - std::set partitions; - for (auto & part_with_ranges : parts_with_ranges) - partitions.insert(part_with_ranges.data_part->info.partition_id); - if (partitions.size() > size_t(max_partitions_to_read)) - throw Exception( - ErrorCodes::TOO_MANY_PARTITIONS, - "Too many partitions to read. Current {}, max {}", - partitions.size(), - max_partitions_to_read); - } - - if (data_settings->max_concurrent_queries > 0) - { - if (data_settings->min_marks_to_honor_max_concurrent_queries > 0 - && sum_marks >= data_settings->min_marks_to_honor_max_concurrent_queries) - { - query_id = context->getCurrentQueryId(); - if (!query_id.empty()) - data.insertQueryIdOrThrow(query_id, data_settings->max_concurrent_queries); - } - } - } - - ProfileEvents::increment(ProfileEvents::SelectedParts, parts_with_ranges.size()); - ProfileEvents::increment(ProfileEvents::SelectedRanges, sum_ranges); - ProfileEvents::increment(ProfileEvents::SelectedMarks, sum_marks); - - QueryPlanPtr plan; - - /// Projection, that needed to drop columns, which have appeared by execution - /// of some extra expressions, and to allow execute the same expressions later. - /// NOTE: It may lead to double computation of expressions. - ActionsDAGPtr result_projection; - - Names column_names_to_read = real_column_names; - if (!select.final() && sampling.use_sampling) - { - /// Add columns needed for `sample_by_ast` to `column_names_to_read`. - /// Skip this if final was used, because such columns were already added from PK. - std::vector add_columns = sampling.filter_expression->getRequiredColumns().getNames(); - column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end()); - std::sort(column_names_to_read.begin(), column_names_to_read.end()); - column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()), - column_names_to_read.end()); - } - - const auto & input_order_info = query_info.input_order_info - ? query_info.input_order_info - : (query_info.projection ? query_info.projection->input_order_info : nullptr); - - if (select.final()) - { - /// Add columns needed to calculate the sorting expression and the sign. 
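// Editorial aside: the removed block below extends column_names_to_read with the
// sorting-key columns plus the sign/version columns, then deduplicates with the
// classic sort + unique + erase idiom. A self-contained sketch of that idiom
// (function name hypothetical, not part of this patch):
#include <algorithm>
#include <string>
#include <vector>

void appendAndDeduplicate(std::vector<std::string> & names, const std::vector<std::string> & extra)
{
    /// Append, sort, and drop adjacent duplicates in place.
    names.insert(names.end(), extra.begin(), extra.end());
    std::sort(names.begin(), names.end());
    names.erase(std::unique(names.begin(), names.end()), names.end());
}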
- std::vector add_columns = metadata_snapshot->getColumnsRequiredForSortingKey(); - column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end()); - - if (!data.merging_params.sign_column.empty()) - column_names_to_read.push_back(data.merging_params.sign_column); - if (!data.merging_params.version_column.empty()) - column_names_to_read.push_back(data.merging_params.version_column); - - std::sort(column_names_to_read.begin(), column_names_to_read.end()); - column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()), column_names_to_read.end()); - - plan = spreadMarkRangesAmongStreamsFinal( - std::move(parts_with_ranges), - std::move(index_stats), - num_streams, - column_names_to_read, - metadata_snapshot, - max_block_size, - settings.use_uncompressed_cache, - query_info, - virt_column_names, - settings, - reader_settings, - result_projection, - query_id); - } - else if ((settings.optimize_read_in_order || settings.optimize_aggregation_in_order) && input_order_info) - { - size_t prefix_size = input_order_info->order_key_prefix_descr.size(); - auto order_key_prefix_ast = metadata_snapshot->getSortingKey().expression_list_ast->clone(); - order_key_prefix_ast->children.resize(prefix_size); - - auto syntax_result = TreeRewriter(context).analyze(order_key_prefix_ast, metadata_snapshot->getColumns().getAllPhysical()); - auto sorting_key_prefix_expr = ExpressionAnalyzer(order_key_prefix_ast, syntax_result, context).getActionsDAG(false); - - plan = spreadMarkRangesAmongStreamsWithOrder( - std::move(parts_with_ranges), - std::move(index_stats), - num_streams, - column_names_to_read, - metadata_snapshot, - max_block_size, - settings.use_uncompressed_cache, - query_info, - sorting_key_prefix_expr, - virt_column_names, - settings, - reader_settings, - result_projection, - query_id, - input_order_info); - } - else - { - plan = spreadMarkRangesAmongStreams( - std::move(parts_with_ranges), - std::move(index_stats), - num_streams, - column_names_to_read, - metadata_snapshot, - max_block_size, - settings.use_uncompressed_cache, - query_info, - virt_column_names, - settings, - reader_settings, - query_id); - } - - if (!plan) - return std::make_unique(); - - if (sampling.use_sampling) - { - auto sampling_step = std::make_unique( - plan->getCurrentDataStream(), - sampling.filter_expression, - sampling.filter_function->getColumnName(), - false); - - sampling_step->setStepDescription("Sampling"); - plan->addStep(std::move(sampling_step)); - } - - if (result_projection) - { - auto projection_step = std::make_unique(plan->getCurrentDataStream(), result_projection); - projection_step->setStepDescription("Remove unused columns after reading from storage"); - plan->addStep(std::move(projection_step)); - } - - /// By the way, if a distributed query or query to a Merge table is made, then the `_sample_factor` column can have different values. 
-        if (sample_factor_column_queried)
-        {
-            ColumnWithTypeAndName column;
-            column.name = "_sample_factor";
-            column.type = std::make_shared<DataTypeFloat64>();
-            column.column = column.type->createColumnConst(0, Field(used_sample_factor));
-
-            auto adding_column_action = ActionsDAG::makeAddingColumnActions(std::move(column));
-
-            auto adding_column = std::make_unique<ExpressionStep>(plan->getCurrentDataStream(), std::move(adding_column_action));
-            adding_column->setStepDescription("Add _sample_factor column");
-            plan->addStep(std::move(adding_column));
-        }
-
-        // TODO There seems to be no place initializing remove_columns_actions
-        if (query_info.prewhere_info && query_info.prewhere_info->remove_columns_actions)
-        {
-            auto expression_step = std::make_unique<ExpressionStep>(
-                plan->getCurrentDataStream(),
-                query_info.prewhere_info->remove_columns_actions->getActionsDAG().clone());
-
-            expression_step->setStepDescription("Remove unused columns after PREWHERE");
-            plan->addStep(std::move(expression_step));
-        }
-
+        .max_block_size = max_block_size,
+        .num_streams = num_streams,
+        .preferred_block_size_bytes = settings.preferred_block_size_bytes,
+        .preferred_max_column_in_block_size_bytes = settings.preferred_max_column_in_block_size_bytes,
+        //.min_marks_for_concurrent_read = settings.min_marks_for_concurrent_read,
+        .use_uncompressed_cache = settings.use_uncompressed_cache,
+        .reader_settings = reader_settings,
+        .backoff_settings = MergeTreeReadPool::BackoffSettings(settings),
+    };
+
+    // const SelectQueryInfo & query_info_,
+    // const MergeTreeDataSelectExecutor::PartitionIdToMaxBlock * max_block_numbers_to_read_,
+    // ContextPtr context_,
+    // const MergeTreeData & data_,
+    // StorageMetadataPtr metadata_snapshot_,
+    // StorageMetadataPtr metadata_snapshot_base_,
+    // Names real_column_names_,
+    // MergeTreeData::DataPartsVector parts_,
+    // PrewhereInfoPtr prewhere_info_,
+    // Names virt_column_names_,
+    // Settings settings_,
+    // Poco::Logger * log_
+
+    auto read_from_merge_tree = std::make_unique<ReadFromMergeTree>(
+        query_info,
+        max_block_numbers_to_read,
+        context,
+        data,
+        metadata_snapshot,
+        metadata_snapshot_base,
+        real_column_names,
+        parts,
+        query_info.projection ? query_info.projection->prewhere_info : query_info.prewhere_info,
+        virt_column_names,
+        step_settings,
+        log
+    );
+
+    QueryPlanPtr plan = std::make_unique<QueryPlan>();
+    plan->addStep(std::move(read_from_merge_tree));
     return plan;
 }

@@ -1822,638 +1262,609 @@ size_t roundRowsOrBytesToMarks(
     else
         return std::max(res, (bytes_setting + bytes_granularity - 1) / bytes_granularity);
 }

-/// Same as roundRowsOrBytesToMarks() but do not return more than max_marks
-size_t minMarksForConcurrentRead(
-    size_t rows_setting,
-    size_t bytes_setting,
-    size_t rows_granularity,
-    size_t bytes_granularity,
-    size_t max_marks)
-{
-    size_t marks = 1;
-
-    if (rows_setting + rows_granularity <= rows_setting) /// overflow
-        marks = max_marks;
-    else if (rows_setting)
-        marks = (rows_setting + rows_granularity - 1) / rows_granularity;
-
-    if (bytes_granularity == 0)
-        return marks;
-    else
-    {
-        /// Overflow
-        if (bytes_setting + bytes_granularity <= bytes_setting) /// overflow
-            return max_marks;
-        if (bytes_setting)
-            return std::max(marks, (bytes_setting + bytes_granularity - 1) / bytes_granularity);
-        else
-            return marks;
-    }
-}

 }

-QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
-    RangesInDataParts && parts,
-    ReadFromMergeTree::IndexStatPtr index_stats,
-    size_t num_streams,
-    const Names & column_names,
-    const StorageMetadataPtr & metadata_snapshot,
-    UInt64 max_block_size,
-    bool use_uncompressed_cache,
-    const SelectQueryInfo & query_info,
-    const Names & virt_columns,
-    const Settings & settings,
-    const MergeTreeReaderSettings & reader_settings,
-    const String & query_id) const
-{
-    /// Count marks for each part.
-    std::vector<size_t> sum_marks_in_parts(parts.size());
-    size_t sum_marks = 0;
-    size_t total_rows = 0;
-
-    const auto data_settings = data.getSettings();
-    size_t adaptive_parts = 0;
-    for (size_t i = 0; i < parts.size(); ++i)
-    {
-        total_rows += parts[i].getRowsCount();
-        sum_marks_in_parts[i] = parts[i].getMarksCount();
-        sum_marks += sum_marks_in_parts[i];
-
-        if (parts[i].data_part->index_granularity_info.is_adaptive)
-            ++adaptive_parts;
-    }
-
-    size_t index_granularity_bytes = 0;
-    if (adaptive_parts > parts.size() / 2)
-        index_granularity_bytes = data_settings->index_granularity_bytes;
-
-    const size_t max_marks_to_use_cache = roundRowsOrBytesToMarks(
-        settings.merge_tree_max_rows_to_use_cache,
-        settings.merge_tree_max_bytes_to_use_cache,
-        data_settings->index_granularity,
-        index_granularity_bytes);
-
-    const size_t min_marks_for_concurrent_read = minMarksForConcurrentRead(
-        settings.merge_tree_min_rows_for_concurrent_read,
-        settings.merge_tree_min_bytes_for_concurrent_read,
-        data_settings->index_granularity,
-        index_granularity_bytes,
-        sum_marks);
-
-    if (sum_marks > max_marks_to_use_cache)
-        use_uncompressed_cache = false;
-
-    if (0 == sum_marks)
-        return {};
-
-    ReadFromMergeTree::Settings step_settings
-    {
-        .max_block_size = max_block_size,
-        .preferred_block_size_bytes = settings.preferred_block_size_bytes,
-        .preferred_max_column_in_block_size_bytes = settings.preferred_max_column_in_block_size_bytes,
-        .min_marks_for_concurrent_read = min_marks_for_concurrent_read,
-        .use_uncompressed_cache = use_uncompressed_cache,
-        .reader_settings = reader_settings,
-        .backoff_settings = MergeTreeReadPool::BackoffSettings(settings),
-    };
-
-    if (num_streams > 1)
-    {
-        /// Reduce the number of num_streams if the data is small.
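// Editorial aside: the condition below shrinks the stream count when there is
// little data, so that each remaining stream still receives at least
// min_marks_for_concurrent_read marks. A self-contained sketch of the heuristic
// (function name hypothetical):
#include <algorithm>
#include <cstddef>

size_t reduceNumStreams(size_t num_streams, size_t sum_marks, size_t min_marks_per_stream, size_t num_parts)
{
    /// Shrink only when both the mark budget and the part count are below num_streams;
    /// ceil-divide the marks so no stream is left without a full quota.
    if (sum_marks < num_streams * min_marks_per_stream && num_parts < num_streams)
        num_streams = std::max((sum_marks + min_marks_per_stream - 1) / min_marks_per_stream, num_parts);
    return num_streams;
}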
- if (sum_marks < num_streams * min_marks_for_concurrent_read && parts.size() < num_streams) - num_streams = std::max((sum_marks + min_marks_for_concurrent_read - 1) / min_marks_for_concurrent_read, parts.size()); - } - - auto plan = std::make_unique(); - auto step = std::make_unique( - data, - metadata_snapshot, - query_id, - column_names, - std::move(parts), - std::move(index_stats), - query_info.projection ? query_info.projection->prewhere_info : query_info.prewhere_info, - virt_columns, - step_settings, - num_streams, - ReadFromMergeTree::ReadType::Default); - - plan->addStep(std::move(step)); - return plan; -} - -static ActionsDAGPtr createProjection(const Block & header) -{ - auto projection = std::make_shared(header.getNamesAndTypesList()); - projection->removeUnusedActions(header.getNames()); - projection->projectInput(); - return projection; -} - -QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder( - RangesInDataParts && parts, - ReadFromMergeTree::IndexStatPtr index_stats, - size_t num_streams, - const Names & column_names, - const StorageMetadataPtr & metadata_snapshot, - UInt64 max_block_size, - bool use_uncompressed_cache, - const SelectQueryInfo & query_info, - const ActionsDAGPtr & sorting_key_prefix_expr, - const Names & virt_columns, - const Settings & settings, - const MergeTreeReaderSettings & reader_settings, - ActionsDAGPtr & out_projection, - const String & query_id, - const InputOrderInfoPtr & input_order_info) const -{ - size_t sum_marks = 0; - size_t adaptive_parts = 0; - std::vector sum_marks_in_parts(parts.size()); - const auto data_settings = data.getSettings(); - - for (size_t i = 0; i < parts.size(); ++i) - { - sum_marks_in_parts[i] = parts[i].getMarksCount(); - sum_marks += sum_marks_in_parts[i]; - - if (parts[i].data_part->index_granularity_info.is_adaptive) - ++adaptive_parts; - } - - size_t index_granularity_bytes = 0; - if (adaptive_parts > parts.size() / 2) - index_granularity_bytes = data_settings->index_granularity_bytes; - - const size_t max_marks_to_use_cache = roundRowsOrBytesToMarks( - settings.merge_tree_max_rows_to_use_cache, - settings.merge_tree_max_bytes_to_use_cache, - data_settings->index_granularity, - index_granularity_bytes); - - const size_t min_marks_for_concurrent_read = minMarksForConcurrentRead( - settings.merge_tree_min_rows_for_concurrent_read, - settings.merge_tree_min_bytes_for_concurrent_read, - data_settings->index_granularity, - index_granularity_bytes, - sum_marks); - - if (sum_marks > max_marks_to_use_cache) - use_uncompressed_cache = false; - - Pipes res; - - if (sum_marks == 0) - return {}; - - /// Let's split ranges to avoid reading much data. - auto split_ranges = [rows_granularity = data_settings->index_granularity, max_block_size](const auto & ranges, int direction) - { - MarkRanges new_ranges; - const size_t max_marks_in_range = (max_block_size + rows_granularity - 1) / rows_granularity; - size_t marks_in_range = 1; - - if (direction == 1) - { - /// Split first few ranges to avoid reading much data. - bool split = false; - for (auto range : ranges) - { - while (!split && range.begin + marks_in_range < range.end) - { - new_ranges.emplace_back(range.begin, range.begin + marks_in_range); - range.begin += marks_in_range; - marks_in_range *= 2; - - if (marks_in_range > max_marks_in_range) - split = true; - } - new_ranges.emplace_back(range.begin, range.end); - } - } - else - { - /// Split all ranges to avoid reading much data, because we have to - /// store whole range in memory to reverse it. 
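// Editorial aside: a self-contained sketch of the reverse branch below. Ranges are
// cut from the back in chunks that double up to max_marks_in_range (derived from
// max_block_size and the index granularity), because a reversed range has to be
// buffered whole before it can be emitted. Types and names are simplified
// stand-ins; assumes max_marks_in_range >= 1.
#include <algorithm>
#include <cstddef>
#include <deque>

struct MarkRange { size_t begin; size_t end; };

std::deque<MarkRange> splitForReverseRead(const std::deque<MarkRange> & ranges, size_t max_marks_in_range)
{
    std::deque<MarkRange> new_ranges;
    size_t marks_in_range = 1;
    for (auto it = ranges.rbegin(); it != ranges.rend(); ++it)
    {
        auto range = *it;
        while (range.begin + marks_in_range < range.end)
        {
            /// Carve a chunk off the tail, then let the chunk size grow geometrically.
            new_ranges.push_front({range.end - marks_in_range, range.end});
            range.end -= marks_in_range;
            marks_in_range = std::min(marks_in_range * 2, max_marks_in_range);
        }
        new_ranges.push_front({range.begin, range.end});
    }
    return new_ranges;
}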
- for (auto it = ranges.rbegin(); it != ranges.rend(); ++it) - { - auto range = *it; - while (range.begin + marks_in_range < range.end) - { - new_ranges.emplace_front(range.end - marks_in_range, range.end); - range.end -= marks_in_range; - marks_in_range = std::min(marks_in_range * 2, max_marks_in_range); - } - new_ranges.emplace_front(range.begin, range.end); - } - } - - return new_ranges; - }; - - const size_t min_marks_per_stream = (sum_marks - 1) / num_streams + 1; - bool need_preliminary_merge = (parts.size() > settings.read_in_order_two_level_merge_threshold); - - std::vector plans; - - for (size_t i = 0; i < num_streams && !parts.empty(); ++i) - { - size_t need_marks = min_marks_per_stream; - RangesInDataParts new_parts; - - /// Loop over parts. - /// We will iteratively take part or some subrange of a part from the back - /// and assign a stream to read from it. - while (need_marks > 0 && !parts.empty()) - { - RangesInDataPart part = parts.back(); - parts.pop_back(); - - size_t & marks_in_part = sum_marks_in_parts.back(); - - /// We will not take too few rows from a part. - if (marks_in_part >= min_marks_for_concurrent_read && - need_marks < min_marks_for_concurrent_read) - need_marks = min_marks_for_concurrent_read; - - /// Do not leave too few rows in the part. - if (marks_in_part > need_marks && - marks_in_part - need_marks < min_marks_for_concurrent_read) - need_marks = marks_in_part; - - MarkRanges ranges_to_get_from_part; - - /// We take the whole part if it is small enough. - if (marks_in_part <= need_marks) - { - ranges_to_get_from_part = part.ranges; - - need_marks -= marks_in_part; - sum_marks_in_parts.pop_back(); - } - else - { - /// Loop through ranges in part. Take enough ranges to cover "need_marks". - while (need_marks > 0) - { - if (part.ranges.empty()) - throw Exception("Unexpected end of ranges while spreading marks among streams", ErrorCodes::LOGICAL_ERROR); - - MarkRange & range = part.ranges.front(); - - const size_t marks_in_range = range.end - range.begin; - const size_t marks_to_get_from_range = std::min(marks_in_range, need_marks); - - ranges_to_get_from_part.emplace_back(range.begin, range.begin + marks_to_get_from_range); - range.begin += marks_to_get_from_range; - marks_in_part -= marks_to_get_from_range; - need_marks -= marks_to_get_from_range; - if (range.begin == range.end) - part.ranges.pop_front(); - } - parts.emplace_back(part); - } - ranges_to_get_from_part = split_ranges(ranges_to_get_from_part, input_order_info->direction); - new_parts.emplace_back(part.data_part, part.part_index_in_query, std::move(ranges_to_get_from_part)); - } - - ReadFromMergeTree::Settings step_settings - { - .max_block_size = max_block_size, - .preferred_block_size_bytes = settings.preferred_block_size_bytes, - .preferred_max_column_in_block_size_bytes = settings.preferred_max_column_in_block_size_bytes, - .min_marks_for_concurrent_read = min_marks_for_concurrent_read, - .use_uncompressed_cache = use_uncompressed_cache, - .reader_settings = reader_settings, - .backoff_settings = MergeTreeReadPool::BackoffSettings(settings), - }; - - auto read_type = input_order_info->direction == 1 - ? ReadFromMergeTree::ReadType::InOrder - : ReadFromMergeTree::ReadType::InReverseOrder; - - auto plan = std::make_unique(); - auto step = std::make_unique( - data, - metadata_snapshot, - query_id, - column_names, - std::move(new_parts), - std::move(index_stats), - query_info.projection ? 
query_info.projection->prewhere_info : query_info.prewhere_info, - virt_columns, - step_settings, - num_streams, - read_type); - - plan->addStep(std::move(step)); - plans.emplace_back(std::move(plan)); - } - - if (need_preliminary_merge) - { - SortDescription sort_description; - for (size_t j = 0; j < input_order_info->order_key_prefix_descr.size(); ++j) - sort_description.emplace_back(metadata_snapshot->getSortingKey().column_names[j], - input_order_info->direction, 1); - - for (auto & plan : plans) - { - /// Drop temporary columns, added by 'sorting_key_prefix_expr' - out_projection = createProjection(plan->getCurrentDataStream().header); - - auto expression_step = std::make_unique( - plan->getCurrentDataStream(), - sorting_key_prefix_expr); - - expression_step->setStepDescription("Calculate sorting key prefix"); - plan->addStep(std::move(expression_step)); - - auto merging_sorted = std::make_unique( - plan->getCurrentDataStream(), - sort_description, - max_block_size); - - merging_sorted->setStepDescription("Merge sorting mark ranges"); - plan->addStep(std::move(merging_sorted)); - } - } - - if (plans.size() == 1) - return std::move(plans.front()); - - DataStreams input_streams; - for (const auto & plan : plans) - input_streams.emplace_back(plan->getCurrentDataStream()); - - auto union_step = std::make_unique(std::move(input_streams)); - - auto plan = std::make_unique(); - plan->unitePlans(std::move(union_step), std::move(plans)); - - return plan; -} - - -QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( - RangesInDataParts && parts, - ReadFromMergeTree::IndexStatPtr index_stats, - size_t num_streams, - const Names & column_names, - const StorageMetadataPtr & metadata_snapshot, - UInt64 max_block_size, - bool use_uncompressed_cache, - const SelectQueryInfo & query_info, - const Names & virt_columns, - const Settings & settings, - const MergeTreeReaderSettings & reader_settings, - ActionsDAGPtr & out_projection, - const String & query_id) const -{ - const auto data_settings = data.getSettings(); - size_t sum_marks = 0; - size_t adaptive_parts = 0; - for (const auto & part : parts) - { - for (const auto & range : part.ranges) - sum_marks += range.end - range.begin; - - if (part.data_part->index_granularity_info.is_adaptive) - ++adaptive_parts; - } - - size_t index_granularity_bytes = 0; - if (adaptive_parts >= parts.size() / 2) - index_granularity_bytes = data_settings->index_granularity_bytes; - - const size_t max_marks_to_use_cache = roundRowsOrBytesToMarks( - settings.merge_tree_max_rows_to_use_cache, - settings.merge_tree_max_bytes_to_use_cache, - data_settings->index_granularity, - index_granularity_bytes); - - if (sum_marks > max_marks_to_use_cache) - use_uncompressed_cache = false; - - if (num_streams > settings.max_final_threads) - num_streams = settings.max_final_threads; - - /// If setting do_not_merge_across_partitions_select_final is true than we won't merge parts from different partitions. - /// We have all parts in parts vector, where parts with same partition are nearby. - /// So we will store iterators pointed to the beginning of each partition range (and parts.end()), - /// then we will create a pipe for each partition that will run selecting processor and merging processor - /// for the parts with this partition. In the end we will unite all the pipes. 
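// Editorial aside: a sketch of the grouping described above. Parts arrive ordered so
// that parts of one partition are adjacent; collecting an iterator at every partition
// boundary (plus end()) yields one [begin, end) slice per partition. Simplified
// stand-in types; the real code stores RangesInDataParts iterators.
#include <algorithm>
#include <string>
#include <vector>

struct PartStub { std::string partition_id; };

std::vector<std::vector<PartStub>::iterator> collectPartitionBoundaries(std::vector<PartStub> & parts)
{
    std::vector<std::vector<PartStub>::iterator> boundaries;
    auto it = parts.begin();
    boundaries.push_back(it);
    while (it != parts.end())
    {
        /// Find the first part whose partition differs from the one `it` points at.
        it = std::find_if(it, parts.end(),
            [&it](const PartStub & part) { return it->partition_id != part.partition_id; });
        boundaries.push_back(it);
    }
    return boundaries;
}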
- std::vector parts_to_merge_ranges; - auto it = parts.begin(); - parts_to_merge_ranges.push_back(it); - - if (settings.do_not_merge_across_partitions_select_final) - { - while (it != parts.end()) - { - it = std::find_if( - it, parts.end(), [&it](auto & part) { return it->data_part->info.partition_id != part.data_part->info.partition_id; }); - parts_to_merge_ranges.push_back(it); - } - /// We divide threads for each partition equally. But we will create at least the number of partitions threads. - /// (So, the total number of threads could be more than initial num_streams. - num_streams /= (parts_to_merge_ranges.size() - 1); - } - else - { - /// If do_not_merge_across_partitions_select_final is false we just merge all the parts. - parts_to_merge_ranges.push_back(parts.end()); - } - - std::vector partition_plans; - - /// If do_not_merge_across_partitions_select_final is true and num_streams > 1 - /// we will store lonely parts with level > 0 to use parallel select on them. - std::vector lonely_parts; - size_t total_rows_in_lonely_parts = 0; - size_t sum_marks_in_lonely_parts = 0; - - for (size_t range_index = 0; range_index < parts_to_merge_ranges.size() - 1; ++range_index) - { - QueryPlanPtr plan; - - { - RangesInDataParts new_parts; - - /// If do_not_merge_across_partitions_select_final is true and there is only one part in partition - /// with level > 0 then we won't postprocess this part and if num_streams > 1 we - /// can use parallel select on such parts. We save such parts in one vector and then use - /// MergeTreeReadPool and MergeTreeThreadSelectBlockInputProcessor for parallel select. - if (num_streams > 1 && settings.do_not_merge_across_partitions_select_final && - std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 && - parts_to_merge_ranges[range_index]->data_part->info.level > 0) - { - total_rows_in_lonely_parts += parts_to_merge_ranges[range_index]->getRowsCount(); - sum_marks_in_lonely_parts += parts_to_merge_ranges[range_index]->getMarksCount(); - lonely_parts.push_back(std::move(*parts_to_merge_ranges[range_index])); - continue; - } - else - { - for (auto part_it = parts_to_merge_ranges[range_index]; part_it != parts_to_merge_ranges[range_index + 1]; ++part_it) - { - new_parts.emplace_back(part_it->data_part, part_it->part_index_in_query, part_it->ranges); - } - } - - if (new_parts.empty()) - continue; - - ReadFromMergeTree::Settings step_settings - { - .max_block_size = max_block_size, - .preferred_block_size_bytes = settings.preferred_block_size_bytes, - .preferred_max_column_in_block_size_bytes = settings.preferred_max_column_in_block_size_bytes, - .min_marks_for_concurrent_read = 0, /// this setting is not used for reading in order - .use_uncompressed_cache = use_uncompressed_cache, - .reader_settings = reader_settings, - .backoff_settings = MergeTreeReadPool::BackoffSettings(settings), - }; - - plan = std::make_unique(); - auto step = std::make_unique( - data, - metadata_snapshot, - query_id, - column_names, - std::move(new_parts), - std::move(index_stats), - query_info.projection ? 
query_info.projection->prewhere_info : query_info.prewhere_info, - virt_columns, - step_settings, - num_streams, - ReadFromMergeTree::ReadType::InOrder); - - plan->addStep(std::move(step)); - - /// Drop temporary columns, added by 'sorting_key_expr' - if (!out_projection) - out_projection = createProjection(plan->getCurrentDataStream().header); - } - - auto expression_step = std::make_unique( - plan->getCurrentDataStream(), - metadata_snapshot->getSortingKey().expression->getActionsDAG().clone()); - - expression_step->setStepDescription("Calculate sorting key expression"); - plan->addStep(std::move(expression_step)); - - /// If do_not_merge_across_partitions_select_final is true and there is only one part in partition - /// with level > 0 then we won't postprocess this part - if (settings.do_not_merge_across_partitions_select_final && - std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 && - parts_to_merge_ranges[range_index]->data_part->info.level > 0) - { - partition_plans.emplace_back(std::move(plan)); - continue; - } - - Names sort_columns = metadata_snapshot->getSortingKeyColumns(); - SortDescription sort_description; - size_t sort_columns_size = sort_columns.size(); - sort_description.reserve(sort_columns_size); - - Names partition_key_columns = metadata_snapshot->getPartitionKey().column_names; - - const auto & header = plan->getCurrentDataStream().header; - for (size_t i = 0; i < sort_columns_size; ++i) - sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1); - - auto final_step = std::make_unique( - plan->getCurrentDataStream(), - std::min(num_streams, settings.max_final_threads), - sort_description, - data.merging_params, - partition_key_columns, - max_block_size); - - final_step->setStepDescription("Merge rows for FINAL"); - plan->addStep(std::move(final_step)); - - partition_plans.emplace_back(std::move(plan)); - } - - if (!lonely_parts.empty()) - { - RangesInDataParts new_parts; - - size_t num_streams_for_lonely_parts = num_streams * lonely_parts.size(); - - const size_t min_marks_for_concurrent_read = minMarksForConcurrentRead( - settings.merge_tree_min_rows_for_concurrent_read, - settings.merge_tree_min_bytes_for_concurrent_read, - data_settings->index_granularity, - index_granularity_bytes, - sum_marks_in_lonely_parts); - - /// Reduce the number of num_streams_for_lonely_parts if the data is small. - if (sum_marks_in_lonely_parts < num_streams_for_lonely_parts * min_marks_for_concurrent_read && lonely_parts.size() < num_streams_for_lonely_parts) - num_streams_for_lonely_parts = std::max((sum_marks_in_lonely_parts + min_marks_for_concurrent_read - 1) / min_marks_for_concurrent_read, lonely_parts.size()); - - ReadFromMergeTree::Settings step_settings - { - .max_block_size = max_block_size, - .preferred_block_size_bytes = settings.preferred_block_size_bytes, - .preferred_max_column_in_block_size_bytes = settings.preferred_max_column_in_block_size_bytes, - .min_marks_for_concurrent_read = min_marks_for_concurrent_read, - .use_uncompressed_cache = use_uncompressed_cache, - .reader_settings = reader_settings, - .backoff_settings = MergeTreeReadPool::BackoffSettings(settings), - }; - - auto plan = std::make_unique(); - auto step = std::make_unique( - data, - metadata_snapshot, - query_id, - column_names, - std::move(lonely_parts), - std::move(index_stats), - query_info.projection ? 
query_info.projection->prewhere_info : query_info.prewhere_info, - virt_columns, - step_settings, - num_streams_for_lonely_parts, - ReadFromMergeTree::ReadType::Default); - - plan->addStep(std::move(step)); - - /// Drop temporary columns, added by 'sorting_key_expr' - if (!out_projection) - out_projection = createProjection(plan->getCurrentDataStream().header); - - auto expression_step = std::make_unique( - plan->getCurrentDataStream(), - metadata_snapshot->getSortingKey().expression->getActionsDAG().clone()); - - expression_step->setStepDescription("Calculate sorting key expression"); - plan->addStep(std::move(expression_step)); - - partition_plans.emplace_back(std::move(plan)); - } - - if (partition_plans.empty()) - return {}; - - if (partition_plans.size() == 1) - return std::move(partition_plans.front()); - - auto result_header = partition_plans.front()->getCurrentDataStream().header; - DataStreams input_streams; - for (const auto & partition_plan : partition_plans) - input_streams.push_back(partition_plan->getCurrentDataStream()); - - auto union_step = std::make_unique(std::move(input_streams), result_header); - union_step->setStepDescription("Unite sources after FINAL"); - QueryPlanPtr plan = std::make_unique(); - plan->unitePlans(std::move(union_step), std::move(partition_plans)); - return plan; -} +// QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams( +// RangesInDataParts && parts, +// // ReadFromMergeTree::IndexStatPtr index_stats, +// size_t num_streams, +// const Names & column_names, +// const StorageMetadataPtr & metadata_snapshot, +// UInt64 max_block_size, +// bool use_uncompressed_cache, +// const SelectQueryInfo & query_info, +// const Names & virt_columns, +// const Settings & settings, +// const MergeTreeReaderSettings & reader_settings, +// const String & query_id) const +// { +// /// Count marks for each part. 
+// std::vector sum_marks_in_parts(parts.size()); +// size_t sum_marks = 0; +// size_t total_rows = 0; + +// const auto data_settings = data.getSettings(); +// size_t adaptive_parts = 0; +// for (size_t i = 0; i < parts.size(); ++i) +// { +// total_rows += parts[i].getRowsCount(); +// sum_marks_in_parts[i] = parts[i].getMarksCount(); +// sum_marks += sum_marks_in_parts[i]; + +// if (parts[i].data_part->index_granularity_info.is_adaptive) +// ++adaptive_parts; +// } + +// size_t index_granularity_bytes = 0; +// if (adaptive_parts > parts.size() / 2) +// index_granularity_bytes = data_settings->index_granularity_bytes; + +// const size_t max_marks_to_use_cache = roundRowsOrBytesToMarks( +// settings.merge_tree_max_rows_to_use_cache, +// settings.merge_tree_max_bytes_to_use_cache, +// data_settings->index_granularity, +// index_granularity_bytes); + +// const size_t min_marks_for_concurrent_read = minMarksForConcurrentRead( +// settings.merge_tree_min_rows_for_concurrent_read, +// settings.merge_tree_min_bytes_for_concurrent_read, +// data_settings->index_granularity, +// index_granularity_bytes, +// sum_marks); + +// if (sum_marks > max_marks_to_use_cache) +// use_uncompressed_cache = false; + +// if (0 == sum_marks) +// return {}; + +// ReadFromMergeTree::Settings step_settings +// { +// .max_block_size = max_block_size, +// .preferred_block_size_bytes = settings.preferred_block_size_bytes, +// .preferred_max_column_in_block_size_bytes = settings.preferred_max_column_in_block_size_bytes, +// .min_marks_for_concurrent_read = min_marks_for_concurrent_read, +// .use_uncompressed_cache = use_uncompressed_cache, +// .reader_settings = reader_settings, +// .backoff_settings = MergeTreeReadPool::BackoffSettings(settings), +// }; + +// if (num_streams > 1) +// { +// /// Reduce the number of num_streams if the data is small. +// if (sum_marks < num_streams * min_marks_for_concurrent_read && parts.size() < num_streams) +// num_streams = std::max((sum_marks + min_marks_for_concurrent_read - 1) / min_marks_for_concurrent_read, parts.size()); +// } + +// auto plan = std::make_unique(); +// auto step = std::make_unique( +// data, +// metadata_snapshot, +// query_id, +// column_names, +// std::move(parts), +// // std::move(index_stats), +// query_info.projection ? 
query_info.projection->prewhere_info : query_info.prewhere_info, +// virt_columns, +// step_settings, +// num_streams, +// ReadFromMergeTree::ReadType::Default); + +// plan->addStep(std::move(step)); +// return plan; +// } + +// static ActionsDAGPtr createProjection(const Block & header) +// { +// auto projection = std::make_shared(header.getNamesAndTypesList()); +// projection->removeUnusedActions(header.getNames()); +// projection->projectInput(); +// return projection; +// } + +// QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder( +// RangesInDataParts && parts, +// // ReadFromMergeTree::IndexStatPtr index_stats, +// size_t num_streams, +// const Names & column_names, +// const StorageMetadataPtr & metadata_snapshot, +// UInt64 max_block_size, +// bool use_uncompressed_cache, +// const SelectQueryInfo & query_info, +// const ActionsDAGPtr & sorting_key_prefix_expr, +// const Names & virt_columns, +// const Settings & settings, +// const MergeTreeReaderSettings & reader_settings, +// ActionsDAGPtr & out_projection, +// const String & query_id, +// const InputOrderInfoPtr & input_order_info) const +// { +// size_t sum_marks = 0; +// size_t adaptive_parts = 0; +// std::vector sum_marks_in_parts(parts.size()); +// const auto data_settings = data.getSettings(); + +// for (size_t i = 0; i < parts.size(); ++i) +// { +// sum_marks_in_parts[i] = parts[i].getMarksCount(); +// sum_marks += sum_marks_in_parts[i]; + +// if (parts[i].data_part->index_granularity_info.is_adaptive) +// ++adaptive_parts; +// } + +// size_t index_granularity_bytes = 0; +// if (adaptive_parts > parts.size() / 2) +// index_granularity_bytes = data_settings->index_granularity_bytes; + +// const size_t max_marks_to_use_cache = roundRowsOrBytesToMarks( +// settings.merge_tree_max_rows_to_use_cache, +// settings.merge_tree_max_bytes_to_use_cache, +// data_settings->index_granularity, +// index_granularity_bytes); + +// const size_t min_marks_for_concurrent_read = minMarksForConcurrentRead( +// settings.merge_tree_min_rows_for_concurrent_read, +// settings.merge_tree_min_bytes_for_concurrent_read, +// data_settings->index_granularity, +// index_granularity_bytes, +// sum_marks); + +// if (sum_marks > max_marks_to_use_cache) +// use_uncompressed_cache = false; + +// Pipes res; + +// if (sum_marks == 0) +// return {}; + +// /// Let's split ranges to avoid reading much data. +// auto split_ranges = [rows_granularity = data_settings->index_granularity, max_block_size](const auto & ranges, int direction) +// { +// MarkRanges new_ranges; +// const size_t max_marks_in_range = (max_block_size + rows_granularity - 1) / rows_granularity; +// size_t marks_in_range = 1; + +// if (direction == 1) +// { +// /// Split first few ranges to avoid reading much data. +// bool split = false; +// for (auto range : ranges) +// { +// while (!split && range.begin + marks_in_range < range.end) +// { +// new_ranges.emplace_back(range.begin, range.begin + marks_in_range); +// range.begin += marks_in_range; +// marks_in_range *= 2; + +// if (marks_in_range > max_marks_in_range) +// split = true; +// } +// new_ranges.emplace_back(range.begin, range.end); +// } +// } +// else +// { +// /// Split all ranges to avoid reading much data, because we have to +// /// store whole range in memory to reverse it. 
+// for (auto it = ranges.rbegin(); it != ranges.rend(); ++it) +// { +// auto range = *it; +// while (range.begin + marks_in_range < range.end) +// { +// new_ranges.emplace_front(range.end - marks_in_range, range.end); +// range.end -= marks_in_range; +// marks_in_range = std::min(marks_in_range * 2, max_marks_in_range); +// } +// new_ranges.emplace_front(range.begin, range.end); +// } +// } + +// return new_ranges; +// }; + +// const size_t min_marks_per_stream = (sum_marks - 1) / num_streams + 1; +// bool need_preliminary_merge = (parts.size() > settings.read_in_order_two_level_merge_threshold); + +// std::vector plans; + +// for (size_t i = 0; i < num_streams && !parts.empty(); ++i) +// { +// size_t need_marks = min_marks_per_stream; +// RangesInDataParts new_parts; + +// /// Loop over parts. +// /// We will iteratively take part or some subrange of a part from the back +// /// and assign a stream to read from it. +// while (need_marks > 0 && !parts.empty()) +// { +// RangesInDataPart part = parts.back(); +// parts.pop_back(); + +// size_t & marks_in_part = sum_marks_in_parts.back(); + +// /// We will not take too few rows from a part. +// if (marks_in_part >= min_marks_for_concurrent_read && +// need_marks < min_marks_for_concurrent_read) +// need_marks = min_marks_for_concurrent_read; + +// /// Do not leave too few rows in the part. +// if (marks_in_part > need_marks && +// marks_in_part - need_marks < min_marks_for_concurrent_read) +// need_marks = marks_in_part; + +// MarkRanges ranges_to_get_from_part; + +// /// We take the whole part if it is small enough. +// if (marks_in_part <= need_marks) +// { +// ranges_to_get_from_part = part.ranges; + +// need_marks -= marks_in_part; +// sum_marks_in_parts.pop_back(); +// } +// else +// { +// /// Loop through ranges in part. Take enough ranges to cover "need_marks". +// while (need_marks > 0) +// { +// if (part.ranges.empty()) +// throw Exception("Unexpected end of ranges while spreading marks among streams", ErrorCodes::LOGICAL_ERROR); + +// MarkRange & range = part.ranges.front(); + +// const size_t marks_in_range = range.end - range.begin; +// const size_t marks_to_get_from_range = std::min(marks_in_range, need_marks); + +// ranges_to_get_from_part.emplace_back(range.begin, range.begin + marks_to_get_from_range); +// range.begin += marks_to_get_from_range; +// marks_in_part -= marks_to_get_from_range; +// need_marks -= marks_to_get_from_range; +// if (range.begin == range.end) +// part.ranges.pop_front(); +// } +// parts.emplace_back(part); +// } +// ranges_to_get_from_part = split_ranges(ranges_to_get_from_part, input_order_info->direction); +// new_parts.emplace_back(part.data_part, part.part_index_in_query, std::move(ranges_to_get_from_part)); +// } + +// ReadFromMergeTree::Settings step_settings +// { +// .max_block_size = max_block_size, +// .preferred_block_size_bytes = settings.preferred_block_size_bytes, +// .preferred_max_column_in_block_size_bytes = settings.preferred_max_column_in_block_size_bytes, +// .min_marks_for_concurrent_read = min_marks_for_concurrent_read, +// .use_uncompressed_cache = use_uncompressed_cache, +// .reader_settings = reader_settings, +// .backoff_settings = MergeTreeReadPool::BackoffSettings(settings), +// }; + +// auto read_type = input_order_info->direction == 1 +// ? 
ReadFromMergeTree::ReadType::InOrder +// : ReadFromMergeTree::ReadType::InReverseOrder; + +// auto plan = std::make_unique(); +// auto step = std::make_unique( +// data, +// metadata_snapshot, +// query_id, +// column_names, +// std::move(new_parts), +// // std::move(index_stats), +// query_info.projection ? query_info.projection->prewhere_info : query_info.prewhere_info, +// virt_columns, +// step_settings, +// num_streams, +// read_type); + +// plan->addStep(std::move(step)); +// plans.emplace_back(std::move(plan)); +// } + +// if (need_preliminary_merge) +// { +// SortDescription sort_description; +// for (size_t j = 0; j < input_order_info->order_key_prefix_descr.size(); ++j) +// sort_description.emplace_back(metadata_snapshot->getSortingKey().column_names[j], +// input_order_info->direction, 1); + +// for (auto & plan : plans) +// { +// /// Drop temporary columns, added by 'sorting_key_prefix_expr' +// out_projection = createProjection(plan->getCurrentDataStream().header); + +// auto expression_step = std::make_unique( +// plan->getCurrentDataStream(), +// sorting_key_prefix_expr); + +// expression_step->setStepDescription("Calculate sorting key prefix"); +// plan->addStep(std::move(expression_step)); + +// auto merging_sorted = std::make_unique( +// plan->getCurrentDataStream(), +// sort_description, +// max_block_size); + +// merging_sorted->setStepDescription("Merge sorting mark ranges"); +// plan->addStep(std::move(merging_sorted)); +// } +// } + +// if (plans.size() == 1) +// return std::move(plans.front()); + +// DataStreams input_streams; +// for (const auto & plan : plans) +// input_streams.emplace_back(plan->getCurrentDataStream()); + +// auto union_step = std::make_unique(std::move(input_streams)); + +// auto plan = std::make_unique(); +// plan->unitePlans(std::move(union_step), std::move(plans)); + +// return plan; +// } + + +// QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( +// RangesInDataParts && parts, +// size_t num_streams, +// const Names & column_names, +// const StorageMetadataPtr & metadata_snapshot, +// UInt64 max_block_size, +// bool use_uncompressed_cache, +// const SelectQueryInfo & query_info, +// const Names & virt_columns, +// const Settings & settings, +// const MergeTreeReaderSettings & reader_settings, +// ActionsDAGPtr & out_projection, +// const String & query_id) const +// { +// const auto data_settings = data.getSettings(); +// size_t sum_marks = 0; +// size_t adaptive_parts = 0; +// for (const auto & part : parts) +// { +// for (const auto & range : part.ranges) +// sum_marks += range.end - range.begin; + +// if (part.data_part->index_granularity_info.is_adaptive) +// ++adaptive_parts; +// } + +// size_t index_granularity_bytes = 0; +// if (adaptive_parts >= parts.size() / 2) +// index_granularity_bytes = data_settings->index_granularity_bytes; + +// const size_t max_marks_to_use_cache = roundRowsOrBytesToMarks( +// settings.merge_tree_max_rows_to_use_cache, +// settings.merge_tree_max_bytes_to_use_cache, +// data_settings->index_granularity, +// index_granularity_bytes); + +// if (sum_marks > max_marks_to_use_cache) +// use_uncompressed_cache = false; + +// if (num_streams > settings.max_final_threads) +// num_streams = settings.max_final_threads; + +// /// If setting do_not_merge_across_partitions_select_final is true than we won't merge parts from different partitions. +// /// We have all parts in parts vector, where parts with same partition are nearby. 
+// /// So we will store iterators pointed to the beginning of each partition range (and parts.end()), +// /// then we will create a pipe for each partition that will run selecting processor and merging processor +// /// for the parts with this partition. In the end we will unite all the pipes. +// std::vector parts_to_merge_ranges; +// auto it = parts.begin(); +// parts_to_merge_ranges.push_back(it); + +// if (settings.do_not_merge_across_partitions_select_final) +// { +// while (it != parts.end()) +// { +// it = std::find_if( +// it, parts.end(), [&it](auto & part) { return it->data_part->info.partition_id != part.data_part->info.partition_id; }); +// parts_to_merge_ranges.push_back(it); +// } +// /// We divide threads for each partition equally. But we will create at least the number of partitions threads. +// /// (So, the total number of threads could be more than initial num_streams. +// num_streams /= (parts_to_merge_ranges.size() - 1); +// } +// else +// { +// /// If do_not_merge_across_partitions_select_final is false we just merge all the parts. +// parts_to_merge_ranges.push_back(parts.end()); +// } + +// std::vector partition_plans; + +// /// If do_not_merge_across_partitions_select_final is true and num_streams > 1 +// /// we will store lonely parts with level > 0 to use parallel select on them. +// std::vector lonely_parts; +// size_t total_rows_in_lonely_parts = 0; +// size_t sum_marks_in_lonely_parts = 0; + +// for (size_t range_index = 0; range_index < parts_to_merge_ranges.size() - 1; ++range_index) +// { +// QueryPlanPtr plan; + +// { +// RangesInDataParts new_parts; + +// /// If do_not_merge_across_partitions_select_final is true and there is only one part in partition +// /// with level > 0 then we won't postprocess this part and if num_streams > 1 we +// /// can use parallel select on such parts. We save such parts in one vector and then use +// /// MergeTreeReadPool and MergeTreeThreadSelectBlockInputProcessor for parallel select. +// if (num_streams > 1 && settings.do_not_merge_across_partitions_select_final && +// std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 && +// parts_to_merge_ranges[range_index]->data_part->info.level > 0) +// { +// total_rows_in_lonely_parts += parts_to_merge_ranges[range_index]->getRowsCount(); +// sum_marks_in_lonely_parts += parts_to_merge_ranges[range_index]->getMarksCount(); +// lonely_parts.push_back(std::move(*parts_to_merge_ranges[range_index])); +// continue; +// } +// else +// { +// for (auto part_it = parts_to_merge_ranges[range_index]; part_it != parts_to_merge_ranges[range_index + 1]; ++part_it) +// { +// new_parts.emplace_back(part_it->data_part, part_it->part_index_in_query, part_it->ranges); +// } +// } + +// if (new_parts.empty()) +// continue; + +// ReadFromMergeTree::Settings step_settings +// { +// .max_block_size = max_block_size, +// .preferred_block_size_bytes = settings.preferred_block_size_bytes, +// .preferred_max_column_in_block_size_bytes = settings.preferred_max_column_in_block_size_bytes, +// .min_marks_for_concurrent_read = 0, /// this setting is not used for reading in order +// .use_uncompressed_cache = use_uncompressed_cache, +// .reader_settings = reader_settings, +// .backoff_settings = MergeTreeReadPool::BackoffSettings(settings), +// }; + +// plan = std::make_unique(); +// auto step = std::make_unique( +// data, +// metadata_snapshot, +// query_id, +// column_names, +// std::move(new_parts), +// // std::move(index_stats), +// query_info.projection ? 
query_info.projection->prewhere_info : query_info.prewhere_info, +// virt_columns, +// step_settings, +// num_streams, +// ReadFromMergeTree::ReadType::InOrder); + +// plan->addStep(std::move(step)); + +// /// Drop temporary columns, added by 'sorting_key_expr' +// if (!out_projection) +// out_projection = createProjection(plan->getCurrentDataStream().header); +// } + +// auto expression_step = std::make_unique( +// plan->getCurrentDataStream(), +// metadata_snapshot->getSortingKey().expression->getActionsDAG().clone()); + +// expression_step->setStepDescription("Calculate sorting key expression"); +// plan->addStep(std::move(expression_step)); + +// /// If do_not_merge_across_partitions_select_final is true and there is only one part in partition +// /// with level > 0 then we won't postprocess this part +// if (settings.do_not_merge_across_partitions_select_final && +// std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 && +// parts_to_merge_ranges[range_index]->data_part->info.level > 0) +// { +// partition_plans.emplace_back(std::move(plan)); +// continue; +// } + +// Names sort_columns = metadata_snapshot->getSortingKeyColumns(); +// SortDescription sort_description; +// size_t sort_columns_size = sort_columns.size(); +// sort_description.reserve(sort_columns_size); + +// Names partition_key_columns = metadata_snapshot->getPartitionKey().column_names; + +// const auto & header = plan->getCurrentDataStream().header; +// for (size_t i = 0; i < sort_columns_size; ++i) +// sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1); + +// auto final_step = std::make_unique( +// plan->getCurrentDataStream(), +// std::min(num_streams, settings.max_final_threads), +// sort_description, +// data.merging_params, +// partition_key_columns, +// max_block_size); + +// final_step->setStepDescription("Merge rows for FINAL"); +// plan->addStep(std::move(final_step)); + +// partition_plans.emplace_back(std::move(plan)); +// } + +// if (!lonely_parts.empty()) +// { +// RangesInDataParts new_parts; + +// size_t num_streams_for_lonely_parts = num_streams * lonely_parts.size(); + +// const size_t min_marks_for_concurrent_read = minMarksForConcurrentRead( +// settings.merge_tree_min_rows_for_concurrent_read, +// settings.merge_tree_min_bytes_for_concurrent_read, +// data_settings->index_granularity, +// index_granularity_bytes, +// sum_marks_in_lonely_parts); + +// /// Reduce the number of num_streams_for_lonely_parts if the data is small. 
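// Editorial aside: the commented-out branch below applies the same shrink heuristic
// as the non-FINAL path. Its threshold comes from minMarksForConcurrentRead(), the
// helper removed from this file earlier in the diff (and re-added in
// ReadFromMergeTree.cpp); condensed here for reference, overflow guards kept:
#include <algorithm>
#include <cstddef>

size_t minMarksForConcurrentRead(
    size_t rows_setting, size_t bytes_setting,
    size_t rows_granularity, size_t bytes_granularity, size_t max_marks)
{
    size_t marks = 1;
    if (rows_setting + rows_granularity <= rows_setting) /// overflow
        marks = max_marks;
    else if (rows_setting)
        marks = (rows_setting + rows_granularity - 1) / rows_granularity; /// ceil-divide

    if (bytes_granularity == 0)
        return marks;
    if (bytes_setting + bytes_granularity <= bytes_setting) /// overflow
        return max_marks;
    if (bytes_setting)
        return std::max(marks, (bytes_setting + bytes_granularity - 1) / bytes_granularity);
    return marks;
}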
+// if (sum_marks_in_lonely_parts < num_streams_for_lonely_parts * min_marks_for_concurrent_read && lonely_parts.size() < num_streams_for_lonely_parts) +// num_streams_for_lonely_parts = std::max((sum_marks_in_lonely_parts + min_marks_for_concurrent_read - 1) / min_marks_for_concurrent_read, lonely_parts.size()); + +// ReadFromMergeTree::Settings step_settings +// { +// .max_block_size = max_block_size, +// .preferred_block_size_bytes = settings.preferred_block_size_bytes, +// .preferred_max_column_in_block_size_bytes = settings.preferred_max_column_in_block_size_bytes, +// .min_marks_for_concurrent_read = min_marks_for_concurrent_read, +// .use_uncompressed_cache = use_uncompressed_cache, +// .reader_settings = reader_settings, +// .backoff_settings = MergeTreeReadPool::BackoffSettings(settings), +// }; + +// auto plan = std::make_unique(); +// auto step = std::make_unique( +// data, +// metadata_snapshot, +// query_id, +// column_names, +// std::move(lonely_parts), +// // std::move(index_stats), +// query_info.projection ? query_info.projection->prewhere_info : query_info.prewhere_info, +// virt_columns, +// step_settings, +// num_streams_for_lonely_parts, +// ReadFromMergeTree::ReadType::Default); + +// plan->addStep(std::move(step)); + +// /// Drop temporary columns, added by 'sorting_key_expr' +// if (!out_projection) +// out_projection = createProjection(plan->getCurrentDataStream().header); + +// auto expression_step = std::make_unique( +// plan->getCurrentDataStream(), +// metadata_snapshot->getSortingKey().expression->getActionsDAG().clone()); + +// expression_step->setStepDescription("Calculate sorting key expression"); +// plan->addStep(std::move(expression_step)); + +// partition_plans.emplace_back(std::move(plan)); +// } + +// if (partition_plans.empty()) +// return {}; + +// if (partition_plans.size() == 1) +// return std::move(partition_plans.front()); + +// auto result_header = partition_plans.front()->getCurrentDataStream().header; +// DataStreams input_streams; +// for (const auto & partition_plan : partition_plans) +// input_streams.push_back(partition_plan->getCurrentDataStream()); + +// auto union_step = std::make_unique(std::move(input_streams), result_header); +// union_step->setStepDescription("Unite sources after FINAL"); +// QueryPlanPtr plan = std::make_unique(); +// plan->unitePlans(std::move(union_step), std::move(partition_plans)); +// return plan; +// } /// Calculates a set of mark ranges, that could possibly contain keys, required by condition. /// In other words, it removes subranges from whole range, that definitely could not contain required keys. 
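// Editorial aside: a deliberately simplified sketch of the idea stated in the two
// comment lines above. Given a predicate that can prove "this mark range cannot
// contain matching keys", candidate ranges are split and the provably empty halves
// dropped; the real routine drives this with KeyCondition over the primary key
// index and keeps adjacent survivors together. All names here are hypothetical.
#include <cstddef>
#include <functional>
#include <vector>

struct MarkRange { size_t begin; size_t end; };

std::vector<MarkRange> filterMarkRanges(
    MarkRange whole, const std::function<bool(const MarkRange &)> & may_contain_keys)
{
    std::vector<MarkRange> result;
    std::vector<MarkRange> stack{whole}; /// assumes whole.begin < whole.end
    while (!stack.empty())
    {
        MarkRange range = stack.back();
        stack.pop_back();
        if (!may_contain_keys(range))
            continue; /// definitely no required keys: drop the whole subrange
        if (range.end - range.begin == 1)
        {
            result.push_back(range); /// single mark that may match: keep it
            continue;
        }
        size_t middle = range.begin + (range.end - range.begin) / 2;
        stack.push_back({middle, range.end});   /// right half; popped second
        stack.push_back({range.begin, middle}); /// left half; popped first, keeps output ordered
    }
    return result;
}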
diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h
index 7597af2e173..3e8076de8d3 100644
--- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h
+++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h
@@ -17,19 +17,22 @@ struct MergeTreeDataSelectSamplingData
 {
     bool use_sampling = false;
     bool read_nothing = false;
+    Float64 used_sample_factor = 1.0;
     std::shared_ptr<ASTFunction> filter_function;
     ActionsDAGPtr filter_expression;
 };

-struct MergeTreeDataSelectCache
-{
-    RangesInDataParts parts_with_ranges;
-    MergeTreeDataSelectSamplingData sampling;
-    std::unique_ptr<ReadFromMergeTree::IndexStats> index_stats;
-    size_t sum_marks = 0;
-    size_t sum_ranges = 0;
-    bool use_cache = false;
-};
+// struct MergeTreeDataSelectCache
+// {
+//     RangesInDataParts parts_with_ranges;
+//     MergeTreeDataSelectSamplingData sampling;
+//     std::unique_ptr<ReadFromMergeTree::IndexStats> index_stats;
+//     size_t sum_marks = 0;
+//     size_t sum_ranges = 0;
+//     bool use_cache = false;
+// };
+
+using PartitionIdToMaxBlock = std::unordered_map<String, Int64>;

 /** Executes SELECT queries on data from the merge tree.
   */
@@ -41,7 +44,6 @@ public:
     /** When reading, selects a set of parts that covers the desired range of the index.
      * max_blocks_number_to_read - if not nullptr, do not read all the parts whose right border is greater than max_block in partition.
      */
-    using PartitionIdToMaxBlock = std::unordered_map<String, Int64>;

     QueryPlanPtr read(
         const Names & column_names,
@@ -53,6 +55,16 @@ public:
         QueryProcessingStage::Enum processed_stage,
         const PartitionIdToMaxBlock * max_block_numbers_to_read = nullptr) const;

+    size_t estimateNumMarksToRead(
+        MergeTreeData::DataPartsVector parts,
+        const Names & column_names,
+        const StorageMetadataPtr & metadata_snapshot_base,
+        const StorageMetadataPtr & metadata_snapshot,
+        const SelectQueryInfo & query_info,
+        ContextPtr context,
+        unsigned num_streams,
+        const PartitionIdToMaxBlock * max_block_numbers_to_read = nullptr) const;
+
     QueryPlanPtr readFromParts(
         MergeTreeData::DataPartsVector parts,
         const Names & column_names,
@@ -62,57 +74,56 @@ public:
         ContextPtr context,
         UInt64 max_block_size,
         unsigned num_streams,
-        const PartitionIdToMaxBlock * max_block_numbers_to_read = nullptr,
-        MergeTreeDataSelectCache * cache = nullptr) const;
+        const PartitionIdToMaxBlock * max_block_numbers_to_read = nullptr) const;

 private:
     const MergeTreeData & data;
     Poco::Logger * log;

-    QueryPlanPtr spreadMarkRangesAmongStreams(
-        RangesInDataParts && parts,
-        size_t num_streams,
-        const Names & column_names,
-        const StorageMetadataPtr & metadata_snapshot,
-        UInt64 max_block_size,
-        bool use_uncompressed_cache,
-        const SelectQueryInfo & query_info,
-        const Names & virt_columns,
-        const Settings & settings,
-        const MergeTreeReaderSettings & reader_settings,
-        const String & query_id) const;
+    // QueryPlanPtr spreadMarkRangesAmongStreams(
+    //     RangesInDataParts && parts,
+    //     size_t num_streams,
+    //     const Names & column_names,
+    //     const StorageMetadataPtr & metadata_snapshot,
+    //     UInt64 max_block_size,
+    //     bool use_uncompressed_cache,
+    //     const SelectQueryInfo & query_info,
+    //     const Names & virt_columns,
+    //     const Settings & settings,
+    //     const MergeTreeReaderSettings & reader_settings,
+    //     const String & query_id) const;

-    /// out_projection - save projection only with columns, requested to read
-    QueryPlanPtr spreadMarkRangesAmongStreamsWithOrder(
-        RangesInDataParts && parts,
-        size_t num_streams,
-        const Names & column_names,
-        const StorageMetadataPtr & metadata_snapshot,
-        UInt64 max_block_size,
-        bool use_uncompressed_cache,
-        const SelectQueryInfo & query_info,
-        const ActionsDAGPtr & sorting_key_prefix_expr,
-        const Names & virt_columns,
-        const Settings & settings,
-        const MergeTreeReaderSettings & reader_settings,
-        ActionsDAGPtr & out_projection,
-        const String & query_id,
-        const InputOrderInfoPtr & input_order_info) const;
+    // /// out_projection - save projection only with columns, requested to read
+    // QueryPlanPtr spreadMarkRangesAmongStreamsWithOrder(
+    //     RangesInDataParts && parts,
+    //     size_t num_streams,
+    //     const Names & column_names,
+    //     const StorageMetadataPtr & metadata_snapshot,
+    //     UInt64 max_block_size,
+    //     bool use_uncompressed_cache,
+    //     const SelectQueryInfo & query_info,
+    //     const ActionsDAGPtr & sorting_key_prefix_expr,
+    //     const Names & virt_columns,
+    //     const Settings & settings,
+    //     const MergeTreeReaderSettings & reader_settings,
+    //     ActionsDAGPtr & out_projection,
+    //     const String & query_id,
+    //     const InputOrderInfoPtr & input_order_info) const;

-    QueryPlanPtr spreadMarkRangesAmongStreamsFinal(
-        RangesInDataParts && parts,
-        size_t num_streams,
-        const Names & column_names,
-        const StorageMetadataPtr & metadata_snapshot,
-        UInt64 max_block_size,
-        bool use_uncompressed_cache,
-        const SelectQueryInfo & query_info,
-        const Names & virt_columns,
-        const Settings & settings,
-        const MergeTreeReaderSettings & reader_settings,
-        ActionsDAGPtr & out_projection,
-        const String & query_id) const;
+    // QueryPlanPtr spreadMarkRangesAmongStreamsFinal(
+    //     RangesInDataParts && parts,
+    //     size_t num_streams,
+    //     const Names & column_names,
+    //     const StorageMetadataPtr & metadata_snapshot,
+    //     UInt64 max_block_size,
+    //     bool use_uncompressed_cache,
+    //     const SelectQueryInfo & query_info,
+    //     const Names & virt_columns,
+    //     const Settings & settings,
+    //     const MergeTreeReaderSettings & reader_settings,
+    //     ActionsDAGPtr & out_projection,
+    //     const String & query_id) const;

     /// Get the approximate value (bottom estimate - only by full marks) of the number of rows falling under the index.
static size_t getApproximateTotalRowsToRead( @@ -140,7 +151,6 @@ private: size_t & granules_dropped, Poco::Logger * log); -public: struct PartFilterCounters { size_t num_initial_selected_parts = 0; @@ -175,16 +185,36 @@ public: PartFilterCounters & counters, Poco::Logger * log); - static RangesInDataParts filterParts( +public: + static std::optional> filterPartsByVirtualColumns( + const MergeTreeData & data, MergeTreeData::DataPartsVector & parts, + const ASTPtr & query, + ContextPtr context); + + static void filterPartsByPartition( + const StorageMetadataPtr & metadata_snapshot, + const MergeTreeData & data, + const SelectQueryInfo & query_info, + ContextPtr & context, + ContextPtr & query_context, + MergeTreeData::DataPartsVector & parts, + const std::optional> & part_values, + const PartitionIdToMaxBlock * max_block_numbers_to_read, + Poco::Logger * log, + ReadFromMergeTree::IndexStats & index_stats); + + static RangesInDataParts filterPartsByPrimaryKeyAndSkipIndexes( + MergeTreeData::DataPartsVector && parts, StorageMetadataPtr metadata_snapshot, - SelectQueryInfo & query_info, + const SelectQueryInfo & query_info, ContextPtr & context, KeyCondition & key_condition, const MergeTreeReaderSettings & reader_settings, Poco::Logger * log, size_t num_streams, - ReadFromMergeTree::IndexStats & index_stats); + ReadFromMergeTree::IndexStats & index_stats, + bool use_skip_indexes); static MergeTreeDataSelectSamplingData getSampling( const ASTSelectQuery & select, @@ -197,7 +227,7 @@ public: NamesAndTypesList available_real_columns, ContextPtr context); - static String checkLimits(MergeTreeData & data, const RangesInDataParts & parts_with_ranges, ContextPtr & context); + static String checkLimits(const MergeTreeData & data, const RangesInDataParts & parts_with_ranges, ContextPtr & context); }; } diff --git a/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h b/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h index e8b39c8e28c..dff864bc58c 100644 --- a/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h +++ b/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h @@ -41,10 +41,7 @@ public: query_info, context, max_block_size, - num_streams, - nullptr, - query_info.projection ? query_info.projection->merge_tree_data_select_base_cache.get() - : query_info.merge_tree_data_select_cache.get())); + num_streams)); return query_plan.convertToPipe( QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context)); diff --git a/src/Storages/SelectQueryInfo.h b/src/Storages/SelectQueryInfo.h index afed41189c2..73cf3893a89 100644 --- a/src/Storages/SelectQueryInfo.h +++ b/src/Storages/SelectQueryInfo.h @@ -137,8 +137,8 @@ struct ProjectionCandidate ReadInOrderOptimizerPtr order_optimizer; InputOrderInfoPtr input_order_info; ManyExpressionActions group_by_elements_actions; - std::shared_ptr merge_tree_data_select_base_cache; - std::shared_ptr merge_tree_data_select_projection_cache; + // std::shared_ptr merge_tree_data_select_base_cache; + // std::shared_ptr merge_tree_data_select_projection_cache; }; /** Query along with some additional data,
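The estimateNumMarksToRead() entry point declared in the MergeTreeDataSelectExecutor.h hunk above exposes index analysis (the mark count a query would read) without building a query plan, which is what lets the projection caches above be dropped. A hedged usage sketch, assuming ClickHouse's headers: only the estimateNumMarksToRead() signature is taken from the diff, while the helper name, its parameters, and the base-vs-projection comparison are hypothetical illustration.

/// Hypothetical helper: true if the projection is estimated to read fewer
/// marks than the base table for the same query. estimateNumMarksToRead()
/// is called exactly as declared above; everything else is illustrative.
static bool projectionReadsFewerMarks(
    const MergeTreeDataSelectExecutor & reader,
    MergeTreeData::DataPartsVector base_parts,
    MergeTreeData::DataPartsVector projection_parts,
    const Names & base_columns,
    const Names & projection_columns,
    const StorageMetadataPtr & base_metadata,
    const StorageMetadataPtr & projection_metadata,
    const SelectQueryInfo & query_info,
    ContextPtr context,
    unsigned num_streams)
{
    /// Marks the query would read from the table itself.
    size_t base_marks = reader.estimateNumMarksToRead(
        std::move(base_parts), base_columns, base_metadata, base_metadata,
        query_info, context, num_streams);

    /// Marks the same query would read from the projection's parts.
    size_t projection_marks = reader.estimateNumMarksToRead(
        std::move(projection_parts), projection_columns, base_metadata,
        projection_metadata, query_info, context, num_streams);

    return projection_marks < base_marks;
}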