Fixes after merge with master, move some part of code to object storage

This commit is contained in:
kssenii 2024-02-19 10:45:56 +01:00
parent 1cccb3ccdf
commit 0552f44f70
32 changed files with 498 additions and 330 deletions

View File

@ -127,10 +127,10 @@ BackupReaderS3::BackupReaderS3(
: BackupReaderDefault(read_settings_, write_settings_, getLogger("BackupReaderS3"))
, s3_uri(s3_uri_)
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::S3, MetadataStorageType::None, s3_uri.endpoint, false, false}
, s3_settings(context_->getStorageS3Settings().getSettings(s3_uri.uri.toString()))
, s3_settings(context_->getStorageS3Settings().getSettings(s3_uri.uri.toString()).value_or(S3Settings{}))
{
auto & request_settings = s3_settings.request_settings;
request_settings.updateFromSettings(context_->getSettingsRef());
request_settings.updateFromSettingsIfChanged(context_->getSettingsRef());
request_settings.max_single_read_retries = context_->getSettingsRef().s3_max_single_read_retries; // FIXME: Avoid taking value for endpoint
request_settings.allow_native_copy = allow_s3_native_copy;
client = makeS3Client(s3_uri_, access_key_id_, secret_access_key_, s3_settings, context_);
@ -217,10 +217,10 @@ BackupWriterS3::BackupWriterS3(
: BackupWriterDefault(read_settings_, write_settings_, getLogger("BackupWriterS3"))
, s3_uri(s3_uri_)
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::S3, MetadataStorageType::None, s3_uri.endpoint, false, false}
, s3_settings(context_->getStorageS3Settings().getSettings(s3_uri.uri.toString()))
, s3_settings(context_->getStorageS3Settings().getSettings(s3_uri.uri.toString()).value_or(S3Settings{}))
{
auto & request_settings = s3_settings.request_settings;
request_settings.updateFromSettings(context_->getSettingsRef());
request_settings.updateFromSettingsIfChanged(context_->getSettingsRef());
request_settings.max_single_read_retries = context_->getSettingsRef().s3_max_single_read_retries; // FIXME: Avoid taking value for endpoint
request_settings.allow_native_copy = allow_s3_native_copy;
request_settings.setStorageClassName(storage_class_name);

View File

@ -193,8 +193,7 @@ public:
virtual void applyNewSettings(
const Poco::Util::AbstractConfiguration &,
const std::string & /*config_prefix*/,
ContextPtr)
{}
ContextPtr) {}
/// Sometimes object storages have something similar to chroot or namespace, for example
/// buckets in S3. If object storage doesn't have any namepaces return empty string.

View File

@ -126,7 +126,7 @@ void registerS3ObjectStorage(ObjectStorageFactory & factory)
auto uri = getS3URI(config, config_prefix, context);
auto s3_capabilities = getCapabilitiesFromConfig(config, config_prefix);
auto settings = getSettings(config, config_prefix, context);
auto client = getClient(config, config_prefix, context, *settings);
auto client = getClient(config, config_prefix, context, *settings, true);
auto key_generator = getKeyGenerator(disk_type, uri, config, config_prefix);
auto object_storage = std::make_shared<S3ObjectStorage>(
@ -162,7 +162,7 @@ void registerS3PlainObjectStorage(ObjectStorageFactory & factory)
auto uri = getS3URI(config, config_prefix, context);
auto s3_capabilities = getCapabilitiesFromConfig(config, config_prefix);
auto settings = getSettings(config, config_prefix, context);
auto client = getClient(config, config_prefix, context, *settings);
auto client = getClient(config, config_prefix, context, *settings, true);
auto key_generator = getKeyGenerator(disk_type, uri, config, config_prefix);
auto object_storage = std::make_shared<S3PlainObjectStorage>(

View File

@ -242,7 +242,12 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
if (mode != WriteMode::Rewrite)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "S3 doesn't support append to files");
auto settings_ptr = s3_settings.get();
S3Settings::RequestSettings request_settings = s3_settings.get()->request_settings;
if (auto query_context = CurrentThread::getQueryContext())
{
request_settings.updateFromSettingsIfChanged(query_context->getSettingsRef());
}
ThreadPoolCallbackRunner<void> scheduler;
if (write_settings.s3_allow_parallel_part_upload)
scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "VFSWrite");
@ -256,7 +261,7 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
uri.bucket,
object.remote_path,
buf_size,
settings_ptr->request_settings,
request_settings,
std::move(blob_storage_log),
attributes,
std::move(scheduler),
@ -534,19 +539,57 @@ void S3ObjectStorage::startup()
const_cast<S3::Client &>(*client.get()).EnableRequestProcessing();
}
void S3ObjectStorage::applyNewSettings(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context)
void S3ObjectStorage::applyNewSettings(
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
ContextPtr context)
{
auto new_s3_settings = getSettings(config, config_prefix, context);
auto new_client = getClient(config, config_prefix, context, *new_s3_settings);
if (!static_headers.empty())
{
new_s3_settings->auth_settings.headers.insert(
new_s3_settings->auth_settings.headers.end(),
static_headers.begin(), static_headers.end());
}
if (auto endpoint_settings = context->getStorageS3Settings().getSettings(uri.uri.toString()))
new_s3_settings->auth_settings.updateFrom(endpoint_settings->auth_settings);
auto current_s3_settings = s3_settings.get();
if (current_s3_settings->auth_settings.hasUpdates(new_s3_settings->auth_settings) || for_disk_s3)
{
auto new_client = getClient(config, config_prefix, context, *new_s3_settings, for_disk_s3, &uri);
client.set(std::move(new_client));
}
s3_settings.set(std::move(new_s3_settings));
client.set(std::move(new_client));
}
// void S3ObjectStorage::applyNewSettings(ContextPtr context)
// {
// auto settings = s3_settings.get();
// if (!endpoint_settings || !settings->auth_settings.hasUpdates(endpoint_settings->auth_settings))
// return;
//
// const auto & config = context->getConfigRef();
// auto new_s3_settings = getSettings(uri, config, "s3.", context);
//
// new_s3_settings->auth_settings.updateFrom(endpoint_settings->auth_settings);
//
// auto new_client = getClient(config, "s3.", context, *new_s3_settings, false);
//
// s3_settings.set(std::move(new_s3_settings));
// client.set(std::move(new_client));
// }
std::unique_ptr<IObjectStorage> S3ObjectStorage::cloneObjectStorage(
const std::string & new_namespace, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context)
const std::string & new_namespace,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
ContextPtr context)
{
auto new_s3_settings = getSettings(config, config_prefix, context);
auto new_client = getClient(config, config_prefix, context, *new_s3_settings);
auto new_client = getClient(config, config_prefix, context, *new_s3_settings, true);
String endpoint = context->getMacros()->expand(config.getString(config_prefix + ".endpoint"));
auto new_uri{uri};

View File

@ -21,11 +21,13 @@ struct S3ObjectStorageSettings
S3ObjectStorageSettings(
const S3Settings::RequestSettings & request_settings_,
const S3::AuthSettings & auth_settings_,
uint64_t min_bytes_for_seek_,
int32_t list_object_keys_size_,
int32_t objects_chunk_size_to_delete_,
bool read_only_)
: request_settings(request_settings_)
, auth_settings(auth_settings_)
, min_bytes_for_seek(min_bytes_for_seek_)
, list_object_keys_size(list_object_keys_size_)
, objects_chunk_size_to_delete(objects_chunk_size_to_delete_)
@ -33,6 +35,7 @@ struct S3ObjectStorageSettings
{}
S3Settings::RequestSettings request_settings;
S3::AuthSettings auth_settings;
uint64_t min_bytes_for_seek;
int32_t list_object_keys_size;
@ -52,7 +55,9 @@ private:
S3::URI uri_,
const S3Capabilities & s3_capabilities_,
ObjectStorageKeysGeneratorPtr key_generator_,
const String & disk_name_)
const String & disk_name_,
bool for_disk_s3_ = true,
const HTTPHeaderEntries & static_headers_ = {})
: uri(uri_)
, key_generator(std::move(key_generator_))
, disk_name(disk_name_)
@ -60,6 +65,8 @@ private:
, s3_settings(std::move(s3_settings_))
, s3_capabilities(s3_capabilities_)
, log(getLogger(logger_name))
, for_disk_s3(for_disk_s3_)
, static_headers(static_headers_)
{
}
@ -180,6 +187,9 @@ private:
S3Capabilities s3_capabilities;
LoggerPtr log;
const bool for_disk_s3;
const HTTPHeaderEntries static_headers;
};
/// Do not encode keys, store as-is, and do not require separate disk for metadata.

