Remove unnecessary changes

divanik 2024-09-27 14:30:07 +00:00
parent b0c3c7f52a
commit 3c8594d401
16 changed files with 407 additions and 584 deletions

View File

@ -0,0 +1,86 @@
#pragma once
#include "config.h"
#if USE_AVRO
# include <Storages/IStorage.h>
# include <Storages/ObjectStorage/Azure/Configuration.h>
# include <Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h>
# include <Storages/ObjectStorage/DataLakes/HudiMetadata.h>
# include <Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h>
# include <Storages/ObjectStorage/DataLakes/IcebergMetadata.h>
# include <Storages/ObjectStorage/Local/Configuration.h>
# include <Storages/ObjectStorage/S3/Configuration.h>
# include <Storages/ObjectStorage/StorageObjectStorage.h>
# include <Storages/StorageFactory.h>
# include <Common/logger_useful.h>
# include <memory>
namespace DB
{
template <typename T>
concept StorageConfiguration = std::derived_from<T, StorageObjectStorage::Configuration>;
template <StorageConfiguration BaseStorageConfiguration, typename DataLakeMetadata>
class DataLakeConfiguration : public BaseStorageConfiguration, public std::enable_shared_from_this<StorageObjectStorage::Configuration>
{
public:
using Configuration = StorageObjectStorage::Configuration;
bool isDataLakeConfiguration() const override { return true; }
std::string getEngineName() const override { return DataLakeMetadata::name; }
void update(ObjectStoragePtr object_storage, ContextPtr local_context) override
{
auto new_metadata = DataLakeMetadata::create(object_storage, weak_from_this(), local_context);
if (current_metadata && *current_metadata == *new_metadata)
return;
current_metadata = std::move(new_metadata);
BaseStorageConfiguration::setPaths(current_metadata->getDataFiles());
BaseStorageConfiguration::setPartitionColumns(current_metadata->getPartitionColumns());
}
private:
DataLakeMetadataPtr current_metadata;
ReadFromFormatInfo prepareReadingFromFormat(
ObjectStoragePtr object_storage,
const Strings & requested_columns,
const StorageSnapshotPtr & storage_snapshot,
bool supports_subset_of_columns,
ContextPtr local_context) override
{
auto info = DB::prepareReadingFromFormat(requested_columns, storage_snapshot, supports_subset_of_columns);
if (!current_metadata)
{
current_metadata = DataLakeMetadata::create(object_storage, weak_from_this(), local_context);
}
auto column_mapping = current_metadata->getColumnNameToPhysicalNameMapping();
if (!column_mapping.empty())
{
for (const auto & [column_name, physical_name] : column_mapping)
{
auto & column = info.format_header.getByName(column_name);
column.name = physical_name;
}
}
return info;
}
};
using StorageS3IcebergConfiguration = DataLakeConfiguration<StorageS3Configuration, IcebergMetadata>;
using StorageAzureIcebergConfiguration = DataLakeConfiguration<StorageAzureConfiguration, IcebergMetadata>;
using StorageLocalIcebergConfiguration = DataLakeConfiguration<StorageLocalConfiguration, IcebergMetadata>;
using StorageS3DeltaLakeConfiguration = DataLakeConfiguration<StorageS3Configuration, DeltaLakeMetadata>;
using StorageS3HudiConfiguration = DataLakeConfiguration<StorageS3Configuration, HudiMetadata>;
}
#endif
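The header above ties a concrete base storage configuration (S3, Azure, local) to a data lake metadata type and, by inheriting std::enable_shared_from_this, lets the metadata hold only a weak observer pointer to the configuration (the ConfigurationObservePtr introduced throughout this commit). A minimal standalone sketch of that ownership pattern, with illustrative names rather than ClickHouse types:

#include <cassert>
#include <iostream>
#include <memory>
#include <string>

/// The configuration is owned via shared_ptr; the metadata keeps only a weak observer
/// pointer and must lock() it before each use.
struct Configuration : std::enable_shared_from_this<Configuration>
{
    std::string path;
};

struct Metadata
{
    explicit Metadata(std::weak_ptr<Configuration> configuration_) : configuration(std::move(configuration_)) {}

    std::string dataDirectory() const
    {
        auto configuration_ptr = configuration.lock(); /// empty if the configuration has already been destroyed
        assert(configuration_ptr && "configuration is expected to outlive the metadata");
        return configuration_ptr->path + "/data";
    }

    std::weak_ptr<Configuration> configuration;
};

int main()
{
    auto configuration = std::make_shared<Configuration>();
    configuration->path = "s3://bucket/table";

    Metadata metadata(configuration->weak_from_this()); /// analogue of passing weak_from_this() above
    std::cout << metadata.dataDirectory() << '\n';       /// s3://bucket/table/data
}

This is why, in the hunks below, every former `configuration->...` access becomes `configuration.lock()` followed by `configuration_ptr->...`.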

View File

@ -55,22 +55,18 @@ namespace ErrorCodes
struct DeltaLakeMetadataImpl
{
using ConfigurationPtr = DeltaLakeMetadata::ConfigurationPtr;
using ConfigurationObservePtr = DeltaLakeMetadata::ConfigurationObservePtr;
ObjectStoragePtr object_storage;
ConfigurationPtr configuration;
ConfigurationObservePtr configuration;
ContextPtr context;
/**
* Useful links:
* - https://github.com/delta-io/delta/blob/master/PROTOCOL.md#data-files
*/
DeltaLakeMetadataImpl(ObjectStoragePtr object_storage_,
ConfigurationPtr configuration_,
ContextPtr context_)
: object_storage(object_storage_)
, configuration(configuration_)
, context(context_)
DeltaLakeMetadataImpl(ObjectStoragePtr object_storage_, ConfigurationObservePtr configuration_, ContextPtr context_)
: object_storage(object_storage_), configuration(configuration_), context(context_)
{
}
@ -110,6 +106,7 @@ struct DeltaLakeMetadataImpl
};
DeltaLakeMetadata processMetadataFiles()
{
auto configuration_ptr = configuration.lock();
std::set<String> result_files;
NamesAndTypesList current_schema;
DataLakePartitionColumns current_partition_columns;
@ -121,7 +118,7 @@ struct DeltaLakeMetadataImpl
while (true)
{
const auto filename = withPadding(++current_version) + metadata_file_suffix;
const auto file_path = std::filesystem::path(configuration->getPath()) / deltalake_metadata_directory / filename;
const auto file_path = std::filesystem::path(configuration_ptr->getPath()) / deltalake_metadata_directory / filename;
if (!object_storage->exists(StoredObject(file_path)))
break;
@ -135,7 +132,7 @@ struct DeltaLakeMetadataImpl
}
else
{
const auto keys = listFiles(*object_storage, *configuration, deltalake_metadata_directory, metadata_file_suffix);
const auto keys = listFiles(*object_storage, *configuration_ptr, deltalake_metadata_directory, metadata_file_suffix);
for (const String & key : keys)
processMetadataFile(key, current_schema, current_partition_columns, result_files);
}
@ -244,6 +241,8 @@ struct DeltaLakeMetadataImpl
}
}
auto configuration_ptr = configuration.lock();
if (object->has("add"))
{
auto add_object = object->get("add").extract<Poco::JSON::Object::Ptr>();
@ -251,7 +250,7 @@ struct DeltaLakeMetadataImpl
throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to extract `add` field");
auto path = add_object->getValue<String>("path");
result.insert(fs::path(configuration->getPath()) / path);
result.insert(fs::path(configuration_ptr->getPath()) / path);
auto filename = fs::path(path).filename().string();
auto it = file_partition_columns.find(filename);
@ -295,7 +294,7 @@ struct DeltaLakeMetadataImpl
throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to extract `remove` field");
auto path = remove_object->getValue<String>("path");
result.erase(fs::path(configuration->getPath()) / path);
result.erase(fs::path(configuration_ptr->getPath()) / path);
}
}
}
@ -486,7 +485,9 @@ struct DeltaLakeMetadataImpl
*/
size_t readLastCheckpointIfExists() const
{
const auto last_checkpoint_file = std::filesystem::path(configuration->getPath()) / deltalake_metadata_directory / "_last_checkpoint";
auto configuration_ptr = configuration.lock();
const auto last_checkpoint_file
= std::filesystem::path(configuration_ptr->getPath()) / deltalake_metadata_directory / "_last_checkpoint";
if (!object_storage->exists(StoredObject(last_checkpoint_file)))
return 0;
@ -552,7 +553,11 @@ struct DeltaLakeMetadataImpl
return 0;
const auto checkpoint_filename = withPadding(version) + ".checkpoint.parquet";
const auto checkpoint_path = std::filesystem::path(configuration->getPath()) / deltalake_metadata_directory / checkpoint_filename;
auto configuration_ptr = configuration.lock();
const auto checkpoint_path
= std::filesystem::path(configuration_ptr->getPath()) / deltalake_metadata_directory / checkpoint_filename;
LOG_TRACE(log, "Using checkpoint file: {}", checkpoint_path.string());
@ -667,7 +672,7 @@ struct DeltaLakeMetadataImpl
}
LOG_TEST(log, "Adding {}", path);
const auto [_, inserted] = result.insert(std::filesystem::path(configuration->getPath()) / path);
const auto [_, inserted] = result.insert(std::filesystem::path(configuration_ptr->getPath()) / path);
if (!inserted)
throw Exception(ErrorCodes::INCORRECT_DATA, "File already exists {}", path);
}
@ -678,10 +683,7 @@ struct DeltaLakeMetadataImpl
LoggerPtr log = getLogger("DeltaLakeMetadataParser");
};
DeltaLakeMetadata::DeltaLakeMetadata(
ObjectStoragePtr object_storage_,
ConfigurationPtr configuration_,
ContextPtr context_)
DeltaLakeMetadata::DeltaLakeMetadata(ObjectStoragePtr object_storage_, ConfigurationObservePtr configuration_, ContextPtr context_)
{
auto impl = DeltaLakeMetadataImpl(object_storage_, configuration_, context_);
auto result = impl.processMetadataFiles();
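The processMetadataFiles loop above probes commit files by increasing version number (withPadding(++current_version) + metadata_file_suffix) inside the Delta metadata directory and stops at the first missing one. A standalone sketch of the file naming it relies on, assuming the conventional _delta_log directory, a .json suffix, and 20-digit zero-padded versions as described in the protocol document linked in the comment above:

#include <iomanip>
#include <iostream>
#include <sstream>
#include <string>

/// Illustrative helper: zero-pad a commit version the way Delta Lake names its log files
/// (padding width assumed from the protocol's 20-digit commit file names).
std::string withPadding(size_t version)
{
    std::ostringstream out;
    out << std::setfill('0') << std::setw(20) << version;
    return out.str();
}

int main()
{
    /// The metadata loop probes <table>/_delta_log/<padded version>.json for increasing
    /// versions and stops at the first file that does not exist.
    for (size_t version = 0; version < 3; ++version)
        std::cout << withPadding(version) + ".json" << '\n';
    /// 00000000000000000000.json
    /// 00000000000000000001.json
    /// 00000000000000000002.json
}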

View File

@ -12,13 +12,10 @@ namespace DB
class DeltaLakeMetadata final : public IDataLakeMetadata
{
public:
using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr;
using ConfigurationObservePtr = StorageObjectStorage::ConfigurationObservePtr;
static constexpr auto name = "DeltaLake";
DeltaLakeMetadata(
ObjectStoragePtr object_storage_,
ConfigurationPtr configuration_,
ContextPtr context_);
DeltaLakeMetadata(ObjectStoragePtr object_storage_, ConfigurationObservePtr configuration_, ContextPtr context_);
Strings getDataFiles() const override { return data_files; }
@ -36,10 +33,7 @@ public:
&& data_files == deltalake_metadata->data_files;
}
static DataLakeMetadataPtr create(
ObjectStoragePtr object_storage,
ConfigurationPtr configuration,
ContextPtr local_context)
static DataLakeMetadataPtr create(ObjectStoragePtr object_storage, ConfigurationObservePtr configuration, ContextPtr local_context)
{
return std::make_unique<DeltaLakeMetadata>(object_storage, configuration, local_context);
}

View File

@ -43,8 +43,9 @@ namespace ErrorCodes
*/
Strings HudiMetadata::getDataFilesImpl() const
{
auto configuration_ptr = configuration.lock();
auto log = getLogger("HudiMetadata");
const auto keys = listFiles(*object_storage, *configuration, "", Poco::toLower(configuration->format));
const auto keys = listFiles(*object_storage, *configuration_ptr, "", Poco::toLower(configuration_ptr->format));
using Partition = std::string;
using FileID = std::string;
@ -86,13 +87,8 @@ Strings HudiMetadata::getDataFilesImpl() const
return result;
}
HudiMetadata::HudiMetadata(
ObjectStoragePtr object_storage_,
ConfigurationPtr configuration_,
ContextPtr context_)
: WithContext(context_)
, object_storage(object_storage_)
, configuration(configuration_)
HudiMetadata::HudiMetadata(ObjectStoragePtr object_storage_, ConfigurationObservePtr configuration_, ContextPtr context_)
: WithContext(context_), object_storage(object_storage_), configuration(configuration_)
{
}
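The hunk above only shows the listing call in getDataFilesImpl; the selection that follows (keyed by Partition and FileID) is not part of this diff. Conceptually, Hudi writes a new base file per commit, so only the newest commit per file ID should be read. A rough sketch of that selection, assuming the <file_id>_<write_token>_<commit_time>.parquet naming from the Hudi documentation (the file names below are made up):

#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <utility>
#include <vector>

int main()
{
    std::vector<std::string> keys = {
        "part-0001_0-7-12_20240927100000.parquet",
        "part-0001_0-9-15_20240927110000.parquet", /// newer commit of the same file id
        "part-0002_0-7-13_20240927100000.parquet",
    };

    /// For each file id keep only the key with the largest commit timestamp.
    std::map<std::string, std::pair<uint64_t, std::string>> latest_per_file_id;
    for (const auto & key : keys)
    {
        const auto file_id = key.substr(0, key.find('_'));
        const auto timestamp = std::stoull(key.substr(key.rfind('_') + 1));
        auto & entry = latest_per_file_id[file_id];
        if (timestamp > entry.first)
            entry = {timestamp, key};
    }

    for (const auto & [file_id, entry] : latest_per_file_id)
        std::cout << entry.second << '\n'; /// one data file per file id
}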

View File

@ -13,13 +13,13 @@ namespace DB
class HudiMetadata final : public IDataLakeMetadata, private WithContext
{
public:
using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr;
using ConfigurationObservePtr = StorageObjectStorage::ConfigurationObservePtr;
static constexpr auto name = "Hudi";
HudiMetadata(
ObjectStoragePtr object_storage_,
ConfigurationPtr configuration_,
ConfigurationObservePtr configuration_,
ContextPtr context_);
Strings getDataFiles() const override;
@ -40,7 +40,7 @@ public:
static DataLakeMetadataPtr create(
ObjectStoragePtr object_storage,
ConfigurationPtr configuration,
ConfigurationObservePtr configuration,
ContextPtr local_context)
{
return std::make_unique<HudiMetadata>(object_storage, configuration, local_context);
@ -48,7 +48,7 @@ public:
private:
const ObjectStoragePtr object_storage;
const ConfigurationPtr configuration;
const ConfigurationObservePtr configuration;
mutable Strings data_files;
std::unordered_map<String, String> column_name_to_physical_name;
DataLakePartitionColumns partition_columns;

View File

@ -1,172 +0,0 @@
#pragma once
#include "config.h"
#if USE_AVRO
#include <Storages/IStorage.h>
#include <Storages/StorageFactory.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h>
#include <Storages/ObjectStorage/DataLakes/IcebergMetadata.h>
#include <Storages/ObjectStorage/DataLakes/HudiMetadata.h>
#include <Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h>
#include <Common/logger_useful.h>
namespace DB
{
/// Storage for read-only integration with Apache Iceberg tables in Amazon S3 (see https://iceberg.apache.org/)
/// Right now it's implemented on top of StorageS3 and doesn't support
/// many Iceberg features like schema evolution, partitioning, positional and equality deletes.
template <typename DataLakeMetadata>
class IStorageDataLake final : public StorageObjectStorage
{
public:
using Storage = StorageObjectStorage;
using ConfigurationPtr = Storage::ConfigurationPtr;
static StoragePtr create(
ConfigurationPtr base_configuration,
ContextPtr context,
const StorageID & table_id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const String & comment_,
std::optional<FormatSettings> format_settings_,
LoadingStrictnessLevel mode)
{
auto object_storage = base_configuration->createObjectStorage(context, /* is_readonly */true);
DataLakeMetadataPtr metadata;
NamesAndTypesList schema_from_metadata;
const bool use_schema_from_metadata = columns_.empty();
if (base_configuration->format == "auto")
base_configuration->format = "Parquet";
ConfigurationPtr configuration = base_configuration->clone();
try
{
metadata = DataLakeMetadata::create(object_storage, base_configuration, context);
configuration->setPaths(metadata->getDataFiles());
if (use_schema_from_metadata)
schema_from_metadata = metadata->getTableSchema();
}
catch (...)
{
if (mode <= LoadingStrictnessLevel::CREATE)
throw;
metadata.reset();
configuration->setPaths({});
tryLogCurrentException(__PRETTY_FUNCTION__);
}
return std::make_shared<IStorageDataLake<DataLakeMetadata>>(
base_configuration, std::move(metadata), configuration, object_storage,
context, table_id_,
use_schema_from_metadata ? ColumnsDescription(schema_from_metadata) : columns_,
constraints_, comment_, format_settings_);
}
String getName() const override { return DataLakeMetadata::name; }
static ColumnsDescription getTableStructureFromData(
ObjectStoragePtr object_storage_,
ConfigurationPtr base_configuration,
const std::optional<FormatSettings> & format_settings_,
ContextPtr local_context)
{
auto metadata = DataLakeMetadata::create(object_storage_, base_configuration, local_context);
auto schema_from_metadata = metadata->getTableSchema();
if (!schema_from_metadata.empty())
{
return ColumnsDescription(std::move(schema_from_metadata));
}
else
{
ConfigurationPtr configuration = base_configuration->clone();
configuration->setPaths(metadata->getDataFiles());
std::string sample_path;
return Storage::resolveSchemaFromData(
object_storage_, configuration, format_settings_, sample_path, local_context);
}
}
void updateConfiguration(ContextPtr local_context) override
{
Storage::updateConfiguration(local_context);
auto new_metadata = DataLakeMetadata::create(Storage::object_storage, base_configuration, local_context);
if (current_metadata && *current_metadata == *new_metadata)
return;
current_metadata = std::move(new_metadata);
auto updated_configuration = base_configuration->clone();
updated_configuration->setPaths(current_metadata->getDataFiles());
updated_configuration->setPartitionColumns(current_metadata->getPartitionColumns());
Storage::configuration = updated_configuration;
}
template <typename... Args>
IStorageDataLake(
ConfigurationPtr base_configuration_,
DataLakeMetadataPtr metadata_,
Args &&... args)
: Storage(std::forward<Args>(args)...)
, base_configuration(base_configuration_)
, current_metadata(std::move(metadata_))
{
if (base_configuration->format == "auto")
{
base_configuration->format = Storage::configuration->format;
}
if (current_metadata)
{
const auto & columns = current_metadata->getPartitionColumns();
base_configuration->setPartitionColumns(columns);
Storage::configuration->setPartitionColumns(columns);
}
}
private:
ConfigurationPtr base_configuration;
DataLakeMetadataPtr current_metadata;
ReadFromFormatInfo prepareReadingFromFormat(
const Strings & requested_columns,
const StorageSnapshotPtr & storage_snapshot,
bool supports_subset_of_columns,
ContextPtr local_context) override
{
auto info = DB::prepareReadingFromFormat(requested_columns, storage_snapshot, supports_subset_of_columns);
if (!current_metadata)
{
Storage::updateConfiguration(local_context);
current_metadata = DataLakeMetadata::create(Storage::object_storage, base_configuration, local_context);
}
auto column_mapping = current_metadata->getColumnNameToPhysicalNameMapping();
if (!column_mapping.empty())
{
for (const auto & [column_name, physical_name] : column_mapping)
{
auto & column = info.format_header.getByName(column_name);
column.name = physical_name;
}
}
return info;
}
};
using StorageIceberg = IStorageDataLake<IcebergMetadata>;
using StorageDeltaLake = IStorageDataLake<DeltaLakeMetadata>;
using StorageHudi = IStorageDataLake<HudiMetadata>;
}
#endif
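Both the removed IStorageDataLake above and the new DataLakeConfiguration keep the same prepareReadingFromFormat logic: columns requested under their logical names are renamed in the format header to the physical names returned by getColumnNameToPhysicalNameMapping. A self-contained sketch of that remapping, with an illustrative header structure instead of ClickHouse's Block:

#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

struct ColumnWithName { std::string name; };

int main()
{
    /// Columns as requested by the query, under their logical names.
    std::vector<ColumnWithName> format_header = {{"user_id"}, {"event_time"}};

    /// Mapping as returned by getColumnNameToPhysicalNameMapping(); values here are made up.
    std::unordered_map<std::string, std::string> column_mapping = {{"user_id", "col_1"}, {"event_time", "col_2"}};

    /// Rename each header column to the physical name present in the data files.
    for (auto & column : format_header)
    {
        if (auto it = column_mapping.find(column.name); it != column_mapping.end())
            column.name = it->second;
    }

    for (const auto & column : format_header)
        std::cout << column.name << '\n'; /// col_1, col_2
}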

View File

@ -50,7 +50,7 @@ extern const int UNSUPPORTED_METHOD;
IcebergMetadata::IcebergMetadata(
ObjectStoragePtr object_storage_,
ConfigurationPtr configuration_,
ConfigurationObservePtr configuration_,
DB::ContextPtr context_,
Int32 metadata_version_,
Int32 format_version_,
@ -381,12 +381,12 @@ std::pair<Int32, String> getMetadataFileAndVersion(
}
DataLakeMetadataPtr IcebergMetadata::create(
ObjectStoragePtr object_storage,
ConfigurationPtr configuration,
ContextPtr local_context)
DataLakeMetadataPtr
IcebergMetadata::create(ObjectStoragePtr object_storage, ConfigurationObservePtr configuration, ContextPtr local_context)
{
const auto [metadata_version, metadata_file_path] = getMetadataFileAndVersion(object_storage, *configuration);
auto configuration_ptr = configuration.lock();
const auto [metadata_version, metadata_file_path] = getMetadataFileAndVersion(object_storage, *configuration_ptr);
LOG_DEBUG(getLogger("IcebergMetadata"), "Parse metadata {}", metadata_file_path);
auto read_settings = local_context->getReadSettings();
auto buf = object_storage->readObject(StoredObject(metadata_file_path), read_settings);
@ -411,12 +411,13 @@ DataLakeMetadataPtr IcebergMetadata::create(
if (snapshot->getValue<Int64>("snapshot-id") == current_snapshot_id)
{
const auto path = snapshot->getValue<String>("manifest-list");
manifest_list_file = std::filesystem::path(configuration->getPath()) / "metadata" / std::filesystem::path(path).filename();
manifest_list_file = std::filesystem::path(configuration_ptr->getPath()) / "metadata" / std::filesystem::path(path).filename();
break;
}
}
return std::make_unique<IcebergMetadata>(object_storage, configuration, local_context, metadata_version, format_version, manifest_list_file, schema_id, schema);
return std::make_unique<IcebergMetadata>(
object_storage, configuration_ptr, local_context, metadata_version, format_version, manifest_list_file, schema_id, schema);
}
/**
@ -446,6 +447,7 @@ DataLakeMetadataPtr IcebergMetadata::create(
*/
Strings IcebergMetadata::getDataFiles() const
{
auto configuration_ptr = configuration.lock();
if (!data_files.empty())
return data_files;
@ -478,7 +480,7 @@ Strings IcebergMetadata::getDataFiles() const
{
const auto file_path = col_str->getDataAt(i).toView();
const auto filename = std::filesystem::path(file_path).filename();
manifest_files.emplace_back(std::filesystem::path(configuration->getPath()) / "metadata" / filename);
manifest_files.emplace_back(std::filesystem::path(configuration_ptr->getPath()) / "metadata" / filename);
}
NameSet files;
@ -612,9 +614,9 @@ Strings IcebergMetadata::getDataFiles() const
const auto status = status_int_column->getInt(i);
const auto data_path = std::string(file_path_string_column->getDataAt(i).toView());
const auto pos = data_path.find(configuration->getPath());
const auto pos = data_path.find(configuration_ptr->getPath());
if (pos == std::string::npos)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected to find {} in data path: {}", configuration->getPath(), data_path);
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected to find {} in data path: {}", configuration_ptr->getPath(), data_path);
const auto file_path = data_path.substr(pos);
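The last hunk above normalises data file locations: Iceberg manifests store full paths, and only the suffix starting at the configured table path is kept, throwing if the table path does not occur in the entry. A standalone sketch with made-up paths:

#include <iostream>
#include <stdexcept>
#include <string>

int main()
{
    const std::string table_path = "bucket/warehouse/db/table";
    const std::string data_path = "s3://bucket/warehouse/db/table/data/00000-0-abc.parquet";

    /// Keep only the part of the manifest entry starting at the configured table path.
    const auto pos = data_path.find(table_path);
    if (pos == std::string::npos)
        throw std::runtime_error("Expected to find " + table_path + " in data path: " + data_path);

    std::cout << data_path.substr(pos) << '\n';
    /// bucket/warehouse/db/table/data/00000-0-abc.parquet
}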

View File

@ -61,13 +61,13 @@ namespace DB
class IcebergMetadata : public IDataLakeMetadata, private WithContext
{
public:
using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr;
using ConfigurationObservePtr = StorageObjectStorage::ConfigurationObservePtr;
static constexpr auto name = "Iceberg";
IcebergMetadata(
ObjectStoragePtr object_storage_,
ConfigurationPtr configuration_,
ConfigurationObservePtr configuration_,
ContextPtr context_,
Int32 metadata_version_,
Int32 format_version_,
@ -94,14 +94,14 @@ public:
static DataLakeMetadataPtr create(
ObjectStoragePtr object_storage,
ConfigurationPtr configuration,
ConfigurationObservePtr configuration,
ContextPtr local_context);
private:
size_t getVersion() const { return metadata_version; }
const ObjectStoragePtr object_storage;
const ConfigurationPtr configuration;
const ConfigurationObservePtr configuration;
Int32 metadata_version;
Int32 format_version;
String manifest_list_file;

View File

@ -1,132 +0,0 @@
#include "config.h"
#if USE_AWS_S3
# include <Storages/ObjectStorage/Azure/Configuration.h>
# include <Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h>
# include <Storages/ObjectStorage/DataLakes/IStorageDataLake.h>
# include <Storages/ObjectStorage/DataLakes/IcebergMetadata.h>
# include <Storages/ObjectStorage/Local/Configuration.h>
# include <Storages/ObjectStorage/S3/Configuration.h>
namespace DB
{
#if USE_AVRO /// StorageIceberg depends on Avro to parse metadata stored in Avro format.
void registerStorageIceberg(StorageFactory & factory)
{
factory.registerStorage(
"Iceberg",
[&](const StorageFactory::Arguments & args)
{
auto configuration = std::make_shared<StorageS3Configuration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), false);
return StorageIceberg::create(
configuration, args.getContext(), args.table_id, args.columns, args.constraints, args.comment, std::nullopt, args.mode);
},
{
.supports_settings = false,
.supports_schema_inference = true,
.source_access_type = AccessType::S3,
});
factory.registerStorage(
"IcebergS3",
[&](const StorageFactory::Arguments & args)
{
auto configuration = std::make_shared<StorageS3Configuration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), false);
return StorageIceberg::create(
configuration, args.getContext(), args.table_id, args.columns, args.constraints, args.comment, std::nullopt, args.mode);
},
{
.supports_settings = false,
.supports_schema_inference = true,
.source_access_type = AccessType::S3,
});
factory.registerStorage(
"IcebergAzure",
[&](const StorageFactory::Arguments & args)
{
auto configuration = std::make_shared<StorageAzureConfiguration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), true);
return StorageIceberg::create(
configuration, args.getContext(), args.table_id, args.columns, args.constraints, args.comment, std::nullopt, args.mode);
},
{
.supports_settings = false,
.supports_schema_inference = true,
.source_access_type = AccessType::AZURE,
});
factory.registerStorage(
"IcebergLocal",
[&](const StorageFactory::Arguments & args)
{
auto configuration = std::make_shared<StorageLocalConfiguration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), false);
return StorageIceberg::create(
configuration, args.getContext(), args.table_id, args.columns,
args.constraints, args.comment, std::nullopt, args.mode);
},
{
.supports_settings = false,
.supports_schema_inference = true,
.source_access_type = AccessType::FILE,
});
}
#endif
#if USE_PARQUET
void registerStorageDeltaLake(StorageFactory & factory)
{
factory.registerStorage(
"DeltaLake",
[&](const StorageFactory::Arguments & args)
{
auto configuration = std::make_shared<StorageS3Configuration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), false);
return StorageDeltaLake::create(
configuration, args.getContext(), args.table_id, args.columns,
args.constraints, args.comment, std::nullopt, args.mode);
},
{
.supports_settings = false,
.supports_schema_inference = true,
.source_access_type = AccessType::S3,
});
}
#endif
void registerStorageHudi(StorageFactory & factory)
{
factory.registerStorage(
"Hudi",
[&](const StorageFactory::Arguments & args)
{
auto configuration = std::make_shared<StorageS3Configuration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), false);
return StorageHudi::create(
configuration, args.getContext(), args.table_id, args.columns,
args.constraints, args.comment, std::nullopt, args.mode);
},
{
.supports_settings = false,
.supports_schema_inference = true,
.source_access_type = AccessType::S3,
});
}
}
#endif

View File

@ -124,12 +124,11 @@ bool StorageObjectStorage::supportsSubsetOfColumns(const ContextPtr & context) c
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration->format, context, format_settings);
}
void StorageObjectStorage::updateConfiguration(ContextPtr context)
void StorageObjectStorage::Configuration::update(ObjectStoragePtr object_storage_ptr, ContextPtr context)
{
IObjectStorage::ApplyNewSettingsOptions options{ .allow_client_change = !configuration->isStaticConfiguration() };
object_storage->applyNewSettings(context->getConfigRef(), configuration->getTypeName() + ".", context, options);
IObjectStorage::ApplyNewSettingsOptions options{.allow_client_change = !isStaticConfiguration()};
object_storage_ptr->applyNewSettings(context->getConfigRef(), getTypeName() + ".", context, options);
}
namespace
{
class ReadFromObjectStorageStep : public SourceStepWithFilter
@ -243,7 +242,8 @@ private:
};
}
ReadFromFormatInfo StorageObjectStorage::prepareReadingFromFormat(
ReadFromFormatInfo StorageObjectStorage::Configuration::prepareReadingFromFormat(
ObjectStoragePtr,
const Strings & requested_columns,
const StorageSnapshotPtr & storage_snapshot,
bool supports_subset_of_columns,
@ -262,7 +262,7 @@ void StorageObjectStorage::read(
size_t max_block_size,
size_t num_streams)
{
updateConfiguration(local_context);
configuration->update(object_storage, local_context);
if (partition_by && configuration->withPartitionWildcard())
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
@ -270,8 +270,8 @@ void StorageObjectStorage::read(
getName());
}
const auto read_from_format_info = prepareReadingFromFormat(
column_names, storage_snapshot, supportsSubsetOfColumns(local_context), local_context);
const auto read_from_format_info = configuration->prepareReadingFromFormat(
object_storage, column_names, storage_snapshot, supportsSubsetOfColumns(local_context), local_context);
const bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty())
&& local_context->getSettingsRef()[Setting::optimize_count_from_files];
@ -300,7 +300,7 @@ SinkToStoragePtr StorageObjectStorage::write(
ContextPtr local_context,
bool /* async_insert */)
{
updateConfiguration(local_context);
configuration->update(object_storage, local_context);
const auto sample_block = metadata_snapshot->getSampleBlock();
const auto & settings = configuration->getQuerySettings(local_context);

View File

@ -25,6 +25,7 @@ class StorageObjectStorage : public IStorage
public:
class Configuration;
using ConfigurationPtr = std::shared_ptr<Configuration>;
using ConfigurationObservePtr = std::weak_ptr<Configuration>;
using ObjectInfo = RelativePathWithMetadata;
using ObjectInfoPtr = std::shared_ptr<ObjectInfo>;
using ObjectInfos = std::vector<ObjectInfoPtr>;
@ -120,16 +121,8 @@ public:
const ContextPtr & context);
protected:
virtual void updateConfiguration(ContextPtr local_context);
String getPathSample(StorageInMemoryMetadata metadata, ContextPtr context);
virtual ReadFromFormatInfo prepareReadingFromFormat(
const Strings & requested_columns,
const StorageSnapshotPtr & storage_snapshot,
bool supports_subset_of_columns,
ContextPtr local_context);
static std::unique_ptr<ReadBufferIterator> createReadBufferIterator(
const ObjectStoragePtr & object_storage,
const ConfigurationPtr & configuration,
@ -206,14 +199,26 @@ public:
void setPartitionColumns(const DataLakePartitionColumns & columns) { partition_columns = columns; }
const DataLakePartitionColumns & getPartitionColumns() const { return partition_columns; }
virtual bool isDataLakeConfiguration() const { return false; }
virtual ReadFromFormatInfo prepareReadingFromFormat(
ObjectStoragePtr object_storage,
const Strings & requested_columns,
const StorageSnapshotPtr & storage_snapshot,
bool supports_subset_of_columns,
ContextPtr local_context);
String format = "auto";
String compression_method = "auto";
String structure = "auto";
virtual void update(ObjectStoragePtr object_storage, ContextPtr local_context);
protected:
virtual void fromNamedCollection(const NamedCollection & collection, ContextPtr context) = 0;
virtual void fromAST(ASTs & args, ContextPtr context, bool with_structure) = 0;
void assertInitialized() const;
bool initialized = false;

View File

@ -2,6 +2,7 @@
#include <Core/Settings.h>
#include <Formats/FormatFactory.h>
#include <Storages/ObjectStorage/Azure/Configuration.h>
#include <Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h>
#include <Storages/ObjectStorage/HDFS/Configuration.h>
#include <Storages/ObjectStorage/S3/Configuration.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
@ -148,4 +149,108 @@ void registerStorageObjectStorage(StorageFactory & factory)
UNUSED(factory);
}
#if USE_AVRO /// StorageIceberg depends on Avro to parse metadata stored in Avro format.
void registerStorageIceberg(StorageFactory & factory)
{
factory.registerStorage(
"Iceberg",
[&](const StorageFactory::Arguments & args)
{
auto configuration = std::make_shared<StorageS3IcebergConfiguration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), false);
return createStorageObjectStorage(args, configuration, args.getLocalContext());
},
{
.supports_settings = false,
.supports_schema_inference = true,
.source_access_type = AccessType::S3,
});
factory.registerStorage(
"IcebergS3",
[&](const StorageFactory::Arguments & args)
{
auto configuration = std::make_shared<StorageS3IcebergConfiguration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), false);
return createStorageObjectStorage(args, configuration, args.getLocalContext());
},
{
.supports_settings = false,
.supports_schema_inference = true,
.source_access_type = AccessType::S3,
});
factory.registerStorage(
"IcebergAzure",
[&](const StorageFactory::Arguments & args)
{
auto configuration = std::make_shared<StorageAzureIcebergConfiguration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), true);
return createStorageObjectStorage(args, configuration, args.getLocalContext());
},
{
.supports_settings = false,
.supports_schema_inference = true,
.source_access_type = AccessType::AZURE,
});
factory.registerStorage(
"IcebergLocal",
[&](const StorageFactory::Arguments & args)
{
auto configuration = std::make_shared<StorageLocalIcebergConfiguration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), false);
return createStorageObjectStorage(args, configuration, args.getLocalContext());
},
{
.supports_settings = false,
.supports_schema_inference = true,
.source_access_type = AccessType::FILE,
});
}
#endif
#if USE_PARQUET
void registerStorageDeltaLake(StorageFactory & factory)
{
factory.registerStorage(
"DeltaLake",
[&](const StorageFactory::Arguments & args)
{
auto configuration = std::make_shared<StorageS3DeltaLakeConfiguration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), false);
return createStorageObjectStorage(args, configuration, args.getLocalContext());
},
{
.supports_settings = false,
.supports_schema_inference = true,
.source_access_type = AccessType::S3,
});
}
#endif
void registerStorageHudi(StorageFactory & factory)
{
factory.registerStorage(
"Hudi",
[&](const StorageFactory::Arguments & args)
{
auto configuration = std::make_shared<StorageS3HudiConfiguration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), false);
return createStorageObjectStorage(args, configuration, args.getLocalContext());
},
{
.supports_settings = false,
.supports_schema_inference = true,
.source_access_type = AccessType::S3,
});
}
}
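Each register* function above follows the same shape: bind an engine name to a creator lambda that builds the matching configuration (now one of the DataLakeConfiguration aliases), initialises it from the engine arguments, and constructs the storage. A bare-bones sketch of that factory pattern with stand-in types, not the real StorageFactory interface:

#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <string>

struct Arguments { std::string engine_args; };
struct Storage { std::string name; };

class StorageFactory
{
public:
    using Creator = std::function<std::shared_ptr<Storage>(const Arguments &)>;

    /// Each engine name maps to a creator that knows how to build its configuration and storage.
    void registerStorage(const std::string & name, Creator creator) { creators[name] = std::move(creator); }

    std::shared_ptr<Storage> get(const std::string & name, const Arguments & args) const
    {
        return creators.at(name)(args);
    }

private:
    std::map<std::string, Creator> creators;
};

int main()
{
    StorageFactory factory;
    factory.registerStorage("IcebergS3", [](const Arguments &) { return std::make_shared<Storage>(Storage{"IcebergS3"}); });

    auto storage = factory.get("IcebergS3", Arguments{"url, access_key_id, secret_access_key"});
    std::cout << storage->name << '\n';
}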

View File

@ -1,120 +0,0 @@
#pragma once
#include "config.h"
#include <Access/Common/AccessFlags.h>
#include <Interpreters/Context.h>
#include <Interpreters/parseColumnsListForTableFunction.h>
#include <TableFunctions/ITableFunction.h>
#include <TableFunctions/TableFunctionObjectStorage.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/ObjectStorage/DataLakes/IStorageDataLake.h>
#include <Storages/ObjectStorage/DataLakes/IcebergMetadata.h>
#include <TableFunctions/TableFunctionFactory.h>
namespace DB
{
template <typename Name, typename Storage, typename TableFunction>
class ITableFunctionDataLake : public TableFunction
{
public:
static constexpr auto name = Name::name;
std::string getName() const override { return name; }
protected:
StoragePtr executeImpl(
const ASTPtr & /* ast_function */,
ContextPtr context,
const std::string & table_name,
ColumnsDescription cached_columns,
bool /*is_insert_query*/) const override
{
ColumnsDescription columns;
auto configuration = TableFunction::getConfiguration();
if (configuration->structure != "auto")
columns = parseColumnsListFromString(configuration->structure, context);
else if (!cached_columns.empty())
columns = cached_columns;
StoragePtr storage = Storage::create(
configuration, context, StorageID(TableFunction::getDatabaseName(), table_name),
columns, ConstraintsDescription{}, String{}, std::nullopt, LoadingStrictnessLevel::CREATE);
storage->startup();
return storage;
}
const char * getStorageTypeName() const override { return name; }
ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override
{
auto configuration = TableFunction::getConfiguration();
if (configuration->structure == "auto")
{
context->checkAccess(TableFunction::getSourceAccessType());
auto object_storage = TableFunction::getObjectStorage(context, !is_insert_query);
return Storage::getTableStructureFromData(object_storage, configuration, std::nullopt, context);
}
else
{
return parseColumnsListFromString(configuration->structure, context);
}
}
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override
{
auto configuration = TableFunction::getConfiguration();
configuration->format = "Parquet";
/// Set default format to Parquet if it's not specified in arguments.
TableFunction::parseArguments(ast_function, context);
}
};
struct TableFunctionIcebergName
{
static constexpr auto name = "iceberg";
};
struct TableFunctionIcebergS3Name
{
static constexpr auto name = "icebergS3";
};
struct TableFunctionIcebergAzureName
{
static constexpr auto name = "icebergAzure";
};
struct TableFunctionIcebergLocalName
{
static constexpr auto name = "icebergLocal";
};
struct TableFunctionDeltaLakeName
{
static constexpr auto name = "deltaLake";
};
struct TableFunctionHudiName
{
static constexpr auto name = "hudi";
};
#if USE_AVRO
# if USE_AWS_S3
using TableFunctionIceberg = ITableFunctionDataLake<TableFunctionIcebergName, StorageIceberg, TableFunctionS3>;
using TableFunctionIcebergS3 = ITableFunctionDataLake<TableFunctionIcebergS3Name, StorageIceberg, TableFunctionS3>;
# endif
# if USE_AZURE_BLOB_STORAGE
using TableFunctionIcebergAzure = ITableFunctionDataLake<TableFunctionIcebergAzureName, StorageIceberg, TableFunctionAzureBlob>;
# endif
using TableFunctionIcebergLocal = ITableFunctionDataLake<TableFunctionIcebergLocalName, StorageIceberg, TableFunctionLocal>;
#endif
#if USE_AWS_S3
# if USE_PARQUET
using TableFunctionDeltaLake = ITableFunctionDataLake<TableFunctionDeltaLakeName, StorageDeltaLake, TableFunctionS3>;
#endif
using TableFunctionHudi = ITableFunctionDataLake<TableFunctionHudiName, StorageHudi, TableFunctionS3>;
#endif
}

View File

@ -225,4 +225,94 @@ template class TableFunctionObjectStorage<HDFSDefinition, StorageHDFSConfigurati
template class TableFunctionObjectStorage<HDFSClusterDefinition, StorageHDFSConfiguration>;
#endif
template class TableFunctionObjectStorage<LocalDefinition, StorageLocalConfiguration>;
#if USE_AVRO
void registerTableFunctionIceberg(TableFunctionFactory & factory)
{
# if USE_AWS_S3
factory.registerFunction<TableFunctionIceberg>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored on S3 object store. Alias to icebergS3)",
.examples{{"iceberg", "SELECT * FROM iceberg(url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
factory.registerFunction<TableFunctionIcebergS3>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored on S3 object store.)",
.examples{{"icebergS3", "SELECT * FROM icebergS3(url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
# endif
# if USE_AZURE_BLOB_STORAGE
factory.registerFunction<TableFunctionIcebergAzure>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored on Azure object store.)",
.examples{{"icebergAzure", "SELECT * FROM icebergAzure(url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
# endif
factory.registerFunction<TableFunctionIcebergLocal>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored locally.)",
.examples{{"icebergLocal", "SELECT * FROM icebergLocal(filename)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
}
#endif
#if USE_AWS_S3
# if USE_PARQUET
void registerTableFunctionDeltaLake(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionDeltaLake>(
{.documentation
= {.description = R"(The table function can be used to read the DeltaLake table stored on object store.)",
.examples{{"deltaLake", "SELECT * FROM deltaLake(url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
}
# endif
void registerTableFunctionHudi(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionHudi>(
{.documentation
= {.description = R"(The table function can be used to read the Hudi table stored on object store.)",
.examples{{"hudi", "SELECT * FROM hudi(url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
}
#endif
void registerDataLakeTableFunctions(TableFunctionFactory & factory)
{
UNUSED(factory);
#if USE_AVRO
registerTableFunctionIceberg(factory);
#endif
#if USE_AWS_S3
# if USE_PARQUET
registerTableFunctionDeltaLake(factory);
# endif
registerTableFunctionHudi(factory);
#endif
}
#if USE_AVRO
# if USE_AWS_S3
template class TableFunctionObjectStorage<IcebergDefinition, StorageS3IcebergConfiguration>;
template class TableFunctionObjectStorage<IcebergS3Definition, StorageS3IcebergConfiguration>;
# endif
# if USE_AZURE_BLOB_STORAGE
template class TableFunctionObjectStorage<IcebergAzureDefinition, StorageAzureIcebergConfiguration>;
# endif
template class TableFunctionObjectStorage<IcebergLocalDefinition, StorageLocalIcebergConfiguration>;
#endif
#if USE_AWS_S3
# if USE_PARQUET
template class TableFunctionObjectStorage<DeltaLakeDefinition, StorageS3DeltaLakeConfiguration>;
# endif
template class TableFunctionObjectStorage<HudiDefinition, StorageS3HudiConfiguration>;
#endif
}
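The `template class TableFunctionObjectStorage<...>;` lines above are explicit instantiation definitions: each listed Definition/Configuration pair is compiled once in this translation unit instead of in every file that includes the header. A tiny self-contained example of the mechanism, with illustrative names:

#include <iostream>

template <typename Definition>
struct NamedThing
{
    static const char * name() { return Definition::name; }
};

struct IcebergLikeDefinition { static constexpr auto name = "iceberg"; };

/// Explicit instantiation definition: the template body is emitted here, once,
/// for this argument, rather than implicitly in every including translation unit.
template struct NamedThing<IcebergLikeDefinition>;

int main()
{
    std::cout << NamedThing<IcebergLikeDefinition>::name() << '\n'; /// iceberg
}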

View File

@ -2,6 +2,7 @@
#include <Disks/ObjectStorages/IObjectStorage_fwd.h>
#include <Formats/FormatFactory.h>
#include <Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/VirtualColumnUtils.h>
#include <TableFunctions/ITableFunction.h>
@ -61,6 +62,42 @@ struct LocalDefinition
static constexpr auto storage_type_name = "Local";
};
struct IcebergDefinition
{
static constexpr auto name = "iceberg";
static constexpr auto storage_type_name = "S3";
};
struct IcebergS3Definition
{
static constexpr auto name = "icebergS3";
static constexpr auto storage_type_name = "S3";
};
struct IcebergAzureDefinition
{
static constexpr auto name = "icebergAzure";
static constexpr auto storage_type_name = "Azure";
};
struct IcebergLocalDefinition
{
static constexpr auto name = "icebergLocal";
static constexpr auto storage_type_name = "Local";
};
struct DeltaLakeDefinition
{
static constexpr auto name = "deltaLake";
static constexpr auto storage_type_name = "S3";
};
struct HudiDefinition
{
static constexpr auto name = "hudi";
static constexpr auto storage_type_name = "S3";
};
template <typename Definition, typename Configuration>
class TableFunctionObjectStorage : public ITableFunction
{
@ -137,4 +174,22 @@ using TableFunctionHDFS = TableFunctionObjectStorage<HDFSDefinition, StorageHDFS
#endif
using TableFunctionLocal = TableFunctionObjectStorage<LocalDefinition, StorageLocalConfiguration>;
#if USE_AVRO
# if USE_AWS_S3
using TableFunctionIceberg = TableFunctionObjectStorage<IcebergDefinition, StorageS3IcebergConfiguration>;
using TableFunctionIcebergS3 = TableFunctionObjectStorage<IcebergS3Definition, StorageS3IcebergConfiguration>;
# endif
# if USE_AZURE_BLOB_STORAGE
using TableFunctionIcebergAzure = TableFunctionObjectStorage<IcebergAzureDefinition, StorageAzureIcebergConfiguration>;
# endif
using TableFunctionIcebergLocal = TableFunctionObjectStorage<IcebergLocalDefinition, StorageLocalIcebergConfiguration>;
#endif
#if USE_AWS_S3
# if USE_PARQUET
using TableFunctionDeltaLake = TableFunctionObjectStorage<DeltaLakeDefinition, StorageS3DeltaLakeConfiguration>;
# endif
using TableFunctionHudi = TableFunctionObjectStorage<HudiDefinition, StorageS3HudiConfiguration>;
#endif
}
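The new *Definition structs above are compile-time traits: each carries the user-facing function name and the storage type name, and TableFunctionObjectStorage is parameterised by one of them together with a configuration type. A minimal sketch of the pattern; only the Definition shape mirrors the header, the template body here is illustrative:

#include <iostream>
#include <string>

struct IcebergS3LikeDefinition
{
    static constexpr auto name = "icebergS3";
    static constexpr auto storage_type_name = "S3";
};

/// One generic table-function template, specialised per engine flavour by a traits struct.
template <typename Definition>
class TableFunctionSketch
{
public:
    std::string getName() const { return Definition::name; }
    const char * getStorageTypeName() const { return Definition::storage_type_name; }
};

int main()
{
    TableFunctionSketch<IcebergS3LikeDefinition> fn;
    std::cout << fn.getName() << " over " << fn.getStorageTypeName() << '\n'; /// icebergS3 over S3
}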

View File

@ -1,88 +0,0 @@
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/ITableFunctionDataLake.h>
namespace DB
{
#if USE_AVRO
void registerTableFunctionIceberg(TableFunctionFactory & factory)
{
# if USE_AWS_S3
factory.registerFunction<TableFunctionIceberg>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored on S3 object store. Alias to icebergS3)",
.examples{{"iceberg", "SELECT * FROM iceberg(url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
factory.registerFunction<TableFunctionIcebergS3>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored on S3 object store.)",
.examples{{"icebergS3", "SELECT * FROM icebergS3(url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
# endif
# if USE_AZURE_BLOB_STORAGE
factory.registerFunction<TableFunctionIcebergAzure>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored on Azure object store.)",
.examples{{"icebergAzure", "SELECT * FROM icebergAzure(url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
# endif
factory.registerFunction<TableFunctionIcebergLocal>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored locally.)",
.examples{{"icebergLocal", "SELECT * FROM icebergLocal(filename)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
}
#endif
#if USE_AWS_S3
# if USE_PARQUET
void registerTableFunctionDeltaLake(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionDeltaLake>(
{
.documentation =
{
.description=R"(The table function can be used to read the DeltaLake table stored on object store.)",
.examples{{"deltaLake", "SELECT * FROM deltaLake(url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}
},
.allow_readonly = false
});
}
#endif
void registerTableFunctionHudi(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionHudi>(
{
.documentation =
{
.description=R"(The table function can be used to read the Hudi table stored on object store.)",
.examples{{"hudi", "SELECT * FROM hudi(url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}
},
.allow_readonly = false
});
}
#endif
void registerDataLakeTableFunctions(TableFunctionFactory & factory)
{
UNUSED(factory);
#if USE_AVRO
registerTableFunctionIceberg(factory);
#endif
#if USE_AWS_S3
# if USE_PARQUET
registerTableFunctionDeltaLake(factory);
#endif
registerTableFunctionHudi(factory);
#endif
}
}