2024-09-27 10:19:39 +00:00
|
|
|
import glob
|
|
|
|
import json
|
2023-03-07 15:04:21 +00:00
|
|
|
import logging
|
|
|
|
import os
|
2024-08-02 11:13:26 +00:00
|
|
|
import random
|
|
|
|
import string
|
2024-09-27 10:19:39 +00:00
|
|
|
import time
|
2024-10-03 16:09:20 +00:00
|
|
|
import uuid
|
2024-09-27 10:19:39 +00:00
|
|
|
from datetime import datetime
|
2023-03-07 15:04:21 +00:00
|
|
|
|
|
|
|
import delta
|
2024-09-27 10:19:39 +00:00
|
|
|
import pyarrow as pa
|
|
|
|
import pyarrow.parquet as pq
|
|
|
|
import pyspark
|
|
|
|
import pytest
|
2023-03-07 15:04:21 +00:00
|
|
|
from delta import *
|
2024-09-27 10:19:39 +00:00
|
|
|
from deltalake.writer import write_deltalake
|
|
|
|
from minio.deleteobjects import DeleteObject
|
|
|
|
from pyspark.sql.functions import (
|
|
|
|
current_timestamp,
|
|
|
|
monotonically_increasing_id,
|
|
|
|
row_number,
|
|
|
|
)
|
2023-03-07 15:04:21 +00:00
|
|
|
from pyspark.sql.types import (
|
2024-09-27 10:19:39 +00:00
|
|
|
ArrayType,
|
|
|
|
BooleanType,
|
2023-03-07 15:04:21 +00:00
|
|
|
DateType,
|
2024-09-27 10:19:39 +00:00
|
|
|
IntegerType,
|
|
|
|
StringType,
|
|
|
|
StructField,
|
|
|
|
StructType,
|
2023-03-07 15:04:21 +00:00
|
|
|
TimestampType,
|
|
|
|
)
|
2023-03-24 21:35:12 +00:00
|
|
|
from pyspark.sql.window import Window
|
2023-03-07 15:04:21 +00:00
|
|
|
|
2024-09-27 10:19:39 +00:00
|
|
|
import helpers.client
|
|
|
|
from helpers.cluster import ClickHouseCluster
|
2024-01-22 17:24:48 +00:00
|
|
|
from helpers.s3_tools import (
|
|
|
|
get_file_contents,
|
|
|
|
list_s3_objects,
|
2024-09-27 10:19:39 +00:00
|
|
|
prepare_s3_bucket,
|
|
|
|
upload_directory,
|
2024-01-22 17:24:48 +00:00
|
|
|
)
|
2024-09-27 10:19:39 +00:00
|
|
|
from helpers.test_tools import TSV
|
2022-09-02 07:06:24 +00:00
|
|
|
|
2023-04-13 13:13:56 +00:00
|
|
|
# Directory that contains this test module; ``os.path.realpath`` resolves
# symlinks first, so the value is the canonical on-disk location.
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
|
2022-09-06 18:05:33 +00:00
|
|
|
|
2022-09-02 07:06:24 +00:00
|
|
|
|
2023-04-11 15:23:05 +00:00
|
|
|
def get_spark():
    """Return a local Spark session configured for Delta Lake.

    The builder names the application ``spark_test``, registers the Delta SQL
    extension, uses the Delta catalog as ``spark_catalog`` and targets the
    ``local`` master.  The session itself is obtained with ``getOrCreate()``.
    """
    builder = (
        pyspark.sql.SparkSession.builder.appName("spark_test")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config(
            "spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        )
        # The master is set exactly once here; the previous code repeated the
        # same ``.master("local")`` call right before ``getOrCreate()``.
        .master("local")
    )

    return builder.getOrCreate()
|
2023-04-11 15:23:05 +00:00
|
|
|
|
|
|
|
|
2024-08-02 11:13:26 +00:00
|
|
|
def randomize_table_name(table_name, random_suffix_length=10):
    """Return ``table_name`` with a random alphanumeric suffix appended.

    :param table_name: prefix placed at the start of the result.
    :param random_suffix_length: number of random characters to append; each
        one is picked with ``random.choice`` from ASCII letters and digits.
    :return: the prefix immediately followed by the generated suffix.
    """
    alphabet = string.ascii_letters + string.digits
    suffix_chars = []
    for _ in range(random_suffix_length):
        suffix_chars.append(random.choice(alphabet))
    suffix = "".join(suffix_chars)
    return f"{table_name}{suffix}"
|
|
|
|
|
|
|
|
|
2022-09-02 07:06:24 +00:00
|
|
|
@pytest.fixture(scope="module")
def started_cluster():
    """Module-scoped fixture that yields a started cluster for the tests.

    A single instance ``node1`` is added with the named-collections and
    filesystem-cache configs, the users config, MinIO enabled and
    ``stay_alive=True``.  After the cluster starts, the S3 bucket is prepared
    and a Spark session is attached as ``cluster.spark_session``.  The cluster
    is always shut down when the module's tests finish or setup fails.
    """
    # Build the cluster object before entering ``try``: if the constructor
    # itself raises, ``finally`` must not touch an unbound ``cluster`` name
    # (that would replace the real error with a NameError).
    cluster = ClickHouseCluster(__file__, with_spark=True)
    try:
        cluster.add_instance(
            "node1",
            main_configs=[
                "configs/config.d/named_collections.xml",
                "configs/config.d/filesystem_caches.xml",
            ],
            user_configs=["configs/users.d/users.xml"],
            with_minio=True,
            stay_alive=True,
        )

        logging.info("Starting cluster...")
        cluster.start()

        prepare_s3_bucket(cluster)

        cluster.spark_session = get_spark()

        yield cluster

    finally:
        cluster.shutdown()
|
|
|
|
|
2022-09-06 18:05:33 +00:00
|
|
|
|
2023-03-24 21:35:12 +00:00
|
|
|
def write_delta_from_file(spark, path, result_path, mode="overwrite"):
    """Load the dataset at ``path`` with Spark and save it in Delta format.

    :param spark: Spark session used to read the source data.
    :param path: location passed to ``spark.read.load``.
    :param result_path: destination passed to the writer's ``save``.
    :param mode: writer save mode (``"overwrite"`` by default).

    The writer is configured with ``compression=none`` and
    ``delta.columnMapping.mode=name`` before saving.
    """
    source_frame = spark.read.load(path)
    writer = source_frame.write.mode(mode)
    writer = writer.option("compression", "none")
    writer = writer.format("delta")
    writer = writer.option("delta.columnMapping.mode", "name")
    writer.save(result_path)
|
|
|
|
|
|
|
|
|
2023-04-03 18:56:10 +00:00
|
|
|
def write_delta_from_df(spark, df, result_path, mode="overwrite", partition_by=None):
    """Save the DataFrame ``df`` in Delta format at ``result_path``.

    :param spark: unused by this function; kept so existing callers that pass
        the session positionally keep working.
    :param df: DataFrame whose ``write`` interface is used.
    :param result_path: destination passed to the writer's ``save``.
    :param mode: writer save mode (``"overwrite"`` by default).
    :param partition_by: column (or columns) forwarded to ``partitionBy``;
        ``None`` writes an unpartitioned table.

    The writer is configured with ``compression=none`` and
    ``delta.columnMapping.mode=name`` in both cases.
    """
    writer = (
        df.write.mode(mode)
        .option("compression", "none")
        .format("delta")
        .option("delta.columnMapping.mode", "name")
    )
    if partition_by is not None:
        # Honor the caller's choice; the previous code always partitioned by
        # the literal column "a" regardless of what was requested.
        writer = writer.partitionBy(partition_by)
    writer.save(result_path)
|
2023-03-24 21:35:12 +00:00
|
|
|
|
|
|
|
|
|
|
|
def generate_data(spark, start, end):
    """Build a two-column DataFrame ``(a, b)`` from consecutive ranges.

    Column ``a`` comes from ``spark.range(start, end, 1)`` and column ``b``
    from ``spark.range(start + 1, end + 1, 1)`` cast to a string.  Both frames
    get a temporary ``row_index`` column (a ``row_number`` over a window
    ordered by ``monotonically_increasing_id``), are joined on it, and the
    helper column is dropped from the result.
    """

    def with_row_index(frame):
        # Number the rows so the two single-column frames can be joined.
        ordering = Window.orderBy(monotonically_increasing_id())
        return frame.withColumn("row_index", row_number().over(ordering))

    numbers = spark.range(start, end, 1).toDF("a")
    labels = spark.range(start + 1, end + 1, 1).toDF("b")
    labels = labels.withColumn("b", labels["b"].cast(StringType()))

    numbers = with_row_index(numbers)
    labels = with_row_index(labels)

    joined = numbers.join(labels, on=["row_index"])
    return joined.drop("row_index")
|
|
|
|
|
|
|
|
|
2023-03-07 15:04:21 +00:00
|
|
|
def get_delta_metadata(delta_metadata_file):
    """Merge the JSON objects of a newline-delimited document into one dict.

    :param delta_metadata_file: text in which every line is a JSON document.
    :return: a dict built by applying ``dict.update`` with each parsed line in
        order, so a top-level key from a later line replaces an earlier one.
    """
    # Parse every line first so a malformed line fails before any merging.
    entries = []
    for line in delta_metadata_file.splitlines():
        entries.append(json.loads(line))

    merged = {}
    for entry in entries:
        merged.update(entry)
    return merged
|
2022-09-02 07:06:24 +00:00
|
|
|
|
|
|
|
|
2024-01-22 17:24:48 +00:00
|
|
|
def create_delta_table(node, table_name, bucket="root"):
    """(Re)create a ``DeltaLake`` table named ``table_name`` on ``node``.

    Any existing table with that name is dropped first.  The engine is given
    ``s3`` as its first argument (presumably the named collection from the
    cluster config; confirm against ``named_collections.xml``), the filename
    ``<table_name>/`` and the MinIO URL for ``bucket``.
    """
    statement = f"""
        DROP TABLE IF EXISTS {table_name};
        CREATE TABLE {table_name}
        ENGINE=DeltaLake(s3, filename = '{table_name}/', url = 'http://minio1:9001/{bucket}/')"""
    node.query(statement)
|
|
|
|
|
|
|
|
|
2023-04-05 18:32:37 +00:00
|
|
|
def create_initial_data_file(
    cluster, node, query, table_name, compression_method="none"
):
    """Have ``node`` write the result of ``query`` to ``<table_name>.parquet``.

    :param cluster: cluster object; its ``instances_dir_name`` is used to
        build the returned path.
    :param node: instance on which the ``INSERT INTO TABLE FUNCTION file(...)``
        statement is executed.
    :param query: SELECT statement whose rows are written.
    :param table_name: base name of the Parquet file.
    :param compression_method: value for
        ``output_format_parquet_compression_method``.
    :return: path of the Parquet file under the ``node1`` ``user_files``
        directory, relative to this module's directory.
    """
    insert_statement = f"""
        INSERT INTO TABLE FUNCTION
            file('{table_name}.parquet')
        SETTINGS
            output_format_parquet_compression_method='{compression_method}',
            s3_truncate_on_insert=1 {query}
        FORMAT Parquet"""
    node.query(insert_statement)

    # NOTE: the instance directory is hard-coded to "node1", not derived
    # from ``node``.
    instance_files_dir = os.path.join(
        SCRIPT_DIR, f"{cluster.instances_dir_name}/node1/database/user_files"
    )
    return f"{instance_files_dir}/{table_name}.parquet"
|
2023-03-21 19:19:30 +00:00
|
|
|
|
|
|
|
|
2023-03-24 21:35:12 +00:00
|
|
|
def test_single_log_file(started_cluster):
    """A Delta table with a single commit is readable through ClickHouse.

    100 rows are written to Parquet by ClickHouse, converted to a Delta table
    with Spark, uploaded to MinIO, and then read back through a ``DeltaLake``
    table; both the row count and the full contents are compared.
    """
    node = started_cluster.instances["node1"]
    spark_session = started_cluster.spark_session
    s3_client = started_cluster.minio_client
    s3_bucket = started_cluster.minio_bucket
    table_name = randomize_table_name("test_single_log_file")
    local_table_dir = f"/{table_name}"

    source_query = "SELECT number as a, toString(number + 1) as b FROM numbers(100)"
    parquet_path = create_initial_data_file(
        started_cluster, node, source_query, table_name
    )

    write_delta_from_file(spark_session, parquet_path, local_table_dir)
    uploaded = upload_directory(s3_client, s3_bucket, local_table_dir, "")
    # Exactly one metadata (log) file and one data file are expected.
    assert len(uploaded) == 2

    create_delta_table(node, table_name)

    row_count = int(node.query(f"SELECT count() FROM {table_name}"))
    assert row_count == 100

    table_rows = node.query(f"SELECT * FROM {table_name}")
    source_rows = node.query(source_query)
    assert table_rows == source_rows
|
2023-03-07 15:04:21 +00:00
|
|
|
|
|
|
|
|
2023-04-03 18:56:10 +00:00
|
|
|
def test_partition_by(started_cluster):
    """A Delta table partitioned by column ``a`` is readable via ClickHouse.

    Ten generated rows are written partitioned by ``a``; the upload must
    contain 11 files and the resulting table must report 10 rows.
    """
    node = started_cluster.instances["node1"]
    spark_session = started_cluster.spark_session
    s3_client = started_cluster.minio_client
    s3_bucket = started_cluster.minio_bucket
    table_name = randomize_table_name("test_partition_by")
    local_table_dir = f"/{table_name}"

    rows = generate_data(spark_session, 0, 10)
    write_delta_from_df(
        spark_session, rows, local_table_dir, mode="overwrite", partition_by="a"
    )

    uploaded = upload_directory(s3_client, s3_bucket, local_table_dir, "")
    # 10 partitions (one data file each) plus 1 metadata file.
    assert len(uploaded) == 11

    create_delta_table(node, table_name)
    row_count = int(node.query(f"SELECT count() FROM {table_name}"))
    assert row_count == 10
|
|
|
|
|
2024-07-31 13:51:38 +00:00
|
|
|
|
2023-04-04 14:00:59 +00:00
|
|
|
def test_checkpoint(started_cluster):
    """Reads stay correct across Delta checkpoints, a delete and re-appends.

    25 single-row commits are written (enough for checkpoint files to show up
    in the upload), then rows with ``a < 10`` are deleted and rows 0..4 are
    appended again.  File counts of each upload and the table contents seen by
    ClickHouse are verified along the way.
    """
    node = started_cluster.instances["node1"]
    spark_session = started_cluster.spark_session
    s3_client = started_cluster.minio_client
    s3_bucket = started_cluster.minio_bucket
    table_name = randomize_table_name("test_checkpoint")
    local_table_dir = f"/{table_name}"

    def write_single_row(index, mode):
        # One commit containing just the row for ``index``.
        write_delta_from_df(
            spark_session,
            generate_data(spark_session, index, index + 1),
            local_table_dir,
            mode=mode,
        )

    def upload():
        return upload_directory(s3_client, s3_bucket, local_table_dir, "")

    # Commit 0 creates the table, commits 1..24 append to it.
    for index in range(25):
        write_single_row(index, "overwrite" if index == 0 else "append")

    uploaded = upload()
    # 25 data files
    # 25 metadata files
    # 1 last_metadata file
    # 2 checkpoints
    assert len(uploaded) == 25 * 2 + 3

    assert any(name.endswith("last_checkpoint") for name in uploaded)

    create_delta_table(node, table_name)
    initial_count = int(
        node.query(
            f"SELECT count() FROM {table_name} SETTINGS input_format_parquet_allow_missing_columns=1"
        )
    )
    assert initial_count == 25

    delta_table = DeltaTable.forPath(spark_session, local_table_dir)
    delta_table.delete("a < 10")
    uploaded = upload()
    assert int(node.query(f"SELECT count() FROM {table_name}")) == 15

    for index in range(0, 5):
        write_single_row(index, "append")

    # + 1 metadata files (for delete)
    # + 5 data files
    # + 5 metadata files
    # + 1 checkpoint file
    # + 1 more file that the original author could not attribute
    uploaded = upload()
    assert len(uploaded) == 53 + 1 + 5 * 2 + 1 + 1
    assert int(node.query(f"SELECT count() FROM {table_name}")) == 20

    expected_query = (
        "SELECT * FROM ("
        "SELECT number, toString(number + 1) FROM numbers(5) "
        "UNION ALL SELECT number, toString(number + 1) FROM numbers(10, 15) "
        ") ORDER BY 1"
    )
    actual_rows = node.query(f"SELECT * FROM {table_name} ORDER BY 1").strip()
    expected_rows = node.query(expected_query).strip()
    assert actual_rows == expected_rows
|
2023-04-03 19:27:05 +00:00
|
|
|
|
|
|
|
|
2023-03-24 21:35:12 +00:00
|
|
|
def test_multiple_log_files(started_cluster):
    """A second Delta commit (append) becomes visible through ClickHouse.

    After the first write one log object must exist under ``_delta_log/`` and
    the table holds 100 rows; after appending 100 more rows there must be two
    log objects and the table must contain rows 0..199.
    """
    node = started_cluster.instances["node1"]
    spark_session = started_cluster.spark_session
    s3_client = started_cluster.minio_client
    s3_bucket = started_cluster.minio_bucket
    table_name = randomize_table_name("test_multiple_log_files")
    local_table_dir = f"/{table_name}"

    def delta_log_objects():
        # The prefix has no leading slash on purpose (unlike the local
        # directory name used for uploading).
        listing = s3_client.list_objects(
            s3_bucket, f"{table_name}/_delta_log/", recursive=True
        )
        return list(listing)

    def row_count():
        return int(node.query(f"SELECT count() FROM {table_name}"))

    write_delta_from_df(
        spark_session,
        generate_data(spark_session, 0, 100),
        local_table_dir,
        mode="overwrite",
    )
    uploaded = upload_directory(s3_client, s3_bucket, local_table_dir, "")
    # 1 metadata file + 1 data file
    assert len(uploaded) == 2

    log_objects = delta_log_objects()
    assert len(log_objects) == 1

    create_delta_table(node, table_name)
    assert row_count() == 100

    write_delta_from_df(
        spark_session,
        generate_data(spark_session, 100, 200),
        local_table_dir,
        mode="append",
    )
    uploaded = upload_directory(s3_client, s3_bucket, local_table_dir, "")
    # 2 metadata files + 2 data files
    assert len(uploaded) == 4

    log_objects = delta_log_objects()
    assert len(log_objects) == 2

    assert row_count() == 200
    table_rows = node.query(f"SELECT * FROM {table_name} ORDER BY 1")
    reference_rows = node.query(
        "SELECT number, toString(number + 1) FROM numbers(200)"
    )
    assert table_rows == reference_rows
|
|
|
|
|
|
|
|
|
|
|
|
def test_metadata(started_cluster):
    """The first Delta commit carries the expected file statistics.

    The ``add`` entry of ``00000000000000000000.json`` must report 100 records
    with a first min value of 0 and a first max value of 99, and the table
    must expose 100 rows through ClickHouse.
    """
    node = started_cluster.instances["node1"]
    spark_session = started_cluster.spark_session
    s3_client = started_cluster.minio_client
    s3_bucket = started_cluster.minio_bucket
    table_name = randomize_table_name("test_metadata")
    local_table_dir = f"/{table_name}"

    parquet_path = create_initial_data_file(
        started_cluster,
        node,
        "SELECT number, toString(number) FROM numbers(100)",
        table_name,
    )

    write_delta_from_file(spark_session, parquet_path, local_table_dir)
    upload_directory(s3_client, s3_bucket, local_table_dir, "")

    first_commit = get_file_contents(
        s3_client,
        s3_bucket,
        f"/{table_name}/_delta_log/00000000000000000000.json",
    )
    commit_actions = get_delta_metadata(first_commit)

    # "stats" is itself a JSON-encoded string inside the "add" action.
    file_stats = json.loads(commit_actions["add"]["stats"])
    assert file_stats["numRecords"] == 100
    first_min = next(iter(file_stats["minValues"].values()))
    assert first_min == 0
    first_max = next(iter(file_stats["maxValues"].values()))
    assert first_max == 99

    create_delta_table(node, table_name)
    assert int(node.query(f"SELECT count() FROM {table_name}")) == 100
|
2023-03-21 19:19:30 +00:00
|
|
|
|
|
|
|
|
2023-03-07 15:04:21 +00:00
|
|
|
def test_types(started_cluster):
    """Spark column types map to the expected ClickHouse types.

    A Delta table with INT, STRING, DATE, ARRAY<STRING> and BOOLEAN columns
    receives one row; the row is then read through both the ``DeltaLake``
    table engine and the ``deltaLake`` table function, and the schema reported
    by ``DESCRIBE`` is checked.
    """
    table_name = randomize_table_name("test_types")
    spark_session = started_cluster.spark_session
    result_dir = randomize_table_name(f"{table_name}_result_2")

    # Creating the table is the goal here; the returned handle is not needed.
    (
        DeltaTable.create(spark_session)
        .tableName(table_name)
        .location(f"/{result_dir}")
        .addColumn("a", "INT")
        .addColumn("b", "STRING")
        .addColumn("c", "DATE")
        .addColumn("d", "ARRAY<STRING>")
        .addColumn("e", "BOOLEAN")
        .execute()
    )

    single_row = (
        123,
        "string",
        datetime.strptime("2000-01-01", "%Y-%m-%d"),
        ["str1", "str2"],
        True,
    )
    row_schema = StructType(
        [
            StructField("a", IntegerType()),
            StructField("b", StringType()),
            StructField("c", DateType()),
            StructField("d", ArrayType(StringType())),
            StructField("e", BooleanType()),
        ]
    )
    frame = spark_session.createDataFrame(data=[single_row], schema=row_schema)
    frame.printSchema()
    frame.write.mode("append").format("delta").saveAsTable(table_name)

    s3_client = started_cluster.minio_client
    s3_bucket = started_cluster.minio_bucket
    upload_directory(s3_client, s3_bucket, f"/{result_dir}", "")

    node = started_cluster.instances["node1"]
    # Same URL and credentials for the table engine and the table function.
    storage_args = f"'http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{s3_bucket}/{result_dir}/', 'minio', 'minio123'"
    expected_row = "123\tstring\t2000-01-01\t['str1','str2']\ttrue"

    node.query(
        f"""
        DROP TABLE IF EXISTS {table_name};
        CREATE TABLE {table_name} ENGINE=DeltaLake({storage_args})"""
    )
    assert int(node.query(f"SELECT count() FROM {table_name}")) == 1
    assert node.query(f"SELECT * FROM {table_name}").strip() == expected_row

    table_function = f"deltaLake({storage_args})"
    assert node.query(f"SELECT * FROM {table_function}").strip() == expected_row

    expected_schema = TSV(
        [
            ["a", "Nullable(Int32)"],
            ["b", "Nullable(String)"],
            ["c", "Nullable(Date32)"],
            ["d", "Array(Nullable(String))"],
            ["e", "Nullable(Bool)"],
        ]
    )
    assert node.query(f"DESCRIBE {table_function} FORMAT TSV") == expected_schema
|
2024-01-22 17:24:48 +00:00
|
|
|
|
|
|
|
|
|
|
|
def test_restart_broken(started_cluster):
    """A ``DeltaLake`` table survives a restart while its bucket is missing.

    The table is created on a dedicated bucket, the bucket is emptied and
    removed, and ClickHouse is restarted.  Queries must then fail with
    ``NoSuchBucket`` while the ``DiskS3NoSuchKeyErrors`` metric stays at 0;
    once the bucket and data are restored the table works again.
    """
    node = started_cluster.instances["node1"]
    spark_session = started_cluster.spark_session
    s3_client = started_cluster.minio_client
    broken_bucket = "broken"
    table_name = randomize_table_name("test_restart_broken")
    local_table_dir = f"/{table_name}"
    count_query = f"SELECT count() FROM {table_name}"

    if not s3_client.bucket_exists(broken_bucket):
        s3_client.make_bucket(broken_bucket)

    parquet_path = create_initial_data_file(
        started_cluster,
        node,
        "SELECT number, toString(number) FROM numbers(100)",
        table_name,
    )

    write_delta_from_file(spark_session, parquet_path, local_table_dir)
    upload_directory(s3_client, broken_bucket, local_table_dir, "")
    create_delta_table(node, table_name, bucket=broken_bucket)
    assert int(node.query(count_query)) == 100

    # Empty the bucket and drop it; the iterator returned by
    # ``remove_objects`` must yield nothing.
    stored_objects = list_s3_objects(s3_client, broken_bucket, prefix="")
    removal_results = list(
        s3_client.remove_objects(
            broken_bucket, [DeleteObject(key) for key in stored_objects]
        )
    )
    assert len(removal_results) == 0
    s3_client.remove_bucket(broken_bucket)

    node.restart_clickhouse()

    query_error = node.query_and_get_error(count_query)
    assert "NoSuchBucket" in query_error

    metric_query = """
        SELECT value
        FROM system.metrics
        WHERE metric = 'DiskS3NoSuchKeyErrors'
        """
    s3_disk_no_key_errors_metric_value = int(node.query(metric_query).strip())
    assert s3_disk_no_key_errors_metric_value == 0

    # Restore the bucket and its contents; the same table must work again.
    s3_client.make_bucket(broken_bucket)
    upload_directory(s3_client, broken_bucket, local_table_dir, "")

    assert int(node.query(count_query)) == 100
|
2024-01-23 12:28:25 +00:00
|
|
|
|
|
|
|
|
|
|
|
def test_restart_broken_table_function(started_cluster):
    """A table created ``AS deltaLake(...)`` survives a missing bucket.

    Same scenario as the engine-based restart test, except the table is
    defined through the ``deltaLake`` table function: after the bucket is
    removed and ClickHouse restarted, queries fail with ``NoSuchBucket``;
    after the bucket and data are restored, the table returns 100 rows again.
    """
    node = started_cluster.instances["node1"]
    spark_session = started_cluster.spark_session
    s3_client = started_cluster.minio_client
    broken_bucket = "broken2"
    table_name = randomize_table_name("test_restart_broken_table_function")
    local_table_dir = f"/{table_name}"
    count_query = f"SELECT count() FROM {table_name}"

    if not s3_client.bucket_exists(broken_bucket):
        s3_client.make_bucket(broken_bucket)

    parquet_path = create_initial_data_file(
        started_cluster,
        node,
        "SELECT number, toString(number) FROM numbers(100)",
        table_name,
    )

    write_delta_from_file(spark_session, parquet_path, local_table_dir)
    upload_directory(s3_client, broken_bucket, local_table_dir, "")

    create_statement = f"""
        DROP TABLE IF EXISTS {table_name};
        CREATE TABLE {table_name}
        AS deltaLake(s3, filename = '{table_name}/', url = 'http://minio1:9001/{broken_bucket}/')"""
    node.query(create_statement)
    assert int(node.query(count_query)) == 100

    # Empty the bucket and drop it; the iterator returned by
    # ``remove_objects`` must yield nothing.
    stored_objects = list_s3_objects(s3_client, broken_bucket, prefix="")
    removal_results = list(
        s3_client.remove_objects(
            broken_bucket, [DeleteObject(key) for key in stored_objects]
        )
    )
    assert len(removal_results) == 0
    s3_client.remove_bucket(broken_bucket)

    node.restart_clickhouse()

    query_error = node.query_and_get_error(count_query)
    assert "NoSuchBucket" in query_error

    # Restore the bucket and its contents; the same table must work again.
    s3_client.make_bucket(broken_bucket)
    upload_directory(s3_client, broken_bucket, local_table_dir, "")

    assert int(node.query(count_query)) == 100
|
2024-04-30 08:44:25 +00:00
|
|
|
|
|
|
|
|
|
|
|
def test_partition_columns(started_cluster):
    """Read a Delta table partitioned by several columns of different types.

    A table with columns ``a INT, b STRING, c DATE, d INT, e BOOLEAN`` is
    created with Spark, partitioned by ``b, c, d, e``, and filled one row per
    write.  The test then checks, through both the ``deltaLake`` table
    function and the ``DeltaLake`` table engine:

    * the schema reported by ``DESCRIBE``;
    * row counts, with and without a filter on the partition column ``c``;
    * full row contents, including a table declared with only a subset of
      the columns;
    * that a second batch of rows becomes visible after re-uploading, and
      that the upload then contains a ``last_checkpoint`` file.
    """
    instance = started_cluster.instances["node1"]
    spark = started_cluster.spark_session
    minio_client = started_cluster.minio_client
    bucket = started_cluster.minio_bucket
    TABLE_NAME = randomize_table_name("test_partition_columns")
    result_file = f"{TABLE_NAME}"
    local_table_dir = f"/{TABLE_NAME}"
    partition_columns = ["b", "c", "d", "e"]
    num_rows = 9

    # Arguments shared by the deltaLake() table function and the DeltaLake engine.
    delta_args = (
        f"('http://{started_cluster.minio_ip}:{started_cluster.minio_port}"
        f"/{bucket}/{result_file}/', 'minio', 'minio123')"
    )
    table_function = f"deltaLake{delta_args}"

    schema = StructType(
        [
            StructField("a", IntegerType()),
            StructField("b", StringType()),
            StructField("c", DateType()),
            StructField("d", IntegerType()),
            StructField("e", BooleanType()),
        ]
    )

    def append_rows(ids):
        # Every row goes into the Delta log as a separate append.
        # NOTE(review): presumably this is what accumulates enough commits for
        # Delta to write the checkpoint asserted further down -- confirm
        # before batching these writes.
        for i in ids:
            row = (i, "test" + str(i), datetime(2000, 1, i), i, i % 2 != 0)
            df = spark.createDataFrame(data=[row], schema=schema)
            df.printSchema()
            df.write.mode("append").format("delta").partitionBy(
                partition_columns
            ).save(local_table_dir)

    def expected_tsv(ids, full_row=True):
        # Expected query output with explicit tab separators, so the
        # expectation does not depend on invisible whitespace in the source.
        # full_row=False yields only the (b, c, d) columns.
        lines = []
        for i in ids:
            fields = [f"test{i}", f"2000-01-{i:02d}", str(i)]
            if full_row:
                fields = [str(i), *fields, "true" if i % 2 != 0 else "false"]
            lines.append("\t".join(fields))
        return "\n".join(lines)

    # Only the side effect (table creation) is needed; the builder result is unused.
    (
        DeltaTable.create(spark)
        .tableName(TABLE_NAME)
        .location(local_table_dir)
        .addColumn("a", "INT")
        .addColumn("b", "STRING")
        .addColumn("c", "DATE")
        .addColumn("d", "INT")
        .addColumn("e", "BOOLEAN")
        .partitionedBy(partition_columns)
        .execute()
    )

    append_rows(range(1, num_rows + 1))

    files = upload_directory(minio_client, bucket, local_table_dir, "")
    assert len(files) > 0
    print(f"Uploaded files: {files}")

    result = instance.query(f"describe table {table_function}").strip()
    assert (
        result
        == "a\tNullable(Int32)\t\t\t\t\t\nb\tNullable(String)\t\t\t\t\t\nc\tNullable(Date32)\t\t\t\t\t\nd\tNullable(Int32)\t\t\t\t\t\ne\tNullable(Bool)"
    )

    assert int(instance.query(f"SELECT count() FROM {table_function}")) == num_rows
    assert (
        int(
            instance.query(
                f"SELECT count() FROM {table_function} WHERE c == toDateTime('2000/01/05')"
            )
        )
        == 1
    )

    # Same data through the table engine with the full column list.
    instance.query(
        f"""
        DROP TABLE IF EXISTS {TABLE_NAME};
        CREATE TABLE {TABLE_NAME} (a Nullable(Int32), b Nullable(String), c Nullable(Date32), d Nullable(Int32), e Nullable(Bool))
        ENGINE=DeltaLake{delta_args}"""
    )
    assert (
        expected_tsv(range(1, num_rows + 1))
        == instance.query(f"SELECT * FROM {TABLE_NAME} ORDER BY b").strip()
    )
    assert (
        int(
            instance.query(
                f"SELECT count() FROM {TABLE_NAME} WHERE c == toDateTime('2000/01/05')"
            )
        )
        == 1
    )

    # Subset of columns should work.
    instance.query(
        f"""
        DROP TABLE IF EXISTS {TABLE_NAME};
        CREATE TABLE {TABLE_NAME} (b Nullable(String), c Nullable(Date32), d Nullable(Int32))
        ENGINE=DeltaLake{delta_args}"""
    )
    assert (
        expected_tsv(range(1, num_rows + 1), full_row=False)
        == instance.query(f"SELECT * FROM {TABLE_NAME} ORDER BY b").strip()
    )

    # Second batch: rows num_rows + 1 .. 2 * num_rows.
    append_rows(range(num_rows + 1, 2 * num_rows + 1))

    files = upload_directory(minio_client, bucket, local_table_dir, "")
    assert any(file.endswith("last_checkpoint") for file in files)

    assert int(instance.query(f"SELECT count() FROM {table_function}")) == num_rows * 2

    assert (
        expected_tsv(range(1, 2 * num_rows + 1))
        == instance.query(f"SELECT * FROM {table_function} ORDER BY c").strip()
    )
    # The engine table created above (subset of columns) must see the new rows too.
    assert (
        int(
            instance.query(
                f"SELECT count() FROM {TABLE_NAME} WHERE c == toDateTime('2000/01/15')"
            )
        )
        == 1
    )
|
2024-08-23 16:14:02 +00:00
|
|
|
|
|
|
|
|
|
|
|
def test_complex_types(started_cluster):
    """Read struct, list and map columns from a Delta table written by ``deltalake``.

    A three-row pyarrow table with an int id, a string name, a struct
    ``address``, a list ``interests`` and a string-to-string map ``metadata``
    is written with ``write_deltalake`` to ``s3://root/<table_name>``.  The
    ``id``, ``address`` and ``metadata`` columns are then selected through the
    ``deltaLake`` table function and the expected text is looked up in the
    query output.
    """
    node = started_cluster.instances["node1"]

    schema = pa.schema(
        [
            ("id", pa.int32()),
            ("name", pa.string()),
            (
                "address",
                pa.struct(
                    [
                        ("street", pa.string()),
                        ("city", pa.string()),
                        ("state", pa.string()),
                    ]
                ),
            ),
            ("interests", pa.list_(pa.string())),
            (
                "metadata",
                pa.map_(
                    pa.string(), pa.string()
                ),  # Map with string keys and string values
            ),
        ]
    )

    # Create sample data: one pyarrow array per schema field, in schema order.
    data = [
        pa.array([1, 2, 3], type=pa.int32()),
        pa.array(["John Doe", "Jane Smith", "Jake Johnson"], type=pa.string()),
        pa.array(
            [
                {"street": "123 Elm St", "city": "Springfield", "state": "IL"},
                {"street": "456 Maple St", "city": "Shelbyville", "state": "IL"},
                {"street": "789 Oak St", "city": "Ogdenville", "state": "IL"},
            ],
            type=schema.field("address").type,
        ),
        pa.array(
            [
                pa.array(["dancing", "coding", "hiking"]),
                pa.array(["dancing", "coding", "hiking"]),
                pa.array(["dancing", "coding", "hiking"]),
            ],
            type=schema.field("interests").type,
        ),
        pa.array(
            [
                {"key1": "value1", "key2": "value2"},
                {"key1": "value3", "key2": "value4"},
                {"key1": "value5", "key2": "value6"},
            ],
            type=schema.field("metadata").type,
        ),
    ]

    endpoint_url = f"http://{started_cluster.minio_ip}:{started_cluster.minio_port}"
    aws_access_key_id = "minio"
    aws_secret_access_key = "minio123"
    table_name = randomize_table_name("test_complex_types")

    storage_options = {
        "AWS_ENDPOINT_URL": endpoint_url,
        "AWS_ACCESS_KEY_ID": aws_access_key_id,
        "AWS_SECRET_ACCESS_KEY": aws_secret_access_key,
        "AWS_ALLOW_HTTP": "true",
        "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
    }
    path = f"s3://root/{table_name}"
    table = pa.Table.from_arrays(data, schema=schema)

    write_deltalake(path, table, storage_options=storage_options)

    # The same endpoint and credentials used for writing, expressed as the
    # ClickHouse table function that reads the table back.
    table_function = (
        f"deltaLake('{endpoint_url}/root/{table_name}' , "
        f"'{aws_access_key_id}', '{aws_secret_access_key}')"
    )

    assert "1\n2\n3\n" in node.query(f"SELECT id FROM {table_function}")
    assert (
        "('123 Elm St','Springfield','IL')\n('456 Maple St','Shelbyville','IL')\n('789 Oak St','Ogdenville','IL')"
        in node.query(f"SELECT address FROM {table_function}")
    )
    assert (
        "{'key1':'value1','key2':'value2'}\n{'key1':'value3','key2':'value4'}\n{'key1':'value5','key2':'value6'}"
        in node.query(f"SELECT metadata FROM {table_function}")
    )
|
2024-10-03 16:01:15 +00:00
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.parametrize("storage_type", ["s3"])
def test_filesystem_cache(started_cluster, storage_type):
    """Check that reads of a Delta table go through the filesystem cache ``cache1``.

    The same ``SELECT *`` is executed twice with
    ``filesystem_cache_name = 'cache1'``.  For the first run the number of
    bytes written to the cache is recorded and at least one ``S3GetObject``
    is required.  For the second run the bytes read from the cache must equal
    the bytes written by the first run, with zero ``S3GetObject`` events.

    ``storage_type`` is supplied by the parametrization but is not read by
    the test body.
    """
    node = started_cluster.instances["node1"]
    spark_session = started_cluster.spark_session
    minio = started_cluster.minio_client
    TABLE_NAME = randomize_table_name("test_filesystem_cache")
    bucket = started_cluster.minio_bucket

    def run_cached_select():
        # Run the SELECT under a fresh query id and flush logs so that its
        # system.query_log entry can be queried; returns the query id.
        qid = f"{TABLE_NAME}-{uuid.uuid4()}"
        node.query(
            f"SELECT * FROM {TABLE_NAME} SETTINGS filesystem_cache_name = 'cache1'",
            query_id=qid,
        )
        node.query("SYSTEM FLUSH LOGS")
        return qid

    def profile_event(event, qid):
        # Value of one ProfileEvents counter for the finished query `qid`.
        return int(
            node.query(
                f"SELECT ProfileEvents['{event}'] FROM system.query_log WHERE query_id = '{qid}' AND type = 'QueryFinish'"
            )
        )

    if not minio.bucket_exists(bucket):
        minio.make_bucket(bucket)

    parquet_data_path = create_initial_data_file(
        started_cluster,
        node,
        "SELECT number, toString(number) FROM numbers(100)",
        TABLE_NAME,
    )

    write_delta_from_file(spark_session, parquet_data_path, f"/{TABLE_NAME}")
    upload_directory(minio, bucket, f"/{TABLE_NAME}", "")
    create_delta_table(node, TABLE_NAME, bucket=bucket)

    # First run: data is fetched from S3 and written into the cache.
    first_query_id = run_cached_select()
    written_to_cache = profile_event("CachedReadBufferCacheWriteBytes", first_query_id)
    assert profile_event("S3GetObject", first_query_id) > 0

    # Second run: the same amount of data comes from the cache, nothing from S3.
    second_query_id = run_cached_select()
    assert (
        profile_event("CachedReadBufferReadFromCacheBytes", second_query_id)
        == written_to_cache
    )
    assert profile_event("S3GetObject", second_query_id) == 0
|