Limit the number of in-flight tasks for loading outdated parts (#50450)

* Done

* Update programs/local/LocalServer.cpp

Co-authored-by: Alexander Tokmakov <tavplubix@clickhouse.com>

* Bump

---------

Co-authored-by: Alexander Tokmakov <tavplubix@clickhouse.com>
This commit is contained in:
Nikita Mikhaylov 2023-06-02 14:10:26 +02:00 committed by GitHub
parent 9922053419
commit 73db383727
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 4 additions and 5 deletions

View File

@ -138,7 +138,7 @@ void LocalServer::initialize(Poco::Util::Application & self)
OutdatedPartsLoadingThreadPool::initialize( OutdatedPartsLoadingThreadPool::initialize(
config().getUInt("max_outdated_parts_loading_thread_pool_size", 16), config().getUInt("max_outdated_parts_loading_thread_pool_size", 16),
0, // We don't need any threads one all the parts will be loaded 0, // We don't need any threads one all the parts will be loaded
config().getUInt("outdated_part_loading_thread_pool_queue_size", 10000)); config().getUInt("max_outdated_parts_loading_thread_pool_size", 16));
} }

View File

@ -696,7 +696,7 @@ try
OutdatedPartsLoadingThreadPool::initialize( OutdatedPartsLoadingThreadPool::initialize(
server_settings.max_outdated_parts_loading_thread_pool_size, server_settings.max_outdated_parts_loading_thread_pool_size,
0, // We don't need any threads one all the parts will be loaded 0, // We don't need any threads one all the parts will be loaded
server_settings.outdated_part_loading_thread_pool_queue_size); server_settings.max_outdated_parts_loading_thread_pool_size);
/// Initialize global local cache for remote filesystem. /// Initialize global local cache for remote filesystem.
if (config().has("local_cache_for_remote_fs")) if (config().has("local_cache_for_remote_fs"))

View File

@ -22,7 +22,6 @@ namespace DB
M(UInt64, max_io_thread_pool_free_size, 0, "Max free size for IO thread pool.", 0) \ M(UInt64, max_io_thread_pool_free_size, 0, "Max free size for IO thread pool.", 0) \
M(UInt64, io_thread_pool_queue_size, 10000, "Queue size for IO thread pool.", 0) \ M(UInt64, io_thread_pool_queue_size, 10000, "Queue size for IO thread pool.", 0) \
M(UInt64, max_outdated_parts_loading_thread_pool_size, 32, "The maximum number of threads that would be used for loading outdated data parts on startup", 0) \ M(UInt64, max_outdated_parts_loading_thread_pool_size, 32, "The maximum number of threads that would be used for loading outdated data parts on startup", 0) \
M(UInt64, outdated_part_loading_thread_pool_queue_size, 10000, "Queue size for parts loading thread pool.", 0) \
M(UInt64, max_replicated_fetches_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for replicated fetches. Zero means unlimited.", 0) \ M(UInt64, max_replicated_fetches_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for replicated fetches. Zero means unlimited.", 0) \
M(UInt64, max_replicated_sends_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for replicated sends. Zero means unlimited.", 0) \ M(UInt64, max_replicated_sends_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for replicated sends. Zero means unlimited.", 0) \
M(UInt64, max_remote_read_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for read. Zero means unlimited.", 0) \ M(UInt64, max_remote_read_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for read. Zero means unlimited.", 0) \

View File

@ -13,7 +13,7 @@ namespace DB
template <typename Result, typename Callback = std::function<Result()>> template <typename Result, typename Callback = std::function<Result()>>
using ThreadPoolCallbackRunner = std::function<std::future<Result>(Callback &&, Priority)>; using ThreadPoolCallbackRunner = std::function<std::future<Result>(Callback &&, Priority)>;
/// Creates CallbackRunner that runs every callback with 'pool->scheduleOrThrow()'. /// Creates CallbackRunner that runs every callback with 'pool->scheduleOrThrowOnError()'.
template <typename Result, typename Callback = std::function<Result()>> template <typename Result, typename Callback = std::function<Result()>>
ThreadPoolCallbackRunner<Result, Callback> threadPoolCallbackRunner(ThreadPool & pool, const std::string & thread_name) ThreadPoolCallbackRunner<Result, Callback> threadPoolCallbackRunner(ThreadPool & pool, const std::string & thread_name)
{ {
@ -44,7 +44,7 @@ ThreadPoolCallbackRunner<Result, Callback> threadPoolCallbackRunner(ThreadPool &
auto future = task->get_future(); auto future = task->get_future();
my_pool->scheduleOrThrow([my_task = std::move(task)]{ (*my_task)(); }, priority); my_pool->scheduleOrThrowOnError([my_task = std::move(task)]{ (*my_task)(); }, priority);
return future; return future;
}; };