wip on finish sorting

This commit is contained in:
CurtizJ 2018-10-04 13:24:51 +03:00
parent b326b95592
commit dd9516a810
4 changed files with 200 additions and 34 deletions

View File

@ -5,6 +5,7 @@
#include <Common/formatReadable.h> #include <Common/formatReadable.h>
#include <IO/WriteBufferFromFile.h> #include <IO/WriteBufferFromFile.h>
#include <IO/CompressedWriteBuffer.h> #include <IO/CompressedWriteBuffer.h>
#include <Interpreters/sortBlock.h>
namespace ProfileEvents namespace ProfileEvents
@ -304,4 +305,126 @@ void MergeSortingBlockInputStream::remerge()
sum_bytes_in_blocks = new_sum_bytes_in_blocks; sum_bytes_in_blocks = new_sum_bytes_in_blocks;
} }
/// Wraps `input` and keeps private copies of both sort descriptions.
/// The header of the input is remembered, and both copied descriptions are passed through
/// removeConstantsFromSortDescription() together with that header.
FinishMergeSortingBlockInputStream::FinishMergeSortingBlockInputStream(
    const BlockInputStreamPtr & input, SortDescription & description_sorted_,
    SortDescription & description_to_sort_,
    size_t max_merged_block_size_, size_t limit_)
    : description_sorted(description_sorted_)
    , description_to_sort(description_to_sort_)
    , max_merged_block_size(max_merged_block_size_)
    , limit(limit_)
{
    children.push_back(input);

    /// The stream just registered is the first child; its header defines the output structure.
    header = children.front()->getHeader();

    /// Only the member copies are adjusted, the caller's descriptions are left untouched.
    removeConstantsFromSortDescription(header, description_sorted);
    removeConstantsFromSortDescription(header, description_to_sort);
}
static bool equalKeysAt(const ColumnsWithSortDescriptions & lhs, const ColumnsWithSortDescriptions & rhs, size_t n, size_t m)
{
for (auto it = lhs.begin(), jt = rhs.begin(); it != lhs.end(); ++it, ++jt)
{
int res = it->first->compareAt(n, m, *jt->first, it->second.nulls_direction);
if (res != 0)
return false;
}
return true;
}
/** Returns the next block of the stream, ordered by description_to_sort.
  *
  * The input is consumed in chunks. Blocks are read from the child stream until the key
  * (by description_sorted) of the last accumulated row stops continuing; the accumulated blocks
  * are then merged by description_to_sort with MergeSortingBlocksBlockInputStream and handed out
  * block by block on subsequent calls. The rows that belong to the last key of the block that
  * closed the chunk are kept in tail_block, because the next input block may continue that key.
  *
  * Returns an empty block when the input is exhausted or when the limit has been reached.
  */
