ClickHouse/tests/integration/test_move_partition_to_volume_async/test.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

134 lines
3.0 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
import logging
import time
import os
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.mock_servers import start_s3_mock, start_mock_servers
from helpers.utility import generate_values, replace_config, SafeThread
from helpers.wait_for_helpers import wait_for_delete_inactive_parts
from helpers.wait_for_helpers import wait_for_delete_empty_parts
from helpers.wait_for_helpers import wait_for_merges
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
2023-11-15 17:48:54 +00:00
@pytest.fixture(scope="module")
def init_broken_s3(cluster):
yield start_s3_mock(cluster, "broken_s3", "8083")
@pytest.fixture(scope="function")
def broken_s3(init_broken_s3):
init_broken_s3.reset()
yield init_broken_s3
2023-11-15 17:48:54 +00:00
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"node",
main_configs=[
"configs/storage_policy.xml",
],
with_minio=True,
)
cluster.start()
logging.info("Cluster started")
yield cluster
finally:
cluster.shutdown()
def test_async_alter_move(cluster, broken_s3):
node = cluster.instances["node"]
2023-11-15 17:48:54 +00:00
node.query(
"""
CREATE TABLE moving_table_async
(
key UInt64,
data String
)
ENGINE MergeTree()
ORDER BY tuple()
SETTINGS storage_policy = 'slow_s3'
2023-11-15 17:48:54 +00:00
"""
)
2023-11-15 17:48:54 +00:00
node.query(
"INSERT INTO moving_table_async SELECT number, randomPrintableASCII(1000) FROM numbers(10000)"
)
broken_s3.setup_slow_answers(
timeout=5,
count=1000000,
)
2023-11-15 17:48:54 +00:00
node.query(
"ALTER TABLE moving_table_async MOVE PARTITION tuple() TO DISK 'broken_s3'",
settings={"alter_move_to_space_execute_async": True},
timeout=10,
)
# not flaky, just introduce some wait
time.sleep(3)
for i in range(100):
2023-11-15 17:48:54 +00:00
count = node.query(
"SELECT count() FROM system.moves where table = 'moving_table_async'"
)
if count == "1\n":
break
time.sleep(0.1)
else:
assert False, "Cannot find any moving background operation"
2023-11-15 17:48:54 +00:00
def test_sync_alter_move(cluster, broken_s3):
node = cluster.instances["node"]
2023-11-15 17:48:54 +00:00
node.query(
"""
CREATE TABLE moving_table_sync
(
key UInt64,
data String
)
ENGINE MergeTree()
ORDER BY tuple()
SETTINGS storage_policy = 'slow_s3'
2023-11-15 17:48:54 +00:00
"""
)
2023-11-15 17:48:54 +00:00
node.query(
"INSERT INTO moving_table_sync SELECT number, randomPrintableASCII(1000) FROM numbers(10000)"
)
broken_s3.reset()
2023-11-15 17:48:54 +00:00
node.query(
"ALTER TABLE moving_table_sync MOVE PARTITION tuple() TO DISK 'broken_s3'",
timeout=30,
)
# not flaky, just introduce some wait
time.sleep(3)
2023-11-15 17:48:54 +00:00
assert (
node.query("SELECT count() FROM system.moves where table = 'moving_table_sync'")
== "0\n"
)
2023-11-15 17:48:54 +00:00
assert (
node.query(
"SELECT disk_name FROM system.parts WHERE table = 'moving_table_sync'"
)
== "broken_s3\n"
)