perf/safety/docs improvements

This commit is contained in:
Sergei Trifonov 2022-06-02 11:17:13 +02:00
parent 9884ea1945
commit db2cf73b52

View File

@ -7,6 +7,16 @@
#include <list> #include <list>
#include <condition_variable> #include <condition_variable>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
}
/* /*
* Controls how many threads can be allocated for a query (or another activity). * Controls how many threads can be allocated for a query (or another activity).
* There is a limited amount of slots for threads. It can be set with `setMaxConcurrency(limit)`. * There is a limited amount of slots for threads. It can be set with `setMaxConcurrency(limit)`.
@ -66,31 +76,42 @@ public:
{ {
~Allocation() ~Allocation()
{ {
parent.free(this); // We have to lock parent's mutex to avoid race with grant() // We have to lock parent's mutex to avoid race with grant()
// NOTE: shortcut can be added, but it requires Allocation::mutex lock even to check if shortcut is possible
parent.free(this);
} }
// Take one already granted slot if available // Take one already granted slot if available. Lock-free iff there is no granted slot.
[[nodiscard]] SlotPtr tryAcquire() [[nodiscard]] SlotPtr tryAcquire()
{
SlotCount value = granted.load();
while (value)
{
if (granted.compare_exchange_strong(value, value - 1))
{ {
std::unique_lock lock{mutex}; std::unique_lock lock{mutex};
if (!granted) return SlotPtr(new Slot(shared_from_this())); // can't use std::make_shared due to private ctor
return {}; }
granted--; }
return SlotPtr(new Slot(shared_from_this())); return {}; // avoid unnecessary locking
} }
private: private:
friend struct Slot; // for release() friend struct Slot; // for release()
friend class ConcurrencyControl; // for grant(), free() and ctor friend class ConcurrencyControl; // for grant(), free() and ctor
Allocation(ConcurrencyControl & parent_, SlotCount limit_, SlotCount granted_) Allocation(ConcurrencyControl & parent_, SlotCount limit_, SlotCount granted_, Waiters::iterator waiter_ = {})
: parent(parent_) : parent(parent_)
, limit(limit_) , limit(limit_)
, allocated(granted_) , allocated(granted_)
, granted(granted_) , granted(granted_)
{} , waiter(waiter_)
{
if (allocated < limit)
*waiter = this;
}
auto free() auto cancel()
{ {
std::unique_lock lock{mutex}; std::unique_lock lock{mutex};
return std::pair{allocated - released, return std::pair{allocated - released,
@ -99,11 +120,6 @@ public:
std::optional<Waiters::iterator>()}; std::optional<Waiters::iterator>()};
} }
void wait(Waiters::iterator waiter_)
{
waiter = waiter_;
}
// Grant single slot to allocation, returns true iff more slot(s) are required // Grant single slot to allocation, returns true iff more slot(s) are required
bool grant() bool grant()
{ {
@ -119,18 +135,20 @@ public:
parent.release(1); parent.release(1);
std::unique_lock lock{mutex}; std::unique_lock lock{mutex};
released++; released++;
assert(released <= allocated); if (released > allocated)
abort();
} }
ConcurrencyControl & parent; ConcurrencyControl & parent;
const SlotCount limit; const SlotCount limit;
std::mutex mutex; // the following values must be accessed under this mutex std::mutex mutex; // the following values must be accessed under this mutex
SlotCount allocated = 0; // allocated total (including already released) SlotCount allocated; // allocated total (including already `released`)
SlotCount granted = 0; // allocated, but not yet acquired
SlotCount released = 0; SlotCount released = 0;
Waiters::iterator waiter; // iterator to itself in Waiters list; valid iff allocated < limit std::atomic<SlotCount> granted; // allocated, but not yet acquired
const Waiters::iterator waiter; // iterator to itself in Waiters list; valid iff allocated < limit
}; };
public: public:
@ -142,26 +160,30 @@ public:
// NOTE: Recommended way to achieve this is to use `instance()` and do graceful shutdown of queries // NOTE: Recommended way to achieve this is to use `instance()` and do graceful shutdown of queries
~ConcurrencyControl() ~ConcurrencyControl()
{ {
assert(waiters.empty()); if (!waiters.empty())
abort();
} }
// Allocate at least `min` and at most `max` slots. // Allocate at least `min` and at most `max` slots.
// If not all `max` slots were successfully allocated, a subscription for later allocation is created // If not all `max` slots were successfully allocated, a subscription for later allocation is created
// Use Allocation::tryAcquire() to acquire allocated slot, before running a thread. // Use `Allocation::tryAcquire()` to acquire allocated slot, before running a thread.
[[nodiscard]] AllocationPtr allocate(SlotCount min, SlotCount max) [[nodiscard]] AllocationPtr allocate(SlotCount min, SlotCount max)
{ {
if (min > max)
throw DB::Exception("ConcurrencyControl: invalid allocation requirements", DB::ErrorCodes::LOGICAL_ERROR);
std::unique_lock lock{mutex}; std::unique_lock lock{mutex};
// Acquire as much slots as we can, but not lower than `min` // Acquire as much slots as we can, but not lower than `min`
SlotCount limit = std::max(min, max); SlotCount granted = std::max(min, std::min(max, available(lock)));
SlotCount granted = std::max(min, std::min(limit, available(lock)));
cur_concurrency += granted; cur_concurrency += granted;
// Create allocation and start waiting if more slots are required // Create allocation and start waiting if more slots are required
auto allocation = new Allocation(*this, limit, granted); if (granted < max)
if (granted < limit) return AllocationPtr(new Allocation(*this, max, granted,
allocation->wait(waiters.insert(cur_waiter, allocation)); waiters.insert(cur_waiter, nullptr /* pointer is set by Allocation ctor */)));
return AllocationPtr(allocation); else
return AllocationPtr(new Allocation(*this, max, granted));
} }
void setMaxConcurrency(SlotCount value) void setMaxConcurrency(SlotCount value)
@ -182,8 +204,13 @@ private:
void free(Allocation * allocation) void free(Allocation * allocation)
{ {
// Allocation is allowed to be canceled even if there are:
// - `amount`: granted slots (acquired slots are not possible, because Slot holds AllocationPtr)
// - `waiter`: active waiting for more slots to be allocated
// Thus Allocation destruction may require the following lock, to avoid race conditions
std::unique_lock lock{mutex}; std::unique_lock lock{mutex};
auto [amount, waiter] = allocation->free(); auto [amount, waiter] = allocation->cancel();
cur_concurrency -= amount; cur_concurrency -= amount;
if (waiter) if (waiter)
{ {