Add async iteration to object storage

This commit is contained in:
alesapin 2023-06-04 16:03:44 +02:00
parent 4fd674a3b2
commit 0f4dd26ceb
11 changed files with 361 additions and 0 deletions

View File

@ -131,6 +131,10 @@
M(DistributedInsertThreadsActive, "Number of threads used for INSERT into Distributed running a task.") \
M(StorageS3Threads, "Number of threads in the StorageS3 thread pool.") \
M(StorageS3ThreadsActive, "Number of threads in the StorageS3 thread pool running a task.") \
M(ObjectStorageS3Threads, "Number of threads in the S3ObjectStorage thread pool.") \
M(ObjectStorageS3ThreadsActive, "Number of threads in the S3ObjectStorage thread pool running a task.") \
M(ObjectStorageAzureThreads, "Number of threads in the AzureObjectStorage thread pool.") \
M(ObjectStorageAzureThreadsActive, "Number of threads in the AzureObjectStorage thread pool running a task.") \
M(MergeTreePartsLoaderThreads, "Number of threads in the MergeTree parts loader thread pool.") \
M(MergeTreePartsLoaderThreadsActive, "Number of threads in the MergeTree parts loader thread pool running a task.") \
M(MergeTreePartsCleanerThreads, "Number of threads in the MergeTree parts cleaner thread pool.") \

View File

