Merge pull request #35081 from azat/fix-shutdown-deadlock

Avoid possible deadlock on server shutdown
This commit is contained in:
tavplubix 2022-03-07 13:47:15 +01:00 committed by GitHub
commit 34505be1de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -179,10 +179,10 @@ struct ContextSharedPart
mutable VolumePtr backups_volume; /// Volume for all the backups.
mutable std::optional<EmbeddedDictionaries> embedded_dictionaries; /// Metrica's dictionaries. Have lazy initialization.
mutable std::optional<ExternalDictionariesLoader> external_dictionaries_loader;
mutable std::optional<ExternalUserDefinedExecutableFunctionsLoader> external_user_defined_executable_functions_loader;
mutable std::optional<ExternalModelsLoader> external_models_loader;
mutable std::unique_ptr<EmbeddedDictionaries> embedded_dictionaries; /// Metrica's dictionaries. Have lazy initialization.
mutable std::unique_ptr<ExternalDictionariesLoader> external_dictionaries_loader;
mutable std::unique_ptr<ExternalUserDefinedExecutableFunctionsLoader> external_user_defined_executable_functions_loader;
mutable std::unique_ptr<ExternalModelsLoader> external_models_loader;
ExternalLoaderXMLConfigRepository * external_models_config_repository = nullptr;
scope_guard models_repository_guard;
@ -214,10 +214,10 @@ struct ContextSharedPart
ConfigurationPtr users_config; /// Config with the users, profiles and quotas sections.
InterserverIOHandler interserver_io_handler; /// Handler for interserver communication.
mutable std::optional<BackgroundSchedulePool> buffer_flush_schedule_pool; /// A thread pool that can do background flush for Buffer tables.
mutable std::optional<BackgroundSchedulePool> schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables)
mutable std::optional<BackgroundSchedulePool> distributed_schedule_pool; /// A thread pool that can run different jobs in background (used for distributed sends)
mutable std::optional<BackgroundSchedulePool> message_broker_schedule_pool; /// A thread pool that can run different jobs in background (used for message brokers, like RabbitMQ and Kafka)
mutable std::unique_ptr<BackgroundSchedulePool> buffer_flush_schedule_pool; /// A thread pool that can do background flush for Buffer tables.
mutable std::unique_ptr<BackgroundSchedulePool> schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables)
mutable std::unique_ptr<BackgroundSchedulePool> distributed_schedule_pool; /// A thread pool that can run different jobs in background (used for distributed sends)
mutable std::unique_ptr<BackgroundSchedulePool> message_broker_schedule_pool; /// A thread pool that can run different jobs in background (used for message brokers, like RabbitMQ and Kafka)
mutable ThrottlerPtr replicated_fetches_throttler; /// A server-wide throttler for replicated fetches
mutable ThrottlerPtr replicated_sends_throttler; /// A server-wide throttler for replicated sends
@ -344,12 +344,23 @@ struct ContextSharedPart
common_executor->wait();
std::unique_ptr<SystemLogs> delete_system_logs;
std::unique_ptr<EmbeddedDictionaries> delete_embedded_dictionaries;
std::unique_ptr<ExternalDictionariesLoader> delete_external_dictionaries_loader;
std::unique_ptr<ExternalUserDefinedExecutableFunctionsLoader> delete_external_user_defined_executable_functions_loader;
std::unique_ptr<ExternalModelsLoader> delete_external_models_loader;
std::unique_ptr<BackgroundSchedulePool> delete_buffer_flush_schedule_pool;
std::unique_ptr<BackgroundSchedulePool> delete_schedule_pool;
std::unique_ptr<BackgroundSchedulePool> delete_distributed_schedule_pool;
std::unique_ptr<BackgroundSchedulePool> delete_message_broker_schedule_pool;
std::unique_ptr<DDLWorker> delete_ddl_worker;
std::unique_ptr<AccessControl> delete_access_control;
{
auto lock = std::lock_guard(mutex);
/** Compiled expressions stored in cache need to be destroyed before destruction of static objects.
* Because CHJIT instance can be static object.
*/
/** Compiled expressions stored in cache need to be destroyed before destruction of static objects.
* Because CHJIT instance can be static object.
*/
#if USE_EMBEDDED_COMPILER
if (auto * cache = CompiledExpressionCacheFactory::instance().tryGetCache())
cache->reset();
@ -369,19 +380,19 @@ struct ContextSharedPart
/// but at least they can be preserved for storage termination.
dictionaries_xmls.reset();
user_defined_executable_functions_xmls.reset();
models_repository_guard.reset();
delete_system_logs = std::move(system_logs);
embedded_dictionaries.reset();
external_dictionaries_loader.reset();
external_user_defined_executable_functions_loader.reset();
models_repository_guard.reset();
external_models_loader.reset();
buffer_flush_schedule_pool.reset();
schedule_pool.reset();
distributed_schedule_pool.reset();
message_broker_schedule_pool.reset();
ddl_worker.reset();
access_control.reset();
delete_embedded_dictionaries = std::move(embedded_dictionaries);
delete_external_dictionaries_loader = std::move(external_dictionaries_loader);
delete_external_user_defined_executable_functions_loader = std::move(external_user_defined_executable_functions_loader);
delete_external_models_loader = std::move(external_models_loader);
delete_buffer_flush_schedule_pool = std::move(buffer_flush_schedule_pool);
delete_schedule_pool = std::move(schedule_pool);
delete_distributed_schedule_pool = std::move(distributed_schedule_pool);
delete_message_broker_schedule_pool = std::move(message_broker_schedule_pool);
delete_ddl_worker = std::move(ddl_worker);
delete_access_control = std::move(access_control);
/// Stop trace collector if any
trace_collector.reset();
@ -391,6 +402,17 @@ struct ContextSharedPart
/// Can be removed w/o context lock
delete_system_logs.reset();
delete_embedded_dictionaries.reset();
delete_external_dictionaries_loader.reset();
delete_external_user_defined_executable_functions_loader.reset();
delete_external_models_loader.reset();
delete_ddl_worker.reset();
delete_buffer_flush_schedule_pool.reset();
delete_schedule_pool.reset();
delete_distributed_schedule_pool.reset();
delete_message_broker_schedule_pool.reset();
delete_ddl_worker.reset();
delete_access_control.reset();
}
bool hasTraceCollector() const
@ -1365,7 +1387,8 @@ ExternalDictionariesLoader & Context::getExternalDictionariesLoader()
ExternalDictionariesLoader & Context::getExternalDictionariesLoaderUnlocked()
{
if (!shared->external_dictionaries_loader)
shared->external_dictionaries_loader.emplace(getGlobalContext());
shared->external_dictionaries_loader =
std::make_unique<ExternalDictionariesLoader>(getGlobalContext());
return *shared->external_dictionaries_loader;
}
@ -1383,7 +1406,8 @@ ExternalUserDefinedExecutableFunctionsLoader & Context::getExternalUserDefinedEx
ExternalUserDefinedExecutableFunctionsLoader & Context::getExternalUserDefinedExecutableFunctionsLoaderUnlocked()
{
if (!shared->external_user_defined_executable_functions_loader)
shared->external_user_defined_executable_functions_loader.emplace(getGlobalContext());
shared->external_user_defined_executable_functions_loader =
std::make_unique<ExternalUserDefinedExecutableFunctionsLoader>(getGlobalContext());
return *shared->external_user_defined_executable_functions_loader;
}
@ -1401,7 +1425,8 @@ ExternalModelsLoader & Context::getExternalModelsLoader()
ExternalModelsLoader & Context::getExternalModelsLoaderUnlocked()
{
if (!shared->external_models_loader)
shared->external_models_loader.emplace(getGlobalContext());
shared->external_models_loader =
std::make_unique<ExternalModelsLoader>(getGlobalContext());
return *shared->external_models_loader;
}
@ -1436,7 +1461,7 @@ EmbeddedDictionaries & Context::getEmbeddedDictionariesImpl(const bool throw_on_
{
auto geo_dictionaries_loader = std::make_unique<GeoDictionariesLoader>();
shared->embedded_dictionaries.emplace(
shared->embedded_dictionaries = std::make_unique<EmbeddedDictionaries>(
std::move(geo_dictionaries_loader),
getGlobalContext(),
throw_on_error);
@ -1695,7 +1720,7 @@ BackgroundSchedulePool & Context::getBufferFlushSchedulePool() const
{
auto lock = getLock();
if (!shared->buffer_flush_schedule_pool)
shared->buffer_flush_schedule_pool.emplace(
shared->buffer_flush_schedule_pool = std::make_unique<BackgroundSchedulePool>(
settings.background_buffer_flush_schedule_pool_size,
CurrentMetrics::BackgroundBufferFlushSchedulePoolTask,
"BgBufSchPool");
@ -1737,7 +1762,7 @@ BackgroundSchedulePool & Context::getSchedulePool() const
{
auto lock = getLock();
if (!shared->schedule_pool)
shared->schedule_pool.emplace(
shared->schedule_pool = std::make_unique<BackgroundSchedulePool>(
settings.background_schedule_pool_size,
CurrentMetrics::BackgroundSchedulePoolTask,
"BgSchPool");
@ -1748,7 +1773,7 @@ BackgroundSchedulePool & Context::getDistributedSchedulePool() const
{
auto lock = getLock();
if (!shared->distributed_schedule_pool)
shared->distributed_schedule_pool.emplace(
shared->distributed_schedule_pool = std::make_unique<BackgroundSchedulePool>(
settings.background_distributed_schedule_pool_size,
CurrentMetrics::BackgroundDistributedSchedulePoolTask,
"BgDistSchPool");
@ -1759,7 +1784,7 @@ BackgroundSchedulePool & Context::getMessageBrokerSchedulePool() const
{
auto lock = getLock();
if (!shared->message_broker_schedule_pool)
shared->message_broker_schedule_pool.emplace(
shared->message_broker_schedule_pool = std::make_unique<BackgroundSchedulePool>(
settings.background_message_broker_schedule_pool_size,
CurrentMetrics::BackgroundMessageBrokerSchedulePoolTask,
"BgMBSchPool");