Merge pull request #38859 from kitaisreal/merge-tree-merge-disable-batch-optimization

MergeTree merge disable batch optimization
This commit is contained in:
Maksim Kita 2022-07-06 15:59:40 +02:00 committed by GitHub
commit b94489d52c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 171 additions and 21 deletions

View File

@ -136,7 +136,8 @@ SortedBlocksWriter::TmpFilePtr SortedBlocksWriter::flush(const BlocksList & bloc
pipeline.getHeader(),
pipeline.getNumStreams(),
sort_description,
rows_in_block);
rows_in_block,
SortingQueueStrategy::Default);
pipeline.addTransform(std::move(transform));
}
@ -190,7 +191,8 @@ SortedBlocksWriter::PremergedFiles SortedBlocksWriter::premerge()
pipeline.getHeader(),
pipeline.getNumStreams(),
sort_description,
rows_in_block);
rows_in_block,
SortingQueueStrategy::Default);
pipeline.addTransform(std::move(transform));
}
@ -222,7 +224,8 @@ SortedBlocksWriter::SortedFiles SortedBlocksWriter::finishMerge(std::function<vo
pipeline.getHeader(),
pipeline.getNumStreams(),
sort_description,
rows_in_block);
rows_in_block,
SortingQueueStrategy::Default);
pipeline.addTransform(std::move(transform));
}
@ -303,7 +306,8 @@ Block SortedBlocksBuffer::mergeBlocks(Blocks && blocks) const
builder.getHeader(),
builder.getNumStreams(),
sort_description,
num_rows);
num_rows,
SortingQueueStrategy::Default);
builder.addTransform(std::move(transform));
}

View File

