mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-28 02:21:59 +00:00
Adjusting num_streams
by expected work in StorageS3
This commit is contained in:
parent
5fb8e46967
commit
34aecc0bf3
@ -180,6 +180,13 @@ public:
|
|||||||
return nextAssumeLocked();
|
return nextAssumeLocked();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
size_t objectsCount()
|
||||||
|
{
|
||||||
|
assert(outcome_future.valid());
|
||||||
|
first_outcome = outcome_future.get();
|
||||||
|
return first_outcome->GetResult().GetContents().size();
|
||||||
|
}
|
||||||
|
|
||||||
~Impl()
|
~Impl()
|
||||||
{
|
{
|
||||||
list_objects_pool.wait();
|
list_objects_pool.wait();
|
||||||
@ -225,8 +232,17 @@ private:
|
|||||||
{
|
{
|
||||||
buffer.clear();
|
buffer.clear();
|
||||||
|
|
||||||
assert(outcome_future.valid());
|
ListObjectsOutcome outcome;
|
||||||
auto outcome = outcome_future.get();
|
if (unlikely(first_outcome))
|
||||||
|
{
|
||||||
|
outcome = std::move(*first_outcome);
|
||||||
|
first_outcome = std::nullopt;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
assert(outcome_future.valid());
|
||||||
|
outcome = outcome_future.get();
|
||||||
|
}
|
||||||
|
|
||||||
if (!outcome.IsSuccess())
|
if (!outcome.IsSuccess())
|
||||||
{
|
{
|
||||||
@ -343,6 +359,7 @@ private:
|
|||||||
ThreadPool list_objects_pool;
|
ThreadPool list_objects_pool;
|
||||||
ThreadPoolCallbackRunner<ListObjectsOutcome> list_objects_scheduler;
|
ThreadPoolCallbackRunner<ListObjectsOutcome> list_objects_scheduler;
|
||||||
std::future<ListObjectsOutcome> outcome_future;
|
std::future<ListObjectsOutcome> outcome_future;
|
||||||
|
std::optional<ListObjectsOutcome> first_outcome; /// the result will be set by `estimatedKeysCount`
|
||||||
std::function<void(FileProgress)> file_progress_callback;
|
std::function<void(FileProgress)> file_progress_callback;
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -364,6 +381,11 @@ StorageS3Source::KeyWithInfo StorageS3Source::DisclosedGlobIterator::next()
|
|||||||
return pimpl->next();
|
return pimpl->next();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
size_t StorageS3Source::DisclosedGlobIterator::estimatedKeysCount()
|
||||||
|
{
|
||||||
|
return pimpl->objectsCount();
|
||||||
|
}
|
||||||
|
|
||||||
class StorageS3Source::KeysIterator::Impl : WithContext
|
class StorageS3Source::KeysIterator::Impl : WithContext
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
@ -425,6 +447,11 @@ public:
|
|||||||
return {key, info};
|
return {key, info};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
size_t objectsCount()
|
||||||
|
{
|
||||||
|
return keys.size();
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
Strings keys;
|
Strings keys;
|
||||||
std::atomic_size_t index = 0;
|
std::atomic_size_t index = 0;
|
||||||
@ -459,6 +486,43 @@ StorageS3Source::KeyWithInfo StorageS3Source::KeysIterator::next()
|
|||||||
return pimpl->next();
|
return pimpl->next();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
size_t StorageS3Source::KeysIterator::estimatedKeysCount()
|
||||||
|
{
|
||||||
|
return pimpl->objectsCount();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Invokes the read-task callback `max_threads_count` times on a temporary thread pool
/// and stores the returned keys in `buffer`, in the order the requests were scheduled.
StorageS3Source::ReadTaskIterator::ReadTaskIterator(
    const DB::ReadTaskCallback & callback_,
    const size_t max_threads_count)
    : callback(callback_)
{
    ThreadPool prefetch_pool(CurrentMetrics::StorageS3Threads, CurrentMetrics::StorageS3ThreadsActive, max_threads_count);
    auto schedule_prefetch = threadPoolCallbackRunner<String>(prefetch_pool, "ReadTaskIteratorPrefetch");

    /// One pending result per scheduled callback invocation.
    std::vector<std::future<String>> pending_keys;
    for (size_t scheduled = 0; scheduled != max_threads_count; ++scheduled)
    {
        pending_keys.push_back(schedule_prefetch([this] { return callback(); }, Priority{}));
    }

    /// Wait for the pool to finish all scheduled jobs before collecting the results.
    prefetch_pool.wait();

    buffer.reserve(max_threads_count);
    for (auto pending = pending_keys.begin(); pending != pending_keys.end(); ++pending)
    {
        /// No object info is attached to a prefetched key.
        buffer.emplace_back(pending->get(), std::nullopt);
    }
}
|
||||||
|
|
||||||
|
/// Hands out the prefetched keys from `buffer` first, one per call; once every buffered slot
/// has been claimed, each call asks the callback for a key directly.
StorageS3Source::KeyWithInfo StorageS3Source::ReadTaskIterator::next()
{
    /// Atomically claim a slot so that concurrent callers never receive the same buffered key.
    const size_t claimed_slot = index.fetch_add(1, std::memory_order_relaxed);

    if (claimed_slot < buffer.size())
        return buffer[claimed_slot];

    return {callback(), {}};
}
|
||||||
|
|
||||||
|
size_t StorageS3Source::ReadTaskIterator::estimatedKeysCount()
|
||||||
|
{
|
||||||
|
return buffer.size();
|
||||||
|
}
|
||||||
|
|
||||||
StorageS3Source::StorageS3Source(
|
StorageS3Source::StorageS3Source(
|
||||||
const ReadFromFormatInfo & info,
|
const ReadFromFormatInfo & info,
|
||||||
const String & format_,
|
const String & format_,
|
||||||
@ -965,7 +1029,7 @@ std::shared_ptr<StorageS3Source::IIterator> StorageS3::createFileIterator(
|
|||||||
{
|
{
|
||||||
if (distributed_processing)
|
if (distributed_processing)
|
||||||
{
|
{
|
||||||
return std::make_shared<StorageS3Source::ReadTaskIterator>(local_context->getReadTaskCallback());
|
return std::make_shared<StorageS3Source::ReadTaskIterator>(local_context->getReadTaskCallback(), local_context->getSettingsRef().max_threads);
|
||||||
}
|
}
|
||||||
else if (configuration.withGlobs())
|
else if (configuration.withGlobs())
|
||||||
{
|
{
|
||||||
@ -1017,6 +1081,9 @@ Pipe StorageS3::read(
|
|||||||
std::shared_ptr<StorageS3Source::IIterator> iterator_wrapper = createFileIterator(
|
std::shared_ptr<StorageS3Source::IIterator> iterator_wrapper = createFileIterator(
|
||||||
query_configuration, distributed_processing, local_context, query_info.query, virtual_columns, nullptr, local_context->getFileProgressCallback());
|
query_configuration, distributed_processing, local_context, query_info.query, virtual_columns, nullptr, local_context->getFileProgressCallback());
|
||||||
|
|
||||||
|
size_t estimated_keys_count = iterator_wrapper->estimatedKeysCount();
|
||||||
|
num_streams = std::min(num_streams, estimated_keys_count);
|
||||||
|
|
||||||
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals());
|
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals());
|
||||||
bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
|
bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
|
||||||
&& local_context->getSettingsRef().optimize_count_from_files;
|
&& local_context->getSettingsRef().optimize_count_from_files;
|
||||||
@ -1024,6 +1091,7 @@ Pipe StorageS3::read(
|
|||||||
const size_t max_threads = local_context->getSettingsRef().max_threads;
|
const size_t max_threads = local_context->getSettingsRef().max_threads;
|
||||||
const size_t max_parsing_threads = num_streams >= max_threads ? 1 : (max_threads / num_streams);
|
const size_t max_parsing_threads = num_streams >= max_threads ? 1 : (max_threads / num_streams);
|
||||||
|
|
||||||
|
pipes.reserve(num_streams);
|
||||||
for (size_t i = 0; i < num_streams; ++i)
|
for (size_t i = 0; i < num_streams; ++i)
|
||||||
{
|
{
|
||||||
pipes.emplace_back(std::make_shared<StorageS3Source>(
|
pipes.emplace_back(std::make_shared<StorageS3Source>(
|
||||||
|
@ -60,6 +60,10 @@ public:
|
|||||||
virtual ~IIterator() = default;
|
virtual ~IIterator() = default;
|
||||||
virtual KeyWithInfo next() = 0;
|
virtual KeyWithInfo next() = 0;
|
||||||
|
|
||||||
|
/// Estimates how many streams we need to process all files.
|
||||||
|
/// If the number of keys is >= max_threads_count, the returned value may not be the actual number of keys.
|
||||||
|
virtual size_t estimatedKeysCount() = 0;
|
||||||
|
|
||||||
KeyWithInfo operator ()() { return next(); }
|
KeyWithInfo operator ()() { return next(); }
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -77,6 +81,7 @@ public:
|
|||||||
std::function<void(FileProgress)> progress_callback_ = {});
|
std::function<void(FileProgress)> progress_callback_ = {});
|
||||||
|
|
||||||
KeyWithInfo next() override;
|
KeyWithInfo next() override;
|
||||||
|
size_t estimatedKeysCount() override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
class Impl;
|
class Impl;
|
||||||
@ -100,6 +105,7 @@ public:
|
|||||||
std::function<void(FileProgress)> progress_callback_ = {});
|
std::function<void(FileProgress)> progress_callback_ = {});
|
||||||
|
|
||||||
KeyWithInfo next() override;
|
KeyWithInfo next() override;
|
||||||
|
size_t estimatedKeysCount() override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
class Impl;
|
class Impl;
|
||||||
@ -110,11 +116,15 @@ public:
|
|||||||
class ReadTaskIterator : public IIterator
|
class ReadTaskIterator : public IIterator
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
explicit ReadTaskIterator(const ReadTaskCallback & callback_) : callback(callback_) {}
|
explicit ReadTaskIterator(const ReadTaskCallback & callback_, const size_t max_threads_count);
|
||||||
|
|
||||||
KeyWithInfo next() override { return {callback(), {}}; }
|
KeyWithInfo next() override;
|
||||||
|
size_t estimatedKeysCount() override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
KeysWithInfo buffer;
|
||||||
|
std::atomic_size_t index = 0;
|
||||||
|
|
||||||
ReadTaskCallback callback;
|
ReadTaskCallback callback;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user