diff --git a/src/Common/ConcurrencyControl.h b/src/Common/ConcurrencyControl.h index 53e04730e2e..9ea5efd53d0 100644 --- a/src/Common/ConcurrencyControl.h +++ b/src/Common/ConcurrencyControl.h @@ -7,6 +7,16 @@ #include #include +#include + +namespace DB +{ +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} +} + /* * Controls how many threads can be allocated for a query (or another activity). * There is a limited amount of slots for threads. It can be set with `setMaxConcurrency(limit)`. @@ -66,31 +76,42 @@ public: { ~Allocation() { - parent.free(this); // We have to lock parent's mutex to avoid race with grant() + // We have to lock parent's mutex to avoid race with grant() + // NOTE: shortcut can be added, but it requires Allocation::mutex lock even to check if shortcut is possible + parent.free(this); } - // Take one already granted slot if available + // Take one already granted slot if available. Lock-free iff there is no granted slot. [[nodiscard]] SlotPtr tryAcquire() { - std::unique_lock lock{mutex}; - if (!granted) - return {}; - granted--; - return SlotPtr(new Slot(shared_from_this())); + SlotCount value = granted.load(); + while (value) + { + if (granted.compare_exchange_strong(value, value - 1)) + { + std::unique_lock lock{mutex}; + return SlotPtr(new Slot(shared_from_this())); // can't use std::make_shared due to private ctor + } + } + return {}; // avoid unnecessary locking } private: friend struct Slot; // for release() friend class ConcurrencyControl; // for grant(), free() and ctor - Allocation(ConcurrencyControl & parent_, SlotCount limit_, SlotCount granted_) + Allocation(ConcurrencyControl & parent_, SlotCount limit_, SlotCount granted_, Waiters::iterator waiter_ = {}) : parent(parent_) , limit(limit_) , allocated(granted_) , granted(granted_) - {} + , waiter(waiter_) + { + if (allocated < limit) + *waiter = this; + } - auto free() + auto cancel() { std::unique_lock lock{mutex}; return std::pair{allocated - released, @@ -99,11 +120,6 @@ public: std::optional()}; } - void wait(Waiters::iterator waiter_) - { - waiter = waiter_; - } - // Grant single slot to allocation, returns true iff more slot(s) are required bool grant() { @@ -119,18 +135,20 @@ public: parent.release(1); std::unique_lock lock{mutex}; released++; - assert(released <= allocated); + if (released > allocated) + abort(); } ConcurrencyControl & parent; const SlotCount limit; std::mutex mutex; // the following values must be accessed under this mutex - SlotCount allocated = 0; // allocated total (including already released) - SlotCount granted = 0; // allocated, but not yet acquired + SlotCount allocated; // allocated total (including already `released`) SlotCount released = 0; - Waiters::iterator waiter; // iterator to itself in Waiters list; valid iff allocated < limit + std::atomic granted; // allocated, but not yet acquired + + const Waiters::iterator waiter; // iterator to itself in Waiters list; valid iff allocated < limit }; public: @@ -142,26 +160,30 @@ public: // NOTE: Recommended way to achieve this is to use `instance()` and do graceful shutdown of queries ~ConcurrencyControl() { - assert(waiters.empty()); + if (!waiters.empty()) + abort(); } // Allocate at least `min` and at most `max` slots. // If not all `max` slots were successfully allocated, a subscription for later allocation is created - // Use Allocation::tryAcquire() to acquire allocated slot, before running a thread. + // Use `Allocation::tryAcquire()` to acquire allocated slot, before running a thread. [[nodiscard]] AllocationPtr allocate(SlotCount min, SlotCount max) { + if (min > max) + throw DB::Exception("ConcurrencyControl: invalid allocation requirements", DB::ErrorCodes::LOGICAL_ERROR); + std::unique_lock lock{mutex}; // Acquire as much slots as we can, but not lower than `min` - SlotCount limit = std::max(min, max); - SlotCount granted = std::max(min, std::min(limit, available(lock))); + SlotCount granted = std::max(min, std::min(max, available(lock))); cur_concurrency += granted; // Create allocation and start waiting if more slots are required - auto allocation = new Allocation(*this, limit, granted); - if (granted < limit) - allocation->wait(waiters.insert(cur_waiter, allocation)); - return AllocationPtr(allocation); + if (granted < max) + return AllocationPtr(new Allocation(*this, max, granted, + waiters.insert(cur_waiter, nullptr /* pointer is set by Allocation ctor */))); + else + return AllocationPtr(new Allocation(*this, max, granted)); } void setMaxConcurrency(SlotCount value) @@ -182,8 +204,13 @@ private: void free(Allocation * allocation) { + // Allocation is allowed to be canceled even if there are: + // - `amount`: granted slots (acquired slots are not possible, because Slot holds AllocationPtr) + // - `waiter`: active waiting for more slots to be allocated + // Thus Allocation destruction may require the following lock, to avoid race conditions std::unique_lock lock{mutex}; - auto [amount, waiter] = allocation->free(); + auto [amount, waiter] = allocation->cancel(); + cur_concurrency -= amount; if (waiter) {