Add a local reading step from the merge tree (for parallel replicas)

This commit is contained in:
Igor Nikonov 2024-05-27 12:22:18 +00:00
parent be08ebd0f4
commit 5cdf8d336c
5 changed files with 264 additions and 13 deletions

View File

@ -495,7 +495,7 @@ void executeQueryWithParallelReplicas(
query_ast,
new_cluster,
storage_id,
std::move(coordinator),
coordinator,
header,
processed_stage,
new_context,

View File

@ -1,12 +1,32 @@
#include <Processors/QueryPlan/ParallelReplicasLocalPlan.h>
#include <Common/checkStackSize.h>
#include "Storages/MergeTree/RequestResponse.h"
#include <Interpreters/Context.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/ISourceStep.h>
#include <Interpreters/StorageID.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <QueryPipeline/Pipe.h>
#include <Parsers/ASTFunction.h>
#include <Storages/MergeTree/RequestResponse.h>
#include <memory>
namespace ProfileEvents
{
extern const Event SelectedParts;
extern const Event SelectedRanges;
extern const Event SelectedMarks;
}
namespace DB
{
@ -40,13 +60,179 @@ void addConvertingActions(QueryPlan & plan, const Block & header, bool has_missi
}
/// Source step that executes the local replica's share of a parallel-replicas
/// read: it wraps an already-analyzed ReadFromMergeTree step and builds the
/// pipeline from its analysis result (see initializePipeline below).
/// The reading coordinator is stored alongside the wrapped step; it is not
/// referenced in the visible initializePipeline body — presumably it is kept
/// alive here so mark-range assignment callbacks stay valid (TODO confirm).
class ReadFromMergeTreeCoordinated : public ISourceStep
{
public:
/// \param read_from_merge_tree_  must actually be a ReadFromMergeTree step;
///        initializePipeline downcasts it via typeid_cast without a null check.
/// \param coordinator_           parallel-replicas reading coordinator, ownership shared.
ReadFromMergeTreeCoordinated(QueryPlanStepPtr read_from_merge_tree_, ParallelReplicasReadingCoordinatorPtr coordinator_)
: ISourceStep(read_from_merge_tree_->getOutputStream())
, read_from_merge_tree(std::move(read_from_merge_tree_))
, coordinator(std::move(coordinator_))
{
}
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings) override;
String getName() const override { return "ReadFromLocalParallelReplica"; }
private:
/// Wrapped step holding the merge-tree read analysis; accessed via friendship
/// (ReadFromMergeTree declares `friend class ReadFromMergeTreeCoordinated`).
QueryPlanStepPtr read_from_merge_tree;
ParallelReplicasReadingCoordinatorPtr coordinator;
};
/// Builds the reading pipeline from the wrapped ReadFromMergeTree step's
/// precomputed analysis result: logs/records part selection, enforces limits,
/// spreads the selected mark ranges into a Pipe, then applies sampling filter
/// and header-converting expressions so the output matches this step's header.
/// Note: `read_from_merge_tree` is assumed to be a ReadFromMergeTree — the
/// typeid_cast result is dereferenced without a null check.
void ReadFromMergeTreeCoordinated::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & /*settings*/)
{
ReadFromMergeTree & reading = *typeid_cast<ReadFromMergeTree *>(read_from_merge_tree.get());
auto result = reading.getAnalysisResult();
const auto & query_info = reading.getQueryInfo();
const auto & data = reading.data;
const auto & context = reading.getContext();
const auto & storage_snapshot = reading.getStorageSnapshot();
if (reading.enable_remove_parts_from_snapshot_optimization)
{
/// Do not keep data parts in snapshot.
/// They are stored separately, and some could be released after PK analysis.
reading.storage_snapshot->data = std::make_unique<MergeTreeData::SnapshotData>();
}
LOG_DEBUG(
reading.log,
"Selected {}/{} parts by partition key, {} parts by primary key, {}/{} marks by primary key, {} marks to read from {} ranges",
result.parts_before_pk,
result.total_parts,
result.selected_parts,
result.selected_marks_pk,
result.total_marks_pk,
result.selected_marks,
result.selected_ranges);
// Adding partition info to QueryAccessInfo.
if (context->hasQueryContext() && !query_info.is_internal)
{
Names partition_names;
for (const auto & part : result.parts_with_ranges)
{
partition_names.emplace_back(
fmt::format("{}.{}", data.getStorageID().getFullNameNotQuoted(), part.data_part->info.partition_id));
}
context->getQueryContext()->addQueryAccessInfo(partition_names);
if (storage_snapshot->projection)
context->getQueryContext()->addQueryAccessInfo(
Context::QualifiedProjectionName{.storage_id = data.getStorageID(), .projection_name = storage_snapshot->projection->name});
}
/// Account the selection stats in profile events before any early return below.
ProfileEvents::increment(ProfileEvents::SelectedParts, result.selected_parts);
ProfileEvents::increment(ProfileEvents::SelectedRanges, result.selected_ranges);
ProfileEvents::increment(ProfileEvents::SelectedMarks, result.selected_marks);
/// May throw if query-level limits (e.g. max parts/partitions to read) are exceeded;
/// the returned holder keeps the query id registered while the pipeline lives.
auto query_id_holder = MergeTreeDataSelectExecutor::checkLimits(data, result, context);
// TODO: check this on plan level, we should be here if there is nothing to read
if (result.parts_with_ranges.empty())
{
pipeline.init(Pipe(std::make_shared<NullSource>(getOutputStream().header)));
return;
}
/// Projection, that needed to drop columns, which have appeared by execution
/// of some extra expressions, and to allow execute the same expressions later.
/// NOTE: It may lead to double computation of expressions.
ActionsDAGPtr result_projection;
/// Delegates the actual source construction (pools, streams, merging) to the
/// wrapped step; result_projection is filled as an out-parameter.
Pipe pipe = reading.spreadMarkRanges(std::move(result.parts_with_ranges), reading.requested_num_streams, result, result_projection);
for (const auto & processor : pipe.getProcessors())
processor->setStorageLimits(query_info.storage_limits);
if (pipe.empty())
{
pipeline.init(Pipe(std::make_shared<NullSource>(getOutputStream().header)));
return;
}
/// Filter out rows rejected by the SAMPLE clause expression.
if (result.sampling.use_sampling)
{
auto sampling_actions = std::make_shared<ExpressionActions>(result.sampling.filter_expression);
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<FilterTransform>(
header,
sampling_actions,
result.sampling.filter_function->getColumnName(),
false);
});
}
Block cur_header = pipe.getHeader();
/// Accumulate converting actions into result_projection so that at most one
/// ExpressionTransform is added for projection + conversion below.
auto append_actions = [&result_projection](ActionsDAGPtr actions)
{
if (!result_projection)
result_projection = std::move(actions);
else
result_projection = ActionsDAG::merge(std::move(*result_projection), std::move(*actions));
};
if (result_projection)
cur_header = result_projection->updateHeader(cur_header);
/// Extra columns may be returned (for example, if sampling is used).
/// Convert pipe to step header structure.
if (!isCompatibleHeader(cur_header, getOutputStream().header))
{
auto converting = ActionsDAG::makeConvertingActions(
cur_header.getColumnsWithTypeAndName(),
getOutputStream().header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name);
append_actions(std::move(converting));
}
if (result_projection)
{
auto projection_actions = std::make_shared<ExpressionActions>(result_projection);
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExpressionTransform>(header, projection_actions);
});
}
/// Some extra columns could be added by sample/final/in-order/etc
/// Remove them from header if not needed.
if (!blocksHaveEqualStructure(pipe.getHeader(), getOutputStream().header))
{
auto convert_actions_dag = ActionsDAG::makeConvertingActions(
pipe.getHeader().getColumnsWithTypeAndName(),
getOutputStream().header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name,
true);
auto converting_dag_expr = std::make_shared<ExpressionActions>(convert_actions_dag);
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExpressionTransform>(header, converting_dag_expr);
});
}
/// Register the pipe's processors with this step (used for EXPLAIN PIPELINE etc.).
for (const auto & processor : pipe.getProcessors())
processors.emplace_back(processor);
pipeline.init(std::move(pipe));
/// Keep the query context alive for the lifetime of the pipeline.
pipeline.addContext(context);
// Attach QueryIdHolder if needed
if (query_id_holder)
pipeline.setQueryIdHolder(std::move(query_id_holder));
}
std::unique_ptr<QueryPlan> createLocalPlanForParallelReplicas(
const ASTPtr & query_ast,
const Block & header,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
ParallelReplicasReadingCoordinatorPtr /*coordinator*/,
QueryPlanStepPtr /*read_from_merge_tree*/,
ParallelReplicasReadingCoordinatorPtr coordinator,
QueryPlanStepPtr read_from_merge_tree,
bool has_missing_objects)
{
checkStackSize();
@ -72,6 +258,33 @@ std::unique_ptr<QueryPlan> createLocalPlanForParallelReplicas(
auto interpreter = InterpreterSelectQueryAnalyzer(query_ast, new_context, select_query_options);
query_plan = std::make_unique<QueryPlan>(std::move(interpreter).extractQueryPlan());
QueryPlan::Node * node = query_plan->getRootNode();
ReadFromMergeTree * reading = nullptr;
while (node)
{
reading = typeid_cast<ReadFromMergeTree *>(node->step.get());
if (reading)
break;
if (!node->children.empty())
node = node->children.at(0);
}
chassert(reading);
MergeTreeAllRangesCallback all_ranges_cb = [coordinator](InitialAllRangesAnnouncement announcement)
{
chassert(coordinator);
coordinator->handleInitialAllRangesAnnouncement(std::move(announcement));
};
MergeTreeReadTaskCallback read_task_cb = [coordinator](ParallelReadRequest req) -> std::optional<ParallelReadResponse>
{ return coordinator->handleRequest(std::move(req)); };
const auto * analyzed_merge_tree = typeid_cast<const ReadFromMergeTree *>(read_from_merge_tree.get());
auto read_from_merge_tree_parallel_replicas = reading->createLocalParallelReplicasReadingStep(analyzed_merge_tree, true, all_ranges_cb, read_task_cb);
node->step = std::move(read_from_merge_tree_parallel_replicas);
addConvertingActions(*query_plan, header, has_missing_objects);
return query_plan;
}

View File

@ -272,7 +272,9 @@ ReadFromMergeTree::ReadFromMergeTree(
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read_,
LoggerPtr log_,
AnalysisResultPtr analyzed_result_ptr_,
bool enable_parallel_reading)
bool enable_parallel_reading_,
std::optional<MergeTreeAllRangesCallback> all_ranges_callback_,
std::optional<MergeTreeReadTaskCallback> read_task_callback_)
: SourceStepWithFilter(DataStream{.header = MergeTreeSelectProcessor::transformHeader(
storage_snapshot_->getSampleBlockForColumns(all_column_names_),
query_info_.prewhere_info)}, all_column_names_, query_info_, storage_snapshot_, context_)
@ -291,12 +293,19 @@ ReadFromMergeTree::ReadFromMergeTree(
, max_block_numbers_to_read(std::move(max_block_numbers_to_read_))
, log(std::move(log_))
, analyzed_result_ptr(analyzed_result_ptr_)
, is_parallel_reading_from_replicas(enable_parallel_reading)
, is_parallel_reading_from_replicas(enable_parallel_reading_)
, enable_remove_parts_from_snapshot_optimization(query_info_.merge_tree_enable_remove_parts_from_snapshot_optimization)
{
if (is_parallel_reading_from_replicas)
{
if (all_ranges_callback_)
all_ranges_callback = all_ranges_callback_.value();
else
all_ranges_callback = context->getMergeTreeAllRangesCallback();
if (read_task_callback_)
read_task_callback = read_task_callback_.value();
else
read_task_callback = context->getMergeTreeReadTaskCallback();
}
@ -331,11 +340,31 @@ ReadFromMergeTree::ReadFromMergeTree(
enable_vertical_final);
}
/// Clones this step as a parallel-replicas-aware ReadFromMergeTree for the
/// local replica's plan: same parts, columns, query info, snapshot, context
/// and stream settings, but reusing the PK analysis result already computed
/// in `analyzed_merge_tree` (so the expensive analysis is not redone) and
/// wiring in the coordinator callbacks.
/// \param analyzed_merge_tree   step whose analyzed_result_ptr is reused; must not be null.
/// \param enable_parallel_reading_  passed through to the constructor's
///        enable_parallel_reading_ flag (enables replica coordination).
/// \param all_ranges_callback_  optional announcement callback; if empty the
///        constructor falls back to the one from the query context.
/// \param read_task_callback_   optional read-task callback; same fallback rule.
std::unique_ptr<ReadFromMergeTree> ReadFromMergeTree::createLocalParallelReplicasReadingStep(
const ReadFromMergeTree * analyzed_merge_tree,
bool enable_parallel_reading_,
std::optional<MergeTreeAllRangesCallback> all_ranges_callback_,
std::optional<MergeTreeReadTaskCallback> read_task_callback_)
{
/// NOTE: argument order must match the ReadFromMergeTree constructor exactly.
return std::make_unique<ReadFromMergeTree>(
prepared_parts,
alter_conversions_for_parts,
all_column_names,
data,
getQueryInfo(),
getStorageSnapshot(),
getContext(),
block_size.max_block_size_rows,
requested_num_streams,
max_block_numbers_to_read,
log,
analyzed_merge_tree->analyzed_result_ptr,
enable_parallel_reading_,
all_ranges_callback_,
read_task_callback_);
}
Pipe ReadFromMergeTree::readFromPoolParallelReplicas(
RangesInDataParts parts_with_range,
Names required_columns,
PoolSettings pool_settings)
Pipe ReadFromMergeTree::readFromPoolParallelReplicas(RangesInDataParts parts_with_range, Names required_columns, PoolSettings pool_settings)
{
const auto & client_info = context->getClientInfo();

View File

@ -119,7 +119,15 @@ public:
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read_,
LoggerPtr log_,
AnalysisResultPtr analyzed_result_ptr_,
bool enable_parallel_reading);
bool enable_parallel_reading_,
std::optional<MergeTreeAllRangesCallback> all_ranges_callback_ = std::nullopt,
std::optional<MergeTreeReadTaskCallback> read_task_callback_ = std::nullopt);
std::unique_ptr<ReadFromMergeTree> createLocalParallelReplicasReadingStep(
const ReadFromMergeTree * analyzed_merge_tree,
bool enable_parallel_reading_,
std::optional<MergeTreeAllRangesCallback> all_ranges_callback_,
std::optional<MergeTreeReadTaskCallback> read_task_callback_);
static constexpr auto name = "ReadFromMergeTree";
String getName() const override { return name; }
@ -282,6 +290,8 @@ private:
std::optional<MergeTreeReadTaskCallback> read_task_callback;
bool enable_vertical_final = false;
bool enable_remove_parts_from_snapshot_optimization = true;
friend class ReadFromMergeTreeCoordinated;
};
}

View File

@ -1,6 +1,5 @@
#pragma once
#include <condition_variable>
#include <functional>
#include <optional>