WIP: Azure Table Function, added read() and StorageAzureSource

Smita Kulkarni 2023-06-05 20:36:17 +02:00
parent 93e9752a3f
commit dedb9067ce
10 changed files with 354 additions and 9 deletions

View File

@@ -348,6 +348,9 @@ The server successfully detected this situation and will download merged part fr
M(S3PutObject, "Number of S3 API PutObject calls.") \
M(S3GetObject, "Number of S3 API GetObject calls.") \
\
M(AzureDeleteObjects, "Number of Azure blob storage API DeleteObject(s) calls.") \
M(AzureListObjects, "Number of Azure blob storage API ListObjects calls.") \
\
M(DiskS3DeleteObjects, "Number of DiskS3 API DeleteObject(s) calls.") \
M(DiskS3CopyObject, "Number of DiskS3 API CopyObject calls.") \
M(DiskS3ListObjects, "Number of DiskS3 API ListObjects calls.") \
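
Note: counters declared here surface through the system.events table, but only after they have been incremented at least once (zero-valued events are not listed). A quick way to verify the new Azure counters once call sites are wired up — this hunk only declares them, so whether anything increments them yet is an assumption:

SELECT event, value, description
FROM system.events
WHERE event IN ('AzureDeleteObjects', 'AzureListObjects');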

View File

@@ -598,6 +598,8 @@ Block ActionsDAG::updateHeader(Block header) const
}
ColumnsWithTypeAndName result_columns;
result_columns.reserve(outputs.size());
struct Frame

View File

@@ -15,6 +15,7 @@
#include <Formats/FormatFactory.h>
#include <Formats/ReadSchemaUtils.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <re2/re2.h>
#include <azure/identity/managed_identity_credential.hpp>
@@ -32,6 +33,9 @@
#include <Storages/NamedCollectionsHelpers.h>
#include <Storages/ReadFromStorageProgress.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <QueryPipeline/Pipe.h>
using namespace Azure::Storage::Blobs;
@@ -52,6 +56,8 @@ bool isConnectionString(const std::string & candidate)
StorageAzure::Configuration StorageAzure::getConfiguration(ASTs & engine_args, ContextPtr local_context, bool get_format_from_file)
{
LOG_INFO(&Poco::Logger::get("StorageAzure"), "get_format_from_file = {}", get_format_from_file);
StorageAzure::Configuration configuration;
/// Supported signatures:
@@ -74,6 +80,11 @@ StorageAzure::Configuration StorageAzure::getConfiguration(ASTs & engine_args, C
configuration.container = checkAndGetLiteralArgument<String>(engine_args[1], "container");
configuration.blob_path = checkAndGetLiteralArgument<String>(engine_args[2], "blobpath");
LOG_INFO(&Poco::Logger::get("StorageAzure"), "connection_url = {}", configuration.connection_url);
LOG_INFO(&Poco::Logger::get("StorageAzure"), "container = {}", configuration.container);
LOG_INFO(&Poco::Logger::get("StorageAzure"), "blobpath = {}", configuration.blob_path);
auto is_format_arg = [] (const std::string & s) -> bool
{
return s == "auto" || FormatFactory::instance().getAllFormats().contains(s);
@@ -81,6 +92,7 @@ StorageAzure::Configuration StorageAzure::getConfiguration(ASTs & engine_args, C
if (engine_args.size() == 4)
{
// e.g. 'c1 UInt64, c2 UInt64'
auto fourth_arg = checkAndGetLiteralArgument<String>(engine_args[3], "format/account_name");
if (is_format_arg(fourth_arg))
{
@@ -143,8 +155,13 @@ StorageAzure::Configuration StorageAzure::getConfiguration(ASTs & engine_args, C
configuration.blobs_paths = {configuration.blob_path};
if (configuration.format == "auto" && get_format_from_file)
configuration.format = FormatFactory::instance().getFormatFromFileName(configuration.blob_path, true);
LOG_INFO(&Poco::Logger::get("StorageAzure"), "get_format_from_file = {}", get_format_from_file);
// if (configuration.format == "auto" && get_format_from_file)
// configuration.format = FormatFactory::instance().getFormatFromFileName(configuration.blob_path, true);
configuration.format = "TSV";
return configuration;
}
@@ -215,6 +232,7 @@ AzureClientPtr StorageAzure::createClient(StorageAzure::Configuration configurat
if (configuration.is_connection_string)
{
LOG_INFO(&Poco::Logger::get("StorageAzure"), "createClient is_connection_string ");
result = std::make_unique<BlobContainerClient>(BlobContainerClient::CreateFromConnectionString(configuration.connection_url, configuration.container));
}
else
@@ -228,8 +246,14 @@ AzureClientPtr StorageAzure::createClient(StorageAzure::Configuration configurat
auto managed_identity_credential = std::make_shared<Azure::Identity::ManagedIdentityCredential>();
result = std::make_unique<BlobContainerClient>(configuration.connection_url, managed_identity_credential);
LOG_INFO(&Poco::Logger::get("StorageAzure"), "createClient account_name & account_key ");
}
return result;
}
@@ -251,15 +275,13 @@ StorageAzure::StorageAzure(
, format_settings(format_settings_)
, partition_by(partition_by_)
{
FormatFactory::instance().checkFormatName(configuration.format);
// FormatFactory::instance().checkFormatName(configuration.format);
context_->getGlobalContext()->getRemoteHostFilter().checkURL(Poco::URI(configuration.getConnectionURL()));
StorageInMemoryMetadata storage_metadata;
if (columns_.empty())
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Schema inference is not supported yet");
//auto columns = getTableStructureFromDataImpl(configuration, format_settings, context_);
//storage_metadata.setColumns(columns);
}
else
storage_metadata.setColumns(columns_);
@@ -268,11 +290,28 @@ StorageAzure::StorageAzure(
storage_metadata.setComment(comment);
setInMemoryMetadata(storage_metadata);
StoredObjects objects;
for (const auto & key : configuration.blobs_paths)
objects.emplace_back(key);
for (const auto & obj : objects)
{
LOG_INFO(&Poco::Logger::get("StorageAzure"), "constructor obj.remote_paths = {}", obj.remote_path);
if (object_storage->exists(obj))
{
LOG_INFO(&Poco::Logger::get("StorageAzure"), "constructor exists obj.remote_paths = {}", obj.remote_path);
// auto read_buffer = object_storage->readObject(obj);
// LOG_INFO(&Poco::Logger::get("StorageAzure"), "constructor read size obj.remote_paths = {} , size = {}", obj.remote_path, read_buffer->getFileSize());
}
}
auto default_virtuals = NamesAndTypesList{
{"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
auto columns = storage_metadata.getSampleBlock().getNamesAndTypesList();
virtual_columns = getVirtualsForStorage(columns, default_virtuals);
for (const auto & column : virtual_columns)
virtual_block.insert({column.type->createColumn(), column.type, column.name});
@@ -435,6 +474,35 @@ private:
}
Pipe StorageAzure::read(
const Names & /*column_names*/,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & /*query_info*/,
ContextPtr context,
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size,
size_t /*num_streams*/)
{
Pipes pipes;
StoredObjects objects;
for (const auto & key : configuration.blobs_paths)
objects.emplace_back(key);
auto reader = object_storage->readObjects(objects);
auto block_for_format = storage_snapshot->metadata->getSampleBlock();
for (const auto & col : block_for_format.getColumns())
LOG_INFO(&Poco::Logger::get("StorageAzure"), "read col = {}", col->getName());
pipes.emplace_back(std::make_shared<StorageAzureSource>(std::move(reader), context, block_for_format, max_block_size));
return Pipe::unitePipes(std::move(pipes));
}
SinkToStoragePtr StorageAzure::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
{
auto sample_block = metadata_snapshot->getSampleBlock();
@@ -513,6 +581,44 @@ bool StorageAzure::supportsPartitionBy() const
return true;
}
StorageAzureSource::StorageAzureSource(std::unique_ptr<ReadBufferFromFileBase> && read_buffer_, ContextPtr context_,
const Block & sample_block_, UInt64 max_block_size_)
: ISource(Block())
, WithContext(context_)
, read_buffer(std::move(read_buffer_))
, sample_block(sample_block_)
, max_block_size(max_block_size_)
{
auto format = "TSV";
auto input_format = FormatFactory::instance().getInput(
format, *read_buffer, sample_block, getContext(), max_block_size);
QueryPipelineBuilder builder;
builder.init(Pipe(input_format));
pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
}
Chunk StorageAzureSource::generate()
{
Chunk chunk;
if (reader->pull(chunk))
{
LOG_INFO(&Poco::Logger::get("StorageAzureSource"), "pulled chunk rows = {}", chunk.getNumRows());
}
return chunk;
}
String StorageAzureSource::getName() const
{
return "StorageAzureSource";
}
}
#endif

View File

@@ -48,10 +48,9 @@ public:
std::string getConnectionURL() const
{
if (!is_connection_string)
// if (!is_connection_string)
return connection_url;
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Connection string not implemented yet");
//throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Connection string not implemented yet");
}
std::string connection_url;
@@ -78,6 +77,11 @@ public:
static StorageAzure::Configuration getConfiguration(ASTs & engine_args, ContextPtr local_context, bool get_format_from_file = true);
static AzureClientPtr createClient(StorageAzure::Configuration configuration);
static AzureObjectStorage::SettingsPtr createSettings(StorageAzure::Configuration configuration);
static ColumnsDescription getTableStructureFromData(
const StorageAzure::Configuration & configuration,
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx);
String getName() const override
{
@@ -114,6 +118,35 @@ private:
std::optional<FormatSettings> format_settings;
ASTPtr partition_by;
static ColumnsDescription getTableStructureFromDataImpl(
const Configuration & configuration,
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx);
};
class StorageAzureSource : public ISource, WithContext
{
public:
StorageAzureSource(std::unique_ptr<ReadBufferFromFileBase> && read_buffer_, ContextPtr context_, const Block & sample_block_, UInt64 max_block_size_);
~StorageAzureSource() override {}
Chunk generate() override;
String getName() const override;
private:
// std::unique_ptr<ReadBufferFromFileBase> read_buffer;
String path;
std::unique_ptr<ReadBufferFromFileBase> read_buffer;
// std::unique_ptr<ReadBuffer> read_buf;
std::unique_ptr<QueryPipeline> pipeline;
std::unique_ptr<PullingPipelineExecutor> reader;
Block sample_block;
UInt64 max_block_size;
// void createReader();
};
}

View File

@@ -17,5 +17,5 @@ add_library(clickhouse_table_functions ${clickhouse_table_functions_sources})
target_link_libraries(clickhouse_table_functions PRIVATE clickhouse_parsers clickhouse_storages_system dbms)
if (TARGET ch_contrib::hivemetastore)
target_link_libraries(clickhouse_table_functions PRIVATE ch_contrib::hivemetastore ch_contrib::hdfs ch_contrib::parquet)
target_link_libraries(clickhouse_table_functions PRIVATE ch_contrib::hivemetastore ch_contrib::hdfs ch_contrib::parquet ch_contrib::azure_sdk)
endif ()

View File

@@ -4,6 +4,7 @@
#include <TableFunctions/ITableFunction.h>
#include <TableFunctions/TableFunctionS3.h>
#include <TableFunctions/TableFunctionAzure.h>
#include <Storages/StorageS3Cluster.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/Context.h>

View File

@@ -0,0 +1,118 @@
#include "config.h"
#if USE_AZURE_BLOB_STORAGE
//#include <IO/S3Common.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/Context.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/TableFunctionS3.h>
#include <TableFunctions/TableFunctionAzure.h>
#include <Interpreters/parseColumnsListForTableFunction.h>
#include <Access/Common/AccessFlags.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTFunction.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/StorageAzure.h>
#include <Storages/StorageS3.h>
#include <Storages/StorageURL.h>
#include <Storages/NamedCollectionsHelpers.h>
#include <Formats/FormatFactory.h>
#include "registerTableFunctions.h"
#include <Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <boost/algorithm/string.hpp>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int LOGICAL_ERROR;
}
void TableFunctionAzure::parseArgumentsImpl(ASTs & args, const ContextPtr & context)
{
if (args.size() != 5)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "The signature of table function {} shall be the following:\n{}", getName(), getSignature());
for (auto & arg : args)
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
configuration.connection_url = checkAndGetLiteralArgument<String>(args[0], "connection_url");
configuration.container = checkAndGetLiteralArgument<String>(args[1], "container");
configuration.blob_path = checkAndGetLiteralArgument<String>(args[2], "blob_path");
configuration.format = checkAndGetLiteralArgument<String>(args[3], "format");
configuration.structure = checkAndGetLiteralArgument<String>(args[4], "structure");
}
void TableFunctionAzure::parseArguments(const ASTPtr & ast_function, ContextPtr context)
{
LOG_INFO(&Poco::Logger::get("TableFunctionAzure"), "parseArguments = {}", ast_function->dumpTree());
ASTs & args_func = ast_function->children;
if (args_func.size() != 1)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Table function '{}' must have arguments.", getName());
auto & args = args_func.at(0)->children;
parseArgumentsImpl(args, context);
}
ColumnsDescription TableFunctionAzure::getActualTableStructure(ContextPtr context) const
{
return parseColumnsListFromString(configuration.structure, context);
}
bool TableFunctionAzure::supportsReadingSubsetOfColumns()
{
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format);
}
StoragePtr TableFunctionAzure::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
LOG_INFO(&Poco::Logger::get("TableFunctionAzure"), "executeImpl = {}", table_name);
ColumnsDescription columns;
columns = parseColumnsListFromString(configuration.structure, context);
configuration.is_connection_string = true;
configuration.blobs_paths = {configuration.blob_path};
auto client = StorageAzure::createClient(configuration);
StoragePtr storage = std::make_shared<StorageAzure>(
configuration,
std::make_unique<AzureObjectStorage>(table_name, std::move(client), std::make_unique<AzureObjectStorageSettings>()),
context,
StorageID(getDatabaseName(), table_name),
columns,
ConstraintsDescription{},
String{},
/// No format_settings for table function Azure
std::nullopt, nullptr);
storage->startup();
return storage;
}
void registerTableFunctionAzure(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionAzure>(
{.documentation
= {.description=R"(The table function can be used to read data stored on Azure Blob Storage.)",
.examples{{"azure_blob", "SELECT * FROM azure_blob(connection, container, blob_path, format, structure)", ""}}},
.allow_readonly = false});
}
}
#endif
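
For reference, a call matching the five-argument signature parsed above would look like the sketch below; the connection string, container, and blob name are placeholders, and the structure reuses the 'c1 UInt64, c2 UInt64' example from the storage code:

SELECT c1, c2
FROM azure_blob(
    '<azure_connection_string>',
    'my-container',
    'data.tsv',
    'TSV',
    'c1 UInt64, c2 UInt64');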

View File

@@ -0,0 +1,72 @@
#pragma once
#include "config.h"
#if USE_AZURE_BLOB_STORAGE
#include <TableFunctions/ITableFunction.h>
#include <Storages/StorageAzure.h>
namespace DB
{
class Context;
/* azure_blob(connection_url, container, blob_path, format, structure) - creates a temporary storage for a file on Azure Blob Storage.
*/
class TableFunctionAzure : public ITableFunction
{
public:
static constexpr auto name = "azure_blob";
static constexpr auto signature = "- connection_url, container, blob, format, structure\n";
static size_t getMaxNumberOfArguments() { return 5; }
String getName() const override
{
return name;
}
virtual String getSignature() const
{
return signature;
}
bool hasStaticStructure() const override { return configuration.structure != "auto"; }
bool needStructureHint() const override { return configuration.structure == "auto"; }
void setStructureHint(const ColumnsDescription & structure_hint_) override { structure_hint = structure_hint_; }
bool supportsReadingSubsetOfColumns() override;
std::unordered_set<String> getVirtualsToCheckBeforeUsingStructureHint() const override
{
return {"_path", "_file"};
}
virtual void parseArgumentsImpl(ASTs & args, const ContextPtr & context);
static void addColumnsStructureToArguments(ASTs & args, const String & structure, const ContextPtr & context);
protected:
StoragePtr executeImpl(
const ASTPtr & ast_function,
ContextPtr context,
const std::string & table_name,
ColumnsDescription cached_columns) const override;
const char * getStorageTypeName() const override { return "Azure"; }
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
mutable StorageAzure::Configuration configuration;
ColumnsDescription structure_hint;
};
}
#endif

View File

@@ -71,6 +71,12 @@ void registerTableFunctions()
registerTableFunctionFormat(factory);
registerTableFunctionExplain(factory);
#if USE_AZURE_BLOB_STORAGE
registerTableFunctionAzure(factory);
#endif
}
}

View File

@@ -69,6 +69,10 @@ void registerTableFunctionFormat(TableFunctionFactory & factory);
void registerTableFunctionExplain(TableFunctionFactory & factory);
#if USE_AZURE_BLOB_STORAGE
void registerTableFunctionAzure(TableFunctionFactory & factory);
#endif
void registerTableFunctions();
}