From f431b10e38116ab9938d4d00cbed040d7b63af43 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 23 Jan 2020 13:04:18 +0300 Subject: [PATCH] Update TCPHandler. --- dbms/programs/server/TCPHandler.cpp | 108 ++++++++---------- .../src/Processors/Formats/LazyOutputFormat.h | 8 +- 2 files changed, 52 insertions(+), 64 deletions(-) diff --git a/dbms/programs/server/TCPHandler.cpp b/dbms/programs/server/TCPHandler.cpp index 29bba1cca5e..1975349fcf1 100644 --- a/dbms/programs/server/TCPHandler.cpp +++ b/dbms/programs/server/TCPHandler.cpp @@ -591,11 +591,9 @@ void TCPHandler::processOrdinaryQueryWithProcessors(size_t num_threads) } }); - /// Wait in case of exception. Delete pipeline to release memory. + /// Wait in case of exception happened outside of pool. SCOPE_EXIT( - /// Clear queue in case if somebody is waiting lazy_format to push. lazy_format->finish(); - lazy_format->clearQueue(); try { @@ -604,72 +602,58 @@ void TCPHandler::processOrdinaryQueryWithProcessors(size_t num_threads) catch (...) { /// If exception was thrown during pipeline execution, skip it while processing other exception. + tryLogCurrentException(log); } - - /// pipeline = QueryPipeline() ); - while (true) + while (!lazy_format->isFinished() && !exception) { - Block block; - - while (true) + if (isQueryCancelled()) { - if (isQueryCancelled()) - { - /// A packet was received requesting to stop execution of the request. - executor->cancel(); - - break; - } - else - { - if (after_send_progress.elapsed() / 1000 >= query_context->getSettingsRef().interactive_delay) - { - /// Some time passed and there is a progress. - after_send_progress.restart(); - sendProgress(); - } - - sendLogs(); - - if ((block = lazy_format->getBlock(query_context->getSettingsRef().interactive_delay / 1000))) - break; - - if (lazy_format->isFinished()) - break; - - if (exception) - { - pool.wait(); - break; - } - } - } - - /** If data has run out, we will send the profiling data and total values to - * the last zero block to be able to use - * this information in the suffix output of stream. - * If the request was interrupted, then `sendTotals` and other methods could not be called, - * because we have not read all the data yet, - * and there could be ongoing calculations in other threads at the same time. - */ - if (!block && !isQueryCancelled()) - { - pool.wait(); - pipeline.finalize(); - - sendTotals(lazy_format->getTotals()); - sendExtremes(lazy_format->getExtremes()); - sendProfileInfo(lazy_format->getProfileInfo()); - sendProgress(); - sendLogs(); - } - - sendData(block); - if (!block) + /// A packet was received requesting to stop execution of the request. + executor->cancel(); break; + } + + if (after_send_progress.elapsed() / 1000 >= query_context->getSettingsRef().interactive_delay) + { + /// Some time passed and there is a progress. + after_send_progress.restart(); + sendProgress(); + } + + sendLogs(); + + if (auto block = lazy_format->getBlock(query_context->getSettingsRef().interactive_delay / 1000)) + { + if (!state.io.null_format) + sendData(block); + } } + + /// Finish lazy_format before waiting. Otherwise some thread may write into it, and waiting will lock. + lazy_format->finish(); + pool.wait(); + + /** If data has run out, we will send the profiling data and total values to + * the last zero block to be able to use + * this information in the suffix output of stream. + * If the request was interrupted, then `sendTotals` and other methods could not be called, + * because we have not read all the data yet, + * and there could be ongoing calculations in other threads at the same time. + */ + if (!isQueryCancelled()) + { + pipeline.finalize(); + + sendTotals(lazy_format->getTotals()); + sendExtremes(lazy_format->getExtremes()); + sendProfileInfo(lazy_format->getProfileInfo()); + sendProgress(); + sendLogs(); + } + + sendData({}); } state.io.onFinish(); diff --git a/dbms/src/Processors/Formats/LazyOutputFormat.h b/dbms/src/Processors/Formats/LazyOutputFormat.h index 56aaf249480..a3bc76e839f 100644 --- a/dbms/src/Processors/Formats/LazyOutputFormat.h +++ b/dbms/src/Processors/Formats/LazyOutputFormat.h @@ -26,8 +26,12 @@ public: void setRowsBeforeLimit(size_t rows_before_limit) override; - void finish() { finished_processing = true; } - void clearQueue() { queue.clear(); } + void finish() + { + finished_processing = true; + /// Clear queue in case if somebody is waiting lazy_format to push. + queue.clear(); + } protected: void consume(Chunk chunk) override