View File

@ -10,8 +10,6 @@
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include "Disks/DiskFactory.h"
#include <aws/core/client/DefaultRetryStrategy.h>
#include <base/getFQDNOrHostName.h>
#include <IO/S3Common.h>
@ -25,13 +23,19 @@
namespace DB
{
std::unique_ptr<S3ObjectStorageSettings> getSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr context)
std::unique_ptr<S3ObjectStorageSettings> getSettings(
const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr context)
{
const Settings & settings = context->getSettingsRef();
S3Settings::RequestSettings request_settings(config, config_prefix, settings, "s3_");
/// TODO: add request settings prefix, becausse for StorageS3 it should be "s3."
S3::AuthSettings auth_settings;
auth_settings.loadFromConfig(config_prefix, config);
return std::make_unique<S3ObjectStorageSettings>(
request_settings,
auth_settings,
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
config.getInt(config_prefix + ".list_object_keys_size", 1000),
config.getInt(config_prefix + ".objects_chunk_size_to_delete", 1000),
@ -42,78 +46,92 @@ std::unique_ptr<S3::Client> getClient(
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
const S3ObjectStorageSettings & settings)
const S3ObjectStorageSettings & settings,
bool for_disk_s3,
const S3::URI * url_)
{
const Settings & global_settings = context->getGlobalContext()->getSettingsRef();
const Settings & local_settings = context->getSettingsRef();
String endpoint = context->getMacros()->expand(config.getString(config_prefix + ".endpoint"));
S3::URI uri(endpoint);
if (!uri.key.ends_with('/'))
uri.key.push_back('/');
const auto & auth_settings = settings.auth_settings;
const auto & request_settings = settings.request_settings;
S3::URI url;
if (for_disk_s3)
{
String endpoint = context->getMacros()->expand(config.getString(config_prefix + ".endpoint"));
S3::URI uri(endpoint);
if (!uri.key.ends_with('/'))
uri.key.push_back('/');
}
else
{
if (!url_)
throw Exception(ErrorCodes::LOGICAL_ERROR, "URL not passed");
url = *url_;
}
S3::PocoHTTPClientConfiguration client_configuration = S3::ClientFactory::instance().createClientConfiguration(
config.getString(config_prefix + ".region", ""),
auth_settings.region,
context->getRemoteHostFilter(),
static_cast<int>(global_settings.s3_max_redirects),
static_cast<int>(global_settings.s3_retry_attempts),
global_settings.enable_s3_requests_logging,
/* for_disk_s3 = */ true,
for_disk_s3,
settings.request_settings.get_request_throttler,
settings.request_settings.put_request_throttler,
uri.uri.getScheme());
url.uri.getScheme());
client_configuration.endpointOverride = url.endpoint;
client_configuration.maxConnections = static_cast<unsigned>(request_settings.max_connections);
client_configuration.connectTimeoutMs = config.getUInt(config_prefix + ".connect_timeout_ms", S3::DEFAULT_CONNECT_TIMEOUT_MS);
client_configuration.requestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", S3::DEFAULT_REQUEST_TIMEOUT_MS);
client_configuration.maxConnections = config.getUInt(config_prefix + ".max_connections", S3::DEFAULT_MAX_CONNECTIONS);
client_configuration.endpointOverride = uri.endpoint;
client_configuration.http_keep_alive_timeout_ms = config.getUInt(
config_prefix + ".http_keep_alive_timeout_ms", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT * 1000);
client_configuration.http_connection_pool_size = config.getUInt(config_prefix + ".http_connection_pool_size", 1000);
client_configuration.wait_on_pool_size_limit = false;
client_configuration.s3_use_adaptive_timeouts = config.getBool(
config_prefix + ".use_adaptive_timeouts", client_configuration.s3_use_adaptive_timeouts);
/*
* Override proxy configuration for backwards compatibility with old configuration format.
* */
auto proxy_config = DB::ProxyConfigurationResolverProvider::getFromOldSettingsFormat(
ProxyConfiguration::protocolFromString(uri.uri.getScheme()),
config_prefix,
config
);
if (proxy_config)
client_configuration.http_keep_alive_timeout_ms = config.getUInt(config_prefix + ".http_keep_alive_timeout_ms", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT * 1000);
client_configuration.http_connection_pool_size = config.getUInt(
config_prefix + ".http_connection_pool_size", static_cast<UInt32>(global_settings.s3_http_connection_pool_size.value));
client_configuration.s3_use_adaptive_timeouts = config.getBool(config_prefix + ".use_adaptive_timeouts", client_configuration.s3_use_adaptive_timeouts);
client_configuration.wait_on_pool_size_limit = for_disk_s3;
if (for_disk_s3)
{
client_configuration.per_request_configuration
= [proxy_config]() { return proxy_config->resolve(); };
client_configuration.error_report
= [proxy_config](const auto & request_config) { proxy_config->errorReport(request_config); };
/*
* Override proxy configuration for backwards compatibility with old configuration format.
* */
if (auto proxy_config = DB::ProxyConfigurationResolverProvider::getFromOldSettingsFormat(
ProxyConfiguration::protocolFromString(url.uri.getScheme()), config_prefix, config))
{
client_configuration.per_request_configuration
= [proxy_config]() { return proxy_config->resolve(); };
client_configuration.error_report
= [proxy_config](const auto & request_config) { proxy_config->errorReport(request_config); };
}
}
HTTPHeaderEntries headers = S3::getHTTPHeaders(config_prefix, config);
S3::ServerSideEncryptionKMSConfig sse_kms_config = S3::getSSEKMSConfig(config_prefix, config);
S3::ClientSettings client_settings{
.use_virtual_addressing = uri.is_virtual_hosted_style,
.use_virtual_addressing = url.is_virtual_hosted_style,
.disable_checksum = local_settings.s3_disable_checksum,
.gcs_issue_compose_request = config.getBool("s3.gcs_issue_compose_request", false),
};
auto credentials_configuration = S3::CredentialsConfiguration
{
auth_settings.use_environment_credentials.value_or(context->getConfigRef().getBool("s3.use_environment_credentials", true)),
auth_settings.use_insecure_imds_request.value_or(context->getConfigRef().getBool("s3.use_insecure_imds_request", false)),
auth_settings.expiration_window_seconds.value_or(context->getConfigRef().getUInt64("s3.expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS)),
auth_settings.no_sign_request.value_or(context->getConfigRef().getBool("s3.no_sign_request", false)),
};
return S3::ClientFactory::instance().create(
client_configuration,
client_settings,
config.getString(config_prefix + ".access_key_id", ""),
config.getString(config_prefix + ".secret_access_key", ""),
config.getString(config_prefix + ".server_side_encryption_customer_key_base64", ""),
auth_settings.access_key_id,
auth_settings.secret_access_key,
auth_settings.server_side_encryption_customer_key_base64,
std::move(sse_kms_config),
std::move(headers),
S3::CredentialsConfiguration
{
config.getBool(config_prefix + ".use_environment_credentials", config.getBool("s3.use_environment_credentials", true)),
config.getBool(config_prefix + ".use_insecure_imds_request", config.getBool("s3.use_insecure_imds_request", false)),
config.getUInt64(config_prefix + ".expiration_window_seconds", config.getUInt64("s3.expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS)),
config.getBool(config_prefix + ".no_sign_request", config.getBool("s3.no_sign_request", false))
});
auth_settings.headers,
credentials_configuration);
}
}

View File

@ -22,9 +22,18 @@ namespace DB
struct S3ObjectStorageSettings;
std::unique_ptr<S3ObjectStorageSettings> getSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr context);
std::unique_ptr<S3ObjectStorageSettings> getSettings(
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context);
std::unique_ptr<S3::Client> getClient(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr context, const S3ObjectStorageSettings & settings);
std::unique_ptr<S3::Client> getClient(
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
const S3ObjectStorageSettings & settings,
bool for_disk_s3,
const S3::URI * url_ = nullptr);
}

