diff --git a/src/DataStreams/RemoteQueryExecutor.cpp b/src/DataStreams/RemoteQueryExecutor.cpp index 7dacfca8b60..06d8b0b7e37 100644 --- a/src/DataStreams/RemoteQueryExecutor.cpp +++ b/src/DataStreams/RemoteQueryExecutor.cpp @@ -210,10 +210,13 @@ std::variant RemoteQueryExecutor::read(std::unique_ptr return Block(); } + if (!read_context) { std::lock_guard lock(was_cancelled_mutex); - if (!read_context) - read_context = std::make_unique(*multiplexed_connections); + if (was_cancelled) + return Block(); + + read_context = std::make_unique(*multiplexed_connections); } do @@ -312,15 +315,12 @@ void RemoteQueryExecutor::finish(std::unique_ptr * read_context) if (!isQueryPending() || hasThrownException()) return; - if (read_context && *read_context) - (*read_context)->cancel(); - /** If you have not read all the data yet, but they are no longer needed. * This may be due to the fact that the data is sufficient (for example, when using LIMIT). */ /// Send the request to abort the execution of the request, if not already sent. - tryCancel("Cancelling query because enough data has been read"); + tryCancel("Cancelling query because enough data has been read", read_context); /// Get the remaining packets so that there is no out of sync in the connections to the replicas. Packet packet = multiplexed_connections->drain(); @@ -363,13 +363,7 @@ void RemoteQueryExecutor::cancel(std::unique_ptr * read_context) if (!isQueryPending() || hasThrownException()) return; - { - std::lock_guard lock(was_cancelled_mutex); - if (read_context && *read_context) - (*read_context)->cancel(); - } - - tryCancel("Cancelling query"); + tryCancel("Cancelling query", read_context); } void RemoteQueryExecutor::sendScalars() @@ -421,7 +415,7 @@ void RemoteQueryExecutor::sendExternalTables() multiplexed_connections->sendExternalTablesData(external_tables_data); } -void RemoteQueryExecutor::tryCancel(const char * reason) +void RemoteQueryExecutor::tryCancel(const char * reason, std::unique_ptr * read_context) { { /// Flag was_cancelled is atomic because it is checked in read(). @@ -431,6 +425,10 @@ void RemoteQueryExecutor::tryCancel(const char * reason) return; was_cancelled = true; + + if (read_context && *read_context) + (*read_context)->cancel(); + multiplexed_connections->sendCancel(); } diff --git a/src/DataStreams/RemoteQueryExecutor.h b/src/DataStreams/RemoteQueryExecutor.h index 6c668f280a1..d15bd7a2aa0 100644 --- a/src/DataStreams/RemoteQueryExecutor.h +++ b/src/DataStreams/RemoteQueryExecutor.h @@ -164,7 +164,7 @@ private: void sendExternalTables(); /// If wasn't sent yet, send request to cancel all connections to replicas - void tryCancel(const char * reason); + void tryCancel(const char * reason, std::unique_ptr * read_context); /// Returns true if query was sent bool isQueryPending() const;