Make test for s3 schema inference cache better

This commit is contained in:
avogar 2022-08-18 16:50:59 +00:00
parent 4c64b7a69e
commit 1fb99f51b0

View File

@ -1488,53 +1488,44 @@ def test_wrong_format_usage(started_cluster):
assert "Not a Parquet file" in result
def get_profile_event_for_query(instance, query, profile_event):
def check_profile_event_for_query(instance, query, profile_event, amount):
instance.query("system flush logs")
time.sleep(0.5)
query = query.replace("'", "\\'")
return int(
instance.query(
f"select ProfileEvents['{profile_event}'] from system.query_log where query='{query}' and type = 'QueryFinish' order by event_time desc limit 1"
attempt = 0
res = 0
while (attempt < 10):
res = int(
instance.query(
f"select ProfileEvents['{profile_event}'] from system.query_log where query='{query}' and type = 'QueryFinish' order by event_time desc limit 1"
)
)
)
if res == amount:
break
assert(res == amount)
def check_cache_misses(instance, file, storage_name, started_cluster, bucket, amount=1):
query = f"desc {storage_name}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file}')"
assert (
get_profile_event_for_query(instance, query, "SchemaInferenceCacheMisses")
== amount
)
check_profile_event_for_query(instance, query, "SchemaInferenceCacheMisses", amount)
def check_cache_hits(instance, file, storage_name, started_cluster, bucket, amount=1):
query = f"desc {storage_name}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file}')"
assert (
get_profile_event_for_query(instance, query, "SchemaInferenceCacheHits")
== amount
)
check_profile_event_for_query(instance, query, "SchemaInferenceCacheHits", amount)
def check_cache_invalidations(
instance, file, storage_name, started_cluster, bucket, amount=1
):
query = f"desc {storage_name}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file}')"
assert (
get_profile_event_for_query(
instance, query, "SchemaInferenceCacheInvalidations"
)
== amount
)
check_profile_event_for_query(instance, query, "SchemaInferenceCacheInvalidations", amount)
def check_cache_evictions(
instance, file, storage_name, started_cluster, bucket, amount=1
):
query = f"desc {storage_name}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file}')"
assert (
get_profile_event_for_query(instance, query, "SchemaInferenceCacheEvictions")
== amount
)
check_profile_event_for_query(instance, query, "SchemaInferenceCacheEvictions", amount)
def run_describe_query(instance, file, storage_name, started_cluster, bucket):