rework AsyncLoaderPoolId.h to match settings and metrics

This commit is contained in:
serxa 2023-11-06 13:08:27 +00:00
parent 6babc8e934
commit d50747847d
13 changed files with 80 additions and 72 deletions

View File

@ -23,7 +23,7 @@
#include <Common/scope_guard_safe.h>
#include <Interpreters/Session.h>
#include <Access/AccessControl.h>
#include <Common/AsyncLoaderPoolId.h>
#include <Common/PoolId.h>
#include <Common/Exception.h>
#include <Common/Macros.h>
#include <Common/Config/ConfigProcessor.h>
@ -758,7 +758,7 @@ void LocalServer::processConfig()
attachSystemTablesLocal(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::SYSTEM_DATABASE));
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA));
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE));
waitLoad(AsyncLoaderPoolId::Foreground, startup_system_tasks);
waitLoad(TablesLoaderForegroundPoolId, startup_system_tasks);
if (!config().has("only-system-tables"))
{

View File

@ -20,7 +20,7 @@
#include <base/coverage.h>
#include <base/getFQDNOrHostName.h>
#include <base/safeExit.h>
#include <Common/AsyncLoaderPoolId.h>
#include <Common/PoolId.h>
#include <Common/MemoryTracker.h>
#include <Common/ClickHouseRevision.h>
#include <Common/DNSResolver.h>
@ -1319,9 +1319,9 @@ try
global_context->getMessageBrokerSchedulePool().increaseThreadsCount(server_settings_.background_message_broker_schedule_pool_size);
global_context->getDistributedSchedulePool().increaseThreadsCount(server_settings_.background_distributed_schedule_pool_size);
global_context->getAsyncLoader().setMaxThreads(AsyncLoaderPoolId::Foreground, server_settings_.tables_loader_foreground_pool_size);
global_context->getAsyncLoader().setMaxThreads(AsyncLoaderPoolId::BackgroundLoad, server_settings_.tables_loader_background_pool_size);
global_context->getAsyncLoader().setMaxThreads(AsyncLoaderPoolId::BackgroundStartup, server_settings_.tables_loader_background_pool_size);
global_context->getAsyncLoader().setMaxThreads(TablesLoaderForegroundPoolId, server_settings_.tables_loader_foreground_pool_size);
global_context->getAsyncLoader().setMaxThreads(TablesLoaderBackgroundLoadPoolId, server_settings_.tables_loader_background_pool_size);
global_context->getAsyncLoader().setMaxThreads(TablesLoaderBackgroundStartupPoolId, server_settings_.tables_loader_background_pool_size);
getIOThreadPool().reloadConfiguration(
server_settings.max_io_thread_pool_size,
@ -1668,7 +1668,7 @@ try
/// This has to be done before the initialization of system logs,
/// otherwise there is a race condition between the system database initialization
/// and creation of new tables in the database.
waitLoad(AsyncLoaderPoolId::Foreground, system_startup_tasks);
waitLoad(TablesLoaderForegroundPoolId, system_startup_tasks);
/// After attaching system databases we can initialize system log.
global_context->initializeSystemLogs();
global_context->setSystemZooKeeperLogAfterInitializationIfNeeded();

View File

@ -1,24 +0,0 @@
#pragma once
namespace DB
{
/// Indices of `AsyncLoader` pools.
/// Note that pools that have different `AsyncLoader` priorities do NOT run jobs simultaneously.
/// (It is possible only for the short transition period after dynamic prioritization due to waiting query).
/// So the following pools cannot be considered independent.
enum AsyncLoaderPoolId
{
/// Used for executing load jobs that are waited for by queries or in case of synchronous table loading.
Foreground,
/// Has lower priority and is used by table load jobs.
BackgroundLoad,
/// Has even lower priority and is used by startup jobs.
/// NOTE: This pool is required to begin table startup only after all tables are loaded.
/// NOTE: Which is needed to prevent heavy merges/mutations from consuming all the resources, slowing table loading down.
BackgroundStartup,
};
}

32
src/Common/PoolId.h Normal file
View File

@ -0,0 +1,32 @@
#pragma once
#include <Common/Priority.h>
namespace DB
{
/// Indices and priorities of `AsyncLoader` pools.
/// The most important difference from regular ThreadPools is priorities of pools:
/// * Pools that have different priorities do NOT run jobs simultaneously (with small exception due to dynamic prioritization).
/// * Pools with lower priority wait for all jobs in higher priority pools to be done.
/// Note that pools also have different configurable sizes not listed here.
/// WARNING: `*PoolId` values must be unique and sequential w/o gaps.
/// Used for executing load jobs that are waited for by queries or in case of synchronous table loading.
constexpr size_t TablesLoaderForegroundPoolId = 0;
constexpr Priority TablesLoaderForegroundPriority{0};
/// Has lower priority and is used by table load jobs.
constexpr size_t TablesLoaderBackgroundLoadPoolId = 1;
constexpr Priority TablesLoaderBackgroundLoadPriority{1};
/// Has even lower priority and is used by startup jobs.
/// NOTE: This pool is required to begin table startup only after all tables are loaded.
/// NOTE: Which is needed to prevent heavy merges/mutations from consuming all the resources, slowing table loading down.
constexpr size_t TablesLoaderBackgroundStartupPoolId = 2;
constexpr Priority TablesLoaderBackgroundStartupPriority{2};
}

View File

@ -5,7 +5,7 @@
#include <IO/WriteHelpers.h>
#include <IO/ReadBufferFromFile.h>
#include <Parsers/formatAST.h>
#include <Common/AsyncLoaderPoolId.h>
#include <Common/PoolId.h>
#include <Common/atomicRename.h>
#include <Common/filesystemHelpers.h>
#include <Storages/StorageMaterializedView.h>
@ -451,7 +451,7 @@ LoadTaskPtr DatabaseAtomic::startupDatabaseAsync(AsyncLoader & async_loader, Loa
auto base = DatabaseOrdinary::startupDatabaseAsync(async_loader, std::move(startup_after), mode);
auto job = makeLoadJob(
base->goals(),
AsyncLoaderPoolId::BackgroundStartup,
TablesLoaderBackgroundStartupPoolId,
fmt::format("startup Atomic database {}", getDatabaseName()),
[this, mode] (AsyncLoader &, const LoadJobPtr &)
{
@ -473,7 +473,7 @@ LoadTaskPtr DatabaseAtomic::startupDatabaseAsync(AsyncLoader & async_loader, Loa
void DatabaseAtomic::waitDatabaseStarted() const
{
if (startup_atomic_database_task)
waitLoad(currentPoolOr(AsyncLoaderPoolId::Foreground), startup_atomic_database_task);
waitLoad(currentPoolOr(TablesLoaderForegroundPoolId), startup_atomic_database_task);
}
void DatabaseAtomic::tryCreateSymlink(const String & table_name, const String & actual_data_path, bool if_data_path_exist)

View File

@ -22,7 +22,7 @@
#include <Parsers/queryToString.h>
#include <Common/Stopwatch.h>
#include <Common/ThreadPool.h>
#include <Common/AsyncLoaderPoolId.h>
#include <Common/PoolId.h>
#include <Common/escapeForFileName.h>
#include <Common/quoteString.h>
#include <Common/typeid_cast.h>
@ -171,7 +171,7 @@ LoadTaskPtr DatabaseOrdinary::loadTableFromMetadataAsync(
std::scoped_lock lock(mutex);
auto job = makeLoadJob(
std::move(load_after),
AsyncLoaderPoolId::BackgroundLoad,
TablesLoaderBackgroundLoadPoolId,
fmt::format("load table {}", name.getFullName()),
[this, local_context, file_path, name, ast, mode] (AsyncLoader &, const LoadJobPtr &)
{
@ -198,7 +198,7 @@ LoadTaskPtr DatabaseOrdinary::startupTableAsync(
auto job = makeLoadJob(
std::move(startup_after),
AsyncLoaderPoolId::BackgroundStartup,
TablesLoaderBackgroundStartupPoolId,
fmt::format("startup table {}", name.getFullName()),
[this, name] (AsyncLoader &, const LoadJobPtr &)
{
@ -226,7 +226,7 @@ LoadTaskPtr DatabaseOrdinary::startupDatabaseAsync(
// NOTE: this task is empty, but it is required for correct dependency handling (startup should be done after tables loading)
auto job = makeLoadJob(
std::move(startup_after),
AsyncLoaderPoolId::BackgroundStartup,
TablesLoaderBackgroundStartupPoolId,
fmt::format("startup Ordinary database {}", getDatabaseName()));
return startup_database_task = makeLoadTask(async_loader, {job});
}
@ -242,14 +242,14 @@ void DatabaseOrdinary::waitTableStarted(const String & name) const
}
if (task)
waitLoad(currentPoolOr(AsyncLoaderPoolId::Foreground), task);
waitLoad(currentPoolOr(TablesLoaderForegroundPoolId), task);
}
void DatabaseOrdinary::waitDatabaseStarted() const
{
/// Prioritize load and startup of all tables and database itself and wait for them synchronously
if (startup_database_task)
waitLoad(currentPoolOr(AsyncLoaderPoolId::Foreground), startup_database_task);
waitLoad(currentPoolOr(TablesLoaderForegroundPoolId), startup_database_task);
}
DatabaseTablesIteratorPtr DatabaseOrdinary::getTablesIterator(ContextPtr local_context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const

View File

@ -12,7 +12,7 @@
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ZooKeeper/Types.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/AsyncLoaderPoolId.h>
#include <Common/PoolId.h>
#include <Databases/DatabaseReplicated.h>
#include <Databases/DatabaseReplicatedWorker.h>
#include <Databases/DDLDependencyVisitor.h>
@ -543,7 +543,7 @@ LoadTaskPtr DatabaseReplicated::startupDatabaseAsync(AsyncLoader & async_loader,
auto base = DatabaseAtomic::startupDatabaseAsync(async_loader, std::move(startup_after), mode);
auto job = makeLoadJob(
base->goals(),
AsyncLoaderPoolId::BackgroundStartup,
TablesLoaderBackgroundStartupPoolId,
fmt::format("startup Replicated database {}", getDatabaseName()),
[this] (AsyncLoader &, const LoadJobPtr &)
{
@ -574,7 +574,7 @@ LoadTaskPtr DatabaseReplicated::startupDatabaseAsync(AsyncLoader & async_loader,
void DatabaseReplicated::waitDatabaseStarted() const
{
if (startup_replicated_database_task)
waitLoad(currentPoolOr(AsyncLoaderPoolId::Foreground), startup_replicated_database_task);
waitLoad(currentPoolOr(TablesLoaderForegroundPoolId), startup_replicated_database_task);
}
bool DatabaseReplicated::checkDigestValid(const ContextPtr & local_context, bool debug_check /* = true */) const

View File

@ -8,7 +8,7 @@
#include <base/types.h>
#include <Common/Exception.h>
#include <Common/AsyncLoader.h>
#include <Common/AsyncLoaderPoolId.h>
#include <Common/PoolId.h>
#include <Common/ThreadPool_fwd.h>
#include <QueryPipeline/BlockIO.h>
@ -112,7 +112,7 @@ public:
const StoragePtr & table() const override
{
if (auto task = tasks.find(it->first); task != tasks.end())
waitLoad(currentPoolOr(AsyncLoaderPoolId::Foreground), task->second);
waitLoad(currentPoolOr(TablesLoaderForegroundPoolId), task->second);
return it->second;
}

View File

@ -10,7 +10,7 @@
# include <Parsers/ASTCreateQuery.h>
# include <Storages/StorageMaterializedMySQL.h>
# include <Common/setThreadName.h>
# include <Common/AsyncLoaderPoolId.h>
# include <Common/PoolId.h>
# include <filesystem>
namespace fs = std::filesystem;
@ -69,7 +69,7 @@ LoadTaskPtr DatabaseMaterializedMySQL::startupDatabaseAsync(AsyncLoader & async_
auto base = DatabaseAtomic::startupDatabaseAsync(async_loader, std::move(startup_after), mode);
auto job = makeLoadJob(
base->goals(),
AsyncLoaderPoolId::BackgroundStartup,
TablesLoaderBackgroundStartupPoolId,
fmt::format("startup MaterializedMySQL database {}", getDatabaseName()),
[this, mode] (AsyncLoader &, const LoadJobPtr &)
{
@ -86,7 +86,7 @@ LoadTaskPtr DatabaseMaterializedMySQL::startupDatabaseAsync(AsyncLoader & async_
void DatabaseMaterializedMySQL::waitDatabaseStarted() const
{
if (startup_mysql_database_task)
waitLoad(currentPoolOr(AsyncLoaderPoolId::Foreground), startup_mysql_database_task);
waitLoad(currentPoolOr(TablesLoaderForegroundPoolId), startup_mysql_database_task);
}
void DatabaseMaterializedMySQL::createTable(ContextPtr context_, const String & name, const StoragePtr & table, const ASTPtr & query)

View File

@ -7,7 +7,7 @@
#include <Common/logger_useful.h>
#include <Common/Macros.h>
#include <Common/AsyncLoaderPoolId.h>
#include <Common/PoolId.h>
#include <Core/UUID.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeArray.h>
@ -144,7 +144,7 @@ LoadTaskPtr DatabaseMaterializedPostgreSQL::startupDatabaseAsync(AsyncLoader & a
auto base = DatabaseAtomic::startupDatabaseAsync(async_loader, std::move(startup_after), mode);
auto job = makeLoadJob(
base->goals(),
AsyncLoaderPoolId::BackgroundStartup,
TablesLoaderBackgroundStartupPoolId,
fmt::format("startup MaterializedMySQL database {}", getDatabaseName()),
[this] (AsyncLoader &, const LoadJobPtr &)
{
@ -156,7 +156,7 @@ LoadTaskPtr DatabaseMaterializedPostgreSQL::startupDatabaseAsync(AsyncLoader & a
void DatabaseMaterializedPostgreSQL::waitDatabaseStarted() const
{
if (startup_postgresql_database_task)
waitLoad(currentPoolOr(AsyncLoaderPoolId::Foreground), startup_postgresql_database_task);
waitLoad(currentPoolOr(TablesLoaderForegroundPoolId), startup_postgresql_database_task);
}
void DatabaseMaterializedPostgreSQL::applySettingsChanges(const SettingsChanges & settings_changes, ContextPtr query_context)

View File

@ -6,7 +6,7 @@
#include <Poco/UUID.h>
#include <Poco/Util/Application.h>
#include <Common/AsyncLoader.h>
#include <Common/AsyncLoaderPoolId.h>
#include <Common/PoolId.h>
#include <Common/Macros.h>
#include <Common/EventNotifier.h>
#include <Common/Stopwatch.h>
@ -2251,27 +2251,27 @@ AsyncLoader & Context::getAsyncLoader() const
{
callOnce(shared->async_loader_initialized, [&] {
shared->async_loader = std::make_unique<AsyncLoader>(std::vector<AsyncLoader::PoolInitializer>{
// IMPORTANT: Pool declaration order should match the order in `AsyncLoaderPoolId.h` to get the indices right.
{ // AsyncLoaderPoolId::Foreground
// IMPORTANT: Pool declaration order should match the order in `PoolId.h` to get the indices right.
{ // TablesLoaderForegroundPoolId
"FgLoad",
CurrentMetrics::TablesLoaderForegroundThreads,
CurrentMetrics::TablesLoaderForegroundThreadsActive,
shared->server_settings.tables_loader_foreground_pool_size,
Priority{0}
TablesLoaderForegroundPriority
},
{ // AsyncLoaderPoolId::BackgroundLoad
{ // TablesLoaderBackgroundLoadPoolId
"BgLoad",
CurrentMetrics::TablesLoaderBackgroundThreads,
CurrentMetrics::TablesLoaderBackgroundThreadsActive,
shared->server_settings.tables_loader_background_pool_size,
Priority{1}
TablesLoaderBackgroundLoadPriority
},
{ // AsyncLoaderPoolId::BackgroundStartup
{ // TablesLoaderBackgroundStartupPoolId
"BgStartup",
CurrentMetrics::TablesLoaderBackgroundThreads,
CurrentMetrics::TablesLoaderBackgroundThreadsActive,
shared->server_settings.tables_loader_background_pool_size,
Priority{2}
TablesLoaderBackgroundStartupPriority
}
},
/* log_failures = */ true,
@ -2934,7 +2934,7 @@ void Context::setDDLWorker(std::unique_ptr<DDLWorker> ddl_worker, const LoadTask
auto job = makeLoadJob(
getGoals(startup_after),
AsyncLoaderPoolId::BackgroundStartup,
TablesLoaderBackgroundStartupPoolId,
"startup ddl worker",
[this] (AsyncLoader &, const LoadJobPtr &)
{

View File

@ -9,7 +9,7 @@
#include <Common/Macros.h>
#include <Common/randomSeed.h>
#include <Common/atomicRename.h>
#include <Common/AsyncLoaderPoolId.h>
#include <Common/PoolId.h>
#include <Common/logger_useful.h>
#include <base/hex.h>
@ -339,9 +339,9 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
else
{
/// First prioritize, schedule and wait all the load table tasks
waitLoad(currentPoolOr(AsyncLoaderPoolId::Foreground), load_tasks);
waitLoad(currentPoolOr(TablesLoaderForegroundPoolId), load_tasks);
/// Only then prioritize, schedule and wait all the startup tasks
waitLoad(currentPoolOr(AsyncLoaderPoolId::Foreground), startup_tasks);
waitLoad(currentPoolOr(TablesLoaderForegroundPoolId), startup_tasks);
}
}
}

View File

@ -1,5 +1,5 @@
#include <Common/ThreadPool.h>
#include <Common/AsyncLoaderPoolId.h>
#include <Common/PoolId.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/ASTCreateQuery.h>
@ -246,7 +246,7 @@ LoadTaskPtrs loadMetadata(ContextMutablePtr context, const String & default_data
// Note that to achieve behaviour similar to synchronous case (postponing of merges) we use priorities.
// All startup jobs are assigned to pool with lower priority than load jobs pool.
// So all tables will finish loading before the first table startup if there are no queries (or dependencies).
// Query waiting for a table boosts its priority by moving jobs into `AsyncLoaderPoolId::Foreground` pool
// Query waiting for a table boosts its priority by moving jobs into `TablesLoaderForegroundPoolId` pool
// to finish table startup faster than load of the other tables.
scheduleLoad(load_tasks);
scheduleLoad(startup_tasks);
@ -259,8 +259,8 @@ LoadTaskPtrs loadMetadata(ContextMutablePtr context, const String & default_data
LOG_INFO(log, "Start synchronous loading of databases");
// Note that wait implicitly calls schedule
waitLoad(AsyncLoaderPoolId::Foreground, load_tasks); // First prioritize, schedule and wait all the load table tasks
waitLoad(AsyncLoaderPoolId::Foreground, startup_tasks); // Only then prioritize, schedule and wait all the startup tasks
waitLoad(TablesLoaderForegroundPoolId, load_tasks); // First prioritize, schedule and wait all the load table tasks
waitLoad(TablesLoaderForegroundPoolId, startup_tasks); // Only then prioritize, schedule and wait all the startup tasks
return {};
}
}
@ -396,7 +396,7 @@ static void maybeConvertOrdinaryDatabaseToAtomic(ContextMutablePtr context, cons
if (startup_tasks) // NOTE: only for system database
{
/// It's not quite correct to run DDL queries while database is not started up.
waitLoad(AsyncLoaderPoolId::Foreground, *startup_tasks);
waitLoad(TablesLoaderForegroundPoolId, *startup_tasks);
startup_tasks->clear();
}
@ -447,14 +447,14 @@ static void maybeConvertOrdinaryDatabaseToAtomic(ContextMutablePtr context, cons
{database_name, DatabaseCatalog::instance().getDatabase(database_name)},
};
TablesLoader loader{context, databases, LoadingStrictnessLevel::FORCE_RESTORE};
waitLoad(AsyncLoaderPoolId::Foreground, loader.loadTablesAsync());
waitLoad(TablesLoaderForegroundPoolId, loader.loadTablesAsync());
/// Startup tables if they were started before conversion and detach/attach
if (startup_tasks) // NOTE: only for system database
*startup_tasks = loader.startupTablesAsync(); // We have loaded old database(s), replace tasks to startup new database
else
// An old database was already loaded, so we should load new one as well
waitLoad(AsyncLoaderPoolId::Foreground, loader.startupTablesAsync());
waitLoad(TablesLoaderForegroundPoolId, loader.startupTablesAsync());
}
catch (Exception & e)
{
@ -485,7 +485,7 @@ void convertDatabasesEnginesIfNeed(const LoadTaskPtrs & load_metadata, ContextMu
"will try to convert all Ordinary databases to Atomic");
// Wait for all table to be loaded and started
waitLoad(AsyncLoaderPoolId::Foreground, load_metadata);
waitLoad(TablesLoaderForegroundPoolId, load_metadata);
for (const auto & [name, _] : DatabaseCatalog::instance().getDatabases())
if (name != DatabaseCatalog::SYSTEM_DATABASE)
@ -509,7 +509,7 @@ LoadTaskPtrs loadMetadataSystem(ContextMutablePtr context)
};
TablesLoader loader{context, databases, LoadingStrictnessLevel::FORCE_RESTORE};
auto tasks = loader.loadTablesAsync();
waitLoad(AsyncLoaderPoolId::Foreground, tasks);
waitLoad(TablesLoaderForegroundPoolId, tasks);
/// Will startup tables in system database after all databases are loaded.
return loader.startupTablesAsync();