Use local working set for parallel replicas

This commit is contained in:
Igor Nikonov 2024-05-28 21:04:33 +00:00
parent a9b485a2c1
commit d756729f38
5 changed files with 52 additions and 53 deletions

View File

@ -273,37 +273,37 @@ std::unique_ptr<QueryPlan> createLocalPlanForParallelReplicas(
}
chassert(reading);
const auto * analyzed_merge_tree = typeid_cast<const ReadFromMergeTree *>(read_from_merge_tree.get());
if (!analyzed_merge_tree->hasAnalyzedResult())
analyzed_merge_tree->selectRangesToRead();
chassert(analyzed_merge_tree->hasAnalyzedResult());
CoordinationMode mode = CoordinationMode::Default;
switch (analyzed_merge_tree->getReadType())
{
case ReadFromMergeTree::ReadType::Default:
coordinator->initialize(CoordinationMode::Default);
mode = CoordinationMode::Default;
break;
case ReadFromMergeTree::ReadType::InOrder:
coordinator->initialize(CoordinationMode::WithOrder);
mode = CoordinationMode::WithOrder;
break;
case ReadFromMergeTree::ReadType::InReverseOrder:
coordinator->initialize(CoordinationMode::ReverseOrder);
mode = CoordinationMode::ReverseOrder;
break;
case ReadFromMergeTree::ReadType::ParallelReplicas:
chassert(false);
UNREACHABLE();
}
MergeTreeAllRangesCallback all_ranges_cb = [coordinator](InitialAllRangesAnnouncement announcement)
{
chassert(coordinator);
coordinator->handleInitialAllRangesAnnouncement(std::move(announcement));
};
const auto number_of_local_replica = new_context->getSettingsRef().max_parallel_replicas - 1;
coordinator->handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement(
mode, analyzed_merge_tree->getAnalysisResult().parts_with_ranges.getDescriptions(), number_of_local_replica));
MergeTreeAllRangesCallback all_ranges_cb = [coordinator](InitialAllRangesAnnouncement) {};
MergeTreeReadTaskCallback read_task_cb = [coordinator](ParallelReadRequest req) -> std::optional<ParallelReadResponse>
{ return coordinator->handleRequest(std::move(req)); };
auto read_from_merge_tree_parallel_replicas = reading->createLocalParallelReplicasReadingStep(analyzed_merge_tree, true, all_ranges_cb, read_task_cb);
auto read_from_merge_tree_parallel_replicas
= reading->createLocalParallelReplicasReadingStep(analyzed_merge_tree, true, all_ranges_cb, read_task_cb, number_of_local_replica);
node->step = std::move(read_from_merge_tree_parallel_replicas);
addConvertingActions(*query_plan, header, has_missing_objects);

View File

