diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp
index 341d8b3f768..e99be7a1204 100644
--- a/src/Storages/StorageS3.cpp
+++ b/src/Storages/StorageS3.cpp
@@ -180,6 +180,21 @@ public:
         return nextAssumeLocked();
     }
 
+    /// Returns the number of objects in the listing response that is waiting to be consumed.
+    /// The response is kept in `first_outcome`, where the code that refills `buffer` picks it up
+    /// instead of reading `outcome_future` again.
+    size_t objectsCount()
+    {
+        /// A future can be read only once, so do not touch it if a response is already stashed.
+        if (!first_outcome)
+        {
+            assert(outcome_future.valid());
+            first_outcome = outcome_future.get();
+        }
+
+        return first_outcome->GetResult().GetContents().size();
+    }
+
     ~Impl()
     {
         list_objects_pool.wait();
@@ -225,8 +240,18 @@ private:
     {
         buffer.clear();
 
-        assert(outcome_future.valid());
-        auto outcome = outcome_future.get();
+        /// `objectsCount` may have already taken the response out of the future.
+        ListObjectsOutcome outcome;
+        if (unlikely(first_outcome))
+        {
+            outcome = std::move(*first_outcome);
+            first_outcome = std::nullopt;
+        }
+        else
+        {
+            assert(outcome_future.valid());
+            outcome = outcome_future.get();
+        }
 
         if (!outcome.IsSuccess())
         {
@@ -343,6 +368,7 @@ private:
     ThreadPool list_objects_pool;
     ThreadPoolCallbackRunner<ListObjectsOutcome> list_objects_scheduler;
     std::future<ListObjectsOutcome> outcome_future;
+    std::optional<ListObjectsOutcome> first_outcome; /// the result will be set by `estimatedKeysCount`
     std::function<void(FileProgress)> file_progress_callback;
 };
 
@@ -364,6 +390,11 @@ StorageS3Source::KeyWithInfo StorageS3Source::DisclosedGlobIterator::next()
     return pimpl->next();
 }
 
+size_t StorageS3Source::DisclosedGlobIterator::estimatedKeysCount()
+{
+    return pimpl->objectsCount();
+}
+
 class StorageS3Source::KeysIterator::Impl : WithContext
 {
 public:
@@ -425,6 +456,11 @@ public:
         return {key, info};
     }
 
+    size_t objectsCount()
+    {
+        return keys.size();
+    }
+
 private:
     Strings keys;
     std::atomic_size_t index = 0;
@@ -459,6 +495,45 @@ StorageS3Source::KeyWithInfo StorageS3Source::KeysIterator::next()
     return pimpl->next();
 }
 
