Merge pull request #68424 from ClickHouse/adaptive_parallel_replicas

Adaptive mark_segment_size for parallel replicas
This commit is contained in:
Nikita Taranov 2024-09-17 13:52:42 +00:00 committed by GitHub
commit ffaf97a390
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 308 additions and 97 deletions

View File

@ -455,6 +455,9 @@ void Connection::sendAddendum()
writeStringBinary(proto_recv_chunked, *out); writeStringBinary(proto_recv_chunked, *out);
} }
if (server_revision >= DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL)
writeVarUInt(DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION, *out);
out->next(); out->next();
} }
@ -525,6 +528,8 @@ void Connection::receiveHello(const Poco::Timespan & handshake_timeout)
readVarUInt(server_version_major, *in); readVarUInt(server_version_major, *in);
readVarUInt(server_version_minor, *in); readVarUInt(server_version_minor, *in);
readVarUInt(server_revision, *in); readVarUInt(server_revision, *in);
if (server_revision >= DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL)
readVarUInt(server_parallel_replicas_protocol_version, *in);
if (server_revision >= DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE) if (server_revision >= DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE)
readStringBinary(server_timezone, *in); readStringBinary(server_timezone, *in);
if (server_revision >= DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME) if (server_revision >= DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME)
@ -959,7 +964,7 @@ void Connection::sendReadTaskResponse(const String & response)
void Connection::sendMergeTreeReadTaskResponse(const ParallelReadResponse & response) void Connection::sendMergeTreeReadTaskResponse(const ParallelReadResponse & response)
{ {
writeVarUInt(Protocol::Client::MergeTreeReadTaskResponse, *out); writeVarUInt(Protocol::Client::MergeTreeReadTaskResponse, *out);
response.serialize(*out); response.serialize(*out, server_parallel_replicas_protocol_version);
out->finishChunk(); out->finishChunk();
out->next(); out->next();
} }
@ -1413,7 +1418,7 @@ ParallelReadRequest Connection::receiveParallelReadRequest() const
InitialAllRangesAnnouncement Connection::receiveInitialParallelReadAnnouncement() const InitialAllRangesAnnouncement Connection::receiveInitialParallelReadAnnouncement() const
{ {
return InitialAllRangesAnnouncement::deserialize(*in); return InitialAllRangesAnnouncement::deserialize(*in, server_parallel_replicas_protocol_version);
} }

View File

@ -210,6 +210,7 @@ private:
UInt64 server_version_minor = 0; UInt64 server_version_minor = 0;
UInt64 server_version_patch = 0; UInt64 server_version_patch = 0;
UInt64 server_revision = 0; UInt64 server_revision = 0;
UInt64 server_parallel_replicas_protocol_version = 0;
String server_timezone; String server_timezone;
String server_display_name; String server_display_name;

View File

@ -376,6 +376,7 @@ The server successfully detected this situation and will download merged part fr
M(ParallelReplicasReadAssignedMarks, "Sum across all replicas of how many of scheduled marks were assigned by consistent hash") \ M(ParallelReplicasReadAssignedMarks, "Sum across all replicas of how many of scheduled marks were assigned by consistent hash") \
M(ParallelReplicasReadUnassignedMarks, "Sum across all replicas of how many unassigned marks were scheduled") \ M(ParallelReplicasReadUnassignedMarks, "Sum across all replicas of how many unassigned marks were scheduled") \
M(ParallelReplicasReadAssignedForStealingMarks, "Sum across all replicas of how many of scheduled marks were assigned for stealing by consistent hash") \ M(ParallelReplicasReadAssignedForStealingMarks, "Sum across all replicas of how many of scheduled marks were assigned for stealing by consistent hash") \
M(ParallelReplicasReadMarks, "How many marks were read by the given replica") \
\ \
M(ParallelReplicasStealingByHashMicroseconds, "Time spent collecting segments meant for stealing by hash") \ M(ParallelReplicasStealingByHashMicroseconds, "Time spent collecting segments meant for stealing by hash") \
M(ParallelReplicasProcessingPartsMicroseconds, "Time spent processing data parts") \ M(ParallelReplicasProcessingPartsMicroseconds, "Time spent processing data parts") \
@ -529,6 +530,7 @@ The server successfully detected this situation and will download merged part fr
M(CachedReadBufferReadFromCacheMicroseconds, "Time reading from filesystem cache") \ M(CachedReadBufferReadFromCacheMicroseconds, "Time reading from filesystem cache") \
M(CachedReadBufferReadFromSourceBytes, "Bytes read from filesystem cache source (from remote fs, etc)") \ M(CachedReadBufferReadFromSourceBytes, "Bytes read from filesystem cache source (from remote fs, etc)") \
M(CachedReadBufferReadFromCacheBytes, "Bytes read from filesystem cache") \ M(CachedReadBufferReadFromCacheBytes, "Bytes read from filesystem cache") \
M(CachedReadBufferPredownloadedBytes, "Bytes read from filesystem cache source. Cache segments are read from left to right as a whole, it might be that we need to predownload some part of the segment irrelevant for the current task just to get to the needed data") \
M(CachedReadBufferCacheWriteBytes, "Bytes written from source (remote fs, etc) to filesystem cache") \ M(CachedReadBufferCacheWriteBytes, "Bytes written from source (remote fs, etc) to filesystem cache") \
M(CachedReadBufferCacheWriteMicroseconds, "Time spent writing data into filesystem cache") \ M(CachedReadBufferCacheWriteMicroseconds, "Time spent writing data into filesystem cache") \
M(CachedReadBufferCreateBufferMicroseconds, "Prepare buffer time") \ M(CachedReadBufferCreateBufferMicroseconds, "Prepare buffer time") \

View File

