Update memory optimisation for MergingSorted.

This commit is contained in:
Nikolai Kochetov 2021-10-18 12:54:12 +03:00
parent 158b4c26b7
commit 6e479b301a
19 changed files with 27 additions and 36 deletions

View File

@ -2303,14 +2303,12 @@ void InterpreterSelectQuery::executeOrderOptimized(QueryPlan & query_plan, Input
{ {
const Settings & settings = context->getSettingsRef(); const Settings & settings = context->getSettingsRef();
const auto & query = getSelectQuery();
auto finish_sorting_step = std::make_unique<FinishSortingStep>( auto finish_sorting_step = std::make_unique<FinishSortingStep>(
query_plan.getCurrentDataStream(), query_plan.getCurrentDataStream(),
input_sorting_info->order_key_prefix_descr, input_sorting_info->order_key_prefix_descr,
output_order_descr, output_order_descr,
settings.max_block_size, settings.max_block_size,
limit, limit);
query.hasFiltration());
query_plan.addStep(std::move(finish_sorting_step)); query_plan.addStep(std::move(finish_sorting_step));
} }

View File

@ -16,7 +16,7 @@ public:
const Block & header, size_t num_inputs, const Block & header, size_t num_inputs,
SortDescription description_, size_t max_block_size) SortDescription description_, size_t max_block_size)
: IMergingTransform( : IMergingTransform(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*has_limit_below_one_block_=*/ false, num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0,
header, header,
num_inputs, num_inputs,
std::move(description_), std::move(description_),

View File

@ -20,7 +20,7 @@ public:
WriteBuffer * out_row_sources_buf_ = nullptr, WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false) bool use_average_block_sizes = false)
: IMergingTransform( : IMergingTransform(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*has_limit_below_one_block_=*/ false, num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0,
header, header,
num_inputs, num_inputs,
std::move(description_), std::move(description_),

View File

@ -19,7 +19,7 @@ public:
SortDescription description, SortDescription description,
size_t max_block_size) size_t max_block_size)
: IMergingTransform( : IMergingTransform(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*has_limit_below_one_block_=*/ false, num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0,
header, header,
num_inputs, num_inputs,
params, params,

View File

@ -15,7 +15,7 @@ public:
SortDescription description_, size_t max_block_size, SortDescription description_, size_t max_block_size,
Graphite::Params params_, time_t time_of_merge_) Graphite::Params params_, time_t time_of_merge_)
: IMergingTransform( : IMergingTransform(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*has_limit_below_one_block_=*/ false, num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0,
header, header,
num_inputs, num_inputs,
std::move(description_), std::move(description_),

View File

@ -15,10 +15,10 @@ IMergingTransformBase::IMergingTransformBase(
const Block & input_header, const Block & input_header,
const Block & output_header, const Block & output_header,
bool have_all_inputs_, bool have_all_inputs_,
bool has_limit_below_one_block_) size_t limit_hint_)
: IProcessor(InputPorts(num_inputs, input_header), {output_header}) : IProcessor(InputPorts(num_inputs, input_header), {output_header})
, have_all_inputs(have_all_inputs_) , have_all_inputs(have_all_inputs_)
, has_limit_below_one_block(has_limit_below_one_block_) , limit_hint(limit_hint_)
{ {
} }
@ -79,7 +79,10 @@ IProcessor::Status IMergingTransformBase::prepareInitializeInputs()
/// setNotNeeded after reading first chunk, because in optimistic case /// setNotNeeded after reading first chunk, because in optimistic case
/// (e.g. with optimized 'ORDER BY primary_key LIMIT n' and small 'n') /// (e.g. with optimized 'ORDER BY primary_key LIMIT n' and small 'n')
/// we won't have to read any chunks anymore; /// we won't have to read any chunks anymore;
auto chunk = input.pull(has_limit_below_one_block); auto chunk = input.pull(limit_hint != 0);
if (limit_hint && chunk.getNumRows() < limit_hint)
input.setNeeded();
if (!chunk.hasRows()) if (!chunk.hasRows())
{ {
if (!input.isFinished()) if (!input.isFinished())

View File

@ -17,7 +17,7 @@ public:
const Block & input_header, const Block & input_header,
const Block & output_header, const Block & output_header,
bool have_all_inputs_, bool have_all_inputs_,
bool has_limit_below_one_block_); size_t limit_hint_);
OutputPort & getOutputPort() { return outputs.front(); } OutputPort & getOutputPort() { return outputs.front(); }
@ -67,7 +67,7 @@ private:
std::vector<InputState> input_states; std::vector<InputState> input_states;
std::atomic<bool> have_all_inputs; std::atomic<bool> have_all_inputs;
bool is_initialized = false; bool is_initialized = false;
bool has_limit_below_one_block = false; size_t limit_hint = 0;
IProcessor::Status prepareInitializeInputs(); IProcessor::Status prepareInitializeInputs();
}; };
@ -83,9 +83,9 @@ public:
const Block & input_header, const Block & input_header,
const Block & output_header, const Block & output_header,
bool have_all_inputs_, bool have_all_inputs_,
bool has_limit_below_one_block_, size_t limit_hint_,
Args && ... args) Args && ... args)
: IMergingTransformBase(num_inputs, input_header, output_header, have_all_inputs_, has_limit_below_one_block_) : IMergingTransformBase(num_inputs, input_header, output_header, have_all_inputs_, limit_hint_)
, algorithm(std::forward<Args>(args) ...) , algorithm(std::forward<Args>(args) ...)
{ {
} }

