mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-12 09:22:05 +00:00
689c368b76
In addition to the time since the most recent insert, consider the elapsed time between the two recent queue flushes when decreasing the timeout or processing an entry synchronously.
373 lines
10 KiB
Python
373 lines
10 KiB
Python
import copy
|
|
import logging
|
|
import pytest
|
|
import random
|
|
import timeit
|
|
|
|
from math import floor
|
|
from multiprocessing import Pool
|
|
from itertools import repeat
|
|
|
|
from helpers.cluster import ClickHouseCluster
|
|
|
|
cluster = ClickHouseCluster(__file__)
|
|
|
|
|
|
node = cluster.add_instance(
|
|
"node",
|
|
main_configs=["configs/zookeeper_config.xml"],
|
|
user_configs=[
|
|
"configs/users.xml",
|
|
],
|
|
with_zookeeper=True,
|
|
)
|
|
|
|
|
|
@pytest.fixture(scope="module", autouse=True)
|
|
def start_cluster():
|
|
try:
|
|
cluster.start()
|
|
yield cluster
|
|
finally:
|
|
cluster.shutdown()
|
|
|
|
|
|
_query_settings = {"async_insert": 1, "wait_for_async_insert": 1}
|
|
|
|
|
|
def _generate_values(size, min_int, max_int, array_size_range):
|
|
gen_tuple = lambda _min_int, _max_int, _array_size_range: (
|
|
random.randint(_min_int, _max_int),
|
|
[
|
|
random.randint(_min_int, _max_int)
|
|
for _ in range(random.randint(*_array_size_range))
|
|
],
|
|
)
|
|
|
|
return map(lambda _: gen_tuple(min_int, max_int, array_size_range), range(size))
|
|
|
|
|
|
def _insert_query(table_name, settings, *args, **kwargs):
|
|
settings_s = ", ".join("{}={}".format(k, settings[k]) for k in settings)
|
|
INSERT_QUERY = "INSERT INTO {} SETTINGS {} VALUES {}"
|
|
node.query(
|
|
INSERT_QUERY.format(
|
|
table_name,
|
|
settings_s,
|
|
", ".join(map(str, _generate_values(*args, **kwargs))),
|
|
)
|
|
)
|
|
|
|
|
|
def _insert_queries_sequentially(
|
|
table_name, settings, iterations, max_values_size, array_size_range
|
|
):
|
|
for iter in range(iterations):
|
|
_insert_query(
|
|
table_name,
|
|
settings,
|
|
random.randint(1, max_values_size),
|
|
iter * max_values_size,
|
|
(iter + 1) * max_values_size - 1,
|
|
array_size_range,
|
|
)
|
|
|
|
|
|
def _insert_queries_in_parallel(
|
|
table_name, settings, thread_num, tasks, max_values_size, array_size_range
|
|
):
|
|
sizes = [random.randint(1, max_values_size) for _ in range(tasks)]
|
|
min_ints = [iter * max_values_size for iter in range(tasks)]
|
|
max_ints = [(iter + 1) * max_values_size - 1 for iter in range(tasks)]
|
|
with Pool(thread_num) as p:
|
|
p.starmap(
|
|
_insert_query,
|
|
zip(
|
|
repeat(table_name),
|
|
repeat(settings),
|
|
sizes,
|
|
min_ints,
|
|
max_ints,
|
|
repeat(array_size_range),
|
|
),
|
|
)
|
|
|
|
|
|
def test_with_merge_tree():
|
|
table_name = "async_insert_mt_table"
|
|
node.query(
|
|
"CREATE TABLE {} (a UInt64, b Array(UInt64)) ENGINE=MergeTree() ORDER BY a".format(
|
|
table_name
|
|
)
|
|
)
|
|
|
|
_insert_queries_sequentially(
|
|
table_name,
|
|
_query_settings,
|
|
iterations=100,
|
|
max_values_size=1000,
|
|
array_size_range=[10, 50],
|
|
)
|
|
|
|
node.query("DROP TABLE IF EXISTS {}".format(table_name))
|
|
|
|
|
|
def test_with_merge_tree_multithread():
|
|
thread_num = 15
|
|
table_name = "async_insert_mt_multithread_table"
|
|
node.query(
|
|
"CREATE TABLE {} (a UInt64, b Array(UInt64)) ENGINE=MergeTree() ORDER BY a".format(
|
|
table_name
|
|
)
|
|
)
|
|
|
|
_insert_queries_in_parallel(
|
|
table_name,
|
|
_query_settings,
|
|
thread_num=15,
|
|
tasks=1000,
|
|
max_values_size=1000,
|
|
array_size_range=[10, 15],
|
|
)
|
|
|
|
node.query("DROP TABLE IF EXISTS {}".format(table_name))
|
|
|
|
|
|
def test_with_replicated_merge_tree():
|
|
table_name = "async_insert_replicated_mt_table"
|
|
|
|
create_query = " ".join(
|
|
(
|
|
"CREATE TABLE {} (a UInt64, b Array(UInt64))".format(table_name),
|
|
"ENGINE=ReplicatedMergeTree('/clickhouse/tables/test/{}', 'node')".format(
|
|
table_name
|
|
),
|
|
"ORDER BY a",
|
|
)
|
|
)
|
|
|
|
node.query(create_query)
|
|
|
|
settings = _query_settings
|
|
_insert_queries_sequentially(
|
|
table_name,
|
|
settings,
|
|
iterations=100,
|
|
max_values_size=1000,
|
|
array_size_range=[10, 50],
|
|
)
|
|
|
|
node.query("DROP TABLE IF EXISTS {}".format(table_name))
|
|
|
|
|
|
def test_with_replicated_merge_tree_multithread():
|
|
thread_num = 15
|
|
table_name = "async_insert_replicated_mt_multithread_table"
|
|
|
|
create_query = " ".join(
|
|
(
|
|
"CREATE TABLE {} (a UInt64, b Array(UInt64))".format(table_name),
|
|
"ENGINE=ReplicatedMergeTree('/clickhouse/tables/test/{}', 'node')".format(
|
|
table_name
|
|
),
|
|
"ORDER BY a",
|
|
)
|
|
)
|
|
|
|
node.query(create_query)
|
|
|
|
_insert_queries_in_parallel(
|
|
table_name,
|
|
_query_settings,
|
|
thread_num=15,
|
|
tasks=1000,
|
|
max_values_size=1000,
|
|
array_size_range=[10, 15],
|
|
)
|
|
|
|
node.query("DROP TABLE IF EXISTS {}".format(table_name))
|
|
|
|
|
|
# Ensure that the combined duration of inserts with adaptive timeouts is less than
|
|
# the combined duration for fixed timeouts.
|
|
def test_compare_sequential_inserts_durations_for_adaptive_and_fixed_async_timeouts():
|
|
fixed_tm_table_name = "async_insert_mt_fixed_async_timeout"
|
|
node.query(
|
|
"CREATE TABLE {} (a UInt64, b Array(UInt64)) ENGINE=MergeTree() ORDER BY a".format(
|
|
fixed_tm_table_name
|
|
)
|
|
)
|
|
|
|
fixed_tm_settings = copy.copy(_query_settings)
|
|
fixed_tm_settings["async_insert_use_adaptive_busy_timeout"] = 0
|
|
fixed_tm_settings["async_insert_busy_timeout_ms"] = 200
|
|
|
|
fixed_tm_run_duration = timeit.timeit(
|
|
lambda: _insert_queries_sequentially(
|
|
fixed_tm_table_name,
|
|
fixed_tm_settings,
|
|
iterations=100,
|
|
max_values_size=1000,
|
|
array_size_range=[10, 50],
|
|
),
|
|
setup="pass",
|
|
number=3,
|
|
)
|
|
|
|
node.query("DROP TABLE IF EXISTS {}".format(fixed_tm_table_name))
|
|
|
|
logging.debug(
|
|
"Run duration with fixed asynchronous timeout is {} seconds".format(
|
|
fixed_tm_run_duration
|
|
)
|
|
)
|
|
|
|
adaptive_tm_table_name = "async_insert_mt_adaptive_async_timeout"
|
|
node.query(
|
|
"CREATE TABLE {} (a UInt64, b Array(UInt64)) ENGINE=MergeTree() ORDER BY a".format(
|
|
adaptive_tm_table_name
|
|
)
|
|
)
|
|
|
|
adaptive_tm_settings = copy.copy(_query_settings)
|
|
adaptive_tm_settings["async_insert_busy_timeout_min_ms"] = 10
|
|
adaptive_tm_settings["async_insert_busy_timeout_max_ms"] = 1000
|
|
|
|
adaptive_tm_run_duration = timeit.timeit(
|
|
lambda: _insert_queries_sequentially(
|
|
adaptive_tm_table_name,
|
|
adaptive_tm_settings,
|
|
iterations=100,
|
|
max_values_size=1000,
|
|
array_size_range=[10, 50],
|
|
),
|
|
setup="pass",
|
|
number=3,
|
|
)
|
|
|
|
logging.debug(
|
|
"Run duration with adaptive asynchronous timeout is {} seconds.".format(
|
|
adaptive_tm_run_duration
|
|
)
|
|
)
|
|
|
|
node.query("DROP TABLE IF EXISTS {}".format(adaptive_tm_table_name))
|
|
|
|
assert adaptive_tm_run_duration <= fixed_tm_run_duration
|
|
|
|
|
|
# Ensure that the combined duration of inserts with adaptive timeouts is less than
|
|
# the combined duration for fixed timeouts.
|
|
def test_compare_parallel_inserts_durations_for_adaptive_and_fixed_async_timeouts():
|
|
fixed_tm_table_name = "async_insert_mt_fixed_async_timeout"
|
|
node.query(
|
|
"CREATE TABLE {} (a UInt64, b Array(UInt64)) ENGINE=MergeTree() ORDER BY a".format(
|
|
fixed_tm_table_name
|
|
)
|
|
)
|
|
|
|
fixed_tm_settings = copy.copy(_query_settings)
|
|
fixed_tm_settings["async_insert_use_adaptive_busy_timeout"] = 0
|
|
fixed_tm_settings["async_insert_busy_timeout_ms"] = 200
|
|
|
|
fixed_tm_run_duration = timeit.timeit(
|
|
lambda: _insert_queries_in_parallel(
|
|
fixed_tm_table_name,
|
|
fixed_tm_settings,
|
|
thread_num=15,
|
|
tasks=1000,
|
|
max_values_size=1000,
|
|
array_size_range=[10, 50],
|
|
),
|
|
setup="pass",
|
|
number=3,
|
|
)
|
|
|
|
node.query("DROP TABLE IF EXISTS {}".format(fixed_tm_table_name))
|
|
|
|
logging.debug(
|
|
"Run duration with fixed asynchronous timeout is {} seconds".format(
|
|
fixed_tm_run_duration
|
|
)
|
|
)
|
|
|
|
adaptive_tm_table_name = "async_insert_mt_adaptive_async_timeout"
|
|
node.query(
|
|
"CREATE TABLE {} (a UInt64, b Array(UInt64)) ENGINE=MergeTree() ORDER BY a".format(
|
|
adaptive_tm_table_name
|
|
)
|
|
)
|
|
|
|
adaptive_tm_settings = copy.copy(_query_settings)
|
|
adaptive_tm_settings["async_insert_busy_timeout_min_ms"] = 10
|
|
adaptive_tm_settings["async_insert_busy_timeout_max_ms"] = 200
|
|
|
|
adaptive_tm_run_duration = timeit.timeit(
|
|
lambda: _insert_queries_in_parallel(
|
|
adaptive_tm_table_name,
|
|
adaptive_tm_settings,
|
|
thread_num=15,
|
|
tasks=100,
|
|
max_values_size=1000,
|
|
array_size_range=[10, 50],
|
|
),
|
|
setup="pass",
|
|
number=3,
|
|
)
|
|
|
|
logging.debug(
|
|
"Run duration with adaptive asynchronous timeout is {} seconds.".format(
|
|
adaptive_tm_run_duration
|
|
)
|
|
)
|
|
|
|
node.query("DROP TABLE IF EXISTS {}".format(adaptive_tm_table_name))
|
|
|
|
assert adaptive_tm_run_duration <= fixed_tm_run_duration
|
|
|
|
|
|
# Ensure that the delay converges to a minimum for sequential inserts and wait_for_async_insert=1.
|
|
def test_change_queries_frequency():
|
|
table_name = "async_insert_mt_change_queries_frequencies"
|
|
|
|
create_query = " ".join(
|
|
(
|
|
"CREATE TABLE {} (a UInt64, b Array(UInt64))".format(table_name),
|
|
"ENGINE=ReplicatedMergeTree('/clickhouse/tables/test_frequencies/{}', 'node')".format(
|
|
table_name
|
|
),
|
|
"ORDER BY a",
|
|
)
|
|
)
|
|
|
|
node.query(create_query)
|
|
|
|
settings = copy.copy(_query_settings)
|
|
min_ms = 50
|
|
settings["async_insert_busy_timeout_min_ms"] = min_ms
|
|
settings["async_insert_busy_timeout_max_ms"] = 2000
|
|
|
|
_insert_queries_in_parallel(
|
|
table_name,
|
|
settings,
|
|
thread_num=15,
|
|
tasks=2000,
|
|
max_values_size=1000,
|
|
array_size_range=[10, 15],
|
|
)
|
|
|
|
_insert_queries_sequentially(
|
|
table_name,
|
|
settings,
|
|
iterations=200,
|
|
max_values_size=1000,
|
|
array_size_range=[10, 50],
|
|
)
|
|
|
|
select_log_query = "SELECT timeout_milliseconds FROM system.asynchronous_insert_log ORDER BY event_time DESC LIMIT 50"
|
|
res = node.query(select_log_query)
|
|
for line in res.splitlines():
|
|
assert int(line) == min_ms
|
|
|
|
node.query("DROP TABLE IF EXISTS {}".format(table_name))
|