Merge pull request #62747 from bigo-sg/percent_rank

New window function `percent_rank`
This commit is contained in:
Vitaly Baranov 2024-07-05 13:53:37 +00:00 committed by GitHub
commit 532eb28a7d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 217 additions and 32 deletions

View File

@ -23,6 +23,7 @@ ClickHouse supports the standard grammar for defining windows and window functio
| `GROUPS` frame | ❌ |
| Calculating aggregate functions over a frame (`sum(value) over (order by time)`) | ✅ (All aggregate functions are supported) |
| `rank()`, `dense_rank()`, `row_number()` | ✅ |
| `percent_rank()` | ✅ Efficiently computes the relative standing of a value within a partition in a dataset. This function effectively replaces the more verbose and computationally intensive manual SQL calculation expressed as `ifNull((rank() OVER(PARTITION BY x ORDER BY y) - 1) / nullif(count(1) OVER(PARTITION BY x) - 1, 0), 0)`|
| `lag/lead(value, offset)` | ❌ <br/> You can use one of the following workarounds:<br/> 1) `any(value) over (.... rows between <offset> preceding and <offset> preceding)`, or `following` for `lead` <br/> 2) `lagInFrame/leadInFrame`, which are analogous, but respect the window frame. To get behavior identical to `lag/lead`, use `rows between unbounded preceding and unbounded following` |
| `ntile(buckets)` | ✅ <br/> Specify the window like this: `(partition by x order by y rows between unbounded preceding and unbounded following)`. |

View File

@ -17,6 +17,9 @@
#include <Common/FieldVisitorConvertToNumber.h>
#include <Common/FieldVisitorsAccurateComparison.h>
#include <Poco/Logger.h>
#include <Common/logger_useful.h>
#include <limits>
@ -71,6 +74,9 @@ public:
size_t function_index) const = 0;
virtual std::optional<WindowFrame> getDefaultFrame() const { return {}; }
/// Tells whether the window frame of the given transform is supported by this
/// function. The base implementation accepts every frame. WindowTransform's
/// constructor throws BAD_ARGUMENTS when an implementation returns false.
virtual bool checkWindowFrameType(const WindowTransform * /*transform*/) const
{
    return true;
}
};
// Compares ORDER BY column values at given rows to find the boundaries of frame:
@ -402,6 +408,19 @@ WindowTransform::WindowTransform(const Block & input_header_,
}
}
}
for (const auto & workspace : workspaces)
{
if (workspace.window_function_impl)
{
if (!workspace.window_function_impl->checkWindowFrameType(this))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported window frame type for function '{}'",
workspace.aggregate_function->getName());
}
}
}
}
WindowTransform::~WindowTransform()
@ -1609,6 +1628,34 @@ struct WindowFunctionHelpers
{
recurrent_detail::setValueToOutputColumn<T>(transform, function_index, value);
}
/// True when the row currently being processed is the first one of its
/// partition, i.e. the transform's per-partition row counter equals 1.
ALWAYS_INLINE static bool checkPartitionEnterFirstRow(const WindowTransform * transform)
{
    return 1 == transform->current_row_number;
}
/// True when the row currently being processed is the last row of its partition.
///
/// Intended to be called once per row. The check looks at the position right
/// after the current row and compares it with `partition_end`.
ALWAYS_INLINE static bool checkPartitionEnterLastRow(const WindowTransform * transform)
{
    /// Cheap early exit: the end of the partition has not been found yet.
    if (!transform->partition_ended)
        return false;

    /// Position immediately following the current row, within the same block.
    auto next_row = transform->current_row;
    ++next_row.row;

    const auto & partition_end = transform->partition_end;
    if (next_row != partition_end)
    {
        /// The current row can still be the last one when it closes its block and
        /// the partition end points at the very beginning of the following block
        /// (this also covers the last row of the whole input).
        const bool more_rows_in_block = next_row.row < transform->blockRowsNumber(next_row);
        const bool end_is_next_block_start = partition_end.block == next_row.block + 1 && !partition_end.row;
        if (more_rows_in_block || !end_is_next_block_start)
            return false;
    }

    return true;
}
};
template<typename State>
@ -2058,8 +2105,6 @@ namespace
const WindowTransform * transform,
size_t function_index,
const DataTypes & argument_types);
static void checkWindowFrameType(const WindowTransform * transform);
};
}
@ -2080,6 +2125,29 @@ struct WindowFunctionNtile final : public StatefulWindowFunction<NtileState>
bool allocatesMemoryInArena() const override { return false; }
/// Validates the window definition for `ntile`: an ORDER BY clause is required
/// and both frame boundaries must be unbounded. Logs the reason and returns
/// false when the window is not acceptable.
bool checkWindowFrameType(const WindowTransform * transform) const override
{
    const bool has_order_by = !transform->order_by_indices.empty();
    if (!has_order_by)
    {
        LOG_ERROR(getLogger("WindowFunctionNtile"), "Window frame for 'ntile' function must have ORDER BY clause");
        return false;
    }

    /// The total number of rows in the partition is only known once the partition
    /// has ended, so no block may be dropped before that point. This is why the
    /// frame has to span the whole partition.
    const auto & frame = transform->window_description.frame;
    if (frame.begin_type != WindowFrame::BoundaryType::Unbounded
        || frame.end_type != WindowFrame::BoundaryType::Unbounded)
    {
        LOG_ERROR(
            getLogger("WindowFunctionNtile"),
            "Window frame for function 'ntile' should be 'ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING'");
        return false;
    }

    return true;
}
std::optional<WindowFrame> getDefaultFrame() const override
{
WindowFrame frame;
@ -2106,7 +2174,6 @@ namespace
{
if (!buckets) [[unlikely]]
{
checkWindowFrameType(transform);
const auto & current_block = transform->blockAt(transform->current_row);
const auto & workspace = transform->workspaces[function_index];
const auto & arg_col = *current_block.original_input_columns[workspace.argument_column_indices[0]];
@ -2128,7 +2195,7 @@ namespace
}
}
// new partition
if (transform->current_row_number == 1) [[unlikely]]
if (WindowFunctionHelpers::checkPartitionEnterFirstRow(transform)) [[unlikely]]
{
current_partition_rows = 0;
current_partition_inserted_row = 0;
@ -2137,25 +2204,9 @@ namespace
current_partition_rows++;
// Only do the action when we meet the last row in this partition.
if (!transform->partition_ended)
if (!WindowFunctionHelpers::checkPartitionEnterLastRow(transform))
return;
else
{
auto current_row = transform->current_row;
current_row.row++;
const auto & end_row = transform->partition_end;
if (current_row != end_row)
{
if (current_row.row < transform->blockRowsNumber(current_row))
return;
if (end_row.block != current_row.block + 1 || end_row.row)
{
return;
}
// else, current_row is the last input row.
}
}
auto bucket_capacity = current_partition_rows / buckets;
auto capacity_diff = current_partition_rows - bucket_capacity * buckets;
@ -2193,23 +2244,115 @@ namespace
bucket_num += 1;
}
}
}
void NtileState::checkWindowFrameType(const WindowTransform * transform)
namespace
{
/// Per-workspace state of the `percent_rank` window function.
struct PercentRankState
{
    /// First row of the current partition whose output value has not been
    /// rescaled yet. Set when a partition starts and advanced while the
    /// stored ranks are rewritten.
    RowNumber start_row;

