From 1e43e3515ea889a55752bf8a1d9d401fdcdf4e0a Mon Sep 17 00:00:00 2001 From: Igor Nikonov Date: Wed, 17 Apr 2024 16:47:52 +0000 Subject: [PATCH] Check table status for parallel replicas --- src/Client/Connection.cpp | 1 - src/Interpreters/ClusterProxy/executeQuery.cpp | 9 ++++++--- src/Interpreters/ClusterProxy/executeQuery.h | 4 +++- src/Planner/findParallelReplicasQuery.cpp | 11 +++-------- src/Processors/QueryPlan/ReadFromRemote.cpp | 5 +++-- src/Processors/QueryPlan/ReadFromRemote.h | 2 ++ .../RemoteQueryExecutorReadContext.h | 1 - src/Storages/StorageMergeTree.cpp | 11 +++-------- src/Storages/StorageReplicatedMergeTree.cpp | 15 +++++---------- src/Storages/StorageReplicatedMergeTree.h | 1 - 10 files changed, 25 insertions(+), 35 deletions(-) diff --git a/src/Client/Connection.cpp b/src/Client/Connection.cpp index 6a62a9eb296..e1bad42198f 100644 --- a/src/Client/Connection.cpp +++ b/src/Client/Connection.cpp @@ -617,7 +617,6 @@ TablesStatusResponse Connection::getTablesStatus(const ConnectionTimeouts & time out->next(); fiu_do_on(FailPoints::receive_timeout_on_table_status_response, { - sleepForSeconds(10); throw NetException(ErrorCodes::SOCKET_TIMEOUT, "Injected timeout exceeded while reading from socket ({}:{})", host, port); }); diff --git a/src/Interpreters/ClusterProxy/executeQuery.cpp b/src/Interpreters/ClusterProxy/executeQuery.cpp index 07ef7aa6c96..35978f07c20 100644 --- a/src/Interpreters/ClusterProxy/executeQuery.cpp +++ b/src/Interpreters/ClusterProxy/executeQuery.cpp @@ -364,7 +364,9 @@ void executeQuery( void executeQueryWithParallelReplicas( QueryPlan & query_plan, - SelectStreamFactory & stream_factory, + const StorageID & storage_id, + const Block & header, + QueryProcessingStage::Enum processed_stage, const ASTPtr & query_ast, ContextPtr context, std::shared_ptr storage_limits) @@ -453,9 +455,10 @@ void executeQueryWithParallelReplicas( auto read_from_remote = std::make_unique( query_ast, new_cluster, + storage_id, std::move(coordinator), - stream_factory.header, - stream_factory.processed_stage, + header, + processed_stage, new_context, getThrottler(new_context), std::move(scalars), diff --git a/src/Interpreters/ClusterProxy/executeQuery.h b/src/Interpreters/ClusterProxy/executeQuery.h index 8f6f6300c7b..582f8d74fd5 100644 --- a/src/Interpreters/ClusterProxy/executeQuery.h +++ b/src/Interpreters/ClusterProxy/executeQuery.h @@ -68,7 +68,9 @@ void executeQuery( void executeQueryWithParallelReplicas( QueryPlan & query_plan, - SelectStreamFactory & stream_factory, + const StorageID & storage_id, + const Block & header, + QueryProcessingStage::Enum processed_stage, const ASTPtr & query_ast, ContextPtr context, std::shared_ptr storage_limits); diff --git a/src/Planner/findParallelReplicasQuery.cpp b/src/Planner/findParallelReplicasQuery.cpp index ef640bcd42d..fbe81185239 100644 --- a/src/Planner/findParallelReplicasQuery.cpp +++ b/src/Planner/findParallelReplicasQuery.cpp @@ -412,17 +412,12 @@ JoinTreeQueryPlan buildQueryPlanForParallelReplicas( Block header = InterpreterSelectQueryAnalyzer::getSampleBlock( modified_query_tree, context, SelectQueryOptions(processed_stage).analyze()); - ClusterProxy::SelectStreamFactory select_stream_factory = - ClusterProxy::SelectStreamFactory( - header, - {}, - {}, - processed_stage); - QueryPlan query_plan; ClusterProxy::executeQueryWithParallelReplicas( query_plan, - select_stream_factory, + StorageID::createEmpty(), + header, + processed_stage, modified_query_ast, context, storage_limits); diff --git a/src/Processors/QueryPlan/ReadFromRemote.cpp b/src/Processors/QueryPlan/ReadFromRemote.cpp index 119710d06d8..b4e35af85d6 100644 --- a/src/Processors/QueryPlan/ReadFromRemote.cpp +++ b/src/Processors/QueryPlan/ReadFromRemote.cpp @@ -360,6 +360,7 @@ void ReadFromRemote::initializePipeline(QueryPipelineBuilder & pipeline, const B ReadFromParallelRemoteReplicasStep::ReadFromParallelRemoteReplicasStep( ASTPtr query_ast_, ClusterPtr cluster_, + const StorageID & storage_id_, ParallelReplicasReadingCoordinatorPtr coordinator_, Block header_, QueryProcessingStage::Enum stage_, @@ -372,6 +373,7 @@ ReadFromParallelRemoteReplicasStep::ReadFromParallelRemoteReplicasStep( : ISourceStep(DataStream{.header = std::move(header_)}) , cluster(cluster_) , query_ast(query_ast_) + , storage_id(storage_id_) , coordinator(std::move(coordinator_)) , stage(std::move(stage_)) , context(context_) @@ -419,7 +421,6 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder all_replicas_count = shard.getAllNodeCount(); } - std::vector shuffled_pool; if (all_replicas_count < shard.getAllNodeCount()) { @@ -452,7 +453,6 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder processor->setStorageLimits(storage_limits); pipeline.init(std::move(pipe)); - } @@ -488,6 +488,7 @@ void ReadFromParallelRemoteReplicasStep::addPipeForSingeReplica( RemoteQueryExecutor::Extension{.parallel_reading_coordinator = coordinator, .replica_info = std::move(replica_info)}); remote_query_executor->setLogger(log); + remote_query_executor->setMainTable(storage_id); pipes.emplace_back(createRemoteSourcePipe(std::move(remote_query_executor), add_agg_info, add_totals, add_extremes, async_read, async_query_sending)); addConvertingActions(pipes.back(), output_stream->header); diff --git a/src/Processors/QueryPlan/ReadFromRemote.h b/src/Processors/QueryPlan/ReadFromRemote.h index 498d584e85a..eb15269155a 100644 --- a/src/Processors/QueryPlan/ReadFromRemote.h +++ b/src/Processors/QueryPlan/ReadFromRemote.h @@ -69,6 +69,7 @@ public: ReadFromParallelRemoteReplicasStep( ASTPtr query_ast_, ClusterPtr cluster_, + const StorageID & storage_id_, ParallelReplicasReadingCoordinatorPtr coordinator_, Block header_, QueryProcessingStage::Enum stage_, @@ -91,6 +92,7 @@ private: ClusterPtr cluster; ASTPtr query_ast; + StorageID storage_id; ParallelReplicasReadingCoordinatorPtr coordinator; QueryProcessingStage::Enum stage; ContextMutablePtr context; diff --git a/src/QueryPipeline/RemoteQueryExecutorReadContext.h b/src/QueryPipeline/RemoteQueryExecutorReadContext.h index 4e62b42a067..b8aa8bb9111 100644 --- a/src/QueryPipeline/RemoteQueryExecutorReadContext.h +++ b/src/QueryPipeline/RemoteQueryExecutorReadContext.h @@ -2,7 +2,6 @@ #if defined(OS_LINUX) -#include #include #include #include diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 86af02be899..65218632287 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -235,16 +235,11 @@ void StorageMergeTree::read( = InterpreterSelectQuery(modified_query_ast, local_context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock(); } - ClusterProxy::SelectStreamFactory select_stream_factory = - ClusterProxy::SelectStreamFactory( - header, - {}, - storage_snapshot, - processed_stage); - ClusterProxy::executeQueryWithParallelReplicas( query_plan, - select_stream_factory, + getStorageID(), + header, + processed_stage, modified_query_ast, local_context, query_info.storage_limits); diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index aa90fc43d52..c9b28c5405c 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -5449,7 +5449,7 @@ void StorageReplicatedMergeTree::read( return readLocalSequentialConsistencyImpl(query_plan, column_names, storage_snapshot, query_info, local_context, max_block_size, num_streams); if (local_context->canUseParallelReplicasOnInitiator()) - return readParallelReplicasImpl(query_plan, column_names, storage_snapshot, query_info, local_context, processed_stage); + return readParallelReplicasImpl(query_plan, column_names, query_info, local_context, processed_stage); readLocalImpl(query_plan, column_names, storage_snapshot, query_info, local_context, max_block_size, num_streams); } @@ -5476,13 +5476,13 @@ void StorageReplicatedMergeTree::readLocalSequentialConsistencyImpl( void StorageReplicatedMergeTree::readParallelReplicasImpl( QueryPlan & query_plan, const Names & /*column_names*/, - const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & query_info, ContextPtr local_context, QueryProcessingStage::Enum processed_stage) { ASTPtr modified_query_ast; Block header; + const auto table_id = getStorageID(); if (local_context->getSettingsRef().allow_experimental_analyzer) { @@ -5496,22 +5496,17 @@ void StorageReplicatedMergeTree::readParallelReplicasImpl( } else { - const auto table_id = getStorageID(); modified_query_ast = ClusterProxy::rewriteSelectQuery(local_context, query_info.query, table_id.database_name, table_id.table_name, /*remote_table_function_ptr*/nullptr); header = InterpreterSelectQuery(modified_query_ast, local_context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock(); } - ClusterProxy::SelectStreamFactory select_stream_factory = ClusterProxy::SelectStreamFactory( - header, - {}, - storage_snapshot, - processed_stage); - ClusterProxy::executeQueryWithParallelReplicas( query_plan, - select_stream_factory, + table_id, + header, + processed_stage, modified_query_ast, local_context, query_info.storage_limits); diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index c472c11e7f8..c131aa3fad3 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -567,7 +567,6 @@ private: void readParallelReplicasImpl( QueryPlan & query_plan, const Names & column_names, - const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & query_info, ContextPtr local_context, QueryProcessingStage::Enum processed_stage);