diff --git a/src/Planner/PlannerActionsVisitor.cpp b/src/Planner/PlannerActionsVisitor.cpp index 99b9c3f7482..f6c2c92cbb4 100644 --- a/src/Planner/PlannerActionsVisitor.cpp +++ b/src/Planner/PlannerActionsVisitor.cpp @@ -1,5 +1,6 @@ #include +#include #include #include #include @@ -242,6 +243,11 @@ public: auto & window_frame = window_node.getWindowFrame(); if (!window_frame.is_default) return window_frame; + auto aggregate_function = function_node.getAggregateFunction(); + if (const auto * win_func = dynamic_cast(aggregate_function.get())) + { + return win_func->getDefaultFrame(); + } return {}; }; buffer << " OVER ("; diff --git a/src/Planner/PlannerWindowFunctions.cpp b/src/Planner/PlannerWindowFunctions.cpp index 7d0fc3a85b3..2a28787ba96 100644 --- a/src/Planner/PlannerWindowFunctions.cpp +++ b/src/Planner/PlannerWindowFunctions.cpp @@ -24,7 +24,6 @@ extern const int NOT_IMPLEMENTED; namespace { -//WindowDescription extractWindowDescriptionFromWindowNode(const QueryTreeNodePtr & node, const PlannerContext & planner_context) WindowDescription extractWindowDescriptionFromWindowNode(const FunctionNode & func_node, const PlannerContext & planner_context) { auto node = func_node.getWindowNode(); diff --git a/src/Processors/Transforms/WindowTransform.cpp b/src/Processors/Transforms/WindowTransform.cpp index a003c9a8e56..c26cd7cc8c3 100644 --- a/src/Processors/Transforms/WindowTransform.cpp +++ b/src/Processors/Transforms/WindowTransform.cpp @@ -6,12 +6,9 @@ #include #include #include -#include #include #include -#include #include -#include #include #include #include @@ -19,6 +16,9 @@ #include #include #include +#include +#include +#include #include #include @@ -57,31 +57,30 @@ struct Settings; namespace ErrorCodes { -extern const int BAD_ARGUMENTS; -extern const int NOT_IMPLEMENTED; -extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; -extern const int ILLEGAL_TYPE_OF_ARGUMENT; -extern const int TOO_FEW_ARGUMENTS_FOR_FUNCTION; -extern const int TOO_MANY_ARGUMENTS_FOR_FUNCTION; + extern const int BAD_ARGUMENTS; + extern const int NOT_IMPLEMENTED; + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; + extern const int ILLEGAL_TYPE_OF_ARGUMENT; + extern const int TOO_FEW_ARGUMENTS_FOR_FUNCTION; + extern const int TOO_MANY_ARGUMENTS_FOR_FUNCTION; } - // Compares ORDER BY column values at given rows to find the boundaries of frame: // [compared] with [reference] +/- offset. Return value is -1/0/+1, like in // sorting predicates -- -1 means [compared] is less than [reference] +/- offset. template -static int compareValuesWithOffset( - const IColumn * _compared_column, - size_t compared_row, - const IColumn * _reference_column, +static int compareValuesWithOffset(const IColumn * _compared_column, + size_t compared_row, const IColumn * _reference_column, size_t reference_row, const Field & _offset, bool offset_is_preceding) { // Casting the columns to the known type here makes it faster, probably // because the getData call can be devirtualized. - const auto * compared_column = assert_cast(_compared_column); - const auto * reference_column = assert_cast(_reference_column); + const auto * compared_column = assert_cast( + _compared_column); + const auto * reference_column = assert_cast( + _reference_column); using ValueType = typename ColumnType::ValueType; // Note that the storage type of offset returned by get<> is different, so @@ -91,11 +90,13 @@ static int compareValuesWithOffset( const auto compared_value_data = compared_column->getDataAt(compared_row); assert(compared_value_data.size == sizeof(ValueType)); - auto compared_value = unalignedLoad(compared_value_data.data); + auto compared_value = unalignedLoad( + compared_value_data.data); const auto reference_value_data = reference_column->getDataAt(reference_row); assert(reference_value_data.size == sizeof(ValueType)); - auto reference_value = unalignedLoad(reference_value_data.data); + auto reference_value = unalignedLoad( + reference_value_data.data); bool is_overflow; if (offset_is_preceding) @@ -120,34 +121,37 @@ static int compareValuesWithOffset( else { // No overflow, compare normally. - return compared_value < reference_value ? -1 : compared_value == reference_value ? 0 : 1; + return compared_value < reference_value ? -1 + : compared_value == reference_value ? 0 : 1; } } // A specialization of compareValuesWithOffset for floats. template -static int compareValuesWithOffsetFloat( - const IColumn * _compared_column, - size_t compared_row, - const IColumn * _reference_column, +static int compareValuesWithOffsetFloat(const IColumn * _compared_column, + size_t compared_row, const IColumn * _reference_column, size_t reference_row, const Field & _offset, bool offset_is_preceding) { // Casting the columns to the known type here makes it faster, probably // because the getData call can be devirtualized. - const auto * compared_column = assert_cast(_compared_column); - const auto * reference_column = assert_cast(_reference_column); + const auto * compared_column = assert_cast( + _compared_column); + const auto * reference_column = assert_cast( + _reference_column); const auto offset = _offset.get(); chassert(offset >= 0); const auto compared_value_data = compared_column->getDataAt(compared_row); assert(compared_value_data.size == sizeof(typename ColumnType::ValueType)); - auto compared_value = unalignedLoad(compared_value_data.data); + auto compared_value = unalignedLoad( + compared_value_data.data); const auto reference_value_data = reference_column->getDataAt(reference_row); assert(reference_value_data.size == sizeof(typename ColumnType::ValueType)); - auto reference_value = unalignedLoad(reference_value_data.data); + auto reference_value = unalignedLoad( + reference_value_data.data); /// Floats overflow to Inf and the comparison will work normally, so we don't have to do anything. if (offset_is_preceding) @@ -155,58 +159,58 @@ static int compareValuesWithOffsetFloat( else reference_value += static_cast(offset); - const auto result = compared_value < reference_value ? -1 : (compared_value == reference_value ? 0 : 1); + const auto result = compared_value < reference_value ? -1 + : (compared_value == reference_value ? 0 : 1); return result; } // Helper macros to dispatch on type of the ORDER BY column #define APPLY_FOR_ONE_NEST_TYPE(FUNCTION, TYPE) \ - else if (typeid_cast(nest_compared_column.get())) \ - { \ - /* clang-tidy you're dumb, I can't put FUNCTION in braces here. */ \ - nest_compare_function = FUNCTION; /* NOLINT */ \ - } +else if (typeid_cast(nest_compared_column.get())) \ +{ \ + /* clang-tidy you're dumb, I can't put FUNCTION in braces here. */ \ + nest_compare_function = FUNCTION; /* NOLINT */ \ +} #define APPLY_FOR_NEST_TYPES(FUNCTION) \ - if (false) /* NOLINT */ \ - { \ - /* Do nothing, a starter condition. */ \ - } \ - APPLY_FOR_ONE_NEST_TYPE(FUNCTION, ColumnVector) \ - APPLY_FOR_ONE_NEST_TYPE(FUNCTION, ColumnVector) \ - APPLY_FOR_ONE_NEST_TYPE(FUNCTION, ColumnVector) \ - APPLY_FOR_ONE_NEST_TYPE(FUNCTION, ColumnVector) \ +if (false) /* NOLINT */ \ +{ \ + /* Do nothing, a starter condition. */ \ +} \ +APPLY_FOR_ONE_NEST_TYPE(FUNCTION, ColumnVector) \ +APPLY_FOR_ONE_NEST_TYPE(FUNCTION, ColumnVector) \ +APPLY_FOR_ONE_NEST_TYPE(FUNCTION, ColumnVector) \ +APPLY_FOR_ONE_NEST_TYPE(FUNCTION, ColumnVector) \ \ - APPLY_FOR_ONE_NEST_TYPE(FUNCTION, ColumnVector) \ - APPLY_FOR_ONE_NEST_TYPE(FUNCTION, ColumnVector) \ - APPLY_FOR_ONE_NEST_TYPE(FUNCTION, ColumnVector) \ - APPLY_FOR_ONE_NEST_TYPE(FUNCTION, ColumnVector) \ - APPLY_FOR_ONE_NEST_TYPE(FUNCTION, ColumnVector) \ +APPLY_FOR_ONE_NEST_TYPE(FUNCTION, ColumnVector) \ +APPLY_FOR_ONE_NEST_TYPE(FUNCTION, ColumnVector) \ +APPLY_FOR_ONE_NEST_TYPE(FUNCTION, ColumnVector) \ +APPLY_FOR_ONE_NEST_TYPE(FUNCTION, ColumnVector) \ +APPLY_FOR_ONE_NEST_TYPE(FUNCTION, ColumnVector) \ \ - APPLY_FOR_ONE_NEST_TYPE(FUNCTION##Float, ColumnVector) \ - APPLY_FOR_ONE_NEST_TYPE(FUNCTION##Float, ColumnVector) \ +APPLY_FOR_ONE_NEST_TYPE(FUNCTION##Float, ColumnVector) \ +APPLY_FOR_ONE_NEST_TYPE(FUNCTION##Float, ColumnVector) \ \ - else \ - { \ - throw Exception( \ - ErrorCodes::NOT_IMPLEMENTED, \ - "The RANGE OFFSET frame for '{}' ORDER BY nest column is not implemented", \ - demangle(typeid(nest_compared_column).name())); \ - } +else \ +{ \ + throw Exception(ErrorCodes::NOT_IMPLEMENTED, \ + "The RANGE OFFSET frame for '{}' ORDER BY nest column is not implemented", \ + demangle(typeid(nest_compared_column).name())); \ +} // A specialization of compareValuesWithOffset for nullable. template -static int compareValuesWithOffsetNullable( - const IColumn * _compared_column, - size_t compared_row, - const IColumn * _reference_column, +static int compareValuesWithOffsetNullable(const IColumn * _compared_column, + size_t compared_row, const IColumn * _reference_column, size_t reference_row, const Field & _offset, bool offset_is_preceding) { - const auto * compared_column = assert_cast(_compared_column); - const auto * reference_column = assert_cast(_reference_column); + const auto * compared_column = assert_cast( + _compared_column); + const auto * reference_column = assert_cast( + _reference_column); if (compared_column->isNullAt(compared_row) && !reference_column->isNullAt(reference_row)) { @@ -225,59 +229,54 @@ static int compareValuesWithOffsetNullable( ColumnPtr nest_reference_column = reference_column->getNestedColumnPtr(); std::function - nest_compare_function; + bool offset_is_preceding)> nest_compare_function; APPLY_FOR_NEST_TYPES(compareValuesWithOffset) - return nest_compare_function( - nest_compared_column.get(), compared_row, nest_reference_column.get(), reference_row, _offset, offset_is_preceding); + return nest_compare_function(nest_compared_column.get(), compared_row, + nest_reference_column.get(), reference_row, _offset, offset_is_preceding); } // Helper macros to dispatch on type of the ORDER BY column #define APPLY_FOR_ONE_TYPE(FUNCTION, TYPE) \ - else if (typeid_cast(column)) \ - { \ - /* clang-tidy you're dumb, I can't put FUNCTION in braces here. */ \ - compare_values_with_offset = FUNCTION; /* NOLINT */ \ - } +else if (typeid_cast(column)) \ +{ \ + /* clang-tidy you're dumb, I can't put FUNCTION in braces here. */ \ + compare_values_with_offset = FUNCTION; /* NOLINT */ \ +} #define APPLY_FOR_TYPES(FUNCTION) \ - if (false) /* NOLINT */ \ - { \ - /* Do nothing, a starter condition. */ \ - } \ - APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector) \ - APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector) \ - APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector) \ - APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector) \ +if (false) /* NOLINT */ \ +{ \ + /* Do nothing, a starter condition. */ \ +} \ +APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector) \ +APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector) \ +APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector) \ +APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector) \ \ - APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector) \ - APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector) \ - APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector) \ - APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector) \ - APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector) \ +APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector) \ +APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector) \ +APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector) \ +APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector) \ +APPLY_FOR_ONE_TYPE(FUNCTION, ColumnVector) \ \ - APPLY_FOR_ONE_TYPE(FUNCTION##Float, ColumnVector) \ - APPLY_FOR_ONE_TYPE(FUNCTION##Float, ColumnVector) \ +APPLY_FOR_ONE_TYPE(FUNCTION##Float, ColumnVector) \ +APPLY_FOR_ONE_TYPE(FUNCTION##Float, ColumnVector) \ \ - APPLY_FOR_ONE_TYPE(FUNCTION##Nullable, ColumnNullable) \ - else \ - { \ - throw Exception( \ - ErrorCodes::NOT_IMPLEMENTED, \ - "The RANGE OFFSET frame for '{}' ORDER BY column is not implemented", \ - demangle(typeid(*column).name())); \ - } +APPLY_FOR_ONE_TYPE(FUNCTION##Nullable, ColumnNullable) \ +else \ +{ \ + throw Exception(ErrorCodes::NOT_IMPLEMENTED, \ + "The RANGE OFFSET frame for '{}' ORDER BY column is not implemented", \ + demangle(typeid(*column).name())); \ +} -WindowTransform::WindowTransform( - const Block & input_header_, - const Block & output_header_, - const WindowDescription & window_description_, - const std::vector & functions) +WindowTransform::WindowTransform(const Block & input_header_, + const Block & output_header_, + const WindowDescription & window_description_, + const std::vector & functions) : IProcessor({input_header_}, {output_header_}) , input(inputs.front()) , output(outputs.front()) @@ -308,7 +307,8 @@ WindowTransform::WindowTransform( workspace.argument_column_indices.reserve(f.argument_names.size()); for (const auto & argument_name : f.argument_names) { - workspace.argument_column_indices.push_back(input_header.getPositionByName(argument_name)); + workspace.argument_column_indices.push_back( + input_header.getPositionByName(argument_name)); } workspace.argument_columns.assign(f.argument_names.size(), nullptr); @@ -325,7 +325,9 @@ WindowTransform::WindowTransform( } workspace.is_aggregate_function_state = workspace.aggregate_function->isState(); - workspace.aggregate_function_state.reset(aggregate_function->sizeOfData(), aggregate_function->alignOfData()); + workspace.aggregate_function_state.reset( + aggregate_function->sizeOfData(), + aggregate_function->alignOfData()); aggregate_function->create(workspace.aggregate_function_state.data()); workspaces.push_back(std::move(workspace)); @@ -334,20 +336,24 @@ WindowTransform::WindowTransform( partition_by_indices.reserve(window_description.partition_by.size()); for (const auto & column : window_description.partition_by) { - partition_by_indices.push_back(input_header.getPositionByName(column.column_name)); + partition_by_indices.push_back( + input_header.getPositionByName(column.column_name)); } order_by_indices.reserve(window_description.order_by.size()); for (const auto & column : window_description.order_by) { - order_by_indices.push_back(input_header.getPositionByName(column.column_name)); + order_by_indices.push_back( + input_header.getPositionByName(column.column_name)); } // Choose a row comparison function for RANGE OFFSET frame based on the // type of the ORDER BY column. if (window_description.frame.type == WindowFrame::FrameType::RANGE - && (window_description.frame.begin_type == WindowFrame::BoundaryType::Offset - || window_description.frame.end_type == WindowFrame::BoundaryType::Offset)) + && (window_description.frame.begin_type + == WindowFrame::BoundaryType::Offset + || window_description.frame.end_type + == WindowFrame::BoundaryType::Offset)) { assert(order_by_indices.size() == 1); const auto & entry = input_header.getByPosition(order_by_indices[0]); @@ -357,26 +363,32 @@ WindowTransform::WindowTransform( // Convert the offsets to the ORDER BY column type. We can't just check // that the type matches, because e.g. the int literals are always // (U)Int64, but the column might be Int8 and so on. - if (window_description.frame.begin_type == WindowFrame::BoundaryType::Offset) + if (window_description.frame.begin_type + == WindowFrame::BoundaryType::Offset) { - window_description.frame.begin_offset = convertFieldToTypeOrThrow(window_description.frame.begin_offset, *entry.type); + window_description.frame.begin_offset = convertFieldToTypeOrThrow( + window_description.frame.begin_offset, + *entry.type); - if (applyVisitor(FieldVisitorAccurateLess{}, window_description.frame.begin_offset, Field(0))) + if (applyVisitor(FieldVisitorAccurateLess{}, + window_description.frame.begin_offset, Field(0))) { - throw Exception( - ErrorCodes::BAD_ARGUMENTS, + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Window frame start offset must be nonnegative, {} given", window_description.frame.begin_offset); } } - if (window_description.frame.end_type == WindowFrame::BoundaryType::Offset) + if (window_description.frame.end_type + == WindowFrame::BoundaryType::Offset) { - window_description.frame.end_offset = convertFieldToTypeOrThrow(window_description.frame.end_offset, *entry.type); + window_description.frame.end_offset = convertFieldToTypeOrThrow( + window_description.frame.end_offset, + *entry.type); - if (applyVisitor(FieldVisitorAccurateLess{}, window_description.frame.end_offset, Field(0))) + if (applyVisitor(FieldVisitorAccurateLess{}, + window_description.frame.end_offset, Field(0))) { - throw Exception( - ErrorCodes::BAD_ARGUMENTS, + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Window frame start offset must be nonnegative, {} given", window_description.frame.end_offset); } @@ -389,10 +401,11 @@ WindowTransform::WindowTransform( { if (!workspace.window_function_impl->checkWindowFrameType(this)) { - throw Exception( - ErrorCodes::BAD_ARGUMENTS, "Unsupported window frame type for function '{}'", workspace.aggregate_function->getName()); + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported window frame type for function '{}'", + workspace.aggregate_function->getName()); } } + } } @@ -401,7 +414,8 @@ WindowTransform::~WindowTransform() // Some states may be not created yet if the creation failed. for (auto & ws : workspaces) { - ws.aggregate_function->destroy(ws.aggregate_function_state.data()); + ws.aggregate_function->destroy( + ws.aggregate_function_state.data()); } } @@ -475,10 +489,14 @@ void WindowTransform::advancePartitionEnd() size_t i = 0; for (; i < partition_by_columns; ++i) { - const auto * reference_column = inputAt(prev_frame_start)[partition_by_indices[i]].get(); - const auto * compared_column = inputAt(partition_end)[partition_by_indices[i]].get(); + const auto * reference_column + = inputAt(prev_frame_start)[partition_by_indices[i]].get(); + const auto * compared_column + = inputAt(partition_end)[partition_by_indices[i]].get(); - if (compared_column->compareAt(partition_end.row, prev_frame_start.row, *reference_column, 1 /* nan_direction_hint */) != 0) + if (compared_column->compareAt(partition_end.row, + prev_frame_start.row, *reference_column, + 1 /* nan_direction_hint */) != 0) { break; } @@ -590,8 +608,9 @@ auto WindowTransform::moveRowNumber(const RowNumber & original_row_number, Int64 void WindowTransform::advanceFrameStartRowsOffset() { // Just recalculate it each time by walking blocks. - const auto [moved_row, offset_left] = moveRowNumber( - current_row, window_description.frame.begin_offset.get() * (window_description.frame.begin_preceding ? -1 : 1)); + const auto [moved_row, offset_left] = moveRowNumber(current_row, + window_description.frame.begin_offset.get() + * (window_description.frame.begin_preceding ? -1 : 1)); frame_start = moved_row; @@ -628,17 +647,21 @@ void WindowTransform::advanceFrameStartRangeOffset() { // See the comment for advanceFrameEndRangeOffset(). const int direction = window_description.order_by[0].direction; - const bool preceding = window_description.frame.begin_preceding == (direction > 0); - const auto * reference_column = inputAt(current_row)[order_by_indices[0]].get(); + const bool preceding = window_description.frame.begin_preceding + == (direction > 0); + const auto * reference_column + = inputAt(current_row)[order_by_indices[0]].get(); for (; frame_start < partition_end; advanceRowNumber(frame_start)) { // The first frame value is [current_row] with offset, so we advance // while [frames_start] < [current_row] with offset. - const auto * compared_column = inputAt(frame_start)[order_by_indices[0]].get(); - if (compare_values_with_offset( - compared_column, frame_start.row, reference_column, current_row.row, window_description.frame.begin_offset, preceding) - * direction - >= 0) + const auto * compared_column + = inputAt(frame_start)[order_by_indices[0]].get(); + if (compare_values_with_offset(compared_column, frame_start.row, + reference_column, current_row.row, + window_description.frame.begin_offset, + preceding) + * direction >= 0) { frame_started = true; return; @@ -683,8 +706,7 @@ void WindowTransform::advanceFrameStart() advanceFrameStartRangeOffset(); break; default: - throw Exception( - ErrorCodes::NOT_IMPLEMENTED, + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Frame start type '{}' for frame '{}' is not implemented", window_description.frame.begin_type, window_description.frame.type); @@ -744,7 +766,8 @@ bool WindowTransform::arePeers(const RowNumber & x, const RowNumber & y) const { const auto * column_x = inputAt(x)[order_by_indices[i]].get(); const auto * column_y = inputAt(y)[order_by_indices[i]].get(); - if (column_x->compareAt(x.row, y.row, *column_y, 1 /* nan_direction_hint */) != 0) + if (column_x->compareAt(x.row, y.row, *column_y, + 1 /* nan_direction_hint */) != 0) { return false; } @@ -761,7 +784,8 @@ void WindowTransform::advanceFrameEndCurrentRow() // (only loop over rows and not over blocks), that should hopefully be more // efficient. // partition_end is either in this new block or past-the-end. - assert(frame_end.block == partition_end.block || frame_end.block + 1 == partition_end.block); + assert(frame_end.block == partition_end.block + || frame_end.block + 1 == partition_end.block); if (frame_end == partition_end) { @@ -823,8 +847,10 @@ void WindowTransform::advanceFrameEndRowsOffset() { // Walk the specified offset from the current row. The "+1" is needed // because the frame_end is a past-the-end pointer. - const auto [moved_row, offset_left] = moveRowNumber( - current_row, window_description.frame.end_offset.get() * (window_description.frame.end_preceding ? -1 : 1) + 1); + const auto [moved_row, offset_left] = moveRowNumber(current_row, + window_description.frame.end_offset.get() + * (window_description.frame.end_preceding ? -1 : 1) + + 1); if (partition_end <= moved_row) { @@ -857,18 +883,22 @@ void WindowTransform::advanceFrameEndRangeOffset() // PRECEDING/FOLLOWING change direction for DESC order. // See CD 9075-2:201?(E) 7.14 p. 429. const int direction = window_description.order_by[0].direction; - const bool preceding = window_description.frame.end_preceding == (direction > 0); - const auto * reference_column = inputAt(current_row)[order_by_indices[0]].get(); + const bool preceding = window_description.frame.end_preceding + == (direction > 0); + const auto * reference_column + = inputAt(current_row)[order_by_indices[0]].get(); for (; frame_end < partition_end; advanceRowNumber(frame_end)) { // The last frame value is current_row with offset, and we need a // past-the-end pointer, so we advance while // [frame_end] <= [current_row] with offset. - const auto * compared_column = inputAt(frame_end)[order_by_indices[0]].get(); - if (compare_values_with_offset( - compared_column, frame_end.row, reference_column, current_row.row, window_description.frame.end_offset, preceding) - * direction - > 0) + const auto * compared_column + = inputAt(frame_end)[order_by_indices[0]].get(); + if (compare_values_with_offset(compared_column, frame_end.row, + reference_column, current_row.row, + window_description.frame.end_offset, + preceding) + * direction > 0) { frame_ended = true; return; @@ -903,8 +933,9 @@ void WindowTransform::advanceFrameEnd() advanceFrameEndRangeOffset(); break; default: - throw Exception( - ErrorCodes::NOT_IMPLEMENTED, "The frame end type '{}' is not implemented", window_description.frame.end_type); + throw Exception(ErrorCodes::NOT_IMPLEMENTED, + "The frame end type '{}' is not implemented", + window_description.frame.end_type); } break; } @@ -975,9 +1006,13 @@ void WindowTransform::updateAggregationState() // rows manually, instead of using advanceRowNumber(). // For this purpose, the past-the-end block can be different than the // block of the past-the-end row (it's usually the next block). - const auto past_the_end_block = rows_to_add_end.row == 0 ? rows_to_add_end.block : rows_to_add_end.block + 1; + const auto past_the_end_block = rows_to_add_end.row == 0 + ? rows_to_add_end.block + : rows_to_add_end.block + 1; - for (auto block_number = rows_to_add_start.block; block_number < past_the_end_block; ++block_number) + for (auto block_number = rows_to_add_start.block; + block_number < past_the_end_block; + ++block_number) { auto & block = blockAt(block_number); @@ -985,15 +1020,18 @@ void WindowTransform::updateAggregationState() { for (size_t i = 0; i < ws.argument_column_indices.size(); ++i) { - ws.argument_columns[i] = block.input_columns[ws.argument_column_indices[i]].get(); + ws.argument_columns[i] = block.input_columns[ + ws.argument_column_indices[i]].get(); } ws.cached_block_number = block_number; } // First and last blocks may be processed partially, and other blocks // are processed in full. - const auto first_row = block_number == rows_to_add_start.block ? rows_to_add_start.row : 0; - const auto past_the_end_row = block_number == rows_to_add_end.block ? rows_to_add_end.row : block.rows; + const auto first_row = block_number == rows_to_add_start.block + ? rows_to_add_start.row : 0; + const auto past_the_end_row = block_number == rows_to_add_end.block + ? rows_to_add_end.row : block.rows; // We should add an addBatch analog that can accept a starting offset. // For now, add the values one by one. @@ -1041,7 +1079,8 @@ void WindowTransform::writeOutCurrentRow() } } -static void assertSameColumns(const Columns & left_all, const Columns & right_all) +static void assertSameColumns(const Columns & left_all, + const Columns & right_all) { assert(left_all.size() == right_all.size()); @@ -1059,7 +1098,8 @@ static void assertSameColumns(const Columns & left_all, const Columns & right_al if (const auto * right_lc = typeid_cast(right_column)) right_column = right_lc->getDictionary().getNestedColumn().get(); - assert(typeid(*left_column).hash_code() == typeid(*right_column).hash_code()); + assert(typeid(*left_column).hash_code() + == typeid(*right_column).hash_code()); if (isColumnConst(*left_column)) { @@ -1120,7 +1160,8 @@ void WindowTransform::appendChunk(Chunk & chunk) if (ws.window_function_impl) block.casted_columns.push_back(ws.window_function_impl->castColumn(block.input_columns, ws.argument_column_indices)); - block.output_columns.push_back(ws.aggregate_function->getResultType()->createColumn()); + block.output_columns.push_back(ws.aggregate_function->getResultType() + ->createColumn()); block.output_columns.back()->reserve(block.rows); } @@ -1444,10 +1485,12 @@ void WindowTransform::work() // that the frame start can be further than current row for some frame specs // (e.g. EXCLUDE CURRENT ROW), so we have to check both. assert(prev_frame_start <= frame_start); - const auto first_used_block = std::min(next_output_block_number, std::min(prev_frame_start.block, current_row.block)); + const auto first_used_block = std::min(next_output_block_number, + std::min(prev_frame_start.block, current_row.block)); if (first_block_number < first_used_block) { - blocks.erase(blocks.begin(), blocks.begin() + (first_used_block - first_block_number)); + blocks.erase(blocks.begin(), + blocks.begin() + (first_used_block - first_block_number)); first_block_number = first_used_block; assert(next_output_block_number >= first_block_number); @@ -1458,82 +1501,83 @@ void WindowTransform::work() } } - struct WindowFunctionRank final : public WindowFunction { - WindowFunctionRank(const std::string & name_, const DataTypes & argument_types_, const Array & parameters_) + WindowFunctionRank(const std::string & name_, + const DataTypes & argument_types_, const Array & parameters_) : WindowFunction(name_, argument_types_, parameters_, std::make_shared()) - { - } + {} bool allocatesMemoryInArena() const override { return false; } - void windowInsertResultInto(const WindowTransform * transform, size_t function_index) const override + void windowInsertResultInto(const WindowTransform * transform, + size_t function_index) const override { - IColumn & to = *transform->blockAt(transform->current_row).output_columns[function_index]; - assert_cast(to).getData().push_back(transform->peer_group_start_row_number); + IColumn & to = *transform->blockAt(transform->current_row) + .output_columns[function_index]; + assert_cast(to).getData().push_back( + transform->peer_group_start_row_number); } }; struct WindowFunctionDenseRank final : public WindowFunction { - WindowFunctionDenseRank(const std::string & name_, const DataTypes & argument_types_, const Array & parameters_) + WindowFunctionDenseRank(const std::string & name_, + const DataTypes & argument_types_, const Array & parameters_) : WindowFunction(name_, argument_types_, parameters_, std::make_shared()) - { - } + {} bool allocatesMemoryInArena() const override { return false; } - void windowInsertResultInto(const WindowTransform * transform, size_t function_index) const override + void windowInsertResultInto(const WindowTransform * transform, + size_t function_index) const override { - IColumn & to = *transform->blockAt(transform->current_row).output_columns[function_index]; - assert_cast(to).getData().push_back(transform->peer_group_number); + IColumn & to = *transform->blockAt(transform->current_row) + .output_columns[function_index]; + assert_cast(to).getData().push_back( + transform->peer_group_number); } }; namespace recurrent_detail { -template -T getValue(const WindowTransform * /*transform*/, size_t /*function_index*/, size_t /*column_index*/, RowNumber /*row*/) -{ - throw Exception(ErrorCodes::NOT_IMPLEMENTED, "recurrent_detail::getValue() is not implemented for {} type", typeid(T).name()); -} + template T getValue(const WindowTransform * /*transform*/, size_t /*function_index*/, size_t /*column_index*/, RowNumber /*row*/) + { + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "recurrent_detail::getValue() is not implemented for {} type", typeid(T).name()); + } -template <> -Float64 getValue(const WindowTransform * transform, size_t function_index, size_t column_index, RowNumber row) -{ - const auto & workspace = transform->workspaces[function_index]; - const auto & column = transform->blockAt(row.block).input_columns[workspace.argument_column_indices[column_index]]; - return column->getFloat64(row.row); -} + template<> Float64 getValue(const WindowTransform * transform, size_t function_index, size_t column_index, RowNumber row) + { + const auto & workspace = transform->workspaces[function_index]; + const auto & column = transform->blockAt(row.block).input_columns[workspace.argument_column_indices[column_index]]; + return column->getFloat64(row.row); + } -template -void setValueToOutputColumn(const WindowTransform * /*transform*/, size_t /*function_index*/, T /*value*/) -{ - throw Exception( - ErrorCodes::NOT_IMPLEMENTED, "recurrent_detail::setValueToOutputColumn() is not implemented for {} type", typeid(T).name()); -} + template void setValueToOutputColumn(const WindowTransform * /*transform*/, size_t /*function_index*/, T /*value*/) + { + throw Exception(ErrorCodes::NOT_IMPLEMENTED, + "recurrent_detail::setValueToOutputColumn() is not implemented for {} type", typeid(T).name()); + } -template <> -void setValueToOutputColumn(const WindowTransform * transform, size_t function_index, Float64 value) -{ - auto current_row = transform->current_row; - const auto & current_block = transform->blockAt(current_row); - IColumn & to = *current_block.output_columns[function_index]; + template<> void setValueToOutputColumn(const WindowTransform * transform, size_t function_index, Float64 value) + { + auto current_row = transform->current_row; + const auto & current_block = transform->blockAt(current_row); + IColumn & to = *current_block.output_columns[function_index]; - assert_cast(to).getData().push_back(value); -} + assert_cast(to).getData().push_back(value); + } } struct WindowFunctionHelpers { - template + template static T getValue(const WindowTransform * transform, size_t function_index, size_t column_index, RowNumber row) { return recurrent_detail::getValue(transform, function_index, column_index, row); } - template + template static void setValueToOutputColumn(const WindowTransform * transform, size_t function_index, T value) { recurrent_detail::setValueToOutputColumn(transform, function_index, value); @@ -1568,7 +1612,6 @@ struct WindowFunctionHelpers } }; - struct ExponentialTimeDecayedSumState { Float64 previous_time; @@ -1591,34 +1634,34 @@ struct WindowFunctionExponentialTimeDecayedSum final : public StatefulWindowFunc { if (parameters_.size() != 1) { - throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Function {} takes exactly one parameter", name_); + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, + "Function {} takes exactly one parameter", name_); } return applyVisitor(FieldVisitorConvertToNumber(), parameters_[0]); } - WindowFunctionExponentialTimeDecayedSum(const std::string & name_, const DataTypes & argument_types_, const Array & parameters_) + WindowFunctionExponentialTimeDecayedSum(const std::string & name_, + const DataTypes & argument_types_, const Array & parameters_) : StatefulWindowFunction(name_, argument_types_, parameters_, std::make_shared()) , decay_length(getDecayLength(parameters_, name_)) { if (argument_types.size() != 2) { - throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Function {} takes exactly two arguments", name_); + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, + "Function {} takes exactly two arguments", name_); } if (!isNumber(argument_types[ARGUMENT_VALUE])) { - throw Exception( - ErrorCodes::BAD_ARGUMENTS, + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Argument {} must be a number, '{}' given", ARGUMENT_VALUE, argument_types[ARGUMENT_VALUE]->getName()); } - if (!isNumber(argument_types[ARGUMENT_TIME]) && !isDateTime(argument_types[ARGUMENT_TIME]) - && !isDateTime64(argument_types[ARGUMENT_TIME])) + if (!isNumber(argument_types[ARGUMENT_TIME]) && !isDateTime(argument_types[ARGUMENT_TIME]) && !isDateTime64(argument_types[ARGUMENT_TIME])) { - throw Exception( - ErrorCodes::BAD_ARGUMENTS, + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Argument {} must be DateTime, DateTime64 or a number, '{}' given", ARGUMENT_TIME, argument_types[ARGUMENT_TIME]->getName()); @@ -1627,7 +1670,8 @@ struct WindowFunctionExponentialTimeDecayedSum final : public StatefulWindowFunc bool allocatesMemoryInArena() const override { return false; } - void windowInsertResultInto(const WindowTransform * transform, size_t function_index) const override + void windowInsertResultInto(const WindowTransform * transform, + size_t function_index) const override { const auto & workspace = transform->workspaces[function_index]; auto & state = getState(workspace); @@ -1639,7 +1683,8 @@ struct WindowFunctionExponentialTimeDecayedSum final : public StatefulWindowFunc RowNumber frame_back = transform->prevRowNumber(transform->frame_end); Float64 back_t = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_TIME, frame_back); - if (transform->prev_frame_start <= transform->frame_start && transform->frame_start < transform->prev_frame_end + if (transform->prev_frame_start <= transform->frame_start + && transform->frame_start < transform->prev_frame_end && transform->prev_frame_end <= transform->frame_end) { for (RowNumber i = transform->prev_frame_start; i < transform->frame_start; transform->advanceRowNumber(i)) @@ -1673,8 +1718,8 @@ struct WindowFunctionExponentialTimeDecayedSum final : public StatefulWindowFunc WindowFunctionHelpers::setValueToOutputColumn(transform, function_index, result); } -private: - const Float64 decay_length; + private: + const Float64 decay_length; }; struct WindowFunctionExponentialTimeDecayedMax final : public WindowFunction @@ -1686,34 +1731,34 @@ struct WindowFunctionExponentialTimeDecayedMax final : public WindowFunction { if (parameters_.size() != 1) { - throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Function {} takes exactly one parameter", name_); + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, + "Function {} takes exactly one parameter", name_); } return applyVisitor(FieldVisitorConvertToNumber(), parameters_[0]); } - WindowFunctionExponentialTimeDecayedMax(const std::string & name_, const DataTypes & argument_types_, const Array & parameters_) + WindowFunctionExponentialTimeDecayedMax(const std::string & name_, + const DataTypes & argument_types_, const Array & parameters_) : WindowFunction(name_, argument_types_, parameters_, std::make_shared()) , decay_length(getDecayLength(parameters_, name_)) { if (argument_types.size() != 2) { - throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Function {} takes exactly two arguments", name_); + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, + "Function {} takes exactly two arguments", name_); } if (!isNumber(argument_types[ARGUMENT_VALUE])) { - throw Exception( - ErrorCodes::BAD_ARGUMENTS, + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Argument {} must be a number, '{}' given", ARGUMENT_VALUE, argument_types[ARGUMENT_VALUE]->getName()); } - if (!isNumber(argument_types[ARGUMENT_TIME]) && !isDateTime(argument_types[ARGUMENT_TIME]) - && !isDateTime64(argument_types[ARGUMENT_TIME])) + if (!isNumber(argument_types[ARGUMENT_TIME]) && !isDateTime(argument_types[ARGUMENT_TIME]) && !isDateTime64(argument_types[ARGUMENT_TIME])) { - throw Exception( - ErrorCodes::BAD_ARGUMENTS, + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Argument {} must be DateTime, DateTime64 or a number, '{}' given", ARGUMENT_TIME, argument_types[ARGUMENT_TIME]->getName()); @@ -1722,7 +1767,8 @@ struct WindowFunctionExponentialTimeDecayedMax final : public WindowFunction bool allocatesMemoryInArena() const override { return false; } - void windowInsertResultInto(const WindowTransform * transform, size_t function_index) const override + void windowInsertResultInto(const WindowTransform * transform, + size_t function_index) const override { Float64 result = std::numeric_limits::quiet_NaN(); @@ -1748,8 +1794,8 @@ struct WindowFunctionExponentialTimeDecayedMax final : public WindowFunction WindowFunctionHelpers::setValueToOutputColumn(transform, function_index, result); } -private: - const Float64 decay_length; + private: + const Float64 decay_length; }; struct WindowFunctionExponentialTimeDecayedCount final : public StatefulWindowFunction @@ -1760,25 +1806,26 @@ struct WindowFunctionExponentialTimeDecayedCount final : public StatefulWindowFu { if (parameters_.size() != 1) { - throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Function {} takes exactly one parameter", name_); + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, + "Function {} takes exactly one parameter", name_); } return applyVisitor(FieldVisitorConvertToNumber(), parameters_[0]); } - WindowFunctionExponentialTimeDecayedCount(const std::string & name_, const DataTypes & argument_types_, const Array & parameters_) + WindowFunctionExponentialTimeDecayedCount(const std::string & name_, + const DataTypes & argument_types_, const Array & parameters_) : StatefulWindowFunction(name_, argument_types_, parameters_, std::make_shared()) , decay_length(getDecayLength(parameters_, name_)) { if (argument_types.size() != 1) { - throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Function {} takes exactly one argument", name_); + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, + "Function {} takes exactly one argument", name_); } - if (!isNumber(argument_types[ARGUMENT_TIME]) && !isDateTime(argument_types[ARGUMENT_TIME]) - && !isDateTime64(argument_types[ARGUMENT_TIME])) + if (!isNumber(argument_types[ARGUMENT_TIME]) && !isDateTime(argument_types[ARGUMENT_TIME]) && !isDateTime64(argument_types[ARGUMENT_TIME])) { - throw Exception( - ErrorCodes::BAD_ARGUMENTS, + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Argument {} must be DateTime, DateTime64 or a number, '{}' given", ARGUMENT_TIME, argument_types[ARGUMENT_TIME]->getName()); @@ -1787,7 +1834,8 @@ struct WindowFunctionExponentialTimeDecayedCount final : public StatefulWindowFu bool allocatesMemoryInArena() const override { return false; } - void windowInsertResultInto(const WindowTransform * transform, size_t function_index) const override + void windowInsertResultInto(const WindowTransform * transform, + size_t function_index) const override { const auto & workspace = transform->workspaces[function_index]; auto & state = getState(workspace); @@ -1799,7 +1847,8 @@ struct WindowFunctionExponentialTimeDecayedCount final : public StatefulWindowFu RowNumber frame_back = transform->prevRowNumber(transform->frame_end); Float64 back_t = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_TIME, frame_back); - if (transform->prev_frame_start <= transform->frame_start && transform->frame_start < transform->prev_frame_end + if (transform->prev_frame_start <= transform->frame_start + && transform->frame_start < transform->prev_frame_end && transform->prev_frame_end <= transform->frame_end) { for (RowNumber i = transform->prev_frame_start; i < transform->frame_start; transform->advanceRowNumber(i)) @@ -1830,8 +1879,8 @@ struct WindowFunctionExponentialTimeDecayedCount final : public StatefulWindowFu WindowFunctionHelpers::setValueToOutputColumn(transform, function_index, result); } -private: - const Float64 decay_length; + private: + const Float64 decay_length; }; struct WindowFunctionExponentialTimeDecayedAvg final : public StatefulWindowFunction @@ -1843,34 +1892,34 @@ struct WindowFunctionExponentialTimeDecayedAvg final : public StatefulWindowFunc { if (parameters_.size() != 1) { - throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Function {} takes exactly one parameter", name_); + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, + "Function {} takes exactly one parameter", name_); } return applyVisitor(FieldVisitorConvertToNumber(), parameters_[0]); } - WindowFunctionExponentialTimeDecayedAvg(const std::string & name_, const DataTypes & argument_types_, const Array & parameters_) + WindowFunctionExponentialTimeDecayedAvg(const std::string & name_, + const DataTypes & argument_types_, const Array & parameters_) : StatefulWindowFunction(name_, argument_types_, parameters_, std::make_shared()) , decay_length(getDecayLength(parameters_, name_)) { if (argument_types.size() != 2) { - throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Function {} takes exactly two arguments", name_); + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, + "Function {} takes exactly two arguments", name_); } if (!isNumber(argument_types[ARGUMENT_VALUE])) { - throw Exception( - ErrorCodes::BAD_ARGUMENTS, + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Argument {} must be a number, '{}' given", ARGUMENT_VALUE, argument_types[ARGUMENT_VALUE]->getName()); } - if (!isNumber(argument_types[ARGUMENT_TIME]) && !isDateTime(argument_types[ARGUMENT_TIME]) - && !isDateTime64(argument_types[ARGUMENT_TIME])) + if (!isNumber(argument_types[ARGUMENT_TIME]) && !isDateTime(argument_types[ARGUMENT_TIME]) && !isDateTime64(argument_types[ARGUMENT_TIME])) { - throw Exception( - ErrorCodes::BAD_ARGUMENTS, + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Argument {} must be DateTime, DateTime64 or a number, '{}' given", ARGUMENT_TIME, argument_types[ARGUMENT_TIME]->getName()); @@ -1879,7 +1928,8 @@ struct WindowFunctionExponentialTimeDecayedAvg final : public StatefulWindowFunc bool allocatesMemoryInArena() const override { return false; } - void windowInsertResultInto(const WindowTransform * transform, size_t function_index) const override + void windowInsertResultInto(const WindowTransform * transform, + size_t function_index) const override { const auto & workspace = transform->workspaces[function_index]; auto & state = getState(workspace); @@ -1893,7 +1943,8 @@ struct WindowFunctionExponentialTimeDecayedAvg final : public StatefulWindowFunc RowNumber frame_back = transform->prevRowNumber(transform->frame_end); Float64 back_t = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_TIME, frame_back); - if (transform->prev_frame_start <= transform->frame_start && transform->frame_start < transform->prev_frame_end + if (transform->prev_frame_start <= transform->frame_start + && transform->frame_start < transform->prev_frame_end && transform->prev_frame_end <= transform->frame_end) { for (RowNumber i = transform->prev_frame_start; i < transform->frame_start; transform->advanceRowNumber(i)) @@ -1936,49 +1987,56 @@ struct WindowFunctionExponentialTimeDecayedAvg final : public StatefulWindowFunc state.previous_count = count; state.previous_time = back_t; - result = sum / count; + result = sum/count; } WindowFunctionHelpers::setValueToOutputColumn(transform, function_index, result); } -private: - const Float64 decay_length; + private: + const Float64 decay_length; }; struct WindowFunctionRowNumber final : public WindowFunction { - WindowFunctionRowNumber(const std::string & name_, const DataTypes & argument_types_, const Array & parameters_) + WindowFunctionRowNumber(const std::string & name_, + const DataTypes & argument_types_, const Array & parameters_) : WindowFunction(name_, argument_types_, parameters_, std::make_shared()) - { - } + {} bool allocatesMemoryInArena() const override { return false; } - void windowInsertResultInto(const WindowTransform * transform, size_t function_index) const override + void windowInsertResultInto(const WindowTransform * transform, + size_t function_index) const override { - IColumn & to = *transform->blockAt(transform->current_row).output_columns[function_index]; - assert_cast(to).getData().push_back(transform->current_row_number); + IColumn & to = *transform->blockAt(transform->current_row) + .output_columns[function_index]; + assert_cast(to).getData().push_back( + transform->current_row_number); } }; namespace { -struct NtileState -{ - UInt64 buckets = 0; - RowNumber start_row; - UInt64 current_partition_rows = 0; - UInt64 current_partition_inserted_row = 0; + struct NtileState + { + UInt64 buckets = 0; + RowNumber start_row; + UInt64 current_partition_rows = 0; + UInt64 current_partition_inserted_row = 0; - void windowInsertResultInto(const WindowTransform * transform, size_t function_index, const DataTypes & argument_types); -}; + void windowInsertResultInto( + const WindowTransform * transform, + size_t function_index, + const DataTypes & argument_types); + }; } // Usage: ntile(n). n is the number of buckets. struct WindowFunctionNtile final : public StatefulWindowFunction { - WindowFunctionNtile(const std::string & name_, const DataTypes & argument_types_, const Array & parameters_) + WindowFunctionNtile(const std::string & name_, + const DataTypes & argument_types_, const Array & parameters_) : StatefulWindowFunction(name_, argument_types_, parameters_, std::make_shared()) { if (argument_types.size() != 1) @@ -1986,11 +2044,7 @@ struct WindowFunctionNtile final : public StatefulWindowFunction auto type_id = argument_types[0]->getTypeId(); if (type_id != TypeIndex::UInt8 && type_id != TypeIndex::UInt16 && type_id != TypeIndex::UInt32 && type_id != TypeIndex::UInt64) - throw Exception( - ErrorCodes::BAD_ARGUMENTS, - "'{}' argument type must be an unsigned integer (not larger than 64-bit), got {}", - name_, - argument_types[0]->getName()); + throw Exception(ErrorCodes::BAD_ARGUMENTS, "'{}' argument type must be an unsigned integer (not larger than 64-bit), got {}", name_, argument_types[0]->getName()); } bool allocatesMemoryInArena() const override { return false; } @@ -2021,13 +2075,13 @@ struct WindowFunctionNtile final : public StatefulWindowFunction std::optional getDefaultFrame() const override { WindowFrame frame; - frame.is_default = false; frame.type = WindowFrame::FrameType::ROWS; frame.end_type = WindowFrame::BoundaryType::Unbounded; return frame; } - void windowInsertResultInto(const WindowTransform * transform, size_t function_index) const override + void windowInsertResultInto(const WindowTransform * transform, + size_t function_index) const override { const auto & workspace = transform->workspaces[function_index]; auto & state = getState(workspace); @@ -2037,81 +2091,84 @@ struct WindowFunctionNtile final : public StatefulWindowFunction namespace { -void NtileState::windowInsertResultInto(const WindowTransform * transform, size_t function_index, const DataTypes & argument_types) -{ - if (!buckets) [[unlikely]] + void NtileState::windowInsertResultInto( + const WindowTransform * transform, + size_t function_index, + const DataTypes & argument_types) { - const auto & current_block = transform->blockAt(transform->current_row); - const auto & workspace = transform->workspaces[function_index]; - const auto & arg_col = *current_block.original_input_columns[workspace.argument_column_indices[0]]; - if (!isColumnConst(arg_col)) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Argument of 'ntile' function must be a constant"); - auto type_id = argument_types[0]->getTypeId(); - if (type_id == TypeIndex::UInt8) - buckets = arg_col[transform->current_row.row].get(); - else if (type_id == TypeIndex::UInt16) - buckets = arg_col[transform->current_row.row].get(); - else if (type_id == TypeIndex::UInt32) - buckets = arg_col[transform->current_row.row].get(); - else if (type_id == TypeIndex::UInt64) - buckets = arg_col[transform->current_row.row].get(); - - if (!buckets) + if (!buckets) [[unlikely]] { - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Argument of 'ntile' function must be greater than zero"); - } - } - // new partition - if (WindowFunctionHelpers::checkPartitionEnterFirstRow(transform)) [[unlikely]] - { - current_partition_rows = 0; - current_partition_inserted_row = 0; - start_row = transform->current_row; - } - current_partition_rows++; + const auto & current_block = transform->blockAt(transform->current_row); + const auto & workspace = transform->workspaces[function_index]; + const auto & arg_col = *current_block.original_input_columns[workspace.argument_column_indices[0]]; + if (!isColumnConst(arg_col)) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Argument of 'ntile' function must be a constant"); + auto type_id = argument_types[0]->getTypeId(); + if (type_id == TypeIndex::UInt8) + buckets = arg_col[transform->current_row.row].get(); + else if (type_id == TypeIndex::UInt16) + buckets = arg_col[transform->current_row.row].get(); + else if (type_id == TypeIndex::UInt32) + buckets = arg_col[transform->current_row.row].get(); + else if (type_id == TypeIndex::UInt64) + buckets = arg_col[transform->current_row.row].get(); - // Only do the action when we meet the last row in this partition. - if (!WindowFunctionHelpers::checkPartitionEnterLastRow(transform)) - return; - - auto bucket_capacity = current_partition_rows / buckets; - auto capacity_diff = current_partition_rows - bucket_capacity * buckets; - - // bucket number starts from 1. - UInt64 bucket_num = 1; - while (current_partition_inserted_row < current_partition_rows) - { - auto current_bucket_capacity = bucket_capacity; - if (capacity_diff > 0) - { - current_bucket_capacity += 1; - capacity_diff--; - } - auto left_rows = current_bucket_capacity; - while (left_rows) - { - auto available_block_rows = transform->blockRowsNumber(start_row) - start_row.row; - IColumn & to = *transform->blockAt(start_row).output_columns[function_index]; - auto & pod_array = assert_cast(to).getData(); - if (left_rows < available_block_rows) + if (!buckets) { - pod_array.resize_fill(pod_array.size() + left_rows, bucket_num); - start_row.row += left_rows; - left_rows = 0; - } - else - { - pod_array.resize_fill(pod_array.size() + available_block_rows, bucket_num); - left_rows -= available_block_rows; - start_row.block++; - start_row.row = 0; + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Argument of 'ntile' function must be greater than zero"); } } - current_partition_inserted_row += current_bucket_capacity; - bucket_num += 1; + // new partition + if (WindowFunctionHelpers::checkPartitionEnterFirstRow(transform)) [[unlikely]] + { + current_partition_rows = 0; + current_partition_inserted_row = 0; + start_row = transform->current_row; + } + current_partition_rows++; + + // Only do the action when we meet the last row in this partition. + if (!WindowFunctionHelpers::checkPartitionEnterLastRow(transform)) + return; + + auto bucket_capacity = current_partition_rows / buckets; + auto capacity_diff = current_partition_rows - bucket_capacity * buckets; + + // bucket number starts from 1. + UInt64 bucket_num = 1; + while (current_partition_inserted_row < current_partition_rows) + { + auto current_bucket_capacity = bucket_capacity; + if (capacity_diff > 0) + { + current_bucket_capacity += 1; + capacity_diff--; + } + auto left_rows = current_bucket_capacity; + while (left_rows) + { + auto available_block_rows = transform->blockRowsNumber(start_row) - start_row.row; + IColumn & to = *transform->blockAt(start_row).output_columns[function_index]; + auto & pod_array = assert_cast(to).getData(); + if (left_rows < available_block_rows) + { + pod_array.resize_fill(pod_array.size() + left_rows, bucket_num); + start_row.row += left_rows; + left_rows = 0; + } + else + { + pod_array.resize_fill(pod_array.size() + available_block_rows, bucket_num); + left_rows -= available_block_rows; + start_row.block++; + start_row.row = 0; + } + } + current_partition_inserted_row += current_bucket_capacity; + bucket_num += 1; + } } } -} namespace { @@ -2125,20 +2182,21 @@ struct PercentRankState struct WindowFunctionPercentRank final : public StatefulWindowFunction { public: - WindowFunctionPercentRank(const std::string & name_, const DataTypes & argument_types_, const Array & parameters_) + WindowFunctionPercentRank(const std::string & name_, + const DataTypes & argument_types_, const Array & parameters_) : StatefulWindowFunction(name_, argument_types_, parameters_, std::make_shared()) - { - } + {} bool allocatesMemoryInArena() const override { return false; } bool checkWindowFrameType(const WindowTransform * transform) const override { - if (transform->window_description.frame != getDefaultFrame()) + auto default_window_frame = getDefaultFrame(); + if (transform->window_description.frame != default_window_frame) { LOG_ERROR( getLogger("WindowFunctionPercentRank"), - "Window frame for function 'percent_rank' should be '{}'", getDefaultFrame()->toString()); + "Window frame for function 'percent_rank' should be '{}'", default_window_frame->toString()); return false; } return true; @@ -2147,11 +2205,9 @@ public: std::optional getDefaultFrame() const override { WindowFrame frame; - frame.is_default = false; frame.type = WindowFrame::FrameType::RANGE; frame.begin_type = WindowFrame::BoundaryType::Unbounded; frame.end_type = WindowFrame::BoundaryType::Unbounded; - //frame.end_type = WindowFrame::BoundaryType::Current; return frame; } @@ -2227,12 +2283,14 @@ struct WindowFunctionLagLeadInFrame final : public WindowFunction { FunctionBasePtr func_cast = nullptr; - WindowFunctionLagLeadInFrame(const std::string & name_, const DataTypes & argument_types_, const Array & parameters_) + WindowFunctionLagLeadInFrame(const std::string & name_, + const DataTypes & argument_types_, const Array & parameters_) : WindowFunction(name_, argument_types_, parameters_, createResultType(argument_types_, name_)) { if (!parameters.empty()) { - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Function {} cannot be parameterized", name_); + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Function {} cannot be parameterized", name_); } if (argument_types.size() == 1) @@ -2242,7 +2300,9 @@ struct WindowFunctionLagLeadInFrame final : public WindowFunction if (!isInt64OrUInt64FieldType(argument_types[1]->getDefault().getType())) { - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Offset must be an integer, '{}' given", argument_types[1]->getName()); + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Offset must be an integer, '{}' given", + argument_types[1]->getName()); } if (argument_types.size() == 2) @@ -2252,11 +2312,9 @@ struct WindowFunctionLagLeadInFrame final : public WindowFunction if (argument_types.size() > 3) { - throw Exception( - ErrorCodes::TOO_MANY_ARGUMENTS_FOR_FUNCTION, + throw Exception(ErrorCodes::TOO_MANY_ARGUMENTS_FOR_FUNCTION, "Function '{}' accepts at most 3 arguments, {} given", - name, - argument_types.size()); + name, argument_types.size()); } if (argument_types[0]->equals(*argument_types[2])) @@ -2265,16 +2323,14 @@ struct WindowFunctionLagLeadInFrame final : public WindowFunction const auto supertype = tryGetLeastSupertype(DataTypes{argument_types[0], argument_types[2]}); if (!supertype) { - throw Exception( - ErrorCodes::BAD_ARGUMENTS, + throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no supertype for the argument type '{}' and the default value type '{}'", argument_types[0]->getName(), argument_types[2]->getName()); } if (!argument_types[0]->equals(*supertype)) { - throw Exception( - ErrorCodes::BAD_ARGUMENTS, + throw Exception(ErrorCodes::BAD_ARGUMENTS, "The supertype '{}' for the argument type '{}' and the default value type '{}' is not the same as the argument type", supertype->getName(), argument_types[0]->getName(), @@ -2283,8 +2339,15 @@ struct WindowFunctionLagLeadInFrame final : public WindowFunction const auto from_name = argument_types[2]->getName(); const auto to_name = argument_types[0]->getName(); - ColumnsWithTypeAndName arguments{ - {argument_types[2], ""}, {DataTypeString().createColumnConst(0, to_name), std::make_shared(), ""}}; + ColumnsWithTypeAndName arguments + { + { argument_types[2], "" }, + { + DataTypeString().createColumnConst(0, to_name), + std::make_shared(), + "" + } + }; auto get_cast_func = [&arguments] { @@ -2293,6 +2356,7 @@ struct WindowFunctionLagLeadInFrame final : public WindowFunction }; func_cast = get_cast_func(); + } ColumnPtr castColumn(const Columns & columns, const std::vector & idx) override @@ -2300,11 +2364,15 @@ struct WindowFunctionLagLeadInFrame final : public WindowFunction if (!func_cast) return nullptr; - ColumnsWithTypeAndName arguments{ - {columns[idx[2]], argument_types[2], ""}, - {DataTypeString().createColumnConst(columns[idx[2]]->size(), argument_types[0]->getName()), - std::make_shared(), - ""}}; + ColumnsWithTypeAndName arguments + { + { columns[idx[2]], argument_types[2], "" }, + { + DataTypeString().createColumnConst(columns[idx[2]]->size(), argument_types[0]->getName()), + std::make_shared(), + "" + } + }; return func_cast->execute(arguments, argument_types[0], columns[idx[2]]->size()); } @@ -2313,7 +2381,8 @@ struct WindowFunctionLagLeadInFrame final : public WindowFunction { if (argument_types_.empty()) { - throw Exception(ErrorCodes::TOO_FEW_ARGUMENTS_FOR_FUNCTION, "Function {} takes at least one argument", name_); + throw Exception(ErrorCodes::TOO_FEW_ARGUMENTS_FOR_FUNCTION, + "Function {} takes at least one argument", name_); } return argument_types_[0]; @@ -2321,7 +2390,8 @@ struct WindowFunctionLagLeadInFrame final : public WindowFunction bool allocatesMemoryInArena() const override { return false; } - void windowInsertResultInto(const WindowTransform * transform, size_t function_index) const override + void windowInsertResultInto(const WindowTransform * transform, + size_t function_index) const override { const auto & current_block = transform->blockAt(transform->current_row); IColumn & to = *current_block.output_columns[function_index]; @@ -2330,27 +2400,34 @@ struct WindowFunctionLagLeadInFrame final : public WindowFunction Int64 offset = 1; if (argument_types.size() > 1) { - offset = (*current_block.input_columns[workspace.argument_column_indices[1]])[transform->current_row.row].get(); + offset = (*current_block.input_columns[ + workspace.argument_column_indices[1]])[ + transform->current_row.row].get(); /// Either overflow or really negative value, both is not acceptable. if (offset < 0) { - throw Exception( - ErrorCodes::BAD_ARGUMENTS, "The offset for function {} must be in (0, {}], {} given", getName(), INT64_MAX, offset); + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "The offset for function {} must be in (0, {}], {} given", + getName(), INT64_MAX, offset); } } - const auto [target_row, offset_left] = transform->moveRowNumber(transform->current_row, offset * (is_lead ? 1 : -1)); + const auto [target_row, offset_left] = transform->moveRowNumber( + transform->current_row, offset * (is_lead ? 1 : -1)); - if (offset_left != 0 || target_row < transform->frame_start || transform->frame_end <= target_row) + if (offset_left != 0 + || target_row < transform->frame_start + || transform->frame_end <= target_row) { // Offset is outside the frame. if (argument_types.size() > 2) { // Column with default values is specified. - const IColumn & default_column = current_block.casted_columns[function_index] - ? *current_block.casted_columns[function_index].get() - : *current_block.input_columns[workspace.argument_column_indices[2]].get(); + const IColumn & default_column = + current_block.casted_columns[function_index] ? + *current_block.casted_columns[function_index].get() : + *current_block.input_columns[workspace.argument_column_indices[2]].get(); to.insert(default_column[transform->current_row.row]); } @@ -2362,24 +2439,30 @@ struct WindowFunctionLagLeadInFrame final : public WindowFunction else { // Offset is inside the frame. - to.insertFrom(*transform->blockAt(target_row).input_columns[workspace.argument_column_indices[0]], target_row.row); + to.insertFrom(*transform->blockAt(target_row).input_columns[ + workspace.argument_column_indices[0]], + target_row.row); } } }; struct WindowFunctionNthValue final : public WindowFunction { - WindowFunctionNthValue(const std::string & name_, const DataTypes & argument_types_, const Array & parameters_) + WindowFunctionNthValue(const std::string & name_, + const DataTypes & argument_types_, const Array & parameters_) : WindowFunction(name_, argument_types_, parameters_, createResultType(name_, argument_types_)) { if (!parameters.empty()) { - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Function {} cannot be parameterized", name_); + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Function {} cannot be parameterized", name_); } if (!isInt64OrUInt64FieldType(argument_types[1]->getDefault().getType())) { - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Offset must be an integer, '{}' given", argument_types[1]->getName()); + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Offset must be an integer, '{}' given", + argument_types[1]->getName()); } } @@ -2387,7 +2470,8 @@ struct WindowFunctionNthValue final : public WindowFunction { if (argument_types_.size() != 2) { - throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Function {} takes exactly two arguments", name_); + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, + "Function {} takes exactly two arguments", name_); } return argument_types_[0]; @@ -2395,24 +2479,30 @@ struct WindowFunctionNthValue final : public WindowFunction bool allocatesMemoryInArena() const override { return false; } - void windowInsertResultInto(const WindowTransform * transform, size_t function_index) const override + void windowInsertResultInto(const WindowTransform * transform, + size_t function_index) const override { const auto & current_block = transform->blockAt(transform->current_row); IColumn & to = *current_block.output_columns[function_index]; const auto & workspace = transform->workspaces[function_index]; - Int64 offset = (*current_block.input_columns[workspace.argument_column_indices[1]])[transform->current_row.row].get(); + Int64 offset = (*current_block.input_columns[ + workspace.argument_column_indices[1]])[ + transform->current_row.row].get(); /// Either overflow or really negative value, both is not acceptable. if (offset <= 0) { - throw Exception( - ErrorCodes::BAD_ARGUMENTS, "The offset for function {} must be in (0, {}], {} given", getName(), INT64_MAX, offset); + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "The offset for function {} must be in (0, {}], {} given", + getName(), INT64_MAX, offset); } --offset; const auto [target_row, offset_left] = transform->moveRowNumber(transform->frame_start, offset); - if (offset_left != 0 || target_row < transform->frame_start || transform->frame_end <= target_row) + if (offset_left != 0 + || target_row < transform->frame_start + || transform->frame_end <= target_row) { // Offset is outside the frame. to.insertDefault(); @@ -2420,7 +2510,9 @@ struct WindowFunctionNthValue final : public WindowFunction else { // Offset is inside the frame. - to.insertFrom(*transform->blockAt(target_row).input_columns[workspace.argument_column_indices[0]], target_row.row); + to.insertFrom(*transform->blockAt(target_row).input_columns[ + workspace.argument_column_indices[0]], + target_row.row); } } }; @@ -2441,34 +2533,35 @@ struct NonNegativeDerivativeParams bool interval_specified = false; Int64 ts_scale_multiplier = 0; - NonNegativeDerivativeParams(const std::string & name_, const DataTypes & argument_types, const Array & parameters) + NonNegativeDerivativeParams( + const std::string & name_, const DataTypes & argument_types, const Array & parameters) { if (!parameters.empty()) { - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Function {} cannot be parameterized", name_); + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Function {} cannot be parameterized", name_); } if (argument_types.size() != 2 && argument_types.size() != 3) { - throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Function {} takes 2 or 3 arguments", name_); + throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, + "Function {} takes 2 or 3 arguments", name_); } if (!isNumber(argument_types[ARGUMENT_METRIC])) { - throw Exception( - ErrorCodes::BAD_ARGUMENTS, - "Argument {} must be a number, '{}' given", - ARGUMENT_METRIC, - argument_types[ARGUMENT_METRIC]->getName()); + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Argument {} must be a number, '{}' given", + ARGUMENT_METRIC, + argument_types[ARGUMENT_METRIC]->getName()); } if (!isDateTime(argument_types[ARGUMENT_TIMESTAMP]) && !isDateTime64(argument_types[ARGUMENT_TIMESTAMP])) { - throw Exception( - ErrorCodes::BAD_ARGUMENTS, - "Argument {} must be DateTime or DateTime64, '{}' given", - ARGUMENT_TIMESTAMP, - argument_types[ARGUMENT_TIMESTAMP]->getName()); + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Argument {} must be DateTime or DateTime64, '{}' given", + ARGUMENT_TIMESTAMP, + argument_types[ARGUMENT_TIMESTAMP]->getName()); } if (isDateTime64(argument_types[ARGUMENT_TIMESTAMP])) @@ -2502,28 +2595,27 @@ struct NonNegativeDerivativeParams }; // nonNegativeDerivative(metric_column, timestamp_column[, INTERVAL 1 SECOND]) -struct WindowFunctionNonNegativeDerivative final : public StatefulWindowFunction, - public NonNegativeDerivativeParams +struct WindowFunctionNonNegativeDerivative final : public StatefulWindowFunction, public NonNegativeDerivativeParams { using Params = NonNegativeDerivativeParams; - WindowFunctionNonNegativeDerivative(const std::string & name_, const DataTypes & argument_types_, const Array & parameters_) + WindowFunctionNonNegativeDerivative(const std::string & name_, + const DataTypes & argument_types_, const Array & parameters_) : StatefulWindowFunction(name_, argument_types_, parameters_, std::make_shared()) , NonNegativeDerivativeParams(name, argument_types, parameters) - { - } + {} bool allocatesMemoryInArena() const override { return false; } - void windowInsertResultInto(const WindowTransform * transform, size_t function_index) const override + void windowInsertResultInto(const WindowTransform * transform, + size_t function_index) const override { const auto & current_block = transform->blockAt(transform->current_row); const auto & workspace = transform->workspaces[function_index]; auto & state = getState(workspace); - auto interval_duration = interval_specified - ? interval_length * (*current_block.input_columns[workspace.argument_column_indices[ARGUMENT_INTERVAL]]).getFloat64(0) - : 1; + auto interval_duration = interval_specified ? interval_length * + (*current_block.input_columns[workspace.argument_column_indices[ARGUMENT_INTERVAL]]).getFloat64(0) : 1; Float64 curr_metric = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_METRIC, transform->current_row); Float64 metric_diff = curr_metric - state.previous_metric; @@ -2531,18 +2623,16 @@ struct WindowFunctionNonNegativeDerivative final : public StatefulWindowFunction if (ts_scale_multiplier) { - const auto & column - = transform->blockAt(transform->current_row.block).input_columns[workspace.argument_column_indices[ARGUMENT_TIMESTAMP]]; + const auto & column = transform->blockAt(transform->current_row.block).input_columns[workspace.argument_column_indices[ARGUMENT_TIMESTAMP]]; const auto & curr_timestamp = checkAndGetColumn(*column).getInt(transform->current_row.row); Float64 time_elapsed = curr_timestamp - state.previous_timestamp; - result = (time_elapsed > 0) ? (metric_diff * ts_scale_multiplier / time_elapsed * interval_duration) : 0; + result = (time_elapsed > 0) ? (metric_diff * ts_scale_multiplier / time_elapsed * interval_duration) : 0; state.previous_timestamp = curr_timestamp; } else { - Float64 curr_timestamp - = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_TIMESTAMP, transform->current_row); + Float64 curr_timestamp = WindowFunctionHelpers::getValue(transform, function_index, ARGUMENT_TIMESTAMP, transform->current_row); Float64 time_elapsed = curr_timestamp - state.previous_timestamp; result = (time_elapsed > 0) ? (metric_diff / time_elapsed * interval_duration) : 0; state.previous_timestamp = curr_timestamp; @@ -2682,5 +2772,4 @@ void registerWindowFunctions(AggregateFunctionFactory & factory) name, argument_types, parameters); }, properties}); } - }