This commit is contained in:
Pavel Kruglov 2024-11-20 15:25:22 -08:00 committed by GitHub
commit c21521ef08
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 38 additions and 6 deletions

View File

@ -93,6 +93,8 @@ static constexpr auto DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCO
/// Push externally granted roles to other nodes
static constexpr auto DBMS_MIN_PROTOCOL_VERSION_WITH_INTERSERVER_EXTERNALLY_GRANTED_ROLES = 54472;
static constexpr auto DBMS_MIN_REVISION_WITH_V2_DYNAMIC_AND_JSON_SERIALIZATION = 54473;
/// Version of ClickHouse TCP protocol.
///
/// Should be incremented manually on protocol changes.
@ -100,6 +102,6 @@ static constexpr auto DBMS_MIN_PROTOCOL_VERSION_WITH_INTERSERVER_EXTERNALLY_GRAN
/// NOTE: DBMS_TCP_PROTOCOL_VERSION has nothing common with VERSION_REVISION,
/// later is just a number for server version (one number instead of commit SHA)
/// for simplicity (sometimes it may be more convenient in some use cases).
static constexpr auto DBMS_TCP_PROTOCOL_VERSION = 54472;
static constexpr auto DBMS_TCP_PROTOCOL_VERSION = 54473;
}

View File

@ -1225,6 +1225,9 @@ Possible values: non-negative numbers. Note that if the value is too small or to
If true then data can be parsed directly to columns with custom serialization (e.g. Sparse) according to hints for serialization got from the table.
)", 0) \
\
DECLARE(Bool, merge_tree_use_v1_object_and_dynamic_serialization, false, R"(
When enabled, V1 serialization version of JSON and Dynamic types will be used in MergeTree instead of V2. Changing this setting takes affect only after server restart.
)", 0) \
DECLARE(UInt64, merge_tree_min_rows_for_concurrent_read, (20 * 8192), R"(
If the number of rows to be read from a file of a [MergeTree](../../engines/table-engines/mergetree-family/mergetree.md) table exceeds `merge_tree_min_rows_for_concurrent_read` then ClickHouse tries to perform a concurrent reading from this file on several threads.

View File

@ -82,6 +82,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"backup_restore_finish_timeout_after_error_sec", 0, 180, "New setting."},
{"query_plan_merge_filters", false, true, "Allow to merge filters in the query plan. This is required to properly support filter-push-down with a new analyzer."},
{"parallel_replicas_local_plan", false, true, "Use local plan for local replica in a query with parallel replicas"},
{"merge_tree_use_v1_object_and_dynamic_serialization", true, false, "Add new serialization V2 version for JSON and Dynamic types"},
{"min_joined_block_size_bytes", 524288, 524288, "New setting."},
{"allow_experimental_bfloat16_type", false, false, "Add new experimental BFloat16 type"},
{"filesystem_cache_skip_download_if_exceeds_per_query_cache_write_limit", 1, 1, "Rename of setting skip_download_if_exceeds_query_cache_limit"},

View File

@ -286,6 +286,9 @@ public:
SUFFIX, /// Write statistics in suffix.
};
ObjectAndDynamicStatisticsMode object_and_dynamic_write_statistics = ObjectAndDynamicStatisticsMode::NONE;
/// Use old V1 serialization of JSON and Dynamic types. Needed for compatibility.
bool use_v1_object_and_dynamic_serialization = false;
};
struct DeserializeBinaryBulkSettings

View File

