Merge pull request #60845 from lzydmxy/move_connection_drain_from_prepare_to_work

Move connection drain from prepare to work
This commit is contained in:
jsc0218 2024-03-08 14:33:15 -05:00 committed by GitHub
commit afc1bc6766
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 22 additions and 2 deletions

View File

@ -71,19 +71,36 @@ ISource::Status RemoteSource::prepare()
if (is_async_state)
return Status::Async;
if (executor_finished)
return Status::Finished;
Status status = ISource::prepare();
/// To avoid resetting the connection (because of "unfinished" query) in the
/// RemoteQueryExecutor it should be finished explicitly.
if (status == Status::Finished)
{
query_executor->finish();
is_async_state = false;
return status;
need_drain = true;
return Status::Ready;
}
return status;
}
void RemoteSource::work()
{
/// Connection drain is a heavy operation that may take a long time.
/// Therefore we move connection drain from prepare() to work(), and drain multiple connections in parallel.
/// See issue: https://github.com/ClickHouse/ClickHouse/issues/60844
if (need_drain)
{
query_executor->finish();
executor_finished = true;
return;
}
ISource::work();
}
std::optional<Chunk> RemoteSource::tryGenerate()
{
/// onCancel() will do the cancel if the query was sent.

View File

@ -22,6 +22,7 @@ public:
~RemoteSource() override;
Status prepare() override;
void work() override;
String getName() const override { return "Remote"; }
void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) override { rows_before_limit.swap(counter); }
@ -39,6 +40,8 @@ protected:
private:
bool was_query_sent = false;
bool need_drain = false;
bool executor_finished = false;
bool add_aggregation_info = false;
RemoteQueryExecutorPtr query_executor;
RowsBeforeLimitCounterPtr rows_before_limit;