diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index 0f25397a961..983e737991c 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -168,6 +168,9 @@ M(ObjectStorageS3Threads, "Number of threads in the S3ObjectStorage thread pool.") \ M(ObjectStorageS3ThreadsActive, "Number of threads in the S3ObjectStorage thread pool running a task.") \ M(ObjectStorageS3ThreadsScheduled, "Number of queued or active jobs in the S3ObjectStorage thread pool.") \ + M(StorageObjectStorageThreads, "Number of threads in the remote table engines thread pools.") \ + M(StorageObjectStorageThreadsActive, "Number of threads in the remote table engines thread pool running a task.") \ + M(StorageObjectStorageThreadsScheduled, "Number of queued or active jobs in remote table engines thread pool.") \ M(ObjectStorageAzureThreads, "Number of threads in the AzureObjectStorage thread pool.") \ M(ObjectStorageAzureThreadsActive, "Number of threads in the AzureObjectStorage thread pool running a task.") \ M(ObjectStorageAzureThreadsScheduled, "Number of queued or active jobs in the AzureObjectStorage thread pool.") \ diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp index 27b2a9460b7..af9dc08e8c7 100644 --- a/src/Interpreters/InterpreterSystemQuery.cpp +++ b/src/Interpreters/InterpreterSystemQuery.cpp @@ -53,6 +53,9 @@ #include #include #include +#include +#include +#include #include #include #include @@ -489,17 +492,17 @@ BlockIO InterpreterSystemQuery::execute() StorageFile::getSchemaCache(getContext()).clear(); #if USE_AWS_S3 if (caches_to_drop.contains("S3")) - StorageS3::getSchemaCache(getContext()).clear(); + StorageObjectStorage::getSchemaCache(getContext(), StorageS3Configuration::type_name).clear(); #endif #if USE_HDFS if (caches_to_drop.contains("HDFS")) - StorageHDFS::getSchemaCache(getContext()).clear(); + StorageObjectStorage::getSchemaCache(getContext(), StorageHDFSConfiguration::type_name).clear(); #endif if (caches_to_drop.contains("URL")) StorageURL::getSchemaCache(getContext()).clear(); #if USE_AZURE_BLOB_STORAGE if (caches_to_drop.contains("AZURE")) - StorageAzureBlob::getSchemaCache(getContext()).clear(); + StorageObjectStorage::getSchemaCache(getContext(), StorageAzureBlobConfiguration::type_name).clear(); #endif break; } diff --git a/src/Storages/ObjectStorage/AzureBlob/Configuration.cpp b/src/Storages/ObjectStorage/AzureBlob/Configuration.cpp index 4b826a0c721..c9bc59d62aa 100644 --- a/src/Storages/ObjectStorage/AzureBlob/Configuration.cpp +++ b/src/Storages/ObjectStorage/AzureBlob/Configuration.cpp @@ -101,6 +101,21 @@ AzureObjectStorage::SettingsPtr StorageAzureBlobConfiguration::createSettings(Co return settings_ptr; } +StorageObjectStorage::QuerySettings StorageAzureBlobConfiguration::getQuerySettings(const ContextPtr & context) const +{ + const auto & settings = context->getSettingsRef(); + return StorageObjectStorage::QuerySettings{ + .truncate_on_insert = settings.azure_truncate_on_insert, + .create_new_file_on_insert = settings.azure_create_new_file_on_insert, + .schema_inference_use_cache = settings.schema_inference_use_cache_for_azure, + .schema_inference_mode = settings.schema_inference_mode, + .skip_empty_files = settings.s3_skip_empty_files, /// TODO: add setting for azure + .list_object_keys_size = settings.azure_list_object_keys_size, + .throw_on_zero_files_match = settings.s3_throw_on_zero_files_match, + .ignore_non_existent_file = settings.azure_ignore_file_doesnt_exist, + }; +} + ObjectStoragePtr StorageAzureBlobConfiguration::createObjectStorage(ContextPtr context, bool is_readonly) /// NOLINT { assertInitialized(); diff --git a/src/Storages/ObjectStorage/AzureBlob/Configuration.h b/src/Storages/ObjectStorage/AzureBlob/Configuration.h index c12ff81197d..7e105ea82b5 100644 --- a/src/Storages/ObjectStorage/AzureBlob/Configuration.h +++ b/src/Storages/ObjectStorage/AzureBlob/Configuration.h @@ -18,9 +18,15 @@ class StorageAzureBlobConfiguration : public StorageObjectStorageConfiguration friend void registerBackupEngineAzureBlobStorage(BackupFactory & factory); public: + static constexpr auto type_name = "azure"; + static constexpr auto engine_name = "Azure"; + StorageAzureBlobConfiguration() = default; StorageAzureBlobConfiguration(const StorageAzureBlobConfiguration & other); + std::string getTypeName() const override { return type_name; } + std::string getEngineName() const override { return engine_name; } + Path getPath() const override { return blob_path; } void setPath(const Path & path) override { blob_path = path; } @@ -30,6 +36,7 @@ public: String getDataSourceDescription() override { return fs::path(connection_url) / container; } String getNamespace() const override { return container; } + StorageObjectStorage::QuerySettings getQuerySettings(const ContextPtr &) const override; void check(ContextPtr context) const override; ObjectStoragePtr createObjectStorage(ContextPtr context, bool is_readonly = true) override; /// NOLINT @@ -37,8 +44,8 @@ public: void fromNamedCollection(const NamedCollection & collection) override; void fromAST(ASTs & args, ContextPtr context, bool with_structure) override; - static void addStructureAndFormatToArgs( - ASTs & args, const String & structure_, const String & format_, ContextPtr context); + void addStructureAndFormatToArgs( + ASTs & args, const String & structure_, const String & format_, ContextPtr context) override; protected: using AzureClient = Azure::Storage::Blobs::BlobContainerClient; diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp index d0f203b32bd..c6590ba8d43 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp @@ -184,7 +184,7 @@ struct DeltaLakeMetadata::Impl * * We need to get "version", which is the version of the checkpoint we need to read. */ - size_t readLastCheckpointIfExists() + size_t readLastCheckpointIfExists() const { const auto last_checkpoint_file = fs::path(configuration->getPath()) / deltalake_metadata_directory / "_last_checkpoint"; if (!object_storage->exists(StoredObject(last_checkpoint_file))) diff --git a/src/Storages/ObjectStorage/DataLakes/IStorageDataLake.h b/src/Storages/ObjectStorage/DataLakes/IStorageDataLake.h index 64228e880f8..e1851775925 100644 --- a/src/Storages/ObjectStorage/DataLakes/IStorageDataLake.h +++ b/src/Storages/ObjectStorage/DataLakes/IStorageDataLake.h @@ -21,17 +21,16 @@ namespace DB /// Storage for read-only integration with Apache Iceberg tables in Amazon S3 (see https://iceberg.apache.org/) /// Right now it's implemented on top of StorageS3 and right now it doesn't support /// many Iceberg features like schema evolution, partitioning, positional and equality deletes. -template -class IStorageDataLake final : public StorageObjectStorage +template +class IStorageDataLake final : public StorageObjectStorage { public: - using Storage = StorageObjectStorage; + using Storage = StorageObjectStorage; using ConfigurationPtr = Storage::ConfigurationPtr; static StoragePtr create( ConfigurationPtr base_configuration, ContextPtr context, - const String & engine_name_, const StorageID & table_id_, const ColumnsDescription & columns_, const ConstraintsDescription & constraints_, @@ -64,9 +63,9 @@ public: tryLogCurrentException(__PRETTY_FUNCTION__); } - return std::make_shared>( + return std::make_shared>( base_configuration, std::move(metadata), configuration, object_storage, - engine_name_, context, table_id_, + context, table_id_, columns_.empty() ? ColumnsDescription(schema_from_metadata) : columns_, constraints_, comment_, format_settings_); } @@ -133,9 +132,9 @@ private: DataLakeMetadataPtr current_metadata; }; -using StorageIceberg = IStorageDataLake; -using StorageDeltaLake = IStorageDataLake; -using StorageHudi = IStorageDataLake; +using StorageIceberg = IStorageDataLake; +using StorageDeltaLake = IStorageDataLake; +using StorageHudi = IStorageDataLake; } diff --git a/src/Storages/ObjectStorage/DataLakes/registerDataLakeStorages.cpp b/src/Storages/ObjectStorage/DataLakes/registerDataLakeStorages.cpp index d11dd1ca836..a5170e5ed6b 100644 --- a/src/Storages/ObjectStorage/DataLakes/registerDataLakeStorages.cpp +++ b/src/Storages/ObjectStorage/DataLakes/registerDataLakeStorages.cpp @@ -6,7 +6,6 @@ #include #include #include -#include namespace DB @@ -24,7 +23,7 @@ void registerStorageIceberg(StorageFactory & factory) StorageObjectStorageConfiguration::initialize(*configuration, args.engine_args, args.getLocalContext(), false); return StorageIceberg::create( - configuration, args.getContext(), "Iceberg", args.table_id, args.columns, + configuration, args.getContext(), args.table_id, args.columns, args.constraints, args.comment, std::nullopt, args.mode); }, { @@ -47,7 +46,7 @@ void registerStorageDeltaLake(StorageFactory & factory) StorageObjectStorageConfiguration::initialize(*configuration, args.engine_args, args.getLocalContext(), false); return StorageDeltaLake::create( - configuration, args.getContext(), "DeltaLake", args.table_id, args.columns, + configuration, args.getContext(), args.table_id, args.columns, args.constraints, args.comment, std::nullopt, args.mode); }, { @@ -68,7 +67,7 @@ void registerStorageHudi(StorageFactory & factory) StorageObjectStorageConfiguration::initialize(*configuration, args.engine_args, args.getLocalContext(), false); return StorageHudi::create( - configuration, args.getContext(), "Hudi", args.table_id, args.columns, + configuration, args.getContext(), args.table_id, args.columns, args.constraints, args.comment, std::nullopt, args.mode); }, { diff --git a/src/Storages/ObjectStorage/HDFS/Configuration.cpp b/src/Storages/ObjectStorage/HDFS/Configuration.cpp index 84f0a7bfe9f..0062ac969ac 100644 --- a/src/Storages/ObjectStorage/HDFS/Configuration.cpp +++ b/src/Storages/ObjectStorage/HDFS/Configuration.cpp @@ -60,6 +60,20 @@ std::string StorageHDFSConfiguration::getPathWithoutGlob() const return "/"; return path.substr(0, end_of_path_without_globs); } +StorageObjectStorage::QuerySettings StorageHDFSConfiguration::getQuerySettings(const ContextPtr & context) const +{ + const auto & settings = context->getSettingsRef(); + return StorageObjectStorage::QuerySettings{ + .truncate_on_insert = settings.hdfs_truncate_on_insert, + .create_new_file_on_insert = settings.hdfs_create_new_file_on_insert, + .schema_inference_use_cache = settings.schema_inference_use_cache_for_hdfs, + .schema_inference_mode = settings.schema_inference_mode, + .skip_empty_files = settings.hdfs_skip_empty_files, /// TODO: add setting for hdfs + .list_object_keys_size = settings.s3_list_object_keys_size, /// TODO: add a setting for hdfs + .throw_on_zero_files_match = settings.s3_throw_on_zero_files_match, + .ignore_non_existent_file = settings.hdfs_ignore_file_doesnt_exist, + }; +} void StorageHDFSConfiguration::fromAST(ASTs & args, ContextPtr context, bool with_structure) { diff --git a/src/Storages/ObjectStorage/HDFS/Configuration.h b/src/Storages/ObjectStorage/HDFS/Configuration.h index 23a7e8e4549..0a502857153 100644 --- a/src/Storages/ObjectStorage/HDFS/Configuration.h +++ b/src/Storages/ObjectStorage/HDFS/Configuration.h @@ -13,9 +13,15 @@ namespace DB class StorageHDFSConfiguration : public StorageObjectStorageConfiguration { public: + static constexpr auto type_name = "hdfs"; + static constexpr auto engine_name = "HDFS"; + StorageHDFSConfiguration() = default; StorageHDFSConfiguration(const StorageHDFSConfiguration & other); + std::string getTypeName() const override { return type_name; } + std::string getEngineName() const override { return engine_name; } + Path getPath() const override { return path; } void setPath(const Path & path_) override { path = path_; } @@ -25,13 +31,14 @@ public: String getNamespace() const override { return ""; } String getDataSourceDescription() override { return url; } + StorageObjectStorage::QuerySettings getQuerySettings(const ContextPtr &) const override; void check(ContextPtr context) const override; ObjectStoragePtr createObjectStorage(ContextPtr context, bool is_readonly = true) override; /// NOLINT StorageObjectStorageConfigurationPtr clone() override { return std::make_shared(*this); } - static void addStructureAndFormatToArgs( - ASTs & args, const String & structure_, const String & format_, ContextPtr context); + void addStructureAndFormatToArgs( + ASTs & args, const String & structure_, const String & format_, ContextPtr context) override; std::string getPathWithoutGlob() const override; diff --git a/src/Storages/ObjectStorage/ReadBufferIterator.cpp b/src/Storages/ObjectStorage/ReadBufferIterator.cpp index 0b6e34fb831..f8ce90a2b1f 100644 --- a/src/Storages/ObjectStorage/ReadBufferIterator.cpp +++ b/src/Storages/ObjectStorage/ReadBufferIterator.cpp @@ -1,5 +1,4 @@ #include -#include #include #include @@ -19,7 +18,6 @@ ReadBufferIterator::ReadBufferIterator( ConfigurationPtr configuration_, const FileIterator & file_iterator_, const std::optional & format_settings_, - const StorageObjectStorageSettings & query_settings_, SchemaCache & schema_cache_, ObjectInfos & read_keys_, const ContextPtr & context_) @@ -28,7 +26,7 @@ ReadBufferIterator::ReadBufferIterator( , configuration(configuration_) , file_iterator(file_iterator_) , format_settings(format_settings_) - , query_settings(query_settings_) + , query_settings(configuration->getQuerySettings(context_)) , schema_cache(schema_cache_) , read_keys(read_keys_) , format(configuration->format == "auto" ? std::nullopt : std::optional(configuration->format)) diff --git a/src/Storages/ObjectStorage/ReadBufferIterator.h b/src/Storages/ObjectStorage/ReadBufferIterator.h index 053bcbf894f..2d58e1c789e 100644 --- a/src/Storages/ObjectStorage/ReadBufferIterator.h +++ b/src/Storages/ObjectStorage/ReadBufferIterator.h @@ -2,7 +2,6 @@ #include #include #include -#include #include @@ -19,7 +18,6 @@ public: ConfigurationPtr configuration_, const FileIterator & file_iterator_, const std::optional & format_settings_, - const StorageObjectStorageSettings & query_settings_, SchemaCache & schema_cache_, ObjectInfos & read_keys_, const ContextPtr & context_); @@ -50,7 +48,7 @@ private: const ConfigurationPtr configuration; const FileIterator file_iterator; const std::optional & format_settings; - const StorageObjectStorageSettings query_settings; + const StorageObjectStorage::QuerySettings query_settings; SchemaCache & schema_cache; ObjectInfos & read_keys; std::optional format; diff --git a/src/Storages/ObjectStorage/ReadFromStorageObjectStorage.cpp b/src/Storages/ObjectStorage/ReadFromObjectStorageStep.cpp similarity index 62% rename from src/Storages/ObjectStorage/ReadFromStorageObjectStorage.cpp rename to src/Storages/ObjectStorage/ReadFromObjectStorageStep.cpp index 89d33191f41..f19e01cdc3e 100644 --- a/src/Storages/ObjectStorage/ReadFromStorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/ReadFromObjectStorageStep.cpp @@ -1,11 +1,11 @@ -#include +#include #include #include namespace DB { -ReadFromStorageObejctStorage::ReadFromStorageObejctStorage( +ReadFromObjectStorageStep::ReadFromObjectStorageStep( ObjectStoragePtr object_storage_, ConfigurationPtr configuration_, const String & name_, @@ -14,49 +14,41 @@ ReadFromStorageObejctStorage::ReadFromStorageObejctStorage( const SelectQueryInfo & query_info_, const StorageSnapshotPtr & storage_snapshot_, const std::optional & format_settings_, - const StorageObjectStorageSettings & query_settings_, bool distributed_processing_, ReadFromFormatInfo info_, SchemaCache & schema_cache_, const bool need_only_count_, ContextPtr context_, size_t max_block_size_, - size_t num_streams_, - CurrentMetrics::Metric metric_threads_count_, - CurrentMetrics::Metric metric_threads_active_, - CurrentMetrics::Metric metric_threads_scheduled_) + size_t num_streams_) : SourceStepWithFilter(DataStream{.header = info_.source_header}, columns_to_read, query_info_, storage_snapshot_, context_) , object_storage(object_storage_) , configuration(configuration_) , info(std::move(info_)) , virtual_columns(virtual_columns_) , format_settings(format_settings_) - , query_settings(query_settings_) + , query_settings(configuration->getQuerySettings(context_)) , schema_cache(schema_cache_) , name(name_ + "Source") , need_only_count(need_only_count_) , max_block_size(max_block_size_) , num_streams(num_streams_) , distributed_processing(distributed_processing_) - , metric_threads_count(metric_threads_count_) - , metric_threads_active(metric_threads_active_) - , metric_threads_scheduled(metric_threads_scheduled_) { } -void ReadFromStorageObejctStorage::createIterator(const ActionsDAG::Node * predicate) +void ReadFromObjectStorageStep::createIterator(const ActionsDAG::Node * predicate) { if (!iterator_wrapper) { auto context = getContext(); iterator_wrapper = StorageObjectStorageSource::createFileIterator( - configuration, object_storage, query_settings, distributed_processing, - context, predicate, virtual_columns, nullptr, metric_threads_count, - metric_threads_active, metric_threads_scheduled, context->getFileProgressCallback()); + configuration, object_storage, distributed_processing, + context, predicate, virtual_columns, nullptr, context->getFileProgressCallback()); } } -void ReadFromStorageObejctStorage::applyFilters(ActionDAGNodes added_filter_nodes) +void ReadFromObjectStorageStep::applyFilters(ActionDAGNodes added_filter_nodes) { filter_actions_dag = ActionsDAG::buildFilterActionsDAG(added_filter_nodes.nodes); const ActionsDAG::Node * predicate = nullptr; @@ -66,7 +58,7 @@ void ReadFromStorageObejctStorage::applyFilters(ActionDAGNodes added_filter_node createIterator(predicate); } -void ReadFromStorageObejctStorage::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) +void ReadFromObjectStorageStep::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) { createIterator(nullptr); auto context = getContext(); @@ -74,13 +66,9 @@ void ReadFromStorageObejctStorage::initializePipeline(QueryPipelineBuilder & pip Pipes pipes; for (size_t i = 0; i < num_streams; ++i) { - auto threadpool = std::make_shared( - metric_threads_count, metric_threads_active, metric_threads_scheduled, /* max_threads */1); - auto source = std::make_shared( getName(), object_storage, configuration, info, format_settings, query_settings, - context, max_block_size, iterator_wrapper, need_only_count, schema_cache, - std::move(threadpool), metric_threads_count, metric_threads_active, metric_threads_scheduled); + context, max_block_size, iterator_wrapper, need_only_count, schema_cache); source->setKeyCondition(filter_actions_dag, context); pipes.emplace_back(std::move(source)); diff --git a/src/Storages/ObjectStorage/ReadFromStorageObjectStorage.h b/src/Storages/ObjectStorage/ReadFromObjectStorageStep.h similarity index 70% rename from src/Storages/ObjectStorage/ReadFromStorageObjectStorage.h rename to src/Storages/ObjectStorage/ReadFromObjectStorageStep.h index c0dd02d75f8..d98ebfef1f2 100644 --- a/src/Storages/ObjectStorage/ReadFromStorageObjectStorage.h +++ b/src/Storages/ObjectStorage/ReadFromObjectStorageStep.h @@ -1,17 +1,16 @@ #pragma once -#include -#include #include +#include namespace DB { -class ReadFromStorageObejctStorage : public SourceStepWithFilter +class ReadFromObjectStorageStep : public SourceStepWithFilter { public: using ConfigurationPtr = StorageObjectStorageConfigurationPtr; - ReadFromStorageObejctStorage( + ReadFromObjectStorageStep( ObjectStoragePtr object_storage_, ConfigurationPtr configuration_, const String & name_, @@ -20,17 +19,13 @@ public: const SelectQueryInfo & query_info_, const StorageSnapshotPtr & storage_snapshot_, const std::optional & format_settings_, - const StorageObjectStorageSettings & query_settings_, bool distributed_processing_, ReadFromFormatInfo info_, SchemaCache & schema_cache_, bool need_only_count_, ContextPtr context_, size_t max_block_size_, - size_t num_streams_, - CurrentMetrics::Metric metric_threads_count_, - CurrentMetrics::Metric metric_threads_active_, - CurrentMetrics::Metric metric_threads_scheduled_); + size_t num_streams_); std::string getName() const override { return name; } @@ -46,16 +41,13 @@ private: const ReadFromFormatInfo info; const NamesAndTypesList virtual_columns; const std::optional format_settings; - const StorageObjectStorageSettings query_settings; + const StorageObjectStorage::QuerySettings query_settings; SchemaCache & schema_cache; const String name; const bool need_only_count; const size_t max_block_size; const size_t num_streams; const bool distributed_processing; - const CurrentMetrics::Metric metric_threads_count; - const CurrentMetrics::Metric metric_threads_active; - const CurrentMetrics::Metric metric_threads_scheduled; void createIterator(const ActionsDAG::Node * predicate); }; diff --git a/src/Storages/ObjectStorage/S3/Configuration.cpp b/src/Storages/ObjectStorage/S3/Configuration.cpp index 4c9e49d0705..139d9004f8e 100644 --- a/src/Storages/ObjectStorage/S3/Configuration.cpp +++ b/src/Storages/ObjectStorage/S3/Configuration.cpp @@ -70,6 +70,21 @@ StorageS3Configuration::StorageS3Configuration(const StorageS3Configuration & ot keys = other.keys; } +StorageObjectStorage::QuerySettings StorageS3Configuration::getQuerySettings(const ContextPtr & context) const +{ + const auto & settings = context->getSettingsRef(); + return StorageObjectStorage::QuerySettings{ + .truncate_on_insert = settings.s3_truncate_on_insert, + .create_new_file_on_insert = settings.s3_create_new_file_on_insert, + .schema_inference_use_cache = settings.schema_inference_use_cache_for_s3, + .schema_inference_mode = settings.schema_inference_mode, + .skip_empty_files = settings.s3_skip_empty_files, + .list_object_keys_size = settings.s3_list_object_keys_size, + .throw_on_zero_files_match = settings.s3_throw_on_zero_files_match, + .ignore_non_existent_file = settings.s3_ignore_file_doesnt_exist, + }; +} + ObjectStoragePtr StorageS3Configuration::createObjectStorage(ContextPtr context, bool /* is_readonly */) /// NOLINT { assertInitialized(); diff --git a/src/Storages/ObjectStorage/S3/Configuration.h b/src/Storages/ObjectStorage/S3/Configuration.h index ff5e8680e66..de4a6d17579 100644 --- a/src/Storages/ObjectStorage/S3/Configuration.h +++ b/src/Storages/ObjectStorage/S3/Configuration.h @@ -7,6 +7,7 @@ #include #include #include +#include namespace DB { @@ -14,9 +15,14 @@ namespace DB class StorageS3Configuration : public StorageObjectStorageConfiguration { public: + static constexpr auto type_name = "s3"; + StorageS3Configuration() = default; StorageS3Configuration(const StorageS3Configuration & other); + std::string getTypeName() const override { return type_name; } + std::string getEngineName() const override { return url.storage_name; } + Path getPath() const override { return url.key; } void setPath(const Path & path) override { url.key = path; } @@ -26,6 +32,7 @@ public: String getNamespace() const override { return url.bucket; } String getDataSourceDescription() override; + StorageObjectStorage::QuerySettings getQuerySettings(const ContextPtr &) const override; void check(ContextPtr context) const override; void validateNamespace(const String & name) const override; @@ -34,8 +41,8 @@ public: bool isStaticConfiguration() const override { return static_configuration; } ObjectStoragePtr createObjectStorage(ContextPtr context, bool is_readonly = true) override; /// NOLINT - static void addStructureAndFormatToArgs( - ASTs & args, const String & structure, const String & format, ContextPtr context); + void addStructureAndFormatToArgs( + ASTs & args, const String & structure, const String & format, ContextPtr context) override; private: void fromNamedCollection(const NamedCollection & collection) override; diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index 13f3557d927..441639629a3 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -11,10 +11,9 @@ #include #include #include -#include #include #include -#include +#include #include #include #include @@ -25,53 +24,13 @@ namespace DB namespace ErrorCodes { - extern const int BAD_ARGUMENTS; extern const int DATABASE_ACCESS_DENIED; extern const int NOT_IMPLEMENTED; } -template -std::unique_ptr getStorageMetadata( - ObjectStoragePtr object_storage, - const StorageObjectStorageConfigurationPtr & configuration, - const ColumnsDescription & columns, - const ConstraintsDescription & constraints, - std::optional format_settings, - const String & comment, - const std::string & engine_name, - const ContextPtr & context) -{ - using Storage = StorageObjectStorage; - - auto storage_metadata = std::make_unique(); - if (columns.empty()) - { - auto fetched_columns = Storage::getTableStructureFromData(object_storage, configuration, format_settings, context); - storage_metadata->setColumns(fetched_columns); - } - else if (!columns.hasOnlyOrdinary()) - { - /// We don't allow special columns. - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table engine {} doesn't support special columns " - "like MATERIALIZED, ALIAS or EPHEMERAL", engine_name); - } - else - { - if (configuration->format == "auto") - Storage::setFormatFromData(object_storage, configuration, format_settings, context); - - storage_metadata->setColumns(columns); - } - storage_metadata->setConstraints(constraints); - storage_metadata->setComment(comment); - return storage_metadata; -} - -template -StorageObjectStorage::StorageObjectStorage( +StorageObjectStorage::StorageObjectStorage( ConfigurationPtr configuration_, ObjectStoragePtr object_storage_, - const String & engine_name_, ContextPtr context, const StorageID & table_id_, const ColumnsDescription & columns_, @@ -80,16 +39,13 @@ StorageObjectStorage::StorageObjectStorage( std::optional format_settings_, bool distributed_processing_, ASTPtr partition_by_) - : IStorage(table_id_, getStorageMetadata( - object_storage_, configuration_, columns_, constraints_, format_settings_, - comment, engine_name, context)) - , engine_name(engine_name_) + : IStorage(table_id_) + , configuration(configuration_) + , object_storage(object_storage_) , format_settings(format_settings_) , partition_by(partition_by_) , distributed_processing(distributed_processing_) - , log(getLogger("Storage" + engine_name_)) - , object_storage(object_storage_) - , configuration(configuration_) + , log(getLogger(fmt::format("Storage{}({})", configuration->getEngineName(), table_id_.getFullTableName()))) { FormatFactory::instance().checkFormatName(configuration->format); configuration->check(context); @@ -98,46 +54,41 @@ StorageObjectStorage::StorageObjectStorage( for (const auto & key : configuration->getPaths()) objects.emplace_back(key); - setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(getInMemoryMetadataPtr()->getColumns())); + auto metadata = getStorageMetadata( + object_storage_, configuration_, columns_, + constraints_, format_settings_, comment, context); + + setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(metadata.getColumns())); + setInMemoryMetadata(std::move(metadata)); } -template -bool StorageObjectStorage::prefersLargeBlocks() const +String StorageObjectStorage::getName() const +{ + return configuration->getEngineName(); +} + +bool StorageObjectStorage::prefersLargeBlocks() const { return FormatFactory::instance().checkIfOutputFormatPrefersLargeBlocks(configuration->format); } -template -bool StorageObjectStorage::parallelizeOutputAfterReading(ContextPtr context) const +bool StorageObjectStorage::parallelizeOutputAfterReading(ContextPtr context) const { return FormatFactory::instance().checkParallelizeOutputAfterReading(configuration->format, context); } -template -bool StorageObjectStorage::supportsSubsetOfColumns(const ContextPtr & context) const +bool StorageObjectStorage::supportsSubsetOfColumns(const ContextPtr & context) const { return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration->format, context, format_settings); } -template -void StorageObjectStorage::updateConfiguration(ContextPtr context) +void StorageObjectStorage::updateConfiguration(ContextPtr context) { if (!configuration->isStaticConfiguration()) object_storage->applyNewSettings(context->getConfigRef(), "s3.", context); } -template -SchemaCache & StorageObjectStorage::getSchemaCache(const ContextPtr & context) -{ - static SchemaCache schema_cache( - context->getConfigRef().getUInt( - StorageSettings::SCHEMA_CACHE_MAX_ELEMENTS_CONFIG_SETTING, - DEFAULT_SCHEMA_CACHE_ELEMENTS)); - return schema_cache; -} - -template -void StorageObjectStorage::read( +void StorageObjectStorage::read( QueryPlan & query_plan, const Names & column_names, const StorageSnapshotPtr & storage_snapshot, @@ -155,13 +106,12 @@ void StorageObjectStorage::read( getName()); } - const auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context)); + const auto read_from_format_info = prepareReadingFromFormat( + column_names, storage_snapshot, supportsSubsetOfColumns(local_context)); const bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty()) && local_context->getSettingsRef().optimize_count_from_files; - LOG_TEST(&Poco::Logger::get("KSSENII"), "KSSENII SOURCE HEADER: {}", read_from_format_info.source_header.dumpStructure()); - LOG_TEST(&Poco::Logger::get("KSSENII"), "KSSENII FORMAT HEADER: {}", read_from_format_info.format_header.dumpStructure()); - auto read_step = std::make_unique( + auto read_step = std::make_unique( object_storage, configuration, getName(), @@ -170,23 +120,18 @@ void StorageObjectStorage::read( query_info, storage_snapshot, format_settings, - StorageSettings::create(local_context->getSettingsRef()), distributed_processing, std::move(read_from_format_info), getSchemaCache(local_context), need_only_count, local_context, max_block_size, - num_streams, - StorageSettings::ObjectStorageThreads(), - StorageSettings::ObjectStorageThreadsActive(), - StorageSettings::ObjectStorageThreadsScheduled()); + num_streams); query_plan.addStep(std::move(read_step)); } -template -SinkToStoragePtr StorageObjectStorage::write( +SinkToStoragePtr StorageObjectStorage::write( const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, @@ -194,7 +139,7 @@ SinkToStoragePtr StorageObjectStorage::write( { updateConfiguration(local_context); const auto sample_block = metadata_snapshot->getSampleBlock(); - const auto & query_settings = StorageSettings::create(local_context->getSettingsRef()); + const auto & settings = configuration->getQuerySettings(local_context); if (configuration->withWildcard()) { @@ -209,23 +154,22 @@ SinkToStoragePtr StorageObjectStorage::write( if (partition_by_ast) { - LOG_TEST(log, "Using PartitionedSink for {}", configuration->getPath()); return std::make_shared( - object_storage, configuration, query_settings, - format_settings, sample_block, local_context, partition_by_ast); + object_storage, configuration, format_settings, sample_block, local_context, partition_by_ast); } } if (configuration->withGlobs()) { - throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, - "{} key '{}' contains globs, so the table is in readonly mode", - getName(), configuration->getPath()); + throw Exception( + ErrorCodes::DATABASE_ACCESS_DENIED, + "{} key '{}' contains globs, so the table is in readonly mode", + getName(), configuration->getPath()); } auto & paths = configuration->getPaths(); if (auto new_key = checkAndGetNewFileOnInsertIfNeeded( - *object_storage, *configuration, query_settings, paths.front(), paths.size())) + *object_storage, *configuration, settings, paths.front(), paths.size())) { paths.push_back(*new_key); } @@ -238,9 +182,11 @@ SinkToStoragePtr StorageObjectStorage::write( local_context); } -template -void StorageObjectStorage::truncate( - const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &) +void StorageObjectStorage::truncate( + const ASTPtr &, + const StorageMetadataPtr &, + ContextPtr, + TableExclusiveLockHolder &) { if (configuration->withGlobs()) { @@ -257,34 +203,37 @@ void StorageObjectStorage::truncate( object_storage->removeObjectsIfExist(objects); } -template -std::unique_ptr StorageObjectStorage::createReadBufferIterator( +std::unique_ptr StorageObjectStorage::createReadBufferIterator( const ObjectStoragePtr & object_storage, const ConfigurationPtr & configuration, const std::optional & format_settings, ObjectInfos & read_keys, const ContextPtr & context) { - const auto settings = StorageSettings::create(context->getSettingsRef()); auto file_iterator = StorageObjectStorageSource::createFileIterator( - configuration, object_storage, settings, /* distributed_processing */false, - context, /* predicate */{}, /* virtual_columns */{}, &read_keys, - StorageSettings::ObjectStorageThreads(), StorageSettings::ObjectStorageThreadsActive(), StorageSettings::ObjectStorageThreadsScheduled()); + configuration, + object_storage, + false/* distributed_processing */, + context, + {}/* predicate */, + {}/* virtual_columns */, + &read_keys); return std::make_unique( object_storage, configuration, file_iterator, - format_settings, StorageSettings::create(context->getSettingsRef()), getSchemaCache(context), read_keys, context); + format_settings, getSchemaCache(context, configuration->getTypeName()), read_keys, context); } -template -ColumnsDescription StorageObjectStorage::getTableStructureFromData( +ColumnsDescription StorageObjectStorage::getTableStructureFromData( const ObjectStoragePtr & object_storage, const ConfigurationPtr & configuration, const std::optional & format_settings, const ContextPtr & context) { ObjectInfos read_keys; - auto read_buffer_iterator = createReadBufferIterator(object_storage, configuration, format_settings, read_keys, context); + auto read_buffer_iterator = createReadBufferIterator( + object_storage, configuration, format_settings, read_keys, context); + if (configuration->format == "auto") { auto [columns, format] = detectFormatAndReadSchema(format_settings, *read_buffer_iterator, context); @@ -297,20 +246,34 @@ ColumnsDescription StorageObjectStorage::getTableStructureFromD } } -template -void StorageObjectStorage::setFormatFromData( +void StorageObjectStorage::setFormatFromData( const ObjectStoragePtr & object_storage, const ConfigurationPtr & configuration, const std::optional & format_settings, const ContextPtr & context) { ObjectInfos read_keys; - auto read_buffer_iterator = createReadBufferIterator(object_storage, configuration, format_settings, read_keys, context); + auto read_buffer_iterator = createReadBufferIterator( + object_storage, configuration, format_settings, read_keys, context); configuration->format = detectFormatAndReadSchema(format_settings, *read_buffer_iterator, context).second; } -template class StorageObjectStorage; -template class StorageObjectStorage; -template class StorageObjectStorage; +SchemaCache & StorageObjectStorage::getSchemaCache(const ContextPtr & context) +{ + static SchemaCache schema_cache( + context->getConfigRef().getUInt( + "schema_inference_cache_max_elements_for_" + configuration->getTypeName(), + DEFAULT_SCHEMA_CACHE_ELEMENTS)); + return schema_cache; +} + +SchemaCache & StorageObjectStorage::getSchemaCache(const ContextPtr & context, const std::string & storage_type_name) +{ + static SchemaCache schema_cache( + context->getConfigRef().getUInt( + "schema_inference_cache_max_elements_for_" + storage_type_name, + DEFAULT_SCHEMA_CACHE_ELEMENTS)); + return schema_cache; +} } diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h index a2112f7ed01..3dbe010e406 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.h +++ b/src/Storages/ObjectStorage/StorageObjectStorage.h @@ -1,31 +1,22 @@ #pragma once - -#include #include #include -#include #include #include #include - namespace DB { -struct SelectQueryInfo; class StorageObjectStorageConfiguration; -struct S3StorageSettings; -struct HDFSStorageSettings; -struct AzureStorageSettings; -class PullingPipelineExecutor; -using ReadTaskCallback = std::function; -class IOutputFormat; -class IInputFormat; -class SchemaCache; class ReadBufferIterator; +class SchemaCache; - -template +/** + * A general class containing implementation for external table engines + * such as StorageS3, StorageAzure, StorageHDFS. + * Works with an object of IObjectStorage class. + */ class StorageObjectStorage : public IStorage { public: @@ -35,10 +26,26 @@ public: using ObjectInfoPtr = std::shared_ptr; using ObjectInfos = std::vector; + struct QuerySettings + { + /// Insert settings: + bool truncate_on_insert; + bool create_new_file_on_insert; + + /// Schema inference settings: + bool schema_inference_use_cache; + SchemaInferenceMode schema_inference_mode; + + /// List settings: + bool skip_empty_files; + size_t list_object_keys_size; + bool throw_on_zero_files_match; + bool ignore_non_existent_file; + }; + StorageObjectStorage( ConfigurationPtr configuration_, ObjectStoragePtr object_storage_, - const String & engine_name_, ContextPtr context_, const StorageID & table_id_, const ColumnsDescription & columns_, @@ -48,17 +55,17 @@ public: bool distributed_processing_ = false, ASTPtr partition_by_ = nullptr); - String getName() const override { return engine_name; } + String getName() const override; void read( QueryPlan & query_plan, - const Names &, - const StorageSnapshotPtr &, - SelectQueryInfo &, - ContextPtr, - QueryProcessingStage::Enum, - size_t, - size_t) override; + const Names & column_names, + const StorageSnapshotPtr & storage_snapshot, + SelectQueryInfo & query_info, + ContextPtr local_context, + QueryProcessingStage::Enum processed_stage, + size_t max_block_size, + size_t num_streams) override; SinkToStoragePtr write( const ASTPtr & query, @@ -84,7 +91,9 @@ public: bool parallelizeOutputAfterReading(ContextPtr context) const override; - static SchemaCache & getSchemaCache(const ContextPtr & context); + SchemaCache & getSchemaCache(const ContextPtr & context); + + static SchemaCache & getSchemaCache(const ContextPtr & context, const std::string & storage_type_name); static ColumnsDescription getTableStructureFromData( const ObjectStoragePtr & object_storage, @@ -108,19 +117,15 @@ protected: ObjectInfos & read_keys, const ContextPtr & context); + ConfigurationPtr configuration; + const ObjectStoragePtr object_storage; const std::string engine_name; - std::optional format_settings; + const std::optional format_settings; const ASTPtr partition_by; const bool distributed_processing; LoggerPtr log; - ObjectStoragePtr object_storage; - ConfigurationPtr configuration; std::mutex configuration_update_mutex; }; -using StorageS3 = StorageObjectStorage; -using StorageAzureBlob = StorageObjectStorage; -using StorageHDFS = StorageObjectStorage; - } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index f023bb068d4..72a35ae33eb 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -15,6 +15,7 @@ #include #include #include +#include namespace DB { @@ -24,47 +25,34 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -template -StorageObjectStorageCluster::StorageObjectStorageCluster( +StorageObjectStorageCluster::StorageObjectStorageCluster( const String & cluster_name_, - const Storage::ConfigurationPtr & configuration_, + ConfigurationPtr configuration_, ObjectStoragePtr object_storage_, - const String & engine_name_, const StorageID & table_id_, const ColumnsDescription & columns_, const ConstraintsDescription & constraints_, ContextPtr context_) - : IStorageCluster(cluster_name_, - table_id_, - getLogger(fmt::format("{}({})", engine_name_, table_id_.table_name))) - , engine_name(engine_name_) + : IStorageCluster( + cluster_name_, table_id_, getLogger(fmt::format("{}({})", configuration_->getEngineName(), table_id_.table_name))) , configuration{configuration_} , object_storage(object_storage_) { configuration->check(context_); - StorageInMemoryMetadata storage_metadata; + auto metadata = getStorageMetadata( + object_storage, configuration, columns_, constraints_, + {}/* format_settings */, ""/* comment */, context_); - if (columns_.empty()) - { - ColumnsDescription columns = Storage::getTableStructureFromData(object_storage, configuration, /*format_settings=*/std::nullopt, context_); - storage_metadata.setColumns(columns); - } - else - { - if (configuration->format == "auto") - StorageS3::setFormatFromData(object_storage, configuration, /*format_settings=*/std::nullopt, context_); - - storage_metadata.setColumns(columns_); - } - - storage_metadata.setConstraints(constraints_); - setInMemoryMetadata(storage_metadata); - - setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(storage_metadata.getColumns())); + setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(metadata.getColumns())); + setInMemoryMetadata(std::move(metadata)); } -template -void StorageObjectStorageCluster::updateQueryToSendIfNeeded( +std::string StorageObjectStorageCluster::getName() const +{ + return configuration->getEngineName(); +} + +void StorageObjectStorageCluster::updateQueryToSendIfNeeded( ASTPtr & query, const DB::StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) @@ -72,24 +60,32 @@ void StorageObjectStorageCluster::up ASTExpressionList * expression_list = extractTableFunctionArgumentsFromSelectQuery(query); if (!expression_list) { - throw Exception(ErrorCodes::LOGICAL_ERROR, - "Expected SELECT query from table function {}, got '{}'", - engine_name, queryToString(query)); + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Expected SELECT query from table function {}, got '{}'", + configuration->getEngineName(), queryToString(query)); } - TableFunction::updateStructureAndFormatArgumentsIfNeeded( - expression_list->children, - storage_snapshot->metadata->getColumns().getAll().toNamesAndTypesDescription(), - configuration->format, - context); + ASTs & args = expression_list->children; + const auto & structure = storage_snapshot->metadata->getColumns().getAll().toNamesAndTypesDescription(); + if (args.empty()) + { + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Unexpected empty list of arguments for {}Cluster table function", + configuration->getEngineName()); + } + + ASTPtr cluster_name_arg = args.front(); + args.erase(args.begin()); + configuration->addStructureAndFormatToArgs(args, structure, configuration->format, context); + args.insert(args.begin(), cluster_name_arg); } -template -RemoteQueryExecutor::Extension -StorageObjectStorageCluster::getTaskIteratorExtension( +RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension( const ActionsDAG::Node * predicate, const ContextPtr & local_context) const { - const auto settings = StorageSettings::create(local_context->getSettingsRef()); + const auto settings = configuration->getQuerySettings(local_context); auto iterator = std::make_shared( object_storage, configuration, predicate, virtual_columns, local_context, nullptr, settings.list_object_keys_size, settings.throw_on_zero_files_match, @@ -106,17 +102,4 @@ StorageObjectStorageCluster::getTask return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) }; } - -#if USE_AWS_S3 -template class StorageObjectStorageCluster; -#endif - -#if USE_AZURE_BLOB_STORAGE -template class StorageObjectStorageCluster; -#endif - -#if USE_HDFS -template class StorageObjectStorageCluster; -#endif - } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index ac894e14f24..2db8f5c352e 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -11,32 +11,25 @@ namespace DB { -class StorageS3Settings; -class StorageAzureBlobSettings; - class Context; -template class StorageObjectStorageCluster : public IStorageCluster { public: - using Storage = StorageObjectStorage; - using TableFunction = TableFunctionObjectStorageCluster; + using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr; StorageObjectStorageCluster( const String & cluster_name_, - const Storage::ConfigurationPtr & configuration_, + ConfigurationPtr configuration_, ObjectStoragePtr object_storage_, - const String & engine_name_, const StorageID & table_id_, const ColumnsDescription & columns_, const ConstraintsDescription & constraints_, ContextPtr context_); - std::string getName() const override { return engine_name; } + std::string getName() const override; - RemoteQueryExecutor::Extension - getTaskIteratorExtension( + RemoteQueryExecutor::Extension getTaskIteratorExtension( const ActionsDAG::Node * predicate, const ContextPtr & context) const override; @@ -53,20 +46,9 @@ private: const ContextPtr & context) override; const String engine_name; - const Storage::ConfigurationPtr configuration; + const StorageObjectStorage::ConfigurationPtr configuration; const ObjectStoragePtr object_storage; NamesAndTypesList virtual_columns; }; - -#if USE_AWS_S3 -using StorageS3Cluster = StorageObjectStorageCluster; -#endif -#if USE_AZURE_BLOB_STORAGE -using StorageAzureBlobCluster = StorageObjectStorageCluster; -#endif -#if USE_HDFS -using StorageHDFSCluster = StorageObjectStorageCluster; -#endif - } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h b/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h index 647575aaa90..34965174bf9 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h @@ -1,6 +1,7 @@ #pragma once #include #include +#include "StorageObjectStorage.h" #include namespace fs = std::filesystem; @@ -27,6 +28,9 @@ public: ContextPtr local_context, bool with_table_structure); + virtual std::string getTypeName() const = 0; + virtual std::string getEngineName() const = 0; + virtual Path getPath() const = 0; virtual void setPath(const Path & path) = 0; @@ -36,6 +40,9 @@ public: virtual String getDataSourceDescription() = 0; virtual String getNamespace() const = 0; + virtual StorageObjectStorage::QuerySettings getQuerySettings(const ContextPtr &) const = 0; + virtual void addStructureAndFormatToArgs( + ASTs & args, const String & structure_, const String & format_, ContextPtr context) = 0; bool withWildcard() const; bool withGlobs() const { return isPathWithGlobs() || isNamespaceWithGlobs(); } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageQuerySettings.h b/src/Storages/ObjectStorage/StorageObjectStorageQuerySettings.h deleted file mode 100644 index 606456011c3..00000000000 --- a/src/Storages/ObjectStorage/StorageObjectStorageQuerySettings.h +++ /dev/null @@ -1,102 +0,0 @@ -#pragma once -#include -#include -#include - -namespace CurrentMetrics -{ - extern const Metric ObjectStorageAzureThreads; - extern const Metric ObjectStorageAzureThreadsActive; - extern const Metric ObjectStorageAzureThreadsScheduled; - - extern const Metric ObjectStorageS3Threads; - extern const Metric ObjectStorageS3ThreadsActive; - extern const Metric ObjectStorageS3ThreadsScheduled; -} - -namespace DB -{ - -struct StorageObjectStorageSettings -{ - bool truncate_on_insert; - bool create_new_file_on_insert; - bool schema_inference_use_cache; - SchemaInferenceMode schema_inference_mode; - bool skip_empty_files; - size_t list_object_keys_size; - bool throw_on_zero_files_match; - bool ignore_non_existent_file; -}; - -struct S3StorageSettings -{ - static StorageObjectStorageSettings create(const Settings & settings) - { - return StorageObjectStorageSettings{ - .truncate_on_insert = settings.s3_truncate_on_insert, - .create_new_file_on_insert = settings.s3_create_new_file_on_insert, - .schema_inference_use_cache = settings.schema_inference_use_cache_for_s3, - .schema_inference_mode = settings.schema_inference_mode, - .skip_empty_files = settings.s3_skip_empty_files, - .list_object_keys_size = settings.s3_list_object_keys_size, - .throw_on_zero_files_match = settings.s3_throw_on_zero_files_match, - .ignore_non_existent_file = settings.s3_ignore_file_doesnt_exist, - }; - } - - static constexpr auto SCHEMA_CACHE_MAX_ELEMENTS_CONFIG_SETTING = "schema_inference_cache_max_elements_for_s3"; - - static CurrentMetrics::Metric ObjectStorageThreads() { return CurrentMetrics::ObjectStorageS3Threads; } /// NOLINT - static CurrentMetrics::Metric ObjectStorageThreadsActive() { return CurrentMetrics::ObjectStorageS3ThreadsActive; } /// NOLINT - static CurrentMetrics::Metric ObjectStorageThreadsScheduled() { return CurrentMetrics::ObjectStorageS3ThreadsScheduled; } /// NOLINT -}; - -struct AzureStorageSettings -{ - static StorageObjectStorageSettings create(const Settings & settings) - { - return StorageObjectStorageSettings{ - .truncate_on_insert = settings.azure_truncate_on_insert, - .create_new_file_on_insert = settings.azure_create_new_file_on_insert, - .schema_inference_use_cache = settings.schema_inference_use_cache_for_azure, - .schema_inference_mode = settings.schema_inference_mode, - .skip_empty_files = settings.s3_skip_empty_files, /// TODO: add setting for azure - .list_object_keys_size = settings.azure_list_object_keys_size, - .throw_on_zero_files_match = settings.s3_throw_on_zero_files_match, - .ignore_non_existent_file = settings.azure_ignore_file_doesnt_exist, - }; - } - - static constexpr auto SCHEMA_CACHE_MAX_ELEMENTS_CONFIG_SETTING = "schema_inference_cache_max_elements_for_azure"; - - static CurrentMetrics::Metric ObjectStorageThreads() { return CurrentMetrics::ObjectStorageAzureThreads; } /// NOLINT - static CurrentMetrics::Metric ObjectStorageThreadsActive() { return CurrentMetrics::ObjectStorageAzureThreadsActive; } /// NOLINT - static CurrentMetrics::Metric ObjectStorageThreadsScheduled() { return CurrentMetrics::ObjectStorageAzureThreadsScheduled; } /// NOLINT -}; - -struct HDFSStorageSettings -{ - static StorageObjectStorageSettings create(const Settings & settings) - { - return StorageObjectStorageSettings{ - .truncate_on_insert = settings.hdfs_truncate_on_insert, - .create_new_file_on_insert = settings.hdfs_create_new_file_on_insert, - .schema_inference_use_cache = settings.schema_inference_use_cache_for_hdfs, - .schema_inference_mode = settings.schema_inference_mode, - .skip_empty_files = settings.hdfs_skip_empty_files, /// TODO: add setting for hdfs - .list_object_keys_size = settings.s3_list_object_keys_size, /// TODO: add a setting for hdfs - .throw_on_zero_files_match = settings.s3_throw_on_zero_files_match, - .ignore_non_existent_file = settings.hdfs_ignore_file_doesnt_exist, - }; - } - - static constexpr auto SCHEMA_CACHE_MAX_ELEMENTS_CONFIG_SETTING = "schema_inference_cache_max_elements_for_hdfs"; - - /// TODO: s3 -> hdfs - static CurrentMetrics::Metric ObjectStorageThreads() { return CurrentMetrics::ObjectStorageS3Threads; } /// NOLINT - static CurrentMetrics::Metric ObjectStorageThreadsActive() { return CurrentMetrics::ObjectStorageS3ThreadsActive; } /// NOLINT - static CurrentMetrics::Metric ObjectStorageThreadsScheduled() { return CurrentMetrics::ObjectStorageS3ThreadsScheduled; } /// NOLINT -}; - -} diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSink.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSink.cpp index 42371764920..62367a6b933 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSink.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSink.cpp @@ -103,7 +103,6 @@ void StorageObjectStorageSink::release() PartitionedStorageObjectStorageSink::PartitionedStorageObjectStorageSink( ObjectStoragePtr object_storage_, StorageObjectStorageConfigurationPtr configuration_, - const StorageObjectStorageSettings & query_settings_, std::optional format_settings_, const Block & sample_block_, ContextPtr context_, @@ -111,7 +110,7 @@ PartitionedStorageObjectStorageSink::PartitionedStorageObjectStorageSink( : PartitionedSink(partition_by, context_, sample_block_) , object_storage(object_storage_) , configuration(configuration_) - , query_settings(query_settings_) + , query_settings(configuration_->getQuerySettings(context_)) , format_settings(format_settings_) , sample_block(sample_block_) , context(context_) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSink.h b/src/Storages/ObjectStorage/StorageObjectStorageSink.h index 38805332a35..6c2f73e40e3 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSink.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageSink.h @@ -1,7 +1,6 @@ #pragma once #include #include -#include #include #include @@ -47,7 +46,6 @@ public: PartitionedStorageObjectStorageSink( ObjectStoragePtr object_storage_, StorageObjectStorageConfigurationPtr configuration_, - const StorageObjectStorageSettings & query_settings_, std::optional format_settings_, const Block & sample_block_, ContextPtr context_, @@ -61,7 +59,7 @@ private: ObjectStoragePtr object_storage; StorageObjectStorageConfigurationPtr configuration; - const StorageObjectStorageSettings query_settings; + const StorageObjectStorage::QuerySettings query_settings; const std::optional format_settings; const Block sample_block; const ContextPtr context; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index 82824b0e7f7..3101a7ebf51 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -10,7 +10,6 @@ #include #include #include -#include #include #include @@ -20,6 +19,13 @@ namespace ProfileEvents extern const Event EngineFileLikeReadFiles; } +namespace CurrentMetrics +{ + extern const Metric StorageObjectStorageThreads; + extern const Metric StorageObjectStorageThreadsActive; + extern const Metric StorageObjectStorageThreadsScheduled; +} + namespace DB { @@ -37,16 +43,12 @@ StorageObjectStorageSource::StorageObjectStorageSource( ConfigurationPtr configuration_, const ReadFromFormatInfo & info, std::optional format_settings_, - const StorageObjectStorageSettings & query_settings_, + const StorageObjectStorage::QuerySettings & query_settings_, ContextPtr context_, UInt64 max_block_size_, std::shared_ptr file_iterator_, bool need_only_count_, - SchemaCache & schema_cache_, - std::shared_ptr reader_pool_, - CurrentMetrics::Metric metric_threads_, - CurrentMetrics::Metric metric_threads_active_, - CurrentMetrics::Metric metric_threads_scheduled_) + SchemaCache & schema_cache_) : SourceWithKeyCondition(info.source_header, false) , WithContext(context_) , name(std::move(name_)) @@ -57,13 +59,14 @@ StorageObjectStorageSource::StorageObjectStorageSource( , max_block_size(max_block_size_) , need_only_count(need_only_count_) , read_from_format_info(info) - , create_reader_pool(reader_pool_) + , create_reader_pool(std::make_shared( + CurrentMetrics::StorageObjectStorageThreads, + CurrentMetrics::StorageObjectStorageThreadsActive, + CurrentMetrics::StorageObjectStorageThreadsScheduled, + 1/* max_threads */)) , columns_desc(info.columns_description) , file_iterator(file_iterator_) , schema_cache(schema_cache_) - , metric_threads(metric_threads_) - , metric_threads_active(metric_threads_active_) - , metric_threads_scheduled(metric_threads_scheduled_) , create_reader_scheduler(threadPoolCallbackRunnerUnsafe(*create_reader_pool, "Reader")) { } @@ -76,26 +79,23 @@ StorageObjectStorageSource::~StorageObjectStorageSource() std::shared_ptr StorageObjectStorageSource::createFileIterator( ConfigurationPtr configuration, ObjectStoragePtr object_storage, - const StorageObjectStorageSettings & settings, bool distributed_processing, const ContextPtr & local_context, const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, ObjectInfos * read_keys, - CurrentMetrics::Metric metric_threads_, - CurrentMetrics::Metric metric_threads_active_, - CurrentMetrics::Metric metric_threads_scheduled_, std::function file_progress_callback) { if (distributed_processing) return std::make_shared( local_context->getReadTaskCallback(), - local_context->getSettingsRef().max_threads, - metric_threads_, metric_threads_active_, metric_threads_scheduled_); + local_context->getSettingsRef().max_threads); if (configuration->isNamespaceWithGlobs()) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expression can not have wildcards inside namespace name"); + auto settings = configuration->getQuerySettings(local_context); + if (configuration->isPathWithGlobs()) { /// Iterate through disclosed globs and make a source for each file @@ -568,7 +568,8 @@ StorageObjectStorageSource::ReaderHolder::ReaderHolder( { } -StorageObjectStorageSource::ReaderHolder & StorageObjectStorageSource::ReaderHolder::operator=(ReaderHolder && other) noexcept +StorageObjectStorageSource::ReaderHolder & +StorageObjectStorageSource::ReaderHolder::operator=(ReaderHolder && other) noexcept { /// The order of destruction is important. /// reader uses pipeline, pipeline uses read_buf. @@ -581,15 +582,15 @@ StorageObjectStorageSource::ReaderHolder & StorageObjectStorageSource::ReaderHol } StorageObjectStorageSource::ReadTaskIterator::ReadTaskIterator( - const ReadTaskCallback & callback_, - size_t max_threads_count, - CurrentMetrics::Metric metric_threads_, - CurrentMetrics::Metric metric_threads_active_, - CurrentMetrics::Metric metric_threads_scheduled_) + const ReadTaskCallback & callback_, size_t max_threads_count) : IIterator("ReadTaskIterator") , callback(callback_) { - ThreadPool pool(metric_threads_, metric_threads_active_, metric_threads_scheduled_, max_threads_count); + ThreadPool pool( + CurrentMetrics::StorageObjectStorageThreads, + CurrentMetrics::StorageObjectStorageThreadsActive, + CurrentMetrics::StorageObjectStorageThreadsScheduled, max_threads_count); + auto pool_scheduler = threadPoolCallbackRunnerUnsafe(pool, "ReadTaskIter"); std::vector> keys; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.h b/src/Storages/ObjectStorage/StorageObjectStorageSource.h index f75bfc390bb..3c2cc3f80cd 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.h @@ -3,7 +3,6 @@ #include #include #include -#include #include #include @@ -28,16 +27,12 @@ public: ConfigurationPtr configuration, const ReadFromFormatInfo & info, std::optional format_settings_, - const StorageObjectStorageSettings & query_settings_, + const StorageObjectStorage::QuerySettings & query_settings_, ContextPtr context_, UInt64 max_block_size_, std::shared_ptr file_iterator_, bool need_only_count_, - SchemaCache & schema_cache_, - std::shared_ptr reader_pool_, - CurrentMetrics::Metric metric_threads_, - CurrentMetrics::Metric metric_threads_active_, - CurrentMetrics::Metric metric_threads_scheduled_); + SchemaCache & schema_cache_); ~StorageObjectStorageSource() override; @@ -53,15 +48,11 @@ public: static std::shared_ptr createFileIterator( ConfigurationPtr configuration, ObjectStoragePtr object_storage, - const StorageObjectStorageSettings & settings, bool distributed_processing, const ContextPtr & local_context, const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, ObjectInfos * read_keys, - CurrentMetrics::Metric metric_threads_, - CurrentMetrics::Metric metric_threads_active_, - CurrentMetrics::Metric metric_threads_scheduled_, std::function file_progress_callback = {}); protected: @@ -69,7 +60,7 @@ protected: ObjectStoragePtr object_storage; const ConfigurationPtr configuration; const std::optional format_settings; - const StorageObjectStorageSettings query_settings; + const StorageObjectStorage::QuerySettings query_settings; const UInt64 max_block_size; const bool need_only_count; const ReadFromFormatInfo read_from_format_info; @@ -79,10 +70,6 @@ protected: SchemaCache & schema_cache; bool initialized = false; - const CurrentMetrics::Metric metric_threads; - const CurrentMetrics::Metric metric_threads_active; - const CurrentMetrics::Metric metric_threads_scheduled; - size_t total_rows_in_file = 0; LoggerPtr log = getLogger("StorageObjectStorageSource"); @@ -149,12 +136,7 @@ protected: class StorageObjectStorageSource::ReadTaskIterator : public IIterator { public: - ReadTaskIterator( - const ReadTaskCallback & callback_, - size_t max_threads_count, - CurrentMetrics::Metric metric_threads_, - CurrentMetrics::Metric metric_threads_active_, - CurrentMetrics::Metric metric_threads_scheduled_); + ReadTaskIterator(const ReadTaskCallback & callback_, size_t max_threads_count); size_t estimatedKeysCount() override { return buffer.size(); } diff --git a/src/Storages/ObjectStorage/Utils.cpp b/src/Storages/ObjectStorage/Utils.cpp index 9caab709081..94d6dadee3b 100644 --- a/src/Storages/ObjectStorage/Utils.cpp +++ b/src/Storages/ObjectStorage/Utils.cpp @@ -1,8 +1,6 @@ #include #include #include -#include - namespace DB { @@ -15,15 +13,15 @@ namespace ErrorCodes std::optional checkAndGetNewFileOnInsertIfNeeded( const IObjectStorage & object_storage, const StorageObjectStorageConfiguration & configuration, - const StorageObjectStorageSettings & query_settings, + const StorageObjectStorage::QuerySettings & settings, const String & key, size_t sequence_number) { - if (query_settings.truncate_on_insert + if (settings.truncate_on_insert || !object_storage.exists(StoredObject(key))) return std::nullopt; - if (query_settings.create_new_file_on_insert) + if (settings.create_new_file_on_insert) { auto pos = key.find_first_of('.'); String new_key; @@ -45,4 +43,38 @@ std::optional checkAndGetNewFileOnInsertIfNeeded( configuration.getNamespace(), key); } +StorageInMemoryMetadata getStorageMetadata( + ObjectStoragePtr object_storage, + const StorageObjectStorageConfigurationPtr & configuration, + const ColumnsDescription & columns, + const ConstraintsDescription & constraints, + std::optional format_settings, + const String & comment, + const ContextPtr & context) +{ + StorageInMemoryMetadata storage_metadata; + if (columns.empty()) + { + auto fetched_columns = StorageObjectStorage::getTableStructureFromData(object_storage, configuration, format_settings, context); + storage_metadata.setColumns(fetched_columns); + } + else if (!columns.hasOnlyOrdinary()) + { + /// We don't allow special columns. + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Special columns are not supported for {} storage" + "like MATERIALIZED, ALIAS or EPHEMERAL", configuration->getTypeName()); + } + else + { + if (configuration->format == "auto") + StorageObjectStorage::setFormatFromData(object_storage, configuration, format_settings, context); + + storage_metadata.setColumns(columns); + } + storage_metadata.setConstraints(constraints); + storage_metadata.setComment(comment); + return storage_metadata; +} + } diff --git a/src/Storages/ObjectStorage/Utils.h b/src/Storages/ObjectStorage/Utils.h index afc0f31a33f..37bd49a77c0 100644 --- a/src/Storages/ObjectStorage/Utils.h +++ b/src/Storages/ObjectStorage/Utils.h @@ -1,18 +1,30 @@ #pragma once #include +#include "StorageObjectStorage.h" namespace DB { class IObjectStorage; class StorageObjectStorageConfiguration; +using StorageObjectStorageConfigurationPtr = std::shared_ptr; struct StorageObjectStorageSettings; std::optional checkAndGetNewFileOnInsertIfNeeded( const IObjectStorage & object_storage, const StorageObjectStorageConfiguration & configuration, - const StorageObjectStorageSettings & query_settings, + const StorageObjectStorage::QuerySettings & settings, const std::string & key, size_t sequence_number); + +StorageInMemoryMetadata getStorageMetadata( + ObjectStoragePtr object_storage, + const StorageObjectStorageConfigurationPtr & configuration, + const ColumnsDescription & columns, + const ConstraintsDescription & constraints, + std::optional format_settings, + const String & comment, + const ContextPtr & context); + } diff --git a/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp b/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp index 3271b766f68..06b8aefb716 100644 --- a/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp @@ -2,22 +2,23 @@ #include #include #include +#include #include #include namespace DB { +#if USE_AWS_S3 || USE_AZURE_BLOB_STORAGE || USE_HDFS + namespace ErrorCodes { extern const int BAD_ARGUMENTS; } -template -static std::shared_ptr> createStorageObjectStorage( +static std::shared_ptr createStorageObjectStorage( const StorageFactory::Arguments & args, - typename StorageObjectStorage::ConfigurationPtr configuration, - const String & engine_name, + typename StorageObjectStorage::ConfigurationPtr configuration, ContextPtr context) { auto & engine_args = args.engine_args; @@ -54,10 +55,9 @@ static std::shared_ptr> createStorageObjec if (args.storage_def->partition_by) partition_by = args.storage_def->partition_by->clone(); - return std::make_shared>( + return std::make_shared( configuration, configuration->createObjectStorage(context), - engine_name, args.getContext(), args.table_id, args.columns, @@ -68,6 +68,8 @@ static std::shared_ptr> createStorageObjec partition_by); } +#endif + #if USE_AZURE_BLOB_STORAGE void registerStorageAzure(StorageFactory & factory) { @@ -76,7 +78,7 @@ void registerStorageAzure(StorageFactory & factory) auto context = args.getLocalContext(); auto configuration = std::make_shared(); StorageObjectStorageConfiguration::initialize(*configuration, args.engine_args, context, false); - return createStorageObjectStorage(args, configuration, "Azure", context); + return createStorageObjectStorage(args, configuration, context); }, { .supports_settings = true, @@ -95,7 +97,7 @@ void registerStorageS3Impl(const String & name, StorageFactory & factory) auto context = args.getLocalContext(); auto configuration = std::make_shared(); StorageObjectStorageConfiguration::initialize(*configuration, args.engine_args, context, false); - return createStorageObjectStorage(args, configuration, name, context); + return createStorageObjectStorage(args, configuration, context); }, { .supports_settings = true, @@ -130,7 +132,7 @@ void registerStorageHDFS(StorageFactory & factory) auto context = args.getLocalContext(); auto configuration = std::make_shared(); StorageObjectStorageConfiguration::initialize(*configuration, args.engine_args, context, false); - return createStorageObjectStorage(args, configuration, "HDFS", context); + return createStorageObjectStorage(args, configuration, context); }, { .supports_settings = true, diff --git a/src/Storages/S3Queue/S3QueueSource.h b/src/Storages/S3Queue/S3QueueSource.h index c1b45108b36..5a1f0f6dd04 100644 --- a/src/Storages/S3Queue/S3QueueSource.h +++ b/src/Storages/S3Queue/S3QueueSource.h @@ -7,7 +7,6 @@ #include #include #include -#include #include @@ -21,7 +20,7 @@ struct ObjectMetadata; class StorageS3QueueSource : public ISource, WithContext { public: - using Storage = StorageObjectStorage; + using Storage = StorageObjectStorage; using ConfigurationPtr = Storage::ConfigurationPtr; using GlobIterator = StorageObjectStorageSource::GlobIterator; diff --git a/src/Storages/S3Queue/StorageS3Queue.cpp b/src/Storages/S3Queue/StorageS3Queue.cpp index c5799d23abd..6b504b0d986 100644 --- a/src/Storages/S3Queue/StorageS3Queue.cpp +++ b/src/Storages/S3Queue/StorageS3Queue.cpp @@ -37,13 +37,6 @@ namespace ProfileEvents extern const Event S3ListObjects; } -namespace CurrentMetrics -{ - extern const Metric ObjectStorageS3Threads; - extern const Metric ObjectStorageS3ThreadsActive; - extern const Metric ObjectStorageS3ThreadsScheduled; -} - namespace DB { @@ -151,14 +144,14 @@ StorageS3Queue::StorageS3Queue( StorageInMemoryMetadata storage_metadata; if (columns_.empty()) { - auto columns = Storage::getTableStructureFromData(object_storage, configuration, format_settings, context_); + auto columns = StorageObjectStorage::getTableStructureFromData(object_storage, configuration, format_settings, context_); storage_metadata.setColumns(columns); } else { if (configuration->format == "auto") { - StorageObjectStorage::setFormatFromData(object_storage, configuration, format_settings, context_); + StorageObjectStorage::setFormatFromData(object_storage, configuration, format_settings, context_); } storage_metadata.setColumns(columns_); } @@ -370,26 +363,18 @@ std::shared_ptr StorageS3Queue::createSource( size_t max_block_size, ContextPtr local_context) { - auto threadpool = std::make_shared(CurrentMetrics::ObjectStorageS3Threads, - CurrentMetrics::ObjectStorageS3ThreadsActive, - CurrentMetrics::ObjectStorageS3ThreadsScheduled, - /* max_threads */1); auto internal_source = std::make_unique( getName(), object_storage, configuration, info, format_settings, - S3StorageSettings::create(local_context->getSettingsRef()), + configuration->getQuerySettings(local_context), local_context, max_block_size, file_iterator, false, - Storage::getSchemaCache(local_context), - threadpool, - CurrentMetrics::ObjectStorageS3Threads, - CurrentMetrics::ObjectStorageS3ThreadsActive, - CurrentMetrics::ObjectStorageS3ThreadsScheduled); + StorageObjectStorage::getSchemaCache(local_context, configuration->getTypeName())); auto file_deleter = [=, this](const std::string & path) mutable { @@ -596,7 +581,7 @@ void StorageS3Queue::checkTableStructure(const String & zookeeper_prefix, const std::shared_ptr StorageS3Queue::createFileIterator(ContextPtr local_context, const ActionsDAG::Node * predicate) { - auto settings = S3StorageSettings::create(local_context->getSettingsRef()); + auto settings = configuration->getQuerySettings(local_context); auto glob_iterator = std::make_unique( object_storage, configuration, predicate, getVirtualsList(), local_context, nullptr, settings.list_object_keys_size, settings.throw_on_zero_files_match); diff --git a/src/Storages/S3Queue/StorageS3Queue.h b/src/Storages/S3Queue/StorageS3Queue.h index 72c41a6a694..1464e15ebf2 100644 --- a/src/Storages/S3Queue/StorageS3Queue.h +++ b/src/Storages/S3Queue/StorageS3Queue.h @@ -21,8 +21,7 @@ class S3QueueFilesMetadata; class StorageS3Queue : public IStorage, WithContext { public: - using Storage = StorageObjectStorage; - using ConfigurationPtr = Storage::ConfigurationPtr; + using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr; StorageS3Queue( std::unique_ptr s3queue_settings_, diff --git a/src/Storages/System/StorageSystemSchemaInferenceCache.cpp b/src/Storages/System/StorageSystemSchemaInferenceCache.cpp index 9ef64f2b90d..a2d3f342a63 100644 --- a/src/Storages/System/StorageSystemSchemaInferenceCache.cpp +++ b/src/Storages/System/StorageSystemSchemaInferenceCache.cpp @@ -9,6 +9,9 @@ #include #include #include +#include +#include +#include namespace DB { @@ -74,14 +77,14 @@ void StorageSystemSchemaInferenceCache::fillData(MutableColumns & res_columns, C { fillDataImpl(res_columns, StorageFile::getSchemaCache(context), "File"); #if USE_AWS_S3 - fillDataImpl(res_columns, StorageS3::getSchemaCache(context), "S3"); + fillDataImpl(res_columns, StorageObjectStorage::getSchemaCache(context, StorageS3Configuration::type_name), "S3"); #endif #if USE_HDFS - fillDataImpl(res_columns, StorageHDFS::getSchemaCache(context), "HDFS"); + fillDataImpl(res_columns, StorageObjectStorage::getSchemaCache(context, StorageHDFSConfiguration::type_name), "HDFS"); #endif fillDataImpl(res_columns, StorageURL::getSchemaCache(context), "URL"); #if USE_AZURE_BLOB_STORAGE - fillDataImpl(res_columns, StorageAzureBlob::getSchemaCache(context), "Azure"); /// FIXME + fillDataImpl(res_columns, StorageObjectStorage::getSchemaCache(context, StorageAzureBlobConfiguration::type_name), "Azure"); #endif } diff --git a/src/TableFunctions/ITableFunctionDataLake.h b/src/TableFunctions/ITableFunctionDataLake.h index 8cbd855bb96..02c8c623e61 100644 --- a/src/TableFunctions/ITableFunctionDataLake.h +++ b/src/TableFunctions/ITableFunctionDataLake.h @@ -39,7 +39,7 @@ protected: columns = cached_columns; StoragePtr storage = Storage::create( - configuration, context, "", StorageID(TableFunction::getDatabaseName(), table_name), + configuration, context, StorageID(TableFunction::getDatabaseName(), table_name), columns, ConstraintsDescription{}, String{}, std::nullopt, LoadingStrictnessLevel::CREATE); storage->startup(); diff --git a/src/TableFunctions/TableFunctionObjectStorage.cpp b/src/TableFunctions/TableFunctionObjectStorage.cpp index 9223642a7e6..2b5c774ff78 100644 --- a/src/TableFunctions/TableFunctionObjectStorage.cpp +++ b/src/TableFunctions/TableFunctionObjectStorage.cpp @@ -27,27 +27,27 @@ namespace ErrorCodes extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; } -template +template ObjectStoragePtr TableFunctionObjectStorage< - Definition, StorageSettings, Configuration>::getObjectStorage(const ContextPtr & context, bool create_readonly) const + Definition, Configuration>::getObjectStorage(const ContextPtr & context, bool create_readonly) const { if (!object_storage) object_storage = configuration->createObjectStorage(context, create_readonly); return object_storage; } -template +template StorageObjectStorageConfigurationPtr TableFunctionObjectStorage< - Definition, StorageSettings, Configuration>::getConfiguration() const + Definition, Configuration>::getConfiguration() const { if (!configuration) configuration = std::make_shared(); return configuration; } -template +template std::vector TableFunctionObjectStorage< - Definition, StorageSettings, Configuration>::skipAnalysisForArguments(const QueryTreeNodePtr & query_node_table_function, ContextPtr) const + Definition, Configuration>::skipAnalysisForArguments(const QueryTreeNodePtr & query_node_table_function, ContextPtr) const { auto & table_function_node = query_node_table_function->as(); auto & table_function_arguments_nodes = table_function_node.getArguments().getNodes(); @@ -63,22 +63,21 @@ std::vector TableFunctionObjectStorage< return result; } -template -void TableFunctionObjectStorage::updateStructureAndFormatArgumentsIfNeeded( +template +void TableFunctionObjectStorage::updateStructureAndFormatArgumentsIfNeeded( ASTs & args, const String & structure, const String & format, const ContextPtr & context) { - Configuration::addStructureAndFormatToArgs(args, structure, format, context); + Configuration().addStructureAndFormatToArgs(args, structure, format, context); } -template -void TableFunctionObjectStorage< - Definition, StorageSettings, Configuration>::parseArgumentsImpl(ASTs & engine_args, const ContextPtr & local_context) +template +void TableFunctionObjectStorage::parseArgumentsImpl(ASTs & engine_args, const ContextPtr & local_context) { StorageObjectStorageConfiguration::initialize(*getConfiguration(), engine_args, local_context, true); } -template -void TableFunctionObjectStorage::parseArguments(const ASTPtr & ast_function, ContextPtr context) +template +void TableFunctionObjectStorage::parseArguments(const ASTPtr & ast_function, ContextPtr context) { /// Clone ast function, because we can modify its arguments like removing headers. auto ast_copy = ast_function->clone(); @@ -90,38 +89,38 @@ void TableFunctionObjectStorage::par parseArgumentsImpl(args, context); } -template +template ColumnsDescription TableFunctionObjectStorage< - Definition, StorageSettings, Configuration>::getActualTableStructure(ContextPtr context, bool is_insert_query) const + Definition, Configuration>::getActualTableStructure(ContextPtr context, bool is_insert_query) const { chassert(configuration); if (configuration->structure == "auto") { context->checkAccess(getSourceAccessType()); auto storage = getObjectStorage(context, !is_insert_query); - return StorageObjectStorage::getTableStructureFromData(storage, configuration, std::nullopt, context); + return StorageObjectStorage::getTableStructureFromData(storage, configuration, std::nullopt, context); } return parseColumnsListFromString(configuration->structure, context); } -template +template bool TableFunctionObjectStorage< - Definition, StorageSettings, Configuration>::supportsReadingSubsetOfColumns(const ContextPtr & context) + Definition, Configuration>::supportsReadingSubsetOfColumns(const ContextPtr & context) { chassert(configuration); return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration->format, context); } -template +template std::unordered_set TableFunctionObjectStorage< - Definition, StorageSettings, Configuration>::getVirtualsToCheckBeforeUsingStructureHint() const + Definition, Configuration>::getVirtualsToCheckBeforeUsingStructureHint() const { return VirtualColumnUtils::getVirtualNamesForFileLikeStorage(); } -template -StoragePtr TableFunctionObjectStorage::executeImpl( +template +StoragePtr TableFunctionObjectStorage::executeImpl( const ASTPtr & /* ast_function */, ContextPtr context, const std::string & table_name, @@ -137,10 +136,9 @@ StoragePtr TableFunctionObjectStorage>( + StoragePtr storage = std::make_shared( configuration, getObjectStorage(context, !is_insert_query), - Definition::storage_type_name, context, StorageID(getDatabaseName(), table_name), columns, @@ -159,7 +157,7 @@ void registerTableFunctionObjectStorage(TableFunctionFactory & factory) { UNUSED(factory); #if USE_AWS_S3 - factory.registerFunction>( + factory.registerFunction>( { .documentation = { @@ -170,7 +168,7 @@ void registerTableFunctionObjectStorage(TableFunctionFactory & factory) .allow_readonly = false }); - factory.registerFunction>( + factory.registerFunction>( { .documentation = { @@ -181,7 +179,7 @@ void registerTableFunctionObjectStorage(TableFunctionFactory & factory) .allow_readonly = false }); - factory.registerFunction>( + factory.registerFunction>( { .documentation = { @@ -191,7 +189,7 @@ void registerTableFunctionObjectStorage(TableFunctionFactory & factory) .categories{"DataLake"}}, .allow_readonly = false }); - factory.registerFunction>( + factory.registerFunction>( { .documentation = { @@ -204,7 +202,7 @@ void registerTableFunctionObjectStorage(TableFunctionFactory & factory) #endif #if USE_AZURE_BLOB_STORAGE - factory.registerFunction>( + factory.registerFunction>( { .documentation = { @@ -220,7 +218,7 @@ void registerTableFunctionObjectStorage(TableFunctionFactory & factory) }); #endif #if USE_HDFS - factory.registerFunction>( + factory.registerFunction>( { .allow_readonly = false }); @@ -228,21 +226,21 @@ void registerTableFunctionObjectStorage(TableFunctionFactory & factory) } #if USE_AZURE_BLOB_STORAGE -template class TableFunctionObjectStorage; -template class TableFunctionObjectStorage; +template class TableFunctionObjectStorage; +template class TableFunctionObjectStorage; #endif #if USE_AWS_S3 -template class TableFunctionObjectStorage; -template class TableFunctionObjectStorage; -template class TableFunctionObjectStorage; -template class TableFunctionObjectStorage; -template class TableFunctionObjectStorage; +template class TableFunctionObjectStorage; +template class TableFunctionObjectStorage; +template class TableFunctionObjectStorage; +template class TableFunctionObjectStorage; +template class TableFunctionObjectStorage; #endif #if USE_HDFS -template class TableFunctionObjectStorage; -template class TableFunctionObjectStorage; +template class TableFunctionObjectStorage; +template class TableFunctionObjectStorage; #endif } diff --git a/src/TableFunctions/TableFunctionObjectStorage.h b/src/TableFunctions/TableFunctionObjectStorage.h index 9022f6e577f..bd43cae3697 100644 --- a/src/TableFunctions/TableFunctionObjectStorage.h +++ b/src/TableFunctions/TableFunctionObjectStorage.h @@ -85,7 +85,7 @@ struct HDFSDefinition " - uri, format, structure, compression_method\n"; }; -template +template class TableFunctionObjectStorage : public ITableFunction { public: @@ -142,14 +142,14 @@ protected: }; #if USE_AWS_S3 -using TableFunctionS3 = TableFunctionObjectStorage; +using TableFunctionS3 = TableFunctionObjectStorage; #endif #if USE_AZURE_BLOB_STORAGE -using TableFunctionAzureBlob = TableFunctionObjectStorage; +using TableFunctionAzureBlob = TableFunctionObjectStorage; #endif #if USE_HDFS -using TableFunctionHDFS = TableFunctionObjectStorage; +using TableFunctionHDFS = TableFunctionObjectStorage; #endif } diff --git a/src/TableFunctions/TableFunctionObjectStorageCluster.cpp b/src/TableFunctions/TableFunctionObjectStorageCluster.cpp index 909ace788eb..ce78076dd21 100644 --- a/src/TableFunctions/TableFunctionObjectStorageCluster.cpp +++ b/src/TableFunctions/TableFunctionObjectStorageCluster.cpp @@ -14,8 +14,8 @@ namespace DB { -template -StoragePtr TableFunctionObjectStorageCluster::executeImpl( +template +StoragePtr TableFunctionObjectStorageCluster::executeImpl( const ASTPtr & /*function*/, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const { @@ -34,10 +34,9 @@ StoragePtr TableFunctionObjectStorageClustergetClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY) { /// On worker node this filename won't contains globs - storage = std::make_shared>( + storage = std::make_shared( configuration, object_storage, - Definition::storage_type_name, context, StorageID(Base::getDatabaseName(), table_name), columns, @@ -49,11 +48,10 @@ StoragePtr TableFunctionObjectStorageCluster>( + storage = std::make_shared( ITableFunctionCluster::cluster_name, configuration, object_storage, - Definition::storage_type_name, StorageID(Base::getDatabaseName(), table_name), columns, ConstraintsDescription{}, @@ -107,14 +105,14 @@ void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory) } #if USE_AWS_S3 -template class TableFunctionObjectStorageCluster; +template class TableFunctionObjectStorageCluster; #endif #if USE_AZURE_BLOB_STORAGE -template class TableFunctionObjectStorageCluster; +template class TableFunctionObjectStorageCluster; #endif #if USE_HDFS -template class TableFunctionObjectStorageCluster; +template class TableFunctionObjectStorageCluster; #endif } diff --git a/src/TableFunctions/TableFunctionObjectStorageCluster.h b/src/TableFunctions/TableFunctionObjectStorageCluster.h index 21c2f8995dc..a8bc11b5e40 100644 --- a/src/TableFunctions/TableFunctionObjectStorageCluster.h +++ b/src/TableFunctions/TableFunctionObjectStorageCluster.h @@ -56,8 +56,8 @@ struct HDFSClusterDefinition " - cluster_name, uri, format, structure, compression_method\n"; }; -template -class TableFunctionObjectStorageCluster : public ITableFunctionCluster> +template +class TableFunctionObjectStorageCluster : public ITableFunctionCluster> { public: static constexpr auto name = Definition::name; @@ -67,7 +67,7 @@ public: String getSignature() const override { return signature; } protected: - using Base = TableFunctionObjectStorage; + using Base = TableFunctionObjectStorage; StoragePtr executeImpl( const ASTPtr & ast_function, @@ -86,14 +86,14 @@ protected: }; #if USE_AWS_S3 -using TableFunctionS3Cluster = TableFunctionObjectStorageCluster; +using TableFunctionS3Cluster = TableFunctionObjectStorageCluster; #endif #if USE_AZURE_BLOB_STORAGE -using TableFunctionAzureBlobCluster = TableFunctionObjectStorageCluster; +using TableFunctionAzureBlobCluster = TableFunctionObjectStorageCluster; #endif #if USE_HDFS -using TableFunctionHDFSCluster = TableFunctionObjectStorageCluster; +using TableFunctionHDFSCluster = TableFunctionObjectStorageCluster; #endif }