Add a test

kssenii 2024-11-12 16:41:39 +01:00
parent ca3dfe5d8e
commit e565f5f1c7
3 changed files with 144 additions and 23 deletions

View File

@@ -1,7 +1,21 @@
services:
spark-iceberg:
image: tabulario/spark-iceberg
container_name: spark-iceberg
build: spark/
depends_on:
- rest
- minio
environment:
- AWS_ACCESS_KEY_ID=admin
- AWS_SECRET_ACCESS_KEY=password
- AWS_REGION=us-east-1
ports:
- 8080:8080
- 10000:10000
- 10001:10001
rest:
image: tabulario/iceberg-rest
container_name: iceberg-rest
ports:
- 8181:8181
environment:
@@ -10,18 +24,36 @@ services:
- AWS_REGION=us-east-1
- CATALOG_WAREHOUSE=s3://warehouse/
- CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
- CATALOG_S3_ENDPOINT=http://minio:9000
minio:
image: minio/minio
container_name: minio
environment:
- MINIO_ROOT_USER=minio
- MINIO_ROOT_PASSWORD=minio123
- MINIO_DOMAIN=minio
networks:
default:
aliases:
- warehouse.minio
ports:
- 9001:9001
- 9000:9000
command: ["server", "/data", "--console-address", ":9001"]
mc:
depends_on:
- rest
- minio
image: minio/mc
container_name: mc
environment:
- AWS_ACCESS_KEY_ID=minio
- AWS_SECRET_ACCESS_KEY=minio123
- AWS_REGION=us-east-1
entrypoint: >
/bin/sh -c "
until (/usr/bin/mc config host add minio http://minio:9000 minio minio123) do echo '...waiting...' && sleep 1; done;
/usr/bin/mc rm -r --force minio/warehouse;
/usr/bin/mc mb minio/warehouse --ignore-existing;
/usr/bin/mc policy set public minio/warehouse;
tail -f /dev/null
"

View File

@@ -440,7 +440,6 @@ class ClickHouseCluster:
zookeeper_keyfile=None,
zookeeper_certfile=None,
with_spark=False,
):
for param in list(os.environ.keys()):
logging.debug("ENV %40s %s" % (param, os.environ[param]))
@@ -569,6 +568,7 @@ class ClickHouseCluster:
self.resolver_logs_dir = os.path.join(self.instances_dir, "resolver")
self.spark_session = None
self.with_iceberg_catalog = False
self.with_azurite = False
self.azurite_container = "azurite-container"
@@ -1480,7 +1480,8 @@ class ClickHouseCluster:
"--file",
p.join(docker_compose_yml_dir, "docker_compose_iceberg_rest_catalog.yml"),
)
return self.base_iceberg_catalog_cmd
# return self.base_minio_cmd
def setup_azurite_cmd(self, instance, env_variables, docker_compose_yml_dir):
self.with_azurite = True
@@ -1648,6 +1649,7 @@ class ClickHouseCluster:
with_hive=False,
with_coredns=False,
with_prometheus=False,
with_iceberg_catalog=False,
handle_prometheus_remote_write=False,
handle_prometheus_remote_read=False,
use_old_analyzer=None,
@@ -1750,6 +1752,7 @@ class ClickHouseCluster:
with_coredns=with_coredns,
with_cassandra=with_cassandra,
with_ldap=with_ldap,
with_iceberg_catalog=with_iceberg_catalog,
use_old_analyzer=use_old_analyzer,
server_bin_path=self.server_bin_path,
odbc_bridge_bin_path=self.odbc_bridge_bin_path,
@@ -1937,6 +1940,8 @@ class ClickHouseCluster:
cmds.append(
self.setup_minio_cmd(instance, env_variables, docker_compose_yml_dir)
)
if with_iceberg_catalog and not self.with_iceberg_catalog:
cmds.append(
self.setup_iceberg_catalog_cmd(
instance, env_variables, docker_compose_yml_dir
@@ -3383,6 +3388,7 @@ class ClickHouseInstance:
with_coredns,
with_cassandra,
with_ldap,
with_iceberg_catalog,
use_old_analyzer,
server_bin_path,
odbc_bridge_bin_path,
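
Net effect of the cluster.py changes: the with_iceberg_catalog flag moves from the ClickHouseCluster constructor onto add_instance, and start() additionally runs docker compose on docker_compose_iceberg_rest_catalog.yml for clusters that opt in. A minimal usage sketch:

cluster = ClickHouseCluster(__file__)
cluster.add_instance("node1", with_iceberg_catalog=True)
cluster.start()  # also brings up the spark-iceberg, rest, minio and mc services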

View File

@@ -7,12 +7,31 @@ import uuid
import pytest
import requests
from minio import Minio
import urllib3
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import (
TimestampType,
FloatType,
DoubleType,
StringType,
NestedField,
StructType,
)
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import DayTransform
from pyiceberg.table.sorting import SortOrder, SortField
from pyiceberg.transforms import IdentityTransform
from helpers.cluster import ClickHouseCluster, ClickHouseInstance, is_arm
from helpers.s3_tools import get_file_contents, list_s3_objects, prepare_s3_bucket
BASE_URL = "http://rest:8181/v1"
BASE_URL_LOCAL = "http://localhost:8181/v1"
BASE_URL_LOCAL_RAW = "http://localhost:8181"
def create_namespace(name):
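    # Body elided by this hunk. A plausible implementation per the Iceberg
    # REST catalog spec, which takes the namespace as a list of levels:
    payload = {"namespace": name.split("."), "properties": {}}
    requests.post(f"{BASE_URL_LOCAL}/namespaces", json=payload).raise_for_status()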
@@ -44,6 +63,7 @@ def list_namespaces():
def create_table(name, namespace):
payload = {
"name": name,
"location": "s3://warehouse/",
"schema": {
"type": "struct",
"fields": [
@@ -70,20 +90,19 @@ def create_table(name, namespace):
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"node1",
main_configs=[],
user_configs=[],
with_minio=True,
stay_alive=True,
with_iceberg_catalog=True,
)
logging.info("Starting cluster...")
cluster.start()
cluster.minio_client.make_bucket("warehouse")
prepare_s3_bucket(cluster)
# prepare_s3_bucket(cluster)
yield cluster
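# Teardown elided by this hunk; the standard ClickHouseCluster fixture
# pattern is assumed here:
finally:
    cluster.shutdown()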
@@ -95,10 +114,10 @@ def test_simple(started_cluster):
# TODO: properly wait for container
time.sleep(10)
namespace = "kssenii.test.namespace"
root_namespace = "kssenii"
namespace_1 = "clickhouse.test.A"
root_namespace = "clickhouse"
create_namespace(namespace)
create_namespace(namespace_1)
assert root_namespace in list_namespaces()["namespaces"][0][0]
node = started_cluster.instances["node1"]
@@ -109,7 +128,71 @@ SETTINGS catalog_type = 'rest', storage_endpoint = 'http://{started_cluster.mini
"""
)
table_name = "testtable"
create_table(table_name, "kssenii")
catalog_name = "demo"
assert namespace in node.query("USE demo; SHOW TABLES")
catalog = load_catalog(
"demo",
**{
"uri": BASE_URL_LOCAL_RAW,
"type": "rest",
"s3.endpoint": f"http://minio:9000",
"s3.access-key-id": "minio",
"s3.secret-access-key": "minio123",
},
)
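# If this handshake fails, probe the server's config endpoint directly;
# GET /v1/config is part of the Iceberg REST spec (a sketch, not part of
# the committed test):
resp = requests.get(f"{BASE_URL_LOCAL_RAW}/v1/config")
assert resp.ok, resp.text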
namespace_2 = "clickhouse.test.B"
catalog.create_namespace(namespace_2)
assert [(root_namespace,)] == catalog.list_namespaces()
tables = catalog.list_tables(namespace_2)
assert len(tables) == 0
schema = Schema(
NestedField(
field_id=1, name="datetime", field_type=TimestampType(), required=True
),
NestedField(field_id=2, name="symbol", field_type=StringType(), required=True),
NestedField(field_id=3, name="bid", field_type=FloatType(), required=False),
NestedField(field_id=4, name="ask", field_type=DoubleType(), required=False),
NestedField(
field_id=5,
name="details",
field_type=StructType(
NestedField(
field_id=6,
name="created_by",
field_type=StringType(),
required=False,
),
),
required=False,
),
)
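# Expected Iceberg -> ClickHouse type mapping, per the SHOW CREATE TABLE
# assertion in check() below:
#   timestamp           -> DateTime64(6)
#   string   (required) -> String
#   float    (optional) -> Nullable(Float32)
#   double   (optional) -> Nullable(Float64)
#   struct<created_by>  -> Tuple(created_by Nullable(String))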
partition_spec = PartitionSpec(
PartitionField(
source_id=1, field_id=1000, transform=DayTransform(), name="datetime_day"
)
)
sort_order = SortOrder(SortField(source_id=2, transform=IdentityTransform()))
for table in ["tableA", "tableB"]:
catalog.create_table(
identifier=f"{namespace_2}.{table}",
schema=schema,
location=f"s3://warehouse",
partition_spec=partition_spec,
sort_order=sort_order,
)
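# A sketch, not part of the committed test: the same catalog handle should
# now report both tables on the pyiceberg side.
assert len(catalog.list_tables(namespace_2)) == 2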
def check():
    assert f"{namespace_2}.tableA\n{namespace_2}.tableB\n" == node.query(
        "SELECT name FROM system.tables WHERE database = 'demo' ORDER BY name"
    )
    expected = "CREATE TABLE demo.`clickhouse.test.B.tableA`\\n(\\n    `datetime` DateTime64(6),\\n    `symbol` String,\\n    `bid` Nullable(Float32),\\n    `ask` Nullable(Float64),\\n    `details` Tuple(created_by Nullable(String))\\n)\\nENGINE = Iceberg(\\'http://None:9001/warehouse\\', \\'minio\\', \\'[HIDDEN]\\')\n"
    assert expected == node.query(f"SHOW CREATE TABLE demo.`{namespace_2}.tableA`")
check()
node.restart_clickhouse()
check()