mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-15 10:52:30 +00:00
106 lines
3.6 KiB
Python
106 lines
3.6 KiB
Python
import logging
|
|
|
|
import pytest
|
|
from helpers.cluster import ClickHouseCluster
|
|
|
|
logging.getLogger().setLevel(logging.INFO)
|
|
logging.getLogger().addHandler(logging.StreamHandler())
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def cluster():
|
|
try:
|
|
cluster = ClickHouseCluster(__file__)
|
|
cluster.add_instance("node", main_configs=["configs/config.d/log_conf.xml", "configs/config.d/storage_conf.xml",
|
|
"configs/config.d/ssl_conf.xml", "configs/config.d/query_log.xml"],
|
|
user_configs=["configs/config.d/users.xml"], with_minio=True)
|
|
logging.info("Starting cluster...")
|
|
cluster.start()
|
|
logging.info("Cluster started")
|
|
|
|
yield cluster
|
|
finally:
|
|
cluster.shutdown()
|
|
|
|
|
|
def get_query_stat(instance, hint):
|
|
result = {}
|
|
instance.query("SYSTEM FLUSH LOGS")
|
|
events = instance.query('''
|
|
SELECT ProfileEvents.Names, ProfileEvents.Values
|
|
FROM system.query_log
|
|
ARRAY JOIN ProfileEvents
|
|
WHERE type != 1 AND query LIKE '%{}%'
|
|
'''.format(hint.replace("'", "\\'"))).split("\n")
|
|
for event in events:
|
|
ev = event.split("\t")
|
|
if len(ev) == 2:
|
|
if ev[0].startswith("S3"):
|
|
result[ev[0]] = int(ev[1])
|
|
return result
|
|
|
|
|
|
@pytest.mark.parametrize("min_rows_for_wide_part,read_requests", [(0, 2), (8192, 1)])
|
|
def test_write_is_cached(cluster, min_rows_for_wide_part, read_requests):
|
|
node = cluster.instances["node"]
|
|
|
|
node.query(
|
|
"""
|
|
CREATE TABLE s3_test (
|
|
id Int64,
|
|
data String
|
|
) ENGINE=MergeTree()
|
|
ORDER BY id
|
|
SETTINGS storage_policy='s3', min_rows_for_wide_part={}
|
|
""".format(min_rows_for_wide_part)
|
|
)
|
|
|
|
node.query("SYSTEM FLUSH LOGS")
|
|
node.query("TRUNCATE TABLE system.query_log")
|
|
|
|
node.query("INSERT INTO s3_test VALUES (0,'data'),(1,'data')")
|
|
|
|
select_query = "SELECT * FROM s3_test order by id FORMAT Values"
|
|
assert node.query(select_query) == "(0,'data'),(1,'data')"
|
|
|
|
stat = get_query_stat(node, select_query)
|
|
assert stat["S3ReadRequestsCount"] == read_requests # Only .bin files should be accessed from S3.
|
|
|
|
node.query("DROP TABLE IF EXISTS s3_test NO DELAY")
|
|
|
|
@pytest.mark.parametrize("min_rows_for_wide_part,all_files,bin_files", [(0, 4, 2), (8192, 2, 1)])
|
|
def test_read_after_cache_is_wiped(cluster, min_rows_for_wide_part, all_files, bin_files):
|
|
node = cluster.instances["node"]
|
|
|
|
node.query(
|
|
"""
|
|
CREATE TABLE s3_test (
|
|
id Int64,
|
|
data String
|
|
) ENGINE=MergeTree()
|
|
ORDER BY id
|
|
SETTINGS storage_policy='s3', min_rows_for_wide_part={}
|
|
""".format(min_rows_for_wide_part)
|
|
)
|
|
|
|
node.query("SYSTEM FLUSH LOGS")
|
|
node.query("TRUNCATE TABLE system.query_log")
|
|
|
|
node.query("INSERT INTO s3_test VALUES (0,'data'),(1,'data')")
|
|
|
|
# Wipe cache
|
|
cluster.exec_in_container(cluster.get_container_id("node"), ["rm", "-rf", "/var/lib/clickhouse/disks/s3/cache/"])
|
|
|
|
select_query = "SELECT * FROM s3_test"
|
|
node.query(select_query)
|
|
stat = get_query_stat(node, select_query)
|
|
assert stat["S3ReadRequestsCount"] == all_files # .mrk and .bin files should be accessed from S3.
|
|
|
|
# After cache is populated again, only .bin files should be accessed from S3.
|
|
select_query = "SELECT * FROM s3_test order by id FORMAT Values"
|
|
assert node.query(select_query) == "(0,'data'),(1,'data')"
|
|
stat = get_query_stat(node, select_query)
|
|
assert stat["S3ReadRequestsCount"] == bin_files
|
|
|
|
node.query("DROP TABLE IF EXISTS s3_test NO DELAY")
|