Try fix race in RemoteQueryExecutor.

This commit is contained in:
Nikolai Kochetov 2020-12-17 13:07:28 +03:00
parent 53d5012a20
commit a860f128f5
2 changed files with 13 additions and 15 deletions

View File

@ -210,10 +210,13 @@ std::variant<Block, int> RemoteQueryExecutor::read(std::unique_ptr<ReadContext>
return Block(); return Block();
} }
if (!read_context)
{ {
std::lock_guard lock(was_cancelled_mutex); std::lock_guard lock(was_cancelled_mutex);
if (!read_context) if (was_cancelled)
read_context = std::make_unique<ReadContext>(*multiplexed_connections); return Block();
read_context = std::make_unique<ReadContext>(*multiplexed_connections);
} }
do do
@ -312,15 +315,12 @@ void RemoteQueryExecutor::finish(std::unique_ptr<ReadContext> * read_context)
if (!isQueryPending() || hasThrownException()) if (!isQueryPending() || hasThrownException())
return; return;
if (read_context && *read_context)
(*read_context)->cancel();
/** If you have not read all the data yet, but they are no longer needed. /** If you have not read all the data yet, but they are no longer needed.
* This may be due to the fact that the data is sufficient (for example, when using LIMIT). * This may be due to the fact that the data is sufficient (for example, when using LIMIT).
*/ */
/// Send the request to abort the execution of the request, if not already sent. /// Send the request to abort the execution of the request, if not already sent.
tryCancel("Cancelling query because enough data has been read"); tryCancel("Cancelling query because enough data has been read", read_context);
/// Get the remaining packets so that there is no out of sync in the connections to the replicas. /// Get the remaining packets so that there is no out of sync in the connections to the replicas.
Packet packet = multiplexed_connections->drain(); Packet packet = multiplexed_connections->drain();
@ -363,13 +363,7 @@ void RemoteQueryExecutor::cancel(std::unique_ptr<ReadContext> * read_context)
if (!isQueryPending() || hasThrownException()) if (!isQueryPending() || hasThrownException())
return; return;
{ tryCancel("Cancelling query", read_context);
std::lock_guard lock(was_cancelled_mutex);
if (read_context && *read_context)
(*read_context)->cancel();
}
tryCancel("Cancelling query");
} }
void RemoteQueryExecutor::sendScalars() void RemoteQueryExecutor::sendScalars()
@ -421,7 +415,7 @@ void RemoteQueryExecutor::sendExternalTables()
multiplexed_connections->sendExternalTablesData(external_tables_data); multiplexed_connections->sendExternalTablesData(external_tables_data);
} }
void RemoteQueryExecutor::tryCancel(const char * reason) void RemoteQueryExecutor::tryCancel(const char * reason, std::unique_ptr<ReadContext> * read_context)
{ {
{ {
/// Flag was_cancelled is atomic because it is checked in read(). /// Flag was_cancelled is atomic because it is checked in read().
@ -431,6 +425,10 @@ void RemoteQueryExecutor::tryCancel(const char * reason)
return; return;
was_cancelled = true; was_cancelled = true;
if (read_context && *read_context)
(*read_context)->cancel();
multiplexed_connections->sendCancel(); multiplexed_connections->sendCancel();
} }

View File

@ -164,7 +164,7 @@ private:
void sendExternalTables(); void sendExternalTables();
/// If wasn't sent yet, send request to cancel all connections to replicas /// If wasn't sent yet, send request to cancel all connections to replicas
void tryCancel(const char * reason); void tryCancel(const char * reason, std::unique_ptr<ReadContext> * read_context);
/// Returns true if query was sent /// Returns true if query was sent
bool isQueryPending() const; bool isQueryPending() const;