Merge pull request #45384 from ucasfl/iceberg

Kseniia Sumarokova 2023-02-17 23:38:24 +01:00 committed by GitHub
commit bda4c94f66
134 changed files with 2294 additions and 840 deletions


@ -68,35 +68,35 @@ namespace ErrorCodes
extern const int CANNOT_READ_ALL_DATA;
}
-class InputStreamReadBufferAdapter : public avro::InputStream
-{
-public:
-    explicit InputStreamReadBufferAdapter(ReadBuffer & in_) : in(in_) {}
-
-    bool next(const uint8_t ** data, size_t * len) override
-    {
-        if (in.eof())
-        {
-            *len = 0;
-            return false;
-        }
-
-        *data = reinterpret_cast<const uint8_t *>(in.position());
-        *len = in.available();
-
-        in.position() += in.available();
-        return true;
-    }
-
-    void backup(size_t len) override { in.position() -= len; }
-
-    void skip(size_t len) override { in.tryIgnore(len); }
-
-    size_t byteCount() const override { return in.count(); }
-
-private:
-    ReadBuffer & in;
-};
+bool AvroInputStreamReadBufferAdapter::next(const uint8_t ** data, size_t * len)
+{
+    if (in.eof())
+    {
+        *len = 0;
+        return false;
+    }
+
+    *data = reinterpret_cast<const uint8_t *>(in.position());
+    *len = in.available();
+
+    in.position() += in.available();
+    return true;
+}
+
+void AvroInputStreamReadBufferAdapter::backup(size_t len)
+{
+    in.position() -= len;
+}
+
+void AvroInputStreamReadBufferAdapter::skip(size_t len)
+{
+    in.tryIgnore(len);
+}
+
+size_t AvroInputStreamReadBufferAdapter::byteCount() const
+{
+    return in.count();
+}
/// Insert value with conversion to the column of target type.
template <typename T>
@ -757,7 +757,7 @@ AvroRowInputFormat::AvroRowInputFormat(const Block & header_, ReadBuffer & in_,
void AvroRowInputFormat::readPrefix()
{
file_reader_ptr = std::make_unique<avro::DataFileReaderBase>(std::make_unique<InputStreamReadBufferAdapter>(*in));
file_reader_ptr = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*in));
deserializer_ptr = std::make_unique<AvroDeserializer>(
output.getHeader(), file_reader_ptr->dataSchema(), format_settings.avro.allow_missing_fields, format_settings.avro.null_as_default);
file_reader_ptr->init();
@ -914,7 +914,7 @@ AvroConfluentRowInputFormat::AvroConfluentRowInputFormat(
void AvroConfluentRowInputFormat::readPrefix()
{
input_stream = std::make_unique<InputStreamReadBufferAdapter>(*in);
input_stream = std::make_unique<AvroInputStreamReadBufferAdapter>(*in);
decoder = avro::binaryDecoder();
decoder->init(*input_stream);
}
@ -971,7 +971,7 @@ NamesAndTypesList AvroSchemaReader::readSchema()
}
else
{
auto file_reader_ptr = std::make_unique<avro::DataFileReaderBase>(std::make_unique<InputStreamReadBufferAdapter>(in));
auto file_reader_ptr = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(in));
root_node = file_reader_ptr->dataSchema().root();
}


@ -28,6 +28,23 @@ namespace ErrorCodes
extern const int INCORRECT_DATA;
}
class AvroInputStreamReadBufferAdapter : public avro::InputStream
{
public:
explicit AvroInputStreamReadBufferAdapter(ReadBuffer & in_) : in(in_) {}
bool next(const uint8_t ** data, size_t * len) override;
void backup(size_t len) override;
void skip(size_t len) override;
size_t byteCount() const override;
private:
ReadBuffer & in;
};
class AvroDeserializer
{
public:
@ -185,8 +202,8 @@ public:
NamesAndTypesList readSchema() override;
+static DataTypePtr avroNodeToDataType(avro::NodePtr node);
private:
-DataTypePtr avroNodeToDataType(avro::NodePtr node);
bool confluent;
const FormatSettings format_settings;


@ -0,0 +1,111 @@
#pragma once
#include "config.h"
#if USE_AWS_S3
# include <Common/logger_useful.h>
# include <Storages/IStorage.h>
# include <filesystem>
# include <fmt/format.h>
# include <IO/S3/URI.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
template <typename Storage, typename Name, typename MetadataParser>
class IStorageDataLake : public Storage
{
public:
using Configuration = typename Storage::Configuration;
// 1. Parses the internal file structure of the table
// 2. Finds the parts with the latest version
// 3. Creates a URL for the underlying StorageS3 engine to handle reads
IStorageDataLake(
const Configuration & configuration_,
const StorageID & table_id_,
ColumnsDescription columns_,
const ConstraintsDescription & constraints_,
const String & comment,
ContextPtr context_,
std::optional<FormatSettings> format_settings_)
: Storage(
getAdjustedConfiguration(
context_, Storage::updateConfiguration(context_, configuration_), &Poco::Logger::get("Storage" + String(name))),
table_id_,
columns_,
constraints_,
comment,
context_,
format_settings_)
{
}
static constexpr auto name = Name::name;
String getName() const override { return name; }
static ColumnsDescription getTableStructureFromData(
Configuration & configuration, const std::optional<FormatSettings> & format_settings, ContextPtr ctx)
{
Storage::updateConfiguration(ctx, configuration);
auto new_configuration = getAdjustedConfiguration(ctx, configuration, &Poco::Logger::get("Storage" + String(name)));
return Storage::getTableStructureFromData(new_configuration, format_settings, ctx, /*object_infos*/ nullptr);
}
static Configuration
getAdjustedConfiguration(const ContextPtr & context, const Configuration & configuration, Poco::Logger * log)
{
MetadataParser parser{configuration, context};
auto keys = parser.getFiles();
Configuration new_configuration(configuration);
new_configuration.appendToPath(
std::filesystem::path(Name::data_directory_prefix) / MetadataParser::generateQueryFromKeys(keys, configuration.format));
LOG_DEBUG(log, "Table path: {}, new uri: {}", configuration.url.key, configuration.getPath());
return new_configuration;
}
static Configuration getConfiguration(ASTs & engine_args, ContextPtr local_context)
{
auto configuration = Storage::getConfiguration(engine_args, local_context, false /* get_format_from_file */);
if (configuration.format == "auto")
configuration.format = "Parquet";
return configuration;
}
SinkToStoragePtr write(const ASTPtr & /*query*/, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr /*context*/) override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method write is not supported by storage {}", getName());
}
void truncate(
const ASTPtr & /*query*/,
const StorageMetadataPtr & /*metadata_snapshot*/,
ContextPtr /*local_context*/,
TableExclusiveLockHolder &) override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncate is not supported by storage {}", getName());
}
NamesAndTypesList getVirtuals() const override { return {}; }
};
}
#endif
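IStorageDataLake compiles against an implicit contract rather than a base class: `Name` must provide `name` and `data_directory_prefix`, and `MetadataParser` must be constructible from `(configuration, context)`, offer `getFiles()`, and expose a static `generateQueryFromKeys(keys, format)`. The sketch below shows a minimal, hypothetical parser satisfying that contract; it is not part of this commit and assumes the ClickHouse headers already included above.

```cpp
/// Hypothetical parser, only to illustrate the contract IStorageDataLake relies on.
class DummyLakeMetadataParser
{
public:
    DummyLakeMetadataParser(const StorageS3::Configuration & configuration_, ContextPtr /* context */)
        : configuration(configuration_)
    {
    }

    /// Keys relative to the table path. A real parser reads the table metadata here.
    std::vector<String> getFiles() const { return {"part-0.parquet", "part-1.parquet"}; }

    /// Collapses the keys into a glob understood by StorageS3,
    /// e.g. "{part-0.parquet,part-1.parquet}".
    static String generateQueryFromKeys(const std::vector<String> & keys, const String & /* format */)
    {
        return fmt::format("{{{}}}", fmt::join(keys, ","));
    }

private:
    StorageS3::Configuration configuration;
};

struct StorageDummyLakeName
{
    static constexpr auto name = "DummyLake";
    static constexpr auto data_directory_prefix = "";
};

using StorageDummyLake = IStorageDataLake<StorageS3, StorageDummyLakeName, DummyLakeMetadataParser>;
```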


@ -0,0 +1,123 @@
#include <config.h>
#if USE_AWS_S3
# include <IO/ReadBufferFromS3.h>
# include <IO/S3/Requests.h>
# include <Interpreters/Context.h>
# include <Storages/S3DataLakeMetadataReadHelper.h>
# include <aws/core/auth/AWSCredentials.h>
# include <aws/s3/S3Client.h>
# include <aws/s3/model/ListObjectsV2Request.h>
namespace DB
{
namespace ErrorCodes
{
extern const int S3_ERROR;
}
std::shared_ptr<ReadBuffer>
S3DataLakeMetadataReadHelper::createReadBuffer(const String & key, ContextPtr context, const StorageS3::Configuration & base_configuration)
{
S3Settings::RequestSettings request_settings;
request_settings.max_single_read_retries = context->getSettingsRef().s3_max_single_read_retries;
return std::make_shared<ReadBufferFromS3>(
base_configuration.client,
base_configuration.url.bucket,
key,
base_configuration.url.version_id,
request_settings,
context->getReadSettings());
}
std::vector<String> S3DataLakeMetadataReadHelper::listFilesMatchSuffix(
const StorageS3::Configuration & base_configuration, const String & directory, const String & suffix)
{
const auto & table_path = base_configuration.url.key;
const auto & bucket = base_configuration.url.bucket;
const auto & client = base_configuration.client;
std::vector<String> res;
S3::ListObjectsV2Request request;
Aws::S3::Model::ListObjectsV2Outcome outcome;
bool is_finished{false};
request.SetBucket(bucket);
request.SetPrefix(std::filesystem::path(table_path) / directory);
while (!is_finished)
{
outcome = client->ListObjectsV2(request);
if (!outcome.IsSuccess())
throw Exception(
ErrorCodes::S3_ERROR,
"Could not list objects in bucket {} with key {}, S3 exception: {}, message: {}",
quoteString(bucket),
quoteString(base_configuration.url.key),
backQuote(outcome.GetError().GetExceptionName()),
quoteString(outcome.GetError().GetMessage()));
const auto & result_batch = outcome.GetResult().GetContents();
for (const auto & obj : result_batch)
{
const auto & filename = obj.GetKey();
if (std::filesystem::path(filename).extension() == suffix)
res.push_back(filename);
}
request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
is_finished = !outcome.GetResult().GetIsTruncated();
}
return res;
}
std::vector<String> S3DataLakeMetadataReadHelper::listFiles(const StorageS3::Configuration & configuration)
{
const auto & client = configuration.client;
const auto & table_path = configuration.url.key;
const auto & bucket = configuration.url.bucket;
std::vector<std::string> keys;
S3::ListObjectsV2Request request;
Aws::S3::Model::ListObjectsV2Outcome outcome;
bool is_finished{false};
request.SetBucket(bucket);
request.SetPrefix(table_path);
while (!is_finished)
{
outcome = client->ListObjectsV2(request);
if (!outcome.IsSuccess())
throw Exception(
ErrorCodes::S3_ERROR,
"Could not list objects in bucket {} with key {}, S3 exception: {}, message: {}",
quoteString(bucket),
quoteString(table_path),
backQuote(outcome.GetError().GetExceptionName()),
quoteString(outcome.GetError().GetMessage()));
const auto & result_batch = outcome.GetResult().GetContents();
for (const auto & obj : result_batch)
{
const auto & filename = obj.GetKey().substr(table_path.size()); /// Object name without the table path prefix.
keys.push_back(filename);
}
request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
is_finished = !outcome.GetResult().GetIsTruncated();
}
return keys;
}
}
#endif


@ -0,0 +1,26 @@
#pragma once
#include <config.h>
#if USE_AWS_S3
# include <Storages/StorageS3.h>
class ReadBuffer;
namespace DB
{
struct S3DataLakeMetadataReadHelper
{
static std::shared_ptr<ReadBuffer>
createReadBuffer(const String & key, ContextPtr context, const StorageS3::Configuration & base_configuration);
static std::vector<String>
listFilesMatchSuffix(const StorageS3::Configuration & base_configuration, const String & directory, const String & suffix);
static std::vector<String> listFiles(const StorageS3::Configuration & configuration);
};
}
#endif
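The helper is deliberately a plain bundle of static functions so the metadata parsers can be templated on it. Roughly, the parsers later in this diff use it as sketched below (DeltaLake lists `_delta_log/*.json` and streams each file); the function name is hypothetical and the snippet assumes ClickHouse's `IO/ReadHelpers.h`.

```cpp
/// Sketch only: enumerate DeltaLake commit logs and stream each one through a ReadBuffer.
void readDeltaLogsSketch(const StorageS3::Configuration & configuration, ContextPtr context)
{
    /// All *.json files under <table_path>/_delta_log.
    const auto log_files
        = S3DataLakeMetadataReadHelper::listFilesMatchSuffix(configuration, "_delta_log", ".json");

    for (const auto & key : log_files)
    {
        auto buf = S3DataLakeMetadataReadHelper::createReadBuffer(key, context, configuration);

        String json_line;
        char c;
        while (!buf->eof())
        {
            readChar(c, *buf);
            if (c == '\n')
            {
                /// A real parser feeds each completed line to JSON parsing
                /// (see DeltaLakeMetadataParser::init below).
                json_line.clear();
            }
            else
                json_line.push_back(c);
        }
    }
}
```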


@ -4,25 +4,13 @@
#include <Storages/StorageDeltaLake.h>
#include <Common/logger_useful.h>
#include <IO/ReadBufferFromS3.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadSettings.h>
#include <IO/S3Common.h>
#include <IO/S3/Requests.h>
#include <Storages/ExternalDataSourceConfiguration.h>
#include <Storages/StorageFactory.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Formats/FormatFactory.h>
#include <aws/core/auth/AWSCredentials.h>
#include <QueryPipeline/Pipe.h>
#include <fmt/format.h>
#include <fmt/ranges.h>
#include <ranges>
namespace DB
{
@ -30,7 +18,6 @@ namespace DB
namespace ErrorCodes
{
extern const int S3_ERROR;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int INCORRECT_DATA;
}
@ -57,20 +44,22 @@ std::vector<String> DeltaLakeMetadata::listCurrentFiles() &&
return keys;
}
JsonMetadataGetter::JsonMetadataGetter(const StorageS3::Configuration & configuration_, ContextPtr context)
template <typename Configuration, typename MetadataReadHelper>
DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::DeltaLakeMetadataParser(const Configuration & configuration_, ContextPtr context)
: base_configuration(configuration_)
{
init(context);
}
void JsonMetadataGetter::init(ContextPtr context)
template <typename Configuration, typename MetadataReadHelper>
void DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::init(ContextPtr context)
{
auto keys = getJsonLogFiles();
// read data from every json log file
for (const String & key : keys)
{
auto buf = createS3ReadBuffer(key, context);
auto buf = MetadataReadHelper::createReadBuffer(key, context, base_configuration);
char c;
while (!buf->eof())
@ -94,74 +83,19 @@ void JsonMetadataGetter::init(ContextPtr context)
}
}
std::vector<String> JsonMetadataGetter::getJsonLogFiles() const
template <typename Configuration, typename MetadataReadHelper>
std::vector<String> DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::getJsonLogFiles() const
{
const auto & client = base_configuration.client;
const auto table_path = base_configuration.url.key;
const auto bucket = base_configuration.url.bucket;
std::vector<String> keys;
S3::ListObjectsV2Request request;
Aws::S3::Model::ListObjectsV2Outcome outcome;
bool is_finished{false};
request.SetBucket(bucket);
/// DeltaLake format stores all metadata json files in _delta_log directory
static constexpr auto deltalake_metadata_directory = "_delta_log";
request.SetPrefix(std::filesystem::path(table_path) / deltalake_metadata_directory);
static constexpr auto meta_file_suffix = ".json";
while (!is_finished)
{
outcome = client->ListObjectsV2(request);
if (!outcome.IsSuccess())
throw Exception(
ErrorCodes::S3_ERROR,
"Could not list objects in bucket {} with key {}, S3 exception: {}, message: {}",
quoteString(bucket),
quoteString(table_path),
backQuote(outcome.GetError().GetExceptionName()),
quoteString(outcome.GetError().GetMessage()));
const auto & result_batch = outcome.GetResult().GetContents();
for (const auto & obj : result_batch)
{
const auto & filename = obj.GetKey();
// DeltaLake metadata files have json extension
if (std::filesystem::path(filename).extension() == ".json")
keys.push_back(filename);
}
/// Needed in case any more results are available
/// if so, we will continue reading, and not read keys that were already read
request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
/// Set to false if all of the results were returned. Set to true if more keys
/// are available to return. If the number of results exceeds that specified by
/// MaxKeys, all of the results might not be returned
is_finished = !outcome.GetResult().GetIsTruncated();
}
return keys;
return MetadataReadHelper::listFilesMatchSuffix(base_configuration, deltalake_metadata_directory, meta_file_suffix);
}
std::shared_ptr<ReadBuffer> JsonMetadataGetter::createS3ReadBuffer(const String & key, ContextPtr context)
{
/// TODO: add parallel downloads
S3Settings::RequestSettings request_settings;
request_settings.max_single_read_retries = 10;
return std::make_shared<ReadBufferFromS3>(
base_configuration.client,
base_configuration.url.bucket,
key,
base_configuration.url.version_id,
request_settings,
context->getReadSettings());
}
void JsonMetadataGetter::handleJSON(const JSON & json)
template <typename Configuration, typename MetadataReadHelper>
void DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::handleJSON(const JSON & json)
{
if (json.has("add"))
{
@ -179,101 +113,31 @@ void JsonMetadataGetter::handleJSON(const JSON & json)
}
}
namespace
{
// DeltaLake stores data in parts in different files
// keys is vector of parts with latest version
// generateQueryFromKeys constructs query from parts filenames for
// underlying StorageS3 engine
String generateQueryFromKeys(const std::vector<String> & keys)
template <typename Configuration, typename MetadataReadHelper>
String DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::generateQueryFromKeys(const std::vector<String> & keys, const String &)
{
std::string new_query = fmt::format("{{{}}}", fmt::join(keys, ","));
return new_query;
}
-StorageS3::Configuration getAdjustedS3Configuration(
-    const ContextPtr & context, const StorageS3::Configuration & configuration, Poco::Logger * log)
-{
-    JsonMetadataGetter getter{configuration, context};
-    const auto keys = getter.getFiles();
-    const auto new_uri = configuration.url.uri.toString() + generateQueryFromKeys(keys);
-
-    // set new url in configuration
-    StorageS3::Configuration new_configuration(configuration);
-    new_configuration.url = S3::URI(new_uri);
-
-    LOG_DEBUG(log, "Table path: {}, new uri: {}", configuration.url.key, new_uri);
-
-    return new_configuration;
-}
-
-}
+template DeltaLakeMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::DeltaLakeMetadataParser(
+    const StorageS3::Configuration & configuration_, ContextPtr context);
+
+template std::vector<String> DeltaLakeMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::getFiles();
+
+template String DeltaLakeMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::generateQueryFromKeys(
+    const std::vector<String> & keys, const String & format);
+
+template void DeltaLakeMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::init(ContextPtr context);
+
+template std::vector<String> DeltaLakeMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::getJsonLogFiles() const;
StorageDeltaLake::StorageDeltaLake(
const StorageS3::Configuration & configuration_,
const StorageID & table_id_,
ColumnsDescription columns_,
const ConstraintsDescription & constraints_,
const String & comment,
ContextPtr context_,
std::optional<FormatSettings> format_settings_)
: IStorage(table_id_)
, base_configuration{configuration_}
, log(&Poco::Logger::get("StorageDeltaLake (" + table_id_.table_name + ")"))
, table_path(base_configuration.url.key)
{
StorageInMemoryMetadata storage_metadata;
StorageS3::updateS3Configuration(context_, base_configuration);
auto new_configuration = getAdjustedS3Configuration(context_, base_configuration, log);
if (columns_.empty())
{
columns_ = StorageS3::getTableStructureFromData(
new_configuration, format_settings_, context_, nullptr);
storage_metadata.setColumns(columns_);
}
else
storage_metadata.setColumns(columns_);
storage_metadata.setConstraints(constraints_);
storage_metadata.setComment(comment);
setInMemoryMetadata(storage_metadata);
s3engine = std::make_shared<StorageS3>(
new_configuration,
table_id_,
columns_,
constraints_,
comment,
context_,
format_settings_,
/* distributed_processing_ */ false,
nullptr);
}
Pipe StorageDeltaLake::read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
size_t num_streams)
{
StorageS3::updateS3Configuration(context, base_configuration);
return s3engine->read(column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
}
ColumnsDescription StorageDeltaLake::getTableStructureFromData(
StorageS3::Configuration & configuration, const std::optional<FormatSettings> & format_settings, ContextPtr ctx)
{
StorageS3::updateS3Configuration(ctx, configuration);
auto new_configuration = getAdjustedS3Configuration(ctx, configuration, &Poco::Logger::get("StorageDeltaLake"));
return StorageS3::getTableStructureFromData(new_configuration, format_settings, ctx, /*object_infos*/ nullptr);
}
template void DeltaLakeMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::handleJSON(const JSON & json);
void registerStorageDeltaLake(StorageFactory & factory)
{
@ -281,31 +145,15 @@ void registerStorageDeltaLake(StorageFactory & factory)
"DeltaLake",
[](const StorageFactory::Arguments & args)
{
auto & engine_args = args.engine_args;
if (engine_args.empty() || engine_args.size() < 3)
throw Exception(
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Storage DeltaLake requires 3 to 4 arguments: table_url, access_key, secret_access_key, [format]");
StorageS3::Configuration configuration = StorageDeltaLake::getConfiguration(args.engine_args, args.getLocalContext());
StorageS3::Configuration configuration;
configuration.url = S3::URI(checkAndGetLiteralArgument<String>(engine_args[0], "url"));
configuration.auth_settings.access_key_id = checkAndGetLiteralArgument<String>(engine_args[1], "access_key_id");
configuration.auth_settings.secret_access_key = checkAndGetLiteralArgument<String>(engine_args[2], "secret_access_key");
if (engine_args.size() == 4)
configuration.format = checkAndGetLiteralArgument<String>(engine_args[3], "format");
else
{
/// DeltaLake uses Parquet by default.
configuration.format = "Parquet";
}
auto format_settings = getFormatSettings(args.getContext());
return std::make_shared<StorageDeltaLake>(
configuration, args.table_id, args.columns, args.constraints, args.comment, args.getContext(), std::nullopt);
configuration, args.table_id, args.columns, args.constraints, args.comment, args.getContext(), format_settings);
},
{
.supports_settings = true,
.supports_settings = false,
.supports_schema_inference = true,
.source_access_type = AccessType::S3,
});


@ -4,17 +4,12 @@
#if USE_AWS_S3
# include <Storages/IStorage.h>
# include <Storages/IStorageDataLake.h>
# include <Storages/S3DataLakeMetadataReadHelper.h>
# include <Storages/StorageS3.h>
# include <unordered_map>
# include <base/JSON.h>
namespace Poco
{
class Logger;
}
namespace DB
{
@ -34,65 +29,35 @@ private:
};
// Class to get DeltaLake log JSON files and read JSON from them
class JsonMetadataGetter
template <typename Configuration, typename MetadataReadHelper>
class DeltaLakeMetadataParser
{
public:
JsonMetadataGetter(const StorageS3::Configuration & configuration_, ContextPtr context);
DeltaLakeMetadataParser(const Configuration & configuration_, ContextPtr context);
std::vector<String> getFiles() { return std::move(metadata).listCurrentFiles(); }
static String generateQueryFromKeys(const std::vector<String> & keys, const String & format);
private:
void init(ContextPtr context);
std::vector<String> getJsonLogFiles() const;
std::shared_ptr<ReadBuffer> createS3ReadBuffer(const String & key, ContextPtr context);
void handleJSON(const JSON & json);
StorageS3::Configuration base_configuration;
Configuration base_configuration;
DeltaLakeMetadata metadata;
};
class StorageDeltaLake : public IStorage
struct StorageDeltaLakeName
{
public:
// 1. Parses internal file structure of table
// 2. Finds out parts with latest version
// 3. Creates url for underlying StorageS3 enigne to handle reads
StorageDeltaLake(
const StorageS3::Configuration & configuration_,
const StorageID & table_id_,
ColumnsDescription columns_,
const ConstraintsDescription & constraints_,
const String & comment,
ContextPtr context_,
std::optional<FormatSettings> format_settings_);
String getName() const override { return "DeltaLake"; }
// Reads latest version of DeltaLake table
Pipe read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
size_t num_streams) override;
static ColumnsDescription getTableStructureFromData(
StorageS3::Configuration & configuration,
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx);
private:
StorageS3::Configuration base_configuration;
std::shared_ptr<StorageS3> s3engine;
Poco::Logger * log;
String table_path;
static constexpr auto name = "DeltaLake";
static constexpr auto data_directory_prefix = "";
};
using StorageDeltaLake
= IStorageDataLake<StorageS3, StorageDeltaLakeName, DeltaLakeMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>>;
}
#endif


@ -6,12 +6,7 @@
#include <Common/logger_useful.h>
#include <Formats/FormatFactory.h>
#include <IO/S3Common.h>
#include <IO/S3/Requests.h>
#include <IO/ReadHelpers.h>
#include <Storages/StorageFactory.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <aws/core/auth/AWSCredentials.h>
#include <QueryPipeline/Pipe.h>
@ -22,20 +17,23 @@ namespace DB
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int S3_ERROR;
extern const int LOGICAL_ERROR;
}
namespace
template <typename Configuration, typename MetadataReadHelper>
HudiMetadataParser<Configuration, MetadataReadHelper>::HudiMetadataParser(const Configuration & configuration_, ContextPtr context_)
: configuration(configuration_), context(context_), log(&Poco::Logger::get("StorageHudi"))
{
}
/// Apache Hudi stores parts of data in different files.
/// Every part file has a timestamp in it.
/// Every partition (directory) in Apache Hudi has different versions of a part.
/// To find the needed parts we need to find the latest part file for every partition.
/// The part format is usually Parquet, but can differ.
String generateQueryFromKeys(const std::vector<std::string> & keys, const String & format)
template <typename Configuration, typename MetadataReadHelper>
String HudiMetadataParser<Configuration, MetadataReadHelper>::generateQueryFromKeys(const std::vector<std::string> & keys, const String & format)
{
/// For each partition path take only latest file.
struct FileInfo
@ -86,125 +84,19 @@ String generateQueryFromKeys(const std::vector<std::string> & keys, const String
return "{" + list_of_keys + "}";
}
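The hunk above elides the body that keeps only the latest part file per partition (only the opening `struct FileInfo` and the final return are shown). Below is a minimal sketch of that selection; it is not the code from this commit, and the timestamp extraction assumes Hudi part file names end with an underscore-separated numeric timestamp before the extension.

```cpp
/// Sketch: keep only the newest part file per partition directory.
/// Assumes keys look like "partition/..._<timestamp>.parquet"; illustrative only.
Strings latestKeyPerPartitionSketch(const Strings & keys)
{
    std::unordered_map<String, std::pair<UInt64, String>> latest_per_partition;

    for (const auto & key : keys)
    {
        const auto slash = key.find_last_of('/');
        const String partition = slash == String::npos ? "" : key.substr(0, slash);

        const auto underscore = key.find_last_of('_');
        const auto dot = key.find_last_of('.');
        if (underscore == String::npos || dot == String::npos || dot <= underscore)
            continue;

        /// Hudi encodes a commit timestamp into the file name; the newer one wins.
        const auto timestamp = parse<UInt64>(key.substr(underscore + 1, dot - underscore - 1));

        auto & best = latest_per_partition[partition];
        if (timestamp >= best.first)
            best = {timestamp, key};
    }

    Strings result;
    for (const auto & [_, value] : latest_per_partition)
        result.push_back(value.second);
    return result;
}
```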
std::vector<std::string> getKeysFromS3(const StorageS3::Configuration & configuration, Poco::Logger * log)
template <typename Configuration, typename MetadataReadHelper>
std::vector<std::string> HudiMetadataParser<Configuration, MetadataReadHelper>::getFiles() const
{
const auto & client = configuration.client;
const auto & table_path = configuration.url.key;
const auto & bucket = configuration.url.bucket;
std::vector<std::string> keys;
S3::ListObjectsV2Request request;
Aws::S3::Model::ListObjectsV2Outcome outcome;
bool is_finished{false};
request.SetBucket(bucket);
request.SetPrefix(table_path);
while (!is_finished)
{
outcome = client->ListObjectsV2(request);
if (!outcome.IsSuccess())
throw Exception(
ErrorCodes::S3_ERROR,
"Could not list objects in bucket {} with key {}, S3 exception: {}, message: {}",
quoteString(bucket),
quoteString(table_path),
backQuote(outcome.GetError().GetExceptionName()),
quoteString(outcome.GetError().GetMessage()));
const auto & result_batch = outcome.GetResult().GetContents();
for (const auto & obj : result_batch)
{
const auto & filename = obj.GetKey().substr(table_path.size()); /// Object name without tablepath prefix.
keys.push_back(filename);
LOG_DEBUG(log, "Found file: {}", filename);
}
request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
is_finished = !outcome.GetResult().GetIsTruncated();
}
return keys;
return MetadataReadHelper::listFiles(configuration);
}
-StorageS3::Configuration getAdjustedS3Configuration(const StorageS3::Configuration & configuration, Poco::Logger * log)
-{
-    const auto keys = getKeysFromS3(configuration, log);
-    const auto new_uri = configuration.url.uri.toString() + generateQueryFromKeys(keys, configuration.format);
-
-    StorageS3::Configuration new_configuration(configuration);
-    new_configuration.url = S3::URI(new_uri);
-
-    LOG_DEBUG(log, "Table path: {}, new uri: {}", configuration.url.key, new_uri);
-
-    return new_configuration;
-}
-
-}
+template HudiMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::HudiMetadataParser(
+    const StorageS3::Configuration & configuration_, ContextPtr context_);
+
+template std::vector<String> HudiMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::getFiles() const;
StorageHudi::StorageHudi(
const StorageS3::Configuration & configuration_,
const StorageID & table_id_,
ColumnsDescription columns_,
const ConstraintsDescription & constraints_,
const String & comment,
ContextPtr context_,
std::optional<FormatSettings> format_settings_)
: IStorage(table_id_)
, base_configuration{configuration_}
, log(&Poco::Logger::get("StorageHudi (" + table_id_.table_name + ")"))
{
StorageInMemoryMetadata storage_metadata;
StorageS3::updateS3Configuration(context_, base_configuration);
auto new_configuration = getAdjustedS3Configuration(base_configuration, log);
if (columns_.empty())
{
columns_ = StorageS3::getTableStructureFromData(
new_configuration, format_settings_, context_, nullptr);
storage_metadata.setColumns(columns_);
}
else
storage_metadata.setColumns(columns_);
storage_metadata.setConstraints(constraints_);
storage_metadata.setComment(comment);
setInMemoryMetadata(storage_metadata);
s3engine = std::make_shared<StorageS3>(
new_configuration,
table_id_,
columns_,
constraints_,
comment,
context_,
format_settings_,
/* distributed_processing_ */ false,
nullptr);
}
Pipe StorageHudi::read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
size_t num_streams)
{
StorageS3::updateS3Configuration(context, base_configuration);
return s3engine->read(column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
}
ColumnsDescription StorageHudi::getTableStructureFromData(
StorageS3::Configuration & configuration, const std::optional<FormatSettings> & format_settings, ContextPtr ctx)
{
StorageS3::updateS3Configuration(ctx, configuration);
auto new_configuration = getAdjustedS3Configuration(configuration, &Poco::Logger::get("StorageDeltaLake"));
return StorageS3::getTableStructureFromData(new_configuration, format_settings, ctx, /*object_infos*/ nullptr);
}
template String HudiMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::generateQueryFromKeys(
const std::vector<String> & keys, const String & format);
void registerStorageHudi(StorageFactory & factory)
{
@ -212,25 +104,7 @@ void registerStorageHudi(StorageFactory & factory)
"Hudi",
[](const StorageFactory::Arguments & args)
{
auto & engine_args = args.engine_args;
if (engine_args.empty() || engine_args.size() < 3)
throw Exception(
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Storage Hudi requires 3 to 4 arguments: table_url, access_key, secret_access_key, [format]");
StorageS3::Configuration configuration;
configuration.url = S3::URI(checkAndGetLiteralArgument<String>(engine_args[0], "url"));
configuration.auth_settings.access_key_id = checkAndGetLiteralArgument<String>(engine_args[1], "access_key_id");
configuration.auth_settings.secret_access_key = checkAndGetLiteralArgument<String>(engine_args[2], "secret_access_key");
if (engine_args.size() == 4)
configuration.format = checkAndGetLiteralArgument<String>(engine_args[3], "format");
else
{
// Apache Hudi uses Parquet by default
configuration.format = "Parquet";
}
StorageS3::Configuration configuration = StorageHudi::getConfiguration(args.engine_args, args.getLocalContext());
auto format_settings = getFormatSettings(args.getContext());
@ -238,7 +112,7 @@ void registerStorageHudi(StorageFactory & factory)
configuration, args.table_id, args.columns, args.constraints, args.comment, args.getContext(), format_settings);
},
{
.supports_settings = true,
.supports_settings = false,
.supports_schema_inference = true,
.source_access_type = AccessType::S3,
});


@ -5,55 +5,38 @@
#if USE_AWS_S3
# include <Storages/IStorage.h>
# include <Storages/IStorageDataLake.h>
# include <Storages/S3DataLakeMetadataReadHelper.h>
# include <Storages/StorageS3.h>
namespace Poco
{
class Logger;
}
namespace DB
{
class StorageHudi : public IStorage
template <typename Configuration, typename MetadataReadHelper>
class HudiMetadataParser
{
public:
/// 1. Parses internal file structure of table.
/// 2. Finds out parts with latest version.
/// 3. Creates url for underlying StorageS3 enigne to handle reads.
StorageHudi(
const StorageS3::Configuration & configuration_,
const StorageID & table_id_,
ColumnsDescription columns_,
const ConstraintsDescription & constraints_,
const String & comment,
ContextPtr context_,
std::optional<FormatSettings> format_settings_);
HudiMetadataParser(const Configuration & configuration_, ContextPtr context_);
String getName() const override { return "Hudi"; }
std::vector<String> getFiles() const;
/// Reads latest version of Apache Hudi table
Pipe read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
size_t num_streams) override;
static ColumnsDescription getTableStructureFromData(
StorageS3::Configuration & configuration,
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx);
static String generateQueryFromKeys(const std::vector<String> & keys, const String & format);
private:
StorageS3::Configuration base_configuration;
std::shared_ptr<StorageS3> s3engine;
Configuration configuration;
ContextPtr context;
Poco::Logger * log;
};
struct StorageHudiName
{
static constexpr auto name = "Hudi";
static constexpr auto data_directory_prefix = "";
};
using StorageHudi
= IStorageDataLake<StorageS3, StorageHudiName, HudiMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>>;
}
#endif


@ -0,0 +1,262 @@
#include "config.h"
#if USE_AWS_S3
# include <Storages/StorageIceberg.h>
# include <Common/logger_useful.h>
# include <Columns/ColumnString.h>
# include <Columns/ColumnTuple.h>
# include <Columns/IColumn.h>
# include <Storages/StorageFactory.h>
# include <Formats/FormatFactory.h>
# include <fmt/format.h>
# include <Processors/Formats/Impl/AvroRowInputFormat.h>
# include <Poco/JSON/Array.h>
# include <Poco/JSON/Object.h>
# include <Poco/JSON/Parser.h>
namespace DB
{
namespace ErrorCodes
{
extern const int S3_ERROR;
extern const int FILE_DOESNT_EXIST;
extern const int ILLEGAL_COLUMN;
}
template <typename Configuration, typename MetadataReadHelper>
IcebergMetadataParser<Configuration, MetadataReadHelper>::IcebergMetadataParser(const Configuration & configuration_, ContextPtr context_)
: base_configuration(configuration_), context(context_)
{
}
template <typename Configuration, typename MetadataReadHelper>
std::vector<String> IcebergMetadataParser<Configuration, MetadataReadHelper>::getFiles() const
{
auto metadata = fetchMetadataFile();
auto manifest_list = getManifestList(metadata);
/// When the table has just been created and does not contain any data yet
if (manifest_list.empty())
{
return {};
}
auto manifest_files = getManifestFiles(manifest_list);
return getFilesForRead(manifest_files);
}
template <typename Configuration, typename MetadataReadHelper>
String IcebergMetadataParser<Configuration, MetadataReadHelper>::fetchMetadataFile() const
{
/// Iceberg stores all metadata.json files in the metadata directory; the newest
/// version has the greatest version name, so we list all of them and then
/// pick the newest one.
static constexpr auto meta_file_suffix = ".json";
auto metadata_files = MetadataReadHelper::listFilesMatchSuffix(base_configuration, metadata_directory, meta_file_suffix);
if (metadata_files.empty())
throw Exception(
ErrorCodes::FILE_DOESNT_EXIST, "The metadata file for Iceberg table with path {} doesn't exist", base_configuration.url.key);
/// See comment above
auto it = std::max_element(metadata_files.begin(), metadata_files.end());
return *it;
}
template <typename Configuration, typename MetadataReadHelper>
String IcebergMetadataParser<Configuration, MetadataReadHelper>::getManifestList(const String & metadata_name) const
{
auto buffer = MetadataReadHelper::createReadBuffer(metadata_name, context, base_configuration);
String json_str;
readJSONObjectPossiblyInvalid(json_str, *buffer);
/// It looks like base/base/JSON.h cannot parse this JSON file
Poco::JSON::Parser parser;
Poco::Dynamic::Var json = parser.parse(json_str);
Poco::JSON::Object::Ptr object = json.extract<Poco::JSON::Object::Ptr>();
auto current_snapshot_id = object->getValue<Int64>("current-snapshot-id");
auto snapshots = object->get("snapshots").extract<Poco::JSON::Array::Ptr>();
for (size_t i = 0; i < snapshots->size(); ++i)
{
auto snapshot = snapshots->getObject(static_cast<UInt32>(i));
if (snapshot->getValue<Int64>("snapshot-id") == current_snapshot_id)
{
auto path = snapshot->getValue<String>("manifest-list");
return std::filesystem::path(base_configuration.url.key) / metadata_directory / std::filesystem::path(path).filename();
}
}
return {};
}
static MutableColumns
parseAvro(const std::unique_ptr<avro::DataFileReaderBase> & file_reader, const DataTypePtr & data_type, const String & field_name)
{
auto deserializer = std::make_unique<AvroDeserializer>(
Block{{data_type->createColumn(), data_type, field_name}}, file_reader->dataSchema(), true, true);
file_reader->init();
MutableColumns columns;
columns.emplace_back(data_type->createColumn());
RowReadExtension ext;
while (file_reader->hasMore())
{
file_reader->decr();
deserializer->deserializeRow(columns, file_reader->decoder(), ext);
}
return columns;
}
template <typename Configuration, typename MetadataReadHelper>
std::vector<String> IcebergMetadataParser<Configuration, MetadataReadHelper>::getManifestFiles(const String & manifest_list) const
{
auto buffer = MetadataReadHelper::createReadBuffer(manifest_list, context, base_configuration);
auto file_reader = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*buffer));
static constexpr auto manifest_path = "manifest_path";
/// manifest_path is the first field in the manifest list file
/// and it has String data type:
/// {'manifest_path': 'xxx', ...}
auto data_type = AvroSchemaReader::avroNodeToDataType(file_reader->dataSchema().root()->leafAt(0));
auto columns = parseAvro(file_reader, data_type, manifest_path);
auto & col = columns.at(0);
std::vector<String> res;
if (col->getDataType() == TypeIndex::String)
{
const auto * col_str = typeid_cast<ColumnString *>(col.get());
size_t col_size = col_str->size();
for (size_t i = 0; i < col_size; ++i)
{
auto file_path = col_str->getDataAt(i).toView();
/// We only need to obtain the file name
std::filesystem::path path(file_path);
res.emplace_back(std::filesystem::path(base_configuration.url.key) / metadata_directory / path.filename());
}
return res;
}
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"The parsed column from Avro file of `manifest_path` field should be String type, got {}",
col->getFamilyName());
}
template <typename Configuration, typename MetadataReadHelper>
std::vector<String> IcebergMetadataParser<Configuration, MetadataReadHelper>::getFilesForRead(const std::vector<String> & manifest_files) const
{
std::vector<String> keys;
for (const auto & manifest_file : manifest_files)
{
auto buffer = MetadataReadHelper::createReadBuffer(manifest_file, context, base_configuration);
auto file_reader = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*buffer));
static constexpr auto manifest_path = "data_file";
/// The data_file field is at the 3rd position of the manifest file:
/// {'status': xx, 'snapshot_id': xx, 'data_file': {'file_path': 'xxx', ...}, ...}
/// It is a nested record, so its parsed type is a nested Tuple
auto data_type = AvroSchemaReader::avroNodeToDataType(file_reader->dataSchema().root()->leafAt(2));
auto columns = parseAvro(file_reader, data_type, manifest_path);
auto & col = columns.at(0);
if (col->getDataType() == TypeIndex::Tuple)
{
auto * col_tuple = typeid_cast<ColumnTuple *>(col.get());
auto & col_str = col_tuple->getColumnPtr(0);
if (col_str->getDataType() == TypeIndex::String)
{
const auto * str_col = typeid_cast<const ColumnString *>(col_str.get());
size_t col_size = str_col->size();
for (size_t i = 0; i < col_size; ++i)
{
auto file_path = str_col->getDataAt(i).toView();
/// We only need the partition directory and the file name
std::filesystem::path path(file_path);
keys.emplace_back(path.parent_path().filename() / path.filename());
}
}
else
{
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"The parsed column from Avro file of `file_path` field should be String type, got {}",
col_str->getFamilyName());
}
}
else
{
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"The parsed column from Avro file of `data_file` field should be Tuple type, got {}",
col->getFamilyName());
}
}
return keys;
}
// generateQueryFromKeys constructs a glob query from all the Parquet file names
// for the underlying StorageS3 engine
template <typename Configuration, typename MetadataReadHelper>
String IcebergMetadataParser<Configuration, MetadataReadHelper>::generateQueryFromKeys(const std::vector<String> & keys, const String &)
{
std::string new_query = fmt::format("{{{}}}", fmt::join(keys, ","));
return new_query;
}
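For a concrete picture of what this produces (the values below are made up, not from this commit), the keys collapse into a single brace glob that IStorageDataLake then appends to the table path under the `data` prefix.

```cpp
/// Illustrative only, not part of this commit.
String icebergGlobExample()
{
    std::vector<String> keys = {"part1/data-1.parquet", "part2/data-2.parquet"};

    /// -> "{part1/data-1.parquet,part2/data-2.parquet}"
    String glob = IcebergMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::generateQueryFromKeys(keys, "Parquet");

    /// IStorageDataLake::getAdjustedConfiguration() appends
    /// StorageIcebergName::data_directory_prefix / glob to the table path, so the
    /// underlying StorageS3 reads <table_path>/data/{part1/data-1.parquet,part2/data-2.parquet}.
    return glob;
}
```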
template IcebergMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::IcebergMetadataParser(
const StorageS3::Configuration & configuration_, ContextPtr context_);
template std::vector<String> IcebergMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::getFiles() const;
template String IcebergMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::generateQueryFromKeys(
const std::vector<String> & keys, const String & format);
template String IcebergMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::fetchMetadataFile() const;
template String IcebergMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::getManifestList(const String & metadata_name) const;
template std::vector<String>
IcebergMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::getManifestFiles(const String & manifest_list) const;
template std::vector<String>
IcebergMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::getFilesForRead(const std::vector<String> & manifest_files) const;
void registerStorageIceberg(StorageFactory & factory)
{
factory.registerStorage(
"Iceberg",
[](const StorageFactory::Arguments & args)
{
auto & engine_args = args.engine_args;
StorageS3::Configuration configuration = StorageIceberg::getConfiguration(engine_args, args.getLocalContext());
auto format_settings = getFormatSettings(args.getContext());
return std::make_shared<StorageIceberg>(
configuration, args.table_id, args.columns, args.constraints, args.comment, args.getContext(), format_settings);
},
{
.supports_settings = false,
.supports_schema_inference = true,
.source_access_type = AccessType::S3,
});
}
}
#endif


@ -0,0 +1,51 @@
#pragma once
#include "config.h"
#if USE_AWS_S3
# include <Storages/IStorageDataLake.h>
# include <Storages/S3DataLakeMetadataReadHelper.h>
# include <Storages/StorageS3.h>
namespace DB
{
// Class to parse Iceberg metadata and find the files needed for a query on the table.
// Iceberg table directory layout:
// table/
//     data/
//     metadata/
// The metadata has three layers: metadata -> manifest list -> manifest files
template <typename Configuration, typename MetadataReadHelper>
class IcebergMetadataParser
{
public:
IcebergMetadataParser(const Configuration & configuration_, ContextPtr context_);
std::vector<String> getFiles() const;
static String generateQueryFromKeys(const std::vector<String> & keys, const String & format);
private:
static constexpr auto metadata_directory = "metadata";
Configuration base_configuration;
ContextPtr context;
String fetchMetadataFile() const;
String getManifestList(const String & metadata_name) const;
std::vector<String> getManifestFiles(const String & manifest_list) const;
std::vector<String> getFilesForRead(const std::vector<String> & manifest_files) const;
};
struct StorageIcebergName
{
static constexpr auto name = "Iceberg";
static constexpr auto data_directory_prefix = "data";
};
using StorageIceberg
= IStorageDataLake<StorageS3, StorageIcebergName, IcebergMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>>;
}
#endif


@ -954,7 +954,7 @@ StorageS3::StorageS3(
context_->getGlobalContext()->getRemoteHostFilter().checkURL(s3_configuration.url.uri);
StorageInMemoryMetadata storage_metadata;
updateS3Configuration(context_, s3_configuration);
updateConfiguration(context_, s3_configuration);
if (columns_.empty())
{
auto columns = getTableStructureFromDataImpl(
@ -1040,7 +1040,7 @@ Pipe StorageS3::read(
if (partition_by && has_wildcards)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Reading from a partitioned S3 storage is not implemented yet");
updateS3Configuration(local_context, s3_configuration);
updateConfiguration(local_context, s3_configuration);
Pipes pipes;
@ -1115,7 +1115,7 @@ Pipe StorageS3::read(
SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
{
updateS3Configuration(local_context, s3_configuration);
updateConfiguration(local_context, s3_configuration);
auto sample_block = metadata_snapshot->getSampleBlock();
auto chosen_compression_method = chooseCompressionMethod(keys.back(), compression_method);
@ -1185,7 +1185,7 @@ SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr
void StorageS3::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &, ContextPtr local_context, TableExclusiveLockHolder &)
{
updateS3Configuration(local_context, s3_configuration);
updateConfiguration(local_context, s3_configuration);
if (is_key_with_globs)
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED,
@ -1217,7 +1217,14 @@ void StorageS3::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &,
}
void StorageS3::updateS3Configuration(ContextPtr ctx, StorageS3::Configuration & upd)
StorageS3::Configuration StorageS3::updateConfiguration(ContextPtr local_context, const StorageS3::Configuration & configuration)
{
StorageS3::Configuration new_configuration(configuration);
updateConfiguration(local_context, new_configuration);
return new_configuration;
}
void StorageS3::updateConfiguration(ContextPtr ctx, StorageS3::Configuration & upd)
{
auto settings = ctx->getStorageS3Settings().getSettings(upd.url.uri.toString());
upd.request_settings = settings.request_settings;
@ -1283,7 +1290,7 @@ void StorageS3::processNamedCollectionResult(StorageS3::Configuration & configur
configuration.request_settings = S3Settings::RequestSettings(collection);
}
StorageS3::Configuration StorageS3::getConfiguration(ASTs & engine_args, ContextPtr local_context)
StorageS3::Configuration StorageS3::getConfiguration(ASTs & engine_args, ContextPtr local_context, bool get_format_from_file)
{
StorageS3::Configuration configuration;
@ -1360,7 +1367,7 @@ StorageS3::Configuration StorageS3::getConfiguration(ASTs & engine_args, Context
configuration.static_configuration = !configuration.auth_settings.access_key_id.empty();
if (configuration.format == "auto")
if (configuration.format == "auto" && get_format_from_file)
configuration.format = FormatFactory::instance().getFormatFromFileName(configuration.url.key, true);
return configuration;
@ -1372,7 +1379,7 @@ ColumnsDescription StorageS3::getTableStructureFromData(
ContextPtr ctx,
ObjectInfos * object_infos)
{
updateS3Configuration(ctx, configuration);
updateConfiguration(ctx, configuration);
return getTableStructureFromDataImpl(
configuration.format, configuration, configuration.compression_method,
configuration.url.key.find_first_of("*?{") != std::string::npos, format_settings, ctx, object_infos);


@ -19,7 +19,6 @@
#include <IO/CompressionMethod.h>
#include <Interpreters/Context.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <Storages/ExternalDataSourceConfiguration.h>
#include <Storages/Cache/SchemaCache.h>
#include <Storages/StorageConfiguration.h>
@ -250,6 +249,10 @@ public:
bool static_configuration = true;
/// Headers from ast is a part of static configuration.
HTTPHeaderEntries headers_from_ast;
void appendToPath(const String & suffix) { url = S3::URI{std::filesystem::path(url.uri.toString()) / suffix}; }
String getPath() const { return url.uri.toString(); } /// For logging
};
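A quick illustration of the two new Configuration helpers (the values below are made up, not from this commit): appendToPath joins a suffix onto the current URI, which is how IStorageDataLake attaches the generated glob, and getPath is only used for logging.

```cpp
/// Illustrative only, not part of this commit.
void appendGlobExample()
{
    StorageS3::Configuration configuration;
    configuration.url = S3::URI("https://bucket.s3.amazonaws.com/table");

    /// Joins the suffix onto the current URI with '/' via std::filesystem::path.
    configuration.appendToPath("data/{part-0.parquet,part-1.parquet}");

    /// getPath() now returns something like
    /// "https://bucket.s3.amazonaws.com/table/data/{part-0.parquet,part-1.parquet}".
}
```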
StorageS3(
@ -285,25 +288,27 @@ public:
bool supportsPartitionBy() const override;
static StorageS3::Configuration getConfiguration(ASTs & engine_args, ContextPtr local_context);
using ObjectInfos = StorageS3Source::ObjectInfos;
static void processNamedCollectionResult(StorageS3::Configuration & configuration, const NamedCollection & collection);
static SchemaCache & getSchemaCache(const ContextPtr & ctx);
static StorageS3::Configuration getConfiguration(ASTs & engine_args, ContextPtr local_context, bool get_format_from_file = true);
static ColumnsDescription getTableStructureFromData(
StorageS3::Configuration & configuration,
const std::optional<FormatSettings> & format_settings,
ContextPtr ctx,
ObjectInfos * object_infos = nullptr);
static void processNamedCollectionResult(StorageS3::Configuration & configuration, const NamedCollection & collection);
static SchemaCache & getSchemaCache(const ContextPtr & ctx);
protected:
static StorageS3::Configuration updateConfiguration(ContextPtr local_context, const Configuration & configuration);
static void updateConfiguration(ContextPtr, Configuration &);
private:
friend class StorageS3Cluster;
friend class TableFunctionS3Cluster;
friend class StorageHudi;
friend class StorageDeltaLake;
Configuration s3_configuration;
std::vector<String> keys;
@ -320,8 +325,6 @@ private:
ObjectInfos object_infos;
static void updateS3Configuration(ContextPtr, Configuration &);
static std::shared_ptr<StorageS3Source::IIterator> createFileIterator(
const Configuration & s3_configuration,
const std::vector<String> & keys,


@ -56,7 +56,7 @@ StorageS3Cluster::StorageS3Cluster(
{
context_->getGlobalContext()->getRemoteHostFilter().checkURL(configuration_.url.uri);
StorageInMemoryMetadata storage_metadata;
StorageS3::updateS3Configuration(context_, s3_configuration);
StorageS3::updateConfiguration(context_, s3_configuration);
if (columns_.empty())
{
@ -96,7 +96,7 @@ Pipe StorageS3Cluster::read(
size_t /*max_block_size*/,
size_t /*num_streams*/)
{
StorageS3::updateS3Configuration(context, s3_configuration);
StorageS3::updateConfiguration(context, s3_configuration);
auto cluster = getCluster(context);
auto extension = getTaskIteratorExtension(query_info.query, context);


@ -35,6 +35,7 @@ void registerStorageCOS(StorageFactory & factory);
void registerStorageOSS(StorageFactory & factory);
void registerStorageHudi(StorageFactory & factory);
void registerStorageDeltaLake(StorageFactory & factory);
void registerStorageIceberg(StorageFactory & factory);
#endif
#if USE_HDFS
@ -124,7 +125,8 @@ void registerStorages()
registerStorageOSS(factory);
registerStorageHudi(factory);
registerStorageDeltaLake(factory);
-#endif
+registerStorageIceberg(factory);
+#endif
#if USE_HDFS
registerStorageHDFS(factory);


@ -0,0 +1,84 @@
#pragma once
#include "config.h"
#if USE_AWS_S3
# include <Access/Common/AccessFlags.h>
# include <Formats/FormatFactory.h>
# include <Interpreters/Context.h>
# include <Interpreters/parseColumnsListForTableFunction.h>
# include <Storages/IStorage.h>
# include <TableFunctions/ITableFunction.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
template <typename Name, typename Storage, typename TableFunction>
class ITableFunctionDataLake : public ITableFunction
{
public:
static constexpr auto name = Name::name;
std::string getName() const override
{
return name;
}
protected:
StoragePtr
executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/)
const override
{
ColumnsDescription columns;
if (configuration.structure != "auto")
columns = parseColumnsListFromString(configuration.structure, context);
StoragePtr storage = std::make_shared<Storage>(
configuration, StorageID(getDatabaseName(), table_name), columns, ConstraintsDescription{}, String{}, context, std::nullopt);
storage->startup();
return storage;
}
const char * getStorageTypeName() const override { return Storage::name; }
ColumnsDescription getActualTableStructure(ContextPtr context) const override
{
if (configuration.structure == "auto")
{
context->checkAccess(getSourceAccessType());
return Storage::getTableStructureFromData(configuration, std::nullopt, context);
}
return parseColumnsListFromString(configuration.structure, context);
}
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override
{
/// Parse args
ASTs & args_func = ast_function->children;
const auto message
= fmt::format("The signature of table function '{}' could be the following:\n{}", getName(), TableFunction::signature);
if (args_func.size() != 1)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Table function '{}' must have arguments", getName());
auto & args = args_func.at(0)->children;
TableFunction::parseArgumentsImpl(message, args, context, configuration, false);
if (configuration.format == "auto")
configuration.format = "Parquet";
}
mutable typename Storage::Configuration configuration;
};
}
#endif


@ -2,157 +2,21 @@
#if USE_AWS_S3
# include <filesystem>
# include <Access/Common/AccessFlags.h>
# include <Formats/FormatFactory.h>
# include <IO/S3Common.h>
# include <Interpreters/Context.h>
# include <Interpreters/evaluateConstantExpression.h>
# include <Interpreters/parseColumnsListForTableFunction.h>
# include <Parsers/ASTLiteral.h>
# include <Storages/StorageDeltaLake.h>
# include <Storages/StorageURL.h>
# include <Storages/checkAndGetLiteralArgument.h>
# include <TableFunctions/TableFunctionDeltaLake.h>
# include <TableFunctions/ITableFunctionDataLake.h>
# include <TableFunctions/TableFunctionFactory.h>
# include <TableFunctions/TableFunctionS3.h>
# include "registerTableFunctions.h"
namespace DB
{
namespace ErrorCodes
struct TableFunctionDeltaLakeName
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
void TableFunctionDeltaLake::parseArgumentsImpl(
const String & error_message, ASTs & args, ContextPtr context, StorageS3::Configuration & base_configuration)
{
if (args.empty() || args.size() > 6)
throw Exception::createDeprecated(error_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
auto * header_it = StorageURL::collectHeaders(args, base_configuration.headers_from_ast, context);
if (header_it != args.end())
args.erase(header_it);
for (auto & arg : args)
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
/// Size -> argument indexes
static auto size_to_args = std::map<size_t, std::map<String, size_t>>{
{1, {{}}},
{2, {{"format", 1}}},
{5, {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}, {"structure", 4}}},
{6, {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}, {"structure", 4}, {"compression_method", 5}}}};
std::map<String, size_t> args_to_idx;
/// For 4 arguments we support 2 possible variants:
/// deltaLake(source, format, structure, compression_method) and deltaLake(source, access_key_id, access_key_id, format)
/// We can distinguish them by looking at the 2-nd argument: check if it's a format name or not.
if (args.size() == 4)
{
auto second_arg = checkAndGetLiteralArgument<String>(args[1], "format/access_key_id");
if (second_arg == "auto" || FormatFactory::instance().getAllFormats().contains(second_arg))
args_to_idx = {{"format", 1}, {"structure", 2}, {"compression_method", 3}};
else
args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}};
}
/// For 3 arguments we support 2 possible variants:
/// deltaLake(source, format, structure) and deltaLake(source, access_key_id, access_key_id)
/// We can distinguish them by looking at the 2-nd argument: check if it's a format name or not.
else if (args.size() == 3)
{
auto second_arg = checkAndGetLiteralArgument<String>(args[1], "format/access_key_id");
if (second_arg == "auto" || FormatFactory::instance().getAllFormats().contains(second_arg))
args_to_idx = {{"format", 1}, {"structure", 2}};
else
args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}};
}
else
{
args_to_idx = size_to_args[args.size()];
}
/// This argument is always the first
base_configuration.url = S3::URI(checkAndGetLiteralArgument<String>(args[0], "url"));
if (args_to_idx.contains("format"))
base_configuration.format = checkAndGetLiteralArgument<String>(args[args_to_idx["format"]], "format");
else
base_configuration.format = "Parquet";
if (args_to_idx.contains("structure"))
base_configuration.structure = checkAndGetLiteralArgument<String>(args[args_to_idx["structure"]], "structure");
if (args_to_idx.contains("compression_method"))
base_configuration.compression_method
= checkAndGetLiteralArgument<String>(args[args_to_idx["compression_method"]], "compression_method");
if (args_to_idx.contains("access_key_id"))
base_configuration.auth_settings.access_key_id
= checkAndGetLiteralArgument<String>(args[args_to_idx["access_key_id"]], "access_key_id");
if (args_to_idx.contains("secret_access_key"))
base_configuration.auth_settings.secret_access_key
= checkAndGetLiteralArgument<String>(args[args_to_idx["secret_access_key"]], "secret_access_key");
}
void TableFunctionDeltaLake::parseArguments(const ASTPtr & ast_function, ContextPtr context)
{
/// Parse args
ASTs & args_func = ast_function->children;
const auto message = fmt::format(
"The signature of table function {} could be the following:\n" \
" - url\n" \
" - url, format\n" \
" - url, format, structure\n" \
" - url, access_key_id, secret_access_key\n" \
" - url, format, structure, compression_method\n" \
" - url, access_key_id, secret_access_key, format\n" \
" - url, access_key_id, secret_access_key, format, structure\n" \
" - url, access_key_id, secret_access_key, format, structure, compression_method",
getName());
if (args_func.size() != 1)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Table function '{}' must have arguments", getName());
auto & args = args_func.at(0)->children;
parseArgumentsImpl(message, args, context, configuration);
}
ColumnsDescription TableFunctionDeltaLake::getActualTableStructure(ContextPtr context) const
{
if (configuration.structure == "auto")
{
context->checkAccess(getSourceAccessType());
return StorageDeltaLake::getTableStructureFromData(configuration, std::nullopt, context);
}
return parseColumnsListFromString(configuration.structure, context);
}
StoragePtr TableFunctionDeltaLake::executeImpl(
const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
S3::URI s3_uri(configuration.url);
ColumnsDescription columns;
if (configuration.structure != "auto")
columns = parseColumnsListFromString(configuration.structure, context);
StoragePtr storage = std::make_shared<StorageDeltaLake>(
configuration, StorageID(getDatabaseName(), table_name), columns, ConstraintsDescription{}, String{}, context, std::nullopt);
storage->startup();
return storage;
}
static constexpr auto name = "deltaLake";
};
using TableFunctionDeltaLake = ITableFunctionDataLake<TableFunctionDeltaLakeName, StorageDeltaLake, TableFunctionS3>;
void registerTableFunctionDeltaLake(TableFunctionFactory & factory)
{

View File

@ -1,44 +0,0 @@
#pragma once
#include "config.h"
#if USE_AWS_S3
#include <TableFunctions/ITableFunction.h>
#include <Storages/ExternalDataSourceConfiguration.h>
namespace DB
{
class Context;
class TableFunctionS3Cluster;
/* deltaLake(source, [access_key_id, secret_access_key,] format, structure[, compression]) - creates a temporary DeltaLake table on S3.
*/
class TableFunctionDeltaLake : public ITableFunction
{
public:
static constexpr auto name = "deltaLake";
std::string getName() const override
{
return name;
}
protected:
StoragePtr executeImpl(
const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const override;
const char * getStorageTypeName() const override { return name; }
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
static void parseArgumentsImpl(const String & error_message, ASTs & args, ContextPtr context, StorageS3::Configuration & configuration);
mutable StorageS3::Configuration configuration;
};
}
#endif

View File

@ -2,157 +2,20 @@
#if USE_AWS_S3
# include <filesystem>
# include <Access/Common/AccessFlags.h>
# include <Formats/FormatFactory.h>
# include <IO/S3Common.h>
# include <Interpreters/Context.h>
# include <Interpreters/evaluateConstantExpression.h>
# include <Interpreters/parseColumnsListForTableFunction.h>
# include <Parsers/ASTLiteral.h>
# include <Storages/StorageHudi.h>
# include <Storages/StorageURL.h>
# include <Storages/checkAndGetLiteralArgument.h>
# include <TableFunctions/ITableFunctionDataLake.h>
# include <TableFunctions/TableFunctionFactory.h>
# include <TableFunctions/TableFunctionHudi.h>
# include <TableFunctions/TableFunctionS3.h>
# include "registerTableFunctions.h"
namespace DB
{
namespace ErrorCodes
struct TableFunctionHudiName
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
void TableFunctionHudi::parseArgumentsImpl(
const String & error_message, ASTs & args, ContextPtr context, StorageS3::Configuration & base_configuration)
{
if (args.empty() || args.size() > 6)
throw Exception::createDeprecated(error_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
auto * header_it = StorageURL::collectHeaders(args, base_configuration.headers_from_ast, context);
if (header_it != args.end())
args.erase(header_it);
for (auto & arg : args)
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
/// Size -> argument indexes
static auto size_to_args = std::map<size_t, std::map<String, size_t>>{
{1, {{}}},
{2, {{"format", 1}}},
{5, {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}, {"structure", 4}}},
{6, {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}, {"structure", 4}, {"compression_method", 5}}}};
std::map<String, size_t> args_to_idx;
/// For 4 arguments we support 2 possible variants:
/// hudi(source, format, structure, compression_method) and hudi(source, access_key_id, secret_access_key, format)
/// We can distinguish them by looking at the second argument: check whether it is a known format name.
if (args.size() == 4)
{
auto second_arg = checkAndGetLiteralArgument<String>(args[1], "format/access_key_id");
if (second_arg == "auto" || FormatFactory::instance().getAllFormats().contains(second_arg))
args_to_idx = {{"format", 1}, {"structure", 2}, {"compression_method", 3}};
else
args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}};
}
/// For 3 arguments we support 2 possible variants:
/// hudi(source, format, structure) and hudi(source, access_key_id, secret_access_key)
/// We can distinguish them by looking at the second argument: check whether it is a known format name.
else if (args.size() == 3)
{
auto second_arg = checkAndGetLiteralArgument<String>(args[1], "format/access_key_id");
if (second_arg == "auto" || FormatFactory::instance().getAllFormats().contains(second_arg))
args_to_idx = {{"format", 1}, {"structure", 2}};
else
args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}};
}
else
{
args_to_idx = size_to_args[args.size()];
}
/// This argument is always the first
base_configuration.url = S3::URI(checkAndGetLiteralArgument<String>(args[0], "url"));
if (args_to_idx.contains("format"))
base_configuration.format = checkAndGetLiteralArgument<String>(args[args_to_idx["format"]], "format");
else
base_configuration.format = "Parquet";
if (args_to_idx.contains("structure"))
base_configuration.structure = checkAndGetLiteralArgument<String>(args[args_to_idx["structure"]], "structure");
if (args_to_idx.contains("compression_method"))
base_configuration.compression_method
= checkAndGetLiteralArgument<String>(args[args_to_idx["compression_method"]], "compression_method");
if (args_to_idx.contains("access_key_id"))
base_configuration.auth_settings.access_key_id
= checkAndGetLiteralArgument<String>(args[args_to_idx["access_key_id"]], "access_key_id");
if (args_to_idx.contains("secret_access_key"))
base_configuration.auth_settings.secret_access_key
= checkAndGetLiteralArgument<String>(args[args_to_idx["secret_access_key"]], "secret_access_key");
}
void TableFunctionHudi::parseArguments(const ASTPtr & ast_function, ContextPtr context)
{
/// Parse args
ASTs & args_func = ast_function->children;
const auto message = fmt::format(
"The signature of table function {} could be the following:\n" \
" - url\n" \
" - url, format\n" \
" - url, format, structure\n" \
" - url, access_key_id, secret_access_key\n" \
" - url, format, structure, compression_method\n" \
" - url, access_key_id, secret_access_key, format\n" \
" - url, access_key_id, secret_access_key, format, structure\n" \
" - url, access_key_id, secret_access_key, format, structure, compression_method",
getName());
if (args_func.size() != 1)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Table function '{}' must have arguments", getName());
auto & args = args_func.at(0)->children;
parseArgumentsImpl(message, args, context, configuration);
}
ColumnsDescription TableFunctionHudi::getActualTableStructure(ContextPtr context) const
{
if (configuration.structure == "auto")
{
context->checkAccess(getSourceAccessType());
return StorageHudi::getTableStructureFromData(configuration, std::nullopt, context);
}
return parseColumnsListFromString(configuration.structure, context);
}
StoragePtr TableFunctionHudi::executeImpl(
const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
S3::URI s3_uri(configuration.url);
ColumnsDescription columns;
if (configuration.structure != "auto")
columns = parseColumnsListFromString(configuration.structure, context);
StoragePtr storage = std::make_shared<StorageHudi>(
configuration, StorageID(getDatabaseName(), table_name), columns, ConstraintsDescription{}, String{}, context, std::nullopt);
storage->startup();
return storage;
}
static constexpr auto name = "hudi";
};
using TableFunctionHudi = ITableFunctionDataLake<TableFunctionHudiName, StorageHudi, TableFunctionS3>;
void registerTableFunctionHudi(TableFunctionFactory & factory)
{

View File

@ -1,44 +0,0 @@
#pragma once
#include "config.h"
#if USE_AWS_S3
#include <TableFunctions/ITableFunction.h>
#include <Storages/ExternalDataSourceConfiguration.h>
namespace DB
{
class Context;
class TableFunctionS3Cluster;
/* hudi(source, [access_key_id, secret_access_key,] format, structure[, compression]) - creates a temporary Hudi table on S3.
*/
class TableFunctionHudi : public ITableFunction
{
public:
static constexpr auto name = "hudi";
std::string getName() const override
{
return name;
}
protected:
StoragePtr executeImpl(
const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const override;
const char * getStorageTypeName() const override { return name; }
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
static void parseArgumentsImpl(const String & error_message, ASTs & args, ContextPtr context, StorageS3::Configuration & configuration);
mutable StorageS3::Configuration configuration;
};
}
#endif

View File

@ -0,0 +1,34 @@
#include "config.h"
#if USE_AWS_S3
# include <Storages/StorageIceberg.h>
# include <TableFunctions/ITableFunctionDataLake.h>
# include <TableFunctions/TableFunctionFactory.h>
# include <TableFunctions/TableFunctionS3.h>
# include "registerTableFunctions.h"
namespace DB
{
struct TableFunctionIcebergName
{
static constexpr auto name = "iceberg";
};
using TableFunctionIceberg = ITableFunctionDataLake<TableFunctionIcebergName, StorageIceberg, TableFunctionS3>;
void registerTableFunctionIceberg(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionIceberg>(
{.documentation
= {R"(The table function can be used to read the Iceberg table stored on object store.)",
Documentation::Examples{{"iceberg", "SELECT * FROM iceberg(url, access_key_id, secret_access_key)"}},
Documentation::Categories{"DataLake"}},
.allow_readonly = false});
}
}
#endif
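/// Editor's sketch (not part of this patch): the same three-parameter pattern shown above could
/// define another data-lake flavour. Everything named "MyLake" below is hypothetical and only
/// illustrates how a name struct, a storage class, and TableFunctionS3 compose through ITableFunctionDataLake.
struct TableFunctionMyLakeName
{
    static constexpr auto name = "myLake";  /// hypothetical table function name
};

/// StorageMyLake is a placeholder for a storage implementation analogous to StorageIceberg.
using TableFunctionMyLake = ITableFunctionDataLake<TableFunctionMyLakeName, StorageMyLake, TableFunctionS3>;

void registerTableFunctionMyLake(TableFunctionFactory & factory)
{
    factory.registerFunction<TableFunctionMyLake>(
        {.documentation
         = {R"(Reads a hypothetical MyLake table stored on object storage.)",
            Documentation::Examples{{"myLake", "SELECT * FROM myLake(url, access_key_id, secret_access_key)"}},
            Documentation::Categories{"DataLake"}},
         .allow_readonly = false});
}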

View File

@ -29,7 +29,8 @@ namespace ErrorCodes
/// This is needed to avoid copy-paste, because s3Cluster arguments differ only in the additional first argument - the cluster name.
void TableFunctionS3::parseArgumentsImpl(const String & error_message, ASTs & args, ContextPtr context, StorageS3::Configuration & s3_configuration)
void TableFunctionS3::parseArgumentsImpl(
const String & error_message, ASTs & args, ContextPtr context, StorageS3::Configuration & s3_configuration, bool get_format_from_file)
{
if (auto named_collection = tryGetNamedCollectionWithOverrides(args))
{
@ -105,7 +106,8 @@ void TableFunctionS3::parseArgumentsImpl(const String & error_message, ASTs & ar
s3_configuration.auth_settings.secret_access_key = checkAndGetLiteralArgument<String>(args[args_to_idx["secret_access_key"]], "secret_access_key");
}
if (s3_configuration.format == "auto")
/// For DataLake table functions a default format should be set explicitly instead of inferring it from the file name.
if (s3_configuration.format == "auto" && get_format_from_file)
s3_configuration.format = FormatFactory::instance().getFormatFromFileName(s3_configuration.url.uri.getPath(), true);
}
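/// Editor's sketch (not part of this patch): what the new get_format_from_file flag enables.
/// A data-lake caller can disable inference from the file name and apply its own default; the
/// call site below is an assumption about how ITableFunctionDataLake uses it, with the flag and
/// the "Parquet" fallback taken from this diff.
static void parseDataLakeArgumentsSketch(ASTs & args, ContextPtr context, StorageS3::Configuration & configuration)
{
    TableFunctionS3::parseArgumentsImpl(
        "wrong number of arguments", args, context, configuration, /* get_format_from_file = */ false);

    /// With inference disabled, "auto" survives and a data-lake-specific default can be applied.
    if (configuration.format == "auto")
        configuration.format = "Parquet";
}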
@ -114,17 +116,7 @@ void TableFunctionS3::parseArguments(const ASTPtr & ast_function, ContextPtr con
/// Parse args
ASTs & args_func = ast_function->children;
const auto message = fmt::format(
"The signature of table function {} could be the following:\n" \
" - url\n" \
" - url, format\n" \
" - url, format, structure\n" \
" - url, access_key_id, secret_access_key\n" \
" - url, format, structure, compression_method\n" \
" - url, access_key_id, secret_access_key, format\n" \
" - url, access_key_id, secret_access_key, format, structure\n" \
" - url, access_key_id, secret_access_key, format, structure, compression_method",
getName());
const auto message = fmt::format("The signature of table function '{}' could be the following:\n{}", getName(), signature);
if (args_func.size() != 1)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Table function '{}' must have arguments.", getName());

View File

@ -12,7 +12,6 @@ namespace DB
{
class Context;
class TableFunctionS3Cluster;
/* s3(source, [access_key_id, secret_access_key,] format, structure[, compression]) - creates a temporary storage for a file in S3.
*/
@ -20,6 +19,14 @@ class TableFunctionS3 : public ITableFunction
{
public:
static constexpr auto name = "s3";
static constexpr auto signature = " - url\n"
" - url, format\n"
" - url, format, structure\n"
" - url, access_key_id, secret_access_key\n"
" - url, format, structure, compression_method\n"
" - url, access_key_id, secret_access_key, format\n"
" - url, access_key_id, secret_access_key, format, structure\n"
" - url, access_key_id, secret_access_key, format, structure, compression_method";
std::string getName() const override
{
return name;
@ -36,9 +43,14 @@ public:
{
return {"_path", "_file"};
}
static void parseArgumentsImpl(
const String & error_message,
ASTs & args,
ContextPtr context,
StorageS3::Configuration & configuration,
bool get_format_from_file = true);
protected:
friend class TableFunctionS3Cluster;
StoragePtr executeImpl(
const ASTPtr & ast_function,
@ -51,8 +63,6 @@ protected:
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
static void parseArgumentsImpl(const String & error_message, ASTs & args, ContextPtr context, StorageS3::Configuration & configuration);
mutable StorageS3::Configuration configuration;
ColumnsDescription structure_hint;
};

View File

@ -73,7 +73,7 @@ void TableFunctionS3Cluster::parseArguments(const ASTPtr & ast_function, Context
std::copy(args.begin() + 1, args.end(), std::back_inserter(clipped_args));
/// StorageS3ClusterConfiguration inherits from StorageS3::Configuration, so it is safe to upcast it.
TableFunctionS3::parseArgumentsImpl(message.text, clipped_args, context, static_cast<StorageS3::Configuration & >(configuration));
TableFunctionS3::parseArgumentsImpl(message.text, clipped_args, context, static_cast<StorageS3::Configuration &>(configuration));
}

View File

@ -29,6 +29,7 @@ void registerTableFunctions()
registerTableFunctionCOS(factory);
registerTableFunctionHudi(factory);
registerTableFunctionDeltaLake(factory);
registerTableFunctionIceberg(factory);
registerTableFunctionOSS(factory);
#endif

View File

@ -26,6 +26,7 @@ void registerTableFunctionS3Cluster(TableFunctionFactory & factory);
void registerTableFunctionCOS(TableFunctionFactory & factory);
void registerTableFunctionHudi(TableFunctionFactory & factory);
void registerTableFunctionDeltaLake(TableFunctionFactory & factory);
void registerTableFunctionIceberg(TableFunctionFactory & factory);
void registerTableFunctionOSS(TableFunctionFactory & factory);
#endif

Some files were not shown because too many files have changed in this diff.