Commit c6f7698f9a (mirror of https://github.com/ClickHouse/ClickHouse.git)
The lambda erase_from_active() captures the item (a TaskRuntimeDataPtr). Most of the code paths are OK, since they explicitly reset item->task. However, one is not: the path that moves the item to the pending list. That list is cleaned up only when the table is DROP/DETACHed, from MergeTreeBackgroundExecutor::removeTasksCorrespondingToStorage(), and in this case, if the IStorage has already been removed, destroying the lambda leads to a use-after-free, since it captures the item by value.

I believe that CI found this issue here [1]:

<details>
<summary>stack trace</summary>

    4 0x268d1354 in DB::ReplicatedMergeTreeQueue::CurrentlyExecuting::~CurrentlyExecuting() build_docker/../src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp:1510:33
    5 0x268ead1a in std::__1::default_delete<DB::ReplicatedMergeTreeQueue::CurrentlyExecuting>::operator()(DB::ReplicatedMergeTreeQueue::CurrentlyExecuting*) const build_docker/../contrib/libcxx/include/__memory/unique_ptr.h:54:5
    6 0x268ead1a in std::__1::unique_ptr<DB::ReplicatedMergeTreeQueue::CurrentlyExecuting, std::__1::default_delete<DB::ReplicatedMergeTreeQueue::CurrentlyExecuting> >::reset(DB::ReplicatedMergeTreeQueue::CurrentlyExecuting*) build_docker/../contrib/libcxx/include/__memory/unique_ptr.h:315:7
    7 0x268ead1a in std::__1::unique_ptr<DB::ReplicatedMergeTreeQueue::CurrentlyExecuting, std::__1::default_delete<DB::ReplicatedMergeTreeQueue::CurrentlyExecuting> >::~unique_ptr() build_docker/../contrib/libcxx/include/__memory/unique_ptr.h:269:19
    8 0x268ead1a in DB::ReplicatedMergeTreeQueue::SelectedEntry::~SelectedEntry() build_docker/../src/Storages/MergeTree/ReplicatedMergeTreeQueue.h:351:12
    9 0x268ead1a in void std::__1::__destroy_at<DB::ReplicatedMergeTreeQueue::SelectedEntry, 0>(DB::ReplicatedMergeTreeQueue::SelectedEntry*) build_docker/../contrib/libcxx/include/__memory/construct_at.h:56:13
    ...
    16 0x265e9abb in DB::MergeTreeBackgroundExecutor<DB::MergeMutateRuntimeQueue>::routine(std::__1::shared_ptr<DB::TaskRuntimeData>)::'lambda'()::~() build_docker/../src/Storages/MergeTree/MergeTreeBackgroundExecutor.cpp:127:30
    17 0x265e9abb in DB::MergeTreeBackgroundExecutor<DB::MergeMutateRuntimeQueue>::routine(std::__1::shared_ptr<DB::TaskRuntimeData>) build_docker/../src/Storages/MergeTree/MergeTreeBackgroundExecutor.cpp:235:1
    18 0x265ea730 in DB::MergeTreeBackgroundExecutor<DB::MergeMutateRuntimeQueue>::threadFunction() build_docker/../src/Storages/MergeTree/MergeTreeBackgroundExecutor.cpp:261:13

</details>

[1]: https://s3.amazonaws.com/clickhouse-test-reports/0/533c7888038453c047df816f3f65292cca05a54f/stateless_tests__ubsan__actions_.html

And I also think that the following issue will be fixed [2]:

<details>
<summary>stack trace</summary>

    [ 680 ] {} <Fatal> : Logical error: 'Memory tracker: allocations not allowed.'.
    ...
    [ 23430 ] {} <Fatal> BaseDaemon: 23. /build/build_docker/../src/Common/formatReadable.cpp:46: formatReadableSizeWithBinarySuffix(double, int) @ 0x1713bb98 in /usr/bin/clickhouse
    ...
    [ 23430 ] {} <Fatal> BaseDaemon: 31. ../src/Common/MemoryTracker.cpp:112: MemoryTracker::logPeakMemoryUsage() @ 0x170e0ab9 in /usr/bin/clickhouse
    [ 23430 ] {} <Fatal> BaseDaemon: 32. /build/build_docker/../src/Common/MemoryTracker.cpp:98: MemoryTracker::~MemoryTracker() @ 0x170e063a in /usr/bin/clickhouse
    [ 23430 ] {} <Fatal> BaseDaemon: 33. /build/build_docker/../src/Storages/MergeTree/MergeList.cpp:144: DB::MergeListElement::~MergeListElement() @ 0x279fb290 in /usr/bin/clickhouse
    ...
    [ 23430 ] {} <Fatal> BaseDaemon: 38. /build/build_docker/../src/Storages/MergeTree/BackgroundProcessList.h:41: DB::BackgroundProcessListEntry<DB::MergeListElement, DB::MergeInfo>::~BackgroundProcessListEntry() @ 0x276ce6c7 in /usr/bin/clickhouse
    [ 23430 ] {} <Fatal> BaseDaemon: 39. /build/build_docker/../contrib/libcxx/include/__memory/unique_ptr.h:54: std::__1::default_delete<DB::BackgroundProcessListEntry<DB::MergeListElement, DB::MergeInfo> >::operator()(DB::BackgroundProcessListEntry<DB::MergeListElement, DB::MergeInfo>*) const @ 0x276ce60b in /usr/bin/clickhouse
    [ 23430 ] {} <Fatal> BaseDaemon: 40. /build/build_docker/../contrib/libcxx/include/__memory/unique_ptr.h:316: std::__1::unique_ptr<DB::BackgroundProcessListEntry<DB::MergeListElement, DB::MergeInfo>, std::__1::default_delete<DB::BackgroundProcessListEntry<DB::MergeListElement, DB::MergeInfo> > >::reset(DB::BackgroundProcessListEntry<DB::MergeListElement, DB::MergeInfo>*) @ 0x276ce57c in /usr/bin/clickhouse
    [ 23430 ] {} <Fatal> BaseDaemon: 41. /build/build_docker/../contrib/libcxx/include/__memory/unique_ptr.h:269: std::__1::unique_ptr<DB::BackgroundProcessListEntry<DB::MergeListElement, DB::MergeInfo>, std::__1::default_delete<DB::BackgroundProcessListEntry<DB::MergeListElement, DB::MergeInfo> > >::~unique_ptr() @ 0x276ce399 in /usr/bin/clickhouse
    [ 23430 ] {} <Fatal> BaseDaemon: 42. /build/build_docker/../src/Storages/MergeTree/MutatePlainMergeTreeTask.h:22: DB::MutatePlainMergeTreeTask::~MutatePlainMergeTreeTask() @ 0x27defceb in /usr/bin/clickhouse
    [ 23430 ] {} <Fatal> BaseDaemon: 43. /build/build_docker/../contrib/libcxx/include/__memory/construct_at.h:57: void std::__1::__destroy_at<DB::MutatePlainMergeTreeTask, 0>(DB::MutatePlainMergeTreeTask*) @ 0x27dd69c1 in /usr/bin/clickhouse
    [ 23430 ] {} <Fatal> BaseDaemon: 44. /build/build_docker/../contrib/libcxx/include/__memory/construct_at.h:82: void std::__1::destroy_at<DB::MutatePlainMergeTreeTask, 0>(DB::MutatePlainMergeTreeTask*) @ 0x27dd6955 in /usr/bin/clickhouse
    [ 23430 ] {} <Fatal> BaseDaemon: Integrity check of the executable skipped because the reference checksum could not be read. (calculated checksum: 91F5937571C11255DFE73230B52CE9C0)
    [ 602 ] {} <Fatal> Application: Child process was terminated by signal 6.

</details>

[2]: https://s3.amazonaws.com/clickhouse-test-reports/39222/a068c397dfd7943359a8b554566c3c70b78baf8d/stateless_tests__debug__actions__%5B1/3%5D.html

Refs: https://github.com/ClickHouse/ClickHouse/pull/29614#discussion_r720455032 (cc @nikitamikhaylov)

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
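To make the lifetime problem concrete, here is a minimal, self-contained sketch of the same capture-by-value hazard. `Storage`, `Task`, and the `storage.alive` flag are simplified, hypothetical stand-ins for IStorage/TaskRuntimeData (not the real types), and `std::function` is used only to make the lambda's destruction point explicit:

```cpp
#include <functional>
#include <iostream>
#include <memory>

/// Simplified stand-ins for IStorage / TaskRuntimeData (illustration only).
struct Storage { bool alive = true; };
Storage storage; /// what the task touches when it is destroyed

struct Task
{
    /// The real task destructor touches storage state; here we only read a flag.
    ~Task() { std::cout << (storage.alive ? "ok" : "use-after-free: storage already gone") << '\n'; }
};

int main()
{
    auto item = std::make_shared<Task>();

    /// BUG pattern: capturing the shared_ptr by value makes the lambda a co-owner of the task.
    std::function<void()> erase_from_active = [item] { /* would erase item from `active` */ };

    item.reset();          /// the DROP/DETACH cleanup path drops the queue's reference...
    storage.alive = false; /// ...and the storage is destroyed.

    /// The lambda still holds the last reference, so ~Task() only runs here,
    /// after the storage is gone.
    erase_from_active = nullptr;
}
```

The fix is visible in the file below: erase_from_active now captures item by reference (`[this, &item]`), so the lambda no longer extends the lifetime of the task.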
src/Storages/MergeTree/MergeTreeBackgroundExecutor.cpp (276 lines, 7.9 KiB, C++)
#include <Storages/MergeTree/MergeTreeBackgroundExecutor.h>

#include <algorithm>

#include <Common/setThreadName.h>
#include <Common/Exception.h>
#include <Storages/MergeTree/BackgroundJobsAssignee.h>
#include <Common/noexcept_scope.h>


namespace DB
{

namespace ErrorCodes
{
    extern const int ABORTED;
}


template <class Queue>
void MergeTreeBackgroundExecutor<Queue>::wait()
{
    {
        std::lock_guard lock(mutex);
        shutdown = true;
        has_tasks.notify_all();
    }

    pool.wait();
}

template <class Queue>
void MergeTreeBackgroundExecutor<Queue>::increaseThreadsAndMaxTasksCount(size_t new_threads_count, size_t new_max_tasks_count)
{
    std::lock_guard lock(mutex);

    /// Do not throw any exceptions from global pool. Just log a warning and silently return.
    if (new_threads_count < threads_count)
    {
        LOG_WARNING(log, "Loaded new threads count for {}Executor from top level config, but new value ({}) is not greater than current {}", name, new_threads_count, threads_count);
        return;
    }

    if (new_max_tasks_count < max_tasks_count)
    {
        LOG_WARNING(log, "Loaded new max tasks count for {}Executor from top level config, but new value ({}) is not greater than current {}", name, new_max_tasks_count, max_tasks_count);
        return;
    }

    LOG_INFO(log, "Loaded new threads count ({}) and max tasks count ({}) for {}Executor", new_threads_count, new_max_tasks_count, name);

    pending.setCapacity(new_max_tasks_count);
    active.set_capacity(new_max_tasks_count);

    pool.setMaxThreads(std::max(1UL, new_threads_count));
    pool.setMaxFreeThreads(std::max(1UL, new_threads_count));
    pool.setQueueSize(std::max(1UL, new_threads_count));

    for (size_t number = threads_count; number < new_threads_count; ++number)
        pool.scheduleOrThrowOnError([this] { threadFunction(); });

    max_tasks_count = new_max_tasks_count;
    threads_count = new_threads_count;
}

template <class Queue>
size_t MergeTreeBackgroundExecutor<Queue>::getMaxTasksCount() const
{
    std::lock_guard lock(mutex);
    return max_tasks_count;
}

template <class Queue>
bool MergeTreeBackgroundExecutor<Queue>::trySchedule(ExecutableTaskPtr task)
{
    std::lock_guard lock(mutex);

    if (shutdown)
        return false;

    auto & value = CurrentMetrics::values[metric];
    if (value.load() >= static_cast<int64_t>(max_tasks_count))
        return false;

    pending.push(std::make_shared<TaskRuntimeData>(std::move(task), metric));

    has_tasks.notify_one();
    return true;
}
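

/// Called when a table is DROP/DETACHed: drop the storage's pending tasks and
/// wait for its currently active ones to finish, so that task destruction is
/// ordered with storage destruction (see the commit message above).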
template <class Queue>
void MergeTreeBackgroundExecutor<Queue>::removeTasksCorrespondingToStorage(StorageID id)
{
    std::vector<TaskRuntimeDataPtr> tasks_to_wait;
    {
        std::lock_guard lock(mutex);

        /// Erase storage related tasks from pending and select active tasks to wait for
        pending.remove(id);

        /// Copy items to wait for their completion
        std::copy_if(active.begin(), active.end(), std::back_inserter(tasks_to_wait),
            [&] (auto item) -> bool { return item->task->getStorageID() == id; });

        for (auto & item : tasks_to_wait)
            item->is_currently_deleting = true;
    }

    /// Wait for each task to be executed
    for (auto & item : tasks_to_wait)
    {
        item->is_done.wait();
        item.reset();
    }
}


template <class Queue>
void MergeTreeBackgroundExecutor<Queue>::routine(TaskRuntimeDataPtr item)
{
    /// FIXME Review exception-safety of this, remove NOEXCEPT_SCOPE and ALLOW_ALLOCATIONS_IN_SCOPE if possible
    DENY_ALLOCATIONS_IN_SCOPE;

    /// All operations with queues are considered not to do any allocations
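
    /// Note: `item` is captured by reference on purpose. Capturing the shared_ptr
    /// by value would let this lambda extend the lifetime of TaskRuntimeData, and
    /// destroying the lambda after the storage had been removed (DROP/DETACH)
    /// caused a use-after-free (see the commit message above).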
    auto erase_from_active = [this, &item]() TSA_REQUIRES(mutex)
    {
        active.erase(std::remove(active.begin(), active.end(), item), active.end());
    };

    bool need_execute_again = false;

    try
    {
        ALLOW_ALLOCATIONS_IN_SCOPE;
        need_execute_again = item->task->executeStep();
    }
    catch (const Exception & e)
    {
        NOEXCEPT_SCOPE({
            ALLOW_ALLOCATIONS_IN_SCOPE;
            if (e.code() == ErrorCodes::ABORTED) /// Cancelled merging parts is not an error - log as info.
                LOG_INFO(log, fmt::runtime(getCurrentExceptionMessage(false)));
            else
                tryLogCurrentException(__PRETTY_FUNCTION__);
        });
    }
    catch (...)
    {
        NOEXCEPT_SCOPE({
            ALLOW_ALLOCATIONS_IN_SCOPE;
            tryLogCurrentException(__PRETTY_FUNCTION__);
        });
    }

    if (need_execute_again)
    {
        std::lock_guard guard(mutex);
        erase_from_active();

        if (item->is_currently_deleting)
        {
            /// Destroy the task before signalling `is_done`: this orders the task's
            /// destructor before the storage's destruction in removeTasksCorrespondingToStorage().
            {
                NOEXCEPT_SCOPE({
                    ALLOW_ALLOCATIONS_IN_SCOPE;
                    item->task.reset();
                });
            }
            item->is_done.set();
            item = nullptr;
            return;
        }

        /// After the `guard` destruction, `item` has to be in a moved-from state
        /// and not own the object it points to. Otherwise the destruction of the
        /// task won't be ordered with the destruction of the storage.
        pending.push(std::move(item));
        has_tasks.notify_one();
        item = nullptr;
        return;
    }

    {
        std::lock_guard guard(mutex);
        erase_from_active();
        has_tasks.notify_one();

        try
        {
            ALLOW_ALLOCATIONS_IN_SCOPE;
            /// In a situation of a lack of memory this method can throw an exception,
            /// because it may interact somehow with BackgroundSchedulePool, which may allocate memory.
            /// But it is rather safe, because we have a try...catch block here, and another one in ThreadPool.
            item->task->onCompleted();
        }
        catch (const Exception & e)
        {
            NOEXCEPT_SCOPE({
                ALLOW_ALLOCATIONS_IN_SCOPE;
                if (e.code() == ErrorCodes::ABORTED) /// Cancelled merging parts is not an error - log as info.
                    LOG_INFO(log, fmt::runtime(getCurrentExceptionMessage(false)));
                else
                    tryLogCurrentException(__PRETTY_FUNCTION__);
            });
        }
        catch (...)
        {
            NOEXCEPT_SCOPE({
                ALLOW_ALLOCATIONS_IN_SCOPE;
                tryLogCurrentException(__PRETTY_FUNCTION__);
            });
        }


        /// We have to call reset() under a lock, otherwise a race is possible.
        /// Imagine that the task is finally completed (the last execution returned false),
        /// and we removed the task from both queues, but still hold a pointer.
        /// The thread that shuts down the storage will scan the queues to find tasks to wait for, but will find nothing.
        /// So the destructor of the task and the destructor of the storage would run concurrently.
        {
            NOEXCEPT_SCOPE({
                ALLOW_ALLOCATIONS_IN_SCOPE;
                item->task.reset();
            });
        }

        item->is_done.set();
        item = nullptr;
    }
}
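

/// Worker loop: wait for a task in `pending`, move it to `active`, and run one
/// execution step via routine().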
template <class Queue>
void MergeTreeBackgroundExecutor<Queue>::threadFunction()
{
    setThreadName(name.c_str());

    DENY_ALLOCATIONS_IN_SCOPE;

    while (true)
    {
        try
        {
            TaskRuntimeDataPtr item;
            {
                std::unique_lock lock(mutex);
                has_tasks.wait(lock, [this]() TSA_REQUIRES(mutex) { return !pending.empty() || shutdown; });

                if (shutdown)
                    break;

                item = std::move(pending.pop());
                active.push_back(item);
            }

            routine(std::move(item));
        }
        catch (...)
        {
            NOEXCEPT_SCOPE({
                ALLOW_ALLOCATIONS_IN_SCOPE;
                tryLogCurrentException(__PRETTY_FUNCTION__);
            });
        }
    }
}


template class MergeTreeBackgroundExecutor<MergeMutateRuntimeQueue>;
template class MergeTreeBackgroundExecutor<OrdinaryRuntimeQueue>;

}