diff --git a/src/Interpreters/ClusterProxy/executeQuery.cpp b/src/Interpreters/ClusterProxy/executeQuery.cpp
index 71912fa1081..e12e531ab51 100644
--- a/src/Interpreters/ClusterProxy/executeQuery.cpp
+++ b/src/Interpreters/ClusterProxy/executeQuery.cpp
@@ -495,7 +495,7 @@ void executeQueryWithParallelReplicas(
             query_ast,
             new_cluster,
             storage_id,
-            std::move(coordinator),
+            coordinator,
             header,
             processed_stage,
             new_context,
diff --git a/src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp b/src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp
index 64646960824..2bfd8965269 100644
--- a/src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp
+++ b/src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp
@@ -1,12 +1,32 @@
 #include
 #include
+#include "Storages/MergeTree/RequestResponse.h"
 #include
 #include
 #include
 #include
 #include
 #include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+
+#include
+
+namespace ProfileEvents
+{
+    extern const Event SelectedParts;
+    extern const Event SelectedRanges;
+    extern const Event SelectedMarks;
+}

 namespace DB
 {

@@ -40,13 +60,179 @@ void addConvertingActions(QueryPlan & plan, const Block & header, bool has_missi

 }

+class ReadFromMergeTreeCoordinated : public ISourceStep
+{
+public:
+    ReadFromMergeTreeCoordinated(QueryPlanStepPtr read_from_merge_tree_, ParallelReplicasReadingCoordinatorPtr coordinator_)
+        : ISourceStep(read_from_merge_tree_->getOutputStream())
+        , read_from_merge_tree(std::move(read_from_merge_tree_))
+        , coordinator(std::move(coordinator_))
+    {
+    }
+
+    void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings) override;
+    String getName() const override { return "ReadFromLocalParallelReplica"; }
+
+private:
+    QueryPlanStepPtr read_from_merge_tree;
+    ParallelReplicasReadingCoordinatorPtr coordinator;
+};
+
+void ReadFromMergeTreeCoordinated::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & /*settings*/)
+{
+    ReadFromMergeTree & reading = *typeid_cast<ReadFromMergeTree *>(read_from_merge_tree.get());
+
+    auto result = reading.getAnalysisResult();
+    const auto & query_info = reading.getQueryInfo();
+    const auto & data = reading.data;
+    const auto & context = reading.getContext();
+    const auto & storage_snapshot = reading.getStorageSnapshot();
+
+    if (reading.enable_remove_parts_from_snapshot_optimization)
+    {
+        /// Do not keep data parts in snapshot.
+        /// They are stored separately, and some could be released after PK analysis.
+        reading.storage_snapshot->data = std::make_unique<MergeTreeData::SnapshotData>();
+    }
+
+    LOG_DEBUG(
+        reading.log,
+        "Selected {}/{} parts by partition key, {} parts by primary key, {}/{} marks by primary key, {} marks to read from {} ranges",
+        result.parts_before_pk,
+        result.total_parts,
+        result.selected_parts,
+        result.selected_marks_pk,
+        result.total_marks_pk,
+        result.selected_marks,
+        result.selected_ranges);
+
+    // Adding partition info to QueryAccessInfo.
+    if (context->hasQueryContext() && !query_info.is_internal)
+    {
+        Names partition_names;
+        for (const auto & part : result.parts_with_ranges)
+        {
+            partition_names.emplace_back(
+                fmt::format("{}.{}", data.getStorageID().getFullNameNotQuoted(), part.data_part->info.partition_id));
+        }
+        context->getQueryContext()->addQueryAccessInfo(partition_names);
+
+        if (storage_snapshot->projection)
+            context->getQueryContext()->addQueryAccessInfo(
+                Context::QualifiedProjectionName{.storage_id = data.getStorageID(), .projection_name = storage_snapshot->projection->name});
+    }
+
+    ProfileEvents::increment(ProfileEvents::SelectedParts, result.selected_parts);
+    ProfileEvents::increment(ProfileEvents::SelectedRanges, result.selected_ranges);
+    ProfileEvents::increment(ProfileEvents::SelectedMarks, result.selected_marks);
+
+    auto query_id_holder = MergeTreeDataSelectExecutor::checkLimits(data, result, context);
+
+    // TODO: check this on plan level, we should be here if there is nothing to read
+    if (result.parts_with_ranges.empty())
+    {
+        pipeline.init(Pipe(std::make_shared<NullSource>(getOutputStream().header)));
+        return;
+    }
+
+    /// Projection, that needed to drop columns, which have appeared by execution
+    /// of some extra expressions, and to allow execute the same expressions later.
+    /// NOTE: It may lead to double computation of expressions.
+    ActionsDAGPtr result_projection;
+
+    Pipe pipe = reading.spreadMarkRanges(std::move(result.parts_with_ranges), reading.requested_num_streams, result, result_projection);
+
+    for (const auto & processor : pipe.getProcessors())
+        processor->setStorageLimits(query_info.storage_limits);
+
+    if (pipe.empty())
+    {
+        pipeline.init(Pipe(std::make_shared<NullSource>(getOutputStream().header)));
+        return;
+    }
+
+    if (result.sampling.use_sampling)
+    {
+        auto sampling_actions = std::make_shared<ExpressionActions>(result.sampling.filter_expression);
+        pipe.addSimpleTransform([&](const Block & header)
+        {
+            return std::make_shared<FilterTransform>(
+                header,
+                sampling_actions,
+                result.sampling.filter_function->getColumnName(),
+                false);
+        });
+    }
+
+    Block cur_header = pipe.getHeader();
+
+    auto append_actions = [&result_projection](ActionsDAGPtr actions)
+    {
+        if (!result_projection)
+            result_projection = std::move(actions);
+        else
+            result_projection = ActionsDAG::merge(std::move(*result_projection), std::move(*actions));
+    };
+
+    if (result_projection)
+        cur_header = result_projection->updateHeader(cur_header);
+
+    /// Extra columns may be returned (for example, if sampling is used).
+    /// Convert pipe to step header structure.
+    if (!isCompatibleHeader(cur_header, getOutputStream().header))
+    {
+        auto converting = ActionsDAG::makeConvertingActions(
+            cur_header.getColumnsWithTypeAndName(),
+            getOutputStream().header.getColumnsWithTypeAndName(),
+            ActionsDAG::MatchColumnsMode::Name);
+
+        append_actions(std::move(converting));
+    }
+
+    if (result_projection)
+    {
+        auto projection_actions = std::make_shared<ExpressionActions>(result_projection);
+        pipe.addSimpleTransform([&](const Block & header)
+        {
+            return std::make_shared<ExpressionTransform>(header, projection_actions);
+        });
+    }
+
+    /// Some extra columns could be added by sample/final/in-order/etc
+    /// Remove them from header if not needed.
+    if (!blocksHaveEqualStructure(pipe.getHeader(), getOutputStream().header))
+    {
+        auto convert_actions_dag = ActionsDAG::makeConvertingActions(
+            pipe.getHeader().getColumnsWithTypeAndName(),
+            getOutputStream().header.getColumnsWithTypeAndName(),
+            ActionsDAG::MatchColumnsMode::Name,
+            true);
+
+        auto converting_dag_expr = std::make_shared<ExpressionActions>(convert_actions_dag);
+
+        pipe.addSimpleTransform([&](const Block & header)
+        {
+            return std::make_shared<ExpressionTransform>(header, converting_dag_expr);
+        });
+    }
+
+    for (const auto & processor : pipe.getProcessors())
+        processors.emplace_back(processor);
+
+    pipeline.init(std::move(pipe));
+    pipeline.addContext(context);
+    // Attach QueryIdHolder if needed
+    if (query_id_holder)
+        pipeline.setQueryIdHolder(std::move(query_id_holder));
+}
+
 std::unique_ptr<QueryPlan> createLocalPlanForParallelReplicas(
     const ASTPtr & query_ast,
     const Block & header,
     ContextPtr context,
     QueryProcessingStage::Enum processed_stage,
-    ParallelReplicasReadingCoordinatorPtr /*coordinator*/,
-    QueryPlanStepPtr /*read_from_merge_tree*/,
+    ParallelReplicasReadingCoordinatorPtr coordinator,
+    QueryPlanStepPtr read_from_merge_tree,
     bool has_missing_objects)
 {
     checkStackSize();
@@ -72,6 +258,33 @@ std::unique_ptr<QueryPlan> createLocalPlanForParallelReplicas(
     auto interpreter = InterpreterSelectQueryAnalyzer(query_ast, new_context, select_query_options);
     query_plan = std::make_unique<QueryPlan>(std::move(interpreter).extractQueryPlan());

+    QueryPlan::Node * node = query_plan->getRootNode();
+    ReadFromMergeTree * reading = nullptr;
+    while (node)
+    {
+        reading = typeid_cast<ReadFromMergeTree *>(node->step.get());
+        if (reading)
+            break;
+
+        if (!node->children.empty())
+            node = node->children.at(0);
+    }
+
+    chassert(reading);
+
+    MergeTreeAllRangesCallback all_ranges_cb = [coordinator](InitialAllRangesAnnouncement announcement)
+    {
+        chassert(coordinator);
+        coordinator->handleInitialAllRangesAnnouncement(std::move(announcement));
+    };
+
+    MergeTreeReadTaskCallback read_task_cb = [coordinator](ParallelReadRequest req) -> std::optional<ParallelReadResponse>
+    { return coordinator->handleRequest(std::move(req)); };
+
+    const auto * analyzed_merge_tree = typeid_cast<const ReadFromMergeTree *>(read_from_merge_tree.get());
+    auto read_from_merge_tree_parallel_replicas = reading->createLocalParallelReplicasReadingStep(analyzed_merge_tree, true, all_ranges_cb, read_task_cb);
+    node->step = std::move(read_from_merge_tree_parallel_replicas);
+
     addConvertingActions(*query_plan, header, has_missing_objects);
     return query_plan;
 }
diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp
index 6f0fa55c349..21303cf2af2 100644
--- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp
+++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp
@@ -272,7 +272,9 @@ ReadFromMergeTree::ReadFromMergeTree(
     std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read_,
     LoggerPtr log_,
     AnalysisResultPtr analyzed_result_ptr_,
-    bool enable_parallel_reading)
+    bool enable_parallel_reading_,
+    std::optional<MergeTreeAllRangesCallback> all_ranges_callback_,
+    std::optional<MergeTreeReadTaskCallback> read_task_callback_)
     : SourceStepWithFilter(DataStream{.header = MergeTreeSelectProcessor::transformHeader(
         storage_snapshot_->getSampleBlockForColumns(all_column_names_),
         query_info_.prewhere_info)}, all_column_names_, query_info_, storage_snapshot_, context_)
