Merge pull request #45711 from ClickHouse/io-scheduler-integration

Integrate IO scheduler with buffers for remote reads and writes
Sergei Trifonov 2023-02-11 10:26:58 +01:00 committed by GitHub
commit 2931c3bbe1
15 changed files with 309 additions and 61 deletions


@@ -6,6 +6,7 @@
#include <Common/getRandomASCIIString.h>
#include <Common/logger_useful.h>
#include <Common/Throttler.h>
#include <IO/ResourceGuard.h>
namespace ProfileEvents
@@ -40,10 +41,13 @@ WriteBufferFromAzureBlobStorage::~WriteBufferFromAzureBlobStorage()
finalize();
}
void WriteBufferFromAzureBlobStorage::execWithRetry(std::function<void()> func, size_t num_tries)
void WriteBufferFromAzureBlobStorage::execWithRetry(std::function<void()> func, size_t num_tries, size_t cost)
{
auto handle_exception = [&](const auto & e, size_t i)
auto handle_exception = [&, this](const auto & e, size_t i)
{
if (cost)
write_settings.resource_link.accumulate(cost); // Accumulate resource for later use, because we have failed to consume it
if (i == num_tries - 1)
throw;
@@ -54,6 +58,7 @@ void WriteBufferFromAzureBlobStorage::execWithRetry(std::function<void()> func,
{
try
{
ResourceGuard rlock(write_settings.resource_link, cost); // Note that zero-cost requests are ignored
func();
break;
}
@@ -65,6 +70,12 @@ void WriteBufferFromAzureBlobStorage::execWithRetry(std::function<void()> func,
{
handle_exception(e, i);
}
catch (...)
{
if (cost)
write_settings.resource_link.accumulate(cost); // We assume no resource was used in case of failure
throw;
}
}
}
@@ -87,7 +98,7 @@ void WriteBufferFromAzureBlobStorage::uploadBlock(const char * data, size_t size
const std::string & block_id = block_ids.emplace_back(getRandomASCIIString(64));
Azure::Core::IO::MemoryBodyStream memory_stream(reinterpret_cast<const uint8_t *>(data), size);
execWithRetry([&](){ block_blob_client.StageBlock(block_id, memory_stream); }, DEFAULT_RETRY_NUM);
execWithRetry([&](){ block_blob_client.StageBlock(block_id, memory_stream); }, DEFAULT_RETRY_NUM, size);
tmp_buffer_write_offset = 0;
LOG_TRACE(log, "Staged block (id: {}) of size {} (blob path: {}).", block_id, size, blob_path);
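
The hunk above establishes the consume-or-return pattern used throughout this PR: take the resource under a ResourceGuard before the network call, and give the budget back with accumulate() when the call fails. A minimal sketch of that pattern, assuming only the ResourceGuard/ResourceLink interfaces added here; the helper name and the wrapped operation are illustrative, not part of the diff:

#include <IO/ResourceGuard.h>
#include <IO/ResourceLink.h>
#include <functional>

void consumeWithAccounting(DB::ResourceLink link, size_t cost, std::function<void()> func)
{
    DB::ResourceGuard rlock(link, cost); // blocks until the scheduler grants `cost` units; zero cost skips scheduling
    try
    {
        func(); // the actual network/disk operation that consumes the resource
    }
    catch (...)
    {
        link.accumulate(cost); // return the budget: we assume nothing was consumed on failure
        throw;                 // the guard's destructor still finishes the scheduler request
    }
    rlock.unlock(); // report that consumption has finished as early as possible
}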


@@ -39,7 +39,7 @@ public:
private:
void finalizeImpl() override;
void execWithRetry(std::function<void()> func, size_t num_tries);
void execWithRetry(std::function<void()> func, size_t num_tries, size_t cost = 0);
void uploadBlock(const char * data, size_t size);
Poco::Logger * log;


@@ -1,6 +1,6 @@
#pragma once
#include <IO/ResourceRequest.h>
#include <IO/ResourceLink.h>
#include <Poco/Util/AbstractConfiguration.h>


@@ -1,6 +1,8 @@
#pragma once
#include <IO/ISchedulerNode.h>
#include <IO/ResourceBudget.h>
#include <IO/ResourceRequest.h>
#include <memory>
@@ -10,17 +12,49 @@ namespace DB
/*
* Queue for pending requests for a specific resource; a leaf of the hierarchy.
* Note that every queue has a budget associated with it.
*/
class ISchedulerQueue : public ISchedulerNode
{
public:
ISchedulerQueue(EventQueue * event_queue_, const Poco::Util::AbstractConfiguration & config = emptyConfig(), const String & config_prefix = {})
explicit ISchedulerQueue(EventQueue * event_queue_, const Poco::Util::AbstractConfiguration & config = emptyConfig(), const String & config_prefix = {})
: ISchedulerNode(event_queue_, config, config_prefix)
{}
// Wrapper for `enqueueRequest()` that should be used to account for available resource budget
void enqueueRequestUsingBudget(ResourceRequest * request)
{
request->cost = budget.ask(request->cost);
enqueueRequest(request);
}
// Should be called to account for difference between real and estimated costs
void adjustBudget(ResourceCost estimated_cost, ResourceCost real_cost)
{
budget.adjust(estimated_cost, real_cost);
}
// Adjust budget to account for extra consumption of `cost` resource units
void consumeBudget(ResourceCost cost)
{
adjustBudget(0, cost);
}
// Adjust budget to account for requested, but not consumed `cost` resource units
void accumulateBudget(ResourceCost cost)
{
adjustBudget(cost, 0);
}
/// Enqueue new request to be executed using underlying resource.
/// Should be called outside of scheduling subsystem, implementation must be thread-safe.
virtual void enqueueRequest(ResourceRequest * request) = 0;
private:
// Allows multiple consumers to synchronize with a common "debit/credit" balance.
// 1) (positive) to avoid wasting allocated but unused resource (e.g. in case of a failure);
// 2) (negative) to account for overconsumption (e.g. if the cost is not known in advance and an estimate from below is used).
ResourceBudget budget;
};
}
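
The four wrappers above are thin views over the queue's single ResourceBudget. A hedged sketch of how they are meant to be driven (the function, the concrete request object, and the cost values are illustrative; real callers in this PR go through ResourceLink and ResourceGuard::Request rather than touching the queue directly):

void exampleBudgetUsage(DB::ISchedulerQueue * queue, DB::ResourceRequest * request, DB::ResourceCost estimated)
{
    request->reset(estimated);                 // what we think the operation will cost
    queue->enqueueRequestUsingBudget(request); // may charge less (credit) or more (debt) than `estimated`

    // ... the request is executed, the operation runs, and the real consumption becomes known ...
    DB::ResourceCost real = estimated + 42;    // illustrative value only
    queue->adjustBudget(estimated, real);      // keep the balance honest for the next request

    // The remaining wrappers are shortcuts over the same adjustment:
    //   consumeBudget(extra)     == adjustBudget(0, extra)   -- unplanned extra consumption
    //   accumulateBudget(unused) == adjustBudget(unused, 0)  -- requested but never consumed
}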


@@ -1,12 +1,12 @@
#include "config.h"
#include "IO/S3Common.h"
#include <IO/S3Common.h>
#if USE_AWS_S3
#include <IO/ReadBufferFromIStream.h>
#include <IO/ReadBufferFromS3.h>
#include <IO/ResourceGuard.h>
#include <IO/S3/getObjectInfo.h>
#include <IO/S3/Requests.h>
#include <Common/Stopwatch.h>
@@ -323,16 +323,23 @@ std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
if (read_settings.for_object_storage)
ProfileEvents::increment(ProfileEvents::DiskS3GetObject);
// We do not know in advance how many bytes we are going to consume; to avoid blocking, we estimate the cost from below
constexpr ResourceCost estimated_cost = 1;
ResourceGuard rlock(read_settings.resource_link, estimated_cost);
Aws::S3::Model::GetObjectOutcome outcome = client_ptr->GetObject(req);
rlock.unlock();
if (outcome.IsSuccess())
{
read_result = outcome.GetResultWithOwnership();
ResourceCost bytes_read = outcome.GetResult().GetContentLength();
read_settings.resource_link.adjust(estimated_cost, bytes_read);
size_t buffer_size = use_external_buffer ? 0 : read_settings.remote_fs_buffer_size;
read_result = outcome.GetResultWithOwnership();
return std::make_unique<ReadBufferFromIStream>(read_result.GetBody(), buffer_size);
}
else
{
read_settings.resource_link.accumulate(estimated_cost);
const auto & error = outcome.GetError();
throw S3Exception(error.GetMessage(), error.GetErrorType());
}
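
For GetObject the byte count is unknown until the response arrives, so the request is charged a nominal cost of 1 and the balance is corrected afterwards via adjust() (or returned via accumulate() on failure). A condensed sketch of this estimate-from-below pattern, assuming the ReadSettings/ResourceLink fields added by this PR; the fetch callback stands in for the real S3 call:

#include <IO/ReadSettings.h>
#include <IO/ResourceGuard.h>
#include <functional>

size_t readWithUnknownSize(const DB::ReadSettings & settings, std::function<size_t()> fetch)
{
    constexpr DB::ResourceCost estimated_cost = 1; // smallest valid cost, so we never overcharge up front
    DB::ResourceGuard rlock(settings.resource_link, estimated_cost);
    try
    {
        size_t bytes_read = fetch();                                // the real size becomes known here
        rlock.unlock();                                             // finish the scheduler request first
        settings.resource_link.adjust(estimated_cost, bytes_read);  // retroactively charge the real cost
        return bytes_read;
    }
    catch (...)
    {
        settings.resource_link.accumulate(estimated_cost); // assume nothing was consumed on failure
        throw;
    }
}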


@@ -5,6 +5,7 @@
#include <Core/Defines.h>
#include <Interpreters/Cache/FileCache_fwd.h>
#include <Common/Throttler_fwd.h>
#include <IO/ResourceLink.h>
namespace DB
{
@@ -107,6 +108,9 @@ struct ReadSettings
/// Bandwidth throttler to use during reading
ThrottlerPtr remote_throttler;
// Resource to be used during reading
ResourceLink resource_link;
size_t http_max_tries = 1;
size_t http_retry_initial_backoff_ms = 100;
size_t http_retry_max_backoff_ms = 1600;


@@ -37,6 +37,7 @@ TEST(IOResourceStaticResourceManager, Smoke)
for (int i = 0; i < 10; i++)
{
ResourceGuard ga(ca->get("res1"));
ga.unlock();
ResourceGuard gb(cb->get("res1"));
}
}
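
The test above is the only place in this diff that shows where a ResourceLink comes from: a classifier hands one out per resource name. A hedged sketch of plumbing such a link into ReadSettings so that the read buffers changed in this PR pick it up; the ClassifierPtr type and the get() call are assumed from the resource-manager API used by the test, and how the hierarchy is configured is outside this diff:

#include <IO/IResourceManager.h> // assumed location of the classifier interface
#include <IO/ReadSettings.h>

DB::ReadSettings makeReadSettingsWithResource(DB::ClassifierPtr classifier)
{
    DB::ReadSettings settings;
    settings.resource_link = classifier->get("res1"); // link to the queue serving this workload, as in the test
    return settings;
}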

src/IO/ResourceBudget.h (new file, 55 lines)

@@ -0,0 +1,55 @@
#pragma once
#include <IO/ResourceRequest.h>
#include <atomic>
namespace DB
{
/*
* Helper class to keep track of requested and consumed amount of resource.
* Useful if the real amount of consumed resource can differ from the requested amount (e.g. in case of failures).
* Can be safely used from multiple threads.
* Usage example:
* ResourceBudget budget;
* while (!stop) {
* ResourceCost est_cost = myEstimateOfCostOrJustUseOne();
* myAllocateResource(budget.ask(est_cost)); // Ask external system to allocate resource for you
* ResourceCost real_cost = mySynchronousConsumptionOfResource(); // Real consumption can differ from est_cost
* budget.adjust(est_cost, real_cost); // Adjust balance according to the actual cost, may affect the next iteration
* }
*/
class ResourceBudget
{
public:
// Returns amount of resource to be requested according to current balance and estimated cost of new consumption
ResourceCost ask(ResourceCost estimated_cost)
{
ResourceCost budget = available.load();
while (true)
{
// Valid resource request must have positive `cost`. Also takes consumption history into account.
ResourceCost cost = std::max<ResourceCost>(1ll, estimated_cost - budget);
// Assume every request is satisfied (no resource request cancellation is possible now)
// So we requested additional `cost` units and are going to consume `estimated_cost`
ResourceCost new_budget = budget + cost - estimated_cost;
// Try to commit this transaction
if (new_budget == budget || available.compare_exchange_strong(budget, new_budget))
return cost;
}
}
// Should be called to account for difference between real and estimated costs
// Optional. May be skipped if `real_cost` is known in advance (equals `estimated_cost`).
void adjust(ResourceCost estimated_cost, ResourceCost real_cost)
{
available.fetch_add(estimated_cost - real_cost);
}
private:
std::atomic<ResourceCost> available = 0; // requested - consumed
};
}
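
A short worked trace of the arithmetic above may help: starting from a zero balance, ask() charges the full estimate; once adjust() records that less (or more) was actually consumed, the surplus or debt changes what the next ask() requests. A minimal sketch using only the class above; the values are illustrative:

#include <IO/ResourceBudget.h>

void budgetTrace()
{
    DB::ResourceBudget budget;              // available = 0
    DB::ResourceCost c1 = budget.ask(100);  // c1 = 100, available stays 0 (requested == estimated)
    budget.adjust(100, 40);                 // only 40 consumed -> available = +60 (credit)
    DB::ResourceCost c2 = budget.ask(100);  // c2 = max(1, 100 - 60) = 40, available back to 0
    budget.adjust(100, 130);                // consumed more than planned -> available = -30 (debt)
    DB::ResourceCost c3 = budget.ask(100);  // c3 = max(1, 100 + 30) = 130, the debt is repaid
}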


@@ -3,10 +3,12 @@
#include <base/types.h>
#include <IO/ResourceRequest.h>
#include <IO/ISchedulerQueue.h>
#include <IO/ResourceLink.h>
#include <IO/ISchedulerConstraint.h>
#include <future>
#include <condition_variable>
#include <mutex>
namespace DB
{
@@ -14,44 +16,91 @@ namespace DB
/*
* Scoped resource guard.
* Waits for resource to be available in constructor and releases resource in destructor
* IMPORTANT: multiple resources should not be locked concurrently by a single thread
*/
class ResourceGuard
{
public:
enum ResourceGuardCtor
{
LockStraightAway, /// Lock inside constructor (default)
PostponeLocking /// Don't lock in constructor, but during later `lock()` call
LockStraightAway, /// Locks inside constructor (default)
// WARNING: Only for tests. It is not exception-safe because `lock()` must be called after construction.
PostponeLocking /// Don't lock in constructor, but send request
};
struct Request : public ResourceRequest
enum RequestState
{
/// Promise to be set on request execution
std::promise<void> dequeued;
Finished, // Last request has already finished; no concurrent access is possible
Enqueued, // Enqueued into the scheduler; thread-safe access is required
Dequeued // Dequeued from the scheduler and is in consumption state; no concurrent access is possible
};
explicit Request(ResourceCost cost_ = 1)
: ResourceRequest(cost_)
{}
class Request : public ResourceRequest
{
public:
void enqueue(ResourceCost cost_, ResourceLink link_)
{
// lock(mutex) is not required because `Finished` request cannot be used by the scheduler thread
chassert(state == Finished);
state = Enqueued;
ResourceRequest::reset(cost_);
link_.queue->enqueueRequestUsingBudget(this);
}
// This function is executed inside the scheduler thread and wakes the thread that issued this `request`.
// That thread then continues execution and does the real consumption of the requested resource synchronously.
void execute() override
{
// This function is executed inside scheduler thread and wakes thread issued this `request` (using ResourceGuard)
// That thread will continue execution and do real consumption of requested resource synchronously.
dequeued.set_value();
{
std::unique_lock lock(mutex);
chassert(state == Enqueued);
state = Dequeued;
}
dequeued_cv.notify_one();
}
void wait()
{
std::unique_lock lock(mutex);
dequeued_cv.wait(lock, [this] { return state == Dequeued; });
}
void finish()
{
// lock(mutex) is not required because `Dequeued` request cannot be used by the scheduler thread
chassert(state == Dequeued);
state = Finished;
if (constraint)
constraint->finishRequest(this);
}
static Request & local()
{
// Since a single thread cannot use more than one resource request simultaneously,
// we can reuse thread-local request to avoid allocations
static thread_local Request instance;
return instance;
}
private:
std::mutex mutex;
std::condition_variable dequeued_cv;
RequestState state = Finished;
};
/// Creates pending request for resource; blocks while resource is not available (unless `PostponeLocking`)
explicit ResourceGuard(ResourceLink link_, ResourceCost cost = 1, ResourceGuardCtor ctor = LockStraightAway)
: link(link_)
, request(cost)
, request(Request::local())
{
if (link.queue)
if (cost == 0)
link.queue = nullptr; // Ignore zero-cost requests
else if (link.queue)
{
dequeued_future = request.dequeued.get_future();
link.queue->enqueueRequest(&request);
request.enqueue(cost, link);
if (ctor == LockStraightAway)
lock();
request.wait();
}
}
@@ -64,17 +113,16 @@ public:
void lock()
{
if (link.queue)
dequeued_future.get();
request.wait();
}
/// Report request execution has finished
/// Report resource consumption has finished
void unlock()
{
if (link.queue)
{
assert(!dequeued_future.valid()); // unlock must be called only after lock()
if (request.constraint)
request.constraint->finishRequest(&request);
request.finish();
link.queue = nullptr;
}
}
@@ -84,10 +132,8 @@ public:
request.successful = false;
}
public:
ResourceLink link;
Request request;
std::future<void> dequeued_future;
Request & request;
};
}

src/IO/ResourceLink.h (new file, 39 lines)

@@ -0,0 +1,39 @@
#pragma once
#include <base/types.h>
#include <IO/ResourceRequest.h>
#include <IO/ISchedulerQueue.h>
namespace DB
{
/*
* Everything required for resource consumption. Connection to a specific resource queue.
*/
struct ResourceLink
{
ISchedulerQueue * queue = nullptr;
bool operator==(const ResourceLink &) const = default;
void adjust(ResourceCost estimated_cost, ResourceCost real_cost) const
{
if (queue)
queue->adjustBudget(estimated_cost, real_cost);
}
void consumed(ResourceCost cost) const
{
if (queue)
queue->consumeBudget(cost);
}
void accumulate(ResourceCost cost) const
{
if (queue)
queue->accumulateBudget(cost);
}
};
}


@@ -8,29 +8,15 @@ namespace DB
// Forward declarations
class ISchedulerQueue;
class ISchedulerNode;
class ISchedulerConstraint;
/// Cost in terms of used resource (e.g. bytes for network IO)
using ResourceCost = Int64;
constexpr ResourceCost ResourceCostMax = std::numeric_limits<int>::max();
/// Internal identifier of a resource (for arrays; unique per scheduler)
using ResourceIdx = size_t;
constexpr ResourceIdx ResourceIdxNotSet = ResourceIdx(-1);
/// Timestamps (nanoseconds since epoch)
using ResourceNs = UInt64;
/*
* Info required for resource consumption.
*/
struct ResourceLink
{
ISchedulerQueue * queue = nullptr;
bool operator==(const ResourceLink &) const = default;
};
/*
* Request for a resource consumption. The main moving part of the scheduling subsystem.
* Resource requests processing workflow:
@@ -65,25 +51,36 @@ public:
/// Request outcome
/// Should be filled during resource consumption
bool successful = true;
bool successful;
/// Scheduler node to be notified on consumption finish
/// Auto-filled during request enqueue/dequeue
ISchedulerConstraint * constraint = nullptr;
ISchedulerConstraint * constraint;
/// Timestamps for introspection
ResourceNs enqueue_ns = 0;
ResourceNs execute_ns = 0;
ResourceNs finish_ns = 0;
ResourceNs enqueue_ns;
ResourceNs execute_ns;
ResourceNs finish_ns;
explicit ResourceRequest(ResourceCost cost_ = 1)
: cost(cost_)
{}
{
reset(cost_);
}
void reset(ResourceCost cost_)
{
cost = cost_;
successful = true;
constraint = nullptr;
enqueue_ns = 0;
execute_ns = 0;
finish_ns = 0;
}
virtual ~ResourceRequest() = default;
/// Callback to trigger resource consumption.
/// IMPORTANT: is called from scheduler thread and must be fast,
/// IMPORTANT: it is called from scheduler thread and must be fast,
/// just triggering start of a consumption, not doing the consumption itself
/// (e.g. setting an std::promise or creating a job in a thread pool)
virtual void execute() = 0;
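
Since the hunk above makes reset() part of the public contract and keeps execute() pure virtual, a consumer-side request boils down to "signal, then let the issuing thread consume". A minimal illustrative subclass, assuming only <IO/ResourceRequest.h>; the PR's real implementation of this idea is ResourceGuard::Request shown earlier:

#include <IO/ResourceRequest.h>
#include <condition_variable>
#include <mutex>

class WaitableRequest : public DB::ResourceRequest // illustrative, not part of this PR
{
public:
    explicit WaitableRequest(DB::ResourceCost cost_ = 1) : DB::ResourceRequest(cost_) {}

    void execute() override            // runs in the scheduler thread: only signal, never consume here
    {
        std::lock_guard lock(mutex);
        granted = true;
        cv.notify_one();
    }

    void waitGranted()                 // runs in the consumer thread; real consumption follows this call
    {
        std::unique_lock lock(mutex);
        cv.wait(lock, [this] { return granted; });
    }

private:
    std::mutex mutex;
    std::condition_variable cv;
    bool granted = false;
};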


@@ -1,12 +1,13 @@
#include "config.h"
#include <Common/ProfileEvents.h>
#if USE_AWS_S3
#include <Common/logger_useful.h>
#include <Common/ProfileEvents.h>
#include <Common/Throttler.h>
#include <Interpreters/Cache/FileCache.h>
#include <IO/ResourceGuard.h>
#include <IO/WriteBufferFromS3.h>
#include <IO/WriteHelpers.h>
#include <IO/S3Common.h>
@@ -342,7 +343,10 @@ void WriteBufferFromS3::processUploadRequest(UploadPartTask & task)
if (write_settings.for_object_storage)
ProfileEvents::increment(ProfileEvents::DiskS3UploadPart);
ResourceCost cost = task.req.GetContentLength();
ResourceGuard rlock(write_settings.resource_link, cost);
auto outcome = client_ptr->UploadPart(task.req);
rlock.unlock(); // Avoid acquiring other locks under resource lock
if (outcome.IsSuccess())
{
@@ -351,7 +355,10 @@ void WriteBufferFromS3::processUploadRequest(UploadPartTask & task)
LOG_TRACE(log, "Writing part finished. Bucket: {}, Key: {}, Upload_id: {}, Etag: {}, Parts: {}", bucket, key, multipart_upload_id, task.tag, part_tags.size());
}
else
{
write_settings.resource_link.accumulate(cost); // We assume no resource was used in case of failure
throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType());
}
}
void WriteBufferFromS3::completeMultipartUpload()
@@ -491,7 +498,12 @@ void WriteBufferFromS3::processPutRequest(const PutObjectTask & task)
ProfileEvents::increment(ProfileEvents::S3PutObject);
if (write_settings.for_object_storage)
ProfileEvents::increment(ProfileEvents::DiskS3PutObject);
ResourceCost cost = task.req.GetContentLength();
ResourceGuard rlock(write_settings.resource_link, cost);
auto outcome = client_ptr->PutObject(task.req);
rlock.unlock();
bool with_pool = static_cast<bool>(schedule);
if (outcome.IsSuccess())
{
@@ -500,14 +512,18 @@ void WriteBufferFromS3::processPutRequest(const PutObjectTask & task)
}
else if (outcome.GetError().GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY)
{
write_settings.resource_link.accumulate(cost); // We assume no resource was used in case of failure
/// For unknown reason, at least MinIO can respond with NO_SUCH_KEY for put requests
LOG_INFO(log, "Single part upload failed with NO_SUCH_KEY error for Bucket: {}, Key: {}, Object size: {}, WithPool: {}, will retry", bucket, key, task.req.GetContentLength(), with_pool);
}
else
{
write_settings.resource_link.accumulate(cost); // We assume no resource was used in case of failure
throw S3Exception(
outcome.GetError().GetErrorType(),
"Message: {}, Key: {}, Bucket: {}, Object size: {}, WithPool: {}",
outcome.GetError().GetMessage(), key, bucket, task.req.GetContentLength(), with_pool);
}
}
}


@@ -1,6 +1,7 @@
#pragma once
#include <Common/Throttler_fwd.h>
#include <IO/ResourceLink.h>
namespace DB
{
@@ -11,6 +12,9 @@ struct WriteSettings
/// Bandwidth throttler to use during writing
ThrottlerPtr remote_throttler;
// Resource to be used during writing
ResourceLink resource_link;
/// Filesystem cache settings
bool enable_filesystem_cache_on_write_operations = false;
bool enable_filesystem_cache_log = false;


@@ -2,6 +2,7 @@
#if USE_HDFS
#include <Storages/HDFS/HDFSCommon.h>
#include <IO/ResourceGuard.h>
#include <Common/Throttler.h>
#include <Common/safe_cast.h>
#include <hdfs/hdfs.h>
@@ -97,11 +98,27 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
num_bytes_to_read = internal_buffer.size();
}
int bytes_read = hdfsRead(fs.get(), fin, internal_buffer.begin(), safe_cast<int>(num_bytes_to_read));
ResourceGuard rlock(read_settings.resource_link, num_bytes_to_read);
int bytes_read;
try
{
bytes_read = hdfsRead(fs.get(), fin, internal_buffer.begin(), safe_cast<int>(num_bytes_to_read));
}
catch (...)
{
read_settings.resource_link.accumulate(num_bytes_to_read); // We assume no resource was used in case of failure
throw;
}
rlock.unlock();
if (bytes_read < 0)
{
read_settings.resource_link.accumulate(num_bytes_to_read); // We assume no resource was used in case of failure
throw Exception(ErrorCodes::NETWORK_ERROR,
"Fail to read from HDFS: {}, file path: {}. Error: {}",
hdfs_uri, hdfs_file_path, std::string(hdfsGetLastError()));
}
read_settings.resource_link.adjust(num_bytes_to_read, bytes_read);
if (bytes_read)
{


@@ -4,6 +4,7 @@
#include <Storages/HDFS/WriteBufferFromHDFS.h>
#include <Storages/HDFS/HDFSCommon.h>
#include <IO/ResourceGuard.h>
#include <Common/Throttler.h>
#include <Common/safe_cast.h>
#include <hdfs/hdfs.h>
@@ -62,11 +63,27 @@ struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl
}
int write(const char * start, size_t size) const
int write(const char * start, size_t size)
{
int bytes_written = hdfsWrite(fs.get(), fout, start, safe_cast<int>(size));
ResourceGuard rlock(write_settings.resource_link, size);
int bytes_written;
try
{
bytes_written = hdfsWrite(fs.get(), fout, start, safe_cast<int>(size));
}
catch (...)
{
write_settings.resource_link.accumulate(size); // We assume no resource was used in case of failure
throw;
}
rlock.unlock();
if (bytes_written < 0)
{
write_settings.resource_link.accumulate(size); // We assume no resource was used in case of failure
throw Exception(ErrorCodes::NETWORK_ERROR, "Fail to write HDFS file: {} {}", hdfs_uri, std::string(hdfsGetLastError()));
}
write_settings.resource_link.adjust(size, bytes_written);
if (write_settings.remote_throttler)
write_settings.remote_throttler->add(bytes_written, ProfileEvents::RemoteWriteThrottlerBytes, ProfileEvents::RemoteWriteThrottlerSleepMicroseconds);