Revert "Revert "Support resource request canceling""

This commit is contained in:
Sergei Trifonov 2024-02-29 13:53:27 +01:00 committed by GitHub
parent 0ad0344dc7
commit f8561b2265
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 224 additions and 108 deletions

View File

@ -26,7 +26,9 @@ priority: 0
is_active: 0
active_children: 0
dequeued_requests: 67
canceled_requests: 0
dequeued_cost: 4692272
canceled_cost: 0
busy_periods: 63
vruntime: 938454.1999999989
system_vruntime: ᴺᵁᴸᴸ
@ -54,7 +56,9 @@ Columns:
- `is_active` (`UInt8`) - Whether this node is currently active - has resource requests to be dequeued and constraints satisfied.
- `active_children` (`UInt64`) - The number of children in active state.
- `dequeued_requests` (`UInt64`) - The total number of resource requests dequeued from this node.
- `canceled_requests` (`UInt64`) - The total number of resource requests canceled from this node.
- `dequeued_cost` (`UInt64`) - The sum of costs (e.g. size in bytes) of all requests dequeued from this node.
- `canceled_cost` (`UInt64`) - The sum of costs (e.g. size in bytes) of all requests canceled from this node.
- `busy_periods` (`UInt64`) - The total number of deactivations of this node.
- `vruntime` (`Nullable(Float64)`) - For children of `fair` nodes only. Virtual runtime of a node used by SFQ algorithm to select the next child to process in a max-min fair manner.
- `system_vruntime` (`Nullable(Float64)`) - For `fair` nodes only. Virtual runtime showing `vruntime` of the last processed resource request. Used during child activation as the new value of `vruntime`.

View File

@ -387,7 +387,9 @@ public:
/// Introspection
std::atomic<UInt64> dequeued_requests{0};
std::atomic<UInt64> canceled_requests{0};
std::atomic<ResourceCost> dequeued_cost{0};
std::atomic<ResourceCost> canceled_cost{0};
std::atomic<UInt64> busy_periods{0};
};

View File

@ -50,6 +50,12 @@ public:
/// Should be called outside of scheduling subsystem, implementation must be thread-safe.
virtual void enqueueRequest(ResourceRequest * request) = 0;
/// Cancel previously enqueued request.
/// Returns `false` and does nothing given unknown or already executed request.
/// Returns `true` if requests has been found and canceled.
/// Should be called outside of scheduling subsystem, implementation must be thread-safe.
virtual bool cancelRequest(ResourceRequest * request) = 0;
/// For introspection
ResourceCost getBudget() const
{

View File

@ -134,56 +134,65 @@ public:
std::pair<ResourceRequest *, bool> dequeueRequest() override
{
if (heap_size == 0)
return {nullptr, false};
// Recursively pull request from child
auto [request, child_active] = items.front().child->dequeueRequest();
assert(request != nullptr);
std::pop_heap(items.begin(), items.begin() + heap_size);
Item & current = items[heap_size - 1];
// SFQ fairness invariant: system vruntime equals last served request start-time
assert(current.vruntime >= system_vruntime);
system_vruntime = current.vruntime;
// By definition vruntime is amount of consumed resource (cost) divided by weight
current.vruntime += double(request->cost) / current.child->info.weight;
max_vruntime = std::max(max_vruntime, current.vruntime);
if (child_active) // Put active child back in heap after vruntime update
// Cycle is required to do deactivations in the case of canceled requests, when dequeueRequest returns `nullptr`
while (true)
{
std::push_heap(items.begin(), items.begin() + heap_size);
}
else // Deactivate child if it is empty, but remember it's vruntime for latter activations
{
heap_size--;
if (heap_size == 0)
return {nullptr, false};
// Store index of this inactive child in `parent.idx`
// This enables O(1) search of inactive children instead of O(n)
current.child->info.parent.idx = heap_size;
}
// Recursively pull request from child
auto [request, child_active] = items.front().child->dequeueRequest();
std::pop_heap(items.begin(), items.begin() + heap_size);
Item & current = items[heap_size - 1];
// Reset any difference between children on busy period end
if (heap_size == 0)
{
// Reset vtime to zero to avoid floating-point error accumulation,
// but do not reset too often, because it's O(N)
UInt64 ns = clock_gettime_ns();
if (last_reset_ns + 1000000000 < ns)
if (request)
{
last_reset_ns = ns;
for (Item & item : items)
item.vruntime = 0;
max_vruntime = 0;
}
system_vruntime = max_vruntime;
busy_periods++;
}
// SFQ fairness invariant: system vruntime equals last served request start-time
assert(current.vruntime >= system_vruntime);
system_vruntime = current.vruntime;
dequeued_requests++;
dequeued_cost += request->cost;
return {request, heap_size > 0};
// By definition vruntime is amount of consumed resource (cost) divided by weight
current.vruntime += double(request->cost) / current.child->info.weight;
max_vruntime = std::max(max_vruntime, current.vruntime);
}
if (child_active) // Put active child back in heap after vruntime update
{
std::push_heap(items.begin(), items.begin() + heap_size);
}
else // Deactivate child if it is empty, but remember it's vruntime for latter activations
{
heap_size--;
// Store index of this inactive child in `parent.idx`
// This enables O(1) search of inactive children instead of O(n)
current.child->info.parent.idx = heap_size;
}
// Reset any difference between children on busy period end
if (heap_size == 0)
{
// Reset vtime to zero to avoid floating-point error accumulation,
// but do not reset too often, because it's O(N)
UInt64 ns = clock_gettime_ns();
if (last_reset_ns + 1000000000 < ns)
{
last_reset_ns = ns;
for (Item & item : items)
item.vruntime = 0;
max_vruntime = 0;
}
system_vruntime = max_vruntime;
busy_periods++;
}
if (request)
{
dequeued_requests++;
dequeued_cost += request->cost;
return {request, heap_size > 0};
}
}
}
bool isActive() override

