mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 16:50:48 +00:00
Refactored to StorageAzureBlob
This commit is contained in:
parent
5637858182
commit
99f0be8ef5
@ -1,4 +1,4 @@
|
||||
#include <Storages/StorageAzure.h>
|
||||
#include <Storages/StorageAzureBlob.h>
|
||||
|
||||
|
||||
#if USE_AZURE_BLOB_STORAGE
|
||||
@ -91,7 +91,7 @@ bool isConnectionString(const std::string & candidate)
|
||||
|
||||
}
|
||||
|
||||
void StorageAzure::processNamedCollectionResult(StorageAzure::Configuration & configuration, const NamedCollection & collection)
|
||||
void StorageAzureBlob::processNamedCollectionResult(StorageAzureBlob::Configuration & configuration, const NamedCollection & collection)
|
||||
{
|
||||
validateNamedCollection(collection, required_configuration_keys, optional_configuration_keys);
|
||||
|
||||
@ -122,15 +122,15 @@ void StorageAzure::processNamedCollectionResult(StorageAzure::Configuration & co
|
||||
}
|
||||
|
||||
|
||||
StorageAzure::Configuration StorageAzure::getConfiguration(ASTs & engine_args, ContextPtr local_context, bool get_format_from_file)
|
||||
StorageAzureBlob::Configuration StorageAzureBlob::getConfiguration(ASTs & engine_args, ContextPtr local_context, bool get_format_from_file)
|
||||
{
|
||||
LOG_INFO(&Poco::Logger::get("StorageAzure"), "get_format_from_file = {}", get_format_from_file);
|
||||
LOG_INFO(&Poco::Logger::get("StorageAzureBlob"), "get_format_from_file = {}", get_format_from_file);
|
||||
|
||||
StorageAzure::Configuration configuration;
|
||||
StorageAzureBlob::Configuration configuration;
|
||||
|
||||
/// Supported signatures:
|
||||
///
|
||||
/// Azure(connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression])
|
||||
/// AzureBlobStorage(connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression])
|
||||
///
|
||||
|
||||
if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, local_context))
|
||||
@ -147,8 +147,8 @@ StorageAzure::Configuration StorageAzure::getConfiguration(ASTs & engine_args, C
|
||||
|
||||
if (engine_args.size() < 3 || engine_args.size() > 7)
|
||||
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
|
||||
"Storage Azure requires 3 to 7 arguments: "
|
||||
"Azure(connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression])");
|
||||
"Storage AzureBlobStorage requires 3 to 7 arguments: "
|
||||
"AzureBlobStorage(connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression])");
|
||||
|
||||
for (auto & engine_arg : engine_args)
|
||||
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, local_context);
|
||||
@ -239,7 +239,7 @@ StorageAzure::Configuration StorageAzure::getConfiguration(ASTs & engine_args, C
|
||||
}
|
||||
|
||||
|
||||
AzureObjectStorage::SettingsPtr StorageAzure::createSettings(ContextPtr local_context)
|
||||
AzureObjectStorage::SettingsPtr StorageAzureBlob::createSettings(ContextPtr local_context)
|
||||
{
|
||||
const auto & context_settings = local_context->getSettingsRef();
|
||||
auto settings_ptr = std::make_unique<AzureObjectStorageSettings>();
|
||||
@ -250,7 +250,7 @@ AzureObjectStorage::SettingsPtr StorageAzure::createSettings(ContextPtr local_co
|
||||
return settings_ptr;
|
||||
}
|
||||
|
||||
void registerStorageAzure(StorageFactory & factory)
|
||||
void registerStorageAzureBlob(StorageFactory & factory)
|
||||
{
|
||||
factory.registerStorage("AzureBlobStorage", [](const StorageFactory::Arguments & args)
|
||||
{
|
||||
@ -258,8 +258,8 @@ void registerStorageAzure(StorageFactory & factory)
|
||||
if (engine_args.empty())
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments");
|
||||
|
||||
auto configuration = StorageAzure::getConfiguration(engine_args, args.getLocalContext());
|
||||
auto client = StorageAzure::createClient(configuration);
|
||||
auto configuration = StorageAzureBlob::getConfiguration(engine_args, args.getLocalContext());
|
||||
auto client = StorageAzureBlob::createClient(configuration);
|
||||
// Use format settings from global server context + settings from
|
||||
// the SETTINGS clause of the create query. Settings from current
|
||||
// session and user are ignored.
|
||||
@ -290,11 +290,11 @@ void registerStorageAzure(StorageFactory & factory)
|
||||
if (args.storage_def->partition_by)
|
||||
partition_by = args.storage_def->partition_by->clone();
|
||||
|
||||
auto settings = StorageAzure::createSettings(args.getContext());
|
||||
auto settings = StorageAzureBlob::createSettings(args.getContext());
|
||||
|
||||
return std::make_shared<StorageAzure>(
|
||||
return std::make_shared<StorageAzureBlob>(
|
||||
std::move(configuration),
|
||||
std::make_unique<AzureObjectStorage>("AzureStorage", std::move(client), std::move(settings)),
|
||||
std::make_unique<AzureObjectStorage>("AzureBlobStorage", std::move(client), std::move(settings)),
|
||||
args.getContext(),
|
||||
args.table_id,
|
||||
args.columns,
|
||||
@ -311,7 +311,7 @@ void registerStorageAzure(StorageFactory & factory)
|
||||
});
|
||||
}
|
||||
|
||||
AzureClientPtr StorageAzure::createClient(StorageAzure::Configuration configuration)
|
||||
AzureClientPtr StorageAzureBlob::createClient(StorageAzureBlob::Configuration configuration)
|
||||
{
|
||||
AzureClientPtr result;
|
||||
|
||||
@ -375,7 +375,7 @@ AzureClientPtr StorageAzure::createClient(StorageAzure::Configuration configurat
|
||||
return result;
|
||||
}
|
||||
|
||||
Poco::URI StorageAzure::Configuration::getConnectionURL() const
|
||||
Poco::URI StorageAzureBlob::Configuration::getConnectionURL() const
|
||||
{
|
||||
if (!is_connection_string)
|
||||
return Poco::URI(connection_url);
|
||||
@ -385,7 +385,7 @@ Poco::URI StorageAzure::Configuration::getConnectionURL() const
|
||||
}
|
||||
|
||||
|
||||
StorageAzure::StorageAzure(
|
||||
StorageAzureBlob::StorageAzureBlob(
|
||||
const Configuration & configuration_,
|
||||
std::unique_ptr<AzureObjectStorage> && object_storage_,
|
||||
ContextPtr context,
|
||||
@ -434,7 +434,7 @@ StorageAzure::StorageAzure(
|
||||
virtual_block.insert({column.type->createColumn(), column.type, column.name});
|
||||
}
|
||||
|
||||
void StorageAzure::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &)
|
||||
void StorageAzureBlob::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &)
|
||||
{
|
||||
if (configuration.withGlobs())
|
||||
{
|
||||
@ -454,10 +454,10 @@ void StorageAzure::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextP
|
||||
namespace
|
||||
{
|
||||
|
||||
class StorageAzureSink : public SinkToStorage
|
||||
class StorageAzureBlobSink : public SinkToStorage
|
||||
{
|
||||
public:
|
||||
StorageAzureSink(
|
||||
StorageAzureBlobSink(
|
||||
const String & format,
|
||||
const Block & sample_block_,
|
||||
ContextPtr context,
|
||||
@ -474,7 +474,7 @@ public:
|
||||
writer = FormatFactory::instance().getOutputFormatParallelIfPossible(format, *write_buf, sample_block, context, format_settings);
|
||||
}
|
||||
|
||||
String getName() const override { return "StorageS3Sink"; }
|
||||
String getName() const override { return "StorageAzureBlobSink"; }
|
||||
|
||||
void consume(Chunk chunk) override
|
||||
{
|
||||
@ -532,10 +532,10 @@ private:
|
||||
std::mutex cancel_mutex;
|
||||
};
|
||||
|
||||
class PartitionedStorageAzureSink : public PartitionedSink
|
||||
class PartitionedStorageAzureBlobSink : public PartitionedSink
|
||||
{
|
||||
public:
|
||||
PartitionedStorageAzureSink(
|
||||
PartitionedStorageAzureBlobSink(
|
||||
const ASTPtr & partition_by,
|
||||
const String & format_,
|
||||
const Block & sample_block_,
|
||||
@ -560,7 +560,7 @@ public:
|
||||
auto partition_key = replaceWildcards(blob, partition_id);
|
||||
validateKey(partition_key);
|
||||
|
||||
return std::make_shared<StorageAzureSink>(
|
||||
return std::make_shared<StorageAzureBlobSink>(
|
||||
format,
|
||||
sample_block,
|
||||
context,
|
||||
@ -590,7 +590,7 @@ private:
|
||||
|
||||
}
|
||||
|
||||
Pipe StorageAzure::read(
|
||||
Pipe StorageAzureBlob::read(
|
||||
const Names & column_names,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
SelectQueryInfo & query_info,
|
||||
@ -613,17 +613,17 @@ Pipe StorageAzure::read(
|
||||
requested_virtual_columns.push_back(virtual_column);
|
||||
}
|
||||
|
||||
std::shared_ptr<StorageAzureSource::Iterator> iterator_wrapper;
|
||||
std::shared_ptr<StorageAzureBlobSource::Iterator> iterator_wrapper;
|
||||
if (configuration.withGlobs())
|
||||
{
|
||||
/// Iterate through disclosed globs and make a source for each file
|
||||
iterator_wrapper = std::make_shared<StorageAzureSource::Iterator>(
|
||||
iterator_wrapper = std::make_shared<StorageAzureBlobSource::Iterator>(
|
||||
object_storage.get(), configuration.container, std::nullopt,
|
||||
configuration.blob_path, query_info.query, virtual_block, local_context, nullptr);
|
||||
}
|
||||
else
|
||||
{
|
||||
iterator_wrapper = std::make_shared<StorageAzureSource::Iterator>(
|
||||
iterator_wrapper = std::make_shared<StorageAzureBlobSource::Iterator>(
|
||||
object_storage.get(), configuration.container, configuration.blobs_paths,
|
||||
std::nullopt, query_info.query, virtual_block, local_context, nullptr);
|
||||
}
|
||||
@ -653,7 +653,7 @@ Pipe StorageAzure::read(
|
||||
|
||||
for (size_t i = 0; i < num_streams; ++i)
|
||||
{
|
||||
pipes.emplace_back(std::make_shared<StorageAzureSource>(
|
||||
pipes.emplace_back(std::make_shared<StorageAzureBlobSource>(
|
||||
requested_virtual_columns,
|
||||
configuration.format,
|
||||
getName(),
|
||||
@ -671,7 +671,7 @@ Pipe StorageAzure::read(
|
||||
return Pipe::unitePipes(std::move(pipes));
|
||||
}
|
||||
|
||||
SinkToStoragePtr StorageAzure::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
|
||||
SinkToStoragePtr StorageAzureBlob::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
|
||||
{
|
||||
auto sample_block = metadata_snapshot->getSampleBlock();
|
||||
auto chosen_compression_method = chooseCompressionMethod(configuration.blobs_paths.back(), configuration.compression_method);
|
||||
@ -682,7 +682,7 @@ SinkToStoragePtr StorageAzure::write(const ASTPtr & query, const StorageMetadata
|
||||
|
||||
if (is_partitioned_implementation)
|
||||
{
|
||||
return std::make_shared<PartitionedStorageAzureSink>(
|
||||
return std::make_shared<PartitionedStorageAzureBlobSink>(
|
||||
partition_by_ast,
|
||||
configuration.format,
|
||||
sample_block,
|
||||
@ -696,7 +696,7 @@ SinkToStoragePtr StorageAzure::write(const ASTPtr & query, const StorageMetadata
|
||||
{
|
||||
if (configuration.withGlobs())
|
||||
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED,
|
||||
"Azure key '{}' contains globs, so the table is in readonly mode", configuration.blob_path);
|
||||
"AzureBlobStorage key '{}' contains globs, so the table is in readonly mode", configuration.blob_path);
|
||||
|
||||
bool truncate_in_insert = local_context->getSettingsRef().azure_truncate_on_insert;
|
||||
|
||||
@ -730,7 +730,7 @@ SinkToStoragePtr StorageAzure::write(const ASTPtr & query, const StorageMetadata
|
||||
}
|
||||
}
|
||||
|
||||
return std::make_shared<StorageAzureSink>(
|
||||
return std::make_shared<StorageAzureBlobSink>(
|
||||
configuration.format,
|
||||
sample_block,
|
||||
local_context,
|
||||
@ -741,32 +741,32 @@ SinkToStoragePtr StorageAzure::write(const ASTPtr & query, const StorageMetadata
|
||||
}
|
||||
}
|
||||
|
||||
NamesAndTypesList StorageAzure::getVirtuals() const
|
||||
NamesAndTypesList StorageAzureBlob::getVirtuals() const
|
||||
{
|
||||
return virtual_columns;
|
||||
}
|
||||
|
||||
bool StorageAzure::supportsPartitionBy() const
|
||||
bool StorageAzureBlob::supportsPartitionBy() const
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
bool StorageAzure::supportsSubcolumns() const
|
||||
bool StorageAzureBlob::supportsSubcolumns() const
|
||||
{
|
||||
return FormatFactory::instance().checkIfFormatSupportsSubcolumns(configuration.format);
|
||||
}
|
||||
|
||||
bool StorageAzure::supportsSubsetOfColumns() const
|
||||
bool StorageAzureBlob::supportsSubsetOfColumns() const
|
||||
{
|
||||
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format);
|
||||
}
|
||||
|
||||
bool StorageAzure::prefersLargeBlocks() const
|
||||
bool StorageAzureBlob::prefersLargeBlocks() const
|
||||
{
|
||||
return FormatFactory::instance().checkIfOutputFormatPrefersLargeBlocks(configuration.format);
|
||||
}
|
||||
|
||||
bool StorageAzure::parallelizeOutputAfterReading(ContextPtr context) const
|
||||
bool StorageAzureBlob::parallelizeOutputAfterReading(ContextPtr context) const
|
||||
{
|
||||
return FormatFactory::instance().checkParallelizeOutputAfterReading(configuration.format, context);
|
||||
}
|
||||
@ -788,7 +788,7 @@ static void addPathToVirtualColumns(Block & block, const String & path, size_t i
|
||||
block.getByName("_idx").column->assumeMutableRef().insert(idx);
|
||||
}
|
||||
|
||||
StorageAzureSource::Iterator::Iterator(
|
||||
StorageAzureBlobSource::Iterator::Iterator(
|
||||
AzureObjectStorage * object_storage_,
|
||||
const std::string & container_,
|
||||
std::optional<Strings> keys_,
|
||||
@ -886,7 +886,7 @@ StorageAzureSource::Iterator::Iterator(
|
||||
|
||||
}
|
||||
|
||||
RelativePathWithMetadata StorageAzureSource::Iterator::next()
|
||||
RelativePathWithMetadata StorageAzureBlobSource::Iterator::next()
|
||||
{
|
||||
if (is_finished)
|
||||
return {};
|
||||
@ -971,13 +971,13 @@ RelativePathWithMetadata StorageAzureSource::Iterator::next()
|
||||
}
|
||||
}
|
||||
|
||||
size_t StorageAzureSource::Iterator::getTotalSize() const
|
||||
size_t StorageAzureBlobSource::Iterator::getTotalSize() const
|
||||
{
|
||||
return total_size.load(std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
|
||||
void StorageAzureSource::Iterator::createFilterAST(const String & any_key)
|
||||
void StorageAzureBlobSource::Iterator::createFilterAST(const String & any_key)
|
||||
{
|
||||
if (!query || !virtual_header)
|
||||
return;
|
||||
@ -992,7 +992,7 @@ void StorageAzureSource::Iterator::createFilterAST(const String & any_key)
|
||||
}
|
||||
|
||||
|
||||
Chunk StorageAzureSource::generate()
|
||||
Chunk StorageAzureBlobSource::generate()
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
@ -1049,7 +1049,7 @@ Chunk StorageAzureSource::generate()
|
||||
return {};
|
||||
}
|
||||
|
||||
Block StorageAzureSource::getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns)
|
||||
Block StorageAzureBlobSource::getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns)
|
||||
{
|
||||
for (const auto & virtual_column : requested_virtual_columns)
|
||||
sample_block.insert({virtual_column.type->createColumn(), virtual_column.type, virtual_column.name});
|
||||
@ -1057,7 +1057,7 @@ Block StorageAzureSource::getHeader(Block sample_block, const std::vector<NameAn
|
||||
return sample_block;
|
||||
}
|
||||
|
||||
StorageAzureSource::StorageAzureSource(
|
||||
StorageAzureBlobSource::StorageAzureBlobSource(
|
||||
const std::vector<NameAndTypePair> & requested_virtual_columns_,
|
||||
const String & format_,
|
||||
String name_,
|
||||
@ -1092,17 +1092,17 @@ StorageAzureSource::StorageAzureSource(
|
||||
}
|
||||
|
||||
|
||||
StorageAzureSource::~StorageAzureSource()
|
||||
StorageAzureBlobSource::~StorageAzureBlobSource()
|
||||
{
|
||||
create_reader_pool.wait();
|
||||
}
|
||||
|
||||
String StorageAzureSource::getName() const
|
||||
String StorageAzureBlobSource::getName() const
|
||||
{
|
||||
return name;
|
||||
}
|
||||
|
||||
StorageAzureSource::ReaderHolder StorageAzureSource::createReader()
|
||||
StorageAzureBlobSource::ReaderHolder StorageAzureBlobSource::createReader()
|
||||
{
|
||||
auto [current_key, info] = file_iterator->next();
|
||||
LOG_DEBUG(log, "KEY {} SIZE {}", current_key, info.size_bytes);
|
||||
@ -1135,12 +1135,12 @@ StorageAzureSource::ReaderHolder StorageAzureSource::createReader()
|
||||
return ReaderHolder{fs::path(container) / current_key, std::move(read_buf), std::move(pipeline), std::move(current_reader)};
|
||||
}
|
||||
|
||||
std::future<StorageAzureSource::ReaderHolder> StorageAzureSource::createReaderAsync()
|
||||
std::future<StorageAzureBlobSource::ReaderHolder> StorageAzureBlobSource::createReaderAsync()
|
||||
{
|
||||
return create_reader_scheduler([this] { return createReader(); }, Priority{});
|
||||
}
|
||||
|
||||
std::unique_ptr<ReadBuffer> StorageAzureSource::createAzureReadBuffer(const String & key, size_t object_size)
|
||||
std::unique_ptr<ReadBuffer> StorageAzureBlobSource::createAzureReadBuffer(const String & key, size_t object_size)
|
||||
{
|
||||
auto read_settings = getContext()->getReadSettings().adjustBufferSize(object_size);
|
||||
read_settings.enable_filesystem_cache = false;
|
||||
@ -1159,23 +1159,23 @@ std::unique_ptr<ReadBuffer> StorageAzureSource::createAzureReadBuffer(const Stri
|
||||
return object_storage->readObject(StoredObject(key), read_settings, {}, object_size);
|
||||
}
|
||||
|
||||
ColumnsDescription StorageAzure::getTableStructureFromData(
|
||||
ColumnsDescription StorageAzureBlob::getTableStructureFromData(
|
||||
AzureObjectStorage * object_storage,
|
||||
const Configuration & configuration,
|
||||
const std::optional<FormatSettings> & format_settings,
|
||||
ContextPtr ctx)
|
||||
{
|
||||
RelativePathsWithMetadata read_keys;
|
||||
std::shared_ptr<StorageAzureSource::Iterator> file_iterator;
|
||||
std::shared_ptr<StorageAzureBlobSource::Iterator> file_iterator;
|
||||
if (configuration.withGlobs())
|
||||
{
|
||||
file_iterator = std::make_shared<StorageAzureSource::Iterator>(
|
||||
file_iterator = std::make_shared<StorageAzureBlobSource::Iterator>(
|
||||
object_storage, configuration.container, std::nullopt,
|
||||
configuration.blob_path, nullptr, Block{}, ctx, &read_keys);
|
||||
}
|
||||
else
|
||||
{
|
||||
file_iterator = std::make_shared<StorageAzureSource::Iterator>(
|
||||
file_iterator = std::make_shared<StorageAzureBlobSource::Iterator>(
|
||||
object_storage, configuration.container, configuration.blobs_paths,
|
||||
std::nullopt, nullptr, Block{}, ctx, &read_keys);
|
||||
}
|
||||
@ -1233,10 +1233,10 @@ ColumnsDescription StorageAzure::getTableStructureFromData(
|
||||
|
||||
}
|
||||
|
||||
std::optional<ColumnsDescription> StorageAzure::tryGetColumnsFromCache(
|
||||
std::optional<ColumnsDescription> StorageAzureBlob::tryGetColumnsFromCache(
|
||||
const RelativePathsWithMetadata::const_iterator & begin,
|
||||
const RelativePathsWithMetadata::const_iterator & end,
|
||||
const StorageAzure::Configuration & configuration,
|
||||
const StorageAzureBlob::Configuration & configuration,
|
||||
const std::optional<FormatSettings> & format_settings,
|
||||
const ContextPtr & ctx)
|
||||
{
|
||||
@ -1260,10 +1260,10 @@ std::optional<ColumnsDescription> StorageAzure::tryGetColumnsFromCache(
|
||||
|
||||
}
|
||||
|
||||
void StorageAzure::addColumnsToCache(
|
||||
void StorageAzureBlob::addColumnsToCache(
|
||||
const RelativePathsWithMetadata & keys,
|
||||
const ColumnsDescription & columns,
|
||||
const StorageAzure::Configuration & configuration,
|
||||
const StorageAzureBlob::Configuration & configuration,
|
||||
const std::optional<FormatSettings> & format_settings,
|
||||
const String & format_name,
|
||||
const ContextPtr & ctx)
|
||||
@ -1277,14 +1277,14 @@ void StorageAzure::addColumnsToCache(
|
||||
schema_cache.addMany(cache_keys, columns);
|
||||
}
|
||||
|
||||
SchemaCache & StorageAzure::getSchemaCache(const ContextPtr & ctx)
|
||||
SchemaCache & StorageAzureBlob::getSchemaCache(const ContextPtr & ctx)
|
||||
{
|
||||
static SchemaCache schema_cache(ctx->getConfigRef().getUInt("schema_inference_cache_max_elements_for_azure", DEFAULT_SCHEMA_CACHE_ELEMENTS));
|
||||
return schema_cache;
|
||||
}
|
||||
|
||||
|
||||
std::unique_ptr<ReadBuffer> StorageAzureSource::createAsyncAzureReadBuffer(
|
||||
std::unique_ptr<ReadBuffer> StorageAzureBlobSource::createAsyncAzureReadBuffer(
|
||||
const String & key, const ReadSettings & read_settings, size_t object_size)
|
||||
{
|
||||
auto modified_settings{read_settings};
|
@ -24,7 +24,7 @@ using AzureConnectionString = std::string;
|
||||
|
||||
using AzureCredentials = std::variant<AzureSimpleAccountConfiguration, AzureConnectionString>;
|
||||
|
||||
class StorageAzure : public IStorage
|
||||
class StorageAzureBlob : public IStorage
|
||||
{
|
||||
public:
|
||||
|
||||
@ -62,7 +62,7 @@ public:
|
||||
std::vector<String> blobs_paths;
|
||||
};
|
||||
|
||||
StorageAzure(
|
||||
StorageAzureBlob(
|
||||
const Configuration & configuration_,
|
||||
std::unique_ptr<AzureObjectStorage> && object_storage_,
|
||||
ContextPtr context_,
|
||||
@ -73,12 +73,12 @@ public:
|
||||
std::optional<FormatSettings> format_settings_,
|
||||
ASTPtr partition_by_);
|
||||
|
||||
static StorageAzure::Configuration getConfiguration(ASTs & engine_args, ContextPtr local_context, bool get_format_from_file = true);
|
||||
static AzureClientPtr createClient(StorageAzure::Configuration configuration);
|
||||
static StorageAzureBlob::Configuration getConfiguration(ASTs & engine_args, ContextPtr local_context, bool get_format_from_file = true);
|
||||
static AzureClientPtr createClient(StorageAzureBlob::Configuration configuration);
|
||||
|
||||
static AzureObjectStorage::SettingsPtr createSettings(ContextPtr local_context);
|
||||
|
||||
static void processNamedCollectionResult(StorageAzure::Configuration & configuration, const NamedCollection & collection);
|
||||
static void processNamedCollectionResult(StorageAzureBlob::Configuration & configuration, const NamedCollection & collection);
|
||||
|
||||
String getName() const override
|
||||
{
|
||||
@ -133,7 +133,7 @@ private:
|
||||
static std::optional<ColumnsDescription> tryGetColumnsFromCache(
|
||||
const RelativePathsWithMetadata::const_iterator & begin,
|
||||
const RelativePathsWithMetadata::const_iterator & end,
|
||||
const StorageAzure::Configuration & configuration,
|
||||
const StorageAzureBlob::Configuration & configuration,
|
||||
const std::optional<FormatSettings> & format_settings,
|
||||
const ContextPtr & ctx);
|
||||
|
||||
@ -148,7 +148,7 @@ private:
|
||||
|
||||
};
|
||||
|
||||
class StorageAzureSource : public ISource, WithContext
|
||||
class StorageAzureBlobSource : public ISource, WithContext
|
||||
{
|
||||
public:
|
||||
class Iterator : WithContext
|
||||
@ -192,7 +192,7 @@ public:
|
||||
bool is_initialized = false;
|
||||
};
|
||||
|
||||
StorageAzureSource(
|
||||
StorageAzureBlobSource(
|
||||
const std::vector<NameAndTypePair> & requested_virtual_columns_,
|
||||
const String & format_,
|
||||
String name_,
|
||||
@ -206,7 +206,7 @@ public:
|
||||
const String & container_,
|
||||
std::shared_ptr<Iterator> file_iterator_);
|
||||
|
||||
~StorageAzureSource() override;
|
||||
~StorageAzureBlobSource() override;
|
||||
|
||||
Chunk generate() override;
|
||||
|
@ -14,7 +14,7 @@
|
||||
#include <Parsers/ASTIdentifier.h>
|
||||
#include <Parsers/ASTFunction.h>
|
||||
#include <Storages/checkAndGetLiteralArgument.h>
|
||||
#include <Storages/StorageAzure.h>
|
||||
#include <Storages/StorageAzureBlob.h>
|
||||
#include <Storages/StorageS3.h>
|
||||
#include <Storages/StorageURL.h>
|
||||
#include <Storages/NamedCollectionsHelpers.h>
|
||||
@ -46,9 +46,9 @@ bool isConnectionString(const std::string & candidate)
|
||||
|
||||
}
|
||||
|
||||
StorageAzure::Configuration TableFunctionAzureBlobStorage::parseArgumentsImpl(ASTs & engine_args, const ContextPtr & local_context, bool get_format_from_file)
|
||||
StorageAzureBlob::Configuration TableFunctionAzureBlobStorage::parseArgumentsImpl(ASTs & engine_args, const ContextPtr & local_context, bool get_format_from_file)
|
||||
{
|
||||
StorageAzure::Configuration configuration;
|
||||
StorageAzureBlob::Configuration configuration;
|
||||
|
||||
/// Supported signatures:
|
||||
///
|
||||
@ -57,7 +57,7 @@ StorageAzure::Configuration TableFunctionAzureBlobStorage::parseArgumentsImpl(AS
|
||||
|
||||
if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, local_context))
|
||||
{
|
||||
StorageAzure::processNamedCollectionResult(configuration, *named_collection);
|
||||
StorageAzureBlob::processNamedCollectionResult(configuration, *named_collection);
|
||||
|
||||
configuration.blobs_paths = {configuration.blob_path};
|
||||
|
||||
@ -202,11 +202,11 @@ ColumnsDescription TableFunctionAzureBlobStorage::getActualTableStructure(Contex
|
||||
if (configuration.structure == "auto")
|
||||
{
|
||||
context->checkAccess(getSourceAccessType());
|
||||
auto client = StorageAzure::createClient(configuration);
|
||||
auto settings = StorageAzure::createSettings(context);
|
||||
auto client = StorageAzureBlob::createClient(configuration);
|
||||
auto settings = StorageAzureBlob::createSettings(context);
|
||||
|
||||
auto object_storage = std::make_unique<AzureObjectStorage>("AzureBlobStorageTableFunction", std::move(client), std::move(settings));
|
||||
return StorageAzure::getTableStructureFromData(object_storage.get(), configuration, std::nullopt, context);
|
||||
return StorageAzureBlob::getTableStructureFromData(object_storage.get(), configuration, std::nullopt, context);
|
||||
}
|
||||
|
||||
return parseColumnsListFromString(configuration.structure, context);
|
||||
@ -219,8 +219,8 @@ bool TableFunctionAzureBlobStorage::supportsReadingSubsetOfColumns()
|
||||
|
||||
StoragePtr TableFunctionAzureBlobStorage::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
|
||||
{
|
||||
auto client = StorageAzure::createClient(configuration);
|
||||
auto settings = StorageAzure::createSettings(context);
|
||||
auto client = StorageAzureBlob::createClient(configuration);
|
||||
auto settings = StorageAzureBlob::createSettings(context);
|
||||
|
||||
ColumnsDescription columns;
|
||||
if (configuration.structure != "auto")
|
||||
@ -228,7 +228,7 @@ StoragePtr TableFunctionAzureBlobStorage::executeImpl(const ASTPtr & /*ast_funct
|
||||
else if (!structure_hint.empty())
|
||||
columns = structure_hint;
|
||||
|
||||
StoragePtr storage = std::make_shared<StorageAzure>(
|
||||
StoragePtr storage = std::make_shared<StorageAzureBlob>(
|
||||
configuration,
|
||||
std::make_unique<AzureObjectStorage>(table_name, std::move(client), std::move(settings)),
|
||||
context,
|
||||
|
@ -5,7 +5,7 @@
|
||||
#if USE_AZURE_BLOB_STORAGE
|
||||
|
||||
#include <TableFunctions/ITableFunction.h>
|
||||
#include <Storages/StorageAzure.h>
|
||||
#include <Storages/StorageAzureBlob.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -46,7 +46,7 @@ public:
|
||||
return {"_path", "_file"};
|
||||
}
|
||||
|
||||
static StorageAzure::Configuration parseArgumentsImpl(ASTs & args, const ContextPtr & context, bool get_format_from_file = true);
|
||||
static StorageAzureBlob::Configuration parseArgumentsImpl(ASTs & args, const ContextPtr & context, bool get_format_from_file = true);
|
||||
|
||||
protected:
|
||||
|
||||
@ -61,7 +61,7 @@ protected:
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
|
||||
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
|
||||
|
||||
mutable StorageAzure::Configuration configuration;
|
||||
mutable StorageAzureBlob::Configuration configuration;
|
||||
ColumnsDescription structure_hint;
|
||||
};
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user