Improve tests for Distributed INSERT

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
This commit is contained in:
AVMusorin 2023-02-10 13:55:04 +01:00 committed by Azat Khuzhin
parent d06a4b50d6
commit 99329d8682
7 changed files with 135 additions and 48 deletions

View File

@ -3,8 +3,6 @@
<default>
<!-- always send via network -->
<prefer_localhost_replica>0</prefer_localhost_replica>
<!-- enable batching to check splitting -->
<distributed_directory_monitor_batch_inserts>1</distributed_directory_monitor_batch_inserts>
<!-- override defaults just in case they will be changed -->
<distributed_directory_monitor_split_batch_on_failure>1</distributed_directory_monitor_split_batch_on_failure>
<!-- wait for explicit flush -->

View File

@ -3,8 +3,6 @@
<default>
<!-- always send via network -->
<prefer_localhost_replica>0</prefer_localhost_replica>
<!-- enable batching to check splitting -->
<distributed_directory_monitor_batch_inserts>1</distributed_directory_monitor_batch_inserts>
<!-- disable -->
<distributed_directory_monitor_split_batch_on_failure>0</distributed_directory_monitor_split_batch_on_failure>
<!-- wait for explicit flush -->

View File

@ -18,61 +18,86 @@ node2 = cluster.add_instance(
)
def get_test_settings():
    """Return every (setting name, value) pair the tests should run with.

    Each setting name is paired with each of its candidate values, in
    declaration order.
    """
    candidates = {"monitor_batch_inserts": [0, 1]}
    pairs = []
    for name, options in candidates.items():
        for option in options:
            pairs.append((name, option))
    return pairs
def drop_tables():
    """Drop every table the tests create, on each instance of the cluster.

    All ``drop table if exists`` statements are sent to a node as a single
    newline-separated query.
    """
    table_names = ("null_", "dist", "data", "mv", "dist_data")
    statements = "\n".join(
        f"drop table if exists {name};" for name in table_names
    )
    for node in cluster.instances.values():
        node.query(statements)
def create_tables(**dist_settings):
    """Recreate the test tables on every instance of the cluster.

    Existing tables are dropped first. Keyword arguments are rendered as a
    ``settings k=v,...`` clause appended to the definition of ``dist``; when
    no keyword arguments are given the clause is omitted entirely.
    """
    drop_tables()
    rendered_pairs = ",".join(
        f"{name}={value}" for name, value in dist_settings.items()
    )
    if rendered_pairs:
        _settings = f"settings {rendered_pairs}"
    else:
        _settings = ""
    for node in cluster.instances.values():
        node.query(
            f"""
create table null_ (key Int, value Int) engine=Null();
create table dist as null_ engine=Distributed(test_cluster, currentDatabase(), null_, key) {_settings};
create table data (key Int, uniq_values Int) engine=Memory();
create materialized view mv to data as select key, uniqExact(value) uniq_values from null_ group by key;
system stop distributed sends dist;
create table dist_data as data engine=Distributed(test_cluster, currentDatabase(), data);
"""
        )
# Module-scoped pytest fixture: starts the cluster, yields it to the tests and,
# on teardown, always drops the test tables and shuts the cluster down.
# NOTE(review): this span reads like a rendered diff without +/- markers; the
# table-creation loop below issues the same statements as create_tables()
# (minus the settings clause) and may be the removed side of the change —
# confirm against the actual file.
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
for _, node in cluster.instances.items():
node.query(
"""
create table null_ (key Int, value Int) engine=Null();
create table dist as null_ engine=Distributed(test_cluster, currentDatabase(), null_, key);
create table data (key Int, uniq_values Int) engine=Memory();
create materialized view mv to data as select key, uniqExact(value) uniq_values from null_ group by key;
system stop distributed sends dist;
create table dist_data as data engine=Distributed(test_cluster, currentDatabase(), data);
"""
)
yield cluster
finally:
# Teardown runs even if startup or a test failed.
drop_tables()
cluster.shutdown()
# Runs 100 inserts into `dist` through node2 under a 30Mi memory limit, then
# flushes the distributed queue explicitly and checks the row count visible
# through `dist_data`.
# NOTE(review): two versions of this test body appear interleaved here
# (rendered diff without +/- markers): an un-parameterised insert loop followed
# by a loop over get_test_settings(), and two conflicting final asserts
# (== 0 and == 100000). Confirm which lines belong to the current file.
def test_distributed_directory_monitor_split_batch_on_failure_OFF(started_cluster):
for i in range(0, 100):
limit = 100e3
node2.query(
f"insert into dist select number/100, number from system.numbers limit {limit} offset {limit*i}",
settings={
# max_memory_usage is the limit for the batch on the remote node
# (local query should not be affected since 30MB is enough for 100K rows)
"max_memory_usage": "30Mi",
"max_untracked_memory": "0",
},
)
# "Received from" is mandatory, since the exception should be thrown on the remote node.
with pytest.raises(
QueryRuntimeException,
match=r"DB::Exception: Received from.*Memory limit \(for query\) exceeded: .*while pushing to view default\.mv",
):
# Each iteration recreates the tables with one (setting, value) pair applied to `dist`.
for setting, setting_value in get_test_settings():
create_tables(**{setting: setting_value})
for i in range(0, 100):
limit = 100e3
node2.query(
f"insert into dist select number/100, number from system.numbers limit {limit} offset {limit*i}",
settings={
# max_memory_usage is the limit for the batch on the remote node
# (local query should not be affected since 30MB is enough for 100K rows)
"max_memory_usage": "30Mi",
"max_untracked_memory": "0",
},
)
# "Received from" is mandatory, since the exception should be thrown on the remote node.
# With monitor_batch_inserts=1 the flush is expected to raise the memory-limit
# error and no rows are expected in dist_data afterwards.
if setting == "monitor_batch_inserts" and setting_value == 1:
with pytest.raises(
QueryRuntimeException,
match=r"DB::Exception: Received from.*Memory limit \(for query\) exceeded: .*while pushing to view default\.mv",
):
node2.query("system flush distributed dist")
assert int(node2.query("select count() from dist_data")) == 0
continue
# For the remaining setting values the flush is issued without expecting an exception.
node2.query("system flush distributed dist")
# NOTE(review): the next two asserts contradict each other; presumably one is
# the pre-change line and the other the post-change line — verify.
assert int(node2.query("select count() from dist_data")) == 0
assert int(node2.query("select count() from dist_data")) == 100000
# Runs 100 inserts into `dist` through node1 under a 30Mi memory limit, flushes
# the distributed queue explicitly and expects 100000 rows to be visible
# through `dist_data`.
# NOTE(review): two versions of this test body appear interleaved here
# (rendered diff without +/- markers): an un-parameterised variant followed by
# one that loops over get_test_settings(). Confirm which lines belong to the
# current file.
def test_distributed_directory_monitor_split_batch_on_failure_ON(started_cluster):
for i in range(0, 100):
limit = 100e3
node1.query(
f"insert into dist select number/100, number from system.numbers limit {limit} offset {limit*i}",
settings={
# max_memory_usage is the limit for the batch on the remote node
# (local query should not be affected since 30MB is enough for 100K rows)
"max_memory_usage": "30Mi",
"max_untracked_memory": "0",
},
)
node1.query("system flush distributed dist")
assert int(node1.query("select count() from dist_data")) == 100000
# Each iteration recreates the tables with one (setting, value) pair applied to `dist`.
for setting, setting_value in get_test_settings():
create_tables(**{setting: setting_value})
for i in range(0, 100):
limit = 100e3
node1.query(
f"insert into dist select number/100, number from system.numbers limit {limit} offset {limit*i}",
settings={
# max_memory_usage is the limit for the batch on the remote node
# (local query should not be affected since 30MB is enough for 100K rows)
"max_memory_usage": "30Mi",
"max_untracked_memory": "0",
},
)
# The flush is expected to succeed for every tested setting value.
node1.query("system flush distributed dist")
assert int(node1.query("select count() from dist_data")) == 100000