@@ -291,13 +293,20 @@ ReadFromMergeTree::ReadFromMergeTree(
     , max_block_numbers_to_read(std::move(max_block_numbers_to_read_))
     , log(std::move(log_))
     , analyzed_result_ptr(analyzed_result_ptr_)
-    , is_parallel_reading_from_replicas(enable_parallel_reading)
+    , is_parallel_reading_from_replicas(enable_parallel_reading_)
     , enable_remove_parts_from_snapshot_optimization(query_info_.merge_tree_enable_remove_parts_from_snapshot_optimization)
 {
     if (is_parallel_reading_from_replicas)
     {
-        all_ranges_callback = context->getMergeTreeAllRangesCallback();
-        read_task_callback = context->getMergeTreeReadTaskCallback();
+        if (all_ranges_callback_)
+            all_ranges_callback = all_ranges_callback_.value();
+        else
+            all_ranges_callback = context->getMergeTreeAllRangesCallback();
+
+        if (read_task_callback_)
+            read_task_callback = read_task_callback_.value();
+        else
+            read_task_callback = context->getMergeTreeReadTaskCallback();
     }

     const auto & settings = context->getSettingsRef();
@@ -331,11 +340,31 @@ ReadFromMergeTree::ReadFromMergeTree(
         enable_vertical_final);
 }

+std::unique_ptr<ReadFromMergeTree> ReadFromMergeTree::createLocalParallelReplicasReadingStep(
+    const ReadFromMergeTree * analyzed_merge_tree,
+    bool enable_parallel_reading_,
+    std::optional<MergeTreeAllRangesCallback> all_ranges_callback_,
+    std::optional<MergeTreeReadTaskCallback> read_task_callback_)
+{
+    return std::make_unique<ReadFromMergeTree>(
+        prepared_parts,
+        alter_conversions_for_parts,
+        all_column_names,
+        data,
+        getQueryInfo(),
+        getStorageSnapshot(),
+        getContext(),
+        block_size.max_block_size_rows,
+        requested_num_streams,
+        max_block_numbers_to_read,
+        log,
+        analyzed_merge_tree->analyzed_result_ptr,
+        enable_parallel_reading_,
+        all_ranges_callback_,
+        read_task_callback_);
+}

-Pipe ReadFromMergeTree::readFromPoolParallelReplicas(
-    RangesInDataParts parts_with_range,
-    Names required_columns,
-    PoolSettings pool_settings)
+Pipe ReadFromMergeTree::readFromPoolParallelReplicas(RangesInDataParts parts_with_range, Names required_columns, PoolSettings pool_settings)
 {
     const auto & client_info = context->getClientInfo();

diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.h b/src/Processors/QueryPlan/ReadFromMergeTree.h
index 5d7879e8dee..bba5293e863 100644
--- a/src/Processors/QueryPlan/ReadFromMergeTree.h
+++ b/src/Processors/QueryPlan/ReadFromMergeTree.h
@@ -119,7 +119,15 @@ public:
         std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read_,
         LoggerPtr log_,
         AnalysisResultPtr analyzed_result_ptr_,
-        bool enable_parallel_reading);
+        bool enable_parallel_reading_,
+        std::optional<MergeTreeAllRangesCallback> all_ranges_callback_ = std::nullopt,
+        std::optional<MergeTreeReadTaskCallback> read_task_callback_ = std::nullopt);
+
+    std::unique_ptr<ReadFromMergeTree> createLocalParallelReplicasReadingStep(
+        const ReadFromMergeTree * analyzed_merge_tree,
+        bool enable_parallel_reading_,
+        std::optional<MergeTreeAllRangesCallback> all_ranges_callback_,
+        std::optional<MergeTreeReadTaskCallback> read_task_callback_);

     static constexpr auto name = "ReadFromMergeTree";
     String getName() const override { return name; }
@@ -282,6 +290,8 @@ private:
     std::optional<MergeTreeReadTaskCallback> read_task_callback;
     bool enable_vertical_final = false;
     bool enable_remove_parts_from_snapshot_optimization = true;
+
+    friend class ReadFromMergeTreeCoordinated;
 };

 }
diff --git a/src/Storages/MergeTree/RequestResponse.h b/src/Storages/MergeTree/RequestResponse.h
index 3a5bfde6c20..5f5516a6804 100644
--- a/src/Storages/MergeTree/RequestResponse.h
+++ b/src/Storages/MergeTree/RequestResponse.h
@@ -1,6 +1,5 @@
 #pragma once

-#include
 #include
 #include