diff --git a/src/DataStreams/RemoteBlockInputStream.cpp b/src/DataStreams/RemoteBlockInputStream.cpp index 9d9f629d463..6be189503e9 100644 --- a/src/DataStreams/RemoteBlockInputStream.cpp +++ b/src/DataStreams/RemoteBlockInputStream.cpp @@ -359,12 +359,17 @@ void RemoteBlockInputStream::sendQuery() void RemoteBlockInputStream::tryCancel(const char * reason) { - bool old_val = false; - if (!was_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed)) - return; + { + std::lock_guard guard(was_cancelled_mutex); + + if (was_cancelled) + return; + + was_cancelled = true; + multiplexed_connections->sendCancel(); + } LOG_TRACE(log, "(" << multiplexed_connections->dumpAddresses() << ") " << reason); - multiplexed_connections->sendCancel(); } bool RemoteBlockInputStream::isQueryPending() const diff --git a/src/DataStreams/RemoteBlockInputStream.h b/src/DataStreams/RemoteBlockInputStream.h index 783811f2521..66b1ebbb6c3 100644 --- a/src/DataStreams/RemoteBlockInputStream.h +++ b/src/DataStreams/RemoteBlockInputStream.h @@ -135,7 +135,8 @@ private: * - data size is already satisfactory (when using LIMIT, for example) * - an exception was thrown from client side */ - std::atomic was_cancelled { false }; + bool was_cancelled { false }; + std::mutex was_cancelled_mutex; /** An exception from replica was received. No need in receiving more packets or * requesting to cancel query execution