2022-06-21 13:02:48 +00:00
import pytest
import time
from helpers . cluster import ClickHouseCluster
# Cluster object shared by the fixture and the test in this module.
cluster = ClickHouseCluster(__file__)

# The only instance in the cluster; it is registered with two config
# overrides (query log and schema cache).
# NOTE(review): the string literals are kept exactly as they appear in this
# source, including the padding spaces inside the quotes - confirm that the
# padding is intended.
node = cluster.add_instance(
    " node ",
    main_configs=[
        " configs/config.d/query_log.xml ",
        " configs/config.d/schema_cache.xml ",
    ],
    stay_alive=True,
)
2022-06-21 13:02:48 +00:00
# NOTE(review): the scope literal is kept exactly as it appears in this
# source, including the padding spaces - confirm that pytest accepts it.
@pytest.fixture(scope=" module ")
def start_cluster():
    """Start the module-level cluster, hand it to the test, then shut it down.

    Yields:
        The module-level ``cluster`` object after ``cluster.start()`` returned.
    """
    try:
        # start() lives inside the try block, so shutdown() below also runs
        # when start-up itself raises.
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()
def get_profile_event_for_query(node, query, profile_event):
    """Return one ProfileEvents counter recorded for ``query`` in the query log.

    Args:
        node: Object exposing ``query(sql)``; it is called twice, once to
            flush the logs and once to read ``system.query_log``.
        query: Query text to look up in ``system.query_log``.
        profile_event: Key looked up in the ``ProfileEvents`` map.

    Returns:
        The counter of the most recent matching ``QueryFinish`` row
        (``order by query_start_time_microseconds desc limit 1``), as an int.

    Raises:
        ValueError: If the text returned by ``node.query`` cannot be parsed
            by ``int()`` - presumably the case when no row matches.
    """
    # Ask for a log flush before reading system.query_log.
    node.query(" system flush logs ")

    # Quote escaping for embedding the text into the SQL literal below.
    # NOTE(review): the literals are kept exactly as they appear in this
    # source, including the padding spaces - confirm that they are intended.
    query = query.replace(" ' ", " \\ ' ")

    lookup_sql = f" select ProfileEvents[ ' { profile_event } ' ] from system.query_log where query= ' { query } ' and type = ' QueryFinish ' order by query_start_time_microseconds desc limit 1 "
    raw_value = node.query(lookup_sql)
    return int(raw_value)
2022-06-21 13:02:48 +00:00
2022-08-05 16:20:15 +00:00
def check_cache_misses(node, file, amount=1):
    """Assert the cache-miss counter logged for the ``desc file(...)`` query.

    Args:
        node: Instance passed through to ``get_profile_event_for_query``.
        file: File argument interpolated into the ``desc file(...)`` text.
        amount: Expected counter value (default 1).

    Raises:
        AssertionError: If the logged counter differs from ``amount``.
    """
    describe_query = f" desc file( ' { file } ' ) "
    observed = get_profile_event_for_query(
        node, describe_query, " SchemaInferenceCacheMisses "
    )
    assert observed == amount
2022-08-05 16:20:15 +00:00
def check_cache_hits(node, file, amount=1):
    """Assert the cache-hit counter logged for the ``desc file(...)`` query.

    Args:
        node: Instance passed through to ``get_profile_event_for_query``.
        file: File argument interpolated into the ``desc file(...)`` text.
        amount: Expected counter value (default 1).

    Raises:
        AssertionError: If the logged counter differs from ``amount``.
    """
    describe_query = f" desc file( ' { file } ' ) "
    observed = get_profile_event_for_query(
        node, describe_query, " SchemaInferenceCacheHits "
    )
    assert observed == amount
2022-08-05 16:20:15 +00:00
def check_cache_invalidations(node, file, amount=1):
    """Assert the cache-invalidation counter logged for the ``desc file(...)`` query.

    Args:
        node: Instance passed through to ``get_profile_event_for_query``.
        file: File argument interpolated into the ``desc file(...)`` text.
        amount: Expected counter value (default 1).

    Raises:
        AssertionError: If the logged counter differs from ``amount``.
    """
    describe_query = f" desc file( ' { file } ' ) "
    observed = get_profile_event_for_query(
        node, describe_query, " SchemaInferenceCacheInvalidations "
    )
    assert observed == amount
2022-08-05 16:20:15 +00:00
def check_cache_evictions(node, file, amount=1):
    """Assert the cache-eviction counter logged for the ``desc file(...)`` query.

    Args:
        node: Instance passed through to ``get_profile_event_for_query``.
        file: File argument interpolated into the ``desc file(...)`` text.
        amount: Expected counter value (default 1).

    Raises:
        AssertionError: If the logged counter differs from ``amount``.
    """
    describe_query = f" desc file( ' { file } ' ) "
    observed = get_profile_event_for_query(
        node, describe_query, " SchemaInferenceCacheEvictions "
    )
    assert observed == amount
