ClickHouse/src/Storages/StorageS3.cpp

1274 lines
45 KiB
C++
Raw Normal View History

2019-12-06 14:37:21 +00:00
#include <Common/config.h>
2022-03-21 14:52:26 +00:00
#include "IO/ParallelReadBuffer.h"
#include "IO/IOThreadPool.h"
2021-08-23 19:05:28 +00:00
#include "Parsers/ASTCreateQuery.h"
2019-12-06 14:37:21 +00:00
#if USE_AWS_S3
#include <Common/isValidUTF8.h>
2021-05-31 08:46:28 +00:00
2021-06-01 12:41:23 +00:00
#include <Functions/FunctionsConversion.h>
2019-12-03 16:23:24 +00:00
#include <IO/S3Common.h>
2019-05-23 09:03:39 +00:00
2021-05-25 06:45:30 +00:00
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/TreeRewriter.h>
2019-05-23 09:03:39 +00:00
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/threadPoolCallbackRunner.h>
2021-05-25 06:45:30 +00:00
2021-06-01 12:41:23 +00:00
#include <Parsers/ASTFunction.h>
2021-05-25 06:45:30 +00:00
#include <Parsers/ASTInsertQuery.h>
2019-05-23 09:03:39 +00:00
#include <Parsers/ASTLiteral.h>
2021-06-01 12:41:23 +00:00
#include <Storages/StorageFactory.h>
#include <Storages/StorageS3.h>
#include <Storages/StorageS3Settings.h>
#include <Storages/StorageSnapshot.h>
2021-10-26 09:31:01 +00:00
#include <Storages/PartitionedSink.h>
2022-05-19 11:18:58 +00:00
#include <Storages/VirtualColumnUtils.h>
2022-03-28 19:18:20 +00:00
#include <Storages/getVirtualsForStorage.h>
2019-05-23 09:03:39 +00:00
2019-06-01 21:18:20 +00:00
#include <IO/ReadBufferFromS3.h>
#include <IO/WriteBufferFromS3.h>
2019-05-23 09:03:39 +00:00
#include <Formats/FormatFactory.h>
#include <Formats/ReadSchemaUtils.h>
2019-05-31 07:27:14 +00:00
2021-07-21 16:13:17 +00:00
#include <Processors/Transforms/AddingDefaultsTransform.h>
2021-10-11 16:11:50 +00:00
#include <Processors/Formats/IOutputFormat.h>
2021-10-13 18:22:02 +00:00
#include <Processors/Formats/IInputFormat.h>
#include <QueryPipeline/narrowPipe.h>
2019-05-23 09:03:39 +00:00
2021-10-16 14:03:50 +00:00
#include <QueryPipeline/QueryPipelineBuilder.h>
2021-07-20 18:18:43 +00:00
#include <Processors/Executors/PullingPipelineExecutor.h>
2020-01-26 14:05:51 +00:00
#include <DataTypes/DataTypeString.h>
#include <aws/core/auth/AWSCredentials.h>
2019-12-03 16:23:24 +00:00
#include <aws/s3/S3Client.h>
#include <aws/s3/model/ListObjectsV2Request.h>
2021-06-21 15:44:36 +00:00
#include <aws/s3/model/CopyObjectRequest.h>
#include <aws/s3/model/DeleteObjectsRequest.h>
2020-01-26 13:03:47 +00:00
#include <Common/parseGlobs.h>
2020-02-17 19:28:25 +00:00
#include <Common/quoteString.h>
2020-01-26 13:03:47 +00:00
#include <re2/re2.h>
2019-05-23 09:03:39 +00:00
2022-05-20 19:49:31 +00:00
#include <Processors/ISource.h>
2021-07-23 19:33:59 +00:00
#include <Processors/Sinks/SinkToStorage.h>
2021-10-16 14:03:50 +00:00
#include <QueryPipeline/Pipe.h>
2021-05-08 10:59:55 +00:00
#include <filesystem>
2019-05-23 09:03:39 +00:00
2021-05-08 10:59:55 +00:00
namespace fs = std::filesystem;
2019-05-31 07:27:14 +00:00
2021-05-25 06:45:30 +00:00
/// Literal placeholder "{_partition_id}".
/// NOTE(review): presumably the substring replaced by the actual partition id in bucket/key
/// templates of partitioned writes — confirm against the wildcard replacement helper.
static const String PARTITION_ID_WILDCARD = "{_partition_id}";
2019-05-23 09:03:39 +00:00
namespace DB
{
2021-07-18 11:03:45 +00:00
2019-05-23 09:03:39 +00:00
/// Error codes referenced by this translation unit (declarations only, kept in alphabetical order).
namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
    extern const int CANNOT_PARSE_TEXT;
    extern const int DATABASE_ACCESS_DENIED;
    extern const int NOT_IMPLEMENTED;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int S3_ERROR;
    extern const int UNEXPECTED_EXPRESSION;
}
2021-10-11 16:11:50 +00:00
/// Forward declaration of the output format interface and the shared-pointer alias
/// used below as the type of the sink's `writer` member.
class IOutputFormat;
using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
2022-05-19 11:18:58 +00:00
/// Iterates over the keys of a bucket that match a glob pattern.
///
/// Keys are fetched page by page with ListObjectsV2 (prefix = the part of the key before the
/// first wildcard), matched against the regexp built from the glob, and optionally filtered
/// by the query predicate over the virtual columns `_path` / `_file`.
/// `next()` is serialized by an internal mutex and returns an empty string once exhausted.
class StorageS3Source::DisclosedGlobIterator::Impl : WithContext
{
public:
    /// Validates the URI and loads the first batch of keys.
    /// Throws UNEXPECTED_EXPRESSION if the bucket name contains wildcards
    /// and S3_ERROR if the first listing request fails.
    Impl(
        const Aws::S3::S3Client & client_,
        const S3::URI & globbed_uri_,
        ASTPtr & query_,
        const Block & virtual_header_,
        ContextPtr context_)
        : WithContext(context_)
        , client(client_)
        , globbed_uri(globbed_uri_)
        , query(query_)
        , virtual_header(virtual_header_)
    {
        if (globbed_uri.bucket.find_first_of("*?{") != globbed_uri.bucket.npos)
            throw Exception("Expression can not have wildcards inside bucket name", ErrorCodes::UNEXPECTED_EXPRESSION);

        const String key_prefix = globbed_uri.key.substr(0, globbed_uri.key.find_first_of("*?{"));

        /// We don't have to list the bucket, because there are no wildcards in the key.
        if (key_prefix.size() == globbed_uri.key.size())
        {
            buffer.emplace_back(globbed_uri.key);
            buffer_iter = buffer.begin();
            is_finished = true;
            return;
        }

        /// Create a virtual block with one row to construct filter
        if (query && virtual_header)
        {
            /// Append "key" column as the filter result
            virtual_header.insert({ColumnString::create(), std::make_shared<DataTypeString>(), "_key"});

            auto block = virtual_header.cloneEmpty();
            MutableColumns columns = block.mutateColumns();
            for (auto & column : columns)
                column->insertDefault();
            block.setColumns(std::move(columns));
            VirtualColumnUtils::prepareFilterBlockWithQuery(query, getContext(), block, filter_ast);
        }

        request.SetBucket(globbed_uri.bucket);
        request.SetPrefix(key_prefix);
        matcher = std::make_unique<re2::RE2>(makeRegexpPatternFromGlobs(globbed_uri.key));
        fillInternalBufferAssumeLocked();
    }

    /// Returns the next matching key, or an empty string when there are no more keys.
    String next()
    {
        std::lock_guard lock(mutex);
        return nextAssumeLocked();
    }

private:
    /// Returns the next buffered key, fetching further listing pages as needed.
    /// Implemented as a loop (not recursion): a long run of pages without a single
    /// matching key must not grow the call stack.
    String nextAssumeLocked()
    {
        while (true)
        {
            if (buffer_iter != buffer.end())
            {
                /// The buffer is only consumed forward and is cleared on refill, so the key can be moved out.
                String answer = std::move(*buffer_iter);
                ++buffer_iter;
                return answer;
            }

            if (is_finished)
                return {};

            fillInternalBufferAssumeLocked();
        }
    }

