ClickHouse/tests/integration/test_file_schema_inference_cache/test.py
2022-08-24 11:28:56 +00:00

152 lines
4.2 KiB
Python
Executable File

import pytest
import time
from helpers.cluster import ClickHouseCluster
# One-node cluster shared by every test in this module. The instance is
# started with a query-log config (the profile-event helpers below read
# system.query_log) and a schema-cache config.
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
    "node",
    main_configs=[
        "configs/config.d/query_log.xml",
        "configs/config.d/schema_cache.xml",
    ],
    stay_alive=True,
)
@pytest.fixture(scope="module")
def start_cluster():
    """Module-scoped fixture that starts the cluster and yields it to tests.

    The cluster is shut down when the module's tests are finished, and also
    when ``cluster.start()`` itself raises.
    """
    try:
        # start() lives inside the try so that a failed start is still
        # followed by shutdown().
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()
def get_profile_event_for_query(node, query, profile_event):
    """Return one ProfileEvents counter for the most recent finished `query`.

    Flushes the system logs, then looks up the newest ``QueryFinish`` row in
    ``system.query_log`` whose text equals `query` and reads
    ``ProfileEvents[profile_event]`` from it.

    Args:
        node: object exposing ``query(sql)`` that returns the result as text.
        query: exact text of the query to look up.
        profile_event: name of the profile event counter to read.

    Returns:
        The counter value as an ``int``.

    Raises:
        ValueError: if no matching finished query is found in the log (or the
            returned value is not an integer).
    """
    node.query("system flush logs")
    # The query text is embedded in a single-quoted SQL string literal where
    # backslash is the escape character, so backslashes must be doubled
    # *before* quotes are escaped (otherwise the quote escapes themselves
    # would be doubled).
    escaped_query = query.replace("\\", "\\\\").replace("'", "\\'")
    result = node.query(
        f"select ProfileEvents['{profile_event}'] from system.query_log where query='{escaped_query}' and type = 'QueryFinish' order by query_start_time_microseconds desc limit 1"
    ).strip()
    if not result:
        # Without this check int('') would raise a ValueError that says
        # nothing about which query was missing from the log.
        raise ValueError(
            f"no finished query_log entry found for query {query!r} "
            f"(profile event {profile_event!r})"
        )
    return int(result)
def check_cache_misses(node, file, amount=1):
    """Assert the last ``desc file('<file>')`` query recorded `amount` cache misses.

    Reads the ``SchemaInferenceCacheMisses`` profile event through
    ``get_profile_event_for_query``.
    """
    observed = get_profile_event_for_query(
        node, f"desc file('{file}')", "SchemaInferenceCacheMisses"
    )
    assert observed == amount
def check_cache_hits(node, file, amount=1):
    """Assert the last ``desc file('<file>')`` query recorded `amount` cache hits.

    Reads the ``SchemaInferenceCacheHits`` profile event through
    ``get_profile_event_for_query``.
    """
    observed = get_profile_event_for_query(
        node, f"desc file('{file}')", "SchemaInferenceCacheHits"
    )
    assert observed == amount
def check_cache_invalidations(node, file, amount=1):
    """Assert the last ``desc file('<file>')`` query recorded `amount` invalidations.

    Reads the ``SchemaInferenceCacheInvalidations`` profile event through
    ``get_profile_event_for_query``.
    """
    observed = get_profile_event_for_query(
        node, f"desc file('{file}')", "SchemaInferenceCacheInvalidations"
    )
    assert observed == amount
def check_cache_evictions(node, file, amount=1):
    """Assert the last ``desc file('<file>')`` query recorded `amount` evictions.

    Reads the ``SchemaInferenceCacheEvictions`` profile event through
    ``get_profile_event_for_query``.
    """
    observed = get_profile_event_for_query(
        node, f"desc file('{file}')", "SchemaInferenceCacheEvictions"
    )
    assert observed == amount
