# pylint: disable=unused-argument # pylint: disable=redefined-outer-name # pylint: disable=line-too-long # NOTES: # - timeout should not be reduced due to bit flip of the corrupted buffer import pytest from helpers.cluster import ClickHouseCluster from helpers.client import QueryRuntimeException cluster = ClickHouseCluster(__file__) # n1 -- distributed_directory_monitor_batch_inserts=1 n1 = cluster.add_instance('n1', main_configs=['configs/remote_servers.xml'], user_configs=['configs/users.d/batch.xml']) # n2 -- distributed_directory_monitor_batch_inserts=0 n2 = cluster.add_instance('n2', main_configs=['configs/remote_servers.xml'], user_configs=['configs/users.d/no_batch.xml']) # n3 -- distributed_directory_monitor_batch_inserts=1/distributed_directory_monitor_split_batch_on_failure=1 n3 = cluster.add_instance('n3', main_configs=['configs/remote_servers_split.xml'], user_configs=[ 'configs/users.d/batch.xml', 'configs/users.d/split.xml', ]) # n4 -- distributed_directory_monitor_batch_inserts=0/distributed_directory_monitor_split_batch_on_failure=1 n4 = cluster.add_instance('n4', main_configs=['configs/remote_servers_split.xml'], user_configs=[ 'configs/users.d/no_batch.xml', 'configs/users.d/split.xml', ]) batch_params = pytest.mark.parametrize('batch', [ (1), (0), ]) batch_and_split_params = pytest.mark.parametrize('batch,split', [ (1, 0), (0, 0), (1, 1), (0, 1), ]) @pytest.fixture(scope='module', autouse=True) def start_cluster(): try: cluster.start() yield cluster finally: cluster.shutdown() def create_tables(remote_cluster_name): for _, instance in list(cluster.instances.items()): instance.query('CREATE TABLE data (key Int, value String) Engine=MergeTree() ORDER BY key') instance.query(f""" CREATE TABLE dist AS data Engine=Distributed( {remote_cluster_name}, currentDatabase(), data, key ) """) # only via SYSTEM FLUSH DISTRIBUTED instance.query('SYSTEM STOP DISTRIBUTED SENDS dist') def drop_tables(): for _, instance in list(cluster.instances.items()): instance.query('DROP TABLE IF EXISTS data') instance.query('DROP TABLE IF EXISTS dist') # return amount of bytes of the 2.bin for n2 shard def insert_data(node): node.query('INSERT INTO dist SELECT number, randomPrintableASCII(100) FROM numbers(10000)', settings={ # do not do direct INSERT, always via SYSTEM FLUSH DISTRIBUTED 'prefer_localhost_replica': 0, }) path = get_path_to_dist_batch() size = int(node.exec_in_container(['bash', '-c', f'wc -c < {path}'])) assert size > 1<<16 return size def get_node(batch, split=None): if split: if batch: return n3 return n4 if batch: return n1 return n2 def bootstrap(batch, split=None): drop_tables() create_tables('insert_distributed_async_send_cluster_two_replicas') return insert_data(get_node(batch, split)) def get_path_to_dist_batch(file='2.bin'): # There are: # - /var/lib/clickhouse/data/default/dist/shard1_replica1/1.bin # - /var/lib/clickhouse/data/default/dist/shard1_replica2/2.bin # # @return the file for the n2 shard return f'/var/lib/clickhouse/data/default/dist/shard1_replica2/{file}' def check_dist_after_corruption(truncate, batch, split=None): node = get_node(batch, split) if batch: # In batch mode errors are ignored node.query('SYSTEM FLUSH DISTRIBUTED dist') else: if truncate: with pytest.raises(QueryRuntimeException, match="Cannot read all data. Bytes read:"): node.query('SYSTEM FLUSH DISTRIBUTED dist') else: with pytest.raises(QueryRuntimeException, match="Checksum doesn't match: corrupted data. Reference:"): node.query('SYSTEM FLUSH DISTRIBUTED dist') # send pending files # (since we have two nodes and corrupt file for only one of them) node.query('SYSTEM FLUSH DISTRIBUTED dist') # but there is broken file broken = get_path_to_dist_batch('broken') node.exec_in_container(['bash', '-c', f'ls {broken}/2.bin']) if split: assert int(n3.query('SELECT count() FROM data')) == 10000 assert int(n4.query('SELECT count() FROM data')) == 0 else: assert int(n1.query('SELECT count() FROM data')) == 10000 assert int(n2.query('SELECT count() FROM data')) == 0 @batch_params def test_insert_distributed_async_send_success(batch): bootstrap(batch) node = get_node(batch) node.query('SYSTEM FLUSH DISTRIBUTED dist') assert int(n1.query('SELECT count() FROM data')) == 10000 assert int(n2.query('SELECT count() FROM data')) == 10000 @batch_and_split_params def test_insert_distributed_async_send_truncated_1(batch, split): size = bootstrap(batch, split) path = get_path_to_dist_batch() node = get_node(batch, split) new_size = size - 10 # we cannot use truncate, due to hardlinks node.exec_in_container(['bash', '-c', f'mv {path} /tmp/bin && head -c {new_size} /tmp/bin > {path}']) check_dist_after_corruption(True, batch, split) @batch_params def test_insert_distributed_async_send_truncated_2(batch): bootstrap(batch) path = get_path_to_dist_batch() node = get_node(batch) # we cannot use truncate, due to hardlinks node.exec_in_container(['bash', '-c', f'mv {path} /tmp/bin && head -c 10000 /tmp/bin > {path}']) check_dist_after_corruption(True, batch) # The difference from the test_insert_distributed_async_send_corrupted_small # is that small corruption will be seen only on local node @batch_params def test_insert_distributed_async_send_corrupted_big(batch): size = bootstrap(batch) path = get_path_to_dist_batch() node = get_node(batch) from_original_size = size - 8192 zeros_size = 8192 node.exec_in_container(['bash', '-c', f'mv {path} /tmp/bin && head -c {from_original_size} /tmp/bin > {path} && head -c {zeros_size} /dev/zero >> {path}']) check_dist_after_corruption(False, batch) @batch_params def test_insert_distributed_async_send_corrupted_small(batch): size = bootstrap(batch) path = get_path_to_dist_batch() node = get_node(batch) from_original_size = size - 60 zeros_size = 60 node.exec_in_container(['bash', '-c', f'mv {path} /tmp/bin && head -c {from_original_size} /tmp/bin > {path} && head -c {zeros_size} /dev/zero >> {path}']) check_dist_after_corruption(False, batch) @batch_params def test_insert_distributed_async_send_different_header(batch): """ Check INSERT Into Distributed() with different headers in *.bin If batching will not distinguish headers underlying table will never receive the data. """ drop_tables() create_tables('insert_distributed_async_send_cluster_two_shards') node = get_node(batch) node.query("INSERT INTO dist VALUES (0, 'f')", settings={ 'prefer_localhost_replica': 0, }) node.query('ALTER TABLE dist MODIFY COLUMN value UInt64') node.query("INSERT INTO dist VALUES (2, 1)", settings={ 'prefer_localhost_replica': 0, }) n1.query('ALTER TABLE data MODIFY COLUMN value UInt64', settings={ 'mutations_sync': 1, }) if batch: # but only one batch will be sent, and first is with UInt64 column, so # one rows inserted, and for string ('f') exception will be throw. with pytest.raises(QueryRuntimeException, match=r"DB::Exception: Cannot parse string 'f' as UInt64: syntax error at begin of string"): node.query('SYSTEM FLUSH DISTRIBUTED dist') assert int(n1.query('SELECT count() FROM data')) == 1 # but once underlying column String, implicit conversion will do the # thing, and insert left batch. n1.query(""" DROP TABLE data SYNC; CREATE TABLE data (key Int, value String) Engine=MergeTree() ORDER BY key; """) node.query('SYSTEM FLUSH DISTRIBUTED dist') assert int(n1.query('SELECT count() FROM data')) == 1 else: # first send with String ('f'), so zero rows will be inserted with pytest.raises(QueryRuntimeException, match=r"DB::Exception: Cannot parse string 'f' as UInt64: syntax error at begin of string"): node.query('SYSTEM FLUSH DISTRIBUTED dist') assert int(n1.query('SELECT count() FROM data')) == 0 # but once underlying column String, implicit conversion will do the # thing, and insert 2 rows (mixed UInt64 and String). n1.query(""" DROP TABLE data SYNC; CREATE TABLE data (key Int, value String) Engine=MergeTree() ORDER BY key; """) node.query('SYSTEM FLUSH DISTRIBUTED dist') assert int(n1.query('SELECT count() FROM data')) == 2 assert int(n2.query('SELECT count() FROM data')) == 0