Merge pull request #32694 from ClickHouse/add-test_s3_zero_copy_concurrent_merge
Add test_s3_zero_copy_concurrent_merge
commit a02da27c6f
@@ -46,6 +46,20 @@ def wait_for_large_objects_count(cluster, expected, size=100, timeout=30):
     assert get_large_objects_count(cluster, size=size) == expected
 
 
+def wait_for_active_parts(node, num_expected_parts, table_name, timeout=30):
+    deadline = time.monotonic() + timeout
+    num_parts = 0
+    while time.monotonic() < deadline:
+        num_parts_str = node.query("select count() from system.parts where table = '{}' and active".format(table_name))
+        num_parts = int(num_parts_str.strip())
+        if num_parts == num_expected_parts:
+            return
+
+        time.sleep(0.2)
+
+    assert num_parts == num_expected_parts
+
+
 @pytest.mark.parametrize(
     "policy", ["s3"]
 )
@@ -248,3 +262,50 @@ def test_s3_zero_copy_with_ttl_delete(cluster, large_data, iterations):
 
     node1.query("DROP TABLE IF EXISTS ttl_delete_test NO DELAY")
     node2.query("DROP TABLE IF EXISTS ttl_delete_test NO DELAY")
+
+
+def test_s3_zero_copy_concurrent_merge(cluster):
+    node1 = cluster.instances["node1"]
+    node2 = cluster.instances["node2"]
+
+    node1.query("DROP TABLE IF EXISTS concurrent_merge NO DELAY")
+    node2.query("DROP TABLE IF EXISTS concurrent_merge NO DELAY")
+
+    for node in (node1, node2):
+        node.query(
+            """
+            CREATE TABLE concurrent_merge (id UInt64)
+            ENGINE=ReplicatedMergeTree('/clickhouse/tables/concurrent_merge', '{replica}')
+            ORDER BY id
+            SETTINGS index_granularity=2, storage_policy='s3', remote_fs_execute_merges_on_single_replica_time_threshold=1
+            """
+        )
+
+    node1.query("system stop merges")
+    node2.query("system stop merges")
+
+    # This will generate two parts with 20 granules each
+    node1.query("insert into concurrent_merge select number from numbers(40)")
+    node1.query("insert into concurrent_merge select number + 1 from numbers(40)")
+
+    wait_for_active_parts(node2, 2, 'concurrent_merge')
+
+    # The merge will materialize the default column; it should sleep on every granule and take 20 * 2 * 0.1 = 4 sec.
+    node1.query("alter table concurrent_merge add column x UInt32 default sleep(0.1)")
+
+    node1.query("system start merges")
+    node2.query("system start merges")
+
+    # Now the merge should start.
+    # Because of remote_fs_execute_merges_on_single_replica_time_threshold=1,
+    # only one replica will start the merge instantly.
+    # The other replica should wait for 1 sec and then start it as well.
+    # That could cause a data race in S3 storage.
+    # For now, it does not happen (every blob has a random name, so we just end up with duplicated data).
+    node1.query("optimize table concurrent_merge final")
+
+    wait_for_active_parts(node1, 1, 'concurrent_merge')
+    wait_for_active_parts(node2, 1, 'concurrent_merge')
+
+    for node in (node1, node2):
+        assert node.query('select sum(id) from concurrent_merge').strip() == '1600'
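For reference, a minimal standalone sketch of the deadline-polling pattern that the new wait_for_active_parts helper follows. The generic wait_until helper and the commented usage below are illustrative only and are not part of this commit:

import time

def wait_until(predicate, timeout=30, poll_interval=0.2):
    # Poll `predicate` until it returns True or `timeout` seconds elapse,
    # mirroring the time.monotonic() deadline loop in wait_for_active_parts.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(poll_interval)
    # One final check after the deadline, matching the helper's final assert.
    return predicate()

# Hypothetical usage with a test node object like the ones above:
# assert wait_until(lambda: int(node.query(
#     "select count() from system.parts where table = 'concurrent_merge' and active"
# ).strip()) == 1)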