    /// Replaces the buffer with the matching (and filtered) keys of the next listing page.
    /// Throws S3_ERROR if the listing request fails.
    void fillInternalBufferAssumeLocked()
    {
        buffer.clear();

        outcome = client.ListObjectsV2(request);
        if (!outcome.IsSuccess())
            throw Exception(ErrorCodes::S3_ERROR, "Could not list objects in bucket {} with prefix {}, S3 exception: {}, message: {}",
                            quoteString(request.GetBucket()), quoteString(request.GetPrefix()),
                            backQuote(outcome.GetError().GetExceptionName()), quoteString(outcome.GetError().GetMessage()));

        const auto & result_batch = outcome.GetResult().GetContents();

        if (filter_ast)
        {
            /// Build a block of candidate keys with their virtual columns and let the query filter it.
            auto block = virtual_header.cloneEmpty();
            MutableColumnPtr path_column;
            MutableColumnPtr file_column;
            MutableColumnPtr key_column = block.getByName("_key").column->assumeMutable();

            if (block.has("_path"))
                path_column = block.getByName("_path").column->assumeMutable();
            if (block.has("_file"))
                file_column = block.getByName("_file").column->assumeMutable();

            for (const auto & row : result_batch)
            {
                const String & key = row.GetKey();
                if (re2::RE2::FullMatch(key, *matcher))
                {
                    String path = fs::path(globbed_uri.bucket) / key;
                    String file = path.substr(path.find_last_of('/') + 1);
                    if (path_column)
                        path_column->insert(path);
                    if (file_column)
                        file_column->insert(file);
                    key_column->insert(key);
                }
            }

            VirtualColumnUtils::filterBlockWithQuery(query, block, getContext(), filter_ast);
            const ColumnString & keys = typeid_cast<const ColumnString &>(*block.getByName("_key").column);
            size_t rows = block.rows();
            buffer.reserve(rows);
            for (size_t i = 0; i < rows; ++i)
                buffer.emplace_back(keys.getDataAt(i).toString());
        }
        else
        {
            buffer.reserve(result_batch.size());
            for (const auto & row : result_batch)
            {
                String key = row.GetKey();
                if (re2::RE2::FullMatch(key, *matcher))
                    buffer.emplace_back(std::move(key));
            }
        }

        /// Set iterator only after the whole batch is processed
        buffer_iter = buffer.begin();

        request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());

        /// GetIsTruncated() returns false when all objects were returned
        is_finished = !outcome.GetResult().GetIsTruncated();
    }

    std::mutex mutex;
    Strings buffer;
    Strings::iterator buffer_iter;
    Aws::S3::S3Client client;
    S3::URI globbed_uri;
    ASTPtr query;
    Block virtual_header;
    ASTPtr filter_ast;
    Aws::S3::Model::ListObjectsV2Request request;
    Aws::S3::Model::ListObjectsV2Outcome outcome;
    std::unique_ptr<re2::RE2> matcher;
    bool is_finished{false};
};
2022-05-19 11:18:58 +00:00
/// Builds the iterator; all arguments are forwarded to the Impl object that holds the listing state.
StorageS3Source::DisclosedGlobIterator::DisclosedGlobIterator(
    const Aws::S3::S3Client & client_,
    const S3::URI & globbed_uri_,
    ASTPtr query,
    const Block & virtual_header,
    ContextPtr context)
    : pimpl(std::make_shared<Impl>(client_, globbed_uri_, query, virtual_header, context))
{
}
2021-04-06 19:18:45 +00:00
2021-04-13 10:59:02 +00:00
/// Delegates to the implementation object and returns whatever key it yields.
String StorageS3Source::DisclosedGlobIterator::next()
{
    auto & impl = *pimpl;
    return impl.next();
}
2022-05-19 11:18:58 +00:00
/// Iterates over an explicit list of keys.
/// When both a query and a virtual header are given, the list is filtered once, in the
/// constructor, by the query predicate over the virtual columns `_path` / `_file`.
/// `next()` hands out keys through an atomic index and returns an empty string past the end.
class StorageS3Source::KeysIterator::Impl : WithContext
{
public:
    explicit Impl(
        const std::vector<String> & keys_, const String & bucket_, ASTPtr query_, const Block & virtual_header_, ContextPtr context_)
        : WithContext(context_), keys(keys_), bucket(bucket_), query(query_), virtual_header(virtual_header_)
    {
        /// Nothing to filter by.
        if (!(query && virtual_header))
            return;

        /// The "_key" column carries the keys through the filter.
        virtual_header.insert({ColumnString::create(), std::make_shared<DataTypeString>(), "_key"});

        /// A one-row block of defaults is enough to derive the filter expression.
        auto probe_block = virtual_header.cloneEmpty();
        MutableColumns probe_columns = probe_block.mutateColumns();
        for (auto & probe_column : probe_columns)
            probe_column->insertDefault();
        probe_block.setColumns(std::move(probe_columns));

        ASTPtr filter_ast;
        VirtualColumnUtils::prepareFilterBlockWithQuery(query, getContext(), probe_block, filter_ast);
        if (!filter_ast)
            return;

        /// Fill a block with every candidate key and its virtual column values.
        auto candidates = virtual_header.cloneEmpty();
        MutableColumnPtr key_column = candidates.getByName("_key").column->assumeMutable();
        MutableColumnPtr path_column;
        MutableColumnPtr file_column;
        if (candidates.has("_path"))
            path_column = candidates.getByName("_path").column->assumeMutable();
        if (candidates.has("_file"))
            file_column = candidates.getByName("_file").column->assumeMutable();

        for (const auto & key : keys)
        {
            String path = fs::path(bucket) / key;
            String file = path.substr(path.find_last_of('/') + 1);
            if (path_column)
                path_column->insert(path);
            if (file_column)
                file_column->insert(file);
            key_column->insert(key);
        }

        VirtualColumnUtils::filterBlockWithQuery(query, candidates, getContext(), filter_ast);

        /// Keep only the keys that survived the filter.
        const ColumnString & surviving = typeid_cast<const ColumnString &>(*candidates.getByName("_key").column);
        const size_t surviving_count = candidates.rows();
        Strings filtered_keys;
        filtered_keys.reserve(surviving_count);
        for (size_t row = 0; row < surviving_count; ++row)
            filtered_keys.emplace_back(surviving.getDataAt(row).toString());

        keys = std::move(filtered_keys);
    }

    /// Returns the next key, or an empty string when all keys have been handed out.
    String next()
    {
        const size_t position = index.fetch_add(1, std::memory_order_relaxed);
        if (position < keys.size())
            return keys[position];
        return "";
    }

private:
    Strings keys;
    std::atomic_size_t index = 0;

    String bucket;
    ASTPtr query;
    Block virtual_header;
};
2022-05-19 11:18:58 +00:00
/// Builds the iterator; all arguments are forwarded to the Impl object that holds the key list.
StorageS3Source::KeysIterator::KeysIterator(
    const std::vector<String> & keys_,
    const String & bucket_,
    ASTPtr query,
    const Block & virtual_header,
    ContextPtr context)
    : pimpl(std::make_shared<Impl>(keys_, bucket_, query, virtual_header, context))
{
}
/// Delegates to the implementation object and returns whatever key it yields.
String StorageS3Source::KeysIterator::next()
{
    auto & impl = *pimpl;
    return impl.next();
}
2019-12-03 16:23:24 +00:00
2022-04-19 18:23:04 +00:00
/// Hands out the initially known read tasks one by one (via an atomic index);
/// once they are used up, every further call is answered by the callback.
class StorageS3Source::ReadTasksIterator::Impl
{
public:
    explicit Impl(const std::vector<String> & read_tasks_, const ReadTaskCallback & new_read_tasks_callback_)
        : read_tasks(read_tasks_)
        , new_read_tasks_callback(new_read_tasks_callback_)
    {
    }

    /// Returns the next stored task, or the callback's result when the stored tasks are exhausted.
    String next()
    {
        const size_t position = index.fetch_add(1, std::memory_order_relaxed);
        if (position < read_tasks.size())
            return read_tasks[position];
        return new_read_tasks_callback();
    }

private:
    std::atomic_size_t index = 0;
    std::vector<String> read_tasks;
    ReadTaskCallback new_read_tasks_callback;
};
/// Builds the iterator; both arguments are forwarded to the Impl object.
StorageS3Source::ReadTasksIterator::ReadTasksIterator(
    const std::vector<String> & read_tasks_,
    const ReadTaskCallback & new_read_tasks_callback_)
    : pimpl(std::make_shared<Impl>(read_tasks_, new_read_tasks_callback_))
{
}
/// Delegates to the implementation object and returns whatever task it yields.
String StorageS3Source::ReadTasksIterator::next()
{
    auto & impl = *pimpl;
    return impl.next();
}
2022-03-28 19:18:20 +00:00
/// Returns `sample_block` extended with one empty column per requested virtual column,
/// appended in the order given.
Block StorageS3Source::getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns)
{
    for (const auto & requested : requested_virtual_columns)
    {
        ColumnWithTypeAndName header_column{requested.type->createColumn(), requested.type, requested.name};
        sample_block.insert(std::move(header_column));
    }

    return sample_block;
}
2020-02-14 17:47:39 +00:00
2021-03-22 17:12:31 +00:00
/// Stores the reading parameters and immediately opens the first object
/// supplied by `file_iterator_` (see initialize()).
/// The source header is `sample_block_` plus the requested virtual columns.
StorageS3Source::StorageS3Source(
    const std::vector<NameAndTypePair> & requested_virtual_columns_,
    const String & format_,
    String name_,
    const Block & sample_block_,
    ContextPtr context_,
    std::optional<FormatSettings> format_settings_,
    const ColumnsDescription & columns_,
    UInt64 max_block_size_,
    UInt64 max_single_read_retries_,
    String compression_hint_,
    const std::shared_ptr<const Aws::S3::S3Client> & client_,
    const String & bucket_,
    const String & version_id_,
    std::shared_ptr<IteratorWrapper> file_iterator_,
    const size_t download_thread_num_)
    : ISource(getHeader(sample_block_, requested_virtual_columns_))
    , WithContext(context_)
    , name(std::move(name_))
    , bucket(bucket_)
    , version_id(version_id_)
    , format(format_)
    , columns_desc(columns_)
    , max_block_size(max_block_size_)
    , max_single_read_retries(max_single_read_retries_)
    , compression_hint(std::move(compression_hint_))
    , client(client_)
    , sample_block(sample_block_)
    , format_settings(format_settings_)
    , requested_virtual_columns(requested_virtual_columns_)
    , file_iterator(file_iterator_)
    , download_thread_num(download_thread_num_)
{
    /// The result is deliberately ignored: without a first object, `reader` simply stays empty.
    initialize();
}
2021-12-27 19:42:56 +00:00
void StorageS3Source::onCancel()
{
std::lock_guard lock(reader_mutex);
2021-12-27 19:42:56 +00:00
if (reader)
reader->cancel();
}
2021-04-08 00:09:15 +00:00
bool StorageS3Source::initialize()
2021-03-22 17:12:31 +00:00
{
2021-04-13 10:59:02 +00:00
String current_key = (*file_iterator)();
if (current_key.empty())
2021-04-08 00:09:15 +00:00
return false;
2021-04-13 10:59:02 +00:00
2021-05-08 10:59:55 +00:00
file_path = fs::path(bucket) / current_key;
2021-04-08 00:09:15 +00:00
auto zstd_window_log_max = getContext()->getSettingsRef().zstd_window_log_max;
read_buf = wrapReadBufferWithCompressionMethod(createS3ReadBuffer(current_key), chooseCompressionMethod(current_key, compression_hint), zstd_window_log_max);
2022-03-21 14:52:26 +00:00
2021-10-11 16:11:50 +00:00
auto input_format = getContext()->getInputFormat(format, *read_buf, sample_block, max_block_size, format_settings);
2021-09-16 17:40:42 +00:00
QueryPipelineBuilder builder;
builder.init(Pipe(input_format));
2020-02-14 17:47:39 +00:00
2021-04-08 00:09:15 +00:00
if (columns_desc.hasDefaults())
2021-07-20 18:18:43 +00:00
{
2022-03-23 08:40:00 +00:00
builder.addSimpleTransform(
[&](const Block & header)
{ return std::make_shared<AddingDefaultsTransform>(header, columns_desc, *input_format, getContext()); });
2021-07-20 18:18:43 +00:00
}
2022-05-24 20:06:08 +00:00
pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
2021-07-20 18:18:43 +00:00
reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
2021-04-08 00:09:15 +00:00
return true;
2021-03-22 17:12:31 +00:00
}
2022-04-07 14:07:12 +00:00
std::unique_ptr<ReadBuffer> StorageS3Source::createS3ReadBuffer(const String & key)
2022-03-23 08:40:00 +00:00
{
2022-04-07 09:22:01 +00:00
const size_t object_size = DB::S3::getObjectSize(client, bucket, key, version_id, false);
2022-03-23 08:40:00 +00:00
auto download_buffer_size = getContext()->getSettings().max_download_buffer_size;
const bool use_parallel_download = download_buffer_size > 0 && download_thread_num > 1;
const bool object_too_small = object_size < download_thread_num * download_buffer_size;
if (!use_parallel_download || object_too_small)
{
2022-03-28 08:19:23 +00:00
LOG_TRACE(log, "Downloading object of size {} from S3 in single thread", object_size);
2022-04-07 09:22:01 +00:00
return std::make_unique<ReadBufferFromS3>(client, bucket, key, version_id, max_single_read_retries, getContext()->getReadSettings());
2022-03-23 08:40:00 +00:00
}
assert(object_size > 0);
if (download_buffer_size < DBMS_DEFAULT_BUFFER_SIZE)
{
2022-03-28 08:19:23 +00:00
LOG_WARNING(log, "Downloading buffer {} bytes too small, set at least {} bytes", download_buffer_size, DBMS_DEFAULT_BUFFER_SIZE);
2022-03-23 08:40:00 +00:00
download_buffer_size = DBMS_DEFAULT_BUFFER_SIZE;
}
auto factory = std::make_unique<ReadBufferS3Factory>(
2022-04-07 09:22:01 +00:00
client, bucket, key, version_id, download_buffer_size, object_size, max_single_read_retries, getContext()->getReadSettings());
2022-03-23 08:40:00 +00:00
LOG_TRACE(
2022-03-28 08:19:23 +00:00
log, "Downloading from S3 in {} threads. Object size: {}, Range size: {}.", download_thread_num, object_size, download_buffer_size);
2022-03-23 08:40:00 +00:00
2022-03-30 10:49:37 +00:00
return std::make_unique<ParallelReadBuffer>(std::move(factory), threadPoolCallbackRunner(IOThreadPool::get()), download_thread_num);
2022-03-23 08:40:00 +00:00
}
2021-03-22 17:12:31 +00:00
/// Returns the name this source was constructed with.
String StorageS3Source::getName() const
{
    String source_name = name;
    return source_name;
}
2019-05-23 09:03:39 +00:00
2021-03-22 17:12:31 +00:00
/// Produces the next chunk of data.
/// Pulls from the current reader; when it is exhausted, switches to the next object via
/// initialize(). The requested virtual columns `_path` and `_file` are appended to each chunk.
/// Returns an empty chunk when there is no reader, the source is cancelled, or no objects are left.
Chunk StorageS3Source::generate()
{
    while (reader && !isCancelled())
    {
        Chunk chunk;
        if (reader->pull(chunk))
        {
            const UInt64 rows_in_chunk = chunk.getNumRows();
            for (const auto & requested : requested_virtual_columns)
            {
                if (requested.name == "_path")
                {
                    auto path_column = requested.type->createColumnConst(rows_in_chunk, file_path);
                    chunk.addColumn(path_column->convertToFullColumnIfConst());
                }
                else if (requested.name == "_file")
                {
                    /// The file name is everything after the last slash of the path.
                    const size_t slash_pos = file_path.find_last_of('/');
                    auto file_column = requested.type->createColumnConst(rows_in_chunk, file_path.substr(slash_pos + 1));
                    chunk.addColumn(file_column->convertToFullColumnIfConst());
                }
            }
            return chunk;
        }

        /// The current object is exhausted: drop its reading state and open the next one.
        /// Done under the mutex because onCancel() accesses the reader under it as well.
        {
            std::lock_guard lock(reader_mutex);
            reader.reset();
            pipeline.reset();
            read_buf.reset();

            if (!initialize())
                break;
        }
    }

    return {};
}
2022-05-11 22:04:54 +00:00
/// Returns true if an object with exactly the given key is found in the bucket.
/// Lists the objects with `key` as prefix, page by page, and compares each listed key.
/// Throws S3_ERROR if a listing request fails.
static bool checkIfObjectExists(const std::shared_ptr<const Aws::S3::S3Client> & client, const String & bucket, const String & key)
{
    Aws::S3::Model::ListObjectsV2Request request;
    request.SetBucket(bucket);
    request.SetPrefix(key);

    Aws::S3::Model::ListObjectsV2Outcome outcome;
    bool has_more_pages = true;
    while (has_more_pages)
    {
        outcome = client->ListObjectsV2(request);
        if (!outcome.IsSuccess())
            throw Exception(
                ErrorCodes::S3_ERROR,
                "Could not list objects in bucket {} with key {}, S3 exception: {}, message: {}",
                quoteString(bucket),
                quoteString(key),
                backQuote(outcome.GetError().GetExceptionName()),
                quoteString(outcome.GetError().GetMessage()));

        for (const auto & listed_object : outcome.GetResult().GetContents())
        {
            if (listed_object.GetKey() == key)
                return true;
        }

        /// Continue from where this page ended; a non-truncated result was the last page.
        request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
        has_more_pages = outcome.GetResult().GetIsTruncated();
    }

    return false;
}
2021-04-12 19:35:26 +00:00
2021-07-23 19:33:59 +00:00
class StorageS3Sink : public SinkToStorage
2021-03-22 17:12:31 +00:00
{
2021-04-12 17:07:01 +00:00
public:
2021-07-23 19:33:59 +00:00
StorageS3Sink(
2021-04-12 17:07:01 +00:00
const String & format,
const Block & sample_block_,
ContextPtr context,
2021-08-23 19:05:28 +00:00
std::optional<FormatSettings> format_settings_,
2021-04-12 17:07:01 +00:00
const CompressionMethod compression_method,
2022-04-03 22:33:59 +00:00
const StorageS3::S3Configuration & s3_configuration_,
2021-04-12 17:07:01 +00:00
const String & bucket,
2022-04-03 22:33:59 +00:00
const String & key)
2021-07-23 19:33:59 +00:00
: SinkToStorage(sample_block_)
, sample_block(sample_block_)
2021-08-23 19:05:28 +00:00
, format_settings(format_settings_)
2019-05-23 09:03:39 +00:00
{
2021-04-12 17:07:01 +00:00
write_buf = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromS3>(
2022-04-03 22:33:59 +00:00
s3_configuration_.client,
2022-03-30 08:15:20 +00:00
bucket,
key,
2022-04-03 22:33:59 +00:00
s3_configuration_.rw_settings,
2022-03-30 08:15:20 +00:00
std::nullopt,
DBMS_DEFAULT_BUFFER_SIZE,
threadPoolCallbackRunner(IOThreadPool::get())),
compression_method,
3);
writer
= FormatFactory::instance().getOutputFormatParallelIfPossible(format, *write_buf, sample_block, context, {}, format_settings);
2021-04-12 17:07:01 +00:00
}
2019-05-31 07:27:14 +00:00
2021-07-23 19:33:59 +00:00
String getName() const override { return "StorageS3Sink"; }
2019-05-31 07:27:14 +00:00
2021-07-23 19:33:59 +00:00
void consume(Chunk chunk) override
2021-04-12 17:07:01 +00:00
{
writer->write(getHeader().cloneWithColumns(chunk.detachColumns()));
2021-04-12 17:07:01 +00:00
}
2019-05-31 07:27:14 +00:00
void onException() override
{
if (!writer)
return;
Fix possible "Cannot write to finalized buffer" It is still possible to get this error since onException does not finalize format correctly. Here is an example of such error, that was found by CI [1]: <details> [ 2686 ] {fa01bf02-73f6-4f7f-b14f-e725de6d7f9b} <Fatal> : Logical error: 'Cannot write to finalized buffer'. [ 34577 ] {} <Fatal> BaseDaemon: ######################################## [ 34577 ] {} <Fatal> BaseDaemon: (version 22.6.1.1, build id: AB8040A6769E01A0) (from thread 2686) (query_id: fa01bf02-73f6-4f7f-b14f-e725de6d7f9b) (query: insert into test_02302 select number from numbers(10) settings s3_truncate_on_insert=1;) Received signal Aborted (6) [ 34577 ] {} <Fatal> BaseDaemon: [ 34577 ] {} <Fatal> BaseDaemon: Stack trace: 0x7fcbaa5a703b 0x7fcbaa586859 0xfad9bab 0xfad9e05 0xfaf6a3b 0x24a48c7f 0x258fb9b9 0x258f2004 0x258b88f4 0x258b863b 0x2581773d 0x258177ce 0x24bb5e98 0xfad01d6 0xfad0105 0x2419b11d 0xfad01d6 0xfad0105 0x2215afbb 0x2215aa48 0xfad01d6 0xfad0105 0xfcc265d 0x225cc546 0x249a1c40 0x249bc1b6 0x2685902c 0x26859505 0x269d7767 0x269d504c 0x7fcbaa75e609 0x7fcbaa683163 [ 34577 ] {} <Fatal> BaseDaemon: 3. raise @ 0x7fcbaa5a703b in ? [ 34577 ] {} <Fatal> BaseDaemon: 4. abort @ 0x7fcbaa586859 in ? [ 34577 ] {} <Fatal> BaseDaemon: 5. ./build_docker/../src/Common/Exception.cpp:47: DB::abortOnFailedAssertion(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) @ 0xfad9bab in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 6. ./build_docker/../src/Common/Exception.cpp:70: DB::Exception::Exception(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, int, bool) @ 0xfad9e05 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 7. ./build_docker/../src/IO/WriteBuffer.h:0: DB::WriteBuffer::write(char const*, unsigned long) @ 0xfaf6a3b in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 8. 
./build_docker/../src/Processors/Formats/Impl/ArrowBufferedStreams.cpp:47: DB::ArrowBufferedOutputStream::Write(void const*, long) @ 0x24a48c7f in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 9. long parquet::ThriftSerializer::Serialize<parquet::format::FileMetaData>(parquet::format::FileMetaData const*, arrow::io::OutputStream*, std::__1::shared_ptr<parquet::Encryptor> const&) @ 0x258fb9b9 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 10. parquet::FileMetaData::FileMetaDataImpl::WriteTo(arrow::io::OutputStream*, std::__1::shared_ptr<parquet::Encryptor> const&) const @ 0x258f2004 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 11. parquet::WriteFileMetaData(parquet::FileMetaData const&, arrow::io::OutputStream*) @ 0x258b88f4 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 12. parquet::ParquetFileWriter::~ParquetFileWriter() @ 0x258b863b in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 13. parquet::arrow::FileWriterImpl::~FileWriterImpl() @ 0x2581773d in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 14. parquet::arrow::FileWriterImpl::~FileWriterImpl() @ 0x258177ce in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 15. ./build_docker/../src/Processors/Formats/Impl/ParquetBlockOutputFormat.h:27: DB::ParquetBlockOutputFormat::~ParquetBlockOutputFormat() @ 0x24bb5e98 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 16. ./build_docker/../contrib/libcxx/include/__memory/shared_ptr.h:173: std::__1::__shared_count::__release_shared() @ 0xfad01d6 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 17. ./build_docker/../contrib/libcxx/include/__memory/shared_ptr.h:216: std::__1::__shared_weak_count::__release_shared() @ 0xfad0105 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 18.1. 
inlined from ./build_docker/../contrib/libcxx/include/__memory/unique_ptr.h:312: std::__1::unique_ptr<DB::WriteBuffer, std::__1::default_delete<DB::WriteBuffer> >::reset(DB::WriteBuffer*) [ 34577 ] {} <Fatal> BaseDaemon: 18.2. inlined from ../contrib/libcxx/include/__memory/unique_ptr.h:269: ~unique_ptr [ 34577 ] {} <Fatal> BaseDaemon: 18. ../src/Storages/StorageS3.cpp:566: DB::StorageS3Sink::~StorageS3Sink() @ 0x2419b11d in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 19. ./build_docker/../contrib/libcxx/include/__memory/shared_ptr.h:173: std::__1::__shared_count::__release_shared() @ 0xfad01d6 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 20. ./build_docker/../contrib/libcxx/include/__memory/shared_ptr.h:216: std::__1::__shared_weak_count::__release_shared() @ 0xfad0105 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 21. ./build_docker/../contrib/abseil-cpp/absl/container/internal/raw_hash_set.h:1662: absl::lts_20211102::container_internal::raw_hash_set<absl::lts_20211102::container_internal::FlatHashMapPolicy<StringRef, std::__1::shared_ptr<DB::SinkToStorage> >, absl::lts_20211102::hash_internal::Hash<StringRef>, std::__1::equal_to<StringRef>, std::__1::allocator<std::__1::pair<StringRef const, std::__1::shared_ptr<DB::SinkToStorage> > > >::destroy_slots() @ 0x2215afbb in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 22.1. inlined from ./build_docker/../contrib/libcxx/include/string:1445: std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >::__is_long() const [ 34577 ] {} <Fatal> BaseDaemon: 22.2. inlined from ../contrib/libcxx/include/string:2231: ~basic_string [ 34577 ] {} <Fatal> BaseDaemon: 22. ../src/Storages/PartitionedSink.h:14: DB::PartitionedSink::~PartitionedSink() @ 0x2215aa48 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 23. 
./build_docker/../contrib/libcxx/include/__memory/shared_ptr.h:173: std::__1::__shared_count::__release_shared() @ 0xfad01d6 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 24. ./build_docker/../contrib/libcxx/include/__memory/shared_ptr.h:216: std::__1::__shared_weak_count::__release_shared() @ 0xfad0105 in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 25. ./build_docker/../contrib/libcxx/include/vector:802: std::__1::vector<std::__1::shared_ptr<DB::IProcessor>, std::__1::allocator<std::__1::shared_ptr<DB::IProcessor> > >::__base_destruct_at_end(std::__1::shared_ptr<DB::IProcessor>*) @ 0xfcc265d in /usr/bin/clickhouse [ 34577 ] {} <Fatal> BaseDaemon: 26.1. inlined from ./build_docker/../contrib/libcxx/include/vector:402: ~vector [ 34577 ] {} <Fatal> BaseDaemon: 26.2. inlined from ../src/QueryPipeline/QueryPipeline.cpp:29: ~QueryPipeline [ 34577 ] {} <Fatal> BaseDaemon: 26. ../src/QueryPipeline/QueryPipeline.cpp:535: DB::QueryPipeline::reset() @ 0x225cc546 in /usr/bin/clickhouse [ 614 ] {} <Fatal> Application: Child process was terminated by signal 6. </details> [1]: https://s3.amazonaws.com/clickhouse-test-reports/37542/8a224239c1d922158b4dc9f5d6609dca836dfd06/stress_test__undefined__actions_.html Follow-up for: #36979 Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-05-30 10:13:47 +00:00
onFinish();
}
2021-07-23 19:33:59 +00:00
void onFinish() override
2021-04-12 17:07:01 +00:00
{
try
{
2021-11-11 18:09:21 +00:00
writer->finalize();
writer->flush();
write_buf->finalize();
}
catch (...)
{
/// Stop ParallelFormattingOutputFormat correctly.
writer.reset();
throw;
}
2021-04-12 17:07:01 +00:00
}
2019-05-31 07:27:14 +00:00
2021-04-12 17:07:01 +00:00
private:
Block sample_block;
2021-08-23 19:05:28 +00:00
std::optional<FormatSettings> format_settings;
2021-04-12 17:07:01 +00:00
std::unique_ptr<WriteBuffer> write_buf;
2021-10-11 16:11:50 +00:00
OutputFormatPtr writer;
2021-04-12 17:07:01 +00:00
};
2019-05-23 09:03:39 +00:00
2021-10-26 09:31:01 +00:00
class PartitionedStorageS3Sink : public PartitionedSink
2021-05-25 06:45:30 +00:00
{
public:
PartitionedStorageS3Sink(
const ASTPtr & partition_by,
const String & format_,
const Block & sample_block_,
ContextPtr context_,
2021-08-25 15:37:17 +00:00
std::optional<FormatSettings> format_settings_,
2021-05-25 06:45:30 +00:00
const CompressionMethod compression_method_,
2022-04-03 22:33:59 +00:00
const StorageS3::S3Configuration & s3_configuration_,
2021-05-25 06:45:30 +00:00
const String & bucket_,
2022-04-03 22:33:59 +00:00
const String & key_)
2021-10-26 09:31:01 +00:00
: PartitionedSink(partition_by, context_, sample_block_)
2021-05-25 06:45:30 +00:00
, format(format_)
, sample_block(sample_block_)
, context(context_)
, compression_method(compression_method_)
2022-04-03 22:33:59 +00:00
, s3_configuration(s3_configuration_)
2021-05-25 06:45:30 +00:00
, bucket(bucket_)
, key(key_)
2021-08-25 15:37:17 +00:00
, format_settings(format_settings_)
2021-05-25 06:45:30 +00:00
{
}
2021-10-26 09:31:01 +00:00
SinkPtr createSinkForPartition(const String & partition_id) override
2021-05-25 06:45:30 +00:00
{
2021-10-26 09:31:01 +00:00
auto partition_bucket = replaceWildcards(bucket, partition_id);
validateBucket(partition_bucket);
2021-05-25 06:45:30 +00:00
2021-10-26 09:31:01 +00:00
auto partition_key = replaceWildcards(key, partition_id);
validateKey(partition_key);
2021-05-25 06:45:30 +00:00
2021-10-26 09:31:01 +00:00
return std::make_shared<StorageS3Sink>(
format,
sample_block,
context,
format_settings,
compression_method,
2022-04-03 22:33:59 +00:00
s3_configuration,
2021-10-26 09:31:01 +00:00
partition_bucket,
2022-04-03 22:33:59 +00:00
partition_key
2021-10-26 09:31:01 +00:00
);
2021-05-25 06:45:30 +00:00
}
private:
const String format;
const Block sample_block;
ContextPtr context;
const CompressionMethod compression_method;
2022-04-06 20:27:38 +00:00
const StorageS3::S3Configuration & s3_configuration;
2021-05-25 06:45:30 +00:00
const String bucket;
const String key;
2021-08-25 15:37:17 +00:00
std::optional<FormatSettings> format_settings;
2021-05-25 06:45:30 +00:00
ExpressionActionsPtr partition_by_expr;
2021-08-20 13:12:30 +00:00
static void validateBucket(const String & str)
{
2021-08-20 13:12:30 +00:00
S3::URI::validateBucket(str, {});
2021-08-20 13:12:30 +00:00
if (!DB::UTF8::isValidUTF8(reinterpret_cast<const UInt8 *>(str.data()), str.size()))
throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "Incorrect non-UTF8 sequence in bucket name");
validatePartitionKey(str, false);
}
2021-08-20 13:12:30 +00:00
static void validateKey(const String & str)
{
/// See:
/// - https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-keys.html
/// - https://cloud.ibm.com/apidocs/cos/cos-compatibility#putobject
2021-08-20 13:12:30 +00:00
if (str.empty() || str.size() > 1024)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Incorrect key length (not empty, max 1023 characters), got: {}", str.size());
2021-08-20 13:12:30 +00:00
if (!DB::UTF8::isValidUTF8(reinterpret_cast<const UInt8 *>(str.data()), str.size()))
throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "Incorrect non-UTF8 sequence in key");
validatePartitionKey(str, true);
}
2021-05-25 06:45:30 +00:00
};
2019-12-04 16:06:55 +00:00
/// Creates the storage for the given S3 URI.
/// The format name and the URL are validated, the S3 client configuration is initialized,
/// and the table structure is inferred from the data when no columns were provided.
StorageS3::StorageS3(
    const S3::URI & uri_,
    const String & access_key_id_,
    const String & secret_access_key_,
    const StorageID & table_id_,
    const String & format_name_,
    const S3Settings::ReadWriteSettings & rw_settings_,
    const ColumnsDescription & columns_,
    const ConstraintsDescription & constraints_,
    const String & comment,
    ContextPtr context_,
    std::optional<FormatSettings> format_settings_,
    const String & compression_method_,
    bool distributed_processing_,
    ASTPtr partition_by_)
    : IStorage(table_id_)
    , s3_configuration{uri_, access_key_id_, secret_access_key_, {}, {}, rw_settings_} /// Client and settings will be updated later
    , keys({uri_.key})
    , format_name(format_name_)
    , compression_method(compression_method_)
    , name(uri_.storage_name)
    , distributed_processing(distributed_processing_)
    , format_settings(format_settings_)
    , partition_by(partition_by_)
    , is_key_with_globs(uri_.key.find_first_of("*?{") != std::string::npos)
{
    FormatFactory::instance().checkFormatName(format_name);
    context_->getGlobalContext()->getRemoteHostFilter().checkURL(uri_.uri);

    StorageInMemoryMetadata storage_metadata;
    updateS3Configuration(context_, s3_configuration);

    if (!columns_.empty())
    {
        storage_metadata.setColumns(columns_);
    }
    else
    {
        /// No explicit structure: infer it from the data and remember the keys that were read meanwhile.
        auto inferred_columns = getTableStructureFromDataImpl(
            format_name,
            s3_configuration,
            compression_method,
            distributed_processing_,
            is_key_with_globs,
            format_settings,
            context_,
            &read_tasks_used_in_schema_inference);
        storage_metadata.setColumns(inferred_columns);
    }

    storage_metadata.setConstraints(constraints_);
    storage_metadata.setComment(comment);
    setInMemoryMetadata(storage_metadata);

    /// Default virtual columns; getVirtualsForStorage decides which of them are exposed given the table columns.
    auto default_virtuals = NamesAndTypesList{
        {"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
        {"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};

    auto table_columns = storage_metadata.getSampleBlock().getNamesAndTypesList();
    virtual_columns = getVirtualsForStorage(table_columns, default_virtuals);

    for (const auto & virtual_column : virtual_columns)
        virtual_block.insert({virtual_column.type->createColumn(), virtual_column.type, virtual_column.name});
}
2022-04-19 18:23:04 +00:00
std::shared_ptr<StorageS3Source::IteratorWrapper> StorageS3::createFileIterator(
const S3Configuration & s3_configuration,
2022-04-19 18:23:04 +00:00
const std::vector<String> & keys,
bool is_key_with_globs,
bool distributed_processing,
ContextPtr local_context,
2022-05-19 11:18:58 +00:00
ASTPtr query,
const Block & virtual_block,
2022-04-19 18:23:04 +00:00
const std::vector<String> & read_tasks)
{
if (distributed_processing)
{
return std::make_shared<StorageS3Source::IteratorWrapper>(
2022-04-19 18:23:04 +00:00
[read_tasks_iterator = std::make_shared<StorageS3Source::ReadTasksIterator>(read_tasks, local_context->getReadTaskCallback())]() -> String
{
return read_tasks_iterator->next();
});
}
else if (is_key_with_globs)
{
/// Iterate through disclosed globs and make a source for each file
2022-05-19 11:18:58 +00:00
auto glob_iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(
*s3_configuration.client, s3_configuration.uri, query, virtual_block, local_context);
return std::make_shared<StorageS3Source::IteratorWrapper>([glob_iterator]() { return glob_iterator->next(); });
}
else
{
2022-05-19 11:18:58 +00:00
auto keys_iterator
= std::make_shared<StorageS3Source::KeysIterator>(keys, s3_configuration.uri.bucket, query, virtual_block, local_context);
return std::make_shared<StorageS3Source::IteratorWrapper>([keys_iterator]() { return keys_iterator->next(); });
}
}
2019-09-22 22:13:42 +00:00
2022-05-13 18:39:19 +00:00
bool StorageS3::supportsSubsetOfColumns() const
2022-02-23 19:31:16 +00:00
{
2022-05-13 18:39:19 +00:00
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(format_name);
2022-02-23 19:31:16 +00:00
}
2020-08-03 13:54:14 +00:00
/// Builds the reading pipe: `num_streams` StorageS3Source instances that share one key iterator.
/// Throws NOT_IMPLEMENTED when the table is partitioned and the bucket or the last key contains the partition wildcard.
Pipe StorageS3::read(
    const Names & column_names,
    const StorageSnapshotPtr & storage_snapshot,
    SelectQueryInfo & query_info,
    ContextPtr local_context,
    QueryProcessingStage::Enum /*processed_stage*/,
    size_t max_block_size,
    unsigned num_streams)
{
    const bool path_has_partition_wildcard = s3_configuration.uri.bucket.find(PARTITION_ID_WILDCARD) != String::npos
        || keys.back().find(PARTITION_ID_WILDCARD) != String::npos;
    if (partition_by && path_has_partition_wildcard)
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Reading from a partitioned S3 storage is not implemented yet");

    updateS3Configuration(local_context, s3_configuration);

    Pipes pipes;

    /// Virtual columns that the query actually asks for.
    std::unordered_set<String> requested_names(column_names.begin(), column_names.end());
    std::vector<NameAndTypePair> requested_virtual_columns;
    for (const auto & candidate : getVirtuals())
    {
        if (requested_names.contains(candidate.name))
            requested_virtual_columns.push_back(candidate);
    }

    std::shared_ptr<StorageS3Source::IteratorWrapper> iterator_wrapper = createFileIterator(
        s3_configuration,
        keys,
        is_key_with_globs,
        distributed_processing,
        local_context,
        query_info.query,
        virtual_block,
        read_tasks_used_in_schema_inference);

    ColumnsDescription columns_description;
    Block block_for_format;
    if (!supportsSubsetOfColumns())
    {
        /// The format needs the whole structure of the table.
        columns_description = storage_snapshot->metadata->getColumns();
        block_for_format = storage_snapshot->metadata->getSampleBlock();
    }
    else
    {
        /// Only the requested non-virtual columns are passed to the format.
        const auto & virtuals = getVirtuals();
        const auto is_virtual = [&](const String & column_name)
        {
            return std::any_of(
                virtuals.begin(), virtuals.end(), [&](const NameAndTypePair & virtual_column) { return column_name == virtual_column.name; });
        };

        auto fetch_columns = column_names;
        std::erase_if(fetch_columns, is_virtual);

        /// Nothing but virtual columns was requested: fall back to the smallest physical column.
        if (fetch_columns.empty())
            fetch_columns.push_back(ExpressionActions::getSmallestColumn(storage_snapshot->metadata->getColumns().getAllPhysical()));

        columns_description = storage_snapshot->getDescriptionForColumns(fetch_columns);
        block_for_format = storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
    }

    const size_t max_download_threads = local_context->getSettingsRef().max_download_threads;

    for (size_t stream = 0; stream < num_streams; ++stream)
    {
        pipes.emplace_back(std::make_shared<StorageS3Source>(
            requested_virtual_columns,
            format_name,
            getName(),
            block_for_format,
            local_context,
            format_settings,
            columns_description,
            max_block_size,
            s3_configuration.rw_settings.max_single_read_retries,
            compression_method,
            s3_configuration.client,
            s3_configuration.uri.bucket,
            s3_configuration.uri.version_id,
            iterator_wrapper,
            max_download_threads));
    }

    auto pipe = Pipe::unitePipes(std::move(pipes));
    narrowPipe(pipe, num_streams);
    return pipe;
}
2021-05-25 06:45:30 +00:00
/// Creates the sink for an INSERT.
///  - With a partition expression and a partition wildcard in the bucket or key, a PartitionedStorageS3Sink is returned.
///  - Otherwise a single StorageS3Sink is returned. Keys with globs are rejected, and an already existing object
///    is handled according to s3_truncate_on_insert / s3_create_new_file_on_insert.
SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
{
    updateS3Configuration(local_context, s3_configuration);

    auto sample_block = metadata_snapshot->getSampleBlock();
    auto chosen_compression_method = chooseCompressionMethod(keys.back(), compression_method);
    const bool has_wildcards = s3_configuration.uri.bucket.find(PARTITION_ID_WILDCARD) != String::npos
        || keys.back().find(PARTITION_ID_WILDCARD) != String::npos;

    /// PARTITION BY of the INSERT query takes precedence over the one of the table.
    auto insert_query = std::dynamic_pointer_cast<ASTInsertQuery>(query);
    ASTPtr partition_by_ast;
    if (insert_query)
        partition_by_ast = insert_query->partition_by ? insert_query->partition_by : partition_by;

    if (partition_by_ast && has_wildcards)
    {
        return std::make_shared<PartitionedStorageS3Sink>(
            partition_by_ast,
            format_name,
            sample_block,
            local_context,
            format_settings,
            chosen_compression_method,
            s3_configuration,
            s3_configuration.uri.bucket,
            keys.back());
    }

    if (is_key_with_globs)
        throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "S3 key '{}' contains globs, so the table is in readonly mode", s3_configuration.uri.key);

    bool truncate_in_insert = local_context->getSettingsRef().s3_truncate_on_insert;

    if (!truncate_in_insert && checkIfObjectExists(s3_configuration.client, s3_configuration.uri.bucket, keys.back()))
    {
        if (!local_context->getSettingsRef().s3_create_new_file_on_insert)
            throw Exception(
                ErrorCodes::BAD_ARGUMENTS,
                "Object in bucket {} with key {} already exists. If you want to overwrite it, enable setting s3_truncate_on_insert, if you "
                "want to create a new file on each insert, enable setting s3_create_new_file_on_insert",
                s3_configuration.uri.bucket,
                keys.back());

        /// Insert a numeric suffix before the first '.' of the original key (or append it when there is no '.')
        /// and keep increasing the number until an unused key is found.
        size_t suffix = keys.size();
        const auto dot_pos = keys[0].find_first_of('.');
        String candidate_key;
        do
        {
            candidate_key = keys[0].substr(0, dot_pos) + "." + std::to_string(suffix) + (dot_pos == std::string::npos ? "" : keys[0].substr(dot_pos));
            ++suffix;
        }
        while (checkIfObjectExists(s3_configuration.client, s3_configuration.uri.bucket, candidate_key));

        keys.push_back(candidate_key);
    }

    return std::make_shared<StorageS3Sink>(
        format_name,
        sample_block,
        local_context,
        format_settings,
        chosen_compression_method,
        s3_configuration,
        s3_configuration.uri.bucket,
        keys.back());
}
2021-06-21 15:44:36 +00:00
void StorageS3::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &, ContextPtr local_context, TableExclusiveLockHolder &)
{
2022-04-03 22:33:59 +00:00
updateS3Configuration(local_context, s3_configuration);
2021-06-21 15:44:36 +00:00
if (is_key_with_globs)
2022-04-03 22:33:59 +00:00
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "S3 key '{}' contains globs, so the table is in readonly mode", s3_configuration.uri.key);
2021-06-21 15:44:36 +00:00
Aws::S3::Model::Delete delkeys;
for (const auto & key : keys)
{
Aws::S3::Model::ObjectIdentifier obj;
obj.SetKey(key);
delkeys.AddObjects(std::move(obj));
}
2021-06-21 15:44:36 +00:00
Aws::S3::Model::DeleteObjectsRequest request;
2022-04-03 22:33:59 +00:00
request.SetBucket(s3_configuration.uri.bucket);
2021-06-21 15:44:36 +00:00
request.SetDelete(delkeys);
2022-04-03 22:33:59 +00:00
auto response = s3_configuration.client->DeleteObjects(request);
2021-06-21 15:44:36 +00:00
if (!response.IsSuccess())
{
const auto & err = response.GetError();
throw Exception(std::to_string(static_cast<int>(err.GetErrorType())) + ": " + err.GetMessage(), ErrorCodes::S3_ERROR);
}
}
2022-04-06 20:27:38 +00:00
/// Refreshes `upd` from the settings registered for its URI and (re)creates the S3 client when needed.
/// The existing client is kept when explicit credentials were given or when the auth settings did not change.
void StorageS3::updateS3Configuration(ContextPtr ctx, StorageS3::S3Configuration & upd)
{
    auto settings = ctx->getStorageS3Settings().getSettings(upd.uri.uri.toString());

    /// Settings that differ from a default-constructed S3Settings replace the read/write settings.
    if (settings != S3Settings{} && upd.rw_settings != settings.rw_settings)
        upd.rw_settings = settings.rw_settings;

    upd.rw_settings.updateFromSettingsIfEmpty(ctx->getSettings());

    const bool has_explicit_credentials = !upd.access_key_id.empty();
    if (upd.client && (has_explicit_credentials || settings.auth_settings == upd.auth_settings))
        return;

    /// Without explicit credentials, the credentials and headers from the settings are used.
    Aws::Auth::AWSCredentials credentials(upd.access_key_id, upd.secret_access_key);
    HeaderCollection headers;
    if (!has_explicit_credentials)
    {
        credentials = Aws::Auth::AWSCredentials(settings.auth_settings.access_key_id, settings.auth_settings.secret_access_key);
        headers = settings.auth_settings.headers;
    }

    const auto & global_settings = ctx->getGlobalContext()->getSettingsRef();
    S3::PocoHTTPClientConfiguration client_configuration = S3::ClientFactory::instance().createClientConfiguration(
        settings.auth_settings.region,
        ctx->getRemoteHostFilter(),
        global_settings.s3_max_redirects,
        global_settings.enable_s3_requests_logging);

    client_configuration.endpointOverride = upd.uri.endpoint;
    client_configuration.maxConnections = upd.rw_settings.max_connections;

    /// Values from the settings win; the server config provides the fallback.
    const bool use_environment_credentials
        = settings.auth_settings.use_environment_credentials.value_or(ctx->getConfigRef().getBool("s3.use_environment_credentials", false));
    const bool use_insecure_imds_request
        = settings.auth_settings.use_insecure_imds_request.value_or(ctx->getConfigRef().getBool("s3.use_insecure_imds_request", false));

    upd.client = S3::ClientFactory::instance().create(
        client_configuration,
        upd.uri.is_virtual_hosted_style,
        credentials.GetAWSAccessKeyId(),
        credentials.GetAWSSecretKey(),
        settings.auth_settings.server_side_encryption_customer_key_base64,
        std::move(headers),
        use_environment_credentials,
        use_insecure_imds_request);

    /// Remember the auth settings the client was built with, so that later calls can detect changes.
    upd.auth_settings = std::move(settings.auth_settings);
}
/// Applies S3-specific key-value arguments of a named collection to `configuration`.
/// Every recognized key is stored into its own field; an unknown key raises NUMBER_OF_ARGUMENTS_DOESNT_MATCH.
/// Values are accessed through a reference cast to ASTLiteral, so a non-literal value
/// raises an exception instead of dereferencing a null pointer.
void StorageS3::processNamedCollectionResult(StorageS3Configuration & configuration, const std::vector<std::pair<String, ASTPtr>> & key_value_args)
{
    for (const auto & [arg_name, arg_value] : key_value_args)
    {
        if (arg_name == "access_key_id")
            configuration.auth_settings.access_key_id = arg_value->as<ASTLiteral &>().value.safeGet<String>();
        else if (arg_name == "secret_access_key")
            configuration.auth_settings.secret_access_key = arg_value->as<ASTLiteral &>().value.safeGet<String>();
        else if (arg_name == "filename")
            configuration.url = std::filesystem::path(configuration.url) / arg_value->as<ASTLiteral &>().value.safeGet<String>();
        else if (arg_name == "use_environment_credentials")
            configuration.auth_settings.use_environment_credentials = arg_value->as<ASTLiteral &>().value.safeGet<UInt8>();
        else if (arg_name == "max_single_read_retries")
            configuration.rw_settings.max_single_read_retries = arg_value->as<ASTLiteral &>().value.safeGet<UInt64>();
        /// Each of the settings below has its own field in rw_settings;
        /// previously all of them overwrote max_single_read_retries.
        else if (arg_name == "min_upload_part_size")
            configuration.rw_settings.min_upload_part_size = arg_value->as<ASTLiteral &>().value.safeGet<UInt64>();
        else if (arg_name == "upload_part_size_multiply_factor")
            configuration.rw_settings.upload_part_size_multiply_factor = arg_value->as<ASTLiteral &>().value.safeGet<UInt64>();
        else if (arg_name == "upload_part_size_multiply_parts_count_threshold")
            configuration.rw_settings.upload_part_size_multiply_parts_count_threshold = arg_value->as<ASTLiteral &>().value.safeGet<UInt64>();
        else if (arg_name == "max_single_part_upload_size")
            configuration.rw_settings.max_single_part_upload_size = arg_value->as<ASTLiteral &>().value.safeGet<UInt64>();
        else if (arg_name == "max_connections")
            configuration.rw_settings.max_connections = arg_value->as<ASTLiteral &>().value.safeGet<UInt64>();
        else
            throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
                "Unknown key-value argument `{}` for StorageS3, expected: url, [access_key_id, secret_access_key], name of used format and [compression_method].",
                arg_name);
    }
}
2021-09-07 11:17:25 +00:00
/// Parses the engine arguments into a StorageS3Configuration.
/// Arguments are taken either from a named collection or from the positional form:
/// url, [access_key_id, secret_access_key], format, [compression_method].
/// The format "auto" is resolved from the file name of the URL.
StorageS3Configuration StorageS3::getConfiguration(ASTs & engine_args, ContextPtr local_context)
{
    StorageS3Configuration configuration;

    if (auto named_collection = getURLBasedDataSourceConfiguration(engine_args, local_context))
    {
        auto [common_configuration, storage_specific_args] = named_collection.value();
        configuration.set(common_configuration);
        processNamedCollectionResult(configuration, storage_specific_args);
    }
    else
    {
        const size_t args_count = engine_args.size();
        if (args_count == 0 || args_count > 5)
            throw Exception(
                "Storage S3 requires 1 to 5 arguments: url, [access_key_id, secret_access_key], name of used format and [compression_method].",
                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

        for (auto & engine_arg : engine_args)
            engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, local_context);

        const auto string_argument = [&engine_args](size_t position)
        {
            return engine_args[position]->as<ASTLiteral &>().value.safeGet<String>();
        };

        configuration.url = string_argument(0);

        /// Credentials are present only in the 4- and 5-argument forms.
        if (args_count >= 4)
        {
            configuration.auth_settings.access_key_id = string_argument(1);
            configuration.auth_settings.secret_access_key = string_argument(2);
        }

        if (args_count == 3 || args_count == 5)
        {
            /// The last argument is the compression method, the one before it is the format.
            configuration.compression_method = string_argument(args_count - 1);
            configuration.format = string_argument(args_count - 2);
        }
        else if (args_count != 1)
        {
            /// No compression method given: the last argument is the format.
            configuration.compression_method = "auto";
            configuration.format = string_argument(args_count - 1);
        }
    }

    if (configuration.format == "auto")
        configuration.format = FormatFactory::instance().getFormatFromFileName(configuration.url, true);

    return configuration;
}
/// Infers the table structure from the data at `uri`.
/// A temporary S3 configuration is built from the arguments and the settings of `ctx`,
/// then the work is delegated to getTableStructureFromDataImpl.
ColumnsDescription StorageS3::getTableStructureFromData(
    const String & format,
    const S3::URI & uri,
    const String & access_key_id,
    const String & secret_access_key,
    const String & compression_method,
    bool distributed_processing,
    const std::optional<FormatSettings> & format_settings,
    ContextPtr ctx)
{
    S3Configuration s3_configuration{ uri, access_key_id, secret_access_key, {}, {}, S3Settings::ReadWriteSettings(ctx->getSettingsRef()) };
    updateS3Configuration(ctx, s3_configuration);

    const bool key_has_globs = uri.key.find_first_of("*?{") != std::string::npos;
    return getTableStructureFromDataImpl(
        format, s3_configuration, compression_method, distributed_processing, key_has_globs, format_settings, ctx);
}
/// Infers the table structure by reading the files one by one until the schema can be determined.
/// When `read_keys_in_distributed_processing` is provided and distributed processing is on,
/// every key taken from the iterator is appended to it.
ColumnsDescription StorageS3::getTableStructureFromDataImpl(
    const String & format,
    const S3Configuration & s3_configuration,
    const String & compression_method,
    bool distributed_processing,
    bool is_key_with_globs,
    const std::optional<FormatSettings> & format_settings,
    ContextPtr ctx,
    std::vector<String> * read_keys_in_distributed_processing)
{
    auto file_iterator
        = createFileIterator(s3_configuration, {s3_configuration.uri.key}, is_key_with_globs, distributed_processing, ctx, nullptr, {});

    /// Returns a read buffer for the next file, or nullptr when there are no more files.
    ReadBufferIterator read_buffer_iterator = [&, first = true]() mutable -> std::unique_ptr<ReadBuffer>
    {
        auto current_key = (*file_iterator)();

        if (current_key.empty())
        {
            /// An empty key on the very first call means that nothing matched the path at all.
            if (first)
                throw Exception(
                    ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
                    "Cannot extract table structure from {} format file, because there are no files with provided path in S3. You must specify "
                    "table structure manually",
                    format);

            return nullptr;
        }

        if (distributed_processing && read_keys_in_distributed_processing)
            read_keys_in_distributed_processing->push_back(current_key);

        first = false;

        const auto zstd_window_log_max = ctx->getSettingsRef().zstd_window_log_max;
        auto s3_buffer = std::make_unique<ReadBufferFromS3>(
            s3_configuration.client,
            s3_configuration.uri.bucket,
            current_key,
            s3_configuration.uri.version_id,
            s3_configuration.rw_settings.max_single_read_retries,
            ctx->getReadSettings());

        return wrapReadBufferWithCompressionMethod(
            std::move(s3_buffer),
            chooseCompressionMethod(current_key, compression_method),
            zstd_window_log_max);
    };

    return readSchemaFromFormat(format, format_settings, read_buffer_iterator, is_key_with_globs, ctx);
}
2019-06-17 00:06:14 +00:00
2021-09-07 11:17:25 +00:00
void registerStorageS3Impl(const String & name, StorageFactory & factory)
{
factory.registerStorage(name, [](const StorageFactory::Arguments & args)
{
auto & engine_args = args.engine_args;
if (engine_args.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments");
2021-09-15 18:11:49 +00:00
auto configuration = StorageS3::getConfiguration(engine_args, args.getLocalContext());
2021-08-23 19:05:28 +00:00
// Use format settings from global server context + settings from
// the SETTINGS clause of the create query. Settings from current
// session and user are ignored.
std::optional<FormatSettings> format_settings;
if (args.storage_def->settings)
{
FormatFactorySettings user_format_settings;
// Apply changed settings from global context, but ignore the
// unknown ones, because we only have the format settings here.
const auto & changes = args.getContext()->getSettingsRef().changes();
for (const auto & change : changes)
{
if (user_format_settings.has(change.name))
user_format_settings.set(change.name, change.value);
}
// Apply changes from SETTINGS clause, with validation.
user_format_settings.applyChanges(args.storage_def->settings->changes);
format_settings = getFormatSettings(args.getContext(), user_format_settings);
}
else
{
format_settings = getFormatSettings(args.getContext());
}
2021-09-07 11:17:25 +00:00
S3::URI s3_uri(Poco::URI(configuration.url));
2021-10-26 12:22:13 +00:00
ASTPtr partition_by;
if (args.storage_def->partition_by)
partition_by = args.storage_def->partition_by->clone();
return std::make_shared<StorageS3>(
s3_uri,
2022-04-03 22:33:59 +00:00
configuration.auth_settings.access_key_id,
configuration.auth_settings.secret_access_key,
args.table_id,
2021-09-07 11:17:25 +00:00
configuration.format,
2022-04-06 20:27:38 +00:00
configuration.rw_settings,
args.columns,
args.constraints,
2021-04-23 12:18:23 +00:00
args.comment,
args.getContext(),
2021-08-23 19:05:28 +00:00
format_settings,
2021-10-26 12:22:13 +00:00
configuration.compression_method,
/* distributed_processing_ */false,
partition_by);
},
{
2021-08-23 19:05:28 +00:00
.supports_settings = true,
2021-10-26 12:22:13 +00:00
.supports_sort_order = true, // for partition by
2021-12-17 15:34:13 +00:00
.supports_schema_inference = true,
.source_access_type = AccessType::S3,
2019-06-17 00:06:14 +00:00
});
2019-05-23 09:03:39 +00:00
}
2019-12-11 14:21:48 +00:00
/// Registers the engine under the name "S3".
void registerStorageS3(StorageFactory & factory)
{
    registerStorageS3Impl("S3", factory);
}
/// Registers the same engine implementation under the name "COSN".
void registerStorageCOS(StorageFactory & factory)
{
    registerStorageS3Impl("COSN", factory);
}
/// Returns a copy of the virtual columns computed in the constructor.
NamesAndTypesList StorageS3::getVirtuals() const
{
    return virtual_columns;
}
2021-07-14 08:49:05 +00:00
/// The S3 storage always reports support for PARTITION BY.
bool StorageS3::supportsPartitionBy() const
{
    return true;
}
2019-05-23 09:03:39 +00:00
}
2019-12-11 14:21:48 +00:00
#endif