Block FinishMergeSortingBlockInputStream::readImpl()
{
    /// Every chunk is merged with the full `limit`, so the counter may step over it:
    /// compare with >=, otherwise the early exit could be missed forever.
    if (limit && total_rows_processed >= limit)
        return {};

    Block res;
    if (impl)
        res = impl->read();

    /// If res block is empty, we have finished sorting the previous chunk of blocks.
    if (!res)
    {
        if (end_of_stream)
            return {};

        blocks.clear();
        if (tail_block)
            blocks.push_back(std::move(tail_block));

        Block block;
        size_t tail_pos = 0;
        while (true)
        {
            block = children.back()->read();

            /// End of input stream, but we can't return immediately: the blocks already read still have to be merged.
            /// The end is reported on a later call, once impl has been drained.
            if (!block)
            {
                end_of_stream = true;
                break;
            }

            /// If there were only const columns in sort description, then there is no need to sort.
            /// Return the blocks as is.
            if (description_to_sort.empty())
                return block;

            size_t size = block.rows();
            if (size == 0)
                continue;

            /// The merging stream below expects every block to be sorted by description_to_sort on its own,
            /// while the input is ordered by description_sorted only.
            /// Assumes description_to_sort extends description_sorted, so the order by description_sorted
            /// (relied upon by the key search below) is preserved.
            sortBlock(block, description_to_sort);

            auto columns_with_sort_desc = getColumnsWithSortDescription(block, description_sorted);
            removeConstantsFromBlock(block);

            /// The new block may start with a new key. Then it belongs to the next chunk as a whole
            /// (tail_pos is still 0 at this point).
            if (!blocks.empty())
            {
                const Block & last_block = blocks.back();
                if (!equalKeysAt(getColumnsWithSortDescription(last_block, description_sorted), columns_with_sort_desc, last_block.rows() - 1, 0))
                    break;
            }

            IColumn::Permutation perm(size);
            for (size_t i = 0; i < size; ++i)
                perm[i] = i;

            PartialSortingLess less(columns_with_sort_desc);

            /// We need to save the tail of the block, because the next block may start with the same key as the tail
            /// and these rows have to be sorted in one chunk. tail_pos is the first row of the last key of the block.
            tail_pos = *std::lower_bound(perm.begin(), perm.end(), size - 1, less);
            if (tail_pos != 0)
                break;

            /// If we reach here, all rows of the current block share one key, so the chunk may continue in the next block.
            blocks.push_back(block);
        }

        /// A non-empty `block` here is the one that closed the chunk: split it into the part
        /// that finishes the current chunk and the part that opens the next one.
        if (block)
        {
            Block head_block = block.cloneEmpty();
            tail_block = block.cloneEmpty();

            for (size_t i = 0; i < block.columns(); ++i)
            {
                head_block.getByPosition(i).column = block.getByPosition(i).column->cut(0, tail_pos);
                tail_block.getByPosition(i).column = block.getByPosition(i).column->cut(tail_pos, block.rows() - tail_pos);
            }

            /// The head has no rows when the whole block starts a new key; do not feed an empty block to the merge.
            if (tail_pos != 0)
                blocks.push_back(head_block);
        }

        impl = std::make_unique<MergeSortingBlocksBlockInputStream>(blocks, description_to_sort, max_merged_block_size, limit);
        res = impl->read();
    }

    /// Constant columns were removed before merging; put them back according to the saved header.
    if (res)
        enrichBlockWithConstants(res, header);

    total_rows_processed += res.rows();

    return res;
}
} }

View File

@ -131,4 +131,45 @@ private:
bool remerge_is_useful = true; bool remerge_is_useful = true;
}; };
/** Takes a stream already sorted by `x` and finishes sorting it by (`x`, `y`).
  * The input is processed in chunks: only the blocks of the chunk currently being merged
  * (plus the saved tail of the last read block) are kept in RAM, not the whole stream.
  */
class FinishMergeSortingBlockInputStream : public IProfilingBlockInputStream
{
public:
/// input - the source stream.
/// description_sorted_ - the order the input is already sorted by; copied into the object.
/// description_to_sort_ - the order of the output; copied into the object.
/// max_merged_block_size_ - passed on to the merging stream created for each chunk.
/// limit - if not 0, allowed to return just first 'limit' rows in sorted order.
FinishMergeSortingBlockInputStream(const BlockInputStreamPtr & input, SortDescription & description_sorted_,
SortDescription & description_to_sort_,
size_t max_merged_block_size_, size_t limit_);
String getName() const override { return "FinishMergeSorting"; }
bool isSortedOutput() const override { return true; }
/// The output is reported as sorted by the full description, not by the prefix the input had.
const SortDescription & getSortDescription() const override { return description_to_sort; }
Block getHeader() const override { return header; }
protected:
/// Returns the next sorted block, or an empty block when there is nothing more to return.
Block readImpl() override;
private:
/// Order of the input stream; used to detect where a key ends.
SortDescription description_sorted;
/// Order of the output stream; used to merge the blocks of a chunk.
SortDescription description_to_sort;
size_t max_merged_block_size;
size_t limit;
/// Rows cut from the end of the last read block, starting at the first row of its last key.
/// They open the next chunk, because the next input block may continue that key.
Block tail_block;
/// Blocks of the chunk that is currently being merged.
Blocks blocks;
/// Stream that merges `blocks`; created anew for every chunk.
std::unique_ptr<IBlockInputStream> impl;
/// Original block structure of the input.
/// Constant columns are removed from blocks before merging and placed back afterwards
/// according to this header.
Block header;
/// Set once the child stream has returned an empty block.
bool end_of_stream = false;
/// Number of rows returned so far; compared against `limit`.
size_t total_rows_processed = 0;
};
} }

