s3, url, http headers

This commit is contained in:
kssenii 2021-09-07 14:17:25 +03:00
parent a1e4d2e230
commit 3deb9a0ecb
11 changed files with 347 additions and 134 deletions

View File

@ -20,16 +20,30 @@ namespace ErrorCodes
String ExternalDataSourceConfiguration::toString() const
{
WriteBufferFromOwnString configuration_info;
configuration_info << "host: " << host << "\t";
configuration_info << "port: " << port << "\t";
configuration_info << "username: " << username;
configuration_info << "username: " << username << "\t";
if (addresses.empty())
{
configuration_info << "host: " << host << "\t";
configuration_info << "port: " << port << "\t";
}
else
{
for (const auto & [replica_host, replica_port] : addresses)
{
configuration_info << "host: " << replica_host << "\t";
configuration_info << "port: " << replica_port << "\t";
}
}
return configuration_info.str();
}
std::tuple<ExternalDataSourceConfiguration, EngineArgs, bool>
getExternalDataSourceConfiguration(ASTs args, ContextPtr context, bool is_database_engine)
getExternalDataSourceConfiguration(const ASTs & args, ContextPtr context, bool is_database_engine)
{
if (args.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments");
ExternalDataSourceConfiguration configuration;
EngineArgs non_common_args;
@ -189,4 +203,80 @@ ExternalDataSourcesByPriority getExternalDataSourceConfigurationByPriority(
return configuration;
}
std::tuple<URLBasedDataSourceConfiguration, EngineArgs, bool>
getURLBasedDataSourceConfiguration(const ASTs & args, ContextPtr context)
{
if (args.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments");
URLBasedDataSourceConfiguration configuration;
EngineArgs non_common_args;
if (const auto * collection = typeid_cast<const ASTIdentifier *>(args[0].get()))
{
const auto & config = context->getConfigRef();
auto config_prefix = fmt::format("named_collections.{}", collection->name());
if (!config.has(config_prefix))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no collection named `{}` in config", collection->name());
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);
for (const auto & key : keys)
{
if (key == "url")
configuration.url = config.getString(config_prefix + ".url", "");
if (key == "headers")
{
Poco::Util::AbstractConfiguration::Keys header_keys;
config.keys(config_prefix + '.' + "headers", header_keys);
for (const auto & header : header_keys)
{
const auto header_prefix = config_prefix + ".headers." + header;
configuration.headers.emplace_back(std::make_pair(config.getString(header_prefix + ".name"), config.getString(header_prefix + ".value")));
}
}
else
non_common_args.emplace_back(std::make_pair(key, config.getString(config_prefix + '.' + key)));
}
for (size_t i = 1; i < args.size(); ++i)
{
if (const auto * ast_function = typeid_cast<const ASTFunction *>(args[i].get()))
{
const auto * args_expr = assert_cast<const ASTExpressionList *>(ast_function->arguments.get());
auto function_args = args_expr->children;
if (function_args.size() != 2)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected key-value defined argument");
auto arg_name = function_args[0]->as<ASTIdentifier>()->name();
auto arg_value = evaluateConstantExpressionOrIdentifierAsLiteral(function_args[1], context)->as<ASTLiteral>()->value;
if (arg_name == "url")
configuration.url = arg_value.safeGet<String>();
else if (arg_name == "format")
configuration.format = arg_value.safeGet<String>();
else if (arg_name == "compression_method")
configuration.compression_method = arg_value.safeGet<String>();
else if (arg_name == "structure")
configuration.structure = arg_value.safeGet<String>();
else
non_common_args.emplace_back(std::make_pair(arg_name, arg_value));
}
else
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected key-value defined argument");
}
}
if (configuration.url.empty() || configuration.format.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Storage requires {}", configuration.url.empty() ? "uri" : "format");
return std::make_tuple(configuration, non_common_args, true);
}
return std::make_tuple(configuration, non_common_args, false);
}
}

View File

@ -5,10 +5,6 @@
#include <Poco/Util/AbstractConfiguration.h>
namespace ExternalDataSource
{
}
namespace DB
{
@ -22,6 +18,8 @@ struct ExternalDataSourceConfiguration
String table;
String schema;
std::vector<std::pair<String, UInt16>> addresses; /// Failover replicas.
String toString() const;
};
@ -30,14 +28,10 @@ using ExternalDataSourceConfigurationPtr = std::shared_ptr<ExternalDataSourceCon
struct StoragePostgreSQLConfiguration : ExternalDataSourceConfiguration
{
explicit StoragePostgreSQLConfiguration(
const ExternalDataSourceConfiguration & common_configuration,
const std::vector<std::pair<String, UInt16>> & addresses_ = {})
: ExternalDataSourceConfiguration(common_configuration)
, addresses(addresses_) {}
explicit StoragePostgreSQLConfiguration(const ExternalDataSourceConfiguration & common_configuration)
: ExternalDataSourceConfiguration(common_configuration) {}
String on_conflict;
std::vector<std::pair<String, UInt16>> addresses; /// Failover replicas.
};
@ -48,7 +42,6 @@ struct StorageMySQLConfiguration : ExternalDataSourceConfiguration
bool replace_query = false;
String on_duplicate_clause;
std::vector<std::pair<String, UInt16>> addresses; /// Failover replicas.
};
struct StorageMongoDBConfiguration : ExternalDataSourceConfiguration
@ -74,7 +67,7 @@ using EngineArgs = std::vector<std::pair<String, DB::Field>>;
* i.e. storage-specific arguments, then return them back in a set: ExternalDataSource::EngineArgs.
*/
std::tuple<ExternalDataSourceConfiguration, EngineArgs, bool>
getExternalDataSourceConfiguration(ASTs args, ContextPtr context, bool is_database_engine = false);
getExternalDataSourceConfiguration(const ASTs & args, ContextPtr context, bool is_database_engine = false);
ExternalDataSourceConfiguration getExternalDataSourceConfiguration(
const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix, ContextPtr context);
@ -94,4 +87,27 @@ struct ExternalDataSourcesByPriority
ExternalDataSourcesByPriority
getExternalDataSourceConfigurationByPriority(const Poco::Util::AbstractConfiguration & dict_config, const String & dict_config_prefix, ContextPtr context);
struct URLBasedDataSourceConfiguration
{
String url;
String format;
String compression_method = "auto";
String structure;
std::vector<std::pair<String, Field>> headers;
};
struct StorageS3Configuration : URLBasedDataSourceConfiguration
{
explicit StorageS3Configuration(const URLBasedDataSourceConfiguration & common_configuration)
: URLBasedDataSourceConfiguration(common_configuration) {}
String access_key_id;
String secret_access_key;
};
std::tuple<URLBasedDataSourceConfiguration, EngineArgs, bool>
getURLBasedDataSourceConfiguration(const ASTs & args, ContextPtr context);
}

