From 3aef79b5a265802c536195653469e8d3f88128ec Mon Sep 17 00:00:00 2001 From: Alexander Gololobov Date: Wed, 11 Sep 2024 15:45:55 +0200 Subject: [PATCH] Backport changes from private to minimize merge conflicts --- src/Storages/MergeTree/MergeTask.cpp | 37 ++++++++++++++-------------- src/Storages/MergeTree/MergeTask.h | 18 +++++--------- 2 files changed, 25 insertions(+), 30 deletions(-) diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index 5c993504245..e04d6afbf7a 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -125,19 +125,19 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::extractMergingAndGatheringColu std::set key_columns(sort_key_columns_vec.cbegin(), sort_key_columns_vec.cend()); /// Force sign column for Collapsing mode - if (ctx->merging_params.mode == MergeTreeData::MergingParams::Collapsing) - key_columns.emplace(ctx->merging_params.sign_column); + if (global_ctx->merging_params.mode == MergeTreeData::MergingParams::Collapsing) + key_columns.emplace(global_ctx->merging_params.sign_column); /// Force version column for Replacing mode - if (ctx->merging_params.mode == MergeTreeData::MergingParams::Replacing) + if (global_ctx->merging_params.mode == MergeTreeData::MergingParams::Replacing) { - key_columns.emplace(ctx->merging_params.is_deleted_column); - key_columns.emplace(ctx->merging_params.version_column); + key_columns.emplace(global_ctx->merging_params.is_deleted_column); + key_columns.emplace(global_ctx->merging_params.version_column); } /// Force sign column for VersionedCollapsing mode. Version is already in primary key. - if (ctx->merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing) - key_columns.emplace(ctx->merging_params.sign_column); + if (global_ctx->merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing) + key_columns.emplace(global_ctx->merging_params.sign_column); /// Force to merge at least one column in case of empty key if (key_columns.empty()) @@ -206,7 +206,8 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const // E.g. `proj_a.proj` for a normal projection merge and `proj_a.tmp_proj` for a projection materialization merge. local_tmp_prefix = global_ctx->parent_part ? "" : "tmp_merge_"; } - const String local_tmp_suffix = global_ctx->parent_part ? ctx->suffix : ""; + + const String local_tmp_suffix = global_ctx->parent_part ? global_ctx->suffix : ""; if (global_ctx->merges_blocker->isCancelled() || global_ctx->merge_list_element_ptr->is_cancelled.load(std::memory_order_relaxed)) throw Exception(ErrorCodes::ABORTED, "Cancelled merging parts"); @@ -231,7 +232,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const LOG_DEBUG(ctx->log, "DEDUPLICATE BY ('{}')", fmt::join(global_ctx->deduplicate_by_columns, "', '")); } - ctx->disk = global_ctx->space_reservation->getDisk(); + global_ctx->disk = global_ctx->space_reservation->getDisk(); auto local_tmp_part_basename = local_tmp_prefix + global_ctx->future_part->name + local_tmp_suffix; std::optional builder; @@ -243,7 +244,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const } else { - auto local_single_disk_volume = std::make_shared("volume_" + global_ctx->future_part->name, ctx->disk, 0); + auto local_single_disk_volume = std::make_shared("volume_" + global_ctx->future_part->name, global_ctx->disk, 0); builder.emplace(global_ctx->data->getDataPartBuilder(global_ctx->future_part->name, local_single_disk_volume, local_tmp_part_basename)); builder->withPartStorageType(global_ctx->future_part->part_format.storage_type); } @@ -559,9 +560,9 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::prepareProjectionsToMergeAndRe const bool merge_may_reduce_rows = global_ctx->cleanup || global_ctx->deduplicate || - ctx->merging_params.mode == MergeTreeData::MergingParams::Collapsing || - ctx->merging_params.mode == MergeTreeData::MergingParams::Replacing || - ctx->merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing; + global_ctx->merging_params.mode == MergeTreeData::MergingParams::Collapsing || + global_ctx->merging_params.mode == MergeTreeData::MergingParams::Replacing || + global_ctx->merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing; const auto & projections = global_ctx->metadata_snapshot->getProjections(); @@ -1616,7 +1617,7 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const merge_parts_query_plan.getCurrentDataStream(), sort_description, partition_key_columns, - ctx->merging_params, + global_ctx->merging_params, ctx->rows_sources_write_buf.get(), data_settings->merge_max_block_size, data_settings->merge_max_block_size_bytes, @@ -1726,10 +1727,10 @@ MergeAlgorithm MergeTask::ExecuteAndFinalizeHorizontalPart::chooseMergeAlgorithm } bool is_supported_storage = - ctx->merging_params.mode == MergeTreeData::MergingParams::Ordinary || - ctx->merging_params.mode == MergeTreeData::MergingParams::Collapsing || - ctx->merging_params.mode == MergeTreeData::MergingParams::Replacing || - ctx->merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing; + global_ctx->merging_params.mode == MergeTreeData::MergingParams::Ordinary || + global_ctx->merging_params.mode == MergeTreeData::MergingParams::Collapsing || + global_ctx->merging_params.mode == MergeTreeData::MergingParams::Replacing || + global_ctx->merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing; bool enough_ordinary_cols = global_ctx->gathering_columns.size() >= data_settings->vertical_merge_algorithm_min_columns_to_activate; diff --git a/src/Storages/MergeTree/MergeTask.h b/src/Storages/MergeTree/MergeTask.h index 84f00d521fd..fe12fba8aab 100644 --- a/src/Storages/MergeTree/MergeTask.h +++ b/src/Storages/MergeTree/MergeTask.h @@ -100,6 +100,7 @@ public: global_ctx->context = std::move(context_); global_ctx->holder = &holder; global_ctx->space_reservation = std::move(space_reservation_); + global_ctx->disk = global_ctx->space_reservation->getDisk(); global_ctx->deduplicate = std::move(deduplicate_); global_ctx->deduplicate_by_columns = std::move(deduplicate_by_columns_); global_ctx->cleanup = std::move(cleanup_); @@ -110,12 +111,10 @@ public: global_ctx->ttl_merges_blocker = std::move(ttl_merges_blocker_); global_ctx->txn = std::move(txn); global_ctx->need_prefix = need_prefix; + global_ctx->suffix = std::move(suffix_); + global_ctx->merging_params = std::move(merging_params_); auto prepare_stage_ctx = std::make_shared(); - - prepare_stage_ctx->suffix = std::move(suffix_); - prepare_stage_ctx->merging_params = std::move(merging_params_); - (*stages.begin())->setRuntimeContext(std::move(prepare_stage_ctx), global_ctx); } @@ -172,6 +171,7 @@ private: ContextPtr context{nullptr}; time_t time_of_merge{0}; ReservationSharedPtr space_reservation{nullptr}; + DiskPtr disk{nullptr}; bool deduplicate{false}; Names deduplicate_by_columns{}; bool cleanup{false}; @@ -210,6 +210,8 @@ private: MergeTreeTransactionPtr txn; bool need_prefix; + String suffix; + MergeTreeData::MergingParams merging_params{}; scope_guard temporary_directory_lock; UInt64 prev_elapsed_ms{0}; @@ -222,13 +224,7 @@ private: /// Proper initialization is responsibility of the author struct ExecuteAndFinalizeHorizontalPartRuntimeContext : public IStageRuntimeContext { - /// Dependencies - String suffix; - bool need_prefix; - MergeTreeData::MergingParams merging_params{}; - TemporaryDataOnDiskPtr tmp_disk{nullptr}; - DiskPtr disk{nullptr}; bool need_remove_expired_values{false}; bool force_ttl{false}; CompressionCodecPtr compression_codec{nullptr}; @@ -264,7 +260,6 @@ private: using ExecuteAndFinalizeHorizontalPartRuntimeContextPtr = std::shared_ptr; - struct ExecuteAndFinalizeHorizontalPart : public IStage { bool execute() override; @@ -356,7 +351,6 @@ private: using VerticalMergeRuntimeContextPtr = std::shared_ptr; - struct VerticalMergeStage : public IStage { bool execute() override;