Merge pull request #48092 from ClickHouse/nosign-keyword-for-s3

Add support for `NOSIGN` keyword and `no_sign_request` config for S3
This commit is contained in:
Antonio Andelic 2023-03-30 18:10:56 +02:00 committed by GitHub
commit 80cb121d2a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 264 additions and 79 deletions

View File

@ -12,7 +12,7 @@ This engine provides integration with [Amazon S3](https://aws.amazon.com/s3/) ec
``` sql
CREATE TABLE s3_engine_table (name String, value UInt32)
ENGINE = S3(path, [aws_access_key_id, aws_secret_access_key,] format, [compression])
ENGINE = S3(path [, NOSIGN | aws_access_key_id, aws_secret_access_key,] format, [compression])
[PARTITION BY expr]
[SETTINGS ...]
```
@ -20,6 +20,7 @@ CREATE TABLE s3_engine_table (name String, value UInt32)
**Engine parameters**
- `path` — Bucket url with path to file. Supports following wildcards in readonly mode: `*`, `?`, `{abc,def}` and `{N..M}` where `N`, `M` — numbers, `'abc'`, `'def'` — strings. For more information see [below](#wildcards-in-path).
- `NOSIGN` - If this keyword is provided in place of credentials, all the requests will not be signed.
- `format` — The [format](../../../interfaces/formats.md#formats) of the file.
- `aws_access_key_id`, `aws_secret_access_key` - Long-term credentials for the [AWS](https://aws.amazon.com/) account user. You can use these to authenticate your requests. Parameter is optional. If credentials are not specified, they are used from the configuration file. For more information see [Using S3 for Data Storage](../mergetree-family/mergetree.md#table_engine-mergetree-s3).
- `compression` — Compression type. Supported values: `none`, `gzip/gz`, `brotli/br`, `xz/LZMA`, `zstd/zst`. Parameter is optional. By default, it will autodetect compression by file extension.
@ -151,6 +152,7 @@ The following settings can be specified in configuration file for given endpoint
- `region` — Specifies S3 region name. Optional.
- `use_insecure_imds_request` — If set to `true`, S3 client will use insecure IMDS request while obtaining credentials from Amazon EC2 metadata. Optional, default value is `false`.
- `expiration_window_seconds` — Grace period for checking if expiration-based credentials have expired. Optional, default value is `120`.
- `no_sign_request` - Ignore all the credentials so requests are not signed. Useful for accessing public buckets.
- `header` — Adds specified HTTP header to a request to given endpoint. Optional, can be specified multiple times.
- `server_side_encryption_customer_key_base64` — If specified, required headers for accessing S3 objects with SSE-C encryption will be set. Optional.
- `max_single_read_retries` — The maximum number of attempts during single read. Default value is `4`. Optional.
@ -168,6 +170,7 @@ The following settings can be specified in configuration file for given endpoint
<!-- <use_environment_credentials>false</use_environment_credentials> -->
<!-- <use_insecure_imds_request>false</use_insecure_imds_request> -->
<!-- <expiration_window_seconds>120</expiration_window_seconds> -->
<!-- <no_sign_request>false</no_sign_request> -->
<!-- <header>Authorization: Bearer SOME-TOKEN</header> -->
<!-- <server_side_encryption_customer_key_base64>BASE64-ENCODED-KEY</server_side_encryption_customer_key_base64> -->
<!-- <max_single_read_retries>4</max_single_read_retries> -->
@ -175,6 +178,17 @@ The following settings can be specified in configuration file for given endpoint
</s3>
```
## Accessing public buckets
ClickHouse tries to fetch credentials from many different types of sources.
Sometimes, it can produce problems when accessing some buckets that are public causing the client to return `403` error code.
This issue can be avoided by using `NOSIGN` keyword, forcing the client to ignore all the credentials, and not sign the requests.
``` sql
CREATE TABLE big_table (name String, value UInt32)
ENGINE = S3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/aapl_stock.csv', NOSIGN, 'CSVWithNames');
```
## See also
- [s3 table function](../../../sql-reference/table-functions/s3.md)

View File

@ -12,7 +12,7 @@ Provides a table-like interface to select/insert files in [Amazon S3](https://aw
**Syntax**
``` sql
s3(path [,aws_access_key_id, aws_secret_access_key] [,format] [,structure] [,compression])
s3(path [, NOSIGN | aws_access_key_id, aws_secret_access_key] [,format] [,structure] [,compression])
```
:::tip GCS
@ -33,6 +33,7 @@ For GCS, substitute your HMAC key and HMAC secret where you see `aws_access_key_
and not ~~https://storage.cloud.google.com~~.
:::
- `NOSIGN` - If this keyword is provided in place of credentials, all the requests will not be signed.
- `format` — The [format](../../interfaces/formats.md#formats) of the file.
- `structure` — Structure of the table. Format `'column1_name column1_type, column2_name column2_type, ...'`.
- `compression` — Parameter is optional. Supported values: `none`, `gzip/gz`, `brotli/br`, `xz/LZMA`, `zstd/zst`. By default, it will autodetect compression by file extension.
@ -185,6 +186,21 @@ INSERT INTO TABLE FUNCTION
```
As a result, the data is written into three files in different buckets: `my_bucket_1/file.csv`, `my_bucket_10/file.csv`, and `my_bucket_20/file.csv`.
## Accessing public buckets
ClickHouse tries to fetch credentials from many different types of sources.
Sometimes, it can produce problems when accessing some buckets that are public causing the client to return `403` error code.
This issue can be avoided by using `NOSIGN` keyword, forcing the client to ignore all the credentials, and not sign the requests.
``` sql
SELECT *
FROM s3(
'https://datasets-documentation.s3.eu-west-3.amazonaws.com/aapl_stock.csv',
NOSIGN,
'CSVWithNames'
)
LIMIT 5;
```
**See Also**

View File

@ -66,12 +66,17 @@ namespace
credentials.GetAWSSecretKey(),
settings.auth_settings.server_side_encryption_customer_key_base64,
std::move(headers),
settings.auth_settings.use_environment_credentials.value_or(
context->getConfigRef().getBool("s3.use_environment_credentials", false)),
settings.auth_settings.use_insecure_imds_request.value_or(
context->getConfigRef().getBool("s3.use_insecure_imds_request", false)),
settings.auth_settings.expiration_window_seconds.value_or(
context->getConfigRef().getUInt64("s3.expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS)));
S3::CredentialsConfiguration
{
settings.auth_settings.use_environment_credentials.value_or(
context->getConfigRef().getBool("s3.use_environment_credentials", false)),
settings.auth_settings.use_insecure_imds_request.value_or(
context->getConfigRef().getBool("s3.use_insecure_imds_request", false)),
settings.auth_settings.expiration_window_seconds.value_or(
context->getConfigRef().getUInt64("s3.expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS)),
settings.auth_settings.no_sign_request.value_or(
context->getConfigRef().getBool("s3.no_sign_request", false)),
});
}
Aws::Vector<Aws::S3::Model::Object> listObjects(S3::Client & client, const S3::URI & s3_uri, const String & file_name)

View File

@ -103,9 +103,13 @@ void KeeperSnapshotManagerS3::updateS3Configuration(const Poco::Util::AbstractCo
credentials.GetAWSSecretKey(),
auth_settings.server_side_encryption_customer_key_base64,
std::move(headers),
auth_settings.use_environment_credentials.value_or(false),
auth_settings.use_insecure_imds_request.value_or(false),
auth_settings.expiration_window_seconds.value_or(S3::DEFAULT_EXPIRATION_WINDOW_SECONDS));
S3::CredentialsConfiguration
{
auth_settings.use_environment_credentials.value_or(false),
auth_settings.use_insecure_imds_request.value_or(false),
auth_settings.expiration_window_seconds.value_or(S3::DEFAULT_EXPIRATION_WINDOW_SECONDS),
auth_settings.no_sign_request.value_or(false),
});
auto new_client = std::make_shared<KeeperSnapshotManagerS3::S3Configuration>(std::move(new_uri), std::move(auth_settings), std::move(client));

View File

@ -152,9 +152,13 @@ std::unique_ptr<S3::Client> getClient(
config.getString(config_prefix + ".secret_access_key", ""),
config.getString(config_prefix + ".server_side_encryption_customer_key_base64", ""),
{},
config.getBool(config_prefix + ".use_environment_credentials", config.getBool("s3.use_environment_credentials", false)),
config.getBool(config_prefix + ".use_insecure_imds_request", config.getBool("s3.use_insecure_imds_request", false)),
config.getUInt64(config_prefix + ".expiration_window_seconds", config.getUInt64("s3.expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS)));
S3::CredentialsConfiguration
{
config.getBool(config_prefix + ".use_environment_credentials", config.getBool("s3.use_environment_credentials", false)),
config.getBool(config_prefix + ".use_insecure_imds_request", config.getBool("s3.use_insecure_imds_request", false)),
config.getUInt64(config_prefix + ".expiration_window_seconds", config.getUInt64("s3.expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS)),
config.getBool(config_prefix + ".no_sign_request", config.getBool("s3.no_sign_request", false))
});
}
}

View File

@ -563,9 +563,7 @@ std::unique_ptr<S3::Client> ClientFactory::create( // NOLINT
const String & secret_access_key,
const String & server_side_encryption_customer_key_base64,
HTTPHeaderEntries headers,
bool use_environment_credentials,
bool use_insecure_imds_request,
uint64_t expiration_window_seconds)
CredentialsConfiguration credentials_configuration)
{
PocoHTTPClientConfiguration client_configuration = cfg_;
client_configuration.updateSchemeAndRegion();
@ -592,9 +590,7 @@ std::unique_ptr<S3::Client> ClientFactory::create( // NOLINT
auto credentials_provider = std::make_shared<S3CredentialsProviderChain>(
client_configuration,
std::move(credentials),
use_environment_credentials,
use_insecure_imds_request,
expiration_window_seconds);
credentials_configuration);
client_configuration.retryStrategy = std::make_shared<Client::RetryStrategy>(std::move(client_configuration.retryStrategy));
return Client::create(

View File

@ -228,9 +228,7 @@ public:
const String & secret_access_key,
const String & server_side_encryption_customer_key_base64,
HTTPHeaderEntries headers,
bool use_environment_credentials,
bool use_insecure_imds_request,
uint64_t expiration_window_seconds = DEFAULT_EXPIRATION_WINDOW_SECONDS);
CredentialsConfiguration credentials_configuration);
PocoHTTPClientConfiguration createClientConfiguration(
const String & force_region,

View File

@ -418,12 +418,14 @@ void AwsAuthSTSAssumeRoleWebIdentityCredentialsProvider::refreshIfExpired()
S3CredentialsProviderChain::S3CredentialsProviderChain(
const DB::S3::PocoHTTPClientConfiguration & configuration,
const Aws::Auth::AWSCredentials & credentials,
bool use_environment_credentials,
bool use_insecure_imds_request,
uint64_t expiration_window_seconds)
CredentialsConfiguration credentials_configuration)
{
auto * logger = &Poco::Logger::get("S3CredentialsProviderChain");
/// we don't provide any credentials to avoid signing
if (credentials_configuration.no_sign_request)
return;
/// add explicit credentials to the front of the chain
/// because it's manually defined by the user
if (!credentials.IsEmpty())
@ -432,7 +434,7 @@ S3CredentialsProviderChain::S3CredentialsProviderChain(
return;
}
if (use_environment_credentials)
if (credentials_configuration.use_environment_credentials)
{
static const char AWS_ECS_CONTAINER_CREDENTIALS_RELATIVE_URI[] = "AWS_CONTAINER_CREDENTIALS_RELATIVE_URI";
static const char AWS_ECS_CONTAINER_CREDENTIALS_FULL_URI[] = "AWS_CONTAINER_CREDENTIALS_FULL_URI";
@ -453,7 +455,7 @@ S3CredentialsProviderChain::S3CredentialsProviderChain(
configuration.for_disk_s3,
configuration.get_request_throttler,
configuration.put_request_throttler);
AddProvider(std::make_shared<AwsAuthSTSAssumeRoleWebIdentityCredentialsProvider>(aws_client_configuration, expiration_window_seconds));
AddProvider(std::make_shared<AwsAuthSTSAssumeRoleWebIdentityCredentialsProvider>(aws_client_configuration, credentials_configuration.expiration_window_seconds));
}
AddProvider(std::make_shared<Aws::Auth::EnvironmentAWSCredentialsProvider>());
@ -519,7 +521,7 @@ S3CredentialsProviderChain::S3CredentialsProviderChain(
aws_client_configuration.retryStrategy = std::make_shared<Aws::Client::DefaultRetryStrategy>(1, 1000);
auto ec2_metadata_client = InitEC2MetadataClient(aws_client_configuration);
auto config_loader = std::make_shared<AWSEC2InstanceProfileConfigLoader>(ec2_metadata_client, !use_insecure_imds_request);
auto config_loader = std::make_shared<AWSEC2InstanceProfileConfigLoader>(ec2_metadata_client, !credentials_configuration.use_insecure_imds_request);
AddProvider(std::make_shared<AWSInstanceProfileCredentialsProvider>(config_loader));
LOG_INFO(logger, "Added EC2 metadata service credentials provider to the provider chain.");

View File

@ -121,15 +121,21 @@ private:
uint64_t expiration_window_seconds;
};
struct CredentialsConfiguration
{
bool use_environment_credentials = false;
bool use_insecure_imds_request = false;
uint64_t expiration_window_seconds = DEFAULT_EXPIRATION_WINDOW_SECONDS;
bool no_sign_request = false;
};
class S3CredentialsProviderChain : public Aws::Auth::AWSCredentialsProviderChain
{
public:
S3CredentialsProviderChain(
const DB::S3::PocoHTTPClientConfiguration & configuration,
const Aws::Auth::AWSCredentials & credentials,
bool use_environment_credentials,
bool use_insecure_imds_request,
uint64_t expiration_window_seconds);
CredentialsConfiguration credentials_configuration);
};
}

View File

@ -1,5 +1,6 @@
#include <gtest/gtest.h>
#include "IO/S3/Credentials.h"
#include "config.h"
@ -109,8 +110,11 @@ TEST(IOTestAwsS3Client, AppendExtraSSECHeaders)
secret_access_key,
server_side_encryption_customer_key_base64,
headers,
use_environment_credentials,
use_insecure_imds_request
DB::S3::CredentialsConfiguration
{
.use_environment_credentials = use_environment_credentials,
.use_insecure_imds_request = use_insecure_imds_request
}
);
ASSERT_TRUE(client);

View File

@ -89,6 +89,10 @@ AuthSettings AuthSettings::loadFromConfig(const std::string & config_elem, const
if (config.has(config_elem + ".expiration_window_seconds"))
expiration_window_seconds = config.getUInt64(config_elem + ".expiration_window_seconds");
std::optional<bool> no_sign_request;
if (config.has(config_elem + ".no_sign_request"))
no_sign_request = config.getBool(config_elem + ".no_sign_request");
HTTPHeaderEntries headers;
Poco::Util::AbstractConfiguration::Keys subconfig_keys;
config.keys(config_elem, subconfig_keys);
@ -112,7 +116,8 @@ AuthSettings AuthSettings::loadFromConfig(const std::string & config_elem, const
std::move(headers),
use_environment_credentials,
use_insecure_imds_request,
expiration_window_seconds
expiration_window_seconds,
no_sign_request
};
}
@ -130,9 +135,18 @@ void AuthSettings::updateFrom(const AuthSettings & from)
headers = from.headers;
region = from.region;
server_side_encryption_customer_key_base64 = from.server_side_encryption_customer_key_base64;
use_environment_credentials = from.use_environment_credentials;
use_insecure_imds_request = from.use_insecure_imds_request;
expiration_window_seconds = from.expiration_window_seconds;
if (from.use_environment_credentials.has_value())
use_environment_credentials = from.use_environment_credentials;
if (from.use_insecure_imds_request.has_value())
use_insecure_imds_request = from.use_insecure_imds_request;
if (from.expiration_window_seconds.has_value())
expiration_window_seconds = from.expiration_window_seconds;
if (from.no_sign_request.has_value())
no_sign_request = *from.no_sign_request;
}
}

View File

@ -85,6 +85,7 @@ struct AuthSettings
std::optional<bool> use_environment_credentials;
std::optional<bool> use_insecure_imds_request;
std::optional<uint64_t> expiration_window_seconds;
std::optional<bool> no_sign_request;
bool operator==(const AuthSettings & other) const = default;

View File

@ -63,6 +63,8 @@
#include <QueryPipeline/Pipe.h>
#include <filesystem>
#include <boost/algorithm/string.hpp>
namespace fs = std::filesystem;
@ -101,6 +103,8 @@ static const std::unordered_set<std::string_view> optional_configuration_keys =
"upload_part_size_multiply_parts_count_threshold",
"max_single_part_upload_size",
"max_connections",
"expiration_window_seconds",
"no_sign_request"
};
namespace ErrorCodes
@ -1272,9 +1276,13 @@ void StorageS3::updateConfiguration(ContextPtr ctx, StorageS3::Configuration & u
credentials.GetAWSSecretKey(),
upd.auth_settings.server_side_encryption_customer_key_base64,
std::move(headers),
upd.auth_settings.use_environment_credentials.value_or(ctx->getConfigRef().getBool("s3.use_environment_credentials", false)),
upd.auth_settings.use_insecure_imds_request.value_or(ctx->getConfigRef().getBool("s3.use_insecure_imds_request", false)),
upd.auth_settings.expiration_window_seconds.value_or(ctx->getConfigRef().getUInt64("s3.expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS)));
S3::CredentialsConfiguration{
upd.auth_settings.use_environment_credentials.value_or(ctx->getConfigRef().getBool("s3.use_environment_credentials", false)),
upd.auth_settings.use_insecure_imds_request.value_or(ctx->getConfigRef().getBool("s3.use_insecure_imds_request", false)),
upd.auth_settings.expiration_window_seconds.value_or(
ctx->getConfigRef().getUInt64("s3.expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS)),
upd.auth_settings.no_sign_request.value_or(ctx->getConfigRef().getBool("s3.no_sign_request", false)),
});
}
void StorageS3::processNamedCollectionResult(StorageS3::Configuration & configuration, const NamedCollection & collection)
@ -1290,6 +1298,8 @@ void StorageS3::processNamedCollectionResult(StorageS3::Configuration & configur
configuration.auth_settings.access_key_id = collection.getOrDefault<String>("access_key_id", "");
configuration.auth_settings.secret_access_key = collection.getOrDefault<String>("secret_access_key", "");
configuration.auth_settings.use_environment_credentials = collection.getOrDefault<UInt64>("use_environment_credentials", 0);
configuration.auth_settings.no_sign_request = collection.getOrDefault<bool>("no_sign_request", false);
configuration.auth_settings.expiration_window_seconds = collection.getOrDefault<UInt64>("expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS);
configuration.format = collection.getOrDefault<String>("format", "auto");
configuration.compression_method = collection.getOrDefault<String>("compression_method", collection.getOrDefault<String>("compression", "auto"));
@ -1313,6 +1323,9 @@ StorageS3::Configuration StorageS3::getConfiguration(ASTs & engine_args, Context
/// S3('url')
/// S3('url', 'format')
/// S3('url', 'format', 'compression')
/// S3('url', NOSIGN)
/// S3('url', NOSIGN, 'format')
/// S3('url', NOSIGN, 'format', 'compression')
/// S3('url', 'aws_access_key_id', 'aws_secret_access_key')
/// S3('url', 'aws_access_key_id', 'aws_secret_access_key', 'format')
/// S3('url', 'aws_access_key_id', 'aws_secret_access_key', 'format', 'compression')
@ -1321,7 +1334,7 @@ StorageS3::Configuration StorageS3::getConfiguration(ASTs & engine_args, Context
if (engine_args.empty() || engine_args.size() > 5)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Storage S3 requires 1 to 5 arguments: "
"url, [access_key_id, secret_access_key], name of used format and [compression_method]");
"url, [NOSIGN | access_key_id, secret_access_key], name of used format and [compression_method]");
auto * header_it = StorageURL::collectHeaders(engine_args, configuration.headers_from_ast, local_context);
if (header_it != engine_args.end())
@ -1334,24 +1347,57 @@ StorageS3::Configuration StorageS3::getConfiguration(ASTs & engine_args, Context
static std::unordered_map<size_t, std::unordered_map<std::string_view, size_t>> size_to_engine_args
{
{1, {{}}},
{2, {{"format", 1}}},
{4, {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}}},
{5, {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}, {"compression_method", 4}}}
};
std::unordered_map<std::string_view, size_t> engine_args_to_idx;
/// For 3 arguments we support 2 possible variants:
/// s3(source, format, compression_method) and s3(source, access_key_id, access_key_id)
/// We can distinguish them by looking at the 2-nd argument: check if it's a format name or not.
if (engine_args.size() == 3)
{
auto second_arg = checkAndGetLiteralArgument<String>(engine_args[1], "format/access_key_id");
if (second_arg == "auto" || FormatFactory::instance().getAllFormats().contains(second_arg))
engine_args_to_idx = {{"format", 1}, {"compression_method", 2}};
bool no_sign_request = false;
/// For 2 arguments we support 2 possible variants:
/// - s3(source, format)
/// - s3(source, NOSIGN)
/// We can distinguish them by looking at the 2-nd argument: check if it's NOSIGN or not.
if (engine_args.size() == 2)
{
auto second_arg = checkAndGetLiteralArgument<String>(engine_args[1], "format/NOSIGN");
if (boost::iequals(second_arg, "NOSIGN"))
no_sign_request = true;
else
engine_args_to_idx = {{"format", 1}};
}
/// For 3 arguments we support 2 possible variants:
/// - s3(source, format, compression_method)
/// - s3(source, access_key_id, access_key_id)
/// - s3(source, NOSIGN, format)
/// We can distinguish them by looking at the 2-nd argument: check if it's NOSIGN or format name.
else if (engine_args.size() == 3)
{
auto second_arg = checkAndGetLiteralArgument<String>(engine_args[1], "format/access_key_id/NOSIGN");
if (boost::iequals(second_arg, "NOSIGN"))
{
no_sign_request = true;
engine_args_to_idx = {{"format", 2}};
}
else if (second_arg == "auto" || FormatFactory::instance().getAllFormats().contains(second_arg))
engine_args_to_idx = {{"format", 1}, {"compression_method", 2}};
else
engine_args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}};
}
/// For 4 arguments we support 2 possible variants:
/// - s3(source, access_key_id, secret_access_key, format)
/// - s3(source, NOSIGN, format, compression_method)
/// We can distinguish them by looking at the 2-nd argument: check if it's a NOSIGN or not.
else if (engine_args.size() == 4)
{
auto second_arg = checkAndGetLiteralArgument<String>(engine_args[1], "access_key_id/NOSIGN");
if (boost::iequals(second_arg, "NOSIGN"))
{
no_sign_request = true;
engine_args_to_idx = {{"format", 2}, {"compression_method", 3}};
}
else
engine_args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}};
}
else
{
engine_args_to_idx = size_to_engine_args[engine_args.size()];
@ -1371,6 +1417,8 @@ StorageS3::Configuration StorageS3::getConfiguration(ASTs & engine_args, Context
if (engine_args_to_idx.contains("secret_access_key"))
configuration.auth_settings.secret_access_key = checkAndGetLiteralArgument<String>(engine_args[engine_args_to_idx["secret_access_key"]], "secret_access_key");
configuration.auth_settings.no_sign_request = no_sign_request;
}
configuration.static_configuration = !configuration.auth_settings.access_key_id.empty();

View File

@ -18,6 +18,8 @@
#include "registerTableFunctions.h"
#include <filesystem>
#include <boost/algorithm/string.hpp>
namespace DB
{
@ -52,36 +54,76 @@ void TableFunctionS3::parseArgumentsImpl(
static std::unordered_map<size_t, std::unordered_map<std::string_view, size_t>> size_to_args
{
{1, {{}}},
{2, {{"format", 1}}},
{5, {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}, {"structure", 4}}},
{6, {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}, {"structure", 4}, {"compression_method", 5}}}
};
std::unordered_map<std::string_view, size_t> args_to_idx;
/// For 4 arguments we support 2 possible variants:
/// s3(source, format, structure, compression_method) and s3(source, access_key_id, access_key_id, format)
/// We can distinguish them by looking at the 2-nd argument: check if it's a format name or not.
if (args.size() == 4)
{
auto second_arg = checkAndGetLiteralArgument<String>(args[1], "format/access_key_id");
if (second_arg == "auto" || FormatFactory::instance().getAllFormats().contains(second_arg))
args_to_idx = {{"format", 1}, {"structure", 2}, {"compression_method", 3}};
bool no_sign_request = false;
/// For 2 arguments we support 2 possible variants:
/// - s3(source, format)
/// - s3(source, NOSIGN)
/// We can distinguish them by looking at the 2-nd argument: check if it's NOSIGN or not.
if (args.size() == 2)
{
auto second_arg = checkAndGetLiteralArgument<String>(args[1], "format/NOSIGN");
if (boost::iequals(second_arg, "NOSIGN"))
no_sign_request = true;
else
args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}};
args_to_idx = {{"format", 1}};
}
/// For 3 arguments we support 2 possible variants:
/// s3(source, format, structure) and s3(source, access_key_id, access_key_id)
/// For 3 arguments we support 3 possible variants:
/// - s3(source, format, structure)
/// - s3(source, access_key_id, access_key_id)
/// - s3(source, NOSIGN, format)
/// We can distinguish them by looking at the 2-nd argument: check if it's a format name or not.
else if (args.size() == 3)
{
auto second_arg = checkAndGetLiteralArgument<String>(args[1], "format/access_key_id");
if (second_arg == "auto" || FormatFactory::instance().getAllFormats().contains(second_arg))
auto second_arg = checkAndGetLiteralArgument<String>(args[1], "format/access_key_id/NOSIGN");
if (boost::iequals(second_arg, "NOSIGN"))
{
no_sign_request = true;
args_to_idx = {{"format", 2}};
}
else if (second_arg == "auto" || FormatFactory::instance().getAllFormats().contains(second_arg))
args_to_idx = {{"format", 1}, {"structure", 2}};
else
args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}};
}
/// For 4 arguments we support 3 possible variants:
/// - s3(source, format, structure, compression_method),
/// - s3(source, access_key_id, access_key_id, format)
/// - s3(source, NOSIGN, format, structure)
/// We can distinguish them by looking at the 2-nd argument: check if it's a format name or not.
else if (args.size() == 4)
{
auto second_arg = checkAndGetLiteralArgument<String>(args[1], "format/access_key_id/NOSIGN");
if (boost::iequals(second_arg, "NOSIGN"))
{
no_sign_request = true;
args_to_idx = {{"format", 2}, {"structure", 3}};
}
else if (second_arg == "auto" || FormatFactory::instance().getAllFormats().contains(second_arg))
args_to_idx = {{"format", 1}, {"structure", 2}, {"compression_method", 3}};
else
args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}};
}
/// For 5 arguments we support 2 possible variants:
/// - s3(source, access_key_id, access_key_id, format, structure)
/// - s3(source, NOSIGN, format, structure, compression_method)
/// We can distinguish them by looking at the 2-nd argument: check if it's a NOSIGN keyword name or not.
else if (args.size() == 5)
{
auto second_arg = checkAndGetLiteralArgument<String>(args[1], "NOSIGN/access_key_id");
if (boost::iequals(second_arg, "NOSIGN"))
{
no_sign_request = true;
args_to_idx = {{"format", 2}, {"structure", 3}, {"compression_method", 4}};
}
else
args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}, {"structure", 4}};
}
else
{
args_to_idx = size_to_args[args.size()];
@ -104,6 +146,8 @@ void TableFunctionS3::parseArgumentsImpl(
if (args_to_idx.contains("secret_access_key"))
s3_configuration.auth_settings.secret_access_key = checkAndGetLiteralArgument<String>(args[args_to_idx["secret_access_key"]], "secret_access_key");
s3_configuration.auth_settings.no_sign_request = no_sign_request;
}
/// For DataLake table functions, we should specify default format.

View File

@ -35,5 +35,9 @@
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3_parquet2>
<s3_json_no_sign>
<url>http://minio1:9001/root/test_cache4.jsonl</url>
<no_sign_request>true</no_sign_request>
</s3_json_no_sign>
</named_collections>
</clickhouse>

View File

@ -88,7 +88,10 @@ def started_cluster():
"AWS_ACCESS_KEY_ID": "aws",
"AWS_SECRET_ACCESS_KEY": "aws123",
},
main_configs=["configs/use_environment_credentials.xml"],
main_configs=[
"configs/use_environment_credentials.xml",
"configs/named_collections.xml",
],
)
logging.info("Starting cluster...")
@ -105,12 +108,39 @@ def started_cluster():
def test_with_invalid_environment_credentials(started_cluster):
auth = "'minio','minio123'"
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["s3_with_invalid_environment_credentials"]
for bucket, auth in [
(started_cluster.minio_restricted_bucket, "'minio', 'minio123'"),
(started_cluster.minio_bucket, "NOSIGN"),
]:
instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache4.jsonl', {auth}) select * from numbers(100) settings s3_truncate_on_insert=1"
)
with pytest.raises(helpers.client.QueryRuntimeException) as ei:
instance.query(
f"select count() from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache4.jsonl')"
)
assert ei.value.returncode == 243
assert "HTTP response code: 403" in ei.value.stderr
assert (
"100"
== instance.query(
f"select count() from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache4.jsonl', {auth})"
).strip()
)
def test_no_sign_named_collections(started_cluster):
instance = started_cluster.instances["s3_with_invalid_environment_credentials"]
bucket = started_cluster.minio_bucket
instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache4.jsonl', {auth}) select * from numbers(100) settings s3_truncate_on_insert=1"
f"insert into function s3(s3_json_no_sign) select * from numbers(100) settings s3_truncate_on_insert=1"
)
with pytest.raises(helpers.client.QueryRuntimeException) as ei:
@ -121,9 +151,4 @@ def test_with_invalid_environment_credentials(started_cluster):
assert ei.value.returncode == 243
assert "HTTP response code: 403" in ei.value.stderr
assert (
"100"
== instance.query(
f"select count() from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache4.jsonl', {auth})"
).strip()
)
assert "100" == instance.query(f"select count() from s3(s3_json_no_sign)").strip()