diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index 3bee2ecb0d9..fb5bbc4729c 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -1224,7 +1224,8 @@ public: UInt64 merge_block_size_rows_, UInt64 merge_block_size_bytes_, bool blocks_are_granules_size_, - bool cleanup_) + bool cleanup_, + time_t time_of_merge_) : ITransformingStep(input_stream_, input_stream_.header, Traits{}) // TODO proper traits? , sort_description(sort_description_) , partition_key_columns(partition_key_columns_) @@ -1234,6 +1235,7 @@ public: , merge_block_size_bytes(merge_block_size_bytes_) , blocks_are_granules_size(blocks_are_granules_size_) , cleanup(cleanup_) + , time_of_merge(time_of_merge_) {} String getName() const override { return "ApplyMergePolicy"; } @@ -1245,10 +1247,6 @@ public: /// that is going in insertion order. ProcessorPtr merged_transform; -// /// There is no sense to have the block size bigger than one granule for merge operations. -// const UInt64 merge_block_size_rows = data_settings->merge_max_block_size; -// const UInt64 merge_block_size_bytes = data_settings->merge_max_block_size_bytes; - const auto &header = pipeline.getHeader(); const auto input_streams_count = pipeline.getNumStreams(); @@ -1330,15 +1328,15 @@ public: } private: - SortDescription sort_description; - Names partition_key_columns; - MergeTreeData::MergingParams merging_params{}; + const SortDescription sort_description; + const Names partition_key_columns; + const MergeTreeData::MergingParams merging_params{}; WriteBuffer * rows_sources_write_buf; const UInt64 merge_block_size_rows; const UInt64 merge_block_size_bytes; - bool blocks_are_granules_size; - bool cleanup{false}; - time_t time_of_merge{0}; + const bool blocks_are_granules_size; + const bool cleanup{false}; + const time_t time_of_merge{0}; }; @@ -1403,7 +1401,7 @@ private: }; -void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() +void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() const { /** Read from all parts, merge and write into a new one. * In passing, we calculate expression for sorting. @@ -1516,7 +1514,8 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() data_settings->merge_max_block_size, data_settings->merge_max_block_size_bytes, ctx->blocks_are_granules_size, - global_ctx->cleanup); + global_ctx->cleanup, + global_ctx->time_of_merge); merge_step->setStepDescription("Merge sorted parts"); merge_parts_query_plan.addStep(std::move(merge_step)); } diff --git a/src/Storages/MergeTree/MergeTask.h b/src/Storages/MergeTree/MergeTask.h index c80995888d4..a5d7851932c 100644 --- a/src/Storages/MergeTree/MergeTask.h +++ b/src/Storages/MergeTree/MergeTask.h @@ -291,7 +291,7 @@ private: bool executeMergeProjections(); MergeAlgorithm chooseMergeAlgorithm() const; - void createMergedStream(); + void createMergedStream() const; void extractMergingAndGatheringColumns() const; void setRuntimeContext(StageRuntimeContextPtr local, StageRuntimeContextPtr global) override