@ -11,9 +11,16 @@
#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
#include <Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageAuth.h>
#include <Disks/ObjectStorages/ObjectStorageIteratorAsync.h>
#include <Interpreters/Context.h>
#include <Common/logger_useful.h>
namespace CurrentMetrics
{
extern const Metric ObjectStorageAzureThreads;
extern const Metric ObjectStorageAzureThreadsActive;
}
namespace DB
{
@ -26,6 +33,60 @@ namespace ErrorCodes
}
namespace
{
class AzureIteratorAsync final : public IObjectStorageIteratorAsync
{
public:
AzureIteratorAsync(
const std::string & path_prefix,
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> client_,
size_t max_list_size)
: IObjectStorageIteratorAsync(
CurrentMetrics::ObjectStorageAzureThreads,
CurrentMetrics::ObjectStorageAzureThreadsActive,
"ListObjectAzure")
, client(client_)
{
options.Prefix = path_prefix;
options.PageSizeHint = static_cast<int>(max_list_size);
}
private:
bool getBatchAndCheckNext(RelativePathsWithMetadata & batch) override
{
auto outcome = client->ListBlobs(options);
auto blob_list_response = client->ListBlobs(options);
auto blobs_list = blob_list_response.Blobs;
for (const auto & blob : blobs_list)
{
batch.emplace_back(
blob.Name,
ObjectMetadata{
static_cast<uint64_t>(blob.BlobSize),
Poco::Timestamp::fromEpochTime(
std::chrono::duration_cast<std::chrono::seconds>(
blob.Details.LastModified.time_since_epoch()).count()),
{}});
}
options.ContinuationToken = blob_list_response.NextPageToken;
if (blob_list_response.HasPage())
return true;
return false;
}
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> client;
Azure::Storage::Blobs::ListBlobsOptions options;
};
}
AzureObjectStorage::AzureObjectStorage(
const String & name_,
AzureClientPtr && client_,
@ -67,6 +128,14 @@ bool AzureObjectStorage::exists(const StoredObject & object) const
return false;
}
ObjectStorageIteratorPtr AzureObjectStorage::iterate(const std::string & path_prefix) const
{
auto settings_ptr = settings.get();
auto client_ptr = client.get();
return std::make_shared<AzureIteratorAsync>(path_prefix, client_ptr, settings_ptr->list_object_keys_size);
}
void AzureObjectStorage::listObjects(const std::string & path, RelativePathsWithMetadata & children, int max_keys) const
{
auto client_ptr = client.get();

View File

@ -60,6 +60,8 @@ public:
void listObjects(const std::string & path, RelativePathsWithMetadata & children, int max_keys) const override;
ObjectStorageIteratorPtr iterate(const std::string & path_prefix) const override;
DataSourceDescription getDataSourceDescription() const override { return data_source_description; }
std::string getName() const override { return "AzureObjectStorage"; }

View File

@ -5,6 +5,7 @@
#include <IO/copyData.h>
#include <IO/ReadBufferFromFileBase.h>
#include <Interpreters/Context.h>
#include <Disks/ObjectStorages/ObjectStorageIterator.h>
namespace DB
@ -29,6 +30,14 @@ void IObjectStorage::listObjects(const std::string &, RelativePathsWithMetadata
}
ObjectStorageIteratorPtr IObjectStorage::iterate(const std::string & path_prefix) const
{
RelativePathsWithMetadata files;
listObjects(path_prefix, files, 0);
return std::make_shared<ObjectStorageIteratorFromList>(std::move(files));
}
std::optional<ObjectMetadata> IObjectStorage::tryGetObjectMetadata(const std::string & path) const
{
try

View File

@ -20,6 +20,9 @@
#include <Disks/WriteMode.h>
#include <Interpreters/Context_fwd.h>
#include <Core/Types.h>
#include <Disks/DirectoryIterator.h>
#include <Common/ThreadPool.h>
#include <Interpreters/threadPoolCallbackRunner.h>
namespace DB
@ -51,6 +54,8 @@ struct RelativePathWithMetadata
using RelativePathsWithMetadata = std::vector<RelativePathWithMetadata>;
class IObjectStorageIterator;
using ObjectStorageIteratorPtr = std::shared_ptr<IObjectStorageIterator>;
/// Base class for all object storages which implement some subset of ordinary filesystem operations.
///
@ -75,6 +80,8 @@ public:
virtual void listObjects(const std::string & path, RelativePathsWithMetadata & children, int max_keys) const;
virtual ObjectStorageIteratorPtr iterate(const std::string & path_prefix) const;
/// Get object metadata if supported. It should be possible to receive
/// at least size of object
virtual std::optional<ObjectMetadata> tryGetObjectMetadata(const std::string & path) const;

View File

@ -0,0 +1,20 @@
#include <Disks/ObjectStorages/ObjectStorageIterator.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
RelativePathWithMetadata ObjectStorageIteratorFromList::current() const
{
if (!isValid())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to access invalid iterator");
return *batch_iterator;
}
}

View File

@ -0,0 +1,53 @@
#pragma once
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <memory>
namespace DB
{
class IObjectStorageIterator
{
public:
virtual void next() = 0;
virtual bool isValid() const = 0;
virtual RelativePathWithMetadata current() const = 0;
virtual size_t getAccumulatedSize() const = 0;
virtual ~IObjectStorageIterator() = default;
};
using ObjectStorageIteratorPtr = std::shared_ptr<IObjectStorageIterator>;
class ObjectStorageIteratorFromList : public IObjectStorageIterator
{
public:
explicit ObjectStorageIteratorFromList(RelativePathsWithMetadata && batch_)
: batch(std::move(batch_))
, batch_iterator(batch.begin())
{
}
void next() override
{
if (isValid())
++batch_iterator;
}
bool isValid() const override
{
return batch_iterator != batch.end();
}
RelativePathWithMetadata current() const override;
size_t getAccumulatedSize() const override
{
return batch.size();
}
private:
RelativePathsWithMetadata batch;
RelativePathsWithMetadata::iterator batch_iterator;
};
}

View File

@ -0,0 +1,64 @@
#include <Disks/ObjectStorages/ObjectStorageIteratorAsync.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
void IObjectStorageIteratorAsync::next()
{
std::lock_guard lock(mutex);
if (current_batch_iterator != current_batch.end())
{
++current_batch_iterator;
}
else if (!is_finished)
{
if (outcome_future.valid())
{
BatchAndHasNext next_batch = outcome_future.get();
current_batch = std::move(next_batch.batch);
accumulated_size.fetch_add(current_batch.size(), std::memory_order_relaxed);
current_batch_iterator = current_batch.begin();
if (next_batch.has_next)
outcome_future = scheduleBatch();
else
is_finished = true;
}
}
}
std::future<IObjectStorageIteratorAsync::BatchAndHasNext> IObjectStorageIteratorAsync::scheduleBatch()
{
return list_objects_scheduler([this]
{
BatchAndHasNext result;
result.has_next = getBatchAndCheckNext(result.batch);
return result;
}, Priority{});
}
bool IObjectStorageIteratorAsync::isValid() const
{
return current_batch_iterator != current_batch.end();
}
RelativePathWithMetadata IObjectStorageIteratorAsync::current() const
{
if (!isValid())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to access invalid iterator");
return *current_batch_iterator;
}
size_t IObjectStorageIteratorAsync::getAccumulatedSize() const
{
return accumulated_size.load(std::memory_order_relaxed);
}
}

View File

