diff --git a/src/Databases/Iceberg/DatabaseIceberg.cpp b/src/Databases/Iceberg/DatabaseIceberg.cpp index 81c39554635..311ae9bf2dc 100644 --- a/src/Databases/Iceberg/DatabaseIceberg.cpp +++ b/src/Databases/Iceberg/DatabaseIceberg.cpp @@ -101,6 +101,13 @@ std::shared_ptr DatabaseIceberg::getConfigu } } +std::string DatabaseIceberg::getStorageEndpointForTable(const Iceberg::TableMetadata & table_metadata) const +{ + return std::filesystem::path(settings[DatabaseIcebergSetting::storage_endpoint].value) + / table_metadata.getPath() + / ""; +} + bool DatabaseIceberg::empty() const { return getCatalog(Context::getGlobalContextInstance())->empty(); @@ -115,7 +122,7 @@ bool DatabaseIceberg::isTableExist(const String & name, ContextPtr context_) con StoragePtr DatabaseIceberg::tryGetTable(const String & name, ContextPtr context_) const { auto catalog = getCatalog(context_); - auto table_metadata = Iceberg::ICatalog::TableMetadata().withLocation().withSchema(); + auto table_metadata = Iceberg::TableMetadata().withLocation().withSchema(); auto [namespace_name, table_name] = parseTableName(name); if (!catalog->tryGetTableMetadata(namespace_name, table_name, table_metadata)) @@ -126,17 +133,16 @@ StoragePtr DatabaseIceberg::tryGetTable(const String & name, ContextPtr context_ ASTs args = storage->engine->arguments->children; /// Replace Iceberg Catalog endpoint with storage path endpoint of requested table. - auto table_endpoint = std::filesystem::path(settings[DatabaseIcebergSetting::storage_endpoint].value) - / table_metadata.getPath() - / ""; + auto table_endpoint = getStorageEndpointForTable(table_metadata); + args[0] = std::make_shared(table_endpoint); - args[0] = std::make_shared(table_endpoint.string()); - - LOG_TEST(log, "Using table endpoint: {}", table_endpoint.string()); + LOG_TEST(log, "Using table endpoint: {}", table_endpoint); const auto columns = ColumnsDescription(table_metadata.getSchema()); const auto configuration = getConfiguration(); - /// with_table_structure = false: because there will be no table structure in table definition AST. + + /// with_table_structure = false: because there will be + /// no table structure in table definition AST. StorageObjectStorage::Configuration::initialize(*configuration, args, context_, /* with_table_structure */false); return std::make_shared( @@ -184,8 +190,9 @@ ASTPtr DatabaseIceberg::getCreateTableQueryImpl( bool /* throw_on_error */) const { auto catalog = getCatalog(context_); - auto table_metadata = Iceberg::ICatalog::TableMetadata().withLocation().withSchema(); - auto [namespace_name, table_name] = parseTableName(name); + auto table_metadata = Iceberg::TableMetadata().withLocation().withSchema(); + + const auto [namespace_name, table_name] = parseTableName(name); catalog->getTableMetadata(namespace_name, table_name, table_metadata); auto create_table_query = std::make_shared(); @@ -203,7 +210,6 @@ ASTPtr DatabaseIceberg::getCreateTableQueryImpl( columns_declare_list->set(columns_declare_list->columns, columns_expression_list); create_table_query->set(create_table_query->columns_list, columns_declare_list); - /// init create query. create_table_query->setTable(name); create_table_query->setDatabase(getDatabaseName()); @@ -217,10 +223,14 @@ ASTPtr DatabaseIceberg::getCreateTableQueryImpl( auto storage_engine_arguments = storage->engine->arguments; if (storage_engine_arguments->children.empty()) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected number of arguments: {}", storage_engine_arguments->children.size()); + { + throw Exception( + ErrorCodes::BAD_ARGUMENTS, "Unexpected number of arguments: {}", + storage_engine_arguments->children.size()); + } - auto table_endpoint = std::filesystem::path(settings[DatabaseIcebergSetting::storage_endpoint].value) / table_metadata.getPath(); - storage_engine_arguments->children[0] = std::make_shared(table_endpoint.string()); + auto table_endpoint = getStorageEndpointForTable(table_metadata); + storage_engine_arguments->children[0] = std::make_shared(table_endpoint); return create_table_query; } diff --git a/src/Databases/Iceberg/DatabaseIceberg.h b/src/Databases/Iceberg/DatabaseIceberg.h index 41ed17f58d0..5a87e3179af 100644 --- a/src/Databases/Iceberg/DatabaseIceberg.h +++ b/src/Databases/Iceberg/DatabaseIceberg.h @@ -10,6 +10,10 @@ namespace DB { +/// TODO: +/// - http basic auth for catalog +/// - tests with azure, hdfs, local + class DatabaseIceberg final : public IDatabase, WithContext { public: @@ -20,7 +24,6 @@ public: ASTPtr database_engine_definition_); String getEngineName() const override { return "Iceberg"; } - String getMetadataPath() const override { return ""; } bool canContainMergeTreeTables() const override { return false; } bool canContainDistributedTables() const override { return false; } @@ -44,13 +47,18 @@ protected: ASTPtr getCreateTableQueryImpl(const String & table_name, ContextPtr context, bool throw_on_error) const override; private: + /// Iceberg Catalog url. const std::string url; + /// SETTINGS from CREATE query. const DatabaseIcebergSettings settings; + /// Database engine definition taken from initial CREATE DATABASE query. const ASTPtr database_engine_definition; + const LoggerPtr log; std::unique_ptr getCatalog(ContextPtr context_) const; std::shared_ptr getConfiguration() const; + std::string getStorageEndpointForTable(const Iceberg::TableMetadata & table_metadata) const; }; } diff --git a/src/Databases/Iceberg/ICatalog.cpp b/src/Databases/Iceberg/ICatalog.cpp index a67d88fc555..d07d9613e1a 100644 --- a/src/Databases/Iceberg/ICatalog.cpp +++ b/src/Databases/Iceberg/ICatalog.cpp @@ -10,7 +10,7 @@ namespace DB::ErrorCodes namespace Iceberg { -std::string ICatalog::TableMetadata::getPath() const +std::string TableMetadata::getPath() const { if (!with_location) throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Data location was not requested"); @@ -21,7 +21,7 @@ std::string ICatalog::TableMetadata::getPath() const throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unexpected path format: {}", location); } -const DB::NamesAndTypesList & ICatalog::TableMetadata::getSchema() const +const DB::NamesAndTypesList & TableMetadata::getSchema() const { if (!with_schema) throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Data location was not requested"); diff --git a/src/Databases/Iceberg/ICatalog.h b/src/Databases/Iceberg/ICatalog.h index a9e8c24b01b..7bd2cf3010c 100644 --- a/src/Databases/Iceberg/ICatalog.h +++ b/src/Databases/Iceberg/ICatalog.h @@ -5,22 +5,41 @@ namespace Iceberg { +class TableMetadata +{ +friend class RestCatalog; + +public: + TableMetadata() = default; + + std::string getPath() const; + + const DB::NamesAndTypesList & getSchema() const; + + TableMetadata & withLocation() { with_location = true; return *this; } + TableMetadata & withSchema() { with_schema = true; return *this; } + +private: + /// starts with s3://, file://, etc + std::string location; + /// column names and types + DB::NamesAndTypesList schema; + + bool with_location = false; + bool with_schema = false; +}; + + class ICatalog { public: - using Namespace = std::string; - using Namespaces = std::vector; - using Table = std::string; - using Tables = std::vector; - - class TableMetadata; + using Namespaces = std::vector; + using Tables = std::vector; explicit ICatalog(const std::string & catalog_name_) : catalog_name(catalog_name_) {} virtual ~ICatalog() = default; - virtual bool existsCatalog() const = 0; - virtual bool empty() const = 0; virtual Tables getTables() const = 0; @@ -43,27 +62,4 @@ protected: const std::string catalog_name; }; -class ICatalog::TableMetadata -{ -friend class RestCatalog; - -public: - TableMetadata() = default; - - std::string getPath() const; - - const DB::NamesAndTypesList & getSchema() const; - - TableMetadata & withLocation() { with_location = true; return *this; } - TableMetadata & withSchema() { with_schema = true; return *this; } - -private: - /// starts with s3://, file://, etc - std::string location; - /// column names and types - DB::NamesAndTypesList schema; - - bool with_location = false; - bool with_schema = false; -}; } diff --git a/src/Databases/Iceberg/RestCatalog.cpp b/src/Databases/Iceberg/RestCatalog.cpp index 86abec72c29..729ed641494 100644 --- a/src/Databases/Iceberg/RestCatalog.cpp +++ b/src/Databases/Iceberg/RestCatalog.cpp @@ -38,20 +38,6 @@ RestCatalog::RestCatalog( { } -bool RestCatalog::existsCatalog() const -{ - try - { - createReadBuffer(namespaces_endpoint)->eof(); - return true; - } - catch (...) - { - DB::tryLogCurrentException(log); - return false; - } -} - bool RestCatalog::empty() const { try @@ -108,7 +94,7 @@ RestCatalog::Tables RestCatalog::getTables() const return tables; } -void RestCatalog::getNamespacesRecursive(const Namespace & base_namespace, Namespaces & result, StopCondition stop_condition) const +void RestCatalog::getNamespacesRecursive(const std::string & base_namespace, Namespaces & result, StopCondition stop_condition) const { auto namespaces = getNamespaces(base_namespace); result.reserve(result.size() + namespaces.size()); @@ -194,16 +180,19 @@ RestCatalog::Namespaces RestCatalog::parseNamespaces(DB::ReadBuffer & buf, const if (current_namespace_array->size() == 0) throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Expected namespace array to be non-empty"); - const int current_namespace_idx = static_cast(current_namespace_array->size()) - 1; - const auto current_namespace = current_namespace_array->get(current_namespace_idx).extract(); - const auto full_namespace = base_namespace.empty() ? current_namespace : base_namespace + "." + current_namespace; + const int idx = static_cast(current_namespace_array->size()) - 1; + const auto current_namespace = current_namespace_array->get(idx).extract(); + const auto full_namespace = base_namespace.empty() + ? current_namespace + : base_namespace + "." + current_namespace; + namespaces.push_back(full_namespace); } return namespaces; } -RestCatalog::Tables RestCatalog::getTables(const Namespace & base_namespace, size_t limit) const +RestCatalog::Tables RestCatalog::getTables(const std::string & base_namespace, size_t limit) const { const auto endpoint = std::string(namespaces_endpoint) + "/" + base_namespace + "/tables"; auto buf = createReadBuffer(endpoint); diff --git a/src/Databases/Iceberg/RestCatalog.h b/src/Databases/Iceberg/RestCatalog.h index e83aa8eabe9..12ff31ad8d1 100644 --- a/src/Databases/Iceberg/RestCatalog.h +++ b/src/Databases/Iceberg/RestCatalog.h @@ -23,8 +23,6 @@ public: ~RestCatalog() override = default; - bool existsCatalog() const override; - bool empty() const override; Tables getTables() const override; @@ -51,13 +49,13 @@ private: Poco::URI::QueryParameters createParentNamespaceParams(const std::string & base_namespace) const; using StopCondition = std::function; - void getNamespacesRecursive(const Namespace & base_namespace, Namespaces & result, StopCondition stop_condition) const; + void getNamespacesRecursive(const std::string & base_namespace, Namespaces & result, StopCondition stop_condition) const; - Namespaces getNamespaces(const Namespace & base_namespace) const; + Namespaces getNamespaces(const std::string & base_namespace) const; Namespaces parseNamespaces(DB::ReadBuffer & buf, const std::string & base_namespace) const; - Tables getTables(const Namespace & base_namespace, size_t limit = 0) const; + Tables getTables(const std::string & base_namespace, size_t limit = 0) const; Tables parseTables(DB::ReadBuffer & buf, const std::string & base_namespace, size_t limit) const;