diff --git a/src/Storages/StorageS3Cluster.cpp b/src/Storages/StorageS3Cluster.cpp index 9e137c474c9..b6d714cc1cc 100644 --- a/src/Storages/StorageS3Cluster.cpp +++ b/src/Storages/StorageS3Cluster.cpp @@ -102,6 +102,7 @@ Pipe StorageS3Cluster::read( auto iterator = std::make_shared( *s3_configuration.client, s3_configuration.uri, query_info.query, virtual_block, context); + auto callback = std::make_shared>([iterator]() mutable -> String { return iterator->next(); }); /// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*) auto interpreter = InterpreterSelectQuery(query_info.query, context, SelectQueryOptions(processed_stage).analyze()); @@ -129,7 +130,6 @@ Pipe StorageS3Cluster::read( const auto & current_settings = context->getSettingsRef(); auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings); - RemoteQueryExecutor::Extension extension; for (const auto & shard_info : cluster->getShardsInfo()) { auto try_results = shard_info.pool->getMany(timeouts, ¤t_settings, PoolMode::GET_MANY); @@ -145,7 +145,7 @@ Pipe StorageS3Cluster::read( scalars, Tables(), processed_stage, - extension); + RemoteQueryExecutor::Extension{.task_iterator = callback}); pipes.emplace_back(std::make_shared(remote_query_executor, add_agg_info, false)); }