Merge pull request #38029 from ClickHouse/fix-possible-crash-after-removing-replica-in-distributed

Fix a possible crash in Distributed async inserts when a replica is removed from the config.
Nikolai Kochetov 2022-06-20 11:38:35 +02:00 committed by GitHub
commit fab62513aa
3 changed files with 71 additions and 1 deletion
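Roughly, the failure mode the new integration test below reproduces: an async INSERT through a Distributed table is queued for replicas that are later dropped from remote_servers, so when the queued batch is flushed, the ConnectionPoolWithFailover behind it has no nested pools left. A minimal sketch of the sequence (table and cluster names follow the test; before this fix the final statement could crash the server, with it the empty pool is reported as ALL_CONNECTION_TRIES_FAILED):

-- test_cluster_remove_replica1 initially points only at unreachable replicas,
-- so the INSERT is queued on disk by the Distributed engine instead of being delivered.
CREATE TABLE test.local_4 (x UInt64, s String) ENGINE = MergeTree ORDER BY x;
CREATE TABLE test.distr_4 (x UInt64, s String)
    ENGINE = Distributed('test_cluster_remove_replica1', test, local_4);
INSERT INTO test.distr_4 VALUES (1, 'a'), (2, 'bb');

-- The cluster definition is then edited on disk (the test renames the clusters with sed)
-- and reloaded, leaving the queued batch pointing at replicas that no longer exist.
SYSTEM RELOAD CONFIG;
SYSTEM FLUSH DISTRIBUTED test.distr_4;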

src/Client/ConnectionPoolWithFailover.cpp

@@ -20,6 +20,7 @@ namespace DB
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int ALL_CONNECTION_TRIES_FAILED;
}
@@ -45,6 +46,9 @@ IConnectionPool::Entry ConnectionPoolWithFailover::get(const ConnectionTimeouts
    const Settings * settings,
    bool /*force_connected*/)
{
    if (nested_pools.empty())
        throw DB::Exception(DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED, "Cannot get connection from ConnectionPoolWithFailover because nested pools are empty");

    TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
    {
        return tryGetEntry(pool, timeouts, fail_message, settings);
@@ -167,6 +171,9 @@ std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::g
    PoolMode pool_mode,
    const TryGetEntryFunc & try_get_entry)
{
    if (nested_pools.empty())
        throw DB::Exception(DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED, "Cannot get connection from ConnectionPoolWithFailover because nested pools are empty");

    size_t min_entries = (settings && settings->skip_unavailable_shards) ? 0 : 1;
    size_t max_tries = (settings ?
        size_t{settings->connections_with_failover_max_tries} :

tests/integration/test_distributed_format/configs/another_remote_servers.xml

@@ -0,0 +1,25 @@
<clickhouse>
    <remote_servers>
        <test_cluster_remove_replica1>
            <shard>
                <replica>
                    <host>not_existing</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>not_existing2</host>
                    <port>9000</port>
                </replica>
            </shard>
        </test_cluster_remove_replica1>
        <test_cluster_remove_replica2>
            <shard>
                <replica>
                    <host>127.0.0.1</host>
                    <port>9000</port>
                </replica>
            </shard>
        </test_cluster_remove_replica2>
    </remote_servers>
</clickhouse>
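Once the test below has renamed the clusters in this file and run SYSTEM RELOAD CONFIG, the replica set the server actually uses can be double-checked from system.clusters. A quick sanity query, not part of the test, assuming the standard system.clusters columns:

-- Lists the hosts each test_cluster_remove_replica* cluster resolves to after the reload.
SELECT cluster, shard_num, replica_num, host_name, port
FROM system.clusters
WHERE cluster LIKE 'test_cluster_remove_replica%'
ORDER BY cluster, shard_num, replica_num;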

tests/integration/test_distributed_format/test.py

@@ -6,7 +6,11 @@ import pytest
from helpers.cluster import ClickHouseCluster

cluster = ClickHouseCluster(__file__)
node = cluster.add_instance("node", main_configs=["configs/remote_servers.xml"])
node = cluster.add_instance(
    "node",
    main_configs=["configs/remote_servers.xml", "configs/another_remote_servers.xml"],
    stay_alive=True,
)

cluster_param = pytest.mark.parametrize(
    "cluster",
@@ -143,3 +147,37 @@ def test_single_file_old(started_cluster, cluster):
    assert out == "1\ta\n2\tbb\n3\tccc\n"

    node.query("drop table test.distr_3")


def test_remove_replica(started_cluster):
    node.query(
        "create table test.local_4 (x UInt64, s String) engine = MergeTree order by x"
    )
    node.query(
        "create table test.distr_4 (x UInt64, s String) engine = Distributed('test_cluster_remove_replica1', test, local_4)"
    )
    node.query(
        "insert into test.distr_4 values (1, 'a'), (2, 'bb'), (3, 'ccc'), (4, 'dddd')"
    )
    node.query("detach table test.distr_4")

    node.exec_in_container(
        [
            "sed",
            "-i",
            "s/test_cluster_remove_replica1/test_cluster_remove_replica_tmp/g",
            "/etc/clickhouse-server/config.d/another_remote_servers.xml",
        ]
    )
    node.exec_in_container(
        [
            "sed",
            "-i",
            "s/test_cluster_remove_replica2/test_cluster_remove_replica1/g",
            "/etc/clickhouse-server/config.d/another_remote_servers.xml",
        ]
    )
    node.query("SYSTEM RELOAD CONFIG")
    node.query("attach table test.distr_4", ignore_error=True)
    node.query("SYSTEM FLUSH DISTRIBUTED test.distr_4", ignore_error=True)

    assert node.query("select 1") == "1\n"