add diagnostics for concurrency control

This commit is contained in:
serxa 2024-03-15 18:40:50 +00:00
parent d045ab150e
commit dd6c1ac19a
7 changed files with 53 additions and 4 deletions

View File

@ -737,6 +737,14 @@ Number of sessions (connections) to ZooKeeper. Should be no more than one, becau
Number of watches (event subscriptions) in ZooKeeper. Number of watches (event subscriptions) in ZooKeeper.
### ConcurrencyControlAcquired
Total number of acquired CPU slots.
### ConcurrencyControlSoftLimit
Value of soft limit on number of CPU slots.
**See Also** **See Also**
- [system.asynchronous_metrics](../../operations/system-tables/asynchronous_metrics.md#system_tables-asynchronous_metrics) — Contains periodically calculated metrics. - [system.asynchronous_metrics](../../operations/system-tables/asynchronous_metrics.md#system_tables-asynchronous_metrics) — Contains periodically calculated metrics.

View File

@ -1433,6 +1433,7 @@ try
concurrent_threads_soft_limit = value; concurrent_threads_soft_limit = value;
} }
ConcurrencyControl::instance().setMaxConcurrency(concurrent_threads_soft_limit); ConcurrencyControl::instance().setMaxConcurrency(concurrent_threads_soft_limit);
LOG_INFO(log, "ConcurrencyControl limit is set to {}", concurrent_threads_soft_limit);
global_context->getProcessList().setMaxSize(new_server_settings.max_concurrent_queries); global_context->getProcessList().setMaxSize(new_server_settings.max_concurrent_queries);
global_context->getProcessList().setMaxInsertQueriesAmount(new_server_settings.max_concurrent_insert_queries); global_context->getProcessList().setMaxInsertQueriesAmount(new_server_settings.max_concurrent_insert_queries);

View File

@ -1,7 +1,23 @@
#include <Common/ISlotControl.h>
#include <Common/ConcurrencyControl.h> #include <Common/ConcurrencyControl.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Common/ProfileEvents.h>
namespace ProfileEvents
{
extern const Event ConcurrencyControlGrantedHard;
extern const Event ConcurrencyControlGrantDelayed;
extern const Event ConcurrencyControlAcquiredTotal;
extern const Event ConcurrencyControlAllocationDelayed;
}
namespace CurrentMetrics
{
extern const Metric ConcurrencyControlAcquired;
extern const Metric ConcurrencyControlSoftLimit;
}
namespace DB namespace DB
{ {
@ -16,7 +32,8 @@ ConcurrencyControl::Slot::~Slot()
} }
ConcurrencyControl::Slot::Slot(SlotAllocationPtr && allocation_) ConcurrencyControl::Slot::Slot(SlotAllocationPtr && allocation_)
: allocation(std::move(allocation_)) : IAcquiredSlot(CurrentMetrics::ConcurrencyControlAcquired)
, allocation(std::move(allocation_))
{ {
} }
@ -34,6 +51,7 @@ ConcurrencyControl::Allocation::~Allocation()
{ {
if (granted.compare_exchange_strong(value, value - 1)) if (granted.compare_exchange_strong(value, value - 1))
{ {
ProfileEvents::increment(ProfileEvents::ConcurrencyControlAcquiredTotal, 1);
std::unique_lock lock{mutex}; std::unique_lock lock{mutex};
return AcquiredSlotPtr(new Slot(shared_from_this())); // can't use std::make_shared due to private ctor return AcquiredSlotPtr(new Slot(shared_from_this())); // can't use std::make_shared due to private ctor
} }
@ -84,6 +102,7 @@ void ConcurrencyControl::Allocation::release()
ConcurrencyControl::ConcurrencyControl() ConcurrencyControl::ConcurrencyControl()
: cur_waiter(waiters.end()) : cur_waiter(waiters.end())
, max_concurrency_metric(CurrentMetrics::ConcurrencyControlSoftLimit, 0)
{ {
} }
@ -103,11 +122,16 @@ ConcurrencyControl::~ConcurrencyControl()
// Acquire as many slots as we can, but not lower than `min` // Acquire as many slots as we can, but not lower than `min`
SlotCount granted = std::max(min, std::min(max, available(lock))); SlotCount granted = std::max(min, std::min(max, available(lock)));
cur_concurrency += granted; cur_concurrency += granted;
ProfileEvents::increment(ProfileEvents::ConcurrencyControlGrantedHard, granted);
// Create allocation and start waiting if more slots are required // Create allocation and start waiting if more slots are required
if (granted < max) if (granted < max)
{
ProfileEvents::increment(ProfileEvents::ConcurrencyControlGrantDelayed, max - granted);
ProfileEvents::increment(ProfileEvents::ConcurrencyControlAllocationDelayed);
return SlotAllocationPtr(new Allocation(*this, max, granted, return SlotAllocationPtr(new Allocation(*this, max, granted,
waiters.insert(cur_waiter, nullptr /* pointer is set by Allocation ctor */))); waiters.insert(cur_waiter, nullptr /* pointer is set by Allocation ctor */)));
}
else else
return SlotAllocationPtr(new Allocation(*this, max, granted)); return SlotAllocationPtr(new Allocation(*this, max, granted));
} }
@ -116,6 +140,7 @@ void ConcurrencyControl::setMaxConcurrency(SlotCount value)
{ {
std::unique_lock lock{mutex}; std::unique_lock lock{mutex};
max_concurrency = std::max<SlotCount>(1, value); // never allow max_concurrency to be zero max_concurrency = std::max<SlotCount>(1, value); // never allow max_concurrency to be zero
max_concurrency_metric.changeTo(max_concurrency == UnlimitedSlots ? 0 : max_concurrency);
schedule(lock); schedule(lock);
} }

