From cb0335446eb6b1ca2e452c9246893fe2053a7e1e Mon Sep 17 00:00:00 2001 From: Nikita Taranov Date: Thu, 15 Aug 2024 16:55:47 +0100 Subject: [PATCH 01/14] impl --- src/Common/ProfileEvents.cpp | 2 + src/Core/ProtocolDefines.h | 3 +- src/Core/Settings.h | 2 +- .../IO/CachedOnDiskReadBufferFromFile.cpp | 2 + .../QueryPlan/ReadFromMergeTree.cpp | 9 +- src/Processors/QueryPlan/ReadFromRemote.cpp | 3 +- .../MergeTreeReadPoolParallelReplicas.cpp | 87 ++++++++++++++++++- .../MergeTreeReadPoolParallelReplicas.h | 3 +- ...rgeTreeReadPoolParallelReplicasInOrder.cpp | 17 ++-- .../MergeTree/MergeTreeSelectProcessor.h | 1 + .../ParallelReplicasReadingCoordinator.cpp | 16 ++-- .../ParallelReplicasReadingCoordinator.h | 3 +- src/Storages/MergeTree/RequestResponse.cpp | 10 ++- src/Storages/MergeTree/RequestResponse.h | 9 +- 14 files changed, 130 insertions(+), 37 deletions(-) diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index ccdce7ff584..e5bad44ae93 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -329,6 +329,7 @@ The server successfully detected this situation and will download merged part fr M(ParallelReplicasReadAssignedMarks, "Sum across all replicas of how many of scheduled marks were assigned by consistent hash") \ M(ParallelReplicasReadUnassignedMarks, "Sum across all replicas of how many unassigned marks were scheduled") \ M(ParallelReplicasReadAssignedForStealingMarks, "Sum across all replicas of how many of scheduled marks were assigned for stealing by consistent hash") \ + M(ParallelReplicasReadMarks, "How many marks were read by the given replica") \ \ M(ParallelReplicasStealingByHashMicroseconds, "Time spent collecting segments meant for stealing by hash") \ M(ParallelReplicasProcessingPartsMicroseconds, "Time spent processing data parts") \ @@ -482,6 +483,7 @@ The server successfully detected this situation and will download merged part fr M(CachedReadBufferReadFromCacheMicroseconds, "Time reading from filesystem cache") \ M(CachedReadBufferReadFromSourceBytes, "Bytes read from filesystem cache source (from remote fs, etc)") \ M(CachedReadBufferReadFromCacheBytes, "Bytes read from filesystem cache") \ + M(CachedReadBufferPredownloadedBytes, "Bytes read from filesystem cache source. Cache segments are read from left to right as a whole, it might be that we need to predownload some part of the segment irrelevant for the current task just to get to the needed data") \ M(CachedReadBufferCacheWriteBytes, "Bytes written from source (remote fs, etc) to filesystem cache") \ M(CachedReadBufferCacheWriteMicroseconds, "Time spent writing data into filesystem cache") \ M(CachedReadBufferCreateBufferMicroseconds, "Prepare buffer time") \ diff --git a/src/Core/ProtocolDefines.h b/src/Core/ProtocolDefines.h index 02d54221ed3..dd37daadaff 100644 --- a/src/Core/ProtocolDefines.h +++ b/src/Core/ProtocolDefines.h @@ -33,7 +33,8 @@ static constexpr auto DBMS_MIN_REVISION_WITH_AGGREGATE_FUNCTIONS_VERSIONING = 54 static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = 1; -static constexpr auto DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION = 3; +static constexpr auto DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION = 3; +static constexpr auto DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION = 4; static constexpr auto DBMS_MIN_REVISION_WITH_PARALLEL_REPLICAS = 54453; static constexpr auto DBMS_MERGE_TREE_PART_INFO_VERSION = 1; diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 0808e8eb49f..6a980a850c8 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -938,7 +938,7 @@ class IColumn; M(Bool, parallel_replicas_for_non_replicated_merge_tree, false, "If true, ClickHouse will use parallel replicas algorithm also for non-replicated MergeTree tables", 0) \ M(UInt64, parallel_replicas_min_number_of_rows_per_replica, 0, "Limit the number of replicas used in a query to (estimated rows to read / min_number_of_rows_per_replica). The max is still limited by 'max_parallel_replicas'", 0) \ M(Bool, parallel_replicas_prefer_local_join, true, "If true, and JOIN can be executed with parallel replicas algorithm, and all storages of right JOIN part are *MergeTree, local JOIN will be used instead of GLOBAL JOIN.", 0) \ - M(UInt64, parallel_replicas_mark_segment_size, 128, "Parts virtually divided into segments to be distributed between replicas for parallel reading. This setting controls the size of these segments. Not recommended to change until you're absolutely sure in what you're doing", 0) \ + M(UInt64, parallel_replicas_mark_segment_size, 0, "Parts virtually divided into segments to be distributed between replicas for parallel reading. This setting controls the size of these segments. Not recommended to change until you're absolutely sure in what you're doing. Value should be in range [128; 16384]", 0) \ M(Bool, allow_archive_path_syntax, true, "File/S3 engines/table function will parse paths with '::' as ' :: ' if archive has correct extension", 0) \ \ M(Bool, allow_experimental_inverted_index, false, "If it is set to true, allow to use experimental inverted index.", 0) \ diff --git a/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp b/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp index b471f3fc58f..6363c40a9ad 100644 --- a/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp +++ b/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp @@ -28,6 +28,7 @@ extern const Event CachedReadBufferReadFromCacheMicroseconds; extern const Event CachedReadBufferCacheWriteMicroseconds; extern const Event CachedReadBufferReadFromSourceBytes; extern const Event CachedReadBufferReadFromCacheBytes; +extern const Event CachedReadBufferPredownloadedBytes; extern const Event CachedReadBufferCacheWriteBytes; extern const Event CachedReadBufferCreateBufferMicroseconds; @@ -644,6 +645,7 @@ void CachedOnDiskReadBufferFromFile::predownload(FileSegment & file_segment) size_t current_predownload_size = std::min(current_impl_buffer_size, bytes_to_predownload); ProfileEvents::increment(ProfileEvents::CachedReadBufferReadFromSourceBytes, current_impl_buffer_size); + ProfileEvents::increment(ProfileEvents::CachedReadBufferPredownloadedBytes, current_impl_buffer_size); bool continue_predownload = file_segment.reserve( current_predownload_size, settings.filesystem_cache_reserve_space_wait_lock_timeout_milliseconds); diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index 348019d7d10..768d1cbc639 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -52,6 +52,7 @@ #include #include +#include "Interpreters/Cluster.h" #include "config.h" using namespace DB; @@ -343,11 +344,11 @@ Pipe ReadFromMergeTree::readFromPoolParallelReplicas( { const auto & client_info = context->getClientInfo(); - auto extension = ParallelReadingExtension - { + auto extension = ParallelReadingExtension{ .all_callback = all_ranges_callback.value(), .callback = read_task_callback.value(), .number_of_current_replica = client_info.number_of_current_replica, + .total_nodes_count = context->getClusterForParallelReplicas()->getShardsInfo().begin()->getAllNodeCount(), }; /// We have a special logic for local replica. It has to read less data, because in some cases it should @@ -514,11 +515,11 @@ Pipe ReadFromMergeTree::readInOrder( if (is_parallel_reading_from_replicas) { const auto & client_info = context->getClientInfo(); - ParallelReadingExtension extension - { + ParallelReadingExtension extension{ .all_callback = all_ranges_callback.value(), .callback = read_task_callback.value(), .number_of_current_replica = client_info.number_of_current_replica, + .total_nodes_count = context->getClusterForParallelReplicas()->getShardsInfo().begin()->getAllNodeCount(), }; const auto multiplier = context->getSettingsRef().parallel_replicas_single_task_marks_count_multiplier; diff --git a/src/Processors/QueryPlan/ReadFromRemote.cpp b/src/Processors/QueryPlan/ReadFromRemote.cpp index cf11052cd59..9dd90d26487 100644 --- a/src/Processors/QueryPlan/ReadFromRemote.cpp +++ b/src/Processors/QueryPlan/ReadFromRemote.cpp @@ -436,8 +436,7 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder shuffled_pool = shard.pool->getShuffledPools(current_settings, priority_func); } - coordinator - = std::make_shared(max_replicas_to_use, current_settings.parallel_replicas_mark_segment_size); + coordinator = std::make_shared(max_replicas_to_use); for (size_t i=0; i < max_replicas_to_use; ++i) { diff --git a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp index 33eaf5a49bd..2736aab2603 100644 --- a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp +++ b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp @@ -1,6 +1,79 @@ -#include #include +#include +#include + +#include +#include +#include + + +namespace +{ + +size_t chooseSegmentSize( + LoggerPtr log, size_t mark_segment_size, size_t min_marks_per_task, size_t threads, size_t sum_marks, size_t number_of_replicas) +{ + /// Mark segment size determines the granularity of work distribution between replicas. + /// Namely, coordinator will take mark segments of size `mark_segment_size` granules, calculate hash of this segment and assign it to corresponding replica. + /// Small segments are good when we read a small random subset of a table, big - when we do full-scan over a large table. + /// With small segments there is a problem: consider a query like `select max(time) from wikistat`. Average size of `time` per granule is ~5KB. So when we + /// read 128 granules we still read only ~0.5MB of data. With default fs cache segment size of 4MB it means a lot of data will be downloaded and written + /// in cache for no reason. General case will look like this: + /// + /// +---------- useful data + /// v + /// +------+--+------+ + /// |------|++| | + /// |------|++| | + /// +------+--+------+ + /// ^ + /// predownloaded data -----------+ + /// + /// Having large segments solves all the problems in this case. Also bigger segments mean less requests (especially for big tables and full-scans). + /// These three values below chosen mostly intuitively. 128 granules is 1M rows - just a good starting point, 16384 seems to still make sense when reading + /// billions of rows and 1024 - is a reasonable point in between. We limit our choice to only these three options because when we change segment size + /// we essentially change distribution of data between replicas and of course we don't want to use simultaneously tens of different distributions, because + /// it would be a huge waste of cache space. + constexpr std::array borders{128, 1024, 16384}; + + LOG_DEBUG( + log, + "mark_segment_size={}, min_marks_per_task*threads={}, sum_marks/number_of_replicas^2={}", + mark_segment_size, + min_marks_per_task * threads, + sum_marks / number_of_replicas / number_of_replicas); + + /// Here we take max of three numbers: + /// * user provided setting (0 by default) + /// * (min_marks_per_task * threads) = the number of marks we request from the coordinator each time - there is no point to have segments smaller than one unit of work for a replica + /// * (sum_marks / number_of_replicas^2) - we use consistent hashing for work distribution (including work stealing). If we have a really slow replica + /// everything up to (1/number_of_replicas) portion of its work will be stolen by other replicas. And it owns (1/number_of_replicas) share of total number of marks. + /// Aslo important to note here that sum_marks is calculated after PK analysis, it means in particular that different segment sizes might be used for the + /// same table for different queries (it is intentional). + mark_segment_size = std::max({mark_segment_size, min_marks_per_task * threads, sum_marks / number_of_replicas / number_of_replicas}); + + /// Squeeze the value to the borders. + mark_segment_size = std::clamp(mark_segment_size, borders.front(), borders.back()); + /// After we calculated a hopefully good value for segment_size let's just find the maximal border that is not bigger than the chosen value. + for (auto border : borders | std::views::reverse) + { + if (mark_segment_size >= border) + { + LOG_DEBUG(log, "Chosen segment size: {}", border); + return border; + } + } + + UNREACHABLE(); +} + +} + +namespace ProfileEvents +{ +extern const Event ParallelReplicasReadMarks; +} namespace DB { @@ -34,12 +107,19 @@ MergeTreeReadPoolParallelReplicas::MergeTreeReadPoolParallelReplicas( , extension(std::move(extension_)) , coordination_mode(CoordinationMode::Default) , min_marks_per_task(pool_settings.min_marks_for_concurrent_read) + , mark_segment_size(chooseSegmentSize( + log, + context_->getSettingsRef().parallel_replicas_mark_segment_size, + min_marks_per_task, + pool_settings.threads, + pool_settings.sum_marks, + extension.total_nodes_count)) { for (const auto & info : per_part_infos) min_marks_per_task = std::max(min_marks_per_task, info->min_marks_per_task); - extension.all_callback( - InitialAllRangesAnnouncement(coordination_mode, parts_ranges.getDescriptions(), extension.number_of_current_replica)); + extension.all_callback(InitialAllRangesAnnouncement( + coordination_mode, parts_ranges.getDescriptions(), extension.number_of_current_replica, mark_segment_size)); } MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicas::getTask(size_t /*task_idx*/, MergeTreeReadTask * previous_task) @@ -104,6 +184,7 @@ MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicas::getTask(size_t /*task_id if (current_task.ranges.empty()) buffered_ranges.pop_front(); + ProfileEvents::increment(ProfileEvents::ParallelReplicasReadMarks, current_sum_marks); return createTask(per_part_infos[part_idx], std::move(ranges_to_read), previous_task); } diff --git a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.h b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.h index 6ba63cc2c9a..1c79bd736bd 100644 --- a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.h +++ b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.h @@ -30,12 +30,13 @@ public: private: mutable std::mutex mutex; + LoggerPtr log = getLogger("MergeTreeReadPoolParallelReplicas"); const ParallelReadingExtension extension; const CoordinationMode coordination_mode; size_t min_marks_per_task{0}; + size_t mark_segment_size{0}; RangesInDataPartsDescription buffered_ranges; bool no_more_tasks_available{false}; - LoggerPtr log = getLogger("MergeTreeReadPoolParallelReplicas"); }; } diff --git a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.cpp b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.cpp index 6b5cf978423..e0cb88c209a 100644 --- a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.cpp +++ b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.cpp @@ -1,5 +1,10 @@ #include +namespace ProfileEvents +{ +extern const Event ParallelReplicasReadMarks; +} + namespace DB { @@ -43,11 +48,8 @@ MergeTreeReadPoolParallelReplicasInOrder::MergeTreeReadPoolParallelReplicasInOrd for (const auto & part : parts_ranges) buffered_tasks.push_back({part.data_part->info, MarkRanges{}}); - extension.all_callback(InitialAllRangesAnnouncement( - mode, - parts_ranges.getDescriptions(), - extension.number_of_current_replica - )); + extension.all_callback( + InitialAllRangesAnnouncement(mode, parts_ranges.getDescriptions(), extension.number_of_current_replica, /*mark_segment_size_=*/0)); } MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicasInOrder::getTask(size_t task_idx, MergeTreeReadTask * previous_task) @@ -68,13 +70,14 @@ MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicasInOrder::getTask(size_t ta { auto result = std::move(desc.ranges); desc.ranges = MarkRanges{}; + ProfileEvents::increment(ProfileEvents::ParallelReplicasReadMarks, desc.ranges.getNumberOfMarks()); return result; } } return std::nullopt; }; - if (auto result = get_from_buffer(); result) + if (auto result = get_from_buffer()) return createTask(per_part_infos[task_idx], std::move(*result), previous_task); if (no_more_tasks) @@ -97,7 +100,7 @@ MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicasInOrder::getTask(size_t ta std::move(new_ranges.begin(), new_ranges.end(), std::back_inserter(old_ranges)); } - if (auto result = get_from_buffer(); result) + if (auto result = get_from_buffer()) return createTask(per_part_infos[task_idx], std::move(*result), previous_task); return nullptr; diff --git a/src/Storages/MergeTree/MergeTreeSelectProcessor.h b/src/Storages/MergeTree/MergeTreeSelectProcessor.h index 7a9cebbcb2e..e20427dbff0 100644 --- a/src/Storages/MergeTree/MergeTreeSelectProcessor.h +++ b/src/Storages/MergeTree/MergeTreeSelectProcessor.h @@ -27,6 +27,7 @@ struct ParallelReadingExtension MergeTreeAllRangesCallback all_callback; MergeTreeReadTaskCallback callback; size_t number_of_current_replica{0}; + size_t total_nodes_count{0}; }; /// Base class for MergeTreeThreadSelectAlgorithm and MergeTreeSelectAlgorithm diff --git a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp index f46b4de10b7..4a1b6d0bada 100644 --- a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp +++ b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp @@ -211,14 +211,11 @@ using PartRefs = std::deque; class DefaultCoordinator : public ParallelReplicasReadingCoordinator::ImplInterface { public: - explicit DefaultCoordinator(size_t replicas_count_, size_t mark_segment_size_) + explicit DefaultCoordinator(size_t replicas_count_) : ParallelReplicasReadingCoordinator::ImplInterface(replicas_count_) - , mark_segment_size(mark_segment_size_) , replica_status(replicas_count_) , distribution_by_hash_queue(replicas_count_) { - if (mark_segment_size == 0) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Zero value provided for `mark_segment_size`"); } ~DefaultCoordinator() override; @@ -231,7 +228,7 @@ public: private: /// This many granules will represent a single segment of marks that will be assigned to a replica - const size_t mark_segment_size{0}; + size_t mark_segment_size{0}; bool state_initialized{false}; size_t finished_replicas{0}; @@ -393,6 +390,10 @@ void DefaultCoordinator::initializeReadingState(InitialAllRangesAnnouncement ann state_initialized = true; source_replica_for_parts_snapshot = announcement.replica_num; + mark_segment_size = announcement.mark_segment_size; + if (mark_segment_size == 0) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Zero value provided for `mark_segment_size`"); + LOG_DEBUG(log, "Reading state is fully initialized: {}", fmt::join(all_parts_to_read, "; ")); } @@ -1043,7 +1044,7 @@ void ParallelReplicasReadingCoordinator::initialize(CoordinationMode mode) switch (mode) { case CoordinationMode::Default: - pimpl = std::make_unique(replicas_count, mark_segment_size); + pimpl = std::make_unique(replicas_count); break; case CoordinationMode::WithOrder: pimpl = std::make_unique>(replicas_count); @@ -1060,8 +1061,7 @@ void ParallelReplicasReadingCoordinator::initialize(CoordinationMode mode) pimpl->markReplicaAsUnavailable(replica); } -ParallelReplicasReadingCoordinator::ParallelReplicasReadingCoordinator(size_t replicas_count_, size_t mark_segment_size_) - : replicas_count(replicas_count_), mark_segment_size(mark_segment_size_) +ParallelReplicasReadingCoordinator::ParallelReplicasReadingCoordinator(size_t replicas_count_) : replicas_count(replicas_count_) { } diff --git a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.h b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.h index 8b463fda395..ad51d20f553 100644 --- a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.h +++ b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.h @@ -15,7 +15,7 @@ class ParallelReplicasReadingCoordinator public: class ImplInterface; - explicit ParallelReplicasReadingCoordinator(size_t replicas_count_, size_t mark_segment_size_ = 0); + explicit ParallelReplicasReadingCoordinator(size_t replicas_count_); ~ParallelReplicasReadingCoordinator(); void handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement); @@ -35,7 +35,6 @@ private: std::mutex mutex; const size_t replicas_count{0}; - size_t mark_segment_size{0}; std::unique_ptr pimpl; ProgressCallback progress_callback; // store the callback only to bypass it to coordinator implementation std::set replicas_used; diff --git a/src/Storages/MergeTree/RequestResponse.cpp b/src/Storages/MergeTree/RequestResponse.cpp index 2ce0e20dcd2..1b0ad3cca48 100644 --- a/src/Storages/MergeTree/RequestResponse.cpp +++ b/src/Storages/MergeTree/RequestResponse.cpp @@ -126,6 +126,7 @@ void InitialAllRangesAnnouncement::serialize(WriteBuffer & out) const writeIntBinary(mode, out); description.serialize(out); writeIntBinary(replica_num, out); + writeIntBinary(mark_segment_size, out); } @@ -156,10 +157,15 @@ InitialAllRangesAnnouncement InitialAllRangesAnnouncement::deserialize(ReadBuffe description.deserialize(in); readIntBinary(replica_num, in); - return InitialAllRangesAnnouncement { + size_t mark_segment_size = 128; + if (version >= 4) + readIntBinary(mark_segment_size, in); + + return InitialAllRangesAnnouncement{ mode, description, - replica_num + replica_num, + mark_segment_size, }; } diff --git a/src/Storages/MergeTree/RequestResponse.h b/src/Storages/MergeTree/RequestResponse.h index 5f5516a6804..17518adf833 100644 --- a/src/Storages/MergeTree/RequestResponse.h +++ b/src/Storages/MergeTree/RequestResponse.h @@ -93,17 +93,14 @@ struct InitialAllRangesAnnouncement /// No default constructor, you must initialize all fields at once. InitialAllRangesAnnouncement( - CoordinationMode mode_, - RangesInDataPartsDescription description_, - size_t replica_num_) - : mode(mode_) - , description(description_) - , replica_num(replica_num_) + CoordinationMode mode_, RangesInDataPartsDescription description_, size_t replica_num_, size_t mark_segment_size_) + : mode(mode_), description(description_), replica_num(replica_num_), mark_segment_size(mark_segment_size_) {} CoordinationMode mode; RangesInDataPartsDescription description; size_t replica_num; + size_t mark_segment_size; void serialize(WriteBuffer & out) const; String describe(); From 891f9c5358dce8c572848fdb24513c0e56629bd8 Mon Sep 17 00:00:00 2001 From: Nikita Taranov Date: Thu, 15 Aug 2024 18:44:31 +0100 Subject: [PATCH 02/14] fix typo --- src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp index 2736aab2603..87fc1c12ddd 100644 --- a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp +++ b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp @@ -49,7 +49,7 @@ size_t chooseSegmentSize( /// * (min_marks_per_task * threads) = the number of marks we request from the coordinator each time - there is no point to have segments smaller than one unit of work for a replica /// * (sum_marks / number_of_replicas^2) - we use consistent hashing for work distribution (including work stealing). If we have a really slow replica /// everything up to (1/number_of_replicas) portion of its work will be stolen by other replicas. And it owns (1/number_of_replicas) share of total number of marks. - /// Aslo important to note here that sum_marks is calculated after PK analysis, it means in particular that different segment sizes might be used for the + /// Also important to note here that sum_marks is calculated after PK analysis, it means in particular that different segment sizes might be used for the /// same table for different queries (it is intentional). mark_segment_size = std::max({mark_segment_size, min_marks_per_task * threads, sum_marks / number_of_replicas / number_of_replicas}); From 80d985a690d66621dd994b9e8066788b16cfe044 Mon Sep 17 00:00:00 2001 From: Nikita Taranov Date: Thu, 15 Aug 2024 19:11:23 +0100 Subject: [PATCH 03/14] add setting change --- src/Core/SettingsChangesHistory.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 20a8721c10e..0c2f9190553 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -88,6 +88,7 @@ static std::initializer_list Date: Thu, 15 Aug 2024 18:14:07 +0100 Subject: [PATCH 04/14] add bw-compatibility test --- .../configs/clusters.xml | 20 ++++++ .../test_parallel_replicas_protocol.py | 64 +++++++++++++++++++ 2 files changed, 84 insertions(+) create mode 100644 tests/integration/test_backward_compatibility/configs/clusters.xml create mode 100644 tests/integration/test_backward_compatibility/test_parallel_replicas_protocol.py diff --git a/tests/integration/test_backward_compatibility/configs/clusters.xml b/tests/integration/test_backward_compatibility/configs/clusters.xml new file mode 100644 index 00000000000..ac773152df9 --- /dev/null +++ b/tests/integration/test_backward_compatibility/configs/clusters.xml @@ -0,0 +1,20 @@ + + + + + + node0 + 9000 + + + node1 + 9000 + + + node2 + 9000 + + + + + diff --git a/tests/integration/test_backward_compatibility/test_parallel_replicas_protocol.py b/tests/integration/test_backward_compatibility/test_parallel_replicas_protocol.py new file mode 100644 index 00000000000..6f97df95876 --- /dev/null +++ b/tests/integration/test_backward_compatibility/test_parallel_replicas_protocol.py @@ -0,0 +1,64 @@ +import pytest + +from helpers.cluster import ClickHouseCluster + + +cluster = ClickHouseCluster(__file__) +cluster_name = "parallel_replicas" +nodes = [ + cluster.add_instance( + f"node{num}", + main_configs=["configs/clusters.xml"], + with_zookeeper=False, + image="clickhouse/clickhouse-server", + tag="23.11", + stay_alive=True, + use_old_analyzer=True, + with_installed_binary=True, + ) + for num in range(2) +] + [ + cluster.add_instance( + "node2", + main_configs=["configs/clusters.xml"], + with_zookeeper=False, + use_old_analyzer=True, + ) +] + + +@pytest.fixture(scope="module") +def start_cluster(): + try: + cluster.start() + yield cluster + + finally: + cluster.shutdown() + + +def test_backward_compatability(start_cluster): + for node in nodes: + node.query("create table t (a UInt64) engine = MergeTree order by tuple()") + node.query("insert into t select number % 100000 from numbers_mt(1000000)") + + # all we want is the query to run without errors + for node in nodes: + assert ( + node.query( + """ + select sum(a) + from t + """, + settings={ + "cluster_for_parallel_replicas": "parallel_replicas", + "max_parallel_replicas": 3, + "allow_experimental_parallel_reading_from_replicas": 1, + "parallel_replicas_for_non_replicated_merge_tree": 1, + }, + ) + == "49999500000\n" + ) + + for node in nodes: + node.query("drop table t") From 628a4300ba5d492cf1ae0b4b077459cd2a37f1be Mon Sep 17 00:00:00 2001 From: Nikita Taranov Date: Sun, 18 Aug 2024 16:53:00 +0100 Subject: [PATCH 05/14] fix --- src/Client/Connection.cpp | 2 +- src/Core/ProtocolDefines.h | 4 +- src/Server/TCPHandler.cpp | 2 +- src/Storages/MergeTree/RequestResponse.cpp | 63 +++++++++++-------- src/Storages/MergeTree/RequestResponse.h | 4 +- .../test_parallel_replicas_protocol.py | 2 +- 6 files changed, 43 insertions(+), 34 deletions(-) diff --git a/src/Client/Connection.cpp b/src/Client/Connection.cpp index 07f4bf19f05..4f19fece5ef 100644 --- a/src/Client/Connection.cpp +++ b/src/Client/Connection.cpp @@ -1330,7 +1330,7 @@ ParallelReadRequest Connection::receiveParallelReadRequest() const InitialAllRangesAnnouncement Connection::receiveInitialParallelReadAnnouncement() const { - return InitialAllRangesAnnouncement::deserialize(*in); + return InitialAllRangesAnnouncement::deserialize(*in, server_revision); } diff --git a/src/Core/ProtocolDefines.h b/src/Core/ProtocolDefines.h index dd37daadaff..5765704e8ac 100644 --- a/src/Core/ProtocolDefines.h +++ b/src/Core/ProtocolDefines.h @@ -33,8 +33,6 @@ static constexpr auto DBMS_MIN_REVISION_WITH_AGGREGATE_FUNCTIONS_VERSIONING = 54 static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = 1; -static constexpr auto DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION = 3; -static constexpr auto DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION = 4; static constexpr auto DBMS_MIN_REVISION_WITH_PARALLEL_REPLICAS = 54453; static constexpr auto DBMS_MERGE_TREE_PART_INFO_VERSION = 1; @@ -84,6 +82,8 @@ static constexpr auto DBMS_MIN_REVISION_WITH_SYSTEM_KEYWORDS_TABLE = 54468; static constexpr auto DBMS_MIN_REVISION_WITH_ROWS_BEFORE_AGGREGATION = 54469; +static constexpr auto DBMS_MIN_REVISION_WITH_ADAPTIVE_MARK_SEGMENT_FOR_PARALLEL_REPLICAS = 54470; + /// Version of ClickHouse TCP protocol. /// /// Should be incremented manually on protocol changes. diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index 679f72b85ff..4a017b2ae40 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -1221,7 +1221,7 @@ void TCPHandler::sendReadTaskRequestAssumeLocked() void TCPHandler::sendMergeTreeAllRangesAnnouncementAssumeLocked(InitialAllRangesAnnouncement announcement) { writeVarUInt(Protocol::Server::MergeTreeAllRangesAnnouncement, *out); - announcement.serialize(*out); + announcement.serialize(*out, client_tcp_protocol_version); out->next(); } diff --git a/src/Storages/MergeTree/RequestResponse.cpp b/src/Storages/MergeTree/RequestResponse.cpp index 1b0ad3cca48..5e7e392384f 100644 --- a/src/Storages/MergeTree/RequestResponse.cpp +++ b/src/Storages/MergeTree/RequestResponse.cpp @@ -2,10 +2,10 @@ #include #include -#include +#include #include #include -#include +#include #include @@ -20,20 +20,21 @@ namespace ErrorCodes namespace { - CoordinationMode validateAndGet(uint8_t candidate) - { - if (candidate <= static_cast(CoordinationMode::MAX)) - return static_cast(candidate); +constexpr UInt64 DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION = 3; - throw Exception(ErrorCodes::UNKNOWN_ELEMENT_OF_ENUM, "Unknown reading mode: {}", candidate); - } +CoordinationMode validateAndGet(uint8_t candidate) +{ + if (candidate <= static_cast(CoordinationMode::MAX)) + return static_cast(candidate); + + throw Exception(ErrorCodes::UNKNOWN_ELEMENT_OF_ENUM, "Unknown reading mode: {}", candidate); +} } void ParallelReadRequest::serialize(WriteBuffer & out) const { - UInt64 version = DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION; /// Must be the first - writeIntBinary(version, out); + writeIntBinary(DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION, out); writeIntBinary(mode, out); writeIntBinary(replica_num, out); @@ -55,10 +56,13 @@ ParallelReadRequest ParallelReadRequest::deserialize(ReadBuffer & in) { UInt64 version; readIntBinary(version, in); - if (version != DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION) - throw Exception(ErrorCodes::UNKNOWN_PROTOCOL, "Protocol versions for parallel reading "\ + if (version != DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION) + throw Exception( + ErrorCodes::UNKNOWN_PROTOCOL, + "Protocol versions for parallel reading " "from replicas differ. Got: {}, supported version: {}", - version, DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION); + version, + DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION); CoordinationMode mode; size_t replica_num; @@ -90,9 +94,8 @@ void ParallelReadRequest::merge(ParallelReadRequest & other) void ParallelReadResponse::serialize(WriteBuffer & out) const { - UInt64 version = DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION; /// Must be the first - writeIntBinary(version, out); + writeIntBinary(DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION, out); writeBoolText(finish, out); description.serialize(out); @@ -107,26 +110,29 @@ void ParallelReadResponse::deserialize(ReadBuffer & in) { UInt64 version; readIntBinary(version, in); - if (version != DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION) - throw Exception(ErrorCodes::UNKNOWN_PROTOCOL, "Protocol versions for parallel reading " \ + if (version != DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION) + throw Exception( + ErrorCodes::UNKNOWN_PROTOCOL, + "Protocol versions for parallel reading " "from replicas differ. Got: {}, supported version: {}", - version, DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION); + version, + DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION); readBoolText(finish, in); description.deserialize(in); } -void InitialAllRangesAnnouncement::serialize(WriteBuffer & out) const +void InitialAllRangesAnnouncement::serialize(WriteBuffer & out, UInt64 client_protocol_revision) const { - UInt64 version = DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION; /// Must be the first - writeIntBinary(version, out); + writeIntBinary(DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION, out); writeIntBinary(mode, out); description.serialize(out); writeIntBinary(replica_num, out); - writeIntBinary(mark_segment_size, out); + if (client_protocol_revision >= DBMS_MIN_REVISION_WITH_ADAPTIVE_MARK_SEGMENT_FOR_PARALLEL_REPLICAS) + writeIntBinary(mark_segment_size, out); } @@ -138,14 +144,17 @@ String InitialAllRangesAnnouncement::describe() return result; } -InitialAllRangesAnnouncement InitialAllRangesAnnouncement::deserialize(ReadBuffer & in) +InitialAllRangesAnnouncement InitialAllRangesAnnouncement::deserialize(ReadBuffer & in, UInt64 client_protocol_revision) { UInt64 version; readIntBinary(version, in); - if (version != DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION) - throw Exception(ErrorCodes::UNKNOWN_PROTOCOL, "Protocol versions for parallel reading " \ + if (version != DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION) + throw Exception( + ErrorCodes::UNKNOWN_PROTOCOL, + "Protocol versions for parallel reading " "from replicas differ. Got: {}, supported version: {}", - version, DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION); + version, + DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION); CoordinationMode mode; RangesInDataPartsDescription description; @@ -158,7 +167,7 @@ InitialAllRangesAnnouncement InitialAllRangesAnnouncement::deserialize(ReadBuffe readIntBinary(replica_num, in); size_t mark_segment_size = 128; - if (version >= 4) + if (client_protocol_revision >= DBMS_MIN_REVISION_WITH_ADAPTIVE_MARK_SEGMENT_FOR_PARALLEL_REPLICAS) readIntBinary(mark_segment_size, in); return InitialAllRangesAnnouncement{ diff --git a/src/Storages/MergeTree/RequestResponse.h b/src/Storages/MergeTree/RequestResponse.h index 17518adf833..fcb6147c087 100644 --- a/src/Storages/MergeTree/RequestResponse.h +++ b/src/Storages/MergeTree/RequestResponse.h @@ -102,9 +102,9 @@ struct InitialAllRangesAnnouncement size_t replica_num; size_t mark_segment_size; - void serialize(WriteBuffer & out) const; + void serialize(WriteBuffer & out, UInt64 client_protocol_revision) const; String describe(); - static InitialAllRangesAnnouncement deserialize(ReadBuffer & in); + static InitialAllRangesAnnouncement deserialize(ReadBuffer & i, UInt64 client_protocol_revisionn); }; diff --git a/tests/integration/test_backward_compatibility/test_parallel_replicas_protocol.py b/tests/integration/test_backward_compatibility/test_parallel_replicas_protocol.py index 6f97df95876..e1b9049ef5d 100644 --- a/tests/integration/test_backward_compatibility/test_parallel_replicas_protocol.py +++ b/tests/integration/test_backward_compatibility/test_parallel_replicas_protocol.py @@ -11,7 +11,7 @@ nodes = [ main_configs=["configs/clusters.xml"], with_zookeeper=False, image="clickhouse/clickhouse-server", - tag="23.11", + tag="23.11", # earlier versions lead to "Not found column sum(a) in block." exception 🤷 stay_alive=True, use_old_analyzer=True, with_installed_binary=True, From 30229a3bfdd9ca0e827de0f741b9ff1d9553203d Mon Sep 17 00:00:00 2001 From: Nikita Taranov Date: Sun, 18 Aug 2024 17:44:16 +0100 Subject: [PATCH 06/14] better --- src/Core/ProtocolDefines.h | 4 +-- .../QueryPlan/ReadFromMergeTree.cpp | 26 ++++++++++--------- .../MergeTreeReadPoolParallelReplicas.cpp | 8 +++--- src/Storages/MergeTree/RequestResponse.cpp | 4 +++ src/Storages/MergeTree/RequestResponse.h | 2 +- 5 files changed, 25 insertions(+), 19 deletions(-) diff --git a/src/Core/ProtocolDefines.h b/src/Core/ProtocolDefines.h index 55c87976355..5acc07e70b7 100644 --- a/src/Core/ProtocolDefines.h +++ b/src/Core/ProtocolDefines.h @@ -94,6 +94,4 @@ static constexpr auto DBMS_MIN_REVISION_WITH_ADAPTIVE_MARK_SEGMENT_FOR_PARALLEL_ /// NOTE: DBMS_TCP_PROTOCOL_VERSION has nothing common with VERSION_REVISION, /// later is just a number for server version (one number instead of commit SHA) /// for simplicity (sometimes it may be more convenient in some use cases). -static constexpr auto DBMS_TCP_PROTOCOL_VERSION = 54470; - -} +static constexpr auto DBMS_TCP_PROTOCOL_VERSION = 54471; diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index 5d3a676c959..a52a36daf7d 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -1,6 +1,8 @@ #include +#include #include +#include #include #include #include @@ -8,6 +10,8 @@ #include #include #include +#include +#include #include #include #include @@ -16,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -24,10 +29,11 @@ #include #include #include -#include #include -#include +#include +#include #include +#include #include #include #include @@ -41,18 +47,12 @@ #include #include #include -#include -#include -#include -#include -#include #include #include #include #include -#include "Interpreters/Cluster.h" #include "config.h" using namespace DB; @@ -344,11 +344,12 @@ Pipe ReadFromMergeTree::readFromPoolParallelReplicas( { const auto & client_info = context->getClientInfo(); - auto extension = ParallelReadingExtension{ + auto extension = ParallelReadingExtension + { .all_callback = all_ranges_callback.value(), .callback = read_task_callback.value(), .number_of_current_replica = client_info.number_of_current_replica, - .total_nodes_count = context->getClusterForParallelReplicas()->getShardsInfo().begin()->getAllNodeCount(), + .total_nodes_count = context->getClusterForParallelReplicas()->getShardsInfo().at(0).getAllNodeCount(), }; /// We have a special logic for local replica. It has to read less data, because in some cases it should @@ -523,11 +524,12 @@ Pipe ReadFromMergeTree::readInOrder( if (is_parallel_reading_from_replicas) { const auto & client_info = context->getClientInfo(); - ParallelReadingExtension extension{ + ParallelReadingExtension extension + { .all_callback = all_ranges_callback.value(), .callback = read_task_callback.value(), .number_of_current_replica = client_info.number_of_current_replica, - .total_nodes_count = context->getClusterForParallelReplicas()->getShardsInfo().begin()->getAllNodeCount(), + .total_nodes_count = context->getClusterForParallelReplicas()->getShardsInfo().at(0).getAllNodeCount(), }; auto multiplier = context->getSettingsRef().parallel_replicas_single_task_marks_count_multiplier; diff --git a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp index 87fc1c12ddd..fc982fc0249 100644 --- a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp +++ b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp @@ -44,14 +44,16 @@ size_t chooseSegmentSize( min_marks_per_task * threads, sum_marks / number_of_replicas / number_of_replicas); - /// Here we take max of three numbers: - /// * user provided setting (0 by default) + /// Here we take max of two numbers: /// * (min_marks_per_task * threads) = the number of marks we request from the coordinator each time - there is no point to have segments smaller than one unit of work for a replica /// * (sum_marks / number_of_replicas^2) - we use consistent hashing for work distribution (including work stealing). If we have a really slow replica /// everything up to (1/number_of_replicas) portion of its work will be stolen by other replicas. And it owns (1/number_of_replicas) share of total number of marks. /// Also important to note here that sum_marks is calculated after PK analysis, it means in particular that different segment sizes might be used for the /// same table for different queries (it is intentional). - mark_segment_size = std::max({mark_segment_size, min_marks_per_task * threads, sum_marks / number_of_replicas / number_of_replicas}); + /// + /// Positive `mark_segment_size` means it is a user provided value, we have to preserve it. + if (mark_segment_size == 0) + mark_segment_size = std::max(min_marks_per_task * threads, sum_marks / number_of_replicas / number_of_replicas); /// Squeeze the value to the borders. mark_segment_size = std::clamp(mark_segment_size, borders.front(), borders.back()); diff --git a/src/Storages/MergeTree/RequestResponse.cpp b/src/Storages/MergeTree/RequestResponse.cpp index 5e7e392384f..48ff6ebccfd 100644 --- a/src/Storages/MergeTree/RequestResponse.cpp +++ b/src/Storages/MergeTree/RequestResponse.cpp @@ -20,6 +20,10 @@ namespace ErrorCodes namespace { +/// Previously we had a separate protocol version number for parallel replicas. +/// But we didn't maintain backward compatibility and every protocol change was breaking. +/// Now we have to support at least minimal tail of the previous versions and the implementation +/// is based on the common tcp protocol version as in all other places. constexpr UInt64 DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION = 3; CoordinationMode validateAndGet(uint8_t candidate) diff --git a/src/Storages/MergeTree/RequestResponse.h b/src/Storages/MergeTree/RequestResponse.h index fcb6147c087..da2fa2795a3 100644 --- a/src/Storages/MergeTree/RequestResponse.h +++ b/src/Storages/MergeTree/RequestResponse.h @@ -94,7 +94,7 @@ struct InitialAllRangesAnnouncement InitialAllRangesAnnouncement( CoordinationMode mode_, RangesInDataPartsDescription description_, size_t replica_num_, size_t mark_segment_size_) - : mode(mode_), description(description_), replica_num(replica_num_), mark_segment_size(mark_segment_size_) + : mode(mode_), description(std::move(description_)), replica_num(replica_num_), mark_segment_size(mark_segment_size_) {} CoordinationMode mode; From c252b3c8b05d85acb355054543d0b7f78d171af0 Mon Sep 17 00:00:00 2001 From: Nikita Taranov Date: Sun, 18 Aug 2024 18:29:48 +0100 Subject: [PATCH 07/14] fix build --- src/Core/ProtocolDefines.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Core/ProtocolDefines.h b/src/Core/ProtocolDefines.h index 5acc07e70b7..f80ddc646bb 100644 --- a/src/Core/ProtocolDefines.h +++ b/src/Core/ProtocolDefines.h @@ -95,3 +95,5 @@ static constexpr auto DBMS_MIN_REVISION_WITH_ADAPTIVE_MARK_SEGMENT_FOR_PARALLEL_ /// later is just a number for server version (one number instead of commit SHA) /// for simplicity (sometimes it may be more convenient in some use cases). static constexpr auto DBMS_TCP_PROTOCOL_VERSION = 54471; + +} From 8cdc10cf656791ab88093cddb0cb07bb4209492a Mon Sep 17 00:00:00 2001 From: Nikita Taranov Date: Mon, 9 Sep 2024 18:11:03 +0100 Subject: [PATCH 08/14] fix settings changes --- src/Core/SettingsChangesHistory.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 0a4f31522ba..bd701af39b0 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -76,7 +76,8 @@ static std::initializer_list Date: Tue, 10 Sep 2024 12:07:44 +0100 Subject: [PATCH 09/14] fix --- src/Core/SettingsChangesHistory.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index a5cbc8f1ec6..d19ca031c25 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -78,7 +78,6 @@ static std::initializer_list Date: Tue, 10 Sep 2024 17:18:27 +0100 Subject: [PATCH 10/14] fix wording --- src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp index bee73a38128..452f11085e3 100644 --- a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp +++ b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp @@ -47,7 +47,7 @@ size_t chooseSegmentSize( /// Here we take max of two numbers: /// * (min_marks_per_task * threads) = the number of marks we request from the coordinator each time - there is no point to have segments smaller than one unit of work for a replica /// * (sum_marks / number_of_replicas^2) - we use consistent hashing for work distribution (including work stealing). If we have a really slow replica - /// everything up to (1/number_of_replicas) portion of its work will be stolen by other replicas. And it owns (1/number_of_replicas) share of total number of marks. + /// everything except (1/number_of_replicas) portion of its work will be stolen by other replicas. And it owns (1/number_of_replicas) share of total number of marks. /// Also important to note here that sum_marks is calculated after PK analysis, it means in particular that different segment sizes might be used for the /// same table for different queries (it is intentional). /// From fc83c1c7a2200ecc5b571d931415a7af16d1865b Mon Sep 17 00:00:00 2001 From: Nikita Taranov Date: Wed, 11 Sep 2024 20:20:18 +0100 Subject: [PATCH 11/14] use final task size in segment size calculation --- .../MergeTreeReadPoolParallelReplicas.cpp | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp index 452f11085e3..71d89f9950a 100644 --- a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp +++ b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp @@ -70,6 +70,17 @@ size_t chooseSegmentSize( UNREACHABLE(); } +size_t getMinMarksPerTask(size_t min_marks_per_task, const std::vector & per_part_infos) +{ + for (const auto & info : per_part_infos) + min_marks_per_task = std::max(min_marks_per_task, info->min_marks_per_task); + + if (min_marks_per_task == 0) + throw DB::Exception( + DB::ErrorCodes::BAD_ARGUMENTS, "Chosen number of marks to read is zero (likely because of weird interference of settings)"); + + return min_marks_per_task; +} } namespace ProfileEvents @@ -111,7 +122,7 @@ MergeTreeReadPoolParallelReplicas::MergeTreeReadPoolParallelReplicas( context_) , extension(std::move(extension_)) , coordination_mode(CoordinationMode::Default) - , min_marks_per_task(pool_settings.min_marks_for_concurrent_read) + , min_marks_per_task(getMinMarksPerTask(pool_settings.min_marks_for_concurrent_read, per_part_infos)) , mark_segment_size(chooseSegmentSize( log, context_->getSettingsRef().parallel_replicas_mark_segment_size, @@ -120,13 +131,6 @@ MergeTreeReadPoolParallelReplicas::MergeTreeReadPoolParallelReplicas( pool_settings.sum_marks, extension.total_nodes_count)) { - for (const auto & info : per_part_infos) - min_marks_per_task = std::max(min_marks_per_task, info->min_marks_per_task); - - if (min_marks_per_task == 0) - throw Exception( - ErrorCodes::BAD_ARGUMENTS, "Chosen number of marks to read is zero (likely because of weird interference of settings)"); - extension.all_callback(InitialAllRangesAnnouncement( coordination_mode, parts_ranges.getDescriptions(), extension.number_of_current_replica, mark_segment_size)); } From 1e3bc6d359453ef356d3d2c19af92414dee68ef9 Mon Sep 17 00:00:00 2001 From: Nikita Taranov Date: Thu, 12 Sep 2024 15:15:57 +0100 Subject: [PATCH 12/14] log mark_segment_size on initiator --- src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp index 6d0ba879881..1955501b4ba 100644 --- a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp +++ b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp @@ -394,7 +394,7 @@ void DefaultCoordinator::initializeReadingState(InitialAllRangesAnnouncement ann if (mark_segment_size == 0) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Zero value provided for `mark_segment_size`"); - LOG_DEBUG(log, "Reading state is fully initialized: {}", fmt::join(all_parts_to_read, "; ")); + LOG_DEBUG(log, "Reading state is fully initialized: {}, mark_segment_size: {}", fmt::join(all_parts_to_read, "; "), mark_segment_size); } void DefaultCoordinator::markReplicaAsUnavailable(size_t replica_number) From 16f93ea1b34b7c4010e88e3a212a98f49e127c8f Mon Sep 17 00:00:00 2001 From: Nikita Taranov Date: Thu, 12 Sep 2024 15:40:51 +0100 Subject: [PATCH 13/14] revive separate protocol versioning for PRs --- src/Client/Connection.cpp | 9 ++- src/Client/Connection.h | 1 + src/Core/ProtocolDefines.h | 4 +- src/Server/TCPHandler.cpp | 9 ++- src/Server/TCPHandler.h | 1 + src/Storages/MergeTree/RequestResponse.cpp | 83 +++++++++++----------- src/Storages/MergeTree/RequestResponse.h | 8 +-- 7 files changed, 63 insertions(+), 52 deletions(-) diff --git a/src/Client/Connection.cpp b/src/Client/Connection.cpp index 61022220b61..8a1c7d3988a 100644 --- a/src/Client/Connection.cpp +++ b/src/Client/Connection.cpp @@ -455,6 +455,9 @@ void Connection::sendAddendum() writeStringBinary(proto_recv_chunked, *out); } + if (server_revision >= DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL) + writeVarUInt(DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION, *out); + out->next(); } @@ -525,6 +528,8 @@ void Connection::receiveHello(const Poco::Timespan & handshake_timeout) readVarUInt(server_version_major, *in); readVarUInt(server_version_minor, *in); readVarUInt(server_revision, *in); + if (server_revision >= DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL) + readVarUInt(server_parallel_replicas_protocol_version, *in); if (server_revision >= DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE) readStringBinary(server_timezone, *in); if (server_revision >= DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME) @@ -959,7 +964,7 @@ void Connection::sendReadTaskResponse(const String & response) void Connection::sendMergeTreeReadTaskResponse(const ParallelReadResponse & response) { writeVarUInt(Protocol::Client::MergeTreeReadTaskResponse, *out); - response.serialize(*out); + response.serialize(*out, server_parallel_replicas_protocol_version); out->finishChunk(); out->next(); } @@ -1413,7 +1418,7 @@ ParallelReadRequest Connection::receiveParallelReadRequest() const InitialAllRangesAnnouncement Connection::receiveInitialParallelReadAnnouncement() const { - return InitialAllRangesAnnouncement::deserialize(*in, server_revision); + return InitialAllRangesAnnouncement::deserialize(*in, server_parallel_replicas_protocol_version); } diff --git a/src/Client/Connection.h b/src/Client/Connection.h index ed84bc51318..e09d913f1ba 100644 --- a/src/Client/Connection.h +++ b/src/Client/Connection.h @@ -210,6 +210,7 @@ private: UInt64 server_version_minor = 0; UInt64 server_version_patch = 0; UInt64 server_revision = 0; + UInt64 server_parallel_replicas_protocol_version = 0; String server_timezone; String server_display_name; diff --git a/src/Core/ProtocolDefines.h b/src/Core/ProtocolDefines.h index f80ddc646bb..49c6fc1dde6 100644 --- a/src/Core/ProtocolDefines.h +++ b/src/Core/ProtocolDefines.h @@ -33,6 +33,8 @@ static constexpr auto DBMS_MIN_REVISION_WITH_AGGREGATE_FUNCTIONS_VERSIONING = 54 static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = 1; +static constexpr auto DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION = 3; +static constexpr auto DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION = 4; static constexpr auto DBMS_MIN_REVISION_WITH_PARALLEL_REPLICAS = 54453; static constexpr auto DBMS_MERGE_TREE_PART_INFO_VERSION = 1; @@ -85,7 +87,7 @@ static constexpr auto DBMS_MIN_REVISION_WITH_ROWS_BEFORE_AGGREGATION = 54469; /// Packets size header static constexpr auto DBMS_MIN_PROTOCOL_VERSION_WITH_CHUNKED_PACKETS = 54470; -static constexpr auto DBMS_MIN_REVISION_WITH_ADAPTIVE_MARK_SEGMENT_FOR_PARALLEL_REPLICAS = 54471; +static constexpr auto DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL = 54471; /// Version of ClickHouse TCP protocol. /// diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index 4df6e7fb7c3..13573ce6db0 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -1270,7 +1270,7 @@ void TCPHandler::sendReadTaskRequestAssumeLocked() void TCPHandler::sendMergeTreeAllRangesAnnouncementAssumeLocked(InitialAllRangesAnnouncement announcement) { writeVarUInt(Protocol::Server::MergeTreeAllRangesAnnouncement, *out); - announcement.serialize(*out, client_tcp_protocol_version); + announcement.serialize(*out, client_parallel_replicas_protocol_version); out->finishChunk(); out->next(); @@ -1280,7 +1280,7 @@ void TCPHandler::sendMergeTreeAllRangesAnnouncementAssumeLocked(InitialAllRanges void TCPHandler::sendMergeTreeReadTaskRequestAssumeLocked(ParallelReadRequest request) { writeVarUInt(Protocol::Server::MergeTreeReadTaskRequest, *out); - request.serialize(*out); + request.serialize(*out, client_parallel_replicas_protocol_version); out->finishChunk(); out->next(); @@ -1652,6 +1652,9 @@ void TCPHandler::receiveAddendum() readStringBinary(proto_send_chunked_cl, *in); readStringBinary(proto_recv_chunked_cl, *in); } + + if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL) + readVarUInt(client_parallel_replicas_protocol_version, *in); } @@ -1679,6 +1682,8 @@ void TCPHandler::sendHello() writeVarUInt(VERSION_MAJOR, *out); writeVarUInt(VERSION_MINOR, *out); writeVarUInt(DBMS_TCP_PROTOCOL_VERSION, *out); + if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL) + writeVarUInt(DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION, *out); if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE) writeStringBinary(DateLUT::instance().getTimeZone(), *out); if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME) diff --git a/src/Server/TCPHandler.h b/src/Server/TCPHandler.h index dca40e98920..3b6e0059a30 100644 --- a/src/Server/TCPHandler.h +++ b/src/Server/TCPHandler.h @@ -188,6 +188,7 @@ private: UInt64 client_version_minor = 0; UInt64 client_version_patch = 0; UInt32 client_tcp_protocol_version = 0; + UInt32 client_parallel_replicas_protocol_version = 0; String proto_send_chunked_cl = "notchunked"; String proto_recv_chunked_cl = "notchunked"; String quota_key; diff --git a/src/Storages/MergeTree/RequestResponse.cpp b/src/Storages/MergeTree/RequestResponse.cpp index 48ff6ebccfd..04d7b23513a 100644 --- a/src/Storages/MergeTree/RequestResponse.cpp +++ b/src/Storages/MergeTree/RequestResponse.cpp @@ -14,18 +14,12 @@ namespace DB namespace ErrorCodes { - extern const int UNKNOWN_PROTOCOL; - extern const int UNKNOWN_ELEMENT_OF_ENUM; +extern const int UNKNOWN_PROTOCOL; +extern const int UNKNOWN_ELEMENT_OF_ENUM; } namespace { -/// Previously we had a separate protocol version number for parallel replicas. -/// But we didn't maintain backward compatibility and every protocol change was breaking. -/// Now we have to support at least minimal tail of the previous versions and the implementation -/// is based on the common tcp protocol version as in all other places. -constexpr UInt64 DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION = 3; - CoordinationMode validateAndGet(uint8_t candidate) { if (candidate <= static_cast(CoordinationMode::MAX)) @@ -35,10 +29,15 @@ CoordinationMode validateAndGet(uint8_t candidate) } } -void ParallelReadRequest::serialize(WriteBuffer & out) const +void ParallelReadRequest::serialize(WriteBuffer & out, UInt64 initiator_protocol_version) const { - /// Must be the first - writeIntBinary(DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION, out); + /// Previously we didn't maintain backward compatibility and every change was breaking. + /// Particularly, we had an equality check for the version. To work around that code + /// in previous server versions we now have to lie to them about the version. + const UInt64 version = initiator_protocol_version >= DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL + ? DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION + : DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION; + writeIntBinary(version, out); writeIntBinary(mode, out); writeIntBinary(replica_num, out); @@ -60,13 +59,12 @@ ParallelReadRequest ParallelReadRequest::deserialize(ReadBuffer & in) { UInt64 version; readIntBinary(version, in); - if (version != DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION) + if (version < DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION) throw Exception( ErrorCodes::UNKNOWN_PROTOCOL, - "Protocol versions for parallel reading " - "from replicas differ. Got: {}, supported version: {}", + "Parallel replicas protocol version is too old. Got: {}, min supported version: {}", version, - DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION); + DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION); CoordinationMode mode; size_t replica_num; @@ -80,12 +78,7 @@ ParallelReadRequest ParallelReadRequest::deserialize(ReadBuffer & in) readIntBinary(min_number_of_marks, in); description.deserialize(in); - return ParallelReadRequest( - mode, - replica_num, - min_number_of_marks, - std::move(description) - ); + return ParallelReadRequest(mode, replica_num, min_number_of_marks, std::move(description)); } void ParallelReadRequest::merge(ParallelReadRequest & other) @@ -96,10 +89,16 @@ void ParallelReadRequest::merge(ParallelReadRequest & other) description.merge(other.description); } -void ParallelReadResponse::serialize(WriteBuffer & out) const +void ParallelReadResponse::serialize(WriteBuffer & out, UInt64 replica_protocol_version) const { + /// Previously we didn't maintain backward compatibility and every change was breaking. + /// Particularly, we had an equality check for the version. To work around that code + /// in previous server versions we now have to lie to them about the version. + UInt64 version = replica_protocol_version >= DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL + ? DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION + : DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION; /// Must be the first - writeIntBinary(DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION, out); + writeIntBinary(version, out); writeBoolText(finish, out); description.serialize(out); @@ -114,28 +113,32 @@ void ParallelReadResponse::deserialize(ReadBuffer & in) { UInt64 version; readIntBinary(version, in); - if (version != DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION) + if (version < DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION) throw Exception( ErrorCodes::UNKNOWN_PROTOCOL, - "Protocol versions for parallel reading " - "from replicas differ. Got: {}, supported version: {}", + "Parallel replicas protocol version is too old. Got: {}, min supported version: {}", version, - DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION); + DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION); readBoolText(finish, in); description.deserialize(in); } -void InitialAllRangesAnnouncement::serialize(WriteBuffer & out, UInt64 client_protocol_revision) const +void InitialAllRangesAnnouncement::serialize(WriteBuffer & out, UInt64 initiator_protocol_version) const { - /// Must be the first - writeIntBinary(DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION, out); + /// Previously we didn't maintain backward compatibility and every change was breaking. + /// Particularly, we had an equality check for the version. To work around that code + /// in previous server versions we now have to lie to them about the version. + UInt64 version = initiator_protocol_version >= DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL + ? DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION + : DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION; + writeIntBinary(version, out); writeIntBinary(mode, out); description.serialize(out); writeIntBinary(replica_num, out); - if (client_protocol_revision >= DBMS_MIN_REVISION_WITH_ADAPTIVE_MARK_SEGMENT_FOR_PARALLEL_REPLICAS) + if (initiator_protocol_version >= DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL) writeIntBinary(mark_segment_size, out); } @@ -148,17 +151,16 @@ String InitialAllRangesAnnouncement::describe() return result; } -InitialAllRangesAnnouncement InitialAllRangesAnnouncement::deserialize(ReadBuffer & in, UInt64 client_protocol_revision) +InitialAllRangesAnnouncement InitialAllRangesAnnouncement::deserialize(ReadBuffer & in, UInt64 replica_protocol_version) { UInt64 version; readIntBinary(version, in); - if (version != DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION) + if (version < DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION) throw Exception( ErrorCodes::UNKNOWN_PROTOCOL, - "Protocol versions for parallel reading " - "from replicas differ. Got: {}, supported version: {}", + "Parallel replicas protocol version is too old. Got: {}, min supported version: {}", version, - DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION); + DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION); CoordinationMode mode; RangesInDataPartsDescription description; @@ -171,15 +173,10 @@ InitialAllRangesAnnouncement InitialAllRangesAnnouncement::deserialize(ReadBuffe readIntBinary(replica_num, in); size_t mark_segment_size = 128; - if (client_protocol_revision >= DBMS_MIN_REVISION_WITH_ADAPTIVE_MARK_SEGMENT_FOR_PARALLEL_REPLICAS) + if (replica_protocol_version >= DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL) readIntBinary(mark_segment_size, in); - return InitialAllRangesAnnouncement{ - mode, - description, - replica_num, - mark_segment_size, - }; + return InitialAllRangesAnnouncement{mode, description, replica_num, mark_segment_size}; } } diff --git a/src/Storages/MergeTree/RequestResponse.h b/src/Storages/MergeTree/RequestResponse.h index da2fa2795a3..96b65c45bfa 100644 --- a/src/Storages/MergeTree/RequestResponse.h +++ b/src/Storages/MergeTree/RequestResponse.h @@ -63,7 +63,7 @@ struct ParallelReadRequest /// Contains only data part names without mark ranges. RangesInDataPartsDescription description; - void serialize(WriteBuffer & out) const; + void serialize(WriteBuffer & out, UInt64 initiator_protocol_version) const; String describe() const; static ParallelReadRequest deserialize(ReadBuffer & in); void merge(ParallelReadRequest & other); @@ -78,7 +78,7 @@ struct ParallelReadResponse bool finish{false}; RangesInDataPartsDescription description; - void serialize(WriteBuffer & out) const; + void serialize(WriteBuffer & out, UInt64 replica_protocol_version) const; String describe() const; void deserialize(ReadBuffer & in); }; @@ -102,9 +102,9 @@ struct InitialAllRangesAnnouncement size_t replica_num; size_t mark_segment_size; - void serialize(WriteBuffer & out, UInt64 client_protocol_revision) const; + void serialize(WriteBuffer & out, UInt64 initiator_protocol_version) const; String describe(); - static InitialAllRangesAnnouncement deserialize(ReadBuffer & i, UInt64 client_protocol_revisionn); + static InitialAllRangesAnnouncement deserialize(ReadBuffer & i, UInt64 replica_protocol_version); }; From 63577507c968e82f51c35e5f893864191b86ccf6 Mon Sep 17 00:00:00 2001 From: Nikita Taranov Date: Sat, 14 Sep 2024 21:43:27 +0100 Subject: [PATCH 14/14] fix build --- src/Interpreters/ClusterProxy/executeQuery.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Interpreters/ClusterProxy/executeQuery.cpp b/src/Interpreters/ClusterProxy/executeQuery.cpp index 771c6a89caa..1f854c41873 100644 --- a/src/Interpreters/ClusterProxy/executeQuery.cpp +++ b/src/Interpreters/ClusterProxy/executeQuery.cpp @@ -532,7 +532,7 @@ void executeQueryWithParallelReplicas( max_replicas_to_use = shard.getAllNodeCount(); } - auto coordinator = std::make_shared(max_replicas_to_use, settings.parallel_replicas_mark_segment_size); + auto coordinator = std::make_shared(max_replicas_to_use); auto external_tables = new_context->getExternalTables();