This commit is contained in:
kssenii 2022-12-07 11:22:48 +01:00
parent 1acd014e4e
commit 88523ef0b6

View File

@ -102,6 +102,7 @@ Pipe StorageS3Cluster::read(
auto iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(
*s3_configuration.client, s3_configuration.uri, query_info.query, virtual_block, context);
auto callback = std::make_shared<std::function<String()>>([iterator]() mutable -> String { return iterator->next(); });
/// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)
auto interpreter = InterpreterSelectQuery(query_info.query, context, SelectQueryOptions(processed_stage).analyze());
@ -129,7 +130,6 @@ Pipe StorageS3Cluster::read(
const auto & current_settings = context->getSettingsRef();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
RemoteQueryExecutor::Extension extension;
for (const auto & shard_info : cluster->getShardsInfo())
{
auto try_results = shard_info.pool->getMany(timeouts, &current_settings, PoolMode::GET_MANY);
@ -145,7 +145,7 @@ Pipe StorageS3Cluster::read(
scalars,
Tables(),
processed_stage,
extension);
RemoteQueryExecutor::Extension{.task_iterator = callback});
pipes.emplace_back(std::make_shared<RemoteSource>(remote_query_executor, add_agg_info, false));
}