Merge pull request #59528 from ClickHouse/vdimir/analyzer/fix_test_select_access_rights

Analyzer fix test_select_access_rights/test_main.py::test_select_count
This commit is contained in:
vdimir 2024-02-07 17:00:46 +01:00 committed by GitHub
commit 82cb09cab3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 60 additions and 27 deletions

View File

@ -84,39 +84,47 @@ namespace
{
/// Check if current user has privileges to SELECT columns from table
void checkAccessRights(const TableNode & table_node, const Names & column_names, const ContextPtr & query_context)
/// Throws an exception if access to any column from `column_names` is not granted
/// If `column_names` is empty, check access to any columns and return names of accessible columns
NameSet checkAccessRights(const TableNode & table_node, Names & column_names, const ContextPtr & query_context)
{
/// StorageDummy is created on preliminary stage, ignore access check for it.
if (typeid_cast<const StorageDummy *>(table_node.getStorage().get()))
return;
return {};
const auto & storage_id = table_node.getStorageID();
const auto & storage_snapshot = table_node.getStorageSnapshot();
if (column_names.empty())
{
NameSet accessible_columns;
/** For a trivial queries like "SELECT count() FROM table", "SELECT 1 FROM table" access is granted if at least
* one table column is accessible.
*/
auto access = query_context->getAccess();
for (const auto & column : storage_snapshot->metadata->getColumns())
{
if (access->isGranted(AccessType::SELECT, storage_id.database_name, storage_id.table_name, column.name))
return;
accessible_columns.insert(column.name);
}
if (accessible_columns.empty())
{
throw Exception(ErrorCodes::ACCESS_DENIED,
"{}: Not enough privileges. To execute this query, it's necessary to have the grant SELECT for at least one column on {}",
query_context->getUserName(),
storage_id.getFullTableName());
}
return accessible_columns;
}
// In case of cross-replication we don't know what database is used for the table.
// `storage_id.hasDatabase()` can return false only on the initiator node.
// Each shard will use the default database (in the case of cross-replication shards may have different defaults).
if (storage_id.hasDatabase())
query_context->checkAccess(AccessType::SELECT, storage_id, column_names);
return {};
}
bool shouldIgnoreQuotaAndLimits(const TableNode & table_node)
@ -133,7 +141,7 @@ bool shouldIgnoreQuotaAndLimits(const TableNode & table_node)
return false;
}
NameAndTypePair chooseSmallestColumnToReadFromStorage(const StoragePtr & storage, const StorageSnapshotPtr & storage_snapshot)
NameAndTypePair chooseSmallestColumnToReadFromStorage(const StoragePtr & storage, const StorageSnapshotPtr & storage_snapshot, const NameSet & column_names_allowed_to_select)
{
/** We need to read at least one column to find the number of rows.
* We will find a column with minimum <compressed_size, type_size, uncompressed_size>.
@ -167,6 +175,18 @@ NameAndTypePair chooseSmallestColumnToReadFromStorage(const StoragePtr & storage
auto column_sizes = storage->getColumnSizes();
auto column_names_and_types = storage_snapshot->getColumns(GetColumnsOptions(GetColumnsOptions::AllPhysical).withSubcolumns());
if (!column_names_allowed_to_select.empty())
{
auto it = column_names_and_types.begin();
while (it != column_names_and_types.end())
{
if (!column_names_allowed_to_select.contains(it->name))
it = column_names_and_types.erase(it);
else
++it;
}
}
if (!column_sizes.empty())
{
for (auto & column_name_and_type : column_names_and_types)
@ -330,12 +350,13 @@ void prepareBuildQueryPlanForTableExpression(const QueryTreeNodePtr & table_expr
/** The current user must have the SELECT privilege.
* We do not check access rights for table functions because they have been already checked in ITableFunction::execute().
*/
NameSet columns_names_allowed_to_select;
if (table_node)
{
auto column_names_with_aliases = columns_names;
const auto & alias_columns_names = table_expression_data.getAliasColumnsNames();
column_names_with_aliases.insert(column_names_with_aliases.end(), alias_columns_names.begin(), alias_columns_names.end());
checkAccessRights(*table_node, column_names_with_aliases, query_context);
columns_names_allowed_to_select = checkAccessRights(*table_node, column_names_with_aliases, query_context);
}
if (columns_names.empty())
@ -346,8 +367,7 @@ void prepareBuildQueryPlanForTableExpression(const QueryTreeNodePtr & table_expr
{
const auto & storage = table_node ? table_node->getStorage() : table_function_node->getStorage();
const auto & storage_snapshot = table_node ? table_node->getStorageSnapshot() : table_function_node->getStorageSnapshot();
additional_column_to_read = chooseSmallestColumnToReadFromStorage(storage, storage_snapshot);
additional_column_to_read = chooseSmallestColumnToReadFromStorage(storage, storage_snapshot, columns_names_allowed_to_select);
}
else if (query_node || union_node)
{

View File

@ -12,8 +12,6 @@ test_mysql_database_engine/test.py::test_mysql_ddl_for_mysql_database
test_passing_max_partitions_to_read_remotely/test.py::test_default_database_on_cluster
test_replicating_constants/test.py::test_different_versions
test_select_access_rights/test_main.py::test_alias_columns
test_select_access_rights/test_main.py::test_select_count
test_select_access_rights/test_main.py::test_select_join
test_settings_profile/test.py::test_show_profiles
test_shard_level_const_function/test.py::test_remote
test_sql_user_defined_functions_on_cluster/test.py::test_sql_user_defined_functions_on_cluster

View File

@ -1,6 +1,7 @@
import pytest
import re
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance("instance")
@ -185,25 +186,39 @@ def test_select_join():
)
select_query = "SELECT * FROM table1 JOIN table2 USING(d)"
assert (
"it's necessary to have the grant SELECT(d, x, y) ON default.table2"
in instance.query_and_get_error(select_query, user="A")
def match_error(err, columns, table):
"""Check if the error message contains the expected table and columns"""
match = re.search(
r"it's necessary to have the grant SELECT\((.*)\) ON default\.(\w+)", err
)
if not match:
return False
if match.group(2) != table:
return False
assert set(match.group(1).split(", ")) == set(
columns.split(", ")
), f"expected {columns} in {err}"
return True
response = instance.query_and_get_error(select_query, user="A")
table1_match = match_error(response, "d, a, b", "table1")
table2_match = match_error(response, "d, x, y", "table2")
assert table1_match or table2_match, response
instance.query("GRANT SELECT(d, x, y) ON default.table2 TO A")
assert (
"it's necessary to have the grant SELECT(d, a, b) ON default.table1"
in instance.query_and_get_error(select_query, user="A")
)
response = instance.query_and_get_error(select_query, user="A")
assert match_error(response, "d, a, b", "table1")
response = instance.query_and_get_error(select_query, user="A")
instance.query("GRANT SELECT(d, a, b) ON default.table1 TO A")
assert instance.query(select_query, user="A") == ""
instance.query("REVOKE SELECT ON default.table2 FROM A")
assert (
"it's necessary to have the grant SELECT(d, x, y) ON default.table2"
in instance.query_and_get_error(select_query, user="A")
)
response = instance.query_and_get_error(select_query, user="A")
assert match_error(response, "d, x, y", "table2")
def test_select_union():