View File

@ -130,6 +130,7 @@ private:
Waiters::iterator cur_waiter; // round-robin pointer Waiters::iterator cur_waiter; // round-robin pointer
SlotCount max_concurrency = UnlimitedSlots; SlotCount max_concurrency = UnlimitedSlots;
SlotCount cur_concurrency = 0; SlotCount cur_concurrency = 0;
CurrentMetrics::Increment max_concurrency_metric;
}; };
} }

View File

@ -285,7 +285,10 @@
M(HTTPConnectionsTotal, "Total count of all sessions: stored in the pool and actively used right now for http hosts") \ M(HTTPConnectionsTotal, "Total count of all sessions: stored in the pool and actively used right now for http hosts") \
\ \
M(AddressesActive, "Total count of addresses which are used for creation connections with connection pools") \ M(AddressesActive, "Total count of addresses which are used for creation connections with connection pools") \
\
M(ConcurrencyControlAcquired, "Total number of acquired CPU slots") \
M(ConcurrencyControlSoftLimit, "Value of soft limit on number of CPU slots") \
/**/
#ifdef APPLY_FOR_EXTERNAL_METRICS #ifdef APPLY_FOR_EXTERNAL_METRICS
#define APPLY_FOR_METRICS(M) APPLY_FOR_BUILTIN_METRICS(M) APPLY_FOR_EXTERNAL_METRICS(M) #define APPLY_FOR_METRICS(M) APPLY_FOR_BUILTIN_METRICS(M) APPLY_FOR_EXTERNAL_METRICS(M)

View File

@ -1,11 +1,11 @@
#pragma once #pragma once
#include <Common/CurrentMetrics.h>
#include <limits> #include <limits>
#include <memory> #include <memory>
#include <base/types.h> #include <base/types.h>
#include <boost/core/noncopyable.hpp> #include <boost/core/noncopyable.hpp>
namespace DB namespace DB
{ {
@ -39,7 +39,13 @@ constexpr SlotCount UnlimitedSlots = std::numeric_limits<SlotCount>::max();
class IAcquiredSlot : public std::enable_shared_from_this<IAcquiredSlot>, boost::noncopyable class IAcquiredSlot : public std::enable_shared_from_this<IAcquiredSlot>, boost::noncopyable
{ {
public: public:
IAcquiredSlot(CurrentMetrics::Metric metric, CurrentMetrics::Value amount = 1)
: acquired_slot_increment(metric, amount)
{}
virtual ~IAcquiredSlot() = default; virtual ~IAcquiredSlot() = default;
private:
CurrentMetrics::Increment acquired_slot_increment;
}; };
using AcquiredSlotPtr = std::shared_ptr<IAcquiredSlot>; using AcquiredSlotPtr = std::shared_ptr<IAcquiredSlot>;

View File

@ -726,7 +726,12 @@ The server successfully detected this situation and will download merged part fr
M(AddressesDiscovered, "Total count of new addresses in dns resolve results for http connections") \ M(AddressesDiscovered, "Total count of new addresses in dns resolve results for http connections") \
M(AddressesExpired, "Total count of expired addresses which is no longer presented in dns resolve results for http connections") \ M(AddressesExpired, "Total count of expired addresses which is no longer presented in dns resolve results for http connections") \
M(AddressesMarkedAsFailed, "Total count of addresses which has been marked as faulty due to connection errors for http connections") \ M(AddressesMarkedAsFailed, "Total count of addresses which has been marked as faulty due to connection errors for http connections") \
\
M(ConcurrencyControlGrantedHard, "Number of CPU slot granted according to guarantee of 1 thread per query and for queries with setting 'use_concurrency_control' = 0") \
M(ConcurrencyControlGrantDelayed, "Number of CPU slot not granted initially and required to wait for a free CPU slot") \
M(ConcurrencyControlAcquiredTotal, "Total number of CPU slot acquired") \
M(ConcurrencyControlAllocationDelayed, "Total number of CPU slot allocations (queries) that were required to wait for slots to upscale") \
/**/
#ifdef APPLY_FOR_EXTERNAL_EVENTS #ifdef APPLY_FOR_EXTERNAL_EVENTS
#define APPLY_FOR_EVENTS(M) APPLY_FOR_BUILTIN_EVENTS(M) APPLY_FOR_EXTERNAL_EVENTS(M) #define APPLY_FOR_EVENTS(M) APPLY_FOR_BUILTIN_EVENTS(M) APPLY_FOR_EXTERNAL_EVENTS(M)