Merge pull request #16244 from ClickHouse/fix_race_data_parts_exchange_endpoint

Fix race on StorageReplicatedMergeTree::data_parts_exchange_endpoint
This commit is contained in:
tavplubix 2020-10-28 13:25:55 +03:00 committed by GitHub
commit 4a63f1271b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -3481,8 +3481,10 @@ void StorageReplicatedMergeTree::startup()
{
queue.initialize(getDataParts());
data_parts_exchange_endpoint = std::make_shared<DataPartsExchange::Service>(*this);
global_context.getInterserverIOHandler().addEndpoint(data_parts_exchange_endpoint->getId(replica_path), data_parts_exchange_endpoint);
InterserverIOEndpointPtr data_parts_exchange_ptr = std::make_shared<DataPartsExchange::Service>(*this);
[[maybe_unused]] auto prev_ptr = std::atomic_exchange(&data_parts_exchange_endpoint, data_parts_exchange_ptr);
assert(prev_ptr == nullptr);
global_context.getInterserverIOHandler().addEndpoint(data_parts_exchange_ptr->getId(replica_path), data_parts_exchange_ptr);
/// The replica will be activated in this thread.
restarting_thread.start();
@ -3549,15 +3551,15 @@ void StorageReplicatedMergeTree::shutdown()
global_context.getBackgroundMovePool().removeTask(move_parts_task_handle);
move_parts_task_handle.reset();
if (data_parts_exchange_endpoint)
auto data_parts_exchange_ptr = std::atomic_exchange(&data_parts_exchange_endpoint, InterserverIOEndpointPtr{});
if (data_parts_exchange_ptr)
{
global_context.getInterserverIOHandler().removeEndpointIfExists(data_parts_exchange_endpoint->getId(replica_path));
global_context.getInterserverIOHandler().removeEndpointIfExists(data_parts_exchange_ptr->getId(replica_path));
/// Ask all parts exchange handlers to finish ASAP. New ones will fail to start.
data_parts_exchange_endpoint->blocker.cancelForever();
data_parts_exchange_ptr->blocker.cancelForever();
/// Wait for all of them
std::unique_lock lock(data_parts_exchange_endpoint->rwlock);
std::unique_lock lock(data_parts_exchange_ptr->rwlock);
}
data_parts_exchange_endpoint.reset();
/// We clear all old parts after stopping all background operations. It's
/// important, because background operations can produce temporary parts
@ -5900,7 +5902,10 @@ ActionLock StorageReplicatedMergeTree::getActionLock(StorageActionBlockType acti
return fetcher.blocker.cancel();
if (action_type == ActionLocks::PartsSend)
return data_parts_exchange_endpoint ? data_parts_exchange_endpoint->blocker.cancel() : ActionLock();
{
auto data_parts_exchange_ptr = std::atomic_load(&data_parts_exchange_endpoint);
return data_parts_exchange_ptr ? data_parts_exchange_ptr->blocker.cancel() : ActionLock();
}
if (action_type == ActionLocks::ReplicationQueue)
return queue.actions_blocker.cancel();