From 628a4300ba5d492cf1ae0b4b077459cd2a37f1be Mon Sep 17 00:00:00 2001 From: Nikita Taranov Date: Sun, 18 Aug 2024 16:53:00 +0100 Subject: [PATCH] fix --- src/Client/Connection.cpp | 2 +- src/Core/ProtocolDefines.h | 4 +- src/Server/TCPHandler.cpp | 2 +- src/Storages/MergeTree/RequestResponse.cpp | 63 +++++++++++-------- src/Storages/MergeTree/RequestResponse.h | 4 +- .../test_parallel_replicas_protocol.py | 2 +- 6 files changed, 43 insertions(+), 34 deletions(-) diff --git a/src/Client/Connection.cpp b/src/Client/Connection.cpp index 07f4bf19f05..4f19fece5ef 100644 --- a/src/Client/Connection.cpp +++ b/src/Client/Connection.cpp @@ -1330,7 +1330,7 @@ ParallelReadRequest Connection::receiveParallelReadRequest() const InitialAllRangesAnnouncement Connection::receiveInitialParallelReadAnnouncement() const { - return InitialAllRangesAnnouncement::deserialize(*in); + return InitialAllRangesAnnouncement::deserialize(*in, server_revision); } diff --git a/src/Core/ProtocolDefines.h b/src/Core/ProtocolDefines.h index dd37daadaff..5765704e8ac 100644 --- a/src/Core/ProtocolDefines.h +++ b/src/Core/ProtocolDefines.h @@ -33,8 +33,6 @@ static constexpr auto DBMS_MIN_REVISION_WITH_AGGREGATE_FUNCTIONS_VERSIONING = 54 static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = 1; -static constexpr auto DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION = 3; -static constexpr auto DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION = 4; static constexpr auto DBMS_MIN_REVISION_WITH_PARALLEL_REPLICAS = 54453; static constexpr auto DBMS_MERGE_TREE_PART_INFO_VERSION = 1; @@ -84,6 +82,8 @@ static constexpr auto DBMS_MIN_REVISION_WITH_SYSTEM_KEYWORDS_TABLE = 54468; static constexpr auto DBMS_MIN_REVISION_WITH_ROWS_BEFORE_AGGREGATION = 54469; +static constexpr auto DBMS_MIN_REVISION_WITH_ADAPTIVE_MARK_SEGMENT_FOR_PARALLEL_REPLICAS = 54470; + /// Version of ClickHouse TCP protocol. /// /// Should be incremented manually on protocol changes. diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index 679f72b85ff..4a017b2ae40 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -1221,7 +1221,7 @@ void TCPHandler::sendReadTaskRequestAssumeLocked() void TCPHandler::sendMergeTreeAllRangesAnnouncementAssumeLocked(InitialAllRangesAnnouncement announcement) { writeVarUInt(Protocol::Server::MergeTreeAllRangesAnnouncement, *out); - announcement.serialize(*out); + announcement.serialize(*out, client_tcp_protocol_version); out->next(); } diff --git a/src/Storages/MergeTree/RequestResponse.cpp b/src/Storages/MergeTree/RequestResponse.cpp index 1b0ad3cca48..5e7e392384f 100644 --- a/src/Storages/MergeTree/RequestResponse.cpp +++ b/src/Storages/MergeTree/RequestResponse.cpp @@ -2,10 +2,10 @@ #include #include -#include +#include #include #include -#include +#include #include @@ -20,20 +20,21 @@ namespace ErrorCodes namespace { - CoordinationMode validateAndGet(uint8_t candidate) - { - if (candidate <= static_cast(CoordinationMode::MAX)) - return static_cast(candidate); +constexpr UInt64 DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION = 3; - throw Exception(ErrorCodes::UNKNOWN_ELEMENT_OF_ENUM, "Unknown reading mode: {}", candidate); - } +CoordinationMode validateAndGet(uint8_t candidate) +{ + if (candidate <= static_cast(CoordinationMode::MAX)) + return static_cast(candidate); + + throw Exception(ErrorCodes::UNKNOWN_ELEMENT_OF_ENUM, "Unknown reading mode: {}", candidate); +} } void ParallelReadRequest::serialize(WriteBuffer & out) const { - UInt64 version = DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION; /// Must be the first - writeIntBinary(version, out); + writeIntBinary(DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION, out); writeIntBinary(mode, out); writeIntBinary(replica_num, out); @@ -55,10 +56,13 @@ ParallelReadRequest ParallelReadRequest::deserialize(ReadBuffer & in) { UInt64 version; readIntBinary(version, in); - if (version != DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION) - throw Exception(ErrorCodes::UNKNOWN_PROTOCOL, "Protocol versions for parallel reading "\ + if (version != DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION) + throw Exception( + ErrorCodes::UNKNOWN_PROTOCOL, + "Protocol versions for parallel reading " "from replicas differ. Got: {}, supported version: {}", - version, DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION); + version, + DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION); CoordinationMode mode; size_t replica_num; @@ -90,9 +94,8 @@ void ParallelReadRequest::merge(ParallelReadRequest & other) void ParallelReadResponse::serialize(WriteBuffer & out) const { - UInt64 version = DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION; /// Must be the first - writeIntBinary(version, out); + writeIntBinary(DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION, out); writeBoolText(finish, out); description.serialize(out); @@ -107,26 +110,29 @@ void ParallelReadResponse::deserialize(ReadBuffer & in) { UInt64 version; readIntBinary(version, in); - if (version != DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION) - throw Exception(ErrorCodes::UNKNOWN_PROTOCOL, "Protocol versions for parallel reading " \ + if (version != DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION) + throw Exception( + ErrorCodes::UNKNOWN_PROTOCOL, + "Protocol versions for parallel reading " "from replicas differ. Got: {}, supported version: {}", - version, DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION); + version, + DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION); readBoolText(finish, in); description.deserialize(in); } -void InitialAllRangesAnnouncement::serialize(WriteBuffer & out) const +void InitialAllRangesAnnouncement::serialize(WriteBuffer & out, UInt64 client_protocol_revision) const { - UInt64 version = DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION; /// Must be the first - writeIntBinary(version, out); + writeIntBinary(DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION, out); writeIntBinary(mode, out); description.serialize(out); writeIntBinary(replica_num, out); - writeIntBinary(mark_segment_size, out); + if (client_protocol_revision >= DBMS_MIN_REVISION_WITH_ADAPTIVE_MARK_SEGMENT_FOR_PARALLEL_REPLICAS) + writeIntBinary(mark_segment_size, out); } @@ -138,14 +144,17 @@ String InitialAllRangesAnnouncement::describe() return result; } -InitialAllRangesAnnouncement InitialAllRangesAnnouncement::deserialize(ReadBuffer & in) +InitialAllRangesAnnouncement InitialAllRangesAnnouncement::deserialize(ReadBuffer & in, UInt64 client_protocol_revision) { UInt64 version; readIntBinary(version, in); - if (version != DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION) - throw Exception(ErrorCodes::UNKNOWN_PROTOCOL, "Protocol versions for parallel reading " \ + if (version != DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION) + throw Exception( + ErrorCodes::UNKNOWN_PROTOCOL, + "Protocol versions for parallel reading " "from replicas differ. Got: {}, supported version: {}", - version, DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION); + version, + DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION); CoordinationMode mode; RangesInDataPartsDescription description; @@ -158,7 +167,7 @@ InitialAllRangesAnnouncement InitialAllRangesAnnouncement::deserialize(ReadBuffe readIntBinary(replica_num, in); size_t mark_segment_size = 128; - if (version >= 4) + if (client_protocol_revision >= DBMS_MIN_REVISION_WITH_ADAPTIVE_MARK_SEGMENT_FOR_PARALLEL_REPLICAS) readIntBinary(mark_segment_size, in); return InitialAllRangesAnnouncement{ diff --git a/src/Storages/MergeTree/RequestResponse.h b/src/Storages/MergeTree/RequestResponse.h index 17518adf833..fcb6147c087 100644 --- a/src/Storages/MergeTree/RequestResponse.h +++ b/src/Storages/MergeTree/RequestResponse.h @@ -102,9 +102,9 @@ struct InitialAllRangesAnnouncement size_t replica_num; size_t mark_segment_size; - void serialize(WriteBuffer & out) const; + void serialize(WriteBuffer & out, UInt64 client_protocol_revision) const; String describe(); - static InitialAllRangesAnnouncement deserialize(ReadBuffer & in); + static InitialAllRangesAnnouncement deserialize(ReadBuffer & i, UInt64 client_protocol_revisionn); }; diff --git a/tests/integration/test_backward_compatibility/test_parallel_replicas_protocol.py b/tests/integration/test_backward_compatibility/test_parallel_replicas_protocol.py index 6f97df95876..e1b9049ef5d 100644 --- a/tests/integration/test_backward_compatibility/test_parallel_replicas_protocol.py +++ b/tests/integration/test_backward_compatibility/test_parallel_replicas_protocol.py @@ -11,7 +11,7 @@ nodes = [ main_configs=["configs/clusters.xml"], with_zookeeper=False, image="clickhouse/clickhouse-server", - tag="23.11", + tag="23.11", # earlier versions lead to "Not found column sum(a) in block." exception 🤷 stay_alive=True, use_old_analyzer=True, with_installed_binary=True,