View File

@ -157,8 +157,11 @@ void AuthSettings::updateFrom(const AuthSettings & from)
if (!from.session_token.empty())
session_token = from.session_token;
headers = from.headers;
region = from.region;
if (!from.headers.empty())
headers = from.headers;
if (!from.region.empty())
region = from.region;
server_side_encryption_customer_key_base64 = from.server_side_encryption_customer_key_base64;
server_side_encryption_kms_config = from.server_side_encryption_kms_config;

View File

@ -1,5 +1,6 @@
#include <Storages/Cache/SchemaCache.h>
#include <Common/ProfileEvents.h>
#include <Common/logger_useful.h>
#include <ctime>
namespace ProfileEvents
@ -109,6 +110,7 @@ std::optional<SchemaCache::SchemaInfo> SchemaCache::tryGetImpl(const Key & key,
}
ProfileEvents::increment(ProfileEvents::SchemaInferenceCacheHits);
LOG_TEST(&Poco::Logger::get("kssenii"), "KSSENII: {}", StackTrace().toString());
auto & schema_info = it->second.schema_info;
auto & queue_iterator = it->second.iterator;

View File

@ -102,7 +102,7 @@ AzureObjectStorage::SettingsPtr StorageAzureBlobConfiguration::createSettings(Co
return settings_ptr;
}
ObjectStoragePtr StorageAzureBlobConfiguration::createOrUpdateObjectStorage(ContextPtr context, bool is_readonly) /// NOLINT
ObjectStoragePtr StorageAzureBlobConfiguration::createObjectStorage(ContextPtr context, bool is_readonly) /// NOLINT
{
auto client = createClient(is_readonly);
auto settings = createSettings(context);
@ -245,8 +245,6 @@ void StorageAzureBlobConfiguration::fromNamedCollection(const NamedCollection &
compression_method = collection.getOrDefault<String>("compression_method", collection.getOrDefault<String>("compression", "auto"));
blobs_paths = {blob_path};
if (format == "auto")
format = FormatFactory::instance().getFormatFromFileName(blob_path, true);
}
void StorageAzureBlobConfiguration::fromAST(ASTs & engine_args, ContextPtr context, bool with_structure)
@ -367,9 +365,6 @@ void StorageAzureBlobConfiguration::fromAST(ASTs & engine_args, ContextPtr conte
}
blobs_paths = {blob_path};
if (format == "auto")
format = FormatFactory::instance().getFormatFromFileName(blob_path, true);
}
void StorageAzureBlobConfiguration::addStructureToArgs(ASTs & args, const String & structure_, ContextPtr context)

View File

@ -31,7 +31,7 @@ public:
String getNamespace() const override { return container; }
void check(ContextPtr context) const override;
ObjectStoragePtr createOrUpdateObjectStorage(ContextPtr context, bool is_readonly = true) override; /// NOLINT
ObjectStoragePtr createObjectStorage(ContextPtr context, bool is_readonly = true) override; /// NOLINT
StorageObjectStorageConfigurationPtr clone() override { return std::make_shared<StorageAzureBlobConfiguration>(*this); }
void fromNamedCollection(const NamedCollection & collection) override;

View File

@ -39,7 +39,7 @@ public:
std::optional<FormatSettings> format_settings_,
bool attach)
{
auto object_storage = base_configuration->createOrUpdateObjectStorage(context);
auto object_storage = base_configuration->createObjectStorage(context);
DataLakeMetadataPtr metadata;
NamesAndTypesList schema_from_metadata;
ConfigurationPtr configuration = base_configuration->clone();
@ -75,28 +75,22 @@ public:
return ColumnsDescription(metadata->getTableSchema());
}
std::pair<ConfigurationPtr, ObjectStoragePtr> updateConfigurationAndGetCopy(ContextPtr local_context) override
void updateConfiguration(ContextPtr local_context) override
{
std::lock_guard lock(Storage::configuration_update_mutex);
auto new_object_storage = base_configuration->createOrUpdateObjectStorage(local_context);
bool updated = new_object_storage != nullptr;
if (updated)
Storage::object_storage = new_object_storage;
Storage::updateConfiguration(local_context);
auto new_metadata = DataLakeMetadata::create(Storage::object_storage, base_configuration, local_context);
if (!current_metadata || !(*current_metadata == *new_metadata))
current_metadata = std::move(new_metadata);
else if (!updated)
return {Storage::configuration, Storage::object_storage};
if (current_metadata && *current_metadata == *new_metadata)
return;
current_metadata = std::move(new_metadata);
auto updated_configuration = base_configuration->clone();
/// If metadata wasn't changed, we won't list data files again.
updated_configuration->getPaths() = current_metadata->getDataFiles();
Storage::configuration = updated_configuration;
return {Storage::configuration, Storage::object_storage};
}
template <typename... Args>

View File

