diff --git a/src/Storages/MergeTree/MergeProjectionPartsTask.cpp b/src/Storages/MergeTree/MergeProjectionPartsTask.cpp new file mode 100644 index 00000000000..bdc2ba8b9ca --- /dev/null +++ b/src/Storages/MergeTree/MergeProjectionPartsTask.cpp @@ -0,0 +1,95 @@ +#include + +#include +#include + +namespace DB +{ + +bool MergeProjectionPartsTask::executeStep() +{ + auto & current_level_parts = level_parts[current_level]; + auto & next_level_parts = level_parts[next_level]; + + MergeTreeData::MutableDataPartsVector selected_parts; + while (selected_parts.size() < max_parts_to_merge_in_one_level && !current_level_parts.empty()) + { + selected_parts.push_back(std::move(current_level_parts.back())); + current_level_parts.pop_back(); + } + + if (selected_parts.empty()) + { + if (next_level_parts.empty()) + { + LOG_WARNING(log, "There is no projection parts merged"); + + /// Task is finished + return false; + } + current_level = next_level; + ++next_level; + } + else if (selected_parts.size() == 1) + { + if (next_level_parts.empty()) + { + LOG_DEBUG(log, "Merged a projection part in level {}", current_level); + selected_parts[0]->renameTo(projection.name + ".proj", true); + selected_parts[0]->setName(projection.name); + selected_parts[0]->is_temp = false; + new_data_part->addProjectionPart(name, std::move(selected_parts[0])); + + /// Task is finished + return false; + } + else + { + LOG_DEBUG(log, "Forwarded part {} in level {} to next level", selected_parts[0]->name, current_level); + next_level_parts.push_back(std::move(selected_parts[0])); + } + } + else if (selected_parts.size() > 1) + { + // Generate a unique part name + ++block_num; + auto projection_future_part = std::make_shared(); + MergeTreeData::DataPartsVector const_selected_parts( + std::make_move_iterator(selected_parts.begin()), std::make_move_iterator(selected_parts.end())); + projection_future_part->assign(std::move(const_selected_parts)); + projection_future_part->name = fmt::format("{}_{}", projection.name, ++block_num); + projection_future_part->part_info = {"all", 0, 0, 0}; + + MergeTreeData::MergingParams projection_merging_params; + projection_merging_params.mode = MergeTreeData::MergingParams::Ordinary; + if (projection.type == ProjectionDescription::Type::Aggregate) + projection_merging_params.mode = MergeTreeData::MergingParams::Aggregating; + + LOG_DEBUG(log, "Merged {} parts in level {} to {}", selected_parts.size(), current_level, projection_future_part->name); + auto tmp_part_merge_task = mutator->mergePartsToTemporaryPart( + projection_future_part, + projection.metadata, + merge_entry, + std::make_unique((*merge_entry)->table_id, projection_future_part, context), + *table_lock_holder, + time_of_merge, + context, + space_reservation, + false, // TODO Do we need deduplicate for projections + {}, + false, // no cleanup + projection_merging_params, + NO_TRANSACTION_PTR, + /* need_prefix */ true, + new_data_part.get(), + ".tmp_proj"); + + next_level_parts.push_back(executeHere(tmp_part_merge_task)); + next_level_parts.back()->is_temp = true; + } + + /// Need execute again + return true; +} + +} diff --git a/src/Storages/MergeTree/MergeProjectionPartsTask.h b/src/Storages/MergeTree/MergeProjectionPartsTask.h new file mode 100644 index 00000000000..47cafe01151 --- /dev/null +++ b/src/Storages/MergeTree/MergeProjectionPartsTask.h @@ -0,0 +1,84 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + +class MergeProjectionPartsTask : public IExecutableTask +{ +public: + + MergeProjectionPartsTask( + String name_, + MergeTreeData::MutableDataPartsVector && parts_, + const ProjectionDescription & projection_, + size_t & block_num_, + ContextPtr context_, + TableLockHolder * table_lock_holder_, + MergeTreeDataMergerMutator * mutator_, + MergeListEntry * merge_entry_, + time_t time_of_merge_, + MergeTreeData::MutableDataPartPtr new_data_part_, + ReservationSharedPtr space_reservation_) + : name(std::move(name_)) + , parts(std::move(parts_)) + , projection(projection_) + , block_num(block_num_) + , context(context_) + , table_lock_holder(table_lock_holder_) + , mutator(mutator_) + , merge_entry(merge_entry_) + , time_of_merge(time_of_merge_) + , new_data_part(new_data_part_) + , space_reservation(space_reservation_) + , log(getLogger("MergeProjectionPartsTask")) + { + LOG_DEBUG(log, "Selected {} projection_parts from {} to {}", parts.size(), parts.front()->name, parts.back()->name); + level_parts[current_level] = std::move(parts); + } + + void onCompleted() override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); } + StorageID getStorageID() const override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); } + Priority getPriority() const override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); } + String getQueryId() const override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); } + + bool executeStep() override; + +private: + String name; + MergeTreeData::MutableDataPartsVector parts; + const ProjectionDescription & projection; + size_t & block_num; + + ContextPtr context; + TableLockHolder * table_lock_holder; + MergeTreeDataMergerMutator * mutator; + MergeListEntry * merge_entry; + time_t time_of_merge; + + MergeTreeData::MutableDataPartPtr new_data_part; + ReservationSharedPtr space_reservation; + + LoggerPtr log; + + std::map level_parts; + size_t current_level = 0; + size_t next_level = 1; + + /// TODO(nikitamikhaylov): make this constant a setting + static constexpr size_t max_parts_to_merge_in_one_level = 10; +}; + +} diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index 806ed930311..aa178be270f 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -21,6 +21,8 @@ #include #include #include +#include +#include #include #include #include @@ -63,6 +65,7 @@ namespace ErrorCodes extern const int SUPPORT_IS_DISABLED; } + static ColumnsStatistics getStatisticsForColumns( const NamesAndTypesList & columns_to_read, const StorageMetadataPtr & metadata_snapshot) @@ -155,6 +158,13 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::extractMergingAndGatheringColu } } + for (const auto * projection : global_ctx->projections_to_rebuild) + { + Names projection_columns_vec = projection->getRequiredColumns(); + std::copy(projection_columns_vec.cbegin(), projection_columns_vec.cend(), + std::inserter(key_columns, key_columns.end())); + } + /// TODO: also force "summing" and "aggregating" columns to make Horizontal merge only for such columns for (const auto & column : global_ctx->storage_columns) @@ -254,6 +264,8 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() extendObjectColumns(global_ctx->storage_columns, object_columns, false); global_ctx->storage_snapshot = std::make_shared(*global_ctx->data, global_ctx->metadata_snapshot, std::move(object_columns)); + prepareProjectionsToMergeAndRebuild(); + extractMergingAndGatheringColumns(); global_ctx->new_data_part->uuid = global_ctx->future_part->uuid; @@ -517,6 +529,148 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::execute() } +void MergeTask::ExecuteAndFinalizeHorizontalPart::prepareProjectionsToMergeAndRebuild() const +{ + const auto mode = global_ctx->data->getSettings()->deduplicate_merge_projection_mode; + /// Under throw mode, we still choose to drop projections due to backward compatibility since some + /// users might have projections before this change. + if (global_ctx->data->merging_params.mode != MergeTreeData::MergingParams::Ordinary + && (mode == DeduplicateMergeProjectionMode::THROW || mode == DeduplicateMergeProjectionMode::DROP)) + return; + + /// These merging modes may or may not reduce number of rows. It's not known until the horizontal stage is finished. + const bool merge_may_reduce_rows = + global_ctx->cleanup || + global_ctx->deduplicate || + ctx->merging_params.mode == MergeTreeData::MergingParams::Collapsing || + ctx->merging_params.mode == MergeTreeData::MergingParams::Replacing || + ctx->merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing; + + const auto & projections = global_ctx->metadata_snapshot->getProjections(); + + for (const auto & projection : projections) + { + if (merge_may_reduce_rows) + { + global_ctx->projections_to_rebuild.push_back(&projection); + continue; + } + + MergeTreeData::DataPartsVector projection_parts; + for (const auto & part : global_ctx->future_part->parts) + { + auto it = part->getProjectionParts().find(projection.name); + if (it != part->getProjectionParts().end() && !it->second->is_broken) + projection_parts.push_back(it->second); + } + if (projection_parts.size() == global_ctx->future_part->parts.size()) + { + global_ctx->projections_to_merge.push_back(&projection); + global_ctx->projections_to_merge_parts[projection.name].assign(projection_parts.begin(), projection_parts.end()); + } + else + { + chassert(projection_parts.size() < global_ctx->future_part->parts.size()); + LOG_DEBUG(ctx->log, "Projection {} is not merged because some parts don't have it", projection.name); + continue; + } + } + + const auto & settings = global_ctx->context->getSettingsRef(); + + for (const auto * projection : global_ctx->projections_to_rebuild) + ctx->projection_squashes.emplace_back(projection->sample_block.cloneEmpty(), + settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes); +} + + +void MergeTask::ExecuteAndFinalizeHorizontalPart::calculateProjections(const Block & block) const +{ + for (size_t i = 0, size = global_ctx->projections_to_rebuild.size(); i < size; ++i) + { + const auto & projection = *global_ctx->projections_to_rebuild[i]; + Block block_to_squash = projection.calculate(block, global_ctx->context); + auto & projection_squash_plan = ctx->projection_squashes[i]; + projection_squash_plan.setHeader(block_to_squash.cloneEmpty()); + Chunk squashed_chunk = Squashing::squash(projection_squash_plan.add({block_to_squash.getColumns(), block_to_squash.rows()})); + if (squashed_chunk) + { + auto result = projection_squash_plan.getHeader().cloneWithColumns(squashed_chunk.detachColumns()); + auto tmp_part = MergeTreeDataWriter::writeTempProjectionPart( + *global_ctx->data, ctx->log, result, projection, global_ctx->new_data_part.get(), ++ctx->projection_block_num); + tmp_part.finalize(); + tmp_part.part->getDataPartStorage().commitTransaction(); + ctx->projection_parts[projection.name].emplace_back(std::move(tmp_part.part)); + } + } +} + + +void MergeTask::ExecuteAndFinalizeHorizontalPart::finalizeProjections() const +{ + for (size_t i = 0, size = global_ctx->projections_to_rebuild.size(); i < size; ++i) + { + const auto & projection = *global_ctx->projections_to_rebuild[i]; + auto & projection_squash_plan = ctx->projection_squashes[i]; + auto squashed_chunk = Squashing::squash(projection_squash_plan.flush()); + if (squashed_chunk) + { + auto result = projection_squash_plan.getHeader().cloneWithColumns(squashed_chunk.detachColumns()); + auto temp_part = MergeTreeDataWriter::writeTempProjectionPart( + *global_ctx->data, ctx->log, result, projection, global_ctx->new_data_part.get(), ++ctx->projection_block_num); + temp_part.finalize(); + temp_part.part->getDataPartStorage().commitTransaction(); + ctx->projection_parts[projection.name].emplace_back(std::move(temp_part.part)); + } + } + + ctx->projection_parts_iterator = std::make_move_iterator(ctx->projection_parts.begin()); + if (ctx->projection_parts_iterator != std::make_move_iterator(ctx->projection_parts.end())) + constructTaskForProjectionPartsMerge(); +} + + +void MergeTask::ExecuteAndFinalizeHorizontalPart::constructTaskForProjectionPartsMerge() const +{ + auto && [name, parts] = *ctx->projection_parts_iterator; + const auto & projection = global_ctx->metadata_snapshot->projections.get(name); + + ctx->merge_projection_parts_task_ptr = std::make_unique + ( + name, + std::move(parts), + projection, + ctx->projection_block_num, + global_ctx->context, + global_ctx->holder, + global_ctx->mutator, + global_ctx->merge_entry, + global_ctx->time_of_merge, + global_ctx->new_data_part, + global_ctx->space_reservation + ); +} + + +bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeMergeProjections() // NOLINT +{ + /// In case if there are no projections we didn't construct a task + if (!ctx->merge_projection_parts_task_ptr) + return false; + + if (ctx->merge_projection_parts_task_ptr->executeStep()) + return true; + + ++ctx->projection_parts_iterator; + + if (ctx->projection_parts_iterator == std::make_move_iterator(ctx->projection_parts.end())) + return false; + + constructTaskForProjectionPartsMerge(); + + return true; +} + bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl() { Stopwatch watch(CLOCK_MONOTONIC_COARSE); @@ -535,6 +689,8 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl() global_ctx->rows_written += block.rows(); const_cast(*global_ctx->to).write(block); + calculateProjections(block); + UInt64 result_rows = 0; UInt64 result_bytes = 0; global_ctx->merged_pipeline.tryGetResultRowsAndBytes(result_rows, result_bytes); @@ -558,8 +714,10 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl() return true; } + void MergeTask::ExecuteAndFinalizeHorizontalPart::finalize() const { + finalizeProjections(); global_ctx->merging_executor.reset(); global_ctx->merged_pipeline.reset(); @@ -847,35 +1005,9 @@ bool MergeTask::MergeProjectionsStage::mergeMinMaxIndexAndPrepareProjections() c ReadableSize(global_ctx->merge_list_element_ptr->bytes_read_uncompressed / elapsed_seconds)); } - - const auto mode = global_ctx->data->getSettings()->deduplicate_merge_projection_mode; - /// Under throw mode, we still choose to drop projections due to backward compatibility since some - /// users might have projections before this change. - if (global_ctx->data->merging_params.mode != MergeTreeData::MergingParams::Ordinary - && (mode == DeduplicateMergeProjectionMode::THROW || mode == DeduplicateMergeProjectionMode::DROP)) + for (const auto & projection : global_ctx->projections_to_merge) { - ctx->projections_iterator = ctx->tasks_for_projections.begin(); - return false; - } - - const auto & projections = global_ctx->metadata_snapshot->getProjections(); - - for (const auto & projection : projections) - { - MergeTreeData::DataPartsVector projection_parts; - for (const auto & part : global_ctx->future_part->parts) - { - auto actual_projection_parts = part->getProjectionParts(); - auto it = actual_projection_parts.find(projection.name); - if (it != actual_projection_parts.end() && !it->second->is_broken) - projection_parts.push_back(it->second); - } - if (projection_parts.size() < global_ctx->future_part->parts.size()) - { - LOG_DEBUG(ctx->log, "Projection {} is not merged because some parts don't have it", projection.name); - continue; - } - + MergeTreeData::DataPartsVector projection_parts = global_ctx->projections_to_merge_parts[projection->name]; LOG_DEBUG( ctx->log, "Selected {} projection_parts from {} to {}", @@ -885,7 +1017,7 @@ bool MergeTask::MergeProjectionsStage::mergeMinMaxIndexAndPrepareProjections() c auto projection_future_part = std::make_shared(); projection_future_part->assign(std::move(projection_parts)); - projection_future_part->name = projection.name; + projection_future_part->name = projection->name; // TODO (ab): path in future_part is only for merge process introspection, which is not available for merges of projection parts. // Let's comment this out to avoid code inconsistency and add it back after we implement projection merge introspection. // projection_future_part->path = global_ctx->future_part->path + "/" + projection.name + ".proj/"; @@ -893,16 +1025,17 @@ bool MergeTask::MergeProjectionsStage::mergeMinMaxIndexAndPrepareProjections() c MergeTreeData::MergingParams projection_merging_params; projection_merging_params.mode = MergeTreeData::MergingParams::Ordinary; - if (projection.type == ProjectionDescription::Type::Aggregate) + if (projection->type == ProjectionDescription::Type::Aggregate) projection_merging_params.mode = MergeTreeData::MergingParams::Aggregating; ctx->tasks_for_projections.emplace_back(std::make_shared( projection_future_part, - projection.metadata, + projection->metadata, global_ctx->merge_entry, std::make_unique((*global_ctx->merge_entry)->table_id, projection_future_part, global_ctx->context), global_ctx->time_of_merge, global_ctx->context, + *global_ctx->holder, global_ctx->space_reservation, global_ctx->deduplicate, global_ctx->deduplicate_by_columns, diff --git a/src/Storages/MergeTree/MergeTask.h b/src/Storages/MergeTree/MergeTask.h index 9a68b2e04ac..c80995888d4 100644 --- a/src/Storages/MergeTree/MergeTask.h +++ b/src/Storages/MergeTree/MergeTask.h @@ -9,6 +9,7 @@ #include #include +#include #include #include @@ -72,6 +73,7 @@ public: std::unique_ptr projection_merge_list_element_, time_t time_of_merge_, ContextPtr context_, + TableLockHolder & holder, ReservationSharedPtr space_reservation_, bool deduplicate_, Names deduplicate_by_columns_, @@ -96,6 +98,7 @@ public: = global_ctx->projection_merge_list_element ? global_ctx->projection_merge_list_element.get() : (*global_ctx->merge_entry)->ptr(); global_ctx->time_of_merge = std::move(time_of_merge_); global_ctx->context = std::move(context_); + global_ctx->holder = &holder; global_ctx->space_reservation = std::move(space_reservation_); global_ctx->deduplicate = std::move(deduplicate_); global_ctx->deduplicate_by_columns = std::move(deduplicate_by_columns_); @@ -151,6 +154,7 @@ private: /// Proper initialization is responsibility of the author struct GlobalRuntimeContext : public IStageRuntimeContext { + TableLockHolder * holder; MergeList::Entry * merge_entry{nullptr}; /// If not null, use this instead of the global MergeList::Entry. This is for merging projections. std::unique_ptr projection_merge_list_element; @@ -181,6 +185,10 @@ private: MergeAlgorithm chosen_merge_algorithm{MergeAlgorithm::Undecided}; + std::vector projections_to_rebuild{}; + std::vector projections_to_merge{}; + std::map projections_to_merge_parts{}; + std::unique_ptr horizontal_stage_progress{nullptr}; std::unique_ptr column_progress{nullptr}; @@ -228,6 +236,14 @@ private: std::unique_ptr rows_sources_write_buf{nullptr}; std::optional column_sizes{}; + /// For projections to rebuild + using ProjectionNameToItsBlocks = std::map; + ProjectionNameToItsBlocks projection_parts; + std::move_iterator projection_parts_iterator; + std::vector projection_squashes; + size_t projection_block_num = 0; + ExecutableTaskPtr merge_projection_parts_task_ptr; + size_t initial_reservation{0}; bool read_with_direct_io{false}; @@ -257,16 +273,23 @@ private: void finalize() const; /// NOTE: Using pointer-to-member instead of std::function and lambda makes stacktraces much more concise and readable - using ExecuteAndFinalizeHorizontalPartSubtasks = std::array; + using ExecuteAndFinalizeHorizontalPartSubtasks = std::array; const ExecuteAndFinalizeHorizontalPartSubtasks subtasks { &ExecuteAndFinalizeHorizontalPart::prepare, - &ExecuteAndFinalizeHorizontalPart::executeImpl + &ExecuteAndFinalizeHorizontalPart::executeImpl, + &ExecuteAndFinalizeHorizontalPart::executeMergeProjections }; ExecuteAndFinalizeHorizontalPartSubtasks::const_iterator subtasks_iterator = subtasks.begin(); + void prepareProjectionsToMergeAndRebuild() const; + void calculateProjections(const Block & block) const; + void finalizeProjections() const; + void constructTaskForProjectionPartsMerge() const; + bool executeMergeProjections(); + MergeAlgorithm chooseMergeAlgorithm() const; void createMergedStream(); void extractMergingAndGatheringColumns() const; diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 140a226f2d1..5c65dd6c1c1 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -671,7 +671,7 @@ MergeTaskPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart( const StorageMetadataPtr & metadata_snapshot, MergeList::Entry * merge_entry, std::unique_ptr projection_merge_list_element, - TableLockHolder, + TableLockHolder & holder, time_t time_of_merge, ContextPtr context, ReservationSharedPtr space_reservation, @@ -691,6 +691,7 @@ MergeTaskPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart( std::move(projection_merge_list_element), time_of_merge, context, + holder, space_reservation, deduplicate, deduplicate_by_columns, diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.h b/src/Storages/MergeTree/MergeTreeDataMergerMutator.h index d2852a3a504..c14962d903b 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.h +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.h @@ -159,7 +159,7 @@ public: const StorageMetadataPtr & metadata_snapshot, MergeListEntry * merge_entry, std::unique_ptr projection_merge_list_element, - TableLockHolder table_lock_holder, + TableLockHolder & table_lock_holder, time_t time_of_merge, ContextPtr context, ReservationSharedPtr space_reservation, diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 55a4947832e..28464ae6434 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -1058,136 +1059,6 @@ struct MutationContext using MutationContextPtr = std::shared_ptr; -class MergeProjectionPartsTask : public IExecutableTask -{ -public: - - MergeProjectionPartsTask( - String name_, - MergeTreeData::MutableDataPartsVector && parts_, - const ProjectionDescription & projection_, - size_t & block_num_, - MutationContextPtr ctx_) - : name(std::move(name_)) - , parts(std::move(parts_)) - , projection(projection_) - , block_num(block_num_) - , ctx(ctx_) - , log(getLogger("MergeProjectionPartsTask")) - { - LOG_DEBUG(log, "Selected {} projection_parts from {} to {}", parts.size(), parts.front()->name, parts.back()->name); - level_parts[current_level] = std::move(parts); - } - - void onCompleted() override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); } - StorageID getStorageID() const override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); } - Priority getPriority() const override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); } - String getQueryId() const override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); } - - bool executeStep() override - { - auto & current_level_parts = level_parts[current_level]; - auto & next_level_parts = level_parts[next_level]; - - MergeTreeData::MutableDataPartsVector selected_parts; - while (selected_parts.size() < max_parts_to_merge_in_one_level && !current_level_parts.empty()) - { - selected_parts.push_back(std::move(current_level_parts.back())); - current_level_parts.pop_back(); - } - - if (selected_parts.empty()) - { - if (next_level_parts.empty()) - { - LOG_WARNING(log, "There is no projection parts merged"); - - /// Task is finished - return false; - } - current_level = next_level; - ++next_level; - } - else if (selected_parts.size() == 1) - { - if (next_level_parts.empty()) - { - LOG_DEBUG(log, "Merged a projection part in level {}", current_level); - selected_parts[0]->renameTo(projection.name + ".proj", true); - selected_parts[0]->setName(projection.name); - selected_parts[0]->is_temp = false; - ctx->new_data_part->addProjectionPart(name, std::move(selected_parts[0])); - - /// Task is finished - return false; - } - else - { - LOG_DEBUG(log, "Forwarded part {} in level {} to next level", selected_parts[0]->name, current_level); - next_level_parts.push_back(std::move(selected_parts[0])); - } - } - else if (selected_parts.size() > 1) - { - // Generate a unique part name - ++block_num; - auto projection_future_part = std::make_shared(); - MergeTreeData::DataPartsVector const_selected_parts( - std::make_move_iterator(selected_parts.begin()), std::make_move_iterator(selected_parts.end())); - projection_future_part->assign(std::move(const_selected_parts)); - projection_future_part->name = fmt::format("{}_{}", projection.name, ++block_num); - projection_future_part->part_info = {"all", 0, 0, 0}; - - MergeTreeData::MergingParams projection_merging_params; - projection_merging_params.mode = MergeTreeData::MergingParams::Ordinary; - if (projection.type == ProjectionDescription::Type::Aggregate) - projection_merging_params.mode = MergeTreeData::MergingParams::Aggregating; - - LOG_DEBUG(log, "Merged {} parts in level {} to {}", selected_parts.size(), current_level, projection_future_part->name); - auto tmp_part_merge_task = ctx->mutator->mergePartsToTemporaryPart( - projection_future_part, - projection.metadata, - ctx->mutate_entry, - std::make_unique((*ctx->mutate_entry)->table_id, projection_future_part, ctx->context), - *ctx->holder, - ctx->time_of_mutation, - ctx->context, - ctx->space_reservation, - false, // TODO Do we need deduplicate for projections - {}, - false, // no cleanup - projection_merging_params, - NO_TRANSACTION_PTR, - /* need_prefix */ true, - ctx->new_data_part.get(), - ".tmp_proj"); - - next_level_parts.push_back(executeHere(tmp_part_merge_task)); - next_level_parts.back()->is_temp = true; - } - - /// Need execute again - return true; - } - -private: - String name; - MergeTreeData::MutableDataPartsVector parts; - const ProjectionDescription & projection; - size_t & block_num; - MutationContextPtr ctx; - - LoggerPtr log; - - std::map level_parts; - size_t current_level = 0; - size_t next_level = 1; - - /// TODO(nikitamikhaylov): make this constant a setting - static constexpr size_t max_parts_to_merge_in_one_level = 10; -}; - - // This class is responsible for: // 1. get projection pipeline and a sink to write parts // 2. build an executor that can write block to the input stream (actually we can write through it to generate as many parts as possible) @@ -1406,7 +1277,13 @@ void PartMergerWriter::constructTaskForProjectionPartsMerge() std::move(parts), projection, block_num, - ctx + ctx->context, + ctx->holder, + ctx->mutator, + ctx->mutate_entry, + ctx->time_of_mutation, + ctx->new_data_part, + ctx->space_reservation ); } diff --git a/tests/queries/0_stateless/02968_projection_merge.reference b/tests/queries/0_stateless/02968_projection_merge.reference new file mode 100644 index 00000000000..40cb572c95a --- /dev/null +++ b/tests/queries/0_stateless/02968_projection_merge.reference @@ -0,0 +1,28 @@ +ReplacingMergeTree +0 2 +1 2 +2 2 +0 2 +1 2 +2 2 +CollapsingMergeTree +0 2 +1 2 +2 2 +0 2 +1 2 +2 2 +VersionedCollapsingMergeTree +0 2 +1 2 +2 2 +0 2 +1 2 +2 2 +DEDUPLICATE ON MergeTree +0 1 +1 1 +2 1 +0 1 +1 1 +2 1 diff --git a/tests/queries/0_stateless/02968_projection_merge.sql b/tests/queries/0_stateless/02968_projection_merge.sql new file mode 100644 index 00000000000..3e047d2cf69 --- /dev/null +++ b/tests/queries/0_stateless/02968_projection_merge.sql @@ -0,0 +1,116 @@ +SELECT 'ReplacingMergeTree'; +DROP TABLE IF EXISTS tp; +CREATE TABLE tp +( + `type` Int32, + `eventcnt` UInt64, + PROJECTION p + ( + SELECT type,sum(eventcnt) + GROUP BY type + ) +) +ENGINE = ReplacingMergeTree +ORDER BY type +SETTINGS deduplicate_merge_projection_mode = 'rebuild'; + +INSERT INTO tp SELECT number%3, 1 FROM numbers(3); +INSERT INTO tp SELECT number%3, 2 FROM numbers(3); + +OPTIMIZE TABLE tp FINAL; + +SELECT type,sum(eventcnt) FROM tp GROUP BY type ORDER BY type +SETTINGS optimize_use_projections = 0, force_optimize_projection = 0; + +SELECT type,sum(eventcnt) FROM tp GROUP BY type ORDER BY type +SETTINGS optimize_use_projections = 1, force_optimize_projection = 1; + + +SELECT 'CollapsingMergeTree'; +DROP TABLE IF EXISTS tp; +CREATE TABLE tp +( + `type` Int32, + `eventcnt` UInt64, + `sign` Int8, + PROJECTION p + ( + SELECT type,sum(eventcnt) + GROUP BY type + ) +) +ENGINE = CollapsingMergeTree(sign) +ORDER BY type +SETTINGS deduplicate_merge_projection_mode = 'rebuild'; + +INSERT INTO tp SELECT number % 3, 1, 1 FROM numbers(3); +INSERT INTO tp SELECT number % 3, 1, -1 FROM numbers(3); +INSERT INTO tp SELECT number % 3, 2, 1 FROM numbers(3); + +OPTIMIZE TABLE tp FINAL; + +SELECT type,sum(eventcnt) FROM tp GROUP BY type ORDER BY type +SETTINGS optimize_use_projections = 0, force_optimize_projection = 0; + +SELECT type,sum(eventcnt) FROM tp GROUP BY type ORDER BY type +SETTINGS optimize_use_projections = 1, force_optimize_projection = 1; + +-- Actually we don't need to test all 3 engines Replacing/Collapsing/VersionedCollapsing, +-- Because they share the same logic of 'reduce number of rows during merges' +SELECT 'VersionedCollapsingMergeTree'; +DROP TABLE IF EXISTS tp; +CREATE TABLE tp +( + `type` Int32, + `eventcnt` UInt64, + `sign` Int8, + `version` UInt8, + PROJECTION p + ( + SELECT type,sum(eventcnt) + GROUP BY type + ) +) +ENGINE = VersionedCollapsingMergeTree(sign,version) +ORDER BY type +SETTINGS deduplicate_merge_projection_mode = 'rebuild'; + +INSERT INTO tp SELECT number % 3, 1, -1, 0 FROM numbers(3); +INSERT INTO tp SELECT number % 3, 2, 1, 1 FROM numbers(3); +INSERT INTO tp SELECT number % 3, 1, 1, 0 FROM numbers(3); + +OPTIMIZE TABLE tp FINAL; + +SELECT type,sum(eventcnt) FROM tp GROUP BY type ORDER BY type +SETTINGS optimize_use_projections = 0, force_optimize_projection = 0; + +SELECT type,sum(eventcnt) FROM tp GROUP BY type ORDER BY type +SETTINGS optimize_use_projections = 1, force_optimize_projection = 1; + +SELECT 'DEDUPLICATE ON MergeTree'; +DROP TABLE IF EXISTS tp; +CREATE TABLE tp +( + `type` Int32, + `eventcnt` UInt64, + PROJECTION p + ( + SELECT type,sum(eventcnt) + GROUP BY type + ) +) +ENGINE = MergeTree +ORDER BY type +SETTINGS deduplicate_merge_projection_mode = 'rebuild'; + +INSERT INTO tp SELECT number % 3, 1 FROM numbers(3); +INSERT INTO tp SELECT number % 3, 2 FROM numbers(3); + +OPTIMIZE TABLE tp FINAL DEDUPLICATE BY type; + +SELECT type,sum(eventcnt) FROM tp GROUP BY type ORDER BY type +SETTINGS optimize_use_projections = 0, force_optimize_projection = 0; + +SELECT type,sum(eventcnt) FROM tp GROUP BY type ORDER BY type +SETTINGS optimize_use_projections = 1, force_optimize_projection = 1; +