mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 00:30:49 +00:00
Backport changes from private to minimize merge conflicts
This commit is contained in:
parent
d4aa06524b
commit
3aef79b5a2
@ -125,19 +125,19 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::extractMergingAndGatheringColu
|
|||||||
std::set<String> key_columns(sort_key_columns_vec.cbegin(), sort_key_columns_vec.cend());
|
std::set<String> key_columns(sort_key_columns_vec.cbegin(), sort_key_columns_vec.cend());
|
||||||
|
|
||||||
/// Force sign column for Collapsing mode
|
/// Force sign column for Collapsing mode
|
||||||
if (ctx->merging_params.mode == MergeTreeData::MergingParams::Collapsing)
|
if (global_ctx->merging_params.mode == MergeTreeData::MergingParams::Collapsing)
|
||||||
key_columns.emplace(ctx->merging_params.sign_column);
|
key_columns.emplace(global_ctx->merging_params.sign_column);
|
||||||
|
|
||||||
/// Force version column for Replacing mode
|
/// Force version column for Replacing mode
|
||||||
if (ctx->merging_params.mode == MergeTreeData::MergingParams::Replacing)
|
if (global_ctx->merging_params.mode == MergeTreeData::MergingParams::Replacing)
|
||||||
{
|
{
|
||||||
key_columns.emplace(ctx->merging_params.is_deleted_column);
|
key_columns.emplace(global_ctx->merging_params.is_deleted_column);
|
||||||
key_columns.emplace(ctx->merging_params.version_column);
|
key_columns.emplace(global_ctx->merging_params.version_column);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Force sign column for VersionedCollapsing mode. Version is already in primary key.
|
/// Force sign column for VersionedCollapsing mode. Version is already in primary key.
|
||||||
if (ctx->merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing)
|
if (global_ctx->merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing)
|
||||||
key_columns.emplace(ctx->merging_params.sign_column);
|
key_columns.emplace(global_ctx->merging_params.sign_column);
|
||||||
|
|
||||||
/// Force to merge at least one column in case of empty key
|
/// Force to merge at least one column in case of empty key
|
||||||
if (key_columns.empty())
|
if (key_columns.empty())
|
||||||
@ -206,7 +206,8 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const
|
|||||||
// E.g. `proj_a.proj` for a normal projection merge and `proj_a.tmp_proj` for a projection materialization merge.
|
// E.g. `proj_a.proj` for a normal projection merge and `proj_a.tmp_proj` for a projection materialization merge.
|
||||||
local_tmp_prefix = global_ctx->parent_part ? "" : "tmp_merge_";
|
local_tmp_prefix = global_ctx->parent_part ? "" : "tmp_merge_";
|
||||||
}
|
}
|
||||||
const String local_tmp_suffix = global_ctx->parent_part ? ctx->suffix : "";
|
|
||||||
|
const String local_tmp_suffix = global_ctx->parent_part ? global_ctx->suffix : "";
|
||||||
|
|
||||||
if (global_ctx->merges_blocker->isCancelled() || global_ctx->merge_list_element_ptr->is_cancelled.load(std::memory_order_relaxed))
|
if (global_ctx->merges_blocker->isCancelled() || global_ctx->merge_list_element_ptr->is_cancelled.load(std::memory_order_relaxed))
|
||||||
throw Exception(ErrorCodes::ABORTED, "Cancelled merging parts");
|
throw Exception(ErrorCodes::ABORTED, "Cancelled merging parts");
|
||||||
@ -231,7 +232,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const
|
|||||||
LOG_DEBUG(ctx->log, "DEDUPLICATE BY ('{}')", fmt::join(global_ctx->deduplicate_by_columns, "', '"));
|
LOG_DEBUG(ctx->log, "DEDUPLICATE BY ('{}')", fmt::join(global_ctx->deduplicate_by_columns, "', '"));
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx->disk = global_ctx->space_reservation->getDisk();
|
global_ctx->disk = global_ctx->space_reservation->getDisk();
|
||||||
auto local_tmp_part_basename = local_tmp_prefix + global_ctx->future_part->name + local_tmp_suffix;
|
auto local_tmp_part_basename = local_tmp_prefix + global_ctx->future_part->name + local_tmp_suffix;
|
||||||
|
|
||||||
std::optional<MergeTreeDataPartBuilder> builder;
|
std::optional<MergeTreeDataPartBuilder> builder;
|
||||||
@ -243,7 +244,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() const
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
auto local_single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + global_ctx->future_part->name, ctx->disk, 0);
|
auto local_single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + global_ctx->future_part->name, global_ctx->disk, 0);
|
||||||
builder.emplace(global_ctx->data->getDataPartBuilder(global_ctx->future_part->name, local_single_disk_volume, local_tmp_part_basename));
|
builder.emplace(global_ctx->data->getDataPartBuilder(global_ctx->future_part->name, local_single_disk_volume, local_tmp_part_basename));
|
||||||
builder->withPartStorageType(global_ctx->future_part->part_format.storage_type);
|
builder->withPartStorageType(global_ctx->future_part->part_format.storage_type);
|
||||||
}
|
}
|
||||||
@ -559,9 +560,9 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::prepareProjectionsToMergeAndRe
|
|||||||
const bool merge_may_reduce_rows =
|
const bool merge_may_reduce_rows =
|
||||||
global_ctx->cleanup ||
|
global_ctx->cleanup ||
|
||||||
global_ctx->deduplicate ||
|
global_ctx->deduplicate ||
|
||||||
ctx->merging_params.mode == MergeTreeData::MergingParams::Collapsing ||
|
global_ctx->merging_params.mode == MergeTreeData::MergingParams::Collapsing ||
|
||||||
ctx->merging_params.mode == MergeTreeData::MergingParams::Replacing ||
|
global_ctx->merging_params.mode == MergeTreeData::MergingParams::Replacing ||
|
||||||
ctx->merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing;
|
global_ctx->merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing;
|
||||||
|
|
||||||
const auto & projections = global_ctx->metadata_snapshot->getProjections();
|
const auto & projections = global_ctx->metadata_snapshot->getProjections();
|
||||||
|
|
||||||
@ -1616,7 +1617,7 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const
|
|||||||
merge_parts_query_plan.getCurrentDataStream(),
|
merge_parts_query_plan.getCurrentDataStream(),
|
||||||
sort_description,
|
sort_description,
|
||||||
partition_key_columns,
|
partition_key_columns,
|
||||||
ctx->merging_params,
|
global_ctx->merging_params,
|
||||||
ctx->rows_sources_write_buf.get(),
|
ctx->rows_sources_write_buf.get(),
|
||||||
data_settings->merge_max_block_size,
|
data_settings->merge_max_block_size,
|
||||||
data_settings->merge_max_block_size_bytes,
|
data_settings->merge_max_block_size_bytes,
|
||||||
@ -1726,10 +1727,10 @@ MergeAlgorithm MergeTask::ExecuteAndFinalizeHorizontalPart::chooseMergeAlgorithm
|
|||||||
}
|
}
|
||||||
|
|
||||||
bool is_supported_storage =
|
bool is_supported_storage =
|
||||||
ctx->merging_params.mode == MergeTreeData::MergingParams::Ordinary ||
|
global_ctx->merging_params.mode == MergeTreeData::MergingParams::Ordinary ||
|
||||||
ctx->merging_params.mode == MergeTreeData::MergingParams::Collapsing ||
|
global_ctx->merging_params.mode == MergeTreeData::MergingParams::Collapsing ||
|
||||||
ctx->merging_params.mode == MergeTreeData::MergingParams::Replacing ||
|
global_ctx->merging_params.mode == MergeTreeData::MergingParams::Replacing ||
|
||||||
ctx->merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing;
|
global_ctx->merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing;
|
||||||
|
|
||||||
bool enough_ordinary_cols = global_ctx->gathering_columns.size() >= data_settings->vertical_merge_algorithm_min_columns_to_activate;
|
bool enough_ordinary_cols = global_ctx->gathering_columns.size() >= data_settings->vertical_merge_algorithm_min_columns_to_activate;
|
||||||
|
|
||||||
|
@ -100,6 +100,7 @@ public:
|
|||||||
global_ctx->context = std::move(context_);
|
global_ctx->context = std::move(context_);
|
||||||
global_ctx->holder = &holder;
|
global_ctx->holder = &holder;
|
||||||
global_ctx->space_reservation = std::move(space_reservation_);
|
global_ctx->space_reservation = std::move(space_reservation_);
|
||||||
|
global_ctx->disk = global_ctx->space_reservation->getDisk();
|
||||||
global_ctx->deduplicate = std::move(deduplicate_);
|
global_ctx->deduplicate = std::move(deduplicate_);
|
||||||
global_ctx->deduplicate_by_columns = std::move(deduplicate_by_columns_);
|
global_ctx->deduplicate_by_columns = std::move(deduplicate_by_columns_);
|
||||||
global_ctx->cleanup = std::move(cleanup_);
|
global_ctx->cleanup = std::move(cleanup_);
|
||||||
@ -110,12 +111,10 @@ public:
|
|||||||
global_ctx->ttl_merges_blocker = std::move(ttl_merges_blocker_);
|
global_ctx->ttl_merges_blocker = std::move(ttl_merges_blocker_);
|
||||||
global_ctx->txn = std::move(txn);
|
global_ctx->txn = std::move(txn);
|
||||||
global_ctx->need_prefix = need_prefix;
|
global_ctx->need_prefix = need_prefix;
|
||||||
|
global_ctx->suffix = std::move(suffix_);
|
||||||
|
global_ctx->merging_params = std::move(merging_params_);
|
||||||
|
|
||||||
auto prepare_stage_ctx = std::make_shared<ExecuteAndFinalizeHorizontalPartRuntimeContext>();
|
auto prepare_stage_ctx = std::make_shared<ExecuteAndFinalizeHorizontalPartRuntimeContext>();
|
||||||
|
|
||||||
prepare_stage_ctx->suffix = std::move(suffix_);
|
|
||||||
prepare_stage_ctx->merging_params = std::move(merging_params_);
|
|
||||||
|
|
||||||
(*stages.begin())->setRuntimeContext(std::move(prepare_stage_ctx), global_ctx);
|
(*stages.begin())->setRuntimeContext(std::move(prepare_stage_ctx), global_ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -172,6 +171,7 @@ private:
|
|||||||
ContextPtr context{nullptr};
|
ContextPtr context{nullptr};
|
||||||
time_t time_of_merge{0};
|
time_t time_of_merge{0};
|
||||||
ReservationSharedPtr space_reservation{nullptr};
|
ReservationSharedPtr space_reservation{nullptr};
|
||||||
|
DiskPtr disk{nullptr};
|
||||||
bool deduplicate{false};
|
bool deduplicate{false};
|
||||||
Names deduplicate_by_columns{};
|
Names deduplicate_by_columns{};
|
||||||
bool cleanup{false};
|
bool cleanup{false};
|
||||||
@ -210,6 +210,8 @@ private:
|
|||||||
|
|
||||||
MergeTreeTransactionPtr txn;
|
MergeTreeTransactionPtr txn;
|
||||||
bool need_prefix;
|
bool need_prefix;
|
||||||
|
String suffix;
|
||||||
|
MergeTreeData::MergingParams merging_params{};
|
||||||
|
|
||||||
scope_guard temporary_directory_lock;
|
scope_guard temporary_directory_lock;
|
||||||
UInt64 prev_elapsed_ms{0};
|
UInt64 prev_elapsed_ms{0};
|
||||||
@ -222,13 +224,7 @@ private:
|
|||||||
/// Proper initialization is responsibility of the author
|
/// Proper initialization is responsibility of the author
|
||||||
struct ExecuteAndFinalizeHorizontalPartRuntimeContext : public IStageRuntimeContext
|
struct ExecuteAndFinalizeHorizontalPartRuntimeContext : public IStageRuntimeContext
|
||||||
{
|
{
|
||||||
/// Dependencies
|
|
||||||
String suffix;
|
|
||||||
bool need_prefix;
|
|
||||||
MergeTreeData::MergingParams merging_params{};
|
|
||||||
|
|
||||||
TemporaryDataOnDiskPtr tmp_disk{nullptr};
|
TemporaryDataOnDiskPtr tmp_disk{nullptr};
|
||||||
DiskPtr disk{nullptr};
|
|
||||||
bool need_remove_expired_values{false};
|
bool need_remove_expired_values{false};
|
||||||
bool force_ttl{false};
|
bool force_ttl{false};
|
||||||
CompressionCodecPtr compression_codec{nullptr};
|
CompressionCodecPtr compression_codec{nullptr};
|
||||||
@ -264,7 +260,6 @@ private:
|
|||||||
|
|
||||||
using ExecuteAndFinalizeHorizontalPartRuntimeContextPtr = std::shared_ptr<ExecuteAndFinalizeHorizontalPartRuntimeContext>;
|
using ExecuteAndFinalizeHorizontalPartRuntimeContextPtr = std::shared_ptr<ExecuteAndFinalizeHorizontalPartRuntimeContext>;
|
||||||
|
|
||||||
|
|
||||||
struct ExecuteAndFinalizeHorizontalPart : public IStage
|
struct ExecuteAndFinalizeHorizontalPart : public IStage
|
||||||
{
|
{
|
||||||
bool execute() override;
|
bool execute() override;
|
||||||
@ -356,7 +351,6 @@ private:
|
|||||||
|
|
||||||
using VerticalMergeRuntimeContextPtr = std::shared_ptr<VerticalMergeRuntimeContext>;
|
using VerticalMergeRuntimeContextPtr = std::shared_ptr<VerticalMergeRuntimeContext>;
|
||||||
|
|
||||||
|
|
||||||
struct VerticalMergeStage : public IStage
|
struct VerticalMergeStage : public IStage
|
||||||
{
|
{
|
||||||
bool execute() override;
|
bool execute() override;
|
||||||
|
Loading…
Reference in New Issue
Block a user