revive separate protocol versioning for PRs

This commit is contained in:
Nikita Taranov 2024-09-12 15:40:51 +01:00
parent 1e3bc6d359
commit 16f93ea1b3
7 changed files with 63 additions and 52 deletions

View File

@ -455,6 +455,9 @@ void Connection::sendAddendum()
writeStringBinary(proto_recv_chunked, *out);
}
if (server_revision >= DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL)
writeVarUInt(DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION, *out);
out->next();
}
@ -525,6 +528,8 @@ void Connection::receiveHello(const Poco::Timespan & handshake_timeout)
readVarUInt(server_version_major, *in);
readVarUInt(server_version_minor, *in);
readVarUInt(server_revision, *in);
if (server_revision >= DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL)
readVarUInt(server_parallel_replicas_protocol_version, *in);
if (server_revision >= DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE)
readStringBinary(server_timezone, *in);
if (server_revision >= DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME)
@ -959,7 +964,7 @@ void Connection::sendReadTaskResponse(const String & response)
void Connection::sendMergeTreeReadTaskResponse(const ParallelReadResponse & response)
{
writeVarUInt(Protocol::Client::MergeTreeReadTaskResponse, *out);
response.serialize(*out);
response.serialize(*out, server_parallel_replicas_protocol_version);
out->finishChunk();
out->next();
}
@ -1413,7 +1418,7 @@ ParallelReadRequest Connection::receiveParallelReadRequest() const
InitialAllRangesAnnouncement Connection::receiveInitialParallelReadAnnouncement() const
{
return InitialAllRangesAnnouncement::deserialize(*in, server_revision);
return InitialAllRangesAnnouncement::deserialize(*in, server_parallel_replicas_protocol_version);
}

View File

@ -210,6 +210,7 @@ private:
UInt64 server_version_minor = 0;
UInt64 server_version_patch = 0;
UInt64 server_revision = 0;
UInt64 server_parallel_replicas_protocol_version = 0;
String server_timezone;
String server_display_name;

View File

@ -33,6 +33,8 @@ static constexpr auto DBMS_MIN_REVISION_WITH_AGGREGATE_FUNCTIONS_VERSIONING = 54
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = 1;
static constexpr auto DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION = 3;
static constexpr auto DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION = 4;
static constexpr auto DBMS_MIN_REVISION_WITH_PARALLEL_REPLICAS = 54453;
static constexpr auto DBMS_MERGE_TREE_PART_INFO_VERSION = 1;
@ -85,7 +87,7 @@ static constexpr auto DBMS_MIN_REVISION_WITH_ROWS_BEFORE_AGGREGATION = 54469;
/// Packets size header
static constexpr auto DBMS_MIN_PROTOCOL_VERSION_WITH_CHUNKED_PACKETS = 54470;
static constexpr auto DBMS_MIN_REVISION_WITH_ADAPTIVE_MARK_SEGMENT_FOR_PARALLEL_REPLICAS = 54471;
static constexpr auto DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL = 54471;
/// Version of ClickHouse TCP protocol.
///

View File

@ -1270,7 +1270,7 @@ void TCPHandler::sendReadTaskRequestAssumeLocked()
void TCPHandler::sendMergeTreeAllRangesAnnouncementAssumeLocked(InitialAllRangesAnnouncement announcement)
{
writeVarUInt(Protocol::Server::MergeTreeAllRangesAnnouncement, *out);
announcement.serialize(*out, client_tcp_protocol_version);
announcement.serialize(*out, client_parallel_replicas_protocol_version);
out->finishChunk();
out->next();
@ -1280,7 +1280,7 @@ void TCPHandler::sendMergeTreeAllRangesAnnouncementAssumeLocked(InitialAllRanges
void TCPHandler::sendMergeTreeReadTaskRequestAssumeLocked(ParallelReadRequest request)
{
writeVarUInt(Protocol::Server::MergeTreeReadTaskRequest, *out);
request.serialize(*out);
request.serialize(*out, client_parallel_replicas_protocol_version);
out->finishChunk();
out->next();
@ -1652,6 +1652,9 @@ void TCPHandler::receiveAddendum()
readStringBinary(proto_send_chunked_cl, *in);
readStringBinary(proto_recv_chunked_cl, *in);
}
if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL)
readVarUInt(client_parallel_replicas_protocol_version, *in);
}
@ -1679,6 +1682,8 @@ void TCPHandler::sendHello()
writeVarUInt(VERSION_MAJOR, *out);
writeVarUInt(VERSION_MINOR, *out);
writeVarUInt(DBMS_TCP_PROTOCOL_VERSION, *out);
if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL)
writeVarUInt(DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION, *out);
if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE)
writeStringBinary(DateLUT::instance().getTimeZone(), *out);
if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME)

