diff --git a/src/Client/Connection.cpp b/src/Client/Connection.cpp index da6e5baa3ad..8a1c7d3988a 100644 --- a/src/Client/Connection.cpp +++ b/src/Client/Connection.cpp @@ -455,6 +455,9 @@ void Connection::sendAddendum() writeStringBinary(proto_recv_chunked, *out); } + if (server_revision >= DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL) + writeVarUInt(DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION, *out); + out->next(); } @@ -525,6 +528,8 @@ void Connection::receiveHello(const Poco::Timespan & handshake_timeout) readVarUInt(server_version_major, *in); readVarUInt(server_version_minor, *in); readVarUInt(server_revision, *in); + if (server_revision >= DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL) + readVarUInt(server_parallel_replicas_protocol_version, *in); if (server_revision >= DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE) readStringBinary(server_timezone, *in); if (server_revision >= DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME) @@ -959,7 +964,7 @@ void Connection::sendReadTaskResponse(const String & response) void Connection::sendMergeTreeReadTaskResponse(const ParallelReadResponse & response) { writeVarUInt(Protocol::Client::MergeTreeReadTaskResponse, *out); - response.serialize(*out); + response.serialize(*out, server_parallel_replicas_protocol_version); out->finishChunk(); out->next(); } @@ -1413,7 +1418,7 @@ ParallelReadRequest Connection::receiveParallelReadRequest() const InitialAllRangesAnnouncement Connection::receiveInitialParallelReadAnnouncement() const { - return InitialAllRangesAnnouncement::deserialize(*in); + return InitialAllRangesAnnouncement::deserialize(*in, server_parallel_replicas_protocol_version); } diff --git a/src/Client/Connection.h b/src/Client/Connection.h index ed84bc51318..e09d913f1ba 100644 --- a/src/Client/Connection.h +++ b/src/Client/Connection.h @@ -210,6 +210,7 @@ private: UInt64 server_version_minor = 0; UInt64 server_version_patch = 0; UInt64 server_revision = 0; + UInt64 server_parallel_replicas_protocol_version = 0; String server_timezone; String server_display_name; diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index 467dfe60cd7..7c28b193d13 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -376,6 +376,7 @@ The server successfully detected this situation and will download merged part fr M(ParallelReplicasReadAssignedMarks, "Sum across all replicas of how many of scheduled marks were assigned by consistent hash") \ M(ParallelReplicasReadUnassignedMarks, "Sum across all replicas of how many unassigned marks were scheduled") \ M(ParallelReplicasReadAssignedForStealingMarks, "Sum across all replicas of how many of scheduled marks were assigned for stealing by consistent hash") \ + M(ParallelReplicasReadMarks, "How many marks were read by the given replica") \ \ M(ParallelReplicasStealingByHashMicroseconds, "Time spent collecting segments meant for stealing by hash") \ M(ParallelReplicasProcessingPartsMicroseconds, "Time spent processing data parts") \ @@ -529,6 +530,7 @@ The server successfully detected this situation and will download merged part fr M(CachedReadBufferReadFromCacheMicroseconds, "Time reading from filesystem cache") \ M(CachedReadBufferReadFromSourceBytes, "Bytes read from filesystem cache source (from remote fs, etc)") \ M(CachedReadBufferReadFromCacheBytes, "Bytes read from filesystem cache") \ + M(CachedReadBufferPredownloadedBytes, "Bytes read from filesystem cache source. Cache segments are read from left to right as a whole, it might be that we need to predownload some part of the segment irrelevant for the current task just to get to the needed data") \ M(CachedReadBufferCacheWriteBytes, "Bytes written from source (remote fs, etc) to filesystem cache") \ M(CachedReadBufferCacheWriteMicroseconds, "Time spent writing data into filesystem cache") \ M(CachedReadBufferCreateBufferMicroseconds, "Prepare buffer time") \ diff --git a/src/Core/ProtocolDefines.h b/src/Core/ProtocolDefines.h index 790987272fa..49c6fc1dde6 100644 --- a/src/Core/ProtocolDefines.h +++ b/src/Core/ProtocolDefines.h @@ -33,7 +33,8 @@ static constexpr auto DBMS_MIN_REVISION_WITH_AGGREGATE_FUNCTIONS_VERSIONING = 54 static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = 1; -static constexpr auto DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION = 3; +static constexpr auto DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION = 3; +static constexpr auto DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION = 4; static constexpr auto DBMS_MIN_REVISION_WITH_PARALLEL_REPLICAS = 54453; static constexpr auto DBMS_MERGE_TREE_PART_INFO_VERSION = 1; @@ -86,6 +87,8 @@ static constexpr auto DBMS_MIN_REVISION_WITH_ROWS_BEFORE_AGGREGATION = 54469; /// Packets size header static constexpr auto DBMS_MIN_PROTOCOL_VERSION_WITH_CHUNKED_PACKETS = 54470; +static constexpr auto DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL = 54471; + /// Version of ClickHouse TCP protocol. /// /// Should be incremented manually on protocol changes. @@ -93,6 +96,6 @@ static constexpr auto DBMS_MIN_PROTOCOL_VERSION_WITH_CHUNKED_PACKETS = 54470; /// NOTE: DBMS_TCP_PROTOCOL_VERSION has nothing common with VERSION_REVISION, /// later is just a number for server version (one number instead of commit SHA) /// for simplicity (sometimes it may be more convenient in some use cases). -static constexpr auto DBMS_TCP_PROTOCOL_VERSION = 54470; +static constexpr auto DBMS_TCP_PROTOCOL_VERSION = 54471; } diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 40c08eeb84d..fadc4079fe0 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -946,7 +946,7 @@ class IColumn; M(Bool, parallel_replicas_for_non_replicated_merge_tree, false, "If true, ClickHouse will use parallel replicas algorithm also for non-replicated MergeTree tables", 0) \ M(UInt64, parallel_replicas_min_number_of_rows_per_replica, 0, "Limit the number of replicas used in a query to (estimated rows to read / min_number_of_rows_per_replica). The max is still limited by 'max_parallel_replicas'", 0) \ M(Bool, parallel_replicas_prefer_local_join, true, "If true, and JOIN can be executed with parallel replicas algorithm, and all storages of right JOIN part are *MergeTree, local JOIN will be used instead of GLOBAL JOIN.", 0) \ - M(UInt64, parallel_replicas_mark_segment_size, 128, "Parts virtually divided into segments to be distributed between replicas for parallel reading. This setting controls the size of these segments. Not recommended to change until you're absolutely sure in what you're doing", 0) \ + M(UInt64, parallel_replicas_mark_segment_size, 0, "Parts virtually divided into segments to be distributed between replicas for parallel reading. This setting controls the size of these segments. Not recommended to change until you're absolutely sure in what you're doing. Value should be in range [128; 16384]", 0) \ M(Bool, allow_archive_path_syntax, true, "File/S3 engines/table function will parse paths with '::' as ' :: ' if archive has correct extension", 0) \ M(Bool, parallel_replicas_local_plan, false, "Build local plan for local replica", 0) \ \ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index ef2088f25b0..560f144866b 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -79,6 +79,7 @@ static std::initializer_list(max_replicas_to_use, settings.parallel_replicas_mark_segment_size); + auto coordinator = std::make_shared(max_replicas_to_use); auto external_tables = new_context->getExternalTables(); diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index 0b96cc57274..60eadb80496 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -1,6 +1,8 @@ #include +#include #include +#include #include #include #include @@ -8,6 +10,8 @@ #include #include #include +#include +#include #include #include #include @@ -16,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -24,10 +29,11 @@ #include #include #include -#include #include -#include +#include +#include #include +#include #include #include #include @@ -41,11 +47,6 @@ #include #include #include -#include -#include -#include -#include -#include #include #include @@ -381,6 +382,7 @@ Pipe ReadFromMergeTree::readFromPoolParallelReplicas(RangesInDataParts parts_wit .all_callback = all_ranges_callback.value(), .callback = read_task_callback.value(), .number_of_current_replica = number_of_current_replica.value_or(client_info.number_of_current_replica), + .total_nodes_count = context->getClusterForParallelReplicas()->getShardsInfo().at(0).getAllNodeCount(), }; /// We have a special logic for local replica. It has to read less data, because in some cases it should @@ -563,6 +565,7 @@ Pipe ReadFromMergeTree::readInOrder( .all_callback = all_ranges_callback.value(), .callback = read_task_callback.value(), .number_of_current_replica = number_of_current_replica.value_or(client_info.number_of_current_replica), + .total_nodes_count = context->getClusterForParallelReplicas()->getShardsInfo().at(0).getAllNodeCount(), }; auto multiplier = context->getSettingsRef().parallel_replicas_single_task_marks_count_multiplier; diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index 3ae71bd6d07..50963887752 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -1270,7 +1270,7 @@ void TCPHandler::sendReadTaskRequestAssumeLocked() void TCPHandler::sendMergeTreeAllRangesAnnouncementAssumeLocked(InitialAllRangesAnnouncement announcement) { writeVarUInt(Protocol::Server::MergeTreeAllRangesAnnouncement, *out); - announcement.serialize(*out); + announcement.serialize(*out, client_parallel_replicas_protocol_version); out->finishChunk(); out->next(); @@ -1280,7 +1280,7 @@ void TCPHandler::sendMergeTreeAllRangesAnnouncementAssumeLocked(InitialAllRanges void TCPHandler::sendMergeTreeReadTaskRequestAssumeLocked(ParallelReadRequest request) { writeVarUInt(Protocol::Server::MergeTreeReadTaskRequest, *out); - request.serialize(*out); + request.serialize(*out, client_parallel_replicas_protocol_version); out->finishChunk(); out->next(); @@ -1662,6 +1662,9 @@ void TCPHandler::receiveAddendum() readStringBinary(proto_send_chunked_cl, *in); readStringBinary(proto_recv_chunked_cl, *in); } + + if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL) + readVarUInt(client_parallel_replicas_protocol_version, *in); } @@ -1689,6 +1692,8 @@ void TCPHandler::sendHello() writeVarUInt(VERSION_MAJOR, *out); writeVarUInt(VERSION_MINOR, *out); writeVarUInt(DBMS_TCP_PROTOCOL_VERSION, *out); + if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL) + writeVarUInt(DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION, *out); if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE) writeStringBinary(DateLUT::instance().getTimeZone(), *out); if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME) diff --git a/src/Server/TCPHandler.h b/src/Server/TCPHandler.h index dca40e98920..3b6e0059a30 100644 --- a/src/Server/TCPHandler.h +++ b/src/Server/TCPHandler.h @@ -188,6 +188,7 @@ private: UInt64 client_version_minor = 0; UInt64 client_version_patch = 0; UInt32 client_tcp_protocol_version = 0; + UInt32 client_parallel_replicas_protocol_version = 0; String proto_send_chunked_cl = "notchunked"; String proto_recv_chunked_cl = "notchunked"; String quota_key; diff --git a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp index c19c2baca5c..71d89f9950a 100644 --- a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp +++ b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.cpp @@ -1,6 +1,92 @@ -#include #include +#include +#include + +#include +#include +#include + + +namespace +{ + +size_t chooseSegmentSize( + LoggerPtr log, size_t mark_segment_size, size_t min_marks_per_task, size_t threads, size_t sum_marks, size_t number_of_replicas) +{ + /// Mark segment size determines the granularity of work distribution between replicas. + /// Namely, coordinator will take mark segments of size `mark_segment_size` granules, calculate hash of this segment and assign it to corresponding replica. + /// Small segments are good when we read a small random subset of a table, big - when we do full-scan over a large table. + /// With small segments there is a problem: consider a query like `select max(time) from wikistat`. Average size of `time` per granule is ~5KB. So when we + /// read 128 granules we still read only ~0.5MB of data. With default fs cache segment size of 4MB it means a lot of data will be downloaded and written + /// in cache for no reason. General case will look like this: + /// + /// +---------- useful data + /// v + /// +------+--+------+ + /// |------|++| | + /// |------|++| | + /// +------+--+------+ + /// ^ + /// predownloaded data -----------+ + /// + /// Having large segments solves all the problems in this case. Also bigger segments mean less requests (especially for big tables and full-scans). + /// These three values below chosen mostly intuitively. 128 granules is 1M rows - just a good starting point, 16384 seems to still make sense when reading + /// billions of rows and 1024 - is a reasonable point in between. We limit our choice to only these three options because when we change segment size + /// we essentially change distribution of data between replicas and of course we don't want to use simultaneously tens of different distributions, because + /// it would be a huge waste of cache space. + constexpr std::array borders{128, 1024, 16384}; + + LOG_DEBUG( + log, + "mark_segment_size={}, min_marks_per_task*threads={}, sum_marks/number_of_replicas^2={}", + mark_segment_size, + min_marks_per_task * threads, + sum_marks / number_of_replicas / number_of_replicas); + + /// Here we take max of two numbers: + /// * (min_marks_per_task * threads) = the number of marks we request from the coordinator each time - there is no point to have segments smaller than one unit of work for a replica + /// * (sum_marks / number_of_replicas^2) - we use consistent hashing for work distribution (including work stealing). If we have a really slow replica + /// everything except (1/number_of_replicas) portion of its work will be stolen by other replicas. And it owns (1/number_of_replicas) share of total number of marks. + /// Also important to note here that sum_marks is calculated after PK analysis, it means in particular that different segment sizes might be used for the + /// same table for different queries (it is intentional). + /// + /// Positive `mark_segment_size` means it is a user provided value, we have to preserve it. + if (mark_segment_size == 0) + mark_segment_size = std::max(min_marks_per_task * threads, sum_marks / number_of_replicas / number_of_replicas); + + /// Squeeze the value to the borders. + mark_segment_size = std::clamp(mark_segment_size, borders.front(), borders.back()); + /// After we calculated a hopefully good value for segment_size let's just find the maximal border that is not bigger than the chosen value. + for (auto border : borders | std::views::reverse) + { + if (mark_segment_size >= border) + { + LOG_DEBUG(log, "Chosen segment size: {}", border); + return border; + } + } + + UNREACHABLE(); +} + +size_t getMinMarksPerTask(size_t min_marks_per_task, const std::vector & per_part_infos) +{ + for (const auto & info : per_part_infos) + min_marks_per_task = std::max(min_marks_per_task, info->min_marks_per_task); + + if (min_marks_per_task == 0) + throw DB::Exception( + DB::ErrorCodes::BAD_ARGUMENTS, "Chosen number of marks to read is zero (likely because of weird interference of settings)"); + + return min_marks_per_task; +} +} + +namespace ProfileEvents +{ +extern const Event ParallelReplicasReadMarks; +} namespace DB { @@ -36,17 +122,17 @@ MergeTreeReadPoolParallelReplicas::MergeTreeReadPoolParallelReplicas( context_) , extension(std::move(extension_)) , coordination_mode(CoordinationMode::Default) - , min_marks_per_task(pool_settings.min_marks_for_concurrent_read) + , min_marks_per_task(getMinMarksPerTask(pool_settings.min_marks_for_concurrent_read, per_part_infos)) + , mark_segment_size(chooseSegmentSize( + log, + context_->getSettingsRef().parallel_replicas_mark_segment_size, + min_marks_per_task, + pool_settings.threads, + pool_settings.sum_marks, + extension.total_nodes_count)) { - for (const auto & info : per_part_infos) - min_marks_per_task = std::max(min_marks_per_task, info->min_marks_per_task); - - if (min_marks_per_task == 0) - throw Exception( - ErrorCodes::BAD_ARGUMENTS, "Chosen number of marks to read is zero (likely because of weird interference of settings)"); - - extension.all_callback( - InitialAllRangesAnnouncement(coordination_mode, parts_ranges.getDescriptions(), extension.number_of_current_replica)); + extension.all_callback(InitialAllRangesAnnouncement( + coordination_mode, parts_ranges.getDescriptions(), extension.number_of_current_replica, mark_segment_size)); } MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicas::getTask(size_t /*task_idx*/, MergeTreeReadTask * previous_task) @@ -111,6 +197,7 @@ MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicas::getTask(size_t /*task_id if (current_task.ranges.empty()) buffered_ranges.pop_front(); + ProfileEvents::increment(ProfileEvents::ParallelReplicasReadMarks, current_sum_marks); return createTask(per_part_infos[part_idx], std::move(ranges_to_read), previous_task); } diff --git a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.h b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.h index 987b7f80755..b9f2e133c4a 100644 --- a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.h +++ b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicas.h @@ -31,12 +31,13 @@ public: private: mutable std::mutex mutex; + LoggerPtr log = getLogger("MergeTreeReadPoolParallelReplicas"); const ParallelReadingExtension extension; const CoordinationMode coordination_mode; size_t min_marks_per_task{0}; + size_t mark_segment_size{0}; RangesInDataPartsDescription buffered_ranges; bool no_more_tasks_available{false}; - LoggerPtr log = getLogger("MergeTreeReadPoolParallelReplicas"); }; } diff --git a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.cpp b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.cpp index f726078cfc8..13a64b4d82e 100644 --- a/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.cpp +++ b/src/Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.cpp @@ -1,5 +1,10 @@ #include +namespace ProfileEvents +{ +extern const Event ParallelReplicasReadMarks; +} + namespace DB { @@ -50,11 +55,8 @@ MergeTreeReadPoolParallelReplicasInOrder::MergeTreeReadPoolParallelReplicasInOrd for (const auto & part : parts_ranges) buffered_tasks.push_back({part.data_part->info, MarkRanges{}}); - extension.all_callback(InitialAllRangesAnnouncement( - mode, - parts_ranges.getDescriptions(), - extension.number_of_current_replica - )); + extension.all_callback( + InitialAllRangesAnnouncement(mode, parts_ranges.getDescriptions(), extension.number_of_current_replica, /*mark_segment_size_=*/0)); } MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicasInOrder::getTask(size_t task_idx, MergeTreeReadTask * previous_task) @@ -75,13 +77,14 @@ MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicasInOrder::getTask(size_t ta { auto result = std::move(desc.ranges); desc.ranges = MarkRanges{}; + ProfileEvents::increment(ProfileEvents::ParallelReplicasReadMarks, desc.ranges.getNumberOfMarks()); return result; } } return std::nullopt; }; - if (auto result = get_from_buffer(); result) + if (auto result = get_from_buffer()) return createTask(per_part_infos[task_idx], std::move(*result), previous_task); if (no_more_tasks) @@ -104,7 +107,7 @@ MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicasInOrder::getTask(size_t ta std::move(new_ranges.begin(), new_ranges.end(), std::back_inserter(old_ranges)); } - if (auto result = get_from_buffer(); result) + if (auto result = get_from_buffer()) return createTask(per_part_infos[task_idx], std::move(*result), previous_task); return nullptr; diff --git a/src/Storages/MergeTree/MergeTreeSelectProcessor.h b/src/Storages/MergeTree/MergeTreeSelectProcessor.h index 7a9cebbcb2e..e20427dbff0 100644 --- a/src/Storages/MergeTree/MergeTreeSelectProcessor.h +++ b/src/Storages/MergeTree/MergeTreeSelectProcessor.h @@ -27,6 +27,7 @@ struct ParallelReadingExtension MergeTreeAllRangesCallback all_callback; MergeTreeReadTaskCallback callback; size_t number_of_current_replica{0}; + size_t total_nodes_count{0}; }; /// Base class for MergeTreeThreadSelectAlgorithm and MergeTreeSelectAlgorithm diff --git a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp index 8abf735b49f..ecfaa92c6db 100644 --- a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp +++ b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.cpp @@ -212,14 +212,11 @@ using PartRefs = std::deque; class DefaultCoordinator : public ParallelReplicasReadingCoordinator::ImplInterface { public: - explicit DefaultCoordinator(size_t replicas_count_, size_t mark_segment_size_) + explicit DefaultCoordinator(size_t replicas_count_) : ParallelReplicasReadingCoordinator::ImplInterface(replicas_count_) - , mark_segment_size(mark_segment_size_) , replica_status(replicas_count_) , distribution_by_hash_queue(replicas_count_) { - if (mark_segment_size == 0) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Zero value provided for `mark_segment_size`"); } ~DefaultCoordinator() override; @@ -232,7 +229,7 @@ public: private: /// This many granules will represent a single segment of marks that will be assigned to a replica - const size_t mark_segment_size{0}; + size_t mark_segment_size{0}; bool state_initialized{false}; size_t finished_replicas{0}; @@ -394,7 +391,11 @@ void DefaultCoordinator::initializeReadingState(InitialAllRangesAnnouncement ann state_initialized = true; source_replica_for_parts_snapshot = announcement.replica_num; - LOG_DEBUG(log, "Reading state is fully initialized: {}", fmt::join(all_parts_to_read, "; ")); + mark_segment_size = announcement.mark_segment_size; + if (mark_segment_size == 0) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Zero value provided for `mark_segment_size`"); + + LOG_DEBUG(log, "Reading state is fully initialized: {}, mark_segment_size: {}", fmt::join(all_parts_to_read, "; "), mark_segment_size); } void DefaultCoordinator::markReplicaAsUnavailable(size_t replica_number) @@ -1062,7 +1063,7 @@ void ParallelReplicasReadingCoordinator::initialize(CoordinationMode mode) switch (mode) { case CoordinationMode::Default: - pimpl = std::make_unique(replicas_count, mark_segment_size); + pimpl = std::make_unique(replicas_count); break; case CoordinationMode::WithOrder: pimpl = std::make_unique>(replicas_count); @@ -1080,8 +1081,7 @@ void ParallelReplicasReadingCoordinator::initialize(CoordinationMode mode) pimpl->markReplicaAsUnavailable(replica); } -ParallelReplicasReadingCoordinator::ParallelReplicasReadingCoordinator(size_t replicas_count_, size_t mark_segment_size_) - : replicas_count(replicas_count_), mark_segment_size(mark_segment_size_) +ParallelReplicasReadingCoordinator::ParallelReplicasReadingCoordinator(size_t replicas_count_) : replicas_count(replicas_count_) { } diff --git a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.h b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.h index 8b463fda395..ad51d20f553 100644 --- a/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.h +++ b/src/Storages/MergeTree/ParallelReplicasReadingCoordinator.h @@ -15,7 +15,7 @@ class ParallelReplicasReadingCoordinator public: class ImplInterface; - explicit ParallelReplicasReadingCoordinator(size_t replicas_count_, size_t mark_segment_size_ = 0); + explicit ParallelReplicasReadingCoordinator(size_t replicas_count_); ~ParallelReplicasReadingCoordinator(); void handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement); @@ -35,7 +35,6 @@ private: std::mutex mutex; const size_t replicas_count{0}; - size_t mark_segment_size{0}; std::unique_ptr pimpl; ProgressCallback progress_callback; // store the callback only to bypass it to coordinator implementation std::set replicas_used; diff --git a/src/Storages/MergeTree/RequestResponse.cpp b/src/Storages/MergeTree/RequestResponse.cpp index bcdeb443a0b..cd1cc2ad8e7 100644 --- a/src/Storages/MergeTree/RequestResponse.cpp +++ b/src/Storages/MergeTree/RequestResponse.cpp @@ -2,10 +2,10 @@ #include #include -#include +#include #include #include -#include +#include #include @@ -14,25 +14,29 @@ namespace DB namespace ErrorCodes { - extern const int UNKNOWN_PROTOCOL; - extern const int UNKNOWN_ELEMENT_OF_ENUM; +extern const int UNKNOWN_PROTOCOL; +extern const int UNKNOWN_ELEMENT_OF_ENUM; } namespace { - CoordinationMode validateAndGet(uint8_t candidate) - { - if (candidate <= static_cast(CoordinationMode::MAX)) - return static_cast(candidate); +CoordinationMode validateAndGet(uint8_t candidate) +{ + if (candidate <= static_cast(CoordinationMode::MAX)) + return static_cast(candidate); - throw Exception(ErrorCodes::UNKNOWN_ELEMENT_OF_ENUM, "Unknown reading mode: {}", candidate); - } + throw Exception(ErrorCodes::UNKNOWN_ELEMENT_OF_ENUM, "Unknown reading mode: {}", candidate); +} } -void ParallelReadRequest::serialize(WriteBuffer & out) const +void ParallelReadRequest::serialize(WriteBuffer & out, UInt64 initiator_protocol_version) const { - UInt64 version = DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION; - /// Must be the first + /// Previously we didn't maintain backward compatibility and every change was breaking. + /// Particularly, we had an equality check for the version. To work around that code + /// in previous server versions we now have to lie to them about the version. + const UInt64 version = initiator_protocol_version >= DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL + ? DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION + : DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION; writeIntBinary(version, out); writeIntBinary(mode, out); @@ -53,10 +57,12 @@ ParallelReadRequest ParallelReadRequest::deserialize(ReadBuffer & in) { UInt64 version; readIntBinary(version, in); - if (version != DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION) - throw Exception(ErrorCodes::UNKNOWN_PROTOCOL, "Protocol versions for parallel reading "\ - "from replicas differ. Got: {}, supported version: {}", - version, DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION); + if (version < DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION) + throw Exception( + ErrorCodes::UNKNOWN_PROTOCOL, + "Parallel replicas protocol version is too old. Got: {}, min supported version: {}", + version, + DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION); CoordinationMode mode; size_t replica_num; @@ -70,12 +76,7 @@ ParallelReadRequest ParallelReadRequest::deserialize(ReadBuffer & in) readIntBinary(min_number_of_marks, in); description.deserialize(in); - return ParallelReadRequest( - mode, - replica_num, - min_number_of_marks, - std::move(description) - ); + return ParallelReadRequest(mode, replica_num, min_number_of_marks, std::move(description)); } void ParallelReadRequest::merge(ParallelReadRequest & other) @@ -86,9 +87,14 @@ void ParallelReadRequest::merge(ParallelReadRequest & other) description.merge(other.description); } -void ParallelReadResponse::serialize(WriteBuffer & out) const +void ParallelReadResponse::serialize(WriteBuffer & out, UInt64 replica_protocol_version) const { - UInt64 version = DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION; + /// Previously we didn't maintain backward compatibility and every change was breaking. + /// Particularly, we had an equality check for the version. To work around that code + /// in previous server versions we now have to lie to them about the version. + UInt64 version = replica_protocol_version >= DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL + ? DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION + : DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION; /// Must be the first writeIntBinary(version, out); @@ -105,25 +111,33 @@ void ParallelReadResponse::deserialize(ReadBuffer & in) { UInt64 version; readIntBinary(version, in); - if (version != DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION) - throw Exception(ErrorCodes::UNKNOWN_PROTOCOL, "Protocol versions for parallel reading " \ - "from replicas differ. Got: {}, supported version: {}", - version, DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION); + if (version < DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION) + throw Exception( + ErrorCodes::UNKNOWN_PROTOCOL, + "Parallel replicas protocol version is too old. Got: {}, min supported version: {}", + version, + DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION); readBoolText(finish, in); description.deserialize(in); } -void InitialAllRangesAnnouncement::serialize(WriteBuffer & out) const +void InitialAllRangesAnnouncement::serialize(WriteBuffer & out, UInt64 initiator_protocol_version) const { - UInt64 version = DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION; - /// Must be the first + /// Previously we didn't maintain backward compatibility and every change was breaking. + /// Particularly, we had an equality check for the version. To work around that code + /// in previous server versions we now have to lie to them about the version. + UInt64 version = initiator_protocol_version >= DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL + ? DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION + : DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION; writeIntBinary(version, out); writeIntBinary(mode, out); description.serialize(out); writeIntBinary(replica_num, out); + if (initiator_protocol_version >= DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL) + writeIntBinary(mark_segment_size, out); } @@ -132,14 +146,16 @@ String InitialAllRangesAnnouncement::describe() return fmt::format("replica {}, mode {}, {}", replica_num, mode, description.describe()); } -InitialAllRangesAnnouncement InitialAllRangesAnnouncement::deserialize(ReadBuffer & in) +InitialAllRangesAnnouncement InitialAllRangesAnnouncement::deserialize(ReadBuffer & in, UInt64 replica_protocol_version) { UInt64 version; readIntBinary(version, in); - if (version != DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION) - throw Exception(ErrorCodes::UNKNOWN_PROTOCOL, "Protocol versions for parallel reading " \ - "from replicas differ. Got: {}, supported version: {}", - version, DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION); + if (version < DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION) + throw Exception( + ErrorCodes::UNKNOWN_PROTOCOL, + "Parallel replicas protocol version is too old. Got: {}, min supported version: {}", + version, + DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION); CoordinationMode mode; RangesInDataPartsDescription description; @@ -151,11 +167,11 @@ InitialAllRangesAnnouncement InitialAllRangesAnnouncement::deserialize(ReadBuffe description.deserialize(in); readIntBinary(replica_num, in); - return InitialAllRangesAnnouncement { - mode, - description, - replica_num - }; + size_t mark_segment_size = 128; + if (replica_protocol_version >= DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL) + readIntBinary(mark_segment_size, in); + + return InitialAllRangesAnnouncement{mode, description, replica_num, mark_segment_size}; } } diff --git a/src/Storages/MergeTree/RequestResponse.h b/src/Storages/MergeTree/RequestResponse.h index 5f5516a6804..96b65c45bfa 100644 --- a/src/Storages/MergeTree/RequestResponse.h +++ b/src/Storages/MergeTree/RequestResponse.h @@ -63,7 +63,7 @@ struct ParallelReadRequest /// Contains only data part names without mark ranges. RangesInDataPartsDescription description; - void serialize(WriteBuffer & out) const; + void serialize(WriteBuffer & out, UInt64 initiator_protocol_version) const; String describe() const; static ParallelReadRequest deserialize(ReadBuffer & in); void merge(ParallelReadRequest & other); @@ -78,7 +78,7 @@ struct ParallelReadResponse bool finish{false}; RangesInDataPartsDescription description; - void serialize(WriteBuffer & out) const; + void serialize(WriteBuffer & out, UInt64 replica_protocol_version) const; String describe() const; void deserialize(ReadBuffer & in); }; @@ -93,21 +93,18 @@ struct InitialAllRangesAnnouncement /// No default constructor, you must initialize all fields at once. InitialAllRangesAnnouncement( - CoordinationMode mode_, - RangesInDataPartsDescription description_, - size_t replica_num_) - : mode(mode_) - , description(description_) - , replica_num(replica_num_) + CoordinationMode mode_, RangesInDataPartsDescription description_, size_t replica_num_, size_t mark_segment_size_) + : mode(mode_), description(std::move(description_)), replica_num(replica_num_), mark_segment_size(mark_segment_size_) {} CoordinationMode mode; RangesInDataPartsDescription description; size_t replica_num; + size_t mark_segment_size; - void serialize(WriteBuffer & out) const; + void serialize(WriteBuffer & out, UInt64 initiator_protocol_version) const; String describe(); - static InitialAllRangesAnnouncement deserialize(ReadBuffer & in); + static InitialAllRangesAnnouncement deserialize(ReadBuffer & i, UInt64 replica_protocol_version); }; diff --git a/tests/integration/test_backward_compatibility/configs/clusters.xml b/tests/integration/test_backward_compatibility/configs/clusters.xml new file mode 100644 index 00000000000..ac773152df9 --- /dev/null +++ b/tests/integration/test_backward_compatibility/configs/clusters.xml @@ -0,0 +1,20 @@ + + + + + + node0 + 9000 + + + node1 + 9000 + + + node2 + 9000 + + + + + diff --git a/tests/integration/test_backward_compatibility/test_parallel_replicas_protocol.py b/tests/integration/test_backward_compatibility/test_parallel_replicas_protocol.py new file mode 100644 index 00000000000..e1b9049ef5d --- /dev/null +++ b/tests/integration/test_backward_compatibility/test_parallel_replicas_protocol.py @@ -0,0 +1,64 @@ +import pytest + +from helpers.cluster import ClickHouseCluster + + +cluster = ClickHouseCluster(__file__) +cluster_name = "parallel_replicas" +nodes = [ + cluster.add_instance( + f"node{num}", + main_configs=["configs/clusters.xml"], + with_zookeeper=False, + image="clickhouse/clickhouse-server", + tag="23.11", # earlier versions lead to "Not found column sum(a) in block." exception 🤷 + stay_alive=True, + use_old_analyzer=True, + with_installed_binary=True, + ) + for num in range(2) +] + [ + cluster.add_instance( + "node2", + main_configs=["configs/clusters.xml"], + with_zookeeper=False, + use_old_analyzer=True, + ) +] + + +@pytest.fixture(scope="module") +def start_cluster(): + try: + cluster.start() + yield cluster + + finally: + cluster.shutdown() + + +def test_backward_compatability(start_cluster): + for node in nodes: + node.query("create table t (a UInt64) engine = MergeTree order by tuple()") + node.query("insert into t select number % 100000 from numbers_mt(1000000)") + + # all we want is the query to run without errors + for node in nodes: + assert ( + node.query( + """ + select sum(a) + from t + """, + settings={ + "cluster_for_parallel_replicas": "parallel_replicas", + "max_parallel_replicas": 3, + "allow_experimental_parallel_reading_from_replicas": 1, + "parallel_replicas_for_non_replicated_merge_tree": 1, + }, + ) + == "49999500000\n" + ) + + for node in nodes: + node.query("drop table t")