Attempt to optimize merging sorted blocks

This commit is contained in:
Alexey Milovidov 2019-12-22 04:37:56 +03:00
parent 9f3afed5ff
commit 401c5eef81
3 changed files with 61 additions and 51 deletions

View File

@ -110,21 +110,52 @@ using SortCursorImpls = std::vector<SortCursorImpl>;
/// For easy copying. /// For easy copying.
struct SortCursor template <typename Derived>
struct SortCursorHelper
{ {
SortCursorImpl * impl; SortCursorImpl * impl;
SortCursor(SortCursorImpl * impl_) : impl(impl_) {} const Derived & derived() const { return static_cast<const Derived &>(*this); }
SortCursorHelper(SortCursorImpl * impl_) : impl(impl_) {}
SortCursorImpl * operator-> () { return impl; } SortCursorImpl * operator-> () { return impl; }
const SortCursorImpl * operator-> () const { return impl; } const SortCursorImpl * operator-> () const { return impl; }
bool greater(const SortCursorHelper & rhs) const
{
return derived().greaterAt(rhs.derived(), impl->pos, rhs.impl->pos);
}
/// Inverted so that the priority queue elements are removed in ascending order.
bool operator< (const SortCursorHelper & rhs) const
{
return derived().greater(rhs.derived());
}
/// Checks that all rows in the current block of this cursor are less than or equal to all the rows of the current block of another cursor.
bool totallyLessOrEquals(const SortCursorHelper & rhs) const
{
if (impl->rows == 0 || rhs.impl->rows == 0)
return false;
/// The last row of this cursor is no larger than the first row of the another cursor.
return !derived().greaterAt(rhs.derived(), impl->rows - 1, 0);
}
};
struct SortCursor : SortCursorHelper<SortCursor>
{
using SortCursorHelper<SortCursor>::SortCursorHelper;
/// The specified row of this cursor is greater than the specified row of another cursor. /// The specified row of this cursor is greater than the specified row of another cursor.
bool greaterAt(const SortCursor & rhs, size_t lhs_pos, size_t rhs_pos) const bool greaterAt(const SortCursor & rhs, size_t lhs_pos, size_t rhs_pos) const
{ {
for (size_t i = 0; i < impl->sort_columns_size; ++i) for (size_t i = 0; i < impl->sort_columns_size; ++i)
{ {
int direction = impl->desc[i].direction; const auto & desc = impl->desc[i];
int nulls_direction = impl->desc[i].nulls_direction; int direction = desc.direction;
int nulls_direction = desc.nulls_direction;
int res = direction * impl->sort_columns[i]->compareAt(lhs_pos, rhs_pos, *(rhs.impl->sort_columns[i]), nulls_direction); int res = direction * impl->sort_columns[i]->compareAt(lhs_pos, rhs_pos, *(rhs.impl->sort_columns[i]), nulls_direction);
if (res > 0) if (res > 0)
return true; return true;
@ -133,45 +164,37 @@ struct SortCursor
} }
return impl->order > rhs.impl->order; return impl->order > rhs.impl->order;
} }
};
/// Checks that all rows in the current block of this cursor are less than or equal to all the rows of the current block of another cursor.
bool totallyLessOrEquals(const SortCursor & rhs) const /// For the case with a single column and when there is no order between different cursors.
struct SimpleSortCursor : SortCursorHelper<SimpleSortCursor>
{ {
if (impl->rows == 0 || rhs.impl->rows == 0) using SortCursorHelper<SimpleSortCursor>::SortCursorHelper;
return false;
/// The last row of this cursor is no larger than the first row of the another cursor. bool greaterAt(const SimpleSortCursor & rhs, size_t lhs_pos, size_t rhs_pos) const
return !greaterAt(rhs, impl->rows - 1, 0);
}
bool greater(const SortCursor & rhs) const
{ {
return greaterAt(rhs, impl->pos, rhs.impl->pos); const auto & desc = impl->desc[0];
} int direction = desc.direction;
int nulls_direction = desc.nulls_direction;
/// Inverted so that the priority queue elements are removed in ascending order. int res = impl->sort_columns[0]->compareAt(lhs_pos, rhs_pos, *(rhs.impl->sort_columns[0]), nulls_direction);
bool operator< (const SortCursor & rhs) const return (res > 0) ^ (direction > 0);
{
return greater(rhs);
} }
}; };
/// Separate comparator for locale-sensitive string comparisons /// Separate comparator for locale-sensitive string comparisons
struct SortCursorWithCollation struct SortCursorWithCollation : SortCursorHelper<SortCursorWithCollation>
{ {
SortCursorImpl * impl; using SortCursorHelper<SortCursorWithCollation>::SortCursorHelper;
SortCursorWithCollation(SortCursorImpl * impl_) : impl(impl_) {}
SortCursorImpl * operator-> () { return impl; }
const SortCursorImpl * operator-> () const { return impl; }
bool greaterAt(const SortCursorWithCollation & rhs, size_t lhs_pos, size_t rhs_pos) const bool greaterAt(const SortCursorWithCollation & rhs, size_t lhs_pos, size_t rhs_pos) const
{ {
for (size_t i = 0; i < impl->sort_columns_size; ++i) for (size_t i = 0; i < impl->sort_columns_size; ++i)
{ {
int direction = impl->desc[i].direction; const auto & desc = impl->desc[i];
int nulls_direction = impl->desc[i].nulls_direction; int direction = desc.direction;
int nulls_direction = desc.nulls_direction;
int res; int res;
if (impl->need_collation[i]) if (impl->need_collation[i])
{ {
@ -189,29 +212,11 @@ struct SortCursorWithCollation
} }
return impl->order > rhs.impl->order; return impl->order > rhs.impl->order;
} }
bool totallyLessOrEquals(const SortCursorWithCollation & rhs) const
{
if (impl->rows == 0 || rhs.impl->rows == 0)
return false;
/// The last row of this cursor is no larger than the first row of the another cursor.
return !greaterAt(rhs, impl->rows - 1, 0);
}
bool greater(const SortCursorWithCollation & rhs) const
{
return greaterAt(rhs, impl->pos, rhs.impl->pos);
}
bool operator< (const SortCursorWithCollation & rhs) const
{
return greater(rhs);
}
}; };
/** Allows fetching data from multiple sort cursors in sorted order (merging sorted data streams).
  * TODO: Replace with "Loser Tree", see https://en.wikipedia.org/wiki/K-way_merge_algorithm
  */
template <typename Cursor> template <typename Cursor>
class SortingHeap class SortingHeap

View File

@ -60,8 +60,10 @@ void MergingSortedBlockInputStream::init(MutableColumns & merged_columns)
/// Fill the one heap whose cursor type matches the sort description.
/// The checks must follow the same order as the dispatch in readImpl():
/// otherwise merge() is handed a queue that was never initialized,
/// or a collated column is compared with a cursor that ignores collation.
if (has_collation)
    queue_with_collation = SortingHeap<SortCursorWithCollation>(cursors);
else if (description.size() > 1)
    queue_without_collation = SortingHeap<SortCursor>(cursors);
else
    queue_simple = SortingHeap<SimpleSortCursor>(cursors);
} }
/// Let's check that all source blocks have the same structure. /// Let's check that all source blocks have the same structure.
@ -98,8 +100,10 @@ Block MergingSortedBlockInputStream::readImpl()
if (has_collation) if (has_collation)
merge(merged_columns, queue_with_collation); merge(merged_columns, queue_with_collation);
else else if (description.size() > 1)
merge(merged_columns, queue_without_collation); merge(merged_columns, queue_without_collation);
else
merge(merged_columns, queue_simple);
return header.cloneWithColumns(std::move(merged_columns)); return header.cloneWithColumns(std::move(merged_columns));
} }

View File

@ -110,6 +110,7 @@ protected:
SortCursorImpls cursors; SortCursorImpls cursors;
SortingHeap<SortCursor> queue_without_collation; SortingHeap<SortCursor> queue_without_collation;
SortingHeap<SimpleSortCursor> queue_simple;
SortingHeap<SortCursorWithCollation> queue_with_collation; SortingHeap<SortCursorWithCollation> queue_with_collation;
/// Used in Vertical merge algorithm to gather non-PK/non-index columns (on next step) /// Used in Vertical merge algorithm to gather non-PK/non-index columns (on next step)