Merge pull request #52975 from aalexfvk/ignore_on_cluster_for_replicated_entity_queries

Ignore ON CLUSTER clause in queries for management of replicated entities (UDF and Access)
This commit is contained in:
Alexander Tokmakov 2023-08-18 19:29:41 +03:00 committed by GitHub
commit 14590305ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 213 additions and 38 deletions

View File

@ -502,4 +502,15 @@ void MultipleAccessStorage::restoreFromBackup(RestorerFromBackup & restorer)
throwBackupNotAllowed();
}
bool MultipleAccessStorage::containsStorage(std::string_view storage_type) const
{
auto storages = getStoragesInternal();
for (const auto & storage : *storages)
{
if (storage->getStorageType() == storage_type)
return true;
}
return false;
}
}

View File

@ -57,6 +57,7 @@ public:
bool isRestoreAllowed() const override;
void backup(BackupEntriesCollector & backup_entries_collector, const String & data_path_in_backup, AccessEntityType type) const override;
void restoreFromBackup(RestorerFromBackup & restorer) override;
bool containsStorage(std::string_view storage_type) const;
protected:
std::optional<UUID> findImpl(AccessEntityType type, const String & name) const override;

View File

@ -307,6 +307,9 @@ class IColumn;
M(Bool, final, false, "Query with the FINAL modifier by default. If the engine does not support final, it does not have any effect. On queries with multiple tables final is applied only on those that support it. It also works on distributed tables", 0) \
\
M(Bool, partial_result_on_first_cancel, false, "Allows query to return a partial result after cancel.", 0) \
\
M(Bool, ignore_on_cluster_for_replicated_udf_queries, false, "Ignore ON CLUSTER clause for replicated UDF management queries.", 0) \
M(Bool, ignore_on_cluster_for_replicated_access_entities_queries, false, "Ignore ON CLUSTER clause for replicated access entities management queries.", 0) \
/** Settings for testing hedged requests */ \
M(Milliseconds, sleep_in_send_tables_status_ms, 0, "Time to sleep in sending tables status response in TCPHandler", 0) \
M(Milliseconds, sleep_in_send_data_ms, 0, "Time to sleep in sending data in TCPHandler", 0) \

View File

