# pylint: disable=unused-argument
# pylint: disable=redefined-outer-name
# pylint: disable=line-too-long

# NOTES:
# - timeout should not be reduced due to bit flip of the corrupted buffer

import pytest

from helpers.cluster import ClickHouseCluster
from helpers.client import QueryRuntimeException

cluster = ClickHouseCluster(__file__)

# n1 -- distributed_directory_monitor_batch_inserts=1
n1 = cluster.add_instance('n1',
                          main_configs=['configs/remote_servers.xml'],
                          user_configs=['configs/users.d/batch.xml'])
# n2 -- distributed_directory_monitor_batch_inserts=0
n2 = cluster.add_instance('n2',
                          main_configs=['configs/remote_servers.xml'],
                          user_configs=['configs/users.d/no_batch.xml'])

# Runs every test twice: batch=1 exercises n1 (batched sends),
# batch=0 exercises n2 (per-file sends).
batch_params = pytest.mark.parametrize('batch', [
    1,
    0,
])


@pytest.fixture(scope='module', autouse=True)
def start_cluster():
    """Bring the cluster up once for the whole module; always shut it down."""
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()


def create_tables(remote_cluster_name):
    """Create the MergeTree `data` table and the `dist` Distributed table on all nodes.

    Distributed sends are stopped immediately, so pending *.bin files
    accumulate on disk and are shipped only via SYSTEM FLUSH DISTRIBUTED —
    this is what lets the tests corrupt them before delivery.
    """
    for instance in cluster.instances.values():
        instance.query('CREATE TABLE data (key Int, value String) Engine=MergeTree() ORDER BY key')
        instance.query(f"""
        CREATE TABLE dist AS data
        Engine=Distributed(
            {remote_cluster_name},
            currentDatabase(),
            data,
            key
        )
        """)
        # only via SYSTEM FLUSH DISTRIBUTED
        instance.query('SYSTEM STOP DISTRIBUTED SENDS dist')


def drop_tables():
    """Drop both test tables on every node (safe to call when absent)."""
    for instance in cluster.instances.values():
        instance.query('DROP TABLE IF EXISTS data')
        instance.query('DROP TABLE IF EXISTS dist')


# return amount of bytes of the 2.bin for n2 shard
def insert_data(node):
    """Insert 10k rows into `dist` (pending on disk) and return the size of 2.bin."""
    node.query('INSERT INTO dist SELECT number, randomPrintableASCII(100) FROM numbers(10000)', settings={
        # do not do direct INSERT, always via SYSTEM FLUSH DISTRIBUTED
        'prefer_localhost_replica': 0,
    })
    path = get_path_to_dist_batch()
    size = int(node.exec_in_container(['bash', '-c', f'wc -c < {path}']))
    # sanity check: the pending file must be big enough to corrupt meaningfully
    assert size > 1 << 16
    return size


def get_node(batch):
    """Return the instance that matches the requested batching mode."""
    return n1 if batch else n2


def bootstrap(batch):
    """Recreate the tables and stage pending data; return the pending file size."""
    drop_tables()
    create_tables('insert_distributed_async_send_cluster_two_replicas')
    return insert_data(get_node(batch))


def get_path_to_dist_batch(file='2.bin'):
    # There are:
    # - /var/lib/clickhouse/data/default/dist/shard1_replica1/1.bin
    # - /var/lib/clickhouse/data/default/dist/shard1_replica2/2.bin
    #
    # @return the file for the n2 shard
    return f'/var/lib/clickhouse/data/default/dist/shard1_replica2/{file}'


def check_dist_after_corruption(truncate, batch):
    """Flush distributed sends after corruption and verify the outcome.

    :param truncate: True when the file was truncated (read error expected),
                     False when bytes were overwritten (checksum error expected).
    :param batch: batching mode under test; in batch mode errors are ignored.
    """
    node = get_node(batch)

    if batch:
        # In batch mode errors are ignored
        node.query('SYSTEM FLUSH DISTRIBUTED dist')
    else:
        if truncate:
            with pytest.raises(QueryRuntimeException, match="Cannot read all data. Bytes read:"):
                node.query('SYSTEM FLUSH DISTRIBUTED dist')
        else:
            with pytest.raises(QueryRuntimeException, match="Checksum doesn't match: corrupted data. Reference:"):
                node.query('SYSTEM FLUSH DISTRIBUTED dist')

    # send pending files
    # (since we have two nodes and corrupt file for only one of them)
    node.query('SYSTEM FLUSH DISTRIBUTED dist')

    # but there is broken file
    broken = get_path_to_dist_batch('broken')
    node.exec_in_container(['bash', '-c', f'ls {broken}/2.bin'])

    # only the intact replica (n1 shard) received data
    assert int(n1.query('SELECT count() FROM data')) == 10000
    assert int(n2.query('SELECT count() FROM data')) == 0