@ -33,7 +33,8 @@ static constexpr auto DBMS_MIN_REVISION_WITH_AGGREGATE_FUNCTIONS_VERSIONING = 54
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = 1; static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = 1;
static constexpr auto DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION = 3; static constexpr auto DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION = 3;
static constexpr auto DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION = 4;
static constexpr auto DBMS_MIN_REVISION_WITH_PARALLEL_REPLICAS = 54453; static constexpr auto DBMS_MIN_REVISION_WITH_PARALLEL_REPLICAS = 54453;
static constexpr auto DBMS_MERGE_TREE_PART_INFO_VERSION = 1; static constexpr auto DBMS_MERGE_TREE_PART_INFO_VERSION = 1;
@ -86,6 +87,8 @@ static constexpr auto DBMS_MIN_REVISION_WITH_ROWS_BEFORE_AGGREGATION = 54469;
/// Packets size header /// Packets size header
static constexpr auto DBMS_MIN_PROTOCOL_VERSION_WITH_CHUNKED_PACKETS = 54470; static constexpr auto DBMS_MIN_PROTOCOL_VERSION_WITH_CHUNKED_PACKETS = 54470;
static constexpr auto DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL = 54471;
/// Version of ClickHouse TCP protocol. /// Version of ClickHouse TCP protocol.
/// ///
/// Should be incremented manually on protocol changes. /// Should be incremented manually on protocol changes.
@ -93,6 +96,6 @@ static constexpr auto DBMS_MIN_PROTOCOL_VERSION_WITH_CHUNKED_PACKETS = 54470;
/// NOTE: DBMS_TCP_PROTOCOL_VERSION has nothing common with VERSION_REVISION, /// NOTE: DBMS_TCP_PROTOCOL_VERSION has nothing common with VERSION_REVISION,
/// later is just a number for server version (one number instead of commit SHA) /// later is just a number for server version (one number instead of commit SHA)
/// for simplicity (sometimes it may be more convenient in some use cases). /// for simplicity (sometimes it may be more convenient in some use cases).
static constexpr auto DBMS_TCP_PROTOCOL_VERSION = 54470; static constexpr auto DBMS_TCP_PROTOCOL_VERSION = 54471;
} }

View File

@ -946,7 +946,7 @@ class IColumn;
M(Bool, parallel_replicas_for_non_replicated_merge_tree, false, "If true, ClickHouse will use parallel replicas algorithm also for non-replicated MergeTree tables", 0) \ M(Bool, parallel_replicas_for_non_replicated_merge_tree, false, "If true, ClickHouse will use parallel replicas algorithm also for non-replicated MergeTree tables", 0) \
M(UInt64, parallel_replicas_min_number_of_rows_per_replica, 0, "Limit the number of replicas used in a query to (estimated rows to read / min_number_of_rows_per_replica). The max is still limited by 'max_parallel_replicas'", 0) \ M(UInt64, parallel_replicas_min_number_of_rows_per_replica, 0, "Limit the number of replicas used in a query to (estimated rows to read / min_number_of_rows_per_replica). The max is still limited by 'max_parallel_replicas'", 0) \
M(Bool, parallel_replicas_prefer_local_join, true, "If true, and JOIN can be executed with parallel replicas algorithm, and all storages of right JOIN part are *MergeTree, local JOIN will be used instead of GLOBAL JOIN.", 0) \ M(Bool, parallel_replicas_prefer_local_join, true, "If true, and JOIN can be executed with parallel replicas algorithm, and all storages of right JOIN part are *MergeTree, local JOIN will be used instead of GLOBAL JOIN.", 0) \
M(UInt64, parallel_replicas_mark_segment_size, 128, "Parts virtually divided into segments to be distributed between replicas for parallel reading. This setting controls the size of these segments. Not recommended to change until you're absolutely sure in what you're doing", 0) \ M(UInt64, parallel_replicas_mark_segment_size, 0, "Parts virtually divided into segments to be distributed between replicas for parallel reading. This setting controls the size of these segments. Not recommended to change until you're absolutely sure in what you're doing. Value should be in range [128; 16384]", 0) \
M(Bool, allow_archive_path_syntax, true, "File/S3 engines/table function will parse paths with '::' as '<archive> :: <file>' if archive has correct extension", 0) \ M(Bool, allow_archive_path_syntax, true, "File/S3 engines/table function will parse paths with '::' as '<archive> :: <file>' if archive has correct extension", 0) \
M(Bool, parallel_replicas_local_plan, false, "Build local plan for local replica", 0) \ M(Bool, parallel_replicas_local_plan, false, "Build local plan for local replica", 0) \
\ \

View File

@ -79,6 +79,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"allow_materialized_view_with_bad_select", true, true, "Support (but not enable yet) stricter validation in CREATE MATERIALIZED VIEW"}, {"allow_materialized_view_with_bad_select", true, true, "Support (but not enable yet) stricter validation in CREATE MATERIALIZED VIEW"},
{"output_format_always_quote_identifiers", false, false, "New setting."}, {"output_format_always_quote_identifiers", false, false, "New setting."},
{"output_format_identifier_quoting_style", "Backticks", "Backticks", "New setting."}, {"output_format_identifier_quoting_style", "Backticks", "Backticks", "New setting."},
{"parallel_replicas_mark_segment_size", 128, 0, "Value for this setting now determined automatically"},
{"database_replicated_allow_replicated_engine_arguments", 1, 0, "Don't allow explicit arguments by default"}, {"database_replicated_allow_replicated_engine_arguments", 1, 0, "Don't allow explicit arguments by default"},
{"database_replicated_allow_explicit_uuid", 0, 0, "Added a new setting to disallow explicitly specifying table UUID"}, {"database_replicated_allow_explicit_uuid", 0, 0, "Added a new setting to disallow explicitly specifying table UUID"},
{"parallel_replicas_local_plan", false, false, "Use local plan for local replica in a query with parallel replicas"}, {"parallel_replicas_local_plan", false, false, "Use local plan for local replica in a query with parallel replicas"},

View File

