mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
One more test
This commit is contained in:
parent
e565f5f1c7
commit
f8255fb4ae
@ -10,6 +10,7 @@ import requests
|
||||
from minio import Minio
|
||||
import urllib3
|
||||
|
||||
from helpers.test_tools import TSV, csv_compare
|
||||
from pyiceberg.catalog import load_catalog
|
||||
from pyiceberg.schema import Schema
|
||||
from pyiceberg.types import (
|
||||
@ -33,23 +34,33 @@ BASE_URL = "http://rest:8181/v1"
|
||||
BASE_URL_LOCAL = "http://localhost:8181/v1"
|
||||
BASE_URL_LOCAL_RAW = "http://localhost:8181"
|
||||
|
||||
CATALOG_NAME = "demo"
|
||||
|
||||
def create_namespace(name):
|
||||
payload = {
|
||||
"namespace": [name],
|
||||
"properties": {"owner": "clickhouse", "description": "test namespace"},
|
||||
}
|
||||
|
||||
headers = {"Content-Type": "application/json"}
|
||||
response = requests.post(
|
||||
f"{BASE_URL_LOCAL}/namespaces", headers=headers, data=json.dumps(payload)
|
||||
DEFAULT_SCHEMA = Schema(
|
||||
NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=True),
|
||||
NestedField(field_id=2, name="symbol", field_type=StringType(), required=True),
|
||||
NestedField(field_id=3, name="bid", field_type=FloatType(), required=False),
|
||||
NestedField(field_id=4, name="ask", field_type=DoubleType(), required=False),
|
||||
NestedField(
|
||||
field_id=5,
|
||||
name="details",
|
||||
field_type=StructType(
|
||||
NestedField(
|
||||
field_id=4,
|
||||
name="created_by",
|
||||
field_type=StringType(),
|
||||
required=False,
|
||||
),
|
||||
),
|
||||
required=False,
|
||||
),
|
||||
)
|
||||
DEFAULT_PARTITION_SPEC = PartitionSpec(
|
||||
PartitionField(
|
||||
source_id=1, field_id=1000, transform=DayTransform(), name="datetime_day"
|
||||
)
|
||||
if response.status_code == 200:
|
||||
print(f"Namespace '{name}' created successfully.")
|
||||
else:
|
||||
raise Exception(
|
||||
f"Failed to create namespace. Status code: {response.status_code}, Response: {response.text}"
|
||||
)
|
||||
)
|
||||
DEFAULT_SORT_ORDER = SortOrder(SortField(source_id=2, transform=IdentityTransform()))
|
||||
|
||||
|
||||
def list_namespaces():
|
||||
@ -60,31 +71,44 @@ def list_namespaces():
|
||||
raise Exception(f"Failed to list namespaces: {response.status_code}")
|
||||
|
||||
|
||||
def create_table(name, namespace):
|
||||
payload = {
|
||||
"name": name,
|
||||
"location": "s3://warehouse/",
|
||||
"schema": {
|
||||
"type": "struct",
|
||||
"fields": [
|
||||
{"id": 1, "name": "name", "type": "String", "required": True},
|
||||
{"id": 2, "name": "age", "type": "Int", "required": False},
|
||||
],
|
||||
def load_catalog_impl():
|
||||
return load_catalog(
|
||||
CATALOG_NAME,
|
||||
**{
|
||||
"uri": BASE_URL_LOCAL_RAW,
|
||||
"type": "rest",
|
||||
"s3.endpoint": f"http://minio:9000",
|
||||
"s3.access-key-id": "minio",
|
||||
"s3.secret-access-key": "minio123",
|
||||
},
|
||||
}
|
||||
|
||||
headers = {"Content-Type": "application/json"}
|
||||
response = requests.post(
|
||||
f"{BASE_URL_LOCAL}/namespaces/{namespace}/tables",
|
||||
headers=headers,
|
||||
data=json.dumps(payload),
|
||||
)
|
||||
if response.status_code == 200:
|
||||
print(f"Table '{name}' created successfully.")
|
||||
else:
|
||||
raise Exception(
|
||||
f"Failed to create a table. Status code: {response.status_code}, Response: {response.text}"
|
||||
)
|
||||
|
||||
|
||||
def create_table(
    catalog,
    namespace,
    table,
    schema=DEFAULT_SCHEMA,
    partition_spec=DEFAULT_PARTITION_SPEC,
    sort_order=DEFAULT_SORT_ORDER,
):
    """Create the Iceberg table ``<namespace>.<table>`` through ``catalog``.

    Args:
        catalog: Object exposing ``create_table(identifier=..., schema=...,
            location=..., partition_spec=..., sort_order=...)``; the tests
            pass the catalog returned by ``load_catalog_impl()``.
        namespace: Dotted namespace name the table is created in.
        table: Table name, appended to ``namespace`` to form the identifier.
        schema: Table schema; defaults to ``DEFAULT_SCHEMA``.
        partition_spec: Partition spec; defaults to ``DEFAULT_PARTITION_SPEC``.
        sort_order: Sort order; defaults to ``DEFAULT_SORT_ORDER``.

    Returns:
        Whatever ``catalog.create_table`` returns, so callers can keep working
        with the freshly created table. Existing callers that ignore the
        result are unaffected.
    """
    return catalog.create_table(
        identifier=f"{namespace}.{table}",
        schema=schema,
        # Every test table shares the same warehouse bucket root.
        location="s3://warehouse",
        partition_spec=partition_spec,
        sort_order=sort_order,
    )
|
||||
|
||||
|
||||
def create_clickhouse_iceberg_database(started_cluster, node, name):
    """Drop database ``name`` if it exists and recreate it with the Iceberg engine.

    The database points at the REST catalog at ``BASE_URL`` and uses the
    cluster's MinIO address (``minio_ip``/``minio_port``) as storage endpoint.
    Both statements are sent to ``node`` in a single ``query`` call.
    """
    recreate_database_sql = f"""
DROP DATABASE IF EXISTS {name};
CREATE DATABASE {name} ENGINE = Iceberg('{BASE_URL}', 'minio', 'minio123')
SETTINGS catalog_type = 'rest', storage_endpoint = 'http://{started_cluster.minio_ip}:{started_cluster.minio_port}/'
"""
    node.query(recreate_database_sql)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
@ -102,7 +126,8 @@ def started_cluster():
|
||||
logging.info("Starting cluster...")
|
||||
cluster.start()
|
||||
|
||||
# prepare_s3_bucket(cluster)
|
||||
# TODO: properly wait for container
|
||||
time.sleep(10)
|
||||
|
||||
yield cluster
|
||||
|
||||
@ -111,88 +136,78 @@ def started_cluster():
|
||||
|
||||
|
||||
def test_simple(started_cluster):
|
||||
# TODO: properly wait for container
|
||||
time.sleep(10)
|
||||
|
||||
namespace_1 = "clickhouse.test.A"
|
||||
root_namespace = "clickhouse"
|
||||
|
||||
create_namespace(namespace_1)
|
||||
assert root_namespace in list_namespaces()["namespaces"][0][0]
|
||||
|
||||
node = started_cluster.instances["node1"]
|
||||
node.query(
|
||||
f"""
|
||||
CREATE DATABASE demo ENGINE = Iceberg('{BASE_URL}', 'minio', 'minio123')
|
||||
SETTINGS catalog_type = 'rest', storage_endpoint = 'http://{started_cluster.minio_ip}:{started_cluster.minio_port}/'
|
||||
"""
|
||||
)
|
||||
|
||||
catalog_name = "demo"
|
||||
root_namespace = "clickhouse"
|
||||
namespace_1 = "clickhouse.testA.A"
|
||||
namespace_2 = "clickhouse.testB.B"
|
||||
namespace_1_tables = ["tableA", "tableB"]
|
||||
namespace_2_tables = ["tableC", "tableD"]
|
||||
|
||||
catalog = load_catalog(
|
||||
"demo",
|
||||
**{
|
||||
"uri": BASE_URL_LOCAL_RAW,
|
||||
"type": "rest",
|
||||
"s3.endpoint": f"http://minio:9000",
|
||||
"s3.access-key-id": "minio",
|
||||
"s3.secret-access-key": "minio123",
|
||||
},
|
||||
)
|
||||
namespace_2 = "clickhouse.test.B"
|
||||
catalog.create_namespace(namespace_2)
|
||||
catalog = load_catalog_impl()
|
||||
|
||||
for namespace in [namespace_1, namespace_2]:
|
||||
catalog.create_namespace(namespace)
|
||||
|
||||
assert root_namespace in list_namespaces()["namespaces"][0][0]
|
||||
assert [(root_namespace,)] == catalog.list_namespaces()
|
||||
|
||||
tables = catalog.list_tables(namespace_2)
|
||||
assert len(tables) == 0
|
||||
for namespace in [namespace_1, namespace_2]:
|
||||
assert len(catalog.list_tables(namespace)) == 0
|
||||
|
||||
schema = Schema(
|
||||
NestedField(
|
||||
field_id=1, name="datetime", field_type=TimestampType(), required=True
|
||||
),
|
||||
NestedField(field_id=2, name="symbol", field_type=StringType(), required=True),
|
||||
NestedField(field_id=3, name="bid", field_type=FloatType(), required=False),
|
||||
NestedField(field_id=4, name="ask", field_type=DoubleType(), required=False),
|
||||
NestedField(
|
||||
field_id=5,
|
||||
name="details",
|
||||
field_type=StructType(
|
||||
NestedField(
|
||||
field_id=4,
|
||||
name="created_by",
|
||||
field_type=StringType(),
|
||||
required=False,
|
||||
),
|
||||
),
|
||||
required=False,
|
||||
),
|
||||
create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)
|
||||
|
||||
tables_list = ""
|
||||
for table in namespace_1_tables:
|
||||
create_table(catalog, namespace_1, table)
|
||||
if len(tables_list) > 0:
|
||||
tables_list += "\n"
|
||||
tables_list += f"{namespace_1}.{table}"
|
||||
|
||||
for table in namespace_2_tables:
|
||||
create_table(catalog, namespace_2, table)
|
||||
if len(tables_list) > 0:
|
||||
tables_list += "\n"
|
||||
tables_list += f"{namespace_2}.{table}"
|
||||
|
||||
assert (
|
||||
tables_list
|
||||
== node.query(
|
||||
f"SELECT name FROM system.tables WHERE database = '{CATALOG_NAME}' ORDER BY name"
|
||||
).strip()
|
||||
)
|
||||
|
||||
partition_spec = PartitionSpec(
|
||||
PartitionField(
|
||||
source_id=1, field_id=1000, transform=DayTransform(), name="datetime_day"
|
||||
)
|
||||
)
|
||||
|
||||
sort_order = SortOrder(SortField(source_id=2, transform=IdentityTransform()))
|
||||
|
||||
for table in ["tableA", "tableB"]:
|
||||
catalog.create_table(
|
||||
identifier=f"{namespace_2}.{table}",
|
||||
schema=schema,
|
||||
location=f"s3://warehouse",
|
||||
partition_spec=partition_spec,
|
||||
sort_order=sort_order,
|
||||
)
|
||||
|
||||
def check():
|
||||
assert f"{namespace_2}.tableA\n{namespace_2}.tableB\n" == node.query("SELECT name FROM system.tables WHERE database = 'demo' ORDER BY name")
|
||||
|
||||
expected = "CREATE TABLE demo.`clickhouse.test.B.tableA`\\n(\\n `datetime` DateTime64(6),\\n `symbol` String,\\n `bid` Nullable(Float32),\\n `ask` Nullable(Float64),\\n `details` Tuple(created_by Nullable(String))\\n)\\nENGINE = Iceberg(\\'http://None:9001/warehouse\\', \\'minio\\', \\'[HIDDEN]\\')\n";
|
||||
assert expected == node.query(f"SHOW CREATE TABLE demo.`{namespace_2}.tableA`")
|
||||
|
||||
check()
|
||||
node.restart_clickhouse()
|
||||
check()
|
||||
assert (
|
||||
tables_list
|
||||
== node.query(
|
||||
f"SELECT name FROM system.tables WHERE database = '{CATALOG_NAME}' ORDER BY name"
|
||||
).strip()
|
||||
)
|
||||
|
||||
expected = f"CREATE TABLE {CATALOG_NAME}.`{namespace_2}.tableC`\\n(\\n `datetime` DateTime64(6),\\n `symbol` String,\\n `bid` Nullable(Float32),\\n `ask` Nullable(Float64),\\n `details` Tuple(created_by Nullable(String))\\n)\\nENGINE = Iceberg(\\'http://None:9001/warehouse\\', \\'minio\\', \\'[HIDDEN]\\')\n"
|
||||
assert expected == node.query(
|
||||
f"SHOW CREATE TABLE {CATALOG_NAME}.`{namespace_2}.tableC`"
|
||||
)
|
||||
|
||||
|
||||
def test_different_namespaces(started_cluster):
    """Check that tables from namespaces of varying depth are all listed.

    Creates every namespace in ``namespaces`` and the same set of tables in
    each one, (re)creates the ClickHouse Iceberg database, then verifies that
    ``system.tables`` contains an entry named ``<namespace>.<table>`` for
    every combination.
    """
    node = started_cluster.instances["node1"]
    namespaces = [
        "A",
        "A.B.C",
        "A.B.C.D",
        "A.B.C.D.E",
        "A.B.C.D.E.F",
        "A.B.C.D.E.FF",
        "B",
        "B.C",
        "B.CC",
    ]
    tables = ["A", "B", "C", "D", "E", "F"]
    catalog = load_catalog_impl()

    for namespace in namespaces:
        catalog.create_namespace(namespace)
        for table in tables:
            create_table(catalog, namespace, table)

    create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)

    for namespace in namespaces:
        for table in tables:
            table_name = f"{namespace}.{table}"
            found = int(
                node.query(
                    f"SELECT count() FROM system.tables WHERE database = '{CATALOG_NAME}' and name = '{table_name}'"
                )
            )
            # Name the offending table so a failure is diagnosable without
            # rerunning the query by hand.
            assert (
                found == 1
            ), f"expected exactly one entry for {table_name!r} in system.tables, got {found}"
|
||||
|
Loading…
Reference in New Issue
Block a user