Fix performance introspection.

This commit is contained in:
Nikolai Kochetov 2019-05-14 15:53:20 +03:00
parent 96174e90b0
commit a43f789338

View File

@ -478,107 +478,108 @@ void TCPHandler::processOrdinaryQueryWithProcessors(size_t num_threads)
auto lazy_format = std::make_shared<LazyOutputFormat>(pipeline.getHeader());
pipeline.setOutput(lazy_format);
ThreadPool pool(1);
auto executor = pipeline.execute();
std::atomic_bool exception = false;
auto thread_group = CurrentThread::getGroup();
pool.schedule([&]()
{
CurrentMetrics::Increment query_thread_metric_increment{CurrentMetrics::QueryThread};
setThreadName("QueryPipelineEx");
auto thread_group = CurrentThread::getGroup();
ThreadPool pool(1, std::make_unique<ThreadGroupThreadPoolCallbacks>(thread_group));
auto executor = pipeline.execute();
std::atomic_bool exception = false;
/// Manually attach and detach thread_group in order to collect metrics after pool.wait() call.
if (thread_group)
CurrentThread::attachTo(thread_group);
pool.schedule([&]()
{
CurrentMetrics::Increment query_thread_metric_increment{CurrentMetrics::QueryThread};
setThreadName("QueryPipelineEx");
/// Manually attach and detach thread_group in order to collect metrics after pool.wait() call.
// if (thread_group)
// CurrentThread::attachTo(thread_group);
//
// SCOPE_EXIT(
// if (thread_group)
// CurrentThread::detachQueryIfNotDetached();
// );
ThreadPool inner_pool(num_threads,
std::make_unique<ThreadGroupThreadPoolCallbacks>(thread_group));
try
{
executor->execute(&inner_pool);
}
catch (...)
{
exception = true;
throw;
}
});
/// Wait in case of exception. Delete pipeline to release memory.
SCOPE_EXIT(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
pool.wait();
pipeline = QueryPipeline()
);
ThreadPool inner_pool(num_threads, std::make_unique<ThreadGroupThreadPoolCallbacks>(thread_group));
try
{
executor->execute(&inner_pool);
}
catch (...)
{
exception = true;
throw;
}
});
/// Wait in case of exception. Delete pipeline to release memory.
SCOPE_EXIT(
pool.wait();
pipeline = QueryPipeline()
);
while (true)
{
Block block;
while (true)
{
if (isQueryCancelled())
{
/// A packet was received requesting to stop execution of the request.
executor->cancel();
break;
}
else
{
if (after_send_progress.elapsed() / 1000 >= query_context->getSettingsRef().interactive_delay)
{
/// Some time passed and there is a progress.
after_send_progress.restart();
sendProgress();
}
Block block;
while (true)
{
if (isQueryCancelled())
{
/// A packet was received requesting to stop execution of the request.
executor->cancel();
break;
}
else
{
if (after_send_progress.elapsed() / 1000 >= query_context->getSettingsRef().interactive_delay)
{
/// Some time passed and there is a progress.
after_send_progress.restart();
sendProgress();
}
sendLogs();
if ((block = lazy_format->getBlock(query_context->getSettingsRef().interactive_delay / 1000)))
break;
if (lazy_format->isFinished())
break;
if (exception)
{
pool.wait();
break;
}
}
}
/** If data has run out, we will send the profiling data and total values to
* the last zero block to be able to use
* this information in the suffix output of stream.
* If the request was interrupted, then `sendTotals` and other methods could not be called,
* because we have not read all the data yet,
* and there could be ongoing calculations in other threads at the same time.
*/
if (!block && !isQueryCancelled())
{
pool.wait();
pipeline.finalize();
sendTotals(lazy_format->getTotals());
sendExtremes(lazy_format->getExtremes());
sendProfileInfo(lazy_format->getProfileInfo());
sendProgress();
sendLogs();
if ((block = lazy_format->getBlock(query_context->getSettingsRef().interactive_delay / 1000)))
break;
if (lazy_format->isFinished())
break;
if (exception)
{
pool.wait();
break;
}
}
sendData(block);
if (!block)
break;
}
/** If data has run out, we will send the profiling data and total values to
* the last zero block to be able to use
* this information in the suffix output of stream.
* If the request was interrupted, then `sendTotals` and other methods could not be called,
* because we have not read all the data yet,
* and there could be ongoing calculations in other threads at the same time.
*/
if (!block && !isQueryCancelled())
{
pool.wait();
pipeline.finalize();
sendTotals(lazy_format->getTotals());
sendExtremes(lazy_format->getExtremes());
sendProfileInfo(lazy_format->getProfileInfo());
sendProgress();
sendLogs();
}
sendData(block);
if (!block)
break;
}
pool.wait();
state.io.onFinish();
}