split ISlotControl from ConcurrencyControl

This commit is contained in:
serxa 2024-01-28 20:26:55 +00:00
parent 29d54dab55
commit ba85642453
7 changed files with 132 additions and 56 deletions

View File

@ -1366,7 +1366,7 @@ try
global_context->setMaxDatabaseNumToWarn(new_server_settings.max_database_num_to_warn);
global_context->setMaxPartNumToWarn(new_server_settings.max_part_num_to_warn);
ConcurrencyControl::SlotCount concurrent_threads_soft_limit = ConcurrencyControl::Unlimited;
SlotCount concurrent_threads_soft_limit = UnlimitedSlots;
if (new_server_settings.concurrent_threads_soft_limit_num > 0 && new_server_settings.concurrent_threads_soft_limit_num < concurrent_threads_soft_limit)
concurrent_threads_soft_limit = new_server_settings.concurrent_threads_soft_limit_num;
if (new_server_settings.concurrent_threads_soft_limit_ratio_to_cores > 0)

View File

@ -12,10 +12,10 @@ namespace ErrorCodes
ConcurrencyControl::Slot::~Slot()
{
allocation->release();
static_cast<ConcurrencyControl::Allocation&>(*allocation).release();
}
ConcurrencyControl::Slot::Slot(AllocationPtr && allocation_)
ConcurrencyControl::Slot::Slot(SlotAllocationPtr && allocation_)
: allocation(std::move(allocation_))
{
}
@ -27,7 +27,7 @@ ConcurrencyControl::Allocation::~Allocation()
parent.free(this);
}
[[nodiscard]] ConcurrencyControl::SlotPtr ConcurrencyControl::Allocation::tryAcquire()
[[nodiscard]] AcquiredSlotPtr ConcurrencyControl::Allocation::tryAcquire()
{
SlotCount value = granted.load();
while (value)
@ -35,15 +35,21 @@ ConcurrencyControl::Allocation::~Allocation()
if (granted.compare_exchange_strong(value, value - 1))
{
std::unique_lock lock{mutex};
return SlotPtr(new Slot(shared_from_this())); // can't use std::make_shared due to private ctor
return AcquiredSlotPtr(new Slot(shared_from_this())); // can't use std::make_shared due to private ctor
}
}
return {}; // avoid unnecessary locking
}
ConcurrencyControl::SlotCount ConcurrencyControl::Allocation::grantedCount() const
SlotCount ConcurrencyControl::Allocation::grantedCount() const
{
return granted;
return granted.load();
}
SlotCount ConcurrencyControl::Allocation::allocatedCount() const
{
std::unique_lock lock{mutex};
return allocated;
}
ConcurrencyControl::Allocation::Allocation(ConcurrencyControl & parent_, SlotCount limit_, SlotCount granted_, Waiters::iterator waiter_)
@ -87,7 +93,7 @@ ConcurrencyControl::~ConcurrencyControl()
abort();
}
[[nodiscard]] ConcurrencyControl::AllocationPtr ConcurrencyControl::allocate(SlotCount min, SlotCount max)
[[nodiscard]] SlotAllocationPtr ConcurrencyControl::allocate(SlotCount min, SlotCount max)
{
if (min > max)
throw Exception(ErrorCodes::LOGICAL_ERROR, "ConcurrencyControl: invalid allocation requirements");
@ -100,13 +106,13 @@ ConcurrencyControl::~ConcurrencyControl()
// Create allocation and start waiting if more slots are required
if (granted < max)
return AllocationPtr(new Allocation(*this, max, granted,
return SlotAllocationPtr(new Allocation(*this, max, granted,
waiters.insert(cur_waiter, nullptr /* pointer is set by Allocation ctor */)));
else
return AllocationPtr(new Allocation(*this, max, granted));
return SlotAllocationPtr(new Allocation(*this, max, granted));
}
void ConcurrencyControl::setMaxConcurrency(ConcurrencyControl::SlotCount value)
void ConcurrencyControl::setMaxConcurrency(SlotCount value)
{
std::unique_lock lock{mutex};
max_concurrency = std::max<SlotCount>(1, value); // never allow max_concurrency to be zero
@ -162,7 +168,7 @@ void ConcurrencyControl::schedule(std::unique_lock<std::mutex> &)
}
}
ConcurrencyControl::SlotCount ConcurrencyControl::available(std::unique_lock<std::mutex> &) const
SlotCount ConcurrencyControl::available(std::unique_lock<std::mutex> &) const
{
if (cur_concurrency < max_concurrency)
return max_concurrency - cur_concurrency;

View File

@ -7,6 +7,7 @@
#include <base/types.h>
#include <boost/core/noncopyable.hpp>
#include <Common/ISlotControl.h>
namespace DB
{
@ -34,41 +35,35 @@ namespace DB
* Oversubscription is possible: total amount of allocated slots can exceed `setMaxConcurrency(limit)`
* because `min` amount of slots is allocated for each query unconditionally.
*/
class ConcurrencyControl : boost::noncopyable
class ConcurrencyControl : public ISlotControl
{
public:
struct Allocation;
using AllocationPtr = std::shared_ptr<Allocation>;
using SlotCount = UInt64;
using Waiters = std::list<Allocation *>;
static constexpr SlotCount Unlimited = std::numeric_limits<SlotCount>::max();
// Scoped guard for acquired slot, see Allocation::tryAcquire()
struct Slot : boost::noncopyable
struct Slot : public IAcquiredSlot
{
~Slot();
~Slot() override;
private:
friend struct Allocation; // for ctor
explicit Slot(AllocationPtr && allocation_);
explicit Slot(SlotAllocationPtr && allocation_);
AllocationPtr allocation;
SlotAllocationPtr allocation;
};
// FIXME: have to be unique_ptr, but ThreadFromGlobalPool does not support move semantics yet
using SlotPtr = std::shared_ptr<Slot>;
// Manages group of slots for a single query, see ConcurrencyControl::allocate(min, max)
struct Allocation : std::enable_shared_from_this<Allocation>, boost::noncopyable
struct Allocation : public ISlotAllocation
{
~Allocation();
~Allocation() override;
// Take one already granted slot if available. Lock-free iff there is no granted slot.
[[nodiscard]] SlotPtr tryAcquire();
[[nodiscard]] AcquiredSlotPtr tryAcquire() override;
SlotCount grantedCount() const;
SlotCount grantedCount() const override;
SlotCount allocatedCount() const override;
private:
friend struct Slot; // for release()
@ -94,7 +89,7 @@ public:
ConcurrencyControl & parent;
const SlotCount limit;
std::mutex mutex; // the following values must be accessed under this mutex
mutable std::mutex mutex; // the following values must be accessed under this mutex
SlotCount allocated; // allocated total (including already `released`)
SlotCount released = 0;
@ -103,17 +98,16 @@ public:
const Waiters::iterator waiter; // iterator to itself in Waiters list; valid iff allocated < limit
};
public:
ConcurrencyControl();
// WARNING: all Allocation objects MUST be destructed before ConcurrencyControl
// NOTE: Recommended way to achieve this is to use `instance()` and do graceful shutdown of queries
~ConcurrencyControl();
~ConcurrencyControl() override;
// Allocate at least `min` and at most `max` slots.
// If not all `max` slots were successfully allocated, a subscription for later allocation is created
// Use `Allocation::tryAcquire()` to acquire allocated slot, before running a thread.
[[nodiscard]] AllocationPtr allocate(SlotCount min, SlotCount max);
[[nodiscard]] SlotAllocationPtr allocate(SlotCount min, SlotCount max) override;
void setMaxConcurrency(SlotCount value);
@ -134,7 +128,7 @@ private:
std::mutex mutex;
Waiters waiters;
Waiters::iterator cur_waiter; // round-robin pointer
SlotCount max_concurrency = Unlimited;
SlotCount max_concurrency = UnlimitedSlots;
SlotCount cur_concurrency = 0;
};

76
src/Common/ISlotControl.h Normal file
View File

@ -0,0 +1,76 @@
#pragma once
#include <limits>
#include <memory>
#include <base/types.h>
#include <boost/core/noncopyable.hpp>
namespace DB
{
// Interfaces for abstract "slot" allocation and control.
// Slot is a virtual entity existing in a limited amount (CPUs or memory chunks, etc).
//
// Every slot can be in one of the following states:
// * free: slot is available to be allocated.
// * allocated: slot is allocated to a specific ISlotAllocation.
//
// Allocated slots can be considered as:
// * granted: allocated, but not yet acquired.
// * acquired: acquired using IAcquiredSlot.
//
// Example for CPU (see ConcurrencyControl.h). Every slot represents one CPU in the system.
// Slot allocation is a request to allocate specific number of CPUs for a specific query.
// Acquired slot is an entity that is held by a thread as long as it is running. This allows
// total number of threads in the system to be limited and the distribution process to be controlled.
//
// TODO:
// - for preemption - ability to return granted slot back and reacquire it later.
// - for memory allocations - variable size of slots (in bytes).
/// Number of slots
using SlotCount = UInt64;
/// Unlimited number of slots
constexpr SlotCount UnlimitedSlots = std::numeric_limits<SlotCount>::max();
/// Acquired slot holder. Slot is considered to be acquired as long the object exists.
class IAcquiredSlot : public std::enable_shared_from_this<IAcquiredSlot>, boost::noncopyable
{
public:
virtual ~IAcquiredSlot() = default;
};
using AcquiredSlotPtr = std::shared_ptr<IAcquiredSlot>;
/// Request for allocation of slots from ISlotControl.
/// Allows for more slots to be acquired and the whole request to be canceled.
class ISlotAllocation : public std::enable_shared_from_this<ISlotAllocation>, boost::noncopyable
{
public:
virtual ~ISlotAllocation() = default;
/// Take one already granted slot if available.
[[nodiscard]] virtual AcquiredSlotPtr tryAcquire() = 0;
/// Returns the number of granted slots for given allocation (i.e. available to be acquired)
virtual SlotCount grantedCount() const = 0;
/// Returns the total number of slots allocated at the moment (acquired and granted)
virtual SlotCount allocatedCount() const = 0;
};
using SlotAllocationPtr = std::shared_ptr<ISlotAllocation>;
class ISlotControl : boost::noncopyable
{
public:
virtual ~ISlotControl() = default;
// Allocate at least `min` and at most `max` slots.
// If not all `max` slots were successfully allocated, a "subscription" for later allocation is created
[[nodiscard]] virtual SlotAllocationPtr allocate(SlotCount min, SlotCount max) = 0;
};
}

View File

@ -15,7 +15,7 @@ struct ConcurrencyControlTest
{
ConcurrencyControl cc;
explicit ConcurrencyControlTest(ConcurrencyControl::SlotCount limit = ConcurrencyControl::Unlimited)
explicit ConcurrencyControlTest(SlotCount limit = UnlimitedSlots)
{
cc.setMaxConcurrency(limit);
}
@ -25,7 +25,7 @@ TEST(ConcurrencyControl, Unlimited)
{
ConcurrencyControlTest t; // unlimited number of slots
auto slots = t.cc.allocate(0, 100500);
std::vector<ConcurrencyControl::SlotPtr> acquired;
std::vector<AcquiredSlotPtr> acquired;
while (auto slot = slots->tryAcquire())
acquired.emplace_back(std::move(slot));
ASSERT_TRUE(acquired.size() == 100500);
@ -34,14 +34,14 @@ TEST(ConcurrencyControl, Unlimited)
TEST(ConcurrencyControl, Fifo)
{
ConcurrencyControlTest t(1); // use single slot
std::vector<ConcurrencyControl::AllocationPtr> allocations;
std::vector<SlotAllocationPtr> allocations;
constexpr int count = 42;
allocations.reserve(count);
for (int i = 0; i < count; i++)
allocations.emplace_back(t.cc.allocate(0, 1));
for (int i = 0; i < count; i++)
{
ConcurrencyControl::SlotPtr holder;
AcquiredSlotPtr holder;
for (int j = 0; j < count; j++)
{
auto slot = allocations[j]->tryAcquire();
@ -60,11 +60,11 @@ TEST(ConcurrencyControl, Fifo)
TEST(ConcurrencyControl, Oversubscription)
{
ConcurrencyControlTest t(10);
std::vector<ConcurrencyControl::AllocationPtr> allocations;
std::vector<SlotAllocationPtr> allocations;
allocations.reserve(10);
for (int i = 0; i < 10; i++)
allocations.emplace_back(t.cc.allocate(1, 2));
std::vector<ConcurrencyControl::SlotPtr> slots;
std::vector<AcquiredSlotPtr> slots;
// Normal allocation using maximum amount of slots
for (int i = 0; i < 5; i++)
{
@ -90,7 +90,7 @@ TEST(ConcurrencyControl, ReleaseUnacquiredSlots)
{
ConcurrencyControlTest t(10);
{
std::vector<ConcurrencyControl::AllocationPtr> allocations;
std::vector<SlotAllocationPtr> allocations;
allocations.reserve(10);
for (int i = 0; i < 10; i++)
allocations.emplace_back(t.cc.allocate(1, 2));
@ -98,7 +98,7 @@ TEST(ConcurrencyControl, ReleaseUnacquiredSlots)
}
// Check that slots were actually released
auto allocation = t.cc.allocate(0, 20);
std::vector<ConcurrencyControl::SlotPtr> acquired;
std::vector<AcquiredSlotPtr> acquired;
while (auto slot = allocation->tryAcquire())
acquired.emplace_back(std::move(slot));
ASSERT_TRUE(acquired.size() == 10);
@ -110,7 +110,7 @@ TEST(ConcurrencyControl, DestroyNotFullyAllocatedAllocation)
for (int i = 0; i < 3; i++)
{
auto allocation = t.cc.allocate(5, 20);
std::vector<ConcurrencyControl::SlotPtr> acquired;
std::vector<AcquiredSlotPtr> acquired;
while (auto slot = allocation->tryAcquire())
acquired.emplace_back(std::move(slot));
ASSERT_TRUE(acquired.size() == 10);
@ -122,7 +122,7 @@ TEST(ConcurrencyControl, DestroyAllocationBeforeSlots)
ConcurrencyControlTest t(10);
for (int i = 0; i < 3; i++)
{
std::vector<ConcurrencyControl::SlotPtr> acquired;
std::vector<AcquiredSlotPtr> acquired;
auto allocation = t.cc.allocate(5, 20);
while (auto slot = allocation->tryAcquire())
acquired.emplace_back(std::move(slot));
@ -135,7 +135,7 @@ TEST(ConcurrencyControl, GrantReleasedToTheSameAllocation)
{
ConcurrencyControlTest t(3);
auto allocation = t.cc.allocate(0, 10);
std::list<ConcurrencyControl::SlotPtr> acquired;
std::list<AcquiredSlotPtr> acquired;
while (auto slot = allocation->tryAcquire())
acquired.emplace_back(std::move(slot));
ASSERT_TRUE(acquired.size() == 3); // 0 1 2
@ -183,7 +183,7 @@ TEST(ConcurrencyControl, SetSlotCount)
{
ConcurrencyControlTest t(10);
auto allocation = t.cc.allocate(5, 30);
std::vector<ConcurrencyControl::SlotPtr> acquired;
std::vector<AcquiredSlotPtr> acquired;
while (auto slot = allocation->tryAcquire())
acquired.emplace_back(std::move(slot));
ASSERT_TRUE(acquired.size() == 10);
@ -200,7 +200,7 @@ TEST(ConcurrencyControl, SetSlotCount)
ASSERT_TRUE(acquired.size() == 5);
// Check that newly added slots are equally distributed over waiting allocations
std::vector<ConcurrencyControl::SlotPtr> acquired2;
std::vector<AcquiredSlotPtr> acquired2;
auto allocation2 = t.cc.allocate(0, 30);
ASSERT_TRUE(!allocation->tryAcquire());
t.cc.setMaxConcurrency(15); // 10 slots added: 5 to the first allocation and 5 to the second one
@ -224,7 +224,7 @@ TEST(ConcurrencyControl, MultipleThreads)
auto run_query = [&] (size_t max_threads)
{
ConcurrencyControl::AllocationPtr slots = t.cc.allocate(1, max_threads);
SlotAllocationPtr slots = t.cc.allocate(1, max_threads);
std::mutex threads_mutex;
std::vector<std::thread> threads;
threads.reserve(max_threads);

View File

@ -138,8 +138,8 @@ bool PipelineExecutor::executeStep(std::atomic_bool * yield_flag)
initializeExecution(1, true);
// Acquire slot until we are done
single_thread_slot = slots->tryAcquire();
chassert(single_thread_slot && "Unable to allocate slot for the first thread, but we just allocated at least one slot");
single_thread_cpu_slot = cpu_slots->tryAcquire();
chassert(single_thread_cpu_slot && "Unable to allocate cpu slot for the first thread, but we just allocated at least one slot");
if (yield_flag && *yield_flag)
return true;
@ -155,7 +155,7 @@ bool PipelineExecutor::executeStep(std::atomic_bool * yield_flag)
if (node->exception)
std::rethrow_exception(node->exception);
single_thread_slot.reset();
single_thread_cpu_slot.reset();
finalizeExecution();
return false;
@ -333,8 +333,8 @@ void PipelineExecutor::initializeExecution(size_t num_threads, bool concurrency_
/// Allocate CPU slots from concurrency control
size_t min_threads = concurrency_control ? 1uz : num_threads;
slots = ConcurrencyControl::instance().allocate(min_threads, num_threads);
use_threads = slots->grantedCount();
cpu_slots = ConcurrencyControl::instance().allocate(min_threads, num_threads);
use_threads = cpu_slots->grantedCount();
Queue queue;
graph->initializeExecution(queue);
@ -348,7 +348,7 @@ void PipelineExecutor::initializeExecution(size_t num_threads, bool concurrency_
void PipelineExecutor::spawnThreads()
{
while (auto slot = slots->tryAcquire())
while (auto slot = cpu_slots->tryAcquire())
{
size_t thread_num = threads.fetch_add(1);
@ -405,7 +405,7 @@ void PipelineExecutor::executeImpl(size_t num_threads, bool concurrency_control)
}
else
{
auto slot = slots->tryAcquire();
auto slot = cpu_slots->tryAcquire();
executeSingleThread(0);
}

View File

@ -68,8 +68,8 @@ private:
ExecutorTasks tasks;
/// Concurrency control related
ConcurrencyControl::AllocationPtr slots;
ConcurrencyControl::SlotPtr single_thread_slot; // slot for single-thread mode to work using executeStep()
SlotAllocationPtr cpu_slots;
AcquiredSlotPtr single_thread_cpu_slot; // cpu slot for single-thread mode to work using executeStep()
std::unique_ptr<ThreadPool> pool;
std::atomic_size_t threads = 0;