This commit is contained in:
kssenii 2024-01-22 18:24:48 +01:00
parent ebfea07f93
commit 1aa8e62bb7
8 changed files with 190 additions and 29 deletions

View File

@ -22,15 +22,15 @@ public:
using Configuration = typename Storage::Configuration;
template <class ...Args>
explicit IStorageDataLake(const Configuration & configuration_, ContextPtr context_, Args && ...args)
: Storage(getConfigurationForDataRead(configuration_, context_), context_, std::forward<Args>(args)...)
explicit IStorageDataLake(const Configuration & configuration_, ContextPtr context_, bool attach, Args && ...args)
: Storage(getConfigurationForDataRead(configuration_, context_, {}, attach), context_, std::forward<Args>(args)...)
, base_configuration(configuration_)
, log(&Poco::Logger::get(getName())) {} // NOLINT(clang-analyzer-optin.cplusplus.VirtualCall)
template <class ...Args>
static StoragePtr create(const Configuration & configuration_, ContextPtr context_, Args && ...args)
static StoragePtr create(const Configuration & configuration_, ContextPtr context_, bool attach, Args && ...args)
{
return std::make_shared<IStorageDataLake<Storage, Name, MetadataParser>>(configuration_, context_, std::forward<Args>(args)...);
return std::make_shared<IStorageDataLake<Storage, Name, MetadataParser>>(configuration_, context_, attach, std::forward<Args>(args)...);
}
String getName() const override { return name; }
@ -64,24 +64,34 @@ public:
private:
static Configuration getConfigurationForDataRead(
const Configuration & base_configuration, ContextPtr local_context, const Strings & keys = {})
const Configuration & base_configuration, ContextPtr local_context, const Strings & keys = {}, bool attach = false)
{
auto configuration{base_configuration};
configuration.update(local_context);
configuration.static_configuration = true;
if (keys.empty())
configuration.keys = getDataFiles(configuration, local_context);
else
configuration.keys = keys;
try
{
if (keys.empty())
configuration.keys = getDataFiles(configuration, local_context);
else
configuration.keys = keys;
LOG_TRACE(
&Poco::Logger::get("DataLake"),
"New configuration path: {}, keys: {}",
configuration.getPath(), fmt::join(configuration.keys, ", "));
LOG_TRACE(
&Poco::Logger::get("DataLake"),
"New configuration path: {}, keys: {}",
configuration.getPath(), fmt::join(configuration.keys, ", "));
configuration.connect(local_context);
return configuration;
configuration.connect(local_context);
return configuration;
}
catch (...)
{
if (!attach)
throw;
configuration.is_broken = true;
return configuration;
}
}
static Strings getDataFiles(const Configuration & configuration, ContextPtr local_context)
@ -94,10 +104,11 @@ private:
const bool updated = base_configuration.update(local_context);
auto new_keys = getDataFiles(base_configuration, local_context);
if (!updated && new_keys == Storage::getConfiguration().keys)
if (!updated && !base_configuration.is_broken && new_keys == Storage::getConfiguration().keys)
return;
Storage::useConfiguration(getConfigurationForDataRead(base_configuration, local_context, new_keys));
base_configuration.is_broken = false;
}
Configuration base_configuration;
@ -115,7 +126,7 @@ static StoragePtr createDataLakeStorage(const StorageFactory::Arguments & args)
if (configuration.format == "auto")
configuration.format = "Parquet";
return DataLake::create(configuration, args.getContext(), args.table_id, args.columns, args.constraints,
return DataLake::create(configuration, args.getContext(), args.attach, args.table_id, args.columns, args.constraints,
args.comment, getFormatSettings(args.getContext()));
}

View File

