Fix projection merges and mutations.

This commit is contained in:
Amos Bird 2021-09-24 21:57:44 +08:00
parent af30a17822
commit 23d3d894e6
No known key found for this signature in database
GPG Key ID: 80D430DCBECFEDB4
11 changed files with 81 additions and 51 deletions

View File

@ -194,6 +194,7 @@ std::pair<bool, ReplicatedMergeMutateTaskBase::PartLogWriter> MergeFromLogEntryT
future_merged_part,
metadata_snapshot,
merge_mutate_entry.get(),
{} /* projection_merge_list_element */,
table_lock_holder,
entry.create_time,
storage.getContext(),

View File

@ -114,6 +114,8 @@ struct MergeListElement : boost::noncopyable
MergeInfo getInfo() const;
MergeListElement * ptr() { return this; }
~MergeListElement();
};

View File

@ -95,6 +95,7 @@ void MergePlainMergeTreeTask::prepare()
future_part,
metadata_snapshot,
merge_list_entry.get(),
{} /* projection_merge_list_element */,
table_lock_holder,
time(nullptr),
storage.getContext(),

View File

@ -47,21 +47,21 @@ class MergeProgressCallback
{
public:
MergeProgressCallback(
MergeList::Entry & merge_entry_, UInt64 & watch_prev_elapsed_, MergeStageProgress & stage_)
: merge_entry(merge_entry_)
MergeListElement * merge_list_element_ptr_, UInt64 & watch_prev_elapsed_, MergeStageProgress & stage_)
: merge_list_element_ptr(merge_list_element_ptr_)
, watch_prev_elapsed(watch_prev_elapsed_)
, stage(stage_)
{
updateWatch();
}
MergeList::Entry & merge_entry;
MergeListElement * merge_list_element_ptr;
UInt64 & watch_prev_elapsed;
MergeStageProgress & stage;
void updateWatch()
{
UInt64 watch_curr_elapsed = merge_entry->watch.elapsed();
UInt64 watch_curr_elapsed = merge_list_element_ptr->watch.elapsed();
ProfileEvents::increment(ProfileEvents::MergesTimeMilliseconds, (watch_curr_elapsed - watch_prev_elapsed) / 1000000);
watch_prev_elapsed = watch_curr_elapsed;
}
@ -76,15 +76,15 @@ public:
}
updateWatch();
merge_entry->bytes_read_uncompressed += value.read_bytes;
merge_list_element_ptr->bytes_read_uncompressed += value.read_bytes;
if (stage.is_first)
merge_entry->rows_read += value.read_rows;
merge_list_element_ptr->rows_read += value.read_rows;
stage.total_rows += value.total_rows_to_read;
stage.rows_read += value.read_rows;
if (stage.total_rows > 0)
{
merge_entry->progress.store(
merge_list_element_ptr->progress.store(
stage.initial_progress + stage.weight * stage.rows_read / stage.total_rows,
std::memory_order_relaxed);
}

View File

@ -141,7 +141,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
global_ctx->future_part->part_info,
local_single_disk_volume,
local_tmp_prefix + global_ctx->future_part->name + (global_ctx->parent_part ? ".proj" : ""),
global_ctx->parent_part.get());
global_ctx->parent_part);
global_ctx->new_data_part->uuid = global_ctx->future_part->uuid;
global_ctx->new_data_part->setColumns(global_ctx->storage_columns);
@ -171,10 +171,10 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
ctx->need_remove_expired_values = false;
}
ctx->sum_input_rows_upper_bound = (*global_ctx->merge_entry)->total_rows_count;
ctx->sum_compressed_bytes_upper_bound = (*global_ctx->merge_entry)->total_size_bytes_compressed;
ctx->sum_input_rows_upper_bound = global_ctx->merge_list_element_ptr->total_rows_count;
ctx->sum_compressed_bytes_upper_bound = global_ctx->merge_list_element_ptr->total_size_bytes_compressed;
global_ctx->chosen_merge_algorithm = chooseMergeAlgorithm();
(*global_ctx->merge_entry)->merge_algorithm.store(global_ctx->chosen_merge_algorithm, std::memory_order_relaxed);
global_ctx->merge_list_element_ptr->merge_algorithm.store(global_ctx->chosen_merge_algorithm, std::memory_order_relaxed);
LOG_DEBUG(ctx->log, "Selected MergeAlgorithm: {}", toString(global_ctx->chosen_merge_algorithm));
@ -184,7 +184,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
/// the order is reverse. This annoys TSan even though one lock is locked in shared mode and thus
/// deadlock is impossible.
ctx->compression_codec = global_ctx->data->getCompressionCodecForPart(
(*global_ctx->merge_entry)->total_size_bytes_compressed, global_ctx->new_data_part->ttl_infos, global_ctx->time_of_merge);
global_ctx->merge_list_element_ptr->total_size_bytes_compressed, global_ctx->new_data_part->ttl_infos, global_ctx->time_of_merge);
ctx->tmp_disk = global_ctx->context->getTemporaryVolume()->getDisk();
@ -307,8 +307,8 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl()
const_cast<MergedBlockOutputStream &>(*global_ctx->to).write(block);
(*global_ctx->merge_entry)->rows_written = global_ctx->merged_stream->getProfileInfo().rows;
(*global_ctx->merge_entry)->bytes_written_uncompressed = global_ctx->merged_stream->getProfileInfo().bytes;
global_ctx->merge_list_element_ptr->rows_written = global_ctx->merged_stream->getProfileInfo().rows;
global_ctx->merge_list_element_ptr->bytes_written_uncompressed = global_ctx->merged_stream->getProfileInfo().bytes;
/// Reservation updates is not performed yet, during the merge it may lead to higher free space requirements
if (global_ctx->space_reservation && ctx->sum_input_rows_upper_bound)
@ -317,7 +317,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl()
/// But now we are using inaccurate row-based estimation in Horizontal case for backward compatibility
Float64 progress = (global_ctx->chosen_merge_algorithm == MergeAlgorithm::Horizontal)
? std::min(1., 1. * global_ctx->rows_written / ctx->sum_input_rows_upper_bound)
: std::min(1., (*global_ctx->merge_entry)->progress.load(std::memory_order_relaxed));
: std::min(1., global_ctx->merge_list_element_ptr->progress.load(std::memory_order_relaxed));
global_ctx->space_reservation->update(static_cast<size_t>((1. - progress) * ctx->initial_reservation));
}
@ -336,7 +336,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl()
throw Exception("Cancelled merging parts with expired TTL", ErrorCodes::ABORTED);
const auto data_settings = global_ctx->data->getSettings();
const size_t sum_compressed_bytes_upper_bound = (*global_ctx->merge_entry)->total_size_bytes_compressed;
const size_t sum_compressed_bytes_upper_bound = global_ctx->merge_list_element_ptr->total_size_bytes_compressed;
ctx->need_sync = needSyncPart(ctx->sum_input_rows_upper_bound, sum_compressed_bytes_upper_bound, *data_settings);
return false;
@ -349,9 +349,9 @@ bool MergeTask::VerticalMergeStage::prepareVerticalMergeForAllColumns() const
if (global_ctx->chosen_merge_algorithm != MergeAlgorithm::Vertical)
return false;
size_t sum_input_rows_exact = (*global_ctx->merge_entry)->rows_read;
(*global_ctx->merge_entry)->columns_written = global_ctx->merging_column_names.size();
(*global_ctx->merge_entry)->progress.store(ctx->column_sizes->keyColumnsWeight(), std::memory_order_relaxed);
size_t sum_input_rows_exact = global_ctx->merge_list_element_ptr->rows_read;
global_ctx->merge_list_element_ptr->columns_written = global_ctx->merging_column_names.size();
global_ctx->merge_list_element_ptr->progress.store(ctx->column_sizes->keyColumnsWeight(), std::memory_order_relaxed);
ctx->column_part_streams = BlockInputStreams(global_ctx->future_part->parts.size());
@ -385,7 +385,7 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
const String & column_name = ctx->it_name_and_type->name;
Names column_names{column_name};
ctx->progress_before = (*global_ctx->merge_entry)->progress.load(std::memory_order_relaxed);
ctx->progress_before = global_ctx->merge_list_element_ptr->progress.load(std::memory_order_relaxed);
global_ctx->column_progress = std::make_unique<MergeStageProgress>(ctx->progress_before, ctx->column_sizes->columnWeight(column_name));
@ -396,7 +396,7 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
/// Dereference unique_ptr
column_part_source->setProgressCallback(
MergeProgressCallback(*global_ctx->merge_entry, global_ctx->watch_prev_elapsed, *global_ctx->column_progress));
MergeProgressCallback(global_ctx->merge_list_element_ptr, global_ctx->watch_prev_elapsed, *global_ctx->column_progress));
QueryPipeline column_part_pipeline;
column_part_pipeline.init(Pipe(std::move(column_part_source)));
@ -460,9 +460,9 @@ void MergeTask::VerticalMergeStage::finalizeVerticalMergeForOneColumn() const
/// NOTE: 'progress' is modified by single thread, but it may be concurrently read from MergeListElement::getInfo() (StorageSystemMerges).
(*global_ctx->merge_entry)->columns_written += 1;
(*global_ctx->merge_entry)->bytes_written_uncompressed += ctx->column_gathered_stream->getProfileInfo().bytes;
(*global_ctx->merge_entry)->progress.store(ctx->progress_before + ctx->column_sizes->columnWeight(column_name), std::memory_order_relaxed);
global_ctx->merge_list_element_ptr->columns_written += 1;
global_ctx->merge_list_element_ptr->bytes_written_uncompressed += ctx->column_gathered_stream->getProfileInfo().bytes;
global_ctx->merge_list_element_ptr->progress.store(ctx->progress_before + ctx->column_sizes->columnWeight(column_name), std::memory_order_relaxed);
/// This is the external cycle increment.
++ctx->column_num_for_vertical_merge;
@ -487,16 +487,16 @@ bool MergeTask::MergeProjectionsStage::mergeMinMaxIndexAndPrepareProjections() c
/// Print overall profiling info. NOTE: it may duplicates previous messages
{
double elapsed_seconds = (*global_ctx->merge_entry)->watch.elapsedSeconds();
double elapsed_seconds = global_ctx->merge_list_element_ptr->watch.elapsedSeconds();
LOG_DEBUG(ctx->log,
"Merge sorted {} rows, containing {} columns ({} merged, {} gathered) in {} sec., {} rows/sec., {}/sec.",
(*global_ctx->merge_entry)->rows_read,
global_ctx->merge_list_element_ptr->rows_read,
global_ctx->all_column_names.size(),
global_ctx->merging_column_names.size(),
global_ctx->gathering_column_names.size(),
elapsed_seconds,
(*global_ctx->merge_entry)->rows_read / elapsed_seconds,
ReadableSize((*global_ctx->merge_entry)->bytes_read_uncompressed / elapsed_seconds));
global_ctx->merge_list_element_ptr->rows_read / elapsed_seconds,
ReadableSize(global_ctx->merge_list_element_ptr->bytes_read_uncompressed / elapsed_seconds));
}
@ -536,18 +536,18 @@ bool MergeTask::MergeProjectionsStage::mergeMinMaxIndexAndPrepareProjections() c
if (projection.type == ProjectionDescription::Type::Aggregate)
projection_merging_params.mode = MergeTreeData::MergingParams::Aggregating;
// TODO Should we use a new merge_entry for projection?
ctx->tasks_for_projections.emplace_back(std::make_shared<MergeTask>(
projection_future_part,
projection.metadata,
global_ctx->merge_entry,
std::make_unique<MergeListElement>((*global_ctx->merge_entry)->table_id, projection_future_part),
global_ctx->time_of_merge,
global_ctx->context,
global_ctx->space_reservation,
global_ctx->deduplicate,
global_ctx->deduplicate_by_columns,
projection_merging_params,
global_ctx->new_data_part,
global_ctx->new_data_part.get(),
"", // empty string for projection
global_ctx->data,
global_ctx->merges_blocker,
@ -576,21 +576,17 @@ bool MergeTask::MergeProjectionsStage::executeProjections() const
bool MergeTask::MergeProjectionsStage::finalizeProjectionsAndWholeMerge() const
{
const auto & projections = global_ctx->metadata_snapshot->getProjections();
size_t iter = 0;
for (const auto & projection : projections)
for (const auto & task : ctx->tasks_for_projections)
{
auto future = ctx->tasks_for_projections[iter]->getFuture();
++iter;
global_ctx->new_data_part->addProjectionPart(projection.name, future.get());
auto part = task->getFuture().get();
global_ctx->new_data_part->addProjectionPart(part->name, std::move(part));
}
if (global_ctx->chosen_merge_algorithm != MergeAlgorithm::Vertical)
global_ctx->to->writeSuffixAndFinalizePart(global_ctx->new_data_part, ctx->need_sync);
else
global_ctx->to->writeSuffixAndFinalizePart(global_ctx->new_data_part, ctx->need_sync, &global_ctx->storage_columns, &global_ctx->checksums_gathered_columns);
global_ctx->to->writeSuffixAndFinalizePart(
global_ctx->new_data_part, ctx->need_sync, &global_ctx->storage_columns, &global_ctx->checksums_gathered_columns);
global_ctx->promise.set_value(global_ctx->new_data_part);
@ -717,7 +713,7 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
/// Dereference unique_ptr and pass horizontal_stage_progress by reference
input->setProgressCallback(
MergeProgressCallback(*global_ctx->merge_entry, global_ctx->watch_prev_elapsed, *global_ctx->horizontal_stage_progress));
MergeProgressCallback(global_ctx->merge_list_element_ptr, global_ctx->watch_prev_elapsed, *global_ctx->horizontal_stage_progress));
Pipe pipe(std::move(input));
@ -822,7 +818,7 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
MergeAlgorithm MergeTask::ExecuteAndFinalizeHorizontalPart::chooseMergeAlgorithm() const
{
const size_t sum_rows_upper_bound = (*global_ctx->merge_entry)->total_rows_count;
const size_t sum_rows_upper_bound = global_ctx->merge_list_element_ptr->total_rows_count;
const auto data_settings = global_ctx->data->getSettings();
if (global_ctx->deduplicate)

View File

@ -49,13 +49,14 @@ public:
FutureMergedMutatedPartPtr future_part_,
StorageMetadataPtr metadata_snapshot_,
MergeList::Entry * merge_entry_,
std::unique_ptr<MergeListElement> projection_merge_list_element_,
time_t time_of_merge_,
ContextPtr context_,
ReservationSharedPtr space_reservation_,
bool deduplicate_,
Names deduplicate_by_columns_,
MergeTreeData::MergingParams merging_params_,
MergeTreeDataPartPtr parent_part_,
const IMergeTreeDataPart * parent_part_,
String prefix_,
MergeTreeData * data_,
ActionBlocker * merges_blocker_,
@ -66,6 +67,9 @@ public:
global_ctx->future_part = std::move(future_part_);
global_ctx->metadata_snapshot = std::move(metadata_snapshot_);
global_ctx->merge_entry = std::move(merge_entry_);
global_ctx->projection_merge_list_element = std::move(projection_merge_list_element_);
global_ctx->merge_list_element_ptr
= global_ctx->projection_merge_list_element ? global_ctx->projection_merge_list_element.get() : (*global_ctx->merge_entry)->ptr();
global_ctx->time_of_merge = std::move(time_of_merge_);
global_ctx->context = std::move(context_);
global_ctx->space_reservation = std::move(space_reservation_);
@ -112,12 +116,16 @@ private:
struct GlobalRuntimeContext : public IStageRuntimeContext //-V730
{
MergeList::Entry * merge_entry{nullptr};
/// If not null, use this instead of the global MergeList::Entry. This is for merging projections.
std::unique_ptr<MergeListElement> projection_merge_list_element;
MergeListElement * merge_list_element_ptr{nullptr};
MergeTreeData * data{nullptr};
ActionBlocker * merges_blocker{nullptr};
ActionBlocker * ttl_merges_blocker{nullptr};
StorageMetadataPtr metadata_snapshot{nullptr};
FutureMergedMutatedPartPtr future_part{nullptr};
MergeTreeDataPartPtr parent_part{nullptr};
/// This will be either nullptr or new_data_part, so raw pointer is ok.
const IMergeTreeDataPart * parent_part{nullptr};
ContextPtr context{nullptr};
time_t time_of_merge{0};
ReservationSharedPtr space_reservation{nullptr};

View File

@ -416,29 +416,30 @@ MergeTaskPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart(
FutureMergedMutatedPartPtr future_part,
const StorageMetadataPtr & metadata_snapshot,
MergeList::Entry * merge_entry,
TableLockHolder holder,
std::unique_ptr<MergeListElement> projection_merge_list_element,
TableLockHolder,
time_t time_of_merge,
ContextPtr context,
ReservationSharedPtr space_reservation,
bool deduplicate,
const Names & deduplicate_by_columns,
const MergeTreeData::MergingParams & merging_params,
const IMergeTreeDataPart * /*parent_part*/,
const String & /*prefix*/)
const IMergeTreeDataPart * parent_part,
const String & prefix)
{
(void)holder;
return std::make_shared<MergeTask>(
future_part,
const_cast<StorageMetadataPtr &>(metadata_snapshot),
merge_entry,
std::move(projection_merge_list_element),
time_of_merge,
context,
space_reservation,
deduplicate,
deduplicate_by_columns,
merging_params,
nullptr,
"",
parent_part,
prefix,
&data,
&merges_blocker,
&ttl_merges_blocker);

View File

@ -99,6 +99,7 @@ public:
FutureMergedMutatedPartPtr future_part,
const StorageMetadataPtr & metadata_snapshot,
MergeListEntry * merge_entry,
std::unique_ptr<MergeListElement> projection_merge_list_element,
TableLockHolder table_lock_holder,
time_t time_of_merge,
ContextPtr context,

View File

@ -631,8 +631,9 @@ public:
LOG_DEBUG(log, "Merged {} parts in level {} to {}", selected_parts.size(), current_level, projection_future_part->name);
auto tmp_part_merge_task = ctx->mutator->mergePartsToTemporaryPart(
projection_future_part,
ctx->metadata_snapshot,
projection.metadata,
ctx->mutate_entry,
std::make_unique<MergeListElement>((*ctx->mutate_entry)->table_id, projection_future_part),
*ctx->holder,
ctx->time_of_mutation,
ctx->context,
@ -1261,7 +1262,7 @@ bool MutateTask::prepare()
ctx->mutation_kind = ctx->interpreter->getMutationKind();
ctx->mutating_stream = ctx->interpreter->execute();
ctx->updated_header = ctx->interpreter->getUpdatedHeader();
ctx->mutating_stream->setProgressCallback(MergeProgressCallback(*ctx->mutate_entry, ctx->watch_prev_elapsed, *ctx->stage_progress));
ctx->mutating_stream->setProgressCallback(MergeProgressCallback((*ctx->mutate_entry)->ptr(), ctx->watch_prev_elapsed, *ctx->stage_progress));
}
ctx->single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + ctx->future_part->name, ctx->space_reservation->getDisk(), 0);

View File

@ -0,0 +1,19 @@
-- Tags: long
drop table if exists t;
create table t (c1 Int64, c2 String, c3 DateTime, c4 Int8, c5 String, c6 String, c7 String, c8 String, c9 String, c10 String, c11 String, c12 String, c13 Int8, c14 Int64, c15 String, c16 String, c17 String, c18 Int64, c19 Int64, c20 Int64) engine MergeTree order by c18;
insert into t (c1, c18) select number, -number from numbers(2000000);
alter table t add projection p_norm (select * order by c1);
optimize table t final;
alter table t materialize projection p_norm settings mutations_sync = 1;
set allow_experimental_projection_optimization = 1, max_rows_to_read = 3;
select c18 from t where c1 < 0;
drop table t;