Merge pull request #69525 from ClickHouse/backport_mergetask

Backport MergeTask changes from private to minimize merge conflicts
This commit is contained in:
Alexander Gololobov 2024-09-12 19:52:10 +00:00 committed by GitHub
commit 09f22920d9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 25 additions and 30 deletions

View File

@ -188,19 +188,19 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::extractMergingAndGatheringColu
std::set<String> key_columns(sort_key_columns_vec.cbegin(), sort_key_columns_vec.cend());
/// Force sign column for Collapsing mode
if (ctx->merging_params.mode == MergeTreeData::MergingParams::Collapsing)
key_columns.emplace(ctx->merging_params.sign_column);
if (global_ctx->merging_params.mode == MergeTreeData::MergingParams::Collapsing)
key_columns.emplace(global_ctx->merging_params.sign_column);
/// Force version column for Replacing mode
if (ctx->merging_params.mode == MergeTreeData::MergingParams::Replacing)
if (global_ctx->merging_params.mode == MergeTreeData::MergingParams::Replacing)
{
key_columns.emplace(ctx->merging_params.is_deleted_column);
key_columns.emplace(ctx->merging_params.version_column);
key_columns.emplace(global_ctx->merging_params.is_deleted_column);
key_columns.emplace(global_ctx->merging_params.version_column);
}
/// Force sign column for VersionedCollapsing mode. Version is already in primary key.
if (ctx->merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing)
key_columns.emplace(ctx->merging_params.sign_column);
if (global_ctx->merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing)
key_columns.emplace(global_ctx->merging_params.sign_column);
/// Force to merge at least one column in case of empty key
if (key_columns.empty())
@ -269,7 +269,8 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const
// E.g. `proj_a.proj` for a normal projection merge and `proj_a.tmp_proj` for a projection materialization merge.
local_tmp_prefix = global_ctx->parent_part ? "" : "tmp_merge_";
}
const String local_tmp_suffix = global_ctx->parent_part ? ctx->suffix : "";
const String local_tmp_suffix = global_ctx->parent_part ? global_ctx->suffix : "";
if (global_ctx->merges_blocker->isCancelled() || global_ctx->merge_list_element_ptr->is_cancelled.load(std::memory_order_relaxed))
throw Exception(ErrorCodes::ABORTED, "Cancelled merging parts");
@ -294,7 +295,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const
LOG_DEBUG(ctx->log, "DEDUPLICATE BY ('{}')", fmt::join(global_ctx->deduplicate_by_columns, "', '"));
}
ctx->disk = global_ctx->space_reservation->getDisk();
global_ctx->disk = global_ctx->space_reservation->getDisk();
auto local_tmp_part_basename = local_tmp_prefix + global_ctx->future_part->name + local_tmp_suffix;
std::optional<MergeTreeDataPartBuilder> builder;
@ -306,7 +307,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const
}
else
{
auto local_single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + global_ctx->future_part->name, ctx->disk, 0);
auto local_single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + global_ctx->future_part->name, global_ctx->disk, 0);
builder.emplace(global_ctx->data->getDataPartBuilder(global_ctx->future_part->name, local_single_disk_volume, local_tmp_part_basename));
builder->withPartStorageType(global_ctx->future_part->part_format.storage_type);
}
@ -617,9 +618,9 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::prepareProjectionsToMergeAndRe
const bool merge_may_reduce_rows =
global_ctx->cleanup ||
global_ctx->deduplicate ||
ctx->merging_params.mode == MergeTreeData::MergingParams::Collapsing ||
ctx->merging_params.mode == MergeTreeData::MergingParams::Replacing ||
ctx->merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing;
global_ctx->merging_params.mode == MergeTreeData::MergingParams::Collapsing ||
global_ctx->merging_params.mode == MergeTreeData::MergingParams::Replacing ||
global_ctx->merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing;
const auto & projections = global_ctx->metadata_snapshot->getProjections();
@ -1656,7 +1657,7 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const
merge_parts_query_plan.getCurrentDataStream(),
sort_description,
partition_key_columns,
ctx->merging_params,
global_ctx->merging_params,
(is_vertical_merge ? RowsSourcesTemporaryFile::FILE_ID : ""), /// rows_sources temporary file is used only for vertical merge
data_settings->merge_max_block_size,
data_settings->merge_max_block_size_bytes,
@ -1767,10 +1768,10 @@ MergeAlgorithm MergeTask::ExecuteAndFinalizeHorizontalPart::chooseMergeAlgorithm
}
bool is_supported_storage =
ctx->merging_params.mode == MergeTreeData::MergingParams::Ordinary ||
ctx->merging_params.mode == MergeTreeData::MergingParams::Collapsing ||
ctx->merging_params.mode == MergeTreeData::MergingParams::Replacing ||
ctx->merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing;
global_ctx->merging_params.mode == MergeTreeData::MergingParams::Ordinary ||
global_ctx->merging_params.mode == MergeTreeData::MergingParams::Collapsing ||
global_ctx->merging_params.mode == MergeTreeData::MergingParams::Replacing ||
global_ctx->merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing;
bool enough_ordinary_cols = global_ctx->gathering_columns.size() >= data_settings->vertical_merge_algorithm_min_columns_to_activate;

