Nikita Taranov 2024-08-15 16:55:47 +01:00
parent f4b8a98d06
commit cb0335446e
14 changed files with 130 additions and 37 deletions

View File

@@ -329,6 +329,7 @@ The server successfully detected this situation and will download merged part fr
M(ParallelReplicasReadAssignedMarks, "Sum across all replicas of how many of scheduled marks were assigned by consistent hash") \
M(ParallelReplicasReadUnassignedMarks, "Sum across all replicas of how many unassigned marks were scheduled") \
M(ParallelReplicasReadAssignedForStealingMarks, "Sum across all replicas of how many of scheduled marks were assigned for stealing by consistent hash") \
+M(ParallelReplicasReadMarks, "How many marks were read by the given replica") \
\
M(ParallelReplicasStealingByHashMicroseconds, "Time spent collecting segments meant for stealing by hash") \
M(ParallelReplicasProcessingPartsMicroseconds, "Time spent processing data parts") \
@@ -482,6 +483,7 @@ The server successfully detected this situation and will download merged part fr
M(CachedReadBufferReadFromCacheMicroseconds, "Time reading from filesystem cache") \
M(CachedReadBufferReadFromSourceBytes, "Bytes read from filesystem cache source (from remote fs, etc)") \
M(CachedReadBufferReadFromCacheBytes, "Bytes read from filesystem cache") \
+M(CachedReadBufferPredownloadedBytes, "Bytes read from filesystem cache source. Cache segments are read from left to right as a whole, so we may need to predownload some part of the segment that is irrelevant for the current task just to get to the needed data") \
M(CachedReadBufferCacheWriteBytes, "Bytes written from source (remote fs, etc) to filesystem cache") \
M(CachedReadBufferCacheWriteMicroseconds, "Time spent writing data into filesystem cache") \
M(CachedReadBufferCreateBufferMicroseconds, "Prepare buffer time") \
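
For context on the new ParallelReplicasReadMarks counter and its "assigned by consistent hash" siblings: the coordinator splits parts into mark segments and assigns each segment to a replica by hashing it, so the same segment keeps landing on the same replica across queries and its cache stays warm. A toy, self-contained sketch of that assignment idea (plain std::hash plus modulo here; ClickHouse itself uses consistent hashing, and the function name is made up for illustration):

#include <cstddef>
#include <functional>
#include <iostream>

// Illustration only: assign each segment of `mark_segment_size` marks to a
// replica by hashing the segment index. A stable mapping means a replica
// re-reads the same segments across queries, keeping its fs cache useful.
size_t ownerOfSegment(size_t first_mark, size_t mark_segment_size, size_t replicas)
{
    const size_t segment_index = first_mark / mark_segment_size;
    return std::hash<size_t>{}(segment_index) % replicas; // real code: consistent hashing
}

int main()
{
    for (size_t mark = 0; mark < 1024; mark += 128)
        std::cout << "segment at mark " << mark << " -> replica "
                  << ownerOfSegment(mark, /*mark_segment_size=*/128, /*replicas=*/3) << '\n';
}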

View File

@@ -33,7 +33,8 @@ static constexpr auto DBMS_MIN_REVISION_WITH_AGGREGATE_FUNCTIONS_VERSIONING = 54
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = 1;
-static constexpr auto DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION = 3;
+static constexpr auto DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION = 3;
+static constexpr auto DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION = 4;
static constexpr auto DBMS_MIN_REVISION_WITH_PARALLEL_REPLICAS = 54453;
static constexpr auto DBMS_MERGE_TREE_PART_INFO_VERSION = 1;

View File

@@ -938,7 +938,7 @@ class IColumn;
M(Bool, parallel_replicas_for_non_replicated_merge_tree, false, "If true, ClickHouse will use parallel replicas algorithm also for non-replicated MergeTree tables", 0) \
M(UInt64, parallel_replicas_min_number_of_rows_per_replica, 0, "Limit the number of replicas used in a query to (estimated rows to read / min_number_of_rows_per_replica). The max is still limited by 'max_parallel_replicas'", 0) \
M(Bool, parallel_replicas_prefer_local_join, true, "If true, and JOIN can be executed with parallel replicas algorithm, and all storages of right JOIN part are *MergeTree, local JOIN will be used instead of GLOBAL JOIN.", 0) \
-M(UInt64, parallel_replicas_mark_segment_size, 128, "Parts are virtually divided into segments to be distributed between replicas for parallel reading. This setting controls the size of these segments. Not recommended to change until you're absolutely sure of what you're doing", 0) \
+M(UInt64, parallel_replicas_mark_segment_size, 0, "Parts are virtually divided into segments to be distributed between replicas for parallel reading. This setting controls the size of these segments. Not recommended to change until you're absolutely sure of what you're doing. Value should be in range [128; 16384]", 0) \
M(Bool, allow_archive_path_syntax, true, "File/S3 engines/table function will parse paths with '::' as '<archive> :: <file>' if archive has correct extension", 0) \
\
M(Bool, allow_experimental_inverted_index, false, "If it is set to true, allow to use experimental inverted index.", 0) \

View File

@@ -28,6 +28,7 @@ extern const Event CachedReadBufferReadFromCacheMicroseconds;
extern const Event CachedReadBufferCacheWriteMicroseconds;
extern const Event CachedReadBufferReadFromSourceBytes;
extern const Event CachedReadBufferReadFromCacheBytes;
+extern const Event CachedReadBufferPredownloadedBytes;
extern const Event CachedReadBufferCacheWriteBytes;
extern const Event CachedReadBufferCreateBufferMicroseconds;
@@ -644,6 +645,7 @@ void CachedOnDiskReadBufferFromFile::predownload(FileSegment & file_segment)
size_t current_predownload_size = std::min(current_impl_buffer_size, bytes_to_predownload);
ProfileEvents::increment(ProfileEvents::CachedReadBufferReadFromSourceBytes, current_impl_buffer_size);
+ProfileEvents::increment(ProfileEvents::CachedReadBufferPredownloadedBytes, current_impl_buffer_size);
bool continue_predownload = file_segment.reserve(
current_predownload_size, settings.filesystem_cache_reserve_space_wait_lock_timeout_milliseconds);
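
Why a separate "predownloaded" counter: a filesystem cache segment is written sequentially from its start, so when a read begins in the middle of a segment, the bytes before the requested offset must be downloaded anyway even though the query never uses them. A minimal sketch of the accounting (hypothetical names, not the CachedOnDiskReadBufferFromFile API):

#include <cstddef>
#include <iostream>

// If the requested offset falls inside a cache segment, everything from the
// segment start up to that offset is "predownloaded": fetched and cached,
// but never returned to the query that triggered the read.
size_t bytesToPredownload(size_t segment_begin, size_t requested_offset)
{
    return requested_offset > segment_begin ? requested_offset - segment_begin : 0;
}

int main()
{
    constexpr size_t cache_segment_size = 4 * 1024 * 1024;  // 4MB, the default mentioned above
    const size_t requested_offset = 5 * 1024 * 1024 + 123;  // the query needs data from here
    const size_t segment_begin = requested_offset / cache_segment_size * cache_segment_size;
    std::cout << bytesToPredownload(segment_begin, requested_offset) << " bytes predownloaded\n"; // ~1MB
}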

View File

@@ -52,6 +52,7 @@
#include <memory>
#include <unordered_map>
#include "Interpreters/Cluster.h"
#include "config.h"
using namespace DB;
@@ -343,11 +344,11 @@ Pipe ReadFromMergeTree::readFromPoolParallelReplicas(
{
const auto & client_info = context->getClientInfo();
-auto extension = ParallelReadingExtension
-{
+auto extension = ParallelReadingExtension{
.all_callback = all_ranges_callback.value(),
.callback = read_task_callback.value(),
.number_of_current_replica = client_info.number_of_current_replica,
+.total_nodes_count = context->getClusterForParallelReplicas()->getShardsInfo().begin()->getAllNodeCount(),
};
/// We have a special logic for local replica. It has to read less data, because in some cases it should
@@ -514,11 +515,11 @@ Pipe ReadFromMergeTree::readInOrder(
if (is_parallel_reading_from_replicas)
{
const auto & client_info = context->getClientInfo();
-ParallelReadingExtension extension
-{
+ParallelReadingExtension extension{
.all_callback = all_ranges_callback.value(),
.callback = read_task_callback.value(),
.number_of_current_replica = client_info.number_of_current_replica,
+.total_nodes_count = context->getClusterForParallelReplicas()->getShardsInfo().begin()->getAllNodeCount(),
};
const auto multiplier = context->getSettingsRef().parallel_replicas_single_task_marks_count_multiplier;

View File

@@ -436,8 +436,7 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder
shuffled_pool = shard.pool->getShuffledPools(current_settings, priority_func);
}
-coordinator
-    = std::make_shared<ParallelReplicasReadingCoordinator>(max_replicas_to_use, current_settings.parallel_replicas_mark_segment_size);
+coordinator = std::make_shared<ParallelReplicasReadingCoordinator>(max_replicas_to_use);
for (size_t i=0; i < max_replicas_to_use; ++i)
{

View File

@@ -1,6 +1,79 @@
-#include <iterator>
#include <Storages/MergeTree/MergeTreeReadPoolParallelReplicas.h>
+#include <Core/Settings.h>
+#include <Interpreters/Context.h>
+#include <algorithm>
+#include <iterator>
+#include <ranges>
+namespace
+{
+size_t chooseSegmentSize(
+    LoggerPtr log, size_t mark_segment_size, size_t min_marks_per_task, size_t threads, size_t sum_marks, size_t number_of_replicas)
+{
+    /// Mark segment size determines the granularity of work distribution between replicas.
+    /// Namely, the coordinator will take mark segments of size `mark_segment_size` granules, calculate a hash of each segment and assign it to the corresponding replica.
+    /// Small segments are good when we read a small random subset of a table; big ones are good when we do a full scan over a large table.
+    /// Small segments have a problem: consider a query like `select max(time) from wikistat`. The average size of `time` per granule is ~5KB. So when we
+    /// read 128 granules we still read only ~0.5MB of data. With the default fs cache segment size of 4MB this means a lot of data will be downloaded and written
+    /// into the cache for no reason. The general case will look like this:
+    ///
+    ///                              +---------- useful data
+    ///                              v
+    ///                     +------+--+------+
+    ///                     |------|++|      |
+    ///                     |------|++|      |
+    ///                     +------+--+------+
+    ///                         ^
+    ///  predownloaded data ----+
+    ///
+    /// Having large segments solves the problem in this case. Bigger segments also mean fewer requests (especially for big tables and full scans).
+    /// The three values below were chosen mostly intuitively. 128 granules is 1M rows - just a good starting point; 16384 seems to still make sense when reading
+    /// billions of rows; and 1024 is a reasonable point in between. We limit the choice to only these three options because every change of segment size
+    /// essentially changes the distribution of data between replicas, and we don't want to use tens of different distributions simultaneously, because
+    /// it would be a huge waste of cache space.
+    constexpr std::array<size_t, 3> borders{128, 1024, 16384};
+
+    LOG_DEBUG(
+        log,
+        "mark_segment_size={}, min_marks_per_task*threads={}, sum_marks/number_of_replicas^2={}",
+        mark_segment_size,
+        min_marks_per_task * threads,
+        sum_marks / number_of_replicas / number_of_replicas);
+
+    /// Here we take the max of three numbers:
+    /// * the user-provided setting (0 by default),
+    /// * (min_marks_per_task * threads) - the number of marks we request from the coordinator each time; there is no point in having segments smaller than one unit of work for a replica,
+    /// * (sum_marks / number_of_replicas^2) - we use consistent hashing for work distribution (including work stealing). If we have a really slow replica,
+    ///   everything up to a (1/number_of_replicas) portion of its work will be stolen by other replicas, and it owns a (1/number_of_replicas) share of the total number of marks.
+    /// Also important to note here: sum_marks is calculated after PK analysis, which means in particular that different segment sizes might be used for the
+    /// same table in different queries (this is intentional).
+    mark_segment_size = std::max({mark_segment_size, min_marks_per_task * threads, sum_marks / number_of_replicas / number_of_replicas});
+
+    /// Squeeze the value into the borders.
+    mark_segment_size = std::clamp(mark_segment_size, borders.front(), borders.back());
+    /// After we have calculated a hopefully good value for the segment size, find the maximal border that is not bigger than the chosen value.
+    for (auto border : borders | std::views::reverse)
+    {
+        if (mark_segment_size >= border)
+        {
+            LOG_DEBUG(log, "Chosen segment size: {}", border);
+            return border;
+        }
+    }
+
+    UNREACHABLE();
+}
+}
+
+namespace ProfileEvents
+{
+extern const Event ParallelReplicasReadMarks;
+}
namespace DB
{
@@ -34,12 +107,19 @@ MergeTreeReadPoolParallelReplicas::MergeTreeReadPoolParallelReplicas(
, extension(std::move(extension_))
, coordination_mode(CoordinationMode::Default)
, min_marks_per_task(pool_settings.min_marks_for_concurrent_read)
+, mark_segment_size(chooseSegmentSize(
+      log,
+      context_->getSettingsRef().parallel_replicas_mark_segment_size,
+      min_marks_per_task,
+      pool_settings.threads,
+      pool_settings.sum_marks,
+      extension.total_nodes_count))
{
for (const auto & info : per_part_infos)
min_marks_per_task = std::max(min_marks_per_task, info->min_marks_per_task);
-extension.all_callback(
-    InitialAllRangesAnnouncement(coordination_mode, parts_ranges.getDescriptions(), extension.number_of_current_replica));
+extension.all_callback(InitialAllRangesAnnouncement(
+    coordination_mode, parts_ranges.getDescriptions(), extension.number_of_current_replica, mark_segment_size));
}
MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicas::getTask(size_t /*task_idx*/, MergeTreeReadTask * previous_task)
@@ -104,6 +184,7 @@ MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicas::getTask(size_t /*task_id
if (current_task.ranges.empty())
buffered_ranges.pop_front();
+ProfileEvents::increment(ProfileEvents::ParallelReplicasReadMarks, current_sum_marks);
return createTask(per_part_infos[part_idx], std::move(ranges_to_read), previous_task);
}
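
The selection in chooseSegmentSize above boils down to three steps: take the max of the user setting, one unit of work (min_marks_per_task * threads), and sum_marks / number_of_replicas^2; clamp into [128, 16384]; then snap down to the nearest of the three borders. A self-contained sketch of the same arithmetic, stripped of ClickHouse types and logging, with made-up sample inputs:

#include <algorithm>
#include <array>
#include <cstddef>
#include <iostream>

// Same max -> clamp -> snap-to-border logic as chooseSegmentSize above.
size_t pickSegmentSize(size_t user_value, size_t min_marks_per_task, size_t threads, size_t sum_marks, size_t replicas)
{
    constexpr std::array<size_t, 3> borders{128, 1024, 16384};
    size_t size = std::max({user_value, min_marks_per_task * threads, sum_marks / replicas / replicas});
    size = std::clamp(size, borders.front(), borders.back());
    for (auto it = borders.rbegin(); it != borders.rend(); ++it) // largest border <= size
        if (size >= *it)
            return *it;
    return borders.front();
}

int main()
{
    // Small read: max(0, 16*8, 2000/9) = max(0, 128, 222) = 222 -> snaps down to 128.
    std::cout << pickSegmentSize(0, 16, 8, 2000, 3) << '\n';
    // Full scan: 10M/9 is ~1.1M -> clamped to 16384.
    std::cout << pickSegmentSize(0, 16, 8, 10'000'000, 3) << '\n';
}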

View File

@@ -30,12 +30,13 @@ public:
private:
mutable std::mutex mutex;
+LoggerPtr log = getLogger("MergeTreeReadPoolParallelReplicas");
const ParallelReadingExtension extension;
const CoordinationMode coordination_mode;
size_t min_marks_per_task{0};
+size_t mark_segment_size{0};
RangesInDataPartsDescription buffered_ranges;
bool no_more_tasks_available{false};
-LoggerPtr log = getLogger("MergeTreeReadPoolParallelReplicas");
};
}

View File

@@ -1,5 +1,10 @@
#include <Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.h>
+namespace ProfileEvents
+{
+extern const Event ParallelReplicasReadMarks;
+}
namespace DB
{
@@ -43,11 +48,8 @@ MergeTreeReadPoolParallelReplicasInOrder::MergeTreeReadPoolParallelReplicasInOrd
for (const auto & part : parts_ranges)
buffered_tasks.push_back({part.data_part->info, MarkRanges{}});
-extension.all_callback(InitialAllRangesAnnouncement(
-    mode,
-    parts_ranges.getDescriptions(),
-    extension.number_of_current_replica
-));
+extension.all_callback(
+    InitialAllRangesAnnouncement(mode, parts_ranges.getDescriptions(), extension.number_of_current_replica, /*mark_segment_size_=*/0));
}
MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicasInOrder::getTask(size_t task_idx, MergeTreeReadTask * previous_task)
@@ -68,13 +70,14 @@ MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicasInOrder::getTask(size_t ta
{
auto result = std::move(desc.ranges);
desc.ranges = MarkRanges{};
+ProfileEvents::increment(ProfileEvents::ParallelReplicasReadMarks, result.getNumberOfMarks());
return result;
}
}
return std::nullopt;
};
-if (auto result = get_from_buffer(); result)
+if (auto result = get_from_buffer())
return createTask(per_part_infos[task_idx], std::move(*result), previous_task);
if (no_more_tasks)
@@ -97,7 +100,7 @@ MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicasInOrder::getTask(size_t ta
std::move(new_ranges.begin(), new_ranges.end(), std::back_inserter(old_ranges));
}
-if (auto result = get_from_buffer(); result)
+if (auto result = get_from_buffer())
return createTask(per_part_infos[task_idx], std::move(*result), previous_task);
return nullptr;

View File

@@ -27,6 +27,7 @@ struct ParallelReadingExtension
MergeTreeAllRangesCallback all_callback;
MergeTreeReadTaskCallback callback;
size_t number_of_current_replica{0};
+size_t total_nodes_count{0};
};
/// Base class for MergeTreeThreadSelectAlgorithm and MergeTreeSelectAlgorithm

View File

@@ -211,14 +211,11 @@ using PartRefs = std::deque<Parts::iterator>;
class DefaultCoordinator : public ParallelReplicasReadingCoordinator::ImplInterface
{
public:
-explicit DefaultCoordinator(size_t replicas_count_, size_t mark_segment_size_)
+explicit DefaultCoordinator(size_t replicas_count_)
: ParallelReplicasReadingCoordinator::ImplInterface(replicas_count_)
-, mark_segment_size(mark_segment_size_)
, replica_status(replicas_count_)
, distribution_by_hash_queue(replicas_count_)
{
-if (mark_segment_size == 0)
-    throw Exception(ErrorCodes::BAD_ARGUMENTS, "Zero value provided for `mark_segment_size`");
}
~DefaultCoordinator() override;
@@ -231,7 +228,7 @@ public:
private:
/// This many granules will represent a single segment of marks that will be assigned to a replica
-const size_t mark_segment_size{0};
+size_t mark_segment_size{0};
bool state_initialized{false};
size_t finished_replicas{0};
@@ -393,6 +390,10 @@ void DefaultCoordinator::initializeReadingState(InitialAllRangesAnnouncement ann
state_initialized = true;
source_replica_for_parts_snapshot = announcement.replica_num;
+mark_segment_size = announcement.mark_segment_size;
+if (mark_segment_size == 0)
+    throw Exception(ErrorCodes::BAD_ARGUMENTS, "Zero value provided for `mark_segment_size`");
LOG_DEBUG(log, "Reading state is fully initialized: {}", fmt::join(all_parts_to_read, "; "));
}
@@ -1043,7 +1044,7 @@ void ParallelReplicasReadingCoordinator::initialize(CoordinationMode mode)
switch (mode)
{
case CoordinationMode::Default:
-pimpl = std::make_unique<DefaultCoordinator>(replicas_count, mark_segment_size);
+pimpl = std::make_unique<DefaultCoordinator>(replicas_count);
break;
case CoordinationMode::WithOrder:
pimpl = std::make_unique<InOrderCoordinator<CoordinationMode::WithOrder>>(replicas_count);
@@ -1060,8 +1061,7 @@ void ParallelReplicasReadingCoordinator::initialize(CoordinationMode mode)
pimpl->markReplicaAsUnavailable(replica);
}
-ParallelReplicasReadingCoordinator::ParallelReplicasReadingCoordinator(size_t replicas_count_, size_t mark_segment_size_)
-    : replicas_count(replicas_count_), mark_segment_size(mark_segment_size_)
+ParallelReplicasReadingCoordinator::ParallelReplicasReadingCoordinator(size_t replicas_count_) : replicas_count(replicas_count_)
{
}

View File

@@ -15,7 +15,7 @@ class ParallelReplicasReadingCoordinator
public:
class ImplInterface;
-explicit ParallelReplicasReadingCoordinator(size_t replicas_count_, size_t mark_segment_size_ = 0);
+explicit ParallelReplicasReadingCoordinator(size_t replicas_count_);
~ParallelReplicasReadingCoordinator();
void handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement);
@@ -35,7 +35,6 @@ private:
std::mutex mutex;
const size_t replicas_count{0};
-size_t mark_segment_size{0};
std::unique_ptr<ImplInterface> pimpl;
ProgressCallback progress_callback; // store the callback only to bypass it to coordinator implementation
std::set<size_t> replicas_used;

View File

@@ -126,6 +126,7 @@ void InitialAllRangesAnnouncement::serialize(WriteBuffer & out) const
writeIntBinary(mode, out);
description.serialize(out);
writeIntBinary(replica_num, out);
+writeIntBinary(mark_segment_size, out);
}
@@ -156,10 +157,15 @@ InitialAllRangesAnnouncement InitialAllRangesAnnouncement::deserialize(ReadBuffe
description.deserialize(in);
readIntBinary(replica_num, in);
-return InitialAllRangesAnnouncement {
+size_t mark_segment_size = 128;
+if (version >= 4)
+    readIntBinary(mark_segment_size, in);
+return InitialAllRangesAnnouncement{
mode,
description,
-replica_num
+replica_num,
+mark_segment_size,
};
}
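
The deserialization above is the standard trick for adding a field to a versioned wire format: bump the protocol version, always write the new field, and read it only when the peer's announced version is new enough, otherwise fall back to the old hardcoded default (128). A compact model of the pattern (hypothetical stream types; ClickHouse's ReadBuffer/WriteBuffer and writeIntBinary are replaced with plain iostreams, and byte order is left as the host's):

#include <cstdint>
#include <iostream>
#include <sstream>

struct Announcement
{
    uint64_t replica_num = 0;
    uint64_t mark_segment_size = 0;

    void serialize(std::ostream & out) const
    {
        out.write(reinterpret_cast<const char *>(&replica_num), sizeof(replica_num));
        out.write(reinterpret_cast<const char *>(&mark_segment_size), sizeof(mark_segment_size));
    }

    static Announcement deserialize(std::istream & in, uint64_t version)
    {
        Announcement a;
        in.read(reinterpret_cast<char *>(&a.replica_num), sizeof(a.replica_num));
        a.mark_segment_size = 128; // default assumed for peers on protocol version < 4
        if (version >= 4)          // only new peers send the extra field
            in.read(reinterpret_cast<char *>(&a.mark_segment_size), sizeof(a.mark_segment_size));
        return a;
    }
};

int main()
{
    std::stringstream wire;
    Announcement{42, 1024}.serialize(wire);
    auto a = Announcement::deserialize(wire, /*version=*/4);
    std::cout << a.replica_num << ' ' << a.mark_segment_size << '\n'; // 42 1024
}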

View File

@@ -93,17 +93,14 @@ struct InitialAllRangesAnnouncement
/// No default constructor, you must initialize all fields at once.
InitialAllRangesAnnouncement(
-CoordinationMode mode_,
-RangesInDataPartsDescription description_,
-size_t replica_num_)
-: mode(mode_)
-, description(description_)
-, replica_num(replica_num_)
+CoordinationMode mode_, RangesInDataPartsDescription description_, size_t replica_num_, size_t mark_segment_size_)
+: mode(mode_), description(description_), replica_num(replica_num_), mark_segment_size(mark_segment_size_)
{}
CoordinationMode mode;
RangesInDataPartsDescription description;
size_t replica_num;
+size_t mark_segment_size;
void serialize(WriteBuffer & out) const;
String describe();