import glob
import json
import logging
import os
import random
import time
import uuid
from datetime import datetime, timedelta

import pyarrow as pa
import pytest
import requests
import urllib3
from minio import Minio
from pyiceberg.catalog import load_catalog
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table.sorting import SortField, SortOrder
from pyiceberg.transforms import DayTransform, IdentityTransform
from pyiceberg.types import (
    DoubleType,
    FloatType,
    NestedField,
    StringType,
    StructType,
    TimestampType,
)

from helpers.cluster import ClickHouseCluster, ClickHouseInstance, is_arm
from helpers.s3_tools import get_file_contents, list_s3_objects, prepare_s3_bucket
from helpers.test_tools import TSV, csv_compare

# REST catalog endpoints. BASE_URL is resolvable from inside the docker
# network (used by ClickHouse itself); the *_LOCAL variants are the
# host-mapped ports used by this test process / pyiceberg.
BASE_URL = "http://rest:8181/v1"
BASE_URL_LOCAL = "http://localhost:8182/v1"
BASE_URL_LOCAL_RAW = "http://localhost:8182"

CATALOG_NAME = "demo"

# Schema shared by every table the tests create.
# NOTE(review): the nested `created_by` field reuses field_id=4 (same as
# `ask`) — pyiceberg appears to tolerate/reassign this, but confirm.
DEFAULT_SCHEMA = Schema(
    NestedField(
        field_id=1, name="datetime", field_type=TimestampType(), required=False
    ),
    NestedField(field_id=2, name="symbol", field_type=StringType(), required=False),
    NestedField(field_id=3, name="bid", field_type=DoubleType(), required=False),
    NestedField(field_id=4, name="ask", field_type=DoubleType(), required=False),
    NestedField(
        field_id=5,
        name="details",
        field_type=StructType(
            NestedField(
                field_id=4,
                name="created_by",
                field_type=StringType(),
                required=False,
            ),
        ),
        required=False,
    ),
)

# Expected SHOW CREATE TABLE output (format placeholders: database,
# namespace, table). ClickHouse indents each column with 4 spaces after
# the escaped `\n`, so the literal must keep that exact spacing.
DEFAULT_CREATE_TABLE = "CREATE TABLE {}.`{}.{}`\\n(\\n    `datetime` Nullable(DateTime64(6)),\\n    `symbol` Nullable(String),\\n    `bid` Nullable(Float64),\\n    `ask` Nullable(Float64),\\n    `details` Tuple(created_by Nullable(String))\\n)\\nENGINE = Iceberg(\\'http://minio:9000/warehouse/data/\\', \\'minio\\', \\'[HIDDEN]\\')\n"

# Partition by day of the `datetime` column (source_id=1).
DEFAULT_PARTITION_SPEC = PartitionSpec(
    PartitionField(
        source_id=1, field_id=1000, transform=DayTransform(), name="datetime_day"
    )
)

# Sort by `symbol` (source_id=2) as-is.
DEFAULT_SORT_ORDER = SortOrder(SortField(source_id=2, transform=IdentityTransform()))
def list_namespaces():
    """Fetch the namespace listing straight from the REST catalog HTTP API.

    Returns the decoded JSON payload; raises on any non-200 response.
    """
    response = requests.get(f"{BASE_URL_LOCAL}/namespaces")
    if response.status_code == 200:
        return response.json()
    raise Exception(f"Failed to list namespaces: {response.status_code}")


def load_catalog_impl(started_cluster):
    """Build a pyiceberg catalog client talking to the host-mapped REST/minio ports.

    `started_cluster` is currently unused but kept for signature
    compatibility with existing callers.
    """
    return load_catalog(
        CATALOG_NAME,
        **{
            "uri": BASE_URL_LOCAL_RAW,
            "type": "rest",
            "s3.endpoint": "http://localhost:9002",
            "s3.access-key-id": "minio",
            "s3.secret-access-key": "minio123",
        },
    )


def create_table(
    catalog,
    namespace,
    table,
    schema=DEFAULT_SCHEMA,
    partition_spec=DEFAULT_PARTITION_SPEC,
    sort_order=DEFAULT_SORT_ORDER,
):
    """Create `namespace.table` in the catalog, stored under the shared warehouse bucket."""
    return catalog.create_table(
        identifier=f"{namespace}.{table}",
        schema=schema,
        location="s3://warehouse/data",
        partition_spec=partition_spec,
        sort_order=sort_order,
    )


def generate_record():
    """Return one random record matching DEFAULT_SCHEMA (for pa.Table.from_pylist)."""
    return {
        "datetime": datetime.now(),
        "symbol": "kek",
        "bid": round(random.uniform(100, 200), 2),
        "ask": round(random.uniform(200, 300), 2),
        "details": {"created_by": "Alice Smith"},
    }


def create_clickhouse_iceberg_database(started_cluster, node, name):
    """(Re)create a ClickHouse DatabaseIceberg backed by the REST catalog on `node`."""
    node.query(
        f"""
DROP DATABASE IF EXISTS {name};
CREATE DATABASE {name} ENGINE = Iceberg('{BASE_URL}', 'minio', 'minio123')
SETTINGS catalog_type = 'rest', storage_endpoint = 'http://minio:9000/'
"""
    )


def print_objects():
    """Debug helper: print every object currently in the warehouse bucket, sorted."""
    minio_client = Minio(
        "localhost:9002",
        access_key="minio",
        secret_key="minio123",
        secure=False,
        http_client=urllib3.PoolManager(cert_reqs="CERT_NONE"),
    )
    names = sorted(
        obj.object_name
        for obj in minio_client.list_objects("warehouse", "", recursive=True)
    )
    for name in names:
        print(f"Found object: {name}")


