mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-17 20:02:05 +00:00
Merge pull request #29060 from azat/inter-server-secret-auth-fix
Do not allow to reuse previous credentials in case of inter-server secret
This commit is contained in:
commit
27f6d5864d
@ -259,9 +259,7 @@ Session::~Session()
|
||||
|
||||
if (notified_session_log_about_login)
|
||||
{
|
||||
// must have been set in makeQueryContext or makeSessionContext
|
||||
assert(user);
|
||||
if (auto session_log = getSessionLog())
|
||||
if (auto session_log = getSessionLog(); session_log && user)
|
||||
session_log->addLogOut(session_id, user->getName(), getClientInfo());
|
||||
}
|
||||
}
|
||||
@ -467,7 +465,7 @@ ContextMutablePtr Session::makeQueryContextImpl(const ClientInfo * client_info_t
|
||||
|
||||
if (!notified_session_log_about_login)
|
||||
{
|
||||
if (auto session_log = getSessionLog())
|
||||
if (auto session_log = getSessionLog(); session_log && user)
|
||||
{
|
||||
session_log->addLoginSuccess(
|
||||
session_id,
|
||||
|
@ -1131,6 +1131,17 @@ void TCPHandler::receiveQuery()
|
||||
state.is_empty = false;
|
||||
readStringBinary(state.query_id, *in);
|
||||
|
||||
/// In interserer mode,
|
||||
/// initial_user can be empty in case of Distributed INSERT via Buffer/Kafka,
|
||||
/// (i.e. when the INSERT is done with the global context w/o user),
|
||||
/// so it is better to reset session to avoid using old user.
|
||||
if (is_interserver_mode)
|
||||
{
|
||||
ClientInfo original_session_client_info = session->getClientInfo();
|
||||
session = std::make_unique<Session>(server.context(), ClientInfo::Interface::TCP_INTERSERVER);
|
||||
session->getClientInfo() = original_session_client_info;
|
||||
}
|
||||
|
||||
/// Read client info.
|
||||
ClientInfo client_info = session->getClientInfo();
|
||||
if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_CLIENT_INFO)
|
||||
@ -1178,11 +1189,13 @@ void TCPHandler::receiveQuery()
|
||||
throw NetException("Hash mismatch", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
|
||||
/// TODO: change error code?
|
||||
|
||||
/// initial_user can be empty in case of Distributed INSERT via Buffer/Kafka,
|
||||
/// i.e. when the INSERT is done with the global context (w/o user).
|
||||
if (!client_info.initial_user.empty())
|
||||
if (client_info.initial_user.empty())
|
||||
{
|
||||
LOG_DEBUG(log, "User (initial): {}", client_info.initial_user);
|
||||
LOG_DEBUG(log, "User (no user, interserver mode)");
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_DEBUG(log, "User (initial, interserver mode): {}", client_info.initial_user);
|
||||
session->authenticate(AlwaysAllowCredentials{client_info.initial_user}, client_info.initial_address);
|
||||
}
|
||||
#else
|
||||
|
@ -16,13 +16,34 @@ class Client:
|
||||
|
||||
self.command += ['--host', self.host, '--port', str(self.port), '--stacktrace']
|
||||
|
||||
def query(self, sql, stdin=None, timeout=None, settings=None, user=None, password=None, database=None,
|
||||
ignore_error=False):
|
||||
return self.get_query_request(sql, stdin=stdin, timeout=timeout, settings=settings, user=user,
|
||||
password=password, database=database, ignore_error=ignore_error).get_answer()
|
||||
def query(self, sql,
|
||||
stdin=None,
|
||||
timeout=None,
|
||||
settings=None,
|
||||
user=None,
|
||||
password=None,
|
||||
database=None,
|
||||
ignore_error=False,
|
||||
query_id=None):
|
||||
return self.get_query_request(sql,
|
||||
stdin=stdin,
|
||||
timeout=timeout,
|
||||
settings=settings,
|
||||
user=user,
|
||||
password=password,
|
||||
database=database,
|
||||
ignore_error=ignore_error,
|
||||
query_id=query_id).get_answer()
|
||||
|
||||
def get_query_request(self, sql, stdin=None, timeout=None, settings=None, user=None, password=None, database=None,
|
||||
ignore_error=False):
|
||||
def get_query_request(self, sql,
|
||||
stdin=None,
|
||||
timeout=None,
|
||||
settings=None,
|
||||
user=None,
|
||||
password=None,
|
||||
database=None,
|
||||
ignore_error=False,
|
||||
query_id=None):
|
||||
command = self.command[:]
|
||||
|
||||
if stdin is None:
|
||||
@ -44,6 +65,9 @@ class Client:
|
||||
if database is not None:
|
||||
command += ['--database', database]
|
||||
|
||||
if query_id is not None:
|
||||
command += ['--query_id', query_id]
|
||||
|
||||
return CommandRequest(command, stdin, timeout, ignore_error)
|
||||
|
||||
def query_and_get_error(self, sql, stdin=None, timeout=None, settings=None, user=None, password=None,
|
||||
|
@ -1921,11 +1921,25 @@ class ClickHouseInstance:
|
||||
return self.is_built_with_sanitizer('memory')
|
||||
|
||||
# Connects to the instance via clickhouse-client, sends a query (1st argument) and returns the answer
|
||||
def query(self, sql, stdin=None, timeout=None, settings=None, user=None, password=None, database=None,
|
||||
ignore_error=False):
|
||||
logging.debug(f"Executing query {sql} on {self.name}")
|
||||
return self.client.query(sql, stdin=stdin, timeout=timeout, settings=settings, user=user, password=password,
|
||||
database=database, ignore_error=ignore_error)
|
||||
def query(self, sql,
|
||||
stdin=None,
|
||||
timeout=None,
|
||||
settings=None,
|
||||
user=None,
|
||||
password=None,
|
||||
database=None,
|
||||
ignore_error=False,
|
||||
query_id=None):
|
||||
logging.debug("Executing query %s on %s", sql, self.name)
|
||||
return self.client.query(sql,
|
||||
stdin=stdin,
|
||||
timeout=timeout,
|
||||
settings=settings,
|
||||
user=user,
|
||||
password=password,
|
||||
database=database,
|
||||
ignore_error=ignore_error,
|
||||
query_id=query_id)
|
||||
|
||||
def query_with_retry(self, sql, stdin=None, timeout=None, settings=None, user=None, password=None, database=None,
|
||||
ignore_error=False,
|
||||
|
@ -20,6 +20,8 @@
|
||||
<node>
|
||||
<host>n2</host>
|
||||
<port>9000</port>
|
||||
<!-- This is required only to -> replace it -> to force cluster reload -> to recreate connections -->
|
||||
<priority>1</priority>
|
||||
</node>
|
||||
</secure>
|
||||
</remote_servers>
|
||||
|
@ -3,6 +3,10 @@
|
||||
<profiles>
|
||||
<default>
|
||||
</default>
|
||||
|
||||
<ro>
|
||||
<readonly>1</readonly>
|
||||
</ro>
|
||||
</profiles>
|
||||
|
||||
<users>
|
||||
@ -32,6 +36,15 @@
|
||||
<profile>default</profile>
|
||||
<quota>default</quota>
|
||||
</pass>
|
||||
|
||||
<ro>
|
||||
<password></password>
|
||||
<networks>
|
||||
<ip>::/0</ip>
|
||||
</networks>
|
||||
<profile>ro</profile>
|
||||
<quota>default</quota>
|
||||
</ro>
|
||||
</users>
|
||||
|
||||
<quotas>
|
||||
|
@ -3,6 +3,8 @@
|
||||
# pylint: disable=line-too-long
|
||||
|
||||
import pytest
|
||||
import uuid
|
||||
import time
|
||||
|
||||
from helpers.client import QueryRuntimeException
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
@ -27,8 +29,10 @@ users = pytest.mark.parametrize('user,password', [
|
||||
def bootstrap():
|
||||
for n in list(cluster.instances.values()):
|
||||
n.query('DROP TABLE IF EXISTS data')
|
||||
n.query('DROP TABLE IF EXISTS data_from_buffer')
|
||||
n.query('DROP TABLE IF EXISTS dist')
|
||||
n.query('CREATE TABLE data (key Int) Engine=Memory()')
|
||||
n.query('CREATE TABLE data_from_buffer (key Int) Engine=Memory()')
|
||||
n.query("""
|
||||
CREATE TABLE dist_insecure AS data
|
||||
Engine=Distributed(insecure, currentDatabase(), data, key)
|
||||
@ -38,20 +42,24 @@ def bootstrap():
|
||||
Engine=Distributed(secure, currentDatabase(), data, key)
|
||||
""")
|
||||
n.query("""
|
||||
CREATE TABLE dist_secure_from_buffer AS data_from_buffer
|
||||
Engine=Distributed(secure, currentDatabase(), data_from_buffer, key)
|
||||
""")
|
||||
n.query("""
|
||||
CREATE TABLE dist_secure_disagree AS data
|
||||
Engine=Distributed(secure_disagree, currentDatabase(), data, key)
|
||||
""")
|
||||
n.query("""
|
||||
CREATE TABLE dist_secure_buffer AS dist_secure
|
||||
Engine=Buffer(currentDatabase(), dist_secure,
|
||||
CREATE TABLE dist_secure_buffer AS dist_secure_from_buffer
|
||||
Engine=Buffer(currentDatabase(), dist_secure_from_buffer,
|
||||
/* settings for manual flush only */
|
||||
1, /* num_layers */
|
||||
10e6, /* min_time, placeholder */
|
||||
10e6, /* max_time, placeholder */
|
||||
0, /* min_rows */
|
||||
10e6, /* max_rows */
|
||||
0, /* min_bytes */
|
||||
80e6 /* max_bytes */
|
||||
1, /* num_layers */
|
||||
0, /* min_time, placeholder */
|
||||
0, /* max_time, placeholder */
|
||||
0, /* min_rows */
|
||||
0, /* max_rows */
|
||||
0, /* min_bytes */
|
||||
0 /* max_bytes */
|
||||
)
|
||||
""")
|
||||
|
||||
@ -129,17 +137,62 @@ def test_secure_insert_sync():
|
||||
#
|
||||
# Buffer() flush happens with global context, that does not have user
|
||||
# And so Context::user/ClientInfo::current_user/ClientInfo::initial_user will be empty
|
||||
#
|
||||
# This is the regression test for the subsequent query that it
|
||||
# will not use user from the previous query.
|
||||
#
|
||||
# The test a little bit complex, but I will try to explain:
|
||||
# - first, we need to execute query with the readonly user (regualar SELECT),
|
||||
# and then we will execute INSERT, and if the bug is there, then INSERT will
|
||||
# use the user from SELECT and will fail (since you cannot do INSERT with
|
||||
# readonly=1/2)
|
||||
#
|
||||
# - the trick with generating random priority (via sed) is to avoid reusing
|
||||
# connection from n1 to n2 from another test (and we cannot simply use
|
||||
# another pool after ConnectionPoolFactory had been added [1].
|
||||
#
|
||||
# [1]: https://github.com/ClickHouse/ClickHouse/pull/26318
|
||||
#
|
||||
# We need at least one change in one of fields of the node/shard definition,
|
||||
# and this "priorirty" for us in this test.
|
||||
#
|
||||
# - after we will ensure that connection is really established from the context
|
||||
# of SELECT query, and that the connection will not be established from the
|
||||
# context of the INSERT query (but actually it is a no-op since the INSERT
|
||||
# will be done in background, due to insert_distributed_sync=false by
|
||||
# default)
|
||||
#
|
||||
# - if the bug is there, then FLUSH DISTRIBUTED will fail, because it will go
|
||||
# from n1 to n2 using previous user.
|
||||
#
|
||||
# I hope that this will clarify something for the reader.
|
||||
def test_secure_insert_buffer_async():
|
||||
n1.query("TRUNCATE TABLE data")
|
||||
n1.query('INSERT INTO dist_secure_buffer SELECT * FROM numbers(2)')
|
||||
n1.query('SYSTEM FLUSH DISTRIBUTED ON CLUSTER secure dist_secure')
|
||||
# no Buffer flush happened
|
||||
assert int(n1.query('SELECT count() FROM dist_secure')) == 0
|
||||
# Change cluster definition so that the SELECT will always creates new connection
|
||||
priority = int(time.time())
|
||||
n1.exec_in_container(['bash', '-c', f'sed -i "s#<priority>.*</priority>#<priority>{priority}</priority>#" /etc/clickhouse-server/config.d/remote_servers.xml'])
|
||||
n1.query('SYSTEM RELOAD CONFIG')
|
||||
# ensure that SELECT creates new connection (we need separate table for
|
||||
# this, so that separate distributed pool will be used)
|
||||
query_id = uuid.uuid4().hex
|
||||
n1.query('SELECT * FROM dist_secure_from_buffer', user='ro', query_id=query_id)
|
||||
assert n1.contains_in_log('{' + query_id + '} <Trace> Connection (n2:9000): Connecting.')
|
||||
|
||||
query_id = uuid.uuid4().hex
|
||||
n1.query('INSERT INTO dist_secure_buffer SELECT * FROM numbers(2)', query_id=query_id)
|
||||
# ensure that INSERT does not creates new connection, so that it will use
|
||||
# previous connection that was instantiated with "ro" user (using
|
||||
# interserver secret)
|
||||
assert not n1.contains_in_log('{' + query_id + '} <Trace> Connection (n2:9000): Connecting.')
|
||||
|
||||
# And before the bug was fixed this query will fail with the following error:
|
||||
#
|
||||
# Code: 164. DB::Exception: Received from 172.16.2.5:9000. DB::Exception: There was an error on [n1:9000]: Code: 164. DB::Exception: Received from n2:9000. DB::Exception: ro: Cannot execute query in readonly mode. (READONLY)
|
||||
n1.query('SYSTEM FLUSH DISTRIBUTED ON CLUSTER secure dist_secure_from_buffer')
|
||||
n1.query('OPTIMIZE TABLE dist_secure_buffer')
|
||||
# manual flush
|
||||
n1.query('SYSTEM FLUSH DISTRIBUTED ON CLUSTER secure dist_secure')
|
||||
assert int(n1.query('SELECT count() FROM dist_secure')) == 2
|
||||
n1.query('TRUNCATE TABLE data ON CLUSTER secure')
|
||||
n1.query('SYSTEM FLUSH DISTRIBUTED ON CLUSTER secure dist_secure_from_buffer')
|
||||
|
||||
assert int(n1.query('SELECT count() FROM dist_secure_from_buffer')) == 2
|
||||
n1.query('TRUNCATE TABLE data_from_buffer ON CLUSTER secure')
|
||||
|
||||
def test_secure_disagree():
|
||||
with pytest.raises(QueryRuntimeException, match='.*Hash mismatch.*'):
|
||||
@ -209,5 +262,3 @@ def test_per_user_protocol_settings_secure_cluster(user, password):
|
||||
'max_untracked_memory': 0,
|
||||
})
|
||||
assert int(get_query_setting_on_shard(n1, id_, 'max_memory_usage_for_user')) == int(1e9)
|
||||
|
||||
# TODO: check user for INSERT
|
||||
|
Loading…
Reference in New Issue
Block a user