make ResourceGuards more straight-forward

This commit is contained in:
serxa 2024-06-13 18:58:39 +00:00
parent f1f354f22b
commit 937e170825
10 changed files with 50 additions and 53 deletions

View File

@ -276,21 +276,18 @@ struct ResourceGuardSessionDataHooks : public Poco::Net::IHTTPSessionDataHooks
void start(int bytes) override
{
// TODO(serxa): add metrics here or better in scheduler code (e.g. during enqueue, or better in REsourceGuard::REquest)?
request.enqueue(bytes, link);
request.wait();
}
void finish(int bytes) override
{
request.finish();
link.adjust(request.cost, bytes); // success
request.finish(bytes, link);
}
void fail() override
{
request.finish();
link.accumulate(request.cost); // We assume no resource was used in case of failure
request.finish(0, link);
}
ResourceLink link;
@ -466,6 +463,9 @@ private:
}
}
response_stream = nullptr;
// FIXME: We are not sure that response stream is fully read at this moment, so hooks could possible be called after this point, right?
// Session::setSendDataHooks();
// Session::setReceiveDataHooks();
group->atConnectionDestroy();

View File

@ -232,12 +232,13 @@ struct ResourceTestManager : public ResourceTestBase
ResourceTestManager & t;
Guard(ResourceTestManager & t_, ResourceLink link_, ResourceCost cost)
: ResourceGuard(ResourceGuard::Metrics::getIOWrite(), link_, cost, PostponeLocking)
: ResourceGuard(ResourceGuard::Metrics::getIOWrite(), link_, cost, Lock::Postpone)
, t(t_)
{
t.onEnqueue(link);
lock();
t.onExecute(link);
consume(cost);
}
};
@ -310,8 +311,9 @@ struct ResourceTestManager : public ResourceTestBase
// NOTE: actually leader's request(s) make their own small busy period.
void blockResource(ResourceLink link)
{
ResourceGuard g(ResourceGuard::Metrics::getIOWrite(), link, 1, ResourceGuard::PostponeLocking);
ResourceGuard g(ResourceGuard::Metrics::getIOWrite(), link, 1, ResourceGuard::Lock::Postpone);
g.lock();
g.consume(1);
// NOTE: at this point we assume resource to be blocked by single request (<max_requests>1</max_requests>)
busy_period.arrive_and_wait(); // (1) notify all followers that resource is blocked
busy_period.arrive_and_wait(); // (2) wait all followers to enqueue their requests
@ -320,10 +322,11 @@ struct ResourceTestManager : public ResourceTestBase
{
getLinkData(link).left += total_requests + 1;
busy_period.arrive_and_wait(); // (1) wait leader to block resource
ResourceGuard g(ResourceGuard::Metrics::getIOWrite(), link, cost, ResourceGuard::PostponeLocking);
ResourceGuard g(ResourceGuard::Metrics::getIOWrite(), link, cost, ResourceGuard::Lock::Postpone);
onEnqueue(link);
busy_period.arrive_and_wait(); // (2) notify leader to unblock
g.lock();
g.consume(cost);
onExecute(link);
}
};

View File

@ -36,11 +36,16 @@ TEST(SchedulerDynamicResourceManager, Smoke)
for (int i = 0; i < 10; i++)
{
ResourceGuard gA(ResourceGuard::Metrics::getIOWrite(), cA->get("res1"), ResourceGuard::PostponeLocking);
ResourceGuard gA(ResourceGuard::Metrics::getIOWrite(), cA->get("res1"), 1, ResourceGuard::Lock::Postpone);
gA.lock();
gA.consume(1);
gA.unlock();
ResourceGuard gB(ResourceGuard::Metrics::getIOWrite(), cB->get("res1"));
gB.unlock();
ResourceGuard gC(ResourceGuard::Metrics::getIORead(), cB->get("res1"));
gB.consume(2);
}
}

View File