View File

@ -96,10 +96,11 @@ StorageExternalDistributed::StorageExternalDistributed(
.username = username,
.password = password,
.database = remote_database,
.addresses = addresses
};
auto pool = std::make_shared<postgres::PoolWithFailover>(
StoragePostgreSQLConfiguration(configuration, addresses),
StoragePostgreSQLConfiguration(configuration),
context->getSettingsRef().postgresql_connection_pool_size,
context->getSettingsRef().postgresql_connection_pool_wait_timeout);

View File

@ -732,20 +732,68 @@ void StorageS3::updateClientAndAuthSettings(ContextPtr ctx, StorageS3::ClientAut
upd.auth_settings = std::move(settings);
}
void registerStorageS3Impl(const String & name, StorageFactory & factory)
{
factory.registerStorage(name, [](const StorageFactory::Arguments & args)
{
ASTs & engine_args = args.engine_args;
StorageS3Configuration StorageS3::getConfiguration(ASTs & engine_args, ContextPtr local_context)
{
auto [common_configuration, storage_specific_args, with_named_collection] = getURLBasedDataSourceConfiguration(engine_args, local_context);
StorageS3Configuration configuration(common_configuration);
if (with_named_collection)
{
for (const auto & [arg_name, arg_value] : storage_specific_args)
{
if (arg_name == "access_key_id")
configuration.access_key_id = arg_value.safeGet<String>();
else if (arg_name == "secret_access_key")
configuration.secret_access_key = arg_value.safeGet<String>();
else
throw Exception(
"Storage S3 requires 2 to 5 arguments: url, [access_key_id, secret_access_key], name of used format and [compression_method].",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
}
}
else
{
if (engine_args.size() < 2 || engine_args.size() > 5)
throw Exception(
"Storage S3 requires 2 to 5 arguments: url, [access_key_id, secret_access_key], name of used format and [compression_method].",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.getLocalContext());
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, local_context);
String url = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
if (engine_args.size() >= 4)
{
configuration.access_key_id = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
configuration.secret_access_key = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
}
if (engine_args.size() == 3 || engine_args.size() == 5)
{
configuration.compression_method = engine_args.back()->as<ASTLiteral &>().value.safeGet<String>();
configuration.format = engine_args[engine_args.size() - 2]->as<ASTLiteral &>().value.safeGet<String>();
}
else
{
configuration.compression_method = "auto";
configuration.format = engine_args.back()->as<ASTLiteral &>().value.safeGet<String>();
}
}
return configuration;
}
void registerStorageS3Impl(const String & name, StorageFactory & factory)
{
factory.registerStorage(name, [](const StorageFactory::Arguments & args)
{
auto & engine_args = args.engine_args;
if (engine_args.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments");
auto configuration = StorageS3::getConfiguration(engine_args, args.getContext());
// Use format settings from global server context + settings from
// the SETTINGS clause of the create query. Settings from current
// session and user are ignored.
@ -760,9 +808,7 @@ void registerStorageS3Impl(const String & name, StorageFactory & factory)
for (const auto & change : changes)
{
if (user_format_settings.has(change.name))
{
user_format_settings.set(change.name, change.value);
}
}
// Apply changes from SETTINGS clause, with validation.
@ -774,42 +820,18 @@ void registerStorageS3Impl(const String & name, StorageFactory & factory)
format_settings = getFormatSettings(args.getContext());
}
String url = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
Poco::URI uri (url);
S3::URI s3_uri (uri);
String access_key_id;
String secret_access_key;
if (engine_args.size() >= 4)
{
access_key_id = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
secret_access_key = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
}
UInt64 max_single_read_retries = args.getLocalContext()->getSettingsRef().s3_max_single_read_retries;
UInt64 min_upload_part_size = args.getLocalContext()->getSettingsRef().s3_min_upload_part_size;
UInt64 max_single_part_upload_size = args.getLocalContext()->getSettingsRef().s3_max_single_part_upload_size;
UInt64 max_connections = args.getLocalContext()->getSettingsRef().s3_max_connections;
String compression_method;
String format_name;
if (engine_args.size() == 3 || engine_args.size() == 5)
{
compression_method = engine_args.back()->as<ASTLiteral &>().value.safeGet<String>();
format_name = engine_args[engine_args.size() - 2]->as<ASTLiteral &>().value.safeGet<String>();
}
else
{
compression_method = "auto";
format_name = engine_args.back()->as<ASTLiteral &>().value.safeGet<String>();
}
S3::URI s3_uri(Poco::URI(configuration.url));
auto max_single_read_retries = args.getLocalContext()->getSettingsRef().s3_max_single_read_retries;
auto min_upload_part_size = args.getLocalContext()->getSettingsRef().s3_min_upload_part_size;
auto max_single_part_upload_size = args.getLocalContext()->getSettingsRef().s3_max_single_part_upload_size;
auto max_connections = args.getLocalContext()->getSettingsRef().s3_max_connections;
return StorageS3::create(
s3_uri,
access_key_id,
secret_access_key,
configuration.access_key_id,
configuration.secret_access_key,
args.table_id,
format_name,
configuration.format,
max_single_read_retries,
min_upload_part_size,
max_single_part_upload_size,
@ -819,7 +841,7 @@ void registerStorageS3Impl(const String & name, StorageFactory & factory)
args.comment,
args.getContext(),
format_settings,
compression_method);
configuration.compression_method);
},
{
.supports_settings = true,

View File

@ -18,6 +18,7 @@
#include <IO/S3Common.h>
#include <IO/CompressionMethod.h>
#include <Interpreters/Context.h>
#include <Storages/ExternalDataSourceConfiguration.h>
namespace Aws::S3
{
@ -141,6 +142,8 @@ public:
bool supportsPartitionBy() const override;
static StorageS3Configuration getConfiguration(ASTs & engine_args, ContextPtr local_context);
private:
friend class StorageS3Cluster;

View File

@ -6,7 +6,6 @@
#include <Parsers/ASTLiteral.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <IO/WriteBufferFromHTTP.h>
#include <IO/WriteHelpers.h>
#include <IO/ConnectionTimeouts.h>
@ -43,8 +42,9 @@ IStorageURLBase::IStorageURLBase(
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const String & comment,
const String & compression_method_)
: IStorage(table_id_), uri(uri_), compression_method(compression_method_), format_name(format_name_), format_settings(format_settings_)
const String & compression_method_,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_)
: IStorage(table_id_), uri(uri_), compression_method(compression_method_), format_name(format_name_), format_settings(format_settings_), headers(headers_)
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
@ -69,10 +69,14 @@ namespace
const ColumnsDescription & columns,
UInt64 max_block_size,
const ConnectionTimeouts & timeouts,
const CompressionMethod compression_method)
const CompressionMethod compression_method,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_ = {})
: SourceWithProgress(sample_block), name(std::move(name_))
{
ReadWriteBufferFromHTTP::HTTPHeaderEntries header;
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers;
for (const auto & header : headers_)
headers.emplace_back(header);
// Propagate OpenTelemetry trace context, if any, downstream.
if (CurrentThread::isInitialized())
@ -80,12 +84,12 @@ namespace
const auto & thread_trace_context = CurrentThread::get().thread_trace_context;
if (thread_trace_context.trace_id != UUID())
{
header.emplace_back("traceparent",
headers.emplace_back("traceparent",
thread_trace_context.composeTraceparentHeader());
if (!thread_trace_context.tracestate.empty())
{
header.emplace_back("tracestate",
headers.emplace_back("tracestate",
thread_trace_context.tracestate);
}
}
@ -100,7 +104,7 @@ namespace
context->getSettingsRef().max_http_get_redirects,
Poco::Net::HTTPBasicCredentials{},
DBMS_DEFAULT_BUFFER_SIZE,
header,
headers,
context->getRemoteHostFilter()),
compression_method);
@ -237,7 +241,8 @@ Pipe IStorageURLBase::read(
metadata_snapshot->getColumns(),
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(local_context),
chooseCompressionMethod(request_uri.getPath(), compression_method)));
chooseCompressionMethod(request_uri.getPath(), compression_method),
headers));
}
@ -312,8 +317,9 @@ StorageURL::StorageURL(
const ConstraintsDescription & constraints_,
const String & comment,
ContextPtr context_,
const String & compression_method_)
: IStorageURLBase(uri_, context_, table_id_, format_name_, format_settings_, columns_, constraints_, comment, compression_method_)
const String & compression_method_,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_)
: IStorageURLBase(uri_, context_, table_id_, format_name_, format_settings_, columns_, constraints_, comment, compression_method_, headers_)
{
context_->getRemoteHostFilter().checkURL(uri);
}
@ -375,45 +381,62 @@ FormatSettings StorageURL::getFormatSettingsFromArgs(const StorageFactory::Argum
return format_settings;
}
URLBasedDataSourceConfiguration StorageURL::getConfiguration(ASTs & args, ContextPtr local_context)
{
auto [common_configuration, storage_specific_args, with_named_collection] = getURLBasedDataSourceConfiguration(args, local_context);
URLBasedDataSourceConfiguration configuration(common_configuration);
if (with_named_collection)
{
if (!storage_specific_args.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Illegal arguments");
}
else
{
if (args.size() != 2 && args.size() != 3)
throw Exception(
"Storage URL requires 2 or 3 arguments: url, name of used format and optional compression method.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (auto & arg : args)
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, local_context);
configuration.url = args[0]->as<ASTLiteral &>().value.safeGet<String>();
configuration.format = args[1]->as<ASTLiteral &>().value.safeGet<String>();
if (args.size() == 3)
configuration.compression_method = args[2]->as<ASTLiteral &>().value.safeGet<String>();
}
return configuration;
}
void registerStorageURL(StorageFactory & factory)
{
factory.registerStorage("URL", [](const StorageFactory::Arguments & args)
{
ASTs & engine_args = args.engine_args;
if (engine_args.size() != 2 && engine_args.size() != 3)
throw Exception(
"Storage URL requires 2 or 3 arguments: url, name of used format and optional compression method.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
engine_args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[0], args.getLocalContext());
const String & url = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
Poco::URI uri(url);
engine_args[1] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[1], args.getLocalContext());
const String & format_name = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
String compression_method = "auto";
if (engine_args.size() == 3)
{
engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.getLocalContext());
compression_method = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
}
auto configuration = StorageURL::getConfiguration(engine_args, args.getContext());
auto format_settings = StorageURL::getFormatSettingsFromArgs(args);
Poco::URI uri(configuration.url);
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers;
for (const auto & [header, value] : configuration.headers)
{
auto value_literal = value.safeGet<String>();
headers.emplace_back(std::make_pair(header, value_literal));
}
return StorageURL::create(
uri,
args.table_id,
format_name,
configuration.format,
format_settings,
args.columns,
args.constraints,
args.comment,
args.getContext(),
compression_method);
configuration.compression_method,
headers);
},
{
.supports_settings = true,

View File

@ -6,7 +6,9 @@
#include <Processors/Sinks/SinkToStorage.h>
#include <Formats/FormatSettings.h>
#include <IO/CompressionMethod.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <Storages/StorageFactory.h>
#include <Storages/ExternalDataSourceConfiguration.h>
namespace DB
@ -44,7 +46,8 @@ protected:
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const String & comment,
const String & compression_method_);
const String & compression_method_,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_ = {});
Poco::URI uri;
String compression_method;
@ -54,6 +57,7 @@ protected:
// For `url` table function, we use settings from current query context.
// In this case, format_settings is not set.
std::optional<FormatSettings> format_settings;
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers;
virtual std::string getReadMethod() const;
@ -113,7 +117,8 @@ public:
const ConstraintsDescription & constraints_,
const String & comment,
ContextPtr context_,
const String & compression_method_);
const String & compression_method_,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_ = {});
String getName() const override
{
@ -126,6 +131,8 @@ public:
}
static FormatSettings getFormatSettingsFromArgs(const StorageFactory::Arguments & args);
static URLBasedDataSourceConfiguration getConfiguration(ASTs & args, ContextPtr context);
};
@ -152,6 +159,13 @@ public:
size_t max_block_size,
unsigned num_streams) override;
struct Configuration
{
String url;
String compression_method = "auto";
std::vector<std::pair<String, String>> headers;
};
private:
std::vector<Poco::URI> uri_options;
};

