mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-15 02:41:59 +00:00
140 lines
4.6 KiB
Python
140 lines
4.6 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import os
|
|
import sys
|
|
import random
|
|
import queue
|
|
import time
|
|
from threading import Thread
|
|
|
|
# Make the modules in the "helpers" directory next to this script importable
# by bare name; the path is derived from the script's resolved location.
CURDIR = os.path.dirname(os.path.realpath(__file__))
helpers_dir = os.path.join(CURDIR, 'helpers')
sys.path.insert(0, helpers_dir)

from pure_http_client import ClickHouseClient

# Client object used for every query issued by this script.
client = ClickHouseClient()
|
|
|
|
# Smoke test: one asynchronous insert into a replicated table that has no
# PARTITION BY clause, followed by a row count printed to stdout.
client.query("DROP TABLE IF EXISTS t_async_insert_dedup_no_part NO DELAY")
client.query('''
CREATE TABLE t_async_insert_dedup_no_part (
KeyID UInt32
) Engine = ReplicatedMergeTree('/clickhouse/tables/{shard}/{database}/t_async_insert_dedup', '{replica}')
ORDER BY (KeyID)
''')

# wait_for_async_insert=1 is passed so the insert query itself waits for the
# asynchronous insert before the count below is read.
no_part_settings = {
    "async_insert": 1,
    "wait_for_async_insert": 1,
    "insert_keeper_fault_injection_probability": 0,
}
client.query(
    "insert into t_async_insert_dedup_no_part values (1), (2), (3), (4), (5)",
    settings=no_part_settings,
)

print(client.query("select count(*) from t_async_insert_dedup_no_part"), flush=True)

client.query("DROP TABLE IF EXISTS t_async_insert_dedup_no_part NO DELAY")
|
|
|
|
# generate data and push to queue
|
|
def generate_data(q, total_number):
    """Feed INSERT statements covering KeyID 1..total_number into *q*.

    Every loop iteration rolls a 1-in-4 chance of re-queueing a statement that
    was already produced (one of the most recent ~50), which simulates
    duplicate inserts. Otherwise a fresh statement is built for the next
    consecutive run of up to ``max_chunk_size + 1`` keys, each row being
    assigned one of two fixed timestamps at random.

    Once the statement reaching ``total_number`` has been queued, the function
    blocks on ``q.join()`` until every queued item was marked done.

    Args:
        q: queue-like object providing ``put(item)`` and ``join()``.
        total_number: the largest KeyID to generate.
    """
    max_chunk_size = 30
    partitions = ['2022-11-11 10:10:10', '2022-12-12 10:10:10']
    history = []
    last_number = 0

    while True:
        # insert old data randomly. 25% of them are dup.
        if random.randint(0, 3) == 0:
            if not history:
                # Nothing has been produced yet, so there is nothing to repeat.
                continue
            # Step back at most 50 statements from the newest, clamped to the oldest.
            replay_idx = max(len(history) - 1 - random.randint(0, 50), 0)
            q.put(history[replay_idx])
            continue

        # insert new data.
        chunk_size = random.randint(1, max_chunk_size)
        first = last_number + 1
        last = min(first + chunk_size, total_number)

        rows = "".join(
            "('{}', {}),".format(partitions[random.randint(0, 1)], key)
            for key in range(first, last + 1)
        )
        # Drop the final character (the trailing comma of the last row).
        statement = ("insert into t_async_insert_dedup values " + rows)[:-1]

        q.put(statement)
        history.append(statement)
        last_number = last

        if last >= total_number:
            break

    # wait until all the tasks are done.
    q.join()
