Merge pull request #41261 from kssenii/s3-header-auth

Support s3 authorisation headers from ast arguments
This commit is contained in:
Kseniia Sumarokova 2022-09-23 12:48:08 +02:00 committed by GitHub
commit ea43cb5648
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 168 additions and 147 deletions

View File

@ -245,8 +245,8 @@ void registerDictionarySourceHTTP(DictionarySourceFactory & factory)
credentials.setPassword(named_collection->configuration.password);
header_entries.reserve(named_collection->configuration.headers.size());
for (const auto & header : named_collection->configuration.headers)
header_entries.emplace_back(std::make_tuple(header.first, header.second.get<String>()));
for (const auto & [key, value] : named_collection->configuration.headers)
header_entries.emplace_back(std::make_tuple(key, value));
}
else
{

View File

@ -114,6 +114,7 @@ struct URI
bool is_virtual_hosted_style;
explicit URI(const Poco::URI & uri_);
explicit URI(const std::string & uri_) : URI(Poco::URI(uri_)) {}
static void validateBucket(const String & bucket, const Poco::URI & uri);
};

View File

@ -307,7 +307,8 @@ std::optional<URLBasedDataSourceConfig> getURLBasedDataSourceConfiguration(
{
const auto header_prefix = headers_prefix + header;
configuration.headers.emplace_back(
std::make_pair(headers_config->getString(header_prefix + ".name"), headers_config->getString(header_prefix + ".value")));
headers_config->getString(header_prefix + ".name"),
headers_config->getString(header_prefix + ".value"));
}
}
@ -446,7 +447,9 @@ std::optional<URLBasedDataSourceConfig> getURLBasedDataSourceConfiguration(const
for (const auto & header : header_keys)
{
const auto header_prefix = config_prefix + ".headers." + header;
configuration.headers.emplace_back(std::make_pair(config.getString(header_prefix + ".name"), config.getString(header_prefix + ".value")));
configuration.headers.emplace_back(
config.getString(header_prefix + ".name"),
config.getString(header_prefix + ".value"));
}
}
else

View File

@ -3,6 +3,7 @@
#include <Interpreters/Context.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Storages/StorageS3Settings.h>
#include <Storages/HeaderCollection.h>
namespace DB
@ -108,7 +109,7 @@ struct URLBasedDataSourceConfiguration
String user;
String password;
std::vector<std::pair<String, Field>> headers;
HeaderCollection headers;
String http_method;
void set(const URLBasedDataSourceConfiguration & conf);

View File

@ -0,0 +1,18 @@
#pragma once
#include <string>
namespace DB
{
struct HttpHeader
{
std::string name;
std::string value;
HttpHeader(const std::string & name_, const std::string & value_) : name(name_), value(value_) {}
inline bool operator==(const HttpHeader & other) const { return name == other.name && value == other.value; }
};
using HeaderCollection = std::vector<HttpHeader>;
}

View File