View File

@ -3,13 +3,13 @@
#if USE_AWS_S3
#include <IO/S3Common.h>
#include <Storages/StorageS3.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/Context.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/TableFunctionS3.h>
#include <TableFunctions/parseColumnsListForTableFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Storages/StorageS3.h>
#include "registerTableFunctions.h"
@ -38,51 +38,72 @@ void TableFunctionS3::parseArguments(const ASTPtr & ast_function, ContextPtr con
throw Exception("Table function '" + getName() + "' must have arguments.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ASTs & args = args_func.at(0)->children;
auto [common_configuration, storage_specific_args, with_named_collection] = getURLBasedDataSourceConfiguration(args, context);
StorageS3Configuration configuration(common_configuration);
if (args.size() < 3 || args.size() > 6)
throw Exception(message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (auto & arg : args)
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
/// Size -> argument indexes
static auto size_to_args = std::map<size_t, std::map<String, size_t>>
if (with_named_collection)
{
{3, {{"format", 1}, {"structure", 2}}},
{4, {{"format", 1}, {"structure", 2}, {"compression_method", 3}}},
{5, {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}, {"structure", 4}}},
{6, {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}, {"structure", 4}, {"compression_method", 5}}}
};
for (const auto & [arg_name, arg_value] : storage_specific_args)
{
if (arg_name == "access_key_id")
configuration.access_key_id = arg_value.safeGet<String>();
else if (arg_name == "secret_access_key")
configuration.secret_access_key = arg_value.safeGet<String>();
else
throw Exception(
"Storage S3 requires 2 to 5 arguments: url, [access_key_id, secret_access_key], name of used format and [compression_method].",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
}
}
else
{
if (args.size() < 3 || args.size() > 6)
throw Exception(message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
/// This argument is always the first
filename = args[0]->as<ASTLiteral &>().value.safeGet<String>();
for (auto & arg : args)
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
auto & args_to_idx = size_to_args[args.size()];
/// Size -> argument indexes
static auto size_to_args = std::map<size_t, std::map<String, size_t>>
{
{3, {{"format", 1}, {"structure", 2}}},
{4, {{"format", 1}, {"structure", 2}, {"compression_method", 3}}},
{5, {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}, {"structure", 4}}},
{6, {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}, {"structure", 4}, {"compression_method", 5}}}
};
if (args_to_idx.contains("format"))
format = args[args_to_idx["format"]]->as<ASTLiteral &>().value.safeGet<String>();
/// This argument is always the first
configuration.url = args[0]->as<ASTLiteral &>().value.safeGet<String>();
if (args_to_idx.contains("structure"))
structure = args[args_to_idx["structure"]]->as<ASTLiteral &>().value.safeGet<String>();
auto & args_to_idx = size_to_args[args.size()];
if (args_to_idx.contains("compression_method"))
compression_method = args[args_to_idx["compression_method"]]->as<ASTLiteral &>().value.safeGet<String>();
if (args_to_idx.contains("format"))
configuration.format = args[args_to_idx["format"]]->as<ASTLiteral &>().value.safeGet<String>();
if (args_to_idx.contains("access_key_id"))
access_key_id = args[args_to_idx["access_key_id"]]->as<ASTLiteral &>().value.safeGet<String>();
if (args_to_idx.contains("structure"))
configuration.structure = args[args_to_idx["structure"]]->as<ASTLiteral &>().value.safeGet<String>();
if (args_to_idx.contains("secret_access_key"))
secret_access_key = args[args_to_idx["secret_access_key"]]->as<ASTLiteral &>().value.safeGet<String>();
if (args_to_idx.contains("compression_method"))
configuration.compression_method = args[args_to_idx["compression_method"]]->as<ASTLiteral &>().value.safeGet<String>();
if (args_to_idx.contains("access_key_id"))
configuration.access_key_id = args[args_to_idx["access_key_id"]]->as<ASTLiteral &>().value.safeGet<String>();
if (args_to_idx.contains("secret_access_key"))
configuration.secret_access_key = args[args_to_idx["secret_access_key"]]->as<ASTLiteral &>().value.safeGet<String>();
}
s3_configuration = std::move(configuration);
}
ColumnsDescription TableFunctionS3::getActualTableStructure(ContextPtr context) const
{
return parseColumnsListFromString(structure, context);
return parseColumnsListFromString(s3_configuration->structure, context);
}
StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
Poco::URI uri (filename);
Poco::URI uri (s3_configuration->url);
S3::URI s3_uri (uri);
UInt64 max_single_read_retries = context->getSettingsRef().s3_max_single_read_retries;
UInt64 min_upload_part_size = context->getSettingsRef().s3_min_upload_part_size;
@ -91,10 +112,10 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, Context
StoragePtr storage = StorageS3::create(
s3_uri,
access_key_id,
secret_access_key,
s3_configuration->access_key_id,
s3_configuration->secret_access_key,
StorageID(getDatabaseName(), table_name),
format,
s3_configuration->format,
max_single_read_retries,
min_upload_part_size,
max_single_part_upload_size,
@ -105,7 +126,7 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, Context
context,
/// No format_settings for table function S3
std::nullopt,
compression_method);
s3_configuration->compression_method);
storage->startup();

