Refactor RemoteQueryExecutor, make it more thread safe

This commit is contained in:
avogar 2023-03-23 19:52:37 +00:00
parent 783e3ab789
commit bf7e62ff56
5 changed files with 79 additions and 48 deletions

View File

@ -214,8 +214,28 @@ ConnectionPoolWithFailover::tryGetEntry(
const QualifiedTableName * table_to_check,
AsyncCallback async_callback)
{
if (async_callback)
{
ConnectionEstablisherAsync connection_establisher_async(&pool, &timeouts, settings, log, table_to_check);
while (true)
{
connection_establisher_async.resume();
if (connection_establisher_async.isFinished())
break;
async_callback(
connection_establisher_async.getFileDescriptor(),
0,
AsyncEventTimeoutType::NONE,
"Connection establisher file descriptor",
AsyncTaskExecutor::Event::READ | AsyncTaskExecutor::Event::ERROR);
}
fail_message = connection_establisher_async.getFailMessage();
return connection_establisher_async.getResult();
}
ConnectionEstablisher connection_establisher(&pool, &timeouts, settings, log, table_to_check);
connection_establisher.setAsyncCallback(std::move(async_callback));
TryResult result;
connection_establisher.run(result, fail_message);
return result;

View File

@ -388,7 +388,7 @@ int HedgedConnections::getReadyFileDescriptor(AsyncCallback async_callback)
{
events_count = epoll.getManyReady(1, &event, blocking);
if (!events_count && async_callback)
async_callback(epoll.getFileDescriptor(), 0, AsyncEventTimeoutType::NONE, epoll.getDescription(), EPOLLIN | EPOLLPRI | EPOLLERR);
async_callback(epoll.getFileDescriptor(), 0, AsyncEventTimeoutType::NONE, epoll.getDescription(), AsyncTaskExecutor::Event::READ | AsyncTaskExecutor::Event::ERROR);
}
return event.data.fd;
}

View File

@ -266,7 +266,7 @@ int HedgedConnectionsFactory::getReadyFileDescriptor(bool blocking, AsyncCallbac
{
events_count = epoll.getManyReady(1, &event, !static_cast<bool>(async_callback));
if (!events_count && async_callback)
async_callback(epoll.getFileDescriptor(), 0, AsyncEventTimeoutType::NONE, epoll.getDescription(), EPOLLIN | EPOLLPRI | EPOLLERR);
async_callback(epoll.getFileDescriptor(), 0, AsyncEventTimeoutType::NONE, epoll.getDescription(), AsyncTaskExecutor::Event::READ | AsyncTaskExecutor::Event::ERROR);
}
return event.data.fd;
}

View File

