Context locks small fixes

This commit is contained in:
Maksim Kita 2023-10-08 19:55:45 +03:00
parent 32a77ca1eb
commit f0643a2311

View File

@ -274,9 +274,13 @@ struct ContextSharedPart : boost::noncopyable
ConfigurationPtr users_config; /// Config with the users, profiles and quotas sections.
InterserverIOHandler interserver_io_handler; /// Handler for interserver communication.
OnceFlag buffer_flush_schedule_pool_initializer;
mutable std::unique_ptr<BackgroundSchedulePool> buffer_flush_schedule_pool; /// A thread pool that can do background flush for Buffer tables.
OnceFlag schedule_pool_initializer;
mutable std::unique_ptr<BackgroundSchedulePool> schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables)
OnceFlag distributed_schedule_pool_initializer;
mutable std::unique_ptr<BackgroundSchedulePool> distributed_schedule_pool; /// A thread pool that can run different jobs in background (used for distributed sends)
OnceFlag message_broker_schedule_pool_initializer;
mutable std::unique_ptr<BackgroundSchedulePool> message_broker_schedule_pool; /// A thread pool that can run different jobs in background (used for message brokers, like RabbitMQ and Kafka)
mutable OnceFlag readers_initializer;
@ -2068,10 +2072,8 @@ void Context::setCurrentDatabaseWithLock(const String & name, const std::unique_
void Context::setCurrentDatabase(const String & name)
{
DatabaseCatalog::instance().assertDatabaseExists(name);
auto lock = getLocalLock();
current_database = name;
need_recalculate_access = true;
setCurrentDatabaseWithLock(name, lock);
}
void Context::setCurrentQueryId(const String & query_id)
@ -2355,7 +2357,6 @@ void Context::loadOrReloadUserDefinedExecutableFunctions(const Poco::Util::Abstr
const IUserDefinedSQLObjectsLoader & Context::getUserDefinedSQLObjectsLoader() const
{
callOnce(shared->user_defined_sql_objects_loader_initializer, [&] {
if (!shared->user_defined_sql_objects_loader)
shared->user_defined_sql_objects_loader = createUserDefinedSQLObjectsLoader(getGlobalContext());
});
@ -2366,7 +2367,6 @@ const IUserDefinedSQLObjectsLoader & Context::getUserDefinedSQLObjectsLoader() c
IUserDefinedSQLObjectsLoader & Context::getUserDefinedSQLObjectsLoader()
{
callOnce(shared->user_defined_sql_objects_loader_initializer, [&] {
if (!shared->user_defined_sql_objects_loader)
shared->user_defined_sql_objects_loader = createUserDefinedSQLObjectsLoader(getGlobalContext());
});
@ -2729,15 +2729,13 @@ size_t Context::getPrefetchThreadpoolSize() const
BackgroundSchedulePool & Context::getBufferFlushSchedulePool() const
{
auto lock = getLocalLock();
if (!shared->buffer_flush_schedule_pool)
{
callOnce(shared->buffer_flush_schedule_pool_initializer, [&] {
shared->buffer_flush_schedule_pool = std::make_unique<BackgroundSchedulePool>(
shared->server_settings.background_buffer_flush_schedule_pool_size,
CurrentMetrics::BackgroundBufferFlushSchedulePoolTask,
CurrentMetrics::BackgroundBufferFlushSchedulePoolSize,
"BgBufSchPool");
}
});
return *shared->buffer_flush_schedule_pool;
}
@ -2775,45 +2773,39 @@ BackgroundTaskSchedulingSettings Context::getBackgroundMoveTaskSchedulingSetting
BackgroundSchedulePool & Context::getSchedulePool() const
{
auto lock = getLocalLock();
if (!shared->schedule_pool)
{
callOnce(shared->schedule_pool_initializer, [&] {
shared->schedule_pool = std::make_unique<BackgroundSchedulePool>(
shared->server_settings.background_schedule_pool_size,
CurrentMetrics::BackgroundSchedulePoolTask,
CurrentMetrics::BackgroundSchedulePoolSize,
"BgSchPool");
}
});
return *shared->schedule_pool;
}
BackgroundSchedulePool & Context::getDistributedSchedulePool() const
{
auto lock = getLocalLock();
if (!shared->distributed_schedule_pool)
{
callOnce(shared->distributed_schedule_pool_initializer, [&] {
shared->distributed_schedule_pool = std::make_unique<BackgroundSchedulePool>(
shared->server_settings.background_distributed_schedule_pool_size,
CurrentMetrics::BackgroundDistributedSchedulePoolTask,
CurrentMetrics::BackgroundDistributedSchedulePoolSize,
"BgDistSchPool");
}
});
return *shared->distributed_schedule_pool;
}
BackgroundSchedulePool & Context::getMessageBrokerSchedulePool() const
{
auto lock = getLocalLock();
if (!shared->message_broker_schedule_pool)
{
callOnce(shared->message_broker_schedule_pool_initializer, [&] {
shared->message_broker_schedule_pool = std::make_unique<BackgroundSchedulePool>(
shared->server_settings.background_message_broker_schedule_pool_size,
CurrentMetrics::BackgroundMessageBrokerSchedulePoolTask,
CurrentMetrics::BackgroundMessageBrokerSchedulePoolSize,
"BgMBSchPool");
}
});
return *shared->message_broker_schedule_pool;
}
@ -2900,7 +2892,7 @@ bool Context::hasDistributedDDL() const
void Context::setDDLWorker(std::unique_ptr<DDLWorker> ddl_worker)
{
auto lock = getLocalLock();
auto lock = getGlobalLock();
if (shared->ddl_worker)
throw Exception(ErrorCodes::LOGICAL_ERROR, "DDL background thread has already been initialized");
ddl_worker->startup();
@ -4099,7 +4091,6 @@ const IHostContextPtr & Context::getHostContext() const
std::shared_ptr<ActionLocksManager> Context::getActionLocksManager() const
{
callOnce(shared->action_locks_manager_initializer, [&] {
if (!shared->action_locks_manager)
shared->action_locks_manager = std::make_shared<ActionLocksManager>(shared_from_this());
});