From 26aded5062f73e14f428af0dc2f4280fae813964 Mon Sep 17 00:00:00 2001 From: Alexey Gerasimchuck Date: Thu, 10 Aug 2023 04:11:07 +0000 Subject: [PATCH 1/4] Used main connections for suggestions --- src/Client/ClientBase.cpp | 8 ++++ src/Client/Suggest.cpp | 41 ++++++++++++++----- src/Client/Suggest.h | 9 ++++ tests/integration/parallel_skip.json | 3 +- .../test.py | 18 ++++++++ 5 files changed, 68 insertions(+), 11 deletions(-) diff --git a/src/Client/ClientBase.cpp b/src/Client/ClientBase.cpp index a72de2645d4..9e4d79cd323 100644 --- a/src/Client/ClientBase.cpp +++ b/src/Client/ClientBase.cpp @@ -105,6 +105,7 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; extern const int CANNOT_OPEN_FILE; extern const int FILE_ALREADY_EXISTS; + extern const int USER_SESSION_LIMIT_EXCEEDED; } } @@ -2408,6 +2409,13 @@ void ClientBase::runInteractive() } } + if (suggest && suggest->getLastError() == ErrorCodes::USER_SESSION_LIMIT_EXCEEDED) + { + // If a separate connection loading suggestions failed to open a new session, + // use the main session to receive them. + suggest->load(*connection, connection_parameters.timeouts, config().getInt("suggestion_limit")); + } + try { if (!processQueryText(input)) diff --git a/src/Client/Suggest.cpp b/src/Client/Suggest.cpp index 00e0ebd8b91..c854d471fae 100644 --- a/src/Client/Suggest.cpp +++ b/src/Client/Suggest.cpp @@ -22,9 +22,11 @@ namespace DB { namespace ErrorCodes { + extern const int OK; extern const int LOGICAL_ERROR; extern const int UNKNOWN_PACKET_FROM_SERVER; extern const int DEADLOCK_AVOIDED; + extern const int USER_SESSION_LIMIT_EXCEEDED; } Suggest::Suggest() @@ -121,21 +123,24 @@ void Suggest::load(ContextPtr context, const ConnectionParameters & connection_p } catch (const Exception & e) { + last_error = e.code(); if (e.code() == ErrorCodes::DEADLOCK_AVOIDED) continue; - - /// Client can successfully connect to the server and - /// get ErrorCodes::USER_SESSION_LIMIT_EXCEEDED for suggestion connection. - - /// We should not use std::cerr here, because this method works concurrently with the main thread. - /// WriteBufferFromFileDescriptor will write directly to the file descriptor, avoiding data race on std::cerr. - - WriteBufferFromFileDescriptor out(STDERR_FILENO, 4096); - out << "Cannot load data for command line suggestions: " << getCurrentExceptionMessage(false, true) << "\n"; - out.next(); + else if (e.code() != ErrorCodes::USER_SESSION_LIMIT_EXCEEDED) + { + /// We should not use std::cerr here, because this method works concurrently with the main thread. + /// WriteBufferFromFileDescriptor will write directly to the file descriptor, avoiding data race on std::cerr. + /// + /// USER_SESSION_LIMIT_EXCEEDED is ignored here. The client will try to receive + /// suggestions using the main connection later. + WriteBufferFromFileDescriptor out(STDERR_FILENO, 4096); + out << "Cannot load data for command line suggestions: " << getCurrentExceptionMessage(false, true) << "\n"; + out.next(); + } } catch (...) { + last_error = getCurrentExceptionCode(); WriteBufferFromFileDescriptor out(STDERR_FILENO, 4096); out << "Cannot load data for command line suggestions: " << getCurrentExceptionMessage(false, true) << "\n"; out.next(); @@ -148,6 +153,21 @@ void Suggest::load(ContextPtr context, const ConnectionParameters & connection_p }); } +void Suggest::load(IServerConnection & connection, + const ConnectionTimeouts & timeouts, + Int32 suggestion_limit) +{ + try + { + fetch(connection, timeouts, getLoadSuggestionQuery(suggestion_limit, true)); + } + catch (...) + { + std::cerr << "Suggestions loading exception: " << getCurrentExceptionMessage(false, true) << std::endl; + last_error = getCurrentExceptionCode(); + } +} + void Suggest::fetch(IServerConnection & connection, const ConnectionTimeouts & timeouts, const std::string & query) { connection.sendQuery( @@ -176,6 +196,7 @@ void Suggest::fetch(IServerConnection & connection, const ConnectionTimeouts & t return; case Protocol::Server::EndOfStream: + last_error = ErrorCodes::OK; return; default: diff --git a/src/Client/Suggest.h b/src/Client/Suggest.h index cfe9315879c..5cecdc4501b 100644 --- a/src/Client/Suggest.h +++ b/src/Client/Suggest.h @@ -7,6 +7,7 @@ #include #include #include +#include #include @@ -28,9 +29,15 @@ public: template void load(ContextPtr context, const ConnectionParameters & connection_parameters, Int32 suggestion_limit); + void load(IServerConnection & connection, + const ConnectionTimeouts & timeouts, + Int32 suggestion_limit); + /// Older server versions cannot execute the query loading suggestions. static constexpr int MIN_SERVER_REVISION = DBMS_MIN_PROTOCOL_VERSION_WITH_VIEW_IF_PERMITTED; + int getLastError() const { return last_error.load(); } + private: void fetch(IServerConnection & connection, const ConnectionTimeouts & timeouts, const std::string & query); @@ -38,6 +45,8 @@ private: /// Words are fetched asynchronously. std::thread loading_thread; + + std::atomic last_error { -1 }; }; } diff --git a/tests/integration/parallel_skip.json b/tests/integration/parallel_skip.json index dec51396c51..d056225fee4 100644 --- a/tests/integration/parallel_skip.json +++ b/tests/integration/parallel_skip.json @@ -91,5 +91,6 @@ "test_profile_max_sessions_for_user/test.py::test_profile_max_sessions_for_user_http_named_session", "test_profile_max_sessions_for_user/test.py::test_profile_max_sessions_for_user_grpc", "test_profile_max_sessions_for_user/test.py::test_profile_max_sessions_for_user_tcp_and_others", - "test_profile_max_sessions_for_user/test.py::test_profile_max_sessions_for_user_setting_in_query" + "test_profile_max_sessions_for_user/test.py::test_profile_max_sessions_for_user_setting_in_query", + "test_profile_max_sessions_for_user/test.py::test_profile_max_sessions_for_user_client_suggestions_load" ] diff --git a/tests/integration/test_profile_max_sessions_for_user/test.py b/tests/integration/test_profile_max_sessions_for_user/test.py index 2930262f63e..925fa05881d 100755 --- a/tests/integration/test_profile_max_sessions_for_user/test.py +++ b/tests/integration/test_profile_max_sessions_for_user/test.py @@ -10,6 +10,7 @@ import threading from helpers.cluster import ClickHouseCluster, run_and_check from helpers.test_tools import assert_logs_contain_with_retry +from helpers.uclient import client, prompt MAX_SESSIONS_FOR_USER = 2 POSTGRES_SERVER_PORT = 5433 @@ -209,3 +210,20 @@ def test_profile_max_sessions_for_user_tcp_and_others(started_cluster): def test_profile_max_sessions_for_user_setting_in_query(started_cluster): instance.query_and_get_error("SET max_sessions_for_user = 10") + + +def test_profile_max_sessions_for_user_client_suggestions_connection(started_cluster): + command_text = f"{started_cluster.get_client_cmd()} --host {instance.ip_address} --port 9000 -u {TEST_USER} --password {TEST_PASSWORD}" + with client(name="client1>", log=None, command=command_text) as client1: + client1.expect(prompt) + with client(name="client2>", log=None, command=command_text) as client2: + client2.expect(prompt) + with client(name="client3>", log=None, command=command_text) as client3: + client3.expect("USER_SESSION_LIMIT_EXCEEDED") + + client1.send("SELECT 'CLIENT_1_SELECT' FORMAT CSV") + client1.expect("CLIENT_1_SELECT") + client1.expect(prompt) + client2.send("SELECT 'CLIENT_2_SELECT' FORMAT CSV") + client2.expect("CLIENT_2_SELECT") + client2.expect(prompt) From 0ff5d12788f1656f61c5b8df2a716675aef02f88 Mon Sep 17 00:00:00 2001 From: Alexey Gerasimchuck Date: Thu, 10 Aug 2023 11:14:55 +0000 Subject: [PATCH 2/4] Added decription to the test + race condition fix --- .../test_profile_max_sessions_for_user/test.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_profile_max_sessions_for_user/test.py b/tests/integration/test_profile_max_sessions_for_user/test.py index 925fa05881d..78e201f88b9 100755 --- a/tests/integration/test_profile_max_sessions_for_user/test.py +++ b/tests/integration/test_profile_max_sessions_for_user/test.py @@ -214,7 +214,21 @@ def test_profile_max_sessions_for_user_setting_in_query(started_cluster): def test_profile_max_sessions_for_user_client_suggestions_connection(started_cluster): command_text = f"{started_cluster.get_client_cmd()} --host {instance.ip_address} --port 9000 -u {TEST_USER} --password {TEST_PASSWORD}" - with client(name="client1>", log=None, command=command_text) as client1: + command_text_without_suggestions = command_text + " --disable_suggestion" + + # Launch client1 without suggestions to avoid a race condition: + # Client1 opens a session. + # Client1 opens a session for suggestion connection. + # Client2 fails to open a session and gets the USER_SESSION_LIMIT_EXCEEDED error. + # + # Expected order: + # Client1 opens a session. + # Client2 opens a session. + # Client2 fails to open a session for suggestions and with USER_SESSION_LIMIT_EXCEEDED (No error printed). + # Client3 fails to open a session. + # Client1 executes the query. + # Client2 loads suggestions from the server using the main connection and executes a query. + with client(name="client1>", log=None, command=command_text_without_suggestions) as client1: client1.expect(prompt) with client(name="client2>", log=None, command=command_text) as client2: client2.expect(prompt) From 7ed7707ab7e6ccd6b2f26675f3349b29e703b442 Mon Sep 17 00:00:00 2001 From: Alexey Gerasimchuck Date: Thu, 10 Aug 2023 11:19:16 +0000 Subject: [PATCH 3/4] black run --- tests/integration/test_profile_max_sessions_for_user/test.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_profile_max_sessions_for_user/test.py b/tests/integration/test_profile_max_sessions_for_user/test.py index 78e201f88b9..c5c33b1cddb 100755 --- a/tests/integration/test_profile_max_sessions_for_user/test.py +++ b/tests/integration/test_profile_max_sessions_for_user/test.py @@ -228,7 +228,9 @@ def test_profile_max_sessions_for_user_client_suggestions_connection(started_clu # Client3 fails to open a session. # Client1 executes the query. # Client2 loads suggestions from the server using the main connection and executes a query. - with client(name="client1>", log=None, command=command_text_without_suggestions) as client1: + with client( + name="client1>", log=None, command=command_text_without_suggestions + ) as client1: client1.expect(prompt) with client(name="client2>", log=None, command=command_text) as client2: client2.expect(prompt) From 7312de59c508932934c3bc8aa03818d74215e343 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Mon, 14 Aug 2023 23:33:30 +0200 Subject: [PATCH 4/4] empty commit