add test for iceberg

This commit is contained in:
Mikhail Artemenko 2024-11-18 17:37:24 +00:00
parent d2efae7511
commit a29ded4941
3 changed files with 181 additions and 20 deletions

View File

@ -0,0 +1,20 @@
<clickhouse>
<remote_servers>
<cluster_simple>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
<replica>
<host>node3</host>
<port>9000</port>
</replica>
</shard>
</cluster_simple>
</remote_servers>
</clickhouse>

View File

@ -0,0 +1,6 @@
<clickhouse>
<query_log>
<database>system</database>
<table>query_log</table>
</query_log>
</clickhouse>

View File

@ -73,14 +73,38 @@ def started_cluster():
cluster.add_instance( cluster.add_instance(
"node1", "node1",
main_configs=[ main_configs=[
"configs/config.d/query_log.xml",
"configs/config.d/cluster.xml",
"configs/config.d/named_collections.xml", "configs/config.d/named_collections.xml",
"configs/config.d/filesystem_caches.xml", "configs/config.d/filesystem_caches.xml",
], ],
user_configs=["configs/users.d/users.xml"], user_configs=["configs/users.d/users.xml"],
with_minio=True, with_minio=True,
with_azurite=True, with_azurite=True,
stay_alive=True,
with_hdfs=with_hdfs, with_hdfs=with_hdfs,
stay_alive=True,
)
cluster.add_instance(
"node2",
main_configs=[
"configs/config.d/query_log.xml",
"configs/config.d/cluster.xml",
"configs/config.d/named_collections.xml",
"configs/config.d/filesystem_caches.xml",
],
user_configs=["configs/users.d/users.xml"],
stay_alive=True,
)
cluster.add_instance(
"node3",
main_configs=[
"configs/config.d/query_log.xml",
"configs/config.d/cluster.xml",
"configs/config.d/named_collections.xml",
"configs/config.d/filesystem_caches.xml",
],
user_configs=["configs/users.d/users.xml"],
stay_alive=True,
) )
logging.info("Starting cluster...") logging.info("Starting cluster...")
@ -182,6 +206,7 @@ def get_creation_expression(
cluster, cluster,
format="Parquet", format="Parquet",
table_function=False, table_function=False,
run_on_cluster=False,
**kwargs, **kwargs,
): ):
if storage_type == "s3": if storage_type == "s3":
@ -189,35 +214,56 @@ def get_creation_expression(
bucket = kwargs["bucket"] bucket = kwargs["bucket"]
else: else:
bucket = cluster.minio_bucket bucket = cluster.minio_bucket
print(bucket)
if table_function: if run_on_cluster:
return f"icebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')" assert table_function
return f"icebergS3Cluster('cluster_simple', s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"
else: else:
return f""" if table_function:
DROP TABLE IF EXISTS {table_name}; return f"icebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"
CREATE TABLE {table_name} else:
ENGINE=IcebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')""" return f"""
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=IcebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"""
elif storage_type == "azure": elif storage_type == "azure":
if table_function: if run_on_cluster:
assert table_function
return f""" return f"""
icebergAzure(azure, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format}) icebergAzureCluster('cluster_simple', azure, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})
""" """
else: else:
return f""" if table_function:
DROP TABLE IF EXISTS {table_name}; return f"""
CREATE TABLE {table_name} icebergAzure(azure, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})
ENGINE=IcebergAzure(azure, container = {cluster.azure_container_name}, storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})""" """
else:
return f"""
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=IcebergAzure(azure, container = {cluster.azure_container_name}, storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})"""
elif storage_type == "hdfs": elif storage_type == "hdfs":
if table_function: if run_on_cluster:
assert table_function
return f""" return f"""
icebergHDFS(hdfs, filename= 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/') icebergHDFSCluster('cluster_simple', hdfs, filename= 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/')
""" """
else: else:
return f""" if table_function:
DROP TABLE IF EXISTS {table_name}; return f"""
CREATE TABLE {table_name} icebergHDFS(hdfs, filename= 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/')
ENGINE=IcebergHDFS(hdfs, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/');""" """
else:
return f"""
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=IcebergHDFS(hdfs, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'hdfs://hdfs1:9000/');"""
elif storage_type == "local": elif storage_type == "local":
assert not run_on_cluster
if table_function: if table_function:
return f""" return f"""
icebergLocal(local, path = '/iceberg_data/default/{table_name}/', format={format}) icebergLocal(local, path = '/iceberg_data/default/{table_name}/', format={format})
@ -227,6 +273,7 @@ def get_creation_expression(
DROP TABLE IF EXISTS {table_name}; DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name} CREATE TABLE {table_name}
ENGINE=IcebergLocal(local, path = '/iceberg_data/default/{table_name}/', format={format});""" ENGINE=IcebergLocal(local, path = '/iceberg_data/default/{table_name}/', format={format});"""
else: else:
raise Exception(f"Unknown iceberg storage type: {storage_type}") raise Exception(f"Unknown iceberg storage type: {storage_type}")
@ -492,6 +539,94 @@ def test_types(started_cluster, format_version, storage_type):
) )
@pytest.mark.parametrize("format_version", ["1", "2"])
@pytest.mark.parametrize("storage_type", ["s3", "azure", "hdfs"])
def test_cluster_table_function(started_cluster, format_version, storage_type):
if is_arm() and storage_type == "hdfs":
pytest.skip("Disabled test IcebergHDFS for aarch64")
instance = started_cluster.instances["node1"]
spark = started_cluster.spark_session
TABLE_NAME = (
"test_iceberg_cluster_"
+ format_version
+ "_"
+ storage_type
+ "_"
+ get_uuid_str()
)
def add_df(mode):
write_iceberg_from_df(
spark,
generate_data(spark, 0, 100),
TABLE_NAME,
mode=mode,
format_version=format_version,
)
files = default_upload_directory(
started_cluster,
storage_type,
f"/iceberg_data/default/{TABLE_NAME}/",
f"/iceberg_data/default/{TABLE_NAME}/",
)
logging.info(f"Adding another dataframe. result files: {files}")
return files
files = add_df(mode="overwrite")
for i in range(1, len(started_cluster.instances)):
files = add_df(mode="append")
logging.info(f"Setup complete. files: {files}")
assert len(files) == 5 + 4 * (len(started_cluster.instances) - 1)
clusters = instance.query(f"SELECT * FROM system.clusters")
logging.info(f"Clusters setup: {clusters}")
# Regular Query only node1
table_function_expr = get_creation_expression(
storage_type, TABLE_NAME, started_cluster, table_function=True
)
select_regular = instance.query(f"SELECT * FROM {table_function_expr}").strip().split()
# Cluster Query with node1 as coordinator
table_function_expr_cluster = get_creation_expression(
storage_type, TABLE_NAME, started_cluster, table_function=True, run_on_cluster=True
)
select_cluster = instance.query(f"SELECT * FROM {table_function_expr_cluster}").strip().split()
# Simple size check
assert len(select_regular) == 600
assert len(select_cluster) == 600
# Actual check
assert select_cluster == select_regular
# Check query_log
for replica in started_cluster.instances.values():
replica.query("SYSTEM FLUSH LOGS")
for node_name, replica in started_cluster.instances.items():
cluster_secondary_queries = replica.query(
f"""
SELECT query, type, is_initial_query, read_rows, read_bytes FROM system.query_log
WHERE
type = 'QueryStart' AND
positionCaseInsensitive(query, '{storage_type}Cluster') != 0 AND
position(query, '{TABLE_NAME}') != 0 AND
position(query, 'system.query_log') = 0 AND
NOT is_initial_query
"""
).strip().split("\n")
logging.info(f"[{node_name}] cluster_secondary_queries: {cluster_secondary_queries}")
assert len(cluster_secondary_queries) == 1
@pytest.mark.parametrize("format_version", ["1", "2"]) @pytest.mark.parametrize("format_version", ["1", "2"])
@pytest.mark.parametrize("storage_type", ["s3", "azure", "hdfs", "local"]) @pytest.mark.parametrize("storage_type", ["s3", "azure", "hdfs", "local"])
def test_delete_files(started_cluster, format_version, storage_type): def test_delete_files(started_cluster, format_version, storage_type):