Fix for graphite merge mode

This commit is contained in:
Alexander Gololobov 2024-09-02 22:24:53 +02:00
parent 7e444136bb
commit 13f4eb3fac
2 changed files with 13 additions and 14 deletions

View File

@ -1224,7 +1224,8 @@ public:
UInt64 merge_block_size_rows_,
UInt64 merge_block_size_bytes_,
bool blocks_are_granules_size_,
bool cleanup_)
bool cleanup_,
time_t time_of_merge_)
: ITransformingStep(input_stream_, input_stream_.header, Traits{}) // TODO proper traits?
, sort_description(sort_description_)
, partition_key_columns(partition_key_columns_)
@ -1234,6 +1235,7 @@ public:
, merge_block_size_bytes(merge_block_size_bytes_)
, blocks_are_granules_size(blocks_are_granules_size_)
, cleanup(cleanup_)
, time_of_merge(time_of_merge_)
{}
String getName() const override { return "ApplyMergePolicy"; }
@ -1245,10 +1247,6 @@ public:
/// that is going in insertion order.
ProcessorPtr merged_transform;
// /// There is no sense to have the block size bigger than one granule for merge operations.
// const UInt64 merge_block_size_rows = data_settings->merge_max_block_size;
// const UInt64 merge_block_size_bytes = data_settings->merge_max_block_size_bytes;
const auto &header = pipeline.getHeader();
const auto input_streams_count = pipeline.getNumStreams();
@ -1330,15 +1328,15 @@ public:
}
private:
SortDescription sort_description;
Names partition_key_columns;
MergeTreeData::MergingParams merging_params{};
const SortDescription sort_description;
const Names partition_key_columns;
const MergeTreeData::MergingParams merging_params{};
WriteBuffer * rows_sources_write_buf;
const UInt64 merge_block_size_rows;
const UInt64 merge_block_size_bytes;
bool blocks_are_granules_size;
bool cleanup{false};
time_t time_of_merge{0};
const bool blocks_are_granules_size;
const bool cleanup{false};
const time_t time_of_merge{0};
};
@ -1403,7 +1401,7 @@ private:
};
void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const
{
/** Read from all parts, merge and write into a new one.
* In passing, we calculate expression for sorting.
@ -1516,7 +1514,8 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
data_settings->merge_max_block_size,
data_settings->merge_max_block_size_bytes,
ctx->blocks_are_granules_size,
global_ctx->cleanup);
global_ctx->cleanup,
global_ctx->time_of_merge);
merge_step->setStepDescription("Merge sorted parts");
merge_parts_query_plan.addStep(std::move(merge_step));
}

View File

@ -291,7 +291,7 @@ private:
bool executeMergeProjections();
MergeAlgorithm chooseMergeAlgorithm() const;
void createMergedStream();
void createMergedStream() const;
void extractMergingAndGatheringColumns() const;
void setRuntimeContext(StageRuntimeContextPtr local, StageRuntimeContextPtr global) override