View File

@ -13,13 +13,12 @@ MergingSortedTransform::MergingSortedTransform(
SortDescription description_, SortDescription description_,
size_t max_block_size, size_t max_block_size,
UInt64 limit_, UInt64 limit_,
bool has_limit_below_one_block_,
WriteBuffer * out_row_sources_buf_, WriteBuffer * out_row_sources_buf_,
bool quiet_, bool quiet_,
bool use_average_block_sizes, bool use_average_block_sizes,
bool have_all_inputs_) bool have_all_inputs_)
: IMergingTransform( : IMergingTransform(
num_inputs, header, header, have_all_inputs_, has_limit_below_one_block_, num_inputs, header, header, have_all_inputs_, limit_,
header, header,
num_inputs, num_inputs,
std::move(description_), std::move(description_),

View File

@ -17,7 +17,6 @@ public:
SortDescription description, SortDescription description,
size_t max_block_size, size_t max_block_size,
UInt64 limit_ = 0, UInt64 limit_ = 0,
bool has_limit_below_one_block_ = false,
WriteBuffer * out_row_sources_buf_ = nullptr, WriteBuffer * out_row_sources_buf_ = nullptr,
bool quiet_ = false, bool quiet_ = false,
bool use_average_block_sizes = false, bool use_average_block_sizes = false,

View File

@ -18,7 +18,7 @@ public:
WriteBuffer * out_row_sources_buf_ = nullptr, WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false) bool use_average_block_sizes = false)
: IMergingTransform( : IMergingTransform(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*has_limit_below_one_block_=*/ false, num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0,
header, header,
num_inputs, num_inputs,
std::move(description_), std::move(description_),

View File

@ -19,7 +19,7 @@ public:
const Names & partition_key_columns, const Names & partition_key_columns,
size_t max_block_size) size_t max_block_size)
: IMergingTransform( : IMergingTransform(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*has_limit_below_one_block_=*/ false, num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0,
header, header,
num_inputs, num_inputs,
std::move(description_), std::move(description_),

View File

@ -19,7 +19,7 @@ public:
WriteBuffer * out_row_sources_buf_ = nullptr, WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false) bool use_average_block_sizes = false)
: IMergingTransform( : IMergingTransform(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*has_limit_below_one_block_=*/ false, num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0,
header, header,
num_inputs, num_inputs,
std::move(description_), std::move(description_),

View File

@ -31,14 +31,12 @@ FinishSortingStep::FinishSortingStep(
SortDescription prefix_description_, SortDescription prefix_description_,
SortDescription result_description_, SortDescription result_description_,
size_t max_block_size_, size_t max_block_size_,
UInt64 limit_, UInt64 limit_)
bool has_filtration_)
: ITransformingStep(input_stream_, input_stream_.header, getTraits(limit_)) : ITransformingStep(input_stream_, input_stream_.header, getTraits(limit_))
, prefix_description(std::move(prefix_description_)) , prefix_description(std::move(prefix_description_))
, result_description(std::move(result_description_)) , result_description(std::move(result_description_))
, max_block_size(max_block_size_) , max_block_size(max_block_size_)
, limit(limit_) , limit(limit_)
, has_filtration(has_filtration_)
{ {
/// TODO: check input_stream is sorted by prefix_description. /// TODO: check input_stream is sorted by prefix_description.
output_stream->sort_description = result_description; output_stream->sort_description = result_description;
@ -60,14 +58,12 @@ void FinishSortingStep::transformPipeline(QueryPipelineBuilder & pipeline, const
if (pipeline.getNumStreams() > 1) if (pipeline.getNumStreams() > 1)
{ {
UInt64 limit_for_merging = (need_finish_sorting ? 0 : limit); UInt64 limit_for_merging = (need_finish_sorting ? 0 : limit);
bool has_limit_below_one_block = !has_filtration && limit_for_merging && limit_for_merging < max_block_size;
auto transform = std::make_shared<MergingSortedTransform>( auto transform = std::make_shared<MergingSortedTransform>(
pipeline.getHeader(), pipeline.getHeader(),
pipeline.getNumStreams(), pipeline.getNumStreams(),
prefix_description, prefix_description,
max_block_size, max_block_size,
limit_for_merging, limit_for_merging);
has_limit_below_one_block);
pipeline.addTransform(std::move(transform)); pipeline.addTransform(std::move(transform));
} }

View File

@ -14,8 +14,7 @@ public:
SortDescription prefix_description_, SortDescription prefix_description_,
SortDescription result_description_, SortDescription result_description_,
size_t max_block_size_, size_t max_block_size_,
UInt64 limit_, UInt64 limit_);
bool has_filtration_);
String getName() const override { return "FinishSorting"; } String getName() const override { return "FinishSorting"; }
@ -32,7 +31,6 @@ private:
SortDescription result_description; SortDescription result_description;
size_t max_block_size; size_t max_block_size;
UInt64 limit; UInt64 limit;
bool has_filtration;
}; };
} }

