Merge pull request #69445 from ClickHouse/divanik/add_schema_evolution_concise

Make simple schema evolution for Iceberg data format
Daniil Ivanik 2024-12-05 10:22:55 +00:00 committed by GitHub
commit 9f425ebe73
20 changed files with 1660 additions and 307 deletions


@ -6,6 +6,14 @@ sidebar_label: Iceberg
# Iceberg Table Engine
:::warning
We recommend using the [Iceberg Table Function](/docs/en/sql-reference/table-functions/iceberg.md) for working with Iceberg data in ClickHouse. The Iceberg Table Function currently provides sufficient functionality, offering a partial read-only interface for Iceberg tables.
The Iceberg Table Engine is available but may have limitations. ClickHouse wasn't originally designed to support tables with externally changing schemas, which can affect the functionality of the Iceberg Table Engine. As a result, some features that work with regular tables may be unavailable or may not function correctly, especially when using the old analyzer.
For optimal compatibility, we suggest using the Iceberg Table Function while we continue to improve support for the Iceberg Table Engine.
:::
This engine provides a read-only integration with existing Apache [Iceberg](https://iceberg.apache.org/) tables in Amazon S3, Azure, and HDFS, as well as with locally stored tables.
## Create Table
@ -63,6 +71,16 @@ CREATE TABLE iceberg_table ENGINE=IcebergS3(iceberg_conf, filename = 'test_table
Table engine `Iceberg` is currently an alias for `IcebergS3`.
**Schema Evolution**
At the moment, ClickHouse can read Iceberg tables whose schema has changed over time. We currently support reading tables where columns have been added or removed and where the column order has changed. A column that requires a value can also be changed into one that allows NULL. Additionally, the permitted type casts for simple types are supported, namely:
* int -> long
* float -> double
* decimal(P, S) -> decimal(P', S) where P' > P.
Currently, it is not possible to change nested structures or the types of elements within arrays and maps.
To read a table whose schema has changed after it was created, with dynamic schema inference, set `allow_dynamic_metadata_for_data_lakes = true` when creating the table.
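For example, assuming the named collection `iceberg_conf` from the example above, a table that picks up external schema changes could be created as follows (a minimal sketch; adapt the connection arguments to your setup):

```sql
CREATE TABLE iceberg_table
    ENGINE = IcebergS3(iceberg_conf, filename = 'test_table')
    SETTINGS allow_dynamic_metadata_for_data_lakes = true;
```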
### Data cache {#data-cache}
`Iceberg` table engine and table function support data caching in the same way as the `S3`, `AzureBlobStorage`, and `HDFS` storages. See [here](../../../engines/table-engines/integrations/s3.md#data-cache).


@ -65,6 +65,14 @@ SELECT * FROM icebergS3(iceberg_conf, filename = 'test_table')
DESCRIBE icebergS3(iceberg_conf, filename = 'test_table')
```
**Schema Evolution**
At the moment, ClickHouse can read Iceberg tables whose schema has changed over time. We currently support reading tables where columns have been added or removed and where the column order has changed. A column that requires a value can also be changed into one that allows NULL. Additionally, the permitted type casts for simple types are supported, namely:
* int -> long
* float -> double
* decimal(P, S) -> decimal(P', S) where P' > P.
Currently, it is not possible to change nested structures or the types of elements within arrays and maps.
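No special syntax is needed on the read side: data files written with an older schema are converted to the current table schema on the fly. An illustrative query, reusing the named collection `iceberg_conf` from the example above:

```sql
SELECT * FROM icebergS3(iceberg_conf, filename = 'test_table');
```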
**Aliases**
Table function `iceberg` is currently an alias for `icebergS3`.


@ -443,6 +443,11 @@ std::shared_ptr<TableNode> IdentifierResolver::tryResolveTableIdentifierFromData
if (!storage)
return {};
if (storage->hasExternalDynamicMetadata())
{
storage->updateExternalDynamicMetadata(context);
}
if (!storage_lock)
storage_lock = storage->lockForShare(context->getInitialQueryId(), context->getSettingsRef()[Setting::lock_acquire_timeout]);
auto storage_snapshot = storage->getStorageSnapshot(storage->getInMemoryMetadataPtr(), context);


@ -5559,13 +5559,6 @@ Only available in ClickHouse Cloud. Exclude new data parts from SELECT queries u
)", 0) \
DECLARE(Int64, prefer_warmed_unmerged_parts_seconds, 0, R"(
Only available in ClickHouse Cloud. If a merged part is less than this many seconds old and is not pre-warmed (see cache_populated_by_fetch), but all its source parts are available and pre-warmed, SELECT queries will read from those parts instead. Only for ReplicatedMergeTree. Note that this only checks whether CacheWarmer processed the part; if the part was fetched into cache by something else, it'll still be considered cold until CacheWarmer gets to it; if it was warmed, then evicted from cache, it'll still be considered warm.
)", 0) \
DECLARE(Bool, iceberg_engine_ignore_schema_evolution, false, R"(
Allow to ignore schema evolution in Iceberg table engine and read all data using schema specified by the user on table creation or latest schema parsed from metadata on table creation.
:::note
Enabling this setting can lead to incorrect result as in case of evolved schema all data files will be read using the same schema.
:::
)", 0) \
DECLARE(Bool, allow_deprecated_error_prone_window_functions, false, R"(
Allow usage of deprecated error prone window functions (neighbor, runningAccumulate, runningDifferenceStartingWithFirstValue, runningDifference)
@ -5999,6 +5992,7 @@ Experimental data deduplication for SELECT queries based on part UUIDs
MAKE_OBSOLETE(M, Bool, optimize_monotonous_functions_in_order_by, false) \
MAKE_OBSOLETE(M, UInt64, http_max_chunk_size, 100_GiB) \
MAKE_OBSOLETE(M, Bool, enable_deflate_qpl_codec, false) \
MAKE_OBSOLETE(M, Bool, iceberg_engine_ignore_schema_evolution, false) \
/** The section above is for obsolete settings. Do not add anything there. */
#endif /// __CLION_IDE__


@ -180,6 +180,11 @@ void InterpreterDescribeQuery::fillColumnsFromTable(const ASTTableExpression & t
auto table_id = getContext()->resolveStorageID(table_expression.database_and_table_name);
getContext()->checkAccess(AccessType::SHOW_COLUMNS, table_id);
auto table = DatabaseCatalog::instance().getTable(table_id, getContext());
if (table->hasExternalDynamicMetadata())
{
table->updateExternalDynamicMetadata(getContext());
}
auto table_lock = table->lockForShare(getContext()->getInitialQueryId(), settings[Setting::lock_acquire_timeout]);
auto metadata_snapshot = table->getInMemoryMetadataPtr();


@ -556,8 +556,14 @@ InterpreterSelectQuery::InterpreterSelectQuery(
if (storage)
{
if (storage->hasExternalDynamicMetadata())
{
storage->updateExternalDynamicMetadata(context);
metadata_snapshot = storage->getInMemoryMetadataPtr();
}
table_lock = storage->lockForShare(context->getInitialQueryId(), context->getSettingsRef()[Setting::lock_acquire_timeout]);
table_id = storage->getStorageID();
if (!metadata_snapshot)
metadata_snapshot = storage->getInMemoryMetadataPtr();


@ -121,6 +121,9 @@ public:
/// Returns true if the storage is dictionary
virtual bool isDictionary() const { return false; }
/// Returns true if the metadata of a table can be changed normally by other processes
virtual bool hasExternalDynamicMetadata() const { return false; }
/// Returns true if the storage supports queries with the SAMPLE section.
virtual bool supportsSampling() const { return getInMemoryMetadataPtr()->hasSamplingKey(); }
@ -503,6 +506,12 @@ public:
*/
virtual void alter(const AlterCommands & params, ContextPtr context, AlterLockHolder & alter_lock_holder);
/// Updates metadata that can be changed by other processes
virtual void updateExternalDynamicMetadata(ContextPtr)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method updateExternalDynamicMetadata is not supported by storage {}", getName());
}
/** Checks that alter commands can be applied to storage. For example, columns can be modified,
* or primary key can be changes, etc.
*/


@ -12,13 +12,23 @@
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/StorageFactory.h>
#include <Common/logger_useful.h>
#include "Storages/ColumnsDescription.h"
#include <memory>
#include <string>
#include <unordered_map>
#include <Common/ErrorCodes.h>
namespace DB
{
namespace ErrorCodes
{
extern const int FORMAT_VERSION_TOO_OLD;
}
template <typename T>
concept StorageConfiguration = std::derived_from<T, StorageObjectStorage::Configuration>;
@ -36,13 +46,23 @@ public:
{
BaseStorageConfiguration::update(object_storage, local_context);
auto new_metadata = DataLakeMetadata::create(object_storage, weak_from_this(), local_context);
if (current_metadata && *current_metadata == *new_metadata)
return;
if (!current_metadata || (*current_metadata != *new_metadata))
{
if (hasExternalDynamicMetadata())
{
throw Exception(
ErrorCodes::FORMAT_VERSION_TOO_OLD,
"Metadata is not consinsent with the one which was used to infer table schema. Please, retry the query.");
}
else
{
current_metadata = std::move(new_metadata);
BaseStorageConfiguration::setPaths(current_metadata->getDataFiles());
BaseStorageConfiguration::setPartitionColumns(current_metadata->getPartitionColumns());
}
}
}
std::optional<ColumnsDescription> tryGetTableStructureFromMetadata() const override
{
@ -56,6 +76,41 @@ public:
return std::nullopt;
}
std::shared_ptr<NamesAndTypesList> getInitialSchemaByPath(const String & data_path) const override
{
if (!current_metadata)
return {};
return current_metadata->getInitialSchemaByPath(data_path);
}
std::shared_ptr<const ActionsDAG> getSchemaTransformer(const String & data_path) const override
{
if (!current_metadata)
return {};
return current_metadata->getSchemaTransformer(data_path);
}
bool hasExternalDynamicMetadata() override
{
return StorageObjectStorage::Configuration::allow_dynamic_metadata_for_data_lakes && current_metadata
&& current_metadata->supportsExternalMetadataChange();
}
ColumnsDescription updateAndGetCurrentSchema(ObjectStoragePtr object_storage, ContextPtr context) override
{
BaseStorageConfiguration::update(object_storage, context);
auto new_metadata = DataLakeMetadata::create(object_storage, weak_from_this(), context);
if (!current_metadata || (*current_metadata != *new_metadata))
{
current_metadata = std::move(new_metadata);
BaseStorageConfiguration::setPaths(current_metadata->getDataFiles());
BaseStorageConfiguration::setPartitionColumns(current_metadata->getPartitionColumns());
}
return ColumnsDescription{current_metadata->getTableSchema()};
}
private:
DataLakeMetadataPtr current_metadata;


@ -1,12 +1,12 @@
#pragma once
#include <boost/noncopyable.hpp>
#include <Core/Types.h>
#include <Core/NamesAndTypes.h>
#include <Core/Types.h>
#include <boost/noncopyable.hpp>
#include "Interpreters/ActionsDAG.h"
#include "PartitionColumns.h"
namespace DB
{
class IDataLakeMetadata : boost::noncopyable
{
public:
@ -16,6 +16,9 @@ public:
virtual bool operator==(const IDataLakeMetadata & other) const = 0;
virtual const DataLakePartitionColumns & getPartitionColumns() const = 0;
virtual const std::unordered_map<String, String> & getColumnNameToPhysicalNameMapping() const = 0;
virtual std::shared_ptr<NamesAndTypesList> getInitialSchemaByPath(const String &) const { return {}; }
virtual std::shared_ptr<const ActionsDAG> getSchemaTransformer(const String &) const { return {}; }
virtual bool supportsExternalMetadataChange() const { return false; }
};
using DataLakeMetadataPtr = std::unique_ptr<IDataLakeMetadata>;


@ -1,3 +1,4 @@
#include <mutex>
#include "config.h"
#if USE_AVRO
@ -33,41 +34,47 @@
#include <Poco/JSON/Parser.h>
#include <filesystem>
#include <sstream>
namespace DB
{
namespace Setting
{
extern const SettingsBool iceberg_engine_ignore_schema_evolution;
extern const SettingsBool allow_data_lake_dynamic_schema;
}
namespace ErrorCodes
{
extern const int FILE_DOESNT_EXIST;
extern const int ILLEGAL_COLUMN;
extern const int BAD_ARGUMENTS;
extern const int UNSUPPORTED_METHOD;
extern const int FILE_DOESNT_EXIST;
extern const int ILLEGAL_COLUMN;
extern const int BAD_ARGUMENTS;
extern const int UNSUPPORTED_METHOD;
extern const int LOGICAL_ERROR;
}
Int32 parseTableSchema(
const Poco::JSON::Object::Ptr & metadata_object, IcebergSchemaProcessor & schema_processor, LoggerPtr metadata_logger);
IcebergMetadata::IcebergMetadata(
ObjectStoragePtr object_storage_,
ConfigurationObserverPtr configuration_,
DB::ContextPtr context_,
const DB::ContextPtr & context_,
Int32 metadata_version_,
Int32 format_version_,
String manifest_list_file_,
Int32 current_schema_id_,
DB::NamesAndTypesList schema_)
const Poco::JSON::Object::Ptr & object)
: WithContext(context_)
, object_storage(object_storage_)
, configuration(configuration_)
, object_storage(std::move(object_storage_))
, configuration(std::move(configuration_))
, metadata_version(metadata_version_)
, format_version(format_version_)
, manifest_list_file(std::move(manifest_list_file_))
, current_schema_id(current_schema_id_)
, schema(std::move(schema_))
, schema_processor(IcebergSchemaProcessor())
, log(getLogger("IcebergMetadata"))
{
auto schema_id = parseTableSchema(object, schema_processor, log);
schema = *(schema_processor.getClickhouseTableSchemaById(schema_id));
current_schema_id = schema_id;
}
namespace
@ -87,60 +94,44 @@ enum class DataFileContent : uint8_t
EQUALITY_DELETES = 2,
};
/**
* Iceberg supports the next data types (see https://iceberg.apache.org/spec/#schemas-and-data-types):
* - Primitive types:
* - boolean
* - int
* - long
* - float
* - double
* - decimal(P, S)
* - date
* - time (time of day in microseconds since midnight)
* - timestamp (in microseconds since 1970-01-01)
* - timestamptz (timestamp with timezone, stores values in UTC timezone)
* - string
* - uuid
* - fixed(L) (fixed-length byte array of length L)
* - binary
* - Complex types:
* - struct(field1: Type1, field2: Type2, ...) (tuple of typed values)
* - list(nested_type)
* - map(Key, Value)
*
* Example of table schema in metadata:
* {
* "type" : "struct",
* "schema-id" : 0,
* "fields" : [
* {
* "id" : 1,
* "name" : "id",
* "required" : false,
* "type" : "long"
* },
* {
* "id" : 2,
* "name" : "array",
* "required" : false,
* "type" : {
* "type" : "list",
* "element-id" : 5,
* "element" : "int",
* "element-required" : false
* },
* {
* "id" : 3,
* "name" : "data",
* "required" : false,
* "type" : "binary"
* }
* }
*/
std::pair<size_t, size_t> parseDecimal(const String & type_name)
{
ReadBufferFromString buf(std::string_view(type_name.begin() + 8, type_name.end() - 1));
size_t precision;
size_t scale;
readIntText(precision, buf);
skipWhitespaceIfAny(buf);
assertChar(',', buf);
skipWhitespaceIfAny(buf);
tryReadIntText(scale, buf);
return {precision, scale};
}
bool operator==(const Poco::JSON::Object & first, const Poco::JSON::Object & second)
{
std::stringstream first_string_stream; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
std::stringstream second_string_stream; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
first.stringify(first_string_stream);
if (!first_string_stream)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "JSON Parsing failed");
}
second.stringify(second_string_stream);
if (!second_string_stream)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "JSON Parsing failed");
}
return first_string_stream.str() == second_string_stream.str();
}
bool operator!=(const Poco::JSON::Object & first, const Poco::JSON::Object & second)
{
return !(first == second);
}
}
DataTypePtr getSimpleTypeByName(const String & type_name)
DataTypePtr IcebergSchemaProcessor::getSimpleType(const String & type_name)
{
if (type_name == "boolean")
return DataTypeFactory::instance().get("Bool");
@ -154,8 +145,6 @@ DataTypePtr getSimpleTypeByName(const String & type_name)
return std::make_shared<DataTypeFloat64>();
if (type_name == "date")
return std::make_shared<DataTypeDate>();
/// Time type represents time of the day in microseconds since midnight.
/// We don't have similar type for it, let's use just Int64.
if (type_name == "time")
return std::make_shared<DataTypeInt64>();
if (type_name == "timestamp")
@ -178,22 +167,14 @@ DataTypePtr getSimpleTypeByName(const String & type_name)
if (type_name.starts_with("decimal(") && type_name.ends_with(')'))
{
ReadBufferFromString buf(std::string_view(type_name.begin() + 8, type_name.end() - 1));
size_t precision;
size_t scale;
readIntText(precision, buf);
skipWhitespaceIfAny(buf);
assertChar(',', buf);
skipWhitespaceIfAny(buf);
tryReadIntText(scale, buf);
auto [precision, scale] = parseDecimal(type_name);
return createDecimal<DataTypeDecimal>(precision, scale);
}
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown Iceberg type: {}", type_name);
}
DataTypePtr getFieldType(const Poco::JSON::Object::Ptr & field, const String & type_key, bool required);
DataTypePtr getComplexTypeFromObject(const Poco::JSON::Object::Ptr & type)
DataTypePtr IcebergSchemaProcessor::getComplexTypeFromObject(const Poco::JSON::Object::Ptr & type)
{
String type_name = type->getValue<String>("type");
if (type_name == "list")
@ -232,7 +213,7 @@ DataTypePtr getComplexTypeFromObject(const Poco::JSON::Object::Ptr & type)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown Iceberg type: {}", type_name);
}
DataTypePtr getFieldType(const Poco::JSON::Object::Ptr & field, const String & type_key, bool required)
DataTypePtr IcebergSchemaProcessor::getFieldType(const Poco::JSON::Object::Ptr & field, const String & type_key, bool required)
{
if (field->isObject(type_key))
return getComplexTypeFromObject(field->getObject(type_key));
@ -241,7 +222,7 @@ DataTypePtr getFieldType(const Poco::JSON::Object::Ptr & field, const String & t
if (type.isString())
{
const String & type_name = type.extract<String>();
auto data_type = getSimpleTypeByName(type_name);
auto data_type = getSimpleType(type_name);
return required ? data_type : makeNullable(data_type);
}
@ -249,8 +230,28 @@ DataTypePtr getFieldType(const Poco::JSON::Object::Ptr & field, const String & t
}
std::pair<Poco::JSON::Object::Ptr, Int32>
parseTableSchemaV2Method(const Poco::JSON::Object::Ptr & metadata_object, bool ignore_schema_evolution)
/**
* Iceberg allows only three types of primitive type conversion:
* int -> long
* float -> double
* decimal(P, S) -> decimal(P', S) where P' > P
* This function checks if `old_type` and `new_type` satisfy one of these conditions.
**/
bool IcebergSchemaProcessor::allowPrimitiveTypeConversion(const String & old_type, const String & new_type)
{
bool allowed_type_conversion = (old_type == new_type);
allowed_type_conversion |= (old_type == "int") && (new_type == "long");
allowed_type_conversion |= (old_type == "float") && (new_type == "double");
if (old_type.starts_with("decimal(") && old_type.ends_with(')') && new_type.starts_with("decimal(") && new_type.ends_with(")"))
{
auto [old_precision, old_scale] = parseDecimal(old_type);
auto [new_precision, new_scale] = parseDecimal(new_type);
allowed_type_conversion |= (old_precision <= new_precision) && (old_scale == new_scale);
}
return allowed_type_conversion;
}
std::pair<Poco::JSON::Object::Ptr, Int32> parseTableSchemaV2Method(const Poco::JSON::Object::Ptr & metadata_object)
{
Poco::JSON::Object::Ptr schema;
if (!metadata_object->has("current-schema-id"))
@ -261,11 +262,6 @@ parseTableSchemaV2Method(const Poco::JSON::Object::Ptr & metadata_object, bool i
auto schemas = metadata_object->get("schemas").extract<Poco::JSON::Array::Ptr>();
if (schemas->size() == 0)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse Iceberg table schema: schemas field is empty");
if (ignore_schema_evolution)
{
/// If we ignore schema evolution, we will just use latest schema for all data files.
/// Find schema with 'schema-id' equal to 'current_schema_id'.
for (uint32_t i = 0; i != schemas->size(); ++i)
{
auto current_schema = schemas->getObject(i);
@ -281,31 +277,13 @@ parseTableSchemaV2Method(const Poco::JSON::Object::Ptr & metadata_object, bool i
}
if (!schema)
throw Exception(
ErrorCodes::BAD_ARGUMENTS, R"(There is no schema with "schema-id" that matches "current-schema-id" in metadata)");
}
else
{
if (schemas->size() == 0)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse Iceberg table schema: schemas field is empty");
if (schemas->size() > 1)
throw Exception(
ErrorCodes::UNSUPPORTED_METHOD,
"Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is "
"supported. If you want to ignore schema evolution and read all files using latest schema saved on table creation, enable "
"setting "
"iceberg_engine_ignore_schema_evolution (Note: enabling this setting can lead to incorrect result)");
/// Now we sure that there is only one schema.
schema = schemas->getObject(0);
throw Exception(ErrorCodes::BAD_ARGUMENTS, R"(There is no schema with "schema-id" that matches "current-schema-id" in metadata)");
if (schema->getValue<int>("schema-id") != current_schema_id)
throw Exception(ErrorCodes::BAD_ARGUMENTS, R"(Field "schema-id" of the schema doesn't match "current-schema-id" in metadata)");
}
return {schema, current_schema_id};
}
std::pair<Poco::JSON::Object::Ptr, Int32>
parseTableSchemaV1Method(const Poco::JSON::Object::Ptr & metadata_object, bool ignore_schema_evolution)
std::pair<Poco::JSON::Object::Ptr, Int32> parseTableSchemaV1Method(const Poco::JSON::Object::Ptr & metadata_object)
{
if (!metadata_object->has("schema"))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse Iceberg table schema: 'schema' field is missing in metadata");
@ -313,36 +291,26 @@ parseTableSchemaV1Method(const Poco::JSON::Object::Ptr & metadata_object, bool i
if (!metadata_object->has("schema"))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse Iceberg table schema: 'schema-id' field is missing in schema");
auto current_schema_id = schema->getValue<int>("schema-id");
if (!ignore_schema_evolution && metadata_object->has("schemas")
&& metadata_object->get("schemas").extract<Poco::JSON::Array::Ptr>()->size() > 1)
throw Exception(
ErrorCodes::UNSUPPORTED_METHOD,
"Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not "
"supported. If you want to ignore schema evolution and read all files using latest schema saved on table creation, enable "
"setting "
"iceberg_engine_ignore_schema_evolution (Note: enabling this setting can lead to incorrect result)");
return {schema, current_schema_id};
}
std::pair<NamesAndTypesList, Int32> parseTableSchema(
const Poco::JSON::Object::Ptr & metadata_object, LoggerPtr metadata_logger, int format_version, bool ignore_schema_evolution)
Int32 parseTableSchema(
const Poco::JSON::Object::Ptr & metadata_object, IcebergSchemaProcessor & schema_processor, LoggerPtr metadata_logger)
{
Poco::JSON::Object::Ptr schema;
Int32 current_schema_id;
/// First check if schema was evolved, because we don't support it yet.
/// For version 2 we can check it by using field schemas, but in version 1
/// this field is optional and we will check it later during parsing manifest files
/// (we will compare schema id from manifest file and currently used schema).
Int32 format_version = metadata_object->getValue<Int32>("format-version");
if (format_version == 2)
{
std::tie(schema, current_schema_id) = parseTableSchemaV2Method(metadata_object, ignore_schema_evolution);
auto [schema, current_schema_id] = parseTableSchemaV2Method(metadata_object);
schema_processor.addIcebergTableSchema(schema);
return current_schema_id;
}
else
{
try
{
std::tie(schema, current_schema_id) = parseTableSchemaV1Method(metadata_object, ignore_schema_evolution);
auto [schema, current_schema_id] = parseTableSchemaV1Method(metadata_object);
schema_processor.addIcebergTableSchema(schema);
return current_schema_id;
}
catch (const Exception & first_error)
{
@ -350,12 +318,15 @@ std::pair<NamesAndTypesList, Int32> parseTableSchema(
throw;
try
{
std::tie(schema, current_schema_id) = parseTableSchemaV2Method(metadata_object, ignore_schema_evolution);
auto [schema, current_schema_id] = parseTableSchemaV2Method(metadata_object);
schema_processor.addIcebergTableSchema(schema);
LOG_WARNING(
metadata_logger,
"Iceberg table schema was parsed using v2 specification, but it was impossible to parse it using v1 specification. Be "
"Iceberg table schema was parsed using v2 specification, but it was impossible to parse it using v1 "
"specification. Be "
"aware that you Iceberg writing engine violates Iceberg specification. Error during parsing {}",
first_error.displayText());
return current_schema_id;
}
catch (const Exception & second_error)
{
@ -369,25 +340,174 @@ std::pair<NamesAndTypesList, Int32> parseTableSchema(
}
}
}
}
NamesAndTypesList names_and_types;
auto fields = schema->get("fields").extract<Poco::JSON::Array::Ptr>();
// Ids are passed only for error logging purposes
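// Builds an ActionsDAG that converts a block read with `old_schema` into the shape of `new_schema`:
// columns are matched by Iceberg field id and renamed or cast where the specification allows it,
// while columns that exist only in the new schema are materialized as constant default (NULL) values.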
std::shared_ptr<ActionsDAG> IcebergSchemaProcessor::getSchemaTransformationDag(
const Poco::JSON::Object::Ptr & old_schema, const Poco::JSON::Object::Ptr & new_schema, Int32 old_id, Int32 new_id)
{
std::unordered_map<size_t, std::pair<Poco::JSON::Object::Ptr, const ActionsDAG::Node *>> old_schema_entries;
auto old_schema_fields = old_schema->get("fields").extract<Poco::JSON::Array::Ptr>();
std::shared_ptr<ActionsDAG> dag = std::make_shared<ActionsDAG>();
auto & outputs = dag->getOutputs();
for (size_t i = 0; i != old_schema_fields->size(); ++i)
{
auto field = old_schema_fields->getObject(static_cast<UInt32>(i));
size_t id = field->getValue<size_t>("id");
auto name = field->getValue<String>("name");
bool required = field->getValue<bool>("required");
old_schema_entries[id] = {field, &dag->addInput(name, getFieldType(field, "type", required))};
}
auto new_schema_fields = new_schema->get("fields").extract<Poco::JSON::Array::Ptr>();
for (size_t i = 0; i != new_schema_fields->size(); ++i)
{
auto field = new_schema_fields->getObject(static_cast<UInt32>(i));
size_t id = field->getValue<size_t>("id");
auto name = field->getValue<String>("name");
bool required = field->getValue<bool>("required");
auto type = getFieldType(field, "type", required);
auto old_node_it = old_schema_entries.find(id);
if (old_node_it != old_schema_entries.end())
{
auto [old_json, old_node] = old_node_it->second;
if (field->isObject("type"))
{
if (*old_json != *field)
{
throw Exception(
ErrorCodes::UNSUPPORTED_METHOD,
"Schema evolution is not supported for complex types yet, field id is {}, old schema id is {}, new schema id "
"is {}",
id,
old_id,
new_id);
}
else
{
outputs.push_back(old_node);
}
}
else
{
if (old_json->isObject("type"))
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Can't cast primitive type to the complex type, field id is {}, old schema id is {}, new schema id is {}",
id,
old_id,
new_id);
}
String old_type = old_json->getValue<String>("type");
String new_type = field->getValue<String>("type");
const ActionsDAG::Node * node = old_node;
if (old_type == new_type)
{
if (old_json->getValue<String>("name") != name)
{
node = &dag->addAlias(*old_node, name);
}
}
else if (allowPrimitiveTypeConversion(old_type, new_type))
{
node = &dag->addCast(*old_node, getFieldType(field, "type", required), name);
}
outputs.push_back(node);
}
}
else
{
if (field->isObject("type"))
{
throw Exception(
ErrorCodes::UNSUPPORTED_METHOD,
"Adding a default column with id {} and complex type is not supported yet. Old schema id is {}, new schema id is "
"{}",
id,
old_id,
new_id);
}
if (!type->isNullable())
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot add a column with id {} with required values to the table during schema evolution. This is forbidden by "
"Iceberg format specification. Old schema id is {}, new "
"schema id is {}",
id,
old_id,
new_id);
}
ColumnPtr default_type_column = type->createColumnConstWithDefaultValue(0);
const auto & constant = dag->addColumn({default_type_column, type, name});
outputs.push_back(&constant);
}
}
return dag;
}
std::shared_ptr<const ActionsDAG> IcebergSchemaProcessor::getSchemaTransformationDagByIds(Int32 old_id, Int32 new_id)
{
if (old_id == new_id)
{
return nullptr;
}
std::lock_guard lock(mutex);
auto required_transform_dag_it = transform_dags_by_ids.find({old_id, new_id});
if (required_transform_dag_it != transform_dags_by_ids.end())
{
return required_transform_dag_it->second;
}
auto old_schema_it = iceberg_table_schemas_by_ids.find(old_id);
if (old_schema_it == iceberg_table_schemas_by_ids.end())
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Schema with schema-id {} is unknown", old_id);
}
auto new_schema_it = iceberg_table_schemas_by_ids.find(new_id);
if (new_schema_it == iceberg_table_schemas_by_ids.end())
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Schema with schema-id {} is unknown", new_id);
}
return transform_dags_by_ids[{old_id, new_id}]
= getSchemaTransformationDag(old_schema_it->second, new_schema_it->second, old_id, new_id);
}
void IcebergSchemaProcessor::addIcebergTableSchema(Poco::JSON::Object::Ptr schema_ptr)
{
Int32 schema_id = schema_ptr->getValue<Int32>("schema-id");
if (iceberg_table_schemas_by_ids.contains(schema_id))
{
chassert(clickhouse_table_schemas_by_ids.contains(schema_id));
chassert(*iceberg_table_schemas_by_ids.at(schema_id) == *schema_ptr);
}
else
{
iceberg_table_schemas_by_ids[schema_id] = schema_ptr;
auto fields = schema_ptr->get("fields").extract<Poco::JSON::Array::Ptr>();
auto clickhouse_schema = std::make_shared<NamesAndTypesList>();
for (size_t i = 0; i != fields->size(); ++i)
{
auto field = fields->getObject(static_cast<UInt32>(i));
auto name = field->getValue<String>("name");
bool required = field->getValue<bool>("required");
names_and_types.push_back({name, getFieldType(field, "type", required)});
clickhouse_schema->push_back(NameAndTypePair{name, getFieldType(field, "type", required)});
}
clickhouse_table_schemas_by_ids[schema_id] = clickhouse_schema;
}
return {std::move(names_and_types), current_schema_id};
}
MutableColumns parseAvro(
avro::DataFileReaderBase & file_reader,
const Block & header,
const FormatSettings & settings)
std::shared_ptr<NamesAndTypesList> IcebergSchemaProcessor::getClickhouseTableSchemaById(Int32 id)
{
auto it = clickhouse_table_schemas_by_ids.find(id);
if (it == clickhouse_table_schemas_by_ids.end())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Schema with id {} is unknown", id);
return it->second;
}
MutableColumns parseAvro(avro::DataFileReaderBase & file_reader, const Block & header, const FormatSettings & settings)
{
auto deserializer = std::make_unique<AvroDeserializer>(header, file_reader.dataSchema(), true, true, settings);
MutableColumns columns = header.cloneEmptyColumns();
@ -408,9 +528,8 @@ MutableColumns parseAvro(
* 1) v<V>.metadata.json, where V is the metadata version.
* 2) <V>-<random-uuid>.metadata.json, where V is the metadata version
*/
std::pair<Int32, String> getMetadataFileAndVersion(
ObjectStoragePtr object_storage,
const StorageObjectStorage::Configuration & configuration)
std::pair<Int32, String>
getMetadataFileAndVersion(const ObjectStoragePtr & object_storage, const StorageObjectStorage::Configuration & configuration)
{
const auto metadata_files = listFiles(*object_storage, configuration, "metadata", ".metadata.json");
if (metadata_files.empty())
@ -443,10 +562,9 @@ std::pair<Int32, String> getMetadataFileAndVersion(
return *std::max_element(metadata_files_with_versions.begin(), metadata_files_with_versions.end());
}
}
DataLakeMetadataPtr
IcebergMetadata::create(ObjectStoragePtr object_storage, ConfigurationObserverPtr configuration, ContextPtr local_context)
DataLakeMetadataPtr IcebergMetadata::create(
const ObjectStoragePtr & object_storage, const ConfigurationObserverPtr & configuration, const ContextPtr & local_context)
{
auto configuration_ptr = configuration.lock();
@ -465,14 +583,15 @@ IcebergMetadata::create(ObjectStoragePtr object_storage, ConfigurationObserverPt
Poco::Dynamic::Var json = parser.parse(json_str);
const Poco::JSON::Object::Ptr & object = json.extract<Poco::JSON::Object::Ptr>();
auto format_version = object->getValue<int>("format-version");
auto [schema, schema_id]
= parseTableSchema(object, log, format_version, local_context->getSettingsRef()[Setting::iceberg_engine_ignore_schema_evolution]);
IcebergSchemaProcessor schema_processor;
auto format_version = object->getValue<int>("format-version");
auto current_snapshot_id = object->getValue<Int64>("current-snapshot-id");
auto snapshots = object->get("snapshots").extract<Poco::JSON::Array::Ptr>();
String manifest_list_file;
auto current_snapshot_id = object->getValue<Int64>("current-snapshot-id");
for (size_t i = 0; i < snapshots->size(); ++i)
{
const auto snapshot = snapshots->getObject(static_cast<UInt32>(i));
@ -484,8 +603,11 @@ IcebergMetadata::create(ObjectStoragePtr object_storage, ConfigurationObserverPt
}
}
return std::make_unique<IcebergMetadata>(
object_storage, configuration_ptr, local_context, metadata_version, format_version, manifest_list_file, schema_id, schema);
auto ptr = std::make_unique<IcebergMetadata>(
object_storage, configuration_ptr, local_context, metadata_version, format_version, manifest_list_file, object);
return ptr;
}
/**
@ -513,12 +635,15 @@ IcebergMetadata::create(ObjectStoragePtr object_storage, ConfigurationObserverPt
* 1 2252246380142525104 ('/iceberg_data/db/table_name/data/a=2/00000-1-c9535a00-2f4f-405c-bcfa-6d4f9f477235-00003.parquet','PARQUET',(2),1,631,67108864,[(1,46),(2,48)],[(1,1),(2,1)],[(1,0),(2,0)],[],[(1,'\0\0\0\0\0\0\0'),(2,'3')],[(1,'\0\0\0\0\0\0\0'),(2,'3')],NULL,[4],0)
*
*/
Strings IcebergMetadata::getDataFiles() const
{
auto configuration_ptr = configuration.lock();
std::lock_guard lock(get_data_files_mutex);
if (!data_files.empty())
return data_files;
auto configuration_ptr = configuration.lock();
Strings manifest_files;
if (manifest_list_file.empty())
return data_files;
@ -551,7 +676,6 @@ Strings IcebergMetadata::getDataFiles() const
manifest_files.emplace_back(std::filesystem::path(configuration_ptr->getPath()) / "metadata" / filename);
}
NameSet files;
LOG_TEST(log, "Collect data files");
for (const auto & manifest_file : manifest_files)
{
@ -575,14 +699,7 @@ Strings IcebergMetadata::getDataFiles() const
Poco::JSON::Parser parser;
Poco::Dynamic::Var json = parser.parse(schema_json_string);
const Poco::JSON::Object::Ptr & schema_object = json.extract<Poco::JSON::Object::Ptr>();
if (!context->getSettingsRef()[Setting::iceberg_engine_ignore_schema_evolution]
&& schema_object->getValue<int>("schema-id") != current_schema_id)
throw Exception(
ErrorCodes::UNSUPPORTED_METHOD,
"Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not "
"supported. If you want to ignore schema evolution and read all files using latest schema saved on table creation, enable setting "
"iceberg_engine_ignore_schema_evolution (Note: enabling this setting can lead to incorrect result)");
Int32 schema_object_id = schema_object->getValue<int>("schema-id");
avro::NodePtr root_node = manifest_file_reader->dataSchema().root();
size_t leaves_num = root_node->leaves();
size_t expected_min_num = format_version == 1 ? 3 : 2;
@ -698,17 +815,22 @@ Strings IcebergMetadata::getDataFiles() const
if (ManifestEntryStatus(status) == ManifestEntryStatus::DELETED)
{
LOG_TEST(log, "Processing delete file for path: {}", file_path);
chassert(!files.contains(file_path));
chassert(schema_id_by_data_file.contains(file_path) == 0);
}
else
{
LOG_TEST(log, "Processing data file for path: {}", file_path);
files.insert(file_path);
}
schema_id_by_data_file[file_path] = schema_object_id;
}
}
data_files = std::vector<std::string>(files.begin(), files.end());
schema_processor.addIcebergTableSchema(schema_object);
}
for (const auto & [file_path, schema_object_id] : schema_id_by_data_file)
{
data_files.emplace_back(file_path);
}
return data_files;
}


@ -1,5 +1,9 @@
#pragma once
#include <memory>
#include <mutex>
#include <optional>
#include <unordered_map>
#include "config.h"
#if USE_AVRO /// StorageIceberg depending on Avro to parse metadata with Avro format.
@ -10,9 +14,98 @@
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h>
#include <Poco/JSON/Array.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
/**
* Iceberg supports the next data types (see https://iceberg.apache.org/spec/#schemas-and-data-types):
* - Primitive types:
* - boolean
* - int
* - long
* - float
* - double
* - decimal(P, S)
* - date
* - time (time of day in microseconds since midnight)
* - timestamp (in microseconds since 1970-01-01)
* - timestamptz (timestamp with timezone, stores values in UTC timezone)
* - string
* - uuid
* - fixed(L) (fixed-length byte array of length L)
* - binary
* - Complex types:
* - struct(field1: Type1, field2: Type2, ...) (tuple of typed values)
* - list(nested_type)
* - map(Key, Value)
*
* Example of table schema in metadata:
* {
* "type" : "struct",
* "schema-id" : 0,
* "fields" : [
* {
* "id" : 1,
* "name" : "id",
* "required" : false,
* "type" : "long"
* },
* {
* "id" : 2,
* "name" : "array",
* "required" : false,
* "type" : {
* "type" : "list",
* "element-id" : 5,
* "element" : "int",
* "element-required" : false
* }
* },
* {
* "id" : 3,
* "name" : "data",
* "required" : false,
* "type" : "binary"
* }
* ]
* }
*/
class IcebergSchemaProcessor
{
using Node = ActionsDAG::Node;
public:
void addIcebergTableSchema(Poco::JSON::Object::Ptr schema_ptr);
std::shared_ptr<NamesAndTypesList> getClickhouseTableSchemaById(Int32 id);
std::shared_ptr<const ActionsDAG> getSchemaTransformationDagByIds(Int32 old_id, Int32 new_id);
private:
std::unordered_map<Int32, Poco::JSON::Object::Ptr> iceberg_table_schemas_by_ids;
std::unordered_map<Int32, std::shared_ptr<NamesAndTypesList>> clickhouse_table_schemas_by_ids;
std::map<std::pair<Int32, Int32>, std::shared_ptr<ActionsDAG>> transform_dags_by_ids;
NamesAndTypesList getSchemaType(const Poco::JSON::Object::Ptr & schema);
DataTypePtr getComplexTypeFromObject(const Poco::JSON::Object::Ptr & type);
DataTypePtr getFieldType(const Poco::JSON::Object::Ptr & field, const String & type_key, bool required);
DataTypePtr getSimpleType(const String & type_name);
bool allowPrimitiveTypeConversion(const String & old_type, const String & new_type);
const Node * getDefaultNodeForField(const Poco::JSON::Object::Ptr & field);
std::shared_ptr<ActionsDAG> getSchemaTransformationDag(
const Poco::JSON::Object::Ptr & old_schema, const Poco::JSON::Object::Ptr & new_schema, Int32 old_id, Int32 new_id);
std::mutex mutex;
};
/**
* Useful links:
* - https://iceberg.apache.org/spec/
@ -70,12 +163,11 @@ public:
IcebergMetadata(
ObjectStoragePtr object_storage_,
ConfigurationObserverPtr configuration_,
ContextPtr context_,
const DB::ContextPtr & context_,
Int32 metadata_version_,
Int32 format_version_,
String manifest_list_file_,
Int32 current_schema_id_,
NamesAndTypesList schema_);
const Poco::JSON::Object::Ptr& object);
/// Get data files. On first request it reads manifest_list file and iterates through manifest files to find all data files.
/// All subsequent calls will return saved list of files (because it cannot be changed without changing metadata file)
@ -94,10 +186,29 @@ public:
return iceberg_metadata && getVersion() == iceberg_metadata->getVersion();
}
static DataLakeMetadataPtr create(ObjectStoragePtr object_storage, ConfigurationObserverPtr configuration, ContextPtr local_context);
static DataLakeMetadataPtr
create(const ObjectStoragePtr & object_storage, const ConfigurationObserverPtr & configuration, const ContextPtr & local_context);
size_t getVersion() const { return metadata_version; }
std::shared_ptr<NamesAndTypesList> getInitialSchemaByPath(const String & data_path) const override
{
auto version_if_outdated = getSchemaVersionByFileIfOutdated(data_path);
return version_if_outdated.has_value() ? schema_processor.getClickhouseTableSchemaById(version_if_outdated.value()) : nullptr;
}
std::shared_ptr<const ActionsDAG> getSchemaTransformer(const String & data_path) const override
{
auto version_if_outdated = getSchemaVersionByFileIfOutdated(data_path);
return version_if_outdated.has_value()
? schema_processor.getSchemaTransformationDagByIds(version_if_outdated.value(), current_schema_id)
: nullptr;
}
bool supportsExternalMetadataChange() const override { return true; }
private:
size_t getVersion() const { return metadata_version; }
mutable std::unordered_map<String, Int32> schema_id_by_data_file;
const ObjectStoragePtr object_storage;
const ConfigurationObserverPtr configuration;
@ -105,11 +216,26 @@ private:
Int32 format_version;
String manifest_list_file;
Int32 current_schema_id;
NamesAndTypesList schema;
mutable Strings data_files;
std::unordered_map<String, String> column_name_to_physical_name;
DataLakePartitionColumns partition_columns;
NamesAndTypesList schema;
mutable IcebergSchemaProcessor schema_processor;
LoggerPtr log;
mutable std::mutex get_data_files_mutex;
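/// Returns the id of the schema the given data file was written with if it differs from the
/// current schema id, or std::nullopt when the file already uses the current schema.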
std::optional<Int32> getSchemaVersionByFileIfOutdated(String data_path) const
{
auto schema_id = schema_id_by_data_file.find(data_path);
if (schema_id == schema_id_by_data_file.end())
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot find schema version for data file: {}", data_path);
}
if (schema_id->second == current_schema_id)
return std::nullopt;
return std::optional{schema_id->second};
}
};
}


@ -24,7 +24,9 @@
#include <Storages/VirtualColumnUtils.h>
#include "Databases/LoadingStrictnessLevel.h"
#include "Storages/ColumnsDescription.h"
#include "Storages/ObjectStorage/StorageObjectStorageSettings.h"
#include <Poco/Logger.h>
namespace DB
{
@ -42,6 +44,11 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
namespace StorageObjectStorageSetting
{
extern const StorageObjectStorageSettingsBool allow_dynamic_metadata_for_data_lakes;
}
String StorageObjectStorage::getPathSample(ContextPtr context)
{
auto query_settings = configuration->getQuerySettings(context);
@ -95,6 +102,9 @@ StorageObjectStorage::StorageObjectStorage(
{
try
{
if (configuration->hasExternalDynamicMetadata())
configuration->updateAndGetCurrentSchema(object_storage, context);
else
configuration->update(object_storage, context);
}
catch (...)
@ -152,6 +162,19 @@ void StorageObjectStorage::Configuration::update(ObjectStoragePtr object_storage
IObjectStorage::ApplyNewSettingsOptions options{.allow_client_change = !isStaticConfiguration()};
object_storage_ptr->applyNewSettings(context->getConfigRef(), getTypeName() + ".", context, options);
}
bool StorageObjectStorage::hasExternalDynamicMetadata() const
{
return configuration->hasExternalDynamicMetadata();
}
void StorageObjectStorage::updateExternalDynamicMetadata(ContextPtr context_ptr)
{
StorageInMemoryMetadata metadata;
metadata.setColumns(configuration->updateAndGetCurrentSchema(object_storage, context_ptr));
setInMemoryMetadata(metadata);
}
namespace
{
class ReadFromObjectStorageStep : public SourceStepWithFilter
@ -365,8 +388,7 @@ SinkToStoragePtr StorageObjectStorage::write(
}
auto paths = configuration->getPaths();
if (auto new_key = checkAndGetNewFileOnInsertIfNeeded(
*object_storage, *configuration, settings, paths.front(), paths.size()))
if (auto new_key = checkAndGetNewFileOnInsertIfNeeded(*object_storage, *configuration, settings, paths.front(), paths.size()))
{
paths.push_back(*new_key);
}
@ -439,6 +461,9 @@ ColumnsDescription StorageObjectStorage::resolveSchemaFromData(
{
if (configuration->isDataLakeConfiguration())
{
if (configuration->hasExternalDynamicMetadata())
configuration->updateAndGetCurrentSchema(object_storage, context);
else
configuration->update(object_storage, context);
auto table_structure = configuration->tryGetTableStructureFromMetadata();
if (table_structure)
@ -518,7 +543,8 @@ void StorageObjectStorage::Configuration::initialize(
Configuration & configuration,
ASTs & engine_args,
ContextPtr local_context,
bool with_table_structure)
bool with_table_structure,
std::unique_ptr<StorageObjectStorageSettings> settings)
{
if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, local_context))
configuration.fromNamedCollection(*named_collection, local_context);
@ -542,6 +568,9 @@ void StorageObjectStorage::Configuration::initialize(
else
FormatFactory::instance().checkFormatName(configuration.format);
if (settings)
configuration.allow_dynamic_metadata_for_data_lakes
= (*settings)[StorageObjectStorageSetting::allow_dynamic_metadata_for_data_lakes];
configuration.initialized = true;
}


@ -5,10 +5,13 @@
#include <Processors/Formats/IInputFormat.h>
#include <Storages/IStorage.h>
#include <Storages/ObjectStorage/DataLakes/PartitionColumns.h>
#include <Storages/ObjectStorage/StorageObjectStorageSettings.h>
#include <Storages/prepareReadingFromFormat.h>
#include <Common/threadPoolCallbackRunner.h>
#include "Interpreters/ActionsDAG.h"
#include "Storages/ColumnsDescription.h"
#include <memory>
namespace DB
{
@ -16,6 +19,12 @@ class ReadBufferIterator;
class SchemaCache;
class NamedCollection;
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
/**
* A general class containing implementation for external table engines
* such as StorageS3, StorageAzure, StorageHDFS.
@ -122,6 +131,10 @@ public:
std::string & sample_path,
const ContextPtr & context);
bool hasExternalDynamicMetadata() const override;
void updateExternalDynamicMetadata(ContextPtr) override;
protected:
String getPathSample(ContextPtr context);
@ -155,7 +168,8 @@ public:
Configuration & configuration,
ASTs & engine_args,
ContextPtr local_context,
bool with_table_structure);
bool with_table_structure,
std::unique_ptr<StorageObjectStorageSettings> settings);
/// Storage type: s3, hdfs, azure, local.
virtual ObjectStorageType getType() const = 0;
@ -204,6 +218,17 @@ public:
virtual bool isDataLakeConfiguration() const { return false; }
virtual bool hasExternalDynamicMetadata() { return false; }
virtual std::shared_ptr<NamesAndTypesList> getInitialSchemaByPath(const String&) const { return {}; }
virtual std::shared_ptr<const ActionsDAG> getSchemaTransformer(const String&) const { return {}; }
virtual ColumnsDescription updateAndGetCurrentSchema(ObjectStoragePtr, ContextPtr)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method updateAndGetCurrentSchema is not supported by storage {}", getEngineName());
}
virtual ReadFromFormatInfo prepareReadingFromFormat(
ObjectStoragePtr object_storage,
const Strings & requested_columns,
@ -229,6 +254,8 @@ protected:
bool initialized = false;
DataLakePartitionColumns partition_columns;
bool allow_dynamic_metadata_for_data_lakes;
};
}


@ -0,0 +1,76 @@
#include <Core/BaseSettings.h>
#include <Core/BaseSettingsFwdMacrosImpl.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSetQuery.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/ObjectStorage/StorageObjectStorageSettings.h>
#include <Storages/System/MutableColumnsAndConstraints.h>
#include <Common/Exception.h>
namespace DB
{
#define STORAGE_OBJECT_STORAGE_RELATED_SETTINGS(DECLARE, ALIAS) \
DECLARE( \
Bool, \
allow_dynamic_metadata_for_data_lakes, \
false, \
"If enabled, indicates that metadata is taken from iceberg specification that is pulled from cloud before each query.", \
0)
#define LIST_OF_STORAGE_OBJECT_STORAGE_SETTINGS(M, ALIAS) \
STORAGE_OBJECT_STORAGE_RELATED_SETTINGS(M, ALIAS) \
LIST_OF_ALL_FORMAT_SETTINGS(M, ALIAS)
DECLARE_SETTINGS_TRAITS(StorageObjectStorageSettingsTraits, LIST_OF_STORAGE_OBJECT_STORAGE_SETTINGS)
IMPLEMENT_SETTINGS_TRAITS(StorageObjectStorageSettingsTraits, LIST_OF_STORAGE_OBJECT_STORAGE_SETTINGS)
struct StorageObjectStorageSettingsImpl : public BaseSettings<StorageObjectStorageSettingsTraits>
{
};
#define INITIALIZE_SETTING_EXTERN(TYPE, NAME, DEFAULT, DESCRIPTION, FLAGS) \
StorageObjectStorageSettings##TYPE NAME = &StorageObjectStorageSettingsImpl ::NAME;
namespace StorageObjectStorageSetting
{
LIST_OF_STORAGE_OBJECT_STORAGE_SETTINGS(INITIALIZE_SETTING_EXTERN, SKIP_ALIAS)
}
#undef INITIALIZE_SETTING_EXTERN
StorageObjectStorageSettings::StorageObjectStorageSettings() : impl(std::make_unique<StorageObjectStorageSettingsImpl>())
{
}
StorageObjectStorageSettings::StorageObjectStorageSettings(const StorageObjectStorageSettings & settings)
: impl(std::make_unique<StorageObjectStorageSettingsImpl>(*settings.impl))
{
}
StorageObjectStorageSettings::StorageObjectStorageSettings(StorageObjectStorageSettings && settings) noexcept
: impl(std::make_unique<StorageObjectStorageSettingsImpl>(std::move(*settings.impl)))
{
}
StorageObjectStorageSettings::~StorageObjectStorageSettings() = default;
STORAGE_OBJECT_STORAGE_SETTINGS_SUPPORTED_TYPES(StorageObjectStorageSettings, IMPLEMENT_SETTING_SUBSCRIPT_OPERATOR)
void StorageObjectStorageSettings::loadFromQuery(ASTStorage & storage_def)
{
if (storage_def.settings)
{
impl->applyChanges(storage_def.settings->changes);
}
}
Field StorageObjectStorageSettings::get(const std::string & name)
{
return impl->get(name);
}
}


@ -0,0 +1,65 @@
#pragma once
#include <Core/BaseSettingsFwdMacros.h>
#include <Core/FormatFactorySettings.h>
#include <Core/SettingsEnums.h>
#include <Core/SettingsFields.h>
namespace DB
{
class ASTStorage;
struct StorageObjectStorageSettingsImpl;
struct MutableColumnsAndConstraints;
class StorageObjectStorage;
class SettingsChanges;
/// List of available types supported in StorageObjectStorageSettings object
#define STORAGE_OBJECT_STORAGE_SETTINGS_SUPPORTED_TYPES(CLASS_NAME, M) \
M(CLASS_NAME, ArrowCompression) \
M(CLASS_NAME, Bool) \
M(CLASS_NAME, CapnProtoEnumComparingMode) \
M(CLASS_NAME, Char) \
M(CLASS_NAME, DateTimeInputFormat) \
M(CLASS_NAME, DateTimeOutputFormat) \
M(CLASS_NAME, DateTimeOverflowBehavior) \
M(CLASS_NAME, Double) \
M(CLASS_NAME, EscapingRule) \
M(CLASS_NAME, Float) \
M(CLASS_NAME, IdentifierQuotingRule) \
M(CLASS_NAME, IdentifierQuotingStyle) \
M(CLASS_NAME, Int64) \
M(CLASS_NAME, IntervalOutputFormat) \
M(CLASS_NAME, MsgPackUUIDRepresentation) \
M(CLASS_NAME, ObjectStorageQueueAction) \
M(CLASS_NAME, ObjectStorageQueueMode) \
M(CLASS_NAME, ORCCompression) \
M(CLASS_NAME, ParquetCompression) \
M(CLASS_NAME, ParquetVersion) \
M(CLASS_NAME, SchemaInferenceMode) \
M(CLASS_NAME, String) \
M(CLASS_NAME, UInt32) \
M(CLASS_NAME, UInt64) \
M(CLASS_NAME, UInt64Auto) \
M(CLASS_NAME, URI)
STORAGE_OBJECT_STORAGE_SETTINGS_SUPPORTED_TYPES(StorageObjectStorageSettings, DECLARE_SETTING_TRAIT)
struct StorageObjectStorageSettings
{
StorageObjectStorageSettings();
StorageObjectStorageSettings(const StorageObjectStorageSettings & settings);
StorageObjectStorageSettings(StorageObjectStorageSettings && settings) noexcept;
~StorageObjectStorageSettings();
STORAGE_OBJECT_STORAGE_SETTINGS_SUPPORTED_TYPES(StorageObjectStorageSettings, DECLARE_SETTING_SUBSCRIPT_OPERATOR)
void loadFromQuery(ASTStorage & storage_def);
Field get(const std::string & name);
private:
std::unique_ptr<StorageObjectStorageSettingsImpl> impl;
};
}


@ -1,26 +1,30 @@
#include "StorageObjectStorageSource.h"
#include <Storages/VirtualColumnUtils.h>
#include <optional>
#include <Core/Settings.h>
#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
#include <Disks/ObjectStorages/ObjectStorageIterator.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Processors/Sources/ConstChunkGenerator.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Transforms/ExtractColumnsTransform.h>
#include <Formats/FormatFactory.h>
#include <Formats/ReadSchemaUtils.h>
#include <IO/Archives/createArchiveReader.h>
#include <IO/ReadBufferFromFileBase.h>
#include <Interpreters/Cache/FileCacheFactory.h>
#include <Interpreters/Cache/FileCache.h>
#include <Disks/IO/CachedOnDiskReadBufferFromFile.h>
#include <IO/Archives/createArchiveReader.h>
#include <Formats/FormatFactory.h>
#include <Disks/IO/AsynchronousBoundedReadBuffer.h>
#include <Formats/ReadSchemaUtils.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Interpreters/ExpressionActions.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Sources/ConstChunkGenerator.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/ExtractColumnsTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/Cache/SchemaCache.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/VirtualColumnUtils.h>
#include <Common/parseGlobs.h>
#include <Core/Settings.h>
#include "Disks/IO/CachedOnDiskReadBufferFromFile.h"
#include "Interpreters/Cache/FileCache.h"
#include "Interpreters/Cache/FileCacheKey.h"
namespace fs = std::filesystem;
namespace ProfileEvents
{
extern const Event EngineFileLikeReadFiles;
@ -376,16 +380,29 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
read_buf = createReadBuffer(*object_info, object_storage, context_, log);
}
Block initial_header = read_from_format_info.format_header;
if (auto initial_schema = configuration->getInitialSchemaByPath(object_info->getPath()))
{
Block sample_header;
for (const auto & [name, type] : *initial_schema)
{
sample_header.insert({type->createColumn(), type, name});
}
initial_header = sample_header;
}
auto input_format = FormatFactory::instance().getInput(
configuration->format,
*read_buf,
read_from_format_info.format_header,
initial_header,
context_,
max_block_size,
format_settings,
need_only_count ? 1 : max_parsing_threads,
std::nullopt,
true/* is_remote_fs */,
true /* is_remote_fs */,
compression_method,
need_only_count);
@ -399,6 +416,16 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
builder.init(Pipe(input_format));
if (auto transformer = configuration->getSchemaTransformer(object_info->getPath()))
{
auto schema_modifying_actions = std::make_shared<ExpressionActions>(transformer->clone());
builder.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExpressionTransform>(header, schema_modifying_actions);
});
}
if (read_from_format_info.columns_description.hasDefaults())
{
builder.addSimpleTransform(


@ -7,6 +7,9 @@
#include <Storages/ObjectStorage/S3/Configuration.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/StorageFactory.h>
#include <Poco/Logger.h>
#include "Common/logger_useful.h"
#include "Storages/ObjectStorage/StorageObjectStorageSettings.h"
namespace DB
{
@ -29,7 +32,12 @@ createStorageObjectStorage(const StorageFactory::Arguments & args, StorageObject
if (engine_args.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments");
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, context, false);
auto queue_settings = std::make_unique<StorageObjectStorageSettings>();
queue_settings->loadFromQuery(*args.storage_def);
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, context, false, std::move(queue_settings));
// Use format settings from global server context + settings from
// the SETTINGS clause of the create query. Settings from current
@ -163,12 +171,10 @@ void registerStorageIceberg(StorageFactory & factory)
[&](const StorageFactory::Arguments & args)
{
auto configuration = std::make_shared<StorageS3IcebergConfiguration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), false);
return createStorageObjectStorage(args, configuration, args.getLocalContext());
},
{
.supports_settings = false,
.supports_settings = true,
.supports_schema_inference = true,
.source_access_type = AccessType::S3,
});
@ -178,59 +184,51 @@ void registerStorageIceberg(StorageFactory & factory)
[&](const StorageFactory::Arguments & args)
{
auto configuration = std::make_shared<StorageS3IcebergConfiguration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), false);
return createStorageObjectStorage(args, configuration, args.getLocalContext());
},
{
.supports_settings = false,
.supports_settings = true,
.supports_schema_inference = true,
.source_access_type = AccessType::S3,
});
#endif
#if USE_AZURE_BLOB_STORAGE
# endif
# if USE_AZURE_BLOB_STORAGE
factory.registerStorage(
"IcebergAzure",
[&](const StorageFactory::Arguments & args)
{
auto configuration = std::make_shared<StorageAzureIcebergConfiguration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), true);
return createStorageObjectStorage(args, configuration, args.getLocalContext());
},
{
.supports_settings = false,
.supports_settings = true,
.supports_schema_inference = true,
.source_access_type = AccessType::AZURE,
});
#endif
#if USE_HDFS
# endif
# if USE_HDFS
factory.registerStorage(
"IcebergHDFS",
[&](const StorageFactory::Arguments & args)
{
auto configuration = std::make_shared<StorageHDFSIcebergConfiguration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), false);
return createStorageObjectStorage(args, configuration, args.getLocalContext());
},
{
.supports_settings = false,
.supports_settings = true,
.supports_schema_inference = true,
.source_access_type = AccessType::HDFS,
});
#endif
# endif
factory.registerStorage(
"IcebergLocal",
[&](const StorageFactory::Arguments & args)
{
auto configuration = std::make_shared<StorageLocalIcebergConfiguration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), false);
return createStorageObjectStorage(args, configuration, args.getLocalContext());
},
{
.supports_settings = false,
.supports_settings = true,
.supports_schema_inference = true,
.source_access_type = AccessType::FILE,
});
@ -248,8 +246,6 @@ void registerStorageDeltaLake(StorageFactory & factory)
[&](const StorageFactory::Arguments & args)
{
auto configuration = std::make_shared<StorageS3DeltaLakeConfiguration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), false);
return createStorageObjectStorage(args, configuration, args.getLocalContext());
},
{
@ -257,7 +253,7 @@ void registerStorageDeltaLake(StorageFactory & factory)
.supports_schema_inference = true,
.source_access_type = AccessType::S3,
});
#endif
# endif
UNUSED(factory);
}
#endif
@ -270,8 +266,6 @@ void registerStorageHudi(StorageFactory & factory)
[&](const StorageFactory::Arguments & args)
{
auto configuration = std::make_shared<StorageS3HudiConfiguration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getLocalContext(), false);
return createStorageObjectStorage(args, configuration, args.getLocalContext());
},
{


@ -32,7 +32,7 @@ StoragePtr createQueueStorage(const StorageFactory::Arguments & args)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments");
auto configuration = std::make_shared<Configuration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getContext(), false);
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getContext(), false, nullptr);
// Use format settings from global server context + settings from
// the SETTINGS clause of the create query. Settings from current


@ -130,7 +130,7 @@ public:
virtual void parseArgumentsImpl(ASTs & args, const ContextPtr & context)
{
StorageObjectStorage::Configuration::initialize(*getConfiguration(), args, context, true);
StorageObjectStorage::Configuration::initialize(*getConfiguration(), args, context, true, nullptr);
}
static void updateStructureAndFormatArgumentsIfNeeded(

File diff suppressed because it is too large.