@batch_params
def test_insert_distributed_async_send_success(batch):
    """Uncorrupted pending files are delivered to both replicas."""
    bootstrap(batch)
    node = get_node(batch)
    node.query('SYSTEM FLUSH DISTRIBUTED dist')
    assert int(n1.query('SELECT count() FROM data')) == 10000
    assert int(n2.query('SELECT count() FROM data')) == 10000


@batch_params
def test_insert_distributed_async_send_truncated_1(batch):
    """Truncating the tail of the pending file must be detected as a read error."""
    size = bootstrap(batch)
    path = get_path_to_dist_batch()
    node = get_node(batch)

    new_size = size - 10
    # we cannot use truncate, due to hardlinks
    node.exec_in_container(['bash', '-c', f'mv {path} /tmp/bin && head -c {new_size} /tmp/bin > {path}'])

    check_dist_after_corruption(True, batch)


@batch_params
def test_insert_distributed_async_send_truncated_2(batch):
    """Truncating the file down to 10000 bytes must be detected as a read error."""
    bootstrap(batch)
    path = get_path_to_dist_batch()
    node = get_node(batch)

    # we cannot use truncate, due to hardlinks
    node.exec_in_container(['bash', '-c', f'mv {path} /tmp/bin && head -c 10000 /tmp/bin > {path}'])

    check_dist_after_corruption(True, batch)


# The difference from the test_insert_distributed_async_send_corrupted_small
# is that small corruption will be seen only on local node
@batch_params
def test_insert_distributed_async_send_corrupted_big(batch):
    """Zeroing the last 8 KiB of the pending file must trip the checksum check."""
    size = bootstrap(batch)
    path = get_path_to_dist_batch()
    node = get_node(batch)

    from_original_size = size - 8192
    zeros_size = 8192
    node.exec_in_container(['bash', '-c', f'mv {path} /tmp/bin && head -c {from_original_size} /tmp/bin > {path} && head -c {zeros_size} /dev/zero >> {path}'])

    check_dist_after_corruption(False, batch)


@batch_params
def test_insert_distributed_async_send_corrupted_small(batch):
    """Zeroing the last 60 bytes of the pending file must trip the checksum check."""
    size = bootstrap(batch)
    path = get_path_to_dist_batch()
    node = get_node(batch)

    from_original_size = size - 60
    zeros_size = 60
    node.exec_in_container(['bash', '-c', f'mv {path} /tmp/bin && head -c {from_original_size} /tmp/bin > {path} && head -c {zeros_size} /dev/zero >> {path}'])

    check_dist_after_corruption(False, batch)


@batch_params
def test_insert_distributed_async_send_different_header(batch):
    """
    Check INSERT Into Distributed() with different headers in *.bin
    If batching will not distinguish headers underlying table will never receive
    the data.
    """

    drop_tables()
    create_tables('insert_distributed_async_send_cluster_two_shards')

    node = get_node(batch)
    node.query("INSERT INTO dist VALUES (0, '')", settings={
        'prefer_localhost_replica': 0,
    })
    node.query('ALTER TABLE dist MODIFY COLUMN value Nullable(String)')
    node.query("INSERT INTO dist VALUES (2, '')", settings={
        'prefer_localhost_replica': 0,
    })

    if batch:
        # first batch with Nullable(String)
        n1.query('ALTER TABLE data MODIFY COLUMN value Nullable(String)', settings={
            'mutations_sync': 1,
        })
        # but only one batch will be sent
        with pytest.raises(QueryRuntimeException, match=r"DB::Exception: Cannot convert: String to Nullable\(String\)\. Stack trace:"):
            node.query('SYSTEM FLUSH DISTRIBUTED dist')
        assert int(n1.query('SELECT count() FROM data')) == 1
        # second batch with String
        n1.query('ALTER TABLE data MODIFY COLUMN value String', settings={
            'mutations_sync': 1,
        })
        node.query('SYSTEM FLUSH DISTRIBUTED dist')
        assert int(n1.query('SELECT count() FROM data')) == 2
    else:
        # first send with String
        with pytest.raises(QueryRuntimeException, match=r"DB::Exception: Cannot convert: Nullable\(String\) to String\. Stack trace:"):
            node.query('SYSTEM FLUSH DISTRIBUTED dist')
        assert int(n1.query('SELECT count() FROM data')) == 1
        # second send with Nullable(String)
        n1.query('ALTER TABLE data MODIFY COLUMN value Nullable(String)', settings={
            'mutations_sync': 1,
        })
        node.query('SYSTEM FLUSH DISTRIBUTED dist')
        assert int(n1.query('SELECT count() FROM data')) == 2

    assert int(n2.query('SELECT count() FROM data')) == 0