Merge pull request #69167 from ClickHouse/query_plan_for_merge

Use QueryPlan for merge
Alexander Gololobov 2024-09-11 14:41:25 +00:00 committed by GitHub
commit f69cb73df0
7 changed files with 549 additions and 207 deletions

View File

@@ -1224,9 +1224,18 @@ void MutationsInterpreter::Source::read(
createReadFromPartStep(
MergeTreeSequentialSourceType::Mutation,
plan, *data, storage_snapshot,
part, alter_conversions, required_columns,
apply_deleted_mask_, std::move(filter), context_,
plan,
*data,
storage_snapshot,
part,
alter_conversions,
required_columns,
/*filtered_rows_count=*/ nullptr,
apply_deleted_mask_,
std::move(filter),
/*read_with_direct_io=*/ false,
/*prefetch=*/ false,
context_,
getLogger("MutationsInterpreter"));
}
else

View File

@@ -38,6 +38,11 @@
#include <Processors/Transforms/DistinctSortedTransform.h>
#include <Processors/Transforms/DistinctTransform.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <Processors/QueryPlan/DistinctStep.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/UnionStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Interpreters/PreparedSets.h>
#include <Interpreters/MergeTreeTransaction.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
@@ -190,7 +195,7 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::extractMergingAndGatheringColu
}
}
bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const
{
ProfileEvents::increment(ProfileEvents::Merge);
@@ -664,7 +669,7 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::constructTaskForProjectionPart
}
bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeMergeProjections() // NOLINT
bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeMergeProjections() const
{
/// If there are no projections, we did not construct a task
if (!ctx->merge_projection_parts_task_ptr)
@@ -683,7 +688,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeMergeProjections() // N
return true;
}
bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl()
bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl() const
{
Stopwatch watch(CLOCK_MONOTONIC_COARSE);
UInt64 step_time_ms = global_ctx->data->getSettings()->background_task_preferred_step_execution_time_ms.totalMilliseconds();
@@ -812,33 +817,154 @@ bool MergeTask::VerticalMergeStage::prepareVerticalMergeForAllColumns() const
ctx->use_prefetch = all_parts_on_remote_disks && global_ctx->data->getSettings()->vertical_merge_remote_filesystem_prefetch;
if (ctx->use_prefetch && ctx->it_name_and_type != global_ctx->gathering_columns.end())
ctx->prepared_pipe = createPipeForReadingOneColumn(ctx->it_name_and_type->name);
ctx->prepared_pipeline = createPipelineForReadingOneColumn(ctx->it_name_and_type->name);
return false;
}
Pipe MergeTask::VerticalMergeStage::createPipeForReadingOneColumn(const String & column_name) const
/// Gathers values of one column from all parts using the rows-sources temporary file
class ColumnGathererStep : public ITransformingStep
{
Pipes pipes;
public:
ColumnGathererStep(
const DataStream & input_stream_,
CompressedReadBufferFromFile * rows_sources_read_buf_,
UInt64 merge_block_size_rows_,
UInt64 merge_block_size_bytes_,
bool is_result_sparse_)
: ITransformingStep(input_stream_, input_stream_.header, getTraits())
, rows_sources_read_buf(rows_sources_read_buf_)
, merge_block_size_rows(merge_block_size_rows_)
, merge_block_size_bytes(merge_block_size_bytes_)
, is_result_sparse(is_result_sparse_)
{}
String getName() const override { return "ColumnGatherer"; }
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & /*pipelineSettings*/) override
{
const auto &header = pipeline.getHeader();
const auto input_streams_count = pipeline.getNumStreams();
rows_sources_read_buf->seek(0, 0);
auto transform = std::make_unique<ColumnGathererTransform>(
header,
input_streams_count,
*rows_sources_read_buf,
merge_block_size_rows,
merge_block_size_bytes,
is_result_sparse);
pipeline.addTransform(std::move(transform));
}
void updateOutputStream() override
{
output_stream = createOutputStream(input_streams.front(), input_streams.front().header, getDataStreamTraits());
}
private:
static Traits getTraits()
{
return ITransformingStep::Traits
{
{
.returns_single_stream = true,
.preserves_number_of_streams = true,
.preserves_sorting = true,
},
{
.preserves_number_of_rows = false,
}
};
}
CompressedReadBufferFromFile * rows_sources_read_buf;
const UInt64 merge_block_size_rows;
const UInt64 merge_block_size_bytes;
const bool is_result_sparse;
};
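For context: the transform wrapped by this step replays the rows-sources byte stream that the horizontal merge stage records, one byte per merged row, to decide which part the next value of the column comes from. A toy, self-contained sketch of that replay (simplified types, not the real ColumnGathererTransform; the actual code streams blocks and rewinds one shared compressed file per column):

#include <cstdint>
#include <vector>

// Toy version of the rows-sources encoding: one byte per merged row, the low bits
// identify the source part, the high bit marks rows that were collapsed away.
struct RowSource
{
    uint8_t data = 0;
    uint8_t sourceNum() const { return data & 0x7F; }
    bool skipped() const { return (data & 0x80) != 0; }
};

// Replay the byte stream to gather one column: every source advances even for
// skipped rows, so per-part positions stay in sync with the horizontal merge.
std::vector<int> gatherOneColumn(
    const std::vector<std::vector<int>> & column_per_part,
    const std::vector<RowSource> & row_sources)
{
    std::vector<size_t> positions(column_per_part.size(), 0);
    std::vector<int> result;
    for (const auto & source : row_sources)
    {
        const size_t pos = positions[source.sourceNum()]++;
        if (!source.skipped())
            result.push_back(column_per_part[source.sourceNum()][pos]);
    }
    return result;
}

Because every per-column pipeline replays the same shared file from the start, columns are gathered sequentially rather than in parallel, which is also noted further down in this diff.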
MergeTask::VerticalMergeRuntimeContext::PreparedColumnPipeline MergeTask::VerticalMergeStage::createPipelineForReadingOneColumn(const String & column_name) const
{
/// Read from all parts
std::vector<QueryPlanPtr> plans;
for (size_t part_num = 0; part_num < global_ctx->future_part->parts.size(); ++part_num)
{
Pipe pipe = createMergeTreeSequentialSource(
auto plan_for_part = std::make_unique<QueryPlan>();
createReadFromPartStep(
MergeTreeSequentialSourceType::Merge,
*plan_for_part,
*global_ctx->data,
global_ctx->storage_snapshot,
global_ctx->future_part->parts[part_num],
global_ctx->alter_conversions[part_num],
Names{column_name},
/*mark_ranges=*/ {},
global_ctx->input_rows_filtered,
/*apply_deleted_mask=*/ true,
/*filter=*/ std::nullopt,
ctx->read_with_direct_io,
ctx->use_prefetch);
ctx->use_prefetch,
global_ctx->context,
getLogger("VerticalMergeStage"));
pipes.emplace_back(std::move(pipe));
plans.emplace_back(std::move(plan_for_part));
}
return Pipe::unitePipes(std::move(pipes));
QueryPlan merge_column_query_plan;
/// Union of all parts streams
{
DataStreams input_streams;
input_streams.reserve(plans.size());
for (auto & plan : plans)
input_streams.emplace_back(plan->getCurrentDataStream());
auto union_step = std::make_unique<UnionStep>(std::move(input_streams));
merge_column_query_plan.unitePlans(std::move(union_step), std::move(plans));
}
/// Add column gatherer step
{
bool is_result_sparse = global_ctx->new_data_part->getSerialization(column_name)->getKind() == ISerialization::Kind::SPARSE;
const auto data_settings = global_ctx->data->getSettings();
auto merge_step = std::make_unique<ColumnGathererStep>(
merge_column_query_plan.getCurrentDataStream(),
ctx->rows_sources_read_buf.get(),
data_settings->merge_max_block_size,
data_settings->merge_max_block_size_bytes,
is_result_sparse);
merge_step->setStepDescription("Gather column");
merge_column_query_plan.addStep(std::move(merge_step));
}
/// Add expression step for indexes
MergeTreeIndices indexes_to_recalc;
IndicesDescription indexes_to_recalc_description;
{
auto indexes_it = global_ctx->skip_indexes_by_column.find(column_name);
if (indexes_it != global_ctx->skip_indexes_by_column.end())
{
indexes_to_recalc_description = indexes_it->second;
indexes_to_recalc = MergeTreeIndexFactory::instance().getMany(indexes_it->second);
auto indices_expression_dag = indexes_it->second.getSingleExpressionForIndices(global_ctx->metadata_snapshot->getColumns(), global_ctx->data->getContext())->getActionsDAG().clone();
indices_expression_dag.addMaterializingOutputActions(); /// Const columns cannot be written without materialization.
auto calculate_indices_expression_step = std::make_unique<ExpressionStep>(
merge_column_query_plan.getCurrentDataStream(),
std::move(indices_expression_dag));
merge_column_query_plan.addStep(std::move(calculate_indices_expression_step));
}
}
auto pipeline_settings = BuildQueryPipelineSettings::fromContext(global_ctx->context);
auto optimization_settings = QueryPlanOptimizationSettings::fromContext(global_ctx->context);
auto builder = merge_column_query_plan.buildQueryPipeline(optimization_settings, pipeline_settings);
return {QueryPipelineBuilder::getPipeline(std::move(*builder)), std::move(indexes_to_recalc)};
}
void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
@@ -848,50 +974,22 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
ctx->progress_before = global_ctx->merge_list_element_ptr->progress.load(std::memory_order_relaxed);
global_ctx->column_progress = std::make_unique<MergeStageProgress>(ctx->progress_before, ctx->column_sizes->columnWeight(column_name));
Pipe pipe;
if (ctx->prepared_pipe)
VerticalMergeRuntimeContext::PreparedColumnPipeline column_pipeline;
if (ctx->prepared_pipeline)
{
pipe = std::move(*ctx->prepared_pipe);
column_pipeline = std::move(*ctx->prepared_pipeline);
/// Prepare next column pipeline to initiate prefetching
auto next_column_it = std::next(ctx->it_name_and_type);
if (next_column_it != global_ctx->gathering_columns.end())
ctx->prepared_pipe = createPipeForReadingOneColumn(next_column_it->name);
ctx->prepared_pipeline = createPipelineForReadingOneColumn(next_column_it->name);
}
else
{
pipe = createPipeForReadingOneColumn(column_name);
column_pipeline = createPipelineForReadingOneColumn(column_name);
}
ctx->rows_sources_read_buf->seek(0, 0);
bool is_result_sparse = global_ctx->new_data_part->getSerialization(column_name)->getKind() == ISerialization::Kind::SPARSE;
const auto data_settings = global_ctx->data->getSettings();
auto transform = std::make_unique<ColumnGathererTransform>(
pipe.getHeader(),
pipe.numOutputPorts(),
*ctx->rows_sources_read_buf,
data_settings->merge_max_block_size,
data_settings->merge_max_block_size_bytes,
is_result_sparse);
pipe.addTransform(std::move(transform));
MergeTreeIndices indexes_to_recalc;
auto indexes_it = global_ctx->skip_indexes_by_column.find(column_name);
if (indexes_it != global_ctx->skip_indexes_by_column.end())
{
indexes_to_recalc = MergeTreeIndexFactory::instance().getMany(indexes_it->second);
pipe.addTransform(std::make_shared<ExpressionTransform>(
pipe.getHeader(),
indexes_it->second.getSingleExpressionForIndices(global_ctx->metadata_snapshot->getColumns(),
global_ctx->data->getContext())));
pipe.addTransform(std::make_shared<MaterializingTransform>(pipe.getHeader()));
}
ctx->column_parts_pipeline = QueryPipeline(std::move(pipe));
ctx->column_parts_pipeline = std::move(column_pipeline.pipeline);
/// Dereference unique_ptr
ctx->column_parts_pipeline.setProgressCallback(MergeProgressCallback(
@@ -909,12 +1007,16 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
global_ctx->metadata_snapshot,
columns_list,
ctx->compression_codec,
indexes_to_recalc,
column_pipeline.indexes_to_recalc,
getStatisticsForColumns(columns_list, global_ctx->metadata_snapshot),
&global_ctx->written_offset_columns,
global_ctx->to->getIndexGranularity());
ctx->column_elems_written = 0;
/// rows_sources_read_buf is reused for each column so we need to rewind it explicitly each time
/// This sharing also prevents us from running merges of individual columns in parallel.
ctx->rows_sources_read_buf->seek(0, 0);
}
@@ -1219,12 +1321,200 @@ bool MergeTask::execute()
}
void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
/// Apply the merge strategy (Ordinary, Collapsing, Aggregating, etc.) to the stream
class MergePartsStep : public ITransformingStep
{
public:
MergePartsStep(
const DataStream & input_stream_,
const SortDescription & sort_description_,
const Names partition_key_columns_,
const MergeTreeData::MergingParams & merging_params_,
WriteBuffer * rows_sources_write_buf_,
UInt64 merge_block_size_rows_,
UInt64 merge_block_size_bytes_,
bool blocks_are_granules_size_,
bool cleanup_,
time_t time_of_merge_)
: ITransformingStep(input_stream_, input_stream_.header, getTraits())
, sort_description(sort_description_)
, partition_key_columns(partition_key_columns_)
, merging_params(merging_params_)
, rows_sources_write_buf(rows_sources_write_buf_)
, merge_block_size_rows(merge_block_size_rows_)
, merge_block_size_bytes(merge_block_size_bytes_)
, blocks_are_granules_size(blocks_are_granules_size_)
, cleanup(cleanup_)
, time_of_merge(time_of_merge_)
{}
String getName() const override { return "MergeParts"; }
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & /*pipelineSettings*/) override
{
/// The order of the streams is important: when keys compare equal, elements go in the order of the source stream number.
/// In the merged part, rows with the same key must appear in ascending order of the original part identifier,
/// that is, in insertion order.
ProcessorPtr merged_transform;
const auto &header = pipeline.getHeader();
const auto input_streams_count = pipeline.getNumStreams();
switch (merging_params.mode)
{
case MergeTreeData::MergingParams::Ordinary:
merged_transform = std::make_shared<MergingSortedTransform>(
header,
input_streams_count,
sort_description,
merge_block_size_rows,
merge_block_size_bytes,
SortingQueueStrategy::Default,
/* limit_= */0,
/* always_read_till_end_= */false,
rows_sources_write_buf,
blocks_are_granules_size);
break;
case MergeTreeData::MergingParams::Collapsing:
merged_transform = std::make_shared<CollapsingSortedTransform>(
header, input_streams_count, sort_description, merging_params.sign_column, false,
merge_block_size_rows, merge_block_size_bytes, rows_sources_write_buf, blocks_are_granules_size);
break;
case MergeTreeData::MergingParams::Summing:
merged_transform = std::make_shared<SummingSortedTransform>(
header, input_streams_count, sort_description, merging_params.columns_to_sum, partition_key_columns, merge_block_size_rows, merge_block_size_bytes);
break;
case MergeTreeData::MergingParams::Aggregating:
merged_transform = std::make_shared<AggregatingSortedTransform>(header, input_streams_count, sort_description, merge_block_size_rows, merge_block_size_bytes);
break;
case MergeTreeData::MergingParams::Replacing:
merged_transform = std::make_shared<ReplacingSortedTransform>(
header, input_streams_count, sort_description, merging_params.is_deleted_column, merging_params.version_column,
merge_block_size_rows, merge_block_size_bytes, rows_sources_write_buf, blocks_are_granules_size,
cleanup);
break;
case MergeTreeData::MergingParams::Graphite:
merged_transform = std::make_shared<GraphiteRollupSortedTransform>(
header, input_streams_count, sort_description, merge_block_size_rows, merge_block_size_bytes,
merging_params.graphite_params, time_of_merge);
break;
case MergeTreeData::MergingParams::VersionedCollapsing:
merged_transform = std::make_shared<VersionedCollapsingTransform>(
header, input_streams_count, sort_description, merging_params.sign_column,
merge_block_size_rows, merge_block_size_bytes, rows_sources_write_buf, blocks_are_granules_size);
break;
}
pipeline.addTransform(std::move(merged_transform));
#ifndef NDEBUG
if (!sort_description.empty())
{
pipeline.addSimpleTransform([&](const Block & header_)
{
auto transform = std::make_shared<CheckSortedTransform>(header_, sort_description);
return transform;
});
}
#endif
}
void updateOutputStream() override
{
output_stream = createOutputStream(input_streams.front(), input_streams.front().header, getDataStreamTraits());
}
private:
static Traits getTraits()
{
return ITransformingStep::Traits
{
{
.returns_single_stream = true,
.preserves_number_of_streams = true,
.preserves_sorting = true,
},
{
.preserves_number_of_rows = false,
}
};
}
const SortDescription sort_description;
const Names partition_key_columns;
const MergeTreeData::MergingParams merging_params{};
WriteBuffer * rows_sources_write_buf;
const UInt64 merge_block_size_rows;
const UInt64 merge_block_size_bytes;
const bool blocks_are_granules_size;
const bool cleanup{false};
const time_t time_of_merge{0};
};
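All of these modes rely on the ordering invariant from the comment in transformPipeline: when keys compare equal, rows are emitted in ascending source-stream (part) order. A toy k-way merge illustrating just that tie-breaking rule (simplified types, not the ClickHouse sorted transforms):

#include <queue>
#include <tuple>
#include <utility>
#include <vector>

// (key, part index, offset within part); tuple comparison orders by key first,
// then by part index, which is exactly the tie-breaking rule described above.
using Entry = std::tuple<int, size_t, size_t>;

std::vector<std::pair<int, size_t>> mergeSortedParts(const std::vector<std::vector<int>> & parts)
{
    std::priority_queue<Entry, std::vector<Entry>, std::greater<>> queue; // min-heap
    for (size_t p = 0; p < parts.size(); ++p)
        if (!parts[p].empty())
            queue.emplace(parts[p][0], p, size_t(0));

    std::vector<std::pair<int, size_t>> result; // (key, originating part)
    while (!queue.empty())
    {
        auto [key, part, offset] = queue.top();
        queue.pop();
        result.emplace_back(key, part); // equal keys come out in part (insertion) order
        if (offset + 1 < parts[part].size())
            queue.emplace(parts[part][offset + 1], part, offset + 1);
    }
    return result;
}

This is why, for Replacing/Collapsing modes, the row kept among duplicates is deterministically the one from the latest-inserted part.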
class TTLStep : public ITransformingStep
{
public:
TTLStep(
const DataStream & input_stream_,
const ContextPtr & context_,
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const MergeTreeData::MutableDataPartPtr & data_part_,
time_t current_time,
bool force_)
: ITransformingStep(input_stream_, input_stream_.header, getTraits())
{
transform = std::make_shared<TTLTransform>(context_, input_stream_.header, storage_, metadata_snapshot_, data_part_, current_time, force_);
subqueries_for_sets = transform->getSubqueries();
}
String getName() const override { return "TTL"; }
PreparedSets::Subqueries getSubqueries() { return std::move(subqueries_for_sets); }
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override
{
pipeline.addTransform(transform);
}
void updateOutputStream() override
{
output_stream = createOutputStream(input_streams.front(), input_streams.front().header, getDataStreamTraits());
}
private:
static Traits getTraits()
{
return ITransformingStep::Traits
{
{
.returns_single_stream = true,
.preserves_number_of_streams = true,
.preserves_sorting = true,
},
{
.preserves_number_of_rows = false,
}
};
}
std::shared_ptr<TTLTransform> transform;
PreparedSets::Subqueries subqueries_for_sets;
};
void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const
{
/** Read from all parts, merge and write into a new one.
* Along the way, we calculate the expression for sorting.
*/
Pipes pipes;
global_ctx->watch_prev_elapsed = 0;
/// We count total amount of bytes in parts
@@ -1251,143 +1541,91 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
global_ctx->horizontal_stage_progress = std::make_unique<MergeStageProgress>(
ctx->column_sizes ? ctx->column_sizes->keyColumnsWeight() : 1.0);
/// Read from all parts
std::vector<QueryPlanPtr> plans;
for (size_t i = 0; i < global_ctx->future_part->parts.size(); ++i)
{
Pipe pipe = createMergeTreeSequentialSource(
if (global_ctx->future_part->parts[i]->getMarksCount() == 0)
LOG_TRACE(ctx->log, "Part {} is empty", global_ctx->future_part->parts[i]->name);
auto plan_for_part = std::make_unique<QueryPlan>();
createReadFromPartStep(
MergeTreeSequentialSourceType::Merge,
*plan_for_part,
*global_ctx->data,
global_ctx->storage_snapshot,
global_ctx->future_part->parts[i],
global_ctx->alter_conversions[i],
global_ctx->merging_columns.getNames(),
/*mark_ranges=*/ {},
global_ctx->input_rows_filtered,
/*apply_deleted_mask=*/ true,
/*filter=*/ std::nullopt,
ctx->read_with_direct_io,
/*prefetch=*/ false);
/*prefetch=*/ false,
global_ctx->context,
ctx->log);
if (global_ctx->metadata_snapshot->hasSortingKey())
{
pipe.addSimpleTransform([this](const Block & header)
{
return std::make_shared<ExpressionTransform>(header, global_ctx->metadata_snapshot->getSortingKey().expression);
});
}
pipes.emplace_back(std::move(pipe));
plans.emplace_back(std::move(plan_for_part));
}
QueryPlan merge_parts_query_plan;
Names sort_columns = global_ctx->metadata_snapshot->getSortingKeyColumns();
SortDescription sort_description;
sort_description.compile_sort_description = global_ctx->data->getContext()->getSettingsRef().compile_sort_description;
sort_description.min_count_to_compile_sort_description = global_ctx->data->getContext()->getSettingsRef().min_count_to_compile_sort_description;
size_t sort_columns_size = sort_columns.size();
sort_description.reserve(sort_columns_size);
Names partition_key_columns = global_ctx->metadata_snapshot->getPartitionKey().column_names;
Block header = pipes.at(0).getHeader();
for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(sort_columns[i], 1, 1);
#ifndef NDEBUG
if (!sort_description.empty())
/// Union of all parts streams
{
for (size_t i = 0; i < pipes.size(); ++i)
{
auto & pipe = pipes[i];
pipe.addSimpleTransform([&](const Block & header_)
{
auto transform = std::make_shared<CheckSortedTransform>(header_, sort_description);
transform->setDescription(global_ctx->future_part->parts[i]->name);
return transform;
});
}
DataStreams input_streams;
input_streams.reserve(plans.size());
for (auto & plan : plans)
input_streams.emplace_back(plan->getCurrentDataStream());
auto union_step = std::make_unique<UnionStep>(std::move(input_streams));
merge_parts_query_plan.unitePlans(std::move(union_step), std::move(plans));
}
#endif
/// The order of the streams is important: when the key is matched, the elements go in the order of the source stream number.
/// In the merged part, the lines with the same key must be in the ascending order of the identifier of original part,
/// that is going in insertion order.
ProcessorPtr merged_transform;
/// If merge is vertical we cannot calculate it
ctx->blocks_are_granules_size = (global_ctx->chosen_merge_algorithm == MergeAlgorithm::Vertical);
/// There is no sense to have the block size bigger than one granule for merge operations.
const UInt64 merge_block_size_rows = data_settings->merge_max_block_size;
const UInt64 merge_block_size_bytes = data_settings->merge_max_block_size_bytes;
switch (ctx->merging_params.mode)
if (global_ctx->metadata_snapshot->hasSortingKey())
{
case MergeTreeData::MergingParams::Ordinary:
merged_transform = std::make_shared<MergingSortedTransform>(
header,
pipes.size(),
sort_description,
merge_block_size_rows,
merge_block_size_bytes,
SortingQueueStrategy::Default,
/* limit_= */0,
/* always_read_till_end_= */false,
ctx->rows_sources_write_buf.get(),
ctx->blocks_are_granules_size);
break;
case MergeTreeData::MergingParams::Collapsing:
merged_transform = std::make_shared<CollapsingSortedTransform>(
header, pipes.size(), sort_description, ctx->merging_params.sign_column, false,
merge_block_size_rows, merge_block_size_bytes, ctx->rows_sources_write_buf.get(), ctx->blocks_are_granules_size);
break;
case MergeTreeData::MergingParams::Summing:
merged_transform = std::make_shared<SummingSortedTransform>(
header, pipes.size(), sort_description, ctx->merging_params.columns_to_sum, partition_key_columns, merge_block_size_rows, merge_block_size_bytes);
break;
case MergeTreeData::MergingParams::Aggregating:
merged_transform = std::make_shared<AggregatingSortedTransform>(header, pipes.size(), sort_description, merge_block_size_rows, merge_block_size_bytes);
break;
case MergeTreeData::MergingParams::Replacing:
if (global_ctx->cleanup && !data_settings->allow_experimental_replacing_merge_with_cleanup)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Experimental merges with CLEANUP are not allowed");
merged_transform = std::make_shared<ReplacingSortedTransform>(
header, pipes.size(), sort_description, ctx->merging_params.is_deleted_column, ctx->merging_params.version_column,
merge_block_size_rows, merge_block_size_bytes, ctx->rows_sources_write_buf.get(), ctx->blocks_are_granules_size,
global_ctx->cleanup);
break;
case MergeTreeData::MergingParams::Graphite:
merged_transform = std::make_shared<GraphiteRollupSortedTransform>(
header, pipes.size(), sort_description, merge_block_size_rows, merge_block_size_bytes,
ctx->merging_params.graphite_params, global_ctx->time_of_merge);
break;
case MergeTreeData::MergingParams::VersionedCollapsing:
merged_transform = std::make_shared<VersionedCollapsingTransform>(
header, pipes.size(), sort_description, ctx->merging_params.sign_column,
merge_block_size_rows, merge_block_size_bytes, ctx->rows_sources_write_buf.get(), ctx->blocks_are_granules_size);
break;
/// Calculate sorting key expressions so that they are available for merge sorting.
auto sorting_key_expression_dag = global_ctx->metadata_snapshot->getSortingKey().expression->getActionsDAG().clone();
auto calculate_sorting_key_expression_step = std::make_unique<ExpressionStep>(
merge_parts_query_plan.getCurrentDataStream(),
std::move(sorting_key_expression_dag));
merge_parts_query_plan.addStep(std::move(calculate_sorting_key_expression_step));
}
auto builder = std::make_unique<QueryPipelineBuilder>();
builder->init(Pipe::unitePipes(std::move(pipes)));
builder->addTransform(std::move(merged_transform));
#ifndef NDEBUG
if (!sort_description.empty())
/// Merge
{
builder->addSimpleTransform([&](const Block & header_)
{
auto transform = std::make_shared<CheckSortedTransform>(header_, sort_description);
return transform;
});
Names sort_columns = global_ctx->metadata_snapshot->getSortingKeyColumns();
SortDescription sort_description;
sort_description.compile_sort_description = global_ctx->data->getContext()->getSettingsRef().compile_sort_description;
sort_description.min_count_to_compile_sort_description = global_ctx->data->getContext()->getSettingsRef().min_count_to_compile_sort_description;
size_t sort_columns_size = sort_columns.size();
sort_description.reserve(sort_columns_size);
Names partition_key_columns = global_ctx->metadata_snapshot->getPartitionKey().column_names;
for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(sort_columns[i], 1, 1);
/// If merge is vertical we cannot calculate it
ctx->blocks_are_granules_size = (global_ctx->chosen_merge_algorithm == MergeAlgorithm::Vertical);
if (global_ctx->cleanup && !data_settings->allow_experimental_replacing_merge_with_cleanup)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Experimental merges with CLEANUP are not allowed");
auto merge_step = std::make_unique<MergePartsStep>(
merge_parts_query_plan.getCurrentDataStream(),
sort_description,
partition_key_columns,
ctx->merging_params,
ctx->rows_sources_write_buf.get(),
data_settings->merge_max_block_size,
data_settings->merge_max_block_size_bytes,
ctx->blocks_are_granules_size,
global_ctx->cleanup,
global_ctx->time_of_merge);
merge_step->setStepDescription("Merge sorted parts");
merge_parts_query_plan.addStep(std::move(merge_step));
}
#endif
if (global_ctx->deduplicate)
{
@@ -1406,37 +1644,50 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
}
}
if (DistinctSortedTransform::isApplicable(header, sort_description, global_ctx->deduplicate_by_columns))
builder->addTransform(std::make_shared<DistinctSortedTransform>(
builder->getHeader(), sort_description, SizeLimits(), 0 /*limit_hint*/, global_ctx->deduplicate_by_columns));
else
builder->addTransform(std::make_shared<DistinctTransform>(
builder->getHeader(), SizeLimits(), 0 /*limit_hint*/, global_ctx->deduplicate_by_columns));
auto deduplication_step = std::make_unique<DistinctStep>(
merge_parts_query_plan.getCurrentDataStream(),
SizeLimits(), 0 /*limit_hint*/,
global_ctx->deduplicate_by_columns,
false /*pre_distinct*/,
true /*optimize_distinct_in_order TODO: looks like it should be enabled*/);
deduplication_step->setStepDescription("Deduplication step");
merge_parts_query_plan.addStep(std::move(deduplication_step));
}
PreparedSets::Subqueries subqueries;
/// TTL step
if (ctx->need_remove_expired_values)
{
auto transform = std::make_shared<TTLTransform>(global_ctx->context, builder->getHeader(), *global_ctx->data, global_ctx->metadata_snapshot, global_ctx->new_data_part, global_ctx->time_of_merge, ctx->force_ttl);
subqueries = transform->getSubqueries();
builder->addTransform(std::move(transform));
auto ttl_step = std::make_unique<TTLStep>(
merge_parts_query_plan.getCurrentDataStream(), global_ctx->context, *global_ctx->data, global_ctx->metadata_snapshot, global_ctx->new_data_part, global_ctx->time_of_merge, ctx->force_ttl);
subqueries = ttl_step->getSubqueries();
ttl_step->setStepDescription("TTL step");
merge_parts_query_plan.addStep(std::move(ttl_step));
}
/// Secondary indices expressions
if (!global_ctx->merging_skip_indexes.empty())
{
builder->addTransform(std::make_shared<ExpressionTransform>(
builder->getHeader(),
global_ctx->merging_skip_indexes.getSingleExpressionForIndices(global_ctx->metadata_snapshot->getColumns(),
global_ctx->data->getContext())));
builder->addTransform(std::make_shared<MaterializingTransform>(builder->getHeader()));
auto indices_expression_dag = global_ctx->merging_skip_indexes.getSingleExpressionForIndices(global_ctx->metadata_snapshot->getColumns(), global_ctx->data->getContext())->getActionsDAG().clone();
indices_expression_dag.addMaterializingOutputActions(); /// Const columns cannot be written without materialization.
auto calculate_indices_expression_step = std::make_unique<ExpressionStep>(
merge_parts_query_plan.getCurrentDataStream(),
std::move(indices_expression_dag));
merge_parts_query_plan.addStep(std::move(calculate_indices_expression_step));
}
if (!subqueries.empty())
builder = addCreatingSetsTransform(std::move(builder), std::move(subqueries), global_ctx->context);
addCreatingSetsStep(merge_parts_query_plan, std::move(subqueries), global_ctx->context);
{
auto pipeline_settings = BuildQueryPipelineSettings::fromContext(global_ctx->context);
auto optimization_settings = QueryPlanOptimizationSettings::fromContext(global_ctx->context);
auto builder = merge_parts_query_plan.buildQueryPipeline(optimization_settings, pipeline_settings);
global_ctx->merged_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
}
global_ctx->merged_pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
/// Dereference unique_ptr and pass horizontal_stage_progress by reference
global_ctx->merged_pipeline.setProgressCallback(MergeProgressCallback(global_ctx->merge_list_element_ptr, global_ctx->watch_prev_elapsed, *global_ctx->horizontal_stage_progress));
/// Is calculated inside MergeProgressCallback.

View File

@@ -269,12 +269,12 @@ private:
{
bool execute() override;
bool prepare();
bool executeImpl();
bool prepare() const;
bool executeImpl() const;
void finalize() const;
/// NOTE: Using pointer-to-member instead of std::function and lambda makes stacktraces much more concise and readable
using ExecuteAndFinalizeHorizontalPartSubtasks = std::array<bool(ExecuteAndFinalizeHorizontalPart::*)(), 3>;
using ExecuteAndFinalizeHorizontalPartSubtasks = std::array<bool (ExecuteAndFinalizeHorizontalPart::*)() const, 3>;
const ExecuteAndFinalizeHorizontalPartSubtasks subtasks
{
@@ -289,10 +289,10 @@ private:
void calculateProjections(const Block & block) const;
void finalizeProjections() const;
void constructTaskForProjectionPartsMerge() const;
bool executeMergeProjections();
bool executeMergeProjections() const;
MergeAlgorithm chooseMergeAlgorithm() const;
void createMergedStream();
void createMergedStream() const;
void extractMergingAndGatheringColumns() const;
void setRuntimeContext(StageRuntimeContextPtr local, StageRuntimeContextPtr global) override
@@ -334,7 +334,16 @@ private:
Float64 progress_before = 0;
std::unique_ptr<MergedColumnOnlyOutputStream> column_to{nullptr};
std::optional<Pipe> prepared_pipe;
/// Used for prefetching. Right before starting the merge of a column we create a pipeline for the next column,
/// which initiates prefetching of the first range of that column (see the sketch below).
struct PreparedColumnPipeline
{
QueryPipeline pipeline;
MergeTreeIndices indexes_to_recalc;
};
std::optional<PreparedColumnPipeline> prepared_pipeline;
size_t max_delayed_streams = 0;
bool use_prefetch = false;
std::list<std::unique_ptr<MergedColumnOnlyOutputStream>> delayed_streams;
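A minimal sketch of the double-buffering pattern described in the comment above (hypothetical helper names, not the MergeTask API): constructing the next column's pipeline is what starts prefetching its first range while the current column is still being merged.

#include <optional>
#include <string>
#include <vector>

struct ColumnPipeline { std::string column; };

// Stand-in for createPipelineForReadingOneColumn(); in the real code, constructing
// the pipeline initiates the prefetch of the column's first read range.
ColumnPipeline buildPipeline(const std::string & column) { return {column}; }

void mergeAllColumns(const std::vector<std::string> & columns)
{
    std::optional<ColumnPipeline> prepared;
    for (size_t i = 0; i < columns.size(); ++i)
    {
        // Take the pipeline prepared on the previous iteration, if any.
        ColumnPipeline current = prepared ? std::move(*prepared) : buildPipeline(columns[i]);
        prepared.reset();

        // Kick off prefetching for the next column before merging the current one.
        if (i + 1 < columns.size())
            prepared = buildPipeline(columns[i + 1]);

        // ... execute `current` to gather and write columns[i] ...
    }
}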
@@ -379,7 +388,7 @@ private:
bool executeVerticalMergeForOneColumn() const;
void finalizeVerticalMergeForOneColumn() const;
Pipe createPipeForReadingOneColumn(const String & column_name) const;
VerticalMergeRuntimeContext::PreparedColumnPipeline createPipelineForReadingOneColumn(const String & column_name) const;
VerticalMergeRuntimeContextPtr ctx;
GlobalRuntimeContextPtr global_ctx;

View File

@@ -354,8 +354,11 @@ public:
MergeTreeData::DataPartPtr data_part_,
AlterConversionsPtr alter_conversions_,
Names columns_to_read_,
std::shared_ptr<std::atomic<size_t>> filtered_rows_count_,
bool apply_deleted_mask_,
std::optional<ActionsDAG> filter_,
bool read_with_direct_io_,
bool prefetch_,
ContextPtr context_,
LoggerPtr log_)
: ISourceStep(DataStream{.header = storage_snapshot_->getSampleBlockForColumns(columns_to_read_)})
@@ -365,8 +368,11 @@ public:
, data_part(std::move(data_part_))
, alter_conversions(std::move(alter_conversions_))
, columns_to_read(std::move(columns_to_read_))
, filtered_rows_count(std::move(filtered_rows_count_))
, apply_deleted_mask(apply_deleted_mask_)
, filter(std::move(filter_))
, read_with_direct_io(read_with_direct_io_)
, prefetch(prefetch_)
, context(std::move(context_))
, log(log_)
{
@@ -410,25 +416,28 @@ public:
alter_conversions,
columns_to_read,
std::move(mark_ranges),
/*filtered_rows_count=*/ nullptr,
filtered_rows_count,
apply_deleted_mask,
/*read_with_direct_io=*/ false,
/*prefetch=*/ false);
read_with_direct_io,
prefetch);
pipeline.init(Pipe(std::move(source)));
}
private:
MergeTreeSequentialSourceType type;
const MergeTreeSequentialSourceType type;
const MergeTreeData & storage;
StorageSnapshotPtr storage_snapshot;
MergeTreeData::DataPartPtr data_part;
AlterConversionsPtr alter_conversions;
Names columns_to_read;
bool apply_deleted_mask;
std::optional<ActionsDAG> filter;
ContextPtr context;
LoggerPtr log;
const StorageSnapshotPtr storage_snapshot;
const MergeTreeData::DataPartPtr data_part;
const AlterConversionsPtr alter_conversions;
const Names columns_to_read;
const std::shared_ptr<std::atomic<size_t>> filtered_rows_count;
const bool apply_deleted_mask;
const std::optional<ActionsDAG> filter;
const bool read_with_direct_io;
const bool prefetch;
const ContextPtr context;
const LoggerPtr log;
};
void createReadFromPartStep(
@@ -439,16 +448,28 @@ void createReadFromPartStep(
MergeTreeData::DataPartPtr data_part,
AlterConversionsPtr alter_conversions,
Names columns_to_read,
std::shared_ptr<std::atomic<size_t>> filtered_rows_count,
bool apply_deleted_mask,
std::optional<ActionsDAG> filter,
bool read_with_direct_io,
bool prefetch,
ContextPtr context,
LoggerPtr log)
{
auto reading = std::make_unique<ReadFromPart>(type,
storage, storage_snapshot,
std::move(data_part), std::move(alter_conversions),
std::move(columns_to_read), apply_deleted_mask,
std::move(filter), std::move(context), log);
auto reading = std::make_unique<ReadFromPart>(
type,
storage,
storage_snapshot,
std::move(data_part),
std::move(alter_conversions),
std::move(columns_to_read),
filtered_rows_count,
apply_deleted_mask,
std::move(filter),
read_with_direct_io,
prefetch,
std::move(context),
log);
plan.addStep(std::move(reading));
}

View File

@@ -39,8 +39,11 @@ void createReadFromPartStep(
MergeTreeData::DataPartPtr data_part,
AlterConversionsPtr alter_conversions,
Names columns_to_read,
std::shared_ptr<std::atomic<size_t>> filtered_rows_count,
bool apply_deleted_mask,
std::optional<ActionsDAG> filter,
bool read_with_direct_io,
bool prefetch,
ContextPtr context,
LoggerPtr log);

View File

@@ -0,0 +1,4 @@
key Sparse
value Sparse
1000
1

View File

@@ -0,0 +1,45 @@
DROP TABLE IF EXISTS t_bloom_filter;
CREATE TABLE t_bloom_filter(
key UInt64,
value UInt64,
INDEX key_bf key TYPE bloom_filter(0.01) GRANULARITY 2147483648, -- bloom filter on a sorting key column
INDEX value_bf value TYPE bloom_filter(0.01) GRANULARITY 2147483648 -- bloom filter on a non-sorting column
) ENGINE=MergeTree ORDER BY key
SETTINGS
-- settings to trigger sparse serialization and vertical merge
ratio_of_defaults_for_sparse_serialization = 0.0
,vertical_merge_algorithm_min_rows_to_activate = 1
,vertical_merge_algorithm_min_columns_to_activate = 1
,allow_vertical_merges_from_compact_to_wide_parts = 1
,min_bytes_for_wide_part=0
;
SYSTEM STOP MERGES t_bloom_filter;
-- Create at least one part
INSERT INTO t_bloom_filter
SELECT
number % 100 as key, -- 100 unique keys
rand() % 100 as value -- 100 unique values
FROM numbers(50_000);
-- And another part
INSERT INTO t_bloom_filter
SELECT
number % 100 as key, -- 100 unique keys
rand() % 100 as value -- 100 unique values
FROM numbers(50_000, 50_000);
SYSTEM START MERGES t_bloom_filter;
-- Merge everything into a single part
OPTIMIZE TABLE t_bloom_filter FINAL;
-- Check sparse serialization
SELECT column, serialization_kind FROM system.parts_columns WHERE database = currentDatabase() AND table = 't_bloom_filter' AND active ORDER BY column;
SELECT COUNT() FROM t_bloom_filter WHERE key = 1;
-- Check bloom filter non-zero size
SELECT COUNT() FROM system.parts WHERE database = currentDatabase() AND table = 't_bloom_filter' AND secondary_indices_uncompressed_bytes > 200 AND active;