def check_cache(node, expected_files):
    """Assert that the schema inference cache holds exactly `expected_files`.

    Selects ``source`` from ``system.schema_inference_cache``, reduces every
    returned path to the part after its last ``/`` and compares the sorted
    result with the sorted `expected_files`.

    Args:
        node: object exposing ``query(sql)`` that returns the result as text.
        expected_files: iterable of file names (no directory part) expected
            to be cached; order does not matter.

    Raises:
        AssertionError: if the cached file names differ from the expectation.
    """
    sources = node.query("select source from system.schema_inference_cache")
    # Split per output line rather than on arbitrary whitespace so that a
    # path containing spaces stays one entry (assumes the single-column
    # result is returned one value per line). Blank lines are ignored.
    cached_files = sorted(
        line.strip().split("/")[-1]
        for line in sources.splitlines()
        if line.strip()
    )
    assert cached_files == sorted(expected_files)
def test(start_cluster):
    """Exercise the schema inference cache of the ``file`` table function.

    Covers, in order: miss then hit for one file, invalidation after the file
    is rewritten, eviction once a third file is described, a glob over all
    files, and both ``system drop schema cache`` forms.
    """

    def write_file(name):
        # Write 100 rows into the file, then pause for a second before it is
        # described.
        # NOTE(review): the pause presumably lets the file's modification
        # time settle so cache validity checks are reliable — confirm.
        node.query(f"insert into function file('{name}') select * from numbers(100)")
        time.sleep(1)

    def describe(name):
        node.query(f"desc file('{name}')")

    # First file: the first describe misses and fills the cache, the second hits.
    write_file("data.jsonl")
    describe("data.jsonl")
    check_cache(node, ["data.jsonl"])
    check_cache_misses(node, "data.jsonl")

    describe("data.jsonl")
    check_cache_hits(node, "data.jsonl")

    # Rewriting the file makes the cached entry stale.
    write_file("data.jsonl")
    describe("data.jsonl")
    check_cache_invalidations(node, "data.jsonl")

    # Second file sits in the cache next to the first one.
    write_file("data1.jsonl")
    describe("data1.jsonl")
    check_cache(node, ["data.jsonl", "data1.jsonl"])
    check_cache_misses(node, "data1.jsonl")

    describe("data1.jsonl")
    check_cache_hits(node, "data1.jsonl")

    # Third file: the assertions below expect one entry to be evicted, i.e.
    # the cache keeps at most two entries here (presumably configured in
    # schema_cache.xml — confirm).
    write_file("data2.jsonl")
    describe("data2.jsonl")
    check_cache(node, ["data1.jsonl", "data2.jsonl"])
    check_cache_misses(node, "data2.jsonl")
    check_cache_evictions(node, "data2.jsonl")

    describe("data2.jsonl")
    check_cache_hits(node, "data2.jsonl")

    describe("data1.jsonl")
    check_cache_hits(node, "data1.jsonl")

    # The expected survivors below are consistent with the least recently
    # used entry being the one that is evicted.
    describe("data.jsonl")
    check_cache(node, ["data.jsonl", "data1.jsonl"])
    check_cache_misses(node, "data.jsonl")
    check_cache_evictions(node, "data.jsonl")

    describe("data2.jsonl")
    check_cache(node, ["data.jsonl", "data2.jsonl"])
    check_cache_misses(node, "data2.jsonl")
    check_cache_evictions(node, "data2.jsonl")

    describe("data2.jsonl")
    check_cache_hits(node, "data2.jsonl")

    describe("data.jsonl")
    check_cache_hits(node, "data.jsonl")

    # Glob over all four files: expected to be a single cache hit.
    write_file("data3.jsonl")
    describe("data*.jsonl")
    check_cache_hits(node, "data*.jsonl")

    # Dropping the cache for the file storage empties it; the glob then
    # records one miss per matching file.
    node.query("system drop schema cache for file")
    check_cache(node, [])

    describe("data*.jsonl")
    check_cache_misses(node, "data*.jsonl", 4)

    # The unqualified drop behaves the same way for this test.
    node.query("system drop schema cache")
    check_cache(node, [])

    describe("data*.jsonl")
    check_cache_misses(node, "data*.jsonl", 4)