From 3e68103ac84e0c3d59759f5af5d615191a6a59e5 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Fri, 26 Apr 2024 09:11:20 +0200 Subject: [PATCH] Fix interserver secret for Distributed over Distributed from remote() Right if you are executing remote() and later the query will go to the cluster with interserver secret, then you should have the same user on the nodes from that cluster, otherwise the query will fail with: DB::NetException: Connection reset by peer And on the remote node: TCPHandler: User (initial, interserver mode): new_user (client: 172.16.1.5:40536) TCP_INTERSERVER-Session: d29ecf7d-2c1c-44d2-8cc9-4ab08175bf05 Authentication failed with error: new_user: Authentication failed: password is incorrect, or there is no user with such name. ServerErrorHandler: Code: 516. DB::Exception: new_user: Authentication failed: password is incorrect, or there is no user with such name. (AUTHENTICATION_FAILED), Stack trace (when copying this message, always include the lines below): The problem is that remote() will not use passed to it user in any form, and instead, the initial user will be used, i.e. "cli_user" not "query_user": chc --user cli_user -q "select * from remote(node, default, some_dist_table, 'query_user')" Fix this by using the user from query for the remote(). Note, that the Distributed over Distributed in case of tables still wont work, for this you have to have the same users on all nodes in all clusters that are involved in case of interserver secret is enabled (see also test). Signed-off-by: Azat Khuzhin v2: move client initial_user adjustment into ClusterProxy/executeQuery.cpp v3: we cannot check for interserver_mode in updateSettingsAndClientInfoForCluster() since it is not yet interserver in remote() context --- .../ClusterProxy/executeQuery.cpp | 32 ++++++++++++-- src/Interpreters/ClusterProxy/executeQuery.h | 11 ++--- src/Storages/StorageDistributed.cpp | 3 +- .../configs/users.d/new_user.xml | 12 +++++ .../test.py | 44 ++++++++++++++++--- 5 files changed, 83 insertions(+), 19 deletions(-) create mode 100644 tests/integration/test_distributed_inter_server_secret/configs/users.d/new_user.xml diff --git a/src/Interpreters/ClusterProxy/executeQuery.cpp b/src/Interpreters/ClusterProxy/executeQuery.cpp index f7727f70ff7..2af33421add 100644 --- a/src/Interpreters/ClusterProxy/executeQuery.cpp +++ b/src/Interpreters/ClusterProxy/executeQuery.cpp @@ -38,7 +38,8 @@ namespace ErrorCodes namespace ClusterProxy { -ContextMutablePtr updateSettingsForCluster(const Cluster & cluster, +ContextMutablePtr updateSettingsAndClientInfoForCluster(const Cluster & cluster, + bool is_remote_function, ContextPtr context, const Settings & settings, const StorageID & main_table, @@ -46,9 +47,17 @@ ContextMutablePtr updateSettingsForCluster(const Cluster & cluster, LoggerPtr log, const DistributedSettings * distributed_settings) { + ClientInfo new_client_info = context->getClientInfo(); Settings new_settings = settings; new_settings.queue_max_wait_ms = Cluster::saturate(new_settings.queue_max_wait_ms, settings.max_execution_time); + /// In case of interserver mode we should reset initial_user for remote() function to use passed user from the query. + if (is_remote_function) + { + const auto & address = cluster.getShardsAddresses().front().front(); + new_client_info.initial_user = address.user; + } + /// If "secret" (in remote_servers) is not in use, /// user on the shard is not the same as the user on the initiator, /// hence per-user limits should not be applied. @@ -168,9 +177,23 @@ ContextMutablePtr updateSettingsForCluster(const Cluster & cluster, auto new_context = Context::createCopy(context); new_context->setSettings(new_settings); + new_context->setClientInfo(new_client_info); return new_context; } +ContextMutablePtr updateSettingsForCluster(const Cluster & cluster, ContextPtr context, const Settings & settings, const StorageID & main_table) +{ + return updateSettingsAndClientInfoForCluster(cluster, + /* is_remote_function= */ false, + context, + settings, + main_table, + /* additional_filter_ast= */ {}, + /* log= */ {}, + /* distributed_settings= */ {}); +} + + static ThrottlerPtr getThrottler(const ContextPtr & context) { const Settings & settings = context->getSettingsRef(); @@ -209,7 +232,8 @@ void executeQuery( const ExpressionActionsPtr & sharding_key_expr, const std::string & sharding_key_column_name, const DistributedSettings & distributed_settings, - AdditionalShardFilterGenerator shard_filter_generator) + AdditionalShardFilterGenerator shard_filter_generator, + bool is_remote_function) { const Settings & settings = context->getSettingsRef(); @@ -222,8 +246,8 @@ void executeQuery( SelectStreamFactory::Shards remote_shards; auto cluster = query_info.getCluster(); - auto new_context = updateSettingsForCluster(*cluster, context, settings, main_table, query_info.additional_filter_ast, log, - &distributed_settings); + auto new_context = updateSettingsAndClientInfoForCluster(*cluster, is_remote_function, context, + settings, main_table, query_info.additional_filter_ast, log, &distributed_settings); if (context->getSettingsRef().allow_experimental_parallel_reading_from_replicas && context->getSettingsRef().allow_experimental_parallel_reading_from_replicas.value != new_context->getSettingsRef().allow_experimental_parallel_reading_from_replicas.value) diff --git a/src/Interpreters/ClusterProxy/executeQuery.h b/src/Interpreters/ClusterProxy/executeQuery.h index 8f6f6300c7b..3734a237d19 100644 --- a/src/Interpreters/ClusterProxy/executeQuery.h +++ b/src/Interpreters/ClusterProxy/executeQuery.h @@ -38,13 +38,7 @@ class SelectStreamFactory; /// - optimize_skip_unused_shards_nesting /// /// @return new Context with adjusted settings -ContextMutablePtr updateSettingsForCluster(const Cluster & cluster, - ContextPtr context, - const Settings & settings, - const StorageID & main_table, - ASTPtr additional_filter_ast = nullptr, - LoggerPtr log = nullptr, - const DistributedSettings * distributed_settings = nullptr); +ContextMutablePtr updateSettingsForCluster(const Cluster & cluster, ContextPtr context, const Settings & settings, const StorageID & main_table); using AdditionalShardFilterGenerator = std::function; /// Execute a distributed query, creating a query plan, from which the query pipeline can be built. @@ -63,7 +57,8 @@ void executeQuery( const ExpressionActionsPtr & sharding_key_expr, const std::string & sharding_key_column_name, const DistributedSettings & distributed_settings, - AdditionalShardFilterGenerator shard_filter_generator); + AdditionalShardFilterGenerator shard_filter_generator, + bool is_remote_function); void executeQueryWithParallelReplicas( diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 69d3cf3ad3b..a4f51f1f587 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -927,7 +927,8 @@ void StorageDistributed::read( sharding_key_expr, sharding_key_column_name, distributed_settings, - additional_shard_filter_generator); + additional_shard_filter_generator, + /* is_remote_function= */ static_cast(owned_cluster)); /// This is a bug, it is possible only when there is no shards to query, and this is handled earlier. if (!query_plan.isInitialized()) diff --git a/tests/integration/test_distributed_inter_server_secret/configs/users.d/new_user.xml b/tests/integration/test_distributed_inter_server_secret/configs/users.d/new_user.xml new file mode 100644 index 00000000000..a747d61a0dd --- /dev/null +++ b/tests/integration/test_distributed_inter_server_secret/configs/users.d/new_user.xml @@ -0,0 +1,12 @@ + + + + + + ::/0 + + default + default + + + diff --git a/tests/integration/test_distributed_inter_server_secret/test.py b/tests/integration/test_distributed_inter_server_secret/test.py index 10dbb23d961..50d7be4d11e 100644 --- a/tests/integration/test_distributed_inter_server_secret/test.py +++ b/tests/integration/test_distributed_inter_server_secret/test.py @@ -12,12 +12,16 @@ from helpers.cluster import ClickHouseCluster, CLICKHOUSE_CI_MIN_TESTED_VERSION cluster = ClickHouseCluster(__file__) -def make_instance(name, cfg, *args, **kwargs): +def make_instance(name, *args, **kwargs): + main_configs = kwargs.pop("main_configs", []) + main_configs.append("configs/remote_servers.xml") + user_configs = kwargs.pop("user_configs", []) + user_configs.append("configs/users.xml") return cluster.add_instance( name, with_zookeeper=True, - main_configs=["configs/remote_servers.xml", cfg], - user_configs=["configs/users.xml"], + main_configs=main_configs, + user_configs=user_configs, *args, **kwargs, ) @@ -27,11 +31,16 @@ def make_instance(name, cfg, *args, **kwargs): assert CLICKHOUSE_CI_MIN_TESTED_VERSION < "23.3" # _n1/_n2 contains cluster with different -- should fail -n1 = make_instance("n1", "configs/remote_servers_n1.xml") -n2 = make_instance("n2", "configs/remote_servers_n2.xml") +# only n1 contains new_user +n1 = make_instance( + "n1", + main_configs=["configs/remote_servers_n1.xml"], + user_configs=["configs/users.d/new_user.xml"], +) +n2 = make_instance("n2", main_configs=["configs/remote_servers_n2.xml"]) backward = make_instance( "backward", - "configs/remote_servers_backward.xml", + main_configs=["configs/remote_servers_backward.xml"], image="clickhouse/clickhouse-server", # version without DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET_V2 tag=CLICKHOUSE_CI_MIN_TESTED_VERSION, @@ -100,6 +109,12 @@ def bootstrap(): ) """ ) + n.query( + """ + CREATE TABLE dist_over_dist_secure AS data + Engine=Distributed(secure, currentDatabase(), dist_secure, key) + """ + ) @pytest.fixture(scope="module", autouse=True) @@ -432,3 +447,20 @@ def test_user_secure_cluster_from_backward(user, password): assert n1.contains_in_log( "Using deprecated interserver protocol because the client is too old. Consider upgrading all nodes in cluster." ) + + +def test_secure_cluster_distributed_over_distributed_different_users(): + # This works because we will have initial_user='default' + n1.query( + "SELECT * FROM remote('n1', currentDatabase(), dist_secure)", user="new_user" + ) + # While this is broken because now initial_user='new_user', and n2 does not has it + with pytest.raises(QueryRuntimeException): + n2.query( + "SELECT * FROM remote('n1', currentDatabase(), dist_secure, 'new_user')" + ) + # And this is still a problem, let's assume that this is OK, since we are + # expecting that in case of dist-over-dist the clusters are the same (users + # and stuff). + with pytest.raises(QueryRuntimeException): + n1.query("SELECT * FROM dist_over_dist_secure", user="new_user")