From 23d3d894e683c1a2d61a2a0c2d315d5477d8c55c Mon Sep 17 00:00:00 2001 From: Amos Bird Date: Fri, 24 Sep 2021 21:57:44 +0800 Subject: [PATCH] Fix projection merges and mutations. --- .../MergeTree/MergeFromLogEntryTask.cpp | 1 + src/Storages/MergeTree/MergeList.h | 2 + .../MergeTree/MergePlainMergeTreeTask.cpp | 1 + src/Storages/MergeTree/MergeProgress.h | 14 ++-- src/Storages/MergeTree/MergeTask.cpp | 64 +++++++++---------- src/Storages/MergeTree/MergeTask.h | 12 +++- .../MergeTree/MergeTreeDataMergerMutator.cpp | 13 ++-- .../MergeTree/MergeTreeDataMergerMutator.h | 1 + src/Storages/MergeTree/MutateTask.cpp | 5 +- ...01710_projection_vertical_merges.reference | 0 .../01710_projection_vertical_merges.sql | 19 ++++++ 11 files changed, 81 insertions(+), 51 deletions(-) create mode 100644 tests/queries/0_stateless/01710_projection_vertical_merges.reference create mode 100644 tests/queries/0_stateless/01710_projection_vertical_merges.sql diff --git a/src/Storages/MergeTree/MergeFromLogEntryTask.cpp b/src/Storages/MergeTree/MergeFromLogEntryTask.cpp index d9d691ab2e7..3a495fccc0e 100644 --- a/src/Storages/MergeTree/MergeFromLogEntryTask.cpp +++ b/src/Storages/MergeTree/MergeFromLogEntryTask.cpp @@ -194,6 +194,7 @@ std::pair MergeFromLogEntryT future_merged_part, metadata_snapshot, merge_mutate_entry.get(), + {} /* projection_merge_list_element */, table_lock_holder, entry.create_time, storage.getContext(), diff --git a/src/Storages/MergeTree/MergeList.h b/src/Storages/MergeTree/MergeList.h index bab4420de83..24df4ba5e42 100644 --- a/src/Storages/MergeTree/MergeList.h +++ b/src/Storages/MergeTree/MergeList.h @@ -114,6 +114,8 @@ struct MergeListElement : boost::noncopyable MergeInfo getInfo() const; + MergeListElement * ptr() { return this; } + ~MergeListElement(); }; diff --git a/src/Storages/MergeTree/MergePlainMergeTreeTask.cpp b/src/Storages/MergeTree/MergePlainMergeTreeTask.cpp index e8770fdc76e..d52ffe32f7f 100644 --- a/src/Storages/MergeTree/MergePlainMergeTreeTask.cpp +++ b/src/Storages/MergeTree/MergePlainMergeTreeTask.cpp @@ -95,6 +95,7 @@ void MergePlainMergeTreeTask::prepare() future_part, metadata_snapshot, merge_list_entry.get(), + {} /* projection_merge_list_element */, table_lock_holder, time(nullptr), storage.getContext(), diff --git a/src/Storages/MergeTree/MergeProgress.h b/src/Storages/MergeTree/MergeProgress.h index 2862a934411..d21edac76df 100644 --- a/src/Storages/MergeTree/MergeProgress.h +++ b/src/Storages/MergeTree/MergeProgress.h @@ -47,21 +47,21 @@ class MergeProgressCallback { public: MergeProgressCallback( - MergeList::Entry & merge_entry_, UInt64 & watch_prev_elapsed_, MergeStageProgress & stage_) - : merge_entry(merge_entry_) + MergeListElement * merge_list_element_ptr_, UInt64 & watch_prev_elapsed_, MergeStageProgress & stage_) + : merge_list_element_ptr(merge_list_element_ptr_) , watch_prev_elapsed(watch_prev_elapsed_) , stage(stage_) { updateWatch(); } - MergeList::Entry & merge_entry; + MergeListElement * merge_list_element_ptr; UInt64 & watch_prev_elapsed; MergeStageProgress & stage; void updateWatch() { - UInt64 watch_curr_elapsed = merge_entry->watch.elapsed(); + UInt64 watch_curr_elapsed = merge_list_element_ptr->watch.elapsed(); ProfileEvents::increment(ProfileEvents::MergesTimeMilliseconds, (watch_curr_elapsed - watch_prev_elapsed) / 1000000); watch_prev_elapsed = watch_curr_elapsed; } @@ -76,15 +76,15 @@ public: } updateWatch(); - merge_entry->bytes_read_uncompressed += value.read_bytes; + merge_list_element_ptr->bytes_read_uncompressed += value.read_bytes; if (stage.is_first) - merge_entry->rows_read += value.read_rows; + merge_list_element_ptr->rows_read += value.read_rows; stage.total_rows += value.total_rows_to_read; stage.rows_read += value.read_rows; if (stage.total_rows > 0) { - merge_entry->progress.store( + merge_list_element_ptr->progress.store( stage.initial_progress + stage.weight * stage.rows_read / stage.total_rows, std::memory_order_relaxed); } diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index 855198f697e..c22d83275a4 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -141,7 +141,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() global_ctx->future_part->part_info, local_single_disk_volume, local_tmp_prefix + global_ctx->future_part->name + (global_ctx->parent_part ? ".proj" : ""), - global_ctx->parent_part.get()); + global_ctx->parent_part); global_ctx->new_data_part->uuid = global_ctx->future_part->uuid; global_ctx->new_data_part->setColumns(global_ctx->storage_columns); @@ -171,10 +171,10 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() ctx->need_remove_expired_values = false; } - ctx->sum_input_rows_upper_bound = (*global_ctx->merge_entry)->total_rows_count; - ctx->sum_compressed_bytes_upper_bound = (*global_ctx->merge_entry)->total_size_bytes_compressed; + ctx->sum_input_rows_upper_bound = global_ctx->merge_list_element_ptr->total_rows_count; + ctx->sum_compressed_bytes_upper_bound = global_ctx->merge_list_element_ptr->total_size_bytes_compressed; global_ctx->chosen_merge_algorithm = chooseMergeAlgorithm(); - (*global_ctx->merge_entry)->merge_algorithm.store(global_ctx->chosen_merge_algorithm, std::memory_order_relaxed); + global_ctx->merge_list_element_ptr->merge_algorithm.store(global_ctx->chosen_merge_algorithm, std::memory_order_relaxed); LOG_DEBUG(ctx->log, "Selected MergeAlgorithm: {}", toString(global_ctx->chosen_merge_algorithm)); @@ -184,7 +184,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() /// the order is reverse. This annoys TSan even though one lock is locked in shared mode and thus /// deadlock is impossible. ctx->compression_codec = global_ctx->data->getCompressionCodecForPart( - (*global_ctx->merge_entry)->total_size_bytes_compressed, global_ctx->new_data_part->ttl_infos, global_ctx->time_of_merge); + global_ctx->merge_list_element_ptr->total_size_bytes_compressed, global_ctx->new_data_part->ttl_infos, global_ctx->time_of_merge); ctx->tmp_disk = global_ctx->context->getTemporaryVolume()->getDisk(); @@ -307,8 +307,8 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl() const_cast(*global_ctx->to).write(block); - (*global_ctx->merge_entry)->rows_written = global_ctx->merged_stream->getProfileInfo().rows; - (*global_ctx->merge_entry)->bytes_written_uncompressed = global_ctx->merged_stream->getProfileInfo().bytes; + global_ctx->merge_list_element_ptr->rows_written = global_ctx->merged_stream->getProfileInfo().rows; + global_ctx->merge_list_element_ptr->bytes_written_uncompressed = global_ctx->merged_stream->getProfileInfo().bytes; /// Reservation updates is not performed yet, during the merge it may lead to higher free space requirements if (global_ctx->space_reservation && ctx->sum_input_rows_upper_bound) @@ -317,7 +317,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl() /// But now we are using inaccurate row-based estimation in Horizontal case for backward compatibility Float64 progress = (global_ctx->chosen_merge_algorithm == MergeAlgorithm::Horizontal) ? std::min(1., 1. * global_ctx->rows_written / ctx->sum_input_rows_upper_bound) - : std::min(1., (*global_ctx->merge_entry)->progress.load(std::memory_order_relaxed)); + : std::min(1., global_ctx->merge_list_element_ptr->progress.load(std::memory_order_relaxed)); global_ctx->space_reservation->update(static_cast((1. - progress) * ctx->initial_reservation)); } @@ -336,7 +336,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl() throw Exception("Cancelled merging parts with expired TTL", ErrorCodes::ABORTED); const auto data_settings = global_ctx->data->getSettings(); - const size_t sum_compressed_bytes_upper_bound = (*global_ctx->merge_entry)->total_size_bytes_compressed; + const size_t sum_compressed_bytes_upper_bound = global_ctx->merge_list_element_ptr->total_size_bytes_compressed; ctx->need_sync = needSyncPart(ctx->sum_input_rows_upper_bound, sum_compressed_bytes_upper_bound, *data_settings); return false; @@ -349,9 +349,9 @@ bool MergeTask::VerticalMergeStage::prepareVerticalMergeForAllColumns() const if (global_ctx->chosen_merge_algorithm != MergeAlgorithm::Vertical) return false; - size_t sum_input_rows_exact = (*global_ctx->merge_entry)->rows_read; - (*global_ctx->merge_entry)->columns_written = global_ctx->merging_column_names.size(); - (*global_ctx->merge_entry)->progress.store(ctx->column_sizes->keyColumnsWeight(), std::memory_order_relaxed); + size_t sum_input_rows_exact = global_ctx->merge_list_element_ptr->rows_read; + global_ctx->merge_list_element_ptr->columns_written = global_ctx->merging_column_names.size(); + global_ctx->merge_list_element_ptr->progress.store(ctx->column_sizes->keyColumnsWeight(), std::memory_order_relaxed); ctx->column_part_streams = BlockInputStreams(global_ctx->future_part->parts.size()); @@ -385,7 +385,7 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const const String & column_name = ctx->it_name_and_type->name; Names column_names{column_name}; - ctx->progress_before = (*global_ctx->merge_entry)->progress.load(std::memory_order_relaxed); + ctx->progress_before = global_ctx->merge_list_element_ptr->progress.load(std::memory_order_relaxed); global_ctx->column_progress = std::make_unique(ctx->progress_before, ctx->column_sizes->columnWeight(column_name)); @@ -396,7 +396,7 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const /// Dereference unique_ptr column_part_source->setProgressCallback( - MergeProgressCallback(*global_ctx->merge_entry, global_ctx->watch_prev_elapsed, *global_ctx->column_progress)); + MergeProgressCallback(global_ctx->merge_list_element_ptr, global_ctx->watch_prev_elapsed, *global_ctx->column_progress)); QueryPipeline column_part_pipeline; column_part_pipeline.init(Pipe(std::move(column_part_source))); @@ -460,9 +460,9 @@ void MergeTask::VerticalMergeStage::finalizeVerticalMergeForOneColumn() const /// NOTE: 'progress' is modified by single thread, but it may be concurrently read from MergeListElement::getInfo() (StorageSystemMerges). - (*global_ctx->merge_entry)->columns_written += 1; - (*global_ctx->merge_entry)->bytes_written_uncompressed += ctx->column_gathered_stream->getProfileInfo().bytes; - (*global_ctx->merge_entry)->progress.store(ctx->progress_before + ctx->column_sizes->columnWeight(column_name), std::memory_order_relaxed); + global_ctx->merge_list_element_ptr->columns_written += 1; + global_ctx->merge_list_element_ptr->bytes_written_uncompressed += ctx->column_gathered_stream->getProfileInfo().bytes; + global_ctx->merge_list_element_ptr->progress.store(ctx->progress_before + ctx->column_sizes->columnWeight(column_name), std::memory_order_relaxed); /// This is the external cycle increment. ++ctx->column_num_for_vertical_merge; @@ -487,16 +487,16 @@ bool MergeTask::MergeProjectionsStage::mergeMinMaxIndexAndPrepareProjections() c /// Print overall profiling info. NOTE: it may duplicates previous messages { - double elapsed_seconds = (*global_ctx->merge_entry)->watch.elapsedSeconds(); + double elapsed_seconds = global_ctx->merge_list_element_ptr->watch.elapsedSeconds(); LOG_DEBUG(ctx->log, "Merge sorted {} rows, containing {} columns ({} merged, {} gathered) in {} sec., {} rows/sec., {}/sec.", - (*global_ctx->merge_entry)->rows_read, + global_ctx->merge_list_element_ptr->rows_read, global_ctx->all_column_names.size(), global_ctx->merging_column_names.size(), global_ctx->gathering_column_names.size(), elapsed_seconds, - (*global_ctx->merge_entry)->rows_read / elapsed_seconds, - ReadableSize((*global_ctx->merge_entry)->bytes_read_uncompressed / elapsed_seconds)); + global_ctx->merge_list_element_ptr->rows_read / elapsed_seconds, + ReadableSize(global_ctx->merge_list_element_ptr->bytes_read_uncompressed / elapsed_seconds)); } @@ -536,18 +536,18 @@ bool MergeTask::MergeProjectionsStage::mergeMinMaxIndexAndPrepareProjections() c if (projection.type == ProjectionDescription::Type::Aggregate) projection_merging_params.mode = MergeTreeData::MergingParams::Aggregating; - // TODO Should we use a new merge_entry for projection? ctx->tasks_for_projections.emplace_back(std::make_shared( projection_future_part, projection.metadata, global_ctx->merge_entry, + std::make_unique((*global_ctx->merge_entry)->table_id, projection_future_part), global_ctx->time_of_merge, global_ctx->context, global_ctx->space_reservation, global_ctx->deduplicate, global_ctx->deduplicate_by_columns, projection_merging_params, - global_ctx->new_data_part, + global_ctx->new_data_part.get(), "", // empty string for projection global_ctx->data, global_ctx->merges_blocker, @@ -576,21 +576,17 @@ bool MergeTask::MergeProjectionsStage::executeProjections() const bool MergeTask::MergeProjectionsStage::finalizeProjectionsAndWholeMerge() const { - const auto & projections = global_ctx->metadata_snapshot->getProjections(); - - size_t iter = 0; - - for (const auto & projection : projections) + for (const auto & task : ctx->tasks_for_projections) { - auto future = ctx->tasks_for_projections[iter]->getFuture(); - ++iter; - global_ctx->new_data_part->addProjectionPart(projection.name, future.get()); + auto part = task->getFuture().get(); + global_ctx->new_data_part->addProjectionPart(part->name, std::move(part)); } if (global_ctx->chosen_merge_algorithm != MergeAlgorithm::Vertical) global_ctx->to->writeSuffixAndFinalizePart(global_ctx->new_data_part, ctx->need_sync); else - global_ctx->to->writeSuffixAndFinalizePart(global_ctx->new_data_part, ctx->need_sync, &global_ctx->storage_columns, &global_ctx->checksums_gathered_columns); + global_ctx->to->writeSuffixAndFinalizePart( + global_ctx->new_data_part, ctx->need_sync, &global_ctx->storage_columns, &global_ctx->checksums_gathered_columns); global_ctx->promise.set_value(global_ctx->new_data_part); @@ -717,7 +713,7 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() /// Dereference unique_ptr and pass horizontal_stage_progress by reference input->setProgressCallback( - MergeProgressCallback(*global_ctx->merge_entry, global_ctx->watch_prev_elapsed, *global_ctx->horizontal_stage_progress)); + MergeProgressCallback(global_ctx->merge_list_element_ptr, global_ctx->watch_prev_elapsed, *global_ctx->horizontal_stage_progress)); Pipe pipe(std::move(input)); @@ -822,7 +818,7 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() MergeAlgorithm MergeTask::ExecuteAndFinalizeHorizontalPart::chooseMergeAlgorithm() const { - const size_t sum_rows_upper_bound = (*global_ctx->merge_entry)->total_rows_count; + const size_t sum_rows_upper_bound = global_ctx->merge_list_element_ptr->total_rows_count; const auto data_settings = global_ctx->data->getSettings(); if (global_ctx->deduplicate) diff --git a/src/Storages/MergeTree/MergeTask.h b/src/Storages/MergeTree/MergeTask.h index e6bee9a16a3..54b0255fd5c 100644 --- a/src/Storages/MergeTree/MergeTask.h +++ b/src/Storages/MergeTree/MergeTask.h @@ -49,13 +49,14 @@ public: FutureMergedMutatedPartPtr future_part_, StorageMetadataPtr metadata_snapshot_, MergeList::Entry * merge_entry_, + std::unique_ptr projection_merge_list_element_, time_t time_of_merge_, ContextPtr context_, ReservationSharedPtr space_reservation_, bool deduplicate_, Names deduplicate_by_columns_, MergeTreeData::MergingParams merging_params_, - MergeTreeDataPartPtr parent_part_, + const IMergeTreeDataPart * parent_part_, String prefix_, MergeTreeData * data_, ActionBlocker * merges_blocker_, @@ -66,6 +67,9 @@ public: global_ctx->future_part = std::move(future_part_); global_ctx->metadata_snapshot = std::move(metadata_snapshot_); global_ctx->merge_entry = std::move(merge_entry_); + global_ctx->projection_merge_list_element = std::move(projection_merge_list_element_); + global_ctx->merge_list_element_ptr + = global_ctx->projection_merge_list_element ? global_ctx->projection_merge_list_element.get() : (*global_ctx->merge_entry)->ptr(); global_ctx->time_of_merge = std::move(time_of_merge_); global_ctx->context = std::move(context_); global_ctx->space_reservation = std::move(space_reservation_); @@ -112,12 +116,16 @@ private: struct GlobalRuntimeContext : public IStageRuntimeContext //-V730 { MergeList::Entry * merge_entry{nullptr}; + /// If not null, use this instead of the global MergeList::Entry. This is for merging projections. + std::unique_ptr projection_merge_list_element; + MergeListElement * merge_list_element_ptr{nullptr}; MergeTreeData * data{nullptr}; ActionBlocker * merges_blocker{nullptr}; ActionBlocker * ttl_merges_blocker{nullptr}; StorageMetadataPtr metadata_snapshot{nullptr}; FutureMergedMutatedPartPtr future_part{nullptr}; - MergeTreeDataPartPtr parent_part{nullptr}; + /// This will be either nullptr or new_data_part, so raw pointer is ok. + const IMergeTreeDataPart * parent_part{nullptr}; ContextPtr context{nullptr}; time_t time_of_merge{0}; ReservationSharedPtr space_reservation{nullptr}; diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index bf3d98e3fcf..0c97598dc37 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -416,29 +416,30 @@ MergeTaskPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart( FutureMergedMutatedPartPtr future_part, const StorageMetadataPtr & metadata_snapshot, MergeList::Entry * merge_entry, - TableLockHolder holder, + std::unique_ptr projection_merge_list_element, + TableLockHolder, time_t time_of_merge, ContextPtr context, ReservationSharedPtr space_reservation, bool deduplicate, const Names & deduplicate_by_columns, const MergeTreeData::MergingParams & merging_params, - const IMergeTreeDataPart * /*parent_part*/, - const String & /*prefix*/) + const IMergeTreeDataPart * parent_part, + const String & prefix) { - (void)holder; return std::make_shared( future_part, const_cast(metadata_snapshot), merge_entry, + std::move(projection_merge_list_element), time_of_merge, context, space_reservation, deduplicate, deduplicate_by_columns, merging_params, - nullptr, - "", + parent_part, + prefix, &data, &merges_blocker, &ttl_merges_blocker); diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.h b/src/Storages/MergeTree/MergeTreeDataMergerMutator.h index cda8cfd2c57..82a7b541369 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.h +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.h @@ -99,6 +99,7 @@ public: FutureMergedMutatedPartPtr future_part, const StorageMetadataPtr & metadata_snapshot, MergeListEntry * merge_entry, + std::unique_ptr projection_merge_list_element, TableLockHolder table_lock_holder, time_t time_of_merge, ContextPtr context, diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index f48a9285c53..0655806bf0e 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -631,8 +631,9 @@ public: LOG_DEBUG(log, "Merged {} parts in level {} to {}", selected_parts.size(), current_level, projection_future_part->name); auto tmp_part_merge_task = ctx->mutator->mergePartsToTemporaryPart( projection_future_part, - ctx->metadata_snapshot, + projection.metadata, ctx->mutate_entry, + std::make_unique((*ctx->mutate_entry)->table_id, projection_future_part), *ctx->holder, ctx->time_of_mutation, ctx->context, @@ -1261,7 +1262,7 @@ bool MutateTask::prepare() ctx->mutation_kind = ctx->interpreter->getMutationKind(); ctx->mutating_stream = ctx->interpreter->execute(); ctx->updated_header = ctx->interpreter->getUpdatedHeader(); - ctx->mutating_stream->setProgressCallback(MergeProgressCallback(*ctx->mutate_entry, ctx->watch_prev_elapsed, *ctx->stage_progress)); + ctx->mutating_stream->setProgressCallback(MergeProgressCallback((*ctx->mutate_entry)->ptr(), ctx->watch_prev_elapsed, *ctx->stage_progress)); } ctx->single_disk_volume = std::make_shared("volume_" + ctx->future_part->name, ctx->space_reservation->getDisk(), 0); diff --git a/tests/queries/0_stateless/01710_projection_vertical_merges.reference b/tests/queries/0_stateless/01710_projection_vertical_merges.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/01710_projection_vertical_merges.sql b/tests/queries/0_stateless/01710_projection_vertical_merges.sql new file mode 100644 index 00000000000..cb06f473b44 --- /dev/null +++ b/tests/queries/0_stateless/01710_projection_vertical_merges.sql @@ -0,0 +1,19 @@ +-- Tags: long + +drop table if exists t; + +create table t (c1 Int64, c2 String, c3 DateTime, c4 Int8, c5 String, c6 String, c7 String, c8 String, c9 String, c10 String, c11 String, c12 String, c13 Int8, c14 Int64, c15 String, c16 String, c17 String, c18 Int64, c19 Int64, c20 Int64) engine MergeTree order by c18; + +insert into t (c1, c18) select number, -number from numbers(2000000); + +alter table t add projection p_norm (select * order by c1); + +optimize table t final; + +alter table t materialize projection p_norm settings mutations_sync = 1; + +set allow_experimental_projection_optimization = 1, max_rows_to_read = 3; + +select c18 from t where c1 < 0; + +drop table t;