@ -111,21 +111,25 @@ TEST(SchedulerRoot, Smoke)
{
ResourceGuard rg(ResourceGuard::Metrics::getIOWrite(), a);
EXPECT_TRUE(fc1->requests.contains(&rg.request));
rg.consume(1);
}
{
ResourceGuard rg(ResourceGuard::Metrics::getIOWrite(), b);
EXPECT_TRUE(fc1->requests.contains(&rg.request));
rg.consume(1);
}
{
ResourceGuard rg(ResourceGuard::Metrics::getIOWrite(), c);
EXPECT_TRUE(fc2->requests.contains(&rg.request));
rg.consume(1);
}
{
ResourceGuard rg(ResourceGuard::Metrics::getIOWrite(), d);
EXPECT_TRUE(fc2->requests.contains(&rg.request));
rg.consume(1);
}
}

View File

@ -42,12 +42,12 @@ namespace DB
class ResourceGuard
{
public:
enum ResourceGuardCtor
enum class Lock
{
LockStraightAway, /// Locks inside constructor (default)
StraightAway, /// Locks inside constructor (default)
// WARNING: Only for tests. It is not exception-safe because `lock()` must be called after construction.
PostponeLocking /// Don't lock in constructor, but send request
Postpone /// Don't lock in constructor, but send request
};
struct Metrics
@ -96,8 +96,6 @@ public:
chassert(state == Finished);
state = Enqueued;
ResourceRequest::reset(cost_);
ProfileEvents::increment(metrics->requests);
ProfileEvents::increment(metrics->cost, cost_);
link_.queue->enqueueRequestUsingBudget(this);
}
@ -121,12 +119,16 @@ public:
dequeued_cv.wait(lock, [this] { return state == Dequeued; });
}
void finish()
void finish(ResourceCost real_cost_, ResourceLink link_)
{
// lock(mutex) is not required because `Dequeued` request cannot be used by the scheduler thread
chassert(state == Dequeued);
state = Finished;
if (cost != real_cost_)
link_.adjust(cost, real_cost_);
ResourceRequest::finish();
ProfileEvents::increment(metrics->requests);
ProfileEvents::increment(metrics->cost, real_cost_);
}
void assertFinished()
@ -153,7 +155,7 @@ public:
};
/// Creates pending request for resource; blocks while resource is not available (unless `PostponeLocking`)
explicit ResourceGuard(const Metrics * metrics, ResourceLink link_, ResourceCost cost = 1, ResourceGuardCtor ctor = LockStraightAway)
explicit ResourceGuard(const Metrics * metrics, ResourceLink link_, ResourceCost cost = 1, ResourceGuard::Lock type = ResourceGuard::Lock::StraightAway)
: link(link_)
, request(Request::local(metrics))
{
@ -162,7 +164,7 @@ public:
else if (link)
{
request.enqueue(cost, link);
if (ctor == LockStraightAway)
if (type == Lock::StraightAway)
request.wait();
}
}
@ -179,18 +181,25 @@ public:
request.wait();
}
/// Report resource consumption has finished
void unlock()
void consume(ResourceCost cost)
{
real_cost += cost;
}
/// Report resource consumption has finished
void unlock(ResourceCost consumed = 0)
{
consume(consumed);
if (link)
{
request.finish();
request.finish(real_cost, link);
link.reset();
}
}
ResourceLink link;
Request & request;
ResourceCost real_cost = 0;
};
}

View File