View File

@ -485,8 +485,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
pipe.getHeader(), pipe.getHeader(),
pipe.numOutputPorts(), pipe.numOutputPorts(),
sort_description, sort_description,
max_block_size, max_block_size);
0, true);
pipe.addTransform(std::move(transform)); pipe.addTransform(std::move(transform));
} }

View File

@ -127,7 +127,7 @@ ColumnGathererTransform::ColumnGathererTransform(
ReadBuffer & row_sources_buf_, ReadBuffer & row_sources_buf_,
size_t block_preferred_size_) size_t block_preferred_size_)
: IMergingTransform<ColumnGathererStream>( : IMergingTransform<ColumnGathererStream>(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*has_limit_below_one_block_=*/ false, num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0,
num_inputs, row_sources_buf_, block_preferred_size_) num_inputs, row_sources_buf_, block_preferred_size_)
, log(&Poco::Logger::get("ColumnGathererStream")) , log(&Poco::Logger::get("ColumnGathererStream"))
{ {

View File

@ -197,7 +197,6 @@ void MergeSortingTransform::consume(Chunk chunk)
description, description,
max_merged_block_size, max_merged_block_size,
limit, limit,
false,
nullptr, nullptr,
quiet, quiet,
use_average_block_sizes, use_average_block_sizes,

View File

@ -83,7 +83,7 @@ TEST(MergingSortedTest, SimpleBlockSizeTest)
EXPECT_EQ(pipe.numOutputPorts(), 3); EXPECT_EQ(pipe.numOutputPorts(), 3);
auto transform = std::make_shared<MergingSortedTransform>(pipe.getHeader(), pipe.numOutputPorts(), sort_description, auto transform = std::make_shared<MergingSortedTransform>(pipe.getHeader(), pipe.numOutputPorts(), sort_description,
DEFAULT_MERGE_BLOCK_SIZE, 0, false, nullptr, false, true); DEFAULT_MERGE_BLOCK_SIZE, 0, nullptr, false, true);
pipe.addTransform(std::move(transform)); pipe.addTransform(std::move(transform));
@ -130,7 +130,7 @@ TEST(MergingSortedTest, MoreInterestingBlockSizes)
EXPECT_EQ(pipe.numOutputPorts(), 3); EXPECT_EQ(pipe.numOutputPorts(), 3);
auto transform = std::make_shared<MergingSortedTransform>(pipe.getHeader(), pipe.numOutputPorts(), sort_description, auto transform = std::make_shared<MergingSortedTransform>(pipe.getHeader(), pipe.numOutputPorts(), sort_description,
DEFAULT_MERGE_BLOCK_SIZE, 0, false, nullptr, false, true); DEFAULT_MERGE_BLOCK_SIZE, 0, nullptr, false, true);
pipe.addTransform(std::move(transform)); pipe.addTransform(std::move(transform));

View File

@ -778,7 +778,7 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
{ {
case MergeTreeData::MergingParams::Ordinary: case MergeTreeData::MergingParams::Ordinary:
merged_transform = std::make_shared<MergingSortedTransform>( merged_transform = std::make_shared<MergingSortedTransform>(
header, pipes.size(), sort_description, merge_block_size, 0, false, ctx->rows_sources_write_buf.get(), true, ctx->blocks_are_granules_size); header, pipes.size(), sort_description, merge_block_size, 0, ctx->rows_sources_write_buf.get(), true, ctx->blocks_are_granules_size);
break; break;
case MergeTreeData::MergingParams::Collapsing: case MergeTreeData::MergingParams::Collapsing: