mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 00:30:49 +00:00
proper destruction order of AsyncLoader::Pool fields
This commit is contained in:
parent
bd4f8524bf
commit
d2d72794a1
@ -49,6 +49,7 @@ void logAboutProgress(LoggerPtr log, size_t processed, size_t total, AtomicStopw
|
|||||||
AsyncLoader::Pool::Pool(const AsyncLoader::PoolInitializer & init)
|
AsyncLoader::Pool::Pool(const AsyncLoader::PoolInitializer & init)
|
||||||
: name(init.name)
|
: name(init.name)
|
||||||
, priority(init.priority)
|
, priority(init.priority)
|
||||||
|
, max_threads(init.max_threads > 0 ? init.max_threads : getNumberOfPhysicalCPUCores())
|
||||||
, thread_pool(std::make_unique<ThreadPool>(
|
, thread_pool(std::make_unique<ThreadPool>(
|
||||||
init.metric_threads,
|
init.metric_threads,
|
||||||
init.metric_active_threads,
|
init.metric_active_threads,
|
||||||
@ -56,17 +57,16 @@ AsyncLoader::Pool::Pool(const AsyncLoader::PoolInitializer & init)
|
|||||||
/* max_threads = */ std::numeric_limits<size_t>::max(), // Unlimited number of threads, we do worker management ourselves
|
/* max_threads = */ std::numeric_limits<size_t>::max(), // Unlimited number of threads, we do worker management ourselves
|
||||||
/* max_free_threads = */ 0, // We do not require free threads
|
/* max_free_threads = */ 0, // We do not require free threads
|
||||||
/* queue_size = */0)) // Unlimited queue to avoid blocking during worker spawning
|
/* queue_size = */0)) // Unlimited queue to avoid blocking during worker spawning
|
||||||
, max_threads(init.max_threads > 0 ? init.max_threads : getNumberOfPhysicalCPUCores())
|
|
||||||
{}
|
{}
|
||||||
|
|
||||||
AsyncLoader::Pool::Pool(Pool&& o) noexcept
|
AsyncLoader::Pool::Pool(Pool&& o) noexcept
|
||||||
: name(o.name)
|
: name(o.name)
|
||||||
, priority(o.priority)
|
, priority(o.priority)
|
||||||
, thread_pool(std::move(o.thread_pool))
|
|
||||||
, ready_queue(std::move(o.ready_queue))
|
, ready_queue(std::move(o.ready_queue))
|
||||||
, max_threads(o.max_threads)
|
, max_threads(o.max_threads)
|
||||||
, workers(o.workers)
|
, workers(o.workers)
|
||||||
, suspended_workers(o.suspended_workers.load()) // All these constructors are needed because std::atomic is neither copy-constructible, nor move-constructible. We never move pools after init, so it is safe.
|
, suspended_workers(o.suspended_workers.load()) // All these constructors are needed because std::atomic is neither copy-constructible, nor move-constructible. We never move pools after init, so it is safe.
|
||||||
|
, thread_pool(std::move(o.thread_pool))
|
||||||
{}
|
{}
|
||||||
|
|
||||||
void cancelOnDependencyFailure(const LoadJobPtr & self, const LoadJobPtr & dependency, std::exception_ptr & cancel)
|
void cancelOnDependencyFailure(const LoadJobPtr & self, const LoadJobPtr & dependency, std::exception_ptr & cancel)
|
||||||
|
@ -365,11 +365,11 @@ private:
|
|||||||
{
|
{
|
||||||
const String name;
|
const String name;
|
||||||
const Priority priority;
|
const Priority priority;
|
||||||
std::unique_ptr<ThreadPool> thread_pool; // NOTE: we avoid using a `ThreadPool` queue to be able to move jobs between pools.
|
|
||||||
std::map<UInt64, LoadJobPtr> ready_queue; // FIFO queue of jobs to be executed in this pool. Map is used for faster erasing. Key is `ready_seqno`
|
std::map<UInt64, LoadJobPtr> ready_queue; // FIFO queue of jobs to be executed in this pool. Map is used for faster erasing. Key is `ready_seqno`
|
||||||
size_t max_threads; // Max number of workers to be spawn
|
size_t max_threads; // Max number of workers to be spawn
|
||||||
size_t workers = 0; // Number of currently executing workers
|
size_t workers = 0; // Number of currently executing workers
|
||||||
std::atomic<size_t> suspended_workers{0}; // Number of workers that are blocked by `wait()` call on a job executing in the same pool (for deadlock resolution)
|
std::atomic<size_t> suspended_workers{0}; // Number of workers that are blocked by `wait()` call on a job executing in the same pool (for deadlock resolution)
|
||||||
|
std::unique_ptr<ThreadPool> thread_pool; // NOTE: we avoid using a `ThreadPool` queue to be able to move jobs between pools.
|
||||||
|
|
||||||
explicit Pool(const PoolInitializer & init);
|
explicit Pool(const PoolInitializer & init);
|
||||||
Pool(Pool&& o) noexcept;
|
Pool(Pool&& o) noexcept;
|
||||||
|
Loading…
Reference in New Issue
Block a user