ClickHouse/tests/integration/test_distributed_inter_server_secret/test.py
Azat Khuzhin 0159c74f21 Secure inter-cluster query execution (with initial_user as current query user) [v3]
Add inter-server cluster secret, it is used for Distributed queries
inside cluster, you can configure in the configuration file:

  <remote_servers>
      <logs>
          <shard>
              <secret>foobar</secret> <!-- empty -- works as before -->
              ...
          </shard>
      </logs>
  </remote_servers>

And this will allow clickhouse to make sure that the query was not
faked, and was issued from the node that knows the secret. And since
trust appeared it can use initial_user for query execution, this will
apply correct *_for_user (since with inter-server secret enabled, the
query will be executed from the same user on the shards as on initator,
unlike "default" user w/o it).

v2: Change user to the initial_user for Distributed queries if secret match
v3: Add Protocol::Cluster package
v4: Drop Protocol::Cluster and use plain Protocol::Hello + user marker
v5: Do not use user from Hello for cluster-secure (superfluous)
2020-09-15 01:36:28 +03:00

153 lines
5.7 KiB
Python

# pylint: disable=unused-argument
# pylint: disable=redefined-outer-name
# pylint: disable=line-too-long
import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
def make_instance(name, cfg):
return cluster.add_instance(name,
with_zookeeper=True,
main_configs=['configs/remote_servers.xml', cfg],
user_configs=['configs/users.xml'])
# _n1/_n2 contains cluster with different <secret> -- should fail
n1 = make_instance('n1', 'configs/remote_servers_n1.xml')
n2 = make_instance('n2', 'configs/remote_servers_n2.xml')
users = pytest.mark.parametrize('user,password', [
('default', '' ),
('nopass', '' ),
('pass', 'foo'),
])
def bootstrap():
for n in cluster.instances.values():
n.query('DROP TABLE IF EXISTS data')
n.query('DROP TABLE IF EXISTS dist')
n.query('CREATE TABLE data (key Int) Engine=Memory()')
n.query("""
CREATE TABLE dist_insecure AS data
Engine=Distributed(insecure, currentDatabase(), data, key)
""")
n.query("""
CREATE TABLE dist_secure AS data
Engine=Distributed(secure, currentDatabase(), data, key)
""")
n.query("""
CREATE TABLE dist_secure_disagree AS data
Engine=Distributed(secure_disagree, currentDatabase(), data, key)
""")
n.query("""
CREATE TABLE dist_secure_buffer AS dist_secure
Engine=Buffer(currentDatabase(), dist_secure,
/* settings for manual flush only */
1, /* num_layers */
10e6, /* min_time, placeholder */
10e6, /* max_time, placeholder */
0, /* min_rows */
10e6, /* max_rows */
0, /* min_bytes */
80e6 /* max_bytes */
)
""")
@pytest.fixture(scope='module', autouse=True)
def start_cluster():
try:
cluster.start()
bootstrap()
yield cluster
finally:
cluster.shutdown()
def query_with_id(node, id_, query, **kwargs):
return node.query("WITH '{}' AS __id {}".format(id_, query), **kwargs)
# @return -- [user, initial_user]
def get_query_user_info(node, query_pattern):
node.query("SYSTEM FLUSH LOGS")
return node.query("""
SELECT user, initial_user
FROM system.query_log
WHERE
query LIKE '%{}%' AND
query NOT LIKE '%system.query_log%' AND
type = 'QueryFinish'
""".format(query_pattern)).strip().split('\t')
def test_insecure():
n1.query('SELECT * FROM dist_insecure')
def test_insecure_insert_async():
n1.query('INSERT INTO dist_insecure SELECT * FROM numbers(2)')
n1.query('SYSTEM FLUSH DISTRIBUTED ON CLUSTER insecure dist_insecure')
assert int(n1.query('SELECT count() FROM dist_insecure')) == 2
n1.query('TRUNCATE TABLE data ON CLUSTER insecure')
def test_insecure_insert_sync():
n1.query('INSERT INTO dist_insecure SELECT * FROM numbers(2)', settings={'insert_distributed_sync': 1})
assert int(n1.query('SELECT count() FROM dist_insecure')) == 2
n1.query('TRUNCATE TABLE data ON CLUSTER secure')
def test_secure():
n1.query('SELECT * FROM dist_secure')
def test_secure_insert_async():
n1.query('INSERT INTO dist_secure SELECT * FROM numbers(2)')
n1.query('SYSTEM FLUSH DISTRIBUTED ON CLUSTER secure dist_secure')
assert int(n1.query('SELECT count() FROM dist_secure')) == 2
n1.query('TRUNCATE TABLE data ON CLUSTER secure')
def test_secure_insert_sync():
n1.query('INSERT INTO dist_secure SELECT * FROM numbers(2)', settings={'insert_distributed_sync': 1})
assert int(n1.query('SELECT count() FROM dist_secure')) == 2
n1.query('TRUNCATE TABLE data ON CLUSTER secure')
# INSERT w/o initial_user
#
# Buffer() flush happens with global context, that does not have user
# And so Context::user/ClientInfo::current_user/ClientInfo::initial_user will be empty
def test_secure_insert_buffer_async():
n1.query('INSERT INTO dist_secure_buffer SELECT * FROM numbers(2)')
n1.query('SYSTEM FLUSH DISTRIBUTED ON CLUSTER secure dist_secure')
# no Buffer flush happened
assert int(n1.query('SELECT count() FROM dist_secure')) == 0
n1.query('OPTIMIZE TABLE dist_secure_buffer')
# manual flush
n1.query('SYSTEM FLUSH DISTRIBUTED ON CLUSTER secure dist_secure')
assert int(n1.query('SELECT count() FROM dist_secure')) == 2
n1.query('TRUNCATE TABLE data ON CLUSTER secure')
def test_secure_disagree():
with pytest.raises(QueryRuntimeException, match='.*Hash mismatch.*'):
n1.query('SELECT * FROM dist_secure_disagree')
def test_secure_disagree_insert():
n1.query('INSERT INTO dist_secure_disagree SELECT * FROM numbers(2)')
with pytest.raises(QueryRuntimeException, match='.*Hash mismatch.*'):
n1.query('SYSTEM FLUSH DISTRIBUTED ON CLUSTER secure_disagree dist_secure_disagree')
# check the the connection will be re-established
# IOW that we will not get "Unknown BlockInfo field"
with pytest.raises(QueryRuntimeException, match='.*Hash mismatch.*'):
assert int(n1.query('SELECT count() FROM dist_secure_disagree')) == 0
@users
def test_user_insecure_cluster(user, password):
id_ = 'query-dist_insecure-' + user
query_with_id(n1, id_, 'SELECT * FROM dist_insecure', user=user, password=password)
assert get_query_user_info(n1, id_) == [user, user] # due to prefer_localhost_replica
assert get_query_user_info(n2, id_) == ['default', user]
@users
def test_user_secure_cluster(user, password):
id_ = 'query-dist_secure-' + user
query_with_id(n1, id_, 'SELECT * FROM dist_secure', user=user, password=password)
assert get_query_user_info(n1, id_) == [user, user]
assert get_query_user_info(n2, id_) == [user, user]
# TODO: check user for INSERT