diff --git a/docker/test/integration/runner/requirements.txt b/docker/test/integration/runner/requirements.txt index bb0c4d001e6..9e3f88e1350 100644 --- a/docker/test/integration/runner/requirements.txt +++ b/docker/test/integration/runner/requirements.txt @@ -36,7 +36,7 @@ geomet==0.2.1.post1 grpcio-tools==1.60.0 grpcio==1.60.0 gssapi==1.8.3 -httplib2==0.20.2 +httplib2==0.22.0 idna==3.7 importlib-metadata==4.6.4 iniconfig==2.0.0 @@ -72,7 +72,7 @@ pyarrow==17.0.0 pycparser==2.22 pycryptodome==3.20.0 pymongo==3.11.0 -pyparsing==2.4.7 +pyparsing==3.1.0 pyspark==3.3.2 pyspnego==0.10.2 pytest-order==1.0.0 @@ -101,3 +101,4 @@ wadllib==1.3.6 websocket-client==1.8.0 wheel==0.38.1 zipp==1.0.0 +pyiceberg==0.7.1 diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 55228b2d1ec..717b731d985 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -166,6 +166,8 @@ if (TARGET ch_contrib::hdfs) add_headers_and_sources(dbms Disks/ObjectStorages/HDFS) endif() +add_headers_and_sources(dbms Databases/Iceberg) + add_headers_and_sources(dbms Disks/ObjectStorages/Cached) add_headers_and_sources(dbms Disks/ObjectStorages/Local) add_headers_and_sources(dbms Disks/ObjectStorages/Web) diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index d073e477593..33f0e10156f 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -189,6 +189,9 @@ M(BuildVectorSimilarityIndexThreads, "Number of threads in the build vector similarity index thread pool.") \ M(BuildVectorSimilarityIndexThreadsActive, "Number of threads in the build vector similarity index thread pool running a task.") \ M(BuildVectorSimilarityIndexThreadsScheduled, "Number of queued or active jobs in the build vector similarity index thread pool.") \ + M(IcebergCatalogThreads, "Number of threads in the IcebergCatalog thread pool.") \ + M(IcebergCatalogThreadsActive, "Number of threads in the IcebergCatalog thread pool running a task.") \ + M(IcebergCatalogThreadsScheduled, "Number of queued or active jobs in the IcebergCatalog thread pool.") \ \ M(DiskPlainRewritableAzureDirectoryMapSize, "Number of local-to-remote path entries in the 'plain_rewritable' in-memory map for AzureObjectStorage.") \ M(DiskPlainRewritableAzureFileCount, "Number of file entries in the 'plain_rewritable' in-memory map for AzureObjectStorage.") \ diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index 376ccf6f297..6cda0fefa5e 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -613,6 +613,7 @@ M(733, TABLE_IS_BEING_RESTARTED) \ M(734, CANNOT_WRITE_AFTER_BUFFER_CANCELED) \ M(735, QUERY_WAS_CANCELLED_BY_CLIENT) \ + M(736, ICEBERG_CATALOG_ERROR) \ \ M(900, DISTRIBUTED_CACHE_ERROR) \ M(901, CANNOT_USE_DISTRIBUTED_CACHE) \ diff --git a/src/Core/SettingsEnums.cpp b/src/Core/SettingsEnums.cpp index cef63039277..8f7ff6f149c 100644 --- a/src/Core/SettingsEnums.cpp +++ b/src/Core/SettingsEnums.cpp @@ -280,4 +280,14 @@ IMPLEMENT_SETTING_ENUM( {"StochasticSimple", MergeSelectorAlgorithm::STOCHASTIC_SIMPLE}, {"Trivial", MergeSelectorAlgorithm::TRIVIAL}}) +IMPLEMENT_SETTING_ENUM(DatabaseIcebergCatalogType, ErrorCodes::BAD_ARGUMENTS, + {{"rest", DatabaseIcebergCatalogType::REST}}) + +IMPLEMENT_SETTING_ENUM(DatabaseIcebergStorageType, ErrorCodes::BAD_ARGUMENTS, + {{"s3", DatabaseIcebergStorageType::S3}, + {"azure", DatabaseIcebergStorageType::Azure}, + {"hdfs", DatabaseIcebergStorageType::HDFS}, + {"local", DatabaseIcebergStorageType::Local}, + }) + } diff --git a/src/Core/SettingsEnums.h 
b/src/Core/SettingsEnums.h index 607011b505b..f16e127c026 100644 --- a/src/Core/SettingsEnums.h +++ b/src/Core/SettingsEnums.h @@ -361,4 +361,21 @@ DECLARE_SETTING_ENUM(GroupArrayActionWhenLimitReached) DECLARE_SETTING_ENUM(MergeSelectorAlgorithm) +enum class DatabaseIcebergCatalogType : uint8_t +{ + REST, +}; + +DECLARE_SETTING_ENUM(DatabaseIcebergCatalogType) + +enum class DatabaseIcebergStorageType : uint8_t +{ + S3, + Azure, + Local, + HDFS, +}; + +DECLARE_SETTING_ENUM(DatabaseIcebergStorageType) + } diff --git a/src/Databases/Iceberg/DatabaseIceberg.cpp b/src/Databases/Iceberg/DatabaseIceberg.cpp new file mode 100644 index 00000000000..c672d805e5e --- /dev/null +++ b/src/Databases/Iceberg/DatabaseIceberg.cpp @@ -0,0 +1,365 @@ +#include + +#if USE_AVRO +#include + +#include +#include +#include + +#include +#include +#include +#include + +#include +#include +#include + +#include + +#include +#include +#include +#include + +namespace CurrentMetrics +{ + extern const Metric IcebergCatalogThreads; + extern const Metric IcebergCatalogThreadsActive; + extern const Metric IcebergCatalogThreadsScheduled; +} + + +namespace DB +{ +namespace DatabaseIcebergSetting +{ + extern const DatabaseIcebergSettingsDatabaseIcebergCatalogType catalog_type; + extern const DatabaseIcebergSettingsDatabaseIcebergStorageType storage_type; + extern const DatabaseIcebergSettingsString warehouse; + extern const DatabaseIcebergSettingsString catalog_credential; + extern const DatabaseIcebergSettingsString auth_header; + extern const DatabaseIcebergSettingsString storage_endpoint; +} + +namespace ErrorCodes +{ + extern const int BAD_ARGUMENTS; +} + +namespace +{ + /// Parse a string, containing at least one dot, into a two substrings: + /// A.B.C.D.E -> A.B.C.D and E, where + /// `A.B.C.D` is a table "namespace". + /// `E` is a table name. + std::pair parseTableName(const std::string & name) + { + auto pos = name.rfind('.'); + if (pos == std::string::npos) + throw DB::Exception(ErrorCodes::BAD_ARGUMENTS, "Table cannot have empty namespace: {}", name); + + auto table_name = name.substr(pos + 1); + auto namespace_name = name.substr(0, name.size() - table_name.size() - 1); + return {namespace_name, table_name}; + } +} + +DatabaseIceberg::DatabaseIceberg( + const std::string & database_name_, + const std::string & url_, + const DatabaseIcebergSettings & settings_, + ASTPtr database_engine_definition_, + ContextPtr context_) + : IDatabase(database_name_) + , url(url_) + , settings(settings_) + , database_engine_definition(database_engine_definition_) + , log(getLogger("DatabaseIceberg(" + database_name_ + ")")) +{ + validateSettings(context_); +} + +void DatabaseIceberg::validateSettings(const ContextPtr & context_) +{ + if (settings[DatabaseIcebergSetting::warehouse].value.empty()) + { + throw Exception( + ErrorCodes::BAD_ARGUMENTS, "`warehouse` setting cannot be empty. " + "Please specify 'SETTINGS warehouse=' in the CREATE DATABASE query"); + } + + if (!settings[DatabaseIcebergSetting::storage_type].changed) + { + auto catalog = getCatalog(context_); + const auto storage_type = catalog->getStorageType(); + if (!storage_type) + { + throw Exception( + ErrorCodes::BAD_ARGUMENTS, "Storage type is not found in catalog config. 
" + "Please specify it manually via 'SETTINGS storage_type=' in CREATE DATABASE query"); + } + } +} + +std::shared_ptr DatabaseIceberg::getCatalog(ContextPtr) const +{ + if (catalog_impl) + return catalog_impl; + + switch (settings[DatabaseIcebergSetting::catalog_type].value) + { + case DB::DatabaseIcebergCatalogType::REST: + { + catalog_impl = std::make_shared( + settings[DatabaseIcebergSetting::warehouse].value, + url, + settings[DatabaseIcebergSetting::catalog_credential].value, + settings[DatabaseIcebergSetting::auth_header], + Context::getGlobalContextInstance()); + } + } + return catalog_impl; +} + +std::shared_ptr DatabaseIceberg::getConfiguration() const +{ + switch (settings[DatabaseIcebergSetting::storage_type].value) + { +#if USE_AWS_S3 + case DB::DatabaseIcebergStorageType::S3: + { + return std::make_shared(); + } +#endif +#if USE_AZURE_BLOB_STORAGE + case DB::DatabaseIcebergStorageType::Azure: + { + return std::make_shared(); + } +#endif +#if USE_HDFS + case DB::DatabaseIcebergStorageType::HDFS: + { + return std::make_shared(); + } +#endif + case DB::DatabaseIcebergStorageType::Local: + { + return std::make_shared(); + } +#if !USE_AWS_S3 || !USE_AZURE_BLOB_STORAGE || !USE_HDFS + default: + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Server does not contain support for storage type {}", + settings[DatabaseIcebergSetting::storage_type].value); +#endif + } +} + +std::string DatabaseIceberg::getStorageEndpointForTable(const Iceberg::TableMetadata & table_metadata) const +{ + auto endpoint_from_settings = settings[DatabaseIcebergSetting::storage_endpoint].value; + if (!endpoint_from_settings.empty()) + { + return std::filesystem::path(endpoint_from_settings) + / table_metadata.getLocation(/* path_only */true) + / ""; + } + else + { + return std::filesystem::path(table_metadata.getLocation(/* path_only */false)) / ""; + } +} + +bool DatabaseIceberg::empty() const +{ + return getCatalog(Context::getGlobalContextInstance())->empty(); +} + +bool DatabaseIceberg::isTableExist(const String & name, ContextPtr context_) const +{ + const auto [namespace_name, table_name] = parseTableName(name); + return getCatalog(context_)->existsTable(namespace_name, table_name); +} + +StoragePtr DatabaseIceberg::tryGetTable(const String & name, ContextPtr context_) const +{ + auto catalog = getCatalog(context_); + auto table_metadata = Iceberg::TableMetadata().withLocation().withSchema(); + auto [namespace_name, table_name] = parseTableName(name); + + if (!catalog->tryGetTableMetadata(namespace_name, table_name, table_metadata)) + return nullptr; + + /// Take database engine definition AST as base. + ASTStorage * storage = database_engine_definition->as(); + ASTs args = storage->engine->arguments->children; + + /// Replace Iceberg Catalog endpoint with storage path endpoint of requested table. + auto table_endpoint = getStorageEndpointForTable(table_metadata); + args[0] = std::make_shared(table_endpoint); + + LOG_TEST(log, "Using table endpoint: {}", table_endpoint); + + const auto columns = ColumnsDescription(table_metadata.getSchema()); + const auto configuration = getConfiguration(); + + /// with_table_structure = false: because there will be + /// no table structure in table definition AST. 
+ StorageObjectStorage::Configuration::initialize(*configuration, args, context_, /* with_table_structure */false); + + return std::make_shared( + configuration, + configuration->createObjectStorage(context_, /* is_readonly */ false), + context_, + StorageID(getDatabaseName(), name), + /* columns */columns, + /* constraints */ConstraintsDescription{}, + /* comment */"", + getFormatSettings(context_), + LoadingStrictnessLevel::CREATE, + /* distributed_processing */false, + /* partition_by */nullptr, + /* lazy_init */true); +} + +DatabaseTablesIteratorPtr DatabaseIceberg::getTablesIterator( + ContextPtr context_, + const FilterByNameFunction & filter_by_table_name, + bool /* skip_not_loaded */) const +{ + Tables tables; + auto catalog = getCatalog(context_); + + auto iceberg_tables = catalog->getTables(); + size_t num_threads = std::min(10, iceberg_tables.size()); + ThreadPool pool( + CurrentMetrics::IcebergCatalogThreads, + CurrentMetrics::IcebergCatalogThreadsActive, + CurrentMetrics::IcebergCatalogThreadsScheduled, + num_threads); + + DB::ThreadPoolCallbackRunnerLocal runner(pool, "RestCatalog"); + std::mutex mutexx; + + for (const auto & table_name : iceberg_tables) + { + if (filter_by_table_name && !filter_by_table_name(table_name)) + continue; + + runner([&]{ + auto storage = tryGetTable(table_name, context_); + { + std::lock_guard lock(mutexx); + [[maybe_unused]] bool inserted = tables.emplace(table_name, storage).second; + chassert(inserted); + } + }); + } + + runner.waitForAllToFinishAndRethrowFirstError(); + return std::make_unique(tables, getDatabaseName()); +} + +ASTPtr DatabaseIceberg::getCreateDatabaseQuery() const +{ + const auto & create_query = std::make_shared(); + create_query->setDatabase(getDatabaseName()); + create_query->set(create_query->storage, database_engine_definition); + return create_query; +} + +ASTPtr DatabaseIceberg::getCreateTableQueryImpl( + const String & name, + ContextPtr context_, + bool /* throw_on_error */) const +{ + auto catalog = getCatalog(context_); + auto table_metadata = Iceberg::TableMetadata().withLocation().withSchema(); + + const auto [namespace_name, table_name] = parseTableName(name); + catalog->getTableMetadata(namespace_name, table_name, table_metadata); + + auto create_table_query = std::make_shared(); + auto table_storage_define = database_engine_definition->clone(); + + auto * storage = table_storage_define->as(); + storage->engine->kind = ASTFunction::Kind::TABLE_ENGINE; + storage->settings = {}; + + create_table_query->set(create_table_query->storage, table_storage_define); + + auto columns_declare_list = std::make_shared(); + auto columns_expression_list = std::make_shared(); + + columns_declare_list->set(columns_declare_list->columns, columns_expression_list); + create_table_query->set(create_table_query->columns_list, columns_declare_list); + + create_table_query->setTable(name); + create_table_query->setDatabase(getDatabaseName()); + + for (const auto & column_type_and_name : table_metadata.getSchema()) + { + const auto column_declaration = std::make_shared(); + column_declaration->name = column_type_and_name.name; + column_declaration->type = makeASTDataType(column_type_and_name.type->getName()); + columns_expression_list->children.emplace_back(column_declaration); + } + + auto storage_engine_arguments = storage->engine->arguments; + if (storage_engine_arguments->children.empty()) + { + throw Exception( + ErrorCodes::BAD_ARGUMENTS, "Unexpected number of arguments: {}", + storage_engine_arguments->children.size()); + } + + 
auto table_endpoint = getStorageEndpointForTable(table_metadata); + storage_engine_arguments->children[0] = std::make_shared(table_endpoint); + + return create_table_query; +} + +void registerDatabaseIceberg(DatabaseFactory & factory) +{ + auto create_fn = [](const DatabaseFactory::Arguments & args) + { + const auto * database_engine_define = args.create_query.storage; + const auto & database_engine_name = args.engine_name; + + const ASTFunction * function_define = database_engine_define->engine; + if (!function_define->arguments) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", database_engine_name); + + ASTs & engine_args = function_define->arguments->children; + if (engine_args.empty()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", database_engine_name); + + const size_t max_args_num = 3; + if (engine_args.size() != max_args_num) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine must have {} arguments", max_args_num); + + for (auto & engine_arg : engine_args) + engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.context); + + const auto url = engine_args[0]->as()->value.safeGet(); + + DatabaseIcebergSettings database_settings; + if (database_engine_define->settings) + database_settings.loadFromQuery(*database_engine_define); + + return std::make_shared( + args.database_name, + url, + database_settings, + database_engine_define->clone(), + args.context); + }; + factory.registerDatabase("Iceberg", create_fn, { .supports_arguments = true, .supports_settings = true }); +} + +} + +#endif diff --git a/src/Databases/Iceberg/DatabaseIceberg.h b/src/Databases/Iceberg/DatabaseIceberg.h new file mode 100644 index 00000000000..145f033ad6a --- /dev/null +++ b/src/Databases/Iceberg/DatabaseIceberg.h @@ -0,0 +1,71 @@ +#pragma once +#include "config.h" + +#if USE_AVRO +#include +#include +#include +#include +#include + +namespace DB +{ + +/// TODO: +/// - auth: oauth, bearer token? +/// - tests with azure, hdfs, local + +class DatabaseIceberg final : public IDatabase, WithContext +{ +public: + explicit DatabaseIceberg( + const std::string & database_name_, + const std::string & url_, + const DatabaseIcebergSettings & settings_, + ASTPtr database_engine_definition_, + ContextPtr context_); + + String getEngineName() const override { return "Iceberg"; } + + bool canContainMergeTreeTables() const override { return false; } + bool canContainDistributedTables() const override { return false; } + bool shouldBeEmptyOnDetach() const override { return false; } + + bool empty() const override; + + bool isTableExist(const String & name, ContextPtr context) const override; + StoragePtr tryGetTable(const String & name, ContextPtr context) const override; + + DatabaseTablesIteratorPtr getTablesIterator( + ContextPtr context, + const FilterByNameFunction & filter_by_table_name, + bool skip_not_loaded) const override; + + void shutdown() override {} + + ASTPtr getCreateDatabaseQuery() const override; + +protected: + ASTPtr getCreateTableQueryImpl(const String & table_name, ContextPtr context, bool throw_on_error) const override; + +private: + /// Iceberg Catalog url. + const std::string url; + /// SETTINGS from CREATE query. + const DatabaseIcebergSettings settings; + /// Database engine definition taken from initial CREATE DATABASE query. + const ASTPtr database_engine_definition; + const LoggerPtr log; + /// Crendetials to authenticate Iceberg Catalog. 
+ Poco::Net::HTTPBasicCredentials credentials; + + mutable std::shared_ptr catalog_impl; + + void validateSettings(const ContextPtr & context_); + std::shared_ptr getCatalog(ContextPtr context_) const; + std::shared_ptr getConfiguration() const; + std::string getStorageEndpointForTable(const Iceberg::TableMetadata & table_metadata) const; +}; + +} +#endif diff --git a/src/Databases/Iceberg/DatabaseIcebergSettings.cpp b/src/Databases/Iceberg/DatabaseIcebergSettings.cpp new file mode 100644 index 00000000000..1dc1ba1f61c --- /dev/null +++ b/src/Databases/Iceberg/DatabaseIcebergSettings.cpp @@ -0,0 +1,86 @@ +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int UNKNOWN_SETTING; +} + +#define DATABASE_ICEBERG_RELATED_SETTINGS(DECLARE, ALIAS) \ + DECLARE(DatabaseIcebergCatalogType, catalog_type, DatabaseIcebergCatalogType::REST, "Catalog type", 0) \ + DECLARE(DatabaseIcebergStorageType, storage_type, DatabaseIcebergStorageType::S3, "Storage type: S3, Local, Azure, HDFS", 0) \ + DECLARE(String, catalog_credential, "", "", 0) \ + DECLARE(String, warehouse, "", "Warehouse name inside the catalog", 0) \ + DECLARE(String, auth_header, "", "Authorization header of format 'Authorization: '", 0) \ + DECLARE(String, storage_endpoint, "", "Object storage endpoint", 0) \ + +#define LIST_OF_DATABASE_ICEBERG_SETTINGS(M, ALIAS) \ + DATABASE_ICEBERG_RELATED_SETTINGS(M, ALIAS) + +DECLARE_SETTINGS_TRAITS(DatabaseIcebergSettingsTraits, LIST_OF_DATABASE_ICEBERG_SETTINGS) +IMPLEMENT_SETTINGS_TRAITS(DatabaseIcebergSettingsTraits, LIST_OF_DATABASE_ICEBERG_SETTINGS) + +struct DatabaseIcebergSettingsImpl : public BaseSettings +{ +}; + +#define INITIALIZE_SETTING_EXTERN(TYPE, NAME, DEFAULT, DESCRIPTION, FLAGS) \ + DatabaseIcebergSettings##TYPE NAME = &DatabaseIcebergSettingsImpl ::NAME; + +namespace DatabaseIcebergSetting +{ +LIST_OF_DATABASE_ICEBERG_SETTINGS(INITIALIZE_SETTING_EXTERN, SKIP_ALIAS) +} + +#undef INITIALIZE_SETTING_EXTERN + +DatabaseIcebergSettings::DatabaseIcebergSettings() : impl(std::make_unique()) +{ +} + +DatabaseIcebergSettings::DatabaseIcebergSettings(const DatabaseIcebergSettings & settings) + : impl(std::make_unique(*settings.impl)) +{ +} + +DatabaseIcebergSettings::DatabaseIcebergSettings(DatabaseIcebergSettings && settings) noexcept + : impl(std::make_unique(std::move(*settings.impl))) +{ +} + +DatabaseIcebergSettings::~DatabaseIcebergSettings() = default; + +DATABASE_ICEBERG_SETTINGS_SUPPORTED_TYPES(DatabaseIcebergSettings, IMPLEMENT_SETTING_SUBSCRIPT_OPERATOR) + + +void DatabaseIcebergSettings::applyChanges(const SettingsChanges & changes) +{ + impl->applyChanges(changes); +} + +void DatabaseIcebergSettings::loadFromQuery(const ASTStorage & storage_def) +{ + if (storage_def.settings) + { + try + { + impl->applyChanges(storage_def.settings->changes); + } + catch (Exception & e) + { + if (e.code() == ErrorCodes::UNKNOWN_SETTING) + e.addMessage("for database engine " + storage_def.engine->name); + throw; + } + } +} + +} diff --git a/src/Databases/Iceberg/DatabaseIcebergSettings.h b/src/Databases/Iceberg/DatabaseIcebergSettings.h new file mode 100644 index 00000000000..5d9d120efed --- /dev/null +++ b/src/Databases/Iceberg/DatabaseIcebergSettings.h @@ -0,0 +1,40 @@ +#pragma once + +#include +#include +#include +#include + + +namespace DB +{ +class ASTStorage; +struct DatabaseIcebergSettingsImpl; +class SettingsChanges; + +/// List of available types supported in DatabaseIcebergSettings object +#define 
DATABASE_ICEBERG_SETTINGS_SUPPORTED_TYPES(CLASS_NAME, M) \ + M(CLASS_NAME, String) \ + M(CLASS_NAME, UInt64) \ + M(CLASS_NAME, DatabaseIcebergCatalogType) \ + M(CLASS_NAME, DatabaseIcebergStorageType) + +DATABASE_ICEBERG_SETTINGS_SUPPORTED_TYPES(DatabaseIcebergSettings, DECLARE_SETTING_TRAIT) + +struct DatabaseIcebergSettings +{ + DatabaseIcebergSettings(); + DatabaseIcebergSettings(const DatabaseIcebergSettings & settings); + DatabaseIcebergSettings(DatabaseIcebergSettings && settings) noexcept; + ~DatabaseIcebergSettings(); + + DATABASE_ICEBERG_SETTINGS_SUPPORTED_TYPES(DatabaseIcebergSettings, DECLARE_SETTING_SUBSCRIPT_OPERATOR) + + void loadFromQuery(const ASTStorage & storage_def); + + void applyChanges(const SettingsChanges & changes); + +private: + std::unique_ptr impl; +}; +} diff --git a/src/Databases/Iceberg/ICatalog.cpp b/src/Databases/Iceberg/ICatalog.cpp new file mode 100644 index 00000000000..6b6ecc85daf --- /dev/null +++ b/src/Databases/Iceberg/ICatalog.cpp @@ -0,0 +1,86 @@ +#include +#include +#include +#include + +namespace DB::ErrorCodes +{ + extern const int NOT_IMPLEMENTED; + extern const int LOGICAL_ERROR; +} + +namespace Iceberg +{ + +void TableMetadata::setLocation(const std::string & location_) +{ + if (!with_location) + throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Data location was not requested"); + + /// Location has format: + /// s3:///path/to/table/data. + /// We want to split s3:// and path/to/table/data. + + auto pos = location_.find("://"); + if (pos == std::string::npos) + throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unexpected location format: {}", location_); + + auto pos_to_bucket = pos + std::strlen("://"); + auto pos_to_path = location_.substr(pos_to_bucket).find('/'); + + if (pos_to_path == std::string::npos) + throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unexpected location format: {}", location_); + + pos_to_path = pos_to_bucket + pos_to_path; + + location_without_path = location_.substr(0, pos_to_path); + path = location_.substr(pos_to_path + 1); + + LOG_TEST(getLogger("TableMetadata"), + "Parsed location without path: {}, path: {}", + location_without_path, path); +} + +std::string TableMetadata::getLocation(bool path_only) const +{ + if (!with_location) + throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Data location was not requested"); + + if (path_only) + return path; + else + return std::filesystem::path(location_without_path) / path; +} + +void TableMetadata::setSchema(const DB::NamesAndTypesList & schema_) +{ + if (!with_schema) + throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Data schema was not requested"); + + schema = schema_; +} + +const DB::NamesAndTypesList & TableMetadata::getSchema() const +{ + if (!with_schema) + throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Data schema was not requested"); + + return schema; +} + +StorageType ICatalog::getStorageType(const std::string & location) +{ + auto pos = location.find("://"); + if (pos == std::string::npos) + throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unexpected path format: {}", location); + + auto storage_type_str = location.substr(0, pos); + auto storage_type = magic_enum::enum_cast(Poco::toUpper(storage_type_str)); + + if (!storage_type) + throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unsupported storage type: {}", storage_type_str); + + return *storage_type; +} + +} diff --git a/src/Databases/Iceberg/ICatalog.h b/src/Databases/Iceberg/ICatalog.h new file mode 100644 index 00000000000..4c61c87998d --- /dev/null +++ 
b/src/Databases/Iceberg/ICatalog.h @@ -0,0 +1,78 @@ +#pragma once +#include +#include +#include + +namespace Iceberg +{ +using StorageType = DB::DatabaseIcebergStorageType; + +class TableMetadata +{ +public: + TableMetadata() = default; + + TableMetadata & withLocation() { with_location = true; return *this; } + TableMetadata & withSchema() { with_schema = true; return *this; } + + std::string getLocation(bool path_only) const; + std::string getLocation() const; + std::string getLocationWithoutPath() const; + + const DB::NamesAndTypesList & getSchema() const; + + bool requiresLocation() const { return with_location; } + bool requiresSchema() const { return with_schema; } + + void setLocation(const std::string & location_); + void setSchema(const DB::NamesAndTypesList & schema_); + +private: + /// starts with s3://, file://, etc + std::string location_without_path; + std::string path; + /// column names and types + DB::NamesAndTypesList schema; + + bool with_location = false; + bool with_schema = false; +}; + + +class ICatalog +{ +public: + using Namespaces = std::vector; + using Tables = std::vector; + + explicit ICatalog(const std::string & warehouse_) : warehouse(warehouse_) {} + + virtual ~ICatalog() = default; + + virtual bool empty() const = 0; + + virtual Tables getTables() const = 0; + + virtual bool existsTable( + const std::string & namespace_naem, + const std::string & table_name) const = 0; + + virtual void getTableMetadata( + const std::string & namespace_name, + const std::string & table_name, + TableMetadata & result) const = 0; + + virtual bool tryGetTableMetadata( + const std::string & namespace_name, + const std::string & table_name, + TableMetadata & result) const = 0; + + virtual std::optional getStorageType() const = 0; + +protected: + const std::string warehouse; + + static StorageType getStorageType(const std::string & location); +}; + +} diff --git a/src/Databases/Iceberg/RestCatalog.cpp b/src/Databases/Iceberg/RestCatalog.cpp new file mode 100644 index 00000000000..a34aa45b812 --- /dev/null +++ b/src/Databases/Iceberg/RestCatalog.cpp @@ -0,0 +1,540 @@ +#include + +#include +#include +#include +#include + +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include + +namespace DB::ErrorCodes +{ + extern const int ICEBERG_CATALOG_ERROR; + extern const int LOGICAL_ERROR; + extern const int BAD_ARGUMENTS; +} + +namespace CurrentMetrics +{ + extern const Metric IcebergCatalogThreads; + extern const Metric IcebergCatalogThreadsActive; + extern const Metric IcebergCatalogThreadsScheduled; +} + +namespace Iceberg +{ + +static constexpr auto config_endpoint = "config"; +static constexpr auto namespaces_endpoint = "namespaces"; + +namespace +{ + +std::pair parseCatalogCredential(const std::string & catalog_credential) +{ + /// Parse a string of format ":" + /// into separare strings client_id and client_secret. 
+ + std::string client_id, client_secret; + if (!catalog_credential.empty()) + { + auto pos = catalog_credential.find(':'); + if (pos == std::string::npos) + { + throw DB::Exception( + DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected format of catalog credential: " + "expected client_id and client_secret separated by `:`"); + } + client_id = catalog_credential.substr(0, pos); + client_secret = catalog_credential.substr(pos + 1); + } + return std::pair(client_id, client_secret); +} + +DB::HTTPHeaderEntry parseAuthHeader(const std::string & auth_header) +{ + /// Parse a string of format "Authorization: " + /// into a key-value header "Authorization", " " + + auto pos = auth_header.find(':'); + if (pos == std::string::npos) + throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected format of auth header"); + + return DB::HTTPHeaderEntry(auth_header.substr(0, pos), auth_header.substr(pos + 1)); +} + +} + +std::string RestCatalog::Config::toString() const +{ + DB::WriteBufferFromOwnString wb; + + if (!prefix.empty()) + wb << "prefix: " << prefix.string() << ", "; + + if (!default_base_location.empty()) + wb << "default_base_location: " << default_base_location << ", "; + + return wb.str(); +} + +RestCatalog::RestCatalog( + const std::string & warehouse_, + const std::string & base_url_, + const std::string & catalog_credential_, + const std::string & auth_header_, + DB::ContextPtr context_) + : ICatalog(warehouse_) + , DB::WithContext(context_) + , base_url(base_url_) + , log(getLogger("RestCatalog(" + warehouse_ + ")")) +{ + if (!catalog_credential_.empty()) + std::tie(client_id, client_secret) = parseCatalogCredential(catalog_credential_); + else if (!auth_header_.empty()) + auth_header = parseAuthHeader(auth_header_); + + config = loadConfig(); +} + +RestCatalog::Config RestCatalog::loadConfig() +{ + Poco::URI::QueryParameters params = {{"warehouse", warehouse}}; + auto buf = createReadBuffer(config_endpoint, params); + + std::string json_str; + readJSONObjectPossiblyInvalid(json_str, *buf); + + LOG_TEST(log, "Received catalog configuration settings: {}", json_str); + + Poco::JSON::Parser parser; + Poco::Dynamic::Var json = parser.parse(json_str); + const Poco::JSON::Object::Ptr & object = json.extract(); + + Config result; + + auto defaults_object = object->get("defaults").extract(); + parseCatalogConfigurationSettings(defaults_object, result); + + auto overrides_object = object->get("overrides").extract(); + parseCatalogConfigurationSettings(overrides_object, result); + + LOG_TEST(log, "Parsed catalog configuration settings: {}", result.toString()); + return result; +} + +void RestCatalog::parseCatalogConfigurationSettings(const Poco::JSON::Object::Ptr & object, Config & result) +{ + if (!object) + return; + + if (object->has("prefix")) + result.prefix = object->get("prefix").extract(); + + if (object->has("default-base-location")) + result.default_base_location = object->get("default-base-location").extract(); +} + +DB::HTTPHeaderEntries RestCatalog::getHeaders(bool update_token) const +{ + if (auth_header.has_value()) + { + return DB::HTTPHeaderEntries{auth_header.value()}; + } + + if (!client_id.empty()) + { + if (!access_token.has_value() || update_token) + { + access_token = retrieveAccessToken(); + } + + DB::HTTPHeaderEntries headers; + headers.emplace_back("Authorization", "Bearer " + access_token.value()); + return headers; + } + + return {}; +} + +std::string RestCatalog::retrieveAccessToken() const +{ + static constexpr auto oauth_tokens_endpoint = "oauth/tokens"; + + /// TODO: + 
/// 1. support oauth2-server-uri + /// https://github.com/apache/iceberg/blob/918f81f3c3f498f46afcea17c1ac9cdc6913cb5c/open-api/rest-catalog-open-api.yaml#L183C82-L183C99 + + Poco::JSON::Object json; + json.set("grant_type", "client_credentials"); + json.set("scope", "PRINCIPAL_ROLE:ALL"); /// TODO: add it into setting. + json.set("client_id", client_id); + json.set("client_secret", client_secret); + + DB::HTTPHeaderEntries headers; + headers.emplace_back("Content-Type", "application/x-www-form-urlencoded"); + headers.emplace_back("Accepts", "application/json; charset=UTF-8"); + + Poco::URI url(base_url / oauth_tokens_endpoint); + Poco::URI::QueryParameters params = { + {"grant_type", "client_credentials"}, + {"scope", "PRINCIPAL_ROLE:ALL"}, + {"client_id", client_id}, + {"client_secret", client_secret}, + }; + url.setQueryParameters(params); + + const auto & context = getContext(); + auto wb = DB::BuilderRWBufferFromHTTP(url) + .withConnectionGroup(DB::HTTPConnectionGroupType::HTTP) + .withMethod(Poco::Net::HTTPRequest::HTTP_POST) + .withSettings(context->getReadSettings()) + .withTimeouts(DB::ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), context->getServerSettings())) + .withHostFilter(&context->getRemoteHostFilter()) + .withSkipNotFound(false) + .withHeaders(headers) + .create(credentials); + + std::string json_str; + readJSONObjectPossiblyInvalid(json_str, *wb); + + Poco::JSON::Parser parser; + Poco::Dynamic::Var res_json = parser.parse(json_str); + const Poco::JSON::Object::Ptr & object = res_json.extract(); + + return object->get("access_token").extract(); +} + +std::optional RestCatalog::getStorageType() const +{ + if (config.default_base_location.empty()) + return std::nullopt; + return ICatalog::getStorageType(config.default_base_location); +} + +DB::ReadWriteBufferFromHTTPPtr RestCatalog::createReadBuffer( + const std::string & endpoint, + const Poco::URI::QueryParameters & params) const +{ + const auto & context = getContext(); + + Poco::URI url(base_url / endpoint); + if (!params.empty()) + url.setQueryParameters(params); + + auto headers = getHeaders(false); + + LOG_TEST(log, "Requesting: {}", url.toString()); + + try + { + return DB::BuilderRWBufferFromHTTP(url) + .withConnectionGroup(DB::HTTPConnectionGroupType::HTTP) + .withSettings(getContext()->getReadSettings()) + .withTimeouts(DB::ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), context->getServerSettings())) + .withHeaders(headers) + .withHostFilter(&getContext()->getRemoteHostFilter()) + .withDelayInit(false) + .withSkipNotFound(false) + .create(credentials); + } + catch (...) + { + auto new_headers = getHeaders(true); + return DB::BuilderRWBufferFromHTTP(url) + .withConnectionGroup(DB::HTTPConnectionGroupType::HTTP) + .withSettings(getContext()->getReadSettings()) + .withTimeouts(DB::ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), context->getServerSettings())) + .withHeaders(new_headers) + .withHostFilter(&getContext()->getRemoteHostFilter()) + .withDelayInit(false) + .withSkipNotFound(false) + .create(credentials); + } +} + +bool RestCatalog::empty() const +{ + try + { + bool found_table = false; + auto stop_condition = [&](const std::string & namespace_name) -> bool + { + const auto tables = getTables(namespace_name, /* limit */1); + found_table = !tables.empty(); + return found_table; + }; + + Namespaces namespaces; + getNamespacesRecursive("", namespaces, stop_condition, {}); + + return found_table; + } + catch (...) 
+ { + DB::tryLogCurrentException(log); + return true; + } +} + +RestCatalog::Tables RestCatalog::getTables() const +{ + size_t num_threads = 10; + ThreadPool pool( + CurrentMetrics::IcebergCatalogThreads, + CurrentMetrics::IcebergCatalogThreadsActive, + CurrentMetrics::IcebergCatalogThreadsScheduled, + num_threads); + + DB::ThreadPoolCallbackRunnerLocal runner(pool, "RestCatalog"); + + Tables tables; + std::mutex mutex; + + auto func = [&](const std::string & current_namespace) + { + runner( + [&]{ + auto tables_in_namespace = getTables(current_namespace); + { + std::lock_guard lock(mutex); + std::move(tables_in_namespace.begin(), tables_in_namespace.end(), std::back_inserter(tables)); + } + }); + }; + + Namespaces namespaces; + getNamespacesRecursive("", namespaces, {}, func); + + runner.waitForAllToFinishAndRethrowFirstError(); + return tables; +} + +void RestCatalog::getNamespacesRecursive( + const std::string & base_namespace, + Namespaces & result, + StopCondition stop_condition, + ExecuteFunc func) const +{ + auto namespaces = getNamespaces(base_namespace); + result.reserve(result.size() + namespaces.size()); + result.insert(result.end(), namespaces.begin(), namespaces.end()); + + for (const auto & current_namespace : namespaces) + { + chassert(current_namespace.starts_with(base_namespace)); + + if (stop_condition && stop_condition(current_namespace)) + break; + + if (func) + func(current_namespace); + + getNamespacesRecursive(current_namespace, result, stop_condition, func); + } +} + +Poco::URI::QueryParameters RestCatalog::createParentNamespaceParams(const std::string & base_namespace) const +{ + std::vector parts; + splitInto<'.'>(parts, base_namespace); + std::string parent_param; + for (const auto & part : parts) + { + if (!parent_param.empty()) + parent_param += static_cast(0x1F); + parent_param += part; + } + return {{"parent", parent_param}}; +} + +RestCatalog::Namespaces RestCatalog::getNamespaces(const std::string & base_namespace) const +{ + Poco::URI::QueryParameters params; + if (!base_namespace.empty()) + params = createParentNamespaceParams(base_namespace); + + try + { + auto buf = createReadBuffer(config.prefix / namespaces_endpoint, params); + auto namespaces = parseNamespaces(*buf, base_namespace); + LOG_TEST(log, "Loaded {} namespaces", namespaces.size()); + return namespaces; + } + catch (const DB::HTTPException & e) + { + std::string message = fmt::format( + "Received error while fetching list of namespaces from iceberg catalog `{}`. ", + warehouse); + + if (e.code() == Poco::Net::HTTPResponse::HTTPStatus::HTTP_NOT_FOUND) + message += "Namespace provided in the `parent` query parameter is not found. 
"; + + message += fmt::format( + "Code: {}, status: {}, message: {}", + e.code(), e.getHTTPStatus(), e.displayText()); + + throw DB::Exception(DB::ErrorCodes::ICEBERG_CATALOG_ERROR, "{}", message); + } +} + +RestCatalog::Namespaces RestCatalog::parseNamespaces(DB::ReadBuffer & buf, const std::string & base_namespace) const +{ + if (buf.eof()) + return {}; + + String json_str; + readJSONObjectPossiblyInvalid(json_str, buf); + + LOG_TEST(log, "Received response: {}", json_str); + + Poco::JSON::Parser parser; + Poco::Dynamic::Var json = parser.parse(json_str); + const Poco::JSON::Object::Ptr & object = json.extract(); + + auto namespaces_object = object->get("namespaces").extract(); + if (!namespaces_object) + throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Cannot parse result"); + + Namespaces namespaces; + for (size_t i = 0; i < namespaces_object->size(); ++i) + { + auto current_namespace_array = namespaces_object->get(static_cast(i)).extract(); + if (current_namespace_array->size() == 0) + throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Expected namespace array to be non-empty"); + + const int idx = static_cast(current_namespace_array->size()) - 1; + const auto current_namespace = current_namespace_array->get(idx).extract(); + const auto full_namespace = base_namespace.empty() + ? current_namespace + : base_namespace + "." + current_namespace; + + namespaces.push_back(full_namespace); + } + + return namespaces; +} + +RestCatalog::Tables RestCatalog::getTables(const std::string & base_namespace, size_t limit) const +{ + const auto endpoint = std::string(namespaces_endpoint) + "/" + base_namespace + "/tables"; + auto buf = createReadBuffer(config.prefix / endpoint); + return parseTables(*buf, base_namespace, limit); +} + +RestCatalog::Tables RestCatalog::parseTables(DB::ReadBuffer & buf, const std::string & base_namespace, size_t limit) const +{ + if (buf.eof()) + return {}; + + String json_str; + readJSONObjectPossiblyInvalid(json_str, buf); + + Poco::JSON::Parser parser; + Poco::Dynamic::Var json = parser.parse(json_str); + const Poco::JSON::Object::Ptr & object = json.extract(); + + auto identifiers_object = object->get("identifiers").extract(); + if (!identifiers_object) + throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Cannot parse result"); + + Tables tables; + for (size_t i = 0; i < identifiers_object->size(); ++i) + { + const auto current_table_json = identifiers_object->get(static_cast(i)).extract(); + const auto table_name = current_table_json->get("name").extract(); + + tables.push_back(base_namespace + "." + table_name); + if (limit && tables.size() >= limit) + break; + } + return tables; +} + +bool RestCatalog::existsTable(const std::string & namespace_name, const std::string & table_name) const +{ + TableMetadata table_metadata; + return tryGetTableMetadata(namespace_name, table_name, table_metadata); +} + +bool RestCatalog::tryGetTableMetadata( + const std::string & namespace_name, + const std::string & table_name, + TableMetadata & result) const +{ + try + { + return getTableMetadataImpl(namespace_name, table_name, result); + } + catch (...) 
+ { + DB::tryLogCurrentException(log); + return false; + } +} + +void RestCatalog::getTableMetadata( + const std::string & namespace_name, + const std::string & table_name, + TableMetadata & result) const +{ + if (!getTableMetadataImpl(namespace_name, table_name, result)) + throw DB::Exception(DB::ErrorCodes::ICEBERG_CATALOG_ERROR, "No response from iceberg catalog"); +} + +bool RestCatalog::getTableMetadataImpl( + const std::string & namespace_name, + const std::string & table_name, + TableMetadata & result) const +{ + LOG_TEST(log, "Checking table {} in namespace {}", table_name, namespace_name); + + const auto endpoint = std::string(namespaces_endpoint) + "/" + namespace_name + "/tables/" + table_name; + auto buf = createReadBuffer(config.prefix / endpoint); + + if (buf->eof()) + { + LOG_TEST(log, "Table doesn't exist (endpoint: {})", endpoint); + return false; + } + + String json_str; + readJSONObjectPossiblyInvalid(json_str, *buf); + + LOG_TEST(log, "Received metadata for table {}: {}", table_name, json_str); + + Poco::JSON::Parser parser; + Poco::Dynamic::Var json = parser.parse(json_str); + const Poco::JSON::Object::Ptr & object = json.extract(); + + auto metadata_object = object->get("metadata").extract(); + if (!metadata_object) + throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Cannot parse result"); + + if (result.requiresLocation()) + { + const auto location = metadata_object->get("location").extract(); + result.setLocation(location); + LOG_TEST(log, "Location for table {}: {}", table_name, location); + } + + if (result.requiresSchema()) + { + int format_version = metadata_object->getValue("format-version"); + result.setSchema(DB::IcebergMetadata::parseTableSchema(metadata_object, format_version, true).first); + } + return true; +} + +} diff --git a/src/Databases/Iceberg/RestCatalog.h b/src/Databases/Iceberg/RestCatalog.h new file mode 100644 index 00000000000..402e761b07a --- /dev/null +++ b/src/Databases/Iceberg/RestCatalog.h @@ -0,0 +1,111 @@ +#pragma once +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ +class ReadBuffer; +} + +namespace Iceberg +{ + +class RestCatalog final : public ICatalog, private DB::WithContext +{ +public: + explicit RestCatalog( + const std::string & warehouse_, + const std::string & base_url_, + const std::string & catalog_credential_, + const std::string & auth_header_, + DB::ContextPtr context_); + + ~RestCatalog() override = default; + + bool empty() const override; + + Tables getTables() const override; + + bool existsTable(const std::string & namespace_name, const std::string & table_name) const override; + + void getTableMetadata( + const std::string & namespace_name, + const std::string & table_name, + TableMetadata & result) const override; + + bool tryGetTableMetadata( + const std::string & namespace_name, + const std::string & table_name, + TableMetadata & result) const override; + + std::optional getStorageType() const override; + +private: + struct Config + { + /// Prefix is a path of the catalog endpoint, + /// e.g. /v1/{prefix}/namespaces/{namespace}/tables/{table} + std::filesystem::path prefix; + /// Base location is location of data in storage + /// (in filesystem or object storage). + std::string default_base_location; + + std::string toString() const; + }; + + const std::filesystem::path base_url; + const LoggerPtr log; + + /// Catalog configuration settings from /v1/config endpoint. 
+ Config config; + + /// Auth headers of format: "Authorization": " " + std::optional auth_header; + + /// Parameters for OAuth. + std::string client_id; + std::string client_secret; + mutable std::optional access_token; + + Poco::Net::HTTPBasicCredentials credentials{}; + + DB::ReadWriteBufferFromHTTPPtr createReadBuffer( + const std::string & endpoint, + const Poco::URI::QueryParameters & params = {}) const; + + Poco::URI::QueryParameters createParentNamespaceParams(const std::string & base_namespace) const; + + using StopCondition = std::function; + using ExecuteFunc = std::function; + + void getNamespacesRecursive( + const std::string & base_namespace, + Namespaces & result, + StopCondition stop_condition, + ExecuteFunc func) const; + + Namespaces getNamespaces(const std::string & base_namespace) const; + + Namespaces parseNamespaces(DB::ReadBuffer & buf, const std::string & base_namespace) const; + + Tables getTables(const std::string & base_namespace, size_t limit = 0) const; + + Tables parseTables(DB::ReadBuffer & buf, const std::string & base_namespace, size_t limit) const; + + bool getTableMetadataImpl( + const std::string & namespace_name, + const std::string & table_name, + TableMetadata & result) const; + + Config loadConfig(); + std::string retrieveAccessToken() const; + DB::HTTPHeaderEntries getHeaders(bool update_token = false) const; + static void parseCatalogConfigurationSettings(const Poco::JSON::Object::Ptr & object, Config & result); +}; + +} diff --git a/src/Databases/registerDatabases.cpp b/src/Databases/registerDatabases.cpp index 4f7c229bdf4..8daba254891 100644 --- a/src/Databases/registerDatabases.cpp +++ b/src/Databases/registerDatabases.cpp @@ -36,6 +36,10 @@ void registerDatabaseS3(DatabaseFactory & factory); void registerDatabaseHDFS(DatabaseFactory & factory); #endif +#if USE_AVRO +void registerDatabaseIceberg(DatabaseFactory & factory); +#endif + void registerDatabases() { auto & factory = DatabaseFactory::instance(); @@ -68,5 +72,9 @@ void registerDatabases() #if USE_HDFS registerDatabaseHDFS(factory); #endif + +#if USE_AVRO + registerDatabaseIceberg(factory); +#endif } } diff --git a/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.cpp index f0a80a41d4e..3be43daec09 100644 --- a/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.cpp @@ -70,9 +70,6 @@ IcebergMetadata::IcebergMetadata( { } -namespace -{ - enum class ManifestEntryStatus : uint8_t { EXISTING = 0, @@ -248,7 +245,7 @@ DataTypePtr getFieldType(const Poco::JSON::Object::Ptr & field, const String & t } -std::pair parseTableSchema(const Poco::JSON::Object::Ptr & metadata_object, int format_version, bool ignore_schema_evolution) +std::pair IcebergMetadata::parseTableSchema(const Poco::JSON::Object::Ptr & metadata_object, int format_version, bool ignore_schema_evolution) { Poco::JSON::Object::Ptr schema; Int32 current_schema_id; @@ -313,9 +310,9 @@ std::pair parseTableSchema(const Poco::JSON::Object::P for (size_t i = 0; i != fields->size(); ++i) { auto field = fields->getObject(static_cast(i)); - auto name = field->getValue("name"); + auto column_name = field->getValue("name"); bool required = field->getValue("required"); - names_and_types.push_back({name, getFieldType(field, "type", required)}); + names_and_types.push_back({column_name, getFieldType(field, "type", required)}); } return {std::move(names_and_types), current_schema_id}; @@ -380,8 +377,6 @@ std::pair 
getMetadataFileAndVersion( return *std::max_element(metadata_files_with_versions.begin(), metadata_files_with_versions.end()); } -} - DataLakeMetadataPtr IcebergMetadata::create(ObjectStoragePtr object_storage, ConfigurationObserverPtr configuration, ContextPtr local_context) { diff --git a/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.h index eb5cac591f2..b28de471b40 100644 --- a/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.h @@ -96,6 +96,11 @@ public: static DataLakeMetadataPtr create(ObjectStoragePtr object_storage, ConfigurationObserverPtr configuration, ContextPtr local_context); + static std::pair parseTableSchema( + const Poco::JSON::Object::Ptr & metadata_object, + int format_version, + bool ignore_schema_evolution); + private: size_t getVersion() const { return metadata_version; } diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index 5e9ce8dce28..b8bc82cf083 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -84,7 +84,8 @@ StorageObjectStorage::StorageObjectStorage( std::optional format_settings_, LoadingStrictnessLevel mode, bool distributed_processing_, - ASTPtr partition_by_) + ASTPtr partition_by_, + bool lazy_init) : IStorage(table_id_) , configuration(configuration_) , object_storage(object_storage_) @@ -95,7 +96,8 @@ StorageObjectStorage::StorageObjectStorage( { try { - configuration->update(object_storage, context); + if (!lazy_init) + configuration->update(object_storage, context); } catch (...) { diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h index e2bb41a4935..0df0bbdefbd 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.h +++ b/src/Storages/ObjectStorage/StorageObjectStorage.h @@ -59,7 +59,8 @@ public: std::optional format_settings_, LoadingStrictnessLevel mode, bool distributed_processing_ = false, - ASTPtr partition_by_ = nullptr); + ASTPtr partition_by_ = nullptr, + bool lazy_init = false); String getName() const override; diff --git a/tests/integration/compose/docker_compose_iceberg_rest_catalog.yml b/tests/integration/compose/docker_compose_iceberg_rest_catalog.yml new file mode 100644 index 00000000000..860d9e29230 --- /dev/null +++ b/tests/integration/compose/docker_compose_iceberg_rest_catalog.yml @@ -0,0 +1,59 @@ +services: + spark-iceberg: + image: tabulario/spark-iceberg + container_name: spark-iceberg + build: spark/ + depends_on: + - rest + - minio + environment: + - AWS_ACCESS_KEY_ID=admin + - AWS_SECRET_ACCESS_KEY=password + - AWS_REGION=us-east-1 + ports: + - 8080:8080 + - 10000:10000 + - 10001:10001 + rest: + image: tabulario/iceberg-rest + ports: + - 8182:8181 + environment: + - AWS_ACCESS_KEY_ID=minio + - AWS_SECRET_ACCESS_KEY=minio123 + - AWS_REGION=us-east-1 + - CATALOG_WAREHOUSE=s3://iceberg_data/ + - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO + - CATALOG_S3_ENDPOINT=http://minio:9000 + minio: + image: minio/minio + container_name: minio + environment: + - MINIO_ROOT_USER=minio + - MINIO_ROOT_PASSWORD=minio123 + - MINIO_DOMAIN=minio + networks: + default: + aliases: + - warehouse.minio + ports: + - 9001:9001 + - 9002:9000 + command: ["server", "/data", "--console-address", ":9001"] + mc: + depends_on: + - minio + image: minio/mc + container_name: mc + environment: + - 
AWS_ACCESS_KEY_ID=minio + - AWS_SECRET_ACCESS_KEY=minio123 + - AWS_REGION=us-east-1 + entrypoint: > + /bin/sh -c " + until (/usr/bin/mc config host add minio http://minio:9000 minio minio123) do echo '...waiting...' && sleep 1; done; + /usr/bin/mc rm -r --force minio/warehouse; + /usr/bin/mc mb minio/warehouse --ignore-existing; + /usr/bin/mc policy set public minio/warehouse; + tail -f /dev/null + " diff --git a/tests/integration/compose/docker_compose_minio.yml b/tests/integration/compose/docker_compose_minio.yml index 7fbe3796a0c..b0dad3d1ab8 100644 --- a/tests/integration/compose/docker_compose_minio.yml +++ b/tests/integration/compose/docker_compose_minio.yml @@ -14,6 +14,10 @@ services: depends_on: - proxy1 - proxy2 + networks: + default: + aliases: + - warehouse.minio # HTTP proxies for Minio. proxy1: diff --git a/tests/integration/helpers/cluster.py b/tests/integration/helpers/cluster.py index eae0f9fec2d..86225d09ba1 100644 --- a/tests/integration/helpers/cluster.py +++ b/tests/integration/helpers/cluster.py @@ -568,6 +568,7 @@ class ClickHouseCluster: self.resolver_logs_dir = os.path.join(self.instances_dir, "resolver") self.spark_session = None + self.with_iceberg_catalog = False self.with_azurite = False self.azurite_container = "azurite-container" @@ -1464,6 +1465,26 @@ class ClickHouseCluster: ) return self.base_minio_cmd + def setup_iceberg_catalog_cmd( + self, instance, env_variables, docker_compose_yml_dir + ): + self.base_cmd.extend( + [ + "--file", + p.join( + docker_compose_yml_dir, "docker_compose_iceberg_rest_catalog.yml" + ), + ] + ) + self.base_iceberg_catalog_cmd = self.compose_cmd( + "--env-file", + instance.env_file, + "--file", + p.join(docker_compose_yml_dir, "docker_compose_iceberg_rest_catalog.yml"), + ) + return self.base_iceberg_catalog_cmd + # return self.base_minio_cmd + def setup_azurite_cmd(self, instance, env_variables, docker_compose_yml_dir): self.with_azurite = True env_variables["AZURITE_PORT"] = str(self.azurite_port) @@ -1630,6 +1651,7 @@ class ClickHouseCluster: with_hive=False, with_coredns=False, with_prometheus=False, + with_iceberg_catalog=False, handle_prometheus_remote_write=False, handle_prometheus_remote_read=False, use_old_analyzer=None, @@ -1733,6 +1755,7 @@ class ClickHouseCluster: with_coredns=with_coredns, with_cassandra=with_cassandra, with_ldap=with_ldap, + with_iceberg_catalog=with_iceberg_catalog, use_old_analyzer=use_old_analyzer, server_bin_path=self.server_bin_path, odbc_bridge_bin_path=self.odbc_bridge_bin_path, @@ -1922,6 +1945,13 @@ class ClickHouseCluster: self.setup_minio_cmd(instance, env_variables, docker_compose_yml_dir) ) + if with_iceberg_catalog and not self.with_iceberg_catalog: + cmds.append( + self.setup_iceberg_catalog_cmd( + instance, env_variables, docker_compose_yml_dir + ) + ) + if with_azurite and not self.with_azurite: cmds.append( self.setup_azurite_cmd(instance, env_variables, docker_compose_yml_dir) @@ -3390,6 +3420,7 @@ class ClickHouseInstance: with_coredns, with_cassandra, with_ldap, + with_iceberg_catalog, use_old_analyzer, server_bin_path, odbc_bridge_bin_path, diff --git a/tests/integration/test_database_iceberg/__init__.py b/tests/integration/test_database_iceberg/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py new file mode 100644 index 00000000000..51d89444c2e --- /dev/null +++ b/tests/integration/test_database_iceberg/test.py @@ -0,0 +1,306 @@ +import glob +import 
json +import logging +import os +import random +import time +import uuid +from datetime import datetime, timedelta + +import pyarrow as pa +import pytest +import requests +import urllib3 +from minio import Minio +from pyiceberg.catalog import load_catalog +from pyiceberg.partitioning import PartitionField, PartitionSpec +from pyiceberg.schema import Schema +from pyiceberg.table.sorting import SortField, SortOrder +from pyiceberg.transforms import DayTransform, IdentityTransform +from pyiceberg.types import ( + DoubleType, + FloatType, + NestedField, + StringType, + StructType, + TimestampType, +) + +from helpers.cluster import ClickHouseCluster, ClickHouseInstance, is_arm +from helpers.s3_tools import get_file_contents, list_s3_objects, prepare_s3_bucket +from helpers.test_tools import TSV, csv_compare + +BASE_URL = "http://rest:8181/v1" +BASE_URL_LOCAL = "http://localhost:8182/v1" +BASE_URL_LOCAL_RAW = "http://localhost:8182" + +CATALOG_NAME = "demo" + +DEFAULT_SCHEMA = Schema( + NestedField( + field_id=1, name="datetime", field_type=TimestampType(), required=False + ), + NestedField(field_id=2, name="symbol", field_type=StringType(), required=False), + NestedField(field_id=3, name="bid", field_type=DoubleType(), required=False), + NestedField(field_id=4, name="ask", field_type=DoubleType(), required=False), + NestedField( + field_id=5, + name="details", + field_type=StructType( + NestedField( + field_id=4, + name="created_by", + field_type=StringType(), + required=False, + ), + ), + required=False, + ), +) + +DEFAULT_CREATE_TABLE = "CREATE TABLE {}.`{}.{}`\\n(\\n `datetime` Nullable(DateTime64(6)),\\n `symbol` Nullable(String),\\n `bid` Nullable(Float64),\\n `ask` Nullable(Float64),\\n `details` Tuple(created_by Nullable(String))\\n)\\nENGINE = Iceberg(\\'http://minio:9000/warehouse/data/\\', \\'minio\\', \\'[HIDDEN]\\')\n" + +DEFAULT_PARTITION_SPEC = PartitionSpec( + PartitionField( + source_id=1, field_id=1000, transform=DayTransform(), name="datetime_day" + ) +) + +DEFAULT_SORT_ORDER = SortOrder(SortField(source_id=2, transform=IdentityTransform())) + + +def list_namespaces(): + response = requests.get(f"{BASE_URL_LOCAL}/namespaces") + if response.status_code == 200: + return response.json() + else: + raise Exception(f"Failed to list namespaces: {response.status_code}") + + +def load_catalog_impl(started_cluster): + return load_catalog( + CATALOG_NAME, + **{ + "uri": BASE_URL_LOCAL_RAW, + "type": "rest", + "s3.endpoint": f"http://localhost:9002", + "s3.access-key-id": "minio", + "s3.secret-access-key": "minio123", + }, + ) + + +def create_table( + catalog, + namespace, + table, + schema=DEFAULT_SCHEMA, + partition_spec=DEFAULT_PARTITION_SPEC, + sort_order=DEFAULT_SORT_ORDER, +): + return catalog.create_table( + identifier=f"{namespace}.{table}", + schema=schema, + location=f"s3://warehouse/data", + partition_spec=partition_spec, + sort_order=sort_order, + ) + + +def generate_record(): + return { + "datetime": datetime.now(), + "symbol": str("kek"), + "bid": round(random.uniform(100, 200), 2), + "ask": round(random.uniform(200, 300), 2), + "details": {"created_by": "Alice Smith"}, + } + + +def create_clickhouse_iceberg_database(started_cluster, node, name): + node.query( + f""" +DROP DATABASE IF EXISTS {name}; +CREATE DATABASE {name} ENGINE = Iceberg('{BASE_URL}', 'minio', 'minio123') +SETTINGS catalog_type = 'rest', storage_endpoint = 'http://minio:9000/' + """ + ) + + +def print_objects(): + minio_client = Minio( + f"localhost:9002", + access_key="minio", + secret_key="minio123", + 
secure=False, + http_client=urllib3.PoolManager(cert_reqs="CERT_NONE"), + ) + + objects = list(minio_client.list_objects("warehouse", "", recursive=True)) + names = [x.object_name for x in objects] + names.sort() + for name in names: + print(f"Found object: {name}") + + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster = ClickHouseCluster(__file__) + cluster.add_instance( + "node1", + main_configs=[], + user_configs=[], + stay_alive=True, + with_iceberg_catalog=True, + ) + + logging.info("Starting cluster...") + cluster.start() + + # TODO: properly wait for container + time.sleep(10) + + yield cluster + + finally: + cluster.shutdown() + + +def test_list_tables(started_cluster): + node = started_cluster.instances["node1"] + + root_namespace = f"clickhouse_{uuid.uuid4()}" + namespace_1 = f"{root_namespace}.testA.A" + namespace_2 = f"{root_namespace}.testB.B" + namespace_1_tables = ["tableA", "tableB"] + namespace_2_tables = ["tableC", "tableD"] + + catalog = load_catalog_impl(started_cluster) + + for namespace in [namespace_1, namespace_2]: + catalog.create_namespace(namespace) + + found = False + for namespace_list in list_namespaces()["namespaces"]: + if root_namespace == namespace_list[0]: + found = True + break + assert found + + found = False + for namespace_list in catalog.list_namespaces(): + if root_namespace == namespace_list[0]: + found = True + break + assert found + + for namespace in [namespace_1, namespace_2]: + assert len(catalog.list_tables(namespace)) == 0 + + create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME) + + tables_list = "" + for table in namespace_1_tables: + create_table(catalog, namespace_1, table) + if len(tables_list) > 0: + tables_list += "\n" + tables_list += f"{namespace_1}.{table}" + + for table in namespace_2_tables: + create_table(catalog, namespace_2, table) + if len(tables_list) > 0: + tables_list += "\n" + tables_list += f"{namespace_2}.{table}" + + assert ( + tables_list + == node.query( + f"SELECT name FROM system.tables WHERE database = '{CATALOG_NAME}' and name ILIKE '{root_namespace}%' ORDER BY name" + ).strip() + ) + node.restart_clickhouse() + assert ( + tables_list + == node.query( + f"SELECT name FROM system.tables WHERE database = '{CATALOG_NAME}' and name ILIKE '{root_namespace}%' ORDER BY name" + ).strip() + ) + + expected = DEFAULT_CREATE_TABLE.format(CATALOG_NAME, namespace_2, "tableC") + assert expected == node.query( + f"SHOW CREATE TABLE {CATALOG_NAME}.`{namespace_2}.tableC`" + ) + + +def test_many_namespaces(started_cluster): + node = started_cluster.instances["node1"] + root_namespace_1 = f"A_{uuid.uuid4()}" + root_namespace_2 = f"B_{uuid.uuid4()}" + namespaces = [ + f"{root_namespace_1}", + f"{root_namespace_1}.B.C", + f"{root_namespace_1}.B.C.D", + f"{root_namespace_1}.B.C.D.E", + f"{root_namespace_2}", + f"{root_namespace_2}.C", + f"{root_namespace_2}.CC", + ] + tables = ["A", "B", "C"] + catalog = load_catalog_impl(started_cluster) + + for namespace in namespaces: + catalog.create_namespace(namespace) + for table in tables: + create_table(catalog, namespace, table) + + create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME) + + for namespace in namespaces: + for table in tables: + table_name = f"{namespace}.{table}" + assert int( + node.query( + f"SELECT count() FROM system.tables WHERE database = '{CATALOG_NAME}' and name = '{table_name}'" + ) + ) + + +def test_select(started_cluster): + node = started_cluster.instances["node1"] + + test_ref = 
f"test_list_tables_{uuid.uuid4()}" + table_name = f"{test_ref}_table" + root_namespace = f"{test_ref}_namespace" + + namespace = f"{root_namespace}.A.B.C" + namespaces_to_create = [ + root_namespace, + f"{root_namespace}.A", + f"{root_namespace}.A.B", + f"{root_namespace}.A.B.C", + ] + + catalog = load_catalog_impl(started_cluster) + + for namespace in namespaces_to_create: + catalog.create_namespace(namespace) + assert len(catalog.list_tables(namespace)) == 0 + + table = create_table(catalog, namespace, table_name) + + num_rows = 10 + data = [generate_record() for _ in range(num_rows)] + df = pa.Table.from_pylist(data) + table.append(df) + + create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME) + + expected = DEFAULT_CREATE_TABLE.format(CATALOG_NAME, namespace, table_name) + assert expected == node.query( + f"SHOW CREATE TABLE {CATALOG_NAME}.`{namespace}.{table_name}`" + ) + + assert num_rows == int( + node.query(f"SELECT count() FROM {CATALOG_NAME}.`{namespace}.{table_name}`") + )