diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index a7bcaa25ddc..0beeffcb267 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -188,19 +188,19 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::extractMergingAndGatheringColu std::set key_columns(sort_key_columns_vec.cbegin(), sort_key_columns_vec.cend()); /// Force sign column for Collapsing mode - if (ctx->merging_params.mode == MergeTreeData::MergingParams::Collapsing) - key_columns.emplace(ctx->merging_params.sign_column); + if (global_ctx->merging_params.mode == MergeTreeData::MergingParams::Collapsing) + key_columns.emplace(global_ctx->merging_params.sign_column); /// Force version column for Replacing mode - if (ctx->merging_params.mode == MergeTreeData::MergingParams::Replacing) + if (global_ctx->merging_params.mode == MergeTreeData::MergingParams::Replacing) { - key_columns.emplace(ctx->merging_params.is_deleted_column); - key_columns.emplace(ctx->merging_params.version_column); + key_columns.emplace(global_ctx->merging_params.is_deleted_column); + key_columns.emplace(global_ctx->merging_params.version_column); } /// Force sign column for VersionedCollapsing mode. Version is already in primary key. - if (ctx->merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing) - key_columns.emplace(ctx->merging_params.sign_column); + if (global_ctx->merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing) + key_columns.emplace(global_ctx->merging_params.sign_column); /// Force to merge at least one column in case of empty key if (key_columns.empty()) @@ -269,7 +269,8 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const // E.g. `proj_a.proj` for a normal projection merge and `proj_a.tmp_proj` for a projection materialization merge. local_tmp_prefix = global_ctx->parent_part ? "" : "tmp_merge_"; } - const String local_tmp_suffix = global_ctx->parent_part ? ctx->suffix : ""; + + const String local_tmp_suffix = global_ctx->parent_part ? global_ctx->suffix : ""; if (global_ctx->merges_blocker->isCancelled() || global_ctx->merge_list_element_ptr->is_cancelled.load(std::memory_order_relaxed)) throw Exception(ErrorCodes::ABORTED, "Cancelled merging parts"); @@ -294,7 +295,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const LOG_DEBUG(ctx->log, "DEDUPLICATE BY ('{}')", fmt::join(global_ctx->deduplicate_by_columns, "', '")); } - ctx->disk = global_ctx->space_reservation->getDisk(); + global_ctx->disk = global_ctx->space_reservation->getDisk(); auto local_tmp_part_basename = local_tmp_prefix + global_ctx->future_part->name + local_tmp_suffix; std::optional builder; @@ -306,7 +307,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const } else { - auto local_single_disk_volume = std::make_shared("volume_" + global_ctx->future_part->name, ctx->disk, 0); + auto local_single_disk_volume = std::make_shared("volume_" + global_ctx->future_part->name, global_ctx->disk, 0); builder.emplace(global_ctx->data->getDataPartBuilder(global_ctx->future_part->name, local_single_disk_volume, local_tmp_part_basename)); builder->withPartStorageType(global_ctx->future_part->part_format.storage_type); } @@ -617,9 +618,9 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::prepareProjectionsToMergeAndRe const bool merge_may_reduce_rows = global_ctx->cleanup || global_ctx->deduplicate || - ctx->merging_params.mode == MergeTreeData::MergingParams::Collapsing || - ctx->merging_params.mode == MergeTreeData::MergingParams::Replacing || - ctx->merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing; + global_ctx->merging_params.mode == MergeTreeData::MergingParams::Collapsing || + global_ctx->merging_params.mode == MergeTreeData::MergingParams::Replacing || + global_ctx->merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing; const auto & projections = global_ctx->metadata_snapshot->getProjections(); @@ -1656,7 +1657,7 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const merge_parts_query_plan.getCurrentDataStream(), sort_description, partition_key_columns, - ctx->merging_params, + global_ctx->merging_params, (is_vertical_merge ? RowsSourcesTemporaryFile::FILE_ID : ""), /// rows_sources temporaty file is used only for vertical merge data_settings->merge_max_block_size, data_settings->merge_max_block_size_bytes, @@ -1767,10 +1768,10 @@ MergeAlgorithm MergeTask::ExecuteAndFinalizeHorizontalPart::chooseMergeAlgorithm } bool is_supported_storage = - ctx->merging_params.mode == MergeTreeData::MergingParams::Ordinary || - ctx->merging_params.mode == MergeTreeData::MergingParams::Collapsing || - ctx->merging_params.mode == MergeTreeData::MergingParams::Replacing || - ctx->merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing; + global_ctx->merging_params.mode == MergeTreeData::MergingParams::Ordinary || + global_ctx->merging_params.mode == MergeTreeData::MergingParams::Collapsing || + global_ctx->merging_params.mode == MergeTreeData::MergingParams::Replacing || + global_ctx->merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing; bool enough_ordinary_cols = global_ctx->gathering_columns.size() >= data_settings->vertical_merge_algorithm_min_columns_to_activate; diff --git a/src/Storages/MergeTree/MergeTask.h b/src/Storages/MergeTree/MergeTask.h index ef187e1de2f..29b5c4452e7 100644 --- a/src/Storages/MergeTree/MergeTask.h +++ b/src/Storages/MergeTree/MergeTask.h @@ -101,6 +101,7 @@ public: global_ctx->context = std::move(context_); global_ctx->holder = &holder; global_ctx->space_reservation = std::move(space_reservation_); + global_ctx->disk = global_ctx->space_reservation->getDisk(); global_ctx->deduplicate = std::move(deduplicate_); global_ctx->deduplicate_by_columns = std::move(deduplicate_by_columns_); global_ctx->cleanup = std::move(cleanup_); @@ -111,12 +112,10 @@ public: global_ctx->ttl_merges_blocker = std::move(ttl_merges_blocker_); global_ctx->txn = std::move(txn); global_ctx->need_prefix = need_prefix; + global_ctx->suffix = std::move(suffix_); + global_ctx->merging_params = std::move(merging_params_); auto prepare_stage_ctx = std::make_shared(); - - prepare_stage_ctx->suffix = std::move(suffix_); - prepare_stage_ctx->merging_params = std::move(merging_params_); - (*stages.begin())->setRuntimeContext(std::move(prepare_stage_ctx), global_ctx); } @@ -173,6 +172,7 @@ private: ContextPtr context{nullptr}; time_t time_of_merge{0}; ReservationSharedPtr space_reservation{nullptr}; + DiskPtr disk{nullptr}; bool deduplicate{false}; Names deduplicate_by_columns{}; bool cleanup{false}; @@ -211,6 +211,8 @@ private: MergeTreeTransactionPtr txn; bool need_prefix; + String suffix; + MergeTreeData::MergingParams merging_params{}; scope_guard temporary_directory_lock; UInt64 prev_elapsed_ms{0}; @@ -223,12 +225,6 @@ private: /// Proper initialization is responsibility of the author struct ExecuteAndFinalizeHorizontalPartRuntimeContext : public IStageRuntimeContext { - /// Dependencies - String suffix; - bool need_prefix; - MergeTreeData::MergingParams merging_params{}; - - DiskPtr disk{nullptr}; bool need_remove_expired_values{false}; bool force_ttl{false}; CompressionCodecPtr compression_codec{nullptr}; @@ -263,7 +259,6 @@ private: using ExecuteAndFinalizeHorizontalPartRuntimeContextPtr = std::shared_ptr; - struct ExecuteAndFinalizeHorizontalPart : public IStage { bool execute() override; @@ -352,7 +347,6 @@ private: using VerticalMergeRuntimeContextPtr = std::shared_ptr; - struct VerticalMergeStage : public IStage { bool execute() override;