diff --git a/src/Storages/DataLakes/IStorageDataLake.h b/src/Storages/DataLakes/IStorageDataLake.h
index 77a22cd00fc..0e21b3d65ac 100644
--- a/src/Storages/DataLakes/IStorageDataLake.h
+++ b/src/Storages/DataLakes/IStorageDataLake.h
@@ -22,15 +22,15 @@ public:
     using Configuration = typename Storage::Configuration;
 
     template <class ...Args>
-    explicit IStorageDataLake(const Configuration & configuration_, ContextPtr context_, Args && ...args)
-        : Storage(getConfigurationForDataRead(configuration_, context_), context_, std::forward<Args>(args)...)
+    explicit IStorageDataLake(const Configuration & configuration_, ContextPtr context_, bool attach, Args && ...args)
+        : Storage(getConfigurationForDataRead(configuration_, context_, {}, attach), context_, std::forward<Args>(args)...)
         , base_configuration(configuration_)
         , log(&Poco::Logger::get(getName())) {} // NOLINT(clang-analyzer-optin.cplusplus.VirtualCall)
 
     template <class ...Args>
-    static StoragePtr create(const Configuration & configuration_, ContextPtr context_, Args && ...args)
+    static StoragePtr create(const Configuration & configuration_, ContextPtr context_, bool attach, Args && ...args)
     {
-        return std::make_shared<IStorageDataLake<Storage, Name, MetadataParser>>(configuration_, context_, std::forward<Args>(args)...);
+        return std::make_shared<IStorageDataLake<Storage, Name, MetadataParser>>(configuration_, context_, attach, std::forward<Args>(args)...);
     }
 
     String getName() const override { return name; }
@@ -64,24 +64,34 @@ public:
 
 private:
     static Configuration getConfigurationForDataRead(
-        const Configuration & base_configuration, ContextPtr local_context, const Strings & keys = {})
+        const Configuration & base_configuration, ContextPtr local_context, const Strings & keys = {}, bool attach = false)
     {
         auto configuration{base_configuration};
         configuration.update(local_context);
         configuration.static_configuration = true;
 
-        if (keys.empty())
-            configuration.keys = getDataFiles(configuration, local_context);
-        else
-            configuration.keys = keys;
+        try
+        {
+            if (keys.empty())
+                configuration.keys = getDataFiles(configuration, local_context);
+            else
+                configuration.keys = keys;
 
-        LOG_TRACE(
-            &Poco::Logger::get("DataLake"),
-            "New configuration path: {}, keys: {}",
-            configuration.getPath(), fmt::join(configuration.keys, ", "));
+            LOG_TRACE(
+                &Poco::Logger::get("DataLake"),
+                "New configuration path: {}, keys: {}",
+                configuration.getPath(), fmt::join(configuration.keys, ", "));
 
-        configuration.connect(local_context);
-        return configuration;
+            configuration.connect(local_context);
+            return configuration;
+        }
+        catch (...)
+        {
+            if (!attach)
+                throw;
+            configuration.is_broken = true;
+            return configuration;
+        }
     }
 
     static Strings getDataFiles(const Configuration & configuration, ContextPtr local_context)
@@ -94,10 +104,11 @@ private:
         const bool updated = base_configuration.update(local_context);
         auto new_keys = getDataFiles(base_configuration, local_context);
 
-        if (!updated && new_keys == Storage::getConfiguration().keys)
+        if (!updated && !base_configuration.is_broken && new_keys == Storage::getConfiguration().keys)
             return;
 
         Storage::useConfiguration(getConfigurationForDataRead(base_configuration, local_context, new_keys));
+        base_configuration.is_broken = false;
     }
 
     Configuration base_configuration;
@@ -115,7 +126,7 @@ static StoragePtr createDataLakeStorage(const StorageFactory::Arguments & args)
     if (configuration.format == "auto")
         configuration.format = "Parquet";
 
-    return DataLake::create(configuration, args.getContext(), args.table_id, args.columns, args.constraints,
+    return DataLake::create(configuration, args.getContext(), args.attach, args.table_id, args.columns, args.constraints,
         args.comment, getFormatSettings(args.getContext()));
 }
 
