diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index 758c8b5999d..8c0ebba694d 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -1416,7 +1416,8 @@ int Server::main(const std::vector & /*args*/) if (settings.async_insert_threads) global_context->setAsynchronousInsertQueue(std::make_shared( global_context, - settings.async_insert_threads)); + settings.async_insert_threads, + settings.async_insert_cleanup_timeout_ms)); /// Size of cache for marks (index of MergeTree family of tables). size_t mark_cache_size = config().getUInt64("mark_cache_size", 5368709120); diff --git a/tests/queries/0_stateless/02451_async_insert_user_level_settings.python b/tests/queries/0_stateless/02451_async_insert_user_level_settings.python new file mode 100644 index 00000000000..9fe6142edc6 --- /dev/null +++ b/tests/queries/0_stateless/02451_async_insert_user_level_settings.python @@ -0,0 +1,62 @@ +#!/usr/bin/env python3 +import os +import sys +import time + +CURDIR = os.path.dirname(os.path.realpath(__file__)) +sys.path.insert(0, os.path.join(CURDIR, "helpers")) + +CLICKHOUSE_URL = os.environ.get("CLICKHOUSE_URL") +CLICKHOUSE_TMP = os.environ.get("CLICKHOUSE_TMP") + +from pure_http_client import ClickHouseClient + +client = ClickHouseClient() + +NUM_RUNS = 20 +TIME_TO_WAIT_MS = 500 + +# The purpose of this test is to check that AsyncInsertQueue +# respects timeouts specified in the scope of query. +# Like if we execute NUM_RUNS subsequent inserts +# then we should spend at least (NUM_RUNS - 1) * TIME_TO_WAIT_MS +# Because each query corresponds to a timepoint when it been flushed +# And time period between first and last flush is exactly such +# as descibed above. +# Note that this doesn't include the time to process the query itself +# and this time maybe different depending on the build type (release or with sanitizer) + +gen_data_query = "SELECT number + {} AS id, toString(id) AS s, range(id) AS arr FROM numbers(10) FORMAT TSV" +insert_query = "INSERT INTO t_async_insert_user_settings FORMAT TSV" +settings = { + "async_insert": 1, + "wait_for_async_insert": 1, + "async_insert_busy_timeout_ms": TIME_TO_WAIT_MS, +} + +all_data = [] + +for i in range(NUM_RUNS): + all_data.append( + client.query(gen_data_query.format(i * 10), settings={}, binary_result=True) + ) + +client.query("DROP TABLE IF EXISTS t_async_insert_user_settings") +client.query( + "CREATE TABLE t_async_insert_user_settings (id UInt64, s String, arr Array(UInt64)) ENGINE = Memory" +) + +start_ms = time.time() * 1000.0 +for i in range(NUM_RUNS): + client.query_with_data(insert_query, all_data[i], settings=settings) +end_ms = time.time() * 1000.0 + +duration = end_ms - start_ms + +expected = (NUM_RUNS - 1) * TIME_TO_WAIT_MS +if duration >= expected: + print("Ok.") +else: + print(f"Fail. Duration: {duration}. Expected: {expected}") + +client.query("DROP TABLE IF EXISTS t_async_insert_user_settings") diff --git a/tests/queries/0_stateless/02451_async_insert_user_level_settings.reference b/tests/queries/0_stateless/02451_async_insert_user_level_settings.reference new file mode 100644 index 00000000000..587579af915 --- /dev/null +++ b/tests/queries/0_stateless/02451_async_insert_user_level_settings.reference @@ -0,0 +1 @@ +Ok. diff --git a/tests/queries/0_stateless/02451_async_insert_user_level_settings.sh b/tests/queries/0_stateless/02451_async_insert_user_level_settings.sh new file mode 100755 index 00000000000..3d627e273b9 --- /dev/null +++ b/tests/queries/0_stateless/02451_async_insert_user_level_settings.sh @@ -0,0 +1,9 @@ +#!/usr/bin/env bash +# Tags: no-fasttest + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +# We should have correct env vars from shell_config.sh to run this test +python3 "$CURDIR"/02451_async_insert_user_level_settings.python