diff --git a/docs/en/operations/server-configuration-parameters/settings.md b/docs/en/operations/server-configuration-parameters/settings.md index 07c9a2b88ab..63fbd9d1964 100644 --- a/docs/en/operations/server-configuration-parameters/settings.md +++ b/docs/en/operations/server-configuration-parameters/settings.md @@ -379,6 +379,18 @@ Type: UInt64 Default: 0 +## max_waiting_queries + +Limit on total number of concurrently waiting queries. Execution of a waiting query is blocked while required tables are loading asynchronously (see `async_load_databases`). Note that waiting queries are not counted when `max_concurrent_queries`, `max_concurrent_insert_queries`, `max_concurrent_select_queries`, `max_concurrent_queries_for_user` and `max_concurrent_queries_for_all_users` limits are checked. This correction is done to avoid hitting these limits just after server startup. Zero means unlimited. + +:::note +This setting can be modified at runtime and will take effect immediately. Queries that are already running will remain unchanged. +::: + +Type: UInt64 + +Default: 0 + ## max_connections Max server connections. @@ -1725,7 +1737,7 @@ Default value: `0.5`. Asynchronous loading of databases and tables. -If `true` all non-system databases with `Ordinary`, `Atomic` and `Replicated` engine will be loaded asynchronously after the ClickHouse server start up. See `system.asynchronous_loader` table, `tables_loader_background_pool_size` and `tables_loader_foreground_pool_size` server settings. Any query that tries to access a table, that is not yet loaded, will wait for exactly this table to be started up. If load job fails, query will rethrow an error (instead of shutting down the whole server in case of `async_load_databases = false`). The table that is waited for by at least one query will be loaded with higher priority. DDL queries on a database will wait for exactly that database to be started up. +If `true` all non-system databases with `Ordinary`, `Atomic` and `Replicated` engine will be loaded asynchronously after the ClickHouse server start up. See `system.asynchronous_loader` table, `tables_loader_background_pool_size` and `tables_loader_foreground_pool_size` server settings. Any query that tries to access a table, that is not yet loaded, will wait for exactly this table to be started up. If load job fails, query will rethrow an error (instead of shutting down the whole server in case of `async_load_databases = false`). The table that is waited for by at least one query will be loaded with higher priority. DDL queries on a database will wait for exactly that database to be started up. Also consider setting a limit `max_waiting_queries` for the total number of waiting queries. If `false`, all databases are loaded when the server starts. @@ -2926,7 +2938,7 @@ Default: 0 ## ignore_empty_sql_security_in_create_view_query {#ignore_empty_sql_security_in_create_view_query} -If true, ClickHouse doesn't write defaults for empty SQL security statement in CREATE VIEW queries. +If true, ClickHouse doesn't write defaults for empty SQL security statement in CREATE VIEW queries. :::note This setting is only necessary for the migration period and will become obsolete in 24.4 diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index a10f47be0b8..336563665a2 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -1429,6 +1429,7 @@ try global_context->getProcessList().setMaxSize(new_server_settings.max_concurrent_queries); global_context->getProcessList().setMaxInsertQueriesAmount(new_server_settings.max_concurrent_insert_queries); global_context->getProcessList().setMaxSelectQueriesAmount(new_server_settings.max_concurrent_select_queries); + global_context->getProcessList().setMaxWaitingQueriesAmount(new_server_settings.max_waiting_queries); if (config->has("keeper_server")) global_context->updateKeeperConfiguration(*config); diff --git a/src/Common/AsyncLoader.cpp b/src/Common/AsyncLoader.cpp index 140194e10b4..80e4c72f1c1 100644 --- a/src/Common/AsyncLoader.cpp +++ b/src/Common/AsyncLoader.cpp @@ -140,6 +140,11 @@ void LoadJob::finish() finish_time = std::chrono::system_clock::now(); if (waiters > 0) finished.notify_all(); + else + { + on_waiters_increment = {}; + on_waiters_decrement = {}; + } } void LoadJob::scheduled(UInt64 job_id_) @@ -765,11 +770,25 @@ void AsyncLoader::wait(std::unique_lock & job_lock, const LoadJobPtr if (job->load_status != LoadStatus::PENDING) // Shortcut just to avoid incrementing ProfileEvents return; + if (job->on_waiters_increment) + job->on_waiters_increment(job); + + // WARNING: it is important not to throw below this point to avoid `on_waiters_increment` call w/o matching `on_waiters_decrement` call + Stopwatch watch; job->waiters++; job->finished.wait(job_lock, [&] { return job->load_status != LoadStatus::PENDING; }); job->waiters--; ProfileEvents::increment(ProfileEvents::AsyncLoaderWaitMicroseconds, watch.elapsedMicroseconds()); + + if (job->on_waiters_decrement) + job->on_waiters_decrement(job); + + if (job->waiters == 0) + { + job->on_waiters_increment = {}; + job->on_waiters_decrement = {}; + } } bool AsyncLoader::canSpawnWorker(Pool & pool, std::unique_lock &) @@ -859,7 +878,7 @@ void AsyncLoader::worker(Pool & pool) try { current_load_job = job.get(); - SCOPE_EXIT({ current_load_job = nullptr; }); // Note that recursive job execution is not supported + SCOPE_EXIT({ current_load_job = nullptr; }); // Note that recursive job execution is not supported, but jobs can wait one another job->execute(*this, pool_id, job); exception_from_job = {}; } diff --git a/src/Common/AsyncLoader.h b/src/Common/AsyncLoader.h index b1b336d24dc..3f81a36aa96 100644 --- a/src/Common/AsyncLoader.h +++ b/src/Common/AsyncLoader.h @@ -59,7 +59,8 @@ enum class LoadStatus class LoadJob : private boost::noncopyable { public: - template + // NOTE: makeLoadJob() helper should be used instead of direct ctor call + template LoadJob(LoadJobSetType && dependencies_, String name_, size_t pool_id_, DFFunc && dependency_failure_, Func && func_) : dependencies(std::forward(dependencies_)) , name(std::move(name_)) @@ -69,6 +70,19 @@ public: , func(std::forward(func_)) {} + // NOTE: makeLoadJob() helper should be used instead of direct ctor call + template + LoadJob(LoadJobSetType && dependencies_, String name_, size_t pool_id_, WIFunc && on_waiters_increment_, WDFunc && on_waiters_decrement_, DFFunc && dependency_failure_, Func && func_) + : dependencies(std::forward(dependencies_)) + , name(std::move(name_)) + , execution_pool_id(pool_id_) + , pool_id(pool_id_) + , on_waiters_increment(std::forward(on_waiters_increment_)) + , on_waiters_decrement(std::forward(on_waiters_decrement_)) + , dependency_failure(std::forward(dependency_failure_)) + , func(std::forward(func_)) + {} + // Current job status. LoadStatus status() const; std::exception_ptr exception() const; @@ -112,6 +126,13 @@ private: std::atomic execution_pool_id; std::atomic pool_id; + // Handlers that is called by every new waiting thread, just before going to sleep. + // If `on_waiters_increment` throws, then wait is canceled, and corresponding `on_waiters_decrement` will never be called. + // It can be used for counting and limits on number of waiters. + // Note that implementations are called under `LoadJob::mutex` and should be fast. + std::function on_waiters_increment; + std::function on_waiters_decrement; + // Handler for failed or canceled dependencies. // If job needs to be canceled on `dependency` failure, then function should set `cancel` to a specific reason. // Note that implementation should be fast and cannot use AsyncLoader, because it is called under `AsyncLoader::mutex`. @@ -140,8 +161,50 @@ void cancelOnDependencyFailure(const LoadJobPtr & self, const LoadJobPtr & depen void ignoreDependencyFailure(const LoadJobPtr & self, const LoadJobPtr & dependency, std::exception_ptr & cancel); template concept LoadJobDependencyFailure = std::invocable; +template concept LoadJobOnWaiters = std::invocable; template concept LoadJobFunc = std::invocable; +LoadJobPtr makeLoadJob(LoadJobSet && dependencies, String name, LoadJobOnWaiters auto && on_waiters_increment, LoadJobOnWaiters auto && on_waiters_decrement, LoadJobDependencyFailure auto && dependency_failure, LoadJobFunc auto && func) +{ + return std::make_shared(std::move(dependencies), std::move(name), 0, on_waiters_increment, on_waiters_decrement, std::forward(dependency_failure), std::forward(func)); +} + +LoadJobPtr makeLoadJob(const LoadJobSet & dependencies, String name, LoadJobOnWaiters auto && on_waiters_increment, LoadJobOnWaiters auto && on_waiters_decrement, LoadJobDependencyFailure auto && dependency_failure, LoadJobFunc auto && func) +{ + return std::make_shared(dependencies, std::move(name), 0, on_waiters_increment, on_waiters_decrement, std::forward(dependency_failure), std::forward(func)); +} + +LoadJobPtr makeLoadJob(LoadJobSet && dependencies, size_t pool_id, String name, LoadJobOnWaiters auto && on_waiters_increment, LoadJobOnWaiters auto && on_waiters_decrement, LoadJobDependencyFailure auto && dependency_failure, LoadJobFunc auto && func) +{ + return std::make_shared(std::move(dependencies), std::move(name), pool_id, on_waiters_increment, on_waiters_decrement, std::forward(dependency_failure), std::forward(func)); +} + +LoadJobPtr makeLoadJob(const LoadJobSet & dependencies, size_t pool_id, String name, LoadJobOnWaiters auto && on_waiters_increment, LoadJobOnWaiters auto && on_waiters_decrement, LoadJobDependencyFailure auto && dependency_failure, LoadJobFunc auto && func) +{ + return std::make_shared(dependencies, std::move(name), pool_id, on_waiters_increment, on_waiters_decrement, std::forward(dependency_failure), std::forward(func)); +} + +LoadJobPtr makeLoadJob(LoadJobSet && dependencies, String name, LoadJobOnWaiters auto && on_waiters_increment, LoadJobOnWaiters auto && on_waiters_decrement, LoadJobFunc auto && func) +{ + return std::make_shared(std::move(dependencies), std::move(name), 0, on_waiters_increment, on_waiters_decrement, cancelOnDependencyFailure, std::forward(func)); +} + +LoadJobPtr makeLoadJob(const LoadJobSet & dependencies, String name, LoadJobOnWaiters auto && on_waiters_increment, LoadJobOnWaiters auto && on_waiters_decrement, LoadJobFunc auto && func) +{ + return std::make_shared(dependencies, std::move(name), 0, on_waiters_increment, on_waiters_decrement, cancelOnDependencyFailure, std::forward(func)); +} + +LoadJobPtr makeLoadJob(LoadJobSet && dependencies, size_t pool_id, String name, LoadJobOnWaiters auto && on_waiters_increment, LoadJobOnWaiters auto && on_waiters_decrement, LoadJobFunc auto && func) +{ + return std::make_shared(std::move(dependencies), std::move(name), pool_id, on_waiters_increment, on_waiters_decrement, cancelOnDependencyFailure, std::forward(func)); +} + +LoadJobPtr makeLoadJob(const LoadJobSet & dependencies, size_t pool_id, String name, LoadJobOnWaiters auto && on_waiters_increment, LoadJobOnWaiters auto && on_waiters_decrement, LoadJobFunc auto && func) +{ + return std::make_shared(dependencies, std::move(name), pool_id, on_waiters_increment, on_waiters_decrement, cancelOnDependencyFailure, std::forward(func)); +} + + LoadJobPtr makeLoadJob(LoadJobSet && dependencies, String name, LoadJobDependencyFailure auto && dependency_failure, LoadJobFunc auto && func) { return std::make_shared(std::move(dependencies), std::move(name), 0, std::forward(dependency_failure), std::forward(func)); diff --git a/src/Common/tests/gtest_async_loader.cpp b/src/Common/tests/gtest_async_loader.cpp index fc2537abcfc..62a27f259cc 100644 --- a/src/Common/tests/gtest_async_loader.cpp +++ b/src/Common/tests/gtest_async_loader.cpp @@ -643,6 +643,72 @@ TEST(AsyncLoader, CustomDependencyFailure) ASSERT_EQ(good_count.load(), 3); } +TEST(AsyncLoader, WaitersLimit) +{ + AsyncLoaderTest t(16); + + std::atomic waiters_total{0}; + int waiters_limit = 5; + auto waiters_inc = [&] (const LoadJobPtr &) { + int value = waiters_total.load(); + while (true) + { + if (value >= waiters_limit) + throw Exception(ErrorCodes::ASYNC_LOAD_FAILED, "Too many waiters: {}", value); + if (waiters_total.compare_exchange_strong(value, value + 1)) + break; + } + }; + auto waiters_dec = [&] (const LoadJobPtr &) { + waiters_total.fetch_sub(1); + }; + + std::barrier sync(2); + t.loader.start(); + + auto job_func = [&] (AsyncLoader &, const LoadJobPtr &) { + sync.arrive_and_wait(); // (A) + }; + + auto job = makeLoadJob({}, "job", waiters_inc, waiters_dec, job_func); + auto task = t.schedule({job}); + + std::atomic failure{0}; + std::atomic success{0}; + std::vector waiters; + waiters.reserve(10); + auto waiter = [&] { + try + { + t.loader.wait(job); + success.fetch_add(1); + } + catch(...) + { + failure.fetch_add(1); + } + }; + + for (int i = 0; i < 10; i++) + waiters.emplace_back(waiter); + + while (failure.load() != 5) + std::this_thread::yield(); + + ASSERT_EQ(job->waitersCount(), 5); + + sync.arrive_and_wait(); // (A) + + for (auto & thread : waiters) + thread.join(); + + ASSERT_EQ(success.load(), 5); + ASSERT_EQ(failure.load(), 5); + ASSERT_EQ(waiters_total.load(), 0); + + t.loader.wait(); +} + TEST(AsyncLoader, TestConcurrency) { AsyncLoaderTest t(10); diff --git a/src/Core/ServerSettings.h b/src/Core/ServerSettings.h index c82255ec59c..129b1016fca 100644 --- a/src/Core/ServerSettings.h +++ b/src/Core/ServerSettings.h @@ -64,6 +64,7 @@ namespace DB M(UInt64, max_concurrent_queries, 0, "Maximum number of concurrently executed queries. Zero means unlimited.", 0) \ M(UInt64, max_concurrent_insert_queries, 0, "Maximum number of concurrently INSERT queries. Zero means unlimited.", 0) \ M(UInt64, max_concurrent_select_queries, 0, "Maximum number of concurrently SELECT queries. Zero means unlimited.", 0) \ + M(UInt64, max_waiting_queries, 0, "Maximum number of concurrently waiting queries blocked due to `async_load_databases`. Note that waiting queries are not considered by `max_concurrent_*queries*` limits. Zero means unlimited.", 0) \ \ M(Double, cache_size_to_ram_max_ratio, 0.5, "Set cache size ro RAM max ratio. Allows to lower cache size on low-memory systems.", 0) \ M(String, uncompressed_cache_policy, DEFAULT_UNCOMPRESSED_CACHE_POLICY, "Uncompressed cache policy name.", 0) \ diff --git a/src/Interpreters/ProcessList.cpp b/src/Interpreters/ProcessList.cpp index 3bd7b2d4206..f451d561e60 100644 --- a/src/Interpreters/ProcessList.cpp +++ b/src/Interpreters/ProcessList.cpp @@ -83,25 +83,31 @@ ProcessList::insert(const String & query_, const IAST * ast, ContextMutablePtr q IAST::QueryKind query_kind = ast->getQueryKind(); const auto queue_max_wait_ms = settings.queue_max_wait_ms.totalMilliseconds(); - if (!is_unlimited_query && max_size && processes.size() >= max_size) + UInt64 waiting_queries = waiting_queries_amount.load(); + if (!is_unlimited_query && max_size && processes.size() >= max_size + waiting_queries) { if (queue_max_wait_ms) LOG_WARNING(getLogger("ProcessList"), "Too many simultaneous queries, will wait {} ms.", queue_max_wait_ms); - if (!queue_max_wait_ms || !have_space.wait_for(lock, std::chrono::milliseconds(queue_max_wait_ms), [&]{ return processes.size() < max_size; })) - throw Exception(ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES, "Too many simultaneous queries. Maximum: {}", max_size); + if (!queue_max_wait_ms || !have_space.wait_for(lock, std::chrono::milliseconds(queue_max_wait_ms), + [&]{ waiting_queries = waiting_queries_amount.load(); return processes.size() < max_size + waiting_queries; })) + throw Exception(ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES, + "Too many simultaneous queries. Maximum: {}{}", + max_size, waiting_queries == 0 ? "" : fmt::format(", waiting: {}", waiting_queries)); } if (!is_unlimited_query) { QueryAmount amount = getQueryKindAmount(query_kind); - if (max_insert_queries_amount && query_kind == IAST::QueryKind::Insert && amount >= max_insert_queries_amount) + UInt64 waiting_inserts = waiting_insert_queries_amount.load(); + UInt64 waiting_selects = waiting_select_queries_amount.load(); + if (max_insert_queries_amount && query_kind == IAST::QueryKind::Insert && amount >= max_insert_queries_amount + waiting_inserts) throw Exception(ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES, - "Too many simultaneous insert queries. Maximum: {}, current: {}", - max_insert_queries_amount, amount); - if (max_select_queries_amount && query_kind == IAST::QueryKind::Select && amount >= max_select_queries_amount) + "Too many simultaneous insert queries. Maximum: {}, current: {}{}", + max_insert_queries_amount, amount, waiting_inserts == 0 ? "" : fmt::format(", waiting: {}", waiting_inserts)); + if (max_select_queries_amount && query_kind == IAST::QueryKind::Select && amount >= max_select_queries_amount + waiting_selects) throw Exception(ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES, - "Too many simultaneous select queries. Maximum: {}, current: {}", - max_select_queries_amount, amount); + "Too many simultaneous select queries. Maximum: {}, current: {}{}", + max_select_queries_amount, amount, waiting_selects == 0 ? "" : fmt::format(", waiting: {}", waiting_selects)); } { @@ -124,10 +130,12 @@ ProcessList::insert(const String & query_, const IAST * ast, ContextMutablePtr q * once is already processing 50+ concurrent queries (including analysts or any other users). */ + waiting_queries = waiting_queries_amount.load(); if (!is_unlimited_query && settings.max_concurrent_queries_for_all_users - && processes.size() >= settings.max_concurrent_queries_for_all_users) + && processes.size() >= settings.max_concurrent_queries_for_all_users + waiting_queries_amount) throw Exception(ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES, "Too many simultaneous queries for all users. " - "Current: {}, maximum: {}", processes.size(), settings.max_concurrent_queries_for_all_users.toString()); + "Current: {}, maximum: {}{}", processes.size(), settings.max_concurrent_queries_for_all_users.toString(), + waiting_queries == 0 ? "" : fmt::format(", waiting: {}", waiting_queries)); } /** Why we use current user? @@ -145,13 +153,15 @@ ProcessList::insert(const String & query_, const IAST * ast, ContextMutablePtr q if (user_process_list != user_to_queries.end()) { + UInt64 user_waiting_queries = user_process_list->second.waiting_queries_amount.load(); if (!is_unlimited_query && settings.max_concurrent_queries_for_user - && user_process_list->second.queries.size() >= settings.max_concurrent_queries_for_user) + && user_process_list->second.queries.size() >= settings.max_concurrent_queries_for_user + user_waiting_queries) throw Exception(ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES, "Too many simultaneous queries for user {}. " - "Current: {}, maximum: {}", + "Current: {}, maximum: {}{}", client_info.current_user, user_process_list->second.queries.size(), - settings.max_concurrent_queries_for_user.toString()); + settings.max_concurrent_queries_for_user.toString(), + user_waiting_queries == 0 ? "" : fmt::format(", waiting: {}", user_waiting_queries)); auto running_query = user_process_list->second.queries.find(client_info.current_query_id); @@ -745,4 +755,69 @@ ProcessList::QueryAmount ProcessList::getQueryKindAmount(const IAST::QueryKind & return found->second; } +void ProcessList::increaseWaitingQueryAmount(const QueryStatusPtr & status) +{ + UInt64 limit = max_waiting_queries_amount.load(); + UInt64 value = waiting_queries_amount.load(); + while (true) + { + if (value >= limit) + throw Exception(ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES, + "Too many simultaneous waiting queries. Maximum: {}, waiting: {}", + limit, value); + if (waiting_queries_amount.compare_exchange_strong(value, value + 1)) + break; + } + + // WARNING: After this point we should not throw, otherwise corresponding `decreaseWaitingQueryAmount` will not be called. + + // Update query kind counters + if (status->query_kind == IAST::QueryKind::Insert) + waiting_insert_queries_amount.fetch_add(1); + if (status->query_kind == IAST::QueryKind::Select) + waiting_select_queries_amount.fetch_add(1); + + // Update per-user counter + status->getUserProcessList()->waiting_queries_amount.fetch_add(1); + + // We have to notify because some queries might be waiting on `have_space` + // and this query leaves its space by transitioning to waiting state + have_space.notify_all(); +} + +void ProcessList::decreaseWaitingQueryAmount(const QueryStatusPtr & status) +{ + if (status->getUserProcessList()->waiting_queries_amount.fetch_sub(1) == 0) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong insert waiting query amount for user: decrease to negative"); + + if (status->query_kind == IAST::QueryKind::Insert && waiting_insert_queries_amount.fetch_sub(1) == 0) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong insert waiting query amount: decrease to negative"); + + if (status->query_kind == IAST::QueryKind::Select && waiting_select_queries_amount.fetch_sub(1) == 0) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong select waiting query amount: decrease to negative"); + + if (waiting_queries_amount.fetch_sub(1) == 0) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong waiting query amount: decrease to negative"); +} + +void ProcessList::incrementWaiters() +{ + ContextPtr context = CurrentThread::getQueryContext(); + QueryStatusPtr status = context->getProcessListElement(); + + // Query became "waiting" with the first thread that waits + if (status->waiting_threads.fetch_add(1) == 0) + increaseWaitingQueryAmount(status); +} + +void ProcessList::decrementWaiters() +{ + ContextPtr context = CurrentThread::getQueryContext(); + QueryStatusPtr status = context->getProcessListElement(); + + // Query became "non-waiting" with the last thread that no longer waits + if (status->waiting_threads.fetch_sub(1) == 1) + decreaseWaitingQueryAmount(status); +} + } diff --git a/src/Interpreters/ProcessList.h b/src/Interpreters/ProcessList.h index 1c253f562e8..75350627698 100644 --- a/src/Interpreters/ProcessList.h +++ b/src/Interpreters/ProcessList.h @@ -42,10 +42,6 @@ class ThreadStatus; class ProcessListEntry; -/** List of currently executing queries. - * Also implements limit on their number. - */ - /** Information of process list element. * To output in SHOW PROCESSLIST query. Does not contain any complex objects, that do something on copy or destructor. */ @@ -114,8 +110,13 @@ protected: /// Including EndOfStream or Exception. std::atomic is_all_data_sent { false }; + /// Number of threads for the query that are waiting for load jobs + std::atomic waiting_threads{0}; + + /// For initialization of ProcessListForUser during process insertion. void setUserProcessList(ProcessListForUser * user_process_list_); /// Be careful using it. For example, queries field of ProcessListForUser could be modified concurrently. + ProcessListForUser * getUserProcessList() { return user_process_list; } const ProcessListForUser * getUserProcessList() const { return user_process_list; } /// Sets an entry in the ProcessList associated with this QueryStatus. @@ -283,6 +284,9 @@ struct ProcessListForUser /// Count network usage for all simultaneously running queries of single user. ThrottlerPtr user_throttler; + /// Number of queries waiting on load jobs + std::atomic waiting_queries_amount{0}; + ProcessListForUserInfo getInfo(bool get_profile_events = false) const; /// Clears MemoryTracker for the user. @@ -341,6 +345,9 @@ protected: }; +/** List of currently executing queries. + * Also implements limit on their number. + */ class ProcessList : public ProcessListBase { public: @@ -399,10 +406,21 @@ protected: /// amount of queries by query kind. QueryKindAmounts query_kind_amounts; + /// limit for waiting queries. 0 means no limit. Otherwise, when limit exceeded, an exception is thrown. + std::atomic max_waiting_queries_amount{0}; + + /// amounts of waiting queries + std::atomic waiting_queries_amount{0}; + std::atomic waiting_insert_queries_amount{0}; + std::atomic waiting_select_queries_amount{0}; + void increaseQueryKindAmount(const IAST::QueryKind & query_kind); void decreaseQueryKindAmount(const IAST::QueryKind & query_kind); QueryAmount getQueryKindAmount(const IAST::QueryKind & query_kind) const; + void increaseWaitingQueryAmount(const QueryStatusPtr & status); + void decreaseWaitingQueryAmount(const QueryStatusPtr & status); + public: using EntryPtr = std::shared_ptr; @@ -458,6 +476,21 @@ public: return max_select_queries_amount; } + void setMaxWaitingQueriesAmount(UInt64 max_waiting_queries_amount_) + { + max_waiting_queries_amount.store(max_waiting_queries_amount_); + // NOTE: We cannot cancel waiting queries when limit is lowered. They have to wait anyways, but new queries will be canceled instead of waiting. + } + + size_t getMaxWaitingQueriesAmount() const + { + return max_waiting_queries_amount.load(); + } + + // Handlers for AsyncLoader waiters + void incrementWaiters(); + void decrementWaiters(); + /// Try call cancel() for input and output streams of query with specified id and user CancellationCode sendCancelToQuery(const String & current_query_id, const String & current_user, bool kill = false); CancellationCode sendCancelToQuery(QueryStatusPtr elem, bool kill = false); diff --git a/src/Storages/System/StorageSystemServerSettings.cpp b/src/Storages/System/StorageSystemServerSettings.cpp index f390985546b..bf14f757a19 100644 --- a/src/Storages/System/StorageSystemServerSettings.cpp +++ b/src/Storages/System/StorageSystemServerSettings.cpp @@ -70,6 +70,7 @@ void StorageSystemServerSettings::fillData(MutableColumns & res_columns, Context {"max_concurrent_queries", {std::to_string(context->getProcessList().getMaxSize()), ChangeableWithoutRestart::Yes}}, {"max_concurrent_insert_queries", {std::to_string(context->getProcessList().getMaxInsertQueriesAmount()), ChangeableWithoutRestart::Yes}}, {"max_concurrent_select_queries", {std::to_string(context->getProcessList().getMaxSelectQueriesAmount()), ChangeableWithoutRestart::Yes}}, + {"max_waiting_queries", {std::to_string(context->getProcessList().getMaxWaitingQueriesAmount()), ChangeableWithoutRestart::Yes}}, {"background_buffer_flush_schedule_pool_size", {std::to_string(CurrentMetrics::get(CurrentMetrics::BackgroundBufferFlushSchedulePoolSize)), ChangeableWithoutRestart::IncreaseOnly}}, {"background_schedule_pool_size", {std::to_string(CurrentMetrics::get(CurrentMetrics::BackgroundSchedulePoolSize)), ChangeableWithoutRestart::IncreaseOnly}},