From 69e4f93a2adb1b8a88e6239406f6a4f122c08010 Mon Sep 17 00:00:00 2001 From: avogar Date: Tue, 12 Nov 2024 13:14:53 +0000 Subject: [PATCH 1/5] Fix JSON/Dynamic Native serialization with old server and new client --- src/Core/ProtocolDefines.h | 4 +++- src/DataTypes/Serializations/ISerialization.h | 3 +++ src/DataTypes/Serializations/SerializationDynamic.cpp | 7 +++++-- src/DataTypes/Serializations/SerializationObject.cpp | 10 +++++++++- src/Formats/NativeWriter.cpp | 5 +++-- 5 files changed, 23 insertions(+), 6 deletions(-) diff --git a/src/Core/ProtocolDefines.h b/src/Core/ProtocolDefines.h index b68eff0aa5a..f8b0544a26b 100644 --- a/src/Core/ProtocolDefines.h +++ b/src/Core/ProtocolDefines.h @@ -90,6 +90,8 @@ static constexpr auto DBMS_MIN_PROTOCOL_VERSION_WITH_CHUNKED_PACKETS = 54470; static constexpr auto DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL = 54471; +static constexpr auto DBMS_MIN_REVISION_WITH_V2_DYNAMIC_AND_JSON_SERIALIZATION = 54472; + /// Version of ClickHouse TCP protocol. /// /// Should be incremented manually on protocol changes. @@ -97,6 +99,6 @@ static constexpr auto DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCO /// NOTE: DBMS_TCP_PROTOCOL_VERSION has nothing common with VERSION_REVISION, /// later is just a number for server version (one number instead of commit SHA) /// for simplicity (sometimes it may be more convenient in some use cases). -static constexpr auto DBMS_TCP_PROTOCOL_VERSION = 54471; +static constexpr auto DBMS_TCP_PROTOCOL_VERSION = 54472; } diff --git a/src/DataTypes/Serializations/ISerialization.h b/src/DataTypes/Serializations/ISerialization.h index 400bdbf32d3..90ae6cde0ce 100644 --- a/src/DataTypes/Serializations/ISerialization.h +++ b/src/DataTypes/Serializations/ISerialization.h @@ -286,6 +286,9 @@ public: SUFFIX, /// Write statistics in suffix. 
}; ObjectAndDynamicStatisticsMode object_and_dynamic_write_statistics = ObjectAndDynamicStatisticsMode::NONE; + + /// Use old V1 serialization of JSON and Dynamic types. Needed for compatibility. + bool use_v1_object_and_dynamic_serialization = false; }; struct DeserializeBinaryBulkSettings diff --git a/src/DataTypes/Serializations/SerializationDynamic.cpp b/src/DataTypes/Serializations/SerializationDynamic.cpp index 91c8797d43f..fe8e79f1ee9 100644 --- a/src/DataTypes/Serializations/SerializationDynamic.cpp +++ b/src/DataTypes/Serializations/SerializationDynamic.cpp @@ -108,14 +108,17 @@ void SerializationDynamic::serializeBinaryBulkStatePrefix( throw Exception(ErrorCodes::LOGICAL_ERROR, "Missing stream for Dynamic column structure during serialization of binary bulk state prefix"); /// Write structure serialization version. - UInt64 structure_version = DynamicSerializationVersion::Value::V2; + UInt64 structure_version = settings.use_v1_object_and_dynamic_serialization ? DynamicSerializationVersion::Value::V1 : DynamicSerializationVersion::Value::V2; writeBinaryLittleEndian(structure_version, *stream); auto dynamic_state = std::make_shared(structure_version); - dynamic_state->variant_type = variant_info.variant_type; dynamic_state->variant_names = variant_info.variant_names; const auto & variant_column = column_dynamic.getVariantColumn(); + /// In V1 version write max_dynamic_types parameter. + if (structure_version == DynamicSerializationVersion::Value::V1) + writeVarUInt(column_dynamic.getMaxDynamicTypes(), *stream); + /// Write information about dynamic types. 
dynamic_state->num_dynamic_types = dynamic_state->variant_names.size() - 1; /// -1 for SharedVariant writeVarUInt(dynamic_state->num_dynamic_types, *stream); diff --git a/src/DataTypes/Serializations/SerializationObject.cpp b/src/DataTypes/Serializations/SerializationObject.cpp index 1b95fddee9f..924a6a30610 100644 --- a/src/DataTypes/Serializations/SerializationObject.cpp +++ b/src/DataTypes/Serializations/SerializationObject.cpp @@ -187,7 +187,11 @@ void SerializationObject::serializeBinaryBulkStatePrefix( throw Exception(ErrorCodes::LOGICAL_ERROR, "Missing stream for Object column structure during serialization of binary bulk state prefix"); /// Write serialization version. - UInt64 serialization_version = settings.write_json_as_string ? ObjectSerializationVersion::Value::STRING : ObjectSerializationVersion::Value::V2; + UInt64 serialization_version = ObjectSerializationVersion::Value::V2; + if (settings.write_json_as_string) + serialization_version = ObjectSerializationVersion::Value::STRING; + else if (settings.use_v1_object_and_dynamic_serialization) + serialization_version = ObjectSerializationVersion::Value::V1; writeBinaryLittleEndian(serialization_version, *stream); auto object_state = std::make_shared(serialization_version); @@ -197,6 +201,10 @@ void SerializationObject::serializeBinaryBulkStatePrefix( return; } + /// In V1 version write max_dynamic_paths parameter. + if (serialization_version == ObjectSerializationVersion::Value::V1) + writeVarUInt(column_object.getMaxDynamicPaths(), *stream); + /// Write all dynamic paths in sorted order. 
object_state->sorted_dynamic_paths.reserve(dynamic_paths.size()); for (const auto & [path, _] : dynamic_paths) diff --git a/src/Formats/NativeWriter.cpp b/src/Formats/NativeWriter.cpp index a2e0b0a5571..8db63136008 100644 --- a/src/Formats/NativeWriter.cpp +++ b/src/Formats/NativeWriter.cpp @@ -56,7 +56,7 @@ void NativeWriter::flush() } -static void writeData(const ISerialization & serialization, const ColumnPtr & column, WriteBuffer & ostr, const std::optional & format_settings, UInt64 offset, UInt64 limit) +static void writeData(const ISerialization & serialization, const ColumnPtr & column, WriteBuffer & ostr, const std::optional & format_settings, UInt64 offset, UInt64 limit, UInt64 client_revision) { /** If there are columns-constants - then we materialize them. * (Since the data type does not know how to serialize / deserialize constants.) @@ -70,6 +70,7 @@ static void writeData(const ISerialization & serialization, const ColumnPtr & co settings.low_cardinality_max_dictionary_size = 0; settings.data_types_binary_encoding = format_settings && format_settings->native.encode_types_in_binary_format; settings.write_json_as_string = format_settings && format_settings->native.write_json_as_string; + settings.use_v1_object_and_dynamic_serialization = client_revision < DBMS_MIN_REVISION_WITH_V2_DYNAMIC_AND_JSON_SERIALIZATION; ISerialization::SerializeBinaryBulkStatePtr state; serialization.serializeBinaryBulkStatePrefix(*full_column, settings, state); @@ -181,7 +182,7 @@ size_t NativeWriter::write(const Block & block) /// Data if (rows) /// Zero items of data is always represented as zero number of bytes. 
- writeData(*serialization, column.column, ostr, format_settings, 0, 0); + writeData(*serialization, column.column, ostr, format_settings, 0, 0, client_revision); if (index) { From 013fde41e46ab03a8e32a96b2865375781a434cd Mon Sep 17 00:00:00 2001 From: avogar Date: Tue, 12 Nov 2024 15:18:00 +0000 Subject: [PATCH 2/5] Add setting to fallback to V1 serialization for Dynamic and Object --- src/Core/Settings.cpp | 3 +++ src/Core/SettingsChangesHistory.cpp | 1 + src/DataTypes/Serializations/SerializationDynamic.cpp | 11 +++++++---- src/DataTypes/Serializations/SerializationObject.cpp | 11 +++++++---- .../MergeTree/MergeTreeDataPartWriterCompact.cpp | 1 + .../MergeTree/MergeTreeDataPartWriterWide.cpp | 1 + src/Storages/MergeTree/MergeTreeIOSettings.cpp | 2 ++ src/Storages/MergeTree/MergeTreeIOSettings.h | 1 + 8 files changed, 23 insertions(+), 8 deletions(-) diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 4c8761e503e..89adade7094 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -1222,6 +1222,9 @@ Possible values: non-negative numbers. Note that if the value is too small or to If true then data can be parsed directly to columns with custom serialization (e.g. Sparse) according to hints for serialization got from the table. )", 0) \ \ + DECLARE(Bool, merge_tree_use_v1_object_and_dynamic_serialization, false, R"( +When enabled, V1 serialization version of JSON and Dynamic types will be used in MergeTree instead of V2. +)", 0) \ DECLARE(UInt64, merge_tree_min_rows_for_concurrent_read, (20 * 8192), R"( If the number of rows to be read from a file of a [MergeTree](../../engines/table-engines/mergetree-family/mergetree.md) table exceeds `merge_tree_min_rows_for_concurrent_read` then ClickHouse tries to perform a concurrent reading from this file on several threads. 
diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 0ff9d0a6833..ef51df7b26b 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -77,6 +77,7 @@ static std::initializer_listvariant_names = variant_info.variant_names; const auto & variant_column = column_dynamic.getVariantColumn(); - /// In V1 version write max_dynamic_types parameter. - if (structure_version == DynamicSerializationVersion::Value::V1) - writeVarUInt(column_dynamic.getMaxDynamicTypes(), *stream); - /// Write information about dynamic types. dynamic_state->num_dynamic_types = dynamic_state->variant_names.size() - 1; /// -1 for SharedVariant + + /// In V1 version we had max_dynamic_types parameter written, but now we need only actual number of variants. + /// For compatibility we need to write V1 version sometimes, but we should write number of variants instead of + /// max_dynamic_types (because now max_dynamic_types can be different in different serialized columns). + if (structure_version == DynamicSerializationVersion::Value::V1) + writeVarUInt(dynamic_state->num_dynamic_types, *stream); + writeVarUInt(dynamic_state->num_dynamic_types, *stream); if (settings.data_types_binary_encoding) { diff --git a/src/DataTypes/Serializations/SerializationObject.cpp b/src/DataTypes/Serializations/SerializationObject.cpp index 924a6a30610..6d09c0a0396 100644 --- a/src/DataTypes/Serializations/SerializationObject.cpp +++ b/src/DataTypes/Serializations/SerializationObject.cpp @@ -201,15 +201,18 @@ void SerializationObject::serializeBinaryBulkStatePrefix( return; } - /// In V1 version write max_dynamic_paths parameter. - if (serialization_version == ObjectSerializationVersion::Value::V1) - writeVarUInt(column_object.getMaxDynamicPaths(), *stream); - /// Write all dynamic paths in sorted order. 
object_state->sorted_dynamic_paths.reserve(dynamic_paths.size()); for (const auto & [path, _] : dynamic_paths) object_state->sorted_dynamic_paths.push_back(path); std::sort(object_state->sorted_dynamic_paths.begin(), object_state->sorted_dynamic_paths.end()); + + /// In V1 version we had max_dynamic_paths parameter written, but now we need only actual number of dynamic paths. + /// For compatibility we need to write V1 version sometimes, but we should write number of dynamic paths instead of + /// max_dynamic_paths (because now max_dynamic_paths can be different in different serialized columns). + if (serialization_version == ObjectSerializationVersion::Value::V1) + writeVarUInt(object_state->sorted_dynamic_paths.size(), *stream); + writeVarUInt(object_state->sorted_dynamic_paths.size(), *stream); for (const auto & path : object_state->sorted_dynamic_paths) writeStringBinary(path, *stream); diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp index c8d11ced683..94de76b6d52 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp @@ -154,6 +154,7 @@ void writeColumnSingleGranule( serialize_settings.position_independent_encoding = true; serialize_settings.low_cardinality_max_dictionary_size = 0; serialize_settings.use_compact_variant_discriminators_serialization = settings.use_compact_variant_discriminators_serialization; + serialize_settings.use_v1_object_and_dynamic_serialization = settings.use_v1_object_and_dynamic_serialization; serialize_settings.object_and_dynamic_write_statistics = ISerialization::SerializeBinaryBulkSettings::ObjectAndDynamicStatisticsMode::PREFIX; serialization->serializeBinaryBulkStatePrefix(*column.column, serialize_settings, state); diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp index 7c9724b1b75..c8416847cb5 
100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp @@ -462,6 +462,7 @@ void MergeTreeDataPartWriterWide::writeColumn( { ISerialization::SerializeBinaryBulkSettings serialize_settings; serialize_settings.use_compact_variant_discriminators_serialization = settings.use_compact_variant_discriminators_serialization; + serialize_settings.use_v1_object_and_dynamic_serialization = settings.use_v1_object_and_dynamic_serialization; serialize_settings.getter = createStreamGetter(name_and_type, offset_columns); serialization->serializeBinaryBulkStatePrefix(column, serialize_settings, it->second); } diff --git a/src/Storages/MergeTree/MergeTreeIOSettings.cpp b/src/Storages/MergeTree/MergeTreeIOSettings.cpp index bacfbbd5720..dd6d0fea602 100644 --- a/src/Storages/MergeTree/MergeTreeIOSettings.cpp +++ b/src/Storages/MergeTree/MergeTreeIOSettings.cpp @@ -12,6 +12,7 @@ namespace Setting extern const SettingsBool low_cardinality_use_single_dictionary_for_part; extern const SettingsUInt64 min_compress_block_size; extern const SettingsUInt64 max_compress_block_size; + extern const SettingsBool merge_tree_use_v1_object_and_dynamic_serialization; } namespace MergeTreeSetting @@ -53,6 +54,7 @@ MergeTreeWriterSettings::MergeTreeWriterSettings( , low_cardinality_max_dictionary_size(global_settings[Setting::low_cardinality_max_dictionary_size]) , low_cardinality_use_single_dictionary_for_part(global_settings[Setting::low_cardinality_use_single_dictionary_for_part] != 0) , use_compact_variant_discriminators_serialization((*storage_settings)[MergeTreeSetting::use_compact_variant_discriminators_serialization]) + , use_v1_object_and_dynamic_serialization(global_settings[Setting::merge_tree_use_v1_object_and_dynamic_serialization]) , use_adaptive_write_buffer_for_dynamic_subcolumns((*storage_settings)[MergeTreeSetting::use_adaptive_write_buffer_for_dynamic_subcolumns]) , 
adaptive_write_buffer_initial_size((*storage_settings)[MergeTreeSetting::adaptive_write_buffer_initial_size]) { diff --git a/src/Storages/MergeTree/MergeTreeIOSettings.h b/src/Storages/MergeTree/MergeTreeIOSettings.h index 4d1d2533729..474fe8aae41 100644 --- a/src/Storages/MergeTree/MergeTreeIOSettings.h +++ b/src/Storages/MergeTree/MergeTreeIOSettings.h @@ -83,6 +83,7 @@ struct MergeTreeWriterSettings size_t low_cardinality_max_dictionary_size; bool low_cardinality_use_single_dictionary_for_part; bool use_compact_variant_discriminators_serialization; + bool use_v1_object_and_dynamic_serialization; bool use_adaptive_write_buffer_for_dynamic_subcolumns; size_t adaptive_write_buffer_initial_size; }; From cfecdd60dda82abe50898360e4313c87051f16d2 Mon Sep 17 00:00:00 2001 From: Pavel Kruglov <48961922+Avogar@users.noreply.github.com> Date: Tue, 12 Nov 2024 17:12:07 +0100 Subject: [PATCH 3/5] Update SettingsChangesHistory.cpp --- src/Core/SettingsChangesHistory.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index f87052e5298..b1b686dfa52 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -77,7 +77,7 @@ static std::initializer_list Date: Tue, 12 Nov 2024 17:13:49 +0100 Subject: [PATCH 4/5] Update Settings.cpp --- src/Core/Settings.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 62423fca71b..e30663ae51d 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -1223,7 +1223,7 @@ If true then data can be parsed directly to columns with custom serialization (e )", 0) \ \ DECLARE(Bool, merge_tree_use_v1_object_and_dynamic_serialization, false, R"( -When enabled, V1 serialization version of JSON and Dynamic types will be used in MergeTree instead of V2. +When enabled, V1 serialization version of JSON and Dynamic types will be used in MergeTree instead of V2. 
Changing this setting takes effect only after server restart. )", 0) \ DECLARE(UInt64, merge_tree_min_rows_for_concurrent_read, (20 * 8192), R"( If the number of rows to be read from a file of a [MergeTree](../../engines/table-engines/mergetree-family/mergetree.md) table exceeds `merge_tree_min_rows_for_concurrent_read` then ClickHouse tries to perform a concurrent reading from this file on several threads. From 0e70a375dc4462834947ec503a73b9e2c6be4c39 Mon Sep 17 00:00:00 2001 From: avogar Date: Wed, 13 Nov 2024 13:19:12 +0000 Subject: [PATCH 5/5] Restart CI