@ -1,16 +1,17 @@
#include <Interpreters/Access/InterpreterCreateQuotaQuery.h>
#include <Parsers/Access/ASTCreateQuotaQuery.h>
#include <Parsers/Access/ASTRolesOrUsersSet.h>
#include <Access/AccessControl.h>
#include <Access/Common/AccessFlags.h>
#include <Access/Quota.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Interpreters/removeOnClusterClauseIfNeeded.h>
#include <Parsers/Access/ASTCreateQuotaQuery.h>
#include <Parsers/Access/ASTRolesOrUsersSet.h>
#include <base/range.h>
#include <boost/range/algorithm/find_if.hpp>
#include <boost/range/algorithm/upper_bound.hpp>
#include <boost/range/algorithm/sort.hpp>
#include <boost/range/algorithm/upper_bound.hpp>
namespace DB
{
@ -82,14 +83,16 @@ namespace
BlockIO InterpreterCreateQuotaQuery::execute()
{
auto & query = query_ptr->as<ASTCreateQuotaQuery &>();
const auto updated_query_ptr = removeOnClusterClauseIfNeeded(query_ptr, getContext());
auto & query = updated_query_ptr->as<ASTCreateQuotaQuery &>();
auto & access_control = getContext()->getAccessControl();
getContext()->checkAccess(query.alter ? AccessType::ALTER_QUOTA : AccessType::CREATE_QUOTA);
if (!query.cluster.empty())
{
query.replaceCurrentUserTag(getContext()->getUserName());
return executeDDLQueryOnCluster(query_ptr, getContext());
return executeDDLQueryOnCluster(updated_query_ptr, getContext());
}
std::optional<RolesOrUsersSet> roles_from_query;

View File

@ -1,9 +1,11 @@
#include <Interpreters/Access/InterpreterCreateRoleQuery.h>
#include <Parsers/Access/ASTCreateRoleQuery.h>
#include <Access/AccessControl.h>
#include <Access/Role.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Interpreters/removeOnClusterClauseIfNeeded.h>
#include <Parsers/Access/ASTCreateRoleQuery.h>
namespace DB
@ -39,7 +41,9 @@ namespace
BlockIO InterpreterCreateRoleQuery::execute()
{
const auto & query = query_ptr->as<const ASTCreateRoleQuery &>();
const auto updated_query_ptr = removeOnClusterClauseIfNeeded(query_ptr, getContext());
const auto & query = updated_query_ptr->as<const ASTCreateRoleQuery &>();
auto & access_control = getContext()->getAccessControl();
if (query.alter)
getContext()->checkAccess(AccessType::ALTER_ROLE);
@ -56,7 +60,7 @@ BlockIO InterpreterCreateRoleQuery::execute()
}
if (!query.cluster.empty())
return executeDDLQueryOnCluster(query_ptr, getContext());
return executeDDLQueryOnCluster(updated_query_ptr, getContext());
IAccessStorage * storage = &access_control;
MultipleAccessStorage::StoragePtr storage_ptr;

View File

@ -1,14 +1,16 @@
#include <Interpreters/Access/InterpreterCreateRowPolicyQuery.h>
#include <Parsers/Access/ASTCreateRowPolicyQuery.h>
#include <Parsers/Access/ASTRowPolicyName.h>
#include <Parsers/Access/ASTRolesOrUsersSet.h>
#include <Parsers/formatAST.h>
#include <Access/AccessControl.h>
#include <Access/Common/AccessFlags.h>
#include <Access/Common/AccessRightsElement.h>
#include <Access/RowPolicy.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Interpreters/removeOnClusterClauseIfNeeded.h>
#include <Parsers/Access/ASTCreateRowPolicyQuery.h>
#include <Parsers/Access/ASTRolesOrUsersSet.h>
#include <Parsers/Access/ASTRowPolicyName.h>
#include <Parsers/formatAST.h>
#include <boost/range/algorithm/sort.hpp>
@ -51,7 +53,8 @@ namespace
BlockIO InterpreterCreateRowPolicyQuery::execute()
{
auto & query = query_ptr->as<ASTCreateRowPolicyQuery &>();
const auto updated_query_ptr = removeOnClusterClauseIfNeeded(query_ptr, getContext());
auto & query = updated_query_ptr->as<ASTCreateRowPolicyQuery &>();
auto required_access = getRequiredAccess();
if (!query.cluster.empty())
@ -59,7 +62,7 @@ BlockIO InterpreterCreateRowPolicyQuery::execute()
query.replaceCurrentUserTag(getContext()->getUserName());
DDLQueryOnClusterParams params;
params.access_to_check = std::move(required_access);
return executeDDLQueryOnCluster(query_ptr, getContext(), params);
return executeDDLQueryOnCluster(updated_query_ptr, getContext(), params);
}
assert(query.names->cluster.empty());

View File

@ -1,11 +1,13 @@
#include <Interpreters/Access/InterpreterCreateSettingsProfileQuery.h>
#include <Parsers/Access/ASTCreateSettingsProfileQuery.h>
#include <Parsers/Access/ASTRolesOrUsersSet.h>
#include <Access/AccessControl.h>
#include <Access/SettingsProfile.h>
#include <Access/Common/AccessFlags.h>
#include <Access/SettingsProfile.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Interpreters/removeOnClusterClauseIfNeeded.h>
#include <Parsers/Access/ASTCreateSettingsProfileQuery.h>
#include <Parsers/Access/ASTRolesOrUsersSet.h>
namespace DB
@ -47,7 +49,9 @@ namespace
BlockIO InterpreterCreateSettingsProfileQuery::execute()
{
auto & query = query_ptr->as<ASTCreateSettingsProfileQuery &>();
const auto updated_query_ptr = removeOnClusterClauseIfNeeded(query_ptr, getContext());
auto & query = updated_query_ptr->as<ASTCreateSettingsProfileQuery &>();
auto & access_control = getContext()->getAccessControl();
if (query.alter)
getContext()->checkAccess(AccessType::ALTER_SETTINGS_PROFILE);
@ -66,7 +70,7 @@ BlockIO InterpreterCreateSettingsProfileQuery::execute()
if (!query.cluster.empty())
{
query.replaceCurrentUserTag(getContext()->getUserName());
return executeDDLQueryOnCluster(query_ptr, getContext());
return executeDDLQueryOnCluster(updated_query_ptr, getContext());
}
std::optional<RolesOrUsersSet> roles_from_query;

View File

@ -1,14 +1,18 @@
#include <Interpreters/Access/InterpreterCreateUserQuery.h>
#include <Parsers/Access/ASTCreateUserQuery.h>
#include <Parsers/Access/ASTRolesOrUsersSet.h>
#include <Parsers/Access/ASTUserNameWithHost.h>
#include <Parsers/ASTDatabaseOrNone.h>
#include <Access/AccessControl.h>
#include <Access/ContextAccess.h>
#include <Access/ReplicatedAccessStorage.h>
#include <Access/User.h>
#include <Common/logger_useful.h>
#include <Interpreters/Access/InterpreterSetRoleQuery.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Interpreters/removeOnClusterClauseIfNeeded.h>
#include <Parsers/ASTDatabaseOrNone.h>
#include <Parsers/Access/ASTCreateUserQuery.h>
#include <Parsers/Access/ASTRolesOrUsersSet.h>
#include <Parsers/Access/ASTUserNameWithHost.h>
#include <boost/range/algorithm/copy.hpp>
@ -105,7 +109,9 @@ namespace
BlockIO InterpreterCreateUserQuery::execute()
{
const auto & query = query_ptr->as<const ASTCreateUserQuery &>();
const auto updated_query_ptr = removeOnClusterClauseIfNeeded(query_ptr, getContext());
const auto & query = updated_query_ptr->as<const ASTCreateUserQuery &>();
auto & access_control = getContext()->getAccessControl();
auto access = getContext()->getAccess();
access->checkAccess(query.alter ? AccessType::ALTER_USER : AccessType::CREATE_USER);
@ -138,7 +144,7 @@ BlockIO InterpreterCreateUserQuery::execute()
}
if (!query.cluster.empty())
return executeDDLQueryOnCluster(query_ptr, getContext());
return executeDDLQueryOnCluster(updated_query_ptr, getContext());
IAccessStorage * storage = &access_control;
MultipleAccessStorage::StoragePtr storage_ptr;

View File

@ -1,11 +1,12 @@
#include <Interpreters/Access/InterpreterDropAccessEntityQuery.h>
#include <Parsers/Access/ASTDropAccessEntityQuery.h>
#include <Parsers/Access/ASTRowPolicyName.h>
#include <Access/AccessControl.h>
#include <Access/Common/AccessRightsElement.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Interpreters/removeOnClusterClauseIfNeeded.h>
#include <Parsers/Access/ASTDropAccessEntityQuery.h>
#include <Parsers/Access/ASTRowPolicyName.h>
namespace DB
{
@ -17,12 +18,14 @@ namespace ErrorCodes
BlockIO InterpreterDropAccessEntityQuery::execute()
{
auto & query = query_ptr->as<ASTDropAccessEntityQuery &>();
const auto updated_query_ptr = removeOnClusterClauseIfNeeded(query_ptr, getContext());
auto & query = updated_query_ptr->as<ASTDropAccessEntityQuery &>();
auto & access_control = getContext()->getAccessControl();
getContext()->checkAccess(getRequiredAccess());
if (!query.cluster.empty())
return executeDDLQueryOnCluster(query_ptr, getContext());
return executeDDLQueryOnCluster(updated_query_ptr, getContext());
query.replaceEmptyDatabase(getContext()->getCurrentDatabase());

View File

@ -5,6 +5,7 @@
#include <Functions/UserDefined/UserDefinedSQLFunctionFactory.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Interpreters/removeOnClusterClauseIfNeeded.h>
#include <Parsers/ASTCreateFunctionQuery.h>
@ -18,7 +19,8 @@ namespace ErrorCodes
BlockIO InterpreterCreateFunctionQuery::execute()
{
ASTCreateFunctionQuery & create_function_query = query_ptr->as<ASTCreateFunctionQuery &>();
const auto updated_query_ptr = removeOnClusterClauseIfNeeded(query_ptr, getContext());
ASTCreateFunctionQuery & create_function_query = updated_query_ptr->as<ASTCreateFunctionQuery &>();
AccessRightsElements access_rights_elements;
access_rights_elements.emplace_back(AccessType::CREATE_FUNCTION);
@ -35,7 +37,7 @@ BlockIO InterpreterCreateFunctionQuery::execute()
DDLQueryOnClusterParams params;
params.access_to_check = std::move(access_rights_elements);
return executeDDLQueryOnCluster(query_ptr, current_context, params);
return executeDDLQueryOnCluster(updated_query_ptr, current_context, params);
}
current_context->checkAccess(access_rights_elements);
@ -44,7 +46,7 @@ BlockIO InterpreterCreateFunctionQuery::execute()
bool throw_if_exists = !create_function_query.if_not_exists && !create_function_query.or_replace;
bool replace_if_exists = create_function_query.or_replace;
UserDefinedSQLFunctionFactory::instance().registerFunction(current_context, function_name, query_ptr, throw_if_exists, replace_if_exists);
UserDefinedSQLFunctionFactory::instance().registerFunction(current_context, function_name, updated_query_ptr, throw_if_exists, replace_if_exists);
return {};
}

View File

@ -1,12 +1,13 @@
#include <Parsers/ASTDropFunctionQuery.h>
#include <Interpreters/InterpreterDropFunctionQuery.h>
#include <Access/ContextAccess.h>
#include <Functions/UserDefined/IUserDefinedSQLObjectsLoader.h>
#include <Functions/UserDefined/UserDefinedSQLFunctionFactory.h>
#include <Interpreters/Context.h>
#include <Interpreters/FunctionNameNormalizer.h>
#include <Interpreters/InterpreterDropFunctionQuery.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Interpreters/removeOnClusterClauseIfNeeded.h>
#include <Parsers/ASTDropFunctionQuery.h>
namespace DB
@ -20,7 +21,9 @@ namespace ErrorCodes
BlockIO InterpreterDropFunctionQuery::execute()
{
FunctionNameNormalizer().visit(query_ptr.get());
ASTDropFunctionQuery & drop_function_query = query_ptr->as<ASTDropFunctionQuery &>();
const auto updated_query_ptr = removeOnClusterClauseIfNeeded(query_ptr, getContext());
ASTDropFunctionQuery & drop_function_query = updated_query_ptr->as<ASTDropFunctionQuery &>();
AccessRightsElements access_rights_elements;
access_rights_elements.emplace_back(AccessType::DROP_FUNCTION);
@ -34,7 +37,7 @@ BlockIO InterpreterDropFunctionQuery::execute()
DDLQueryOnClusterParams params;
params.access_to_check = std::move(access_rights_elements);
return executeDDLQueryOnCluster(query_ptr, current_context, params);
return executeDDLQueryOnCluster(updated_query_ptr, current_context, params);
}
current_context->checkAccess(access_rights_elements);

View File

@ -0,0 +1,59 @@
#include <Interpreters/removeOnClusterClauseIfNeeded.h>
#include <Access/AccessControl.h>
#include <Access/ReplicatedAccessStorage.h>
#include <Common/logger_useful.h>
#include <Functions/UserDefined/IUserDefinedSQLObjectsLoader.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTCreateFunctionQuery.h>
#include <Parsers/ASTDropFunctionQuery.h>
#include <Parsers/ASTQueryWithOnCluster.h>
#include <Parsers/Access/ASTCreateQuotaQuery.h>
#include <Parsers/Access/ASTCreateRoleQuery.h>
#include <Parsers/Access/ASTCreateRowPolicyQuery.h>
#include <Parsers/Access/ASTCreateSettingsProfileQuery.h>
#include <Parsers/Access/ASTCreateUserQuery.h>
#include <Parsers/Access/ASTDropAccessEntityQuery.h>
namespace DB
{
static bool isUserDefinedFunctionQuery(const ASTPtr & query)
{
return query->as<ASTCreateFunctionQuery>()
|| query->as<ASTDropFunctionQuery>();
}
static bool isAccessControlQuery(const ASTPtr & query)
{
return query->as<ASTCreateUserQuery>()
|| query->as<ASTCreateQuotaQuery>()
|| query->as<ASTCreateRoleQuery>()
|| query->as<ASTCreateRowPolicyQuery>()
|| query->as<ASTCreateSettingsProfileQuery>()
|| query->as<ASTDropAccessEntityQuery>();
}
ASTPtr removeOnClusterClauseIfNeeded(const ASTPtr & query, ContextPtr context, const WithoutOnClusterASTRewriteParams & params)
{
auto * query_on_cluster = dynamic_cast<ASTQueryWithOnCluster *>(query.get());
if (!query_on_cluster || query_on_cluster->cluster.empty())
return query;
if ((isUserDefinedFunctionQuery(query)
&& context->getSettings().ignore_on_cluster_for_replicated_udf_queries
&& context->getUserDefinedSQLObjectsLoader().isReplicated())
|| (isAccessControlQuery(query)
&& context->getSettings().ignore_on_cluster_for_replicated_access_entities_queries
&& context->getAccessControl().containsStorage(ReplicatedAccessStorage::STORAGE_TYPE)))
{
LOG_DEBUG(&Poco::Logger::get("removeOnClusterClauseIfNeeded"), "ON CLUSTER clause was ignored for query {}", query->getID());
return query_on_cluster->getRewrittenASTWithoutOnCluster(params);
}
return query;
}
}

View File

@ -0,0 +1,12 @@
#pragma once
#include <Interpreters/IInterpreter.h>
#include <Parsers/ASTQueryWithOnCluster.h>
#include <Parsers/IAST_fwd.h>
namespace DB
{
ASTPtr removeOnClusterClauseIfNeeded(const ASTPtr & query_ptr, ContextPtr context, const WithoutOnClusterASTRewriteParams & params = {});
}

View File

@ -1,10 +1,12 @@
import inspect
from contextlib import nullcontext as does_not_raise
import pytest
import time
import os.path
from helpers.cluster import ClickHouseCluster
from helpers.client import QueryRuntimeException
from helpers.test_tools import assert_eq_with_retry, TSV
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
@ -83,6 +85,33 @@ def test_create_and_drop():
node1.query("DROP FUNCTION f1")
@pytest.mark.parametrize(
"ignore, expected_raise",
[("true", does_not_raise()), ("false", pytest.raises(QueryRuntimeException))],
)
def test_create_and_drop_udf_on_cluster(ignore, expected_raise):
node1.replace_config(
"/etc/clickhouse-server/users.d/users.xml",
inspect.cleandoc(
f"""
<clickhouse>
<profiles>
<default>
<ignore_on_cluster_for_replicated_udf_queries>{ignore}</ignore_on_cluster_for_replicated_udf_queries>
</default>
</profiles>
</clickhouse>
"""
),
)
node1.query("SYSTEM RELOAD CONFIG")
with expected_raise:
node1.query("CREATE FUNCTION f1 ON CLUSTER default AS (x, y) -> x + y")
assert node1.query("SELECT f1(12, 3)") == "15\n"
node1.query("DROP FUNCTION f1 ON CLUSTER default")
def test_create_and_replace():
node1.query("CREATE FUNCTION f1 AS (x, y) -> x + y")
assert node1.query("SELECT f1(12, 3)") == "15\n"

View File

@ -1,3 +1,4 @@
import inspect
import pytest
import time
@ -82,6 +83,37 @@ def test_create_replicated_on_cluster(started_cluster, entity):
node1.query(f"DROP {entity.keyword} {entity.name} {entity.options}")
@pytest.mark.parametrize("entity", entities, ids=get_entity_id)
def test_create_replicated_on_cluster_ignore(started_cluster, entity):
node1.replace_config(
"/etc/clickhouse-server/users.d/users.xml",
inspect.cleandoc(
f"""
<clickhouse>
<profiles>
<default>
<ignore_on_cluster_for_replicated_access_entities_queries>true</ignore_on_cluster_for_replicated_access_entities_queries>
</default>
</profiles>
</clickhouse>
"""
),
)
node1.query("SYSTEM RELOAD CONFIG")
node1.query(
f"CREATE {entity.keyword} {entity.name} ON CLUSTER default {entity.options}"
)
assert (
f"cannot insert because {entity.keyword.lower()} `{entity.name}{entity.options}` already exists in replicated"
in node2.query_and_get_error_with_retry(
f"CREATE {entity.keyword} {entity.name} {entity.options}"
)
)
node1.query(f"DROP {entity.keyword} {entity.name} {entity.options}")
@pytest.mark.parametrize("entity", entities, ids=get_entity_id)
def test_create_replicated_if_not_exists_on_cluster(started_cluster, entity):
node1.query(