View File

@ -0,0 +1,2 @@
0 0
10 20

View File

@ -0,0 +1,16 @@
-- Test that detaching and re-attaching a Distributed table keeps its pending (not yet sent) files.
CREATE TABLE test_02536 (n Int8) ENGINE=MergeTree() ORDER BY tuple();
CREATE TABLE test_dist_02536 (n Int8) ENGINE=Distributed(test_cluster_two_shards, currentDatabase(), test_02536, rand());
-- Stop background sends so the rows inserted below stay queued until the explicit flush.
SYSTEM STOP DISTRIBUTED SENDS test_dist_02536;
INSERT INTO test_dist_02536 SELECT number FROM numbers(5) SETTINGS prefer_localhost_replica=0;
-- Nothing is expected to be readable through the distributed table before the flush.
SELECT count(n), sum(n) FROM test_dist_02536; -- 0 0
DETACH TABLE test_dist_02536;
ATTACH TABLE test_dist_02536;
-- The flush after re-attach is expected to deliver the data queued before the detach.
SYSTEM FLUSH DISTRIBUTED test_dist_02536;
SELECT count(n), sum(n) FROM test_dist_02536; -- 10 20
DROP TABLE test_02536;
DROP TABLE test_dist_02536;

View File

@ -0,0 +1,16 @@
monitor_batch_insert=0
1 2
1 0
-- { echoOn }
SELECT sum(key), count(key) FROM dist;
2 2
SELECT sum(key), count(key) FROM underlying;
2 2
monitor_batch_insert=1
1 2
1 0
-- { echoOn }
SELECT sum(key), count(key) FROM dist;
2 2
SELECT sum(key), count(key) FROM underlying;
2 2

View File

@ -0,0 +1,32 @@
{% for setting in [0, 1] %}
-- Testing that a distributed table doesn't lose files after inserts which contain errors
SELECT 'monitor_batch_insert={{ setting }}';
DROP TABLE IF EXISTS dist;
DROP TABLE IF EXISTS underlying;
-- The underlying table is deliberately not created yet, so the first flushes below fail with UNKNOWN_TABLE.
CREATE TABLE dist (key Int) ENGINE=Distributed(test_shard_localhost, currentDatabase(), underlying) SETTINGS monitor_batch_inserts={{ setting }};
SYSTEM STOP DISTRIBUTED SENDS dist;
INSERT INTO dist SETTINGS prefer_localhost_replica=0, max_threads=1 VALUES (1);
INSERT INTO dist SETTINGS prefer_localhost_replica=0, max_threads=2 VALUES (1);
SYSTEM FLUSH DISTRIBUTED dist; -- { serverError UNKNOWN_TABLE }
-- flush a second time, since a file may get lost from the queue after the first failed attempt
SYSTEM FLUSH DISTRIBUTED dist; -- { serverError UNKNOWN_TABLE }
-- the queued files should still be reported after the failed flushes
SELECT is_blocked, data_files FROM system.distribution_queue WHERE database = currentDatabase() AND table = 'dist';
CREATE TABLE underlying (key Int) ENGINE=Memory();
SYSTEM FLUSH DISTRIBUTED dist;
-- all data should be flushed
SELECT is_blocked, data_files FROM system.distribution_queue WHERE database = currentDatabase() AND table = 'dist';
-- { echoOn }
SELECT sum(key), count(key) FROM dist;
SELECT sum(key), count(key) FROM underlying;
-- { echoOff }
{% endfor %}