|
|
|
|
def fetch_and_insert_data(q, client):
    """Consume INSERT statements from *q* forever and run them through *client*.

    Each statement is sent with asynchronous insert and async-insert
    deduplication enabled and without waiting for the insert to complete
    (``wait_for_async_insert`` is 0). After the query call returns, the queue
    item is marked done and the worker sleeps a random 50-500 ms before taking
    the next one. The function never returns on its own.

    Args:
        q: queue-like object providing ``get()`` and ``task_done()``.
        client: object exposing ``query(sql, settings=...)``.
    """
    insert_settings = {
        "async_insert": 1,
        "async_insert_deduplicate": 1,
        "wait_for_async_insert": 0,
        "async_insert_busy_timeout_ms": 1500,
        "insert_keeper_fault_injection_probability": 0,
    }
    while True:
        statement = q.get()
        client.query(statement, settings=dict(insert_settings))
        q.task_done()
        # Random pause between inserts, in milliseconds.
        pause_ms = random.randint(50, 500)
        time.sleep(pause_ms / 1000.0)
|
|
|
|
# main process
# Recreate the partitioned target table so the run starts from an empty table.
client.query("DROP TABLE IF EXISTS t_async_insert_dedup NO DELAY")
client.query('''
CREATE TABLE t_async_insert_dedup (
EventDate DateTime,
KeyID UInt32
) Engine = ReplicatedMergeTree('/clickhouse/tables/{shard}/{database}/t_async_insert_dedup', '{replica}')
PARTITION BY toYYYYMM(EventDate)
ORDER BY (KeyID, EventDate) SETTINGS use_async_block_ids_cache = 1
''')

# Bounded queue: the generator blocks in put() while 100 statements are pending.
q = queue.Queue(100)
total_number = 10000

# One producer thread builds the INSERT statements (including duplicates).
gen = Thread(target=generate_data, args=[q, total_number])
gen.start()

# Three consumer threads execute the statements. They loop forever, so they
# are created as daemon threads: if a later check in this script raises, the
# interpreter can still exit instead of hanging on these never-ending workers.
for _ in range(3):
    insert = Thread(target=fetch_and_insert_data, args=[q, client], daemon=True)
    insert.start()

# generate_data() ends with q.join(), so once the producer thread finishes
# every queued statement has been marked done by a consumer.
gen.join()
|
|
|
|
# Verify that the table ends up with exactly the keys 1..total_number, each
# once. The inserts were sent without waiting for completion, so the check is
# repeated (5 seconds apart) until it passes or the retries are used up.
retry = 0

while True:
    time.sleep(5)
    result = client.query("select KeyID from t_async_insert_dedup order by KeyID").split()

    errMsg = ""
    if len(result) != total_number:
        errMsg = f"the size of result is {len(result)}. we expect {total_number}."
    else:
        # Rows are ordered by KeyID, so position i must hold the value i + 1.
        for i, real in enumerate(result):
            expect = str(i + 1)
            if real != expect:
                errMsg = f"error, real value {real} is not equal to expect value {expect} for {i}-th elements"
                break
    err = bool(errMsg)

    # retry several times to get stable results.
    if err and retry < 5:
        retry += 1
        continue

    # Either the check passed (print the row count) or the retries are
    # exhausted (print the last error); in both cases stop polling.
    print(errMsg if err else len(result), flush=True)
    break
|
|
|
|
# Both async-insert cache counters are expected to be positive after the run:
# the cache size is read from system.metrics and the hit count from
# system.events.
cache_checks = (
    ("AsyncInsertCacheSize", "SELECT value FROM system.metrics where metric = 'AsyncInsertCacheSize'"),
    ("AsyncInsertCacheHits", "SELECT value FROM system.events where event = 'AsyncInsertCacheHits'"),
)
for counter_name, counter_query in cache_checks:
    # The first whitespace-separated token of the response is the counter value.
    result = int(client.query(counter_query).split()[0])
    if result <= 0:
        raise Exception(f"{counter_name} should > 0, but got {result}")

client.query("DROP TABLE IF EXISTS t_async_insert_dedup NO DELAY")

# Terminate the process immediately with a success status; the inserter
# threads loop forever and are never joined.
os._exit(os.EX_OK)
|