@ -245,10 +245,6 @@ void RemoteQueryExecutor::sendQueryUnlocked(ClientInfo::QueryKind query_kind, As
established = true;
/// Query could be cancelled during creating connections. No need to send a query.
if (was_cancelled)
return;
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
ClientInfo modified_client_info = context->getClientInfo();
modified_client_info.query_kind = query_kind;
@ -271,11 +267,12 @@ void RemoteQueryExecutor::sendQueryUnlocked(ClientInfo::QueryKind query_kind, As
int RemoteQueryExecutor::sendQueryAsync()
{
std::lock_guard lock(was_cancelled_mutex);
if (was_cancelled)
return -1;
if (!read_context)
{
std::lock_guard lock(was_cancelled_mutex);
read_context = std::make_unique<ReadContext>(*this, /*suspend_when_query_sent*/ true);
}
/// If query already sent, do nothing. Note that we cannot use sent_query flag here,
/// because we can still be in process of sending scalars or external tables.
@ -322,8 +319,10 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::read()
return anything;
if (got_duplicated_part_uuids)
return restartQueryWithoutDuplicatedUUIDs();
break;
}
return restartQueryWithoutDuplicatedUUIDs();
}
RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync()
@ -332,38 +331,38 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync()
if (!read_context || (resent_query && recreate_read_context))
{
std::lock_guard lock(was_cancelled_mutex);
if (was_cancelled)
return ReadResult(Block());
read_context = std::make_unique<ReadContext>(*this);
recreate_read_context = false;
}
while (true)
{
std::lock_guard lock(was_cancelled_mutex);
if (was_cancelled)
return ReadResult(Block());
read_context->resume();
if (needToSkipUnavailableShard())
return ReadResult(Block());
if (read_context->isCancelled())
return ReadResult(Block());
/// Check if packet is not ready yet.
if (read_context->isInProgress())
return ReadResult(read_context->getFileDescriptor());
/// We need to check that query was not cancelled again,
/// to avoid the race between cancel() thread and read() thread.
/// (since cancel() thread will steal the fiber and may update the packet).
if (was_cancelled)
return ReadResult(Block());
auto anything = processPacket(read_context->getPacket());
if (anything.getType() == ReadResult::Type::Data || anything.getType() == ReadResult::Type::ParallelReplicasToken)
return anything;
if (got_duplicated_part_uuids)
return restartQueryWithoutDuplicatedUUIDs();
break;
}
return restartQueryWithoutDuplicatedUUIDs();
#else
return read();
#endif
@ -372,13 +371,19 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync()
RemoteQueryExecutor::ReadResult RemoteQueryExecutor::restartQueryWithoutDuplicatedUUIDs()
{
/// Cancel previous query and disconnect before retry.
cancel();
connections->disconnect();
/// Only resend once, otherwise throw an exception
if (!resent_query)
{
std::lock_guard lock(was_cancelled_mutex);
if (was_cancelled)
return ReadResult(Block());
/// Cancel previous query and disconnect before retry.
cancelUnlocked();
connections->disconnect();
/// Only resend once, otherwise throw an exception
if (resent_query)
throw Exception(ErrorCodes::DUPLICATED_PART_UUIDS, "Found duplicate uuids while processing query");
if (log)
LOG_DEBUG(log, "Found duplicate UUIDs, will retry query without those parts");
@ -386,13 +391,14 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::restartQueryWithoutDuplicat
recreate_read_context = true;
sent_query = false;
got_duplicated_part_uuids = false;
/// Consecutive read will implicitly send query first.
if (!read_context)
return read();
else
return readAsync();
was_cancelled = false;
}
throw Exception(ErrorCodes::DUPLICATED_PART_UUIDS, "Found duplicate uuids while processing query");
/// Consecutive read will implicitly send query first.
if (!read_context)
return read();
else
return readAsync();
}
RemoteQueryExecutor::ReadResult RemoteQueryExecutor::processPacket(Packet packet)
@ -534,6 +540,8 @@ void RemoteQueryExecutor::processMergeTreeInitialReadAnnounecement(InitialAllRan
void RemoteQueryExecutor::finish()
{
std::lock_guard guard(was_cancelled_mutex);
/** If one of:
* - nothing started to do;
* - received all packets before EndOfStream;
@ -590,6 +598,12 @@ void RemoteQueryExecutor::finish()
}
/// Locking wrapper around cancelUnlocked(): acquires was_cancelled_mutex and
/// performs the cancellation while holding it.
void RemoteQueryExecutor::cancel()
{
/// The lock is held for the whole call. Code that already owns
/// was_cancelled_mutex must call cancelUnlocked() directly instead of cancel(),
/// because was_cancelled_mutex is a plain std::mutex and cannot be locked twice
/// by the same thread.
std::lock_guard guard(was_cancelled_mutex);
cancelUnlocked();
}
void RemoteQueryExecutor::cancelUnlocked()
{
{
std::lock_guard lock(external_tables_mutex);
@ -676,10 +690,6 @@ void RemoteQueryExecutor::sendExternalTables()
void RemoteQueryExecutor::tryCancel(const char * reason)
{
/// Flag was_cancelled is atomic because it is checked in read(),
/// in case of packet had been read by fiber (async_socket_for_remote).
std::lock_guard guard(was_cancelled_mutex);
if (was_cancelled)
return;
@ -688,8 +698,8 @@ void RemoteQueryExecutor::tryCancel(const char * reason)
if (read_context)
read_context->cancel();
/// Query could be cancelled during connection creation, we should check
/// if connections were already created.
/// Query could be cancelled during connection creation or query sending,
/// we should check if connections were already created and query was sent.
if (connections && sent_query)
{
connections->sendCancel();

View File

@ -94,8 +94,8 @@ public:
int sendQueryAsync();
/// Query is resent to a replica, the query itself can be modified.
std::atomic<bool> resent_query { false };
std::atomic<bool> recreate_read_context { false };
bool resent_query { false };
bool recreate_read_context { false };
struct ReadResult
{
@ -228,39 +228,39 @@ private:
std::mutex external_tables_mutex;
/// Connections to replicas are established, but no queries are sent yet
std::atomic<bool> established { false };
bool established = false;
/// Query is sent (used before getting first block)
std::atomic<bool> sent_query { false };
bool sent_query { false };
/** All data from all replicas are received, before EndOfStream packet.
* To prevent desynchronization, if not all data is read before object
* destruction, it's required to send cancel query request to replicas and
* read all packets before EndOfStream
*/
std::atomic<bool> finished { false };
bool finished = false;
/** Cancel query request was sent to all replicas because data is not needed anymore
* This behaviour may occur when:
* - data size is already satisfactory (when using LIMIT, for example)
* - an exception was thrown from client side
*/
std::atomic<bool> was_cancelled { false };
bool was_cancelled = false;
std::mutex was_cancelled_mutex;
/** An exception from replica was received. No need in receiving more packets or
* requesting to cancel query execution
*/
std::atomic<bool> got_exception_from_replica { false };
bool got_exception_from_replica = false;
/** Unknown packet was received from replica. No need in receiving more packets or
* requesting to cancel query execution
*/
std::atomic<bool> got_unknown_packet_from_replica { false };
bool got_unknown_packet_from_replica = false;
/** Got duplicated uuids from replica
*/
std::atomic<bool> got_duplicated_part_uuids{ false };
bool got_duplicated_part_uuids = false;
/// Parts uuids, collected from remote replicas
std::mutex duplicated_part_uuids_mutex;
@ -291,6 +291,7 @@ private:
ReadResult restartQueryWithoutDuplicatedUUIDs();
/// If wasn't sent yet, send request to cancel all connections to replicas
void cancelUnlocked();
void tryCancel(const char * reason);
/// Returns true if query was sent