This commit is contained in:
Nikita Taranov 2024-08-18 16:53:00 +01:00
parent e7fc89ba26
commit 628a4300ba
6 changed files with 43 additions and 34 deletions

View File

@ -1330,7 +1330,7 @@ ParallelReadRequest Connection::receiveParallelReadRequest() const
InitialAllRangesAnnouncement Connection::receiveInitialParallelReadAnnouncement() const
{
return InitialAllRangesAnnouncement::deserialize(*in);
return InitialAllRangesAnnouncement::deserialize(*in, server_revision);
}

View File

@ -33,8 +33,6 @@ static constexpr auto DBMS_MIN_REVISION_WITH_AGGREGATE_FUNCTIONS_VERSIONING = 54
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = 1;
static constexpr auto DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION = 3;
static constexpr auto DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION = 4;
static constexpr auto DBMS_MIN_REVISION_WITH_PARALLEL_REPLICAS = 54453;
static constexpr auto DBMS_MERGE_TREE_PART_INFO_VERSION = 1;
@ -84,6 +82,8 @@ static constexpr auto DBMS_MIN_REVISION_WITH_SYSTEM_KEYWORDS_TABLE = 54468;
static constexpr auto DBMS_MIN_REVISION_WITH_ROWS_BEFORE_AGGREGATION = 54469;
static constexpr auto DBMS_MIN_REVISION_WITH_ADAPTIVE_MARK_SEGMENT_FOR_PARALLEL_REPLICAS = 54470;
/// Version of ClickHouse TCP protocol.
///
/// Should be incremented manually on protocol changes.

View File

@ -1221,7 +1221,7 @@ void TCPHandler::sendReadTaskRequestAssumeLocked()
void TCPHandler::sendMergeTreeAllRangesAnnouncementAssumeLocked(InitialAllRangesAnnouncement announcement)
{
writeVarUInt(Protocol::Server::MergeTreeAllRangesAnnouncement, *out);
announcement.serialize(*out);
announcement.serialize(*out, client_tcp_protocol_version);
out->next();
}

View File

