Added a test

This commit is contained in:
Nikita Mikhaylov 2022-09-20 22:46:40 +00:00
parent 852d084950
commit 502338560c
4 changed files with 74 additions and 1 deletions

View File

@ -1416,7 +1416,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
if (settings.async_insert_threads)
global_context->setAsynchronousInsertQueue(std::make_shared<AsynchronousInsertQueue>(
global_context,
settings.async_insert_threads));
settings.async_insert_threads,
settings.async_insert_cleanup_timeout_ms));
/// Size of cache for marks (index of MergeTree family of tables).
size_t mark_cache_size = config().getUInt64("mark_cache_size", 5368709120);

View File

@ -0,0 +1,62 @@
#!/usr/bin/env python3
import os
import sys
import time
CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, "helpers"))
CLICKHOUSE_URL = os.environ.get("CLICKHOUSE_URL")
CLICKHOUSE_TMP = os.environ.get("CLICKHOUSE_TMP")
from pure_http_client import ClickHouseClient
client = ClickHouseClient()
NUM_RUNS = 20
TIME_TO_WAIT_MS = 500
# The purpose of this test is to check that AsyncInsertQueue
# respects timeouts specified in the scope of query.
# Like if we execute NUM_RUNS subsequent inserts
# then we should spend at least (NUM_RUNS - 1) * TIME_TO_WAIT_MS
# Because each query corresponds to a timepoint when it been flushed
# And time period between first and last flush is exactly such
# as descibed above.
# Note that this doesn't include the time to process the query itself
# and this time maybe different depending on the build type (release or with sanitizer)
gen_data_query = "SELECT number + {} AS id, toString(id) AS s, range(id) AS arr FROM numbers(10) FORMAT TSV"
insert_query = "INSERT INTO t_async_insert_user_settings FORMAT TSV"
settings = {
"async_insert": 1,
"wait_for_async_insert": 1,
"async_insert_busy_timeout_ms": TIME_TO_WAIT_MS,
}
all_data = []
for i in range(NUM_RUNS):
all_data.append(
client.query(gen_data_query.format(i * 10), settings={}, binary_result=True)
)
client.query("DROP TABLE IF EXISTS t_async_insert_user_settings")
client.query(
"CREATE TABLE t_async_insert_user_settings (id UInt64, s String, arr Array(UInt64)) ENGINE = Memory"
)
start_ms = time.time() * 1000.0
for i in range(NUM_RUNS):
client.query_with_data(insert_query, all_data[i], settings=settings)
end_ms = time.time() * 1000.0
duration = end_ms - start_ms
expected = (NUM_RUNS - 1) * TIME_TO_WAIT_MS
if duration >= expected:
print("Ok.")
else:
print(f"Fail. Duration: {duration}. Expected: {expected}")
client.query("DROP TABLE IF EXISTS t_async_insert_user_settings")

View File

@ -0,0 +1,9 @@
#!/usr/bin/env bash
# Tags: no-fasttest
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# We should have correct env vars from shell_config.sh to run this test
python3 "$CURDIR"/02451_async_insert_user_level_settings.python