Fix distributed requests cancellation with async_socket_for_remote=1

Before this patch, distributed queries that require cancellation
(a simple select from multiple shards with a limit, e.g. `select * from
remote('127.{2,3}', system.numbers) limit 100`) could easily trigger a
situation where a remote shard is in the middle of sending a Data
block while the initiator has already sent Cancel and is expecting a
new packet. Instead of a new packet, the initiator receives the rest of
the Data block that was being sent before cancellation, which leads to
various errors, such as:
- Unknown packet X from server Y
- Unexpected packet from server Y
- and a lot more...
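
To illustrate why an abandoned read produces such errors, here is a minimal sketch using a toy framing of [type][length][payload]. The packet codes and the helpers `encode`, `readType` and `makeStream` are hypothetical and do not reflect the real ClickHouse protocol; the point is only that a reader which resumes in the middle of a payload interprets payload bytes as a packet type.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical packet types; these are NOT the real protocol codes.
enum PacketType : uint8_t { Data = 1, EndOfStream = 2 };

// Serialises one packet as [type][payload length][payload bytes].
std::vector<uint8_t> encode(PacketType type, const std::vector<uint8_t> & payload)
{
    assert(payload.size() <= 255);
    std::vector<uint8_t> out{static_cast<uint8_t>(type), static_cast<uint8_t>(payload.size())};
    out.insert(out.end(), payload.begin(), payload.end());
    return out;
}

// Interprets the byte at `pos` as a packet type.
// Returns nullopt if it is not a known type (the "Unknown packet" case).
std::optional<PacketType> readType(const std::vector<uint8_t> & stream, size_t pos)
{
    uint8_t byte = stream.at(pos);
    if (byte == Data || byte == EndOfStream)
        return static_cast<PacketType>(byte);
    return std::nullopt;
}

// Builds a stream holding Data{0xAA, 0xBB, 0xCC, 0xDD} followed by EndOfStream.
std::vector<uint8_t> makeStream()
{
    std::vector<uint8_t> stream = encode(Data, {0xAA, 0xBB, 0xCC, 0xDD});
    std::vector<uint8_t> tail = encode(EndOfStream, {});
    stream.insert(stream.end(), tail.begin(), tail.end());
    return stream;
}
```

With this stream, `readType(stream, 4)` lands inside the Data payload and finds no known type, while `readType(stream, 6)`, i.e. after the whole Data packet has been consumed, finds `EndOfStream`.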

Fix this by waiting for the pending packet to be fully read before
cancellation.
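
The idea can be sketched in isolation as follows. This is a simplified model, not the patched code: `PendingRead` and `cancelAfterDrain` are hypothetical names, and a plain byte counter stands in for the reader fiber. Only the `is_read_in_progress` flag and the loop shape mirror the change.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for a reader that consumes one packet in several steps.
struct PendingRead
{
    std::atomic<bool> is_read_in_progress{false};
    size_t bytes_left = 0;

    // Begin reading a packet of `size` bytes.
    void start(size_t size)
    {
        bytes_left = size;
        is_read_in_progress.store(size > 0, std::memory_order_relaxed);
    }

    // Consume up to `chunk` bytes, as one resume of the reader would.
    void resume(size_t chunk)
    {
        assert(chunk > 0);
        bytes_left -= (chunk < bytes_left ? chunk : bytes_left);
        if (bytes_left == 0)
            is_read_in_progress.store(false, std::memory_order_relaxed);
    }
};

// Drains the in-flight packet before cancelling.
// Returns the number of resumes that were needed.
size_t cancelAfterDrain(PendingRead & read, size_t chunk)
{
    size_t resumes = 0;
    while (read.is_read_in_progress.load(std::memory_order_relaxed))
    {
        read.resume(chunk);
        ++resumes;
    }
    // Only at this point is the stream at a packet boundary,
    // so it is safe to proceed with cancellation.
    return resumes;
}
```

For a 10-byte packet read in 4-byte chunks the loop resumes the reader three times; for an idle reader it does nothing and cancellation proceeds immediately.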

It is not very easy to write a test, since localhost is too fast.

Also note that it is not possible to get these errors with hedged
requests (use_hedged_requests=1), since they handle fibers correctly.

But hedged requests were disabled by default for 21.3 in #21534, while
async_socket_for_remote is enabled by default.
Azat Khuzhin 2021-03-11 21:52:10 +03:00
parent b30a0c68da
commit 65f90f2ce9
2 changed files with 13 additions and 6 deletions

@@ -104,11 +104,11 @@ void RemoteQueryExecutorReadContext::setConnectionFD(int fd, const Poco::Timespa
     connection_fd_description = fd_description;
 }
-bool RemoteQueryExecutorReadContext::checkTimeout() const
+bool RemoteQueryExecutorReadContext::checkTimeout(bool blocking) const
 {
     try
     {
-        return checkTimeoutImpl();
+        return checkTimeoutImpl(blocking);
     }
     catch (DB::Exception & e)
     {
@@ -118,13 +118,13 @@ bool RemoteQueryExecutorReadContext::checkTimeout() const
     }
 }
-bool RemoteQueryExecutorReadContext::checkTimeoutImpl() const
+bool RemoteQueryExecutorReadContext::checkTimeoutImpl(bool blocking) const
 {
     /// Wait for epoll will not block if it was polled externally.
     epoll_event events[3];
     events[0].data.fd = events[1].data.fd = events[2].data.fd = -1;
-    int num_events = epoll.getManyReady(3, events,/* blocking = */ false);
+    int num_events = epoll.getManyReady(3, events, blocking);
     bool is_socket_ready = false;
     bool is_pipe_alarmed = false;
@@ -184,9 +184,16 @@ bool RemoteQueryExecutorReadContext::resumeRoutine()
 void RemoteQueryExecutorReadContext::cancel()
 {
     std::lock_guard guard(fiber_lock);
     /// It is safe to just destroy fiber - we are not in the process of reading from socket.
     boost::context::fiber to_destroy = std::move(fiber);
+    while (is_read_in_progress.load(std::memory_order_relaxed))
+    {
+        checkTimeout(/* blocking= */ true);
+        to_destroy = std::move(to_destroy).resume();
+    }
     /// Send something to pipe to cancel executor waiting.
     uint64_t buf = 0;
     while (-1 == write(pipe_fd[1], &buf, sizeof(buf)))

@@ -54,8 +54,8 @@ public:
     explicit RemoteQueryExecutorReadContext(IConnections & connections_);
     ~RemoteQueryExecutorReadContext();
-    bool checkTimeout() const;
-    bool checkTimeoutImpl() const;
+    bool checkTimeout(bool blocking = false) const;
+    bool checkTimeoutImpl(bool blocking) const;
     void setConnectionFD(int fd, const Poco::Timespan & timeout = 0, const std::string & fd_description = "");
     void setTimer() const;
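
The effect of the new `blocking` flag can be sketched with raw Linux epoll calls. This assumes the `getManyReady` wrapper maps the flag onto an `epoll_wait` timeout of `-1` (wait for an event) or `0` (poll and return at once); that mapping is an assumption here, and `waitReady` and `watchPipe` are hypothetical helpers, not part of the patch.

```cpp
#include <cassert>
#include <cstdint>
#include <sys/epoll.h>
#include <unistd.h>

// Returns the number of ready descriptors.
// Timeout 0 only polls; timeout -1 blocks until at least one event arrives.
int waitReady(int epoll_fd, bool blocking)
{
    epoll_event events[1];
    return epoll_wait(epoll_fd, events, 1, blocking ? -1 : 0);
}

// Creates a pipe in `pipe_fd` and an epoll instance watching its read end.
// Returns the epoll descriptor.
int watchPipe(int (&pipe_fd)[2])
{
    int rc = pipe(pipe_fd);
    assert(rc == 0);
    int epoll_fd = epoll_create1(0);
    assert(epoll_fd >= 0);
    epoll_event ev{};
    ev.events = EPOLLIN;
    ev.data.fd = pipe_fd[0];
    rc = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, pipe_fd[0], &ev);
    assert(rc == 0);
    (void)rc;
    return epoll_fd;
}
```

On an empty pipe the non-blocking call returns 0 immediately, which is why a non-blocking check alone cannot wait for the rest of a packet. Once data has been written to the pipe, the blocking call returns 1.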