use pool ids instead of priorities in load jobs

This commit is contained in:
serxa 2023-05-31 16:53:10 +00:00
parent e1045f76de
commit 5048050085
10 changed files with 50 additions and 31 deletions

View File

@ -1228,8 +1228,9 @@ try
auto fg_pool_size = server_settings_.async_loader_foreground_pool_size;
auto bg_pool_size = server_settings_.async_loader_background_pool_size;
global_context->getAsyncLoader().setMaxThreads(AsyncLoaderPool::Foreground, fg_pool_size ? fg_pool_size : getNumberOfPhysicalCPUCores());
global_context->getAsyncLoader().setMaxThreads(AsyncLoaderPool::Background, bg_pool_size ? bg_pool_size : getNumberOfPhysicalCPUCores());
global_context->getAsyncLoader().setMaxThreads(AsyncLoaderPoolId::Foreground, fg_pool_size ? fg_pool_size : getNumberOfPhysicalCPUCores());
global_context->getAsyncLoader().setMaxThreads(AsyncLoaderPoolId::BackgroundLoad, bg_pool_size ? bg_pool_size : getNumberOfPhysicalCPUCores());
global_context->getAsyncLoader().setMaxThreads(AsyncLoaderPoolId::BackgroundStartup, bg_pool_size ? bg_pool_size : getNumberOfPhysicalCPUCores());
if (config->has("resources"))
{

View File

@ -1,13 +0,0 @@
#pragma once
namespace DB
{
// Indices of `AsyncLoader` pools
enum AsyncLoaderPool
{
Foreground,
Background,
};
}

View File

@ -0,0 +1,24 @@
#pragma once
namespace DB
{
/// Indices of `AsyncLoader` pools.
/// Note that pools that have different `AsyncLoader` priorities do NOT run jobs simultaneously.
/// (It is possible only for the short transition period after dynamic prioritization due to waiting query).
/// So the following pools cannot be considered independent.
enum AsyncLoaderPoolId
{
/// Used for executing load jobs that are waited for by queries or in case of synchronous table loading.
Foreground,
/// Has lower priority and is used by table load jobs.
BackgroundLoad,
/// Has even lower priority and is used by startup jobs.
/// NOTE: This pool is required to begin table startup only after all tables are loaded.
/// NOTE: Which is needed to prevent heavy merges/mutations from consuming all the resources, slowing table loading down.
BackgroundStartup,
};
}

View File

@ -5,6 +5,7 @@
#include <IO/WriteHelpers.h>
#include <IO/ReadBufferFromFile.h>
#include <Parsers/formatAST.h>
#include <Common/AsyncLoaderPoolId.h>
#include <Common/atomicRename.h>
#include <Common/filesystemHelpers.h>
#include <Storages/StorageMaterializedView.h>
@ -447,7 +448,7 @@ LoadTaskPtr DatabaseAtomic::startupDatabaseAsync(AsyncLoader & async_loader, Loa
std::scoped_lock lock{mutex};
auto job = makeLoadJob(
base->goals(),
DATABASE_STARTUP_PRIORITY,
AsyncLoaderPoolId::BackgroundStartup,
fmt::format("startup Atomic database {}", database_name),
[this, mode] (const LoadJobPtr &)
{

View File

@ -21,6 +21,7 @@
#include <Parsers/queryToString.h>
#include <Common/Stopwatch.h>
#include <Common/ThreadPool.h>
#include <Common/AsyncLoaderPoolId.h>
#include <Common/escapeForFileName.h>
#include <Common/quoteString.h>
#include <Common/typeid_cast.h>
@ -169,7 +170,7 @@ LoadTaskPtr DatabaseOrdinary::loadTableFromMetadataAsync(
std::scoped_lock lock(mutex);
auto job = makeLoadJob(
std::move(load_after),
TABLE_LOAD_PRIORITY,
AsyncLoaderPoolId::BackgroundLoad,
fmt::format("load table {}", name.getFullName()),
[this, local_context, file_path, name, ast, mode] (const LoadJobPtr &)
{
@ -196,7 +197,7 @@ LoadTaskPtr DatabaseOrdinary::startupTableAsync(
auto job = makeLoadJob(
std::move(startup_after),
TABLE_STARTUP_PRIORITY,
AsyncLoaderPoolId::BackgroundStartup,
fmt::format("startup table {}", name.getFullName()),
[this, name] (const LoadJobPtr &)
{
@ -225,7 +226,7 @@ LoadTaskPtr DatabaseOrdinary::startupDatabaseAsync(
// NOTE: this task is empty, but it is required for correct dependency handling (startup should be done after tables loading)
auto job = makeLoadJob(
std::move(startup_after),
DATABASE_STARTUP_PRIORITY,
AsyncLoaderPoolId::BackgroundStartup,
fmt::format("startup Ordinary database {}", database_name));
return makeLoadTask(async_loader, {job});
}

View File

@ -12,6 +12,7 @@
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ZooKeeper/Types.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/AsyncLoaderPoolId.h>
#include <Databases/DatabaseReplicated.h>
#include <Databases/DatabaseReplicatedWorker.h>
#include <Databases/DDLDependencyVisitor.h>
@ -506,7 +507,7 @@ LoadTaskPtr DatabaseReplicated::startupDatabaseAsync(AsyncLoader & async_loader,
std::scoped_lock lock{mutex};
auto job = makeLoadJob(
base->goals(),
DATABASE_STARTUP_PRIORITY,
AsyncLoaderPoolId::BackgroundStartup,
fmt::format("startup Replicated database {}", database_name),
[this] (const LoadJobPtr &)
{

View File

@ -42,11 +42,6 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
static constexpr auto TABLE_LOAD_PRIORITY = -1; /// Initial priority for table loading jobs
static constexpr auto TABLE_STARTUP_PRIORITY = -2; /// Initial priority for table startup jobs
static constexpr auto DATABASE_STARTUP_PRIORITY = -2; /// Initial priority for database startup jobs
static constexpr auto TABLE_WAIT_PRIORITY = 0; /// Prioritize load jobs that block queries
class IDatabaseTablesIterator
{
public:

View File

@ -10,6 +10,7 @@
# include <Parsers/ASTCreateQuery.h>
# include <Storages/StorageMaterializedMySQL.h>
# include <Common/setThreadName.h>
# include <Common/AsyncLoaderPoolId.h>
# include <filesystem>
namespace fs = std::filesystem;
@ -69,7 +70,7 @@ LoadTaskPtr DatabaseMaterializedMySQL::startupDatabaseAsync(AsyncLoader & async_
std::scoped_lock lock{mutex};
auto job = makeLoadJob(
base->goals(),
DATABASE_STARTUP_PRIORITY,
AsyncLoaderPoolId::BackgroundStartup,
fmt::format("startup MaterializedMySQL database {}", database_name),
[this, mode] (const LoadJobPtr &)
{

View File

@ -7,6 +7,7 @@
#include <Common/logger_useful.h>
#include <Common/Macros.h>
#include <Common/AsyncLoaderPoolId.h>
#include <Core/UUID.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeArray.h>
@ -131,7 +132,7 @@ LoadTaskPtr DatabaseMaterializedPostgreSQL::startupDatabaseAsync(AsyncLoader & a
std::scoped_lock lock{mutex};
auto job = makeLoadJob(
base->goals(),
DATABASE_STARTUP_PRIORITY,
AsyncLoaderPoolId::BackgroundStartup,
fmt::format("startup MaterializedMySQL database {}", database_name),
[this] (const LoadJobPtr &)
{

View File

@ -7,7 +7,7 @@
#include <Poco/Net/IPAddress.h>
#include <Poco/Util/Application.h>
#include <Common/AsyncLoader.h>
#include <Common/AsyncLoaderPool.h>
#include <Common/AsyncLoaderPoolId.h>
#include <Common/Macros.h>
#include <Common/escapeForFileName.h>
#include <Common/EventNotifier.h>
@ -2004,20 +2004,27 @@ AsyncLoader & Context::getAsyncLoader() const
size_t bg_max_threads = shared->server_settings.async_loader_background_pool_size;
if (!shared->async_loader)
shared->async_loader = std::make_unique<AsyncLoader>({
// IMPORTANT: Pool declaration order should match the order in `AsyncLoaderPool.h` to get the indices right.
{
// IMPORTANT: Pool declaration order should match the order in `AsyncLoaderPoolId.h` to get the indices right.
{ // AsyncLoaderPoolId::Foreground
"FgLoad",
CurrentMetrics::AsyncLoaderForegroundThreads,
CurrentMetrics::AsyncLoaderForegroundThreadsActive,
.max_threads = fg_max_threads ? fg_max_threads : getNumberOfPhysicalCPUCores(),
.priority{0}
},
{
{ // AsyncLoaderPoolId::BackgroundLoad
"BgLoad",
CurrentMetrics::AsyncLoaderBackgroundThreads,
CurrentMetrics::AsyncLoaderBackgroundThreadsActive,
.max_threads = bg_max_threads ? bg_max_threads : getNumberOfPhysicalCPUCores(),
.priority{1}
},
{ // AsyncLoaderPoolId::BackgroundStartup
"BgStartup",
CurrentMetrics::AsyncLoaderBackgroundThreads,
CurrentMetrics::AsyncLoaderBackgroundThreadsActive,
.max_threads = bg_max_threads ? bg_max_threads : getNumberOfPhysicalCPUCores(),
.priority{2}
}
},
/* log_failures = */ true,