Added first draft of azure blob storage cluster

This commit is contained in:
Smita Kulkarni 2023-06-08 09:26:30 +02:00
parent caabbfd5b1
commit 6328811097
11 changed files with 463 additions and 12 deletions

View File

@ -136,7 +136,6 @@ private:
const String & format_name,
const ContextPtr & ctx);
};
class StorageAzureBlobSource : public ISource, WithContext

View File

@ -0,0 +1,105 @@
#include "Storages/StorageAzureBlobCluster.h"
#include "config.h"
#if USE_AZURE_BLOB_STORAGE
#include <DataTypes/DataTypeString.h>
#include <IO/ConnectionTimeouts.h>
#include <Interpreters/AddDefaultDatabaseVisitor.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Processors/Sources/RemoteSource.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
#include <Storages/IStorage.h>
#include <Storages/StorageURL.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageDictionary.h>
#include <Storages/extractTableFunctionArgumentsFromSelectQuery.h>
#include <Storages/getVirtualsForStorage.h>
#include <Common/Exception.h>
#include <Parsers/queryToString.h>
#include <TableFunctions/TableFunctionAzureBlobStorageCluster.h>
#include <memory>
#include <string>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
StorageAzureBlobCluster::StorageAzureBlobCluster(
const String & cluster_name_,
const StorageAzureBlob::Configuration & configuration_,
std::unique_ptr<AzureObjectStorage> && object_storage_,
const StorageID & table_id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
ContextPtr context_,
bool structure_argument_was_provided_)
: IStorageCluster(cluster_name_, table_id_, &Poco::Logger::get("StorageAzureBlobCluster (" + table_id_.table_name + ")"), structure_argument_was_provided_)
, configuration{configuration_}
, object_storage(std::move(object_storage_))
{
context_->getGlobalContext()->getRemoteHostFilter().checkURL(configuration_.getConnectionURL());
StorageInMemoryMetadata storage_metadata;
updateConfigurationIfChanged(context_);
if (columns_.empty())
{
/// `format_settings` is set to std::nullopt, because StorageAzureBlobCluster is used only as table function
auto columns = StorageAzureBlob::getTableStructureFromData(object_storage.get(), configuration, /*format_settings=*/std::nullopt, context_);
storage_metadata.setColumns(columns);
}
else
storage_metadata.setColumns(columns_);
storage_metadata.setConstraints(constraints_);
setInMemoryMetadata(storage_metadata);
auto default_virtuals = NamesAndTypesList{
{"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
auto columns = storage_metadata.getSampleBlock().getNamesAndTypesList();
virtual_columns = getVirtualsForStorage(columns, default_virtuals);
for (const auto & column : virtual_columns)
virtual_block.insert({column.type->createColumn(), column.type, column.name});
}
void StorageAzureBlobCluster::addColumnsStructureToQuery(ASTPtr & query, const String & structure, const ContextPtr & context)
{
ASTExpressionList * expression_list = extractTableFunctionArgumentsFromSelectQuery(query);
if (!expression_list)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query from table function s3Cluster, got '{}'", queryToString(query));
TableFunctionAzureBlobStorageCluster::addColumnsStructureToArguments(expression_list->children, structure, context);
}
void StorageAzureBlobCluster::updateConfigurationIfChanged(ContextPtr /*local_context*/)
{
// configuration.update(local_context);
}
RemoteQueryExecutor::Extension StorageAzureBlobCluster::getTaskIteratorExtension(ASTPtr query, const ContextPtr & context) const
{
auto iterator = std::make_shared<StorageAzureBlobSource::Iterator>(
object_storage.get(), configuration.container, std::nullopt,
configuration.blob_path, query, virtual_block, context, nullptr);
auto callback = std::make_shared<std::function<String()>>([iterator]() mutable -> String { return iterator->next().relative_path; });
return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) };
}
NamesAndTypesList StorageAzureBlobCluster::getVirtuals() const
{
return virtual_columns;
}
}
#endif

View File