@ -274,7 +274,8 @@ ReadFromMergeTree::ReadFromMergeTree(
AnalysisResultPtr analyzed_result_ptr_,
bool enable_parallel_reading_,
std::optional<MergeTreeAllRangesCallback> all_ranges_callback_,
std::optional<MergeTreeReadTaskCallback> read_task_callback_)
std::optional<MergeTreeReadTaskCallback> read_task_callback_,
std::optional<size_t> number_of_current_replica_)
: SourceStepWithFilter(DataStream{.header = MergeTreeSelectProcessor::transformHeader(
storage_snapshot_->getSampleBlockForColumns(all_column_names_),
query_info_.prewhere_info)}, all_column_names_, query_info_, storage_snapshot_, context_)
@ -295,18 +296,12 @@ ReadFromMergeTree::ReadFromMergeTree(
, analyzed_result_ptr(analyzed_result_ptr_)
, is_parallel_reading_from_replicas(enable_parallel_reading_)
, enable_remove_parts_from_snapshot_optimization(query_info_.merge_tree_enable_remove_parts_from_snapshot_optimization)
, number_of_current_replica(number_of_current_replica_)
{
if (is_parallel_reading_from_replicas)
{
if (all_ranges_callback_)
all_ranges_callback = all_ranges_callback_.value();
else
all_ranges_callback = context->getMergeTreeAllRangesCallback();
if (read_task_callback_)
read_task_callback = read_task_callback_.value();
else
read_task_callback = context->getMergeTreeReadTaskCallback();
/// Use the explicitly supplied callbacks when present, otherwise the ones registered on the query context.
/// NOTE(review): std::optional::value_or takes its fallback as an ordinary argument, so both context
/// getters below are evaluated even when an explicit callback was passed in. If those getters can throw
/// or are expensive when no callback is registered on the context (their bodies are not visible here —
/// TODO confirm), an explicit has_value() check is needed instead.
all_ranges_callback = all_ranges_callback_.value_or(context->getMergeTreeAllRangesCallback());
read_task_callback = read_task_callback_.value_or(context->getMergeTreeReadTaskCallback());
}
const auto & settings = context->getSettingsRef();
@ -344,7 +339,8 @@ std::unique_ptr<ReadFromMergeTree> ReadFromMergeTree::createLocalParallelReplica
const ReadFromMergeTree * analyzed_merge_tree,
bool enable_parallel_reading_,
std::optional<MergeTreeAllRangesCallback> all_ranges_callback_,
std::optional<MergeTreeReadTaskCallback> read_task_callback_)
std::optional<MergeTreeReadTaskCallback> read_task_callback_,
std::optional<size_t> number_of_current_replica_)
{
return std::make_unique<ReadFromMergeTree>(
prepared_parts,
@ -361,7 +357,8 @@ std::unique_ptr<ReadFromMergeTree> ReadFromMergeTree::createLocalParallelReplica
analyzed_merge_tree->analyzed_result_ptr,
enable_parallel_reading_,
all_ranges_callback_,
read_task_callback_);
read_task_callback_,
number_of_current_replica_);
}
Pipe ReadFromMergeTree::readFromPoolParallelReplicas(RangesInDataParts parts_with_range, Names required_columns, PoolSettings pool_settings)
@ -372,9 +369,7 @@ Pipe ReadFromMergeTree::readFromPoolParallelReplicas(RangesInDataParts parts_wit
{
.all_callback = all_ranges_callback.value(),
.callback = read_task_callback.value(),
.count_participating_replicas = client_info.count_participating_replicas,
.number_of_current_replica = client_info.number_of_current_replica,
.columns_to_read = required_columns,
.number_of_current_replica = number_of_current_replica.value_or(client_info.number_of_current_replica),
};
/// We have a special logic for local replica. It has to read less data, because in some cases it should
@ -545,9 +540,7 @@ Pipe ReadFromMergeTree::readInOrder(
{
.all_callback = all_ranges_callback.value(),
.callback = read_task_callback.value(),
.count_participating_replicas = client_info.count_participating_replicas,
.number_of_current_replica = client_info.number_of_current_replica,
.columns_to_read = required_columns,
};
const auto multiplier = context->getSettingsRef().parallel_replicas_single_task_marks_count_multiplier;

View File

@ -121,13 +121,15 @@ public:
AnalysisResultPtr analyzed_result_ptr_,
bool enable_parallel_reading_,
std::optional<MergeTreeAllRangesCallback> all_ranges_callback_ = std::nullopt,
std::optional<MergeTreeReadTaskCallback> read_task_callback_ = std::nullopt);
std::optional<MergeTreeReadTaskCallback> read_task_callback_ = std::nullopt,
std::optional<size_t> number_of_current_replica_ = std::nullopt);
std::unique_ptr<ReadFromMergeTree> createLocalParallelReplicasReadingStep(
const ReadFromMergeTree * analyzed_merge_tree,
bool enable_parallel_reading_,
std::optional<MergeTreeAllRangesCallback> all_ranges_callback_,
std::optional<MergeTreeReadTaskCallback> read_task_callback_);
std::optional<MergeTreeReadTaskCallback> read_task_callback_,
std::optional<size_t> number_of_current_replica_);
static constexpr auto name = "ReadFromMergeTree";
/// Returns the step name, i.e. the static `name` constant of this class.
String getName() const override { return name; }
@ -192,6 +194,7 @@ public:
/// True when an analysis result is attached to this step (analyzed_result_ptr is not null).
bool hasAnalyzedResult() const { return analyzed_result_ptr != nullptr; }
/// Replaces the stored analysis result with the given pointer; the argument is taken by value and moved in.
void setAnalyzedResult(AnalysisResultPtr analyzed_result_ptr_) { analyzed_result_ptr = std::move(analyzed_result_ptr_); }
ReadFromMergeTree::AnalysisResult getAnalysisResult() const;
/// Read-only access to the data parts held in prepared_parts; the reference is to a member of this step.
const MergeTreeData::DataPartsVector & getParts() const { return prepared_parts; }
/// Read-only access to alter_conversions_for_parts; the reference is to a member of this step.
/// NOTE(review): presumably index-aligned with getParts() — confirm against the code that fills both.
const std::vector<AlterConversionsPtr> & getAlterConvertionsForParts() const { return alter_conversions_for_parts; }
@ -286,8 +289,6 @@ private:
Pipe spreadMarkRangesAmongStreamsFinal(
RangesInDataParts && parts, size_t num_streams, const Names & origin_column_names, const Names & column_names, ActionsDAGPtr & out_projection);
ReadFromMergeTree::AnalysisResult getAnalysisResult() const;
AnalysisResultPtr analyzed_result_ptr;
VirtualFields shared_virtual_fields;
@ -296,6 +297,7 @@ private:
std::optional<MergeTreeReadTaskCallback> read_task_callback;
bool enable_vertical_final = false;
bool enable_remove_parts_from_snapshot_optimization = true;
std::optional<size_t> number_of_current_replica;
friend class ReadFromMergeTreeCoordinated;
};

