Fix lock in case of query was cancelled.

This commit is contained in:
Nikolai Kochetov 2019-07-08 13:14:36 +03:00
parent fc3b465d60
commit e5103d741c
3 changed files with 15 additions and 1 deletions

View File

@ -514,6 +514,10 @@ void TCPHandler::processOrdinaryQueryWithProcessors(size_t num_threads)
{
/// A packet was received requesting to stop execution of the request.
executor->cancel();
/// Clear queue in case if somebody is waiting lazy_format to push.
lazy_format->finish();
lazy_format->clearQueue();
break;
}
else

View File

@ -26,8 +26,16 @@ public:
void setRowsBeforeLimit(size_t rows_before_limit) override;
void finish() { finished_processing = true; }
void clearQueue() { queue.clear(); }
protected:
void consume(Chunk chunk) override { queue.emplace(std::move(chunk)); }
void consume(Chunk chunk) override
{
if (!finished_processing)
queue.emplace(std::move(chunk));
}
void consumeTotals(Chunk chunk) override { totals = std::move(chunk); }
void consumeExtremes(Chunk chunk) override { extremes = std::move(chunk); }

View File

@ -15,6 +15,8 @@ public:
inputs.front().close();
return Status::Finished;
}
InputPort & getPort() { return inputs.front(); }
};
}