mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
Merge pull request #66288 from MikhailBurdukov/ignore_on_cluster_named_collection
Ignore ON CLUSTER clause in queries for management of replicated named collections.
This commit is contained in:
commit
c23aa2b765
@ -235,7 +235,7 @@ bool NamedCollectionFactory::loadIfNot(std::lock_guard<std::mutex> & lock)
|
||||
loadFromConfig(context->getConfigRef(), lock);
|
||||
loadFromSQL(lock);
|
||||
|
||||
if (metadata_storage->supportsPeriodicUpdate())
|
||||
if (metadata_storage->isReplicated())
|
||||
{
|
||||
update_task = context->getSchedulePool().createTask("NamedCollectionsMetadataStorage", [this]{ updateFunc(); });
|
||||
update_task->activate();
|
||||
@ -357,6 +357,13 @@ void NamedCollectionFactory::reloadFromSQL()
|
||||
add(std::move(collections), lock);
|
||||
}
|
||||
|
||||
bool NamedCollectionFactory::usesReplicatedStorage()
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
loadIfNot(lock);
|
||||
return metadata_storage->isReplicated();
|
||||
}
|
||||
|
||||
void NamedCollectionFactory::updateFunc()
|
||||
{
|
||||
LOG_TRACE(log, "Named collections background updating thread started");
|
||||
|
@ -34,6 +34,8 @@ public:
|
||||
|
||||
void updateFromSQL(const ASTAlterNamedCollectionQuery & query);
|
||||
|
||||
bool usesReplicatedStorage();
|
||||
|
||||
void loadIfNot();
|
||||
|
||||
void shutdown();
|
||||
|
@ -67,7 +67,7 @@ public:
|
||||
|
||||
virtual bool removeIfExists(const std::string & path) = 0;
|
||||
|
||||
virtual bool supportsPeriodicUpdate() const = 0;
|
||||
virtual bool isReplicated() const = 0;
|
||||
|
||||
virtual bool waitUpdate(size_t /* timeout */) { return false; }
|
||||
};
|
||||
@ -89,7 +89,7 @@ public:
|
||||
|
||||
~LocalStorage() override = default;
|
||||
|
||||
bool supportsPeriodicUpdate() const override { return false; }
|
||||
bool isReplicated() const override { return false; }
|
||||
|
||||
std::vector<std::string> list() const override
|
||||
{
|
||||
@ -221,7 +221,7 @@ public:
|
||||
|
||||
~ZooKeeperStorage() override = default;
|
||||
|
||||
bool supportsPeriodicUpdate() const override { return true; }
|
||||
bool isReplicated() const override { return true; }
|
||||
|
||||
/// Return true if children changed.
|
||||
bool waitUpdate(size_t timeout) override
|
||||
@ -465,14 +465,14 @@ void NamedCollectionsMetadataStorage::writeCreateQuery(const ASTCreateNamedColle
|
||||
storage->write(getFileName(query.collection_name), serializeAST(*normalized_query), replace);
|
||||
}
|
||||
|
||||
bool NamedCollectionsMetadataStorage::supportsPeriodicUpdate() const
|
||||
bool NamedCollectionsMetadataStorage::isReplicated() const
|
||||
{
|
||||
return storage->supportsPeriodicUpdate();
|
||||
return storage->isReplicated();
|
||||
}
|
||||
|
||||
bool NamedCollectionsMetadataStorage::waitUpdate()
|
||||
{
|
||||
if (!storage->supportsPeriodicUpdate())
|
||||
if (!storage->isReplicated())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Periodic updates are not supported");
|
||||
|
||||
const auto & config = Context::getGlobalContextInstance()->getConfigRef();
|
||||
|
@ -30,7 +30,7 @@ public:
|
||||
/// Return true if update was made
|
||||
bool waitUpdate();
|
||||
|
||||
bool supportsPeriodicUpdate() const;
|
||||
bool isReplicated() const;
|
||||
|
||||
private:
|
||||
class INamedCollectionsStorage;
|
||||
|
@ -346,6 +346,7 @@ class IColumn;
|
||||
\
|
||||
M(Bool, ignore_on_cluster_for_replicated_udf_queries, false, "Ignore ON CLUSTER clause for replicated UDF management queries.", 0) \
|
||||
M(Bool, ignore_on_cluster_for_replicated_access_entities_queries, false, "Ignore ON CLUSTER clause for replicated access entities management queries.", 0) \
|
||||
M(Bool, ignore_on_cluster_for_replicated_named_collections_queries, false, "Ignore ON CLUSTER clause for replicated named collections management queries.", 0) \
|
||||
/** Settings for testing hedged requests */ \
|
||||
M(Milliseconds, sleep_in_send_tables_status_ms, 0, "Time to sleep in sending tables status response in TCPHandler", 0) \
|
||||
M(Milliseconds, sleep_in_send_data_ms, 0, "Time to sleep in sending data in TCPHandler", 0) \
|
||||
|
@ -76,6 +76,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
|
||||
{"azure_sdk_max_retries", 10, 10, "Maximum number of retries in azure sdk"},
|
||||
{"azure_sdk_retry_initial_backoff_ms", 10, 10, "Minimal backoff between retries in azure sdk"},
|
||||
{"azure_sdk_retry_max_backoff_ms", 1000, 1000, "Maximal backoff between retries in azure sdk"},
|
||||
{"ignore_on_cluster_for_replicated_named_collections_queries", false, false, "Ignore ON CLUSTER clause for replicated named collections management queries."},
|
||||
{"postgresql_connection_attempt_timeout", 2, 2, "Allow to control 'connect_timeout' parameter of PostgreSQL connection."},
|
||||
{"postgresql_connection_pool_retries", 2, 2, "Allow to control the number of retries in PostgreSQL connection pool."}
|
||||
}},
|
||||
|
@ -4,6 +4,7 @@
|
||||
#include <Access/ContextAccess.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/executeDDLQueryOnCluster.h>
|
||||
#include <Interpreters/removeOnClusterClauseIfNeeded.h>
|
||||
#include <Common/NamedCollections/NamedCollectionsFactory.h>
|
||||
|
||||
|
||||
@ -13,14 +14,16 @@ namespace DB
|
||||
BlockIO InterpreterAlterNamedCollectionQuery::execute()
|
||||
{
|
||||
auto current_context = getContext();
|
||||
const auto & query = query_ptr->as<const ASTAlterNamedCollectionQuery &>();
|
||||
|
||||
const auto updated_query = removeOnClusterClauseIfNeeded(query_ptr, getContext());
|
||||
const auto & query = updated_query->as<const ASTAlterNamedCollectionQuery &>();
|
||||
|
||||
current_context->checkAccess(AccessType::ALTER_NAMED_COLLECTION, query.collection_name);
|
||||
|
||||
if (!query.cluster.empty())
|
||||
{
|
||||
DDLQueryOnClusterParams params;
|
||||
return executeDDLQueryOnCluster(query_ptr, current_context, params);
|
||||
return executeDDLQueryOnCluster(updated_query, current_context, params);
|
||||
}
|
||||
|
||||
NamedCollectionFactory::instance().updateFromSQL(query);
|
||||
|
@ -4,6 +4,7 @@
|
||||
#include <Access/ContextAccess.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/executeDDLQueryOnCluster.h>
|
||||
#include <Interpreters/removeOnClusterClauseIfNeeded.h>
|
||||
#include <Common/NamedCollections/NamedCollectionsFactory.h>
|
||||
|
||||
|
||||
@ -13,14 +14,16 @@ namespace DB
|
||||
BlockIO InterpreterCreateNamedCollectionQuery::execute()
|
||||
{
|
||||
auto current_context = getContext();
|
||||
const auto & query = query_ptr->as<const ASTCreateNamedCollectionQuery &>();
|
||||
|
||||
const auto updated_query = removeOnClusterClauseIfNeeded(query_ptr, getContext());
|
||||
const auto & query = updated_query->as<const ASTCreateNamedCollectionQuery &>();
|
||||
|
||||
current_context->checkAccess(AccessType::CREATE_NAMED_COLLECTION, query.collection_name);
|
||||
|
||||
if (!query.cluster.empty())
|
||||
{
|
||||
DDLQueryOnClusterParams params;
|
||||
return executeDDLQueryOnCluster(query_ptr, current_context, params);
|
||||
return executeDDLQueryOnCluster(updated_query, current_context, params);
|
||||
}
|
||||
|
||||
NamedCollectionFactory::instance().createFromSQL(query);
|
||||
|
@ -4,6 +4,7 @@
|
||||
#include <Access/ContextAccess.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/executeDDLQueryOnCluster.h>
|
||||
#include <Interpreters/removeOnClusterClauseIfNeeded.h>
|
||||
#include <Common/NamedCollections/NamedCollectionsFactory.h>
|
||||
|
||||
|
||||
@ -13,14 +14,16 @@ namespace DB
|
||||
BlockIO InterpreterDropNamedCollectionQuery::execute()
|
||||
{
|
||||
auto current_context = getContext();
|
||||
const auto & query = query_ptr->as<const ASTDropNamedCollectionQuery &>();
|
||||
|
||||
const auto updated_query = removeOnClusterClauseIfNeeded(query_ptr, getContext());
|
||||
const auto & query = updated_query->as<const ASTDropNamedCollectionQuery &>();
|
||||
|
||||
current_context->checkAccess(AccessType::DROP_NAMED_COLLECTION, query.collection_name);
|
||||
|
||||
if (!query.cluster.empty())
|
||||
{
|
||||
DDLQueryOnClusterParams params;
|
||||
return executeDDLQueryOnCluster(query_ptr, current_context, params);
|
||||
return executeDDLQueryOnCluster(updated_query, current_context, params);
|
||||
}
|
||||
|
||||
NamedCollectionFactory::instance().removeFromSQL(query);
|
||||
|
@ -15,6 +15,10 @@
|
||||
#include <Parsers/Access/ASTCreateUserQuery.h>
|
||||
#include <Parsers/Access/ASTDropAccessEntityQuery.h>
|
||||
#include <Parsers/Access/ASTGrantQuery.h>
|
||||
#include <Parsers/ASTCreateNamedCollectionQuery.h>
|
||||
#include <Parsers/ASTAlterNamedCollectionQuery.h>
|
||||
#include <Parsers/ASTDropNamedCollectionQuery.h>
|
||||
#include <Common/NamedCollections/NamedCollectionsFactory.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -38,6 +42,13 @@ static bool isAccessControlQuery(const ASTPtr & query)
|
||||
|| query->as<ASTGrantQuery>();
|
||||
}
|
||||
|
||||
static bool isNamedCollectionQuery(const ASTPtr & query)
|
||||
{
|
||||
return query->as<ASTCreateNamedCollectionQuery>()
|
||||
|| query->as<ASTDropNamedCollectionQuery>()
|
||||
|| query->as<ASTAlterNamedCollectionQuery>();
|
||||
}
|
||||
|
||||
ASTPtr removeOnClusterClauseIfNeeded(const ASTPtr & query, ContextPtr context, const WithoutOnClusterASTRewriteParams & params)
|
||||
{
|
||||
auto * query_on_cluster = dynamic_cast<ASTQueryWithOnCluster *>(query.get());
|
||||
@ -50,7 +61,10 @@ ASTPtr removeOnClusterClauseIfNeeded(const ASTPtr & query, ContextPtr context, c
|
||||
&& context->getUserDefinedSQLObjectsStorage().isReplicated())
|
||||
|| (isAccessControlQuery(query)
|
||||
&& context->getSettings().ignore_on_cluster_for_replicated_access_entities_queries
|
||||
&& context->getAccessControl().containsStorage(ReplicatedAccessStorage::STORAGE_TYPE)))
|
||||
&& context->getAccessControl().containsStorage(ReplicatedAccessStorage::STORAGE_TYPE))
|
||||
|| (isNamedCollectionQuery(query)
|
||||
&& context->getSettings().ignore_on_cluster_for_replicated_named_collections_queries
|
||||
&& NamedCollectionFactory::instance().usesReplicatedStorage()))
|
||||
{
|
||||
LOG_DEBUG(getLogger("removeOnClusterClauseIfNeeded"), "ON CLUSTER clause was ignored for query {}", query->getID());
|
||||
return query_on_cluster->getRewrittenASTWithoutOnCluster(params);
|
||||
|
@ -9,4 +9,21 @@
|
||||
<key1>value1</key1>
|
||||
</collection1>
|
||||
</named_collections>
|
||||
|
||||
<remote_servers>
|
||||
<replicated_nc_nodes_cluster>
|
||||
<shard>
|
||||
<internal_replication>true</internal_replication>
|
||||
<replica>
|
||||
<host>node_with_keeper</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
<replica>
|
||||
<host>node_with_keeper_2</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
</shard>
|
||||
<allow_distributed_ddl_queries>true</allow_distributed_ddl_queries>
|
||||
</replicated_nc_nodes_cluster>
|
||||
</remote_servers>
|
||||
</clickhouse>
|
||||
|
@ -1,4 +1,9 @@
|
||||
<clickhouse>
|
||||
<profiles>
|
||||
<default>
|
||||
<ignore_on_cluster_for_replicated_named_collections_queries>0</ignore_on_cluster_for_replicated_named_collections_queries>
|
||||
</default>
|
||||
</profiles>
|
||||
<users>
|
||||
<default>
|
||||
<password></password>
|
||||
|
@ -3,6 +3,8 @@ import pytest
|
||||
import os
|
||||
import time
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
from contextlib import nullcontext as does_not_raise
|
||||
from helpers.client import QueryRuntimeException
|
||||
|
||||
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
|
||||
NAMED_COLLECTIONS_CONFIG = os.path.join(
|
||||
@ -761,3 +763,32 @@ def test_keeper_storage(cluster):
|
||||
|
||||
check_dropped(node1)
|
||||
check_dropped(node2)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"ignore, expected_raise",
|
||||
[(True, does_not_raise()), (False, pytest.raises(QueryRuntimeException))],
|
||||
)
|
||||
def test_keeper_storage_remove_on_cluster(cluster, ignore, expected_raise):
|
||||
node = cluster.instances["node_with_keeper"]
|
||||
|
||||
replace_in_users_config(
|
||||
node,
|
||||
"ignore_on_cluster_for_replicated_named_collections_queries>.",
|
||||
f"ignore_on_cluster_for_replicated_named_collections_queries>{int(ignore)}",
|
||||
)
|
||||
node.query("SYSTEM RELOAD CONFIG")
|
||||
|
||||
with expected_raise:
|
||||
node.query(
|
||||
"DROP NAMED COLLECTION IF EXISTS test_nc ON CLUSTER `replicated_nc_nodes_cluster`"
|
||||
)
|
||||
node.query(
|
||||
f"CREATE NAMED COLLECTION test_nc ON CLUSTER `replicated_nc_nodes_cluster` AS key1=1, key2=2 OVERRIDABLE"
|
||||
)
|
||||
node.query(
|
||||
f"ALTER NAMED COLLECTION test_nc ON CLUSTER `replicated_nc_nodes_cluster` SET key2=3"
|
||||
)
|
||||
node.query(
|
||||
f"DROP NAMED COLLECTION test_nc ON CLUSTER `replicated_nc_nodes_cluster`"
|
||||
)
|
||||
|
Loading…
Reference in New Issue
Block a user