Use QueryPlan for horizontal part of merge

commit 7e444136bb
parent 1f5082ee8a
Author: Alexander Gololobov
Date:   2024-09-02 18:27:24 +02:00

4 changed files with 334 additions and 145 deletions
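In outline, the commit replaces the hand-assembled Pipe/transform wiring in MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream with a declarative QueryPlan that is only then compiled into the merge pipeline. A condensed sketch of the new flow (argument lists trimmed to the essentials; the full diff below is authoritative):

    // Sketch only: the real code passes many more arguments to each step.
    QueryPlan merge_parts_query_plan;

    // One sub-plan per source part (createReadFromPartStep, plus an ExpressionStep
    // computing the sorting key when the table has one), built in a loop.
    std::vector<QueryPlanPtr> plans; // one entry per future_part->parts

    // A UnionStep joins the per-part streams into a single data stream.
    merge_parts_query_plan.unitePlans(
        std::make_unique<UnionStep>(std::move(input_streams)), std::move(plans));

    // Declarative steps replace the hand-built transforms: ApplyMergeStep, then
    // optionally DistinctStep, TTLStep and the skip-index ExpressionStep.
    merge_parts_query_plan.addStep(std::move(merge_step));

    // Finally the plan is compiled into the pipeline that executes the merge.
    auto builder = merge_parts_query_plan.buildQueryPipeline(
        QueryPlanOptimizationSettings::fromContext(global_ctx->context),
        BuildQueryPipelineSettings::fromContext(global_ctx->context));
    global_ctx->merged_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));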

src/Interpreters/MutationsInterpreter.cpp

@@ -1217,9 +1217,16 @@ void MutationsInterpreter::Source::read(
createReadFromPartStep(
MergeTreeSequentialSourceType::Mutation,
plan, *data, storage_snapshot,
part, required_columns,
apply_deleted_mask_, std::move(filter), context_,
plan,
*data, storage_snapshot,
part,
required_columns,
/*filtered_rows_count=*/ nullptr,
apply_deleted_mask_,
std::move(filter),
/*read_with_direct_io=*/ false,
/*prefetch=*/ false,
context_,
getLogger("MutationsInterpreter"));
}
else

src/Storages/MergeTree/MergeTask.cpp

@@ -38,6 +38,11 @@
#include <Processors/Transforms/DistinctSortedTransform.h>
#include <Processors/Transforms/DistinctTransform.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <Processors/QueryPlan/DistinctStep.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/UnionStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Interpreters/PreparedSets.h>
#include <Interpreters/MergeTreeTransaction.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
@@ -1206,12 +1211,204 @@ bool MergeTask::execute()
}
/// Apply merge strategy (Ordinary, Collapsing, Aggregating, etc) to the stream
class ApplyMergeStep : public ITransformingStep /// TODO: is this a transforming step?
{
public:
ApplyMergeStep(
const DataStream & input_stream_,
const SortDescription & sort_description_,
const Names partition_key_columns_,
const MergeTreeData::MergingParams & merging_params_,
WriteBuffer * rows_sources_write_buf_,
UInt64 merge_block_size_rows_,
UInt64 merge_block_size_bytes_,
bool blocks_are_granules_size_,
bool cleanup_)
: ITransformingStep(input_stream_, input_stream_.header, Traits{}) // TODO proper traits?
, sort_description(sort_description_)
, partition_key_columns(partition_key_columns_)
, merging_params(merging_params_)
, rows_sources_write_buf(rows_sources_write_buf_)
, merge_block_size_rows(merge_block_size_rows_)
, merge_block_size_bytes(merge_block_size_bytes_)
, blocks_are_granules_size(blocks_are_granules_size_)
, cleanup(cleanup_)
{}
String getName() const override { return "ApplyMergePolicy"; }
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & /*pipelineSettings*/) override
{
/// The order of the streams is important: when the key is matched, the elements go in the order of the source stream number.
/// In the merged part, rows with the same key must appear in ascending order of the source part identifier,
/// which corresponds to insertion order.
ProcessorPtr merged_transform;
// /// There is no sense to have the block size bigger than one granule for merge operations.
// const UInt64 merge_block_size_rows = data_settings->merge_max_block_size;
// const UInt64 merge_block_size_bytes = data_settings->merge_max_block_size_bytes;
const auto &header = pipeline.getHeader();
const auto input_streams_count = pipeline.getNumStreams();
switch (merging_params.mode)
{
case MergeTreeData::MergingParams::Ordinary:
merged_transform = std::make_shared<MergingSortedTransform>(
header,
input_streams_count,
sort_description,
merge_block_size_rows,
merge_block_size_bytes,
SortingQueueStrategy::Default,
/* limit_= */0,
/* always_read_till_end_= */false,
rows_sources_write_buf,
blocks_are_granules_size);
break;
case MergeTreeData::MergingParams::Collapsing:
merged_transform = std::make_shared<CollapsingSortedTransform>(
header, input_streams_count, sort_description, merging_params.sign_column, false,
merge_block_size_rows, merge_block_size_bytes, rows_sources_write_buf, blocks_are_granules_size);
break;
case MergeTreeData::MergingParams::Summing:
merged_transform = std::make_shared<SummingSortedTransform>(
header, input_streams_count, sort_description, merging_params.columns_to_sum, partition_key_columns, merge_block_size_rows, merge_block_size_bytes);
break;
case MergeTreeData::MergingParams::Aggregating:
merged_transform = std::make_shared<AggregatingSortedTransform>(header, input_streams_count, sort_description, merge_block_size_rows, merge_block_size_bytes);
break;
case MergeTreeData::MergingParams::Replacing:
merged_transform = std::make_shared<ReplacingSortedTransform>(
header, input_streams_count, sort_description, merging_params.is_deleted_column, merging_params.version_column,
merge_block_size_rows, merge_block_size_bytes, rows_sources_write_buf, blocks_are_granules_size,
cleanup);
break;
case MergeTreeData::MergingParams::Graphite:
merged_transform = std::make_shared<GraphiteRollupSortedTransform>(
header, input_streams_count, sort_description, merge_block_size_rows, merge_block_size_bytes,
merging_params.graphite_params, time_of_merge);
break;
case MergeTreeData::MergingParams::VersionedCollapsing:
merged_transform = std::make_shared<VersionedCollapsingTransform>(
header, input_streams_count, sort_description, merging_params.sign_column,
merge_block_size_rows, merge_block_size_bytes, rows_sources_write_buf, blocks_are_granules_size);
break;
}
pipeline.addTransform(std::move(merged_transform));
#ifndef NDEBUG
if (!sort_description.empty())
{
pipeline.addSimpleTransform([&](const Block & header_)
{
auto transform = std::make_shared<CheckSortedTransform>(header_, sort_description);
return transform;
});
}
#endif
}
void updateOutputStream() override
{
output_stream = createOutputStream(input_streams.front(), input_streams.front().header, getDataStreamTraits());
output_stream->sort_description = sort_description;
/// TODO: is this correct?
// if (partition_key_columns.empty())
output_stream->sort_scope = DataStream::SortScope::Global;
// else
// output_stream->sort_scope = DataStream::SortScope::Stream;
}
private:
SortDescription sort_description;
Names partition_key_columns;
MergeTreeData::MergingParams merging_params{};
WriteBuffer * rows_sources_write_buf;
const UInt64 merge_block_size_rows;
const UInt64 merge_block_size_bytes;
bool blocks_are_granules_size;
bool cleanup{false};
time_t time_of_merge{0};
};
class MaterializingStep : public ITransformingStep /// TODO: is this a transforming step?
{
public:
explicit MaterializingStep(
const DataStream & input_stream_)
: ITransformingStep(input_stream_, input_stream_.header, Traits{}) // TODO proper traits?
{}
String getName() const override { return "Materializing"; }
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override
{
pipeline.addTransform(std::make_shared<MaterializingTransform>(input_streams.front().header));
}
void updateOutputStream() override
{
/// TODO: can this be simplified?
output_stream = createOutputStream(input_streams.front(), input_streams.front().header, getDataStreamTraits());
output_stream->sort_description = input_streams.front().sort_description;
}
};
class TTLStep : public ITransformingStep
{
public:
TTLStep(
const DataStream & input_stream_,
const ContextPtr & context_,
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const MergeTreeData::MutableDataPartPtr & data_part_,
time_t current_time,
bool force_)
: ITransformingStep(input_stream_, input_stream_.header, Traits{}) // TODO proper traits?
{
transform = std::make_shared<TTLTransform>(context_, input_stream_.header, storage_, metadata_snapshot_, data_part_, current_time, force_);
subqueries_for_sets = transform->getSubqueries();
}
String getName() const override { return "Materializing"; }
PreparedSets::Subqueries getSubqueries() { return std::move(subqueries_for_sets); }
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override
{
pipeline.addTransform(transform);
}
void updateOutputStream() override
{
// TODO: implement?
}
private:
std::shared_ptr<TTLTransform> transform;
PreparedSets::Subqueries subqueries_for_sets;
};
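All three helper classes above implement the same ITransformingStep contract. A minimal annotated skeleton of what each override is responsible for (hypothetical FooStep; Traits simplified, as in the diff itself):

    class FooStep : public ITransformingStep
    {
    public:
        explicit FooStep(const DataStream & input_stream_)
            : ITransformingStep(input_stream_, input_stream_.header, Traits{}) /// Traits declare which stream properties the step preserves
        {}

        /// Name reported by EXPLAIN and in logs.
        String getName() const override { return "Foo"; }

        /// Called when the plan is materialized: append this step's processors
        /// to the pipeline built so far.
        void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override
        {
            pipeline.addSimpleTransform([](const Block & header)
            {
                return std::make_shared<MaterializingTransform>(header);
            });
        }

        /// Recompute the output stream's header and sort properties whenever
        /// the input stream changes (e.g. after plan optimizations).
        void updateOutputStream() override
        {
            output_stream = createOutputStream(input_streams.front(), input_streams.front().header, getDataStreamTraits());
        }
    };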
void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
{
/** Read from all parts, merge and write into a new one.
* In passing, we calculate expression for sorting.
*/
Pipes pipes;
global_ctx->watch_prev_elapsed = 0;
/// We count total amount of bytes in parts
@@ -1238,143 +1435,92 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
global_ctx->horizontal_stage_progress = std::make_unique<MergeStageProgress>(
ctx->column_sizes ? ctx->column_sizes->keyColumnsWeight() : 1.0);
auto sorting_key_expression_dag = global_ctx->metadata_snapshot->getSortingKey().expression->getActionsDAG().clone();
/// Read from all parts
std::vector<QueryPlanPtr> plans;
for (const auto & part : global_ctx->future_part->parts)
{
Pipe pipe = createMergeTreeSequentialSource(
/// TODO: this is just for debugging purposes, remove it later
if (part->getMarksCount() == 0)
LOG_DEBUG(ctx->log, "Part {} is empty", part->name);
auto plan_for_part = std::make_unique<QueryPlan>();
createReadFromPartStep(
MergeTreeSequentialSourceType::Merge,
*plan_for_part,
*global_ctx->data,
global_ctx->storage_snapshot,
part,
global_ctx->merging_columns.getNames(),
/*mark_ranges=*/ {},
global_ctx->input_rows_filtered,
/*apply_deleted_mask=*/ true,
/*filter=*/ std::nullopt,
ctx->read_with_direct_io,
/*prefetch=*/ false);
/*prefetch=*/ false,
global_ctx->context,
ctx->log);
if (global_ctx->metadata_snapshot->hasSortingKey())
{
pipe.addSimpleTransform([this](const Block & header)
{
return std::make_shared<ExpressionTransform>(header, global_ctx->metadata_snapshot->getSortingKey().expression);
});
/// Calculate sorting key expressions so that they are available for merge sorting.
auto calculate_sorting_key_expression_step = std::make_unique<ExpressionStep>(
plan_for_part->getCurrentDataStream(),
sorting_key_expression_dag.clone()); /// TODO: can we avoid cloning here?
plan_for_part->addStep(std::move(calculate_sorting_key_expression_step));
}
pipes.emplace_back(std::move(pipe));
plans.emplace_back(std::move(plan_for_part));
}
QueryPlan merge_parts_query_plan;
Names sort_columns = global_ctx->metadata_snapshot->getSortingKeyColumns();
SortDescription sort_description;
sort_description.compile_sort_description = global_ctx->data->getContext()->getSettingsRef().compile_sort_description;
sort_description.min_count_to_compile_sort_description = global_ctx->data->getContext()->getSettingsRef().min_count_to_compile_sort_description;
size_t sort_columns_size = sort_columns.size();
sort_description.reserve(sort_columns_size);
Names partition_key_columns = global_ctx->metadata_snapshot->getPartitionKey().column_names;
Block header = pipes.at(0).getHeader();
for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(sort_columns[i], 1, 1);
#ifndef NDEBUG
if (!sort_description.empty())
/// Union of all parts streams
{
for (size_t i = 0; i < pipes.size(); ++i)
{
auto & pipe = pipes[i];
pipe.addSimpleTransform([&](const Block & header_)
{
auto transform = std::make_shared<CheckSortedTransform>(header_, sort_description);
transform->setDescription(global_ctx->future_part->parts[i]->name);
return transform;
});
}
DataStreams input_streams;
input_streams.reserve(plans.size());
for (auto & plan : plans)
input_streams.emplace_back(plan->getCurrentDataStream());
auto union_step = std::make_unique<UnionStep>(std::move(input_streams));
merge_parts_query_plan.unitePlans(std::move(union_step), std::move(plans));
}
#endif
/// The order of the streams is important: when the key is matched, the elements go in the order of the source stream number.
/// In the merged part, the lines with the same key must be in the ascending order of the identifier of original part,
/// that is going in insertion order.
ProcessorPtr merged_transform;
/// If merge is vertical we cannot calculate it
ctx->blocks_are_granules_size = (global_ctx->chosen_merge_algorithm == MergeAlgorithm::Vertical);
/// There is no sense to have the block size bigger than one granule for merge operations.
const UInt64 merge_block_size_rows = data_settings->merge_max_block_size;
const UInt64 merge_block_size_bytes = data_settings->merge_max_block_size_bytes;
switch (ctx->merging_params.mode)
/// Merge
{
case MergeTreeData::MergingParams::Ordinary:
merged_transform = std::make_shared<MergingSortedTransform>(
header,
pipes.size(),
sort_description,
merge_block_size_rows,
merge_block_size_bytes,
SortingQueueStrategy::Default,
/* limit_= */0,
/* always_read_till_end_= */false,
ctx->rows_sources_write_buf.get(),
ctx->blocks_are_granules_size);
break;
Names sort_columns = global_ctx->metadata_snapshot->getSortingKeyColumns();
SortDescription sort_description;
sort_description.compile_sort_description = global_ctx->data->getContext()->getSettingsRef().compile_sort_description;
sort_description.min_count_to_compile_sort_description = global_ctx->data->getContext()->getSettingsRef().min_count_to_compile_sort_description;
case MergeTreeData::MergingParams::Collapsing:
merged_transform = std::make_shared<CollapsingSortedTransform>(
header, pipes.size(), sort_description, ctx->merging_params.sign_column, false,
merge_block_size_rows, merge_block_size_bytes, ctx->rows_sources_write_buf.get(), ctx->blocks_are_granules_size);
break;
size_t sort_columns_size = sort_columns.size();
sort_description.reserve(sort_columns_size);
case MergeTreeData::MergingParams::Summing:
merged_transform = std::make_shared<SummingSortedTransform>(
header, pipes.size(), sort_description, ctx->merging_params.columns_to_sum, partition_key_columns, merge_block_size_rows, merge_block_size_bytes);
break;
Names partition_key_columns = global_ctx->metadata_snapshot->getPartitionKey().column_names;
case MergeTreeData::MergingParams::Aggregating:
merged_transform = std::make_shared<AggregatingSortedTransform>(header, pipes.size(), sort_description, merge_block_size_rows, merge_block_size_bytes);
break;
for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(sort_columns[i], 1, 1);
case MergeTreeData::MergingParams::Replacing:
if (global_ctx->cleanup && !data_settings->allow_experimental_replacing_merge_with_cleanup)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Experimental merges with CLEANUP are not allowed");
/// If merge is vertical we cannot calculate it
ctx->blocks_are_granules_size = (global_ctx->chosen_merge_algorithm == MergeAlgorithm::Vertical);
merged_transform = std::make_shared<ReplacingSortedTransform>(
header, pipes.size(), sort_description, ctx->merging_params.is_deleted_column, ctx->merging_params.version_column,
merge_block_size_rows, merge_block_size_bytes, ctx->rows_sources_write_buf.get(), ctx->blocks_are_granules_size,
global_ctx->cleanup);
break;
if (global_ctx->cleanup && !data_settings->allow_experimental_replacing_merge_with_cleanup)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Experimental merges with CLEANUP are not allowed");
case MergeTreeData::MergingParams::Graphite:
merged_transform = std::make_shared<GraphiteRollupSortedTransform>(
header, pipes.size(), sort_description, merge_block_size_rows, merge_block_size_bytes,
ctx->merging_params.graphite_params, global_ctx->time_of_merge);
break;
case MergeTreeData::MergingParams::VersionedCollapsing:
merged_transform = std::make_shared<VersionedCollapsingTransform>(
header, pipes.size(), sort_description, ctx->merging_params.sign_column,
merge_block_size_rows, merge_block_size_bytes, ctx->rows_sources_write_buf.get(), ctx->blocks_are_granules_size);
break;
auto merge_step = std::make_unique<ApplyMergeStep>(
merge_parts_query_plan.getCurrentDataStream(),
sort_description,
partition_key_columns,
ctx->merging_params,
ctx->rows_sources_write_buf.get(),
data_settings->merge_max_block_size,
data_settings->merge_max_block_size_bytes,
ctx->blocks_are_granules_size,
global_ctx->cleanup);
merge_step->setStepDescription("Merge sorted parts");
merge_parts_query_plan.addStep(std::move(merge_step));
}
auto builder = std::make_unique<QueryPipelineBuilder>();
builder->init(Pipe::unitePipes(std::move(pipes)));
builder->addTransform(std::move(merged_transform));
#ifndef NDEBUG
if (!sort_description.empty())
{
builder->addSimpleTransform([&](const Block & header_)
{
auto transform = std::make_shared<CheckSortedTransform>(header_, sort_description);
return transform;
});
}
#endif
if (global_ctx->deduplicate)
{
const auto & virtuals = *global_ctx->data->getVirtualsPtr();
@@ -1383,44 +1529,56 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
/// If deduplicate_by_columns is empty, add all columns except virtuals.
if (global_ctx->deduplicate_by_columns.empty())
{
for (const auto & column : global_ctx->merging_columns)
for (const auto & column_name : global_ctx->merging_columns.getNames())
{
if (virtuals.tryGet(column.name, VirtualsKind::Persistent))
if (virtuals.tryGet(column_name, VirtualsKind::Persistent))
continue;
global_ctx->deduplicate_by_columns.emplace_back(column.name);
global_ctx->deduplicate_by_columns.emplace_back(column_name);
}
}
if (DistinctSortedTransform::isApplicable(header, sort_description, global_ctx->deduplicate_by_columns))
builder->addTransform(std::make_shared<DistinctSortedTransform>(
builder->getHeader(), sort_description, SizeLimits(), 0 /*limit_hint*/, global_ctx->deduplicate_by_columns));
else
builder->addTransform(std::make_shared<DistinctTransform>(
builder->getHeader(), SizeLimits(), 0 /*limit_hint*/, global_ctx->deduplicate_by_columns));
auto deduplication_step = std::make_unique<DistinctStep>(
merge_parts_query_plan.getCurrentDataStream(),
SizeLimits(), 0 /*limit_hint*/,
global_ctx->deduplicate_by_columns,
/*pre_distinct=*/ false,
/*optimize_distinct_in_order=*/ true /*TODO: ??*/);
deduplication_step->setStepDescription("Deduplication step");
merge_parts_query_plan.addStep(std::move(deduplication_step));
}
PreparedSets::Subqueries subqueries;
/// TTL step
if (ctx->need_remove_expired_values)
{
auto transform = std::make_shared<TTLTransform>(global_ctx->context, builder->getHeader(), *global_ctx->data, global_ctx->metadata_snapshot, global_ctx->new_data_part, global_ctx->time_of_merge, ctx->force_ttl);
subqueries = transform->getSubqueries();
builder->addTransform(std::move(transform));
auto ttl_step = std::make_unique<TTLStep>(
merge_parts_query_plan.getCurrentDataStream(), global_ctx->context, *global_ctx->data, global_ctx->metadata_snapshot, global_ctx->new_data_part, global_ctx->time_of_merge, ctx->force_ttl);
subqueries = ttl_step->getSubqueries();
ttl_step->setStepDescription("TTL step");
merge_parts_query_plan.addStep(std::move(ttl_step));
}
/// Secondary indices expressions
if (!global_ctx->merging_skip_indexes.empty())
{
builder->addTransform(std::make_shared<ExpressionTransform>(
builder->getHeader(),
global_ctx->merging_skip_indexes.getSingleExpressionForIndices(global_ctx->metadata_snapshot->getColumns(),
global_ctx->data->getContext())));
builder->addTransform(std::make_shared<MaterializingTransform>(builder->getHeader()));
auto indices_expression_dag = global_ctx->merging_skip_indexes.getSingleExpressionForIndices(global_ctx->metadata_snapshot->getColumns(), global_ctx->data->getContext())->getActionsDAG().clone();
auto calculate_indices_expression_step = std::make_unique<ExpressionStep>(
merge_parts_query_plan.getCurrentDataStream(),
std::move(indices_expression_dag));
merge_parts_query_plan.addStep(std::move(calculate_indices_expression_step));
/// TODO: what is the purpose of MaterializingTransform in the original code?
merge_parts_query_plan.addStep(std::make_unique<MaterializingStep>(merge_parts_query_plan.getCurrentDataStream()));
}
if (!subqueries.empty())
builder = addCreatingSetsTransform(std::move(builder), std::move(subqueries), global_ctx->context);
addCreatingSetsStep(merge_parts_query_plan, std::move(subqueries), global_ctx->context);
auto pipelineSettings = BuildQueryPipelineSettings::fromContext(global_ctx->context);
auto builder = merge_parts_query_plan.buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(global_ctx->context),
pipelineSettings);
global_ctx->merged_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
/// Dereference unique_ptr and pass horizontal_stage_progress by reference
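Taken together, the new createMergedStream assembles a plan of roughly the following shape before compiling it (illustrative outline, not literal EXPLAIN output; bracketed steps are conditional):

    ReadFromPart        (one per source part)
      ExpressionStep    [calculate sorting key, if the table has one]
    UnionStep
    ApplyMergeStep      "Merge sorted parts"
    DistinctStep        [if deduplicate]
    TTLStep             [if need_remove_expired_values]
    ExpressionStep      [skip-index expressions], followed by MaterializingStep
    CreatingSetsStep    [if the TTL step produced subqueries]

buildQueryPipeline then turns this plan into global_ctx->merged_pipeline.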

src/Storages/MergeTree/MergeTreeSequentialSource.cpp

@@ -347,8 +347,11 @@ public:
const StorageSnapshotPtr & storage_snapshot_,
MergeTreeData::DataPartPtr data_part_,
Names columns_to_read_,
std::shared_ptr<std::atomic<size_t>> filtered_rows_count_,
bool apply_deleted_mask_,
std::optional<ActionsDAG> filter_,
bool read_with_direct_io_,
bool prefetch_,
ContextPtr context_,
LoggerPtr log_)
: ISourceStep(DataStream{.header = storage_snapshot_->getSampleBlockForColumns(columns_to_read_)})
@@ -357,8 +360,11 @@ public:
, storage_snapshot(storage_snapshot_)
, data_part(std::move(data_part_))
, columns_to_read(std::move(columns_to_read_))
, filtered_rows_count(std::move(filtered_rows_count_))
, apply_deleted_mask(apply_deleted_mask_)
, filter(std::move(filter_))
, read_with_direct_io(read_with_direct_io_)
, prefetch(prefetch_)
, context(std::move(context_))
, log(log_)
{
@@ -401,24 +407,27 @@ public:
data_part,
columns_to_read,
std::move(mark_ranges),
/*filtered_rows_count=*/ nullptr,
filtered_rows_count,
apply_deleted_mask,
/*read_with_direct_io=*/ false,
/*prefetch=*/ false);
read_with_direct_io,
prefetch);
pipeline.init(Pipe(std::move(source)));
}
private:
MergeTreeSequentialSourceType type;
const MergeTreeSequentialSourceType type;
const MergeTreeData & storage;
StorageSnapshotPtr storage_snapshot;
MergeTreeData::DataPartPtr data_part;
Names columns_to_read;
bool apply_deleted_mask;
std::optional<ActionsDAG> filter;
ContextPtr context;
LoggerPtr log;
const StorageSnapshotPtr storage_snapshot;
const MergeTreeData::DataPartPtr data_part;
const Names columns_to_read;
const std::shared_ptr<std::atomic<size_t>> filtered_rows_count;
const bool apply_deleted_mask;
const std::optional<ActionsDAG> filter;
const bool read_with_direct_io;
const bool prefetch;
const ContextPtr context;
const LoggerPtr log;
};
void createReadFromPartStep(
@@ -428,15 +437,27 @@ void createReadFromPartStep(
const StorageSnapshotPtr & storage_snapshot,
MergeTreeData::DataPartPtr data_part,
Names columns_to_read,
std::shared_ptr<std::atomic<size_t>> filtered_rows_count,
bool apply_deleted_mask,
std::optional<ActionsDAG> filter,
bool read_with_direct_io,
bool prefetch,
ContextPtr context,
LoggerPtr log)
{
auto reading = std::make_unique<ReadFromPart>(type,
storage, storage_snapshot, std::move(data_part),
std::move(columns_to_read), apply_deleted_mask,
std::move(filter), std::move(context), log);
auto reading = std::make_unique<ReadFromPart>(
type,
storage,
storage_snapshot,
std::move(data_part),
std::move(columns_to_read),
filtered_rows_count,
apply_deleted_mask,
std::move(filter),
read_with_direct_io,
prefetch,
std::move(context),
log);
plan.addStep(std::move(reading));
}

src/Storages/MergeTree/MergeTreeSequentialSource.h

@@ -37,8 +37,11 @@ void createReadFromPartStep(
const StorageSnapshotPtr & storage_snapshot,
MergeTreeData::DataPartPtr data_part,
Names columns_to_read,
std::shared_ptr<std::atomic<size_t>> filtered_rows_count,
bool apply_deleted_mask,
std::optional<ActionsDAG> filter,
bool read_with_direct_io,
bool prefetch,
ContextPtr context,
LoggerPtr log);