refactor and get rid of the direct S3 dependency

flynn 2023-02-13 08:29:22 +00:00
parent 9743a05cde
commit d3dd9421da
10 changed files with 96 additions and 133 deletions

View File

@@ -46,12 +46,35 @@ namespace ErrorCodes
extern const int ILLEGAL_COLUMN;
}
IcebergMetaParser::IcebergMetaParser(const StorageS3::Configuration & configuration_, ContextPtr context_)
std::shared_ptr<ReadBuffer>
S3MetaReadHelper::createReadBuffer(const String & key, ContextPtr context, const StorageS3::Configuration & base_configuration)
{
S3Settings::RequestSettings request_settings;
request_settings.max_single_read_retries = context->getSettingsRef().s3_max_single_read_retries;
return std::make_shared<ReadBufferFromS3>(
base_configuration.client,
base_configuration.url.bucket,
key,
base_configuration.url.version_id,
request_settings,
context->getReadSettings());
}
std::vector<String>
S3MetaReadHelper::listFilesMatchSuffix(const StorageS3::Configuration & base_configuration, const String & directory, const String & suffix)
{
const auto & table_path = base_configuration.url.key;
return S3::listFiles(
*base_configuration.client, base_configuration.url.bucket, table_path, std::filesystem::path(table_path) / directory, suffix);
}
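// Illustrative usage (a sketch, not part of this diff): for a table whose
// url.key is "warehouse/tbl" (hypothetical), listFilesMatchSuffix(cfg, "metadata", ".json")
// returns every key under "warehouse/tbl/metadata/" that ends with ".json".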
template <typename Configuration, typename MetaReadHelper>
IcebergMetaParser<Configuration, MetaReadHelper>::IcebergMetaParser(const Configuration & configuration_, ContextPtr context_)
: base_configuration(configuration_), context(context_)
{
}
std::vector<String> IcebergMetaParser::getFiles() const
template <typename Configuration, typename MetaReadHelper>
std::vector<String> IcebergMetaParser<Configuration, MetaReadHelper>::getFiles() const
{
auto metadata = getNewestMetaFile();
auto manifest_list = getManiFestList(metadata);
@@ -66,29 +89,27 @@ std::vector<String> IcebergMetaParser::getFiles() const
return getFilesForRead(manifest_files);
}
String IcebergMetaParser::getNewestMetaFile() const
template <typename Configuration, typename MetaReadHelper>
String IcebergMetaParser<Configuration, MetaReadHelper>::getNewestMetaFile() const
{
/// Iceberg stores all its metadata.json files in the metadata directory; the
/// newest version has the maximum version name, so we list all of them and
/// then pick the newest one.
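/// For example (hypothetical names), the listing may return
/// {"00001-a.metadata.json", "00002-b.metadata.json"}; std::max_element below
/// then picks "00002-b.metadata.json", relying on the zero-padded version
/// prefix to keep lexicographic order in agreement with version order.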
const auto & table_path = base_configuration.url.key;
std::vector<String> metadata_files = S3::listFiles(
*base_configuration.client,
base_configuration.url.bucket,
table_path,
std::filesystem::path(table_path) / metadata_directory,
".json");
constexpr auto meta_file_suffix = ".json";
auto metadata_files = MetaReadHelper::listFilesMatchSuffix(base_configuration, metadata_directory, meta_file_suffix);
if (metadata_files.empty())
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "The metadata file for Iceberg table with path {} doesn't exist", table_path);
throw Exception(
ErrorCodes::FILE_DOESNT_EXIST, "The metadata file for Iceberg table with path {} doesn't exist", base_configuration.url.key);
auto it = std::max_element(metadata_files.begin(), metadata_files.end());
return *it;
}
String IcebergMetaParser::getManiFestList(const String & metadata_name) const
template <typename Configuration, typename MetaReadHelper>
String IcebergMetaParser<Configuration, MetaReadHelper>::getManiFestList(const String & metadata_name) const
{
auto buffer = createS3ReadBuffer(metadata_name);
auto buffer = MetaReadHelper::createReadBuffer(metadata_name, context, base_configuration);
String json_str;
readJSONObjectPossiblyInvalid(json_str, *buffer);
@@ -132,9 +153,10 @@ parseAvro(const std::unique_ptr<avro::DataFileReaderBase> & file_reader, const D
return columns;
}
std::vector<String> IcebergMetaParser::getManifestFiles(const String & manifest_list) const
template <typename Configuration, typename MetaReadHelper>
std::vector<String> IcebergMetaParser<Configuration, MetaReadHelper>::getManifestFiles(const String & manifest_list) const
{
auto buffer = createS3ReadBuffer(manifest_list);
auto buffer = MetaReadHelper::createReadBuffer(manifest_list, context, base_configuration);
auto file_reader = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*buffer));
@@ -168,12 +190,13 @@ std::vector<String> IcebergMetaParser::getManifestFiles(const String & manifest_
col->getFamilyName());
}
std::vector<String> IcebergMetaParser::getFilesForRead(const std::vector<String> & manifest_files) const
template <typename Configuration, typename MetaReadHelper>
std::vector<String> IcebergMetaParser<Configuration, MetaReadHelper>::getFilesForRead(const std::vector<String> & manifest_files) const
{
std::vector<String> keys;
for (const auto & manifest_file : manifest_files)
{
auto buffer = createS3ReadBuffer(manifest_file);
auto buffer = MetaReadHelper::createReadBuffer(manifest_file, context, base_configuration);
auto file_reader = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*buffer));
@@ -222,27 +245,27 @@ std::vector<String> IcebergMetaParser::getFilesForRead(const std::vector<String>
return keys;
}
std::shared_ptr<ReadBuffer> IcebergMetaParser::createS3ReadBuffer(const String & key) const
{
S3Settings::RequestSettings request_settings;
request_settings.max_single_read_retries = context->getSettingsRef().s3_max_single_read_retries;
return std::make_shared<ReadBufferFromS3>(
base_configuration.client,
base_configuration.url.bucket,
key,
base_configuration.url.version_id,
request_settings,
context->getReadSettings());
}
// generateQueryFromKeys constructs a glob-style query from all Parquet file names
// for the underlying StorageS3 engine
String IcebergMetaParser::generateQueryFromKeys(const std::vector<String> & keys, const String &)
template <typename Configuration, typename MetaReadHelper>
String IcebergMetaParser<Configuration, MetaReadHelper>::generateQueryFromKeys(const std::vector<String> & keys, const String &)
{
std::string new_query = fmt::format("{{{}}}", fmt::join(keys, ","));
return new_query;
}
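// For instance, keys {"data/a.parquet", "data/b.parquet"} (hypothetical names)
// yield "{data/a.parquet,data/b.parquet}", which StorageS3 expands as a glob.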
template IcebergMetaParser<StorageS3::Configuration, S3MetaReadHelper>::IcebergMetaParser(
const StorageS3::Configuration & configuration_, ContextPtr context_);
template std::vector<String> IcebergMetaParser<StorageS3::Configuration, S3MetaReadHelper>::getFiles() const;
template String IcebergMetaParser<StorageS3::Configuration, S3MetaReadHelper>::generateQueryFromKeys(
const std::vector<String> & keys, const String & format);
template String IcebergMetaParser<StorageS3::Configuration, S3MetaReadHelper>::getNewestMetaFile() const;
template String IcebergMetaParser<StorageS3::Configuration, S3MetaReadHelper>::getManiFestList(const String & metadata_name) const;
template std::vector<String>
IcebergMetaParser<StorageS3::Configuration, S3MetaReadHelper>::getManifestFiles(const String & manifest_list) const;
template std::vector<String>
IcebergMetaParser<StorageS3::Configuration, S3MetaReadHelper>::getFilesForRead(const std::vector<String> & manifest_files) const;
void registerStorageIceberg(StorageFactory & factory)
{
factory.registerStorage(

View File

@@ -12,16 +12,26 @@
namespace DB
{
struct S3MetaReadHelper
{
static std::shared_ptr<ReadBuffer>
createReadBuffer(const String & key, ContextPtr context, const StorageS3::Configuration & base_configuration);
static std::vector<String>
listFilesMatchSuffix(const StorageS3::Configuration & base_configuration, const String & directory, const String & suffix);
};
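// Note: the read helper is passed to IcebergMetaParser as a template parameter,
// so the parser itself no longer depends on S3 directly; another storage can
// plug in its own helper exposing the same two static methods.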
// Class to parse Iceberg metadata and find the files needed for a query on the table
// Iceberg table directory layout:
// table/
// data/
// metadata/
// The metadata has three layers: metadata -> manifest list -> manifest files
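// An illustrative walk (hypothetical names): metadata/v2.metadata.json names a
// manifest list (snap-123.avro), which lists manifest files (manifest-abc.avro),
// which in turn list the data files to read under table/data/.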
template <typename Configuration, typename MetaReadHelper>
class IcebergMetaParser
{
public:
IcebergMetaParser(const StorageS3::Configuration & configuration_, ContextPtr context_);
IcebergMetaParser(const Configuration & configuration_, ContextPtr context_);
std::vector<String> getFiles() const;
@@ -29,7 +39,7 @@ public:
private:
static constexpr auto metadata_directory = "metadata";
StorageS3::Configuration base_configuration;
Configuration base_configuration;
ContextPtr context;
/// Just get file name
@@ -47,7 +57,7 @@ struct StorageIcebergName
static constexpr auto data_directory_prefix = "data";
};
using StorageIceberg = IStorageDataLake<StorageIcebergName, IcebergMetaParser>;
using StorageIceberg = IStorageDataLake<StorageIcebergName, IcebergMetaParser<StorageS3::Configuration, S3MetaReadHelper>>;
}
#endif

View File

@@ -39,6 +39,8 @@ template <typename Name, typename MetaParser>
class IStorageDataLake;
struct StorageIcebergName;
struct S3MetaReadHelper;
template <typename Configuration, typename MetaReadHelper>
class IcebergMetaParser;
struct StorageDeltaLakeName;
@@ -317,7 +319,7 @@ private:
friend class TableFunctionS3Cluster;
friend class IStorageDataLake<StorageHudiName, HudiMetaParser>;
friend class IStorageDataLake<StorageDeltaLakeName, DeltaLakeMetaParser>;
friend class IStorageDataLake<StorageIcebergName, IcebergMetaParser>;
friend class IStorageDataLake<StorageIcebergName, IcebergMetaParser<StorageS3::Configuration, S3MetaReadHelper>>;
Configuration s3_configuration;
std::vector<String> keys;

View File

@@ -4,9 +4,7 @@
#if USE_AWS_S3
#include <TableFunctions/ITableFunction.h>
#include <Storages/ExternalDataSourceConfiguration.h>
# include <TableFunctions/ITableFunction.h>
# include <filesystem>
# include <Access/Common/AccessFlags.h>
# include <Formats/FormatFactory.h>
@@ -15,21 +13,19 @@
# include <Interpreters/evaluateConstantExpression.h>
# include <Interpreters/parseColumnsListForTableFunction.h>
# include <Parsers/ASTLiteral.h>
# include <Storages/ExternalDataSourceConfiguration.h>
# include <Storages/StorageURL.h>
# include <Storages/checkAndGetLiteralArgument.h>
# include <TableFunctions/TableFunctionS3.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
class Context;
class TableFunctionS3Cluster;
template <typename Name, typename Storage>
template <typename Name, typename Storage, typename Configuration>
class ITableFunctionDataLake : public ITableFunction
{
public:
@@ -44,8 +40,6 @@ protected:
executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/)
const override
{
S3::URI s3_uri(configuration.url);
ColumnsDescription columns;
if (configuration.structure != "auto")
columns = parseColumnsListFromString(configuration.structure, context);
@@ -58,7 +52,7 @@ protected:
return storage;
}
const char * getStorageTypeName() const override { return name; }
const char * getStorageTypeName() const override { return Storage::name; }
ColumnsDescription getActualTableStructure(ContextPtr context) const override
{
@@ -93,85 +87,14 @@ protected:
auto & args = args_func.at(0)->children;
parseArgumentsImpl(message, args, context, configuration);
TableFunctionS3::parseArgumentsImpl<false>(message, args, context, configuration);
if (configuration.format == "auto")
configuration.format = "Parquet";
}
static void
parseArgumentsImpl(const String & error_message, ASTs & args, ContextPtr context, StorageS3::Configuration & base_configuration)
{
if (args.empty() || args.size() > 6)
throw Exception::createDeprecated(error_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
auto * header_it = StorageURL::collectHeaders(args, base_configuration.headers_from_ast, context);
if (header_it != args.end())
args.erase(header_it);
for (auto & arg : args)
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
/// Size -> argument indexes
static auto size_to_args = std::map<size_t, std::map<String, size_t>>{
{1, {{}}},
{2, {{"format", 1}}},
{5, {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}, {"structure", 4}}},
{6, {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}, {"structure", 4}, {"compression_method", 5}}}};
std::map<String, size_t> args_to_idx;
/// For 4 arguments we support 2 possible variants:
/// dataLake(source, format, structure, compression_method) and iceberg(source, access_key_id, secret_access_key, format)
/// We can distinguish them by looking at the second argument: check whether it is a format name or not.
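/// For example (hypothetical literals), iceberg(url, 'Parquet', 'id Int64', 'gzip')
/// takes the first branch, while iceberg(url, 'key_id', 'secret', 'Parquet')
/// takes the second, because 'key_id' is not a known format name.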
if (args.size() == 4)
{
auto second_arg = checkAndGetLiteralArgument<String>(args[1], "format/access_key_id");
if (second_arg == "auto" || FormatFactory::instance().getAllFormats().contains(second_arg))
args_to_idx = {{"format", 1}, {"structure", 2}, {"compression_method", 3}};
else
args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"format", 3}};
}
/// For 3 arguments we support 2 possible variants:
/// dataLake(source, format, structure) and iceberg(source, access_key_id, secret_access_key)
/// We can distinguish them by looking at the second argument: check whether it is a format name or not.
else if (args.size() == 3)
{
auto second_arg = checkAndGetLiteralArgument<String>(args[1], "format/access_key_id");
if (second_arg == "auto" || FormatFactory::instance().getAllFormats().contains(second_arg))
args_to_idx = {{"format", 1}, {"structure", 2}};
else
args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}};
}
else
{
args_to_idx = size_to_args[args.size()];
}
/// This argument is always the first
base_configuration.url = S3::URI(checkAndGetLiteralArgument<String>(args[0], "url"));
if (args_to_idx.contains("format"))
base_configuration.format = checkAndGetLiteralArgument<String>(args[args_to_idx["format"]], "format");
else
base_configuration.format = "Parquet";
if (args_to_idx.contains("structure"))
base_configuration.structure = checkAndGetLiteralArgument<String>(args[args_to_idx["structure"]], "structure");
if (args_to_idx.contains("compression_method"))
base_configuration.compression_method
= checkAndGetLiteralArgument<String>(args[args_to_idx["compression_method"]], "compression_method");
if (args_to_idx.contains("access_key_id"))
base_configuration.auth_settings.access_key_id
= checkAndGetLiteralArgument<String>(args[args_to_idx["access_key_id"]], "access_key_id");
if (args_to_idx.contains("secret_access_key"))
base_configuration.auth_settings.secret_access_key
= checkAndGetLiteralArgument<String>(args[args_to_idx["secret_access_key"]], "secret_access_key");
}
mutable StorageS3::Configuration configuration;
};
mutable Configuration configuration;
};
}
#endif

View File

@@ -15,7 +15,7 @@ struct TableFunctionDeltaLakeName
static constexpr auto name = "deltaLake";
};
using TableFunctionDeltaLake = ITableFunctionDataLake<TableFunctionDeltaLakeName, StorageDeltaLake>;
using TableFunctionDeltaLake = ITableFunctionDataLake<TableFunctionDeltaLakeName, StorageDeltaLake, StorageS3::Configuration>;
void registerTableFunctionDeltaLake(TableFunctionFactory & factory)
{

View File

@@ -14,7 +14,7 @@ struct TableFunctionHudiName
{
static constexpr auto name = "hudi";
};
using TableFunctionHudi = ITableFunctionDataLake<TableFunctionHudiName, StorageHudi>;
using TableFunctionHudi = ITableFunctionDataLake<TableFunctionHudiName, StorageHudi, StorageS3::Configuration>;
void registerTableFunctionHudi(TableFunctionFactory & factory)
{

View File

@@ -16,7 +16,7 @@ struct TableFunctionIcebergName
static constexpr auto name = "iceberg";
};
using TableFunctionIceberg = ITableFunctionDataLake<TableFunctionIcebergName, StorageIceberg>;
using TableFunctionIceberg = ITableFunctionDataLake<TableFunctionIcebergName, StorageIceberg, StorageS3::Configuration>;
void registerTableFunctionIceberg(TableFunctionFactory & factory)
{

View File

@@ -29,7 +29,9 @@ namespace ErrorCodes
/// This is needed to avoid copy-paste: s3Cluster arguments differ only in one additional (first) argument - the cluster name
void TableFunctionS3::parseArgumentsImpl(const String & error_message, ASTs & args, ContextPtr context, StorageS3::Configuration & s3_configuration)
template <bool get_format_from_file>
void TableFunctionS3::parseArgumentsImpl(
const String & error_message, ASTs & args, ContextPtr context, StorageS3::Configuration & s3_configuration)
{
if (auto named_collection = tryGetNamedCollectionWithOverrides(args))
{
@@ -105,10 +107,14 @@ void TableFunctionS3::parseArgumentsImpl(const String & error_message, ASTs & ar
s3_configuration.auth_settings.secret_access_key = checkAndGetLiteralArgument<String>(args[args_to_idx["secret_access_key"]], "secret_access_key");
}
if (s3_configuration.format == "auto")
/// DataLake table functions supply their own default format, so for them we skip inferring it from the file name.
if (s3_configuration.format == "auto" && get_format_from_file)
s3_configuration.format = FormatFactory::instance().getFormatFromFileName(s3_configuration.url.uri.getPath(), true);
}
template void TableFunctionS3::parseArgumentsImpl<false>(
const String & error_message, ASTs & args, ContextPtr context, StorageS3::Configuration & s3_configuration);
void TableFunctionS3::parseArguments(const ASTPtr & ast_function, ContextPtr context)
{
/// Parse args

View File

@@ -12,7 +12,6 @@ namespace DB
{
class Context;
class TableFunctionS3Cluster;
/* s3(source, [access_key_id, secret_access_key,] format, structure[, compression]) - creates a temporary storage for a file in S3.
*/
@@ -37,8 +36,10 @@ public:
return {"_path", "_file"};
}
template <bool get_format_from_file = true>
static void parseArgumentsImpl(const String & error_message, ASTs & args, ContextPtr context, StorageS3::Configuration & configuration);
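/// DataLake table functions instantiate this with get_format_from_file = false
/// and then default the format to Parquet themselves (see ITableFunctionDataLake).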
protected:
friend class TableFunctionS3Cluster;
StoragePtr executeImpl(
const ASTPtr & ast_function,
@@ -51,8 +52,6 @@ protected:
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
static void parseArgumentsImpl(const String & error_message, ASTs & args, ContextPtr context, StorageS3::Configuration & configuration);
mutable StorageS3::Configuration configuration;
ColumnsDescription structure_hint;
};

View File

@@ -73,7 +73,7 @@ void TableFunctionS3Cluster::parseArguments(const ASTPtr & ast_function, Context
std::copy(args.begin() + 1, args.end(), std::back_inserter(clipped_args));
/// StorageS3ClusterConfiguration inherits from StorageS3::Configuration, so it is safe to upcast it.
TableFunctionS3::parseArgumentsImpl(message.text, clipped_args, context, static_cast<StorageS3::Configuration & >(configuration));
TableFunctionS3::parseArgumentsImpl(message.text, clipped_args, context, static_cast<StorageS3::Configuration &>(configuration));
}