mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
separate limits on number of waiting and executing queries
This commit is contained in:
parent
9c4b15c1c1
commit
0f0ea422f2
@ -379,6 +379,18 @@ Type: UInt64
|
||||
|
||||
Default: 0
|
||||
|
||||
## max_waiting_queries
|
||||
|
||||
Limit on total number of concurrently waiting queries. Execution of a waiting query is blocked while required tables are loading asynchronously (see `async_load_databases`). Note that waiting queries are not counted when `max_concurrent_queries`, `max_concurrent_insert_queries`, `max_concurrent_select_queries`, `max_concurrent_queries_for_user` and `max_concurrent_queries_for_all_users` limits are checked. This correction is done to avoid hitting these limits just after server startup. Zero means unlimited.
|
||||
|
||||
:::note
|
||||
This setting can be modified at runtime and will take effect immediately. Queries that are already running will remain unchanged.
|
||||
:::
|
||||
|
||||
Type: UInt64
|
||||
|
||||
Default: 0
|
||||
|
||||
## max_connections
|
||||
|
||||
Max server connections.
|
||||
@ -1725,7 +1737,7 @@ Default value: `0.5`.
|
||||
|
||||
Asynchronous loading of databases and tables.
|
||||
|
||||
If `true` all non-system databases with `Ordinary`, `Atomic` and `Replicated` engine will be loaded asynchronously after the ClickHouse server start up. See `system.asynchronous_loader` table, `tables_loader_background_pool_size` and `tables_loader_foreground_pool_size` server settings. Any query that tries to access a table, that is not yet loaded, will wait for exactly this table to be started up. If load job fails, query will rethrow an error (instead of shutting down the whole server in case of `async_load_databases = false`). The table that is waited for by at least one query will be loaded with higher priority. DDL queries on a database will wait for exactly that database to be started up.
|
||||
If `true` all non-system databases with `Ordinary`, `Atomic` and `Replicated` engine will be loaded asynchronously after the ClickHouse server start up. See `system.asynchronous_loader` table, `tables_loader_background_pool_size` and `tables_loader_foreground_pool_size` server settings. Any query that tries to access a table, that is not yet loaded, will wait for exactly this table to be started up. If load job fails, query will rethrow an error (instead of shutting down the whole server in case of `async_load_databases = false`). The table that is waited for by at least one query will be loaded with higher priority. DDL queries on a database will wait for exactly that database to be started up. Also consider setting a limit `max_waiting_queries` for the total number of waiting queries.
|
||||
|
||||
If `false`, all databases are loaded when the server starts.
|
||||
|
||||
@ -2926,7 +2938,7 @@ Default: 0
|
||||
|
||||
## ignore_empty_sql_security_in_create_view_query {#ignore_empty_sql_security_in_create_view_query}
|
||||
|
||||
If true, ClickHouse doesn't write defaults for empty SQL security statement in CREATE VIEW queries.
|
||||
If true, ClickHouse doesn't write defaults for empty SQL security statement in CREATE VIEW queries.
|
||||
|
||||
:::note
|
||||
This setting is only necessary for the migration period and will become obsolete in 24.4
|
||||
|
@ -1429,6 +1429,7 @@ try
|
||||
global_context->getProcessList().setMaxSize(new_server_settings.max_concurrent_queries);
|
||||
global_context->getProcessList().setMaxInsertQueriesAmount(new_server_settings.max_concurrent_insert_queries);
|
||||
global_context->getProcessList().setMaxSelectQueriesAmount(new_server_settings.max_concurrent_select_queries);
|
||||
global_context->getProcessList().setMaxWaitingQueriesAmount(new_server_settings.max_waiting_queries);
|
||||
|
||||
if (config->has("keeper_server"))
|
||||
global_context->updateKeeperConfiguration(*config);
|
||||
|
@ -140,6 +140,11 @@ void LoadJob::finish()
|
||||
finish_time = std::chrono::system_clock::now();
|
||||
if (waiters > 0)
|
||||
finished.notify_all();
|
||||
else
|
||||
{
|
||||
on_waiters_increment = {};
|
||||
on_waiters_decrement = {};
|
||||
}
|
||||
}
|
||||
|
||||
void LoadJob::scheduled(UInt64 job_id_)
|
||||
@ -765,11 +770,25 @@ void AsyncLoader::wait(std::unique_lock<std::mutex> & job_lock, const LoadJobPtr
|
||||
if (job->load_status != LoadStatus::PENDING) // Shortcut just to avoid incrementing ProfileEvents
|
||||
return;
|
||||
|
||||
if (job->on_waiters_increment)
|
||||
job->on_waiters_increment(job);
|
||||
|
||||
// WARNING: it is important not to throw below this point to avoid `on_waiters_increment` call w/o matching `on_waiters_decrement` call
|
||||
|
||||
Stopwatch watch;
|
||||
job->waiters++;
|
||||
job->finished.wait(job_lock, [&] { return job->load_status != LoadStatus::PENDING; });
|
||||
job->waiters--;
|
||||
ProfileEvents::increment(ProfileEvents::AsyncLoaderWaitMicroseconds, watch.elapsedMicroseconds());
|
||||
|
||||
if (job->on_waiters_decrement)
|
||||
job->on_waiters_decrement(job);
|
||||
|
||||
if (job->waiters == 0)
|
||||
{
|
||||
job->on_waiters_increment = {};
|
||||
job->on_waiters_decrement = {};
|
||||
}
|
||||
}
|
||||
|
||||
bool AsyncLoader::canSpawnWorker(Pool & pool, std::unique_lock<std::mutex> &)
|
||||
@ -859,7 +878,7 @@ void AsyncLoader::worker(Pool & pool)
|
||||
try
|
||||
{
|
||||
current_load_job = job.get();
|
||||
SCOPE_EXIT({ current_load_job = nullptr; }); // Note that recursive job execution is not supported
|
||||
SCOPE_EXIT({ current_load_job = nullptr; }); // Note that recursive job execution is not supported, but jobs can wait one another
|
||||
job->execute(*this, pool_id, job);
|
||||
exception_from_job = {};
|
||||
}
|
||||
|
@ -59,7 +59,8 @@ enum class LoadStatus
|
||||
class LoadJob : private boost::noncopyable
|
||||
{
|
||||
public:
|
||||
template <class LoadJobSetType, class Func, class DFFunc>
|
||||
// NOTE: makeLoadJob() helper should be used instead of direct ctor call
|
||||
template <class LoadJobSetType, class DFFunc, class Func>
|
||||
LoadJob(LoadJobSetType && dependencies_, String name_, size_t pool_id_, DFFunc && dependency_failure_, Func && func_)
|
||||
: dependencies(std::forward<LoadJobSetType>(dependencies_))
|
||||
, name(std::move(name_))
|
||||
@ -69,6 +70,19 @@ public:
|
||||
, func(std::forward<Func>(func_))
|
||||
{}
|
||||
|
||||
// NOTE: makeLoadJob() helper should be used instead of direct ctor call
|
||||
template <class LoadJobSetType, class WIFunc, class WDFunc, class DFFunc, class Func>
|
||||
LoadJob(LoadJobSetType && dependencies_, String name_, size_t pool_id_, WIFunc && on_waiters_increment_, WDFunc && on_waiters_decrement_, DFFunc && dependency_failure_, Func && func_)
|
||||
: dependencies(std::forward<LoadJobSetType>(dependencies_))
|
||||
, name(std::move(name_))
|
||||
, execution_pool_id(pool_id_)
|
||||
, pool_id(pool_id_)
|
||||
, on_waiters_increment(std::forward<WIFunc>(on_waiters_increment_))
|
||||
, on_waiters_decrement(std::forward<WDFunc>(on_waiters_decrement_))
|
||||
, dependency_failure(std::forward<DFFunc>(dependency_failure_))
|
||||
, func(std::forward<Func>(func_))
|
||||
{}
|
||||
|
||||
// Current job status.
|
||||
LoadStatus status() const;
|
||||
std::exception_ptr exception() const;
|
||||
@ -112,6 +126,13 @@ private:
|
||||
std::atomic<size_t> execution_pool_id;
|
||||
std::atomic<size_t> pool_id;
|
||||
|
||||
// Handlers that is called by every new waiting thread, just before going to sleep.
|
||||
// If `on_waiters_increment` throws, then wait is canceled, and corresponding `on_waiters_decrement` will never be called.
|
||||
// It can be used for counting and limits on number of waiters.
|
||||
// Note that implementations are called under `LoadJob::mutex` and should be fast.
|
||||
std::function<void(const LoadJobPtr & self)> on_waiters_increment;
|
||||
std::function<void(const LoadJobPtr & self)> on_waiters_decrement;
|
||||
|
||||
// Handler for failed or canceled dependencies.
|
||||
// If job needs to be canceled on `dependency` failure, then function should set `cancel` to a specific reason.
|
||||
// Note that implementation should be fast and cannot use AsyncLoader, because it is called under `AsyncLoader::mutex`.
|
||||
@ -140,8 +161,50 @@ void cancelOnDependencyFailure(const LoadJobPtr & self, const LoadJobPtr & depen
|
||||
void ignoreDependencyFailure(const LoadJobPtr & self, const LoadJobPtr & dependency, std::exception_ptr & cancel);
|
||||
|
||||
template <class F> concept LoadJobDependencyFailure = std::invocable<F, const LoadJobPtr &, const LoadJobPtr &, std::exception_ptr &>;
|
||||
template <class F> concept LoadJobOnWaiters = std::invocable<F, const LoadJobPtr &>;
|
||||
template <class F> concept LoadJobFunc = std::invocable<F, AsyncLoader &, const LoadJobPtr &>;
|
||||
|
||||
LoadJobPtr makeLoadJob(LoadJobSet && dependencies, String name, LoadJobOnWaiters auto && on_waiters_increment, LoadJobOnWaiters auto && on_waiters_decrement, LoadJobDependencyFailure auto && dependency_failure, LoadJobFunc auto && func)
|
||||
{
|
||||
return std::make_shared<LoadJob>(std::move(dependencies), std::move(name), 0, on_waiters_increment, on_waiters_decrement, std::forward<decltype(dependency_failure)>(dependency_failure), std::forward<decltype(func)>(func));
|
||||
}
|
||||
|
||||
LoadJobPtr makeLoadJob(const LoadJobSet & dependencies, String name, LoadJobOnWaiters auto && on_waiters_increment, LoadJobOnWaiters auto && on_waiters_decrement, LoadJobDependencyFailure auto && dependency_failure, LoadJobFunc auto && func)
|
||||
{
|
||||
return std::make_shared<LoadJob>(dependencies, std::move(name), 0, on_waiters_increment, on_waiters_decrement, std::forward<decltype(dependency_failure)>(dependency_failure), std::forward<decltype(func)>(func));
|
||||
}
|
||||
|
||||
LoadJobPtr makeLoadJob(LoadJobSet && dependencies, size_t pool_id, String name, LoadJobOnWaiters auto && on_waiters_increment, LoadJobOnWaiters auto && on_waiters_decrement, LoadJobDependencyFailure auto && dependency_failure, LoadJobFunc auto && func)
|
||||
{
|
||||
return std::make_shared<LoadJob>(std::move(dependencies), std::move(name), pool_id, on_waiters_increment, on_waiters_decrement, std::forward<decltype(dependency_failure)>(dependency_failure), std::forward<decltype(func)>(func));
|
||||
}
|
||||
|
||||
LoadJobPtr makeLoadJob(const LoadJobSet & dependencies, size_t pool_id, String name, LoadJobOnWaiters auto && on_waiters_increment, LoadJobOnWaiters auto && on_waiters_decrement, LoadJobDependencyFailure auto && dependency_failure, LoadJobFunc auto && func)
|
||||
{
|
||||
return std::make_shared<LoadJob>(dependencies, std::move(name), pool_id, on_waiters_increment, on_waiters_decrement, std::forward<decltype(dependency_failure)>(dependency_failure), std::forward<decltype(func)>(func));
|
||||
}
|
||||
|
||||
LoadJobPtr makeLoadJob(LoadJobSet && dependencies, String name, LoadJobOnWaiters auto && on_waiters_increment, LoadJobOnWaiters auto && on_waiters_decrement, LoadJobFunc auto && func)
|
||||
{
|
||||
return std::make_shared<LoadJob>(std::move(dependencies), std::move(name), 0, on_waiters_increment, on_waiters_decrement, cancelOnDependencyFailure, std::forward<decltype(func)>(func));
|
||||
}
|
||||
|
||||
LoadJobPtr makeLoadJob(const LoadJobSet & dependencies, String name, LoadJobOnWaiters auto && on_waiters_increment, LoadJobOnWaiters auto && on_waiters_decrement, LoadJobFunc auto && func)
|
||||
{
|
||||
return std::make_shared<LoadJob>(dependencies, std::move(name), 0, on_waiters_increment, on_waiters_decrement, cancelOnDependencyFailure, std::forward<decltype(func)>(func));
|
||||
}
|
||||
|
||||
LoadJobPtr makeLoadJob(LoadJobSet && dependencies, size_t pool_id, String name, LoadJobOnWaiters auto && on_waiters_increment, LoadJobOnWaiters auto && on_waiters_decrement, LoadJobFunc auto && func)
|
||||
{
|
||||
return std::make_shared<LoadJob>(std::move(dependencies), std::move(name), pool_id, on_waiters_increment, on_waiters_decrement, cancelOnDependencyFailure, std::forward<decltype(func)>(func));
|
||||
}
|
||||
|
||||
LoadJobPtr makeLoadJob(const LoadJobSet & dependencies, size_t pool_id, String name, LoadJobOnWaiters auto && on_waiters_increment, LoadJobOnWaiters auto && on_waiters_decrement, LoadJobFunc auto && func)
|
||||
{
|
||||
return std::make_shared<LoadJob>(dependencies, std::move(name), pool_id, on_waiters_increment, on_waiters_decrement, cancelOnDependencyFailure, std::forward<decltype(func)>(func));
|
||||
}
|
||||
|
||||
|
||||
LoadJobPtr makeLoadJob(LoadJobSet && dependencies, String name, LoadJobDependencyFailure auto && dependency_failure, LoadJobFunc auto && func)
|
||||
{
|
||||
return std::make_shared<LoadJob>(std::move(dependencies), std::move(name), 0, std::forward<decltype(dependency_failure)>(dependency_failure), std::forward<decltype(func)>(func));
|
||||
|
@ -643,6 +643,72 @@ TEST(AsyncLoader, CustomDependencyFailure)
|
||||
ASSERT_EQ(good_count.load(), 3);
|
||||
}
|
||||
|
||||
TEST(AsyncLoader, WaitersLimit)
|
||||
{
|
||||
AsyncLoaderTest t(16);
|
||||
|
||||
std::atomic<int> waiters_total{0};
|
||||
int waiters_limit = 5;
|
||||
auto waiters_inc = [&] (const LoadJobPtr &) {
|
||||
int value = waiters_total.load();
|
||||
while (true)
|
||||
{
|
||||
if (value >= waiters_limit)
|
||||
throw Exception(ErrorCodes::ASYNC_LOAD_FAILED, "Too many waiters: {}", value);
|
||||
if (waiters_total.compare_exchange_strong(value, value + 1))
|
||||
break;
|
||||
}
|
||||
};
|
||||
auto waiters_dec = [&] (const LoadJobPtr &) {
|
||||
waiters_total.fetch_sub(1);
|
||||
};
|
||||
|
||||
std::barrier sync(2);
|
||||
t.loader.start();
|
||||
|
||||
auto job_func = [&] (AsyncLoader &, const LoadJobPtr &) {
|
||||
sync.arrive_and_wait(); // (A)
|
||||
};
|
||||
|
||||
auto job = makeLoadJob({}, "job", waiters_inc, waiters_dec, job_func);
|
||||
auto task = t.schedule({job});
|
||||
|
||||
std::atomic<int> failure{0};
|
||||
std::atomic<int> success{0};
|
||||
std::vector<std::thread> waiters;
|
||||
waiters.reserve(10);
|
||||
auto waiter = [&] {
|
||||
try
|
||||
{
|
||||
t.loader.wait(job);
|
||||
success.fetch_add(1);
|
||||
}
|
||||
catch(...)
|
||||
{
|
||||
failure.fetch_add(1);
|
||||
}
|
||||
};
|
||||
|
||||
for (int i = 0; i < 10; i++)
|
||||
waiters.emplace_back(waiter);
|
||||
|
||||
while (failure.load() != 5)
|
||||
std::this_thread::yield();
|
||||
|
||||
ASSERT_EQ(job->waitersCount(), 5);
|
||||
|
||||
sync.arrive_and_wait(); // (A)
|
||||
|
||||
for (auto & thread : waiters)
|
||||
thread.join();
|
||||
|
||||
ASSERT_EQ(success.load(), 5);
|
||||
ASSERT_EQ(failure.load(), 5);
|
||||
ASSERT_EQ(waiters_total.load(), 0);
|
||||
|
||||
t.loader.wait();
|
||||
}
|
||||
|
||||
TEST(AsyncLoader, TestConcurrency)
|
||||
{
|
||||
AsyncLoaderTest t(10);
|
||||
|
@ -64,6 +64,7 @@ namespace DB
|
||||
M(UInt64, max_concurrent_queries, 0, "Maximum number of concurrently executed queries. Zero means unlimited.", 0) \
|
||||
M(UInt64, max_concurrent_insert_queries, 0, "Maximum number of concurrently INSERT queries. Zero means unlimited.", 0) \
|
||||
M(UInt64, max_concurrent_select_queries, 0, "Maximum number of concurrently SELECT queries. Zero means unlimited.", 0) \
|
||||
M(UInt64, max_waiting_queries, 0, "Maximum number of concurrently waiting queries blocked due to `async_load_databases`. Note that waiting queries are not considered by `max_concurrent_*queries*` limits. Zero means unlimited.", 0) \
|
||||
\
|
||||
M(Double, cache_size_to_ram_max_ratio, 0.5, "Set cache size ro RAM max ratio. Allows to lower cache size on low-memory systems.", 0) \
|
||||
M(String, uncompressed_cache_policy, DEFAULT_UNCOMPRESSED_CACHE_POLICY, "Uncompressed cache policy name.", 0) \
|
||||
|
@ -83,25 +83,31 @@ ProcessList::insert(const String & query_, const IAST * ast, ContextMutablePtr q
|
||||
IAST::QueryKind query_kind = ast->getQueryKind();
|
||||
|
||||
const auto queue_max_wait_ms = settings.queue_max_wait_ms.totalMilliseconds();
|
||||
if (!is_unlimited_query && max_size && processes.size() >= max_size)
|
||||
UInt64 waiting_queries = waiting_queries_amount.load();
|
||||
if (!is_unlimited_query && max_size && processes.size() >= max_size + waiting_queries)
|
||||
{
|
||||
if (queue_max_wait_ms)
|
||||
LOG_WARNING(getLogger("ProcessList"), "Too many simultaneous queries, will wait {} ms.", queue_max_wait_ms);
|
||||
if (!queue_max_wait_ms || !have_space.wait_for(lock, std::chrono::milliseconds(queue_max_wait_ms), [&]{ return processes.size() < max_size; }))
|
||||
throw Exception(ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES, "Too many simultaneous queries. Maximum: {}", max_size);
|
||||
if (!queue_max_wait_ms || !have_space.wait_for(lock, std::chrono::milliseconds(queue_max_wait_ms),
|
||||
[&]{ waiting_queries = waiting_queries_amount.load(); return processes.size() < max_size + waiting_queries; }))
|
||||
throw Exception(ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES,
|
||||
"Too many simultaneous queries. Maximum: {}{}",
|
||||
max_size, waiting_queries == 0 ? "" : fmt::format(", waiting: {}", waiting_queries));
|
||||
}
|
||||
|
||||
if (!is_unlimited_query)
|
||||
{
|
||||
QueryAmount amount = getQueryKindAmount(query_kind);
|
||||
if (max_insert_queries_amount && query_kind == IAST::QueryKind::Insert && amount >= max_insert_queries_amount)
|
||||
UInt64 waiting_inserts = waiting_insert_queries_amount.load();
|
||||
UInt64 waiting_selects = waiting_select_queries_amount.load();
|
||||
if (max_insert_queries_amount && query_kind == IAST::QueryKind::Insert && amount >= max_insert_queries_amount + waiting_inserts)
|
||||
throw Exception(ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES,
|
||||
"Too many simultaneous insert queries. Maximum: {}, current: {}",
|
||||
max_insert_queries_amount, amount);
|
||||
if (max_select_queries_amount && query_kind == IAST::QueryKind::Select && amount >= max_select_queries_amount)
|
||||
"Too many simultaneous insert queries. Maximum: {}, current: {}{}",
|
||||
max_insert_queries_amount, amount, waiting_inserts == 0 ? "" : fmt::format(", waiting: {}", waiting_inserts));
|
||||
if (max_select_queries_amount && query_kind == IAST::QueryKind::Select && amount >= max_select_queries_amount + waiting_selects)
|
||||
throw Exception(ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES,
|
||||
"Too many simultaneous select queries. Maximum: {}, current: {}",
|
||||
max_select_queries_amount, amount);
|
||||
"Too many simultaneous select queries. Maximum: {}, current: {}{}",
|
||||
max_select_queries_amount, amount, waiting_selects == 0 ? "" : fmt::format(", waiting: {}", waiting_selects));
|
||||
}
|
||||
|
||||
{
|
||||
@ -124,10 +130,12 @@ ProcessList::insert(const String & query_, const IAST * ast, ContextMutablePtr q
|
||||
* once is already processing 50+ concurrent queries (including analysts or any other users).
|
||||
*/
|
||||
|
||||
waiting_queries = waiting_queries_amount.load();
|
||||
if (!is_unlimited_query && settings.max_concurrent_queries_for_all_users
|
||||
&& processes.size() >= settings.max_concurrent_queries_for_all_users)
|
||||
&& processes.size() >= settings.max_concurrent_queries_for_all_users + waiting_queries_amount)
|
||||
throw Exception(ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES, "Too many simultaneous queries for all users. "
|
||||
"Current: {}, maximum: {}", processes.size(), settings.max_concurrent_queries_for_all_users.toString());
|
||||
"Current: {}, maximum: {}{}", processes.size(), settings.max_concurrent_queries_for_all_users.toString(),
|
||||
waiting_queries == 0 ? "" : fmt::format(", waiting: {}", waiting_queries));
|
||||
}
|
||||
|
||||
/** Why we use current user?
|
||||
@ -145,13 +153,15 @@ ProcessList::insert(const String & query_, const IAST * ast, ContextMutablePtr q
|
||||
|
||||
if (user_process_list != user_to_queries.end())
|
||||
{
|
||||
UInt64 user_waiting_queries = user_process_list->second.waiting_queries_amount.load();
|
||||
if (!is_unlimited_query && settings.max_concurrent_queries_for_user
|
||||
&& user_process_list->second.queries.size() >= settings.max_concurrent_queries_for_user)
|
||||
&& user_process_list->second.queries.size() >= settings.max_concurrent_queries_for_user + user_waiting_queries)
|
||||
throw Exception(ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES,
|
||||
"Too many simultaneous queries for user {}. "
|
||||
"Current: {}, maximum: {}",
|
||||
"Current: {}, maximum: {}{}",
|
||||
client_info.current_user, user_process_list->second.queries.size(),
|
||||
settings.max_concurrent_queries_for_user.toString());
|
||||
settings.max_concurrent_queries_for_user.toString(),
|
||||
user_waiting_queries == 0 ? "" : fmt::format(", waiting: {}", user_waiting_queries));
|
||||
|
||||
auto running_query = user_process_list->second.queries.find(client_info.current_query_id);
|
||||
|
||||
@ -745,4 +755,69 @@ ProcessList::QueryAmount ProcessList::getQueryKindAmount(const IAST::QueryKind &
|
||||
return found->second;
|
||||
}
|
||||
|
||||
void ProcessList::increaseWaitingQueryAmount(const QueryStatusPtr & status)
|
||||
{
|
||||
UInt64 limit = max_waiting_queries_amount.load();
|
||||
UInt64 value = waiting_queries_amount.load();
|
||||
while (true)
|
||||
{
|
||||
if (value >= limit)
|
||||
throw Exception(ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES,
|
||||
"Too many simultaneous waiting queries. Maximum: {}, waiting: {}",
|
||||
limit, value);
|
||||
if (waiting_queries_amount.compare_exchange_strong(value, value + 1))
|
||||
break;
|
||||
}
|
||||
|
||||
// WARNING: After this point we should not throw, otherwise corresponding `decreaseWaitingQueryAmount` will not be called.
|
||||
|
||||
// Update query kind counters
|
||||
if (status->query_kind == IAST::QueryKind::Insert)
|
||||
waiting_insert_queries_amount.fetch_add(1);
|
||||
if (status->query_kind == IAST::QueryKind::Select)
|
||||
waiting_select_queries_amount.fetch_add(1);
|
||||
|
||||
// Update per-user counter
|
||||
status->getUserProcessList()->waiting_queries_amount.fetch_add(1);
|
||||
|
||||
// We have to notify because some queries might be waiting on `have_space`
|
||||
// and this query leaves its space by transitioning to waiting state
|
||||
have_space.notify_all();
|
||||
}
|
||||
|
||||
void ProcessList::decreaseWaitingQueryAmount(const QueryStatusPtr & status)
|
||||
{
|
||||
if (status->getUserProcessList()->waiting_queries_amount.fetch_sub(1) == 0)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong insert waiting query amount for user: decrease to negative");
|
||||
|
||||
if (status->query_kind == IAST::QueryKind::Insert && waiting_insert_queries_amount.fetch_sub(1) == 0)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong insert waiting query amount: decrease to negative");
|
||||
|
||||
if (status->query_kind == IAST::QueryKind::Select && waiting_select_queries_amount.fetch_sub(1) == 0)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong select waiting query amount: decrease to negative");
|
||||
|
||||
if (waiting_queries_amount.fetch_sub(1) == 0)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong waiting query amount: decrease to negative");
|
||||
}
|
||||
|
||||
void ProcessList::incrementWaiters()
|
||||
{
|
||||
ContextPtr context = CurrentThread::getQueryContext();
|
||||
QueryStatusPtr status = context->getProcessListElement();
|
||||
|
||||
// Query became "waiting" with the first thread that waits
|
||||
if (status->waiting_threads.fetch_add(1) == 0)
|
||||
increaseWaitingQueryAmount(status);
|
||||
}
|
||||
|
||||
void ProcessList::decrementWaiters()
|
||||
{
|
||||
ContextPtr context = CurrentThread::getQueryContext();
|
||||
QueryStatusPtr status = context->getProcessListElement();
|
||||
|
||||
// Query became "non-waiting" with the last thread that no longer waits
|
||||
if (status->waiting_threads.fetch_sub(1) == 1)
|
||||
decreaseWaitingQueryAmount(status);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -42,10 +42,6 @@ class ThreadStatus;
|
||||
class ProcessListEntry;
|
||||
|
||||
|
||||
/** List of currently executing queries.
|
||||
* Also implements limit on their number.
|
||||
*/
|
||||
|
||||
/** Information of process list element.
|
||||
* To output in SHOW PROCESSLIST query. Does not contain any complex objects, that do something on copy or destructor.
|
||||
*/
|
||||
@ -114,8 +110,13 @@ protected:
|
||||
/// Including EndOfStream or Exception.
|
||||
std::atomic<bool> is_all_data_sent { false };
|
||||
|
||||
/// Number of threads for the query that are waiting for load jobs
|
||||
std::atomic<UInt64> waiting_threads{0};
|
||||
|
||||
/// For initialization of ProcessListForUser during process insertion.
|
||||
void setUserProcessList(ProcessListForUser * user_process_list_);
|
||||
/// Be careful using it. For example, queries field of ProcessListForUser could be modified concurrently.
|
||||
ProcessListForUser * getUserProcessList() { return user_process_list; }
|
||||
const ProcessListForUser * getUserProcessList() const { return user_process_list; }
|
||||
|
||||
/// Sets an entry in the ProcessList associated with this QueryStatus.
|
||||
@ -283,6 +284,9 @@ struct ProcessListForUser
|
||||
/// Count network usage for all simultaneously running queries of single user.
|
||||
ThrottlerPtr user_throttler;
|
||||
|
||||
/// Number of queries waiting on load jobs
|
||||
std::atomic<UInt64> waiting_queries_amount{0};
|
||||
|
||||
ProcessListForUserInfo getInfo(bool get_profile_events = false) const;
|
||||
|
||||
/// Clears MemoryTracker for the user.
|
||||
@ -341,6 +345,9 @@ protected:
|
||||
};
|
||||
|
||||
|
||||
/** List of currently executing queries.
|
||||
* Also implements limit on their number.
|
||||
*/
|
||||
class ProcessList : public ProcessListBase
|
||||
{
|
||||
public:
|
||||
@ -399,10 +406,21 @@ protected:
|
||||
/// amount of queries by query kind.
|
||||
QueryKindAmounts query_kind_amounts;
|
||||
|
||||
/// limit for waiting queries. 0 means no limit. Otherwise, when limit exceeded, an exception is thrown.
|
||||
std::atomic<UInt64> max_waiting_queries_amount{0};
|
||||
|
||||
/// amounts of waiting queries
|
||||
std::atomic<UInt64> waiting_queries_amount{0};
|
||||
std::atomic<UInt64> waiting_insert_queries_amount{0};
|
||||
std::atomic<UInt64> waiting_select_queries_amount{0};
|
||||
|
||||
void increaseQueryKindAmount(const IAST::QueryKind & query_kind);
|
||||
void decreaseQueryKindAmount(const IAST::QueryKind & query_kind);
|
||||
QueryAmount getQueryKindAmount(const IAST::QueryKind & query_kind) const;
|
||||
|
||||
void increaseWaitingQueryAmount(const QueryStatusPtr & status);
|
||||
void decreaseWaitingQueryAmount(const QueryStatusPtr & status);
|
||||
|
||||
public:
|
||||
using EntryPtr = std::shared_ptr<ProcessListEntry>;
|
||||
|
||||
@ -458,6 +476,21 @@ public:
|
||||
return max_select_queries_amount;
|
||||
}
|
||||
|
||||
void setMaxWaitingQueriesAmount(UInt64 max_waiting_queries_amount_)
|
||||
{
|
||||
max_waiting_queries_amount.store(max_waiting_queries_amount_);
|
||||
// NOTE: We cannot cancel waiting queries when limit is lowered. They have to wait anyways, but new queries will be canceled instead of waiting.
|
||||
}
|
||||
|
||||
size_t getMaxWaitingQueriesAmount() const
|
||||
{
|
||||
return max_waiting_queries_amount.load();
|
||||
}
|
||||
|
||||
// Handlers for AsyncLoader waiters
|
||||
void incrementWaiters();
|
||||
void decrementWaiters();
|
||||
|
||||
/// Try call cancel() for input and output streams of query with specified id and user
|
||||
CancellationCode sendCancelToQuery(const String & current_query_id, const String & current_user, bool kill = false);
|
||||
CancellationCode sendCancelToQuery(QueryStatusPtr elem, bool kill = false);
|
||||
|
@ -70,6 +70,7 @@ void StorageSystemServerSettings::fillData(MutableColumns & res_columns, Context
|
||||
{"max_concurrent_queries", {std::to_string(context->getProcessList().getMaxSize()), ChangeableWithoutRestart::Yes}},
|
||||
{"max_concurrent_insert_queries", {std::to_string(context->getProcessList().getMaxInsertQueriesAmount()), ChangeableWithoutRestart::Yes}},
|
||||
{"max_concurrent_select_queries", {std::to_string(context->getProcessList().getMaxSelectQueriesAmount()), ChangeableWithoutRestart::Yes}},
|
||||
{"max_waiting_queries", {std::to_string(context->getProcessList().getMaxWaitingQueriesAmount()), ChangeableWithoutRestart::Yes}},
|
||||
|
||||
{"background_buffer_flush_schedule_pool_size", {std::to_string(CurrentMetrics::get(CurrentMetrics::BackgroundBufferFlushSchedulePoolSize)), ChangeableWithoutRestart::IncreaseOnly}},
|
||||
{"background_schedule_pool_size", {std::to_string(CurrentMetrics::get(CurrentMetrics::BackgroundSchedulePoolSize)), ChangeableWithoutRestart::IncreaseOnly}},
|
||||
|
Loading…
Reference in New Issue
Block a user