    /// Number of rows of the current partition processed so far.
    UInt64 current_partition_rows{0};
};
}
/// Window function `percent_rank`: (rank - 1) / (rows in partition - 1), as Float64.
///
/// The number of rows in a partition is only known when its last row arrives, so the
/// computation is done in two steps:
///  1. for every row the rank (`peer_group_start_row_number`) is appended to the
///     output column as a placeholder;
///  2. when the last row of the partition is reached, all placeholders of that
///     partition are rewritten in place with the final value.
/// A partition with a single row yields 0.
struct WindowFunctionPercentRank final : public StatefulWindowFunction<PercentRankState>
{
public:
    WindowFunctionPercentRank(const std::string & name_,
            const DataTypes & argument_types_, const Array & parameters_)
        : StatefulWindowFunction(name_, argument_types_, parameters_, std::make_shared<DataTypeFloat64>())
    {}

    bool allocatesMemoryInArena() const override { return false; }

    /// Only 'RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT' is accepted.
    /// Logs the reason and returns false for any other frame.
    bool checkWindowFrameType(const WindowTransform * transform) const override
    {
        if (transform->window_description.frame.type != WindowFrame::FrameType::RANGE
            || transform->window_description.frame.begin_type != WindowFrame::BoundaryType::Unbounded
            || transform->window_description.frame.end_type != WindowFrame::BoundaryType::Current)
        {
            LOG_ERROR(
                getLogger("WindowFunctionPercentRank"),
                "Window frame for function 'percent_rank' should be 'RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT'");
            return false;
        }
        return true;
    }

    /// The frame this function requires: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT.
    std::optional<WindowFrame> getDefaultFrame() const override
    {
        WindowFrame frame;
        frame.type = WindowFrame::FrameType::RANGE;
        frame.begin_type = WindowFrame::BoundaryType::Unbounded;
        frame.end_type = WindowFrame::BoundaryType::Current;
        return frame;
    }

