diff --git a/dbms/programs/server/TCPHandler.cpp b/dbms/programs/server/TCPHandler.cpp index 7e1d0fbc3a2..5a9c83d2c2f 100644 --- a/dbms/programs/server/TCPHandler.cpp +++ b/dbms/programs/server/TCPHandler.cpp @@ -514,6 +514,10 @@ void TCPHandler::processOrdinaryQueryWithProcessors(size_t num_threads) { /// A packet was received requesting to stop execution of the request. executor->cancel(); + /// Clear queue in case if somebody is waiting lazy_format to push. + lazy_format->finish(); + lazy_format->clearQueue(); + break; } else diff --git a/dbms/src/Processors/Formats/LazyOutputFormat.h b/dbms/src/Processors/Formats/LazyOutputFormat.h index 8cb148a0cef..636181d2c96 100644 --- a/dbms/src/Processors/Formats/LazyOutputFormat.h +++ b/dbms/src/Processors/Formats/LazyOutputFormat.h @@ -26,8 +26,16 @@ public: void setRowsBeforeLimit(size_t rows_before_limit) override; + void finish() { finished_processing = true; } + void clearQueue() { queue.clear(); } + protected: - void consume(Chunk chunk) override { queue.emplace(std::move(chunk)); } + void consume(Chunk chunk) override + { + if (!finished_processing) + queue.emplace(std::move(chunk)); + } + void consumeTotals(Chunk chunk) override { totals = std::move(chunk); } void consumeExtremes(Chunk chunk) override { extremes = std::move(chunk); } diff --git a/dbms/src/Processors/NullSink.h b/dbms/src/Processors/NullSink.h index 1b2a1c5a3bc..e4968daee29 100644 --- a/dbms/src/Processors/NullSink.h +++ b/dbms/src/Processors/NullSink.h @@ -15,6 +15,8 @@ public: inputs.front().close(); return Status::Finished; } + + InputPort & getPort() { return inputs.front(); } }; }