Remove StopCondition from MergingSortedBlockInputStream.

This commit is contained in:
Nikolai Kochetov 2020-04-14 14:25:01 +03:00
parent 2da9f889cd
commit e92827b954
2 changed files with 5 additions and 55 deletions

View File

@ -19,9 +19,9 @@ namespace ErrorCodes
MergingSortedBlockInputStream::MergingSortedBlockInputStream(
const BlockInputStreams & inputs_, const SortDescription & description_,
size_t max_block_size_, UInt64 limit_, WriteBuffer * out_row_sources_buf_, bool quiet_, bool average_block_sizes_)
size_t max_block_size_, UInt64 limit_, WriteBuffer * out_row_sources_buf_, bool quiet_)
: description(description_), max_block_size(max_block_size_), limit(limit_), quiet(quiet_)
, average_block_sizes(average_block_sizes_), source_blocks(inputs_.size())
, source_blocks(inputs_.size())
, cursors(inputs_.size()), out_row_sources_buf(out_row_sources_buf_)
, log(&Logger::get("MergingSortedBlockInputStream"))
{
@ -139,30 +139,15 @@ void MergingSortedBlockInputStream::fetchNextBlock(const TSortCursor & current,
}
/// Decide whether enough rows have been accumulated to finish the current block.
/// Without `count_average` the row count is compared with `max_block_size`;
/// with it, the row count is compared with the average granularity of the merged rows.
bool MergingSortedBlockInputStream::MergeStopCondition::checkStop() const
{
    if (count_average)
    {
        /// Nothing has been merged yet, so there is no average to compare with
        /// (this also keeps the division below away from a zero divisor).
        if (sum_rows_count == 0)
            return false;

        const size_t mean_granularity = sum_blocks_granularity / sum_rows_count;
        return mean_granularity <= sum_rows_count;
    }

    return max_block_size == sum_rows_count;
}
template <typename TSortingHeap>
void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, TSortingHeap & queue)
{
size_t merged_rows = 0;
MergeStopCondition stop_condition(average_block_sizes, max_block_size);
/** Increase row counters.
* Return true if it's time to finish generating the current data block.
*/
auto count_row_and_check_limit = [&, this](size_t current_granularity)
auto count_row_and_check_limit = [&, this]()
{
++total_merged_rows;
if (limit && total_merged_rows == limit)
@ -174,8 +159,7 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, TSort
}
++merged_rows;
stop_condition.addRowWithGranularity(current_granularity);
return stop_condition.checkStop();
return merged_rows >= max_block_size;
};
/// Take rows in the required order and put them into `merged_columns`, while the number of rows is no more than `max_block_size`

View File

@ -34,7 +34,7 @@ public:
*/
MergingSortedBlockInputStream(
const BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_,
UInt64 limit_ = 0, WriteBuffer * out_row_sources_buf_ = nullptr, bool quiet_ = false, bool average_block_sizes_ = false);
UInt64 limit_ = 0, WriteBuffer * out_row_sources_buf_ = nullptr, bool quiet_ = false);
String getName() const override { return "MergingSorted"; }
@ -44,38 +44,6 @@ public:
Block getHeader() const override { return header; }
protected:
/// Tracks the progress of a merge and tells when the block being built is full.
/// Without `count_average` the number of merged rows is compared with max_block_size;
/// with `count_average` it is compared with the average size of the blocks
/// from which the merged rows were taken.
struct MergeStopCondition
{
    /// Sum, over every merged row, of the size of the block that row came from.
    size_t sum_blocks_granularity = 0;
    /// Number of rows accounted for so far.
    size_t sum_rows_count = 0;
    bool count_average;
    size_t max_block_size;

    MergeStopCondition(bool count_average_, size_t max_block_size_)
        : count_average{count_average_}, max_block_size{max_block_size_}
    {
    }

    /// Account for a single row taken from a block of size `granularity`.
    void addRowWithGranularity(size_t granularity)
    {
        ++sum_rows_count;
        sum_blocks_granularity += granularity;
    }

    /// Tell whether enough rows have been accumulated (defined out of line).
    bool checkStop() const;

    /// True while no granularity has been accumulated.
    bool empty() const
    {
        return !sum_blocks_granularity;
    }
};
Block readImpl() override;
void readSuffixImpl() override;
@ -87,7 +55,6 @@ protected:
template <typename TSortCursor>
void fetchNextBlock(const TSortCursor & current, SortingHeap<TSortCursor> & queue);
Block header;
const SortDescription description;
@ -98,7 +65,6 @@ protected:
bool first = true;
bool has_collation = false;
bool quiet = false;
bool average_block_sizes = false;
/// May be smaller or equal to max_block_size. To do 'reserve' for columns.
size_t expected_block_size = 0;