@ -0,0 +1,56 @@
#pragma once
#include "config.h"
#if USE_AZURE_BLOB_STORAGE
#include <memory>
#include <optional>
#include "Client/Connection.h"
#include <Interpreters/Cluster.h>
#include <Storages/IStorageCluster.h>
#include <Storages/StorageAzureBlob.h>
namespace DB
{
class Context;
class StorageAzureBlobCluster : public IStorageCluster
{
public:
StorageAzureBlobCluster(
const String & cluster_name_,
const StorageAzureBlob::Configuration & configuration_,
std::unique_ptr<AzureObjectStorage> && object_storage_,
const StorageID & table_id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
ContextPtr context_,
bool structure_argument_was_provided_);
std::string getName() const override { return "AzureBlobStorageCluster"; }
NamesAndTypesList getVirtuals() const override;
RemoteQueryExecutor::Extension getTaskIteratorExtension(ASTPtr query, const ContextPtr & context) const override;
protected:
void updateConfigurationIfChanged(ContextPtr local_context);
private:
void updateBeforeRead(const ContextPtr & context) override { updateConfigurationIfChanged(context); }
void addColumnsStructureToQuery(ASTPtr & query, const String & structure, const ContextPtr & context) override;
StorageAzureBlob::Configuration configuration;
NamesAndTypesList virtual_columns;
Block virtual_block;
std::unique_ptr<AzureObjectStorage> object_storage;
};
}
#endif

View File

@ -44,10 +44,8 @@ bool isConnectionString(const std::string & candidate)
}
StorageAzureBlob::Configuration TableFunctionAzureBlobStorage::parseArgumentsImpl(ASTs & engine_args, const ContextPtr & local_context, bool get_format_from_file)
void TableFunctionAzureBlobStorage::parseArgumentsImpl(ASTs & engine_args, const ContextPtr & local_context)
{
StorageAzureBlob::Configuration configuration;
/// Supported signatures:
///
/// AzureBlobStorage(connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression, structure])
@ -59,10 +57,8 @@ StorageAzureBlob::Configuration TableFunctionAzureBlobStorage::parseArgumentsImp
configuration.blobs_paths = {configuration.blob_path};
if (configuration.format == "auto" && get_format_from_file)
if (configuration.format == "auto")
configuration.format = FormatFactory::instance().getFormatFromFileName(configuration.blob_path, true);
return configuration;
}
if (engine_args.size() < 3 || engine_args.size() > 8)
@ -172,10 +168,8 @@ StorageAzureBlob::Configuration TableFunctionAzureBlobStorage::parseArgumentsImp
configuration.blobs_paths = {configuration.blob_path};
if (configuration.format == "auto" && get_format_from_file)
if (configuration.format == "auto")
configuration.format = FormatFactory::instance().getFormatFromFileName(configuration.blob_path, true);
return configuration;
}
void TableFunctionAzureBlobStorage::parseArguments(const ASTPtr & ast_function, ContextPtr context)
@ -190,9 +184,44 @@ void TableFunctionAzureBlobStorage::parseArguments(const ASTPtr & ast_function,
auto & args = args_func.at(0)->children;
configuration = parseArgumentsImpl(args, context);
parseArgumentsImpl(args, context);
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "CONFIGURATION {}", configuration.connection_url);
}
void TableFunctionAzureBlobStorage::addColumnsStructureToArguments(ASTs & args, const String & structure, const ContextPtr & context)
{
if (tryGetNamedCollectionWithOverrides(args, context))
{
/// In case of named collection, just add key-value pair "structure='...'"
/// at the end of arguments to override existed structure.
ASTs equal_func_args = {std::make_shared<ASTIdentifier>("structure"), std::make_shared<ASTLiteral>(structure)};
auto equal_func = makeASTFunction("equals", std::move(equal_func_args));
args.push_back(equal_func);
}
else
{
if (args.size() < 3 || args.size() > 8)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Storage Azure requires 3 to 7 arguments: "
"AzureBlobStorage(connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression, structure])");
auto structure_literal = std::make_shared<ASTLiteral>(structure);
if (args.size() == 3)
{
/// Add format=auto before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
}
else if (args.size() == 4)
{
args.push_back(structure_literal);
}
}
}
ColumnsDescription TableFunctionAzureBlobStorage::getActualTableStructure(ContextPtr context) const
{
if (configuration.structure == "auto")

View File

@ -46,7 +46,9 @@ public:
return {"_path", "_file"};
}
static StorageAzureBlob::Configuration parseArgumentsImpl(ASTs & args, const ContextPtr & context, bool get_format_from_file = true);
virtual void parseArgumentsImpl(ASTs & args, const ContextPtr & context);
static void addColumnsStructureToArguments(ASTs & args, const String & structure, const ContextPtr & context);
protected:

