ClickHouse/tests/integration/test_merge_tree_s3_failover/test.py

118 lines
3.9 KiB
Python
Raw Normal View History

import logging
import os
import time
import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())
# Runs custom python-based S3 endpoint.
def run_endpoint(cluster):
logging.info("Starting custom S3 endpoint")
container_id = cluster.get_container_id('resolver')
current_dir = os.path.dirname(__file__)
cluster.copy_file_to_container(container_id, os.path.join(current_dir, "s3_endpoint", "endpoint.py"), "endpoint.py")
cluster.exec_in_container(container_id, ["python", "endpoint.py"], detach=True)
# Wait for S3 endpoint start
for attempt in range(10):
ping_response = cluster.exec_in_container(cluster.get_container_id('resolver'),
["curl", "-s", "http://resolver:8080/"], nothrow=True)
if ping_response != 'OK':
if attempt == 9:
assert ping_response == 'OK', 'Expected "OK", but got "{}"'.format(ping_response)
else:
time.sleep(1)
else:
break
logging.info("S3 endpoint started")
def fail_request(cluster, request):
response = cluster.exec_in_container(cluster.get_container_id('resolver'),
["curl", "-s", "http://resolver:8080/fail_request/{}".format(request)])
assert response == 'OK', 'Expected "OK", but got "{}"'.format(response)
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance("node",
main_configs=["configs/config.d/log_conf.xml", "configs/config.d/storage_conf.xml"],
with_minio=True)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
run_endpoint(cluster)
yield cluster
finally:
cluster.shutdown()
@pytest.fixture(autouse=True)
def drop_table(cluster):
yield
node = cluster.instances["node"]
node.query("DROP TABLE IF EXISTS s3_failover_test NO DELAY")
# S3 request will be failed for an appropriate part file write.
FILES_PER_PART_BASE = 5 # partition.dat, default_compression_codec.txt, count.txt, columns.txt, checksums.txt
FILES_PER_PART_WIDE = FILES_PER_PART_BASE + 1 + 1 + 3 * 2 # Primary index, MinMax, Mark and data file for column(s)
FILES_PER_PART_COMPACT = FILES_PER_PART_BASE + 1 + 1 + 2
@pytest.mark.parametrize(
"min_bytes_for_wide_part,request_count",
[
(0, FILES_PER_PART_WIDE),
(1024 * 1024, FILES_PER_PART_COMPACT)
]
)
def test_write_failover(cluster, min_bytes_for_wide_part, request_count):
node = cluster.instances["node"]
node.query(
"""
CREATE TABLE s3_failover_test (
dt Date,
id Int64,
data String
) ENGINE=MergeTree()
ORDER BY id
PARTITION BY dt
SETTINGS storage_policy='s3', min_bytes_for_wide_part={}
"""
.format(min_bytes_for_wide_part)
)
for request in range(request_count + 1):
# Fail N-th request to S3.
fail_request(cluster, request + 1)
data = "('2020-03-01',0,'data'),('2020-03-01',1,'data')"
positive = request == request_count
try:
node.query("INSERT INTO s3_failover_test VALUES {}".format(data))
assert positive, "Insert query should be failed, request {}".format(request)
except QueryRuntimeException as e:
assert not positive, "Insert query shouldn't be failed, request {}".format(request)
assert str(e).find("Expected Error") != -1, "Unexpected error {}".format(str(e))
if positive:
# Disable request failing.
fail_request(cluster, 0)
assert node.query("CHECK TABLE s3_failover_test") == '1\n'
assert node.query("SELECT * FROM s3_failover_test FORMAT Values") == data