diff --git a/src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp b/src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp index 1c5e0861530..48184cb2b5f 100644 --- a/src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp +++ b/src/Processors/QueryPlan/ParallelReplicasLocalPlan.cpp @@ -273,37 +273,37 @@ std::unique_ptr createLocalPlanForParallelReplicas( } chassert(reading); - const auto * analyzed_merge_tree = typeid_cast(read_from_merge_tree.get()); - if (!analyzed_merge_tree->hasAnalyzedResult()) - analyzed_merge_tree->selectRangesToRead(); + chassert(analyzed_merge_tree->hasAnalyzedResult()); + CoordinationMode mode = CoordinationMode::Default; switch (analyzed_merge_tree->getReadType()) { - case ReadFromMergeTree::ReadType::Default: - coordinator->initialize(CoordinationMode::Default); - break; - case ReadFromMergeTree::ReadType::InOrder: - coordinator->initialize(CoordinationMode::WithOrder); - break; - case ReadFromMergeTree::ReadType::InReverseOrder: - coordinator->initialize(CoordinationMode::ReverseOrder); - break; - case ReadFromMergeTree::ReadType::ParallelReplicas: - chassert(false); - UNREACHABLE(); + case ReadFromMergeTree::ReadType::Default: + mode = CoordinationMode::Default; + break; + case ReadFromMergeTree::ReadType::InOrder: + mode = CoordinationMode::WithOrder; + break; + case ReadFromMergeTree::ReadType::InReverseOrder: + mode = CoordinationMode::ReverseOrder; + break; + case ReadFromMergeTree::ReadType::ParallelReplicas: + chassert(false); + UNREACHABLE(); } - MergeTreeAllRangesCallback all_ranges_cb = [coordinator](InitialAllRangesAnnouncement announcement) - { - chassert(coordinator); - coordinator->handleInitialAllRangesAnnouncement(std::move(announcement)); - }; + const auto number_of_local_replica = new_context->getSettingsRef().max_parallel_replicas - 1; + coordinator->handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement( + mode, analyzed_merge_tree->getAnalysisResult().parts_with_ranges.getDescriptions(), number_of_local_replica)); + + MergeTreeAllRangesCallback all_ranges_cb = [coordinator](InitialAllRangesAnnouncement) {}; MergeTreeReadTaskCallback read_task_cb = [coordinator](ParallelReadRequest req) -> std::optional { return coordinator->handleRequest(std::move(req)); }; - auto read_from_merge_tree_parallel_replicas = reading->createLocalParallelReplicasReadingStep(analyzed_merge_tree, true, all_ranges_cb, read_task_cb); + auto read_from_merge_tree_parallel_replicas + = reading->createLocalParallelReplicasReadingStep(analyzed_merge_tree, true, all_ranges_cb, read_task_cb, number_of_local_replica); node->step = std::move(read_from_merge_tree_parallel_replicas); addConvertingActions(*query_plan, header, has_missing_objects); diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index 198236c4f49..e3e09673431 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -273,8 +273,9 @@ ReadFromMergeTree::ReadFromMergeTree( LoggerPtr log_, AnalysisResultPtr analyzed_result_ptr_, bool enable_parallel_reading_, - std::optional all_ranges_callback_, - std::optional read_task_callback_) + std::optional all_ranges_callback_, + std::optional read_task_callback_, + std::optional number_of_current_replica_) : SourceStepWithFilter(DataStream{.header = MergeTreeSelectProcessor::transformHeader( storage_snapshot_->getSampleBlockForColumns(all_column_names_), query_info_.prewhere_info)}, all_column_names_, query_info_, storage_snapshot_, context_) @@ -295,18 +296,12 @@ ReadFromMergeTree::ReadFromMergeTree( , analyzed_result_ptr(analyzed_result_ptr_) , is_parallel_reading_from_replicas(enable_parallel_reading_) , enable_remove_parts_from_snapshot_optimization(query_info_.merge_tree_enable_remove_parts_from_snapshot_optimization) + , number_of_current_replica(number_of_current_replica_) { if (is_parallel_reading_from_replicas) { - if (all_ranges_callback_) - all_ranges_callback = all_ranges_callback_.value(); - else - all_ranges_callback = context->getMergeTreeAllRangesCallback(); - - if (read_task_callback_) - read_task_callback = read_task_callback_.value(); - else - read_task_callback = context->getMergeTreeReadTaskCallback(); + all_ranges_callback = all_ranges_callback_.value_or(context->getMergeTreeAllRangesCallback()); + read_task_callback = read_task_callback_.value_or(context->getMergeTreeReadTaskCallback()); } const auto & settings = context->getSettingsRef(); @@ -344,7 +339,8 @@ std::unique_ptr ReadFromMergeTree::createLocalParallelReplica const ReadFromMergeTree * analyzed_merge_tree, bool enable_parallel_reading_, std::optional all_ranges_callback_, - std::optional read_task_callback_) + std::optional read_task_callback_, + std::optional number_of_current_replica_) { return std::make_unique( prepared_parts, @@ -361,7 +357,8 @@ std::unique_ptr ReadFromMergeTree::createLocalParallelReplica analyzed_merge_tree->analyzed_result_ptr, enable_parallel_reading_, all_ranges_callback_, - read_task_callback_); + read_task_callback_, + number_of_current_replica_); } Pipe ReadFromMergeTree::readFromPoolParallelReplicas(RangesInDataParts parts_with_range, Names required_columns, PoolSettings pool_settings) @@ -372,9 +369,7 @@ Pipe ReadFromMergeTree::readFromPoolParallelReplicas(RangesInDataParts parts_wit { .all_callback = all_ranges_callback.value(), .callback = read_task_callback.value(), - .count_participating_replicas = client_info.count_participating_replicas, - .number_of_current_replica = client_info.number_of_current_replica, - .columns_to_read = required_columns, + .number_of_current_replica = number_of_current_replica.value_or(client_info.number_of_current_replica), }; /// We have a special logic for local replica. It has to read less data, because in some cases it should @@ -545,9 +540,7 @@ Pipe ReadFromMergeTree::readInOrder( { .all_callback = all_ranges_callback.value(), .callback = read_task_callback.value(), - .count_participating_replicas = client_info.count_participating_replicas, .number_of_current_replica = client_info.number_of_current_replica, - .columns_to_read = required_columns, }; const auto multiplier = context->getSettingsRef().parallel_replicas_single_task_marks_count_multiplier; diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.h b/src/Processors/QueryPlan/ReadFromMergeTree.h index adc3818c3ab..8cabc6822c1 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.h +++ b/src/Processors/QueryPlan/ReadFromMergeTree.h @@ -121,13 +121,15 @@ public: AnalysisResultPtr analyzed_result_ptr_, bool enable_parallel_reading_, std::optional all_ranges_callback_ = std::nullopt, - std::optional read_task_callback_ = std::nullopt); + std::optional read_task_callback_ = std::nullopt, + std::optional number_of_current_replica_ = std::nullopt); std::unique_ptr createLocalParallelReplicasReadingStep( const ReadFromMergeTree * analyzed_merge_tree, bool enable_parallel_reading_, std::optional all_ranges_callback_, - std::optional read_task_callback_); + std::optional read_task_callback_, + std::optional number_of_current_replica_); static constexpr auto name = "ReadFromMergeTree"; String getName() const override { return name; } @@ -192,6 +194,7 @@ public: bool hasAnalyzedResult() const { return analyzed_result_ptr != nullptr; } void setAnalyzedResult(AnalysisResultPtr analyzed_result_ptr_) { analyzed_result_ptr = std::move(analyzed_result_ptr_); } + ReadFromMergeTree::AnalysisResult getAnalysisResult() const; const MergeTreeData::DataPartsVector & getParts() const { return prepared_parts; } const std::vector & getAlterConvertionsForParts() const { return alter_conversions_for_parts; } @@ -286,8 +289,6 @@ private: Pipe spreadMarkRangesAmongStreamsFinal( RangesInDataParts && parts, size_t num_streams, const Names & origin_column_names, const Names & column_names, ActionsDAGPtr & out_projection); - ReadFromMergeTree::AnalysisResult getAnalysisResult() const; - AnalysisResultPtr analyzed_result_ptr; VirtualFields shared_virtual_fields; @@ -296,6 +297,7 @@ private: std::optional read_task_callback; bool enable_vertical_final = false; bool enable_remove_parts_from_snapshot_optimization = true; + std::optional number_of_current_replica; friend class ReadFromMergeTreeCoordinated; }; diff --git a/src/Processors/QueryPlan/ReadFromRemote.cpp b/src/Processors/QueryPlan/ReadFromRemote.cpp index 485d5f675ab..23d8f2b496f 100644 --- a/src/Processors/QueryPlan/ReadFromRemote.cpp +++ b/src/Processors/QueryPlan/ReadFromRemote.cpp @@ -390,8 +390,18 @@ ReadFromParallelRemoteReplicasStep::ReadFromParallelRemoteReplicasStep( std::vector description; description.push_back(fmt::format("query: {}", formattedAST(query_ast))); - for (const auto & pool : cluster->getShardsInfo().front().per_replica_pools) - description.push_back(fmt::format("Replica: {}", pool->getHost())); + bool first_local = false; + for (const auto & addr : cluster->getShardsAddresses().front()) + { + /// skip first local + if (exclude_local_replica && addr.is_local && !first_local) + { + first_local = true; + continue; + } + + description.push_back(fmt::format("Replica: {}", addr.host_name)); + } setStepDescription(boost::algorithm::join(description, ", ")); } @@ -414,9 +424,6 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder const auto & shard = cluster->getShardsInfo().at(0); size_t all_replicas_count = current_settings.max_parallel_replicas; - if (exclude_local_replica) - --all_replicas_count; - if (all_replicas_count > shard.getAllNodeCount()) { LOG_INFO( @@ -427,6 +434,8 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder shard.getAllNodeCount()); all_replicas_count = shard.getAllNodeCount(); } + if (exclude_local_replica) + --all_replicas_count; std::vector shuffled_pool; if (all_replicas_count < shard.getAllNodeCount()) diff --git a/src/Storages/MergeTree/MergeTreeSelectProcessor.h b/src/Storages/MergeTree/MergeTreeSelectProcessor.h index 8f41f5deacb..03ca30dd5b3 100644 --- a/src/Storages/MergeTree/MergeTreeSelectProcessor.h +++ b/src/Storages/MergeTree/MergeTreeSelectProcessor.h @@ -26,12 +26,7 @@ struct ParallelReadingExtension { MergeTreeAllRangesCallback all_callback; MergeTreeReadTaskCallback callback; - size_t count_participating_replicas{0}; size_t number_of_current_replica{0}; - /// This is needed to estimate the number of bytes - /// between a pair of marks to perform one request - /// over the network for a 1Gb of data. - Names columns_to_read; }; /// Base class for MergeTreeThreadSelectAlgorithm and MergeTreeSelectAlgorithm