diff --git a/src/Interpreters/sortBlock.cpp b/src/Interpreters/sortBlock.cpp index 7b19d338ee8..bdf672623da 100644 --- a/src/Interpreters/sortBlock.cpp +++ b/src/Interpreters/sortBlock.cpp @@ -272,6 +272,56 @@ bool isAlreadySortedImpl(size_t rows, Comparator compare) return true; } +template +void checkSortedWithPermutationImpl(size_t rows, Comparator compare, UInt64 limit, const IColumn::Permutation & permutation) +{ + if (limit && limit < rows) + rows = limit; + + for (size_t i = 1; i < rows; ++i) + { + const size_t current_row = permutation[i]; + const size_t previous_row = permutation[i - 1]; + + if (compare(current_row, previous_row)) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Rows are not sorted with permuation, position {}, previous_row index {}, current_row index {}", i, previous_row, current_row); + } +} + +void checkSortedWithPermutation(const Block & block, const SortDescription & description, UInt64 limit, const IColumn::Permutation & permutation) +{ + if (!block) + return; + + ColumnsWithSortDescriptions columns_with_sort_desc = getColumnsWithSortDescription(block, description); + bool is_collation_required = false; + + for (auto & column_with_sort_desc : columns_with_sort_desc) + { + if (isCollationRequired(column_with_sort_desc.description)) + { + is_collation_required = true; + break; + } + } + + size_t rows = block.rows(); + + if (is_collation_required) + { + PartialSortingLessWithCollation less(columns_with_sort_desc); + checkSortedWithPermutationImpl(rows, less, limit, permutation); + return; + } + else + { + PartialSortingLess less(columns_with_sort_desc); + checkSortedWithPermutationImpl(rows, less, limit, permutation); + return; + } +} + } void sortBlock(Block & block, const SortDescription & description, UInt64 limit) @@ -279,6 +329,10 @@ void sortBlock(Block & block, const SortDescription & description, UInt64 limit) IColumn::Permutation permutation; getBlockSortPermutationImpl(block, description, IColumn::PermutationSortStability::Unstable, limit, permutation); +#ifndef NDEBUG + checkSortedWithPermutation(block, description, limit, permutation); +#endif + if (permutation.empty()) return; @@ -303,6 +357,10 @@ void stableGetPermutation(const Block & block, const SortDescription & descripti return; getBlockSortPermutationImpl(block, description, IColumn::PermutationSortStability::Stable, 0, out_permutation); + +#ifndef NDEBUG + checkSortedWithPermutation(block, description, 0, out_permutation); +#endif } bool isAlreadySorted(const Block & block, const SortDescription & description)