This commit is contained in:
Albert Kidrachev 2020-05-25 02:55:00 +03:00
parent 177050a2b5
commit e51e828d5c
5 changed files with 149 additions and 0 deletions

View File

@ -2,6 +2,7 @@
#include <algorithm>
#include <Core/Block.h>
#include <Core/SortDescription.h>
#include <Columns/IColumn.h>
#include <boost/smart_ptr/intrusive_ptr.hpp>
@ -86,4 +87,33 @@ struct SharedBlockRowRef
}
};
struct SharedBlockRowWithSortDescriptionRef : SharedBlockRowRef
{
SortDescription * description = nullptr;
void set(SharedBlockPtr & shared_block_, ColumnRawPtrs * columns_, size_t row_num_) = delete;
bool operator< (const SharedBlockRowRef & other) const
{
size_t size = columns->size();
for (size_t i = 0; i < size; ++i)
{
int res = (*description)[i].direction * (*columns)[i]->compareAt(row_num, other.row_num, *(*other.columns)[i], 1);
if (res < 0)
return true;
else if (res > 0)
return false;
}
return false;
}
void set(SharedBlockPtr & shared_block_, ColumnRawPtrs * columns_, size_t row_num_, SortDescription * description_)
{
shared_block = shared_block_;
columns = columns_;
row_num = row_num_;
description = description_;
}
};
}

View File

@ -84,6 +84,7 @@
#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
#include <Processors/Transforms/TotalsHavingTransform.h>
#include <Processors/Transforms/PartialSortingTransform.h>
#include <Processors/Transforms/OptimizedPartialSortingTransform.h>
#include <Processors/Transforms/LimitsCheckingTransform.h>
#include <Processors/Transforms/MergeSortingTransform.h>
#include <Processors/Transforms/DistinctTransform.h>

View File

@ -0,0 +1,83 @@
#include <Processors/Transforms/OptimizedPartialSortingTransform.h>
#include <Interpreters/sortBlock.h>
#include <Common/PODArray.h>
namespace DB
{
OptimizedPartialSortingTransform::OptimizedPartialSortingTransform(
const Block & header_, SortDescription & description_, UInt64 limit_)
: ISimpleTransform(header_, header_, false)
, description(description_), limit(limit_)
, threshold_shared_block(nullptr)
{
}
static ColumnRawPtrs extractColumns(const Block & block, const SortDescription& description)
{
size_t size = description.size();
ColumnRawPtrs res;
res.reserve(size);
for (size_t i = 0; i < size; ++i)
{
const IColumn * column = !description[i].column_name.empty()
? block.getByName(description[i].column_name).column.get()
: block.safeGetByPosition(description[i].column_number).column.get();
res.emplace_back(column);
}
return res;
}
void OptimizedPartialSortingTransform::transform(Chunk & chunk)
{
if (read_rows)
read_rows->add(chunk.getNumRows());
auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
chunk.clear();
SharedBlockPtr shared_block = new detail::SharedBlock(std::move(block));
UInt64 rows_num = shared_block->rows();
if (threshold_shared_block) {
SharedBlockRowWithSortDescriptionRef row;
IColumn::Filter filter(rows_num);
ColumnRawPtrs shared_block_columns = extractColumns(*shared_block, description);
size_t filtered_count = 0;
for (UInt64 i = 0; i < rows_num; ++i) {
row.set(shared_block, &shared_block_columns, i, &description);
if (threshold_row < row)
{
++filtered_count;
filter[i] = 1;
}
}
if (filtered_count)
{
for (auto & column : shared_block->getColumns())
{
column = column->filter(filter, filtered_count);
}
}
}
sortBlock(*shared_block, description, limit);
if (!threshold_shared_block && limit && limit < rows_num)
{
Block threshold_block = shared_block->cloneWithColumns(shared_block->getColumns());
threshold_shared_block = new detail::SharedBlock(std::move(threshold_block));
threshold_block_columns = extractColumns(*threshold_shared_block, description);
threshold_row.set(threshold_shared_block, &threshold_block_columns, limit - 1, &description);
}
chunk.setColumns(shared_block->getColumns(), shared_block->rows());
}
}

View File

@ -0,0 +1,34 @@
#pragma once
#include <Processors/ISimpleTransform.h>
#include <Processors/RowsBeforeLimitCounter.h>
#include <Common/SharedBlockRowRef.h>
#include <Core/SortDescription.h>
namespace DB
{
class OptimizedPartialSortingTransform : public ISimpleTransform
{
public:
OptimizedPartialSortingTransform(
const Block & header_,
SortDescription & description_,
UInt64 limit_ = 0);
String getName() const override { return "OptimizedPartialSortingTransform"; }
void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) { read_rows.swap(counter); }
protected:
void transform(Chunk & chunk) override;
private:
SortDescription description;
UInt64 limit;
RowsBeforeLimitCounterPtr read_rows;
SharedBlockRowWithSortDescriptionRef threshold_row;
SharedBlockPtr threshold_shared_block;
ColumnRawPtrs threshold_block_columns;
};
}

View File

@ -129,6 +129,7 @@ SRCS(
Transforms/MergeSortingTransform.cpp
Transforms/MergingAggregatedMemoryEfficientTransform.cpp
Transforms/MergingAggregatedTransform.cpp
Transforms/OptimizedPartialSortingTransform.cpp
Transforms/PartialSortingTransform.cpp
Transforms/ReverseTransform.cpp
Transforms/RollupTransform.cpp