From 6328811097204e87abc31973adcec84ddf8f0fc4 Mon Sep 17 00:00:00 2001 From: Smita Kulkarni Date: Thu, 8 Jun 2023 09:26:30 +0200 Subject: [PATCH] Added first draft of azure blob storage cluster --- src/Storages/StorageAzureBlob.h | 1 - src/Storages/StorageAzureBlobCluster.cpp | 105 ++++++++++++++++++ src/Storages/StorageAzureBlobCluster.h | 56 ++++++++++ .../TableFunctionAzureBlobStorage.cpp | 49 ++++++-- .../TableFunctionAzureBlobStorage.h | 4 +- .../TableFunctionAzureBlobStorageCluster.cpp | 79 +++++++++++++ .../TableFunctionAzureBlobStorageCluster.h | 54 +++++++++ src/TableFunctions/registerTableFunctions.cpp | 1 + src/TableFunctions/registerTableFunctions.h | 1 + .../configs/cluster.xml | 23 ++++ .../test_cluster.py | 102 +++++++++++++++++ 11 files changed, 463 insertions(+), 12 deletions(-) create mode 100644 src/Storages/StorageAzureBlobCluster.cpp create mode 100644 src/Storages/StorageAzureBlobCluster.h create mode 100644 src/TableFunctions/TableFunctionAzureBlobStorageCluster.cpp create mode 100644 src/TableFunctions/TableFunctionAzureBlobStorageCluster.h create mode 100644 tests/integration/test_storage_azure_blob_storage/configs/cluster.xml create mode 100644 tests/integration/test_storage_azure_blob_storage/test_cluster.py diff --git a/src/Storages/StorageAzureBlob.h b/src/Storages/StorageAzureBlob.h index 1f91e47ddbe..cbea0b1e26c 100644 --- a/src/Storages/StorageAzureBlob.h +++ b/src/Storages/StorageAzureBlob.h @@ -136,7 +136,6 @@ private: const String & format_name, const ContextPtr & ctx); - }; class StorageAzureBlobSource : public ISource, WithContext diff --git a/src/Storages/StorageAzureBlobCluster.cpp b/src/Storages/StorageAzureBlobCluster.cpp new file mode 100644 index 00000000000..203c8cbc12d --- /dev/null +++ b/src/Storages/StorageAzureBlobCluster.cpp @@ -0,0 +1,105 @@ +#include "Storages/StorageAzureBlobCluster.h" + +#include "config.h" + +#if USE_AZURE_BLOB_STORAGE + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + +StorageAzureBlobCluster::StorageAzureBlobCluster( + const String & cluster_name_, + const StorageAzureBlob::Configuration & configuration_, + std::unique_ptr && object_storage_, + const StorageID & table_id_, + const ColumnsDescription & columns_, + const ConstraintsDescription & constraints_, + ContextPtr context_, + bool structure_argument_was_provided_) + : IStorageCluster(cluster_name_, table_id_, &Poco::Logger::get("StorageAzureBlobCluster (" + table_id_.table_name + ")"), structure_argument_was_provided_) + , configuration{configuration_} + , object_storage(std::move(object_storage_)) +{ + context_->getGlobalContext()->getRemoteHostFilter().checkURL(configuration_.getConnectionURL()); + StorageInMemoryMetadata storage_metadata; + updateConfigurationIfChanged(context_); + + if (columns_.empty()) + { + /// `format_settings` is set to std::nullopt, because StorageAzureBlobCluster is used only as table function + auto columns = StorageAzureBlob::getTableStructureFromData(object_storage.get(), configuration, /*format_settings=*/std::nullopt, context_); + storage_metadata.setColumns(columns); + } + else + storage_metadata.setColumns(columns_); + + storage_metadata.setConstraints(constraints_); + setInMemoryMetadata(storage_metadata); + + auto default_virtuals = NamesAndTypesList{ + {"_path", std::make_shared(std::make_shared())}, + {"_file", std::make_shared(std::make_shared())}}; + + auto columns = storage_metadata.getSampleBlock().getNamesAndTypesList(); + virtual_columns = getVirtualsForStorage(columns, default_virtuals); + for (const auto & column : virtual_columns) + virtual_block.insert({column.type->createColumn(), column.type, column.name}); +} + +void StorageAzureBlobCluster::addColumnsStructureToQuery(ASTPtr & query, const String & structure, const ContextPtr & context) +{ + ASTExpressionList * expression_list = extractTableFunctionArgumentsFromSelectQuery(query); + if (!expression_list) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query from table function s3Cluster, got '{}'", queryToString(query)); + + TableFunctionAzureBlobStorageCluster::addColumnsStructureToArguments(expression_list->children, structure, context); +} + +void StorageAzureBlobCluster::updateConfigurationIfChanged(ContextPtr /*local_context*/) +{ +// configuration.update(local_context); +} + +RemoteQueryExecutor::Extension StorageAzureBlobCluster::getTaskIteratorExtension(ASTPtr query, const ContextPtr & context) const +{ + auto iterator = std::make_shared( + object_storage.get(), configuration.container, std::nullopt, + configuration.blob_path, query, virtual_block, context, nullptr); + auto callback = std::make_shared>([iterator]() mutable -> String { return iterator->next().relative_path; }); + return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) }; +} + +NamesAndTypesList StorageAzureBlobCluster::getVirtuals() const +{ + return virtual_columns; +} + + +} + +#endif diff --git a/src/Storages/StorageAzureBlobCluster.h b/src/Storages/StorageAzureBlobCluster.h new file mode 100644 index 00000000000..015452e641a --- /dev/null +++ b/src/Storages/StorageAzureBlobCluster.h @@ -0,0 +1,56 @@ +#pragma once + +#include "config.h" + +#if USE_AZURE_BLOB_STORAGE + +#include +#include + +#include "Client/Connection.h" +#include +#include +#include + +namespace DB +{ + +class Context; + +class StorageAzureBlobCluster : public IStorageCluster +{ +public: + StorageAzureBlobCluster( + const String & cluster_name_, + const StorageAzureBlob::Configuration & configuration_, + std::unique_ptr && object_storage_, + const StorageID & table_id_, + const ColumnsDescription & columns_, + const ConstraintsDescription & constraints_, + ContextPtr context_, + bool structure_argument_was_provided_); + + std::string getName() const override { return "AzureBlobStorageCluster"; } + + NamesAndTypesList getVirtuals() const override; + + RemoteQueryExecutor::Extension getTaskIteratorExtension(ASTPtr query, const ContextPtr & context) const override; + +protected: + void updateConfigurationIfChanged(ContextPtr local_context); + +private: + void updateBeforeRead(const ContextPtr & context) override { updateConfigurationIfChanged(context); } + + void addColumnsStructureToQuery(ASTPtr & query, const String & structure, const ContextPtr & context) override; + + StorageAzureBlob::Configuration configuration; + NamesAndTypesList virtual_columns; + Block virtual_block; + std::unique_ptr object_storage; +}; + + +} + +#endif diff --git a/src/TableFunctions/TableFunctionAzureBlobStorage.cpp b/src/TableFunctions/TableFunctionAzureBlobStorage.cpp index 38d9362894a..1b29e313c50 100644 --- a/src/TableFunctions/TableFunctionAzureBlobStorage.cpp +++ b/src/TableFunctions/TableFunctionAzureBlobStorage.cpp @@ -44,10 +44,8 @@ bool isConnectionString(const std::string & candidate) } -StorageAzureBlob::Configuration TableFunctionAzureBlobStorage::parseArgumentsImpl(ASTs & engine_args, const ContextPtr & local_context, bool get_format_from_file) +void TableFunctionAzureBlobStorage::parseArgumentsImpl(ASTs & engine_args, const ContextPtr & local_context) { - StorageAzureBlob::Configuration configuration; - /// Supported signatures: /// /// AzureBlobStorage(connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression, structure]) @@ -59,10 +57,8 @@ StorageAzureBlob::Configuration TableFunctionAzureBlobStorage::parseArgumentsImp configuration.blobs_paths = {configuration.blob_path}; - if (configuration.format == "auto" && get_format_from_file) + if (configuration.format == "auto") configuration.format = FormatFactory::instance().getFormatFromFileName(configuration.blob_path, true); - - return configuration; } if (engine_args.size() < 3 || engine_args.size() > 8) @@ -172,10 +168,8 @@ StorageAzureBlob::Configuration TableFunctionAzureBlobStorage::parseArgumentsImp configuration.blobs_paths = {configuration.blob_path}; - if (configuration.format == "auto" && get_format_from_file) + if (configuration.format == "auto") configuration.format = FormatFactory::instance().getFormatFromFileName(configuration.blob_path, true); - - return configuration; } void TableFunctionAzureBlobStorage::parseArguments(const ASTPtr & ast_function, ContextPtr context) @@ -190,9 +184,44 @@ void TableFunctionAzureBlobStorage::parseArguments(const ASTPtr & ast_function, auto & args = args_func.at(0)->children; - configuration = parseArgumentsImpl(args, context); + parseArgumentsImpl(args, context); + LOG_DEBUG(&Poco::Logger::get("DEBUG"), "CONFIGURATION {}", configuration.connection_url); } + +void TableFunctionAzureBlobStorage::addColumnsStructureToArguments(ASTs & args, const String & structure, const ContextPtr & context) +{ + if (tryGetNamedCollectionWithOverrides(args, context)) + { + /// In case of named collection, just add key-value pair "structure='...'" + /// at the end of arguments to override existed structure. + ASTs equal_func_args = {std::make_shared("structure"), std::make_shared(structure)}; + auto equal_func = makeASTFunction("equals", std::move(equal_func_args)); + args.push_back(equal_func); + } + else + { + if (args.size() < 3 || args.size() > 8) + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, + "Storage Azure requires 3 to 7 arguments: " + "AzureBlobStorage(connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression, structure])"); + + auto structure_literal = std::make_shared(structure); + + if (args.size() == 3) + { + /// Add format=auto before structure argument. + args.push_back(std::make_shared("auto")); + args.push_back(structure_literal); + } + else if (args.size() == 4) + { + args.push_back(structure_literal); + } + } +} + + ColumnsDescription TableFunctionAzureBlobStorage::getActualTableStructure(ContextPtr context) const { if (configuration.structure == "auto") diff --git a/src/TableFunctions/TableFunctionAzureBlobStorage.h b/src/TableFunctions/TableFunctionAzureBlobStorage.h index 0bb872de3f3..a473b969a20 100644 --- a/src/TableFunctions/TableFunctionAzureBlobStorage.h +++ b/src/TableFunctions/TableFunctionAzureBlobStorage.h @@ -46,7 +46,9 @@ public: return {"_path", "_file"}; } - static StorageAzureBlob::Configuration parseArgumentsImpl(ASTs & args, const ContextPtr & context, bool get_format_from_file = true); + virtual void parseArgumentsImpl(ASTs & args, const ContextPtr & context); + + static void addColumnsStructureToArguments(ASTs & args, const String & structure, const ContextPtr & context); protected: diff --git a/src/TableFunctions/TableFunctionAzureBlobStorageCluster.cpp b/src/TableFunctions/TableFunctionAzureBlobStorageCluster.cpp new file mode 100644 index 00000000000..47b03e30621 --- /dev/null +++ b/src/TableFunctions/TableFunctionAzureBlobStorageCluster.cpp @@ -0,0 +1,79 @@ +#include "config.h" + +#if USE_AZURE_BLOB_STORAGE + +#include +#include +#include +#include + +#include "registerTableFunctions.h" + +#include + + +namespace DB +{ + +StoragePtr TableFunctionAzureBlobStorageCluster::executeImpl( + const ASTPtr & /*function*/, ContextPtr context, + const std::string & table_name, ColumnsDescription /*cached_columns*/) const +{ + StoragePtr storage; + ColumnsDescription columns; + bool structure_argument_was_provided = configuration.structure != "auto"; + + if (structure_argument_was_provided) + { + columns = parseColumnsListFromString(configuration.structure, context); + } + else if (!structure_hint.empty()) + { + columns = structure_hint; + } + + auto client = StorageAzureBlob::createClient(configuration); + auto settings = StorageAzureBlob::createSettings(context); + + if (context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY) + { + /// On worker node this filename won't contains globs + storage = std::make_shared( + configuration, + std::make_unique("AzureBlobStorageTableFunction", std::move(client), std::move(settings)), + context, + StorageID(getDatabaseName(), table_name), + columns, + ConstraintsDescription{}, + /* comment */String{}, + /* format_settings */std::nullopt, /// No format_settings for S3Cluster + /*partition_by_=*/nullptr); + } + else + { + storage = std::make_shared( + cluster_name, + configuration, + std::make_unique("AzureBlobStorageTableFunction", std::move(client), std::move(settings)), + StorageID(getDatabaseName(), table_name), + columns, + ConstraintsDescription{}, + context, + structure_argument_was_provided); + } + + storage->startup(); + + return storage; +} + + +void registerTableFunctionAzureBlobStorageCluster(TableFunctionFactory & factory) +{ + factory.registerFunction(); +} + + +} + +#endif diff --git a/src/TableFunctions/TableFunctionAzureBlobStorageCluster.h b/src/TableFunctions/TableFunctionAzureBlobStorageCluster.h new file mode 100644 index 00000000000..af4f57f235e --- /dev/null +++ b/src/TableFunctions/TableFunctionAzureBlobStorageCluster.h @@ -0,0 +1,54 @@ +#pragma once + +#include "config.h" + +#if USE_AZURE_BLOB_STORAGE + +#include +#include +#include +#include + + +namespace DB +{ + +class Context; + +/** + * azure_blob_storage_cluster(cluster_name, source, [access_key_id, secret_access_key,] format, structure, compression_method) + * A table function, which allows to process many files from Azure Blob Storage on a specific cluster + * On initiator it creates a connection to _all_ nodes in cluster, discloses asterisks + * in Azure Blob Storage file path and dispatch each file dynamically. + * On worker node it asks initiator about next task to process, processes it. + * This is repeated until the tasks are finished. + */ +class TableFunctionAzureBlobStorageCluster : public ITableFunctionCluster +{ +public: + static constexpr auto name = "azure_blob_storage_cluster"; + static constexpr auto signature = " - cluster, connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression, structure]"; + + String getName() const override + { + return name; + } + + String getSignature() const override + { + return signature; + } + +protected: + StoragePtr executeImpl( + const ASTPtr & ast_function, + ContextPtr context, + const std::string & table_name, + ColumnsDescription cached_columns) const override; + + const char * getStorageTypeName() const override { return "AzureBlobStorageCluster"; } +}; + +} + +#endif diff --git a/src/TableFunctions/registerTableFunctions.cpp b/src/TableFunctions/registerTableFunctions.cpp index 0499524a912..8b684c102fa 100644 --- a/src/TableFunctions/registerTableFunctions.cpp +++ b/src/TableFunctions/registerTableFunctions.cpp @@ -74,6 +74,7 @@ void registerTableFunctions() #if USE_AZURE_BLOB_STORAGE registerTableFunctionAzureBlobStorage(factory); + registerTableFunctionAzureBlobStorageCluster(factory); #endif diff --git a/src/TableFunctions/registerTableFunctions.h b/src/TableFunctions/registerTableFunctions.h index 393bc080a3d..2e5ef926984 100644 --- a/src/TableFunctions/registerTableFunctions.h +++ b/src/TableFunctions/registerTableFunctions.h @@ -71,6 +71,7 @@ void registerTableFunctionExplain(TableFunctionFactory & factory); #if USE_AZURE_BLOB_STORAGE void registerTableFunctionAzureBlobStorage(TableFunctionFactory & factory); +void registerTableFunctionAzureBlobStorageCluster(TableFunctionFactory & factory); #endif void registerTableFunctions(); diff --git a/tests/integration/test_storage_azure_blob_storage/configs/cluster.xml b/tests/integration/test_storage_azure_blob_storage/configs/cluster.xml new file mode 100644 index 00000000000..43df7b46f3f --- /dev/null +++ b/tests/integration/test_storage_azure_blob_storage/configs/cluster.xml @@ -0,0 +1,23 @@ + + + + + + node_0 + 9000 + + + node_1 + 9000 + + + node_2 + 9000 + + + + + + simple_cluster + + \ No newline at end of file diff --git a/tests/integration/test_storage_azure_blob_storage/test_cluster.py b/tests/integration/test_storage_azure_blob_storage/test_cluster.py new file mode 100644 index 00000000000..1d551a9a3c3 --- /dev/null +++ b/tests/integration/test_storage_azure_blob_storage/test_cluster.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python3 + +import gzip +import json +import logging +import os +import io +import random +import threading +import time + +from azure.storage.blob import BlobServiceClient +import helpers.client +import pytest +from helpers.cluster import ClickHouseCluster, ClickHouseInstance +from helpers.test_tools import TSV +from helpers.network import PartitionManager +from helpers.mock_servers import start_mock_servers +from helpers.test_tools import exec_query_with_retry + + +@pytest.fixture(scope="module") +def cluster(): + try: + cluster = ClickHouseCluster(__file__) + cluster.add_instance( + "node_0", + main_configs=["configs/named_collections.xml", "configs/cluster.xml"], + with_azurite=True, + ) + cluster.add_instance( + "node_1", + main_configs=["configs/named_collections.xml", "configs/cluster.xml"], + with_azurite=True, + ) + cluster.start() + + yield cluster + finally: + cluster.shutdown() + +def azure_query(node, query, try_num=3, settings={}): + for i in range(try_num): + try: + return node.query(query, settings=settings) + except Exception as ex: + retriable_errors = [ + "DB::Exception: Azure::Core::Http::TransportException: Connection was closed by the server while trying to read a response" + ] + retry = False + for error in retriable_errors: + if error in str(ex): + retry = True + print(f"Try num: {i}. Having retriable error: {ex}") + time.sleep(i) + break + if not retry or i == try_num - 1: + raise Exception(ex) + continue + + +def get_azure_file_content(filename): + container_name = "cont" + connection_string = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;" + blob_service_client = BlobServiceClient.from_connection_string(connection_string) + container_client = blob_service_client.get_container_client(container_name) + blob_client = container_client.get_blob_client(filename) + download_stream = blob_client.download_blob() + return download_stream.readall().decode("utf-8") + + + +def test_simple_write_account_string_table_function(cluster): + node = cluster.instances["node_0"] + azure_query( + node, + "INSERT INTO TABLE FUNCTION azure_blob_storage(" + "'http://azurite1:10000/devstoreaccount1', 'cont', 'test_simple_write_tf.csv', 'devstoreaccount1', " + "'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', " + "'auto', 'key UInt64, data String') VALUES (1, 'a')", + ) + print(get_azure_file_content("test_simple_write_tf.csv")) + assert get_azure_file_content("test_simple_write_tf.csv") == '1,"a"\n' + + pure_azure = node.query( + """ + SELECT * from azure_blob_storage( + 'http://azurite1:10000/devstoreaccount1', 'cont', 'test_simple_write_tf.csv', 'devstoreaccount1', + 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', + 'auto', 'key UInt64, data String')""" + ) + print(pure_azure) + distributed_azure = node.query( + """ + SELECT * from azure_blob_storage_cluster( + 'simple_cluster', 'http://azurite1:10000/devstoreaccount1', 'cont', 'test_simple_write_tf.csv', 'devstoreaccount1', + 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV', + 'auto', 'key UInt64, data String')""" + ) + print(distributed_azure) + + assert TSV(pure_azure) == TSV(distributed_azure)