View File

@ -39,8 +39,7 @@ public:
void enqueueRequest(ResourceRequest * request) override
{
std::unique_lock lock(mutex);
request->enqueue_ns = clock_gettime_ns();
std::lock_guard lock(mutex);
queue_cost += request->cost;
bool was_empty = requests.empty();
requests.push_back(request);
@ -50,7 +49,7 @@ public:
std::pair<ResourceRequest *, bool> dequeueRequest() override
{
std::unique_lock lock(mutex);
std::lock_guard lock(mutex);
if (requests.empty())
return {nullptr, false};
ResourceRequest * result = requests.front();
@ -63,9 +62,29 @@ public:
return {result, !requests.empty()};
}
bool cancelRequest(ResourceRequest * request) override
{
std::lock_guard lock(mutex);
// TODO(serxa): reimplement queue as intrusive list of ResourceRequest to make this O(1) instead of O(N)
for (auto i = requests.begin(), e = requests.end(); i != e; ++i)
{
if (*i == request)
{
requests.erase(i);
if (requests.empty())
busy_periods++;
queue_cost -= request->cost;
canceled_requests++;
canceled_cost += request->cost;
return true;
}
}
return false;
}
bool isActive() override
{
std::unique_lock lock(mutex);
std::lock_guard lock(mutex);
return !requests.empty();
}
@ -98,14 +117,14 @@ public:
std::pair<UInt64, Int64> getQueueLengthAndCost()
{
std::unique_lock lock(mutex);
std::lock_guard lock(mutex);
return {requests.size(), queue_cost};
}
private:
std::mutex mutex;
Int64 queue_cost = 0;
std::deque<ResourceRequest *> requests;
std::deque<ResourceRequest *> requests; // TODO(serxa): reimplement it using intrusive list to avoid allocations/deallocations and O(N) during cancel
};
}

View File

@ -102,25 +102,31 @@ public:
std::pair<ResourceRequest *, bool> dequeueRequest() override
{
if (items.empty())
return {nullptr, false};
// Recursively pull request from child
auto [request, child_active] = items.front().child->dequeueRequest();
assert(request != nullptr);
// Deactivate child if it is empty
if (!child_active)
// Cycle is required to do deactivations in the case of canceled requests, when dequeueRequest returns `nullptr`
while (true)
{
std::pop_heap(items.begin(), items.end());
items.pop_back();
if (items.empty())
busy_periods++;
}
return {nullptr, false};
dequeued_requests++;
dequeued_cost += request->cost;
return {request, !items.empty()};
// Recursively pull request from child
auto [request, child_active] = items.front().child->dequeueRequest();
// Deactivate child if it is empty
if (!child_active)
{
std::pop_heap(items.begin(), items.end());
items.pop_back();
if (items.empty())
busy_periods++;
}
if (request)
{
dequeued_requests++;
dequeued_cost += request->cost;
return {request, !items.empty()};
}
}
}
bool isActive() override

View File

