Merge pull request #68210 from ClickHouse/divanik/add_local_and_azure_iceberg_support

Support partial Iceberg reading in azure and local storages
This commit is contained in:
Daniil Ivanik 2024-08-27 11:52:43 +00:00 committed by GitHub
commit e0dc32bc61
22 changed files with 734 additions and 216 deletions


@ -6,28 +6,34 @@ sidebar_label: Iceberg
# Iceberg Table Engine
This engine provides a read-only integration with existing Apache [Iceberg](https://iceberg.apache.org/) tables in Amazon S3.
This engine provides a read-only integration with existing Apache [Iceberg](https://iceberg.apache.org/) tables in Amazon S3, Azure Blob Storage, and on the local filesystem.
## Create Table
Note that the Iceberg table must already exist in S3, this command does not take DDL parameters to create a new table.
Note that the Iceberg table must already exist in the storage; this command does not take DDL parameters to create a new table.
``` sql
CREATE TABLE iceberg_table
ENGINE = Iceberg(url, [aws_access_key_id, aws_secret_access_key,])
CREATE TABLE iceberg_table_s3
ENGINE = IcebergS3(url [, NOSIGN | access_key_id, secret_access_key [, session_token]] [, format] [, compression])
CREATE TABLE iceberg_table_azure
ENGINE = IcebergAzure(connection_string|storage_account_url, container_name, blobpath [, account_name, account_key, format, compression])
CREATE TABLE iceberg_table_local
ENGINE = IcebergLocal(path_to_table [, format] [, compression_method])
```
**Engine parameters**
**Engine arguments**
- `url` — url with the path to an existing Iceberg table.
- `aws_access_key_id`, `aws_secret_access_key` - Long-term credentials for the [AWS](https://aws.amazon.com/) account user. You can use these to authenticate your requests. Parameter is optional. If credentials are not specified, they are used from the configuration file.
The argument descriptions match those of the `S3`, `AzureBlobStorage` and `File` engines, respectively.
`format` specifies the format of the data files in the Iceberg table.
Engine parameters can be specified using [Named Collections](../../../operations/named-collections.md).
**Example**
```sql
CREATE TABLE iceberg_table ENGINE=Iceberg('http://test.s3.amazonaws.com/clickhouse-bucket/test_table', 'test', 'test')
CREATE TABLE iceberg_table ENGINE=IcebergS3('http://test.s3.amazonaws.com/clickhouse-bucket/test_table', 'test', 'test')
```
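The new Azure and local engines are created the same way. The following is a minimal sketch with placeholder values; the connection string, container, and path are illustrative and not taken from this change:

```sql
-- Hypothetical connection string, container and paths; substitute your own.
CREATE TABLE iceberg_table_azure
    ENGINE = IcebergAzure('<azure_connection_string>', 'my-container', 'iceberg_data/default/test_table/');

CREATE TABLE iceberg_table_local
    ENGINE = IcebergLocal('/data/iceberg_data/default/test_table/');
```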
Using named collections:
@ -45,9 +51,15 @@ Using named collections:
```
```sql
CREATE TABLE iceberg_table ENGINE=Iceberg(iceberg_conf, filename = 'test_table')
CREATE TABLE iceberg_table ENGINE=IcebergS3(iceberg_conf, filename = 'test_table')
```
**Aliases**
The `Iceberg` table engine is now an alias for `IcebergS3`.
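In other words, the two statements below create equivalent tables (reusing the placeholder URL and credentials from the example above):

```sql
CREATE TABLE t1 ENGINE=Iceberg('http://test.s3.amazonaws.com/clickhouse-bucket/test_table', 'test', 'test');
CREATE TABLE t2 ENGINE=IcebergS3('http://test.s3.amazonaws.com/clickhouse-bucket/test_table', 'test', 'test');
```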
## See also
- [iceberg table function](/docs/en/sql-reference/table-functions/iceberg.md)


@ -6,35 +6,37 @@ sidebar_label: iceberg
# iceberg Table Function
Provides a read-only table-like interface to Apache [Iceberg](https://iceberg.apache.org/) tables in Amazon S3.
Provides a read-only table-like interface to Apache [Iceberg](https://iceberg.apache.org/) tables in Amazon S3, Azure Blob Storage, or on the local filesystem.
## Syntax
``` sql
iceberg(url [,aws_access_key_id, aws_secret_access_key] [,format] [,structure])
icebergS3(url [, NOSIGN | access_key_id, secret_access_key [, session_token]] [, format] [, compression_method])
icebergS3(named_collection[, option=value [,..]])
icebergAzure(connection_string|storage_account_url, container_name, blobpath [, account_name] [, account_key] [, format] [, compression_method])
icebergAzure(named_collection[, option=value [,..]])
icebergLocal(path_to_table [, format] [, compression_method])
icebergLocal(named_collection[, option=value [,..]])
```
## Arguments
- `url` — Bucket url with the path to an existing Iceberg table in S3.
- `aws_access_key_id`, `aws_secret_access_key` - Long-term credentials for the [AWS](https://aws.amazon.com/) account user. You can use these to authenticate your requests. These parameters are optional. If credentials are not specified, they are used from the ClickHouse configuration. For more information see [Using S3 for Data Storage](/docs/en/engines/table-engines/mergetree-family/mergetree.md/#table_engine-mergetree-s3).
- `format` — The [format](/docs/en/interfaces/formats.md/#formats) of the file. By default `Parquet` is used.
- `structure` — Structure of the table. Format `'column1_name column1_type, column2_name column2_type, ...'`.
Engine parameters can be specified using [Named Collections](/docs/en/operations/named-collections.md).
The argument descriptions match those of the `s3`, `azureBlobStorage` and `file` table functions, respectively.
`format` specifies the format of the data files in the Iceberg table.
**Returned value**
A table with the specified structure for reading data in the specified Iceberg table in S3.
A table with the specified structure for reading data in the specified Iceberg table.
**Example**
```sql
SELECT * FROM iceberg('http://test.s3.amazonaws.com/clickhouse-bucket/test_table', 'test', 'test')
SELECT * FROM icebergS3('http://test.s3.amazonaws.com/clickhouse-bucket/test_table', 'test', 'test')
```
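The Azure and local variants follow the same pattern. A minimal sketch with placeholder locations (not taken from this change):

```sql
-- Hypothetical connection string, container and paths; substitute your own.
SELECT * FROM icebergAzure('<azure_connection_string>', 'my-container', 'iceberg_data/default/test_table/');
SELECT * FROM icebergLocal('/data/iceberg_data/default/test_table/');
```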
:::important
ClickHouse currently supports reading v1 (v2 support is coming soon!) of the Iceberg format via the `iceberg` table function and `Iceberg` table engine.
ClickHouse currently supports reading both v1 and v2 of the Iceberg format via the `icebergS3`, `icebergAzure` and `icebergLocal` table functions and the `IcebergS3`, `IcebergAzure` and `IcebergLocal` table engines.
:::
## Defining a named collection
@ -56,10 +58,14 @@ Here is an example of configuring a named collection for storing the URL and cre
```
```sql
SELECT * FROM iceberg(iceberg_conf, filename = 'test_table')
DESCRIBE iceberg(iceberg_conf, filename = 'test_table')
SELECT * FROM icebergS3(iceberg_conf, filename = 'test_table')
DESCRIBE icebergS3(iceberg_conf, filename = 'test_table')
```
**Aliases**
The `iceberg` table function is now an alias for `icebergS3`.
**See Also**
- [Iceberg engine](/docs/en/engines/table-engines/integrations/iceberg.md)


@ -111,6 +111,7 @@ add_headers_and_sources(dbms Storages/ObjectStorage)
add_headers_and_sources(dbms Storages/ObjectStorage/Azure)
add_headers_and_sources(dbms Storages/ObjectStorage/S3)
add_headers_and_sources(dbms Storages/ObjectStorage/HDFS)
add_headers_and_sources(dbms Storages/ObjectStorage/Local)
add_headers_and_sources(dbms Storages/ObjectStorage/DataLakes)
add_headers_and_sources(dbms Common/NamedCollections)


@ -43,39 +43,21 @@ bool LocalObjectStorage::exists(const StoredObject & object) const
std::unique_ptr<ReadBufferFromFileBase> LocalObjectStorage::readObjects( /// NOLINT
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t> read_hint,
std::optional<size_t> file_size) const
std::optional<size_t>,
std::optional<size_t>) const
{
auto modified_settings = patchSettings(read_settings);
auto global_context = Context::getGlobalContextInstance();
auto read_buffer_creator =
[=] (bool /* restricted_seek */, const StoredObject & object)
-> std::unique_ptr<ReadBufferFromFileBase>
{
return createReadBufferFromFileBase(object.remote_path, modified_settings, read_hint, file_size);
};
auto read_buffer_creator = [=](bool /* restricted_seek */, const StoredObject & object) -> std::unique_ptr<ReadBufferFromFileBase>
{ return std::make_unique<ReadBufferFromFile>(object.remote_path); };
switch (read_settings.remote_fs_method)
{
case RemoteFSReadMethod::read:
{
return std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator), objects, "file:", modified_settings,
global_context->getFilesystemCacheLog(), /* use_external_buffer */false);
}
case RemoteFSReadMethod::threadpool:
{
auto impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator), objects, "file:", modified_settings,
global_context->getFilesystemCacheLog(), /* use_external_buffer */true);
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousBoundedReadBuffer>(
std::move(impl), reader, read_settings,
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}
}
return std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
objects,
"file:",
modified_settings,
global_context->getFilesystemCacheLog(),
/* use_external_buffer */ false);
}
ReadSettings LocalObjectStorage::patchSettings(const ReadSettings & read_settings) const


@ -148,10 +148,12 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context,
{
if (engine_args.size() < 3 || engine_args.size() > (with_structure ? 8 : 7))
{
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Storage AzureBlobStorage requires 3 to 7 arguments: "
"AzureBlobStorage(connection_string|storage_account_url, container_name, blobpath, "
"[account_name, account_key, format, compression, structure)])");
throw Exception(
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Storage AzureBlobStorage requires 3 to {} arguments: "
"AzureBlobStorage(connection_string|storage_account_url, container_name, blobpath, "
"[account_name, account_key, format, compression, structure)])",
(with_structure ? 8 : 7));
}
for (auto & engine_arg : engine_args)


@ -3,7 +3,7 @@
#include "config.h"
#include <set>
#if USE_AWS_S3 && USE_PARQUET
#if USE_PARQUET
#include <Common/logger_useful.h>
#include <Columns/ColumnString.h>


@ -2,7 +2,7 @@
#include "config.h"
#if USE_AWS_S3 && USE_AVRO
#if USE_AVRO
#include <Formats/FormatFactory.h>
#include <Storages/IStorage.h>


@ -1,6 +1,6 @@
#include "config.h"
#if USE_AWS_S3 && USE_AVRO
#if USE_AVRO
#include <Common/logger_useful.h>
#include <Core/Settings.h>


@ -1,6 +1,6 @@
#pragma once
#if USE_AWS_S3 && USE_AVRO /// StorageIceberg depending on Avro to parse metadata with Avro format.
#if USE_AVRO /// StorageIceberg depending on Avro to parse metadata with Avro format.
#include <Interpreters/Context_fwd.h>
#include <Core/Types.h>


@ -2,10 +2,12 @@
#if USE_AWS_S3
#include <Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h>
#include <Storages/ObjectStorage/DataLakes/IStorageDataLake.h>
#include <Storages/ObjectStorage/DataLakes/IcebergMetadata.h>
#include <Storages/ObjectStorage/S3/Configuration.h>
# include <Storages/ObjectStorage/Azure/Configuration.h>
# include <Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h>
# include <Storages/ObjectStorage/DataLakes/IStorageDataLake.h>
# include <Storages/ObjectStorage/DataLakes/IcebergMetadata.h>
# include <Storages/ObjectStorage/Local/Configuration.h>
# include <Storages/ObjectStorage/S3/Configuration.h>
namespace DB
@ -22,6 +24,54 @@ void registerStorageIceberg(StorageFactory & factory)
auto configuration = std::make_shared<StorageS3Configuration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), false);
return StorageIceberg::create(
configuration, args.getContext(), args.table_id, args.columns, args.constraints, args.comment, std::nullopt, args.mode);
},
{
.supports_settings = false,
.supports_schema_inference = true,
.source_access_type = AccessType::S3,
});
factory.registerStorage(
"IcebergS3",
[&](const StorageFactory::Arguments & args)
{
auto configuration = std::make_shared<StorageS3Configuration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), false);
return StorageIceberg::create(
configuration, args.getContext(), args.table_id, args.columns, args.constraints, args.comment, std::nullopt, args.mode);
},
{
.supports_settings = false,
.supports_schema_inference = true,
.source_access_type = AccessType::S3,
});
factory.registerStorage(
"IcebergAzure",
[&](const StorageFactory::Arguments & args)
{
auto configuration = std::make_shared<StorageAzureConfiguration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), true);
return StorageIceberg::create(
configuration, args.getContext(), args.table_id, args.columns, args.constraints, args.comment, std::nullopt, args.mode);
},
{
.supports_settings = false,
.supports_schema_inference = true,
.source_access_type = AccessType::AZURE,
});
factory.registerStorage(
"IcebergLocal",
[&](const StorageFactory::Arguments & args)
{
auto configuration = std::make_shared<StorageLocalConfiguration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), false);
return StorageIceberg::create(
configuration, args.getContext(), args.table_id, args.columns,
args.constraints, args.comment, std::nullopt, args.mode);
@ -29,7 +79,7 @@ void registerStorageIceberg(StorageFactory & factory)
{
.supports_settings = false,
.supports_schema_inference = true,
.source_access_type = AccessType::S3,
.source_access_type = AccessType::FILE,
});
}


@ -0,0 +1,77 @@
#include <Core/Settings.h>
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Storages/ObjectStorage/Local/Configuration.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include "Common/NamedCollections/NamedCollections.h"
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
void StorageLocalConfiguration::fromNamedCollection(const NamedCollection & collection, ContextPtr)
{
path = collection.get<String>("path");
format = collection.getOrDefault<String>("format", "auto");
compression_method = collection.getOrDefault<String>("compression_method", collection.getOrDefault<String>("compression", "auto"));
structure = collection.getOrDefault<String>("structure", "auto");
paths = {path};
}
void StorageLocalConfiguration::fromAST(ASTs & args, ContextPtr context, bool with_structure)
{
const size_t max_args_num = with_structure ? 4 : 3;
if (args.empty() || args.size() > max_args_num)
{
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Expected not more than {} arguments", max_args_num);
}
for (auto & arg : args)
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
path = checkAndGetLiteralArgument<String>(args[0], "path");
if (args.size() > 1)
{
format = checkAndGetLiteralArgument<String>(args[1], "format_name");
}
if (with_structure)
{
if (args.size() > 2)
{
structure = checkAndGetLiteralArgument<String>(args[2], "structure");
}
if (args.size() > 3)
{
compression_method = checkAndGetLiteralArgument<String>(args[3], "compression_method");
}
}
else if (args.size() > 2)
{
compression_method = checkAndGetLiteralArgument<String>(args[2], "compression_method");
}
paths = {path};
}
StorageObjectStorage::QuerySettings StorageLocalConfiguration::getQuerySettings(const ContextPtr & context) const
{
const auto & settings = context->getSettingsRef();
return StorageObjectStorage::QuerySettings{
.truncate_on_insert = settings.engine_file_truncate_on_insert,
.create_new_file_on_insert = false,
.schema_inference_use_cache = settings.schema_inference_use_cache_for_file,
.schema_inference_mode = settings.schema_inference_mode,
.skip_empty_files = settings.engine_file_skip_empty_files,
.list_object_keys_size = 0,
.throw_on_zero_files_match = false,
.ignore_non_existent_file = false};
}
}


@ -0,0 +1,52 @@
#pragma once
#include <memory>
#include "Disks/ObjectStorages/Local/LocalObjectStorage.h"
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <filesystem>
namespace fs = std::filesystem;
namespace DB
{
class StorageLocalConfiguration : public StorageObjectStorage::Configuration
{
public:
using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr;
static constexpr auto type_name = "local";
StorageLocalConfiguration() = default;
StorageLocalConfiguration(const StorageLocalConfiguration & other) = default;
std::string getTypeName() const override { return type_name; }
std::string getEngineName() const override { return "Local"; }
Path getPath() const override { return path; }
void setPath(const Path & path_) override { path = path_; }
const Paths & getPaths() const override { return paths; }
void setPaths(const Paths & paths_) override { paths = paths_; }
String getNamespace() const override { return ""; }
String getDataSourceDescription() const override { return ""; }
StorageObjectStorage::QuerySettings getQuerySettings(const ContextPtr &) const override;
ConfigurationPtr clone() override { return std::make_shared<StorageLocalConfiguration>(*this); }
ObjectStoragePtr createObjectStorage(ContextPtr, bool) override { return std::make_shared<LocalObjectStorage>("/"); }
void addStructureAndFormatToArgs(ASTs &, const String &, const String &, ContextPtr) override { }
private:
void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override;
void fromAST(ASTs & args, ContextPtr context, bool with_structure) override;
Path path;
Paths paths;
};
}


@ -465,6 +465,12 @@ SchemaCache & StorageObjectStorage::getSchemaCache(const ContextPtr & context, c
DEFAULT_SCHEMA_CACHE_ELEMENTS));
return schema_cache;
}
else if (storage_type_name == "local")
{
static SchemaCache schema_cache(
context->getConfigRef().getUInt("schema_inference_cache_max_elements_for_local", DEFAULT_SCHEMA_CACHE_ELEMENTS));
return schema_cache;
}
else
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported storage type: {}", storage_type_name);
}


@ -162,7 +162,7 @@ public:
ContextPtr local_context,
bool with_table_structure);
/// Storage type: s3, hdfs, azure.
/// Storage type: s3, hdfs, azure, local.
virtual std::string getTypeName() const = 0;
/// Engine name: S3, HDFS, Azure.
virtual std::string getEngineName() const = 0;


@ -417,10 +417,7 @@ std::future<StorageObjectStorageSource::ReaderHolder> StorageObjectStorageSource
}
std::unique_ptr<ReadBuffer> StorageObjectStorageSource::createReadBuffer(
const ObjectInfo & object_info,
const ObjectStoragePtr & object_storage,
const ContextPtr & context_,
const LoggerPtr & log)
const ObjectInfo & object_info, const ObjectStoragePtr & object_storage, const ContextPtr & context_, const LoggerPtr & log)
{
const auto & object_size = object_info.metadata->size_bytes;


@ -76,6 +76,21 @@ struct TableFunctionIcebergName
static constexpr auto name = "iceberg";
};
struct TableFunctionIcebergS3Name
{
static constexpr auto name = "icebergS3";
};
struct TableFunctionIcebergAzureName
{
static constexpr auto name = "icebergAzure";
};
struct TableFunctionIcebergLocalName
{
static constexpr auto name = "icebergLocal";
};
struct TableFunctionDeltaLakeName
{
static constexpr auto name = "deltaLake";
@ -86,14 +101,20 @@ struct TableFunctionHudiName
static constexpr auto name = "hudi";
};
#if USE_AWS_S3
#if USE_AVRO
# if USE_AWS_S3
using TableFunctionIceberg = ITableFunctionDataLake<TableFunctionIcebergName, StorageIceberg, TableFunctionS3>;
using TableFunctionIcebergS3 = ITableFunctionDataLake<TableFunctionIcebergS3Name, StorageIceberg, TableFunctionS3>;
# endif
# if USE_AZURE_BLOB_STORAGE
using TableFunctionIcebergAzure = ITableFunctionDataLake<TableFunctionIcebergAzureName, StorageIceberg, TableFunctionAzureBlob>;
# endif
using TableFunctionIcebergLocal = ITableFunctionDataLake<TableFunctionIcebergLocalName, StorageIceberg, TableFunctionLocal>;
#endif
#if USE_PARQUET
#if USE_AWS_S3
# if USE_PARQUET
using TableFunctionDeltaLake = ITableFunctionDataLake<TableFunctionDeltaLakeName, StorageDeltaLake, TableFunctionS3>;
#endif
using TableFunctionHudi = ITableFunctionDataLake<TableFunctionHudiName, StorageHudi, TableFunctionS3>;
#endif
}


@ -14,10 +14,11 @@
#include <Storages/ObjectStorage/Utils.h>
#include <Storages/NamedCollectionsHelpers.h>
#include <Storages/ObjectStorage/S3/Configuration.h>
#include <Storages/ObjectStorage/HDFS/Configuration.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/ObjectStorage/Azure/Configuration.h>
#include <Storages/ObjectStorage/HDFS/Configuration.h>
#include <Storages/ObjectStorage/Local/Configuration.h>
#include <Storages/ObjectStorage/S3/Configuration.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
namespace DB
@ -223,5 +224,5 @@ template class TableFunctionObjectStorage<OSSDefinition, StorageS3Configuration>
template class TableFunctionObjectStorage<HDFSDefinition, StorageHDFSConfiguration>;
template class TableFunctionObjectStorage<HDFSClusterDefinition, StorageHDFSConfiguration>;
#endif
template class TableFunctionObjectStorage<LocalDefinition, StorageLocalConfiguration>;
}


@ -1,11 +1,11 @@
#pragma once
#include "config.h"
#include <TableFunctions/ITableFunction.h>
#include <Formats/FormatFactory.h>
#include <Disks/ObjectStorages/IObjectStorage_fwd.h>
#include <Storages/VirtualColumnUtils.h>
#include <Formats/FormatFactory.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/VirtualColumnUtils.h>
#include <TableFunctions/ITableFunction.h>
#include "config.h"
namespace DB
{
@ -14,6 +14,7 @@ class Context;
class StorageS3Configuration;
class StorageAzureConfiguration;
class StorageHDFSConfiguration;
class StorageLocalConfiguration;
struct S3StorageSettings;
struct AzureStorageSettings;
struct HDFSStorageSettings;
@ -90,6 +91,17 @@ struct HDFSDefinition
static constexpr auto max_number_of_arguments = 4;
};
struct LocalDefinition
{
static constexpr auto name = "local";
static constexpr auto storage_type_name = "Local";
static constexpr auto signature = " - path\n"
" - path, format\n"
" - path, format, structure\n"
" - path, format, structure, compression_method\n";
static constexpr auto max_number_of_arguments = 4;
};
template <typename Definition, typename Configuration>
class TableFunctionObjectStorage : public ITableFunction
{
@ -169,4 +181,6 @@ using TableFunctionAzureBlob = TableFunctionObjectStorage<AzureDefinition, Stora
#if USE_HDFS
using TableFunctionHDFS = TableFunctionObjectStorage<HDFSDefinition, StorageHDFSConfiguration>;
#endif
using TableFunctionLocal = TableFunctionObjectStorage<LocalDefinition, StorageLocalConfiguration>;
}


@ -4,24 +4,43 @@
namespace DB
{
#if USE_AWS_S3
#if USE_AVRO
void registerTableFunctionIceberg(TableFunctionFactory & factory)
{
# if USE_AWS_S3
factory.registerFunction<TableFunctionIceberg>(
{
.documentation =
{
.description=R"(The table function can be used to read the Iceberg table stored on object store.)",
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored on S3 object store. Alias to icebergS3)",
.examples{{"iceberg", "SELECT * FROM iceberg(url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}
},
.allow_readonly = false
});
.categories{"DataLake"}},
.allow_readonly = false});
factory.registerFunction<TableFunctionIcebergS3>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored on S3 object store.)",
.examples{{"icebergS3", "SELECT * FROM icebergS3(url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
# endif
# if USE_AZURE_BLOB_STORAGE
factory.registerFunction<TableFunctionIcebergAzure>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored on Azure object store.)",
.examples{{"icebergAzure", "SELECT * FROM icebergAzure(url, access_key_id, secret_access_key)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
# endif
factory.registerFunction<TableFunctionIcebergLocal>(
{.documentation
= {.description = R"(The table function can be used to read the Iceberg table stored locally.)",
.examples{{"icebergLocal", "SELECT * FROM icebergLocal(filename)", ""}},
.categories{"DataLake"}},
.allow_readonly = false});
}
#endif
#if USE_PARQUET
#if USE_AWS_S3
# if USE_PARQUET
void registerTableFunctionDeltaLake(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionDeltaLake>(
@ -55,11 +74,11 @@ void registerTableFunctionHudi(TableFunctionFactory & factory)
void registerDataLakeTableFunctions(TableFunctionFactory & factory)
{
UNUSED(factory);
#if USE_AWS_S3
#if USE_AVRO
registerTableFunctionIceberg(factory);
#endif
#if USE_PARQUET
#if USE_AWS_S3
# if USE_PARQUET
registerTableFunctionDeltaLake(factory);
#endif
registerTableFunctionHudi(factory);


@ -2,30 +2,92 @@ from minio import Minio
import glob
import os
import json
import shutil
def upload_directory(minio_client, bucket_name, local_path, s3_path):
result_files = []
for local_file in glob.glob(local_path + "/**"):
if os.path.isfile(local_file):
from enum import Enum
class CloudUploader:
def upload_directory(self, local_path, remote_blob_path, **kwargs):
print(kwargs)
result_files = []
# print(f"Arguments: {local_path}, {s3_path}")
# for local_file in glob.glob(local_path + "/**"):
# print("Local file: {}", local_file)
for local_file in glob.glob(local_path + "/**"):
result_local_path = os.path.join(local_path, local_file)
result_s3_path = os.path.join(s3_path, local_file)
print(f"Putting file {result_local_path} to {result_s3_path}")
minio_client.fput_object(
bucket_name=bucket_name,
object_name=result_s3_path,
file_path=result_local_path,
result_remote_blob_path = os.path.join(remote_blob_path, local_file)
if os.path.isfile(local_file):
self.upload_file(result_local_path, result_remote_blob_path, **kwargs)
result_files.append(result_remote_blob_path)
else:
files = self.upload_directory(
result_local_path, result_remote_blob_path, **kwargs
)
result_files.extend(files)
return result_files
class S3Uploader(CloudUploader):
def __init__(self, minio_client, bucket_name):
self.minio_client = minio_client
self.bucket_name = bucket_name
def upload_file(self, local_path, remote_blob_path, bucket=None):
print(f"Upload to bucket: {bucket}")
if bucket is None:
bucket = self.bucket_name
self.minio_client.fput_object(
bucket_name=bucket,
object_name=remote_blob_path,
file_path=local_path,
)
class LocalUploader(CloudUploader):
def __init__(self, clickhouse_node):
self.clickhouse_node = clickhouse_node
def upload_file(self, local_path, remote_blob_path):
dir_path = os.path.dirname(remote_blob_path)
if dir_path != "":
self.clickhouse_node.exec_in_container(
[
"bash",
"-c",
"mkdir -p {}".format(dir_path),
]
)
result_files.append(result_s3_path)
self.clickhouse_node.copy_file_to_container(local_path, remote_blob_path)
class AzureUploader(CloudUploader):
def __init__(self, blob_service_client, container_name):
self.blob_service_client = blob_service_client
self.container_client = self.blob_service_client.get_container_client(
container_name
)
def upload_file(self, local_path, remote_blob_path, container_name=None):
if container_name is None:
container_client = self.container_client
else:
files = upload_directory(
minio_client,
bucket_name,
os.path.join(local_path, local_file),
os.path.join(s3_path, local_file),
container_client = self.blob_service_client.get_container_client(
container_name
)
result_files.extend(files)
return result_files
blob_client = container_client.get_blob_client(remote_blob_path)
with open(local_path, "rb") as data:
blob_client.upload_blob(data, overwrite=True)
def upload_directory(minio_client, bucket, local_path, remote_path):
return S3Uploader(minio_client=minio_client, bucket_name=bucket).upload_directory(
local_path, remote_path
)
def get_file_contents(minio_client, bucket, s3_path):


@ -5,5 +5,11 @@
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3>
<azure>
<account_name>devstoreaccount1</account_name>
<account_key>Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==</account_key>
</azure>
<local>
</local>
</named_collections>
</clickhouse>


@ -28,12 +28,15 @@ from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window
from pyspark.sql.readwriter import DataFrameWriter, DataFrameWriterV2
from minio.deleteobjects import DeleteObject
from azure.storage.blob import BlobServiceClient
from helpers.s3_tools import (
prepare_s3_bucket,
upload_directory,
get_file_contents,
list_s3_objects,
S3Uploader,
AzureUploader,
LocalUploader,
)
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
@ -67,6 +70,7 @@ def started_cluster():
main_configs=["configs/config.d/named_collections.xml"],
user_configs=["configs/users.d/users.xml"],
with_minio=True,
with_azurite=True,
stay_alive=True,
)
@ -77,6 +81,25 @@ def started_cluster():
logging.info("S3 bucket created")
cluster.spark_session = get_spark()
cluster.default_s3_uploader = S3Uploader(
cluster.minio_client, cluster.minio_bucket
)
cluster.azure_container_name = "mycontainer"
cluster.blob_service_client = cluster.blob_service_client
container_client = cluster.blob_service_client.create_container(
cluster.azure_container_name
)
cluster.container_client = container_client
cluster.default_azure_uploader = AzureUploader(
cluster.blob_service_client, cluster.azure_container_name
)
cluster.default_local_uploader = LocalUploader(cluster.instances["node1"])
yield cluster
@ -142,12 +165,65 @@ def generate_data(spark, start, end):
return df
def create_iceberg_table(node, table_name, format="Parquet", bucket="root"):
def get_creation_expression(
storage_type,
table_name,
cluster,
format="Parquet",
table_function=False,
**kwargs,
):
if storage_type == "s3":
if "bucket" in kwargs:
bucket = kwargs["bucket"]
else:
bucket = cluster.minio_bucket
print(bucket)
if table_function:
return f"icebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"
else:
return f"""
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=IcebergS3(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"""
elif storage_type == "azure":
if table_function:
return f"""
icebergAzure(azure, container = '{cluster.azure_container_name}', storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})
"""
else:
return f"""
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=IcebergAzure(azure, container = {cluster.azure_container_name}, storage_account_url = '{cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]}', blob_path = '/iceberg_data/default/{table_name}/', format={format})"""
elif storage_type == "local":
if table_function:
return f"""
icebergLocal(local, path = '/iceberg_data/default/{table_name}/', format={format})
"""
else:
return f"""
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=IcebergLocal(local, path = '/iceberg_data/default/{table_name}/', format={format});"""
else:
raise Exception(f"Unknown iceberg storage type: {storage_type}")
def get_uuid_str():
return str(uuid.uuid4()).replace("-", "_")
def create_iceberg_table(
storage_type,
node,
table_name,
cluster,
format="Parquet",
**kwargs,
):
node.query(
f"""
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name}
ENGINE=Iceberg(s3, filename = 'iceberg_data/default/{table_name}/', format={format}, url = 'http://minio1:9001/{bucket}/')"""
get_creation_expression(storage_type, table_name, cluster, format, **kwargs)
)
@ -170,40 +246,69 @@ def create_initial_data_file(
return result_path
def default_upload_directory(
started_cluster, storage_type, local_path, remote_path, **kwargs
):
if storage_type == "local":
return started_cluster.default_local_uploader.upload_directory(
local_path, remote_path, **kwargs
)
elif storage_type == "s3":
print(kwargs)
return started_cluster.default_s3_uploader.upload_directory(
local_path, remote_path, **kwargs
)
elif storage_type == "azure":
return started_cluster.default_azure_uploader.upload_directory(
local_path, remote_path, **kwargs
)
else:
raise Exception(f"Unknown iceberg storage type: {storage_type}")
@pytest.mark.parametrize("format_version", ["1", "2"])
def test_single_iceberg_file(started_cluster, format_version):
@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
def test_single_iceberg_file(started_cluster, format_version, storage_type):
instance = started_cluster.instances["node1"]
spark = started_cluster.spark_session
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
TABLE_NAME = "test_single_iceberg_file_" + format_version
inserted_data = "SELECT number, toString(number) as string FROM numbers(100)"
parquet_data_path = create_initial_data_file(
started_cluster, instance, inserted_data, TABLE_NAME
TABLE_NAME = (
"test_single_iceberg_file_"
+ format_version
+ "_"
+ storage_type
+ "_"
+ get_uuid_str()
)
write_iceberg_from_file(
spark, parquet_data_path, TABLE_NAME, format_version=format_version
write_iceberg_from_df(spark, generate_data(spark, 0, 100), TABLE_NAME)
files = default_upload_directory(
started_cluster,
storage_type,
f"/iceberg_data/default/{TABLE_NAME}/",
f"/iceberg_data/default/{TABLE_NAME}/",
)
files = upload_directory(
minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", ""
)
create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster)
create_iceberg_table(instance, TABLE_NAME)
assert instance.query(f"SELECT * FROM {TABLE_NAME}") == instance.query(
inserted_data
"SELECT number, toString(number + 1) FROM numbers(100)"
)
@pytest.mark.parametrize("format_version", ["1", "2"])
def test_partition_by(started_cluster, format_version):
@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
def test_partition_by(started_cluster, format_version, storage_type):
instance = started_cluster.instances["node1"]
spark = started_cluster.spark_session
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
TABLE_NAME = "test_partition_by_" + format_version
TABLE_NAME = (
"test_partition_by_"
+ format_version
+ "_"
+ storage_type
+ "_"
+ get_uuid_str()
)
write_iceberg_from_df(
spark,
@ -214,22 +319,33 @@ def test_partition_by(started_cluster, format_version):
partition_by="a",
)
files = upload_directory(
minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", ""
files = default_upload_directory(
started_cluster,
storage_type,
f"/iceberg_data/default/{TABLE_NAME}/",
f"/iceberg_data/default/{TABLE_NAME}/",
)
assert len(files) == 14  # 10 partitions + 4 metadata files
create_iceberg_table(instance, TABLE_NAME)
create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster)
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 10
@pytest.mark.parametrize("format_version", ["1", "2"])
def test_multiple_iceberg_files(started_cluster, format_version):
@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
def test_multiple_iceberg_files(started_cluster, format_version, storage_type):
instance = started_cluster.instances["node1"]
spark = started_cluster.spark_session
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
TABLE_NAME = "test_multiple_iceberg_files_" + format_version
TABLE_NAME = (
"test_multiple_iceberg_files_"
+ format_version
+ "_"
+ storage_type
+ "_"
+ get_uuid_str()
)
write_iceberg_from_df(
spark,
@ -239,9 +355,13 @@ def test_multiple_iceberg_files(started_cluster, format_version):
format_version=format_version,
)
files = upload_directory(
minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}", ""
files = default_upload_directory(
started_cluster,
storage_type,
f"/iceberg_data/default/{TABLE_NAME}/",
f"/iceberg_data/default/{TABLE_NAME}/",
)
# ['/iceberg_data/default/test_multiple_iceberg_files/data/00000-1-35302d56-f1ed-494e-a85b-fbf85c05ab39-00001.parquet',
# '/iceberg_data/default/test_multiple_iceberg_files/metadata/version-hint.text',
# '/iceberg_data/default/test_multiple_iceberg_files/metadata/3127466b-299d-48ca-a367-6b9b1df1e78c-m0.avro',
@ -249,7 +369,7 @@ def test_multiple_iceberg_files(started_cluster, format_version):
# '/iceberg_data/default/test_multiple_iceberg_files/metadata/v1.metadata.json']
assert len(files) == 5
create_iceberg_table(instance, TABLE_NAME)
create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster)
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100
write_iceberg_from_df(
@ -259,8 +379,11 @@ def test_multiple_iceberg_files(started_cluster, format_version):
mode="append",
format_version=format_version,
)
files = upload_directory(
minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}", ""
files = default_upload_directory(
started_cluster,
storage_type,
f"/iceberg_data/default/{TABLE_NAME}/",
"",
)
assert len(files) == 9
@ -271,12 +394,13 @@ def test_multiple_iceberg_files(started_cluster, format_version):
@pytest.mark.parametrize("format_version", ["1", "2"])
def test_types(started_cluster, format_version):
@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
def test_types(started_cluster, format_version, storage_type):
instance = started_cluster.instances["node1"]
spark = started_cluster.spark_session
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
TABLE_NAME = "test_types_" + format_version
TABLE_NAME = (
"test_types_" + format_version + "_" + storage_type + "_" + get_uuid_str()
)
data = [
(
@ -302,22 +426,29 @@ def test_types(started_cluster, format_version):
spark, df, TABLE_NAME, mode="overwrite", format_version=format_version
)
upload_directory(minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}", "")
default_upload_directory(
started_cluster,
storage_type,
f"/iceberg_data/default/{TABLE_NAME}/",
f"/iceberg_data/default/{TABLE_NAME}/",
)
create_iceberg_table(instance, TABLE_NAME)
create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster)
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 1
assert (
instance.query(f"SELECT a, b, c, d, e FROM {TABLE_NAME}").strip()
== "123\tstring\t2000-01-01\t['str1','str2']\ttrue"
)
table_function = f"iceberg(s3, filename='iceberg_data/default/{TABLE_NAME}/')"
table_function_expr = get_creation_expression(
storage_type, TABLE_NAME, started_cluster, table_function=True
)
assert (
instance.query(f"SELECT a, b, c, d, e FROM {table_function}").strip()
instance.query(f"SELECT a, b, c, d, e FROM {table_function_expr}").strip()
== "123\tstring\t2000-01-01\t['str1','str2']\ttrue"
)
assert instance.query(f"DESCRIBE {table_function} FORMAT TSV") == TSV(
assert instance.query(f"DESCRIBE {table_function_expr} FORMAT TSV") == TSV(
[
["a", "Nullable(Int32)"],
["b", "Nullable(String)"],
@ -329,12 +460,20 @@ def test_types(started_cluster, format_version):
@pytest.mark.parametrize("format_version", ["1", "2"])
def test_delete_files(started_cluster, format_version):
@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
def test_delete_files(started_cluster, format_version, storage_type):
instance = started_cluster.instances["node1"]
spark = started_cluster.spark_session
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
TABLE_NAME = "test_delete_files_" + format_version
TABLE_NAME = (
"test_delete_files_"
+ format_version
+ "_"
+ storage_type
+ "_"
+ get_uuid_str()
)
write_iceberg_from_df(
spark,
@ -344,17 +483,22 @@ def test_delete_files(started_cluster, format_version):
format_version=format_version,
)
files = upload_directory(
minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", ""
files = default_upload_directory(
started_cluster,
storage_type,
f"/iceberg_data/default/{TABLE_NAME}/",
f"/iceberg_data/default/{TABLE_NAME}/",
)
create_iceberg_table(instance, TABLE_NAME)
create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster)
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100
spark.sql(f"DELETE FROM {TABLE_NAME} WHERE a >= 0")
files = upload_directory(
minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", ""
files = default_upload_directory(
started_cluster,
storage_type,
f"/iceberg_data/default/{TABLE_NAME}/",
"",
)
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 0
@ -368,27 +512,41 @@ def test_delete_files(started_cluster, format_version):
format_version=format_version,
)
files = upload_directory(
minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", ""
files = default_upload_directory(
started_cluster,
storage_type,
f"/iceberg_data/default/{TABLE_NAME}/",
"",
)
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100
spark.sql(f"DELETE FROM {TABLE_NAME} WHERE a >= 150")
files = upload_directory(
minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", ""
files = default_upload_directory(
started_cluster,
storage_type,
f"/iceberg_data/default/{TABLE_NAME}/",
"",
)
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 50
@pytest.mark.parametrize("format_version", ["1", "2"])
def test_evolved_schema(started_cluster, format_version):
@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
def test_evolved_schema(started_cluster, format_version, storage_type):
instance = started_cluster.instances["node1"]
spark = started_cluster.spark_session
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
TABLE_NAME = "test_evolved_schema_" + format_version
TABLE_NAME = (
"test_evolved_schema_"
+ format_version
+ "_"
+ storage_type
+ "_"
+ get_uuid_str()
)
write_iceberg_from_df(
spark,
@ -398,19 +556,25 @@ def test_evolved_schema(started_cluster, format_version):
format_version=format_version,
)
files = upload_directory(
minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", ""
files = default_upload_directory(
started_cluster,
storage_type,
f"/iceberg_data/default/{TABLE_NAME}/",
f"/iceberg_data/default/{TABLE_NAME}/",
)
create_iceberg_table(instance, TABLE_NAME)
create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster)
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100
expected_data = instance.query(f"SELECT * FROM {TABLE_NAME} order by a, b")
spark.sql(f"ALTER TABLE {TABLE_NAME} ADD COLUMNS (x bigint)")
files = upload_directory(
minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", ""
files = default_upload_directory(
started_cluster,
storage_type,
f"/iceberg_data/default/{TABLE_NAME}/",
"",
)
error = instance.query_and_get_error(f"SELECT * FROM {TABLE_NAME}")
@ -422,12 +586,13 @@ def test_evolved_schema(started_cluster, format_version):
assert data == expected_data
def test_row_based_deletes(started_cluster):
@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
def test_row_based_deletes(started_cluster, storage_type):
instance = started_cluster.instances["node1"]
spark = started_cluster.spark_session
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
TABLE_NAME = "test_row_based_deletes"
TABLE_NAME = "test_row_based_deletes_" + storage_type + "_" + get_uuid_str()
spark.sql(
f"CREATE TABLE {TABLE_NAME} (id bigint, data string) USING iceberg TBLPROPERTIES ('format-version' = '2', 'write.update.mode'='merge-on-read', 'write.delete.mode'='merge-on-read', 'write.merge.mode'='merge-on-read')"
@ -436,17 +601,23 @@ def test_row_based_deletes(started_cluster):
f"INSERT INTO {TABLE_NAME} select id, char(id + ascii('a')) from range(100)"
)
files = upload_directory(
minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", ""
files = default_upload_directory(
started_cluster,
storage_type,
f"/iceberg_data/default/{TABLE_NAME}/",
f"/iceberg_data/default/{TABLE_NAME}/",
)
create_iceberg_table(instance, TABLE_NAME)
create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster)
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100
spark.sql(f"DELETE FROM {TABLE_NAME} WHERE id < 10")
files = upload_directory(
minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", ""
files = default_upload_directory(
started_cluster,
storage_type,
f"/iceberg_data/default/{TABLE_NAME}/",
"",
)
error = instance.query_and_get_error(f"SELECT * FROM {TABLE_NAME}")
@ -454,13 +625,21 @@ def test_row_based_deletes(started_cluster):
@pytest.mark.parametrize("format_version", ["1", "2"])
def test_schema_inference(started_cluster, format_version):
@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
def test_schema_inference(started_cluster, format_version, storage_type):
instance = started_cluster.instances["node1"]
spark = started_cluster.spark_session
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
for format in ["Parquet", "ORC", "Avro"]:
TABLE_NAME = "test_schema_inference_" + format + "_" + format_version
TABLE_NAME = (
"test_schema_inference_"
+ format
+ "_"
+ format_version
+ "_"
+ storage_type
+ "_"
+ get_uuid_str()
)
# Types time, timestamptz, fixed are not supported in Spark.
spark.sql(
@ -470,12 +649,16 @@ def test_schema_inference(started_cluster, format_version):
spark.sql(
f"insert into {TABLE_NAME} select 42, 4242, 42.42, 4242.4242, decimal(42.42), decimal(42.42), decimal(42.42), date('2020-01-01'), timestamp('2020-01-01 20:00:00'), 'hello', binary('hello'), array(1,2,3), map('key', 'value'), struct(42, 'hello'), array(struct(map('key', array(map('key', 42))), struct(42, 'hello')))"
)
files = upload_directory(
minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", ""
files = default_upload_directory(
started_cluster,
storage_type,
f"/iceberg_data/default/{TABLE_NAME}/",
f"/iceberg_data/default/{TABLE_NAME}/",
)
create_iceberg_table(instance, TABLE_NAME, format)
create_iceberg_table(
storage_type, instance, TABLE_NAME, started_cluster, format=format
)
res = instance.query(
f"DESC {TABLE_NAME} FORMAT TSVRaw", settings={"print_pretty_type_names": 0}
@ -510,12 +693,18 @@ def test_schema_inference(started_cluster, format_version):
@pytest.mark.parametrize("format_version", ["1", "2"])
def test_metadata_file_selection(started_cluster, format_version):
@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
def test_metadata_file_selection(started_cluster, format_version, storage_type):
instance = started_cluster.instances["node1"]
spark = started_cluster.spark_session
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
TABLE_NAME = "test_metadata_selection_" + format_version
TABLE_NAME = (
"test_metadata_selection_"
+ format_version
+ "_"
+ storage_type
+ "_"
+ get_uuid_str()
)
spark.sql(
f"CREATE TABLE {TABLE_NAME} (id bigint, data string) USING iceberg TBLPROPERTIES ('format-version' = '2', 'write.update.mode'='merge-on-read', 'write.delete.mode'='merge-on-read', 'write.merge.mode'='merge-on-read')"
@ -526,22 +715,31 @@ def test_metadata_file_selection(started_cluster, format_version):
f"INSERT INTO {TABLE_NAME} select id, char(id + ascii('a')) from range(10)"
)
files = upload_directory(
minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", ""
files = default_upload_directory(
started_cluster,
storage_type,
f"/iceberg_data/default/{TABLE_NAME}/",
f"/iceberg_data/default/{TABLE_NAME}/",
)
create_iceberg_table(instance, TABLE_NAME)
create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster)
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 500
@pytest.mark.parametrize("format_version", ["1", "2"])
def test_metadata_file_format_with_uuid(started_cluster, format_version):
@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
def test_metadata_file_format_with_uuid(started_cluster, format_version, storage_type):
instance = started_cluster.instances["node1"]
spark = started_cluster.spark_session
minio_client = started_cluster.minio_client
bucket = started_cluster.minio_bucket
TABLE_NAME = "test_metadata_selection_with_uuid_" + format_version
TABLE_NAME = (
"test_metadata_selection_with_uuid_"
+ format_version
+ "_"
+ storage_type
+ "_"
+ get_uuid_str()
)
spark.sql(
f"CREATE TABLE {TABLE_NAME} (id bigint, data string) USING iceberg TBLPROPERTIES ('format-version' = '2', 'write.update.mode'='merge-on-read', 'write.delete.mode'='merge-on-read', 'write.merge.mode'='merge-on-read')"
@ -555,40 +753,48 @@ def test_metadata_file_format_with_uuid(started_cluster, format_version):
for i in range(50):
os.rename(
f"/iceberg_data/default/{TABLE_NAME}/metadata/v{i + 1}.metadata.json",
f"/iceberg_data/default/{TABLE_NAME}/metadata/{str(i).zfill(5)}-{uuid.uuid4()}.metadata.json",
f"/iceberg_data/default/{TABLE_NAME}/metadata/{str(i).zfill(5)}-{get_uuid_str()}.metadata.json",
)
files = upload_directory(
minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", ""
files = default_upload_directory(
started_cluster,
storage_type,
f"/iceberg_data/default/{TABLE_NAME}/",
f"/iceberg_data/default/{TABLE_NAME}/",
)
create_iceberg_table(instance, TABLE_NAME)
create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster)
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 500
def test_restart_broken(started_cluster):
def test_restart_broken_s3(started_cluster):
instance = started_cluster.instances["node1"]
spark = started_cluster.spark_session
TABLE_NAME = "test_restart_broken_table_function_s3" + "_" + get_uuid_str()
minio_client = started_cluster.minio_client
bucket = "broken2"
TABLE_NAME = "test_restart_broken_table_function"
if not minio_client.bucket_exists(bucket):
minio_client.make_bucket(bucket)
parquet_data_path = create_initial_data_file(
started_cluster,
instance,
"SELECT number, toString(number) FROM numbers(100)",
write_iceberg_from_df(
spark,
generate_data(spark, 0, 100),
TABLE_NAME,
mode="overwrite",
format_version="1",
)
write_iceberg_from_file(spark, parquet_data_path, TABLE_NAME, format_version="1")
files = upload_directory(
minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", ""
files = default_upload_directory(
started_cluster,
"s3",
f"/iceberg_data/default/{TABLE_NAME}/",
f"/iceberg_data/default/{TABLE_NAME}/",
bucket=bucket,
)
create_iceberg_table(instance, TABLE_NAME, bucket=bucket)
create_iceberg_table("s3", instance, TABLE_NAME, started_cluster, bucket=bucket)
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100
s3_objects = list_s3_objects(minio_client, bucket, prefix="")
@ -613,8 +819,12 @@ def test_restart_broken(started_cluster):
minio_client.make_bucket(bucket)
files = upload_directory(
minio_client, bucket, f"/iceberg_data/default/{TABLE_NAME}/", ""
files = default_upload_directory(
started_cluster,
"s3",
f"/iceberg_data/default/{TABLE_NAME}/",
f"/iceberg_data/default/{TABLE_NAME}/",
bucket=bucket,
)
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100