@ -8,6 +8,7 @@ namespace DB
StoragePtr StorageIceberg::create(
const DB::StorageIceberg::Configuration & base_configuration,
DB::ContextPtr context_,
bool attach,
const DB::StorageID & table_id_,
const DB::ColumnsDescription & columns_,
const DB::ConstraintsDescription & constraints_,
@ -16,10 +17,30 @@ StoragePtr StorageIceberg::create(
{
auto configuration{base_configuration};
configuration.update(context_);
auto metadata = parseIcebergMetadata(configuration, context_);
auto schema_from_metadata = metadata->getTableSchema();
configuration.keys = metadata->getDataFiles();
return std::make_shared<StorageIceberg>(std::move(metadata), configuration, context_, table_id_, columns_.empty() ? ColumnsDescription(schema_from_metadata) : columns_, constraints_, comment, format_settings_);
std::unique_ptr<IcebergMetadata> metadata;
NamesAndTypesList schema_from_metadata;
try
{
metadata = parseIcebergMetadata(configuration, context_);
schema_from_metadata = metadata->getTableSchema();
configuration.keys = metadata->getDataFiles();
}
catch (...)
{
if (!attach)
throw;
configuration.is_broken = true;
}
return std::make_shared<StorageIceberg>(
std::move(metadata),
configuration,
context_,
table_id_,
columns_.empty() ? ColumnsDescription(schema_from_metadata) : columns_,
constraints_,
comment,
format_settings_);
}
StorageIceberg::StorageIceberg(
@ -52,8 +73,12 @@ void StorageIceberg::updateConfigurationImpl(ContextPtr local_context)
{
const bool updated = base_configuration.update(local_context);
auto new_metadata = parseIcebergMetadata(base_configuration, local_context);
if (!current_metadata)
current_metadata = parseIcebergMetadata(base_configuration, local_context);
/// Check if nothing was changed.
if (updated && new_metadata->getVersion() == current_metadata->getVersion())
if (!updated && !base_configuration.is_broken && new_metadata->getVersion() == current_metadata->getVersion())
return;
if (new_metadata->getVersion() != current_metadata->getVersion())
@ -63,6 +88,7 @@ void StorageIceberg::updateConfigurationImpl(ContextPtr local_context)
/// If metadata wasn't changed, we won't list data files again.
updated_configuration.keys = current_metadata->getDataFiles();
StorageS3::useConfiguration(updated_configuration);
base_configuration.is_broken = false;
}
}

View File

@ -30,6 +30,7 @@ public:
static StoragePtr create(const Configuration & base_configuration,
ContextPtr context_,
bool attach,
const StorageID & table_id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,

View File

@ -304,6 +304,7 @@ public:
std::shared_ptr<const S3::Client> client;
std::vector<String> keys;
bool is_broken = false;
};
StorageS3(

View File

@ -34,7 +34,7 @@ protected:
columns = parseColumnsListFromString(TableFunction::configuration.structure, context);
StoragePtr storage = Storage::create(
TableFunction::configuration, context, StorageID(TableFunction::getDatabaseName(), table_name),
TableFunction::configuration, context, true, StorageID(TableFunction::getDatabaseName(), table_name),
columns, ConstraintsDescription{}, String{}, std::nullopt);
storage->startup();

View File

@ -36,6 +36,16 @@ def get_file_contents(minio_client, bucket, s3_path):
return data_str.decode()
def list_s3_objects(minio_client, bucket, prefix=""):
prefix_len = len(prefix)
return [
obj.object_name[prefix_len:]
for obj in minio_client.list_objects(
bucket, prefix=prefix, recursive=True
)
]
# Creates S3 bucket for tests and allows anonymous read-write access to it.
def prepare_s3_bucket(started_cluster):
# Allows read-write access for bucket without authorization.

View File

@ -26,8 +26,14 @@ from pyspark.sql.functions import current_timestamp
from datetime import datetime
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window
from minio.deleteobjects import DeleteObject
from helpers.s3_tools import prepare_s3_bucket, upload_directory, get_file_contents
from helpers.s3_tools import (
prepare_s3_bucket,
upload_directory,
get_file_contents,
list_s3_objects,
)
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
@ -55,6 +61,7 @@ def started_cluster():
main_configs=["configs/config.d/named_collections.xml"],
user_configs=["configs/users.d/users.xml"],
with_minio=True,
stay_alive=True,
)
logging.info("Starting cluster...")
@ -111,12 +118,12 @@ def get_delta_metadata(delta_metadata_file):
return combined_json
def create_delta_table(node, table_name):
def create_delta_table(node, table_name, bucket="root"):
node.query(
f"""
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=DeltaLake(s3, filename = '{table_name}/')"""
ENGINE=DeltaLake(s3, filename = '{table_name}/', url = 'http://minio1:9001/{bucket}/')"""
)
@ -401,3 +408,50 @@ def test_types(started_cluster):
["e", "Nullable(Bool)"],
]
)
def test_restart_broken(started_cluster):
instance = started_cluster.instances["node1"]
spark = started_cluster.spark_session
minio_client = started_cluster.minio_client
bucket = "broken"
TABLE_NAME = "test_restart_broken"
if not minio_client.bucket_exists(bucket):
minio_client.make_bucket(bucket)
parquet_data_path = create_initial_data_file(
started_cluster,
instance,
"SELECT number, toString(number) FROM numbers(100)",
TABLE_NAME,
)
write_delta_from_file(spark, parquet_data_path, f"/{TABLE_NAME}")
upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
create_delta_table(instance, TABLE_NAME, bucket=bucket)
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100
s3_objects = list_s3_objects(minio_client, bucket, prefix="")
assert (
len(
list(
minio_client.remove_objects(
bucket,
[DeleteObject(obj) for obj in s3_objects],
)
)
)
== 0
)
minio_client.remove_bucket(bucket)
instance.restart_clickhouse()
assert "NoSuchBucket" in instance.query_and_get_error(f"SELECT count() FROM {TABLE_NAME}")
minio_client.make_bucket(bucket)
upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100

View File

@ -27,8 +27,14 @@ from datetime import datetime
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window
from pyspark.sql.readwriter import DataFrameWriter, DataFrameWriterV2
from minio.deleteobjects import DeleteObject
from helpers.s3_tools import prepare_s3_bucket, upload_directory, get_file_contents
from helpers.s3_tools import (
prepare_s3_bucket,
upload_directory,
get_file_contents,
list_s3_objects,
)
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
@ -61,6 +67,7 @@ def started_cluster():
main_configs=["configs/config.d/named_collections.xml"],
user_configs=["configs/users.d/users.xml"],
with_minio=True,
stay_alive=True,
)
logging.info("Starting cluster...")
@ -135,12 +142,12 @@ def generate_data(spark, start, end):
return df
def create_iceberg_table(node, table_name, format="Parquet"):
def create_iceberg_table(node, table_name, format="Parquet", bucket="root"):
node.query(
f"""
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=Iceberg(s3, filename = 'iceberg_data/default/{table_name}/', format={format})"""
ENGINE=Iceberg(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"""
)
@ -551,3 +558,54 @@ def test_metadata_file_format_with_uuid(started_cluster, format_version):
create_iceberg_table(instance, TABLE_NAME)
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 500
def test_restart_broken(started_cluster):
instance = started_cluster.instances["node1"]
spark = started_cluster.spark_session
minio_client = started_cluster.minio_client
bucket = "broken"
TABLE_NAME = "test_restart_broken"
if not minio_client.bucket_exists(bucket):
minio_client.make_bucket(bucket)
parquet_data_path = create_initial_data_file(
started_cluster,
instance,
"SELECT number, toString(number) FROM numbers(100)",
TABLE_NAME,
)
write_iceberg_from_file(spark, parquet_data_path, TABLE_NAME, format_version="1")
files = upload_directory(
minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", ""
)
create_iceberg_table(instance, TABLE_NAME, bucket=bucket)
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100
s3_objects = list_s3_objects(minio_client, bucket, prefix="")
assert (
len(
list(
minio_client.remove_objects(
bucket,
[DeleteObject(obj) for obj in s3_objects],
)
)
)
== 0
)
minio_client.remove_bucket(bucket)
instance.restart_clickhouse()
assert "NoSuchBucket" in instance.query_and_get_error(f"SELECT count() FROM {TABLE_NAME}")
minio_client.make_bucket(bucket)
files = upload_directory(
minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", ""
)
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100