kssenii 2024-11-13 19:58:04 +01:00
parent 1e8ee2034a
commit 6dfd4ad942
12 changed files with 213 additions and 47 deletions

View File

@ -287,4 +287,11 @@ IMPLEMENT_SETTING_ENUM(
IMPLEMENT_SETTING_ENUM(DatabaseIcebergCatalogType, ErrorCodes::BAD_ARGUMENTS,
{{"rest", DatabaseIcebergCatalogType::REST}})
IMPLEMENT_SETTING_ENUM(DatabaseIcebergStorageType, ErrorCodes::BAD_ARGUMENTS,
{{"s3", DatabaseIcebergStorageType::S3},
{"azure", DatabaseIcebergStorageType::Azure},
{"hdfs", DatabaseIcebergStorageType::HDFS},
{"local", DatabaseIcebergStorageType::Local},
})
}

View File

@ -367,4 +367,15 @@ enum class DatabaseIcebergCatalogType : uint8_t
};
DECLARE_SETTING_ENUM(DatabaseIcebergCatalogType)
enum class DatabaseIcebergStorageType : uint8_t
{
S3,
Azure,
Local,
HDFS,
};
DECLARE_SETTING_ENUM(DatabaseIcebergStorageType)
}
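
Taken together, the DECLARE_SETTING_ENUM / IMPLEMENT_SETTING_ENUM pair generates the string-to-enum resolution for the new storage_type setting. A rough hand-rolled sketch of what that resolution does (not the actual macro expansion; the real code reports ErrorCodes::BAD_ARGUMENTS on an unknown value):

#include <cstdint>
#include <stdexcept>
#include <string>
#include <unordered_map>

enum class DatabaseIcebergStorageType : uint8_t { S3, Azure, Local, HDFS };

/// Hand-rolled analogue of the lookup that IMPLEMENT_SETTING_ENUM generates.
DatabaseIcebergStorageType parseStorageType(const std::string & value)
{
    static const std::unordered_map<std::string, DatabaseIcebergStorageType> mapping = {
        {"s3", DatabaseIcebergStorageType::S3},
        {"azure", DatabaseIcebergStorageType::Azure},
        {"hdfs", DatabaseIcebergStorageType::HDFS},
        {"local", DatabaseIcebergStorageType::Local},
    };
    const auto it = mapping.find(value);
    if (it == mapping.end())
        throw std::invalid_argument("Unknown storage type: " + value); /// stands in for ErrorCodes::BAD_ARGUMENTS
    return it->second;
}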

View File

@ -6,7 +6,6 @@
#include <DataTypes/DataTypeString.h>
#include <Storages/ObjectStorage/S3/Configuration.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/ConstraintsDescription.h>
#include <Storages/StorageNull.h>
#include <Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h>
@ -29,6 +28,7 @@ namespace DatabaseIcebergSetting
{
extern const DatabaseIcebergSettingsString storage_endpoint;
extern const DatabaseIcebergSettingsDatabaseIcebergCatalogType catalog_type;
extern const DatabaseIcebergSettingsDatabaseIcebergStorageType storage_type;
}
namespace ErrorCodes
@ -38,6 +38,10 @@ namespace ErrorCodes
namespace
{
/// Parse a string containing at least one dot into two substrings:
/// A.B.C.D.E -> A.B.C.D and E, where
/// `A.B.C.D` is a table "namespace".
/// `E` is a table name.
std::pair<std::string, std::string> parseTableName(const std::string & name)
{
auto pos = name.rfind('.');
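
The hunk is truncated after the rfind call. A self-contained sketch of the split the comment describes (the handling of a dot-less name below is an assumption, since the rest of the body is not shown):

#include <stdexcept>
#include <string>
#include <utility>

/// "A.B.C.D.E" -> {"A.B.C.D", "E"}: everything before the last dot is the
/// table "namespace", the tail after it is the table name.
std::pair<std::string, std::string> parseTableName(const std::string & name)
{
    const auto pos = name.rfind('.');
    if (pos == std::string::npos) /// assumed behaviour; the real function may report a ClickHouse error instead
        throw std::invalid_argument("Table name must contain at least one dot: " + name);
    return {name.substr(0, pos), name.substr(pos + 1)};
}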
@ -74,41 +78,73 @@ std::unique_ptr<Iceberg::ICatalog> DatabaseIceberg::getCatalog(ContextPtr contex
}
}
std::shared_ptr<StorageObjectStorage::Configuration> DatabaseIceberg::getConfiguration() const
{
switch (settings[DatabaseIcebergSetting::storage_type].value)
{
case DB::DatabaseIcebergStorageType::S3:
{
return std::make_shared<StorageS3IcebergConfiguration>();
}
case DB::DatabaseIcebergStorageType::Azure:
{
return std::make_shared<StorageAzureIcebergConfiguration>();
}
case DB::DatabaseIcebergStorageType::HDFS:
{
return std::make_shared<StorageHDFSIcebergConfiguration>();
}
case DB::DatabaseIcebergStorageType::Local:
{
return std::make_shared<StorageLocalIcebergConfiguration>();
}
}
}
bool DatabaseIceberg::empty() const
{
return getCatalog(Context::getGlobalContextInstance())->existsCatalog();
return getCatalog(Context::getGlobalContextInstance())->empty();
}
bool DatabaseIceberg::isTableExist(const String & name, ContextPtr context_) const
{
auto [namespace_name, table_name] = parseTableName(name);
const auto [namespace_name, table_name] = parseTableName(name);
return getCatalog(context_)->existsTable(namespace_name, table_name);
}
StoragePtr DatabaseIceberg::tryGetTable(const String & name, ContextPtr context_) const
{
auto catalog = getCatalog(context_);
auto table_metadata = Iceberg::ICatalog::TableMetadata().withLocation();
auto table_metadata = Iceberg::ICatalog::TableMetadata().withLocation().withSchema();
auto [namespace_name, table_name] = parseTableName(name);
if (!catalog->tryGetTableMetadata(namespace_name, table_name, table_metadata))
return nullptr;
auto configuration = std::make_shared<StorageS3IcebergConfiguration>();
/// Take database engine definition AST as base.
ASTStorage * storage = database_engine_definition->as<ASTStorage>();
ASTs args = storage->engine->arguments->children;
auto table_endpoint = std::filesystem::path(settings[DatabaseIcebergSetting::storage_endpoint].value) / table_metadata.getPath();
/// Replace Iceberg Catalog endpoint with storage path endpoint of requested table.
auto table_endpoint = std::filesystem::path(settings[DatabaseIcebergSetting::storage_endpoint].value)
/ table_metadata.getPath()
/ "";
args[0] = std::make_shared<ASTLiteral>(table_endpoint.string());
LOG_TEST(log, "Using endpoint: {}", table_endpoint.string());
LOG_TEST(log, "Using table endpoint: {}", table_endpoint.string());
const auto columns = ColumnsDescription(table_metadata.getSchema());
const auto configuration = getConfiguration();
/// with_table_structure = false: because there will be no table structure in the table definition AST.
StorageObjectStorage::Configuration::initialize(*configuration, args, context_, /* with_table_structure */false);
StorageObjectStorage::Configuration::initialize(*configuration, args, context_, false);
return std::make_shared<StorageObjectStorage>(
configuration,
configuration->createObjectStorage(context_, /* is_readonly */ false),
context_,
StorageID(getDatabaseName(), name),
/* columns */ColumnsDescription{},
/* columns */columns,
/* constraints */ConstraintsDescription{},
/* comment */"",
getFormatSettings(context_),
@ -117,16 +153,17 @@ StoragePtr DatabaseIceberg::tryGetTable(const String & name, ContextPtr context_
DatabaseTablesIteratorPtr DatabaseIceberg::getTablesIterator(
ContextPtr context_,
const FilterByNameFunction & /* filter_by_table_name */,
const FilterByNameFunction & filter_by_table_name,
bool /* skip_not_loaded */) const
{
Tables tables;
auto catalog = getCatalog(context_);
for (const auto & table_name : catalog->getTables())
{
DataTypePtr type = std::make_shared<DataTypeString>();
auto columns = ColumnsDescription{NamesAndTypesList({NameAndTypePair(std::string("a"), type)})};
auto storage = std::make_shared<StorageNull>(StorageID(getDatabaseName(), table_name), columns, ConstraintsDescription{}, "");
if (filter_by_table_name && !filter_by_table_name(table_name))
continue;
auto storage = tryGetTable(table_name, context_);
tables.emplace(table_name, storage);
}

View File

@ -5,6 +5,7 @@
#include <Databases/DatabasesCommon.h>
#include <Databases/Iceberg/DatabaseIcebergSettings.h>
#include <Databases/Iceberg/ICatalog.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
namespace DB
{
@ -49,7 +50,7 @@ private:
const LoggerPtr log;
std::unique_ptr<Iceberg::ICatalog> getCatalog(ContextPtr context_) const;
std::shared_ptr<StorageObjectStorage::Configuration> getConfiguration() const;
};
}

View File

@ -16,6 +16,7 @@ namespace ErrorCodes
#define DATABASE_ICEBERG_RELATED_SETTINGS(DECLARE, ALIAS) \
DECLARE(DatabaseIcebergCatalogType, catalog_type, DatabaseIcebergCatalogType::REST, "Catalog type", 0) \
DECLARE(DatabaseIcebergStorageType, storage_type, DatabaseIcebergStorageType::S3, "Storage type: S3, Local, Azure, HDFS", 0) \
DECLARE(String, storage_endpoint, "", "Object storage endpoint", 0) \
#define LIST_OF_DATABASE_ICEBERG_SETTINGS(M, ALIAS) \

View File

@ -16,7 +16,8 @@ class SettingsChanges;
#define DATABASE_ICEBERG_SETTINGS_SUPPORTED_TYPES(CLASS_NAME, M) \
M(CLASS_NAME, String) \
M(CLASS_NAME, UInt64) \
M(CLASS_NAME, DatabaseIcebergCatalogType)
M(CLASS_NAME, DatabaseIcebergCatalogType) \
M(CLASS_NAME, DatabaseIcebergStorageType)
DATABASE_ICEBERG_SETTINGS_SUPPORTED_TYPES(DatabaseIcebergSettings, DECLARE_SETTING_TRAIT)

View File

@ -18,7 +18,7 @@ std::string ICatalog::TableMetadata::getPath() const
if (location.starts_with("s3://"))
return location.substr(std::strlen("s3://"));
else
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unsupported location type: {}", location);
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unexpected path format: {}", location);
}
const DB::NamesAndTypesList & ICatalog::TableMetadata::getSchema() const

View File

@ -21,6 +21,8 @@ public:
virtual bool existsCatalog() const = 0;
virtual bool empty() const = 0;
virtual Tables getTables() const = 0;
virtual bool existsTable(

View File

@ -52,6 +52,30 @@ bool RestCatalog::existsCatalog() const
}
}
bool RestCatalog::empty() const
{
try
{
bool found_table = false;
auto stop_condition = [&](const std::string & namespace_name) -> bool
{
const auto tables = getTables(namespace_name, /* limit */1);
found_table = !tables.empty();
return found_table;
};
Namespaces namespaces;
getNamespacesRecursive("", namespaces, stop_condition);
return found_table;
}
catch (...)
{
DB::tryLogCurrentException(log);
return true;
}
}
DB::ReadWriteBufferFromHTTPPtr RestCatalog::createReadBuffer(const std::string & endpoint, const Poco::URI::QueryParameters & params) const
{
const auto & context = getContext();
@ -73,7 +97,7 @@ DB::ReadWriteBufferFromHTTPPtr RestCatalog::createReadBuffer(const std::string &
RestCatalog::Tables RestCatalog::getTables() const
{
Namespaces namespaces;
getNamespacesRecursive("", namespaces);
getNamespacesRecursive("", namespaces, {});
Tables tables;
for (const auto & current_namespace : namespaces)
@ -84,7 +108,7 @@ RestCatalog::Tables RestCatalog::getTables() const
return tables;
}
void RestCatalog::getNamespacesRecursive(const Namespace & base_namespace, Namespaces & result) const
void RestCatalog::getNamespacesRecursive(const Namespace & base_namespace, Namespaces & result, StopCondition stop_condition) const
{
auto namespaces = getNamespaces(base_namespace);
result.reserve(result.size() + namespaces.size());
@ -93,7 +117,11 @@ void RestCatalog::getNamespacesRecursive(const Namespace & base_namespace, Names
for (const auto & current_namespace : namespaces)
{
chassert(current_namespace.starts_with(base_namespace));
getNamespacesRecursive(current_namespace, result);
if (stop_condition && stop_condition(current_namespace))
break;
getNamespacesRecursive(current_namespace, result, stop_condition);
}
}
@ -175,14 +203,14 @@ RestCatalog::Namespaces RestCatalog::parseNamespaces(DB::ReadBuffer & buf, const
return namespaces;
}
RestCatalog::Tables RestCatalog::getTables(const Namespace & base_namespace) const
RestCatalog::Tables RestCatalog::getTables(const Namespace & base_namespace, size_t limit) const
{
const auto endpoint = std::string(namespaces_endpoint) + "/" + base_namespace + "/tables";
auto buf = createReadBuffer(endpoint);
return parseTables(*buf, base_namespace);
return parseTables(*buf, base_namespace, limit);
}
RestCatalog::Tables RestCatalog::parseTables(DB::ReadBuffer & buf, const std::string & base_namespace) const
RestCatalog::Tables RestCatalog::parseTables(DB::ReadBuffer & buf, const std::string & base_namespace, size_t limit) const
{
if (buf.eof())
return {};
@ -201,9 +229,12 @@ RestCatalog::Tables RestCatalog::parseTables(DB::ReadBuffer & buf, const std::st
Tables tables;
for (size_t i = 0; i < identifiers_object->size(); ++i)
{
auto current_table_json = identifiers_object->get(static_cast<int>(i)).extract<Poco::JSON::Object::Ptr>();
auto table_name = current_table_json->get("name").extract<String>();
const auto current_table_json = identifiers_object->get(static_cast<int>(i)).extract<Poco::JSON::Object::Ptr>();
const auto table_name = current_table_json->get("name").extract<String>();
tables.push_back(base_namespace + "." + table_name);
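/// limit == 0 means "no limit"; otherwise stop once limit tables have been collected.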
if (limit && tables.size() >= limit)
break;
}
return tables;
}
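
The stop-condition contract used by empty() above is easy to miss: the callback both records whether a table was found and tells the traversal to stop descending. A stripped-down illustration, with a hypothetical in-memory namespace tree in place of the real catalog API:

#include <functional>
#include <string>
#include <vector>

struct NamespaceNode { std::string name; std::vector<NamespaceNode> children; };
using StopCondition = std::function<bool(const std::string &)>;

/// Depth-first walk mirroring getNamespacesRecursive: collect every namespace,
/// but stop descending as soon as the callback returns true.
void collectNamespaces(const NamespaceNode & base, std::vector<std::string> & result, const StopCondition & stop_condition)
{
    for (const auto & child : base.children)
    {
        result.push_back(child.name);
        if (stop_condition && stop_condition(child.name))
            break;
        collectNamespaces(child, result, stop_condition);
    }
}

In empty(), the callback issues getTables(namespace_name, /* limit */1) and flips found_table, so the catalog walk ends after the first table is seen.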

View File

@ -25,6 +25,8 @@ public:
bool existsCatalog() const override;
bool empty() const override;
Tables getTables() const override;
bool existsTable(const std::string & namespace_name, const std::string & table_name) const override;
@ -48,15 +50,16 @@ private:
Poco::URI::QueryParameters createParentNamespaceParams(const std::string & base_namespace) const;
void getNamespacesRecursive(const Namespace & base_namespace, Namespaces & result) const;
using StopCondition = std::function<bool(const std::string & namespace_name)>;
void getNamespacesRecursive(const Namespace & base_namespace, Namespaces & result, StopCondition stop_condition) const;
Namespaces getNamespaces(const Namespace & base_namespace) const;
Namespaces parseNamespaces(DB::ReadBuffer & buf, const std::string & base_namespace) const;
Tables getTables(const Namespace & base_namespace) const;
Tables getTables(const Namespace & base_namespace, size_t limit = 0) const;
Tables parseTables(DB::ReadBuffer & buf, const std::string & base_namespace) const;
Tables parseTables(DB::ReadBuffer & buf, const std::string & base_namespace, size_t limit) const;
bool getTableMetadataImpl(
const std::string & namespace_name,

View File

@ -22,7 +22,7 @@ services:
- AWS_ACCESS_KEY_ID=minio
- AWS_SECRET_ACCESS_KEY=minio123
- AWS_REGION=us-east-1
- CATALOG_WAREHOUSE=s3://warehouse/
- CATALOG_WAREHOUSE=s3://iceberg_data/
- CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
- CATALOG_S3_ENDPOINT=http://minio:9000
minio:
@ -36,8 +36,9 @@ services:
default:
aliases:
- warehouse.minio
expose:
- 9001
ports:
- 9001:9001
- 9002:9000
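# Host port 9002 forwards to MinIO's S3 API (container port 9000), so test
# clients on the host reach it at localhost:9002; 9001 remains the web console.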
command: ["server", "/data", "--console-address", ":9001"]
mc:
depends_on:

View File

@ -22,7 +22,9 @@ from pyiceberg.types import (
StructType,
TimestampType,
)
import pyarrow as pa
import random
from datetime import datetime, timedelta
from helpers.cluster import ClickHouseCluster, ClickHouseInstance, is_arm
from helpers.s3_tools import get_file_contents, list_s3_objects, prepare_s3_bucket
from helpers.test_tools import TSV, csv_compare
@ -34,9 +36,11 @@ BASE_URL_LOCAL_RAW = "http://localhost:8182"
CATALOG_NAME = "demo"
DEFAULT_SCHEMA = Schema(
NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=True),
NestedField(field_id=2, name="symbol", field_type=StringType(), required=True),
NestedField(field_id=3, name="bid", field_type=FloatType(), required=False),
NestedField(
field_id=1, name="datetime", field_type=TimestampType(), required=False
),
NestedField(field_id=2, name="symbol", field_type=StringType(), required=False),
NestedField(field_id=3, name="bid", field_type=DoubleType(), required=False),
NestedField(field_id=4, name="ask", field_type=DoubleType(), required=False),
NestedField(
field_id=5,
@ -52,11 +56,15 @@ DEFAULT_SCHEMA = Schema(
required=False,
),
)
DEFAULT_CREATE_TABLE = "CREATE TABLE {}.`{}.{}`\\n(\\n `datetime` Nullable(DateTime64(6)),\\n `symbol` Nullable(String),\\n `bid` Nullable(Float64),\\n `ask` Nullable(Float64),\\n `details` Tuple(created_by Nullable(String))\\n)\\nENGINE = Iceberg(\\'http://minio:9000/warehouse/data\\', \\'minio\\', \\'[HIDDEN]\\')\n"
DEFAULT_PARTITION_SPEC = PartitionSpec(
PartitionField(
source_id=1, field_id=1000, transform=DayTransform(), name="datetime_day"
)
)
DEFAULT_SORT_ORDER = SortOrder(SortField(source_id=2, transform=IdentityTransform()))
@ -68,13 +76,13 @@ def list_namespaces():
raise Exception(f"Failed to list namespaces: {response.status_code}")
def load_catalog_impl():
def load_catalog_impl(started_cluster):
return load_catalog(
CATALOG_NAME,
**{
"uri": BASE_URL_LOCAL_RAW,
"type": "rest",
"s3.endpoint": f"http://minio:9000",
"s3.endpoint": f"http://localhost:9002",
"s3.access-key-id": "minio",
"s3.secret-access-key": "minio123",
},
@ -89,25 +97,51 @@ def create_table(
partition_spec=DEFAULT_PARTITION_SPEC,
sort_order=DEFAULT_SORT_ORDER,
):
catalog.create_table(
return catalog.create_table(
identifier=f"{namespace}.{table}",
schema=schema,
location=f"s3://warehouse",
location=f"s3://warehouse/data",
partition_spec=partition_spec,
sort_order=sort_order,
)
def generate_record():
return {
"datetime": datetime.now(),
"symbol": str("kek"),
"bid": round(random.uniform(100, 200), 2),
"ask": round(random.uniform(200, 300), 2),
"details": {"created_by": "Alice Smith"},
}
def create_clickhouse_iceberg_database(started_cluster, node, name):
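# storage_endpoint is resolved inside the ClickHouse container, so it uses the
# docker-network address (minio:9000) rather than a host-mapped port.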
node.query(
f"""
DROP DATABASE IF EXISTS {name};
CREATE DATABASE {name} ENGINE = Iceberg('{BASE_URL}', 'minio', 'minio123')
SETTINGS catalog_type = 'rest', storage_endpoint = 'http://{started_cluster.minio_ip}:{started_cluster.minio_port}/'
SETTINGS catalog_type = 'rest', storage_endpoint = 'http://minio:9000/'
"""
)
def print_objects():
minio_client = Minio(
f"localhost:9002",
access_key="minio",
secret_key="minio123",
secure=False,
http_client=urllib3.PoolManager(cert_reqs="CERT_NONE"),
)
objects = list(minio_client.list_objects("warehouse", "", recursive=True))
names = [x.object_name for x in objects]
names.sort()
for name in names:
print(f"Found object: {name}")
@pytest.fixture(scope="module")
def started_cluster():
try:
@ -125,7 +159,6 @@ def started_cluster():
# TODO: properly wait for container
time.sleep(10)
#cluster.minio_client.make_bucket("warehouse")
yield cluster
@ -133,7 +166,7 @@ def started_cluster():
cluster.shutdown()
def test_simple(started_cluster):
def test_list_tables(started_cluster):
node = started_cluster.instances["node1"]
root_namespace = "clickhouse"
@ -142,7 +175,7 @@ def test_simple(started_cluster):
namespace_1_tables = ["tableA", "tableB"]
namespace_2_tables = ["tableC", "tableD"]
catalog = load_catalog_impl()
catalog = load_catalog_impl(started_cluster)
for namespace in [namespace_1, namespace_2]:
catalog.create_namespace(namespace)
@ -182,13 +215,13 @@ def test_simple(started_cluster):
).strip()
)
expected = f"CREATE TABLE {CATALOG_NAME}.`{namespace_2}.tableC`\\n(\\n `datetime` DateTime64(6),\\n `symbol` String,\\n `bid` Nullable(Float32),\\n `ask` Nullable(Float64),\\n `details` Tuple(created_by Nullable(String))\\n)\\nENGINE = Iceberg(\\'http://None:9001/warehouse\\', \\'minio\\', \\'[HIDDEN]\\')\n"
expected = DEFAULT_CREATE_TABLE.format(CATALOG_NAME, namespace_2, "tableC")
assert expected == node.query(
f"SHOW CREATE TABLE {CATALOG_NAME}.`{namespace_2}.tableC`"
)
def test_different_namespaces(started_cluster):
def test_many_namespaces(started_cluster):
node = started_cluster.instances["node1"]
namespaces = [
"A",
@ -202,11 +235,9 @@ def test_different_namespaces(started_cluster):
"B.CC",
]
tables = ["A", "B", "C", "D", "E", "F"]
catalog = load_catalog_impl()
catalog = load_catalog_impl(started_cluster)
for namespace in namespaces:
# if namespace in catalog.list_namespaces()["namesoaces"]:
# catalog.drop_namespace(namespace)
catalog.create_namespace(namespace)
for table in tables:
create_table(catalog, namespace, table)
@ -221,3 +252,43 @@ def test_different_namespaces(started_cluster):
f"SELECT count() FROM system.tables WHERE database = '{CATALOG_NAME}' and name = '{table_name}'"
)
)
def test_select(started_cluster):
node = started_cluster.instances["node1"]
test_ref = "test_select"
table_name = f"{test_ref}_table"
root_namespace = f"{test_ref}_namespace"
namespace = f"{root_namespace}.A.B.C"
namespaces_to_create = [
root_namespace,
f"{root_namespace}.A",
f"{root_namespace}.A.B",
f"{root_namespace}.A.B.C",
]
catalog = load_catalog_impl(started_cluster)
for namespace in namespaces_to_create:
catalog.create_namespace(namespace)
assert len(catalog.list_tables(namespace)) == 0
table = create_table(catalog, namespace, table_name)
num_rows = 10
data = [generate_record() for _ in range(num_rows)]
df = pa.Table.from_pylist(data)
table.append(df)
create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)
expected = DEFAULT_CREATE_TABLE.format(CATALOG_NAME, namespace, table_name)
assert expected == node.query(
f"SHOW CREATE TABLE {CATALOG_NAME}.`{namespace}.{table_name}`"
)
assert num_rows == int(
node.query(f"SELECT count() FROM {CATALOG_NAME}.`{namespace}.{table_name}`")
)