diff --git a/programs/copier/ClusterCopier.cpp b/programs/copier/ClusterCopier.cpp index 9663ca93c12..f3609902fcb 100644 --- a/programs/copier/ClusterCopier.cpp +++ b/programs/copier/ClusterCopier.cpp @@ -8,7 +8,7 @@ #include #include #include -#include +#include #include #include #include @@ -1455,7 +1455,7 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl( local_context->setSettings(task_cluster->settings_pull); local_context->setSetting("skip_unavailable_shards", true); - InterpreterSelectQuery select(query_select_ast, local_context, SelectQueryOptions{}); + InterpreterSelectWithUnionQuery select(query_select_ast, local_context, SelectQueryOptions{}); QueryPlan plan; select.buildQueryPlan(plan); auto builder = std::move(*plan.buildQueryPipeline( @@ -1545,7 +1545,7 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl( { BlockIO io_insert = InterpreterFactory::get(query_insert_ast, context_insert)->execute(); - InterpreterSelectQuery select(query_select_ast, context_select, SelectQueryOptions{}); + InterpreterSelectWithUnionQuery select(query_select_ast, context_select, SelectQueryOptions{}); QueryPlan plan; select.buildQueryPlan(plan); auto builder = std::move(*plan.buildQueryPipeline( @@ -1875,7 +1875,7 @@ std::set ClusterCopier::getShardPartitions(const ConnectionTimeouts & ti auto local_context = Context::createCopy(context); local_context->setSettings(task_cluster->settings_pull); - InterpreterSelectQuery select(query_ast, local_context, SelectQueryOptions{}); + InterpreterSelectWithUnionQuery select(query_ast, local_context, SelectQueryOptions{}); QueryPlan plan; select.buildQueryPlan(plan); auto builder = std::move(*plan.buildQueryPipeline( diff --git a/src/Processors/ISource.h b/src/Processors/ISource.h index 292f79ba348..06c54981192 100644 --- a/src/Processors/ISource.h +++ b/src/Processors/ISource.h @@ -40,7 +40,7 @@ public: void setStorageLimits(const std::shared_ptr & storage_limits_) override; /// Default implementation for all the sources. - std::optional getReadProgress() final; + std::optional getReadProgress() override; void addTotalRowsApprox(size_t value) { read_progress.total_rows_approx += value; } }; diff --git a/src/Processors/Sources/NullSource.h b/src/Processors/Sources/NullSource.h index d1f0ec5e6ca..e23ae99f2d0 100644 --- a/src/Processors/Sources/NullSource.h +++ b/src/Processors/Sources/NullSource.h @@ -11,6 +11,8 @@ public: explicit NullSource(Block header) : ISource(std::move(header)) {} String getName() const override { return "NullSource"; } + std::optional getReadProgress() override { return {}; } + protected: Chunk generate() override { return Chunk(); } }; diff --git a/src/Processors/Sources/RemoteSource.h b/src/Processors/Sources/RemoteSource.h index f415b91aae0..a256247117d 100644 --- a/src/Processors/Sources/RemoteSource.h +++ b/src/Processors/Sources/RemoteSource.h @@ -61,6 +61,8 @@ public: String getName() const override { return "RemoteTotals"; } + std::optional getReadProgress() override { return {}; } + protected: Chunk generate() override; @@ -77,6 +79,8 @@ public: String getName() const override { return "RemoteExtremes"; } + std::optional getReadProgress() override { return {}; } + protected: Chunk generate() override; diff --git a/src/Storages/ReadFinalForExternalReplicaStorage.cpp b/src/Storages/ReadFinalForExternalReplicaStorage.cpp index 189e09a3fb3..3ec7a074fd4 100644 --- a/src/Storages/ReadFinalForExternalReplicaStorage.cpp +++ b/src/Storages/ReadFinalForExternalReplicaStorage.cpp @@ -13,6 +13,7 @@ #include #include #include +#include namespace DB @@ -59,6 +60,13 @@ void readFinalFromNestedStorage( auto nested_snapshot = nested_storage->getStorageSnapshot(nested_metadata, context); nested_storage->read(query_plan, require_columns_name, nested_snapshot, query_info, context, processed_stage, max_block_size, num_streams); + + if (!query_plan.isInitialized()) + { + InterpreterSelectQuery::addEmptySourceToQueryPlan(query_plan, nested_header, query_info, context); + return; + } + query_plan.addTableLock(lock); query_plan.addStorageHolder(nested_storage);