    /// Appends the rank of the current row to the output column and, once the last
    /// row of the partition has been seen, converts every stored rank of that
    /// partition into its percent rank.
    void windowInsertResultInto(const WindowTransform * transform, size_t function_index) const override
    {
        auto & state = getWorkspaceState(transform, function_index);

        /// A new partition begins: reset the counter and remember where it starts.
        if (WindowFunctionHelpers::checkPartitionEnterFirstRow(transform))
        {
            state.current_partition_rows = 0;
            state.start_row = transform->current_row;
        }

        insertRankIntoColumn(transform, function_index);
        state.current_partition_rows++;

        /// Nothing more to do until the size of the partition is known.
        if (!WindowFunctionHelpers::checkPartitionEnterLastRow(transform))
        {
            return;
        }

        UInt64 remaining_rows = state.current_partition_rows;
        /// A one-row partition would divide by zero; use 1 so that the result is 0.
        Float64 percent_rank_denominator = remaining_rows == 1 ? 1 : remaining_rows - 1;

        /// Walk over all blocks touched by the partition, starting at its first row.
        while (remaining_rows > 0)
        {
            auto block_rows_number = transform->blockRowsNumber(state.start_row);
            auto available_block_rows = block_rows_number - state.start_row.row;
            if (available_block_rows <= remaining_rows)
            {
                /// The partition covers the rest of this block. Finish the block and
                /// move on to the next one.
                auto & to_column = *transform->blockAt(state.start_row).output_columns[function_index];
                auto & data = assert_cast<ColumnFloat64 &>(to_column).getData();
                for (size_t i = state.start_row.row; i < block_rows_number; ++i)
                    data[i] = (data[i] - 1) / percent_rank_denominator;

                state.start_row.block++;
                state.start_row.row = 0;
                remaining_rows -= available_block_rows;
            }
            else
            {
                /// The partition ends inside the current block.
                auto & to_column = *transform->blockAt(state.start_row).output_columns[function_index];
                auto & data = assert_cast<ColumnFloat64 &>(to_column).getData();
                for (size_t i = state.start_row.row, n = state.start_row.row + remaining_rows; i < n; ++i)
                {
                    data[i] = (data[i] - 1) / percent_rank_denominator;
                }
                state.start_row.row += remaining_rows;
                remaining_rows = 0;
            }
        }
    }

    /// Returns the mutable state stored in the workspace of this function.
    inline PercentRankState & getWorkspaceState(const WindowTransform * transform, size_t function_index) const
    {
        const auto & workspace = transform->workspaces[function_index];
        return getState(workspace);
    }

    /// Appends the rank of the current row (row number at which its peer group
    /// starts) to the output column of the current block.
    inline void insertRankIntoColumn(const WindowTransform * transform, size_t function_index) const
    {
        auto & to_column = *transform->blockAt(transform->current_row).output_columns[function_index];
        assert_cast<ColumnFloat64 &>(to_column).getData().push_back(static_cast<Float64>(transform->peer_group_start_row_number));
    }
};
// ClickHouse-specific variant of lag/lead that respects the window frame.
template <bool is_lead>
@ -2582,6 +2725,13 @@ void registerWindowFunctions(AggregateFunctionFactory & factory)
parameters);
}, properties}, AggregateFunctionFactory::CaseInsensitive);
factory.registerFunction("percent_rank", {[](const std::string & name,
const DataTypes & argument_types, const Array & parameters, const Settings *)
{
return std::make_shared<WindowFunctionPercentRank>(name, argument_types,
parameters);
}, properties}, AggregateFunctionFactory::CaseInsensitive);
factory.registerFunction("row_number", {[](const std::string & name,
const DataTypes & argument_types, const Array & parameters, const Settings *)
{

View File

@ -79,3 +79,16 @@ iPhone 900 Smartphone 500 500
Kindle Fire 150 Tablet 150 350
Samsung Galaxy Tab 200 Tablet 175 350
iPad 700 Tablet 350 350
---- Q8 ----
Lenovo Thinkpad Laptop 700 1 0
Sony VAIO Laptop 700 1 0
Dell Vostro Laptop 800 3 0.6666666666666666
HP Elite Laptop 1200 4 1
Microsoft Lumia Smartphone 200 1 0
HTC One Smartphone 400 2 0.3333333333333333
Nexus Smartphone 500 3 0.6666666666666666
iPhone Smartphone 900 4 1
Kindle Fire Tablet 150 1 0
Samsung Galaxy Tab Tablet 200 2 0.5
iPad Tablet 700 3 1
Others Unknow 200 1 0

View File

@ -101,5 +101,26 @@ SELECT
FROM products INNER JOIN product_groups USING (group_id)) t
order by group_name, product_name, price;
-- Q8: percent_rank() next to rank() for every product group.
select '---- Q8 ----';
-- Extra group with exactly one product, so that a single-row partition is covered
-- (the reference output expects percent_rank = 0 for it).
INSERT INTO product_groups VALUES (4, 'Unknow');
INSERT INTO products (product_id,product_name, group_id,price) VALUES (12, 'Others', 4, 200);
-- The outer query only fixes the order of the result rows.
SELECT *
FROM
(
SELECT
product_name,
group_name,
price,
rank() OVER (PARTITION BY group_name ORDER BY price ASC) AS rank,
percent_rank() OVER (PARTITION BY group_name ORDER BY price ASC) AS percent
FROM products
INNER JOIN product_groups USING (group_id)
) AS t
ORDER BY
group_name ASC,
price ASC,
product_name ASC;
drop table product_groups;
drop table products;