@pytest.fixture(scope="module")
def started_cluster():
    """Module-scoped cluster with the Iceberg REST catalog sidecar."""
    # Construct outside try: if the constructor itself fails, there is no
    # cluster to shut down and `finally` must not raise NameError.
    cluster = ClickHouseCluster(__file__)
    try:
        cluster.add_instance(
            "node1",
            main_configs=[],
            user_configs=[],
            stay_alive=True,
            with_iceberg_catalog=True,
        )

        logging.info("Starting cluster...")
        cluster.start()

        # TODO: properly wait for container
        time.sleep(10)

        yield cluster
    finally:
        cluster.shutdown()
def test_list_tables(started_cluster):
    """Tables created via pyiceberg must show up in system.tables, survive a
    restart, and render the expected SHOW CREATE TABLE output."""
    node = started_cluster.instances["node1"]

    root_namespace = f"clickhouse_{uuid.uuid4()}"
    namespace_1 = f"{root_namespace}.testA.A"
    namespace_2 = f"{root_namespace}.testB.B"
    namespace_1_tables = ["tableA", "tableB"]
    namespace_2_tables = ["tableC", "tableD"]

    catalog = load_catalog_impl(started_cluster)

    for namespace in [namespace_1, namespace_2]:
        catalog.create_namespace(namespace)

    # The root namespace must be visible both through the raw REST API
    # and through the pyiceberg client (each entry is a tuple of parts).
    assert any(
        root_namespace == parts[0] for parts in list_namespaces()["namespaces"]
    )
    assert any(root_namespace == parts[0] for parts in catalog.list_namespaces())

    for namespace in [namespace_1, namespace_2]:
        assert len(catalog.list_tables(namespace)) == 0

    create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)

    # Build the expected newline-separated table list; creation order is
    # already lexicographic, matching the ORDER BY in the query below.
    expected_names = []
    for table in namespace_1_tables:
        create_table(catalog, namespace_1, table)
        expected_names.append(f"{namespace_1}.{table}")
    for table in namespace_2_tables:
        create_table(catalog, namespace_2, table)
        expected_names.append(f"{namespace_2}.{table}")
    tables_list = "\n".join(expected_names)

    list_query = (
        f"SELECT name FROM system.tables WHERE database = '{CATALOG_NAME}' "
        f"and name ILIKE '{root_namespace}%' ORDER BY name"
    )
    assert tables_list == node.query(list_query).strip()

    # The listing must survive a server restart (metadata is re-fetched).
    node.restart_clickhouse()
    assert tables_list == node.query(list_query).strip()

    expected = DEFAULT_CREATE_TABLE.format(CATALOG_NAME, namespace_2, "tableC")
    assert expected == node.query(
        f"SHOW CREATE TABLE {CATALOG_NAME}.`{namespace_2}.tableC`"
    )


def test_many_namespaces(started_cluster):
    """Every table in a deep/varied namespace tree must be visible in system.tables."""
    node = started_cluster.instances["node1"]

    root_namespace_1 = f"A_{uuid.uuid4()}"
    root_namespace_2 = f"B_{uuid.uuid4()}"
    namespaces = [
        f"{root_namespace_1}",
        f"{root_namespace_1}.B.C",
        f"{root_namespace_1}.B.C.D",
        f"{root_namespace_1}.B.C.D.E",
        f"{root_namespace_2}",
        f"{root_namespace_2}.C",
        f"{root_namespace_2}.CC",
    ]
    tables = ["A", "B", "C"]

    catalog = load_catalog_impl(started_cluster)
    for namespace in namespaces:
        catalog.create_namespace(namespace)
        for table in tables:
            create_table(catalog, namespace, table)

    create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)

    for namespace in namespaces:
        for table in tables:
            table_name = f"{namespace}.{table}"
            assert int(
                node.query(
                    f"SELECT count() FROM system.tables WHERE database = '{CATALOG_NAME}' and name = '{table_name}'"
                )
            )


def test_select(started_cluster):
    """Rows appended via pyiceberg must be readable through the ClickHouse database."""
    node = started_cluster.instances["node1"]

    test_ref = f"test_list_tables_{uuid.uuid4()}"
    table_name = f"{test_ref}_table"
    root_namespace = f"{test_ref}_namespace"

    namespace = f"{root_namespace}.A.B.C"
    namespaces_to_create = [
        root_namespace,
        f"{root_namespace}.A",
        f"{root_namespace}.A.B",
        f"{root_namespace}.A.B.C",
    ]

    catalog = load_catalog_impl(started_cluster)

    # Use a distinct loop variable so `namespace` (the leaf, used below)
    # is not clobbered by the iteration.
    for ns in namespaces_to_create:
        catalog.create_namespace(ns)
        assert len(catalog.list_tables(ns)) == 0

    table = create_table(catalog, namespace, table_name)

    num_rows = 10
    data = [generate_record() for _ in range(num_rows)]
    df = pa.Table.from_pylist(data)
    table.append(df)

    create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)

    expected = DEFAULT_CREATE_TABLE.format(CATALOG_NAME, namespace, table_name)
    assert expected == node.query(
        f"SHOW CREATE TABLE {CATALOG_NAME}.`{namespace}.{table_name}`"
    )

    assert num_rows == int(
        node.query(f"SELECT count() FROM {CATALOG_NAME}.`{namespace}.{table_name}`")
    )