Merge 4abbf29a86 into e0f8b8d351 (commit 2ff295f5f6)
@@ -36,7 +36,7 @@ geomet==0.2.1.post1
grpcio-tools==1.60.0
grpcio==1.60.0
gssapi==1.8.3
httplib2==0.20.2
httplib2==0.22.0
idna==3.7
importlib-metadata==4.6.4
iniconfig==2.0.0
@@ -72,7 +72,7 @@ pyarrow==17.0.0
pycparser==2.22
pycryptodome==3.20.0
pymongo==3.11.0
pyparsing==2.4.7
pyparsing==3.1.0
pyspark==3.3.2
pyspnego==0.10.2
pytest-order==1.0.0
@@ -101,3 +101,4 @@ wadllib==1.3.6
websocket-client==1.8.0
wheel==0.38.1
zipp==1.0.0
pyiceberg==0.7.1
@@ -166,6 +166,8 @@ if (TARGET ch_contrib::hdfs)
    add_headers_and_sources(dbms Disks/ObjectStorages/HDFS)
endif()

add_headers_and_sources(dbms Databases/Iceberg)

add_headers_and_sources(dbms Disks/ObjectStorages/Cached)
add_headers_and_sources(dbms Disks/ObjectStorages/Local)
add_headers_and_sources(dbms Disks/ObjectStorages/Web)
@@ -189,6 +189,9 @@
    M(BuildVectorSimilarityIndexThreads, "Number of threads in the build vector similarity index thread pool.") \
    M(BuildVectorSimilarityIndexThreadsActive, "Number of threads in the build vector similarity index thread pool running a task.") \
    M(BuildVectorSimilarityIndexThreadsScheduled, "Number of queued or active jobs in the build vector similarity index thread pool.") \
    M(IcebergCatalogThreads, "Number of threads in the IcebergCatalog thread pool.") \
    M(IcebergCatalogThreadsActive, "Number of threads in the IcebergCatalog thread pool running a task.") \
    M(IcebergCatalogThreadsScheduled, "Number of queued or active jobs in the IcebergCatalog thread pool.") \
    \
    M(DiskPlainRewritableAzureDirectoryMapSize, "Number of local-to-remote path entries in the 'plain_rewritable' in-memory map for AzureObjectStorage.") \
    M(DiskPlainRewritableAzureFileCount, "Number of file entries in the 'plain_rewritable' in-memory map for AzureObjectStorage.") \
@@ -613,6 +613,7 @@
    M(733, TABLE_IS_BEING_RESTARTED) \
    M(734, CANNOT_WRITE_AFTER_BUFFER_CANCELED) \
    M(735, QUERY_WAS_CANCELLED_BY_CLIENT) \
    M(736, ICEBERG_CATALOG_ERROR) \
    \
    M(900, DISTRIBUTED_CACHE_ERROR) \
    M(901, CANNOT_USE_DISTRIBUTED_CACHE) \
@@ -280,4 +280,14 @@ IMPLEMENT_SETTING_ENUM(
    {"StochasticSimple", MergeSelectorAlgorithm::STOCHASTIC_SIMPLE},
    {"Trivial", MergeSelectorAlgorithm::TRIVIAL}})

IMPLEMENT_SETTING_ENUM(DatabaseIcebergCatalogType, ErrorCodes::BAD_ARGUMENTS,
    {{"rest", DatabaseIcebergCatalogType::REST}})

IMPLEMENT_SETTING_ENUM(DatabaseIcebergStorageType, ErrorCodes::BAD_ARGUMENTS,
    {{"s3", DatabaseIcebergStorageType::S3},
     {"azure", DatabaseIcebergStorageType::Azure},
     {"hdfs", DatabaseIcebergStorageType::HDFS},
     {"local", DatabaseIcebergStorageType::Local},
    })

}
@@ -361,4 +361,21 @@ DECLARE_SETTING_ENUM(GroupArrayActionWhenLimitReached)

DECLARE_SETTING_ENUM(MergeSelectorAlgorithm)

enum class DatabaseIcebergCatalogType : uint8_t
{
    REST,
};

DECLARE_SETTING_ENUM(DatabaseIcebergCatalogType)

enum class DatabaseIcebergStorageType : uint8_t
{
    S3,
    Azure,
    Local,
    HDFS,
};

DECLARE_SETTING_ENUM(DatabaseIcebergStorageType)

}
src/Databases/Iceberg/DatabaseIceberg.cpp (new file, 365 lines)
@@ -0,0 +1,365 @@
#include <Databases/Iceberg/DatabaseIceberg.h>

#if USE_AVRO
#include <Access/Common/HTTPAuthenticationScheme.h>

#include <Databases/DatabaseFactory.h>
#include <Databases/Iceberg/RestCatalog.h>
#include <DataTypes/DataTypeString.h>

#include <Storages/ObjectStorage/S3/Configuration.h>
#include <Storages/ConstraintsDescription.h>
#include <Storages/StorageNull.h>
#include <Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h>

#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/Context.h>
#include <Interpreters/StorageID.h>

#include <Formats/FormatFactory.h>

#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTDataType.h>

namespace CurrentMetrics
{
    extern const Metric IcebergCatalogThreads;
    extern const Metric IcebergCatalogThreadsActive;
    extern const Metric IcebergCatalogThreadsScheduled;
}


namespace DB
{
namespace DatabaseIcebergSetting
{
    extern const DatabaseIcebergSettingsDatabaseIcebergCatalogType catalog_type;
    extern const DatabaseIcebergSettingsDatabaseIcebergStorageType storage_type;
    extern const DatabaseIcebergSettingsString warehouse;
    extern const DatabaseIcebergSettingsString catalog_credential;
    extern const DatabaseIcebergSettingsString auth_header;
    extern const DatabaseIcebergSettingsString storage_endpoint;
}

namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
}

namespace
{
    /// Parse a string containing at least one dot into two substrings:
    /// A.B.C.D.E -> A.B.C.D and E, where
    /// `A.B.C.D` is a table "namespace".
    /// `E` is a table name.
    std::pair<std::string, std::string> parseTableName(const std::string & name)
    {
        auto pos = name.rfind('.');
        if (pos == std::string::npos)
            throw DB::Exception(ErrorCodes::BAD_ARGUMENTS, "Table cannot have empty namespace: {}", name);

        auto table_name = name.substr(pos + 1);
        auto namespace_name = name.substr(0, name.size() - table_name.size() - 1);
        return {namespace_name, table_name};
    }
}

DatabaseIceberg::DatabaseIceberg(
    const std::string & database_name_,
    const std::string & url_,
    const DatabaseIcebergSettings & settings_,
    ASTPtr database_engine_definition_,
    ContextPtr context_)
    : IDatabase(database_name_)
    , url(url_)
    , settings(settings_)
    , database_engine_definition(database_engine_definition_)
    , log(getLogger("DatabaseIceberg(" + database_name_ + ")"))
{
    validateSettings(context_);
}

void DatabaseIceberg::validateSettings(const ContextPtr & context_)
{
    if (settings[DatabaseIcebergSetting::warehouse].value.empty())
    {
        throw Exception(
            ErrorCodes::BAD_ARGUMENTS, "`warehouse` setting cannot be empty. "
            "Please specify 'SETTINGS warehouse=<warehouse_name>' in the CREATE DATABASE query");
    }

    if (!settings[DatabaseIcebergSetting::storage_type].changed)
    {
        auto catalog = getCatalog(context_);
        const auto storage_type = catalog->getStorageType();
        if (!storage_type)
        {
            throw Exception(
                ErrorCodes::BAD_ARGUMENTS, "Storage type is not found in catalog config. "
                "Please specify it manually via 'SETTINGS storage_type=<type>' in CREATE DATABASE query");
        }
    }
}

std::shared_ptr<Iceberg::ICatalog> DatabaseIceberg::getCatalog(ContextPtr) const
{
    if (catalog_impl)
        return catalog_impl;

    switch (settings[DatabaseIcebergSetting::catalog_type].value)
    {
        case DB::DatabaseIcebergCatalogType::REST:
        {
            catalog_impl = std::make_shared<Iceberg::RestCatalog>(
                settings[DatabaseIcebergSetting::warehouse].value,
                url,
                settings[DatabaseIcebergSetting::catalog_credential].value,
                settings[DatabaseIcebergSetting::auth_header],
                Context::getGlobalContextInstance());
        }
    }
    return catalog_impl;
}

std::shared_ptr<StorageObjectStorage::Configuration> DatabaseIceberg::getConfiguration() const
{
    switch (settings[DatabaseIcebergSetting::storage_type].value)
    {
#if USE_AWS_S3
        case DB::DatabaseIcebergStorageType::S3:
        {
            return std::make_shared<StorageS3IcebergConfiguration>();
        }
#endif
#if USE_AZURE_BLOB_STORAGE
        case DB::DatabaseIcebergStorageType::Azure:
        {
            return std::make_shared<StorageAzureIcebergConfiguration>();
        }
#endif
#if USE_HDFS
        case DB::DatabaseIcebergStorageType::HDFS:
        {
            return std::make_shared<StorageHDFSIcebergConfiguration>();
        }
#endif
        case DB::DatabaseIcebergStorageType::Local:
        {
            return std::make_shared<StorageLocalIcebergConfiguration>();
        }
#if !USE_AWS_S3 || !USE_AZURE_BLOB_STORAGE || !USE_HDFS
        default:
            throw Exception(ErrorCodes::BAD_ARGUMENTS,
                            "Server does not contain support for storage type {}",
                            settings[DatabaseIcebergSetting::storage_type].value);
#endif
    }
}

std::string DatabaseIceberg::getStorageEndpointForTable(const Iceberg::TableMetadata & table_metadata) const
{
    auto endpoint_from_settings = settings[DatabaseIcebergSetting::storage_endpoint].value;
    if (!endpoint_from_settings.empty())
    {
        return std::filesystem::path(endpoint_from_settings)
            / table_metadata.getLocation(/* path_only */true)
            / "";
    }
    else
    {
        return std::filesystem::path(table_metadata.getLocation(/* path_only */false)) / "";
    }
}

bool DatabaseIceberg::empty() const
{
    return getCatalog(Context::getGlobalContextInstance())->empty();
}

bool DatabaseIceberg::isTableExist(const String & name, ContextPtr context_) const
{
    const auto [namespace_name, table_name] = parseTableName(name);
    return getCatalog(context_)->existsTable(namespace_name, table_name);
}

StoragePtr DatabaseIceberg::tryGetTable(const String & name, ContextPtr context_) const
{
    auto catalog = getCatalog(context_);
    auto table_metadata = Iceberg::TableMetadata().withLocation().withSchema();
    auto [namespace_name, table_name] = parseTableName(name);

    if (!catalog->tryGetTableMetadata(namespace_name, table_name, table_metadata))
        return nullptr;

    /// Take database engine definition AST as base.
    ASTStorage * storage = database_engine_definition->as<ASTStorage>();
    ASTs args = storage->engine->arguments->children;

    /// Replace Iceberg Catalog endpoint with storage path endpoint of requested table.
    auto table_endpoint = getStorageEndpointForTable(table_metadata);
    args[0] = std::make_shared<ASTLiteral>(table_endpoint);

    LOG_TEST(log, "Using table endpoint: {}", table_endpoint);

    const auto columns = ColumnsDescription(table_metadata.getSchema());
    const auto configuration = getConfiguration();

    /// with_table_structure = false: because there will be
    /// no table structure in table definition AST.
    StorageObjectStorage::Configuration::initialize(*configuration, args, context_, /* with_table_structure */false);

    return std::make_shared<StorageObjectStorage>(
        configuration,
        configuration->createObjectStorage(context_, /* is_readonly */ false),
        context_,
        StorageID(getDatabaseName(), name),
        /* columns */columns,
        /* constraints */ConstraintsDescription{},
        /* comment */"",
        getFormatSettings(context_),
        LoadingStrictnessLevel::CREATE,
        /* distributed_processing */false,
        /* partition_by */nullptr,
        /* lazy_init */true);
}

DatabaseTablesIteratorPtr DatabaseIceberg::getTablesIterator(
    ContextPtr context_,
    const FilterByNameFunction & filter_by_table_name,
    bool /* skip_not_loaded */) const
{
    Tables tables;
    auto catalog = getCatalog(context_);

    auto iceberg_tables = catalog->getTables();
    size_t num_threads = std::min<size_t>(10, iceberg_tables.size());
    ThreadPool pool(
        CurrentMetrics::IcebergCatalogThreads,
        CurrentMetrics::IcebergCatalogThreadsActive,
        CurrentMetrics::IcebergCatalogThreadsScheduled,
        num_threads);

    DB::ThreadPoolCallbackRunnerLocal<void> runner(pool, "RestCatalog");
    std::mutex mutex;

    for (const auto & table_name : iceberg_tables)
    {
        if (filter_by_table_name && !filter_by_table_name(table_name))
            continue;

        runner([&]{
            auto storage = tryGetTable(table_name, context_);
            {
                std::lock_guard lock(mutex);
                [[maybe_unused]] bool inserted = tables.emplace(table_name, storage).second;
                chassert(inserted);
            }
        });
    }

    runner.waitForAllToFinishAndRethrowFirstError();
    return std::make_unique<DatabaseTablesSnapshotIterator>(tables, getDatabaseName());
}

ASTPtr DatabaseIceberg::getCreateDatabaseQuery() const
{
    const auto & create_query = std::make_shared<ASTCreateQuery>();
    create_query->setDatabase(getDatabaseName());
    create_query->set(create_query->storage, database_engine_definition);
    return create_query;
}

ASTPtr DatabaseIceberg::getCreateTableQueryImpl(
    const String & name,
    ContextPtr context_,
    bool /* throw_on_error */) const
{
    auto catalog = getCatalog(context_);
    auto table_metadata = Iceberg::TableMetadata().withLocation().withSchema();

    const auto [namespace_name, table_name] = parseTableName(name);
    catalog->getTableMetadata(namespace_name, table_name, table_metadata);

    auto create_table_query = std::make_shared<ASTCreateQuery>();
    auto table_storage_define = database_engine_definition->clone();

    auto * storage = table_storage_define->as<ASTStorage>();
    storage->engine->kind = ASTFunction::Kind::TABLE_ENGINE;
    storage->settings = {};

    create_table_query->set(create_table_query->storage, table_storage_define);

    auto columns_declare_list = std::make_shared<ASTColumns>();
    auto columns_expression_list = std::make_shared<ASTExpressionList>();

    columns_declare_list->set(columns_declare_list->columns, columns_expression_list);
    create_table_query->set(create_table_query->columns_list, columns_declare_list);

    create_table_query->setTable(name);
    create_table_query->setDatabase(getDatabaseName());

    for (const auto & column_type_and_name : table_metadata.getSchema())
    {
        const auto column_declaration = std::make_shared<ASTColumnDeclaration>();
        column_declaration->name = column_type_and_name.name;
        column_declaration->type = makeASTDataType(column_type_and_name.type->getName());
        columns_expression_list->children.emplace_back(column_declaration);
    }

    auto storage_engine_arguments = storage->engine->arguments;
    if (storage_engine_arguments->children.empty())
    {
        throw Exception(
            ErrorCodes::BAD_ARGUMENTS, "Unexpected number of arguments: {}",
            storage_engine_arguments->children.size());
    }

    auto table_endpoint = getStorageEndpointForTable(table_metadata);
    storage_engine_arguments->children[0] = std::make_shared<ASTLiteral>(table_endpoint);

    return create_table_query;
}

void registerDatabaseIceberg(DatabaseFactory & factory)
{
    auto create_fn = [](const DatabaseFactory::Arguments & args)
    {
        const auto * database_engine_define = args.create_query.storage;
        const auto & database_engine_name = args.engine_name;

        const ASTFunction * function_define = database_engine_define->engine;
        if (!function_define->arguments)
            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", database_engine_name);

        ASTs & engine_args = function_define->arguments->children;
        if (engine_args.empty())
            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", database_engine_name);

        const size_t max_args_num = 3;
        if (engine_args.size() != max_args_num)
            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine must have {} arguments", max_args_num);

        for (auto & engine_arg : engine_args)
            engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.context);

        const auto url = engine_args[0]->as<ASTLiteral>()->value.safeGet<String>();

        DatabaseIcebergSettings database_settings;
        if (database_engine_define->settings)
            database_settings.loadFromQuery(*database_engine_define);

        return std::make_shared<DatabaseIceberg>(
            args.database_name,
            url,
            database_settings,
            database_engine_define->clone(),
            args.context);
    };
    factory.registerDatabase("Iceberg", create_fn, { .supports_arguments = true, .supports_settings = true });
}

}

#endif
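Note: a minimal usage sketch of the new database engine, not part of the diff. It assumes the three engine arguments are the catalog URL followed by object storage credentials, and all object names (iceberg_db, the demo warehouse, the dotted table name) are hypothetical:

    CREATE DATABASE iceberg_db
    ENGINE = Iceberg('http://catalog:8181/v1', 'minio', 'minio123')
    SETTINGS catalog_type = 'rest', warehouse = 'demo',
             storage_endpoint = 'http://minio:9000/warehouse';

    SHOW TABLES FROM iceberg_db;
    -- Tables are addressed as <namespace>.<table>; parseTableName() splits at the last dot.
    SELECT count() FROM iceberg_db.`iceberg.bids`;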
src/Databases/Iceberg/DatabaseIceberg.h (new file, 71 lines)
@@ -0,0 +1,71 @@
#pragma once
#include "config.h"

#if USE_AVRO
#include <Databases/DatabasesCommon.h>
#include <Databases/Iceberg/DatabaseIcebergSettings.h>
#include <Databases/Iceberg/ICatalog.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Poco/Net/HTTPBasicCredentials.h>

namespace DB
{

/// TODO:
/// - auth: oauth, bearer token?
/// - tests with azure, hdfs, local

class DatabaseIceberg final : public IDatabase, WithContext
{
public:
    explicit DatabaseIceberg(
        const std::string & database_name_,
        const std::string & url_,
        const DatabaseIcebergSettings & settings_,
        ASTPtr database_engine_definition_,
        ContextPtr context_);

    String getEngineName() const override { return "Iceberg"; }

    bool canContainMergeTreeTables() const override { return false; }
    bool canContainDistributedTables() const override { return false; }
    bool shouldBeEmptyOnDetach() const override { return false; }

    bool empty() const override;

    bool isTableExist(const String & name, ContextPtr context) const override;
    StoragePtr tryGetTable(const String & name, ContextPtr context) const override;

    DatabaseTablesIteratorPtr getTablesIterator(
        ContextPtr context,
        const FilterByNameFunction & filter_by_table_name,
        bool skip_not_loaded) const override;

    void shutdown() override {}

    ASTPtr getCreateDatabaseQuery() const override;

protected:
    ASTPtr getCreateTableQueryImpl(const String & table_name, ContextPtr context, bool throw_on_error) const override;

private:
    /// Iceberg Catalog url.
    const std::string url;
    /// SETTINGS from CREATE query.
    const DatabaseIcebergSettings settings;
    /// Database engine definition taken from initial CREATE DATABASE query.
    const ASTPtr database_engine_definition;
    const LoggerPtr log;
    /// Credentials to authenticate Iceberg Catalog.
    Poco::Net::HTTPBasicCredentials credentials;

    mutable std::shared_ptr<Iceberg::ICatalog> catalog_impl;

    void validateSettings(const ContextPtr & context_);
    std::shared_ptr<Iceberg::ICatalog> getCatalog(ContextPtr context_) const;
    std::shared_ptr<StorageObjectStorage::Configuration> getConfiguration() const;
    std::string getStorageEndpointForTable(const Iceberg::TableMetadata & table_metadata) const;
};

}
#endif
src/Databases/Iceberg/DatabaseIcebergSettings.cpp (new file, 86 lines)
@@ -0,0 +1,86 @@
#include <Core/BaseSettings.h>
#include <Core/BaseSettingsFwdMacrosImpl.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSetQuery.h>
#include <Databases/Iceberg/DatabaseIcebergSettings.h>
#include <Common/Exception.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int UNKNOWN_SETTING;
}

#define DATABASE_ICEBERG_RELATED_SETTINGS(DECLARE, ALIAS) \
    DECLARE(DatabaseIcebergCatalogType, catalog_type, DatabaseIcebergCatalogType::REST, "Catalog type", 0) \
    DECLARE(DatabaseIcebergStorageType, storage_type, DatabaseIcebergStorageType::S3, "Storage type: S3, Local, Azure, HDFS", 0) \
    DECLARE(String, catalog_credential, "", "", 0) \
    DECLARE(String, warehouse, "", "Warehouse name inside the catalog", 0) \
    DECLARE(String, auth_header, "", "Authorization header of format 'Authorization: <scheme> <auth_info>'", 0) \
    DECLARE(String, storage_endpoint, "", "Object storage endpoint", 0) \

#define LIST_OF_DATABASE_ICEBERG_SETTINGS(M, ALIAS) \
    DATABASE_ICEBERG_RELATED_SETTINGS(M, ALIAS)

DECLARE_SETTINGS_TRAITS(DatabaseIcebergSettingsTraits, LIST_OF_DATABASE_ICEBERG_SETTINGS)
IMPLEMENT_SETTINGS_TRAITS(DatabaseIcebergSettingsTraits, LIST_OF_DATABASE_ICEBERG_SETTINGS)

struct DatabaseIcebergSettingsImpl : public BaseSettings<DatabaseIcebergSettingsTraits>
{
};

#define INITIALIZE_SETTING_EXTERN(TYPE, NAME, DEFAULT, DESCRIPTION, FLAGS) \
    DatabaseIcebergSettings##TYPE NAME = &DatabaseIcebergSettingsImpl ::NAME;

namespace DatabaseIcebergSetting
{
    LIST_OF_DATABASE_ICEBERG_SETTINGS(INITIALIZE_SETTING_EXTERN, SKIP_ALIAS)
}

#undef INITIALIZE_SETTING_EXTERN

DatabaseIcebergSettings::DatabaseIcebergSettings() : impl(std::make_unique<DatabaseIcebergSettingsImpl>())
{
}

DatabaseIcebergSettings::DatabaseIcebergSettings(const DatabaseIcebergSettings & settings)
    : impl(std::make_unique<DatabaseIcebergSettingsImpl>(*settings.impl))
{
}

DatabaseIcebergSettings::DatabaseIcebergSettings(DatabaseIcebergSettings && settings) noexcept
    : impl(std::make_unique<DatabaseIcebergSettingsImpl>(std::move(*settings.impl)))
{
}

DatabaseIcebergSettings::~DatabaseIcebergSettings() = default;

DATABASE_ICEBERG_SETTINGS_SUPPORTED_TYPES(DatabaseIcebergSettings, IMPLEMENT_SETTING_SUBSCRIPT_OPERATOR)


void DatabaseIcebergSettings::applyChanges(const SettingsChanges & changes)
{
    impl->applyChanges(changes);
}

void DatabaseIcebergSettings::loadFromQuery(const ASTStorage & storage_def)
{
    if (storage_def.settings)
    {
        try
        {
            impl->applyChanges(storage_def.settings->changes);
        }
        catch (Exception & e)
        {
            if (e.code() == ErrorCodes::UNKNOWN_SETTING)
                e.addMessage("for database engine " + storage_def.engine->name);
            throw;
        }
    }
}

}
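Note: the settings above imply two authentication paths for the catalog; a sketch (hypothetical values, engine arguments abridged) of passing a ready-made header via auth_header instead of a '<client_id>:<client_secret>' pair in catalog_credential:

    CREATE DATABASE iceberg_db
    ENGINE = Iceberg('http://catalog:8181/v1', 'key', 'secret')
    SETTINGS warehouse = 'demo',
             auth_header = 'Authorization: Bearer <token>';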
src/Databases/Iceberg/DatabaseIcebergSettings.h (new file, 40 lines)
@@ -0,0 +1,40 @@
#pragma once

#include <Core/BaseSettingsFwdMacros.h>
#include <Core/FormatFactorySettings.h>
#include <Core/SettingsEnums.h>
#include <Core/SettingsFields.h>


namespace DB
{
class ASTStorage;
struct DatabaseIcebergSettingsImpl;
class SettingsChanges;

/// List of available types supported in DatabaseIcebergSettings object
#define DATABASE_ICEBERG_SETTINGS_SUPPORTED_TYPES(CLASS_NAME, M) \
    M(CLASS_NAME, String) \
    M(CLASS_NAME, UInt64) \
    M(CLASS_NAME, DatabaseIcebergCatalogType) \
    M(CLASS_NAME, DatabaseIcebergStorageType)

DATABASE_ICEBERG_SETTINGS_SUPPORTED_TYPES(DatabaseIcebergSettings, DECLARE_SETTING_TRAIT)

struct DatabaseIcebergSettings
{
    DatabaseIcebergSettings();
    DatabaseIcebergSettings(const DatabaseIcebergSettings & settings);
    DatabaseIcebergSettings(DatabaseIcebergSettings && settings) noexcept;
    ~DatabaseIcebergSettings();

    DATABASE_ICEBERG_SETTINGS_SUPPORTED_TYPES(DatabaseIcebergSettings, DECLARE_SETTING_SUBSCRIPT_OPERATOR)

    void loadFromQuery(const ASTStorage & storage_def);

    void applyChanges(const SettingsChanges & changes);

private:
    std::unique_ptr<DatabaseIcebergSettingsImpl> impl;
};
}
src/Databases/Iceberg/ICatalog.cpp (new file, 86 lines)
@@ -0,0 +1,86 @@
#include <Databases/Iceberg/ICatalog.h>
#include <Common/Exception.h>
#include <Common/logger_useful.h>
#include <Poco/String.h>

namespace DB::ErrorCodes
{
    extern const int NOT_IMPLEMENTED;
    extern const int LOGICAL_ERROR;
}

namespace Iceberg
{

void TableMetadata::setLocation(const std::string & location_)
{
    if (!with_location)
        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Data location was not requested");

    /// Location has format:
    /// s3://<bucket>/path/to/table/data.
    /// We want to split s3://<bucket> and path/to/table/data.

    auto pos = location_.find("://");
    if (pos == std::string::npos)
        throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unexpected location format: {}", location_);

    auto pos_to_bucket = pos + std::strlen("://");
    auto pos_to_path = location_.substr(pos_to_bucket).find('/');

    if (pos_to_path == std::string::npos)
        throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unexpected location format: {}", location_);

    pos_to_path = pos_to_bucket + pos_to_path;

    location_without_path = location_.substr(0, pos_to_path);
    path = location_.substr(pos_to_path + 1);

    LOG_TEST(getLogger("TableMetadata"),
             "Parsed location without path: {}, path: {}",
             location_without_path, path);
}

std::string TableMetadata::getLocation(bool path_only) const
{
    if (!with_location)
        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Data location was not requested");

    if (path_only)
        return path;
    else
        return std::filesystem::path(location_without_path) / path;
}

void TableMetadata::setSchema(const DB::NamesAndTypesList & schema_)
{
    if (!with_schema)
        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Data schema was not requested");

    schema = schema_;
}

const DB::NamesAndTypesList & TableMetadata::getSchema() const
{
    if (!with_schema)
        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Data schema was not requested");

    return schema;
}

StorageType ICatalog::getStorageType(const std::string & location)
{
    auto pos = location.find("://");
    if (pos == std::string::npos)
        throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unexpected path format: {}", location);

    auto storage_type_str = location.substr(0, pos);
    auto storage_type = magic_enum::enum_cast<StorageType>(Poco::toUpper(storage_type_str));

    if (!storage_type)
        throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unsupported storage type: {}", storage_type_str);

    return *storage_type;
}

}
src/Databases/Iceberg/ICatalog.h (new file, 78 lines)
@@ -0,0 +1,78 @@
#pragma once
#include <Core/Types.h>
#include <Core/NamesAndTypes.h>
#include <Core/SettingsEnums.h>

namespace Iceberg
{
using StorageType = DB::DatabaseIcebergStorageType;

class TableMetadata
{
public:
    TableMetadata() = default;

    TableMetadata & withLocation() { with_location = true; return *this; }
    TableMetadata & withSchema() { with_schema = true; return *this; }

    std::string getLocation(bool path_only) const;
    std::string getLocation() const;
    std::string getLocationWithoutPath() const;

    const DB::NamesAndTypesList & getSchema() const;

    bool requiresLocation() const { return with_location; }
    bool requiresSchema() const { return with_schema; }

    void setLocation(const std::string & location_);
    void setSchema(const DB::NamesAndTypesList & schema_);

private:
    /// starts with s3://, file://, etc
    std::string location_without_path;
    std::string path;
    /// column names and types
    DB::NamesAndTypesList schema;

    bool with_location = false;
    bool with_schema = false;
};


class ICatalog
{
public:
    using Namespaces = std::vector<std::string>;
    using Tables = std::vector<std::string>;

    explicit ICatalog(const std::string & warehouse_) : warehouse(warehouse_) {}

    virtual ~ICatalog() = default;

    virtual bool empty() const = 0;

    virtual Tables getTables() const = 0;

    virtual bool existsTable(
        const std::string & namespace_name,
        const std::string & table_name) const = 0;

    virtual void getTableMetadata(
        const std::string & namespace_name,
        const std::string & table_name,
        TableMetadata & result) const = 0;

    virtual bool tryGetTableMetadata(
        const std::string & namespace_name,
        const std::string & table_name,
        TableMetadata & result) const = 0;

    virtual std::optional<StorageType> getStorageType() const = 0;

protected:
    const std::string warehouse;

    static StorageType getStorageType(const std::string & location);
};

}
src/Databases/Iceberg/RestCatalog.cpp (new file, 540 lines)
@@ -0,0 +1,540 @@
#include <Databases/Iceberg/RestCatalog.h>

#include <base/find_symbols.h>
#include <Common/escapeForFileName.h>
#include <Common/threadPoolCallbackRunner.h>
#include <Common/Base64.h>

#include <IO/ConnectionTimeouts.h>
#include <IO/HTTPCommon.h>
#include <IO/ReadBuffer.h>
#include <Interpreters/Context.h>

#include <Storages/ObjectStorage/DataLakes/IcebergMetadata.h>
#include <Server/HTTP/HTMLForm.h>
#include <Formats/FormatFactory.h>

#include <Poco/URI.h>
#include <Poco/JSON/Array.h>
#include <Poco/JSON/Parser.h>

namespace DB::ErrorCodes
{
    extern const int ICEBERG_CATALOG_ERROR;
    extern const int LOGICAL_ERROR;
    extern const int BAD_ARGUMENTS;
}

namespace CurrentMetrics
{
    extern const Metric IcebergCatalogThreads;
    extern const Metric IcebergCatalogThreadsActive;
    extern const Metric IcebergCatalogThreadsScheduled;
}

namespace Iceberg
{

static constexpr auto config_endpoint = "config";
static constexpr auto namespaces_endpoint = "namespaces";

namespace
{

std::pair<std::string, std::string> parseCatalogCredential(const std::string & catalog_credential)
{
    /// Parse a string of format "<client_id>:<client_secret>"
    /// into separate strings client_id and client_secret.

    std::string client_id, client_secret;
    if (!catalog_credential.empty())
    {
        auto pos = catalog_credential.find(':');
        if (pos == std::string::npos)
        {
            throw DB::Exception(
                DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected format of catalog credential: "
                "expected client_id and client_secret separated by `:`");
        }
        client_id = catalog_credential.substr(0, pos);
        client_secret = catalog_credential.substr(pos + 1);
    }
    return std::pair(client_id, client_secret);
}

DB::HTTPHeaderEntry parseAuthHeader(const std::string & auth_header)
{
    /// Parse a string of format "Authorization: <auth_scheme> <auth_token>"
    /// into a key-value header "Authorization", "<auth_scheme> <auth_token>"

    auto pos = auth_header.find(':');
    if (pos == std::string::npos)
        throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected format of auth header");

    return DB::HTTPHeaderEntry(auth_header.substr(0, pos), auth_header.substr(pos + 1));
}

}

std::string RestCatalog::Config::toString() const
{
    DB::WriteBufferFromOwnString wb;

    if (!prefix.empty())
        wb << "prefix: " << prefix.string() << ", ";

    if (!default_base_location.empty())
        wb << "default_base_location: " << default_base_location << ", ";

    return wb.str();
}

RestCatalog::RestCatalog(
    const std::string & warehouse_,
    const std::string & base_url_,
    const std::string & catalog_credential_,
    const std::string & auth_header_,
    DB::ContextPtr context_)
    : ICatalog(warehouse_)
    , DB::WithContext(context_)
    , base_url(base_url_)
    , log(getLogger("RestCatalog(" + warehouse_ + ")"))
{
    if (!catalog_credential_.empty())
        std::tie(client_id, client_secret) = parseCatalogCredential(catalog_credential_);
    else if (!auth_header_.empty())
        auth_header = parseAuthHeader(auth_header_);

    config = loadConfig();
}

RestCatalog::Config RestCatalog::loadConfig()
{
    Poco::URI::QueryParameters params = {{"warehouse", warehouse}};
    auto buf = createReadBuffer(config_endpoint, params);

    std::string json_str;
    readJSONObjectPossiblyInvalid(json_str, *buf);

    LOG_TEST(log, "Received catalog configuration settings: {}", json_str);

    Poco::JSON::Parser parser;
    Poco::Dynamic::Var json = parser.parse(json_str);
    const Poco::JSON::Object::Ptr & object = json.extract<Poco::JSON::Object::Ptr>();

    Config result;

    auto defaults_object = object->get("defaults").extract<Poco::JSON::Object::Ptr>();
    parseCatalogConfigurationSettings(defaults_object, result);

    auto overrides_object = object->get("overrides").extract<Poco::JSON::Object::Ptr>();
    parseCatalogConfigurationSettings(overrides_object, result);

    LOG_TEST(log, "Parsed catalog configuration settings: {}", result.toString());
    return result;
}

void RestCatalog::parseCatalogConfigurationSettings(const Poco::JSON::Object::Ptr & object, Config & result)
{
    if (!object)
        return;

    if (object->has("prefix"))
        result.prefix = object->get("prefix").extract<String>();

    if (object->has("default-base-location"))
        result.default_base_location = object->get("default-base-location").extract<String>();
}

DB::HTTPHeaderEntries RestCatalog::getHeaders(bool update_token) const
{
    if (auth_header.has_value())
    {
        return DB::HTTPHeaderEntries{auth_header.value()};
    }

    if (!client_id.empty())
    {
        if (!access_token.has_value() || update_token)
        {
            access_token = retrieveAccessToken();
        }

        DB::HTTPHeaderEntries headers;
        headers.emplace_back("Authorization", "Bearer " + access_token.value());
        return headers;
    }

    return {};
}

std::string RestCatalog::retrieveAccessToken() const
{
    static constexpr auto oauth_tokens_endpoint = "oauth/tokens";

    /// TODO:
    /// 1. support oauth2-server-uri
    /// https://github.com/apache/iceberg/blob/918f81f3c3f498f46afcea17c1ac9cdc6913cb5c/open-api/rest-catalog-open-api.yaml#L183C82-L183C99

    Poco::JSON::Object json;
    json.set("grant_type", "client_credentials");
    json.set("scope", "PRINCIPAL_ROLE:ALL"); /// TODO: add it into setting.
    json.set("client_id", client_id);
    json.set("client_secret", client_secret);

    DB::HTTPHeaderEntries headers;
    headers.emplace_back("Content-Type", "application/x-www-form-urlencoded");
    headers.emplace_back("Accepts", "application/json; charset=UTF-8");

    Poco::URI url(base_url / oauth_tokens_endpoint);
    Poco::URI::QueryParameters params = {
        {"grant_type", "client_credentials"},
        {"scope", "PRINCIPAL_ROLE:ALL"},
        {"client_id", client_id},
        {"client_secret", client_secret},
    };
    url.setQueryParameters(params);

    const auto & context = getContext();
    auto wb = DB::BuilderRWBufferFromHTTP(url)
        .withConnectionGroup(DB::HTTPConnectionGroupType::HTTP)
        .withMethod(Poco::Net::HTTPRequest::HTTP_POST)
        .withSettings(context->getReadSettings())
        .withTimeouts(DB::ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), context->getServerSettings()))
        .withHostFilter(&context->getRemoteHostFilter())
        .withSkipNotFound(false)
        .withHeaders(headers)
        .create(credentials);

    std::string json_str;
    readJSONObjectPossiblyInvalid(json_str, *wb);

    Poco::JSON::Parser parser;
    Poco::Dynamic::Var res_json = parser.parse(json_str);
    const Poco::JSON::Object::Ptr & object = res_json.extract<Poco::JSON::Object::Ptr>();

    return object->get("access_token").extract<String>();
}

std::optional<StorageType> RestCatalog::getStorageType() const
{
    if (config.default_base_location.empty())
        return std::nullopt;
    return ICatalog::getStorageType(config.default_base_location);
}

DB::ReadWriteBufferFromHTTPPtr RestCatalog::createReadBuffer(
    const std::string & endpoint,
    const Poco::URI::QueryParameters & params) const
{
    const auto & context = getContext();

    Poco::URI url(base_url / endpoint);
    if (!params.empty())
        url.setQueryParameters(params);

    auto headers = getHeaders(false);

    LOG_TEST(log, "Requesting: {}", url.toString());

    try
    {
        return DB::BuilderRWBufferFromHTTP(url)
            .withConnectionGroup(DB::HTTPConnectionGroupType::HTTP)
            .withSettings(getContext()->getReadSettings())
            .withTimeouts(DB::ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), context->getServerSettings()))
            .withHeaders(headers)
            .withHostFilter(&getContext()->getRemoteHostFilter())
            .withDelayInit(false)
            .withSkipNotFound(false)
            .create(credentials);
    }
    catch (...)
    {
        auto new_headers = getHeaders(true);
        return DB::BuilderRWBufferFromHTTP(url)
            .withConnectionGroup(DB::HTTPConnectionGroupType::HTTP)
            .withSettings(getContext()->getReadSettings())
            .withTimeouts(DB::ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), context->getServerSettings()))
            .withHeaders(new_headers)
            .withHostFilter(&getContext()->getRemoteHostFilter())
            .withDelayInit(false)
            .withSkipNotFound(false)
            .create(credentials);
    }
}

bool RestCatalog::empty() const
{
    try
    {
        bool found_table = false;
        auto stop_condition = [&](const std::string & namespace_name) -> bool
        {
            const auto tables = getTables(namespace_name, /* limit */1);
            found_table = !tables.empty();
            return found_table;
        };

        Namespaces namespaces;
        getNamespacesRecursive("", namespaces, stop_condition, {});

        return found_table;
    }
    catch (...)
    {
        DB::tryLogCurrentException(log);
        return true;
    }
}

RestCatalog::Tables RestCatalog::getTables() const
{
    size_t num_threads = 10;
    ThreadPool pool(
        CurrentMetrics::IcebergCatalogThreads,
        CurrentMetrics::IcebergCatalogThreadsActive,
        CurrentMetrics::IcebergCatalogThreadsScheduled,
        num_threads);

    DB::ThreadPoolCallbackRunnerLocal<void> runner(pool, "RestCatalog");

    Tables tables;
    std::mutex mutex;

    auto func = [&](const std::string & current_namespace)
    {
        runner(
            [&]{
                auto tables_in_namespace = getTables(current_namespace);
                {
                    std::lock_guard lock(mutex);
                    std::move(tables_in_namespace.begin(), tables_in_namespace.end(), std::back_inserter(tables));
                }
            });
    };

    Namespaces namespaces;
    getNamespacesRecursive("", namespaces, {}, func);

    runner.waitForAllToFinishAndRethrowFirstError();
    return tables;
}

void RestCatalog::getNamespacesRecursive(
    const std::string & base_namespace,
    Namespaces & result,
    StopCondition stop_condition,
    ExecuteFunc func) const
{
    auto namespaces = getNamespaces(base_namespace);
    result.reserve(result.size() + namespaces.size());
    result.insert(result.end(), namespaces.begin(), namespaces.end());

    for (const auto & current_namespace : namespaces)
    {
        chassert(current_namespace.starts_with(base_namespace));

        if (stop_condition && stop_condition(current_namespace))
            break;

        if (func)
            func(current_namespace);

        getNamespacesRecursive(current_namespace, result, stop_condition, func);
    }
}

Poco::URI::QueryParameters RestCatalog::createParentNamespaceParams(const std::string & base_namespace) const
{
    std::vector<std::string> parts;
    splitInto<'.'>(parts, base_namespace);
    std::string parent_param;
    for (const auto & part : parts)
    {
        if (!parent_param.empty())
            parent_param += static_cast<char>(0x1F);
        parent_param += part;
    }
    return {{"parent", parent_param}};
}

RestCatalog::Namespaces RestCatalog::getNamespaces(const std::string & base_namespace) const
{
    Poco::URI::QueryParameters params;
    if (!base_namespace.empty())
        params = createParentNamespaceParams(base_namespace);

    try
    {
        auto buf = createReadBuffer(config.prefix / namespaces_endpoint, params);
        auto namespaces = parseNamespaces(*buf, base_namespace);
        LOG_TEST(log, "Loaded {} namespaces", namespaces.size());
        return namespaces;
    }
    catch (const DB::HTTPException & e)
    {
        std::string message = fmt::format(
            "Received error while fetching list of namespaces from iceberg catalog `{}`. ",
            warehouse);

        if (e.code() == Poco::Net::HTTPResponse::HTTPStatus::HTTP_NOT_FOUND)
            message += "Namespace provided in the `parent` query parameter is not found. ";

        message += fmt::format(
            "Code: {}, status: {}, message: {}",
            e.code(), e.getHTTPStatus(), e.displayText());

        throw DB::Exception(DB::ErrorCodes::ICEBERG_CATALOG_ERROR, "{}", message);
    }
}

RestCatalog::Namespaces RestCatalog::parseNamespaces(DB::ReadBuffer & buf, const std::string & base_namespace) const
{
    if (buf.eof())
        return {};

    String json_str;
    readJSONObjectPossiblyInvalid(json_str, buf);

    LOG_TEST(log, "Received response: {}", json_str);

    Poco::JSON::Parser parser;
    Poco::Dynamic::Var json = parser.parse(json_str);
    const Poco::JSON::Object::Ptr & object = json.extract<Poco::JSON::Object::Ptr>();

    auto namespaces_object = object->get("namespaces").extract<Poco::JSON::Array::Ptr>();
    if (!namespaces_object)
        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Cannot parse result");

    Namespaces namespaces;
    for (size_t i = 0; i < namespaces_object->size(); ++i)
    {
        auto current_namespace_array = namespaces_object->get(static_cast<int>(i)).extract<Poco::JSON::Array::Ptr>();
        if (current_namespace_array->size() == 0)
            throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Expected namespace array to be non-empty");

        const int idx = static_cast<int>(current_namespace_array->size()) - 1;
        const auto current_namespace = current_namespace_array->get(idx).extract<String>();
        const auto full_namespace = base_namespace.empty()
            ? current_namespace
            : base_namespace + "." + current_namespace;

        namespaces.push_back(full_namespace);
    }

    return namespaces;
}

RestCatalog::Tables RestCatalog::getTables(const std::string & base_namespace, size_t limit) const
{
    const auto endpoint = std::string(namespaces_endpoint) + "/" + base_namespace + "/tables";
    auto buf = createReadBuffer(config.prefix / endpoint);
    return parseTables(*buf, base_namespace, limit);
}

RestCatalog::Tables RestCatalog::parseTables(DB::ReadBuffer & buf, const std::string & base_namespace, size_t limit) const
{
    if (buf.eof())
        return {};

    String json_str;
    readJSONObjectPossiblyInvalid(json_str, buf);

    Poco::JSON::Parser parser;
    Poco::Dynamic::Var json = parser.parse(json_str);
    const Poco::JSON::Object::Ptr & object = json.extract<Poco::JSON::Object::Ptr>();

    auto identifiers_object = object->get("identifiers").extract<Poco::JSON::Array::Ptr>();
    if (!identifiers_object)
        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Cannot parse result");

    Tables tables;
    for (size_t i = 0; i < identifiers_object->size(); ++i)
    {
        const auto current_table_json = identifiers_object->get(static_cast<int>(i)).extract<Poco::JSON::Object::Ptr>();
        const auto table_name = current_table_json->get("name").extract<String>();

        tables.push_back(base_namespace + "." + table_name);
        if (limit && tables.size() >= limit)
            break;
    }
    return tables;
}

bool RestCatalog::existsTable(const std::string & namespace_name, const std::string & table_name) const
{
    TableMetadata table_metadata;
    return tryGetTableMetadata(namespace_name, table_name, table_metadata);
}

bool RestCatalog::tryGetTableMetadata(
    const std::string & namespace_name,
    const std::string & table_name,
    TableMetadata & result) const
{
    try
    {
        return getTableMetadataImpl(namespace_name, table_name, result);
    }
    catch (...)
    {
        DB::tryLogCurrentException(log);
        return false;
    }
}

void RestCatalog::getTableMetadata(
    const std::string & namespace_name,
    const std::string & table_name,
    TableMetadata & result) const
{
    if (!getTableMetadataImpl(namespace_name, table_name, result))
        throw DB::Exception(DB::ErrorCodes::ICEBERG_CATALOG_ERROR, "No response from iceberg catalog");
}

bool RestCatalog::getTableMetadataImpl(
    const std::string & namespace_name,
    const std::string & table_name,
    TableMetadata & result) const
{
    LOG_TEST(log, "Checking table {} in namespace {}", table_name, namespace_name);

    const auto endpoint = std::string(namespaces_endpoint) + "/" + namespace_name + "/tables/" + table_name;
    auto buf = createReadBuffer(config.prefix / endpoint);

    if (buf->eof())
    {
        LOG_TEST(log, "Table doesn't exist (endpoint: {})", endpoint);
        return false;
    }

    String json_str;
    readJSONObjectPossiblyInvalid(json_str, *buf);

    LOG_TEST(log, "Received metadata for table {}: {}", table_name, json_str);

    Poco::JSON::Parser parser;
    Poco::Dynamic::Var json = parser.parse(json_str);
    const Poco::JSON::Object::Ptr & object = json.extract<Poco::JSON::Object::Ptr>();

    auto metadata_object = object->get("metadata").extract<Poco::JSON::Object::Ptr>();
    if (!metadata_object)
        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Cannot parse result");

    if (result.requiresLocation())
    {
        const auto location = metadata_object->get("location").extract<String>();
        result.setLocation(location);
        LOG_TEST(log, "Location for table {}: {}", table_name, location);
    }

    if (result.requiresSchema())
    {
        int format_version = metadata_object->getValue<int>("format-version");
        result.setSchema(DB::IcebergMetadata::parseTableSchema(metadata_object, format_version, true).first);
    }
    return true;
}

}
src/Databases/Iceberg/RestCatalog.h (new file, 111 lines)
@@ -0,0 +1,111 @@
#pragma once
#include <Databases/Iceberg/ICatalog.h>
#include <Poco/Net/HTTPBasicCredentials.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <IO/HTTPHeaderEntries.h>
#include <Interpreters/Context_fwd.h>
#include <filesystem>
#include <Poco/JSON/Object.h>

namespace DB
{
class ReadBuffer;
}

namespace Iceberg
{

class RestCatalog final : public ICatalog, private DB::WithContext
{
public:
    explicit RestCatalog(
        const std::string & warehouse_,
        const std::string & base_url_,
        const std::string & catalog_credential_,
        const std::string & auth_header_,
        DB::ContextPtr context_);

    ~RestCatalog() override = default;

    bool empty() const override;

    Tables getTables() const override;

    bool existsTable(const std::string & namespace_name, const std::string & table_name) const override;

    void getTableMetadata(
        const std::string & namespace_name,
        const std::string & table_name,
        TableMetadata & result) const override;

    bool tryGetTableMetadata(
        const std::string & namespace_name,
        const std::string & table_name,
        TableMetadata & result) const override;

    std::optional<StorageType> getStorageType() const override;

private:
    struct Config
    {
        /// Prefix is a path of the catalog endpoint,
        /// e.g. /v1/{prefix}/namespaces/{namespace}/tables/{table}
        std::filesystem::path prefix;
        /// Base location is location of data in storage
        /// (in filesystem or object storage).
        std::string default_base_location;

        std::string toString() const;
    };

    const std::filesystem::path base_url;
    const LoggerPtr log;

    /// Catalog configuration settings from /v1/config endpoint.
    Config config;

    /// Auth headers of format: "Authorization": "<auth_scheme> <token>"
    std::optional<DB::HTTPHeaderEntry> auth_header;

    /// Parameters for OAuth.
    std::string client_id;
    std::string client_secret;
    mutable std::optional<std::string> access_token;

    Poco::Net::HTTPBasicCredentials credentials{};

    DB::ReadWriteBufferFromHTTPPtr createReadBuffer(
        const std::string & endpoint,
        const Poco::URI::QueryParameters & params = {}) const;

    Poco::URI::QueryParameters createParentNamespaceParams(const std::string & base_namespace) const;

    using StopCondition = std::function<bool(const std::string & namespace_name)>;
    using ExecuteFunc = std::function<void(const std::string & namespace_name)>;

    void getNamespacesRecursive(
        const std::string & base_namespace,
        Namespaces & result,
        StopCondition stop_condition,
        ExecuteFunc func) const;

    Namespaces getNamespaces(const std::string & base_namespace) const;

    Namespaces parseNamespaces(DB::ReadBuffer & buf, const std::string & base_namespace) const;

    Tables getTables(const std::string & base_namespace, size_t limit = 0) const;

    Tables parseTables(DB::ReadBuffer & buf, const std::string & base_namespace, size_t limit) const;

    bool getTableMetadataImpl(
        const std::string & namespace_name,
        const std::string & table_name,
        TableMetadata & result) const;

    Config loadConfig();
    std::string retrieveAccessToken() const;
    DB::HTTPHeaderEntries getHeaders(bool update_token = false) const;
    static void parseCatalogConfigurationSettings(const Poco::JSON::Object::Ptr & object, Config & result);
};

}
@@ -36,6 +36,10 @@ void registerDatabaseS3(DatabaseFactory & factory);
void registerDatabaseHDFS(DatabaseFactory & factory);
#endif

#if USE_AVRO
void registerDatabaseIceberg(DatabaseFactory & factory);
#endif

void registerDatabases()
{
    auto & factory = DatabaseFactory::instance();
@@ -68,5 +72,9 @@ void registerDatabases()
#if USE_HDFS
    registerDatabaseHDFS(factory);
#endif

#if USE_AVRO
    registerDatabaseIceberg(factory);
#endif
}
}
@@ -70,9 +70,6 @@ IcebergMetadata::IcebergMetadata(
{
}

namespace
{

enum class ManifestEntryStatus : uint8_t
{
    EXISTING = 0,
@@ -248,7 +245,7 @@ DataTypePtr getFieldType(const Poco::JSON::Object::Ptr & field, const String & t

}

std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::Ptr & metadata_object, int format_version, bool ignore_schema_evolution)
std::pair<NamesAndTypesList, Int32> IcebergMetadata::parseTableSchema(const Poco::JSON::Object::Ptr & metadata_object, int format_version, bool ignore_schema_evolution)
{
    Poco::JSON::Object::Ptr schema;
    Int32 current_schema_id;
@@ -313,9 +310,9 @@ std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::P
    for (size_t i = 0; i != fields->size(); ++i)
    {
        auto field = fields->getObject(static_cast<UInt32>(i));
        auto name = field->getValue<String>("name");
        auto column_name = field->getValue<String>("name");
        bool required = field->getValue<bool>("required");
        names_and_types.push_back({name, getFieldType(field, "type", required)});
        names_and_types.push_back({column_name, getFieldType(field, "type", required)});
    }

    return {std::move(names_and_types), current_schema_id};
@@ -380,8 +377,6 @@ std::pair<Int32, String> getMetadataFileAndVersion(
    return *std::max_element(metadata_files_with_versions.begin(), metadata_files_with_versions.end());
}

}

DataLakeMetadataPtr
IcebergMetadata::create(ObjectStoragePtr object_storage, ConfigurationObserverPtr configuration, ContextPtr local_context)
{
@@ -96,6 +96,11 @@ public:

    static DataLakeMetadataPtr create(ObjectStoragePtr object_storage, ConfigurationObserverPtr configuration, ContextPtr local_context);

    static std::pair<NamesAndTypesList, Int32> parseTableSchema(
        const Poco::JSON::Object::Ptr & metadata_object,
        int format_version,
        bool ignore_schema_evolution);

private:
    size_t getVersion() const { return metadata_version; }

@ -84,7 +84,8 @@ StorageObjectStorage::StorageObjectStorage(
    std::optional<FormatSettings> format_settings_,
    LoadingStrictnessLevel mode,
    bool distributed_processing_,
    ASTPtr partition_by_)
    ASTPtr partition_by_,
    bool lazy_init)
    : IStorage(table_id_)
    , configuration(configuration_)
    , object_storage(object_storage_)
@ -95,7 +96,8 @@ StorageObjectStorage::StorageObjectStorage(
{
    try
    {
        configuration->update(object_storage, context);
        if (!lazy_init)
            configuration->update(object_storage, context);
    }
    catch (...)
    {

@ -59,7 +59,8 @@ public:
        std::optional<FormatSettings> format_settings_,
        LoadingStrictnessLevel mode,
        bool distributed_processing_ = false,
        ASTPtr partition_by_ = nullptr);
        ASTPtr partition_by_ = nullptr,
        bool lazy_init = false);

    String getName() const override;

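The new lazy_init flag lets a caller construct the storage object without probing the backing object storage up front, so DatabaseIceberg can resolve tables from the catalog first and defer the potentially slow or failing storage access until a table is actually used. A minimal Python sketch of that deferred-initialization idea, with hypothetical names:

class LazyStorage:
    """Illustrative only: defer the expensive/remote setup until first use."""

    def __init__(self, endpoint, lazy_init=False):
        self.endpoint = endpoint
        self._configured = False
        if not lazy_init:
            self._configure()          # eager path: probe the storage right away

    def _configure(self):
        # stands in for configuration->update(object_storage, context)
        print(f"probing {self.endpoint}")
        self._configured = True

    def read(self):
        if not self._configured:       # lazy path: configure on first access
            self._configure()
        return f"data from {self.endpoint}"

# LazyStorage("s3://bucket", lazy_init=True) constructs instantly even if the bucket is
# unreachable; the probe happens only when read() is first called.
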
@ -0,0 +1,59 @@
services:
  spark-iceberg:
    image: tabulario/spark-iceberg
    container_name: spark-iceberg
    build: spark/
    depends_on:
      - rest
      - minio
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
    ports:
      - 8080:8080
      - 10000:10000
      - 10001:10001
  rest:
    image: tabulario/iceberg-rest
    ports:
      - 8182:8181
    environment:
      - AWS_ACCESS_KEY_ID=minio
      - AWS_SECRET_ACCESS_KEY=minio123
      - AWS_REGION=us-east-1
      - CATALOG_WAREHOUSE=s3://iceberg_data/
      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
      - CATALOG_S3_ENDPOINT=http://minio:9000
  minio:
    image: minio/minio
    container_name: minio
    environment:
      - MINIO_ROOT_USER=minio
      - MINIO_ROOT_PASSWORD=minio123
      - MINIO_DOMAIN=minio
    networks:
      default:
        aliases:
          - warehouse.minio
    ports:
      - 9001:9001
      - 9002:9000
    command: ["server", "/data", "--console-address", ":9001"]
  mc:
    depends_on:
      - minio
    image: minio/mc
    container_name: mc
    environment:
      - AWS_ACCESS_KEY_ID=minio
      - AWS_SECRET_ACCESS_KEY=minio123
      - AWS_REGION=us-east-1
    entrypoint: >
      /bin/sh -c "
      until (/usr/bin/mc config host add minio http://minio:9000 minio minio123) do echo '...waiting...' && sleep 1; done;
      /usr/bin/mc rm -r --force minio/warehouse;
      /usr/bin/mc mb minio/warehouse --ignore-existing;
      /usr/bin/mc policy set public minio/warehouse;
      tail -f /dev/null
      "

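With the port mappings above, the REST catalog is reachable from the test host on 8182 and MinIO's S3 API on 9002. A small connectivity sketch, assuming the credentials and ports declared in this compose file (it mirrors what the integration test's load_catalog_impl helper does later):

import requests
from pyiceberg.catalog import load_catalog

# The REST catalog exposes its settings at /v1/config; a 200 here means it is up.
print(requests.get("http://localhost:8182/v1/config").json())

catalog = load_catalog(
    "demo",
    **{
        "uri": "http://localhost:8182",
        "type": "rest",
        "s3.endpoint": "http://localhost:9002",  # MinIO, per the 9002:9000 mapping above
        "s3.access-key-id": "minio",
        "s3.secret-access-key": "minio123",
    },
)
print(catalog.list_namespaces())
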
@ -14,6 +14,10 @@ services:
    depends_on:
      - proxy1
      - proxy2
    networks:
      default:
        aliases:
          - warehouse.minio

  # HTTP proxies for Minio.
  proxy1:

@ -568,6 +568,7 @@ class ClickHouseCluster:
        self.resolver_logs_dir = os.path.join(self.instances_dir, "resolver")

        self.spark_session = None
        self.with_iceberg_catalog = False

        self.with_azurite = False
        self.azurite_container = "azurite-container"
@ -1464,6 +1465,26 @@ class ClickHouseCluster:
        )
        return self.base_minio_cmd

    def setup_iceberg_catalog_cmd(
        self, instance, env_variables, docker_compose_yml_dir
    ):
        self.base_cmd.extend(
            [
                "--file",
                p.join(
                    docker_compose_yml_dir, "docker_compose_iceberg_rest_catalog.yml"
                ),
            ]
        )
        self.base_iceberg_catalog_cmd = self.compose_cmd(
            "--env-file",
            instance.env_file,
            "--file",
            p.join(docker_compose_yml_dir, "docker_compose_iceberg_rest_catalog.yml"),
        )
        return self.base_iceberg_catalog_cmd
        # return self.base_minio_cmd

    def setup_azurite_cmd(self, instance, env_variables, docker_compose_yml_dir):
        self.with_azurite = True
        env_variables["AZURITE_PORT"] = str(self.azurite_port)
@ -1630,6 +1651,7 @@ class ClickHouseCluster:
        with_hive=False,
        with_coredns=False,
        with_prometheus=False,
        with_iceberg_catalog=False,
        handle_prometheus_remote_write=False,
        handle_prometheus_remote_read=False,
        use_old_analyzer=None,
@ -1733,6 +1755,7 @@ class ClickHouseCluster:
            with_coredns=with_coredns,
            with_cassandra=with_cassandra,
            with_ldap=with_ldap,
            with_iceberg_catalog=with_iceberg_catalog,
            use_old_analyzer=use_old_analyzer,
            server_bin_path=self.server_bin_path,
            odbc_bridge_bin_path=self.odbc_bridge_bin_path,
@ -1922,6 +1945,13 @@ class ClickHouseCluster:
                self.setup_minio_cmd(instance, env_variables, docker_compose_yml_dir)
            )

        if with_iceberg_catalog and not self.with_iceberg_catalog:
            cmds.append(
                self.setup_iceberg_catalog_cmd(
                    instance, env_variables, docker_compose_yml_dir
                )
            )

        if with_azurite and not self.with_azurite:
            cmds.append(
                self.setup_azurite_cmd(instance, env_variables, docker_compose_yml_dir)
@ -3390,6 +3420,7 @@ class ClickHouseInstance:
        with_coredns,
        with_cassandra,
        with_ldap,
        with_iceberg_catalog,
        use_old_analyzer,
        server_bin_path,
        odbc_bridge_bin_path,
  0  tests/integration/test_database_iceberg/__init__.py  Normal file
306  tests/integration/test_database_iceberg/test.py      Normal file

@ -0,0 +1,306 @@
import glob
import json
import logging
import os
import random
import time
import uuid
from datetime import datetime, timedelta

import pyarrow as pa
import pytest
import requests
import urllib3
from minio import Minio
from pyiceberg.catalog import load_catalog
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table.sorting import SortField, SortOrder
from pyiceberg.transforms import DayTransform, IdentityTransform
from pyiceberg.types import (
    DoubleType,
    FloatType,
    NestedField,
    StringType,
    StructType,
    TimestampType,
)

from helpers.cluster import ClickHouseCluster, ClickHouseInstance, is_arm
from helpers.s3_tools import get_file_contents, list_s3_objects, prepare_s3_bucket
from helpers.test_tools import TSV, csv_compare

BASE_URL = "http://rest:8181/v1"
BASE_URL_LOCAL = "http://localhost:8182/v1"
BASE_URL_LOCAL_RAW = "http://localhost:8182"

CATALOG_NAME = "demo"

DEFAULT_SCHEMA = Schema(
    NestedField(
        field_id=1, name="datetime", field_type=TimestampType(), required=False
    ),
    NestedField(field_id=2, name="symbol", field_type=StringType(), required=False),
    NestedField(field_id=3, name="bid", field_type=DoubleType(), required=False),
    NestedField(field_id=4, name="ask", field_type=DoubleType(), required=False),
    NestedField(
        field_id=5,
        name="details",
        field_type=StructType(
            NestedField(
                field_id=4,
                name="created_by",
                field_type=StringType(),
                required=False,
            ),
        ),
        required=False,
    ),
)

DEFAULT_CREATE_TABLE = "CREATE TABLE {}.`{}.{}`\\n(\\n `datetime` Nullable(DateTime64(6)),\\n `symbol` Nullable(String),\\n `bid` Nullable(Float64),\\n `ask` Nullable(Float64),\\n `details` Tuple(created_by Nullable(String))\\n)\\nENGINE = Iceberg(\\'http://minio:9000/warehouse/data/\\', \\'minio\\', \\'[HIDDEN]\\')\n"

DEFAULT_PARTITION_SPEC = PartitionSpec(
    PartitionField(
        source_id=1, field_id=1000, transform=DayTransform(), name="datetime_day"
    )
)

DEFAULT_SORT_ORDER = SortOrder(SortField(source_id=2, transform=IdentityTransform()))

def list_namespaces():
    response = requests.get(f"{BASE_URL_LOCAL}/namespaces")
    if response.status_code == 200:
        return response.json()
    else:
        raise Exception(f"Failed to list namespaces: {response.status_code}")


def load_catalog_impl(started_cluster):
    return load_catalog(
        CATALOG_NAME,
        **{
            "uri": BASE_URL_LOCAL_RAW,
            "type": "rest",
            "s3.endpoint": f"http://localhost:9002",
            "s3.access-key-id": "minio",
            "s3.secret-access-key": "minio123",
        },
    )

def create_table(
    catalog,
    namespace,
    table,
    schema=DEFAULT_SCHEMA,
    partition_spec=DEFAULT_PARTITION_SPEC,
    sort_order=DEFAULT_SORT_ORDER,
):
    return catalog.create_table(
        identifier=f"{namespace}.{table}",
        schema=schema,
        location=f"s3://warehouse/data",
        partition_spec=partition_spec,
        sort_order=sort_order,
    )


def generate_record():
    return {
        "datetime": datetime.now(),
        "symbol": str("kek"),
        "bid": round(random.uniform(100, 200), 2),
        "ask": round(random.uniform(200, 300), 2),
        "details": {"created_by": "Alice Smith"},
    }

def create_clickhouse_iceberg_database(started_cluster, node, name):
    node.query(
        f"""
DROP DATABASE IF EXISTS {name};
CREATE DATABASE {name} ENGINE = Iceberg('{BASE_URL}', 'minio', 'minio123')
SETTINGS catalog_type = 'rest', storage_endpoint = 'http://minio:9000/'
"""
    )

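Once such a database exists, every table registered in the catalog appears under it with its full namespace path as the table name. A few illustrative queries against it (the names are placeholders; the exact assertions live in the tests below):

# Illustrative follow-up queries against the database created above.
node.query("SHOW TABLES FROM demo LIKE '%tableA%'")   # names carry the namespace, e.g. 'ns.tableA'
node.query("SELECT count() FROM demo.`ns.tableA`")    # full namespace path, backquoted
node.query("SHOW CREATE TABLE demo.`ns.tableA`")      # reports ENGINE = Iceberg(...)
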
def print_objects():
    minio_client = Minio(
        f"localhost:9002",
        access_key="minio",
        secret_key="minio123",
        secure=False,
        http_client=urllib3.PoolManager(cert_reqs="CERT_NONE"),
    )

    objects = list(minio_client.list_objects("warehouse", "", recursive=True))
    names = [x.object_name for x in objects]
    names.sort()
    for name in names:
        print(f"Found object: {name}")

@pytest.fixture(scope="module")
def started_cluster():
    try:
        cluster = ClickHouseCluster(__file__)
        cluster.add_instance(
            "node1",
            main_configs=[],
            user_configs=[],
            stay_alive=True,
            with_iceberg_catalog=True,
        )

        logging.info("Starting cluster...")
        cluster.start()

        # TODO: properly wait for container
        time.sleep(10)

        yield cluster

    finally:
        cluster.shutdown()

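The fixed time.sleep(10) above is acknowledged as a stopgap by the TODO; a sturdier alternative is to poll the catalog's /v1/config endpoint until it answers. A small sketch using the same requests and time imports, assuming the 8182 host mapping from the compose file:

def wait_for_rest_catalog(url="http://localhost:8182/v1/config", timeout=60.0):
    """Poll the REST catalog until it responds instead of sleeping a fixed time."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            if requests.get(url, timeout=2).status_code == 200:
                return
        except requests.exceptions.RequestException:
            pass
        time.sleep(1)
    raise TimeoutError(f"Iceberg REST catalog at {url} did not become ready")
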
def test_list_tables(started_cluster):
    node = started_cluster.instances["node1"]

    root_namespace = f"clickhouse_{uuid.uuid4()}"
    namespace_1 = f"{root_namespace}.testA.A"
    namespace_2 = f"{root_namespace}.testB.B"
    namespace_1_tables = ["tableA", "tableB"]
    namespace_2_tables = ["tableC", "tableD"]

    catalog = load_catalog_impl(started_cluster)

    for namespace in [namespace_1, namespace_2]:
        catalog.create_namespace(namespace)

    found = False
    for namespace_list in list_namespaces()["namespaces"]:
        if root_namespace == namespace_list[0]:
            found = True
            break
    assert found

    found = False
    for namespace_list in catalog.list_namespaces():
        if root_namespace == namespace_list[0]:
            found = True
            break
    assert found

    for namespace in [namespace_1, namespace_2]:
        assert len(catalog.list_tables(namespace)) == 0

    create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)

    tables_list = ""
    for table in namespace_1_tables:
        create_table(catalog, namespace_1, table)
        if len(tables_list) > 0:
            tables_list += "\n"
        tables_list += f"{namespace_1}.{table}"

    for table in namespace_2_tables:
        create_table(catalog, namespace_2, table)
        if len(tables_list) > 0:
            tables_list += "\n"
        tables_list += f"{namespace_2}.{table}"

    assert (
        tables_list
        == node.query(
            f"SELECT name FROM system.tables WHERE database = '{CATALOG_NAME}' and name ILIKE '{root_namespace}%' ORDER BY name"
        ).strip()
    )
    node.restart_clickhouse()
    assert (
        tables_list
        == node.query(
            f"SELECT name FROM system.tables WHERE database = '{CATALOG_NAME}' and name ILIKE '{root_namespace}%' ORDER BY name"
        ).strip()
    )

    expected = DEFAULT_CREATE_TABLE.format(CATALOG_NAME, namespace_2, "tableC")
    assert expected == node.query(
        f"SHOW CREATE TABLE {CATALOG_NAME}.`{namespace_2}.tableC`"
    )

def test_many_namespaces(started_cluster):
    node = started_cluster.instances["node1"]
    root_namespace_1 = f"A_{uuid.uuid4()}"
    root_namespace_2 = f"B_{uuid.uuid4()}"
    namespaces = [
        f"{root_namespace_1}",
        f"{root_namespace_1}.B.C",
        f"{root_namespace_1}.B.C.D",
        f"{root_namespace_1}.B.C.D.E",
        f"{root_namespace_2}",
        f"{root_namespace_2}.C",
        f"{root_namespace_2}.CC",
    ]
    tables = ["A", "B", "C"]
    catalog = load_catalog_impl(started_cluster)

    for namespace in namespaces:
        catalog.create_namespace(namespace)
        for table in tables:
            create_table(catalog, namespace, table)

    create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)

    for namespace in namespaces:
        for table in tables:
            table_name = f"{namespace}.{table}"
            assert int(
                node.query(
                    f"SELECT count() FROM system.tables WHERE database = '{CATALOG_NAME}' and name = '{table_name}'"
                )
            )

def test_select(started_cluster):
    node = started_cluster.instances["node1"]

    test_ref = f"test_list_tables_{uuid.uuid4()}"
    table_name = f"{test_ref}_table"
    root_namespace = f"{test_ref}_namespace"

    namespace = f"{root_namespace}.A.B.C"
    namespaces_to_create = [
        root_namespace,
        f"{root_namespace}.A",
        f"{root_namespace}.A.B",
        f"{root_namespace}.A.B.C",
    ]

    catalog = load_catalog_impl(started_cluster)

    for namespace in namespaces_to_create:
        catalog.create_namespace(namespace)
        assert len(catalog.list_tables(namespace)) == 0

    table = create_table(catalog, namespace, table_name)

    num_rows = 10
    data = [generate_record() for _ in range(num_rows)]
    df = pa.Table.from_pylist(data)
    table.append(df)

    create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)

    expected = DEFAULT_CREATE_TABLE.format(CATALOG_NAME, namespace, table_name)
    assert expected == node.query(
        f"SHOW CREATE TABLE {CATALOG_NAME}.`{namespace}.{table_name}`"
    )

    assert num_rows == int(
        node.query(f"SELECT count() FROM {CATALOG_NAME}.`{namespace}.{table_name}`")
    )