mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 17:12:03 +00:00
Update TCPHandler.
This commit is contained in:
parent
0e906b29e1
commit
f431b10e38
@ -591,11 +591,9 @@ void TCPHandler::processOrdinaryQueryWithProcessors(size_t num_threads)
|
||||
}
|
||||
});
|
||||
|
||||
/// Wait in case of exception. Delete pipeline to release memory.
|
||||
/// Wait in case of exception happened outside of pool.
|
||||
SCOPE_EXIT(
|
||||
/// Clear queue in case if somebody is waiting lazy_format to push.
|
||||
lazy_format->finish();
|
||||
lazy_format->clearQueue();
|
||||
|
||||
try
|
||||
{
|
||||
@ -604,72 +602,58 @@ void TCPHandler::processOrdinaryQueryWithProcessors(size_t num_threads)
|
||||
catch (...)
|
||||
{
|
||||
/// If exception was thrown during pipeline execution, skip it while processing other exception.
|
||||
tryLogCurrentException(log);
|
||||
}
|
||||
|
||||
/// pipeline = QueryPipeline()
|
||||
);
|
||||
|
||||
while (true)
|
||||
while (!lazy_format->isFinished() && !exception)
|
||||
{
|
||||
Block block;
|
||||
|
||||
while (true)
|
||||
if (isQueryCancelled())
|
||||
{
|
||||
if (isQueryCancelled())
|
||||
{
|
||||
/// A packet was received requesting to stop execution of the request.
|
||||
executor->cancel();
|
||||
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (after_send_progress.elapsed() / 1000 >= query_context->getSettingsRef().interactive_delay)
|
||||
{
|
||||
/// Some time passed and there is a progress.
|
||||
after_send_progress.restart();
|
||||
sendProgress();
|
||||
}
|
||||
|
||||
sendLogs();
|
||||
|
||||
if ((block = lazy_format->getBlock(query_context->getSettingsRef().interactive_delay / 1000)))
|
||||
break;
|
||||
|
||||
if (lazy_format->isFinished())
|
||||
break;
|
||||
|
||||
if (exception)
|
||||
{
|
||||
pool.wait();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** If data has run out, we will send the profiling data and total values to
|
||||
* the last zero block to be able to use
|
||||
* this information in the suffix output of stream.
|
||||
* If the request was interrupted, then `sendTotals` and other methods could not be called,
|
||||
* because we have not read all the data yet,
|
||||
* and there could be ongoing calculations in other threads at the same time.
|
||||
*/
|
||||
if (!block && !isQueryCancelled())
|
||||
{
|
||||
pool.wait();
|
||||
pipeline.finalize();
|
||||
|
||||
sendTotals(lazy_format->getTotals());
|
||||
sendExtremes(lazy_format->getExtremes());
|
||||
sendProfileInfo(lazy_format->getProfileInfo());
|
||||
sendProgress();
|
||||
sendLogs();
|
||||
}
|
||||
|
||||
sendData(block);
|
||||
if (!block)
|
||||
/// A packet was received requesting to stop execution of the request.
|
||||
executor->cancel();
|
||||
break;
|
||||
}
|
||||
|
||||
if (after_send_progress.elapsed() / 1000 >= query_context->getSettingsRef().interactive_delay)
|
||||
{
|
||||
/// Some time passed and there is a progress.
|
||||
after_send_progress.restart();
|
||||
sendProgress();
|
||||
}
|
||||
|
||||
sendLogs();
|
||||
|
||||
if (auto block = lazy_format->getBlock(query_context->getSettingsRef().interactive_delay / 1000))
|
||||
{
|
||||
if (!state.io.null_format)
|
||||
sendData(block);
|
||||
}
|
||||
}
|
||||
|
||||
/// Finish lazy_format before waiting. Otherwise some thread may write into it, and waiting will lock.
|
||||
lazy_format->finish();
|
||||
pool.wait();
|
||||
|
||||
/** If data has run out, we will send the profiling data and total values to
|
||||
* the last zero block to be able to use
|
||||
* this information in the suffix output of stream.
|
||||
* If the request was interrupted, then `sendTotals` and other methods could not be called,
|
||||
* because we have not read all the data yet,
|
||||
* and there could be ongoing calculations in other threads at the same time.
|
||||
*/
|
||||
if (!isQueryCancelled())
|
||||
{
|
||||
pipeline.finalize();
|
||||
|
||||
sendTotals(lazy_format->getTotals());
|
||||
sendExtremes(lazy_format->getExtremes());
|
||||
sendProfileInfo(lazy_format->getProfileInfo());
|
||||
sendProgress();
|
||||
sendLogs();
|
||||
}
|
||||
|
||||
sendData({});
|
||||
}
|
||||
|
||||
state.io.onFinish();
|
||||
|
@ -26,8 +26,12 @@ public:
|
||||
|
||||
void setRowsBeforeLimit(size_t rows_before_limit) override;
|
||||
|
||||
void finish() { finished_processing = true; }
|
||||
void clearQueue() { queue.clear(); }
|
||||
void finish()
|
||||
{
|
||||
finished_processing = true;
|
||||
/// Clear queue in case if somebody is waiting lazy_format to push.
|
||||
queue.clear();
|
||||
}
|
||||
|
||||
protected:
|
||||
void consume(Chunk chunk) override
|
||||
|
Loading…
Reference in New Issue
Block a user