Allow to give access only to certain named collections

This commit is contained in:
kssenii 2023-02-09 15:52:11 +01:00
parent 9b23068d5e
commit 9e04e57949
12 changed files with 189 additions and 37 deletions

View File

@ -61,12 +61,22 @@ namespace
res.any_database = true;
res.any_table = true;
res.any_column = true;
res.any_named_collection = !access_flags.isNamedCollectionAccessOnly();
break;
}
case 1:
{
res.any_database = false;
res.database = full_name[0];
res.any_named_collection = !access_flags.isNamedCollectionAccessOnly();
if (!res.any_named_collection)
{
res.any_database = true;
res.named_collection = full_name[0];
}
else
{
res.any_database = false;
res.database = full_name[0];
}
res.any_table = true;
res.any_column = true;
break;
@ -317,8 +327,8 @@ public:
const Node * child = tryGetChild(name);
if (child)
return child->isGranted(flags_to_check, subnames...);
else
return flags.contains(flags_to_check);
return flags.contains(flags_to_check);
}
template <typename StringT>
@ -783,7 +793,9 @@ void AccessRights::grantImplHelper(const AccessRightsElement & element)
{
assert(!element.is_partial_revoke);
assert(!element.grant_option || with_grant_option);
if (element.any_database)
if (!element.any_named_collection)
grantImpl<with_grant_option>(element.access_flags, element.named_collection);
else if (element.any_database)
grantImpl<with_grant_option>(element.access_flags);
else if (element.any_table)
grantImpl<with_grant_option>(element.access_flags, element.database);
@ -825,7 +837,10 @@ void AccessRights::grant(const AccessFlags & flags, std::string_view database, s
void AccessRights::grant(const AccessFlags & flags, std::string_view database, std::string_view table, const std::vector<std::string_view> & columns) { grantImpl<false>(flags, database, table, columns); }
void AccessRights::grant(const AccessFlags & flags, std::string_view database, std::string_view table, const Strings & columns) { grantImpl<false>(flags, database, table, columns); }
void AccessRights::grant(const AccessRightsElement & element) { grantImpl<false>(element); }
void AccessRights::grant(const AccessRightsElements & elements) { grantImpl<false>(elements); }
void AccessRights::grant(const AccessRightsElements & elements)
{
grantImpl<false>(elements);
}
void AccessRights::grantWithGrantOption(const AccessFlags & flags) { grantImpl<true>(flags); }
void AccessRights::grantWithGrantOption(const AccessFlags & flags, std::string_view database) { grantImpl<true>(flags, database); }
@ -858,7 +873,9 @@ template <bool grant_option>
void AccessRights::revokeImplHelper(const AccessRightsElement & element)
{
assert(!element.grant_option || grant_option);
if (element.any_database)
if (!element.any_named_collection)
revokeImpl<grant_option>(element.access_flags, element.named_collection);
else if (element.any_database)
revokeImpl<grant_option>(element.access_flags);
else if (element.any_table)
revokeImpl<grant_option>(element.access_flags, element.database);
@ -912,7 +929,7 @@ void AccessRights::revokeGrantOption(const AccessRightsElements & elements) { re
AccessRightsElements AccessRights::getElements() const
{
#if 0
#if 1
logTree();
#endif
if (!root)
@ -934,6 +951,7 @@ bool AccessRights::isGrantedImpl(const AccessFlags & flags, const Args &... args
{
auto helper = [&](const std::unique_ptr<Node> & root_node) -> bool
{
logTree();
if (!root_node)
return flags.isEmpty();
return root_node->isGranted(flags, args...);
@ -948,7 +966,9 @@ template <bool grant_option>
bool AccessRights::isGrantedImplHelper(const AccessRightsElement & element) const
{
assert(!element.grant_option || grant_option);
if (element.any_database)
if (!element.any_named_collection)
return isGrantedImpl<grant_option>(element.access_flags, element.named_collection);
else if (element.any_database)
return isGrantedImpl<grant_option>(element.access_flags);
else if (element.any_table)
return isGrantedImpl<grant_option>(element.access_flags, element.database);

View File

@ -96,11 +96,13 @@ namespace
const Flags & getAllFlags() const { return all_flags; }
const Flags & getGlobalFlags() const { return all_flags_for_target[GLOBAL]; }
const Flags & getGlobalWithParameterFlags() const { return all_flags_for_target[GLOBAL_WITH_PARAMETER]; }
const Flags & getDatabaseFlags() const { return all_flags_for_target[DATABASE]; }
const Flags & getTableFlags() const { return all_flags_for_target[TABLE]; }
const Flags & getColumnFlags() const { return all_flags_for_target[COLUMN]; }
const Flags & getDictionaryFlags() const { return all_flags_for_target[DICTIONARY]; }
const Flags & getAllFlagsGrantableOnGlobalLevel() const { return getAllFlags(); }
const Flags & getAllFlagsGrantableOnNamedCollectionLevel() const { return all_flags_for_target[NAMED_COLLECTION]; }
const Flags & getAllFlagsGrantableOnDatabaseLevel() const { return all_flags_grantable_on_database_level; }
const Flags & getAllFlagsGrantableOnTableLevel() const { return all_flags_grantable_on_table_level; }
const Flags & getAllFlagsGrantableOnColumnLevel() const { return getColumnFlags(); }
@ -116,6 +118,8 @@ namespace
VIEW = TABLE,
COLUMN,
DICTIONARY,
GLOBAL_WITH_PARAMETER,
NAMED_COLLECTION,
};
struct Node;
@ -295,7 +299,7 @@ namespace
collectAllFlags(child.get());
all_flags_grantable_on_table_level = all_flags_for_target[TABLE] | all_flags_for_target[DICTIONARY] | all_flags_for_target[COLUMN];
all_flags_grantable_on_database_level = all_flags_for_target[DATABASE] | all_flags_grantable_on_table_level;
all_flags_grantable_on_database_level = all_flags_for_target[DATABASE] | all_flags_for_target[NAMED_COLLECTION] | all_flags_grantable_on_table_level;
}
Helper()
@ -345,7 +349,7 @@ namespace
std::unordered_map<std::string_view, Flags> keyword_to_flags_map;
std::vector<Flags> access_type_to_flags_mapping;
Flags all_flags;
Flags all_flags_for_target[static_cast<size_t>(DICTIONARY) + 1];
Flags all_flags_for_target[static_cast<size_t>(NAMED_COLLECTION) + 1];
Flags all_flags_grantable_on_database_level;
Flags all_flags_grantable_on_table_level;
};
@ -361,11 +365,13 @@ std::vector<AccessType> AccessFlags::toAccessTypes() const { return Helper::inst
std::vector<std::string_view> AccessFlags::toKeywords() const { return Helper::instance().flagsToKeywords(flags); }
AccessFlags AccessFlags::allFlags() { return Helper::instance().getAllFlags(); }
AccessFlags AccessFlags::allGlobalFlags() { return Helper::instance().getGlobalFlags(); }
AccessFlags AccessFlags::allGlobalWithParameterFlags() { return Helper::instance().getGlobalWithParameterFlags(); }
AccessFlags AccessFlags::allDatabaseFlags() { return Helper::instance().getDatabaseFlags(); }
AccessFlags AccessFlags::allTableFlags() { return Helper::instance().getTableFlags(); }
AccessFlags AccessFlags::allColumnFlags() { return Helper::instance().getColumnFlags(); }
AccessFlags AccessFlags::allDictionaryFlags() { return Helper::instance().getDictionaryFlags(); }
AccessFlags AccessFlags::allFlagsGrantableOnGlobalLevel() { return Helper::instance().getAllFlagsGrantableOnGlobalLevel(); }
AccessFlags AccessFlags::allFlagsGrantableOnNamedCollectionLevel() { return Helper::instance().getAllFlagsGrantableOnNamedCollectionLevel(); }
AccessFlags AccessFlags::allFlagsGrantableOnDatabaseLevel() { return Helper::instance().getAllFlagsGrantableOnDatabaseLevel(); }
AccessFlags AccessFlags::allFlagsGrantableOnTableLevel() { return Helper::instance().getAllFlagsGrantableOnTableLevel(); }
AccessFlags AccessFlags::allFlagsGrantableOnColumnLevel() { return Helper::instance().getAllFlagsGrantableOnColumnLevel(); }

View File

@ -50,6 +50,7 @@ public:
bool isEmpty() const { return flags.none(); }
explicit operator bool() const { return !isEmpty(); }
bool contains(const AccessFlags & other) const { return (flags & other.flags) == other.flags; }
bool isNamedCollectionAccessOnly() const { return (flags & ~allFlagsGrantableOnNamedCollectionLevel()).isEmpty(); }
friend bool operator ==(const AccessFlags & left, const AccessFlags & right) { return left.flags == right.flags; }
friend bool operator !=(const AccessFlags & left, const AccessFlags & right) { return !(left == right); }
@ -76,6 +77,8 @@ public:
/// Returns all the global flags.
static AccessFlags allGlobalFlags();
static AccessFlags allGlobalWithParameterFlags();
/// Returns all the flags related to a database.
static AccessFlags allDatabaseFlags();
@ -104,6 +107,8 @@ public:
/// The same as allColumnFlags().
static AccessFlags allFlagsGrantableOnColumnLevel();
static AccessFlags allFlagsGrantableOnNamedCollectionLevel();
static constexpr size_t SIZE = 256;
private:
using Flags = std::bitset<SIZE>;

View File

@ -21,24 +21,31 @@ namespace
result += ")";
}
void formatONClause(const String & database, bool any_database, const String & table, bool any_table, String & result)
void formatONClause(const AccessRightsElement & element, String & result)
{
result += "ON ";
if (any_database)
if (!element.any_named_collection)
{
if (element.named_collection.empty())
result += "*";
else
result += backQuoteIfNeed(element.named_collection);
}
else if (element.any_database)
{
result += "*.*";
}
else
{
if (!database.empty())
if (!element.database.empty())
{
result += backQuoteIfNeed(database);
result += backQuoteIfNeed(element.database);
result += ".";
}
if (any_table)
if (element.any_table)
result += "*";
else
result += backQuoteIfNeed(table);
result += backQuoteIfNeed(element.table);
}
}
@ -96,7 +103,7 @@ namespace
String result;
formatAccessFlagsWithColumns(element.access_flags, element.columns, element.any_column, result);
result += " ";
formatONClause(element.database, element.any_database, element.table, element.any_table, result);
formatONClause(element, result);
if (with_options)
formatOptions(element.grant_option, element.is_partial_revoke, result);
return result;
@ -129,7 +136,7 @@ namespace
if (!next_element_uses_same_table_and_options)
{
part += " ";
formatONClause(element.database, element.any_database, element.table, element.any_table, part);
formatONClause(element, part);
if (with_options)
formatOptions(element.grant_option, element.is_partial_revoke, part);
if (result.empty())

View File

@ -14,9 +14,11 @@ struct AccessRightsElement
String database;
String table;
Strings columns;
String named_collection;
bool any_database = true;
bool any_table = true;
bool any_column = true;
bool any_named_collection = true;
bool grant_option = false;
bool is_partial_revoke = false;

View File

@ -12,7 +12,7 @@ enum class AccessType
/// Macro M should be defined as M(name, aliases, node_type, parent_group_name)
/// where name is identifier with underscores (instead of spaces);
/// aliases is a string containing comma-separated list;
/// node_type either specifies access type's level (GLOBAL/DATABASE/TABLE/DICTIONARY/VIEW/COLUMNS),
/// node_type either specifies access type's level (GLOBAL/NAMED_COLLECTION/DATABASE/TABLE/DICTIONARY/VIEW/COLUMNS),
/// or specifies that the access type is a GROUP of other access types;
/// parent_group_name is the name of the group containing this access type (or NONE if there is no such group).
#define APPLY_FOR_ACCESS_TYPES(M) \
@ -69,7 +69,7 @@ enum class AccessType
M(ALTER_FREEZE_PARTITION, "FREEZE PARTITION, UNFREEZE", TABLE, ALTER_TABLE) \
\
M(ALTER_DATABASE_SETTINGS, "ALTER DATABASE SETTING, ALTER MODIFY DATABASE SETTING, MODIFY DATABASE SETTING", DATABASE, ALTER_DATABASE) /* allows to execute ALTER MODIFY SETTING */\
M(ALTER_NAMED_COLLECTION, "", GROUP, ALTER) /* allows to execute ALTER NAMED COLLECTION */\
M(ALTER_NAMED_COLLECTION, "", NAMED_COLLECTION, ALTER) /* allows to execute ALTER NAMED COLLECTION */\
\
M(ALTER_TABLE, "", GROUP, ALTER) \
M(ALTER_DATABASE, "", GROUP, ALTER) \
@ -89,7 +89,7 @@ enum class AccessType
M(CREATE_TEMPORARY_TABLE, "", GLOBAL, CREATE) /* allows to create and manipulate temporary tables;
implicitly enabled by the grant CREATE_TABLE on any table */ \
M(CREATE_FUNCTION, "", GLOBAL, CREATE) /* allows to execute CREATE FUNCTION */ \
M(CREATE_NAMED_COLLECTION, "", GLOBAL, CREATE) /* allows to execute CREATE NAMED COLLECTION */ \
M(CREATE_NAMED_COLLECTION, "", NAMED_COLLECTION, CREATE) /* allows to execute CREATE NAMED COLLECTION */ \
M(CREATE, "", GROUP, ALL) /* allows to execute {CREATE|ATTACH} */ \
\
M(DROP_DATABASE, "", DATABASE, DROP) /* allows to execute {DROP|DETACH} DATABASE */\
@ -98,7 +98,7 @@ enum class AccessType
implicitly enabled by the grant DROP_TABLE */\
M(DROP_DICTIONARY, "", DICTIONARY, DROP) /* allows to execute {DROP|DETACH} DICTIONARY */\
M(DROP_FUNCTION, "", GLOBAL, DROP) /* allows to execute DROP FUNCTION */\
M(DROP_NAMED_COLLECTION, "", GLOBAL, DROP) /* allows to execute DROP NAMED COLLECTION */\
M(DROP_NAMED_COLLECTION, "", NAMED_COLLECTION, DROP) /* allows to execute DROP NAMED COLLECTION */\
M(DROP, "", GROUP, ALL) /* allows to execute {DROP|DETACH} */\
\
M(TRUNCATE, "TRUNCATE TABLE", TABLE, ALL) \
@ -134,7 +134,7 @@ enum class AccessType
M(SHOW_QUOTAS, "SHOW CREATE QUOTA", GLOBAL, SHOW_ACCESS) \
M(SHOW_SETTINGS_PROFILES, "SHOW PROFILES, SHOW CREATE SETTINGS PROFILE, SHOW CREATE PROFILE", GLOBAL, SHOW_ACCESS) \
M(SHOW_ACCESS, "", GROUP, ACCESS_MANAGEMENT) \
M(SHOW_NAMED_COLLECTIONS, "SHOW NAMED COLLECTIONS", GLOBAL, ACCESS_MANAGEMENT) \
M(SHOW_NAMED_COLLECTIONS, "SHOW NAMED COLLECTIONS", NAMED_COLLECTION, ACCESS_MANAGEMENT) \
M(ACCESS_MANAGEMENT, "", GROUP, ALL) \
\
M(SYSTEM_SHUTDOWN, "SYSTEM KILL, SHUTDOWN", GLOBAL, SYSTEM) \

View File

@ -606,7 +606,9 @@ template <bool throw_if_denied, bool grant_option>
bool ContextAccess::checkAccessImplHelper(const AccessRightsElement & element) const
{
assert(!element.grant_option || grant_option);
if (element.any_database)
if (!element.any_named_collection)
return checkAccessImpl<throw_if_denied, grant_option>(element.access_flags, element.named_collection);
else if (element.any_database)
return checkAccessImpl<throw_if_denied, grant_option>(element.access_flags);
else if (element.any_table)
return checkAccessImpl<throw_if_denied, grant_option>(element.access_flags, element.database);

View File

@ -27,21 +27,25 @@ namespace
}
void formatONClause(const String & database, bool any_database, const String & table, bool any_table, const IAST::FormatSettings & settings)
void formatONClause(const AccessRightsElement & element, const IAST::FormatSettings & settings)
{
settings.ostr << (settings.hilite ? IAST::hilite_keyword : "") << "ON " << (settings.hilite ? IAST::hilite_none : "");
if (any_database)
if (!element.any_named_collection)
{
settings.ostr << backQuoteIfNeed(element.named_collection);
}
else if (element.any_database)
{
settings.ostr << "*.*";
}
else
{
if (!database.empty())
settings.ostr << backQuoteIfNeed(database) << ".";
if (any_table)
if (!element.database.empty())
settings.ostr << backQuoteIfNeed(element.database) << ".";
if (element.any_table)
settings.ostr << "*";
else
settings.ostr << backQuoteIfNeed(table);
settings.ostr << backQuoteIfNeed(element.table);
}
}
@ -71,14 +75,15 @@ namespace
{
const auto & next_element = elements[i + 1];
if ((element.database == next_element.database) && (element.any_database == next_element.any_database)
&& (element.table == next_element.table) && (element.any_table == next_element.any_table))
&& (element.table == next_element.table) && (element.any_table == next_element.any_table)
&& (element.named_collection == next_element.named_collection))
next_element_on_same_db_and_table = true;
}
if (!next_element_on_same_db_and_table)
{
settings.ostr << " ";
formatONClause(element.database, element.any_database, element.table, element.any_table, settings);
formatONClause(element, settings);
}
}

View File

@ -123,12 +123,38 @@ namespace
if (!parseAccessFlagsWithColumns(pos, expected, access_and_columns))
return false;
String database_name, table_name, collection_name;
bool any_database = false, any_table = false, any_named_collection = true;
size_t named_collection_access = 0;
for (const auto & elem : access_and_columns)
{
if (elem.first.isNamedCollectionAccessOnly())
++named_collection_access;
}
const bool grant_named_collection_access = named_collection_access == access_and_columns.size();
if (!ParserKeyword{"ON"}.ignore(pos, expected))
return false;
String database_name, table_name;
bool any_database = false, any_table = false;
if (!parseDatabaseAndTableNameOrAsterisks(pos, expected, database_name, any_database, table_name, any_table))
if (grant_named_collection_access)
{
ASTPtr collection;
if (ParserToken{TokenType::Asterisk}.ignore(pos, expected))
{
any_named_collection = true;
}
else if (ParserIdentifier{}.parse(pos, collection, expected))
{
any_named_collection = false;
collection_name = getIdentifierName(collection);
}
else
return false;
any_database = any_table = true;
}
else if (!parseDatabaseAndTableNameOrAsterisks(pos, expected, database_name, any_database, table_name, any_table))
return false;
for (auto & [access_flags, columns] : access_and_columns)
@ -140,6 +166,8 @@ namespace
element.any_database = any_database;
element.database = database_name;
element.any_table = any_table;
element.any_named_collection = any_named_collection;
element.named_collection = collection_name;
element.table = table_name;
res_elements.emplace_back(std::move(element));
}

View File

@ -7,6 +7,7 @@
#include <Interpreters/ProfileEventsExt.h>
#include <Access/Common/AccessType.h>
#include <Access/Common/AccessFlags.h>
#include <Access/ContextAccess.h>
#include <Columns/ColumnMap.h>
#include <Common/NamedCollections/NamedCollections.h>
@ -29,11 +30,14 @@ StorageSystemNamedCollections::StorageSystemNamedCollections(const StorageID & t
void StorageSystemNamedCollections::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
{
context->checkAccess(AccessType::SHOW_NAMED_COLLECTIONS);
const auto & access = context->getAccess();
auto collections = NamedCollectionFactory::instance().getAll();
for (const auto & [name, collection] : collections)
{
if (!access->isGranted(AccessType::SHOW_NAMED_COLLECTIONS, name))
continue;
res_columns[0]->insert(name);
auto * column_map = typeid_cast<ColumnMap *>(res_columns[1].get());

View File

@ -28,6 +28,8 @@ namespace
DICTIONARY,
VIEW,
COLUMN,
GLOBAL_WITH_PARAMETER,
NAMED_COLLECTION,
};
DataTypeEnum8::Values getLevelEnumValues()
@ -39,6 +41,7 @@ namespace
enum_values.emplace_back("DICTIONARY", static_cast<Int8>(DICTIONARY));
enum_values.emplace_back("VIEW", static_cast<Int8>(VIEW));
enum_values.emplace_back("COLUMN", static_cast<Int8>(COLUMN));
enum_values.emplace_back("NAMED_COLLECTION", static_cast<Int8>(NAMED_COLLECTION));
return enum_values;
}
}

View File

@ -105,6 +105,76 @@ def test_access(cluster):
assert int(node.query("select count() from system.named_collections")) > 0
def test_granular_access(cluster):
node = cluster.instances["node"]
assert 1 == int(node.query("SELECT count() FROM system.named_collections"))
assert (
"collection1" == node.query("SELECT name FROM system.named_collections").strip()
)
node.query("CREATE USER kek")
node.query("GRANT select ON *.* TO kek")
assert 0 == int(
node.query("SELECT count() FROM system.named_collections", user="kek")
)
node.query("GRANT show named collections ON collection1 TO kek")
assert 1 == int(
node.query("SELECT count() FROM system.named_collections", user="kek")
)
assert (
"collection1"
== node.query("SELECT name FROM system.named_collections", user="kek").strip()
)
node.query("CREATE NAMED COLLECTION collection2 AS key1=1, key2='value2'")
assert 2 == int(node.query("SELECT count() FROM system.named_collections"))
assert (
"collection1\ncollection2"
== node.query("select name from system.named_collections").strip()
)
assert 1 == int(
node.query("SELECT count() FROM system.named_collections", user="kek")
)
assert (
"collection1"
== node.query("select name from system.named_collections", user="kek").strip()
)
node.query("GRANT show named collections ON collection2 TO kek")
assert 2 == int(
node.query("SELECT count() FROM system.named_collections", user="kek")
)
assert (
"collection1\ncollection2"
== node.query("select name from system.named_collections", user="kek").strip()
)
node.restart_clickhouse()
assert (
"collection1\ncollection2"
== node.query("select name from system.named_collections", user="kek").strip()
)
node.query("CREATE USER koko")
node.query("GRANT select ON *.* TO koko")
assert 0 == int(
node.query("SELECT count() FROM system.named_collections", user="koko")
)
node.query("GRANT show named collections ON * TO koko")
assert (
"collection1\ncollection2"
== node.query("select name from system.named_collections", user="koko").strip()
)
node.restart_clickhouse()
assert (
"collection1\ncollection2"
== node.query("select name from system.named_collections", user="koko").strip()
)
node.query("DROP NAMED COLLECTION collection2")
def test_config_reload(cluster):
node = cluster.instances["node"]
assert (