ClickHouse/src/Storages/StorageS3.cpp


#include "config.h"
#if USE_AWS_S3

#include <Common/isValidUTF8.h>

#include <IO/S3Common.h>
#include <IO/S3/Requests.h>
#include <IO/ParallelReadBuffer.h>
#include <IO/SharedThreadPools.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>

#include <Interpreters/TreeRewriter.h>
#include <Interpreters/evaluateConstantExpression.h>

#include <Parsers/ASTFunction.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTCreateQuery.h>

#include <Storages/StorageFactory.h>
#include <Storages/StorageS3.h>
#include <Storages/StorageS3Settings.h>
#include <Storages/StorageSnapshot.h>
#include <Storages/PartitionedSink.h>
#include <Storages/VirtualColumnUtils.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/StorageURL.h>
#include <Storages/NamedCollectionsHelpers.h>

#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/ObjectStorages/StoredObject.h>

#include <IO/ReadBufferFromS3.h>
#include <IO/WriteBufferFromS3.h>

#include <Formats/FormatFactory.h>
#include <Formats/ReadSchemaUtils.h>

#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Processors/Transforms/ExtractColumnsTransform.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Formats/IInputFormat.h>
#include <Processors/Sources/ConstChunkGenerator.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/QueryPlan/SourceStepWithFilter.h>

#include <QueryPipeline/QueryPipelineBuilder.h>

#include <Planner/Utils.h>
#include <Analyzer/QueryNode.h>

#include <DataTypes/DataTypeString.h>

#include <aws/core/auth/AWSCredentials.h>

#include <Common/NamedCollections/NamedCollections.h>
#include <Common/parseGlobs.h>
#include <Common/quoteString.h>
#include <Common/CurrentMetrics.h>
#include <Common/ProfileEvents.h>

#include <Processors/ISource.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <QueryPipeline/Pipe.h>

#include <filesystem>

#include <boost/algorithm/string.hpp>

#ifdef __clang__
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#endif
#include <re2/re2.h>
#ifdef __clang__
# pragma clang diagnostic pop
#endif

namespace fs = std::filesystem;

namespace CurrentMetrics
{
    extern const Metric StorageS3Threads;
    extern const Metric StorageS3ThreadsActive;
    extern const Metric StorageS3ThreadsScheduled;
}

namespace ProfileEvents
{
    extern const Event S3DeleteObjects;
    extern const Event S3ListObjects;
    extern const Event EngineFileLikeReadFiles;
}

namespace DB
{

static const std::unordered_set<std::string_view> required_configuration_keys = {
    "url",
};
static const std::unordered_set<std::string_view> optional_configuration_keys = {
    "format",
    "compression",
    "compression_method",
    "structure",
    "access_key_id",
    "secret_access_key",
    "session_token",
    "filename",
    "use_environment_credentials",
    "max_single_read_retries",
    "min_upload_part_size",
    "upload_part_size_multiply_factor",
    "upload_part_size_multiply_parts_count_threshold",
    "max_single_part_upload_size",
    "max_connections",
    "expiration_window_seconds",
    "no_sign_request"
};

namespace ErrorCodes
{
    extern const int CANNOT_PARSE_TEXT;
    extern const int BAD_ARGUMENTS;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int S3_ERROR;
    extern const int UNEXPECTED_EXPRESSION;
    extern const int DATABASE_ACCESS_DENIED;
    extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
    extern const int CANNOT_DETECT_FORMAT;
    extern const int NOT_IMPLEMENTED;
    extern const int CANNOT_COMPILE_REGEXP;
    extern const int FILE_DOESNT_EXIST;
}

class ReadFromStorageS3Step : public SourceStepWithFilter
{
public:
    std::string getName() const override { return "ReadFromStorageS3Step"; }

    void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;

    void applyFilters() override;

    ReadFromStorageS3Step(
        Block sample_block,
        const Names & column_names_,
        StorageSnapshotPtr storage_snapshot_,
        StorageS3 & storage_,
        ReadFromFormatInfo read_from_format_info_,
        bool need_only_count_,
        ContextPtr context_,
        size_t max_block_size_,
        size_t num_streams_)
        : SourceStepWithFilter(DataStream{.header = std::move(sample_block)})
        , column_names(column_names_)
        , storage_snapshot(std::move(storage_snapshot_))
        , storage(storage_)
        , read_from_format_info(std::move(read_from_format_info_))
        , need_only_count(need_only_count_)
        , local_context(std::move(context_))
        , max_block_size(max_block_size_)
        , num_streams(num_streams_)
    {
        query_configuration = storage.updateConfigurationAndGetCopy(local_context);
        virtual_columns = storage.getVirtuals();
    }

private:
    Names column_names;
    StorageSnapshotPtr storage_snapshot;
    StorageS3 & storage;
    ReadFromFormatInfo read_from_format_info;
    bool need_only_count;
    StorageS3::Configuration query_configuration;
    NamesAndTypesList virtual_columns;
    ContextPtr local_context;

    size_t max_block_size;
    size_t num_streams;

    std::shared_ptr<StorageS3Source::IIterator> iterator_wrapper;

    void createIterator(const ActionsDAG::Node * predicate);
};


class IOutputFormat;
using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
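
/// Illustrative note (not from the original source): for a globbed URI such as
/// "bucket/data/year=*/part-{1..3}.csv", the Impl below lists the bucket under the literal
/// prefix "data/year=" and keeps only the keys matching the regex produced by
/// makeRegexpPatternFromGlobs() (roughly, '*' -> "[^/]*", '?' -> "[^/]", "{1..3}" -> "(1|2|3)").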
class StorageS3Source::DisclosedGlobIterator::Impl : WithContext
{
public:
    Impl(
        const S3::Client & client_,
        const S3::URI & globbed_uri_,
        const ActionsDAG::Node * predicate,
        const NamesAndTypesList & virtual_columns_,
        ContextPtr context_,
        KeysWithInfo * read_keys_,
        const S3Settings::RequestSettings & request_settings_,
        std::function<void(FileProgress)> file_progress_callback_)
        : WithContext(context_)
        , client(client_.clone())
        , globbed_uri(globbed_uri_)
        , virtual_columns(virtual_columns_)
        , read_keys(read_keys_)
        , request_settings(request_settings_)
        , list_objects_pool(CurrentMetrics::StorageS3Threads, CurrentMetrics::StorageS3ThreadsActive, CurrentMetrics::StorageS3ThreadsScheduled, 1)
        , list_objects_scheduler(threadPoolCallbackRunner<ListObjectsOutcome>(list_objects_pool, "ListObjects"))
        , file_progress_callback(file_progress_callback_)
    {
        if (globbed_uri.bucket.find_first_of("*?{") != globbed_uri.bucket.npos)
            throw Exception(ErrorCodes::UNEXPECTED_EXPRESSION, "Expression cannot have wildcards inside the bucket name");

        const String key_prefix = globbed_uri.key.substr(0, globbed_uri.key.find_first_of("*?{"));

        /// We don't have to list the bucket, because there are no glob characters in the key.
        if (key_prefix.size() == globbed_uri.key.size())
        {
            buffer.emplace_back(std::make_shared<KeyWithInfo>(globbed_uri.key, std::nullopt));
            buffer_iter = buffer.begin();
            is_finished = true;
            return;
        }

        request.SetBucket(globbed_uri.bucket);
        request.SetPrefix(key_prefix);
        request.SetMaxKeys(static_cast<int>(request_settings.list_object_keys_size));

        outcome_future = listObjectsAsync();

        matcher = std::make_unique<re2::RE2>(makeRegexpPatternFromGlobs(globbed_uri.key));
        if (!matcher->ok())
            throw Exception(ErrorCodes::CANNOT_COMPILE_REGEXP,
                "Cannot compile regex from glob ({}): {}", globbed_uri.key, matcher->error());

        recursive = globbed_uri.key == "/**";

        filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);
        fillInternalBufferAssumeLocked();
    }

    KeyWithInfoPtr next(size_t)
    {
        std::lock_guard lock(mutex);
        return nextAssumeLocked();
    }

    size_t objectsCount()
    {
        return buffer.size();
    }

    ~Impl()
    {
        list_objects_pool.wait();
    }

private:
    using ListObjectsOutcome = Aws::S3::Model::ListObjectsV2Outcome;

    KeyWithInfoPtr nextAssumeLocked()
    {
        if (buffer_iter != buffer.end())
        {
            auto answer = *buffer_iter;
            ++buffer_iter;

            /// If the URL doesn't contain globs, we didn't list the S3 bucket and so have no object info for the key.
            /// In that case we fetch the object info lazily here, on the 'next()' request.
            if (!answer->info)
            {
                answer->info = S3::getObjectInfo(*client, globbed_uri.bucket, answer->key, globbed_uri.version_id, request_settings);
                if (file_progress_callback)
                    file_progress_callback(FileProgress(0, answer->info->size));
            }

            return answer;
        }

        if (is_finished)
            return {};

        try
        {
            fillInternalBufferAssumeLocked();
        }
        catch (...)
        {
            /// If an exception is thrown while listing a new batch of files,
            /// the iterator may be left partially initialized, and using it further may lead to UB.
            /// The iterator is used by several processors from several threads, and
            /// it may take some time to stop those processors, so they
            /// may still touch this iterator after the exception has been thrown.
            /// To avoid this UB, reset the buffer and return defaults for further calls.
            is_finished = true;
            buffer.clear();
            buffer_iter = buffer.begin();
            throw;
        }

        return nextAssumeLocked();
    }

    void fillInternalBufferAssumeLocked()
    {
        buffer.clear();
        assert(outcome_future.valid());
        auto outcome = outcome_future.get();

        if (!outcome.IsSuccess())
        {
            throw S3Exception(outcome.GetError().GetErrorType(), "Could not list objects in bucket {} with prefix {}, S3 exception: {}, message: {}",
                            quoteString(request.GetBucket()), quoteString(request.GetPrefix()),
                            backQuote(outcome.GetError().GetExceptionName()), quoteString(outcome.GetError().GetMessage()));
        }

        const auto & result_batch = outcome.GetResult().GetContents();

        /// GetIsTruncated() returns false once all objects have been returned.
        is_finished = !outcome.GetResult().GetIsTruncated();

        if (!is_finished)
        {
            /// Even if the previous task has finished, its thread may not have been returned to the pool yet.
            /// So wait until it is freed before scheduling a new task.
            list_objects_pool.wait();
            outcome_future = listObjectsAsync();
        }

        if (request_settings.throw_on_zero_files_match && result_batch.empty())
            throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "Cannot match any files using prefix {}", request.GetPrefix());

        KeysWithInfo temp_buffer;
        temp_buffer.reserve(result_batch.size());

        for (const auto & row : result_batch)
        {
            String key = row.GetKey();
            if (recursive || re2::RE2::FullMatch(key, *matcher))
            {
                S3::ObjectInfo info =
                {
                    .size = size_t(row.GetSize()),
                    .last_modification_time = row.GetLastModified().Millis() / 1000,
                };

                temp_buffer.emplace_back(std::make_shared<KeyWithInfo>(std::move(key), std::move(info)));
            }
        }

        if (temp_buffer.empty())
        {
            buffer_iter = buffer.begin();
            return;
        }

        if (filter_dag)
        {
            std::vector<String> paths;
            paths.reserve(temp_buffer.size());
            for (const auto & key_with_info : temp_buffer)
                paths.push_back(fs::path(globbed_uri.bucket) / key_with_info->key);

            VirtualColumnUtils::filterByPathOrFile(temp_buffer, paths, filter_dag, virtual_columns, getContext());
        }

        buffer = std::move(temp_buffer);

        if (file_progress_callback)
        {
            for (const auto & key_with_info : buffer)
                file_progress_callback(FileProgress(0, key_with_info->info->size));
        }

        /// Set the iterator only after the whole batch is processed.
        buffer_iter = buffer.begin();

        if (read_keys)
            read_keys->insert(read_keys->end(), buffer.begin(), buffer.end());
    }
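
    /// Note (descriptive, not from the original source): listing is paginated. Each ListObjectsV2
    /// call returns at most list_object_keys_size keys; on success the continuation token is stored
    /// back into `request`, so the next scheduled call fetches the following page of the listing.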
    std::future<ListObjectsOutcome> listObjectsAsync()
    {
        return list_objects_scheduler([this]
        {
            ProfileEvents::increment(ProfileEvents::S3ListObjects);
            auto outcome = client->ListObjectsV2(request);

            /// Outcome failure will be handled on the caller side.
            if (outcome.IsSuccess())
                request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());

            return outcome;
        }, Priority{});
    }

    std::mutex mutex;

    KeysWithInfo buffer;
    KeysWithInfo::iterator buffer_iter;

    std::unique_ptr<S3::Client> client;
    S3::URI globbed_uri;
    ASTPtr query;
    NamesAndTypesList virtual_columns;
    ActionsDAGPtr filter_dag;
    std::unique_ptr<re2::RE2> matcher;
    bool recursive{false};
    bool is_finished{false};
    KeysWithInfo * read_keys;

    S3::ListObjectsV2Request request;
    S3Settings::RequestSettings request_settings;

    ThreadPool list_objects_pool;
    ThreadPoolCallbackRunner<ListObjectsOutcome> list_objects_scheduler;
    std::future<ListObjectsOutcome> outcome_future;
    std::function<void(FileProgress)> file_progress_callback;
};

StorageS3Source::DisclosedGlobIterator::DisclosedGlobIterator(
    const S3::Client & client_,
    const S3::URI & globbed_uri_,
    const ActionsDAG::Node * predicate,
    const NamesAndTypesList & virtual_columns_,
    const ContextPtr & context,
    KeysWithInfo * read_keys_,
    const S3Settings::RequestSettings & request_settings_,
    std::function<void(FileProgress)> file_progress_callback_)
    : pimpl(std::make_shared<StorageS3Source::DisclosedGlobIterator::Impl>(client_, globbed_uri_, predicate, virtual_columns_, context, read_keys_, request_settings_, file_progress_callback_))
{
}

StorageS3Source::KeyWithInfoPtr StorageS3Source::DisclosedGlobIterator::next(size_t idx) /// NOLINT
{
    return pimpl->next(idx);
}

size_t StorageS3Source::DisclosedGlobIterator::estimatedKeysCount()
{
    return pimpl->objectsCount();
}

class StorageS3Source::KeysIterator::Impl
{
public:
    explicit Impl(
        const S3::Client & client_,
        const std::string & version_id_,
        const std::vector<String> & keys_,
        const String & bucket_,
        const S3Settings::RequestSettings & request_settings_,
        KeysWithInfo * read_keys_,
        std::function<void(FileProgress)> file_progress_callback_)
        : keys(keys_)
        , client(client_.clone())
        , version_id(version_id_)
        , bucket(bucket_)
        , request_settings(request_settings_)
        , file_progress_callback(file_progress_callback_)
    {
        if (read_keys_)
        {
            for (const auto & key : keys)
                read_keys_->push_back(std::make_shared<KeyWithInfo>(key));
        }
    }

    KeyWithInfoPtr next(size_t)
    {
        size_t current_index = index.fetch_add(1, std::memory_order_relaxed);
        if (current_index >= keys.size())
            return {};
        auto key = keys[current_index];
        std::optional<S3::ObjectInfo> info;
        if (file_progress_callback)
        {
            info = S3::getObjectInfo(*client, bucket, key, version_id, request_settings);
            file_progress_callback(FileProgress(0, info->size));
        }

        return std::make_shared<KeyWithInfo>(key, info);
    }

    size_t objectsCount()
    {
        return keys.size();
    }

private:
    Strings keys;
    std::atomic_size_t index = 0;
    std::unique_ptr<S3::Client> client;
    String version_id;
    String bucket;
    S3Settings::RequestSettings request_settings;
    std::function<void(FileProgress)> file_progress_callback;
};

StorageS3Source::KeysIterator::KeysIterator(
    const S3::Client & client_,
    const std::string & version_id_,
    const std::vector<String> & keys_,
    const String & bucket_,
    const S3Settings::RequestSettings & request_settings_,
    KeysWithInfo * read_keys,
    std::function<void(FileProgress)> file_progress_callback_)
    : pimpl(std::make_shared<StorageS3Source::KeysIterator::Impl>(
        client_, version_id_, keys_, bucket_, request_settings_,
        read_keys, file_progress_callback_))
{
}

StorageS3Source::KeyWithInfoPtr StorageS3Source::KeysIterator::next(size_t idx) /// NOLINT
{
    return pimpl->next(idx);
}

size_t StorageS3Source::KeysIterator::estimatedKeysCount()
{
    return pimpl->objectsCount();
}
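
/// Note (descriptive, not from the original source): ReadTaskIterator serves the distributed
/// (s3Cluster-style) mode. Instead of listing S3 itself, each worker pulls the next key to
/// process from the query initiator through the ReadTaskCallback; the constructor pre-fetches
/// up to max_threads_count keys in parallel so the first reads do not wait on the callback.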
StorageS3Source::ReadTaskIterator::ReadTaskIterator(
    const DB::ReadTaskCallback & callback_,
    size_t max_threads_count)
    : callback(callback_)
{
    ThreadPool pool(CurrentMetrics::StorageS3Threads, CurrentMetrics::StorageS3ThreadsActive, CurrentMetrics::StorageS3ThreadsScheduled, max_threads_count);
    auto pool_scheduler = threadPoolCallbackRunner<String>(pool, "S3ReadTaskItr");

    std::vector<std::future<String>> keys;
    keys.reserve(max_threads_count);
    for (size_t i = 0; i < max_threads_count; ++i)
        keys.push_back(pool_scheduler([this] { return callback(); }, Priority{}));

    pool.wait();
    buffer.reserve(max_threads_count);
    for (auto & key_future : keys)
        buffer.emplace_back(std::make_shared<KeyWithInfo>(key_future.get(), std::nullopt));
}

StorageS3Source::KeyWithInfoPtr StorageS3Source::ReadTaskIterator::next(size_t) /// NOLINT
{
    size_t current_index = index.fetch_add(1, std::memory_order_relaxed);
    if (current_index >= buffer.size())
        return std::make_shared<KeyWithInfo>(callback());

    return buffer[current_index];
}

size_t StorageS3Source::ReadTaskIterator::estimatedKeysCount()
{
    return buffer.size();
}

StorageS3Source::StorageS3Source(
    const ReadFromFormatInfo & info,
    const String & format_,
    String name_,
    const ContextPtr & context_,
    std::optional<FormatSettings> format_settings_,
    UInt64 max_block_size_,
    const S3Settings::RequestSettings & request_settings_,
    String compression_hint_,
    const std::shared_ptr<const S3::Client> & client_,
    const String & bucket_,
    const String & version_id_,
    const String & url_host_and_port_,
    std::shared_ptr<IIterator> file_iterator_,
    const size_t max_parsing_threads_,
    bool need_only_count_)
    : SourceWithKeyCondition(info.source_header, false)
    , WithContext(context_)
    , name(std::move(name_))
    , bucket(bucket_)
    , version_id(version_id_)
    , url_host_and_port(url_host_and_port_)
    , format(format_)
    , columns_desc(info.columns_description)
    , requested_columns(info.requested_columns)
    , max_block_size(max_block_size_)
    , request_settings(request_settings_)
    , compression_hint(std::move(compression_hint_))
    , client(client_)
    , sample_block(info.format_header)
    , format_settings(format_settings_)
    , requested_virtual_columns(info.requested_virtual_columns)
    , file_iterator(file_iterator_)
    , max_parsing_threads(max_parsing_threads_)
    , need_only_count(need_only_count_)
    , create_reader_pool(CurrentMetrics::StorageS3Threads, CurrentMetrics::StorageS3ThreadsActive, CurrentMetrics::StorageS3ThreadsScheduled, 1)
    , create_reader_scheduler(threadPoolCallbackRunner<ReaderHolder>(create_reader_pool, "CreateS3Reader"))
{
}

void StorageS3Source::lazyInitialize(size_t idx)
{
    if (initialized)
        return;

    reader = createReader(idx);
    if (reader)
        reader_future = createReaderAsync(idx);

    initialized = true;
}
StorageS3Source::ReaderHolder StorageS3Source::createReader(size_t idx)
{
    KeyWithInfoPtr key_with_info;
    do
    {
        key_with_info = file_iterator->next(idx);
        if (!key_with_info || key_with_info->key.empty())
            return {};

        if (!key_with_info->info)
            key_with_info->info = S3::getObjectInfo(*client, bucket, key_with_info->key, version_id, request_settings);
    }
    while (getContext()->getSettingsRef().s3_skip_empty_files && key_with_info->info->size == 0);

    QueryPipelineBuilder builder;
    std::shared_ptr<ISource> source;
    std::unique_ptr<ReadBuffer> read_buf;
    std::optional<size_t> num_rows_from_cache = need_only_count && getContext()->getSettingsRef().use_cache_for_count_from_files ? tryGetNumRowsFromCache(*key_with_info) : std::nullopt;
    if (num_rows_from_cache)
    {
        /// We should not return a single chunk with the whole row count,
        /// because there is a chance that this chunk will be materialized later
        /// (which can cause memory problems even with default values in columns or when virtual columns are requested).
        /// Instead, we use a special ConstChunkGenerator that generates chunks
        /// of max_block_size rows each until the total number of rows is reached.
        source = std::make_shared<ConstChunkGenerator>(sample_block, *num_rows_from_cache, max_block_size);
        builder.init(Pipe(source));
    }
    else
    {
        auto compression_method = chooseCompressionMethod(key_with_info->key, compression_hint);
        read_buf = createS3ReadBuffer(key_with_info->key, key_with_info->info->size);

        auto input_format = FormatFactory::instance().getInput(
            format,
            *read_buf,
            sample_block,
            getContext(),
            max_block_size,
            format_settings,
            max_parsing_threads,
            /* max_download_threads= */ std::nullopt,
            /* is_remote_fs */ true,
            compression_method,
            need_only_count);

        if (key_condition)
            input_format->setKeyCondition(key_condition);

        if (need_only_count)
            input_format->needOnlyCount();

        builder.init(Pipe(input_format));

        if (columns_desc.hasDefaults())
        {
            builder.addSimpleTransform(
                [&](const Block & header)
                { return std::make_shared<AddingDefaultsTransform>(header, columns_desc, *input_format, getContext()); });
        }

        source = input_format;
    }

    /// Add ExtractColumnsTransform to extract requested columns/subcolumns
    /// from the chunks read by IInputFormat.
    builder.addSimpleTransform([&](const Block & header)
    {
        return std::make_shared<ExtractColumnsTransform>(header, requested_columns);
    });

    auto pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
    auto current_reader = std::make_unique<PullingPipelineExecutor>(*pipeline);

    ProfileEvents::increment(ProfileEvents::EngineFileLikeReadFiles);

    return ReaderHolder{key_with_info, bucket, std::move(read_buf), std::move(source), std::move(pipeline), std::move(current_reader)};
}

std::future<StorageS3Source::ReaderHolder> StorageS3Source::createReaderAsync(size_t idx)
{
    return create_reader_scheduler([=, this] { return createReader(idx); }, Priority{});
}

std::unique_ptr<ReadBuffer> StorageS3Source::createS3ReadBuffer(const String & key, size_t object_size)
{
    auto read_settings = getContext()->getReadSettings().adjustBufferSize(object_size);
    read_settings.enable_filesystem_cache = false;
    auto download_buffer_size = getContext()->getSettings().max_download_buffer_size;
    const bool object_too_small = object_size <= 2 * download_buffer_size;

    // Create a read buffer that will prefetch the first ~1 MB of the file.
    // When reading lots of tiny files, this prefetching almost doubles the throughput.
    // For bigger files, parallel reading is more useful.
    if (object_too_small && read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
    {
        LOG_TRACE(log, "Downloading object of size {} from S3 with initial prefetch", object_size);
        return createAsyncS3ReadBuffer(key, read_settings, object_size);
    }

    return std::make_unique<ReadBufferFromS3>(
        client, bucket, key, version_id, request_settings, read_settings,
        /*use_external_buffer*/ false, /*offset_*/ 0, /*read_until_position_*/ 0,
        /*restricted_seek_*/ false, object_size);
}

std::unique_ptr<ReadBuffer> StorageS3Source::createAsyncS3ReadBuffer(
    const String & key, const ReadSettings & read_settings, size_t object_size)
{
    auto context = getContext();
    auto read_buffer_creator =
        [this, read_settings, object_size]
        (const std::string & path, size_t read_until_position) -> std::unique_ptr<ReadBufferFromFileBase>
    {
        return std::make_unique<ReadBufferFromS3>(
            client,
            bucket,
            path,
            version_id,
            request_settings,
            read_settings,
            /* use_external_buffer */true,
            /* offset */0,
            read_until_position,
            /* restricted_seek */true,
            object_size);
    };

    auto s3_impl = std::make_unique<ReadBufferFromRemoteFSGather>(
        std::move(read_buffer_creator),
        StoredObjects{StoredObject{key, /* local_path */ "", object_size}},
        read_settings,
        /* cache_log */nullptr, /* use_external_buffer */true);

    auto modified_settings{read_settings};
    /// FIXME: Changing this setting to default value breaks something around parquet reading
    modified_settings.remote_read_min_bytes_for_seek = modified_settings.remote_fs_buffer_size;

    auto & pool_reader = context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
    auto async_reader = std::make_unique<AsynchronousBoundedReadBuffer>(
        std::move(s3_impl), pool_reader, modified_settings,
        context->getAsyncReadCounters(), context->getFilesystemReadPrefetchesLog());

    async_reader->setReadUntilEnd();
    if (read_settings.remote_fs_prefetch)
        async_reader->prefetch(DEFAULT_PREFETCH_PRIORITY);

    return async_reader;
}

StorageS3Source::~StorageS3Source()
{
    create_reader_pool.wait();
}

String StorageS3Source::getName() const
{
    return name;
}

Chunk StorageS3Source::generate()
{
    lazyInitialize();

    while (true)
    {
        if (isCancelled() || !reader)
        {
            if (reader)
                reader->cancel();
            break;
        }

        Chunk chunk;
        if (reader->pull(chunk))
        {
            UInt64 num_rows = chunk.getNumRows();
            total_rows_in_file += num_rows;
            size_t chunk_size = 0;
            if (const auto * input_format = reader.getInputFormat())
                chunk_size = input_format->getApproxBytesReadForChunk();
            progress(num_rows, chunk_size ? chunk_size : chunk.bytes());
            VirtualColumnUtils::addRequestedPathFileAndSizeVirtualsToChunk(chunk, requested_virtual_columns, reader.getPath(), reader.getFileSize());
            return chunk;
        }

        if (reader.getInputFormat() && getContext()->getSettingsRef().use_cache_for_count_from_files)
            addNumRowsToCache(reader.getFile(), total_rows_in_file);

        total_rows_in_file = 0;

        assert(reader_future.valid());
        reader = reader_future.get();

        if (!reader)
            break;

        /// Even if the task has finished, its thread may not have been returned to the pool yet.
        /// So wait until it is freed before scheduling a new task.
        create_reader_pool.wait();
        reader_future = createReaderAsync();
    }

    return {};
}

void StorageS3Source::addNumRowsToCache(const String & key, size_t num_rows)
{
    String source = fs::path(url_host_and_port) / bucket / key;
    auto cache_key = getKeyForSchemaCache(source, format, format_settings, getContext());
    StorageS3::getSchemaCache(getContext()).addNumRows(cache_key, num_rows);
}

std::optional<size_t> StorageS3Source::tryGetNumRowsFromCache(const KeyWithInfo & key_with_info)
{
    String source = fs::path(url_host_and_port) / bucket / key_with_info.key;
    auto cache_key = getKeyForSchemaCache(source, format, format_settings, getContext());
    auto get_last_mod_time = [&]() -> std::optional<time_t>
    {
        return key_with_info.info->last_modification_time;
    };

    return StorageS3::getSchemaCache(getContext()).tryGetNumRows(cache_key, get_last_mod_time);
}
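
/// Note (descriptive, not from the original source): the row-count cache above is keyed by
/// "host/bucket/key" plus format and format settings, and entries are checked against the
/// object's last modification time, so a rewritten object will not serve a stale row count.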

class StorageS3Sink : public SinkToStorage
{
public:
    StorageS3Sink(
        const String & format,
        const Block & sample_block_,
        const ContextPtr & context,
        std::optional<FormatSettings> format_settings_,
        const CompressionMethod compression_method,
        const StorageS3::Configuration & configuration_,
        const String & bucket,
        const String & key)
        : SinkToStorage(sample_block_)
        , sample_block(sample_block_)
        , format_settings(format_settings_)
    {
        BlobStorageLogWriterPtr blob_log = nullptr;
        if (auto blob_storage_log = context->getBlobStorageLog())
        {
            blob_log = std::make_shared<BlobStorageLogWriter>(std::move(blob_storage_log));
            blob_log->query_id = context->getCurrentQueryId();
        }

        const auto & settings = context->getSettingsRef();
        write_buf = wrapWriteBufferWithCompressionMethod(
            std::make_unique<WriteBufferFromS3>(
                configuration_.client,
                bucket,
                key,
                DBMS_DEFAULT_BUFFER_SIZE,
                configuration_.request_settings,
                std::move(blob_log),
                std::nullopt,
                threadPoolCallbackRunner<void>(getIOThreadPool().get(), "S3ParallelWrite"),
                context->getWriteSettings()),
            compression_method,
            static_cast<int>(settings.output_format_compression_level),
            static_cast<int>(settings.output_format_compression_zstd_window_log));
        writer
            = FormatFactory::instance().getOutputFormatParallelIfPossible(format, *write_buf, sample_block, context, format_settings);
    }

    String getName() const override { return "StorageS3Sink"; }

    void consume(Chunk chunk) override
    {
        std::lock_guard lock(cancel_mutex);
        if (cancelled)
            return;
        writer->write(getHeader().cloneWithColumns(chunk.detachColumns()));
    }

    void onCancel() override
    {
        std::lock_guard lock(cancel_mutex);
        finalize();
        cancelled = true;
    }

    void onException(std::exception_ptr exception) override
    {
        std::lock_guard lock(cancel_mutex);
        try
        {
            std::rethrow_exception(exception);
        }
        catch (...)
        {
            /// An exception context is needed to properly delete the write buffers without finalization.
            release();
        }
    }

    void onFinish() override
    {
        std::lock_guard lock(cancel_mutex);
        finalize();
    }

private:
    void finalize()
    {
        if (!writer)
            return;

        try
        {
            writer->finalize();
            writer->flush();
            write_buf->finalize();
        }
        catch (...)
        {
            /// Stop ParallelFormattingOutputFormat correctly.
            release();
            throw;
        }
    }

    void release()
    {
        writer.reset();
        write_buf.reset();
    }

    Block sample_block;
    std::optional<FormatSettings> format_settings;
    std::unique_ptr<WriteBuffer> write_buf;
    OutputFormatPtr writer;
    bool cancelled = false;
    std::mutex cancel_mutex;
};

class PartitionedStorageS3Sink : public PartitionedSink, WithContext
{
public:
    PartitionedStorageS3Sink(
        const ASTPtr & partition_by,
        const String & format_,
        const Block & sample_block_,
        const ContextPtr & context_,
        std::optional<FormatSettings> format_settings_,
        const CompressionMethod compression_method_,
        const StorageS3::Configuration & configuration_,
        const String & bucket_,
        const String & key_)
        : PartitionedSink(partition_by, context_, sample_block_), WithContext(context_)
        , format(format_)
        , sample_block(sample_block_)
        , compression_method(compression_method_)
        , configuration(configuration_)
        , bucket(bucket_)
        , key(key_)
        , format_settings(format_settings_)
    {
    }

    SinkPtr createSinkForPartition(const String & partition_id) override
    {
        auto partition_bucket = replaceWildcards(bucket, partition_id);
        validateBucket(partition_bucket);

        auto partition_key = replaceWildcards(key, partition_id);
        validateKey(partition_key);

        return std::make_shared<StorageS3Sink>(
            format,
            sample_block,
            getContext(),
            format_settings,
            compression_method,
            configuration,
            partition_bucket,
            partition_key
        );
    }

private:
    const String format;
    const Block sample_block;
    const CompressionMethod compression_method;
    const StorageS3::Configuration configuration;
    const String bucket;
    const String key;
    const std::optional<FormatSettings> format_settings;

    static void validateBucket(const String & str)
    {
        S3::URI::validateBucket(str, {});

        if (!DB::UTF8::isValidUTF8(reinterpret_cast<const UInt8 *>(str.data()), str.size()))
            throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "Incorrect non-UTF8 sequence in bucket name");

        validatePartitionKey(str, false);
    }

    static void validateKey(const String & str)
    {
        /// See:
        /// - https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-keys.html
        /// - https://cloud.ibm.com/apidocs/cos/cos-compatibility#putobject
        if (str.empty() || str.size() > 1024)
            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Incorrect key length (must be non-empty, at most 1024 characters), got: {}", str.size());

        if (!DB::UTF8::isValidUTF8(reinterpret_cast<const UInt8 *>(str.data()), str.size()))
            throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "Incorrect non-UTF8 sequence in key");

        validatePartitionKey(str, true);
    }
};
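
/// Illustrative example (not from the original source): with a table like
///     ENGINE = S3('https://host/bucket/data_{_partition_id}.csv', 'CSV') PARTITION BY toYear(date)
/// createSinkForPartition() above replaces the "{_partition_id}" wildcard, so partition "2024"
/// is written to the key "data_2024.csv"; the replaced bucket and key are then validated.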

StorageS3::StorageS3(
    const Configuration & configuration_,
    const ContextPtr & context_,
    const StorageID & table_id_,
    const ColumnsDescription & columns_,
    const ConstraintsDescription & constraints_,
    const String & comment,
    std::optional<FormatSettings> format_settings_,
    bool distributed_processing_,
    ASTPtr partition_by_)
    : IStorage(table_id_)
    , configuration(configuration_)
    , name(configuration.url.storage_name)
    , distributed_processing(distributed_processing_)
    , format_settings(format_settings_)
    , partition_by(partition_by_)
{
    updateConfiguration(context_); // NOLINT(clang-analyzer-optin.cplusplus.VirtualCall)

    if (configuration.format != "auto")
        FormatFactory::instance().checkFormatName(configuration.format);

    context_->getGlobalContext()->getRemoteHostFilter().checkURL(configuration.url.uri);
    context_->getGlobalContext()->getHTTPHeaderFilter().checkHeaders(configuration.headers_from_ast);

    StorageInMemoryMetadata storage_metadata;
    if (columns_.empty())
    {
        ColumnsDescription columns;
        if (configuration.format == "auto")
            std::tie(columns, configuration.format) = getTableStructureAndFormatFromData(configuration, format_settings, context_);
        else
            columns = getTableStructureFromData(configuration, format_settings, context_);
        storage_metadata.setColumns(columns);
    }
    else
    {
        if (configuration.format == "auto")
            configuration.format = getTableStructureAndFormatFromData(configuration, format_settings, context_).second;

        /// We don't allow special columns in S3 storage.
        if (!columns_.hasOnlyOrdinary())
            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table engine S3 doesn't support special columns like MATERIALIZED, ALIAS or EPHEMERAL");
        storage_metadata.setColumns(columns_);
    }

    storage_metadata.setConstraints(constraints_);
    storage_metadata.setComment(comment);
    setInMemoryMetadata(storage_metadata);

    virtual_columns = VirtualColumnUtils::getPathFileAndSizeVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList());
}
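
/// Note (descriptive, not from the original source): three iterator flavours are chosen below.
/// ReadTaskIterator is used for distributed processing (keys are handed out by the initiator),
/// DisclosedGlobIterator when the URL contains globs (the bucket is listed and matched lazily),
/// and KeysIterator for a plain list of keys; in the last case the path filter built from the
/// query predicate is applied eagerly to the whole key list.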
static std::shared_ptr<StorageS3Source::IIterator> createFileIterator(
    const StorageS3::Configuration & configuration,
    bool distributed_processing,
    ContextPtr local_context,
    const ActionsDAG::Node * predicate,
    const NamesAndTypesList & virtual_columns,
    StorageS3::KeysWithInfo * read_keys = nullptr,
    std::function<void(FileProgress)> file_progress_callback = {})
{
    if (distributed_processing)
    {
        return std::make_shared<StorageS3Source::ReadTaskIterator>(local_context->getReadTaskCallback(), local_context->getSettingsRef().max_threads);
    }
    else if (configuration.withGlobs())
    {
        /// Iterate through disclosed globs and make a source for each file
        return std::make_shared<StorageS3Source::DisclosedGlobIterator>(
            *configuration.client, configuration.url, predicate, virtual_columns,
            local_context, read_keys, configuration.request_settings, file_progress_callback);
    }
    else
    {
        Strings keys = configuration.keys;
        auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);
        if (filter_dag)
        {
            std::vector<String> paths;
            paths.reserve(keys.size());
            for (const auto & key : keys)
                paths.push_back(fs::path(configuration.url.bucket) / key);
            VirtualColumnUtils::filterByPathOrFile(keys, paths, filter_dag, virtual_columns, local_context);
        }

        return std::make_shared<StorageS3Source::KeysIterator>(
            *configuration.client, configuration.url.version_id, keys,
            configuration.url.bucket, configuration.request_settings, read_keys, file_progress_callback);
    }
}

bool StorageS3::supportsSubsetOfColumns(const ContextPtr & context) const
{
    return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format, context, format_settings);
}

bool StorageS3::prefersLargeBlocks() const
{
    return FormatFactory::instance().checkIfOutputFormatPrefersLargeBlocks(configuration.format);
}

bool StorageS3::parallelizeOutputAfterReading(ContextPtr context) const
{
    return FormatFactory::instance().checkParallelizeOutputAfterReading(configuration.format, context);
}

void StorageS3::read(
    QueryPlan & query_plan,
    const Names & column_names,
    const StorageSnapshotPtr & storage_snapshot,
    SelectQueryInfo & query_info,
    ContextPtr local_context,
    QueryProcessingStage::Enum /*processed_stage*/,
    size_t max_block_size,
    size_t num_streams)
{
    auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), virtual_columns);
    bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
        && local_context->getSettingsRef().optimize_count_from_files;

    auto reading = std::make_unique<ReadFromStorageS3Step>(
        read_from_format_info.source_header,
        column_names,
        storage_snapshot,
        *this,
        std::move(read_from_format_info),
        need_only_count,
        local_context,
        max_block_size,
        num_streams);

    query_plan.addStep(std::move(reading));
}
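
/// Note (descriptive, not from the original source): need_only_count enables the count-only fast
/// path for queries like "SELECT count() FROM s3(...)": the format reader is asked only for row
/// counts (see input_format->needOnlyCount() in createReader()), and counts may additionally be
/// served from, and written back to, the row-count cache described earlier.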

void ReadFromStorageS3Step::applyFilters()
{
    auto filter_actions_dag = ActionsDAG::buildFilterActionsDAG(filter_nodes.nodes);
    const ActionsDAG::Node * predicate = nullptr;
    if (filter_actions_dag)
        predicate = filter_actions_dag->getOutputs().at(0);

    createIterator(predicate);
}

void ReadFromStorageS3Step::createIterator(const ActionsDAG::Node * predicate)
{
    if (iterator_wrapper)
        return;

    iterator_wrapper = createFileIterator(
        query_configuration, storage.distributed_processing, local_context, predicate,
        virtual_columns, nullptr, local_context->getFileProgressCallback());
}

void ReadFromStorageS3Step::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
    if (storage.partition_by && query_configuration.withWildcard())
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Reading from a partitioned S3 storage is not implemented yet");

    createIterator(nullptr);

    size_t estimated_keys_count = iterator_wrapper->estimatedKeysCount();
    if (estimated_keys_count > 1)
        num_streams = std::min(num_streams, estimated_keys_count);
    else
        /// The disclosed glob iterator can underestimate the number of keys in some cases. We will keep one stream for this particular case.
        num_streams = 1;

    const size_t max_threads = local_context->getSettingsRef().max_threads;
    const size_t max_parsing_threads = num_streams >= max_threads ? 1 : (max_threads / std::max(num_streams, 1ul));
    LOG_DEBUG(getLogger("StorageS3"), "Reading in {} streams, {} threads per stream", num_streams, max_parsing_threads);

    Pipes pipes;
    pipes.reserve(num_streams);
    for (size_t i = 0; i < num_streams; ++i)
    {
        auto source = std::make_shared<StorageS3Source>(
            read_from_format_info,
            query_configuration.format,
            storage.getName(),
            local_context,
            storage.format_settings,
            max_block_size,
            query_configuration.request_settings,
            query_configuration.compression_method,
            query_configuration.client,
            query_configuration.url.bucket,
            query_configuration.url.version_id,
            query_configuration.url.uri.getHost() + std::to_string(query_configuration.url.uri.getPort()),
            iterator_wrapper,
            max_parsing_threads,
            need_only_count);

        source->setKeyCondition(filter_nodes.nodes, local_context);
        pipes.emplace_back(std::move(source));
    }

    auto pipe = Pipe::unitePipes(std::move(pipes));
    if (pipe.empty())
        pipe = Pipe(std::make_shared<NullSource>(read_from_format_info.source_header));

    for (const auto & processor : pipe.getProcessors())
        processors.emplace_back(processor);

    pipeline.init(std::move(pipe));
}
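
/// Worked example (not from the original source): with max_threads = 16 and 4 streams,
/// max_parsing_threads = 16 / 4 = 4, so each of the 4 StorageS3Source streams parses its
/// files with up to 4 threads; once num_streams >= max_threads each stream gets 1 thread.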

SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool /*async_insert*/)
{
    auto query_configuration = updateConfigurationAndGetCopy(local_context);

    auto sample_block = metadata_snapshot->getSampleBlock();
    auto chosen_compression_method = chooseCompressionMethod(query_configuration.keys.back(), query_configuration.compression_method);
    auto insert_query = std::dynamic_pointer_cast<ASTInsertQuery>(query);

    auto partition_by_ast = insert_query ? (insert_query->partition_by ? insert_query->partition_by : partition_by) : nullptr;
    bool is_partitioned_implementation = partition_by_ast && query_configuration.withWildcard();

    if (is_partitioned_implementation)
    {
        return std::make_shared<PartitionedStorageS3Sink>(
            partition_by_ast,
            query_configuration.format,
            sample_block,
            local_context,
            format_settings,
            chosen_compression_method,
            query_configuration,
            query_configuration.url.bucket,
            query_configuration.keys.back());
    }
    else
    {
        if (query_configuration.withGlobs())
            throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED,
                            "S3 key '{}' contains globs, so the table is in readonly mode", query_configuration.url.key);

        bool truncate_in_insert = local_context->getSettingsRef().s3_truncate_on_insert;

        if (!truncate_in_insert && S3::objectExists(*query_configuration.client, query_configuration.url.bucket, query_configuration.keys.back(), query_configuration.url.version_id, query_configuration.request_settings))
        {
            if (local_context->getSettingsRef().s3_create_new_file_on_insert)
            {
                size_t index = query_configuration.keys.size();
                const auto & first_key = query_configuration.keys[0];
                auto pos = first_key.find_first_of('.');
                String new_key;
                do
                {
                    new_key = first_key.substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : first_key.substr(pos));
                    ++index;
                }
                while (S3::objectExists(*query_configuration.client, query_configuration.url.bucket, new_key, query_configuration.url.version_id, query_configuration.request_settings));

                query_configuration.keys.push_back(new_key);
                configuration.keys.push_back(new_key);
            }
            else
            {
                throw Exception(
                    ErrorCodes::BAD_ARGUMENTS,
                    "Object in bucket {} with key {} already exists. "
                    "If you want to overwrite it, enable setting s3_truncate_on_insert, if you "
                    "want to create a new file on each insert, enable setting s3_create_new_file_on_insert",
                    query_configuration.url.bucket, query_configuration.keys.back());
            }
        }

        return std::make_shared<StorageS3Sink>(
            query_configuration.format,
            sample_block,
            local_context,
            format_settings,
            chosen_compression_method,
            query_configuration,
            query_configuration.url.bucket,
            query_configuration.keys.back());
    }
}
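
/// Worked example (not from the original source): with key "data.csv" already present and
/// s3_create_new_file_on_insert enabled, the loop above probes "data.1.csv", "data.2.csv", ...
/// (the counter starts at the current number of keys) until it finds a name that does not
/// exist yet, then appends that key for both this query and subsequent inserts.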

void StorageS3::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &, ContextPtr local_context, TableExclusiveLockHolder &)
{
    auto query_configuration = updateConfigurationAndGetCopy(local_context);

    if (query_configuration.withGlobs())
    {
        throw Exception(
            ErrorCodes::DATABASE_ACCESS_DENIED,
            "S3 key '{}' contains globs, so the table is in readonly mode",
            query_configuration.url.key);
    }

    Aws::S3::Model::Delete delkeys;

    for (const auto & key : query_configuration.keys)
    {
        Aws::S3::Model::ObjectIdentifier obj;
        obj.SetKey(key);
        delkeys.AddObjects(std::move(obj));
    }

    ProfileEvents::increment(ProfileEvents::S3DeleteObjects);
    S3::DeleteObjectsRequest request;
    request.SetBucket(query_configuration.url.bucket);
    request.SetDelete(delkeys);

    auto response = query_configuration.client->DeleteObjects(request);

    const auto * response_error = response.IsSuccess() ? nullptr : &response.GetError();
    auto time_now = std::chrono::system_clock::now();
    if (auto blob_storage_log = BlobStorageLogWriter::create())
    {
        for (const auto & key : query_configuration.keys)
            blob_storage_log->addEvent(BlobStorageLogElement::EventType::Delete, query_configuration.url.bucket, key, {}, 0, response_error, time_now);
    }

    if (!response.IsSuccess())
    {
        const auto & err = response.GetError();
        throw S3Exception(err.GetMessage(), err.GetErrorType());
    }

    for (const auto & error : response.GetResult().GetErrors())
        LOG_WARNING(getLogger("StorageS3"), "Failed to delete {}, error: {}", error.GetKey(), error.GetMessage());
}

StorageS3::Configuration StorageS3::updateConfigurationAndGetCopy(const ContextPtr & local_context)
{
    std::lock_guard lock(configuration_update_mutex);
    configuration.update(local_context);
    return configuration;
}

void StorageS3::updateConfiguration(const ContextPtr & local_context)
{
    std::lock_guard lock(configuration_update_mutex);
    configuration.update(local_context);
}

void StorageS3::useConfiguration(const Configuration & new_configuration)
{
    std::lock_guard lock(configuration_update_mutex);
    configuration = new_configuration;
}

const StorageS3::Configuration & StorageS3::getConfiguration()
{
    std::lock_guard lock(configuration_update_mutex);
    return configuration;
}
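
/// Note (descriptive, not from the original source): update() refreshes request settings on every
/// call, but the S3 client itself is rebuilt only when there is no client yet, or when the
/// configuration is not static and the authentication settings have changed; otherwise it
/// returns false and the existing client keeps being reused.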
bool StorageS3::Configuration::update(const ContextPtr & context)
{
    auto s3_settings = context->getStorageS3Settings().getSettings(url.uri.toString(), context->getUserName());
    request_settings = s3_settings.request_settings;
    request_settings.updateFromSettings(context->getSettings());

    if (client && (static_configuration || !auth_settings.hasUpdates(s3_settings.auth_settings)))
        return false;

    auth_settings.updateFrom(s3_settings.auth_settings);
    keys[0] = url.key;
    connect(context);
    return true;
}

void StorageS3::Configuration::connect(const ContextPtr & context)
{
const Settings & global_settings = context->getGlobalContext()->getSettingsRef();
const Settings & local_settings = context->getSettingsRef();
S3::PocoHTTPClientConfiguration client_configuration = S3::ClientFactory::instance().createClientConfiguration(
auth_settings.region,
context->getRemoteHostFilter(),
static_cast<unsigned>(global_settings.s3_max_redirects),
static_cast<unsigned>(global_settings.s3_retry_attempts),
global_settings.enable_s3_requests_logging,
/* for_disk_s3 = */ false,
request_settings.get_request_throttler,
request_settings.put_request_throttler,
url.uri.getScheme());
client_configuration.endpointOverride = url.endpoint;
client_configuration.maxConnections = static_cast<unsigned>(request_settings.max_connections);
client_configuration.http_connection_pool_size = global_settings.s3_http_connection_pool_size;
auto headers = auth_settings.headers;
if (!headers_from_ast.empty())
headers.insert(headers.end(), headers_from_ast.begin(), headers_from_ast.end());
client_configuration.requestTimeoutMs = request_settings.request_timeout_ms;
S3::ClientSettings client_settings{
.use_virtual_addressing = url.is_virtual_hosted_style,
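        /// Disabling checksums (the s3_disable_checksum setting) avoids the AWS SDK
        /// re-reading the whole input just to compute an MD5 checksum before upload.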
.disable_checksum = local_settings.s3_disable_checksum,
.gcs_issue_compose_request = context->getConfigRef().getBool("s3.gcs_issue_compose_request", false),
};
auto credentials = Aws::Auth::AWSCredentials(auth_settings.access_key_id, auth_settings.secret_access_key, auth_settings.session_token);
client = S3::ClientFactory::instance().create(
client_configuration,
client_settings,
credentials.GetAWSAccessKeyId(),
credentials.GetAWSSecretKey(),
auth_settings.server_side_encryption_customer_key_base64,
auth_settings.server_side_encryption_kms_config,
std::move(headers),
S3::CredentialsConfiguration{
auth_settings.use_environment_credentials.value_or(context->getConfigRef().getBool("s3.use_environment_credentials", true)),
auth_settings.use_insecure_imds_request.value_or(context->getConfigRef().getBool("s3.use_insecure_imds_request", false)),
auth_settings.expiration_window_seconds.value_or(
context->getConfigRef().getUInt64("s3.expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS)),
auth_settings.no_sign_request.value_or(context->getConfigRef().getBool("s3.no_sign_request", false)),
});
}
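
/// A named collection bundles the URL, credentials and format in the server
/// configuration. As a rough illustration (the collection name and values below
/// are hypothetical, not taken from this file):
///
///     <named_collections>
///         <s3_mydata>
///             <url>https://mybucket.s3.amazonaws.com/data/</url>
///             <access_key_id>...</access_key_id>
///             <secret_access_key>...</secret_access_key>
///             <format>CSV</format>
///         </s3_mydata>
///     </named_collections>
///
/// Such a collection can then be referenced as s3(s3_mydata, filename = 'part-0.csv').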
void StorageS3::processNamedCollectionResult(StorageS3::Configuration & configuration, const NamedCollection & collection)
{
validateNamedCollection(collection, required_configuration_keys, optional_configuration_keys);
auto filename = collection.getOrDefault<String>("filename", "");
if (!filename.empty())
configuration.url = S3::URI(std::filesystem::path(collection.get<String>("url")) / filename);
else
configuration.url = S3::URI(collection.get<String>("url"));
configuration.auth_settings.access_key_id = collection.getOrDefault<String>("access_key_id", "");
configuration.auth_settings.secret_access_key = collection.getOrDefault<String>("secret_access_key", "");
configuration.auth_settings.use_environment_credentials = collection.getOrDefault<UInt64>("use_environment_credentials", 1);
configuration.auth_settings.no_sign_request = collection.getOrDefault<bool>("no_sign_request", false);
configuration.auth_settings.expiration_window_seconds = collection.getOrDefault<UInt64>("expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS);
configuration.format = collection.getOrDefault<String>("format", configuration.format);
configuration.compression_method = collection.getOrDefault<String>("compression_method", collection.getOrDefault<String>("compression", "auto"));
configuration.structure = collection.getOrDefault<String>("structure", "auto");
configuration.request_settings = S3Settings::RequestSettings(collection);
}
StorageS3::Configuration StorageS3::getConfiguration(ASTs & engine_args, const ContextPtr & local_context, bool get_format_from_file)
{
StorageS3::Configuration configuration;
if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, local_context))
{
processNamedCollectionResult(configuration, *named_collection);
}
else
{
/// Supported signatures:
///
/// S3('url')
/// S3('url', 'format')
/// S3('url', 'format', 'compression')
/// S3('url', NOSIGN)
/// S3('url', NOSIGN, 'format')
/// S3('url', NOSIGN, 'format', 'compression')
/// S3('url', 'aws_access_key_id', 'aws_secret_access_key')
/// S3('url', 'aws_access_key_id', 'aws_secret_access_key', 'session_token')
/// S3('url', 'aws_access_key_id', 'aws_secret_access_key', 'format')
/// S3('url', 'aws_access_key_id', 'aws_secret_access_key', 'session_token', 'format')
/// S3('url', 'aws_access_key_id', 'aws_secret_access_key', 'format', 'compression')
/// S3('url', 'aws_access_key_id', 'aws_secret_access_key', 'session_token', 'format', 'compression')
/// with optional headers() function
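        ///
        /// As an illustration, a typical definition might look like this (bucket and
        /// path are hypothetical):
        ///
        ///     CREATE TABLE s3_table (name String, value UInt32)
        ///         ENGINE = S3('https://mybucket.s3.amazonaws.com/data/*.csv.gz', 'CSVWithNames', 'gzip')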
size_t count = StorageURL::evalArgsAndCollectHeaders(engine_args, configuration.headers_from_ast, local_context);
if (count == 0 || count > 6)
            throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
                            "Storage S3 requires 1 to 6 arguments: "
                            "url, [NOSIGN | access_key_id, secret_access_key, [session_token]], [format], [compression_method]");
std::unordered_map<std::string_view, size_t> engine_args_to_idx;
bool no_sign_request = false;
/// For 2 arguments we support 2 possible variants:
/// - s3(source, format)
/// - s3(source, NOSIGN)
/// We can distinguish them by looking at the 2-nd argument: check if it's NOSIGN or not.
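        /// For example (hypothetical URL):
        ///   s3('https://mybucket.s3.amazonaws.com/data.csv', 'CSV')    -- format
        ///   s3('https://mybucket.s3.amazonaws.com/data.csv', NOSIGN)   -- unsigned request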
if (count == 2)
{
auto second_arg = checkAndGetLiteralArgument<String>(engine_args[1], "format/NOSIGN");
if (boost::iequals(second_arg, "NOSIGN"))
no_sign_request = true;
else
engine_args_to_idx = {{"format", 1}};
}
        /// For 3 arguments we support 3 possible variants:
        /// - s3(source, format, compression_method)
        /// - s3(source, access_key_id, secret_access_key)
        /// - s3(source, NOSIGN, format)
        /// We can distinguish them by looking at the 2-nd argument: check if it's NOSIGN or a format name.
else if (count == 3)
{
auto second_arg = checkAndGetLiteralArgument<String>(engine_args[1], "format/access_key_id/NOSIGN");
if (boost::iequals(second_arg, "NOSIGN"))
{
no_sign_request = true;
engine_args_to_idx = {{"format", 2}};
}
else if (second_arg == "auto" || FormatFactory::instance().getAllFormats().contains(second_arg))
engine_args_to_idx = {{"format", 1}, {"compression_method", 2}};
else
engine_args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}};
}
        /// For 4 arguments we support 3 possible variants:
        /// - s3(source, access_key_id, secret_access_key, session_token)
        /// - s3(source, access_key_id, secret_access_key, format)
        /// - s3(source, NOSIGN, format, compression_method)
        /// We can distinguish them by looking at the 2-nd argument: check if it's NOSIGN or not;
        /// if not, we look at the 4-th argument to tell format and session_token apart.
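        /// For example (hypothetical values), a 4-th argument of 'CSV' selects the
        /// format variant, while an opaque token string selects the session_token variant.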
else if (count == 4)
{
auto second_arg = checkAndGetLiteralArgument<String>(engine_args[1], "access_key_id/NOSIGN");
if (boost::iequals(second_arg, "NOSIGN"))
{
no_sign_request = true;
engine_args_to_idx = {{"format", 2}, {"compression_method", 3}};
}
else
{
auto fourth_arg = checkAndGetLiteralArgument<String>(engine_args[3], "session_token/format");
if (fourth_arg == "auto" || FormatFactory::instance().getAllFormats().contains(fourth_arg))
{
engine_args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}};
}
else
{
engine_args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"session_token", 3}};
}
}
}
/// For 5 arguments we support 2 possible variants:
/// - s3(source, access_key_id, secret_access_key, session_token, format)
/// - s3(source, access_key_id, secret_access_key, format, compression)
else if (count == 5)
{
auto fourth_arg = checkAndGetLiteralArgument<String>(engine_args[3], "session_token/format");
if (fourth_arg == "auto" || FormatFactory::instance().getAllFormats().contains(fourth_arg))
{
                engine_args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}, {"compression_method", 4}};
}
else
{
engine_args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"session_token", 3}, {"format", 4}};
}
}
else if (count == 6)
{
engine_args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"session_token", 3}, {"format", 4}, {"compression_method", 5}};
}
/// This argument is always the first
configuration.url = S3::URI(checkAndGetLiteralArgument<String>(engine_args[0], "url"));
if (engine_args_to_idx.contains("format"))
configuration.format = checkAndGetLiteralArgument<String>(engine_args[engine_args_to_idx["format"]], "format");
if (engine_args_to_idx.contains("compression_method"))
configuration.compression_method = checkAndGetLiteralArgument<String>(engine_args[engine_args_to_idx["compression_method"]], "compression_method");
if (engine_args_to_idx.contains("access_key_id"))
configuration.auth_settings.access_key_id = checkAndGetLiteralArgument<String>(engine_args[engine_args_to_idx["access_key_id"]], "access_key_id");
if (engine_args_to_idx.contains("secret_access_key"))
configuration.auth_settings.secret_access_key = checkAndGetLiteralArgument<String>(engine_args[engine_args_to_idx["secret_access_key"]], "secret_access_key");
if (engine_args_to_idx.contains("session_token"))
configuration.auth_settings.session_token = checkAndGetLiteralArgument<String>(engine_args[engine_args_to_idx["session_token"]], "session_token");
if (no_sign_request)
configuration.auth_settings.no_sign_request = no_sign_request;
}
configuration.static_configuration = !configuration.auth_settings.access_key_id.empty() || configuration.auth_settings.no_sign_request.has_value();
configuration.keys = {configuration.url.key};
if (configuration.format == "auto" && get_format_from_file)
configuration.format = FormatFactory::instance().tryGetFormatFromFileName(configuration.url.key).value_or("auto");
return configuration;
}
ColumnsDescription StorageS3::getTableStructureFromData(
const StorageS3::Configuration & configuration,
const std::optional<FormatSettings> & format_settings,
const ContextPtr & ctx)
{
return getTableStructureAndFormatFromDataImpl(configuration.format, configuration, format_settings, ctx).first;
}
std::pair<ColumnsDescription, String> StorageS3::getTableStructureAndFormatFromData(
const StorageS3::Configuration & configuration,
const std::optional<FormatSettings> & format_settings,
const ContextPtr & ctx)
{
return getTableStructureAndFormatFromDataImpl(std::nullopt, configuration, format_settings, ctx);
}
namespace
{
class ReadBufferIterator : public IReadBufferIterator, WithContext
{
public:
ReadBufferIterator(
std::shared_ptr<StorageS3Source::IIterator> file_iterator_,
const StorageS3Source::KeysWithInfo & read_keys_,
const StorageS3::Configuration & configuration_,
std::optional<String> format_,
const std::optional<FormatSettings> & format_settings_,
const ContextPtr & context_)
: WithContext(context_)
, file_iterator(file_iterator_)
, read_keys(read_keys_)
, configuration(configuration_)
, format(std::move(format_))
, format_settings(format_settings_)
, prev_read_keys_size(read_keys_.size())
{
}
Data next() override
{
if (first)
{
/// If format is unknown we iterate through all currently read keys on first iteration and
/// try to determine format by file name.
if (!format)
{
for (const auto & key_with_info : read_keys)
{
if (auto format_from_file_name = FormatFactory::instance().tryGetFormatFromFileName(key_with_info->key))
{
format = format_from_file_name;
break;
}
}
}
/// For default mode check cached columns for currently read keys on first iteration.
            if (getContext()->getSettingsRef().schema_inference_mode == SchemaInferenceMode::DEFAULT)
{
if (auto cached_columns = tryGetColumnsFromCache(read_keys.begin(), read_keys.end()))
return {nullptr, cached_columns, format};
}
}
while (true)
{
current_key_with_info = (*file_iterator)();
if (!current_key_with_info || current_key_with_info->key.empty())
{
if (first)
{
if (format)
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"The table structure cannot be extracted from a {} format file, because there are no files with provided path "
"in S3 or all files are empty. You can specify table structure manually",
*format);
throw Exception(
ErrorCodes::CANNOT_DETECT_FORMAT,
"The data format cannot be detected by the contents of the files, because there are no files with provided path "
"in S3 or all files are empty. You can specify the format manually");
}
return {nullptr, std::nullopt, format};
}
            /// The S3 file iterator may fetch new keys during iteration.
if (read_keys.size() > prev_read_keys_size)
{
/// If format is unknown we can try to determine it by new file names.
if (!format)
{
for (auto it = read_keys.begin() + prev_read_keys_size; it != read_keys.end(); ++it)
{
if (auto format_from_file_name = FormatFactory::instance().tryGetFormatFromFileName((*it)->key))
{
format = format_from_file_name;
break;
}
}
}
/// Check new files in schema cache if schema inference mode is default.
if (getContext()->getSettingsRef().schema_inference_mode == SchemaInferenceMode::DEFAULT)
{
auto columns_from_cache = tryGetColumnsFromCache(read_keys.begin() + prev_read_keys_size, read_keys.end());
if (columns_from_cache)
return {nullptr, columns_from_cache, format};
}
prev_read_keys_size = read_keys.size();
}
if (getContext()->getSettingsRef().s3_skip_empty_files && current_key_with_info->info && current_key_with_info->info->size == 0)
continue;
/// In union mode, check cached columns only for current key.
if (getContext()->getSettingsRef().schema_inference_mode == SchemaInferenceMode::UNION)
{
StorageS3::KeysWithInfo keys = {current_key_with_info};
if (auto columns_from_cache = tryGetColumnsFromCache(keys.begin(), keys.end()))
{
first = false;
return {nullptr, columns_from_cache, format};
}
}
int zstd_window_log_max = static_cast<int>(getContext()->getSettingsRef().zstd_window_log_max);
            auto impl = std::make_unique<ReadBufferFromS3>(
                configuration.client,
                configuration.url.bucket,
                current_key_with_info->key,
                configuration.url.version_id,
                configuration.request_settings,
                getContext()->getReadSettings());

            if (!getContext()->getSettingsRef().s3_skip_empty_files || !impl->eof())
            {
                first = false;
                return {
                    wrapReadBufferWithCompressionMethod(
                        std::move(impl),
                        chooseCompressionMethod(current_key_with_info->key, configuration.compression_method),
                        zstd_window_log_max),
                    std::nullopt,
                    format};
}
}
}
void setNumRowsToLastFile(size_t num_rows) override
{
if (!getContext()->getSettingsRef().schema_inference_use_cache_for_s3)
return;
String source = fs::path(configuration.url.uri.getHost() + std::to_string(configuration.url.uri.getPort())) / configuration.url.bucket / current_key_with_info->key;
auto key = getKeyForSchemaCache(source, *format, format_settings, getContext());
StorageS3::getSchemaCache(getContext()).addNumRows(key, num_rows);
}
void setSchemaToLastFile(const ColumnsDescription & columns) override
{
if (!getContext()->getSettingsRef().schema_inference_use_cache_for_s3
|| getContext()->getSettingsRef().schema_inference_mode != SchemaInferenceMode::UNION)
return;
String source = fs::path(configuration.url.uri.getHost() + std::to_string(configuration.url.uri.getPort())) / configuration.url.bucket / current_key_with_info->key;
auto cache_key = getKeyForSchemaCache(source, *format, format_settings, getContext());
StorageS3::getSchemaCache(getContext()).addColumns(cache_key, columns);
}
void setResultingSchema(const ColumnsDescription & columns) override
{
if (!getContext()->getSettingsRef().schema_inference_use_cache_for_s3
|| getContext()->getSettingsRef().schema_inference_mode != SchemaInferenceMode::DEFAULT)
return;
auto host_and_bucket = fs::path(configuration.url.uri.getHost() + std::to_string(configuration.url.uri.getPort())) / configuration.url.bucket;
Strings sources;
sources.reserve(read_keys.size());
std::transform(read_keys.begin(), read_keys.end(), std::back_inserter(sources), [&](const auto & elem){ return host_and_bucket / elem->key; });
auto cache_keys = getKeysForSchemaCache(sources, *format, format_settings, getContext());
StorageS3::getSchemaCache(getContext()).addManyColumns(cache_keys, columns);
}
void setFormatName(const String & format_name) override
{
format = format_name;
}
String getLastFileName() const override
{
if (current_key_with_info)
return current_key_with_info->key;
return "";
}
bool supportsLastReadBufferRecreation() const override { return true; }
std::unique_ptr<ReadBuffer> recreateLastReadBuffer() override
{
chassert(current_key_with_info);
int zstd_window_log_max = static_cast<int>(getContext()->getSettingsRef().zstd_window_log_max);
        auto impl = std::make_unique<ReadBufferFromS3>(
            configuration.client,
            configuration.url.bucket,
            current_key_with_info->key,
            configuration.url.version_id,
            configuration.request_settings,
            getContext()->getReadSettings());
        return wrapReadBufferWithCompressionMethod(
            std::move(impl),
            chooseCompressionMethod(current_key_with_info->key, configuration.compression_method),
            zstd_window_log_max);
}
private:
std::optional<ColumnsDescription> tryGetColumnsFromCache(
const StorageS3::KeysWithInfo::const_iterator & begin,
const StorageS3::KeysWithInfo::const_iterator & end)
{
auto context = getContext();
if (!context->getSettingsRef().schema_inference_use_cache_for_s3)
return std::nullopt;
auto & schema_cache = StorageS3::getSchemaCache(context);
for (auto it = begin; it < end; ++it)
{
auto get_last_mod_time = [&]
{
time_t last_modification_time = 0;
if ((*it)->info)
{
last_modification_time = (*it)->info->last_modification_time;
}
else
{
                    /// Note that in case of an exception in getObjectInfo the returned info will be empty,
                    /// but the schema cache will handle this case and won't return columns from the cache,
                    /// because we cannot consider the entry valid without a last modification time.
last_modification_time = S3::getObjectInfo(
*configuration.client,
configuration.url.bucket,
(*it)->key,
configuration.url.version_id,
configuration.request_settings,
/*with_metadata=*/ false,
/*for_disk_s3=*/ false,
/*throw_on_error= */ false).last_modification_time;
}
return last_modification_time ? std::make_optional(last_modification_time) : std::nullopt;
};
String path = fs::path(configuration.url.bucket) / (*it)->key;
String source = fs::path(configuration.url.uri.getHost() + std::to_string(configuration.url.uri.getPort())) / path;
if (format)
{
auto cache_key = getKeyForSchemaCache(source, *format, format_settings, context);
if (auto columns = schema_cache.tryGetColumns(cache_key, get_last_mod_time))
return columns;
}
else
{
                /// If the format is unknown, we can iterate through all possible input formats
                /// and check whether the schema cache has an entry for this format and this file.
                /// If such an entry exists for some format, we can use that format to read the file.
for (const auto & format_name : FormatFactory::instance().getAllInputFormats())
{
auto cache_key = getKeyForSchemaCache(source, format_name, format_settings, context);
if (auto columns = schema_cache.tryGetColumns(cache_key, get_last_mod_time))
{
/// Now format is known. It should be the same for all files.
format = format_name;
return columns;
}
}
}
}
return std::nullopt;
}
std::shared_ptr<StorageS3Source::IIterator> file_iterator;
const StorageS3Source::KeysWithInfo & read_keys;
const StorageS3::Configuration & configuration;
std::optional<String> format;
const std::optional<FormatSettings> & format_settings;
StorageS3Source::KeyWithInfoPtr current_key_with_info;
size_t prev_read_keys_size;
bool first = true;
};
}
std::pair<ColumnsDescription, String> StorageS3::getTableStructureAndFormatFromDataImpl(
std::optional<String> format,
const Configuration & configuration,
const std::optional<FormatSettings> & format_settings,
const ContextPtr & ctx)
{
KeysWithInfo read_keys;
auto file_iterator = createFileIterator(configuration, false, ctx, {}, {}, &read_keys);
ReadBufferIterator read_buffer_iterator(file_iterator, read_keys, configuration, format, format_settings, ctx);
if (format)
return {readSchemaFromFormat(*format, format_settings, read_buffer_iterator, ctx), *format};
return detectFormatAndReadSchema(format_settings, read_buffer_iterator, ctx);
}
void registerStorageS3Impl(const String & name, StorageFactory & factory)
{
factory.registerStorage(name, [](const StorageFactory::Arguments & args)
{
auto & engine_args = args.engine_args;
if (engine_args.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments");
auto configuration = StorageS3::getConfiguration(engine_args, args.getLocalContext());
// Use format settings from global server context + settings from
// the SETTINGS clause of the create query. Settings from current
// session and user are ignored.
std::optional<FormatSettings> format_settings;
if (args.storage_def->settings)
{
FormatFactorySettings user_format_settings;
// Apply changed settings from global context, but ignore the
// unknown ones, because we only have the format settings here.
const auto & changes = args.getContext()->getSettingsRef().changes();
for (const auto & change : changes)
{
if (user_format_settings.has(change.name))
user_format_settings.set(change.name, change.value);
}
// Apply changes from SETTINGS clause, with validation.
user_format_settings.applyChanges(args.storage_def->settings->changes);
format_settings = getFormatSettings(args.getContext(), user_format_settings);
}
else
{
format_settings = getFormatSettings(args.getContext());
}
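
        /// So, illustratively, a definition like
        ///     CREATE TABLE t (c String) ENGINE = S3(...) SETTINGS format_csv_delimiter = '|'
        /// overrides just that one format setting on top of the globally derived ones.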
ASTPtr partition_by;
if (args.storage_def->partition_by)
partition_by = args.storage_def->partition_by->clone();
return std::make_shared<StorageS3>(
std::move(configuration),
args.getContext(),
args.table_id,
args.columns,
args.constraints,
args.comment,
format_settings,
/* distributed_processing_ */false,
partition_by);
},
{
.supports_settings = true,
.supports_sort_order = true, // for partition by
.supports_schema_inference = true,
.source_access_type = AccessType::S3,
});
}
void registerStorageS3(StorageFactory & factory)
{
return registerStorageS3Impl("S3", factory);
}
void registerStorageCOS(StorageFactory & factory)
{
return registerStorageS3Impl("COSN", factory);
}
void registerStorageOSS(StorageFactory & factory)
{
return registerStorageS3Impl("OSS", factory);
}
NamesAndTypesList StorageS3::getVirtuals() const
{
return virtual_columns;
}
Names StorageS3::getVirtualColumnNames()
{
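    /// Typically the _path, _file and _size virtual columns.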
return VirtualColumnUtils::getPathFileAndSizeVirtualsForStorage({}).getNames();
}
bool StorageS3::supportsPartitionBy() const
{
return true;
}
SchemaCache & StorageS3::getSchemaCache(const ContextPtr & ctx)
{
static SchemaCache schema_cache(ctx->getConfigRef().getUInt("schema_inference_cache_max_elements_for_s3", DEFAULT_SCHEMA_CACHE_ELEMENTS));
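    /// The capacity can be tuned in the server configuration, e.g. (value illustrative):
    ///     <schema_inference_cache_max_elements_for_s3>4096</schema_inference_cache_max_elements_for_s3>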
return schema_cache;
}
}
#endif