From 50a312534c946f07547d0b932afb678cf8d615e0 Mon Sep 17 00:00:00 2001
From: Azat Khuzhin
Date: Tue, 25 Aug 2020 21:42:03 +0300
Subject: [PATCH] Extend parallel_distributed_insert_select to run INSERT into
 local table

Before this patch there was:
- parallel_distributed_insert_select=1, which executes:
  INSERT INTO dist_out SELECT ... FROM underlying_dist_in

After this patch there will be:
- parallel_distributed_insert_select=2, which executes:
  INSERT INTO underlying_dist_out SELECT ... FROM underlying_dist_in

And the behaviour is covered without an integration test, by using the
following techniques:
- SYSTEM STOP DISTRIBUTED SENDS
- prefer_localhost_replica=0
---
 docs/en/operations/settings/settings.md       |  6 +-
 src/Core/Settings.h                           |  2 +-
 src/Interpreters/InterpreterInsertQuery.cpp   | 10 +++
 ...rallel_distributed_insert_select.reference | 25 ++++++
 ...099_parallel_distributed_insert_select.sql | 89 +++++++++++++++++++
 5 files changed, 128 insertions(+), 4 deletions(-)

diff --git a/docs/en/operations/settings/settings.md b/docs/en/operations/settings/settings.md
index c844a88613d..924c9b5fac9 100644
--- a/docs/en/operations/settings/settings.md
+++ b/docs/en/operations/settings/settings.md
@@ -1571,15 +1571,15 @@ Default value: 16.
 
 ## parallel_distributed_insert_select {#parallel_distributed_insert_select}
 
-Enables parallel distributed `INSERT ... SELECT` query.
+Enables parallel distributed `INSERT ... SELECT` query.
 
 If we execute `INSERT INTO distributed_table_a SELECT ... FROM distributed_table_b` queries and both tables use the same cluster, and both tables are either [replicated](../../engines/table-engines/mergetree-family/replication.md) or non-replicated, then this query is processed locally on every shard.
 
-
 Possible values:
 
 - 0 — Disabled.
-- 1 — Enabled.
+- 1 — `SELECT` will be executed on each shard from the underlying table of the distributed engine.
+- 2 — `SELECT` and `INSERT` will be executed on each shard from/to the underlying table of the distributed engine.
 
 Default value: 0.
 
diff --git a/src/Core/Settings.h b/src/Core/Settings.h
index f7438b356d5..59386fc9c04 100644
--- a/src/Core/Settings.h
+++ b/src/Core/Settings.h
@@ -108,7 +108,7 @@ class IColumn;
     M(Bool, skip_unavailable_shards, false, "If 1, ClickHouse silently skips unavailable shards and nodes unresolvable through DNS. Shard is marked as unavailable when none of the replicas can be reached.", 0) \
     \
     M(Bool, distributed_group_by_no_merge, false, "Do not merge aggregation states from different servers for distributed query processing - in case it is for certain that there are different keys on different shards.", 0) \
-    M(Bool, parallel_distributed_insert_select, false, "If true, distributed insert select query in the same cluster will be processed on local tables on every shard", 0) \
+    M(UInt64, parallel_distributed_insert_select, 0, "Process distributed INSERT SELECT query in the same cluster on local tables on every shard; if 1, SELECT is executed on each shard; if 2, SELECT and INSERT are executed on each shard", 0) \
     M(Bool, optimize_distributed_group_by_sharding_key, false, "Optimize GROUP BY sharding_key queries (by avodiing costly aggregation on the initiator server).", 0) \
     M(Bool, optimize_skip_unused_shards, false, "Assumes that data is distributed by sharding_key. Optimization to skip unused shards if SELECT query filters by sharding_key.", 0) \
    M(UInt64, force_optimize_skip_unused_shards, 0, "Throw an exception if unused shards cannot be skipped (1 - throw only if the table has the sharding key, 2 - always throw.", 0) \
diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp
index 31a623e82fd..a32dedb6943 100644
--- a/src/Interpreters/InterpreterInsertQuery.cpp
+++ b/src/Interpreters/InterpreterInsertQuery.cpp
@@ -29,6 +29,11 @@
 #include
 #include
 
+namespace
+{
+const UInt64 PARALLEL_DISTRIBUTED_INSERT_SELECT_ALL = 2;
+}
+
 namespace DB
 {
 
@@ -157,6 +162,11 @@ BlockIO InterpreterInsertQuery::execute()
         {
             is_distributed_insert_select = true;
 
+            if (settings.parallel_distributed_insert_select == PARALLEL_DISTRIBUTED_INSERT_SELECT_ALL)
+            {
+                new_query->table_id = StorageID(storage_dst->getRemoteDatabaseName(), storage_dst->getRemoteTableName());
+            }
+
             const auto & cluster = storage_src->getCluster();
             const auto & shards_info = cluster->getShardsInfo();
diff --git a/tests/queries/0_stateless/01099_parallel_distributed_insert_select.reference b/tests/queries/0_stateless/01099_parallel_distributed_insert_select.reference
index dc58f3ae4a8..ba26f12fddf 100644
--- a/tests/queries/0_stateless/01099_parallel_distributed_insert_select.reference
+++ b/tests/queries/0_stateless/01099_parallel_distributed_insert_select.reference
@@ -1,3 +1,4 @@
+parallel_distributed_insert_select=1
 test_shard_localhost
 0
 1
@@ -7,6 +8,30 @@ test_cluster_two_shards_localhost
 1 2
 2 2
 test_cluster_two_shards
+distributed
+local
+0 2
+1 2
+2 2
+distributed
+0 4
+1 4
+2 4
+parallel_distributed_insert_select=2
+test_shard_localhost
+0
+1
+2
+test_cluster_two_shards_localhost
+0 2
+1 2
+2 2
+test_cluster_two_shards
+distributed
+0 4
+1 4
+2 4
+local
 0 2
 1 2
 2 2
diff --git a/tests/queries/0_stateless/01099_parallel_distributed_insert_select.sql b/tests/queries/0_stateless/01099_parallel_distributed_insert_select.sql
index 6e2146ad68c..4ae655b1ec9 100644
--- a/tests/queries/0_stateless/01099_parallel_distributed_insert_select.sql
+++ b/tests/queries/0_stateless/01099_parallel_distributed_insert_select.sql
@@ -4,6 +4,7 @@ DROP TABLE IF EXISTS distributed_01099_a;
 DROP TABLE IF EXISTS distributed_01099_b;
 
 SET parallel_distributed_insert_select=1;
+SELECT 'parallel_distributed_insert_select=1';
 
 --
 -- test_shard_localhost
@@ -58,10 +59,98 @@ CREATE TABLE local_01099_b (number UInt64) ENGINE = Log;
 CREATE TABLE distributed_01099_a AS local_01099_a ENGINE = Distributed('test_cluster_two_shards', currentDatabase(), local_01099_a, rand());
 CREATE TABLE distributed_01099_b AS local_01099_b ENGINE = Distributed('test_cluster_two_shards', currentDatabase(), local_01099_b, rand());
 
+SYSTEM STOP DISTRIBUTED SENDS distributed_01099_b;
+SET prefer_localhost_replica=0; -- to require a distributed send for the local replica too
 INSERT INTO local_01099_a SELECT number from system.numbers limit 3;
 INSERT INTO distributed_01099_b SELECT * from distributed_01099_a;
+SET prefer_localhost_replica=1;
+
+-- distributed sends are disabled, so 0 rows here (since parallel_distributed_insert_select=1)
+SELECT 'distributed';
+SELECT number, count(number) FROM distributed_01099_b group by number order by number;
 SYSTEM FLUSH DISTRIBUTED distributed_01099_b;
+SELECT 'local';
+SELECT number, count(number) FROM local_01099_b group by number order by number;
+SELECT 'distributed';
+SELECT number, count(number) FROM distributed_01099_b group by number order by number;
+
+DROP TABLE local_01099_a;
+DROP TABLE local_01099_b;
+DROP TABLE distributed_01099_a;
+DROP TABLE distributed_01099_b;
+
+
+SET parallel_distributed_insert_select=2;
+SELECT 'parallel_distributed_insert_select=2';
+
+--
+-- test_shard_localhost
+--
+
+SELECT 'test_shard_localhost';
+
+CREATE TABLE local_01099_a (number UInt64) ENGINE = Log;
+CREATE TABLE local_01099_b (number UInt64) ENGINE = Log;
+CREATE TABLE distributed_01099_a AS local_01099_a ENGINE = Distributed('test_shard_localhost', currentDatabase(), local_01099_a, rand());
+CREATE TABLE distributed_01099_b AS local_01099_b ENGINE = Distributed('test_shard_localhost', currentDatabase(), local_01099_b, rand());
+
+INSERT INTO local_01099_a SELECT number from system.numbers limit 3;
+INSERT INTO distributed_01099_b SELECT * from distributed_01099_a;
+
+SELECT * FROM distributed_01099_b;
+
+DROP TABLE local_01099_a;
+DROP TABLE local_01099_b;
+DROP TABLE distributed_01099_a;
+DROP TABLE distributed_01099_b;
+
+--
+-- test_cluster_two_shards_localhost
+--
+
+SELECT 'test_cluster_two_shards_localhost';
+
+-- The Log engine would lead to a deadlock:
+--   DB::Exception: std::system_error: Resource deadlock avoided.
+-- So use MergeTree instead.
+CREATE TABLE local_01099_a (number UInt64) ENGINE = MergeTree() ORDER BY number;
+CREATE TABLE local_01099_b (number UInt64) ENGINE = MergeTree() ORDER BY number;
+CREATE TABLE distributed_01099_a AS local_01099_a ENGINE = Distributed('test_cluster_two_shards_localhost', currentDatabase(), local_01099_a, rand());
+CREATE TABLE distributed_01099_b AS local_01099_b ENGINE = Distributed('test_cluster_two_shards_localhost', currentDatabase(), local_01099_b, rand());
+
+INSERT INTO local_01099_a SELECT number from system.numbers limit 3;
+INSERT INTO distributed_01099_b SELECT * from distributed_01099_a;
+
+SELECT number, count(number) FROM local_01099_b group by number order by number;
+
+DROP TABLE local_01099_a;
+DROP TABLE local_01099_b;
+DROP TABLE distributed_01099_a;
+DROP TABLE distributed_01099_b;
+
+--
+-- test_cluster_two_shards
+--
+
+SELECT 'test_cluster_two_shards';
+
+CREATE TABLE local_01099_a (number UInt64) ENGINE = MergeTree() ORDER BY number;
+CREATE TABLE local_01099_b (number UInt64) ENGINE = MergeTree() ORDER BY number;
+CREATE TABLE distributed_01099_a AS local_01099_a ENGINE = Distributed('test_cluster_two_shards', currentDatabase(), local_01099_a, rand());
+CREATE TABLE distributed_01099_b AS local_01099_b ENGINE = Distributed('test_cluster_two_shards', currentDatabase(), local_01099_b, rand());
+
+SYSTEM STOP DISTRIBUTED SENDS distributed_01099_b;
+SET prefer_localhost_replica=0; -- to require a distributed send for the local replica too
+INSERT INTO local_01099_a SELECT number from system.numbers limit 3;
+INSERT INTO distributed_01099_b SELECT * from distributed_01099_a;
+SET prefer_localhost_replica=1;
+
+-- Distributed sends are disabled, but they are not required, since the INSERT is done into the local table
+-- (since parallel_distributed_insert_select=2).
+SELECT 'distributed';
+SELECT number, count(number) FROM distributed_01099_b group by number order by number;
+SELECT 'local';
 SELECT number, count(number) FROM local_01099_b group by number order by number;
 
 DROP TABLE local_01099_a;
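
A minimal usage sketch of the new mode (not part of the patch; the table and
cluster names dist_in/dist_out/local_in/local_out are hypothetical): given a
Distributed table dist_in over local_in and a Distributed table dist_out over
local_out, both defined on the same cluster, the query below is rewritten on
each shard to insert directly into the underlying local table, so no
distributed sends are produced:

    SET parallel_distributed_insert_select = 2;
    -- Both the SELECT and the INSERT run shard-locally
    -- against local_in/local_out:
    INSERT INTO dist_out SELECT * FROM dist_in;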