Initialize working set on pipeline initialization, right after analysis

This commit is contained in:
Igor Nikonov 2024-06-06 14:28:17 +00:00
parent 69009b886f
commit f8d4aabfe0
4 changed files with 48 additions and 44 deletions

View File

@ -908,13 +908,13 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
chassert(reading);
auto result_ptr = reading->selectRangesToRead();
UInt64 rows_to_read = result_ptr->selected_rows;
reading->setAnalyzedResult(std::move(result_ptr));
// (2) if it's ReadFromMergeTree - run index analysis and check number of rows to read
if (settings.parallel_replicas_min_number_of_rows_per_replica > 0)
{
auto result_ptr = reading->selectRangesToRead();
UInt64 rows_to_read = result_ptr->selected_rows;
reading->setAnalyzedResult(std::move(result_ptr));
if (table_expression_query_info.limit > 0 && table_expression_query_info.limit < rows_to_read)
rows_to_read = table_expression_query_info.limit;

View File

@ -273,44 +273,21 @@ std::unique_ptr<QueryPlan> createLocalPlanForParallelReplicas(
}
chassert(reading);
if (!analyzed_read_from_merge_tree)
analyzed_read_from_merge_tree = std::move(node->step);
auto * analyzed_merge_tree = typeid_cast<ReadFromMergeTree *>(analyzed_read_from_merge_tree.get());
/// if no analysis is done yet, let's do it (happens with JOINs)
if (!analyzed_merge_tree->hasAnalyzedResult())
analyzed_merge_tree->setAnalyzedResult(analyzed_merge_tree->selectRangesToRead());
ReadFromMergeTree * analyzed_merge_tree = nullptr;
if (analyzed_read_from_merge_tree.get())
analyzed_merge_tree = typeid_cast<ReadFromMergeTree *>(analyzed_read_from_merge_tree.get());
chassert(analyzed_merge_tree->hasAnalyzedResult());
CoordinationMode mode = CoordinationMode::Default;
switch (analyzed_merge_tree->getReadType())
{
case ReadFromMergeTree::ReadType::Default:
mode = CoordinationMode::Default;
break;
case ReadFromMergeTree::ReadType::InOrder:
mode = CoordinationMode::WithOrder;
break;
case ReadFromMergeTree::ReadType::InReverseOrder:
mode = CoordinationMode::ReverseOrder;
break;
case ReadFromMergeTree::ReadType::ParallelReplicas:
chassert(false);
UNREACHABLE();
}
const auto number_of_local_replica = new_context->getSettingsRef().max_parallel_replicas - 1;
coordinator->handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement(
mode, analyzed_merge_tree->getAnalysisResult().parts_with_ranges.getDescriptions(), number_of_local_replica));
MergeTreeAllRangesCallback all_ranges_cb = [coordinator](InitialAllRangesAnnouncement) {};
MergeTreeAllRangesCallback all_ranges_cb
= [coordinator](InitialAllRangesAnnouncement announcement) { coordinator->handleInitialAllRangesAnnouncement(announcement); };
MergeTreeReadTaskCallback read_task_cb = [coordinator](ParallelReadRequest req) -> std::optional<ParallelReadResponse>
{ return coordinator->handleRequest(std::move(req)); };
const auto number_of_local_replica = new_context->getSettingsRef().max_parallel_replicas - 1;
auto read_from_merge_tree_parallel_replicas
= reading->createLocalParallelReplicasReadingStep(analyzed_merge_tree, true, all_ranges_cb, read_task_cb, number_of_local_replica);
= reading->createLocalParallelReplicasReadingStep(analyzed_merge_tree, all_ranges_cb, read_task_cb, number_of_local_replica);
node->step = std::move(read_from_merge_tree_parallel_replicas);
addConvertingActions(*query_plan, header, has_missing_objects);

View File

@ -337,7 +337,6 @@ ReadFromMergeTree::ReadFromMergeTree(
std::unique_ptr<ReadFromMergeTree> ReadFromMergeTree::createLocalParallelReplicasReadingStep(
const ReadFromMergeTree * analyzed_merge_tree,
bool enable_parallel_reading_,
std::optional<MergeTreeAllRangesCallback> all_ranges_callback_,
std::optional<MergeTreeReadTaskCallback> read_task_callback_,
std::optional<size_t> number_of_current_replica_)
@ -354,8 +353,8 @@ std::unique_ptr<ReadFromMergeTree> ReadFromMergeTree::createLocalParallelReplica
requested_num_streams,
max_block_numbers_to_read,
log,
analyzed_merge_tree->analyzed_result_ptr,
enable_parallel_reading_,
(analyzed_merge_tree ? analyzed_merge_tree->analyzed_result_ptr : nullptr),
true,
all_ranges_callback_,
read_task_callback_,
number_of_current_replica_);
@ -1424,11 +1423,8 @@ static void buildIndexes(
const auto & settings = context->getSettingsRef();
indexes.emplace(ReadFromMergeTree::Indexes{{
filter_actions_dag,
context,
primary_key_column_names,
primary_key.expression}, {}, {}, {}, {}, false, {}});
indexes.emplace(
ReadFromMergeTree::Indexes{KeyCondition{filter_actions_dag, context, primary_key_column_names, primary_key.expression}});
if (metadata_snapshot->hasPartitionKey())
{
@ -1951,6 +1947,33 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons
{
auto result = getAnalysisResult();
if (is_parallel_reading_from_replicas && context->canUseParallelReplicasOnInitiator())
{
CoordinationMode mode = CoordinationMode::Default;
switch (result.read_type)
{
case ReadFromMergeTree::ReadType::Default:
mode = CoordinationMode::Default;
break;
case ReadFromMergeTree::ReadType::InOrder:
mode = CoordinationMode::WithOrder;
break;
case ReadFromMergeTree::ReadType::InReverseOrder:
mode = CoordinationMode::ReverseOrder;
break;
case ReadFromMergeTree::ReadType::ParallelReplicas:
chassert(false);
UNREACHABLE();
}
chassert(number_of_current_replica.has_value());
chassert(all_ranges_callback.has_value());
/// initialize working set from local replica
all_ranges_callback.value()(
InitialAllRangesAnnouncement(mode, result.parts_with_ranges.getDescriptions(), number_of_current_replica.value()));
}
if (enable_remove_parts_from_snapshot_optimization)
{
/// Do not keep data parts in snapshot.

View File

@ -126,7 +126,6 @@ public:
std::unique_ptr<ReadFromMergeTree> createLocalParallelReplicasReadingStep(
const ReadFromMergeTree * analyzed_merge_tree,
bool enable_parallel_reading_,
std::optional<MergeTreeAllRangesCallback> all_ranges_callback_,
std::optional<MergeTreeReadTaskCallback> read_task_callback_,
std::optional<size_t> number_of_current_replica_);
@ -151,6 +150,11 @@ public:
struct Indexes
{
explicit Indexes(KeyCondition key_condition_)
: key_condition(std::move(key_condition_))
, use_skip_indexes(false)
{}
KeyCondition key_condition;
std::optional<PartitionPruner> partition_pruner;
std::optional<KeyCondition> minmax_idx_condition;