Merge pull request #71542 from ClickHouse/rest-catalog

Integrate with Iceberg REST Catalog
Kseniia Sumarokova 2024-12-12 10:04:24 +00:00 committed by GitHub
commit 352e9d7397
32 changed files with 2107 additions and 14 deletions

View File

@@ -36,7 +36,7 @@ geomet==0.2.1.post1
grpcio-tools==1.60.0
grpcio==1.60.0
gssapi==1.8.3
httplib2==0.20.2
httplib2==0.22.0
idna==3.7
importlib-metadata==4.6.4
iniconfig==2.0.0
@@ -72,7 +72,7 @@ pyarrow==17.0.0
pycparser==2.22
pycryptodome==3.20.0
pymongo==3.11.0
pyparsing==2.4.7
pyparsing==3.1.0
pyspark==3.3.2
pyspnego==0.10.2
pytest-order==1.0.0
@@ -101,4 +101,5 @@ wadllib==1.3.6
websocket-client==1.8.0
wheel==0.38.1
zipp==1.0.0
pyiceberg==0.7.1
jinja2==3.1.3

View File

@@ -160,6 +160,8 @@ if (TARGET ch_contrib::hdfs)
add_headers_and_sources(dbms Disks/ObjectStorages/HDFS)
endif()
add_headers_and_sources(dbms Databases/Iceberg)
add_headers_and_sources(dbms Disks/ObjectStorages/Cached)
add_headers_and_sources(dbms Disks/ObjectStorages/Local)
add_headers_and_sources(dbms Disks/ObjectStorages/Web)

View File

@@ -189,6 +189,9 @@
M(BuildVectorSimilarityIndexThreads, "Number of threads in the build vector similarity index thread pool.") \
M(BuildVectorSimilarityIndexThreadsActive, "Number of threads in the build vector similarity index thread pool running a task.") \
M(BuildVectorSimilarityIndexThreadsScheduled, "Number of queued or active jobs in the build vector similarity index thread pool.") \
M(IcebergCatalogThreads, "Number of threads in the IcebergCatalog thread pool.") \
M(IcebergCatalogThreadsActive, "Number of threads in the IcebergCatalog thread pool running a task.") \
M(IcebergCatalogThreadsScheduled, "Number of queued or active jobs in the IcebergCatalog thread pool.") \
\
M(DiskPlainRewritableAzureDirectoryMapSize, "Number of local-to-remote path entries in the 'plain_rewritable' in-memory map for AzureObjectStorage.") \
M(DiskPlainRewritableAzureFileCount, "Number of file entries in the 'plain_rewritable' in-memory map for AzureObjectStorage.") \

View File

@@ -613,6 +613,7 @@
M(733, TABLE_IS_BEING_RESTARTED) \
M(734, CANNOT_WRITE_AFTER_BUFFER_CANCELED) \
M(735, QUERY_WAS_CANCELLED_BY_CLIENT) \
M(736, ICEBERG_CATALOG_ERROR) \
\
M(900, DISTRIBUTED_CACHE_ERROR) \
M(901, CANNOT_USE_DISTRIBUTED_CACHE) \

View File

@@ -210,6 +210,8 @@ namespace DB
DECLARE(UInt64, load_marks_threadpool_queue_size, 1000000, "Number of tasks which is possible to push into prefetches pool", 0) \
DECLARE(UInt64, threadpool_writer_pool_size, 100, "Size of background pool for write requests to object storages", 0) \
DECLARE(UInt64, threadpool_writer_queue_size, 1000000, "Number of tasks which is possible to push into background pool for write requests to object storages", 0) \
DECLARE(UInt64, iceberg_catalog_threadpool_pool_size, 50, "Size of the background pool for the Iceberg catalog", 0) \
DECLARE(UInt64, iceberg_catalog_threadpool_queue_size, 1000000, "Number of tasks that can be pushed into the Iceberg catalog pool", 0) \
DECLARE(UInt32, allow_feature_tier, 0, "0 - All feature tiers allowed (experimental, beta, production). 1 - Only beta and production feature tiers allowed. 2 - Only production feature tier allowed", 0) \

View File

@@ -5560,6 +5560,9 @@ Only available in ClickHouse Cloud. Exclude new data parts from SELECT queries u
)", 0) \
DECLARE(Int64, prefer_warmed_unmerged_parts_seconds, 0, R"(
Only available in ClickHouse Cloud. If a merged part is less than this many seconds old and is not pre-warmed (see cache_populated_by_fetch), but all its source parts are available and pre-warmed, SELECT queries will read from those parts instead. Only for ReplicatedMergeTree. Note that this only checks whether CacheWarmer processed the part; if the part was fetched into cache by something else, it'll still be considered cold until CacheWarmer gets to it; if it was warmed, then evicted from cache, it'll still be considered warm.
)", 0) \
DECLARE(Bool, allow_experimental_database_iceberg, false, R"(
Allow experimental database engine Iceberg
)", 0) \
DECLARE(Bool, allow_deprecated_error_prone_window_functions, false, R"(
Allow usage of deprecated error prone window functions (neighbor, runningAccumulate, runningDifferenceStartingWithFirstValue, runningDifference)

View File

@@ -60,6 +60,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{
{"24.12",
{
{"allow_experimental_database_iceberg", false, false, "New setting."},
{"query_plan_join_swap_table", "false", "auto", "New setting. Right table was always chosen before."},
{"max_size_to_preallocate_for_aggregation", 100'000'000, 1'000'000'000'000, "Enable optimisation for bigger tables."},
{"max_size_to_preallocate_for_joins", 100'000'000, 1'000'000'000'000, "Enable optimisation for bigger tables."},

View File

@@ -280,4 +280,7 @@ IMPLEMENT_SETTING_ENUM(
{"StochasticSimple", MergeSelectorAlgorithm::STOCHASTIC_SIMPLE},
{"Trivial", MergeSelectorAlgorithm::TRIVIAL}})
IMPLEMENT_SETTING_ENUM(DatabaseIcebergCatalogType, ErrorCodes::BAD_ARGUMENTS,
{{"rest", DatabaseIcebergCatalogType::REST}})
}

View File

@@ -359,4 +359,11 @@ DECLARE_SETTING_ENUM(GroupArrayActionWhenLimitReached)
DECLARE_SETTING_ENUM(MergeSelectorAlgorithm)
enum class DatabaseIcebergCatalogType : uint8_t
{
REST,
};
DECLARE_SETTING_ENUM(DatabaseIcebergCatalogType)
}

View File

@@ -0,0 +1,377 @@
#include <Databases/Iceberg/DatabaseIceberg.h>
#if USE_AVRO
#include <Access/Common/HTTPAuthenticationScheme.h>
#include <Core/Settings.h>
#include <Databases/DatabaseFactory.h>
#include <Databases/Iceberg/RestCatalog.h>
#include <DataTypes/DataTypeString.h>
#include <Storages/ObjectStorage/S3/Configuration.h>
#include <Storages/ConstraintsDescription.h>
#include <Storages/StorageNull.h>
#include <Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/Context.h>
#include <Interpreters/StorageID.h>
#include <Formats/FormatFactory.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTDataType.h>
namespace DB
{
namespace DatabaseIcebergSetting
{
extern const DatabaseIcebergSettingsDatabaseIcebergCatalogType catalog_type;
extern const DatabaseIcebergSettingsString warehouse;
extern const DatabaseIcebergSettingsString catalog_credential;
extern const DatabaseIcebergSettingsString auth_header;
extern const DatabaseIcebergSettingsString auth_scope;
extern const DatabaseIcebergSettingsString storage_endpoint;
extern const DatabaseIcebergSettingsString oauth_server_uri;
extern const DatabaseIcebergSettingsBool vended_credentials;
}
namespace Setting
{
extern const SettingsBool allow_experimental_database_iceberg;
}
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int SUPPORT_IS_DISABLED;
}
namespace
{
/// Parse a string containing at least one dot into two substrings:
/// A.B.C.D.E -> A.B.C.D and E, where
/// `A.B.C.D` is a table "namespace".
/// `E` is a table name.
std::pair<std::string, std::string> parseTableName(const std::string & name)
{
auto pos = name.rfind('.');
if (pos == std::string::npos)
throw DB::Exception(ErrorCodes::BAD_ARGUMENTS, "Table cannot have empty namespace: {}", name);
auto table_name = name.substr(pos + 1);
auto namespace_name = name.substr(0, name.size() - table_name.size() - 1);
return {namespace_name, table_name};
}
}
DatabaseIceberg::DatabaseIceberg(
const std::string & database_name_,
const std::string & url_,
const DatabaseIcebergSettings & settings_,
ASTPtr database_engine_definition_)
: IDatabase(database_name_)
, url(url_)
, settings(settings_)
, database_engine_definition(database_engine_definition_)
, log(getLogger("DatabaseIceberg(" + database_name_ + ")"))
{
validateSettings();
}
void DatabaseIceberg::validateSettings()
{
if (settings[DatabaseIcebergSetting::warehouse].value.empty())
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "`warehouse` setting cannot be empty. "
"Please specify 'SETTINGS warehouse=<warehouse_name>' in the CREATE DATABASE query");
}
}
std::shared_ptr<Iceberg::ICatalog> DatabaseIceberg::getCatalog() const
{
if (catalog_impl)
return catalog_impl;
switch (settings[DatabaseIcebergSetting::catalog_type].value)
{
case DB::DatabaseIcebergCatalogType::REST:
{
catalog_impl = std::make_shared<Iceberg::RestCatalog>(
settings[DatabaseIcebergSetting::warehouse].value,
url,
settings[DatabaseIcebergSetting::catalog_credential].value,
settings[DatabaseIcebergSetting::auth_scope].value,
settings[DatabaseIcebergSetting::auth_header],
settings[DatabaseIcebergSetting::oauth_server_uri].value,
Context::getGlobalContextInstance());
}
}
return catalog_impl;
}
std::shared_ptr<StorageObjectStorage::Configuration> DatabaseIceberg::getConfiguration(DatabaseIcebergStorageType type) const
{
/// TODO: add tests for azure, local storage types.
switch (type)
{
#if USE_AWS_S3
case DB::DatabaseIcebergStorageType::S3:
{
return std::make_shared<StorageS3IcebergConfiguration>();
}
#endif
#if USE_AZURE_BLOB_STORAGE
case DB::DatabaseIcebergStorageType::Azure:
{
return std::make_shared<StorageAzureIcebergConfiguration>();
}
#endif
#if USE_HDFS
case DB::DatabaseIcebergStorageType::HDFS:
{
return std::make_shared<StorageHDFSIcebergConfiguration>();
}
#endif
case DB::DatabaseIcebergStorageType::Local:
{
return std::make_shared<StorageLocalIcebergConfiguration>();
}
#if !USE_AWS_S3 || !USE_AZURE_BLOB_STORAGE || !USE_HDFS
default:
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Server does not contain support for storage type {}",
type);
#endif
}
}
std::string DatabaseIceberg::getStorageEndpointForTable(const Iceberg::TableMetadata & table_metadata) const
{
auto endpoint_from_settings = settings[DatabaseIcebergSetting::storage_endpoint].value;
if (!endpoint_from_settings.empty())
{
return std::filesystem::path(endpoint_from_settings)
/ table_metadata.getLocation(/* path_only */true)
/ "";
}
else
{
return std::filesystem::path(table_metadata.getLocation(/* path_only */false)) / "";
}
}
bool DatabaseIceberg::empty() const
{
return getCatalog()->empty();
}
bool DatabaseIceberg::isTableExist(const String & name, ContextPtr /* context_ */) const
{
const auto [namespace_name, table_name] = parseTableName(name);
return getCatalog()->existsTable(namespace_name, table_name);
}
StoragePtr DatabaseIceberg::tryGetTable(const String & name, ContextPtr context_) const
{
auto catalog = getCatalog();
auto table_metadata = Iceberg::TableMetadata().withLocation().withSchema();
const bool with_vended_credentials = settings[DatabaseIcebergSetting::vended_credentials].value;
if (with_vended_credentials)
table_metadata = table_metadata.withStorageCredentials();
auto [namespace_name, table_name] = parseTableName(name);
if (!catalog->tryGetTableMetadata(namespace_name, table_name, table_metadata))
return nullptr;
/// Take database engine definition AST as base.
ASTStorage * storage = database_engine_definition->as<ASTStorage>();
ASTs args = storage->engine->arguments->children;
/// Replace Iceberg Catalog endpoint with storage path endpoint of requested table.
auto table_endpoint = getStorageEndpointForTable(table_metadata);
args[0] = std::make_shared<ASTLiteral>(table_endpoint);
/// We either fetch storage credentials from the catalog,
/// or take them from the database engine arguments
/// of the CREATE query (i.e. from `args`).
/// Vended credentials can be disabled in catalog itself,
/// so we have a separate setting to know whether we should even try to fetch them.
if (with_vended_credentials && args.size() == 1)
{
auto storage_credentials = table_metadata.getStorageCredentials();
if (storage_credentials)
storage_credentials->addCredentialsToEngineArgs(args);
}
else if (args.size() == 1)
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Either vended credentials need to be enabled "
"or storage credentials need to be specified in database engine arguments in CREATE query");
}
LOG_TEST(log, "Using table endpoint: {}", table_endpoint);
const auto columns = ColumnsDescription(table_metadata.getSchema());
DatabaseIcebergStorageType storage_type;
auto storage_type_from_catalog = catalog->getStorageType();
if (storage_type_from_catalog.has_value())
storage_type = storage_type_from_catalog.value();
else
storage_type = table_metadata.getStorageType();
const auto configuration = getConfiguration(storage_type);
auto storage_settings = std::make_unique<StorageObjectStorageSettings>();
/// with_table_structure = false: because there is
/// no table structure in the table definition AST.
StorageObjectStorage::Configuration::initialize(*configuration, args, context_, /* with_table_structure */false, std::move(storage_settings));
return std::make_shared<StorageObjectStorage>(
configuration,
configuration->createObjectStorage(context_, /* is_readonly */ false),
context_,
StorageID(getDatabaseName(), name),
/* columns */columns,
/* constraints */ConstraintsDescription{},
/* comment */"",
getFormatSettings(context_),
LoadingStrictnessLevel::CREATE,
/* distributed_processing */false,
/* partition_by */nullptr,
/* lazy_init */true);
}
DatabaseTablesIteratorPtr DatabaseIceberg::getTablesIterator(
ContextPtr context_,
const FilterByNameFunction & filter_by_table_name,
bool /* skip_not_loaded */) const
{
Tables tables;
auto catalog = getCatalog();
const auto iceberg_tables = catalog->getTables();
for (const auto & table_name : iceberg_tables)
{
if (filter_by_table_name && !filter_by_table_name(table_name))
continue;
auto storage = tryGetTable(table_name, context_);
[[maybe_unused]] bool inserted = tables.emplace(table_name, storage).second;
chassert(inserted);
}
return std::make_unique<DatabaseTablesSnapshotIterator>(tables, getDatabaseName());
}
ASTPtr DatabaseIceberg::getCreateDatabaseQuery() const
{
const auto & create_query = std::make_shared<ASTCreateQuery>();
create_query->setDatabase(getDatabaseName());
create_query->set(create_query->storage, database_engine_definition);
return create_query;
}
ASTPtr DatabaseIceberg::getCreateTableQueryImpl(
const String & name,
ContextPtr /* context_ */,
bool /* throw_on_error */) const
{
auto catalog = getCatalog();
auto table_metadata = Iceberg::TableMetadata().withLocation().withSchema();
const auto [namespace_name, table_name] = parseTableName(name);
catalog->getTableMetadata(namespace_name, table_name, table_metadata);
auto create_table_query = std::make_shared<ASTCreateQuery>();
auto table_storage_define = database_engine_definition->clone();
auto * storage = table_storage_define->as<ASTStorage>();
storage->engine->kind = ASTFunction::Kind::TABLE_ENGINE;
storage->settings = {};
create_table_query->set(create_table_query->storage, table_storage_define);
auto columns_declare_list = std::make_shared<ASTColumns>();
auto columns_expression_list = std::make_shared<ASTExpressionList>();
columns_declare_list->set(columns_declare_list->columns, columns_expression_list);
create_table_query->set(create_table_query->columns_list, columns_declare_list);
create_table_query->setTable(name);
create_table_query->setDatabase(getDatabaseName());
for (const auto & column_type_and_name : table_metadata.getSchema())
{
const auto column_declaration = std::make_shared<ASTColumnDeclaration>();
column_declaration->name = column_type_and_name.name;
column_declaration->type = makeASTDataType(column_type_and_name.type->getName());
columns_expression_list->children.emplace_back(column_declaration);
}
auto storage_engine_arguments = storage->engine->arguments;
if (storage_engine_arguments->children.empty())
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "Unexpected number of arguments: {}",
storage_engine_arguments->children.size());
}
auto table_endpoint = getStorageEndpointForTable(table_metadata);
storage_engine_arguments->children[0] = std::make_shared<ASTLiteral>(table_endpoint);
return create_table_query;
}
void registerDatabaseIceberg(DatabaseFactory & factory)
{
auto create_fn = [](const DatabaseFactory::Arguments & args)
{
if (!args.create_query.attach
&& !args.context->getSettingsRef()[Setting::allow_experimental_database_iceberg])
{
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
"DatabaseIceberg engine is experimental. "
"To allow its usage, enable setting allow_experimental_database_iceberg");
}
const auto * database_engine_define = args.create_query.storage;
const auto & database_engine_name = args.engine_name;
const ASTFunction * function_define = database_engine_define->engine;
if (!function_define->arguments)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", database_engine_name);
ASTs & engine_args = function_define->arguments->children;
if (engine_args.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", database_engine_name);
for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.context);
const auto url = engine_args[0]->as<ASTLiteral>()->value.safeGet<String>();
DatabaseIcebergSettings database_settings;
if (database_engine_define->settings)
database_settings.loadFromQuery(*database_engine_define);
return std::make_shared<DatabaseIceberg>(
args.database_name,
url,
database_settings,
database_engine_define->clone());
};
factory.registerDatabase("Iceberg", create_fn, { .supports_arguments = true, .supports_settings = true });
}
}
#endif
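For illustration, here is a minimal standalone sketch of the namespace/table split that parseTableName performs above (hypothetical helper name, not part of the patch): the last dot separates the table "namespace" from the table name, and a dotless name is rejected.

    #include <cassert>
    #include <stdexcept>
    #include <string>
    #include <utility>

    /// Same algorithm as parseTableName: "A.B.C.D.E" -> {"A.B.C.D", "E"}.
    std::pair<std::string, std::string> splitTableName(const std::string & name)
    {
        auto pos = name.rfind('.');
        if (pos == std::string::npos)
            throw std::invalid_argument("Table cannot have an empty namespace: " + name);
        return {name.substr(0, pos), name.substr(pos + 1)};
    }

    int main()
    {
        auto [ns, table] = splitTableName("iceberg.sales.orders");
        assert(ns == "iceberg.sales");
        assert(table == "orders");
    }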

View File

@@ -0,0 +1,66 @@
#pragma once
#include "config.h"
#if USE_AVRO
#include <Databases/DatabasesCommon.h>
#include <Databases/Iceberg/DatabaseIcebergSettings.h>
#include <Databases/Iceberg/ICatalog.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Poco/Net/HTTPBasicCredentials.h>
namespace DB
{
class DatabaseIceberg final : public IDatabase, WithContext
{
public:
explicit DatabaseIceberg(
const std::string & database_name_,
const std::string & url_,
const DatabaseIcebergSettings & settings_,
ASTPtr database_engine_definition_);
String getEngineName() const override { return "Iceberg"; }
bool canContainMergeTreeTables() const override { return false; }
bool canContainDistributedTables() const override { return false; }
bool shouldBeEmptyOnDetach() const override { return false; }
bool empty() const override;
bool isTableExist(const String & name, ContextPtr context) const override;
StoragePtr tryGetTable(const String & name, ContextPtr context) const override;
DatabaseTablesIteratorPtr getTablesIterator(
ContextPtr context,
const FilterByNameFunction & filter_by_table_name,
bool skip_not_loaded) const override;
void shutdown() override {}
ASTPtr getCreateDatabaseQuery() const override;
protected:
ASTPtr getCreateTableQueryImpl(const String & table_name, ContextPtr context, bool throw_on_error) const override;
private:
/// Iceberg Catalog url.
const std::string url;
/// SETTINGS from CREATE query.
const DatabaseIcebergSettings settings;
/// Database engine definition taken from initial CREATE DATABASE query.
const ASTPtr database_engine_definition;
const LoggerPtr log;
/// Credentials used to authenticate with the Iceberg Catalog.
Poco::Net::HTTPBasicCredentials credentials;
mutable std::shared_ptr<Iceberg::ICatalog> catalog_impl;
void validateSettings();
std::shared_ptr<Iceberg::ICatalog> getCatalog() const;
std::shared_ptr<StorageObjectStorage::Configuration> getConfiguration(DatabaseIcebergStorageType type) const;
std::string getStorageEndpointForTable(const Iceberg::TableMetadata & table_metadata) const;
};
}
#endif

View File

@@ -0,0 +1,88 @@
#include <Core/BaseSettings.h>
#include <Core/BaseSettingsFwdMacrosImpl.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSetQuery.h>
#include <Databases/Iceberg/DatabaseIcebergSettings.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_SETTING;
}
#define DATABASE_ICEBERG_RELATED_SETTINGS(DECLARE, ALIAS) \
DECLARE(DatabaseIcebergCatalogType, catalog_type, DatabaseIcebergCatalogType::REST, "Catalog type", 0) \
DECLARE(String, catalog_credential, "", "", 0) \
DECLARE(Bool, vended_credentials, true, "Use vended credentials (storage credentials) from catalog", 0) \
DECLARE(String, auth_scope, "PRINCIPAL_ROLE:ALL", "Authorization scope for client credentials or token exchange", 0) \
DECLARE(String, oauth_server_uri, "", "OAuth server uri", 0) \
DECLARE(String, warehouse, "", "Warehouse name inside the catalog", 0) \
DECLARE(String, auth_header, "", "Authorization header of format 'Authorization: <scheme> <auth_info>'", 0) \
DECLARE(String, storage_endpoint, "", "Object storage endpoint", 0) \
#define LIST_OF_DATABASE_ICEBERG_SETTINGS(M, ALIAS) \
DATABASE_ICEBERG_RELATED_SETTINGS(M, ALIAS)
DECLARE_SETTINGS_TRAITS(DatabaseIcebergSettingsTraits, LIST_OF_DATABASE_ICEBERG_SETTINGS)
IMPLEMENT_SETTINGS_TRAITS(DatabaseIcebergSettingsTraits, LIST_OF_DATABASE_ICEBERG_SETTINGS)
struct DatabaseIcebergSettingsImpl : public BaseSettings<DatabaseIcebergSettingsTraits>
{
};
#define INITIALIZE_SETTING_EXTERN(TYPE, NAME, DEFAULT, DESCRIPTION, FLAGS) \
DatabaseIcebergSettings##TYPE NAME = &DatabaseIcebergSettingsImpl::NAME;
namespace DatabaseIcebergSetting
{
LIST_OF_DATABASE_ICEBERG_SETTINGS(INITIALIZE_SETTING_EXTERN, SKIP_ALIAS)
}
#undef INITIALIZE_SETTING_EXTERN
DatabaseIcebergSettings::DatabaseIcebergSettings() : impl(std::make_unique<DatabaseIcebergSettingsImpl>())
{
}
DatabaseIcebergSettings::DatabaseIcebergSettings(const DatabaseIcebergSettings & settings)
: impl(std::make_unique<DatabaseIcebergSettingsImpl>(*settings.impl))
{
}
DatabaseIcebergSettings::DatabaseIcebergSettings(DatabaseIcebergSettings && settings) noexcept
: impl(std::make_unique<DatabaseIcebergSettingsImpl>(std::move(*settings.impl)))
{
}
DatabaseIcebergSettings::~DatabaseIcebergSettings() = default;
DATABASE_ICEBERG_SETTINGS_SUPPORTED_TYPES(DatabaseIcebergSettings, IMPLEMENT_SETTING_SUBSCRIPT_OPERATOR)
void DatabaseIcebergSettings::applyChanges(const SettingsChanges & changes)
{
impl->applyChanges(changes);
}
void DatabaseIcebergSettings::loadFromQuery(const ASTStorage & storage_def)
{
if (storage_def.settings)
{
try
{
impl->applyChanges(storage_def.settings->changes);
}
catch (Exception & e)
{
if (e.code() == ErrorCodes::UNKNOWN_SETTING)
e.addMessage("for database engine " + storage_def.engine->name);
throw;
}
}
}
}

View File

@@ -0,0 +1,40 @@
#pragma once
#include <Core/BaseSettingsFwdMacros.h>
#include <Core/FormatFactorySettings.h>
#include <Core/SettingsEnums.h>
#include <Core/SettingsFields.h>
namespace DB
{
class ASTStorage;
struct DatabaseIcebergSettingsImpl;
class SettingsChanges;
/// List of available types supported in DatabaseIcebergSettings object
#define DATABASE_ICEBERG_SETTINGS_SUPPORTED_TYPES(CLASS_NAME, M) \
M(CLASS_NAME, String) \
M(CLASS_NAME, UInt64) \
M(CLASS_NAME, Bool) \
M(CLASS_NAME, DatabaseIcebergCatalogType) \
DATABASE_ICEBERG_SETTINGS_SUPPORTED_TYPES(DatabaseIcebergSettings, DECLARE_SETTING_TRAIT)
struct DatabaseIcebergSettings
{
DatabaseIcebergSettings();
DatabaseIcebergSettings(const DatabaseIcebergSettings & settings);
DatabaseIcebergSettings(DatabaseIcebergSettings && settings) noexcept;
~DatabaseIcebergSettings();
DATABASE_ICEBERG_SETTINGS_SUPPORTED_TYPES(DatabaseIcebergSettings, DECLARE_SETTING_SUBSCRIPT_OPERATOR)
void loadFromQuery(const ASTStorage & storage_def);
void applyChanges(const SettingsChanges & changes);
private:
std::unique_ptr<DatabaseIcebergSettingsImpl> impl;
};
}

View File

@@ -0,0 +1,15 @@
#pragma once
#include <Core/Types.h>
namespace DB
{
enum class DatabaseIcebergStorageType : uint8_t
{
S3,
Azure,
Local,
HDFS,
};
}

View File

@@ -0,0 +1,118 @@
#include <Databases/Iceberg/ICatalog.h>
#include <Common/Exception.h>
#include <Common/logger_useful.h>
#include <Poco/String.h>
namespace DB::ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int LOGICAL_ERROR;
}
namespace Iceberg
{
StorageType parseStorageTypeFromLocation(const std::string & location)
{
/// Table location in catalog metadata always starts with one of s3://, file://, etc.
/// So just extract this part of the path and deduce storage type from it.
auto pos = location.find("://");
if (pos == std::string::npos)
{
throw DB::Exception(
DB::ErrorCodes::NOT_IMPLEMENTED,
"Unexpected path format: {}", location);
}
auto storage_type_str = location.substr(0, pos);
auto storage_type = magic_enum::enum_cast<StorageType>(Poco::toUpper(storage_type_str));
if (!storage_type)
{
throw DB::Exception(
DB::ErrorCodes::NOT_IMPLEMENTED,
"Unsupported storage type: {}", storage_type_str);
}
return *storage_type;
}
void TableMetadata::setLocation(const std::string & location_)
{
if (!with_location)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Data location was not requested");
/// Location has format:
/// s3://<bucket>/path/to/table/data.
/// We want to split it into s3://<bucket> and path/to/table/data.
auto pos = location_.find("://");
if (pos == std::string::npos)
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unexpected location format: {}", location_);
auto pos_to_bucket = pos + std::strlen("://");
auto pos_to_path = location_.substr(pos_to_bucket).find('/');
if (pos_to_path == std::string::npos)
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unexpected location format: {}", location_);
pos_to_path = pos_to_bucket + pos_to_path;
location_without_path = location_.substr(0, pos_to_path);
path = location_.substr(pos_to_path + 1);
LOG_TEST(getLogger("TableMetadata"),
"Parsed location without path: {}, path: {}",
location_without_path, path);
}
std::string TableMetadata::getLocation(bool path_only) const
{
if (!with_location)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Data location was not requested");
if (path_only)
return path;
else
return std::filesystem::path(location_without_path) / path;
}
void TableMetadata::setSchema(const DB::NamesAndTypesList & schema_)
{
if (!with_schema)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Data schema was not requested");
schema = schema_;
}
const DB::NamesAndTypesList & TableMetadata::getSchema() const
{
if (!with_schema)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Data schema was not requested");
return schema;
}
void TableMetadata::setStorageCredentials(std::shared_ptr<IStorageCredentials> credentials_)
{
if (!with_storage_credentials)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Storage credentials were not requested");
storage_credentials = std::move(credentials_);
}
std::shared_ptr<IStorageCredentials> TableMetadata::getStorageCredentials() const
{
if (!with_storage_credentials)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Data schema was not requested");
return storage_credentials;
}
StorageType TableMetadata::getStorageType() const
{
return parseStorageTypeFromLocation(location_without_path);
}
}
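A compact standalone sketch of the split performed by TableMetadata::setLocation above (hypothetical function name, error handling elided): the scheme-plus-bucket part ends at the first '/' that follows "://".

    #include <cassert>
    #include <cstring>
    #include <string>
    #include <utility>

    /// "s3://bucket/path/to/table/data" -> {"s3://bucket", "path/to/table/data"}.
    std::pair<std::string, std::string> splitLocation(const std::string & location)
    {
        auto scheme_end = location.find("://") + std::strlen("://");
        auto path_start = scheme_end + location.substr(scheme_end).find('/');
        return {location.substr(0, path_start), location.substr(path_start + 1)};
    }

    int main()
    {
        auto [bucket, path] = splitLocation("s3://warehouse/data/events");
        assert(bucket == "s3://warehouse");
        assert(path == "data/events");
    }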

View File

@@ -0,0 +1,104 @@
#pragma once
#include <Core/Types.h>
#include <Core/NamesAndTypes.h>
#include <Core/SettingsEnums.h>
#include <Databases/Iceberg/StorageCredentials.h>
#include <Databases/Iceberg/DatabaseIcebergStorageType.h>
namespace Iceberg
{
using StorageType = DB::DatabaseIcebergStorageType;
StorageType parseStorageTypeFromLocation(const std::string & location);
/// A class representing table metadata received from the catalog.
class TableMetadata
{
public:
TableMetadata() = default;
TableMetadata & withLocation() { with_location = true; return *this; }
TableMetadata & withSchema() { with_schema = true; return *this; }
TableMetadata & withStorageCredentials() { with_storage_credentials = true; return *this; }
void setLocation(const std::string & location_);
std::string getLocation(bool path_only) const;
void setSchema(const DB::NamesAndTypesList & schema_);
const DB::NamesAndTypesList & getSchema() const;
void setStorageCredentials(std::shared_ptr<IStorageCredentials> credentials_);
std::shared_ptr<IStorageCredentials> getStorageCredentials() const;
bool requiresLocation() const { return with_location; }
bool requiresSchema() const { return with_schema; }
bool requiresCredentials() const { return with_storage_credentials; }
StorageType getStorageType() const;
private:
/// Starts with s3://, file://, etc.
/// For example, `s3://bucket/`
std::string location_without_path;
/// Path to table's data: `/path/to/table/data/`
std::string path;
DB::NamesAndTypesList schema;
/// Storage credentials, which are called "vended credentials".
std::shared_ptr<IStorageCredentials> storage_credentials;
bool with_location = false;
bool with_schema = false;
bool with_storage_credentials = false;
};
/// Base class for catalog implementations.
/// Used for communication with the catalog.
class ICatalog
{
public:
using Namespaces = std::vector<std::string>;
explicit ICatalog(const std::string & warehouse_) : warehouse(warehouse_) {}
virtual ~ICatalog() = default;
/// Does catalog have any tables?
virtual bool empty() const = 0;
/// Fetch tables' names list.
/// Returned names include the full namespace.
virtual DB::Names getTables() const = 0;
/// Check that a table exists in a given namespace.
virtual bool existsTable(
const std::string & namespace_name,
const std::string & table_name) const = 0;
/// Get table metadata in the given namespace.
/// Throw exception if table does not exist.
virtual void getTableMetadata(
const std::string & namespace_name,
const std::string & table_name,
TableMetadata & result) const = 0;
/// Get table metadata in the given namespace.
/// Return `false` if table does not exist, `true` otherwise.
virtual bool tryGetTableMetadata(
const std::string & namespace_name,
const std::string & table_name,
TableMetadata & result) const = 0;
/// Get storage type, where Iceberg tables' data is stored.
/// E.g. one of S3, Azure, Local, HDFS.
virtual std::optional<StorageType> getStorageType() const = 0;
protected:
/// Name of the warehouse,
/// which is sometimes also called "catalog name".
const std::string warehouse;
};
}

View File

@@ -0,0 +1,648 @@
#include "config.h"
#if USE_AVRO
#include <Databases/Iceberg/RestCatalog.h>
#include <Databases/Iceberg/StorageCredentials.h>
#include <base/find_symbols.h>
#include <Core/Settings.h>
#include <Common/escapeForFileName.h>
#include <Common/threadPoolCallbackRunner.h>
#include <Common/Base64.h>
#include <Common/checkStackSize.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/HTTPCommon.h>
#include <IO/ReadBuffer.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <Interpreters/Context.h>
#include <Storages/ObjectStorage/DataLakes/IcebergMetadata.h>
#include <Server/HTTP/HTMLForm.h>
#include <Formats/FormatFactory.h>
#include <Poco/URI.h>
#include <Poco/JSON/Array.h>
#include <Poco/JSON/Parser.h>
namespace DB::ErrorCodes
{
extern const int ICEBERG_CATALOG_ERROR;
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
}
namespace DB::Setting
{
extern const SettingsBool iceberg_engine_ignore_schema_evolution;
}
namespace Iceberg
{
static constexpr auto CONFIG_ENDPOINT = "config";
static constexpr auto NAMESPACES_ENDPOINT = "namespaces";
namespace
{
std::pair<std::string, std::string> parseCatalogCredential(const std::string & catalog_credential)
{
/// Parse a string of format "<client_id>:<client_secret>"
/// into separate strings client_id and client_secret.
std::string client_id, client_secret;
if (!catalog_credential.empty())
{
auto pos = catalog_credential.find(':');
if (pos == std::string::npos)
{
throw DB::Exception(
DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected format of catalog credential: "
"expected client_id and client_secret separated by `:`");
}
client_id = catalog_credential.substr(0, pos);
client_secret = catalog_credential.substr(pos + 1);
}
return std::pair(client_id, client_secret);
}
DB::HTTPHeaderEntry parseAuthHeader(const std::string & auth_header)
{
/// Parse a string of format "Authorization: <auth_scheme> <auth_token>"
/// into a key-value header "Authorization", "<auth_scheme> <auth_token>"
auto pos = auth_header.find(':');
if (pos == std::string::npos)
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected format of auth header");
return DB::HTTPHeaderEntry(auth_header.substr(0, pos), auth_header.substr(pos + 1));
}
std::string correctAPIURI(const std::string & uri)
{
if (uri.ends_with("v1"))
return uri;
return std::filesystem::path(uri) / "v1";
}
}
std::string RestCatalog::Config::toString() const
{
DB::WriteBufferFromOwnString wb;
if (!prefix.empty())
wb << "prefix: " << prefix.string() << ", ";
if (!default_base_location.empty())
wb << "default_base_location: " << default_base_location << ", ";
return wb.str();
}
RestCatalog::RestCatalog(
const std::string & warehouse_,
const std::string & base_url_,
const std::string & catalog_credential_,
const std::string & auth_scope_,
const std::string & auth_header_,
const std::string & oauth_server_uri_,
DB::ContextPtr context_)
: ICatalog(warehouse_)
, DB::WithContext(context_)
, base_url(correctAPIURI(base_url_))
, log(getLogger("RestCatalog(" + warehouse_ + ")"))
, auth_scope(auth_scope_)
, oauth_server_uri(oauth_server_uri_)
{
if (!catalog_credential_.empty())
{
std::tie(client_id, client_secret) = parseCatalogCredential(catalog_credential_);
update_token_if_expired = true;
}
else if (!auth_header_.empty())
auth_header = parseAuthHeader(auth_header_);
config = loadConfig();
}
RestCatalog::Config RestCatalog::loadConfig()
{
Poco::URI::QueryParameters params = {{"warehouse", warehouse}};
auto buf = createReadBuffer(CONFIG_ENDPOINT, params);
std::string json_str;
readJSONObjectPossiblyInvalid(json_str, *buf);
LOG_TEST(log, "Received catalog configuration settings: {}", json_str);
Poco::JSON::Parser parser;
Poco::Dynamic::Var json = parser.parse(json_str);
const Poco::JSON::Object::Ptr & object = json.extract<Poco::JSON::Object::Ptr>();
Config result;
auto defaults_object = object->get("defaults").extract<Poco::JSON::Object::Ptr>();
parseCatalogConfigurationSettings(defaults_object, result);
auto overrides_object = object->get("overrides").extract<Poco::JSON::Object::Ptr>();
parseCatalogConfigurationSettings(overrides_object, result);
LOG_TEST(log, "Parsed catalog configuration settings: {}", result.toString());
return result;
}
void RestCatalog::parseCatalogConfigurationSettings(const Poco::JSON::Object::Ptr & object, Config & result)
{
if (!object)
return;
if (object->has("prefix"))
result.prefix = object->get("prefix").extract<String>();
if (object->has("default-base-location"))
result.default_base_location = object->get("default-base-location").extract<String>();
}
DB::HTTPHeaderEntries RestCatalog::getAuthHeaders(bool update_token) const
{
/// Option 1: user specified auth header manually.
/// Header has format: 'Authorization: <scheme> <token>'.
if (auth_header.has_value())
{
return DB::HTTPHeaderEntries{auth_header.value()};
}
/// Option 2: user provided grant_type, client_id and client_secret.
/// In that case we make an OAuthClientCredentialsRequest
/// https://github.com/apache/iceberg/blob/3badfe0c1fcf0c0adfc7aa4a10f0b50365c48cf9/open-api/rest-catalog-open-api.yaml#L3498C5-L3498C34
if (!client_id.empty())
{
if (!access_token.has_value() || update_token)
{
access_token = retrieveAccessToken();
}
DB::HTTPHeaderEntries headers;
headers.emplace_back("Authorization", "Bearer " + access_token.value());
return headers;
}
return {};
}
std::string RestCatalog::retrieveAccessToken() const
{
static constexpr auto oauth_tokens_endpoint = "oauth/tokens";
/// TODO:
/// 1. support oauth2-server-uri
/// https://github.com/apache/iceberg/blob/918f81f3c3f498f46afcea17c1ac9cdc6913cb5c/open-api/rest-catalog-open-api.yaml#L183C82-L183C99
DB::HTTPHeaderEntries headers;
headers.emplace_back("Content-Type", "application/x-www-form-urlencoded");
headers.emplace_back("Accepts", "application/json; charset=UTF-8");
Poco::URI url;
DB::ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback;
if (oauth_server_uri.empty())
{
url = Poco::URI(base_url / oauth_tokens_endpoint);
Poco::URI::QueryParameters params = {
{"grant_type", "client_credentials"},
{"scope", auth_scope},
{"client_id", client_id},
{"client_secret", client_secret},
};
url.setQueryParameters(params);
}
else
{
url = Poco::URI(oauth_server_uri);
out_stream_callback = [&](std::ostream & os)
{
os << fmt::format(
"grant_type=client_credentials&scope={}&client_id={}&client_secret={}",
auth_scope, client_id, client_secret);
};
}
const auto & context = getContext();
auto wb = DB::BuilderRWBufferFromHTTP(url)
.withConnectionGroup(DB::HTTPConnectionGroupType::HTTP)
.withMethod(Poco::Net::HTTPRequest::HTTP_POST)
.withSettings(context->getReadSettings())
.withTimeouts(DB::ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), context->getServerSettings()))
.withHostFilter(&context->getRemoteHostFilter())
.withOutCallback(std::move(out_stream_callback))
.withSkipNotFound(false)
.withHeaders(headers)
.create(credentials);
std::string json_str;
readJSONObjectPossiblyInvalid(json_str, *wb);
Poco::JSON::Parser parser;
Poco::Dynamic::Var res_json = parser.parse(json_str);
const Poco::JSON::Object::Ptr & object = res_json.extract<Poco::JSON::Object::Ptr>();
return object->get("access_token").extract<String>();
}
std::optional<StorageType> RestCatalog::getStorageType() const
{
if (config.default_base_location.empty())
return std::nullopt;
return parseStorageTypeFromLocation(config.default_base_location);
}
DB::ReadWriteBufferFromHTTPPtr RestCatalog::createReadBuffer(
const std::string & endpoint,
const Poco::URI::QueryParameters & params,
const DB::HTTPHeaderEntries & headers) const
{
const auto & context = getContext();
Poco::URI url(base_url / endpoint);
if (!params.empty())
url.setQueryParameters(params);
auto create_buffer = [&](bool update_token)
{
auto result_headers = getAuthHeaders(update_token);
std::move(headers.begin(), headers.end(), std::back_inserter(result_headers));
return DB::BuilderRWBufferFromHTTP(url)
.withConnectionGroup(DB::HTTPConnectionGroupType::HTTP)
.withSettings(getContext()->getReadSettings())
.withTimeouts(DB::ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), context->getServerSettings()))
.withHostFilter(&getContext()->getRemoteHostFilter())
.withHeaders(result_headers)
.withDelayInit(false)
.withSkipNotFound(false)
.create(credentials);
};
LOG_TEST(log, "Requesting: {}", url.toString());
try
{
return create_buffer(false);
}
catch (const DB::HTTPException & e)
{
const auto status = e.getHTTPStatus();
if (update_token_if_expired &&
(status == Poco::Net::HTTPResponse::HTTPStatus::HTTP_UNAUTHORIZED
|| status == Poco::Net::HTTPResponse::HTTPStatus::HTTP_FORBIDDEN))
{
return create_buffer(true);
}
throw;
}
}
bool RestCatalog::empty() const
{
/// TODO: add a test with empty namespaces and zero namespaces.
bool found_table = false;
auto stop_condition = [&](const std::string & namespace_name) -> bool
{
const auto tables = getTables(namespace_name, /* limit */1);
found_table = !tables.empty();
return found_table;
};
Namespaces namespaces;
getNamespacesRecursive("", namespaces, stop_condition, /* execute_func */{});
return found_table;
}
DB::Names RestCatalog::getTables() const
{
auto & pool = getContext()->getIcebergCatalogThreadpool();
DB::ThreadPoolCallbackRunnerLocal<void> runner(pool, "RestCatalog");
DB::Names tables;
std::mutex mutex;
auto execute_for_each_namespace = [&](const std::string & current_namespace)
{
runner(
[=, &tables, &mutex, this]
{
auto tables_in_namespace = getTables(current_namespace);
std::lock_guard lock(mutex);
std::move(tables_in_namespace.begin(), tables_in_namespace.end(), std::back_inserter(tables));
});
};
Namespaces namespaces;
getNamespacesRecursive(
/* base_namespace */"", /// Empty base namespace means starting from root.
namespaces,
/* stop_condition */{},
/* execute_func */execute_for_each_namespace);
runner.waitForAllToFinishAndRethrowFirstError();
return tables;
}
void RestCatalog::getNamespacesRecursive(
const std::string & base_namespace,
Namespaces & result,
StopCondition stop_condition,
ExecuteFunc func) const
{
checkStackSize();
auto namespaces = getNamespaces(base_namespace);
result.reserve(result.size() + namespaces.size());
result.insert(result.end(), namespaces.begin(), namespaces.end());
for (const auto & current_namespace : namespaces)
{
chassert(current_namespace.starts_with(base_namespace));
if (stop_condition && stop_condition(current_namespace))
break;
if (func)
func(current_namespace);
getNamespacesRecursive(current_namespace, result, stop_condition, func);
}
}
Poco::URI::QueryParameters RestCatalog::createParentNamespaceParams(const std::string & base_namespace) const
{
std::vector<std::string> parts;
splitInto<'.'>(parts, base_namespace);
std::string parent_param;
for (const auto & part : parts)
{
/// 0x1F is a unit separator
/// https://github.com/apache/iceberg/blob/70d87f1750627b14b3b25a0216a97db86a786992/open-api/rest-catalog-open-api.yaml#L264
if (!parent_param.empty())
parent_param += static_cast<char>(0x1F);
parent_param += part;
}
return {{"parent", parent_param}};
}
RestCatalog::Namespaces RestCatalog::getNamespaces(const std::string & base_namespace) const
{
Poco::URI::QueryParameters params;
if (!base_namespace.empty())
params = createParentNamespaceParams(base_namespace);
try
{
auto buf = createReadBuffer(config.prefix / NAMESPACES_ENDPOINT, params);
auto namespaces = parseNamespaces(*buf, base_namespace);
LOG_TEST(log, "Loaded {} namespaces in base namespace {}", namespaces.size(), base_namespace);
return namespaces;
}
catch (const DB::HTTPException & e)
{
std::string message = fmt::format(
"Received error while fetching list of namespaces from iceberg catalog `{}`. ",
warehouse);
if (e.code() == Poco::Net::HTTPResponse::HTTPStatus::HTTP_NOT_FOUND)
message += "Namespace provided in the `parent` query parameter is not found. ";
message += fmt::format(
"Code: {}, status: {}, message: {}",
e.code(), e.getHTTPStatus(), e.displayText());
throw DB::Exception(DB::ErrorCodes::ICEBERG_CATALOG_ERROR, "{}", message);
}
}
RestCatalog::Namespaces RestCatalog::parseNamespaces(DB::ReadBuffer & buf, const std::string & base_namespace) const
{
if (buf.eof())
return {};
String json_str;
readJSONObjectPossiblyInvalid(json_str, buf);
LOG_TEST(log, "Received response: {}", json_str);
try
{
Poco::JSON::Parser parser;
Poco::Dynamic::Var json = parser.parse(json_str);
const Poco::JSON::Object::Ptr & object = json.extract<Poco::JSON::Object::Ptr>();
auto namespaces_object = object->get("namespaces").extract<Poco::JSON::Array::Ptr>();
if (!namespaces_object)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Cannot parse result");
Namespaces namespaces;
for (size_t i = 0; i < namespaces_object->size(); ++i)
{
auto current_namespace_array = namespaces_object->get(static_cast<int>(i)).extract<Poco::JSON::Array::Ptr>();
if (current_namespace_array->size() == 0)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Expected namespace array to be non-empty");
const int idx = static_cast<int>(current_namespace_array->size()) - 1;
const auto current_namespace = current_namespace_array->get(idx).extract<String>();
const auto full_namespace = base_namespace.empty()
? current_namespace
: base_namespace + "." + current_namespace;
namespaces.push_back(full_namespace);
}
return namespaces;
}
catch (DB::Exception & e)
{
e.addMessage("while parsing JSON: " + json_str);
throw;
}
}
DB::Names RestCatalog::getTables(const std::string & base_namespace, size_t limit) const
{
const std::string endpoint = std::filesystem::path(NAMESPACES_ENDPOINT) / base_namespace / "tables";
auto buf = createReadBuffer(config.prefix / endpoint);
return parseTables(*buf, base_namespace, limit);
}
DB::Names RestCatalog::parseTables(DB::ReadBuffer & buf, const std::string & base_namespace, size_t limit) const
{
if (buf.eof())
return {};
String json_str;
readJSONObjectPossiblyInvalid(json_str, buf);
try
{
Poco::JSON::Parser parser;
Poco::Dynamic::Var json = parser.parse(json_str);
const Poco::JSON::Object::Ptr & object = json.extract<Poco::JSON::Object::Ptr>();
auto identifiers_object = object->get("identifiers").extract<Poco::JSON::Array::Ptr>();
if (!identifiers_object)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Cannot parse result");
DB::Names tables;
for (size_t i = 0; i < identifiers_object->size(); ++i)
{
const auto current_table_json = identifiers_object->get(static_cast<int>(i)).extract<Poco::JSON::Object::Ptr>();
const auto table_name = current_table_json->get("name").extract<String>();
tables.push_back(base_namespace + "." + table_name);
if (limit && tables.size() >= limit)
break;
}
return tables;
}
catch (DB::Exception & e)
{
e.addMessage("while parsing JSON: " + json_str);
throw;
}
}
bool RestCatalog::existsTable(const std::string & namespace_name, const std::string & table_name) const
{
TableMetadata table_metadata;
return tryGetTableMetadata(namespace_name, table_name, table_metadata);
}
bool RestCatalog::tryGetTableMetadata(
const std::string & namespace_name,
const std::string & table_name,
TableMetadata & result) const
{
try
{
return getTableMetadataImpl(namespace_name, table_name, result);
}
catch (...)
{
DB::tryLogCurrentException(log);
return false;
}
}
void RestCatalog::getTableMetadata(
const std::string & namespace_name,
const std::string & table_name,
TableMetadata & result) const
{
if (!getTableMetadataImpl(namespace_name, table_name, result))
throw DB::Exception(DB::ErrorCodes::ICEBERG_CATALOG_ERROR, "No response from iceberg catalog");
}
bool RestCatalog::getTableMetadataImpl(
const std::string & namespace_name,
const std::string & table_name,
TableMetadata & result) const
{
LOG_TEST(log, "Checking table {} in namespace {}", table_name, namespace_name);
DB::HTTPHeaderEntries headers;
if (result.requiresCredentials())
{
/// Header `X-Iceberg-Access-Delegation` tells catalog to include storage credentials in LoadTableResponse.
/// Value can be one of the two:
/// 1. `vended-credentials`
/// 2. `remote-signing`
/// Currently we support only the first.
/// https://github.com/apache/iceberg/blob/3badfe0c1fcf0c0adfc7aa4a10f0b50365c48cf9/open-api/rest-catalog-open-api.yaml#L1832
headers.emplace_back("X-Iceberg-Access-Delegation", "vended-credentials");
}
const std::string endpoint = std::filesystem::path(NAMESPACES_ENDPOINT) / namespace_name / "tables" / table_name;
auto buf = createReadBuffer(config.prefix / endpoint, /* params */{}, headers);
if (buf->eof())
{
LOG_TEST(log, "Table doesn't exist (endpoint: {})", endpoint);
return false;
}
String json_str;
readJSONObjectPossiblyInvalid(json_str, *buf);
#ifdef DEBUG_OR_SANITIZER_BUILD
/// This log message might contain credentials,
/// so log it only for debugging.
LOG_TEST(log, "Received metadata for table {}: {}", table_name, json_str);
#endif
Poco::JSON::Parser parser;
Poco::Dynamic::Var json = parser.parse(json_str);
const Poco::JSON::Object::Ptr & object = json.extract<Poco::JSON::Object::Ptr>();
auto metadata_object = object->get("metadata").extract<Poco::JSON::Object::Ptr>();
if (!metadata_object)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Cannot parse result");
std::string location;
if (result.requiresLocation())
{
location = metadata_object->get("location").extract<String>();
result.setLocation(location);
LOG_TEST(log, "Location for table {}: {}", table_name, location);
}
if (result.requiresSchema())
{
// int format_version = metadata_object->getValue<int>("format-version");
auto schema_processor = DB::IcebergSchemaProcessor();
auto id = DB::IcebergMetadata::parseTableSchema(metadata_object, schema_processor, log);
auto schema = schema_processor.getClickhouseTableSchemaById(id);
result.setSchema(*schema);
}
if (result.requiresCredentials() && object->has("config"))
{
auto config_object = object->get("config").extract<Poco::JSON::Object::Ptr>();
if (!config_object)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Cannot parse config result");
auto storage_type = parseStorageTypeFromLocation(location);
switch (storage_type)
{
case StorageType::S3:
{
static constexpr auto access_key_id_str = "s3.access-key-id";
static constexpr auto secret_access_key_str = "s3.secret-access-key";
static constexpr auto session_token_str = "s3.session-token";
std::string access_key_id, secret_access_key, session_token;
if (config_object->has(access_key_id_str))
access_key_id = config_object->get(access_key_id_str).extract<String>();
if (config_object->has(secret_access_key_str))
secret_access_key = config_object->get(secret_access_key_str).extract<String>();
if (config_object->has(session_token_str))
session_token = config_object->get(session_token_str).extract<String>();
result.setStorageCredentials(
std::make_shared<S3Credentials>(access_key_id, secret_access_key, session_token));
break;
}
default:
break;
}
}
return true;
}
}
#endif
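To make the `parent` parameter encoding of createParentNamespaceParams concrete, a self-contained sketch (hypothetical function name; the real code splits with splitInto<'.'>): the parts of a multipart namespace are joined with the 0x1F unit separator required by the Iceberg REST spec.

    #include <cassert>
    #include <sstream>
    #include <string>

    /// "a.b.c" -> 'a' 0x1F 'b' 0x1F 'c'.
    std::string encodeParentNamespace(const std::string & ns)
    {
        std::string encoded;
        std::istringstream parts(ns);
        std::string part;
        while (std::getline(parts, part, '.'))
        {
            if (!encoded.empty())
                encoded += static_cast<char>(0x1F);
            encoded += part;
        }
        return encoded;
    }

    int main()
    {
        const std::string expected{'a', '\x1F', 'b', '\x1F', 'c'};
        assert(encodeParentNamespace("a.b.c") == expected);
    }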

View File

@@ -0,0 +1,122 @@
#pragma once
#include "config.h"
#if USE_AVRO
#include <Databases/Iceberg/ICatalog.h>
#include <Poco/Net/HTTPBasicCredentials.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <IO/HTTPHeaderEntries.h>
#include <Interpreters/Context_fwd.h>
#include <filesystem>
#include <Poco/JSON/Object.h>
namespace DB
{
class ReadBuffer;
}
namespace Iceberg
{
class RestCatalog final : public ICatalog, private DB::WithContext
{
public:
explicit RestCatalog(
const std::string & warehouse_,
const std::string & base_url_,
const std::string & catalog_credential_,
const std::string & auth_scope_,
const std::string & auth_header_,
const std::string & oauth_server_uri_,
DB::ContextPtr context_);
~RestCatalog() override = default;
bool empty() const override;
DB::Names getTables() const override;
bool existsTable(const std::string & namespace_name, const std::string & table_name) const override;
void getTableMetadata(
const std::string & namespace_name,
const std::string & table_name,
TableMetadata & result) const override;
bool tryGetTableMetadata(
const std::string & namespace_name,
const std::string & table_name,
TableMetadata & result) const override;
std::optional<StorageType> getStorageType() const override;
private:
struct Config
{
/// Prefix is the path prefix of catalog endpoints,
/// e.g. /v1/{prefix}/namespaces/{namespace}/tables/{table}
std::filesystem::path prefix;
/// Base location is the location of data in storage
/// (in filesystem or object storage).
std::string default_base_location;
std::string toString() const;
};
const std::filesystem::path base_url;
const LoggerPtr log;
/// Catalog configuration settings from /v1/config endpoint.
Config config;
/// Auth header of format: "Authorization": "<auth_scheme> <token>"
std::optional<DB::HTTPHeaderEntry> auth_header;
/// Parameters for OAuth.
bool update_token_if_expired = false;
std::string client_id;
std::string client_secret;
std::string auth_scope;
std::string oauth_server_uri;
mutable std::optional<std::string> access_token;
Poco::Net::HTTPBasicCredentials credentials{};
DB::ReadWriteBufferFromHTTPPtr createReadBuffer(
const std::string & endpoint,
const Poco::URI::QueryParameters & params = {},
const DB::HTTPHeaderEntries & headers = {}) const;
Poco::URI::QueryParameters createParentNamespaceParams(const std::string & base_namespace) const;
using StopCondition = std::function<bool(const std::string & namespace_name)>;
using ExecuteFunc = std::function<void(const std::string & namespace_name)>;
void getNamespacesRecursive(
const std::string & base_namespace,
Namespaces & result,
StopCondition stop_condition,
ExecuteFunc func) const;
Namespaces getNamespaces(const std::string & base_namespace) const;
Namespaces parseNamespaces(DB::ReadBuffer & buf, const std::string & base_namespace) const;
DB::Names getTables(const std::string & base_namespace, size_t limit = 0) const;
DB::Names parseTables(DB::ReadBuffer & buf, const std::string & base_namespace, size_t limit) const;
bool getTableMetadataImpl(
const std::string & namespace_name,
const std::string & table_name,
TableMetadata & result) const;
Config loadConfig();
std::string retrieveAccessToken() const;
DB::HTTPHeaderEntries getAuthHeaders(bool update_token = false) const;
static void parseCatalogConfigurationSettings(const Poco::JSON::Object::Ptr & object, Config & result);
};
}
#endif
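For reference, a small sketch of how request URLs are composed from the members above (hostname and prefix are illustrative; this mirrors correctAPIURI and createReadBuffer in RestCatalog.cpp): the base URL is normalized to end in /v1, then the config prefix and the endpoint path are appended.

    #include <cassert>
    #include <filesystem>
    #include <string>

    /// base_url -> base_url/v1 (if missing) -> base_url/v1/{prefix}/{endpoint}.
    std::string composeCatalogUrl(const std::string & base, const std::string & prefix, const std::string & endpoint)
    {
        auto url = base.ends_with("v1") ? std::filesystem::path(base) : std::filesystem::path(base) / "v1";
        return url / prefix / endpoint;
    }

    int main()
    {
        assert(composeCatalogUrl("http://rest:8181", "my_warehouse", "namespaces")
               == "http://rest:8181/v1/my_warehouse/namespaces");
    }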

View File

@@ -0,0 +1,51 @@
#pragma once
#include <Core/Types.h>
#include <Parsers/IAST.h>
#include <Parsers/ASTLiteral.h>
namespace DB::ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
namespace Iceberg
{
class IStorageCredentials
{
public:
virtual ~IStorageCredentials() = default;
virtual void addCredentialsToEngineArgs(DB::ASTs & engine_args) const = 0;
};
class S3Credentials final : public IStorageCredentials
{
public:
/// TODO: support region as well.
S3Credentials(
const std::string & access_key_id_,
const std::string & secret_access_key_,
const std::string & session_token_)
: access_key_id(access_key_id_)
, secret_access_key(secret_access_key_)
, session_token(session_token_)
{}
void addCredentialsToEngineArgs(DB::ASTs & engine_args) const override
{
if (engine_args.size() != 1)
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Storage credentials specified in AST already");
engine_args.push_back(std::make_shared<DB::ASTLiteral>(access_key_id));
engine_args.push_back(std::make_shared<DB::ASTLiteral>(secret_access_key));
engine_args.push_back(std::make_shared<DB::ASTLiteral>(session_token));
}
private:
std::string access_key_id;
std::string secret_access_key;
std::string session_token;
};
}
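A plain-strings sketch of how addCredentialsToEngineArgs above extends the engine arguments (std::string stands in for the AST literals; names hypothetical): the single endpoint argument gains the vended key, secret, and session token, and a second attempt is rejected.

    #include <cassert>
    #include <stdexcept>
    #include <string>
    #include <vector>

    /// {"<endpoint>"} -> {"<endpoint>", key, secret, token}.
    void appendS3Credentials(
        std::vector<std::string> & engine_args,
        const std::string & key,
        const std::string & secret,
        const std::string & token)
    {
        if (engine_args.size() != 1)
            throw std::invalid_argument("Storage credentials are already specified in engine arguments");
        engine_args.push_back(key);
        engine_args.push_back(secret);
        engine_args.push_back(token);
    }

    int main()
    {
        std::vector<std::string> args{"http://minio:9000/warehouse/data/"};
        appendS3Credentials(args, "minio", "minio123", "session-token");
        assert(args.size() == 4);
    }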

View File

@@ -46,6 +46,7 @@ void enableAllExperimentalSettings(ContextMutablePtr context)
context->setSetting("enable_zstd_qat_codec", 1);
context->setSetting("allow_create_index_without_type", 1);
context->setSetting("allow_experimental_s3queue", 1);
context->setSetting("allow_experimental_database_iceberg", 1);
/// clickhouse-private settings
context->setSetting("allow_experimental_shared_set_join", 1);

View File

@@ -36,6 +36,10 @@ void registerDatabaseS3(DatabaseFactory & factory);
void registerDatabaseHDFS(DatabaseFactory & factory);
#endif
#if USE_AVRO
void registerDatabaseIceberg(DatabaseFactory & factory);
#endif
void registerDatabases()
{
auto & factory = DatabaseFactory::instance();
@@ -68,5 +72,9 @@ void registerDatabases()
#if USE_HDFS
registerDatabaseHDFS(factory);
#endif
#if USE_AVRO
registerDatabaseIceberg(factory);
#endif
}
}

View File

@@ -176,6 +176,9 @@ namespace CurrentMetrics
extern const Metric AttachedDictionary;
extern const Metric AttachedDatabase;
extern const Metric PartsActive;
extern const Metric IcebergCatalogThreads;
extern const Metric IcebergCatalogThreadsActive;
extern const Metric IcebergCatalogThreadsScheduled;
}
@@ -284,6 +287,8 @@ namespace ServerSetting
extern const ServerSettingsUInt64 load_marks_threadpool_queue_size;
extern const ServerSettingsUInt64 threadpool_writer_pool_size;
extern const ServerSettingsUInt64 threadpool_writer_queue_size;
extern const ServerSettingsUInt64 iceberg_catalog_threadpool_pool_size;
extern const ServerSettingsUInt64 iceberg_catalog_threadpool_queue_size;
}
@@ -415,6 +420,8 @@ struct ContextSharedPart : boost::noncopyable
mutable std::unique_ptr<ThreadPool> load_marks_threadpool; /// Threadpool for loading marks cache.
mutable OnceFlag prefetch_threadpool_initialized;
mutable std::unique_ptr<ThreadPool> prefetch_threadpool; /// Threadpool for loading marks cache.
mutable std::unique_ptr<ThreadPool> iceberg_catalog_threadpool;
mutable OnceFlag iceberg_catalog_threadpool_initialized;
mutable OnceFlag build_vector_similarity_index_threadpool_initialized;
mutable std::unique_ptr<ThreadPool> build_vector_similarity_index_threadpool; /// Threadpool for vector-similarity index creation.
mutable UncompressedCachePtr index_uncompressed_cache TSA_GUARDED_BY(mutex); /// The cache of decompressed blocks for MergeTree indices.
@ -3257,6 +3264,23 @@ ThreadPool & Context::getLoadMarksThreadpool() const
return *shared->load_marks_threadpool;
}
ThreadPool & Context::getIcebergCatalogThreadpool() const
{
callOnce(shared->iceberg_catalog_threadpool_initialized, [&]
{
auto pool_size = shared->server_settings[ServerSetting::iceberg_catalog_threadpool_pool_size];
auto queue_size = shared->server_settings[ServerSetting::iceberg_catalog_threadpool_queue_size];
shared->iceberg_catalog_threadpool = std::make_unique<ThreadPool>(
CurrentMetrics::IcebergCatalogThreads,
CurrentMetrics::IcebergCatalogThreadsActive,
CurrentMetrics::IcebergCatalogThreadsScheduled,
pool_size, pool_size, queue_size);
});
return *shared->iceberg_catalog_threadpool;
}
void Context::setPrimaryIndexCache(const String & cache_policy, size_t max_cache_size_in_bytes, double size_ratio)
{
std::lock_guard lock(shared->mutex);

View File

@@ -1123,6 +1123,7 @@ public:
size_t getPrefetchThreadpoolSize() const;
ThreadPool & getBuildVectorSimilarityIndexThreadPool() const;
ThreadPool & getIcebergCatalogThreadpool() const;
/// Settings for MergeTree background tasks stored in config.xml
BackgroundTaskSchedulingSettings getBackgroundProcessingTaskSchedulingSettings() const;

View File

@@ -52,9 +52,6 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
Int32 parseTableSchema(
const Poco::JSON::Object::Ptr & metadata_object, IcebergSchemaProcessor & schema_processor, LoggerPtr metadata_logger);
IcebergMetadata::IcebergMetadata(
ObjectStoragePtr object_storage_,
ConfigurationObserverPtr configuration_,
@ -79,7 +76,6 @@ IcebergMetadata::IcebergMetadata(
namespace
{
enum class ManifestEntryStatus : uint8_t
{
EXISTING = 0,
@ -143,6 +139,7 @@ bool schemasAreIdentical(const Poco::JSON::Object & first, const Poco::JSON::Obj
return false;
return *(first.getArray(fields_key)) == *(second.getArray(fields_key));
}
}
@ -309,7 +306,7 @@ std::pair<Poco::JSON::Object::Ptr, Int32> parseTableSchemaV1Method(const Poco::J
return {schema, current_schema_id};
}
Int32 parseTableSchema(
Int32 DB::IcebergMetadata::parseTableSchema(
const Poco::JSON::Object::Ptr & metadata_object, IcebergSchemaProcessor & schema_processor, LoggerPtr metadata_logger)
{
Int32 format_version = metadata_object->getValue<Int32>("format-version");
@ -577,7 +574,6 @@ getMetadataFileAndVersion(const ObjectStoragePtr & object_storage, const Storage
return *std::max_element(metadata_files_with_versions.begin(), metadata_files_with_versions.end());
}
DataLakeMetadataPtr IcebergMetadata::create(
const ObjectStoragePtr & object_storage, const ConfigurationObserverPtr & configuration, const ContextPtr & local_context)
{

View File

@ -207,6 +207,9 @@ public:
bool supportsExternalMetadataChange() const override { return true; }
static Int32 parseTableSchema(
const Poco::JSON::Object::Ptr & metadata_object, IcebergSchemaProcessor & schema_processor, LoggerPtr metadata_logger);
private:
mutable std::unordered_map<String, Int32> schema_id_by_data_file;

View File

@ -91,7 +91,8 @@ StorageObjectStorage::StorageObjectStorage(
std::optional<FormatSettings> format_settings_,
LoadingStrictnessLevel mode,
bool distributed_processing_,
ASTPtr partition_by_)
ASTPtr partition_by_,
bool lazy_init)
: IStorage(table_id_)
, configuration(configuration_)
, object_storage(object_storage_)
@ -102,10 +103,13 @@ StorageObjectStorage::StorageObjectStorage(
{
try
{
if (configuration->hasExternalDynamicMetadata())
configuration->updateAndGetCurrentSchema(object_storage, context);
else
configuration->update(object_storage, context);
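        /// When lazy_init is set, the (potentially remote) configuration update is
        /// skipped at construction; callers are expected to refresh the configuration
        /// before the table is first used.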
if (!lazy_init)
{
if (configuration->hasExternalDynamicMetadata())
configuration->updateAndGetCurrentSchema(object_storage, context);
else
configuration->update(object_storage, context);
}
}
catch (...)
{

View File

@ -68,7 +68,8 @@ public:
std::optional<FormatSettings> format_settings_,
LoadingStrictnessLevel mode,
bool distributed_processing_ = false,
ASTPtr partition_by_ = nullptr);
ASTPtr partition_by_ = nullptr,
bool lazy_init = false);
String getName() const override;

View File

@ -0,0 +1,59 @@
services:
spark-iceberg:
image: tabulario/spark-iceberg
container_name: spark-iceberg
build: spark/
depends_on:
- rest
- minio
environment:
- AWS_ACCESS_KEY_ID=admin
- AWS_SECRET_ACCESS_KEY=password
- AWS_REGION=us-east-1
ports:
- 8080:8080
- 10000:10000
- 10001:10001
rest:
image: tabulario/iceberg-rest
ports:
- 8182:8181
environment:
- AWS_ACCESS_KEY_ID=minio
- AWS_SECRET_ACCESS_KEY=minio123
- AWS_REGION=us-east-1
- CATALOG_WAREHOUSE=s3://iceberg_data/
- CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
- CATALOG_S3_ENDPOINT=http://minio:9000
minio:
image: minio/minio
container_name: minio
environment:
- MINIO_ROOT_USER=minio
- MINIO_ROOT_PASSWORD=minio123
- MINIO_DOMAIN=minio
networks:
default:
aliases:
- warehouse.minio
ports:
- 9001:9001
- 9002:9000
command: ["server", "/data", "--console-address", ":9001"]
mc:
depends_on:
- minio
image: minio/mc
container_name: mc
environment:
- AWS_ACCESS_KEY_ID=minio
- AWS_SECRET_ACCESS_KEY=minio123
- AWS_REGION=us-east-1
entrypoint: >
/bin/sh -c "
until (/usr/bin/mc config host add minio http://minio:9000 minio minio123) do echo '...waiting...' && sleep 1; done;
/usr/bin/mc rm -r --force minio/warehouse;
/usr/bin/mc mb minio/warehouse --ignore-existing;
/usr/bin/mc policy set public minio/warehouse;
tail -f /dev/null
"

View File

@ -14,6 +14,10 @@ services:
depends_on:
- proxy1
- proxy2
networks:
default:
aliases:
- warehouse.minio
# HTTP proxies for Minio.
proxy1:

View File

@ -572,6 +572,7 @@ class ClickHouseCluster:
self.resolver_logs_dir = os.path.join(self.instances_dir, "resolver")
self.spark_session = None
self.with_iceberg_catalog = False
self.with_azurite = False
self.azurite_container = "azurite-container"
@ -1468,6 +1469,26 @@ class ClickHouseCluster:
)
return self.base_minio_cmd
def setup_iceberg_catalog_cmd(
self, instance, env_variables, docker_compose_yml_dir
):
self.with_iceberg_catalog = True  # mark as configured so the compose file is added only once
self.base_cmd.extend(
[
"--file",
p.join(
docker_compose_yml_dir, "docker_compose_iceberg_rest_catalog.yml"
),
]
)
self.base_iceberg_catalog_cmd = self.compose_cmd(
"--env-file",
instance.env_file,
"--file",
p.join(docker_compose_yml_dir, "docker_compose_iceberg_rest_catalog.yml"),
)
return self.base_iceberg_catalog_cmd
def setup_azurite_cmd(self, instance, env_variables, docker_compose_yml_dir):
self.with_azurite = True
env_variables["AZURITE_PORT"] = str(self.azurite_port)
@ -1634,6 +1655,7 @@ class ClickHouseCluster:
with_hive=False,
with_coredns=False,
with_prometheus=False,
with_iceberg_catalog=False,
handle_prometheus_remote_write=False,
handle_prometheus_remote_read=False,
use_old_analyzer=None,
@ -1742,6 +1764,7 @@ class ClickHouseCluster:
with_coredns=with_coredns,
with_cassandra=with_cassandra,
with_ldap=with_ldap,
with_iceberg_catalog=with_iceberg_catalog,
use_old_analyzer=use_old_analyzer,
server_bin_path=self.server_bin_path,
odbc_bridge_bin_path=self.odbc_bridge_bin_path,
@ -1932,6 +1955,13 @@ class ClickHouseCluster:
self.setup_minio_cmd(instance, env_variables, docker_compose_yml_dir)
)
if with_iceberg_catalog and not self.with_iceberg_catalog:
cmds.append(
self.setup_iceberg_catalog_cmd(
instance, env_variables, docker_compose_yml_dir
)
)
if with_azurite and not self.with_azurite:
cmds.append(
self.setup_azurite_cmd(instance, env_variables, docker_compose_yml_dir)
@ -3443,6 +3473,7 @@ class ClickHouseInstance:
with_coredns,
with_cassandra,
with_ldap,
with_iceberg_catalog,
use_old_analyzer,
server_bin_path,
odbc_bridge_bin_path,

View File

@ -0,0 +1,309 @@
import glob
import json
import logging
import os
import random
import time
import uuid
from datetime import datetime, timedelta
import pyarrow as pa
import pytest
import requests
import urllib3
from minio import Minio
from pyiceberg.catalog import load_catalog
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table.sorting import SortField, SortOrder
from pyiceberg.transforms import DayTransform, IdentityTransform
from pyiceberg.types import (
DoubleType,
FloatType,
NestedField,
StringType,
StructType,
TimestampType,
)
from helpers.cluster import ClickHouseCluster, ClickHouseInstance, is_arm
from helpers.s3_tools import get_file_contents, list_s3_objects, prepare_s3_bucket
from helpers.test_tools import TSV, csv_compare
BASE_URL = "http://rest:8181/v1"
BASE_URL_LOCAL = "http://localhost:8182/v1"
BASE_URL_LOCAL_RAW = "http://localhost:8182"
CATALOG_NAME = "demo"
DEFAULT_SCHEMA = Schema(
NestedField(
field_id=1, name="datetime", field_type=TimestampType(), required=False
),
NestedField(field_id=2, name="symbol", field_type=StringType(), required=False),
NestedField(field_id=3, name="bid", field_type=DoubleType(), required=False),
NestedField(field_id=4, name="ask", field_type=DoubleType(), required=False),
NestedField(
field_id=5,
name="details",
field_type=StructType(
NestedField(
field_id=4,
name="created_by",
field_type=StringType(),
required=False,
),
),
required=False,
),
)
DEFAULT_CREATE_TABLE = "CREATE TABLE {}.`{}.{}`\\n(\\n `datetime` Nullable(DateTime64(6)),\\n `symbol` Nullable(String),\\n `bid` Nullable(Float64),\\n `ask` Nullable(Float64),\\n `details` Tuple(created_by Nullable(String))\\n)\\nENGINE = Iceberg(\\'http://minio:9000/warehouse/data/\\', \\'minio\\', \\'[HIDDEN]\\')\n"
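# Expected SHOW CREATE TABLE output, kept in ClickHouse's TSV escaping: each
# "\\n" is the literal backslash-n the server emits for a newline, and the
# catalog credentials are masked as '[HIDDEN]'.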
DEFAULT_PARTITION_SPEC = PartitionSpec(
PartitionField(
source_id=1, field_id=1000, transform=DayTransform(), name="datetime_day"
)
)
DEFAULT_SORT_ORDER = SortOrder(SortField(source_id=2, transform=IdentityTransform()))
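# Tables created with these defaults are day-partitioned on `datetime`
# (field_id 1) and sorted by `symbol` (field_id 2) via an identity transform.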
def list_namespaces():
response = requests.get(f"{BASE_URL_LOCAL}/namespaces")
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Failed to list namespaces: {response.status_code}")
def load_catalog_impl(started_cluster):
return load_catalog(
CATALOG_NAME,
**{
"uri": BASE_URL_LOCAL_RAW,
"type": "rest",
"s3.endpoint": f"http://localhost:9002",
"s3.access-key-id": "minio",
"s3.secret-access-key": "minio123",
},
)
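# pyiceberg talks to the catalog and to MinIO through the host-mapped ports
# (REST on 8182, S3 on 9002), while ClickHouse inside the cluster network uses
# the service names (rest:8181, minio:9000) instead.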
def create_table(
catalog,
namespace,
table,
schema=DEFAULT_SCHEMA,
partition_spec=DEFAULT_PARTITION_SPEC,
sort_order=DEFAULT_SORT_ORDER,
):
return catalog.create_table(
identifier=f"{namespace}.{table}",
schema=schema,
location=f"s3://warehouse/data",
partition_spec=partition_spec,
sort_order=sort_order,
)
def generate_record():
return {
"datetime": datetime.now(),
"symbol": str("kek"),
"bid": round(random.uniform(100, 200), 2),
"ask": round(random.uniform(200, 300), 2),
"details": {"created_by": "Alice Smith"},
}
def create_clickhouse_iceberg_database(started_cluster, node, name):
node.query(
f"""
DROP DATABASE IF EXISTS {name};
SET allow_experimental_database_iceberg=true;
CREATE DATABASE {name} ENGINE = Iceberg('{BASE_URL}', 'minio', 'minio123')
SETTINGS catalog_type = 'rest',
storage_endpoint = 'http://minio:9000/warehouse',
warehouse='demo'
"""
)
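# The DDL above exercises the new database engine: `catalog_type = 'rest'`
# selects the REST catalog implementation, `storage_endpoint` points table
# reads at MinIO, and `warehouse` names the catalog warehouse. A quick
# existence check could look like this (hypothetical helper, not used below):
def iceberg_database_exists(node, name):
    return int(
        node.query(f"SELECT count() FROM system.databases WHERE name = '{name}'")
    )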
def print_objects():
minio_client = Minio(
f"localhost:9002",
access_key="minio",
secret_key="minio123",
secure=False,
http_client=urllib3.PoolManager(cert_reqs="CERT_NONE"),
)
objects = list(minio_client.list_objects("warehouse", "", recursive=True))
names = [x.object_name for x in objects]
names.sort()
for name in names:
print(f"Found object: {name}")
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"node1",
main_configs=[],
user_configs=[],
stay_alive=True,
with_iceberg_catalog=True,
)
logging.info("Starting cluster...")
cluster.start()
# TODO: properly wait for container
time.sleep(10)
yield cluster
finally:
cluster.shutdown()
def test_list_tables(started_cluster):
node = started_cluster.instances["node1"]
root_namespace = f"clickhouse_{uuid.uuid4()}"
namespace_1 = f"{root_namespace}.testA.A"
namespace_2 = f"{root_namespace}.testB.B"
namespace_1_tables = ["tableA", "tableB"]
namespace_2_tables = ["tableC", "tableD"]
catalog = load_catalog_impl(started_cluster)
for namespace in [namespace_1, namespace_2]:
catalog.create_namespace(namespace)
found = False
for namespace_list in list_namespaces()["namespaces"]:
if root_namespace == namespace_list[0]:
found = True
break
assert found
found = False
for namespace_list in catalog.list_namespaces():
if root_namespace == namespace_list[0]:
found = True
break
assert found
for namespace in [namespace_1, namespace_2]:
assert len(catalog.list_tables(namespace)) == 0
create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)
tables_list = ""
for table in namespace_1_tables:
create_table(catalog, namespace_1, table)
if len(tables_list) > 0:
tables_list += "\n"
tables_list += f"{namespace_1}.{table}"
for table in namespace_2_tables:
create_table(catalog, namespace_2, table)
if len(tables_list) > 0:
tables_list += "\n"
tables_list += f"{namespace_2}.{table}"
assert (
tables_list
== node.query(
f"SELECT name FROM system.tables WHERE database = '{CATALOG_NAME}' and name ILIKE '{root_namespace}%' ORDER BY name"
).strip()
)
node.restart_clickhouse()
assert (
tables_list
== node.query(
f"SELECT name FROM system.tables WHERE database = '{CATALOG_NAME}' and name ILIKE '{root_namespace}%' ORDER BY name"
).strip()
)
expected = DEFAULT_CREATE_TABLE.format(CATALOG_NAME, namespace_2, "tableC")
assert expected == node.query(
f"SHOW CREATE TABLE {CATALOG_NAME}.`{namespace_2}.tableC`"
)
def test_many_namespaces(started_cluster):
node = started_cluster.instances["node1"]
root_namespace_1 = f"A_{uuid.uuid4()}"
root_namespace_2 = f"B_{uuid.uuid4()}"
namespaces = [
f"{root_namespace_1}",
f"{root_namespace_1}.B.C",
f"{root_namespace_1}.B.C.D",
f"{root_namespace_1}.B.C.D.E",
f"{root_namespace_2}",
f"{root_namespace_2}.C",
f"{root_namespace_2}.CC",
]
tables = ["A", "B", "C"]
catalog = load_catalog_impl(started_cluster)
for namespace in namespaces:
catalog.create_namespace(namespace)
for table in tables:
create_table(catalog, namespace, table)
create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)
for namespace in namespaces:
for table in tables:
table_name = f"{namespace}.{table}"
assert int(
node.query(
f"SELECT count() FROM system.tables WHERE database = '{CATALOG_NAME}' and name = '{table_name}'"
)
)
def test_select(started_cluster):
node = started_cluster.instances["node1"]
test_ref = f"test_list_tables_{uuid.uuid4()}"
table_name = f"{test_ref}_table"
root_namespace = f"{test_ref}_namespace"
namespace = f"{root_namespace}.A.B.C"
namespaces_to_create = [
root_namespace,
f"{root_namespace}.A",
f"{root_namespace}.A.B",
f"{root_namespace}.A.B.C",
]
catalog = load_catalog_impl(started_cluster)
for namespace in namespaces_to_create:
catalog.create_namespace(namespace)
assert len(catalog.list_tables(namespace)) == 0
table = create_table(catalog, namespace, table_name)
num_rows = 10
data = [generate_record() for _ in range(num_rows)]
df = pa.Table.from_pylist(data)
table.append(df)
create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)
expected = DEFAULT_CREATE_TABLE.format(CATALOG_NAME, namespace, table_name)
assert expected == node.query(
f"SHOW CREATE TABLE {CATALOG_NAME}.`{namespace}.{table_name}`"
)
assert num_rows == int(
node.query(f"SELECT count() FROM {CATALOG_NAME}.`{namespace}.{table_name}`")
)