mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
Merge pull request #16244 from ClickHouse/fix_race_data_parts_exchange_endpoint
Fix race on StorageReplicatedMergeTree::data_parts_exchange_endpoint
This commit is contained in:
commit
4a63f1271b
@ -3481,8 +3481,10 @@ void StorageReplicatedMergeTree::startup()
|
||||
{
|
||||
queue.initialize(getDataParts());
|
||||
|
||||
data_parts_exchange_endpoint = std::make_shared<DataPartsExchange::Service>(*this);
|
||||
global_context.getInterserverIOHandler().addEndpoint(data_parts_exchange_endpoint->getId(replica_path), data_parts_exchange_endpoint);
|
||||
InterserverIOEndpointPtr data_parts_exchange_ptr = std::make_shared<DataPartsExchange::Service>(*this);
|
||||
[[maybe_unused]] auto prev_ptr = std::atomic_exchange(&data_parts_exchange_endpoint, data_parts_exchange_ptr);
|
||||
assert(prev_ptr == nullptr);
|
||||
global_context.getInterserverIOHandler().addEndpoint(data_parts_exchange_ptr->getId(replica_path), data_parts_exchange_ptr);
|
||||
|
||||
/// In this thread replica will be activated.
|
||||
restarting_thread.start();
|
||||
@ -3549,15 +3551,15 @@ void StorageReplicatedMergeTree::shutdown()
|
||||
global_context.getBackgroundMovePool().removeTask(move_parts_task_handle);
|
||||
move_parts_task_handle.reset();
|
||||
|
||||
if (data_parts_exchange_endpoint)
|
||||
auto data_parts_exchange_ptr = std::atomic_exchange(&data_parts_exchange_endpoint, InterserverIOEndpointPtr{});
|
||||
if (data_parts_exchange_ptr)
|
||||
{
|
||||
global_context.getInterserverIOHandler().removeEndpointIfExists(data_parts_exchange_endpoint->getId(replica_path));
|
||||
global_context.getInterserverIOHandler().removeEndpointIfExists(data_parts_exchange_ptr->getId(replica_path));
|
||||
/// Ask all parts exchange handlers to finish asap. New ones will fail to start
|
||||
data_parts_exchange_endpoint->blocker.cancelForever();
|
||||
data_parts_exchange_ptr->blocker.cancelForever();
|
||||
/// Wait for all of them
|
||||
std::unique_lock lock(data_parts_exchange_endpoint->rwlock);
|
||||
std::unique_lock lock(data_parts_exchange_ptr->rwlock);
|
||||
}
|
||||
data_parts_exchange_endpoint.reset();
|
||||
|
||||
/// We clear all old parts after stopping all background operations. It's
|
||||
/// important, because background operations can produce temporary parts
|
||||
@ -5900,7 +5902,10 @@ ActionLock StorageReplicatedMergeTree::getActionLock(StorageActionBlockType acti
|
||||
return fetcher.blocker.cancel();
|
||||
|
||||
if (action_type == ActionLocks::PartsSend)
|
||||
return data_parts_exchange_endpoint ? data_parts_exchange_endpoint->blocker.cancel() : ActionLock();
|
||||
{
|
||||
auto data_parts_exchange_ptr = std::atomic_load(&data_parts_exchange_endpoint);
|
||||
return data_parts_exchange_ptr ? data_parts_exchange_ptr->blocker.cancel() : ActionLock();
|
||||
}
|
||||
|
||||
if (action_type == ActionLocks::ReplicationQueue)
|
||||
return queue.actions_blocker.cancel();
|
||||
|
Loading…
Reference in New Issue
Block a user