View File

@ -188,6 +188,7 @@ private:
UInt64 client_version_minor = 0;
UInt64 client_version_patch = 0;
UInt32 client_tcp_protocol_version = 0;
UInt32 client_parallel_replicas_protocol_version = 0;
String proto_send_chunked_cl = "notchunked";
String proto_recv_chunked_cl = "notchunked";
String quota_key;

View File

@ -14,18 +14,12 @@ namespace DB
namespace ErrorCodes
{
extern const int UNKNOWN_PROTOCOL;
extern const int UNKNOWN_ELEMENT_OF_ENUM;
extern const int UNKNOWN_PROTOCOL;
extern const int UNKNOWN_ELEMENT_OF_ENUM;
}
namespace
{
/// Previously we had a separate protocol version number for parallel replicas.
/// But we didn't maintain backward compatibility and every protocol change was breaking.
/// Now we have to support at least minimal tail of the previous versions and the implementation
/// is based on the common tcp protocol version as in all other places.
constexpr UInt64 DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION = 3;
CoordinationMode validateAndGet(uint8_t candidate)
{
if (candidate <= static_cast<uint8_t>(CoordinationMode::MAX))
@ -35,10 +29,15 @@ CoordinationMode validateAndGet(uint8_t candidate)
}
}
void ParallelReadRequest::serialize(WriteBuffer & out) const
void ParallelReadRequest::serialize(WriteBuffer & out, UInt64 initiator_protocol_version) const
{
/// Must be the first
writeIntBinary(DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION, out);
/// Previously we didn't maintain backward compatibility and every change was breaking.
/// Particularly, we had an equality check for the version. To work around that code
/// in previous server versions we now have to lie to them about the version.
const UInt64 version = initiator_protocol_version >= DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL
? DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION
: DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION;
writeIntBinary(version, out);
writeIntBinary(mode, out);
writeIntBinary(replica_num, out);
@ -60,13 +59,12 @@ ParallelReadRequest ParallelReadRequest::deserialize(ReadBuffer & in)
{
UInt64 version;
readIntBinary(version, in);
if (version != DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION)
if (version < DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION)
throw Exception(
ErrorCodes::UNKNOWN_PROTOCOL,
"Protocol versions for parallel reading "
"from replicas differ. Got: {}, supported version: {}",
"Parallel replicas protocol version is too old. Got: {}, min supported version: {}",
version,
DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION);
DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION);
CoordinationMode mode;
size_t replica_num;
@ -80,12 +78,7 @@ ParallelReadRequest ParallelReadRequest::deserialize(ReadBuffer & in)
readIntBinary(min_number_of_marks, in);
description.deserialize(in);
return ParallelReadRequest(
mode,
replica_num,
min_number_of_marks,
std::move(description)
);
return ParallelReadRequest(mode, replica_num, min_number_of_marks, std::move(description));
}
void ParallelReadRequest::merge(ParallelReadRequest & other)
@ -96,10 +89,16 @@ void ParallelReadRequest::merge(ParallelReadRequest & other)
description.merge(other.description);
}
void ParallelReadResponse::serialize(WriteBuffer & out) const
void ParallelReadResponse::serialize(WriteBuffer & out, UInt64 replica_protocol_version) const
{
/// Previously we didn't maintain backward compatibility and every change was breaking.
/// Particularly, we had an equality check for the version. To work around that code
/// in previous server versions we now have to lie to them about the version.
UInt64 version = replica_protocol_version >= DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL
? DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION
: DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION;
/// Must be the first
writeIntBinary(DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION, out);
writeIntBinary(version, out);
writeBoolText(finish, out);
description.serialize(out);
@ -114,28 +113,32 @@ void ParallelReadResponse::deserialize(ReadBuffer & in)
{
UInt64 version;
readIntBinary(version, in);
if (version != DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION)
if (version < DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION)
throw Exception(
ErrorCodes::UNKNOWN_PROTOCOL,
"Protocol versions for parallel reading "
"from replicas differ. Got: {}, supported version: {}",
"Parallel replicas protocol version is too old. Got: {}, min supported version: {}",
version,
DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION);
DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION);
readBoolText(finish, in);
description.deserialize(in);
}
void InitialAllRangesAnnouncement::serialize(WriteBuffer & out, UInt64 client_protocol_revision) const
void InitialAllRangesAnnouncement::serialize(WriteBuffer & out, UInt64 initiator_protocol_version) const
{
/// Must be the first
writeIntBinary(DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION, out);
/// Previously we didn't maintain backward compatibility and every change was breaking.
/// Particularly, we had an equality check for the version. To work around that code
/// in previous server versions we now have to lie to them about the version.
UInt64 version = initiator_protocol_version >= DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL
? DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION
: DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION;
writeIntBinary(version, out);
writeIntBinary(mode, out);
description.serialize(out);
writeIntBinary(replica_num, out);
if (client_protocol_revision >= DBMS_MIN_REVISION_WITH_ADAPTIVE_MARK_SEGMENT_FOR_PARALLEL_REPLICAS)
if (initiator_protocol_version >= DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL)
writeIntBinary(mark_segment_size, out);
}
@ -148,17 +151,16 @@ String InitialAllRangesAnnouncement::describe()
return result;
}
InitialAllRangesAnnouncement InitialAllRangesAnnouncement::deserialize(ReadBuffer & in, UInt64 client_protocol_revision)
InitialAllRangesAnnouncement InitialAllRangesAnnouncement::deserialize(ReadBuffer & in, UInt64 replica_protocol_version)
{
UInt64 version;
readIntBinary(version, in);
if (version != DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION)
if (version < DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION)
throw Exception(
ErrorCodes::UNKNOWN_PROTOCOL,
"Protocol versions for parallel reading "
"from replicas differ. Got: {}, supported version: {}",
"Parallel replicas protocol version is too old. Got: {}, min supported version: {}",
version,
DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION);
DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION);
CoordinationMode mode;
RangesInDataPartsDescription description;
@ -171,15 +173,10 @@ InitialAllRangesAnnouncement InitialAllRangesAnnouncement::deserialize(ReadBuffe
readIntBinary(replica_num, in);
size_t mark_segment_size = 128;
if (client_protocol_revision >= DBMS_MIN_REVISION_WITH_ADAPTIVE_MARK_SEGMENT_FOR_PARALLEL_REPLICAS)
if (replica_protocol_version >= DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL)
readIntBinary(mark_segment_size, in);
return InitialAllRangesAnnouncement{
mode,
description,
replica_num,
mark_segment_size,
};
return InitialAllRangesAnnouncement{mode, description, replica_num, mark_segment_size};
}
}

View File

@ -63,7 +63,7 @@ struct ParallelReadRequest
/// Contains only data part names without mark ranges.
RangesInDataPartsDescription description;
void serialize(WriteBuffer & out) const;
void serialize(WriteBuffer & out, UInt64 initiator_protocol_version) const;
String describe() const;
static ParallelReadRequest deserialize(ReadBuffer & in);
void merge(ParallelReadRequest & other);
@ -78,7 +78,7 @@ struct ParallelReadResponse
bool finish{false};
RangesInDataPartsDescription description;
void serialize(WriteBuffer & out) const;
void serialize(WriteBuffer & out, UInt64 replica_protocol_version) const;
String describe() const;
void deserialize(ReadBuffer & in);
};
@ -102,9 +102,9 @@ struct InitialAllRangesAnnouncement
size_t replica_num;
size_t mark_segment_size;
void serialize(WriteBuffer & out, UInt64 client_protocol_revision) const;
void serialize(WriteBuffer & out, UInt64 initiator_protocol_version) const;
String describe();
static InitialAllRangesAnnouncement deserialize(ReadBuffer & i, UInt64 client_protocol_revisionn);
static InitialAllRangesAnnouncement deserialize(ReadBuffer & i, UInt64 replica_protocol_version);
};