@ -27,6 +27,7 @@
#include <Storages/VirtualColumnUtils.h>
#include <Storages/getVirtualsForStorage.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/StorageURL.h>
#include <IO/ReadBufferFromS3.h>
#include <IO/WriteBufferFromS3.h>
@ -767,33 +768,28 @@ private:
StorageS3::StorageS3(
const S3::URI & uri_,
const String & access_key_id_,
const String & secret_access_key_,
const StorageS3Configuration & configuration_,
const StorageID & table_id_,
const String & format_name_,
const S3Settings::ReadWriteSettings & rw_settings_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const String & comment,
ContextPtr context_,
std::optional<FormatSettings> format_settings_,
const String & compression_method_,
bool distributed_processing_,
ASTPtr partition_by_)
: IStorage(table_id_)
, s3_configuration{uri_, access_key_id_, secret_access_key_, {}, {}, rw_settings_} /// Client and settings will be updated later
, keys({uri_.key})
, format_name(format_name_)
, compression_method(compression_method_)
, name(uri_.storage_name)
, s3_configuration{configuration_.url, configuration_.auth_settings, configuration_.rw_settings, configuration_.headers}
, keys({s3_configuration.uri.key})
, format_name(configuration_.format)
, compression_method(configuration_.compression_method)
, name(s3_configuration.uri.storage_name)
, distributed_processing(distributed_processing_)
, format_settings(format_settings_)
, partition_by(partition_by_)
, is_key_with_globs(uri_.key.find_first_of("*?{") != std::string::npos)
, is_key_with_globs(s3_configuration.uri.key.find_first_of("*?{") != std::string::npos)
{
FormatFactory::instance().checkFormatName(format_name);
context_->getGlobalContext()->getRemoteHostFilter().checkURL(uri_.uri);
context_->getGlobalContext()->getRemoteHostFilter().checkURL(s3_configuration.uri.uri);
StorageInMemoryMetadata storage_metadata;
updateS3Configuration(context_, s3_configuration);
@ -1062,47 +1058,48 @@ void StorageS3::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &,
void StorageS3::updateS3Configuration(ContextPtr ctx, StorageS3::S3Configuration & upd)
{
auto settings = ctx->getStorageS3Settings().getSettings(upd.uri.uri.toString());
const auto & config_rw_settings = settings.rw_settings;
bool need_update_configuration = settings != S3Settings{};
if (need_update_configuration)
{
if (upd.rw_settings != settings.rw_settings)
upd.rw_settings = settings.rw_settings;
}
if (upd.rw_settings != config_rw_settings)
upd.rw_settings = settings.rw_settings;
upd.rw_settings.updateFromSettingsIfEmpty(ctx->getSettings());
if (upd.client && (!upd.access_key_id.empty() || settings.auth_settings == upd.auth_settings))
return;
Aws::Auth::AWSCredentials credentials(upd.access_key_id, upd.secret_access_key);
HeaderCollection headers;
if (upd.access_key_id.empty())
if (upd.client)
{
credentials = Aws::Auth::AWSCredentials(settings.auth_settings.access_key_id, settings.auth_settings.secret_access_key);
headers = settings.auth_settings.headers;
if (upd.static_configuration)
return;
if (settings.auth_settings == upd.auth_settings)
return;
}
upd.auth_settings.updateFrom(settings.auth_settings);
S3::PocoHTTPClientConfiguration client_configuration = S3::ClientFactory::instance().createClientConfiguration(
settings.auth_settings.region,
ctx->getRemoteHostFilter(), ctx->getGlobalContext()->getSettingsRef().s3_max_redirects,
upd.auth_settings.region,
ctx->getRemoteHostFilter(),
ctx->getGlobalContext()->getSettingsRef().s3_max_redirects,
ctx->getGlobalContext()->getSettingsRef().enable_s3_requests_logging,
/* for_disk_s3 = */ false);
client_configuration.endpointOverride = upd.uri.endpoint;
client_configuration.maxConnections = upd.rw_settings.max_connections;
auto credentials = Aws::Auth::AWSCredentials(upd.auth_settings.access_key_id, upd.auth_settings.secret_access_key);
auto headers = upd.auth_settings.headers;
if (!upd.headers_from_ast.empty())
headers.insert(headers.end(), upd.headers_from_ast.begin(), upd.headers_from_ast.end());
upd.client = S3::ClientFactory::instance().create(
client_configuration,
upd.uri.is_virtual_hosted_style,
credentials.GetAWSAccessKeyId(),
credentials.GetAWSSecretKey(),
settings.auth_settings.server_side_encryption_customer_key_base64,
upd.auth_settings.server_side_encryption_customer_key_base64,
std::move(headers),
settings.auth_settings.use_environment_credentials.value_or(ctx->getConfigRef().getBool("s3.use_environment_credentials", false)),
settings.auth_settings.use_insecure_imds_request.value_or(ctx->getConfigRef().getBool("s3.use_insecure_imds_request", false)));
upd.auth_settings = std::move(settings.auth_settings);
upd.auth_settings.use_environment_credentials.value_or(ctx->getConfigRef().getBool("s3.use_environment_credentials", false)),
upd.auth_settings.use_insecure_imds_request.value_or(ctx->getConfigRef().getBool("s3.use_insecure_imds_request", false)));
}
@ -1155,6 +1152,10 @@ StorageS3Configuration StorageS3::getConfiguration(ASTs & engine_args, ContextPt
"Storage S3 requires 1 to 5 arguments: url, [access_key_id, secret_access_key], name of used format and [compression_method].",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
auto header_it = StorageURL::collectHeaders(engine_args, configuration, local_context);
if (header_it != engine_args.end())
engine_args.erase(header_it);
for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, local_context);
@ -1184,19 +1185,23 @@ StorageS3Configuration StorageS3::getConfiguration(ASTs & engine_args, ContextPt
}
ColumnsDescription StorageS3::getTableStructureFromData(
const String & format,
const S3::URI & uri,
const String & access_key_id,
const String & secret_access_key,
const String & compression_method,
const StorageS3Configuration & configuration,
bool distributed_processing,
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx,
std::unordered_map<String, S3::ObjectInfo> * object_infos)
{
S3Configuration s3_configuration{ uri, access_key_id, secret_access_key, {}, {}, S3Settings::ReadWriteSettings(ctx->getSettingsRef()) };
S3Configuration s3_configuration{
configuration.url,
configuration.auth_settings,
S3Settings::ReadWriteSettings(ctx->getSettingsRef()),
configuration.headers};
updateS3Configuration(ctx, s3_configuration);
return getTableStructureFromDataImpl(format, s3_configuration, compression_method, distributed_processing, uri.key.find_first_of("*?{") != std::string::npos, format_settings, ctx, nullptr, object_infos);
return getTableStructureFromDataImpl(
configuration.format, s3_configuration, configuration.compression_method, distributed_processing,
s3_configuration.uri.key.find_first_of("*?{") != std::string::npos, format_settings, ctx, nullptr, object_infos);
}
ColumnsDescription StorageS3::getTableStructureFromDataImpl(
@ -1308,25 +1313,18 @@ void registerStorageS3Impl(const String & name, StorageFactory & factory)
format_settings = getFormatSettings(args.getContext());
}
S3::URI s3_uri(Poco::URI(configuration.url));
ASTPtr partition_by;
if (args.storage_def->partition_by)
partition_by = args.storage_def->partition_by->clone();
return std::make_shared<StorageS3>(
s3_uri,
configuration.auth_settings.access_key_id,
configuration.auth_settings.secret_access_key,
configuration,
args.table_id,
configuration.format,
configuration.rw_settings,
args.columns,
args.constraints,
args.comment,
args.getContext(),
format_settings,
configuration.compression_method,
/* distributed_processing_ */false,
partition_by);
},