@ -27,7 +27,7 @@ void StorageHDFSConfiguration::check(ContextPtr context) const
checkHDFSURL(url);
}
ObjectStoragePtr StorageHDFSConfiguration::createOrUpdateObjectStorage(ContextPtr context, bool is_readonly) /// NOLINT
ObjectStoragePtr StorageHDFSConfiguration::createObjectStorage(ContextPtr context, bool is_readonly) /// NOLINT
{
UNUSED(is_readonly);
auto settings = std::make_unique<HDFSObjectStorageSettings>();
@ -42,16 +42,13 @@ void StorageHDFSConfiguration::fromAST(ASTs & args, ContextPtr, bool /* with_str
if (args.size() > 1)
format_name = checkAndGetLiteralArgument<String>(args[1], "format_name");
if (format_name == "auto")
format_name = FormatFactory::instance().getFormatFromFileName(url, true);
String compression_method;
if (args.size() == 3)
compression_method = checkAndGetLiteralArgument<String>(args[2], "compression_method");
else
compression_method = "auto";
}
}
#endif

View File

@ -26,7 +26,7 @@ public:
String getDataSourceDescription() override { return url; }
void check(ContextPtr context) const override;
ObjectStoragePtr createOrUpdateObjectStorage(ContextPtr context, bool is_readonly = true) override; /// NOLINT
ObjectStoragePtr createObjectStorage(ContextPtr context, bool is_readonly = true) override; /// NOLINT
StorageObjectStorageConfigurationPtr clone() override { return std::make_shared<StorageHDFSConfiguration>(*this); }
void fromNamedCollection(const NamedCollection &) override {}

View File

@ -10,6 +10,7 @@ namespace DB
namespace ErrorCodes
{
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
extern const int CANNOT_DETECT_FORMAT;
}
@ -30,14 +31,15 @@ ReadBufferIterator::ReadBufferIterator(
, query_settings(query_settings_)
, schema_cache(schema_cache_)
, read_keys(read_keys_)
, format(configuration->format.empty() || configuration->format == "auto" ? std::nullopt : std::optional<String>(configuration->format))
, prev_read_keys_size(read_keys_.size())
{
}
SchemaCache::Key ReadBufferIterator::getKeyForSchemaCache(const String & path) const
SchemaCache::Key ReadBufferIterator::getKeyForSchemaCache(const String & path, const String & format_name) const
{
auto source = fs::path(configuration->getDataSourceDescription()) / path;
return DB::getKeyForSchemaCache(source, configuration->format, format_settings, getContext());
return DB::getKeyForSchemaCache(source, format_name, format_settings, getContext());
}
SchemaCache::Keys ReadBufferIterator::getPathsForSchemaCache() const
@ -51,7 +53,7 @@ SchemaCache::Keys ReadBufferIterator::getPathsForSchemaCache() const
{
return fs::path(configuration->getDataSourceDescription()) / elem->relative_path;
});
return DB::getKeysForSchemaCache(sources, configuration->format, format_settings, getContext());
return DB::getKeysForSchemaCache(sources, *format, format_settings, getContext());
}
std::optional<ColumnsDescription> ReadBufferIterator::tryGetColumnsFromCache(
@ -75,10 +77,29 @@ std::optional<ColumnsDescription> ReadBufferIterator::tryGetColumnsFromCache(
}
};
auto cache_key = getKeyForSchemaCache(object_info->relative_path);
auto columns = schema_cache.tryGetColumns(cache_key, get_last_mod_time);
if (columns)
return columns;
chassert(object_info);
if (format)
{
auto cache_key = getKeyForSchemaCache(object_info->relative_path, *format);
if (auto columns = schema_cache.tryGetColumns(cache_key, get_last_mod_time))
return columns;
}
else
{
/// If format is unknown, we can iterate through all possible input formats
/// and check if we have an entry with this format and this file in schema cache.
/// If we have such entry for some format, we can use this format to read the file.
for (const auto & format_name : FormatFactory::instance().getAllInputFormats())
{
auto cache_key = getKeyForSchemaCache(object_info->relative_path, format_name);
if (auto columns = schema_cache.tryGetColumns(cache_key, get_last_mod_time))
{
/// Now format is known. It should be the same for all files.
format = format_name;
return columns;
}
}
}
}
return std::nullopt;
@ -86,16 +107,18 @@ std::optional<ColumnsDescription> ReadBufferIterator::tryGetColumnsFromCache(
void ReadBufferIterator::setNumRowsToLastFile(size_t num_rows)
{
chassert(current_object_info);
if (query_settings.schema_inference_use_cache)
schema_cache.addNumRows(getKeyForSchemaCache(current_object_info->relative_path), num_rows);
schema_cache.addNumRows(getKeyForSchemaCache(current_object_info->relative_path, *format), num_rows);
}
void ReadBufferIterator::setSchemaToLastFile(const ColumnsDescription & columns)
{
chassert(current_object_info);
if (query_settings.schema_inference_use_cache
&& query_settings.schema_inference_mode == SchemaInferenceMode::UNION)
{
schema_cache.addColumns(getKeyForSchemaCache(current_object_info->relative_path), columns);
schema_cache.addColumns(getKeyForSchemaCache(current_object_info->relative_path, *format), columns);
}
}
@ -108,6 +131,11 @@ void ReadBufferIterator::setResultingSchema(const ColumnsDescription & columns)
}
}
void ReadBufferIterator::setFormatName(const String & format_name)
{
format = format_name;
}
String ReadBufferIterator::getLastFileName() const
{
if (current_object_info)
@ -116,64 +144,128 @@ String ReadBufferIterator::getLastFileName() const
return "";
}
std::pair<std::unique_ptr<ReadBuffer>, std::optional<ColumnsDescription>> ReadBufferIterator::next()
std::unique_ptr<ReadBuffer> ReadBufferIterator::recreateLastReadBuffer()
{
/// For default mode check cached columns for currently read keys on first iteration.
if (first && query_settings.schema_inference_mode == SchemaInferenceMode::DEFAULT)
{
if (auto cached_columns = tryGetColumnsFromCache(read_keys.begin(), read_keys.end()))
return {nullptr, cached_columns};
}
chassert(current_object_info);
current_object_info = file_iterator->next(0);
if (!current_object_info || current_object_info->relative_path.empty())
auto impl = object_storage->readObject(
StoredObject(current_object_info->relative_path), getContext()->getReadSettings());
int zstd_window_log_max = static_cast<int>(getContext()->getSettingsRef().zstd_window_log_max);
return wrapReadBufferWithCompressionMethod(
std::move(impl), chooseCompressionMethod(current_object_info->relative_path, configuration->compression_method),
zstd_window_log_max);
}
ReadBufferIterator::Data ReadBufferIterator::next()
{
if (first)
{
if (first)
/// If format is unknown we iterate through all currently read keys on first iteration and
/// try to determine format by file name.
if (!format)
{
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"Cannot extract table structure from {} format file, "
"because there are no files with provided path. "
"You must specify table structure manually",
configuration->format);
for (const auto & object_info : read_keys)
{
if (auto format_from_file_name = FormatFactory::instance().tryGetFormatFromFileName(object_info->relative_path))
{
format = format_from_file_name;
break;
}
}
}
/// For default mode check cached columns for currently read keys on first iteration.
if (first && getContext()->getSettingsRef().schema_inference_mode == SchemaInferenceMode::DEFAULT)
{
if (auto cached_columns = tryGetColumnsFromCache(read_keys.begin(), read_keys.end()))
return {nullptr, cached_columns, format};
}
return {nullptr, std::nullopt};
}
first = false;
/// File iterator could get new keys after new iteration,
/// check them in schema cache if schema inference mode is default.
if (getContext()->getSettingsRef().schema_inference_mode == SchemaInferenceMode::DEFAULT
&& read_keys.size() > prev_read_keys_size)
while (true)
{
auto columns_from_cache = tryGetColumnsFromCache(read_keys.begin() + prev_read_keys_size, read_keys.end());
prev_read_keys_size = read_keys.size();
if (columns_from_cache)
return {nullptr, columns_from_cache};
current_object_info = file_iterator->next(0);
if (!current_object_info || current_object_info->relative_path.empty())
{
if (first)
{
if (format)
throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"The table structure cannot be extracted from a {} format file, because there are no files with provided path "
"in S3 or all files are empty. You can specify table structure manually",
*format);
throw Exception(
ErrorCodes::CANNOT_DETECT_FORMAT,
"The data format cannot be detected by the contents of the files, because there are no files with provided path "
"in S3 or all files are empty. You can specify the format manually");
}
return {nullptr, std::nullopt, format};
}
/// S3 file iterator could get new keys after new iteration
if (read_keys.size() > prev_read_keys_size)
{
/// If format is unknown we can try to determine it by new file names.
if (!format)
{
for (auto it = read_keys.begin() + prev_read_keys_size; it != read_keys.end(); ++it)
{
if (auto format_from_file_name = FormatFactory::instance().tryGetFormatFromFileName((*it)->relative_path))
{
format = format_from_file_name;
break;
}
}
}
/// Check new files in schema cache if schema inference mode is default.
if (getContext()->getSettingsRef().schema_inference_mode == SchemaInferenceMode::DEFAULT)
{
auto columns_from_cache = tryGetColumnsFromCache(read_keys.begin() + prev_read_keys_size, read_keys.end());
if (columns_from_cache)
return {nullptr, columns_from_cache, format};
}
prev_read_keys_size = read_keys.size();
}
if (getContext()->getSettingsRef().s3_skip_empty_files
&& current_object_info->metadata && current_object_info->metadata->size_bytes == 0)
continue;
/// In union mode, check cached columns only for current key.
if (getContext()->getSettingsRef().schema_inference_mode == SchemaInferenceMode::UNION)
{
ObjectInfos objects{current_object_info};
if (auto columns_from_cache = tryGetColumnsFromCache(objects.begin(), objects.end()))
{
first = false;
return {nullptr, columns_from_cache, format};
}
}
std::unique_ptr<ReadBuffer> read_buffer = object_storage->readObject(
StoredObject(current_object_info->relative_path),
getContext()->getReadSettings(),
{},
current_object_info->metadata->size_bytes);
if (!getContext()->getSettingsRef().s3_skip_empty_files || !read_buffer->eof())
{
first = false;
read_buffer = wrapReadBufferWithCompressionMethod(
std::move(read_buffer),
chooseCompressionMethod(current_object_info->relative_path, configuration->compression_method),
static_cast<int>(getContext()->getSettingsRef().zstd_window_log_max));
return {std::move(read_buffer), std::nullopt, format};
}
}
else if (getContext()->getSettingsRef().schema_inference_mode == SchemaInferenceMode::UNION)
{
ObjectInfos paths = {current_object_info};
if (auto columns_from_cache = tryGetColumnsFromCache(paths.begin(), paths.end()))
return {nullptr, columns_from_cache};
}
first = false;
chassert(current_object_info->metadata);
std::unique_ptr<ReadBuffer> read_buffer = object_storage->readObject(
StoredObject(current_object_info->relative_path),
getContext()->getReadSettings(),
{},
current_object_info->metadata->size_bytes);
read_buffer = wrapReadBufferWithCompressionMethod(
std::move(read_buffer),
chooseCompressionMethod(current_object_info->relative_path, configuration->compression_method),
static_cast<int>(getContext()->getSettingsRef().zstd_window_log_max));
return {std::move(read_buffer), std::nullopt};
}
}

View File

@ -2,6 +2,7 @@
#include <Interpreters/Context_fwd.h>
#include <Storages/ObjectStorage/StorageObjectStorage_fwd_internal.h>
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
#include <Storages/ObjectStorage/StorageObjectStorageQuerySettings.h>
#include <Formats/ReadSchemaUtils.h>
@ -23,7 +24,7 @@ public:
ObjectInfos & read_keys_,
const ContextPtr & context_);
std::pair<std::unique_ptr<ReadBuffer>, std::optional<ColumnsDescription>> next() override;
Data next() override;
void setNumRowsToLastFile(size_t num_rows) override;
@ -33,8 +34,14 @@ public:
String getLastFileName() const override;
void setFormatName(const String & format_name) override;
bool supportsLastReadBufferRecreation() const override { return true; }
std::unique_ptr<ReadBuffer> recreateLastReadBuffer() override;
private:
SchemaCache::Key getKeyForSchemaCache(const String & path) const;
SchemaCache::Key getKeyForSchemaCache(const String & path, const String & format_name) const;
SchemaCache::Keys getPathsForSchemaCache() const;
std::optional<ColumnsDescription> tryGetColumnsFromCache(
const ObjectInfos::iterator & begin, const ObjectInfos::iterator & end);
@ -46,6 +53,7 @@ private:
const StorageObjectStorageSettings query_settings;
SchemaCache & schema_cache;
ObjectInfos & read_keys;
std::optional<String> format;
size_t prev_read_keys_size;
ObjectInfoPtr current_object_info;

View File

@ -7,6 +7,7 @@
#include <Formats/FormatFactory.h>
#include <boost/algorithm/string.hpp>
#include <Disks/ObjectStorages/S3/S3ObjectStorage.h>
#include <Disks/ObjectStorages/S3/diskSettings.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
@ -58,106 +59,47 @@ void StorageS3Configuration::check(ContextPtr context) const
StorageS3Configuration::StorageS3Configuration(const StorageS3Configuration & other)
{
url = other.url;
auth_settings = other.auth_settings;
request_settings = other.request_settings;
static_configuration = other.static_configuration;
headers_from_ast = other.headers_from_ast;
keys = other.keys;
initialized = other.initialized;
format = other.format;
compression_method = other.compression_method;
structure = other.structure;
}
ObjectStoragePtr StorageS3Configuration::createOrUpdateObjectStorage(ContextPtr context, bool /* is_readonly */) /// NOLINT
ObjectStoragePtr StorageS3Configuration::createObjectStorage(ContextPtr context, bool /* is_readonly */) /// NOLINT
{
auto s3_settings = context->getStorageS3Settings().getSettings(url.uri.toString());
request_settings = s3_settings.request_settings;
request_settings.updateFromSettings(context->getSettings());
const auto & config = context->getConfigRef();
const std::string config_prefix = "s3.";
if (!initialized || (!static_configuration && auth_settings.hasUpdates(s3_settings.auth_settings)))
auto s3_settings = getSettings(config, config_prefix, context);
auth_settings.updateFrom(s3_settings->auth_settings);
s3_settings->auth_settings = auth_settings;
s3_settings->request_settings = request_settings;
if (!headers_from_ast.empty())
{
auth_settings.updateFrom(s3_settings.auth_settings);
keys[0] = url.key;
initialized = true;
s3_settings->auth_settings.headers.insert(
s3_settings->auth_settings.headers.end(),
headers_from_ast.begin(), headers_from_ast.end());
}
const auto & config = context->getConfigRef();
if (auto endpoint_settings = context->getStorageS3Settings().getSettings(url.uri.toString()))
s3_settings->auth_settings.updateFrom(endpoint_settings->auth_settings);
auto client = getClient(config, config_prefix, context, *s3_settings, false, &url);
auto key_generator = createObjectStorageKeysGeneratorAsIsWithPrefix(url.key);
auto s3_capabilities = S3Capabilities
{
.support_batch_delete = config.getBool("s3.support_batch_delete", true),
.support_proxy = config.getBool("s3.support_proxy", config.has("s3.proxy")),
};
auto s3_storage_settings = std::make_unique<S3ObjectStorageSettings>(
request_settings,
config.getUInt64("s3.min_bytes_for_seek", 1024 * 1024),
config.getInt("s3.list_object_keys_size", 1000),
config.getInt("s3.objects_chunk_size_to_delete", 1000),
config.getBool("s3.readonly", false));
auto key_generator = createObjectStorageKeysGeneratorAsIsWithPrefix(url.key);
auto client = createClient(context);
std::string disk_name = "StorageS3";
return std::make_shared<S3ObjectStorage>(
std::move(client), std::move(s3_storage_settings), url, s3_capabilities, key_generator, /*disk_name*/disk_name);
}
std::unique_ptr<S3::Client> StorageS3Configuration::createClient(ContextPtr context)
{
const Settings & global_settings = context->getGlobalContext()->getSettingsRef();
const Settings & local_settings = context->getSettingsRef();
auto client_configuration = S3::ClientFactory::instance().createClientConfiguration(
auth_settings.region,
context->getRemoteHostFilter(),
static_cast<unsigned>(global_settings.s3_max_redirects),
static_cast<unsigned>(global_settings.s3_retry_attempts),
global_settings.enable_s3_requests_logging,
/* for_disk_s3 = */ false,
request_settings.get_request_throttler,
request_settings.put_request_throttler,
url.uri.getScheme());
client_configuration.endpointOverride = url.endpoint;
client_configuration.maxConnections = static_cast<unsigned>(request_settings.max_connections);
client_configuration.http_connection_pool_size = global_settings.s3_http_connection_pool_size;
auto headers = auth_settings.headers;
if (!headers_from_ast.empty())
headers.insert(headers.end(), headers_from_ast.begin(), headers_from_ast.end());
client_configuration.requestTimeoutMs = request_settings.request_timeout_ms;
S3::ClientSettings client_settings{
.use_virtual_addressing = url.is_virtual_hosted_style,
.disable_checksum = local_settings.s3_disable_checksum,
.gcs_issue_compose_request = context->getConfigRef().getBool("s3.gcs_issue_compose_request", false),
};
auto credentials = Aws::Auth::AWSCredentials(auth_settings.access_key_id,
auth_settings.secret_access_key,
auth_settings.session_token);
auto credentials_configuration = S3::CredentialsConfiguration
{
auth_settings.use_environment_credentials.value_or(context->getConfigRef().getBool("s3.use_environment_credentials", true)),
auth_settings.use_insecure_imds_request.value_or(context->getConfigRef().getBool("s3.use_insecure_imds_request", false)),
auth_settings.expiration_window_seconds.value_or(context->getConfigRef().getUInt64("s3.expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS)),
auth_settings.no_sign_request.value_or(context->getConfigRef().getBool("s3.no_sign_request", false)),
};
return S3::ClientFactory::instance().create(
client_configuration,
client_settings,
credentials.GetAWSAccessKeyId(),
credentials.GetAWSSecretKey(),
auth_settings.server_side_encryption_customer_key_base64,
auth_settings.server_side_encryption_kms_config,
std::move(headers),
credentials_configuration);
std::move(client), std::move(s3_settings), url, s3_capabilities,
key_generator, "StorageS3", false, headers_from_ast);
}
void StorageS3Configuration::fromNamedCollection(const NamedCollection & collection)
@ -185,10 +127,6 @@ void StorageS3Configuration::fromNamedCollection(const NamedCollection & collect
static_configuration = !auth_settings.access_key_id.empty() || auth_settings.no_sign_request.has_value();
keys = {url.key};
//if (format == "auto" && get_format_from_file)
if (format == "auto")
format = FormatFactory::instance().getFormatFromFileName(url.key, true);
}
void StorageS3Configuration::fromAST(ASTs & args, ContextPtr context, bool with_structure)
@ -386,10 +324,6 @@ void StorageS3Configuration::fromAST(ASTs & args, ContextPtr context, bool with_
auth_settings.no_sign_request = no_sign_request;
keys = {url.key};
// if (format == "auto" && get_format_from_file)
if (format == "auto")
format = FormatFactory::instance().getFormatFromFileName(url.key, true);
}
void StorageS3Configuration::addStructureToArgs(ASTs & args, const String & structure_, ContextPtr context)

View File

@ -27,27 +27,25 @@ public:
String getDataSourceDescription() override;
void check(ContextPtr context) const override;
ObjectStoragePtr createOrUpdateObjectStorage(ContextPtr context, bool is_readonly = true) override; /// NOLINT
StorageObjectStorageConfigurationPtr clone() override { return std::make_shared<StorageS3Configuration>(*this); }
bool isStaticConfiguration() const override { return static_configuration; }
void fromNamedCollection(const NamedCollection & collection) override;
void fromAST(ASTs & args, ContextPtr context, bool with_structure) override;
ObjectStoragePtr createObjectStorage(ContextPtr context, bool is_readonly = true) override; /// NOLINT
static void addStructureToArgs(ASTs & args, const String & structure, ContextPtr context);
private:
void fromNamedCollection(const NamedCollection & collection) override;
void fromAST(ASTs & args, ContextPtr context, bool with_structure) override;
S3::URI url;
std::vector<String> keys;
S3::AuthSettings auth_settings;
S3Settings::RequestSettings request_settings;
HTTPHeaderEntries headers_from_ast; /// Headers from ast is a part of static configuration.
/// If s3 configuration was passed from ast, then it is static.
/// If from config - it can be changed with config reload.
bool static_configuration = true;
/// Headers from ast is a part of static configuration.
HTTPHeaderEntries headers_from_ast;
std::vector<String> keys;
std::unique_ptr<S3::Client> createClient(ContextPtr context);
bool initialized = false;
};
}