View File

@ -0,0 +1,79 @@
#include "config.h"
#if USE_AZURE_BLOB_STORAGE
#include <TableFunctions/TableFunctionAzureBlobStorageCluster.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Interpreters/parseColumnsListForTableFunction.h>
#include <Storages/StorageAzureBlob.h>
#include "registerTableFunctions.h"
#include <memory>
namespace DB
{
StoragePtr TableFunctionAzureBlobStorageCluster::executeImpl(
const ASTPtr & /*function*/, ContextPtr context,
const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
StoragePtr storage;
ColumnsDescription columns;
bool structure_argument_was_provided = configuration.structure != "auto";
if (structure_argument_was_provided)
{
columns = parseColumnsListFromString(configuration.structure, context);
}
else if (!structure_hint.empty())
{
columns = structure_hint;
}
auto client = StorageAzureBlob::createClient(configuration);
auto settings = StorageAzureBlob::createSettings(context);
if (context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY)
{
/// On worker node this filename won't contains globs
storage = std::make_shared<StorageAzureBlob>(
configuration,
std::make_unique<AzureObjectStorage>("AzureBlobStorageTableFunction", std::move(client), std::move(settings)),
context,
StorageID(getDatabaseName(), table_name),
columns,
ConstraintsDescription{},
/* comment */String{},
/* format_settings */std::nullopt, /// No format_settings for S3Cluster
/*partition_by_=*/nullptr);
}
else
{
storage = std::make_shared<StorageAzureBlobCluster>(
cluster_name,
configuration,
std::make_unique<AzureObjectStorage>("AzureBlobStorageTableFunction", std::move(client), std::move(settings)),
StorageID(getDatabaseName(), table_name),
columns,
ConstraintsDescription{},
context,
structure_argument_was_provided);
}
storage->startup();
return storage;
}
void registerTableFunctionAzureBlobStorageCluster(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionAzureBlobStorageCluster>();
}
}
#endif

View File

@ -0,0 +1,54 @@
#pragma once
#include "config.h"
#if USE_AZURE_BLOB_STORAGE
#include <TableFunctions/ITableFunction.h>
#include <TableFunctions/TableFunctionAzureBlobStorage.h>
#include <TableFunctions/ITableFunctionCluster.h>
#include <Storages/StorageAzureBlobCluster.h>
namespace DB
{
class Context;
/**
* azure_blob_storage_cluster(cluster_name, source, [access_key_id, secret_access_key,] format, structure, compression_method)
* A table function, which allows to process many files from Azure Blob Storage on a specific cluster
* On initiator it creates a connection to _all_ nodes in cluster, discloses asterisks
* in Azure Blob Storage file path and dispatch each file dynamically.
* On worker node it asks initiator about next task to process, processes it.
* This is repeated until the tasks are finished.
*/
class TableFunctionAzureBlobStorageCluster : public ITableFunctionCluster<TableFunctionAzureBlobStorage>
{
public:
static constexpr auto name = "azure_blob_storage_cluster";
static constexpr auto signature = " - cluster, connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression, structure]";
String getName() const override
{
return name;
}
String getSignature() const override
{
return signature;
}
protected:
StoragePtr executeImpl(
const ASTPtr & ast_function,
ContextPtr context,
const std::string & table_name,
ColumnsDescription cached_columns) const override;
const char * getStorageTypeName() const override { return "AzureBlobStorageCluster"; }
};
}
#endif

