Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-10 01:25:21 +00:00)
Add spark to tests, rewrite tests, fix bug
This commit is contained in:
parent 41c79b0c42
commit 0240ad4c68
@@ -32,6 +32,7 @@ RUN apt-get update \
     libssl-dev \
     libcurl4-openssl-dev \
     gdb \
+    default-jdk \
     software-properties-common \
     libkrb5-dev \
     krb5-user \
@@ -92,6 +93,8 @@ RUN python3 -m pip install \
     tzlocal==2.1 \
     urllib3 \
     requests-kerberos \
+    pyspark==3.3.2 \
+    delta-spark==2.2.0 \
     pyhdfs \
     azure-storage-blob \
     meilisearch==0.18.3
@@ -101,6 +104,9 @@ COPY dockerd-entrypoint.sh /usr/local/bin/
 COPY compose/ /compose/
 COPY misc/ /misc/
 
+RUN wget https://dlcdn.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
+RUN tar xzvf spark-3.3.2-bin-hadoop3.tgz -C /
+
 RUN set -x \
     && addgroup --system dockremap \
     && adduser --system dockremap \
@@ -1,5 +1,5 @@
 #!/bin/bash
-set -e
+set -eu
 
 mkdir -p /etc/docker/
 echo '{
@@ -37,6 +37,13 @@ set -e
     docker ps --all --quiet | xargs --no-run-if-empty docker rm || true
 }
 
+java_path="$(update-alternatives --config java | sed -n 's/.*(providing \/usr\/bin\/java): //p')"
+export JAVA_PATH=$java_path
+java -version
+export SPARK_HOME="/spark-3.3.2-bin-hadoop3"
+export PATH=$SPARK_HOME/bin:$PATH
+pyspark --version
+
 echo "Start tests"
 export CLICKHOUSE_TESTS_SERVER_BIN_PATH=/clickhouse
 export CLICKHOUSE_TESTS_CLIENT_BIN_PATH=/clickhouse
@@ -71,10 +71,9 @@ public:
 
         Configuration new_configuration(configuration);
 
-        new_configuration.appendToPath(
-            std::filesystem::path(Name::data_directory_prefix) / MetadataParser::generateQueryFromKeys(keys, configuration.format));
+        new_configuration.appendToPath(MetadataParser::generateQueryFromKeys(keys, configuration.format));
 
-        LOG_DEBUG(log, "Table path: {}, new uri: {}", configuration.url.key, configuration.getPath());
+        LOG_DEBUG(log, "Table path: {}, new uri: {}", configuration.url.key, new_configuration.getPath());
 
         return new_configuration;
     }
@@ -86,7 +86,6 @@ void DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::init(ContextPtr
 template <typename Configuration, typename MetadataReadHelper>
 std::vector<String> DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::getJsonLogFiles() const
 {
-
     /// DeltaLake format stores all metadata json files in _delta_log directory
     static constexpr auto deltalake_metadata_directory = "_delta_log";
     static constexpr auto meta_file_suffix = ".json";
@@ -121,8 +120,14 @@ void DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::handleJSON(cons
 template <typename Configuration, typename MetadataReadHelper>
 String DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::generateQueryFromKeys(const std::vector<String> & keys, const String &)
 {
-    std::string new_query = fmt::format("{{{}}}", fmt::join(keys, ","));
-    return new_query;
+    if (keys.size() == 1)
+    {
+        return fmt::format("{}", keys[0]);
+    }
+    else
+    {
+        return fmt::format("{{{}}}", fmt::join(keys, ","));
+    }
 }
 
 template DeltaLakeMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::DeltaLakeMetadataParser(
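For illustration only (not part of the commit): the change above is the bug fix in how a list of data-file keys becomes a path glob. A minimal Python sketch of the same string logic, using made-up file names, shows the effect — a single key is now returned as-is instead of being wrapped in a one-element brace glob, while several keys still collapse into a {a,b,...} glob:

    # Python sketch of the string logic in generateQueryFromKeys (file names are hypothetical).
    def generate_query_from_keys(keys):
        if len(keys) == 1:
            return keys[0]                     # single key: no brace glob
        return "{" + ",".join(keys) + "}"      # several keys: {a,b,...} glob

    assert generate_query_from_keys(["part-0.parquet"]) == "part-0.parquet"
    assert (
        generate_query_from_keys(["part-0.parquet", "part-1.parquet"])
        == "{part-0.parquet,part-1.parquet}"
    )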
@@ -53,7 +53,6 @@ private:
 struct StorageDeltaLakeName
 {
     static constexpr auto name = "DeltaLake";
-    static constexpr auto data_directory_prefix = "";
 };
 
 using StorageDeltaLake
@@ -32,7 +32,6 @@ private:
 struct StorageHudiName
 {
     static constexpr auto name = "Hudi";
-    static constexpr auto data_directory_prefix = "";
 };
 
 using StorageHudi
@@ -215,8 +215,14 @@ std::vector<String> IcebergMetadataParser<Configuration, MetadataReadHelper>::ge
 template <typename Configuration, typename MetadataReadHelper>
 String IcebergMetadataParser<Configuration, MetadataReadHelper>::generateQueryFromKeys(const std::vector<String> & keys, const String &)
 {
-    std::string new_query = fmt::format("{{{}}}", fmt::join(keys, ","));
-    return new_query;
+    if (keys.size() == 1)
+    {
+        return fmt::format("{}", keys[0]);
+    }
+    else
+    {
+        return fmt::format("{{{}}}", fmt::join(keys, ","));
+    }
 }
 
 template IcebergMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>::IcebergMetadataParser(
@@ -42,7 +42,6 @@ private:
 struct StorageIcebergName
 {
     static constexpr auto name = "Iceberg";
-    static constexpr auto data_directory_prefix = "data";
 };
 
 using StorageIceberg
tests/integration/helpers/s3_tools.py (new file, 83 lines added)
@@ -0,0 +1,83 @@
+from minio import Minio
+import glob
+import os
+import json
+
+
+def upload_directory(minio_client, bucket_name, local_path, s3_path):
+    for local_file in glob.glob(local_path + "/**"):
+        if os.path.isfile(local_file):
+            result_local_path = os.path.join(local_path, local_file)
+            result_s3_path = os.path.join(s3_path, local_file)
+            print(f"Putting file {result_local_path} to {result_s3_path}")
+            minio_client.fput_object(
+                bucket_name=bucket_name,
+                object_name=result_s3_path,
+                file_path=result_local_path,
+            )
+        else:
+            upload_directory(
+                minio_client,
+                bucket_name,
+                os.path.join(local_path, local_file),
+                os.path.join(s3_path, local_file),
+            )
+
+
+def get_file_contents(minio_client, bucket, s3_path):
+    data = minio_client.get_object(bucket, s3_path)
+    data_str = b""
+    for chunk in data.stream():
+        data_str += chunk
+    return data_str.decode()
+
+
+# Creates S3 bucket for tests and allows anonymous read-write access to it.
+def prepare_s3_bucket(started_cluster):
+    # Allows read-write access for bucket without authorization.
+    bucket_read_write_policy = {
+        "Version": "2012-10-17",
+        "Statement": [
+            {
+                "Sid": "",
+                "Effect": "Allow",
+                "Principal": {"AWS": "*"},
+                "Action": "s3:GetBucketLocation",
+                "Resource": "arn:aws:s3:::root",
+            },
+            {
+                "Sid": "",
+                "Effect": "Allow",
+                "Principal": {"AWS": "*"},
+                "Action": "s3:ListBucket",
+                "Resource": "arn:aws:s3:::root",
+            },
+            {
+                "Sid": "",
+                "Effect": "Allow",
+                "Principal": {"AWS": "*"},
+                "Action": "s3:GetObject",
+                "Resource": "arn:aws:s3:::root/*",
+            },
+            {
+                "Sid": "",
+                "Effect": "Allow",
+                "Principal": {"AWS": "*"},
+                "Action": "s3:PutObject",
+                "Resource": "arn:aws:s3:::root/*",
+            },
+        ],
+    }
+
+    minio_client = started_cluster.minio_client
+    minio_client.set_bucket_policy(
+        started_cluster.minio_bucket, json.dumps(bucket_read_write_policy)
+    )
+
+    started_cluster.minio_restricted_bucket = "{}-with-auth".format(
+        started_cluster.minio_bucket
+    )
+    if minio_client.bucket_exists(started_cluster.minio_restricted_bucket):
+        minio_client.remove_bucket(started_cluster.minio_restricted_bucket)
+
+    minio_client.make_bucket(started_cluster.minio_restricted_bucket)
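For illustration only (not part of the commit): a rough usage sketch of the helpers added above, in the shape the rewritten test below uses them. The cluster fixture, bucket, and paths here are hypothetical:

    # Hypothetical test snippet; started_cluster is the usual integration-test fixture.
    from helpers.s3_tools import prepare_s3_bucket, upload_directory, get_file_contents

    def example(started_cluster):
        prepare_s3_bucket(started_cluster)   # open anonymous read-write access on the test bucket
        client = started_cluster.minio_client
        bucket = started_cluster.minio_bucket
        upload_directory(client, bucket, "/local_table_dir", "")   # recurse over files and subdirectories
        first_log = get_file_contents(client, bucket, "/local_table_dir/_delta_log/00000000000000000000.json")
        print(first_log.splitlines()[0])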
@@ -1,83 +1,47 @@
-import logging
-import os
-import json
 import helpers.client
-import pytest
 from helpers.cluster import ClickHouseCluster
 from helpers.test_tools import TSV
 
+import pytest
+import logging
+import os
+import json
+import time
+
+import pyspark
+import delta
+from delta import *
+from pyspark.sql.types import (
+    StructType,
+    StructField,
+    StringType,
+    IntegerType,
+    DateType,
+    TimestampType,
+    BooleanType,
+    ArrayType,
+)
+from pyspark.sql.functions import current_timestamp
+from datetime import datetime
+
+from helpers.s3_tools import prepare_s3_bucket, upload_directory, get_file_contents
 
 
 SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
+TABLE_NAME = "test_delta_table"
+USER_FILES_PATH = "/ClickHouse/tests/integration/test_storage_delta/_instances/node1/database/user_files"
-
-
-def prepare_s3_bucket(started_cluster):
-    bucket_read_write_policy = {
-        "Version": "2012-10-17",
-        "Statement": [
-            {
-                "Sid": "",
-                "Effect": "Allow",
-                "Principal": {"AWS": "*"},
-                "Action": "s3:GetBucketLocation",
-                "Resource": "arn:aws:s3:::root",
-            },
-            {
-                "Sid": "",
-                "Effect": "Allow",
-                "Principal": {"AWS": "*"},
-                "Action": "s3:ListBucket",
-                "Resource": "arn:aws:s3:::root",
-            },
-            {
-                "Sid": "",
-                "Effect": "Allow",
-                "Principal": {"AWS": "*"},
-                "Action": "s3:GetObject",
-                "Resource": "arn:aws:s3:::root/*",
-            },
-            {
-                "Sid": "",
-                "Effect": "Allow",
-                "Principal": {"AWS": "*"},
-                "Action": "s3:PutObject",
-                "Resource": "arn:aws:s3:::root/*",
-            },
-        ],
-    }
-
-    minio_client = started_cluster.minio_client
-    minio_client.set_bucket_policy(
-        started_cluster.minio_bucket, json.dumps(bucket_read_write_policy)
-    )
-
-
-def upload_test_table(started_cluster):
-    bucket = started_cluster.minio_bucket
-
-    for address, dirs, files in os.walk(SCRIPT_DIR + "/test_table"):
-        address_without_prefix = address[len(SCRIPT_DIR) :]
-
-        for name in files:
-            started_cluster.minio_client.fput_object(
-                bucket,
-                os.path.join(address_without_prefix, name),
-                os.path.join(address, name),
-            )
 
 
 @pytest.fixture(scope="module")
 def started_cluster():
     try:
         cluster = ClickHouseCluster(__file__)
-        cluster.add_instance("main_server", with_minio=True)
+        cluster.add_instance("node1", with_minio=True)
 
         logging.info("Starting cluster...")
         cluster.start()
 
         prepare_s3_bucket(cluster)
-        logging.info("S3 bucket created")
-
-        upload_test_table(cluster)
-        logging.info("Test table uploaded")
 
         yield cluster
 
@@ -85,82 +49,144 @@ def started_cluster():
         cluster.shutdown()
 
 
-def run_query(instance, query, stdin=None, settings=None):
-    # type: (ClickHouseInstance, str, object, dict) -> str
-
-    logging.info("Running query '{}'...".format(query))
-    result = instance.query(query, stdin=stdin, settings=settings)
-    logging.info("Query finished")
-
-    return result
-
-
-def test_create_query(started_cluster):
-    instance = started_cluster.instances["main_server"]
-    bucket = started_cluster.minio_bucket
-
-    create_query = f"""CREATE TABLE deltalake ENGINE=DeltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/test_table/', 'minio', 'minio123')"""
-
-    run_query(instance, create_query)
-
-
-def test_select_query(started_cluster):
-    instance = started_cluster.instances["main_server"]
-    bucket = started_cluster.minio_bucket
-    columns = [
-        "begin_lat",
-        "begin_lon",
-        "driver",
-        "end_lat",
-        "end_lon",
-        "fare",
-        "rider",
-        "ts",
-        "uuid",
-    ]
-
-    # create query in case table doesn't exist
-    create_query = f"""CREATE TABLE IF NOT EXISTS deltalake ENGINE=DeltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/test_table/', 'minio', 'minio123')"""
-
-    run_query(instance, create_query)
-
-    select_query = "SELECT {} FROM deltalake FORMAT TSV"
-    select_table_function_query = "SELECT {col} FROM deltaLake('http://{ip}:{port}/{bucket}/test_table/', 'minio', 'minio123') FORMAT TSV"
-
-    for column_name in columns:
-        result = run_query(instance, select_query.format(column_name)).splitlines()
-        assert len(result) > 0
-
-    for column_name in columns:
-        result = run_query(
-            instance,
-            select_table_function_query.format(
-                col=column_name,
-                ip=started_cluster.minio_ip,
-                port=started_cluster.minio_port,
-                bucket=bucket,
-            ),
-        ).splitlines()
-        assert len(result) > 0
-
-
-def test_describe_query(started_cluster):
-    instance = started_cluster.instances["main_server"]
-    bucket = started_cluster.minio_bucket
-    result = instance.query(
-        f"DESCRIBE deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/test_table/', 'minio', 'minio123') FORMAT TSV",
-    )
-
-    assert result == TSV(
-        [
-            ["begin_lat", "Nullable(Float64)"],
-            ["begin_lon", "Nullable(Float64)"],
-            ["driver", "Nullable(String)"],
-            ["end_lat", "Nullable(Float64)"],
-            ["end_lon", "Nullable(Float64)"],
-            ["fare", "Nullable(Float64)"],
-            ["rider", "Nullable(String)"],
-            ["ts", "Nullable(Int64)"],
-            ["uuid", "Nullable(String)"],
+def get_spark():
+    builder = (
+        pyspark.sql.SparkSession.builder.appName("spark_test")
+        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
+        .config(
+            "spark.sql.catalog.spark_catalog",
+            "org.apache.spark.sql.delta.catalog.DeltaCatalog",
+        )
+        .master("local")
+    )
+    return configure_spark_with_delta_pip(builder).master("local").getOrCreate()
+
+
+def get_delta_metadata(delta_metadata_file):
+    jsons = [json.loads(x) for x in delta_metadata_file.splitlines()]
+    combined_json = {}
+    for d in jsons:
+        combined_json.update(d)
+    return combined_json
+
+
+def test_basic(started_cluster):
+    instance = started_cluster.instances["node1"]
+
+    data_path = f"/var/lib/clickhouse/user_files/{TABLE_NAME}.parquet"
+    inserted_data = "SELECT number, toString(number) FROM numbers(100)"
+    instance.query(
+        f"INSERT INTO TABLE FUNCTION file('{data_path}') {inserted_data} FORMAT Parquet"
+    )
+
+    instance.exec_in_container(
+        ["bash", "-c", "chmod 777 -R /var/lib/clickhouse/user_files"],
+        user="root",
+    )
+
+    spark = get_spark()
+    result_path = f"/{TABLE_NAME}_result"
+
+    spark.read.load(f"file://{USER_FILES_PATH}/{TABLE_NAME}.parquet").write.mode(
+        "overwrite"
+    ).option("compression", "none").format("delta").option(
+        "delta.columnMapping.mode", "name"
+    ).save(
+        result_path
+    )
+
+    minio_client = started_cluster.minio_client
+    bucket = started_cluster.minio_bucket
+    upload_directory(minio_client, bucket, result_path, "")
+
+    data = get_file_contents(
+        minio_client,
+        bucket,
+        "/test_delta_table_result/_delta_log/00000000000000000000.json",
+    )
+    delta_metadata = get_delta_metadata(data)
+
+    stats = json.loads(delta_metadata["add"]["stats"])
+    assert stats["numRecords"] == 100
+    assert next(iter(stats["minValues"].values())) == 0
+    assert next(iter(stats["maxValues"].values())) == 99
+
+    instance.query(
+        f"""
+        DROP TABLE IF EXISTS {TABLE_NAME};
+        CREATE TABLE {TABLE_NAME} ENGINE=DeltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/test_delta_table_result/', 'minio', 'minio123')"""
+    )
+    assert instance.query(f"SELECT * FROM {TABLE_NAME}") == instance.query(
+        inserted_data
+    )
+
+
+def test_types(started_cluster):
+    spark = get_spark()
+    result_file = f"{TABLE_NAME}_result_2"
+    delta_table = (
+        DeltaTable.create(spark)
+        .tableName(TABLE_NAME)
+        .location(f"/{result_file}")
+        .addColumn("a", "INT")
+        .addColumn("b", "STRING")
+        .addColumn("c", "DATE")
+        .addColumn("d", "ARRAY<STRING>")
+        .addColumn("e", "BOOLEAN")
+        .execute()
+    )
+    data = [
+        (
+            123,
+            "string",
+            datetime.strptime("2000-01-01", "%Y-%m-%d"),
+            ["str1", "str2"],
+            True,
+        )
+    ]
+
+    schema = StructType(
+        [
+            StructField("a", IntegerType()),
+            StructField("b", StringType()),
+            StructField("c", DateType()),
+            StructField("d", ArrayType(StringType())),
+            StructField("e", BooleanType()),
+        ]
+    )
+    df = spark.createDataFrame(data=data, schema=schema)
+    df.printSchema()
+    df.write.mode("append").format("delta").saveAsTable(TABLE_NAME)
+
+    minio_client = started_cluster.minio_client
+    bucket = started_cluster.minio_bucket
+    upload_directory(minio_client, bucket, f"/{result_file}", "")
+
+    instance = started_cluster.instances["node1"]
+    instance.query(
+        f"""
+        DROP TABLE IF EXISTS {TABLE_NAME};
+        CREATE TABLE {TABLE_NAME} ENGINE=DeltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{result_file}/', 'minio', 'minio123')"""
+    )
+    assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 1
+    assert (
+        instance.query(f"SELECT * FROM {TABLE_NAME}").strip()
+        == "123\tstring\t2000-01-01\t['str1','str2']\ttrue"
+    )
+
+    table_function = f"deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{result_file}/', 'minio', 'minio123')"
+    assert (
+        instance.query(f"SELECT * FROM {table_function}").strip()
+        == "123\tstring\t2000-01-01\t['str1','str2']\ttrue"
+    )
+
+    assert instance.query(f"DESCRIBE {table_function} FORMAT TSV") == TSV(
+        [
+            ["a", "Nullable(Int32)"],
+            ["b", "Nullable(String)"],
+            ["c", "Nullable(Date32)"],
+            ["d", "Array(Nullable(String))"],
+            ["e", "Nullable(Bool)"],
         ]
     )
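For illustration only (not part of the commit): a small, self-contained example of the check that the new test_basic performs on the Delta log it reads back from MinIO. Each line of a _delta_log commit file is an independent JSON document; get_delta_metadata merges them into one dict, and the test then parses the per-file statistics out of the add action. The log content below is made up:

    import json

    log = "\n".join([
        '{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}',
        '{"metaData":{"id":"00000000-0000-0000-0000-000000000000","format":{"provider":"parquet"}}}',
        '{"add":{"path":"part-00000.parquet","stats":"{\\"numRecords\\":100,\\"minValues\\":{\\"number\\":0},\\"maxValues\\":{\\"number\\":99}}"}}',
    ])

    combined = {}
    for line in log.splitlines():
        combined.update(json.loads(line))      # same merge as get_delta_metadata

    stats = json.loads(combined["add"]["stats"])
    assert stats["numRecords"] == 100
    assert next(iter(stats["minValues"].values())) == 0
    assert next(iter(stats["maxValues"].values())) == 99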
@@ -1,9 +0,0 @@
-{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
-{"metaData":{"id":"6eae6736-e014-439d-8301-070bfa5fc358","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"begin_lat\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}},{\"name\":\"begin_lon\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}},{\"name\":\"driver\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"end_lat\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}},{\"name\":\"end_lon\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}},{\"name\":\"fare\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}},{\"name\":\"partitionpath\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"rider\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ts\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"uuid\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["partitionpath"],"configuration":{},"createdTime":1661963201495}}
-{"add":{"path":"partitionpath=americas%252Fbrazil%252Fsao_paulo/part-00000-7212b9be-df70-42ca-831e-2ab223e7c176.c000.snappy.parquet","partitionValues":{"partitionpath":"americas/brazil/sao_paulo"},"size":2795,"modificationTime":1661963202988,"dataChange":true}}
-{"add":{"path":"partitionpath=americas%252Funited_states%252Fsan_francisco/part-00000-8dcd9986-b57d-41e5-afe4-658c02e1aeb5.c000.snappy.parquet","partitionValues":{"partitionpath":"americas/united_states/san_francisco"},"size":2966,"modificationTime":1661963203028,"dataChange":true}}
-{"add":{"path":"partitionpath=asia%252Findia%252Fchennai/part-00000-714ed689-3609-424f-acd2-d2bab8e66748.c000.snappy.parquet","partitionValues":{"partitionpath":"asia/india/chennai"},"size":2795,"modificationTime":1661963203056,"dataChange":true}}
-{"add":{"path":"partitionpath=americas%252Fbrazil%252Fsao_paulo/part-00001-3fd0374b-5fcf-42de-b929-a68f54aa1e6b.c000.snappy.parquet","partitionValues":{"partitionpath":"americas/brazil/sao_paulo"},"size":2878,"modificationTime":1661963202988,"dataChange":true}}
-{"add":{"path":"partitionpath=americas%252Funited_states%252Fsan_francisco/part-00001-7e34b80c-8fe9-466b-b8e2-817f80097b3b.c000.snappy.parquet","partitionValues":{"partitionpath":"americas/united_states/san_francisco"},"size":2878,"modificationTime":1661963203044,"dataChange":true}}
-{"add":{"path":"partitionpath=asia%252Findia%252Fchennai/part-00001-a3499b25-46da-463a-9527-a3dcd269f99e.c000.snappy.parquet","partitionValues":{"partitionpath":"asia/india/chennai"},"size":2795,"modificationTime":1661963203072,"dataChange":true}}
-{"commitInfo":{"timestamp":1661963203129,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[\"partitionpath\"]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"6","numOutputRows":"10","numOutputBytes":"17107"},"engineInfo":"Apache-Spark/3.2.2 Delta-Lake/1.1.0"}}
@@ -1,13 +0,0 @@
-{"add":{"path":"partitionpath=americas%252Fbrazil%252Fsao_paulo/part-00000-df1117a8-d568-4514-b556-cd6ebe7630c9.c000.snappy.parquet","partitionValues":{"partitionpath":"americas/brazil/sao_paulo"},"size":2795,"modificationTime":1661964654518,"dataChange":true}}
-{"add":{"path":"partitionpath=americas%252Funited_states%252Fsan_francisco/part-00000-a8bac363-ee42-47f5-a37c-1539c1bb57b1.c000.snappy.parquet","partitionValues":{"partitionpath":"americas/united_states/san_francisco"},"size":2966,"modificationTime":1661964654558,"dataChange":true}}
-{"add":{"path":"partitionpath=asia%252Findia%252Fchennai/part-00000-db7e2844-bba1-41e9-841b-22762fcfc509.c000.snappy.parquet","partitionValues":{"partitionpath":"asia/india/chennai"},"size":2794,"modificationTime":1661964654586,"dataChange":true}}
-{"add":{"path":"partitionpath=americas%252Fbrazil%252Fsao_paulo/part-00001-d0760f2d-45e8-493a-8144-d0d9d0ff572c.c000.snappy.parquet","partitionValues":{"partitionpath":"americas/brazil/sao_paulo"},"size":2878,"modificationTime":1661964654518,"dataChange":true}}
-{"add":{"path":"partitionpath=americas%252Funited_states%252Fsan_francisco/part-00001-cebe56e9-0e6f-4fe8-8135-23184ffdc617.c000.snappy.parquet","partitionValues":{"partitionpath":"americas/united_states/san_francisco"},"size":2879,"modificationTime":1661964654558,"dataChange":true}}
-{"add":{"path":"partitionpath=asia%252Findia%252Fchennai/part-00001-cbd68744-0f7d-45c7-8ca0-7594340b2c66.c000.snappy.parquet","partitionValues":{"partitionpath":"asia/india/chennai"},"size":2795,"modificationTime":1661964654582,"dataChange":true}}
-{"remove":{"path":"partitionpath=americas%252Fbrazil%252Fsao_paulo/part-00000-7212b9be-df70-42ca-831e-2ab223e7c176.c000.snappy.parquet","deletionTimestamp":1661964655238,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{"partitionpath":"americas/brazil/sao_paulo"},"size":2795}}
-{"remove":{"path":"partitionpath=americas%252Funited_states%252Fsan_francisco/part-00000-8dcd9986-b57d-41e5-afe4-658c02e1aeb5.c000.snappy.parquet","deletionTimestamp":1661964655238,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{"partitionpath":"americas/united_states/san_francisco"},"size":2966}}
-{"remove":{"path":"partitionpath=americas%252Funited_states%252Fsan_francisco/part-00001-7e34b80c-8fe9-466b-b8e2-817f80097b3b.c000.snappy.parquet","deletionTimestamp":1661964655238,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{"partitionpath":"americas/united_states/san_francisco"},"size":2878}}
-{"remove":{"path":"partitionpath=asia%252Findia%252Fchennai/part-00000-714ed689-3609-424f-acd2-d2bab8e66748.c000.snappy.parquet","deletionTimestamp":1661964655238,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{"partitionpath":"asia/india/chennai"},"size":2795}}
-{"remove":{"path":"partitionpath=americas%252Fbrazil%252Fsao_paulo/part-00001-3fd0374b-5fcf-42de-b929-a68f54aa1e6b.c000.snappy.parquet","deletionTimestamp":1661964655238,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{"partitionpath":"americas/brazil/sao_paulo"},"size":2878}}
-{"remove":{"path":"partitionpath=asia%252Findia%252Fchennai/part-00001-a3499b25-46da-463a-9527-a3dcd269f99e.c000.snappy.parquet","deletionTimestamp":1661964655238,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{"partitionpath":"asia/india/chennai"},"size":2795}}
-{"commitInfo":{"timestamp":1661964655251,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[\"partitionpath\"]"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numFiles":"6","numOutputRows":"10","numOutputBytes":"17107"},"engineInfo":"Apache-Spark/3.2.2 Delta-Lake/1.1.0"}}
Binary files and some additional changed files are not shown in this diff.