View File

@ -5,6 +5,7 @@
#if USE_AWS_S3
#include <TableFunctions/ITableFunction.h>
#include <Storages/ExternalDataSourceConfiguration.h>
namespace DB
@ -36,12 +37,7 @@ protected:
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
String filename;
String format;
String structure;
String access_key_id;
String secret_access_key;
String compression_method = "auto";
std::optional<StorageS3Configuration> s3_configuration;
};
class TableFunctionCOS : public TableFunctionS3

View File

@ -0,0 +1,16 @@
<yandex>
<named_collections>
<url1>
<headers>
<header>
<name>Range</name>
<value>bytes=0-1</value>
</header>
<header>
<name>Access-Control-Request-Method</name>
<value>PUT</value>
</header>
</headers>
</url1>
</named_collections>
</yandex>

View File

@ -2,7 +2,7 @@ import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', with_zookeeper=False, with_hdfs=True)
node1 = cluster.add_instance('node1', main_configs=['configs/named_collections.xml'], with_zookeeper=False, with_hdfs=True)
@pytest.fixture(scope="module")
@ -81,3 +81,14 @@ def test_url_with_redirect_allowed(started_cluster):
node1.query(
"create table WebHDFSStorageWithRedirect (id UInt32, name String, weight Float64) ENGINE = URL('http://hdfs1:50070/webhdfs/v1/simple_storage?op=OPEN&namenoderpcaddress=hdfs1:9000&offset=0', 'TSV')")
assert node1.query("SET max_http_get_redirects=1; select * from WebHDFSStorageWithRedirect") == "1\tMark\t72.53\n"
def test_predefined_connection_configuration(started_cluster):
hdfs_api = started_cluster.hdfs_api
hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n")
assert hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
node1.query(
"create table WebHDFSStorageWithRedirect (id UInt32, name String, weight Float64) ENGINE = URL(url1, url='http://hdfs1:50070/webhdfs/v1/simple_storage?op=OPEN&namenoderpcaddress=hdfs1:9000&offset=0', format='TSV')")
assert node1.query("SET max_http_get_redirects=1; select * from WebHDFSStorageWithRedirect") == "1\tMark\t72.53\n"