Merge pull request #16358 from azat/dist-insert-fix

Fix async Distributed INSERT w/ prefer_localhost_replica=0 and internal_replication
This commit is contained in:
alexey-milovidov 2020-10-26 05:07:35 +03:00 committed by GitHub
commit bfe17d7dca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 75 additions and 4 deletions

View File

@ -392,6 +392,22 @@
</replica>
</shard>
</test_cluster_two_shards>
<test_cluster_two_shards_internal_replication>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>127.0.0.1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>127.0.0.2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster_two_shards_internal_replication>
<test_shard_localhost_secure>
<shard>
<replica>

View File

@ -614,14 +614,19 @@ const std::string & Cluster::ShardInfo::pathForInsert(bool prefer_localhost_repl
if (!has_internal_replication)
throw Exception("internal_replication is not set", ErrorCodes::LOGICAL_ERROR);
if (dir_name_for_internal_replication.empty() || dir_name_for_internal_replication_with_local.empty())
throw Exception("Directory name for async inserts is empty", ErrorCodes::LOGICAL_ERROR);
if (prefer_localhost_replica)
{
if (dir_name_for_internal_replication.empty())
throw Exception("Directory name for async inserts is empty", ErrorCodes::LOGICAL_ERROR);
return dir_name_for_internal_replication;
}
else
{
if (dir_name_for_internal_replication_with_local.empty())
throw Exception("Directory name for async inserts is empty", ErrorCodes::LOGICAL_ERROR);
return dir_name_for_internal_replication_with_local;
}
}
bool Cluster::maybeCrossReplication() const
{

View File

@ -1,4 +1,20 @@
test_cluster_two_shards prefer_localhost_replica=0
0
0
1
1
test_cluster_two_shards prefer_localhost_replica=1
0
0
1
1
test_cluster_two_shards_internal_replication prefer_localhost_replica=0
0
0
1
1
test_cluster_two_shards_internal_replication prefer_localhost_replica=1
0
0
1
1

View File

@ -7,6 +7,40 @@ DROP TABLE IF EXISTS dist_test_01040;
CREATE TABLE test_01040 (key UInt64) ENGINE=TinyLog();
CREATE TABLE dist_test_01040 AS test_01040 Engine=Distributed(test_cluster_two_shards, currentDatabase(), test_01040, key);
-- internal_replication=false
SELECT 'test_cluster_two_shards prefer_localhost_replica=0';
SET prefer_localhost_replica=0;
INSERT INTO dist_test_01040 SELECT toUInt64(number) FROM numbers(2);
SYSTEM FLUSH DISTRIBUTED dist_test_01040;
SELECT * FROM dist_test_01040;
SELECT * FROM dist_test_01040 ORDER BY key;
TRUNCATE TABLE test_01040;
SELECT 'test_cluster_two_shards prefer_localhost_replica=1';
SET prefer_localhost_replica=1;
INSERT INTO dist_test_01040 SELECT toUInt64(number) FROM numbers(2);
SYSTEM FLUSH DISTRIBUTED dist_test_01040;
SELECT * FROM dist_test_01040 ORDER BY key;
TRUNCATE TABLE test_01040;
DROP TABLE dist_test_01040;
-- internal_replication=true
CREATE TABLE dist_test_01040 AS test_01040 Engine=Distributed(test_cluster_two_shards_internal_replication, currentDatabase(), test_01040, key);
SELECT 'test_cluster_two_shards_internal_replication prefer_localhost_replica=0';
SET prefer_localhost_replica=0;
INSERT INTO dist_test_01040 SELECT toUInt64(number) FROM numbers(2);
SYSTEM FLUSH DISTRIBUTED dist_test_01040;
SELECT * FROM dist_test_01040 ORDER BY key;
TRUNCATE TABLE test_01040;
SELECT 'test_cluster_two_shards_internal_replication prefer_localhost_replica=1';
SET prefer_localhost_replica=1;
INSERT INTO dist_test_01040 SELECT toUInt64(number) FROM numbers(2);
SYSTEM FLUSH DISTRIBUTED dist_test_01040;
SELECT * FROM dist_test_01040 ORDER BY key;
TRUNCATE TABLE test_01040;
DROP TABLE dist_test_01040;
DROP TABLE test_01040;