@ -108,16 +108,22 @@ void SerializationDynamic::serializeBinaryBulkStatePrefix(
throw Exception(ErrorCodes::LOGICAL_ERROR, "Missing stream for Dynamic column structure during serialization of binary bulk state prefix");
/// Write structure serialization version.
UInt64 structure_version = DynamicSerializationVersion::Value::V2;
UInt64 structure_version = settings.use_v1_object_and_dynamic_serialization ? DynamicSerializationVersion::Value::V1 : DynamicSerializationVersion::Value::V2;
writeBinaryLittleEndian(structure_version, *stream);
auto dynamic_state = std::make_shared<SerializeBinaryBulkStateDynamic>(structure_version);
dynamic_state->variant_type = variant_info.variant_type;
dynamic_state->variant_names = variant_info.variant_names;
const auto & variant_column = column_dynamic.getVariantColumn();
/// Write information about dynamic types.
dynamic_state->num_dynamic_types = dynamic_state->variant_names.size() - 1; /// -1 for SharedVariant
/// In V1 version we had max_dynamic_types parameter written, but now we need only actual number of variants.
/// For compatibility we need to write V1 version sometimes, but we should write number of variants instead of
/// max_dynamic_types (because now max_dynamic_types can be different in different serialized columns).
if (structure_version == DynamicSerializationVersion::Value::V1)
writeVarUInt(dynamic_state->num_dynamic_types, *stream);
writeVarUInt(dynamic_state->num_dynamic_types, *stream);
if (settings.data_types_binary_encoding)
{

View File

@ -187,7 +187,11 @@ void SerializationObject::serializeBinaryBulkStatePrefix(
throw Exception(ErrorCodes::LOGICAL_ERROR, "Missing stream for Object column structure during serialization of binary bulk state prefix");
/// Write serialization version.
UInt64 serialization_version = settings.write_json_as_string ? ObjectSerializationVersion::Value::STRING : ObjectSerializationVersion::Value::V2;
UInt64 serialization_version = ObjectSerializationVersion::Value::V2;
if (settings.write_json_as_string)
serialization_version = ObjectSerializationVersion::Value::STRING;
else if (settings.use_v1_object_and_dynamic_serialization)
serialization_version = ObjectSerializationVersion::Value::V1;
writeBinaryLittleEndian(serialization_version, *stream);
auto object_state = std::make_shared<SerializeBinaryBulkStateObject>(serialization_version);
@ -202,6 +206,13 @@ void SerializationObject::serializeBinaryBulkStatePrefix(
for (const auto & [path, _] : dynamic_paths)
object_state->sorted_dynamic_paths.push_back(path);
std::sort(object_state->sorted_dynamic_paths.begin(), object_state->sorted_dynamic_paths.end());
/// In V1 version we had max_dynamic_paths parameter written, but now we need only actual number of dynamic paths.
/// For compatibility we need to write V1 version sometimes, but we should write number of dynamic paths instead of
/// max_dynamic_paths (because now max_dynamic_paths can be different in different serialized columns).
if (serialization_version == ObjectSerializationVersion::Value::V1)
writeVarUInt(object_state->sorted_dynamic_paths.size(), *stream);
writeVarUInt(object_state->sorted_dynamic_paths.size(), *stream);
for (const auto & path : object_state->sorted_dynamic_paths)
writeStringBinary(path, *stream);

View File

@ -56,7 +56,7 @@ void NativeWriter::flush()
}
static void writeData(const ISerialization & serialization, const ColumnPtr & column, WriteBuffer & ostr, const std::optional<FormatSettings> & format_settings, UInt64 offset, UInt64 limit)
static void writeData(const ISerialization & serialization, const ColumnPtr & column, WriteBuffer & ostr, const std::optional<FormatSettings> & format_settings, UInt64 offset, UInt64 limit, UInt64 client_revision)
{
/** If there are columns-constants - then we materialize them.
* (Since the data type does not know how to serialize / deserialize constants.)
@ -70,6 +70,7 @@ static void writeData(const ISerialization & serialization, const ColumnPtr & co
settings.low_cardinality_max_dictionary_size = 0;
settings.data_types_binary_encoding = format_settings && format_settings->native.encode_types_in_binary_format;
settings.write_json_as_string = format_settings && format_settings->native.write_json_as_string;
settings.use_v1_object_and_dynamic_serialization = client_revision < DBMS_MIN_REVISION_WITH_V2_DYNAMIC_AND_JSON_SERIALIZATION;
ISerialization::SerializeBinaryBulkStatePtr state;
serialization.serializeBinaryBulkStatePrefix(*full_column, settings, state);
@ -181,7 +182,7 @@ size_t NativeWriter::write(const Block & block)
/// Data
if (rows) /// Zero items of data is always represented as zero number of bytes.
writeData(*serialization, column.column, ostr, format_settings, 0, 0);
writeData(*serialization, column.column, ostr, format_settings, 0, 0, client_revision);
if (index)
{

View File

@ -154,6 +154,7 @@ void writeColumnSingleGranule(
serialize_settings.position_independent_encoding = true;
serialize_settings.low_cardinality_max_dictionary_size = 0;
serialize_settings.use_compact_variant_discriminators_serialization = settings.use_compact_variant_discriminators_serialization;
serialize_settings.use_v1_object_and_dynamic_serialization = settings.use_v1_object_and_dynamic_serialization;
serialize_settings.object_and_dynamic_write_statistics = ISerialization::SerializeBinaryBulkSettings::ObjectAndDynamicStatisticsMode::PREFIX;
serialization->serializeBinaryBulkStatePrefix(*column.column, serialize_settings, state);

View File

@ -467,6 +467,7 @@ void MergeTreeDataPartWriterWide::writeColumn(
{
ISerialization::SerializeBinaryBulkSettings serialize_settings;
serialize_settings.use_compact_variant_discriminators_serialization = settings.use_compact_variant_discriminators_serialization;
serialize_settings.use_v1_object_and_dynamic_serialization = settings.use_v1_object_and_dynamic_serialization;
serialize_settings.getter = createStreamGetter(name_and_type, offset_columns);
serialization->serializeBinaryBulkStatePrefix(column, serialize_settings, it->second);
}

View File

@ -12,6 +12,7 @@ namespace Setting
extern const SettingsBool low_cardinality_use_single_dictionary_for_part;
extern const SettingsUInt64 min_compress_block_size;
extern const SettingsUInt64 max_compress_block_size;
extern const SettingsBool merge_tree_use_v1_object_and_dynamic_serialization;
}
namespace MergeTreeSetting
@ -53,6 +54,7 @@ MergeTreeWriterSettings::MergeTreeWriterSettings(
, low_cardinality_max_dictionary_size(global_settings[Setting::low_cardinality_max_dictionary_size])
, low_cardinality_use_single_dictionary_for_part(global_settings[Setting::low_cardinality_use_single_dictionary_for_part] != 0)
, use_compact_variant_discriminators_serialization((*storage_settings)[MergeTreeSetting::use_compact_variant_discriminators_serialization])
, use_v1_object_and_dynamic_serialization(global_settings[Setting::merge_tree_use_v1_object_and_dynamic_serialization])
, use_adaptive_write_buffer_for_dynamic_subcolumns((*storage_settings)[MergeTreeSetting::use_adaptive_write_buffer_for_dynamic_subcolumns])
, adaptive_write_buffer_initial_size((*storage_settings)[MergeTreeSetting::adaptive_write_buffer_initial_size])
{

View File

@ -85,6 +85,7 @@ struct MergeTreeWriterSettings
size_t low_cardinality_max_dictionary_size;
bool low_cardinality_use_single_dictionary_for_part;
bool use_compact_variant_discriminators_serialization;
bool use_v1_object_and_dynamic_serialization;
bool use_adaptive_write_buffer_for_dynamic_subcolumns;
size_t adaptive_write_buffer_initial_size;
};