@ -0,0 +1,58 @@
#pragma once
#include <Disks/ObjectStorages/ObjectStorageIterator.h>
#include <Common/ThreadPool.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <mutex>
#include <Common/CurrentMetrics.h>
namespace DB
{
class IObjectStorageIteratorAsync : public IObjectStorageIterator
{
public:
IObjectStorageIteratorAsync(
CurrentMetrics::Metric threads_metric,
CurrentMetrics::Metric threads_active_metric,
const std::string & thread_name)
: list_objects_pool(threads_metric, threads_active_metric, 1)
, list_objects_scheduler(threadPoolCallbackRunner<BatchAndHasNext>(list_objects_pool, thread_name))
{
}
void next() override;
bool isValid() const override;
RelativePathWithMetadata current() const override;
size_t getAccumulatedSize() const override;
~IObjectStorageIteratorAsync() override
{
list_objects_pool.wait();
}
protected:
virtual bool getBatchAndCheckNext(RelativePathsWithMetadata & batch) = 0;
struct BatchAndHasNext
{
RelativePathsWithMetadata batch;
bool has_next;
};
std::future<BatchAndHasNext> scheduleBatch();
bool is_finished{false};
std::mutex mutex;
ThreadPool list_objects_pool;
ThreadPoolCallbackRunner<BatchAndHasNext> list_objects_scheduler;
std::future<BatchAndHasNext> outcome_future;
RelativePathsWithMetadata current_batch;
RelativePathsWithMetadata::iterator current_batch_iterator;
std::atomic<size_t> accumulated_size = 0;
};
}

View File

@ -3,6 +3,7 @@
#if USE_AWS_S3
#include <IO/S3Common.h>
#include <Disks/ObjectStorages/ObjectStorageIteratorAsync.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/ObjectStorages/DiskObjectStorageCommon.h>
@ -33,6 +34,13 @@ namespace ProfileEvents
extern const Event DiskS3ListObjects;
}
namespace CurrentMetrics
{
extern const Metric ObjectStorageS3Threads;
extern const Metric ObjectStorageS3ThreadsActive;
}
namespace DB
{
@ -84,6 +92,62 @@ void logIfError(const Aws::Utils::Outcome<Result, Error> & response, std::functi
}
namespace
{
class S3IteratorAsync final : public IObjectStorageIteratorAsync
{
public:
S3IteratorAsync(
const std::string & bucket,
const std::string & path_prefix,
std::shared_ptr<const S3::Client> client_,
size_t max_list_size)
: IObjectStorageIteratorAsync(
CurrentMetrics::ObjectStorageS3Threads,
CurrentMetrics::ObjectStorageS3ThreadsActive,
"ListObjectS3")
, client(client_)
{
request.SetBucket(bucket);
request.SetPrefix(path_prefix);
request.SetMaxKeys(static_cast<int>(max_list_size));
}
private:
bool getBatchAndCheckNext(RelativePathsWithMetadata & batch) override
{
ProfileEvents::increment(ProfileEvents::S3ListObjects);
bool result = false;
auto outcome = client->ListObjectsV2(request);
/// Outcome failure will be handled on the caller side.
if (outcome.IsSuccess())
{
auto objects = outcome.GetResult().GetContents();
result = !objects.empty();
for (const auto & object : objects)
batch.emplace_back(object.GetKey(), ObjectMetadata{static_cast<uint64_t>(object.GetSize()), Poco::Timestamp::fromEpochTime(object.GetLastModified().Seconds()), {}});
if (result)
request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
return result;
}
throw Exception(ErrorCodes::S3_ERROR, "Could not list objects in bucket {} with prefix {}, S3 exception: {}, message: {}",
quoteString(request.GetBucket()), quoteString(request.GetPrefix()),
backQuote(outcome.GetError().GetExceptionName()), quoteString(outcome.GetError().GetMessage()));
}
std::shared_ptr<const S3::Client> client;
S3::ListObjectsV2Request request;
};
}
bool S3ObjectStorage::exists(const StoredObject & object) const
{
auto settings_ptr = s3_settings.get();
@ -183,6 +247,15 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
disk_write_settings);
}
ObjectStorageIteratorPtr S3ObjectStorage::iterate(const std::string & path_prefix) const
{
auto settings_ptr = s3_settings.get();
auto client_ptr = client.get();
return std::make_shared<S3IteratorAsync>(bucket, path_prefix, client_ptr, settings_ptr->list_object_keys_size);
}
void S3ObjectStorage::listObjects(const std::string & path, RelativePathsWithMetadata & children, int max_keys) const
{
auto settings_ptr = s3_settings.get();

View File

@ -102,6 +102,8 @@ public:
void listObjects(const std::string & path, RelativePathsWithMetadata & children, int max_keys) const override;
ObjectStorageIteratorPtr iterate(const std::string & path_prefix) const override;
/// Uses `DeleteObjectRequest`.
void removeObject(const StoredObject & object) override;