Attach to current thread in threadpool.

This commit is contained in:
Nikolai Kochetov 2019-04-29 18:01:53 +03:00
parent aacc0572e5
commit 1c0cf652ef
6 changed files with 78 additions and 18 deletions

View File

@ -32,6 +32,7 @@
#include <Compression/CompressionFactory.h>
#include <Processors/Formats/LazyOutputFormat.h>
#include <Interpreters/ThreadGroupThreadPoolCallbacks.h>
#include "TCPHandler.h"
@ -487,6 +488,7 @@ void TCPHandler::processOrdinaryQueryWithProcessors(size_t num_threads)
CurrentMetrics::Increment query_thread_metric_increment{CurrentMetrics::QueryThread};
setThreadName("QueryPipelineEx");
/// Manually attach and detach thread_group in order to collect metrics after pool.wait() call.
if (thread_group)
CurrentThread::attachTo(thread_group);
@ -495,7 +497,7 @@ void TCPHandler::processOrdinaryQueryWithProcessors(size_t num_threads)
CurrentThread::detachQueryIfNotDetached();
);
ThreadPool inner_pool(num_threads, thread_group);
ThreadPool inner_pool(num_threads, std::make_unique<ThreadGroupThreadPoolCallbacks>(thread_group));
try
{

View File

@ -17,14 +17,14 @@ namespace DB
template <typename Thread>
ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t max_threads, DB::ThreadGroupStatusPtr thread_group)
: ThreadPoolImpl(max_threads, max_threads, max_threads, thread_group)
ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t max_threads, ThreadPoolCallbacksPtr callbacks_)
: ThreadPoolImpl(max_threads, max_threads, max_threads, std::move(callbacks_))
{
}
template <typename Thread>
ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t max_threads, size_t max_free_threads, size_t queue_size, DB::ThreadGroupStatusPtr thread_group)
: max_threads(max_threads), max_free_threads(max_free_threads), queue_size(queue_size), thread_group(thread_group)
ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t max_threads, size_t max_free_threads, size_t queue_size, ThreadPoolCallbacksPtr callbacks_)
: max_threads(max_threads), max_free_threads(max_free_threads), queue_size(queue_size), callbacks(std::move(callbacks_))
{
}
@ -142,12 +142,12 @@ size_t ThreadPoolImpl<Thread>::active() const
template <typename Thread>
void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_it)
{
if (thread_group)
DB::CurrentThread::attachToIfDetached(thread_group);
if (callbacks)
callbacks->onThreadStart();
SCOPE_EXIT(
if (thread_group)
DB::CurrentThread::detachQueryIfNotDetached()
if (callbacks)
callbacks->onThreadFinish();
);
while (true)

View File

@ -13,11 +13,18 @@
#include <Poco/Event.h>
#include <Common/ThreadStatus.h>
namespace DB
/// Callbacks which can be passed to ThreadPool.
/// Are used to calculate metrics on thread.
class ThreadPoolCallbacks
{
class ThreadGroupStatus;
using ThreadGroupStatusPtr = std::shared_ptr<ThreadGroupStatus>;
}
public:
virtual void onThreadStart() = 0;
virtual void onThreadFinish() = 0;
virtual ~ThreadPoolCallbacks() = default;
};
using ThreadPoolCallbacksPtr = std::unique_ptr<ThreadPoolCallbacks>;
/** Very simple thread pool similar to boost::threadpool.
* Advantages:
@ -36,10 +43,10 @@ public:
using Job = std::function<void()>;
/// Size is constant. Up to num_threads are created on demand and then run until shutdown.
explicit ThreadPoolImpl(size_t max_threads, DB::ThreadGroupStatusPtr thread_group = nullptr);
explicit ThreadPoolImpl(size_t max_threads, ThreadPoolCallbacksPtr callbacks_ = nullptr);
/// queue_size - maximum number of running plus scheduled jobs. It can be greater than max_threads. Zero means unlimited.
ThreadPoolImpl(size_t max_threads, size_t max_free_threads, size_t queue_size, DB::ThreadGroupStatusPtr thread_group = nullptr);
ThreadPoolImpl(size_t max_threads, size_t max_free_threads, size_t queue_size, ThreadPoolCallbacksPtr callbacks_ = nullptr);
/// Add new job. Locks until number of scheduled jobs is less than maximum or exception in one of threads was thrown.
/// If an exception in some thread was thrown, method silently returns, and exception will be rethrown only on call to 'wait' function.
@ -95,7 +102,7 @@ private:
std::list<Thread> threads;
std::exception_ptr first_exception;
DB::ThreadGroupStatusPtr thread_group;
ThreadPoolCallbacksPtr callbacks;
template <typename ReturnType>
ReturnType scheduleImpl(Job job, int priority, std::optional<uint64_t> wait_microseconds);

View File

@ -0,0 +1,24 @@
#include <Interpreters/ThreadGroupThreadPoolCallbacks.h>
#include <Common/CurrentThread.h>
namespace DB
{
ThreadGroupThreadPoolCallbacks::ThreadGroupThreadPoolCallbacks(ThreadGroupStatusPtr thread_group_)
: thread_group(std::move(thread_group_))
{
}
void ThreadGroupThreadPoolCallbacks::onThreadStart()
{
if (thread_group)
DB::CurrentThread::attachToIfDetached(thread_group);
}
void ThreadGroupThreadPoolCallbacks::onThreadFinish()
{
if (thread_group)
DB::CurrentThread::detachQueryIfNotDetached();
}
}

View File

@ -0,0 +1,25 @@
#pragma once
#include <Common/ThreadPool.h>
namespace DB
{
class ThreadGroupStatus;
using ThreadGroupStatusPtr = std::shared_ptr<ThreadGroupStatus>;
/// Attach/detach thread from ThreadPool to specified thread_group.
class ThreadGroupThreadPoolCallbacks : public ThreadPoolCallbacks
{
public:
explicit ThreadGroupThreadPoolCallbacks(ThreadGroupStatusPtr thread_group_);
void onThreadStart() override;
void onThreadFinish() override;
private:
ThreadGroupStatusPtr thread_group;
};
}

View File

@ -27,7 +27,8 @@
#include <Interpreters/QueryLog.h>
#include <Interpreters/InterpreterSetQuery.h>
#include <Interpreters/executeQuery.h>
#include "DNSCacheUpdater.h"
#include <Interpreters/DNSCacheUpdater.h>
#include <Interpreters/ThreadGroupThreadPoolCallbacks.h>
#include <Processors/Transforms/LimitsCheckingTransform.h>
#include <Processors/Transforms/MaterializingTransform.h>
@ -624,7 +625,8 @@ void executeQuery(
auto executor = pipeline.execute();
{
ThreadPool pool(context.getSettingsRef().max_threads, CurrentThread::getGroup());
ThreadPool pool(context.getSettingsRef().max_threads,
std::make_unique<ThreadGroupThreadPoolCallbacks>(CurrentThread::getGroup()));
executor->execute(&pool);
}
pipeline.finalize();