Merge pull request #62364 from cangyin/fix-projection-merge
Rebuild projection for merges that reduce rows
This commit is contained in:
commit 6289c65e02
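In short: when a merge can reduce the number of rows (Replacing/Collapsing/VersionedCollapsing merges, DEDUPLICATE, or cleanup), projection parts can no longer simply be merged part-by-part, because their contents would no longer match the merged base rows; with deduplicate_merge_projection_mode = 'rebuild' they are now recalculated from the merged result. A minimal SQL illustration of the scenario, mirroring the 02968_projection_merge test added below (table and column names come from that test):

-- Sketch only: a row-reducing engine plus an aggregate projection; the projection is
-- rebuilt during the merge so both query paths below stay consistent.
CREATE TABLE tp
(
    `type` Int32,
    `eventcnt` UInt64,
    PROJECTION p (SELECT type, sum(eventcnt) GROUP BY type)
)
ENGINE = ReplacingMergeTree
ORDER BY type
SETTINGS deduplicate_merge_projection_mode = 'rebuild';

INSERT INTO tp SELECT number % 3, 1 FROM numbers(3);
INSERT INTO tp SELECT number % 3, 2 FROM numbers(3);

OPTIMIZE TABLE tp FINAL;  -- the merge replaces rows; projection p is rebuilt from the merged data

-- Reads the base table directly:
SELECT type, sum(eventcnt) FROM tp GROUP BY type ORDER BY type
SETTINGS optimize_use_projections = 0, force_optimize_projection = 0;
-- Reads through the projection; with 'rebuild' it returns the same rows:
SELECT type, sum(eventcnt) FROM tp GROUP BY type ORDER BY type
SETTINGS optimize_use_projections = 1, force_optimize_projection = 1;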
95
src/Storages/MergeTree/MergeProjectionPartsTask.cpp
Normal file
@@ -0,0 +1,95 @@
#include <Storages/MergeTree/MergeProjectionPartsTask.h>

#include <Common/TransactionID.h>
#include <Storages/MergeTree/MergeList.h>

namespace DB
{

bool MergeProjectionPartsTask::executeStep()
{
    auto & current_level_parts = level_parts[current_level];
    auto & next_level_parts = level_parts[next_level];

    MergeTreeData::MutableDataPartsVector selected_parts;
    while (selected_parts.size() < max_parts_to_merge_in_one_level && !current_level_parts.empty())
    {
        selected_parts.push_back(std::move(current_level_parts.back()));
        current_level_parts.pop_back();
    }

    if (selected_parts.empty())
    {
        if (next_level_parts.empty())
        {
            LOG_WARNING(log, "There is no projection parts merged");

            /// Task is finished
            return false;
        }
        current_level = next_level;
        ++next_level;
    }
    else if (selected_parts.size() == 1)
    {
        if (next_level_parts.empty())
        {
            LOG_DEBUG(log, "Merged a projection part in level {}", current_level);
            selected_parts[0]->renameTo(projection.name + ".proj", true);
            selected_parts[0]->setName(projection.name);
            selected_parts[0]->is_temp = false;
            new_data_part->addProjectionPart(name, std::move(selected_parts[0]));

            /// Task is finished
            return false;
        }
        else
        {
            LOG_DEBUG(log, "Forwarded part {} in level {} to next level", selected_parts[0]->name, current_level);
            next_level_parts.push_back(std::move(selected_parts[0]));
        }
    }
    else if (selected_parts.size() > 1)
    {
        // Generate a unique part name
        ++block_num;
        auto projection_future_part = std::make_shared<FutureMergedMutatedPart>();
        MergeTreeData::DataPartsVector const_selected_parts(
            std::make_move_iterator(selected_parts.begin()), std::make_move_iterator(selected_parts.end()));
        projection_future_part->assign(std::move(const_selected_parts));
        projection_future_part->name = fmt::format("{}_{}", projection.name, ++block_num);
        projection_future_part->part_info = {"all", 0, 0, 0};

        MergeTreeData::MergingParams projection_merging_params;
        projection_merging_params.mode = MergeTreeData::MergingParams::Ordinary;
        if (projection.type == ProjectionDescription::Type::Aggregate)
            projection_merging_params.mode = MergeTreeData::MergingParams::Aggregating;

        LOG_DEBUG(log, "Merged {} parts in level {} to {}", selected_parts.size(), current_level, projection_future_part->name);
        auto tmp_part_merge_task = mutator->mergePartsToTemporaryPart(
            projection_future_part,
            projection.metadata,
            merge_entry,
            std::make_unique<MergeListElement>((*merge_entry)->table_id, projection_future_part, context),
            *table_lock_holder,
            time_of_merge,
            context,
            space_reservation,
            false, // TODO Do we need deduplicate for projections
            {},
            false, // no cleanup
            projection_merging_params,
            NO_TRANSACTION_PTR,
            /* need_prefix */ true,
            new_data_part.get(),
            ".tmp_proj");

        next_level_parts.push_back(executeHere(tmp_part_merge_task));
        next_level_parts.back()->is_temp = true;
    }

    /// Need execute again
    return true;
}

}
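The task above merges projection parts level by level: it repeatedly takes up to max_parts_to_merge_in_one_level parts from the current level, merges them into one temporary part, pushes the result to the next level, and stops once a single part remains, which is then attached to the new data part. A minimal, self-contained C++ sketch of just that scheduling loop (illustrative only: plain strings stand in for parts, mergeParts() stands in for mergePartsToTemporaryPart + executeHere, and the real task performs one step per executeStep() call rather than looping):

// Illustrative sketch of the level-based merge loop (not ClickHouse code).
#include <iostream>
#include <map>
#include <string>
#include <vector>

static constexpr size_t max_parts_to_merge_in_one_level = 10;

std::string mergeParts(const std::vector<std::string> & parts)
{
    return "merged(" + std::to_string(parts.size()) + ")";
}

std::string mergeAllLevels(std::vector<std::string> initial_parts)
{
    std::map<size_t, std::vector<std::string>> level_parts;
    size_t current_level = 0;
    size_t next_level = 1;
    level_parts[current_level] = std::move(initial_parts);

    while (true)
    {
        auto & current = level_parts[current_level];
        auto & next = level_parts[next_level];

        // Take up to N parts from the current level.
        std::vector<std::string> selected;
        while (selected.size() < max_parts_to_merge_in_one_level && !current.empty())
        {
            selected.push_back(std::move(current.back()));
            current.pop_back();
        }

        if (selected.empty())
        {
            if (next.empty())
                return {};              // nothing was merged at all
            current_level = next_level; // move on to the parts produced so far
            ++next_level;
        }
        else if (selected.size() == 1)
        {
            if (next.empty())
                return selected[0];     // single survivor: the final merged part
            next.push_back(std::move(selected[0])); // forward it to the next level
        }
        else
        {
            next.push_back(mergeParts(selected));   // merge the batch into one part
        }
    }
}

int main()
{
    std::vector<std::string> parts(23, "p");
    std::cout << mergeAllLevels(parts) << '\n'; // prints "merged(3)"
}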
84
src/Storages/MergeTree/MergeProjectionPartsTask.h
Normal file
@@ -0,0 +1,84 @@
#pragma once

#include <Interpreters/StorageID.h>
#include <Storages/MergeTree/IExecutableTask.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
#include <Storages/MergeTree/MergeProgress.h>
#include <Storages/MergeTree/FutureMergedMutatedPart.h>
#include <Storages/ProjectionsDescription.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}

class MergeProjectionPartsTask : public IExecutableTask
{
public:
    MergeProjectionPartsTask(
        String name_,
        MergeTreeData::MutableDataPartsVector && parts_,
        const ProjectionDescription & projection_,
        size_t & block_num_,
        ContextPtr context_,
        TableLockHolder * table_lock_holder_,
        MergeTreeDataMergerMutator * mutator_,
        MergeListEntry * merge_entry_,
        time_t time_of_merge_,
        MergeTreeData::MutableDataPartPtr new_data_part_,
        ReservationSharedPtr space_reservation_)
        : name(std::move(name_))
        , parts(std::move(parts_))
        , projection(projection_)
        , block_num(block_num_)
        , context(context_)
        , table_lock_holder(table_lock_holder_)
        , mutator(mutator_)
        , merge_entry(merge_entry_)
        , time_of_merge(time_of_merge_)
        , new_data_part(new_data_part_)
        , space_reservation(space_reservation_)
        , log(getLogger("MergeProjectionPartsTask"))
    {
        LOG_DEBUG(log, "Selected {} projection_parts from {} to {}", parts.size(), parts.front()->name, parts.back()->name);
        level_parts[current_level] = std::move(parts);
    }

    void onCompleted() override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
    StorageID getStorageID() const override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
    Priority getPriority() const override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
    String getQueryId() const override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }

    bool executeStep() override;

private:
    String name;
    MergeTreeData::MutableDataPartsVector parts;
    const ProjectionDescription & projection;
    size_t & block_num;

    ContextPtr context;
    TableLockHolder * table_lock_holder;
    MergeTreeDataMergerMutator * mutator;
    MergeListEntry * merge_entry;
    time_t time_of_merge;

    MergeTreeData::MutableDataPartPtr new_data_part;
    ReservationSharedPtr space_reservation;

    LoggerPtr log;

    std::map<size_t, MergeTreeData::MutableDataPartsVector> level_parts;
    size_t current_level = 0;
    size_t next_level = 1;

    /// TODO(nikitamikhaylov): make this constant a setting
    static constexpr size_t max_parts_to_merge_in_one_level = 10;
};

}
src/Storages/MergeTree/MergeTask.cpp
@@ -21,6 +21,8 @@
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/MergeTree/FutureMergedMutatedPart.h>
#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
#include <Storages/MergeTree/MergeTreeDataWriter.h>
#include <Storages/MergeTree/MergeProjectionPartsTask.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/MaterializingTransform.h>
#include <Processors/Transforms/FilterTransform.h>
@@ -63,6 +65,7 @@ namespace ErrorCodes
    extern const int SUPPORT_IS_DISABLED;
}


static ColumnsStatistics getStatisticsForColumns(
    const NamesAndTypesList & columns_to_read,
    const StorageMetadataPtr & metadata_snapshot)
@@ -155,6 +158,13 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::extractMergingAndGatheringColumns() const
        }
    }

    for (const auto * projection : global_ctx->projections_to_rebuild)
    {
        Names projection_columns_vec = projection->getRequiredColumns();
        std::copy(projection_columns_vec.cbegin(), projection_columns_vec.cend(),
                  std::inserter(key_columns, key_columns.end()));
    }

    /// TODO: also force "summing" and "aggregating" columns to make Horizontal merge only for such columns

    for (const auto & column : global_ctx->storage_columns)
@@ -254,6 +264,8 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
    extendObjectColumns(global_ctx->storage_columns, object_columns, false);
    global_ctx->storage_snapshot = std::make_shared<StorageSnapshot>(*global_ctx->data, global_ctx->metadata_snapshot, std::move(object_columns));

    prepareProjectionsToMergeAndRebuild();

    extractMergingAndGatheringColumns();

    global_ctx->new_data_part->uuid = global_ctx->future_part->uuid;
@@ -517,6 +529,148 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::execute()
}


void MergeTask::ExecuteAndFinalizeHorizontalPart::prepareProjectionsToMergeAndRebuild() const
{
    const auto mode = global_ctx->data->getSettings()->deduplicate_merge_projection_mode;
    /// Under throw mode, we still choose to drop projections due to backward compatibility since some
    /// users might have projections before this change.
    if (global_ctx->data->merging_params.mode != MergeTreeData::MergingParams::Ordinary
        && (mode == DeduplicateMergeProjectionMode::THROW || mode == DeduplicateMergeProjectionMode::DROP))
        return;

    /// These merging modes may or may not reduce number of rows. It's not known until the horizontal stage is finished.
    const bool merge_may_reduce_rows =
        global_ctx->cleanup ||
        global_ctx->deduplicate ||
        ctx->merging_params.mode == MergeTreeData::MergingParams::Collapsing ||
        ctx->merging_params.mode == MergeTreeData::MergingParams::Replacing ||
        ctx->merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing;

    const auto & projections = global_ctx->metadata_snapshot->getProjections();

    for (const auto & projection : projections)
    {
        if (merge_may_reduce_rows)
        {
            global_ctx->projections_to_rebuild.push_back(&projection);
            continue;
        }

        MergeTreeData::DataPartsVector projection_parts;
        for (const auto & part : global_ctx->future_part->parts)
        {
            auto it = part->getProjectionParts().find(projection.name);
            if (it != part->getProjectionParts().end() && !it->second->is_broken)
                projection_parts.push_back(it->second);
        }
        if (projection_parts.size() == global_ctx->future_part->parts.size())
        {
            global_ctx->projections_to_merge.push_back(&projection);
            global_ctx->projections_to_merge_parts[projection.name].assign(projection_parts.begin(), projection_parts.end());
        }
        else
        {
            chassert(projection_parts.size() < global_ctx->future_part->parts.size());
            LOG_DEBUG(ctx->log, "Projection {} is not merged because some parts don't have it", projection.name);
            continue;
        }
    }

    const auto & settings = global_ctx->context->getSettingsRef();

    for (const auto * projection : global_ctx->projections_to_rebuild)
        ctx->projection_squashes.emplace_back(projection->sample_block.cloneEmpty(),
            settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes);
}


void MergeTask::ExecuteAndFinalizeHorizontalPart::calculateProjections(const Block & block) const
{
    for (size_t i = 0, size = global_ctx->projections_to_rebuild.size(); i < size; ++i)
    {
        const auto & projection = *global_ctx->projections_to_rebuild[i];
        Block block_to_squash = projection.calculate(block, global_ctx->context);
        auto & projection_squash_plan = ctx->projection_squashes[i];
        projection_squash_plan.setHeader(block_to_squash.cloneEmpty());
        Chunk squashed_chunk = Squashing::squash(projection_squash_plan.add({block_to_squash.getColumns(), block_to_squash.rows()}));
        if (squashed_chunk)
        {
            auto result = projection_squash_plan.getHeader().cloneWithColumns(squashed_chunk.detachColumns());
            auto tmp_part = MergeTreeDataWriter::writeTempProjectionPart(
                *global_ctx->data, ctx->log, result, projection, global_ctx->new_data_part.get(), ++ctx->projection_block_num);
            tmp_part.finalize();
            tmp_part.part->getDataPartStorage().commitTransaction();
            ctx->projection_parts[projection.name].emplace_back(std::move(tmp_part.part));
        }
    }
}


void MergeTask::ExecuteAndFinalizeHorizontalPart::finalizeProjections() const
{
    for (size_t i = 0, size = global_ctx->projections_to_rebuild.size(); i < size; ++i)
    {
        const auto & projection = *global_ctx->projections_to_rebuild[i];
        auto & projection_squash_plan = ctx->projection_squashes[i];
        auto squashed_chunk = Squashing::squash(projection_squash_plan.flush());
        if (squashed_chunk)
        {
            auto result = projection_squash_plan.getHeader().cloneWithColumns(squashed_chunk.detachColumns());
            auto temp_part = MergeTreeDataWriter::writeTempProjectionPart(
                *global_ctx->data, ctx->log, result, projection, global_ctx->new_data_part.get(), ++ctx->projection_block_num);
            temp_part.finalize();
            temp_part.part->getDataPartStorage().commitTransaction();
            ctx->projection_parts[projection.name].emplace_back(std::move(temp_part.part));
        }
    }

    ctx->projection_parts_iterator = std::make_move_iterator(ctx->projection_parts.begin());
    if (ctx->projection_parts_iterator != std::make_move_iterator(ctx->projection_parts.end()))
        constructTaskForProjectionPartsMerge();
}


void MergeTask::ExecuteAndFinalizeHorizontalPart::constructTaskForProjectionPartsMerge() const
{
    auto && [name, parts] = *ctx->projection_parts_iterator;
    const auto & projection = global_ctx->metadata_snapshot->projections.get(name);

    ctx->merge_projection_parts_task_ptr = std::make_unique<MergeProjectionPartsTask>
    (
        name,
        std::move(parts),
        projection,
        ctx->projection_block_num,
        global_ctx->context,
        global_ctx->holder,
        global_ctx->mutator,
        global_ctx->merge_entry,
        global_ctx->time_of_merge,
        global_ctx->new_data_part,
        global_ctx->space_reservation
    );
}


bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeMergeProjections() // NOLINT
{
    /// In case if there are no projections we didn't construct a task
    if (!ctx->merge_projection_parts_task_ptr)
        return false;

    if (ctx->merge_projection_parts_task_ptr->executeStep())
        return true;

    ++ctx->projection_parts_iterator;

    if (ctx->projection_parts_iterator == std::make_move_iterator(ctx->projection_parts.end()))
        return false;

    constructTaskForProjectionPartsMerge();

    return true;
}

bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl()
{
    Stopwatch watch(CLOCK_MONOTONIC_COARSE);
@@ -535,6 +689,8 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl()
        global_ctx->rows_written += block.rows();
        const_cast<MergedBlockOutputStream &>(*global_ctx->to).write(block);

        calculateProjections(block);

        UInt64 result_rows = 0;
        UInt64 result_bytes = 0;
        global_ctx->merged_pipeline.tryGetResultRowsAndBytes(result_rows, result_bytes);
@@ -558,8 +714,10 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl()
    return true;
}


void MergeTask::ExecuteAndFinalizeHorizontalPart::finalize() const
{
    finalizeProjections();
    global_ctx->merging_executor.reset();
    global_ctx->merged_pipeline.reset();

@@ -847,35 +1005,9 @@ bool MergeTask::MergeProjectionsStage::mergeMinMaxIndexAndPrepareProjections() const
            ReadableSize(global_ctx->merge_list_element_ptr->bytes_read_uncompressed / elapsed_seconds));
    }


    const auto mode = global_ctx->data->getSettings()->deduplicate_merge_projection_mode;
    /// Under throw mode, we still choose to drop projections due to backward compatibility since some
    /// users might have projections before this change.
    if (global_ctx->data->merging_params.mode != MergeTreeData::MergingParams::Ordinary
        && (mode == DeduplicateMergeProjectionMode::THROW || mode == DeduplicateMergeProjectionMode::DROP))
    for (const auto & projection : global_ctx->projections_to_merge)
    {
        ctx->projections_iterator = ctx->tasks_for_projections.begin();
        return false;
    }

    const auto & projections = global_ctx->metadata_snapshot->getProjections();

    for (const auto & projection : projections)
    {
        MergeTreeData::DataPartsVector projection_parts;
        for (const auto & part : global_ctx->future_part->parts)
        {
            auto actual_projection_parts = part->getProjectionParts();
            auto it = actual_projection_parts.find(projection.name);
            if (it != actual_projection_parts.end() && !it->second->is_broken)
                projection_parts.push_back(it->second);
        }
        if (projection_parts.size() < global_ctx->future_part->parts.size())
        {
            LOG_DEBUG(ctx->log, "Projection {} is not merged because some parts don't have it", projection.name);
            continue;
        }

        MergeTreeData::DataPartsVector projection_parts = global_ctx->projections_to_merge_parts[projection->name];
        LOG_DEBUG(
            ctx->log,
            "Selected {} projection_parts from {} to {}",
@@ -885,7 +1017,7 @@ bool MergeTask::MergeProjectionsStage::mergeMinMaxIndexAndPrepareProjections() const

        auto projection_future_part = std::make_shared<FutureMergedMutatedPart>();
        projection_future_part->assign(std::move(projection_parts));
        projection_future_part->name = projection.name;
        projection_future_part->name = projection->name;
        // TODO (ab): path in future_part is only for merge process introspection, which is not available for merges of projection parts.
        // Let's comment this out to avoid code inconsistency and add it back after we implement projection merge introspection.
        // projection_future_part->path = global_ctx->future_part->path + "/" + projection.name + ".proj/";
@@ -893,16 +1025,17 @@ bool MergeTask::MergeProjectionsStage::mergeMinMaxIndexAndPrepareProjections() const

        MergeTreeData::MergingParams projection_merging_params;
        projection_merging_params.mode = MergeTreeData::MergingParams::Ordinary;
        if (projection.type == ProjectionDescription::Type::Aggregate)
        if (projection->type == ProjectionDescription::Type::Aggregate)
            projection_merging_params.mode = MergeTreeData::MergingParams::Aggregating;

        ctx->tasks_for_projections.emplace_back(std::make_shared<MergeTask>(
            projection_future_part,
            projection.metadata,
            projection->metadata,
            global_ctx->merge_entry,
            std::make_unique<MergeListElement>((*global_ctx->merge_entry)->table_id, projection_future_part, global_ctx->context),
            global_ctx->time_of_merge,
            global_ctx->context,
            *global_ctx->holder,
            global_ctx->space_reservation,
            global_ctx->deduplicate,
            global_ctx->deduplicate_by_columns,
src/Storages/MergeTree/MergeTask.h
@@ -9,6 +9,7 @@
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedReadBufferFromFile.h>

#include <Interpreters/Squashing.h>
#include <Interpreters/TemporaryDataOnDisk.h>

#include <Processors/Executors/PullingPipelineExecutor.h>
@@ -72,6 +73,7 @@ public:
        std::unique_ptr<MergeListElement> projection_merge_list_element_,
        time_t time_of_merge_,
        ContextPtr context_,
        TableLockHolder & holder,
        ReservationSharedPtr space_reservation_,
        bool deduplicate_,
        Names deduplicate_by_columns_,
@@ -96,6 +98,7 @@ public:
            = global_ctx->projection_merge_list_element ? global_ctx->projection_merge_list_element.get() : (*global_ctx->merge_entry)->ptr();
        global_ctx->time_of_merge = std::move(time_of_merge_);
        global_ctx->context = std::move(context_);
        global_ctx->holder = &holder;
        global_ctx->space_reservation = std::move(space_reservation_);
        global_ctx->deduplicate = std::move(deduplicate_);
        global_ctx->deduplicate_by_columns = std::move(deduplicate_by_columns_);
@@ -151,6 +154,7 @@ private:
    /// Proper initialization is responsibility of the author
    struct GlobalRuntimeContext : public IStageRuntimeContext
    {
        TableLockHolder * holder;
        MergeList::Entry * merge_entry{nullptr};
        /// If not null, use this instead of the global MergeList::Entry. This is for merging projections.
        std::unique_ptr<MergeListElement> projection_merge_list_element;
@@ -181,6 +185,10 @@ private:

        MergeAlgorithm chosen_merge_algorithm{MergeAlgorithm::Undecided};

        std::vector<ProjectionDescriptionRawPtr> projections_to_rebuild{};
        std::vector<ProjectionDescriptionRawPtr> projections_to_merge{};
        std::map<String, MergeTreeData::DataPartsVector> projections_to_merge_parts{};

        std::unique_ptr<MergeStageProgress> horizontal_stage_progress{nullptr};
        std::unique_ptr<MergeStageProgress> column_progress{nullptr};
@@ -228,6 +236,14 @@ private:
        std::unique_ptr<WriteBuffer> rows_sources_write_buf{nullptr};
        std::optional<ColumnSizeEstimator> column_sizes{};

        /// For projections to rebuild
        using ProjectionNameToItsBlocks = std::map<String, MergeTreeData::MutableDataPartsVector>;
        ProjectionNameToItsBlocks projection_parts;
        std::move_iterator<ProjectionNameToItsBlocks::iterator> projection_parts_iterator;
        std::vector<Squashing> projection_squashes;
        size_t projection_block_num = 0;
        ExecutableTaskPtr merge_projection_parts_task_ptr;

        size_t initial_reservation{0};
        bool read_with_direct_io{false};
@@ -257,16 +273,23 @@ private:
        void finalize() const;

        /// NOTE: Using pointer-to-member instead of std::function and lambda makes stacktraces much more concise and readable
        using ExecuteAndFinalizeHorizontalPartSubtasks = std::array<bool(ExecuteAndFinalizeHorizontalPart::*)(), 2>;
        using ExecuteAndFinalizeHorizontalPartSubtasks = std::array<bool(ExecuteAndFinalizeHorizontalPart::*)(), 3>;

        const ExecuteAndFinalizeHorizontalPartSubtasks subtasks
        {
            &ExecuteAndFinalizeHorizontalPart::prepare,
            &ExecuteAndFinalizeHorizontalPart::executeImpl
            &ExecuteAndFinalizeHorizontalPart::executeImpl,
            &ExecuteAndFinalizeHorizontalPart::executeMergeProjections
        };

        ExecuteAndFinalizeHorizontalPartSubtasks::const_iterator subtasks_iterator = subtasks.begin();

        void prepareProjectionsToMergeAndRebuild() const;
        void calculateProjections(const Block & block) const;
        void finalizeProjections() const;
        void constructTaskForProjectionPartsMerge() const;
        bool executeMergeProjections();

        MergeAlgorithm chooseMergeAlgorithm() const;
        void createMergedStream();
        void extractMergingAndGatheringColumns() const;
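The subtasks array above uses the pointer-to-member idiom mentioned in the NOTE: the stage keeps an iterator into a fixed array of member-function pointers and re-invokes the current subtask while it reports more work, then advances to the next one. A standalone sketch of that dispatch pattern (illustrative names only, not the actual MergeTask driver):

// Standalone illustration of the pointer-to-member subtask dispatch (not ClickHouse code).
#include <array>
#include <iostream>

class Stage
{
public:
    /// Returns true while the stage as a whole still has work to do.
    bool executeStep()
    {
        if (subtasks_iterator == subtasks.end())
            return false;
        if ((this->*(*subtasks_iterator))())  // re-run the current subtask while it reports more work
            return true;
        ++subtasks_iterator;                  // current subtask finished, move to the next one
        return subtasks_iterator != subtasks.end();
    }

private:
    bool prepare() { std::cout << "prepare\n"; return false; }                          // single-shot
    bool executeImpl() { std::cout << "executeImpl\n"; return --impl_steps > 0; }        // multi-step
    bool executeMergeProjections() { std::cout << "mergeProjections\n"; return false; }  // single-shot

    int impl_steps = 3;

    using Subtasks = std::array<bool (Stage::*)(), 3>;
    const Subtasks subtasks{&Stage::prepare, &Stage::executeImpl, &Stage::executeMergeProjections};
    Subtasks::const_iterator subtasks_iterator = subtasks.begin();
};

int main()
{
    Stage stage;
    while (stage.executeStep())
        ;
}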
src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp
@@ -671,7 +671,7 @@ MergeTaskPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart(
    const StorageMetadataPtr & metadata_snapshot,
    MergeList::Entry * merge_entry,
    std::unique_ptr<MergeListElement> projection_merge_list_element,
    TableLockHolder,
    TableLockHolder & holder,
    time_t time_of_merge,
    ContextPtr context,
    ReservationSharedPtr space_reservation,
@@ -691,6 +691,7 @@ MergeTaskPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart(
        std::move(projection_merge_list_element),
        time_of_merge,
        context,
        holder,
        space_reservation,
        deduplicate,
        deduplicate_by_columns,
src/Storages/MergeTree/MergeTreeDataMergerMutator.h
@@ -159,7 +159,7 @@ public:
        const StorageMetadataPtr & metadata_snapshot,
        MergeListEntry * merge_entry,
        std::unique_ptr<MergeListElement> projection_merge_list_element,
        TableLockHolder table_lock_holder,
        TableLockHolder & table_lock_holder,
        time_t time_of_merge,
        ContextPtr context,
        ReservationSharedPtr space_reservation,
src/Storages/MergeTree/MutateTask.cpp
@@ -24,6 +24,7 @@
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Storages/MergeTree/StorageFromMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeDataWriter.h>
#include <Storages/MergeTree/MergeProjectionPartsTask.h>
#include <Storages/MutationCommands.h>
#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
#include <Storages/MergeTree/MergeTreeIndexFullText.h>
@@ -1058,136 +1059,6 @@ struct MutationContext
using MutationContextPtr = std::shared_ptr<MutationContext>;


class MergeProjectionPartsTask : public IExecutableTask
{
public:
    MergeProjectionPartsTask(
        String name_,
        MergeTreeData::MutableDataPartsVector && parts_,
        const ProjectionDescription & projection_,
        size_t & block_num_,
        MutationContextPtr ctx_)
        : name(std::move(name_))
        , parts(std::move(parts_))
        , projection(projection_)
        , block_num(block_num_)
        , ctx(ctx_)
        , log(getLogger("MergeProjectionPartsTask"))
    {
        LOG_DEBUG(log, "Selected {} projection_parts from {} to {}", parts.size(), parts.front()->name, parts.back()->name);
        level_parts[current_level] = std::move(parts);
    }

    void onCompleted() override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
    StorageID getStorageID() const override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
    Priority getPriority() const override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
    String getQueryId() const override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }

    bool executeStep() override
    {
        auto & current_level_parts = level_parts[current_level];
        auto & next_level_parts = level_parts[next_level];

        MergeTreeData::MutableDataPartsVector selected_parts;
        while (selected_parts.size() < max_parts_to_merge_in_one_level && !current_level_parts.empty())
        {
            selected_parts.push_back(std::move(current_level_parts.back()));
            current_level_parts.pop_back();
        }

        if (selected_parts.empty())
        {
            if (next_level_parts.empty())
            {
                LOG_WARNING(log, "There is no projection parts merged");

                /// Task is finished
                return false;
            }
            current_level = next_level;
            ++next_level;
        }
        else if (selected_parts.size() == 1)
        {
            if (next_level_parts.empty())
            {
                LOG_DEBUG(log, "Merged a projection part in level {}", current_level);
                selected_parts[0]->renameTo(projection.name + ".proj", true);
                selected_parts[0]->setName(projection.name);
                selected_parts[0]->is_temp = false;
                ctx->new_data_part->addProjectionPart(name, std::move(selected_parts[0]));

                /// Task is finished
                return false;
            }
            else
            {
                LOG_DEBUG(log, "Forwarded part {} in level {} to next level", selected_parts[0]->name, current_level);
                next_level_parts.push_back(std::move(selected_parts[0]));
            }
        }
        else if (selected_parts.size() > 1)
        {
            // Generate a unique part name
            ++block_num;
            auto projection_future_part = std::make_shared<FutureMergedMutatedPart>();
            MergeTreeData::DataPartsVector const_selected_parts(
                std::make_move_iterator(selected_parts.begin()), std::make_move_iterator(selected_parts.end()));
            projection_future_part->assign(std::move(const_selected_parts));
            projection_future_part->name = fmt::format("{}_{}", projection.name, ++block_num);
            projection_future_part->part_info = {"all", 0, 0, 0};

            MergeTreeData::MergingParams projection_merging_params;
            projection_merging_params.mode = MergeTreeData::MergingParams::Ordinary;
            if (projection.type == ProjectionDescription::Type::Aggregate)
                projection_merging_params.mode = MergeTreeData::MergingParams::Aggregating;

            LOG_DEBUG(log, "Merged {} parts in level {} to {}", selected_parts.size(), current_level, projection_future_part->name);
            auto tmp_part_merge_task = ctx->mutator->mergePartsToTemporaryPart(
                projection_future_part,
                projection.metadata,
                ctx->mutate_entry,
                std::make_unique<MergeListElement>((*ctx->mutate_entry)->table_id, projection_future_part, ctx->context),
                *ctx->holder,
                ctx->time_of_mutation,
                ctx->context,
                ctx->space_reservation,
                false, // TODO Do we need deduplicate for projections
                {},
                false, // no cleanup
                projection_merging_params,
                NO_TRANSACTION_PTR,
                /* need_prefix */ true,
                ctx->new_data_part.get(),
                ".tmp_proj");

            next_level_parts.push_back(executeHere(tmp_part_merge_task));
            next_level_parts.back()->is_temp = true;
        }

        /// Need execute again
        return true;
    }

private:
    String name;
    MergeTreeData::MutableDataPartsVector parts;
    const ProjectionDescription & projection;
    size_t & block_num;
    MutationContextPtr ctx;

    LoggerPtr log;

    std::map<size_t, MergeTreeData::MutableDataPartsVector> level_parts;
    size_t current_level = 0;
    size_t next_level = 1;

    /// TODO(nikitamikhaylov): make this constant a setting
    static constexpr size_t max_parts_to_merge_in_one_level = 10;
};


// This class is responsible for:
// 1. get projection pipeline and a sink to write parts
// 2. build an executor that can write block to the input stream (actually we can write through it to generate as many parts as possible)
@@ -1406,7 +1277,13 @@ void PartMergerWriter::constructTaskForProjectionPartsMerge()
        std::move(parts),
        projection,
        block_num,
        ctx
        ctx->context,
        ctx->holder,
        ctx->mutator,
        ctx->mutate_entry,
        ctx->time_of_mutation,
        ctx->new_data_part,
        ctx->space_reservation
    );
}
28
tests/queries/0_stateless/02968_projection_merge.reference
Normal file
@@ -0,0 +1,28 @@
ReplacingMergeTree
0	2
1	2
2	2
0	2
1	2
2	2
CollapsingMergeTree
0	2
1	2
2	2
0	2
1	2
2	2
VersionedCollapsingMergeTree
0	2
1	2
2	2
0	2
1	2
2	2
DEDUPLICATE ON MergeTree
0	1
1	1
2	1
0	1
1	1
2	1
116
tests/queries/0_stateless/02968_projection_merge.sql
Normal file
@@ -0,0 +1,116 @@
SELECT 'ReplacingMergeTree';
DROP TABLE IF EXISTS tp;
CREATE TABLE tp
(
    `type` Int32,
    `eventcnt` UInt64,
    PROJECTION p
    (
        SELECT type,sum(eventcnt)
        GROUP BY type
    )
)
ENGINE = ReplacingMergeTree
ORDER BY type
SETTINGS deduplicate_merge_projection_mode = 'rebuild';

INSERT INTO tp SELECT number%3, 1 FROM numbers(3);
INSERT INTO tp SELECT number%3, 2 FROM numbers(3);

OPTIMIZE TABLE tp FINAL;

SELECT type,sum(eventcnt) FROM tp GROUP BY type ORDER BY type
SETTINGS optimize_use_projections = 0, force_optimize_projection = 0;

SELECT type,sum(eventcnt) FROM tp GROUP BY type ORDER BY type
SETTINGS optimize_use_projections = 1, force_optimize_projection = 1;


SELECT 'CollapsingMergeTree';
DROP TABLE IF EXISTS tp;
CREATE TABLE tp
(
    `type` Int32,
    `eventcnt` UInt64,
    `sign` Int8,
    PROJECTION p
    (
        SELECT type,sum(eventcnt)
        GROUP BY type
    )
)
ENGINE = CollapsingMergeTree(sign)
ORDER BY type
SETTINGS deduplicate_merge_projection_mode = 'rebuild';

INSERT INTO tp SELECT number % 3, 1, 1 FROM numbers(3);
INSERT INTO tp SELECT number % 3, 1, -1 FROM numbers(3);
INSERT INTO tp SELECT number % 3, 2, 1 FROM numbers(3);

OPTIMIZE TABLE tp FINAL;

SELECT type,sum(eventcnt) FROM tp GROUP BY type ORDER BY type
SETTINGS optimize_use_projections = 0, force_optimize_projection = 0;

SELECT type,sum(eventcnt) FROM tp GROUP BY type ORDER BY type
SETTINGS optimize_use_projections = 1, force_optimize_projection = 1;

-- Actually we don't need to test all 3 engines Replacing/Collapsing/VersionedCollapsing,
-- Because they share the same logic of 'reduce number of rows during merges'
SELECT 'VersionedCollapsingMergeTree';
DROP TABLE IF EXISTS tp;
CREATE TABLE tp
(
    `type` Int32,
    `eventcnt` UInt64,
    `sign` Int8,
    `version` UInt8,
    PROJECTION p
    (
        SELECT type,sum(eventcnt)
        GROUP BY type
    )
)
ENGINE = VersionedCollapsingMergeTree(sign,version)
ORDER BY type
SETTINGS deduplicate_merge_projection_mode = 'rebuild';

INSERT INTO tp SELECT number % 3, 1, -1, 0 FROM numbers(3);
INSERT INTO tp SELECT number % 3, 2, 1, 1 FROM numbers(3);
INSERT INTO tp SELECT number % 3, 1, 1, 0 FROM numbers(3);

OPTIMIZE TABLE tp FINAL;

SELECT type,sum(eventcnt) FROM tp GROUP BY type ORDER BY type
SETTINGS optimize_use_projections = 0, force_optimize_projection = 0;

SELECT type,sum(eventcnt) FROM tp GROUP BY type ORDER BY type
SETTINGS optimize_use_projections = 1, force_optimize_projection = 1;

SELECT 'DEDUPLICATE ON MergeTree';
DROP TABLE IF EXISTS tp;
CREATE TABLE tp
(
    `type` Int32,
    `eventcnt` UInt64,
    PROJECTION p
    (
        SELECT type,sum(eventcnt)
        GROUP BY type
    )
)
ENGINE = MergeTree
ORDER BY type
SETTINGS deduplicate_merge_projection_mode = 'rebuild';

INSERT INTO tp SELECT number % 3, 1 FROM numbers(3);
INSERT INTO tp SELECT number % 3, 2 FROM numbers(3);

OPTIMIZE TABLE tp FINAL DEDUPLICATE BY type;

SELECT type,sum(eventcnt) FROM tp GROUP BY type ORDER BY type
SETTINGS optimize_use_projections = 0, force_optimize_projection = 0;

SELECT type,sum(eventcnt) FROM tp GROUP BY type ORDER BY type
SETTINGS optimize_use_projections = 1, force_optimize_projection = 1;