Improve performance of storage S3

Anton Popov 2022-11-26 03:24:11 +00:00
parent c46a659ad9
commit 65a78bcd91
14 changed files with 376 additions and 260 deletions

View File

@@ -177,15 +177,6 @@ void BackupWriterS3::copyObjectImpl(
}
Aws::S3::Model::HeadObjectOutcome BackupWriterS3::requestObjectHeadData(const std::string & bucket_from, const std::string & key) const
{
Aws::S3::Model::HeadObjectRequest request;
request.SetBucket(bucket_from);
request.SetKey(key);
return client->HeadObject(request);
}
void BackupWriterS3::copyObjectMultipartImpl(
const String & src_bucket,
const String & src_key,
@@ -294,7 +285,7 @@ void BackupWriterS3::copyFileNative(DiskPtr from_disk, const String & file_name_
std::string source_bucket = object_storage->getObjectsNamespace();
auto file_path = fs::path(s3_uri.key) / file_name_to;
auto head = requestObjectHeadData(source_bucket, objects[0].absolute_path).GetResult();
auto head = S3::headObject(client, source_bucket, objects[0].absolute_path).GetResult();
if (static_cast<size_t>(head.GetContentLength()) < request_settings.max_single_operation_copy_size)
{
copyObjectImpl(
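
The call sites above now go through a shared helper declared in IO/S3Common.h instead of building an Aws::S3::Model::HeadObjectRequest by hand. A minimal sketch of the new calling convention (the wrapper function below is illustrative, not part of the commit):

#include <IO/S3Common.h>

/// Illustrative only: metadata lookup via the consolidated helper.
/// Defaulted arguments: version_id = "", for_disk_s3 = false.
size_t objectSizeOrZero(DB::S3::ClientPtr client, const DB::String & bucket, const DB::String & key)
{
    auto outcome = DB::S3::headObject(client, bucket, key);
    if (!outcome.IsSuccess())
        return 0;
    return static_cast<size_t>(outcome.GetResult().GetContentLength());
}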

View File

@@ -61,8 +61,6 @@ public:
void copyFileNative(DiskPtr from_disk, const String & file_name_from, const String & file_name_to) override;
private:
Aws::S3::Model::HeadObjectOutcome requestObjectHeadData(const std::string & bucket_from, const std::string & key) const;
void copyObjectImpl(
const String & src_bucket,
const String & src_key,

View File

@@ -149,31 +149,13 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const std::string & snapshot_pa
};
};
const auto file_exists = [&](const auto & key)
{
Aws::S3::Model::HeadObjectRequest request;
request.SetBucket(s3_client->uri.bucket);
request.SetKey(key);
auto outcome = s3_client->client->HeadObject(request);
if (outcome.IsSuccess())
return true;
const auto & error = outcome.GetError();
if (error.GetErrorType() != Aws::S3::S3Errors::NO_SUCH_KEY && error.GetErrorType() != Aws::S3::S3Errors::RESOURCE_NOT_FOUND)
throw S3Exception(error.GetErrorType(), "Failed to verify existence of lock file: {}", error.GetMessage());
return false;
};
LOG_INFO(log, "Will try to upload snapshot {} to S3", snapshot_path);
ReadBufferFromFile snapshot_file(snapshot_path);
auto snapshot_name = fs::path(snapshot_path).filename().string();
auto lock_file = fmt::format(".{}_LOCK", snapshot_name);
if (file_exists(snapshot_name))
if (S3::objectExists(s3_client->client, s3_client->uri.bucket, snapshot_name))
{
LOG_ERROR(log, "Snapshot {} already exists", snapshot_name);
return;
@@ -181,7 +163,7 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const std::string & snapshot_pa
// First we need to verify that there isn't already a lock file for the snapshot we want to upload
// Only leader uploads a snapshot, but there can be a rare case where we have 2 leaders in NuRaft
if (file_exists(lock_file))
if (S3::objectExists(s3_client->client, s3_client->uri.bucket, lock_file))
{
LOG_ERROR(log, "Lock file for {} already, exists. Probably a different node is already uploading the snapshot", snapshot_name);
return;
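
Both checks above now rely on the error contract of S3::objectExists (added later in this commit in S3Common.cpp): not-found errors map to false, anything else throws. A hedged sketch of how a caller observes that contract:

/// Illustrative only. objectExists returns true/false for found/not-found
/// (NO_SUCH_KEY, RESOURCE_NOT_FOUND) and throws S3Exception for any other
/// failure (auth, network, ...), so callers never inspect error codes here.
if (DB::S3::objectExists(s3_client->client, s3_client->uri.bucket, lock_file))
    return; /// a different node is already uploading the snapshot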

View File

@@ -66,12 +66,6 @@ namespace ErrorCodes
namespace
{
bool isNotFoundError(Aws::S3::S3Errors error)
{
return error == Aws::S3::S3Errors::RESOURCE_NOT_FOUND
|| error == Aws::S3::S3Errors::NO_SUCH_KEY;
}
template <typename Result, typename Error>
void throwIfError(const Aws::Utils::Outcome<Result, Error> & response)
{
@@ -89,7 +83,7 @@ void throwIfUnexpectedError(const Aws::Utils::Outcome<Result, Error> & response,
/// the log will be polluted with error messages from aws sdk.
/// Looks like there is no way to suppress them.
if (!response.IsSuccess() && (!if_exists || !isNotFoundError(response.GetError().GetErrorType())))
if (!response.IsSuccess() && (!if_exists || !S3::isNotFoundError(response.GetError().GetErrorType())))
{
const auto & err = response.GetError();
throw S3Exception(err.GetErrorType(), "{} (Code: {})", err.GetMessage(), static_cast<size_t>(err.GetErrorType()));
@@ -130,28 +124,12 @@ std::string S3ObjectStorage::generateBlobNameForPath(const std::string & /* path
Aws::S3::Model::HeadObjectOutcome S3ObjectStorage::requestObjectHeadData(const std::string & bucket_from, const std::string & key) const
{
auto client_ptr = client.get();
ProfileEvents::increment(ProfileEvents::S3HeadObject);
ProfileEvents::increment(ProfileEvents::DiskS3HeadObject);
Aws::S3::Model::HeadObjectRequest request;
request.SetBucket(bucket_from);
request.SetKey(key);
return client_ptr->HeadObject(request);
return S3::headObject(client.get(), bucket_from, key, /* version_id */ "", /* for_disk_s3 */ true);
}
bool S3ObjectStorage::exists(const StoredObject & object) const
{
auto object_head = requestObjectHeadData(bucket, object.absolute_path);
if (!object_head.IsSuccess())
{
if (object_head.GetError().GetErrorType() == Aws::S3::S3Errors::RESOURCE_NOT_FOUND)
return false;
throwIfError(object_head);
}
return true;
return S3::objectExists(client.get(), bucket, object.absolute_path, /* version_id */ "", /* for_disk_s3 */ true);
}
std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT

View File

@@ -340,9 +340,7 @@ SeekableReadBufferPtr ReadBufferS3Factory::getReader()
{
const auto next_range = range_generator.nextRange();
if (!next_range)
{
return nullptr;
}
auto reader = std::make_shared<ReadBufferFromS3>(
client_ptr,
@@ -350,10 +348,11 @@ SeekableReadBufferPtr ReadBufferS3Factory::getReader()
key,
version_id,
request_settings,
read_settings,
read_settings.adjustBufferSize(object_size),
false /*use_external_buffer*/,
next_range->first,
next_range->second);
return reader;
}
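
The factory now derives per-object read settings from the object size. A sketch of the assumed effect of ReadSettings::adjustBufferSize (the clamp below is an assumption about the helper, not its verbatim body):

/// Assumed behaviour, illustrative only: never allocate a read buffer
/// larger than the object that will be read into it.
ReadSettings adjusted = read_settings;
adjusted.remote_fs_buffer_size = std::min<size_t>(adjusted.remote_fs_buffer_size, object_size);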

View File

@@ -851,8 +851,12 @@ namespace S3
quoteString(bucket), !uri.empty() ? " (" + uri.toString() + ")" : "");
}
bool isNotFoundError(Aws::S3::S3Errors error)
{
return error == Aws::S3::S3Errors::RESOURCE_NOT_FOUND || error == Aws::S3::S3Errors::NO_SUCH_KEY;
}
S3::ObjectInfo getObjectInfo(std::shared_ptr<const Aws::S3::S3Client> client_ptr, const String & bucket, const String & key, const String & version_id, bool throw_on_error, bool for_disk_s3)
Aws::S3::Model::HeadObjectOutcome headObject(ClientPtr client_ptr, const String & bucket, const String & key, const String & version_id, bool for_disk_s3)
{
ProfileEvents::increment(ProfileEvents::S3HeadObject);
if (for_disk_s3)
@@ -865,7 +869,12 @@ namespace S3
if (!version_id.empty())
req.SetVersionId(version_id);
Aws::S3::Model::HeadObjectOutcome outcome = client_ptr->HeadObject(req);
return client_ptr->HeadObject(req);
}
S3::ObjectInfo getObjectInfo(ClientPtr client_ptr, const String & bucket, const String & key, const String & version_id, bool throw_on_error, bool for_disk_s3)
{
auto outcome = headObject(client_ptr, bucket, key, version_id, for_disk_s3);
if (outcome.IsSuccess())
{
@@ -879,11 +888,26 @@ namespace S3
return {};
}
size_t getObjectSize(std::shared_ptr<const Aws::S3::S3Client> client_ptr, const String & bucket, const String & key, const String & version_id, bool throw_on_error, bool for_disk_s3)
size_t getObjectSize(ClientPtr client_ptr, const String & bucket, const String & key, const String & version_id, bool throw_on_error, bool for_disk_s3)
{
return getObjectInfo(client_ptr, bucket, key, version_id, throw_on_error, for_disk_s3).size;
}
bool objectExists(ClientPtr client_ptr, const String & bucket, const String & key, const String & version_id, bool for_disk_s3)
{
auto outcome = headObject(client_ptr, bucket, key, version_id, for_disk_s3);
if (outcome.IsSuccess())
return true;
const auto & error = outcome.GetError();
if (isNotFoundError(error.GetErrorType()))
return false;
throw S3Exception(error.GetErrorType(),
"Failed to check existence of key {} in bucket {}: {}",
key, bucket, error.GetMessage());
}
}
}
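
Putting the new free functions together, a hypothetical call site (the parameter comments match the declarations above; whether a miss throws is controlled by throw_on_error):

/// Illustrative only: size lookup that returns 0 instead of throwing.
size_t size = DB::S3::getObjectSize(client, bucket, key,
    /* version_id */ "", /* throw_on_error */ false, /* for_disk_s3 */ false);

/// Existence check; throws only on errors other than not-found.
bool exists = DB::S3::objectExists(client, bucket, key);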

View File

@@ -13,20 +13,17 @@
#include <base/types.h>
#include <aws/core/Aws.h>
#include <aws/core/client/ClientConfiguration.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/S3Errors.h>
#include <Poco/URI.h>
#include <Common/Exception.h>
#include <Common/Throttler_fwd.h>
namespace Aws::S3
{
class S3Client;
}
namespace DB
{
namespace ErrorCodes
{
extern const int S3_ERROR;
@@ -130,16 +127,24 @@ struct ObjectInfo
time_t last_modification_time = 0;
};
S3::ObjectInfo getObjectInfo(std::shared_ptr<const Aws::S3::S3Client> client_ptr, const String & bucket, const String & key, const String & version_id, bool throw_on_error, bool for_disk_s3);
bool isNotFoundError(Aws::S3::S3Errors error);
size_t getObjectSize(std::shared_ptr<const Aws::S3::S3Client> client_ptr, const String & bucket, const String & key, const String & version_id, bool throw_on_error, bool for_disk_s3);
using ClientPtr = std::shared_ptr<const Aws::S3::S3Client>;
Aws::S3::Model::HeadObjectOutcome headObject(ClientPtr client_ptr, const String & bucket, const String & key, const String & version_id = "", bool for_disk_s3 = false);
S3::ObjectInfo getObjectInfo(ClientPtr client_ptr, const String & bucket, const String & key, const String & version_id, bool throw_on_error, bool for_disk_s3);
size_t getObjectSize(ClientPtr client_ptr, const String & bucket, const String & key, const String & version_id, bool throw_on_error, bool for_disk_s3);
bool objectExists(ClientPtr client_ptr, const String & bucket, const String & key, const String & version_id = "", bool for_disk_s3 = false);
}
#endif
namespace Poco::Util
{
class AbstractConfiguration;
class AbstractConfiguration;
};
namespace DB::S3

View File

@@ -184,17 +184,7 @@ void WriteBufferFromS3::finalizeImpl()
{
LOG_TRACE(log, "Checking object {} exists after upload", key);
Aws::S3::Model::HeadObjectRequest request;
request.SetBucket(bucket);
request.SetKey(key);
ProfileEvents::increment(ProfileEvents::S3HeadObject);
if (write_settings.for_object_storage)
ProfileEvents::increment(ProfileEvents::DiskS3HeadObject);
auto response = client_ptr->HeadObject(request);
auto response = S3::headObject(client_ptr, bucket, key, /* version_id */ "", /* for_disk_s3 */ write_settings.for_object_storage);
if (!response.IsSuccess())
throw S3Exception(fmt::format("Object {} from bucket {} disappeared immediately after upload, it's a bug in S3 or S3 API.", key, bucket), response.GetError().GetErrorType());
else

View File

@@ -1,44 +0,0 @@
#include "threadPoolCallbackRunner.h"
#include <Common/scope_guard_safe.h>
#include <Common/CurrentThread.h>
#include <Common/setThreadName.h>
#include <IO/AsynchronousReader.h>
#include <future>
namespace DB
{
template <typename Result> ThreadPoolCallbackRunner<Result> threadPoolCallbackRunner(ThreadPool & pool, const std::string & thread_name)
{
return [pool = &pool, thread_group = CurrentThread::getGroup(), thread_name](std::function<Result()> && callback, size_t priority) mutable -> std::future<Result>
{
auto task = std::make_shared<std::packaged_task<Result()>>([thread_group, thread_name, callback = std::move(callback)]() -> Result
{
if (thread_group)
CurrentThread::attachTo(thread_group);
SCOPE_EXIT_SAFE({
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
});
setThreadName(thread_name.data());
return callback();
});
auto future = task->get_future();
/// ThreadPool treats bigger values as higher priority, unlike the usual "smaller value is higher priority", hence the negation.
pool->scheduleOrThrow([task]{ (*task)(); }, -priority);
return future;
};
}
template ThreadPoolCallbackRunner<void> threadPoolCallbackRunner(ThreadPool & pool, const std::string & thread_name);
template ThreadPoolCallbackRunner<IAsynchronousReader::Result> threadPoolCallbackRunner(ThreadPool & pool, const std::string & thread_name);
}

View File

@@ -1,9 +1,11 @@
#pragma once
#include <Common/ThreadPool.h>
#include <Common/scope_guard_safe.h>
#include <Common/CurrentThread.h>
#include <Common/setThreadName.h>
#include <future>
namespace DB
{
@@ -13,6 +15,32 @@ using ThreadPoolCallbackRunner = std::function<std::future<Result>(std::function
/// Creates CallbackRunner that runs every callback with 'pool->scheduleOrThrow()'.
template <typename Result>
ThreadPoolCallbackRunner<Result> threadPoolCallbackRunner(ThreadPool & pool, const std::string & thread_name);
ThreadPoolCallbackRunner<Result> threadPoolCallbackRunner(ThreadPool & pool, const std::string & thread_name)
{
return [pool = &pool, thread_group = CurrentThread::getGroup(), thread_name](std::function<Result()> && callback, size_t priority) mutable -> std::future<Result>
{
auto task = std::make_shared<std::packaged_task<Result()>>([thread_group, thread_name, callback = std::move(callback)]() -> Result
{
if (thread_group)
CurrentThread::attachTo(thread_group);
SCOPE_EXIT_SAFE({
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
});
setThreadName(thread_name.data());
return callback();
});
auto future = task->get_future();
/// ThreadPool treats bigger values as higher priority, unlike the usual "smaller value is higher priority", hence the negation.
pool->scheduleOrThrow([task]{ (*task)(); }, -priority);
return future;
};
}
}
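
With the definition in the header, any translation unit can instantiate the runner for a new result type; this commit adds instantiations for Aws::S3::Model::ListObjectsV2Outcome and StorageS3Source::ReaderHolder without maintaining an explicit-instantiation list. A minimal usage sketch, assuming only the ThreadPool API used above:

#include <Interpreters/threadPoolCallbackRunner.h>

/// Illustrative only: run a callback on a pool and wait for its result.
ThreadPool pool(/* max_threads */ 4);
auto runner = DB::threadPoolCallbackRunner<int>(pool, "Example");
std::future<int> future = runner([] { return 42; }, /* priority */ 0);
int result = future.get(); /// 42; rethrows here if the callback threw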

View File

@@ -1,3 +1,4 @@
#include "base/types.h"
#include "config.h"
#include <Common/ProfileEvents.h>
#include "IO/ParallelReadBuffer.h"
@@ -14,7 +15,6 @@
#include <Interpreters/TreeRewriter.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTInsertQuery.h>
@@ -29,6 +29,10 @@
#include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/StorageURL.h>
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/ObjectStorages/StoredObject.h>
#include <IO/ReadBufferFromS3.h>
#include <IO/WriteBufferFromS3.h>
@@ -41,7 +45,6 @@
#include <QueryPipeline/narrowPipe.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <DataTypes/DataTypeString.h>
@@ -110,6 +113,8 @@ public:
, object_infos(object_infos_)
, read_keys(read_keys_)
, request_settings(request_settings_)
, list_objects_pool(1)
, list_objects_scheduler(threadPoolCallbackRunner<Aws::S3::Model::ListObjectsV2Outcome>(list_objects_pool, "ListObjects"))
{
if (globbed_uri.bucket.find_first_of("*?{") != globbed_uri.bucket.npos)
throw Exception("Expression can not have wildcards inside bucket name", ErrorCodes::UNEXPECTED_EXPRESSION);
@@ -119,17 +124,22 @@ public:
/// We don't have to list the bucket, because there are no asterisks.
if (key_prefix.size() == globbed_uri.key.size())
{
buffer.emplace_back(globbed_uri.key);
buffer.emplace_back(globbed_uri.key, std::nullopt);
buffer_iter = buffer.begin();
is_finished = true;
return;
}
request.SetBucket(globbed_uri.bucket);
request.SetPrefix(key_prefix);
outcome_future = listObjectsAsync();
/// Create a virtual block with one row to construct filter
if (query && virtual_header)
{
/// Append "key" column as the filter result
virtual_header.insert({ColumnString::create(), std::make_shared<DataTypeString>(), "_key"});
/// Append "idx" column as the filter result
virtual_header.insert({ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "_idx"});
auto block = virtual_header.cloneEmpty();
MutableColumns columns = block.mutateColumns();
@@ -139,23 +149,24 @@ public:
VirtualColumnUtils::prepareFilterBlockWithQuery(query, getContext(), block, filter_ast);
}
request.SetBucket(globbed_uri.bucket);
request.SetPrefix(key_prefix);
matcher = std::make_unique<re2::RE2>(makeRegexpPatternFromGlobs(globbed_uri.key));
recursive = globbed_uri.key == "/**";
fillInternalBufferAssumeLocked();
}
String next()
KeyWithInfo next()
{
std::lock_guard lock(mutex);
return nextAssumeLocked();
}
private:
~Impl()
{
list_objects_pool.wait();
}
String nextAssumeLocked()
private:
KeyWithInfo nextAssumeLocked()
{
if (buffer_iter != buffer.end())
{
@@ -168,7 +179,6 @@ private:
return {};
fillInternalBufferAssumeLocked();
return nextAssumeLocked();
}
@@ -176,21 +186,28 @@ private:
{
buffer.clear();
ProfileEvents::increment(ProfileEvents::S3ListObjects);
outcome = client.ListObjectsV2(request);
assert(outcome_future.valid());
auto outcome = outcome_future.get();
if (!outcome.IsSuccess())
throw Exception(ErrorCodes::S3_ERROR, "Could not list objects in bucket {} with prefix {}, S3 exception: {}, message: {}",
quoteString(request.GetBucket()), quoteString(request.GetPrefix()),
backQuote(outcome.GetError().GetExceptionName()), quoteString(outcome.GetError().GetMessage()));
/// GetIsTruncated() returns false when all objects have been returned.
is_finished = !outcome.GetResult().GetIsTruncated();
if (!is_finished)
outcome_future = listObjectsAsync();
const auto & result_batch = outcome.GetResult().GetContents();
if (filter_ast)
{
auto block = virtual_header.cloneEmpty();
MutableColumnPtr path_column;
MutableColumnPtr file_column;
MutableColumnPtr key_column = block.getByName("_key").column->assumeMutable();
MutableColumnPtr idx_column = block.getByName("_idx").column->assumeMutable();
if (block.has("_path"))
path_column = block.getByName("_path").column->assumeMutable();
@@ -198,29 +215,45 @@ private:
if (block.has("_file"))
file_column = block.getByName("_file").column->assumeMutable();
for (const auto & row : result_batch)
KeysWithInfo temp_buffer;
for (size_t i = 0; i < result_batch.size(); ++i)
{
const String & key = row.GetKey();
const auto & row = result_batch[i];
String key = row.GetKey();
if (recursive || re2::RE2::FullMatch(key, *matcher))
{
S3::ObjectInfo info =
{
.size = size_t(row.GetSize()),
.last_modification_time = row.GetLastModified().Millis() / 1000,
};
String path = fs::path(globbed_uri.bucket) / key;
if (object_infos)
(*object_infos)[path] = {.size = size_t(row.GetSize()), .last_modification_time = row.GetLastModified().Millis() / 1000};
(*object_infos)[path] = info;
String file = path.substr(path.find_last_of('/') + 1);
if (path_column)
path_column->insert(path);
if (file_column)
file_column->insert(file);
key_column->insert(key);
temp_buffer.emplace_back(std::move(key), std::move(info));
idx_column->insert(i);
}
}
VirtualColumnUtils::filterBlockWithQuery(query, block, getContext(), filter_ast);
const ColumnString & keys = typeid_cast<const ColumnString &>(*block.getByName("_key").column);
const auto & idxs = typeid_cast<const ColumnUInt64 &>(*block.getByName("_idx").column);
size_t rows = block.rows();
buffer.reserve(rows);
for (size_t i = 0; i < rows; ++i)
buffer.emplace_back(keys.getDataAt(i).toString());
{
UInt64 idx = idxs.getData()[i];
buffer.emplace_back(std::move(temp_buffer[idx]));
}
}
else
{
@@ -229,38 +262,69 @@ private:
{
String key = row.GetKey();
if (recursive || re2::RE2::FullMatch(key, *matcher))
buffer.emplace_back(std::move(key));
{
S3::ObjectInfo info =
{
.size = size_t(row.GetSize()),
.last_modification_time = row.GetLastModified().Millis() / 1000,
};
if (object_infos)
(*object_infos)[fs::path(globbed_uri.bucket) / key] = info;
buffer.emplace_back(std::move(key), std::move(info));
}
}
}
/// Set iterator only after the whole batch is processed
buffer_iter = buffer.begin();
request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
/// GetIsTruncated() returns false when all objects have been returned.
is_finished = !outcome.GetResult().GetIsTruncated();
if (read_keys)
read_keys->insert(read_keys->end(), buffer.begin(), buffer.end());
{
read_keys->reserve(read_keys->size() + buffer.size());
for (const auto & [key, _] : buffer)
read_keys->push_back(key);
}
}
std::future<Aws::S3::Model::ListObjectsV2Outcome> listObjectsAsync()
{
return list_objects_scheduler([this]
{
ProfileEvents::increment(ProfileEvents::S3ListObjects);
auto outcome = client.ListObjectsV2(request);
/// Outcome failure will be handled by the caller.
if (outcome.IsSuccess())
request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
return outcome;
}, 0);
}
std::mutex mutex;
Strings buffer;
Strings::iterator buffer_iter;
KeysWithInfo buffer;
KeysWithInfo::iterator buffer_iter;
Aws::S3::S3Client client;
S3::URI globbed_uri;
ASTPtr query;
Block virtual_header;
ASTPtr filter_ast;
Aws::S3::Model::ListObjectsV2Request request;
Aws::S3::Model::ListObjectsV2Outcome outcome;
std::unique_ptr<re2::RE2> matcher;
bool recursive{false};
bool is_finished{false};
std::unordered_map<String, S3::ObjectInfo> * object_infos;
Strings * read_keys;
Aws::S3::Model::ListObjectsV2Request request;
S3Settings::RequestSettings request_settings;
ThreadPool list_objects_pool;
ThreadPoolCallbackRunner<Aws::S3::Model::ListObjectsV2Outcome> list_objects_scheduler;
std::future<Aws::S3::Model::ListObjectsV2Outcome> outcome_future;
};
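
The iterator keeps exactly one ListObjectsV2 request in flight: the constructor issues the first request, and fillInternalBufferAssumeLocked re-arms the future before processing each batch, so network latency overlaps with regex matching and filtering. The same shape in miniature (illustrative, hypothetical names; the real code also throws if the outcome is not successful):

#include <future>

/// Illustrative only: wait for the batch requested earlier, then immediately
/// request the next one before the current batch is processed.
template <typename Outcome, typename Fetch>
Outcome getAndRearm(std::future<Outcome> & in_flight, Fetch && fetch, bool & finished)
{
    Outcome outcome = in_flight.get();
    finished = !outcome.GetResult().GetIsTruncated();
    if (!finished)
        in_flight = fetch(); /// overlaps the next request with processing
    return outcome;
}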
StorageS3Source::DisclosedGlobIterator::DisclosedGlobIterator(
@@ -276,7 +340,7 @@ StorageS3Source::DisclosedGlobIterator::DisclosedGlobIterator(
{
}
String StorageS3Source::DisclosedGlobIterator::next()
StorageS3Source::KeyWithInfo StorageS3Source::DisclosedGlobIterator::next()
{
return pimpl->next();
}
@@ -285,8 +349,16 @@ class StorageS3Source::KeysIterator::Impl : WithContext
{
public:
explicit Impl(
const std::vector<String> & keys_, const String & bucket_, ASTPtr query_, const Block & virtual_header_, ContextPtr context_)
: WithContext(context_), keys(keys_), bucket(bucket_), query(query_), virtual_header(virtual_header_)
const std::vector<String> & keys_,
const String & bucket_,
ASTPtr query_,
const Block & virtual_header_,
ContextPtr context_)
: WithContext(context_)
, keys(keys_)
, bucket(bucket_)
, query(query_)
, virtual_header(virtual_header_)
{
/// Create a virtual block with one row to construct filter
if (query && virtual_header)
@@ -340,12 +412,13 @@ public:
}
}
String next()
KeyWithInfo next()
{
size_t current_index = index.fetch_add(1, std::memory_order_relaxed);
if (current_index >= keys.size())
return "";
return keys[current_index];
return {};
return {keys[current_index], {}};
}
private:
@@ -358,12 +431,16 @@ private:
};
StorageS3Source::KeysIterator::KeysIterator(
const std::vector<String> & keys_, const String & bucket_, ASTPtr query, const Block & virtual_header, ContextPtr context)
const std::vector<String> & keys_,
const String & bucket_,
ASTPtr query,
const Block & virtual_header,
ContextPtr context)
: pimpl(std::make_shared<StorageS3Source::KeysIterator::Impl>(keys_, bucket_, query, virtual_header, context))
{
}
String StorageS3Source::KeysIterator::next()
StorageS3Source::KeyWithInfo StorageS3Source::KeysIterator::next()
{
return pimpl->next();
}
@@ -391,8 +468,7 @@ StorageS3Source::StorageS3Source(
const String & bucket_,
const String & version_id_,
std::shared_ptr<IteratorWrapper> file_iterator_,
const size_t download_thread_num_,
const std::unordered_map<String, S3::ObjectInfo> & object_infos_)
const size_t download_thread_num_)
: ISource(getHeader(sample_block_, requested_virtual_columns_))
, WithContext(context_)
, name(std::move(name_))
@@ -409,9 +485,12 @@ StorageS3Source::StorageS3Source(
, requested_virtual_columns(requested_virtual_columns_)
, file_iterator(file_iterator_)
, download_thread_num(download_thread_num_)
, object_infos(object_infos_)
, create_reader_pool(1)
, create_reader_scheduler(threadPoolCallbackRunner<ReaderHolder>(create_reader_pool, "CreateS3Reader"))
{
initialize();
reader = createReader();
if (reader)
reader_future = createReaderAsync();
}
@@ -423,17 +502,21 @@ void StorageS3Source::onCancel()
}
bool StorageS3Source::initialize()
StorageS3Source::ReaderHolder StorageS3Source::createReader()
{
String current_key = (*file_iterator)();
auto [current_key, info] = (*file_iterator)();
if (current_key.empty())
return false;
return {};
file_path = fs::path(bucket) / current_key;
size_t object_size = info
? info->size
: S3::getObjectSize(client, bucket, current_key, version_id, false, false);
int zstd_window_log_max = static_cast<int>(getContext()->getSettingsRef().zstd_window_log_max);
read_buf = wrapReadBufferWithCompressionMethod(
createS3ReadBuffer(current_key), chooseCompressionMethod(current_key, compression_hint), zstd_window_log_max);
auto read_buf = wrapReadBufferWithCompressionMethod(
createS3ReadBuffer(current_key, object_size),
chooseCompressionMethod(current_key, compression_hint),
zstd_window_log_max);
auto input_format = getContext()->getInputFormat(format, *read_buf, sample_block, max_block_size, format_settings);
QueryPipelineBuilder builder;
@@ -446,32 +529,36 @@ bool StorageS3Source::initialize()
{ return std::make_shared<AddingDefaultsTransform>(header, columns_desc, *input_format, getContext()); });
}
pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
auto pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
auto current_reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
return true;
return ReaderHolder{fs::path(bucket) / current_key, std::move(read_buf), std::move(pipeline), std::move(current_reader)};
}
std::unique_ptr<ReadBuffer> StorageS3Source::createS3ReadBuffer(const String & key)
std::future<StorageS3Source::ReaderHolder> StorageS3Source::createReaderAsync()
{
size_t object_size;
auto it = object_infos.find(fs::path(bucket) / key);
if (it != object_infos.end())
object_size = it->second.size;
else
object_size = DB::S3::getObjectSize(client, bucket, key, version_id, false, false);
return create_reader_scheduler([this] { return createReader(); }, 0);
}
std::unique_ptr<ReadBuffer> StorageS3Source::createS3ReadBuffer(const String & key, size_t object_size)
{
auto read_settings = getContext()->getReadSettings().adjustBufferSize(object_size);
read_settings.enable_filesystem_cache = false;
auto download_buffer_size = getContext()->getSettings().max_download_buffer_size;
const bool use_parallel_download = download_buffer_size > 0 && download_thread_num > 1;
const bool object_too_small = object_size < download_thread_num * download_buffer_size;
if (!use_parallel_download || object_too_small)
{
LOG_TRACE(log, "Downloading object of size {} from S3 in single thread", object_size);
return std::make_unique<ReadBufferFromS3>(client, bucket, key, version_id, request_settings, getContext()->getReadSettings());
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
return createAsyncS3ReadBuffer(key, read_settings, object_size);
return std::make_unique<ReadBufferFromS3>(client, bucket, key, version_id, request_settings, read_settings);
}
assert(object_size > 0);
if (download_buffer_size < DBMS_DEFAULT_BUFFER_SIZE)
{
LOG_WARNING(log, "Downloading buffer {} bytes too small, set at least {} bytes", download_buffer_size, DBMS_DEFAULT_BUFFER_SIZE);
@@ -479,13 +566,55 @@ std::unique_ptr<ReadBuffer> StorageS3Source::createS3ReadBuffer(const String & k
}
auto factory = std::make_unique<ReadBufferS3Factory>(
client, bucket, key, version_id, download_buffer_size, object_size, request_settings, getContext()->getReadSettings());
LOG_TRACE(
log, "Downloading from S3 in {} threads. Object size: {}, Range size: {}.", download_thread_num, object_size, download_buffer_size);
client, bucket, key, version_id, download_buffer_size, object_size, request_settings, read_settings);
LOG_TRACE(log,
"Downloading from S3 in {} threads. Object size: {}, Range size: {}.",
download_thread_num, object_size, download_buffer_size);
return std::make_unique<ParallelReadBuffer>(std::move(factory), threadPoolCallbackRunner<void>(IOThreadPool::get(), "S3ParallelRead"), download_thread_num);
}
std::unique_ptr<ReadBuffer> StorageS3Source::createAsyncS3ReadBuffer(
const String & key, const ReadSettings & read_settings, size_t object_size)
{
auto read_buffer_creator =
[this, read_settings]
(const std::string & path, size_t read_until_position) -> std::shared_ptr<ReadBufferFromFileBase>
{
return std::make_shared<ReadBufferFromS3>(
client,
bucket,
path,
version_id,
request_settings,
read_settings,
/* use_external_buffer */true,
/* offset */0,
read_until_position,
/* restricted_seek */true);
};
auto s3_impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
StoredObjects{StoredObject{key, object_size}},
read_settings);
auto & pool_reader = getContext()->getThreadPoolReader(Context::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
auto async_reader = std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(pool_reader, read_settings, std::move(s3_impl));
async_reader->setReadUntilEnd();
if (read_settings.remote_fs_prefetch)
async_reader->prefetch();
return async_reader;
}
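
For orientation, the buffer stack assembled by createAsyncS3ReadBuffer, outermost first (a summary of the code above, not a new API):

/// AsynchronousReadIndirectBufferFromRemoteFS -- prefetches and reads on the
///     ASYNCHRONOUS_REMOTE_FS_READER thread pool
///   -> ReadBufferFromRemoteFSGather -- maps StoredObject{key, object_size}
///        onto ranged reads via read_buffer_creator
///     -> ReadBufferFromS3 -- one ranged GetObject stream per call, created
///          with use_external_buffer = true and restricted_seek = true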
StorageS3Source::~StorageS3Source()
{
create_reader_pool.wait();
}
String StorageS3Source::getName() const
{
return name;
@@ -503,6 +632,7 @@ Chunk StorageS3Source::generate()
{
UInt64 num_rows = chunk.getNumRows();
const auto & file_path = reader.getPath();
for (const auto & virtual_column : requested_virtual_columns)
{
if (virtual_column.name == "_path")
@@ -522,52 +652,19 @@
{
std::lock_guard lock(reader_mutex);
reader.reset();
pipeline.reset();
read_buf.reset();
if (!initialize())
assert(reader_future.valid());
reader = reader_future.get();
if (!reader)
break;
reader_future = createReaderAsync();
}
}
return {};
}
static bool checkIfObjectExists(const std::shared_ptr<const Aws::S3::S3Client> & client, const String & bucket, const String & key)
{
bool is_finished = false;
Aws::S3::Model::ListObjectsV2Request request;
Aws::S3::Model::ListObjectsV2Outcome outcome;
request.SetBucket(bucket);
request.SetPrefix(key);
while (!is_finished)
{
ProfileEvents::increment(ProfileEvents::S3ListObjects);
outcome = client->ListObjectsV2(request);
if (!outcome.IsSuccess())
throw Exception(
ErrorCodes::S3_ERROR,
"Could not list objects in bucket {} with key {}, S3 exception: {}, message: {}",
quoteString(bucket),
quoteString(key),
backQuote(outcome.GetError().GetExceptionName()),
quoteString(outcome.GetError().GetMessage()));
const auto & result_batch = outcome.GetResult().GetContents();
for (const auto & obj : result_batch)
{
if (obj.GetKey() == key)
return true;
}
request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
is_finished = !outcome.GetResult().GetIsTruncated();
}
return false;
}
class StorageS3Sink : public SinkToStorage
{
public:
@@ -777,7 +874,9 @@ StorageS3::StorageS3(
distributed_processing_,
is_key_with_globs,
format_settings,
context_);
context_,
&object_infos);
storage_metadata.setColumns(columns);
}
else
@@ -811,23 +910,35 @@ std::shared_ptr<StorageS3Source::IteratorWrapper> StorageS3::createFileIterator(
if (distributed_processing)
{
return std::make_shared<StorageS3Source::IteratorWrapper>(
[callback = local_context->getReadTaskCallback()]() -> String {
return callback();
});
[callback = local_context->getReadTaskCallback()]() -> StorageS3Source::KeyWithInfo
{
return {callback(), {}};
});
}
else if (is_key_with_globs)
{
/// Iterate through disclosed globs and make a source for each file
auto glob_iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(
*s3_configuration.client, s3_configuration.uri, query, virtual_block, local_context, object_infos, read_keys, s3_configuration.request_settings);
*s3_configuration.client,
s3_configuration.uri,
query,
virtual_block,
local_context,
object_infos,
read_keys,
s3_configuration.request_settings);
return std::make_shared<StorageS3Source::IteratorWrapper>([glob_iterator]() { return glob_iterator->next(); });
}
else
{
auto keys_iterator
= std::make_shared<StorageS3Source::KeysIterator>(keys, s3_configuration.uri.bucket, query, virtual_block, local_context);
auto keys_iterator = std::make_shared<StorageS3Source::KeysIterator>(
keys, s3_configuration.uri.bucket,
query, virtual_block, local_context);
if (read_keys)
*read_keys = keys;
return std::make_shared<StorageS3Source::IteratorWrapper>([keys_iterator]() { return keys_iterator->next(); });
}
}
@@ -915,8 +1026,7 @@ Pipe StorageS3::read(
s3_configuration.uri.bucket,
s3_configuration.uri.version_id,
iterator_wrapper,
max_download_threads,
object_infos));
max_download_threads));
}
auto pipe = Pipe::unitePipes(std::move(pipes));
@@ -956,7 +1066,7 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
bool truncate_in_insert = local_context->getSettingsRef().s3_truncate_on_insert;
if (!truncate_in_insert && checkIfObjectExists(s3_configuration.client, s3_configuration.uri.bucket, keys.back()))
if (!truncate_in_insert && S3::objectExists(s3_configuration.client, s3_configuration.uri.bucket, keys.back(), s3_configuration.uri.version_id))
{
if (local_context->getSettingsRef().s3_create_new_file_on_insert)
{
@@ -968,7 +1078,7 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
new_key = keys[0].substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : keys[0].substr(pos));
++index;
}
while (checkIfObjectExists(s3_configuration.client, s3_configuration.uri.bucket, new_key));
while (S3::objectExists(s3_configuration.client, s3_configuration.uri.bucket, new_key, s3_configuration.uri.version_id));
keys.push_back(new_key);
}
else
@@ -992,7 +1102,6 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
}
}
void StorageS3::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &, ContextPtr local_context, TableExclusiveLockHolder &)
{
updateS3Configuration(local_context, s3_configuration);
@@ -1202,7 +1311,7 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
ReadBufferIterator read_buffer_iterator = [&, first = true](ColumnsDescription & cached_columns) mutable -> std::unique_ptr<ReadBuffer>
{
auto key = (*file_iterator)();
auto [key, _] = (*file_iterator)();
if (key.empty())
{
@@ -1371,6 +1480,7 @@ std::optional<ColumnsDescription> StorageS3::tryGetColumnsFromCache(
if (info.last_modification_time)
return info.last_modification_time;
return std::nullopt;
};

View File

@@ -12,11 +12,13 @@
#include <Storages/StorageS3Settings.h>
#include <Processors/ISource.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Poco/URI.h>
#include <Common/logger_useful.h>
#include <IO/S3Common.h>
#include <IO/CompressionMethod.h>
#include <Interpreters/Context.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <Storages/ExternalDataSourceConfiguration.h>
#include <Storages/Cache/SchemaCache.h>
@@ -33,6 +35,21 @@ class StorageS3SequentialSource;
class StorageS3Source : public ISource, WithContext
{
public:
struct KeyWithInfo
{
KeyWithInfo() = default;
KeyWithInfo(String key_, std::optional<S3::ObjectInfo> info_)
: key(std::move(key_)), info(std::move(info_))
{
}
String key;
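/// Filled when the key comes from a ListObjects batch; when empty, the size
/// is resolved lazily with a separate HeadObject (see StorageS3Source::createReader).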
std::optional<S3::ObjectInfo> info;
};
using KeysWithInfo = std::vector<KeyWithInfo>;
class DisclosedGlobIterator
{
public:
@@ -46,7 +63,7 @@ public:
Strings * read_keys_ = nullptr,
const S3Settings::RequestSettings & request_settings_ = {});
String next();
KeyWithInfo next();
private:
class Impl;
@@ -57,9 +74,14 @@ public:
class KeysIterator
{
public:
explicit KeysIterator(
const std::vector<String> & keys_, const String & bucket_, ASTPtr query, const Block & virtual_header, ContextPtr context);
String next();
KeysIterator(
const std::vector<String> & keys_,
const String & bucket_,
ASTPtr query,
const Block & virtual_header,
ContextPtr context);
KeyWithInfo next();
private:
class Impl;
@@ -67,7 +89,7 @@ public:
std::shared_ptr<Impl> pimpl;
};
using IteratorWrapper = std::function<String()>;
using IteratorWrapper = std::function<KeyWithInfo()>;
static Block getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns);
@@ -86,8 +108,9 @@ public:
const String & bucket,
const String & version_id,
std::shared_ptr<IteratorWrapper> file_iterator_,
size_t download_thread_num,
const std::unordered_map<String, S3::ObjectInfo> & object_infos_);
size_t download_thread_num);
~StorageS3Source() override;
String getName() const override;
@@ -99,7 +122,6 @@ private:
String name;
String bucket;
String version_id;
String file_path;
String format;
ColumnsDescription columns_desc;
UInt64 max_block_size;
@@ -109,10 +131,37 @@ private:
Block sample_block;
std::optional<FormatSettings> format_settings;
struct ReaderHolder
{
public:
ReaderHolder(
String path_,
std::unique_ptr<ReadBuffer> read_buf_,
std::unique_ptr<QueryPipeline> pipeline_,
std::unique_ptr<PullingPipelineExecutor> reader_)
: path(std::move(path_))
, read_buf(std::move(read_buf_))
, pipeline(std::move(pipeline_))
, reader(std::move(reader_))
{
}
ReaderHolder() = default;
explicit operator bool() const { return reader != nullptr; }
PullingPipelineExecutor * operator->() { return reader.get(); }
const PullingPipelineExecutor * operator->() const { return reader.get(); }
const String & getPath() const { return path; }
private:
String path;
std::unique_ptr<ReadBuffer> read_buf;
std::unique_ptr<QueryPipeline> pipeline;
std::unique_ptr<PullingPipelineExecutor> reader;
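/// Members are declared in construction order; destruction runs in reverse,
/// so the executor is destroyed before the pipeline and the pipeline before
/// the buffer it reads from.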
};
ReaderHolder reader;
std::unique_ptr<ReadBuffer> read_buf;
std::unique_ptr<QueryPipeline> pipeline;
std::unique_ptr<PullingPipelineExecutor> reader;
/// onCancel and generate can be called concurrently
std::mutex reader_mutex;
std::vector<NameAndTypePair> requested_virtual_columns;
@@ -121,12 +170,16 @@ private:
Poco::Logger * log = &Poco::Logger::get("StorageS3Source");
std::unordered_map<String, S3::ObjectInfo> object_infos;
ThreadPool create_reader_pool;
ThreadPoolCallbackRunner<ReaderHolder> create_reader_scheduler;
std::future<ReaderHolder> reader_future;
/// Recreate ReadBuffer and Pipeline for each file.
bool initialize();
ReaderHolder createReader();
std::future<ReaderHolder> createReaderAsync();
std::unique_ptr<ReadBuffer> createS3ReadBuffer(const String & key);
std::unique_ptr<ReadBuffer> createS3ReadBuffer(const String & key, size_t object_size);
std::unique_ptr<ReadBuffer> createAsyncS3ReadBuffer(const String & key, const ReadSettings & read_settings, size_t object_size);
};
/**

View File

@@ -100,7 +100,8 @@ Pipe StorageS3Cluster::read(
auto iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(
*s3_configuration.client, s3_configuration.uri, query_info.query, virtual_block, context);
auto callback = std::make_shared<StorageS3Source::IteratorWrapper>([iterator]() mutable -> String { return iterator->next(); });
auto callback = std::make_shared<TaskIterator>([iterator] { return iterator->next().key; });
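/// Only the key is sent through the task callback; the per-key object info
/// cached by the iterator is not propagated to the workers.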
/// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)
Block header =

View File

@@ -144,6 +144,7 @@ bool prepareFilterBlockWithQuery(const ASTPtr & query, ContextPtr context, Block
else
const_columns[i] = ColumnConst::create(columns[i]->cloneResized(1), 1);
}
block.setColumns(const_columns);
bool unmodified = true;