@ -38,7 +38,6 @@ TEST(SchedulerDynamicResourceManager, Smoke)
{
ResourceGuard gA(cA->get("res1"), ResourceGuard::PostponeLocking);
gA.lock();
gA.setFailure();
gA.unlock();
ResourceGuard gB(cB->get("res1"));

View File

@ -4,6 +4,7 @@
#include <Common/Scheduler/Nodes/tests/ResourceTest.h>
#include <barrier>
#include <future>
using namespace DB;
@ -73,6 +74,22 @@ struct ResourceHolder
}
};
struct MyRequest : public ResourceRequest
{
std::function<void()> on_execute;
explicit MyRequest(ResourceCost cost_, std::function<void()> on_execute_)
: ResourceRequest(cost_)
, on_execute(on_execute_)
{}
void execute() override
{
if (on_execute)
on_execute();
}
};
TEST(SchedulerRoot, Smoke)
{
ResourceTest t;
@ -111,3 +128,49 @@ TEST(SchedulerRoot, Smoke)
EXPECT_TRUE(fc2->requests.contains(&rg.request));
}
}
TEST(SchedulerRoot, Cancel)
{
ResourceTest t;
ResourceHolder r1(t);
auto * fc1 = r1.add<ConstraintTest>("/", "<max_requests>1</max_requests>");
r1.add<PriorityPolicy>("/prio");
auto a = r1.addQueue("/prio/A", "<priority>1</priority>");
auto b = r1.addQueue("/prio/B", "<priority>2</priority>");
r1.registerResource();
std::barrier sync(2);
std::thread consumer1([&]
{
std::barrier destruct_sync(2);
MyRequest request(1,[&]
{
sync.arrive_and_wait(); // (A)
EXPECT_TRUE(fc1->requests.contains(&request));
sync.arrive_and_wait(); // (B)
request.finish();
destruct_sync.arrive_and_wait(); // (C)
});
a.queue->enqueueRequest(&request);
destruct_sync.arrive_and_wait(); // (C)
});
std::thread consumer2([&]
{
MyRequest request(1,[&]
{
FAIL() << "This request must be canceled, but instead executes";
});
sync.arrive_and_wait(); // (A) wait for request of consumer1 to be inside execute, so that constraint is in violated state and our request will not be executed immediately
b.queue->enqueueRequest(&request);
bool canceled = b.queue->cancelRequest(&request);
EXPECT_TRUE(canceled);
sync.arrive_and_wait(); // (B) release request of consumer1 to be finished
});
consumer1.join();
consumer2.join();
EXPECT_TRUE(fc1->requests.empty());
}

View File

@ -71,8 +71,7 @@ public:
// lock(mutex) is not required because `Dequeued` request cannot be used by the scheduler thread
chassert(state == Dequeued);
state = Finished;
if (constraint)
constraint->finishRequest(this);
ResourceRequest::finish();
}
static Request & local()
@ -126,12 +125,6 @@ public:
}
}
/// Mark request as unsuccessful; by default request is considered to be successful
void setFailure()
{
request.successful = false;
}
ResourceLink link;
Request & request;
};

View File

@ -0,0 +1,13 @@
#include <Common/Scheduler/ResourceRequest.h>
#include <Common/Scheduler/ISchedulerConstraint.h>
namespace DB
{
void ResourceRequest::finish()
{
if (constraint)
constraint->finishRequest(this);
}
}

View File