@ -12,6 +12,7 @@ MergingSortedAlgorithm::MergingSortedAlgorithm(
size_t num_inputs,
SortDescription description_,
size_t max_block_size,
SortingQueueStrategy sorting_queue_strategy_,
UInt64 limit_,
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes)
@ -21,6 +22,7 @@ MergingSortedAlgorithm::MergingSortedAlgorithm(
, limit(limit_)
, out_row_sources_buf(out_row_sources_buf_)
, current_inputs(num_inputs)
, sorting_queue_strategy(sorting_queue_strategy_)
, cursors(num_inputs)
{
DataTypes sort_description_types;
@ -69,11 +71,22 @@ void MergingSortedAlgorithm::initialize(Inputs inputs)
cursors[source_num] = SortCursorImpl(header, chunk.getColumns(), description, source_num);
}
queue_variants.callOnBatchVariant([&](auto & queue)
if (sorting_queue_strategy == SortingQueueStrategy::Default)
{
using QueueType = std::decay_t<decltype(queue)>;
queue = QueueType(cursors);
});
queue_variants.callOnVariant([&](auto & queue)
{
using QueueType = std::decay_t<decltype(queue)>;
queue = QueueType(cursors);
});
}
else
{
queue_variants.callOnBatchVariant([&](auto & queue)
{
using QueueType = std::decay_t<decltype(queue)>;
queue = QueueType(cursors);
});
}
}
void MergingSortedAlgorithm::consume(Input & input, size_t source_num)
@ -82,14 +95,34 @@ void MergingSortedAlgorithm::consume(Input & input, size_t source_num)
current_inputs[source_num].swap(input);
cursors[source_num].reset(current_inputs[source_num].chunk.getColumns(), header);
queue_variants.callOnBatchVariant([&](auto & queue)
if (sorting_queue_strategy == SortingQueueStrategy::Default)
{
queue.push(cursors[source_num]);
});
queue_variants.callOnVariant([&](auto & queue)
{
queue.push(cursors[source_num]);
});
}
else
{
queue_variants.callOnBatchVariant([&](auto & queue)
{
queue.push(cursors[source_num]);
});
}
}
IMergingAlgorithm::Status MergingSortedAlgorithm::merge()
{
if (sorting_queue_strategy == SortingQueueStrategy::Default)
{
IMergingAlgorithm::Status result = queue_variants.callOnVariant([&](auto & queue)
{
return mergeImpl(queue);
});
return result;
}
IMergingAlgorithm::Status result = queue_variants.callOnBatchVariant([&](auto & queue)
{
return mergeBatchImpl(queue);
@ -98,6 +131,100 @@ IMergingAlgorithm::Status MergingSortedAlgorithm::merge()
return result;
}
/// Row-by-row merge step. `merge()` dispatches here (via `queue_variants.callOnVariant`)
/// when `sorting_queue_strategy` is `SortingQueueStrategy::Default`.
///
/// `TSortingHeap` must provide `isValid()`, `current()`, `size()`, `nextChild()`,
/// `next()` and `removeTop()`, which are the only queue operations used below.
///
/// Each call returns one of:
///  - `Status(merged_data.pull())` when enough rows are accumulated (or the buffer must be flushed first);
///  - `Status(source_num)` to ask for the next chunk of an exhausted source;
///  - `Status(merged_data.pull(), true)` when the limit is reached or the queue is empty
///    (the second argument presumably marks the merge as finished; confirm against `Status`).
template <typename TSortingHeap>
IMergingAlgorithm::Status MergingSortedAlgorithm::mergeImpl(TSortingHeap & queue)
{
/// Take rows in required order and put them into `merged_data`, while the rows are no more than `max_block_size`
while (queue.isValid())
{
/// Hand out the accumulated block before touching the queue again.
if (merged_data.hasEnoughRows())
return Status(merged_data.pull());
/// `current` is still used after `queue.removeTop()` below, for `current.impl->order`.
auto current = queue.current();
/// The cursor is on its last row and that row must be skipped for this input:
/// drop the cursor without inserting the row and request more data from the same source.
if (current.impl->isLast() && current_inputs[current.impl->order].skip_last_row)
{
/// Get the next block from the corresponding source, if there is one.
queue.removeTop();
return Status(current.impl->order);
}
/// Whole-chunk fast path: the cursor is untouched (on its first row) and either it is
/// the only cursor left or `totallyLessOrEquals` holds against the next cursor in the queue.
if (current.impl->isFirst()
&& !current_inputs[current.impl->order].skip_last_row /// Ignore optimization if last row should be skipped.
&& (queue.size() == 1
|| (queue.size() >= 2 && current.totallyLessOrEquals(queue.nextChild()))))
{
/** This is special optimization if current cursor is totally less than next cursor.
* We want to insert current cursor chunk directly in merged data.
*
* First if merged_data is not empty we need to flush it.
* We will get into the same condition on next merge call.
*
* Then we can insert chunk directly in merged data.
*/
if (merged_data.mergedRows() != 0)
return Status(merged_data.pull());
/// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl)
size_t source_num = current.impl->order;
auto chunk_num_rows = current_inputs[source_num].chunk.getNumRows();
/// Because of the early return above, `merged_data.mergedRows()` is 0 at this point,
/// so this sum equals `chunk_num_rows`.
/// NOTE(review): the per-row path below compares `merged_data.totalMergedRows()` with `limit`,
/// while this path uses `mergedRows()`. Confirm whether rows from previously pulled blocks
/// should count towards the limit here as well.
UInt64 total_merged_rows_after_insertion = merged_data.mergedRows() + chunk_num_rows;
bool limit_reached = limit && total_merged_rows_after_insertion >= limit;
/// Trim the number of rows taken from the chunk so that the computed total does not exceed `limit`.
if (limit && total_merged_rows_after_insertion > limit)
chunk_num_rows -= total_merged_rows_after_insertion - limit;
merged_data.insertChunk(std::move(current_inputs[source_num].chunk), chunk_num_rows);
/// Replace the moved-from chunk with a fresh empty one.
current_inputs[source_num].chunk = Chunk();
/// Write order of rows for other columns this data will be used in gather stream
if (out_row_sources_buf)
{
/// One entry per inserted row, all pointing at the same source.
RowSourcePart row_source(source_num);
for (size_t i = 0; i < chunk_num_rows; ++i)
out_row_sources_buf->write(row_source.data);
}
/// We will get the next block from the corresponding source, if there is one.
queue.removeTop();
/// Return the inserted chunk right away; unless the limit was hit, also ask for
/// the next chunk of the same source through `required_source`.
auto status = Status(merged_data.pull(), limit_reached);
if (!limit_reached)
status.required_source = source_num;
return status;
}
/// Regular path: copy the single current row into the merged block.
merged_data.insertRow(current->all_columns, current->getRow(), current->rows);
if (out_row_sources_buf)
{
RowSourcePart row_source(current.impl->order);
out_row_sources_buf->write(row_source.data);
}
/// Stop as soon as the total number of merged rows reaches the limit.
if (limit && merged_data.totalMergedRows() >= limit)
return Status(merged_data.pull(), true);
if (!current->isLast())
{
/// Advance the top cursor to its next row.
queue.next();
}
else
{
/// We will get the next block from the corresponding source, if there is one.
queue.removeTop();
return Status(current.impl->order);
}
}
/// The queue has no valid cursors left: return whatever is accumulated, with the same `true` flag as the limit-reached returns.
return Status(merged_data.pull(), true);
}
template <typename TSortingQueue>
IMergingAlgorithm::Status MergingSortedAlgorithm::mergeBatchImpl(TSortingQueue & queue)
@ -134,14 +261,22 @@ IMergingAlgorithm::Status MergingSortedAlgorithm::mergeBatchImpl(TSortingQueue &
}
bool limit_reached = false;
if (limit && merged_rows + updated_batch_size > limit)
if (limit && merged_rows + updated_batch_size >= limit && !batch_skip_last_row)
{
updated_batch_size -= merged_rows + updated_batch_size - limit;
limit_reached = true;
}
else if (limit && merged_rows + updated_batch_size > limit)
{
batch_skip_last_row = false;
updated_batch_size -= merged_rows + updated_batch_size - limit;
limit_reached = true;
}
if (unlikely(current.impl->isFirst() && current.impl->isLast(initial_batch_size)))
if (unlikely(current.impl->isFirst() &&
current.impl->isLast(initial_batch_size) &&
!current_inputs[current.impl->order].skip_last_row))
{
/** This is special optimization if current cursor is totally less than next cursor.
* We want to insert current cursor chunk directly in merged data.
@ -167,9 +302,6 @@ IMergingAlgorithm::Status MergingSortedAlgorithm::mergeBatchImpl(TSortingQueue &
out_row_sources_buf->write(row_source.data);
}
if (limit_reached)
break;
/// We will get the next block from the corresponding source, if there is one.
queue.removeTop();

View File

@ -18,6 +18,7 @@ public:
size_t num_inputs,
SortDescription description_,
size_t max_block_size,
SortingQueueStrategy sorting_queue_strategy_,
UInt64 limit_ = 0,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false);
@ -47,10 +48,15 @@ private:
/// Chunks currently being merged.
Inputs current_inputs;
SortingQueueStrategy sorting_queue_strategy;
SortCursorImpls cursors;
SortQueueVariants queue_variants;
template <typename TSortingQueue>
Status mergeImpl(TSortingQueue & queue);
template <typename TSortingQueue>
Status mergeBatchImpl(TSortingQueue & queue);

View File

@ -12,6 +12,7 @@ MergingSortedTransform::MergingSortedTransform(
size_t num_inputs,
SortDescription description_,
size_t max_block_size,
SortingQueueStrategy sorting_queue_strategy,
UInt64 limit_,
WriteBuffer * out_row_sources_buf_,
bool quiet_,
@ -23,6 +24,7 @@ MergingSortedTransform::MergingSortedTransform(
num_inputs,
std::move(description_),
max_block_size,
sorting_queue_strategy,
limit_,
out_row_sources_buf_,
use_average_block_sizes)

View File

@ -16,6 +16,7 @@ public:
size_t num_inputs,
SortDescription description,
size_t max_block_size,
SortingQueueStrategy sorting_queue_strategy,
UInt64 limit_ = 0,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool quiet_ = false,

View File

@ -583,7 +583,8 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
pipe.getHeader(),
pipe.numOutputPorts(),
sort_description,
max_block_size);
max_block_size,
SortingQueueStrategy::Batch);
pipe.addTransform(std::move(transform));
}
@ -611,7 +612,7 @@ static void addMergingFinal(
{
case MergeTreeData::MergingParams::Ordinary:
return std::make_shared<MergingSortedTransform>(header, num_outputs,
sort_description, max_block_size);
sort_description, max_block_size, SortingQueueStrategy::Batch);
case MergeTreeData::MergingParams::Collapsing:
return std::make_shared<CollapsingSortedTransform>(header, num_outputs,

View File

@ -124,6 +124,7 @@ void SortingStep::transformPipeline(QueryPipelineBuilder & pipeline, const Build
pipeline.getNumStreams(),
prefix_description,
max_block_size,
SortingQueueStrategy::Batch,
limit_for_merging);
pipeline.addTransform(std::move(transform));
@ -212,6 +213,7 @@ void SortingStep::transformPipeline(QueryPipelineBuilder & pipeline, const Build
pipeline.getNumStreams(),
result_description,
max_block_size,
SortingQueueStrategy::Batch,
limit);
pipeline.addTransform(std::move(transform));
@ -226,6 +228,7 @@ void SortingStep::transformPipeline(QueryPipelineBuilder & pipeline, const Build
pipeline.getNumStreams(),
result_description,
max_block_size,
SortingQueueStrategy::Batch,
limit);
pipeline.addTransform(std::move(transform));

View File

@ -203,6 +203,7 @@ void MergeSortingTransform::consume(Chunk chunk)
0,
description,
max_merged_block_size,
SortingQueueStrategy::Batch,
limit,
nullptr,
quiet,

View File

@ -83,7 +83,7 @@ TEST(MergingSortedTest, SimpleBlockSizeTest)
EXPECT_EQ(pipe.numOutputPorts(), 3);
auto transform = std::make_shared<MergingSortedTransform>(pipe.getHeader(), pipe.numOutputPorts(), sort_description,
DEFAULT_MERGE_BLOCK_SIZE, false, nullptr, false, true);
DEFAULT_MERGE_BLOCK_SIZE, SortingQueueStrategy::Batch, 0, nullptr, false, true);
pipe.addTransform(std::move(transform));
@ -125,7 +125,7 @@ TEST(MergingSortedTest, MoreInterestingBlockSizes)
EXPECT_EQ(pipe.numOutputPorts(), 3);
auto transform = std::make_shared<MergingSortedTransform>(pipe.getHeader(), pipe.numOutputPorts(), sort_description,
DEFAULT_MERGE_BLOCK_SIZE, false, nullptr, false, true);
DEFAULT_MERGE_BLOCK_SIZE, SortingQueueStrategy::Batch, 0, nullptr, false, true);
pipe.addTransform(std::move(transform));

View File

@ -855,7 +855,7 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
{
case MergeTreeData::MergingParams::Ordinary:
merged_transform = std::make_shared<MergingSortedTransform>(
header, pipes.size(), sort_description, merge_block_size, 0, ctx->rows_sources_write_buf.get(), true, ctx->blocks_are_granules_size);
header, pipes.size(), sort_description, merge_block_size, SortingQueueStrategy::Default, 0, ctx->rows_sources_write_buf.get(), true, ctx->blocks_are_granules_size);
break;
case MergeTreeData::MergingParams::Collapsing: