This commit is contained in:
Nikita Mikhaylov 2021-08-23 19:05:28 +00:00
parent 04f469e8e0
commit ae241fc129
9 changed files with 61 additions and 2 deletions

View File

@ -35,6 +35,7 @@ SRCS(
CompressionFactoryAdditions.cpp
ICompressionCodec.cpp
LZ4_decompress_faster.cpp
fuzzers/compressed_buffer_fuzzer.cpp
getCompressionCodecForFile.cpp
)

View File

@ -44,6 +44,7 @@ SRCS(
SettingsQuirks.cpp
SortDescription.cpp
UUID.cpp
fuzzers/names_and_types_fuzzer.cpp
iostream_debug_helpers.cpp
)

View File

@ -1,4 +1,5 @@
#include <Common/config.h>
#include "Parsers/ASTCreateQuery.h"
#if USE_AWS_S3
@ -192,6 +193,7 @@ StorageS3Source::StorageS3Source(
String name_,
const Block & sample_block_,
ContextPtr context_,
std::optional<FormatSettings> format_settings_,
const ColumnsDescription & columns_,
UInt64 max_block_size_,
UInt64 max_single_read_retries_,
@ -210,6 +212,7 @@ StorageS3Source::StorageS3Source(
, compression_hint(compression_hint_)
, client(client_)
, sample_block(sample_block_)
, format_settings(format_settings_)
, with_file_column(need_file)
, with_path_column(need_path)
, file_iterator(file_iterator_)
@ -229,7 +232,7 @@ bool StorageS3Source::initialize()
read_buf = wrapReadBufferWithCompressionMethod(
std::make_unique<ReadBufferFromS3>(client, bucket, current_key, max_single_read_retries, DBMS_DEFAULT_BUFFER_SIZE),
chooseCompressionMethod(current_key, compression_hint));
auto input_format = FormatFactory::instance().getInput(format, *read_buf, sample_block, getContext(), max_block_size);
auto input_format = FormatFactory::instance().getInput(format, *read_buf, sample_block, getContext(), max_block_size, format_settings);
pipeline = std::make_unique<QueryPipeline>();
pipeline->init(Pipe(input_format));
@ -292,6 +295,7 @@ public:
const String & format,
const Block & sample_block_,
ContextPtr context,
std::optional<FormatSettings> format_settings_,
const CompressionMethod compression_method,
const std::shared_ptr<Aws::S3::S3Client> & client,
const String & bucket,
@ -300,10 +304,11 @@ public:
size_t max_single_part_upload_size)
: SinkToStorage(sample_block_)
, sample_block(sample_block_)
, format_settings(format_settings_)
{
write_buf = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromS3>(client, bucket, key, min_upload_part_size, max_single_part_upload_size), compression_method, 3);
writer = FormatFactory::instance().getOutputStreamParallelIfPossible(format, *write_buf, sample_block, context);
writer = FormatFactory::instance().getOutputStreamParallelIfPossible(format, *write_buf, sample_block, context, {}, format_settings);
}
String getName() const override { return "StorageS3Sink"; }
@ -336,6 +341,7 @@ public:
private:
Block sample_block;
std::optional<FormatSettings> format_settings;
std::unique_ptr<WriteBuffer> write_buf;
BlockOutputStreamPtr writer;
bool is_first_chunk = true;
@ -535,6 +541,7 @@ StorageS3::StorageS3(
const ConstraintsDescription & constraints_,
const String & comment,
ContextPtr context_,
std::optional<FormatSettings> format_settings_,
const String & compression_method_,
bool distributed_processing_)
: IStorage(table_id_)
@ -546,6 +553,7 @@ StorageS3::StorageS3(
, compression_method(compression_method_)
, name(uri_.storage_name)
, distributed_processing(distributed_processing_)
, format_settings(format_settings_)
{
context_->getGlobalContext()->getRemoteHostFilter().checkURL(uri_.uri);
StorageInMemoryMetadata storage_metadata;
@ -606,6 +614,7 @@ Pipe StorageS3::read(
getName(),
metadata_snapshot->getSampleBlock(),
local_context,
format_settings,
metadata_snapshot->getColumns(),
max_block_size,
max_single_read_retries,
@ -732,6 +741,34 @@ void registerStorageS3Impl(const String & name, StorageFactory & factory)
for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.getLocalContext());
// Use format settings from global server context + settings from
// the SETTINGS clause of the create query. Settings from current
// session and user are ignored.
std::optional<FormatSettings> format_settings;
if (args.storage_def->settings)
{
FormatFactorySettings user_format_settings;
// Apply changed settings from global context, but ignore the
// unknown ones, because we only have the format settings here.
const auto & changes = args.getContext()->getSettingsRef().changes();
for (const auto & change : changes)
{
if (user_format_settings.has(change.name))
{
user_format_settings.set(change.name, change.value);
}
}
// Apply changes from SETTINGS clause, with validation.
user_format_settings.applyChanges(args.storage_def->settings->changes);
format_settings = getFormatSettings(args.getContext(), user_format_settings);
}
else
{
format_settings = getFormatSettings(args.getContext());
}
String url = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
Poco::URI uri (url);
S3::URI s3_uri (uri);
@ -776,9 +813,11 @@ void registerStorageS3Impl(const String & name, StorageFactory & factory)
args.constraints,
args.comment,
args.getContext(),
format_settings,
compression_method);
},
{
.supports_settings = true,
.source_access_type = AccessType::S3,
});
}

View File

@ -54,6 +54,7 @@ public:
String name_,
const Block & sample_block,
ContextPtr context_,
std::optional<FormatSettings> format_settings_,
const ColumnsDescription & columns_,
UInt64 max_block_size_,
UInt64 max_single_read_retries_,
@ -77,6 +78,7 @@ private:
String compression_hint;
std::shared_ptr<Aws::S3::S3Client> client;
Block sample_block;
std::optional<FormatSettings> format_settings;
std::unique_ptr<ReadBuffer> read_buf;
@ -113,6 +115,7 @@ public:
const ConstraintsDescription & constraints_,
const String & comment,
ContextPtr context_,
std::optional<FormatSettings> format_settings_,
const String & compression_method_ = "",
bool distributed_processing_ = false);
@ -162,6 +165,7 @@ private:
String compression_method;
String name;
const bool distributed_processing;
std::optional<FormatSettings> format_settings;
static void updateClientAndAuthSettings(ContextPtr, ClientAuthentication &);
};

View File

@ -222,6 +222,8 @@ SRCS(
TTLDescription.cpp
VirtualColumnUtils.cpp
extractKeyExpressionList.cpp
fuzzers/columns_description_fuzzer.cpp
fuzzers/mergetree_checksum_fuzzer.cpp
getStructureOfRemoteTable.cpp
registerStorages.cpp
transformQueryForExternalDatabase.cpp

View File

@ -103,6 +103,8 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, Context
ConstraintsDescription{},
String{},
context,
/// No format_settings for table function S3
std::nullopt,
compression_method);
storage->startup();

View File

@ -128,6 +128,8 @@ StoragePtr TableFunctionS3Cluster::executeImpl(
ConstraintsDescription{},
String{},
context,
// No format_settings for S3Cluster, but maybe we could get them from current context ?
std::nullopt,
compression_method,
/*distributed_processing=*/true);
}

View File

@ -0,0 +1,8 @@
CREATE TABLE table_with_range
(
`name` String,
`value` UInt32
)
ENGINE = S3('https://storage.yandexcloud.net/my-test-bucket-768/{some,another}_prefix/some_file_{1..3}', 'CSV')
SETTINGS input_format_with_names_use_header = 0;