From fb4a230eee1051cd17d627c370eb1ffe318f2dd5 Mon Sep 17 00:00:00 2001 From: kssenii Date: Tue, 30 Apr 2024 10:44:25 +0200 Subject: [PATCH 01/20] Support reading partitioed DeltaLake columns --- src/Processors/Chunk.cpp | 4 +- .../DataLakes/DeltaLakeMetadataParser.cpp | 265 ++++++++++++++++-- .../DataLakes/DeltaLakeMetadataParser.h | 13 +- src/Storages/DataLakes/HudiMetadataParser.cpp | 19 +- src/Storages/DataLakes/HudiMetadataParser.h | 11 +- src/Storages/DataLakes/IStorageDataLake.h | 112 +++++--- .../DataLakes/Iceberg/StorageIceberg.cpp | 8 +- .../DataLakes/Iceberg/StorageIceberg.h | 4 +- src/Storages/DataLakes/PartitionColumns.h | 17 ++ src/Storages/DataLakes/S3MetadataReader.cpp | 7 +- src/Storages/StorageS3.cpp | 53 +++- src/Storages/StorageS3.h | 9 +- src/TableFunctions/ITableFunction.cpp | 2 +- src/TableFunctions/ITableFunctionDataLake.h | 15 +- src/TableFunctions/TableFunctionS3.cpp | 2 +- src/TableFunctions/TableFunctionS3Cluster.cpp | 2 +- tests/integration/test_storage_delta/test.py | 95 +++++++ 17 files changed, 534 insertions(+), 104 deletions(-) create mode 100644 src/Storages/DataLakes/PartitionColumns.h diff --git a/src/Processors/Chunk.cpp b/src/Processors/Chunk.cpp index 2631f665f9c..8e3ca0b03b3 100644 --- a/src/Processors/Chunk.cpp +++ b/src/Processors/Chunk.cpp @@ -125,7 +125,7 @@ void Chunk::addColumn(size_t position, ColumnPtr column) if (position >= columns.size()) throw Exception(ErrorCodes::POSITION_OUT_OF_BOUND, "Position {} out of bound in Chunk::addColumn(), max position = {}", - position, columns.size() - 1); + position, columns.size() ? columns.size() - 1 : 0); if (empty()) num_rows = column->size(); else if (column->size() != num_rows) @@ -143,7 +143,7 @@ void Chunk::erase(size_t position) if (position >= columns.size()) throw Exception(ErrorCodes::POSITION_OUT_OF_BOUND, "Position {} out of bound in Chunk::erase(), max position = {}", - toString(position), toString(columns.size() - 1)); + toString(position), toString(columns.size() ? columns.size() - 1 : 0)); columns.erase(columns.begin() + position); } diff --git a/src/Storages/DataLakes/DeltaLakeMetadataParser.cpp b/src/Storages/DataLakes/DeltaLakeMetadataParser.cpp index 14a912a180d..50b6ca83cb4 100644 --- a/src/Storages/DataLakes/DeltaLakeMetadataParser.cpp +++ b/src/Storages/DataLakes/DeltaLakeMetadataParser.cpp @@ -17,6 +17,22 @@ #include #include #include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include namespace fs = std::filesystem; @@ -26,6 +42,7 @@ namespace DB namespace ErrorCodes { extern const int INCORRECT_DATA; + extern const int LOGICAL_ERROR; extern const int BAD_ARGUMENTS; } @@ -65,9 +82,17 @@ struct DeltaLakeMetadataParser::Impl * An action changes one aspect of the table's state, for example, adding or removing a file. * Note: it is not a valid json, but a list of json's, so we read it in a while cycle. 
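     * For illustration only (paths and values below are made up, unrelated fields elided),
     * a single commit file in _delta_log/ is a sequence of newline-delimited JSON actions:
     *   {"commitInfo": {"timestamp": 1627668684594, "operation": "WRITE", ...}}
     *   {"metaData": {"schemaString": "{\"type\":\"struct\",\"fields\":[...]}", "partitionColumns": ["c"], ...}}
     *   {"add": {"path": "c=1/part-00000-xxxx.parquet", "partitionValues": {"c": "1"}, "dataChange": true, ...}}
     *   {"remove": {"path": "c=1/part-00001-xxxx.parquet", "dataChange": true, ...}}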
*/ - std::set processMetadataFiles(const Configuration & configuration, ContextPtr context) + struct DeltaLakeMetadata + { + NamesAndTypesList schema; + Strings data_files; + DataLakePartitionColumns partition_columns; + }; + DeltaLakeMetadata processMetadataFiles(const Configuration & configuration, ContextPtr context) { std::set result_files; + NamesAndTypesList current_schema; + DataLakePartitionColumns current_partition_columns; const auto checkpoint_version = getCheckpointIfExists(result_files, configuration, context); if (checkpoint_version) @@ -81,7 +106,7 @@ struct DeltaLakeMetadataParser::Impl if (!MetadataReadHelper::exists(file_path, configuration)) break; - processMetadataFile(file_path, result_files, configuration, context); + processMetadataFile(file_path, result_files, current_schema, current_partition_columns, configuration, context); } LOG_TRACE( @@ -94,10 +119,10 @@ struct DeltaLakeMetadataParser::Impl configuration, deltalake_metadata_directory, metadata_file_suffix); for (const String & key : keys) - processMetadataFile(key, result_files, configuration, context); + processMetadataFile(key, result_files, current_schema, current_partition_columns, configuration, context); } - return result_files; + return DeltaLakeMetadata{current_schema, Strings(result_files.begin(), result_files.end()), current_partition_columns}; } /** @@ -132,6 +157,8 @@ struct DeltaLakeMetadataParser::Impl void processMetadataFile( const String & key, std::set & result, + NamesAndTypesList & file_schema, + DataLakePartitionColumns & file_partition_columns, const Configuration & configuration, ContextPtr context) { @@ -153,20 +180,217 @@ struct DeltaLakeMetadataParser::Impl if (json_str.empty()) continue; - const JSON json(json_str); - if (json.has("add")) + Poco::JSON::Parser parser; + Poco::Dynamic::Var json = parser.parse(json_str); + Poco::JSON::Object::Ptr object = json.extract(); + + if (object->has("add")) { - const auto path = json["add"]["path"].getString(); + auto add_object = object->get("add").extract(); + auto path = add_object->getValue("path"); result.insert(fs::path(configuration.getPath()) / path); + + auto filename = fs::path(path).filename().string(); + auto it = file_partition_columns.find(filename); + if (it == file_partition_columns.end()) + { + auto partition_values = add_object->get("partitionValues").extract(); + if (partition_values->size()) + { + auto & current_partition_columns = file_partition_columns[filename]; + for (const auto & name : partition_values->getNames()) + { + const auto value = partition_values->getValue(name); + auto name_and_type = file_schema.tryGetByName(name); + if (!name_and_type) + throw Exception(ErrorCodes::LOGICAL_ERROR, "No such column in schema: {}", name); + + auto field = getFieldValue(value, name_and_type->type); + current_partition_columns.emplace_back(*name_and_type, field); + + LOG_TEST(log, "Partition {} value is {} (for {})", name, value, filename); + } + } + } } - else if (json.has("remove")) + else if (object->has("remove")) { - const auto path = json["remove"]["path"].getString(); + auto path = object->get("remove").extract()->getValue("path"); result.erase(fs::path(configuration.getPath()) / path); } + if (file_schema.empty()) + { + // std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM + // object->stringify(oss); + // LOG_TEST(log, "Metadata: {}", oss.str()); + + if (object->has("metaData")) + { + const auto metadata_object = object->get("metaData").extract(); + const auto schema_object = 
metadata_object->getValue("schemaString"); + + Poco::JSON::Parser p; + Poco::Dynamic::Var fields_json = parser.parse(schema_object); + Poco::JSON::Object::Ptr fields_object = fields_json.extract(); + + const auto fields = fields_object->get("fields").extract(); + for (size_t i = 0; i < fields->size(); ++i) + { + const auto field = fields->getObject(static_cast(i)); + auto name = field->getValue("name"); + auto type = field->getValue("type"); + auto is_nullable = field->getValue("nullable"); + + file_schema.push_back({name, getFieldType(field, "type", is_nullable)}); + } + } + } + /// TODO: Check if schema in each file is the same? } } + DataTypePtr getFieldType(const Poco::JSON::Object::Ptr & field, const String & type_key, bool is_nullable) + { + if (field->isObject(type_key)) + return getComplexTypeFromObject(field->getObject(type_key)); + + auto type = field->get(type_key); + if (type.isString()) + { + const String & type_name = type.extract(); + auto data_type = getSimpleTypeByName(type_name); + return is_nullable ? makeNullable(data_type) : data_type; + } + + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected 'type' field: {}", type.toString()); + } + + Field getFieldValue(const String & value, DataTypePtr data_type) + { + DataTypePtr check_type; + if (data_type->isNullable()) + check_type = static_cast(data_type.get())->getNestedType(); + else + check_type = data_type; + + WhichDataType which(check_type->getTypeId()); + if (which.isStringOrFixedString()) + return value; + else if (which.isInt8()) + return parse(value); + else if (which.isInt16()) + return parse(value); + else if (which.isInt32()) + return parse(value); + else if (which.isInt64()) + return parse(value); + else if (which.isFloat32()) + return parse(value); + else if (which.isFloat64()) + return parse(value); + else if (which.isDate()) + return UInt16{LocalDate{std::string(value)}.getDayNum()}; + else if (which.isDate32()) + return Int32{LocalDate{std::string(value)}.getExtenedDayNum()}; + else if (which.isDateTime64()) + { + ReadBufferFromString in(value); + DateTime64 time = 0; + readDateTime64Text(time, 6, in, assert_cast(data_type.get())->getTimeZone()); + return time; + } + // else if (which.isDecimal32()) + // return parse(value); + // else if (which.isDecimal64()) + // return parse(value); + // else if (which.isDecimal128()) + // return parse(value); + // else if (which.isDecimal256()) + // return parse(value); + + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported DeltaLake type for {}", check_type->getColumnType()); + } + + DataTypePtr getSimpleTypeByName(const String & type_name) + { + /// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#primitive-types + + if (type_name == "string" || type_name == "binary") + return std::make_shared(); + if (type_name == "long") + return std::make_shared(); + if (type_name == "integer") + return std::make_shared(); + if (type_name == "short") + return std::make_shared(); + if (type_name == "byte") + return std::make_shared(); + if (type_name == "float") + return std::make_shared(); + if (type_name == "double") + return std::make_shared(); + if (type_name == "boolean") + return DataTypeFactory::instance().get("Bool"); + if (type_name == "date") + return std::make_shared(); + if (type_name == "timestamp") + return std::make_shared(6); + if (type_name.starts_with("decimal(") && type_name.ends_with(')')) + { + ReadBufferFromString buf(std::string_view(type_name.begin() + 8, type_name.end() - 1)); + size_t precision; + size_t scale; + readIntText(precision, buf); 
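+            /// e.g. for "decimal(10, 2)" the buffer was built over "10, 2": the precision (10) was read above,
+            /// the comma is skipped below and the scale (2) is read to build the Decimal(10, 2) type.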
+ skipWhitespaceIfAny(buf); + assertChar(',', buf); + skipWhitespaceIfAny(buf); + tryReadIntText(scale, buf); + return createDecimal(precision, scale); + } + + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported DeltaLake type: {}", type_name); + } + + DataTypePtr getComplexTypeFromObject(const Poco::JSON::Object::Ptr & type) + { + String type_name = type->getValue("type"); + + if (type_name == "struct") + { + DataTypes element_types; + Names element_names; + auto fields = type->get("fields").extract(); + element_types.reserve(fields->size()); + element_names.reserve(fields->size()); + for (size_t i = 0; i != fields->size(); ++i) + { + auto field = fields->getObject(static_cast(i)); + element_names.push_back(field->getValue("name")); + auto required = field->getValue("required"); + element_types.push_back(getFieldType(field, "type", required)); + } + + return std::make_shared(element_types, element_names); + } + + if (type_name == "array") + { + bool is_nullable = type->getValue("containsNull"); + auto element_type = getFieldType(type, "elementType", is_nullable); + return std::make_shared(element_type); + } + + if (type_name == "map") + { + bool is_nullable = type->getValue("containsNull"); + auto key_type = getFieldType(type, "keyType", /* is_nullable */false); + auto value_type = getFieldType(type, "valueType", is_nullable); + return std::make_shared(key_type, value_type); + } + + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported DeltaLake type: {}", type_name); + } + /** * Checkpoints in delta-lake are created each 10 commits by default. * Latest checkpoint is written in _last_checkpoint file: _delta_log/_last_checkpoint @@ -272,8 +496,8 @@ struct DeltaLakeMetadataParser::Impl arrow::default_memory_pool(), &reader)); - std::shared_ptr schema; - THROW_ARROW_NOT_OK(reader->GetSchema(&schema)); + std::shared_ptr file_schema; + THROW_ARROW_NOT_OK(reader->GetSchema(&file_schema)); ArrowColumnToCHColumn column_reader( header, "Parquet", @@ -318,20 +542,19 @@ struct DeltaLakeMetadataParser::Impl template -DeltaLakeMetadataParser::DeltaLakeMetadataParser() : impl(std::make_unique()) -{ -} - -template -Strings DeltaLakeMetadataParser::getFiles(const Configuration & configuration, ContextPtr context) +DeltaLakeMetadataParser::DeltaLakeMetadataParser(const Configuration & configuration, ContextPtr context) + : impl(std::make_unique()) { auto result = impl->processMetadataFiles(configuration, context); - return Strings(result.begin(), result.end()); + data_files = result.data_files; + schema = result.schema; + partition_columns = result.partition_columns; + + LOG_TRACE(impl->log, "Found {} data files, {} partition files, schema: {}", + data_files.size(), partition_columns.size(), schema.toString()); } -template DeltaLakeMetadataParser::DeltaLakeMetadataParser(); -template Strings DeltaLakeMetadataParser::getFiles( - const StorageS3::Configuration & configuration, ContextPtr); +template DeltaLakeMetadataParser::DeltaLakeMetadataParser(const StorageS3::Configuration & configuration, ContextPtr context); } #endif diff --git a/src/Storages/DataLakes/DeltaLakeMetadataParser.h b/src/Storages/DataLakes/DeltaLakeMetadataParser.h index df7276b90b4..58cf7acd2a3 100644 --- a/src/Storages/DataLakes/DeltaLakeMetadataParser.h +++ b/src/Storages/DataLakes/DeltaLakeMetadataParser.h @@ -1,7 +1,9 @@ #pragma once +#include #include #include +#include "PartitionColumns.h" namespace DB { @@ -10,13 +12,20 @@ template struct DeltaLakeMetadataParser { public: - DeltaLakeMetadataParser(); + 
DeltaLakeMetadataParser(const Configuration & configuration, ContextPtr context); - Strings getFiles(const Configuration & configuration, ContextPtr context); + Strings getFiles() { return data_files; } + + NamesAndTypesList getTableSchema() const { return schema; } + + DataLakePartitionColumns getPartitionColumns() const { return partition_columns; } private: struct Impl; std::shared_ptr impl; + NamesAndTypesList schema; + DataLakePartitionColumns partition_columns; + Strings data_files; }; } diff --git a/src/Storages/DataLakes/HudiMetadataParser.cpp b/src/Storages/DataLakes/HudiMetadataParser.cpp index 699dfe8fda0..ad66ba69258 100644 --- a/src/Storages/DataLakes/HudiMetadataParser.cpp +++ b/src/Storages/DataLakes/HudiMetadataParser.cpp @@ -61,7 +61,7 @@ struct HudiMetadataParser::Impl String key; UInt64 timestamp = 0; }; - std::unordered_map> data_files; + std::unordered_map> files; for (const auto & key : keys) { @@ -76,7 +76,7 @@ struct HudiMetadataParser::Impl const auto & file_id = file_parts[0]; const auto timestamp = parse(file_parts[2]); - auto & file_info = data_files[partition][file_id]; + auto & file_info = files[partition][file_id]; if (file_info.timestamp == 0 || file_info.timestamp < timestamp) { file_info.key = std::move(key); @@ -85,7 +85,7 @@ struct HudiMetadataParser::Impl } Strings result; - for (auto & [partition, partition_data] : data_files) + for (auto & [partition, partition_data] : files) { LOG_TRACE(log, "Adding {} data files from partition {}", partition, partition_data.size()); for (auto & [file_id, file_data] : partition_data) @@ -97,19 +97,12 @@ struct HudiMetadataParser::Impl template -HudiMetadataParser::HudiMetadataParser() : impl(std::make_unique()) +HudiMetadataParser::HudiMetadataParser(const Configuration & configuration, ContextPtr) : impl(std::make_unique()) { + data_files = impl->processMetadataFiles(configuration); } -template -Strings HudiMetadataParser::getFiles(const Configuration & configuration, ContextPtr) -{ - return impl->processMetadataFiles(configuration); -} - -template HudiMetadataParser::HudiMetadataParser(); -template Strings HudiMetadataParser::getFiles( - const StorageS3::Configuration & configuration, ContextPtr); +template HudiMetadataParser::HudiMetadataParser(const StorageS3::Configuration & configuration, ContextPtr); } diff --git a/src/Storages/DataLakes/HudiMetadataParser.h b/src/Storages/DataLakes/HudiMetadataParser.h index 6727ba2f718..9e2901a9d24 100644 --- a/src/Storages/DataLakes/HudiMetadataParser.h +++ b/src/Storages/DataLakes/HudiMetadataParser.h @@ -1,7 +1,9 @@ #pragma once +#include #include #include +#include "PartitionColumns.h" namespace DB { @@ -10,13 +12,18 @@ template struct HudiMetadataParser { public: - HudiMetadataParser(); + HudiMetadataParser(const Configuration & configuration, ContextPtr context); - Strings getFiles(const Configuration & configuration, ContextPtr context); + Strings getFiles() { return data_files; } + + NamesAndTypesList getTableSchema() const { return {}; } + + DataLakePartitionColumns getPartitionColumns() const { return {}; } private: struct Impl; std::shared_ptr impl; + Strings data_files; }; } diff --git a/src/Storages/DataLakes/IStorageDataLake.h b/src/Storages/DataLakes/IStorageDataLake.h index 711abbde38c..be23c017043 100644 --- a/src/Storages/DataLakes/IStorageDataLake.h +++ b/src/Storages/DataLakes/IStorageDataLake.h @@ -9,6 +9,7 @@ #include #include #include +#include "PartitionColumns.h" #include @@ -23,15 +24,51 @@ public: using Configuration = typename 
Storage::Configuration; template - explicit IStorageDataLake(const Configuration & configuration_, ContextPtr context_, LoadingStrictnessLevel mode, Args && ...args) - : Storage(getConfigurationForDataRead(configuration_, context_, {}, mode), context_, std::forward(args)...) - , base_configuration(configuration_) - , log(getLogger(getName())) {} // NOLINT(clang-analyzer-optin.cplusplus.VirtualCall) + static StoragePtr create( + const Configuration & configuration_, + ContextPtr context_, + LoadingStrictnessLevel mode, + const ColumnsDescription & columns_, + Args && ...args) + { + std::unique_ptr metadata; + Configuration read_configuration; + Configuration base_configuration{configuration_}; + try + { + base_configuration.update(context_); + metadata = std::make_unique(base_configuration, context_); + read_configuration = getConfigurationForDataRead(*metadata, base_configuration, context_); + } + catch (...) + { + if (mode <= LoadingStrictnessLevel::CREATE) + throw; + tryLogCurrentException(__PRETTY_FUNCTION__); + } + + return std::make_shared>( + configuration_, + read_configuration, + context_, + columns_.empty() && metadata ? ColumnsDescription(metadata->getTableSchema()) : columns_, + std::forward(args)...); + } template - static StoragePtr create(const Configuration & configuration_, ContextPtr context_, LoadingStrictnessLevel mode, Args && ...args) + explicit IStorageDataLake( + const Configuration & base_configuration_, + const Configuration & read_configuration_, + ContextPtr context_, + const ColumnsDescription & columns_, + Args && ...args) + : Storage(read_configuration_, + context_, + columns_, + std::forward(args)...) + , base_configuration(base_configuration_) + , log(getLogger(getName())) // NOLINT(clang-analyzer-optin.cplusplus.VirtualCall) { - return std::make_shared>(configuration_, context_, mode, std::forward(args)...); } String getName() const override { return name; } @@ -41,8 +78,18 @@ public: const std::optional & format_settings, const ContextPtr & local_context) { - auto configuration = getConfigurationForDataRead(base_configuration, local_context); - return Storage::getTableStructureFromData(configuration, format_settings, local_context); + base_configuration.update(local_context); + auto metadata = std::make_unique(base_configuration, local_context); + auto schema = metadata->getTableSchema(); + if (!schema.empty()) + { + return ColumnsDescription(schema); + } + else + { + auto read_configuration = getConfigurationForDataRead(*metadata, base_configuration, local_context); + return Storage::getTableStructureFromData(read_configuration, format_settings, local_context); + } } static Configuration getConfiguration(ASTs & engine_args, const ContextPtr & local_context) @@ -65,51 +112,31 @@ public: private: static Configuration getConfigurationForDataRead( - const Configuration & base_configuration, const ContextPtr & local_context, const Strings & keys = {}, - LoadingStrictnessLevel mode = LoadingStrictnessLevel::CREATE) + MetadataParser & metadata_, + const Configuration & base_configuration, + const ContextPtr & local_context) { auto configuration{base_configuration}; configuration.update(local_context); configuration.static_configuration = true; - - try - { - if (keys.empty()) - configuration.keys = getDataFiles(configuration, local_context); - else - configuration.keys = keys; - - LOG_TRACE( - getLogger("DataLake"), - "New configuration path: {}, keys: {}", - configuration.getPath(), fmt::join(configuration.keys, ", ")); - - configuration.connect(local_context); 
- return configuration; - } - catch (...) - { - if (mode <= LoadingStrictnessLevel::CREATE) - throw; - tryLogCurrentException(__PRETTY_FUNCTION__); - return configuration; - } - } - - static Strings getDataFiles(const Configuration & configuration, const ContextPtr & local_context) - { - return MetadataParser().getFiles(configuration, local_context); + configuration.keys = metadata_.getFiles(); + configuration.connect(local_context); + return configuration; } void updateConfigurationImpl(const ContextPtr & local_context) { const bool updated = base_configuration.update(local_context); - auto new_keys = getDataFiles(base_configuration, local_context); + + auto metadata = MetadataParser(base_configuration, local_context); + auto new_keys = metadata.getFiles(); + Storage::partition_columns = metadata.getPartitionColumns(); if (!updated && new_keys == Storage::getConfiguration().keys) return; - Storage::useConfiguration(getConfigurationForDataRead(base_configuration, local_context, new_keys)); + auto read_configuration = getConfigurationForDataRead(metadata, base_configuration, local_context); + Storage::useConfiguration(read_configuration); } Configuration base_configuration; @@ -127,8 +154,9 @@ static StoragePtr createDataLakeStorage(const StorageFactory::Arguments & args) if (configuration.format == "auto") configuration.format = "Parquet"; - return DataLake::create(configuration, args.getContext(), args.mode, args.table_id, args.columns, args.constraints, - args.comment, getFormatSettings(args.getContext())); + return DataLake::create(configuration, args.getContext(), args.mode, + args.columns, args.table_id, args.constraints, + args.comment, getFormatSettings(args.getContext())); } } diff --git a/src/Storages/DataLakes/Iceberg/StorageIceberg.cpp b/src/Storages/DataLakes/Iceberg/StorageIceberg.cpp index 19cd97c3d4f..6557b67d20c 100644 --- a/src/Storages/DataLakes/Iceberg/StorageIceberg.cpp +++ b/src/Storages/DataLakes/Iceberg/StorageIceberg.cpp @@ -9,8 +9,8 @@ StoragePtr StorageIceberg::create( const DB::StorageIceberg::Configuration & base_configuration, DB::ContextPtr context_, LoadingStrictnessLevel mode, - const DB::StorageID & table_id_, const DB::ColumnsDescription & columns_, + const DB::StorageID & table_id_, const DB::ConstraintsDescription & constraints_, const String & comment, std::optional format_settings_) @@ -36,8 +36,8 @@ StoragePtr StorageIceberg::create( std::move(metadata), configuration, context_, - table_id_, columns_.empty() ? 
ColumnsDescription(schema_from_metadata) : columns_, + table_id_, constraints_, comment, format_settings_); @@ -47,12 +47,12 @@ StorageIceberg::StorageIceberg( std::unique_ptr metadata_, const Configuration & configuration_, ContextPtr context_, - const StorageID & table_id_, const ColumnsDescription & columns_, + const StorageID & table_id_, const ConstraintsDescription & constraints_, const String & comment, std::optional format_settings_) - : StorageS3(configuration_, context_, table_id_, columns_, constraints_, comment, format_settings_) + : StorageS3(configuration_, context_, columns_, table_id_, constraints_, comment, format_settings_) , current_metadata(std::move(metadata_)) , base_configuration(configuration_) { diff --git a/src/Storages/DataLakes/Iceberg/StorageIceberg.h b/src/Storages/DataLakes/Iceberg/StorageIceberg.h index 45cbef0b41b..a95e203aa72 100644 --- a/src/Storages/DataLakes/Iceberg/StorageIceberg.h +++ b/src/Storages/DataLakes/Iceberg/StorageIceberg.h @@ -31,8 +31,8 @@ public: static StoragePtr create(const Configuration & base_configuration, ContextPtr context_, LoadingStrictnessLevel mode, - const StorageID & table_id_, const ColumnsDescription & columns_, + const StorageID & table_id_, const ConstraintsDescription & constraints_, const String & comment, std::optional format_settings_); @@ -41,8 +41,8 @@ public: std::unique_ptr metadata_, const Configuration & configuration_, ContextPtr context_, - const StorageID & table_id_, const ColumnsDescription & columns_, + const StorageID & table_id_, const ConstraintsDescription & constraints_, const String & comment, std::optional format_settings_); diff --git a/src/Storages/DataLakes/PartitionColumns.h b/src/Storages/DataLakes/PartitionColumns.h new file mode 100644 index 00000000000..604dbbf78fa --- /dev/null +++ b/src/Storages/DataLakes/PartitionColumns.h @@ -0,0 +1,17 @@ +#pragma once +#include +#include + +namespace DB +{ + +struct DataLakePartitionColumn +{ + NameAndTypePair name_and_type; + Field value; +}; + +/// Data file -> partition columns +using DataLakePartitionColumns = std::unordered_map>; + +} diff --git a/src/Storages/DataLakes/S3MetadataReader.cpp b/src/Storages/DataLakes/S3MetadataReader.cpp index d66e21550a3..6a00ae1e452 100644 --- a/src/Storages/DataLakes/S3MetadataReader.cpp +++ b/src/Storages/DataLakes/S3MetadataReader.cpp @@ -45,12 +45,16 @@ std::vector S3DataLakeMetadataReadHelper::listFiles( const auto & bucket = base_configuration.url.bucket; const auto & client = base_configuration.client; + auto path = std::filesystem::path(table_path) / prefix; + std::vector res; S3::ListObjectsV2Request request; Aws::S3::Model::ListObjectsV2Outcome outcome; request.SetBucket(bucket); - request.SetPrefix(std::filesystem::path(table_path) / prefix); + request.SetPrefix(path); + + LOG_TEST(getLogger("S3DataLakeMetadataReadHelper"), "Listing files in {}", path.string()); bool is_finished{false}; while (!is_finished) @@ -69,6 +73,7 @@ std::vector S3DataLakeMetadataReadHelper::listFiles( for (const auto & obj : result_batch) { const auto & filename = obj.GetKey(); + LOG_TEST(getLogger("S3DataLakeMetadataReadHelper"), "Listed file: {} (searching for suffix: {})", filename, suffix); if (filename.ends_with(suffix)) res.push_back(filename); } diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp index 0371a9de08a..205b5d3381e 100644 --- a/src/Storages/StorageS3.cpp +++ b/src/Storages/StorageS3.cpp @@ -148,20 +148,23 @@ public: const StorageSnapshotPtr & storage_snapshot_, const ContextPtr & context_, 
Block sample_block, + const StorageS3::Configuration & query_configuration_, StorageS3 & storage_, ReadFromFormatInfo read_from_format_info_, bool need_only_count_, size_t max_block_size_, - size_t num_streams_) + size_t num_streams_, + const DataLakePartitionColumns & partition_columns_) : SourceStepWithFilter(DataStream{.header = std::move(sample_block)}, column_names_, query_info_, storage_snapshot_, context_) , column_names(column_names_) , storage(storage_) , read_from_format_info(std::move(read_from_format_info_)) , need_only_count(need_only_count_) + , query_configuration(query_configuration_) + , partition_columns(partition_columns_) , max_block_size(max_block_size_) , num_streams(num_streams_) { - query_configuration = storage.updateConfigurationAndGetCopy(context); virtual_columns = storage.getVirtualsList(); } @@ -172,6 +175,7 @@ private: bool need_only_count; StorageS3::Configuration query_configuration; NamesAndTypesList virtual_columns; + DataLakePartitionColumns partition_columns; size_t max_block_size; size_t num_streams; @@ -577,7 +581,8 @@ StorageS3Source::StorageS3Source( const String & url_host_and_port_, std::shared_ptr file_iterator_, const size_t max_parsing_threads_, - bool need_only_count_) + bool need_only_count_, + const DataLakePartitionColumns & partition_columns_) : SourceWithKeyCondition(info.source_header, false) , WithContext(context_) , name(std::move(name_)) @@ -593,6 +598,7 @@ StorageS3Source::StorageS3Source( , client(client_) , sample_block(info.format_header) , format_settings(format_settings_) + , partition_columns(partition_columns_) , requested_virtual_columns(info.requested_virtual_columns) , file_iterator(file_iterator_) , max_parsing_threads(max_parsing_threads_) @@ -798,8 +804,37 @@ Chunk StorageS3Source::generate() size_t chunk_size = 0; if (const auto * input_format = reader.getInputFormat()) chunk_size = reader.getInputFormat()->getApproxBytesReadForChunk(); + progress(num_rows, chunk_size ? chunk_size : chunk.bytes()); VirtualColumnUtils::addRequestedPathFileAndSizeVirtualsToChunk(chunk, requested_virtual_columns, reader.getPath(), reader.getFileSize()); + + if (!partition_columns.empty() && chunk_size && chunk.hasColumns()) + { + auto filename = fs::path(reader.getPath()).filename().string(); + auto partition_values = partition_columns.find(filename); + + for (const auto & [name_and_type, value] : partition_values->second) + { + if (!sample_block.has(name_and_type.name)) + continue; + + auto column_pos = sample_block.getPositionByName(name_and_type.name); + + const auto & type = name_and_type.type; + auto partition_column = type->createColumnConst(chunk.getNumRows(), value)->convertToFullColumnIfConst(); + /// This column is filled with default value now, remove it. + chunk.erase(column_pos); + /// Add correct values. 
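+                    /// e.g. a file written under a "c=2000-01-01/" partition directory does not store column `c`
+                    /// in the parquet data itself, so it is re-added here as a constant column holding the
+                    /// partition value taken from the DeltaLake metadata.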
+ if (chunk.hasColumns()) + { + chunk.addColumn(column_pos, std::move(partition_column)); + } + else + { + chunk.addColumn(std::move(partition_column)); + } + } + } return chunk; } @@ -1072,8 +1107,8 @@ private: StorageS3::StorageS3( const Configuration & configuration_, const ContextPtr & context_, - const StorageID & table_id_, const ColumnsDescription & columns_, + const StorageID & table_id_, const ConstraintsDescription & constraints_, const String & comment, std::optional format_settings_, @@ -1190,17 +1225,20 @@ void StorageS3::read( bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty()) && local_context->getSettingsRef().optimize_count_from_files; + auto query_configuration = updateConfigurationAndGetCopy(local_context); auto reading = std::make_unique( column_names, query_info, storage_snapshot, local_context, read_from_format_info.source_header, + query_configuration, *this, std::move(read_from_format_info), need_only_count, max_block_size, - num_streams); + num_streams, + partition_columns); query_plan.addStep(std::move(reading)); } @@ -1262,7 +1300,8 @@ void ReadFromStorageS3Step::initializePipeline(QueryPipelineBuilder & pipeline, query_configuration.url.uri.getHost() + std::to_string(query_configuration.url.uri.getPort()), iterator_wrapper, max_parsing_threads, - need_only_count); + need_only_count, + partition_columns); source->setKeyCondition(filter_actions_dag, context); pipes.emplace_back(std::move(source)); @@ -1973,8 +2012,8 @@ void registerStorageS3Impl(const String & name, StorageFactory & factory) return std::make_shared( std::move(configuration), args.getContext(), - args.table_id, args.columns, + args.table_id, args.constraints, args.comment, format_settings, diff --git a/src/Storages/StorageS3.h b/src/Storages/StorageS3.h index c8ab28fb20e..9e1d9eb5aad 100644 --- a/src/Storages/StorageS3.h +++ b/src/Storages/StorageS3.h @@ -20,6 +20,7 @@ #include #include #include +#include #include #include @@ -141,7 +142,8 @@ public: const String & url_host_and_port, std::shared_ptr file_iterator_, size_t max_parsing_threads, - bool need_only_count_); + bool need_only_count_, + const DataLakePartitionColumns & partition_columns_ = {}); ~StorageS3Source() override; @@ -170,6 +172,7 @@ private: std::shared_ptr client; Block sample_block; std::optional format_settings; + DataLakePartitionColumns partition_columns; struct ReaderHolder { @@ -305,8 +308,8 @@ public: StorageS3( const Configuration & configuration_, const ContextPtr & context_, - const StorageID & table_id_, const ColumnsDescription & columns_, + const StorageID & table_id_, const ConstraintsDescription & constraints_, const String & comment, std::optional format_settings_, @@ -363,6 +366,8 @@ protected: const Configuration & getConfiguration(); + mutable DataLakePartitionColumns partition_columns; + private: friend class StorageS3Cluster; friend class TableFunctionS3Cluster; diff --git a/src/TableFunctions/ITableFunction.cpp b/src/TableFunctions/ITableFunction.cpp index 137e1dc27fe..e5676c5c25d 100644 --- a/src/TableFunctions/ITableFunction.cpp +++ b/src/TableFunctions/ITableFunction.cpp @@ -36,7 +36,7 @@ StoragePtr ITableFunction::execute(const ASTPtr & ast_function, ContextPtr conte if (cached_columns.empty()) return executeImpl(ast_function, context, table_name, std::move(cached_columns), is_insert_query); - if (hasStaticStructure() && cached_columns == getActualTableStructure(context,is_insert_query)) + if (hasStaticStructure() && cached_columns == 
getActualTableStructure(context, is_insert_query)) return executeImpl(ast_function, context_to_use, table_name, std::move(cached_columns), is_insert_query); auto this_table_function = shared_from_this(); diff --git a/src/TableFunctions/ITableFunctionDataLake.h b/src/TableFunctions/ITableFunctionDataLake.h index 91165ba6705..1d946d9b5fa 100644 --- a/src/TableFunctions/ITableFunctionDataLake.h +++ b/src/TableFunctions/ITableFunctionDataLake.h @@ -26,21 +26,28 @@ protected: const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, - ColumnsDescription /*cached_columns*/, + ColumnsDescription cached_columns, bool /*is_insert_query*/) const override { ColumnsDescription columns; if (TableFunction::configuration.structure != "auto") columns = parseColumnsListFromString(TableFunction::configuration.structure, context); + else if (!structure_hint.empty()) + columns = structure_hint; + else if (!cached_columns.empty()) + columns = cached_columns; StoragePtr storage = Storage::create( - TableFunction::configuration, context, LoadingStrictnessLevel::CREATE, StorageID(TableFunction::getDatabaseName(), table_name), - columns, ConstraintsDescription{}, String{}, std::nullopt); + TableFunction::configuration, context, LoadingStrictnessLevel::CREATE, + columns, StorageID(TableFunction::getDatabaseName(), table_name), + ConstraintsDescription{}, String{}, std::nullopt); storage->startup(); return storage; } + void setStructureHint(const ColumnsDescription & structure_hint_) override { structure_hint = structure_hint_; } + const char * getStorageTypeName() const override { return Storage::name; } ColumnsDescription getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const override @@ -60,6 +67,8 @@ protected: TableFunction::configuration.format = "Parquet"; TableFunction::parseArguments(ast_function, context); } + + ColumnsDescription structure_hint; }; } diff --git a/src/TableFunctions/TableFunctionS3.cpp b/src/TableFunctions/TableFunctionS3.cpp index a8c100ebd44..93e43fb17ac 100644 --- a/src/TableFunctions/TableFunctionS3.cpp +++ b/src/TableFunctions/TableFunctionS3.cpp @@ -420,8 +420,8 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, Context StoragePtr storage = std::make_shared( configuration, context, - StorageID(getDatabaseName(), table_name), columns, + StorageID(getDatabaseName(), table_name), ConstraintsDescription{}, String{}, /// No format_settings for table function S3 diff --git a/src/TableFunctions/TableFunctionS3Cluster.cpp b/src/TableFunctions/TableFunctionS3Cluster.cpp index e727c4e4c89..3f8ed72479b 100644 --- a/src/TableFunctions/TableFunctionS3Cluster.cpp +++ b/src/TableFunctions/TableFunctionS3Cluster.cpp @@ -37,8 +37,8 @@ StoragePtr TableFunctionS3Cluster::executeImpl( storage = std::make_shared( configuration, context, - StorageID(getDatabaseName(), table_name), columns, + StorageID(getDatabaseName(), table_name), ConstraintsDescription{}, /* comment */String{}, /* format_settings */std::nullopt, /// No format_settings for S3Cluster diff --git a/tests/integration/test_storage_delta/test.py b/tests/integration/test_storage_delta/test.py index 25f0b58e0f5..c6bb8fd8d69 100644 --- a/tests/integration/test_storage_delta/test.py +++ b/tests/integration/test_storage_delta/test.py @@ -511,3 +511,98 @@ def test_restart_broken_table_function(started_cluster): upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "") assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100 + + +def 
test_partition_columns(started_cluster): + instance = started_cluster.instances["node1"] + spark = started_cluster.spark_session + minio_client = started_cluster.minio_client + bucket = started_cluster.minio_bucket + TABLE_NAME = "test_partition_columns" + result_file = f"{TABLE_NAME}" + partition_column = "c" + + delta_table = ( + DeltaTable.create(spark) + .tableName(TABLE_NAME) + .location(f"/{result_file}") + .addColumn("a", "INT") + .addColumn("b", "STRING") + .addColumn("c", "DATE") + .partitionedBy(partition_column) + .execute() + ) + num_rows = 9 + + schema = StructType( + [ + StructField("a", IntegerType()), + StructField("b", StringType()), + StructField("c", DateType()), + ] + ) + + for i in range(1, num_rows + 1): + data = [ + ( + i, + "test" + str(i), + datetime.strptime(f"2000-01-0{i}", "%Y-%m-%d"), + ) + ] + df = spark.createDataFrame(data=data, schema=schema) + df.printSchema() + df.write.mode("append").format("delta").partitionBy(partition_column).save( + f"/{TABLE_NAME}" + ) + + minio_client = started_cluster.minio_client + bucket = started_cluster.minio_bucket + + files = upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "") + assert len(files) > 0 + print(f"Uploaded files: {files}") + + result = instance.query( + f"describe table deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{result_file}/', 'minio', 'minio123')" + ).strip() + + assert ( + result + == "a\tNullable(Int32)\t\t\t\t\t\nb\tNullable(String)\t\t\t\t\t\nc\tNullable(Date)" + ) + + result = int( + instance.query( + f"""SELECT count() + FROM deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{result_file}/', 'minio', 'minio123') + """ + ) + ) + assert result == num_rows + result = int( + instance.query( + f"""SELECT count() + FROM deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{result_file}/', 'minio', 'minio123') + WHERE c == toDateTime('2000/01/05') + """ + ) + ) + assert result == 1 + + # instance.query( + # f""" + # DROP TABLE IF EXISTS {TABLE_NAME}; + # CREATE TABLE {TABLE_NAME} (a Int32, b String, c DateTime) + # ENGINE=DeltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{result_file}/', 'minio', 'minio123')""" + # ) + # assert ( + # int( + # instance.query( + # f"SELECT count() FROM {TABLE_NAME} WHERE c != toDateTime('2000/01/05')" + # ) + # ) + # == num_rows - 1 + # ) + # instance.query(f"SELECT a, b, c, FROM {TABLE_NAME}") + # assert False From a8f0cfe580291c8612c43e5396693fe9cd4d463e Mon Sep 17 00:00:00 2001 From: kssenii Date: Mon, 13 May 2024 19:54:29 +0200 Subject: [PATCH 02/20] Fixes --- .../DataLakes/DeltaLakeMetadataParser.cpp | 76 ++++++++++++------- .../DataLakes/DeltaLakeMetadataParser.h | 3 + src/Storages/DataLakes/HudiMetadataParser.h | 3 + src/Storages/DataLakes/IStorageDataLake.h | 40 ++++++++-- src/Storages/StorageS3.cpp | 12 ++- src/Storages/StorageS3.h | 6 ++ tests/integration/test_storage_delta/test.py | 16 ++-- 7 files changed, 116 insertions(+), 40 deletions(-) diff --git a/src/Storages/DataLakes/DeltaLakeMetadataParser.cpp b/src/Storages/DataLakes/DeltaLakeMetadataParser.cpp index 50b6ca83cb4..30a617f3460 100644 --- a/src/Storages/DataLakes/DeltaLakeMetadataParser.cpp +++ b/src/Storages/DataLakes/DeltaLakeMetadataParser.cpp @@ -44,6 +44,7 @@ namespace ErrorCodes extern const int INCORRECT_DATA; extern const int LOGICAL_ERROR; extern const int BAD_ARGUMENTS; + extern const int NOT_IMPLEMENTED; } template @@ -184,6 +185,10 @@ struct 
DeltaLakeMetadataParser::Impl Poco::Dynamic::Var json = parser.parse(json_str); Poco::JSON::Object::Ptr object = json.extract(); + std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM + object->stringify(oss); + LOG_TEST(log, "Metadata: {}", oss.str()); + if (object->has("add")) { auto add_object = object->get("add").extract(); @@ -218,34 +223,49 @@ struct DeltaLakeMetadataParser::Impl auto path = object->get("remove").extract()->getValue("path"); result.erase(fs::path(configuration.getPath()) / path); } - if (file_schema.empty()) + if (object->has("metaData")) { - // std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM - // object->stringify(oss); - // LOG_TEST(log, "Metadata: {}", oss.str()); + const auto metadata_object = object->get("metaData").extract(); + const auto schema_object = metadata_object->getValue("schemaString"); - if (object->has("metaData")) + Poco::JSON::Parser p; + Poco::Dynamic::Var fields_json = parser.parse(schema_object); + Poco::JSON::Object::Ptr fields_object = fields_json.extract(); + + const auto fields = fields_object->get("fields").extract(); + NamesAndTypesList current_schema; + for (size_t i = 0; i < fields->size(); ++i) { - const auto metadata_object = object->get("metaData").extract(); - const auto schema_object = metadata_object->getValue("schemaString"); + const auto field = fields->getObject(static_cast(i)); + auto name = field->getValue("name"); + auto type = field->getValue("type"); + auto is_nullable = field->getValue("nullable"); - Poco::JSON::Parser p; - Poco::Dynamic::Var fields_json = parser.parse(schema_object); - Poco::JSON::Object::Ptr fields_object = fields_json.extract(); + std::string physical_name; + auto schema_metadata_object = field->get("metadata").extract(); + if (schema_metadata_object->has("delta.columnMapping.physicalName")) + physical_name = schema_metadata_object->getValue("delta.columnMapping.physicalName"); + else + physical_name = name; - const auto fields = fields_object->get("fields").extract(); - for (size_t i = 0; i < fields->size(); ++i) - { - const auto field = fields->getObject(static_cast(i)); - auto name = field->getValue("name"); - auto type = field->getValue("type"); - auto is_nullable = field->getValue("nullable"); + LOG_TEST(log, "Found column: {}, type: {}, nullable: {}, physical name: {}", + name, type, is_nullable, physical_name); - file_schema.push_back({name, getFieldType(field, "type", is_nullable)}); - } + current_schema.push_back({physical_name, getFieldType(field, "type", is_nullable)}); + } + + if (file_schema.empty()) + { + file_schema = current_schema; + } + else if (file_schema != current_schema) + { + throw Exception(ErrorCodes::NOT_IMPLEMENTED, + "Reading from files with different schema is not possible " + "({} is different from {})", + file_schema.toString(), current_schema.toString()); } } - /// TODO: Check if schema in each file is the same? 
} } @@ -278,12 +298,20 @@ struct DeltaLakeMetadataParser::Impl return value; else if (which.isInt8()) return parse(value); + else if (which.isUInt8()) + return parse(value); else if (which.isInt16()) return parse(value); + else if (which.isUInt16()) + return parse(value); else if (which.isInt32()) return parse(value); + else if (which.isUInt32()) + return parse(value); else if (which.isInt64()) return parse(value); + else if (which.isUInt64()) + return parse(value); else if (which.isFloat32()) return parse(value); else if (which.isFloat64()) @@ -299,14 +327,6 @@ struct DeltaLakeMetadataParser::Impl readDateTime64Text(time, 6, in, assert_cast(data_type.get())->getTimeZone()); return time; } - // else if (which.isDecimal32()) - // return parse(value); - // else if (which.isDecimal64()) - // return parse(value); - // else if (which.isDecimal128()) - // return parse(value); - // else if (which.isDecimal256()) - // return parse(value); throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported DeltaLake type for {}", check_type->getColumnType()); } diff --git a/src/Storages/DataLakes/DeltaLakeMetadataParser.h b/src/Storages/DataLakes/DeltaLakeMetadataParser.h index 58cf7acd2a3..701f15a8a9f 100644 --- a/src/Storages/DataLakes/DeltaLakeMetadataParser.h +++ b/src/Storages/DataLakes/DeltaLakeMetadataParser.h @@ -20,10 +20,13 @@ public: DataLakePartitionColumns getPartitionColumns() const { return partition_columns; } + const std::unordered_map & getColumnNameToPhysicalNameMapping() const { return column_name_to_physical_name; } + private: struct Impl; std::shared_ptr impl; NamesAndTypesList schema; + std::unordered_map column_name_to_physical_name; DataLakePartitionColumns partition_columns; Strings data_files; }; diff --git a/src/Storages/DataLakes/HudiMetadataParser.h b/src/Storages/DataLakes/HudiMetadataParser.h index 9e2901a9d24..3f8631dba77 100644 --- a/src/Storages/DataLakes/HudiMetadataParser.h +++ b/src/Storages/DataLakes/HudiMetadataParser.h @@ -20,10 +20,13 @@ public: DataLakePartitionColumns getPartitionColumns() const { return {}; } + const std::unordered_map & getColumnNameToPhysicalNameMapping() const { return column_name_to_physical_name; } + private: struct Impl; std::shared_ptr impl; Strings data_files; + std::unordered_map column_name_to_physical_name; }; } diff --git a/src/Storages/DataLakes/IStorageDataLake.h b/src/Storages/DataLakes/IStorageDataLake.h index be23c017043..b0c5af1a6e4 100644 --- a/src/Storages/DataLakes/IStorageDataLake.h +++ b/src/Storages/DataLakes/IStorageDataLake.h @@ -9,6 +9,7 @@ #include #include #include +#include #include "PartitionColumns.h" #include @@ -32,11 +33,13 @@ public: Args && ...args) { std::unique_ptr metadata; - Configuration read_configuration; + Configuration base_configuration{configuration_}; + base_configuration.update(context_); + Configuration read_configuration{base_configuration}; + try { - base_configuration.update(context_); metadata = std::make_unique(base_configuration, context_); read_configuration = getConfigurationForDataRead(*metadata, base_configuration, context_); } @@ -124,23 +127,48 @@ private: return configuration; } + ReadFromFormatInfo prepareReadingFromFormat( + const Strings & requested_columns, + const StorageSnapshotPtr & storage_snapshot, + bool supports_subset_of_columns, + ContextPtr local_context) override + { + auto info = DB::prepareReadingFromFormat(requested_columns, storage_snapshot, supports_subset_of_columns); + if (!metadata) + { + base_configuration.update(local_context); + metadata = 
std::make_unique(base_configuration, local_context); + } + auto column_mapping = metadata->getColumnNameToPhysicalNameMapping(); + if (!column_mapping.empty()) + { + for (const auto & [column_name, physical_name] : column_mapping) + { + auto & column = info.format_header.getByName(column_name); + column.name = physical_name; + } + } + return info; + } + void updateConfigurationImpl(const ContextPtr & local_context) { const bool updated = base_configuration.update(local_context); - auto metadata = MetadataParser(base_configuration, local_context); - auto new_keys = metadata.getFiles(); - Storage::partition_columns = metadata.getPartitionColumns(); + metadata = std::make_unique(base_configuration, local_context); + auto new_keys = metadata->getFiles(); + Storage::partition_columns = metadata->getPartitionColumns(); if (!updated && new_keys == Storage::getConfiguration().keys) return; - auto read_configuration = getConfigurationForDataRead(metadata, base_configuration, local_context); + auto read_configuration = getConfigurationForDataRead(*metadata, base_configuration, local_context); Storage::useConfiguration(read_configuration); } Configuration base_configuration; std::mutex configuration_update_mutex; + std::unique_ptr metadata; LoggerPtr log; }; diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp index 205b5d3381e..fc05c40c38f 100644 --- a/src/Storages/StorageS3.cpp +++ b/src/Storages/StorageS3.cpp @@ -1210,6 +1210,15 @@ bool StorageS3::parallelizeOutputAfterReading(ContextPtr context) const return FormatFactory::instance().checkParallelizeOutputAfterReading(configuration.format, context); } +ReadFromFormatInfo StorageS3::prepareReadingFromFormat( + const Strings & requested_columns, + const StorageSnapshotPtr & storage_snapshot, + bool supports_subset_of_columns, + ContextPtr /* local_context */) +{ + return DB::prepareReadingFromFormat(requested_columns, storage_snapshot, supports_subset_of_columns); +} + void StorageS3::read( QueryPlan & query_plan, const Names & column_names, @@ -1220,7 +1229,8 @@ void StorageS3::read( size_t max_block_size, size_t num_streams) { - auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context)); + auto read_from_format_info = prepareReadingFromFormat( + column_names, storage_snapshot, supportsSubsetOfColumns(local_context), local_context); bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty()) && local_context->getSettingsRef().optimize_count_from_files; diff --git a/src/Storages/StorageS3.h b/src/Storages/StorageS3.h index 9e1d9eb5aad..535c3a97a68 100644 --- a/src/Storages/StorageS3.h +++ b/src/Storages/StorageS3.h @@ -395,6 +395,12 @@ private: bool prefersLargeBlocks() const override; bool parallelizeOutputAfterReading(ContextPtr context) const override; + + virtual ReadFromFormatInfo prepareReadingFromFormat( + const Strings & requested_columns, + const StorageSnapshotPtr & storage_snapshot, + bool supports_subset_of_columns, + ContextPtr local_context); }; } diff --git a/tests/integration/test_storage_delta/test.py b/tests/integration/test_storage_delta/test.py index c6bb8fd8d69..4cb71895881 100644 --- a/tests/integration/test_storage_delta/test.py +++ b/tests/integration/test_storage_delta/test.py @@ -153,7 +153,7 @@ def test_single_log_file(started_cluster): bucket = started_cluster.minio_bucket TABLE_NAME = "test_single_log_file" - inserted_data = "SELECT number, toString(number + 1) FROM numbers(100)" + 
inserted_data = "SELECT number as a, toString(number + 1) as b FROM numbers(100)" parquet_data_path = create_initial_data_file( started_cluster, instance, inserted_data, TABLE_NAME ) @@ -520,7 +520,7 @@ def test_partition_columns(started_cluster): bucket = started_cluster.minio_bucket TABLE_NAME = "test_partition_columns" result_file = f"{TABLE_NAME}" - partition_column = "c" + partition_columns = ["b", "c", "d", "e"] delta_table = ( DeltaTable.create(spark) @@ -529,7 +529,9 @@ def test_partition_columns(started_cluster): .addColumn("a", "INT") .addColumn("b", "STRING") .addColumn("c", "DATE") - .partitionedBy(partition_column) + .addColumn("d", "INT") + .addColumn("e", "BOOLEAN") + .partitionedBy(partition_columns) .execute() ) num_rows = 9 @@ -539,6 +541,8 @@ def test_partition_columns(started_cluster): StructField("a", IntegerType()), StructField("b", StringType()), StructField("c", DateType()), + StructField("d", IntegerType()), + StructField("e", BooleanType()), ] ) @@ -548,11 +552,13 @@ def test_partition_columns(started_cluster): i, "test" + str(i), datetime.strptime(f"2000-01-0{i}", "%Y-%m-%d"), + i, + False, ) ] df = spark.createDataFrame(data=data, schema=schema) df.printSchema() - df.write.mode("append").format("delta").partitionBy(partition_column).save( + df.write.mode("append").format("delta").partitionBy(partition_columns).save( f"/{TABLE_NAME}" ) @@ -569,7 +575,7 @@ def test_partition_columns(started_cluster): assert ( result - == "a\tNullable(Int32)\t\t\t\t\t\nb\tNullable(String)\t\t\t\t\t\nc\tNullable(Date)" + == "a\tNullable(Int32)\t\t\t\t\t\nb\tNullable(String)\t\t\t\t\t\nc\tNullable(Date32)\t\t\t\t\t\nd\tNullable(Int32)\t\t\t\t\t\ne\tNullable(Bool)" ) result = int( From ccb4bd63700267169cbc30da68392ba6d8abb0d9 Mon Sep 17 00:00:00 2001 From: kssenii Date: Thu, 6 Jun 2024 19:35:57 +0200 Subject: [PATCH 03/20] Resolve conflicts: apply implementation to StorageObjectStorage --- .../DataLakes/DeltaLakeMetadata.cpp | 311 ++++++++++++++++-- .../DataLakes/DeltaLakeMetadata.h | 12 +- .../ObjectStorage/DataLakes/HudiMetadata.h | 3 + .../DataLakes/IDataLakeMetadata.h | 3 + .../DataLakes/IStorageDataLake.h | 27 +- .../ObjectStorage/DataLakes/IcebergMetadata.h | 3 + .../ObjectStorage/StorageObjectStorage.cpp | 21 +- .../ObjectStorage/StorageObjectStorage.h | 8 + .../StorageObjectStorageSource.cpp | 31 +- .../StorageObjectStorageSource.h | 5 +- 10 files changed, 384 insertions(+), 40 deletions(-) diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp index 38bf3112ee2..bd3e21f12fd 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp @@ -4,19 +4,41 @@ #include #if USE_AWS_S3 && USE_PARQUET -#include + +#include +#include +#include +#include + +#include +#include +#include + #include #include #include -#include -#include -#include -#include -#include -#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + #include +#include #include -#include +#include +#include +#include + +namespace fs = std::filesystem; namespace DB { @@ -25,6 +47,8 @@ namespace ErrorCodes { extern const int INCORRECT_DATA; extern const int BAD_ARGUMENTS; + extern const int LOGICAL_ERROR; + extern const int NOT_IMPLEMENTED; } struct DeltaLakeMetadata::Impl @@ -74,9 +98,17 @@ struct DeltaLakeMetadata::Impl * An action changes one aspect of the table's state, for example, adding or 
removing a file. * Note: it is not a valid json, but a list of json's, so we read it in a while cycle. */ - std::set processMetadataFiles() + struct DeltaLakeMetadata + { + NamesAndTypesList schema; + Strings data_files; + DataLakePartitionColumns partition_columns; + }; + DeltaLakeMetadata processMetadataFiles() { std::set result_files; + NamesAndTypesList current_schema; + DataLakePartitionColumns current_partition_columns; const auto checkpoint_version = getCheckpointIfExists(result_files); if (checkpoint_version) @@ -90,7 +122,7 @@ struct DeltaLakeMetadata::Impl if (!object_storage->exists(StoredObject(file_path))) break; - processMetadataFile(file_path, result_files); + processMetadataFile(file_path, current_schema, current_partition_columns, result_files); } LOG_TRACE( @@ -101,10 +133,10 @@ struct DeltaLakeMetadata::Impl { const auto keys = listFiles(*object_storage, *configuration, deltalake_metadata_directory, metadata_file_suffix); for (const String & key : keys) - processMetadataFile(key, result_files); + processMetadataFile(key, current_schema, current_partition_columns, result_files); } - return result_files; + return DeltaLakeMetadata{current_schema, Strings(result_files.begin(), result_files.end()), current_partition_columns}; } /** @@ -136,7 +168,11 @@ struct DeltaLakeMetadata::Impl * \"nullCount\":{\"col-6c990940-59bb-4709-8f2e-17083a82c01a\":0,\"col-763cd7e2-7627-4d8e-9fb7-9e85d0c8845b\":0}}"}} * " */ - void processMetadataFile(const String & key, std::set & result) const + void processMetadataFile( + const String & key, + NamesAndTypesList & file_schema, + DataLakePartitionColumns & file_partition_columns, + std::set & result) { auto read_settings = context->getReadSettings(); auto buf = object_storage->readObject(StoredObject(key), read_settings); @@ -157,20 +193,236 @@ struct DeltaLakeMetadata::Impl if (json_str.empty()) continue; - const JSON json(json_str); - if (json.has("add")) + Poco::JSON::Parser parser; + Poco::Dynamic::Var json = parser.parse(json_str); + Poco::JSON::Object::Ptr object = json.extract(); + + std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM + object->stringify(oss); + LOG_TEST(log, "Metadata: {}", oss.str()); + + if (object->has("add")) { - const auto path = json["add"]["path"].getString(); - result.insert(std::filesystem::path(configuration->getPath()) / path); + auto add_object = object->get("add").extract(); + auto path = add_object->getValue("path"); + result.insert(fs::path(configuration->getPath()) / path); + + auto filename = fs::path(path).filename().string(); + auto it = file_partition_columns.find(filename); + if (it == file_partition_columns.end()) + { + auto partition_values = add_object->get("partitionValues").extract(); + if (partition_values->size()) + { + auto & current_partition_columns = file_partition_columns[filename]; + for (const auto & partition_name : partition_values->getNames()) + { + const auto value = partition_values->getValue(partition_name); + auto name_and_type = file_schema.tryGetByName(partition_name); + if (!name_and_type) + throw Exception(ErrorCodes::LOGICAL_ERROR, "No such column in schema: {}", partition_name); + + auto field = getFieldValue(value, name_and_type->type); + current_partition_columns.emplace_back(*name_and_type, field); + + LOG_TEST(log, "Partition {} value is {} (for {})", partition_name, value, filename); + } + } + } } - else if (json.has("remove")) + else if (object->has("remove")) { - const auto path = json["remove"]["path"].getString(); - 
result.erase(std::filesystem::path(configuration->getPath()) / path); + auto path = object->get("remove").extract()->getValue("path"); + result.erase(fs::path(configuration->getPath()) / path); + } + if (object->has("metaData")) + { + const auto metadata_object = object->get("metaData").extract(); + const auto schema_object = metadata_object->getValue("schemaString"); + + Poco::JSON::Parser p; + Poco::Dynamic::Var fields_json = parser.parse(schema_object); + Poco::JSON::Object::Ptr fields_object = fields_json.extract(); + + const auto fields = fields_object->get("fields").extract(); + NamesAndTypesList current_schema; + for (size_t i = 0; i < fields->size(); ++i) + { + const auto field = fields->getObject(static_cast(i)); + auto column_name = field->getValue("name"); + auto type = field->getValue("type"); + auto is_nullable = field->getValue("nullable"); + + std::string physical_name; + auto schema_metadata_object = field->get("metadata").extract(); + if (schema_metadata_object->has("delta.columnMapping.physicalName")) + physical_name = schema_metadata_object->getValue("delta.columnMapping.physicalName"); + else + physical_name = column_name; + + LOG_TEST(log, "Found column: {}, type: {}, nullable: {}, physical name: {}", + column_name, type, is_nullable, physical_name); + + current_schema.push_back({physical_name, getFieldType(field, "type", is_nullable)}); + } + + if (file_schema.empty()) + { + file_schema = current_schema; + } + else if (file_schema != current_schema) + { + throw Exception(ErrorCodes::NOT_IMPLEMENTED, + "Reading from files with different schema is not possible " + "({} is different from {})", + file_schema.toString(), current_schema.toString()); + } } } } + DataTypePtr getFieldType(const Poco::JSON::Object::Ptr & field, const String & type_key, bool is_nullable) + { + if (field->isObject(type_key)) + return getComplexTypeFromObject(field->getObject(type_key)); + + auto type = field->get(type_key); + if (type.isString()) + { + const String & type_name = type.extract(); + auto data_type = getSimpleTypeByName(type_name); + return is_nullable ? 
makeNullable(data_type) : data_type; + } + + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected 'type' field: {}", type.toString()); + } + + Field getFieldValue(const String & value, DataTypePtr data_type) + { + DataTypePtr check_type; + if (data_type->isNullable()) + check_type = static_cast(data_type.get())->getNestedType(); + else + check_type = data_type; + + WhichDataType which(check_type->getTypeId()); + if (which.isStringOrFixedString()) + return value; + else if (which.isInt8()) + return parse(value); + else if (which.isUInt8()) + return parse(value); + else if (which.isInt16()) + return parse(value); + else if (which.isUInt16()) + return parse(value); + else if (which.isInt32()) + return parse(value); + else if (which.isUInt32()) + return parse(value); + else if (which.isInt64()) + return parse(value); + else if (which.isUInt64()) + return parse(value); + else if (which.isFloat32()) + return parse(value); + else if (which.isFloat64()) + return parse(value); + else if (which.isDate()) + return UInt16{LocalDate{std::string(value)}.getDayNum()}; + else if (which.isDate32()) + return Int32{LocalDate{std::string(value)}.getExtenedDayNum()}; + else if (which.isDateTime64()) + { + ReadBufferFromString in(value); + DateTime64 time = 0; + readDateTime64Text(time, 6, in, assert_cast(data_type.get())->getTimeZone()); + return time; + } + + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported DeltaLake type for {}", check_type->getColumnType()); + } + + DataTypePtr getSimpleTypeByName(const String & type_name) + { + /// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#primitive-types + + if (type_name == "string" || type_name == "binary") + return std::make_shared(); + if (type_name == "long") + return std::make_shared(); + if (type_name == "integer") + return std::make_shared(); + if (type_name == "short") + return std::make_shared(); + if (type_name == "byte") + return std::make_shared(); + if (type_name == "float") + return std::make_shared(); + if (type_name == "double") + return std::make_shared(); + if (type_name == "boolean") + return DataTypeFactory::instance().get("Bool"); + if (type_name == "date") + return std::make_shared(); + if (type_name == "timestamp") + return std::make_shared(6); + if (type_name.starts_with("decimal(") && type_name.ends_with(')')) + { + ReadBufferFromString buf(std::string_view(type_name.begin() + 8, type_name.end() - 1)); + size_t precision; + size_t scale; + readIntText(precision, buf); + skipWhitespaceIfAny(buf); + assertChar(',', buf); + skipWhitespaceIfAny(buf); + tryReadIntText(scale, buf); + return createDecimal(precision, scale); + } + + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported DeltaLake type: {}", type_name); + } + + DataTypePtr getComplexTypeFromObject(const Poco::JSON::Object::Ptr & type) + { + String type_name = type->getValue("type"); + + if (type_name == "struct") + { + DataTypes element_types; + Names element_names; + auto fields = type->get("fields").extract(); + element_types.reserve(fields->size()); + element_names.reserve(fields->size()); + for (size_t i = 0; i != fields->size(); ++i) + { + auto field = fields->getObject(static_cast(i)); + element_names.push_back(field->getValue("name")); + auto required = field->getValue("required"); + element_types.push_back(getFieldType(field, "type", required)); + } + + return std::make_shared(element_types, element_names); + } + + if (type_name == "array") + { + bool is_nullable = type->getValue("containsNull"); + auto element_type = getFieldType(type, "elementType", 
is_nullable); + return std::make_shared(element_type); + } + + if (type_name == "map") + { + bool is_nullable = type->getValue("containsNull"); + auto key_type = getFieldType(type, "keyType", /* is_nullable */false); + auto value_type = getFieldType(type, "valueType", is_nullable); + return std::make_shared(key_type, value_type); + } + + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported DeltaLake type: {}", type_name); + } + /** * Checkpoints in delta-lake are created each 10 commits by default. * Latest checkpoint is written in _last_checkpoint file: _delta_log/_last_checkpoint @@ -277,8 +529,8 @@ struct DeltaLakeMetadata::Impl ArrowMemoryPool::instance(), &reader)); - std::shared_ptr schema; - THROW_ARROW_NOT_OK(reader->GetSchema(&schema)); + std::shared_ptr file_schema; + THROW_ARROW_NOT_OK(reader->GetSchema(&file_schema)); ArrowColumnToCHColumn column_reader( header, "Parquet", @@ -327,16 +579,13 @@ DeltaLakeMetadata::DeltaLakeMetadata( ContextPtr context_) : impl(std::make_unique(object_storage_, configuration_, context_)) { -} - -Strings DeltaLakeMetadata::getDataFiles() const -{ - if (!data_files.empty()) - return data_files; - auto result = impl->processMetadataFiles(); - data_files = Strings(result.begin(), result.end()); - return data_files; + data_files = result.data_files; + schema = result.schema; + partition_columns = result.partition_columns; + + LOG_TRACE(impl->log, "Found {} data files, {} partition files, schema: {}", + data_files.size(), partition_columns.size(), schema.toString()); } } diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h index e527721b29e..da9fa1e76ce 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h @@ -20,9 +20,13 @@ public: ConfigurationPtr configuration_, ContextPtr context_); - Strings getDataFiles() const override; + Strings getDataFiles() const override { return data_files; } - NamesAndTypesList getTableSchema() const override { return {}; } + NamesAndTypesList getTableSchema() const override { return schema; } + + DataLakePartitionColumns getPartitionColumns() const override { return partition_columns; } + + const std::unordered_map & getColumnNameToPhysicalNameMapping() const override { return column_name_to_physical_name; } bool operator ==(const IDataLakeMetadata & other) const override { @@ -44,6 +48,10 @@ private: struct Impl; const std::shared_ptr impl; mutable Strings data_files; + + NamesAndTypesList schema; + std::unordered_map column_name_to_physical_name; + DataLakePartitionColumns partition_columns; }; } diff --git a/src/Storages/ObjectStorage/DataLakes/HudiMetadata.h b/src/Storages/ObjectStorage/DataLakes/HudiMetadata.h index 3ab274b1fbf..ac978732804 100644 --- a/src/Storages/ObjectStorage/DataLakes/HudiMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/HudiMetadata.h @@ -26,6 +26,8 @@ public: NamesAndTypesList getTableSchema() const override { return {}; } + const std::unordered_map & getColumnNameToPhysicalNameMapping() const override { return column_name_to_physical_name; } + bool operator ==(const IDataLakeMetadata & other) const override { const auto * hudi_metadata = dynamic_cast(&other); @@ -46,6 +48,7 @@ private: const ObjectStoragePtr object_storage; const ConfigurationPtr configuration; mutable Strings data_files; + std::unordered_map column_name_to_physical_name; Strings getDataFilesImpl() const; }; diff --git 
a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h index a2bd5adb947..53b8abf7a5c 100644 --- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h @@ -2,6 +2,7 @@ #include #include #include +#include "PartitionColumns.h" namespace DB { @@ -13,6 +14,8 @@ public: virtual Strings getDataFiles() const = 0; virtual NamesAndTypesList getTableSchema() const = 0; virtual bool operator==(const IDataLakeMetadata & other) const = 0; + virtual DataLakePartitionColumns getPartitionColumns() const { return {}; } + virtual const std::unordered_map & getColumnNameToPhysicalNameMapping() const = 0; }; using DataLakeMetadataPtr = std::unique_ptr; diff --git a/src/Storages/ObjectStorage/DataLakes/IStorageDataLake.h b/src/Storages/ObjectStorage/DataLakes/IStorageDataLake.h index 83865c47eb8..64711d1774c 100644 --- a/src/Storages/ObjectStorage/DataLakes/IStorageDataLake.h +++ b/src/Storages/ObjectStorage/DataLakes/IStorageDataLake.h @@ -81,7 +81,7 @@ public: auto metadata = DataLakeMetadata::create(object_storage_, base_configuration, local_context); auto schema_from_metadata = metadata->getTableSchema(); - if (schema_from_metadata != NamesAndTypesList{}) + if (!schema_from_metadata.empty()) { return ColumnsDescription(std::move(schema_from_metadata)); } @@ -99,6 +99,7 @@ public: Storage::updateConfiguration(local_context); auto new_metadata = DataLakeMetadata::create(Storage::object_storage, base_configuration, local_context); + Storage::partition_columns = new_metadata->getPartitionColumns(); if (current_metadata && *current_metadata == *new_metadata) return; @@ -128,6 +129,30 @@ public: private: ConfigurationPtr base_configuration; DataLakeMetadataPtr current_metadata; + + ReadFromFormatInfo prepareReadingFromFormat( + const Strings & requested_columns, + const StorageSnapshotPtr & storage_snapshot, + bool supports_subset_of_columns, + ContextPtr local_context) override + { + auto info = DB::prepareReadingFromFormat(requested_columns, storage_snapshot, supports_subset_of_columns); + if (!current_metadata) + { + Storage::updateConfiguration(local_context); + current_metadata = DataLakeMetadata::create(Storage::object_storage, base_configuration, local_context); + } + auto column_mapping = current_metadata->getColumnNameToPhysicalNameMapping(); + if (!column_mapping.empty()) + { + for (const auto & [column_name, physical_name] : column_mapping) + { + auto & column = info.format_header.getByName(column_name); + column.name = physical_name; + } + } + return info; + } }; using StorageIceberg = IStorageDataLake; diff --git a/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.h index 06dbd373bf9..39673a03cb1 100644 --- a/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.h @@ -82,6 +82,8 @@ public: /// Get table schema parsed from metadata. 
NamesAndTypesList getTableSchema() const override { return schema; } + const std::unordered_map & getColumnNameToPhysicalNameMapping() const override { return column_name_to_physical_name; } + bool operator ==(const IDataLakeMetadata & other) const override { const auto * iceberg_metadata = dynamic_cast(&other); @@ -104,6 +106,7 @@ private: Int32 current_schema_id; NamesAndTypesList schema; mutable Strings data_files; + std::unordered_map column_name_to_physical_name; LoggerPtr log; }; diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index 2c8e60b49d0..14bbad659f1 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -111,7 +111,8 @@ public: const bool need_only_count_, ContextPtr context_, size_t max_block_size_, - size_t num_streams_) + size_t num_streams_, + const DataLakePartitionColumns & partition_columns_) : SourceStepWithFilter(DataStream{.header = info_.source_header}, columns_to_read, query_info_, storage_snapshot_, context_) , object_storage(object_storage_) , configuration(configuration_) @@ -123,6 +124,7 @@ public: , max_block_size(max_block_size_) , num_streams(num_streams_) , distributed_processing(distributed_processing_) + , partition_columns(partition_columns_) { } @@ -161,7 +163,7 @@ public: { auto source = std::make_shared( getName(), object_storage, configuration, info, format_settings, - context, max_block_size, iterator_wrapper, max_parsing_threads, need_only_count); + context, max_block_size, iterator_wrapper, max_parsing_threads, need_only_count, partition_columns); source->setKeyCondition(filter_actions_dag, context); pipes.emplace_back(std::move(source)); @@ -190,6 +192,7 @@ private: const size_t max_block_size; size_t num_streams; const bool distributed_processing; + DataLakePartitionColumns partition_columns; void createIterator(const ActionsDAG::Node * predicate) { @@ -203,6 +206,15 @@ private: }; } +ReadFromFormatInfo StorageObjectStorage::prepareReadingFromFormat( + const Strings & requested_columns, + const StorageSnapshotPtr & storage_snapshot, + bool supports_subset_of_columns, + ContextPtr /* local_context */) +{ + return DB::prepareReadingFromFormat(requested_columns, storage_snapshot, supports_subset_of_columns); +} + void StorageObjectStorage::read( QueryPlan & query_plan, const Names & column_names, @@ -222,7 +234,7 @@ void StorageObjectStorage::read( } const auto read_from_format_info = prepareReadingFromFormat( - column_names, storage_snapshot, supportsSubsetOfColumns(local_context)); + column_names, storage_snapshot, supportsSubsetOfColumns(local_context), local_context); const bool need_only_count = (query_info.optimize_trivial_count || read_from_format_info.requested_columns.empty()) && local_context->getSettingsRef().optimize_count_from_files; @@ -240,7 +252,8 @@ void StorageObjectStorage::read( need_only_count, local_context, max_block_size, - num_streams); + num_streams, + partition_columns); query_plan.addStep(std::move(read_step)); } diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h index f45d8c1f01a..645f97201ec 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.h +++ b/src/Storages/ObjectStorage/StorageObjectStorage.h @@ -5,6 +5,7 @@ #include #include #include +#include namespace DB { @@ -117,6 +118,12 @@ public: protected: virtual void updateConfiguration(ContextPtr local_context); + virtual ReadFromFormatInfo 
prepareReadingFromFormat( + const Strings & requested_columns, + const StorageSnapshotPtr & storage_snapshot, + bool supports_subset_of_columns, + ContextPtr local_context); + static std::unique_ptr createReadBufferIterator( const ObjectStoragePtr & object_storage, const ConfigurationPtr & configuration, @@ -129,6 +136,7 @@ protected: const std::optional format_settings; const ASTPtr partition_by; const bool distributed_processing; + mutable DataLakePartitionColumns partition_columns; LoggerPtr log; }; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index b31d0f8a92e..97c1ecc38b9 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -49,7 +49,8 @@ StorageObjectStorageSource::StorageObjectStorageSource( UInt64 max_block_size_, std::shared_ptr file_iterator_, size_t max_parsing_threads_, - bool need_only_count_) + bool need_only_count_, + const DataLakePartitionColumns & partition_columns_) : SourceWithKeyCondition(info.source_header, false) , WithContext(context_) , name(std::move(name_)) @@ -68,6 +69,7 @@ StorageObjectStorageSource::StorageObjectStorageSource( , columns_desc(info.columns_description) , file_iterator(file_iterator_) , schema_cache(StorageObjectStorage::getSchemaCache(context_, configuration->getTypeName())) + , partition_columns(partition_columns_) , create_reader_scheduler(threadPoolCallbackRunnerUnsafe(*create_reader_pool, "Reader")) { } @@ -201,6 +203,33 @@ Chunk StorageObjectStorageSource::generate() getUniqueStoragePathIdentifier(*configuration, reader.getObjectInfo(), false), object_info.metadata->size_bytes, &filename); + if (!partition_columns.empty() && chunk_size && chunk.hasColumns()) + { + auto partition_values = partition_columns.find(filename); + + for (const auto & [name_and_type, value] : partition_values->second) + { + if (!read_from_format_info.source_header.has(name_and_type.name)) + continue; + + auto column_pos = read_from_format_info.source_header.getPositionByName(name_and_type.name); + + const auto & type = name_and_type.type; + auto partition_column = type->createColumnConst(chunk.getNumRows(), value)->convertToFullColumnIfConst(); + /// This column is filled with default value now, remove it. + chunk.erase(column_pos); + /// Add correct values. 
+ if (chunk.hasColumns()) + { + chunk.addColumn(column_pos, std::move(partition_column)); + } + else + { + chunk.addColumn(std::move(partition_column)); + } + } + } + return chunk; } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.h b/src/Storages/ObjectStorage/StorageObjectStorageSource.h index fd7c7aa7102..ab8d9588155 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.h @@ -6,6 +6,7 @@ #include #include #include +#include namespace DB @@ -39,7 +40,8 @@ public: UInt64 max_block_size_, std::shared_ptr file_iterator_, size_t max_parsing_threads_, - bool need_only_count_); + bool need_only_count_, + const DataLakePartitionColumns & partition_columns_ = {}); ~StorageObjectStorageSource() override; @@ -81,6 +83,7 @@ protected: bool initialized = false; size_t total_rows_in_file = 0; LoggerPtr log = getLogger("StorageObjectStorageSource"); + DataLakePartitionColumns partition_columns; struct ReaderHolder : private boost::noncopyable { From 1c1628db0f00d25c97e552e1b63d2f2a5cae9ee0 Mon Sep 17 00:00:00 2001 From: kssenii Date: Fri, 7 Jun 2024 13:43:26 +0200 Subject: [PATCH 04/20] Minor --- .../ObjectStorage/DataLakes/DeltaLakeMetadata.h | 2 +- src/Storages/ObjectStorage/DataLakes/HudiMetadata.h | 3 +++ .../ObjectStorage/DataLakes/IDataLakeMetadata.h | 2 +- .../ObjectStorage/DataLakes/IStorageDataLake.h | 5 ++++- src/Storages/ObjectStorage/DataLakes/IcebergMetadata.h | 3 +++ .../ObjectStorage/DataLakes/PartitionColumns.h | 2 ++ .../ObjectStorage/StorageObjectStorageSource.cpp | 10 +++------- src/TableFunctions/ITableFunctionDataLake.h | 2 -- 8 files changed, 17 insertions(+), 12 deletions(-) diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h index da9fa1e76ce..926bd1b451d 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h @@ -24,7 +24,7 @@ public: NamesAndTypesList getTableSchema() const override { return schema; } - DataLakePartitionColumns getPartitionColumns() const override { return partition_columns; } + const DataLakePartitionColumns & getPartitionColumns() const override { return partition_columns; } const std::unordered_map & getColumnNameToPhysicalNameMapping() const override { return column_name_to_physical_name; } diff --git a/src/Storages/ObjectStorage/DataLakes/HudiMetadata.h b/src/Storages/ObjectStorage/DataLakes/HudiMetadata.h index ac978732804..b060b1b0d39 100644 --- a/src/Storages/ObjectStorage/DataLakes/HudiMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/HudiMetadata.h @@ -26,6 +26,8 @@ public: NamesAndTypesList getTableSchema() const override { return {}; } + const DataLakePartitionColumns & getPartitionColumns() const override { return partition_columns; } + const std::unordered_map & getColumnNameToPhysicalNameMapping() const override { return column_name_to_physical_name; } bool operator ==(const IDataLakeMetadata & other) const override @@ -49,6 +51,7 @@ private: const ConfigurationPtr configuration; mutable Strings data_files; std::unordered_map column_name_to_physical_name; + DataLakePartitionColumns partition_columns; Strings getDataFilesImpl() const; }; diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h index 53b8abf7a5c..2954d50db91 100644 --- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h +++ 
b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h @@ -14,7 +14,7 @@ public: virtual Strings getDataFiles() const = 0; virtual NamesAndTypesList getTableSchema() const = 0; virtual bool operator==(const IDataLakeMetadata & other) const = 0; - virtual DataLakePartitionColumns getPartitionColumns() const { return {}; } + virtual const DataLakePartitionColumns & getPartitionColumns() const = 0; virtual const std::unordered_map & getColumnNameToPhysicalNameMapping() const = 0; }; using DataLakeMetadataPtr = std::unique_ptr; diff --git a/src/Storages/ObjectStorage/DataLakes/IStorageDataLake.h b/src/Storages/ObjectStorage/DataLakes/IStorageDataLake.h index 64711d1774c..97fb9890490 100644 --- a/src/Storages/ObjectStorage/DataLakes/IStorageDataLake.h +++ b/src/Storages/ObjectStorage/DataLakes/IStorageDataLake.h @@ -99,7 +99,10 @@ public: Storage::updateConfiguration(local_context); auto new_metadata = DataLakeMetadata::create(Storage::object_storage, base_configuration, local_context); - Storage::partition_columns = new_metadata->getPartitionColumns(); + auto partition_columns = new_metadata->getPartitionColumns(); + + if (partition_columns != Storage::partition_columns) + Storage::partition_columns = partition_columns; if (current_metadata && *current_metadata == *new_metadata) return; diff --git a/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.h index 39673a03cb1..9476ac6e7d9 100644 --- a/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/IcebergMetadata.h @@ -84,6 +84,8 @@ public: const std::unordered_map & getColumnNameToPhysicalNameMapping() const override { return column_name_to_physical_name; } + const DataLakePartitionColumns & getPartitionColumns() const override { return partition_columns; } + bool operator ==(const IDataLakeMetadata & other) const override { const auto * iceberg_metadata = dynamic_cast(&other); @@ -107,6 +109,7 @@ private: NamesAndTypesList schema; mutable Strings data_files; std::unordered_map column_name_to_physical_name; + DataLakePartitionColumns partition_columns; LoggerPtr log; }; diff --git a/src/Storages/ObjectStorage/DataLakes/PartitionColumns.h b/src/Storages/ObjectStorage/DataLakes/PartitionColumns.h index 604dbbf78fa..eb605559145 100644 --- a/src/Storages/ObjectStorage/DataLakes/PartitionColumns.h +++ b/src/Storages/ObjectStorage/DataLakes/PartitionColumns.h @@ -9,6 +9,8 @@ struct DataLakePartitionColumn { NameAndTypePair name_and_type; Field value; + + bool operator ==(const DataLakePartitionColumn & other) const = default; }; /// Data file -> partition columns diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index 97c1ecc38b9..3ae51cd4235 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -212,21 +212,17 @@ Chunk StorageObjectStorageSource::generate() if (!read_from_format_info.source_header.has(name_and_type.name)) continue; - auto column_pos = read_from_format_info.source_header.getPositionByName(name_and_type.name); + const auto column_pos = read_from_format_info.source_header.getPositionByName(name_and_type.name); + auto partition_column = name_and_type.type->createColumnConst(chunk.getNumRows(), value)->convertToFullColumnIfConst(); - const auto & type = name_and_type.type; - auto partition_column = type->createColumnConst(chunk.getNumRows(), 
value)->convertToFullColumnIfConst(); /// This column is filled with default value now, remove it. chunk.erase(column_pos); + /// Add correct values. if (chunk.hasColumns()) - { chunk.addColumn(column_pos, std::move(partition_column)); - } else - { chunk.addColumn(std::move(partition_column)); - } } } diff --git a/src/TableFunctions/ITableFunctionDataLake.h b/src/TableFunctions/ITableFunctionDataLake.h index f7915643a08..fe6e5b3e593 100644 --- a/src/TableFunctions/ITableFunctionDataLake.h +++ b/src/TableFunctions/ITableFunctionDataLake.h @@ -69,8 +69,6 @@ protected: /// Set default format to Parquet if it's not specified in arguments. TableFunction::parseArguments(ast_function, context); } - - ColumnsDescription structure_hint; }; struct TableFunctionIcebergName From 146a7e13d950cb6b122fe23d336092cbe009b7d3 Mon Sep 17 00:00:00 2001 From: Pablo Marcos Date: Thu, 27 Jun 2024 08:44:46 +0000 Subject: [PATCH 05/20] Throw exception in bitTest when position is out of bound This happens whenever the number of bit positions is bigger than the number of bits in the number, or when the bit position is negative. --- src/Functions/bitTest.cpp | 14 +++++++++-- .../00967_ubsan_bit_test.reference | 1 - .../0_stateless/00967_ubsan_bit_test.sql | 1 - .../01082_bit_test_out_of_bound.reference | 24 +++++++++++++++++++ .../01082_bit_test_out_of_bound.sql | 5 ++++ .../01710_minmax_count_projection.sql | 2 +- 6 files changed, 42 insertions(+), 5 deletions(-) delete mode 100644 tests/queries/0_stateless/00967_ubsan_bit_test.reference delete mode 100644 tests/queries/0_stateless/00967_ubsan_bit_test.sql diff --git a/src/Functions/bitTest.cpp b/src/Functions/bitTest.cpp index 78ec9c8b773..f4c90a0c603 100644 --- a/src/Functions/bitTest.cpp +++ b/src/Functions/bitTest.cpp @@ -8,6 +8,7 @@ namespace DB namespace ErrorCodes { extern const int NOT_IMPLEMENTED; + extern const int PARAMETER_OUT_OF_BOUND; } namespace @@ -21,12 +22,21 @@ struct BitTestImpl static const constexpr bool allow_string_integer = false; template - NO_SANITIZE_UNDEFINED static Result apply(A a [[maybe_unused]], B b [[maybe_unused]]) + static Result apply(A a [[maybe_unused]], B b [[maybe_unused]]) { if constexpr (is_big_int_v || is_big_int_v) throw Exception(ErrorCodes::NOT_IMPLEMENTED, "bitTest is not implemented for big integers as second argument"); else - return (typename NumberTraits::ToInteger::Type(a) >> typename NumberTraits::ToInteger::Type(b)) & 1; + { + const auto max_position = decltype(b)((8 * sizeof(a)) - 1); + if (b > max_position || b < 0) + { + throw Exception(ErrorCodes::PARAMETER_OUT_OF_BOUND, + "The bit position argument needs to a positive value and less or equal to {} for integer {}", + static_cast(max_position), static_cast(a)); + } + return (a >> b) & 1; + } } #if USE_EMBEDDED_COMPILER diff --git a/tests/queries/0_stateless/00967_ubsan_bit_test.reference b/tests/queries/0_stateless/00967_ubsan_bit_test.reference deleted file mode 100644 index 573541ac970..00000000000 --- a/tests/queries/0_stateless/00967_ubsan_bit_test.reference +++ /dev/null @@ -1 +0,0 @@ -0 diff --git a/tests/queries/0_stateless/00967_ubsan_bit_test.sql b/tests/queries/0_stateless/00967_ubsan_bit_test.sql deleted file mode 100644 index 1682e725670..00000000000 --- a/tests/queries/0_stateless/00967_ubsan_bit_test.sql +++ /dev/null @@ -1 +0,0 @@ -SELECT sum(ignore(bitTest(number, 65))) FROM numbers(10); diff --git a/tests/queries/0_stateless/01082_bit_test_out_of_bound.reference b/tests/queries/0_stateless/01082_bit_test_out_of_bound.reference index 
708c5d9d994..ee35e683ed1 100644 --- a/tests/queries/0_stateless/01082_bit_test_out_of_bound.reference +++ b/tests/queries/0_stateless/01082_bit_test_out_of_bound.reference @@ -198,3 +198,27 @@ 97 1 98 1 99 1 +0 1 +1 0 +2 1 +3 0 +4 1 +5 0 +6 1 +7 0 +0 1 +1 0 +2 1 +3 0 +4 1 +5 0 +6 1 +7 0 +8 1 +9 0 +10 1 +11 0 +12 1 +13 0 +14 1 +15 0 diff --git a/tests/queries/0_stateless/01082_bit_test_out_of_bound.sql b/tests/queries/0_stateless/01082_bit_test_out_of_bound.sql index 82e2c5a2380..324768b2e1d 100644 --- a/tests/queries/0_stateless/01082_bit_test_out_of_bound.sql +++ b/tests/queries/0_stateless/01082_bit_test_out_of_bound.sql @@ -1,2 +1,7 @@ SELECT number, bitTestAny(toUInt8(1 + 4 + 16 + 64), number) FROM numbers(100); SELECT number, bitTestAll(toUInt8(1 + 4 + 16 + 64), number) FROM numbers(100); + +SELECT number, bitTest(toUInt8(1 + 4 + 16 + 64), number) FROM numbers(8); +SELECT number, bitTest(toUInt8(1 + 4 + 16 + 64), number) FROM numbers(8, 16); -- { serverError PARAMETER_OUT_OF_BOUND } +SELECT number, bitTest(toUInt16(1 + 4 + 16 + 64 + 256 + 1024 + 4096 + 16384 + 65536), number) FROM numbers(16); +SELECT -number, bitTest(toUInt16(1), -number) FROM numbers(8); -- { serverError PARAMETER_OUT_OF_BOUND } diff --git a/tests/queries/0_stateless/01710_minmax_count_projection.sql b/tests/queries/0_stateless/01710_minmax_count_projection.sql index d0177da84d2..6c598bce440 100644 --- a/tests/queries/0_stateless/01710_minmax_count_projection.sql +++ b/tests/queries/0_stateless/01710_minmax_count_projection.sql @@ -16,7 +16,7 @@ select min(i), max(i), count() from d where _partition_value.1 = 10 group by _pa select min(i) from d where 1 = _partition_value.1; -- fuzz crash https://github.com/ClickHouse/ClickHouse/issues/37151 -SELECT min(i), max(i), count() FROM d WHERE (_partition_value.1) = 0 GROUP BY ignore(bitTest(ignore(NULL), 65535), NULL, (_partition_value.1) = 7, '10.25', bitTest(NULL, -9223372036854775808), NULL, ignore(ignore(-2147483647, NULL)), 1024), _partition_id ORDER BY _partition_id ASC NULLS FIRST; +SELECT min(i), max(i), count() FROM d WHERE (_partition_value.1) = 0 GROUP BY ignore(bitTest(ignore(NULL), 0), NULL, (_partition_value.1) = 7, '10.25', bitTest(NULL, 0), NULL, ignore(ignore(-2147483647, NULL)), 1024), _partition_id ORDER BY _partition_id ASC NULLS FIRST; drop table d; From d63f70e4f1459bbb7808fd1fb7202e9648ad6570 Mon Sep 17 00:00:00 2001 From: Pablo Marcos Date: Thu, 27 Jun 2024 10:41:51 +0000 Subject: [PATCH 06/20] Add bound check to bitTestAll and bitTestAny --- src/Functions/FunctionBitTestMany.h | 13 +- src/Functions/bitTest.cpp | 8 +- .../01082_bit_test_out_of_bound.reference | 184 ------------------ .../01082_bit_test_out_of_bound.sql | 6 +- 4 files changed, 20 insertions(+), 191 deletions(-) diff --git a/src/Functions/FunctionBitTestMany.h b/src/Functions/FunctionBitTestMany.h index 71e94b1e71d..19ece2ae9e5 100644 --- a/src/Functions/FunctionBitTestMany.h +++ b/src/Functions/FunctionBitTestMany.h @@ -17,6 +17,7 @@ namespace ErrorCodes extern const int ILLEGAL_COLUMN; extern const int ILLEGAL_TYPE_OF_ARGUMENT; extern const int TOO_FEW_ARGUMENTS_FOR_FUNCTION; + extern const int PARAMETER_OUT_OF_BOUND; } @@ -146,6 +147,9 @@ private: const auto pos = pos_col_const->getUInt(0); if (pos < 8 * sizeof(ValueType)) mask = mask | (ValueType(1) << pos); + else + throw Exception(ErrorCodes::PARAMETER_OUT_OF_BOUND, + "The bit position argument {} is out of bounds for number", static_cast(pos)); } else { @@ -186,13 +190,20 @@ private: for (const auto i : collections::range(0, 
mask.size())) if (pos[i] < 8 * sizeof(ValueType)) mask[i] = mask[i] | (ValueType(1) << pos[i]); + else + throw Exception(ErrorCodes::PARAMETER_OUT_OF_BOUND, + "The bit position argument {} is out of bounds for number", static_cast(pos[i])); return true; } else if (const auto pos_col_const = checkAndGetColumnConst>(pos_col_untyped)) { const auto & pos = pos_col_const->template getValue(); - const auto new_mask = pos < 8 * sizeof(ValueType) ? ValueType(1) << pos : 0; + if (pos >= 8 * sizeof(ValueType)) + throw Exception(ErrorCodes::PARAMETER_OUT_OF_BOUND, + "The bit position argument {} is out of bounds for number", static_cast(pos)); + + const auto new_mask = ValueType(1) << pos; for (const auto i : collections::range(0, mask.size())) mask[i] = mask[i] | new_mask; diff --git a/src/Functions/bitTest.cpp b/src/Functions/bitTest.cpp index f4c90a0c603..1223ef7cbbb 100644 --- a/src/Functions/bitTest.cpp +++ b/src/Functions/bitTest.cpp @@ -28,14 +28,14 @@ struct BitTestImpl throw Exception(ErrorCodes::NOT_IMPLEMENTED, "bitTest is not implemented for big integers as second argument"); else { + typename NumberTraits::ToInteger::Type a_int(a); + typename NumberTraits::ToInteger::Type b_int(b); const auto max_position = decltype(b)((8 * sizeof(a)) - 1); if (b > max_position || b < 0) - { throw Exception(ErrorCodes::PARAMETER_OUT_OF_BOUND, "The bit position argument needs to a positive value and less or equal to {} for integer {}", - static_cast(max_position), static_cast(a)); - } - return (a >> b) & 1; + std::to_string(max_position), std::to_string(a_int)); + return (a_int >> b_int) & 1; } } diff --git a/tests/queries/0_stateless/01082_bit_test_out_of_bound.reference b/tests/queries/0_stateless/01082_bit_test_out_of_bound.reference index ee35e683ed1..cf12c6b0b1c 100644 --- a/tests/queries/0_stateless/01082_bit_test_out_of_bound.reference +++ b/tests/queries/0_stateless/01082_bit_test_out_of_bound.reference @@ -6,98 +6,6 @@ 5 0 6 1 7 0 -8 0 -9 0 -10 0 -11 0 -12 0 -13 0 -14 0 -15 0 -16 0 -17 0 -18 0 -19 0 -20 0 -21 0 -22 0 -23 0 -24 0 -25 0 -26 0 -27 0 -28 0 -29 0 -30 0 -31 0 -32 0 -33 0 -34 0 -35 0 -36 0 -37 0 -38 0 -39 0 -40 0 -41 0 -42 0 -43 0 -44 0 -45 0 -46 0 -47 0 -48 0 -49 0 -50 0 -51 0 -52 0 -53 0 -54 0 -55 0 -56 0 -57 0 -58 0 -59 0 -60 0 -61 0 -62 0 -63 0 -64 0 -65 0 -66 0 -67 0 -68 0 -69 0 -70 0 -71 0 -72 0 -73 0 -74 0 -75 0 -76 0 -77 0 -78 0 -79 0 -80 0 -81 0 -82 0 -83 0 -84 0 -85 0 -86 0 -87 0 -88 0 -89 0 -90 0 -91 0 -92 0 -93 0 -94 0 -95 0 -96 0 -97 0 -98 0 -99 0 0 1 1 0 2 1 @@ -106,98 +14,6 @@ 5 0 6 1 7 0 -8 1 -9 1 -10 1 -11 1 -12 1 -13 1 -14 1 -15 1 -16 1 -17 1 -18 1 -19 1 -20 1 -21 1 -22 1 -23 1 -24 1 -25 1 -26 1 -27 1 -28 1 -29 1 -30 1 -31 1 -32 1 -33 1 -34 1 -35 1 -36 1 -37 1 -38 1 -39 1 -40 1 -41 1 -42 1 -43 1 -44 1 -45 1 -46 1 -47 1 -48 1 -49 1 -50 1 -51 1 -52 1 -53 1 -54 1 -55 1 -56 1 -57 1 -58 1 -59 1 -60 1 -61 1 -62 1 -63 1 -64 1 -65 1 -66 1 -67 1 -68 1 -69 1 -70 1 -71 1 -72 1 -73 1 -74 1 -75 1 -76 1 -77 1 -78 1 -79 1 -80 1 -81 1 -82 1 -83 1 -84 1 -85 1 -86 1 -87 1 -88 1 -89 1 -90 1 -91 1 -92 1 -93 1 -94 1 -95 1 -96 1 -97 1 -98 1 -99 1 0 1 1 0 2 1 diff --git a/tests/queries/0_stateless/01082_bit_test_out_of_bound.sql b/tests/queries/0_stateless/01082_bit_test_out_of_bound.sql index 324768b2e1d..92ece2a4aa4 100644 --- a/tests/queries/0_stateless/01082_bit_test_out_of_bound.sql +++ b/tests/queries/0_stateless/01082_bit_test_out_of_bound.sql @@ -1,5 +1,7 @@ -SELECT number, bitTestAny(toUInt8(1 + 4 + 16 + 64), number) FROM numbers(100); -SELECT number, bitTestAll(toUInt8(1 + 4 + 16 + 
64), number) FROM numbers(100); +SELECT number, bitTestAny(toUInt8(1 + 4 + 16 + 64), number) FROM numbers(8); +SELECT number, bitTestAny(toUInt8(1 + 4 + 16 + 64), number) FROM numbers(8, 16); -- { serverError PARAMETER_OUT_OF_BOUND } +SELECT number, bitTestAll(toUInt8(1 + 4 + 16 + 64), number) FROM numbers(8); +SELECT number, bitTestAll(toUInt8(1 + 4 + 16 + 64), number) FROM numbers(8, 16); -- { serverError PARAMETER_OUT_OF_BOUND } SELECT number, bitTest(toUInt8(1 + 4 + 16 + 64), number) FROM numbers(8); SELECT number, bitTest(toUInt8(1 + 4 + 16 + 64), number) FROM numbers(8, 16); -- { serverError PARAMETER_OUT_OF_BOUND } From 7340d9adf8b0d70479717143e183b3f19222085b Mon Sep 17 00:00:00 2001 From: Blargian Date: Fri, 28 Jun 2024 13:29:26 +0200 Subject: [PATCH 07/20] add transactionID --- .../functions/other-functions.md | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/docs/en/sql-reference/functions/other-functions.md b/docs/en/sql-reference/functions/other-functions.md index 58fc1eba02e..738a12143a3 100644 --- a/docs/en/sql-reference/functions/other-functions.md +++ b/docs/en/sql-reference/functions/other-functions.md @@ -3860,3 +3860,45 @@ Result: └───────────────┘ ``` +## transactionID + +Returns the ID of a [transaction](https://clickhouse.com/docs/en/guides/developer/transactional#transactions-commit-and-rollback). + +:::note +This function is part of an experimental feature set. Enable experimental transaction support by adding this setting in `config.d/transactions.xml`: + +``` + + 1 + +``` + +For more information see the page [Transactional (ACID) support](https://clickhouse.com/docs/en/guides/developer/transactional#transactions-commit-and-rollback). +::: + +**Syntax** + +```sql +transactionID() +``` + +**Returned value** + +- TransactionID. [String](../data-types/string.md). [Tuple](../data-types/tuple.md)([UInt64](../data-types/int-uint.md), [UInt64](../data-types/int-uint.md), [UUID](../data-types/uuid.md)). + +**Example** + +Query: + +```sql +SELECT transactionID(); +``` + +Result: + +```response +┌─transactionID()──────────────────────────────┐ +│ (0,0,'00000000-0000-0000-0000-000000000000') │ +└──────────────────────────────────────────────┘ +``` + From ce330d54cf9e75defa98fa8fa6e16b17252441ad Mon Sep 17 00:00:00 2001 From: kssenii Date: Fri, 28 Jun 2024 13:38:32 +0200 Subject: [PATCH 08/20] Review fixes (partial) --- .../DataLakes/DeltaLakeMetadata.cpp | 41 +++++++++++-------- .../DataLakes/IStorageDataLake.h | 19 ++++++--- .../StorageObjectStorageSource.cpp | 1 - 3 files changed, 39 insertions(+), 22 deletions(-) diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp index bd3e21f12fd..79cd48e7aab 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp @@ -168,14 +168,20 @@ struct DeltaLakeMetadata::Impl * \"nullCount\":{\"col-6c990940-59bb-4709-8f2e-17083a82c01a\":0,\"col-763cd7e2-7627-4d8e-9fb7-9e85d0c8845b\":0}}"}} * " */ + + /// Read metadata file and fill `file_schema`, `file_parition_columns`, `result`. + /// `result` is a list of data files. + /// `file_schema` is a common schema for all files. + /// Schema evolution is not supported, so we check that all files have the same schema. + /// `file_partiion_columns` is information about parition columns of data files. 
void processMetadataFile( - const String & key, + const String & metadata_file_path, NamesAndTypesList & file_schema, DataLakePartitionColumns & file_partition_columns, std::set & result) { auto read_settings = context->getReadSettings(); - auto buf = object_storage->readObject(StoredObject(key), read_settings); + auto buf = object_storage->readObject(StoredObject(metadata_file_path), read_settings); char c; while (!buf->eof()) @@ -197,9 +203,9 @@ struct DeltaLakeMetadata::Impl Poco::Dynamic::Var json = parser.parse(json_str); Poco::JSON::Object::Ptr object = json.extract(); - std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM - object->stringify(oss); - LOG_TEST(log, "Metadata: {}", oss.str()); + // std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM + // object->stringify(oss); + // LOG_TEST(log, "Metadata: {}", oss.str()); if (object->has("add")) { @@ -211,21 +217,24 @@ struct DeltaLakeMetadata::Impl auto it = file_partition_columns.find(filename); if (it == file_partition_columns.end()) { - auto partition_values = add_object->get("partitionValues").extract(); - if (partition_values->size()) + if (add_object->has("partitionValues")) { - auto & current_partition_columns = file_partition_columns[filename]; - for (const auto & partition_name : partition_values->getNames()) + auto partition_values = add_object->get("partitionValues").extract(); + if (partition_values->size()) { - const auto value = partition_values->getValue(partition_name); - auto name_and_type = file_schema.tryGetByName(partition_name); - if (!name_and_type) - throw Exception(ErrorCodes::LOGICAL_ERROR, "No such column in schema: {}", partition_name); + auto & current_partition_columns = file_partition_columns[filename]; + for (const auto & partition_name : partition_values->getNames()) + { + const auto value = partition_values->getValue(partition_name); + auto name_and_type = file_schema.tryGetByName(partition_name); + if (!name_and_type) + throw Exception(ErrorCodes::LOGICAL_ERROR, "No such column in schema: {}", partition_name); - auto field = getFieldValue(value, name_and_type->type); - current_partition_columns.emplace_back(*name_and_type, field); + auto field = getFieldValue(value, name_and_type->type); + current_partition_columns.emplace_back(*name_and_type, field); - LOG_TEST(log, "Partition {} value is {} (for {})", partition_name, value, filename); + LOG_TEST(log, "Partition {} value is {} (for {})", partition_name, value, filename); + } } } } diff --git a/src/Storages/ObjectStorage/DataLakes/IStorageDataLake.h b/src/Storages/ObjectStorage/DataLakes/IStorageDataLake.h index 97fb9890490..ab069364021 100644 --- a/src/Storages/ObjectStorage/DataLakes/IStorageDataLake.h +++ b/src/Storages/ObjectStorage/DataLakes/IStorageDataLake.h @@ -99,14 +99,16 @@ public: Storage::updateConfiguration(local_context); auto new_metadata = DataLakeMetadata::create(Storage::object_storage, base_configuration, local_context); - auto partition_columns = new_metadata->getPartitionColumns(); - - if (partition_columns != Storage::partition_columns) - Storage::partition_columns = partition_columns; - if (current_metadata && *current_metadata == *new_metadata) return; + if (!current_metadata) + { + auto partition_columns = new_metadata->getPartitionColumns(); + if (partition_columns != Storage::partition_columns) + Storage::partition_columns = partition_columns; + } + current_metadata = std::move(new_metadata); auto updated_configuration = base_configuration->clone(); 
updated_configuration->setPaths(current_metadata->getDataFiles()); @@ -127,6 +129,13 @@ public: { base_configuration->format = Storage::configuration->format; } + + if (current_metadata) + { + auto partition_columns = current_metadata->getPartitionColumns(); + if (partition_columns != Storage::partition_columns) + Storage::partition_columns = partition_columns; + } } private: diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index 6a5957c405b..b99ac466081 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -229,7 +229,6 @@ Chunk StorageObjectStorageSource::generate() } } - return chunk; } From f25147aefd7cef1ed18696df57a60369120229f8 Mon Sep 17 00:00:00 2001 From: kssenii Date: Fri, 28 Jun 2024 14:57:38 +0200 Subject: [PATCH 09/20] Review fixes --- .../ObjectStorage/DataLakes/IStorageDataLake.h | 14 ++++---------- .../ObjectStorage/StorageObjectStorage.cpp | 11 ++++------- src/Storages/ObjectStorage/StorageObjectStorage.h | 5 ++++- .../ObjectStorage/StorageObjectStorageSource.cpp | 5 ++--- .../ObjectStorage/StorageObjectStorageSource.h | 4 +--- 5 files changed, 15 insertions(+), 24 deletions(-) diff --git a/src/Storages/ObjectStorage/DataLakes/IStorageDataLake.h b/src/Storages/ObjectStorage/DataLakes/IStorageDataLake.h index ab069364021..f1217bc9729 100644 --- a/src/Storages/ObjectStorage/DataLakes/IStorageDataLake.h +++ b/src/Storages/ObjectStorage/DataLakes/IStorageDataLake.h @@ -102,16 +102,10 @@ public: if (current_metadata && *current_metadata == *new_metadata) return; - if (!current_metadata) - { - auto partition_columns = new_metadata->getPartitionColumns(); - if (partition_columns != Storage::partition_columns) - Storage::partition_columns = partition_columns; - } - current_metadata = std::move(new_metadata); auto updated_configuration = base_configuration->clone(); updated_configuration->setPaths(current_metadata->getDataFiles()); + updated_configuration->setPartitionColumns(current_metadata->getPartitionColumns()); Storage::configuration = updated_configuration; } @@ -132,9 +126,9 @@ public: if (current_metadata) { - auto partition_columns = current_metadata->getPartitionColumns(); - if (partition_columns != Storage::partition_columns) - Storage::partition_columns = partition_columns; + const auto & columns = current_metadata->getPartitionColumns(); + base_configuration->setPartitionColumns(columns); + Storage::configuration->setPartitionColumns(columns); } } diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index 4e3eeeb17f2..683473006e3 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -111,8 +111,7 @@ public: const bool need_only_count_, ContextPtr context_, size_t max_block_size_, - size_t num_streams_, - const DataLakePartitionColumns & partition_columns_) + size_t num_streams_) : SourceStepWithFilter(DataStream{.header = info_.source_header}, columns_to_read, query_info_, storage_snapshot_, context_) , object_storage(object_storage_) , configuration(configuration_) @@ -124,7 +123,6 @@ public: , max_block_size(max_block_size_) , num_streams(num_streams_) , distributed_processing(distributed_processing_) - , partition_columns(partition_columns_) { } @@ -163,7 +161,7 @@ public: { auto source = std::make_shared( getName(), object_storage, configuration, info, 
format_settings, - context, max_block_size, iterator_wrapper, max_parsing_threads, need_only_count, partition_columns); + context, max_block_size, iterator_wrapper, max_parsing_threads, need_only_count); source->setKeyCondition(filter_actions_dag, context); pipes.emplace_back(std::move(source)); @@ -192,7 +190,6 @@ private: const size_t max_block_size; size_t num_streams; const bool distributed_processing; - DataLakePartitionColumns partition_columns; void createIterator(const ActionsDAG::Node * predicate) { @@ -252,8 +249,7 @@ void StorageObjectStorage::read( need_only_count, local_context, max_block_size, - num_streams, - partition_columns); + num_streams); query_plan.addStep(std::move(read_step)); } @@ -464,6 +460,7 @@ StorageObjectStorage::Configuration::Configuration(const Configuration & other) format = other.format; compression_method = other.compression_method; structure = other.structure; + partition_columns = other.partition_columns; } bool StorageObjectStorage::Configuration::withPartitionWildcard() const diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h index fba91edf6f7..c93a0bf6943 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.h +++ b/src/Storages/ObjectStorage/StorageObjectStorage.h @@ -136,7 +136,6 @@ protected: const std::optional format_settings; const ASTPtr partition_by; const bool distributed_processing; - mutable DataLakePartitionColumns partition_columns; LoggerPtr log; }; @@ -196,6 +195,9 @@ public: virtual ConfigurationPtr clone() = 0; virtual bool isStaticConfiguration() const { return true; } + void setPartitionColumns(const DataLakePartitionColumns & columns) { partition_columns = columns; } + const DataLakePartitionColumns & getPartitionColumns() const { return partition_columns; } + String format = "auto"; String compression_method = "auto"; String structure = "auto"; @@ -207,6 +209,7 @@ protected: void assertInitialized() const; bool initialized = false; + DataLakePartitionColumns partition_columns; }; } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index b99ac466081..6bded90f11d 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -49,8 +49,7 @@ StorageObjectStorageSource::StorageObjectStorageSource( UInt64 max_block_size_, std::shared_ptr file_iterator_, size_t max_parsing_threads_, - bool need_only_count_, - const DataLakePartitionColumns & partition_columns_) + bool need_only_count_) : SourceWithKeyCondition(info.source_header, false) , WithContext(context_) , name(std::move(name_)) @@ -69,7 +68,6 @@ StorageObjectStorageSource::StorageObjectStorageSource( , columns_desc(info.columns_description) , file_iterator(file_iterator_) , schema_cache(StorageObjectStorage::getSchemaCache(context_, configuration->getTypeName())) - , partition_columns(partition_columns_) , create_reader_scheduler(threadPoolCallbackRunnerUnsafe(*create_reader_pool, "Reader")) { } @@ -206,6 +204,7 @@ Chunk StorageObjectStorageSource::generate() .last_modified = object_info->metadata->last_modified }); + const auto & partition_columns = configuration->getPartitionColumns(); if (!partition_columns.empty() && chunk_size && chunk.hasColumns()) { auto partition_values = partition_columns.find(filename); diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.h b/src/Storages/ObjectStorage/StorageObjectStorageSource.h index 
566cc687400..c16619b34d8 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.h @@ -40,8 +40,7 @@ public: UInt64 max_block_size_, std::shared_ptr file_iterator_, size_t max_parsing_threads_, - bool need_only_count_, - const DataLakePartitionColumns & partition_columns_ = {}); + bool need_only_count_); ~StorageObjectStorageSource() override; @@ -83,7 +82,6 @@ protected: bool initialized = false; size_t total_rows_in_file = 0; LoggerPtr log = getLogger("StorageObjectStorageSource"); - DataLakePartitionColumns partition_columns; struct ReaderHolder : private boost::noncopyable { From fbdbbf5a782a5fb032dacf880ade1ebc505e8dea Mon Sep 17 00:00:00 2001 From: Kseniia Sumarokova <54203879+kssenii@users.noreply.github.com> Date: Fri, 28 Jun 2024 15:17:35 +0200 Subject: [PATCH 10/20] Fix typo --- src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp index 79cd48e7aab..3b6cbca5d46 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp @@ -173,7 +173,7 @@ struct DeltaLakeMetadata::Impl /// `result` is a list of data files. /// `file_schema` is a common schema for all files. /// Schema evolution is not supported, so we check that all files have the same schema. - /// `file_partiion_columns` is information about parition columns of data files. + /// `file_partiion_columns` is information about partition columns of data files. void processMetadataFile( const String & metadata_file_path, NamesAndTypesList & file_schema, From 9864508b052ba58f25431d4fbaab9a27ee658919 Mon Sep 17 00:00:00 2001 From: kssenii Date: Fri, 28 Jun 2024 17:51:08 +0200 Subject: [PATCH 11/20] Fix --- src/Interpreters/ObjectStorageQueueLog.cpp | 3 --- .../ObjectStorageQueue/ObjectStorageQueueIFileMetadata.h | 1 - src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp | 5 ----- src/Storages/System/StorageSystemS3Queue.cpp | 3 --- 4 files changed, 12 deletions(-) diff --git a/src/Interpreters/ObjectStorageQueueLog.cpp b/src/Interpreters/ObjectStorageQueueLog.cpp index 24261429434..c841984fd08 100644 --- a/src/Interpreters/ObjectStorageQueueLog.cpp +++ b/src/Interpreters/ObjectStorageQueueLog.cpp @@ -36,7 +36,6 @@ ColumnsDescription ObjectStorageQueueLogElement::getColumnsDescription() {"status", status_datatype, "Status of the processing file"}, {"processing_start_time", std::make_shared(std::make_shared()), "Time of the start of processing the file"}, {"processing_end_time", std::make_shared(std::make_shared()), "Time of the end of processing the file"}, - {"ProfileEvents", std::make_shared(std::make_shared(), std::make_shared()), "Profile events collected while loading this file"}, {"exception", std::make_shared(), "Exception message if happened"}, }; } @@ -64,8 +63,6 @@ void ObjectStorageQueueLogElement::appendToBlock(MutableColumns & columns) const else columns[i++]->insertDefault(); - ProfileEvents::dumpToMapColumn(counters_snapshot, columns[i++].get(), true); - columns[i++]->insert(exception); } diff --git a/src/Storages/ObjectStorageQueue/ObjectStorageQueueIFileMetadata.h b/src/Storages/ObjectStorageQueue/ObjectStorageQueueIFileMetadata.h index 652b4742389..f0e55c202a2 100644 --- a/src/Storages/ObjectStorageQueue/ObjectStorageQueueIFileMetadata.h +++ 
b/src/Storages/ObjectStorageQueue/ObjectStorageQueueIFileMetadata.h @@ -34,7 +34,6 @@ public: std::atomic processing_start_time = 0; std::atomic processing_end_time = 0; std::atomic retries = 0; - ProfileEvents::Counters profile_counters; private: mutable std::mutex last_exception_mutex; diff --git a/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp b/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp index 2effbe7e7c2..5087755f9e7 100644 --- a/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp +++ b/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp @@ -513,10 +513,6 @@ Chunk ObjectStorageQueueSource::generateImpl() path, processed_rows_from_file); } - auto * prev_scope = CurrentThread::get().attachProfileCountersScope(&file_status->profile_counters); - SCOPE_EXIT({ CurrentThread::get().attachProfileCountersScope(prev_scope); }); - /// FIXME: if files are compressed, profile counters update does not work fully (object storage related counters are not saved). Why? - try { auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::ObjectStorageQueuePullMicroseconds); @@ -715,7 +711,6 @@ void ObjectStorageQueueSource::appendLogElement( .file_name = filename, .rows_processed = processed_rows, .status = processed ? ObjectStorageQueueLogElement::ObjectStorageQueueStatus::Processed : ObjectStorageQueueLogElement::ObjectStorageQueueStatus::Failed, - .counters_snapshot = file_status_.profile_counters.getPartiallyAtomicSnapshot(), .processing_start_time = file_status_.processing_start_time, .processing_end_time = file_status_.processing_end_time, .exception = file_status_.getException(), diff --git a/src/Storages/System/StorageSystemS3Queue.cpp b/src/Storages/System/StorageSystemS3Queue.cpp index c1d686067fd..a1c9380f616 100644 --- a/src/Storages/System/StorageSystemS3Queue.cpp +++ b/src/Storages/System/StorageSystemS3Queue.cpp @@ -32,7 +32,6 @@ ColumnsDescription StorageSystemS3Queue::getColumnsDescription() {"status", std::make_shared(), "Status of processing: Processed, Processing, Failed"}, {"processing_start_time", std::make_shared(std::make_shared()), "Time at which processing of the file started"}, {"processing_end_time", std::make_shared(std::make_shared()), "Time at which processing of the file ended"}, - {"ProfileEvents", std::make_shared(std::make_shared(), std::make_shared()), "Profile events collected during processing of the file"}, {"exception", std::make_shared(), "Exception which happened during processing"}, }; } @@ -65,8 +64,6 @@ void StorageSystemS3Queue::fillData(MutableColumns & res_columns, ContextPtr, co else res_columns[i++]->insertDefault(); - ProfileEvents::dumpToMapColumn(file_status->profile_counters.getPartiallyAtomicSnapshot(), res_columns[i++].get(), true); - res_columns[i++]->insert(file_status->getException()); } } From c63263fe394290e88ff34035f9213a3aa80290ef Mon Sep 17 00:00:00 2001 From: kssenii Date: Fri, 28 Jun 2024 17:56:51 +0200 Subject: [PATCH 12/20] One more --- src/Interpreters/ObjectStorageQueueLog.h | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Interpreters/ObjectStorageQueueLog.h b/src/Interpreters/ObjectStorageQueueLog.h index b0e843a0cc3..669238d8dbb 100644 --- a/src/Interpreters/ObjectStorageQueueLog.h +++ b/src/Interpreters/ObjectStorageQueueLog.h @@ -26,7 +26,6 @@ struct ObjectStorageQueueLogElement Failed, }; ObjectStorageQueueStatus status; - ProfileEvents::Counters::Snapshot counters_snapshot; time_t processing_start_time; time_t processing_end_time; std::string exception; From 
5e7a8245c467dbc40344bda67760ad1d33535994 Mon Sep 17 00:00:00 2001 From: kssenii Date: Mon, 1 Jul 2024 15:40:49 +0200 Subject: [PATCH 13/20] Fix memory leak --- .../ObjectStorage/DataLakes/DeltaLakeMetadata.cpp | 12 +++++++----- .../ObjectStorage/DataLakes/DeltaLakeMetadata.h | 3 --- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp index 3b6cbca5d46..bc64ef15cf1 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp @@ -51,8 +51,10 @@ namespace ErrorCodes extern const int NOT_IMPLEMENTED; } -struct DeltaLakeMetadata::Impl +struct DeltaLakeMetadataImpl { + using ConfigurationPtr = DeltaLakeMetadata::ConfigurationPtr; + ObjectStoragePtr object_storage; ConfigurationPtr configuration; ContextPtr context; @@ -61,7 +63,7 @@ struct DeltaLakeMetadata::Impl * Useful links: * - https://github.com/delta-io/delta/blob/master/PROTOCOL.md#data-files */ - Impl(ObjectStoragePtr object_storage_, + DeltaLakeMetadataImpl(ObjectStoragePtr object_storage_, ConfigurationPtr configuration_, ContextPtr context_) : object_storage(object_storage_) @@ -586,14 +588,14 @@ DeltaLakeMetadata::DeltaLakeMetadata( ObjectStoragePtr object_storage_, ConfigurationPtr configuration_, ContextPtr context_) - : impl(std::make_unique(object_storage_, configuration_, context_)) { - auto result = impl->processMetadataFiles(); + auto impl = DeltaLakeMetadataImpl(object_storage_, configuration_, context_); + auto result = impl.processMetadataFiles(); data_files = result.data_files; schema = result.schema; partition_columns = result.partition_columns; - LOG_TRACE(impl->log, "Found {} data files, {} partition files, schema: {}", + LOG_TRACE(impl.log, "Found {} data files, {} partition files, schema: {}", data_files.size(), partition_columns.size(), schema.toString()); } diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h index 926bd1b451d..a479a3dd293 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h @@ -45,10 +45,7 @@ public: } private: - struct Impl; - const std::shared_ptr impl; mutable Strings data_files; - NamesAndTypesList schema; std::unordered_map column_name_to_physical_name; DataLakePartitionColumns partition_columns; From ef24f517892891b3b298dc33cbb27493fa4ba668 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Mon, 1 Jul 2024 19:17:06 +0200 Subject: [PATCH 14/20] Fix handling of SSL_ERROR_WANT_READ/SSL_ERROR_WANT_WRITE with zero timeout Previously if you were using socket without timeout it wasn't able to handle SSL_ERROR_WANT_READ/SSL_ERROR_WANT_WRITE, and even though sockets without timeouts is an odd thing (but it is possible - [1]), it still may be possible somewhere. 
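
To make the retry semantics concrete before the diff below, here is a minimal, self-contained sketch of the pattern this patch applies: a blocking SSL read loop in which a zero timeout (the "no timeout" case) is replaced by a large finite cap before the descriptor is polled. It uses plain OpenSSL and poll() rather than the Poco wrappers touched by the patch, so every name in it is illustrative and nothing below is the actual ClickHouse/Poco code.

```cpp
#include <openssl/ssl.h>
#include <poll.h>
#include <stdexcept>

/// Hypothetical helper, not part of Poco: retry SSL_read while OpenSSL asks for
/// more socket readiness, treating a zero timeout as "wait (almost) forever".
int readWithRetry(SSL * ssl, int fd, void * buf, int len, int timeout_ms)
{
    /// Zero SO_RCVTIMEO/SO_SNDTIMEO means "no timeout"; substitute a large
    /// finite value so it can still be handed to poll(), which takes an int.
    if (timeout_ms == 0)
        timeout_ms = 1000 * 60 * 60 * 24; /// ~1 day, large enough in practice

    while (true)
    {
        int rc = SSL_read(ssl, buf, len);
        if (rc > 0)
            return rc;

        int err = SSL_get_error(ssl, rc);
        short events = 0;
        if (err == SSL_ERROR_WANT_READ)
            events = POLLIN;
        else if (err == SSL_ERROR_WANT_WRITE)
            events = POLLOUT;
        else
            return rc; /// a genuine error or clean shutdown; let the caller decide

        pollfd pfd{fd, events, 0};
        if (poll(&pfd, 1, timeout_ms) <= 0)
            throw std::runtime_error("SSL socket operation timed out");
        /// The socket became ready; loop and retry SSL_read.
    }
}
```

The diff below does the equivalent inside Poco's SecureSocketImpl: getMaxTimeoutOrLimit() substitutes the cap, and mustRetry() stops retrying once the remaining time reaches zero.
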
[1]: https://github.com/ClickHouse/ClickHouse/pull/65917 Signed-off-by: Azat Khuzhin --- .../include/Poco/Net/SecureSocketImpl.h | 5 +-- .../NetSSL_OpenSSL/src/SecureSocketImpl.cpp | 31 +++++++++++++------ 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/base/poco/NetSSL_OpenSSL/include/Poco/Net/SecureSocketImpl.h b/base/poco/NetSSL_OpenSSL/include/Poco/Net/SecureSocketImpl.h index 49c12b6b45f..890752c52da 100644 --- a/base/poco/NetSSL_OpenSSL/include/Poco/Net/SecureSocketImpl.h +++ b/base/poco/NetSSL_OpenSSL/include/Poco/Net/SecureSocketImpl.h @@ -235,8 +235,6 @@ namespace Net /// Note that simply closing a socket is not sufficient /// to be able to re-use it again. - Poco::Timespan getMaxTimeout(); - private: SecureSocketImpl(const SecureSocketImpl &); SecureSocketImpl & operator=(const SecureSocketImpl &); @@ -250,6 +248,9 @@ namespace Net Session::Ptr _pSession; friend class SecureStreamSocketImpl; + + Poco::Timespan getMaxTimeoutOrLimit(); + //// Return max(send, receive) if non zero, otherwise maximum timeout }; diff --git a/base/poco/NetSSL_OpenSSL/src/SecureSocketImpl.cpp b/base/poco/NetSSL_OpenSSL/src/SecureSocketImpl.cpp index efe25f65909..4873d259ae5 100644 --- a/base/poco/NetSSL_OpenSSL/src/SecureSocketImpl.cpp +++ b/base/poco/NetSSL_OpenSSL/src/SecureSocketImpl.cpp @@ -199,7 +199,7 @@ void SecureSocketImpl::connectSSL(bool performHandshake) if (performHandshake && _pSocket->getBlocking()) { int ret; - Poco::Timespan remaining_time = getMaxTimeout(); + Poco::Timespan remaining_time = getMaxTimeoutOrLimit(); do { RemainingTimeCounter counter(remaining_time); @@ -302,7 +302,7 @@ int SecureSocketImpl::sendBytes(const void* buffer, int length, int flags) return rc; } - Poco::Timespan remaining_time = getMaxTimeout(); + Poco::Timespan remaining_time = getMaxTimeoutOrLimit(); do { RemainingTimeCounter counter(remaining_time); @@ -338,7 +338,7 @@ int SecureSocketImpl::receiveBytes(void* buffer, int length, int flags) return rc; } - Poco::Timespan remaining_time = getMaxTimeout(); + Poco::Timespan remaining_time = getMaxTimeoutOrLimit(); do { /// SSL record may consist of several TCP packets, @@ -372,7 +372,7 @@ int SecureSocketImpl::completeHandshake() poco_check_ptr (_pSSL); int rc; - Poco::Timespan remaining_time = getMaxTimeout(); + Poco::Timespan remaining_time = getMaxTimeoutOrLimit(); do { RemainingTimeCounter counter(remaining_time); @@ -453,18 +453,29 @@ X509* SecureSocketImpl::peerCertificate() const return 0; } -Poco::Timespan SecureSocketImpl::getMaxTimeout() +Poco::Timespan SecureSocketImpl::getMaxTimeoutOrLimit() { std::lock_guard lock(_mutex); Poco::Timespan remaining_time = _pSocket->getReceiveTimeout(); Poco::Timespan send_timeout = _pSocket->getSendTimeout(); if (remaining_time < send_timeout) remaining_time = send_timeout; + /// zero SO_SNDTIMEO/SO_RCVTIMEO works as no timeout, let's replicate this + /// + /// NOTE: we cannot use INT64_MAX (std::numeric_limits::max()), + /// since it will be later passed to poll() which accept int timeout, and + /// even though poll() accepts milliseconds and Timespan() accepts + /// microseconds, let's use smaller maximum value just to avoid some possible + /// issues, this should be enough anyway (it is ~24 days). 
+ if (remaining_time == 0) + remaining_time = Poco::Timespan(std::numeric_limits::max()); return remaining_time; } bool SecureSocketImpl::mustRetry(int rc, Poco::Timespan& remaining_time) { + if (remaining_time == 0) + return false; std::lock_guard lock(_mutex); if (rc <= 0) { @@ -475,9 +486,7 @@ bool SecureSocketImpl::mustRetry(int rc, Poco::Timespan& remaining_time) case SSL_ERROR_WANT_READ: if (_pSocket->getBlocking()) { - /// Level-triggered mode of epoll_wait is used, so if SSL_read don't read all available data from socket, - /// epoll_wait returns true without waiting for new data even if remaining_time == 0 - if (_pSocket->pollImpl(remaining_time, Poco::Net::Socket::SELECT_READ) && remaining_time != 0) + if (_pSocket->pollImpl(remaining_time, Poco::Net::Socket::SELECT_READ)) return true; else throw Poco::TimeoutException(); @@ -486,13 +495,15 @@ bool SecureSocketImpl::mustRetry(int rc, Poco::Timespan& remaining_time) case SSL_ERROR_WANT_WRITE: if (_pSocket->getBlocking()) { - /// The same as for SSL_ERROR_WANT_READ - if (_pSocket->pollImpl(remaining_time, Poco::Net::Socket::SELECT_WRITE) && remaining_time != 0) + if (_pSocket->pollImpl(remaining_time, Poco::Net::Socket::SELECT_WRITE)) return true; else throw Poco::TimeoutException(); } break; + /// NOTE: POCO_EINTR is the same as SSL_ERROR_WANT_READ (at least in + /// OpenSSL), so this likely dead code, but let's leave it for + /// compatibility with other implementations case SSL_ERROR_SYSCALL: return socketError == POCO_EAGAIN || socketError == POCO_EINTR; default: From 93c1b5d8a72559517bafd45cf94e2e531e39a404 Mon Sep 17 00:00:00 2001 From: Pablo Marcos Date: Tue, 2 Jul 2024 08:21:51 +0000 Subject: [PATCH 15/20] Address issues pointed out in the PR --- docs/en/sql-reference/functions/bit-functions.md | 6 +++--- src/Functions/FunctionBitTestMany.h | 2 +- src/Functions/bitTest.cpp | 8 ++++---- .../0_stateless/01082_bit_test_out_of_bound.reference | 3 +++ tests/queries/0_stateless/01082_bit_test_out_of_bound.sql | 4 ++++ 5 files changed, 15 insertions(+), 8 deletions(-) diff --git a/docs/en/sql-reference/functions/bit-functions.md b/docs/en/sql-reference/functions/bit-functions.md index a48893b93bf..5ab7e07fcad 100644 --- a/docs/en/sql-reference/functions/bit-functions.md +++ b/docs/en/sql-reference/functions/bit-functions.md @@ -173,7 +173,7 @@ See function [substring](string-functions.md#substring). ## bitTest -Takes any integer and converts it into [binary form](https://en.wikipedia.org/wiki/Binary_number), returns the value of a bit at specified position. The countdown starts from 0 from the right to the left. +Takes any integer and converts it into [binary form](https://en.wikipedia.org/wiki/Binary_number), returns the value of a bit at specified position. Counting is right-to-left, starting at 0. **Syntax** @@ -226,7 +226,7 @@ Result: ## bitTestAll -Returns result of [logical conjuction](https://en.wikipedia.org/wiki/Logical_conjunction) (AND operator) of all bits at given positions. The countdown starts from 0 from the right to the left. +Returns result of [logical conjuction](https://en.wikipedia.org/wiki/Logical_conjunction) (AND operator) of all bits at given positions. Counting is right-to-left, starting at 0. The conjuction for bit-wise operations: @@ -289,7 +289,7 @@ Result: ## bitTestAny -Returns result of [logical disjunction](https://en.wikipedia.org/wiki/Logical_disjunction) (OR operator) of all bits at given positions. The countdown starts from 0 from the right to the left. 
+Returns result of [logical disjunction](https://en.wikipedia.org/wiki/Logical_disjunction) (OR operator) of all bits at given positions. Counting is right-to-left, starting at 0. The disjunction for bit-wise operations: diff --git a/src/Functions/FunctionBitTestMany.h b/src/Functions/FunctionBitTestMany.h index 19ece2ae9e5..950e4ab4ea8 100644 --- a/src/Functions/FunctionBitTestMany.h +++ b/src/Functions/FunctionBitTestMany.h @@ -16,8 +16,8 @@ namespace ErrorCodes { extern const int ILLEGAL_COLUMN; extern const int ILLEGAL_TYPE_OF_ARGUMENT; - extern const int TOO_FEW_ARGUMENTS_FOR_FUNCTION; extern const int PARAMETER_OUT_OF_BOUND; + extern const int TOO_FEW_ARGUMENTS_FOR_FUNCTION; } diff --git a/src/Functions/bitTest.cpp b/src/Functions/bitTest.cpp index 1223ef7cbbb..cb6b83c1cf1 100644 --- a/src/Functions/bitTest.cpp +++ b/src/Functions/bitTest.cpp @@ -28,10 +28,10 @@ struct BitTestImpl throw Exception(ErrorCodes::NOT_IMPLEMENTED, "bitTest is not implemented for big integers as second argument"); else { - typename NumberTraits::ToInteger::Type a_int(a); - typename NumberTraits::ToInteger::Type b_int(b); - const auto max_position = decltype(b)((8 * sizeof(a)) - 1); - if (b > max_position || b < 0) + typename NumberTraits::ToInteger::Type a_int = a; + typename NumberTraits::ToInteger::Type b_int = b; + const auto max_position = static_cast((8 * sizeof(a)) - 1); + if (b_int > max_position || b_int < 0) throw Exception(ErrorCodes::PARAMETER_OUT_OF_BOUND, "The bit position argument needs to a positive value and less or equal to {} for integer {}", std::to_string(max_position), std::to_string(a_int)); diff --git a/tests/queries/0_stateless/01082_bit_test_out_of_bound.reference b/tests/queries/0_stateless/01082_bit_test_out_of_bound.reference index cf12c6b0b1c..26085389381 100644 --- a/tests/queries/0_stateless/01082_bit_test_out_of_bound.reference +++ b/tests/queries/0_stateless/01082_bit_test_out_of_bound.reference @@ -1,3 +1,4 @@ +-- bitTestAny 0 1 1 0 2 1 @@ -6,6 +7,7 @@ 5 0 6 1 7 0 +-- bitTestAll 0 1 1 0 2 1 @@ -14,6 +16,7 @@ 5 0 6 1 7 0 +-- bitTest 0 1 1 0 2 1 diff --git a/tests/queries/0_stateless/01082_bit_test_out_of_bound.sql b/tests/queries/0_stateless/01082_bit_test_out_of_bound.sql index 92ece2a4aa4..e741cb249d0 100644 --- a/tests/queries/0_stateless/01082_bit_test_out_of_bound.sql +++ b/tests/queries/0_stateless/01082_bit_test_out_of_bound.sql @@ -1,8 +1,12 @@ +SELECT '-- bitTestAny'; SELECT number, bitTestAny(toUInt8(1 + 4 + 16 + 64), number) FROM numbers(8); SELECT number, bitTestAny(toUInt8(1 + 4 + 16 + 64), number) FROM numbers(8, 16); -- { serverError PARAMETER_OUT_OF_BOUND } + +SELECT '-- bitTestAll'; SELECT number, bitTestAll(toUInt8(1 + 4 + 16 + 64), number) FROM numbers(8); SELECT number, bitTestAll(toUInt8(1 + 4 + 16 + 64), number) FROM numbers(8, 16); -- { serverError PARAMETER_OUT_OF_BOUND } +SELECT '-- bitTest'; SELECT number, bitTest(toUInt8(1 + 4 + 16 + 64), number) FROM numbers(8); SELECT number, bitTest(toUInt8(1 + 4 + 16 + 64), number) FROM numbers(8, 16); -- { serverError PARAMETER_OUT_OF_BOUND } SELECT number, bitTest(toUInt16(1 + 4 + 16 + 64 + 256 + 1024 + 4096 + 16384 + 65536), number) FROM numbers(16); From 2eb750f8ebcd74ebe2566e9138496a05f75f8b9c Mon Sep 17 00:00:00 2001 From: Blargian Date: Tue, 2 Jul 2024 11:35:54 +0200 Subject: [PATCH 16/20] add missing transactionXYZ functions to docs --- .../functions/other-functions.md | 103 +++++++++++++++++- 1 file changed, 98 insertions(+), 5 deletions(-) diff --git 
a/docs/en/sql-reference/functions/other-functions.md b/docs/en/sql-reference/functions/other-functions.md index 738a12143a3..4e252785715 100644 --- a/docs/en/sql-reference/functions/other-functions.md +++ b/docs/en/sql-reference/functions/other-functions.md @@ -3865,7 +3865,7 @@ Result: Returns the ID of a [transaction](https://clickhouse.com/docs/en/guides/developer/transactional#transactions-commit-and-rollback). :::note -This function is part of an experimental feature set. Enable experimental transaction support by adding this setting in `config.d/transactions.xml`: +This function is part of an experimental feature set. Enable experimental transaction support by adding this setting to your configuration: ``` @@ -3884,21 +3884,114 @@ transactionID() **Returned value** -- TransactionID. [String](../data-types/string.md). [Tuple](../data-types/tuple.md)([UInt64](../data-types/int-uint.md), [UInt64](../data-types/int-uint.md), [UUID](../data-types/uuid.md)). +- Returns a tuple consisting of `start_csn`, `local_tid` and `host_id`. [Tuple](../data-types/tuple.md). + +- `start_csn`: Global sequential number, the newest commit timestamp that was seen when this transaction began. [UInt64](../data-types/int-uint.md). +- `local_tid`: Local sequential number that is unique for each transaction started by this host within a specific start_csn. [UInt64](../data-types/int-uint.md). +- `host_id`: UUID of the host that has started this transaction. [UUID](../data-types/uuid.md). **Example** Query: ```sql +BEGIN TRANSACTION; SELECT transactionID(); +ROLLBACK; ``` Result: ```response -┌─transactionID()──────────────────────────────┐ -│ (0,0,'00000000-0000-0000-0000-000000000000') │ -└──────────────────────────────────────────────┘ +┌─transactionID()────────────────────────────────┐ +│ (32,34,'0ee8b069-f2bb-4748-9eae-069c85b5252b') │ +└────────────────────────────────────────────────┘ ``` +## transactionLatestSnapshot + +Returns the newest snapshot (Commit Sequence Number) of a [transaction](https://clickhouse.com/docs/en/guides/developer/transactional#transactions-commit-and-rollback) that is available for reading. + +:::note +This function is part of an experimental feature set. Enable experimental transaction support by adding this setting to your configuration: + +``` + + 1 + +``` + +For more information see the page [Transactional (ACID) support](https://clickhouse.com/docs/en/guides/developer/transactional#transactions-commit-and-rollback). +::: + +**Syntax** + +```sql +transactionLatestSnapshot() +``` + +**Returned value** + +- Returns the latest snapshot (CSN) of a transaction. [UInt64](../data-types/int-uint.md) + +**Example** + +Query: + +```sql +BEGIN TRANSACTION; +SELECT transactionLatestSnapshot(); +ROLLBACK; +``` + +Result: + +```response +┌─transactionLatestSnapshot()─┐ +│ 32 │ +└─────────────────────────────┘ +``` + +## transactionOldestSnapshot + +Returns the oldest snapshot (Commit Sequence Number) that is visible for some running [transaction](https://clickhouse.com/docs/en/guides/developer/transactional#transactions-commit-and-rollback). + +:::note +This function is part of an experimental feature set. Enable experimental transaction support by adding this setting to your configuration: + +``` + + 1 + +``` + +For more information see the page [Transactional (ACID) support](https://clickhouse.com/docs/en/guides/developer/transactional#transactions-commit-and-rollback). 
+::: + +**Syntax** + +```sql +transactionOldestSnapshot() +``` + +**Returned value** + +- Returns the oldest snapshot (CSN) of a transaction. [UInt64](../data-types/int-uint.md) + +**Example** + +Query: + +```sql +BEGIN TRANSACTION; +SELECT transactionLatestSnapshot(); +ROLLBACK; +``` + +Result: + +```response +┌─transactionOldestSnapshot()─┐ +│ 32 │ +└─────────────────────────────┘ +``` From a905b24f7585191f2271f77cc4817e11f5b758f0 Mon Sep 17 00:00:00 2001 From: Kseniia Sumarokova <54203879+kssenii@users.noreply.github.com> Date: Tue, 2 Jul 2024 11:54:56 +0200 Subject: [PATCH 17/20] Fix clang-tidy --- src/Processors/Chunk.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Processors/Chunk.cpp b/src/Processors/Chunk.cpp index 8e3ca0b03b3..5f6cf2f7230 100644 --- a/src/Processors/Chunk.cpp +++ b/src/Processors/Chunk.cpp @@ -125,7 +125,7 @@ void Chunk::addColumn(size_t position, ColumnPtr column) if (position >= columns.size()) throw Exception(ErrorCodes::POSITION_OUT_OF_BOUND, "Position {} out of bound in Chunk::addColumn(), max position = {}", - position, columns.size() ? columns.size() - 1 : 0); + position, !columns.empty() ? columns.size() - 1 : 0); if (empty()) num_rows = column->size(); else if (column->size() != num_rows) @@ -143,7 +143,7 @@ void Chunk::erase(size_t position) if (position >= columns.size()) throw Exception(ErrorCodes::POSITION_OUT_OF_BOUND, "Position {} out of bound in Chunk::erase(), max position = {}", - toString(position), toString(columns.size() ? columns.size() - 1 : 0)); + toString(position), toString(!columns.empty() ? columns.size() - 1 : 0)); columns.erase(columns.begin() + position); } From bd47b8f3a11ffe582af6b678ab6330dfdbbb00ba Mon Sep 17 00:00:00 2001 From: Shaun Struwig <41984034+Blargian@users.noreply.github.com> Date: Tue, 2 Jul 2024 16:15:32 +0200 Subject: [PATCH 18/20] Update aspell-dict.txt --- utils/check-style/aspell-ignore/en/aspell-dict.txt | 3 +++ 1 file changed, 3 insertions(+) diff --git a/utils/check-style/aspell-ignore/en/aspell-dict.txt b/utils/check-style/aspell-ignore/en/aspell-dict.txt index 5c3991ed293..a6d9988f81f 100644 --- a/utils/check-style/aspell-ignore/en/aspell-dict.txt +++ b/utils/check-style/aspell-ignore/en/aspell-dict.txt @@ -977,6 +977,9 @@ TotalRowsOfMergeTreeTables TotalTemporaryFiles Tradeoff Transactional +transactionID +transactionLatestSnapshot +transactionOldestSnapshot Tsai Tukey TwoColumnList From f8c4d77aae321207ce2dc5920fd1e95b7985bdee Mon Sep 17 00:00:00 2001 From: Robert Schulze Date: Tue, 2 Jul 2024 14:17:19 +0000 Subject: [PATCH 19/20] Update aspell-dict --- .../aspell-ignore/en/aspell-dict.txt | 33 ++++++++++--------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/utils/check-style/aspell-ignore/en/aspell-dict.txt b/utils/check-style/aspell-ignore/en/aspell-dict.txt index a6d9988f81f..239ade4d503 100644 --- a/utils/check-style/aspell-ignore/en/aspell-dict.txt +++ b/utils/check-style/aspell-ignore/en/aspell-dict.txt @@ -48,7 +48,6 @@ AutoML Autocompletion AvroConfluent BIGINT -bigrams BIGSERIAL BORO BSON @@ -223,7 +222,6 @@ DatabaseOrdinaryThreadsActive DateTime DateTimes DbCL -deallocated Decrypted Deduplicate Deduplication @@ -295,7 +293,6 @@ FilesystemMainPathUsedBytes FilesystemMainPathUsedINodes FixedString FlameGraph -flameGraph Flink ForEach FreeBSD @@ -977,9 +974,6 @@ TotalRowsOfMergeTreeTables TotalTemporaryFiles Tradeoff Transactional -transactionID -transactionLatestSnapshot -transactionOldestSnapshot Tsai Tukey TwoColumnList @@ -995,6 +989,8 @@ UPDATEs 
URIs URL URL's +URLDecode +URLEncode URLHash URLHierarchy URLPathHierarchy @@ -1012,13 +1008,10 @@ UncompressedCacheBytes UncompressedCacheCells UnidirectionalEdgeIsValid UniqThetaSketch -unigrams Updatable Uppercased Uptime Uptrace -URLDecode -URLEncode UserID Util VARCHAR @@ -1224,6 +1217,7 @@ basename bcrypt benchmarking bfloat +bigrams binlog bitAnd bitCount @@ -1473,6 +1467,7 @@ dbeaver dbgen dbms ddl +deallocated deallocation deallocations debian @@ -1512,11 +1507,11 @@ deserializing destructor destructors detectCharset -detectTonality detectLanguage detectLanguageMixed detectLanguageUnknown detectProgrammingLanguage +detectTonality determinator deterministically dictGet @@ -1532,8 +1527,8 @@ dictIsIn disableProtocols disjunction disjunctions -displaySecretsInShowAndSelect displayName +displaySecretsInShowAndSelect distro divideDecimal dmesg @@ -1583,11 +1578,11 @@ evalMLMethod exFAT expiryMsec exponentialMovingAverage -exponentialmovingaverage exponentialTimeDecayedAvg exponentialTimeDecayedCount exponentialTimeDecayedMax exponentialTimeDecayedSum +exponentialmovingaverage expr exprN extendedVerification @@ -1624,6 +1619,7 @@ firstSignificantSubdomainCustom firstSignificantSubdomainCustomRFC firstSignificantSubdomainRFC fixedstring +flameGraph flamegraph flatbuffers flattenTuple @@ -1806,8 +1802,8 @@ incrementing indexHint indexOf infi -infty inflight +infty initcap initcapUTF initialQueryID @@ -1955,9 +1951,9 @@ loghouse london lookups loongarch -lowcardinality lowCardinalityIndices lowCardinalityKeys +lowcardinality lowerUTF lowercased lttb @@ -2265,9 +2261,9 @@ proleptic prometheus proportionsZTest proto -protocol protobuf protobufsingle +protocol proxied pseudorandom pseudorandomize @@ -2758,6 +2754,12 @@ topLevelDomain topLevelDomainRFC topk topkweighted +transactionID +transactionID +transactionLatestSnapshot +transactionLatestSnapshot +transactionOldestSnapshot +transactionOldestSnapshot transactional transactionally translateUTF @@ -2811,6 +2813,7 @@ unescaping unhex unicode unidimensional +unigrams unintuitive uniq uniqCombined From 65eb46927839a42c32bf5db4054095fe7dae99a7 Mon Sep 17 00:00:00 2001 From: Robert Schulze Date: Tue, 2 Jul 2024 14:18:33 +0000 Subject: [PATCH 20/20] Remove duplicates from aspell-dict --- utils/check-style/aspell-ignore/en/aspell-dict.txt | 3 --- 1 file changed, 3 deletions(-) diff --git a/utils/check-style/aspell-ignore/en/aspell-dict.txt b/utils/check-style/aspell-ignore/en/aspell-dict.txt index 239ade4d503..ea3dd642ee0 100644 --- a/utils/check-style/aspell-ignore/en/aspell-dict.txt +++ b/utils/check-style/aspell-ignore/en/aspell-dict.txt @@ -2755,10 +2755,7 @@ topLevelDomainRFC topk topkweighted transactionID -transactionID transactionLatestSnapshot -transactionLatestSnapshot -transactionOldestSnapshot transactionOldestSnapshot transactional transactionally
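
As a closing illustration of the behaviour patch 15 documents and enforces — bit positions counted right-to-left starting at 0, with out-of-range positions rejected rather than silently wrapped — the core check reduces to something like the sketch below. The function name, template, and exception type are invented for the example; ClickHouse's actual implementation lives in src/Functions/bitTest.cpp and throws PARAMETER_OUT_OF_BOUND.

```cpp
#include <cstdint>
#include <stdexcept>
#include <string>

/// Example only (not ClickHouse code): test one bit of an integer value,
/// counting positions right-to-left from 0, for integer types up to 64 bits.
template <typename T>
bool bitTestExample(T value, int64_t position)
{
    const auto max_position = static_cast<int64_t>(8 * sizeof(T)) - 1;
    if (position < 0 || position > max_position)
        throw std::out_of_range(
            "bit position must be in [0, " + std::to_string(max_position) + "]");
    return (static_cast<uint64_t>(value) >> position) & 1u;
}
```

With value = 1 + 4 + 16 + 64 (binary 01010101) this returns 1 for even positions and 0 for odd ones, matching the expected output added to 01082_bit_test_out_of_bound.reference, while positions 8 and above for a UInt8 argument are rejected, matching the PARAMETER_OUT_OF_BOUND cases in the updated test.
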