diff --git a/docker/test/integration/runner/requirements.txt b/docker/test/integration/runner/requirements.txt index 45811ecbaea..da0e0963d68 100644 --- a/docker/test/integration/runner/requirements.txt +++ b/docker/test/integration/runner/requirements.txt @@ -36,7 +36,7 @@ geomet==0.2.1.post1 grpcio-tools==1.60.0 grpcio==1.60.0 gssapi==1.8.3 -httplib2==0.20.2 +httplib2==0.22.0 idna==3.7 importlib-metadata==4.6.4 iniconfig==2.0.0 @@ -72,7 +72,7 @@ pyarrow==17.0.0 pycparser==2.22 pycryptodome==3.20.0 pymongo==3.11.0 -pyparsing==2.4.7 +pyparsing==3.1.0 pyspark==3.3.2 pyspnego==0.10.2 pytest-order==1.0.0 @@ -101,4 +101,5 @@ wadllib==1.3.6 websocket-client==1.8.0 wheel==0.38.1 zipp==1.0.0 +pyiceberg==0.7.1 jinja2==3.1.3 diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 7385d1510cf..74bbbfa3f68 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -160,6 +160,8 @@ if (TARGET ch_contrib::hdfs) add_headers_and_sources(dbms Disks/ObjectStorages/HDFS) endif() +add_headers_and_sources(dbms Databases/Iceberg) + add_headers_and_sources(dbms Disks/ObjectStorages/Cached) add_headers_and_sources(dbms Disks/ObjectStorages/Local) add_headers_and_sources(dbms Disks/ObjectStorages/Web) diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index 8c3f2a442c4..363fb6f32c5 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -189,6 +189,9 @@ M(BuildVectorSimilarityIndexThreads, "Number of threads in the build vector similarity index thread pool.") \ M(BuildVectorSimilarityIndexThreadsActive, "Number of threads in the build vector similarity index thread pool running a task.") \ M(BuildVectorSimilarityIndexThreadsScheduled, "Number of queued or active jobs in the build vector similarity index thread pool.") \ + M(IcebergCatalogThreads, "Number of threads in the IcebergCatalog thread pool.") \ + M(IcebergCatalogThreadsActive, "Number of threads in the IcebergCatalog thread pool running a task.") \ + M(IcebergCatalogThreadsScheduled, "Number of queued or active jobs in the IcebergCatalog thread pool.") \ \ M(DiskPlainRewritableAzureDirectoryMapSize, "Number of local-to-remote path entries in the 'plain_rewritable' in-memory map for AzureObjectStorage.") \ M(DiskPlainRewritableAzureFileCount, "Number of file entries in the 'plain_rewritable' in-memory map for AzureObjectStorage.") \ diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index 122877c7772..b6be1a8834c 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -613,6 +613,7 @@ M(733, TABLE_IS_BEING_RESTARTED) \ M(734, CANNOT_WRITE_AFTER_BUFFER_CANCELED) \ M(735, QUERY_WAS_CANCELLED_BY_CLIENT) \ + M(736, ICEBERG_CATALOG_ERROR) \ \ M(900, DISTRIBUTED_CACHE_ERROR) \ M(901, CANNOT_USE_DISTRIBUTED_CACHE) \ diff --git a/src/Core/ServerSettings.cpp b/src/Core/ServerSettings.cpp index 5f48e5ea891..2169f238c18 100644 --- a/src/Core/ServerSettings.cpp +++ b/src/Core/ServerSettings.cpp @@ -210,6 +210,8 @@ namespace DB DECLARE(UInt64, load_marks_threadpool_queue_size, 1000000, "Number of tasks which is possible to push into prefetches pool", 0) \ DECLARE(UInt64, threadpool_writer_pool_size, 100, "Size of background pool for write requests to object storages", 0) \ DECLARE(UInt64, threadpool_writer_queue_size, 1000000, "Number of tasks which is possible to push into background pool for write requests to object storages", 0) \ + DECLARE(UInt64, iceberg_catalog_threadpool_pool_size, 50, "Size of background pool for iceberg catalog", 0) \ + DECLARE(UInt64, 
iceberg_catalog_threadpool_queue_size, 1000000, "Number of tasks which is possible to push into iceberg catalog pool", 0) \ DECLARE(UInt32, allow_feature_tier, 0, "0 - All feature tiers allowed (experimental, beta, production). 1 - Only beta and production feature tiers allowed. 2 - Only production feature tier allowed", 0) \ diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index a9887aa92af..11e1c8442ae 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -5560,6 +5560,9 @@ Only available in ClickHouse Cloud. Exclude new data parts from SELECT queries u )", 0) \ DECLARE(Int64, prefer_warmed_unmerged_parts_seconds, 0, R"( Only available in ClickHouse Cloud. If a merged part is less than this many seconds old and is not pre-warmed (see cache_populated_by_fetch), but all its source parts are available and pre-warmed, SELECT queries will read from those parts instead. Only for ReplicatedMergeTree. Note that this only checks whether CacheWarmer processed the part; if the part was fetched into cache by something else, it'll still be considered cold until CacheWarmer gets to it; if it was warmed, then evicted from cache, it'll still be considered warm. +)", 0) \ + DECLARE(Bool, allow_experimental_database_iceberg, false, R"( +Allow experimental database engine Iceberg )", 0) \ DECLARE(Bool, allow_deprecated_error_prone_window_functions, false, R"( Allow usage of deprecated error prone window functions (neighbor, runningAccumulate, runningDifferenceStartingWithFirstValue, runningDifference) diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index dde7ba68228..98995a12e6c 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -60,6 +60,7 @@ static std::initializer_list + +#if USE_AVRO +#include +#include + +#include +#include +#include + +#include +#include +#include +#include + +#include +#include +#include + +#include + +#include +#include +#include +#include + + +namespace DB +{ +namespace DatabaseIcebergSetting +{ + extern const DatabaseIcebergSettingsDatabaseIcebergCatalogType catalog_type; + extern const DatabaseIcebergSettingsString warehouse; + extern const DatabaseIcebergSettingsString catalog_credential; + extern const DatabaseIcebergSettingsString auth_header; + extern const DatabaseIcebergSettingsString auth_scope; + extern const DatabaseIcebergSettingsString storage_endpoint; + extern const DatabaseIcebergSettingsString oauth_server_uri; + extern const DatabaseIcebergSettingsBool vended_credentials; +} +namespace Setting +{ + extern const SettingsBool allow_experimental_database_iceberg; +} + +namespace ErrorCodes +{ + extern const int BAD_ARGUMENTS; + extern const int SUPPORT_IS_DISABLED; +} + +namespace +{ + /// Parse a string, containing at least one dot, into a two substrings: + /// A.B.C.D.E -> A.B.C.D and E, where + /// `A.B.C.D` is a table "namespace". + /// `E` is a table name. 
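+    /// Note (clarifying comment, not in the original PR): only the last dot is treated as the separator,
+    /// so the table "namespace" itself may contain dots.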
+ std::pair parseTableName(const std::string & name) + { + auto pos = name.rfind('.'); + if (pos == std::string::npos) + throw DB::Exception(ErrorCodes::BAD_ARGUMENTS, "Table cannot have empty namespace: {}", name); + + auto table_name = name.substr(pos + 1); + auto namespace_name = name.substr(0, name.size() - table_name.size() - 1); + return {namespace_name, table_name}; + } +} + +DatabaseIceberg::DatabaseIceberg( + const std::string & database_name_, + const std::string & url_, + const DatabaseIcebergSettings & settings_, + ASTPtr database_engine_definition_) + : IDatabase(database_name_) + , url(url_) + , settings(settings_) + , database_engine_definition(database_engine_definition_) + , log(getLogger("DatabaseIceberg(" + database_name_ + ")")) +{ + validateSettings(); +} + +void DatabaseIceberg::validateSettings() +{ + if (settings[DatabaseIcebergSetting::warehouse].value.empty()) + { + throw Exception( + ErrorCodes::BAD_ARGUMENTS, "`warehouse` setting cannot be empty. " + "Please specify 'SETTINGS warehouse=' in the CREATE DATABASE query"); + } +} + +std::shared_ptr DatabaseIceberg::getCatalog() const +{ + if (catalog_impl) + return catalog_impl; + + switch (settings[DatabaseIcebergSetting::catalog_type].value) + { + case DB::DatabaseIcebergCatalogType::REST: + { + catalog_impl = std::make_shared( + settings[DatabaseIcebergSetting::warehouse].value, + url, + settings[DatabaseIcebergSetting::catalog_credential].value, + settings[DatabaseIcebergSetting::auth_scope].value, + settings[DatabaseIcebergSetting::auth_header], + settings[DatabaseIcebergSetting::oauth_server_uri].value, + Context::getGlobalContextInstance()); + } + } + return catalog_impl; +} + +std::shared_ptr DatabaseIceberg::getConfiguration(DatabaseIcebergStorageType type) const +{ + /// TODO: add tests for azure, local storage types. 
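+    /// Clarifying note (not in the original PR): storage types that are not compiled into this build
+    /// have no case below and fall through to the BAD_ARGUMENTS exception.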
+ + switch (type) + { +#if USE_AWS_S3 + case DB::DatabaseIcebergStorageType::S3: + { + return std::make_shared(); + } +#endif +#if USE_AZURE_BLOB_STORAGE + case DB::DatabaseIcebergStorageType::Azure: + { + return std::make_shared(); + } +#endif +#if USE_HDFS + case DB::DatabaseIcebergStorageType::HDFS: + { + return std::make_shared(); + } +#endif + case DB::DatabaseIcebergStorageType::Local: + { + return std::make_shared(); + } +#if !USE_AWS_S3 || !USE_AZURE_BLOB_STORAGE || !USE_HDFS + default: + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Server does not contain support for storage type {}", + type); +#endif + } +} + +std::string DatabaseIceberg::getStorageEndpointForTable(const Iceberg::TableMetadata & table_metadata) const +{ + auto endpoint_from_settings = settings[DatabaseIcebergSetting::storage_endpoint].value; + if (!endpoint_from_settings.empty()) + { + return std::filesystem::path(endpoint_from_settings) + / table_metadata.getLocation(/* path_only */true) + / ""; + } + else + { + return std::filesystem::path(table_metadata.getLocation(/* path_only */false)) / ""; + } +} + +bool DatabaseIceberg::empty() const +{ + return getCatalog()->empty(); +} + +bool DatabaseIceberg::isTableExist(const String & name, ContextPtr /* context_ */) const +{ + const auto [namespace_name, table_name] = parseTableName(name); + return getCatalog()->existsTable(namespace_name, table_name); +} + +StoragePtr DatabaseIceberg::tryGetTable(const String & name, ContextPtr context_) const +{ + auto catalog = getCatalog(); + auto table_metadata = Iceberg::TableMetadata().withLocation().withSchema(); + + const bool with_vended_credentials = settings[DatabaseIcebergSetting::vended_credentials].value; + if (with_vended_credentials) + table_metadata = table_metadata.withStorageCredentials(); + + auto [namespace_name, table_name] = parseTableName(name); + + if (!catalog->tryGetTableMetadata(namespace_name, table_name, table_metadata)) + return nullptr; + + /// Take database engine definition AST as base. + ASTStorage * storage = database_engine_definition->as(); + ASTs args = storage->engine->arguments->children; + + /// Replace Iceberg Catalog endpoint with storage path endpoint of requested table. + auto table_endpoint = getStorageEndpointForTable(table_metadata); + args[0] = std::make_shared(table_endpoint); + + /// We either fetch storage credentials from catalog + /// or get storage credentials from database engine arguments + /// in CREATE query (e.g. in `args`). + /// Vended credentials can be disabled in catalog itself, + /// so we have a separate setting to know whether we should even try to fetch them. 
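+    /// Clarifying note (not in the original PR): args.size() == 1 means only the storage endpoint
+    /// is present in the engine arguments, i.e. no storage credentials were passed explicitly.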
+ if (with_vended_credentials && args.size() == 1) + { + auto storage_credentials = table_metadata.getStorageCredentials(); + if (storage_credentials) + storage_credentials->addCredentialsToEngineArgs(args); + } + else if (args.size() == 1) + { + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "Either vended credentials need to be enabled " + "or storage credentials need to be specified in database engine arguments in CREATE query"); + } + + LOG_TEST(log, "Using table endpoint: {}", table_endpoint); + + const auto columns = ColumnsDescription(table_metadata.getSchema()); + + DatabaseIcebergStorageType storage_type; + auto storage_type_from_catalog = catalog->getStorageType(); + if (storage_type_from_catalog.has_value()) + storage_type = storage_type_from_catalog.value(); + else + storage_type = table_metadata.getStorageType(); + + const auto configuration = getConfiguration(storage_type); + auto storage_settings = std::make_unique(); + + /// with_table_structure = false: because there will be + /// no table structure in table definition AST. + StorageObjectStorage::Configuration::initialize(*configuration, args, context_, /* with_table_structure */false, std::move(storage_settings)); + + return std::make_shared( + configuration, + configuration->createObjectStorage(context_, /* is_readonly */ false), + context_, + StorageID(getDatabaseName(), name), + /* columns */columns, + /* constraints */ConstraintsDescription{}, + /* comment */"", + getFormatSettings(context_), + LoadingStrictnessLevel::CREATE, + /* distributed_processing */false, + /* partition_by */nullptr, + /* lazy_init */true); +} + +DatabaseTablesIteratorPtr DatabaseIceberg::getTablesIterator( + ContextPtr context_, + const FilterByNameFunction & filter_by_table_name, + bool /* skip_not_loaded */) const +{ + Tables tables; + auto catalog = getCatalog(); + const auto iceberg_tables = catalog->getTables(); + + for (const auto & table_name : iceberg_tables) + { + if (filter_by_table_name && !filter_by_table_name(table_name)) + continue; + + auto storage = tryGetTable(table_name, context_); + [[maybe_unused]] bool inserted = tables.emplace(table_name, storage).second; + chassert(inserted); + } + + return std::make_unique(tables, getDatabaseName()); +} + +ASTPtr DatabaseIceberg::getCreateDatabaseQuery() const +{ + const auto & create_query = std::make_shared(); + create_query->setDatabase(getDatabaseName()); + create_query->set(create_query->storage, database_engine_definition); + return create_query; +} + +ASTPtr DatabaseIceberg::getCreateTableQueryImpl( + const String & name, + ContextPtr /* context_ */, + bool /* throw_on_error */) const +{ + auto catalog = getCatalog(); + auto table_metadata = Iceberg::TableMetadata().withLocation().withSchema(); + + const auto [namespace_name, table_name] = parseTableName(name); + catalog->getTableMetadata(namespace_name, table_name, table_metadata); + + auto create_table_query = std::make_shared(); + auto table_storage_define = database_engine_definition->clone(); + + auto * storage = table_storage_define->as(); + storage->engine->kind = ASTFunction::Kind::TABLE_ENGINE; + storage->settings = {}; + + create_table_query->set(create_table_query->storage, table_storage_define); + + auto columns_declare_list = std::make_shared(); + auto columns_expression_list = std::make_shared(); + + columns_declare_list->set(columns_declare_list->columns, columns_expression_list); + create_table_query->set(create_table_query->columns_list, columns_declare_list); + + create_table_query->setTable(name); + 
create_table_query->setDatabase(getDatabaseName()); + + for (const auto & column_type_and_name : table_metadata.getSchema()) + { + const auto column_declaration = std::make_shared(); + column_declaration->name = column_type_and_name.name; + column_declaration->type = makeASTDataType(column_type_and_name.type->getName()); + columns_expression_list->children.emplace_back(column_declaration); + } + + auto storage_engine_arguments = storage->engine->arguments; + if (storage_engine_arguments->children.empty()) + { + throw Exception( + ErrorCodes::BAD_ARGUMENTS, "Unexpected number of arguments: {}", + storage_engine_arguments->children.size()); + } + + auto table_endpoint = getStorageEndpointForTable(table_metadata); + storage_engine_arguments->children[0] = std::make_shared(table_endpoint); + + return create_table_query; +} + +void registerDatabaseIceberg(DatabaseFactory & factory) +{ + auto create_fn = [](const DatabaseFactory::Arguments & args) + { + if (!args.create_query.attach + && !args.context->getSettingsRef()[Setting::allow_experimental_database_iceberg]) + { + throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, + "DatabaseIceberg engine is experimental. " + "To allow its usage, enable setting allow_experimental_database_iceberg"); + } + + const auto * database_engine_define = args.create_query.storage; + const auto & database_engine_name = args.engine_name; + + const ASTFunction * function_define = database_engine_define->engine; + if (!function_define->arguments) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", database_engine_name); + + ASTs & engine_args = function_define->arguments->children; + if (engine_args.empty()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", database_engine_name); + + for (auto & engine_arg : engine_args) + engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.context); + + const auto url = engine_args[0]->as()->value.safeGet(); + + DatabaseIcebergSettings database_settings; + if (database_engine_define->settings) + database_settings.loadFromQuery(*database_engine_define); + + return std::make_shared( + args.database_name, + url, + database_settings, + database_engine_define->clone()); + }; + factory.registerDatabase("Iceberg", create_fn, { .supports_arguments = true, .supports_settings = true }); +} + +} + +#endif diff --git a/src/Databases/Iceberg/DatabaseIceberg.h b/src/Databases/Iceberg/DatabaseIceberg.h new file mode 100644 index 00000000000..94717c522af --- /dev/null +++ b/src/Databases/Iceberg/DatabaseIceberg.h @@ -0,0 +1,66 @@ +#pragma once +#include "config.h" + +#if USE_AVRO +#include +#include +#include +#include +#include + +namespace DB +{ + +class DatabaseIceberg final : public IDatabase, WithContext +{ +public: + explicit DatabaseIceberg( + const std::string & database_name_, + const std::string & url_, + const DatabaseIcebergSettings & settings_, + ASTPtr database_engine_definition_); + + String getEngineName() const override { return "Iceberg"; } + + bool canContainMergeTreeTables() const override { return false; } + bool canContainDistributedTables() const override { return false; } + bool shouldBeEmptyOnDetach() const override { return false; } + + bool empty() const override; + + bool isTableExist(const String & name, ContextPtr context) const override; + StoragePtr tryGetTable(const String & name, ContextPtr context) const override; + + DatabaseTablesIteratorPtr getTablesIterator( + ContextPtr context, + const FilterByNameFunction & 
filter_by_table_name, + bool skip_not_loaded) const override; + + void shutdown() override {} + + ASTPtr getCreateDatabaseQuery() const override; + +protected: + ASTPtr getCreateTableQueryImpl(const String & table_name, ContextPtr context, bool throw_on_error) const override; + +private: + /// Iceberg Catalog url. + const std::string url; + /// SETTINGS from CREATE query. + const DatabaseIcebergSettings settings; + /// Database engine definition taken from initial CREATE DATABASE query. + const ASTPtr database_engine_definition; + const LoggerPtr log; + /// Crendetials to authenticate Iceberg Catalog. + Poco::Net::HTTPBasicCredentials credentials; + + mutable std::shared_ptr catalog_impl; + + void validateSettings(); + std::shared_ptr getCatalog() const; + std::shared_ptr getConfiguration(DatabaseIcebergStorageType type) const; + std::string getStorageEndpointForTable(const Iceberg::TableMetadata & table_metadata) const; +}; + +} +#endif diff --git a/src/Databases/Iceberg/DatabaseIcebergSettings.cpp b/src/Databases/Iceberg/DatabaseIcebergSettings.cpp new file mode 100644 index 00000000000..37b4909106b --- /dev/null +++ b/src/Databases/Iceberg/DatabaseIcebergSettings.cpp @@ -0,0 +1,88 @@ +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int UNKNOWN_SETTING; +} + +#define DATABASE_ICEBERG_RELATED_SETTINGS(DECLARE, ALIAS) \ + DECLARE(DatabaseIcebergCatalogType, catalog_type, DatabaseIcebergCatalogType::REST, "Catalog type", 0) \ + DECLARE(String, catalog_credential, "", "", 0) \ + DECLARE(Bool, vended_credentials, true, "Use vended credentials (storage credentials) from catalog", 0) \ + DECLARE(String, auth_scope, "PRINCIPAL_ROLE:ALL", "Authorization scope for client credentials or token exchange", 0) \ + DECLARE(String, oauth_server_uri, "", "OAuth server uri", 0) \ + DECLARE(String, warehouse, "", "Warehouse name inside the catalog", 0) \ + DECLARE(String, auth_header, "", "Authorization header of format 'Authorization: '", 0) \ + DECLARE(String, storage_endpoint, "", "Object storage endpoint", 0) \ + +#define LIST_OF_DATABASE_ICEBERG_SETTINGS(M, ALIAS) \ + DATABASE_ICEBERG_RELATED_SETTINGS(M, ALIAS) + +DECLARE_SETTINGS_TRAITS(DatabaseIcebergSettingsTraits, LIST_OF_DATABASE_ICEBERG_SETTINGS) +IMPLEMENT_SETTINGS_TRAITS(DatabaseIcebergSettingsTraits, LIST_OF_DATABASE_ICEBERG_SETTINGS) + +struct DatabaseIcebergSettingsImpl : public BaseSettings +{ +}; + +#define INITIALIZE_SETTING_EXTERN(TYPE, NAME, DEFAULT, DESCRIPTION, FLAGS) \ + DatabaseIcebergSettings##TYPE NAME = &DatabaseIcebergSettingsImpl ::NAME; + +namespace DatabaseIcebergSetting +{ +LIST_OF_DATABASE_ICEBERG_SETTINGS(INITIALIZE_SETTING_EXTERN, SKIP_ALIAS) +} + +#undef INITIALIZE_SETTING_EXTERN + +DatabaseIcebergSettings::DatabaseIcebergSettings() : impl(std::make_unique()) +{ +} + +DatabaseIcebergSettings::DatabaseIcebergSettings(const DatabaseIcebergSettings & settings) + : impl(std::make_unique(*settings.impl)) +{ +} + +DatabaseIcebergSettings::DatabaseIcebergSettings(DatabaseIcebergSettings && settings) noexcept + : impl(std::make_unique(std::move(*settings.impl))) +{ +} + +DatabaseIcebergSettings::~DatabaseIcebergSettings() = default; + +DATABASE_ICEBERG_SETTINGS_SUPPORTED_TYPES(DatabaseIcebergSettings, IMPLEMENT_SETTING_SUBSCRIPT_OPERATOR) + + +void DatabaseIcebergSettings::applyChanges(const SettingsChanges & changes) +{ + impl->applyChanges(changes); +} + +void DatabaseIcebergSettings::loadFromQuery(const ASTStorage & storage_def) +{ + if 
(storage_def.settings) + { + try + { + impl->applyChanges(storage_def.settings->changes); + } + catch (Exception & e) + { + if (e.code() == ErrorCodes::UNKNOWN_SETTING) + e.addMessage("for database engine " + storage_def.engine->name); + throw; + } + } +} + +} diff --git a/src/Databases/Iceberg/DatabaseIcebergSettings.h b/src/Databases/Iceberg/DatabaseIcebergSettings.h new file mode 100644 index 00000000000..041a99c6d83 --- /dev/null +++ b/src/Databases/Iceberg/DatabaseIcebergSettings.h @@ -0,0 +1,40 @@ +#pragma once + +#include +#include +#include +#include + + +namespace DB +{ +class ASTStorage; +struct DatabaseIcebergSettingsImpl; +class SettingsChanges; + +/// List of available types supported in DatabaseIcebergSettings object +#define DATABASE_ICEBERG_SETTINGS_SUPPORTED_TYPES(CLASS_NAME, M) \ + M(CLASS_NAME, String) \ + M(CLASS_NAME, UInt64) \ + M(CLASS_NAME, Bool) \ + M(CLASS_NAME, DatabaseIcebergCatalogType) \ + +DATABASE_ICEBERG_SETTINGS_SUPPORTED_TYPES(DatabaseIcebergSettings, DECLARE_SETTING_TRAIT) + +struct DatabaseIcebergSettings +{ + DatabaseIcebergSettings(); + DatabaseIcebergSettings(const DatabaseIcebergSettings & settings); + DatabaseIcebergSettings(DatabaseIcebergSettings && settings) noexcept; + ~DatabaseIcebergSettings(); + + DATABASE_ICEBERG_SETTINGS_SUPPORTED_TYPES(DatabaseIcebergSettings, DECLARE_SETTING_SUBSCRIPT_OPERATOR) + + void loadFromQuery(const ASTStorage & storage_def); + + void applyChanges(const SettingsChanges & changes); + +private: + std::unique_ptr impl; +}; +} diff --git a/src/Databases/Iceberg/DatabaseIcebergStorageType.h b/src/Databases/Iceberg/DatabaseIcebergStorageType.h new file mode 100644 index 00000000000..9b8dc3a0633 --- /dev/null +++ b/src/Databases/Iceberg/DatabaseIcebergStorageType.h @@ -0,0 +1,15 @@ +#pragma once +#include + +namespace DB +{ + +enum class DatabaseIcebergStorageType : uint8_t +{ + S3, + Azure, + Local, + HDFS, +}; + +} diff --git a/src/Databases/Iceberg/ICatalog.cpp b/src/Databases/Iceberg/ICatalog.cpp new file mode 100644 index 00000000000..4568cf95ac0 --- /dev/null +++ b/src/Databases/Iceberg/ICatalog.cpp @@ -0,0 +1,118 @@ +#include +#include +#include +#include + +namespace DB::ErrorCodes +{ + extern const int NOT_IMPLEMENTED; + extern const int LOGICAL_ERROR; +} + +namespace Iceberg +{ + +StorageType parseStorageTypeFromLocation(const std::string & location) +{ + /// Table location in catalog metadata always starts with one of s3://, file://, etc. + /// So just extract this part of the path and deduce storage type from it. + + auto pos = location.find("://"); + if (pos == std::string::npos) + { + throw DB::Exception( + DB::ErrorCodes::NOT_IMPLEMENTED, + "Unexpected path format: {}", location); + } + + auto storage_type_str = location.substr(0, pos); + auto storage_type = magic_enum::enum_cast(Poco::toUpper(storage_type_str)); + + if (!storage_type) + { + throw DB::Exception( + DB::ErrorCodes::NOT_IMPLEMENTED, + "Unsupported storage type: {}", storage_type_str); + } + + return *storage_type; +} + +void TableMetadata::setLocation(const std::string & location_) +{ + if (!with_location) + throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Data location was not requested"); + + /// Location has format: + /// s3:///path/to/table/data. + /// We want to split s3:// and path/to/table/data. 
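+    /// Clarifying note (not in the original PR): everything up to the first '/' after the scheme separator
+    /// (i.e. scheme + bucket) goes to `location_without_path`, the remainder goes to `path`.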
+
+    auto pos = location_.find("://");
+    if (pos == std::string::npos)
+        throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unexpected location format: {}", location_);
+
+    auto pos_to_bucket = pos + std::strlen("://");
+    auto pos_to_path = location_.substr(pos_to_bucket).find('/');
+
+    if (pos_to_path == std::string::npos)
+        throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unexpected location format: {}", location_);
+
+    pos_to_path = pos_to_bucket + pos_to_path;
+
+    location_without_path = location_.substr(0, pos_to_path);
+    path = location_.substr(pos_to_path + 1);
+
+    LOG_TEST(getLogger("TableMetadata"),
+             "Parsed location without path: {}, path: {}",
+             location_without_path, path);
+}
+
+std::string TableMetadata::getLocation(bool path_only) const
+{
+    if (!with_location)
+        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Data location was not requested");
+
+    if (path_only)
+        return path;
+    else
+        return std::filesystem::path(location_without_path) / path;
+}
+
+void TableMetadata::setSchema(const DB::NamesAndTypesList & schema_)
+{
+    if (!with_schema)
+        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Data schema was not requested");
+
+    schema = schema_;
+}
+
+const DB::NamesAndTypesList & TableMetadata::getSchema() const
+{
+    if (!with_schema)
+        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Data schema was not requested");
+
+    return schema;
+}
+
+void TableMetadata::setStorageCredentials(std::shared_ptr<IStorageCredentials> credentials_)
+{
+    if (!with_storage_credentials)
+        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Storage credentials were not requested");
+
+    storage_credentials = std::move(credentials_);
+}
+
+std::shared_ptr<IStorageCredentials> TableMetadata::getStorageCredentials() const
+{
+    if (!with_storage_credentials)
+        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Storage credentials were not requested");
+
+    return storage_credentials;
+}
+
+StorageType TableMetadata::getStorageType() const
+{
+    return parseStorageTypeFromLocation(location_without_path);
+}
+
+}
diff --git a/src/Databases/Iceberg/ICatalog.h b/src/Databases/Iceberg/ICatalog.h
new file mode 100644
index 00000000000..45d9f056477
--- /dev/null
+++ b/src/Databases/Iceberg/ICatalog.h
@@ -0,0 +1,104 @@
+#pragma once
+#include
+#include
+#include
+#include
+#include
+
+namespace Iceberg
+{
+using StorageType = DB::DatabaseIcebergStorageType;
+StorageType parseStorageTypeFromLocation(const std::string & location);
+
+/// A class representing table metadata,
+/// which was received from the catalog.
+class TableMetadata
+{
+public:
+    TableMetadata() = default;
+
+    TableMetadata & withLocation() { with_location = true; return *this; }
+    TableMetadata & withSchema() { with_schema = true; return *this; }
+    TableMetadata & withStorageCredentials() { with_storage_credentials = true; return *this; }
+
+    void setLocation(const std::string & location_);
+    std::string getLocation(bool path_only) const;
+
+    void setSchema(const DB::NamesAndTypesList & schema_);
+    const DB::NamesAndTypesList & getSchema() const;
+
+    void setStorageCredentials(std::shared_ptr<IStorageCredentials> credentials_);
+    std::shared_ptr<IStorageCredentials> getStorageCredentials() const;
+
+    bool requiresLocation() const { return with_location; }
+    bool requiresSchema() const { return with_schema; }
+    bool requiresCredentials() const { return with_storage_credentials; }
+
+    StorageType getStorageType() const;
+
+private:
+    /// Starts with s3://, file://, etc.
+    /// For example, `s3://bucket/`
+    std::string location_without_path;
+    /// Path to table's data: `/path/to/table/data/`
+    std::string path;
+    DB::NamesAndTypesList schema;
+
+    /// Storage credentials, which are called "vended credentials".
+    std::shared_ptr<IStorageCredentials> storage_credentials;
+
+    bool with_location = false;
+    bool with_schema = false;
+    bool with_storage_credentials = false;
+};
+
+
+/// Base class for catalog implementation.
+/// Used for communication with the catalog.
+class ICatalog
+{
+public:
+    using Namespaces = std::vector<std::string>;
+
+    explicit ICatalog(const std::string & warehouse_) : warehouse(warehouse_) {}
+
+    virtual ~ICatalog() = default;
+
+    /// Does the catalog have any tables?
+    virtual bool empty() const = 0;
+
+    /// Fetch the list of table names.
+    /// The names contain full namespaces.
+    virtual DB::Names getTables() const = 0;
+
+    /// Check whether a table exists in a given namespace.
+    virtual bool existsTable(
+        const std::string & namespace_name,
+        const std::string & table_name) const = 0;
+
+    /// Get table metadata in the given namespace.
+    /// Throw an exception if the table does not exist.
+    virtual void getTableMetadata(
+        const std::string & namespace_name,
+        const std::string & table_name,
+        TableMetadata & result) const = 0;
+
+    /// Get table metadata in the given namespace.
+    /// Return `false` if the table does not exist, `true` otherwise.
+    virtual bool tryGetTableMetadata(
+        const std::string & namespace_name,
+        const std::string & table_name,
+        TableMetadata & result) const = 0;
+
+    /// Get the storage type where Iceberg tables' data is stored.
+    /// E.g. one of S3, Azure, Local, HDFS.
+    virtual std::optional<StorageType> getStorageType() const = 0;
+
+protected:
+    /// Name of the warehouse,
+    /// which is sometimes also called "catalog name".
+    const std::string warehouse;
+};
+
+
+}
diff --git a/src/Databases/Iceberg/RestCatalog.cpp b/src/Databases/Iceberg/RestCatalog.cpp
new file mode 100644
index 00000000000..ab51bb8a495
--- /dev/null
+++ b/src/Databases/Iceberg/RestCatalog.cpp
@@ -0,0 +1,648 @@
+#include "config.h"
+
+#if USE_AVRO
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+
+namespace DB::ErrorCodes
+{
+    extern const int ICEBERG_CATALOG_ERROR;
+    extern const int LOGICAL_ERROR;
+    extern const int BAD_ARGUMENTS;
+}
+
+namespace DB::Setting
+{
+    extern const SettingsBool iceberg_engine_ignore_schema_evolution;
+}
+
+namespace Iceberg
+{
+
+static constexpr auto CONFIG_ENDPOINT = "config";
+static constexpr auto NAMESPACES_ENDPOINT = "namespaces";
+
+namespace
+{
+
+std::pair<std::string, std::string> parseCatalogCredential(const std::string & catalog_credential)
+{
+    /// Parse a string of format "<client_id>:<client_secret>"
+    /// into separate strings client_id and client_secret.
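+    /// Illustrative example (not in the original PR): "foo:bar" is parsed into client_id "foo" and client_secret "bar".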
+ + std::string client_id, client_secret; + if (!catalog_credential.empty()) + { + auto pos = catalog_credential.find(':'); + if (pos == std::string::npos) + { + throw DB::Exception( + DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected format of catalog credential: " + "expected client_id and client_secret separated by `:`"); + } + client_id = catalog_credential.substr(0, pos); + client_secret = catalog_credential.substr(pos + 1); + } + return std::pair(client_id, client_secret); +} + +DB::HTTPHeaderEntry parseAuthHeader(const std::string & auth_header) +{ + /// Parse a string of format "Authorization: " + /// into a key-value header "Authorization", " " + + auto pos = auth_header.find(':'); + if (pos == std::string::npos) + throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected format of auth header"); + + return DB::HTTPHeaderEntry(auth_header.substr(0, pos), auth_header.substr(pos + 1)); +} + +std::string correctAPIURI(const std::string & uri) +{ + if (uri.ends_with("v1")) + return uri; + return std::filesystem::path(uri) / "v1"; +} + +} + +std::string RestCatalog::Config::toString() const +{ + DB::WriteBufferFromOwnString wb; + + if (!prefix.empty()) + wb << "prefix: " << prefix.string() << ", "; + + if (!default_base_location.empty()) + wb << "default_base_location: " << default_base_location << ", "; + + return wb.str(); +} + +RestCatalog::RestCatalog( + const std::string & warehouse_, + const std::string & base_url_, + const std::string & catalog_credential_, + const std::string & auth_scope_, + const std::string & auth_header_, + const std::string & oauth_server_uri_, + DB::ContextPtr context_) + : ICatalog(warehouse_) + , DB::WithContext(context_) + , base_url(correctAPIURI(base_url_)) + , log(getLogger("RestCatalog(" + warehouse_ + ")")) + , auth_scope(auth_scope_) + , oauth_server_uri(oauth_server_uri_) +{ + if (!catalog_credential_.empty()) + { + std::tie(client_id, client_secret) = parseCatalogCredential(catalog_credential_); + update_token_if_expired = true; + } + else if (!auth_header_.empty()) + auth_header = parseAuthHeader(auth_header_); + + config = loadConfig(); +} + +RestCatalog::Config RestCatalog::loadConfig() +{ + Poco::URI::QueryParameters params = {{"warehouse", warehouse}}; + auto buf = createReadBuffer(CONFIG_ENDPOINT, params); + + std::string json_str; + readJSONObjectPossiblyInvalid(json_str, *buf); + + LOG_TEST(log, "Received catalog configuration settings: {}", json_str); + + Poco::JSON::Parser parser; + Poco::Dynamic::Var json = parser.parse(json_str); + const Poco::JSON::Object::Ptr & object = json.extract(); + + Config result; + + auto defaults_object = object->get("defaults").extract(); + parseCatalogConfigurationSettings(defaults_object, result); + + auto overrides_object = object->get("overrides").extract(); + parseCatalogConfigurationSettings(overrides_object, result); + + LOG_TEST(log, "Parsed catalog configuration settings: {}", result.toString()); + return result; +} + +void RestCatalog::parseCatalogConfigurationSettings(const Poco::JSON::Object::Ptr & object, Config & result) +{ + if (!object) + return; + + if (object->has("prefix")) + result.prefix = object->get("prefix").extract(); + + if (object->has("default-base-location")) + result.default_base_location = object->get("default-base-location").extract(); +} + +DB::HTTPHeaderEntries RestCatalog::getAuthHeaders(bool update_token) const +{ + /// Option 1: user specified auth header manually. + /// Header has format: 'Authorization: '. 
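+    /// Clarifying note (not in the original PR): i.e. a scheme followed by its value,
+    /// e.g. a Basic or Bearer authorization header.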
+ if (auth_header.has_value()) + { + return DB::HTTPHeaderEntries{auth_header.value()}; + } + + /// Option 2: user provided grant_type, client_id and client_secret. + /// We would make OAuthClientCredentialsRequest + /// https://github.com/apache/iceberg/blob/3badfe0c1fcf0c0adfc7aa4a10f0b50365c48cf9/open-api/rest-catalog-open-api.yaml#L3498C5-L3498C34 + if (!client_id.empty()) + { + if (!access_token.has_value() || update_token) + { + access_token = retrieveAccessToken(); + } + + DB::HTTPHeaderEntries headers; + headers.emplace_back("Authorization", "Bearer " + access_token.value()); + return headers; + } + + return {}; +} + +std::string RestCatalog::retrieveAccessToken() const +{ + static constexpr auto oauth_tokens_endpoint = "oauth/tokens"; + + /// TODO: + /// 1. support oauth2-server-uri + /// https://github.com/apache/iceberg/blob/918f81f3c3f498f46afcea17c1ac9cdc6913cb5c/open-api/rest-catalog-open-api.yaml#L183C82-L183C99 + + DB::HTTPHeaderEntries headers; + headers.emplace_back("Content-Type", "application/x-www-form-urlencoded"); + headers.emplace_back("Accepts", "application/json; charset=UTF-8"); + + Poco::URI url; + DB::ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback; + if (oauth_server_uri.empty()) + { + url = Poco::URI(base_url / oauth_tokens_endpoint); + + Poco::URI::QueryParameters params = { + {"grant_type", "client_credentials"}, + {"scope", auth_scope}, + {"client_id", client_id}, + {"client_secret", client_secret}, + }; + url.setQueryParameters(params); + } + else + { + url = Poco::URI(oauth_server_uri); + out_stream_callback = [&](std::ostream & os) + { + os << fmt::format( + "grant_type=client_credentials&scope={}&client_id={}&client_secret={}", + auth_scope, client_id, client_secret); + }; + } + + const auto & context = getContext(); + auto wb = DB::BuilderRWBufferFromHTTP(url) + .withConnectionGroup(DB::HTTPConnectionGroupType::HTTP) + .withMethod(Poco::Net::HTTPRequest::HTTP_POST) + .withSettings(context->getReadSettings()) + .withTimeouts(DB::ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), context->getServerSettings())) + .withHostFilter(&context->getRemoteHostFilter()) + .withOutCallback(std::move(out_stream_callback)) + .withSkipNotFound(false) + .withHeaders(headers) + .create(credentials); + + std::string json_str; + readJSONObjectPossiblyInvalid(json_str, *wb); + + Poco::JSON::Parser parser; + Poco::Dynamic::Var res_json = parser.parse(json_str); + const Poco::JSON::Object::Ptr & object = res_json.extract(); + + return object->get("access_token").extract(); +} + +std::optional RestCatalog::getStorageType() const +{ + if (config.default_base_location.empty()) + return std::nullopt; + return parseStorageTypeFromLocation(config.default_base_location); +} + +DB::ReadWriteBufferFromHTTPPtr RestCatalog::createReadBuffer( + const std::string & endpoint, + const Poco::URI::QueryParameters & params, + const DB::HTTPHeaderEntries & headers) const +{ + const auto & context = getContext(); + + Poco::URI url(base_url / endpoint); + if (!params.empty()) + url.setQueryParameters(params); + + auto create_buffer = [&](bool update_token) + { + auto result_headers = getAuthHeaders(update_token); + std::move(headers.begin(), headers.end(), std::back_inserter(result_headers)); + + return DB::BuilderRWBufferFromHTTP(url) + .withConnectionGroup(DB::HTTPConnectionGroupType::HTTP) + .withSettings(getContext()->getReadSettings()) + .withTimeouts(DB::ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), context->getServerSettings())) + 
.withHostFilter(&getContext()->getRemoteHostFilter()) + .withHeaders(result_headers) + .withDelayInit(false) + .withSkipNotFound(false) + .create(credentials); + }; + + LOG_TEST(log, "Requesting: {}", url.toString()); + + try + { + return create_buffer(false); + } + catch (const DB::HTTPException & e) + { + const auto status = e.getHTTPStatus(); + if (update_token_if_expired && + (status == Poco::Net::HTTPResponse::HTTPStatus::HTTP_UNAUTHORIZED + || status == Poco::Net::HTTPResponse::HTTPStatus::HTTP_FORBIDDEN)) + { + return create_buffer(true); + } + throw; + } +} + +bool RestCatalog::empty() const +{ + /// TODO: add a test with empty namespaces and zero namespaces. + bool found_table = false; + auto stop_condition = [&](const std::string & namespace_name) -> bool + { + const auto tables = getTables(namespace_name, /* limit */1); + found_table = !tables.empty(); + return found_table; + }; + + Namespaces namespaces; + getNamespacesRecursive("", namespaces, stop_condition, /* execute_func */{}); + + return found_table; +} + +DB::Names RestCatalog::getTables() const +{ + auto & pool = getContext()->getIcebergCatalogThreadpool(); + DB::ThreadPoolCallbackRunnerLocal runner(pool, "RestCatalog"); + + DB::Names tables; + std::mutex mutex; + + auto execute_for_each_namespace = [&](const std::string & current_namespace) + { + runner( + [=, &tables, &mutex, this] + { + auto tables_in_namespace = getTables(current_namespace); + std::lock_guard lock(mutex); + std::move(tables_in_namespace.begin(), tables_in_namespace.end(), std::back_inserter(tables)); + }); + }; + + Namespaces namespaces; + getNamespacesRecursive( + /* base_namespace */"", /// Empty base namespace means starting from root. + namespaces, + /* stop_condition */{}, + /* execute_func */execute_for_each_namespace); + + runner.waitForAllToFinishAndRethrowFirstError(); + return tables; +} + +void RestCatalog::getNamespacesRecursive( + const std::string & base_namespace, + Namespaces & result, + StopCondition stop_condition, + ExecuteFunc func) const +{ + checkStackSize(); + + auto namespaces = getNamespaces(base_namespace); + result.reserve(result.size() + namespaces.size()); + result.insert(result.end(), namespaces.begin(), namespaces.end()); + + for (const auto & current_namespace : namespaces) + { + chassert(current_namespace.starts_with(base_namespace)); + + if (stop_condition && stop_condition(current_namespace)) + break; + + if (func) + func(current_namespace); + + getNamespacesRecursive(current_namespace, result, stop_condition, func); + } +} + +Poco::URI::QueryParameters RestCatalog::createParentNamespaceParams(const std::string & base_namespace) const +{ + std::vector parts; + splitInto<'.'>(parts, base_namespace); + std::string parent_param; + for (const auto & part : parts) + { + /// 0x1F is a unit separator + /// https://github.com/apache/iceberg/blob/70d87f1750627b14b3b25a0216a97db86a786992/open-api/rest-catalog-open-api.yaml#L264 + if (!parent_param.empty()) + parent_param += static_cast(0x1F); + parent_param += part; + } + return {{"parent", parent_param}}; +} + +RestCatalog::Namespaces RestCatalog::getNamespaces(const std::string & base_namespace) const +{ + Poco::URI::QueryParameters params; + if (!base_namespace.empty()) + params = createParentNamespaceParams(base_namespace); + + try + { + auto buf = createReadBuffer(config.prefix / NAMESPACES_ENDPOINT, params); + auto namespaces = parseNamespaces(*buf, base_namespace); + LOG_TEST(log, "Loaded {} namespaces in base namespace {}", namespaces.size(), base_namespace); + 
return namespaces; + } + catch (const DB::HTTPException & e) + { + std::string message = fmt::format( + "Received error while fetching list of namespaces from iceberg catalog `{}`. ", + warehouse); + + if (e.code() == Poco::Net::HTTPResponse::HTTPStatus::HTTP_NOT_FOUND) + message += "Namespace provided in the `parent` query parameter is not found. "; + + message += fmt::format( + "Code: {}, status: {}, message: {}", + e.code(), e.getHTTPStatus(), e.displayText()); + + throw DB::Exception(DB::ErrorCodes::ICEBERG_CATALOG_ERROR, "{}", message); + } +} + +RestCatalog::Namespaces RestCatalog::parseNamespaces(DB::ReadBuffer & buf, const std::string & base_namespace) const +{ + if (buf.eof()) + return {}; + + String json_str; + readJSONObjectPossiblyInvalid(json_str, buf); + + LOG_TEST(log, "Received response: {}", json_str); + + try + { + Poco::JSON::Parser parser; + Poco::Dynamic::Var json = parser.parse(json_str); + const Poco::JSON::Object::Ptr & object = json.extract(); + + auto namespaces_object = object->get("namespaces").extract(); + if (!namespaces_object) + throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Cannot parse result"); + + Namespaces namespaces; + for (size_t i = 0; i < namespaces_object->size(); ++i) + { + auto current_namespace_array = namespaces_object->get(static_cast(i)).extract(); + if (current_namespace_array->size() == 0) + throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Expected namespace array to be non-empty"); + + const int idx = static_cast(current_namespace_array->size()) - 1; + const auto current_namespace = current_namespace_array->get(idx).extract(); + const auto full_namespace = base_namespace.empty() + ? current_namespace + : base_namespace + "." + current_namespace; + + namespaces.push_back(full_namespace); + } + + return namespaces; + } + catch (DB::Exception & e) + { + e.addMessage("while parsing JSON: " + json_str); + throw; + } +} + +DB::Names RestCatalog::getTables(const std::string & base_namespace, size_t limit) const +{ + const std::string endpoint = std::filesystem::path(NAMESPACES_ENDPOINT) / base_namespace / "tables"; + auto buf = createReadBuffer(config.prefix / endpoint); + return parseTables(*buf, base_namespace, limit); +} + +DB::Names RestCatalog::parseTables(DB::ReadBuffer & buf, const std::string & base_namespace, size_t limit) const +{ + if (buf.eof()) + return {}; + + String json_str; + readJSONObjectPossiblyInvalid(json_str, buf); + + try + { + Poco::JSON::Parser parser; + Poco::Dynamic::Var json = parser.parse(json_str); + const Poco::JSON::Object::Ptr & object = json.extract(); + + auto identifiers_object = object->get("identifiers").extract(); + if (!identifiers_object) + throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Cannot parse result"); + + DB::Names tables; + for (size_t i = 0; i < identifiers_object->size(); ++i) + { + const auto current_table_json = identifiers_object->get(static_cast(i)).extract(); + const auto table_name = current_table_json->get("name").extract(); + + tables.push_back(base_namespace + "." 
+ table_name); + if (limit && tables.size() >= limit) + break; + } + + return tables; + } + catch (DB::Exception & e) + { + e.addMessage("while parsing JSON: " + json_str); + throw; + } +} + +bool RestCatalog::existsTable(const std::string & namespace_name, const std::string & table_name) const +{ + TableMetadata table_metadata; + return tryGetTableMetadata(namespace_name, table_name, table_metadata); +} + +bool RestCatalog::tryGetTableMetadata( + const std::string & namespace_name, + const std::string & table_name, + TableMetadata & result) const +{ + try + { + return getTableMetadataImpl(namespace_name, table_name, result); + } + catch (...) + { + DB::tryLogCurrentException(log); + return false; + } +} + +void RestCatalog::getTableMetadata( + const std::string & namespace_name, + const std::string & table_name, + TableMetadata & result) const +{ + if (!getTableMetadataImpl(namespace_name, table_name, result)) + throw DB::Exception(DB::ErrorCodes::ICEBERG_CATALOG_ERROR, "No response from iceberg catalog"); +} + +bool RestCatalog::getTableMetadataImpl( + const std::string & namespace_name, + const std::string & table_name, + TableMetadata & result) const +{ + LOG_TEST(log, "Checking table {} in namespace {}", table_name, namespace_name); + + DB::HTTPHeaderEntries headers; + if (result.requiresCredentials()) + { + /// Header `X-Iceberg-Access-Delegation` tells catalog to include storage credentials in LoadTableResponse. + /// Value can be one of the two: + /// 1. `vended-credentials` + /// 2. `remote-signing` + /// Currently we support only the first. + /// https://github.com/apache/iceberg/blob/3badfe0c1fcf0c0adfc7aa4a10f0b50365c48cf9/open-api/rest-catalog-open-api.yaml#L1832 + headers.emplace_back("X-Iceberg-Access-Delegation", "vended-credentials"); + } + + const std::string endpoint = std::filesystem::path(NAMESPACES_ENDPOINT) / namespace_name / "tables" / table_name; + auto buf = createReadBuffer(config.prefix / endpoint, /* params */{}, headers); + + if (buf->eof()) + { + LOG_TEST(log, "Table doesn't exist (endpoint: {})", endpoint); + return false; + } + + String json_str; + readJSONObjectPossiblyInvalid(json_str, *buf); + +#ifdef DEBUG_OR_SANITIZER_BUILD + /// This log message might contain credentials, + /// so log it only for debugging. 
+ LOG_TEST(log, "Received metadata for table {}: {}", table_name, json_str); +#endif + + Poco::JSON::Parser parser; + Poco::Dynamic::Var json = parser.parse(json_str); + const Poco::JSON::Object::Ptr & object = json.extract(); + + auto metadata_object = object->get("metadata").extract(); + if (!metadata_object) + throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Cannot parse result"); + + std::string location; + if (result.requiresLocation()) + { + location = metadata_object->get("location").extract(); + result.setLocation(location); + LOG_TEST(log, "Location for table {}: {}", table_name, location); + } + + if (result.requiresSchema()) + { + // int format_version = metadata_object->getValue("format-version"); + auto schema_processor = DB::IcebergSchemaProcessor(); + auto id = DB::IcebergMetadata::parseTableSchema(metadata_object, schema_processor, log); + auto schema = schema_processor.getClickhouseTableSchemaById(id); + result.setSchema(*schema); + } + + if (result.requiresCredentials() && object->has("config")) + { + auto config_object = object->get("config").extract(); + if (!config_object) + throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Cannot parse config result"); + + auto storage_type = parseStorageTypeFromLocation(location); + switch (storage_type) + { + case StorageType::S3: + { + static constexpr auto access_key_id_str = "s3.access-key-id"; + static constexpr auto secret_access_key_str = "s3.secret-access-key"; + static constexpr auto session_token_str = "s3.session-token"; + + std::string access_key_id, secret_access_key, session_token; + if (config_object->has(access_key_id_str)) + access_key_id = config_object->get(access_key_id_str).extract(); + if (config_object->has(secret_access_key_str)) + secret_access_key = config_object->get(secret_access_key_str).extract(); + if (config_object->has(session_token_str)) + session_token = config_object->get(session_token_str).extract(); + + result.setStorageCredentials( + std::make_shared(access_key_id, secret_access_key, session_token)); + break; + } + default: + break; + } + } + + return true; +} + +} + +#endif diff --git a/src/Databases/Iceberg/RestCatalog.h b/src/Databases/Iceberg/RestCatalog.h new file mode 100644 index 00000000000..a1dbe268d0b --- /dev/null +++ b/src/Databases/Iceberg/RestCatalog.h @@ -0,0 +1,122 @@ +#pragma once +#include "config.h" + +#if USE_AVRO +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ +class ReadBuffer; +} + +namespace Iceberg +{ + +class RestCatalog final : public ICatalog, private DB::WithContext +{ +public: + explicit RestCatalog( + const std::string & warehouse_, + const std::string & base_url_, + const std::string & catalog_credential_, + const std::string & auth_scope_, + const std::string & auth_header_, + const std::string & oauth_server_uri_, + DB::ContextPtr context_); + + ~RestCatalog() override = default; + + bool empty() const override; + + DB::Names getTables() const override; + + bool existsTable(const std::string & namespace_name, const std::string & table_name) const override; + + void getTableMetadata( + const std::string & namespace_name, + const std::string & table_name, + TableMetadata & result) const override; + + bool tryGetTableMetadata( + const std::string & namespace_name, + const std::string & table_name, + TableMetadata & result) const override; + + std::optional getStorageType() const override; + +private: + struct Config + { + /// Prefix is a path of the catalog endpoint, + /// e.g. 
/v1/{prefix}/namespaces/{namespace}/tables/{table} + std::filesystem::path prefix; + /// Base location is location of data in storage + /// (in filesystem or object storage). + std::string default_base_location; + + std::string toString() const; + }; + + const std::filesystem::path base_url; + const LoggerPtr log; + + /// Catalog configuration settings from /v1/config endpoint. + Config config; + + /// Auth headers of format: "Authorization": " " + std::optional auth_header; + + /// Parameters for OAuth. + bool update_token_if_expired = false; + std::string client_id; + std::string client_secret; + std::string auth_scope; + std::string oauth_server_uri; + mutable std::optional access_token; + + Poco::Net::HTTPBasicCredentials credentials{}; + + DB::ReadWriteBufferFromHTTPPtr createReadBuffer( + const std::string & endpoint, + const Poco::URI::QueryParameters & params = {}, + const DB::HTTPHeaderEntries & headers = {}) const; + + Poco::URI::QueryParameters createParentNamespaceParams(const std::string & base_namespace) const; + + using StopCondition = std::function; + using ExecuteFunc = std::function; + + void getNamespacesRecursive( + const std::string & base_namespace, + Namespaces & result, + StopCondition stop_condition, + ExecuteFunc func) const; + + Namespaces getNamespaces(const std::string & base_namespace) const; + + Namespaces parseNamespaces(DB::ReadBuffer & buf, const std::string & base_namespace) const; + + DB::Names getTables(const std::string & base_namespace, size_t limit = 0) const; + + DB::Names parseTables(DB::ReadBuffer & buf, const std::string & base_namespace, size_t limit) const; + + bool getTableMetadataImpl( + const std::string & namespace_name, + const std::string & table_name, + TableMetadata & result) const; + + Config loadConfig(); + std::string retrieveAccessToken() const; + DB::HTTPHeaderEntries getAuthHeaders(bool update_token = false) const; + static void parseCatalogConfigurationSettings(const Poco::JSON::Object::Ptr & object, Config & result); +}; + +} + +#endif diff --git a/src/Databases/Iceberg/StorageCredentials.h b/src/Databases/Iceberg/StorageCredentials.h new file mode 100644 index 00000000000..90c20262cf4 --- /dev/null +++ b/src/Databases/Iceberg/StorageCredentials.h @@ -0,0 +1,51 @@ +#pragma once +#include +#include +#include + +namespace DB::ErrorCodes +{ + extern const int BAD_ARGUMENTS; +} + +namespace Iceberg +{ + +class IStorageCredentials +{ +public: + virtual ~IStorageCredentials() = default; + + virtual void addCredentialsToEngineArgs(DB::ASTs & engine_args) const = 0; +}; + +class S3Credentials final : public IStorageCredentials +{ +public: + /// TODO: support region as well. 
+ S3Credentials( + const std::string & access_key_id_, + const std::string & secret_access_key_, + const std::string session_token_) + : access_key_id(access_key_id_) + , secret_access_key(secret_access_key_) + , session_token(session_token_) + {} + + void addCredentialsToEngineArgs(DB::ASTs & engine_args) const override + { + if (engine_args.size() != 1) + throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Storage credentials specified in AST already"); + + engine_args.push_back(std::make_shared(access_key_id)); + engine_args.push_back(std::make_shared(secret_access_key)); + engine_args.push_back(std::make_shared(session_token)); + } + +private: + std::string access_key_id; + std::string secret_access_key; + std::string session_token; +}; + +} diff --git a/src/Databases/enableAllExperimentalSettings.cpp b/src/Databases/enableAllExperimentalSettings.cpp index 1be54664bc9..2b102095632 100644 --- a/src/Databases/enableAllExperimentalSettings.cpp +++ b/src/Databases/enableAllExperimentalSettings.cpp @@ -46,6 +46,7 @@ void enableAllExperimentalSettings(ContextMutablePtr context) context->setSetting("enable_zstd_qat_codec", 1); context->setSetting("allow_create_index_without_type", 1); context->setSetting("allow_experimental_s3queue", 1); + context->setSetting("allow_experimental_database_iceberg", 1); /// clickhouse-private settings context->setSetting("allow_experimental_shared_set_join", 1); diff --git a/src/Databases/registerDatabases.cpp b/src/Databases/registerDatabases.cpp index 4f7c229bdf4..8daba254891 100644 --- a/src/Databases/registerDatabases.cpp +++ b/src/Databases/registerDatabases.cpp @@ -36,6 +36,10 @@ void registerDatabaseS3(DatabaseFactory & factory); void registerDatabaseHDFS(DatabaseFactory & factory); #endif +#if USE_AVRO +void registerDatabaseIceberg(DatabaseFactory & factory); +#endif + void registerDatabases() { auto & factory = DatabaseFactory::instance(); @@ -68,5 +72,9 @@ void registerDatabases() #if USE_HDFS registerDatabaseHDFS(factory); #endif + +#if USE_AVRO + registerDatabaseIceberg(factory); +#endif } } diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 84a93fa48bb..6ec108eb1ce 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -176,6 +176,9 @@ namespace CurrentMetrics extern const Metric AttachedDictionary; extern const Metric AttachedDatabase; extern const Metric PartsActive; + extern const Metric IcebergCatalogThreads; + extern const Metric IcebergCatalogThreadsActive; + extern const Metric IcebergCatalogThreadsScheduled; } @@ -284,6 +287,8 @@ namespace ServerSetting extern const ServerSettingsUInt64 load_marks_threadpool_queue_size; extern const ServerSettingsUInt64 threadpool_writer_pool_size; extern const ServerSettingsUInt64 threadpool_writer_queue_size; + extern const ServerSettingsUInt64 iceberg_catalog_threadpool_pool_size; + extern const ServerSettingsUInt64 iceberg_catalog_threadpool_queue_size; } @@ -415,6 +420,8 @@ struct ContextSharedPart : boost::noncopyable mutable std::unique_ptr load_marks_threadpool; /// Threadpool for loading marks cache. mutable OnceFlag prefetch_threadpool_initialized; mutable std::unique_ptr prefetch_threadpool; /// Threadpool for loading marks cache. + mutable std::unique_ptr iceberg_catalog_threadpool; + mutable OnceFlag iceberg_catalog_threadpool_initialized; mutable OnceFlag build_vector_similarity_index_threadpool_initialized; mutable std::unique_ptr build_vector_similarity_index_threadpool; /// Threadpool for vector-similarity index creation. 
mutable UncompressedCachePtr index_uncompressed_cache TSA_GUARDED_BY(mutex); /// The cache of decompressed blocks for MergeTree indices. @@ -3257,6 +3264,23 @@ ThreadPool & Context::getLoadMarksThreadpool() const return *shared->load_marks_threadpool; } +ThreadPool & Context::getIcebergCatalogThreadpool() const +{ + callOnce(shared->iceberg_catalog_threadpool_initialized, [&] + { + auto pool_size = shared->server_settings[ServerSetting::iceberg_catalog_threadpool_pool_size]; + auto queue_size = shared->server_settings[ServerSetting::iceberg_catalog_threadpool_queue_size]; + + shared->iceberg_catalog_threadpool = std::make_unique( + CurrentMetrics::IcebergCatalogThreads, + CurrentMetrics::IcebergCatalogThreadsActive, + CurrentMetrics::IcebergCatalogThreadsScheduled, + pool_size, pool_size, queue_size); + }); + + return *shared->iceberg_catalog_threadpool; +} + void Context::setPrimaryIndexCache(const String & cache_policy, size_t max_cache_size_in_bytes, double size_ratio) { std::lock_guard lock(shared->mutex); diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index ca2ea9190fc..5fd92d0a4d2 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -1123,6 +1123,7 @@ public: size_t getPrefetchThreadpoolSize() const; ThreadPool & getBuildVectorSimilarityIndexThreadPool() const; + ThreadPool & getIcebergCatalogThreadpool() const; /// Settings for MergeTree background tasks stored in config.xml BackgroundTaskSchedulingSettings getBackgroundProcessingTaskSchedulingSettings() const; diff --git a/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.cpp index 9cf4f64cdb3..21682d93bfb 100644 --- a/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.cpp @@ -52,9 +52,6 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -Int32 parseTableSchema( - const Poco::JSON::Object::Ptr & metadata_object, IcebergSchemaProcessor & schema_processor, LoggerPtr metadata_logger); - IcebergMetadata::IcebergMetadata( ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, @@ -79,7 +76,6 @@ IcebergMetadata::IcebergMetadata( namespace { - enum class ManifestEntryStatus : uint8_t { EXISTING = 0, @@ -143,6 +139,7 @@ bool schemasAreIdentical(const Poco::JSON::Object & first, const Poco::JSON::Obj return false; return *(first.getArray(fields_key)) == *(second.getArray(fields_key)); } + } @@ -309,7 +306,7 @@ std::pair parseTableSchemaV1Method(const Poco::J return {schema, current_schema_id}; } -Int32 parseTableSchema( +Int32 DB::IcebergMetadata::parseTableSchema( const Poco::JSON::Object::Ptr & metadata_object, IcebergSchemaProcessor & schema_processor, LoggerPtr metadata_logger) { Int32 format_version = metadata_object->getValue("format-version"); @@ -577,7 +574,6 @@ getMetadataFileAndVersion(const ObjectStoragePtr & object_storage, const Storage return *std::max_element(metadata_files_with_versions.begin(), metadata_files_with_versions.end()); } - DataLakeMetadataPtr IcebergMetadata::create( const ObjectStoragePtr & object_storage, const ConfigurationObserverPtr & configuration, const ContextPtr & local_context) { diff --git a/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.h index fb5a7800228..d4e70030012 100644 --- a/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.h @@ -207,6 +207,9 @@ public: bool 
 
     bool supportsExternalMetadataChange() const override { return true; }
 
+    static Int32 parseTableSchema(
+        const Poco::JSON::Object::Ptr & metadata_object, IcebergSchemaProcessor & schema_processor, LoggerPtr metadata_logger);
+
 private:
     mutable std::unordered_map schema_id_by_data_file;
diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp
index 7b40b3a0e59..b0284508562 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp
@@ -91,7 +91,8 @@ StorageObjectStorage::StorageObjectStorage(
     std::optional<FormatSettings> format_settings_,
     LoadingStrictnessLevel mode,
     bool distributed_processing_,
-    ASTPtr partition_by_)
+    ASTPtr partition_by_,
+    bool lazy_init)
     : IStorage(table_id_)
     , configuration(configuration_)
     , object_storage(object_storage_)
@@ -102,10 +103,13 @@ StorageObjectStorage::StorageObjectStorage(
 {
     try
     {
-        if (configuration->hasExternalDynamicMetadata())
-            configuration->updateAndGetCurrentSchema(object_storage, context);
-        else
-            configuration->update(object_storage, context);
+        if (!lazy_init)
+        {
+            if (configuration->hasExternalDynamicMetadata())
+                configuration->updateAndGetCurrentSchema(object_storage, context);
+            else
+                configuration->update(object_storage, context);
+        }
     }
     catch (...)
     {
diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h
index 72f96591666..1349e24320f 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorage.h
+++ b/src/Storages/ObjectStorage/StorageObjectStorage.h
@@ -68,7 +68,8 @@ public:
         std::optional<FormatSettings> format_settings_,
         LoadingStrictnessLevel mode,
         bool distributed_processing_ = false,
-        ASTPtr partition_by_ = nullptr);
+        ASTPtr partition_by_ = nullptr,
+        bool lazy_init = false);
 
     String getName() const override;
diff --git a/tests/integration/compose/docker_compose_iceberg_rest_catalog.yml b/tests/integration/compose/docker_compose_iceberg_rest_catalog.yml
new file mode 100644
index 00000000000..860d9e29230
--- /dev/null
+++ b/tests/integration/compose/docker_compose_iceberg_rest_catalog.yml
@@ -0,0 +1,59 @@
+services:
+  spark-iceberg:
+    image: tabulario/spark-iceberg
+    container_name: spark-iceberg
+    build: spark/
+    depends_on:
+      - rest
+      - minio
+    environment:
+      - AWS_ACCESS_KEY_ID=admin
+      - AWS_SECRET_ACCESS_KEY=password
+      - AWS_REGION=us-east-1
+    ports:
+      - 8080:8080
+      - 10000:10000
+      - 10001:10001
+  rest:
+    image: tabulario/iceberg-rest
+    ports:
+      - 8182:8181
+    environment:
+      - AWS_ACCESS_KEY_ID=minio
+      - AWS_SECRET_ACCESS_KEY=minio123
+      - AWS_REGION=us-east-1
+      - CATALOG_WAREHOUSE=s3://iceberg_data/
+      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
+      - CATALOG_S3_ENDPOINT=http://minio:9000
+  minio:
+    image: minio/minio
+    container_name: minio
+    environment:
+      - MINIO_ROOT_USER=minio
+      - MINIO_ROOT_PASSWORD=minio123
+      - MINIO_DOMAIN=minio
+    networks:
+      default:
+        aliases:
+          - warehouse.minio
+    ports:
+      - 9001:9001
+      - 9002:9000
+    command: ["server", "/data", "--console-address", ":9001"]
+  mc:
+    depends_on:
+      - minio
+    image: minio/mc
+    container_name: mc
+    environment:
+      - AWS_ACCESS_KEY_ID=minio
+      - AWS_SECRET_ACCESS_KEY=minio123
+      - AWS_REGION=us-east-1
+    entrypoint: >
+      /bin/sh -c "
+      until (/usr/bin/mc config host add minio http://minio:9000 minio minio123) do echo '...waiting...'
&& sleep 1; done; + /usr/bin/mc rm -r --force minio/warehouse; + /usr/bin/mc mb minio/warehouse --ignore-existing; + /usr/bin/mc policy set public minio/warehouse; + tail -f /dev/null + " diff --git a/tests/integration/compose/docker_compose_minio.yml b/tests/integration/compose/docker_compose_minio.yml index 7fbe3796a0c..b0dad3d1ab8 100644 --- a/tests/integration/compose/docker_compose_minio.yml +++ b/tests/integration/compose/docker_compose_minio.yml @@ -14,6 +14,10 @@ services: depends_on: - proxy1 - proxy2 + networks: + default: + aliases: + - warehouse.minio # HTTP proxies for Minio. proxy1: diff --git a/tests/integration/helpers/cluster.py b/tests/integration/helpers/cluster.py index 4e10e06118c..06914bff124 100644 --- a/tests/integration/helpers/cluster.py +++ b/tests/integration/helpers/cluster.py @@ -572,6 +572,7 @@ class ClickHouseCluster: self.resolver_logs_dir = os.path.join(self.instances_dir, "resolver") self.spark_session = None + self.with_iceberg_catalog = False self.with_azurite = False self.azurite_container = "azurite-container" @@ -1468,6 +1469,26 @@ class ClickHouseCluster: ) return self.base_minio_cmd + def setup_iceberg_catalog_cmd( + self, instance, env_variables, docker_compose_yml_dir + ): + self.base_cmd.extend( + [ + "--file", + p.join( + docker_compose_yml_dir, "docker_compose_iceberg_rest_catalog.yml" + ), + ] + ) + self.base_iceberg_catalog_cmd = self.compose_cmd( + "--env-file", + instance.env_file, + "--file", + p.join(docker_compose_yml_dir, "docker_compose_iceberg_rest_catalog.yml"), + ) + return self.base_iceberg_catalog_cmd + # return self.base_minio_cmd + def setup_azurite_cmd(self, instance, env_variables, docker_compose_yml_dir): self.with_azurite = True env_variables["AZURITE_PORT"] = str(self.azurite_port) @@ -1634,6 +1655,7 @@ class ClickHouseCluster: with_hive=False, with_coredns=False, with_prometheus=False, + with_iceberg_catalog=False, handle_prometheus_remote_write=False, handle_prometheus_remote_read=False, use_old_analyzer=None, @@ -1742,6 +1764,7 @@ class ClickHouseCluster: with_coredns=with_coredns, with_cassandra=with_cassandra, with_ldap=with_ldap, + with_iceberg_catalog=with_iceberg_catalog, use_old_analyzer=use_old_analyzer, server_bin_path=self.server_bin_path, odbc_bridge_bin_path=self.odbc_bridge_bin_path, @@ -1932,6 +1955,13 @@ class ClickHouseCluster: self.setup_minio_cmd(instance, env_variables, docker_compose_yml_dir) ) + if with_iceberg_catalog and not self.with_iceberg_catalog: + cmds.append( + self.setup_iceberg_catalog_cmd( + instance, env_variables, docker_compose_yml_dir + ) + ) + if with_azurite and not self.with_azurite: cmds.append( self.setup_azurite_cmd(instance, env_variables, docker_compose_yml_dir) @@ -3443,6 +3473,7 @@ class ClickHouseInstance: with_coredns, with_cassandra, with_ldap, + with_iceberg_catalog, use_old_analyzer, server_bin_path, odbc_bridge_bin_path, diff --git a/tests/integration/test_database_iceberg/__init__.py b/tests/integration/test_database_iceberg/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py new file mode 100644 index 00000000000..6dbaccce648 --- /dev/null +++ b/tests/integration/test_database_iceberg/test.py @@ -0,0 +1,309 @@ +import glob +import json +import logging +import os +import random +import time +import uuid +from datetime import datetime, timedelta + +import pyarrow as pa +import pytest +import requests +import urllib3 +from minio import Minio +from 
pyiceberg.catalog import load_catalog +from pyiceberg.partitioning import PartitionField, PartitionSpec +from pyiceberg.schema import Schema +from pyiceberg.table.sorting import SortField, SortOrder +from pyiceberg.transforms import DayTransform, IdentityTransform +from pyiceberg.types import ( + DoubleType, + FloatType, + NestedField, + StringType, + StructType, + TimestampType, +) + +from helpers.cluster import ClickHouseCluster, ClickHouseInstance, is_arm +from helpers.s3_tools import get_file_contents, list_s3_objects, prepare_s3_bucket +from helpers.test_tools import TSV, csv_compare + +BASE_URL = "http://rest:8181/v1" +BASE_URL_LOCAL = "http://localhost:8182/v1" +BASE_URL_LOCAL_RAW = "http://localhost:8182" + +CATALOG_NAME = "demo" + +DEFAULT_SCHEMA = Schema( + NestedField( + field_id=1, name="datetime", field_type=TimestampType(), required=False + ), + NestedField(field_id=2, name="symbol", field_type=StringType(), required=False), + NestedField(field_id=3, name="bid", field_type=DoubleType(), required=False), + NestedField(field_id=4, name="ask", field_type=DoubleType(), required=False), + NestedField( + field_id=5, + name="details", + field_type=StructType( + NestedField( + field_id=4, + name="created_by", + field_type=StringType(), + required=False, + ), + ), + required=False, + ), +) + +DEFAULT_CREATE_TABLE = "CREATE TABLE {}.`{}.{}`\\n(\\n `datetime` Nullable(DateTime64(6)),\\n `symbol` Nullable(String),\\n `bid` Nullable(Float64),\\n `ask` Nullable(Float64),\\n `details` Tuple(created_by Nullable(String))\\n)\\nENGINE = Iceberg(\\'http://minio:9000/warehouse/data/\\', \\'minio\\', \\'[HIDDEN]\\')\n" + +DEFAULT_PARTITION_SPEC = PartitionSpec( + PartitionField( + source_id=1, field_id=1000, transform=DayTransform(), name="datetime_day" + ) +) + +DEFAULT_SORT_ORDER = SortOrder(SortField(source_id=2, transform=IdentityTransform())) + + +def list_namespaces(): + response = requests.get(f"{BASE_URL_LOCAL}/namespaces") + if response.status_code == 200: + return response.json() + else: + raise Exception(f"Failed to list namespaces: {response.status_code}") + + +def load_catalog_impl(started_cluster): + return load_catalog( + CATALOG_NAME, + **{ + "uri": BASE_URL_LOCAL_RAW, + "type": "rest", + "s3.endpoint": f"http://localhost:9002", + "s3.access-key-id": "minio", + "s3.secret-access-key": "minio123", + }, + ) + + +def create_table( + catalog, + namespace, + table, + schema=DEFAULT_SCHEMA, + partition_spec=DEFAULT_PARTITION_SPEC, + sort_order=DEFAULT_SORT_ORDER, +): + return catalog.create_table( + identifier=f"{namespace}.{table}", + schema=schema, + location=f"s3://warehouse/data", + partition_spec=partition_spec, + sort_order=sort_order, + ) + + +def generate_record(): + return { + "datetime": datetime.now(), + "symbol": str("kek"), + "bid": round(random.uniform(100, 200), 2), + "ask": round(random.uniform(200, 300), 2), + "details": {"created_by": "Alice Smith"}, + } + + +def create_clickhouse_iceberg_database(started_cluster, node, name): + node.query( + f""" +DROP DATABASE IF EXISTS {name}; +SET allow_experimental_database_iceberg=true; +CREATE DATABASE {name} ENGINE = Iceberg('{BASE_URL}', 'minio', 'minio123') +SETTINGS catalog_type = 'rest', + storage_endpoint = 'http://minio:9000/warehouse', + warehouse='demo' + """ + ) + + +def print_objects(): + minio_client = Minio( + f"localhost:9002", + access_key="minio", + secret_key="minio123", + secure=False, + http_client=urllib3.PoolManager(cert_reqs="CERT_NONE"), + ) + + objects = list(minio_client.list_objects("warehouse", "", 
recursive=True)) + names = [x.object_name for x in objects] + names.sort() + for name in names: + print(f"Found object: {name}") + + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster = ClickHouseCluster(__file__) + cluster.add_instance( + "node1", + main_configs=[], + user_configs=[], + stay_alive=True, + with_iceberg_catalog=True, + ) + + logging.info("Starting cluster...") + cluster.start() + + # TODO: properly wait for container + time.sleep(10) + + yield cluster + + finally: + cluster.shutdown() + + +def test_list_tables(started_cluster): + node = started_cluster.instances["node1"] + + root_namespace = f"clickhouse_{uuid.uuid4()}" + namespace_1 = f"{root_namespace}.testA.A" + namespace_2 = f"{root_namespace}.testB.B" + namespace_1_tables = ["tableA", "tableB"] + namespace_2_tables = ["tableC", "tableD"] + + catalog = load_catalog_impl(started_cluster) + + for namespace in [namespace_1, namespace_2]: + catalog.create_namespace(namespace) + + found = False + for namespace_list in list_namespaces()["namespaces"]: + if root_namespace == namespace_list[0]: + found = True + break + assert found + + found = False + for namespace_list in catalog.list_namespaces(): + if root_namespace == namespace_list[0]: + found = True + break + assert found + + for namespace in [namespace_1, namespace_2]: + assert len(catalog.list_tables(namespace)) == 0 + + create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME) + + tables_list = "" + for table in namespace_1_tables: + create_table(catalog, namespace_1, table) + if len(tables_list) > 0: + tables_list += "\n" + tables_list += f"{namespace_1}.{table}" + + for table in namespace_2_tables: + create_table(catalog, namespace_2, table) + if len(tables_list) > 0: + tables_list += "\n" + tables_list += f"{namespace_2}.{table}" + + assert ( + tables_list + == node.query( + f"SELECT name FROM system.tables WHERE database = '{CATALOG_NAME}' and name ILIKE '{root_namespace}%' ORDER BY name" + ).strip() + ) + node.restart_clickhouse() + assert ( + tables_list + == node.query( + f"SELECT name FROM system.tables WHERE database = '{CATALOG_NAME}' and name ILIKE '{root_namespace}%' ORDER BY name" + ).strip() + ) + + expected = DEFAULT_CREATE_TABLE.format(CATALOG_NAME, namespace_2, "tableC") + assert expected == node.query( + f"SHOW CREATE TABLE {CATALOG_NAME}.`{namespace_2}.tableC`" + ) + + +def test_many_namespaces(started_cluster): + node = started_cluster.instances["node1"] + root_namespace_1 = f"A_{uuid.uuid4()}" + root_namespace_2 = f"B_{uuid.uuid4()}" + namespaces = [ + f"{root_namespace_1}", + f"{root_namespace_1}.B.C", + f"{root_namespace_1}.B.C.D", + f"{root_namespace_1}.B.C.D.E", + f"{root_namespace_2}", + f"{root_namespace_2}.C", + f"{root_namespace_2}.CC", + ] + tables = ["A", "B", "C"] + catalog = load_catalog_impl(started_cluster) + + for namespace in namespaces: + catalog.create_namespace(namespace) + for table in tables: + create_table(catalog, namespace, table) + + create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME) + + for namespace in namespaces: + for table in tables: + table_name = f"{namespace}.{table}" + assert int( + node.query( + f"SELECT count() FROM system.tables WHERE database = '{CATALOG_NAME}' and name = '{table_name}'" + ) + ) + + +def test_select(started_cluster): + node = started_cluster.instances["node1"] + + test_ref = f"test_list_tables_{uuid.uuid4()}" + table_name = f"{test_ref}_table" + root_namespace = f"{test_ref}_namespace" + + namespace = f"{root_namespace}.A.B.C" + 
namespaces_to_create = [ + root_namespace, + f"{root_namespace}.A", + f"{root_namespace}.A.B", + f"{root_namespace}.A.B.C", + ] + + catalog = load_catalog_impl(started_cluster) + + for namespace in namespaces_to_create: + catalog.create_namespace(namespace) + assert len(catalog.list_tables(namespace)) == 0 + + table = create_table(catalog, namespace, table_name) + + num_rows = 10 + data = [generate_record() for _ in range(num_rows)] + df = pa.Table.from_pylist(data) + table.append(df) + + create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME) + + expected = DEFAULT_CREATE_TABLE.format(CATALOG_NAME, namespace, table_name) + assert expected == node.query( + f"SHOW CREATE TABLE {CATALOG_NAME}.`{namespace}.{table_name}`" + ) + + assert num_rows == int( + node.query(f"SELECT count() FROM {CATALOG_NAME}.`{namespace}.{table_name}`") + )
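
A note on the `started_cluster` fixture in this test: the `# TODO: properly wait for container` / `time.sleep(10)` pause is fragile on slow runners. Below is a minimal sketch, not part of the change above, of how a readiness poll might look; it assumes the same host port mapping (8182) and the `/v1/namespaces` endpoint that `list_namespaces` already queries, and `wait_for_iceberg_catalog` is a hypothetical helper name.

import time

import requests

# Mirrors the BASE_URL_LOCAL constant defined at the top of test.py (assumed here).
BASE_URL_LOCAL = "http://localhost:8182/v1"


def wait_for_iceberg_catalog(timeout=60):
    """Poll the REST catalog until it answers instead of sleeping a fixed 10 seconds.

    Hypothetical helper, not part of the diff above.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # Same endpoint list_namespaces() uses; any 200 means the catalog is serving requests.
            if requests.get(f"{BASE_URL_LOCAL}/namespaces", timeout=5).status_code == 200:
                return
        except requests.exceptions.RequestException:
            pass
        time.sleep(1)
    raise RuntimeError("Iceberg REST catalog did not become ready in time")

In the fixture, the fixed sleep could then be replaced with a call to `wait_for_iceberg_catalog()` right after `cluster.start()`.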