diff --git a/docs/en/engines/table-engines/integrations/iceberg.md b/docs/en/engines/table-engines/integrations/iceberg.md
index 21fdbc0b1a5..94468066372 100644
--- a/docs/en/engines/table-engines/integrations/iceberg.md
+++ b/docs/en/engines/table-engines/integrations/iceberg.md
@@ -6,28 +6,34 @@ sidebar_label: Iceberg

# Iceberg Table Engine

-This engine provides a read-only integration with existing Apache [Iceberg](https://iceberg.apache.org/) tables in Amazon S3.
+This engine provides a read-only integration with existing Apache [Iceberg](https://iceberg.apache.org/) tables in Amazon S3 and Azure, as well as with locally stored tables.

## Create Table

-Note that the Iceberg table must already exist in S3, this command does not take DDL parameters to create a new table.
+Note that the Iceberg table must already exist in the storage; this command does not take DDL parameters to create a new table.

``` sql
-CREATE TABLE iceberg_table
-    ENGINE = Iceberg(url, [aws_access_key_id, aws_secret_access_key,])
+CREATE TABLE iceberg_table_s3
+    ENGINE = IcebergS3(url [, NOSIGN | access_key_id, secret_access_key [, session_token]] [, format] [, compression])
+
+CREATE TABLE iceberg_table_azure
+    ENGINE = IcebergAzure(connection_string|storage_account_url, container_name, blobpath [, account_name, account_key, format, compression])
+
+CREATE TABLE iceberg_table_local
+    ENGINE = IcebergLocal(path_to_table [, format] [, compression_method])
```

-**Engine parameters**
+**Engine arguments**

-- `url` — url with the path to an existing Iceberg table.
-- `aws_access_key_id`, `aws_secret_access_key` - Long-term credentials for the [AWS](https://aws.amazon.com/) account user. You can use these to authenticate your requests. Parameter is optional. If credentials are not specified, they are used from the configuration file.
+The arguments have the same meaning as the arguments of the `S3`, `AzureBlobStorage`, and `File` engines, respectively.
+`format` specifies the format of the data files in the Iceberg table.

Engine parameters can be specified using [Named Collections](../../../operations/named-collections.md).

**Example**

```sql
-CREATE TABLE iceberg_table ENGINE=Iceberg('http://test.s3.amazonaws.com/clickhouse-bucket/test_table', 'test', 'test')
+CREATE TABLE iceberg_table ENGINE=IcebergS3('http://test.s3.amazonaws.com/clickhouse-bucket/test_table', 'test', 'test')
```

Using named collections:
@@ -45,9 +51,15 @@ Using named collections:
```

```sql
-CREATE TABLE iceberg_table ENGINE=Iceberg(iceberg_conf, filename = 'test_table')
+CREATE TABLE iceberg_table ENGINE=IcebergS3(iceberg_conf, filename = 'test_table')
```

+**Aliases**
+
+The table engine `Iceberg` is currently an alias for `IcebergS3`.
+
## See also

- [iceberg table function](/docs/en/sql-reference/table-functions/iceberg.md)
diff --git a/docs/en/sql-reference/table-functions/iceberg.md b/docs/en/sql-reference/table-functions/iceberg.md
index fa86b436a5e..784fd646860 100644
--- a/docs/en/sql-reference/table-functions/iceberg.md
+++ b/docs/en/sql-reference/table-functions/iceberg.md
@@ -6,35 +6,37 @@ sidebar_label: iceberg

# iceberg Table Function

-Provides a read-only table-like interface to Apache [Iceberg](https://iceberg.apache.org/) tables in Amazon S3.
+Provides a read-only table-like interface to Apache [Iceberg](https://iceberg.apache.org/) tables in Amazon S3, in Azure, or stored locally.
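For orientation, here is a sketch of `CREATE TABLE` statements for the new Azure and local engine variants described above; the connection string, container, and paths below are hypothetical placeholders, not values taken from this patch:

```sql
-- Hypothetical Azure example: the connection string, container name,
-- and blob path are placeholders.
CREATE TABLE iceberg_table_azure
    ENGINE = IcebergAzure('DefaultEndpointsProtocol=https;AccountName=myaccount;AccountKey=...;EndpointSuffix=core.windows.net',
                          'my-container', 'iceberg_data/default/test_table/')

-- Hypothetical local example: the directory must already contain an
-- Iceberg table written by an external writer such as Spark.
CREATE TABLE iceberg_table_local
    ENGINE = IcebergLocal('/var/lib/clickhouse/user_files/iceberg_data/default/test_table/')
```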
## Syntax

``` sql
-iceberg(url [,aws_access_key_id, aws_secret_access_key] [,format] [,structure])
+icebergS3(url [, NOSIGN | access_key_id, secret_access_key [, session_token]] [, format] [, compression_method])
+icebergS3(named_collection[, option=value [,..]])
+
+icebergAzure(connection_string|storage_account_url, container_name, blobpath [, account_name, account_key] [, format] [, compression_method])
+icebergAzure(named_collection[, option=value [,..]])
+
+icebergLocal(path_to_table [, format] [, compression_method])
+icebergLocal(named_collection[, option=value [,..]])
```

## Arguments

-- `url` — Bucket url with the path to an existing Iceberg table in S3.
-- `aws_access_key_id`, `aws_secret_access_key` - Long-term credentials for the [AWS](https://aws.amazon.com/) account user. You can use these to authenticate your requests. These parameters are optional. If credentials are not specified, they are used from the ClickHouse configuration. For more information see [Using S3 for Data Storage](/docs/en/engines/table-engines/mergetree-family/mergetree.md/#table_engine-mergetree-s3).
-- `format` — The [format](/docs/en/interfaces/formats.md/#formats) of the file. By default `Parquet` is used.
-- `structure` — Structure of the table. Format `'column1_name column1_type, column2_name column2_type, ...'`.
-
-Engine parameters can be specified using [Named Collections](/docs/en/operations/named-collections.md).
+The arguments have the same meaning as the arguments of the `s3`, `azureBlobStorage`, and `file` table functions, respectively.
+`format` specifies the format of the data files in the Iceberg table.

**Returned value**

-A table with the specified structure for reading data in the specified Iceberg table in S3.
+A table with the specified structure for reading data in the specified Iceberg table.

**Example**

```sql
-SELECT * FROM iceberg('http://test.s3.amazonaws.com/clickhouse-bucket/test_table', 'test', 'test')
+SELECT * FROM icebergS3('http://test.s3.amazonaws.com/clickhouse-bucket/test_table', 'test', 'test')
```

:::important
-ClickHouse currently supports reading v1 (v2 support is coming soon!) of the Iceberg format via the `iceberg` table function and `Iceberg` table engine.
+ClickHouse currently supports reading v1 and v2 of the Iceberg format via the `icebergS3`, `icebergAzure`, and `icebergLocal` table functions and the `IcebergS3`, `IcebergAzure`, and `IcebergLocal` table engines.
:::

## Defining a named collection

Here is an example of configuring a named collection for storing the URL and cre
```

```sql
-SELECT * FROM iceberg(iceberg_conf, filename = 'test_table')
-DESCRIBE iceberg(iceberg_conf, filename = 'test_table')
+SELECT * FROM icebergS3(iceberg_conf, filename = 'test_table')
+DESCRIBE icebergS3(iceberg_conf, filename = 'test_table')
```

+**Aliases**
+
+The table function `iceberg` is currently an alias for `icebergS3`.
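The Azure and local table functions follow the same pattern as `icebergS3`; a minimal sketch, again with hypothetical placeholders for the connection string, container, and path:

```sql
-- Hypothetical Azure example.
SELECT * FROM icebergAzure('DefaultEndpointsProtocol=https;AccountName=myaccount;AccountKey=...;EndpointSuffix=core.windows.net',
                           'my-container', 'iceberg_data/default/test_table/')

-- Hypothetical local example; the path must be readable by the server.
SELECT count() FROM icebergLocal('/var/lib/clickhouse/user_files/iceberg_data/default/test_table/')
```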
+ **See Also** - [Iceberg engine](/docs/en/engines/table-engines/integrations/iceberg.md) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 95e431b54be..1889bba3b39 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -111,6 +111,7 @@ add_headers_and_sources(dbms Storages/ObjectStorage) add_headers_and_sources(dbms Storages/ObjectStorage/Azure) add_headers_and_sources(dbms Storages/ObjectStorage/S3) add_headers_and_sources(dbms Storages/ObjectStorage/HDFS) +add_headers_and_sources(dbms Storages/ObjectStorage/Local) add_headers_and_sources(dbms Storages/ObjectStorage/DataLakes) add_headers_and_sources(dbms Common/NamedCollections) diff --git a/src/Disks/ObjectStorages/Local/LocalObjectStorage.cpp b/src/Disks/ObjectStorages/Local/LocalObjectStorage.cpp index 5b61c57ca21..3b650adb71f 100644 --- a/src/Disks/ObjectStorages/Local/LocalObjectStorage.cpp +++ b/src/Disks/ObjectStorages/Local/LocalObjectStorage.cpp @@ -43,39 +43,21 @@ bool LocalObjectStorage::exists(const StoredObject & object) const std::unique_ptr LocalObjectStorage::readObjects( /// NOLINT const StoredObjects & objects, const ReadSettings & read_settings, - std::optional read_hint, - std::optional file_size) const + std::optional, + std::optional) const { auto modified_settings = patchSettings(read_settings); auto global_context = Context::getGlobalContextInstance(); - auto read_buffer_creator = - [=] (bool /* restricted_seek */, const StoredObject & object) - -> std::unique_ptr - { - return createReadBufferFromFileBase(object.remote_path, modified_settings, read_hint, file_size); - }; + auto read_buffer_creator = [=](bool /* restricted_seek */, const StoredObject & object) -> std::unique_ptr + { return std::make_unique(object.remote_path); }; - switch (read_settings.remote_fs_method) - { - case RemoteFSReadMethod::read: - { - return std::make_unique( - std::move(read_buffer_creator), objects, "file:", modified_settings, - global_context->getFilesystemCacheLog(), /* use_external_buffer */false); - } - case RemoteFSReadMethod::threadpool: - { - auto impl = std::make_unique( - std::move(read_buffer_creator), objects, "file:", modified_settings, - global_context->getFilesystemCacheLog(), /* use_external_buffer */true); - - auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER); - return std::make_unique( - std::move(impl), reader, read_settings, - global_context->getAsyncReadCounters(), - global_context->getFilesystemReadPrefetchesLog()); - } - } + return std::make_unique( + std::move(read_buffer_creator), + objects, + "file:", + modified_settings, + global_context->getFilesystemCacheLog(), + /* use_external_buffer */ false); } ReadSettings LocalObjectStorage::patchSettings(const ReadSettings & read_settings) const diff --git a/src/Storages/ObjectStorage/Azure/Configuration.cpp b/src/Storages/ObjectStorage/Azure/Configuration.cpp index f0a0a562b92..9730391d429 100644 --- a/src/Storages/ObjectStorage/Azure/Configuration.cpp +++ b/src/Storages/ObjectStorage/Azure/Configuration.cpp @@ -148,10 +148,12 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context, { if (engine_args.size() < 3 || engine_args.size() > (with_structure ? 
8 : 7))
    {
-        throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
-                        "Storage AzureBlobStorage requires 3 to 7 arguments: "
-                        "AzureBlobStorage(connection_string|storage_account_url, container_name, blobpath, "
-                        "[account_name, account_key, format, compression, structure)])");
+        throw Exception(
+            ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
+            "Storage AzureBlobStorage requires 3 to {} arguments: "
+            "AzureBlobStorage(connection_string|storage_account_url, container_name, blobpath, "
+            "[account_name, account_key, format, compression, structure])",
+            (with_structure ? 8 : 7));
    }

    for (auto & engine_arg : engine_args)
diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp
index 9b972711cb1..667a925d11e 100644
--- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp
+++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp
@@ -3,7 +3,7 @@
 #include "config.h"
 #include
-#if USE_AWS_S3 && USE_PARQUET
+#if USE_PARQUET

 #include
 #include
diff --git a/src/Storages/ObjectStorage/DataLakes/IStorageDataLake.h b/src/Storages/ObjectStorage/DataLakes/IStorageDataLake.h
index 087207d3860..4b21bdb0494 100644
--- a/src/Storages/ObjectStorage/DataLakes/IStorageDataLake.h
+++ b/src/Storages/ObjectStorage/DataLakes/IStorageDataLake.h
@@ -2,7 +2,7 @@
 #include "config.h"
-#if USE_AWS_S3 && USE_AVRO
+#if USE_AVRO

 #include
 #include
diff --git a/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.cpp
index 6d18b13df01..9b9d92e282c 100644
--- a/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.cpp
+++ b/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.cpp
@@ -1,6 +1,6 @@
 #include "config.h"
-#if USE_AWS_S3 && USE_AVRO
+#if USE_AVRO

 #include
 #include
diff --git a/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.h
index 9476ac6e7d9..7b0deab91c3 100644
--- a/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.h
+++ b/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.h
@@ -1,6 +1,6 @@
 #pragma once
-#if USE_AWS_S3 && USE_AVRO /// StorageIceberg depending on Avro to parse metadata with Avro format.
+#if USE_AVRO /// StorageIceberg depends on Avro to parse metadata in the Avro format.
#include #include diff --git a/src/Storages/ObjectStorage/DataLakes/registerDataLakeStorages.cpp b/src/Storages/ObjectStorage/DataLakes/registerDataLakeStorages.cpp index 0fa6402e892..f0bd51de375 100644 --- a/src/Storages/ObjectStorage/DataLakes/registerDataLakeStorages.cpp +++ b/src/Storages/ObjectStorage/DataLakes/registerDataLakeStorages.cpp @@ -2,10 +2,12 @@ #if USE_AWS_S3 -#include -#include -#include -#include +# include +# include +# include +# include +# include +# include namespace DB @@ -22,6 +24,54 @@ void registerStorageIceberg(StorageFactory & factory) auto configuration = std::make_shared(); StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), false); + return StorageIceberg::create( + configuration, args.getContext(), args.table_id, args.columns, args.constraints, args.comment, std::nullopt, args.mode); + }, + { + .supports_settings = false, + .supports_schema_inference = true, + .source_access_type = AccessType::S3, + }); + + factory.registerStorage( + "IcebergS3", + [&](const StorageFactory::Arguments & args) + { + auto configuration = std::make_shared(); + StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), false); + + return StorageIceberg::create( + configuration, args.getContext(), args.table_id, args.columns, args.constraints, args.comment, std::nullopt, args.mode); + }, + { + .supports_settings = false, + .supports_schema_inference = true, + .source_access_type = AccessType::S3, + }); + + factory.registerStorage( + "IcebergAzure", + [&](const StorageFactory::Arguments & args) + { + auto configuration = std::make_shared(); + StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), true); + + return StorageIceberg::create( + configuration, args.getContext(), args.table_id, args.columns, args.constraints, args.comment, std::nullopt, args.mode); + }, + { + .supports_settings = false, + .supports_schema_inference = true, + .source_access_type = AccessType::AZURE, + }); + + factory.registerStorage( + "IcebergLocal", + [&](const StorageFactory::Arguments & args) + { + auto configuration = std::make_shared(); + StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), false); + return StorageIceberg::create( configuration, args.getContext(), args.table_id, args.columns, args.constraints, args.comment, std::nullopt, args.mode); @@ -29,7 +79,7 @@ void registerStorageIceberg(StorageFactory & factory) { .supports_settings = false, .supports_schema_inference = true, - .source_access_type = AccessType::S3, + .source_access_type = AccessType::FILE, }); } diff --git a/src/Storages/ObjectStorage/Local/Configuration.cpp b/src/Storages/ObjectStorage/Local/Configuration.cpp new file mode 100644 index 00000000000..a0cf70e6212 --- /dev/null +++ b/src/Storages/ObjectStorage/Local/Configuration.cpp @@ -0,0 +1,77 @@ +#include +#include +#include +#include +#include +#include "Common/NamedCollections/NamedCollections.h" + + +namespace DB +{ + +namespace ErrorCodes +{ +extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; +} + +void StorageLocalConfiguration::fromNamedCollection(const NamedCollection & collection, ContextPtr) +{ + path = collection.get("path"); + format = collection.getOrDefault("format", "auto"); + compression_method = collection.getOrDefault("compression_method", collection.getOrDefault("compression", "auto")); + structure = collection.getOrDefault("structure", "auto"); + paths 
= {path}; +} + + +void StorageLocalConfiguration::fromAST(ASTs & args, ContextPtr context, bool with_structure) +{ + const size_t max_args_num = with_structure ? 4 : 3; + if (args.empty() || args.size() > max_args_num) + { + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Expected not more than {} arguments", max_args_num); + } + + for (auto & arg : args) + arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context); + + path = checkAndGetLiteralArgument(args[0], "path"); + + if (args.size() > 1) + { + format = checkAndGetLiteralArgument(args[1], "format_name"); + } + + if (with_structure) + { + if (args.size() > 2) + { + structure = checkAndGetLiteralArgument(args[2], "structure"); + } + if (args.size() > 3) + { + compression_method = checkAndGetLiteralArgument(args[3], "compression_method"); + } + } + else if (args.size() > 2) + { + compression_method = checkAndGetLiteralArgument(args[2], "compression_method"); + } + paths = {path}; +} + +StorageObjectStorage::QuerySettings StorageLocalConfiguration::getQuerySettings(const ContextPtr & context) const +{ + const auto & settings = context->getSettingsRef(); + return StorageObjectStorage::QuerySettings{ + .truncate_on_insert = settings.engine_file_truncate_on_insert, + .create_new_file_on_insert = false, + .schema_inference_use_cache = settings.schema_inference_use_cache_for_file, + .schema_inference_mode = settings.schema_inference_mode, + .skip_empty_files = settings.engine_file_skip_empty_files, + .list_object_keys_size = 0, + .throw_on_zero_files_match = false, + .ignore_non_existent_file = false}; +} + +} diff --git a/src/Storages/ObjectStorage/Local/Configuration.h b/src/Storages/ObjectStorage/Local/Configuration.h new file mode 100644 index 00000000000..ba4de63ac47 --- /dev/null +++ b/src/Storages/ObjectStorage/Local/Configuration.h @@ -0,0 +1,52 @@ +#pragma once + +#include +#include "Disks/ObjectStorages/Local/LocalObjectStorage.h" + +#include + +#include + + +namespace fs = std::filesystem; + +namespace DB +{ + +class StorageLocalConfiguration : public StorageObjectStorage::Configuration +{ +public: + using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr; + + static constexpr auto type_name = "local"; + + StorageLocalConfiguration() = default; + StorageLocalConfiguration(const StorageLocalConfiguration & other) = default; + + std::string getTypeName() const override { return type_name; } + std::string getEngineName() const override { return "Local"; } + + Path getPath() const override { return path; } + void setPath(const Path & path_) override { path = path_; } + + const Paths & getPaths() const override { return paths; } + void setPaths(const Paths & paths_) override { paths = paths_; } + + String getNamespace() const override { return ""; } + String getDataSourceDescription() const override { return ""; } + StorageObjectStorage::QuerySettings getQuerySettings(const ContextPtr &) const override; + + ConfigurationPtr clone() override { return std::make_shared(*this); } + + ObjectStoragePtr createObjectStorage(ContextPtr, bool) override { return std::make_shared("/"); } + + void addStructureAndFormatToArgs(ASTs &, const String &, const String &, ContextPtr) override { } + +private: + void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override; + void fromAST(ASTs & args, ContextPtr context, bool with_structure) override; + Path path; + Paths paths; +}; + +} diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp 
b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index a0f189e92fc..f0686c144ba 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -465,6 +465,12 @@ SchemaCache & StorageObjectStorage::getSchemaCache(const ContextPtr & context, c DEFAULT_SCHEMA_CACHE_ELEMENTS)); return schema_cache; } + else if (storage_type_name == "local") + { + static SchemaCache schema_cache( + context->getConfigRef().getUInt("schema_inference_cache_max_elements_for_local", DEFAULT_SCHEMA_CACHE_ELEMENTS)); + return schema_cache; + } else throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported storage type: {}", storage_type_name); } diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h index cae0db48f31..562ca259089 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.h +++ b/src/Storages/ObjectStorage/StorageObjectStorage.h @@ -162,7 +162,7 @@ public: ContextPtr local_context, bool with_table_structure); - /// Storage type: s3, hdfs, azure. + /// Storage type: s3, hdfs, azure, local. virtual std::string getTypeName() const = 0; /// Engine name: S3, HDFS, Azure. virtual std::string getEngineName() const = 0; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index 04e319cd0b8..3ae884bf98c 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -417,10 +417,7 @@ std::future StorageObjectStorageSource } std::unique_ptr StorageObjectStorageSource::createReadBuffer( - const ObjectInfo & object_info, - const ObjectStoragePtr & object_storage, - const ContextPtr & context_, - const LoggerPtr & log) + const ObjectInfo & object_info, const ObjectStoragePtr & object_storage, const ContextPtr & context_, const LoggerPtr & log) { const auto & object_size = object_info.metadata->size_bytes; diff --git a/src/TableFunctions/ITableFunctionDataLake.h b/src/TableFunctions/ITableFunctionDataLake.h index fe6e5b3e593..db8287f97bf 100644 --- a/src/TableFunctions/ITableFunctionDataLake.h +++ b/src/TableFunctions/ITableFunctionDataLake.h @@ -76,6 +76,21 @@ struct TableFunctionIcebergName static constexpr auto name = "iceberg"; }; +struct TableFunctionIcebergS3Name +{ + static constexpr auto name = "icebergS3"; +}; + +struct TableFunctionIcebergAzureName +{ + static constexpr auto name = "icebergAzure"; +}; + +struct TableFunctionIcebergLocalName +{ + static constexpr auto name = "icebergLocal"; +}; + struct TableFunctionDeltaLakeName { static constexpr auto name = "deltaLake"; @@ -86,14 +101,20 @@ struct TableFunctionHudiName static constexpr auto name = "hudi"; }; -#if USE_AWS_S3 #if USE_AVRO +# if USE_AWS_S3 using TableFunctionIceberg = ITableFunctionDataLake; +using TableFunctionIcebergS3 = ITableFunctionDataLake; +# endif +# if USE_AZURE_BLOB_STORAGE +using TableFunctionIcebergAzure = ITableFunctionDataLake; +# endif +using TableFunctionIcebergLocal = ITableFunctionDataLake; #endif -#if USE_PARQUET +#if USE_AWS_S3 +# if USE_PARQUET using TableFunctionDeltaLake = ITableFunctionDataLake; #endif using TableFunctionHudi = ITableFunctionDataLake; #endif - } diff --git a/src/TableFunctions/TableFunctionObjectStorage.cpp b/src/TableFunctions/TableFunctionObjectStorage.cpp index 39392a4c44c..9cebb91044a 100644 --- a/src/TableFunctions/TableFunctionObjectStorage.cpp +++ b/src/TableFunctions/TableFunctionObjectStorage.cpp @@ -14,10 +14,11 
@@ #include #include -#include -#include -#include #include +#include +#include +#include +#include namespace DB @@ -223,5 +224,5 @@ template class TableFunctionObjectStorage template class TableFunctionObjectStorage; template class TableFunctionObjectStorage; #endif - +template class TableFunctionObjectStorage; } diff --git a/src/TableFunctions/TableFunctionObjectStorage.h b/src/TableFunctions/TableFunctionObjectStorage.h index 86b8f0d5e14..3468e5c5007 100644 --- a/src/TableFunctions/TableFunctionObjectStorage.h +++ b/src/TableFunctions/TableFunctionObjectStorage.h @@ -1,11 +1,11 @@ #pragma once -#include "config.h" -#include -#include #include -#include +#include #include +#include +#include +#include "config.h" namespace DB { @@ -14,6 +14,7 @@ class Context; class StorageS3Configuration; class StorageAzureConfiguration; class StorageHDFSConfiguration; +class StorageLocalConfiguration; struct S3StorageSettings; struct AzureStorageSettings; struct HDFSStorageSettings; @@ -90,6 +91,17 @@ struct HDFSDefinition static constexpr auto max_number_of_arguments = 4; }; +struct LocalDefinition +{ + static constexpr auto name = "local"; + static constexpr auto storage_type_name = "Local"; + static constexpr auto signature = " - path\n" + " - path, format\n" + " - path, format, structure\n" + " - path, format, structure, compression_method\n"; + static constexpr auto max_number_of_arguments = 4; +}; + template class TableFunctionObjectStorage : public ITableFunction { @@ -169,4 +181,6 @@ using TableFunctionAzureBlob = TableFunctionObjectStorage; #endif + +using TableFunctionLocal = TableFunctionObjectStorage; } diff --git a/src/TableFunctions/registerDataLakeTableFunctions.cpp b/src/TableFunctions/registerDataLakeTableFunctions.cpp index 15a6668f434..8361d8a7977 100644 --- a/src/TableFunctions/registerDataLakeTableFunctions.cpp +++ b/src/TableFunctions/registerDataLakeTableFunctions.cpp @@ -4,24 +4,43 @@ namespace DB { -#if USE_AWS_S3 #if USE_AVRO void registerTableFunctionIceberg(TableFunctionFactory & factory) { +# if USE_AWS_S3 factory.registerFunction( - { - .documentation = - { - .description=R"(The table function can be used to read the Iceberg table stored on object store.)", + {.documentation + = {.description = R"(The table function can be used to read the Iceberg table stored on S3 object store. 
Alias to icebergS3)",
          .examples{{"iceberg", "SELECT * FROM iceberg(url, access_key_id, secret_access_key)", ""}},
-            .categories{"DataLake"}
-        },
-        .allow_readonly = false
-    });
+          .categories{"DataLake"}},
+         .allow_readonly = false});
+    factory.registerFunction<TableFunctionIcebergS3>(
+        {.documentation
+         = {.description = R"(The table function can be used to read the Iceberg table stored on S3 object store.)",
+            .examples{{"icebergS3", "SELECT * FROM icebergS3(url, access_key_id, secret_access_key)", ""}},
+            .categories{"DataLake"}},
+         .allow_readonly = false});
+
+# endif
+# if USE_AZURE_BLOB_STORAGE
+    factory.registerFunction<TableFunctionIcebergAzure>(
+        {.documentation
+         = {.description = R"(The table function can be used to read the Iceberg table stored on Azure object store.)",
+            .examples{{"icebergAzure", "SELECT * FROM icebergAzure(url, access_key_id, secret_access_key)", ""}},
+            .categories{"DataLake"}},
+         .allow_readonly = false});
+# endif
+    factory.registerFunction<TableFunctionIcebergLocal>(
+        {.documentation
+         = {.description = R"(The table function can be used to read the Iceberg table stored locally.)",
+            .examples{{"icebergLocal", "SELECT * FROM icebergLocal(filename)", ""}},
+            .categories{"DataLake"}},
+         .allow_readonly = false});
 }
 #endif

-#if USE_PARQUET
+#if USE_AWS_S3
+# if USE_PARQUET
 void registerTableFunctionDeltaLake(TableFunctionFactory & factory)
 {
     factory.registerFunction<TableFunctionDeltaLake>(
@@ -55,11 +74,11 @@ void registerTableFunctionHudi(TableFunctionFactory & factory)
 void registerDataLakeTableFunctions(TableFunctionFactory & factory)
 {
     UNUSED(factory);
-#if USE_AWS_S3
 #if USE_AVRO
     registerTableFunctionIceberg(factory);
 #endif
-#if USE_PARQUET
+#if USE_AWS_S3
+# if USE_PARQUET
     registerTableFunctionDeltaLake(factory);
 #endif
     registerTableFunctionHudi(factory);
diff --git a/tests/integration/helpers/s3_tools.py b/tests/integration/helpers/s3_tools.py
index 0c3538c3c39..5b727060e69 100644
--- a/tests/integration/helpers/s3_tools.py
+++ b/tests/integration/helpers/s3_tools.py
@@ -2,30 +2,92 @@ from minio import Minio
 import glob
 import os
 import json
+import shutil

-def upload_directory(minio_client, bucket_name, local_path, s3_path):
-    result_files = []
-    for local_file in glob.glob(local_path + "/**"):
-        if os.path.isfile(local_file):
+from enum import Enum
+
+
+class CloudUploader:
+
+    def upload_directory(self, local_path, remote_blob_path, **kwargs):
+        result_files = []
+        for local_file in glob.glob(local_path + "/**"):
             result_local_path = os.path.join(local_path, local_file)
-            result_s3_path = os.path.join(s3_path, local_file)
-            print(f"Putting file {result_local_path} to {result_s3_path}")
-            minio_client.fput_object(
-                bucket_name=bucket_name,
-                object_name=result_s3_path,
-                file_path=result_local_path,
+            result_remote_blob_path = os.path.join(remote_blob_path, local_file)
+            if os.path.isfile(local_file):
+                self.upload_file(result_local_path, result_remote_blob_path, **kwargs)
+                result_files.append(result_remote_blob_path)
+            else:
+                files = self.upload_directory(
+                    result_local_path, result_remote_blob_path, **kwargs
+                )
+                result_files.extend(files)
+        return result_files
+
+
+class S3Uploader(CloudUploader):
+    def __init__(self, minio_client, bucket_name):
+        self.minio_client = minio_client
+        self.bucket_name = bucket_name
+
+    def upload_file(self, local_path, remote_blob_path, bucket=None):
+        if bucket is None:
+            bucket = self.bucket_name
+        self.minio_client.fput_object(
+            bucket_name=bucket,
+            object_name=remote_blob_path,
+            file_path=local_path,
+        )
+
+
+class LocalUploader(CloudUploader):
+
+    def __init__(self, clickhouse_node):
+        self.clickhouse_node = clickhouse_node
+
+    def upload_file(self, local_path, remote_blob_path):
+        dir_path = os.path.dirname(remote_blob_path)
+        if dir_path != "":
+            self.clickhouse_node.exec_in_container(
+                [
+                    "bash",
+                    "-c",
+                    "mkdir -p {}".format(dir_path),
+                ]
             )
-            result_files.append(result_s3_path)
+        self.clickhouse_node.copy_file_to_container(local_path, remote_blob_path)
+
+
+class AzureUploader(CloudUploader):
+
+    def __init__(self, blob_service_client, container_name):
+        self.blob_service_client = blob_service_client
+        self.container_client = self.blob_service_client.get_container_client(
+            container_name
+        )
+
+    def upload_file(self, local_path, remote_blob_path, container_name=None):
+        if container_name is None:
+            container_client = self.container_client
         else:
-            files = upload_directory(
-                minio_client,
-                bucket_name,
-                os.path.join(local_path, local_file),
-                os.path.join(s3_path, local_file),
+            container_client = self.blob_service_client.get_container_client(
+                container_name
             )
-            result_files.extend(files)
-    return result_files
+        blob_client = container_client.get_blob_client(remote_blob_path)
+        with open(local_path, "rb") as data:
+            blob_client.upload_blob(data, overwrite=True)
+
+
+def upload_directory(minio_client, bucket, local_path, remote_path):
+    return S3Uploader(minio_client=minio_client, bucket_name=bucket).upload_directory(
+        local_path, remote_path
+    )

 def get_file_contents(minio_client, bucket, s3_path):
diff --git a/tests/integration/test_storage_iceberg/configs/config.d/named_collections.xml b/tests/integration/test_storage_iceberg/configs/config.d/named_collections.xml
index d4c54e2d13d..b488638dd19 100644
--- a/tests/integration/test_storage_iceberg/configs/config.d/named_collections.xml
+++ b/tests/integration/test_storage_iceberg/configs/config.d/named_collections.xml
@@ -5,5 +5,11 @@
             minio
             minio123
+
+            devstoreaccount1
+            Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==
+
+
+
diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py
index 7762d17b96f..176c7e209bd 100644
--- a/tests/integration/test_storage_iceberg/test.py
+++ b/tests/integration/test_storage_iceberg/test.py
@@ -28,12 +28,15 @@ from pyspark.sql.functions import monotonically_increasing_id, row_number
 from pyspark.sql.window import Window
 from pyspark.sql.readwriter import DataFrameWriter, DataFrameWriterV2
 from minio.deleteobjects import DeleteObject
+from azure.storage.blob import BlobServiceClient

 from helpers.s3_tools import (
     prepare_s3_bucket,
-    upload_directory,
     get_file_contents,
     list_s3_objects,
+    S3Uploader,
+    AzureUploader,
+    LocalUploader,
 )

 SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
@@ -67,6 +70,7 @@ def started_cluster():
             main_configs=["configs/config.d/named_collections.xml"],
             user_configs=["configs/users.d/users.xml"],
             with_minio=True,
+            with_azurite=True,
             stay_alive=True,
         )

@@ -77,6 +81,25 @@
         logging.info("S3 bucket created")

         cluster.spark_session = get_spark()
+        cluster.default_s3_uploader = S3Uploader(
+            cluster.minio_client, cluster.minio_bucket
+        )
+
+        cluster.azure_container_name = "mycontainer"
+
+        container_client = cluster.blob_service_client.create_container(
+            cluster.azure_container_name
+        )
+
+        cluster.container_client = container_client
+
+        cluster.default_azure_uploader = AzureUploader(
+            cluster.blob_service_client, cluster.azure_container_name
+        )
+
+        cluster.default_local_uploader = LocalUploader(cluster.instances["node1"])

         yield cluster

@@ -142,12 +165,65 @@ def generate_data(spark, start, end):
     return df

-def create_iceberg_table(node, table_name, format="Parquet", bucket="root"):
+def get_creation_expression(
+    storage_type,
+    table_name,
+    cluster,
+    format="Parquet",
+    table_function=False,
+    **kwargs,
+):
+    if storage_type == "s3":
+        if "bucket" in kwargs:
+            bucket = kwargs["bucket"]
+        else:
+            bucket = cluster.minio_bucket
+        if table_function:
+            return f"icebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"
+        else:
+            return f"""
+                DROP TABLE IF EXISTS {table_name};
+                CREATE TABLE {table_name}
+                ENGINE=IcebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"""
+    elif storage_type == "azure":
+        if table_function:
+            return f"""
+                icebergAzure(azure, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})
+            """
+        else:
+            return f"""
+                DROP TABLE IF EXISTS {table_name};
+                CREATE TABLE {table_name}
+                ENGINE=IcebergAzure(azure, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})"""
+    elif storage_type == "local":
+        if table_function:
+            return f"""
+                icebergLocal(local, path = '/iceberg_data/default/{table_name}/', format={format})
+            """
+        else:
+            return f"""
+                DROP TABLE IF EXISTS {table_name};
+                CREATE TABLE {table_name}
+                ENGINE=IcebergLocal(local, path = '/iceberg_data/default/{table_name}/', format={format});"""
+    else:
+        raise Exception(f"Unknown iceberg storage type: {storage_type}")
+
+
+def get_uuid_str():
+    return str(uuid.uuid4()).replace("-", "_")
+
+
+def create_iceberg_table(
+    storage_type,
+    node,
+    table_name,
+    cluster,
+    format="Parquet",
+    **kwargs,
+):
     node.query(
-        f"""
-        DROP TABLE IF EXISTS {table_name};
-        CREATE TABLE {table_name}
-        ENGINE=Iceberg(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"""
+        get_creation_expression(storage_type, table_name, cluster, format, **kwargs)
     )

@@ -170,40 +246,69 @@ def create_initial_data_file(
     return result_path

+def default_upload_directory(
+    started_cluster, storage_type, local_path, remote_path, **kwargs
+):
+    if storage_type == "local":
+        return started_cluster.default_local_uploader.upload_directory(
+            local_path, remote_path, **kwargs
+        )
+    elif storage_type == "s3":
+        return started_cluster.default_s3_uploader.upload_directory(
+            local_path, remote_path, **kwargs
+        )
+    elif storage_type == "azure":
+        return started_cluster.default_azure_uploader.upload_directory(
+            local_path, remote_path, **kwargs
+        )
+    else:
+        raise Exception(f"Unknown iceberg storage type: {storage_type}")
+
+
 @pytest.mark.parametrize("format_version", ["1", "2"])
-def test_single_iceberg_file(started_cluster, format_version):
+@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
+def test_single_iceberg_file(started_cluster, format_version, storage_type):
     instance = started_cluster.instances["node1"]
     spark = started_cluster.spark_session
-    minio_client = started_cluster.minio_client
-    bucket = started_cluster.minio_bucket
-    TABLE_NAME = "test_single_iceberg_file_" + format_version
-
-    inserted_data = "SELECT number, toString(number) as string FROM numbers(100)"
-    parquet_data_path = create_initial_data_file(
-        started_cluster, instance, inserted_data, TABLE_NAME
+    TABLE_NAME = (
+        "test_single_iceberg_file_"
+        + format_version
+        + "_"
+        + storage_type
+        + "_"
+        + get_uuid_str()
     )

-    write_iceberg_from_file(
-        spark, parquet_data_path, TABLE_NAME, format_version=format_version
+    write_iceberg_from_df(spark, generate_data(spark, 0, 100), TABLE_NAME)
+
+    files = default_upload_directory(
+        started_cluster,
+        storage_type,
+        f"/iceberg_data/default/{TABLE_NAME}/",
+        f"/iceberg_data/default/{TABLE_NAME}/",
     )

-    files = upload_directory(
-        minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", ""
-    )
+    create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster)

-    create_iceberg_table(instance, TABLE_NAME)
     assert instance.query(f"SELECT * FROM {TABLE_NAME}") == instance.query(
-        inserted_data
+        "SELECT number, toString(number + 1) FROM numbers(100)"
     )

 @pytest.mark.parametrize("format_version", ["1", "2"])
-def test_partition_by(started_cluster, format_version):
+@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
+def test_partition_by(started_cluster, format_version, storage_type):
     instance = started_cluster.instances["node1"]
     spark = started_cluster.spark_session
-    minio_client = started_cluster.minio_client
-    bucket = started_cluster.minio_bucket
-    TABLE_NAME = "test_partition_by_" + format_version
+    TABLE_NAME = (
+        "test_partition_by_"
+        + format_version
+        + "_"
+        + storage_type
+        + "_"
+        + get_uuid_str()
+    )

     write_iceberg_from_df(
         spark,
@@ -214,22 +319,33 @@
         partition_by="a",
     )

-    files = upload_directory(
-        minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", ""
+    files = default_upload_directory(
+        started_cluster,
+        storage_type,
+        f"/iceberg_data/default/{TABLE_NAME}/",
+        f"/iceberg_data/default/{TABLE_NAME}/",
     )
     assert len(files) == 14  # 10 partitions + 4 metadata files

-    create_iceberg_table(instance, TABLE_NAME)
+    create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster)
     assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 10

 @pytest.mark.parametrize("format_version", ["1", "2"])
-def test_multiple_iceberg_files(started_cluster, format_version):
+@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
+def test_multiple_iceberg_files(started_cluster, format_version, storage_type):
     instance = started_cluster.instances["node1"]
     spark = started_cluster.spark_session
     minio_client = started_cluster.minio_client
     bucket = started_cluster.minio_bucket
-    TABLE_NAME = "test_multiple_iceberg_files_" + format_version
+    TABLE_NAME = (
+        "test_multiple_iceberg_files_"
+        + format_version
+        + "_"
+        + storage_type
+        + "_"
+        + get_uuid_str()
+    )

     write_iceberg_from_df(
         spark,
@@ -239,9 +355,13 @@
         format_version=format_version,
     )

-    files = upload_directory(
-        minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}", ""
+    files = default_upload_directory(
+        started_cluster,
+        storage_type,
+        f"/iceberg_data/default/{TABLE_NAME}/",
+        f"/iceberg_data/default/{TABLE_NAME}/",
     )
+
     # ['/iceberg_data/default/test_multiple_iceberg_files/data/00000-1-35302d56-f1ed-494e-a85b-fbf85c05ab39-00001.parquet',
# '/iceberg_data/default/test_multiple_iceberg_files/metadata/version-hint.text', # '/iceberg_data/default/test_multiple_iceberg_files/metadata/3127466b-299d-48ca-a367-6b9b1df1e78c-m0.avro', @@ -249,7 +369,7 @@ def test_multiple_iceberg_files(started_cluster, format_version): # '/iceberg_data/default/test_multiple_iceberg_files/metadata/v1.metadata.json'] assert len(files) == 5 - create_iceberg_table(instance, TABLE_NAME) + create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster) assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100 write_iceberg_from_df( @@ -259,8 +379,11 @@ def test_multiple_iceberg_files(started_cluster, format_version): mode="append", format_version=format_version, ) - files = upload_directory( - minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}", "" + files = default_upload_directory( + started_cluster, + storage_type, + f"/iceberg_data/default/{TABLE_NAME}/", + "", ) assert len(files) == 9 @@ -271,12 +394,13 @@ def test_multiple_iceberg_files(started_cluster, format_version): @pytest.mark.parametrize("format_version", ["1", "2"]) -def test_types(started_cluster, format_version): +@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) +def test_types(started_cluster, format_version, storage_type): instance = started_cluster.instances["node1"] spark = started_cluster.spark_session - minio_client = started_cluster.minio_client - bucket = started_cluster.minio_bucket - TABLE_NAME = "test_types_" + format_version + TABLE_NAME = ( + "test_types_" + format_version + "_" + storage_type + "_" + get_uuid_str() + ) data = [ ( @@ -302,22 +426,29 @@ def test_types(started_cluster, format_version): spark, df, TABLE_NAME, mode="overwrite", format_version=format_version ) - upload_directory(minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}", "") + default_upload_directory( + started_cluster, + storage_type, + f"/iceberg_data/default/{TABLE_NAME}/", + f"/iceberg_data/default/{TABLE_NAME}/", + ) - create_iceberg_table(instance, TABLE_NAME) + create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster) assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 1 assert ( instance.query(f"SELECT a, b, c, d, e FROM {TABLE_NAME}").strip() == "123\tstring\t2000-01-01\t['str1','str2']\ttrue" ) - table_function = f"iceberg(s3, filename='iceberg_data/default/{TABLE_NAME}/')" + table_function_expr = get_creation_expression( + storage_type, TABLE_NAME, started_cluster, table_function=True + ) assert ( - instance.query(f"SELECT a, b, c, d, e FROM {table_function}").strip() + instance.query(f"SELECT a, b, c, d, e FROM {table_function_expr}").strip() == "123\tstring\t2000-01-01\t['str1','str2']\ttrue" ) - assert instance.query(f"DESCRIBE {table_function} FORMAT TSV") == TSV( + assert instance.query(f"DESCRIBE {table_function_expr} FORMAT TSV") == TSV( [ ["a", "Nullable(Int32)"], ["b", "Nullable(String)"], @@ -329,12 +460,20 @@ def test_types(started_cluster, format_version): @pytest.mark.parametrize("format_version", ["1", "2"]) -def test_delete_files(started_cluster, format_version): +@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) +def test_delete_files(started_cluster, format_version, storage_type): instance = started_cluster.instances["node1"] spark = started_cluster.spark_session minio_client = started_cluster.minio_client bucket = started_cluster.minio_bucket - TABLE_NAME = "test_delete_files_" + format_version + TABLE_NAME = ( + "test_delete_files_" + + format_version + + "_" + + 
storage_type + + "_" + + get_uuid_str() + ) write_iceberg_from_df( spark, @@ -344,17 +483,22 @@ def test_delete_files(started_cluster, format_version): format_version=format_version, ) - files = upload_directory( - minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", "" + files = default_upload_directory( + started_cluster, + storage_type, + f"/iceberg_data/default/{TABLE_NAME}/", + f"/iceberg_data/default/{TABLE_NAME}/", ) - - create_iceberg_table(instance, TABLE_NAME) + create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster) assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100 spark.sql(f"DELETE FROM {TABLE_NAME} WHERE a >= 0") - files = upload_directory( - minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", "" + files = default_upload_directory( + started_cluster, + storage_type, + f"/iceberg_data/default/{TABLE_NAME}/", + "", ) assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 0 @@ -368,27 +512,41 @@ def test_delete_files(started_cluster, format_version): format_version=format_version, ) - files = upload_directory( - minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", "" + files = default_upload_directory( + started_cluster, + storage_type, + f"/iceberg_data/default/{TABLE_NAME}/", + "", ) assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100 spark.sql(f"DELETE FROM {TABLE_NAME} WHERE a >= 150") - files = upload_directory( - minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", "" + files = default_upload_directory( + started_cluster, + storage_type, + f"/iceberg_data/default/{TABLE_NAME}/", + "", ) assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 50 @pytest.mark.parametrize("format_version", ["1", "2"]) -def test_evolved_schema(started_cluster, format_version): +@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) +def test_evolved_schema(started_cluster, format_version, storage_type): instance = started_cluster.instances["node1"] spark = started_cluster.spark_session minio_client = started_cluster.minio_client bucket = started_cluster.minio_bucket - TABLE_NAME = "test_evolved_schema_" + format_version + TABLE_NAME = ( + "test_evolved_schema_" + + format_version + + "_" + + storage_type + + "_" + + get_uuid_str() + ) write_iceberg_from_df( spark, @@ -398,19 +556,25 @@ def test_evolved_schema(started_cluster, format_version): format_version=format_version, ) - files = upload_directory( - minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", "" + files = default_upload_directory( + started_cluster, + storage_type, + f"/iceberg_data/default/{TABLE_NAME}/", + f"/iceberg_data/default/{TABLE_NAME}/", ) - create_iceberg_table(instance, TABLE_NAME) + create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster) assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100 expected_data = instance.query(f"SELECT * FROM {TABLE_NAME} order by a, b") spark.sql(f"ALTER TABLE {TABLE_NAME} ADD COLUMNS (x bigint)") - files = upload_directory( - minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", "" + files = default_upload_directory( + started_cluster, + storage_type, + f"/iceberg_data/default/{TABLE_NAME}/", + "", ) error = instance.query_and_get_error(f"SELECT * FROM {TABLE_NAME}") @@ -422,12 +586,13 @@ def test_evolved_schema(started_cluster, format_version): assert data == expected_data -def test_row_based_deletes(started_cluster): +@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) +def 
test_row_based_deletes(started_cluster, storage_type): instance = started_cluster.instances["node1"] spark = started_cluster.spark_session minio_client = started_cluster.minio_client bucket = started_cluster.minio_bucket - TABLE_NAME = "test_row_based_deletes" + TABLE_NAME = "test_row_based_deletes_" + storage_type + "_" + get_uuid_str() spark.sql( f"CREATE TABLE {TABLE_NAME} (id bigint, data string) USING iceberg TBLPROPERTIES ('format-version' = '2', 'write.update.mode'='merge-on-read', 'write.delete.mode'='merge-on-read', 'write.merge.mode'='merge-on-read')" @@ -436,17 +601,23 @@ def test_row_based_deletes(started_cluster): f"INSERT INTO {TABLE_NAME} select id, char(id + ascii('a')) from range(100)" ) - files = upload_directory( - minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", "" + files = default_upload_directory( + started_cluster, + storage_type, + f"/iceberg_data/default/{TABLE_NAME}/", + f"/iceberg_data/default/{TABLE_NAME}/", ) - create_iceberg_table(instance, TABLE_NAME) + create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster) assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100 spark.sql(f"DELETE FROM {TABLE_NAME} WHERE id < 10") - files = upload_directory( - minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", "" + files = default_upload_directory( + started_cluster, + storage_type, + f"/iceberg_data/default/{TABLE_NAME}/", + "", ) error = instance.query_and_get_error(f"SELECT * FROM {TABLE_NAME}") @@ -454,13 +625,21 @@ def test_row_based_deletes(started_cluster): @pytest.mark.parametrize("format_version", ["1", "2"]) -def test_schema_inference(started_cluster, format_version): +@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) +def test_schema_inference(started_cluster, format_version, storage_type): instance = started_cluster.instances["node1"] spark = started_cluster.spark_session - minio_client = started_cluster.minio_client - bucket = started_cluster.minio_bucket for format in ["Parquet", "ORC", "Avro"]: - TABLE_NAME = "test_schema_inference_" + format + "_" + format_version + TABLE_NAME = ( + "test_schema_inference_" + + format + + "_" + + format_version + + "_" + + storage_type + + "_" + + get_uuid_str() + ) # Types time, timestamptz, fixed are not supported in Spark. 
spark.sql( @@ -470,12 +649,16 @@ def test_schema_inference(started_cluster, format_version): spark.sql( f"insert into {TABLE_NAME} select 42, 4242, 42.42, 4242.4242, decimal(42.42), decimal(42.42), decimal(42.42), date('2020-01-01'), timestamp('2020-01-01 20:00:00'), 'hello', binary('hello'), array(1,2,3), map('key', 'value'), struct(42, 'hello'), array(struct(map('key', array(map('key', 42))), struct(42, 'hello')))" ) - - files = upload_directory( - minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", "" + files = default_upload_directory( + started_cluster, + storage_type, + f"/iceberg_data/default/{TABLE_NAME}/", + f"/iceberg_data/default/{TABLE_NAME}/", ) - create_iceberg_table(instance, TABLE_NAME, format) + create_iceberg_table( + storage_type, instance, TABLE_NAME, started_cluster, format=format + ) res = instance.query( f"DESC {TABLE_NAME} FORMAT TSVRaw", settings={"print_pretty_type_names": 0} @@ -510,12 +693,18 @@ def test_schema_inference(started_cluster, format_version): @pytest.mark.parametrize("format_version", ["1", "2"]) -def test_metadata_file_selection(started_cluster, format_version): +@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) +def test_metadata_file_selection(started_cluster, format_version, storage_type): instance = started_cluster.instances["node1"] spark = started_cluster.spark_session - minio_client = started_cluster.minio_client - bucket = started_cluster.minio_bucket - TABLE_NAME = "test_metadata_selection_" + format_version + TABLE_NAME = ( + "test_metadata_selection_" + + format_version + + "_" + + storage_type + + "_" + + get_uuid_str() + ) spark.sql( f"CREATE TABLE {TABLE_NAME} (id bigint, data string) USING iceberg TBLPROPERTIES ('format-version' = '2', 'write.update.mode'='merge-on-read', 'write.delete.mode'='merge-on-read', 'write.merge.mode'='merge-on-read')" @@ -526,22 +715,31 @@ def test_metadata_file_selection(started_cluster, format_version): f"INSERT INTO {TABLE_NAME} select id, char(id + ascii('a')) from range(10)" ) - files = upload_directory( - minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", "" + files = default_upload_directory( + started_cluster, + storage_type, + f"/iceberg_data/default/{TABLE_NAME}/", + f"/iceberg_data/default/{TABLE_NAME}/", ) - create_iceberg_table(instance, TABLE_NAME) + create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster) assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 500 @pytest.mark.parametrize("format_version", ["1", "2"]) -def test_metadata_file_format_with_uuid(started_cluster, format_version): +@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) +def test_metadata_file_format_with_uuid(started_cluster, format_version, storage_type): instance = started_cluster.instances["node1"] spark = started_cluster.spark_session - minio_client = started_cluster.minio_client - bucket = started_cluster.minio_bucket - TABLE_NAME = "test_metadata_selection_with_uuid_" + format_version + TABLE_NAME = ( + "test_metadata_selection_with_uuid_" + + format_version + + "_" + + storage_type + + "_" + + get_uuid_str() + ) spark.sql( f"CREATE TABLE {TABLE_NAME} (id bigint, data string) USING iceberg TBLPROPERTIES ('format-version' = '2', 'write.update.mode'='merge-on-read', 'write.delete.mode'='merge-on-read', 'write.merge.mode'='merge-on-read')" @@ -555,40 +753,48 @@ def test_metadata_file_format_with_uuid(started_cluster, format_version): for i in range(50): os.rename( f"/iceberg_data/default/{TABLE_NAME}/metadata/v{i + 1}.metadata.json", - 
f"/iceberg_data/default/{TABLE_NAME}/metadata/{str(i).zfill(5)}-{uuid.uuid4()}.metadata.json", + f"/iceberg_data/default/{TABLE_NAME}/metadata/{str(i).zfill(5)}-{get_uuid_str()}.metadata.json", ) - files = upload_directory( - minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", "" + files = default_upload_directory( + started_cluster, + storage_type, + f"/iceberg_data/default/{TABLE_NAME}/", + f"/iceberg_data/default/{TABLE_NAME}/", ) - create_iceberg_table(instance, TABLE_NAME) + create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster) assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 500 -def test_restart_broken(started_cluster): +def test_restart_broken_s3(started_cluster): instance = started_cluster.instances["node1"] spark = started_cluster.spark_session + TABLE_NAME = "test_restart_broken_table_function_s3" + "_" + get_uuid_str() + minio_client = started_cluster.minio_client bucket = "broken2" - TABLE_NAME = "test_restart_broken_table_function" if not minio_client.bucket_exists(bucket): minio_client.make_bucket(bucket) - parquet_data_path = create_initial_data_file( - started_cluster, - instance, - "SELECT number, toString(number) FROM numbers(100)", + write_iceberg_from_df( + spark, + generate_data(spark, 0, 100), TABLE_NAME, + mode="overwrite", + format_version="1", ) - write_iceberg_from_file(spark, parquet_data_path, TABLE_NAME, format_version="1") - files = upload_directory( - minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", "" + files = default_upload_directory( + started_cluster, + "s3", + f"/iceberg_data/default/{TABLE_NAME}/", + f"/iceberg_data/default/{TABLE_NAME}/", + bucket=bucket, ) - create_iceberg_table(instance, TABLE_NAME, bucket=bucket) + create_iceberg_table("s3", instance, TABLE_NAME, started_cluster, bucket=bucket) assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100 s3_objects = list_s3_objects(minio_client, bucket, prefix="") @@ -613,8 +819,12 @@ def test_restart_broken(started_cluster): minio_client.make_bucket(bucket) - files = upload_directory( - minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", "" + files = default_upload_directory( + started_cluster, + "s3", + f"/iceberg_data/default/{TABLE_NAME}/", + f"/iceberg_data/default/{TABLE_NAME}/", + bucket=bucket, ) assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100