Use ThreadPool to ensure that everything has been finished in PipelineExecutor

This fixes a possible use-after-free (UAF): ThreadFromGlobalPool cannot wait
until the job has been reset. For that you need to call ThreadPool::wait()
(which waits until scheduled_jobs drops to zero), and this guarantees that
the job has been reset. Otherwise some variables that the job references
may already be destroyed while the job itself hasn't been destroyed yet (an
example that I found: PipelineExecutor -> ThreadGroupStatusPtr -> MemoryTracker
-> ~MemoryTracker -> log, while log had already been destroyed; here is the
TSAN report --
https://gist.github.com/azat/d480dc3af5a0a44de4b038e20807c4b9, copied
from
https://clickhouse-test-reports.s3.yandex.net/15035/79133a426fdf042e383ea0cdccc4dc8273baa3a7/functional_stateless_tests_(thread)/test_run.txt.out.log)
This commit is contained in:
Azat Khuzhin 2020-09-22 12:15:57 +03:00
parent 789434994a
commit 1fcc36ef80

View File

@ -694,9 +694,7 @@ void PipelineExecutor::executeImpl(size_t num_threads)
{ {
initializeExecution(num_threads); initializeExecution(num_threads);
using ThreadsData = std::vector<ThreadFromGlobalPool>; ThreadPool threads(num_threads);
ThreadsData threads;
threads.reserve(num_threads);
bool finished_flag = false; bool finished_flag = false;
@ -704,10 +702,7 @@ void PipelineExecutor::executeImpl(size_t num_threads)
if (!finished_flag) if (!finished_flag)
{ {
finish(); finish();
threads.wait();
for (auto & thread : threads)
if (thread.joinable())
thread.join();
} }
); );
@ -717,7 +712,7 @@ void PipelineExecutor::executeImpl(size_t num_threads)
for (size_t i = 0; i < num_threads; ++i) for (size_t i = 0; i < num_threads; ++i)
{ {
threads.emplace_back([this, thread_group, thread_num = i, num_threads] threads.scheduleOrThrowOnError([this, thread_group, thread_num = i, num_threads]
{ {
/// ThreadStatus thread_status; /// ThreadStatus thread_status;
@ -744,9 +739,14 @@ void PipelineExecutor::executeImpl(size_t num_threads)
}); });
} }
for (auto & thread : threads) /// ThreadPool::wait() waits until scheduled_jobs drops to zero,
if (thread.joinable()) /// which guarantees that the job has been reset; otherwise
thread.join(); /// some variables that the job references may already be destroyed
/// while the job hasn't been destroyed yet (an example that pops up --
/// PipelineExecutor -> ThreadGroupStatusPtr -> MemoryTracker ->
/// ~MemoryTracker -> log, while log had been destroyed already)
/// (see 01505_pipeline_executor_UAF)
threads.wait();
} }
else else
executeSingleThread(0, num_threads); executeSingleThread(0, num_threads);