View File

@ -101,6 +101,7 @@ public:
global_ctx->context = std::move(context_);
global_ctx->holder = &holder;
global_ctx->space_reservation = std::move(space_reservation_);
global_ctx->disk = global_ctx->space_reservation->getDisk();
global_ctx->deduplicate = std::move(deduplicate_);
global_ctx->deduplicate_by_columns = std::move(deduplicate_by_columns_);
global_ctx->cleanup = std::move(cleanup_);
@ -111,12 +112,10 @@ public:
global_ctx->ttl_merges_blocker = std::move(ttl_merges_blocker_);
global_ctx->txn = std::move(txn);
global_ctx->need_prefix = need_prefix;
global_ctx->suffix = std::move(suffix_);
global_ctx->merging_params = std::move(merging_params_);
auto prepare_stage_ctx = std::make_shared<ExecuteAndFinalizeHorizontalPartRuntimeContext>();
prepare_stage_ctx->suffix = std::move(suffix_);
prepare_stage_ctx->merging_params = std::move(merging_params_);
(*stages.begin())->setRuntimeContext(std::move(prepare_stage_ctx), global_ctx);
}
@ -173,6 +172,7 @@ private:
ContextPtr context{nullptr};
time_t time_of_merge{0};
ReservationSharedPtr space_reservation{nullptr};
DiskPtr disk{nullptr};
bool deduplicate{false};
Names deduplicate_by_columns{};
bool cleanup{false};
@ -211,6 +211,8 @@ private:
MergeTreeTransactionPtr txn;
bool need_prefix;
String suffix;
MergeTreeData::MergingParams merging_params{};
scope_guard temporary_directory_lock;
UInt64 prev_elapsed_ms{0};
@ -223,12 +225,6 @@ private:
/// Proper initialization is the responsibility of the author
struct ExecuteAndFinalizeHorizontalPartRuntimeContext : public IStageRuntimeContext
{
/// Dependencies
String suffix;
bool need_prefix;
MergeTreeData::MergingParams merging_params{};
DiskPtr disk{nullptr};
bool need_remove_expired_values{false};
bool force_ttl{false};
CompressionCodecPtr compression_codec{nullptr};
@ -263,7 +259,6 @@ private:
using ExecuteAndFinalizeHorizontalPartRuntimeContextPtr = std::shared_ptr<ExecuteAndFinalizeHorizontalPartRuntimeContext>;
struct ExecuteAndFinalizeHorizontalPart : public IStage
{
bool execute() override;
@ -352,7 +347,6 @@ private:
using VerticalMergeRuntimeContextPtr = std::shared_ptr<VerticalMergeRuntimeContext>;
struct VerticalMergeStage : public IStage
{
bool execute() override;