Merge pull request #29954 from vitlibar/fix-flaky-test_grpc_protocol

gRPC: Fix releasing query ID and session ID at the end of query processing
This commit is contained in:
Vitaly Baranov 2021-10-12 13:32:27 +03:00 committed by GitHub
commit 4a4d913cfd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 29 additions and 2 deletions

View File

@ -474,5 +474,14 @@ ContextMutablePtr Session::makeQueryContextImpl(const ClientInfo * client_info_t
return query_context; return query_context;
} }
void Session::releaseSessionID()
{
if (!named_session)
return;
named_session->release();
named_session = nullptr;
}
} }

View File

@ -68,6 +68,9 @@ public:
ContextMutablePtr makeQueryContext(const ClientInfo & query_client_info) const; ContextMutablePtr makeQueryContext(const ClientInfo & query_client_info) const;
ContextMutablePtr makeQueryContext(ClientInfo && query_client_info) const; ContextMutablePtr makeQueryContext(ClientInfo && query_client_info) const;
/// Releases the currently used session ID so it becomes available for reuse by another session.
void releaseSessionID();
private: private:
std::shared_ptr<SessionLog> getSessionLog() const; std::shared_ptr<SessionLog> getSessionLog() const;
ContextMutablePtr makeQueryContextImpl(const ClientInfo * client_info_to_copy, ClientInfo * client_info_to_move) const; ContextMutablePtr makeQueryContextImpl(const ClientInfo * client_info_to_copy, ClientInfo * client_info_to_move) const;

View File

@ -587,6 +587,7 @@ namespace
void finishQuery(); void finishQuery();
void onException(const Exception & exception); void onException(const Exception & exception);
void onFatalError(); void onFatalError();
void releaseQueryIDAndSessionID();
void close(); void close();
void readQueryInfo(); void readQueryInfo();
@ -1178,6 +1179,7 @@ namespace
addProgressToResult(); addProgressToResult();
query_scope->logPeakMemoryUsage(); query_scope->logPeakMemoryUsage();
addLogsToResult(); addLogsToResult();
releaseQueryIDAndSessionID();
sendResult(); sendResult();
close(); close();
@ -1208,6 +1210,8 @@ namespace
LOG_WARNING(log, "Couldn't send logs to client"); LOG_WARNING(log, "Couldn't send logs to client");
} }
releaseQueryIDAndSessionID();
try try
{ {
sendException(exception); sendException(exception);
@ -1227,7 +1231,7 @@ namespace
{ {
try try
{ {
finalize = true; result.mutable_exception()->set_name("FatalError");
addLogsToResult(); addLogsToResult();
sendResult(); sendResult();
} }
@ -1237,6 +1241,17 @@ namespace
} }
} }
void Call::releaseQueryIDAndSessionID()
{
/// releaseQueryIDAndSessionID() should be called before sending the final result to the client
/// because the client may decide to send another query with the same query ID or session ID
/// immediately after it receives our final result, and it's prohibited to have
/// two queries executed at the same time with the same query ID or session ID.
io.process_list_entry.reset();
if (session)
session->releaseSessionID();
}
void Call::close() void Call::close()
{ {
responder.reset(); responder.reset();

View File

@ -211,7 +211,7 @@ def test_errors_handling():
assert "Table default.t already exists" in e.display_text assert "Table default.t already exists" in e.display_text
def test_authentication(): def test_authentication():
query("CREATE USER john IDENTIFIED BY 'qwe123'") query("CREATE USER OR REPLACE john IDENTIFIED BY 'qwe123'")
assert query("SELECT currentUser()", user_name="john", password="qwe123") == "john\n" assert query("SELECT currentUser()", user_name="john", password="qwe123") == "john\n"
def test_logs(): def test_logs():