View File

@ -390,8 +390,18 @@ ReadFromParallelRemoteReplicasStep::ReadFromParallelRemoteReplicasStep(
std::vector<String> description;
description.push_back(fmt::format("query: {}", formattedAST(query_ast)));
for (const auto & pool : cluster->getShardsInfo().front().per_replica_pools)
description.push_back(fmt::format("Replica: {}", pool->getHost()));
bool first_local = false;
for (const auto & addr : cluster->getShardsAddresses().front())
{
/// When the local replica is excluded, skip the first local address so it is not listed in the description.
if (exclude_local_replica && addr.is_local && !first_local)
{
first_local = true;
continue;
}
description.push_back(fmt::format("Replica: {}", addr.host_name));
}
setStepDescription(boost::algorithm::join(description, ", "));
}
@ -414,9 +424,6 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder
const auto & shard = cluster->getShardsInfo().at(0);
size_t all_replicas_count = current_settings.max_parallel_replicas;
if (exclude_local_replica)
--all_replicas_count;
if (all_replicas_count > shard.getAllNodeCount())
{
LOG_INFO(
@ -427,6 +434,8 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder
shard.getAllNodeCount());
all_replicas_count = shard.getAllNodeCount();
}
if (exclude_local_replica)
--all_replicas_count;
std::vector<ConnectionPoolWithFailover::Base::ShuffledPool> shuffled_pool;
if (all_replicas_count < shard.getAllNodeCount())

View File

@ -26,12 +26,7 @@ struct ParallelReadingExtension
{
MergeTreeAllRangesCallback all_callback;
MergeTreeReadTaskCallback callback;
size_t count_participating_replicas{0};
size_t number_of_current_replica{0};
/// This is needed to estimate the number of bytes
/// between a pair of marks to perform one request
/// over the network for a 1Gb of data.
Names columns_to_read;
};
/// Base class for MergeTreeThreadSelectAlgorithm and MergeTreeSelectAlgorithm