Move all vertically possible streams to MergeStopCondition

This commit is contained in:
alesapin 2019-03-26 14:37:32 +03:00
parent 2d0224aab4
commit 0378400972
6 changed files with 26 additions and 20 deletions

View File

@ -40,7 +40,7 @@ void CollapsingSortedBlockInputStream::reportIncorrectData()
} }
void CollapsingSortedBlockInputStream::insertRows(MutableColumns & merged_columns, size_t & merged_rows) void CollapsingSortedBlockInputStream::insertRows(MutableColumns & merged_columns, size_t block_size, MergeStopCondition & condition)
{ {
if (count_positive == 0 && count_negative == 0) if (count_positive == 0 && count_negative == 0)
{ {
@ -52,7 +52,7 @@ void CollapsingSortedBlockInputStream::insertRows(MutableColumns & merged_column
{ {
if (count_positive <= count_negative) if (count_positive <= count_negative)
{ {
++merged_rows; condition.incrementRowsCountFromGranularity(block_size);
for (size_t i = 0; i < num_columns; ++i) for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*(*first_negative.columns)[i], first_negative.row_num); merged_columns[i]->insertFrom(*(*first_negative.columns)[i], first_negative.row_num);
@ -62,7 +62,7 @@ void CollapsingSortedBlockInputStream::insertRows(MutableColumns & merged_column
if (count_positive >= count_negative) if (count_positive >= count_negative)
{ {
++merged_rows; condition.incrementRowsCountFromGranularity(block_size);
for (size_t i = 0; i < num_columns; ++i) for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*(*last_positive.columns)[i], last_positive.row_num); merged_columns[i]->insertFrom(*(*last_positive.columns)[i], last_positive.row_num);
@ -106,12 +106,14 @@ Block CollapsingSortedBlockInputStream::readImpl()
void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue) void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue)
{ {
size_t merged_rows = 0;
MergeStopCondition stop_condition(average_block_sizes, max_block_size);
size_t current_block_granularity;
/// Take rows in correct order and put them into `merged_columns` until the rows no more than `max_block_size` /// Take rows in correct order and put them into `merged_columns` until the rows no more than `max_block_size`
for (; !queue.empty(); ++current_pos) for (; !queue.empty(); ++current_pos)
{ {
SortCursor current = queue.top(); SortCursor current = queue.top();
current_block_granularity = current->rows;
if (current_key.empty()) if (current_key.empty())
setPrimaryKeyRef(current_key, current); setPrimaryKeyRef(current_key, current);
@ -122,7 +124,7 @@ void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, st
bool key_differs = next_key != current_key; bool key_differs = next_key != current_key;
/// if there are enough rows and the last one is calculated completely /// if there are enough rows and the last one is calculated completely
if (key_differs && merged_rows >= max_block_size) if (key_differs && stop_condition.checkStop())
{ {
++blocks_written; ++blocks_written;
return; return;
@ -133,7 +135,7 @@ void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, st
if (key_differs) if (key_differs)
{ {
/// We write data for the previous primary key. /// We write data for the previous primary key.
insertRows(merged_columns, merged_rows); insertRows(merged_columns, current_block_granularity, stop_condition);
current_key.swap(next_key); current_key.swap(next_key);
@ -167,7 +169,7 @@ void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, st
first_negative_pos = current_pos; first_negative_pos = current_pos;
} }
if (!blocks_written && !merged_rows) if (!blocks_written && stop_condition.empty())
{ {
setRowRef(last_negative, current); setRowRef(last_negative, current);
last_negative_pos = current_pos; last_negative_pos = current_pos;
@ -193,7 +195,7 @@ void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, st
} }
/// Write data for last primary key. /// Write data for last primary key.
insertRows(merged_columns, merged_rows); insertRows(merged_columns, current_block_granularity, stop_condition);
finished = true; finished = true;
} }

View File

@ -26,8 +26,8 @@ class CollapsingSortedBlockInputStream : public MergingSortedBlockInputStream
public: public:
CollapsingSortedBlockInputStream( CollapsingSortedBlockInputStream(
BlockInputStreams inputs_, const SortDescription & description_, BlockInputStreams inputs_, const SortDescription & description_,
const String & sign_column, size_t max_block_size_, WriteBuffer * out_row_sources_buf_ = nullptr) const String & sign_column, size_t max_block_size_, WriteBuffer * out_row_sources_buf_ = nullptr, bool average_block_sizes = false)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_) : MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_, false, average_block_sizes)
{ {
sign_column_number = header.getPositionByName(sign_column); sign_column_number = header.getPositionByName(sign_column);
} }
@ -75,7 +75,7 @@ private:
void merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue); void merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue);
/// Output to result rows for the current primary key. /// Output to result rows for the current primary key.
void insertRows(MutableColumns & merged_columns, size_t & merged_rows); void insertRows(MutableColumns & merged_columns, size_t block_size, MergeStopCondition & condition);
void reportIncorrectData(); void reportIncorrectData();
}; };

View File

@ -134,6 +134,10 @@ protected:
bool checkStop(size_t total_rows) const; bool checkStop(size_t total_rows) const;
bool checkStop() const; bool checkStop() const;
/// Returns whatever `blocks_granularity.empty()` reports, i.e. true while
/// `blocks_granularity` holds no entries.
/// NOTE(review): presumably this means no rows have been counted into the
/// stop condition yet — confirm against incrementRowsCountFromGranularity().
bool empty() const
{
return blocks_granularity.empty();
}
}; };

View File

@ -16,8 +16,8 @@ namespace ErrorCodes
VersionedCollapsingSortedBlockInputStream::VersionedCollapsingSortedBlockInputStream( VersionedCollapsingSortedBlockInputStream::VersionedCollapsingSortedBlockInputStream(
const BlockInputStreams & inputs_, const SortDescription & description_, const BlockInputStreams & inputs_, const SortDescription & description_,
const String & sign_column_, size_t max_block_size_, const String & sign_column_, size_t max_block_size_,
WriteBuffer * out_row_sources_buf_) WriteBuffer * out_row_sources_buf_, bool average_block_sizes_)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_) : MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_, false, average_block_sizes_)
, max_rows_in_queue(std::min(std::max<size_t>(3, max_block_size_), MAX_ROWS_IN_MULTIVERSION_QUEUE) - 2) , max_rows_in_queue(std::min(std::max<size_t>(3, max_block_size_), MAX_ROWS_IN_MULTIVERSION_QUEUE) - 2)
, current_keys(max_rows_in_queue + 1) , current_keys(max_rows_in_queue + 1)
{ {
@ -83,7 +83,7 @@ Block VersionedCollapsingSortedBlockInputStream::readImpl()
void VersionedCollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue) void VersionedCollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue)
{ {
size_t merged_rows = 0; MergeStopCondition stop_condition(average_block_sizes, max_block_size);
auto update_queue = [this, & queue](SortCursor & cursor) auto update_queue = [this, & queue](SortCursor & cursor)
{ {
@ -108,6 +108,7 @@ void VersionedCollapsingSortedBlockInputStream::merge(MutableColumns & merged_co
while (!queue.empty()) while (!queue.empty())
{ {
SortCursor current = queue.top(); SortCursor current = queue.top();
size_t current_block_granularity = current->rows;
RowRef next_key; RowRef next_key;
@ -154,10 +155,10 @@ void VersionedCollapsingSortedBlockInputStream::merge(MutableColumns & merged_co
current_keys.popFront(); current_keys.popFront();
++merged_rows; stop_condition.incrementRowsCountFromGranularity(current_block_granularity);
--rows_to_merge; --rows_to_merge;
if (merged_rows >= max_block_size) if (stop_condition.checkStop())
{ {
++blocks_written; ++blocks_written;
return; return;
@ -173,7 +174,6 @@ void VersionedCollapsingSortedBlockInputStream::merge(MutableColumns & merged_co
insertRow(gap, row, merged_columns); insertRow(gap, row, merged_columns);
current_keys.popFront(); current_keys.popFront();
++merged_rows;
} }
/// Write information about last collapsed rows. /// Write information about last collapsed rows.

View File

@ -178,7 +178,7 @@ public:
VersionedCollapsingSortedBlockInputStream( VersionedCollapsingSortedBlockInputStream(
const BlockInputStreams & inputs_, const SortDescription & description_, const BlockInputStreams & inputs_, const SortDescription & description_,
const String & sign_column_, size_t max_block_size_, const String & sign_column_, size_t max_block_size_,
WriteBuffer * out_row_sources_buf_ = nullptr); WriteBuffer * out_row_sources_buf_ = nullptr, bool average_block_sizes_ = false);
String getName() const override { return "VersionedCollapsingSorted"; } String getName() const override { return "VersionedCollapsingSorted"; }

View File

@ -642,7 +642,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
case MergeTreeData::MergingParams::Collapsing: case MergeTreeData::MergingParams::Collapsing:
merged_stream = std::make_unique<CollapsingSortedBlockInputStream>( merged_stream = std::make_unique<CollapsingSortedBlockInputStream>(
src_streams, sort_description, data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE, rows_sources_write_buf.get()); src_streams, sort_description, data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE, rows_sources_write_buf.get(), blocks_are_granules_size);
break; break;
case MergeTreeData::MergingParams::Summing: case MergeTreeData::MergingParams::Summing:
@ -668,7 +668,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
case MergeTreeData::MergingParams::VersionedCollapsing: case MergeTreeData::MergingParams::VersionedCollapsing:
merged_stream = std::make_unique<VersionedCollapsingSortedBlockInputStream>( merged_stream = std::make_unique<VersionedCollapsingSortedBlockInputStream>(
src_streams, sort_description, data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE, rows_sources_write_buf.get()); src_streams, sort_description, data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE, rows_sources_write_buf.get(), blocks_are_granules_size);
break; break;
} }