mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 01:25:21 +00:00
Better initialization of access storages. Make list of access storages dynamic.
This commit is contained in:
parent
ad03ff3887
commit
2909ed1bc0
@ -128,7 +128,6 @@ namespace ErrorCodes
|
||||
extern const int FAILED_TO_GETPWUID;
|
||||
extern const int MISMATCHING_USERS_FOR_PROCESS_AND_DATA;
|
||||
extern const int NETWORK_ERROR;
|
||||
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
|
||||
}
|
||||
|
||||
|
||||
@ -215,30 +214,6 @@ void Server::defineOptions(Poco::Util::OptionSet & options)
|
||||
}
|
||||
|
||||
|
||||
/// Check that there is no user-level settings at the top level in config.
|
||||
/// This is a common source of mistake (user don't know where to write user-level setting).
|
||||
void checkForUserSettingsAtTopLevel(const Poco::Util::AbstractConfiguration & config, const std::string & path)
|
||||
{
|
||||
if (config.getBool("skip_check_for_incorrect_settings", false))
|
||||
return;
|
||||
|
||||
Settings settings;
|
||||
for (const auto & setting : settings.all())
|
||||
{
|
||||
const auto & name = setting.getName();
|
||||
if (config.has(name))
|
||||
{
|
||||
throw Exception(fmt::format("A setting '{}' appeared at top level in config {}."
|
||||
" But it is user-level setting that should be located in users.xml inside <profiles> section for specific profile."
|
||||
" You can add it to <profiles><default> if you want to change default value of this setting."
|
||||
" You can also disable the check - specify <skip_check_for_incorrect_settings>1</skip_check_for_incorrect_settings>"
|
||||
" in the main configuration file.",
|
||||
name, path),
|
||||
ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void checkForUsersNotInMainConfig(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & config_path,
|
||||
@ -319,7 +294,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
|
||||
config().add(loaded_config.configuration.duplicate(), PRIO_DEFAULT, false);
|
||||
}
|
||||
|
||||
checkForUserSettingsAtTopLevel(config(), config_path);
|
||||
Settings::checkNoSettingNamesAtTopLevel(config(), config_path);
|
||||
|
||||
const auto memory_amount = getMemoryAmount();
|
||||
|
||||
@ -538,7 +513,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
|
||||
main_config_zk_changed_event,
|
||||
[&](ConfigurationPtr config)
|
||||
{
|
||||
checkForUserSettingsAtTopLevel(*config, config_path);
|
||||
Settings::checkNoSettingNamesAtTopLevel(*config, config_path);
|
||||
|
||||
// FIXME logging-related things need synchronization -- see the 'Logger * log' saved
|
||||
// in a lot of places. For now, disable updating log configuration without server restart.
|
||||
@ -573,34 +548,25 @@ int Server::main(const std::vector<std::string> & /*args*/)
|
||||
if (users_config_path != config_path)
|
||||
checkForUsersNotInMainConfig(config(), config_path, users_config_path, log);
|
||||
|
||||
auto & access_control = global_context->getAccessControlManager();
|
||||
if (config().has("custom_settings_prefixes"))
|
||||
global_context->getAccessControlManager().setCustomSettingsPrefixes(config().getString("custom_settings_prefixes"));
|
||||
access_control.setCustomSettingsPrefixes(config().getString("custom_settings_prefixes"));
|
||||
|
||||
auto users_config_reloader = std::make_unique<ConfigReloader>(
|
||||
users_config_path,
|
||||
include_from_path,
|
||||
config().getString("path", ""),
|
||||
zkutil::ZooKeeperNodeCache([&] { return global_context->getZooKeeper(); }),
|
||||
std::make_shared<Poco::Event>(),
|
||||
[&](ConfigurationPtr config)
|
||||
{
|
||||
global_context->setUsersConfig(config);
|
||||
checkForUserSettingsAtTopLevel(*config, users_config_path);
|
||||
},
|
||||
/* already_loaded = */ false);
|
||||
if (!users_config_path.empty())
|
||||
access_control.addUsersConfigStorage(users_config_path, include_from_path, path, [&] { return global_context->getZooKeeper(); });
|
||||
|
||||
/// Sets a local directory storing information about access control.
|
||||
std::string access_control_local_path = config().getString("access_control_path", "");
|
||||
if (!access_control_local_path.empty())
|
||||
access_control.addDiskStorage(access_control_local_path);
|
||||
|
||||
/// Reload config in SYSTEM RELOAD CONFIG query.
|
||||
global_context->setConfigReloadCallback([&]()
|
||||
{
|
||||
main_config_reloader->reload();
|
||||
users_config_reloader->reload();
|
||||
access_control.reloadUsersConfigs();
|
||||
});
|
||||
|
||||
/// Sets a local directory storing information about access control.
|
||||
std::string access_control_local_path = config().getString("access_control_path", "");
|
||||
if (!access_control_local_path.empty())
|
||||
global_context->getAccessControlManager().setLocalDirectory(access_control_local_path);
|
||||
|
||||
/// Limit on total number of concurrently executed queries.
|
||||
global_context->getProcessList().setMaxSize(config().getInt("max_concurrent_queries", 0));
|
||||
|
||||
@ -1069,7 +1035,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
|
||||
buildLoggers(config(), logger());
|
||||
|
||||
main_config_reloader->start();
|
||||
users_config_reloader->start();
|
||||
access_control.startPeriodicReloadingUsersConfigs();
|
||||
if (dns_cache_updater)
|
||||
dns_cache_updater->start();
|
||||
|
||||
@ -1128,7 +1094,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
|
||||
|
||||
dns_cache_updater.reset();
|
||||
main_config_reloader.reset();
|
||||
users_config_reloader.reset();
|
||||
|
||||
if (current_connections)
|
||||
{
|
||||
|
@ -24,24 +24,6 @@ namespace ErrorCodes
|
||||
extern const int UNKNOWN_SETTING;
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
std::vector<std::unique_ptr<IAccessStorage>> createStorages()
|
||||
{
|
||||
std::vector<std::unique_ptr<IAccessStorage>> list;
|
||||
list.emplace_back(std::make_unique<UsersConfigAccessStorage>());
|
||||
list.emplace_back(std::make_unique<DiskAccessStorage>());
|
||||
|
||||
#if 0 /// Memory access storage is disabled.
|
||||
list.emplace_back(std::make_unique<MemoryAccessStorage>());
|
||||
#endif
|
||||
return list;
|
||||
}
|
||||
|
||||
constexpr size_t USERS_CONFIG_ACCESS_STORAGE_INDEX = 0;
|
||||
constexpr size_t DISK_ACCESS_STORAGE_INDEX = 1;
|
||||
}
|
||||
|
||||
|
||||
class AccessControlManager::ContextAccessCache
|
||||
{
|
||||
@ -114,7 +96,7 @@ private:
|
||||
|
||||
|
||||
AccessControlManager::AccessControlManager()
|
||||
: MultipleAccessStorage("user directories", createStorages()),
|
||||
: MultipleAccessStorage("user directories"),
|
||||
context_access_cache(std::make_unique<ContextAccessCache>(*this)),
|
||||
role_cache(std::make_unique<RoleCache>(*this)),
|
||||
row_policy_cache(std::make_unique<RowPolicyCache>(*this)),
|
||||
@ -123,20 +105,97 @@ AccessControlManager::AccessControlManager()
|
||||
external_authenticators(std::make_unique<ExternalAuthenticators>()),
|
||||
custom_settings_prefixes(std::make_unique<CustomSettingsPrefixes>())
|
||||
{
|
||||
/// Allow UsersConfigAccessStorage to check the names of settings which it will read from users.xml.
|
||||
auto check_setting_name_function = [this](const std::string_view & setting_name) { checkSettingNameIsAllowed(setting_name); };
|
||||
auto & users_config_access_storage = dynamic_cast<UsersConfigAccessStorage &>(getStorageByIndex(USERS_CONFIG_ACCESS_STORAGE_INDEX));
|
||||
users_config_access_storage.setCheckSettingNameFunction(check_setting_name_function);
|
||||
}
|
||||
|
||||
|
||||
AccessControlManager::~AccessControlManager() = default;
|
||||
|
||||
|
||||
void AccessControlManager::setLocalDirectory(const String & directory_path)
|
||||
void AccessControlManager::setUsersConfig(const Poco::Util::AbstractConfiguration & users_config_)
|
||||
{
|
||||
auto & disk_access_storage = dynamic_cast<DiskAccessStorage &>(getStorageByIndex(DISK_ACCESS_STORAGE_INDEX));
|
||||
disk_access_storage.setDirectory(directory_path);
|
||||
auto storages = getStoragesPtr();
|
||||
for (const auto & storage : *storages)
|
||||
{
|
||||
if (auto users_config_storage = typeid_cast<std::shared_ptr<UsersConfigAccessStorage>>(storage))
|
||||
{
|
||||
users_config_storage->setConfig(users_config_);
|
||||
return;
|
||||
}
|
||||
}
|
||||
addUsersConfigStorage(users_config_);
|
||||
}
|
||||
|
||||
void AccessControlManager::addUsersConfigStorage(const Poco::Util::AbstractConfiguration & users_config_)
|
||||
{
|
||||
addUsersConfigStorage(UsersConfigAccessStorage::STORAGE_TYPE, users_config_);
|
||||
}
|
||||
|
||||
void AccessControlManager::addUsersConfigStorage(const String & storage_name_, const Poco::Util::AbstractConfiguration & users_config_)
|
||||
{
|
||||
auto check_setting_name_function = [this](const std::string_view & setting_name) { checkSettingNameIsAllowed(setting_name); };
|
||||
auto new_storage = std::make_shared<UsersConfigAccessStorage>(storage_name_, check_setting_name_function);
|
||||
new_storage->setConfig(users_config_);
|
||||
addStorage(new_storage);
|
||||
}
|
||||
|
||||
void AccessControlManager::addUsersConfigStorage(
|
||||
const String & users_config_path_,
|
||||
const String & include_from_path_,
|
||||
const String & preprocessed_dir_,
|
||||
const zkutil::GetZooKeeper & get_zookeeper_function_)
|
||||
{
|
||||
addUsersConfigStorage(
|
||||
UsersConfigAccessStorage::STORAGE_TYPE, users_config_path_, include_from_path_, preprocessed_dir_, get_zookeeper_function_);
|
||||
}
|
||||
|
||||
void AccessControlManager::addUsersConfigStorage(
|
||||
const String & storage_name_,
|
||||
const String & users_config_path_,
|
||||
const String & include_from_path_,
|
||||
const String & preprocessed_dir_,
|
||||
const zkutil::GetZooKeeper & get_zookeeper_function_)
|
||||
{
|
||||
auto check_setting_name_function = [this](const std::string_view & setting_name) { checkSettingNameIsAllowed(setting_name); };
|
||||
auto new_storage = std::make_shared<UsersConfigAccessStorage>(storage_name_, check_setting_name_function);
|
||||
new_storage->load(users_config_path_, include_from_path_, preprocessed_dir_, get_zookeeper_function_);
|
||||
addStorage(new_storage);
|
||||
}
|
||||
|
||||
void AccessControlManager::reloadUsersConfigs()
|
||||
{
|
||||
auto storages = getStoragesPtr();
|
||||
for (const auto & storage : *storages)
|
||||
{
|
||||
if (auto users_config_storage = typeid_cast<std::shared_ptr<UsersConfigAccessStorage>>(storage))
|
||||
users_config_storage->reload();
|
||||
}
|
||||
}
|
||||
|
||||
void AccessControlManager::startPeriodicReloadingUsersConfigs()
|
||||
{
|
||||
auto storages = getStoragesPtr();
|
||||
for (const auto & storage : *storages)
|
||||
{
|
||||
if (auto users_config_storage = typeid_cast<std::shared_ptr<UsersConfigAccessStorage>>(storage))
|
||||
users_config_storage->startPeriodicReloading();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void AccessControlManager::addDiskStorage(const String & directory_, bool readonly_)
|
||||
{
|
||||
addStorage(std::make_shared<DiskAccessStorage>(directory_, readonly_));
|
||||
}
|
||||
|
||||
void AccessControlManager::addDiskStorage(const String & storage_name_, const String & directory_, bool readonly_)
|
||||
{
|
||||
addStorage(std::make_shared<DiskAccessStorage>(storage_name_, directory_, readonly_));
|
||||
}
|
||||
|
||||
|
||||
void AccessControlManager::addMemoryStorage(const String & storage_name_)
|
||||
{
|
||||
addStorage(std::make_shared<MemoryAccessStorage>(storage_name_));
|
||||
}
|
||||
|
||||
|
||||
@ -146,13 +205,6 @@ void AccessControlManager::setExternalAuthenticatorsConfig(const Poco::Util::Abs
|
||||
}
|
||||
|
||||
|
||||
void AccessControlManager::setUsersConfig(const Poco::Util::AbstractConfiguration & users_config)
|
||||
{
|
||||
auto & users_config_access_storage = dynamic_cast<UsersConfigAccessStorage &>(getStorageByIndex(USERS_CONFIG_ACCESS_STORAGE_INDEX));
|
||||
users_config_access_storage.setConfiguration(users_config);
|
||||
}
|
||||
|
||||
|
||||
void AccessControlManager::setDefaultProfileName(const String & default_profile_name)
|
||||
{
|
||||
settings_profiles_cache->setDefaultProfileName(default_profile_name);
|
||||
|
@ -2,6 +2,7 @@
|
||||
|
||||
#include <Access/MultipleAccessStorage.h>
|
||||
#include <Common/SettingsChanges.h>
|
||||
#include <Common/ZooKeeper/Common.h>
|
||||
#include <boost/container/flat_set.hpp>
|
||||
#include <memory>
|
||||
|
||||
@ -48,9 +49,41 @@ public:
|
||||
AccessControlManager();
|
||||
~AccessControlManager();
|
||||
|
||||
void setLocalDirectory(const String & directory);
|
||||
void setExternalAuthenticatorsConfig(const Poco::Util::AbstractConfiguration & config);
|
||||
void setUsersConfig(const Poco::Util::AbstractConfiguration & users_config);
|
||||
/// Parses access entities from a configuration loaded from users.xml.
|
||||
/// This function add UsersConfigAccessStorage if it wasn't added before.
|
||||
void setUsersConfig(const Poco::Util::AbstractConfiguration & users_config_);
|
||||
|
||||
/// Adds UsersConfigAccessStorage.
|
||||
void addUsersConfigStorage(const Poco::Util::AbstractConfiguration & users_config_);
|
||||
|
||||
void addUsersConfigStorage(const String & storage_name_,
|
||||
const Poco::Util::AbstractConfiguration & users_config_);
|
||||
|
||||
void addUsersConfigStorage(const String & users_config_path_,
|
||||
const String & include_from_path_,
|
||||
const String & preprocessed_dir_,
|
||||
const zkutil::GetZooKeeper & get_zookeeper_function_ = {});
|
||||
|
||||
void addUsersConfigStorage(const String & storage_name_,
|
||||
const String & users_config_path_,
|
||||
const String & include_from_path_,
|
||||
const String & preprocessed_dir_,
|
||||
const zkutil::GetZooKeeper & get_zookeeper_function_ = {});
|
||||
|
||||
void reloadUsersConfigs();
|
||||
void startPeriodicReloadingUsersConfigs();
|
||||
|
||||
/// Loads access entities from the directory on the local disk.
|
||||
/// Use that directory to keep created users/roles/etc.
|
||||
void addDiskStorage(const String & directory_, bool readonly_ = false);
|
||||
void addDiskStorage(const String & storage_name_, const String & directory_, bool readonly_ = false);
|
||||
|
||||
/// Adds MemoryAccessStorage which keeps access entities in memory.
|
||||
void addMemoryStorage();
|
||||
void addMemoryStorage(const String & storage_name_);
|
||||
|
||||
/// Sets the default profile's name.
|
||||
/// The default profile's settings are always applied before any other profile's.
|
||||
void setDefaultProfileName(const String & default_profile_name);
|
||||
|
||||
/// Sets prefixes which should be used for custom settings.
|
||||
@ -60,6 +93,8 @@ public:
|
||||
bool isSettingNameAllowed(const std::string_view & name) const;
|
||||
void checkSettingNameIsAllowed(const std::string_view & name) const;
|
||||
|
||||
void setExternalAuthenticatorsConfig(const Poco::Util::AbstractConfiguration & config);
|
||||
|
||||
std::shared_ptr<const ContextAccess> getContextAccess(
|
||||
const UUID & user_id,
|
||||
const boost::container::flat_set<UUID> & current_roles,
|
||||
@ -96,8 +131,10 @@ public:
|
||||
|
||||
const ExternalAuthenticators & getExternalAuthenticators() const;
|
||||
|
||||
private: class ContextAccessCache;
|
||||
private:
|
||||
class ContextAccessCache;
|
||||
class CustomSettingsPrefixes;
|
||||
|
||||
std::unique_ptr<ContextAccessCache> context_access_cache;
|
||||
std::unique_ptr<RoleCache> role_cache;
|
||||
std::unique_ptr<RowPolicyCache> row_policy_cache;
|
||||
|
@ -47,7 +47,6 @@ namespace ErrorCodes
|
||||
extern const int DIRECTORY_DOESNT_EXIST;
|
||||
extern const int FILE_DOESNT_EXIST;
|
||||
extern const int INCORRECT_ACCESS_ENTITY_DEFINITION;
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
|
||||
@ -86,7 +85,7 @@ namespace
|
||||
|
||||
|
||||
/// Reads a file containing ATTACH queries and then parses it to build an access entity.
|
||||
AccessEntityPtr readEntityFile(const std::filesystem::path & file_path)
|
||||
AccessEntityPtr readEntityFile(const String & file_path)
|
||||
{
|
||||
/// Read the file.
|
||||
ReadBufferFromFile in{file_path};
|
||||
@ -119,42 +118,42 @@ namespace
|
||||
if (auto * create_user_query = query->as<ASTCreateUserQuery>())
|
||||
{
|
||||
if (res)
|
||||
throw Exception("Two access entities in one file " + file_path.string(), ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
|
||||
throw Exception("Two access entities in one file " + file_path, ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
|
||||
res = user = std::make_unique<User>();
|
||||
InterpreterCreateUserQuery::updateUserFromQuery(*user, *create_user_query);
|
||||
}
|
||||
else if (auto * create_role_query = query->as<ASTCreateRoleQuery>())
|
||||
{
|
||||
if (res)
|
||||
throw Exception("Two access entities in one file " + file_path.string(), ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
|
||||
throw Exception("Two access entities in one file " + file_path, ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
|
||||
res = role = std::make_unique<Role>();
|
||||
InterpreterCreateRoleQuery::updateRoleFromQuery(*role, *create_role_query);
|
||||
}
|
||||
else if (auto * create_policy_query = query->as<ASTCreateRowPolicyQuery>())
|
||||
{
|
||||
if (res)
|
||||
throw Exception("Two access entities in one file " + file_path.string(), ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
|
||||
throw Exception("Two access entities in one file " + file_path, ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
|
||||
res = policy = std::make_unique<RowPolicy>();
|
||||
InterpreterCreateRowPolicyQuery::updateRowPolicyFromQuery(*policy, *create_policy_query);
|
||||
}
|
||||
else if (auto * create_quota_query = query->as<ASTCreateQuotaQuery>())
|
||||
{
|
||||
if (res)
|
||||
throw Exception("Two access entities are attached in the same file " + file_path.string(), ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
|
||||
throw Exception("Two access entities are attached in the same file " + file_path, ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
|
||||
res = quota = std::make_unique<Quota>();
|
||||
InterpreterCreateQuotaQuery::updateQuotaFromQuery(*quota, *create_quota_query);
|
||||
}
|
||||
else if (auto * create_profile_query = query->as<ASTCreateSettingsProfileQuery>())
|
||||
{
|
||||
if (res)
|
||||
throw Exception("Two access entities are attached in the same file " + file_path.string(), ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
|
||||
throw Exception("Two access entities are attached in the same file " + file_path, ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
|
||||
res = profile = std::make_unique<SettingsProfile>();
|
||||
InterpreterCreateSettingsProfileQuery::updateSettingsProfileFromQuery(*profile, *create_profile_query);
|
||||
}
|
||||
else if (auto * grant_query = query->as<ASTGrantQuery>())
|
||||
{
|
||||
if (!user && !role)
|
||||
throw Exception("A user or role should be attached before grant in file " + file_path.string(), ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
|
||||
throw Exception("A user or role should be attached before grant in file " + file_path, ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
|
||||
if (user)
|
||||
InterpreterGrantQuery::updateUserFromQuery(*user, *grant_query);
|
||||
else
|
||||
@ -165,13 +164,13 @@ namespace
|
||||
}
|
||||
|
||||
if (!res)
|
||||
throw Exception("No access entities attached in file " + file_path.string(), ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
|
||||
throw Exception("No access entities attached in file " + file_path, ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
|
||||
AccessEntityPtr tryReadEntityFile(const std::filesystem::path & file_path, Poco::Logger & log)
|
||||
AccessEntityPtr tryReadEntityFile(const String & file_path, Poco::Logger & log)
|
||||
{
|
||||
try
|
||||
{
|
||||
@ -179,14 +178,14 @@ namespace
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(&log, "Could not parse " + file_path.string());
|
||||
tryLogCurrentException(&log, "Could not parse " + file_path);
|
||||
return nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// Writes ATTACH queries for building a specified access entity to a file.
|
||||
void writeEntityFile(const std::filesystem::path & file_path, const IAccessEntity & entity)
|
||||
void writeEntityFile(const String & file_path, const IAccessEntity & entity)
|
||||
{
|
||||
/// Build list of ATTACH queries.
|
||||
ASTs queries;
|
||||
@ -220,14 +219,14 @@ namespace
|
||||
|
||||
|
||||
/// Calculates the path to a file named <id>.sql for saving an access entity.
|
||||
std::filesystem::path getEntityFilePath(const String & directory_path, const UUID & id)
|
||||
String getEntityFilePath(const String & directory_path, const UUID & id)
|
||||
{
|
||||
return std::filesystem::path(directory_path).append(toString(id)).replace_extension(".sql");
|
||||
return directory_path + toString(id) + ".sql";
|
||||
}
|
||||
|
||||
|
||||
/// Reads a map of name of access entity to UUID for access entities of some type from a file.
|
||||
std::vector<std::pair<UUID, String>> readListFile(const std::filesystem::path & file_path)
|
||||
std::vector<std::pair<UUID, String>> readListFile(const String & file_path)
|
||||
{
|
||||
ReadBufferFromFile in(file_path);
|
||||
|
||||
@ -250,7 +249,7 @@ namespace
|
||||
|
||||
|
||||
/// Writes a map of name of access entity to UUID for access entities of some type to a file.
|
||||
void writeListFile(const std::filesystem::path & file_path, const std::vector<std::pair<UUID, std::string_view>> & id_name_pairs)
|
||||
void writeListFile(const String & file_path, const std::vector<std::pair<UUID, std::string_view>> & id_name_pairs)
|
||||
{
|
||||
WriteBufferFromFile out(file_path);
|
||||
writeVarUInt(id_name_pairs.size(), out);
|
||||
@ -263,20 +262,19 @@ namespace
|
||||
|
||||
|
||||
/// Calculates the path for storing a map of name of access entity to UUID for access entities of some type.
|
||||
std::filesystem::path getListFilePath(const String & directory_path, EntityType type)
|
||||
String getListFilePath(const String & directory_path, EntityType type)
|
||||
{
|
||||
String file_name = EntityTypeInfo::get(type).plural_raw_name;
|
||||
boost::to_lower(file_name);
|
||||
file_name += ".list";
|
||||
return std::filesystem::path(directory_path).append(file_name);
|
||||
return directory_path + file_name + ".list";
|
||||
}
|
||||
|
||||
|
||||
/// Calculates the path to a temporary file which existence means that list files are corrupted
|
||||
/// and need to be rebuild.
|
||||
std::filesystem::path getNeedRebuildListsMarkFilePath(const String & directory_path)
|
||||
String getNeedRebuildListsMarkFilePath(const String & directory_path)
|
||||
{
|
||||
return std::filesystem::path(directory_path).append("need_rebuild_lists.mark");
|
||||
return directory_path + "need_rebuild_lists.mark";
|
||||
}
|
||||
|
||||
|
||||
@ -295,39 +293,18 @@ namespace
|
||||
}
|
||||
|
||||
|
||||
DiskAccessStorage::DiskAccessStorage()
|
||||
: IAccessStorage("local directory")
|
||||
DiskAccessStorage::DiskAccessStorage(const String & directory_path_, bool readonly_)
|
||||
: DiskAccessStorage(STORAGE_TYPE, directory_path_, readonly_)
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
DiskAccessStorage::~DiskAccessStorage()
|
||||
{
|
||||
stopListsWritingThread();
|
||||
writeLists();
|
||||
}
|
||||
|
||||
|
||||
void DiskAccessStorage::setDirectory(const String & directory_path_)
|
||||
{
|
||||
Notifications notifications;
|
||||
SCOPE_EXIT({ notify(notifications); });
|
||||
|
||||
std::lock_guard lock{mutex};
|
||||
initialize(directory_path_, notifications);
|
||||
}
|
||||
|
||||
|
||||
void DiskAccessStorage::initialize(const String & directory_path_, Notifications & notifications)
|
||||
DiskAccessStorage::DiskAccessStorage(const String & storage_name_, const String & directory_path_, bool readonly_)
|
||||
: IAccessStorage(storage_name_)
|
||||
{
|
||||
auto canonical_directory_path = std::filesystem::weakly_canonical(directory_path_);
|
||||
|
||||
if (initialized)
|
||||
{
|
||||
if (directory_path == canonical_directory_path)
|
||||
return;
|
||||
throw Exception("Storage " + getStorageName() + " already initialized with another directory", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
if (canonical_directory_path.has_filename())
|
||||
canonical_directory_path += std::filesystem::path::preferred_separator;
|
||||
|
||||
std::error_code create_dir_error_code;
|
||||
std::filesystem::create_directories(canonical_directory_path, create_dir_error_code);
|
||||
@ -336,7 +313,7 @@ void DiskAccessStorage::initialize(const String & directory_path_, Notifications
|
||||
throw Exception("Couldn't create directory " + canonical_directory_path.string() + " reason: '" + create_dir_error_code.message() + "'", ErrorCodes::DIRECTORY_DOESNT_EXIST);
|
||||
|
||||
directory_path = canonical_directory_path;
|
||||
initialized = true;
|
||||
readonly = readonly_;
|
||||
|
||||
bool should_rebuild_lists = std::filesystem::exists(getNeedRebuildListsMarkFilePath(directory_path));
|
||||
if (!should_rebuild_lists)
|
||||
@ -350,9 +327,13 @@ void DiskAccessStorage::initialize(const String & directory_path_, Notifications
|
||||
rebuildLists();
|
||||
writeLists();
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto & [id, entry] : entries_by_id)
|
||||
prepareNotifications(id, entry, false, notifications);
|
||||
|
||||
DiskAccessStorage::~DiskAccessStorage()
|
||||
{
|
||||
stopListsWritingThread();
|
||||
writeLists();
|
||||
}
|
||||
|
||||
|
||||
@ -375,7 +356,7 @@ bool DiskAccessStorage::readLists()
|
||||
auto file_path = getListFilePath(directory_path, type);
|
||||
if (!std::filesystem::exists(file_path))
|
||||
{
|
||||
LOG_WARNING(getLogger(), "File {} doesn't exist", file_path.string());
|
||||
LOG_WARNING(getLogger(), "File {} doesn't exist", file_path);
|
||||
ok = false;
|
||||
break;
|
||||
}
|
||||
@ -393,7 +374,7 @@ bool DiskAccessStorage::readLists()
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(getLogger(), "Could not read " + file_path.string());
|
||||
tryLogCurrentException(getLogger(), "Could not read " + file_path);
|
||||
ok = false;
|
||||
break;
|
||||
}
|
||||
@ -428,7 +409,7 @@ bool DiskAccessStorage::writeLists()
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(getLogger(), "Could not write " + file_path.string());
|
||||
tryLogCurrentException(getLogger(), "Could not write " + file_path);
|
||||
failed_to_write_lists = true;
|
||||
types_of_lists_to_write.clear();
|
||||
return false;
|
||||
@ -598,7 +579,7 @@ String DiskAccessStorage::readNameImpl(const UUID & id) const
|
||||
|
||||
bool DiskAccessStorage::canInsertImpl(const AccessEntityPtr &) const
|
||||
{
|
||||
return initialized;
|
||||
return !readonly;
|
||||
}
|
||||
|
||||
|
||||
@ -618,11 +599,9 @@ void DiskAccessStorage::insertNoLock(const UUID & id, const AccessEntityPtr & ne
|
||||
{
|
||||
const String & name = new_entity->getName();
|
||||
EntityType type = new_entity->getType();
|
||||
if (!initialized)
|
||||
throw Exception(
|
||||
"Cannot insert " + new_entity->outputTypeAndName() + " to storage [" + getStorageName()
|
||||
+ "] because the output directory is not set",
|
||||
ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
if (readonly)
|
||||
throwReadonlyCannotInsert(type, name);
|
||||
|
||||
/// Check that we can insert.
|
||||
auto it_by_id = entries_by_id.find(id);
|
||||
@ -675,6 +654,9 @@ void DiskAccessStorage::removeNoLock(const UUID & id, Notifications & notificati
|
||||
Entry & entry = it->second;
|
||||
EntityType type = entry.type;
|
||||
|
||||
if (readonly)
|
||||
throwReadonlyCannotRemove(type, entry.name);
|
||||
|
||||
scheduleWriteLists(type);
|
||||
deleteAccessEntityOnDisk(id);
|
||||
|
||||
@ -703,6 +685,8 @@ void DiskAccessStorage::updateNoLock(const UUID & id, const UpdateFunc & update_
|
||||
throwNotFound(id);
|
||||
|
||||
Entry & entry = it->second;
|
||||
if (readonly)
|
||||
throwReadonlyCannotUpdate(entry.type, entry.name);
|
||||
if (!entry.entity)
|
||||
entry.entity = readAccessEntityFromDisk(id);
|
||||
auto old_entity = entry.entity;
|
||||
@ -757,7 +741,7 @@ void DiskAccessStorage::deleteAccessEntityOnDisk(const UUID & id) const
|
||||
{
|
||||
auto file_path = getEntityFilePath(directory_path, id);
|
||||
if (!std::filesystem::remove(file_path))
|
||||
throw Exception("Couldn't delete " + file_path.string(), ErrorCodes::FILE_DOESNT_EXIST);
|
||||
throw Exception("Couldn't delete " + file_path, ErrorCodes::FILE_DOESNT_EXIST);
|
||||
}
|
||||
|
||||
|
||||
|
@ -11,10 +11,13 @@ namespace DB
|
||||
class DiskAccessStorage : public IAccessStorage
|
||||
{
|
||||
public:
|
||||
DiskAccessStorage();
|
||||
static constexpr char STORAGE_TYPE[] = "local directory";
|
||||
|
||||
DiskAccessStorage(const String & storage_name_, const String & directory_path_, bool readonly_ = false);
|
||||
DiskAccessStorage(const String & directory_path_, bool readonly_ = false);
|
||||
~DiskAccessStorage() override;
|
||||
|
||||
void setDirectory(const String & directory_path_);
|
||||
const char * getStorageType() const override { return STORAGE_TYPE; }
|
||||
|
||||
private:
|
||||
std::optional<UUID> findImpl(EntityType type, const String & name) const override;
|
||||
@ -31,7 +34,6 @@ private:
|
||||
bool hasSubscriptionImpl(const UUID & id) const override;
|
||||
bool hasSubscriptionImpl(EntityType type) const override;
|
||||
|
||||
void initialize(const String & directory_path_, Notifications & notifications);
|
||||
void clear();
|
||||
bool readLists();
|
||||
bool writeLists();
|
||||
@ -63,7 +65,7 @@ private:
|
||||
void prepareNotifications(const UUID & id, const Entry & entry, bool remove, Notifications & notifications) const;
|
||||
|
||||
String directory_path;
|
||||
bool initialized = false;
|
||||
bool readonly;
|
||||
std::unordered_map<UUID, Entry> entries_by_id;
|
||||
std::unordered_map<std::string_view, Entry *> entries_by_name_and_type[static_cast<size_t>(EntityType::MAX)];
|
||||
boost::container::flat_set<EntityType> types_of_lists_to_write;
|
||||
|
@ -24,6 +24,7 @@ public:
|
||||
|
||||
/// Returns the name of this storage.
|
||||
const String & getStorageName() const { return storage_name; }
|
||||
virtual const char * getStorageType() const = 0;
|
||||
|
||||
using EntityType = IAccessEntity::Type;
|
||||
using EntityTypeInfo = IAccessEntity::TypeInfo;
|
||||
|
@ -13,7 +13,11 @@ namespace DB
|
||||
class MemoryAccessStorage : public IAccessStorage
|
||||
{
|
||||
public:
|
||||
MemoryAccessStorage(const String & storage_name_ = "memory");
|
||||
static constexpr char STORAGE_TYPE[] = "memory";
|
||||
|
||||
MemoryAccessStorage(const String & storage_name_ = STORAGE_TYPE);
|
||||
|
||||
const char * getStorageType() const override { return STORAGE_TYPE; }
|
||||
|
||||
/// Sets all entities at once.
|
||||
void setAll(const std::vector<AccessEntityPtr> & all_entities);
|
||||
|
@ -1,6 +1,9 @@
|
||||
#include <Access/MultipleAccessStorage.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/quoteString.h>
|
||||
#include <ext/range.h>
|
||||
#include <boost/range/adaptor/map.hpp>
|
||||
#include <boost/range/algorithm/copy.hpp>
|
||||
#include <boost/range/algorithm/find.hpp>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -11,27 +14,90 @@ namespace ErrorCodes
|
||||
extern const int ACCESS_ENTITY_ALREADY_EXISTS;
|
||||
}
|
||||
|
||||
using Storage = IAccessStorage;
|
||||
using StoragePtr = std::shared_ptr<Storage>;
|
||||
using ConstStoragePtr = std::shared_ptr<const Storage>;
|
||||
using Storages = std::vector<StoragePtr>;
|
||||
|
||||
MultipleAccessStorage::MultipleAccessStorage(
|
||||
const String & storage_name_,
|
||||
std::vector<std::unique_ptr<Storage>> nested_storages_)
|
||||
|
||||
MultipleAccessStorage::MultipleAccessStorage(const String & storage_name_)
|
||||
: IAccessStorage(storage_name_)
|
||||
, nested_storages(std::move(nested_storages_))
|
||||
, nested_storages(std::make_shared<Storages>())
|
||||
, ids_cache(512 /* cache size */)
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
void MultipleAccessStorage::setStorages(const std::vector<StoragePtr> & storages)
|
||||
{
|
||||
std::unique_lock lock{mutex};
|
||||
nested_storages = std::make_shared<const Storages>(storages);
|
||||
ids_cache.reset();
|
||||
updateSubscriptionsToNestedStorages(lock);
|
||||
}
|
||||
|
||||
void MultipleAccessStorage::addStorage(const StoragePtr & new_storage)
|
||||
{
|
||||
std::unique_lock lock{mutex};
|
||||
if (boost::range::find(*nested_storages, new_storage) != nested_storages->end())
|
||||
return;
|
||||
auto new_storages = std::make_shared<Storages>(*nested_storages);
|
||||
new_storages->push_back(new_storage);
|
||||
nested_storages = new_storages;
|
||||
updateSubscriptionsToNestedStorages(lock);
|
||||
}
|
||||
|
||||
void MultipleAccessStorage::removeStorage(const StoragePtr & storage_to_remove)
|
||||
{
|
||||
std::unique_lock lock{mutex};
|
||||
auto it = boost::range::find(*nested_storages, storage_to_remove);
|
||||
if (it == nested_storages->end())
|
||||
return;
|
||||
size_t index = it - nested_storages->begin();
|
||||
auto new_storages = std::make_shared<Storages>(*nested_storages);
|
||||
new_storages->erase(new_storages->begin() + index);
|
||||
nested_storages = new_storages;
|
||||
ids_cache.reset();
|
||||
updateSubscriptionsToNestedStorages(lock);
|
||||
}
|
||||
|
||||
std::vector<StoragePtr> MultipleAccessStorage::getStorages()
|
||||
{
|
||||
return *getStoragesPtr();
|
||||
}
|
||||
|
||||
std::vector<ConstStoragePtr> MultipleAccessStorage::getStorages() const
|
||||
{
|
||||
auto storages = getStoragesInternal();
|
||||
std::vector<ConstStoragePtr> res;
|
||||
res.reserve(storages->size());
|
||||
boost::range::copy(*storages, std::back_inserter(res));
|
||||
return res;
|
||||
}
|
||||
|
||||
std::shared_ptr<const Storages> MultipleAccessStorage::getStoragesPtr()
|
||||
{
|
||||
return getStoragesInternal();
|
||||
}
|
||||
|
||||
std::shared_ptr<const Storages> MultipleAccessStorage::getStoragesInternal() const
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
return nested_storages;
|
||||
}
|
||||
|
||||
|
||||
std::optional<UUID> MultipleAccessStorage::findImpl(EntityType type, const String & name) const
|
||||
{
|
||||
for (const auto & nested_storage : nested_storages)
|
||||
auto storages = getStoragesInternal();
|
||||
for (const auto & storage : *storages)
|
||||
{
|
||||
auto id = nested_storage->find(type, name);
|
||||
auto id = storage->find(type, name);
|
||||
if (id)
|
||||
{
|
||||
std::lock_guard lock{ids_cache_mutex};
|
||||
ids_cache.set(*id, std::make_shared<Storage *>(nested_storage.get()));
|
||||
return *id;
|
||||
std::lock_guard lock{mutex};
|
||||
ids_cache.set(*id, storage);
|
||||
return id;
|
||||
}
|
||||
}
|
||||
return {};
|
||||
@ -41,9 +107,10 @@ std::optional<UUID> MultipleAccessStorage::findImpl(EntityType type, const Strin
|
||||
std::vector<UUID> MultipleAccessStorage::findAllImpl(EntityType type) const
|
||||
{
|
||||
std::vector<UUID> all_ids;
|
||||
for (const auto & nested_storage : nested_storages)
|
||||
auto storages = getStoragesInternal();
|
||||
for (const auto & storage : *storages)
|
||||
{
|
||||
auto ids = nested_storage->findAll(type);
|
||||
auto ids = storage->findAll(type);
|
||||
all_ids.insert(all_ids.end(), std::make_move_iterator(ids.begin()), std::make_move_iterator(ids.end()));
|
||||
}
|
||||
return all_ids;
|
||||
@ -56,26 +123,24 @@ bool MultipleAccessStorage::existsImpl(const UUID & id) const
|
||||
}
|
||||
|
||||
|
||||
IAccessStorage * MultipleAccessStorage::findStorage(const UUID & id)
|
||||
StoragePtr MultipleAccessStorage::findStorage(const UUID & id)
|
||||
{
|
||||
StoragePtr from_cache;
|
||||
{
|
||||
std::lock_guard lock{ids_cache_mutex};
|
||||
auto from_cache = ids_cache.get(id);
|
||||
if (from_cache)
|
||||
{
|
||||
auto * storage = *from_cache;
|
||||
if (storage->exists(id))
|
||||
return storage;
|
||||
}
|
||||
std::lock_guard lock{mutex};
|
||||
from_cache = ids_cache.get(id);
|
||||
}
|
||||
if (from_cache && from_cache->exists(id))
|
||||
return from_cache;
|
||||
|
||||
for (const auto & nested_storage : nested_storages)
|
||||
auto storages = getStoragesInternal();
|
||||
for (const auto & storage : *storages)
|
||||
{
|
||||
if (nested_storage->exists(id))
|
||||
if (storage->exists(id))
|
||||
{
|
||||
std::lock_guard lock{ids_cache_mutex};
|
||||
ids_cache.set(id, std::make_shared<Storage *>(nested_storage.get()));
|
||||
return nested_storage.get();
|
||||
std::lock_guard lock{mutex};
|
||||
ids_cache.set(id, storage);
|
||||
return storage;
|
||||
}
|
||||
}
|
||||
|
||||
@ -83,52 +148,44 @@ IAccessStorage * MultipleAccessStorage::findStorage(const UUID & id)
|
||||
}
|
||||
|
||||
|
||||
const IAccessStorage * MultipleAccessStorage::findStorage(const UUID & id) const
|
||||
ConstStoragePtr MultipleAccessStorage::findStorage(const UUID & id) const
|
||||
{
|
||||
return const_cast<MultipleAccessStorage *>(this)->findStorage(id);
|
||||
}
|
||||
|
||||
|
||||
IAccessStorage & MultipleAccessStorage::getStorage(const UUID & id)
|
||||
StoragePtr MultipleAccessStorage::getStorage(const UUID & id)
|
||||
{
|
||||
auto * storage = findStorage(id);
|
||||
auto storage = findStorage(id);
|
||||
if (storage)
|
||||
return *storage;
|
||||
return storage;
|
||||
throwNotFound(id);
|
||||
}
|
||||
|
||||
|
||||
const IAccessStorage & MultipleAccessStorage::getStorage(const UUID & id) const
|
||||
ConstStoragePtr MultipleAccessStorage::getStorage(const UUID & id) const
|
||||
{
|
||||
return const_cast<MultipleAccessStorage *>(this)->getStorage(id);
|
||||
}
|
||||
|
||||
void MultipleAccessStorage::addStorage(std::unique_ptr<Storage> nested_storage)
|
||||
{
|
||||
/// Note that IStorage::storage_name is not changed. It is ok as this method
|
||||
/// is considered as a temporary solution allowing third-party Arcadia applications
|
||||
/// using CH as a library to register their own access storages. Do not remove
|
||||
/// this method without providing any alternative :)
|
||||
nested_storages.emplace_back(std::move(nested_storage));
|
||||
}
|
||||
|
||||
AccessEntityPtr MultipleAccessStorage::readImpl(const UUID & id) const
|
||||
{
|
||||
return getStorage(id).read(id);
|
||||
return getStorage(id)->read(id);
|
||||
}
|
||||
|
||||
|
||||
String MultipleAccessStorage::readNameImpl(const UUID & id) const
|
||||
{
|
||||
return getStorage(id).readName(id);
|
||||
return getStorage(id)->readName(id);
|
||||
}
|
||||
|
||||
|
||||
bool MultipleAccessStorage::canInsertImpl(const AccessEntityPtr & entity) const
|
||||
{
|
||||
for (const auto & nested_storage : nested_storages)
|
||||
auto storages = getStoragesInternal();
|
||||
for (const auto & storage : *storages)
|
||||
{
|
||||
if (nested_storage->canInsert(entity))
|
||||
if (storage->canInsert(entity))
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
@ -137,99 +194,202 @@ bool MultipleAccessStorage::canInsertImpl(const AccessEntityPtr & entity) const
|
||||
|
||||
UUID MultipleAccessStorage::insertImpl(const AccessEntityPtr & entity, bool replace_if_exists)
|
||||
{
|
||||
IAccessStorage * nested_storage_for_insertion = nullptr;
|
||||
for (const auto & nested_storage : nested_storages)
|
||||
auto storages = getStoragesInternal();
|
||||
|
||||
std::shared_ptr<IAccessStorage> storage_for_insertion;
|
||||
for (const auto & storage : *storages)
|
||||
{
|
||||
if (nested_storage->canInsert(entity) ||
|
||||
nested_storage->find(entity->getType(), entity->getName()))
|
||||
if (storage->canInsert(entity) ||
|
||||
storage->find(entity->getType(), entity->getName()))
|
||||
{
|
||||
nested_storage_for_insertion = nested_storage.get();
|
||||
storage_for_insertion = storage;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!nested_storage_for_insertion)
|
||||
if (!storage_for_insertion)
|
||||
throw Exception("Not found a storage to insert " + entity->outputTypeAndName(), ErrorCodes::ACCESS_STORAGE_FOR_INSERTION_NOT_FOUND);
|
||||
|
||||
auto id = replace_if_exists ? nested_storage_for_insertion->insertOrReplace(entity) : nested_storage_for_insertion->insert(entity);
|
||||
std::lock_guard lock{ids_cache_mutex};
|
||||
ids_cache.set(id, std::make_shared<Storage *>(nested_storage_for_insertion));
|
||||
auto id = replace_if_exists ? storage_for_insertion->insertOrReplace(entity) : storage_for_insertion->insert(entity);
|
||||
std::lock_guard lock{mutex};
|
||||
ids_cache.set(id, storage_for_insertion);
|
||||
return id;
|
||||
}
|
||||
|
||||
|
||||
void MultipleAccessStorage::removeImpl(const UUID & id)
|
||||
{
|
||||
getStorage(id).remove(id);
|
||||
getStorage(id)->remove(id);
|
||||
}
|
||||
|
||||
|
||||
void MultipleAccessStorage::updateImpl(const UUID & id, const UpdateFunc & update_func)
|
||||
{
|
||||
auto & storage_for_updating = getStorage(id);
|
||||
auto storage_for_updating = getStorage(id);
|
||||
|
||||
/// If the updating involves renaming check that the renamed entity will be accessible by name.
|
||||
if ((nested_storages.size() > 1) && (nested_storages.front().get() != &storage_for_updating))
|
||||
auto storages = getStoragesInternal();
|
||||
if ((storages->size() > 1) && (storages->front() != storage_for_updating))
|
||||
{
|
||||
auto old_entity = storage_for_updating.read(id);
|
||||
auto old_entity = storage_for_updating->read(id);
|
||||
auto new_entity = update_func(old_entity);
|
||||
if (new_entity->getName() != old_entity->getName())
|
||||
{
|
||||
for (const auto & nested_storage : nested_storages)
|
||||
for (const auto & storage : *storages)
|
||||
{
|
||||
if (nested_storage.get() == &storage_for_updating)
|
||||
if (storage == storage_for_updating)
|
||||
break;
|
||||
if (nested_storage->find(new_entity->getType(), new_entity->getName()))
|
||||
if (storage->find(new_entity->getType(), new_entity->getName()))
|
||||
{
|
||||
throw Exception(
|
||||
old_entity->outputTypeAndName() + ": cannot rename to " + backQuote(new_entity->getName()) + " because "
|
||||
+ new_entity->outputTypeAndName() + " already exists in " + nested_storage->getStorageName(),
|
||||
+ new_entity->outputTypeAndName() + " already exists in " + storage->getStorageName(),
|
||||
ErrorCodes::ACCESS_ENTITY_ALREADY_EXISTS);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
storage_for_updating.update(id, update_func);
|
||||
storage_for_updating->update(id, update_func);
|
||||
}
|
||||
|
||||
|
||||
ext::scope_guard MultipleAccessStorage::subscribeForChangesImpl(const UUID & id, const OnChangedHandler & handler) const
|
||||
{
|
||||
const auto * storage = findStorage(id);
|
||||
auto storage = findStorage(id);
|
||||
if (!storage)
|
||||
return {};
|
||||
return storage->subscribeForChanges(id, handler);
|
||||
}
|
||||
|
||||
|
||||
ext::scope_guard MultipleAccessStorage::subscribeForChangesImpl(EntityType type, const OnChangedHandler & handler) const
|
||||
{
|
||||
ext::scope_guard subscriptions;
|
||||
for (const auto & nested_storage : nested_storages)
|
||||
subscriptions.join(nested_storage->subscribeForChanges(type, handler));
|
||||
return subscriptions;
|
||||
}
|
||||
|
||||
|
||||
bool MultipleAccessStorage::hasSubscriptionImpl(const UUID & id) const
|
||||
{
|
||||
for (const auto & nested_storage : nested_storages)
|
||||
auto storages = getStoragesInternal();
|
||||
for (const auto & storage : *storages)
|
||||
{
|
||||
if (nested_storage->hasSubscription(id))
|
||||
if (storage->hasSubscription(id))
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
ext::scope_guard MultipleAccessStorage::subscribeForChangesImpl(EntityType type, const OnChangedHandler & handler) const
|
||||
{
|
||||
std::unique_lock lock{mutex};
|
||||
auto & handlers = handlers_by_type[static_cast<size_t>(type)];
|
||||
handlers.push_back(handler);
|
||||
auto handler_it = std::prev(handlers.end());
|
||||
if (handlers.size() == 1)
|
||||
updateSubscriptionsToNestedStorages(lock);
|
||||
|
||||
return [this, type, handler_it]
|
||||
{
|
||||
std::unique_lock lock2{mutex};
|
||||
auto & handlers2 = handlers_by_type[static_cast<size_t>(type)];
|
||||
handlers2.erase(handler_it);
|
||||
if (handlers2.empty())
|
||||
updateSubscriptionsToNestedStorages(lock2);
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
bool MultipleAccessStorage::hasSubscriptionImpl(EntityType type) const
|
||||
{
|
||||
for (const auto & nested_storage : nested_storages)
|
||||
std::lock_guard lock{mutex};
|
||||
const auto & handlers = handlers_by_type[static_cast<size_t>(type)];
|
||||
return !handlers.empty();
|
||||
}
|
||||
|
||||
|
||||
/// Updates subscriptions to nested storages.
|
||||
/// We need the subscriptions to the nested storages if someone has subscribed to us.
|
||||
/// If any of the nested storages is changed we call our subscribers.
|
||||
void MultipleAccessStorage::updateSubscriptionsToNestedStorages(std::unique_lock<std::mutex> & lock) const
|
||||
{
|
||||
/// lock is already locked.
|
||||
|
||||
std::vector<std::pair<StoragePtr, ext::scope_guard>> added_subscriptions[static_cast<size_t>(EntityType::MAX)];
|
||||
std::vector<ext::scope_guard> removed_subscriptions;
|
||||
|
||||
for (auto type : ext::range(EntityType::MAX))
|
||||
{
|
||||
if (nested_storage->hasSubscription(type))
|
||||
return true;
|
||||
auto & handlers = handlers_by_type[static_cast<size_t>(type)];
|
||||
auto & subscriptions = subscriptions_to_nested_storages[static_cast<size_t>(type)];
|
||||
if (handlers.empty())
|
||||
{
|
||||
/// None has subscribed to us, we need no subscriptions to the nested storages.
|
||||
for (auto & subscription : subscriptions | boost::adaptors::map_values)
|
||||
removed_subscriptions.push_back(std::move(subscription));
|
||||
subscriptions.clear();
|
||||
}
|
||||
else
|
||||
{
|
||||
/// Someone has subscribed to us, now we need to have a subscription to each nested storage.
|
||||
for (auto it = subscriptions.begin(); it != subscriptions.end();)
|
||||
{
|
||||
const auto & storage = it->first;
|
||||
auto & subscription = it->second;
|
||||
if (boost::range::find(*nested_storages, storage) == nested_storages->end())
|
||||
{
|
||||
removed_subscriptions.push_back(std::move(subscription));
|
||||
it = subscriptions.erase(it);
|
||||
}
|
||||
else
|
||||
++it;
|
||||
}
|
||||
|
||||
for (const auto & storage : *nested_storages)
|
||||
{
|
||||
if (!subscriptions.count(storage))
|
||||
added_subscriptions[static_cast<size_t>(type)].push_back({storage, nullptr});
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
|
||||
/// Unlock the mutex temporarily because it's much better to subscribe to the nested storages
|
||||
/// with the mutex unlocked.
|
||||
lock.unlock();
|
||||
removed_subscriptions.clear();
|
||||
|
||||
for (auto type : ext::range(EntityType::MAX))
|
||||
{
|
||||
if (!added_subscriptions[static_cast<size_t>(type)].empty())
|
||||
{
|
||||
auto on_changed = [this, type](const UUID & id, const AccessEntityPtr & entity)
|
||||
{
|
||||
Notifications notifications;
|
||||
SCOPE_EXIT({ notify(notifications); });
|
||||
std::lock_guard lock2{mutex};
|
||||
for (const auto & handler : handlers_by_type[static_cast<size_t>(type)])
|
||||
notifications.push_back({handler, id, entity});
|
||||
};
|
||||
for (auto & [storage, subscription] : added_subscriptions[static_cast<size_t>(type)])
|
||||
subscription = storage->subscribeForChanges(type, on_changed);
|
||||
}
|
||||
}
|
||||
|
||||
/// Lock the mutex again to store added subscriptions to the nested storages.
|
||||
lock.lock();
|
||||
for (auto type : ext::range(EntityType::MAX))
|
||||
{
|
||||
if (!added_subscriptions[static_cast<size_t>(type)].empty())
|
||||
{
|
||||
auto & subscriptions = subscriptions_to_nested_storages[static_cast<size_t>(type)];
|
||||
for (auto & [storage, subscription] : added_subscriptions[static_cast<size_t>(type)])
|
||||
{
|
||||
if (!subscriptions.count(storage) && (boost::range::find(*nested_storages, storage) != nested_storages->end())
|
||||
&& !handlers_by_type[static_cast<size_t>(type)].empty())
|
||||
{
|
||||
subscriptions.emplace(std::move(storage), std::move(subscription));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
lock.unlock();
|
||||
added_subscriptions->clear();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -11,19 +11,27 @@ namespace DB
|
||||
class MultipleAccessStorage : public IAccessStorage
|
||||
{
|
||||
public:
|
||||
static constexpr char STORAGE_TYPE[] = "multiple";
|
||||
|
||||
using Storage = IAccessStorage;
|
||||
using StoragePtr = std::shared_ptr<Storage>;
|
||||
using ConstStoragePtr = std::shared_ptr<const Storage>;
|
||||
|
||||
MultipleAccessStorage(const String & storage_name_, std::vector<std::unique_ptr<Storage>> nested_storages_);
|
||||
MultipleAccessStorage(const String & storage_name_ = STORAGE_TYPE);
|
||||
|
||||
const Storage * findStorage(const UUID & id) const;
|
||||
Storage * findStorage(const UUID & id);
|
||||
const Storage & getStorage(const UUID & id) const;
|
||||
Storage & getStorage(const UUID & id);
|
||||
const char * getStorageType() const override { return STORAGE_TYPE; }
|
||||
|
||||
void addStorage(std::unique_ptr<Storage> nested_storage);
|
||||
void setStorages(const std::vector<StoragePtr> & storages);
|
||||
void addStorage(const StoragePtr & new_storage);
|
||||
void removeStorage(const StoragePtr & storage_to_remove);
|
||||
std::vector<StoragePtr> getStorages();
|
||||
std::vector<ConstStoragePtr> getStorages() const;
|
||||
std::shared_ptr<const std::vector<StoragePtr>> getStoragesPtr();
|
||||
|
||||
Storage & getStorageByIndex(size_t i) { return *(nested_storages[i]); }
|
||||
const Storage & getStorageByIndex(size_t i) const { return *(nested_storages[i]); }
|
||||
ConstStoragePtr findStorage(const UUID & id) const;
|
||||
StoragePtr findStorage(const UUID & id);
|
||||
ConstStoragePtr getStorage(const UUID & id) const;
|
||||
StoragePtr getStorage(const UUID & id);
|
||||
|
||||
protected:
|
||||
std::optional<UUID> findImpl(EntityType type, const String & name) const override;
|
||||
@ -41,9 +49,15 @@ protected:
|
||||
bool hasSubscriptionImpl(EntityType type) const override;
|
||||
|
||||
private:
|
||||
std::vector<std::unique_ptr<Storage>> nested_storages;
|
||||
mutable LRUCache<UUID, Storage *> ids_cache;
|
||||
mutable std::mutex ids_cache_mutex;
|
||||
using Storages = std::vector<StoragePtr>;
|
||||
std::shared_ptr<const Storages> getStoragesInternal() const;
|
||||
void updateSubscriptionsToNestedStorages(std::unique_lock<std::mutex> & lock) const;
|
||||
|
||||
std::shared_ptr<const Storages> nested_storages;
|
||||
mutable LRUCache<UUID, Storage> ids_cache;
|
||||
mutable std::list<OnChangedHandler> handlers_by_type[static_cast<size_t>(EntityType::MAX)];
|
||||
mutable std::unordered_map<StoragePtr, ext::scope_guard> subscriptions_to_nested_storages[static_cast<size_t>(EntityType::MAX)];
|
||||
mutable std::mutex mutex;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -4,6 +4,7 @@
|
||||
#include <Access/User.h>
|
||||
#include <Access/SettingsProfile.h>
|
||||
#include <Dictionaries/IDictionary.h>
|
||||
#include <Common/Config/ConfigReloader.h>
|
||||
#include <Common/StringUtils/StringUtils.h>
|
||||
#include <Common/quoteString.h>
|
||||
#include <Core/Settings.h>
|
||||
@ -13,6 +14,7 @@
|
||||
#include <boost/range/algorithm/copy.hpp>
|
||||
#include <boost/range/adaptor/map.hpp>
|
||||
#include <cstring>
|
||||
#include <filesystem>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -467,19 +469,28 @@ namespace
|
||||
}
|
||||
|
||||
|
||||
UsersConfigAccessStorage::UsersConfigAccessStorage() : IAccessStorage("users.xml")
|
||||
UsersConfigAccessStorage::UsersConfigAccessStorage(const CheckSettingNameFunction & check_setting_name_function_)
|
||||
: UsersConfigAccessStorage(STORAGE_TYPE, check_setting_name_function_)
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
void UsersConfigAccessStorage::setCheckSettingNameFunction(
|
||||
const std::function<void(const std::string_view &)> & check_setting_name_function_)
|
||||
UsersConfigAccessStorage::UsersConfigAccessStorage(const String & storage_name_, const CheckSettingNameFunction & check_setting_name_function_)
|
||||
: IAccessStorage(storage_name_), check_setting_name_function(check_setting_name_function_)
|
||||
{
|
||||
check_setting_name_function = check_setting_name_function_;
|
||||
}
|
||||
|
||||
UsersConfigAccessStorage::~UsersConfigAccessStorage() = default;
|
||||
|
||||
void UsersConfigAccessStorage::setConfiguration(const Poco::Util::AbstractConfiguration & config)
|
||||
|
||||
void UsersConfigAccessStorage::setConfig(const Poco::Util::AbstractConfiguration & config)
|
||||
{
|
||||
std::lock_guard lock{load_mutex};
|
||||
path.clear();
|
||||
config_reloader.reset();
|
||||
parseFromConfig(config);
|
||||
}
|
||||
|
||||
void UsersConfigAccessStorage::parseFromConfig(const Poco::Util::AbstractConfiguration & config)
|
||||
{
|
||||
std::vector<std::pair<UUID, AccessEntityPtr>> all_entities;
|
||||
for (const auto & entity : parseUsers(config, getLogger()))
|
||||
@ -493,6 +504,41 @@ void UsersConfigAccessStorage::setConfiguration(const Poco::Util::AbstractConfig
|
||||
memory_storage.setAll(all_entities);
|
||||
}
|
||||
|
||||
void UsersConfigAccessStorage::load(const String & users_config_path,
|
||||
const String & include_from_path,
|
||||
const String & preprocessed_dir,
|
||||
const zkutil::GetZooKeeper & get_zookeeper_function)
|
||||
{
|
||||
std::lock_guard lock{load_mutex};
|
||||
path = std::filesystem::canonical(users_config_path);
|
||||
config_reloader.reset();
|
||||
config_reloader = std::make_unique<ConfigReloader>(
|
||||
users_config_path,
|
||||
include_from_path,
|
||||
preprocessed_dir,
|
||||
zkutil::ZooKeeperNodeCache(get_zookeeper_function),
|
||||
std::make_shared<Poco::Event>(),
|
||||
[&](Poco::AutoPtr<Poco::Util::AbstractConfiguration> new_config)
|
||||
{
|
||||
parseFromConfig(*new_config);
|
||||
Settings::checkNoSettingNamesAtTopLevel(*new_config, users_config_path);
|
||||
},
|
||||
/* already_loaded = */ false);
|
||||
}
|
||||
|
||||
void UsersConfigAccessStorage::reload()
|
||||
{
|
||||
std::lock_guard lock{load_mutex};
|
||||
if (config_reloader)
|
||||
config_reloader->reload();
|
||||
}
|
||||
|
||||
void UsersConfigAccessStorage::startPeriodicReloading()
|
||||
{
|
||||
std::lock_guard lock{load_mutex};
|
||||
if (config_reloader)
|
||||
config_reloader->start();
|
||||
}
|
||||
|
||||
std::optional<UUID> UsersConfigAccessStorage::findImpl(EntityType type, const String & name) const
|
||||
{
|
||||
|
@ -1,29 +1,44 @@
|
||||
#pragma once
|
||||
|
||||
#include <Access/MemoryAccessStorage.h>
|
||||
#include <Common/ZooKeeper/Common.h>
|
||||
|
||||
|
||||
namespace Poco
|
||||
namespace Poco::Util
|
||||
{
|
||||
namespace Util
|
||||
{
|
||||
class AbstractConfiguration;
|
||||
}
|
||||
class AbstractConfiguration;
|
||||
}
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
class ConfigReloader;
|
||||
|
||||
/// Implementation of IAccessStorage which loads all from users.xml periodically.
|
||||
class UsersConfigAccessStorage : public IAccessStorage
|
||||
{
|
||||
public:
|
||||
UsersConfigAccessStorage();
|
||||
static constexpr char STORAGE_TYPE[] = "users.xml";
|
||||
using CheckSettingNameFunction = std::function<void(const std::string_view &)>;
|
||||
|
||||
void setCheckSettingNameFunction(const std::function<void(const std::string_view &)> & check_setting_name_function_);
|
||||
void setConfiguration(const Poco::Util::AbstractConfiguration & config);
|
||||
UsersConfigAccessStorage(const String & storage_name_ = STORAGE_TYPE, const CheckSettingNameFunction & check_setting_name_function_ = {});
|
||||
UsersConfigAccessStorage(const CheckSettingNameFunction & check_setting_name_function_);
|
||||
~UsersConfigAccessStorage() override;
|
||||
|
||||
const char * getStorageType() const override { return STORAGE_TYPE; }
|
||||
|
||||
void setConfig(const Poco::Util::AbstractConfiguration & config);
|
||||
|
||||
void load(const String & users_config_path,
|
||||
const String & include_from_path = {},
|
||||
const String & preprocessed_dir = {},
|
||||
const zkutil::GetZooKeeper & get_zookeeper_function = {});
|
||||
void reload();
|
||||
void startPeriodicReloading();
|
||||
|
||||
private:
|
||||
void parseFromConfig(const Poco::Util::AbstractConfiguration & config);
|
||||
|
||||
std::optional<UUID> findImpl(EntityType type, const String & name) const override;
|
||||
std::vector<UUID> findAllImpl(EntityType type) const override;
|
||||
bool existsImpl(const UUID & id) const override;
|
||||
@ -39,6 +54,10 @@ private:
|
||||
bool hasSubscriptionImpl(EntityType type) const override;
|
||||
|
||||
MemoryAccessStorage memory_storage;
|
||||
std::function<void(const std::string_view &)> check_setting_name_function;
|
||||
CheckSettingNameFunction check_setting_name_function;
|
||||
|
||||
String path;
|
||||
std::unique_ptr<ConfigReloader> config_reloader;
|
||||
mutable std::mutex load_mutex;
|
||||
};
|
||||
}
|
||||
|
@ -13,6 +13,7 @@ namespace ErrorCodes
|
||||
{
|
||||
extern const int THERE_IS_NO_PROFILE;
|
||||
extern const int NO_ELEMENTS_IN_CONFIG;
|
||||
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
|
||||
}
|
||||
|
||||
|
||||
@ -106,4 +107,27 @@ void Settings::addProgramOptions(boost::program_options::options_description & o
|
||||
field.getDescription())));
|
||||
}
|
||||
}
|
||||
|
||||
void Settings::checkNoSettingNamesAtTopLevel(const Poco::Util::AbstractConfiguration & config, const String & config_path)
|
||||
{
|
||||
if (config.getBool("skip_check_for_incorrect_settings", false))
|
||||
return;
|
||||
|
||||
Settings settings;
|
||||
for (auto setting : settings.all())
|
||||
{
|
||||
const auto & name = setting.getName();
|
||||
if (config.has(name))
|
||||
{
|
||||
throw Exception(fmt::format("A setting '{}' appeared at top level in config {}."
|
||||
" But it is user-level setting that should be located in users.xml inside <profiles> section for specific profile."
|
||||
" You can add it to <profiles><default> if you want to change default value of this setting."
|
||||
" You can also disable the check - specify <skip_check_for_incorrect_settings>1</skip_check_for_incorrect_settings>"
|
||||
" in the main configuration file.",
|
||||
name, config_path),
|
||||
ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -490,6 +490,10 @@ struct Settings : public BaseSettings<SettingsTraits>
|
||||
/// Adds program options to set the settings from a command line.
|
||||
/// (Don't forget to call notify() on the `variables_map` after parsing it!)
|
||||
void addProgramOptions(boost::program_options::options_description & options);
|
||||
|
||||
/// Check that there is no user-level settings at the top level in config.
|
||||
/// This is a common source of mistake (user don't know where to write user-level setting).
|
||||
static void checkNoSettingNamesAtTopLevel(const Poco::Util::AbstractConfiguration & config, const String & config_path);
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -112,9 +112,6 @@ void StorageSystemQuotaLimits::fillData(MutableColumns & res_columns, const Cont
|
||||
auto quota = access_control.tryRead<Quota>(id);
|
||||
if (!quota)
|
||||
continue;
|
||||
const auto * storage = access_control.findStorage(id);
|
||||
if (!storage)
|
||||
continue;
|
||||
|
||||
add_rows(quota->getName(), quota->all_limits);
|
||||
}
|
||||
|
@ -114,7 +114,7 @@ void StorageSystemQuotas::fillData(MutableColumns & res_columns, const Context &
|
||||
auto quota = access_control.tryRead<Quota>(id);
|
||||
if (!quota)
|
||||
continue;
|
||||
const auto * storage = access_control.findStorage(id);
|
||||
auto storage = access_control.findStorage(id);
|
||||
if (!storage)
|
||||
continue;
|
||||
|
||||
|
@ -49,7 +49,7 @@ void StorageSystemRoles::fillData(MutableColumns & res_columns, const Context &
|
||||
if (!role)
|
||||
continue;
|
||||
|
||||
const auto * storage = access_control.findStorage(id);
|
||||
auto storage = access_control.findStorage(id);
|
||||
if (!storage)
|
||||
continue;
|
||||
|
||||
|
@ -130,7 +130,7 @@ void StorageSystemRowPolicies::fillData(MutableColumns & res_columns, const Cont
|
||||
auto policy = access_control.tryRead<RowPolicy>(id);
|
||||
if (!policy)
|
||||
continue;
|
||||
const auto * storage = access_control.findStorage(id);
|
||||
auto storage = access_control.findStorage(id);
|
||||
if (!storage)
|
||||
continue;
|
||||
|
||||
|
@ -76,7 +76,7 @@ void StorageSystemSettingsProfiles::fillData(MutableColumns & res_columns, const
|
||||
if (!profile)
|
||||
continue;
|
||||
|
||||
const auto * storage = access_control.findStorage(id);
|
||||
auto storage = access_control.findStorage(id);
|
||||
if (!storage)
|
||||
continue;
|
||||
|
||||
|
@ -165,7 +165,7 @@ void StorageSystemUsers::fillData(MutableColumns & res_columns, const Context &
|
||||
if (!user)
|
||||
continue;
|
||||
|
||||
const auto * storage = access_control.findStorage(id);
|
||||
auto storage = access_control.findStorage(id);
|
||||
if (!storage)
|
||||
continue;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user