mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-23 16:12:01 +00:00
commit
5ab82cc019
@ -105,6 +105,7 @@ namespace ErrorCodes
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int CANNOT_OPEN_FILE;
|
||||
extern const int FILE_ALREADY_EXISTS;
|
||||
extern const int USER_SESSION_LIMIT_EXCEEDED;
|
||||
}
|
||||
|
||||
}
|
||||
@ -2408,6 +2409,13 @@ void ClientBase::runInteractive()
|
||||
}
|
||||
}
|
||||
|
||||
if (suggest && suggest->getLastError() == ErrorCodes::USER_SESSION_LIMIT_EXCEEDED)
|
||||
{
|
||||
// If a separate connection loading suggestions failed to open a new session,
|
||||
// use the main session to receive them.
|
||||
suggest->load(*connection, connection_parameters.timeouts, config().getInt("suggestion_limit"));
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
if (!processQueryText(input))
|
||||
|
@ -22,9 +22,11 @@ namespace DB
|
||||
{
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int OK;
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int UNKNOWN_PACKET_FROM_SERVER;
|
||||
extern const int DEADLOCK_AVOIDED;
|
||||
extern const int USER_SESSION_LIMIT_EXCEEDED;
|
||||
}
|
||||
|
||||
Suggest::Suggest()
|
||||
@ -121,21 +123,24 @@ void Suggest::load(ContextPtr context, const ConnectionParameters & connection_p
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
last_error = e.code();
|
||||
if (e.code() == ErrorCodes::DEADLOCK_AVOIDED)
|
||||
continue;
|
||||
|
||||
/// Client can successfully connect to the server and
|
||||
/// get ErrorCodes::USER_SESSION_LIMIT_EXCEEDED for suggestion connection.
|
||||
|
||||
/// We should not use std::cerr here, because this method works concurrently with the main thread.
|
||||
/// WriteBufferFromFileDescriptor will write directly to the file descriptor, avoiding data race on std::cerr.
|
||||
|
||||
WriteBufferFromFileDescriptor out(STDERR_FILENO, 4096);
|
||||
out << "Cannot load data for command line suggestions: " << getCurrentExceptionMessage(false, true) << "\n";
|
||||
out.next();
|
||||
else if (e.code() != ErrorCodes::USER_SESSION_LIMIT_EXCEEDED)
|
||||
{
|
||||
/// We should not use std::cerr here, because this method works concurrently with the main thread.
|
||||
/// WriteBufferFromFileDescriptor will write directly to the file descriptor, avoiding data race on std::cerr.
|
||||
///
|
||||
/// USER_SESSION_LIMIT_EXCEEDED is ignored here. The client will try to receive
|
||||
/// suggestions using the main connection later.
|
||||
WriteBufferFromFileDescriptor out(STDERR_FILENO, 4096);
|
||||
out << "Cannot load data for command line suggestions: " << getCurrentExceptionMessage(false, true) << "\n";
|
||||
out.next();
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
last_error = getCurrentExceptionCode();
|
||||
WriteBufferFromFileDescriptor out(STDERR_FILENO, 4096);
|
||||
out << "Cannot load data for command line suggestions: " << getCurrentExceptionMessage(false, true) << "\n";
|
||||
out.next();
|
||||
@ -148,6 +153,21 @@ void Suggest::load(ContextPtr context, const ConnectionParameters & connection_p
|
||||
});
|
||||
}
|
||||
|
||||
void Suggest::load(IServerConnection & connection,
|
||||
const ConnectionTimeouts & timeouts,
|
||||
Int32 suggestion_limit)
|
||||
{
|
||||
try
|
||||
{
|
||||
fetch(connection, timeouts, getLoadSuggestionQuery(suggestion_limit, true));
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
std::cerr << "Suggestions loading exception: " << getCurrentExceptionMessage(false, true) << std::endl;
|
||||
last_error = getCurrentExceptionCode();
|
||||
}
|
||||
}
|
||||
|
||||
void Suggest::fetch(IServerConnection & connection, const ConnectionTimeouts & timeouts, const std::string & query)
|
||||
{
|
||||
connection.sendQuery(
|
||||
@ -176,6 +196,7 @@ void Suggest::fetch(IServerConnection & connection, const ConnectionTimeouts & t
|
||||
return;
|
||||
|
||||
case Protocol::Server::EndOfStream:
|
||||
last_error = ErrorCodes::OK;
|
||||
return;
|
||||
|
||||
default:
|
||||
|
@ -7,6 +7,7 @@
|
||||
#include <Client/LocalConnection.h>
|
||||
#include <Client/LineReader.h>
|
||||
#include <IO/ConnectionTimeouts.h>
|
||||
#include <atomic>
|
||||
#include <thread>
|
||||
|
||||
|
||||
@ -28,9 +29,15 @@ public:
|
||||
template <typename ConnectionType>
|
||||
void load(ContextPtr context, const ConnectionParameters & connection_parameters, Int32 suggestion_limit);
|
||||
|
||||
void load(IServerConnection & connection,
|
||||
const ConnectionTimeouts & timeouts,
|
||||
Int32 suggestion_limit);
|
||||
|
||||
/// Older server versions cannot execute the query loading suggestions.
|
||||
static constexpr int MIN_SERVER_REVISION = DBMS_MIN_PROTOCOL_VERSION_WITH_VIEW_IF_PERMITTED;
|
||||
|
||||
int getLastError() const { return last_error.load(); }
|
||||
|
||||
private:
|
||||
void fetch(IServerConnection & connection, const ConnectionTimeouts & timeouts, const std::string & query);
|
||||
|
||||
@ -38,6 +45,8 @@ private:
|
||||
|
||||
/// Words are fetched asynchronously.
|
||||
std::thread loading_thread;
|
||||
|
||||
std::atomic<int> last_error { -1 };
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -91,5 +91,6 @@
|
||||
"test_profile_max_sessions_for_user/test.py::test_profile_max_sessions_for_user_http_named_session",
|
||||
"test_profile_max_sessions_for_user/test.py::test_profile_max_sessions_for_user_grpc",
|
||||
"test_profile_max_sessions_for_user/test.py::test_profile_max_sessions_for_user_tcp_and_others",
|
||||
"test_profile_max_sessions_for_user/test.py::test_profile_max_sessions_for_user_setting_in_query"
|
||||
"test_profile_max_sessions_for_user/test.py::test_profile_max_sessions_for_user_setting_in_query",
|
||||
"test_profile_max_sessions_for_user/test.py::test_profile_max_sessions_for_user_client_suggestions_load"
|
||||
]
|
||||
|
@ -10,6 +10,7 @@ import threading
|
||||
from helpers.cluster import ClickHouseCluster, run_and_check
|
||||
from helpers.test_tools import assert_logs_contain_with_retry
|
||||
|
||||
from helpers.uclient import client, prompt
|
||||
|
||||
MAX_SESSIONS_FOR_USER = 2
|
||||
POSTGRES_SERVER_PORT = 5433
|
||||
@ -209,3 +210,36 @@ def test_profile_max_sessions_for_user_tcp_and_others(started_cluster):
|
||||
|
||||
def test_profile_max_sessions_for_user_setting_in_query(started_cluster):
|
||||
instance.query_and_get_error("SET max_sessions_for_user = 10")
|
||||
|
||||
|
||||
def test_profile_max_sessions_for_user_client_suggestions_connection(started_cluster):
|
||||
command_text = f"{started_cluster.get_client_cmd()} --host {instance.ip_address} --port 9000 -u {TEST_USER} --password {TEST_PASSWORD}"
|
||||
command_text_without_suggestions = command_text + " --disable_suggestion"
|
||||
|
||||
# Launch client1 without suggestions to avoid a race condition:
|
||||
# Client1 opens a session.
|
||||
# Client1 opens a session for suggestion connection.
|
||||
# Client2 fails to open a session and gets the USER_SESSION_LIMIT_EXCEEDED error.
|
||||
#
|
||||
# Expected order:
|
||||
# Client1 opens a session.
|
||||
# Client2 opens a session.
|
||||
# Client2 fails to open a session for suggestions and with USER_SESSION_LIMIT_EXCEEDED (No error printed).
|
||||
# Client3 fails to open a session.
|
||||
# Client1 executes the query.
|
||||
# Client2 loads suggestions from the server using the main connection and executes a query.
|
||||
with client(
|
||||
name="client1>", log=None, command=command_text_without_suggestions
|
||||
) as client1:
|
||||
client1.expect(prompt)
|
||||
with client(name="client2>", log=None, command=command_text) as client2:
|
||||
client2.expect(prompt)
|
||||
with client(name="client3>", log=None, command=command_text) as client3:
|
||||
client3.expect("USER_SESSION_LIMIT_EXCEEDED")
|
||||
|
||||
client1.send("SELECT 'CLIENT_1_SELECT' FORMAT CSV")
|
||||
client1.expect("CLIENT_1_SELECT")
|
||||
client1.expect(prompt)
|
||||
client2.send("SELECT 'CLIENT_2_SELECT' FORMAT CSV")
|
||||
client2.expect("CLIENT_2_SELECT")
|
||||
client2.expect(prompt)
|
||||
|
Loading…
Reference in New Issue
Block a user