mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-22 09:40:49 +00:00
some optimizations
This commit is contained in:
parent
5bd9c8b122
commit
695e3a797a
@ -12,6 +12,105 @@ namespace ErrorCodes
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
}
|
||||
|
||||
// Compares ORDER BY column values at given rows to find the boundaries of frame:
|
||||
// [compared] with [reference] +/- offset. Return value is -1/0/+1, like in
|
||||
// sorting predicates -- -1 means [compared] is less than [reference] +/- offset.
|
||||
template <typename ColumnType>
|
||||
static int compareValuesWithOffset(const IColumn * _compared_column,
|
||||
size_t compared_row, const IColumn * _reference_column,
|
||||
size_t reference_row,
|
||||
uint64_t _offset,
|
||||
bool offset_is_preceding)
|
||||
{
|
||||
// Casting the columns to the known type here makes it faster, probably
|
||||
// because the getData call can be devirtualized.
|
||||
const auto * compared_column = assert_cast<const ColumnType *>(
|
||||
_compared_column);
|
||||
const auto * reference_column = assert_cast<const ColumnType *>(
|
||||
_reference_column);
|
||||
const auto offset = static_cast<typename ColumnType::ValueType>(_offset);
|
||||
|
||||
const auto compared_value_data = compared_column->getDataAt(compared_row);
|
||||
assert(compared_value_data.size == sizeof(typename ColumnType::ValueType));
|
||||
auto compared_value = unalignedLoad<typename ColumnType::ValueType>(
|
||||
compared_value_data.data);
|
||||
|
||||
const auto reference_value_data = reference_column->getDataAt(reference_row);
|
||||
assert(reference_value_data.size == sizeof(typename ColumnType::ValueType));
|
||||
auto reference_value = unalignedLoad<typename ColumnType::ValueType>(
|
||||
reference_value_data.data);
|
||||
|
||||
bool is_overflow;
|
||||
bool overflow_to_negative;
|
||||
if (offset_is_preceding)
|
||||
{
|
||||
is_overflow = __builtin_sub_overflow(reference_value, offset,
|
||||
&reference_value);
|
||||
overflow_to_negative = offset > 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
is_overflow = __builtin_add_overflow(reference_value, offset,
|
||||
&reference_value);
|
||||
overflow_to_negative = offset < 0;
|
||||
}
|
||||
|
||||
// fmt::print(stderr,
|
||||
// "compared [{}] = {}, ref [{}] = {}, offset {} preceding {} overflow {} to negative {}\n",
|
||||
// compared_row, toString(compared_value),
|
||||
// reference_row, toString(reference_value),
|
||||
// toString(offset), offset_is_preceding,
|
||||
// is_overflow, overflow_to_negative);
|
||||
|
||||
if (is_overflow)
|
||||
{
|
||||
if (overflow_to_negative)
|
||||
{
|
||||
// Overflow to the negative, [compared] must be greater.
|
||||
return 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Overflow to the positive, [compared] must be less.
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// No overflow, compare normally.
|
||||
return compared_value < reference_value ? -1
|
||||
: compared_value == reference_value ? 0 : 1;
|
||||
}
|
||||
}
|
||||
|
||||
// Helper macros to dispatch on type of the ORDER BY column
|
||||
#define APPLY_FOR_ONE_TYPE(FUNCTION, TYPE) \
|
||||
else if (typeid_cast<const TYPE *>(column)) \
|
||||
{ \
|
||||
/* clang-tidy you're dumb, I can't put FUNCTION in braces here. */ \
|
||||
compare_values_with_offset = FUNCTION<TYPE>; /* NOLINT */ \
|
||||
}
|
||||
|
||||
#define APPLY_FOR_TYPES(FUNCTION) \
|
||||
if (false) /* NOLINT */ \
|
||||
{ \
|
||||
/* Do nothing, a starter condition. */ \
|
||||
} \
|
||||
APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<Int8>) \
|
||||
APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<UInt8>) \
|
||||
APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<Int16>) \
|
||||
APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<UInt16>) \
|
||||
APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<Int32>) \
|
||||
APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<UInt32>) \
|
||||
APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<Int64>) \
|
||||
APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<UInt64>) \
|
||||
else \
|
||||
{ \
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, \
|
||||
"The RANGE OFFSET frame for '{}' ORDER BY column is not implemented", \
|
||||
demangle(typeid(*column).name())); \
|
||||
}
|
||||
|
||||
WindowTransform::WindowTransform(const Block & input_header_,
|
||||
const Block & output_header_,
|
||||
const WindowDescription & window_description_,
|
||||
@ -63,6 +162,20 @@ WindowTransform::WindowTransform(const Block & input_header_,
|
||||
order_by_indices.push_back(
|
||||
input_header.getPositionByName(column.column_name));
|
||||
}
|
||||
|
||||
// Choose a row comparison function for RANGE OFFSET frame based on the
|
||||
// type of the ORDER BY column.
|
||||
if (window_description.frame.type == WindowFrame::FrameType::Range
|
||||
&& (window_description.frame.begin_type
|
||||
== WindowFrame::BoundaryType::Offset
|
||||
|| window_description.frame.end_type
|
||||
== WindowFrame::BoundaryType::Offset))
|
||||
{
|
||||
assert(order_by_indices.size() == 1);
|
||||
const IColumn * column = input_header.getByPosition(
|
||||
order_by_indices[0]).column.get();
|
||||
APPLY_FOR_TYPES(compareValuesWithOffset)
|
||||
}
|
||||
}
|
||||
|
||||
WindowTransform::~WindowTransform()
|
||||
@ -290,85 +403,22 @@ void WindowTransform::advanceFrameStartRowsOffset()
|
||||
assert(offset_left >= 0);
|
||||
}
|
||||
|
||||
// Compares ORDER BY column values at given rows to find the boundaries of frame:
|
||||
// [compared] with [reference] +/- offset. Return value is -1/0/+1, like in
|
||||
// sorting predicates -- -1 means [compared] is less than [reference] +/- offset.
|
||||
template <typename ColumnType>
|
||||
static int compareValuesWithOffset(const ColumnType * compared_column,
|
||||
size_t compared_row, const ColumnType * reference_column,
|
||||
size_t reference_row,
|
||||
typename ColumnType::ValueType offset,
|
||||
bool offset_is_preceding)
|
||||
{
|
||||
const auto compared_value_data = compared_column->getDataAt(compared_row);
|
||||
assert(compared_value_data.size == sizeof(typename ColumnType::ValueType));
|
||||
auto compared_value = unalignedLoad<typename ColumnType::ValueType>(
|
||||
compared_value_data.data);
|
||||
|
||||
const auto reference_value_data = reference_column->getDataAt(reference_row);
|
||||
assert(reference_value_data.size == sizeof(typename ColumnType::ValueType));
|
||||
auto reference_value = unalignedLoad<typename ColumnType::ValueType>(
|
||||
reference_value_data.data);
|
||||
|
||||
bool is_overflow;
|
||||
bool overflow_to_negative;
|
||||
if (offset_is_preceding)
|
||||
{
|
||||
is_overflow = __builtin_sub_overflow(reference_value, offset,
|
||||
&reference_value);
|
||||
overflow_to_negative = offset > 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
is_overflow = __builtin_add_overflow(reference_value, offset,
|
||||
&reference_value);
|
||||
overflow_to_negative = offset < 0;
|
||||
}
|
||||
|
||||
// fmt::print(stderr,
|
||||
// "compared [{}] = {}, ref [{}] = {}, offset {} preceding {} overflow {} to negative {}\n",
|
||||
// compared_row, toString(compared_value),
|
||||
// reference_row, toString(reference_value),
|
||||
// toString(offset), offset_is_preceding,
|
||||
// is_overflow, overflow_to_negative);
|
||||
|
||||
if (is_overflow)
|
||||
{
|
||||
if (overflow_to_negative)
|
||||
{
|
||||
// Overflow to the negative, [compared] must be greater.
|
||||
return 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Overflow to the positive, [compared] must be less.
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// No overflow, compare normally.
|
||||
return compared_value < reference_value ? -1
|
||||
: compared_value == reference_value ? 0 : 1;
|
||||
}
|
||||
}
|
||||
|
||||
template <typename ColumnType>
|
||||
void WindowTransform::advanceFrameStartRangeOffset()
|
||||
{
|
||||
// See the comment for advanceFrameEndRangeOffset().
|
||||
const int direction = window_description.order_by[0].direction;
|
||||
const bool preceding = window_description.frame.begin_preceding
|
||||
== (direction > 0);
|
||||
const auto * reference_column = assert_cast<const ColumnType *>(
|
||||
inputAt(current_row)[order_by_indices[0]].get());
|
||||
const auto * reference_column
|
||||
= inputAt(current_row)[order_by_indices[0]].get();
|
||||
for (; frame_start < partition_end; advanceRowNumber(frame_start))
|
||||
{
|
||||
// The first frame value is [current_row] with offset, so we advance
|
||||
// while [frames_start] < [current_row] with offset.
|
||||
const auto * compared_column = assert_cast<const ColumnType *>(
|
||||
inputAt(frame_start)[order_by_indices[0]].get());
|
||||
if (compareValuesWithOffset(compared_column, frame_start.row,
|
||||
const auto * compared_column
|
||||
= inputAt(frame_start)[order_by_indices[0]].get();
|
||||
if (compare_values_with_offset(compared_column, frame_start.row,
|
||||
reference_column, current_row.row,
|
||||
window_description.frame.begin_offset,
|
||||
preceding)
|
||||
@ -382,43 +432,6 @@ void WindowTransform::advanceFrameStartRangeOffset()
|
||||
frame_started = partition_ended;
|
||||
}
|
||||
|
||||
// Helper macros to dispatch on type of the ORDER BY column
|
||||
#define APPLY_FOR_ONE_TYPE(FUNCTION, TYPE) \
|
||||
else if (typeid_cast<const TYPE *>(column)) \
|
||||
{ \
|
||||
/* clang-tidy you're dumb, I can't put FUNCTION in braces here. */ \
|
||||
FUNCTION<TYPE>(); /* NOLINT */ \
|
||||
}
|
||||
|
||||
#define APPLY_FOR_TYPES(FUNCTION) \
|
||||
if (false) /* NOLINT */ \
|
||||
{ \
|
||||
/* Do nothing, a starter condition. */ \
|
||||
} \
|
||||
APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<Int8>) \
|
||||
APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<UInt8>) \
|
||||
APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<Int16>) \
|
||||
APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<UInt16>) \
|
||||
APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<Int32>) \
|
||||
APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<UInt32>) \
|
||||
APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<Int64>) \
|
||||
APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector<UInt64>) \
|
||||
else \
|
||||
{ \
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, \
|
||||
"The RANGE OFFSET frame for '{}' ORDER BY column is not implemented", \
|
||||
demangle(typeid(*column).name())); \
|
||||
}
|
||||
|
||||
void WindowTransform::advanceFrameStartRangeOffsetDispatch()
|
||||
{
|
||||
// Dispatch on the type of the ORDER BY column.
|
||||
assert(order_by_indices.size() == 1);
|
||||
const IColumn * column = inputAt(current_row)[order_by_indices[0]].get();
|
||||
|
||||
APPLY_FOR_TYPES(advanceFrameStartRangeOffset)
|
||||
}
|
||||
|
||||
void WindowTransform::advanceFrameStart()
|
||||
{
|
||||
if (frame_started)
|
||||
@ -451,7 +464,7 @@ void WindowTransform::advanceFrameStart()
|
||||
advanceFrameStartRowsOffset();
|
||||
break;
|
||||
case WindowFrame::FrameType::Range:
|
||||
advanceFrameStartRangeOffsetDispatch();
|
||||
advanceFrameStartRangeOffset();
|
||||
break;
|
||||
default:
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
|
||||
@ -631,7 +644,6 @@ void WindowTransform::advanceFrameEndRowsOffset()
|
||||
assert(offset_left >= 0);
|
||||
}
|
||||
|
||||
template <typename ColumnType>
|
||||
void WindowTransform::advanceFrameEndRangeOffset()
|
||||
{
|
||||
// PRECEDING/FOLLOWING change direction for DESC order.
|
||||
@ -639,16 +651,16 @@ void WindowTransform::advanceFrameEndRangeOffset()
|
||||
const int direction = window_description.order_by[0].direction;
|
||||
const bool preceding = window_description.frame.end_preceding
|
||||
== (direction > 0);
|
||||
const auto * reference_column = assert_cast<const ColumnType *>(
|
||||
inputAt(current_row)[order_by_indices[0]].get());
|
||||
const auto * reference_column
|
||||
= inputAt(current_row)[order_by_indices[0]].get();
|
||||
for (; frame_end < partition_end; advanceRowNumber(frame_end))
|
||||
{
|
||||
// The last frame value is current_row with offset, and we need a
|
||||
// past-the-end pointer, so we advance while
|
||||
// [frame_end] <= [current_row] with offset.
|
||||
const auto * compared_column = assert_cast<const ColumnType *>(
|
||||
inputAt(frame_end)[order_by_indices[0]].get());
|
||||
if (compareValuesWithOffset(compared_column, frame_end.row,
|
||||
const auto * compared_column
|
||||
= inputAt(frame_end)[order_by_indices[0]].get();
|
||||
if (compare_values_with_offset(compared_column, frame_end.row,
|
||||
reference_column, current_row.row,
|
||||
window_description.frame.end_offset,
|
||||
preceding)
|
||||
@ -662,15 +674,6 @@ void WindowTransform::advanceFrameEndRangeOffset()
|
||||
frame_ended = partition_ended;
|
||||
}
|
||||
|
||||
void WindowTransform::advanceFrameEndRangeOffsetDispatch()
|
||||
{
|
||||
// Dispatch on the type of the ORDER BY column.
|
||||
assert(order_by_indices.size() == 1);
|
||||
const IColumn * column = inputAt(current_row)[order_by_indices[0]].get();
|
||||
|
||||
APPLY_FOR_TYPES(advanceFrameEndRangeOffset)
|
||||
}
|
||||
|
||||
void WindowTransform::advanceFrameEnd()
|
||||
{
|
||||
// No reason for this function to be called again after it succeeded.
|
||||
@ -693,7 +696,7 @@ void WindowTransform::advanceFrameEnd()
|
||||
advanceFrameEndRowsOffset();
|
||||
break;
|
||||
case WindowFrame::FrameType::Range:
|
||||
advanceFrameEndRangeOffsetDispatch();
|
||||
advanceFrameEndRangeOffset();
|
||||
break;
|
||||
default:
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
|
||||
@ -763,24 +766,43 @@ void WindowTransform::updateAggregationState()
|
||||
a->create(buf);
|
||||
}
|
||||
|
||||
for (auto row = rows_to_add_start; row < rows_to_add_end;
|
||||
advanceRowNumber(row))
|
||||
{
|
||||
if (row.block != ws.cached_block_number)
|
||||
{
|
||||
const auto & block
|
||||
= blocks[row.block - first_block_number];
|
||||
ws.argument_columns.clear();
|
||||
for (const auto i : ws.argument_column_indices)
|
||||
{
|
||||
ws.argument_columns.push_back(block.input_columns[i].get());
|
||||
}
|
||||
ws.cached_block_number = row.block;
|
||||
}
|
||||
// To achieve better performance, we will have to loop over blocks and
|
||||
// rows manually, instead of using advanceRowNumber().
|
||||
// For this purpose, the past-the-end block can be different than the
|
||||
// block of the past-the-end row (it's usually the next block).
|
||||
const auto past_the_end_block = rows_to_add_end.row == 0
|
||||
? rows_to_add_end.block
|
||||
: rows_to_add_end.block + 1;
|
||||
|
||||
// fmt::print(stderr, "(2) add row {}\n", row);
|
||||
for (auto block_number = rows_to_add_start.block;
|
||||
block_number < past_the_end_block;
|
||||
++block_number)
|
||||
{
|
||||
auto & block = blockAt(block_number);
|
||||
|
||||
ws.argument_columns.clear();
|
||||
for (const auto i : ws.argument_column_indices)
|
||||
{
|
||||
ws.argument_columns.push_back(block.input_columns[i].get());
|
||||
}
|
||||
ws.cached_block_number = block_number;
|
||||
|
||||
// First and last blocks may be processed partially, and other blocks
|
||||
// are processed in full.
|
||||
const auto first_row = block_number == rows_to_add_start.block
|
||||
? rows_to_add_start.row : 0;
|
||||
const auto past_the_end_row = block_number == rows_to_add_end.block
|
||||
? rows_to_add_end.row : block.rows;
|
||||
|
||||
// We should add an addBatch analog that can accept a starting offset.
|
||||
// For now, add the values one by one.
|
||||
auto * columns = ws.argument_columns.data();
|
||||
a->add(buf, columns, row.row, arena.get());
|
||||
// Removing arena.get() from the loop makes it faster somehow...
|
||||
auto * _arena = arena.get();
|
||||
for (auto row = first_row; row < past_the_end_row; ++row)
|
||||
{
|
||||
a->add(buf, columns, row, _arena);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -793,6 +815,7 @@ void WindowTransform::writeOutCurrentRow()
|
||||
assert(current_row < partition_end);
|
||||
assert(current_row.block >= first_block_number);
|
||||
|
||||
const auto & block = blockAt(current_row);
|
||||
for (size_t wi = 0; wi < workspaces.size(); ++wi)
|
||||
{
|
||||
auto & ws = workspaces[wi];
|
||||
@ -800,7 +823,7 @@ void WindowTransform::writeOutCurrentRow()
|
||||
const auto * a = f.aggregate_function.get();
|
||||
auto * buf = ws.aggregate_function_state.data();
|
||||
|
||||
IColumn * result_column = outputAt(current_row)[wi].get();
|
||||
IColumn * result_column = block.output_columns[wi].get();
|
||||
// FIXME does it also allocate the result on the arena?
|
||||
// We'll have to pass it out with blocks then...
|
||||
a->insertResultInto(buf, *result_column, arena.get());
|
||||
|
@ -108,8 +108,6 @@ private:
|
||||
bool arePeers(const RowNumber & x, const RowNumber & y) const;
|
||||
|
||||
void advanceFrameStartRowsOffset();
|
||||
void advanceFrameStartRangeOffsetDispatch();
|
||||
template <typename ColumnType>
|
||||
void advanceFrameStartRangeOffset();
|
||||
void advanceFrameStart();
|
||||
|
||||
@ -117,8 +115,6 @@ private:
|
||||
void advanceFrameEndCurrentRow();
|
||||
void advanceFrameEndUnbounded();
|
||||
void advanceFrameEnd();
|
||||
void advanceFrameEndRangeOffsetDispatch();
|
||||
template <typename ColumnType>
|
||||
void advanceFrameEndRangeOffset();
|
||||
|
||||
void updateAggregationState();
|
||||
@ -134,13 +130,19 @@ private:
|
||||
const Columns & inputAt(const RowNumber & x) const
|
||||
{ return const_cast<WindowTransform *>(this)->inputAt(x); }
|
||||
|
||||
auto & blockAt(const RowNumber & x)
|
||||
auto & blockAt(const uint64_t block_number)
|
||||
{
|
||||
assert(x.block >= first_block_number);
|
||||
assert(x.block - first_block_number < blocks.size());
|
||||
return blocks[x.block - first_block_number];
|
||||
assert(block_number >= first_block_number);
|
||||
assert(block_number - first_block_number < blocks.size());
|
||||
return blocks[block_number - first_block_number];
|
||||
}
|
||||
|
||||
const auto & blockAt(const uint64_t block_number) const
|
||||
{ return const_cast<WindowTransform *>(this)->blockAt(block_number); }
|
||||
|
||||
auto & blockAt(const RowNumber & x)
|
||||
{ return blockAt(x.block); }
|
||||
|
||||
const auto & blockAt(const RowNumber & x) const
|
||||
{ return const_cast<WindowTransform *>(this)->blockAt(x); }
|
||||
|
||||
@ -299,6 +301,18 @@ public:
|
||||
// state after we find the new frame.
|
||||
RowNumber prev_frame_start;
|
||||
RowNumber prev_frame_end;
|
||||
|
||||
// Comparison function for RANGE OFFSET frames. We choose the appropriate
|
||||
// overload once, based on the type of the ORDER BY column. Choosing it for
|
||||
// each row would be slow.
|
||||
int (* compare_values_with_offset) (
|
||||
const IColumn * compared_column, size_t compared_row,
|
||||
const IColumn * reference_column, size_t reference_row,
|
||||
// We can make it a Field later if we need the Decimals. Now we only
|
||||
// have ints and datetime, and the underlying Field type for them is
|
||||
// uint64_t anyway.
|
||||
uint64_t offset,
|
||||
bool offset_is_preceding);
|
||||
};
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user