View File

@ -3,6 +3,7 @@
#include <Formats/FormatFactory.h>
#include <Parsers/ASTInsertQuery.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Formats/ReadSchemaUtils.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Transforms/ExtractColumnsTransform.h>
@ -13,8 +14,9 @@
#include <Storages/ObjectStorage/StorageObjectStorageQuerySettings.h>
#include <Storages/ObjectStorage/StorageObjectStorageSink.h>
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
#include <Storages/ObjectStorage/ReadBufferIterator.h>
#include <Storages/ObjectStorage/ReadFromStorageObjectStorage.h>
#include <Storages/ObjectStorage/ReadBufferIterator.h>
#include <Storages/Cache/SchemaCache.h>
namespace DB
@ -39,21 +41,24 @@ std::unique_ptr<StorageInMemoryMetadata> getStorageMetadata(
const std::string & engine_name,
const ContextPtr & context)
{
using Storage = StorageObjectStorage<StorageSettings>;
auto storage_metadata = std::make_unique<StorageInMemoryMetadata>();
if (columns.empty())
{
auto fetched_columns = StorageObjectStorage<StorageSettings>::getTableStructureFromData(
object_storage, configuration, format_settings, context);
auto fetched_columns = Storage::getTableStructureFromData(object_storage, configuration, format_settings, context);
storage_metadata->setColumns(fetched_columns);
}
else if (!columns.hasOnlyOrdinary())
{
/// We don't allow special columns.
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table engine {} doesn't support special columns "
"like MATERIALIZED, ALIAS or EPHEMERAL", engine_name);
}
else
{
/// We don't allow special columns.
if (!columns.hasOnlyOrdinary())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Table engine {} doesn't support special columns "
"like MATERIALIZED, ALIAS or EPHEMERAL",
engine_name);
if (configuration->format == "auto")
Storage::setFormatFromData(object_storage, configuration, format_settings, context);
storage_metadata->setColumns(columns);
}
@ -120,14 +125,10 @@ bool StorageObjectStorage<StorageSettings>::parallelizeOutputAfterReading(Contex
}
template <typename StorageSettings>
std::pair<StorageObjectStorageConfigurationPtr, ObjectStoragePtr>
StorageObjectStorage<StorageSettings>::updateConfigurationAndGetCopy(ContextPtr local_context)
void StorageObjectStorage<StorageSettings>::updateConfiguration(ContextPtr context)
{
std::lock_guard lock(configuration_update_mutex);
auto new_object_storage = configuration->createOrUpdateObjectStorage(local_context);
if (new_object_storage)
object_storage = new_object_storage;
return {configuration, object_storage};
if (!configuration->isStaticConfiguration())
object_storage->applyNewSettings(context->getConfigRef(), "s3.", context);
}
template <typename StorageSettings>
@ -151,8 +152,8 @@ void StorageObjectStorage<StorageSettings>::read(
size_t max_block_size,
size_t num_streams)
{
auto [query_configuration, query_object_storage] = updateConfigurationAndGetCopy(local_context);
if (partition_by && query_configuration->withWildcard())
updateConfiguration(local_context);
if (partition_by && configuration->withWildcard())
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Reading from a partitioned {} storage is not implemented yet",
@ -165,8 +166,8 @@ void StorageObjectStorage<StorageSettings>::read(
&& local_context->getSettingsRef().optimize_count_from_files;
auto read_step = std::make_unique<ReadFromStorageObejctStorage>(
query_object_storage,
query_configuration,
object_storage,
configuration,
getName(),
virtual_columns,
format_settings,
@ -192,10 +193,10 @@ SinkToStoragePtr StorageObjectStorage<StorageSettings>::write(
ContextPtr local_context,
bool /* async_insert */)
{
auto [query_configuration, query_object_storage] = updateConfigurationAndGetCopy(local_context);
updateConfiguration(local_context);
const auto sample_block = metadata_snapshot->getSampleBlock();
if (query_configuration->withWildcard())
if (configuration->withWildcard())
{
ASTPtr partition_by_ast = nullptr;
if (auto insert_query = std::dynamic_pointer_cast<ASTInsertQuery>(query))
@ -209,24 +210,28 @@ SinkToStoragePtr StorageObjectStorage<StorageSettings>::write(
if (partition_by_ast)
{
return std::make_shared<PartitionedStorageObjectStorageSink>(
object_storage, query_configuration, format_settings, sample_block, local_context, partition_by_ast);
object_storage, configuration, format_settings, sample_block, local_context, partition_by_ast);
}
}
if (query_configuration->withGlobs())
if (configuration->withGlobs())
{
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED,
"{} key '{}' contains globs, so the table is in readonly mode",
getName(), query_configuration->getPath());
getName(), configuration->getPath());
}
const auto storage_settings = StorageSettings::create(local_context->getSettingsRef());
LOG_TEST(&Poco::Logger::get("KSSENII"), "KSSENII: {}", object_storage->exists(StoredObject(configuration->getPath())));
auto configuration_copy = configuration->clone();
if (!storage_settings.truncate_on_insert
&& object_storage->exists(StoredObject(query_configuration->getPath())))
&& object_storage->exists(StoredObject(configuration->getPath())))
{
LOG_TEST(&Poco::Logger::get("KSSENII"), "KSSENII 2: {}", storage_settings.create_new_file_on_insert);
if (storage_settings.create_new_file_on_insert)
{
auto & paths = query_configuration->getPaths();
auto & paths = configuration_copy->getPaths();
size_t index = paths.size();
const auto & first_key = paths[0];
auto pos = first_key.find_first_of('.');
@ -243,6 +248,7 @@ SinkToStoragePtr StorageObjectStorage<StorageSettings>::write(
while (object_storage->exists(StoredObject(new_key)));
paths.push_back(new_key);
configuration->getPaths().push_back(new_key);
}
else
{
@ -251,12 +257,13 @@ SinkToStoragePtr StorageObjectStorage<StorageSettings>::write(
"Object in bucket {} with key {} already exists. "
"If you want to overwrite it, enable setting [engine_name]_truncate_on_insert, if you "
"want to create a new file on each insert, enable setting [engine_name]_create_new_file_on_insert",
query_configuration->getNamespace(), query_configuration->getPaths().back());
configuration_copy->getNamespace(), configuration_copy->getPaths().back());
}
}
LOG_TEST(&Poco::Logger::get("KSSENII"), "KSSENII 3: {}", configuration_copy->getPaths().size());
return std::make_shared<StorageObjectStorageSink>(
object_storage, query_configuration, format_settings, sample_block, local_context);
object_storage, configuration_copy, format_settings, sample_block, local_context);
}
template <typename StorageSettings>
@ -279,25 +286,55 @@ void StorageObjectStorage<StorageSettings>::truncate(
}
template <typename StorageSettings>
ColumnsDescription StorageObjectStorage<StorageSettings>::getTableStructureFromData(
ObjectStoragePtr object_storage,
std::unique_ptr<ReadBufferIterator> StorageObjectStorage<StorageSettings>::createReadBufferIterator(
const ObjectStoragePtr & object_storage,
const ConfigurationPtr & configuration,
const std::optional<FormatSettings> & format_settings,
ContextPtr context)
ObjectInfos & read_keys,
const ContextPtr & context)
{
ObjectInfos read_keys;
const auto settings = StorageSettings::create(context->getSettingsRef());
auto file_iterator = StorageObjectStorageSource::createFileIterator(
configuration, object_storage, /* distributed_processing */false,
context, /* predicate */{}, /* virtual_columns */{}, &read_keys, settings.list_object_keys_size,
StorageSettings::ObjectStorageThreads(), StorageSettings::ObjectStorageThreadsActive(), StorageSettings::ObjectStorageThreadsScheduled());
ReadBufferIterator read_buffer_iterator(
return std::make_unique<ReadBufferIterator>(
object_storage, configuration, file_iterator,
format_settings, StorageSettings::create(context->getSettingsRef()), getSchemaCache(context), read_keys, context);
}
const bool retry = configuration->withGlobs();
return readSchemaFromFormat(configuration->format, format_settings, read_buffer_iterator, retry, context);
template <typename StorageSettings>
ColumnsDescription StorageObjectStorage<StorageSettings>::getTableStructureFromData(
const ObjectStoragePtr & object_storage,
const ConfigurationPtr & configuration,
const std::optional<FormatSettings> & format_settings,
const ContextPtr & context)
{
ObjectInfos read_keys;
auto read_buffer_iterator = createReadBufferIterator(object_storage, configuration, format_settings, read_keys, context);
if (configuration->format == "auto")
{
auto [columns, format] = detectFormatAndReadSchema(format_settings, *read_buffer_iterator, context);
configuration->format = format;
return columns;
}
else
{
return readSchemaFromFormat(configuration->format, format_settings, *read_buffer_iterator, context);
}
}
template <typename StorageSettings>
void StorageObjectStorage<StorageSettings>::setFormatFromData(
const ObjectStoragePtr & object_storage,
const ConfigurationPtr & configuration,
const std::optional<FormatSettings> & format_settings,
const ContextPtr & context)
{
ObjectInfos read_keys;
auto read_buffer_iterator = createReadBufferIterator(object_storage, configuration, format_settings, read_keys, context);
configuration->format = detectFormatAndReadSchema(format_settings, *read_buffer_iterator, context).second;
}
template class StorageObjectStorage<S3StorageSettings>;

View File

@ -21,6 +21,7 @@ using ReadTaskCallback = std::function<String()>;
class IOutputFormat;
class IInputFormat;
class SchemaCache;
class ReadBufferIterator;
template <typename StorageSettings>
@ -89,13 +90,26 @@ public:
static SchemaCache & getSchemaCache(const ContextPtr & context);
static ColumnsDescription getTableStructureFromData(
ObjectStoragePtr object_storage,
const ObjectStoragePtr & object_storage,
const ConfigurationPtr & configuration,
const std::optional<FormatSettings> & format_settings,
ContextPtr context);
const ContextPtr & context);
static void setFormatFromData(
const ObjectStoragePtr & object_storage,
const ConfigurationPtr & configuration,
const std::optional<FormatSettings> & format_settings,
const ContextPtr & context);
protected:
virtual std::pair<ConfigurationPtr, ObjectStoragePtr> updateConfigurationAndGetCopy(ContextPtr local_context);
virtual void updateConfiguration(ContextPtr local_context);
static std::unique_ptr<ReadBufferIterator> createReadBufferIterator(
const ObjectStoragePtr & object_storage,
const ConfigurationPtr & configuration,
const std::optional<FormatSettings> & format_settings,
ObjectInfos & read_keys,
const ContextPtr & context);
const std::string engine_name;
const NamesAndTypesList virtual_columns;

