Extend parallel_distributed_insert_select to run INSERT into local table

Before this patch there was:

- parallel_distributed_insert_select=1, that executes:

    INSERT INTO dist_out SELECT ... FROM underlying_dist_in

After this patch there will be:

- parallel_distributed_insert_select=2, that executes:

    INSERT INTO underlying_dist_out SELECT ... FROM underlying_dist_in

And cover the behaviour w/o integration test, by using the following
techincs:
- SYSTEM STOP DISTRIBUTED SENDS
- prefer_localhost_replica=0
This commit is contained in:
Azat Khuzhin 2020-08-25 21:42:03 +03:00
parent 0b07121177
commit 50a312534c
5 changed files with 128 additions and 4 deletions

View File

@ -1575,11 +1575,11 @@ Enables parallel distributed `INSERT ... SELECT` query.
If we execute `INSERT INTO distributed_table_a SELECT ... FROM distributed_table_b` queries and both tables use the same cluster, and both tables are either [replicated](../../engines/table-engines/mergetree-family/replication.md) or non-replicated, then this query is processed locally on every shard. If we execute `INSERT INTO distributed_table_a SELECT ... FROM distributed_table_b` queries and both tables use the same cluster, and both tables are either [replicated](../../engines/table-engines/mergetree-family/replication.md) or non-replicated, then this query is processed locally on every shard.
Possible values: Possible values:
- 0 — Disabled. - 0 — Disabled.
- 1 — Enabled. - 1 — `SELECT` will be executed on each shard from underlying table of the distributed engine.
- 2 — `SELECT` and `INSERT` will be executed on each shard from/to underlying table of the distributed engine.
Default value: 0. Default value: 0.

View File

@ -108,7 +108,7 @@ class IColumn;
M(Bool, skip_unavailable_shards, false, "If 1, ClickHouse silently skips unavailable shards and nodes unresolvable through DNS. Shard is marked as unavailable when none of the replicas can be reached.", 0) \ M(Bool, skip_unavailable_shards, false, "If 1, ClickHouse silently skips unavailable shards and nodes unresolvable through DNS. Shard is marked as unavailable when none of the replicas can be reached.", 0) \
\ \
M(Bool, distributed_group_by_no_merge, false, "Do not merge aggregation states from different servers for distributed query processing - in case it is for certain that there are different keys on different shards.", 0) \ M(Bool, distributed_group_by_no_merge, false, "Do not merge aggregation states from different servers for distributed query processing - in case it is for certain that there are different keys on different shards.", 0) \
M(Bool, parallel_distributed_insert_select, false, "If true, distributed insert select query in the same cluster will be processed on local tables on every shard", 0) \ M(UInt64, parallel_distributed_insert_select, 0, "Process distributed INSERT SELECT query in the same cluster on local tables on every shard, if 1 SELECT is executed on each shard, if 2 SELECT and INSERT is executed on each shard", 0) \
M(Bool, optimize_distributed_group_by_sharding_key, false, "Optimize GROUP BY sharding_key queries (by avodiing costly aggregation on the initiator server).", 0) \ M(Bool, optimize_distributed_group_by_sharding_key, false, "Optimize GROUP BY sharding_key queries (by avodiing costly aggregation on the initiator server).", 0) \
M(Bool, optimize_skip_unused_shards, false, "Assumes that data is distributed by sharding_key. Optimization to skip unused shards if SELECT query filters by sharding_key.", 0) \ M(Bool, optimize_skip_unused_shards, false, "Assumes that data is distributed by sharding_key. Optimization to skip unused shards if SELECT query filters by sharding_key.", 0) \
M(UInt64, force_optimize_skip_unused_shards, 0, "Throw an exception if unused shards cannot be skipped (1 - throw only if the table has the sharding key, 2 - always throw.", 0) \ M(UInt64, force_optimize_skip_unused_shards, 0, "Throw an exception if unused shards cannot be skipped (1 - throw only if the table has the sharding key, 2 - always throw.", 0) \

