Merge branch 'master' into pufit/fix-parsing-alter-drop-all-profiles

This commit is contained in:
pufit 2024-12-13 14:09:40 -05:00 committed by GitHub
commit ade97fa447
45 changed files with 1696 additions and 184 deletions

View File

@ -50,3 +50,4 @@ urllib3==1.26.5
wadllib==1.3.6
wheel==0.37.1
zipp==1.0.0
clickhouse-driver==0.2.7

View File

@ -96,7 +96,7 @@ private:
auto [_, inserted] = aliases.alias_name_to_lambda_node.insert(std::make_pair(alias, node));
if (!inserted)
addDuplicatingAlias(node);
addDuplicatingAlias(node);
return;
}

View File

@ -129,6 +129,7 @@ add_headers_and_sources(dbms Storages/ObjectStorage/S3)
add_headers_and_sources(dbms Storages/ObjectStorage/HDFS)
add_headers_and_sources(dbms Storages/ObjectStorage/Local)
add_headers_and_sources(dbms Storages/ObjectStorage/DataLakes)
add_headers_and_sources(dbms Storages/ObjectStorage/DataLakes/Iceberg)
add_headers_and_sources(dbms Common/NamedCollections)
add_headers_and_sources(dbms Common/Scheduler/Workload)

View File

@ -201,7 +201,7 @@ namespace DB
DECLARE(UInt64, parts_kill_delay_period_random_add, 10, "Add uniformly distributed value from 0 to x seconds to kill_delay_period to avoid thundering herd effect and subsequent DoS of ZooKeeper in case of very large number of tables. Only available in ClickHouse Cloud", 0) \
DECLARE(UInt64, parts_killer_pool_size, 128, "Threads for cleanup of shared merge tree outdated threads. Only available in ClickHouse Cloud", 0) \
DECLARE(UInt64, keeper_multiread_batch_size, 10'000, "Maximum size of batch for MultiRead request to [Zoo]Keeper that support batching. If set to 0, batching is disabled. Available only in ClickHouse Cloud.", 0) \
DECLARE(Bool, use_legacy_mongodb_integration, true, "Obsolete, has no effect", 0) \
DECLARE(Bool, use_legacy_mongodb_integration, true, "Obsolete, does nothing.", SettingsTierType::OBSOLETE) \
DECLARE(Bool, send_settings_to_client, true, "Send user settings from server configuration to clients (in the server Hello message).", 0) \
\
DECLARE(UInt64, prefetch_threadpool_pool_size, 100, "Size of background pool for prefetches for remote object storages", 0) \

View File

@ -2277,6 +2277,48 @@ Result:
```
)", 0) \
\
DECLARE(Bool, skip_redundant_aliases_in_udf, false, R"(
Redundant aliases are not used (substituted) in user-defined functions in order to simplify their usage.
Possible values:
- 1 The aliases are skipped (substituted) in UDFs.
- 0 The aliases are not skipped (substituted) in UDFs.
**Example**
The difference between enabled and disabled:
Query:
```sql
SET skip_redundant_aliases_in_udf = 0;
CREATE FUNCTION IF NOT EXISTS test_03274 AS ( x ) -> ((x + 1 as y, y + 2));
EXPLAIN SYNTAX SELECT test_03274(4 + 2);
```
Result:
```text
SELECT ((4 + 2) + 1 AS y, y + 2)
```
Query:
```sql
SET skip_redundant_aliases_in_udf = 1;
CREATE FUNCTION IF NOT EXISTS test_03274 AS ( x ) -> ((x + 1 as y, y + 2));
EXPLAIN SYNTAX SELECT test_03274(4 + 2);
```
Result:
```text
SELECT ((4 + 2) + 1, ((4 + 2) + 1) + 2)
```
)", 0) \
DECLARE(Bool, prefer_global_in_and_join, false, R"(
Enables the replacement of `IN`/`JOIN` operators with `GLOBAL IN`/`GLOBAL JOIN`.
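A minimal sketch of the effect (table names are hypothetical, not from this diff): with the setting enabled, a plain `IN` over a distributed table is executed as `GLOBAL IN`, so the subquery runs once on the initiator and its result is shipped to the remote servers.
```sql
SET prefer_global_in_and_join = 1;

-- Executed as GLOBAL IN: the subquery runs on the initiator and its result
-- is sent to every shard of distributed_table.
SELECT count()
FROM distributed_table
WHERE user_id IN (SELECT user_id FROM local_users);
```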

View File

@ -77,6 +77,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"use_async_executor_for_materialized_views", false, false, "New setting."},
{"composed_data_type_output_format_mode", "default", "default", "New setting"},
{"http_response_headers", "", "", "New setting."},
{"skip_redundant_aliases_in_udf", false, false, "New setting."},
{"parallel_replicas_index_analysis_only_on_coordinator", true, true, "Index analysis done only on replica-coordinator and skipped on other replicas. Effective only with enabled parallel_replicas_local_plan"}, // enabling it was moved to 24.10
/// Release closed. Please use 25.1
}

View File

@ -53,7 +53,8 @@ std::pair<std::string, std::string> parseCatalogCredential(const std::string & c
/// Parse a string of format "<client_id>:<client_secret>"
/// into separate strings client_id and client_secret.
std::string client_id, client_secret;
std::string client_id;
std::string client_secret;
if (!catalog_credential.empty())
{
auto pos = catalog_credential.find(':');
@ -623,7 +624,9 @@ bool RestCatalog::getTableMetadataImpl(
static constexpr auto secret_access_key_str = "s3.secret-access-key";
static constexpr auto session_token_str = "s3.session-token";
std::string access_key_id, secret_access_key, session_token;
std::string access_key_id;
std::string secret_access_key;
std::string session_token;
if (config_object->has(access_key_id_str))
access_key_id = config_object->get(access_key_id_str).extract<String>();
if (config_object->has(secret_access_key_str))

View File

@ -11,28 +11,42 @@
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTIdentifier.h>
#include <Functions/UserDefined/UserDefinedSQLFunctionFactory.h>
#include "Parsers/ASTColumnDeclaration.h"
#include <Interpreters/QueryAliasesVisitor.h>
#include <Interpreters/MarkTableIdentifiersVisitor.h>
#include <Interpreters/QueryNormalizer.h>
namespace DB
{
namespace Setting
{
extern const SettingsBool skip_redundant_aliases_in_udf;
}
namespace ErrorCodes
{
extern const int UNSUPPORTED_METHOD;
}
void UserDefinedSQLFunctionVisitor::visit(ASTPtr & ast)
void UserDefinedSQLFunctionVisitor::visit(ASTPtr & ast, ContextPtr context_)
{
chassert(ast);
if (const auto * function = ast->template as<ASTFunction>())
{
std::unordered_set<std::string> udf_in_replace_process;
auto replace_result = tryToReplaceFunction(*function, udf_in_replace_process, context_);
if (replace_result)
ast = replace_result;
}
for (auto & child : ast->children)
{
if (!child)
return;
auto * old_ptr = child.get();
visit(child);
visit(child, context_);
auto * new_ptr = child.get();
/// Some AST classes have naked pointers to children elements as members.
@ -44,22 +58,22 @@ void UserDefinedSQLFunctionVisitor::visit(ASTPtr & ast)
if (const auto * function = ast->template as<ASTFunction>())
{
std::unordered_set<std::string> udf_in_replace_process;
auto replace_result = tryToReplaceFunction(*function, udf_in_replace_process);
auto replace_result = tryToReplaceFunction(*function, udf_in_replace_process, context_);
if (replace_result)
ast = replace_result;
}
}
void UserDefinedSQLFunctionVisitor::visit(IAST * ast)
void UserDefinedSQLFunctionVisitor::visit(IAST * ast, ContextPtr context_)
{
if (!ast)
return;
for (auto & child : ast->children)
visit(child);
visit(child, context_);
}
ASTPtr UserDefinedSQLFunctionVisitor::tryToReplaceFunction(const ASTFunction & function, std::unordered_set<std::string> & udf_in_replace_process)
ASTPtr UserDefinedSQLFunctionVisitor::tryToReplaceFunction(const ASTFunction & function, std::unordered_set<std::string> & udf_in_replace_process, ContextPtr context_)
{
if (udf_in_replace_process.find(function.name) != udf_in_replace_process.end())
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
@ -101,6 +115,20 @@ ASTPtr UserDefinedSQLFunctionVisitor::tryToReplaceFunction(const ASTFunction & f
auto function_body_to_update = function_core_expression->children.at(1)->clone();
if (context_->getSettingsRef()[Setting::skip_redundant_aliases_in_udf])
{
Aliases aliases;
QueryAliasesVisitor(aliases).visit(function_body_to_update);
/// Mark table ASTIdentifiers with a "not a column" marker
MarkTableIdentifiersVisitor::Data identifiers_data{aliases};
MarkTableIdentifiersVisitor(identifiers_data).visit(function_body_to_update);
/// Common subexpression elimination. Rewrite rules.
QueryNormalizer::Data normalizer_data(aliases, {}, true, context_->getSettingsRef(), true, false);
QueryNormalizer(normalizer_data).visit(function_body_to_update);
}
auto expression_list = std::make_shared<ASTExpressionList>();
expression_list->children.emplace_back(std::move(function_body_to_update));
@ -116,7 +144,7 @@ ASTPtr UserDefinedSQLFunctionVisitor::tryToReplaceFunction(const ASTFunction & f
{
if (auto * inner_function = child->as<ASTFunction>())
{
auto replace_result = tryToReplaceFunction(*inner_function, udf_in_replace_process);
auto replace_result = tryToReplaceFunction(*inner_function, udf_in_replace_process, context_);
if (replace_result)
child = replace_result;
}

View File

@ -22,10 +22,10 @@ class ASTFunction;
class UserDefinedSQLFunctionVisitor
{
public:
static void visit(ASTPtr & ast);
static void visit(ASTPtr & ast, ContextPtr context_);
private:
static void visit(IAST *);
static ASTPtr tryToReplaceFunction(const ASTFunction & function, std::unordered_set<std::string> & udf_in_replace_process);
static void visit(IAST *, ContextPtr context_);
static ASTPtr tryToReplaceFunction(const ASTFunction & function, std::unordered_set<std::string> & udf_in_replace_process, ContextPtr context_);
};

View File

@ -99,7 +99,7 @@ BlockIO InterpreterAlterQuery::executeToTable(const ASTAlterQuery & alter)
BlockIO res;
if (!UserDefinedSQLFunctionFactory::instance().empty())
UserDefinedSQLFunctionVisitor::visit(query_ptr);
UserDefinedSQLFunctionVisitor::visit(query_ptr, getContext());
auto table_id = getContext()->tryResolveStorageID(alter);
StoragePtr table;

View File

@ -1623,7 +1623,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
// substitute possible UDFs with their definitions
if (!UserDefinedSQLFunctionFactory::instance().empty())
UserDefinedSQLFunctionVisitor::visit(query_ptr);
UserDefinedSQLFunctionVisitor::visit(query_ptr, getContext());
/// Set and retrieve list of columns, indices and constraints. Set table engine if needed. Rewrite query in canonical way.
TableProperties properties = getTablePropertiesAndNormalizeCreateQuery(create, mode);

View File

@ -122,7 +122,7 @@ void QueryAliasesMatcher<T>::visitOther(const ASTPtr & ast, Data & data)
if (!alias.empty())
{
if (aliases.contains(alias) && ast->getTreeHash(/*ignore_aliases=*/ true) != aliases[alias]->getTreeHash(/*ignore_aliases=*/ true))
throw Exception(wrongAliasMessage(ast, aliases[alias], alias), ErrorCodes::MULTIPLE_EXPRESSIONS_FOR_ALIAS);
throw Exception(wrongAliasMessage(ast, aliases[alias], alias), ErrorCodes::MULTIPLE_EXPRESSIONS_FOR_ALIAS);
aliases[alias] = ast;
}

View File

@ -85,10 +85,10 @@ void QueryNormalizer::visit(ASTIdentifier & node, ASTPtr & ast, Data & data)
}
/// If it is an alias, but not a parent alias (for constructs like "SELECT column + 1 AS column").
auto it_alias = data.aliases.find(node.name());
if (!data.allow_self_aliases && current_alias == node.name())
throw Exception(ErrorCodes::CYCLIC_ALIASES, "Self referencing of {} to {}. Cyclic alias",
backQuote(current_alias), backQuote(node.name()));
auto it_alias = data.aliases.find(node.name());
if (it_alias != data.aliases.end() && current_alias != node.name())
{

View File

@ -5,6 +5,7 @@
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/TraceLog.h>
#include <Common/TraceSender.h>
#include <Common/ProfileEvents.h>
#include <Common/setThreadName.h>
#include <Common/logger_useful.h>
@ -62,21 +63,25 @@ void TraceCollector::tryClosePipe()
TraceCollector::~TraceCollector()
{
try
// Pipes could already be closed due to an exception in TraceCollector::run.
if (TraceSender::pipe.fds_rw[1] >= 0)
{
/** Sends TraceCollector stop message
*
* Each sequence of data for TraceCollector thread starts with a boolean flag.
* If this flag is true, TraceCollector must stop reading trace_pipe and exit.
* This function sends flag with a true value to stop TraceCollector gracefully.
*/
WriteBufferFromFileDescriptor out(TraceSender::pipe.fds_rw[1]);
writeChar(true, out);
out.finalize();
}
catch (...)
{
tryLogCurrentException("TraceCollector");
try
{
/** Sends TraceCollector stop message
*
* Each sequence of data for TraceCollector thread starts with a boolean flag.
* If this flag is true, TraceCollector must stop reading trace_pipe and exit.
* This function sends flag with a true value to stop TraceCollector gracefully.
*/
WriteBufferFromFileDescriptor out(TraceSender::pipe.fds_rw[1]);
writeChar(true, out);
out.finalize();
}
catch (...)
{
tryLogCurrentException("TraceCollector");
}
}
tryClosePipe();

View File

@ -1574,7 +1574,7 @@ void TreeRewriter::normalize(
ASTPtr & query, Aliases & aliases, const NameSet & source_columns_set, bool ignore_alias, const Settings & settings, bool allow_self_aliases, ContextPtr context_, bool is_create_parameterized_view)
{
if (!UserDefinedSQLFunctionFactory::instance().empty())
UserDefinedSQLFunctionVisitor::visit(query);
UserDefinedSQLFunctionVisitor::visit(query, context_);
CustomizeCountDistinctVisitor::Data data_count_distinct{settings[Setting::count_distinct_implementation]};
CustomizeCountDistinctVisitor(data_count_distinct).visit(query);

View File

@ -5,6 +5,7 @@
#include <Interpreters/IdentifierSemantic.h>
#include <Interpreters/StorageID.h>
#include <Parsers/queryToString.h>
#include <Parsers/ExpressionElementParsers.h>
#include <IO/Operators.h>
@ -109,7 +110,15 @@ void ASTIdentifier::formatImplWithoutAlias(WriteBuffer & ostr, const FormatSetti
auto format_element = [&](const String & elem_name)
{
ostr << (settings.hilite ? hilite_identifier : "");
settings.writeIdentifier(ostr, elem_name, /*ambiguous=*/false);
if (auto special_delimiter_and_identifier = ParserCompoundIdentifier::splitSpecialDelimiterAndIdentifierIfAny(elem_name))
{
ostr << special_delimiter_and_identifier->first;
settings.writeIdentifier(ostr, special_delimiter_and_identifier->second, /*ambiguous=*/false);
}
else
{
settings.writeIdentifier(ostr, elem_name, /*ambiguous=*/false);
}
ostr << (settings.hilite ? hilite_none : "");
};

View File

@ -415,6 +415,20 @@ bool ParserCompoundIdentifier::parseImpl(Pos & pos, ASTPtr & node, Expected & ex
return true;
}
std::optional<std::pair<char, String>> ParserCompoundIdentifier::splitSpecialDelimiterAndIdentifierIfAny(const String & name)
{
/// Identifier with special delimiter looks like this: <special_delimiter>`<identifier>`.
if (name.size() < 3
|| (name[0] != char(SpecialDelimiter::JSON_PATH_DYNAMIC_TYPE) && name[0] != char(SpecialDelimiter::JSON_PATH_PREFIX))
|| name[1] != '`' || name.back() != '`')
return std::nullopt;
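/// Drop the one-character delimiter and unquote the back-quoted identifier.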
String identifier;
ReadBufferFromMemory buf(name.data() + 1, name.size() - 1);
readBackQuotedString(identifier, buf);
return std::make_pair(name[0], identifier);
}
ASTPtr createFunctionCast(const ASTPtr & expr_ast, const ASTPtr & type_ast)
{

View File

@ -61,6 +61,16 @@ protected:
class ParserCompoundIdentifier : public IParserBase
{
public:
explicit ParserCompoundIdentifier(bool table_name_with_optional_uuid_ = false, bool allow_query_parameter_ = false, Highlight highlight_type_ = Highlight::identifier)
: table_name_with_optional_uuid(table_name_with_optional_uuid_), allow_query_parameter(allow_query_parameter_), highlight_type(highlight_type_)
{
}
/// Checks if the identifier is actually a pair of a special delimiter and the identifier in back quotes.
/// For example: :`UInt64` or ^`path` from special JSON subcolumns.
static std::optional<std::pair<char, String>> splitSpecialDelimiterAndIdentifierIfAny(const String & name);
protected:
enum class SpecialDelimiter : char
{
NONE = '\0',
@ -68,12 +78,6 @@ public:
JSON_PATH_PREFIX = '^',
};
explicit ParserCompoundIdentifier(bool table_name_with_optional_uuid_ = false, bool allow_query_parameter_ = false, Highlight highlight_type_ = Highlight::identifier)
: table_name_with_optional_uuid(table_name_with_optional_uuid_), allow_query_parameter(allow_query_parameter_), highlight_type(highlight_type_)
{
}
protected:
const char * getName() const override { return "compound identifier"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
bool table_name_with_optional_uuid;

View File

@ -1110,8 +1110,13 @@ void TCPHandler::processInsertQuery(QueryState & state)
startInsertQuery(state);
while (receivePacketsExpectDataConcurrentWithExecutor(state))
{
executor.push(std::move(state.block_for_insert));
sendLogs(state);
sendInsertProfileEvents(state);
}
state.read_all_data = true;
executor.finish();

View File

@ -883,39 +883,6 @@ void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checks
}
}
void IMergeTreeDataPart::appendFilesOfColumnsChecksumsIndexes(Strings & files, bool include_projection) const
{
if (isStoredOnDisk())
{
if (!isStoredOnReadonlyDisk())
appendFilesOfUUID(files);
appendFilesOfColumns(files);
appendFilesOfChecksums(files);
appendFilesOfIndexGranularity(files);
appendFilesOfIndex(files);
appendFilesOfRowsCount(files);
appendFilesOfPartitionAndMinMaxIndex(files);
if (!isStoredOnReadonlyDisk())
appendFilesOfTTLInfos(files);
appendFilesOfDefaultCompressionCodec(files);
appendFilesOfMetadataVersion(files);
}
if (!parent_part && include_projection)
{
for (const auto & [projection_name, projection_part] : projection_parts)
{
Strings projection_files;
projection_part->appendFilesOfColumnsChecksumsIndexes(projection_files, true);
for (const auto & projection_file : projection_files)
files.push_back(fs::path(projection_part->name + ".proj") / projection_file);
}
}
}
MergeTreeDataPartBuilder IMergeTreeDataPart::getProjectionPartBuilder(const String & projection_name, bool is_temp_projection)
{
const char * projection_extension = is_temp_projection ? ".tmp_proj" : ".proj";
@ -994,10 +961,6 @@ void IMergeTreeDataPart::loadIndexGranularity()
"Method 'loadIndexGranularity' is not implemented for part with type {}", getType().toString());
}
/// Currently we don't cache mark files of part, because cache other meta files is enough to speed up loading.
void IMergeTreeDataPart::appendFilesOfIndexGranularity(Strings & /* files */) const
{
}
template <typename Columns>
void IMergeTreeDataPart::optimizeIndexColumns(size_t marks_count, Columns & index_columns) const
@ -1098,22 +1061,6 @@ std::shared_ptr<IMergeTreeDataPart::Index> IMergeTreeDataPart::loadIndex() const
return std::make_shared<Index>(std::make_move_iterator(loaded_index.begin()), std::make_move_iterator(loaded_index.end()));
}
void IMergeTreeDataPart::appendFilesOfIndex(Strings & files) const
{
auto metadata_snapshot = storage.getInMemoryMetadataPtr();
if (parent_part)
metadata_snapshot = metadata_snapshot->projections.has(name) ? metadata_snapshot->projections.get(name).metadata : nullptr;
if (!metadata_snapshot)
return;
if (metadata_snapshot->hasPrimaryKey())
{
String index_name = "primary" + getIndexExtensionFromFilesystem(getDataPartStorage());
files.push_back(index_name);
}
}
NameSet IMergeTreeDataPart::getFileNamesWithoutChecksums() const
{
if (!isStoredOnDisk())
@ -1287,16 +1234,6 @@ void IMergeTreeDataPart::removeMetadataVersion()
getDataPartStorage().removeFileIfExists(METADATA_VERSION_FILE_NAME);
}
void IMergeTreeDataPart::appendFilesOfDefaultCompressionCodec(Strings & files)
{
files.push_back(DEFAULT_COMPRESSION_CODEC_FILE_NAME);
}
void IMergeTreeDataPart::appendFilesOfMetadataVersion(Strings & files)
{
files.push_back(METADATA_VERSION_FILE_NAME);
}
CompressionCodecPtr IMergeTreeDataPart::detectDefaultCompressionCodec() const
{
/// In memory parts doesn't have any compression
@ -1384,18 +1321,6 @@ void IMergeTreeDataPart::loadPartitionAndMinMaxIndex()
getDataPartStorage().getFullPath(), calculated_partition_id, info.partition_id);
}
void IMergeTreeDataPart::appendFilesOfPartitionAndMinMaxIndex(Strings & files) const
{
if (storage.format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING && !parent_part)
return;
if (!parent_part)
MergeTreePartition::appendFiles(storage, files);
if (!parent_part)
IMergeTreeDataPart::MinMaxIndex::appendFiles(storage, files);
}
void IMergeTreeDataPart::loadChecksums(bool require)
{
if (auto buf = metadata_manager->readIfExists("checksums.txt"))
@ -1427,11 +1352,6 @@ void IMergeTreeDataPart::loadChecksums(bool require)
}
}
void IMergeTreeDataPart::appendFilesOfChecksums(Strings & files)
{
files.push_back("checksums.txt");
}
void IMergeTreeDataPart::loadRowsCountFileForUnexpectedPart()
{
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING || part_type == Type::Compact || parent_part)
@ -1655,11 +1575,6 @@ UInt64 IMergeTreeDataPart::readExistingRowsCount()
return existing_count;
}
void IMergeTreeDataPart::appendFilesOfRowsCount(Strings & files)
{
files.push_back("count.txt");
}
void IMergeTreeDataPart::loadTTLInfos()
{
if (auto in = metadata_manager->readIfExists("ttl.txt"))
@ -1686,11 +1601,6 @@ void IMergeTreeDataPart::loadTTLInfos()
}
void IMergeTreeDataPart::appendFilesOfTTLInfos(Strings & files)
{
files.push_back("ttl.txt");
}
void IMergeTreeDataPart::loadUUID()
{
if (auto in = metadata_manager->readIfExists(UUID_FILE_NAME))
@ -1701,11 +1611,6 @@ void IMergeTreeDataPart::loadUUID()
}
}
void IMergeTreeDataPart::appendFilesOfUUID(Strings & files)
{
files.push_back(UUID_FILE_NAME);
}
void IMergeTreeDataPart::loadColumns(bool require)
{
String path = fs::path(getDataPartStorage().getRelativePath()) / "columns.txt";
@ -1995,11 +1900,6 @@ bool IMergeTreeDataPart::assertHasValidVersionMetadata() const
}
}
void IMergeTreeDataPart::appendFilesOfColumns(Strings & files)
{
files.push_back("columns.txt");
files.push_back(SERIALIZATION_FILE_NAME);
}
bool IMergeTreeDataPart::shallParticipateInMerges(const StoragePolicyPtr & storage_policy) const
{

View File

@ -179,7 +179,6 @@ public:
/// Initialize columns (from columns.txt if exists, or create from column files if not).
/// Load various metadata into memory: checksums from checksums.txt, index if required, etc.
void loadColumnsChecksumsIndexes(bool require_columns_checksums, bool check_consistency);
void appendFilesOfColumnsChecksumsIndexes(Strings & files, bool include_projection = false) const;
void loadRowsCountFileForUnexpectedPart();
@ -700,20 +699,13 @@ private:
/// Reads part unique identifier (if exists) from uuid.txt
void loadUUID();
static void appendFilesOfUUID(Strings & files);
/// Reads columns names and types from columns.txt
void loadColumns(bool require);
static void appendFilesOfColumns(Strings & files);
static void appendFilesOfChecksums(Strings & files);
/// Loads marks index granularity into memory
virtual void loadIndexGranularity();
virtual void appendFilesOfIndexGranularity(Strings & files) const;
/// Loads the index file.
std::shared_ptr<Index> loadIndex() const;
@ -721,8 +713,6 @@ private:
template <typename Columns>
void optimizeIndexColumns(size_t marks_count, Columns & index_columns) const;
void appendFilesOfIndex(Strings & files) const;
/// Load rows count for this part from disk (for the newer storage format version).
/// For the older format version calculates rows count from the size of a column with a fixed size.
void loadRowsCount();
@ -731,21 +721,15 @@ private:
/// if load_existing_rows_count_for_old_parts and exclude_deleted_rows_for_part_size_in_merge are both enabled.
void loadExistingRowsCount();
static void appendFilesOfRowsCount(Strings & files);
/// Loads ttl infos in json format from file ttl.txt. If file doesn't exists assigns ttl infos with all zeros
void loadTTLInfos();
static void appendFilesOfTTLInfos(Strings & files);
void loadPartitionAndMinMaxIndex();
void calculateColumnsSizesOnDisk(std::optional<Block> columns_sample = std::nullopt);
void calculateSecondaryIndicesSizesOnDisk();
void appendFilesOfPartitionAndMinMaxIndex(Strings & files) const;
/// Load default compression codec from file default_compression_codec.txt
/// if it does not exist, tries to deduce the codec from a compressed column without
/// any special compression.
@ -757,10 +741,6 @@ private:
template <typename Writer>
void writeMetadata(const String & filename, const WriteSettings & settings, Writer && writer);
static void appendFilesOfDefaultCompressionCodec(Strings & files);
static void appendFilesOfMetadataVersion(Strings & files);
/// Found column without specific compression and return codec
/// for this column with default parameters.
CompressionCodecPtr detectDefaultCompressionCodec() const;

View File

@ -5,7 +5,7 @@
#include <Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h>
#include <Storages/ObjectStorage/DataLakes/HudiMetadata.h>
#include <Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h>
#include <Storages/ObjectStorage/DataLakes/IcebergMetadata.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h>
#include <Storages/ObjectStorage/HDFS/Configuration.h>
#include <Storages/ObjectStorage/Local/Configuration.h>
#include <Storages/ObjectStorage/S3/Configuration.h>
@ -45,11 +45,12 @@ public:
void update(ObjectStoragePtr object_storage, ContextPtr local_context) override
{
BaseStorageConfiguration::update(object_storage, local_context);
auto new_metadata = DataLakeMetadata::create(object_storage, weak_from_this(), local_context);
if (!current_metadata || (*current_metadata != *new_metadata))
bool existed = current_metadata != nullptr;
if (updateMetadataObjectIfNeeded(object_storage, local_context))
{
if (hasExternalDynamicMetadata())
if (hasExternalDynamicMetadata() && existed)
{
throw Exception(
ErrorCodes::FORMAT_VERSION_TOO_OLD,
@ -57,7 +58,6 @@ public:
}
else
{
current_metadata = std::move(new_metadata);
BaseStorageConfiguration::setPaths(current_metadata->getDataFiles());
BaseStorageConfiguration::setPartitionColumns(current_metadata->getPartitionColumns());
}
@ -99,14 +99,12 @@ public:
ColumnsDescription updateAndGetCurrentSchema(ObjectStoragePtr object_storage, ContextPtr context) override
{
BaseStorageConfiguration::update(object_storage, context);
auto new_metadata = DataLakeMetadata::create(object_storage, weak_from_this(), context);
if (!current_metadata || (*current_metadata != *new_metadata))
if (updateMetadataObjectIfNeeded(object_storage, context))
{
current_metadata = std::move(new_metadata);
BaseStorageConfiguration::setPaths(current_metadata->getDataFiles());
BaseStorageConfiguration::setPartitionColumns(current_metadata->getPartitionColumns());
}
return ColumnsDescription{current_metadata->getTableSchema()};
}
@ -137,20 +135,46 @@ private:
}
return info;
}
bool updateMetadataObjectIfNeeded(ObjectStoragePtr object_storage, ContextPtr context)
{
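/// Returns true if the metadata object was created or has changed since the last call.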
if (!current_metadata)
{
current_metadata = DataLakeMetadata::create(object_storage, weak_from_this(), context);
return true;
}
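/// Metadata implementations that support in-place updates (e.g. Iceberg) report themselves whether anything changed.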
if (current_metadata->supportsUpdate())
{
return current_metadata->update(context);
}
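/// Otherwise recreate the metadata and compare it with the current one.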
auto new_metadata = DataLakeMetadata::create(object_storage, weak_from_this(), context);
if (*current_metadata != *new_metadata)
{
current_metadata = std::move(new_metadata);
return true;
}
else
{
return false;
}
}
};
#if USE_AVRO
#if USE_AWS_S3
# if USE_AWS_S3
using StorageS3IcebergConfiguration = DataLakeConfiguration<StorageS3Configuration, IcebergMetadata>;
# endif
#endif
#if USE_AZURE_BLOB_STORAGE
using StorageAzureIcebergConfiguration = DataLakeConfiguration<StorageAzureConfiguration, IcebergMetadata>;
# endif
#endif
#if USE_HDFS
using StorageHDFSIcebergConfiguration = DataLakeConfiguration<StorageHDFSConfiguration, IcebergMetadata>;
# endif
#endif
using StorageLocalIcebergConfiguration = DataLakeConfiguration<StorageLocalConfiguration, IcebergMetadata>;
#endif
@ -158,7 +182,7 @@ using StorageLocalIcebergConfiguration = DataLakeConfiguration<StorageLocalConfi
#if USE_PARQUET
#if USE_AWS_S3
using StorageS3DeltaLakeConfiguration = DataLakeConfiguration<StorageS3Configuration, DeltaLakeMetadata>;
# endif
#endif
#endif
#if USE_AWS_S3

View File

@ -19,6 +19,8 @@ public:
virtual std::shared_ptr<NamesAndTypesList> getInitialSchemaByPath(const String &) const { return {}; }
virtual std::shared_ptr<const ActionsDAG> getSchemaTransformer(const String &) const { return {}; }
virtual bool supportsExternalMetadataChange() const { return false; }
virtual bool supportsUpdate() const { return false; }
virtual bool update(const ContextPtr &) { return false; }
};
using DataLakeMetadataPtr = std::unique_ptr<IDataLakeMetadata>;

View File

@ -0,0 +1,424 @@
#include "config.h"
#if USE_AVRO
#include <Columns/ColumnString.h>
#include <Columns/ColumnTuple.h>
#include <Columns/IColumn.h>
#include <Core/Settings.h>
#include <Formats/FormatFactory.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <Processors/Formats/Impl/AvroRowInputFormat.h>
#include <Storages/ObjectStorage/DataLakes/Common.h>
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
#include <Common/logger_useful.h>
#include "Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h"
#include "Storages/ObjectStorage/DataLakes/Iceberg/Utils.h"
#include "Storages/ObjectStorage/DataLakes/Iceberg/ManifestFileImpl.h"
#include "Storages/ObjectStorage/DataLakes/Iceberg/Snapshot.h"
namespace DB
{
namespace Setting
{
extern const SettingsBool allow_data_lake_dynamic_schema;
}
namespace ErrorCodes
{
extern const int FILE_DOESNT_EXIST;
extern const int ILLEGAL_COLUMN;
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
}
using namespace Iceberg;
std::pair<Int32, Poco::JSON::Object::Ptr>
parseTableSchemaFromManifestFile(const avro::DataFileReaderBase & manifest_file_reader, const String & manifest_file_name)
{
auto avro_metadata = manifest_file_reader.metadata();
auto avro_schema_it = avro_metadata.find("schema");
if (avro_schema_it == avro_metadata.end())
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Cannot read Iceberg table: manifest file {} doesn't have table schema in its metadata",
manifest_file_name);
std::vector<uint8_t> schema_json = avro_schema_it->second;
String schema_json_string = String(reinterpret_cast<char *>(schema_json.data()), schema_json.size());
Poco::JSON::Parser parser;
Poco::Dynamic::Var json = parser.parse(schema_json_string);
const Poco::JSON::Object::Ptr & schema_object = json.extract<Poco::JSON::Object::Ptr>();
Int32 schema_object_id = schema_object->getValue<int>("schema-id");
return {schema_object_id, schema_object};
}
IcebergMetadata::IcebergMetadata(
ObjectStoragePtr object_storage_,
ConfigurationObserverPtr configuration_,
const DB::ContextPtr & context_,
Int32 metadata_version_,
Int32 format_version_,
const Poco::JSON::Object::Ptr & object)
: WithContext(context_)
, object_storage(std::move(object_storage_))
, configuration(std::move(configuration_))
, schema_processor(IcebergSchemaProcessor())
, log(getLogger("IcebergMetadata"))
, current_metadata_version(metadata_version_)
, format_version(format_version_)
{
auto manifest_list_file = getRelevantManifestList(object);
if (manifest_list_file)
{
current_snapshot = getSnapshot(manifest_list_file.value());
}
current_schema_id = parseTableSchema(object, schema_processor, log);
}
std::pair<Poco::JSON::Object::Ptr, Int32> parseTableSchemaV2Method(const Poco::JSON::Object::Ptr & metadata_object)
{
Poco::JSON::Object::Ptr schema;
if (!metadata_object->has("current-schema-id"))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse Iceberg table schema: 'current-schema-id' field is missing in metadata");
auto current_schema_id = metadata_object->getValue<int>("current-schema-id");
if (!metadata_object->has("schemas"))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse Iceberg table schema: 'schemas' field is missing in metadata");
auto schemas = metadata_object->get("schemas").extract<Poco::JSON::Array::Ptr>();
if (schemas->size() == 0)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse Iceberg table schema: schemas field is empty");
for (uint32_t i = 0; i != schemas->size(); ++i)
{
auto current_schema = schemas->getObject(i);
if (!current_schema->has("schema-id"))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse Iceberg table schema: 'schema-id' field is missing in schema");
}
if (current_schema->getValue<int>("schema-id") == current_schema_id)
{
schema = current_schema;
break;
}
}
if (!schema)
throw Exception(ErrorCodes::BAD_ARGUMENTS, R"(There is no schema with "schema-id" that matches "current-schema-id" in metadata)");
if (schema->getValue<int>("schema-id") != current_schema_id)
throw Exception(ErrorCodes::BAD_ARGUMENTS, R"(Field "schema-id" of the schema doesn't match "current-schema-id" in metadata)");
return {schema, current_schema_id};
}
std::pair<Poco::JSON::Object::Ptr, Int32> parseTableSchemaV1Method(const Poco::JSON::Object::Ptr & metadata_object)
{
if (!metadata_object->has("schema"))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse Iceberg table schema: 'schema' field is missing in metadata");
Poco::JSON::Object::Ptr schema = metadata_object->getObject("schema");
if (!metadata_object->has("schema"))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse Iceberg table schema: 'schema-id' field is missing in schema");
auto current_schema_id = schema->getValue<int>("schema-id");
return {schema, current_schema_id};
}
Int32 IcebergMetadata::parseTableSchema(
const Poco::JSON::Object::Ptr & metadata_object, IcebergSchemaProcessor & schema_processor, LoggerPtr metadata_logger)
{
Int32 format_version = metadata_object->getValue<Int32>("format-version");
if (format_version == 2)
{
auto [schema, current_schema_id] = parseTableSchemaV2Method(metadata_object);
schema_processor.addIcebergTableSchema(schema);
return current_schema_id;
}
else
{
try
{
auto [schema, current_schema_id] = parseTableSchemaV1Method(metadata_object);
schema_processor.addIcebergTableSchema(schema);
return current_schema_id;
}
catch (const Exception & first_error)
{
if (first_error.code() != ErrorCodes::BAD_ARGUMENTS)
throw;
try
{
auto [schema, current_schema_id] = parseTableSchemaV2Method(metadata_object);
schema_processor.addIcebergTableSchema(schema);
LOG_WARNING(
metadata_logger,
"Iceberg table schema was parsed using v2 specification, but it was impossible to parse it using v1 "
"specification. Be "
"aware that you Iceberg writing engine violates Iceberg specification. Error during parsing {}",
first_error.displayText());
return current_schema_id;
}
catch (const Exception & second_error)
{
if (first_error.code() != ErrorCodes::BAD_ARGUMENTS)
throw;
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Cannot parse Iceberg table schema both with v1 and v2 methods. Old method error: {}. New method error: {}",
first_error.displayText(),
second_error.displayText());
}
}
}
}
/**
* Each version of table metadata is stored in a `metadata` directory and
* has one of 2 formats:
* 1) v<V>.metadata.json, where V - metadata version.
* 2) <V>-<random-uuid>.metadata.json, where V - metadata version
*/
std::pair<Int32, String>
getMetadataFileAndVersion(const ObjectStoragePtr & object_storage, const StorageObjectStorage::Configuration & configuration)
{
const auto metadata_files = listFiles(*object_storage, configuration, "metadata", ".metadata.json");
if (metadata_files.empty())
{
throw Exception(
ErrorCodes::FILE_DOESNT_EXIST, "The metadata file for Iceberg table with path {} doesn't exist", configuration.getPath());
}
std::vector<std::pair<UInt32, String>> metadata_files_with_versions;
metadata_files_with_versions.reserve(metadata_files.size());
for (const auto & path : metadata_files)
{
String file_name(path.begin() + path.find_last_of('/') + 1, path.end());
String version_str;
/// v<V>.metadata.json
if (file_name.starts_with('v'))
version_str = String(file_name.begin() + 1, file_name.begin() + file_name.find_first_of('.'));
/// <V>-<random-uuid>.metadata.json
else
version_str = String(file_name.begin(), file_name.begin() + file_name.find_first_of('-'));
if (!std::all_of(version_str.begin(), version_str.end(), isdigit))
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "Bad metadata file name: {}. Expected vN.metadata.json where N is a number", file_name);
metadata_files_with_versions.emplace_back(std::stoi(version_str), path);
}
/// Get the latest version of metadata file: v<V>.metadata.json
return *std::max_element(metadata_files_with_versions.begin(), metadata_files_with_versions.end());
}
Poco::JSON::Object::Ptr IcebergMetadata::readJSON(const String & metadata_file_path, const ContextPtr & local_context) const
{
StorageObjectStorageSource::ObjectInfo object_info(metadata_file_path);
auto buf = StorageObjectStorageSource::createReadBuffer(object_info, object_storage, local_context, log);
String json_str;
readJSONObjectPossiblyInvalid(json_str, *buf);
Poco::JSON::Parser parser; /// For some reason base/base/JSON.h can not parse this json file
Poco::Dynamic::Var json = parser.parse(json_str);
return json.extract<Poco::JSON::Object::Ptr>();
}
bool IcebergMetadata::update(const ContextPtr & local_context)
{
auto configuration_ptr = configuration.lock();
const auto [metadata_version, metadata_file_path] = getMetadataFileAndVersion(object_storage, *configuration_ptr);
if (metadata_version == current_metadata_version)
return false;
current_metadata_version = metadata_version;
auto metadata_object = readJSON(metadata_file_path, local_context);
chassert(format_version == metadata_object->getValue<int>("format-version"));
auto manifest_list_file = getRelevantManifestList(metadata_object);
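/// Rebuild the snapshot only if the manifest list actually changed; this also drops the cached list of data files.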
if (manifest_list_file && (!current_snapshot.has_value() || (manifest_list_file.value() != current_snapshot->getName())))
{
current_snapshot = getSnapshot(manifest_list_file.value());
cached_files_for_current_snapshot = std::nullopt;
}
current_schema_id = parseTableSchema(metadata_object, schema_processor, log);
return true;
}
std::optional<String> IcebergMetadata::getRelevantManifestList(const Poco::JSON::Object::Ptr & metadata)
{
auto configuration_ptr = configuration.lock();
auto snapshots = metadata->get("snapshots").extract<Poco::JSON::Array::Ptr>();
auto current_snapshot_id = metadata->getValue<Int64>("current-snapshot-id");
for (size_t i = 0; i < snapshots->size(); ++i)
{
const auto snapshot = snapshots->getObject(static_cast<UInt32>(i));
if (snapshot->getValue<Int64>("snapshot-id") == current_snapshot_id)
{
const auto path = snapshot->getValue<String>("manifest-list");
return std::filesystem::path(path).filename();
}
}
return std::nullopt;
}
std::optional<Int32> IcebergMetadata::getSchemaVersionByFileIfOutdated(String data_path) const
{
auto manifest_file_it = manifest_entry_by_data_file.find(data_path);
if (manifest_file_it == manifest_entry_by_data_file.end())
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot find schema version for data file: {}", data_path);
}
auto schema_id = manifest_file_it->second.getContent().getSchemaId();
if (schema_id == current_schema_id)
return std::nullopt;
return std::optional{schema_id};
}
DataLakeMetadataPtr IcebergMetadata::create(
const ObjectStoragePtr & object_storage, const ConfigurationObserverPtr & configuration, const ContextPtr & local_context)
{
auto configuration_ptr = configuration.lock();
const auto [metadata_version, metadata_file_path] = getMetadataFileAndVersion(object_storage, *configuration_ptr);
auto log = getLogger("IcebergMetadata");
StorageObjectStorageSource::ObjectInfo object_info(metadata_file_path);
auto buf = StorageObjectStorageSource::createReadBuffer(object_info, object_storage, local_context, log);
String json_str;
readJSONObjectPossiblyInvalid(json_str, *buf);
Poco::JSON::Parser parser; /// For some reason base/base/JSON.h can not parse this json file
Poco::Dynamic::Var json = parser.parse(json_str);
const Poco::JSON::Object::Ptr & object = json.extract<Poco::JSON::Object::Ptr>();
IcebergSchemaProcessor schema_processor;
auto format_version = object->getValue<int>("format-version");
auto ptr
= std::make_unique<IcebergMetadata>(object_storage, configuration_ptr, local_context, metadata_version, format_version, object);
return ptr;
}
ManifestList IcebergMetadata::initializeManifestList(const String & manifest_list_file) const
{
auto configuration_ptr = configuration.lock();
if (configuration_ptr == nullptr)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Configuration is expired");
auto context = getContext();
StorageObjectStorageSource::ObjectInfo object_info(
std::filesystem::path(configuration_ptr->getPath()) / "metadata" / manifest_list_file);
auto manifest_list_buf = StorageObjectStorageSource::createReadBuffer(object_info, object_storage, context, log);
auto manifest_list_file_reader
= std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*manifest_list_buf));
auto data_type = AvroSchemaReader::avroNodeToDataType(manifest_list_file_reader->dataSchema().root()->leafAt(0));
Block header{{data_type->createColumn(), data_type, "manifest_path"}};
auto columns = parseAvro(*manifest_list_file_reader, header, getFormatSettings(context));
auto & col = columns.at(0);
if (col->getDataType() != TypeIndex::String)
{
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"The parsed column from Avro file of `manifest_path` field should be String type, got {}",
col->getFamilyName());
}
const auto * col_str = typeid_cast<ColumnString *>(col.get());
std::vector<ManifestFileEntry> manifest_files;
for (size_t i = 0; i < col_str->size(); ++i)
{
const auto file_path = col_str->getDataAt(i).toView();
const auto filename = std::filesystem::path(file_path).filename();
String manifest_file = std::filesystem::path(configuration_ptr->getPath()) / "metadata" / filename;
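/// Reuse the manifest file content if it was already parsed and cached; otherwise parse it and add it to the cache.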
auto manifest_file_it = manifest_files_by_name.find(manifest_file);
if (manifest_file_it != manifest_files_by_name.end())
{
manifest_files.emplace_back(manifest_file_it);
continue;
}
manifest_files.emplace_back(initializeManifestFile(filename, configuration_ptr));
}
return ManifestList{manifest_files};
}
ManifestFileEntry IcebergMetadata::initializeManifestFile(const String & filename, const ConfigurationPtr & configuration_ptr) const
{
String manifest_file = std::filesystem::path(configuration_ptr->getPath()) / "metadata" / filename;
StorageObjectStorageSource::ObjectInfo manifest_object_info(manifest_file);
auto buffer = StorageObjectStorageSource::createReadBuffer(manifest_object_info, object_storage, getContext(), log);
auto manifest_file_reader = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*buffer));
auto [schema_id, schema_object] = parseTableSchemaFromManifestFile(*manifest_file_reader, filename);
auto manifest_file_impl = std::make_unique<ManifestFileContentImpl>(
std::move(manifest_file_reader), format_version, configuration_ptr->getPath(), getFormatSettings(getContext()), schema_id);
auto [manifest_file_iterator, _inserted]
= manifest_files_by_name.emplace(manifest_file, ManifestFileContent(std::move(manifest_file_impl)));
ManifestFileEntry manifest_file_entry{manifest_file_iterator};
for (const auto & data_file : manifest_file_entry.getContent().getDataFiles())
{
manifest_entry_by_data_file.emplace(data_file.data_file_name, manifest_file_entry);
}
schema_processor.addIcebergTableSchema(schema_object);
return manifest_file_entry;
}
IcebergSnapshot IcebergMetadata::getSnapshot(const String & manifest_list_file) const
{
const auto manifest_list_file_it = manifest_lists_by_name.find(manifest_list_file);
if (manifest_list_file_it != manifest_lists_by_name.end())
return IcebergSnapshot(manifest_list_file_it);
return IcebergSnapshot{manifest_lists_by_name.emplace(manifest_list_file, initializeManifestList(manifest_list_file)).first};
}
Strings IcebergMetadata::getDataFiles() const
{
if (!current_snapshot)
{
return {};
}
if (cached_files_for_current_snapshot.has_value())
{
return cached_files_for_current_snapshot.value();
}
Strings data_files;
for (const auto & manifest_entry : current_snapshot->getManifestList().getManifestFiles())
{
for (const auto & data_file : manifest_entry.getContent().getDataFiles())
{
if (data_file.status != ManifestEntryStatus::DELETED)
{
data_files.push_back(data_file.data_file_name);
}
}
}
cached_files_for_current_snapshot.emplace(std::move(data_files));
return cached_files_for_current_snapshot.value();
}
}
#endif

View File

@ -0,0 +1,129 @@
#pragma once
#include "config.h"
#if USE_AVRO
#include <Core/Types.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Interpreters/Context_fwd.h>
#include <Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Poco/JSON/Array.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
#include "Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h"
#include "Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.h"
#include "Storages/ObjectStorage/DataLakes/Iceberg/Snapshot.h"
#include <unordered_map>
namespace DB
{
class IcebergMetadata : public IDataLakeMetadata, private WithContext
{
public:
using ConfigurationObserverPtr = StorageObjectStorage::ConfigurationObserverPtr;
using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr;
static constexpr auto name = "Iceberg";
IcebergMetadata(
ObjectStoragePtr object_storage_,
ConfigurationObserverPtr configuration_,
const DB::ContextPtr & context_,
Int32 metadata_version_,
Int32 format_version_,
const Poco::JSON::Object::Ptr & object);
/// Get data files. On the first request it reads the manifest_list file and iterates through manifest files to find all data files.
/// All subsequent calls made while the same snapshot is relevant return the saved list of files (it cannot change
/// without a change of the metadata file). The cached list is dropped on every snapshot update.
Strings getDataFiles() const override;
/// Get table schema parsed from metadata.
NamesAndTypesList getTableSchema() const override { return *schema_processor.getClickhouseTableSchemaById(current_schema_id); }
const std::unordered_map<String, String> & getColumnNameToPhysicalNameMapping() const override { return column_name_to_physical_name; }
const DataLakePartitionColumns & getPartitionColumns() const override { return partition_columns; }
bool operator==(const IDataLakeMetadata & other) const override
{
const auto * iceberg_metadata = dynamic_cast<const IcebergMetadata *>(&other);
return iceberg_metadata && getVersion() == iceberg_metadata->getVersion();
}
static DataLakeMetadataPtr
create(const ObjectStoragePtr & object_storage, const ConfigurationObserverPtr & configuration, const ContextPtr & local_context);
size_t getVersion() const { return current_metadata_version; }
std::shared_ptr<NamesAndTypesList> getInitialSchemaByPath(const String & data_path) const override
{
auto version_if_outdated = getSchemaVersionByFileIfOutdated(data_path);
return version_if_outdated.has_value() ? schema_processor.getClickhouseTableSchemaById(version_if_outdated.value()) : nullptr;
}
std::shared_ptr<const ActionsDAG> getSchemaTransformer(const String & data_path) const override
{
auto version_if_outdated = getSchemaVersionByFileIfOutdated(data_path);
return version_if_outdated.has_value()
? schema_processor.getSchemaTransformationDagByIds(version_if_outdated.value(), current_schema_id)
: nullptr;
}
bool supportsExternalMetadataChange() const override { return true; }
static Int32
parseTableSchema(const Poco::JSON::Object::Ptr & metadata_object, IcebergSchemaProcessor & schema_processor, LoggerPtr metadata_logger);
bool supportsUpdate() const override { return true; }
bool update(const ContextPtr & local_context) override;
private:
using ManifestEntryByDataFile = std::unordered_map<String, Iceberg::ManifestFileEntry>;
const ObjectStoragePtr object_storage;
const ConfigurationObserverPtr configuration;
mutable IcebergSchemaProcessor schema_processor;
LoggerPtr log;
mutable Iceberg::ManifestFilesByName manifest_files_by_name;
mutable Iceberg::ManifestListsByName manifest_lists_by_name;
mutable ManifestEntryByDataFile manifest_entry_by_data_file;
Int32 current_metadata_version;
Int32 format_version;
Int32 current_schema_id;
std::optional<Iceberg::IcebergSnapshot> current_snapshot;
mutable std::optional<Strings> cached_files_for_current_snapshot;
Iceberg::ManifestList initializeManifestList(const String & manifest_list_file) const;
Iceberg::IcebergSnapshot getSnapshot(const String & manifest_list_file) const;
std::optional<Int32> getSchemaVersionByFileIfOutdated(String data_path) const;
Iceberg::ManifestFileEntry getManifestFile(const String & manifest_file) const;
Iceberg::ManifestFileEntry initializeManifestFile(const String & filename, const ConfigurationPtr & configuration_ptr) const;
std::optional<String> getRelevantManifestList(const Poco::JSON::Object::Ptr & metadata);
Poco::JSON::Object::Ptr readJSON(const String & metadata_file_path, const ContextPtr & local_context) const;
/// Fields are needed only for providing dynamic polymorphism
std::unordered_map<String, String> column_name_to_physical_name;
DataLakePartitionColumns partition_columns;
};
}
#endif

View File

@ -0,0 +1,167 @@
#include <unordered_set>
#include "config.h"
#if USE_AVRO
#include "Storages/ObjectStorage/DataLakes/Iceberg/ManifestFileImpl.h"
#include "Storages/ObjectStorage/DataLakes/Iceberg/Utils.h"
#include <Columns/ColumnTuple.h>
#include "DataTypes/DataTypeTuple.h"
namespace DB::ErrorCodes
{
extern const int ILLEGAL_COLUMN;
extern const int BAD_ARGUMENTS;
extern const int UNSUPPORTED_METHOD;
}
namespace Iceberg
{
const std::vector<DataFileEntry> & ManifestFileContent::getDataFiles() const
{
return impl->data_files;
}
Int32 ManifestFileContent::getSchemaId() const
{
return impl->schema_id;
}
ManifestFileContent::ManifestFileContent(std::unique_ptr<ManifestFileContentImpl> impl_) : impl(std::move(impl_))
{
}
using namespace DB;
ManifestFileContentImpl::ManifestFileContentImpl(
std::unique_ptr<avro::DataFileReaderBase> manifest_file_reader_,
Int32 format_version_,
const String & common_path,
const DB::FormatSettings & format_settings,
Int32 schema_id_)
{
this->schema_id = schema_id_;
avro::NodePtr root_node = manifest_file_reader_->dataSchema().root();
size_t leaves_num = root_node->leaves();
size_t expected_min_num = format_version_ == 1 ? 3 : 2;
if (leaves_num < expected_min_num)
{
throw Exception(
DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected number of columns {}. Expected at least {}", root_node->leaves(), expected_min_num);
}
avro::NodePtr status_node = root_node->leafAt(0);
if (status_node->type() != avro::Type::AVRO_INT)
{
throw Exception(
DB::ErrorCodes::ILLEGAL_COLUMN,
"The parsed column from Avro file of `status` field should be Int type, got {}",
magic_enum::enum_name(status_node->type()));
}
avro::NodePtr data_file_node = root_node->leafAt(static_cast<int>(leaves_num) - 1);
if (data_file_node->type() != avro::Type::AVRO_RECORD)
{
throw Exception(
DB::ErrorCodes::ILLEGAL_COLUMN,
"The parsed column from Avro file of `data_file` field should be Tuple type, got {}",
magic_enum::enum_name(data_file_node->type()));
}
auto status_col_data_type = AvroSchemaReader::avroNodeToDataType(status_node);
auto data_col_data_type = AvroSchemaReader::avroNodeToDataType(data_file_node);
Block manifest_file_header
= {{status_col_data_type->createColumn(), status_col_data_type, "status"},
{data_col_data_type->createColumn(), data_col_data_type, "data_file"}};
auto columns = parseAvro(*manifest_file_reader_, manifest_file_header, format_settings);
if (columns.size() != 2)
throw Exception(DB::ErrorCodes::ILLEGAL_COLUMN, "Unexpected number of columns. Expected 2, got {}", columns.size());
if (columns.at(0)->getDataType() != TypeIndex::Int32)
{
throw Exception(
DB::ErrorCodes::ILLEGAL_COLUMN,
"The parsed column from Avro file of `status` field should be Int32 type, got {}",
columns.at(0)->getFamilyName());
}
if (columns.at(1)->getDataType() != TypeIndex::Tuple)
{
throw Exception(
DB::ErrorCodes::ILLEGAL_COLUMN,
"The parsed column from Avro file of `file_path` field should be Tuple type, got {}",
columns.at(1)->getFamilyName());
}
const auto * status_int_column = assert_cast<DB::ColumnInt32 *>(columns.at(0).get());
const auto & data_file_tuple_type = assert_cast<const DataTypeTuple &>(*data_col_data_type.get());
const auto * data_file_tuple_column = assert_cast<DB::ColumnTuple *>(columns.at(1).get());
if (status_int_column->size() != data_file_tuple_column->size())
{
throw Exception(
DB::ErrorCodes::ILLEGAL_COLUMN,
"The parsed column from Avro file of `file_path` and `status` have different rows number: {} and {}",
status_int_column->size(),
data_file_tuple_column->size());
}
ColumnPtr file_path_column = data_file_tuple_column->getColumnPtr(data_file_tuple_type.getPositionByName("file_path"));
if (file_path_column->getDataType() != TypeIndex::String)
{
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"The parsed column from Avro file of `file_path` field should be String type, got {}",
file_path_column->getFamilyName());
}
const auto * file_path_string_column = assert_cast<const ColumnString *>(file_path_column.get());
ColumnPtr content_column;
const ColumnInt32 * content_int_column = nullptr;
if (format_version_ == 2)
{
content_column = data_file_tuple_column->getColumnPtr(data_file_tuple_type.getPositionByName("content"));
if (content_column->getDataType() != TypeIndex::Int32)
{
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"The parsed column from Avro file of `content` field should be Int type, got {}",
content_column->getFamilyName());
}
content_int_column = assert_cast<const ColumnInt32 *>(content_column.get());
}
for (size_t i = 0; i < data_file_tuple_column->size(); ++i)
{
DataFileContent content_type = DataFileContent::DATA;
if (format_version_ == 2)
{
content_type = DataFileContent(content_int_column->getElement(i));
if (content_type != DataFileContent::DATA)
throw Exception(
ErrorCodes::UNSUPPORTED_METHOD, "Cannot read Iceberg table: positional and equality deletes are not supported");
}
const auto status = ManifestEntryStatus(status_int_column->getInt(i));
const auto data_path = std::string(file_path_string_column->getDataAt(i).toView());
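/// Data file paths in the manifest may include a storage prefix; keep only the part starting from the table path (common_path).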
const auto pos = data_path.find(common_path);
if (pos == std::string::npos)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected to find {} in data path: {}", common_path, data_path);
const auto file_path = data_path.substr(pos);
this->data_files.push_back({file_path, status, content_type});
}
}
}
#endif

View File

@ -0,0 +1,66 @@
#pragma once
#include "config.h"
#if USE_AVRO
#include <cstdint>
#include <Common/Exception.h>
namespace Iceberg
{
struct ManifestFileContentImpl;
enum class ManifestEntryStatus : uint8_t
{
EXISTING = 0,
ADDED = 1,
DELETED = 2,
};
enum class DataFileContent : uint8_t
{
DATA = 0,
POSITION_DELETES = 1,
EQUALITY_DELETES = 2,
};
struct DataFileEntry
{
String data_file_name;
ManifestEntryStatus status;
DataFileContent content;
};
class ManifestFileContent
{
public:
explicit ManifestFileContent(std::unique_ptr<ManifestFileContentImpl> impl_);
const std::vector<DataFileEntry> & getDataFiles() const;
Int32 getSchemaId() const;
private:
std::unique_ptr<ManifestFileContentImpl> impl;
};
using ManifestFilesByName = std::map<String, ManifestFileContent>;
struct ManifestFileEntry
{
explicit ManifestFileEntry(const ManifestFilesByName::const_iterator & reference_) : reference(reference_) { }
const ManifestFileContent & getContent() const { return reference->second; }
const String & getName() const { return reference->first; }
private:
ManifestFilesByName::const_iterator reference;
};
}
#endif

View File

@ -0,0 +1,54 @@
#pragma once
#include "config.h"
#if USE_AVRO
#include <Processors/Formats/Impl/AvroRowInputFormat.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h>
namespace Iceberg
{
/**
* Manifest file has the following format: '/iceberg_data/db/table_name/metadata/c87bfec7-d36c-4075-ad04-600b6b0f2020-m0.avro'
*
* `manifest file` is different in format version V1 and V2 and has the following contents:
*                          v1     v2
* status                   req    req
* snapshot_id              req    opt
* sequence_number                 opt
* file_sequence_number            opt
* data_file                req    req
* Example format version V1:
* status    snapshot_id            data_file
* 1 2819310504515118887 ('/iceberg_data/db/table_name/data/00000-1-3edca534-15a0-4f74-8a28-4733e0bf1270-00001.parquet','PARQUET',(),100,1070,67108864,[(1,233),(2,210)],[(1,100),(2,100)],[(1,0),(2,0)],[],[(1,'\0'),(2,'0')],[(1,'c'),(2,'99')],NULL,[4],0)
*
* Example format version V2:
* status    snapshot_id            sequence_number    file_sequence_number    data_file
* 1 5887006101709926452 (0,'/iceberg_data/db/table_name/data/00000-1-c8045c90-8799-4eac-b957-79a0484e223c-00001.parquet','PARQUET',(),100,1070,[(1,233),(2,210)],[(1,100),(2,100)],[(1,0),(2,0)],[],[(1,'\0'),(2,'0')],[(1,'c'),(2,'99')],NULL,[4],[],0)
*
* In case of partitioned data we'll have extra directory partition=value:
* status    snapshot_id            data_file
* 1 2252246380142525104 ('/iceberg_data/db/table_name/data/a=0/00000-1-c9535a00-2f4f-405c-bcfa-6d4f9f477235-00001.parquet','PARQUET',(0),1,631,67108864,[(1,46),(2,48)],[(1,1),(2,1)],[(1,0),(2,0)],[],[(1,'\0\0\0\0\0\0\0\0'),(2,'1')],[(1,'\0\0\0\0\0\0\0\0'),(2,'1')],NULL,[4],0)
* 1 2252246380142525104 ('/iceberg_data/db/table_name/data/a=1/00000-1-c9535a00-2f4f-405c-bcfa-6d4f9f477235-00002.parquet','PARQUET',(1),1,631,67108864,[(1,46),(2,48)],[(1,1),(2,1)],[(1,0),(2,0)],[],[(1,'\0\0\0\0\0\0\0'),(2,'2')],[(1,'\0\0\0\0\0\0\0'),(2,'2')],NULL,[4],0)
* 1 2252246380142525104 ('/iceberg_data/db/table_name/data/a=2/00000-1-c9535a00-2f4f-405c-bcfa-6d4f9f477235-00003.parquet','PARQUET',(2),1,631,67108864,[(1,46),(2,48)],[(1,1),(2,1)],[(1,0),(2,0)],[],[(1,'\0\0\0\0\0\0\0'),(2,'3')],[(1,'\0\0\0\0\0\0\0'),(2,'3')],NULL,[4],0)
*
*/
struct ManifestFileContentImpl
{
public:
explicit ManifestFileContentImpl(
std::unique_ptr<avro::DataFileReaderBase> manifest_file_reader_,
Int32 format_version_,
const String & common_path,
const DB::FormatSettings & format_settings,
Int32 schema_id_);
Int32 schema_id;
std::vector<DataFileEntry> data_files;
};
}
#endif

View File

@ -0,0 +1,357 @@
#include <Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.h>
#include <Poco/JSON/Array.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
#include <IO/ReadBufferFromString.h>
#include <Common/Exception.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypesNumber.h>
#include <Formats/FormatFactory.h>
#include <IO/ReadHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
extern const int UNSUPPORTED_METHOD;
}
namespace
{
bool operator==(const Poco::JSON::Object & first, const Poco::JSON::Object & second)
{
std::stringstream first_string_stream; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
std::stringstream second_string_stream; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
first.stringify(first_string_stream);
if (!first_string_stream)
{
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "JSON Parsing failed");
}
second.stringify(second_string_stream);
if (!second_string_stream)
{
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "JSON Parsing failed");
}
return first_string_stream.str() == second_string_stream.str();
}
std::pair<size_t, size_t> parseDecimal(const String & type_name)
{
DB::ReadBufferFromString buf(std::string_view(type_name.begin() + 8, type_name.end() - 1));
size_t precision;
size_t scale;
readIntText(precision, buf);
skipWhitespaceIfAny(buf);
assertChar(',', buf);
skipWhitespaceIfAny(buf);
tryReadIntText(scale, buf);
return {precision, scale};
}
}
void IcebergSchemaProcessor::addIcebergTableSchema(Poco::JSON::Object::Ptr schema_ptr)
{
Int32 schema_id = schema_ptr->getValue<Int32>("schema-id");
if (iceberg_table_schemas_by_ids.contains(schema_id))
{
chassert(clickhouse_table_schemas_by_ids.contains(schema_id));
chassert(*iceberg_table_schemas_by_ids.at(schema_id) == *schema_ptr);
}
else
{
iceberg_table_schemas_by_ids[schema_id] = schema_ptr;
auto fields = schema_ptr->get("fields").extract<Poco::JSON::Array::Ptr>();
auto clickhouse_schema = std::make_shared<NamesAndTypesList>();
for (size_t i = 0; i != fields->size(); ++i)
{
auto field = fields->getObject(static_cast<UInt32>(i));
auto name = field->getValue<String>("name");
bool required = field->getValue<bool>("required");
clickhouse_schema->push_back(NameAndTypePair{name, getFieldType(field, "type", required)});
}
clickhouse_table_schemas_by_ids[schema_id] = clickhouse_schema;
}
}
DataTypePtr IcebergSchemaProcessor::getSimpleType(const String & type_name)
{
if (type_name == "boolean")
return DataTypeFactory::instance().get("Bool");
if (type_name == "int")
return std::make_shared<DataTypeInt32>();
if (type_name == "long")
return std::make_shared<DataTypeInt64>();
if (type_name == "float")
return std::make_shared<DataTypeFloat32>();
if (type_name == "double")
return std::make_shared<DataTypeFloat64>();
if (type_name == "date")
return std::make_shared<DataTypeDate>();
if (type_name == "time")
return std::make_shared<DataTypeInt64>();
if (type_name == "timestamp")
return std::make_shared<DataTypeDateTime64>(6);
if (type_name == "timestamptz")
return std::make_shared<DataTypeDateTime64>(6, "UTC");
if (type_name == "string" || type_name == "binary")
return std::make_shared<DataTypeString>();
if (type_name == "uuid")
return std::make_shared<DataTypeUUID>();
if (type_name.starts_with("fixed[") && type_name.ends_with(']'))
{
ReadBufferFromString buf(std::string_view(type_name.begin() + 6, type_name.end() - 1));
size_t n;
readIntText(n, buf);
return std::make_shared<DataTypeFixedString>(n);
}
if (type_name.starts_with("decimal(") && type_name.ends_with(')'))
{
ReadBufferFromString buf(std::string_view(type_name.begin() + 8, type_name.end() - 1));
auto [precision, scale] = parseDecimal(type_name);
return createDecimal<DataTypeDecimal>(precision, scale);
}
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown Iceberg type: {}", type_name);
}
DataTypePtr IcebergSchemaProcessor::getComplexTypeFromObject(const Poco::JSON::Object::Ptr & type)
{
String type_name = type->getValue<String>("type");
if (type_name == "list")
{
bool element_required = type->getValue<bool>("element-required");
auto element_type = getFieldType(type, "element", element_required);
return std::make_shared<DataTypeArray>(element_type);
}
if (type_name == "map")
{
auto key_type = getFieldType(type, "key", true);
auto value_required = type->getValue<bool>("value-required");
auto value_type = getFieldType(type, "value", value_required);
return std::make_shared<DataTypeMap>(key_type, value_type);
}
if (type_name == "struct")
{
DataTypes element_types;
Names element_names;
auto fields = type->get("fields").extract<Poco::JSON::Array::Ptr>();
element_types.reserve(fields->size());
element_names.reserve(fields->size());
for (size_t i = 0; i != fields->size(); ++i)
{
auto field = fields->getObject(static_cast<Int32>(i));
element_names.push_back(field->getValue<String>("name"));
auto required = field->getValue<bool>("required");
element_types.push_back(getFieldType(field, "type", required));
}
return std::make_shared<DataTypeTuple>(element_types, element_names);
}
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown Iceberg type: {}", type_name);
}
DataTypePtr IcebergSchemaProcessor::getFieldType(const Poco::JSON::Object::Ptr & field, const String & type_key, bool required)
{
if (field->isObject(type_key))
return getComplexTypeFromObject(field->getObject(type_key));
auto type = field->get(type_key);
if (type.isString())
{
const String & type_name = type.extract<String>();
auto data_type = getSimpleType(type_name);
return required ? data_type : makeNullable(data_type);
}
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected 'type' field: {}", type.toString());
}
/**
* Iceberg allows only three types of primitive type conversion:
* int -> long
* float -> double
* decimal(P, S) -> decimal(P', S) where P' > P
* This function checks whether `old_type` and `new_type` satisfy one of these conditions.
**/
bool IcebergSchemaProcessor::allowPrimitiveTypeConversion(const String & old_type, const String & new_type)
{
bool allowed_type_conversion = (old_type == new_type);
allowed_type_conversion |= (old_type == "int") && (new_type == "long");
allowed_type_conversion |= (old_type == "float") && (new_type == "double");
if (old_type.starts_with("decimal(") && old_type.ends_with(')') && new_type.starts_with("decimal(") && new_type.ends_with(")"))
{
auto [old_precision, old_scale] = parseDecimal(old_type);
auto [new_precision, new_scale] = parseDecimal(new_type);
allowed_type_conversion |= (old_precision <= new_precision) && (old_scale == new_scale);
}
return allowed_type_conversion;
}
// Ids are passed only for error logging purposes
std::shared_ptr<ActionsDAG> IcebergSchemaProcessor::getSchemaTransformationDag(
const Poco::JSON::Object::Ptr & old_schema, const Poco::JSON::Object::Ptr & new_schema, Int32 old_id, Int32 new_id)
{
std::unordered_map<size_t, std::pair<Poco::JSON::Object::Ptr, const ActionsDAG::Node *>> old_schema_entries;
auto old_schema_fields = old_schema->get("fields").extract<Poco::JSON::Array::Ptr>();
std::shared_ptr<ActionsDAG> dag = std::make_shared<ActionsDAG>();
auto & outputs = dag->getOutputs();
for (size_t i = 0; i != old_schema_fields->size(); ++i)
{
auto field = old_schema_fields->getObject(static_cast<UInt32>(i));
size_t id = field->getValue<size_t>("id");
auto name = field->getValue<String>("name");
bool required = field->getValue<bool>("required");
old_schema_entries[id] = {field, &dag->addInput(name, getFieldType(field, "type", required))};
}
auto new_schema_fields = new_schema->get("fields").extract<Poco::JSON::Array::Ptr>();
for (size_t i = 0; i != new_schema_fields->size(); ++i)
{
auto field = new_schema_fields->getObject(static_cast<UInt32>(i));
size_t id = field->getValue<size_t>("id");
auto name = field->getValue<String>("name");
bool required = field->getValue<bool>("required");
auto type = getFieldType(field, "type", required);
auto old_node_it = old_schema_entries.find(id);
if (old_node_it != old_schema_entries.end())
{
auto [old_json, old_node] = old_node_it->second;
if (field->isObject("type"))
{
if (*old_json != *field)
{
throw Exception(
ErrorCodes::UNSUPPORTED_METHOD,
"Schema evolution is not supported for complex types yet, field id is {}, old schema id is {}, new schema id "
"is {}",
id,
old_id,
new_id);
}
else
{
outputs.push_back(old_node);
}
}
else
{
if (old_json->isObject("type"))
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Can't cast primitive type to the complex type, field id is {}, old schema id is {}, new schema id is {}",
id,
old_id,
new_id);
}
String old_type = old_json->getValue<String>("type");
String new_type = field->getValue<String>("type");
const ActionsDAG::Node * node = old_node;
if (old_type == new_type)
{
if (old_json->getValue<String>("name") != name)
{
node = &dag->addAlias(*old_node, name);
}
}
else if (allowPrimitiveTypeConversion(old_type, new_type))
{
node = &dag->addCast(*old_node, getFieldType(field, "type", required), name);
}
outputs.push_back(node);
}
}
else
{
if (field->isObject("type"))
{
throw Exception(
ErrorCodes::UNSUPPORTED_METHOD,
"Adding a default column with id {} and complex type is not supported yet. Old schema id is {}, new schema id is "
"{}",
id,
old_id,
new_id);
}
if (!type->isNullable())
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot add a column with id {} with required values to the table during schema evolution. This is forbidden by "
"Iceberg format specification. Old schema id is {}, new "
"schema id is {}",
id,
old_id,
new_id);
}
ColumnPtr default_type_column = type->createColumnConstWithDefaultValue(0);
const auto & constant = dag->addColumn({default_type_column, type, name});
outputs.push_back(&constant);
}
}
return dag;
}
std::shared_ptr<const ActionsDAG> IcebergSchemaProcessor::getSchemaTransformationDagByIds(Int32 old_id, Int32 new_id)
{
if (old_id == new_id)
{
return nullptr;
}
std::lock_guard lock(mutex);
auto required_transform_dag_it = transform_dags_by_ids.find({old_id, new_id});
if (required_transform_dag_it != transform_dags_by_ids.end())
{
return required_transform_dag_it->second;
}
auto old_schema_it = iceberg_table_schemas_by_ids.find(old_id);
if (old_schema_it == iceberg_table_schemas_by_ids.end())
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Schema with schema-id {} is unknown", old_id);
}
auto new_schema_it = iceberg_table_schemas_by_ids.find(new_id);
if (new_schema_it == iceberg_table_schemas_by_ids.end())
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Schema with schema-id {} is unknown", new_id);
}
return transform_dags_by_ids[{old_id, new_id}]
= getSchemaTransformationDag(old_schema_it->second, new_schema_it->second, old_id, new_id);
}
std::shared_ptr<NamesAndTypesList> IcebergSchemaProcessor::getClickhouseTableSchemaById(Int32 id)
{
auto it = clickhouse_table_schemas_by_ids.find(id);
if (it == clickhouse_table_schemas_by_ids.end())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Schema with id {} is unknown", id);
return it->second;
}
}
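
As a hedged illustration of how IcebergSchemaProcessor might be driven (the real call sites live in the Iceberg metadata reader, which is not part of this file): every schema that appears in metadata.json is registered first, and a transformation DAG is requested when a data file was written under an older schema-id than the one currently being read.

```cpp
// Sketch only: `old_schema_json` and `new_schema_json` are assumed to be parsed
// entries from the "schemas" array of an Iceberg metadata.json file.
void exampleSchemaEvolution(
    DB::IcebergSchemaProcessor & processor,
    Poco::JSON::Object::Ptr old_schema_json,
    Poco::JSON::Object::Ptr new_schema_json)
{
    processor.addIcebergTableSchema(old_schema_json);
    processor.addIcebergTableSchema(new_schema_json);

    const Int32 old_id = old_schema_json->getValue<Int32>("schema-id");
    const Int32 new_id = new_schema_json->getValue<Int32>("schema-id");

    // Returns nullptr when the ids match, i.e. no transformation is needed.
    if (auto dag = processor.getSchemaTransformationDagByIds(old_id, new_id))
    {
        // The DAG renames columns, applies the allowed primitive casts and adds
        // defaulted Nullable columns so that rows read with the old schema fit
        // the ClickHouse schema derived from the new one.
        auto target_schema = processor.getClickhouseTableSchemaById(new_id);
        (void)target_schema;
        (void)dag;
    }
}
```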

View File

@ -0,0 +1,100 @@
#pragma once
#include <memory>
#include <mutex>
#include "config.h"
#include <Core/NamesAndTypes.h>
#include <Core/Types.h>
#include <Interpreters/ActionsDAG.h>
#include <Poco/JSON/Array.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
#include <unordered_map>
namespace DB
{
/**
* Iceberg supports the following data types (see https://iceberg.apache.org/spec/#schemas-and-data-types):
* - Primitive types:
* - boolean
* - int
* - long
* - float
* - double
* - decimal(P, S)
* - date
* - time (time of day in microseconds since midnight)
* - timestamp (in microseconds since 1970-01-01)
* - timestamptz (timestamp with timezone, stores values in UTC timezone)
* - string
* - uuid
* - fixed(L) (fixed-length byte array of length L)
* - binary
* - Complex types:
* - struct(field1: Type1, field2: Type2, ...) (tuple of typed values)
* - list(nested_type)
* - map(Key, Value)
*
* Example of table schema in metadata:
* {
*   "type" : "struct",
*   "schema-id" : 0,
*   "fields" : [
*     {
*       "id" : 1,
*       "name" : "id",
*       "required" : false,
*       "type" : "long"
*     },
*     {
*       "id" : 2,
*       "name" : "array",
*       "required" : false,
*       "type" : {
*         "type" : "list",
*         "element-id" : 5,
*         "element" : "int",
*         "element-required" : false
*       }
*     },
*     {
*       "id" : 3,
*       "name" : "data",
*       "required" : false,
*       "type" : "binary"
*     }
*   ]
* }
*/
class IcebergSchemaProcessor
{
using Node = ActionsDAG::Node;
public:
void addIcebergTableSchema(Poco::JSON::Object::Ptr schema_ptr);
std::shared_ptr<NamesAndTypesList> getClickhouseTableSchemaById(Int32 id);
std::shared_ptr<const ActionsDAG> getSchemaTransformationDagByIds(Int32 old_id, Int32 new_id);
private:
std::unordered_map<Int32, Poco::JSON::Object::Ptr> iceberg_table_schemas_by_ids;
std::unordered_map<Int32, std::shared_ptr<NamesAndTypesList>> clickhouse_table_schemas_by_ids;
std::map<std::pair<Int32, Int32>, std::shared_ptr<ActionsDAG>> transform_dags_by_ids;
NamesAndTypesList getSchemaType(const Poco::JSON::Object::Ptr & schema);
DataTypePtr getComplexTypeFromObject(const Poco::JSON::Object::Ptr & type);
DataTypePtr getFieldType(const Poco::JSON::Object::Ptr & field, const String & type_key, bool required);
DataTypePtr getSimpleType(const String & type_name);
bool allowPrimitiveTypeConversion(const String & old_type, const String & new_type);
const Node * getDefaultNodeForField(const Poco::JSON::Object::Ptr & field);
std::shared_ptr<ActionsDAG> getSchemaTransformationDag(
const Poco::JSON::Object::Ptr & old_schema, const Poco::JSON::Object::Ptr & new_schema, Int32 old_id, Int32 new_id);
std::mutex mutex;
};
}
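
Based on getFieldType/getSimpleType in the implementation above, `"required" : false` wraps the mapped ClickHouse type in Nullable (complex types keep their own nested nullability), so the example schema from the comment would come out roughly as shown in the sketch below; the printing helper itself is illustrative:

```cpp
#include <iostream>

// Sketch only: dump the ClickHouse schema derived from one Iceberg schema JSON.
// `schema_json` is assumed to be the struct shown in the comment above (schema-id 0).
void printDerivedSchema(DB::IcebergSchemaProcessor & processor, Poco::JSON::Object::Ptr schema_json)
{
    processor.addIcebergTableSchema(schema_json);
    const auto names_and_types = processor.getClickhouseTableSchemaById(schema_json->getValue<Int32>("schema-id"));
    for (const auto & name_and_type : *names_and_types)
        std::cout << name_and_type.name << " : " << name_and_type.type->getName() << '\n';

    // Expected output, since all three fields have "required" : false:
    //   id    : Nullable(Int64)
    //   array : Array(Nullable(Int32))
    //   data  : Nullable(String)
}
```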

View File

@ -0,0 +1,36 @@
#pragma once
#include "config.h"
#if USE_AVRO
# include "Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h"
namespace Iceberg
{
class ManifestList
{
public:
explicit ManifestList(std::vector<ManifestFileEntry> manifest_files_) : manifest_files(std::move(manifest_files_)) { }
const std::vector<ManifestFileEntry> & getManifestFiles() const { return manifest_files; }
private:
std::vector<ManifestFileEntry> manifest_files;
};
using ManifestListsByName = std::map<String, ManifestList>;
class IcebergSnapshot
{
public:
explicit IcebergSnapshot(const ManifestListsByName::const_iterator & reference_) : reference(reference_) { }
const ManifestList & getManifestList() const { return reference->second; }
const String & getName() const { return reference->first; }
private:
ManifestListsByName::const_iterator reference;
};
}
#endif
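
Snapshot.h above repeats the iterator-reference pattern from ManifestFile.h: the caches own the parsed objects inside std::maps, and IcebergSnapshot / ManifestFileEntry are cheap handles holding const_iterators into them. A hedged sketch of walking one snapshot down to its data file names:

```cpp
// Sketch only: assumes Snapshot.h and ManifestFile.h above, plus filled caches.
std::vector<String> dataFilesInSnapshot(const Iceberg::IcebergSnapshot & snapshot)
{
    std::vector<String> files;
    for (const auto & manifest_file_entry : snapshot.getManifestList().getManifestFiles())
        for (const auto & data_file : manifest_file_entry.getContent().getDataFiles())
            files.push_back(data_file.data_file_name);
    return files;
}
```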

View File

@ -0,0 +1,32 @@
#include "config.h"
#if USE_AVRO
#include <Processors/Formats/Impl/AvroRowInputFormat.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/Utils.h>
namespace Iceberg
{
using namespace DB;
MutableColumns parseAvro(avro::DataFileReaderBase & file_reader, const Block & header, const FormatSettings & settings)
{
auto deserializer = std::make_unique<DB::AvroDeserializer>(header, file_reader.dataSchema(), true, true, settings);
MutableColumns columns = header.cloneEmptyColumns();
file_reader.init();
RowReadExtension ext;
while (file_reader.hasMore())
{
file_reader.decr();
deserializer->deserializeRow(columns, file_reader.decoder(), ext);
}
return columns;
}
}
#endif
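
A hedged usage sketch for parseAvro: build the header block you expect, open the manifest with an Avro reader and let the deserializer fill the columns. The filename constructor of avro::DataFileReaderBase and the single `status` column are assumptions made for this sketch; the real callers wrap a ClickHouse ReadBuffer in an Avro InputStream adapter and read the full manifest entry.

```cpp
// Sketch only: assumes the declarations above plus DataTypeFactory and Block headers.
DB::MutableColumns readManifestStatuses(const std::string & path)
{
    DB::Block header{
        {DB::DataTypeFactory::instance().get("Int32"), "status"},
    };

    avro::DataFileReaderBase file_reader(path.c_str());

    DB::FormatSettings settings;  // defaults are enough for this sketch
    return Iceberg::parseAvro(file_reader, header, settings);
}
```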

View File

@ -0,0 +1,15 @@
#pragma once
#include "config.h"
#if USE_AVRO
#include <Processors/Formats/Impl/AvroRowInputFormat.h>
namespace Iceberg
{
DB::MutableColumns parseAvro(avro::DataFileReaderBase & file_reader, const DB::Block & header, const DB::FormatSettings & settings);
}
#endif

View File

@ -1,5 +1,5 @@
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Core/ColumnWithTypeAndName.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Core/Settings.h>
#include <Formats/FormatFactory.h>

View File

@ -506,6 +506,7 @@ std::unique_ptr<ReadBufferFromFileBase> StorageObjectStorageSource::createReadBu
std::unique_ptr<ReadBufferFromFileBase> impl;
if (use_cache)
{
chassert(object_info.metadata.has_value());
if (object_info.metadata->etag.empty())
{
LOG_WARNING(log, "Cannot use filesystem cache, no etag specified");
@ -540,9 +541,13 @@ std::unique_ptr<ReadBufferFromFileBase> StorageObjectStorageSource::createReadBu
/* read_until_position */std::nullopt,
context_->getFilesystemCacheLog());
LOG_TEST(log, "Using filesystem cache `{}` (path: {}, etag: {}, hash: {})",
filesystem_cache_name, object_info.getPath(),
object_info.metadata->etag, toString(hash.get128()));
LOG_TEST(
log,
"Using filesystem cache `{}` (path: {}, etag: {}, hash: {})",
filesystem_cache_name,
object_info.getPath(),
object_info.metadata->etag,
toString(hash.get128()));
}
}

View File

@ -1032,9 +1032,9 @@ void StorageKeeperMap::restoreDataImpl(
if (!dynamic_cast<ReadBufferFromFileBase *>(in.get()))
{
temp_data_file.emplace(temporary_disk);
auto out = std::make_unique<WriteBufferFromFile>(temp_data_file->getAbsolutePath());
copyData(*in, *out);
out.reset();
auto out = WriteBufferFromFile(temp_data_file->getAbsolutePath());
copyData(*in, out);
out.finalize();
in = createReadBufferFromFileBase(temp_data_file->getAbsolutePath(), {});
}
std::unique_ptr<ReadBufferFromFileBase> in_from_file{static_cast<ReadBufferFromFileBase *>(in.release())};
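
The change above swaps a heap-allocated write buffer for a stack one and makes the flush explicit: finalize() must succeed before the temporary file is reopened for reading, instead of relying on the destructor to have flushed everything. A minimal sketch of the same pattern, with an illustrative helper name:

```cpp
// Sketch only: spill a stream to a temporary file, then read it back.
// Calling finalize() explicitly surfaces write/flush errors before the file is reused.
void spillToFileAndReopen(DB::ReadBuffer & in, const std::string & tmp_path)
{
    DB::WriteBufferFromFile out(tmp_path);
    DB::copyData(in, out);
    out.finalize();  // flush and close before the file is read back

    auto reread = DB::createReadBufferFromFileBase(tmp_path, {});
    // ... consume `reread` ...
}
```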

View File

@ -2185,7 +2185,7 @@ MergeTreeData::DataPartsVector StorageReplicatedMergeTree::checkPartChecksumsAnd
size_t failed_op_index = zkutil::getFailedOpIndex(e, responses);
if (failed_op_index < num_check_ops)
{
LOG_INFO(log, "The part {} on a replica suddenly appeared, will recheck checksums", ops[failed_op_index]->getPath());
LOG_DEBUG(log, "The part {} on a replica suddenly appeared, will recheck checksums", ops[failed_op_index]->getPath());
continue;
}
}
@ -8335,6 +8335,9 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
}
LOG_INFO(log, "Will try to attach {} partitions", partitions.size());
if (partitions.empty())
return;
const Stopwatch watch;
ProfileEventsScope profile_events_scope;
const auto zookeeper = getZooKeeper();

View File

@ -720,7 +720,6 @@ def test_delete_files(started_cluster, format_version, storage_type):
)
assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 0
assert instance.contains_in_log("Processing delete file for path")
write_iceberg_from_df(
spark,

View File

@ -1,4 +1,5 @@
-- Obsolete server settings
use_legacy_mongodb_integration
-- Obsolete general settings
1
-- Obsolete merge tree settings

View File

@ -0,0 +1,6 @@
FIX ISSUE #69143
a
b
EXPLAIN SYNTAX OF UDF
SELECT ((4 + 2) + 1 AS y, y + 2)
SELECT ((4 + 2) + 1, ((4 + 2) + 1) + 2)

View File

@ -0,0 +1,51 @@
-- Tags: no-parallel
SET skip_redundant_aliases_in_udf = 0;
SELECT 'FIX ISSUE #69143';
DROP TABLE IF EXISTS test_table;
CREATE FUNCTION IF NOT EXISTS 03274_test_function AS ( input_column_name ) -> ((
'1' AS a,
input_column_name AS input_column_name
).2);
CREATE TABLE IF NOT EXISTS test_table
(
`metadata_a` String,
`metadata_b` String
)
ENGINE = MergeTree()
ORDER BY tuple();
ALTER TABLE test_table ADD COLUMN mat_a String MATERIALIZED 03274_test_function(metadata_a);
ALTER TABLE test_table MATERIALIZE COLUMN `mat_a`;
ALTER TABLE test_table ADD COLUMN mat_b String MATERIALIZED 03274_test_function(metadata_b); -- { serverError MULTIPLE_EXPRESSIONS_FOR_ALIAS }
SET skip_redundant_aliases_in_udf = 1;
ALTER TABLE test_table ADD COLUMN mat_b String MATERIALIZED 03274_test_function(metadata_b);
ALTER TABLE test_table MATERIALIZE COLUMN `mat_b`;
INSERT INTO test_table SELECT 'a', 'b';
SELECT mat_a FROM test_table;
SELECT mat_b FROM test_table;
SELECT 'EXPLAIN SYNTAX OF UDF';
CREATE FUNCTION IF NOT EXISTS test_03274 AS ( x ) -> ((x + 1 as y, y + 2));
SET skip_redundant_aliases_in_udf = 0;
EXPLAIN SYNTAX SELECT test_03274(4 + 2);
SET skip_redundant_aliases_in_udf = 1;
EXPLAIN SYNTAX SELECT test_03274(4 + 2);
DROP FUNCTION 03274_test_function;
DROP FUNCTION test_03274;
DROP TABLE IF EXISTS test_table;

View File

@ -0,0 +1,21 @@
#!/usr/bin/env python3
# Tags: no-fasttest
import os
from clickhouse_driver import Client
def run(database):
    # Connects over the native protocol via clickhouse-driver and round-trips one row.
    client = Client("localhost", user="default", password="")
    client.execute(
        f"CREATE TABLE IF NOT EXISTS {database}.test (x Int32) ENGINE = Memory"
    )
    client.execute(f"INSERT INTO {database}.test (x) VALUES", [{"x": 100}])
    result = client.execute(f"SELECT * FROM {database}.test")
    print(result)


if __name__ == "__main__":
    database = os.environ["CLICKHOUSE_DATABASE"]
    run(database)

View File

@ -0,0 +1 @@
[(100,)]

View File

@ -0,0 +1,12 @@
SELECT json.^sub.object.path
FROM test
SELECT json.^`s^b`.object.path
FROM test
SELECT json.`^sub`.object.path
FROM test
SELECT json.path.:UInt64
FROM test
SELECT json.path.:`Array(JSON)`
FROM test
SELECT json.path.`:UInt64`
FROM test

View File

@ -0,0 +1,14 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_FORMAT --query "select json.^sub.object.path from test";
$CLICKHOUSE_FORMAT --query "select json.^\`s^b\`.object.path from test";
$CLICKHOUSE_FORMAT --query "select json.\`^sub\`.object.path from test";
$CLICKHOUSE_FORMAT --query "select json.path.:UInt64 from test";
$CLICKHOUSE_FORMAT --query "select json.path.:\`Array(JSON)\` from test";
$CLICKHOUSE_FORMAT --query "select json.path.\`:UInt64\` from test";