@ -2,10 +2,10 @@
#include <Storages/MergeTree/RequestResponse.h>
#include <Core/ProtocolDefines.h>
#include <Common/SipHash.h>
#include <IO/ReadHelpers.h>
#include <IO/VarInt.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <Common/SipHash.h>
#include <consistent_hashing.h>
@ -20,20 +20,21 @@ namespace ErrorCodes
namespace
{
CoordinationMode validateAndGet(uint8_t candidate)
{
if (candidate <= static_cast<uint8_t>(CoordinationMode::MAX))
return static_cast<CoordinationMode>(candidate);
constexpr UInt64 DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION = 3;
throw Exception(ErrorCodes::UNKNOWN_ELEMENT_OF_ENUM, "Unknown reading mode: {}", candidate);
}
CoordinationMode validateAndGet(uint8_t candidate)
{
if (candidate <= static_cast<uint8_t>(CoordinationMode::MAX))
return static_cast<CoordinationMode>(candidate);
throw Exception(ErrorCodes::UNKNOWN_ELEMENT_OF_ENUM, "Unknown reading mode: {}", candidate);
}
}
void ParallelReadRequest::serialize(WriteBuffer & out) const
{
UInt64 version = DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION;
/// Must be the first
writeIntBinary(version, out);
writeIntBinary(DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION, out);
writeIntBinary(mode, out);
writeIntBinary(replica_num, out);
@ -55,10 +56,13 @@ ParallelReadRequest ParallelReadRequest::deserialize(ReadBuffer & in)
{
UInt64 version;
readIntBinary(version, in);
if (version != DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION)
throw Exception(ErrorCodes::UNKNOWN_PROTOCOL, "Protocol versions for parallel reading "\
if (version != DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION)
throw Exception(
ErrorCodes::UNKNOWN_PROTOCOL,
"Protocol versions for parallel reading "
"from replicas differ. Got: {}, supported version: {}",
version, DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION);
version,
DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION);
CoordinationMode mode;
size_t replica_num;
@ -90,9 +94,8 @@ void ParallelReadRequest::merge(ParallelReadRequest & other)
void ParallelReadResponse::serialize(WriteBuffer & out) const
{
UInt64 version = DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION;
/// Must be the first
writeIntBinary(version, out);
writeIntBinary(DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION, out);
writeBoolText(finish, out);
description.serialize(out);
@ -107,26 +110,29 @@ void ParallelReadResponse::deserialize(ReadBuffer & in)
{
UInt64 version;
readIntBinary(version, in);
if (version != DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION)
throw Exception(ErrorCodes::UNKNOWN_PROTOCOL, "Protocol versions for parallel reading " \
if (version != DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION)
throw Exception(
ErrorCodes::UNKNOWN_PROTOCOL,
"Protocol versions for parallel reading "
"from replicas differ. Got: {}, supported version: {}",
version, DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION);
version,
DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION);
readBoolText(finish, in);
description.deserialize(in);
}
void InitialAllRangesAnnouncement::serialize(WriteBuffer & out) const
void InitialAllRangesAnnouncement::serialize(WriteBuffer & out, UInt64 client_protocol_revision) const
{
UInt64 version = DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION;
/// Must be the first
writeIntBinary(version, out);
writeIntBinary(DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION, out);
writeIntBinary(mode, out);
description.serialize(out);
writeIntBinary(replica_num, out);
writeIntBinary(mark_segment_size, out);
if (client_protocol_revision >= DBMS_MIN_REVISION_WITH_ADAPTIVE_MARK_SEGMENT_FOR_PARALLEL_REPLICAS)
writeIntBinary(mark_segment_size, out);
}
@ -138,14 +144,17 @@ String InitialAllRangesAnnouncement::describe()
return result;
}
InitialAllRangesAnnouncement InitialAllRangesAnnouncement::deserialize(ReadBuffer & in)
InitialAllRangesAnnouncement InitialAllRangesAnnouncement::deserialize(ReadBuffer & in, UInt64 client_protocol_revision)
{
UInt64 version;
readIntBinary(version, in);
if (version != DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION)
throw Exception(ErrorCodes::UNKNOWN_PROTOCOL, "Protocol versions for parallel reading " \
if (version != DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION)
throw Exception(
ErrorCodes::UNKNOWN_PROTOCOL,
"Protocol versions for parallel reading "
"from replicas differ. Got: {}, supported version: {}",
version, DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION);
version,
DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION);
CoordinationMode mode;
RangesInDataPartsDescription description;
@ -158,7 +167,7 @@ InitialAllRangesAnnouncement InitialAllRangesAnnouncement::deserialize(ReadBuffe
readIntBinary(replica_num, in);
size_t mark_segment_size = 128;
if (version >= 4)
if (client_protocol_revision >= DBMS_MIN_REVISION_WITH_ADAPTIVE_MARK_SEGMENT_FOR_PARALLEL_REPLICAS)
readIntBinary(mark_segment_size, in);
return InitialAllRangesAnnouncement{

View File

@ -102,9 +102,9 @@ struct InitialAllRangesAnnouncement
size_t replica_num;
size_t mark_segment_size;
void serialize(WriteBuffer & out) const;
void serialize(WriteBuffer & out, UInt64 client_protocol_revision) const;
String describe();
static InitialAllRangesAnnouncement deserialize(ReadBuffer & in);
static InitialAllRangesAnnouncement deserialize(ReadBuffer & i, UInt64 client_protocol_revisionn);
};

View File

@ -11,7 +11,7 @@ nodes = [
main_configs=["configs/clusters.xml"],
with_zookeeper=False,
image="clickhouse/clickhouse-server",
tag="23.11",
tag="23.11", # earlier versions lead to "Not found column sum(a) in block." exception 🤷
stay_alive=True,
use_old_analyzer=True,
with_installed_binary=True,