diff --git a/src/DataStreams/RemoteQueryExecutor.cpp b/src/DataStreams/RemoteQueryExecutor.cpp index 51d229a1126..14e51ffefdf 100644 --- a/src/DataStreams/RemoteQueryExecutor.cpp +++ b/src/DataStreams/RemoteQueryExecutor.cpp @@ -225,7 +225,7 @@ std::variant RemoteQueryExecutor::read(std::unique_ptr if (!read_context->resumeRoutine()) return Block(); - if (read_context->is_read_in_progress) + if (read_context->is_read_in_progress.load(std::memory_order_relaxed)) { read_context->setTimer(); return read_context->epoll_fd; diff --git a/src/DataStreams/RemoteQueryExecutorReadContext.h b/src/DataStreams/RemoteQueryExecutorReadContext.h index f8c64954b83..6d7099899ae 100644 --- a/src/DataStreams/RemoteQueryExecutorReadContext.h +++ b/src/DataStreams/RemoteQueryExecutorReadContext.h @@ -22,7 +22,7 @@ class RemoteQueryExecutorReadContext public: using Self = RemoteQueryExecutorReadContext; - bool is_read_in_progress = false; + std::atomic_bool is_read_in_progress = false; Packet packet; std::exception_ptr exception; @@ -162,7 +162,7 @@ public: bool resumeRoutine() { - if (is_read_in_progress && !checkTimeout()) + if (is_read_in_progress.load(std::memory_order_relaxed) && !checkTimeout()) return false; { @@ -226,9 +226,9 @@ public: throw; } - read_context.is_read_in_progress = true; + read_context.is_read_in_progress.store(true, std::memory_order_relaxed); fiber = std::move(fiber).resume(); - read_context.is_read_in_progress = false; + read_context.is_read_in_progress.store(false, std::memory_order_relaxed); } };