mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-19 16:20:50 +00:00
Fix projection merge for Collapsing/Replacing/VersionedCollapsing MergeTree
This commit is contained in:
parent
633aeaaa76
commit
cc5456c649
95
src/Storages/MergeTree/MergeProjectionPartsTask.cpp
Normal file
95
src/Storages/MergeTree/MergeProjectionPartsTask.cpp
Normal file
@ -0,0 +1,95 @@
|
||||
#include <Storages/MergeTree/MergeProjectionPartsTask.h>
|
||||
|
||||
#include <Common/TransactionID.h>
|
||||
#include <Storages/MergeTree/MergeList.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
bool MergeProjectionPartsTask::executeStep()
|
||||
{
|
||||
auto & current_level_parts = level_parts[current_level];
|
||||
auto & next_level_parts = level_parts[next_level];
|
||||
|
||||
MergeTreeData::MutableDataPartsVector selected_parts;
|
||||
while (selected_parts.size() < max_parts_to_merge_in_one_level && !current_level_parts.empty())
|
||||
{
|
||||
selected_parts.push_back(std::move(current_level_parts.back()));
|
||||
current_level_parts.pop_back();
|
||||
}
|
||||
|
||||
if (selected_parts.empty())
|
||||
{
|
||||
if (next_level_parts.empty())
|
||||
{
|
||||
LOG_WARNING(log, "There is no projection parts merged");
|
||||
|
||||
/// Task is finished
|
||||
return false;
|
||||
}
|
||||
current_level = next_level;
|
||||
++next_level;
|
||||
}
|
||||
else if (selected_parts.size() == 1)
|
||||
{
|
||||
if (next_level_parts.empty())
|
||||
{
|
||||
LOG_DEBUG(log, "Merged a projection part in level {}", current_level);
|
||||
selected_parts[0]->renameTo(projection.name + ".proj", true);
|
||||
selected_parts[0]->setName(projection.name);
|
||||
selected_parts[0]->is_temp = false;
|
||||
new_data_part->addProjectionPart(name, std::move(selected_parts[0]));
|
||||
|
||||
/// Task is finished
|
||||
return false;
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_DEBUG(log, "Forwarded part {} in level {} to next level", selected_parts[0]->name, current_level);
|
||||
next_level_parts.push_back(std::move(selected_parts[0]));
|
||||
}
|
||||
}
|
||||
else if (selected_parts.size() > 1)
|
||||
{
|
||||
// Generate a unique part name
|
||||
++block_num;
|
||||
auto projection_future_part = std::make_shared<FutureMergedMutatedPart>();
|
||||
MergeTreeData::DataPartsVector const_selected_parts(
|
||||
std::make_move_iterator(selected_parts.begin()), std::make_move_iterator(selected_parts.end()));
|
||||
projection_future_part->assign(std::move(const_selected_parts));
|
||||
projection_future_part->name = fmt::format("{}_{}", projection.name, ++block_num);
|
||||
projection_future_part->part_info = {"all", 0, 0, 0};
|
||||
|
||||
MergeTreeData::MergingParams projection_merging_params;
|
||||
projection_merging_params.mode = MergeTreeData::MergingParams::Ordinary;
|
||||
if (projection.type == ProjectionDescription::Type::Aggregate)
|
||||
projection_merging_params.mode = MergeTreeData::MergingParams::Aggregating;
|
||||
|
||||
LOG_DEBUG(log, "Merged {} parts in level {} to {}", selected_parts.size(), current_level, projection_future_part->name);
|
||||
auto tmp_part_merge_task = mutator->mergePartsToTemporaryPart(
|
||||
projection_future_part,
|
||||
projection.metadata,
|
||||
merge_entry,
|
||||
std::make_unique<MergeListElement>((*merge_entry)->table_id, projection_future_part, context),
|
||||
*table_lock_holder,
|
||||
time_of_merge,
|
||||
context,
|
||||
space_reservation,
|
||||
false, // TODO Do we need deduplicate for projections
|
||||
{},
|
||||
false, // no cleanup
|
||||
projection_merging_params,
|
||||
NO_TRANSACTION_PTR,
|
||||
/* need_prefix */ true,
|
||||
new_data_part.get(),
|
||||
".tmp_proj");
|
||||
|
||||
next_level_parts.push_back(executeHere(tmp_part_merge_task));
|
||||
next_level_parts.back()->is_temp = true;
|
||||
}
|
||||
|
||||
/// Need execute again
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
84
src/Storages/MergeTree/MergeProjectionPartsTask.h
Normal file
84
src/Storages/MergeTree/MergeProjectionPartsTask.h
Normal file
@ -0,0 +1,84 @@
|
||||
#pragma once
|
||||
|
||||
#include <Interpreters/StorageID.h>
|
||||
#include <Storages/MergeTree/IExecutableTask.h>
|
||||
#include <Storages/MergeTree/MergeTreeData.h>
|
||||
#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
|
||||
#include <Storages/MergeTree/MergeProgress.h>
|
||||
#include <Storages/MergeTree/FutureMergedMutatedPart.h>
|
||||
#include <Storages/ProjectionsDescription.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
class MergeProjectionPartsTask : public IExecutableTask
|
||||
{
|
||||
public:
|
||||
|
||||
MergeProjectionPartsTask(
|
||||
String name_,
|
||||
MergeTreeData::MutableDataPartsVector && parts_,
|
||||
const ProjectionDescription & projection_,
|
||||
size_t & block_num_,
|
||||
ContextPtr context_,
|
||||
TableLockHolder * table_lock_holder_,
|
||||
MergeTreeDataMergerMutator * mutator_,
|
||||
MergeListEntry * merge_entry_,
|
||||
time_t time_of_merge_,
|
||||
MergeTreeData::MutableDataPartPtr new_data_part_,
|
||||
ReservationSharedPtr space_reservation_)
|
||||
: name(std::move(name_))
|
||||
, parts(std::move(parts_))
|
||||
, projection(projection_)
|
||||
, block_num(block_num_)
|
||||
, context(context_)
|
||||
, table_lock_holder(table_lock_holder_)
|
||||
, mutator(mutator_)
|
||||
, merge_entry(merge_entry_)
|
||||
, time_of_merge(time_of_merge_)
|
||||
, new_data_part(new_data_part_)
|
||||
, space_reservation(space_reservation_)
|
||||
, log(getLogger("MergeProjectionPartsTask"))
|
||||
{
|
||||
LOG_DEBUG(log, "Selected {} projection_parts from {} to {}", parts.size(), parts.front()->name, parts.back()->name);
|
||||
level_parts[current_level] = std::move(parts);
|
||||
}
|
||||
|
||||
void onCompleted() override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
|
||||
StorageID getStorageID() const override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
|
||||
Priority getPriority() const override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
|
||||
String getQueryId() const override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
|
||||
|
||||
bool executeStep() override;
|
||||
|
||||
private:
|
||||
String name;
|
||||
MergeTreeData::MutableDataPartsVector parts;
|
||||
const ProjectionDescription & projection;
|
||||
size_t & block_num;
|
||||
|
||||
ContextPtr context;
|
||||
TableLockHolder * table_lock_holder;
|
||||
MergeTreeDataMergerMutator * mutator;
|
||||
MergeListEntry * merge_entry;
|
||||
time_t time_of_merge;
|
||||
|
||||
MergeTreeData::MutableDataPartPtr new_data_part;
|
||||
ReservationSharedPtr space_reservation;
|
||||
|
||||
LoggerPtr log;
|
||||
|
||||
std::map<size_t, MergeTreeData::MutableDataPartsVector> level_parts;
|
||||
size_t current_level = 0;
|
||||
size_t next_level = 1;
|
||||
|
||||
/// TODO(nikitamikhaylov): make this constant a setting
|
||||
static constexpr size_t max_parts_to_merge_in_one_level = 10;
|
||||
};
|
||||
|
||||
}
|
@ -18,6 +18,8 @@
|
||||
#include <Storages/MergeTree/MergeTreeSequentialSource.h>
|
||||
#include <Storages/MergeTree/FutureMergedMutatedPart.h>
|
||||
#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
|
||||
#include <Storages/MergeTree/MergeTreeDataWriter.h>
|
||||
#include <Storages/MergeTree/MergeProjectionPartsTask.h>
|
||||
#include <Processors/Transforms/ExpressionTransform.h>
|
||||
#include <Processors/Transforms/MaterializingTransform.h>
|
||||
#include <Processors/Transforms/FilterTransform.h>
|
||||
@ -47,12 +49,12 @@ namespace ErrorCodes
|
||||
extern const int SUPPORT_IS_DISABLED;
|
||||
}
|
||||
|
||||
|
||||
/// PK columns are sorted and merged, ordinary columns are gathered using info from merge step
|
||||
static void extractMergingAndGatheringColumns(
|
||||
const NamesAndTypesList & storage_columns,
|
||||
const ExpressionActionsPtr & sorting_key_expr,
|
||||
const IndicesDescription & indexes,
|
||||
const std::vector<ProjectionDescriptionRawPtr> & projections,
|
||||
const MergeTreeData::MergingParams & merging_params,
|
||||
NamesAndTypesList & gathering_columns, Names & gathering_column_names,
|
||||
NamesAndTypesList & merging_columns, Names & merging_column_names)
|
||||
@ -65,6 +67,12 @@ static void extractMergingAndGatheringColumns(
|
||||
std::copy(index_columns_vec.cbegin(), index_columns_vec.cend(),
|
||||
std::inserter(key_columns, key_columns.end()));
|
||||
}
|
||||
for (const auto & projection : projections)
|
||||
{
|
||||
Names projection_columns_vec = projection->getRequiredColumns();
|
||||
std::copy(projection_columns_vec.cbegin(), projection_columns_vec.cend(),
|
||||
std::inserter(key_columns, key_columns.end()));
|
||||
}
|
||||
|
||||
/// Force sign column for Collapsing mode
|
||||
if (merging_params.mode == MergeTreeData::MergingParams::Collapsing)
|
||||
@ -203,10 +211,13 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
|
||||
extendObjectColumns(global_ctx->storage_columns, object_columns, false);
|
||||
global_ctx->storage_snapshot = std::make_shared<StorageSnapshot>(*global_ctx->data, global_ctx->metadata_snapshot, std::move(object_columns));
|
||||
|
||||
prepareProjectionsToMergeAndRebuild();
|
||||
|
||||
extractMergingAndGatheringColumns(
|
||||
global_ctx->storage_columns,
|
||||
global_ctx->metadata_snapshot->getSortingKey().expression,
|
||||
global_ctx->metadata_snapshot->getSecondaryIndices(),
|
||||
global_ctx->projections_to_rebuild,
|
||||
ctx->merging_params,
|
||||
global_ctx->gathering_columns,
|
||||
global_ctx->gathering_column_names,
|
||||
@ -453,6 +464,65 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::execute()
|
||||
}
|
||||
|
||||
|
||||
void MergeTask::ExecuteAndFinalizeHorizontalPart::calculateProjections(const Block & block) const
|
||||
{
|
||||
for (size_t i = 0, size = global_ctx->projections_to_rebuild.size(); i < size; ++i)
|
||||
{
|
||||
const auto & projection = *global_ctx->projections_to_rebuild[i];
|
||||
auto projection_block = ctx->projection_squashes[i].add(projection.calculate(block, global_ctx->context));
|
||||
if (projection_block)
|
||||
{
|
||||
auto tmp_part = MergeTreeDataWriter::writeTempProjectionPart(
|
||||
*global_ctx->data, ctx->log, projection_block, projection, global_ctx->new_data_part.get(), ++ctx->projection_block_num);
|
||||
tmp_part.finalize();
|
||||
tmp_part.part->getDataPartStorage().commitTransaction();
|
||||
ctx->projection_parts[projection.name].emplace_back(std::move(tmp_part.part));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void MergeTask::ExecuteAndFinalizeHorizontalPart::constructTaskForProjectionPartsMerge() const
|
||||
{
|
||||
auto && [name, parts] = *ctx->projection_parts_iterator;
|
||||
const auto & projection = global_ctx->metadata_snapshot->projections.get(name);
|
||||
|
||||
ctx->merge_projection_parts_task_ptr = std::make_unique<MergeProjectionPartsTask>
|
||||
(
|
||||
name,
|
||||
std::move(parts),
|
||||
projection,
|
||||
ctx->projection_block_num,
|
||||
global_ctx->context,
|
||||
global_ctx->holder,
|
||||
global_ctx->mutator,
|
||||
global_ctx->merge_entry,
|
||||
global_ctx->time_of_merge,
|
||||
global_ctx->new_data_part,
|
||||
global_ctx->space_reservation
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeMergeProjections()
|
||||
{
|
||||
/// In case if there are no projections we didn't construct a task
|
||||
if (!ctx->merge_projection_parts_task_ptr)
|
||||
return false;
|
||||
|
||||
if (ctx->merge_projection_parts_task_ptr->executeStep())
|
||||
return true;
|
||||
|
||||
++ctx->projection_parts_iterator;
|
||||
|
||||
if (ctx->projection_parts_iterator == std::make_move_iterator(ctx->projection_parts.end()))
|
||||
return false;
|
||||
|
||||
constructTaskForProjectionPartsMerge();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl()
|
||||
{
|
||||
Block block;
|
||||
@ -462,6 +532,8 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl()
|
||||
|
||||
const_cast<MergedBlockOutputStream &>(*global_ctx->to).write(block);
|
||||
|
||||
calculateProjections(block);
|
||||
|
||||
UInt64 result_rows = 0;
|
||||
UInt64 result_bytes = 0;
|
||||
global_ctx->merged_pipeline.tryGetResultRowsAndBytes(result_rows, result_bytes);
|
||||
@ -484,6 +556,13 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl()
|
||||
return true;
|
||||
}
|
||||
|
||||
// finalize projections
|
||||
calculateProjections(global_ctx->merging_executor->getHeader().cloneEmpty());
|
||||
|
||||
ctx->projection_parts_iterator = std::make_move_iterator(ctx->projection_parts.begin());
|
||||
if (ctx->projection_parts_iterator != std::make_move_iterator(ctx->projection_parts.end()))
|
||||
constructTaskForProjectionPartsMerge();
|
||||
|
||||
global_ctx->merging_executor.reset();
|
||||
global_ctx->merged_pipeline.reset();
|
||||
|
||||
@ -732,24 +811,9 @@ bool MergeTask::MergeProjectionsStage::mergeMinMaxIndexAndPrepareProjections() c
|
||||
}
|
||||
|
||||
|
||||
const auto & projections = global_ctx->metadata_snapshot->getProjections();
|
||||
|
||||
for (const auto & projection : projections)
|
||||
for (const auto & projection : global_ctx->projections_to_merge)
|
||||
{
|
||||
MergeTreeData::DataPartsVector projection_parts;
|
||||
for (const auto & part : global_ctx->future_part->parts)
|
||||
{
|
||||
auto actual_projection_parts = part->getProjectionParts();
|
||||
auto it = actual_projection_parts.find(projection.name);
|
||||
if (it != actual_projection_parts.end() && !it->second->is_broken)
|
||||
projection_parts.push_back(it->second);
|
||||
}
|
||||
if (projection_parts.size() < global_ctx->future_part->parts.size())
|
||||
{
|
||||
LOG_DEBUG(ctx->log, "Projection {} is not merged because some parts don't have it", projection.name);
|
||||
continue;
|
||||
}
|
||||
|
||||
MergeTreeData::DataPartsVector projection_parts = global_ctx->projections_to_merge_parts[projection->name];
|
||||
LOG_DEBUG(
|
||||
ctx->log,
|
||||
"Selected {} projection_parts from {} to {}",
|
||||
@ -759,7 +823,7 @@ bool MergeTask::MergeProjectionsStage::mergeMinMaxIndexAndPrepareProjections() c
|
||||
|
||||
auto projection_future_part = std::make_shared<FutureMergedMutatedPart>();
|
||||
projection_future_part->assign(std::move(projection_parts));
|
||||
projection_future_part->name = projection.name;
|
||||
projection_future_part->name = projection->name;
|
||||
// TODO (ab): path in future_part is only for merge process introspection, which is not available for merges of projection parts.
|
||||
// Let's comment this out to avoid code inconsistency and add it back after we implement projection merge introspection.
|
||||
// projection_future_part->path = global_ctx->future_part->path + "/" + projection.name + ".proj/";
|
||||
@ -767,16 +831,17 @@ bool MergeTask::MergeProjectionsStage::mergeMinMaxIndexAndPrepareProjections() c
|
||||
|
||||
MergeTreeData::MergingParams projection_merging_params;
|
||||
projection_merging_params.mode = MergeTreeData::MergingParams::Ordinary;
|
||||
if (projection.type == ProjectionDescription::Type::Aggregate)
|
||||
if (projection->type == ProjectionDescription::Type::Aggregate)
|
||||
projection_merging_params.mode = MergeTreeData::MergingParams::Aggregating;
|
||||
|
||||
ctx->tasks_for_projections.emplace_back(std::make_shared<MergeTask>(
|
||||
projection_future_part,
|
||||
projection.metadata,
|
||||
projection->metadata,
|
||||
global_ctx->merge_entry,
|
||||
std::make_unique<MergeListElement>((*global_ctx->merge_entry)->table_id, projection_future_part, global_ctx->context),
|
||||
global_ctx->time_of_merge,
|
||||
global_ctx->context,
|
||||
*global_ctx->holder,
|
||||
global_ctx->space_reservation,
|
||||
global_ctx->deduplicate,
|
||||
global_ctx->deduplicate_by_columns,
|
||||
@ -1136,6 +1201,56 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
|
||||
global_ctx->merging_executor = std::make_unique<PullingPipelineExecutor>(global_ctx->merged_pipeline);
|
||||
}
|
||||
|
||||
void MergeTask::ExecuteAndFinalizeHorizontalPart::prepareProjectionsToMergeAndRebuild() const
|
||||
{
|
||||
// These merging modes may or may not reduce number of rows. It's not known until the horizontal stage is finished.
|
||||
const bool merge_may_reduce_rows =
|
||||
global_ctx->deduplicate ||
|
||||
ctx->merging_params.mode == MergeTreeData::MergingParams::Collapsing ||
|
||||
ctx->merging_params.mode == MergeTreeData::MergingParams::Replacing ||
|
||||
ctx->merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing;
|
||||
|
||||
const auto & projections = global_ctx->metadata_snapshot->getProjections();
|
||||
|
||||
for (const auto & projection : projections)
|
||||
{
|
||||
if (merge_may_reduce_rows)
|
||||
{
|
||||
global_ctx->projections_to_rebuild.push_back(&projection);
|
||||
continue;
|
||||
}
|
||||
|
||||
MergeTreeData::DataPartsVector projection_parts;
|
||||
for (const auto & part : global_ctx->future_part->parts)
|
||||
{
|
||||
auto it = part->getProjectionParts().find(projection.name);
|
||||
if (it != part->getProjectionParts().end())
|
||||
projection_parts.push_back(it->second);
|
||||
}
|
||||
if (projection_parts.size() == global_ctx->future_part->parts.size())
|
||||
{
|
||||
global_ctx->projections_to_merge.push_back(&projection);
|
||||
global_ctx->projections_to_merge_parts[projection.name].assign(projection_parts.begin(), projection_parts.end());
|
||||
}
|
||||
else if (projection_parts.empty())
|
||||
{
|
||||
LOG_DEBUG(ctx->log, "Projection {} will not be merged or rebuilt because all parts don't have it", projection.name);
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_DEBUG(ctx->log, "Projection {} will be rebuilt because some parts don't have it", projection.name);
|
||||
global_ctx->projections_to_rebuild.push_back(&projection);
|
||||
}
|
||||
}
|
||||
|
||||
const auto & settings = global_ctx->context->getSettingsRef();
|
||||
|
||||
for (size_t i = 0, size = global_ctx->projections_to_rebuild.size(); i < size; ++i)
|
||||
{
|
||||
ctx->projection_squashes.emplace_back(settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
MergeAlgorithm MergeTask::ExecuteAndFinalizeHorizontalPart::chooseMergeAlgorithm() const
|
||||
{
|
||||
|
@ -8,6 +8,7 @@
|
||||
#include <Compression/CompressedReadBuffer.h>
|
||||
#include <Compression/CompressedReadBufferFromFile.h>
|
||||
|
||||
#include <Interpreters/SquashingTransform.h>
|
||||
#include <Interpreters/TemporaryDataOnDisk.h>
|
||||
|
||||
#include <Processors/Executors/PullingPipelineExecutor.h>
|
||||
@ -64,6 +65,7 @@ public:
|
||||
std::unique_ptr<MergeListElement> projection_merge_list_element_,
|
||||
time_t time_of_merge_,
|
||||
ContextPtr context_,
|
||||
TableLockHolder & holder,
|
||||
ReservationSharedPtr space_reservation_,
|
||||
bool deduplicate_,
|
||||
Names deduplicate_by_columns_,
|
||||
@ -88,6 +90,7 @@ public:
|
||||
= global_ctx->projection_merge_list_element ? global_ctx->projection_merge_list_element.get() : (*global_ctx->merge_entry)->ptr();
|
||||
global_ctx->time_of_merge = std::move(time_of_merge_);
|
||||
global_ctx->context = std::move(context_);
|
||||
global_ctx->holder = &holder;
|
||||
global_ctx->space_reservation = std::move(space_reservation_);
|
||||
global_ctx->deduplicate = std::move(deduplicate_);
|
||||
global_ctx->deduplicate_by_columns = std::move(deduplicate_by_columns_);
|
||||
@ -142,6 +145,7 @@ private:
|
||||
/// Proper initialization is responsibility of the author
|
||||
struct GlobalRuntimeContext : public IStageRuntimeContext
|
||||
{
|
||||
TableLockHolder * holder;
|
||||
MergeList::Entry * merge_entry{nullptr};
|
||||
/// If not null, use this instead of the global MergeList::Entry. This is for merging projections.
|
||||
std::unique_ptr<MergeListElement> projection_merge_list_element;
|
||||
@ -173,6 +177,10 @@ private:
|
||||
MergeAlgorithm chosen_merge_algorithm{MergeAlgorithm::Undecided};
|
||||
size_t gathering_column_names_size{0};
|
||||
|
||||
std::vector<ProjectionDescriptionRawPtr> projections_to_rebuild{};
|
||||
std::vector<ProjectionDescriptionRawPtr> projections_to_merge{};
|
||||
std::map<String, MergeTreeData::DataPartsVector> projections_to_merge_parts{};
|
||||
|
||||
std::unique_ptr<MergeStageProgress> horizontal_stage_progress{nullptr};
|
||||
std::unique_ptr<MergeStageProgress> column_progress{nullptr};
|
||||
|
||||
@ -219,6 +227,14 @@ private:
|
||||
std::unique_ptr<WriteBuffer> rows_sources_write_buf{nullptr};
|
||||
std::optional<ColumnSizeEstimator> column_sizes{};
|
||||
|
||||
// For projections to rebuild
|
||||
using ProjectionNameToItsBlocks = std::map<String, MergeTreeData::MutableDataPartsVector>;
|
||||
ProjectionNameToItsBlocks projection_parts;
|
||||
std::move_iterator<ProjectionNameToItsBlocks::iterator> projection_parts_iterator;
|
||||
std::vector<SquashingTransform> projection_squashes;
|
||||
size_t projection_block_num = 0;
|
||||
ExecutableTaskPtr merge_projection_parts_task_ptr;
|
||||
|
||||
size_t initial_reservation{0};
|
||||
bool read_with_direct_io{false};
|
||||
|
||||
@ -247,16 +263,21 @@ private:
|
||||
bool executeImpl();
|
||||
|
||||
/// NOTE: Using pointer-to-member instead of std::function and lambda makes stacktraces much more concise and readable
|
||||
using ExecuteAndFinalizeHorizontalPartSubtasks = std::array<bool(ExecuteAndFinalizeHorizontalPart::*)(), 2>;
|
||||
using ExecuteAndFinalizeHorizontalPartSubtasks = std::array<bool(ExecuteAndFinalizeHorizontalPart::*)(), 3>;
|
||||
|
||||
const ExecuteAndFinalizeHorizontalPartSubtasks subtasks
|
||||
{
|
||||
&ExecuteAndFinalizeHorizontalPart::prepare,
|
||||
&ExecuteAndFinalizeHorizontalPart::executeImpl
|
||||
&ExecuteAndFinalizeHorizontalPart::executeImpl,
|
||||
&ExecuteAndFinalizeHorizontalPart::executeMergeProjections
|
||||
};
|
||||
|
||||
ExecuteAndFinalizeHorizontalPartSubtasks::const_iterator subtasks_iterator = subtasks.begin();
|
||||
|
||||
void prepareProjectionsToMergeAndRebuild() const;
|
||||
void calculateProjections(const Block & block) const;
|
||||
void constructTaskForProjectionPartsMerge() const;
|
||||
bool executeMergeProjections();
|
||||
|
||||
MergeAlgorithm chooseMergeAlgorithm() const;
|
||||
void createMergedStream();
|
||||
|
@ -670,7 +670,7 @@ MergeTaskPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart(
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
MergeList::Entry * merge_entry,
|
||||
std::unique_ptr<MergeListElement> projection_merge_list_element,
|
||||
TableLockHolder,
|
||||
TableLockHolder & holder,
|
||||
time_t time_of_merge,
|
||||
ContextPtr context,
|
||||
ReservationSharedPtr space_reservation,
|
||||
@ -690,6 +690,7 @@ MergeTaskPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart(
|
||||
std::move(projection_merge_list_element),
|
||||
time_of_merge,
|
||||
context,
|
||||
holder,
|
||||
space_reservation,
|
||||
deduplicate,
|
||||
deduplicate_by_columns,
|
||||
|
@ -159,7 +159,7 @@ public:
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
MergeListEntry * merge_entry,
|
||||
std::unique_ptr<MergeListElement> projection_merge_list_element,
|
||||
TableLockHolder table_lock_holder,
|
||||
TableLockHolder & table_lock_holder,
|
||||
time_t time_of_merge,
|
||||
ContextPtr context,
|
||||
ReservationSharedPtr space_reservation,
|
||||
|
@ -20,6 +20,7 @@
|
||||
#include <Processors/QueryPlan/CreatingSetsStep.h>
|
||||
#include <Storages/MergeTree/StorageFromMergeTreeDataPart.h>
|
||||
#include <Storages/MergeTree/MergeTreeDataWriter.h>
|
||||
#include <Storages/MergeTree/MergeProjectionPartsTask.h>
|
||||
#include <Storages/MutationCommands.h>
|
||||
#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
|
||||
#include <Storages/MergeTree/MergeTreeIndexInverted.h>
|
||||
@ -1030,136 +1031,6 @@ struct MutationContext
|
||||
using MutationContextPtr = std::shared_ptr<MutationContext>;
|
||||
|
||||
|
||||
class MergeProjectionPartsTask : public IExecutableTask
|
||||
{
|
||||
public:
|
||||
|
||||
MergeProjectionPartsTask(
|
||||
String name_,
|
||||
MergeTreeData::MutableDataPartsVector && parts_,
|
||||
const ProjectionDescription & projection_,
|
||||
size_t & block_num_,
|
||||
MutationContextPtr ctx_)
|
||||
: name(std::move(name_))
|
||||
, parts(std::move(parts_))
|
||||
, projection(projection_)
|
||||
, block_num(block_num_)
|
||||
, ctx(ctx_)
|
||||
, log(getLogger("MergeProjectionPartsTask"))
|
||||
{
|
||||
LOG_DEBUG(log, "Selected {} projection_parts from {} to {}", parts.size(), parts.front()->name, parts.back()->name);
|
||||
level_parts[current_level] = std::move(parts);
|
||||
}
|
||||
|
||||
void onCompleted() override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
|
||||
StorageID getStorageID() const override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
|
||||
Priority getPriority() const override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
|
||||
String getQueryId() const override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
|
||||
|
||||
bool executeStep() override
|
||||
{
|
||||
auto & current_level_parts = level_parts[current_level];
|
||||
auto & next_level_parts = level_parts[next_level];
|
||||
|
||||
MergeTreeData::MutableDataPartsVector selected_parts;
|
||||
while (selected_parts.size() < max_parts_to_merge_in_one_level && !current_level_parts.empty())
|
||||
{
|
||||
selected_parts.push_back(std::move(current_level_parts.back()));
|
||||
current_level_parts.pop_back();
|
||||
}
|
||||
|
||||
if (selected_parts.empty())
|
||||
{
|
||||
if (next_level_parts.empty())
|
||||
{
|
||||
LOG_WARNING(log, "There is no projection parts merged");
|
||||
|
||||
/// Task is finished
|
||||
return false;
|
||||
}
|
||||
current_level = next_level;
|
||||
++next_level;
|
||||
}
|
||||
else if (selected_parts.size() == 1)
|
||||
{
|
||||
if (next_level_parts.empty())
|
||||
{
|
||||
LOG_DEBUG(log, "Merged a projection part in level {}", current_level);
|
||||
selected_parts[0]->renameTo(projection.name + ".proj", true);
|
||||
selected_parts[0]->setName(projection.name);
|
||||
selected_parts[0]->is_temp = false;
|
||||
ctx->new_data_part->addProjectionPart(name, std::move(selected_parts[0]));
|
||||
|
||||
/// Task is finished
|
||||
return false;
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_DEBUG(log, "Forwarded part {} in level {} to next level", selected_parts[0]->name, current_level);
|
||||
next_level_parts.push_back(std::move(selected_parts[0]));
|
||||
}
|
||||
}
|
||||
else if (selected_parts.size() > 1)
|
||||
{
|
||||
// Generate a unique part name
|
||||
++block_num;
|
||||
auto projection_future_part = std::make_shared<FutureMergedMutatedPart>();
|
||||
MergeTreeData::DataPartsVector const_selected_parts(
|
||||
std::make_move_iterator(selected_parts.begin()), std::make_move_iterator(selected_parts.end()));
|
||||
projection_future_part->assign(std::move(const_selected_parts));
|
||||
projection_future_part->name = fmt::format("{}_{}", projection.name, ++block_num);
|
||||
projection_future_part->part_info = {"all", 0, 0, 0};
|
||||
|
||||
MergeTreeData::MergingParams projection_merging_params;
|
||||
projection_merging_params.mode = MergeTreeData::MergingParams::Ordinary;
|
||||
if (projection.type == ProjectionDescription::Type::Aggregate)
|
||||
projection_merging_params.mode = MergeTreeData::MergingParams::Aggregating;
|
||||
|
||||
LOG_DEBUG(log, "Merged {} parts in level {} to {}", selected_parts.size(), current_level, projection_future_part->name);
|
||||
auto tmp_part_merge_task = ctx->mutator->mergePartsToTemporaryPart(
|
||||
projection_future_part,
|
||||
projection.metadata,
|
||||
ctx->mutate_entry,
|
||||
std::make_unique<MergeListElement>((*ctx->mutate_entry)->table_id, projection_future_part, ctx->context),
|
||||
*ctx->holder,
|
||||
ctx->time_of_mutation,
|
||||
ctx->context,
|
||||
ctx->space_reservation,
|
||||
false, // TODO Do we need deduplicate for projections
|
||||
{},
|
||||
false, // no cleanup
|
||||
projection_merging_params,
|
||||
NO_TRANSACTION_PTR,
|
||||
/* need_prefix */ true,
|
||||
ctx->new_data_part.get(),
|
||||
".tmp_proj");
|
||||
|
||||
next_level_parts.push_back(executeHere(tmp_part_merge_task));
|
||||
next_level_parts.back()->is_temp = true;
|
||||
}
|
||||
|
||||
/// Need execute again
|
||||
return true;
|
||||
}
|
||||
|
||||
private:
|
||||
String name;
|
||||
MergeTreeData::MutableDataPartsVector parts;
|
||||
const ProjectionDescription & projection;
|
||||
size_t & block_num;
|
||||
MutationContextPtr ctx;
|
||||
|
||||
LoggerPtr log;
|
||||
|
||||
std::map<size_t, MergeTreeData::MutableDataPartsVector> level_parts;
|
||||
size_t current_level = 0;
|
||||
size_t next_level = 1;
|
||||
|
||||
/// TODO(nikitamikhaylov): make this constant a setting
|
||||
static constexpr size_t max_parts_to_merge_in_one_level = 10;
|
||||
};
|
||||
|
||||
|
||||
// This class is responsible for:
|
||||
// 1. get projection pipeline and a sink to write parts
|
||||
// 2. build an executor that can write block to the input stream (actually we can write through it to generate as many parts as possible)
|
||||
@ -1356,7 +1227,13 @@ void PartMergerWriter::constructTaskForProjectionPartsMerge()
|
||||
std::move(parts),
|
||||
projection,
|
||||
block_num,
|
||||
ctx
|
||||
ctx->context,
|
||||
ctx->holder,
|
||||
ctx->mutator,
|
||||
ctx->mutate_entry,
|
||||
ctx->time_of_mutation,
|
||||
ctx->new_data_part,
|
||||
ctx->space_reservation
|
||||
);
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user