# ClickHouse/tests/integration/test_async_insert_adaptive_busy_timeout/test.py
#
# Commit 689c368b76 (Julia Kartseva, 2024-02-01): "Adaptive async timeouts:
# consider queue flush history". In addition to the time since the most recent
# insert, the elapsed time between the two most recent queue flushes is now
# considered when decreasing the timeout or processing an entry synchronously
# (see the illustrative sketch in _illustrative_adaptive_timeout below).

import copy
import logging
import random
import timeit
from itertools import repeat
from multiprocessing import Pool

import pytest

from helpers.cluster import ClickHouseCluster

cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node",
main_configs=["configs/zookeeper_config.xml"],
user_configs=[
"configs/users.xml",
],
with_zookeeper=True,
)


@pytest.fixture(scope="module", autouse=True)
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
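

# Defaults shared by every test: async_insert=1 routes INSERTs through the
# asynchronous insert queue, and wait_for_async_insert=1 blocks each query
# until its buffered data has been flushed, so insert latency tracks the
# busy timeout directly.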
_query_settings = {"async_insert": 1, "wait_for_async_insert": 1}
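

# A minimal sketch of the adaptive policy exercised by these tests (NOT the
# server implementation; the names and the halving/doubling factors here are
# hypothetical). Per the commit message above, the timeout is decreased only
# when both the time since the most recent insert and the spacing of the two
# most recent queue flushes indicate sustained pressure; otherwise it backs
# off toward the maximum.
def _illustrative_adaptive_timeout(
    current_ms,
    ms_since_last_insert,
    ms_between_last_two_flushes,
    min_ms=50,
    max_ms=2000,
):
    if ms_since_last_insert < current_ms and ms_between_last_two_flushes < current_ms:
        # Inserts and flushes are both arriving quickly: shrink the timeout.
        return max(min_ms, current_ms // 2)
    # Traffic has slowed: relax the timeout toward the configured maximum.
    return min(max_ms, current_ms * 2)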


def _generate_values(size, min_int, max_int, array_size_range):
    # Produce `size` rows of the form (UInt64, Array(UInt64)): each integer is
    # drawn uniformly from [min_int, max_int], and each array length from
    # array_size_range.
    gen_tuple = lambda _min_int, _max_int, _array_size_range: (
        random.randint(_min_int, _max_int),
        [
            random.randint(_min_int, _max_int)
            for _ in range(random.randint(*_array_size_range))
        ],
    )
    return map(lambda _: gen_tuple(min_int, max_int, array_size_range), range(size))


def _insert_query(table_name, settings, *args, **kwargs):
settings_s = ", ".join("{}={}".format(k, settings[k]) for k in settings)
INSERT_QUERY = "INSERT INTO {} SETTINGS {} VALUES {}"
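    # Renders to, for example (values are illustrative):
    #   INSERT INTO t SETTINGS async_insert=1, wait_for_async_insert=1
    #   VALUES (4, [7, 2]), (0, [9])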
node.query(
INSERT_QUERY.format(
table_name,
settings_s,
", ".join(map(str, _generate_values(*args, **kwargs))),
)
)


def _insert_queries_sequentially(
table_name, settings, iterations, max_values_size, array_size_range
):
    for i in range(iterations):
        _insert_query(
            table_name,
            settings,
            random.randint(1, max_values_size),
            i * max_values_size,
            (i + 1) * max_values_size - 1,
            array_size_range,
        )


def _insert_queries_in_parallel(
table_name, settings, thread_num, tasks, max_values_size, array_size_range
):
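    # Each task inserts a random number of rows drawn from its own disjoint
    # integer range, so batch sizes and contents vary across the pool.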
    sizes = [random.randint(1, max_values_size) for _ in range(tasks)]
    min_ints = [i * max_values_size for i in range(tasks)]
    max_ints = [(i + 1) * max_values_size - 1 for i in range(tasks)]
with Pool(thread_num) as p:
p.starmap(
_insert_query,
zip(
repeat(table_name),
repeat(settings),
sizes,
min_ints,
max_ints,
repeat(array_size_range),
),
)


def test_with_merge_tree():
table_name = "async_insert_mt_table"
node.query(
"CREATE TABLE {} (a UInt64, b Array(UInt64)) ENGINE=MergeTree() ORDER BY a".format(
table_name
)
)
_insert_queries_sequentially(
table_name,
_query_settings,
iterations=100,
max_values_size=1000,
array_size_range=[10, 50],
)
node.query("DROP TABLE IF EXISTS {}".format(table_name))


def test_with_merge_tree_multithread():
thread_num = 15
table_name = "async_insert_mt_multithread_table"
node.query(
"CREATE TABLE {} (a UInt64, b Array(UInt64)) ENGINE=MergeTree() ORDER BY a".format(
table_name
)
)
_insert_queries_in_parallel(
table_name,
_query_settings,
        thread_num=thread_num,
tasks=1000,
max_values_size=1000,
array_size_range=[10, 15],
)
node.query("DROP TABLE IF EXISTS {}".format(table_name))


def test_with_replicated_merge_tree():
table_name = "async_insert_replicated_mt_table"
create_query = " ".join(
(
"CREATE TABLE {} (a UInt64, b Array(UInt64))".format(table_name),
"ENGINE=ReplicatedMergeTree('/clickhouse/tables/test/{}', 'node')".format(
table_name
),
"ORDER BY a",
)
)
node.query(create_query)
    _insert_queries_sequentially(
        table_name,
        _query_settings,
iterations=100,
max_values_size=1000,
array_size_range=[10, 50],
)
node.query("DROP TABLE IF EXISTS {}".format(table_name))


def test_with_replicated_merge_tree_multithread():
thread_num = 15
table_name = "async_insert_replicated_mt_multithread_table"
create_query = " ".join(
(
"CREATE TABLE {} (a UInt64, b Array(UInt64))".format(table_name),
"ENGINE=ReplicatedMergeTree('/clickhouse/tables/test/{}', 'node')".format(
table_name
),
"ORDER BY a",
)
)
node.query(create_query)
_insert_queries_in_parallel(
table_name,
_query_settings,
        thread_num=thread_num,
tasks=1000,
max_values_size=1000,
array_size_range=[10, 15],
)
node.query("DROP TABLE IF EXISTS {}".format(table_name))


# Ensure that the combined duration of sequential inserts with adaptive
# timeouts does not exceed the combined duration with a fixed timeout.
def test_compare_sequential_inserts_durations_for_adaptive_and_fixed_async_timeouts():
fixed_tm_table_name = "async_insert_mt_fixed_async_timeout"
node.query(
"CREATE TABLE {} (a UInt64, b Array(UInt64)) ENGINE=MergeTree() ORDER BY a".format(
fixed_tm_table_name
)
)
fixed_tm_settings = copy.copy(_query_settings)
fixed_tm_settings["async_insert_use_adaptive_busy_timeout"] = 0
fixed_tm_settings["async_insert_busy_timeout_ms"] = 200
fixed_tm_run_duration = timeit.timeit(
lambda: _insert_queries_sequentially(
fixed_tm_table_name,
fixed_tm_settings,
iterations=100,
max_values_size=1000,
array_size_range=[10, 50],
),
setup="pass",
number=3,
)
node.query("DROP TABLE IF EXISTS {}".format(fixed_tm_table_name))
logging.debug(
"Run duration with fixed asynchronous timeout is {} seconds".format(
fixed_tm_run_duration
)
)
adaptive_tm_table_name = "async_insert_mt_adaptive_async_timeout"
node.query(
"CREATE TABLE {} (a UInt64, b Array(UInt64)) ENGINE=MergeTree() ORDER BY a".format(
adaptive_tm_table_name
)
)
adaptive_tm_settings = copy.copy(_query_settings)
adaptive_tm_settings["async_insert_busy_timeout_min_ms"] = 10
adaptive_tm_settings["async_insert_busy_timeout_max_ms"] = 1000
adaptive_tm_run_duration = timeit.timeit(
lambda: _insert_queries_sequentially(
adaptive_tm_table_name,
adaptive_tm_settings,
iterations=100,
max_values_size=1000,
array_size_range=[10, 50],
),
setup="pass",
number=3,
)
logging.debug(
"Run duration with adaptive asynchronous timeout is {} seconds.".format(
adaptive_tm_run_duration
)
)
node.query("DROP TABLE IF EXISTS {}".format(adaptive_tm_table_name))
assert adaptive_tm_run_duration <= fixed_tm_run_duration


# Ensure that the combined duration of parallel inserts with adaptive
# timeouts does not exceed the combined duration with a fixed timeout.
def test_compare_parallel_inserts_durations_for_adaptive_and_fixed_async_timeouts():
fixed_tm_table_name = "async_insert_mt_fixed_async_timeout"
node.query(
"CREATE TABLE {} (a UInt64, b Array(UInt64)) ENGINE=MergeTree() ORDER BY a".format(
fixed_tm_table_name
)
)
fixed_tm_settings = copy.copy(_query_settings)
fixed_tm_settings["async_insert_use_adaptive_busy_timeout"] = 0
fixed_tm_settings["async_insert_busy_timeout_ms"] = 200
fixed_tm_run_duration = timeit.timeit(
lambda: _insert_queries_in_parallel(
fixed_tm_table_name,
fixed_tm_settings,
thread_num=15,
tasks=1000,
max_values_size=1000,
array_size_range=[10, 50],
),
setup="pass",
number=3,
)
node.query("DROP TABLE IF EXISTS {}".format(fixed_tm_table_name))
logging.debug(
"Run duration with fixed asynchronous timeout is {} seconds".format(
fixed_tm_run_duration
)
)
adaptive_tm_table_name = "async_insert_mt_adaptive_async_timeout"
node.query(
"CREATE TABLE {} (a UInt64, b Array(UInt64)) ENGINE=MergeTree() ORDER BY a".format(
adaptive_tm_table_name
)
)
adaptive_tm_settings = copy.copy(_query_settings)
adaptive_tm_settings["async_insert_busy_timeout_min_ms"] = 10
adaptive_tm_settings["async_insert_busy_timeout_max_ms"] = 200
adaptive_tm_run_duration = timeit.timeit(
lambda: _insert_queries_in_parallel(
adaptive_tm_table_name,
adaptive_tm_settings,
thread_num=15,
            tasks=1000,
max_values_size=1000,
array_size_range=[10, 50],
),
setup="pass",
number=3,
)
logging.debug(
"Run duration with adaptive asynchronous timeout is {} seconds.".format(
adaptive_tm_run_duration
)
)
node.query("DROP TABLE IF EXISTS {}".format(adaptive_tm_table_name))
assert adaptive_tm_run_duration <= fixed_tm_run_duration


# Ensure that the timeout converges to the minimum after the insert frequency
# drops: a burst of parallel inserts is followed by slower sequential inserts
# with wait_for_async_insert=1.
def test_change_queries_frequency():
table_name = "async_insert_mt_change_queries_frequencies"
create_query = " ".join(
(
"CREATE TABLE {} (a UInt64, b Array(UInt64))".format(table_name),
"ENGINE=ReplicatedMergeTree('/clickhouse/tables/test_frequencies/{}', 'node')".format(
table_name
),
"ORDER BY a",
)
)
node.query(create_query)
settings = copy.copy(_query_settings)
min_ms = 50
settings["async_insert_busy_timeout_min_ms"] = min_ms
settings["async_insert_busy_timeout_max_ms"] = 2000
_insert_queries_in_parallel(
table_name,
settings,
thread_num=15,
tasks=2000,
max_values_size=1000,
array_size_range=[10, 15],
)
_insert_queries_sequentially(
table_name,
settings,
iterations=200,
max_values_size=1000,
array_size_range=[10, 50],
)
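    # By the tail of the sequential phase the adaptive timeout should have
    # decayed, so the 50 most recent log entries must all report the minimum.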
select_log_query = "SELECT timeout_milliseconds FROM system.asynchronous_insert_log ORDER BY event_time DESC LIMIT 50"
res = node.query(select_log_query)
for line in res.splitlines():
assert int(line) == min_ms
node.query("DROP TABLE IF EXISTS {}".format(table_name))