ClickHouse/tests/queries/0_stateless/02481_async_insert_dedup.python
2023-01-17 15:47:52 +01:00

140 lines
4.6 KiB
Python

#!/usr/bin/env python3
import os
import sys
import random
import queue
import time
from threading import Thread
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))
from pure_http_client import ClickHouseClient
client = ClickHouseClient()

# Check an async insert into a replicated table that has no PARTITION BY
# clause: insert five rows synchronously (wait_for_async_insert = 1), print
# the resulting row count, then drop the table again.
client.query("DROP TABLE IF EXISTS t_async_insert_dedup_no_part NO DELAY")
client.query('''
CREATE TABLE t_async_insert_dedup_no_part (
KeyID UInt32
) Engine = ReplicatedMergeTree('/clickhouse/tables/{shard}/{database}/t_async_insert_dedup', '{replica}')
ORDER BY (KeyID)
''')

# Keeper fault injection is switched off for this insert.
no_part_settings = {
    "async_insert": 1,
    "wait_for_async_insert": 1,
    "insert_keeper_fault_injection_probability": 0,
}
client.query(
    "insert into t_async_insert_dedup_no_part values (1), (2), (3), (4), (5)",
    settings=no_part_settings,
)

result = client.query("select count(*) from t_async_insert_dedup_no_part")
print(result, flush=True)

client.query("DROP TABLE IF EXISTS t_async_insert_dedup_no_part NO DELAY")
# generate data and push to queue
def generate_data(q, total_number):
    """Enqueue INSERT statements that together cover KeyID 1..total_number.

    Fresh statements carry consecutive key IDs in chunks of 1 to
    ``max_chunk_size`` rows; every row gets one of two EventDate values chosen
    at random. About one loop iteration in four re-sends a previously
    generated statement (picked from the last ~50) to simulate duplicate
    inserts. After the last fresh statement is queued, the function blocks on
    ``q.join()`` until consumers have acknowledged every item.

    Args:
        q: queue-like object providing ``put`` and ``join``.
        total_number: highest KeyID to generate. When it is not positive
            nothing is enqueued (previously a row-less, malformed INSERT
            statement was pushed).
    """
    max_chunk_size = 30
    partitions = ['2022-11-11 10:10:10', '2022-12-12 10:10:10']
    old_data = []
    last_number = 0

    # The loop ends as soon as the final key has been emitted; it is never
    # entered for total_number <= 0.
    while last_number < total_number:
        # insert old data randomly. 25% of the iterations replay a duplicate.
        if random.randint(0, 3) == 0:
            if not old_data:
                # Nothing to replay yet; roll again.
                continue
            # Pick one of the most recent statements, clamped to the first.
            idx = max(0, len(old_data) - 1 - random.randint(0, 50))
            q.put(old_data[idx])
            continue

        # insert new data: exactly chunk_size rows, truncated at total_number.
        chunk_size = random.randint(1, max_chunk_size)
        start = last_number + 1
        end = min(start + chunk_size - 1, total_number)
        rows = ",".join(
            "('{}', {})".format(partitions[random.randint(0, 1)], i)
            for i in range(start, end + 1)
        )
        insert_stmt = "insert into t_async_insert_dedup values " + rows
        q.put(insert_stmt)
        old_data.append(insert_stmt)
        last_number = end

    # wait until all queued tasks are done.
    q.join()
def fetch_and_insert_data(q, client):
    """Worker loop: take INSERT statements from ``q`` and run them via ``client``.

    Each statement is sent as a fire-and-forget, deduplicated async insert
    (``wait_for_async_insert`` = 0, ``async_insert_deduplicate`` = 1). After
    every statement the worker sleeps a random 50-500 ms. The loop never
    returns on its own.

    Args:
        q: queue-like object providing ``get`` and ``task_done``.
        client: object whose ``query(statement, settings=...)`` executes SQL.

    Raises:
        Whatever ``client.query`` raises. The item is still acknowledged with
        ``task_done`` first, so a producer blocked in ``q.join()`` is not left
        waiting forever on a failed insert.
    """
    while True:
        insert = q.get()
        try:
            client.query(
                insert,
                settings={
                    "async_insert": 1,
                    "async_insert_deduplicate": 1,
                    "wait_for_async_insert": 0,
                    "async_insert_busy_timeout_ms": 1500,
                    "insert_keeper_fault_injection_probability": 0,
                },
            )
        finally:
            # Always balance the get() above, even when the query failed.
            q.task_done()
        sleep_time = random.randint(50, 500)
        time.sleep(sleep_time / 1000.0)
# main process
# Recreate the partitioned target table with the async block-id cache enabled.
client.query("DROP TABLE IF EXISTS t_async_insert_dedup NO DELAY")
client.query('''
CREATE TABLE t_async_insert_dedup (
EventDate DateTime,
KeyID UInt32
) Engine = ReplicatedMergeTree('/clickhouse/tables/{shard}/{database}/t_async_insert_dedup', '{replica}')
PARTITION BY toYYYYMM(EventDate)
ORDER BY (KeyID, EventDate) SETTINGS use_async_block_ids_cache = 1
''')

# One producer fills a bounded queue; three workers drain it concurrently.
q = queue.Queue(100)
total_number = 10000

gen = Thread(target=generate_data, args=[q, total_number])
gen.start()

for i in range(3):
    insert = Thread(target=fetch_and_insert_data, args=[q, client])
    insert.start()

# The producer returns once every queued statement has been acknowledged.
gen.join()

# The inserts do not wait for the flush, so poll until the table holds exactly
# the keys 1..total_number in order; retry several times to get stable results.
expected = [str(n) for n in range(1, total_number + 1)]
retry = 0
while True:
    time.sleep(5)
    result = client.query("select KeyID from t_async_insert_dedup order by KeyID")
    result = result.split()

    errMsg = ""
    if len(result) != total_number:
        errMsg = f"the size of result is {len(result)}. we expect {total_number}."
    else:
        for i, (expect, real) in enumerate(zip(expected, result)):
            if expect != real:
                errMsg = f"error, real value {real} is not equal to expect value {expect} for {i}-th elements"
                break

    if not errMsg:
        print(len(result), flush=True)
        break
    if retry >= 5:
        # Out of retries: report the last mismatch and move on.
        print(errMsg, flush=True)
        break
    retry += 1

# The block-id cache must have been populated and hit at least once.
# An empty reply is treated as 0 so that the explicit exception below is raised
# instead of an IndexError (system.events may omit counters that never fired).
fields = client.query("SELECT value FROM system.metrics where metric = 'AsyncInsertCacheSize'").split()
result = int(fields[0]) if fields else 0
if result <= 0:
    raise Exception(f"AsyncInsertCacheSize should > 0, but got {result}")

fields = client.query("SELECT value FROM system.events where event = 'AsyncInsertCacheHits'").split()
result = int(fields[0]) if fields else 0
if result <= 0:
    raise Exception(f"AsyncInsertCacheHits should > 0, but got {result}")

client.query("DROP TABLE IF EXISTS t_async_insert_dedup NO DELAY")

# The worker threads loop forever and are not daemons, so exit the process
# directly instead of waiting for them.
os._exit(os.EX_OK)