Igor Nikonov 2024-06-24 14:31:05 +00:00
parent 9658d37d49
commit 5d91fd9717
2 changed files with 0 additions and 174 deletions

View File

@ -9,7 +9,6 @@
#include <deque>
#include <queue>
#include <mutex>
#include <memory>

View File

@ -17,13 +17,6 @@
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <Storages/MergeTree/RequestResponse.h>

namespace ProfileEvents
{
extern const Event SelectedParts;
extern const Event SelectedRanges;
extern const Event SelectedMarks;
}

namespace DB
{
@ -56,172 +49,6 @@ void addConvertingActions(QueryPlan & plan, const Block & header, bool has_missi
}
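
/// Wraps an already-created ReadFromMergeTree step together with the parallel replicas
/// reading coordinator, so the local replica can build its pipeline from the shared,
/// coordinated read analysis.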
class ReadFromMergeTreeCoordinated : public ISourceStep
{
public:
ReadFromMergeTreeCoordinated(QueryPlanStepPtr read_from_merge_tree_, ParallelReplicasReadingCoordinatorPtr coordinator_)
: ISourceStep(read_from_merge_tree_->getOutputStream())
, read_from_merge_tree(std::move(read_from_merge_tree_))
, coordinator(std::move(coordinator_))
{
}
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings) override;
String getName() const override { return "ReadFromLocalParallelReplica"; }
private:
QueryPlanStepPtr read_from_merge_tree;
ParallelReplicasReadingCoordinatorPtr coordinator;
};
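
/// A minimal usage sketch (hypothetical wiring; names like `query_plan` and
/// `read_from_merge_tree_step` are illustrative, not from this commit):
///
///     auto coordinated = std::make_unique<ReadFromMergeTreeCoordinated>(
///         std::move(read_from_merge_tree_step), coordinator);
///     query_plan.addStep(std::move(coordinated));
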
void ReadFromMergeTreeCoordinated::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & /*settings*/)
{
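/// Reuse the analysis (parts, ranges and marks selection) already performed by the wrapped ReadFromMergeTree step.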
ReadFromMergeTree & reading = *typeid_cast<ReadFromMergeTree *>(read_from_merge_tree.get());
auto result = reading.getAnalysisResult();
const auto & query_info = reading.getQueryInfo();
const auto & data = reading.data;
const auto & context = reading.getContext();
const auto & storage_snapshot = reading.getStorageSnapshot();
if (reading.enable_remove_parts_from_snapshot_optimization)
{
/// Do not keep data parts in snapshot.
/// They are stored separately, and some could be released after PK analysis.
reading.storage_snapshot->data = std::make_unique<MergeTreeData::SnapshotData>();
}
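/// Log the outcome of partition pruning and primary key analysis for this read.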
LOG_DEBUG(
reading.log,
"Selected {}/{} parts by partition key, {} parts by primary key, {}/{} marks by primary key, {} marks to read from {} ranges",
result.parts_before_pk,
result.total_parts,
result.selected_parts,
result.selected_marks_pk,
result.total_marks_pk,
result.selected_marks,
result.selected_ranges);
// Adding partition info to QueryAccessInfo.
if (context->hasQueryContext() && !query_info.is_internal)
{
Names partition_names;
for (const auto & part : result.parts_with_ranges)
{
partition_names.emplace_back(
fmt::format("{}.{}", data.getStorageID().getFullNameNotQuoted(), part.data_part->info.partition_id));
}
context->getQueryContext()->addQueryAccessInfo(partition_names);
if (storage_snapshot->projection)
context->getQueryContext()->addQueryAccessInfo(
Context::QualifiedProjectionName{.storage_id = data.getStorageID(), .projection_name = storage_snapshot->projection->name});
}
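/// Account the selected parts, ranges and marks in per-query profile counters.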
ProfileEvents::increment(ProfileEvents::SelectedParts, result.selected_parts);
ProfileEvents::increment(ProfileEvents::SelectedRanges, result.selected_ranges);
ProfileEvents::increment(ProfileEvents::SelectedMarks, result.selected_marks);
auto query_id_holder = MergeTreeDataSelectExecutor::checkLimits(data, result, context);
// TODO: check this on plan level; we should not reach this point if there is nothing to read
if (result.parts_with_ranges.empty())
{
pipeline.init(Pipe(std::make_shared<NullSource>(getOutputStream().header)));
return;
}
/// Projection that is needed to drop columns which may have appeared during execution
/// of extra expressions, and to allow executing the same expressions later.
/// NOTE: It may lead to double computation of expressions.
ActionsDAGPtr result_projection;
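/// Build the reading pipe, spreading the selected mark ranges across the requested number of streams.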
Pipe pipe = reading.spreadMarkRanges(std::move(result.parts_with_ranges), reading.requested_num_streams, result, result_projection);
for (const auto & processor : pipe.getProcessors())
processor->setStorageLimits(query_info.storage_limits);
if (pipe.empty())
{
pipeline.init(Pipe(std::make_shared<NullSource>(getOutputStream().header)));
return;
}
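/// SAMPLE clause: filter the rows with the precomputed sampling expression.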
if (result.sampling.use_sampling)
{
auto sampling_actions = std::make_shared<ExpressionActions>(result.sampling.filter_expression);
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<FilterTransform>(
header,
sampling_actions,
result.sampling.filter_function->getColumnName(),
false);
});
}
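/// Collect all pending header conversions into result_projection so they can be applied by a single ExpressionTransform below.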
Block cur_header = pipe.getHeader();
auto append_actions = [&result_projection](ActionsDAGPtr actions)
{
if (!result_projection)
result_projection = std::move(actions);
else
result_projection = ActionsDAG::merge(std::move(*result_projection), std::move(*actions));
};
if (result_projection)
cur_header = result_projection->updateHeader(cur_header);
/// Extra columns may be returned (for example, if sampling is used).
/// Convert the pipe header to the step's output header structure.
if (!isCompatibleHeader(cur_header, getOutputStream().header))
{
auto converting = ActionsDAG::makeConvertingActions(
cur_header.getColumnsWithTypeAndName(),
getOutputStream().header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name);
append_actions(std::move(converting));
}
if (result_projection)
{
auto projection_actions = std::make_shared<ExpressionActions>(result_projection);
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExpressionTransform>(header, projection_actions);
});
}
/// Some extra columns could be added by sample/final/in-order/etc.
/// Remove them from the header if they are not needed.
if (!blocksHaveEqualStructure(pipe.getHeader(), getOutputStream().header))
{
auto convert_actions_dag = ActionsDAG::makeConvertingActions(
pipe.getHeader().getColumnsWithTypeAndName(),
getOutputStream().header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name,
true);
auto converting_dag_expr = std::make_shared<ExpressionActions>(convert_actions_dag);
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExpressionTransform>(header, converting_dag_expr);
});
}
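/// Expose the pipe's processors through the step (used e.g. by EXPLAIN PIPELINE) and hand the pipe over to the pipeline.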
for (const auto & processor : pipe.getProcessors())
processors.emplace_back(processor);
pipeline.init(std::move(pipe));
pipeline.addContext(context);
// Attach QueryIdHolder if needed
if (query_id_holder)
pipeline.setQueryIdHolder(std::move(query_id_holder));
}
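
/// Builds the query plan executed locally by this replica when parallel replicas are used.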
std::unique_ptr<QueryPlan> createLocalPlanForParallelReplicas(
const ASTPtr & query_ast,
const Block & header,