@ -28,6 +28,7 @@ extern const Event CachedReadBufferReadFromCacheMicroseconds;
extern const Event CachedReadBufferCacheWriteMicroseconds; extern const Event CachedReadBufferCacheWriteMicroseconds;
extern const Event CachedReadBufferReadFromSourceBytes; extern const Event CachedReadBufferReadFromSourceBytes;
extern const Event CachedReadBufferReadFromCacheBytes; extern const Event CachedReadBufferReadFromCacheBytes;
extern const Event CachedReadBufferPredownloadedBytes;
extern const Event CachedReadBufferCacheWriteBytes; extern const Event CachedReadBufferCacheWriteBytes;
extern const Event CachedReadBufferCreateBufferMicroseconds; extern const Event CachedReadBufferCreateBufferMicroseconds;
@ -644,6 +645,7 @@ void CachedOnDiskReadBufferFromFile::predownload(FileSegment & file_segment)
size_t current_predownload_size = std::min(current_impl_buffer_size, bytes_to_predownload); size_t current_predownload_size = std::min(current_impl_buffer_size, bytes_to_predownload);
ProfileEvents::increment(ProfileEvents::CachedReadBufferReadFromSourceBytes, current_impl_buffer_size); ProfileEvents::increment(ProfileEvents::CachedReadBufferReadFromSourceBytes, current_impl_buffer_size);
ProfileEvents::increment(ProfileEvents::CachedReadBufferPredownloadedBytes, current_impl_buffer_size);
std::string failure_reason; std::string failure_reason;
bool continue_predownload = file_segment.reserve( bool continue_predownload = file_segment.reserve(

View File

@ -532,7 +532,7 @@ void executeQueryWithParallelReplicas(
max_replicas_to_use = shard.getAllNodeCount(); max_replicas_to_use = shard.getAllNodeCount();
} }
auto coordinator = std::make_shared<ParallelReplicasReadingCoordinator>(max_replicas_to_use, settings.parallel_replicas_mark_segment_size); auto coordinator = std::make_shared<ParallelReplicasReadingCoordinator>(max_replicas_to_use);
auto external_tables = new_context->getExternalTables(); auto external_tables = new_context->getExternalTables();

View File

@ -1,6 +1,8 @@
#include <Processors/QueryPlan/ReadFromMergeTree.h> #include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Core/Settings.h>
#include <IO/Operators.h> #include <IO/Operators.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Interpreters/ExpressionAnalyzer.h> #include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/InterpreterSelectQuery.h> #include <Interpreters/InterpreterSelectQuery.h>
@ -8,6 +10,8 @@
#include <Parsers/ASTFunction.h> #include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h> #include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTSelectQuery.h> #include <Parsers/ASTSelectQuery.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/parseIdentifierOrStringLiteral.h>
#include <Processors/ConcatProcessor.h> #include <Processors/ConcatProcessor.h>
#include <Processors/Merges/AggregatingSortedTransform.h> #include <Processors/Merges/AggregatingSortedTransform.h>
#include <Processors/Merges/CollapsingSortedTransform.h> #include <Processors/Merges/CollapsingSortedTransform.h>
@ -16,6 +20,7 @@
#include <Processors/Merges/ReplacingSortedTransform.h> #include <Processors/Merges/ReplacingSortedTransform.h>
#include <Processors/Merges/SummingSortedTransform.h> #include <Processors/Merges/SummingSortedTransform.h>
#include <Processors/Merges/VersionedCollapsingTransform.h> #include <Processors/Merges/VersionedCollapsingTransform.h>
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <Processors/QueryPlan/PartsSplitter.h> #include <Processors/QueryPlan/PartsSplitter.h>
#include <Processors/Sources/NullSource.h> #include <Processors/Sources/NullSource.h>
#include <Processors/Transforms/ExpressionTransform.h> #include <Processors/Transforms/ExpressionTransform.h>
@ -24,10 +29,11 @@
#include <Processors/Transforms/SelectByIndicesTransform.h> #include <Processors/Transforms/SelectByIndicesTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h> #include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h> #include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <Storages/MergeTree/MergeTreeIndexVectorSimilarity.h>
#include <Storages/MergeTree/MergeTreeIndexLegacyVectorSimilarity.h> #include <Storages/MergeTree/MergeTreeIndexLegacyVectorSimilarity.h>
#include <Storages/MergeTree/MergeTreeReadPool.h> #include <Storages/MergeTree/MergeTreeIndexMinMax.h>
#include <Storages/MergeTree/MergeTreeIndexVectorSimilarity.h>
#include <Storages/MergeTree/MergeTreePrefetchedReadPool.h> #include <Storages/MergeTree/MergeTreePrefetchedReadPool.h>
#include <Storages/MergeTree/MergeTreeReadPool.h>
#include <Storages/MergeTree/MergeTreeReadPoolInOrder.h> #include <Storages/MergeTree/MergeTreeReadPoolInOrder.h>
#include <Storages/MergeTree/MergeTreeReadPoolParallelReplicas.h> #include <Storages/MergeTree/MergeTreeReadPoolParallelReplicas.h>
#include <Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.h> #include <Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.h>
@ -41,11 +47,6 @@
#include <Common/JSONBuilder.h> #include <Common/JSONBuilder.h>
#include <Common/isLocalAddress.h> #include <Common/isLocalAddress.h>
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
#include <Core/Settings.h>
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <Parsers/parseIdentifierOrStringLiteral.h>
#include <Parsers/ExpressionListParsers.h>
#include <Storages/MergeTree/MergeTreeIndexMinMax.h>
#include <algorithm> #include <algorithm>
#include <iterator> #include <iterator>
@ -381,6 +382,7 @@ Pipe ReadFromMergeTree::readFromPoolParallelReplicas(RangesInDataParts parts_wit
.all_callback = all_ranges_callback.value(), .all_callback = all_ranges_callback.value(),
.callback = read_task_callback.value(), .callback = read_task_callback.value(),
.number_of_current_replica = number_of_current_replica.value_or(client_info.number_of_current_replica), .number_of_current_replica = number_of_current_replica.value_or(client_info.number_of_current_replica),
.total_nodes_count = context->getClusterForParallelReplicas()->getShardsInfo().at(0).getAllNodeCount(),
}; };
/// We have a special logic for local replica. It has to read less data, because in some cases it should /// We have a special logic for local replica. It has to read less data, because in some cases it should
@ -563,6 +565,7 @@ Pipe ReadFromMergeTree::readInOrder(
.all_callback = all_ranges_callback.value(), .all_callback = all_ranges_callback.value(),
.callback = read_task_callback.value(), .callback = read_task_callback.value(),
.number_of_current_replica = number_of_current_replica.value_or(client_info.number_of_current_replica), .number_of_current_replica = number_of_current_replica.value_or(client_info.number_of_current_replica),
.total_nodes_count = context->getClusterForParallelReplicas()->getShardsInfo().at(0).getAllNodeCount(),
}; };
auto multiplier = context->getSettingsRef().parallel_replicas_single_task_marks_count_multiplier; auto multiplier = context->getSettingsRef().parallel_replicas_single_task_marks_count_multiplier;

