mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-17 20:02:05 +00:00
Merge pull request #11420 from ClickHouse/fix-msan-failure
Fix MSan failure in MergeTree background task
This commit is contained in:
commit
0a453f5c83
@ -690,6 +690,10 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
|
||||
/// Then background task is created by "startup" method. And when destructor of a table object is called, background task is still active,
|
||||
/// and the task will use references to freed data.
|
||||
|
||||
/// Also note that "startup" method is exception-safe. If exception is thrown from "startup",
|
||||
/// we can safely destroy the object without a call to "shutdown", because there is guarantee
|
||||
/// that no background threads/similar resources remain after exception from "startup".
|
||||
|
||||
res->startup();
|
||||
return true;
|
||||
}
|
||||
|
@ -450,7 +450,6 @@ void StorageBuffer::startup()
|
||||
LOG_WARNING(log, "Storage {} is run with readonly settings, it will not be able to insert data. Set appropriate system_profile to fix this.", getName());
|
||||
}
|
||||
|
||||
|
||||
flush_handle = bg_pool.createTask(log->name() + "/Bg", [this]{ flushBack(); });
|
||||
flush_handle->activateAndSchedule();
|
||||
}
|
||||
|
@ -95,16 +95,36 @@ void StorageMergeTree::startup()
|
||||
/// NOTE background task will also do the above cleanups periodically.
|
||||
time_after_previous_cleanup.restart();
|
||||
|
||||
auto & merge_pool = global_context.getBackgroundPool();
|
||||
merging_mutating_task_handle = merge_pool.createTask([this] { return mergeMutateTask(); });
|
||||
/// Ensure that thread started only after assignment to 'merging_mutating_task_handle' is done.
|
||||
merge_pool.startTask(merging_mutating_task_handle);
|
||||
|
||||
if (areBackgroundMovesNeeded())
|
||||
try
|
||||
{
|
||||
auto & move_pool = global_context.getBackgroundMovePool();
|
||||
moving_task_handle = move_pool.createTask([this] { return movePartsTask(); });
|
||||
move_pool.startTask(moving_task_handle);
|
||||
auto & merge_pool = global_context.getBackgroundPool();
|
||||
merging_mutating_task_handle = merge_pool.createTask([this] { return mergeMutateTask(); });
|
||||
/// Ensure that thread started only after assignment to 'merging_mutating_task_handle' is done.
|
||||
merge_pool.startTask(merging_mutating_task_handle);
|
||||
|
||||
if (areBackgroundMovesNeeded())
|
||||
{
|
||||
auto & move_pool = global_context.getBackgroundMovePool();
|
||||
moving_task_handle = move_pool.createTask([this] { return movePartsTask(); });
|
||||
move_pool.startTask(moving_task_handle);
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
/// Exception safety: failed "startup" does not require a call to "shutdown" from the caller.
|
||||
/// And it should be able to safely destroy table after exception in "startup" method.
|
||||
/// It means that failed "startup" must not create any background tasks that we will have to wait.
|
||||
try
|
||||
{
|
||||
shutdown();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
std::terminate();
|
||||
}
|
||||
|
||||
/// Note: after failed "startup", the table will be in a state that only allows to destroy the object.
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2907,44 +2907,60 @@ void StorageReplicatedMergeTree::startup()
|
||||
if (is_readonly)
|
||||
return;
|
||||
|
||||
queue.initialize(
|
||||
zookeeper_path, replica_path,
|
||||
getStorageID().getFullTableName() + " (ReplicatedMergeTreeQueue)",
|
||||
getDataParts());
|
||||
|
||||
data_parts_exchange_endpoint = std::make_shared<DataPartsExchange::Service>(*this);
|
||||
global_context.getInterserverIOHandler().addEndpoint(data_parts_exchange_endpoint->getId(replica_path), data_parts_exchange_endpoint);
|
||||
|
||||
/// In this thread replica will be activated.
|
||||
restarting_thread.start();
|
||||
|
||||
/// Wait while restarting_thread initializes LeaderElection (and so on) or makes first attmept to do it
|
||||
startup_event.wait();
|
||||
|
||||
/// If we don't separate create/start steps, race condition will happen
|
||||
/// between the assignment of queue_task_handle and queueTask that use the queue_task_handle.
|
||||
try
|
||||
{
|
||||
auto lock = queue.lockQueue();
|
||||
auto & pool = global_context.getBackgroundPool();
|
||||
queue_task_handle = pool.createTask([this] { return queueTask(); });
|
||||
pool.startTask(queue_task_handle);
|
||||
}
|
||||
queue.initialize(
|
||||
zookeeper_path, replica_path,
|
||||
getStorageID().getFullTableName() + " (ReplicatedMergeTreeQueue)",
|
||||
getDataParts());
|
||||
|
||||
if (areBackgroundMovesNeeded())
|
||||
{
|
||||
auto & pool = global_context.getBackgroundMovePool();
|
||||
move_parts_task_handle = pool.createTask([this] { return movePartsTask(); });
|
||||
pool.startTask(move_parts_task_handle);
|
||||
data_parts_exchange_endpoint = std::make_shared<DataPartsExchange::Service>(*this);
|
||||
global_context.getInterserverIOHandler().addEndpoint(data_parts_exchange_endpoint->getId(replica_path), data_parts_exchange_endpoint);
|
||||
|
||||
/// In this thread replica will be activated.
|
||||
restarting_thread.start();
|
||||
|
||||
/// Wait while restarting_thread initializes LeaderElection (and so on) or makes first attmept to do it
|
||||
startup_event.wait();
|
||||
|
||||
/// If we don't separate create/start steps, race condition will happen
|
||||
/// between the assignment of queue_task_handle and queueTask that use the queue_task_handle.
|
||||
{
|
||||
auto lock = queue.lockQueue();
|
||||
auto & pool = global_context.getBackgroundPool();
|
||||
queue_task_handle = pool.createTask([this] { return queueTask(); });
|
||||
pool.startTask(queue_task_handle);
|
||||
}
|
||||
|
||||
if (areBackgroundMovesNeeded())
|
||||
{
|
||||
auto & pool = global_context.getBackgroundMovePool();
|
||||
move_parts_task_handle = pool.createTask([this] { return movePartsTask(); });
|
||||
pool.startTask(move_parts_task_handle);
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
/// Exception safety: failed "startup" does not require a call to "shutdown" from the caller.
|
||||
/// And it should be able to safely destroy table after exception in "startup" method.
|
||||
/// It means that failed "startup" must not create any background tasks that we will have to wait.
|
||||
try
|
||||
{
|
||||
shutdown();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
std::terminate();
|
||||
}
|
||||
|
||||
/// Note: after failed "startup", the table will be in a state that only allows to destroy the object.
|
||||
throw;
|
||||
}
|
||||
need_shutdown.store(true);
|
||||
}
|
||||
|
||||
|
||||
void StorageReplicatedMergeTree::shutdown()
|
||||
{
|
||||
if (!need_shutdown.load())
|
||||
return;
|
||||
|
||||
clearOldPartsFromFilesystem(true);
|
||||
/// Cancel fetches, merges and mutations to force the queue_task to finish ASAP.
|
||||
fetcher.blocker.cancelForever();
|
||||
@ -2981,7 +2997,6 @@ void StorageReplicatedMergeTree::shutdown()
|
||||
std::unique_lock lock(data_parts_exchange_endpoint->rwlock);
|
||||
}
|
||||
data_parts_exchange_endpoint.reset();
|
||||
need_shutdown.store(false);
|
||||
}
|
||||
|
||||
|
||||
|
@ -288,8 +288,6 @@ private:
|
||||
/// True if replica was created for existing table with fixed granularity
|
||||
bool other_replicas_fixed_granularity = false;
|
||||
|
||||
std::atomic_bool need_shutdown{false};
|
||||
|
||||
template <class Func>
|
||||
void foreachCommittedParts(const Func & func) const;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user