mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 16:42:05 +00:00
Merge pull request #27426 from aiven/kmichel-replicated-access-storage
ZooKeeper replication for users, roles, row policies, quotas and profiles.
This commit is contained in:
commit
fd33f2a2fe
@ -1247,6 +1247,7 @@ Default value: `/var/lib/clickhouse/access/`.
|
||||
Section of the configuration file that contains settings:
|
||||
- Path to configuration file with predefined users.
|
||||
- Path to folder where users created by SQL commands are stored.
|
||||
- ZooKeeper node path where users created by SQL commands are stored and replicated (experimental).
|
||||
|
||||
If this section is specified, the path from [users_config](../../operations/server-configuration-parameters/settings.md#users-config) and [access_control_path](../../operations/server-configuration-parameters/settings.md#access_control_path) won't be used.
|
||||
|
||||
@ -1262,6 +1263,9 @@ The `user_directories` section can contain any number of items, the order of the
|
||||
<local_directory>
|
||||
<path>/var/lib/clickhouse/access/</path>
|
||||
</local_directory>
|
||||
<replicated>
|
||||
<zookeeper_path>/clickhouse/access/</zookeeper_path>
|
||||
</replicated>
|
||||
</user_directories>
|
||||
```
|
||||
|
||||
|
@ -1,6 +1,7 @@
|
||||
#include <Access/AccessControlManager.h>
|
||||
#include <Access/MultipleAccessStorage.h>
|
||||
#include <Access/MemoryAccessStorage.h>
|
||||
#include <Access/ReplicatedAccessStorage.h>
|
||||
#include <Access/UsersConfigAccessStorage.h>
|
||||
#include <Access/DiskAccessStorage.h>
|
||||
#include <Access/LDAPAccessStorage.h>
|
||||
@ -225,6 +226,22 @@ void AccessControlManager::startPeriodicReloadingUsersConfigs()
|
||||
}
|
||||
}
|
||||
|
||||
void AccessControlManager::addReplicatedStorage(
|
||||
const String & storage_name_,
|
||||
const String & zookeeper_path_,
|
||||
const zkutil::GetZooKeeper & get_zookeeper_function_)
|
||||
{
|
||||
auto storages = getStoragesPtr();
|
||||
for (const auto & storage : *storages)
|
||||
{
|
||||
if (auto replicated_storage = typeid_cast<std::shared_ptr<ReplicatedAccessStorage>>(storage))
|
||||
return;
|
||||
}
|
||||
auto new_storage = std::make_shared<ReplicatedAccessStorage>(storage_name_, zookeeper_path_, get_zookeeper_function_);
|
||||
addStorage(new_storage);
|
||||
LOG_DEBUG(getLogger(), "Added {} access storage '{}'", String(new_storage->getStorageType()), new_storage->getStorageName());
|
||||
new_storage->startup();
|
||||
}
|
||||
|
||||
void AccessControlManager::addDiskStorage(const String & directory_, bool readonly_)
|
||||
{
|
||||
@ -322,6 +339,11 @@ void AccessControlManager::addStoragesFromUserDirectoriesConfig(
|
||||
{
|
||||
addLDAPStorage(name, config, prefix);
|
||||
}
|
||||
else if (type == ReplicatedAccessStorage::STORAGE_TYPE)
|
||||
{
|
||||
String zookeeper_path = config.getString(prefix + ".zookeeper_path");
|
||||
addReplicatedStorage(name, zookeeper_path, get_zookeeper_function);
|
||||
}
|
||||
else
|
||||
throw Exception("Unknown storage type '" + type + "' at " + prefix + " in config", ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
|
||||
}
|
||||
|
@ -84,6 +84,10 @@ public:
|
||||
/// Adds LDAPAccessStorage which allows querying remote LDAP server for user info.
|
||||
void addLDAPStorage(const String & storage_name_, const Poco::Util::AbstractConfiguration & config_, const String & prefix_);
|
||||
|
||||
void addReplicatedStorage(const String & storage_name,
|
||||
const String & zookeeper_path,
|
||||
const zkutil::GetZooKeeper & get_zookeeper_function);
|
||||
|
||||
/// Adds storages from <users_directories> config.
|
||||
void addStoragesFromUserDirectoriesConfig(const Poco::Util::AbstractConfiguration & config,
|
||||
const String & key,
|
||||
|
175
src/Access/AccessEntityIO.cpp
Normal file
175
src/Access/AccessEntityIO.cpp
Normal file
@ -0,0 +1,175 @@
|
||||
#include <Access/AccessEntityIO.h>
|
||||
#include <Access/IAccessEntity.h>
|
||||
#include <Access/IAccessStorage.h>
|
||||
#include <Access/Quota.h>
|
||||
#include <Access/Role.h>
|
||||
#include <Access/RowPolicy.h>
|
||||
#include <Access/SettingsProfile.h>
|
||||
#include <Access/User.h>
|
||||
#include <Core/Defines.h>
|
||||
#include <Interpreters/InterpreterCreateQuotaQuery.h>
|
||||
#include <Interpreters/InterpreterCreateRoleQuery.h>
|
||||
#include <Interpreters/InterpreterCreateRowPolicyQuery.h>
|
||||
#include <Interpreters/InterpreterCreateSettingsProfileQuery.h>
|
||||
#include <Interpreters/InterpreterCreateUserQuery.h>
|
||||
#include <Interpreters/InterpreterGrantQuery.h>
|
||||
#include <Interpreters/InterpreterShowCreateAccessEntityQuery.h>
|
||||
#include <Interpreters/InterpreterShowGrantsQuery.h>
|
||||
#include <Parsers/ASTCreateQuotaQuery.h>
|
||||
#include <Parsers/ASTCreateRoleQuery.h>
|
||||
#include <Parsers/ASTCreateRowPolicyQuery.h>
|
||||
#include <Parsers/ASTCreateSettingsProfileQuery.h>
|
||||
#include <Parsers/ASTCreateUserQuery.h>
|
||||
#include <Parsers/ASTGrantQuery.h>
|
||||
#include <Parsers/ParserCreateQuotaQuery.h>
|
||||
#include <Parsers/ParserCreateRoleQuery.h>
|
||||
#include <Parsers/ParserCreateRowPolicyQuery.h>
|
||||
#include <Parsers/ParserCreateSettingsProfileQuery.h>
|
||||
#include <Parsers/ParserCreateUserQuery.h>
|
||||
#include <Parsers/ParserGrantQuery.h>
|
||||
#include <Parsers/formatAST.h>
|
||||
#include <Parsers/parseQuery.h>
|
||||
#include <boost/range/algorithm/copy.hpp>
|
||||
#include <boost/range/algorithm_ext/push_back.hpp>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int INCORRECT_ACCESS_ENTITY_DEFINITION;
|
||||
}
|
||||
|
||||
using EntityType = IAccessStorage::EntityType;
|
||||
using EntityTypeInfo = IAccessStorage::EntityTypeInfo;
|
||||
|
||||
namespace
|
||||
{
|
||||
/// Special parser for the 'ATTACH access entity' queries.
|
||||
class ParserAttachAccessEntity : public IParserBase
|
||||
{
|
||||
protected:
|
||||
const char * getName() const override { return "ATTACH access entity query"; }
|
||||
|
||||
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override
|
||||
{
|
||||
ParserCreateUserQuery create_user_p;
|
||||
ParserCreateRoleQuery create_role_p;
|
||||
ParserCreateRowPolicyQuery create_policy_p;
|
||||
ParserCreateQuotaQuery create_quota_p;
|
||||
ParserCreateSettingsProfileQuery create_profile_p;
|
||||
ParserGrantQuery grant_p;
|
||||
|
||||
create_user_p.useAttachMode();
|
||||
create_role_p.useAttachMode();
|
||||
create_policy_p.useAttachMode();
|
||||
create_quota_p.useAttachMode();
|
||||
create_profile_p.useAttachMode();
|
||||
grant_p.useAttachMode();
|
||||
|
||||
return create_user_p.parse(pos, node, expected) || create_role_p.parse(pos, node, expected)
|
||||
|| create_policy_p.parse(pos, node, expected) || create_quota_p.parse(pos, node, expected)
|
||||
|| create_profile_p.parse(pos, node, expected) || grant_p.parse(pos, node, expected);
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
|
||||
String serializeAccessEntity(const IAccessEntity & entity)
|
||||
{
|
||||
/// Build list of ATTACH queries.
|
||||
ASTs queries;
|
||||
queries.push_back(InterpreterShowCreateAccessEntityQuery::getAttachQuery(entity));
|
||||
if ((entity.getType() == EntityType::USER) || (entity.getType() == EntityType::ROLE))
|
||||
boost::range::push_back(queries, InterpreterShowGrantsQuery::getAttachGrantQueries(entity));
|
||||
|
||||
/// Serialize the list of ATTACH queries to a string.
|
||||
WriteBufferFromOwnString buf;
|
||||
for (const ASTPtr & query : queries)
|
||||
{
|
||||
formatAST(*query, buf, false, true);
|
||||
buf.write(";\n", 2);
|
||||
}
|
||||
return buf.str();
|
||||
}
|
||||
|
||||
AccessEntityPtr deserializeAccessEntity(const String & definition, const String & path)
|
||||
{
|
||||
ASTs queries;
|
||||
ParserAttachAccessEntity parser;
|
||||
const char * begin = definition.data(); /// begin of current query
|
||||
const char * pos = begin; /// parser moves pos from begin to the end of current query
|
||||
const char * end = begin + definition.size();
|
||||
while (pos < end)
|
||||
{
|
||||
queries.emplace_back(parseQueryAndMovePosition(parser, pos, end, "", true, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH));
|
||||
while (isWhitespaceASCII(*pos) || *pos == ';')
|
||||
++pos;
|
||||
}
|
||||
|
||||
/// Interpret the AST to build an access entity.
|
||||
std::shared_ptr<User> user;
|
||||
std::shared_ptr<Role> role;
|
||||
std::shared_ptr<RowPolicy> policy;
|
||||
std::shared_ptr<Quota> quota;
|
||||
std::shared_ptr<SettingsProfile> profile;
|
||||
AccessEntityPtr res;
|
||||
|
||||
for (const auto & query : queries)
|
||||
{
|
||||
if (auto * create_user_query = query->as<ASTCreateUserQuery>())
|
||||
{
|
||||
if (res)
|
||||
throw Exception("Two access entities attached in " + path, ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
|
||||
res = user = std::make_unique<User>();
|
||||
InterpreterCreateUserQuery::updateUserFromQuery(*user, *create_user_query);
|
||||
}
|
||||
else if (auto * create_role_query = query->as<ASTCreateRoleQuery>())
|
||||
{
|
||||
if (res)
|
||||
throw Exception("Two access entities attached in " + path, ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
|
||||
res = role = std::make_unique<Role>();
|
||||
InterpreterCreateRoleQuery::updateRoleFromQuery(*role, *create_role_query);
|
||||
}
|
||||
else if (auto * create_policy_query = query->as<ASTCreateRowPolicyQuery>())
|
||||
{
|
||||
if (res)
|
||||
throw Exception("Two access entities attached in " + path, ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
|
||||
res = policy = std::make_unique<RowPolicy>();
|
||||
InterpreterCreateRowPolicyQuery::updateRowPolicyFromQuery(*policy, *create_policy_query);
|
||||
}
|
||||
else if (auto * create_quota_query = query->as<ASTCreateQuotaQuery>())
|
||||
{
|
||||
if (res)
|
||||
throw Exception("Two access entities attached in " + path, ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
|
||||
res = quota = std::make_unique<Quota>();
|
||||
InterpreterCreateQuotaQuery::updateQuotaFromQuery(*quota, *create_quota_query);
|
||||
}
|
||||
else if (auto * create_profile_query = query->as<ASTCreateSettingsProfileQuery>())
|
||||
{
|
||||
if (res)
|
||||
throw Exception("Two access entities attached in " + path, ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
|
||||
res = profile = std::make_unique<SettingsProfile>();
|
||||
InterpreterCreateSettingsProfileQuery::updateSettingsProfileFromQuery(*profile, *create_profile_query);
|
||||
}
|
||||
else if (auto * grant_query = query->as<ASTGrantQuery>())
|
||||
{
|
||||
if (!user && !role)
|
||||
throw Exception(
|
||||
"A user or role should be attached before grant in " + path, ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
|
||||
if (user)
|
||||
InterpreterGrantQuery::updateUserFromQuery(*user, *grant_query);
|
||||
else
|
||||
InterpreterGrantQuery::updateRoleFromQuery(*role, *grant_query);
|
||||
}
|
||||
else
|
||||
throw Exception("No interpreter found for query " + query->getID(), ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
|
||||
}
|
||||
|
||||
if (!res)
|
||||
throw Exception("No access entities attached in " + path, ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
}
|
12
src/Access/AccessEntityIO.h
Normal file
12
src/Access/AccessEntityIO.h
Normal file
@ -0,0 +1,12 @@
|
||||
#pragma once
|
||||
|
||||
#include <Access/IAccessEntity.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
String serializeAccessEntity(const IAccessEntity & entity);
|
||||
|
||||
AccessEntityPtr deserializeAccessEntity(const String & definition, const String & path);
|
||||
|
||||
}
|
@ -4,41 +4,20 @@
|
||||
#include <IO/ReadBufferFromFile.h>
|
||||
#include <IO/WriteBufferFromFile.h>
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
#include <Access/AccessEntityIO.h>
|
||||
#include <Access/User.h>
|
||||
#include <Access/Role.h>
|
||||
#include <Access/RowPolicy.h>
|
||||
#include <Access/Quota.h>
|
||||
#include <Access/SettingsProfile.h>
|
||||
#include <Parsers/ASTCreateUserQuery.h>
|
||||
#include <Parsers/ASTCreateRoleQuery.h>
|
||||
#include <Parsers/ASTCreateRowPolicyQuery.h>
|
||||
#include <Parsers/ASTCreateQuotaQuery.h>
|
||||
#include <Parsers/ASTCreateSettingsProfileQuery.h>
|
||||
#include <Parsers/ASTGrantQuery.h>
|
||||
#include <Parsers/ParserCreateUserQuery.h>
|
||||
#include <Parsers/ParserCreateRoleQuery.h>
|
||||
#include <Parsers/ParserCreateRowPolicyQuery.h>
|
||||
#include <Parsers/ParserCreateQuotaQuery.h>
|
||||
#include <Parsers/ParserCreateSettingsProfileQuery.h>
|
||||
#include <Parsers/ParserGrantQuery.h>
|
||||
#include <Parsers/formatAST.h>
|
||||
#include <Parsers/parseQuery.h>
|
||||
#include <Interpreters/InterpreterCreateUserQuery.h>
|
||||
#include <Interpreters/InterpreterCreateRoleQuery.h>
|
||||
#include <Interpreters/InterpreterCreateRowPolicyQuery.h>
|
||||
#include <Interpreters/InterpreterCreateQuotaQuery.h>
|
||||
#include <Interpreters/InterpreterCreateSettingsProfileQuery.h>
|
||||
#include <Interpreters/InterpreterGrantQuery.h>
|
||||
#include <Interpreters/InterpreterShowCreateAccessEntityQuery.h>
|
||||
#include <Interpreters/InterpreterShowGrantsQuery.h>
|
||||
#include <Common/quoteString.h>
|
||||
#include <Core/Defines.h>
|
||||
#include <Poco/JSON/JSON.h>
|
||||
#include <Poco/JSON/Object.h>
|
||||
#include <Poco/JSON/Stringifier.h>
|
||||
#include <boost/range/adaptor/map.hpp>
|
||||
#include <boost/range/algorithm/copy.hpp>
|
||||
#include <boost/range/algorithm_ext/push_back.hpp>
|
||||
#include <filesystem>
|
||||
#include <fstream>
|
||||
|
||||
@ -49,7 +28,6 @@ namespace ErrorCodes
|
||||
{
|
||||
extern const int DIRECTORY_DOESNT_EXIST;
|
||||
extern const int FILE_DOESNT_EXIST;
|
||||
extern const int INCORRECT_ACCESS_ENTITY_DEFINITION;
|
||||
}
|
||||
|
||||
|
||||
@ -58,34 +36,6 @@ namespace
|
||||
using EntityType = IAccessStorage::EntityType;
|
||||
using EntityTypeInfo = IAccessStorage::EntityTypeInfo;
|
||||
|
||||
/// Special parser for the 'ATTACH access entity' queries.
|
||||
class ParserAttachAccessEntity : public IParserBase
|
||||
{
|
||||
protected:
|
||||
const char * getName() const override { return "ATTACH access entity query"; }
|
||||
|
||||
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override
|
||||
{
|
||||
ParserCreateUserQuery create_user_p;
|
||||
ParserCreateRoleQuery create_role_p;
|
||||
ParserCreateRowPolicyQuery create_policy_p;
|
||||
ParserCreateQuotaQuery create_quota_p;
|
||||
ParserCreateSettingsProfileQuery create_profile_p;
|
||||
ParserGrantQuery grant_p;
|
||||
|
||||
create_user_p.useAttachMode();
|
||||
create_role_p.useAttachMode();
|
||||
create_policy_p.useAttachMode();
|
||||
create_quota_p.useAttachMode();
|
||||
create_profile_p.useAttachMode();
|
||||
grant_p.useAttachMode();
|
||||
|
||||
return create_user_p.parse(pos, node, expected) || create_role_p.parse(pos, node, expected)
|
||||
|| create_policy_p.parse(pos, node, expected) || create_quota_p.parse(pos, node, expected)
|
||||
|| create_profile_p.parse(pos, node, expected) || grant_p.parse(pos, node, expected);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
/// Reads a file containing ATTACH queries and then parses it to build an access entity.
|
||||
AccessEntityPtr readEntityFile(const String & file_path)
|
||||
@ -96,80 +46,7 @@ namespace
|
||||
readStringUntilEOF(file_contents, in);
|
||||
|
||||
/// Parse the file contents.
|
||||
ASTs queries;
|
||||
ParserAttachAccessEntity parser;
|
||||
const char * begin = file_contents.data(); /// begin of current query
|
||||
const char * pos = begin; /// parser moves pos from begin to the end of current query
|
||||
const char * end = begin + file_contents.size();
|
||||
while (pos < end)
|
||||
{
|
||||
queries.emplace_back(parseQueryAndMovePosition(parser, pos, end, "", true, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH));
|
||||
while (isWhitespaceASCII(*pos) || *pos == ';')
|
||||
++pos;
|
||||
}
|
||||
|
||||
/// Interpret the AST to build an access entity.
|
||||
std::shared_ptr<User> user;
|
||||
std::shared_ptr<Role> role;
|
||||
std::shared_ptr<RowPolicy> policy;
|
||||
std::shared_ptr<Quota> quota;
|
||||
std::shared_ptr<SettingsProfile> profile;
|
||||
AccessEntityPtr res;
|
||||
|
||||
for (const auto & query : queries)
|
||||
{
|
||||
if (auto * create_user_query = query->as<ASTCreateUserQuery>())
|
||||
{
|
||||
if (res)
|
||||
throw Exception("Two access entities in one file " + file_path, ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
|
||||
res = user = std::make_unique<User>();
|
||||
InterpreterCreateUserQuery::updateUserFromQuery(*user, *create_user_query);
|
||||
}
|
||||
else if (auto * create_role_query = query->as<ASTCreateRoleQuery>())
|
||||
{
|
||||
if (res)
|
||||
throw Exception("Two access entities in one file " + file_path, ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
|
||||
res = role = std::make_unique<Role>();
|
||||
InterpreterCreateRoleQuery::updateRoleFromQuery(*role, *create_role_query);
|
||||
}
|
||||
else if (auto * create_policy_query = query->as<ASTCreateRowPolicyQuery>())
|
||||
{
|
||||
if (res)
|
||||
throw Exception("Two access entities in one file " + file_path, ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
|
||||
res = policy = std::make_unique<RowPolicy>();
|
||||
InterpreterCreateRowPolicyQuery::updateRowPolicyFromQuery(*policy, *create_policy_query);
|
||||
}
|
||||
else if (auto * create_quota_query = query->as<ASTCreateQuotaQuery>())
|
||||
{
|
||||
if (res)
|
||||
throw Exception("Two access entities are attached in the same file " + file_path, ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
|
||||
res = quota = std::make_unique<Quota>();
|
||||
InterpreterCreateQuotaQuery::updateQuotaFromQuery(*quota, *create_quota_query);
|
||||
}
|
||||
else if (auto * create_profile_query = query->as<ASTCreateSettingsProfileQuery>())
|
||||
{
|
||||
if (res)
|
||||
throw Exception("Two access entities are attached in the same file " + file_path, ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
|
||||
res = profile = std::make_unique<SettingsProfile>();
|
||||
InterpreterCreateSettingsProfileQuery::updateSettingsProfileFromQuery(*profile, *create_profile_query);
|
||||
}
|
||||
else if (auto * grant_query = query->as<ASTGrantQuery>())
|
||||
{
|
||||
if (!user && !role)
|
||||
throw Exception("A user or role should be attached before grant in file " + file_path, ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
|
||||
if (user)
|
||||
InterpreterGrantQuery::updateUserFromQuery(*user, *grant_query);
|
||||
else
|
||||
InterpreterGrantQuery::updateRoleFromQuery(*role, *grant_query);
|
||||
}
|
||||
else
|
||||
throw Exception("No interpreter found for query " + query->getID(), ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
|
||||
}
|
||||
|
||||
if (!res)
|
||||
throw Exception("No access entities attached in file " + file_path, ErrorCodes::INCORRECT_ACCESS_ENTITY_DEFINITION);
|
||||
|
||||
return res;
|
||||
return deserializeAccessEntity(file_contents, file_path);
|
||||
}
|
||||
|
||||
|
||||
@ -186,24 +63,10 @@ namespace
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// Writes ATTACH queries for building a specified access entity to a file.
|
||||
void writeEntityFile(const String & file_path, const IAccessEntity & entity)
|
||||
{
|
||||
/// Build list of ATTACH queries.
|
||||
ASTs queries;
|
||||
queries.push_back(InterpreterShowCreateAccessEntityQuery::getAttachQuery(entity));
|
||||
if ((entity.getType() == EntityType::USER) || (entity.getType() == EntityType::ROLE))
|
||||
boost::range::push_back(queries, InterpreterShowGrantsQuery::getAttachGrantQueries(entity));
|
||||
|
||||
/// Serialize the list of ATTACH queries to a string.
|
||||
WriteBufferFromOwnString buf;
|
||||
for (const ASTPtr & query : queries)
|
||||
{
|
||||
formatAST(*query, buf, false, true);
|
||||
buf.write(";\n", 2);
|
||||
}
|
||||
String file_contents = buf.str();
|
||||
String file_contents = serializeAccessEntity(entity);
|
||||
|
||||
/// First we save *.tmp file and then we rename if everything's ok.
|
||||
auto tmp_file_path = std::filesystem::path{file_path}.replace_extension(".tmp");
|
||||
|
618
src/Access/ReplicatedAccessStorage.cpp
Normal file
618
src/Access/ReplicatedAccessStorage.cpp
Normal file
@ -0,0 +1,618 @@
|
||||
#include <Access/AccessEntityIO.h>
|
||||
#include <Access/MemoryAccessStorage.h>
|
||||
#include <Access/ReplicatedAccessStorage.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <boost/container/flat_set.hpp>
|
||||
#include <Common/ZooKeeper/KeeperException.h>
|
||||
#include <Common/ZooKeeper/Types.h>
|
||||
#include <Common/ZooKeeper/ZooKeeper.h>
|
||||
#include <Common/escapeForFileName.h>
|
||||
#include <common/range.h>
|
||||
#include <common/sleep.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int BAD_ARGUMENTS;
|
||||
extern const int NO_ZOOKEEPER;
|
||||
}
|
||||
|
||||
static UUID parseUUID(const String & text)
|
||||
{
|
||||
UUID uuid = UUIDHelpers::Nil;
|
||||
auto buffer = ReadBufferFromMemory(text.data(), text.length());
|
||||
readUUIDText(uuid, buffer);
|
||||
return uuid;
|
||||
}
|
||||
|
||||
ReplicatedAccessStorage::ReplicatedAccessStorage(
|
||||
const String & storage_name_,
|
||||
const String & zookeeper_path_,
|
||||
zkutil::GetZooKeeper get_zookeeper_)
|
||||
: IAccessStorage(storage_name_)
|
||||
, zookeeper_path(zookeeper_path_)
|
||||
, get_zookeeper(get_zookeeper_)
|
||||
{
|
||||
if (zookeeper_path.empty())
|
||||
throw Exception("ZooKeeper path must be non-empty", ErrorCodes::BAD_ARGUMENTS);
|
||||
|
||||
if (zookeeper_path.back() == '/')
|
||||
zookeeper_path.resize(zookeeper_path.size() - 1);
|
||||
|
||||
/// If zookeeper chroot prefix is used, path should start with '/', because chroot concatenates without it.
|
||||
if (zookeeper_path.front() != '/')
|
||||
zookeeper_path = "/" + zookeeper_path;
|
||||
}
|
||||
|
||||
ReplicatedAccessStorage::~ReplicatedAccessStorage()
|
||||
{
|
||||
ReplicatedAccessStorage::shutdown();
|
||||
}
|
||||
|
||||
|
||||
void ReplicatedAccessStorage::startup()
|
||||
{
|
||||
initializeZookeeper();
|
||||
worker_thread = ThreadFromGlobalPool(&ReplicatedAccessStorage::runWorkerThread, this);
|
||||
}
|
||||
|
||||
void ReplicatedAccessStorage::shutdown()
|
||||
{
|
||||
bool prev_stop_flag = stop_flag.exchange(true);
|
||||
if (!prev_stop_flag)
|
||||
{
|
||||
/// Notify the worker thread to stop waiting for new queue items
|
||||
refresh_queue.push(UUIDHelpers::Nil);
|
||||
worker_thread.join();
|
||||
}
|
||||
}
|
||||
|
||||
template <typename Func>
|
||||
static void retryOnZooKeeperUserError(size_t attempts, Func && function)
|
||||
{
|
||||
while (attempts > 0)
|
||||
{
|
||||
try
|
||||
{
|
||||
function();
|
||||
return;
|
||||
}
|
||||
catch (zkutil::KeeperException & keeper_exception)
|
||||
{
|
||||
if (Coordination::isUserError(keeper_exception.code) && attempts > 1)
|
||||
attempts -= 1;
|
||||
else
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
UUID ReplicatedAccessStorage::insertImpl(const AccessEntityPtr & new_entity, bool replace_if_exists)
|
||||
{
|
||||
const UUID id = generateRandomID();
|
||||
const EntityTypeInfo type_info = EntityTypeInfo::get(new_entity->getType());
|
||||
const String & name = new_entity->getName();
|
||||
LOG_DEBUG(getLogger(), "Inserting entity of type {} named {} with id {}", type_info.name, name, toString(id));
|
||||
|
||||
auto zookeeper = get_zookeeper();
|
||||
retryOnZooKeeperUserError(10, [&]{ insertZooKeeper(zookeeper, id, new_entity, replace_if_exists); });
|
||||
|
||||
Notifications notifications;
|
||||
SCOPE_EXIT({ notify(notifications); });
|
||||
std::lock_guard lock{mutex};
|
||||
refreshEntityNoLock(zookeeper, id, notifications);
|
||||
return id;
|
||||
}
|
||||
|
||||
|
||||
void ReplicatedAccessStorage::insertZooKeeper(
|
||||
const zkutil::ZooKeeperPtr & zookeeper, const UUID & id, const AccessEntityPtr & new_entity, bool replace_if_exists)
|
||||
{
|
||||
const String & name = new_entity->getName();
|
||||
const EntityType type = new_entity->getType();
|
||||
const EntityTypeInfo type_info = EntityTypeInfo::get(type);
|
||||
|
||||
const String entity_uuid = toString(id);
|
||||
/// The entity data will be stored here, this ensures all entities have unique ids
|
||||
const String entity_path = zookeeper_path + "/uuid/" + entity_uuid;
|
||||
/// Then we create a znode with the entity name, inside the znode of each entity type
|
||||
/// This ensure all entities of the same type have a unique name
|
||||
const String name_path = zookeeper_path + "/" + type_info.unique_char + "/" + escapeForFileName(name);
|
||||
|
||||
Coordination::Requests ops;
|
||||
const String new_entity_definition = serializeAccessEntity(*new_entity);
|
||||
ops.emplace_back(zkutil::makeCreateRequest(entity_path, new_entity_definition, zkutil::CreateMode::Persistent));
|
||||
/// The content of the "name" znode is the uuid of the entity owning that name
|
||||
ops.emplace_back(zkutil::makeCreateRequest(name_path, entity_uuid, zkutil::CreateMode::Persistent));
|
||||
|
||||
Coordination::Responses responses;
|
||||
const Coordination::Error res = zookeeper->tryMulti(ops, responses);
|
||||
if (res == Coordination::Error::ZNODEEXISTS)
|
||||
{
|
||||
if (responses[0]->error == Coordination::Error::ZNODEEXISTS)
|
||||
{
|
||||
/// The UUID already exists, simply fail.
|
||||
|
||||
/// To fail with a nice error message, we need info about what already exists.
|
||||
/// This itself could fail if the conflicting uuid disappears in the meantime.
|
||||
/// If that happens, then we'll just retry from the start.
|
||||
String existing_entity_definition = zookeeper->get(entity_path);
|
||||
|
||||
AccessEntityPtr existing_entity = deserializeAccessEntity(existing_entity_definition, entity_path);
|
||||
EntityType existing_type = existing_entity->getType();
|
||||
String existing_name = existing_entity->getName();
|
||||
throwIDCollisionCannotInsert(id, type, name, existing_type, existing_name);
|
||||
}
|
||||
else if (replace_if_exists)
|
||||
{
|
||||
/// The name already exists for this type.
|
||||
/// If asked to, we need to replace the existing entity.
|
||||
|
||||
/// First get the uuid of the existing entity
|
||||
/// This itself could fail if the conflicting name disappears in the meantime.
|
||||
/// If that happens, then we'll just retry from the start.
|
||||
Coordination::Stat name_stat;
|
||||
String existing_entity_uuid = zookeeper->get(name_path, &name_stat);
|
||||
|
||||
const String existing_entity_path = zookeeper_path + "/uuid/" + existing_entity_uuid;
|
||||
Coordination::Requests replace_ops;
|
||||
replace_ops.emplace_back(zkutil::makeRemoveRequest(existing_entity_path, -1));
|
||||
replace_ops.emplace_back(zkutil::makeCreateRequest(entity_path, new_entity_definition, zkutil::CreateMode::Persistent));
|
||||
replace_ops.emplace_back(zkutil::makeSetRequest(name_path, entity_uuid, name_stat.version));
|
||||
|
||||
/// If this fails, then we'll just retry from the start.
|
||||
zookeeper->multi(replace_ops);
|
||||
}
|
||||
else
|
||||
{
|
||||
throwNameCollisionCannotInsert(type, name);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
zkutil::KeeperMultiException::check(res, ops, responses);
|
||||
}
|
||||
}
|
||||
|
||||
void ReplicatedAccessStorage::removeImpl(const UUID & id)
|
||||
{
|
||||
LOG_DEBUG(getLogger(), "Removing entity {}", toString(id));
|
||||
|
||||
auto zookeeper = get_zookeeper();
|
||||
retryOnZooKeeperUserError(10, [&] { removeZooKeeper(zookeeper, id); });
|
||||
|
||||
Notifications notifications;
|
||||
SCOPE_EXIT({ notify(notifications); });
|
||||
std::lock_guard lock{mutex};
|
||||
removeEntityNoLock(id, notifications);
|
||||
}
|
||||
|
||||
|
||||
void ReplicatedAccessStorage::removeZooKeeper(const zkutil::ZooKeeperPtr & zookeeper, const UUID & id)
|
||||
{
|
||||
const String entity_uuid = toString(id);
|
||||
const String entity_path = zookeeper_path + "/uuid/" + entity_uuid;
|
||||
|
||||
String entity_definition;
|
||||
Coordination::Stat entity_stat;
|
||||
const bool uuid_exists = zookeeper->tryGet(entity_path, entity_definition, &entity_stat);
|
||||
if (!uuid_exists)
|
||||
throwNotFound(id);
|
||||
|
||||
const AccessEntityPtr entity = deserializeAccessEntity(entity_definition, entity_path);
|
||||
const EntityTypeInfo type_info = EntityTypeInfo::get(entity->getType());
|
||||
const String & name = entity->getName();
|
||||
|
||||
const String entity_name_path = zookeeper_path + "/" + type_info.unique_char + "/" + escapeForFileName(name);
|
||||
|
||||
Coordination::Requests ops;
|
||||
ops.emplace_back(zkutil::makeRemoveRequest(entity_path, entity_stat.version));
|
||||
ops.emplace_back(zkutil::makeRemoveRequest(entity_name_path, -1));
|
||||
/// If this fails, then we'll just retry from the start.
|
||||
zookeeper->multi(ops);
|
||||
}
|
||||
|
||||
|
||||
void ReplicatedAccessStorage::updateImpl(const UUID & id, const UpdateFunc & update_func)
|
||||
{
|
||||
LOG_DEBUG(getLogger(), "Updating entity {}", toString(id));
|
||||
|
||||
auto zookeeper = get_zookeeper();
|
||||
retryOnZooKeeperUserError(10, [&] { updateZooKeeper(zookeeper, id, update_func); });
|
||||
|
||||
Notifications notifications;
|
||||
SCOPE_EXIT({ notify(notifications); });
|
||||
std::lock_guard lock{mutex};
|
||||
refreshEntityNoLock(zookeeper, id, notifications);
|
||||
}
|
||||
|
||||
|
||||
void ReplicatedAccessStorage::updateZooKeeper(const zkutil::ZooKeeperPtr & zookeeper, const UUID & id, const UpdateFunc & update_func)
|
||||
{
|
||||
const String entity_uuid = toString(id);
|
||||
const String entity_path = zookeeper_path + "/uuid/" + entity_uuid;
|
||||
|
||||
String old_entity_definition;
|
||||
Coordination::Stat stat;
|
||||
const bool uuid_exists = zookeeper->tryGet(entity_path, old_entity_definition, &stat);
|
||||
if (!uuid_exists)
|
||||
throwNotFound(id);
|
||||
|
||||
const AccessEntityPtr old_entity = deserializeAccessEntity(old_entity_definition, entity_path);
|
||||
const AccessEntityPtr new_entity = update_func(old_entity);
|
||||
|
||||
if (!new_entity->isTypeOf(old_entity->getType()))
|
||||
throwBadCast(id, new_entity->getType(), new_entity->getName(), old_entity->getType());
|
||||
|
||||
const EntityTypeInfo type_info = EntityTypeInfo::get(new_entity->getType());
|
||||
|
||||
Coordination::Requests ops;
|
||||
const String new_entity_definition = serializeAccessEntity(*new_entity);
|
||||
ops.emplace_back(zkutil::makeSetRequest(entity_path, new_entity_definition, stat.version));
|
||||
|
||||
const String & old_name = old_entity->getName();
|
||||
const String & new_name = new_entity->getName();
|
||||
if (new_name != old_name)
|
||||
{
|
||||
auto old_name_path = zookeeper_path + "/" + type_info.unique_char + "/" + escapeForFileName(old_name);
|
||||
auto new_name_path = zookeeper_path + "/" + type_info.unique_char + "/" + escapeForFileName(new_name);
|
||||
ops.emplace_back(zkutil::makeRemoveRequest(old_name_path, -1));
|
||||
ops.emplace_back(zkutil::makeCreateRequest(new_name_path, entity_uuid, zkutil::CreateMode::Persistent));
|
||||
}
|
||||
|
||||
Coordination::Responses responses;
|
||||
const Coordination::Error res = zookeeper->tryMulti(ops, responses);
|
||||
if (res == Coordination::Error::ZNODEEXISTS)
|
||||
{
|
||||
throwNameCollisionCannotRename(new_entity->getType(), old_name, new_name);
|
||||
}
|
||||
else if (res == Coordination::Error::ZNONODE)
|
||||
{
|
||||
throwNotFound(id);
|
||||
}
|
||||
else
|
||||
{
|
||||
zkutil::KeeperMultiException::check(res, ops, responses);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void ReplicatedAccessStorage::runWorkerThread()
|
||||
{
|
||||
LOG_DEBUG(getLogger(), "Started worker thread");
|
||||
while (!stop_flag)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (!initialized)
|
||||
initializeZookeeper();
|
||||
refresh();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(getLogger(), "Unexpected error, will try to restart worker thread:");
|
||||
resetAfterError();
|
||||
sleepForSeconds(5);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ReplicatedAccessStorage::resetAfterError()
|
||||
{
|
||||
initialized = false;
|
||||
|
||||
UUID id;
|
||||
while (refresh_queue.tryPop(id)) {}
|
||||
|
||||
std::lock_guard lock{mutex};
|
||||
for (const auto type : collections::range(EntityType::MAX))
|
||||
entries_by_name_and_type[static_cast<size_t>(type)].clear();
|
||||
entries_by_id.clear();
|
||||
}
|
||||
|
||||
void ReplicatedAccessStorage::initializeZookeeper()
|
||||
{
|
||||
assert(!initialized);
|
||||
auto zookeeper = get_zookeeper();
|
||||
|
||||
if (!zookeeper)
|
||||
throw Exception("Can't have Replicated access without ZooKeeper", ErrorCodes::NO_ZOOKEEPER);
|
||||
|
||||
createRootNodes(zookeeper);
|
||||
|
||||
refreshEntities(zookeeper);
|
||||
|
||||
initialized = true;
|
||||
}
|
||||
|
||||
void ReplicatedAccessStorage::createRootNodes(const zkutil::ZooKeeperPtr & zookeeper)
|
||||
{
|
||||
zookeeper->createAncestors(zookeeper_path);
|
||||
zookeeper->createIfNotExists(zookeeper_path, "");
|
||||
zookeeper->createIfNotExists(zookeeper_path + "/uuid", "");
|
||||
for (const auto type : collections::range(EntityType::MAX))
|
||||
{
|
||||
/// Create a znode for each type of AccessEntity
|
||||
const auto type_info = EntityTypeInfo::get(type);
|
||||
zookeeper->createIfNotExists(zookeeper_path + "/" + type_info.unique_char, "");
|
||||
}
|
||||
}
|
||||
|
||||
void ReplicatedAccessStorage::refresh()
|
||||
{
|
||||
UUID id;
|
||||
if (refresh_queue.tryPop(id, /* timeout_ms: */ 10000))
|
||||
{
|
||||
if (stop_flag)
|
||||
return;
|
||||
|
||||
auto zookeeper = get_zookeeper();
|
||||
|
||||
if (id == UUIDHelpers::Nil)
|
||||
refreshEntities(zookeeper);
|
||||
else
|
||||
refreshEntity(zookeeper, id);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void ReplicatedAccessStorage::refreshEntities(const zkutil::ZooKeeperPtr & zookeeper)
|
||||
{
|
||||
LOG_DEBUG(getLogger(), "Refreshing entities list");
|
||||
|
||||
const String zookeeper_uuids_path = zookeeper_path + "/uuid";
|
||||
auto watch_entities_list = [this](const Coordination::WatchResponse &)
|
||||
{
|
||||
refresh_queue.push(UUIDHelpers::Nil);
|
||||
};
|
||||
Coordination::Stat stat;
|
||||
const auto entity_uuid_strs = zookeeper->getChildrenWatch(zookeeper_uuids_path, &stat, watch_entities_list);
|
||||
|
||||
std::unordered_set<UUID> entity_uuids;
|
||||
entity_uuids.reserve(entity_uuid_strs.size());
|
||||
for (const String & entity_uuid_str : entity_uuid_strs)
|
||||
entity_uuids.insert(parseUUID(entity_uuid_str));
|
||||
|
||||
Notifications notifications;
|
||||
SCOPE_EXIT({ notify(notifications); });
|
||||
std::lock_guard lock{mutex};
|
||||
|
||||
std::vector<UUID> entities_to_remove;
|
||||
/// Locally remove entities that were removed from ZooKeeper
|
||||
for (const auto & pair : entries_by_id)
|
||||
{
|
||||
const UUID & entity_uuid = pair.first;
|
||||
if (!entity_uuids.contains(entity_uuid))
|
||||
entities_to_remove.push_back(entity_uuid);
|
||||
}
|
||||
for (const auto & entity_uuid : entities_to_remove)
|
||||
removeEntityNoLock(entity_uuid, notifications);
|
||||
|
||||
/// Locally add entities that were added to ZooKeeper
|
||||
for (const auto & entity_uuid : entity_uuids)
|
||||
{
|
||||
const auto it = entries_by_id.find(entity_uuid);
|
||||
if (it == entries_by_id.end())
|
||||
refreshEntityNoLock(zookeeper, entity_uuid, notifications);
|
||||
}
|
||||
|
||||
LOG_DEBUG(getLogger(), "Refreshing entities list finished");
|
||||
}
|
||||
|
||||
void ReplicatedAccessStorage::refreshEntity(const zkutil::ZooKeeperPtr & zookeeper, const UUID & id)
|
||||
{
|
||||
Notifications notifications;
|
||||
SCOPE_EXIT({ notify(notifications); });
|
||||
std::lock_guard lock{mutex};
|
||||
|
||||
refreshEntityNoLock(zookeeper, id, notifications);
|
||||
}
|
||||
|
||||
void ReplicatedAccessStorage::refreshEntityNoLock(const zkutil::ZooKeeperPtr & zookeeper, const UUID & id, Notifications & notifications)
|
||||
{
|
||||
LOG_DEBUG(getLogger(), "Refreshing entity {}", toString(id));
|
||||
|
||||
const auto watch_entity = [this, id](const Coordination::WatchResponse & response)
|
||||
{
|
||||
if (response.type == Coordination::Event::CHANGED)
|
||||
refresh_queue.push(id);
|
||||
};
|
||||
Coordination::Stat entity_stat;
|
||||
const String entity_path = zookeeper_path + "/uuid/" + toString(id);
|
||||
String entity_definition;
|
||||
const bool exists = zookeeper->tryGetWatch(entity_path, entity_definition, &entity_stat, watch_entity);
|
||||
if (exists)
|
||||
{
|
||||
const AccessEntityPtr entity = deserializeAccessEntity(entity_definition, entity_path);
|
||||
setEntityNoLock(id, entity, notifications);
|
||||
}
|
||||
else
|
||||
{
|
||||
removeEntityNoLock(id, notifications);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void ReplicatedAccessStorage::setEntityNoLock(const UUID & id, const AccessEntityPtr & entity, Notifications & notifications)
|
||||
{
|
||||
LOG_DEBUG(getLogger(), "Setting id {} to entity named {}", toString(id), entity->getName());
|
||||
const EntityType type = entity->getType();
|
||||
const String & name = entity->getName();
|
||||
|
||||
/// If the type+name already exists and is a different entity, remove old entity
|
||||
auto & entries_by_name = entries_by_name_and_type[static_cast<size_t>(type)];
|
||||
if (auto it = entries_by_name.find(name); it != entries_by_name.end() && it->second->id != id)
|
||||
{
|
||||
removeEntityNoLock(it->second->id, notifications);
|
||||
}
|
||||
|
||||
/// If the entity already exists under a different type+name, remove old type+name
|
||||
if (auto it = entries_by_id.find(id); it != entries_by_id.end())
|
||||
{
|
||||
const AccessEntityPtr & existing_entity = it->second.entity;
|
||||
const EntityType existing_type = existing_entity->getType();
|
||||
const String & existing_name = existing_entity->getName();
|
||||
if (existing_type != type || existing_name != name)
|
||||
{
|
||||
auto & existing_entries_by_name = entries_by_name_and_type[static_cast<size_t>(existing_type)];
|
||||
existing_entries_by_name.erase(existing_name);
|
||||
}
|
||||
}
|
||||
|
||||
auto & entry = entries_by_id[id];
|
||||
entry.id = id;
|
||||
entry.entity = entity;
|
||||
entries_by_name[name] = &entry;
|
||||
prepareNotifications(entry, false, notifications);
|
||||
}
|
||||
|
||||
|
||||
void ReplicatedAccessStorage::removeEntityNoLock(const UUID & id, Notifications & notifications)
|
||||
{
|
||||
LOG_DEBUG(getLogger(), "Removing entity with id {}", toString(id));
|
||||
const auto it = entries_by_id.find(id);
|
||||
if (it == entries_by_id.end())
|
||||
{
|
||||
LOG_DEBUG(getLogger(), "Id {} not found, ignoring removal", toString(id));
|
||||
return;
|
||||
}
|
||||
|
||||
const Entry & entry = it->second;
|
||||
const EntityType type = entry.entity->getType();
|
||||
const String & name = entry.entity->getName();
|
||||
prepareNotifications(entry, true, notifications);
|
||||
|
||||
auto & entries_by_name = entries_by_name_and_type[static_cast<size_t>(type)];
|
||||
const auto name_it = entries_by_name.find(name);
|
||||
if (name_it == entries_by_name.end())
|
||||
LOG_WARNING(getLogger(), "Entity {} not found in names, ignoring removal of name", toString(id));
|
||||
else if (name_it->second != &(it->second))
|
||||
LOG_WARNING(getLogger(), "Name {} not pointing to entity {}, ignoring removal of name", name, toString(id));
|
||||
else
|
||||
entries_by_name.erase(name);
|
||||
|
||||
entries_by_id.erase(id);
|
||||
LOG_DEBUG(getLogger(), "Removed entity with id {}", toString(id));
|
||||
}
|
||||
|
||||
|
||||
std::optional<UUID> ReplicatedAccessStorage::findImpl(EntityType type, const String & name) const
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
const auto & entries_by_name = entries_by_name_and_type[static_cast<size_t>(type)];
|
||||
const auto it = entries_by_name.find(name);
|
||||
if (it == entries_by_name.end())
|
||||
return {};
|
||||
|
||||
const Entry * entry = it->second;
|
||||
return entry->id;
|
||||
}
|
||||
|
||||
|
||||
std::vector<UUID> ReplicatedAccessStorage::findAllImpl(EntityType type) const
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
std::vector<UUID> result;
|
||||
result.reserve(entries_by_id.size());
|
||||
for (const auto & [id, entry] : entries_by_id)
|
||||
if (entry.entity->isTypeOf(type))
|
||||
result.emplace_back(id);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
bool ReplicatedAccessStorage::existsImpl(const UUID & id) const
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
return entries_by_id.count(id);
|
||||
}
|
||||
|
||||
|
||||
AccessEntityPtr ReplicatedAccessStorage::readImpl(const UUID & id) const
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
const auto it = entries_by_id.find(id);
|
||||
if (it == entries_by_id.end())
|
||||
throwNotFound(id);
|
||||
const Entry & entry = it->second;
|
||||
return entry.entity;
|
||||
}
|
||||
|
||||
|
||||
String ReplicatedAccessStorage::readNameImpl(const UUID & id) const
|
||||
{
|
||||
return readImpl(id)->getName();
|
||||
}
|
||||
|
||||
|
||||
void ReplicatedAccessStorage::prepareNotifications(const Entry & entry, bool remove, Notifications & notifications) const
|
||||
{
|
||||
const AccessEntityPtr entity = remove ? nullptr : entry.entity;
|
||||
for (const auto & handler : entry.handlers_by_id)
|
||||
notifications.push_back({handler, entry.id, entity});
|
||||
|
||||
for (const auto & handler : handlers_by_type[static_cast<size_t>(entry.entity->getType())])
|
||||
notifications.push_back({handler, entry.id, entity});
|
||||
}
|
||||
|
||||
|
||||
scope_guard ReplicatedAccessStorage::subscribeForChangesImpl(EntityType type, const OnChangedHandler & handler) const
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
auto & handlers = handlers_by_type[static_cast<size_t>(type)];
|
||||
handlers.push_back(handler);
|
||||
auto handler_it = std::prev(handlers.end());
|
||||
|
||||
return [this, type, handler_it]
|
||||
{
|
||||
std::lock_guard lock2{mutex};
|
||||
auto & handlers2 = handlers_by_type[static_cast<size_t>(type)];
|
||||
handlers2.erase(handler_it);
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
scope_guard ReplicatedAccessStorage::subscribeForChangesImpl(const UUID & id, const OnChangedHandler & handler) const
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
const auto it = entries_by_id.find(id);
|
||||
if (it == entries_by_id.end())
|
||||
return {};
|
||||
const Entry & entry = it->second;
|
||||
auto handler_it = entry.handlers_by_id.insert(entry.handlers_by_id.end(), handler);
|
||||
|
||||
return [this, id, handler_it]
|
||||
{
|
||||
std::lock_guard lock2{mutex};
|
||||
auto it2 = entries_by_id.find(id);
|
||||
if (it2 != entries_by_id.end())
|
||||
{
|
||||
const Entry & entry2 = it2->second;
|
||||
entry2.handlers_by_id.erase(handler_it);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
bool ReplicatedAccessStorage::hasSubscriptionImpl(const UUID & id) const
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
const auto & it = entries_by_id.find(id);
|
||||
if (it != entries_by_id.end())
|
||||
{
|
||||
const Entry & entry = it->second;
|
||||
return !entry.handlers_by_id.empty();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
bool ReplicatedAccessStorage::hasSubscriptionImpl(EntityType type) const
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
const auto & handlers = handlers_by_type[static_cast<size_t>(type)];
|
||||
return !handlers.empty();
|
||||
}
|
||||
}
|
87
src/Access/ReplicatedAccessStorage.h
Normal file
87
src/Access/ReplicatedAccessStorage.h
Normal file
@ -0,0 +1,87 @@
|
||||
#pragma once
|
||||
|
||||
#include <Access/IAccessStorage.h>
|
||||
#include <Common/ThreadPool.h>
|
||||
#include <Common/ZooKeeper/Common.h>
|
||||
#include <Common/ZooKeeper/ZooKeeper.h>
|
||||
#include <common/scope_guard.h>
|
||||
#include <Coordination/ThreadSafeQueue.h>
|
||||
#include <atomic>
|
||||
#include <list>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <unordered_map>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
/// Implementation of IAccessStorage which keeps all data in zookeeper.
|
||||
class ReplicatedAccessStorage : public IAccessStorage
|
||||
{
|
||||
public:
|
||||
static constexpr char STORAGE_TYPE[] = "replicated";
|
||||
|
||||
ReplicatedAccessStorage(const String & storage_name, const String & zookeeper_path, zkutil::GetZooKeeper get_zookeeper);
|
||||
virtual ~ReplicatedAccessStorage() override;
|
||||
|
||||
const char * getStorageType() const override { return STORAGE_TYPE; }
|
||||
|
||||
virtual void startup();
|
||||
virtual void shutdown();
|
||||
|
||||
private:
|
||||
String zookeeper_path;
|
||||
zkutil::GetZooKeeper get_zookeeper;
|
||||
|
||||
std::atomic<bool> initialized = false;
|
||||
std::atomic<bool> stop_flag = false;
|
||||
ThreadFromGlobalPool worker_thread;
|
||||
ThreadSafeQueue<UUID> refresh_queue;
|
||||
|
||||
UUID insertImpl(const AccessEntityPtr & entity, bool replace_if_exists) override;
|
||||
void removeImpl(const UUID & id) override;
|
||||
void updateImpl(const UUID & id, const UpdateFunc & update_func) override;
|
||||
|
||||
void insertZooKeeper(const zkutil::ZooKeeperPtr & zookeeper, const UUID & id, const AccessEntityPtr & entity, bool replace_if_exists);
|
||||
void removeZooKeeper(const zkutil::ZooKeeperPtr & zookeeper, const UUID & id);
|
||||
void updateZooKeeper(const zkutil::ZooKeeperPtr & zookeeper, const UUID & id, const UpdateFunc & update_func);
|
||||
|
||||
void runWorkerThread();
|
||||
void resetAfterError();
|
||||
void initializeZookeeper();
|
||||
void createRootNodes(const zkutil::ZooKeeperPtr & zookeeper);
|
||||
|
||||
void refresh();
|
||||
void refreshEntities(const zkutil::ZooKeeperPtr & zookeeper);
|
||||
void refreshEntity(const zkutil::ZooKeeperPtr & zookeeper, const UUID & id);
|
||||
void refreshEntityNoLock(const zkutil::ZooKeeperPtr & zookeeper, const UUID & id, Notifications & notifications);
|
||||
|
||||
void setEntityNoLock(const UUID & id, const AccessEntityPtr & entity, Notifications & notifications);
|
||||
void removeEntityNoLock(const UUID & id, Notifications & notifications);
|
||||
|
||||
struct Entry
|
||||
{
|
||||
UUID id;
|
||||
AccessEntityPtr entity;
|
||||
mutable std::list<OnChangedHandler> handlers_by_id;
|
||||
};
|
||||
|
||||
std::optional<UUID> findImpl(EntityType type, const String & name) const override;
|
||||
std::vector<UUID> findAllImpl(EntityType type) const override;
|
||||
bool existsImpl(const UUID & id) const override;
|
||||
AccessEntityPtr readImpl(const UUID & id) const override;
|
||||
String readNameImpl(const UUID & id) const override;
|
||||
bool canInsertImpl(const AccessEntityPtr &) const override { return true; }
|
||||
|
||||
void prepareNotifications(const Entry & entry, bool remove, Notifications & notifications) const;
|
||||
scope_guard subscribeForChangesImpl(const UUID & id, const OnChangedHandler & handler) const override;
|
||||
scope_guard subscribeForChangesImpl(EntityType type, const OnChangedHandler & handler) const override;
|
||||
bool hasSubscriptionImpl(const UUID & id) const override;
|
||||
bool hasSubscriptionImpl(EntityType type) const override;
|
||||
|
||||
mutable std::mutex mutex;
|
||||
std::unordered_map<UUID, Entry> entries_by_id;
|
||||
std::unordered_map<String, Entry *> entries_by_name_and_type[static_cast<size_t>(EntityType::MAX)];
|
||||
mutable std::list<OnChangedHandler> handlers_by_type[static_cast<size_t>(EntityType::MAX)];
|
||||
};
|
||||
}
|
@ -10,6 +10,7 @@ PEERDIR(
|
||||
|
||||
SRCS(
|
||||
AccessControlManager.cpp
|
||||
AccessEntityIO.cpp
|
||||
AccessRights.cpp
|
||||
AccessRightsElement.cpp
|
||||
AllowedClientHosts.cpp
|
||||
@ -34,6 +35,7 @@ SRCS(
|
||||
Quota.cpp
|
||||
QuotaCache.cpp
|
||||
QuotaUsage.cpp
|
||||
ReplicatedAccessStorage.cpp
|
||||
Role.cpp
|
||||
RoleCache.cpp
|
||||
RolesOrUsersSet.cpp
|
||||
|
0
tests/integration/test_replicated_users/__init__.py
Normal file
0
tests/integration/test_replicated_users/__init__.py
Normal file
22
tests/integration/test_replicated_users/configs/config.xml
Normal file
22
tests/integration/test_replicated_users/configs/config.xml
Normal file
@ -0,0 +1,22 @@
|
||||
<yandex>
|
||||
<remote_servers>
|
||||
<default>
|
||||
<shard>
|
||||
<internal_replication>true</internal_replication>
|
||||
<replica>
|
||||
<host>node1</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
<replica>
|
||||
<host>node2</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
</shard>
|
||||
</default>
|
||||
</remote_servers>
|
||||
<user_directories replace="replace">
|
||||
<replicated>
|
||||
<zookeeper_path>/clickhouse/access</zookeeper_path>
|
||||
</replicated>
|
||||
</user_directories>
|
||||
</yandex>
|
73
tests/integration/test_replicated_users/test.py
Normal file
73
tests/integration/test_replicated_users/test.py
Normal file
@ -0,0 +1,73 @@
|
||||
import pytest
|
||||
|
||||
from dataclasses import dataclass
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
|
||||
node1 = cluster.add_instance('node1', main_configs=['configs/config.xml'], with_zookeeper=True, stay_alive=True)
|
||||
node2 = cluster.add_instance('node2', main_configs=['configs/config.xml'], with_zookeeper=True, stay_alive=True)
|
||||
|
||||
all_nodes = [node1, node2]
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def started_cluster():
|
||||
try:
|
||||
cluster.start()
|
||||
yield cluster
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Entity:
|
||||
keyword: str
|
||||
name: str
|
||||
options: str = ""
|
||||
|
||||
|
||||
entities = [
|
||||
Entity(keyword="USER", name="theuser"),
|
||||
Entity(keyword="ROLE", name="therole"),
|
||||
Entity(keyword="ROW POLICY", name="thepolicy", options=" ON default.t1"),
|
||||
Entity(keyword="QUOTA", name="thequota"),
|
||||
Entity(keyword="SETTINGS PROFILE", name="theprofile")
|
||||
]
|
||||
|
||||
def get_entity_id(entity):
|
||||
return entity.keyword
|
||||
|
||||
|
||||
@pytest.mark.parametrize("entity", entities, ids=get_entity_id)
|
||||
def test_create_replicated(started_cluster, entity):
|
||||
node1.query(f"CREATE {entity.keyword} {entity.name} {entity.options}")
|
||||
assert f"cannot insert because {entity.keyword.lower()} `{entity.name}{entity.options}` already exists in replicated" in \
|
||||
node2.query_and_get_error(f"CREATE {entity.keyword} {entity.name} {entity.options}")
|
||||
node1.query(f"DROP {entity.keyword} {entity.name} {entity.options}")
|
||||
|
||||
|
||||
@pytest.mark.parametrize("entity", entities, ids=get_entity_id)
|
||||
def test_create_and_delete_replicated(started_cluster, entity):
|
||||
node1.query(f"CREATE {entity.keyword} {entity.name} {entity.options}")
|
||||
node2.query(f"DROP {entity.keyword} {entity.name} {entity.options}")
|
||||
|
||||
|
||||
@pytest.mark.parametrize("entity", entities, ids=get_entity_id)
|
||||
def test_create_replicated_on_cluster(started_cluster, entity):
|
||||
assert f"cannot insert because {entity.keyword.lower()} `{entity.name}{entity.options}` already exists in replicated" in \
|
||||
node1.query_and_get_error(f"CREATE {entity.keyword} {entity.name} ON CLUSTER default {entity.options}")
|
||||
node1.query(f"DROP {entity.keyword} {entity.name} {entity.options}")
|
||||
|
||||
|
||||
@pytest.mark.parametrize("entity", entities, ids=get_entity_id)
|
||||
def test_create_replicated_if_not_exists_on_cluster(started_cluster, entity):
|
||||
node1.query(f"CREATE {entity.keyword} IF NOT EXISTS {entity.name} ON CLUSTER default {entity.options}")
|
||||
node1.query(f"DROP {entity.keyword} {entity.name} {entity.options}")
|
||||
|
||||
|
||||
@pytest.mark.parametrize("entity", entities, ids=get_entity_id)
|
||||
def test_rename_replicated(started_cluster, entity):
|
||||
node1.query(f"CREATE {entity.keyword} {entity.name} {entity.options}")
|
||||
node2.query(f"ALTER {entity.keyword} {entity.name} {entity.options} RENAME TO {entity.name}2")
|
||||
node1.query(f"DROP {entity.keyword} {entity.name}2 {entity.options}")
|
||||
|
Loading…
Reference in New Issue
Block a user