From 65f90f2ce9ea9e9d4076f06c58ddd981c82cc098 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Thu, 11 Mar 2021 21:52:10 +0300 Subject: [PATCH] Fix distributed requests cancellation with async_socket_for_remote=1 Before this patch, for distributed queries that require cancellation (a simple select from multiple shards with a limit, e.g. `select * from remote('127.{2,3}', system.numbers) limit 100`), it was very easy to trigger a situation where a remote shard is in the middle of sending a Data block while the initiator has already sent Cancel and is expecting a new packet. Instead of a new packet, the initiator receives part of the Data block that was in the middle of being sent before the cancellation, which leads to various errors, such as: - Unknown packet X from server Y - Unexpected packet from server Y - and a lot more... Fix this by correctly waiting for the pending packet before cancellation. It is not easy to write a test, since localhost is too fast. Also note that it is not possible to get these errors with hedged requests (use_hedged_requests=1), since they handle fibers correctly. However, hedged requests were disabled by default for 21.3 in #21534, while async_socket_for_remote is enabled by default. 
--- .../RemoteQueryExecutorReadContext.cpp | 15 +++++++++++---- src/DataStreams/RemoteQueryExecutorReadContext.h | 4 ++-- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/src/DataStreams/RemoteQueryExecutorReadContext.cpp b/src/DataStreams/RemoteQueryExecutorReadContext.cpp index c2a65f02d08..11cc2dcd8e4 100644 --- a/src/DataStreams/RemoteQueryExecutorReadContext.cpp +++ b/src/DataStreams/RemoteQueryExecutorReadContext.cpp @@ -104,11 +104,11 @@ void RemoteQueryExecutorReadContext::setConnectionFD(int fd, const Poco::Timespa connection_fd_description = fd_description; } -bool RemoteQueryExecutorReadContext::checkTimeout() const +bool RemoteQueryExecutorReadContext::checkTimeout(bool blocking) const { try { - return checkTimeoutImpl(); + return checkTimeoutImpl(blocking); } catch (DB::Exception & e) { @@ -118,13 +118,13 @@ bool RemoteQueryExecutorReadContext::checkTimeout() const } } -bool RemoteQueryExecutorReadContext::checkTimeoutImpl() const +bool RemoteQueryExecutorReadContext::checkTimeoutImpl(bool blocking) const { /// Wait for epoll will not block if it was polled externally. epoll_event events[3]; events[0].data.fd = events[1].data.fd = events[2].data.fd = -1; - int num_events = epoll.getManyReady(3, events,/* blocking = */ false); + int num_events = epoll.getManyReady(3, events, blocking); bool is_socket_ready = false; bool is_pipe_alarmed = false; @@ -184,9 +184,16 @@ bool RemoteQueryExecutorReadContext::resumeRoutine() void RemoteQueryExecutorReadContext::cancel() { std::lock_guard guard(fiber_lock); + /// It is safe to just destroy fiber - we are not in the process of reading from socket. boost::context::fiber to_destroy = std::move(fiber); + while (is_read_in_progress.load(std::memory_order_relaxed)) + { + checkTimeout(/* blocking= */ true); + to_destroy = std::move(to_destroy).resume(); + } + /// Send something to pipe to cancel executor waiting. 
uint64_t buf = 0; while (-1 == write(pipe_fd[1], &buf, sizeof(buf))) diff --git a/src/DataStreams/RemoteQueryExecutorReadContext.h b/src/DataStreams/RemoteQueryExecutorReadContext.h index cb6421f78d0..5fbe52469cd 100644 --- a/src/DataStreams/RemoteQueryExecutorReadContext.h +++ b/src/DataStreams/RemoteQueryExecutorReadContext.h @@ -54,8 +54,8 @@ public: explicit RemoteQueryExecutorReadContext(IConnections & connections_); ~RemoteQueryExecutorReadContext(); - bool checkTimeout() const; - bool checkTimeoutImpl() const; + bool checkTimeout(bool blocking = false) const; + bool checkTimeoutImpl(bool blocking) const; void setConnectionFD(int fd, const Poco::Timespan & timeout = 0, const std::string & fd_description = ""); void setTimer() const;