Merge pull request #34263 from azat/interserver-mode-fix

Fix current_user/current_address for interserver mode
This commit is contained in:
alexey-milovidov 2022-02-03 05:28:56 +03:00 committed by GitHub
commit 14811a357e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 30 additions and 0 deletions

View File

@ -1392,6 +1392,12 @@ void TCPHandler::receiveQuery()
if (is_interserver_mode)
{
ClientInfo original_session_client_info = session->getClientInfo();
/// Cleanup fields that should not be reused from previous query.
original_session_client_info.current_user.clear();
original_session_client_info.current_query_id.clear();
original_session_client_info.current_address = {};
session = std::make_unique<Session>(server.context(), ClientInfo::Interface::TCP_INTERSERVER);
session->getClientInfo() = original_session_client_info;
}

View File

@ -87,6 +87,17 @@ def get_query_user_info(node, query_pattern):
type = 'QueryFinish'
""".format(query_pattern)).strip().split('\t')
# @return -- [user, initial_user]
def get_query_user_info_by_id(node, query_id):
node.query("SYSTEM FLUSH LOGS")
return node.query("""
SELECT user, initial_user
FROM system.query_log
WHERE
query_id = '{}' AND
type = 'QueryFinish'
""".format(query_id)).strip().split('\t')
# @return -- settings
def get_query_setting_on_shard(node, query_pattern, setting):
node.query("SYSTEM FLUSH LOGS")
@ -183,6 +194,7 @@ def test_secure_insert_buffer_async():
# previous connection that was instantiated with "ro" user (using
# interserver secret)
assert not n1.contains_in_log('{' + query_id + '} <Trace> Connection (n2:9000): Connecting.')
assert get_query_user_info_by_id(n1, query_id) == ['default', 'default']
# And before the bug was fixed this query will fail with the following error:
#
@ -191,6 +203,18 @@ def test_secure_insert_buffer_async():
n1.query('OPTIMIZE TABLE dist_secure_buffer')
n1.query('SYSTEM FLUSH DISTRIBUTED ON CLUSTER secure dist_secure_from_buffer')
# Check user from which the INSERT on the remote node will be executed
#
# Incorrect example:
#
# {2c55669f-71ad-48fe-98fa-7b475b80718e} <Debug> executeQuery: (from 172.16.1.1:44636, user: ro) INSERT INTO default.data_from_buffer (key) VALUES
#
# Correct example:
#
# {2c55669f-71ad-48fe-98fa-7b475b80718e} <Debug> executeQuery: (from 0.0.0.0:0, user: ) INSERT INTO default.data_from_buffer (key) VALUES
#
assert n2.contains_in_log('executeQuery: (from 0.0.0.0:0, user: ) INSERT INTO default.data_from_buffer (key) VALUES')
assert int(n1.query('SELECT count() FROM dist_secure_from_buffer')) == 2
n1.query('TRUNCATE TABLE data_from_buffer ON CLUSTER secure')