View File

@ -1270,7 +1270,7 @@ void TCPHandler::sendReadTaskRequestAssumeLocked()
void TCPHandler::sendMergeTreeAllRangesAnnouncementAssumeLocked(InitialAllRangesAnnouncement announcement) void TCPHandler::sendMergeTreeAllRangesAnnouncementAssumeLocked(InitialAllRangesAnnouncement announcement)
{ {
writeVarUInt(Protocol::Server::MergeTreeAllRangesAnnouncement, *out); writeVarUInt(Protocol::Server::MergeTreeAllRangesAnnouncement, *out);
announcement.serialize(*out); announcement.serialize(*out, client_parallel_replicas_protocol_version);
out->finishChunk(); out->finishChunk();
out->next(); out->next();
@ -1280,7 +1280,7 @@ void TCPHandler::sendMergeTreeAllRangesAnnouncementAssumeLocked(InitialAllRanges
void TCPHandler::sendMergeTreeReadTaskRequestAssumeLocked(ParallelReadRequest request) void TCPHandler::sendMergeTreeReadTaskRequestAssumeLocked(ParallelReadRequest request)
{ {
writeVarUInt(Protocol::Server::MergeTreeReadTaskRequest, *out); writeVarUInt(Protocol::Server::MergeTreeReadTaskRequest, *out);
request.serialize(*out); request.serialize(*out, client_parallel_replicas_protocol_version);
out->finishChunk(); out->finishChunk();
out->next(); out->next();
@ -1662,6 +1662,9 @@ void TCPHandler::receiveAddendum()
readStringBinary(proto_send_chunked_cl, *in); readStringBinary(proto_send_chunked_cl, *in);
readStringBinary(proto_recv_chunked_cl, *in); readStringBinary(proto_recv_chunked_cl, *in);
} }
if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL)
readVarUInt(client_parallel_replicas_protocol_version, *in);
} }
@ -1689,6 +1692,8 @@ void TCPHandler::sendHello()
writeVarUInt(VERSION_MAJOR, *out); writeVarUInt(VERSION_MAJOR, *out);
writeVarUInt(VERSION_MINOR, *out); writeVarUInt(VERSION_MINOR, *out);
writeVarUInt(DBMS_TCP_PROTOCOL_VERSION, *out); writeVarUInt(DBMS_TCP_PROTOCOL_VERSION, *out);
if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL)
writeVarUInt(DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION, *out);
if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE) if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE)
writeStringBinary(DateLUT::instance().getTimeZone(), *out); writeStringBinary(DateLUT::instance().getTimeZone(), *out);
if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME) if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME)

View File

@ -188,6 +188,7 @@ private:
UInt64 client_version_minor = 0; UInt64 client_version_minor = 0;
UInt64 client_version_patch = 0; UInt64 client_version_patch = 0;
UInt32 client_tcp_protocol_version = 0; UInt32 client_tcp_protocol_version = 0;
UInt32 client_parallel_replicas_protocol_version = 0;
String proto_send_chunked_cl = "notchunked"; String proto_send_chunked_cl = "notchunked";
String proto_recv_chunked_cl = "notchunked"; String proto_recv_chunked_cl = "notchunked";
String quota_key; String quota_key;

View File

