Undo add thread_group to ThreadPool

This commit is contained in:
Nikolai Kochetov 2019-04-18 21:40:55 +03:00
parent 36392aab6b
commit a07b592a95
3 changed files with 12 additions and 19 deletions

View File

@ -477,15 +477,24 @@ void TCPHandler::processOrdinaryQueryWithProcessors(size_t num_threads)
auto lazy_format = std::make_shared<LazyOutputFormat>(pipeline.getHeader());
pipeline.setOutput(lazy_format);
ThreadPool pool(1, 1, 1, CurrentThread::getGroup());
ThreadPool pool(1, 1, 1);
auto executor = pipeline.execute(num_threads);
bool exception = false;
auto thread_group = CurrentThread::getGroup();
pool.schedule([&]()
{
CurrentMetrics::Increment query_thread_metric_increment{CurrentMetrics::QueryThread};
setThreadName("QueryPipelineEx");
if (thread_group)
CurrentThread::attachTo(thread_group);
SCOPE_EXIT(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
try
{
executor->execute();

View File

@ -143,14 +143,6 @@ size_t ThreadPoolImpl<Thread>::active() const
template <typename Thread>
void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_it)
{
if (thread_group)
DB::CurrentThread::attachTo(thread_group);
SCOPE_EXIT(
if (thread_group)
DB::CurrentThread::detachQueryIfNotDetached()
);
while (true)
{
Job job;

View File

@ -13,13 +13,6 @@
#include <Poco/Event.h>
#include <Common/ThreadStatus.h>
namespace DB
{
class ThreadGroupStatus;
using ThreadGroupStatusPtr = std::shared_ptr<ThreadGroupStatus>;
}
/** Very simple thread pool similar to boost::threadpool.
* Advantages:
@ -38,11 +31,10 @@ public:
using Job = std::function<void()>;
/// Size is constant. Up to num_threads are created on demand and then run until shutdown.
explicit ThreadPoolImpl(size_t max_threads, DB::ThreadGroupStatusPtr thread_group_ = nullptr);
explicit ThreadPoolImpl(size_t max_threads);
/// queue_size - maximum number of running plus scheduled jobs. It can be greater than max_threads. Zero means unlimited.
/// Specify thread_group if thread pool is used for concrete query execution (or leave it it nullptr and attach to thread group manually).
ThreadPoolImpl(size_t max_threads, size_t max_free_threads, size_t queue_size, DB::ThreadGroupStatusPtr thread_group_ = nullptr);
ThreadPoolImpl(size_t max_threads, size_t max_free_threads, size_t queue_size);
/// Add new job. Locks until number of scheduled jobs is less than maximum or exception in one of threads was thrown.
/// If an exception in some thread was thrown, method silently returns, and exception will be rethrown only on call to 'wait' function.