@ -14,9 +14,6 @@ class ISchedulerConstraint;
using ResourceCost = Int64;
constexpr ResourceCost ResourceCostMax = std::numeric_limits<int>::max();
/// Timestamps (nanoseconds since epoch)
using ResourceNs = UInt64;
/*
* Request for a resource consumption. The main moving part of the scheduling subsystem.
* Resource requests processing workflow:
@ -31,7 +28,7 @@ using ResourceNs = UInt64;
* 3) Scheduler calls ISchedulerNode::dequeueRequest() that returns the request.
* 4) Callback ResourceRequest::execute() is called to provide access to the resource.
* 5) The resource consumption is happening outside of the scheduling subsystem.
* 6) request->constraint->finishRequest() is called when consumption is finished.
* 6) ResourceRequest::finish() is called when consumption is finished.
*
* Steps (5) and (6) can be omitted if constraint is not used by the resource.
*
@ -39,7 +36,10 @@ using ResourceNs = UInt64;
* Request ownership is done outside of the scheduling subsystem.
* After (6) request can be destructed safely.
*
* Request cancelling is not supported yet.
* Request can also be canceled before (3) using ISchedulerQueue::cancelRequest().
* Returning false means it is too late for request to be canceled. It should be processed in a regular way.
* Returning true means successful cancel and therefore steps (4) and (5) are not going to happen
* and step (6) MUST be omitted.
*/
class ResourceRequest
{
@ -48,32 +48,20 @@ public:
/// NOTE: If cost is not known in advance, ResourceBudget should be used (note that every ISchedulerQueue has it)
ResourceCost cost;
/// Request outcome
/// Should be filled during resource consumption
bool successful;
/// Scheduler node to be notified on consumption finish
/// Auto-filled during request enqueue/dequeue
ISchedulerConstraint * constraint;
/// Timestamps for introspection
ResourceNs enqueue_ns;
ResourceNs execute_ns;
ResourceNs finish_ns;
explicit ResourceRequest(ResourceCost cost_ = 1)
{
reset(cost_);
}
/// ResourceRequest object may be reused again after reset()
void reset(ResourceCost cost_)
{
cost = cost_;
successful = true;
constraint = nullptr;
enqueue_ns = 0;
execute_ns = 0;
finish_ns = 0;
}
virtual ~ResourceRequest() = default;
@ -83,6 +71,12 @@ public:
/// just triggering start of a consumption, not doing the consumption itself
/// (e.g. setting an std::promise or creating a job in a thread pool)
virtual void execute() = 0;
/// Stop resource consumption and notify resource scheduler.
/// Should be called when resource consumption is finished by consumer.
/// ResourceRequest should not be destructed or reset before calling to `finish()`.
/// WARNING: this function MUST not be called if request was canceled.
void finish();
};
}

View File

@ -145,22 +145,27 @@ public:
std::pair<ResourceRequest *, bool> dequeueRequest() override
{
if (current == nullptr) // No active resources
return {nullptr, false};
while (true)
{
if (current == nullptr) // No active resources
return {nullptr, false};
// Dequeue request from current resource
auto [request, resource_active] = current->root->dequeueRequest();
assert(request != nullptr);
// Dequeue request from current resource
auto [request, resource_active] = current->root->dequeueRequest();
// Deactivate resource if required
if (!resource_active)
deactivate(current);
else
current = current->next; // Just move round-robin pointer
// Deactivate resource if required
if (!resource_active)
deactivate(current);
else
current = current->next; // Just move round-robin pointer
dequeued_requests++;
dequeued_cost += request->cost;
return {request, current != nullptr};
if (request == nullptr) // Possible in case of request cancel, just retry
continue;
dequeued_requests++;
dequeued_cost += request->cost;
return {request, current != nullptr};
}
}
bool isActive() override
@ -245,7 +250,6 @@ private:
void execute(ResourceRequest * request)
{
request->execute_ns = clock_gettime_ns();
request->execute();
}

View File

@ -30,7 +30,9 @@ ColumnsDescription StorageSystemScheduler::getColumnsDescription()
{"is_active", std::make_shared<DataTypeUInt8>(), "Whether this node is currently active - has resource requests to be dequeued and constraints satisfied."},
{"active_children", std::make_shared<DataTypeUInt64>(), "The number of children in active state."},
{"dequeued_requests", std::make_shared<DataTypeUInt64>(), "The total number of resource requests dequeued from this node."},
{"canceled_requests", std::make_shared<DataTypeUInt64>(), "The total number of resource requests canceled from this node."},
{"dequeued_cost", std::make_shared<DataTypeInt64>(), "The sum of costs (e.g. size in bytes) of all requests dequeued from this node."},
{"canceled_cost", std::make_shared<DataTypeInt64>(), "The sum of costs (e.g. size in bytes) of all requests canceled from this node."},
{"busy_periods", std::make_shared<DataTypeUInt64>(), "The total number of deactivations of this node."},
{"vruntime", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeFloat64>()),
"For children of `fair` nodes only. Virtual runtime of a node used by SFQ algorithm to select the next child to process in a max-min fair manner."},
@ -93,7 +95,9 @@ void StorageSystemScheduler::fillData(MutableColumns & res_columns, ContextPtr c
res_columns[i++]->insert(node->isActive());
res_columns[i++]->insert(node->activeChildren());
res_columns[i++]->insert(node->dequeued_requests.load());
res_columns[i++]->insert(node->canceled_requests.load());
res_columns[i++]->insert(node->dequeued_cost.load());
res_columns[i++]->insert(node->canceled_cost.load());
res_columns[i++]->insert(node->busy_periods.load());
Field vruntime;