+size_t StorageS3Source::KeysIterator::estimatedKeysCount()
+{
+    return pimpl->objectsCount();
+}
+
+StorageS3Source::ReadTaskIterator::ReadTaskIterator(
+    const DB::ReadTaskCallback & callback_,
+    const size_t max_threads_count)
+    : callback(callback_)
+{
+    /// Request up to `max_threads_count` keys in parallel and keep them in `buffer`.
+    ThreadPool pool(CurrentMetrics::StorageS3Threads, CurrentMetrics::StorageS3ThreadsActive, max_threads_count);
+    auto pool_scheduler = threadPoolCallbackRunner<String>(pool, "ReadTaskIteratorPrefetch");
+
+    std::vector<std::future<String>> keys;
+    for (size_t i = 0; i < max_threads_count; ++i)
+        keys.push_back(pool_scheduler([this] { return callback(); }, Priority{}));
+
+    pool.wait();
+    buffer.reserve(max_threads_count);
+    for (auto & key_future : keys)
+        buffer.emplace_back(key_future.get(), std::nullopt);
+}
+
+StorageS3Source::KeyWithInfo StorageS3Source::ReadTaskIterator::next()
+{
+    /// Hand out the prefetched keys first; once they run out, ask the callback directly.
+    size_t current_index = index.fetch_add(1, std::memory_order_relaxed);
+    if (current_index >= buffer.size())
+        return {callback(), {}};
+
+    return buffer[current_index];
+}
+
+size_t StorageS3Source::ReadTaskIterator::estimatedKeysCount()
+{
+    return buffer.size();
+}
+
 StorageS3Source::StorageS3Source(
     const ReadFromFormatInfo & info,
     const String & format_,
@@ -965,7 +1040,7 @@ std::shared_ptr<StorageS3Source::IIterator> StorageS3::createFileIterator(
 {
     if (distributed_processing)
     {
-        return std::make_shared<StorageS3Source::ReadTaskIterator>(local_context->getReadTaskCallback());
+        return std::make_shared<StorageS3Source::ReadTaskIterator>(local_context->getReadTaskCallback(), local_context->getSettingsRef().max_threads);
     }
     else if (configuration.withGlobs())
     {
@@ -1017,6 +1092,11 @@ Pipe StorageS3::read(
     std::shared_ptr<StorageS3Source::IIterator> iterator_wrapper = createFileIterator(
         query_configuration, distributed_processing, local_context, query_info.query, virtual_columns, nullptr, local_context->getFileProgressCallback());
 
+    size_t estimated_keys_count = iterator_wrapper->estimatedKeysCount();
+    /// Do not create more streams than there are keys, but always keep at least one stream:
+    /// the estimate can be zero, and `num_streams` is used as a divisor below.
+    num_streams = std::max<size_t>(1, std::min(num_streams, estimated_keys_count));
+
     auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals());
     bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
         && local_context->getSettingsRef().optimize_count_from_files;
@@ -1024,6 +1104,7 @@ Pipe StorageS3::read(
     const size_t max_threads = local_context->getSettingsRef().max_threads;
     const size_t max_parsing_threads = num_streams >= max_threads ? 1 : (max_threads / num_streams);
 
+    pipes.reserve(num_streams);
     for (size_t i = 0; i < num_streams; ++i)
     {
         pipes.emplace_back(std::make_shared<StorageS3Source>(
diff --git a/src/Storages/StorageS3.h b/src/Storages/StorageS3.h
index ee03b9f18c2..f0315244088 100644
--- a/src/Storages/StorageS3.h
+++ b/src/Storages/StorageS3.h
@@ -60,6 +60,11 @@ public:
         virtual ~IIterator() = default;
         virtual KeyWithInfo next() = 0;
 
+        /// Estimates how many streams we need to process all files.
+        /// If keys count >= max_threads_count, the returned number may not represent the actual number of the keys.
+        /// The estimate may be zero, so callers must not use it as a divisor without clamping.
+        virtual size_t estimatedKeysCount() = 0;
+
         KeyWithInfo operator ()() { return next(); }
     };
 
@@ -77,6 +82,7 @@ public:
             std::function<void(FileProgress)> progress_callback_ = {});
 
         KeyWithInfo next() override;
+        size_t estimatedKeysCount() override;
 
     private:
         class Impl;
@@ -100,6 +106,7 @@ public:
             std::function<void(FileProgress)> progress_callback_ = {});
 
         KeyWithInfo next() override;
+        size_t estimatedKeysCount() override;
 
     private:
         class Impl;
@@ -110,11 +117,16 @@ public:
     class ReadTaskIterator : public IIterator
     {
     public:
-        explicit ReadTaskIterator(const ReadTaskCallback & callback_) : callback(callback_) {}
+        explicit ReadTaskIterator(const ReadTaskCallback & callback_, const size_t max_threads_count);
 
-        KeyWithInfo next() override { return {callback(), {}}; }
+        KeyWithInfo next() override;
+        size_t estimatedKeysCount() override;
 
     private:
+        /// Keys requested from the callback ahead of time by the constructor.
+        KeysWithInfo buffer;
+        std::atomic_size_t index = 0;
+
         ReadTaskCallback callback;
     };
 