mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-02 04:22:03 +00:00
Better
This commit is contained in:
parent
dfd19576cd
commit
f85030f481
@ -66,7 +66,7 @@ void KeeperSnapshotManagerS3::updateS3Configuration(const Poco::Util::AbstractCo
|
||||
}
|
||||
|
||||
const auto & settings = Context::getGlobalContextInstance()->getSettingsRef();
|
||||
auto auth_settings = S3::AuthSettings(config, settings, true, config_prefix);
|
||||
auto auth_settings = S3::AuthSettings(config, settings, config_prefix);
|
||||
|
||||
String endpoint = macros->expand(config.getString(config_prefix + ".endpoint"));
|
||||
auto new_uri = S3::URI{endpoint};
|
||||
|
@ -146,7 +146,7 @@ struct ContextSharedPart : boost::noncopyable
|
||||
mutable ThrottlerPtr local_read_throttler; /// A server-wide throttler for local IO reads
|
||||
mutable ThrottlerPtr local_write_throttler; /// A server-wide throttler for local IO writes
|
||||
|
||||
std::optional<StorageS3Settings> storage_s3_settings TSA_GUARDED_BY(mutex); /// Settings of S3 storage
|
||||
std::optional<S3SettingsByEndpoint> storage_s3_settings TSA_GUARDED_BY(mutex); /// Settings of S3 storage
|
||||
|
||||
mutable std::mutex keeper_dispatcher_mutex;
|
||||
mutable std::shared_ptr<KeeperDispatcher> keeper_dispatcher TSA_GUARDED_BY(keeper_dispatcher_mutex);
|
||||
@ -455,14 +455,14 @@ std::shared_ptr<zkutil::ZooKeeper> Context::getZooKeeper() const
|
||||
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Cannot connect to ZooKeeper from Keeper");
|
||||
}
|
||||
|
||||
const StorageS3Settings & Context::getStorageS3Settings() const
|
||||
const S3SettingsByEndpoint & Context::getStorageS3Settings() const
|
||||
{
|
||||
std::lock_guard lock(shared->mutex);
|
||||
|
||||
if (!shared->storage_s3_settings)
|
||||
{
|
||||
const auto & config = shared->config ? *shared->config : Poco::Util::Application::instance().config();
|
||||
shared->storage_s3_settings.emplace().loadFromConfig("s3", config, getSettingsRef());
|
||||
shared->storage_s3_settings.emplace().loadFromConfig(config, "s3", getSettingsRef());
|
||||
}
|
||||
|
||||
return *shared->storage_s3_settings;
|
||||
|
@ -37,7 +37,7 @@ class FilesystemCacheLog;
|
||||
class FilesystemReadPrefetchesLog;
|
||||
class BlobStorageLog;
|
||||
class IOUringReader;
|
||||
class StorageS3Settings;
|
||||
class S3SettingsByEndpoint;
|
||||
|
||||
/// A small class which owns ContextShared.
|
||||
/// We don't use something like unique_ptr directly to allow ContextShared type to be incomplete.
|
||||
@ -163,7 +163,7 @@ public:
|
||||
|
||||
zkutil::ZooKeeperPtr getZooKeeper() const;
|
||||
|
||||
const StorageS3Settings & getStorageS3Settings() const;
|
||||
const S3SettingsByEndpoint & getStorageS3Settings() const;
|
||||
|
||||
const String & getUserName() const { static std::string user; return user; }
|
||||
|
||||
|
@ -191,7 +191,7 @@ void registerS3ObjectStorage(ObjectStorageFactory & factory)
|
||||
auto uri = getS3URI(config, config_prefix, context);
|
||||
auto s3_capabilities = getCapabilitiesFromConfig(config, config_prefix);
|
||||
auto endpoint = getEndpoint(config, config_prefix, context);
|
||||
auto settings = getSettings(config, config_prefix, context, endpoint, /* for_disk_s3 */true, /* validate_settings */true);
|
||||
auto settings = getSettings(config, config_prefix, context, endpoint, /* validate_settings */true);
|
||||
auto client = getClient(endpoint, *settings, context, /* for_disk_s3 */true);
|
||||
auto key_generator = getKeyGenerator(uri, config, config_prefix);
|
||||
|
||||
@ -228,7 +228,7 @@ void registerS3PlainObjectStorage(ObjectStorageFactory & factory)
|
||||
auto uri = getS3URI(config, config_prefix, context);
|
||||
auto s3_capabilities = getCapabilitiesFromConfig(config, config_prefix);
|
||||
auto endpoint = getEndpoint(config, config_prefix, context);
|
||||
auto settings = getSettings(config, config_prefix, context, endpoint, /* for_disk_s3 */true, /* validate_settings */true);
|
||||
auto settings = getSettings(config, config_prefix, context, endpoint, /* validate_settings */true);
|
||||
auto client = getClient(endpoint, *settings, context, /* for_disk_s3 */true);
|
||||
auto key_generator = getKeyGenerator(uri, config, config_prefix);
|
||||
|
||||
@ -263,7 +263,7 @@ void registerS3PlainRewritableObjectStorage(ObjectStorageFactory & factory)
|
||||
auto uri = getS3URI(config, config_prefix, context);
|
||||
auto s3_capabilities = getCapabilitiesFromConfig(config, config_prefix);
|
||||
auto endpoint = getEndpoint(config, config_prefix, context);
|
||||
auto settings = getSettings(config, config_prefix, context, endpoint, /* for_disk_s3 */true, /* validate_settings */true);
|
||||
auto settings = getSettings(config, config_prefix, context, endpoint, /* validate_settings */true);
|
||||
auto client = getClient(endpoint, *settings, context, /* for_disk_s3 */true);
|
||||
auto key_generator = getKeyGenerator(uri, config, config_prefix);
|
||||
|
||||
|
@ -574,7 +574,7 @@ void S3ObjectStorage::applyNewSettings(
|
||||
ContextPtr context,
|
||||
const ApplyNewSettingsOptions & options)
|
||||
{
|
||||
auto settings_from_config = getSettings(config, config_prefix, context, uri.endpoint, for_disk_s3, context->getSettingsRef().s3_validate_request_settings);
|
||||
auto settings_from_config = getSettings(config, config_prefix, context, uri.uri_str, context->getSettingsRef().s3_validate_request_settings);
|
||||
auto modified_settings = std::make_unique<S3ObjectStorageSettings>(*s3_settings.get());
|
||||
modified_settings->auth_settings.updateIfChanged(settings_from_config->auth_settings);
|
||||
modified_settings->request_settings.updateIfChanged(settings_from_config->request_settings);
|
||||
@ -598,7 +598,8 @@ std::unique_ptr<IObjectStorage> S3ObjectStorage::cloneObjectStorage(
|
||||
const std::string & config_prefix,
|
||||
ContextPtr context)
|
||||
{
|
||||
auto new_s3_settings = getSettings(config, config_prefix, context, uri.endpoint, for_disk_s3, true);
|
||||
const auto & settings = context->getSettingsRef();
|
||||
auto new_s3_settings = getSettings(config, config_prefix, context, uri.uri_str, settings.s3_validate_request_settings);
|
||||
auto new_client = getClient(uri, *new_s3_settings, context, for_disk_s3);
|
||||
|
||||
auto new_uri{uri};
|
||||
|
@ -35,12 +35,11 @@ std::unique_ptr<S3ObjectStorageSettings> getSettings(
|
||||
const String & config_prefix,
|
||||
ContextPtr context,
|
||||
const std::string & endpoint,
|
||||
bool for_disk_s3,
|
||||
bool validate_settings)
|
||||
{
|
||||
const auto & settings = context->getSettingsRef();
|
||||
auto auth_settings = S3::AuthSettings(config, settings, for_disk_s3, for_disk_s3 ? config_prefix : "");
|
||||
auto request_settings = S3::RequestSettings(config, settings, for_disk_s3, validate_settings, for_disk_s3 ? config_prefix : "");
|
||||
auto auth_settings = S3::AuthSettings(config, settings, config_prefix);
|
||||
auto request_settings = S3::RequestSettings(config, settings, config_prefix, validate_settings);
|
||||
|
||||
request_settings.proxy_resolver = DB::ProxyConfigurationResolverProvider::getFromOldSettingsFormat(
|
||||
ProxyConfiguration::protocolFromString(S3::URI(endpoint).uri.getScheme()), config_prefix, config);
|
||||
|
@ -19,7 +19,6 @@ std::unique_ptr<S3ObjectStorageSettings> getSettings(
|
||||
const String & config_prefix,
|
||||
ContextPtr context,
|
||||
const std::string & endpoint,
|
||||
bool for_disk_s3,
|
||||
bool validate_settings);
|
||||
|
||||
std::unique_ptr<S3::Client> getClient(
|
||||
|
@ -103,48 +103,45 @@ ServerSideEncryptionKMSConfig getSSEKMSConfig(const std::string & config_elem, c
|
||||
}
|
||||
|
||||
template <typename Settings>
|
||||
static void updateS3SettingsFromConfig(
|
||||
Settings & s3_settings,
|
||||
static bool setValueFromConfig(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & path,
|
||||
typename Settings::SettingFieldRef & field)
|
||||
{
|
||||
if (!config.has(path))
|
||||
return false;
|
||||
|
||||
auto which = field.getValue().getType();
|
||||
if (isInt64OrUInt64FieldType(which))
|
||||
field.setValue(config.getUInt64(path));
|
||||
else if (which == Field::Types::String)
|
||||
field.setValue(config.getString(path));
|
||||
else if (which == Field::Types::Bool)
|
||||
field.setValue(config.getBool(path));
|
||||
else
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected type: {}", field.getTypeName());
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
AuthSettings::AuthSettings(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const DB::Settings & settings,
|
||||
bool for_disk_s3,
|
||||
const std::string & disk_config_prefix)
|
||||
const std::string & config_prefix,
|
||||
const std::string & fallback_config_prefix)
|
||||
{
|
||||
if (for_disk_s3 && disk_config_prefix.empty())
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Disk config path cannot be empty");
|
||||
if (config_prefix.empty())
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Config path cannot be empty");
|
||||
|
||||
auto update_value_if_exists = [&](const std::string & path, Settings::SettingFieldRef & field) -> bool
|
||||
for (auto & field : allMutable())
|
||||
{
|
||||
if (!config.has(path))
|
||||
return false;
|
||||
auto path = fmt::format("{}.{}", config_prefix, field.getName());
|
||||
auto fallback_path = fallback_config_prefix.empty() ? "" : fmt::format("{}.{}", fallback_config_prefix, field.getName());
|
||||
|
||||
auto which = field.getValue().getType();
|
||||
if (isInt64OrUInt64FieldType(which))
|
||||
field.setValue(config.getUInt64(path));
|
||||
else if (which == Field::Types::String)
|
||||
field.setValue(config.getString(path));
|
||||
else if (which == Field::Types::Bool)
|
||||
field.setValue(config.getBool(path));
|
||||
else
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected type: {}", field.getTypeName());
|
||||
return true;
|
||||
};
|
||||
|
||||
for (auto & field : s3_settings.allMutable())
|
||||
{
|
||||
std::string path, fallback_path;
|
||||
if (for_disk_s3)
|
||||
{
|
||||
path = fmt::format("{}.s3_{}", disk_config_prefix, field.getName());
|
||||
fallback_path = fmt::format("s3.{}", field.getName());
|
||||
}
|
||||
else
|
||||
path = fmt::format("s3.{}", field.getName());
|
||||
|
||||
bool updated = update_value_if_exists(path, field);
|
||||
bool updated = setValueFromConfig<AuthSettings>(config, path, field);
|
||||
|
||||
if (!updated && !fallback_path.empty())
|
||||
updated = update_value_if_exists(fallback_path, field);
|
||||
updated = setValueFromConfig<AuthSettings>(config, fallback_path, field);
|
||||
|
||||
if (!updated)
|
||||
{
|
||||
@ -153,17 +150,7 @@ static void updateS3SettingsFromConfig(
|
||||
field.setValue(settings.get(setting_name));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
AuthSettings::AuthSettings(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const DB::Settings & settings,
|
||||
bool for_disk_s3,
|
||||
const std::string & disk_config_prefix)
|
||||
{
|
||||
updateS3SettingsFromConfig(*this, config, settings, for_disk_s3, disk_config_prefix);
|
||||
|
||||
const auto config_prefix = for_disk_s3 ? disk_config_prefix : "s3";
|
||||
headers = getHTTPHeaders(config_prefix, config);
|
||||
server_side_encryption_kms_config = getSSEKMSConfig(config_prefix, config);
|
||||
|
||||
@ -220,11 +207,48 @@ void AuthSettings::updateIfChanged(const AuthSettings & settings)
|
||||
RequestSettings::RequestSettings(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const DB::Settings & settings,
|
||||
bool for_disk_s3,
|
||||
bool validate_settings,
|
||||
const std::string & disk_config_path)
|
||||
const std::string & config_prefix,
|
||||
bool validate_settings)
|
||||
: RequestSettings(
|
||||
config,
|
||||
settings,
|
||||
config_prefix,
|
||||
validate_settings,
|
||||
config_prefix == "s3" ? "" : "s3_", /* setting_name_prefix */
|
||||
config_prefix == "s3" ? "" : "s3", /* fallback_config_prefix */
|
||||
"") /* fallback_setting_name_prefix */
|
||||
{
|
||||
updateS3SettingsFromConfig(*this, config, settings, for_disk_s3, disk_config_path);
|
||||
}
|
||||
|
||||
RequestSettings::RequestSettings(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const DB::Settings & settings,
|
||||
const std::string & config_prefix,
|
||||
bool validate_settings,
|
||||
const std::string & setting_name_prefix,
|
||||
const std::string & fallback_config_prefix,
|
||||
const std::string & fallback_setting_name_prefix)
|
||||
{
|
||||
if (config_prefix.empty())
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Config path cannot be empty");
|
||||
|
||||
for (auto & field : allMutable())
|
||||
{
|
||||
auto path = fmt::format("{}.{}{}", config_prefix, setting_name_prefix, field.getName());
|
||||
auto fallback_path = fallback_config_prefix.empty() ? "" : fmt::format("{}.{}{}", fallback_config_prefix, fallback_setting_name_prefix, field.getName());
|
||||
|
||||
bool updated = setValueFromConfig<RequestSettings>(config, path, field);
|
||||
|
||||
if (!updated && !fallback_path.empty())
|
||||
updated = setValueFromConfig<RequestSettings>(config, fallback_path, field);
|
||||
|
||||
if (!updated)
|
||||
{
|
||||
auto setting_name = "s3_" + field.getName();
|
||||
if (settings.has(setting_name) && settings.isChanged(setting_name))
|
||||
field.setValue(settings.get(setting_name));
|
||||
}
|
||||
}
|
||||
finishInit(settings, validate_settings);
|
||||
}
|
||||
|
||||
|
@ -75,7 +75,8 @@ namespace S3
|
||||
{
|
||||
/// We use s3 settings for DiskS3, StorageS3 (StorageS3Cluster, S3Queue, etc), BackupIO_S3, etc.
|
||||
/// 1. For DiskS3 we usually have configuration in disk section in configuration file.
|
||||
/// All s3 related settings start with "s3_" prefix there.
|
||||
/// REQUEST_SETTINGS, PART_UPLOAD_SETTINGS start with "s3_" prefix there, while AUTH_SETTINGS and CLIENT_SETTINGS do not
|
||||
/// (does not make sense, but it happened this way).
|
||||
/// If some setting is absent from disk configuration, we look up for it in the "s3." server config section,
|
||||
/// where s3 settings no longer have "s3_" prefix like in disk configuration section.
|
||||
/// If the settings is absent there as well, we look up for it in Users config (where query/session settings are also updated).
|
||||
@ -143,8 +144,8 @@ struct AuthSettings : public BaseSettings<AuthSettingsTraits>
|
||||
AuthSettings(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const DB::Settings & settings,
|
||||
bool for_disk_s3,
|
||||
const std::string & disk_config_prefix = "");
|
||||
const std::string & config_prefix,
|
||||
const std::string & fallback_config_prefix = "");
|
||||
|
||||
AuthSettings(const DB::Settings & settings);
|
||||
|
||||
@ -168,9 +169,8 @@ struct RequestSettings : public BaseSettings<RequestSettingsTraits>
|
||||
RequestSettings(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const DB::Settings & settings,
|
||||
bool for_disk_s3,
|
||||
bool validate_settings = true,
|
||||
const std::string & disk_config_path = "");
|
||||
const std::string & config_prefix,
|
||||
bool validate_settings = true);
|
||||
|
||||
/// Create request settings from DB::Settings.
|
||||
explicit RequestSettings(const DB::Settings & settings, bool validate_settings = true);
|
||||
@ -190,6 +190,14 @@ struct RequestSettings : public BaseSettings<RequestSettingsTraits>
|
||||
std::shared_ptr<ProxyConfigurationResolver> proxy_resolver;
|
||||
|
||||
private:
|
||||
RequestSettings(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const DB::Settings & settings,
|
||||
const std::string & config_prefix,
|
||||
bool validate_settings,
|
||||
const std::string & setting_name_prefix,
|
||||
const std::string & fallback_config_prefix,
|
||||
const std::string & fallback_setting_name_prefix);
|
||||
void finishInit(const DB::Settings & settings, bool validate_settings);
|
||||
void normalizeSettings();
|
||||
};
|
||||
|
@ -28,8 +28,8 @@ void S3SettingsByEndpoint::loadFromConfig(
|
||||
if (config.has(endpoint_path))
|
||||
{
|
||||
auto endpoint = config.getString(endpoint_path);
|
||||
auto auth_settings = S3::AuthSettings(config, settings, /* for_disk_s3 */false, config_prefix);
|
||||
auto request_settings = S3::RequestSettings(config, settings, /* for_disk_s3 */false, settings.s3_validate_request_settings, config_prefix);
|
||||
auto auth_settings = S3::AuthSettings(config, settings, config_prefix);
|
||||
auto request_settings = S3::RequestSettings(config, settings, config_prefix, settings.s3_validate_request_settings);
|
||||
s3_settings.emplace(endpoint, S3Settings{std::move(auth_settings), std::move(request_settings)});
|
||||
}
|
||||
}
|
||||
|
@ -106,11 +106,9 @@ ObjectStoragePtr StorageS3Configuration::createObjectStorage(ContextPtr context,
|
||||
|
||||
const auto & config = context->getConfigRef();
|
||||
const auto & settings = context->getSettingsRef();
|
||||
const std::string config_prefix = "s3.";
|
||||
|
||||
auto s3_settings = getSettings(
|
||||
config, config_prefix, context, url.endpoint, /* for_disk_s3 */false,
|
||||
settings.s3_validate_request_settings);
|
||||
config, "s3"/* config_prefix */, context, url.endpoint, settings.s3_validate_request_settings);
|
||||
|
||||
s3_settings->auth_settings.updateIfChanged(auth_settings);
|
||||
s3_settings->request_settings.updateIfChanged(request_settings);
|
||||
|
Loading…
Reference in New Issue
Block a user