mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-15 19:02:04 +00:00
237 lines
7.5 KiB
Python
237 lines
7.5 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import logging
|
|
import random
|
|
import string
|
|
import time
|
|
|
|
from multiprocessing.dummy import Pool
|
|
import pytest
|
|
from helpers.cluster import ClickHouseCluster
|
|
|
|
|
|
cluster = ClickHouseCluster(__file__)
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def started_cluster():
|
|
try:
|
|
cluster.add_instance(
|
|
"node1",
|
|
main_configs=["configs/storage_conf.xml"],
|
|
user_configs=["configs/users.xml"],
|
|
with_minio=True,
|
|
with_zookeeper=True,
|
|
)
|
|
cluster.add_instance(
|
|
"node2",
|
|
main_configs=["configs/storage_conf.xml"],
|
|
user_configs=["configs/users.xml"],
|
|
with_minio=True,
|
|
with_zookeeper=True,
|
|
)
|
|
cluster.start()
|
|
|
|
yield cluster
|
|
finally:
|
|
cluster.shutdown()
|
|
|
|
|
|
def test_fetch_correct_volume(started_cluster):
|
|
node1 = cluster.instances["node1"]
|
|
node2 = cluster.instances["node2"]
|
|
|
|
node1.query(
|
|
"""
|
|
CREATE TABLE test1 (EventDate Date, CounterID UInt32)
|
|
ENGINE = ReplicatedMergeTree('/clickhouse-tables/test1', 'r1')
|
|
PARTITION BY toMonday(EventDate)
|
|
ORDER BY (CounterID, EventDate)
|
|
SETTINGS index_granularity = 8192, storage_policy = 's3', temporary_directories_lifetime=1"""
|
|
)
|
|
|
|
node1.query(
|
|
"INSERT INTO test1 SELECT toDate('2023-01-01') + toIntervalDay(number), number + 1000 from system.numbers limit 20"
|
|
)
|
|
|
|
def get_part_to_disk(query_result):
|
|
part_to_disk = {}
|
|
for row in query_result.strip().split("\n"):
|
|
print(row)
|
|
disk, part = row.split("\t")
|
|
part_to_disk[part] = disk
|
|
return part_to_disk
|
|
|
|
part_to_disk = get_part_to_disk(
|
|
node1.query(
|
|
"SELECT disk_name, name FROM system.parts where table = 'test1' and active"
|
|
)
|
|
)
|
|
for disk in part_to_disk.values():
|
|
assert disk == "default"
|
|
|
|
node1.query("ALTER TABLE test1 MOVE PARTITION '2022-12-26' TO DISK 's3'")
|
|
node1.query("ALTER TABLE test1 MOVE PARTITION '2023-01-02' TO DISK 's3'")
|
|
node1.query("ALTER TABLE test1 MOVE PARTITION '2023-01-09' TO DISK 's3'")
|
|
|
|
part_to_disk = get_part_to_disk(
|
|
node1.query(
|
|
"SELECT disk_name, name FROM system.parts where table = 'test1' and active"
|
|
)
|
|
)
|
|
assert part_to_disk["20221226_0_0_0"] == "s3"
|
|
assert part_to_disk["20230102_0_0_0"] == "s3"
|
|
assert part_to_disk["20230109_0_0_0"] == "s3"
|
|
assert part_to_disk["20230116_0_0_0"] == "default"
|
|
|
|
node2.query(
|
|
"""
|
|
CREATE TABLE test1 (EventDate Date, CounterID UInt32)
|
|
ENGINE = ReplicatedMergeTree('/clickhouse-tables/test1', 'r2')
|
|
PARTITION BY toMonday(EventDate)
|
|
ORDER BY (CounterID, EventDate)
|
|
SETTINGS index_granularity = 8192, storage_policy = 's3'"""
|
|
)
|
|
|
|
node2.query("SYSTEM SYNC REPLICA test1")
|
|
|
|
part_to_disk = get_part_to_disk(
|
|
node2.query(
|
|
"SELECT disk_name, name FROM system.parts where table = 'test1' and active"
|
|
)
|
|
)
|
|
assert part_to_disk["20221226_0_0_0"] == "s3"
|
|
assert part_to_disk["20230102_0_0_0"] == "s3"
|
|
assert part_to_disk["20230109_0_0_0"] == "s3"
|
|
assert part_to_disk["20230116_0_0_0"] == "default"
|
|
|
|
|
|
def test_concurrent_move_to_s3(started_cluster):
|
|
node1 = cluster.instances["node1"]
|
|
node2 = cluster.instances["node2"]
|
|
|
|
node1.query(
|
|
"""
|
|
CREATE TABLE test_concurrent_move (EventDate Date, CounterID UInt32)
|
|
ENGINE = ReplicatedMergeTree('/clickhouse-tables/test_concurrent_move', 'r1')
|
|
PARTITION BY CounterID
|
|
ORDER BY (CounterID, EventDate)
|
|
SETTINGS index_granularity = 8192, storage_policy = 's3'"""
|
|
)
|
|
|
|
node2.query(
|
|
"""
|
|
CREATE TABLE test_concurrent_move (EventDate Date, CounterID UInt32)
|
|
ENGINE = ReplicatedMergeTree('/clickhouse-tables/test_concurrent_move', 'r2')
|
|
PARTITION BY CounterID
|
|
ORDER BY (CounterID, EventDate)
|
|
SETTINGS index_granularity = 8192, storage_policy = 's3'"""
|
|
)
|
|
partitions = range(10)
|
|
|
|
for i in partitions:
|
|
node1.query(
|
|
f"INSERT INTO test_concurrent_move SELECT toDate('2023-01-01') + toIntervalDay(number), {i} from system.numbers limit 20"
|
|
)
|
|
node1.query(
|
|
f"INSERT INTO test_concurrent_move SELECT toDate('2023-01-01') + toIntervalDay(number) + rand(), {i} from system.numbers limit 20"
|
|
)
|
|
node1.query(
|
|
f"INSERT INTO test_concurrent_move SELECT toDate('2023-01-01') + toIntervalDay(number) + rand(), {i} from system.numbers limit 20"
|
|
)
|
|
node1.query(
|
|
f"INSERT INTO test_concurrent_move SELECT toDate('2023-01-01') + toIntervalDay(number) + rand(), {i} from system.numbers limit 20"
|
|
)
|
|
|
|
node2.query("SYSTEM SYNC REPLICA test_concurrent_move")
|
|
|
|
# check that we can move parts concurrently without exceptions
|
|
p = Pool(3)
|
|
for i in partitions:
|
|
|
|
def move_partition_to_s3(node):
|
|
node.query(
|
|
f"ALTER TABLE test_concurrent_move MOVE PARTITION '{i}' TO DISK 's3'"
|
|
)
|
|
|
|
j1 = p.apply_async(move_partition_to_s3, (node1,))
|
|
j2 = p.apply_async(move_partition_to_s3, (node2,))
|
|
j1.get()
|
|
j2.get()
|
|
|
|
def get_part_to_disk(query_result):
|
|
part_to_disk = {}
|
|
for row in query_result.strip().split("\n"):
|
|
disk, part = row.split("\t")
|
|
part_to_disk[part] = disk
|
|
return part_to_disk
|
|
|
|
part_to_disk = get_part_to_disk(
|
|
node1.query(
|
|
"SELECT disk_name, name FROM system.parts where table = 'test_concurrent_move' and active"
|
|
)
|
|
)
|
|
|
|
assert all([value == "s3" for value in part_to_disk.values()])
|
|
|
|
part_to_disk = get_part_to_disk(
|
|
node2.query(
|
|
"SELECT disk_name, name FROM system.parts where table = 'test_concurrent_move' and active"
|
|
)
|
|
)
|
|
assert all([value == "s3" for value in part_to_disk.values()])
|
|
|
|
|
|
def test_zero_copy_mutation(started_cluster):
|
|
node1 = cluster.instances["node1"]
|
|
node2 = cluster.instances["node2"]
|
|
|
|
node1.query(
|
|
"""
|
|
CREATE TABLE test_zero_copy_mutation (EventDate Date, CounterID UInt32)
|
|
ENGINE = ReplicatedMergeTree('/clickhouse-tables/test_zero_copy_mutation', 'r1')
|
|
ORDER BY (CounterID, EventDate)
|
|
SETTINGS index_granularity = 8192, storage_policy = 's3_only'"""
|
|
)
|
|
|
|
node2.query(
|
|
"""
|
|
CREATE TABLE test_zero_copy_mutation (EventDate Date, CounterID UInt32)
|
|
ENGINE = ReplicatedMergeTree('/clickhouse-tables/test_zero_copy_mutation', 'r2')
|
|
ORDER BY (CounterID, EventDate)
|
|
SETTINGS index_granularity = 8192, storage_policy = 's3_only'"""
|
|
)
|
|
|
|
node1.query(
|
|
"INSERT INTO test_zero_copy_mutation SELECT toDate('2023-01-01') + toIntervalDay(number) + rand(), number * number from system.numbers limit 10"
|
|
)
|
|
|
|
node2.query("SYSTEM STOP REPLICATION QUEUES test_zero_copy_mutation")
|
|
p = Pool(3)
|
|
|
|
def run_long_mutation(node):
|
|
node1.query(
|
|
"ALTER TABLE test_zero_copy_mutation DELETE WHERE sleepEachRow(1) == 1"
|
|
)
|
|
|
|
job = p.apply_async(run_long_mutation, (node1,))
|
|
|
|
for i in range(30):
|
|
count = node1.query(
|
|
"SELECT count() FROM system.replication_queue WHERE type = 'MUTATE_PART'"
|
|
).strip()
|
|
if int(count) > 0:
|
|
break
|
|
else:
|
|
time.sleep(0.1)
|
|
|
|
node2.query("SYSTEM START REPLICATION QUEUES test_zero_copy_mutation")
|
|
|
|
node2.query("SYSTEM SYNC REPLICA test_zero_copy_mutation")
|
|
|
|
job.get()
|
|
|
|
assert node2.contains_in_log("all_0_0_0_1/part_exclusive_lock exists")
|
|
assert node2.contains_in_log("Removing zero-copy lock on")
|
|
assert node2.contains_in_log("all_0_0_0_1/part_exclusive_lock doesn't exist")
|