diff --git a/src/Storages/DataLakes/Iceberg/StorageIceberg.cpp b/src/Storages/DataLakes/Iceberg/StorageIceberg.cpp
index 20ac77976cb..2f6b9a5a694 100644
--- a/src/Storages/DataLakes/Iceberg/StorageIceberg.cpp
+++ b/src/Storages/DataLakes/Iceberg/StorageIceberg.cpp
@@ -8,6 +8,7 @@ namespace DB
 StoragePtr StorageIceberg::create(
     const DB::StorageIceberg::Configuration & base_configuration,
     DB::ContextPtr context_,
+    bool attach,
     const DB::StorageID & table_id_,
     const DB::ColumnsDescription & columns_,
     const DB::ConstraintsDescription & constraints_,
@@ -16,10 +17,30 @@ StoragePtr StorageIceberg::create(
 {
     auto configuration{base_configuration};
     configuration.update(context_);
-    auto metadata = parseIcebergMetadata(configuration, context_);
-    auto schema_from_metadata = metadata->getTableSchema();
-    configuration.keys = metadata->getDataFiles();
-    return std::make_shared<StorageIceberg>(std::move(metadata), configuration, context_, table_id_, columns_.empty() ? ColumnsDescription(schema_from_metadata) : columns_, constraints_, comment, format_settings_);
+    std::unique_ptr<IcebergMetadata> metadata;
+    NamesAndTypesList schema_from_metadata;
+    try
+    {
+        metadata = parseIcebergMetadata(configuration, context_);
+        schema_from_metadata = metadata->getTableSchema();
+        configuration.keys = metadata->getDataFiles();
+    }
+    catch (...)
+    {
+        if (!attach)
+            throw;
+        configuration.is_broken = true;
+    }
+
+    return std::make_shared<StorageIceberg>(
+        std::move(metadata),
+        configuration,
+        context_,
+        table_id_,
+        columns_.empty() ? ColumnsDescription(schema_from_metadata) : columns_,
+        constraints_,
+        comment,
+        format_settings_);
 }
 
 StorageIceberg::StorageIceberg(
@@ -52,8 +73,12 @@ void StorageIceberg::updateConfigurationImpl(ContextPtr local_context)
 {
     const bool updated = base_configuration.update(local_context);
     auto new_metadata = parseIcebergMetadata(base_configuration, local_context);
+
+    if (!current_metadata)
+        current_metadata = parseIcebergMetadata(base_configuration, local_context);
+
     /// Check if nothing was changed.
-    if (updated && new_metadata->getVersion() == current_metadata->getVersion())
+    if (!updated && !base_configuration.is_broken && new_metadata->getVersion() == current_metadata->getVersion())
         return;
 
     if (new_metadata->getVersion() != current_metadata->getVersion())
@@ -63,6 +88,7 @@ void StorageIceberg::updateConfigurationImpl(ContextPtr local_context)
     /// If metadata wasn't changed, we won't list data files again.
     updated_configuration.keys = current_metadata->getDataFiles();
     StorageS3::useConfiguration(updated_configuration);
+    base_configuration.is_broken = false;
 }
 
 }
diff --git a/src/Storages/DataLakes/Iceberg/StorageIceberg.h b/src/Storages/DataLakes/Iceberg/StorageIceberg.h
index a18865b5a54..4e63da5508a 100644
--- a/src/Storages/DataLakes/Iceberg/StorageIceberg.h
+++ b/src/Storages/DataLakes/Iceberg/StorageIceberg.h
@@ -30,6 +30,7 @@ public:
 
     static StoragePtr create(const Configuration & base_configuration,
                              ContextPtr context_,
+                             bool attach,
                              const StorageID & table_id_,
                              const ColumnsDescription & columns_,
                              const ConstraintsDescription & constraints_,
diff --git a/src/Storages/StorageS3.h b/src/Storages/StorageS3.h
index b90a0d394cb..f33c13ece86 100644
--- a/src/Storages/StorageS3.h
+++ b/src/Storages/StorageS3.h
@@ -304,6 +304,7 @@ public:
 
         std::shared_ptr<const S3::Client> client;
         std::vector<String> keys;
+        bool is_broken = false;
     };
 
     StorageS3(
diff --git a/src/TableFunctions/ITableFunctionDataLake.h b/src/TableFunctions/ITableFunctionDataLake.h
index d8524963776..3e4d6f8201d 100644
--- a/src/TableFunctions/ITableFunctionDataLake.h
+++ b/src/TableFunctions/ITableFunctionDataLake.h
@@ -34,7 +34,7 @@ protected:
             columns = parseColumnsListFromString(TableFunction::configuration.structure, context);
 
         StoragePtr storage = Storage::create(
-            TableFunction::configuration, context, StorageID(TableFunction::getDatabaseName(), table_name),
+            TableFunction::configuration, context, true, StorageID(TableFunction::getDatabaseName(), table_name),
             columns, ConstraintsDescription{}, String{}, std::nullopt);
 
         storage->startup();
diff --git a/tests/integration/helpers/s3_tools.py b/tests/integration/helpers/s3_tools.py
index 777b3394dc1..c4f4c3f5aaa 100644
--- a/tests/integration/helpers/s3_tools.py
+++ b/tests/integration/helpers/s3_tools.py
@@ -36,6 +36,16 @@ def get_file_contents(minio_client, bucket, s3_path):
     return data_str.decode()
 
 
+def list_s3_objects(minio_client, bucket, prefix=""):
+    prefix_len = len(prefix)
+    return [
+        obj.object_name[prefix_len:]
+        for obj in minio_client.list_objects(
+            bucket, prefix=prefix, recursive=True
+        )
+    ]
+
+
 # Creates S3 bucket for tests and allows anonymous read-write access to it.
 def prepare_s3_bucket(started_cluster):
     # Allows read-write access for bucket without authorization.
diff --git a/tests/integration/test_storage_delta/test.py b/tests/integration/test_storage_delta/test.py
index 621d2b89fc5..8f7349ad709 100644
--- a/tests/integration/test_storage_delta/test.py
+++ b/tests/integration/test_storage_delta/test.py
@@ -26,8 +26,14 @@ from pyspark.sql.functions import current_timestamp
 from datetime import datetime
 from pyspark.sql.functions import monotonically_increasing_id, row_number
 from pyspark.sql.window import Window
+from minio.deleteobjects import DeleteObject
 
-from helpers.s3_tools import prepare_s3_bucket, upload_directory, get_file_contents
+from helpers.s3_tools import (
+    prepare_s3_bucket,
+    upload_directory,
+    get_file_contents,
+    list_s3_objects,
+)
 
 
 SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
@@ -55,6 +61,7 @@ def started_cluster():
             main_configs=["configs/config.d/named_collections.xml"],
             user_configs=["configs/users.d/users.xml"],
             with_minio=True,
+            stay_alive=True,
         )
 
         logging.info("Starting cluster...")
@@ -111,12 +118,12 @@ def get_delta_metadata(delta_metadata_file):
     return combined_json
 
 
-def create_delta_table(node, table_name):
+def create_delta_table(node, table_name, bucket="root"):
     node.query(
         f"""
             DROP TABLE IF EXISTS {table_name};
             CREATE TABLE {table_name}
-            ENGINE=DeltaLake(s3, filename = '{table_name}/')"""
+            ENGINE=DeltaLake(s3, filename = '{table_name}/', url = 'http://minio1:9001/{bucket}/')"""
     )
 
 
@@ -401,3 +408,50 @@ def test_types(started_cluster):
             ["e", "Nullable(Bool)"],
         ]
     )
+
+
+def test_restart_broken(started_cluster):
+    instance = started_cluster.instances["node1"]
+    spark = started_cluster.spark_session
+    minio_client = started_cluster.minio_client
+    bucket = "broken"
+    TABLE_NAME = "test_restart_broken"
+
+    if not minio_client.bucket_exists(bucket):
+        minio_client.make_bucket(bucket)
+
+    parquet_data_path = create_initial_data_file(
+        started_cluster,
+        instance,
+        "SELECT number, toString(number) FROM numbers(100)",
+        TABLE_NAME,
+    )
+
+    write_delta_from_file(spark, parquet_data_path, f"/{TABLE_NAME}")
+    upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
+    create_delta_table(instance, TABLE_NAME, bucket=bucket)
+    assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100
+
+    s3_objects = list_s3_objects(minio_client, bucket, prefix="")
+    assert (
+        len(
+            list(
+                minio_client.remove_objects(
+                    bucket,
+                    [DeleteObject(obj) for obj in s3_objects],
+                )
+            )
+        )
+        == 0
+    )
+    minio_client.remove_bucket(bucket)
+
+    instance.restart_clickhouse()
+
+    assert "NoSuchBucket" in instance.query_and_get_error(f"SELECT count() FROM {TABLE_NAME}")
+
+    minio_client.make_bucket(bucket)
+
+    upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
+
+    assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100
diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py
index 9a75dc50d61..ee7c20025f4 100644
--- a/tests/integration/test_storage_iceberg/test.py
+++ b/tests/integration/test_storage_iceberg/test.py
@@ -27,8 +27,14 @@ from datetime import datetime
 from pyspark.sql.functions import monotonically_increasing_id, row_number
 from pyspark.sql.window import Window
 from pyspark.sql.readwriter import DataFrameWriter, DataFrameWriterV2
+from minio.deleteobjects import DeleteObject
 
-from helpers.s3_tools import prepare_s3_bucket, upload_directory, get_file_contents
+from helpers.s3_tools import (
+    prepare_s3_bucket,
+    upload_directory,
+    get_file_contents,
+    list_s3_objects,
+)
 
 
 SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
@@ -61,6 +67,7 @@ def started_cluster():
             main_configs=["configs/config.d/named_collections.xml"],
             user_configs=["configs/users.d/users.xml"],
             with_minio=True,
+            stay_alive=True,
         )
 
         logging.info("Starting cluster...")
@@ -135,12 +142,12 @@ def generate_data(spark, start, end):
     return df
 
 
-def create_iceberg_table(node, table_name, format="Parquet"):
+def create_iceberg_table(node, table_name, format="Parquet", bucket="root"):
     node.query(
         f"""
            DROP TABLE IF EXISTS {table_name};
            CREATE TABLE {table_name}
-            ENGINE=Iceberg(s3, filename = 'iceberg_data/default/{table_name}/', format={format})"""
+            ENGINE=Iceberg(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"""
     )
 
 
@@ -551,3 +558,54 @@ def test_metadata_file_format_with_uuid(started_cluster, format_version):
         create_iceberg_table(instance, TABLE_NAME)
 
     assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 500
+
+
+def test_restart_broken(started_cluster):
+    instance = started_cluster.instances["node1"]
+    spark = started_cluster.spark_session
+    minio_client = started_cluster.minio_client
+    bucket = "broken"
+    TABLE_NAME = "test_restart_broken"
+
+    if not minio_client.bucket_exists(bucket):
+        minio_client.make_bucket(bucket)
+
+    parquet_data_path = create_initial_data_file(
+        started_cluster,
+        instance,
+        "SELECT number, toString(number) FROM numbers(100)",
+        TABLE_NAME,
+    )
+
+    write_iceberg_from_file(spark, parquet_data_path, TABLE_NAME, format_version="1")
+    files = upload_directory(
+        minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", ""
+    )
+    create_iceberg_table(instance, TABLE_NAME, bucket=bucket)
+    assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100
+
+    s3_objects = list_s3_objects(minio_client, bucket, prefix="")
+    assert (
+        len(
+            list(
+                minio_client.remove_objects(
+                    bucket,
+                    [DeleteObject(obj) for obj in s3_objects],
+                )
+            )
+        )
+        == 0
+    )
+    minio_client.remove_bucket(bucket)
+
+    instance.restart_clickhouse()
+
+    assert "NoSuchBucket" in instance.query_and_get_error(f"SELECT count() FROM {TABLE_NAME}")
+
+    minio_client.make_bucket(bucket)
+
+    files = upload_directory(
+        minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", ""
+    )
+
+    assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100