@ -1,6 +1,92 @@
#include <iterator>
#include <Storages/MergeTree/MergeTreeReadPoolParallelReplicas.h> #include <Storages/MergeTree/MergeTreeReadPoolParallelReplicas.h>
#include <Core/Settings.h>
#include <Interpreters/Context.h>
#include <algorithm>
#include <iterator>
#include <ranges>
namespace
{
size_t chooseSegmentSize(
LoggerPtr log, size_t mark_segment_size, size_t min_marks_per_task, size_t threads, size_t sum_marks, size_t number_of_replicas)
{
/// Mark segment size determines the granularity of work distribution between replicas.
/// Namely, coordinator will take mark segments of size `mark_segment_size` granules, calculate hash of this segment and assign it to corresponding replica.
/// Small segments are good when we read a small random subset of a table, big - when we do full-scan over a large table.
/// With small segments there is a problem: consider a query like `select max(time) from wikistat`. Average size of `time` per granule is ~5KB. So when we
/// read 128 granules we still read only ~0.5MB of data. With default fs cache segment size of 4MB it means a lot of data will be downloaded and written
/// in cache for no reason. General case will look like this:
///
/// +---------- useful data
/// v
/// +------+--+------+
/// |------|++| |
/// |------|++| |
/// +------+--+------+
/// ^
/// predownloaded data -----------+
///
/// Having large segments solves all the problems in this case. Also bigger segments mean less requests (especially for big tables and full-scans).
/// These three values below chosen mostly intuitively. 128 granules is 1M rows - just a good starting point, 16384 seems to still make sense when reading
/// billions of rows and 1024 - is a reasonable point in between. We limit our choice to only these three options because when we change segment size
/// we essentially change distribution of data between replicas and of course we don't want to use simultaneously tens of different distributions, because
/// it would be a huge waste of cache space.
constexpr std::array<size_t, 3> borders{128, 1024, 16384};
LOG_DEBUG(
log,
"mark_segment_size={}, min_marks_per_task*threads={}, sum_marks/number_of_replicas^2={}",
mark_segment_size,
min_marks_per_task * threads,
sum_marks / number_of_replicas / number_of_replicas);
/// Here we take max of two numbers:
/// * (min_marks_per_task * threads) = the number of marks we request from the coordinator each time - there is no point to have segments smaller than one unit of work for a replica
/// * (sum_marks / number_of_replicas^2) - we use consistent hashing for work distribution (including work stealing). If we have a really slow replica
/// everything except (1/number_of_replicas) portion of its work will be stolen by other replicas. And it owns (1/number_of_replicas) share of total number of marks.
/// Also important to note here that sum_marks is calculated after PK analysis, it means in particular that different segment sizes might be used for the
/// same table for different queries (it is intentional).
///
/// Positive `mark_segment_size` means it is a user provided value, we have to preserve it.
if (mark_segment_size == 0)
mark_segment_size = std::max(min_marks_per_task * threads, sum_marks / number_of_replicas / number_of_replicas);
/// Squeeze the value to the borders.
mark_segment_size = std::clamp(mark_segment_size, borders.front(), borders.back());
/// After we calculated a hopefully good value for segment_size let's just find the maximal border that is not bigger than the chosen value.
for (auto border : borders | std::views::reverse)
{
if (mark_segment_size >= border)
{
LOG_DEBUG(log, "Chosen segment size: {}", border);
return border;
}
}
UNREACHABLE();
}
size_t getMinMarksPerTask(size_t min_marks_per_task, const std::vector<DB::MergeTreeReadTaskInfoPtr> & per_part_infos)
{
for (const auto & info : per_part_infos)
min_marks_per_task = std::max(min_marks_per_task, info->min_marks_per_task);
if (min_marks_per_task == 0)
throw DB::Exception(
DB::ErrorCodes::BAD_ARGUMENTS, "Chosen number of marks to read is zero (likely because of weird interference of settings)");
return min_marks_per_task;
}
}
namespace ProfileEvents
{
extern const Event ParallelReplicasReadMarks;
}
namespace DB namespace DB
{ {
@ -36,17 +122,17 @@ MergeTreeReadPoolParallelReplicas::MergeTreeReadPoolParallelReplicas(
context_) context_)
, extension(std::move(extension_)) , extension(std::move(extension_))
, coordination_mode(CoordinationMode::Default) , coordination_mode(CoordinationMode::Default)
, min_marks_per_task(pool_settings.min_marks_for_concurrent_read) , min_marks_per_task(getMinMarksPerTask(pool_settings.min_marks_for_concurrent_read, per_part_infos))
, mark_segment_size(chooseSegmentSize(
log,
context_->getSettingsRef().parallel_replicas_mark_segment_size,
min_marks_per_task,
pool_settings.threads,
pool_settings.sum_marks,
extension.total_nodes_count))
{ {
for (const auto & info : per_part_infos) extension.all_callback(InitialAllRangesAnnouncement(
min_marks_per_task = std::max(min_marks_per_task, info->min_marks_per_task); coordination_mode, parts_ranges.getDescriptions(), extension.number_of_current_replica, mark_segment_size));
if (min_marks_per_task == 0)
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "Chosen number of marks to read is zero (likely because of weird interference of settings)");
extension.all_callback(
InitialAllRangesAnnouncement(coordination_mode, parts_ranges.getDescriptions(), extension.number_of_current_replica));
} }
MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicas::getTask(size_t /*task_idx*/, MergeTreeReadTask * previous_task) MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicas::getTask(size_t /*task_idx*/, MergeTreeReadTask * previous_task)
@ -111,6 +197,7 @@ MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicas::getTask(size_t /*task_id
if (current_task.ranges.empty()) if (current_task.ranges.empty())
buffered_ranges.pop_front(); buffered_ranges.pop_front();
ProfileEvents::increment(ProfileEvents::ParallelReplicasReadMarks, current_sum_marks);
return createTask(per_part_infos[part_idx], std::move(ranges_to_read), previous_task); return createTask(per_part_infos[part_idx], std::move(ranges_to_read), previous_task);
} }

View File

@ -31,12 +31,13 @@ public:
private: private:
mutable std::mutex mutex; mutable std::mutex mutex;
LoggerPtr log = getLogger("MergeTreeReadPoolParallelReplicas");
const ParallelReadingExtension extension; const ParallelReadingExtension extension;
const CoordinationMode coordination_mode; const CoordinationMode coordination_mode;
size_t min_marks_per_task{0}; size_t min_marks_per_task{0};
size_t mark_segment_size{0};
RangesInDataPartsDescription buffered_ranges; RangesInDataPartsDescription buffered_ranges;
bool no_more_tasks_available{false}; bool no_more_tasks_available{false};
LoggerPtr log = getLogger("MergeTreeReadPoolParallelReplicas");
}; };
} }

View File

@ -1,5 +1,10 @@
#include <Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.h> #include <Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.h>
namespace ProfileEvents
{
extern const Event ParallelReplicasReadMarks;
}
namespace DB namespace DB
{ {
@ -50,11 +55,8 @@ MergeTreeReadPoolParallelReplicasInOrder::MergeTreeReadPoolParallelReplicasInOrd
for (const auto & part : parts_ranges) for (const auto & part : parts_ranges)
buffered_tasks.push_back({part.data_part->info, MarkRanges{}}); buffered_tasks.push_back({part.data_part->info, MarkRanges{}});
extension.all_callback(InitialAllRangesAnnouncement( extension.all_callback(
mode, InitialAllRangesAnnouncement(mode, parts_ranges.getDescriptions(), extension.number_of_current_replica, /*mark_segment_size_=*/0));
parts_ranges.getDescriptions(),
extension.number_of_current_replica
));
} }
MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicasInOrder::getTask(size_t task_idx, MergeTreeReadTask * previous_task) MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicasInOrder::getTask(size_t task_idx, MergeTreeReadTask * previous_task)
@ -75,13 +77,14 @@ MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicasInOrder::getTask(size_t ta
{ {
auto result = std::move(desc.ranges); auto result = std::move(desc.ranges);
desc.ranges = MarkRanges{}; desc.ranges = MarkRanges{};
ProfileEvents::increment(ProfileEvents::ParallelReplicasReadMarks, desc.ranges.getNumberOfMarks());
return result; return result;
} }
} }
return std::nullopt; return std::nullopt;
}; };
if (auto result = get_from_buffer(); result) if (auto result = get_from_buffer())
return createTask(per_part_infos[task_idx], std::move(*result), previous_task); return createTask(per_part_infos[task_idx], std::move(*result), previous_task);
if (no_more_tasks) if (no_more_tasks)
@ -104,7 +107,7 @@ MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicasInOrder::getTask(size_t ta
std::move(new_ranges.begin(), new_ranges.end(), std::back_inserter(old_ranges)); std::move(new_ranges.begin(), new_ranges.end(), std::back_inserter(old_ranges));
} }
if (auto result = get_from_buffer(); result) if (auto result = get_from_buffer())
return createTask(per_part_infos[task_idx], std::move(*result), previous_task); return createTask(per_part_infos[task_idx], std::move(*result), previous_task);
return nullptr; return nullptr;