View File

@ -74,6 +74,7 @@ void registerTableFunctions()
#if USE_AZURE_BLOB_STORAGE
registerTableFunctionAzureBlobStorage(factory);
registerTableFunctionAzureBlobStorageCluster(factory);
#endif

View File

@ -71,6 +71,7 @@ void registerTableFunctionExplain(TableFunctionFactory & factory);
#if USE_AZURE_BLOB_STORAGE
void registerTableFunctionAzureBlobStorage(TableFunctionFactory & factory);
void registerTableFunctionAzureBlobStorageCluster(TableFunctionFactory & factory);
#endif
void registerTableFunctions();

View File

@ -0,0 +1,23 @@
<clickhouse>
<remote_servers>
<simple_cluster>
<shard>
<replica>
<host>node_0</host>
<port>9000</port>
</replica>
<replica>
<host>node_1</host>
<port>9000</port>
</replica>
<replica>
<host>node_2</host>
<port>9000</port>
</replica>
</shard>
</simple_cluster>
</remote_servers>
<macros>
<default_cluster_macro>simple_cluster</default_cluster_macro>
</macros>
</clickhouse>

View File

@ -0,0 +1,102 @@
#!/usr/bin/env python3
import gzip
import json
import logging
import os
import io
import random
import threading
import time
from azure.storage.blob import BlobServiceClient
import helpers.client
import pytest
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
from helpers.test_tools import TSV
from helpers.network import PartitionManager
from helpers.mock_servers import start_mock_servers
from helpers.test_tools import exec_query_with_retry
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"node_0",
main_configs=["configs/named_collections.xml", "configs/cluster.xml"],
with_azurite=True,
)
cluster.add_instance(
"node_1",
main_configs=["configs/named_collections.xml", "configs/cluster.xml"],
with_azurite=True,
)
cluster.start()
yield cluster
finally:
cluster.shutdown()
def azure_query(node, query, try_num=3, settings={}):
for i in range(try_num):
try:
return node.query(query, settings=settings)
except Exception as ex:
retriable_errors = [
"DB::Exception: Azure::Core::Http::TransportException: Connection was closed by the server while trying to read a response"
]
retry = False
for error in retriable_errors:
if error in str(ex):
retry = True
print(f"Try num: {i}. Having retriable error: {ex}")
time.sleep(i)
break
if not retry or i == try_num - 1:
raise Exception(ex)
continue
def get_azure_file_content(filename):
container_name = "cont"
connection_string = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;"
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
container_client = blob_service_client.get_container_client(container_name)
blob_client = container_client.get_blob_client(filename)
download_stream = blob_client.download_blob()
return download_stream.readall().decode("utf-8")
def test_simple_write_account_string_table_function(cluster):
node = cluster.instances["node_0"]
azure_query(
node,
"INSERT INTO TABLE FUNCTION azure_blob_storage("
"'http://azurite1:10000/devstoreaccount1', 'cont', 'test_simple_write_tf.csv', 'devstoreaccount1', "
"'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', "
"'auto', 'key UInt64, data String') VALUES (1, 'a')",
)
print(get_azure_file_content("test_simple_write_tf.csv"))
assert get_azure_file_content("test_simple_write_tf.csv") == '1,"a"\n'
pure_azure = node.query(
"""
SELECT * from azure_blob_storage(
'http://azurite1:10000/devstoreaccount1', 'cont', 'test_simple_write_tf.csv', 'devstoreaccount1',
'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV',
'auto', 'key UInt64, data String')"""
)
print(pure_azure)
distributed_azure = node.query(
"""
SELECT * from azure_blob_storage_cluster(
'simple_cluster', 'http://azurite1:10000/devstoreaccount1', 'cont', 'test_simple_write_tf.csv', 'devstoreaccount1',
'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV',
'auto', 'key UInt64, data String')"""
)
print(distributed_azure)
assert TSV(pure_azure) == TSV(distributed_azure)