mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-19 16:20:50 +00:00
tidy
This commit is contained in:
parent
a837df164c
commit
6bed26a527
@ -145,7 +145,7 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::extractMergingAndGatheringColu
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto & projection : global_ctx->projections_to_rebuild)
|
||||
for (const auto * projection : global_ctx->projections_to_rebuild)
|
||||
{
|
||||
Names projection_columns_vec = projection->getRequiredColumns();
|
||||
std::copy(projection_columns_vec.cbegin(), projection_columns_vec.cend(),
|
||||
@ -496,17 +496,76 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::execute()
|
||||
}
|
||||
|
||||
|
||||
void MergeTask::ExecuteAndFinalizeHorizontalPart::prepareProjectionsToMergeAndRebuild() const
|
||||
{
|
||||
const auto mode = global_ctx->data->getSettings()->deduplicate_merge_projection_mode;
|
||||
/// Under throw mode, we still choose to drop projections due to backward compatibility since some
|
||||
/// users might have projections before this change.
|
||||
if (global_ctx->data->merging_params.mode != MergeTreeData::MergingParams::Ordinary
|
||||
&& (mode == DeduplicateMergeProjectionMode::THROW || mode == DeduplicateMergeProjectionMode::DROP))
|
||||
return;
|
||||
|
||||
/// These merging modes may or may not reduce number of rows. It's not known until the horizontal stage is finished.
|
||||
const bool merge_may_reduce_rows =
|
||||
global_ctx->cleanup ||
|
||||
global_ctx->deduplicate ||
|
||||
ctx->merging_params.mode == MergeTreeData::MergingParams::Collapsing ||
|
||||
ctx->merging_params.mode == MergeTreeData::MergingParams::Replacing ||
|
||||
ctx->merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing;
|
||||
|
||||
const auto & projections = global_ctx->metadata_snapshot->getProjections();
|
||||
|
||||
for (const auto & projection : projections)
|
||||
{
|
||||
if (merge_may_reduce_rows)
|
||||
{
|
||||
global_ctx->projections_to_rebuild.push_back(&projection);
|
||||
continue;
|
||||
}
|
||||
|
||||
MergeTreeData::DataPartsVector projection_parts;
|
||||
for (const auto & part : global_ctx->future_part->parts)
|
||||
{
|
||||
auto it = part->getProjectionParts().find(projection.name);
|
||||
if (it != part->getProjectionParts().end())
|
||||
projection_parts.push_back(it->second);
|
||||
}
|
||||
if (projection_parts.size() == global_ctx->future_part->parts.size())
|
||||
{
|
||||
global_ctx->projections_to_merge.push_back(&projection);
|
||||
global_ctx->projections_to_merge_parts[projection.name].assign(projection_parts.begin(), projection_parts.end());
|
||||
}
|
||||
else if (projection_parts.empty())
|
||||
{
|
||||
LOG_DEBUG(ctx->log, "Projection {} will not be merged or rebuilt because all parts don't have it", projection.name);
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_DEBUG(ctx->log, "Projection {} will be rebuilt because some parts don't have it", projection.name);
|
||||
global_ctx->projections_to_rebuild.push_back(&projection);
|
||||
}
|
||||
}
|
||||
|
||||
const auto & settings = global_ctx->context->getSettingsRef();
|
||||
|
||||
for (const auto * projection : global_ctx->projections_to_rebuild)
|
||||
ctx->projection_squashes.emplace_back(projection->sample_block.cloneEmpty(),
|
||||
settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes);
|
||||
}
|
||||
|
||||
|
||||
void MergeTask::ExecuteAndFinalizeHorizontalPart::calculateProjections(const Block & block) const
|
||||
{
|
||||
for (size_t i = 0, size = global_ctx->projections_to_rebuild.size(); i < size; ++i)
|
||||
{
|
||||
const auto & projection = *global_ctx->projections_to_rebuild[i];
|
||||
Block block_to_squash = projection.calculate(block, global_ctx->context);
|
||||
ctx->projection_squashes[i].setHeader(block_to_squash.cloneEmpty());
|
||||
auto squashed_chunk = Squashing::squash(ctx->projection_squashes[i].add({block_to_squash.getColumns(), block_to_squash.rows()}));
|
||||
auto & projection_squash_plan = ctx->projection_squashes[i];
|
||||
projection_squash_plan.setHeader(block_to_squash.cloneEmpty());
|
||||
Chunk squashed_chunk = Squashing::squash(projection_squash_plan.add({block_to_squash.getColumns(), block_to_squash.rows()}));
|
||||
if (squashed_chunk)
|
||||
{
|
||||
auto result = ctx->projection_squashes[i].getHeader().cloneWithColumns(squashed_chunk.detachColumns());
|
||||
auto result = projection_squash_plan.getHeader().cloneWithColumns(squashed_chunk.detachColumns());
|
||||
auto tmp_part = MergeTreeDataWriter::writeTempProjectionPart(
|
||||
*global_ctx->data, ctx->log, result, projection, global_ctx->new_data_part.get(), ++ctx->projection_block_num);
|
||||
tmp_part.finalize();
|
||||
@ -517,6 +576,30 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::calculateProjections(const Blo
|
||||
}
|
||||
|
||||
|
||||
void MergeTask::ExecuteAndFinalizeHorizontalPart::finalizeProjections() const
|
||||
{
|
||||
for (size_t i = 0, size = global_ctx->projections_to_rebuild.size(); i < size; ++i)
|
||||
{
|
||||
const auto & projection = *global_ctx->projections_to_rebuild[i];
|
||||
auto & projection_squash_plan = ctx->projection_squashes[i];
|
||||
auto squashed_chunk = Squashing::squash(projection_squash_plan.flush());
|
||||
if (squashed_chunk)
|
||||
{
|
||||
auto result = projection_squash_plan.getHeader().cloneWithColumns(squashed_chunk.detachColumns());
|
||||
auto temp_part = MergeTreeDataWriter::writeTempProjectionPart(
|
||||
*global_ctx->data, ctx->log, result, projection, global_ctx->new_data_part.get(), ++ctx->projection_block_num);
|
||||
temp_part.finalize();
|
||||
temp_part.part->getDataPartStorage().commitTransaction();
|
||||
ctx->projection_parts[projection.name].emplace_back(std::move(temp_part.part));
|
||||
}
|
||||
}
|
||||
|
||||
ctx->projection_parts_iterator = std::make_move_iterator(ctx->projection_parts.begin());
|
||||
if (ctx->projection_parts_iterator != std::make_move_iterator(ctx->projection_parts.end()))
|
||||
constructTaskForProjectionPartsMerge();
|
||||
}
|
||||
|
||||
|
||||
void MergeTask::ExecuteAndFinalizeHorizontalPart::constructTaskForProjectionPartsMerge() const
|
||||
{
|
||||
auto && [name, parts] = *ctx->projection_parts_iterator;
|
||||
@ -591,27 +674,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl()
|
||||
return true;
|
||||
}
|
||||
|
||||
/// finalize projections
|
||||
// calculateProjections(global_ctx->merging_executor->getHeader().cloneEmpty());
|
||||
for (size_t i = 0, size = global_ctx->projections_to_rebuild.size(); i < size; ++i)
|
||||
{
|
||||
const auto & projection = *global_ctx->projections_to_rebuild[i];
|
||||
auto & projection_squash_plan = ctx->projection_squashes[i];
|
||||
auto squashed_chunk = Squashing::squash(projection_squash_plan.flush());
|
||||
if (squashed_chunk)
|
||||
{
|
||||
auto result = projection_squash_plan.getHeader().cloneWithColumns(squashed_chunk.detachColumns());
|
||||
auto temp_part = MergeTreeDataWriter::writeTempProjectionPart(
|
||||
*global_ctx->data, ctx->log, result, projection, global_ctx->new_data_part.get(), ++ctx->projection_block_num);
|
||||
temp_part.finalize();
|
||||
temp_part.part->getDataPartStorage().commitTransaction();
|
||||
ctx->projection_parts[projection.name].emplace_back(std::move(temp_part.part));
|
||||
}
|
||||
}
|
||||
|
||||
ctx->projection_parts_iterator = std::make_move_iterator(ctx->projection_parts.begin());
|
||||
if (ctx->projection_parts_iterator != std::make_move_iterator(ctx->projection_parts.end()))
|
||||
constructTaskForProjectionPartsMerge();
|
||||
finalizeProjections();
|
||||
|
||||
global_ctx->merging_executor.reset();
|
||||
global_ctx->merged_pipeline.reset();
|
||||
@ -1285,64 +1348,6 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
|
||||
global_ctx->merging_executor = std::make_unique<PullingPipelineExecutor>(global_ctx->merged_pipeline);
|
||||
}
|
||||
|
||||
void MergeTask::ExecuteAndFinalizeHorizontalPart::prepareProjectionsToMergeAndRebuild() const
|
||||
{
|
||||
const auto mode = global_ctx->data->getSettings()->deduplicate_merge_projection_mode;
|
||||
/// Under throw mode, we still choose to drop projections due to backward compatibility since some
|
||||
/// users might have projections before this change.
|
||||
if (global_ctx->data->merging_params.mode != MergeTreeData::MergingParams::Ordinary
|
||||
&& (mode == DeduplicateMergeProjectionMode::THROW || mode == DeduplicateMergeProjectionMode::DROP))
|
||||
return;
|
||||
|
||||
// These merging modes may or may not reduce number of rows. It's not known until the horizontal stage is finished.
|
||||
const bool merge_may_reduce_rows =
|
||||
global_ctx->cleanup ||
|
||||
global_ctx->deduplicate ||
|
||||
ctx->merging_params.mode == MergeTreeData::MergingParams::Collapsing ||
|
||||
ctx->merging_params.mode == MergeTreeData::MergingParams::Replacing ||
|
||||
ctx->merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing;
|
||||
|
||||
const auto & projections = global_ctx->metadata_snapshot->getProjections();
|
||||
|
||||
for (const auto & projection : projections)
|
||||
{
|
||||
if (merge_may_reduce_rows)
|
||||
{
|
||||
global_ctx->projections_to_rebuild.push_back(&projection);
|
||||
continue;
|
||||
}
|
||||
|
||||
MergeTreeData::DataPartsVector projection_parts;
|
||||
for (const auto & part : global_ctx->future_part->parts)
|
||||
{
|
||||
auto it = part->getProjectionParts().find(projection.name);
|
||||
if (it != part->getProjectionParts().end())
|
||||
projection_parts.push_back(it->second);
|
||||
}
|
||||
if (projection_parts.size() == global_ctx->future_part->parts.size())
|
||||
{
|
||||
global_ctx->projections_to_merge.push_back(&projection);
|
||||
global_ctx->projections_to_merge_parts[projection.name].assign(projection_parts.begin(), projection_parts.end());
|
||||
}
|
||||
else if (projection_parts.empty())
|
||||
{
|
||||
LOG_DEBUG(ctx->log, "Projection {} will not be merged or rebuilt because all parts don't have it", projection.name);
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_DEBUG(ctx->log, "Projection {} will be rebuilt because some parts don't have it", projection.name);
|
||||
global_ctx->projections_to_rebuild.push_back(&projection);
|
||||
}
|
||||
}
|
||||
|
||||
const auto & settings = global_ctx->context->getSettingsRef();
|
||||
|
||||
for (const auto * projection : global_ctx->projections_to_rebuild)
|
||||
{
|
||||
ctx->projection_squashes.emplace_back(projection->sample_block.cloneEmpty(), settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
MergeAlgorithm MergeTask::ExecuteAndFinalizeHorizontalPart::chooseMergeAlgorithm() const
|
||||
{
|
||||
|
@ -227,7 +227,7 @@ private:
|
||||
std::unique_ptr<WriteBuffer> rows_sources_write_buf{nullptr};
|
||||
std::optional<ColumnSizeEstimator> column_sizes{};
|
||||
|
||||
// For projections to rebuild
|
||||
/// For projections to rebuild
|
||||
using ProjectionNameToItsBlocks = std::map<String, MergeTreeData::MutableDataPartsVector>;
|
||||
ProjectionNameToItsBlocks projection_parts;
|
||||
std::move_iterator<ProjectionNameToItsBlocks::iterator> projection_parts_iterator;
|
||||
@ -275,6 +275,7 @@ private:
|
||||
|
||||
void prepareProjectionsToMergeAndRebuild() const;
|
||||
void calculateProjections(const Block & block) const;
|
||||
void finalizeProjections() const;
|
||||
void constructTaskForProjectionPartsMerge() const;
|
||||
bool executeMergeProjections();
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user