View File

@ -27,6 +27,7 @@ struct ParallelReadingExtension
MergeTreeAllRangesCallback all_callback; MergeTreeAllRangesCallback all_callback;
MergeTreeReadTaskCallback callback; MergeTreeReadTaskCallback callback;
size_t number_of_current_replica{0}; size_t number_of_current_replica{0};
size_t total_nodes_count{0};
}; };
/// Base class for MergeTreeThreadSelectAlgorithm and MergeTreeSelectAlgorithm /// Base class for MergeTreeThreadSelectAlgorithm and MergeTreeSelectAlgorithm

View File

@ -212,14 +212,11 @@ using PartRefs = std::deque<Parts::iterator>;
class DefaultCoordinator : public ParallelReplicasReadingCoordinator::ImplInterface class DefaultCoordinator : public ParallelReplicasReadingCoordinator::ImplInterface
{ {
public: public:
explicit DefaultCoordinator(size_t replicas_count_, size_t mark_segment_size_) explicit DefaultCoordinator(size_t replicas_count_)
: ParallelReplicasReadingCoordinator::ImplInterface(replicas_count_) : ParallelReplicasReadingCoordinator::ImplInterface(replicas_count_)
, mark_segment_size(mark_segment_size_)
, replica_status(replicas_count_) , replica_status(replicas_count_)
, distribution_by_hash_queue(replicas_count_) , distribution_by_hash_queue(replicas_count_)
{ {
if (mark_segment_size == 0)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Zero value provided for `mark_segment_size`");
} }
~DefaultCoordinator() override; ~DefaultCoordinator() override;
@ -232,7 +229,7 @@ public:
private: private:
/// This many granules will represent a single segment of marks that will be assigned to a replica /// This many granules will represent a single segment of marks that will be assigned to a replica
const size_t mark_segment_size{0}; size_t mark_segment_size{0};
bool state_initialized{false}; bool state_initialized{false};
size_t finished_replicas{0}; size_t finished_replicas{0};
@ -394,7 +391,11 @@ void DefaultCoordinator::initializeReadingState(InitialAllRangesAnnouncement ann
state_initialized = true; state_initialized = true;
source_replica_for_parts_snapshot = announcement.replica_num; source_replica_for_parts_snapshot = announcement.replica_num;
LOG_DEBUG(log, "Reading state is fully initialized: {}", fmt::join(all_parts_to_read, "; ")); mark_segment_size = announcement.mark_segment_size;
if (mark_segment_size == 0)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Zero value provided for `mark_segment_size`");
LOG_DEBUG(log, "Reading state is fully initialized: {}, mark_segment_size: {}", fmt::join(all_parts_to_read, "; "), mark_segment_size);
} }
void DefaultCoordinator::markReplicaAsUnavailable(size_t replica_number) void DefaultCoordinator::markReplicaAsUnavailable(size_t replica_number)
@ -1062,7 +1063,7 @@ void ParallelReplicasReadingCoordinator::initialize(CoordinationMode mode)
switch (mode) switch (mode)
{ {
case CoordinationMode::Default: case CoordinationMode::Default:
pimpl = std::make_unique<DefaultCoordinator>(replicas_count, mark_segment_size); pimpl = std::make_unique<DefaultCoordinator>(replicas_count);
break; break;
case CoordinationMode::WithOrder: case CoordinationMode::WithOrder:
pimpl = std::make_unique<InOrderCoordinator<CoordinationMode::WithOrder>>(replicas_count); pimpl = std::make_unique<InOrderCoordinator<CoordinationMode::WithOrder>>(replicas_count);
@ -1080,8 +1081,7 @@ void ParallelReplicasReadingCoordinator::initialize(CoordinationMode mode)
pimpl->markReplicaAsUnavailable(replica); pimpl->markReplicaAsUnavailable(replica);
} }
ParallelReplicasReadingCoordinator::ParallelReplicasReadingCoordinator(size_t replicas_count_, size_t mark_segment_size_) ParallelReplicasReadingCoordinator::ParallelReplicasReadingCoordinator(size_t replicas_count_) : replicas_count(replicas_count_)
: replicas_count(replicas_count_), mark_segment_size(mark_segment_size_)
{ {
} }

View File

@ -15,7 +15,7 @@ class ParallelReplicasReadingCoordinator
public: public:
class ImplInterface; class ImplInterface;
explicit ParallelReplicasReadingCoordinator(size_t replicas_count_, size_t mark_segment_size_ = 0); explicit ParallelReplicasReadingCoordinator(size_t replicas_count_);
~ParallelReplicasReadingCoordinator(); ~ParallelReplicasReadingCoordinator();
void handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement); void handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement);
@ -35,7 +35,6 @@ private:
std::mutex mutex; std::mutex mutex;
const size_t replicas_count{0}; const size_t replicas_count{0};
size_t mark_segment_size{0};
std::unique_ptr<ImplInterface> pimpl; std::unique_ptr<ImplInterface> pimpl;
ProgressCallback progress_callback; // store the callback only to bypass it to coordinator implementation ProgressCallback progress_callback; // store the callback only to bypass it to coordinator implementation
std::set<size_t> replicas_used; std::set<size_t> replicas_used;

