fix squash related and projection collection

This commit is contained in:
jsc0218 2024-08-10 02:37:42 +00:00
parent 22dad244e7
commit a837df164c
2 changed files with 38 additions and 21 deletions

View File

@ -502,10 +502,11 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::calculateProjections(const Blo
{
const auto & projection = *global_ctx->projections_to_rebuild[i];
Block block_to_squash = projection.calculate(block, global_ctx->context);
auto chunk = ctx->projection_squashes[i].add({block_to_squash.getColumns(), block_to_squash.rows()});
if (chunk)
ctx->projection_squashes[i].setHeader(block_to_squash.cloneEmpty());
auto squashed_chunk = Squashing::squash(ctx->projection_squashes[i].add({block_to_squash.getColumns(), block_to_squash.rows()}));
if (squashed_chunk)
{
auto result = ctx->projection_squashes[i].getHeader().cloneWithColumns(chunk.detachColumns());
auto result = ctx->projection_squashes[i].getHeader().cloneWithColumns(squashed_chunk.detachColumns());
auto tmp_part = MergeTreeDataWriter::writeTempProjectionPart(
*global_ctx->data, ctx->log, result, projection, global_ctx->new_data_part.get(), ++ctx->projection_block_num);
tmp_part.finalize();
@ -590,8 +591,23 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl()
return true;
}
// finalize projections
calculateProjections(global_ctx->merging_executor->getHeader().cloneEmpty());
/// finalize projections
// calculateProjections(global_ctx->merging_executor->getHeader().cloneEmpty());
for (size_t i = 0, size = global_ctx->projections_to_rebuild.size(); i < size; ++i)
{
const auto & projection = *global_ctx->projections_to_rebuild[i];
auto & projection_squash_plan = ctx->projection_squashes[i];
auto squashed_chunk = Squashing::squash(projection_squash_plan.flush());
if (squashed_chunk)
{
auto result = projection_squash_plan.getHeader().cloneWithColumns(squashed_chunk.detachColumns());
auto temp_part = MergeTreeDataWriter::writeTempProjectionPart(
*global_ctx->data, ctx->log, result, projection, global_ctx->new_data_part.get(), ++ctx->projection_block_num);
temp_part.finalize();
temp_part.part->getDataPartStorage().commitTransaction();
ctx->projection_parts[projection.name].emplace_back(std::move(temp_part.part));
}
}
ctx->projection_parts_iterator = std::make_move_iterator(ctx->projection_parts.begin());
if (ctx->projection_parts_iterator != std::make_move_iterator(ctx->projection_parts.end()))
@ -878,16 +894,6 @@ bool MergeTask::MergeProjectionsStage::mergeMinMaxIndexAndPrepareProjections() c
ReadableSize(global_ctx->merge_list_element_ptr->bytes_read_uncompressed / elapsed_seconds));
}
const auto mode = global_ctx->data->getSettings()->deduplicate_merge_projection_mode;
/// Under throw mode, we still choose to drop projections due to backward compatibility since some
/// users might have projections before this change.
if (global_ctx->data->merging_params.mode != MergeTreeData::MergingParams::Ordinary
&& (mode == DeduplicateMergeProjectionMode::THROW || mode == DeduplicateMergeProjectionMode::DROP))
{
ctx->projections_iterator = ctx->tasks_for_projections.begin();
return false;
}
for (const auto & projection : global_ctx->projections_to_merge)
{
MergeTreeData::DataPartsVector projection_parts = global_ctx->projections_to_merge_parts[projection->name];
@ -1281,6 +1287,13 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
void MergeTask::ExecuteAndFinalizeHorizontalPart::prepareProjectionsToMergeAndRebuild() const
{
const auto mode = global_ctx->data->getSettings()->deduplicate_merge_projection_mode;
/// Under throw mode, we still choose to drop projections due to backward compatibility since some
/// users might have projections before this change.
if (global_ctx->data->merging_params.mode != MergeTreeData::MergingParams::Ordinary
&& (mode == DeduplicateMergeProjectionMode::THROW || mode == DeduplicateMergeProjectionMode::DROP))
return;
// These merging modes may or may not reduce number of rows. It's not known until the horizontal stage is finished.
const bool merge_may_reduce_rows =
global_ctx->cleanup ||
@ -1324,9 +1337,9 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::prepareProjectionsToMergeAndRe
const auto & settings = global_ctx->context->getSettingsRef();
for (auto projection : global_ctx->projections_to_rebuild)
for (const auto * projection : global_ctx->projections_to_rebuild)
{
ctx->projection_squashes.emplace_back(projection->sample_block, settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes);
ctx->projection_squashes.emplace_back(projection->sample_block.cloneEmpty(), settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes);
}
}

View File

@ -11,7 +11,8 @@ CREATE TABLE tp
)
)
ENGINE = ReplacingMergeTree
ORDER BY type;
ORDER BY type
SETTINGS deduplicate_merge_projection_mode = 'rebuild';
INSERT INTO tp SELECT number%3, 1 FROM numbers(3);
INSERT INTO tp SELECT number%3, 2 FROM numbers(3);
@ -39,7 +40,8 @@ CREATE TABLE tp
)
)
ENGINE = CollapsingMergeTree(sign)
ORDER BY type;
ORDER BY type
SETTINGS deduplicate_merge_projection_mode = 'rebuild';
INSERT INTO tp SELECT number % 3, 1, 1 FROM numbers(3);
INSERT INTO tp SELECT number % 3, 1, -1 FROM numbers(3);
@ -70,7 +72,8 @@ CREATE TABLE tp
)
)
ENGINE = VersionedCollapsingMergeTree(sign,version)
ORDER BY type;
ORDER BY type
SETTINGS deduplicate_merge_projection_mode = 'rebuild';
INSERT INTO tp SELECT number % 3, 1, -1, 0 FROM numbers(3);
INSERT INTO tp SELECT number % 3, 2, 1, 1 FROM numbers(3);
@ -97,7 +100,8 @@ CREATE TABLE tp
)
)
ENGINE = MergeTree
ORDER BY type;
ORDER BY type
SETTINGS deduplicate_merge_projection_mode = 'rebuild';
INSERT INTO tp SELECT number % 3, 1 FROM numbers(3);
INSERT INTO tp SELECT number % 3, 2 FROM numbers(3);