mirror of https://github.com/ClickHouse/ClickHouse.git
add helpers for prioritize+wait calls and fix build
This commit is contained in:
parent
5048050085
commit
303e3f00c9
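This commit introduces pool-aware overloads of waitLoad() plus waitLoadAllIn()/scheduleAndWaitLoadAllIn(): before blocking, they move a task's goal jobs into the requested pool, so a caller that waits also raises the jobs' priority. A minimal usage sketch under assumed caller code (the variable names and the call site are illustrative, not part of this commit; pool ids such as AsyncLoaderPoolId::Foreground are the ones referenced throughout the diff below):

    // Hypothetical caller code, for illustration only.
    LoadTaskPtr startup_task = database->startupDatabaseAsync(async_loader, /* startup_after = */ {}, mode);
    scheduleLoad(startup_task);                             // jobs start in their default (background) pools
    waitLoad(AsyncLoaderPoolId::Foreground, startup_task);  // re-pool the goal jobs to foreground, then block on them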
@@ -144,6 +144,7 @@ LoadJobPtr makeLoadJob(const LoadJobSet & dependencies, size_t pool_id, String n
    return std::make_shared<LoadJob>(dependencies, std::move(name), pool_id, std::forward<Func>(func));
}


// Represents a logically connected set of LoadJobs required to achieve some goals (final LoadJob in the set).
class LoadTask : private boost::noncopyable
{
@@ -168,10 +169,11 @@ public:
    // auto load_task = loadSomethingAsync(async_loader, load_after_task.goals(), something);
    const LoadJobSet & goals() const { return goal_jobs.empty() ? jobs : goal_jobs; }

    AsyncLoader & loader;

private:
    friend class AsyncLoader;

    AsyncLoader & loader;
    LoadJobSet jobs;
    LoadJobSet goal_jobs;
};
@@ -181,91 +183,6 @@ inline LoadTaskPtr makeLoadTask(AsyncLoader & loader, LoadJobSet && jobs, LoadJo
    return std::make_shared<LoadTask>(loader, std::move(jobs), std::move(goals));
}

inline void scheduleLoad(const LoadTaskPtr & task)
{
    task->schedule();
}

inline void scheduleLoad(const LoadTaskPtrs & tasks)
{
    for (const auto & task : tasks)
        task->schedule();
}

template <class... Args>
inline void scheduleLoadAll(Args && ... args)
{
    (scheduleLoad(std::forward<Args>(args)), ...);
}

inline void waitLoad(const LoadJobSet & jobs)
{
    for (const auto & job : jobs)
        job->wait();
}

inline void waitLoad(const LoadTaskPtr & task)
{
    waitLoad(task->goals());
}

inline void waitLoad(const LoadTaskPtrs & tasks)
{
    for (const auto & task : tasks)
        waitLoad(task->goals());
}

template <class... Args>
inline void waitLoadAll(Args && ... args)
{
    (waitLoad(std::forward<Args>(args)), ...);
}

template <class... Args>
inline void scheduleAndWaitLoadAll(Args && ... args)
{
    scheduleLoadAll(std::forward<Args>(args)...);
    waitLoadAll(std::forward<Args>(args)...);
}

inline LoadJobSet getGoals(const LoadTaskPtrs & tasks)
{
    LoadJobSet result;
    for (const auto & task : tasks)
        result.insert(task->goals().begin(), task->goals().end());
    return result;
}

inline LoadJobSet getGoalsOr(const LoadTaskPtrs & tasks, const LoadJobSet & alternative)
{
    LoadJobSet result;
    for (const auto & task : tasks)
        result.insert(task->goals().begin(), task->goals().end());
    return result.empty() ? alternative : result;
}

inline LoadJobSet joinJobs(const LoadJobSet & jobs1, const LoadJobSet & jobs2)
{
    LoadJobSet result;
    if (!jobs1.empty())
        result.insert(jobs1.begin(), jobs1.end());
    if (!jobs2.empty())
        result.insert(jobs2.begin(), jobs2.end());
    return result;
}

inline LoadTaskPtrs joinTasks(const LoadTaskPtrs & tasks1, const LoadTaskPtrs & tasks2)
{
    if (tasks1.empty())
        return tasks2;
    if (tasks2.empty())
        return tasks1;
    LoadTaskPtrs result;
    result.reserve(tasks1.size() + tasks2.size());
    result.insert(result.end(), tasks1.begin(), tasks1.end());
    result.insert(result.end(), tasks2.begin(), tasks2.end());
    return result;
}

// `AsyncLoader` is a scheduler for DAG of `LoadJob`s. It tracks job dependencies and priorities.
// Basic usage example:
@@ -287,7 +204,7 @@ inline LoadTaskPtrs joinTasks(const LoadTaskPtrs & tasks1, const LoadTaskPtrs &
// task.schedule();
//
// // Another thread may prioritize a job by changing its pool and wait for it:
// async_loader->prioritize(job3, /* pool_id = */ 0); // Increase priority: 1 -> 0 (lower is better)
// async_loader.prioritize(job3, /* pool_id = */ 0); // Increase priority: 1 -> 0 (lower is better)
// job3->wait(); // Blocks until job completion or cancellation and rethrow an exception (if any)
//
// Every job has a pool associated with it. AsyncLoader starts every job in its thread pool.
@@ -463,4 +380,123 @@ private:
    std::chrono::system_clock::time_point busy_period_start_time;
};


inline void scheduleLoad(const LoadTaskPtr & task)
{
    task->schedule();
}

inline void scheduleLoad(const LoadTaskPtrs & tasks)
{
    for (const auto & task : tasks)
        task->schedule();
}

template <class... Args>
inline void scheduleLoadAll(Args && ... args)
{
    (scheduleLoad(std::forward<Args>(args)), ...);
}

inline void waitLoad(const LoadJobSet & jobs)
{
    for (const auto & job : jobs)
        job->wait();
}

inline void waitLoad(const LoadTaskPtr & task)
{
    waitLoad(task->goals());
}

inline void waitLoad(const LoadTaskPtrs & tasks)
{
    for (const auto & task : tasks)
        waitLoad(task->goals());
}

inline void waitLoad(AsyncLoader & async_loader, size_t pool_id, const LoadJobSet & jobs)
{
    for (const auto & job : jobs)
        async_loader.prioritize(job, pool_id);
    for (const auto & job : jobs)
        job->wait();
}

inline void waitLoad(size_t pool_id, const LoadTaskPtr & task)
{
    waitLoad(task->loader, pool_id, task->goals());
}

inline void waitLoad(size_t pool_id, const LoadTaskPtrs & tasks)
{
    for (const auto & task : tasks)
        waitLoad(task->loader, pool_id, task->goals());
}

template <class... Args>
inline void waitLoadAll(Args && ... args)
{
    (waitLoad(std::forward<Args>(args)), ...);
}

template <class... Args>
inline void waitLoadAllIn(size_t pool_id, Args && ... args)
{
    (waitLoad(pool_id, std::forward<Args>(args)), ...);
}

template <class... Args>
inline void scheduleAndWaitLoadAll(Args && ... args)
{
    scheduleLoadAll(std::forward<Args>(args)...);
    waitLoadAll(std::forward<Args>(args)...);
}

template <class... Args>
inline void scheduleAndWaitLoadAllIn(size_t pool_id, Args && ... args)
{
    scheduleLoadAll(std::forward<Args>(args)...);
    waitLoadAllIn(pool_id, std::forward<Args>(args)...);
}

inline LoadJobSet getGoals(const LoadTaskPtrs & tasks)
{
    LoadJobSet result;
    for (const auto & task : tasks)
        result.insert(task->goals().begin(), task->goals().end());
    return result;
}

inline LoadJobSet getGoalsOr(const LoadTaskPtrs & tasks, const LoadJobSet & alternative)
{
    LoadJobSet result;
    for (const auto & task : tasks)
        result.insert(task->goals().begin(), task->goals().end());
    return result.empty() ? alternative : result;
}

inline LoadJobSet joinJobs(const LoadJobSet & jobs1, const LoadJobSet & jobs2)
{
    LoadJobSet result;
    if (!jobs1.empty())
        result.insert(jobs1.begin(), jobs1.end());
    if (!jobs2.empty())
        result.insert(jobs2.begin(), jobs2.end());
    return result;
}

inline LoadTaskPtrs joinTasks(const LoadTaskPtrs & tasks1, const LoadTaskPtrs & tasks2)
{
    if (tasks1.empty())
        return tasks2;
    if (tasks2.empty())
        return tasks1;
    LoadTaskPtrs result;
    result.reserve(tasks1.size() + tasks2.size());
    result.insert(result.end(), tasks1.begin(), tasks1.end());
    result.insert(result.end(), tasks2.begin(), tasks2.end());
    return result;
}

}
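The helper block above is the core of the change: the pool-aware waitLoad(pool_id, task) first re-prioritizes every goal job of the task via AsyncLoader::prioritize() and only then waits, and the variadic *AllIn helpers fan that out over any mix of tasks and task lists with a fold expression. A sketch of what a combined call expands to (illustrative only; `load_tasks`, `startup_tasks` and `task` are placeholder names):

    // scheduleAndWaitLoadAllIn(AsyncLoaderPoolId::Foreground, load_tasks, startup_tasks) is equivalent to:
    scheduleLoadAll(load_tasks, startup_tasks);                               // (scheduleLoad(args), ...)
    waitLoadAllIn(AsyncLoaderPoolId::Foreground, load_tasks, startup_tasks);  // (waitLoad(pool_id, args), ...)

    // ... and each pool-aware waitLoad ultimately runs:
    for (const auto & job : task->goals())
        task->loader.prioritize(job, AsyncLoaderPoolId::Foreground);  // raise the priority of goal jobs first
    for (const auto & job : task->goals())
        job->wait();                                                  // then block; wait() rethrows a job's exception, if any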
@@ -231,12 +231,11 @@ LoadTaskPtr DatabaseOrdinary::startupDatabaseAsync(
    return makeLoadTask(async_loader, {job});
}

DatabaseTablesIteratorPtr DatabaseOrdinary::getTablesIterator(ContextPtr context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const
{
    // TODO(serxa): implement
}
// TODO(serxa): implement
// DatabaseTablesIteratorPtr DatabaseOrdinary::getTablesIterator(ContextPtr local_context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const
// }

StoragePtr DatabaseOrdinary::tryGetTable(const String & name, ContextPtr context) const
StoragePtr DatabaseOrdinary::tryGetTable(const String & name, ContextPtr local_context) const
{
    const LoadTaskPtr * startup_task = nullptr;
    {
@@ -245,11 +244,11 @@ StoragePtr DatabaseOrdinary::tryGetTable(const String & name, ContextPtr context
        startup_task = &it->second;
    }

    // TODO(serxa): prioritize always? what priority should be used?
    // Prioritize jobs (load and startup) to be executed in foreground pool and wait for them synchronously
    if (startup_task)
        waitLoad(*startup_task);
        waitLoad(AsyncLoaderPoolId::Foreground, *startup_task);

    return DatabaseOnDisk::tryGetTable(name, context);
    return DatabaseOnDisk::tryGetTable(name, local_context);
}

void DatabaseOrdinary::alterTable(ContextPtr local_context, const StorageID & table_id, const StorageInMemoryMetadata & metadata)
@@ -51,9 +51,10 @@ public:

    LoadTaskPtr startupDatabaseAsync(AsyncLoader & async_loader, LoadJobSet startup_after, LoadingStrictnessLevel mode) override;

    DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const override;
    // TODO(serxa): implement
    // DatabaseTablesIteratorPtr getTablesIterator(ContextPtr local_context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const override;

    StoragePtr tryGetTable(const String & name, ContextPtr context) const override;
    StoragePtr tryGetTable(const String & name, ContextPtr local_context) const override;

    void alterTable(
        ContextPtr context,
@@ -2003,28 +2003,28 @@ AsyncLoader & Context::getAsyncLoader() const
    size_t fg_max_threads = shared->server_settings.async_loader_foreground_pool_size;
    size_t bg_max_threads = shared->server_settings.async_loader_background_pool_size;
    if (!shared->async_loader)
        shared->async_loader = std::make_unique<AsyncLoader>({
        shared->async_loader = std::make_unique<AsyncLoader>(std::vector<AsyncLoader::PoolInitializer>{
            // IMPORTANT: Pool declaration order should match the order in `AsyncLoaderPoolId.h` to get the indices right.
            { // AsyncLoaderPoolId::Foreground
                "FgLoad",
                CurrentMetrics::AsyncLoaderForegroundThreads,
                CurrentMetrics::AsyncLoaderForegroundThreadsActive,
                .max_threads = fg_max_threads ? fg_max_threads : getNumberOfPhysicalCPUCores(),
                .priority{0}
                fg_max_threads ? fg_max_threads : getNumberOfPhysicalCPUCores(),
                Priority{0}
            },
            { // AsyncLoaderPoolId::BackgroundLoad
                "BgLoad",
                CurrentMetrics::AsyncLoaderBackgroundThreads,
                CurrentMetrics::AsyncLoaderBackgroundThreadsActive,
                .max_threads = bg_max_threads ? bg_max_threads : getNumberOfPhysicalCPUCores(),
                .priority{1}
                bg_max_threads ? bg_max_threads : getNumberOfPhysicalCPUCores(),
                Priority{1}
            },
            { // AsyncLoaderPoolId::BackgroundStartup
                "BgStartup",
                CurrentMetrics::AsyncLoaderBackgroundThreads,
                CurrentMetrics::AsyncLoaderBackgroundThreadsActive,
                .max_threads = bg_max_threads ? bg_max_threads : getNumberOfPhysicalCPUCores(),
                .priority{2}
                bg_max_threads ? bg_max_threads : getNumberOfPhysicalCPUCores(),
                Priority{2}
            }
        },
        /* log_failures = */ true,
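The pool construction above also carries the "fix build" part of this commit: the braced pool list now spells out std::vector<AsyncLoader::PoolInitializer> and initializes every field positionally with an explicit Priority{N}, instead of mixing positional values with designated initializers (.max_threads, .priority), a mix that standard C++ does not allow in one braced list. A reduced before/after sketch, assuming PoolInitializer is an aggregate of {name, metric, active metric, max_threads, priority}:

    // Before (rejected): positional and designated initializers mixed in one braced list.
    //     { "FgLoad", CurrentMetrics::AsyncLoaderForegroundThreads, CurrentMetrics::AsyncLoaderForegroundThreadsActive,
    //       .max_threads = fg_max_threads, .priority{0} }
    // After (accepted): every member given positionally, with the priority wrapped in its Priority type.
    //     { "FgLoad", CurrentMetrics::AsyncLoaderForegroundThreads, CurrentMetrics::AsyncLoaderForegroundThreadsActive,
    //       fg_max_threads, Priority{0} }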
@@ -9,6 +9,7 @@
#include <Common/Macros.h>
#include <Common/randomSeed.h>
#include <Common/atomicRename.h>
#include <Common/AsyncLoaderPoolId.h>
#include <base/hex.h>

#include <Core/Defines.h>
@@ -323,8 +324,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
        {
            /// We use global context here, because storages lifetime is bigger than query context lifetime
            TablesLoader loader{getContext()->getGlobalContext(), {{database_name, database}}, mode};
            scheduleAndWaitLoad(loader.loadTablesAsync());
            scheduleAndWaitLoad(loader.startupTablesAsync());
            scheduleAndWaitLoadAllIn(AsyncLoaderPoolId::Foreground, loader.loadTablesAsync(), loader.startupTablesAsync());
        }
    }
    catch (...)
@@ -1,4 +1,5 @@
#include <Common/ThreadPool.h>
#include <Common/AsyncLoaderPoolId.h>

#include <Parsers/ParserCreateQuery.h>
#include <Parsers/ASTCreateQuery.h>
@@ -240,20 +241,20 @@ LoadTaskPtrs loadMetadata(ContextMutablePtr context, const String & default_data
    if (!async_load_databases)
    {
        // First, load all tables
        scheduleAndWaitLoad(load_tasks);
        scheduleAndWaitLoadAllIn(AsyncLoaderPoolId::Foreground, load_tasks);

        // Then, startup all tables. This is done to postpone merges and mutations
        // Note that with async loader it would be a total barrier, which is unacceptable for the purpose of waiting.
        scheduleAndWaitLoad(startup_tasks);
        scheduleAndWaitLoadAllIn(AsyncLoaderPoolId::Foreground, startup_tasks);
        return {};
    }
    else
    {
        // Schedule all the jobs.
        // Note that to achieve behaviour similar to synchronous case (postponing of merges) we use priorities.
        // All startup jobs have lower priorities than load jobs.
        // So _almost_ all tables will finish loading before the first table startup it there are no queries.
        // Query waiting for a table boost its priority to finish table startup faster than load of the other tables.
        // All startup jobs are assigned to pool with lower priority than load jobs pool.
        // So all tables will finish loading before the first table startup if there are no queries.
        // Query waiting for a table boosts its priority by moving jobs into `AsyncLoaderPoolId::Foreground` pool
        // to finish table startup faster than load of the other tables.
        scheduleLoadAll(load_tasks, startup_tasks);

        // Do NOT wait, just return tasks for continuation or later wait.
@@ -389,7 +390,7 @@ static void maybeConvertOrdinaryDatabaseToAtomic(ContextMutablePtr context, cons
    if (startup_tasks) // NOTE: only for system database
    {
        /// It's not quite correct to run DDL queries while database is not started up.
        scheduleAndWaitLoad(*startup_tasks);
        scheduleAndWaitLoadAllIn(AsyncLoaderPoolId::Foreground, *startup_tasks);
        startup_tasks->clear();
    }

@@ -440,13 +441,13 @@ static void maybeConvertOrdinaryDatabaseToAtomic(ContextMutablePtr context, cons
            {database_name, DatabaseCatalog::instance().getDatabase(database_name)},
        };
        TablesLoader loader{context, databases, LoadingStrictnessLevel::FORCE_RESTORE};
        scheduleAndWaitLoad(loader.loadTablesAsync());
        scheduleAndWaitLoadAllIn(AsyncLoaderPoolId::Foreground, loader.loadTablesAsync());

        /// Startup tables if they were started before conversion and detach/attach
        if (startup_tasks) // NOTE: only for system database
            *startup_tasks = loader.startupTablesAsync(); // We have loaded old database(s), replace tasks to startup new database
        else
            scheduleAndWaitLoad(loader.startupTablesAsync()); // An old database was already loaded, so we should load new one as well
            scheduleAndWaitLoadAllIn(AsyncLoaderPoolId::Foreground, loader.startupTablesAsync()); // An old database was already loaded, so we should load new one as well
    }
    catch (Exception & e)
    {
@@ -477,7 +478,7 @@ void convertDatabasesEnginesIfNeed(const LoadTaskPtrs & load_metadata, ContextMu
        "will try to convert all Ordinary databases to Atomic");

    // Wait for all table to be loaded and started
    waitLoad(load_metadata);
    waitLoadAllIn(AsyncLoaderPoolId::Foreground, load_metadata);

    for (const auto & [name, _] : DatabaseCatalog::instance().getDatabases())
        if (name != DatabaseCatalog::SYSTEM_DATABASE)
@@ -500,7 +501,7 @@ LoadTaskPtrs loadMetadataSystem(ContextMutablePtr context)
        {DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE, DatabaseCatalog::instance().getDatabase(DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE)},
    };
    TablesLoader loader{context, databases, LoadingStrictnessLevel::FORCE_RESTORE};
    scheduleAndWaitLoad(loader.loadTablesAsync());
    scheduleAndWaitLoadAllIn(AsyncLoaderPoolId::Foreground, loader.loadTablesAsync());

    /// Will startup tables in system database after all databases are loaded.
    return loader.startupTablesAsync();
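In the asynchronous path above, loadMetadata() only schedules the load and startup tasks and returns them; any later code that actually needs the tables waits through the foreground pool so the relevant jobs overtake the background queue, as convertDatabasesEnginesIfNeed() does in this diff. A hedged sketch of that continuation (caller-side illustration; `load_metadata` stands for the LoadTaskPtrs returned by loadMetadata()):

    // Later consumer of the returned tasks: pull every outstanding goal job into the
    // foreground pool and block until the whole set is loaded and started.
    waitLoadAllIn(AsyncLoaderPoolId::Foreground, load_metadata);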