View File

@ -2,10 +2,10 @@
#include <Storages/MergeTree/RequestResponse.h> #include <Storages/MergeTree/RequestResponse.h>
#include <Core/ProtocolDefines.h> #include <Core/ProtocolDefines.h>
#include <Common/SipHash.h> #include <IO/ReadHelpers.h>
#include <IO/VarInt.h> #include <IO/VarInt.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h> #include <Common/SipHash.h>
#include <consistent_hashing.h> #include <consistent_hashing.h>
@ -14,25 +14,29 @@ namespace DB
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int UNKNOWN_PROTOCOL; extern const int UNKNOWN_PROTOCOL;
extern const int UNKNOWN_ELEMENT_OF_ENUM; extern const int UNKNOWN_ELEMENT_OF_ENUM;
} }
namespace namespace
{ {
CoordinationMode validateAndGet(uint8_t candidate) CoordinationMode validateAndGet(uint8_t candidate)
{ {
if (candidate <= static_cast<uint8_t>(CoordinationMode::MAX)) if (candidate <= static_cast<uint8_t>(CoordinationMode::MAX))
return static_cast<CoordinationMode>(candidate); return static_cast<CoordinationMode>(candidate);
throw Exception(ErrorCodes::UNKNOWN_ELEMENT_OF_ENUM, "Unknown reading mode: {}", candidate); throw Exception(ErrorCodes::UNKNOWN_ELEMENT_OF_ENUM, "Unknown reading mode: {}", candidate);
} }
} }
void ParallelReadRequest::serialize(WriteBuffer & out) const void ParallelReadRequest::serialize(WriteBuffer & out, UInt64 initiator_protocol_version) const
{ {
UInt64 version = DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION; /// Previously we didn't maintain backward compatibility and every change was breaking.
/// Must be the first /// Particularly, we had an equality check for the version. To work around that code
/// in previous server versions we now have to lie to them about the version.
const UInt64 version = initiator_protocol_version >= DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL
? DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION
: DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION;
writeIntBinary(version, out); writeIntBinary(version, out);
writeIntBinary(mode, out); writeIntBinary(mode, out);
@ -53,10 +57,12 @@ ParallelReadRequest ParallelReadRequest::deserialize(ReadBuffer & in)
{ {
UInt64 version; UInt64 version;
readIntBinary(version, in); readIntBinary(version, in);
if (version != DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION) if (version < DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION)
throw Exception(ErrorCodes::UNKNOWN_PROTOCOL, "Protocol versions for parallel reading "\ throw Exception(
"from replicas differ. Got: {}, supported version: {}", ErrorCodes::UNKNOWN_PROTOCOL,
version, DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION); "Parallel replicas protocol version is too old. Got: {}, min supported version: {}",
version,
DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION);
CoordinationMode mode; CoordinationMode mode;
size_t replica_num; size_t replica_num;
@ -70,12 +76,7 @@ ParallelReadRequest ParallelReadRequest::deserialize(ReadBuffer & in)
readIntBinary(min_number_of_marks, in); readIntBinary(min_number_of_marks, in);
description.deserialize(in); description.deserialize(in);
return ParallelReadRequest( return ParallelReadRequest(mode, replica_num, min_number_of_marks, std::move(description));
mode,
replica_num,
min_number_of_marks,
std::move(description)
);
} }
void ParallelReadRequest::merge(ParallelReadRequest & other) void ParallelReadRequest::merge(ParallelReadRequest & other)
@ -86,9 +87,14 @@ void ParallelReadRequest::merge(ParallelReadRequest & other)
description.merge(other.description); description.merge(other.description);
} }
void ParallelReadResponse::serialize(WriteBuffer & out) const void ParallelReadResponse::serialize(WriteBuffer & out, UInt64 replica_protocol_version) const
{ {
UInt64 version = DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION; /// Previously we didn't maintain backward compatibility and every change was breaking.
/// Particularly, we had an equality check for the version. To work around that code
/// in previous server versions we now have to lie to them about the version.
UInt64 version = replica_protocol_version >= DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL
? DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION
: DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION;
/// Must be the first /// Must be the first
writeIntBinary(version, out); writeIntBinary(version, out);
@ -105,25 +111,33 @@ void ParallelReadResponse::deserialize(ReadBuffer & in)
{ {
UInt64 version; UInt64 version;
readIntBinary(version, in); readIntBinary(version, in);
if (version != DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION) if (version < DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION)
throw Exception(ErrorCodes::UNKNOWN_PROTOCOL, "Protocol versions for parallel reading " \ throw Exception(
"from replicas differ. Got: {}, supported version: {}", ErrorCodes::UNKNOWN_PROTOCOL,
version, DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION); "Parallel replicas protocol version is too old. Got: {}, min supported version: {}",
version,
DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION);
readBoolText(finish, in); readBoolText(finish, in);
description.deserialize(in); description.deserialize(in);
} }
void InitialAllRangesAnnouncement::serialize(WriteBuffer & out) const void InitialAllRangesAnnouncement::serialize(WriteBuffer & out, UInt64 initiator_protocol_version) const
{ {
UInt64 version = DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION; /// Previously we didn't maintain backward compatibility and every change was breaking.
/// Must be the first /// Particularly, we had an equality check for the version. To work around that code
/// in previous server versions we now have to lie to them about the version.
UInt64 version = initiator_protocol_version >= DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL
? DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION
: DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION;
writeIntBinary(version, out); writeIntBinary(version, out);
writeIntBinary(mode, out); writeIntBinary(mode, out);
description.serialize(out); description.serialize(out);
writeIntBinary(replica_num, out); writeIntBinary(replica_num, out);
if (initiator_protocol_version >= DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL)
writeIntBinary(mark_segment_size, out);
} }
@ -132,14 +146,16 @@ String InitialAllRangesAnnouncement::describe()
return fmt::format("replica {}, mode {}, {}", replica_num, mode, description.describe()); return fmt::format("replica {}, mode {}, {}", replica_num, mode, description.describe());
} }
InitialAllRangesAnnouncement InitialAllRangesAnnouncement::deserialize(ReadBuffer & in) InitialAllRangesAnnouncement InitialAllRangesAnnouncement::deserialize(ReadBuffer & in, UInt64 replica_protocol_version)
{ {
UInt64 version; UInt64 version;
readIntBinary(version, in); readIntBinary(version, in);
if (version != DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION) if (version < DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION)
throw Exception(ErrorCodes::UNKNOWN_PROTOCOL, "Protocol versions for parallel reading " \ throw Exception(
"from replicas differ. Got: {}, supported version: {}", ErrorCodes::UNKNOWN_PROTOCOL,
version, DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION); "Parallel replicas protocol version is too old. Got: {}, min supported version: {}",
version,
DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION);
CoordinationMode mode; CoordinationMode mode;
RangesInDataPartsDescription description; RangesInDataPartsDescription description;
@ -151,11 +167,11 @@ InitialAllRangesAnnouncement InitialAllRangesAnnouncement::deserialize(ReadBuffe
description.deserialize(in); description.deserialize(in);
readIntBinary(replica_num, in); readIntBinary(replica_num, in);
return InitialAllRangesAnnouncement { size_t mark_segment_size = 128;
mode, if (replica_protocol_version >= DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL)
description, readIntBinary(mark_segment_size, in);
replica_num
}; return InitialAllRangesAnnouncement{mode, description, replica_num, mark_segment_size};
} }
} }

