mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
Merge pull request #11106 from ClickHouse/fix_deadlock_system_logs_startup
Fix deadlock in system tables during server startup
This commit is contained in:
commit
cd83ebc599
@ -9,9 +9,11 @@
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class Context;
|
||||
class DiskSelector;
|
||||
using DiskSelectorPtr = std::shared_ptr<const DiskSelector>;
|
||||
using DisksMap = std::map<String, DiskPtr>;
|
||||
|
||||
/// Parse .xml configuration and store information about disks
|
||||
/// Mostly used for introspection.
|
||||
@ -28,14 +30,14 @@ public:
|
||||
DiskPtr get(const String & name) const;
|
||||
|
||||
/// Get all disks with names
|
||||
const auto & getDisksMap() const { return disks; }
|
||||
const DisksMap & getDisksMap() const { return disks; }
|
||||
void addToDiskMap(String name, DiskPtr disk)
|
||||
{
|
||||
disks.emplace(name, disk);
|
||||
}
|
||||
|
||||
private:
|
||||
std::map<String, DiskPtr> disks;
|
||||
DisksMap disks;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -99,6 +99,7 @@ private:
|
||||
|
||||
class StoragePolicySelector;
|
||||
using StoragePolicySelectorPtr = std::shared_ptr<const StoragePolicySelector>;
|
||||
using StoragePoliciesMap = std::map<String, StoragePolicyPtr>;
|
||||
|
||||
/// Parse .xml configuration and store information about policies
|
||||
/// Mostly used for introspection.
|
||||
@ -113,10 +114,10 @@ public:
|
||||
StoragePolicyPtr get(const String & name) const;
|
||||
|
||||
/// All policies
|
||||
const std::map<String, StoragePolicyPtr> & getPoliciesMap() const { return policies; }
|
||||
const StoragePoliciesMap & getPoliciesMap() const { return policies; }
|
||||
|
||||
private:
|
||||
std::map<String, StoragePolicyPtr> policies;
|
||||
StoragePoliciesMap policies;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -295,6 +295,10 @@ struct ContextShared
|
||||
mutable std::mutex embedded_dictionaries_mutex;
|
||||
mutable std::mutex external_dictionaries_mutex;
|
||||
mutable std::mutex external_models_mutex;
|
||||
/// Separate mutex for storage policies. During server startup we may
|
||||
/// initialize some important storages (system logs with MergeTree engine)
|
||||
/// under context lock.
|
||||
mutable std::mutex storage_policies_mutex;
|
||||
/// Separate mutex for re-initialization of zookeeper session. This operation could take a long time and must not interfere with another operations.
|
||||
mutable std::mutex zookeeper_mutex;
|
||||
|
||||
@ -567,7 +571,7 @@ void Context::setPath(const String & path)
|
||||
|
||||
VolumeJBODPtr Context::setTemporaryStorage(const String & path, const String & policy_name)
|
||||
{
|
||||
auto lock = getLock();
|
||||
std::lock_guard lock(shared->storage_policies_mutex);
|
||||
|
||||
if (policy_name.empty())
|
||||
{
|
||||
@ -580,7 +584,7 @@ VolumeJBODPtr Context::setTemporaryStorage(const String & path, const String & p
|
||||
}
|
||||
else
|
||||
{
|
||||
StoragePolicyPtr tmp_policy = getStoragePolicySelector()->get(policy_name);
|
||||
StoragePolicyPtr tmp_policy = getStoragePolicySelector(lock)->get(policy_name);
|
||||
if (tmp_policy->getVolumes().size() != 1)
|
||||
throw Exception("Policy " + policy_name + " is used temporary files, such policy should have exactly one volume", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
|
||||
shared->tmp_volume = tmp_policy->getVolume(0);
|
||||
@ -1688,18 +1692,37 @@ CompressionCodecPtr Context::chooseCompressionCodec(size_t part_size, double par
|
||||
|
||||
DiskPtr Context::getDisk(const String & name) const
|
||||
{
|
||||
auto lock = getLock();
|
||||
std::lock_guard lock(shared->storage_policies_mutex);
|
||||
|
||||
auto disk_selector = getDiskSelector();
|
||||
auto disk_selector = getDiskSelector(lock);
|
||||
|
||||
return disk_selector->get(name);
|
||||
}
|
||||
|
||||
|
||||
DiskSelectorPtr Context::getDiskSelector() const
|
||||
StoragePolicyPtr Context::getStoragePolicy(const String & name) const
|
||||
{
|
||||
auto lock = getLock();
|
||||
std::lock_guard lock(shared->storage_policies_mutex);
|
||||
|
||||
auto policy_selector = getStoragePolicySelector(lock);
|
||||
|
||||
return policy_selector->get(name);
|
||||
}
|
||||
|
||||
|
||||
DisksMap Context::getDisksMap() const
|
||||
{
|
||||
std::lock_guard lock(shared->storage_policies_mutex);
|
||||
return getDiskSelector(lock)->getDisksMap();
|
||||
}
|
||||
|
||||
StoragePoliciesMap Context::getPoliciesMap() const
|
||||
{
|
||||
std::lock_guard lock(shared->storage_policies_mutex);
|
||||
return getStoragePolicySelector(lock)->getPoliciesMap();
|
||||
}
|
||||
|
||||
DiskSelectorPtr Context::getDiskSelector(std::lock_guard<std::mutex> & /* lock */) const
|
||||
{
|
||||
if (!shared->merge_tree_disk_selector)
|
||||
{
|
||||
constexpr auto config_name = "storage_configuration.disks";
|
||||
@ -1710,27 +1733,14 @@ DiskSelectorPtr Context::getDiskSelector() const
|
||||
return shared->merge_tree_disk_selector;
|
||||
}
|
||||
|
||||
|
||||
StoragePolicyPtr Context::getStoragePolicy(const String & name) const
|
||||
StoragePolicySelectorPtr Context::getStoragePolicySelector(std::lock_guard<std::mutex> & lock) const
|
||||
{
|
||||
auto lock = getLock();
|
||||
|
||||
auto policy_selector = getStoragePolicySelector();
|
||||
|
||||
return policy_selector->get(name);
|
||||
}
|
||||
|
||||
|
||||
StoragePolicySelectorPtr Context::getStoragePolicySelector() const
|
||||
{
|
||||
auto lock = getLock();
|
||||
|
||||
if (!shared->merge_tree_storage_policy_selector)
|
||||
{
|
||||
constexpr auto config_name = "storage_configuration.policies";
|
||||
const auto & config = getConfigRef();
|
||||
|
||||
shared->merge_tree_storage_policy_selector = std::make_shared<StoragePolicySelector>(config, config_name, getDiskSelector());
|
||||
shared->merge_tree_storage_policy_selector = std::make_shared<StoragePolicySelector>(config, config_name, getDiskSelector(lock));
|
||||
}
|
||||
return shared->merge_tree_storage_policy_selector;
|
||||
}
|
||||
@ -1738,7 +1748,7 @@ StoragePolicySelectorPtr Context::getStoragePolicySelector() const
|
||||
|
||||
void Context::updateStorageConfiguration(const Poco::Util::AbstractConfiguration & config)
|
||||
{
|
||||
auto lock = getLock();
|
||||
std::lock_guard lock(shared->storage_policies_mutex);
|
||||
|
||||
if (shared->merge_tree_disk_selector)
|
||||
shared->merge_tree_disk_selector = shared->merge_tree_disk_selector->updateFromConfig(config, "storage_configuration.disks", *this);
|
||||
|
@ -97,8 +97,10 @@ class IDisk;
|
||||
using DiskPtr = std::shared_ptr<IDisk>;
|
||||
class DiskSelector;
|
||||
using DiskSelectorPtr = std::shared_ptr<const DiskSelector>;
|
||||
using DisksMap = std::map<String, DiskPtr>;
|
||||
class StoragePolicy;
|
||||
using StoragePolicyPtr = std::shared_ptr<const StoragePolicy>;
|
||||
using StoragePoliciesMap = std::map<String, StoragePolicyPtr>;
|
||||
class StoragePolicySelector;
|
||||
using StoragePolicySelectorPtr = std::shared_ptr<const StoragePolicySelector>;
|
||||
|
||||
@ -541,14 +543,12 @@ public:
|
||||
/// Lets you select the compression codec according to the conditions described in the configuration file.
|
||||
std::shared_ptr<ICompressionCodec> chooseCompressionCodec(size_t part_size, double part_size_ratio) const;
|
||||
|
||||
DiskSelectorPtr getDiskSelector() const;
|
||||
|
||||
/// Provides storage disks
|
||||
DiskPtr getDisk(const String & name) const;
|
||||
DiskPtr getDefaultDisk() const { return getDisk("default"); }
|
||||
|
||||
StoragePolicySelectorPtr getStoragePolicySelector() const;
|
||||
|
||||
StoragePoliciesMap getPoliciesMap() const;
|
||||
DisksMap getDisksMap() const;
|
||||
void updateStorageConfiguration(const Poco::Util::AbstractConfiguration & config);
|
||||
|
||||
/// Provides storage politics schemes
|
||||
@ -626,6 +626,10 @@ private:
|
||||
EmbeddedDictionaries & getEmbeddedDictionariesImpl(bool throw_on_error) const;
|
||||
|
||||
void checkCanBeDropped(const String & database, const String & table, const size_t & size, const size_t & max_size_to_drop) const;
|
||||
|
||||
StoragePolicySelectorPtr getStoragePolicySelector(std::lock_guard<std::mutex> & lock) const;
|
||||
|
||||
DiskSelectorPtr getDiskSelector(std::lock_guard<std::mutex> & /* lock */) const;
|
||||
};
|
||||
|
||||
|
||||
|
@ -204,8 +204,8 @@ SystemLog<LogElement>::SystemLog(Context & context_,
|
||||
size_t flush_interval_milliseconds_)
|
||||
: context(context_)
|
||||
, table_id(database_name_, table_name_)
|
||||
, storage_def(storage_def_),
|
||||
flush_interval_milliseconds(flush_interval_milliseconds_)
|
||||
, storage_def(storage_def_)
|
||||
, flush_interval_milliseconds(flush_interval_milliseconds_)
|
||||
{
|
||||
assert(database_name_ == DatabaseCatalog::SYSTEM_DATABASE);
|
||||
log = &Logger::get("SystemLog (" + database_name_ + "." + table_name_ + ")");
|
||||
|
@ -867,7 +867,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
|
||||
for (const auto & disk_ptr : disks)
|
||||
defined_disk_names.insert(disk_ptr->getName());
|
||||
|
||||
for (const auto & [disk_name, disk] : global_context.getDiskSelector()->getDisksMap())
|
||||
for (const auto & [disk_name, disk] : global_context.getDisksMap())
|
||||
{
|
||||
if (defined_disk_names.count(disk_name) == 0 && disk->exists(relative_data_path))
|
||||
{
|
||||
|
@ -335,7 +335,7 @@ void StorageDistributed::createStorage()
|
||||
}
|
||||
else
|
||||
{
|
||||
auto policy = global_context->getStoragePolicySelector()->get(storage_policy);
|
||||
auto policy = global_context->getStoragePolicy(storage_policy);
|
||||
if (policy->getVolumes().size() != 1)
|
||||
throw Exception("Policy for Distributed table, should have exactly one volume", ErrorCodes::BAD_ARGUMENTS);
|
||||
volume = policy->getVolume(0);
|
||||
@ -629,7 +629,7 @@ StoragePolicyPtr StorageDistributed::getStoragePolicy() const
|
||||
{
|
||||
if (storage_policy.empty())
|
||||
return {};
|
||||
return global_context->getStoragePolicySelector()->get(storage_policy);
|
||||
return global_context->getStoragePolicy(storage_policy);
|
||||
}
|
||||
|
||||
void StorageDistributed::createDirectoryMonitors(const std::string & disk)
|
||||
|
@ -40,9 +40,7 @@ Pipes StorageSystemDisks::read(
|
||||
MutableColumnPtr col_total = ColumnUInt64::create();
|
||||
MutableColumnPtr col_keep = ColumnUInt64::create();
|
||||
|
||||
const auto & disk_selector = context.getDiskSelector();
|
||||
|
||||
for (const auto & [disk_name, disk_ptr] : disk_selector->getDisksMap())
|
||||
for (const auto & [disk_name, disk_ptr] : context.getDisksMap())
|
||||
{
|
||||
col_name->insert(disk_name);
|
||||
col_path->insert(disk_ptr->getPath());
|
||||
|
@ -45,9 +45,7 @@ Pipes StorageSystemStoragePolicies::read(
|
||||
MutableColumnPtr col_max_part_size = ColumnUInt64::create();
|
||||
MutableColumnPtr col_move_factor = ColumnFloat32::create();
|
||||
|
||||
const auto & policy_selector = context.getStoragePolicySelector();
|
||||
|
||||
for (const auto & [policy_name, policy_ptr] : policy_selector->getPoliciesMap())
|
||||
for (const auto & [policy_name, policy_ptr] : context.getPoliciesMap())
|
||||
{
|
||||
const auto & volumes = policy_ptr->getVolumes();
|
||||
for (size_t i = 0; i != volumes.size(); ++i)
|
||||
|
Loading…
Reference in New Issue
Block a user