Merge pull request #44719 from ClickHouse/fix_attach_thread_deadlock

Fix deadlock in attach thread
This commit is contained in:
Alexey Milovidov 2022-12-30 13:56:18 +03:00 committed by GitHub
commit 13b76ad760
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 52 additions and 39 deletions

View File

@ -191,7 +191,7 @@ void ReplicatedMergeTreeAttachThread::runImpl()
/// Finishes table initialization on behalf of the attach thread: runs the storage's startup routine,
/// sets `storage.initialization_done` and logs that the table is initialized.
/// NOTE(review): two `startupImpl` calls are listed below (one without arguments, one with the flag);
/// this looks like the before/after pair of a single changed line - confirm that only one is intended.
void ReplicatedMergeTreeAttachThread::finalizeInitialization() TSA_NO_THREAD_SAFETY_ANALYSIS
{
storage.startupImpl();
/// The flag tells startupImpl() that it is being called from the attach thread.
storage.startupImpl(/* from_attach_thread */ true);
storage.initialization_done = true;
LOG_INFO(log, "Table is initialized");
}

View File

@ -10,11 +10,6 @@
#include <boost/algorithm/string/replace.hpp>
namespace ProfileEvents
{
extern const Event ReplicaPartialShutdown;
}
namespace CurrentMetrics
{
extern const Metric ReadonlyReplica;
@ -335,34 +330,11 @@ void ReplicatedMergeTreeRestartingThread::activateReplica()
/// Stops the replica's background activity without destroying the storage.
/// Calls setReadonly() with `part_of_full_shutdown` forwarded as its argument, then performs the
/// partial shutdown of the storage.
/// NOTE(review): the statements between setReadonly() and `storage.partialShutdown()` perform the same
/// steps, in the same order, as the body of StorageReplicatedMergeTree::partialShutdown(); presumably they
/// are the old inline version of that logic shown next to the new delegating call - confirm only one remains.
void ReplicatedMergeTreeRestartingThread::partialShutdown(bool part_of_full_shutdown)
{
setReadonly(part_of_full_shutdown);
ProfileEvents::increment(ProfileEvents::ReplicaPartialShutdown);
/// Record that partial shutdown was requested and signal the associated event.
storage.partial_shutdown_called = true;
storage.partial_shutdown_event.set();
storage.replica_is_active_node = nullptr;
LOG_TRACE(log, "Waiting for threads to finish");
/// Deactivate the storage's background tasks, then stop the cleanup and part-check threads.
storage.merge_selecting_task->deactivate();
storage.queue_updating_task->deactivate();
storage.mutations_updating_task->deactivate();
storage.mutations_finalizing_task->deactivate();
storage.cleanup_thread.stop();
storage.part_check_thread.stop();
/// Stop queue processing
{
/// The objects returned by cancel() live until the end of this scope, i.e. until after finish() returns.
/// Presumably fetches, merges and moves stay cancelled while they are alive - verify against the blocker type.
auto fetch_lock = storage.fetcher.blocker.cancel();
auto merge_lock = storage.merger_mutator.merges_blocker.cancel();
auto move_lock = storage.parts_mover.moves_blocker.cancel();
storage.background_operations_assignee.finish();
}
LOG_TRACE(log, "Threads finished");
/// Delegate the shutdown sequence to the storage itself.
storage.partialShutdown();
}
void ReplicatedMergeTreeRestartingThread::shutdown()
void ReplicatedMergeTreeRestartingThread::shutdown(bool part_of_full_shutdown)
{
/// Stop restarting_thread before stopping other tasks - so that it won't restart them again.
need_stop = true;
@ -370,7 +342,7 @@ void ReplicatedMergeTreeRestartingThread::shutdown()
LOG_TRACE(log, "Restarting thread finished");
/// Stop other tasks.
partialShutdown(/* part_of_full_shutdown */ true);
partialShutdown(part_of_full_shutdown);
}
void ReplicatedMergeTreeRestartingThread::setReadonly(bool on_shutdown)

View File

@ -28,7 +28,7 @@ public:
void wakeup() { task->schedule(); }
void shutdown();
void shutdown(bool part_of_full_shutdown);
private:
StorageReplicatedMergeTree & storage;

View File

@ -113,6 +113,7 @@ namespace ProfileEvents
extern const Event NotCreatedLogEntryForMerge;
extern const Event CreatedLogEntryForMutation;
extern const Event NotCreatedLogEntryForMutation;
extern const Event ReplicaPartialShutdown;
}
namespace CurrentMetrics
@ -4246,10 +4247,10 @@ void StorageReplicatedMergeTree::startup()
return;
}
startupImpl();
startupImpl(/* from_attach_thread */ false);
}
void StorageReplicatedMergeTree::startupImpl()
void StorageReplicatedMergeTree::startupImpl(bool from_attach_thread)
{
/// Do not start replication if ZooKeeper is not configured or there is no metadata in zookeeper
if (!has_metadata_in_zookeeper.has_value() || !*has_metadata_in_zookeeper)
@ -4291,7 +4292,16 @@ void StorageReplicatedMergeTree::startupImpl()
/// It means that failed "startup" must not create any background tasks that we will have to wait.
try
{
shutdown();
/// It's important to avoid a full shutdown here, because a full shutdown also tries to shut down the attach thread,
/// which was designed exactly for this case: to try to start the table when no ZooKeeper connection is available.
if (from_attach_thread)
{
restarting_thread.shutdown(/* part_of_full_shutdown */false);
}
else
{
shutdown();
}
}
catch (...)
{
@ -4311,6 +4321,35 @@ void StorageReplicatedMergeTree::flush()
flushAllInMemoryPartsIfNeeded();
}
/// Stops this table's replication-related background activity.
/// Steps, in order: count the event, mark partial shutdown as requested, drop the
/// `replica_is_active_node` handle, deactivate the background tasks, stop the cleanup and part-check
/// threads, and finally cancel fetches/merges/moves while the background operations assignee finishes.
void StorageReplicatedMergeTree::partialShutdown()
{
ProfileEvents::increment(ProfileEvents::ReplicaPartialShutdown);
/// Set the flag first, then signal the event.
partial_shutdown_called = true;
partial_shutdown_event.set();
replica_is_active_node = nullptr;
LOG_TRACE(log, "Waiting for threads to finish");
/// Deactivate the scheduled background tasks.
merge_selecting_task->deactivate();
queue_updating_task->deactivate();
mutations_updating_task->deactivate();
mutations_finalizing_task->deactivate();
cleanup_thread.stop();
part_check_thread.stop();
/// Stop queue processing
{
/// The objects returned by cancel() are kept alive until the end of this scope, so they outlive finish().
/// Presumably the corresponding operations stay cancelled for that long - verify against the blocker type.
auto fetch_lock = fetcher.blocker.cancel();
auto merge_lock = merger_mutator.merges_blocker.cancel();
auto move_lock = parts_mover.moves_blocker.cancel();
background_operations_assignee.finish();
}
LOG_TRACE(log, "Threads finished");
}
void StorageReplicatedMergeTree::shutdown()
{
if (shutdown_called.exchange(true))
@ -4327,7 +4366,8 @@ void StorageReplicatedMergeTree::shutdown()
if (attach_thread)
attach_thread->shutdown();
restarting_thread.shutdown();
restarting_thread.shutdown(/* part_of_full_shutdown */true);
background_operations_assignee.finish();
part_moves_between_shards_orchestrator.shutdown();
@ -5161,7 +5201,7 @@ void StorageReplicatedMergeTree::restoreMetadataInZooKeeper()
LOG_INFO(log, "Attached all partitions, starting table");
startupImpl();
startupImpl(/* from_attach_thread */ false);
}
void StorageReplicatedMergeTree::dropPartNoWaitNoThrow(const String & part_name)

View File

@ -109,6 +109,7 @@ public:
void startup() override;
void shutdown() override;
void partialShutdown();
void flush() override;
~StorageReplicatedMergeTree() override;
@ -858,7 +859,7 @@ private:
/// If somebody already holding the lock -- return std::nullopt.
std::optional<ZeroCopyLock> tryCreateZeroCopyExclusiveLock(const String & part_name, const DiskPtr & disk) override;
void startupImpl();
void startupImpl(bool from_attach_thread);
};
String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info);