ClickHouse/tests/integration/test_insert_into_distributed_sync_async/test.py
Azat Khuzhin c25d6cd624
Rename directory monitor concept into background INSERT (#55978)
* Limit log frequency for the "Skipping send data over distributed table" message

After SYSTEM STOP DISTRIBUTED SENDS the server would otherwise print this
message constantly.
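
A minimal sketch of the repro, in the node.query() style of the test file
below (`node` and the Distributed table `dist` are hypothetical):

    node.query("SYSTEM STOP DISTRIBUTED SENDS dist")
    node.query("INSERT INTO dist VALUES (1)")  # data is queued, not sent
    # the "Skipping send data over distributed table" log message now repeats
    # until SYSTEM START DISTRIBUTED SENDS dist is issued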

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>

* Rename directory monitor concept into async INSERT

Rename the following query settings (preserving backward
compatibility by keeping the old name as an alias):
- distributed_directory_monitor_sleep_time_ms -> distributed_async_insert_sleep_time_ms
- distributed_directory_monitor_max_sleep_time_ms -> distributed_async_insert_max_sleep_time_ms
- distributed_directory_monitor_batch_inserts -> distributed_async_insert_batch
- distributed_directory_monitor_split_batch_on_failure -> distributed_async_insert_split_batch_on_failure
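
The old query-setting names keep working; a minimal sketch (hypothetical
`node` and table `dist`, in the style of the tests below):

    # Both calls configure the same setting; the old name is now an alias.
    node.query("INSERT INTO dist VALUES (1)",
               settings={"distributed_directory_monitor_sleep_time_ms": 100})
    node.query("INSERT INTO dist VALUES (1)",
               settings={"distributed_async_insert_sleep_time_ms": 100})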

Rename the following table settings (preserving backward
compatibility by keeping the old name as an alias):
- monitor_batch_inserts -> async_insert_batch
- monitor_split_batch_on_failure -> async_insert_split_batch_on_failure
- directory_monitor_sleep_time_ms -> async_insert_sleep_time_ms
- directory_monitor_max_sleep_time_ms -> async_insert_max_sleep_time_ms
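
Likewise at the table level; a sketch (hypothetical `node`, `dist`, and `local`):

    node.query(
        """CREATE TABLE dist (x UInt64)
        ENGINE = Distributed(test_cluster, default, local)
        SETTINGS async_insert_batch = 1"""  # monitor_batch_inserts is now an alias
    )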

And also update all the references:

    $ gg -e directory_monitor_ -e monitor_ tests docs | cut -d: -f1 | sort -u | xargs sed -e 's/distributed_directory_monitor_sleep_time_ms/distributed_async_insert_sleep_time_ms/g' -e 's/distributed_directory_monitor_max_sleep_time_ms/distributed_async_insert_max_sleep_time_ms/g' -e 's/distributed_directory_monitor_batch_inserts/distributed_async_insert_batch/g' -e 's/distributed_directory_monitor_split_batch_on_failure/distributed_async_insert_split_batch_on_failure/g' -e 's/monitor_batch_inserts/async_insert_batch/g' -e 's/monitor_split_batch_on_failure/async_insert_split_batch_on_failure/g' -e 's/monitor_sleep_time_ms/async_insert_sleep_time_ms/g' -e 's/monitor_max_sleep_time_ms/async_insert_max_sleep_time_ms/g' -i

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>

* Rename async_insert for Distributed into background_insert

This avoids ambiguity between general async INSERTs and INSERTs into
Distributed tables, which are indeed processed in the background, so the
new term expresses it better.

Mostly done with:

    $ git di HEAD^ --name-only | xargs sed -i -e 's/distributed_async_insert/distributed_background_insert/g' -e 's/async_insert_batch/background_insert_batch/g' -e 's/async_insert_split_batch_on_failure/background_insert_split_batch_on_failure/g' -e 's/async_insert_sleep_time_ms/background_insert_sleep_time_ms/g' -e 's/async_insert_max_sleep_time_ms/background_insert_max_sleep_time_ms/g'

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>

* Mark 02417_opentelemetry_insert_on_distributed_table as long

CI: https://s3.amazonaws.com/clickhouse-test-reports/55978/7a6abb03a0b507e29e999cb7e04f246a119c6f28/stateless_tests_flaky_check__asan_.html

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>

---------

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2023-11-01 15:09:39 +01:00


import os
import sys
from contextlib import contextmanager

import pytest

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from helpers.test_tools import TSV
from helpers.cluster import ClickHouseCluster
from helpers.client import QueryRuntimeException, QueryTimeoutExceedException

cluster = ClickHouseCluster(__file__)

node1 = cluster.add_instance("node1", main_configs=["configs/remote_servers.xml"])
node2 = cluster.add_instance("node2", main_configs=["configs/remote_servers.xml"])


@pytest.fixture(scope="module")
def started_cluster():
    try:
        cluster.start()

        for node in (node1, node2):
            node.query(
                """
                CREATE TABLE local_table(date Date, val UInt64) ENGINE = MergeTree() PARTITION BY toYYYYMM(date) ORDER BY (date, val);
                """
            )

        node1.query(
            """
            CREATE TABLE distributed_table(date Date, val UInt64) ENGINE = Distributed(test_cluster, default, local_table)
            """
        )

        yield cluster

    finally:
        cluster.shutdown()
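

# The tests below drive two settings:
# - distributed_foreground_insert: 1 makes an INSERT into the Distributed table
#   deliver data to the remote shard before returning; 0 queues the data for
#   background sending.
# - distributed_background_insert_timeout: upper bound (seconds) on how long a
#   foreground INSERT may wait; 0 disables the server-side timeout, and the
#   setting has no effect on background inserts.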
def test_insertion_sync(started_cluster):
    node1.query(
        """SET distributed_foreground_insert = 1, distributed_background_insert_timeout = 0;
        INSERT INTO distributed_table SELECT today() as date, number as val FROM system.numbers LIMIT 10000"""
    )

    assert node2.query("SELECT count() FROM local_table").rstrip() == "10000"

    node1.query(
        """
        SET distributed_foreground_insert = 1, distributed_background_insert_timeout = 1;
        INSERT INTO distributed_table SELECT today() - 1 as date, number as val FROM system.numbers LIMIT 10000"""
    )

    assert node2.query("SELECT count() FROM local_table").rstrip() == "20000"

    # Insert with explicitly specified columns.
    node1.query(
        """
        SET distributed_foreground_insert = 1, distributed_background_insert_timeout = 1;
        INSERT INTO distributed_table(date, val) VALUES ('2000-01-01', 100500)"""
    )

    # Insert with columns specified in different order.
    node1.query(
        """
        SET distributed_foreground_insert = 1, distributed_background_insert_timeout = 1;
        INSERT INTO distributed_table(val, date) VALUES (100500, '2000-01-01')"""
    )

    # Insert with an incomplete list of columns.
    node1.query(
        """
        SET distributed_foreground_insert = 1, distributed_background_insert_timeout = 1;
        INSERT INTO distributed_table(val) VALUES (100500)"""
    )
    expected = TSV(
        """
1970-01-01	100500
2000-01-01	100500
2000-01-01	100500"""
    )
    assert (
        TSV(
            node2.query(
                "SELECT date, val FROM local_table WHERE val = 100500 ORDER BY date"
            )
        )
        == expected
    )

    node1.query("TRUNCATE TABLE local_table SYNC")
    node2.query("TRUNCATE TABLE local_table SYNC")
"""
def test_insertion_sync_fails_on_error(started_cluster):
with PartitionManager() as pm:
pm.partition_instances(node2, node1, action='REJECT --reject-with tcp-reset')
with pytest.raises(QueryRuntimeException):
node1.query('''
SET distributed_foreground_insert = 1, distributed_background_insert_timeout = 0;
INSERT INTO distributed_table SELECT today() as date, number as val FROM system.numbers''', timeout=2)
"""


def test_insertion_sync_fails_with_timeout(started_cluster):
    with pytest.raises(QueryRuntimeException):
        node1.query(
            """
            SET distributed_foreground_insert = 1, distributed_background_insert_timeout = 1;
            INSERT INTO distributed_table SELECT today() as date, number as val FROM system.numbers"""
        )


def test_insertion_without_sync_ignores_timeout(started_cluster):
    with pytest.raises(QueryTimeoutExceedException):
        node1.query(
            """
            SET distributed_foreground_insert = 0, distributed_background_insert_timeout = 1;
            INSERT INTO distributed_table SELECT today() as date, number as val FROM system.numbers""",
            timeout=1.5,
        )


def test_insertion_sync_with_disabled_timeout(started_cluster):
    with pytest.raises(QueryTimeoutExceedException):
        node1.query(
            """
            SET distributed_foreground_insert = 1, distributed_background_insert_timeout = 0;
            INSERT INTO distributed_table SELECT today() as date, number as val FROM system.numbers""",
            timeout=1,
        )
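

# Background insert (distributed_foreground_insert = 0) into a shard that lives
# on this same node; the DETACH/ATTACH cycle checks that the row is still
# visible afterwards, i.e. it was not lost from the Distributed table's queue.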
def test_async_inserts_into_local_shard(started_cluster):
    node1.query("""CREATE TABLE shard_local (i Int64) ENGINE = Memory""")
    node1.query(
        """CREATE TABLE shard_distributed (i Int64) ENGINE = Distributed(local_shard_with_internal_replication, default, shard_local)"""
    )
    node1.query(
        """INSERT INTO shard_distributed VALUES (1)""",
        settings={"distributed_foreground_insert": 0},
    )

    assert TSV(node1.query("""SELECT count() FROM shard_distributed""")) == TSV("1\n")
    node1.query("""DETACH TABLE shard_distributed""")
    node1.query("""ATTACH TABLE shard_distributed""")
    assert TSV(node1.query("""SELECT count() FROM shard_distributed""")) == TSV("1\n")

    node1.query("""DROP TABLE shard_distributed""")
    node1.query("""DROP TABLE shard_local""")


if __name__ == "__main__":
    with contextmanager(started_cluster)() as cluster:
        for name, instance in list(cluster.instances.items()):
            print(name, instance.ip_address)
        input("Cluster created, press any key to destroy...")