From cc5456c649a4bd1f95321f4320fadfa382236d1b Mon Sep 17 00:00:00 2001 From: cangyin Date: Sat, 6 Apr 2024 19:38:37 +0000 Subject: [PATCH 01/10] Fix projection merge for Collapsing/Replacing/VersionedCollapsing MergeTree --- .../MergeTree/MergeProjectionPartsTask.cpp | 95 +++++++++++ .../MergeTree/MergeProjectionPartsTask.h | 84 ++++++++++ src/Storages/MergeTree/MergeTask.cpp | 157 +++++++++++++++--- src/Storages/MergeTree/MergeTask.h | 25 ++- .../MergeTree/MergeTreeDataMergerMutator.cpp | 3 +- .../MergeTree/MergeTreeDataMergerMutator.h | 2 +- src/Storages/MergeTree/MutateTask.cpp | 139 +--------------- 7 files changed, 349 insertions(+), 156 deletions(-) create mode 100644 src/Storages/MergeTree/MergeProjectionPartsTask.cpp create mode 100644 src/Storages/MergeTree/MergeProjectionPartsTask.h diff --git a/src/Storages/MergeTree/MergeProjectionPartsTask.cpp b/src/Storages/MergeTree/MergeProjectionPartsTask.cpp new file mode 100644 index 00000000000..bdc2ba8b9ca --- /dev/null +++ b/src/Storages/MergeTree/MergeProjectionPartsTask.cpp @@ -0,0 +1,95 @@ +#include + +#include +#include + +namespace DB +{ + +bool MergeProjectionPartsTask::executeStep() +{ + auto & current_level_parts = level_parts[current_level]; + auto & next_level_parts = level_parts[next_level]; + + MergeTreeData::MutableDataPartsVector selected_parts; + while (selected_parts.size() < max_parts_to_merge_in_one_level && !current_level_parts.empty()) + { + selected_parts.push_back(std::move(current_level_parts.back())); + current_level_parts.pop_back(); + } + + if (selected_parts.empty()) + { + if (next_level_parts.empty()) + { + LOG_WARNING(log, "There is no projection parts merged"); + + /// Task is finished + return false; + } + current_level = next_level; + ++next_level; + } + else if (selected_parts.size() == 1) + { + if (next_level_parts.empty()) + { + LOG_DEBUG(log, "Merged a projection part in level {}", current_level); + selected_parts[0]->renameTo(projection.name + ".proj", true); + selected_parts[0]->setName(projection.name); + selected_parts[0]->is_temp = false; + new_data_part->addProjectionPart(name, std::move(selected_parts[0])); + + /// Task is finished + return false; + } + else + { + LOG_DEBUG(log, "Forwarded part {} in level {} to next level", selected_parts[0]->name, current_level); + next_level_parts.push_back(std::move(selected_parts[0])); + } + } + else if (selected_parts.size() > 1) + { + // Generate a unique part name + ++block_num; + auto projection_future_part = std::make_shared(); + MergeTreeData::DataPartsVector const_selected_parts( + std::make_move_iterator(selected_parts.begin()), std::make_move_iterator(selected_parts.end())); + projection_future_part->assign(std::move(const_selected_parts)); + projection_future_part->name = fmt::format("{}_{}", projection.name, ++block_num); + projection_future_part->part_info = {"all", 0, 0, 0}; + + MergeTreeData::MergingParams projection_merging_params; + projection_merging_params.mode = MergeTreeData::MergingParams::Ordinary; + if (projection.type == ProjectionDescription::Type::Aggregate) + projection_merging_params.mode = MergeTreeData::MergingParams::Aggregating; + + LOG_DEBUG(log, "Merged {} parts in level {} to {}", selected_parts.size(), current_level, projection_future_part->name); + auto tmp_part_merge_task = mutator->mergePartsToTemporaryPart( + projection_future_part, + projection.metadata, + merge_entry, + std::make_unique((*merge_entry)->table_id, projection_future_part, context), + *table_lock_holder, + time_of_merge, + context, + space_reservation, + false, // TODO Do we need deduplicate for projections + {}, + false, // no cleanup + projection_merging_params, + NO_TRANSACTION_PTR, + /* need_prefix */ true, + new_data_part.get(), + ".tmp_proj"); + + next_level_parts.push_back(executeHere(tmp_part_merge_task)); + next_level_parts.back()->is_temp = true; + } + + /// Need execute again + return true; +} + +} diff --git a/src/Storages/MergeTree/MergeProjectionPartsTask.h b/src/Storages/MergeTree/MergeProjectionPartsTask.h new file mode 100644 index 00000000000..47cafe01151 --- /dev/null +++ b/src/Storages/MergeTree/MergeProjectionPartsTask.h @@ -0,0 +1,84 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + +class MergeProjectionPartsTask : public IExecutableTask +{ +public: + + MergeProjectionPartsTask( + String name_, + MergeTreeData::MutableDataPartsVector && parts_, + const ProjectionDescription & projection_, + size_t & block_num_, + ContextPtr context_, + TableLockHolder * table_lock_holder_, + MergeTreeDataMergerMutator * mutator_, + MergeListEntry * merge_entry_, + time_t time_of_merge_, + MergeTreeData::MutableDataPartPtr new_data_part_, + ReservationSharedPtr space_reservation_) + : name(std::move(name_)) + , parts(std::move(parts_)) + , projection(projection_) + , block_num(block_num_) + , context(context_) + , table_lock_holder(table_lock_holder_) + , mutator(mutator_) + , merge_entry(merge_entry_) + , time_of_merge(time_of_merge_) + , new_data_part(new_data_part_) + , space_reservation(space_reservation_) + , log(getLogger("MergeProjectionPartsTask")) + { + LOG_DEBUG(log, "Selected {} projection_parts from {} to {}", parts.size(), parts.front()->name, parts.back()->name); + level_parts[current_level] = std::move(parts); + } + + void onCompleted() override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); } + StorageID getStorageID() const override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); } + Priority getPriority() const override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); } + String getQueryId() const override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); } + + bool executeStep() override; + +private: + String name; + MergeTreeData::MutableDataPartsVector parts; + const ProjectionDescription & projection; + size_t & block_num; + + ContextPtr context; + TableLockHolder * table_lock_holder; + MergeTreeDataMergerMutator * mutator; + MergeListEntry * merge_entry; + time_t time_of_merge; + + MergeTreeData::MutableDataPartPtr new_data_part; + ReservationSharedPtr space_reservation; + + LoggerPtr log; + + std::map level_parts; + size_t current_level = 0; + size_t next_level = 1; + + /// TODO(nikitamikhaylov): make this constant a setting + static constexpr size_t max_parts_to_merge_in_one_level = 10; +}; + +} diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index 34e17e40a74..4be1b003573 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -18,6 +18,8 @@ #include #include #include +#include +#include #include #include #include @@ -47,12 +49,12 @@ namespace ErrorCodes extern const int SUPPORT_IS_DISABLED; } - /// PK columns are sorted and merged, ordinary columns are gathered using info from merge step static void extractMergingAndGatheringColumns( const NamesAndTypesList & storage_columns, const ExpressionActionsPtr & sorting_key_expr, const IndicesDescription & indexes, + const std::vector & projections, const MergeTreeData::MergingParams & merging_params, NamesAndTypesList & gathering_columns, Names & gathering_column_names, NamesAndTypesList & merging_columns, Names & merging_column_names) @@ -65,6 +67,12 @@ static void extractMergingAndGatheringColumns( std::copy(index_columns_vec.cbegin(), index_columns_vec.cend(), std::inserter(key_columns, key_columns.end())); } + for (const auto & projection : projections) + { + Names projection_columns_vec = projection->getRequiredColumns(); + std::copy(projection_columns_vec.cbegin(), projection_columns_vec.cend(), + std::inserter(key_columns, key_columns.end())); + } /// Force sign column for Collapsing mode if (merging_params.mode == MergeTreeData::MergingParams::Collapsing) @@ -203,10 +211,13 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() extendObjectColumns(global_ctx->storage_columns, object_columns, false); global_ctx->storage_snapshot = std::make_shared(*global_ctx->data, global_ctx->metadata_snapshot, std::move(object_columns)); + prepareProjectionsToMergeAndRebuild(); + extractMergingAndGatheringColumns( global_ctx->storage_columns, global_ctx->metadata_snapshot->getSortingKey().expression, global_ctx->metadata_snapshot->getSecondaryIndices(), + global_ctx->projections_to_rebuild, ctx->merging_params, global_ctx->gathering_columns, global_ctx->gathering_column_names, @@ -453,6 +464,65 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::execute() } +void MergeTask::ExecuteAndFinalizeHorizontalPart::calculateProjections(const Block & block) const +{ + for (size_t i = 0, size = global_ctx->projections_to_rebuild.size(); i < size; ++i) + { + const auto & projection = *global_ctx->projections_to_rebuild[i]; + auto projection_block = ctx->projection_squashes[i].add(projection.calculate(block, global_ctx->context)); + if (projection_block) + { + auto tmp_part = MergeTreeDataWriter::writeTempProjectionPart( + *global_ctx->data, ctx->log, projection_block, projection, global_ctx->new_data_part.get(), ++ctx->projection_block_num); + tmp_part.finalize(); + tmp_part.part->getDataPartStorage().commitTransaction(); + ctx->projection_parts[projection.name].emplace_back(std::move(tmp_part.part)); + } + } +} + + +void MergeTask::ExecuteAndFinalizeHorizontalPart::constructTaskForProjectionPartsMerge() const +{ + auto && [name, parts] = *ctx->projection_parts_iterator; + const auto & projection = global_ctx->metadata_snapshot->projections.get(name); + + ctx->merge_projection_parts_task_ptr = std::make_unique + ( + name, + std::move(parts), + projection, + ctx->projection_block_num, + global_ctx->context, + global_ctx->holder, + global_ctx->mutator, + global_ctx->merge_entry, + global_ctx->time_of_merge, + global_ctx->new_data_part, + global_ctx->space_reservation + ); +} + + +bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeMergeProjections() +{ + /// In case if there are no projections we didn't construct a task + if (!ctx->merge_projection_parts_task_ptr) + return false; + + if (ctx->merge_projection_parts_task_ptr->executeStep()) + return true; + + ++ctx->projection_parts_iterator; + + if (ctx->projection_parts_iterator == std::make_move_iterator(ctx->projection_parts.end())) + return false; + + constructTaskForProjectionPartsMerge(); + + return true; +} + bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl() { Block block; @@ -462,6 +532,8 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl() const_cast(*global_ctx->to).write(block); + calculateProjections(block); + UInt64 result_rows = 0; UInt64 result_bytes = 0; global_ctx->merged_pipeline.tryGetResultRowsAndBytes(result_rows, result_bytes); @@ -484,6 +556,13 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl() return true; } + // finalize projections + calculateProjections(global_ctx->merging_executor->getHeader().cloneEmpty()); + + ctx->projection_parts_iterator = std::make_move_iterator(ctx->projection_parts.begin()); + if (ctx->projection_parts_iterator != std::make_move_iterator(ctx->projection_parts.end())) + constructTaskForProjectionPartsMerge(); + global_ctx->merging_executor.reset(); global_ctx->merged_pipeline.reset(); @@ -732,24 +811,9 @@ bool MergeTask::MergeProjectionsStage::mergeMinMaxIndexAndPrepareProjections() c } - const auto & projections = global_ctx->metadata_snapshot->getProjections(); - - for (const auto & projection : projections) + for (const auto & projection : global_ctx->projections_to_merge) { - MergeTreeData::DataPartsVector projection_parts; - for (const auto & part : global_ctx->future_part->parts) - { - auto actual_projection_parts = part->getProjectionParts(); - auto it = actual_projection_parts.find(projection.name); - if (it != actual_projection_parts.end() && !it->second->is_broken) - projection_parts.push_back(it->second); - } - if (projection_parts.size() < global_ctx->future_part->parts.size()) - { - LOG_DEBUG(ctx->log, "Projection {} is not merged because some parts don't have it", projection.name); - continue; - } - + MergeTreeData::DataPartsVector projection_parts = global_ctx->projections_to_merge_parts[projection->name]; LOG_DEBUG( ctx->log, "Selected {} projection_parts from {} to {}", @@ -759,7 +823,7 @@ bool MergeTask::MergeProjectionsStage::mergeMinMaxIndexAndPrepareProjections() c auto projection_future_part = std::make_shared(); projection_future_part->assign(std::move(projection_parts)); - projection_future_part->name = projection.name; + projection_future_part->name = projection->name; // TODO (ab): path in future_part is only for merge process introspection, which is not available for merges of projection parts. // Let's comment this out to avoid code inconsistency and add it back after we implement projection merge introspection. // projection_future_part->path = global_ctx->future_part->path + "/" + projection.name + ".proj/"; @@ -767,16 +831,17 @@ bool MergeTask::MergeProjectionsStage::mergeMinMaxIndexAndPrepareProjections() c MergeTreeData::MergingParams projection_merging_params; projection_merging_params.mode = MergeTreeData::MergingParams::Ordinary; - if (projection.type == ProjectionDescription::Type::Aggregate) + if (projection->type == ProjectionDescription::Type::Aggregate) projection_merging_params.mode = MergeTreeData::MergingParams::Aggregating; ctx->tasks_for_projections.emplace_back(std::make_shared( projection_future_part, - projection.metadata, + projection->metadata, global_ctx->merge_entry, std::make_unique((*global_ctx->merge_entry)->table_id, projection_future_part, global_ctx->context), global_ctx->time_of_merge, global_ctx->context, + *global_ctx->holder, global_ctx->space_reservation, global_ctx->deduplicate, global_ctx->deduplicate_by_columns, @@ -1136,6 +1201,56 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() global_ctx->merging_executor = std::make_unique(global_ctx->merged_pipeline); } +void MergeTask::ExecuteAndFinalizeHorizontalPart::prepareProjectionsToMergeAndRebuild() const +{ + // These merging modes may or may not reduce number of rows. It's not known until the horizontal stage is finished. + const bool merge_may_reduce_rows = + global_ctx->deduplicate || + ctx->merging_params.mode == MergeTreeData::MergingParams::Collapsing || + ctx->merging_params.mode == MergeTreeData::MergingParams::Replacing || + ctx->merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing; + + const auto & projections = global_ctx->metadata_snapshot->getProjections(); + + for (const auto & projection : projections) + { + if (merge_may_reduce_rows) + { + global_ctx->projections_to_rebuild.push_back(&projection); + continue; + } + + MergeTreeData::DataPartsVector projection_parts; + for (const auto & part : global_ctx->future_part->parts) + { + auto it = part->getProjectionParts().find(projection.name); + if (it != part->getProjectionParts().end()) + projection_parts.push_back(it->second); + } + if (projection_parts.size() == global_ctx->future_part->parts.size()) + { + global_ctx->projections_to_merge.push_back(&projection); + global_ctx->projections_to_merge_parts[projection.name].assign(projection_parts.begin(), projection_parts.end()); + } + else if (projection_parts.empty()) + { + LOG_DEBUG(ctx->log, "Projection {} will not be merged or rebuilt because all parts don't have it", projection.name); + } + else + { + LOG_DEBUG(ctx->log, "Projection {} will be rebuilt because some parts don't have it", projection.name); + global_ctx->projections_to_rebuild.push_back(&projection); + } + } + + const auto & settings = global_ctx->context->getSettingsRef(); + + for (size_t i = 0, size = global_ctx->projections_to_rebuild.size(); i < size; ++i) + { + ctx->projection_squashes.emplace_back(settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes); + } +} + MergeAlgorithm MergeTask::ExecuteAndFinalizeHorizontalPart::chooseMergeAlgorithm() const { diff --git a/src/Storages/MergeTree/MergeTask.h b/src/Storages/MergeTree/MergeTask.h index f6268886b14..e373b6b33c5 100644 --- a/src/Storages/MergeTree/MergeTask.h +++ b/src/Storages/MergeTree/MergeTask.h @@ -8,6 +8,7 @@ #include #include +#include #include #include @@ -64,6 +65,7 @@ public: std::unique_ptr projection_merge_list_element_, time_t time_of_merge_, ContextPtr context_, + TableLockHolder & holder, ReservationSharedPtr space_reservation_, bool deduplicate_, Names deduplicate_by_columns_, @@ -88,6 +90,7 @@ public: = global_ctx->projection_merge_list_element ? global_ctx->projection_merge_list_element.get() : (*global_ctx->merge_entry)->ptr(); global_ctx->time_of_merge = std::move(time_of_merge_); global_ctx->context = std::move(context_); + global_ctx->holder = &holder; global_ctx->space_reservation = std::move(space_reservation_); global_ctx->deduplicate = std::move(deduplicate_); global_ctx->deduplicate_by_columns = std::move(deduplicate_by_columns_); @@ -142,6 +145,7 @@ private: /// Proper initialization is responsibility of the author struct GlobalRuntimeContext : public IStageRuntimeContext { + TableLockHolder * holder; MergeList::Entry * merge_entry{nullptr}; /// If not null, use this instead of the global MergeList::Entry. This is for merging projections. std::unique_ptr projection_merge_list_element; @@ -173,6 +177,10 @@ private: MergeAlgorithm chosen_merge_algorithm{MergeAlgorithm::Undecided}; size_t gathering_column_names_size{0}; + std::vector projections_to_rebuild{}; + std::vector projections_to_merge{}; + std::map projections_to_merge_parts{}; + std::unique_ptr horizontal_stage_progress{nullptr}; std::unique_ptr column_progress{nullptr}; @@ -219,6 +227,14 @@ private: std::unique_ptr rows_sources_write_buf{nullptr}; std::optional column_sizes{}; + // For projections to rebuild + using ProjectionNameToItsBlocks = std::map; + ProjectionNameToItsBlocks projection_parts; + std::move_iterator projection_parts_iterator; + std::vector projection_squashes; + size_t projection_block_num = 0; + ExecutableTaskPtr merge_projection_parts_task_ptr; + size_t initial_reservation{0}; bool read_with_direct_io{false}; @@ -247,16 +263,21 @@ private: bool executeImpl(); /// NOTE: Using pointer-to-member instead of std::function and lambda makes stacktraces much more concise and readable - using ExecuteAndFinalizeHorizontalPartSubtasks = std::array; + using ExecuteAndFinalizeHorizontalPartSubtasks = std::array; const ExecuteAndFinalizeHorizontalPartSubtasks subtasks { &ExecuteAndFinalizeHorizontalPart::prepare, - &ExecuteAndFinalizeHorizontalPart::executeImpl + &ExecuteAndFinalizeHorizontalPart::executeImpl, + &ExecuteAndFinalizeHorizontalPart::executeMergeProjections }; ExecuteAndFinalizeHorizontalPartSubtasks::const_iterator subtasks_iterator = subtasks.begin(); + void prepareProjectionsToMergeAndRebuild() const; + void calculateProjections(const Block & block) const; + void constructTaskForProjectionPartsMerge() const; + bool executeMergeProjections(); MergeAlgorithm chooseMergeAlgorithm() const; void createMergedStream(); diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 53d49b51e8f..765c2c5e428 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -670,7 +670,7 @@ MergeTaskPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart( const StorageMetadataPtr & metadata_snapshot, MergeList::Entry * merge_entry, std::unique_ptr projection_merge_list_element, - TableLockHolder, + TableLockHolder & holder, time_t time_of_merge, ContextPtr context, ReservationSharedPtr space_reservation, @@ -690,6 +690,7 @@ MergeTaskPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart( std::move(projection_merge_list_element), time_of_merge, context, + holder, space_reservation, deduplicate, deduplicate_by_columns, diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.h b/src/Storages/MergeTree/MergeTreeDataMergerMutator.h index 669ee040af3..fc910f401b4 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.h +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.h @@ -159,7 +159,7 @@ public: const StorageMetadataPtr & metadata_snapshot, MergeListEntry * merge_entry, std::unique_ptr projection_merge_list_element, - TableLockHolder table_lock_holder, + TableLockHolder & table_lock_holder, time_t time_of_merge, ContextPtr context, ReservationSharedPtr space_reservation, diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index a971c4fda1c..aaf4e723494 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -1030,136 +1031,6 @@ struct MutationContext using MutationContextPtr = std::shared_ptr; -class MergeProjectionPartsTask : public IExecutableTask -{ -public: - - MergeProjectionPartsTask( - String name_, - MergeTreeData::MutableDataPartsVector && parts_, - const ProjectionDescription & projection_, - size_t & block_num_, - MutationContextPtr ctx_) - : name(std::move(name_)) - , parts(std::move(parts_)) - , projection(projection_) - , block_num(block_num_) - , ctx(ctx_) - , log(getLogger("MergeProjectionPartsTask")) - { - LOG_DEBUG(log, "Selected {} projection_parts from {} to {}", parts.size(), parts.front()->name, parts.back()->name); - level_parts[current_level] = std::move(parts); - } - - void onCompleted() override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); } - StorageID getStorageID() const override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); } - Priority getPriority() const override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); } - String getQueryId() const override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); } - - bool executeStep() override - { - auto & current_level_parts = level_parts[current_level]; - auto & next_level_parts = level_parts[next_level]; - - MergeTreeData::MutableDataPartsVector selected_parts; - while (selected_parts.size() < max_parts_to_merge_in_one_level && !current_level_parts.empty()) - { - selected_parts.push_back(std::move(current_level_parts.back())); - current_level_parts.pop_back(); - } - - if (selected_parts.empty()) - { - if (next_level_parts.empty()) - { - LOG_WARNING(log, "There is no projection parts merged"); - - /// Task is finished - return false; - } - current_level = next_level; - ++next_level; - } - else if (selected_parts.size() == 1) - { - if (next_level_parts.empty()) - { - LOG_DEBUG(log, "Merged a projection part in level {}", current_level); - selected_parts[0]->renameTo(projection.name + ".proj", true); - selected_parts[0]->setName(projection.name); - selected_parts[0]->is_temp = false; - ctx->new_data_part->addProjectionPart(name, std::move(selected_parts[0])); - - /// Task is finished - return false; - } - else - { - LOG_DEBUG(log, "Forwarded part {} in level {} to next level", selected_parts[0]->name, current_level); - next_level_parts.push_back(std::move(selected_parts[0])); - } - } - else if (selected_parts.size() > 1) - { - // Generate a unique part name - ++block_num; - auto projection_future_part = std::make_shared(); - MergeTreeData::DataPartsVector const_selected_parts( - std::make_move_iterator(selected_parts.begin()), std::make_move_iterator(selected_parts.end())); - projection_future_part->assign(std::move(const_selected_parts)); - projection_future_part->name = fmt::format("{}_{}", projection.name, ++block_num); - projection_future_part->part_info = {"all", 0, 0, 0}; - - MergeTreeData::MergingParams projection_merging_params; - projection_merging_params.mode = MergeTreeData::MergingParams::Ordinary; - if (projection.type == ProjectionDescription::Type::Aggregate) - projection_merging_params.mode = MergeTreeData::MergingParams::Aggregating; - - LOG_DEBUG(log, "Merged {} parts in level {} to {}", selected_parts.size(), current_level, projection_future_part->name); - auto tmp_part_merge_task = ctx->mutator->mergePartsToTemporaryPart( - projection_future_part, - projection.metadata, - ctx->mutate_entry, - std::make_unique((*ctx->mutate_entry)->table_id, projection_future_part, ctx->context), - *ctx->holder, - ctx->time_of_mutation, - ctx->context, - ctx->space_reservation, - false, // TODO Do we need deduplicate for projections - {}, - false, // no cleanup - projection_merging_params, - NO_TRANSACTION_PTR, - /* need_prefix */ true, - ctx->new_data_part.get(), - ".tmp_proj"); - - next_level_parts.push_back(executeHere(tmp_part_merge_task)); - next_level_parts.back()->is_temp = true; - } - - /// Need execute again - return true; - } - -private: - String name; - MergeTreeData::MutableDataPartsVector parts; - const ProjectionDescription & projection; - size_t & block_num; - MutationContextPtr ctx; - - LoggerPtr log; - - std::map level_parts; - size_t current_level = 0; - size_t next_level = 1; - - /// TODO(nikitamikhaylov): make this constant a setting - static constexpr size_t max_parts_to_merge_in_one_level = 10; -}; - - // This class is responsible for: // 1. get projection pipeline and a sink to write parts // 2. build an executor that can write block to the input stream (actually we can write through it to generate as many parts as possible) @@ -1356,7 +1227,13 @@ void PartMergerWriter::constructTaskForProjectionPartsMerge() std::move(parts), projection, block_num, - ctx + ctx->context, + ctx->holder, + ctx->mutator, + ctx->mutate_entry, + ctx->time_of_mutation, + ctx->new_data_part, + ctx->space_reservation ); } From 603a52caa06552c50e3b4d29989e177d2a0a3efa Mon Sep 17 00:00:00 2001 From: cangyin Date: Mon, 8 Apr 2024 12:34:59 +0800 Subject: [PATCH 02/10] Add tests --- .../02968_projection_merge.reference | 28 +++++ .../0_stateless/02968_projection_merge.sql | 112 ++++++++++++++++++ 2 files changed, 140 insertions(+) create mode 100644 tests/queries/0_stateless/02968_projection_merge.reference create mode 100644 tests/queries/0_stateless/02968_projection_merge.sql diff --git a/tests/queries/0_stateless/02968_projection_merge.reference b/tests/queries/0_stateless/02968_projection_merge.reference new file mode 100644 index 00000000000..40cb572c95a --- /dev/null +++ b/tests/queries/0_stateless/02968_projection_merge.reference @@ -0,0 +1,28 @@ +ReplacingMergeTree +0 2 +1 2 +2 2 +0 2 +1 2 +2 2 +CollapsingMergeTree +0 2 +1 2 +2 2 +0 2 +1 2 +2 2 +VersionedCollapsingMergeTree +0 2 +1 2 +2 2 +0 2 +1 2 +2 2 +DEDUPLICATE ON MergeTree +0 1 +1 1 +2 1 +0 1 +1 1 +2 1 diff --git a/tests/queries/0_stateless/02968_projection_merge.sql b/tests/queries/0_stateless/02968_projection_merge.sql new file mode 100644 index 00000000000..07d40e30c2f --- /dev/null +++ b/tests/queries/0_stateless/02968_projection_merge.sql @@ -0,0 +1,112 @@ +SELECT 'ReplacingMergeTree'; +DROP TABLE IF EXISTS tp; +CREATE TABLE tp +( + `type` Int32, + `eventcnt` UInt64, + PROJECTION p + ( + SELECT type,sum(eventcnt) + GROUP BY type + ) +) +ENGINE = ReplacingMergeTree +ORDER BY type; + +INSERT INTO tp SELECT number%3, 1 FROM numbers(3); +INSERT INTO tp SELECT number%3, 2 FROM numbers(3); + +OPTIMIZE TABLE tp FINAL; + +SELECT type,sum(eventcnt) FROM tp GROUP BY type ORDER BY type +SETTINGS allow_experimental_projection_optimization = 0, force_optimize_projection = 0; + +SELECT type,sum(eventcnt) FROM tp GROUP BY type ORDER BY type +SETTINGS allow_experimental_projection_optimization = 1, force_optimize_projection = 1; + + +SELECT 'CollapsingMergeTree'; +DROP TABLE IF EXISTS tp; +CREATE TABLE tp +( + `type` Int32, + `eventcnt` UInt64, + `sign` Int8, + PROJECTION p + ( + SELECT type,sum(eventcnt) + GROUP BY type + ) +) +ENGINE = CollapsingMergeTree(sign) +ORDER BY type; + +INSERT INTO tp SELECT number % 3, 1, 1 FROM numbers(3); +INSERT INTO tp SELECT number % 3, 1, -1 FROM numbers(3); +INSERT INTO tp SELECT number % 3, 2, 1 FROM numbers(3); + +OPTIMIZE TABLE tp FINAL; + +SELECT type,sum(eventcnt) FROM tp GROUP BY type ORDER BY type +SETTINGS allow_experimental_projection_optimization = 0, force_optimize_projection = 0; + +SELECT type,sum(eventcnt) FROM tp GROUP BY type ORDER BY type +SETTINGS allow_experimental_projection_optimization = 1, force_optimize_projection = 1; + +-- Actually we don't need to test all 3 engines Replacing/Collapsing/VersionedCollapsing, +-- Because they share the same logic of 'reduce number of rows during merges' +SELECT 'VersionedCollapsingMergeTree'; +DROP TABLE IF EXISTS tp; +CREATE TABLE tp +( + `type` Int32, + `eventcnt` UInt64, + `sign` Int8, + `version` UInt8, + PROJECTION p + ( + SELECT type,sum(eventcnt) + GROUP BY type + ) +) +ENGINE = VersionedCollapsingMergeTree(sign,version) +ORDER BY type; + +INSERT INTO tp SELECT number % 3, 1, -1, 0 FROM numbers(3); +INSERT INTO tp SELECT number % 3, 2, 1, 1 FROM numbers(3); +INSERT INTO tp SELECT number % 3, 1, 1, 0 FROM numbers(3); + +OPTIMIZE TABLE tp FINAL; + +SELECT type,sum(eventcnt) FROM tp GROUP BY type ORDER BY type +SETTINGS allow_experimental_projection_optimization = 0, force_optimize_projection = 0; + +SELECT type,sum(eventcnt) FROM tp GROUP BY type ORDER BY type +SETTINGS allow_experimental_projection_optimization = 1, force_optimize_projection = 1; + +SELECT 'DEDUPLICATE ON MergeTree'; +DROP TABLE IF EXISTS tp; +CREATE TABLE tp +( + `type` Int32, + `eventcnt` UInt64, + PROJECTION p + ( + SELECT type,sum(eventcnt) + GROUP BY type + ) +) +ENGINE = MergeTree +ORDER BY type; + +INSERT INTO tp SELECT number % 3, 1 FROM numbers(3); +INSERT INTO tp SELECT number % 3, 2 FROM numbers(3); + +OPTIMIZE TABLE tp FINAL DEDUPLICATE BY type; + +SELECT type,sum(eventcnt) FROM tp GROUP BY type ORDER BY type +SETTINGS allow_experimental_projection_optimization = 0, force_optimize_projection = 0; + +SELECT type,sum(eventcnt) FROM tp GROUP BY type ORDER BY type +SETTINGS allow_experimental_projection_optimization = 1, force_optimize_projection = 1; + From 8c2a371eaa8e51c0d382a13f4413ab9a2796e02b Mon Sep 17 00:00:00 2001 From: cangyin Date: Mon, 8 Apr 2024 18:52:08 +0800 Subject: [PATCH 03/10] no readability-make-member-function-const --- src/Storages/MergeTree/MergeTask.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index 4be1b003573..66629163ebb 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -504,7 +504,7 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::constructTaskForProjectionPart } -bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeMergeProjections() +bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeMergeProjections() // NOLINT { /// In case if there are no projections we didn't construct a task if (!ctx->merge_projection_parts_task_ptr) From 5ecb5da648afb4d2d62aa7edee55af4ae89e7459 Mon Sep 17 00:00:00 2001 From: cangyin Date: Wed, 10 Apr 2024 19:30:28 +0000 Subject: [PATCH 04/10] Also rebuild for OPTIMIZE CLEANUP --- src/Storages/MergeTree/MergeTask.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index 66629163ebb..72a75e4a32e 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -1205,6 +1205,7 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::prepareProjectionsToMergeAndRe { // These merging modes may or may not reduce number of rows. It's not known until the horizontal stage is finished. const bool merge_may_reduce_rows = + global_ctx->cleanup || global_ctx->deduplicate || ctx->merging_params.mode == MergeTreeData::MergingParams::Collapsing || ctx->merging_params.mode == MergeTreeData::MergingParams::Replacing || From 6f6056477785842c047ab0faa4c089a514625f06 Mon Sep 17 00:00:00 2001 From: jsc0218 Date: Wed, 7 Aug 2024 02:28:13 +0000 Subject: [PATCH 05/10] fix build --- src/Storages/MergeTree/MergeTask.cpp | 12 +++++++----- src/Storages/MergeTree/MergeTask.h | 4 ++-- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index 2e7cc03f5ed..e50a66c91ec 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -501,11 +501,13 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::calculateProjections(const Blo for (size_t i = 0, size = global_ctx->projections_to_rebuild.size(); i < size; ++i) { const auto & projection = *global_ctx->projections_to_rebuild[i]; - auto projection_block = ctx->projection_squashes[i].add(projection.calculate(block, global_ctx->context)); - if (projection_block) + Block block_to_squash = projection.calculate(block, global_ctx->context); + auto chunk = ctx->projection_squashes[i].add({block_to_squash.getColumns(), block_to_squash.rows()}); + if (chunk) { + auto result = ctx->projection_squashes[i].getHeader().cloneWithColumns(chunk.detachColumns()); auto tmp_part = MergeTreeDataWriter::writeTempProjectionPart( - *global_ctx->data, ctx->log, projection_block, projection, global_ctx->new_data_part.get(), ++ctx->projection_block_num); + *global_ctx->data, ctx->log, result, projection, global_ctx->new_data_part.get(), ++ctx->projection_block_num); tmp_part.finalize(); tmp_part.part->getDataPartStorage().commitTransaction(); ctx->projection_parts[projection.name].emplace_back(std::move(tmp_part.part)); @@ -1313,9 +1315,9 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::prepareProjectionsToMergeAndRe const auto & settings = global_ctx->context->getSettingsRef(); - for (size_t i = 0, size = global_ctx->projections_to_rebuild.size(); i < size; ++i) + for (auto projection : global_ctx->projections_to_rebuild) { - ctx->projection_squashes.emplace_back(settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes); + ctx->projection_squashes.emplace_back(projection->sample_block, settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes); } } diff --git a/src/Storages/MergeTree/MergeTask.h b/src/Storages/MergeTree/MergeTask.h index 0424b698f8a..f60a32252b2 100644 --- a/src/Storages/MergeTree/MergeTask.h +++ b/src/Storages/MergeTree/MergeTask.h @@ -8,7 +8,7 @@ #include #include -#include +#include #include #include @@ -231,7 +231,7 @@ private: using ProjectionNameToItsBlocks = std::map; ProjectionNameToItsBlocks projection_parts; std::move_iterator projection_parts_iterator; - std::vector projection_squashes; + std::vector projection_squashes; size_t projection_block_num = 0; ExecutableTaskPtr merge_projection_parts_task_ptr; From 22dad244e75137580e08aa2cc83538e5ffd950c8 Mon Sep 17 00:00:00 2001 From: jsc0218 Date: Thu, 8 Aug 2024 01:50:49 +0000 Subject: [PATCH 06/10] fix whitespace --- src/Storages/MergeTree/MergeTask.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index 49e9121b65a..4133e47d4bc 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -887,7 +887,7 @@ bool MergeTask::MergeProjectionsStage::mergeMinMaxIndexAndPrepareProjections() c ctx->projections_iterator = ctx->tasks_for_projections.begin(); return false; } - + for (const auto & projection : global_ctx->projections_to_merge) { MergeTreeData::DataPartsVector projection_parts = global_ctx->projections_to_merge_parts[projection->name]; From a837df164c48f0b8041b122adc8fa80a148629f8 Mon Sep 17 00:00:00 2001 From: jsc0218 Date: Sat, 10 Aug 2024 02:37:42 +0000 Subject: [PATCH 07/10] fix squash related and projection collection --- src/Storages/MergeTree/MergeTask.cpp | 47 ++++++++++++------- .../0_stateless/02968_projection_merge.sql | 12 +++-- 2 files changed, 38 insertions(+), 21 deletions(-) diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index 4133e47d4bc..3b117168e33 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -502,10 +502,11 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::calculateProjections(const Blo { const auto & projection = *global_ctx->projections_to_rebuild[i]; Block block_to_squash = projection.calculate(block, global_ctx->context); - auto chunk = ctx->projection_squashes[i].add({block_to_squash.getColumns(), block_to_squash.rows()}); - if (chunk) + ctx->projection_squashes[i].setHeader(block_to_squash.cloneEmpty()); + auto squashed_chunk = Squashing::squash(ctx->projection_squashes[i].add({block_to_squash.getColumns(), block_to_squash.rows()})); + if (squashed_chunk) { - auto result = ctx->projection_squashes[i].getHeader().cloneWithColumns(chunk.detachColumns()); + auto result = ctx->projection_squashes[i].getHeader().cloneWithColumns(squashed_chunk.detachColumns()); auto tmp_part = MergeTreeDataWriter::writeTempProjectionPart( *global_ctx->data, ctx->log, result, projection, global_ctx->new_data_part.get(), ++ctx->projection_block_num); tmp_part.finalize(); @@ -590,8 +591,23 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl() return true; } - // finalize projections - calculateProjections(global_ctx->merging_executor->getHeader().cloneEmpty()); + /// finalize projections + // calculateProjections(global_ctx->merging_executor->getHeader().cloneEmpty()); + for (size_t i = 0, size = global_ctx->projections_to_rebuild.size(); i < size; ++i) + { + const auto & projection = *global_ctx->projections_to_rebuild[i]; + auto & projection_squash_plan = ctx->projection_squashes[i]; + auto squashed_chunk = Squashing::squash(projection_squash_plan.flush()); + if (squashed_chunk) + { + auto result = projection_squash_plan.getHeader().cloneWithColumns(squashed_chunk.detachColumns()); + auto temp_part = MergeTreeDataWriter::writeTempProjectionPart( + *global_ctx->data, ctx->log, result, projection, global_ctx->new_data_part.get(), ++ctx->projection_block_num); + temp_part.finalize(); + temp_part.part->getDataPartStorage().commitTransaction(); + ctx->projection_parts[projection.name].emplace_back(std::move(temp_part.part)); + } + } ctx->projection_parts_iterator = std::make_move_iterator(ctx->projection_parts.begin()); if (ctx->projection_parts_iterator != std::make_move_iterator(ctx->projection_parts.end())) @@ -878,16 +894,6 @@ bool MergeTask::MergeProjectionsStage::mergeMinMaxIndexAndPrepareProjections() c ReadableSize(global_ctx->merge_list_element_ptr->bytes_read_uncompressed / elapsed_seconds)); } - const auto mode = global_ctx->data->getSettings()->deduplicate_merge_projection_mode; - /// Under throw mode, we still choose to drop projections due to backward compatibility since some - /// users might have projections before this change. - if (global_ctx->data->merging_params.mode != MergeTreeData::MergingParams::Ordinary - && (mode == DeduplicateMergeProjectionMode::THROW || mode == DeduplicateMergeProjectionMode::DROP)) - { - ctx->projections_iterator = ctx->tasks_for_projections.begin(); - return false; - } - for (const auto & projection : global_ctx->projections_to_merge) { MergeTreeData::DataPartsVector projection_parts = global_ctx->projections_to_merge_parts[projection->name]; @@ -1281,6 +1287,13 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() void MergeTask::ExecuteAndFinalizeHorizontalPart::prepareProjectionsToMergeAndRebuild() const { + const auto mode = global_ctx->data->getSettings()->deduplicate_merge_projection_mode; + /// Under throw mode, we still choose to drop projections due to backward compatibility since some + /// users might have projections before this change. + if (global_ctx->data->merging_params.mode != MergeTreeData::MergingParams::Ordinary + && (mode == DeduplicateMergeProjectionMode::THROW || mode == DeduplicateMergeProjectionMode::DROP)) + return; + // These merging modes may or may not reduce number of rows. It's not known until the horizontal stage is finished. const bool merge_may_reduce_rows = global_ctx->cleanup || @@ -1324,9 +1337,9 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::prepareProjectionsToMergeAndRe const auto & settings = global_ctx->context->getSettingsRef(); - for (auto projection : global_ctx->projections_to_rebuild) + for (const auto * projection : global_ctx->projections_to_rebuild) { - ctx->projection_squashes.emplace_back(projection->sample_block, settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes); + ctx->projection_squashes.emplace_back(projection->sample_block.cloneEmpty(), settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes); } } diff --git a/tests/queries/0_stateless/02968_projection_merge.sql b/tests/queries/0_stateless/02968_projection_merge.sql index 07d40e30c2f..03488042a4e 100644 --- a/tests/queries/0_stateless/02968_projection_merge.sql +++ b/tests/queries/0_stateless/02968_projection_merge.sql @@ -11,7 +11,8 @@ CREATE TABLE tp ) ) ENGINE = ReplacingMergeTree -ORDER BY type; +ORDER BY type +SETTINGS deduplicate_merge_projection_mode = 'rebuild'; INSERT INTO tp SELECT number%3, 1 FROM numbers(3); INSERT INTO tp SELECT number%3, 2 FROM numbers(3); @@ -39,7 +40,8 @@ CREATE TABLE tp ) ) ENGINE = CollapsingMergeTree(sign) -ORDER BY type; +ORDER BY type +SETTINGS deduplicate_merge_projection_mode = 'rebuild'; INSERT INTO tp SELECT number % 3, 1, 1 FROM numbers(3); INSERT INTO tp SELECT number % 3, 1, -1 FROM numbers(3); @@ -70,7 +72,8 @@ CREATE TABLE tp ) ) ENGINE = VersionedCollapsingMergeTree(sign,version) -ORDER BY type; +ORDER BY type +SETTINGS deduplicate_merge_projection_mode = 'rebuild'; INSERT INTO tp SELECT number % 3, 1, -1, 0 FROM numbers(3); INSERT INTO tp SELECT number % 3, 2, 1, 1 FROM numbers(3); @@ -97,7 +100,8 @@ CREATE TABLE tp ) ) ENGINE = MergeTree -ORDER BY type; +ORDER BY type +SETTINGS deduplicate_merge_projection_mode = 'rebuild'; INSERT INTO tp SELECT number % 3, 1 FROM numbers(3); INSERT INTO tp SELECT number % 3, 2 FROM numbers(3); From 6bed26a52764558ba2b52752bdfd126fd943c616 Mon Sep 17 00:00:00 2001 From: jsc0218 Date: Wed, 14 Aug 2024 02:19:34 +0000 Subject: [PATCH 08/10] tidy --- src/Storages/MergeTree/MergeTask.cpp | 171 ++++++++++++++------------- src/Storages/MergeTree/MergeTask.h | 3 +- 2 files changed, 90 insertions(+), 84 deletions(-) diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index 3b117168e33..56e17ac1884 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -145,7 +145,7 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::extractMergingAndGatheringColu } } - for (const auto & projection : global_ctx->projections_to_rebuild) + for (const auto * projection : global_ctx->projections_to_rebuild) { Names projection_columns_vec = projection->getRequiredColumns(); std::copy(projection_columns_vec.cbegin(), projection_columns_vec.cend(), @@ -496,17 +496,76 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::execute() } +void MergeTask::ExecuteAndFinalizeHorizontalPart::prepareProjectionsToMergeAndRebuild() const +{ + const auto mode = global_ctx->data->getSettings()->deduplicate_merge_projection_mode; + /// Under throw mode, we still choose to drop projections due to backward compatibility since some + /// users might have projections before this change. + if (global_ctx->data->merging_params.mode != MergeTreeData::MergingParams::Ordinary + && (mode == DeduplicateMergeProjectionMode::THROW || mode == DeduplicateMergeProjectionMode::DROP)) + return; + + /// These merging modes may or may not reduce number of rows. It's not known until the horizontal stage is finished. + const bool merge_may_reduce_rows = + global_ctx->cleanup || + global_ctx->deduplicate || + ctx->merging_params.mode == MergeTreeData::MergingParams::Collapsing || + ctx->merging_params.mode == MergeTreeData::MergingParams::Replacing || + ctx->merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing; + + const auto & projections = global_ctx->metadata_snapshot->getProjections(); + + for (const auto & projection : projections) + { + if (merge_may_reduce_rows) + { + global_ctx->projections_to_rebuild.push_back(&projection); + continue; + } + + MergeTreeData::DataPartsVector projection_parts; + for (const auto & part : global_ctx->future_part->parts) + { + auto it = part->getProjectionParts().find(projection.name); + if (it != part->getProjectionParts().end()) + projection_parts.push_back(it->second); + } + if (projection_parts.size() == global_ctx->future_part->parts.size()) + { + global_ctx->projections_to_merge.push_back(&projection); + global_ctx->projections_to_merge_parts[projection.name].assign(projection_parts.begin(), projection_parts.end()); + } + else if (projection_parts.empty()) + { + LOG_DEBUG(ctx->log, "Projection {} will not be merged or rebuilt because all parts don't have it", projection.name); + } + else + { + LOG_DEBUG(ctx->log, "Projection {} will be rebuilt because some parts don't have it", projection.name); + global_ctx->projections_to_rebuild.push_back(&projection); + } + } + + const auto & settings = global_ctx->context->getSettingsRef(); + + for (const auto * projection : global_ctx->projections_to_rebuild) + ctx->projection_squashes.emplace_back(projection->sample_block.cloneEmpty(), + settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes); +} + + void MergeTask::ExecuteAndFinalizeHorizontalPart::calculateProjections(const Block & block) const { for (size_t i = 0, size = global_ctx->projections_to_rebuild.size(); i < size; ++i) { const auto & projection = *global_ctx->projections_to_rebuild[i]; Block block_to_squash = projection.calculate(block, global_ctx->context); - ctx->projection_squashes[i].setHeader(block_to_squash.cloneEmpty()); - auto squashed_chunk = Squashing::squash(ctx->projection_squashes[i].add({block_to_squash.getColumns(), block_to_squash.rows()})); + auto & projection_squash_plan = ctx->projection_squashes[i]; + projection_squash_plan.setHeader(block_to_squash.cloneEmpty()); + Chunk squashed_chunk = Squashing::squash(projection_squash_plan.add({block_to_squash.getColumns(), block_to_squash.rows()})); if (squashed_chunk) { - auto result = ctx->projection_squashes[i].getHeader().cloneWithColumns(squashed_chunk.detachColumns()); + auto result = projection_squash_plan.getHeader().cloneWithColumns(squashed_chunk.detachColumns()); auto tmp_part = MergeTreeDataWriter::writeTempProjectionPart( *global_ctx->data, ctx->log, result, projection, global_ctx->new_data_part.get(), ++ctx->projection_block_num); tmp_part.finalize(); @@ -517,6 +576,30 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::calculateProjections(const Blo } +void MergeTask::ExecuteAndFinalizeHorizontalPart::finalizeProjections() const +{ + for (size_t i = 0, size = global_ctx->projections_to_rebuild.size(); i < size; ++i) + { + const auto & projection = *global_ctx->projections_to_rebuild[i]; + auto & projection_squash_plan = ctx->projection_squashes[i]; + auto squashed_chunk = Squashing::squash(projection_squash_plan.flush()); + if (squashed_chunk) + { + auto result = projection_squash_plan.getHeader().cloneWithColumns(squashed_chunk.detachColumns()); + auto temp_part = MergeTreeDataWriter::writeTempProjectionPart( + *global_ctx->data, ctx->log, result, projection, global_ctx->new_data_part.get(), ++ctx->projection_block_num); + temp_part.finalize(); + temp_part.part->getDataPartStorage().commitTransaction(); + ctx->projection_parts[projection.name].emplace_back(std::move(temp_part.part)); + } + } + + ctx->projection_parts_iterator = std::make_move_iterator(ctx->projection_parts.begin()); + if (ctx->projection_parts_iterator != std::make_move_iterator(ctx->projection_parts.end())) + constructTaskForProjectionPartsMerge(); +} + + void MergeTask::ExecuteAndFinalizeHorizontalPart::constructTaskForProjectionPartsMerge() const { auto && [name, parts] = *ctx->projection_parts_iterator; @@ -591,27 +674,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl() return true; } - /// finalize projections - // calculateProjections(global_ctx->merging_executor->getHeader().cloneEmpty()); - for (size_t i = 0, size = global_ctx->projections_to_rebuild.size(); i < size; ++i) - { - const auto & projection = *global_ctx->projections_to_rebuild[i]; - auto & projection_squash_plan = ctx->projection_squashes[i]; - auto squashed_chunk = Squashing::squash(projection_squash_plan.flush()); - if (squashed_chunk) - { - auto result = projection_squash_plan.getHeader().cloneWithColumns(squashed_chunk.detachColumns()); - auto temp_part = MergeTreeDataWriter::writeTempProjectionPart( - *global_ctx->data, ctx->log, result, projection, global_ctx->new_data_part.get(), ++ctx->projection_block_num); - temp_part.finalize(); - temp_part.part->getDataPartStorage().commitTransaction(); - ctx->projection_parts[projection.name].emplace_back(std::move(temp_part.part)); - } - } - - ctx->projection_parts_iterator = std::make_move_iterator(ctx->projection_parts.begin()); - if (ctx->projection_parts_iterator != std::make_move_iterator(ctx->projection_parts.end())) - constructTaskForProjectionPartsMerge(); + finalizeProjections(); global_ctx->merging_executor.reset(); global_ctx->merged_pipeline.reset(); @@ -1285,64 +1348,6 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() global_ctx->merging_executor = std::make_unique(global_ctx->merged_pipeline); } -void MergeTask::ExecuteAndFinalizeHorizontalPart::prepareProjectionsToMergeAndRebuild() const -{ - const auto mode = global_ctx->data->getSettings()->deduplicate_merge_projection_mode; - /// Under throw mode, we still choose to drop projections due to backward compatibility since some - /// users might have projections before this change. - if (global_ctx->data->merging_params.mode != MergeTreeData::MergingParams::Ordinary - && (mode == DeduplicateMergeProjectionMode::THROW || mode == DeduplicateMergeProjectionMode::DROP)) - return; - - // These merging modes may or may not reduce number of rows. It's not known until the horizontal stage is finished. - const bool merge_may_reduce_rows = - global_ctx->cleanup || - global_ctx->deduplicate || - ctx->merging_params.mode == MergeTreeData::MergingParams::Collapsing || - ctx->merging_params.mode == MergeTreeData::MergingParams::Replacing || - ctx->merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing; - - const auto & projections = global_ctx->metadata_snapshot->getProjections(); - - for (const auto & projection : projections) - { - if (merge_may_reduce_rows) - { - global_ctx->projections_to_rebuild.push_back(&projection); - continue; - } - - MergeTreeData::DataPartsVector projection_parts; - for (const auto & part : global_ctx->future_part->parts) - { - auto it = part->getProjectionParts().find(projection.name); - if (it != part->getProjectionParts().end()) - projection_parts.push_back(it->second); - } - if (projection_parts.size() == global_ctx->future_part->parts.size()) - { - global_ctx->projections_to_merge.push_back(&projection); - global_ctx->projections_to_merge_parts[projection.name].assign(projection_parts.begin(), projection_parts.end()); - } - else if (projection_parts.empty()) - { - LOG_DEBUG(ctx->log, "Projection {} will not be merged or rebuilt because all parts don't have it", projection.name); - } - else - { - LOG_DEBUG(ctx->log, "Projection {} will be rebuilt because some parts don't have it", projection.name); - global_ctx->projections_to_rebuild.push_back(&projection); - } - } - - const auto & settings = global_ctx->context->getSettingsRef(); - - for (const auto * projection : global_ctx->projections_to_rebuild) - { - ctx->projection_squashes.emplace_back(projection->sample_block.cloneEmpty(), settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes); - } -} - MergeAlgorithm MergeTask::ExecuteAndFinalizeHorizontalPart::chooseMergeAlgorithm() const { diff --git a/src/Storages/MergeTree/MergeTask.h b/src/Storages/MergeTree/MergeTask.h index f60a32252b2..d9cc7c1dbad 100644 --- a/src/Storages/MergeTree/MergeTask.h +++ b/src/Storages/MergeTree/MergeTask.h @@ -227,7 +227,7 @@ private: std::unique_ptr rows_sources_write_buf{nullptr}; std::optional column_sizes{}; - // For projections to rebuild + /// For projections to rebuild using ProjectionNameToItsBlocks = std::map; ProjectionNameToItsBlocks projection_parts; std::move_iterator projection_parts_iterator; @@ -275,6 +275,7 @@ private: void prepareProjectionsToMergeAndRebuild() const; void calculateProjections(const Block & block) const; + void finalizeProjections() const; void constructTaskForProjectionPartsMerge() const; bool executeMergeProjections(); From 527774d138fd2a8b077800c0897417cedb2113be Mon Sep 17 00:00:00 2001 From: jsc0218 Date: Mon, 19 Aug 2024 15:26:17 +0000 Subject: [PATCH 09/10] use new option name --- .../integration/test_broken_projections/test.py | 2 +- .../0_stateless/02968_projection_merge.sql | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/integration/test_broken_projections/test.py b/tests/integration/test_broken_projections/test.py index 578ff42369c..4ea018d3379 100644 --- a/tests/integration/test_broken_projections/test.py +++ b/tests/integration/test_broken_projections/test.py @@ -191,7 +191,7 @@ def optimize(node, table, final, no_wait): if final: query += " FINAL" if no_wait: - query += " SETTINGS alter_sync=0" + query += " SETTINGS alter_sync=2" node.query(query) diff --git a/tests/queries/0_stateless/02968_projection_merge.sql b/tests/queries/0_stateless/02968_projection_merge.sql index 03488042a4e..3e047d2cf69 100644 --- a/tests/queries/0_stateless/02968_projection_merge.sql +++ b/tests/queries/0_stateless/02968_projection_merge.sql @@ -20,10 +20,10 @@ INSERT INTO tp SELECT number%3, 2 FROM numbers(3); OPTIMIZE TABLE tp FINAL; SELECT type,sum(eventcnt) FROM tp GROUP BY type ORDER BY type -SETTINGS allow_experimental_projection_optimization = 0, force_optimize_projection = 0; +SETTINGS optimize_use_projections = 0, force_optimize_projection = 0; SELECT type,sum(eventcnt) FROM tp GROUP BY type ORDER BY type -SETTINGS allow_experimental_projection_optimization = 1, force_optimize_projection = 1; +SETTINGS optimize_use_projections = 1, force_optimize_projection = 1; SELECT 'CollapsingMergeTree'; @@ -50,10 +50,10 @@ INSERT INTO tp SELECT number % 3, 2, 1 FROM numbers(3); OPTIMIZE TABLE tp FINAL; SELECT type,sum(eventcnt) FROM tp GROUP BY type ORDER BY type -SETTINGS allow_experimental_projection_optimization = 0, force_optimize_projection = 0; +SETTINGS optimize_use_projections = 0, force_optimize_projection = 0; SELECT type,sum(eventcnt) FROM tp GROUP BY type ORDER BY type -SETTINGS allow_experimental_projection_optimization = 1, force_optimize_projection = 1; +SETTINGS optimize_use_projections = 1, force_optimize_projection = 1; -- Actually we don't need to test all 3 engines Replacing/Collapsing/VersionedCollapsing, -- Because they share the same logic of 'reduce number of rows during merges' @@ -82,10 +82,10 @@ INSERT INTO tp SELECT number % 3, 1, 1, 0 FROM numbers(3); OPTIMIZE TABLE tp FINAL; SELECT type,sum(eventcnt) FROM tp GROUP BY type ORDER BY type -SETTINGS allow_experimental_projection_optimization = 0, force_optimize_projection = 0; +SETTINGS optimize_use_projections = 0, force_optimize_projection = 0; SELECT type,sum(eventcnt) FROM tp GROUP BY type ORDER BY type -SETTINGS allow_experimental_projection_optimization = 1, force_optimize_projection = 1; +SETTINGS optimize_use_projections = 1, force_optimize_projection = 1; SELECT 'DEDUPLICATE ON MergeTree'; DROP TABLE IF EXISTS tp; @@ -109,8 +109,8 @@ INSERT INTO tp SELECT number % 3, 2 FROM numbers(3); OPTIMIZE TABLE tp FINAL DEDUPLICATE BY type; SELECT type,sum(eventcnt) FROM tp GROUP BY type ORDER BY type -SETTINGS allow_experimental_projection_optimization = 0, force_optimize_projection = 0; +SETTINGS optimize_use_projections = 0, force_optimize_projection = 0; SELECT type,sum(eventcnt) FROM tp GROUP BY type ORDER BY type -SETTINGS allow_experimental_projection_optimization = 1, force_optimize_projection = 1; +SETTINGS optimize_use_projections = 1, force_optimize_projection = 1; From 902e7b6f29fe90007525f1f74adbf26e72b90bde Mon Sep 17 00:00:00 2001 From: jsc0218 Date: Mon, 19 Aug 2024 23:58:48 +0000 Subject: [PATCH 10/10] ignore broken proj --- src/Storages/MergeTree/MergeTask.cpp | 11 ++++------- tests/integration/test_broken_projections/test.py | 2 +- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index 9ba09ea494a..8aac267d653 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -560,7 +560,7 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::prepareProjectionsToMergeAndRe for (const auto & part : global_ctx->future_part->parts) { auto it = part->getProjectionParts().find(projection.name); - if (it != part->getProjectionParts().end()) + if (it != part->getProjectionParts().end() && !it->second->is_broken) projection_parts.push_back(it->second); } if (projection_parts.size() == global_ctx->future_part->parts.size()) @@ -568,14 +568,11 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::prepareProjectionsToMergeAndRe global_ctx->projections_to_merge.push_back(&projection); global_ctx->projections_to_merge_parts[projection.name].assign(projection_parts.begin(), projection_parts.end()); } - else if (projection_parts.empty()) - { - LOG_DEBUG(ctx->log, "Projection {} will not be merged or rebuilt because all parts don't have it", projection.name); - } else { - LOG_DEBUG(ctx->log, "Projection {} will be rebuilt because some parts don't have it", projection.name); - global_ctx->projections_to_rebuild.push_back(&projection); + chassert(projection_parts.size() < global_ctx->future_part->parts.size()); + LOG_DEBUG(ctx->log, "Projection {} is not merged because some parts don't have it", projection.name); + continue; } } diff --git a/tests/integration/test_broken_projections/test.py b/tests/integration/test_broken_projections/test.py index 4ea018d3379..578ff42369c 100644 --- a/tests/integration/test_broken_projections/test.py +++ b/tests/integration/test_broken_projections/test.py @@ -191,7 +191,7 @@ def optimize(node, table, final, no_wait): if final: query += " FINAL" if no_wait: - query += " SETTINGS alter_sync=2" + query += " SETTINGS alter_sync=0" node.query(query)