kssenii 2024-04-20 17:27:54 +01:00
parent a4daf2b454
commit 399414bb40
37 changed files with 427 additions and 536 deletions

View File

@ -168,6 +168,9 @@
M(ObjectStorageS3Threads, "Number of threads in the S3ObjectStorage thread pool.") \
M(ObjectStorageS3ThreadsActive, "Number of threads in the S3ObjectStorage thread pool running a task.") \
M(ObjectStorageS3ThreadsScheduled, "Number of queued or active jobs in the S3ObjectStorage thread pool.") \
M(StorageObjectStorageThreads, "Number of threads in the remote table engines thread pool.") \
M(StorageObjectStorageThreadsActive, "Number of threads in the remote table engines thread pool running a task.") \
M(StorageObjectStorageThreadsScheduled, "Number of queued or active jobs in the remote table engines thread pool.") \
M(ObjectStorageAzureThreads, "Number of threads in the AzureObjectStorage thread pool.") \
M(ObjectStorageAzureThreadsActive, "Number of threads in the AzureObjectStorage thread pool running a task.") \
M(ObjectStorageAzureThreadsScheduled, "Number of queued or active jobs in the AzureObjectStorage thread pool.") \
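These three new metrics back a single shared thread pool used by all object-storage table engines, replacing the per-engine metric triplets. A minimal sketch of wiring a pool to them, mirroring the ThreadPool constructor calls later in this diff (the pool size is an arbitrary example):

#include <Common/ThreadPool.h>
#include <Common/CurrentMetrics.h>

namespace CurrentMetrics
{
    extern const Metric StorageObjectStorageThreads;
    extern const Metric StorageObjectStorageThreadsActive;
    extern const Metric StorageObjectStorageThreadsScheduled;
}

/// Threads in this pool are counted under the metrics declared above.
ThreadPool pool(
    CurrentMetrics::StorageObjectStorageThreads,
    CurrentMetrics::StorageObjectStorageThreadsActive,
    CurrentMetrics::StorageObjectStorageThreadsScheduled,
    /* max_threads */ 16);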

View File

@ -53,6 +53,9 @@
#include <Storages/StorageFile.h>
#include <Storages/StorageURL.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/ObjectStorage/S3/Configuration.h>
#include <Storages/ObjectStorage/HDFS/Configuration.h>
#include <Storages/ObjectStorage/AzureBlob/Configuration.h>
#include <Storages/MaterializedView/RefreshTask.h>
#include <Storages/System/StorageSystemFilesystemCache.h>
#include <Parsers/ASTSystemQuery.h>
@ -489,17 +492,17 @@ BlockIO InterpreterSystemQuery::execute()
StorageFile::getSchemaCache(getContext()).clear();
#if USE_AWS_S3
if (caches_to_drop.contains("S3"))
StorageS3::getSchemaCache(getContext()).clear();
StorageObjectStorage::getSchemaCache(getContext(), StorageS3Configuration::type_name).clear();
#endif
#if USE_HDFS
if (caches_to_drop.contains("HDFS"))
StorageHDFS::getSchemaCache(getContext()).clear();
StorageObjectStorage::getSchemaCache(getContext(), StorageHDFSConfiguration::type_name).clear();
#endif
if (caches_to_drop.contains("URL"))
StorageURL::getSchemaCache(getContext()).clear();
#if USE_AZURE_BLOB_STORAGE
if (caches_to_drop.contains("AZURE"))
StorageAzureBlob::getSchemaCache(getContext()).clear();
StorageObjectStorage::getSchemaCache(getContext(), StorageAzureBlobConfiguration::type_name).clear();
#endif
break;
}
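Each branch above is reached by the corresponding SYSTEM DROP SCHEMA CACHE FOR S3/HDFS/URL/AZURE query. After this refactoring the shared cache is keyed by the configuration's type_name instead of dispatching to a per-engine storage class; a hypothetical helper (clearSchemaCacheFor is illustrative, not part of the commit) makes the new shape explicit:

/// Illustrative only: every engine now funnels through one static accessor,
/// keyed by a type name such as "s3", "hdfs" or "azure".
static void clearSchemaCacheFor(const ContextPtr & context, const std::string & type_name)
{
    StorageObjectStorage::getSchemaCache(context, type_name).clear();
}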

View File

@ -101,6 +101,21 @@ AzureObjectStorage::SettingsPtr StorageAzureBlobConfiguration::createSettings(Co
return settings_ptr;
}
StorageObjectStorage::QuerySettings StorageAzureBlobConfiguration::getQuerySettings(const ContextPtr & context) const
{
const auto & settings = context->getSettingsRef();
return StorageObjectStorage::QuerySettings{
.truncate_on_insert = settings.azure_truncate_on_insert,
.create_new_file_on_insert = settings.azure_create_new_file_on_insert,
.schema_inference_use_cache = settings.schema_inference_use_cache_for_azure,
.schema_inference_mode = settings.schema_inference_mode,
.skip_empty_files = settings.s3_skip_empty_files, /// TODO: add setting for azure
.list_object_keys_size = settings.azure_list_object_keys_size,
.throw_on_zero_files_match = settings.s3_throw_on_zero_files_match,
.ignore_non_existent_file = settings.azure_ignore_file_doesnt_exist,
};
}
ObjectStoragePtr StorageAzureBlobConfiguration::createObjectStorage(ContextPtr context, bool is_readonly) /// NOLINT
{
assertInitialized();

View File

@ -18,9 +18,15 @@ class StorageAzureBlobConfiguration : public StorageObjectStorageConfiguration
friend void registerBackupEngineAzureBlobStorage(BackupFactory & factory);
public:
static constexpr auto type_name = "azure";
static constexpr auto engine_name = "Azure";
StorageAzureBlobConfiguration() = default;
StorageAzureBlobConfiguration(const StorageAzureBlobConfiguration & other);
std::string getTypeName() const override { return type_name; }
std::string getEngineName() const override { return engine_name; }
Path getPath() const override { return blob_path; }
void setPath(const Path & path) override { blob_path = path; }
@ -30,6 +36,7 @@ public:
String getDataSourceDescription() override { return fs::path(connection_url) / container; }
String getNamespace() const override { return container; }
StorageObjectStorage::QuerySettings getQuerySettings(const ContextPtr &) const override;
void check(ContextPtr context) const override;
ObjectStoragePtr createObjectStorage(ContextPtr context, bool is_readonly = true) override; /// NOLINT
@ -37,8 +44,8 @@ public:
void fromNamedCollection(const NamedCollection & collection) override;
void fromAST(ASTs & args, ContextPtr context, bool with_structure) override;
static void addStructureAndFormatToArgs(
ASTs & args, const String & structure_, const String & format_, ContextPtr context);
void addStructureAndFormatToArgs(
ASTs & args, const String & structure_, const String & format_, ContextPtr context) override;
protected:
using AzureClient = Azure::Storage::Blobs::BlobContainerClient;

View File

@ -184,7 +184,7 @@ struct DeltaLakeMetadata::Impl
*
* We need to get "version", which is the version of the checkpoint we need to read.
*/
size_t readLastCheckpointIfExists()
size_t readLastCheckpointIfExists() const
{
const auto last_checkpoint_file = fs::path(configuration->getPath()) / deltalake_metadata_directory / "_last_checkpoint";
if (!object_storage->exists(StoredObject(last_checkpoint_file)))
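For context: in the Delta Lake protocol, _last_checkpoint is a small single-line JSON file whose "version" field identifies the checkpoint to read, e.g. {"version": 20, "size": 44} (values illustrative); the existence check above lets the method return early when no checkpoint has been written yet.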

View File

@ -21,17 +21,16 @@ namespace DB
/// Storage for read-only integration with Apache Iceberg tables in Amazon S3 (see https://iceberg.apache.org/)
/// Right now it's implemented on top of StorageObjectStorage and doesn't support
/// many Iceberg features like schema evolution, partitioning, positional and equality deletes.
template <typename DataLakeMetadata, typename StorageSettings>
class IStorageDataLake final : public StorageObjectStorage<StorageSettings>
template <typename DataLakeMetadata>
class IStorageDataLake final : public StorageObjectStorage
{
public:
using Storage = StorageObjectStorage<StorageSettings>;
using Storage = StorageObjectStorage;
using ConfigurationPtr = Storage::ConfigurationPtr;
static StoragePtr create(
ConfigurationPtr base_configuration,
ContextPtr context,
const String & engine_name_,
const StorageID & table_id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
@ -64,9 +63,9 @@ public:
tryLogCurrentException(__PRETTY_FUNCTION__);
}
return std::make_shared<IStorageDataLake<DataLakeMetadata, StorageSettings>>(
return std::make_shared<IStorageDataLake<DataLakeMetadata>>(
base_configuration, std::move(metadata), configuration, object_storage,
engine_name_, context, table_id_,
context, table_id_,
columns_.empty() ? ColumnsDescription(schema_from_metadata) : columns_,
constraints_, comment_, format_settings_);
}
@ -133,9 +132,9 @@ private:
DataLakeMetadataPtr current_metadata;
};
using StorageIceberg = IStorageDataLake<IcebergMetadata, S3StorageSettings>;
using StorageDeltaLake = IStorageDataLake<DeltaLakeMetadata, S3StorageSettings>;
using StorageHudi = IStorageDataLake<HudiMetadata, S3StorageSettings>;
using StorageIceberg = IStorageDataLake<IcebergMetadata>;
using StorageDeltaLake = IStorageDataLake<DeltaLakeMetadata>;
using StorageHudi = IStorageDataLake<HudiMetadata>;
}
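With the StorageSettings template parameter gone, a data-lake storage is typed by its metadata parser alone. Creating one now looks as in the registration code of the next file, sketched here with all arguments assumed in scope:

/// Sketch mirroring registerStorageIceberg() below.
auto storage = StorageIceberg::create(
    configuration, context, table_id, columns,
    constraints, comment, /* format_settings */ std::nullopt, mode);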

View File

@ -6,7 +6,6 @@
#include <Storages/ObjectStorage/DataLakes/IStorageDataLake.h>
#include <Storages/ObjectStorage/DataLakes/IcebergMetadata.h>
#include <Storages/ObjectStorage/S3/Configuration.h>
#include <Storages/ObjectStorage/StorageObjectStorageQuerySettings.h>
namespace DB
@ -24,7 +23,7 @@ void registerStorageIceberg(StorageFactory & factory)
StorageObjectStorageConfiguration::initialize(*configuration, args.engine_args, args.getLocalContext(), false);
return StorageIceberg::create(
configuration, args.getContext(), "Iceberg", args.table_id, args.columns,
configuration, args.getContext(), args.table_id, args.columns,
args.constraints, args.comment, std::nullopt, args.mode);
},
{
@ -47,7 +46,7 @@ void registerStorageDeltaLake(StorageFactory & factory)
StorageObjectStorageConfiguration::initialize(*configuration, args.engine_args, args.getLocalContext(), false);
return StorageDeltaLake::create(
configuration, args.getContext(), "DeltaLake", args.table_id, args.columns,
configuration, args.getContext(), args.table_id, args.columns,
args.constraints, args.comment, std::nullopt, args.mode);
},
{
@ -68,7 +67,7 @@ void registerStorageHudi(StorageFactory & factory)
StorageObjectStorageConfiguration::initialize(*configuration, args.engine_args, args.getLocalContext(), false);
return StorageHudi::create(
configuration, args.getContext(), "Hudi", args.table_id, args.columns,
configuration, args.getContext(), args.table_id, args.columns,
args.constraints, args.comment, std::nullopt, args.mode);
},
{

View File

@ -60,6 +60,20 @@ std::string StorageHDFSConfiguration::getPathWithoutGlob() const
return "/";
return path.substr(0, end_of_path_without_globs);
}
StorageObjectStorage::QuerySettings StorageHDFSConfiguration::getQuerySettings(const ContextPtr & context) const
{
const auto & settings = context->getSettingsRef();
return StorageObjectStorage::QuerySettings{
.truncate_on_insert = settings.hdfs_truncate_on_insert,
.create_new_file_on_insert = settings.hdfs_create_new_file_on_insert,
.schema_inference_use_cache = settings.schema_inference_use_cache_for_hdfs,
.schema_inference_mode = settings.schema_inference_mode,
.skip_empty_files = settings.hdfs_skip_empty_files, /// TODO: add setting for hdfs
.list_object_keys_size = settings.s3_list_object_keys_size, /// TODO: add a setting for hdfs
.throw_on_zero_files_match = settings.s3_throw_on_zero_files_match,
.ignore_non_existent_file = settings.hdfs_ignore_file_doesnt_exist,
};
}
void StorageHDFSConfiguration::fromAST(ASTs & args, ContextPtr context, bool with_structure)
{

View File

@ -13,9 +13,15 @@ namespace DB
class StorageHDFSConfiguration : public StorageObjectStorageConfiguration
{
public:
static constexpr auto type_name = "hdfs";
static constexpr auto engine_name = "HDFS";
StorageHDFSConfiguration() = default;
StorageHDFSConfiguration(const StorageHDFSConfiguration & other);
std::string getTypeName() const override { return type_name; }
std::string getEngineName() const override { return engine_name; }
Path getPath() const override { return path; }
void setPath(const Path & path_) override { path = path_; }
@ -25,13 +31,14 @@ public:
String getNamespace() const override { return ""; }
String getDataSourceDescription() override { return url; }
StorageObjectStorage::QuerySettings getQuerySettings(const ContextPtr &) const override;
void check(ContextPtr context) const override;
ObjectStoragePtr createObjectStorage(ContextPtr context, bool is_readonly = true) override; /// NOLINT
StorageObjectStorageConfigurationPtr clone() override { return std::make_shared<StorageHDFSConfiguration>(*this); }
static void addStructureAndFormatToArgs(
ASTs & args, const String & structure_, const String & format_, ContextPtr context);
void addStructureAndFormatToArgs(
ASTs & args, const String & structure_, const String & format_, ContextPtr context) override;
std::string getPathWithoutGlob() const override;

View File

@ -1,5 +1,4 @@
#include <Storages/ObjectStorage/ReadBufferIterator.h>
#include <Storages/ObjectStorage/StorageObjectStorageQuerySettings.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <IO/ReadBufferFromFileBase.h>
@ -19,7 +18,6 @@ ReadBufferIterator::ReadBufferIterator(
ConfigurationPtr configuration_,
const FileIterator & file_iterator_,
const std::optional<FormatSettings> & format_settings_,
const StorageObjectStorageSettings & query_settings_,
SchemaCache & schema_cache_,
ObjectInfos & read_keys_,
const ContextPtr & context_)
@ -28,7 +26,7 @@ ReadBufferIterator::ReadBufferIterator(
, configuration(configuration_)
, file_iterator(file_iterator_)
, format_settings(format_settings_)
, query_settings(query_settings_)
, query_settings(configuration->getQuerySettings(context_))
, schema_cache(schema_cache_)
, read_keys(read_keys_)
, format(configuration->format == "auto" ? std::nullopt : std::optional<String>(configuration->format))

View File

@ -2,7 +2,6 @@
#include <Interpreters/Context_fwd.h>
#include <Storages/ObjectStorage/StorageObjectStorage_fwd_internal.h>
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
#include <Storages/ObjectStorage/StorageObjectStorageQuerySettings.h>
#include <Formats/ReadSchemaUtils.h>
@ -19,7 +18,6 @@ public:
ConfigurationPtr configuration_,
const FileIterator & file_iterator_,
const std::optional<FormatSettings> & format_settings_,
const StorageObjectStorageSettings & query_settings_,
SchemaCache & schema_cache_,
ObjectInfos & read_keys_,
const ContextPtr & context_);
@ -50,7 +48,7 @@ private:
const ConfigurationPtr configuration;
const FileIterator file_iterator;
const std::optional<FormatSettings> & format_settings;
const StorageObjectStorageSettings query_settings;
const StorageObjectStorage::QuerySettings query_settings;
SchemaCache & schema_cache;
ObjectInfos & read_keys;
std::optional<String> format;

View File

@ -1,11 +1,11 @@
#include <Storages/ObjectStorage/ReadFromStorageObjectStorage.h>
#include <Storages/ObjectStorage/ReadFromObjectStorageStep.h>
#include <Processors/Sources/NullSource.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
namespace DB
{
ReadFromStorageObejctStorage::ReadFromStorageObejctStorage(
ReadFromObjectStorageStep::ReadFromObjectStorageStep(
ObjectStoragePtr object_storage_,
ConfigurationPtr configuration_,
const String & name_,
@ -14,49 +14,41 @@ ReadFromStorageObejctStorage::ReadFromStorageObejctStorage(
const SelectQueryInfo & query_info_,
const StorageSnapshotPtr & storage_snapshot_,
const std::optional<DB::FormatSettings> & format_settings_,
const StorageObjectStorageSettings & query_settings_,
bool distributed_processing_,
ReadFromFormatInfo info_,
SchemaCache & schema_cache_,
const bool need_only_count_,
ContextPtr context_,
size_t max_block_size_,
size_t num_streams_,
CurrentMetrics::Metric metric_threads_count_,
CurrentMetrics::Metric metric_threads_active_,
CurrentMetrics::Metric metric_threads_scheduled_)
size_t num_streams_)
: SourceStepWithFilter(DataStream{.header = info_.source_header}, columns_to_read, query_info_, storage_snapshot_, context_)
, object_storage(object_storage_)
, configuration(configuration_)
, info(std::move(info_))
, virtual_columns(virtual_columns_)
, format_settings(format_settings_)
, query_settings(query_settings_)
, query_settings(configuration->getQuerySettings(context_))
, schema_cache(schema_cache_)
, name(name_ + "Source")
, need_only_count(need_only_count_)
, max_block_size(max_block_size_)
, num_streams(num_streams_)
, distributed_processing(distributed_processing_)
, metric_threads_count(metric_threads_count_)
, metric_threads_active(metric_threads_active_)
, metric_threads_scheduled(metric_threads_scheduled_)
{
}
void ReadFromStorageObejctStorage::createIterator(const ActionsDAG::Node * predicate)
void ReadFromObjectStorageStep::createIterator(const ActionsDAG::Node * predicate)
{
if (!iterator_wrapper)
{
auto context = getContext();
iterator_wrapper = StorageObjectStorageSource::createFileIterator(
configuration, object_storage, query_settings, distributed_processing,
context, predicate, virtual_columns, nullptr, metric_threads_count,
metric_threads_active, metric_threads_scheduled, context->getFileProgressCallback());
configuration, object_storage, distributed_processing,
context, predicate, virtual_columns, nullptr, context->getFileProgressCallback());
}
}
void ReadFromStorageObejctStorage::applyFilters(ActionDAGNodes added_filter_nodes)
void ReadFromObjectStorageStep::applyFilters(ActionDAGNodes added_filter_nodes)
{
filter_actions_dag = ActionsDAG::buildFilterActionsDAG(added_filter_nodes.nodes);
const ActionsDAG::Node * predicate = nullptr;
@ -66,7 +58,7 @@ void ReadFromStorageObejctStorage::applyFilters(ActionDAGNodes added_filter_node
createIterator(predicate);
}
void ReadFromStorageObejctStorage::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
void ReadFromObjectStorageStep::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
createIterator(nullptr);
auto context = getContext();
@ -74,13 +66,9 @@ void ReadFromStorageObejctStorage::initializePipeline(QueryPipelineBuilder & pip
Pipes pipes;
for (size_t i = 0; i < num_streams; ++i)
{
auto threadpool = std::make_shared<ThreadPool>(
metric_threads_count, metric_threads_active, metric_threads_scheduled, /* max_threads */1);
auto source = std::make_shared<StorageObjectStorageSource>(
getName(), object_storage, configuration, info, format_settings, query_settings,
context, max_block_size, iterator_wrapper, need_only_count, schema_cache,
std::move(threadpool), metric_threads_count, metric_threads_active, metric_threads_scheduled);
context, max_block_size, iterator_wrapper, need_only_count, schema_cache);
source->setKeyCondition(filter_actions_dag, context);
pipes.emplace_back(std::move(source));

View File

@ -1,17 +1,16 @@
#pragma once
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
#include <Storages/ObjectStorage/StorageObjectStorageQuerySettings.h>
#include <Processors/QueryPlan/SourceStepWithFilter.h>
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
namespace DB
{
class ReadFromStorageObejctStorage : public SourceStepWithFilter
class ReadFromObjectStorageStep : public SourceStepWithFilter
{
public:
using ConfigurationPtr = StorageObjectStorageConfigurationPtr;
ReadFromStorageObejctStorage(
ReadFromObjectStorageStep(
ObjectStoragePtr object_storage_,
ConfigurationPtr configuration_,
const String & name_,
@ -20,17 +19,13 @@ public:
const SelectQueryInfo & query_info_,
const StorageSnapshotPtr & storage_snapshot_,
const std::optional<DB::FormatSettings> & format_settings_,
const StorageObjectStorageSettings & query_settings_,
bool distributed_processing_,
ReadFromFormatInfo info_,
SchemaCache & schema_cache_,
bool need_only_count_,
ContextPtr context_,
size_t max_block_size_,
size_t num_streams_,
CurrentMetrics::Metric metric_threads_count_,
CurrentMetrics::Metric metric_threads_active_,
CurrentMetrics::Metric metric_threads_scheduled_);
size_t num_streams_);
std::string getName() const override { return name; }
@ -46,16 +41,13 @@ private:
const ReadFromFormatInfo info;
const NamesAndTypesList virtual_columns;
const std::optional<DB::FormatSettings> format_settings;
const StorageObjectStorageSettings query_settings;
const StorageObjectStorage::QuerySettings query_settings;
SchemaCache & schema_cache;
const String name;
const bool need_only_count;
const size_t max_block_size;
const size_t num_streams;
const bool distributed_processing;
const CurrentMetrics::Metric metric_threads_count;
const CurrentMetrics::Metric metric_threads_active;
const CurrentMetrics::Metric metric_threads_scheduled;
void createIterator(const ActionsDAG::Node * predicate);
};

View File

@ -70,6 +70,21 @@ StorageS3Configuration::StorageS3Configuration(const StorageS3Configuration & ot
keys = other.keys;
}
StorageObjectStorage::QuerySettings StorageS3Configuration::getQuerySettings(const ContextPtr & context) const
{
const auto & settings = context->getSettingsRef();
return StorageObjectStorage::QuerySettings{
.truncate_on_insert = settings.s3_truncate_on_insert,
.create_new_file_on_insert = settings.s3_create_new_file_on_insert,
.schema_inference_use_cache = settings.schema_inference_use_cache_for_s3,
.schema_inference_mode = settings.schema_inference_mode,
.skip_empty_files = settings.s3_skip_empty_files,
.list_object_keys_size = settings.s3_list_object_keys_size,
.throw_on_zero_files_match = settings.s3_throw_on_zero_files_match,
.ignore_non_existent_file = settings.s3_ignore_file_doesnt_exist,
};
}
ObjectStoragePtr StorageS3Configuration::createObjectStorage(ContextPtr context, bool /* is_readonly */) /// NOLINT
{
assertInitialized();

View File

@ -7,6 +7,7 @@
#include <IO/S3/getObjectInfo.h>
#include <Storages/StorageS3Settings.h>
#include <Storages/ObjectStorage/StorageObjectStorageConfiguration.h>
#include <Common/CurrentMetrics.h>
namespace DB
{
@ -14,9 +15,14 @@ namespace DB
class StorageS3Configuration : public StorageObjectStorageConfiguration
{
public:
static constexpr auto type_name = "s3";
StorageS3Configuration() = default;
StorageS3Configuration(const StorageS3Configuration & other);
std::string getTypeName() const override { return type_name; }
std::string getEngineName() const override { return url.storage_name; }
Path getPath() const override { return url.key; }
void setPath(const Path & path) override { url.key = path; }
@ -26,6 +32,7 @@ public:
String getNamespace() const override { return url.bucket; }
String getDataSourceDescription() override;
StorageObjectStorage::QuerySettings getQuerySettings(const ContextPtr &) const override;
void check(ContextPtr context) const override;
void validateNamespace(const String & name) const override;
@ -34,8 +41,8 @@ public:
bool isStaticConfiguration() const override { return static_configuration; }
ObjectStoragePtr createObjectStorage(ContextPtr context, bool is_readonly = true) override; /// NOLINT
static void addStructureAndFormatToArgs(
ASTs & args, const String & structure, const String & format, ContextPtr context);
void addStructureAndFormatToArgs(
ASTs & args, const String & structure, const String & format, ContextPtr context) override;
private:
void fromNamedCollection(const NamedCollection & collection) override;

View File

@ -11,10 +11,9 @@
#include <Storages/StorageFactory.h>
#include <Storages/VirtualColumnUtils.h>
#include <Storages/ObjectStorage/StorageObjectStorageConfiguration.h>
#include <Storages/ObjectStorage/StorageObjectStorageQuerySettings.h>
#include <Storages/ObjectStorage/StorageObjectStorageSink.h>
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
#include <Storages/ObjectStorage/ReadFromStorageObjectStorage.h>
#include <Storages/ObjectStorage/ReadFromObjectStorageStep.h>
#include <Storages/ObjectStorage/ReadBufferIterator.h>
#include <Storages/ObjectStorage/Utils.h>
#include <Storages/Cache/SchemaCache.h>
@ -25,53 +24,13 @@ namespace DB
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int DATABASE_ACCESS_DENIED;
extern const int NOT_IMPLEMENTED;
}
template <typename StorageSettings>
std::unique_ptr<StorageInMemoryMetadata> getStorageMetadata(
ObjectStoragePtr object_storage,
const StorageObjectStorageConfigurationPtr & configuration,
const ColumnsDescription & columns,
const ConstraintsDescription & constraints,
std::optional<FormatSettings> format_settings,
const String & comment,
const std::string & engine_name,
const ContextPtr & context)
{
using Storage = StorageObjectStorage<StorageSettings>;
auto storage_metadata = std::make_unique<StorageInMemoryMetadata>();
if (columns.empty())
{
auto fetched_columns = Storage::getTableStructureFromData(object_storage, configuration, format_settings, context);
storage_metadata->setColumns(fetched_columns);
}
else if (!columns.hasOnlyOrdinary())
{
/// We don't allow special columns.
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table engine {} doesn't support special columns "
"like MATERIALIZED, ALIAS or EPHEMERAL", engine_name);
}
else
{
if (configuration->format == "auto")
Storage::setFormatFromData(object_storage, configuration, format_settings, context);
storage_metadata->setColumns(columns);
}
storage_metadata->setConstraints(constraints);
storage_metadata->setComment(comment);
return storage_metadata;
}
template <typename StorageSettings>
StorageObjectStorage<StorageSettings>::StorageObjectStorage(
StorageObjectStorage::StorageObjectStorage(
ConfigurationPtr configuration_,
ObjectStoragePtr object_storage_,
const String & engine_name_,
ContextPtr context,
const StorageID & table_id_,
const ColumnsDescription & columns_,
@ -80,16 +39,13 @@ StorageObjectStorage<StorageSettings>::StorageObjectStorage(
std::optional<FormatSettings> format_settings_,
bool distributed_processing_,
ASTPtr partition_by_)
: IStorage(table_id_, getStorageMetadata<StorageSettings>(
object_storage_, configuration_, columns_, constraints_, format_settings_,
comment, engine_name, context))
, engine_name(engine_name_)
: IStorage(table_id_)
, configuration(configuration_)
, object_storage(object_storage_)
, format_settings(format_settings_)
, partition_by(partition_by_)
, distributed_processing(distributed_processing_)
, log(getLogger("Storage" + engine_name_))
, object_storage(object_storage_)
, configuration(configuration_)
, log(getLogger(fmt::format("Storage{}({})", configuration->getEngineName(), table_id_.getFullTableName())))
{
FormatFactory::instance().checkFormatName(configuration->format);
configuration->check(context);
@ -98,46 +54,41 @@ StorageObjectStorage<StorageSettings>::StorageObjectStorage(
for (const auto & key : configuration->getPaths())
objects.emplace_back(key);
setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(getInMemoryMetadataPtr()->getColumns()));
auto metadata = getStorageMetadata(
object_storage_, configuration_, columns_,
constraints_, format_settings_, comment, context);
setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(metadata.getColumns()));
setInMemoryMetadata(std::move(metadata));
}
template <typename StorageSettings>
bool StorageObjectStorage<StorageSettings>::prefersLargeBlocks() const
String StorageObjectStorage::getName() const
{
return configuration->getEngineName();
}
bool StorageObjectStorage::prefersLargeBlocks() const
{
return FormatFactory::instance().checkIfOutputFormatPrefersLargeBlocks(configuration->format);
}
template <typename StorageSettings>
bool StorageObjectStorage<StorageSettings>::parallelizeOutputAfterReading(ContextPtr context) const
bool StorageObjectStorage::parallelizeOutputAfterReading(ContextPtr context) const
{
return FormatFactory::instance().checkParallelizeOutputAfterReading(configuration->format, context);
}
template <typename StorageSettings>
bool StorageObjectStorage<StorageSettings>::supportsSubsetOfColumns(const ContextPtr & context) const
bool StorageObjectStorage::supportsSubsetOfColumns(const ContextPtr & context) const
{
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration->format, context, format_settings);
}
template <typename StorageSettings>
void StorageObjectStorage<StorageSettings>::updateConfiguration(ContextPtr context)
void StorageObjectStorage::updateConfiguration(ContextPtr context)
{
if (!configuration->isStaticConfiguration())
object_storage->applyNewSettings(context->getConfigRef(), "s3.", context);
}
template <typename StorageSettings>
SchemaCache & StorageObjectStorage<StorageSettings>::getSchemaCache(const ContextPtr & context)
{
static SchemaCache schema_cache(
context->getConfigRef().getUInt(
StorageSettings::SCHEMA_CACHE_MAX_ELEMENTS_CONFIG_SETTING,
DEFAULT_SCHEMA_CACHE_ELEMENTS));
return schema_cache;
}
template <typename StorageSettings>
void StorageObjectStorage<StorageSettings>::read(
void StorageObjectStorage::read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
@ -155,13 +106,12 @@ void StorageObjectStorage<StorageSettings>::read(
getName());
}
const auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context));
const auto read_from_format_info = prepareReadingFromFormat(
column_names, storage_snapshot, supportsSubsetOfColumns(local_context));
const bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
&& local_context->getSettingsRef().optimize_count_from_files;
LOG_TEST(&Poco::Logger::get("KSSENII"), "KSSENII SOURCE HEADER: {}", read_from_format_info.source_header.dumpStructure());
LOG_TEST(&Poco::Logger::get("KSSENII"), "KSSENII FORMAT HEADER: {}", read_from_format_info.format_header.dumpStructure());
auto read_step = std::make_unique<ReadFromStorageObejctStorage>(
auto read_step = std::make_unique<ReadFromObjectStorageStep>(
object_storage,
configuration,
getName(),
@ -170,23 +120,18 @@ void StorageObjectStorage<StorageSettings>::read(
query_info,
storage_snapshot,
format_settings,
StorageSettings::create(local_context->getSettingsRef()),
distributed_processing,
std::move(read_from_format_info),
getSchemaCache(local_context),
need_only_count,
local_context,
max_block_size,
num_streams,
StorageSettings::ObjectStorageThreads(),
StorageSettings::ObjectStorageThreadsActive(),
StorageSettings::ObjectStorageThreadsScheduled());
num_streams);
query_plan.addStep(std::move(read_step));
}
template <typename StorageSettings>
SinkToStoragePtr StorageObjectStorage<StorageSettings>::write(
SinkToStoragePtr StorageObjectStorage::write(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr local_context,
@ -194,7 +139,7 @@ SinkToStoragePtr StorageObjectStorage<StorageSettings>::write(
{
updateConfiguration(local_context);
const auto sample_block = metadata_snapshot->getSampleBlock();
const auto & query_settings = StorageSettings::create(local_context->getSettingsRef());
const auto & settings = configuration->getQuerySettings(local_context);
if (configuration->withWildcard())
{
@ -209,23 +154,22 @@ SinkToStoragePtr StorageObjectStorage<StorageSettings>::write(
if (partition_by_ast)
{
LOG_TEST(log, "Using PartitionedSink for {}", configuration->getPath());
return std::make_shared<PartitionedStorageObjectStorageSink>(
object_storage, configuration, query_settings,
format_settings, sample_block, local_context, partition_by_ast);
object_storage, configuration, format_settings, sample_block, local_context, partition_by_ast);
}
}
if (configuration->withGlobs())
{
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED,
throw Exception(
ErrorCodes::DATABASE_ACCESS_DENIED,
"{} key '{}' contains globs, so the table is in readonly mode",
getName(), configuration->getPath());
}
auto & paths = configuration->getPaths();
if (auto new_key = checkAndGetNewFileOnInsertIfNeeded(
*object_storage, *configuration, query_settings, paths.front(), paths.size()))
*object_storage, *configuration, settings, paths.front(), paths.size()))
{
paths.push_back(*new_key);
}
@ -238,9 +182,11 @@ SinkToStoragePtr StorageObjectStorage<StorageSettings>::write(
local_context);
}
template <typename StorageSettings>
void StorageObjectStorage<StorageSettings>::truncate(
const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &)
void StorageObjectStorage::truncate(
const ASTPtr &,
const StorageMetadataPtr &,
ContextPtr,
TableExclusiveLockHolder &)
{
if (configuration->withGlobs())
{
@ -257,34 +203,37 @@ void StorageObjectStorage<StorageSettings>::truncate(
object_storage->removeObjectsIfExist(objects);
}
template <typename StorageSettings>
std::unique_ptr<ReadBufferIterator> StorageObjectStorage<StorageSettings>::createReadBufferIterator(
std::unique_ptr<ReadBufferIterator> StorageObjectStorage::createReadBufferIterator(
const ObjectStoragePtr & object_storage,
const ConfigurationPtr & configuration,
const std::optional<FormatSettings> & format_settings,
ObjectInfos & read_keys,
const ContextPtr & context)
{
const auto settings = StorageSettings::create(context->getSettingsRef());
auto file_iterator = StorageObjectStorageSource::createFileIterator(
configuration, object_storage, settings, /* distributed_processing */false,
context, /* predicate */{}, /* virtual_columns */{}, &read_keys,
StorageSettings::ObjectStorageThreads(), StorageSettings::ObjectStorageThreadsActive(), StorageSettings::ObjectStorageThreadsScheduled());
configuration,
object_storage,
false/* distributed_processing */,
context,
{}/* predicate */,
{}/* virtual_columns */,
&read_keys);
return std::make_unique<ReadBufferIterator>(
object_storage, configuration, file_iterator,
format_settings, StorageSettings::create(context->getSettingsRef()), getSchemaCache(context), read_keys, context);
format_settings, getSchemaCache(context, configuration->getTypeName()), read_keys, context);
}
template <typename StorageSettings>
ColumnsDescription StorageObjectStorage<StorageSettings>::getTableStructureFromData(
ColumnsDescription StorageObjectStorage::getTableStructureFromData(
const ObjectStoragePtr & object_storage,
const ConfigurationPtr & configuration,
const std::optional<FormatSettings> & format_settings,
const ContextPtr & context)
{
ObjectInfos read_keys;
auto read_buffer_iterator = createReadBufferIterator(object_storage, configuration, format_settings, read_keys, context);
auto read_buffer_iterator = createReadBufferIterator(
object_storage, configuration, format_settings, read_keys, context);
if (configuration->format == "auto")
{
auto [columns, format] = detectFormatAndReadSchema(format_settings, *read_buffer_iterator, context);
@ -297,20 +246,34 @@ ColumnsDescription StorageObjectStorage<StorageSettings>::getTableStructureFromD
}
}
template <typename StorageSettings>
void StorageObjectStorage<StorageSettings>::setFormatFromData(
void StorageObjectStorage::setFormatFromData(
const ObjectStoragePtr & object_storage,
const ConfigurationPtr & configuration,
const std::optional<FormatSettings> & format_settings,
const ContextPtr & context)
{
ObjectInfos read_keys;
auto read_buffer_iterator = createReadBufferIterator(object_storage, configuration, format_settings, read_keys, context);
auto read_buffer_iterator = createReadBufferIterator(
object_storage, configuration, format_settings, read_keys, context);
configuration->format = detectFormatAndReadSchema(format_settings, *read_buffer_iterator, context).second;
}
template class StorageObjectStorage<S3StorageSettings>;
template class StorageObjectStorage<AzureStorageSettings>;
template class StorageObjectStorage<HDFSStorageSettings>;
SchemaCache & StorageObjectStorage::getSchemaCache(const ContextPtr & context)
{
static SchemaCache schema_cache(
context->getConfigRef().getUInt(
"schema_inference_cache_max_elements_for_" + configuration->getTypeName(),
DEFAULT_SCHEMA_CACHE_ELEMENTS));
return schema_cache;
}
SchemaCache & StorageObjectStorage::getSchemaCache(const ContextPtr & context, const std::string & storage_type_name)
{
static SchemaCache schema_cache(
context->getConfigRef().getUInt(
"schema_inference_cache_max_elements_for_" + storage_type_name,
DEFAULT_SCHEMA_CACHE_ELEMENTS));
return schema_cache;
}
}
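The cache capacity is now read from a per-type server-config key composed as "schema_inference_cache_max_elements_for_" + type_name, i.e. schema_inference_cache_max_elements_for_s3, ..._for_hdfs and ..._for_azure for the type_name constants introduced in this diff, with DEFAULT_SCHEMA_CACHE_ELEMENTS as the fallback when the key is absent.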

View File

@ -1,31 +1,22 @@
#pragma once
#include <Common/re2.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Common/threadPoolCallbackRunner.h>
#include <Common/logger_useful.h>
#include <Storages/IStorage.h>
#include <Storages/prepareReadingFromFormat.h>
#include <Processors/Formats/IInputFormat.h>
namespace DB
{
struct SelectQueryInfo;
class StorageObjectStorageConfiguration;
struct S3StorageSettings;
struct HDFSStorageSettings;
struct AzureStorageSettings;
class PullingPipelineExecutor;
using ReadTaskCallback = std::function<String()>;
class IOutputFormat;
class IInputFormat;
class SchemaCache;
class ReadBufferIterator;
class SchemaCache;
template <typename StorageSettings>
/**
* A general class containing the implementation for external table engines
* such as StorageS3, StorageAzure and StorageHDFS.
* Works with an object of the IObjectStorage class.
*/
class StorageObjectStorage : public IStorage
{
public:
@ -35,10 +26,26 @@ public:
using ObjectInfoPtr = std::shared_ptr<ObjectInfo>;
using ObjectInfos = std::vector<ObjectInfoPtr>;
struct QuerySettings
{
/// Insert settings:
bool truncate_on_insert;
bool create_new_file_on_insert;
/// Schema inference settings:
bool schema_inference_use_cache;
SchemaInferenceMode schema_inference_mode;
/// List settings:
bool skip_empty_files;
size_t list_object_keys_size;
bool throw_on_zero_files_match;
bool ignore_non_existent_file;
};
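Each configuration subclass fills this struct from the user's settings (see the getQuerySettings() implementations earlier in this diff), so callers resolve engine-specific behavior once per query. A minimal consumer sketch, with configuration and local_context assumed in scope:

const StorageObjectStorage::QuerySettings settings = configuration->getQuerySettings(local_context);
if (settings.truncate_on_insert)
{
    /// Overwrite the existing object on INSERT.
}
else if (settings.create_new_file_on_insert)
{
    /// Write to the next free "<name>.<N>.<ext>" key instead
    /// (see checkAndGetNewFileOnInsertIfNeeded() in Utils.cpp below).
}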
StorageObjectStorage(
ConfigurationPtr configuration_,
ObjectStoragePtr object_storage_,
const String & engine_name_,
ContextPtr context_,
const StorageID & table_id_,
const ColumnsDescription & columns_,
@ -48,17 +55,17 @@ public:
bool distributed_processing_ = false,
ASTPtr partition_by_ = nullptr);
String getName() const override { return engine_name; }
String getName() const override;
void read(
QueryPlan & query_plan,
const Names &,
const StorageSnapshotPtr &,
SelectQueryInfo &,
ContextPtr,
QueryProcessingStage::Enum,
size_t,
size_t) override;
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr local_context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
size_t num_streams) override;
SinkToStoragePtr write(
const ASTPtr & query,
@ -84,7 +91,9 @@ public:
bool parallelizeOutputAfterReading(ContextPtr context) const override;
static SchemaCache & getSchemaCache(const ContextPtr & context);
SchemaCache & getSchemaCache(const ContextPtr & context);
static SchemaCache & getSchemaCache(const ContextPtr & context, const std::string & storage_type_name);
static ColumnsDescription getTableStructureFromData(
const ObjectStoragePtr & object_storage,
@ -108,19 +117,15 @@ protected:
ObjectInfos & read_keys,
const ContextPtr & context);
ConfigurationPtr configuration;
const ObjectStoragePtr object_storage;
const std::string engine_name;
std::optional<FormatSettings> format_settings;
const std::optional<FormatSettings> format_settings;
const ASTPtr partition_by;
const bool distributed_processing;
LoggerPtr log;
ObjectStoragePtr object_storage;
ConfigurationPtr configuration;
std::mutex configuration_update_mutex;
};
using StorageS3 = StorageObjectStorage<S3StorageSettings>;
using StorageAzureBlob = StorageObjectStorage<AzureStorageSettings>;
using StorageHDFS = StorageObjectStorage<HDFSStorageSettings>;
}

View File

@ -15,6 +15,7 @@
#include <Storages/ObjectStorage/StorageObjectStorageConfiguration.h>
#include <Common/Exception.h>
#include <Parsers/queryToString.h>
#include <Storages/ObjectStorage/Utils.h>
namespace DB
{
@ -24,47 +25,34 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
template <typename Definition, typename StorageSettings, typename Configuration>
StorageObjectStorageCluster<Definition, StorageSettings, Configuration>::StorageObjectStorageCluster(
StorageObjectStorageCluster::StorageObjectStorageCluster(
const String & cluster_name_,
const Storage::ConfigurationPtr & configuration_,
ConfigurationPtr configuration_,
ObjectStoragePtr object_storage_,
const String & engine_name_,
const StorageID & table_id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
ContextPtr context_)
: IStorageCluster(cluster_name_,
table_id_,
getLogger(fmt::format("{}({})", engine_name_, table_id_.table_name)))
, engine_name(engine_name_)
: IStorageCluster(
cluster_name_, table_id_, getLogger(fmt::format("{}({})", configuration_->getEngineName(), table_id_.table_name)))
, configuration{configuration_}
, object_storage(object_storage_)
{
configuration->check(context_);
StorageInMemoryMetadata storage_metadata;
auto metadata = getStorageMetadata(
object_storage, configuration, columns_, constraints_,
{}/* format_settings */, ""/* comment */, context_);
if (columns_.empty())
setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(metadata.getColumns()));
setInMemoryMetadata(std::move(metadata));
}
std::string StorageObjectStorageCluster::getName() const
{
ColumnsDescription columns = Storage::getTableStructureFromData(object_storage, configuration, /*format_settings=*/std::nullopt, context_);
storage_metadata.setColumns(columns);
}
else
{
if (configuration->format == "auto")
StorageS3::setFormatFromData(object_storage, configuration, /*format_settings=*/std::nullopt, context_);
storage_metadata.setColumns(columns_);
return configuration->getEngineName();
}
storage_metadata.setConstraints(constraints_);
setInMemoryMetadata(storage_metadata);
setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(storage_metadata.getColumns()));
}
template <typename Definition, typename StorageSettings, typename Configuration>
void StorageObjectStorageCluster<Definition, StorageSettings, Configuration>::updateQueryToSendIfNeeded(
void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
ASTPtr & query,
const DB::StorageSnapshotPtr & storage_snapshot,
const ContextPtr & context)
@ -72,24 +60,32 @@ void StorageObjectStorageCluster<Definition, StorageSettings, Configuration>::up
ASTExpressionList * expression_list = extractTableFunctionArgumentsFromSelectQuery(query);
if (!expression_list)
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Expected SELECT query from table function {}, got '{}'",
engine_name, queryToString(query));
configuration->getEngineName(), queryToString(query));
}
TableFunction::updateStructureAndFormatArgumentsIfNeeded(
expression_list->children,
storage_snapshot->metadata->getColumns().getAll().toNamesAndTypesDescription(),
configuration->format,
context);
ASTs & args = expression_list->children;
const auto & structure = storage_snapshot->metadata->getColumns().getAll().toNamesAndTypesDescription();
if (args.empty())
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Unexpected empty list of arguments for {}Cluster table function",
configuration->getEngineName());
}
template <typename Definition, typename StorageSettings, typename Configuration>
RemoteQueryExecutor::Extension
StorageObjectStorageCluster<Definition, StorageSettings, Configuration>::getTaskIteratorExtension(
ASTPtr cluster_name_arg = args.front();
args.erase(args.begin());
configuration->addStructureAndFormatToArgs(args, structure, configuration->format, context);
args.insert(args.begin(), cluster_name_arg);
}
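In effect, for a query like s3Cluster('my_cluster', 'https://bucket.s3.amazonaws.com/data.csv') the cluster-name argument is popped, the engine's addStructureAndFormatToArgs() appends the resolved format and structure, and the cluster name is re-inserted at the front before the rewritten query is sent to the other replicas (the argument values here are illustrative; exact positions depend on each configuration's implementation).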
RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension(
const ActionsDAG::Node * predicate, const ContextPtr & local_context) const
{
const auto settings = StorageSettings::create(local_context->getSettingsRef());
const auto settings = configuration->getQuerySettings(local_context);
auto iterator = std::make_shared<StorageObjectStorageSource::GlobIterator>(
object_storage, configuration, predicate, virtual_columns, local_context,
nullptr, settings.list_object_keys_size, settings.throw_on_zero_files_match,
@ -106,17 +102,4 @@ StorageObjectStorageCluster<Definition, StorageSettings, Configuration>::getTask
return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) };
}
#if USE_AWS_S3
template class StorageObjectStorageCluster<S3ClusterDefinition, S3StorageSettings, StorageS3Configuration>;
#endif
#if USE_AZURE_BLOB_STORAGE
template class StorageObjectStorageCluster<AzureClusterDefinition, AzureStorageSettings, StorageAzureBlobConfiguration>;
#endif
#if USE_HDFS
template class StorageObjectStorageCluster<HDFSClusterDefinition, HDFSStorageSettings, StorageHDFSConfiguration>;
#endif
}

View File

@ -11,32 +11,25 @@
namespace DB
{
class StorageS3Settings;
class StorageAzureBlobSettings;
class Context;
template <typename Definition, typename StorageSettings, typename Configuration>
class StorageObjectStorageCluster : public IStorageCluster
{
public:
using Storage = StorageObjectStorage<StorageSettings>;
using TableFunction = TableFunctionObjectStorageCluster<Definition, StorageSettings, Configuration>;
using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr;
StorageObjectStorageCluster(
const String & cluster_name_,
const Storage::ConfigurationPtr & configuration_,
ConfigurationPtr configuration_,
ObjectStoragePtr object_storage_,
const String & engine_name_,
const StorageID & table_id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
ContextPtr context_);
std::string getName() const override { return engine_name; }
std::string getName() const override;
RemoteQueryExecutor::Extension
getTaskIteratorExtension(
RemoteQueryExecutor::Extension getTaskIteratorExtension(
const ActionsDAG::Node * predicate,
const ContextPtr & context) const override;
@ -53,20 +46,9 @@ private:
const ContextPtr & context) override;
const String engine_name;
const Storage::ConfigurationPtr configuration;
const StorageObjectStorage::ConfigurationPtr configuration;
const ObjectStoragePtr object_storage;
NamesAndTypesList virtual_columns;
};
#if USE_AWS_S3
using StorageS3Cluster = StorageObjectStorageCluster<S3ClusterDefinition, S3StorageSettings, StorageS3Configuration>;
#endif
#if USE_AZURE_BLOB_STORAGE
using StorageAzureBlobCluster = StorageObjectStorageCluster<AzureClusterDefinition, AzureStorageSettings, StorageAzureBlobConfiguration>;
#endif
#if USE_HDFS
using StorageHDFSCluster = StorageObjectStorageCluster<HDFSClusterDefinition, HDFSStorageSettings, StorageHDFSConfiguration>;
#endif
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Storages/NamedCollectionsHelpers.h>
#include "StorageObjectStorage.h"
#include <filesystem>
namespace fs = std::filesystem;
@ -27,6 +28,9 @@ public:
ContextPtr local_context,
bool with_table_structure);
virtual std::string getTypeName() const = 0;
virtual std::string getEngineName() const = 0;
virtual Path getPath() const = 0;
virtual void setPath(const Path & path) = 0;
@ -36,6 +40,9 @@ public:
virtual String getDataSourceDescription() = 0;
virtual String getNamespace() const = 0;
virtual StorageObjectStorage::QuerySettings getQuerySettings(const ContextPtr &) const = 0;
virtual void addStructureAndFormatToArgs(
ASTs & args, const String & structure_, const String & format_, ContextPtr context) = 0;
bool withWildcard() const;
bool withGlobs() const { return isPathWithGlobs() || isNamespaceWithGlobs(); }

View File

@ -1,102 +0,0 @@
#pragma once
#include <Interpreters/Context_fwd.h>
#include <Core/Settings.h>
#include <Common/CurrentMetrics.h>
namespace CurrentMetrics
{
extern const Metric ObjectStorageAzureThreads;
extern const Metric ObjectStorageAzureThreadsActive;
extern const Metric ObjectStorageAzureThreadsScheduled;
extern const Metric ObjectStorageS3Threads;
extern const Metric ObjectStorageS3ThreadsActive;
extern const Metric ObjectStorageS3ThreadsScheduled;
}
namespace DB
{
struct StorageObjectStorageSettings
{
bool truncate_on_insert;
bool create_new_file_on_insert;
bool schema_inference_use_cache;
SchemaInferenceMode schema_inference_mode;
bool skip_empty_files;
size_t list_object_keys_size;
bool throw_on_zero_files_match;
bool ignore_non_existent_file;
};
struct S3StorageSettings
{
static StorageObjectStorageSettings create(const Settings & settings)
{
return StorageObjectStorageSettings{
.truncate_on_insert = settings.s3_truncate_on_insert,
.create_new_file_on_insert = settings.s3_create_new_file_on_insert,
.schema_inference_use_cache = settings.schema_inference_use_cache_for_s3,
.schema_inference_mode = settings.schema_inference_mode,
.skip_empty_files = settings.s3_skip_empty_files,
.list_object_keys_size = settings.s3_list_object_keys_size,
.throw_on_zero_files_match = settings.s3_throw_on_zero_files_match,
.ignore_non_existent_file = settings.s3_ignore_file_doesnt_exist,
};
}
static constexpr auto SCHEMA_CACHE_MAX_ELEMENTS_CONFIG_SETTING = "schema_inference_cache_max_elements_for_s3";
static CurrentMetrics::Metric ObjectStorageThreads() { return CurrentMetrics::ObjectStorageS3Threads; } /// NOLINT
static CurrentMetrics::Metric ObjectStorageThreadsActive() { return CurrentMetrics::ObjectStorageS3ThreadsActive; } /// NOLINT
static CurrentMetrics::Metric ObjectStorageThreadsScheduled() { return CurrentMetrics::ObjectStorageS3ThreadsScheduled; } /// NOLINT
};
struct AzureStorageSettings
{
static StorageObjectStorageSettings create(const Settings & settings)
{
return StorageObjectStorageSettings{
.truncate_on_insert = settings.azure_truncate_on_insert,
.create_new_file_on_insert = settings.azure_create_new_file_on_insert,
.schema_inference_use_cache = settings.schema_inference_use_cache_for_azure,
.schema_inference_mode = settings.schema_inference_mode,
.skip_empty_files = settings.s3_skip_empty_files, /// TODO: add setting for azure
.list_object_keys_size = settings.azure_list_object_keys_size,
.throw_on_zero_files_match = settings.s3_throw_on_zero_files_match,
.ignore_non_existent_file = settings.azure_ignore_file_doesnt_exist,
};
}
static constexpr auto SCHEMA_CACHE_MAX_ELEMENTS_CONFIG_SETTING = "schema_inference_cache_max_elements_for_azure";
static CurrentMetrics::Metric ObjectStorageThreads() { return CurrentMetrics::ObjectStorageAzureThreads; } /// NOLINT
static CurrentMetrics::Metric ObjectStorageThreadsActive() { return CurrentMetrics::ObjectStorageAzureThreadsActive; } /// NOLINT
static CurrentMetrics::Metric ObjectStorageThreadsScheduled() { return CurrentMetrics::ObjectStorageAzureThreadsScheduled; } /// NOLINT
};
struct HDFSStorageSettings
{
static StorageObjectStorageSettings create(const Settings & settings)
{
return StorageObjectStorageSettings{
.truncate_on_insert = settings.hdfs_truncate_on_insert,
.create_new_file_on_insert = settings.hdfs_create_new_file_on_insert,
.schema_inference_use_cache = settings.schema_inference_use_cache_for_hdfs,
.schema_inference_mode = settings.schema_inference_mode,
.skip_empty_files = settings.hdfs_skip_empty_files, /// TODO: add setting for hdfs
.list_object_keys_size = settings.s3_list_object_keys_size, /// TODO: add a setting for hdfs
.throw_on_zero_files_match = settings.s3_throw_on_zero_files_match,
.ignore_non_existent_file = settings.hdfs_ignore_file_doesnt_exist,
};
}
static constexpr auto SCHEMA_CACHE_MAX_ELEMENTS_CONFIG_SETTING = "schema_inference_cache_max_elements_for_hdfs";
/// TODO: s3 -> hdfs
static CurrentMetrics::Metric ObjectStorageThreads() { return CurrentMetrics::ObjectStorageS3Threads; } /// NOLINT
static CurrentMetrics::Metric ObjectStorageThreadsActive() { return CurrentMetrics::ObjectStorageS3ThreadsActive; } /// NOLINT
static CurrentMetrics::Metric ObjectStorageThreadsScheduled() { return CurrentMetrics::ObjectStorageS3ThreadsScheduled; } /// NOLINT
};
}

View File

@ -103,7 +103,6 @@ void StorageObjectStorageSink::release()
PartitionedStorageObjectStorageSink::PartitionedStorageObjectStorageSink(
ObjectStoragePtr object_storage_,
StorageObjectStorageConfigurationPtr configuration_,
const StorageObjectStorageSettings & query_settings_,
std::optional<FormatSettings> format_settings_,
const Block & sample_block_,
ContextPtr context_,
@ -111,7 +110,7 @@ PartitionedStorageObjectStorageSink::PartitionedStorageObjectStorageSink(
: PartitionedSink(partition_by, context_, sample_block_)
, object_storage(object_storage_)
, configuration(configuration_)
, query_settings(query_settings_)
, query_settings(configuration_->getQuerySettings(context_))
, format_settings(format_settings_)
, sample_block(sample_block_)
, context(context_)

View File

@ -1,7 +1,6 @@
#pragma once
#include <Storages/PartitionedSink.h>
#include <Storages/ObjectStorage/StorageObjectStorageConfiguration.h>
#include <Storages/ObjectStorage/StorageObjectStorageQuerySettings.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Disks/ObjectStorages/IObjectStorage_fwd.h>
@ -47,7 +46,6 @@ public:
PartitionedStorageObjectStorageSink(
ObjectStoragePtr object_storage_,
StorageObjectStorageConfigurationPtr configuration_,
const StorageObjectStorageSettings & query_settings_,
std::optional<FormatSettings> format_settings_,
const Block & sample_block_,
ContextPtr context_,
@ -61,7 +59,7 @@ private:
ObjectStoragePtr object_storage;
StorageObjectStorageConfigurationPtr configuration;
const StorageObjectStorageSettings query_settings;
const StorageObjectStorage::QuerySettings query_settings;
const std::optional<FormatSettings> format_settings;
const Block sample_block;
const ContextPtr context;

View File

@ -10,7 +10,6 @@
#include <Formats/FormatFactory.h>
#include <Formats/ReadSchemaUtils.h>
#include <Storages/ObjectStorage/StorageObjectStorageConfiguration.h>
#include <Storages/ObjectStorage/StorageObjectStorageQuerySettings.h>
#include <Storages/Cache/SchemaCache.h>
#include <Common/parseGlobs.h>
@ -20,6 +19,13 @@ namespace ProfileEvents
extern const Event EngineFileLikeReadFiles;
}
namespace CurrentMetrics
{
extern const Metric StorageObjectStorageThreads;
extern const Metric StorageObjectStorageThreadsActive;
extern const Metric StorageObjectStorageThreadsScheduled;
}
namespace DB
{
@ -37,16 +43,12 @@ StorageObjectStorageSource::StorageObjectStorageSource(
ConfigurationPtr configuration_,
const ReadFromFormatInfo & info,
std::optional<FormatSettings> format_settings_,
const StorageObjectStorageSettings & query_settings_,
const StorageObjectStorage::QuerySettings & query_settings_,
ContextPtr context_,
UInt64 max_block_size_,
std::shared_ptr<IIterator> file_iterator_,
bool need_only_count_,
SchemaCache & schema_cache_,
std::shared_ptr<ThreadPool> reader_pool_,
CurrentMetrics::Metric metric_threads_,
CurrentMetrics::Metric metric_threads_active_,
CurrentMetrics::Metric metric_threads_scheduled_)
SchemaCache & schema_cache_)
: SourceWithKeyCondition(info.source_header, false)
, WithContext(context_)
, name(std::move(name_))
@ -57,13 +59,14 @@ StorageObjectStorageSource::StorageObjectStorageSource(
, max_block_size(max_block_size_)
, need_only_count(need_only_count_)
, read_from_format_info(info)
, create_reader_pool(reader_pool_)
, create_reader_pool(std::make_shared<ThreadPool>(
CurrentMetrics::StorageObjectStorageThreads,
CurrentMetrics::StorageObjectStorageThreadsActive,
CurrentMetrics::StorageObjectStorageThreadsScheduled,
1/* max_threads */))
, columns_desc(info.columns_description)
, file_iterator(file_iterator_)
, schema_cache(schema_cache_)
, metric_threads(metric_threads_)
, metric_threads_active(metric_threads_active_)
, metric_threads_scheduled(metric_threads_scheduled_)
, create_reader_scheduler(threadPoolCallbackRunnerUnsafe<ReaderHolder>(*create_reader_pool, "Reader"))
{
}
@ -76,26 +79,23 @@ StorageObjectStorageSource::~StorageObjectStorageSource()
std::shared_ptr<StorageObjectStorageSource::IIterator> StorageObjectStorageSource::createFileIterator(
ConfigurationPtr configuration,
ObjectStoragePtr object_storage,
const StorageObjectStorageSettings & settings,
bool distributed_processing,
const ContextPtr & local_context,
const ActionsDAG::Node * predicate,
const NamesAndTypesList & virtual_columns,
ObjectInfos * read_keys,
CurrentMetrics::Metric metric_threads_,
CurrentMetrics::Metric metric_threads_active_,
CurrentMetrics::Metric metric_threads_scheduled_,
std::function<void(FileProgress)> file_progress_callback)
{
if (distributed_processing)
return std::make_shared<ReadTaskIterator>(
local_context->getReadTaskCallback(),
local_context->getSettingsRef().max_threads,
metric_threads_, metric_threads_active_, metric_threads_scheduled_);
local_context->getSettingsRef().max_threads);
if (configuration->isNamespaceWithGlobs())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expression can not have wildcards inside namespace name");
auto settings = configuration->getQuerySettings(local_context);
if (configuration->isPathWithGlobs())
{
/// Iterate through disclosed globs and make a source for each file
@ -568,7 +568,8 @@ StorageObjectStorageSource::ReaderHolder::ReaderHolder(
{
}
StorageObjectStorageSource::ReaderHolder & StorageObjectStorageSource::ReaderHolder::operator=(ReaderHolder && other) noexcept
StorageObjectStorageSource::ReaderHolder &
StorageObjectStorageSource::ReaderHolder::operator=(ReaderHolder && other) noexcept
{
/// The order of destruction is important.
/// reader uses pipeline, pipeline uses read_buf.
@ -581,15 +582,15 @@ StorageObjectStorageSource::ReaderHolder & StorageObjectStorageSource::ReaderHol
}
StorageObjectStorageSource::ReadTaskIterator::ReadTaskIterator(
const ReadTaskCallback & callback_,
size_t max_threads_count,
CurrentMetrics::Metric metric_threads_,
CurrentMetrics::Metric metric_threads_active_,
CurrentMetrics::Metric metric_threads_scheduled_)
const ReadTaskCallback & callback_, size_t max_threads_count)
: IIterator("ReadTaskIterator")
, callback(callback_)
{
ThreadPool pool(metric_threads_, metric_threads_active_, metric_threads_scheduled_, max_threads_count);
ThreadPool pool(
CurrentMetrics::StorageObjectStorageThreads,
CurrentMetrics::StorageObjectStorageThreadsActive,
CurrentMetrics::StorageObjectStorageThreadsScheduled, max_threads_count);
auto pool_scheduler = threadPoolCallbackRunnerUnsafe<String>(pool, "ReadTaskIter");
std::vector<std::future<String>> keys;

View File

@ -3,7 +3,6 @@
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Interpreters/Context_fwd.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/ObjectStorage/StorageObjectStorageQuerySettings.h>
#include <Storages/ObjectStorage/StorageObjectStorage_fwd_internal.h>
#include <Processors/Formats/IInputFormat.h>
@ -28,16 +27,12 @@ public:
ConfigurationPtr configuration,
const ReadFromFormatInfo & info,
std::optional<FormatSettings> format_settings_,
const StorageObjectStorageSettings & query_settings_,
const StorageObjectStorage::QuerySettings & query_settings_,
ContextPtr context_,
UInt64 max_block_size_,
std::shared_ptr<IIterator> file_iterator_,
bool need_only_count_,
SchemaCache & schema_cache_,
std::shared_ptr<ThreadPool> reader_pool_,
CurrentMetrics::Metric metric_threads_,
CurrentMetrics::Metric metric_threads_active_,
CurrentMetrics::Metric metric_threads_scheduled_);
SchemaCache & schema_cache_);
~StorageObjectStorageSource() override;
@ -53,15 +48,11 @@ public:
static std::shared_ptr<IIterator> createFileIterator(
ConfigurationPtr configuration,
ObjectStoragePtr object_storage,
const StorageObjectStorageSettings & settings,
bool distributed_processing,
const ContextPtr & local_context,
const ActionsDAG::Node * predicate,
const NamesAndTypesList & virtual_columns,
ObjectInfos * read_keys,
CurrentMetrics::Metric metric_threads_,
CurrentMetrics::Metric metric_threads_active_,
CurrentMetrics::Metric metric_threads_scheduled_,
std::function<void(FileProgress)> file_progress_callback = {});
protected:
@ -69,7 +60,7 @@ protected:
ObjectStoragePtr object_storage;
const ConfigurationPtr configuration;
const std::optional<FormatSettings> format_settings;
const StorageObjectStorageSettings query_settings;
const StorageObjectStorage::QuerySettings query_settings;
const UInt64 max_block_size;
const bool need_only_count;
const ReadFromFormatInfo read_from_format_info;
@ -79,10 +70,6 @@ protected:
SchemaCache & schema_cache;
bool initialized = false;
const CurrentMetrics::Metric metric_threads;
const CurrentMetrics::Metric metric_threads_active;
const CurrentMetrics::Metric metric_threads_scheduled;
size_t total_rows_in_file = 0;
LoggerPtr log = getLogger("StorageObjectStorageSource");
@ -149,12 +136,7 @@ protected:
class StorageObjectStorageSource::ReadTaskIterator : public IIterator
{
public:
ReadTaskIterator(
const ReadTaskCallback & callback_,
size_t max_threads_count,
CurrentMetrics::Metric metric_threads_,
CurrentMetrics::Metric metric_threads_active_,
CurrentMetrics::Metric metric_threads_scheduled_);
ReadTaskIterator(const ReadTaskCallback & callback_, size_t max_threads_count);
size_t estimatedKeysCount() override { return buffer.size(); }

View File

@ -1,8 +1,6 @@
#include <Storages/ObjectStorage/Utils.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Storages/ObjectStorage/StorageObjectStorageConfiguration.h>
#include <Storages/ObjectStorage/StorageObjectStorageQuerySettings.h>
namespace DB
{
@ -15,15 +13,15 @@ namespace ErrorCodes
std::optional<String> checkAndGetNewFileOnInsertIfNeeded(
const IObjectStorage & object_storage,
const StorageObjectStorageConfiguration & configuration,
const StorageObjectStorageSettings & query_settings,
const StorageObjectStorage::QuerySettings & settings,
const String & key,
size_t sequence_number)
{
if (query_settings.truncate_on_insert
if (settings.truncate_on_insert
|| !object_storage.exists(StoredObject(key)))
return std::nullopt;
if (query_settings.create_new_file_on_insert)
if (settings.create_new_file_on_insert)
{
auto pos = key.find_first_of('.');
String new_key;
@ -45,4 +43,38 @@ std::optional<String> checkAndGetNewFileOnInsertIfNeeded(
configuration.getNamespace(), key);
}
StorageInMemoryMetadata getStorageMetadata(
ObjectStoragePtr object_storage,
const StorageObjectStorageConfigurationPtr & configuration,
const ColumnsDescription & columns,
const ConstraintsDescription & constraints,
std::optional<FormatSettings> format_settings,
const String & comment,
const ContextPtr & context)
{
StorageInMemoryMetadata storage_metadata;
if (columns.empty())
{
auto fetched_columns = StorageObjectStorage::getTableStructureFromData(object_storage, configuration, format_settings, context);
storage_metadata.setColumns(fetched_columns);
}
else if (!columns.hasOnlyOrdinary())
{
/// We don't allow special columns.
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Special columns are not supported for {} storage"
"like MATERIALIZED, ALIAS or EPHEMERAL", configuration->getTypeName());
}
else
{
if (configuration->format == "auto")
StorageObjectStorage::setFormatFromData(object_storage, configuration, format_settings, context);
storage_metadata.setColumns(columns);
}
storage_metadata.setConstraints(constraints);
storage_metadata.setComment(comment);
return storage_metadata;
}
}

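The new getStorageMetadata helper centralizes the columns/constraints/format-inference boilerplate that individual storages previously duplicated. A sketch of the intended call site, assuming the arguments are the storage constructor's own parameters and the storage derives from IStorage:

// Sketch only, not part of this commit.
StorageInMemoryMetadata metadata = getStorageMetadata(
    object_storage, configuration, columns, constraints, format_settings, comment, context);
setInMemoryMetadata(metadata);
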
View File

@ -1,18 +1,30 @@
#pragma once
#include <Core/Types.h>
#include "StorageObjectStorage.h"
namespace DB
{
class IObjectStorage;
class StorageObjectStorageConfiguration;
using StorageObjectStorageConfigurationPtr = std::shared_ptr<StorageObjectStorageConfiguration>;
struct StorageObjectStorageSettings;
std::optional<std::string> checkAndGetNewFileOnInsertIfNeeded(
const IObjectStorage & object_storage,
const StorageObjectStorageConfiguration & configuration,
const StorageObjectStorageSettings & query_settings,
const StorageObjectStorage::QuerySettings & settings,
const std::string & key,
size_t sequence_number);
StorageInMemoryMetadata getStorageMetadata(
ObjectStoragePtr object_storage,
const StorageObjectStorageConfigurationPtr & configuration,
const ColumnsDescription & columns,
const ConstraintsDescription & constraints,
std::optional<FormatSettings> format_settings,
const String & comment,
const ContextPtr & context);
}

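For context, the insert path consumes checkAndGetNewFileOnInsertIfNeeded roughly as below (a sketch; key and sequence_number are assumed caller state, and settings is assumed to come from configuration->getQuerySettings(context)):

// nullopt means "write to `key` as-is" (truncation allowed, or the key is free);
// otherwise a fresh key carrying the sequence number is returned, and with both
// settings disabled an existing key makes the function throw.
if (auto new_key = checkAndGetNewFileOnInsertIfNeeded(
        *object_storage, *configuration, settings, key, sequence_number))
    key = *new_key;
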
View File

@ -2,22 +2,23 @@
#include <Storages/ObjectStorage/S3/Configuration.h>
#include <Storages/ObjectStorage/HDFS/Configuration.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/ObjectStorage/StorageObjectStorageConfiguration.h>
#include <Storages/StorageFactory.h>
#include <Formats/FormatFactory.h>
namespace DB
{
#if USE_AWS_S3 || USE_AZURE_BLOB_STORAGE || USE_HDFS
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
template <typename StorageSettings>
static std::shared_ptr<StorageObjectStorage<StorageSettings>> createStorageObjectStorage(
static std::shared_ptr<StorageObjectStorage> createStorageObjectStorage(
const StorageFactory::Arguments & args,
typename StorageObjectStorage<StorageSettings>::ConfigurationPtr configuration,
const String & engine_name,
typename StorageObjectStorage::ConfigurationPtr configuration,
ContextPtr context)
{
auto & engine_args = args.engine_args;
@ -54,10 +55,9 @@ static std::shared_ptr<StorageObjectStorage<StorageSettings>> createStorageObjec
if (args.storage_def->partition_by)
partition_by = args.storage_def->partition_by->clone();
return std::make_shared<StorageObjectStorage<StorageSettings>>(
return std::make_shared<StorageObjectStorage>(
configuration,
configuration->createObjectStorage(context),
engine_name,
args.getContext(),
args.table_id,
args.columns,
@ -68,6 +68,8 @@ static std::shared_ptr<StorageObjectStorage<StorageSettings>> createStorageObjec
partition_by);
}
#endif
#if USE_AZURE_BLOB_STORAGE
void registerStorageAzure(StorageFactory & factory)
{
@ -76,7 +78,7 @@ void registerStorageAzure(StorageFactory & factory)
auto context = args.getLocalContext();
auto configuration = std::make_shared<StorageAzureBlobConfiguration>();
StorageObjectStorageConfiguration::initialize(*configuration, args.engine_args, context, false);
return createStorageObjectStorage<AzureStorageSettings>(args, configuration, "Azure", context);
return createStorageObjectStorage(args, configuration, context);
},
{
.supports_settings = true,
@ -95,7 +97,7 @@ void registerStorageS3Impl(const String & name, StorageFactory & factory)
auto context = args.getLocalContext();
auto configuration = std::make_shared<StorageS3Configuration>();
StorageObjectStorageConfiguration::initialize(*configuration, args.engine_args, context, false);
return createStorageObjectStorage<S3StorageSettings>(args, configuration, name, context);
return createStorageObjectStorage(args, configuration, context);
},
{
.supports_settings = true,
@ -130,7 +132,7 @@ void registerStorageHDFS(StorageFactory & factory)
auto context = args.getLocalContext();
auto configuration = std::make_shared<StorageHDFSConfiguration>();
StorageObjectStorageConfiguration::initialize(*configuration, args.engine_args, context, false);
return createStorageObjectStorage<HDFSStorageSettings>(args, configuration, "HDFS", context);
return createStorageObjectStorage(args, configuration, context);
},
{
.supports_settings = true,

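With the StorageSettings template parameter and the engine-name argument gone, every register function reduces to the same three-line body; sketched for a hypothetical backend Foo (StorageFooConfiguration is an illustrative name, not from this commit):

auto configuration = std::make_shared<StorageFooConfiguration>(); // hypothetical type
StorageObjectStorageConfiguration::initialize(*configuration, args.engine_args, context, false);
return createStorageObjectStorage(args, configuration, context);
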
View File

@ -7,7 +7,6 @@
#include <Storages/S3Queue/S3QueueFilesMetadata.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
#include <Storages/ObjectStorage/StorageObjectStorageQuerySettings.h>
#include <Interpreters/S3QueueLog.h>
@ -21,7 +20,7 @@ struct ObjectMetadata;
class StorageS3QueueSource : public ISource, WithContext
{
public:
using Storage = StorageObjectStorage<S3StorageSettings>;
using Storage = StorageObjectStorage;
using ConfigurationPtr = Storage::ConfigurationPtr;
using GlobIterator = StorageObjectStorageSource::GlobIterator;

View File

@ -37,13 +37,6 @@ namespace ProfileEvents
extern const Event S3ListObjects;
}
namespace CurrentMetrics
{
extern const Metric ObjectStorageS3Threads;
extern const Metric ObjectStorageS3ThreadsActive;
extern const Metric ObjectStorageS3ThreadsScheduled;
}
namespace DB
{
@ -151,14 +144,14 @@ StorageS3Queue::StorageS3Queue(
StorageInMemoryMetadata storage_metadata;
if (columns_.empty())
{
auto columns = Storage::getTableStructureFromData(object_storage, configuration, format_settings, context_);
auto columns = StorageObjectStorage::getTableStructureFromData(object_storage, configuration, format_settings, context_);
storage_metadata.setColumns(columns);
}
else
{
if (configuration->format == "auto")
{
StorageObjectStorage<S3StorageSettings>::setFormatFromData(object_storage, configuration, format_settings, context_);
StorageObjectStorage::setFormatFromData(object_storage, configuration, format_settings, context_);
}
storage_metadata.setColumns(columns_);
}
@ -370,26 +363,18 @@ std::shared_ptr<StorageS3QueueSource> StorageS3Queue::createSource(
size_t max_block_size,
ContextPtr local_context)
{
auto threadpool = std::make_shared<ThreadPool>(CurrentMetrics::ObjectStorageS3Threads,
CurrentMetrics::ObjectStorageS3ThreadsActive,
CurrentMetrics::ObjectStorageS3ThreadsScheduled,
/* max_threads */1);
auto internal_source = std::make_unique<StorageObjectStorageSource>(
getName(),
object_storage,
configuration,
info,
format_settings,
S3StorageSettings::create(local_context->getSettingsRef()),
configuration->getQuerySettings(local_context),
local_context,
max_block_size,
file_iterator,
false,
Storage::getSchemaCache(local_context),
threadpool,
CurrentMetrics::ObjectStorageS3Threads,
CurrentMetrics::ObjectStorageS3ThreadsActive,
CurrentMetrics::ObjectStorageS3ThreadsScheduled);
StorageObjectStorage::getSchemaCache(local_context, configuration->getTypeName()));
auto file_deleter = [=, this](const std::string & path) mutable
{
@ -596,7 +581,7 @@ void StorageS3Queue::checkTableStructure(const String & zookeeper_prefix, const
std::shared_ptr<StorageS3Queue::FileIterator> StorageS3Queue::createFileIterator(ContextPtr local_context, const ActionsDAG::Node * predicate)
{
auto settings = S3StorageSettings::create(local_context->getSettingsRef());
auto settings = configuration->getQuerySettings(local_context);
auto glob_iterator = std::make_unique<StorageObjectStorageSource::GlobIterator>(
object_storage, configuration, predicate, getVirtualsList(), local_context, nullptr, settings.list_object_keys_size, settings.throw_on_zero_files_match);

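Per-query behaviour for S3Queue now comes from configuration->getQuerySettings(local_context) rather than the deleted S3StorageSettings type; the result is plain data read as fields. A sketch, assuming the field names referenced above:

const auto settings = configuration->getQuerySettings(local_context);
const size_t batch_size = settings.list_object_keys_size;     // listing page size
const bool strict_match = settings.throw_on_zero_files_match; // error on an empty glob
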
View File

@ -21,8 +21,7 @@ class S3QueueFilesMetadata;
class StorageS3Queue : public IStorage, WithContext
{
public:
using Storage = StorageObjectStorage<S3StorageSettings>;
using ConfigurationPtr = Storage::ConfigurationPtr;
using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr;
StorageS3Queue(
std::unique_ptr<S3QueueSettings> s3queue_settings_,

View File

@ -9,6 +9,9 @@
#include <Interpreters/Context.h>
#include <IO/WriteHelpers.h>
#include <Formats/ReadSchemaUtils.h>
#include <Storages/ObjectStorage/S3/Configuration.h>
#include <Storages/ObjectStorage/HDFS/Configuration.h>
#include <Storages/ObjectStorage/AzureBlob/Configuration.h>
namespace DB
{
@ -74,14 +77,14 @@ void StorageSystemSchemaInferenceCache::fillData(MutableColumns & res_columns, C
{
fillDataImpl(res_columns, StorageFile::getSchemaCache(context), "File");
#if USE_AWS_S3
fillDataImpl(res_columns, StorageS3::getSchemaCache(context), "S3");
fillDataImpl(res_columns, StorageObjectStorage::getSchemaCache(context, StorageS3Configuration::type_name), "S3");
#endif
#if USE_HDFS
fillDataImpl(res_columns, StorageHDFS::getSchemaCache(context), "HDFS");
fillDataImpl(res_columns, StorageObjectStorage::getSchemaCache(context, StorageHDFSConfiguration::type_name), "HDFS");
#endif
fillDataImpl(res_columns, StorageURL::getSchemaCache(context), "URL");
#if USE_AZURE_BLOB_STORAGE
fillDataImpl(res_columns, StorageAzureBlob::getSchemaCache(context), "Azure"); /// FIXME
fillDataImpl(res_columns, StorageObjectStorage::getSchemaCache(context, StorageAzureBlobConfiguration::type_name), "Azure");
#endif
}

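Note the FIXME that disappears above: schema caches are now resolved per configuration type instead of through the legacy storage classes, so each backend names its own cache explicitly. The accessor pattern, as used in each branch here (sketch):

#if USE_AZURE_BLOB_STORAGE
auto & azure_cache = StorageObjectStorage::getSchemaCache(context, StorageAzureBlobConfiguration::type_name);
fillDataImpl(res_columns, azure_cache, "Azure");
#endif
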
View File

@ -39,7 +39,7 @@ protected:
columns = cached_columns;
StoragePtr storage = Storage::create(
configuration, context, "", StorageID(TableFunction::getDatabaseName(), table_name),
configuration, context, StorageID(TableFunction::getDatabaseName(), table_name),
columns, ConstraintsDescription{}, String{}, std::nullopt, LoadingStrictnessLevel::CREATE);
storage->startup();

View File

@ -27,27 +27,27 @@ namespace ErrorCodes
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
template <typename Definition, typename StorageSettings, typename Configuration>
template <typename Definition, typename Configuration>
ObjectStoragePtr TableFunctionObjectStorage<
Definition, StorageSettings, Configuration>::getObjectStorage(const ContextPtr & context, bool create_readonly) const
Definition, Configuration>::getObjectStorage(const ContextPtr & context, bool create_readonly) const
{
if (!object_storage)
object_storage = configuration->createObjectStorage(context, create_readonly);
return object_storage;
}
template <typename Definition, typename StorageSettings, typename Configuration>
template <typename Definition, typename Configuration>
StorageObjectStorageConfigurationPtr TableFunctionObjectStorage<
Definition, StorageSettings, Configuration>::getConfiguration() const
Definition, Configuration>::getConfiguration() const
{
if (!configuration)
configuration = std::make_shared<Configuration>();
return configuration;
}
template <typename Definition, typename StorageSettings, typename Configuration>
template <typename Definition, typename Configuration>
std::vector<size_t> TableFunctionObjectStorage<
Definition, StorageSettings, Configuration>::skipAnalysisForArguments(const QueryTreeNodePtr & query_node_table_function, ContextPtr) const
Definition, Configuration>::skipAnalysisForArguments(const QueryTreeNodePtr & query_node_table_function, ContextPtr) const
{
auto & table_function_node = query_node_table_function->as<TableFunctionNode &>();
auto & table_function_arguments_nodes = table_function_node.getArguments().getNodes();
@ -63,22 +63,21 @@ std::vector<size_t> TableFunctionObjectStorage<
return result;
}
template <typename Definition, typename StorageSettings, typename Configuration>
void TableFunctionObjectStorage<Definition, StorageSettings, Configuration>::updateStructureAndFormatArgumentsIfNeeded(
template <typename Definition, typename Configuration>
void TableFunctionObjectStorage<Definition, Configuration>::updateStructureAndFormatArgumentsIfNeeded(
ASTs & args, const String & structure, const String & format, const ContextPtr & context)
{
Configuration::addStructureAndFormatToArgs(args, structure, format, context);
Configuration().addStructureAndFormatToArgs(args, structure, format, context);
}
template <typename Definition, typename StorageSettings, typename Configuration>
void TableFunctionObjectStorage<
Definition, StorageSettings, Configuration>::parseArgumentsImpl(ASTs & engine_args, const ContextPtr & local_context)
template <typename Definition, typename Configuration>
void TableFunctionObjectStorage<Definition, Configuration>::parseArgumentsImpl(ASTs & engine_args, const ContextPtr & local_context)
{
StorageObjectStorageConfiguration::initialize(*getConfiguration(), engine_args, local_context, true);
}
template <typename Definition, typename StorageSettings, typename Configuration>
void TableFunctionObjectStorage<Definition, StorageSettings, Configuration>::parseArguments(const ASTPtr & ast_function, ContextPtr context)
template <typename Definition, typename Configuration>
void TableFunctionObjectStorage<Definition, Configuration>::parseArguments(const ASTPtr & ast_function, ContextPtr context)
{
/// Clone the AST function, because we may modify its arguments (e.g. remove headers).
auto ast_copy = ast_function->clone();
@ -90,38 +89,38 @@ void TableFunctionObjectStorage<Definition, StorageSettings, Configuration>::par
parseArgumentsImpl(args, context);
}
template <typename Definition, typename StorageSettings, typename Configuration>
template <typename Definition, typename Configuration>
ColumnsDescription TableFunctionObjectStorage<
Definition, StorageSettings, Configuration>::getActualTableStructure(ContextPtr context, bool is_insert_query) const
Definition, Configuration>::getActualTableStructure(ContextPtr context, bool is_insert_query) const
{
chassert(configuration);
if (configuration->structure == "auto")
{
context->checkAccess(getSourceAccessType());
auto storage = getObjectStorage(context, !is_insert_query);
return StorageObjectStorage<StorageSettings>::getTableStructureFromData(storage, configuration, std::nullopt, context);
return StorageObjectStorage::getTableStructureFromData(storage, configuration, std::nullopt, context);
}
return parseColumnsListFromString(configuration->structure, context);
}
template <typename Definition, typename StorageSettings, typename Configuration>
template <typename Definition, typename Configuration>
bool TableFunctionObjectStorage<
Definition, StorageSettings, Configuration>::supportsReadingSubsetOfColumns(const ContextPtr & context)
Definition, Configuration>::supportsReadingSubsetOfColumns(const ContextPtr & context)
{
chassert(configuration);
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration->format, context);
}
template <typename Definition, typename StorageSettings, typename Configuration>
template <typename Definition, typename Configuration>
std::unordered_set<String> TableFunctionObjectStorage<
Definition, StorageSettings, Configuration>::getVirtualsToCheckBeforeUsingStructureHint() const
Definition, Configuration>::getVirtualsToCheckBeforeUsingStructureHint() const
{
return VirtualColumnUtils::getVirtualNamesForFileLikeStorage();
}
template <typename Definition, typename StorageSettings, typename Configuration>
StoragePtr TableFunctionObjectStorage<Definition, StorageSettings, Configuration>::executeImpl(
template <typename Definition, typename Configuration>
StoragePtr TableFunctionObjectStorage<Definition, Configuration>::executeImpl(
const ASTPtr & /* ast_function */,
ContextPtr context,
const std::string & table_name,
@ -137,10 +136,9 @@ StoragePtr TableFunctionObjectStorage<Definition, StorageSettings, Configuration
else if (!cached_columns.empty())
columns = cached_columns;
StoragePtr storage = std::make_shared<StorageObjectStorage<StorageSettings>>(
StoragePtr storage = std::make_shared<StorageObjectStorage>(
configuration,
getObjectStorage(context, !is_insert_query),
Definition::storage_type_name,
context,
StorageID(getDatabaseName(), table_name),
columns,
@ -159,7 +157,7 @@ void registerTableFunctionObjectStorage(TableFunctionFactory & factory)
{
UNUSED(factory);
#if USE_AWS_S3
factory.registerFunction<TableFunctionObjectStorage<S3Definition, S3StorageSettings, StorageS3Configuration>>(
factory.registerFunction<TableFunctionObjectStorage<S3Definition, StorageS3Configuration>>(
{
.documentation =
{
@ -170,7 +168,7 @@ void registerTableFunctionObjectStorage(TableFunctionFactory & factory)
.allow_readonly = false
});
factory.registerFunction<TableFunctionObjectStorage<GCSDefinition, S3StorageSettings, StorageS3Configuration>>(
factory.registerFunction<TableFunctionObjectStorage<GCSDefinition, StorageS3Configuration>>(
{
.documentation =
{
@ -181,7 +179,7 @@ void registerTableFunctionObjectStorage(TableFunctionFactory & factory)
.allow_readonly = false
});
factory.registerFunction<TableFunctionObjectStorage<COSNDefinition, S3StorageSettings, StorageS3Configuration>>(
factory.registerFunction<TableFunctionObjectStorage<COSNDefinition, StorageS3Configuration>>(
{
.documentation =
{
@ -191,7 +189,7 @@ void registerTableFunctionObjectStorage(TableFunctionFactory & factory)
.categories{"DataLake"}},
.allow_readonly = false
});
factory.registerFunction<TableFunctionObjectStorage<OSSDefinition, S3StorageSettings, StorageS3Configuration>>(
factory.registerFunction<TableFunctionObjectStorage<OSSDefinition, StorageS3Configuration>>(
{
.documentation =
{
@ -204,7 +202,7 @@ void registerTableFunctionObjectStorage(TableFunctionFactory & factory)
#endif
#if USE_AZURE_BLOB_STORAGE
factory.registerFunction<TableFunctionObjectStorage<AzureDefinition, AzureStorageSettings, StorageAzureBlobConfiguration>>(
factory.registerFunction<TableFunctionObjectStorage<AzureDefinition, StorageAzureBlobConfiguration>>(
{
.documentation =
{
@ -220,7 +218,7 @@ void registerTableFunctionObjectStorage(TableFunctionFactory & factory)
});
#endif
#if USE_HDFS
factory.registerFunction<TableFunctionObjectStorage<HDFSDefinition, HDFSStorageSettings, StorageHDFSConfiguration>>(
factory.registerFunction<TableFunctionObjectStorage<HDFSDefinition, StorageHDFSConfiguration>>(
{
.allow_readonly = false
});
@ -228,21 +226,21 @@ void registerTableFunctionObjectStorage(TableFunctionFactory & factory)
}
#if USE_AZURE_BLOB_STORAGE
template class TableFunctionObjectStorage<AzureDefinition, AzureStorageSettings, StorageAzureBlobConfiguration>;
template class TableFunctionObjectStorage<AzureClusterDefinition, AzureStorageSettings, StorageAzureBlobConfiguration>;
template class TableFunctionObjectStorage<AzureDefinition, StorageAzureBlobConfiguration>;
template class TableFunctionObjectStorage<AzureClusterDefinition, StorageAzureBlobConfiguration>;
#endif
#if USE_AWS_S3
template class TableFunctionObjectStorage<S3Definition, S3StorageSettings, StorageS3Configuration>;
template class TableFunctionObjectStorage<S3ClusterDefinition, S3StorageSettings, StorageS3Configuration>;
template class TableFunctionObjectStorage<GCSDefinition, S3StorageSettings, StorageS3Configuration>;
template class TableFunctionObjectStorage<COSNDefinition, S3StorageSettings, StorageS3Configuration>;
template class TableFunctionObjectStorage<OSSDefinition, S3StorageSettings, StorageS3Configuration>;
template class TableFunctionObjectStorage<S3Definition, StorageS3Configuration>;
template class TableFunctionObjectStorage<S3ClusterDefinition, StorageS3Configuration>;
template class TableFunctionObjectStorage<GCSDefinition, StorageS3Configuration>;
template class TableFunctionObjectStorage<COSNDefinition, StorageS3Configuration>;
template class TableFunctionObjectStorage<OSSDefinition, StorageS3Configuration>;
#endif
#if USE_HDFS
template class TableFunctionObjectStorage<HDFSDefinition, HDFSStorageSettings, StorageHDFSConfiguration>;
template class TableFunctionObjectStorage<HDFSClusterDefinition, HDFSStorageSettings, StorageHDFSConfiguration>;
template class TableFunctionObjectStorage<HDFSDefinition, StorageHDFSConfiguration>;
template class TableFunctionObjectStorage<HDFSClusterDefinition, StorageHDFSConfiguration>;
#endif
}

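After dropping StorageSettings, registering a table function takes only a Definition and a Configuration. A hypothetical new backend would plug in the same way (FooDefinition and StorageFooConfiguration are illustrative names):

factory.registerFunction<TableFunctionObjectStorage<FooDefinition, StorageFooConfiguration>>(
    {
        .documentation = { .categories{"DataLake"} }, // fields as used above
        .allow_readonly = false
    });
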
View File

@ -85,7 +85,7 @@ struct HDFSDefinition
" - uri, format, structure, compression_method\n";
};
template <typename Definition, typename StorageSettings, typename Configuration>
template <typename Definition, typename Configuration>
class TableFunctionObjectStorage : public ITableFunction
{
public:
@ -142,14 +142,14 @@ protected:
};
#if USE_AWS_S3
using TableFunctionS3 = TableFunctionObjectStorage<S3Definition, S3StorageSettings, StorageS3Configuration>;
using TableFunctionS3 = TableFunctionObjectStorage<S3Definition, StorageS3Configuration>;
#endif
#if USE_AZURE_BLOB_STORAGE
using TableFunctionAzureBlob = TableFunctionObjectStorage<AzureDefinition, AzureStorageSettings, StorageAzureBlobConfiguration>;
using TableFunctionAzureBlob = TableFunctionObjectStorage<AzureDefinition, StorageAzureBlobConfiguration>;
#endif
#if USE_HDFS
using TableFunctionHDFS = TableFunctionObjectStorage<HDFSDefinition, HDFSStorageSettings, StorageHDFSConfiguration>;
using TableFunctionHDFS = TableFunctionObjectStorage<HDFSDefinition, StorageHDFSConfiguration>;
#endif
}

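The convenience aliases follow the same two-parameter shape, so a hypothetical new backend needs one line and a build guard (names illustrative):

#if USE_FOO // hypothetical guard
using TableFunctionFoo = TableFunctionObjectStorage<FooDefinition, StorageFooConfiguration>;
#endif
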
View File

@ -14,8 +14,8 @@
namespace DB
{
template <typename Definition, typename StorageSettings, typename Configuration>
StoragePtr TableFunctionObjectStorageCluster<Definition, StorageSettings, Configuration>::executeImpl(
template <typename Definition, typename Configuration>
StoragePtr TableFunctionObjectStorageCluster<Definition, Configuration>::executeImpl(
const ASTPtr & /*function*/, ContextPtr context,
const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const
{
@ -34,10 +34,9 @@ StoragePtr TableFunctionObjectStorageCluster<Definition, StorageSettings, Config
if (context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY)
{
/// On worker node this filename won't contain globs
storage = std::make_shared<StorageObjectStorage<StorageSettings>>(
storage = std::make_shared<StorageObjectStorage>(
configuration,
object_storage,
Definition::storage_type_name,
context,
StorageID(Base::getDatabaseName(), table_name),
columns,
@ -49,11 +48,10 @@ StoragePtr TableFunctionObjectStorageCluster<Definition, StorageSettings, Config
}
else
{
storage = std::make_shared<StorageObjectStorageCluster<Definition, StorageSettings, Configuration>>(
storage = std::make_shared<StorageObjectStorageCluster>(
ITableFunctionCluster<Base>::cluster_name,
configuration,
object_storage,
Definition::storage_type_name,
StorageID(Base::getDatabaseName(), table_name),
columns,
ConstraintsDescription{},
@ -107,14 +105,14 @@ void registerTableFunctionObjectStorageCluster(TableFunctionFactory & factory)
}
#if USE_AWS_S3
template class TableFunctionObjectStorageCluster<S3ClusterDefinition, S3StorageSettings, StorageS3Configuration>;
template class TableFunctionObjectStorageCluster<S3ClusterDefinition, StorageS3Configuration>;
#endif
#if USE_AZURE_BLOB_STORAGE
template class TableFunctionObjectStorageCluster<AzureClusterDefinition, AzureStorageSettings, StorageAzureBlobConfiguration>;
template class TableFunctionObjectStorageCluster<AzureClusterDefinition, StorageAzureBlobConfiguration>;
#endif
#if USE_HDFS
template class TableFunctionObjectStorageCluster<HDFSClusterDefinition, HDFSStorageSettings, StorageHDFSConfiguration>;
template class TableFunctionObjectStorageCluster<HDFSClusterDefinition, StorageHDFSConfiguration>;
#endif
}

View File

@ -56,8 +56,8 @@ struct HDFSClusterDefinition
" - cluster_name, uri, format, structure, compression_method\n";
};
template <typename Definition, typename StorageSettings, typename Configuration>
class TableFunctionObjectStorageCluster : public ITableFunctionCluster<TableFunctionObjectStorage<Definition, StorageSettings, Configuration>>
template <typename Definition, typename Configuration>
class TableFunctionObjectStorageCluster : public ITableFunctionCluster<TableFunctionObjectStorage<Definition, Configuration>>
{
public:
static constexpr auto name = Definition::name;
@ -67,7 +67,7 @@ public:
String getSignature() const override { return signature; }
protected:
using Base = TableFunctionObjectStorage<Definition, StorageSettings, Configuration>;
using Base = TableFunctionObjectStorage<Definition, Configuration>;
StoragePtr executeImpl(
const ASTPtr & ast_function,
@ -86,14 +86,14 @@ protected:
};
#if USE_AWS_S3
using TableFunctionS3Cluster = TableFunctionObjectStorageCluster<S3ClusterDefinition, S3StorageSettings, StorageS3Configuration>;
using TableFunctionS3Cluster = TableFunctionObjectStorageCluster<S3ClusterDefinition, StorageS3Configuration>;
#endif
#if USE_AZURE_BLOB_STORAGE
using TableFunctionAzureBlobCluster = TableFunctionObjectStorageCluster<AzureClusterDefinition, AzureStorageSettings, StorageAzureBlobConfiguration>;
using TableFunctionAzureBlobCluster = TableFunctionObjectStorageCluster<AzureClusterDefinition, StorageAzureBlobConfiguration>;
#endif
#if USE_HDFS
using TableFunctionHDFSCluster = TableFunctionObjectStorageCluster<HDFSClusterDefinition, HDFSStorageSettings, StorageHDFSConfiguration>;
using TableFunctionHDFSCluster = TableFunctionObjectStorageCluster<HDFSClusterDefinition, StorageHDFSConfiguration>;
#endif
}