bugfix: wrong estimated cost passed for budget adjusting

This commit is contained in:
serxa 2024-06-14 13:01:14 +00:00
parent 937e170825
commit a5eeeb3422
4 changed files with 9 additions and 46 deletions

View File

@ -22,10 +22,13 @@ public:
{}
// Wrapper for `enqueueRequest()` that should be used to account for available resource budget
void enqueueRequestUsingBudget(ResourceRequest * request)
// Returns `estimated_cost` that should be passed later to `adjustBudget()`
[[ nodiscard ]] ResourceCost enqueueRequestUsingBudget(ResourceRequest * request)
{
request->cost = budget.ask(request->cost);
ResourceCost estimated_cost = request->cost;
request->cost = budget.ask(estimated_cost);
enqueueRequest(request);
return estimated_cost;
}
// Should be called to account for difference between real and estimated costs
@ -34,18 +37,6 @@ public:
budget.adjust(estimated_cost, real_cost);
}
// Adjust budget to account for extra consumption of `cost` resource units
void consumeBudget(ResourceCost cost)
{
adjustBudget(0, cost);
}
// Adjust budget to account for requested, but not consumed `cost` resource units
void accumulateBudget(ResourceCost cost)
{
adjustBudget(cost, 0);
}
/// Enqueue new request to be executed using underlying resource.
/// Should be called outside of scheduling subsystem, implementation must be thread-safe.
virtual void enqueueRequest(ResourceRequest * request) = 0;

View File

@ -1,25 +0,0 @@
#include <Common/Scheduler/ISchedulerQueue.h>
#include <Common/Scheduler/ResourceLink.h>
#include <Common/Scheduler/ResourceRequest.h>
namespace DB
{
void ResourceLink::adjust(ResourceCost estimated_cost, ResourceCost real_cost) const
{
if (queue)
queue->adjustBudget(estimated_cost, real_cost);
}
void ResourceLink::consumed(ResourceCost cost) const
{
if (queue)
queue->consumeBudget(cost);
}
void ResourceLink::accumulate(DB::ResourceCost cost) const
{
if (queue)
queue->accumulateBudget(cost);
}
}

View File

@ -96,7 +96,7 @@ public:
chassert(state == Finished);
state = Enqueued;
ResourceRequest::reset(cost_);
link_.queue->enqueueRequestUsingBudget(this);
estimated_cost = link_.queue->enqueueRequestUsingBudget(this); // NOTE: it modifies `cost` and enqueues request
}
// This function is executed inside scheduler thread and wakes thread issued this `request`.
@ -124,8 +124,8 @@ public:
// lock(mutex) is not required because `Dequeued` request cannot be used by the scheduler thread
chassert(state == Dequeued);
state = Finished;
if (cost != real_cost_)
link_.adjust(cost, real_cost_);
if (estimated_cost != real_cost_)
link_.queue->adjustBudget(estimated_cost, real_cost_);
ResourceRequest::finish();
ProfileEvents::increment(metrics->requests);
ProfileEvents::increment(metrics->cost, real_cost_);
@ -149,6 +149,7 @@ public:
const Metrics * metrics = nullptr; // Must be initialized before use
private:
ResourceCost estimated_cost = 0; // Stores initial `cost` value in case budget was used to modify it
std::mutex mutex;
std::condition_variable dequeued_cv;
RequestState state = Finished;

View File

@ -17,10 +17,6 @@ struct ResourceLink
bool operator==(const ResourceLink &) const = default;
explicit operator bool() const { return queue != nullptr; }
void adjust(ResourceCost estimated_cost, ResourceCost real_cost) const;
void consumed(ResourceCost cost) const;
void accumulate(ResourceCost cost) const;
void reset()
{
queue = nullptr;