Define S3 client with bucket and endpoint resolution (#45783)

* Update aws

* Define S3 client with bucket and endpoint resolution

* Add defines for ErrorCodes

* Use S3Client everywhere

* Remove unused errorcode

* Add DROP S3 CLIENT CACHE query

* Add a comment

* Fix style

* Update aws

* Update reference files

* Add missing include

* Fix unit test

* Remove unneeded declarations

* Correctly use RetryStrategy

* Rename S3Client to Client

* Fix retry count

* fix clang-tidy warnings
This commit is contained in:
Antonio Andelic 2023-02-03 14:30:52 +01:00 committed by GitHub
parent c34a09f215
commit d5117f2aa6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
36 changed files with 1115 additions and 380 deletions

2
contrib/aws vendored

@ -1 +1 @@
Subproject commit 4a12641211d4dbc8e2fdb2dd0f1eea0927db9252
Subproject commit 06a6610e6fb3385e22ad85014a67aa307825ffb1

View File

@ -146,6 +146,7 @@ enum class AccessType
M(SYSTEM_DROP_COMPILED_EXPRESSION_CACHE, "SYSTEM DROP COMPILED EXPRESSION, DROP COMPILED EXPRESSION CACHE, DROP COMPILED EXPRESSIONS", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_FILESYSTEM_CACHE, "SYSTEM DROP FILESYSTEM CACHE, DROP FILESYSTEM CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_SCHEMA_CACHE, "SYSTEM DROP SCHEMA CACHE, DROP SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_S3_CLIENT_CACHE, "SYSTEM DROP S3 CLIENT, DROP S3 CLIENT CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_CACHE, "DROP CACHE", GROUP, SYSTEM) \
M(SYSTEM_RELOAD_CONFIG, "RELOAD CONFIG", GLOBAL, SYSTEM_RELOAD) \
M(SYSTEM_RELOAD_USERS, "RELOAD USERS", GLOBAL, SYSTEM_RELOAD) \

View File

@ -9,13 +9,12 @@
#include <IO/WriteBufferFromS3.h>
#include <IO/HTTPHeaderEntries.h>
#include <IO/S3/copyS3File.h>
#include <IO/S3/Client.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <aws/core/auth/AWSCredentials.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/DeleteObjectRequest.h>
#include <aws/s3/model/DeleteObjectsRequest.h>
#include <aws/s3/model/ListObjectsRequest.h>
#include <filesystem>
@ -31,7 +30,7 @@ namespace ErrorCodes
namespace
{
std::shared_ptr<Aws::S3::S3Client>
std::shared_ptr<S3::Client>
makeS3Client(const S3::URI & s3_uri, const String & access_key_id, const String & secret_access_key, const ContextPtr & context)
{
auto settings = context->getStorageS3Settings().getSettings(s3_uri.uri.toString());
@ -71,9 +70,9 @@ namespace
context->getConfigRef().getBool("s3.use_insecure_imds_request", false)));
}
Aws::Vector<Aws::S3::Model::Object> listObjects(Aws::S3::S3Client & client, const S3::URI & s3_uri, const String & file_name)
Aws::Vector<Aws::S3::Model::Object> listObjects(S3::Client & client, const S3::URI & s3_uri, const String & file_name)
{
Aws::S3::Model::ListObjectsRequest request;
S3::ListObjectsRequest request;
request.SetBucket(s3_uri.bucket);
request.SetPrefix(fs::path{s3_uri.key} / file_name);
request.SetMaxKeys(1);
@ -228,7 +227,7 @@ std::unique_ptr<WriteBuffer> BackupWriterS3::writeFile(const String & file_name)
void BackupWriterS3::removeFile(const String & file_name)
{
Aws::S3::Model::DeleteObjectRequest request;
S3::DeleteObjectRequest request;
request.SetBucket(s3_uri.bucket);
request.SetKey(fs::path(s3_uri.key) / file_name);
auto outcome = client->DeleteObject(request);
@ -285,7 +284,7 @@ void BackupWriterS3::removeFilesBatch(const Strings & file_names)
Aws::S3::Model::Delete delkeys;
delkeys.SetObjects(current_chunk);
Aws::S3::Model::DeleteObjectsRequest request;
S3::DeleteObjectsRequest request;
request.SetBucket(s3_uri.bucket);
request.SetDelete(delkeys);

View File

@ -7,7 +7,6 @@
#include <IO/ReadSettings.h>
#include <IO/S3Common.h>
#include <Storages/StorageS3Settings.h>
#include <aws/s3/S3Client.h>
namespace DB
@ -27,7 +26,7 @@ public:
private:
S3::URI s3_uri;
std::shared_ptr<Aws::S3::S3Client> client;
std::shared_ptr<S3::Client> client;
ReadSettings read_settings;
S3Settings::RequestSettings request_settings;
};
@ -73,7 +72,7 @@ private:
void removeFilesBatch(const Strings & file_names);
S3::URI s3_uri;
std::shared_ptr<Aws::S3::S3Client> client;
std::shared_ptr<S3::Client> client;
ReadSettings read_settings;
S3Settings::RequestSettings request_settings;
Poco::Logger * log;

View File

@ -17,10 +17,7 @@
#include <Common/Macros.h>
#include <aws/core/auth/AWSCredentials.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/S3Errors.h>
#include <aws/s3/model/HeadObjectRequest.h>
#include <aws/s3/model/DeleteObjectRequest.h>
#include <filesystem>
@ -31,7 +28,7 @@ namespace DB
struct KeeperSnapshotManagerS3::S3Configuration
{
S3Configuration(S3::URI uri_, S3::AuthSettings auth_settings_, std::shared_ptr<const Aws::S3::S3Client> client_)
S3Configuration(S3::URI uri_, S3::AuthSettings auth_settings_, std::shared_ptr<const S3::Client> client_)
: uri(std::move(uri_))
, auth_settings(std::move(auth_settings_))
, client(std::move(client_))
@ -39,7 +36,7 @@ struct KeeperSnapshotManagerS3::S3Configuration
S3::URI uri;
S3::AuthSettings auth_settings;
std::shared_ptr<const Aws::S3::S3Client> client;
std::shared_ptr<const S3::Client> client;
};
KeeperSnapshotManagerS3::KeeperSnapshotManagerS3()
@ -202,7 +199,7 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const std::string & snapshot_pa
LOG_INFO(log, "Removing lock file");
try
{
Aws::S3::Model::DeleteObjectRequest delete_request;
S3::DeleteObjectRequest delete_request;
delete_request.SetBucket(s3_client->uri.bucket);
delete_request.SetKey(lock_file);
auto delete_outcome = s3_client->client->DeleteObject(delete_request);

View File

@ -21,10 +21,6 @@
#include <Interpreters/threadPoolCallbackRunner.h>
#include <Disks/ObjectStorages/S3/diskSettings.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#include <aws/s3/model/DeleteObjectRequest.h>
#include <aws/s3/model/DeleteObjectsRequest.h>
#include <Common/getRandomASCIIString.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/logger_useful.h>
@ -213,7 +209,7 @@ void S3ObjectStorage::findAllFiles(const std::string & path, RelativePathsWithSi
auto settings_ptr = s3_settings.get();
auto client_ptr = client.get();
Aws::S3::Model::ListObjectsV2Request request;
S3::ListObjectsV2Request request;
request.SetBucket(bucket);
request.SetPrefix(path);
if (max_keys)
@ -257,7 +253,7 @@ void S3ObjectStorage::getDirectoryContents(const std::string & path,
auto settings_ptr = s3_settings.get();
auto client_ptr = client.get();
Aws::S3::Model::ListObjectsV2Request request;
S3::ListObjectsV2Request request;
request.SetBucket(bucket);
/// NOTE: if you do "ls /foo" instead of "ls /foo/" over S3 with this API
/// it will return only "/foo" itself without any underlying nodes.
@ -304,7 +300,7 @@ void S3ObjectStorage::removeObjectImpl(const StoredObject & object, bool if_exis
ProfileEvents::increment(ProfileEvents::S3DeleteObjects);
ProfileEvents::increment(ProfileEvents::DiskS3DeleteObjects);
Aws::S3::Model::DeleteObjectRequest request;
S3::DeleteObjectRequest request;
request.SetBucket(bucket);
request.SetKey(object.absolute_path);
auto outcome = client_ptr->DeleteObject(request);
@ -352,7 +348,7 @@ void S3ObjectStorage::removeObjectsImpl(const StoredObjects & objects, bool if_e
ProfileEvents::increment(ProfileEvents::S3DeleteObjects);
ProfileEvents::increment(ProfileEvents::DiskS3DeleteObjects);
Aws::S3::Model::DeleteObjectsRequest request;
S3::DeleteObjectsRequest request;
request.SetBucket(bucket);
request.SetDelete(delkeys);
auto outcome = client_ptr->DeleteObjects(request);
@ -435,7 +431,7 @@ void S3ObjectStorage::setNewSettings(std::unique_ptr<S3ObjectStorageSettings> &&
s3_settings.set(std::move(s3_settings_));
}
void S3ObjectStorage::setNewClient(std::unique_ptr<Aws::S3::S3Client> && client_)
void S3ObjectStorage::setNewClient(std::unique_ptr<S3::Client> && client_)
{
client.set(std::move(client_));
}
@ -447,7 +443,7 @@ void S3ObjectStorage::shutdown()
/// If S3 request is failed and the method below is executed S3 client immediately returns the last failed S3 request outcome.
/// If S3 is healthy nothing wrong will be happened and S3 requests will be processed in a regular way without errors.
/// This should significantly speed up shutdown process if S3 is unhealthy.
const_cast<Aws::S3::S3Client &>(*client_ptr).DisableRequestProcessing();
const_cast<S3::Client &>(*client_ptr).DisableRequestProcessing();
}
void S3ObjectStorage::startup()
@ -455,7 +451,7 @@ void S3ObjectStorage::startup()
auto client_ptr = client.get();
/// Need to be enabled if it was disabled during shutdown() call.
const_cast<Aws::S3::S3Client &>(*client_ptr).EnableRequestProcessing();
const_cast<S3::Client &>(*client_ptr).EnableRequestProcessing();
}
void S3ObjectStorage::applyNewSettings(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context)

View File

@ -7,7 +7,6 @@
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Disks/ObjectStorages/S3/S3Capabilities.h>
#include <memory>
#include <aws/s3/S3Client.h>
#include <Storages/StorageS3Settings.h>
#include <Common/MultiVersion.h>
#include <Common/logger_useful.h>
@ -46,7 +45,7 @@ private:
S3ObjectStorage(
const char * logger_name,
std::unique_ptr<Aws::S3::S3Client> && client_,
std::unique_ptr<S3::Client> && client_,
std::unique_ptr<S3ObjectStorageSettings> && s3_settings_,
String version_id_,
const S3Capabilities & s3_capabilities_,
@ -68,7 +67,7 @@ private:
public:
template <class ...Args>
S3ObjectStorage(std::unique_ptr<Aws::S3::S3Client> && client_, Args && ...args)
explicit S3ObjectStorage(std::unique_ptr<S3::Client> && client_, Args && ...args)
: S3ObjectStorage("S3ObjectStorage", std::move(client_), std::forward<Args>(args)...)
{
}
@ -163,14 +162,14 @@ public:
private:
void setNewSettings(std::unique_ptr<S3ObjectStorageSettings> && s3_settings_);
void setNewClient(std::unique_ptr<Aws::S3::S3Client> && client_);
void setNewClient(std::unique_ptr<S3::Client> && client_);
void removeObjectImpl(const StoredObject & object, bool if_exists);
void removeObjectsImpl(const StoredObjects & objects, bool if_exists);
std::string bucket;
MultiVersion<Aws::S3::S3Client> client;
MultiVersion<S3::Client> client;
MultiVersion<S3ObjectStorageSettings> s3_settings;
S3Capabilities s3_capabilities;
@ -191,7 +190,7 @@ public:
std::string getName() const override { return "S3PlainObjectStorage"; }
template <class ...Args>
S3PlainObjectStorage(Args && ...args)
explicit S3PlainObjectStorage(Args && ...args)
: S3ObjectStorage("S3PlainObjectStorage", std::forward<Args>(args)...)
{
data_source_description.type = DataSourceType::S3_Plain;

View File

@ -107,7 +107,7 @@ std::shared_ptr<S3::ProxyConfiguration> getProxyConfiguration(const String & pre
}
std::unique_ptr<Aws::S3::S3Client> getClient(
std::unique_ptr<S3::Client> getClient(
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,

View File

@ -7,11 +7,13 @@
#include <Poco/Util/AbstractConfiguration.h>
#include <Interpreters/Context_fwd.h>
#include <IO/S3/Client.h>
namespace Aws
{
namespace S3
{
class S3Client;
class Client;
}
}
@ -22,7 +24,7 @@ struct S3ObjectStorageSettings;
std::unique_ptr<S3ObjectStorageSettings> getSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr context);
std::unique_ptr<Aws::S3::S3Client> getClient(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr context, const S3ObjectStorageSettings & settings);
std::unique_ptr<S3::Client> getClient(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr context, const S3ObjectStorageSettings & settings);
}

View File

@ -7,9 +7,7 @@
#include <IO/ReadBufferFromS3.h>
#include <IO/S3/getObjectInfo.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/GetObjectRequest.h>
#include <aws/s3/model/HeadObjectRequest.h>
#include <IO/S3/Requests.h>
#include <Common/Stopwatch.h>
#include <Common/Throttler.h>
@ -44,7 +42,7 @@ namespace ErrorCodes
ReadBufferFromS3::ReadBufferFromS3(
std::shared_ptr<const Aws::S3::S3Client> client_ptr_,
std::shared_ptr<const S3::Client> client_ptr_,
const String & bucket_,
const String & key_,
const String & version_id_,
@ -281,7 +279,7 @@ SeekableReadBuffer::Range ReadBufferFromS3::getRemainingReadRange() const
std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
{
Aws::S3::Model::GetObjectRequest req;
S3::GetObjectRequest req;
req.SetBucket(bucket);
req.SetKey(key);
if (!version_id.empty())

View File

@ -19,7 +19,7 @@
namespace Aws::S3
{
class S3Client;
class Client;
}
namespace DB
@ -30,7 +30,7 @@ namespace DB
class ReadBufferFromS3 : public ReadBufferFromFileBase
{
private:
std::shared_ptr<const Aws::S3::S3Client> client_ptr;
std::shared_ptr<const S3::Client> client_ptr;
String bucket;
String key;
String version_id;
@ -49,7 +49,7 @@ private:
public:
ReadBufferFromS3(
std::shared_ptr<const Aws::S3::S3Client> client_ptr_,
std::shared_ptr<const S3::Client> client_ptr_,
const String & bucket_,
const String & key_,
const String & version_id_,
@ -95,7 +95,7 @@ class ReadBufferS3Factory : public ParallelReadBuffer::ReadBufferFactory, public
{
public:
explicit ReadBufferS3Factory(
std::shared_ptr<const Aws::S3::S3Client> client_ptr_,
std::shared_ptr<const S3::Client> client_ptr_,
const String & bucket_,
const String & key_,
const String & version_id_,
@ -126,7 +126,7 @@ public:
String getFileName() const override { return bucket + "/" + key; }
private:
std::shared_ptr<const Aws::S3::S3Client> client_ptr;
std::shared_ptr<const S3::Client> client_ptr;
const String bucket;
const String key;
const String version_id;

400
src/IO/S3/Client.cpp Normal file
View File

@ -0,0 +1,400 @@
#include <IO/S3/Client.h>
#if USE_AWS_S3
#include <aws/core/client/DefaultRetryStrategy.h>
#include <aws/s3/model/HeadBucketRequest.h>
#include <aws/s3/model/GetObjectRequest.h>
#include <aws/s3/model/HeadObjectRequest.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#include <aws/core/client/AWSErrorMarshaller.h>
#include <aws/core/endpoint/EndpointParameter.h>
#include <IO/S3Common.h>
#include <IO/S3/Requests.h>
#include <Common/assert_cast.h>
#include <Common/logger_useful.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
namespace S3
{
Client::RetryStrategy::RetryStrategy(std::shared_ptr<Aws::Client::RetryStrategy> wrapped_strategy_)
: wrapped_strategy(std::move(wrapped_strategy_))
{
if (!wrapped_strategy)
wrapped_strategy = Aws::Client::InitRetryStrategy();
}
/// NOLINTNEXTLINE(google-runtime-int)
bool Client::RetryStrategy::ShouldRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors>& error, long attemptedRetries) const
{
if (error.GetResponseCode() == Aws::Http::HttpResponseCode::MOVED_PERMANENTLY)
return false;
return wrapped_strategy->ShouldRetry(error, attemptedRetries);
}
/// NOLINTNEXTLINE(google-runtime-int)
long Client::RetryStrategy::CalculateDelayBeforeNextRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors>& error, long attemptedRetries) const
{
return wrapped_strategy->CalculateDelayBeforeNextRetry(error, attemptedRetries);
}
/// NOLINTNEXTLINE(google-runtime-int)
long Client::RetryStrategy::GetMaxAttempts() const
{
return wrapped_strategy->GetMaxAttempts();
}
void Client::RetryStrategy::GetSendToken()
{
return wrapped_strategy->GetSendToken();
}
bool Client::RetryStrategy::HasSendToken()
{
return wrapped_strategy->HasSendToken();
}
void Client::RetryStrategy::RequestBookkeeping(const Aws::Client::HttpResponseOutcome& httpResponseOutcome)
{
return wrapped_strategy->RequestBookkeeping(httpResponseOutcome);
}
void Client::RetryStrategy::RequestBookkeeping(const Aws::Client::HttpResponseOutcome& httpResponseOutcome, const Aws::Client::AWSError<Aws::Client::CoreErrors>& lastError)
{
return wrapped_strategy->RequestBookkeeping(httpResponseOutcome, lastError);
}
bool Client::checkIfWrongRegionDefined(const std::string & bucket, const Aws::S3::S3Error & error, std::string & region) const
{
if (detect_region)
return false;
if (error.GetResponseCode() == Aws::Http::HttpResponseCode::BAD_REQUEST && error.GetExceptionName() == "AuthorizationHeaderMalformed")
{
region = GetErrorMarshaller()->ExtractRegion(error);
if (region.empty())
region = getRegionForBucket(bucket, /*force_detect*/ true);
assert(!explicit_region.empty());
if (region == explicit_region)
return false;
insertRegionOverride(bucket, region);
return true;
}
return false;
}
void Client::insertRegionOverride(const std::string & bucket, const std::string & region) const
{
std::lock_guard lock(cache->region_cache_mutex);
auto [it, inserted] = cache->region_for_bucket_cache.emplace(bucket, region);
if (inserted)
LOG_INFO(log, "Detected different region ('{}') for bucket {} than the one defined ('{}')", region, bucket, explicit_region);
}
Model::HeadObjectOutcome Client::HeadObject(const HeadObjectRequest & request) const
{
const auto & bucket = request.GetBucket();
if (auto region = getRegionForBucket(bucket); !region.empty())
{
if (!detect_region)
LOG_INFO(log, "Using region override {} for bucket {}", region, bucket);
request.overrideRegion(std::move(region));
}
if (auto uri = getURIForBucket(bucket); uri.has_value())
request.overrideURI(std::move(*uri));
auto result = Aws::S3::S3Client::HeadObject(request);
if (result.IsSuccess())
return result;
const auto & error = result.GetError();
std::string new_region;
if (checkIfWrongRegionDefined(bucket, error, new_region))
{
request.overrideRegion(new_region);
return HeadObject(request);
}
if (error.GetResponseCode() != Aws::Http::HttpResponseCode::MOVED_PERMANENTLY)
return result;
// maybe we detect a correct region
if (!detect_region)
{
if (auto region = GetErrorMarshaller()->ExtractRegion(error); !region.empty() && region != explicit_region)
{
request.overrideRegion(region);
insertRegionOverride(bucket, region);
}
}
auto bucket_uri = getURIForBucket(bucket);
if (!bucket_uri)
{
if (auto maybe_error = updateURIForBucketForHead(bucket); maybe_error.has_value())
return *maybe_error;
if (auto region = getRegionForBucket(bucket); !region.empty())
{
if (!detect_region)
LOG_INFO(log, "Using region override {} for bucket {}", region, bucket);
request.overrideRegion(std::move(region));
}
bucket_uri = getURIForBucket(bucket);
if (!bucket_uri)
{
LOG_ERROR(log, "Missing resolved URI for bucket {}, maybe the cache was cleaned", bucket);
return result;
}
}
const auto & current_uri_override = request.getURIOverride();
/// we already tried with this URI
if (current_uri_override && current_uri_override->uri == bucket_uri->uri)
{
LOG_INFO(log, "Getting redirected to the same invalid location {}", bucket_uri->uri.toString());
return result;
}
request.overrideURI(std::move(*bucket_uri));
return Aws::S3::S3Client::HeadObject(request);
}
Model::ListObjectsV2Outcome Client::ListObjectsV2(const ListObjectsV2Request & request) const
{
return doRequest(request, [this](const Model::ListObjectsV2Request & req) { return Aws::S3::S3Client::ListObjectsV2(req); });
}
Model::ListObjectsOutcome Client::ListObjects(const ListObjectsRequest & request) const
{
return doRequest(request, [this](const Model::ListObjectsRequest & req) { return Aws::S3::S3Client::ListObjects(req); });
}
Model::GetObjectOutcome Client::GetObject(const GetObjectRequest & request) const
{
return doRequest(request, [this](const Model::GetObjectRequest & req) { return Aws::S3::S3Client::GetObject(req); });
}
Model::AbortMultipartUploadOutcome Client::AbortMultipartUpload(const AbortMultipartUploadRequest & request) const
{
return doRequest(
request, [this](const Model::AbortMultipartUploadRequest & req) { return Aws::S3::S3Client::AbortMultipartUpload(req); });
}
Model::CreateMultipartUploadOutcome Client::CreateMultipartUpload(const CreateMultipartUploadRequest & request) const
{
return doRequest(
request, [this](const Model::CreateMultipartUploadRequest & req) { return Aws::S3::S3Client::CreateMultipartUpload(req); });
}
Model::CompleteMultipartUploadOutcome Client::CompleteMultipartUpload(const CompleteMultipartUploadRequest & request) const
{
return doRequest(
request, [this](const Model::CompleteMultipartUploadRequest & req) { return Aws::S3::S3Client::CompleteMultipartUpload(req); });
}
Model::CopyObjectOutcome Client::CopyObject(const CopyObjectRequest & request) const
{
return doRequest(request, [this](const Model::CopyObjectRequest & req) { return Aws::S3::S3Client::CopyObject(req); });
}
Model::PutObjectOutcome Client::PutObject(const PutObjectRequest & request) const
{
return doRequest(request, [this](const Model::PutObjectRequest & req) { return Aws::S3::S3Client::PutObject(req); });
}
Model::UploadPartOutcome Client::UploadPart(const UploadPartRequest & request) const
{
return doRequest(request, [this](const Model::UploadPartRequest & req) { return Aws::S3::S3Client::UploadPart(req); });
}
Model::UploadPartCopyOutcome Client::UploadPartCopy(const UploadPartCopyRequest & request) const
{
return doRequest(request, [this](const Model::UploadPartCopyRequest & req) { return Aws::S3::S3Client::UploadPartCopy(req); });
}
Model::DeleteObjectOutcome Client::DeleteObject(const DeleteObjectRequest & request) const
{
return doRequest(request, [this](const Model::DeleteObjectRequest & req) { return Aws::S3::S3Client::DeleteObject(req); });
}
Model::DeleteObjectsOutcome Client::DeleteObjects(const DeleteObjectsRequest & request) const
{
return doRequest(request, [this](const Model::DeleteObjectsRequest & req) { return Aws::S3::S3Client::DeleteObjects(req); });
}
std::string Client::getRegionForBucket(const std::string & bucket, bool force_detect) const
{
std::lock_guard lock(cache->region_cache_mutex);
if (auto it = cache->region_for_bucket_cache.find(bucket); it != cache->region_for_bucket_cache.end())
return it->second;
if (!force_detect && !detect_region)
return "";
LOG_INFO(log, "Resolving region for bucket {}", bucket);
Aws::S3::Model::HeadBucketRequest req;
req.SetBucket(bucket);
std::string region;
auto outcome = HeadBucket(req);
if (outcome.IsSuccess())
{
const auto & result = outcome.GetResult();
region = result.GetRegion();
}
else
{
static const std::string region_header = "x-amz-bucket-region";
const auto & headers = outcome.GetError().GetResponseHeaders();
if (auto it = headers.find(region_header); it != headers.end())
region = it->second;
}
if (region.empty())
{
LOG_INFO(log, "Failed resolving region for bucket {}", bucket);
return "";
}
LOG_INFO(log, "Found region {} for bucket {}", region, bucket);
auto [it, _] = cache->region_for_bucket_cache.emplace(bucket, std::move(region));
return it->second;
}
std::optional<S3::URI> Client::getURIFromError(const Aws::S3::S3Error & error) const
{
auto endpoint = GetErrorMarshaller()->ExtractEndpoint(error);
if (endpoint.empty())
return std::nullopt;
auto & s3_client = const_cast<Client &>(*this);
const auto * endpoint_provider = dynamic_cast<Aws::S3::Endpoint::S3DefaultEpProviderBase *>(s3_client.accessEndpointProvider().get());
auto resolved_endpoint = endpoint_provider->ResolveEndpoint({});
if (!resolved_endpoint.IsSuccess())
return std::nullopt;
auto uri = resolved_endpoint.GetResult().GetURI();
uri.SetAuthority(endpoint);
return S3::URI(uri.GetURIString());
}
// Do a list request because head requests don't have body in response
std::optional<Aws::S3::S3Error> Client::updateURIForBucketForHead(const std::string & bucket) const
{
ListObjectsV2Request req;
req.SetBucket(bucket);
req.SetMaxKeys(1);
auto result = ListObjectsV2(req);
if (result.IsSuccess())
return std::nullopt;
return result.GetError();
}
std::optional<S3::URI> Client::getURIForBucket(const std::string & bucket) const
{
std::lock_guard lock(cache->uri_cache_mutex);
if (auto it = cache->uri_for_bucket_cache.find(bucket); it != cache->uri_for_bucket_cache.end())
return it->second;
return std::nullopt;
}
void Client::updateURIForBucket(const std::string & bucket, S3::URI new_uri) const
{
std::lock_guard lock(cache->uri_cache_mutex);
if (auto it = cache->uri_for_bucket_cache.find(bucket); it != cache->uri_for_bucket_cache.end())
{
if (it->second.uri == new_uri.uri)
return;
LOG_INFO(log, "Updating URI for bucket {} to {}", bucket, new_uri.uri.toString());
it->second = std::move(new_uri);
return;
}
LOG_INFO(log, "Updating URI for bucket {} to {}", bucket, new_uri.uri.toString());
cache->uri_for_bucket_cache.emplace(bucket, std::move(new_uri));
}
void ClientCache::clearCache()
{
{
std::lock_guard lock(region_cache_mutex);
region_for_bucket_cache.clear();
}
{
std::lock_guard lock(uri_cache_mutex);
uri_for_bucket_cache.clear();
}
}
void ClientCacheRegistry::registerClient(const std::shared_ptr<ClientCache> & client_cache)
{
std::lock_guard lock(clients_mutex);
auto [it, inserted] = client_caches.emplace(client_cache.get(), client_cache);
if (!inserted)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Same S3 client registered twice");
}
void ClientCacheRegistry::unregisterClient(ClientCache * client)
{
std::lock_guard lock(clients_mutex);
auto erased = client_caches.erase(client);
if (erased == 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't unregister S3 client, either it was already unregistered or not registered at all");
}
void ClientCacheRegistry::clearCacheForAll()
{
std::lock_guard lock(clients_mutex);
for (auto it = client_caches.begin(); it != client_caches.end();)
{
if (auto locked_client = it->second.lock(); locked_client)
{
locked_client->clearCache();
++it;
}
else
{
LOG_INFO(&Poco::Logger::get("ClientCacheRegistry"), "Deleting leftover S3 client cache");
it = client_caches.erase(it);
}
}
}
}
}
#endif

309
src/IO/S3/Client.h Normal file
View File

@ -0,0 +1,309 @@
#pragma once
#include "config.h"
#if USE_AWS_S3
#include <Common/logger_useful.h>
#include <Common/assert_cast.h>
#include <base/scope_guard.h>
#include <IO/S3/URI.h>
#include <IO/S3/Requests.h>
#include <aws/core/client/DefaultRetryStrategy.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/S3ServiceClientModel.h>
#include <aws/core/client/AWSErrorMarshaller.h>
#include <aws/core/client/RetryStrategy.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int TOO_MANY_REDIRECTS;
}
namespace S3
{
namespace Model = Aws::S3::Model;
struct ClientCache
{
ClientCache() = default;
ClientCache(const ClientCache & other)
: region_for_bucket_cache(other.region_for_bucket_cache)
, uri_for_bucket_cache(other.uri_for_bucket_cache)
{}
ClientCache(ClientCache && other) = delete;
ClientCache & operator=(const ClientCache &) = delete;
ClientCache & operator=(ClientCache &&) = delete;
void clearCache();
std::mutex region_cache_mutex;
std::unordered_map<std::string, std::string> region_for_bucket_cache;
std::mutex uri_cache_mutex;
std::unordered_map<std::string, URI> uri_for_bucket_cache;
};
class ClientCacheRegistry
{
public:
static ClientCacheRegistry & instance()
{
static ClientCacheRegistry registry;
return registry;
}
void registerClient(const std::shared_ptr<ClientCache> & client_cache);
void unregisterClient(ClientCache * client);
void clearCacheForAll();
private:
ClientCacheRegistry() = default;
std::mutex clients_mutex;
std::unordered_map<ClientCache *, std::weak_ptr<ClientCache>> client_caches;
};
/// Client that improves the client from the AWS SDK
/// - inject region and URI into requests so they are rerouted to the correct destination if needed
/// - automatically detect endpoint and regions for each bucket and cache them
///
/// For this client to work correctly both Client::RetryStrategy and Requests defined in <IO/S3/Requests.h> should be used.
class Client : public Aws::S3::S3Client
{
public:
template <typename... Args>
static std::unique_ptr<Client> create(Args &&... args)
{
(verifyArgument(args), ...);
return std::unique_ptr<Client>(new Client(std::forward<Args>(args)...));
}
Client & operator=(const Client &) = delete;
Client(Client && other) = delete;
Client & operator=(Client &&) = delete;
~Client() override
{
try
{
ClientCacheRegistry::instance().unregisterClient(cache.get());
}
catch (...)
{
tryLogCurrentException(log);
throw;
}
}
/// Decorator for RetryStrategy needed for this client to work correctly
class RetryStrategy : public Aws::Client::RetryStrategy
{
public:
explicit RetryStrategy(std::shared_ptr<Aws::Client::RetryStrategy> wrapped_strategy_);
/// NOLINTNEXTLINE(google-runtime-int)
bool ShouldRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors>& error, long attemptedRetries) const override;
/// NOLINTNEXTLINE(google-runtime-int)
long CalculateDelayBeforeNextRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors>& error, long attemptedRetries) const override;
/// NOLINTNEXTLINE(google-runtime-int)
long GetMaxAttempts() const override;
void GetSendToken() override;
bool HasSendToken() override;
void RequestBookkeeping(const Aws::Client::HttpResponseOutcome& httpResponseOutcome) override;
void RequestBookkeeping(const Aws::Client::HttpResponseOutcome& httpResponseOutcome, const Aws::Client::AWSError<Aws::Client::CoreErrors>& lastError) override;
private:
std::shared_ptr<Aws::Client::RetryStrategy> wrapped_strategy;
};
Model::HeadObjectOutcome HeadObject(const HeadObjectRequest & request) const;
Model::ListObjectsV2Outcome ListObjectsV2(const ListObjectsV2Request & request) const;
Model::ListObjectsOutcome ListObjects(const ListObjectsRequest & request) const;
Model::GetObjectOutcome GetObject(const GetObjectRequest & request) const;
Model::AbortMultipartUploadOutcome AbortMultipartUpload(const AbortMultipartUploadRequest & request) const;
Model::CreateMultipartUploadOutcome CreateMultipartUpload(const CreateMultipartUploadRequest & request) const;
Model::CompleteMultipartUploadOutcome CompleteMultipartUpload(const CompleteMultipartUploadRequest & request) const;
Model::UploadPartOutcome UploadPart(const UploadPartRequest & request) const;
Model::UploadPartCopyOutcome UploadPartCopy(const UploadPartCopyRequest & request) const;
Model::CopyObjectOutcome CopyObject(const CopyObjectRequest & request) const;
Model::PutObjectOutcome PutObject(const PutObjectRequest & request) const;
Model::DeleteObjectOutcome DeleteObject(const DeleteObjectRequest & request) const;
Model::DeleteObjectsOutcome DeleteObjects(const DeleteObjectsRequest & request) const;
private:
template <typename... Args>
explicit Client(size_t max_redirects_, Args &&... args)
: Aws::S3::S3Client(std::forward<Args>(args)...)
, max_redirects(max_redirects_)
, log(&Poco::Logger::get("S3Client"))
{
auto * endpoint_provider = dynamic_cast<Aws::S3::Endpoint::S3DefaultEpProviderBase *>(accessEndpointProvider().get());
endpoint_provider->GetBuiltInParameters().GetParameter("Region").GetString(explicit_region);
std::string endpoint;
endpoint_provider->GetBuiltInParameters().GetParameter("Endpoint").GetString(endpoint);
detect_region = explicit_region == Aws::Region::AWS_GLOBAL && endpoint.find(".amazonaws.com") != std::string::npos;
cache = std::make_shared<ClientCache>();
ClientCacheRegistry::instance().registerClient(cache);
}
Client(const Client & other)
: Aws::S3::S3Client(other)
, explicit_region(other.explicit_region)
, detect_region(other.detect_region)
, max_redirects(other.max_redirects)
, log(&Poco::Logger::get("S3Client"))
{
cache = std::make_shared<ClientCache>(*other.cache);
ClientCacheRegistry::instance().registerClient(cache);
}
/// Make regular functions private
using Aws::S3::S3Client::HeadObject;
using Aws::S3::S3Client::ListObjectsV2;
using Aws::S3::S3Client::ListObjects;
using Aws::S3::S3Client::GetObject;
using Aws::S3::S3Client::AbortMultipartUpload;
using Aws::S3::S3Client::CreateMultipartUpload;
using Aws::S3::S3Client::CompleteMultipartUpload;
using Aws::S3::S3Client::UploadPart;
using Aws::S3::S3Client::UploadPartCopy;
using Aws::S3::S3Client::CopyObject;
using Aws::S3::S3Client::PutObject;
using Aws::S3::S3Client::DeleteObject;
using Aws::S3::S3Client::DeleteObjects;
template <typename RequestType, typename RequestFn>
std::invoke_result_t<RequestFn, RequestType>
doRequest(const RequestType & request, RequestFn request_fn) const
{
const auto & bucket = request.GetBucket();
if (auto region = getRegionForBucket(bucket); !region.empty())
{
if (!detect_region)
LOG_INFO(log, "Using region override {} for bucket {}", region, bucket);
request.overrideRegion(std::move(region));
}
if (auto uri = getURIForBucket(bucket); uri.has_value())
request.overrideURI(std::move(*uri));
bool found_new_endpoint = false;
// if we found correct endpoint after 301 responses, update the cache for future requests
SCOPE_EXIT(
if (found_new_endpoint)
{
auto uri_override = request.getURIOverride();
assert(uri_override.has_value());
updateURIForBucket(bucket, std::move(*uri_override));
}
);
for (size_t attempt = 0; attempt <= max_redirects; ++attempt)
{
auto result = request_fn(request);
if (result.IsSuccess())
return result;
const auto & error = result.GetError();
std::string new_region;
if (checkIfWrongRegionDefined(bucket, error, new_region))
{
request.overrideRegion(new_region);
continue;
}
if (error.GetResponseCode() != Aws::Http::HttpResponseCode::MOVED_PERMANENTLY)
return result;
// maybe we detect a correct region
if (!detect_region)
{
if (auto region = GetErrorMarshaller()->ExtractRegion(error); !region.empty() && region != explicit_region)
{
request.overrideRegion(region);
insertRegionOverride(bucket, region);
}
}
// we possibly got new location, need to try with that one
auto new_uri = getURIFromError(error);
if (!new_uri)
return result;
const auto & current_uri_override = request.getURIOverride();
/// we already tried with this URI
if (current_uri_override && current_uri_override->uri == new_uri->uri)
{
LOG_INFO(log, "Getting redirected to the same invalid location {}", new_uri->uri.toString());
return result;
}
found_new_endpoint = true;
request.overrideURI(*new_uri);
}
throw Exception(ErrorCodes::TOO_MANY_REDIRECTS, "Too many redirects");
}
void updateURIForBucket(const std::string & bucket, S3::URI new_uri) const;
std::optional<S3::URI> getURIFromError(const Aws::S3::S3Error & error) const;
std::optional<Aws::S3::S3Error> updateURIForBucketForHead(const std::string & bucket) const;
std::string getRegionForBucket(const std::string & bucket, bool force_detect = false) const;
std::optional<S3::URI> getURIForBucket(const std::string & bucket) const;
bool checkIfWrongRegionDefined(const std::string & bucket, const Aws::S3::S3Error & error, std::string & region) const;
void insertRegionOverride(const std::string & bucket, const std::string & region) const;
template <typename T>
static void verifyArgument(const T & /*arg*/)
{}
template <std::derived_from<Aws::Client::ClientConfiguration> T>
static void verifyArgument(const T & client_config)
{
if (!client_config.retryStrategy)
throw Exception(ErrorCodes::LOGICAL_ERROR, "The S3 client can only be used with Client::RetryStrategy, define it in the client configuration");
assert_cast<const RetryStrategy &>(*client_config.retryStrategy);
}
std::string explicit_region;
mutable bool detect_region = true;
mutable std::shared_ptr<ClientCache> cache;
const size_t max_redirects;
Poco::Logger * log;
};
}
}
#endif

87
src/IO/S3/Requests.h Normal file
View File

@ -0,0 +1,87 @@
#pragma once
#include "config.h"
#if USE_AWS_S3
#include <IO/S3/URI.h>
#include <aws/core/endpoint/EndpointParameter.h>
#include <aws/s3/model/HeadObjectRequest.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#include <aws/s3/model/ListObjectsRequest.h>
#include <aws/s3/model/GetObjectRequest.h>
#include <aws/s3/model/AbortMultipartUploadRequest.h>
#include <aws/s3/model/CreateMultipartUploadRequest.h>
#include <aws/s3/model/CompleteMultipartUploadRequest.h>
#include <aws/s3/model/CopyObjectRequest.h>
#include <aws/s3/model/PutObjectRequest.h>
#include <aws/s3/model/UploadPartRequest.h>
#include <aws/s3/model/UploadPartCopyRequest.h>
#include <aws/s3/model/DeleteObjectRequest.h>
#include <aws/s3/model/DeleteObjectsRequest.h>
namespace DB::S3
{
namespace Model = Aws::S3::Model;
template <typename BaseRequest>
class ExtendedRequest : public BaseRequest
{
public:
Aws::Endpoint::EndpointParameters GetEndpointContextParams() const override
{
auto params = BaseRequest::GetEndpointContextParams();
if (!region_override.empty())
params.emplace_back("Region", region_override);
if (uri_override.has_value())
{
static const Aws::String AWS_S3_FORCE_PATH_STYLE = "ForcePathStyle";
params.emplace_back(AWS_S3_FORCE_PATH_STYLE, !uri_override->is_virtual_hosted_style);
params.emplace_back("Endpoint", uri_override->endpoint);
}
return params;
}
void overrideRegion(std::string region) const
{
region_override = std::move(region);
}
void overrideURI(S3::URI uri) const
{
uri_override = std::move(uri);
}
const auto & getURIOverride() const
{
return uri_override;
}
protected:
mutable std::string region_override;
mutable std::optional<S3::URI> uri_override;
};
using HeadObjectRequest = ExtendedRequest<Model::HeadObjectRequest>;
using ListObjectsV2Request = ExtendedRequest<Model::ListObjectsV2Request>;
using ListObjectsRequest = ExtendedRequest<Model::ListObjectsRequest>;
using GetObjectRequest = ExtendedRequest<Model::GetObjectRequest>;
using CreateMultipartUploadRequest = ExtendedRequest<Model::CreateMultipartUploadRequest>;
using CompleteMultipartUploadRequest = ExtendedRequest<Model::CompleteMultipartUploadRequest>;
using AbortMultipartUploadRequest = ExtendedRequest<Model::AbortMultipartUploadRequest>;
using UploadPartRequest = ExtendedRequest<Model::UploadPartRequest>;
using UploadPartCopyRequest = ExtendedRequest<Model::UploadPartCopyRequest>;
using PutObjectRequest = ExtendedRequest<Model::PutObjectRequest>;
using CopyObjectRequest = ExtendedRequest<Model::CopyObjectRequest>;
using DeleteObjectRequest = ExtendedRequest<Model::DeleteObjectRequest>;
using DeleteObjectsRequest = ExtendedRequest<Model::DeleteObjectsRequest>;
}
#endif

119
src/IO/S3/URI.cpp Normal file
View File

@ -0,0 +1,119 @@
#include <IO/S3/URI.h>
#if USE_AWS_S3
#include <Common/Exception.h>
#include <Common/quoteString.h>
#include <boost/algorithm/string/case_conv.hpp>
#include <re2/re2.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
namespace S3
{
URI::URI(const std::string & uri_)
{
/// Case when bucket name represented in domain name of S3 URL.
/// E.g. (https://bucket-name.s3.Region.amazonaws.com/key)
/// https://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html#virtual-hosted-style-access
static const RE2 virtual_hosted_style_pattern(R"((.+)\.(s3|cos|obs|oss)([.\-][a-z0-9\-.:]+))");
/// Case when bucket name and key represented in path of S3 URL.
/// E.g. (https://s3.Region.amazonaws.com/bucket-name/key)
/// https://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html#path-style-access
static const RE2 path_style_pattern("^/([^/]*)/(.*)");
static constexpr auto S3 = "S3";
static constexpr auto COSN = "COSN";
static constexpr auto COS = "COS";
static constexpr auto OBS = "OBS";
static constexpr auto OSS = "OSS";
uri = Poco::URI(uri_);
storage_name = S3;
if (uri.getHost().empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Host is empty in S3 URI.");
/// Extract object version ID from query string.
bool has_version_id = false;
for (const auto & [query_key, query_value] : uri.getQueryParameters())
if (query_key == "versionId")
{
version_id = query_value;
has_version_id = true;
}
/// Poco::URI will ignore '?' when parsing the path, but if there is a vestionId in the http parameter,
/// '?' can not be used as a wildcard, otherwise it will be ambiguous.
/// If no "vertionId" in the http parameter, '?' can be used as a wildcard.
/// It is necessary to encode '?' to avoid deletion during parsing path.
if (!has_version_id && uri_.find('?') != String::npos)
{
String uri_with_question_mark_encode;
Poco::URI::encode(uri_, "?", uri_with_question_mark_encode);
uri = Poco::URI(uri_with_question_mark_encode);
}
String name;
String endpoint_authority_from_uri;
if (re2::RE2::FullMatch(uri.getAuthority(), virtual_hosted_style_pattern, &bucket, &name, &endpoint_authority_from_uri))
{
is_virtual_hosted_style = true;
endpoint = uri.getScheme() + "://" + name + endpoint_authority_from_uri;
validateBucket(bucket, uri);
if (!uri.getPath().empty())
{
/// Remove leading '/' from path to extract key.
key = uri.getPath().substr(1);
}
boost::to_upper(name);
if (name != S3 && name != COS && name != OBS && name != OSS)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Object storage system name is unrecognized in virtual hosted style S3 URI: {}",
quoteString(name));
if (name == S3)
storage_name = name;
else if (name == OBS)
storage_name = OBS;
else if (name == OSS)
storage_name = OSS;
else
storage_name = COSN;
}
else if (re2::RE2::PartialMatch(uri.getPath(), path_style_pattern, &bucket, &key))
{
is_virtual_hosted_style = false;
endpoint = uri.getScheme() + "://" + uri.getAuthority();
validateBucket(bucket, uri);
}
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Bucket or key name are invalid in S3 URI.");
}
void URI::validateBucket(const String & bucket, const Poco::URI & uri)
{
/// S3 specification requires at least 3 and at most 63 characters in bucket name.
/// https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-s3-bucket-naming-requirements.html
if (bucket.length() < 3 || bucket.length() > 63)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Bucket name length is out of bounds in virtual hosted style S3 URI: {}{}",
quoteString(bucket), !uri.empty() ? " (" + uri.toString() + ")" : "");
}
}
}
#endif

40
src/IO/S3/URI.h Normal file
View File

@ -0,0 +1,40 @@
#pragma once
#include <string>
#include "config.h"
#if USE_AWS_S3
#include <Poco/URI.h>
namespace DB::S3
{
/**
* Represents S3 URI.
*
* The following patterns are allowed:
* s3://bucket/key
* http(s)://endpoint/bucket/key
*/
struct URI
{
Poco::URI uri;
// Custom endpoint if URI scheme is not S3.
std::string endpoint;
std::string bucket;
std::string key;
std::string version_id;
std::string storage_name;
bool is_virtual_hosted_style;
explicit URI(const std::string & uri_);
static void validateBucket(const std::string & bucket, const Poco::URI & uri);
};
}
#endif

View File

@ -9,15 +9,7 @@
#include <IO/SeekableReadBuffer.h>
#include <IO/StdStreamFromReadBuffer.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/AbortMultipartUploadRequest.h>
#include <aws/s3/model/CompleteMultipartUploadRequest.h>
#include <aws/s3/model/CopyObjectRequest.h>
#include <aws/s3/model/CreateMultipartUploadRequest.h>
#include <aws/s3/model/PutObjectRequest.h>
#include <aws/s3/model/UploadPartCopyRequest.h>
#include <aws/s3/model/UploadPartRequest.h>
#include <IO/S3/Requests.h>
namespace ProfileEvents
{
@ -54,7 +46,7 @@ namespace
{
public:
UploadHelper(
const std::shared_ptr<const Aws::S3::S3Client> & client_ptr_,
const std::shared_ptr<const S3::Client> & client_ptr_,
const String & dest_bucket_,
const String & dest_key_,
const S3Settings::RequestSettings & request_settings_,
@ -77,7 +69,7 @@ namespace
virtual ~UploadHelper() = default;
protected:
std::shared_ptr<const Aws::S3::S3Client> client_ptr;
std::shared_ptr<const S3::Client> client_ptr;
const String & dest_bucket;
const String & dest_key;
const S3Settings::RequestSettings & request_settings;
@ -108,7 +100,7 @@ namespace
void createMultipartUpload()
{
Aws::S3::Model::CreateMultipartUploadRequest request;
S3::CreateMultipartUploadRequest request;
request.SetBucket(dest_bucket);
request.SetKey(dest_key);
@ -147,7 +139,7 @@ namespace
if (part_tags.empty())
throw Exception(ErrorCodes::S3_ERROR, "Failed to complete multipart upload. No parts have uploaded");
Aws::S3::Model::CompleteMultipartUploadRequest request;
S3::CompleteMultipartUploadRequest request;
request.SetBucket(dest_bucket);
request.SetKey(dest_key);
request.SetUploadId(multipart_upload_id);
@ -194,7 +186,7 @@ namespace
void abortMultipartUpload()
{
LOG_TRACE(log, "Aborting multipart upload. Bucket: {}, Key: {}, Upload_id: {}", dest_bucket, dest_key, multipart_upload_id);
Aws::S3::Model::AbortMultipartUploadRequest abort_request;
S3::AbortMultipartUploadRequest abort_request;
abort_request.SetBucket(dest_bucket);
abort_request.SetKey(dest_key);
abort_request.SetUploadId(multipart_upload_id);
@ -404,7 +396,7 @@ namespace
const std::function<std::unique_ptr<SeekableReadBuffer>()> & create_read_buffer_,
size_t offset_,
size_t size_,
const std::shared_ptr<const Aws::S3::S3Client> & client_ptr_,
const std::shared_ptr<const S3::Client> & client_ptr_,
const String & dest_bucket_,
const String & dest_key_,
const S3Settings::RequestSettings & request_settings_,
@ -436,12 +428,12 @@ namespace
void performSinglepartUpload()
{
Aws::S3::Model::PutObjectRequest request;
S3::PutObjectRequest request;
fillPutRequest(request);
processPutRequest(request);
}
void fillPutRequest(Aws::S3::Model::PutObjectRequest & request)
void fillPutRequest(S3::PutObjectRequest & request)
{
auto read_buffer = std::make_unique<LimitSeekableReadBuffer>(create_read_buffer(), offset, size);
@ -461,7 +453,7 @@ namespace
request.SetContentType("binary/octet-stream");
}
void processPutRequest(const Aws::S3::Model::PutObjectRequest & request)
void processPutRequest(const S3::PutObjectRequest & request)
{
size_t max_retries = std::max(request_settings.max_unexpected_write_error_retries, 1UL);
for (size_t retries = 1;; ++retries)
@ -526,7 +518,7 @@ namespace
auto read_buffer = std::make_unique<LimitSeekableReadBuffer>(create_read_buffer(), part_offset, part_size);
/// Setup request.
auto request = std::make_unique<Aws::S3::Model::UploadPartRequest>();
auto request = std::make_unique<S3::UploadPartRequest>();
request->SetBucket(dest_bucket);
request->SetKey(dest_key);
request->SetPartNumber(static_cast<int>(part_number));
@ -542,7 +534,7 @@ namespace
String processUploadPartRequest(Aws::AmazonWebServiceRequest & request) override
{
auto & req = typeid_cast<Aws::S3::Model::UploadPartRequest &>(request);
auto & req = typeid_cast<S3::UploadPartRequest &>(request);
ProfileEvents::increment(ProfileEvents::S3UploadPart);
if (for_disk_s3)
@ -564,7 +556,7 @@ namespace
{
public:
CopyFileHelper(
const std::shared_ptr<const Aws::S3::S3Client> & client_ptr_,
const std::shared_ptr<const S3::Client> & client_ptr_,
const String & src_bucket_,
const String & src_key_,
size_t src_offset_,
@ -602,12 +594,12 @@ namespace
void performSingleOperationCopy()
{
Aws::S3::Model::CopyObjectRequest request;
S3::CopyObjectRequest request;
fillCopyRequest(request);
processCopyRequest(request);
}
void fillCopyRequest(Aws::S3::Model::CopyObjectRequest & request)
void fillCopyRequest(S3::CopyObjectRequest & request)
{
request.SetCopySource(src_bucket + "/" + src_key);
request.SetBucket(dest_bucket);
@ -627,7 +619,7 @@ namespace
request.SetContentType("binary/octet-stream");
}
void processCopyRequest(const Aws::S3::Model::CopyObjectRequest & request)
void processCopyRequest(const S3::CopyObjectRequest & request)
{
size_t max_retries = std::max(request_settings.max_unexpected_write_error_retries, 1UL);
for (size_t retries = 1;; ++retries)
@ -689,7 +681,7 @@ namespace
std::unique_ptr<Aws::AmazonWebServiceRequest> fillUploadPartRequest(size_t part_number, size_t part_offset, size_t part_size) override
{
auto request = std::make_unique<Aws::S3::Model::UploadPartCopyRequest>();
auto request = std::make_unique<S3::UploadPartCopyRequest>();
/// Make a copy request to copy a part.
request->SetCopySource(src_bucket + "/" + src_key);
@ -704,7 +696,7 @@ namespace
String processUploadPartRequest(Aws::AmazonWebServiceRequest & request) override
{
auto & req = typeid_cast<Aws::S3::Model::UploadPartCopyRequest &>(request);
auto & req = typeid_cast<S3::UploadPartCopyRequest &>(request);
ProfileEvents::increment(ProfileEvents::S3UploadPartCopy);
if (for_disk_s3)
@ -727,7 +719,7 @@ void copyDataToS3File(
const std::function<std::unique_ptr<SeekableReadBuffer>()> & create_read_buffer,
size_t offset,
size_t size,
const std::shared_ptr<const Aws::S3::S3Client> & dest_s3_client,
const std::shared_ptr<const S3::Client> & dest_s3_client,
const String & dest_bucket,
const String & dest_key,
const S3Settings::RequestSettings & settings,
@ -741,7 +733,7 @@ void copyDataToS3File(
void copyS3File(
const std::shared_ptr<const Aws::S3::S3Client> & s3_client,
const std::shared_ptr<const S3::Client> & s3_client,
const String & src_bucket,
const String & src_key,
size_t src_offset,

View File

@ -7,7 +7,6 @@
#include <Storages/StorageS3Settings.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <base/types.h>
#include <aws/s3/S3Client.h>
#include <functional>
#include <memory>
@ -21,7 +20,7 @@ class SeekableReadBuffer;
/// however copyS3File() is faster and spends less network traffic and memory.
/// The parameters `src_offset` and `src_size` specify a part in the source to copy.
void copyS3File(
const std::shared_ptr<const Aws::S3::S3Client> & s3_client,
const std::shared_ptr<const S3::Client> & s3_client,
const String & src_bucket,
const String & src_key,
size_t src_offset,
@ -42,7 +41,7 @@ void copyDataToS3File(
const std::function<std::unique_ptr<SeekableReadBuffer>()> & create_read_buffer,
size_t offset,
size_t size,
const std::shared_ptr<const Aws::S3::S3Client> & dest_s3_client,
const std::shared_ptr<const S3::Client> & dest_s3_client,
const String & dest_bucket,
const String & dest_key,
const S3Settings::RequestSettings & settings,

View File

@ -1,11 +1,6 @@
#include <IO/S3/getObjectInfo.h>
#if USE_AWS_S3
#include <aws/s3/S3Client.h>
#include <aws/s3/model/GetObjectAttributesRequest.h>
#include <aws/s3/model/GetObjectRequest.h>
#include <aws/s3/model/HeadObjectRequest.h>
namespace ErrorCodes
{
@ -30,13 +25,13 @@ namespace DB::S3
namespace
{
Aws::S3::Model::HeadObjectOutcome headObject(
const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3)
const S3::Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3)
{
ProfileEvents::increment(ProfileEvents::S3HeadObject);
if (for_disk_s3)
ProfileEvents::increment(ProfileEvents::DiskS3HeadObject);
Aws::S3::Model::HeadObjectRequest req;
S3::HeadObjectRequest req;
req.SetBucket(bucket);
req.SetKey(key);
@ -46,93 +41,25 @@ namespace
return client.HeadObject(req);
}
Aws::S3::Model::GetObjectAttributesOutcome getObjectAttributes(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3)
{
ProfileEvents::increment(ProfileEvents::S3GetObjectAttributes);
if (for_disk_s3)
ProfileEvents::increment(ProfileEvents::DiskS3GetObjectAttributes);
Aws::S3::Model::GetObjectAttributesRequest req;
req.SetBucket(bucket);
req.SetKey(key);
if (!version_id.empty())
req.SetVersionId(version_id);
req.SetObjectAttributes({Aws::S3::Model::ObjectAttributes::ObjectSize});
return client.GetObjectAttributes(req);
}
Aws::S3::Model::GetObjectOutcome getObjectDummy(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3)
{
ProfileEvents::increment(ProfileEvents::S3GetObject);
if (for_disk_s3)
ProfileEvents::increment(ProfileEvents::DiskS3GetObject);
Aws::S3::Model::GetObjectRequest req;
req.SetBucket(bucket);
req.SetKey(key);
if (!version_id.empty())
req.SetVersionId(version_id);
/// Only the first byte will be read.
/// We don't need that first byte but the range should be set otherwise the entire object will be read.
req.SetRange("bytes=0-0");
return client.GetObject(req);
}
/// Performs a request to get the size and last modification time of an object.
/// The function performs either HeadObject or GetObjectAttributes request depending on the endpoint.
std::pair<std::optional<ObjectInfo>, Aws::S3::S3Error> tryGetObjectInfo(
const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id,
const S3Settings::RequestSettings & request_settings, bool with_metadata, bool for_disk_s3)
const S3::Client & client, const String & bucket, const String & key, const String & version_id,
const S3Settings::RequestSettings & /*request_settings*/, bool with_metadata, bool for_disk_s3)
{
if (request_settings.allow_head_object_request)
{
auto outcome = headObject(client, bucket, key, version_id, for_disk_s3);
if (!outcome.IsSuccess())
return {std::nullopt, outcome.GetError()};
auto outcome = headObject(client, bucket, key, version_id, for_disk_s3);
if (!outcome.IsSuccess())
return {std::nullopt, outcome.GetError()};
const auto & result = outcome.GetResult();
ObjectInfo object_info;
object_info.size = static_cast<size_t>(result.GetContentLength());
object_info.last_modification_time = result.GetLastModified().Millis() / 1000;
const auto & result = outcome.GetResult();
ObjectInfo object_info;
object_info.size = static_cast<size_t>(result.GetContentLength());
object_info.last_modification_time = result.GetLastModified().Millis() / 1000;
if (with_metadata)
object_info.metadata = result.GetMetadata();
if (with_metadata)
object_info.metadata = result.GetMetadata();
return {object_info, {}};
}
else
{
ObjectInfo object_info;
{
auto outcome = getObjectAttributes(client, bucket, key, version_id, for_disk_s3);
if (!outcome.IsSuccess())
return {std::nullopt, outcome.GetError()};
const auto & result = outcome.GetResult();
object_info.size = static_cast<size_t>(result.GetObjectSize());
object_info.last_modification_time = result.GetLastModified().Millis() / 1000;
}
if (with_metadata)
{
auto outcome = getObjectDummy(client, bucket, key, version_id, for_disk_s3);
if (!outcome.IsSuccess())
return {std::nullopt, outcome.GetError()};
const auto & result = outcome.GetResult();
object_info.metadata = result.GetMetadata();
}
return {object_info, {}};
}
return {object_info, {}};
}
}
@ -143,7 +70,7 @@ bool isNotFoundError(Aws::S3::S3Errors error)
}
ObjectInfo getObjectInfo(
const Aws::S3::S3Client & client,
const S3::Client & client,
const String & bucket,
const String & key,
const String & version_id,
@ -167,7 +94,7 @@ ObjectInfo getObjectInfo(
}
size_t getObjectSize(
const Aws::S3::S3Client & client,
const S3::Client & client,
const String & bucket,
const String & key,
const String & version_id,
@ -179,7 +106,7 @@ size_t getObjectSize(
}
bool objectExists(
const Aws::S3::S3Client & client,
const S3::Client & client,
const String & bucket,
const String & key,
const String & version_id,
@ -199,7 +126,7 @@ bool objectExists(
}
void checkObjectExists(
const Aws::S3::S3Client & client,
const S3::Client & client,
const String & bucket,
const String & key,
const String & version_id,

View File

@ -5,7 +5,7 @@
#if USE_AWS_S3
#include <Storages/StorageS3Settings.h>
#include <base/types.h>
#include <aws/s3/S3Client.h>
#include <IO/S3/Client.h>
namespace DB::S3
@ -20,7 +20,7 @@ struct ObjectInfo
};
ObjectInfo getObjectInfo(
const Aws::S3::S3Client & client,
const S3::Client & client,
const String & bucket,
const String & key,
const String & version_id = {},
@ -30,7 +30,7 @@ ObjectInfo getObjectInfo(
bool throw_on_error = true);
size_t getObjectSize(
const Aws::S3::S3Client & client,
const S3::Client & client,
const String & bucket,
const String & key,
const String & version_id = {},
@ -39,7 +39,7 @@ size_t getObjectSize(
bool throw_on_error = true);
bool objectExists(
const Aws::S3::S3Client & client,
const S3::Client & client,
const String & bucket,
const String & key,
const String & version_id = {},
@ -48,7 +48,7 @@ bool objectExists(
/// Throws an exception if a specified object doesn't exist. `description` is used as a part of the error message.
void checkObjectExists(
const Aws::S3::S3Client & client,
const S3::Client & client,
const String & bucket,
const String & key,
const String & version_id = {},

View File

@ -19,13 +19,13 @@
#include <aws/core/client/CoreErrors.h>
#include <aws/core/client/RetryStrategy.h>
#include <aws/core/http/URI.h>
#include <aws/s3/S3Client.h>
#include <Common/RemoteHostFilter.h>
#include <IO/ReadBufferFromS3.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadSettings.h>
#include <IO/S3Common.h>
#include <IO/S3/Client.h>
#include <IO/HTTPHeaderEntries.h>
#include <Storages/StorageS3Settings.h>
@ -102,7 +102,7 @@ TEST(IOTestAwsS3Client, AppendExtraSSECHeaders)
bool use_environment_credentials = false;
bool use_insecure_imds_request = false;
std::shared_ptr<Aws::S3::S3Client> client = DB::S3::ClientFactory::instance().create(
std::shared_ptr<DB::S3::Client> client = DB::S3::ClientFactory::instance().create(
client_configuration,
uri.is_virtual_hosted_style,
access_key_id,

View File

@ -16,7 +16,6 @@
# include <aws/core/auth/AWSCredentialsProvider.h>
# include <aws/core/auth/AWSCredentialsProviderChain.h>
# include <aws/core/auth/STSCredentialsProvider.h>
# include <aws/core/client/DefaultRetryStrategy.h>
# include <aws/core/client/SpecifiedRetryableErrorsRetryStrategy.h>
# include <aws/core/platform/Environment.h>
# include <aws/core/platform/OSVersionInfo.h>
@ -26,16 +25,12 @@
# include <aws/core/utils/HashingUtils.h>
# include <aws/core/utils/UUID.h>
# include <aws/core/http/HttpClientFactory.h>
# include <aws/s3/S3Client.h>
# include <aws/s3/model/GetObjectAttributesRequest.h>
# include <aws/s3/model/GetObjectRequest.h>
# include <aws/s3/model/HeadObjectRequest.h>
# include <IO/S3/PocoHTTPClientFactory.h>
# include <IO/S3/PocoHTTPClient.h>
# include <Poco/URI.h>
# include <re2/re2.h>
# include <boost/algorithm/string/case_conv.hpp>
# include <IO/S3/Client.h>
# include <IO/S3/URI.h>
# include <IO/S3/Requests.h>
# include <Common/logger_useful.h>
# include <fstream>
@ -712,7 +707,6 @@ namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int S3_ERROR;
}
@ -738,7 +732,7 @@ namespace S3
return ret;
}
std::unique_ptr<Aws::S3::S3Client> ClientFactory::create( // NOLINT
std::unique_ptr<S3::Client> ClientFactory::create( // NOLINT
const PocoHTTPClientConfiguration & cfg_,
bool is_virtual_hosted_style,
const String & access_key_id,
@ -753,7 +747,7 @@ namespace S3
if (!server_side_encryption_customer_key_base64.empty())
{
/// See S3Client::GeneratePresignedUrlWithSSEC().
/// See Client::GeneratePresignedUrlWithSSEC().
headers.push_back({Aws::S3::SSEHeaders::SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM,
Aws::S3::Model::ServerSideEncryptionMapper::GetNameForServerSideEncryption(Aws::S3::Model::ServerSideEncryption::AES256)});
@ -776,7 +770,9 @@ namespace S3
use_environment_credentials,
use_insecure_imds_request);
return std::make_unique<Aws::S3::S3Client>(
client_configuration.retryStrategy = std::make_shared<Client::RetryStrategy>(std::move(client_configuration.retryStrategy));
return Client::create(
client_configuration.s3_max_redirects,
std::move(credentials_provider),
std::move(client_configuration), // Client configuration.
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
@ -802,100 +798,6 @@ namespace S3
get_request_throttler,
put_request_throttler);
}
URI::URI(const std::string & uri_)
{
/// Case when bucket name represented in domain name of S3 URL.
/// E.g. (https://bucket-name.s3.Region.amazonaws.com/key)
/// https://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html#virtual-hosted-style-access
static const RE2 virtual_hosted_style_pattern(R"((.+)\.(s3|cos|obs|oss)([.\-][a-z0-9\-.:]+))");
/// Case when bucket name and key represented in path of S3 URL.
/// E.g. (https://s3.Region.amazonaws.com/bucket-name/key)
/// https://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html#path-style-access
static const RE2 path_style_pattern("^/([^/]*)/(.*)");
static constexpr auto S3 = "S3";
static constexpr auto COSN = "COSN";
static constexpr auto COS = "COS";
static constexpr auto OBS = "OBS";
static constexpr auto OSS = "OSS";
uri = Poco::URI(uri_);
storage_name = S3;
if (uri.getHost().empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Host is empty in S3 URI.");
/// Extract object version ID from query string.
bool has_version_id = false;
for (const auto & [query_key, query_value] : uri.getQueryParameters())
if (query_key == "versionId")
{
version_id = query_value;
has_version_id = true;
}
/// Poco::URI will ignore '?' when parsing the path, but if there is a vestionId in the http parameter,
/// '?' can not be used as a wildcard, otherwise it will be ambiguous.
/// If no "vertionId" in the http parameter, '?' can be used as a wildcard.
/// It is necessary to encode '?' to avoid deletion during parsing path.
if (!has_version_id && uri_.find('?') != String::npos)
{
String uri_with_question_mark_encode;
Poco::URI::encode(uri_, "?", uri_with_question_mark_encode);
uri = Poco::URI(uri_with_question_mark_encode);
}
String name;
String endpoint_authority_from_uri;
if (re2::RE2::FullMatch(uri.getAuthority(), virtual_hosted_style_pattern, &bucket, &name, &endpoint_authority_from_uri))
{
is_virtual_hosted_style = true;
endpoint = uri.getScheme() + "://" + name + endpoint_authority_from_uri;
validateBucket(bucket, uri);
if (!uri.getPath().empty())
{
/// Remove leading '/' from path to extract key.
key = uri.getPath().substr(1);
}
boost::to_upper(name);
if (name != S3 && name != COS && name != OBS && name != OSS)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Object storage system name is unrecognized in virtual hosted style S3 URI: {}",
quoteString(name));
if (name == S3)
storage_name = name;
else if (name == OBS)
storage_name = OBS;
else if (name == OSS)
storage_name = OSS;
else
storage_name = COSN;
}
else if (re2::RE2::PartialMatch(uri.getPath(), path_style_pattern, &bucket, &key))
{
is_virtual_hosted_style = false;
endpoint = uri.getScheme() + "://" + uri.getAuthority();
validateBucket(bucket, uri);
}
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Bucket or key name are invalid in S3 URI.");
}
void URI::validateBucket(const String & bucket, const Poco::URI & uri)
{
/// S3 specification requires at least 3 and at most 63 characters in bucket name.
/// https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-s3-bucket-naming-requirements.html
if (bucket.length() < 3 || bucket.length() > 63)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Bucket name length is out of bounds in virtual hosted style S3 URI: {}{}",
quoteString(bucket), !uri.empty() ? " (" + uri.toString() + ")" : "");
}
}
}

View File

@ -14,12 +14,13 @@
#include <Common/Exception.h>
#include <Common/Throttler_fwd.h>
#include <Poco/URI.h>
#include <IO/S3/Client.h>
#include <IO/S3/URI.h>
#include <aws/core/Aws.h>
#include <aws/s3/S3Errors.h>
namespace Aws::S3 { class S3Client; }
namespace Aws::S3 { class Client; }
namespace DB
{
@ -60,7 +61,6 @@ private:
};
}
namespace DB::S3
{
@ -71,7 +71,7 @@ public:
static ClientFactory & instance();
std::unique_ptr<Aws::S3::S3Client> create(
std::unique_ptr<S3::Client> create(
const PocoHTTPClientConfiguration & cfg,
bool is_virtual_hosted_style,
const String & access_key_id,
@ -97,30 +97,6 @@ private:
std::atomic<bool> s3_requests_logging_enabled;
};
/**
* Represents S3 URI.
*
* The following patterns are allowed:
* s3://bucket/key
* http(s)://endpoint/bucket/key
*/
struct URI
{
Poco::URI uri;
// Custom endpoint if URI scheme is not S3.
String endpoint;
String bucket;
String key;
String version_id;
String storage_name;
bool is_virtual_hosted_style;
explicit URI(const std::string & uri_);
static void validateBucket(const String & bucket, const Poco::URI & uri);
};
}
#endif

View File

@ -9,15 +9,11 @@
#include <IO/WriteBufferFromS3.h>
#include <IO/WriteHelpers.h>
#include <IO/S3Common.h>
#include <IO/S3/Requests.h>
#include <IO/S3/getObjectInfo.h>
#include <Interpreters/Context.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/CreateMultipartUploadRequest.h>
#include <aws/s3/model/CompleteMultipartUploadRequest.h>
#include <aws/s3/model/PutObjectRequest.h>
#include <aws/s3/model/UploadPartRequest.h>
#include <aws/s3/model/HeadObjectRequest.h>
#include <aws/s3/model/StorageClass.h>
#include <utility>
@ -28,13 +24,11 @@ namespace ProfileEvents
extern const Event WriteBufferFromS3Bytes;
extern const Event S3WriteBytes;
extern const Event S3HeadObject;
extern const Event S3CreateMultipartUpload;
extern const Event S3CompleteMultipartUpload;
extern const Event S3UploadPart;
extern const Event S3PutObject;
extern const Event DiskS3HeadObject;
extern const Event DiskS3CreateMultipartUpload;
extern const Event DiskS3CompleteMultipartUpload;
extern const Event DiskS3UploadPart;
@ -59,7 +53,7 @@ namespace ErrorCodes
struct WriteBufferFromS3::UploadPartTask
{
Aws::S3::Model::UploadPartRequest req;
S3::UploadPartRequest req;
bool is_finished = false;
std::string tag;
std::exception_ptr exception;
@ -67,13 +61,13 @@ struct WriteBufferFromS3::UploadPartTask
struct WriteBufferFromS3::PutObjectTask
{
Aws::S3::Model::PutObjectRequest req;
S3::PutObjectRequest req;
bool is_finished = false;
std::exception_ptr exception;
};
WriteBufferFromS3::WriteBufferFromS3(
std::shared_ptr<const Aws::S3::S3Client> client_ptr_,
std::shared_ptr<const S3::Client> client_ptr_,
const String & bucket_,
const String & key_,
const S3Settings::RequestSettings & request_settings_,
@ -191,7 +185,7 @@ void WriteBufferFromS3::finalizeImpl()
void WriteBufferFromS3::createMultipartUpload()
{
Aws::S3::Model::CreateMultipartUploadRequest req;
DB::S3::CreateMultipartUploadRequest req;
req.SetBucket(bucket);
req.SetKey(key);
@ -298,7 +292,7 @@ void WriteBufferFromS3::writePart()
}
}
void WriteBufferFromS3::fillUploadRequest(Aws::S3::Model::UploadPartRequest & req)
void WriteBufferFromS3::fillUploadRequest(S3::UploadPartRequest & req)
{
/// Increase part number.
++part_number;
@ -369,7 +363,7 @@ void WriteBufferFromS3::completeMultipartUpload()
if (tags.empty())
throw Exception(ErrorCodes::S3_ERROR, "Failed to complete multipart upload. No parts have uploaded");
Aws::S3::Model::CompleteMultipartUploadRequest req;
S3::CompleteMultipartUploadRequest req;
req.SetBucket(bucket);
req.SetKey(key);
req.SetUploadId(multipart_upload_id);
@ -474,7 +468,7 @@ void WriteBufferFromS3::makeSinglepartUpload()
}
}
void WriteBufferFromS3::fillPutRequest(Aws::S3::Model::PutObjectRequest & req)
void WriteBufferFromS3::fillPutRequest(S3::PutObjectRequest & req)
{
req.SetBucket(bucket);
req.SetKey(key);

View File

@ -14,6 +14,7 @@
#include <IO/BufferWithOwnMemory.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteSettings.h>
#include <IO/S3/Requests.h>
#include <Storages/StorageS3Settings.h>
#include <Interpreters/threadPoolCallbackRunner.h>
@ -22,13 +23,7 @@
namespace Aws::S3
{
class S3Client;
}
namespace Aws::S3::Model
{
class UploadPartRequest;
class PutObjectRequest;
class Client;
}
namespace DB
@ -47,7 +42,7 @@ class WriteBufferFromS3 final : public BufferWithOwnMemory<WriteBuffer>
{
public:
WriteBufferFromS3(
std::shared_ptr<const Aws::S3::S3Client> client_ptr_,
std::shared_ptr<const S3::Client> client_ptr_,
const String & bucket_,
const String & key_,
const S3Settings::RequestSettings & request_settings_,
@ -75,11 +70,11 @@ private:
void finalizeImpl() override;
struct UploadPartTask;
void fillUploadRequest(Aws::S3::Model::UploadPartRequest & req);
void fillUploadRequest(S3::UploadPartRequest & req);
void processUploadRequest(UploadPartTask & task);
struct PutObjectTask;
void fillPutRequest(Aws::S3::Model::PutObjectRequest & req);
void fillPutRequest(S3::PutObjectRequest & req);
void processPutRequest(const PutObjectTask & task);
void waitForReadyBackGroundTasks();
@ -90,7 +85,7 @@ private:
const String key;
const S3Settings::RequestSettings request_settings;
const S3Settings::RequestSettings::PartUploadSettings & upload_settings;
const std::shared_ptr<const Aws::S3::S3Client> client_ptr;
const std::shared_ptr<const S3::Client> client_ptr;
const std::optional<std::map<String, String>> object_metadata;
size_t upload_part_size = 0;

View File

@ -58,6 +58,10 @@
#include <algorithm>
#include <unistd.h>
#if USE_AWS_S3
#include <IO/S3/Client.h>
#endif
#include "config.h"
namespace DB
@ -338,6 +342,13 @@ BlockIO InterpreterSystemQuery::execute()
cache->reset();
break;
#endif
#if USE_AWS_S3
case Type::DROP_S3_CLIENT_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_S3_CLIENT_CACHE);
S3::ClientCacheRegistry::instance().clearCacheForAll();
break;
#endif
case Type::DROP_FILESYSTEM_CACHE:
{
getContext()->checkAccess(AccessType::SYSTEM_DROP_FILESYSTEM_CACHE);
@ -978,6 +989,9 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
case Type::DROP_INDEX_UNCOMPRESSED_CACHE:
case Type::DROP_FILESYSTEM_CACHE:
case Type::DROP_SCHEMA_CACHE:
#if USE_AWS_S3
case Type::DROP_S3_CLIENT_CACHE:
#endif
{
required_access.emplace_back(AccessType::SYSTEM_DROP_CACHE);
break;

View File

@ -31,6 +31,9 @@ public:
#endif
DROP_FILESYSTEM_CACHE,
DROP_SCHEMA_CACHE,
#if USE_AWS_S3
DROP_S3_CLIENT_CACHE,
#endif
STOP_LISTEN_QUERIES,
START_LISTEN_QUERIES,
RESTART_REPLICAS,

View File

@ -8,6 +8,7 @@
#include <IO/ReadHelpers.h>
#include <IO/ReadSettings.h>
#include <IO/S3Common.h>
#include <IO/S3/Requests.h>
#include <Storages/ExternalDataSourceConfiguration.h>
#include <Storages/StorageFactory.h>
@ -16,8 +17,6 @@
#include <Formats/FormatFactory.h>
#include <aws/core/auth/AWSCredentials.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#include <QueryPipeline/Pipe.h>
@ -101,7 +100,7 @@ std::vector<String> JsonMetadataGetter::getJsonLogFiles()
const auto & client = base_configuration.client;
Aws::S3::Model::ListObjectsV2Request request;
S3::ListObjectsV2Request request;
Aws::S3::Model::ListObjectsV2Outcome outcome;
bool is_finished{false};

View File

@ -15,11 +15,6 @@ namespace Poco
class Logger;
}
namespace Aws::S3
{
class S3Client;
}
namespace DB
{

View File

@ -7,12 +7,11 @@
#include <Formats/FormatFactory.h>
#include <IO/S3Common.h>
#include <IO/S3/Requests.h>
#include <IO/ReadHelpers.h>
#include <Storages/StorageFactory.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <aws/core/auth/AWSCredentials.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#include <QueryPipeline/Pipe.h>
@ -98,7 +97,7 @@ std::vector<std::string> getKeysFromS3(const StorageS3::S3Configuration & base_c
const auto & client = base_configuration.client;
Aws::S3::Model::ListObjectsV2Request request;
S3::ListObjectsV2Request request;
Aws::S3::Model::ListObjectsV2Outcome outcome;
bool is_finished{false};

View File

@ -12,11 +12,6 @@ namespace Poco
class Logger;
}
namespace Aws::S3
{
class S3Client;
}
namespace DB
{

View File

@ -11,6 +11,7 @@
#include <Functions/FunctionsConversion.h>
#include <IO/S3Common.h>
#include <IO/S3/Requests.h>
#include <Interpreters/TreeRewriter.h>
#include <Interpreters/evaluateConstantExpression.h>
@ -51,10 +52,6 @@
#include <DataTypes/DataTypeString.h>
#include <aws/core/auth/AWSCredentials.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#include <aws/s3/model/CopyObjectRequest.h>
#include <aws/s3/model/DeleteObjectsRequest.h>
#include <Common/parseGlobs.h>
#include <Common/quoteString.h>
@ -136,7 +133,7 @@ class StorageS3Source::DisclosedGlobIterator::Impl : WithContext
{
public:
Impl(
const Aws::S3::S3Client & client_,
const S3::Client & client_,
const S3::URI & globbed_uri_,
ASTPtr & query_,
const Block & virtual_header_,
@ -145,7 +142,7 @@ public:
Strings * read_keys_,
const S3Settings::RequestSettings & request_settings_)
: WithContext(context_)
, client(client_)
, client(S3::Client::create(client_))
, globbed_uri(globbed_uri_)
, query(query_)
, virtual_header(virtual_header_)
@ -349,7 +346,7 @@ private:
return list_objects_scheduler([this]
{
ProfileEvents::increment(ProfileEvents::S3ListObjects);
auto outcome = client.ListObjectsV2(request);
auto outcome = client->ListObjectsV2(request);
/// Outcome failure will be handled on the caller side.
if (outcome.IsSuccess())
@ -364,7 +361,7 @@ private:
KeysWithInfo buffer;
KeysWithInfo::iterator buffer_iter;
Aws::S3::S3Client client;
std::unique_ptr<S3::Client> client;
S3::URI globbed_uri;
ASTPtr query;
Block virtual_header;
@ -376,7 +373,7 @@ private:
ObjectInfos * object_infos;
Strings * read_keys;
Aws::S3::Model::ListObjectsV2Request request;
S3::ListObjectsV2Request request;
S3Settings::RequestSettings request_settings;
ThreadPool list_objects_pool;
@ -386,7 +383,7 @@ private:
};
StorageS3Source::DisclosedGlobIterator::DisclosedGlobIterator(
const Aws::S3::S3Client & client_,
const S3::Client & client_,
const S3::URI & globbed_uri_,
ASTPtr query,
const Block & virtual_header,
@ -412,7 +409,7 @@ class StorageS3Source::KeysIterator::Impl : WithContext
{
public:
explicit Impl(
const Aws::S3::S3Client & client_,
const S3::Client & client_,
const std::string & version_id_,
const std::vector<String> & keys_,
const String & bucket_,
@ -507,7 +504,7 @@ private:
};
StorageS3Source::KeysIterator::KeysIterator(
const Aws::S3::S3Client & client_,
const S3::Client & client_,
const std::string & version_id_,
const std::vector<String> & keys_,
const String & bucket_,
@ -552,7 +549,7 @@ StorageS3Source::StorageS3Source(
UInt64 max_block_size_,
const S3Settings::RequestSettings & request_settings_,
String compression_hint_,
const std::shared_ptr<const Aws::S3::S3Client> & client_,
const std::shared_ptr<const S3::Client> & client_,
const String & bucket_,
const String & version_id_,
std::shared_ptr<IIterator> file_iterator_,
@ -1201,7 +1198,7 @@ void StorageS3::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &,
}
ProfileEvents::increment(ProfileEvents::S3DeleteObjects);
Aws::S3::Model::DeleteObjectsRequest request;
S3::DeleteObjectsRequest request;
request.SetBucket(s3_configuration.uri.bucket);
request.SetDelete(delkeys);
@ -1211,6 +1208,9 @@ void StorageS3::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &,
const auto & err = response.GetError();
throw Exception(ErrorCodes::S3_ERROR, "{}: {}", std::to_string(static_cast<int>(err.GetErrorType())), err.GetMessage());
}
for (const auto & error : response.GetResult().GetErrors())
LOG_WARNING(&Poco::Logger::get("StorageS3"), "Failed to delete {}, error: {}", error.GetKey(), error.GetMessage());
}

View File

@ -24,7 +24,7 @@
namespace Aws::S3
{
class S3Client;
class Client;
}
namespace DB
@ -66,7 +66,7 @@ public:
{
public:
DisclosedGlobIterator(
const Aws::S3::S3Client & client_,
const S3::Client & client_,
const S3::URI & globbed_uri_,
ASTPtr query,
const Block & virtual_header,
@ -88,7 +88,7 @@ public:
{
public:
explicit KeysIterator(
const Aws::S3::S3Client & client_,
const S3::Client & client_,
const std::string & version_id_,
const std::vector<String> & keys_,
const String & bucket_,
@ -134,7 +134,7 @@ public:
UInt64 max_block_size_,
const S3Settings::RequestSettings & request_settings_,
String compression_hint_,
const std::shared_ptr<const Aws::S3::S3Client> & client_,
const std::shared_ptr<const S3::Client> & client_,
const String & bucket,
const String & version_id,
std::shared_ptr<IIterator> file_iterator_,
@ -155,7 +155,7 @@ private:
UInt64 max_block_size;
S3Settings::RequestSettings request_settings;
String compression_hint;
std::shared_ptr<const Aws::S3::S3Client> client;
std::shared_ptr<const S3::Client> client;
Block sample_block;
std::optional<FormatSettings> format_settings;
@ -287,7 +287,7 @@ public:
struct S3Configuration
{
const S3::URI uri;
std::shared_ptr<const Aws::S3::S3Client> client;
std::shared_ptr<const S3::Client> client;
S3::AuthSettings auth_settings;
S3Settings::RequestSettings request_settings;

View File

@ -33,8 +33,6 @@
#include <Common/logger_useful.h>
#include <aws/core/auth/AWSCredentials.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#include <memory>
#include <string>

View File

@ -100,6 +100,7 @@ SYSTEM DROP QUERY CACHE ['SYSTEM DROP QUERY','DROP QUERY CACHE','DROP QUERY'] GL
SYSTEM DROP COMPILED EXPRESSION CACHE ['SYSTEM DROP COMPILED EXPRESSION','DROP COMPILED EXPRESSION CACHE','DROP COMPILED EXPRESSIONS'] GLOBAL SYSTEM DROP CACHE
SYSTEM DROP FILESYSTEM CACHE ['SYSTEM DROP FILESYSTEM CACHE','DROP FILESYSTEM CACHE'] GLOBAL SYSTEM DROP CACHE
SYSTEM DROP SCHEMA CACHE ['SYSTEM DROP SCHEMA CACHE','DROP SCHEMA CACHE'] GLOBAL SYSTEM DROP CACHE
SYSTEM DROP S3 CLIENT CACHE ['SYSTEM DROP S3 CLIENT','DROP S3 CLIENT CACHE'] GLOBAL SYSTEM DROP CACHE
SYSTEM DROP CACHE ['DROP CACHE'] \N SYSTEM
SYSTEM RELOAD CONFIG ['RELOAD CONFIG'] GLOBAL SYSTEM RELOAD
SYSTEM RELOAD USERS ['RELOAD USERS'] GLOBAL SYSTEM RELOAD

View File

@ -288,7 +288,7 @@ CREATE TABLE system.grants
(
`user_name` Nullable(String),
`role_name` Nullable(String),
`access_type` Enum16('SHOW DATABASES' = 0, 'SHOW TABLES' = 1, 'SHOW COLUMNS' = 2, 'SHOW DICTIONARIES' = 3, 'SHOW' = 4, 'SHOW FILESYSTEM CACHES' = 5, 'SELECT' = 6, 'INSERT' = 7, 'ALTER UPDATE' = 8, 'ALTER DELETE' = 9, 'ALTER ADD COLUMN' = 10, 'ALTER MODIFY COLUMN' = 11, 'ALTER DROP COLUMN' = 12, 'ALTER COMMENT COLUMN' = 13, 'ALTER CLEAR COLUMN' = 14, 'ALTER RENAME COLUMN' = 15, 'ALTER MATERIALIZE COLUMN' = 16, 'ALTER COLUMN' = 17, 'ALTER MODIFY COMMENT' = 18, 'ALTER ORDER BY' = 19, 'ALTER SAMPLE BY' = 20, 'ALTER ADD INDEX' = 21, 'ALTER DROP INDEX' = 22, 'ALTER MATERIALIZE INDEX' = 23, 'ALTER CLEAR INDEX' = 24, 'ALTER INDEX' = 25, 'ALTER ADD PROJECTION' = 26, 'ALTER DROP PROJECTION' = 27, 'ALTER MATERIALIZE PROJECTION' = 28, 'ALTER CLEAR PROJECTION' = 29, 'ALTER PROJECTION' = 30, 'ALTER ADD CONSTRAINT' = 31, 'ALTER DROP CONSTRAINT' = 32, 'ALTER CONSTRAINT' = 33, 'ALTER TTL' = 34, 'ALTER MATERIALIZE TTL' = 35, 'ALTER SETTINGS' = 36, 'ALTER MOVE PARTITION' = 37, 'ALTER FETCH PARTITION' = 38, 'ALTER FREEZE PARTITION' = 39, 'ALTER DATABASE SETTINGS' = 40, 'ALTER NAMED COLLECTION' = 41, 'ALTER TABLE' = 42, 'ALTER DATABASE' = 43, 'ALTER VIEW REFRESH' = 44, 'ALTER VIEW MODIFY QUERY' = 45, 'ALTER VIEW' = 46, 'ALTER' = 47, 'CREATE DATABASE' = 48, 'CREATE TABLE' = 49, 'CREATE VIEW' = 50, 'CREATE DICTIONARY' = 51, 'CREATE TEMPORARY TABLE' = 52, 'CREATE FUNCTION' = 53, 'CREATE NAMED COLLECTION' = 54, 'CREATE' = 55, 'DROP DATABASE' = 56, 'DROP TABLE' = 57, 'DROP VIEW' = 58, 'DROP DICTIONARY' = 59, 'DROP FUNCTION' = 60, 'DROP NAMED COLLECTION' = 61, 'DROP' = 62, 'TRUNCATE' = 63, 'OPTIMIZE' = 64, 'BACKUP' = 65, 'KILL QUERY' = 66, 'KILL TRANSACTION' = 67, 'MOVE PARTITION BETWEEN SHARDS' = 68, 'CREATE USER' = 69, 'ALTER USER' = 70, 'DROP USER' = 71, 'CREATE ROLE' = 72, 'ALTER ROLE' = 73, 'DROP ROLE' = 74, 'ROLE ADMIN' = 75, 'CREATE ROW POLICY' = 76, 'ALTER ROW POLICY' = 77, 'DROP ROW POLICY' = 78, 'CREATE QUOTA' = 79, 'ALTER QUOTA' = 80, 'DROP QUOTA' = 81, 'CREATE SETTINGS PROFILE' = 82, 'ALTER SETTINGS PROFILE' = 83, 'DROP SETTINGS PROFILE' = 84, 'SHOW USERS' = 85, 'SHOW ROLES' = 86, 'SHOW ROW POLICIES' = 87, 'SHOW QUOTAS' = 88, 'SHOW SETTINGS PROFILES' = 89, 'SHOW ACCESS' = 90, 'SHOW NAMED COLLECTIONS' = 91, 'ACCESS MANAGEMENT' = 92, 'SYSTEM SHUTDOWN' = 93, 'SYSTEM DROP DNS CACHE' = 94, 'SYSTEM DROP MARK CACHE' = 95, 'SYSTEM DROP UNCOMPRESSED CACHE' = 96, 'SYSTEM DROP MMAP CACHE' = 97, 'SYSTEM DROP QUERY CACHE' = 98, 'SYSTEM DROP COMPILED EXPRESSION CACHE' = 99, 'SYSTEM DROP FILESYSTEM CACHE' = 100, 'SYSTEM DROP SCHEMA CACHE' = 101, 'SYSTEM DROP CACHE' = 102, 'SYSTEM RELOAD CONFIG' = 103, 'SYSTEM RELOAD USERS' = 104, 'SYSTEM RELOAD SYMBOLS' = 105, 'SYSTEM RELOAD DICTIONARY' = 106, 'SYSTEM RELOAD MODEL' = 107, 'SYSTEM RELOAD FUNCTION' = 108, 'SYSTEM RELOAD EMBEDDED DICTIONARIES' = 109, 'SYSTEM RELOAD' = 110, 'SYSTEM RESTART DISK' = 111, 'SYSTEM MERGES' = 112, 'SYSTEM TTL MERGES' = 113, 'SYSTEM FETCHES' = 114, 'SYSTEM MOVES' = 115, 'SYSTEM DISTRIBUTED SENDS' = 116, 'SYSTEM REPLICATED SENDS' = 117, 'SYSTEM SENDS' = 118, 'SYSTEM REPLICATION QUEUES' = 119, 'SYSTEM DROP REPLICA' = 120, 'SYSTEM SYNC REPLICA' = 121, 'SYSTEM RESTART REPLICA' = 122, 'SYSTEM RESTORE REPLICA' = 123, 'SYSTEM WAIT LOADING PARTS' = 124, 'SYSTEM SYNC DATABASE REPLICA' = 125, 'SYSTEM SYNC TRANSACTION LOG' = 126, 'SYSTEM SYNC FILE CACHE' = 127, 'SYSTEM FLUSH DISTRIBUTED' = 128, 'SYSTEM FLUSH LOGS' = 129, 'SYSTEM FLUSH' = 130, 'SYSTEM THREAD FUZZER' = 131, 'SYSTEM UNFREEZE' = 132, 'SYSTEM' = 133, 'dictGet' = 134, 'addressToLine' = 135, 'addressToLineWithInlines' = 136, 'addressToSymbol' = 137, 'demangle' = 138, 'INTROSPECTION' = 139, 'FILE' = 140, 'URL' = 141, 'REMOTE' = 142, 'MONGO' = 143, 'MEILISEARCH' = 144, 'MYSQL' = 145, 'POSTGRES' = 146, 'SQLITE' = 147, 'ODBC' = 148, 'JDBC' = 149, 'HDFS' = 150, 'S3' = 151, 'HIVE' = 152, 'SOURCES' = 153, 'CLUSTER' = 154, 'ALL' = 155, 'NONE' = 156),
`access_type` Enum16('SHOW DATABASES' = 0, 'SHOW TABLES' = 1, 'SHOW COLUMNS' = 2, 'SHOW DICTIONARIES' = 3, 'SHOW' = 4, 'SHOW FILESYSTEM CACHES' = 5, 'SELECT' = 6, 'INSERT' = 7, 'ALTER UPDATE' = 8, 'ALTER DELETE' = 9, 'ALTER ADD COLUMN' = 10, 'ALTER MODIFY COLUMN' = 11, 'ALTER DROP COLUMN' = 12, 'ALTER COMMENT COLUMN' = 13, 'ALTER CLEAR COLUMN' = 14, 'ALTER RENAME COLUMN' = 15, 'ALTER MATERIALIZE COLUMN' = 16, 'ALTER COLUMN' = 17, 'ALTER MODIFY COMMENT' = 18, 'ALTER ORDER BY' = 19, 'ALTER SAMPLE BY' = 20, 'ALTER ADD INDEX' = 21, 'ALTER DROP INDEX' = 22, 'ALTER MATERIALIZE INDEX' = 23, 'ALTER CLEAR INDEX' = 24, 'ALTER INDEX' = 25, 'ALTER ADD PROJECTION' = 26, 'ALTER DROP PROJECTION' = 27, 'ALTER MATERIALIZE PROJECTION' = 28, 'ALTER CLEAR PROJECTION' = 29, 'ALTER PROJECTION' = 30, 'ALTER ADD CONSTRAINT' = 31, 'ALTER DROP CONSTRAINT' = 32, 'ALTER CONSTRAINT' = 33, 'ALTER TTL' = 34, 'ALTER MATERIALIZE TTL' = 35, 'ALTER SETTINGS' = 36, 'ALTER MOVE PARTITION' = 37, 'ALTER FETCH PARTITION' = 38, 'ALTER FREEZE PARTITION' = 39, 'ALTER DATABASE SETTINGS' = 40, 'ALTER NAMED COLLECTION' = 41, 'ALTER TABLE' = 42, 'ALTER DATABASE' = 43, 'ALTER VIEW REFRESH' = 44, 'ALTER VIEW MODIFY QUERY' = 45, 'ALTER VIEW' = 46, 'ALTER' = 47, 'CREATE DATABASE' = 48, 'CREATE TABLE' = 49, 'CREATE VIEW' = 50, 'CREATE DICTIONARY' = 51, 'CREATE TEMPORARY TABLE' = 52, 'CREATE FUNCTION' = 53, 'CREATE NAMED COLLECTION' = 54, 'CREATE' = 55, 'DROP DATABASE' = 56, 'DROP TABLE' = 57, 'DROP VIEW' = 58, 'DROP DICTIONARY' = 59, 'DROP FUNCTION' = 60, 'DROP NAMED COLLECTION' = 61, 'DROP' = 62, 'TRUNCATE' = 63, 'OPTIMIZE' = 64, 'BACKUP' = 65, 'KILL QUERY' = 66, 'KILL TRANSACTION' = 67, 'MOVE PARTITION BETWEEN SHARDS' = 68, 'CREATE USER' = 69, 'ALTER USER' = 70, 'DROP USER' = 71, 'CREATE ROLE' = 72, 'ALTER ROLE' = 73, 'DROP ROLE' = 74, 'ROLE ADMIN' = 75, 'CREATE ROW POLICY' = 76, 'ALTER ROW POLICY' = 77, 'DROP ROW POLICY' = 78, 'CREATE QUOTA' = 79, 'ALTER QUOTA' = 80, 'DROP QUOTA' = 81, 'CREATE SETTINGS PROFILE' = 82, 'ALTER SETTINGS PROFILE' = 83, 'DROP SETTINGS PROFILE' = 84, 'SHOW USERS' = 85, 'SHOW ROLES' = 86, 'SHOW ROW POLICIES' = 87, 'SHOW QUOTAS' = 88, 'SHOW SETTINGS PROFILES' = 89, 'SHOW ACCESS' = 90, 'SHOW NAMED COLLECTIONS' = 91, 'ACCESS MANAGEMENT' = 92, 'SYSTEM SHUTDOWN' = 93, 'SYSTEM DROP DNS CACHE' = 94, 'SYSTEM DROP MARK CACHE' = 95, 'SYSTEM DROP UNCOMPRESSED CACHE' = 96, 'SYSTEM DROP MMAP CACHE' = 97, 'SYSTEM DROP QUERY CACHE' = 98, 'SYSTEM DROP COMPILED EXPRESSION CACHE' = 99, 'SYSTEM DROP FILESYSTEM CACHE' = 100, 'SYSTEM DROP SCHEMA CACHE' = 101, 'SYSTEM DROP S3 CLIENT CACHE' = 102, 'SYSTEM DROP CACHE' = 103, 'SYSTEM RELOAD CONFIG' = 104, 'SYSTEM RELOAD USERS' = 105, 'SYSTEM RELOAD SYMBOLS' = 106, 'SYSTEM RELOAD DICTIONARY' = 107, 'SYSTEM RELOAD MODEL' = 108, 'SYSTEM RELOAD FUNCTION' = 109, 'SYSTEM RELOAD EMBEDDED DICTIONARIES' = 110, 'SYSTEM RELOAD' = 111, 'SYSTEM RESTART DISK' = 112, 'SYSTEM MERGES' = 113, 'SYSTEM TTL MERGES' = 114, 'SYSTEM FETCHES' = 115, 'SYSTEM MOVES' = 116, 'SYSTEM DISTRIBUTED SENDS' = 117, 'SYSTEM REPLICATED SENDS' = 118, 'SYSTEM SENDS' = 119, 'SYSTEM REPLICATION QUEUES' = 120, 'SYSTEM DROP REPLICA' = 121, 'SYSTEM SYNC REPLICA' = 122, 'SYSTEM RESTART REPLICA' = 123, 'SYSTEM RESTORE REPLICA' = 124, 'SYSTEM WAIT LOADING PARTS' = 125, 'SYSTEM SYNC DATABASE REPLICA' = 126, 'SYSTEM SYNC TRANSACTION LOG' = 127, 'SYSTEM SYNC FILE CACHE' = 128, 'SYSTEM FLUSH DISTRIBUTED' = 129, 'SYSTEM FLUSH LOGS' = 130, 'SYSTEM FLUSH' = 131, 'SYSTEM THREAD FUZZER' = 132, 'SYSTEM UNFREEZE' = 133, 'SYSTEM' = 134, 'dictGet' = 135, 'addressToLine' = 136, 'addressToLineWithInlines' = 137, 'addressToSymbol' = 138, 'demangle' = 139, 'INTROSPECTION' = 140, 'FILE' = 141, 'URL' = 142, 'REMOTE' = 143, 'MONGO' = 144, 'MEILISEARCH' = 145, 'MYSQL' = 146, 'POSTGRES' = 147, 'SQLITE' = 148, 'ODBC' = 149, 'JDBC' = 150, 'HDFS' = 151, 'S3' = 152, 'HIVE' = 153, 'SOURCES' = 154, 'CLUSTER' = 155, 'ALL' = 156, 'NONE' = 157),
`database` Nullable(String),
`table` Nullable(String),
`column` Nullable(String),
@ -569,10 +569,10 @@ ENGINE = SystemPartsColumns
COMMENT 'SYSTEM TABLE is built on the fly.'
CREATE TABLE system.privileges
(
`privilege` Enum16('SHOW DATABASES' = 0, 'SHOW TABLES' = 1, 'SHOW COLUMNS' = 2, 'SHOW DICTIONARIES' = 3, 'SHOW' = 4, 'SHOW FILESYSTEM CACHES' = 5, 'SELECT' = 6, 'INSERT' = 7, 'ALTER UPDATE' = 8, 'ALTER DELETE' = 9, 'ALTER ADD COLUMN' = 10, 'ALTER MODIFY COLUMN' = 11, 'ALTER DROP COLUMN' = 12, 'ALTER COMMENT COLUMN' = 13, 'ALTER CLEAR COLUMN' = 14, 'ALTER RENAME COLUMN' = 15, 'ALTER MATERIALIZE COLUMN' = 16, 'ALTER COLUMN' = 17, 'ALTER MODIFY COMMENT' = 18, 'ALTER ORDER BY' = 19, 'ALTER SAMPLE BY' = 20, 'ALTER ADD INDEX' = 21, 'ALTER DROP INDEX' = 22, 'ALTER MATERIALIZE INDEX' = 23, 'ALTER CLEAR INDEX' = 24, 'ALTER INDEX' = 25, 'ALTER ADD PROJECTION' = 26, 'ALTER DROP PROJECTION' = 27, 'ALTER MATERIALIZE PROJECTION' = 28, 'ALTER CLEAR PROJECTION' = 29, 'ALTER PROJECTION' = 30, 'ALTER ADD CONSTRAINT' = 31, 'ALTER DROP CONSTRAINT' = 32, 'ALTER CONSTRAINT' = 33, 'ALTER TTL' = 34, 'ALTER MATERIALIZE TTL' = 35, 'ALTER SETTINGS' = 36, 'ALTER MOVE PARTITION' = 37, 'ALTER FETCH PARTITION' = 38, 'ALTER FREEZE PARTITION' = 39, 'ALTER DATABASE SETTINGS' = 40, 'ALTER NAMED COLLECTION' = 41, 'ALTER TABLE' = 42, 'ALTER DATABASE' = 43, 'ALTER VIEW REFRESH' = 44, 'ALTER VIEW MODIFY QUERY' = 45, 'ALTER VIEW' = 46, 'ALTER' = 47, 'CREATE DATABASE' = 48, 'CREATE TABLE' = 49, 'CREATE VIEW' = 50, 'CREATE DICTIONARY' = 51, 'CREATE TEMPORARY TABLE' = 52, 'CREATE FUNCTION' = 53, 'CREATE NAMED COLLECTION' = 54, 'CREATE' = 55, 'DROP DATABASE' = 56, 'DROP TABLE' = 57, 'DROP VIEW' = 58, 'DROP DICTIONARY' = 59, 'DROP FUNCTION' = 60, 'DROP NAMED COLLECTION' = 61, 'DROP' = 62, 'TRUNCATE' = 63, 'OPTIMIZE' = 64, 'BACKUP' = 65, 'KILL QUERY' = 66, 'KILL TRANSACTION' = 67, 'MOVE PARTITION BETWEEN SHARDS' = 68, 'CREATE USER' = 69, 'ALTER USER' = 70, 'DROP USER' = 71, 'CREATE ROLE' = 72, 'ALTER ROLE' = 73, 'DROP ROLE' = 74, 'ROLE ADMIN' = 75, 'CREATE ROW POLICY' = 76, 'ALTER ROW POLICY' = 77, 'DROP ROW POLICY' = 78, 'CREATE QUOTA' = 79, 'ALTER QUOTA' = 80, 'DROP QUOTA' = 81, 'CREATE SETTINGS PROFILE' = 82, 'ALTER SETTINGS PROFILE' = 83, 'DROP SETTINGS PROFILE' = 84, 'SHOW USERS' = 85, 'SHOW ROLES' = 86, 'SHOW ROW POLICIES' = 87, 'SHOW QUOTAS' = 88, 'SHOW SETTINGS PROFILES' = 89, 'SHOW ACCESS' = 90, 'SHOW NAMED COLLECTIONS' = 91, 'ACCESS MANAGEMENT' = 92, 'SYSTEM SHUTDOWN' = 93, 'SYSTEM DROP DNS CACHE' = 94, 'SYSTEM DROP MARK CACHE' = 95, 'SYSTEM DROP UNCOMPRESSED CACHE' = 96, 'SYSTEM DROP MMAP CACHE' = 97, 'SYSTEM DROP QUERY CACHE' = 98, 'SYSTEM DROP COMPILED EXPRESSION CACHE' = 99, 'SYSTEM DROP FILESYSTEM CACHE' = 100, 'SYSTEM DROP SCHEMA CACHE' = 101, 'SYSTEM DROP CACHE' = 102, 'SYSTEM RELOAD CONFIG' = 103, 'SYSTEM RELOAD USERS' = 104, 'SYSTEM RELOAD SYMBOLS' = 105, 'SYSTEM RELOAD DICTIONARY' = 106, 'SYSTEM RELOAD MODEL' = 107, 'SYSTEM RELOAD FUNCTION' = 108, 'SYSTEM RELOAD EMBEDDED DICTIONARIES' = 109, 'SYSTEM RELOAD' = 110, 'SYSTEM RESTART DISK' = 111, 'SYSTEM MERGES' = 112, 'SYSTEM TTL MERGES' = 113, 'SYSTEM FETCHES' = 114, 'SYSTEM MOVES' = 115, 'SYSTEM DISTRIBUTED SENDS' = 116, 'SYSTEM REPLICATED SENDS' = 117, 'SYSTEM SENDS' = 118, 'SYSTEM REPLICATION QUEUES' = 119, 'SYSTEM DROP REPLICA' = 120, 'SYSTEM SYNC REPLICA' = 121, 'SYSTEM RESTART REPLICA' = 122, 'SYSTEM RESTORE REPLICA' = 123, 'SYSTEM WAIT LOADING PARTS' = 124, 'SYSTEM SYNC DATABASE REPLICA' = 125, 'SYSTEM SYNC TRANSACTION LOG' = 126, 'SYSTEM SYNC FILE CACHE' = 127, 'SYSTEM FLUSH DISTRIBUTED' = 128, 'SYSTEM FLUSH LOGS' = 129, 'SYSTEM FLUSH' = 130, 'SYSTEM THREAD FUZZER' = 131, 'SYSTEM UNFREEZE' = 132, 'SYSTEM' = 133, 'dictGet' = 134, 'addressToLine' = 135, 'addressToLineWithInlines' = 136, 'addressToSymbol' = 137, 'demangle' = 138, 'INTROSPECTION' = 139, 'FILE' = 140, 'URL' = 141, 'REMOTE' = 142, 'MONGO' = 143, 'MEILISEARCH' = 144, 'MYSQL' = 145, 'POSTGRES' = 146, 'SQLITE' = 147, 'ODBC' = 148, 'JDBC' = 149, 'HDFS' = 150, 'S3' = 151, 'HIVE' = 152, 'SOURCES' = 153, 'CLUSTER' = 154, 'ALL' = 155, 'NONE' = 156),
`privilege` Enum16('SHOW DATABASES' = 0, 'SHOW TABLES' = 1, 'SHOW COLUMNS' = 2, 'SHOW DICTIONARIES' = 3, 'SHOW' = 4, 'SHOW FILESYSTEM CACHES' = 5, 'SELECT' = 6, 'INSERT' = 7, 'ALTER UPDATE' = 8, 'ALTER DELETE' = 9, 'ALTER ADD COLUMN' = 10, 'ALTER MODIFY COLUMN' = 11, 'ALTER DROP COLUMN' = 12, 'ALTER COMMENT COLUMN' = 13, 'ALTER CLEAR COLUMN' = 14, 'ALTER RENAME COLUMN' = 15, 'ALTER MATERIALIZE COLUMN' = 16, 'ALTER COLUMN' = 17, 'ALTER MODIFY COMMENT' = 18, 'ALTER ORDER BY' = 19, 'ALTER SAMPLE BY' = 20, 'ALTER ADD INDEX' = 21, 'ALTER DROP INDEX' = 22, 'ALTER MATERIALIZE INDEX' = 23, 'ALTER CLEAR INDEX' = 24, 'ALTER INDEX' = 25, 'ALTER ADD PROJECTION' = 26, 'ALTER DROP PROJECTION' = 27, 'ALTER MATERIALIZE PROJECTION' = 28, 'ALTER CLEAR PROJECTION' = 29, 'ALTER PROJECTION' = 30, 'ALTER ADD CONSTRAINT' = 31, 'ALTER DROP CONSTRAINT' = 32, 'ALTER CONSTRAINT' = 33, 'ALTER TTL' = 34, 'ALTER MATERIALIZE TTL' = 35, 'ALTER SETTINGS' = 36, 'ALTER MOVE PARTITION' = 37, 'ALTER FETCH PARTITION' = 38, 'ALTER FREEZE PARTITION' = 39, 'ALTER DATABASE SETTINGS' = 40, 'ALTER NAMED COLLECTION' = 41, 'ALTER TABLE' = 42, 'ALTER DATABASE' = 43, 'ALTER VIEW REFRESH' = 44, 'ALTER VIEW MODIFY QUERY' = 45, 'ALTER VIEW' = 46, 'ALTER' = 47, 'CREATE DATABASE' = 48, 'CREATE TABLE' = 49, 'CREATE VIEW' = 50, 'CREATE DICTIONARY' = 51, 'CREATE TEMPORARY TABLE' = 52, 'CREATE FUNCTION' = 53, 'CREATE NAMED COLLECTION' = 54, 'CREATE' = 55, 'DROP DATABASE' = 56, 'DROP TABLE' = 57, 'DROP VIEW' = 58, 'DROP DICTIONARY' = 59, 'DROP FUNCTION' = 60, 'DROP NAMED COLLECTION' = 61, 'DROP' = 62, 'TRUNCATE' = 63, 'OPTIMIZE' = 64, 'BACKUP' = 65, 'KILL QUERY' = 66, 'KILL TRANSACTION' = 67, 'MOVE PARTITION BETWEEN SHARDS' = 68, 'CREATE USER' = 69, 'ALTER USER' = 70, 'DROP USER' = 71, 'CREATE ROLE' = 72, 'ALTER ROLE' = 73, 'DROP ROLE' = 74, 'ROLE ADMIN' = 75, 'CREATE ROW POLICY' = 76, 'ALTER ROW POLICY' = 77, 'DROP ROW POLICY' = 78, 'CREATE QUOTA' = 79, 'ALTER QUOTA' = 80, 'DROP QUOTA' = 81, 'CREATE SETTINGS PROFILE' = 82, 'ALTER SETTINGS PROFILE' = 83, 'DROP SETTINGS PROFILE' = 84, 'SHOW USERS' = 85, 'SHOW ROLES' = 86, 'SHOW ROW POLICIES' = 87, 'SHOW QUOTAS' = 88, 'SHOW SETTINGS PROFILES' = 89, 'SHOW ACCESS' = 90, 'SHOW NAMED COLLECTIONS' = 91, 'ACCESS MANAGEMENT' = 92, 'SYSTEM SHUTDOWN' = 93, 'SYSTEM DROP DNS CACHE' = 94, 'SYSTEM DROP MARK CACHE' = 95, 'SYSTEM DROP UNCOMPRESSED CACHE' = 96, 'SYSTEM DROP MMAP CACHE' = 97, 'SYSTEM DROP QUERY CACHE' = 98, 'SYSTEM DROP COMPILED EXPRESSION CACHE' = 99, 'SYSTEM DROP FILESYSTEM CACHE' = 100, 'SYSTEM DROP SCHEMA CACHE' = 101, 'SYSTEM DROP S3 CLIENT CACHE' = 102, 'SYSTEM DROP CACHE' = 103, 'SYSTEM RELOAD CONFIG' = 104, 'SYSTEM RELOAD USERS' = 105, 'SYSTEM RELOAD SYMBOLS' = 106, 'SYSTEM RELOAD DICTIONARY' = 107, 'SYSTEM RELOAD MODEL' = 108, 'SYSTEM RELOAD FUNCTION' = 109, 'SYSTEM RELOAD EMBEDDED DICTIONARIES' = 110, 'SYSTEM RELOAD' = 111, 'SYSTEM RESTART DISK' = 112, 'SYSTEM MERGES' = 113, 'SYSTEM TTL MERGES' = 114, 'SYSTEM FETCHES' = 115, 'SYSTEM MOVES' = 116, 'SYSTEM DISTRIBUTED SENDS' = 117, 'SYSTEM REPLICATED SENDS' = 118, 'SYSTEM SENDS' = 119, 'SYSTEM REPLICATION QUEUES' = 120, 'SYSTEM DROP REPLICA' = 121, 'SYSTEM SYNC REPLICA' = 122, 'SYSTEM RESTART REPLICA' = 123, 'SYSTEM RESTORE REPLICA' = 124, 'SYSTEM WAIT LOADING PARTS' = 125, 'SYSTEM SYNC DATABASE REPLICA' = 126, 'SYSTEM SYNC TRANSACTION LOG' = 127, 'SYSTEM SYNC FILE CACHE' = 128, 'SYSTEM FLUSH DISTRIBUTED' = 129, 'SYSTEM FLUSH LOGS' = 130, 'SYSTEM FLUSH' = 131, 'SYSTEM THREAD FUZZER' = 132, 'SYSTEM UNFREEZE' = 133, 'SYSTEM' = 134, 'dictGet' = 135, 'addressToLine' = 136, 'addressToLineWithInlines' = 137, 'addressToSymbol' = 138, 'demangle' = 139, 'INTROSPECTION' = 140, 'FILE' = 141, 'URL' = 142, 'REMOTE' = 143, 'MONGO' = 144, 'MEILISEARCH' = 145, 'MYSQL' = 146, 'POSTGRES' = 147, 'SQLITE' = 148, 'ODBC' = 149, 'JDBC' = 150, 'HDFS' = 151, 'S3' = 152, 'HIVE' = 153, 'SOURCES' = 154, 'CLUSTER' = 155, 'ALL' = 156, 'NONE' = 157),
`aliases` Array(String),
`level` Nullable(Enum8('GLOBAL' = 0, 'DATABASE' = 1, 'TABLE' = 2, 'DICTIONARY' = 3, 'VIEW' = 4, 'COLUMN' = 5)),
`parent_group` Nullable(Enum16('SHOW DATABASES' = 0, 'SHOW TABLES' = 1, 'SHOW COLUMNS' = 2, 'SHOW DICTIONARIES' = 3, 'SHOW' = 4, 'SHOW FILESYSTEM CACHES' = 5, 'SELECT' = 6, 'INSERT' = 7, 'ALTER UPDATE' = 8, 'ALTER DELETE' = 9, 'ALTER ADD COLUMN' = 10, 'ALTER MODIFY COLUMN' = 11, 'ALTER DROP COLUMN' = 12, 'ALTER COMMENT COLUMN' = 13, 'ALTER CLEAR COLUMN' = 14, 'ALTER RENAME COLUMN' = 15, 'ALTER MATERIALIZE COLUMN' = 16, 'ALTER COLUMN' = 17, 'ALTER MODIFY COMMENT' = 18, 'ALTER ORDER BY' = 19, 'ALTER SAMPLE BY' = 20, 'ALTER ADD INDEX' = 21, 'ALTER DROP INDEX' = 22, 'ALTER MATERIALIZE INDEX' = 23, 'ALTER CLEAR INDEX' = 24, 'ALTER INDEX' = 25, 'ALTER ADD PROJECTION' = 26, 'ALTER DROP PROJECTION' = 27, 'ALTER MATERIALIZE PROJECTION' = 28, 'ALTER CLEAR PROJECTION' = 29, 'ALTER PROJECTION' = 30, 'ALTER ADD CONSTRAINT' = 31, 'ALTER DROP CONSTRAINT' = 32, 'ALTER CONSTRAINT' = 33, 'ALTER TTL' = 34, 'ALTER MATERIALIZE TTL' = 35, 'ALTER SETTINGS' = 36, 'ALTER MOVE PARTITION' = 37, 'ALTER FETCH PARTITION' = 38, 'ALTER FREEZE PARTITION' = 39, 'ALTER DATABASE SETTINGS' = 40, 'ALTER NAMED COLLECTION' = 41, 'ALTER TABLE' = 42, 'ALTER DATABASE' = 43, 'ALTER VIEW REFRESH' = 44, 'ALTER VIEW MODIFY QUERY' = 45, 'ALTER VIEW' = 46, 'ALTER' = 47, 'CREATE DATABASE' = 48, 'CREATE TABLE' = 49, 'CREATE VIEW' = 50, 'CREATE DICTIONARY' = 51, 'CREATE TEMPORARY TABLE' = 52, 'CREATE FUNCTION' = 53, 'CREATE NAMED COLLECTION' = 54, 'CREATE' = 55, 'DROP DATABASE' = 56, 'DROP TABLE' = 57, 'DROP VIEW' = 58, 'DROP DICTIONARY' = 59, 'DROP FUNCTION' = 60, 'DROP NAMED COLLECTION' = 61, 'DROP' = 62, 'TRUNCATE' = 63, 'OPTIMIZE' = 64, 'BACKUP' = 65, 'KILL QUERY' = 66, 'KILL TRANSACTION' = 67, 'MOVE PARTITION BETWEEN SHARDS' = 68, 'CREATE USER' = 69, 'ALTER USER' = 70, 'DROP USER' = 71, 'CREATE ROLE' = 72, 'ALTER ROLE' = 73, 'DROP ROLE' = 74, 'ROLE ADMIN' = 75, 'CREATE ROW POLICY' = 76, 'ALTER ROW POLICY' = 77, 'DROP ROW POLICY' = 78, 'CREATE QUOTA' = 79, 'ALTER QUOTA' = 80, 'DROP QUOTA' = 81, 'CREATE SETTINGS PROFILE' = 82, 'ALTER SETTINGS PROFILE' = 83, 'DROP SETTINGS PROFILE' = 84, 'SHOW USERS' = 85, 'SHOW ROLES' = 86, 'SHOW ROW POLICIES' = 87, 'SHOW QUOTAS' = 88, 'SHOW SETTINGS PROFILES' = 89, 'SHOW ACCESS' = 90, 'SHOW NAMED COLLECTIONS' = 91, 'ACCESS MANAGEMENT' = 92, 'SYSTEM SHUTDOWN' = 93, 'SYSTEM DROP DNS CACHE' = 94, 'SYSTEM DROP MARK CACHE' = 95, 'SYSTEM DROP UNCOMPRESSED CACHE' = 96, 'SYSTEM DROP MMAP CACHE' = 97, 'SYSTEM DROP QUERY CACHE' = 98, 'SYSTEM DROP COMPILED EXPRESSION CACHE' = 99, 'SYSTEM DROP FILESYSTEM CACHE' = 100, 'SYSTEM DROP SCHEMA CACHE' = 101, 'SYSTEM DROP CACHE' = 102, 'SYSTEM RELOAD CONFIG' = 103, 'SYSTEM RELOAD USERS' = 104, 'SYSTEM RELOAD SYMBOLS' = 105, 'SYSTEM RELOAD DICTIONARY' = 106, 'SYSTEM RELOAD MODEL' = 107, 'SYSTEM RELOAD FUNCTION' = 108, 'SYSTEM RELOAD EMBEDDED DICTIONARIES' = 109, 'SYSTEM RELOAD' = 110, 'SYSTEM RESTART DISK' = 111, 'SYSTEM MERGES' = 112, 'SYSTEM TTL MERGES' = 113, 'SYSTEM FETCHES' = 114, 'SYSTEM MOVES' = 115, 'SYSTEM DISTRIBUTED SENDS' = 116, 'SYSTEM REPLICATED SENDS' = 117, 'SYSTEM SENDS' = 118, 'SYSTEM REPLICATION QUEUES' = 119, 'SYSTEM DROP REPLICA' = 120, 'SYSTEM SYNC REPLICA' = 121, 'SYSTEM RESTART REPLICA' = 122, 'SYSTEM RESTORE REPLICA' = 123, 'SYSTEM WAIT LOADING PARTS' = 124, 'SYSTEM SYNC DATABASE REPLICA' = 125, 'SYSTEM SYNC TRANSACTION LOG' = 126, 'SYSTEM SYNC FILE CACHE' = 127, 'SYSTEM FLUSH DISTRIBUTED' = 128, 'SYSTEM FLUSH LOGS' = 129, 'SYSTEM FLUSH' = 130, 'SYSTEM THREAD FUZZER' = 131, 'SYSTEM UNFREEZE' = 132, 'SYSTEM' = 133, 'dictGet' = 134, 'addressToLine' = 135, 'addressToLineWithInlines' = 136, 'addressToSymbol' = 137, 'demangle' = 138, 'INTROSPECTION' = 139, 'FILE' = 140, 'URL' = 141, 'REMOTE' = 142, 'MONGO' = 143, 'MEILISEARCH' = 144, 'MYSQL' = 145, 'POSTGRES' = 146, 'SQLITE' = 147, 'ODBC' = 148, 'JDBC' = 149, 'HDFS' = 150, 'S3' = 151, 'HIVE' = 152, 'SOURCES' = 153, 'CLUSTER' = 154, 'ALL' = 155, 'NONE' = 156))
`parent_group` Nullable(Enum16('SHOW DATABASES' = 0, 'SHOW TABLES' = 1, 'SHOW COLUMNS' = 2, 'SHOW DICTIONARIES' = 3, 'SHOW' = 4, 'SHOW FILESYSTEM CACHES' = 5, 'SELECT' = 6, 'INSERT' = 7, 'ALTER UPDATE' = 8, 'ALTER DELETE' = 9, 'ALTER ADD COLUMN' = 10, 'ALTER MODIFY COLUMN' = 11, 'ALTER DROP COLUMN' = 12, 'ALTER COMMENT COLUMN' = 13, 'ALTER CLEAR COLUMN' = 14, 'ALTER RENAME COLUMN' = 15, 'ALTER MATERIALIZE COLUMN' = 16, 'ALTER COLUMN' = 17, 'ALTER MODIFY COMMENT' = 18, 'ALTER ORDER BY' = 19, 'ALTER SAMPLE BY' = 20, 'ALTER ADD INDEX' = 21, 'ALTER DROP INDEX' = 22, 'ALTER MATERIALIZE INDEX' = 23, 'ALTER CLEAR INDEX' = 24, 'ALTER INDEX' = 25, 'ALTER ADD PROJECTION' = 26, 'ALTER DROP PROJECTION' = 27, 'ALTER MATERIALIZE PROJECTION' = 28, 'ALTER CLEAR PROJECTION' = 29, 'ALTER PROJECTION' = 30, 'ALTER ADD CONSTRAINT' = 31, 'ALTER DROP CONSTRAINT' = 32, 'ALTER CONSTRAINT' = 33, 'ALTER TTL' = 34, 'ALTER MATERIALIZE TTL' = 35, 'ALTER SETTINGS' = 36, 'ALTER MOVE PARTITION' = 37, 'ALTER FETCH PARTITION' = 38, 'ALTER FREEZE PARTITION' = 39, 'ALTER DATABASE SETTINGS' = 40, 'ALTER NAMED COLLECTION' = 41, 'ALTER TABLE' = 42, 'ALTER DATABASE' = 43, 'ALTER VIEW REFRESH' = 44, 'ALTER VIEW MODIFY QUERY' = 45, 'ALTER VIEW' = 46, 'ALTER' = 47, 'CREATE DATABASE' = 48, 'CREATE TABLE' = 49, 'CREATE VIEW' = 50, 'CREATE DICTIONARY' = 51, 'CREATE TEMPORARY TABLE' = 52, 'CREATE FUNCTION' = 53, 'CREATE NAMED COLLECTION' = 54, 'CREATE' = 55, 'DROP DATABASE' = 56, 'DROP TABLE' = 57, 'DROP VIEW' = 58, 'DROP DICTIONARY' = 59, 'DROP FUNCTION' = 60, 'DROP NAMED COLLECTION' = 61, 'DROP' = 62, 'TRUNCATE' = 63, 'OPTIMIZE' = 64, 'BACKUP' = 65, 'KILL QUERY' = 66, 'KILL TRANSACTION' = 67, 'MOVE PARTITION BETWEEN SHARDS' = 68, 'CREATE USER' = 69, 'ALTER USER' = 70, 'DROP USER' = 71, 'CREATE ROLE' = 72, 'ALTER ROLE' = 73, 'DROP ROLE' = 74, 'ROLE ADMIN' = 75, 'CREATE ROW POLICY' = 76, 'ALTER ROW POLICY' = 77, 'DROP ROW POLICY' = 78, 'CREATE QUOTA' = 79, 'ALTER QUOTA' = 80, 'DROP QUOTA' = 81, 'CREATE SETTINGS PROFILE' = 82, 'ALTER SETTINGS PROFILE' = 83, 'DROP SETTINGS PROFILE' = 84, 'SHOW USERS' = 85, 'SHOW ROLES' = 86, 'SHOW ROW POLICIES' = 87, 'SHOW QUOTAS' = 88, 'SHOW SETTINGS PROFILES' = 89, 'SHOW ACCESS' = 90, 'SHOW NAMED COLLECTIONS' = 91, 'ACCESS MANAGEMENT' = 92, 'SYSTEM SHUTDOWN' = 93, 'SYSTEM DROP DNS CACHE' = 94, 'SYSTEM DROP MARK CACHE' = 95, 'SYSTEM DROP UNCOMPRESSED CACHE' = 96, 'SYSTEM DROP MMAP CACHE' = 97, 'SYSTEM DROP QUERY CACHE' = 98, 'SYSTEM DROP COMPILED EXPRESSION CACHE' = 99, 'SYSTEM DROP FILESYSTEM CACHE' = 100, 'SYSTEM DROP SCHEMA CACHE' = 101, 'SYSTEM DROP S3 CLIENT CACHE' = 102, 'SYSTEM DROP CACHE' = 103, 'SYSTEM RELOAD CONFIG' = 104, 'SYSTEM RELOAD USERS' = 105, 'SYSTEM RELOAD SYMBOLS' = 106, 'SYSTEM RELOAD DICTIONARY' = 107, 'SYSTEM RELOAD MODEL' = 108, 'SYSTEM RELOAD FUNCTION' = 109, 'SYSTEM RELOAD EMBEDDED DICTIONARIES' = 110, 'SYSTEM RELOAD' = 111, 'SYSTEM RESTART DISK' = 112, 'SYSTEM MERGES' = 113, 'SYSTEM TTL MERGES' = 114, 'SYSTEM FETCHES' = 115, 'SYSTEM MOVES' = 116, 'SYSTEM DISTRIBUTED SENDS' = 117, 'SYSTEM REPLICATED SENDS' = 118, 'SYSTEM SENDS' = 119, 'SYSTEM REPLICATION QUEUES' = 120, 'SYSTEM DROP REPLICA' = 121, 'SYSTEM SYNC REPLICA' = 122, 'SYSTEM RESTART REPLICA' = 123, 'SYSTEM RESTORE REPLICA' = 124, 'SYSTEM WAIT LOADING PARTS' = 125, 'SYSTEM SYNC DATABASE REPLICA' = 126, 'SYSTEM SYNC TRANSACTION LOG' = 127, 'SYSTEM SYNC FILE CACHE' = 128, 'SYSTEM FLUSH DISTRIBUTED' = 129, 'SYSTEM FLUSH LOGS' = 130, 'SYSTEM FLUSH' = 131, 'SYSTEM THREAD FUZZER' = 132, 'SYSTEM UNFREEZE' = 133, 'SYSTEM' = 134, 'dictGet' = 135, 'addressToLine' = 136, 'addressToLineWithInlines' = 137, 'addressToSymbol' = 138, 'demangle' = 139, 'INTROSPECTION' = 140, 'FILE' = 141, 'URL' = 142, 'REMOTE' = 143, 'MONGO' = 144, 'MEILISEARCH' = 145, 'MYSQL' = 146, 'POSTGRES' = 147, 'SQLITE' = 148, 'ODBC' = 149, 'JDBC' = 150, 'HDFS' = 151, 'S3' = 152, 'HIVE' = 153, 'SOURCES' = 154, 'CLUSTER' = 155, 'ALL' = 156, 'NONE' = 157))
)
ENGINE = SystemPrivileges
COMMENT 'SYSTEM TABLE is built on the fly.'