2022-08-05 16:20:15 +00:00
def check_cache(node, expected_files):
    """Assert which sources ``system.schema_inference_cache`` currently lists.

    The query result is split on whitespace; each entry is stripped and
    reduced to the last piece after splitting on the separator literal
    (presumably leaving the file name - confirm). The comparison ignores
    order because both sides are sorted.

    Args:
        node: Object exposing ``query(sql)`` that returns the result as text.
        expected_files: Iterable of the names expected in the cache.

    Raises:
        AssertionError: If the sorted cached names differ from the sorted
            expected names.
    """
    raw_sources = node.query(" select source from system.schema_inference_cache ")
    cached_names = [
        entry.strip().split(" / ")[-1] for entry in raw_sources.split()
    ]
    cached_names.sort()
    assert cached_names == sorted(expected_files)
2022-08-05 16:20:15 +00:00
2022-06-21 13:02:48 +00:00
def test(start_cluster):
    """Walk the schema inference cache for ``file()`` through its counters.

    The scenario is strictly sequential: every ``check_*`` call reads the
    counters of the most recent matching ``desc`` query, so the statement
    order below must not change. Covered in order: miss, hit, invalidation,
    eviction, glob describe, and both ``system drop schema cache`` forms.

    NOTE(review): all string literals are kept exactly as they appear in this
    source, including the padding spaces inside the quotes - confirm that the
    padding is intended.
    """
    # --- data.jsonl: first describe is a miss and populates the cache ---
    node.query(
        " insert into function file( ' data.jsonl ' ) select * from numbers(100) "
    )
    # NOTE(review): presumably lets the file's modification time settle
    # before the schema gets cached - confirm.
    time.sleep(1)

    node.query(" desc file( ' data.jsonl ' ) ")
    check_cache(node, [" data.jsonl "])
    check_cache_misses(node, " data.jsonl ")

    # Describing the unchanged file again is expected to hit the cache.
    node.query(" desc file( ' data.jsonl ' ) ")
    check_cache_hits(node, " data.jsonl ")

    # Rewriting the file is expected to invalidate its cached schema.
    node.query(
        " insert into function file( ' data.jsonl ' ) select * from numbers(100) "
    )
    time.sleep(1)

    node.query(" desc file( ' data.jsonl ' ) ")
    check_cache_invalidations(node, " data.jsonl ")

    # --- data1.jsonl: second cache entry, miss first and then hit ---
    node.query(
        " insert into function file( ' data1.jsonl ' ) select * from numbers(100) "
    )
    time.sleep(1)

    node.query(" desc file( ' data1.jsonl ' ) ")
    check_cache(node, [" data.jsonl ", " data1.jsonl "])
    check_cache_misses(node, " data1.jsonl ")

    node.query(" desc file( ' data1.jsonl ' ) ")
    check_cache_hits(node, " data1.jsonl ")

    # --- data2.jsonl: third file; data.jsonl is expected to drop out ---
    # NOTE(review): presumably the cache is limited to two entries by
    # schema_cache.xml - confirm against that config.
    node.query(
        " insert into function file( ' data2.jsonl ' ) select * from numbers(100) "
    )
    time.sleep(1)

    node.query(" desc file( ' data2.jsonl ' ) ")
    check_cache(node, [" data1.jsonl ", " data2.jsonl "])
    check_cache_misses(node, " data2.jsonl ")
    check_cache_evictions(node, " data2.jsonl ")

    node.query(" desc file( ' data2.jsonl ' ) ")
    check_cache_hits(node, " data2.jsonl ")

    node.query(" desc file( ' data1.jsonl ' ) ")
    check_cache_hits(node, " data1.jsonl ")

    # Describing data.jsonl again: a miss plus an eviction, and data2.jsonl
    # is the entry expected to be gone afterwards.
    node.query(" desc file( ' data.jsonl ' ) ")
    check_cache(node, [" data.jsonl ", " data1.jsonl "])
    check_cache_misses(node, " data.jsonl ")
    check_cache_evictions(node, " data.jsonl ")

    # Describing data2.jsonl again: this time data1.jsonl is expected to go.
    node.query(" desc file( ' data2.jsonl ' ) ")
    check_cache(node, [" data.jsonl ", " data2.jsonl "])
    check_cache_misses(node, " data2.jsonl ")
    check_cache_evictions(node, " data2.jsonl ")

    node.query(" desc file( ' data2.jsonl ' ) ")
    check_cache_hits(node, " data2.jsonl ")

    node.query(" desc file( ' data.jsonl ' ) ")
    check_cache_hits(node, " data.jsonl ")

    # --- glob over all data files, with a fourth file added first ---
    node.query(
        " insert into function file( ' data3.jsonl ' ) select * from numbers(100) "
    )
    time.sleep(1)

    node.query(" desc file( ' data*.jsonl ' ) ")
    check_cache_hits(node, " data*.jsonl ")

    # --- dropping the cache for the file storage only ---
    node.query(" system drop schema cache for file ")
    check_cache(node, [])

    node.query(" desc file( ' data*.jsonl ' ) ")
    check_cache_misses(node, " data*.jsonl ", 4)

    # --- dropping the whole schema cache ---
    node.query(" system drop schema cache ")
    check_cache(node, [])

    node.query(" desc file( ' data*.jsonl ' ) ")
    check_cache_misses(node, " data*.jsonl ", 4)