View File

@ -29,6 +29,11 @@
#include <Processors/Transforms/ConvertingTransform.h> #include <Processors/Transforms/ConvertingTransform.h>
#include <Processors/Sources/SinkToOutputStream.h> #include <Processors/Sources/SinkToOutputStream.h>
namespace
{
const UInt64 PARALLEL_DISTRIBUTED_INSERT_SELECT_ALL = 2;
}
namespace DB namespace DB
{ {
@ -157,6 +162,11 @@ BlockIO InterpreterInsertQuery::execute()
{ {
is_distributed_insert_select = true; is_distributed_insert_select = true;
if (settings.parallel_distributed_insert_select == PARALLEL_DISTRIBUTED_INSERT_SELECT_ALL)
{
new_query->table_id = StorageID(storage_dst->getRemoteDatabaseName(), storage_dst->getRemoteTableName());
}
const auto & cluster = storage_src->getCluster(); const auto & cluster = storage_src->getCluster();
const auto & shards_info = cluster->getShardsInfo(); const auto & shards_info = cluster->getShardsInfo();

View File

@ -1,3 +1,4 @@
parallel_distributed_insert_select=1
test_shard_localhost test_shard_localhost
0 0
1 1
@ -7,6 +8,30 @@ test_cluster_two_shards_localhost
1 2 1 2
2 2 2 2
test_cluster_two_shards test_cluster_two_shards
distributed
local
0 2
1 2
2 2
distributed
0 4
1 4
2 4
parallel_distributed_insert_select=2
test_shard_localhost
0
1
2
test_cluster_two_shards_localhost
0 2
1 2
2 2
test_cluster_two_shards
distributed
0 4
1 4
2 4
local
0 2 0 2
1 2 1 2
2 2 2 2

View File

@ -4,6 +4,7 @@ DROP TABLE IF EXISTS distributed_01099_a;
DROP TABLE IF EXISTS distributed_01099_b; DROP TABLE IF EXISTS distributed_01099_b;
SET parallel_distributed_insert_select=1; SET parallel_distributed_insert_select=1;
SELECT 'parallel_distributed_insert_select=1';
-- --
-- test_shard_localhost -- test_shard_localhost
@ -58,10 +59,98 @@ CREATE TABLE local_01099_b (number UInt64) ENGINE = Log;
CREATE TABLE distributed_01099_a AS local_01099_a ENGINE = Distributed('test_cluster_two_shards', currentDatabase(), local_01099_a, rand()); CREATE TABLE distributed_01099_a AS local_01099_a ENGINE = Distributed('test_cluster_two_shards', currentDatabase(), local_01099_a, rand());
CREATE TABLE distributed_01099_b AS local_01099_b ENGINE = Distributed('test_cluster_two_shards', currentDatabase(), local_01099_b, rand()); CREATE TABLE distributed_01099_b AS local_01099_b ENGINE = Distributed('test_cluster_two_shards', currentDatabase(), local_01099_b, rand());
SYSTEM STOP DISTRIBUTED SENDS distributed_01099_b;
SET prefer_localhost_replica=0; -- to require distributed send for local replica too
INSERT INTO local_01099_a SELECT number from system.numbers limit 3; INSERT INTO local_01099_a SELECT number from system.numbers limit 3;
INSERT INTO distributed_01099_b SELECT * from distributed_01099_a; INSERT INTO distributed_01099_b SELECT * from distributed_01099_a;
SET prefer_localhost_replica=1;
-- distributed sends disabled, 0 rows (since parallel_distributed_insert_select=1)
SELECT 'distributed';
SELECT number, count(number) FROM distributed_01099_b group by number order by number;
SYSTEM FLUSH DISTRIBUTED distributed_01099_b; SYSTEM FLUSH DISTRIBUTED distributed_01099_b;
SELECT 'local';
SELECT number, count(number) FROM local_01099_b group by number order by number;
SELECT 'distributed';
SELECT number, count(number) FROM distributed_01099_b group by number order by number;
DROP TABLE local_01099_a;
DROP TABLE local_01099_b;
DROP TABLE distributed_01099_a;
DROP TABLE distributed_01099_b;
SET parallel_distributed_insert_select=2;
SELECT 'parallel_distributed_insert_select=2';
--
-- test_shard_localhost
--
SELECT 'test_shard_localhost';
CREATE TABLE local_01099_a (number UInt64) ENGINE = Log;
CREATE TABLE local_01099_b (number UInt64) ENGINE = Log;
CREATE TABLE distributed_01099_a AS local_01099_a ENGINE = Distributed('test_shard_localhost', currentDatabase(), local_01099_a, rand());
CREATE TABLE distributed_01099_b AS local_01099_b ENGINE = Distributed('test_shard_localhost', currentDatabase(), local_01099_b, rand());
INSERT INTO local_01099_a SELECT number from system.numbers limit 3;
INSERT INTO distributed_01099_b SELECT * from distributed_01099_a;
SELECT * FROM distributed_01099_b;
DROP TABLE local_01099_a;
DROP TABLE local_01099_b;
DROP TABLE distributed_01099_a;
DROP TABLE distributed_01099_b;
--
-- test_cluster_two_shards_localhost
--
SELECT 'test_cluster_two_shards_localhost';
-- Log engine will lead to deadlock:
-- DB::Exception: std::system_error: Resource deadlock avoided.
-- So use MergeTree instead.
CREATE TABLE local_01099_a (number UInt64) ENGINE = MergeTree() ORDER BY number;
CREATE TABLE local_01099_b (number UInt64) ENGINE = MergeTree() ORDER BY number;
CREATE TABLE distributed_01099_a AS local_01099_a ENGINE = Distributed('test_cluster_two_shards_localhost', currentDatabase(), local_01099_a, rand());
CREATE TABLE distributed_01099_b AS local_01099_b ENGINE = Distributed('test_cluster_two_shards_localhost', currentDatabase(), local_01099_b, rand());
INSERT INTO local_01099_a SELECT number from system.numbers limit 3;
INSERT INTO distributed_01099_b SELECT * from distributed_01099_a;
SELECT number, count(number) FROM local_01099_b group by number order by number;
DROP TABLE local_01099_a;
DROP TABLE local_01099_b;
DROP TABLE distributed_01099_a;
DROP TABLE distributed_01099_b;
--
-- test_cluster_two_shards
--
SELECT 'test_cluster_two_shards';
CREATE TABLE local_01099_a (number UInt64) ENGINE = MergeTree() ORDER BY number;
CREATE TABLE local_01099_b (number UInt64) ENGINE = MergeTree() ORDER BY number;
CREATE TABLE distributed_01099_a AS local_01099_a ENGINE = Distributed('test_cluster_two_shards', currentDatabase(), local_01099_a, rand());
CREATE TABLE distributed_01099_b AS local_01099_b ENGINE = Distributed('test_cluster_two_shards', currentDatabase(), local_01099_b, rand());
SYSTEM STOP DISTRIBUTED SENDS distributed_01099_b;
SET prefer_localhost_replica=0; -- to require distributed send for local replica too
INSERT INTO local_01099_a SELECT number from system.numbers limit 3;
INSERT INTO distributed_01099_b SELECT * from distributed_01099_a;
SET prefer_localhost_replica=1;
-- distributed sends disabled, but they are not required, since insert is done into local table.
-- (since parallel_distributed_insert_select=2)
SELECT 'distributed';
SELECT number, count(number) FROM distributed_01099_b group by number order by number;
SELECT 'local';
SELECT number, count(number) FROM local_01099_b group by number order by number; SELECT number, count(number) FROM local_01099_b group by number order by number;
DROP TABLE local_01099_a; DROP TABLE local_01099_a;