mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
Fix async Distributed INSERT w/ prefer_localhost_replica=0 and internal_replication
This commit is contained in:
parent
f116679be9
commit
edc8d6e5e7
@ -392,6 +392,22 @@
|
|||||||
</replica>
|
</replica>
|
||||||
</shard>
|
</shard>
|
||||||
</test_cluster_two_shards>
|
</test_cluster_two_shards>
|
||||||
|
<test_cluster_two_shards_internal_replication>
|
||||||
|
<shard>
|
||||||
|
<internal_replication>true</internal_replication>
|
||||||
|
<replica>
|
||||||
|
<host>127.0.0.1</host>
|
||||||
|
<port>9000</port>
|
||||||
|
</replica>
|
||||||
|
</shard>
|
||||||
|
<shard>
|
||||||
|
<internal_replication>true</internal_replication>
|
||||||
|
<replica>
|
||||||
|
<host>127.0.0.2</host>
|
||||||
|
<port>9000</port>
|
||||||
|
</replica>
|
||||||
|
</shard>
|
||||||
|
</test_cluster_two_shards_internal_replication>
|
||||||
<test_shard_localhost_secure>
|
<test_shard_localhost_secure>
|
||||||
<shard>
|
<shard>
|
||||||
<replica>
|
<replica>
|
||||||
|
@ -614,13 +614,18 @@ const std::string & Cluster::ShardInfo::pathForInsert(bool prefer_localhost_repl
|
|||||||
if (!has_internal_replication)
|
if (!has_internal_replication)
|
||||||
throw Exception("internal_replication is not set", ErrorCodes::LOGICAL_ERROR);
|
throw Exception("internal_replication is not set", ErrorCodes::LOGICAL_ERROR);
|
||||||
|
|
||||||
if (dir_name_for_internal_replication.empty() || dir_name_for_internal_replication_with_local.empty())
|
|
||||||
throw Exception("Directory name for async inserts is empty", ErrorCodes::LOGICAL_ERROR);
|
|
||||||
|
|
||||||
if (prefer_localhost_replica)
|
if (prefer_localhost_replica)
|
||||||
|
{
|
||||||
|
if (dir_name_for_internal_replication.empty())
|
||||||
|
throw Exception("Directory name for async inserts is empty", ErrorCodes::LOGICAL_ERROR);
|
||||||
return dir_name_for_internal_replication;
|
return dir_name_for_internal_replication;
|
||||||
|
}
|
||||||
else
|
else
|
||||||
|
{
|
||||||
|
if (dir_name_for_internal_replication_with_local.empty())
|
||||||
|
throw Exception("Directory name for async inserts is empty", ErrorCodes::LOGICAL_ERROR);
|
||||||
return dir_name_for_internal_replication_with_local;
|
return dir_name_for_internal_replication_with_local;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Cluster::maybeCrossReplication() const
|
bool Cluster::maybeCrossReplication() const
|
||||||
|
@ -1,4 +1,20 @@
|
|||||||
|
test_cluster_two_shards prefer_localhost_replica=0
|
||||||
|
0
|
||||||
0
|
0
|
||||||
1
|
1
|
||||||
|
1
|
||||||
|
test_cluster_two_shards prefer_localhost_replica=1
|
||||||
|
0
|
||||||
0
|
0
|
||||||
1
|
1
|
||||||
|
1
|
||||||
|
test_cluster_two_shards_internal_replication prefer_localhost_replica=0
|
||||||
|
0
|
||||||
|
0
|
||||||
|
1
|
||||||
|
1
|
||||||
|
test_cluster_two_shards_internal_replication prefer_localhost_replica=1
|
||||||
|
0
|
||||||
|
0
|
||||||
|
1
|
||||||
|
1
|
||||||
|
@ -7,6 +7,40 @@ DROP TABLE IF EXISTS dist_test_01040;
|
|||||||
|
|
||||||
CREATE TABLE test_01040 (key UInt64) ENGINE=TinyLog();
|
CREATE TABLE test_01040 (key UInt64) ENGINE=TinyLog();
|
||||||
CREATE TABLE dist_test_01040 AS test_01040 Engine=Distributed(test_cluster_two_shards, currentDatabase(), test_01040, key);
|
CREATE TABLE dist_test_01040 AS test_01040 Engine=Distributed(test_cluster_two_shards, currentDatabase(), test_01040, key);
|
||||||
|
|
||||||
|
-- internal_replication=false
|
||||||
|
SELECT 'test_cluster_two_shards prefer_localhost_replica=0';
|
||||||
|
SET prefer_localhost_replica=0;
|
||||||
INSERT INTO dist_test_01040 SELECT toUInt64(number) FROM numbers(2);
|
INSERT INTO dist_test_01040 SELECT toUInt64(number) FROM numbers(2);
|
||||||
SYSTEM FLUSH DISTRIBUTED dist_test_01040;
|
SYSTEM FLUSH DISTRIBUTED dist_test_01040;
|
||||||
SELECT * FROM dist_test_01040;
|
SELECT * FROM dist_test_01040 ORDER BY key;
|
||||||
|
TRUNCATE TABLE test_01040;
|
||||||
|
|
||||||
|
SELECT 'test_cluster_two_shards prefer_localhost_replica=1';
|
||||||
|
SET prefer_localhost_replica=1;
|
||||||
|
INSERT INTO dist_test_01040 SELECT toUInt64(number) FROM numbers(2);
|
||||||
|
SYSTEM FLUSH DISTRIBUTED dist_test_01040;
|
||||||
|
SELECT * FROM dist_test_01040 ORDER BY key;
|
||||||
|
TRUNCATE TABLE test_01040;
|
||||||
|
|
||||||
|
DROP TABLE dist_test_01040;
|
||||||
|
|
||||||
|
-- internal_replication=true
|
||||||
|
CREATE TABLE dist_test_01040 AS test_01040 Engine=Distributed(test_cluster_two_shards_internal_replication, currentDatabase(), test_01040, key);
|
||||||
|
SELECT 'test_cluster_two_shards_internal_replication prefer_localhost_replica=0';
|
||||||
|
SET prefer_localhost_replica=0;
|
||||||
|
INSERT INTO dist_test_01040 SELECT toUInt64(number) FROM numbers(2);
|
||||||
|
SYSTEM FLUSH DISTRIBUTED dist_test_01040;
|
||||||
|
SELECT * FROM dist_test_01040 ORDER BY key;
|
||||||
|
TRUNCATE TABLE test_01040;
|
||||||
|
|
||||||
|
SELECT 'test_cluster_two_shards_internal_replication prefer_localhost_replica=1';
|
||||||
|
SET prefer_localhost_replica=1;
|
||||||
|
INSERT INTO dist_test_01040 SELECT toUInt64(number) FROM numbers(2);
|
||||||
|
SYSTEM FLUSH DISTRIBUTED dist_test_01040;
|
||||||
|
SELECT * FROM dist_test_01040 ORDER BY key;
|
||||||
|
TRUNCATE TABLE test_01040;
|
||||||
|
|
||||||
|
|
||||||
|
DROP TABLE dist_test_01040;
|
||||||
|
DROP TABLE test_01040;
|
||||||
|
Loading…
Reference in New Issue
Block a user