View File

@ -63,7 +63,7 @@ struct ParallelReadRequest
/// Contains only data part names without mark ranges. /// Contains only data part names without mark ranges.
RangesInDataPartsDescription description; RangesInDataPartsDescription description;
void serialize(WriteBuffer & out) const; void serialize(WriteBuffer & out, UInt64 initiator_protocol_version) const;
String describe() const; String describe() const;
static ParallelReadRequest deserialize(ReadBuffer & in); static ParallelReadRequest deserialize(ReadBuffer & in);
void merge(ParallelReadRequest & other); void merge(ParallelReadRequest & other);
@ -78,7 +78,7 @@ struct ParallelReadResponse
bool finish{false}; bool finish{false};
RangesInDataPartsDescription description; RangesInDataPartsDescription description;
void serialize(WriteBuffer & out) const; void serialize(WriteBuffer & out, UInt64 replica_protocol_version) const;
String describe() const; String describe() const;
void deserialize(ReadBuffer & in); void deserialize(ReadBuffer & in);
}; };
@ -93,21 +93,18 @@ struct InitialAllRangesAnnouncement
/// No default constructor, you must initialize all fields at once. /// No default constructor, you must initialize all fields at once.
InitialAllRangesAnnouncement( InitialAllRangesAnnouncement(
CoordinationMode mode_, CoordinationMode mode_, RangesInDataPartsDescription description_, size_t replica_num_, size_t mark_segment_size_)
RangesInDataPartsDescription description_, : mode(mode_), description(std::move(description_)), replica_num(replica_num_), mark_segment_size(mark_segment_size_)
size_t replica_num_)
: mode(mode_)
, description(description_)
, replica_num(replica_num_)
{} {}
CoordinationMode mode; CoordinationMode mode;
RangesInDataPartsDescription description; RangesInDataPartsDescription description;
size_t replica_num; size_t replica_num;
size_t mark_segment_size;
void serialize(WriteBuffer & out) const; void serialize(WriteBuffer & out, UInt64 initiator_protocol_version) const;
String describe(); String describe();
static InitialAllRangesAnnouncement deserialize(ReadBuffer & in); static InitialAllRangesAnnouncement deserialize(ReadBuffer & i, UInt64 replica_protocol_version);
}; };

View File

@ -0,0 +1,20 @@
<clickhouse>
<remote_servers>
<parallel_replicas>
<shard>
<replica>
<host>node0</host>
<port>9000</port>
</replica>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</parallel_replicas>
</remote_servers>
</clickhouse>

View File

@ -0,0 +1,64 @@
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
cluster_name = "parallel_replicas"
nodes = [
cluster.add_instance(
f"node{num}",
main_configs=["configs/clusters.xml"],
with_zookeeper=False,
image="clickhouse/clickhouse-server",
tag="23.11", # earlier versions lead to "Not found column sum(a) in block." exception 🤷
stay_alive=True,
use_old_analyzer=True,
with_installed_binary=True,
)
for num in range(2)
] + [
cluster.add_instance(
"node2",
main_configs=["configs/clusters.xml"],
with_zookeeper=False,
use_old_analyzer=True,
)
]
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_backward_compatability(start_cluster):
for node in nodes:
node.query("create table t (a UInt64) engine = MergeTree order by tuple()")
node.query("insert into t select number % 100000 from numbers_mt(1000000)")
# all we want is the query to run without errors
for node in nodes:
assert (
node.query(
"""
select sum(a)
from t
""",
settings={
"cluster_for_parallel_replicas": "parallel_replicas",
"max_parallel_replicas": 3,
"allow_experimental_parallel_reading_from_replicas": 1,
"parallel_replicas_for_non_replicated_merge_tree": 1,
},
)
== "49999500000\n"
)
for node in nodes:
node.query("drop table t")