Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-26 01:22:04 +00:00)
Fix interserver secret for Distributed over Distributed from remote()
Right now, if you execute remote() and the query later goes to a cluster with an interserver secret, you must have the same user on the nodes of that cluster; otherwise the query fails with:

    DB::NetException: Connection reset by peer

And on the remote node:

    <Debug> TCPHandler: User (initial, interserver mode): new_user (client: 172.16.1.5:40536)
    <Debug> TCP_INTERSERVER-Session: d29ecf7d-2c1c-44d2-8cc9-4ab08175bf05 Authentication failed with error: new_user: Authentication failed: password is incorrect, or there is no user with such name.
    <Error> ServerErrorHandler: Code: 516. DB::Exception: new_user: Authentication failed: password is incorrect, or there is no user with such name. (AUTHENTICATION_FAILED), Stack trace (when copying this message, always include the lines below):

The problem is that remote() does not use the user passed to it in any form; instead, the initial user is used, i.e. "cli_user" rather than "query_user":

    chc --user cli_user -q "select * from remote(node, default, some_dist_table, 'query_user')"

Fix this by using the user from the query for remote().

Note that Distributed over Distributed with plain tables still won't work: for that, you have to have the same users on all nodes of all clusters involved when the interserver secret is enabled (see also the test).

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>

v2: move client initial_user adjustment into ClusterProxy/executeQuery.cpp
v3: we cannot check for interserver_mode in updateSettingsAndClientInfoForCluster() since it is not yet interserver in the remote() context
Commit: 3e68103ac8 (parent: dffcc51b83)
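For context, a runnable sketch of the failure mode (plain Python, not ClickHouse code; the user names come from the commit message, everything else is an illustrative assumption):

    # The initiator runs:
    #   chc --user cli_user -q "select * from remote(node, default, some_dist_table, 'query_user')"
    # and the secure cluster authenticates whatever arrives as initial_user.
    remote_cluster_users = {"query_user"}  # users that exist on the remote secure cluster

    def interserver_auth(initial_user: str) -> None:
        if initial_user not in remote_cluster_users:
            raise PermissionError(f"{initial_user}: Authentication failed")  # AUTHENTICATION_FAILED

    # Before the fix: remote() forwarded the client's user, not the one from the query.
    try:
        interserver_auth("cli_user")
    except PermissionError as e:
        print(e)  # cli_user: Authentication failed

    # After the fix: initial_user is reset to the user passed to remote().
    interserver_auth("query_user")  # succeeds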
--- a/src/Interpreters/ClusterProxy/executeQuery.cpp
+++ b/src/Interpreters/ClusterProxy/executeQuery.cpp
@@ -38,7 +38,8 @@ namespace ErrorCodes
 namespace ClusterProxy
 {
 
-ContextMutablePtr updateSettingsForCluster(const Cluster & cluster,
+ContextMutablePtr updateSettingsAndClientInfoForCluster(const Cluster & cluster,
+    bool is_remote_function,
     ContextPtr context,
     const Settings & settings,
     const StorageID & main_table,
@@ -46,9 +47,17 @@ ContextMutablePtr updateSettingsForCluster(const Cluster & cluster,
     LoggerPtr log,
     const DistributedSettings * distributed_settings)
 {
+    ClientInfo new_client_info = context->getClientInfo();
     Settings new_settings = settings;
     new_settings.queue_max_wait_ms = Cluster::saturate(new_settings.queue_max_wait_ms, settings.max_execution_time);
 
+    /// In case of interserver mode we should reset initial_user for remote() function to use passed user from the query.
+    if (is_remote_function)
+    {
+        const auto & address = cluster.getShardsAddresses().front().front();
+        new_client_info.initial_user = address.user;
+    }
+
     /// If "secret" (in remote_servers) is not in use,
     /// user on the shard is not the same as the user on the initiator,
     /// hence per-user limits should not be applied.
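A note on the hunk above: remote() stores the user passed in the query on the addresses it generates, so the first replica of the first shard is representative, and the reset is done unconditionally because (per the v3 note) the context is not yet in interserver mode at this point. A minimal, runnable model of the decision, with assumed names:

    def effective_initial_user(initial_user: str, is_remote_function: bool, address_user: str) -> str:
        # For remote(), the user stored on the cluster address (the one from the
        # query) replaces the initiator's initial_user; plain Distributed tables
        # forward initial_user unchanged.
        return address_user if is_remote_function else initial_user

    assert effective_initial_user("cli_user", True, "query_user") == "query_user"
    assert effective_initial_user("cli_user", False, "query_user") == "cli_user"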
@@ -168,9 +177,23 @@ ContextMutablePtr updateSettingsForCluster(const Cluster & cluster,
 
     auto new_context = Context::createCopy(context);
     new_context->setSettings(new_settings);
+    new_context->setClientInfo(new_client_info);
     return new_context;
 }
 
+ContextMutablePtr updateSettingsForCluster(const Cluster & cluster, ContextPtr context, const Settings & settings, const StorageID & main_table)
+{
+    return updateSettingsAndClientInfoForCluster(cluster,
+        /* is_remote_function= */ false,
+        context,
+        settings,
+        main_table,
+        /* additional_filter_ast= */ {},
+        /* log= */ {},
+        /* distributed_settings= */ {});
+}
+
+
 static ThrottlerPtr getThrottler(const ContextPtr & context)
 {
     const Settings & settings = context->getSettingsRef();
@@ -209,7 +232,8 @@ void executeQuery(
     const ExpressionActionsPtr & sharding_key_expr,
     const std::string & sharding_key_column_name,
     const DistributedSettings & distributed_settings,
-    AdditionalShardFilterGenerator shard_filter_generator)
+    AdditionalShardFilterGenerator shard_filter_generator,
+    bool is_remote_function)
 {
     const Settings & settings = context->getSettingsRef();
 
@@ -222,8 +246,8 @@ void executeQuery(
     SelectStreamFactory::Shards remote_shards;
 
     auto cluster = query_info.getCluster();
-    auto new_context = updateSettingsForCluster(*cluster, context, settings, main_table, query_info.additional_filter_ast, log,
-        &distributed_settings);
+    auto new_context = updateSettingsAndClientInfoForCluster(*cluster, is_remote_function, context,
+        settings, main_table, query_info.additional_filter_ast, log, &distributed_settings);
     if (context->getSettingsRef().allow_experimental_parallel_reading_from_replicas
         && context->getSettingsRef().allow_experimental_parallel_reading_from_replicas.value
         != new_context->getSettingsRef().allow_experimental_parallel_reading_from_replicas.value)
--- a/src/Interpreters/ClusterProxy/executeQuery.h
+++ b/src/Interpreters/ClusterProxy/executeQuery.h
@@ -38,13 +38,7 @@ class SelectStreamFactory;
 /// - optimize_skip_unused_shards_nesting
 ///
 /// @return new Context with adjusted settings
-ContextMutablePtr updateSettingsForCluster(const Cluster & cluster,
-    ContextPtr context,
-    const Settings & settings,
-    const StorageID & main_table,
-    ASTPtr additional_filter_ast = nullptr,
-    LoggerPtr log = nullptr,
-    const DistributedSettings * distributed_settings = nullptr);
+ContextMutablePtr updateSettingsForCluster(const Cluster & cluster, ContextPtr context, const Settings & settings, const StorageID & main_table);
 
 using AdditionalShardFilterGenerator = std::function<ASTPtr(uint64_t)>;
 /// Execute a distributed query, creating a query plan, from which the query pipeline can be built.
@@ -63,7 +57,8 @@ void executeQuery(
     const ExpressionActionsPtr & sharding_key_expr,
     const std::string & sharding_key_column_name,
     const DistributedSettings & distributed_settings,
-    AdditionalShardFilterGenerator shard_filter_generator);
+    AdditionalShardFilterGenerator shard_filter_generator,
+    bool is_remote_function);
 
 
 void executeQueryWithParallelReplicas(
--- a/src/Storages/StorageDistributed.cpp
+++ b/src/Storages/StorageDistributed.cpp
@@ -927,7 +927,8 @@ void StorageDistributed::read(
         sharding_key_expr,
         sharding_key_column_name,
         distributed_settings,
-        additional_shard_filter_generator);
+        additional_shard_filter_generator,
+        /* is_remote_function= */ static_cast<bool>(owned_cluster));
 
     /// This is a bug, it is possible only when there is no shards to query, and this is handled earlier.
     if (!query_plan.isInitialized())
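Why static_cast<bool>(owned_cluster) serves as the flag here: a StorageDistributed owns its cluster only when the cluster was built ad hoc, as remote() does; a Distributed table refers to a named cluster from the server configuration instead. A toy model of that invariant (assumed names, not ClickHouse code):

    class DistributedStorageModel:
        def __init__(self, owned_cluster=None):
            # Set only when the storage was created by a table function such as remote().
            self.owned_cluster = owned_cluster

        @property
        def is_remote_function(self) -> bool:
            # Mirrors static_cast<bool>(owned_cluster) at the call site above.
            return self.owned_cluster is not None

    assert DistributedStorageModel(owned_cluster=object()).is_remote_function
    assert not DistributedStorageModel().is_remote_function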
--- /dev/null
+++ b/tests/integration/test_distributed_inter_server_secret/configs/users.d/new_user.xml
@@ -0,0 +1,12 @@
+<clickhouse>
+    <users>
+        <new_user>
+            <password></password>
+            <networks>
+                <ip>::/0</ip>
+            </networks>
+            <profile>default</profile>
+            <quota>default</quota>
+        </new_user>
+    </users>
+</clickhouse>
--- a/tests/integration/test_distributed_inter_server_secret/test.py
+++ b/tests/integration/test_distributed_inter_server_secret/test.py
@@ -12,12 +12,16 @@ from helpers.cluster import ClickHouseCluster, CLICKHOUSE_CI_MIN_TESTED_VERSION
 cluster = ClickHouseCluster(__file__)
 
 
-def make_instance(name, cfg, *args, **kwargs):
+def make_instance(name, *args, **kwargs):
+    main_configs = kwargs.pop("main_configs", [])
+    main_configs.append("configs/remote_servers.xml")
+    user_configs = kwargs.pop("user_configs", [])
+    user_configs.append("configs/users.xml")
     return cluster.add_instance(
         name,
         with_zookeeper=True,
-        main_configs=["configs/remote_servers.xml", cfg],
-        user_configs=["configs/users.xml"],
+        main_configs=main_configs,
+        user_configs=user_configs,
         *args,
         **kwargs,
     )
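With this refactor, callers can pass extra per-instance main_configs/user_configs, and the shared configs/remote_servers.xml and configs/users.xml are always appended, as the next hunk does for n1. A usage sketch (hypothetical instance name):

    n3 = make_instance(
        "n3",
        main_configs=["configs/remote_servers_n1.xml"],  # merged with configs/remote_servers.xml
        user_configs=["configs/users.d/new_user.xml"],   # merged with configs/users.xml
    )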
@@ -27,11 +31,16 @@ def make_instance(name, *args, **kwargs):
 assert CLICKHOUSE_CI_MIN_TESTED_VERSION < "23.3"
 
 # _n1/_n2 contains cluster with different <secret> -- should fail
-n1 = make_instance("n1", "configs/remote_servers_n1.xml")
-n2 = make_instance("n2", "configs/remote_servers_n2.xml")
+# only n1 contains new_user
+n1 = make_instance(
+    "n1",
+    main_configs=["configs/remote_servers_n1.xml"],
+    user_configs=["configs/users.d/new_user.xml"],
+)
+n2 = make_instance("n2", main_configs=["configs/remote_servers_n2.xml"])
 backward = make_instance(
     "backward",
-    "configs/remote_servers_backward.xml",
+    main_configs=["configs/remote_servers_backward.xml"],
     image="clickhouse/clickhouse-server",
     # version without DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET_V2
     tag=CLICKHOUSE_CI_MIN_TESTED_VERSION,
@@ -100,6 +109,12 @@ def bootstrap():
         )
         """
     )
+    n.query(
+        """
+        CREATE TABLE dist_over_dist_secure AS data
+        Engine=Distributed(secure, currentDatabase(), dist_secure, key)
+        """
+    )
 
 
 @pytest.fixture(scope="module", autouse=True)
@@ -432,3 +447,20 @@ def test_user_secure_cluster_from_backward(user, password):
     assert n1.contains_in_log(
         "Using deprecated interserver protocol because the client is too old. Consider upgrading all nodes in cluster."
     )
+
+
+def test_secure_cluster_distributed_over_distributed_different_users():
+    # This works because we will have initial_user='default'
+    n1.query(
+        "SELECT * FROM remote('n1', currentDatabase(), dist_secure)", user="new_user"
+    )
+    # While this is broken, because now initial_user='new_user' and n2 does not have it
+    with pytest.raises(QueryRuntimeException):
+        n2.query(
+            "SELECT * FROM remote('n1', currentDatabase(), dist_secure, 'new_user')"
+        )
+    # And this is still a problem; let's assume that this is OK, since we are
+    # expecting that in case of dist-over-dist the clusters are the same (users
+    # and stuff).
+    with pytest.raises(QueryRuntimeException):
+        n1.query("SELECT * FROM dist_over_dist_secure", user="new_user")