View File

@ -33,12 +33,10 @@ StorageObjectStorageCluster<Definition, StorageSettings, Configuration>::Storage
const StorageID & table_id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
ContextPtr context_,
bool structure_argument_was_provided_)
ContextPtr context_)
: IStorageCluster(cluster_name_,
table_id_,
getLogger(fmt::format("{}({})", engine_name_, table_id_.table_name)),
structure_argument_was_provided_)
getLogger(fmt::format("{}({})", engine_name_, table_id_.table_name)))
, engine_name(engine_name_)
, configuration{configuration_}
, object_storage(object_storage_)
@ -48,13 +46,16 @@ StorageObjectStorageCluster<Definition, StorageSettings, Configuration>::Storage
if (columns_.empty())
{
/// `format_settings` is set to std::nullopt, because StorageObjectStorageCluster is used only as table function
auto columns = StorageObjectStorage<StorageSettings>::getTableStructureFromData(
object_storage, configuration, /*format_settings=*/std::nullopt, context_);
ColumnsDescription columns = Storage::getTableStructureFromData(object_storage, configuration, /*format_settings=*/std::nullopt, context_);
storage_metadata.setColumns(columns);
}
else
{
if (configuration->format == "auto")
StorageS3::setFormatFromData(object_storage, configuration, /*format_settings=*/std::nullopt, context_);
storage_metadata.setColumns(columns_);
}
storage_metadata.setConstraints(constraints_);
setInMemoryMetadata(storage_metadata);
@ -64,9 +65,9 @@ StorageObjectStorageCluster<Definition, StorageSettings, Configuration>::Storage
}
template <typename Definition, typename StorageSettings, typename Configuration>
void StorageObjectStorageCluster<Definition, StorageSettings, Configuration>::addColumnsStructureToQuery(
void StorageObjectStorageCluster<Definition, StorageSettings, Configuration>::updateQueryToSendIfNeeded(
ASTPtr & query,
const String & structure,
const DB::StorageSnapshotPtr & storage_snapshot,
const ContextPtr & context)
{
ASTExpressionList * expression_list = extractTableFunctionArgumentsFromSelectQuery(query);
@ -76,13 +77,18 @@ void StorageObjectStorageCluster<Definition, StorageSettings, Configuration>::ad
"Expected SELECT query from table function {}, got '{}'",
engine_name, queryToString(query));
}
using TableFunction = TableFunctionObjectStorageCluster<Definition, StorageSettings, Configuration>;
TableFunction::addColumnsStructureToArguments(expression_list->children, structure, context);
TableFunction::updateStructureAndFormatArgumentsIfNeeded(
expression_list->children,
storage_snapshot->metadata->getColumns().getAll().toNamesAndTypesDescription(),
configuration->format,
context);
}
template <typename Definition, typename StorageSettings, typename Configuration>
RemoteQueryExecutor::Extension
StorageObjectStorageCluster<Definition, StorageSettings, Configuration>::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & local_context) const
StorageObjectStorageCluster<Definition, StorageSettings, Configuration>::getTaskIteratorExtension(
const ActionsDAG::Node * predicate, const ContextPtr & local_context) const
{
const auto settings = StorageSettings::create(local_context->getSettingsRef());
auto iterator = std::make_shared<StorageObjectStorageSource::GlobIterator>(

View File

@ -21,6 +21,7 @@ class StorageObjectStorageCluster : public IStorageCluster
{
public:
using Storage = StorageObjectStorage<StorageSettings>;
using TableFunction = TableFunctionObjectStorageCluster<Definition, StorageSettings, Configuration>;
StorageObjectStorageCluster(
const String & cluster_name_,
@ -30,8 +31,7 @@ public:
const StorageID & table_id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
ContextPtr context_,
bool structure_argument_was_provided_);
ContextPtr context_);
std::string getName() const override { return engine_name; }
@ -49,9 +49,9 @@ public:
private:
void updateBeforeRead(const ContextPtr & /* context */) override {}
void addColumnsStructureToQuery(
void updateQueryToSendIfNeeded(
ASTPtr & query,
const String & structure,
const StorageSnapshotPtr & storage_snapshot,
const ContextPtr & context) override;
const String & engine_name;

View File

@ -1,5 +1,5 @@
#include <Storages/ObjectStorage/StorageObjectStorageConfiguration.h>
#include <Formats/FormatFactory.h>
namespace DB
{
@ -14,6 +14,10 @@ void StorageObjectStorageConfiguration::initialize(
configuration.fromNamedCollection(*named_collection);
else
configuration.fromAST(engine_args, local_context, with_table_structure);
// FIXME: it should be - if (format == "auto" && get_format_from_file)
if (configuration.format == "auto")
configuration.format = FormatFactory::instance().tryGetFormatFromFileName(configuration.getPath()).value_or("auto");
}
bool StorageObjectStorageConfiguration::withWildcard() const

View File

@ -39,8 +39,9 @@ public:
std::string getPathWithoutGlob() const;
virtual void check(ContextPtr context) const = 0;
virtual ObjectStoragePtr createOrUpdateObjectStorage(ContextPtr context, bool is_readonly = true) = 0; /// NOLINT
virtual ObjectStoragePtr createObjectStorage(ContextPtr context, bool is_readonly = true) = 0; /// NOLINT
virtual StorageObjectStorageConfigurationPtr clone() = 0;
virtual bool isStaticConfiguration() const { return true; }
String format = "auto";
String compression_method = "auto";

View File

@ -5,10 +5,14 @@
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/ObjectStorage/StorageObjectStorageQuerySettings.h>
#include <Storages/ObjectStorage/StorageObjectStorage_fwd_internal.h>
#include <Processors/Formats/IInputFormat.h>
namespace DB
{
class SchemaCache;
class StorageObjectStorageSource : public SourceWithKeyCondition, WithContext
{
friend class StorageS3QueueSource;

View File

@ -56,7 +56,7 @@ static std::shared_ptr<StorageObjectStorage<StorageSettings>> createStorageObjec
return std::make_shared<StorageObjectStorage<StorageSettings>>(
configuration,
configuration->createOrUpdateObjectStorage(context),
configuration->createObjectStorage(context),
engine_name,
args.getContext(),
args.table_id,

View File

@ -134,7 +134,7 @@ StorageS3Queue::StorageS3Queue(
checkAndAdjustSettings(*s3queue_settings, context_->getSettingsRef());
object_storage = configuration->createOrUpdateObjectStorage(context_);
object_storage = configuration->createObjectStorage(context_);
FormatFactory::instance().checkFormatName(configuration->format);
configuration->check(context_);
@ -146,8 +146,10 @@ StorageS3Queue::StorageS3Queue(
}
else
{
if (configuration.format == "auto")
configuration.format = StorageS3::getTableStructureAndFormatFromData(configuration, format_settings, context_).second;
if (configuration->format == "auto")
{
StorageObjectStorage<S3StorageSettings>::setFormatFromData(object_storage, configuration, format_settings, context_);
}
storage_metadata.setColumns(columns_);
}

View File

@ -21,7 +21,7 @@ namespace ErrorCodes
S3Settings::RequestSettings::PartUploadSettings::PartUploadSettings(const Settings & settings)
{
updateFromSettingsImpl(settings, false);
updateFromSettings(settings, false);
validate();
}
@ -66,7 +66,7 @@ S3Settings::RequestSettings::PartUploadSettings::PartUploadSettings(const NamedC
validate();
}
void S3Settings::RequestSettings::PartUploadSettings::updateFromSettingsImpl(const Settings & settings, bool if_changed)
void S3Settings::RequestSettings::PartUploadSettings::updateFromSettings(const Settings & settings, bool if_changed)
{
if (!if_changed || settings.s3_strict_upload_part_size.changed)
strict_upload_part_size = settings.s3_strict_upload_part_size;
@ -263,13 +263,12 @@ void S3Settings::RequestSettings::updateFromSettingsImpl(const Settings & settin
request_timeout_ms = settings.s3_request_timeout_ms;
}
void S3Settings::RequestSettings::updateFromSettings(const Settings & settings)
void S3Settings::RequestSettings::updateFromSettingsIfChanged(const Settings & settings)
{
updateFromSettingsImpl(settings, true);
upload_settings.updateFromSettings(settings);
upload_settings.updateFromSettings(settings, true);
}
void StorageS3Settings::loadFromConfig(const String & config_elem, const Poco::Util::AbstractConfiguration & config, const Settings & settings)
{
std::lock_guard lock(mutex);
@ -293,7 +292,7 @@ void StorageS3Settings::loadFromConfig(const String & config_elem, const Poco::U
}
}
S3Settings StorageS3Settings::getSettings(const String & endpoint) const
std::optional<S3Settings> StorageS3Settings::getSettings(const String & endpoint) const
{
std::lock_guard lock(mutex);
auto next_prefix_setting = s3_settings.upper_bound(endpoint);

View File

@ -39,7 +39,7 @@ struct S3Settings
size_t max_single_operation_copy_size = 5ULL * 1024 * 1024 * 1024;
String storage_class_name;
void updateFromSettings(const Settings & settings) { updateFromSettingsImpl(settings, true); }
void updateFromSettings(const Settings & settings, bool if_changed);
void validate();
private:
@ -52,8 +52,6 @@ struct S3Settings
const Settings & settings,
String setting_name_prefix = {});
void updateFromSettingsImpl(const Settings & settings, bool if_changed);
friend struct RequestSettings;
};
@ -96,7 +94,7 @@ struct S3Settings
const Settings & settings,
String setting_name_prefix = {});
void updateFromSettings(const Settings & settings);
void updateFromSettingsIfChanged(const Settings & settings);
private:
void updateFromSettingsImpl(const Settings & settings, bool if_changed);
@ -112,7 +110,7 @@ class StorageS3Settings
public:
void loadFromConfig(const String & config_elem, const Poco::Util::AbstractConfiguration & config, const Settings & settings);
S3Settings getSettings(const String & endpoint) const;
std::optional<S3Settings> getSettings(const String & endpoint) const;
private:
mutable std::mutex mutex;

View File

@ -31,7 +31,7 @@ ObjectStoragePtr TableFunctionObjectStorage<
Definition, StorageSettings, Configuration>::getObjectStorage(const ContextPtr & context, bool create_readonly) const
{
if (!object_storage)
object_storage = configuration->createOrUpdateObjectStorage(context, create_readonly);
object_storage = configuration->createObjectStorage(context, create_readonly);
return object_storage;
}
@ -63,8 +63,8 @@ std::vector<size_t> TableFunctionObjectStorage<
}
template <typename Definition, typename StorageSettings, typename Configuration>
void TableFunctionObjectStorage<
Definition, StorageSettings, Configuration>::addColumnsStructureToArguments(ASTs & args, const String & structure, const ContextPtr & context)
void TableFunctionObjectStorage<Definition, StorageSettings, Configuration>::updateStructureAndFormatArgumentsIfNeeded(
ASTs & args, const String & structure, const String & /* format */, const ContextPtr & context)
{
Configuration::addStructureToArgs(args, structure, context);
}

View File

@ -110,7 +110,11 @@ public:
virtual void parseArgumentsImpl(ASTs & args, const ContextPtr & context);
static void addColumnsStructureToArguments(ASTs & args, const String & structure, const ContextPtr & context);
static void updateStructureAndFormatArgumentsIfNeeded(
ASTs & args,
const String & structure,
const String & format,
const ContextPtr & context);
protected:
using ConfigurationPtr = StorageObjectStorageConfigurationPtr;

View File

@ -20,12 +20,10 @@ StoragePtr TableFunctionObjectStorageCluster<Definition, StorageSettings, Config
const std::string & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const
{
using Base = TableFunctionObjectStorage<Definition, StorageSettings, Configuration>;
auto configuration = Base::getConfiguration();
bool structure_argument_was_provided = configuration->structure != "auto";
ColumnsDescription columns;
if (structure_argument_was_provided)
if (configuration->structure != "auto")
columns = parseColumnsListFromString(configuration->structure, context);
else if (!Base::structure_hint.empty())
columns = Base::structure_hint;
@ -58,8 +56,7 @@ StoragePtr TableFunctionObjectStorageCluster<Definition, StorageSettings, Config
StorageID(Base::getDatabaseName(), table_name),
columns,
ConstraintsDescription{},
context,
structure_argument_was_provided);
context);
}
storage->startup();