Merge pull request #50795 from ClickHouse/azure_table_function_cluster

Table function azureBlobStorageCluster
Commit 75f81bdc44 by SmitaRKulkarni, 2023-08-07 09:16:52 +02:00 (committed by GitHub)
15 changed files with 931 additions and 104 deletions


@@ -0,0 +1,47 @@
---
slug: /en/sql-reference/table-functions/azureBlobStorageCluster
sidebar_position: 55
sidebar_label: azureBlobStorageCluster
title: "azureBlobStorageCluster Table Function"
---
Allows processing files from [Azure Blob Storage](https://azure.microsoft.com/en-us/products/storage/blobs) in parallel from many nodes in a specified cluster. On the initiator it creates a connection to all nodes in the cluster, discloses asterisks in the Azure Blob Storage file path, and dispatches each file dynamically. On a worker node it asks the initiator about the next task to process and processes it. This is repeated until all tasks are finished.
This table function is similar to the [s3Cluster function](../../sql-reference/table-functions/s3Cluster.md).
**Syntax**
``` sql
azureBlobStorageCluster(cluster_name, connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression, structure])
```
**Arguments**
- `cluster_name` — Name of a cluster that is used to build a set of addresses and connection parameters to remote and local servers.
- `connection_string|storage_account_url` — `connection_string` includes the account name and key ([create a connection string](https://learn.microsoft.com/en-us/azure/storage/common/storage-configure-connection-string?toc=%2Fazure%2Fstorage%2Fblobs%2Ftoc.json&bc=%2Fazure%2Fstorage%2Fblobs%2Fbreadcrumb%2Ftoc.json#configure-a-connection-string-for-an-azure-storage-account)). Alternatively, you can provide the storage account URL here and pass the account name and account key as separate parameters (see `account_name` and `account_key`).
- `container_name` — Container name.
- `blobpath` — File path. Supports the following wildcards in read-only mode: `*`, `?`, `{abc,def}` and `{N..M}` where `N`, `M` are numbers and `'abc'`, `'def'` are strings.
- `account_name` — If `storage_account_url` is used, the account name can be specified here.
- `account_key` — If `storage_account_url` is used, the account key can be specified here.
- `format` — The [format](../../interfaces/formats.md#formats) of the file.
- `compression` — Supported values: `none`, `gzip/gz`, `brotli/br`, `xz/LZMA`, `zstd/zst`. By default, compression is autodetected from the file extension (same as setting the value to `auto`).
- `structure` — Structure of the table. Format: `'column1_name column1_type, column2_name column2_type, ...'`.
**Returned value**
A table with the specified structure for reading or writing data in the specified file.
**Examples**
Select the count for the file `test_cluster_count.csv`, using all the nodes in the `cluster_simple` cluster:
``` sql
SELECT count(*) FROM azureBlobStorageCluster(
'cluster_simple', 'http://azurite1:10000/devstoreaccount1', 'test_container', 'test_cluster_count.csv', 'devstoreaccount1',
'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV',
'auto', 'key UInt64')
```
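The cluster function is most useful with glob patterns, because the matched files are dispatched across the nodes. The following query is a sketch that assumes several files matching `test_cluster_*.csv` have already been written to `test_container` with the same credentials as above:
``` sql
SELECT count(*) FROM azureBlobStorageCluster(
    'cluster_simple', 'http://azurite1:10000/devstoreaccount1', 'test_container', 'test_cluster_*.csv', 'devstoreaccount1',
    'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV',
    'auto', 'key UInt64')
```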
**See Also**
- [AzureBlobStorage engine](../../engines/table-engines/integrations/azureBlobStorage.md)
- [azureBlobStorage table function](../../sql-reference/table-functions/azureBlobStorage.md)


@@ -300,6 +300,7 @@ void registerStorageAzureBlob(StorageFactory & factory)
args.constraints,
args.comment,
format_settings,
/* distributed_processing */ false,
partition_by);
},
{
@@ -448,12 +449,13 @@ StorageAzureBlob::StorageAzureBlob(
const ConstraintsDescription & constraints_,
const String & comment,
std::optional<FormatSettings> format_settings_,
bool distributed_processing_,
ASTPtr partition_by_)
: IStorage(table_id_)
, name("AzureBlobStorage")
, configuration(configuration_)
, object_storage(std::move(object_storage_))
, distributed_processing(false)
, distributed_processing(distributed_processing_)
, format_settings(format_settings_)
, partition_by(partition_by_)
{
@@ -463,7 +465,7 @@ StorageAzureBlob::StorageAzureBlob(
StorageInMemoryMetadata storage_metadata;
if (columns_.empty())
{
auto columns = getTableStructureFromData(object_storage.get(), configuration, format_settings, context);
auto columns = getTableStructureFromData(object_storage.get(), configuration, format_settings, context, distributed_processing);
storage_metadata.setColumns(columns);
}
else
@@ -672,7 +674,12 @@ Pipe StorageAzureBlob::read(
Pipes pipes;
std::shared_ptr<StorageAzureBlobSource::IIterator> iterator_wrapper;
if (configuration.withGlobs())
if (distributed_processing)
{
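/// In distributed processing mode, blob paths are supplied by the initiator via the read task callback.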
iterator_wrapper = std::make_shared<StorageAzureBlobSource::ReadIterator>(local_context,
local_context->getReadTaskCallback());
}
else if (configuration.withGlobs())
{
/// Iterate through disclosed globs and make a source for each file
iterator_wrapper = std::make_shared<StorageAzureBlobSource::GlobIterator>(
@@ -845,6 +852,7 @@ StorageAzureBlobSource::GlobIterator::GlobIterator(
blobs_with_metadata.emplace_back(blob_path_with_globs, object_metadata);
if (outer_blobs)
outer_blobs->emplace_back(blobs_with_metadata.back());
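/// A path without globs matches exactly one blob, so the iterator is exhausted after this single entry.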
is_finished = true;
return;
}
@@ -863,8 +871,10 @@ RelativePathWithMetadata StorageAzureBlobSource::GlobIterator::next()
{
std::lock_guard lock(next_mutex);
if (is_finished)
if (is_finished && index >= blobs_with_metadata.size())
{
return {};
}
bool need_new_batch = blobs_with_metadata.empty() || index >= blobs_with_metadata.size();
@@ -1184,11 +1194,17 @@ ColumnsDescription StorageAzureBlob::getTableStructureFromData(
AzureObjectStorage * object_storage,
const Configuration & configuration,
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx)
ContextPtr ctx,
bool distributed_processing)
{
RelativePathsWithMetadata read_keys;
std::shared_ptr<StorageAzureBlobSource::IIterator> file_iterator;
if (configuration.withGlobs())
if (distributed_processing)
{
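/// For distributed processing, the blob paths used for schema inference are also supplied by the initiator via the read task callback.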
file_iterator = std::make_shared<StorageAzureBlobSource::ReadIterator>(ctx,
ctx->getReadTaskCallback());
}
else if (configuration.withGlobs())
{
file_iterator = std::make_shared<StorageAzureBlobSource::GlobIterator>(
object_storage, configuration.container, configuration.blob_path, nullptr, Block{}, ctx, &read_keys);


@@ -63,6 +63,7 @@ public:
const ConstraintsDescription & constraints_,
const String & comment,
std::optional<FormatSettings> format_settings_,
bool distributed_processing_,
ASTPtr partition_by_);
static StorageAzureBlob::Configuration getConfiguration(ASTs & engine_args, ContextPtr local_context);
@@ -108,7 +109,8 @@ public:
AzureObjectStorage * object_storage,
const Configuration & configuration,
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx);
ContextPtr ctx,
bool distributed_processing = false);
private:
std::string name;
@@ -137,7 +139,6 @@ private:
const String & format_name,
const ContextPtr & ctx);
};
class StorageAzureBlobSource : public ISource, WithContext
@@ -169,7 +170,7 @@ public:
RelativePathWithMetadata next() override;
~GlobIterator() override = default;
private:
private:
AzureObjectStorage * object_storage;
std::string container;
String blob_path_with_globs;
@@ -194,6 +195,21 @@ public:
std::function<void(FileProgress)> file_progress_callback;
};
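/// Iterator used on worker nodes during distributed processing:
/// each call to next() requests one more blob path from the query initiator.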
class ReadIterator : public IIterator
{
public:
explicit ReadIterator(ContextPtr context_,
const ReadTaskCallback & callback_)
: IIterator(context_), callback(callback_) { }
RelativePathWithMetadata next() override
{
return {callback(), {}};
}
private:
ReadTaskCallback callback;
};
class KeysIterator : public IIterator
{
public:


@@ -0,0 +1,99 @@
#include "Storages/StorageAzureBlobCluster.h"
#include "config.h"
#if USE_AZURE_BLOB_STORAGE
#include <DataTypes/DataTypeString.h>
#include <IO/ConnectionTimeouts.h>
#include <Interpreters/AddDefaultDatabaseVisitor.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Processors/Sources/RemoteSource.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
#include <Storages/IStorage.h>
#include <Storages/StorageURL.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageDictionary.h>
#include <Storages/extractTableFunctionArgumentsFromSelectQuery.h>
#include <Storages/getVirtualsForStorage.h>
#include <Common/Exception.h>
#include <Parsers/queryToString.h>
#include <TableFunctions/TableFunctionAzureBlobStorageCluster.h>
#include <memory>
#include <string>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
StorageAzureBlobCluster::StorageAzureBlobCluster(
const String & cluster_name_,
const StorageAzureBlob::Configuration & configuration_,
std::unique_ptr<AzureObjectStorage> && object_storage_,
const StorageID & table_id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
ContextPtr context_,
bool structure_argument_was_provided_)
: IStorageCluster(cluster_name_, table_id_, &Poco::Logger::get("StorageAzureBlobCluster (" + table_id_.table_name + ")"), structure_argument_was_provided_)
, configuration{configuration_}
, object_storage(std::move(object_storage_))
{
context_->getGlobalContext()->getRemoteHostFilter().checkURL(configuration_.getConnectionURL());
StorageInMemoryMetadata storage_metadata;
if (columns_.empty())
{
/// `format_settings` is set to std::nullopt, because StorageAzureBlobCluster is used only as a table function
auto columns = StorageAzureBlob::getTableStructureFromData(object_storage.get(), configuration, /*format_settings=*/std::nullopt, context_, false);
storage_metadata.setColumns(columns);
}
else
storage_metadata.setColumns(columns_);
storage_metadata.setConstraints(constraints_);
setInMemoryMetadata(storage_metadata);
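/// Expose the standard _path and _file virtual columns on top of the table schema.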
auto default_virtuals = NamesAndTypesList{
{"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
auto columns = storage_metadata.getSampleBlock().getNamesAndTypesList();
virtual_columns = getVirtualsForStorage(columns, default_virtuals);
for (const auto & column : virtual_columns)
virtual_block.insert({column.type->createColumn(), column.type, column.name});
}
void StorageAzureBlobCluster::addColumnsStructureToQuery(ASTPtr & query, const String & structure, const ContextPtr & context)
{
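/// Embed the resolved table structure into the query sent to worker nodes, so they do not repeat schema inference.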
ASTExpressionList * expression_list = extractTableFunctionArgumentsFromSelectQuery(query);
if (!expression_list)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query from table function azureBlobStorageCluster, got '{}'", queryToString(query));
TableFunctionAzureBlobStorageCluster::addColumnsStructureToArguments(expression_list->children, structure, context);
}
RemoteQueryExecutor::Extension StorageAzureBlobCluster::getTaskIteratorExtension(ASTPtr query, const ContextPtr & context) const
{
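/// The glob is expanded once on the initiator; each callback invocation hands the next matching blob path to a worker.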
auto iterator = std::make_shared<StorageAzureBlobSource::GlobIterator>(
object_storage.get(), configuration.container, configuration.blob_path,
query, virtual_block, context, nullptr);
auto callback = std::make_shared<std::function<String()>>([iterator]() mutable -> String{ return iterator->next().relative_path; });
return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) };
}
NamesAndTypesList StorageAzureBlobCluster::getVirtuals() const
{
return virtual_columns;
}
}
#endif


@@ -0,0 +1,53 @@
#pragma once
#include "config.h"
#if USE_AZURE_BLOB_STORAGE
#include <memory>
#include <optional>
#include "Client/Connection.h"
#include <Interpreters/Cluster.h>
#include <Storages/IStorageCluster.h>
#include <Storages/StorageAzureBlob.h>
namespace DB
{
class Context;
class StorageAzureBlobCluster : public IStorageCluster
{
public:
StorageAzureBlobCluster(
const String & cluster_name_,
const StorageAzureBlob::Configuration & configuration_,
std::unique_ptr<AzureObjectStorage> && object_storage_,
const StorageID & table_id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
ContextPtr context_,
bool structure_argument_was_provided_);
std::string getName() const override { return "AzureBlobStorageCluster"; }
NamesAndTypesList getVirtuals() const override;
RemoteQueryExecutor::Extension getTaskIteratorExtension(ASTPtr query, const ContextPtr & context) const override;
private:
void updateBeforeRead(const ContextPtr & /*context*/) override {}
void addColumnsStructureToQuery(ASTPtr & query, const String & structure, const ContextPtr & context) override;
StorageAzureBlob::Configuration configuration;
NamesAndTypesList virtual_columns;
Block virtual_block;
std::unique_ptr<AzureObjectStorage> object_storage;
};
}
#endif


@@ -44,10 +44,8 @@ bool isConnectionString(const std::string & candidate)
}
StorageAzureBlob::Configuration TableFunctionAzureBlobStorage::parseArgumentsImpl(ASTs & engine_args, const ContextPtr & local_context, bool get_format_from_file)
void TableFunctionAzureBlobStorage::parseArgumentsImpl(ASTs & engine_args, const ContextPtr & local_context)
{
StorageAzureBlob::Configuration configuration;
/// Supported signatures:
///
/// AzureBlobStorage(connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression, structure])
@@ -59,87 +57,80 @@ StorageAzureBlob::Configuration TableFunctionAzureBlobStorage::parseArgumentsImp
configuration.blobs_paths = {configuration.blob_path};
if (configuration.format == "auto" && get_format_from_file)
if (configuration.format == "auto")
configuration.format = FormatFactory::instance().getFormatFromFileName(configuration.blob_path, true);
return configuration;
}
if (engine_args.size() < 3 || engine_args.size() > 8)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Storage Azure requires 3 to 7 arguments: "
"AzureBlobStorage(connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression, structure])");
for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, local_context);
std::unordered_map<std::string_view, size_t> engine_args_to_idx;
configuration.connection_url = checkAndGetLiteralArgument<String>(engine_args[0], "connection_string/storage_account_url");
configuration.is_connection_string = isConnectionString(configuration.connection_url);
configuration.container = checkAndGetLiteralArgument<String>(engine_args[1], "container");
configuration.blob_path = checkAndGetLiteralArgument<String>(engine_args[2], "blobpath");
auto is_format_arg = [] (const std::string & s) -> bool
else
{
return s == "auto" || FormatFactory::instance().getAllFormats().contains(s);
};
if (engine_args.size() < 3 || engine_args.size() > 8)
throw Exception(
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Storage Azure requires 3 to 7 arguments: "
"AzureBlobStorage(connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression, structure])");
if (engine_args.size() == 4)
{
auto fourth_arg = checkAndGetLiteralArgument<String>(engine_args[3], "format/account_name/structure");
if (is_format_arg(fourth_arg))
for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, local_context);
std::unordered_map<std::string_view, size_t> engine_args_to_idx;
configuration.connection_url = checkAndGetLiteralArgument<String>(engine_args[0], "connection_string/storage_account_url");
configuration.is_connection_string = isConnectionString(configuration.connection_url);
configuration.container = checkAndGetLiteralArgument<String>(engine_args[1], "container");
configuration.blob_path = checkAndGetLiteralArgument<String>(engine_args[2], "blobpath");
auto is_format_arg
= [](const std::string & s) -> bool { return s == "auto" || FormatFactory::instance().getAllFormats().contains(s); };
if (engine_args.size() == 4)
{
configuration.format = fourth_arg;
auto fourth_arg = checkAndGetLiteralArgument<String>(engine_args[3], "format/account_name/structure");
if (is_format_arg(fourth_arg))
{
configuration.format = fourth_arg;
}
else
{
configuration.structure = fourth_arg;
}
}
else
else if (engine_args.size() == 5)
{
configuration.structure = fourth_arg;
auto fourth_arg = checkAndGetLiteralArgument<String>(engine_args[3], "format/account_name");
if (is_format_arg(fourth_arg))
{
configuration.format = fourth_arg;
configuration.compression_method = checkAndGetLiteralArgument<String>(engine_args[4], "compression");
}
else
{
configuration.account_name = fourth_arg;
configuration.account_key = checkAndGetLiteralArgument<String>(engine_args[4], "account_key");
}
}
}
else if (engine_args.size() == 5)
{
auto fourth_arg = checkAndGetLiteralArgument<String>(engine_args[3], "format/account_name");
if (is_format_arg(fourth_arg))
else if (engine_args.size() == 6)
{
configuration.format = fourth_arg;
configuration.compression_method = checkAndGetLiteralArgument<String>(engine_args[4], "compression");
auto fourth_arg = checkAndGetLiteralArgument<String>(engine_args[3], "format/account_name");
if (is_format_arg(fourth_arg))
{
configuration.format = fourth_arg;
configuration.compression_method = checkAndGetLiteralArgument<String>(engine_args[4], "compression");
configuration.structure = checkAndGetLiteralArgument<String>(engine_args[5], "structure");
}
else
{
configuration.account_name = fourth_arg;
configuration.account_key = checkAndGetLiteralArgument<String>(engine_args[4], "account_key");
auto sixth_arg = checkAndGetLiteralArgument<String>(engine_args[5], "format/account_name/structure");
if (is_format_arg(sixth_arg))
configuration.format = sixth_arg;
else
configuration.structure = sixth_arg;
}
}
else
{
configuration.account_name = fourth_arg;
configuration.account_key = checkAndGetLiteralArgument<String>(engine_args[4], "account_key");
}
}
else if (engine_args.size() == 6)
{
auto fourth_arg = checkAndGetLiteralArgument<String>(engine_args[3], "format/account_name");
if (is_format_arg(fourth_arg))
{
configuration.format = fourth_arg;
configuration.compression_method = checkAndGetLiteralArgument<String>(engine_args[4], "compression");
configuration.structure = checkAndGetLiteralArgument<String>(engine_args[5], "structure");
}
else
{
configuration.account_name = fourth_arg;
configuration.account_key = checkAndGetLiteralArgument<String>(engine_args[4], "account_key");
auto sixth_arg = checkAndGetLiteralArgument<String>(engine_args[5], "format/account_name");
if (!is_format_arg(sixth_arg))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown format {}", sixth_arg);
configuration.format = sixth_arg;
}
}
else if (engine_args.size() == 7)
{
auto fourth_arg = checkAndGetLiteralArgument<String>(engine_args[3], "format/account_name");
if (is_format_arg(fourth_arg))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Format, compression and structure must be last arguments");
}
else
else if (engine_args.size() == 7)
{
auto fourth_arg = checkAndGetLiteralArgument<String>(engine_args[3], "format/account_name");
configuration.account_name = fourth_arg;
configuration.account_key = checkAndGetLiteralArgument<String>(engine_args[4], "account_key");
auto sixth_arg = checkAndGetLiteralArgument<String>(engine_args[5], "format/account_name");
@@ -148,17 +139,9 @@ StorageAzureBlob::Configuration TableFunctionAzureBlobStorage::parseArgumentsImp
configuration.format = sixth_arg;
configuration.compression_method = checkAndGetLiteralArgument<String>(engine_args[6], "compression");
}
}
else if (engine_args.size() == 8)
{
auto fourth_arg = checkAndGetLiteralArgument<String>(engine_args[3], "format/account_name");
if (is_format_arg(fourth_arg))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Format and compression must be last arguments");
}
else
else if (engine_args.size() == 8)
{
auto fourth_arg = checkAndGetLiteralArgument<String>(engine_args[3], "format/account_name");
configuration.account_name = fourth_arg;
configuration.account_key = checkAndGetLiteralArgument<String>(engine_args[4], "account_key");
auto sixth_arg = checkAndGetLiteralArgument<String>(engine_args[5], "format/account_name");
@@ -168,14 +151,12 @@ StorageAzureBlob::Configuration TableFunctionAzureBlobStorage::parseArgumentsImp
configuration.compression_method = checkAndGetLiteralArgument<String>(engine_args[6], "compression");
configuration.structure = checkAndGetLiteralArgument<String>(engine_args[7], "structure");
}
configuration.blobs_paths = {configuration.blob_path};
if (configuration.format == "auto")
configuration.format = FormatFactory::instance().getFormatFromFileName(configuration.blob_path, true);
}
configuration.blobs_paths = {configuration.blob_path};
if (configuration.format == "auto" && get_format_from_file)
configuration.format = FormatFactory::instance().getFormatFromFileName(configuration.blob_path, true);
return configuration;
}
void TableFunctionAzureBlobStorage::parseArguments(const ASTPtr & ast_function, ContextPtr context)
@@ -190,7 +171,87 @@ void TableFunctionAzureBlobStorage::parseArguments(const ASTPtr & ast_function,
auto & args = args_func.at(0)->children;
configuration = parseArgumentsImpl(args, context);
parseArgumentsImpl(args, context);
}
void TableFunctionAzureBlobStorage::addColumnsStructureToArguments(ASTs & args, const String & structure, const ContextPtr & context)
{
if (tryGetNamedCollectionWithOverrides(args, context))
{
/// In case of named collection, just add key-value pair "structure='...'"
/// at the end of the arguments to override the existing structure.
ASTs equal_func_args = {std::make_shared<ASTIdentifier>("structure"), std::make_shared<ASTLiteral>(structure)};
auto equal_func = makeASTFunction("equals", std::move(equal_func_args));
args.push_back(equal_func);
}
else
{
if (args.size() < 3 || args.size() > 8)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Storage Azure requires 3 to 7 arguments: "
"AzureBlobStorage(connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression, structure])");
auto structure_literal = std::make_shared<ASTLiteral>(structure);
auto is_format_arg
= [](const std::string & s) -> bool { return s == "auto" || FormatFactory::instance().getAllFormats().contains(s); };
if (args.size() == 3)
{
/// Add format=auto & compression=auto before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
}
else if (args.size() == 4)
{
auto fourth_arg = checkAndGetLiteralArgument<String>(args[3], "format/account_name/structure");
if (is_format_arg(fourth_arg))
{
/// Add compression=auto before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
}
else
{
args.back() = structure_literal;
}
}
else if (args.size() == 5)
{
auto fourth_arg = checkAndGetLiteralArgument<String>(args[3], "format/account_name");
if (!is_format_arg(fourth_arg))
{
/// Add format=auto & compression=auto before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(std::make_shared<ASTLiteral>("auto"));
}
args.push_back(structure_literal);
}
else if (args.size() == 6)
{
auto fourth_arg = checkAndGetLiteralArgument<String>(args[3], "format/account_name");
if (!is_format_arg(fourth_arg))
{
/// Add compression=auto before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
}
else
{
args.back() = structure_literal;
}
}
else if (args.size() == 7)
{
args.push_back(structure_literal);
}
else if (args.size() == 8)
{
args.back() = structure_literal;
}
}
}
ColumnsDescription TableFunctionAzureBlobStorage::getActualTableStructure(ContextPtr context, bool is_insert_query) const
@@ -202,7 +263,7 @@ ColumnsDescription TableFunctionAzureBlobStorage::getActualTableStructure(Contex
auto settings = StorageAzureBlob::createSettings(context);
auto object_storage = std::make_unique<AzureObjectStorage>("AzureBlobStorageTableFunction", std::move(client), std::move(settings));
return StorageAzureBlob::getTableStructureFromData(object_storage.get(), configuration, std::nullopt, context);
return StorageAzureBlob::getTableStructureFromData(object_storage.get(), configuration, std::nullopt, context, false);
}
return parseColumnsListFromString(configuration.structure, context);
@@ -234,6 +295,7 @@ StoragePtr TableFunctionAzureBlobStorage::executeImpl(const ASTPtr & /*ast_funct
String{},
/// No format_settings for table function Azure
std::nullopt,
/* distributed_processing */ false,
nullptr);
storage->startup();


@@ -13,13 +13,23 @@ namespace DB
class Context;
/* AzureBlob(source, [access_key_id, secret_access_key,] [format, structure, compression]) - creates a temporary storage for a file in AzureBlob.
/* azureBlobStorage(connection_string|storage_account_url, container_name, blobpath, [account_name, account_key,] [format, compression, structure]) - creates a temporary storage for a file in Azure Blob Storage.
*/
class TableFunctionAzureBlobStorage : public ITableFunction
{
public:
static constexpr auto name = "azureBlobStorage";
static constexpr auto signature = "- connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression, structure]\n";
static constexpr auto signature = " - connection_string, container_name, blobpath\n"
" - connection_string, container_name, blobpath, structure \n"
" - connection_string, container_name, blobpath, format \n"
" - connection_string, container_name, blobpath, format, compression \n"
" - connection_string, container_name, blobpath, format, compression, structure \n"
" - storage_account_url, container_name, blobpath, account_name, account_key\n"
" - storage_account_url, container_name, blobpath, account_name, account_key, structure\n"
" - storage_account_url, container_name, blobpath, account_name, account_key, format\n"
" - storage_account_url, container_name, blobpath, account_name, account_key, format, compression\n"
" - storage_account_url, container_name, blobpath, account_name, account_key, format, compression, structure\n";
static size_t getMaxNumberOfArguments() { return 8; }
@@ -46,7 +56,9 @@ public:
return {"_path", "_file"};
}
static StorageAzureBlob::Configuration parseArgumentsImpl(ASTs & args, const ContextPtr & context, bool get_format_from_file = true);
virtual void parseArgumentsImpl(ASTs & args, const ContextPtr & context);
static void addColumnsStructureToArguments(ASTs & args, const String & structure, const ContextPtr & context);
protected:


@@ -0,0 +1,85 @@
#include "config.h"
#if USE_AZURE_BLOB_STORAGE
#include <TableFunctions/TableFunctionAzureBlobStorageCluster.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Interpreters/parseColumnsListForTableFunction.h>
#include <Storages/StorageAzureBlob.h>
#include "registerTableFunctions.h"
#include <memory>
namespace DB
{
StoragePtr TableFunctionAzureBlobStorageCluster::executeImpl(
const ASTPtr & /*function*/, ContextPtr context,
const std::string & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const
{
StoragePtr storage;
ColumnsDescription columns;
bool structure_argument_was_provided = configuration.structure != "auto";
if (structure_argument_was_provided)
{
columns = parseColumnsListFromString(configuration.structure, context);
}
else if (!structure_hint.empty())
{
columns = structure_hint;
}
auto client = StorageAzureBlob::createClient(configuration, !is_insert_query);
auto settings = StorageAzureBlob::createSettings(context);
if (context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY)
{
/// On a worker node this filename won't contain globs
storage = std::make_shared<StorageAzureBlob>(
configuration,
std::make_unique<AzureObjectStorage>(table_name, std::move(client), std::move(settings)),
context,
StorageID(getDatabaseName(), table_name),
columns,
ConstraintsDescription{},
/* comment */String{},
/* format_settings */std::nullopt, /// No format_settings
/* distributed_processing */ true,
/*partition_by_=*/nullptr);
}
else
{
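/// On the initiator, create the cluster storage that expands globs and dispatches blob paths to worker nodes.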
storage = std::make_shared<StorageAzureBlobCluster>(
cluster_name,
configuration,
std::make_unique<AzureObjectStorage>(table_name, std::move(client), std::move(settings)),
StorageID(getDatabaseName(), table_name),
columns,
ConstraintsDescription{},
context,
structure_argument_was_provided);
}
storage->startup();
return storage;
}
void registerTableFunctionAzureBlobStorageCluster(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionAzureBlobStorageCluster>(
{.documentation
= {.description=R"(The table function can be used to read the data stored on Azure Blob Storage in parallel for many nodes in a specified cluster.)",
.examples{{"azureBlobStorageCluster", "SELECT * FROM azureBlobStorageCluster(cluster, connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression, structure])", ""}}},
.allow_readonly = false}
);
}
}
#endif


@@ -0,0 +1,55 @@
#pragma once
#include "config.h"
#if USE_AZURE_BLOB_STORAGE
#include <TableFunctions/ITableFunction.h>
#include <TableFunctions/TableFunctionAzureBlobStorage.h>
#include <TableFunctions/ITableFunctionCluster.h>
#include <Storages/StorageAzureBlobCluster.h>
namespace DB
{
class Context;
/**
* azureBlobStorageCluster(cluster_name, connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression, structure])
* A table function that allows processing many files from Azure Blob Storage on a specific cluster.
* On the initiator it creates a connection to _all_ nodes in the cluster, discloses asterisks
* in the Azure Blob Storage file path and dispatches each file dynamically.
* On a worker node it asks the initiator about the next task to process and processes it.
* This is repeated until all tasks are finished.
*/
class TableFunctionAzureBlobStorageCluster : public ITableFunctionCluster<TableFunctionAzureBlobStorage>
{
public:
static constexpr auto name = "azureBlobStorageCluster";
static constexpr auto signature = " - cluster, connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression, structure]";
String getName() const override
{
return name;
}
String getSignature() const override
{
return signature;
}
protected:
StoragePtr executeImpl(
const ASTPtr & ast_function,
ContextPtr context,
const std::string & table_name,
ColumnsDescription cached_columns,
bool is_insert_query) const override;
const char * getStorageTypeName() const override { return "AzureBlobStorageCluster"; }
};
}
#endif


@@ -75,6 +75,7 @@ void registerTableFunctions()
#if USE_AZURE_BLOB_STORAGE
registerTableFunctionAzureBlobStorage(factory);
registerTableFunctionAzureBlobStorageCluster(factory);
#endif


@@ -72,6 +72,7 @@ void registerTableFunctionExplain(TableFunctionFactory & factory);
#if USE_AZURE_BLOB_STORAGE
void registerTableFunctionAzureBlobStorage(TableFunctionFactory & factory);
void registerTableFunctionAzureBlobStorageCluster(TableFunctionFactory & factory);
#endif
void registerTableFunctions();


@@ -0,0 +1,39 @@
<clickhouse>
<remote_servers>
<simple_cluster>
<shard>
<replica>
<host>node_0</host>
<port>9000</port>
</replica>
<replica>
<host>node_1</host>
<port>9000</port>
</replica>
<replica>
<host>node_2</host>
<port>9000</port>
</replica>
</shard>
</simple_cluster>
<cluster_non_existent_port>
<shard>
<replica>
<host>node_0</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node_1</host>
<port>19000</port>
</replica>
</shard>
</cluster_non_existent_port>
</remote_servers>
<macros>
<default_cluster_macro>simple_cluster</default_cluster_macro>
</macros>
</clickhouse>


@@ -657,3 +657,55 @@ def test_read_from_not_existing_container(cluster):
query = f"select * from azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont_not_exists', 'test_table.csv', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', 'auto')"
expected_err_msg = "container does not exist"
assert expected_err_msg in node.query_and_get_error(query)
def test_function_signatures(cluster):
node = cluster.instances["node"]
connection_string = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite1:10000/devstoreaccount1;"
storage_account_url = "http://azurite1:10000/devstoreaccount1"
account_name = "devstoreaccount1"
account_key = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('{storage_account_url}', 'cont', 'test_signature.csv', '{account_name}', '{account_key}', 'CSV', 'auto', 'column1 UInt32') VALUES (1),(2),(3)",
)
# " - connection_string, container_name, blobpath\n"
query_1 = f"select * from azureBlobStorage('{connection_string}', 'cont', 'test_signature.csv')"
assert azure_query(node, query_1) == "1\n2\n3\n"
# " - connection_string, container_name, blobpath, structure \n"
query_2 = f"select * from azureBlobStorage('{connection_string}', 'cont', 'test_signature.csv', 'column1 UInt32')"
assert azure_query(node, query_2) == "1\n2\n3\n"
# " - connection_string, container_name, blobpath, format \n"
query_3 = f"select * from azureBlobStorage('{connection_string}', 'cont', 'test_signature.csv', 'CSV')"
assert azure_query(node, query_3) == "1\n2\n3\n"
# " - connection_string, container_name, blobpath, format, compression \n"
query_4 = f"select * from azureBlobStorage('{connection_string}', 'cont', 'test_signature.csv', 'CSV', 'auto')"
assert azure_query(node, query_4) == "1\n2\n3\n"
# " - connection_string, container_name, blobpath, format, compression, structure \n"
query_5 = f"select * from azureBlobStorage('{connection_string}', 'cont', 'test_signature.csv', 'CSV', 'auto', 'column1 UInt32')"
assert azure_query(node, query_5) == "1\n2\n3\n"
# " - storage_account_url, container_name, blobpath, account_name, account_key\n"
query_6 = f"select * from azureBlobStorage('{storage_account_url}', 'cont', 'test_signature.csv', '{account_name}', '{account_key}')"
assert azure_query(node, query_6) == "1\n2\n3\n"
# " - storage_account_url, container_name, blobpath, account_name, account_key, structure\n"
query_7 = f"select * from azureBlobStorage('{storage_account_url}', 'cont', 'test_signature.csv', '{account_name}', '{account_key}', 'column1 UInt32')"
assert azure_query(node, query_7) == "1\n2\n3\n"
# " - storage_account_url, container_name, blobpath, account_name, account_key, format\n"
query_8 = f"select * from azureBlobStorage('{storage_account_url}', 'cont', 'test_signature.csv', '{account_name}', '{account_key}', 'CSV')"
assert azure_query(node, query_8) == "1\n2\n3\n"
# " - storage_account_url, container_name, blobpath, account_name, account_key, format, compression\n"
query_9 = f"select * from azureBlobStorage('{storage_account_url}', 'cont', 'test_signature.csv', '{account_name}', '{account_key}', 'CSV', 'auto')"
assert azure_query(node, query_9) == "1\n2\n3\n"
# " - storage_account_url, container_name, blobpath, account_name, account_key, format, compression, structure\n"
query_10 = f"select * from azureBlobStorage('{storage_account_url}', 'cont', 'test_signature.csv', '{account_name}', '{account_key}', 'CSV', 'auto', 'column1 UInt32')"
assert azure_query(node, query_10) == "1\n2\n3\n"


@@ -0,0 +1,288 @@
#!/usr/bin/env python3
import gzip
import json
import logging
import os
import io
import random
import threading
import time
from azure.storage.blob import BlobServiceClient
import helpers.client
import pytest
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
from helpers.test_tools import TSV
from helpers.network import PartitionManager
from helpers.mock_servers import start_mock_servers
from helpers.test_tools import exec_query_with_retry
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"node_0",
main_configs=["configs/named_collections.xml", "configs/cluster.xml"],
user_configs=["configs/disable_profilers.xml", "configs/users.xml"],
with_azurite=True,
)
cluster.add_instance(
"node_1",
main_configs=["configs/named_collections.xml", "configs/cluster.xml"],
user_configs=["configs/disable_profilers.xml", "configs/users.xml"],
with_azurite=True,
)
cluster.add_instance(
"node_2",
main_configs=["configs/named_collections.xml", "configs/cluster.xml"],
user_configs=["configs/disable_profilers.xml", "configs/users.xml"],
with_azurite=True,
)
cluster.start()
yield cluster
finally:
cluster.shutdown()
def azure_query(node, query, try_num=3, settings={}):
for i in range(try_num):
try:
return node.query(query, settings=settings)
except Exception as ex:
retriable_errors = [
"DB::Exception: Azure::Core::Http::TransportException: Connection was closed by the server while trying to read a response"
]
retry = False
for error in retriable_errors:
if error in str(ex):
retry = True
print(f"Try num: {i}. Having retriable error: {ex}")
time.sleep(i)
break
if not retry or i == try_num - 1:
raise Exception(ex)
continue
def get_azure_file_content(filename):
container_name = "cont"
connection_string = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;"
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
container_client = blob_service_client.get_container_client(container_name)
blob_client = container_client.get_blob_client(filename)
download_stream = blob_client.download_blob()
return download_stream.readall().decode("utf-8")
def test_select_all(cluster):
node = cluster.instances["node_0"]
azure_query(
node,
"INSERT INTO TABLE FUNCTION azureBlobStorage("
"'http://azurite1:10000/devstoreaccount1', 'cont', 'test_cluster_select_all.csv', 'devstoreaccount1', "
"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', "
"'auto', 'key UInt64, data String') VALUES (1, 'a'), (2, 'b')",
)
print(get_azure_file_content("test_cluster_select_all.csv"))
pure_azure = node.query(
"""
SELECT * from azureBlobStorage(
'http://azurite1:10000/devstoreaccount1', 'cont', 'test_cluster_select_all.csv', 'devstoreaccount1',
'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV',
'auto')"""
)
print(pure_azure)
distributed_azure = node.query(
"""
SELECT * from azureBlobStorageCluster(
'simple_cluster', 'http://azurite1:10000/devstoreaccount1', 'cont', 'test_cluster_select_all.csv', 'devstoreaccount1',
'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV',
'auto')"""
)
print(distributed_azure)
assert TSV(pure_azure) == TSV(distributed_azure)
def test_count(cluster):
node = cluster.instances["node_0"]
azure_query(
node,
"INSERT INTO TABLE FUNCTION azureBlobStorage("
"'http://azurite1:10000/devstoreaccount1', 'cont', 'test_cluster_count.csv', 'devstoreaccount1', "
"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', "
"'auto', 'key UInt64') VALUES (1), (2)",
)
print(get_azure_file_content("test_cluster_count.csv"))
pure_azure = node.query(
"""
SELECT count(*) from azureBlobStorage(
'http://azurite1:10000/devstoreaccount1', 'cont', 'test_cluster_count.csv', 'devstoreaccount1',
'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV',
'auto', 'key UInt64')"""
)
print(pure_azure)
distributed_azure = node.query(
"""
SELECT count(*) from azureBlobStorageCluster(
'simple_cluster', 'http://azurite1:10000/devstoreaccount1', 'cont', 'test_cluster_count.csv', 'devstoreaccount1',
'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV',
'auto', 'key UInt64')"""
)
print(distributed_azure)
assert TSV(pure_azure) == TSV(distributed_azure)
def test_union_all(cluster):
node = cluster.instances["node_0"]
azure_query(
node,
"INSERT INTO TABLE FUNCTION azureBlobStorage("
"'http://azurite1:10000/devstoreaccount1', 'cont', 'test_parquet_union_all', 'devstoreaccount1', "
"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'Parquet', "
"'auto', 'a Int32, b String') VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')",
)
pure_azure = node.query(
"""
SELECT * FROM
(
SELECT * from azureBlobStorage(
'http://azurite1:10000/devstoreaccount1', 'cont', 'test_parquet_union_all', 'devstoreaccount1',
'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'Parquet',
'auto', 'a Int32, b String')
UNION ALL
SELECT * from azureBlobStorage(
'http://azurite1:10000/devstoreaccount1', 'cont', 'test_parquet_union_all', 'devstoreaccount1',
'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'Parquet',
'auto', 'a Int32, b String')
)
ORDER BY (a)
"""
)
azure_distributed = node.query(
"""
SELECT * FROM
(
SELECT * from azureBlobStorageCluster(
'simple_cluster',
'http://azurite1:10000/devstoreaccount1', 'cont', 'test_parquet_union_all', 'devstoreaccount1',
'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'Parquet',
'auto', 'a Int32, b String')
UNION ALL
SELECT * from azureBlobStorageCluster(
'simple_cluster',
'http://azurite1:10000/devstoreaccount1', 'cont', 'test_parquet_union_all', 'devstoreaccount1',
'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'Parquet',
'auto', 'a Int32, b String')
)
ORDER BY (a)
"""
)
assert TSV(pure_azure) == TSV(azure_distributed)
def test_skip_unavailable_shards(cluster):
node = cluster.instances["node_0"]
azure_query(
node,
"INSERT INTO TABLE FUNCTION azureBlobStorage("
"'http://azurite1:10000/devstoreaccount1', 'cont', 'test_skip_unavailable.csv', 'devstoreaccount1', "
"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', "
"'auto', 'a UInt64') VALUES (1), (2)",
)
result = node.query(
"""
SELECT count(*) from azureBlobStorageCluster(
'cluster_non_existent_port',
'http://azurite1:10000/devstoreaccount1', 'cont', 'test_skip_unavailable.csv', 'devstoreaccount1',
'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==')
SETTINGS skip_unavailable_shards = 1
"""
)
assert result == "2\n"
def test_unset_skip_unavailable_shards(cluster):
# Although skip_unavailable_shards is not set, cluster table functions should always skip unavailable shards.
node = cluster.instances["node_0"]
azure_query(
node,
"INSERT INTO TABLE FUNCTION azureBlobStorage("
"'http://azurite1:10000/devstoreaccount1', 'cont', 'test_unset_skip_unavailable.csv', 'devstoreaccount1', "
"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', "
"'auto', 'a UInt64') VALUES (1), (2)",
)
result = node.query(
"""
SELECT count(*) from azureBlobStorageCluster(
'cluster_non_existent_port',
'http://azurite1:10000/devstoreaccount1', 'cont', 'test_unset_skip_unavailable.csv', 'devstoreaccount1',
'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==')
"""
)
assert result == "2\n"
def test_cluster_with_named_collection(cluster):
node = cluster.instances["node_0"]
azure_query(
node,
"INSERT INTO TABLE FUNCTION azureBlobStorage("
"'http://azurite1:10000/devstoreaccount1', 'cont', 'test_cluster_with_named_collection.csv', 'devstoreaccount1', "
"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'auto', "
"'auto', 'a UInt64') VALUES (1), (2)",
)
pure_azure = node.query(
"""
SELECT * from azureBlobStorage(
'http://azurite1:10000/devstoreaccount1', 'cont', 'test_cluster_with_named_collection.csv', 'devstoreaccount1',
'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==')
"""
)
azure_cluster = node.query(
"""
SELECT * from azureBlobStorageCluster(
'simple_cluster', azure_conf2, container='cont', blob_path='test_cluster_with_named_collection.csv')
"""
)
assert TSV(pure_azure) == TSV(azure_cluster)
def test_partition_parallel_reading_with_cluster(cluster):
node = cluster.instances["node_0"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
partition_by = "column3"
values = "(1, 2, 3), (3, 2, 1), (78, 43, 45)"
filename = "test_tf_{_partition_id}.csv"
azure_query(
node,
f"INSERT INTO TABLE FUNCTION azureBlobStorage('http://azurite1:10000/devstoreaccount1', 'cont', '{filename}', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', 'auto', '{table_format}') PARTITION BY {partition_by} VALUES {values}",
)
assert "1,2,3\n" == get_azure_file_content("test_tf_3.csv")
assert "3,2,1\n" == get_azure_file_content("test_tf_1.csv")
assert "78,43,45\n" == get_azure_file_content("test_tf_45.csv")
azure_cluster = node.query(
"""
SELECT count(*) from azureBlobStorageCluster(
'simple_cluster',
azure_conf2, container='cont', blob_path='test_tf_*.csv', format='CSV', compression='auto', structure='column1 UInt32, column2 UInt32, column3 UInt32')
"""
)
assert azure_cluster == "3\n"


@@ -1096,6 +1096,7 @@ avro
avx
aws
azureBlobStorage
azureBlobStorageCluster
backend
backoff
backticks