This commit is contained in:
kssenii 2024-04-28 12:18:24 +02:00
parent 91014456b5
commit 671650bd2e
11 changed files with 38 additions and 51 deletions

View File

@ -36,7 +36,7 @@ BackupReaderAzureBlobStorage::BackupReaderAzureBlobStorage(
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::Azure, MetadataStorageType::None, configuration_.container, false, false}
, configuration(configuration_)
{
auto client_ptr = configuration.createClient(/* is_read_only */ false, /* attempt_to_create_container */true);
auto client_ptr = configuration.createClient(/* is_readonly */false, /* attempt_to_create_container */true);
object_storage = std::make_unique<AzureObjectStorage>("BackupReaderAzureBlobStorage",
std::move(client_ptr),
configuration.createSettings(context_),
@ -121,7 +121,7 @@ BackupWriterAzureBlobStorage::BackupWriterAzureBlobStorage(
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::Azure, MetadataStorageType::None, configuration_.container, false, false}
, configuration(configuration_)
{
auto client_ptr = configuration.createClient(/* is_read_only */ false, attempt_to_create_container);
auto client_ptr = configuration.createClient(/* is_readonly */false, attempt_to_create_container);
object_storage = std::make_unique<AzureObjectStorage>("BackupWriterAzureBlobStorage",
std::move(client_ptr),
configuration.createSettings(context_),

View File

@ -3,7 +3,6 @@
#include "config.h"
#if USE_AZURE_BLOB_STORAGE
#include <Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <filesystem>
@ -36,20 +35,25 @@ public:
const Paths & getPaths() const override { return blobs_paths; }
void setPaths(const Paths & paths) override { blobs_paths = paths; }
String getDataSourceDescription() override { return std::filesystem::path(connection_url) / container; }
String getNamespace() const override { return container; }
String getDataSourceDescription() override { return std::filesystem::path(connection_url) / container; }
StorageObjectStorage::QuerySettings getQuerySettings(const ContextPtr &) const override;
void check(ContextPtr context) const override;
ObjectStoragePtr createObjectStorage(ContextPtr context, bool is_readonly = true) override; /// NOLINT
ConfigurationPtr clone() override { return std::make_shared<StorageAzureConfiguration>(*this); }
void fromNamedCollection(const NamedCollection & collection) override;
void fromAST(ASTs & args, ContextPtr context, bool with_structure) override;
ObjectStoragePtr createObjectStorage(ContextPtr context, bool is_readonly) override;
void addStructureAndFormatToArgs(
ASTs & args, const String & structure_, const String & format_, ContextPtr context) override;
ASTs & args,
const String & structure_,
const String & format_,
ContextPtr context) override;
protected:
void fromNamedCollection(const NamedCollection & collection) override;
void fromAST(ASTs & args, ContextPtr context, bool with_structure) override;
using AzureClient = Azure::Storage::Blobs::BlobContainerClient;
using AzureClientPtr = std::unique_ptr<Azure::Storage::Blobs::BlobContainerClient>;

View File

@ -38,7 +38,7 @@ public:
std::optional<FormatSettings> format_settings_,
LoadingStrictnessLevel mode)
{
auto object_storage = base_configuration->createObjectStorage(context);
auto object_storage = base_configuration->createObjectStorage(context, /* is_readonly */true);
DataLakeMetadataPtr metadata;
NamesAndTypesList schema_from_metadata;
@ -96,8 +96,6 @@ public:
void updateConfiguration(ContextPtr local_context) override
{
std::lock_guard lock(Storage::configuration_update_mutex);
Storage::updateConfiguration(local_context);
auto new_metadata = DataLakeMetadata::create(Storage::object_storage, base_configuration, local_context);

View File

@ -28,19 +28,22 @@ public:
const Paths & getPaths() const override { return paths; }
void setPaths(const Paths & paths_) override { paths = paths_; }
std::string getPathWithoutGlobs() const override;
String getNamespace() const override { return ""; }
String getDataSourceDescription() override { return url; }
StorageObjectStorage::QuerySettings getQuerySettings(const ContextPtr &) const override;
void check(ContextPtr context) const override;
ObjectStoragePtr createObjectStorage(ContextPtr context, bool is_readonly = true) override; /// NOLINT
ConfigurationPtr clone() override { return std::make_shared<StorageHDFSConfiguration>(*this); }
void addStructureAndFormatToArgs(
ASTs & args, const String & structure_, const String & format_, ContextPtr context) override;
ObjectStoragePtr createObjectStorage(ContextPtr context, bool is_readonly) override;
std::string getPathWithoutGlobs() const override;
void addStructureAndFormatToArgs(
ASTs & args,
const String & structure_,
const String & format_,
ContextPtr context) override;
private:
void fromNamedCollection(const NamedCollection &) override;

View File

@ -3,7 +3,6 @@
#include "config.h"
#if USE_AWS_S3
#include <Storages/StorageS3Settings.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
@ -35,13 +34,16 @@ public:
void check(ContextPtr context) const override;
void validateNamespace(const String & name) const override;
ConfigurationPtr clone() override { return std::make_shared<StorageS3Configuration>(*this); }
bool isStaticConfiguration() const override { return static_configuration; }
ObjectStoragePtr createObjectStorage(ContextPtr context, bool is_readonly = true) override; /// NOLINT
ObjectStoragePtr createObjectStorage(ContextPtr context, bool is_readonly) override;
void addStructureAndFormatToArgs(
ASTs & args, const String & structure, const String & format, ContextPtr context) override;
ASTs & args,
const String & structure,
const String & format,
ContextPtr context) override;
private:
void fromNamedCollection(const NamedCollection & collection) override;

View File

@ -91,6 +91,7 @@ bool StorageObjectStorage::supportsSubsetOfColumns(const ContextPtr & context) c
/// Pushes settings from the context's config to the underlying object storage.
/// Does nothing when the configuration reports itself as static
/// (see Configuration::isStaticConfiguration()).
void StorageObjectStorage::updateConfiguration(ContextPtr context)
{
/// FIXME: we should be able to update everything apart from client if static_configuration == true.
/// NOTE(review): the config prefix passed below is hard-coded to "s3." even though this
/// storage appears to be shared with other backends (Azure, HDFS) — confirm this is intended.
if (!configuration->isStaticConfiguration())
object_storage->applyNewSettings(context->getConfigRef(), "s3.", context);
}
@ -113,7 +114,6 @@ public:
const std::optional<DB::FormatSettings> & format_settings_,
bool distributed_processing_,
ReadFromFormatInfo info_,
SchemaCache & schema_cache_,
const bool need_only_count_,
ContextPtr context_,
size_t max_block_size_,
@ -121,11 +121,9 @@ public:
: SourceStepWithFilter(DataStream{.header = info_.source_header}, columns_to_read, query_info_, storage_snapshot_, context_)
, object_storage(object_storage_)
, configuration(configuration_)
, schema_cache(schema_cache_)
, info(std::move(info_))
, virtual_columns(virtual_columns_)
, format_settings(format_settings_)
, query_settings(configuration->getQuerySettings(context_))
, name(name_ + "Source")
, need_only_count(need_only_count_)
, max_block_size(max_block_size_)
@ -154,8 +152,8 @@ public:
for (size_t i = 0; i < num_streams; ++i)
{
auto source = std::make_shared<StorageObjectStorageSource>(
getName(), object_storage, configuration, info, format_settings, query_settings,
context, max_block_size, iterator_wrapper, need_only_count, schema_cache);
getName(), object_storage, configuration, info, format_settings,
context, max_block_size, iterator_wrapper, need_only_count);
source->setKeyCondition(filter_actions_dag, context);
pipes.emplace_back(std::move(source));
@ -175,12 +173,10 @@ private:
ObjectStoragePtr object_storage;
ConfigurationPtr configuration;
std::shared_ptr<StorageObjectStorageSource::IIterator> iterator_wrapper;
SchemaCache & schema_cache;
const ReadFromFormatInfo info;
const NamesAndTypesList virtual_columns;
const std::optional<DB::FormatSettings> format_settings;
const StorageObjectStorage::QuerySettings query_settings;
const String name;
const bool need_only_count;
const size_t max_block_size;
@ -233,7 +229,6 @@ void StorageObjectStorage::read(
format_settings,
distributed_processing,
read_from_format_info,
getSchemaCache(local_context),
need_only_count,
local_context,
max_block_size,
@ -371,11 +366,6 @@ std::pair<ColumnsDescription, std::string> StorageObjectStorage::resolveSchemaAn
return std::pair(columns, format);
}
/// Convenience overload: looks up the schema cache for this storage by forwarding
/// the type name of its own configuration to the two-argument overload.
SchemaCache & StorageObjectStorage::getSchemaCache(const ContextPtr & context)
{
return getSchemaCache(context, configuration->getTypeName());
}
SchemaCache & StorageObjectStorage::getSchemaCache(const ContextPtr & context, const std::string & storage_type_name)
{
if (storage_type_name == "s3")

View File

@ -92,8 +92,6 @@ public:
bool parallelizeOutputAfterReading(ContextPtr context) const override;
SchemaCache & getSchemaCache(const ContextPtr & context);
static SchemaCache & getSchemaCache(const ContextPtr & context, const std::string & storage_type_name);
static ColumnsDescription resolveSchemaFromData(
@ -132,7 +130,6 @@ protected:
const bool distributed_processing;
LoggerPtr log;
std::mutex configuration_update_mutex;
};
class StorageObjectStorage::Configuration
@ -175,7 +172,7 @@ public:
virtual void check(ContextPtr context) const;
virtual void validateNamespace(const String & /* name */) const {}
virtual ObjectStoragePtr createObjectStorage(ContextPtr context, bool is_readonly = true) = 0; /// NOLINT
virtual ObjectStoragePtr createObjectStorage(ContextPtr context, bool is_readonly) = 0;
virtual ConfigurationPtr clone() = 0;
virtual bool isStaticConfiguration() const { return true; }

View File

@ -44,19 +44,16 @@ StorageObjectStorageSource::StorageObjectStorageSource(
ConfigurationPtr configuration_,
const ReadFromFormatInfo & info,
std::optional<FormatSettings> format_settings_,
const StorageObjectStorage::QuerySettings & query_settings_,
ContextPtr context_,
UInt64 max_block_size_,
std::shared_ptr<IIterator> file_iterator_,
bool need_only_count_,
SchemaCache & schema_cache_)
bool need_only_count_)
: SourceWithKeyCondition(info.source_header, false)
, WithContext(context_)
, name(std::move(name_))
, object_storage(object_storage_)
, configuration(configuration_)
, format_settings(format_settings_)
, query_settings(query_settings_)
, max_block_size(max_block_size_)
, need_only_count(need_only_count_)
, read_from_format_info(info)
@ -67,7 +64,7 @@ StorageObjectStorageSource::StorageObjectStorageSource(
1/* max_threads */))
, columns_desc(info.columns_description)
, file_iterator(file_iterator_)
, schema_cache(schema_cache_)
, schema_cache(StorageObjectStorage::getSchemaCache(context_, configuration->getTypeName()))
, create_reader_scheduler(threadPoolCallbackRunnerUnsafe<ReaderHolder>(*create_reader_pool, "Reader"))
{
}
@ -229,6 +226,8 @@ std::optional<size_t> StorageObjectStorageSource::tryGetNumRowsFromCache(const O
StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReader(size_t processor)
{
ObjectInfoPtr object_info;
auto query_settings = configuration->getQuerySettings(getContext());
do
{
object_info = file_iterator->next(processor);

View File

@ -32,12 +32,10 @@ public:
ConfigurationPtr configuration,
const ReadFromFormatInfo & info,
std::optional<FormatSettings> format_settings_,
const StorageObjectStorage::QuerySettings & query_settings_,
ContextPtr context_,
UInt64 max_block_size_,
std::shared_ptr<IIterator> file_iterator_,
bool need_only_count_,
SchemaCache & schema_cache_);
bool need_only_count_);
~StorageObjectStorageSource() override;
@ -62,7 +60,6 @@ protected:
ObjectStoragePtr object_storage;
const ConfigurationPtr configuration;
const std::optional<FormatSettings> format_settings;
const StorageObjectStorage::QuerySettings query_settings;
const UInt64 max_block_size;
const bool need_only_count;
const ReadFromFormatInfo read_from_format_info;

View File

@ -2,7 +2,6 @@
#include <Storages/ObjectStorage/S3/Configuration.h>
#include <Storages/ObjectStorage/HDFS/Configuration.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/StorageFactory.h>
#include <Formats/FormatFactory.h>
@ -59,7 +58,7 @@ static std::shared_ptr<StorageObjectStorage> createStorageObjectStorage(
return std::make_shared<StorageObjectStorage>(
configuration,
configuration->createObjectStorage(context),
configuration->createObjectStorage(context, /* is_readonly */false),
args.getContext(),
args.table_id,
args.columns,

View File

@ -138,7 +138,7 @@ StorageS3Queue::StorageS3Queue(
checkAndAdjustSettings(*s3queue_settings, context_->getSettingsRef());
object_storage = configuration->createObjectStorage(context_);
object_storage = configuration->createObjectStorage(context_, /* is_readonly */true);
FormatFactory::instance().checkFormatName(configuration->format);
configuration->check(context_);
@ -361,12 +361,10 @@ std::shared_ptr<StorageS3QueueSource> StorageS3Queue::createSource(
configuration,
info,
format_settings,
configuration->getQuerySettings(local_context),
local_context,
max_block_size,
file_iterator,
false,
StorageObjectStorage::getSchemaCache(local_context, configuration->getTypeName()));
false);
auto file_deleter = [=, this](const std::string & path) mutable
{