View File

@ -149,18 +149,13 @@ class StorageS3 : public IStorage, WithContext
{
public:
StorageS3(
const S3::URI & uri,
const String & access_key_id,
const String & secret_access_key,
const StorageS3Configuration & configuration_,
const StorageID & table_id_,
const String & format_name_,
const S3Settings::ReadWriteSettings & rw_settings_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const String & comment,
ContextPtr context_,
std::optional<FormatSettings> format_settings_,
const String & compression_method_ = "",
bool distributed_processing_ = false,
ASTPtr partition_by_ = nullptr);
@ -189,11 +184,7 @@ public:
static StorageS3Configuration getConfiguration(ASTs & engine_args, ContextPtr local_context);
static ColumnsDescription getTableStructureFromData(
const String & format,
const S3::URI & uri,
const String & access_key_id,
const String & secret_access_key,
const String & compression_method,
const StorageS3Configuration & configuration,
bool distributed_processing,
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx,
@ -204,11 +195,28 @@ public:
struct S3Configuration
{
const S3::URI uri;
const String access_key_id;
const String secret_access_key;
std::shared_ptr<const Aws::S3::S3Client> client;
S3Settings::AuthSettings auth_settings;
S3Settings::ReadWriteSettings rw_settings;
/// If s3 configuration was passed from ast, then it is static.
/// If from config - it can be changed with config reload.
bool static_configuration = true;
/// Headers from ast is a part of static configuration.
HeaderCollection headers_from_ast;
S3Configuration(
const String & url_,
const S3Settings::AuthSettings & auth_settings_,
const S3Settings::ReadWriteSettings & rw_settings_,
const HeaderCollection & headers_from_ast_)
: uri(S3::URI(url_))
, auth_settings(auth_settings_)
, rw_settings(rw_settings_)
, static_configuration(!auth_settings_.access_key_id.empty())
, headers_from_ast(headers_from_ast_) {}
};
static SchemaCache & getSchemaCache(const ContextPtr & ctx);

View File

@ -46,22 +46,17 @@
namespace DB
{
StorageS3Cluster::StorageS3Cluster(
const String & filename_,
const String & access_key_id_,
const String & secret_access_key_,
const StorageS3ClusterConfiguration & configuration_,
const StorageID & table_id_,
String cluster_name_,
const String & format_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
ContextPtr context_,
const String & compression_method_)
ContextPtr context_)
: IStorage(table_id_)
, s3_configuration{S3::URI{Poco::URI{filename_}}, access_key_id_, secret_access_key_, {}, {}, S3Settings::ReadWriteSettings(context_->getSettingsRef())}
, filename(filename_)
, cluster_name(cluster_name_)
, format_name(format_name_)
, compression_method(compression_method_)
, s3_configuration{configuration_.url, configuration_.auth_settings, configuration_.rw_settings, configuration_.headers}
, filename(configuration_.url)
, cluster_name(configuration_.cluster_name)
, format_name(configuration_.format)
, compression_method(configuration_.compression_method)
{
context_->getGlobalContext()->getRemoteHostFilter().checkURL(Poco::URI{filename});
StorageInMemoryMetadata storage_metadata;

View File

@ -21,16 +21,11 @@ class StorageS3Cluster : public IStorage
{
public:
StorageS3Cluster(
const String & filename_,
const String & access_key_id_,
const String & secret_access_key_,
const StorageS3ClusterConfiguration & configuration_,
const StorageID & table_id_,
String cluster_name_,
const String & format_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
ContextPtr context_,
const String & compression_method_);
ContextPtr context_);
std::string getName() const override { return "S3Cluster"; }

View File

@ -7,6 +7,7 @@
#include <vector>
#include <base/types.h>
#include <Interpreters/Context_fwd.h>
#include <Storages/HeaderCollection.h>
namespace Poco::Util
{
@ -15,15 +16,6 @@ class AbstractConfiguration;
namespace DB
{
struct HttpHeader
{
String name;
String value;
inline bool operator==(const HttpHeader & other) const { return name == other.name && value == other.value; }
};
using HeaderCollection = std::vector<HttpHeader>;
struct Settings;
@ -50,6 +42,21 @@ struct S3Settings
&& use_environment_credentials == other.use_environment_credentials
&& use_insecure_imds_request == other.use_insecure_imds_request;
}
void updateFrom(const AuthSettings & from)
{
/// Update with check for emptyness only parameters which
/// can be passed not only from config, but via ast.
if (!from.access_key_id.empty())
access_key_id = from.access_key_id;
if (!from.secret_access_key.empty())
secret_access_key = from.secret_access_key;
headers = from.headers;
region = from.region;
server_side_encryption_customer_key_base64 = from.server_side_encryption_customer_key_base64;
}
};
struct ReadWriteSettings
@ -94,7 +101,6 @@ struct S3Settings
class StorageS3Settings
{
public:
StorageS3Settings() = default;
void loadFromConfig(const String & config_elem, const Poco::Util::AbstractConfiguration & config, const Settings & settings);
S3Settings getSettings(const String & endpoint) const;

View File

@ -1018,7 +1018,7 @@ ASTs::iterator StorageURL::collectHeaders(
if (arg_value.getType() != Field::Types::Which::String)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected string as header value");
configuration.headers.emplace_back(arg_name, arg_value);
configuration.headers.emplace_back(arg_name, arg_value.safeGet<String>());
}
headers_it = arg_it;
@ -1096,10 +1096,9 @@ void registerStorageURL(StorageFactory & factory)
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers;
for (const auto & [header, value] : configuration.headers)
{
auto value_literal = value.safeGet<String>();
if (header == "Range")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Range headers are not allowed");
headers.emplace_back(std::make_pair(header, value_literal));
headers.emplace_back(header, value);
}
ASTPtr partition_by;

View File

@ -12,6 +12,7 @@
#include <Parsers/ASTLiteral.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/StorageS3.h>
#include <Storages/StorageURL.h>
#include <Formats/FormatFactory.h>
#include "registerTableFunctions.h"
#include <filesystem>
@ -40,6 +41,10 @@ void TableFunctionS3::parseArgumentsImpl(const String & error_message, ASTs & ar
if (args.empty() || args.size() > 6)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, error_message);
auto header_it = StorageURL::collectHeaders(args, s3_configuration, context);
if (header_it != args.end())
args.erase(header_it);
for (auto & arg : args)
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
@ -135,15 +140,7 @@ ColumnsDescription TableFunctionS3::getActualTableStructure(ContextPtr context)
if (configuration.structure == "auto")
{
context->checkAccess(getSourceAccessType());
return StorageS3::getTableStructureFromData(
configuration.format,
S3::URI(Poco::URI(configuration.url)),
configuration.auth_settings.access_key_id,
configuration.auth_settings.secret_access_key,
configuration.compression_method,
false,
std::nullopt,
context);
return StorageS3::getTableStructureFromData(configuration, false, std::nullopt, context);
}
return parseColumnsListFromString(configuration.structure, context);
@ -161,19 +158,14 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, Context
columns = structure_hint;
StoragePtr storage = std::make_shared<StorageS3>(
s3_uri,
configuration.auth_settings.access_key_id,
configuration.auth_settings.secret_access_key,
configuration,
StorageID(getDatabaseName(), table_name),
configuration.format,
configuration.rw_settings,
columns,
ConstraintsDescription{},
String{},
context,
/// No format_settings for table function S3
std::nullopt,
configuration.compression_method);
std::nullopt);
storage->startup();

View File

@ -82,19 +82,10 @@ void TableFunctionS3Cluster::parseArguments(const ASTPtr & ast_function, Context
ColumnsDescription TableFunctionS3Cluster::getActualTableStructure(ContextPtr context) const
{
context->checkAccess(getSourceAccessType());
if (configuration.structure == "auto")
{
context->checkAccess(getSourceAccessType());
return StorageS3::getTableStructureFromData(
configuration.format,
S3::URI(Poco::URI(configuration.url)),
configuration.auth_settings.access_key_id,
configuration.auth_settings.secret_access_key,
configuration.compression_method,
false,
std::nullopt,
context);
}
return StorageS3::getTableStructureFromData(configuration, false, std::nullopt, context);
return parseColumnsListFromString(configuration.structure, context);
}
@ -104,46 +95,38 @@ StoragePtr TableFunctionS3Cluster::executeImpl(
const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
StoragePtr storage;
ColumnsDescription columns;
if (configuration.structure != "auto")
{
columns = parseColumnsListFromString(configuration.structure, context);
}
else if (!structure_hint.empty())
{
columns = structure_hint;
}
if (context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY)
{
/// On worker node this filename won't contains globs
Poco::URI uri (configuration.url);
S3::URI s3_uri (uri);
storage = std::make_shared<StorageS3>(
s3_uri,
configuration.auth_settings.access_key_id,
configuration.auth_settings.secret_access_key,
configuration,
StorageID(getDatabaseName(), table_name),
configuration.format,
configuration.rw_settings,
columns,
ConstraintsDescription{},
String{},
/* comment */String{},
context,
// No format_settings for S3Cluster
std::nullopt,
configuration.compression_method,
/* format_settings */std::nullopt, /// No format_settings for S3Cluster
/*distributed_processing=*/true);
}
else
{
storage = std::make_shared<StorageS3Cluster>(
configuration.url,
configuration.auth_settings.access_key_id,
configuration.auth_settings.secret_access_key,
configuration,
StorageID(getDatabaseName(), table_name),
configuration.cluster_name, configuration.format,
columns,
ConstraintsDescription{},
context,
configuration.compression_method);
context);
}
storage->startup();

View File

@ -103,10 +103,9 @@ ReadWriteBufferFromHTTP::HTTPHeaderEntries TableFunctionURL::getHeaders() const
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers;
for (const auto & [header, value] : configuration.headers)
{
auto value_literal = value.safeGet<String>();
if (header == "Range")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Range headers are not allowed");
headers.emplace_back(std::make_pair(header, value_literal));
headers.emplace_back(header, value);
}
return headers;
}

View File

@ -110,6 +110,10 @@ def started_cluster():
main_configs=["configs/defaultS3.xml"],
user_configs=["configs/s3_max_redirects.xml"],
)
cluster.add_instance(
"s3_non_default",
with_minio=True,
)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
@ -1689,3 +1693,22 @@ def test_schema_inference_cache(started_cluster):
test("s3")
test("url")
def test_ast_auth_headers(started_cluster):
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["s3_non_default"] # type: ClickHouseInstance
filename = "test.csv"
result = instance.query_and_get_error(
f"select count() from s3('http://resolver:8080/{bucket}/{filename}', 'CSV')"
)
assert "Forbidden Error" in result
assert "S3_ERROR" in result
result = instance.query(
f"select * from s3('http://resolver:8080/{bucket}/{filename}', 'CSV', headers(Authorization=`Bearer TOKEN`))"
)
assert result.strip() == "1\t2\t3"