# This test is a subset of the 01223_dist_on_dist. # (just in case, with real separate instances). import pytest from helpers.cluster import ClickHouseCluster cluster = ClickHouseCluster(__file__) NODES = { "node" + str(i): cluster.add_instance( "node" + str(i), main_configs=["configs/remote_servers.xml"], user_configs=["configs/set_distributed_defaults.xml"], ) for i in (1, 2) } CREATE_TABLES_SQL = """ CREATE TABLE base_table( node String, key Int32, value Int32 ) ENGINE = Memory; CREATE TABLE distributed_table AS base_table ENGINE = Distributed(test_cluster, default, base_table); CREATE TABLE distributed_over_distributed_table AS distributed_table ENGINE = Distributed('test_cluster', default, distributed_table); """ INSERT_SQL_TEMPLATE = "INSERT INTO base_table VALUES ('{node_id}', {key}, {value})" @pytest.fixture(scope="session") def started_cluster(): try: cluster.start() for node_index, (node_name, node) in enumerate(NODES.items()): node.query(CREATE_TABLES_SQL) for i in range(0, 2): node.query( INSERT_SQL_TEMPLATE.format( node_id=node_name, key=i, value=i + (node_index * 10) ) ) yield cluster finally: cluster.shutdown() @pytest.mark.parametrize( "node,source", [ pytest.param( NODES["node1"], "distributed_over_distributed_table", id="dod_node1" ), pytest.param( NODES["node1"], "cluster('test_cluster', default, distributed_table)", id="cluster_node1", ), pytest.param( NODES["node2"], "distributed_over_distributed_table", id="dod_node2" ), pytest.param( NODES["node2"], "cluster('test_cluster', default, distributed_table)", id="cluster_node2", ), ], ) class TestDistributedOverDistributedSuite: def test_select_with_order_by_node(self, started_cluster, node, source): assert ( node.query( "SELECT * FROM {source} ORDER BY node, key".format(source=source) ) == """node1 0 0 node1 0 0 node1 1 1 node1 1 1 node2 0 10 node2 0 10 node2 1 11 node2 1 11 """ ) def test_select_with_order_by_key(self, started_cluster, node, source): assert ( node.query( "SELECT * FROM {source} ORDER BY key, node".format(source=source) ) == """node1 0 0 node1 0 0 node2 0 10 node2 0 10 node1 1 1 node1 1 1 node2 1 11 node2 1 11 """ ) def test_select_with_group_by_node(self, started_cluster, node, source): assert ( node.query( "SELECT node, SUM(value) FROM {source} GROUP BY node ORDER BY node".format( source=source ) ) == "node1 2\nnode2 42\n" ) def test_select_with_group_by_key(self, started_cluster, node, source): assert ( node.query( "SELECT key, SUM(value) FROM {source} GROUP BY key ORDER BY key".format( source=source ) ) == "0 20\n1 24\n" ) def test_select_sum(self, started_cluster, node, source): assert ( node.query("SELECT SUM(value) FROM {source}".format(source=source)) == "44\n" )