diff --git a/tests/integration/test_distributed_directory_monitor_split_batch_on_failure/configs/overrides_1.xml b/tests/integration/test_distributed_directory_monitor_split_batch_on_failure/configs/overrides_1.xml
index 397e05e7a60..a79ce3de1fc 100644
--- a/tests/integration/test_distributed_directory_monitor_split_batch_on_failure/configs/overrides_1.xml
+++ b/tests/integration/test_distributed_directory_monitor_split_batch_on_failure/configs/overrides_1.xml
@@ -3,8 +3,6 @@
     <prefer_localhost_replica>0</prefer_localhost_replica>
 
-
-    <distributed_directory_monitor_batch_inserts>1</distributed_directory_monitor_batch_inserts>
     <distributed_directory_monitor_split_batch_on_failure>1</distributed_directory_monitor_split_batch_on_failure>
diff --git a/tests/integration/test_distributed_directory_monitor_split_batch_on_failure/configs/overrides_2.xml b/tests/integration/test_distributed_directory_monitor_split_batch_on_failure/configs/overrides_2.xml
index 2ffd5beaf8d..8279fcdbe6d 100644
--- a/tests/integration/test_distributed_directory_monitor_split_batch_on_failure/configs/overrides_2.xml
+++ b/tests/integration/test_distributed_directory_monitor_split_batch_on_failure/configs/overrides_2.xml
@@ -3,8 +3,6 @@
     <prefer_localhost_replica>0</prefer_localhost_replica>
 
-
-    <distributed_directory_monitor_batch_inserts>1</distributed_directory_monitor_batch_inserts>
     <distributed_directory_monitor_split_batch_on_failure>0</distributed_directory_monitor_split_batch_on_failure>
diff --git a/tests/integration/test_distributed_directory_monitor_split_batch_on_failure/test.py b/tests/integration/test_distributed_directory_monitor_split_batch_on_failure/test.py
index a47268b06fd..faa38af6533 100644
--- a/tests/integration/test_distributed_directory_monitor_split_batch_on_failure/test.py
+++ b/tests/integration/test_distributed_directory_monitor_split_batch_on_failure/test.py
@@ -18,61 +18,86 @@ node2 = cluster.add_instance(
 )
 
 
+def get_test_settings():
+    settings = {"monitor_batch_inserts": [0, 1]}
+    return [(k, v) for k, values in settings.items() for v in values]
+
+
+def drop_tables():
+    tables = ["null_", "dist", "data", "mv", "dist_data"]
+    query = "\n".join([f"drop table if exists {table};" for table in tables])
+    for _, node in cluster.instances.items():
+        node.query(query)
+
+
+def create_tables(**dist_settings):
+    drop_tables()
+    _settings_values = ",".join([f"{k}={v}" for k, v in dist_settings.items()])
+    _settings = f"settings {_settings_values}" if _settings_values else ""
+    for _, node in cluster.instances.items():
+        node.query(
+            f"""
+            create table null_ (key Int, value Int) engine=Null();
+            create table dist as null_ engine=Distributed(test_cluster, currentDatabase(), null_, key) {_settings};
+            create table data (key Int, uniq_values Int) engine=Memory();
+            create materialized view mv to data as select key, uniqExact(value) uniq_values from null_ group by key;
+            system stop distributed sends dist;
+
+            create table dist_data as data engine=Distributed(test_cluster, currentDatabase(), data);
+            """
+        )
+
+
 @pytest.fixture(scope="module")
 def started_cluster():
     try:
         cluster.start()
-
-        for _, node in cluster.instances.items():
-            node.query(
-                """
-            create table null_ (key Int, value Int) engine=Null();
-            create table dist as null_ engine=Distributed(test_cluster, currentDatabase(), null_, key);
-            create table data (key Int, uniq_values Int) engine=Memory();
-            create materialized view mv to data as select key, uniqExact(value) uniq_values from null_ group by key;
-            system stop distributed sends dist;
-
-            create table dist_data as data engine=Distributed(test_cluster, currentDatabase(), data);
-            """
-            )
-
         yield cluster
     finally:
+        drop_tables()
         cluster.shutdown()
 
 
 def test_distributed_directory_monitor_split_batch_on_failure_OFF(started_cluster):
-    for i in range(0, 100):
-        limit = 100e3
-        node2.query(
-            f"insert into dist select number/100, number from system.numbers limit {limit} offset {limit*i}",
-            settings={
-                # max_memory_usage is the limit for the batch on the remote node
-                # (local query should not be affected since 30MB is enough for 100K rows)
-                "max_memory_usage": "30Mi",
-                "max_untracked_memory": "0",
-            },
-        )
-    # "Received from" is mandatory, since the exception should be thrown on the remote node.
-    with pytest.raises(
-        QueryRuntimeException,
-        match=r"DB::Exception: Received from.*Memory limit \(for query\) exceeded: .*while pushing to view default\.mv",
-    ):
+    for setting, setting_value in get_test_settings():
+        create_tables(**{setting: setting_value})
+        for i in range(0, 100):
+            limit = 100e3
+            node2.query(
+                f"insert into dist select number/100, number from system.numbers limit {limit} offset {limit*i}",
+                settings={
+                    # max_memory_usage is the limit for the batch on the remote node
+                    # (local query should not be affected since 30MB is enough for 100K rows)
+                    "max_memory_usage": "30Mi",
+                    "max_untracked_memory": "0",
+                },
+            )
+        # "Received from" is mandatory, since the exception should be thrown on the remote node.
+        if setting == "monitor_batch_inserts" and setting_value == 1:
+            with pytest.raises(
+                QueryRuntimeException,
+                match=r"DB::Exception: Received from.*Memory limit \(for query\) exceeded: .*while pushing to view default\.mv",
+            ):
+                node2.query("system flush distributed dist")
+            assert int(node2.query("select count() from dist_data")) == 0
+            continue
         node2.query("system flush distributed dist")
-    assert int(node2.query("select count() from dist_data")) == 0
+        assert int(node2.query("select count() from dist_data")) == 100000
 
 
 def test_distributed_directory_monitor_split_batch_on_failure_ON(started_cluster):
-    for i in range(0, 100):
-        limit = 100e3
-        node1.query(
-            f"insert into dist select number/100, number from system.numbers limit {limit} offset {limit*i}",
-            settings={
-                # max_memory_usage is the limit for the batch on the remote node
-                # (local query should not be affected since 30MB is enough for 100K rows)
-                "max_memory_usage": "30Mi",
-                "max_untracked_memory": "0",
-            },
-        )
-    node1.query("system flush distributed dist")
-    assert int(node1.query("select count() from dist_data")) == 100000
+    for setting, setting_value in get_test_settings():
+        create_tables(**{setting: setting_value})
+        for i in range(0, 100):
+            limit = 100e3
+            node1.query(
+                f"insert into dist select number/100, number from system.numbers limit {limit} offset {limit*i}",
+                settings={
+                    # max_memory_usage is the limit for the batch on the remote node
+                    # (local query should not be affected since 30MB is enough for 100K rows)
+                    "max_memory_usage": "30Mi",
+                    "max_untracked_memory": "0",
+                },
+            )
+        node1.query("system flush distributed dist")
+        assert int(node1.query("select count() from dist_data")) == 100000
diff --git a/tests/queries/0_stateless/02536_distributed_detach_table.reference b/tests/queries/0_stateless/02536_distributed_detach_table.reference
new file mode 100644
index 00000000000..f09bace4421
--- /dev/null
+++ b/tests/queries/0_stateless/02536_distributed_detach_table.reference
@@ -0,0 +1,2 @@
+0 0
+10 20
diff --git a/tests/queries/0_stateless/02536_distributed_detach_table.sql b/tests/queries/0_stateless/02536_distributed_detach_table.sql
new file mode 100644
index 00000000000..92bee1ee544
--- /dev/null
+++ b/tests/queries/0_stateless/02536_distributed_detach_table.sql
@@ -0,0 +1,16 @@
+-- test detach distributed table with pending files
+CREATE TABLE test_02536 (n Int8) ENGINE=MergeTree() ORDER BY tuple();
+CREATE TABLE test_dist_02536 (n Int8) ENGINE=Distributed(test_cluster_two_shards, currentDatabase(), test_02536, rand());
+SYSTEM STOP DISTRIBUTED SENDS test_dist_02536;
+
+INSERT INTO test_dist_02536 SELECT number FROM numbers(5) SETTINGS prefer_localhost_replica=0;
+SELECT count(n), sum(n) FROM test_dist_02536; -- 0 0
+
+DETACH TABLE test_dist_02536;
+ATTACH TABLE test_dist_02536;
+
+SYSTEM FLUSH DISTRIBUTED test_dist_02536;
+
+SELECT count(n), sum(n) FROM test_dist_02536; -- 10 20
+DROP TABLE test_02536;
+DROP TABLE test_dist_02536;
diff --git a/tests/queries/0_stateless/02537_distributed_loosing_files_after_exception.reference b/tests/queries/0_stateless/02537_distributed_loosing_files_after_exception.reference
new file mode 100644
index 00000000000..7793e91fcb6
--- /dev/null
+++ b/tests/queries/0_stateless/02537_distributed_loosing_files_after_exception.reference
@@ -0,0 +1,16 @@
+monitor_batch_inserts=0
+1 2
+1 0
+-- { echoOn }
+SELECT sum(key), count(key) FROM dist;
+2 2
+SELECT sum(key), count(key) FROM underlying;
+2 2
+monitor_batch_inserts=1
+1 2
+1 0
+-- { echoOn }
+SELECT sum(key), count(key) FROM dist;
+2 2
+SELECT sum(key), count(key) FROM underlying;
+2 2
diff --git a/tests/queries/0_stateless/02537_distributed_loosing_files_after_exception.sql.j2 b/tests/queries/0_stateless/02537_distributed_loosing_files_after_exception.sql.j2
new file mode 100644
index 00000000000..4f8cf1ccffe
--- /dev/null
+++ b/tests/queries/0_stateless/02537_distributed_loosing_files_after_exception.sql.j2
@@ -0,0 +1,32 @@
+{% for setting in [0, 1] %}
+-- Test that a Distributed table does not lose files after inserts that fail with an error
+
+SELECT 'monitor_batch_inserts={{ setting }}';
+
+DROP TABLE IF EXISTS dist;
+DROP TABLE IF EXISTS underlying;
+
+CREATE TABLE dist (key Int) ENGINE=Distributed(test_shard_localhost, currentDatabase(), underlying) SETTINGS monitor_batch_inserts={{ setting }};
+SYSTEM STOP DISTRIBUTED SENDS dist;
+
+INSERT INTO dist SETTINGS prefer_localhost_replica=0, max_threads=1 VALUES (1);
+INSERT INTO dist SETTINGS prefer_localhost_replica=0, max_threads=2 VALUES (1);
+
+SYSTEM FLUSH DISTRIBUTED dist; -- { serverError UNKNOWN_TABLE }
+-- check a second time, since the file may get lost from the queue after it has been processed once
+SYSTEM FLUSH DISTRIBUTED dist; -- { serverError UNKNOWN_TABLE }
+
+SELECT is_blocked, data_files FROM system.distribution_queue WHERE database = currentDatabase() AND table = 'dist';
+
+CREATE TABLE underlying (key Int) ENGINE=Memory();
+SYSTEM FLUSH DISTRIBUTED dist;
+
+-- all data should be flushed
+SELECT is_blocked, data_files FROM system.distribution_queue WHERE database = currentDatabase() AND table = 'dist';
+
+-- { echoOn }
+SELECT sum(key), count(key) FROM dist;
+SELECT sum(key), count(key) FROM underlying;
+-- { echoOff }
+
+{% endfor %}
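
Note on the test.py refactor above: a minimal, standalone Python sketch of how the patch's get_test_settings() expands into (setting, value) pairs and how create_tables() folds them into the settings clause of the Distributed DDL. get_test_settings() is copied verbatim from the patch; render_dist_ddl() is a hypothetical helper added here only to illustrate the string-building inside create_tables(), without the cluster calls.

# get_test_settings() is verbatim from test.py; render_dist_ddl() is a
# hypothetical stand-in for the DDL string-building in create_tables().


def get_test_settings():
    settings = {"monitor_batch_inserts": [0, 1]}
    return [(k, v) for k, values in settings.items() for v in values]


def render_dist_ddl(**dist_settings):
    # Join the settings into "settings k=v,..." the same way create_tables() does.
    settings_values = ",".join(f"{k}={v}" for k, v in dist_settings.items())
    settings_clause = f" settings {settings_values}" if settings_values else ""
    return (
        "create table dist as null_ engine=Distributed"
        f"(test_cluster, currentDatabase(), null_, key){settings_clause}"
    )


if __name__ == "__main__":
    for setting, value in get_test_settings():
        print(render_dist_ddl(**{setting: value}))
    # ...(test_cluster, currentDatabase(), null_, key) settings monitor_batch_inserts=0
    # ...(test_cluster, currentDatabase(), null_, key) settings monitor_batch_inserts=1

Each pair thus drives one full create/insert/flush cycle per test, which is why both tests now cover the distributed send path with batching disabled and enabled.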