View File

@ -13,9 +13,19 @@ namespace ErrorCodes
} }
using ColumnsWithSortDescriptions = std::vector<std::pair<const IColumn *, SortColumnDescription>>; static inline bool needCollation(const IColumn * column, const SortColumnDescription & description)
{
if (!description.collator)
return false;
static ColumnsWithSortDescriptions getColumnsWithSortDescription(const Block & block, const SortDescription & description) if (!typeid_cast<const ColumnString *>(column)) /// TODO Nullable(String)
throw Exception("Collations could be specified only for String columns.", ErrorCodes::BAD_COLLATION);
return true;
}
ColumnsWithSortDescriptions getColumnsWithSortDescription(const Block & block, const SortDescription & description)
{ {
size_t size = description.size(); size_t size = description.size();
ColumnsWithSortDescriptions res; ColumnsWithSortDescriptions res;
@ -34,38 +44,6 @@ static ColumnsWithSortDescriptions getColumnsWithSortDescription(const Block & b
} }
/// Returns true when `description` carries a collator and `column` is a ColumnString.
/// Returns false when no collator is set.
/// Throws BAD_COLLATION when a collator is set for a column that is not a ColumnString.
static inline bool needCollation(const IColumn * column, const SortColumnDescription & description)
{
    if (description.collator)
    {
        /// TODO Nullable(String)
        if (typeid_cast<const ColumnString *>(column))
            return true;

        throw Exception("Collations could be specified only for String columns.", ErrorCodes::BAD_COLLATION);
    }

    return false;
}
/// Strict "less" comparator over row numbers of one set of sort columns.
/// Columns are compared in the order they are listed; the first column on which
/// the two rows differ decides, taking the direction of that column into account.
/// Holds a reference to the column list, so the list must outlive the comparator.
struct PartialSortingLess
{
    const ColumnsWithSortDescriptions & columns;

    explicit PartialSortingLess(const ColumnsWithSortDescriptions & columns_) : columns(columns_) {}

    /// Returns true when row `a` goes strictly before row `b`.
    bool operator() (size_t a, size_t b) const
    {
        for (const auto & column_with_description : columns)
        {
            const auto & column = *column_with_description.first;
            const auto & sort_description = column_with_description.second;

            const int order = sort_description.direction * column.compareAt(a, b, column, sort_description.nulls_direction);
            if (order != 0)
                return order < 0;
        }

        /// Rows are equal on all sort columns.
        return false;
    }
};
struct PartialSortingLessWithCollation struct PartialSortingLessWithCollation
{ {
const ColumnsWithSortDescriptions & columns; const ColumnsWithSortDescriptions & columns;

View File

@ -29,4 +29,28 @@ void stableGetPermutation(const Block & block, const SortDescription & descripti
*/ */
bool isAlreadySorted(const Block & block, const SortDescription & description); bool isAlreadySorted(const Block & block, const SortDescription & description);
using ColumnsWithSortDescriptions = std::vector<std::pair<const IColumn *, SortColumnDescription>>;
/// Strict "less" comparator over row numbers of one set of sort columns.
/// Columns are compared in the order they are listed; the first column on which
/// the two rows differ decides, taking the direction of that column into account.
/// Holds a reference to the column list, so the list must outlive the comparator.
struct PartialSortingLess
{
    const ColumnsWithSortDescriptions & columns;

    explicit PartialSortingLess(const ColumnsWithSortDescriptions & columns_) : columns(columns_) {}

    /// Returns true when row `a` goes strictly before row `b`.
    bool operator() (size_t a, size_t b) const
    {
        for (const auto & column_with_description : columns)
        {
            const auto & column = *column_with_description.first;
            const auto & sort_description = column_with_description.second;

            const int order = sort_description.direction * column.compareAt(a, b, column, sort_description.nulls_direction);
            if (order != 0)
                return order < 0;
        }

        /// Rows are equal on all sort columns.
        return false;
    }
};
ColumnsWithSortDescriptions getColumnsWithSortDescription(const Block & block, const SortDescription & description);
} }