@ -45,7 +45,7 @@ constexpr ResourceCost ResourceCostMax = std::numeric_limits<int>::max();
class ResourceRequest : public boost::intrusive::list_base_hook<>
{
public:
/// Cost of request execution; should be filled before request enqueueing.
/// Cost of request execution; should be filled before request enqueueing and remain constant until `finish()`.
/// NOTE: If cost is not known in advance, ResourceBudget should be used (note that every ISchedulerQueue has it)
ResourceCost cost;

View File

@ -116,15 +116,13 @@ bool ReadBufferFromAzureBlobStorage::nextImpl()
{
ResourceGuard rlock(ResourceGuard::Metrics::getIORead(), read_settings.io_scheduling.read_resource_link, to_read_bytes);
bytes_read = data_stream->ReadToCount(reinterpret_cast<uint8_t *>(data_ptr), to_read_bytes);
read_settings.io_scheduling.read_resource_link.adjust(to_read_bytes, bytes_read);
rlock.unlock(); // Do not hold resource under bandwidth throttler
rlock.unlock(bytes_read); // Do not hold resource under bandwidth throttler
if (read_settings.remote_throttler)
read_settings.remote_throttler->add(bytes_read, ProfileEvents::RemoteReadThrottlerBytes, ProfileEvents::RemoteReadThrottlerSleepMicroseconds);
break;
}
catch (const Azure::Core::RequestFailedException & e)
{
read_settings.io_scheduling.read_resource_link.accumulate(to_read_bytes); // We assume no resource was used in case of failure
ProfileEvents::increment(ProfileEvents::ReadBufferFromAzureRequestsErrors);
LOG_DEBUG(log, "Exception caught during Azure Read for file {} at attempt {}/{}: {}", path, i + 1, max_single_read_retries, e.Message);

View File

@ -94,13 +94,11 @@ void WriteBufferFromAzureBlobStorage::execWithRetry(std::function<void()> func,
{
ResourceGuard rlock(ResourceGuard::Metrics::getIOWrite(), write_settings.io_scheduling.write_resource_link, cost); // Note that zero-cost requests are ignored
func();
rlock.unlock(cost);
break;
}
catch (const Azure::Core::RequestFailedException & e)
{
if (cost)
write_settings.io_scheduling.write_resource_link.accumulate(cost); // Accumulate resource for later use, because we have failed to consume it
if (i == num_tries - 1 || !isRetryableAzureException(e))
throw;
@ -108,8 +106,6 @@ void WriteBufferFromAzureBlobStorage::execWithRetry(std::function<void()> func,
}
catch (...)
{
if (cost)
write_settings.io_scheduling.write_resource_link.accumulate(cost); // We assume no resource was used in case of failure
throw;
}
}

View File

@ -119,18 +119,9 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
return false;
}
int bytes_read;
try
{
ResourceGuard rlock(ResourceGuard::Metrics::getIORead(), read_settings.io_scheduling.read_resource_link, num_bytes_to_read);
bytes_read = hdfsRead(fs.get(), fin, internal_buffer.begin(), safe_cast<int>(num_bytes_to_read));
read_settings.io_scheduling.read_resource_link.adjust(num_bytes_to_read, std::max(0, bytes_read));
}
catch (...)
{
read_settings.io_scheduling.read_resource_link.accumulate(num_bytes_to_read); // We assume no resource was used in case of failure
throw;
}
ResourceGuard rlock(ResourceGuard::Metrics::getIORead(), read_settings.io_scheduling.read_resource_link, num_bytes_to_read);
int bytes_read = hdfsRead(fs.get(), fin, internal_buffer.begin(), safe_cast<int>(num_bytes_to_read));
rlock.unlock(std::max(0, bytes_read));
if (bytes_read < 0)
{

View File

@ -66,18 +66,9 @@ struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl
int write(const char * start, size_t size)
{
int bytes_written;
try
{
ResourceGuard rlock(ResourceGuard::Metrics::getIOWrite(), write_settings.io_scheduling.write_resource_link, size);
bytes_written = hdfsWrite(fs.get(), fout, start, safe_cast<int>(size));
write_settings.io_scheduling.write_resource_link.adjust(size, std::max(0, bytes_written));
}
catch (...)
{
write_settings.io_scheduling.write_resource_link.accumulate(size); // We assume no resource was used in case of failure
throw;
}
ResourceGuard rlock(ResourceGuard::Metrics::getIOWrite(), write_settings.io_scheduling.write_resource_link, size);
int bytes_written = hdfsWrite(fs.get(), fout, start, safe_cast<int>(size));
rlock.unlock(std::max(0, bytes_written));
if (bytes_written < 0)
throw Exception(ErrorCodes::NETWORK_ERROR, "Fail to write HDFS file: {} {}", hdfs_uri, std::string(hdfsGetLastError()));