This commit is contained in:
kssenii 2024-11-13 20:30:33 +01:00
parent 58edfbe113
commit 6c3003c6ce
6 changed files with 73 additions and 72 deletions

View File

@ -101,6 +101,13 @@ std::shared_ptr<StorageObjectStorage::Configuration> DatabaseIceberg::getConfigu
} }
} }
std::string DatabaseIceberg::getStorageEndpointForTable(const Iceberg::TableMetadata & table_metadata) const
{
return std::filesystem::path(settings[DatabaseIcebergSetting::storage_endpoint].value)
/ table_metadata.getPath()
/ "";
}
bool DatabaseIceberg::empty() const bool DatabaseIceberg::empty() const
{ {
return getCatalog(Context::getGlobalContextInstance())->empty(); return getCatalog(Context::getGlobalContextInstance())->empty();
@ -115,7 +122,7 @@ bool DatabaseIceberg::isTableExist(const String & name, ContextPtr context_) con
StoragePtr DatabaseIceberg::tryGetTable(const String & name, ContextPtr context_) const StoragePtr DatabaseIceberg::tryGetTable(const String & name, ContextPtr context_) const
{ {
auto catalog = getCatalog(context_); auto catalog = getCatalog(context_);
auto table_metadata = Iceberg::ICatalog::TableMetadata().withLocation().withSchema(); auto table_metadata = Iceberg::TableMetadata().withLocation().withSchema();
auto [namespace_name, table_name] = parseTableName(name); auto [namespace_name, table_name] = parseTableName(name);
if (!catalog->tryGetTableMetadata(namespace_name, table_name, table_metadata)) if (!catalog->tryGetTableMetadata(namespace_name, table_name, table_metadata))
@ -126,17 +133,16 @@ StoragePtr DatabaseIceberg::tryGetTable(const String & name, ContextPtr context_
ASTs args = storage->engine->arguments->children; ASTs args = storage->engine->arguments->children;
/// Replace Iceberg Catalog endpoint with storage path endpoint of requested table. /// Replace Iceberg Catalog endpoint with storage path endpoint of requested table.
auto table_endpoint = std::filesystem::path(settings[DatabaseIcebergSetting::storage_endpoint].value) auto table_endpoint = getStorageEndpointForTable(table_metadata);
/ table_metadata.getPath() args[0] = std::make_shared<ASTLiteral>(table_endpoint);
/ "";
args[0] = std::make_shared<ASTLiteral>(table_endpoint.string()); LOG_TEST(log, "Using table endpoint: {}", table_endpoint);
LOG_TEST(log, "Using table endpoint: {}", table_endpoint.string());
const auto columns = ColumnsDescription(table_metadata.getSchema()); const auto columns = ColumnsDescription(table_metadata.getSchema());
const auto configuration = getConfiguration(); const auto configuration = getConfiguration();
/// with_table_structure = false: because there will be no table structure in table definition AST.
/// with_table_structure = false: because there will be
/// no table structure in table definition AST.
StorageObjectStorage::Configuration::initialize(*configuration, args, context_, /* with_table_structure */false); StorageObjectStorage::Configuration::initialize(*configuration, args, context_, /* with_table_structure */false);
return std::make_shared<StorageObjectStorage>( return std::make_shared<StorageObjectStorage>(
@ -184,8 +190,9 @@ ASTPtr DatabaseIceberg::getCreateTableQueryImpl(
bool /* throw_on_error */) const bool /* throw_on_error */) const
{ {
auto catalog = getCatalog(context_); auto catalog = getCatalog(context_);
auto table_metadata = Iceberg::ICatalog::TableMetadata().withLocation().withSchema(); auto table_metadata = Iceberg::TableMetadata().withLocation().withSchema();
auto [namespace_name, table_name] = parseTableName(name);
const auto [namespace_name, table_name] = parseTableName(name);
catalog->getTableMetadata(namespace_name, table_name, table_metadata); catalog->getTableMetadata(namespace_name, table_name, table_metadata);
auto create_table_query = std::make_shared<ASTCreateQuery>(); auto create_table_query = std::make_shared<ASTCreateQuery>();
@ -203,7 +210,6 @@ ASTPtr DatabaseIceberg::getCreateTableQueryImpl(
columns_declare_list->set(columns_declare_list->columns, columns_expression_list); columns_declare_list->set(columns_declare_list->columns, columns_expression_list);
create_table_query->set(create_table_query->columns_list, columns_declare_list); create_table_query->set(create_table_query->columns_list, columns_declare_list);
/// init create query.
create_table_query->setTable(name); create_table_query->setTable(name);
create_table_query->setDatabase(getDatabaseName()); create_table_query->setDatabase(getDatabaseName());
@ -217,10 +223,14 @@ ASTPtr DatabaseIceberg::getCreateTableQueryImpl(
auto storage_engine_arguments = storage->engine->arguments; auto storage_engine_arguments = storage->engine->arguments;
if (storage_engine_arguments->children.empty()) if (storage_engine_arguments->children.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected number of arguments: {}", storage_engine_arguments->children.size()); {
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "Unexpected number of arguments: {}",
storage_engine_arguments->children.size());
}
auto table_endpoint = std::filesystem::path(settings[DatabaseIcebergSetting::storage_endpoint].value) / table_metadata.getPath(); auto table_endpoint = getStorageEndpointForTable(table_metadata);
storage_engine_arguments->children[0] = std::make_shared<ASTLiteral>(table_endpoint.string()); storage_engine_arguments->children[0] = std::make_shared<ASTLiteral>(table_endpoint);
return create_table_query; return create_table_query;
} }

View File

@ -10,6 +10,10 @@
namespace DB namespace DB
{ {
/// TODO:
/// - http basic auth for catalog
/// - tests with azure, hdfs, local
class DatabaseIceberg final : public IDatabase, WithContext class DatabaseIceberg final : public IDatabase, WithContext
{ {
public: public:
@ -20,7 +24,6 @@ public:
ASTPtr database_engine_definition_); ASTPtr database_engine_definition_);
String getEngineName() const override { return "Iceberg"; } String getEngineName() const override { return "Iceberg"; }
String getMetadataPath() const override { return ""; }
bool canContainMergeTreeTables() const override { return false; } bool canContainMergeTreeTables() const override { return false; }
bool canContainDistributedTables() const override { return false; } bool canContainDistributedTables() const override { return false; }
@ -44,13 +47,18 @@ protected:
ASTPtr getCreateTableQueryImpl(const String & table_name, ContextPtr context, bool throw_on_error) const override; ASTPtr getCreateTableQueryImpl(const String & table_name, ContextPtr context, bool throw_on_error) const override;
private: private:
/// Iceberg Catalog url.
const std::string url; const std::string url;
/// SETTINGS from CREATE query.
const DatabaseIcebergSettings settings; const DatabaseIcebergSettings settings;
/// Database engine definition taken from initial CREATE DATABASE query.
const ASTPtr database_engine_definition; const ASTPtr database_engine_definition;
const LoggerPtr log; const LoggerPtr log;
std::unique_ptr<Iceberg::ICatalog> getCatalog(ContextPtr context_) const; std::unique_ptr<Iceberg::ICatalog> getCatalog(ContextPtr context_) const;
std::shared_ptr<StorageObjectStorage::Configuration> getConfiguration() const; std::shared_ptr<StorageObjectStorage::Configuration> getConfiguration() const;
std::string getStorageEndpointForTable(const Iceberg::TableMetadata & table_metadata) const;
}; };
} }

View File

@ -10,7 +10,7 @@ namespace DB::ErrorCodes
namespace Iceberg namespace Iceberg
{ {
std::string ICatalog::TableMetadata::getPath() const std::string TableMetadata::getPath() const
{ {
if (!with_location) if (!with_location)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Data location was not requested"); throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Data location was not requested");
@ -21,7 +21,7 @@ std::string ICatalog::TableMetadata::getPath() const
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unexpected path format: {}", location); throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unexpected path format: {}", location);
} }
const DB::NamesAndTypesList & ICatalog::TableMetadata::getSchema() const const DB::NamesAndTypesList & TableMetadata::getSchema() const
{ {
if (!with_schema) if (!with_schema)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Data location was not requested"); throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Data location was not requested");

View File

@ -5,22 +5,41 @@
namespace Iceberg namespace Iceberg
{ {
class TableMetadata
{
friend class RestCatalog;
public:
TableMetadata() = default;
std::string getPath() const;
const DB::NamesAndTypesList & getSchema() const;
TableMetadata & withLocation() { with_location = true; return *this; }
TableMetadata & withSchema() { with_schema = true; return *this; }
private:
/// starts with s3://, file://, etc
std::string location;
/// column names and types
DB::NamesAndTypesList schema;
bool with_location = false;
bool with_schema = false;
};
class ICatalog class ICatalog
{ {
public: public:
using Namespace = std::string; using Namespaces = std::vector<std::string>;
using Namespaces = std::vector<Namespace>; using Tables = std::vector<std::string>;
using Table = std::string;
using Tables = std::vector<Table>;
class TableMetadata;
explicit ICatalog(const std::string & catalog_name_) : catalog_name(catalog_name_) {} explicit ICatalog(const std::string & catalog_name_) : catalog_name(catalog_name_) {}
virtual ~ICatalog() = default; virtual ~ICatalog() = default;
virtual bool existsCatalog() const = 0;
virtual bool empty() const = 0; virtual bool empty() const = 0;
virtual Tables getTables() const = 0; virtual Tables getTables() const = 0;
@ -43,27 +62,4 @@ protected:
const std::string catalog_name; const std::string catalog_name;
}; };
class ICatalog::TableMetadata
{
friend class RestCatalog;
public:
TableMetadata() = default;
std::string getPath() const;
const DB::NamesAndTypesList & getSchema() const;
TableMetadata & withLocation() { with_location = true; return *this; }
TableMetadata & withSchema() { with_schema = true; return *this; }
private:
/// starts with s3://, file://, etc
std::string location;
/// column names and types
DB::NamesAndTypesList schema;
bool with_location = false;
bool with_schema = false;
};
} }

View File

@ -38,20 +38,6 @@ RestCatalog::RestCatalog(
{ {
} }
bool RestCatalog::existsCatalog() const
{
try
{
createReadBuffer(namespaces_endpoint)->eof();
return true;
}
catch (...)
{
DB::tryLogCurrentException(log);
return false;
}
}
bool RestCatalog::empty() const bool RestCatalog::empty() const
{ {
try try
@ -108,7 +94,7 @@ RestCatalog::Tables RestCatalog::getTables() const
return tables; return tables;
} }
void RestCatalog::getNamespacesRecursive(const Namespace & base_namespace, Namespaces & result, StopCondition stop_condition) const void RestCatalog::getNamespacesRecursive(const std::string & base_namespace, Namespaces & result, StopCondition stop_condition) const
{ {
auto namespaces = getNamespaces(base_namespace); auto namespaces = getNamespaces(base_namespace);
result.reserve(result.size() + namespaces.size()); result.reserve(result.size() + namespaces.size());
@ -194,16 +180,19 @@ RestCatalog::Namespaces RestCatalog::parseNamespaces(DB::ReadBuffer & buf, const
if (current_namespace_array->size() == 0) if (current_namespace_array->size() == 0)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Expected namespace array to be non-empty"); throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Expected namespace array to be non-empty");
const int current_namespace_idx = static_cast<int>(current_namespace_array->size()) - 1; const int idx = static_cast<int>(current_namespace_array->size()) - 1;
const auto current_namespace = current_namespace_array->get(current_namespace_idx).extract<String>(); const auto current_namespace = current_namespace_array->get(idx).extract<String>();
const auto full_namespace = base_namespace.empty() ? current_namespace : base_namespace + "." + current_namespace; const auto full_namespace = base_namespace.empty()
? current_namespace
: base_namespace + "." + current_namespace;
namespaces.push_back(full_namespace); namespaces.push_back(full_namespace);
} }
return namespaces; return namespaces;
} }
RestCatalog::Tables RestCatalog::getTables(const Namespace & base_namespace, size_t limit) const RestCatalog::Tables RestCatalog::getTables(const std::string & base_namespace, size_t limit) const
{ {
const auto endpoint = std::string(namespaces_endpoint) + "/" + base_namespace + "/tables"; const auto endpoint = std::string(namespaces_endpoint) + "/" + base_namespace + "/tables";
auto buf = createReadBuffer(endpoint); auto buf = createReadBuffer(endpoint);

View File

@ -23,8 +23,6 @@ public:
~RestCatalog() override = default; ~RestCatalog() override = default;
bool existsCatalog() const override;
bool empty() const override; bool empty() const override;
Tables getTables() const override; Tables getTables() const override;
@ -51,13 +49,13 @@ private:
Poco::URI::QueryParameters createParentNamespaceParams(const std::string & base_namespace) const; Poco::URI::QueryParameters createParentNamespaceParams(const std::string & base_namespace) const;
using StopCondition = std::function<bool(const std::string & namespace_name)>; using StopCondition = std::function<bool(const std::string & namespace_name)>;
void getNamespacesRecursive(const Namespace & base_namespace, Namespaces & result, StopCondition stop_condition) const; void getNamespacesRecursive(const std::string & base_namespace, Namespaces & result, StopCondition stop_condition) const;
Namespaces getNamespaces(const Namespace & base_namespace) const; Namespaces getNamespaces(const std::string & base_namespace) const;
Namespaces parseNamespaces(DB::ReadBuffer & buf, const std::string & base_namespace) const; Namespaces parseNamespaces(DB::ReadBuffer & buf, const std::string & base_namespace) const;
Tables getTables(const Namespace & base_namespace, size_t limit = 0) const; Tables getTables(const std::string & base_namespace, size_t limit = 0) const;
Tables parseTables(DB::ReadBuffer & buf, const std::string & base_namespace, size_t limit) const; Tables parseTables(DB::ReadBuffer & buf, const std::string & base_namespace, size_t limit) const;