Added optimization for Processors; added reserve.

This commit is contained in:
Alexey Milovidov 2019-12-22 01:16:39 +03:00
parent de4d7a5297
commit eb9d6983c8
5 changed files with 46 additions and 39 deletions

View File

@ -179,9 +179,16 @@ template <typename TSortingHeap>
Block MergeSortingBlocksBlockInputStream::mergeImpl(TSortingHeap & queue)
{
size_t num_columns = header.columns();
MutableColumns merged_columns = header.cloneEmptyColumns();
/// TODO: reserve (in each column)
/// Reserve
if (queue.isValid() && !blocks.empty())
{
/// The expected size of output block is the same as input block
size_t size_to_reserve = blocks[0].rows();
for (auto & column : merged_columns)
column->reserve(size_to_reserve);
}
/// Take rows from queue in right order and push to 'merged'.
size_t merged_rows = 0;
@ -210,6 +217,9 @@ Block MergeSortingBlocksBlockInputStream::mergeImpl(TSortingHeap & queue)
break;
}
if (!queue.isValid())
blocks.clear();
if (merged_rows == 0)
return {};

View File

@ -109,8 +109,7 @@ protected:
size_t num_columns = 0;
std::vector<SharedBlockPtr> source_blocks;
using CursorImpls = std::vector<SortCursorImpl>;
CursorImpls cursors;
SortCursorImpls cursors;
using Queue = std::priority_queue<SortCursor>;
Queue queue_without_collation;

View File

@ -111,8 +111,7 @@ protected:
/// Chunks currently being merged.
std::vector<SharedChunkPtr> source_chunks;
using CursorImpls = std::vector<SortCursorImpl>;
CursorImpls cursors;
SortCursorImpls cursors;
using Queue = std::priority_queue<SortCursor>;
Queue queue_without_collation;

View File

@ -41,15 +41,9 @@ MergeSorter::MergeSorter(Chunks chunks_, SortDescription & description_, size_t
chunks.swap(nonempty_chunks);
if (!has_collation)
{
for (auto & cursor : cursors)
queue_without_collation.push(SortCursor(&cursor));
}
queue_without_collation = SortingHeap<SortCursor>(cursors);
else
{
for (auto & cursor : cursors)
queue_with_collation.push(SortCursorWithCollation(&cursor));
}
queue_with_collation = SortingHeap<SortCursorWithCollation>(cursors);
}
@ -66,49 +60,55 @@ Chunk MergeSorter::read()
}
return !has_collation
? mergeImpl<SortCursor>(queue_without_collation)
: mergeImpl<SortCursorWithCollation>(queue_with_collation);
? mergeImpl(queue_without_collation)
: mergeImpl(queue_with_collation);
}
template <typename TSortCursor>
Chunk MergeSorter::mergeImpl(std::priority_queue<TSortCursor> & queue)
template <typename TSortingHeap>
Chunk MergeSorter::mergeImpl(TSortingHeap & queue)
{
size_t num_columns = chunks[0].getNumColumns();
MutableColumns merged_columns = chunks[0].cloneEmptyColumns();
/// TODO: reserve (in each column)
/// Reserve
if (queue.isValid())
{
/// The expected size of output block is the same as input block
size_t size_to_reserve = chunks[0].getNumRows();
for (auto & column : merged_columns)
column->reserve(size_to_reserve);
}
/// Take rows from queue in right order and push to 'merged'.
size_t merged_rows = 0;
while (!queue.empty())
while (queue.isValid())
{
TSortCursor current = queue.top();
queue.pop();
auto current = queue.current();
/// Append a row from queue.
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*current->all_columns[i], current->pos);
++total_merged_rows;
++merged_rows;
if (!current->isLast())
{
current->next();
queue.push(current);
}
/// We don't need more rows because of limit has reached.
if (limit && total_merged_rows == limit)
{
chunks.clear();
return Chunk(std::move(merged_columns), merged_rows);
break;
}
queue.next();
/// It's enough for current output block but we will continue.
if (merged_rows == max_merged_block_size)
return Chunk(std::move(merged_columns), merged_rows);
break;
}
chunks.clear();
if (!queue.isValid())
chunks.clear();
if (merged_rows == 0)
return {};

View File

@ -1,10 +1,10 @@
#pragma once
#include <Processors/IProcessor.h>
#include <Core/SortDescription.h>
#include <Core/SortCursor.h>
#include <DataStreams/IBlockInputStream.h>
#include <Processors/ISource.h>
#include <queue>
namespace DB
@ -27,19 +27,18 @@ private:
UInt64 limit;
size_t total_merged_rows = 0;
using CursorImpls = std::vector<SortCursorImpl>;
CursorImpls cursors;
SortCursorImpls cursors;
bool has_collation = false;
std::priority_queue<SortCursor> queue_without_collation;
std::priority_queue<SortCursorWithCollation> queue_with_collation;
SortingHeap<SortCursor> queue_without_collation;
SortingHeap<SortCursorWithCollation> queue_with_collation;
/** Two different cursors are supported - with and without Collation.
* Templates are used (instead of virtual functions in SortCursor) for zero-overhead.
*/
template <typename TSortCursor>
Chunk mergeImpl(std::priority_queue<TSortCursor> & queue);
template <typename TSortingHeap>
Chunk mergeImpl(TSortingHeap & queue);
};