Fix JSON/Dynamic Native serialization with old server and new client

This commit is contained in:
avogar 2024-11-12 13:14:53 +00:00
parent d65f68f88e
commit 69e4f93a2a
5 changed files with 23 additions and 6 deletions

View File

@ -90,6 +90,8 @@ static constexpr auto DBMS_MIN_PROTOCOL_VERSION_WITH_CHUNKED_PACKETS = 54470;
static constexpr auto DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL = 54471;
static constexpr auto DBMS_MIN_REVISION_WITH_V2_DYNAMIC_AND_JSON_SERIALIZATION = 54472;
/// Version of ClickHouse TCP protocol.
///
/// Should be incremented manually on protocol changes.
@ -97,6 +99,6 @@ static constexpr auto DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCO
/// NOTE: DBMS_TCP_PROTOCOL_VERSION has nothing common with VERSION_REVISION,
/// later is just a number for server version (one number instead of commit SHA)
/// for simplicity (sometimes it may be more convenient in some use cases).
static constexpr auto DBMS_TCP_PROTOCOL_VERSION = 54471;
static constexpr auto DBMS_TCP_PROTOCOL_VERSION = 54472;
}

View File

@ -286,6 +286,9 @@ public:
SUFFIX, /// Write statistics in suffix.
};
ObjectAndDynamicStatisticsMode object_and_dynamic_write_statistics = ObjectAndDynamicStatisticsMode::NONE;
/// Use old V1 serialization of JSON and Dynamic types. Needed for compatibility.
bool use_v1_object_and_dynamic_serialization = false;
};
struct DeserializeBinaryBulkSettings

View File

@ -108,14 +108,17 @@ void SerializationDynamic::serializeBinaryBulkStatePrefix(
throw Exception(ErrorCodes::LOGICAL_ERROR, "Missing stream for Dynamic column structure during serialization of binary bulk state prefix");
/// Write structure serialization version.
UInt64 structure_version = DynamicSerializationVersion::Value::V2;
UInt64 structure_version = settings.use_v1_object_and_dynamic_serialization ? DynamicSerializationVersion::Value::V1 : DynamicSerializationVersion::Value::V2;
writeBinaryLittleEndian(structure_version, *stream);
auto dynamic_state = std::make_shared<SerializeBinaryBulkStateDynamic>(structure_version);
dynamic_state->variant_type = variant_info.variant_type;
dynamic_state->variant_names = variant_info.variant_names;
const auto & variant_column = column_dynamic.getVariantColumn();
/// In V1 version write max_dynamic_types parameter.
if (structure_version == DynamicSerializationVersion::Value::V1)
writeVarUInt(column_dynamic.getMaxDynamicTypes(), *stream);
/// Write information about dynamic types.
dynamic_state->num_dynamic_types = dynamic_state->variant_names.size() - 1; /// -1 for SharedVariant
writeVarUInt(dynamic_state->num_dynamic_types, *stream);

View File

@ -187,7 +187,11 @@ void SerializationObject::serializeBinaryBulkStatePrefix(
throw Exception(ErrorCodes::LOGICAL_ERROR, "Missing stream for Object column structure during serialization of binary bulk state prefix");
/// Write serialization version.
UInt64 serialization_version = settings.write_json_as_string ? ObjectSerializationVersion::Value::STRING : ObjectSerializationVersion::Value::V2;
UInt64 serialization_version = ObjectSerializationVersion::Value::V2;
if (settings.write_json_as_string)
serialization_version = ObjectSerializationVersion::Value::STRING;
else if (settings.use_v1_object_and_dynamic_serialization)
serialization_version = ObjectSerializationVersion::Value::V1;
writeBinaryLittleEndian(serialization_version, *stream);
auto object_state = std::make_shared<SerializeBinaryBulkStateObject>(serialization_version);
@ -197,6 +201,10 @@ void SerializationObject::serializeBinaryBulkStatePrefix(
return;
}
/// In V1 version write max_dynamic_paths parameter.
if (serialization_version == ObjectSerializationVersion::Value::V1)
writeVarUInt(column_object.getMaxDynamicPaths(), *stream);
/// Write all dynamic paths in sorted order.
object_state->sorted_dynamic_paths.reserve(dynamic_paths.size());
for (const auto & [path, _] : dynamic_paths)

View File

@ -56,7 +56,7 @@ void NativeWriter::flush()
}
static void writeData(const ISerialization & serialization, const ColumnPtr & column, WriteBuffer & ostr, const std::optional<FormatSettings> & format_settings, UInt64 offset, UInt64 limit)
static void writeData(const ISerialization & serialization, const ColumnPtr & column, WriteBuffer & ostr, const std::optional<FormatSettings> & format_settings, UInt64 offset, UInt64 limit, UInt64 client_revision)
{
/** If there are columns-constants - then we materialize them.
* (Since the data type does not know how to serialize / deserialize constants.)
@ -70,6 +70,7 @@ static void writeData(const ISerialization & serialization, const ColumnPtr & co
settings.low_cardinality_max_dictionary_size = 0;
settings.data_types_binary_encoding = format_settings && format_settings->native.encode_types_in_binary_format;
settings.write_json_as_string = format_settings && format_settings->native.write_json_as_string;
settings.use_v1_object_and_dynamic_serialization = client_revision < DBMS_MIN_REVISION_WITH_V2_DYNAMIC_AND_JSON_SERIALIZATION;
ISerialization::SerializeBinaryBulkStatePtr state;
serialization.serializeBinaryBulkStatePrefix(*full_column, settings, state);
@ -181,7 +182,7 @@ size_t NativeWriter::write(const Block & block)
/// Data
if (rows) /// Zero items of data is always represented as zero number of bytes.
writeData(*serialization, column.column, ostr, format_settings, 0, 0);
writeData(*serialization, column.column, ostr, format_settings, 0, 0, client_revision);
if (index)
{