#pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include #include "IFunctionImpl.h" namespace DB { /** Window functions: * * TUMBLE(time_attr, interval [, timezone]) * * TUMBLE_START(window_id) * * TUMBLE_START(time_attr, interval [, timezone]) * * TUMBLE_END(window_id) * * TUMBLE_END(time_attr, interval [, timezone]) * * HOP(time_attr, hop_interval, window_interval [, timezone]) * * HOP_START(window_id) * * HOP_START(time_attr, hop_interval, window_interval [, timezone]) * * HOP_END(window_id) * * HOP_END(time_attr, hop_interval, window_interval [, timezone]) * */ enum WindowFunctionName { TUMBLE, TUMBLE_START, TUMBLE_END, HOP, HOP_START, HOP_END, WINDOW_ID }; namespace ErrorCodes { extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; extern const int ILLEGAL_COLUMN; extern const int ILLEGAL_TYPE_OF_ARGUMENT; extern const int ARGUMENT_OUT_OF_BOUND; } template struct ToStartOfTransform; #define TRANSFORM_DATE(INTERVAL_KIND) \ template <> \ struct ToStartOfTransform \ { \ static UInt32 execute(UInt32 t, UInt64 delta, const DateLUTImpl & time_zone) \ { \ return time_zone.toStartOf##INTERVAL_KIND##Interval(time_zone.toDayNum(t), delta); \ } \ }; TRANSFORM_DATE(Year) TRANSFORM_DATE(Quarter) TRANSFORM_DATE(Month) TRANSFORM_DATE(Week) TRANSFORM_DATE(Day) #undef TRANSFORM_DATE #define TRANSFORM_TIME(INTERVAL_KIND) \ template <> \ struct ToStartOfTransform \ { \ static UInt32 execute(UInt32 t, UInt64 delta, const DateLUTImpl & time_zone) \ { \ return time_zone.toStartOf##INTERVAL_KIND##Interval(t, delta); \ } \ }; TRANSFORM_TIME(Hour) TRANSFORM_TIME(Minute) TRANSFORM_TIME(Second) #undef TRANSFORM_DATE template struct AddTime; #define ADD_DATE(INTERVAL_KIND) \ template <> \ struct AddTime \ { \ static UInt32 execute(UInt32 t, Int64 delta, const DateLUTImpl & time_zone) { return time_zone.add##INTERVAL_KIND##s(t, delta); } \ }; ADD_DATE(Year) ADD_DATE(Quarter) ADD_DATE(Month) ADD_DATE(Week) ADD_DATE(Day) #undef ADD_DATE #define ADD_TIME(INTERVAL_KIND, INTERVAL) \ template <> \ struct AddTime \ { \ static UInt32 execute(UInt32 t, Int64 delta, const DateLUTImpl &) { return t + INTERVAL * delta; } \ }; ADD_TIME(Hour, 3600) ADD_TIME(Minute, 60) ADD_TIME(Second, 1) #undef ADD_TIME namespace { static std::tuple dispatchForIntervalColumns(const ColumnWithTypeAndName & interval_column, const String & function_name) { const auto * interval_type = checkAndGetDataType(interval_column.type.get()); if (!interval_type) throw Exception( "Illegal column " + interval_column.name + " of argument of function " + function_name, ErrorCodes::ILLEGAL_COLUMN); const auto * interval_column_const_int64 = checkAndGetColumnConst(interval_column.column.get()); if (!interval_column_const_int64) throw Exception( "Illegal column " + interval_column.name + " of argument of function " + function_name, ErrorCodes::ILLEGAL_COLUMN); Int64 num_units = interval_column_const_int64->getValue(); if (num_units <= 0) throw Exception( "Value for column " + interval_column.name + " of function " + function_name + " must be positive.", ErrorCodes::ARGUMENT_OUT_OF_BOUND); return {interval_type->getKind(), num_units}; } static ColumnPtr executeWindowBound(const ColumnPtr & column, int index, const String & function_name) { if (const ColumnTuple * col_tuple = checkAndGetColumn(column.get()); col_tuple) { if (!checkColumn>(*col_tuple->getColumnPtr(index))) throw Exception( "Illegal column for first argument of function " + function_name + ". Must be a Tuple(DataTime, DataTime)", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); return col_tuple->getColumnPtr(index); } else if (const ColumnArray * col_array = checkAndGetColumn(column.get()); col_array) { const ColumnTuple * col_tuple_ = checkAndGetColumn(&col_array->getData()); if (!col_tuple_) throw Exception( "Illegal column for first argument of function " + function_name + ". Must be a Tuple or Array(Tuple)", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); const auto & bound_column = col_tuple_->getColumn(index); const ColumnUInt32::Container & bound_data = static_cast(bound_column).getData(); const auto & column_offsets = col_array->getOffsets(); auto res = ColumnUInt32::create(); ColumnUInt32::Container & res_data = res->getData(); res_data.reserve(column_offsets.size()); IColumn::Offset current_offset = 0; for (size_t i = 0; i < column_offsets.size(); ++i) { res_data.push_back(bound_data[current_offset]); current_offset = column_offsets[i]; } return res; } else { throw Exception( "Illegal column for first argument of function " + function_name + ". Must be a Tuple or Array(Tuple)", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); } } template struct WindowImpl { static DataTypePtr getReturnType(const ColumnsWithTypeAndName & arguments, const String & function_name); }; template <> struct WindowImpl { static constexpr auto name = "TUMBLE"; [[maybe_unused]] static DataTypePtr getReturnType(const ColumnsWithTypeAndName & arguments, const String & function_name) { if (arguments.size() != 2 && arguments.size() != 3) { throw Exception( "Number of arguments for function " + function_name + " doesn't match: passed " + toString(arguments.size()) + ", should be 2.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); } if (!WhichDataType(arguments[0].type).isDateTime()) throw Exception( "Illegal type of first argument of function " + function_name + " should be DateTime", ErrorCodes::ILLEGAL_COLUMN); if (!WhichDataType(arguments[1].type).isInterval()) throw Exception( "Illegal type of second argument of function " + function_name + " should be Interval", ErrorCodes::ILLEGAL_COLUMN); if (arguments.size() == 3 && !WhichDataType(arguments[2].type).isString()) throw Exception( "Illegal type " + arguments[2].type->getName() + " of argument of function " + function_name + ". This argument is optional and must be a constant string with timezone name", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); return std::make_shared(DataTypes{std::make_shared(), std::make_shared()}); } [[maybe_unused]] static ColumnPtr dispatchForColumns(Block & block, const ColumnNumbers & arguments, const String & function_name) { const auto & time_column = block.getByPosition(arguments[0]); const auto & interval_column = block.getByPosition(arguments[1]); const auto & from_datatype = *time_column.type.get(); const auto which_type = WhichDataType(from_datatype); const auto * time_column_vec = checkAndGetColumn(time_column.column.get()); const DateLUTImpl & time_zone = extractTimeZoneFromFunctionArguments(block, arguments, 2, 0); if (!which_type.isDateTime() || !time_column_vec) throw Exception( "Illegal column " + time_column.name + " of function " + function_name + ". Must contain dates or dates with time", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); auto interval = dispatchForIntervalColumns(interval_column, function_name); switch (std::get<0>(interval)) { case IntervalKind::Second: return execute_tumble(*time_column_vec, std::get<1>(interval), time_zone); case IntervalKind::Minute: return execute_tumble(*time_column_vec, std::get<1>(interval), time_zone); case IntervalKind::Hour: return execute_tumble(*time_column_vec, std::get<1>(interval), time_zone); case IntervalKind::Day: return execute_tumble(*time_column_vec, std::get<1>(interval), time_zone); case IntervalKind::Week: return execute_tumble(*time_column_vec, std::get<1>(interval), time_zone); case IntervalKind::Month: return execute_tumble(*time_column_vec, std::get<1>(interval), time_zone); case IntervalKind::Quarter: return execute_tumble(*time_column_vec, std::get<1>(interval), time_zone); case IntervalKind::Year: return execute_tumble(*time_column_vec, std::get<1>(interval), time_zone); } __builtin_unreachable(); } template static ColumnPtr execute_tumble(const ColumnUInt32 & time_column, UInt64 num_units, const DateLUTImpl & time_zone) { const auto & time_data = time_column.getData(); size_t size = time_column.size(); auto start = ColumnUInt32::create(size); auto end = ColumnUInt32::create(size); ColumnUInt32::Container & start_data = start->getData(); ColumnUInt32::Container & end_data = end->getData(); for (size_t i = 0; i != size; ++i) { UInt32 wid = static_cast(ToStartOfTransform::execute(time_data[i], num_units, time_zone)); start_data[i] = wid; end_data[i] = AddTime::execute(wid, num_units, time_zone); } MutableColumns result; result.emplace_back(std::move(start)); result.emplace_back(std::move(end)); return ColumnTuple::create(std::move(result)); } }; template <> struct WindowImpl { static constexpr auto name = "TUMBLE_START"; static DataTypePtr getReturnType(const ColumnsWithTypeAndName & arguments, const String & function_name) { if (arguments.size() == 1) { if (!WhichDataType(arguments[0].type).isTuple()) throw Exception( "Illegal type of first argument of function " + function_name + " should be tuple", ErrorCodes::ILLEGAL_COLUMN); return std::make_shared(); } else if (arguments.size() == 2 || arguments.size() == 3) { if (!WhichDataType(arguments[0].type).isDateTime()) throw Exception( "Illegal type of first argument of function " + function_name + " should be DateTime", ErrorCodes::ILLEGAL_COLUMN); if (!WhichDataType(arguments[1].type).isInterval()) throw Exception( "Illegal type of second argument of function " + function_name + " should be Interval", ErrorCodes::ILLEGAL_COLUMN); if (arguments.size() == 3 && !WhichDataType(arguments[2].type).isString()) throw Exception( "Illegal type " + arguments[2].type->getName() + " of argument of function " + function_name + ". This argument is optional and must be a constant string with timezone name", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); return std::make_shared(); } else { throw Exception( "Number of arguments for function " + function_name + " doesn't match: passed " + toString(arguments.size()) + ", should not larger than 2.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); } } [[maybe_unused]] static ColumnPtr dispatchForColumns(Block & block, const ColumnNumbers & arguments, const String & function_name) { const auto & time_column = block.getByPosition(arguments[0]); const auto which_type = WhichDataType(time_column.type); ColumnPtr result_column_; if (which_type.isDateTime()) result_column_ = WindowImpl::dispatchForColumns(block, arguments, function_name); else result_column_ = block.getByPosition(arguments[0]).column; return executeWindowBound(result_column_, 0, function_name); } }; template <> struct WindowImpl { static constexpr auto name = "TUMBLE_END"; [[maybe_unused]] static DataTypePtr getReturnType(const ColumnsWithTypeAndName & arguments, const String & function_name) { return WindowImpl::getReturnType(arguments, function_name); } [[maybe_unused]] static ColumnPtr dispatchForColumns(Block & block, const ColumnNumbers & arguments, const String & function_name) { const auto & time_column = block.getByPosition(arguments[0]); const auto which_type = WhichDataType(time_column.type); ColumnPtr result_column_; if (which_type.isDateTime()) result_column_ = WindowImpl::dispatchForColumns(block, arguments, function_name); else result_column_ = block.getByPosition(arguments[0]).column; return executeWindowBound(result_column_, 1, function_name); } }; template <> struct WindowImpl { static constexpr auto name = "HOP"; [[maybe_unused]] static DataTypePtr getReturnType(const ColumnsWithTypeAndName & arguments, const String & function_name) { if (arguments.size() != 3 && arguments.size() != 4) { throw Exception( "Number of arguments for function " + function_name + " doesn't match: passed " + toString(arguments.size()) + ", should be 3.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); } if (!WhichDataType(arguments[0].type).isDateTime()) throw Exception( "Illegal type of first argument of function " + function_name + " should be DateTime", ErrorCodes::ILLEGAL_COLUMN); if (!WhichDataType(arguments[1].type).isInterval()) throw Exception( "Illegal type of second argument of function " + function_name + " should be Interval", ErrorCodes::ILLEGAL_COLUMN); if (!WhichDataType(arguments[2].type).isInterval()) throw Exception( "Illegal type of third argument of function " + function_name + " should be Interval", ErrorCodes::ILLEGAL_COLUMN); if (arguments.size() == 4 && !WhichDataType(arguments[3].type).isString()) throw Exception( "Illegal type " + arguments[3].type->getName() + " of argument of function " + function_name + ". This argument is optional and must be a constant string with timezone name", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); return std::make_shared(DataTypes{std::make_shared(), std::make_shared()}); } static ColumnPtr dispatchForColumns(Block & block, const ColumnNumbers & arguments, const String & function_name) { const auto & time_column = block.getByPosition(arguments[0]); const auto & hop_interval_column = block.getByPosition(arguments[1]); const auto & window_interval_column = block.getByPosition(arguments[2]); const auto & from_datatype = *time_column.type.get(); const auto * time_column_vec = checkAndGetColumn(time_column.column.get()); const DateLUTImpl & time_zone = extractTimeZoneFromFunctionArguments(block, arguments, 3, 0); if (!WhichDataType(from_datatype).isDateTime() || !time_column_vec) throw Exception( "Illegal column " + time_column.name + " argument of function " + function_name + ". Must contain dates or dates with time", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); auto hop_interval = dispatchForIntervalColumns(hop_interval_column, function_name); auto window_interval = dispatchForIntervalColumns(window_interval_column, function_name); if (std::get<0>(hop_interval) != std::get<0>(window_interval)) throw Exception( "Interval type of window and hop column of function " + function_name + ", must be same.", ErrorCodes::ILLEGAL_COLUMN); if (std::get<1>(hop_interval) > std::get<1>(window_interval)) throw Exception( "Value for hop interval of function " + function_name + " must not larger than window interval.", ErrorCodes::ARGUMENT_OUT_OF_BOUND); switch (std::get<0>(window_interval)) { case IntervalKind::Second: return execute_hop( *time_column_vec, std::get<1>(hop_interval), std::get<1>(window_interval), time_zone); case IntervalKind::Minute: return execute_hop( *time_column_vec, std::get<1>(hop_interval), std::get<1>(window_interval), time_zone); case IntervalKind::Hour: return execute_hop( *time_column_vec, std::get<1>(hop_interval), std::get<1>(window_interval), time_zone); case IntervalKind::Day: return execute_hop( *time_column_vec, std::get<1>(hop_interval), std::get<1>(window_interval), time_zone); case IntervalKind::Week: return execute_hop( *time_column_vec, std::get<1>(hop_interval), std::get<1>(window_interval), time_zone); case IntervalKind::Month: return execute_hop( *time_column_vec, std::get<1>(hop_interval), std::get<1>(window_interval), time_zone); case IntervalKind::Quarter: return execute_hop( *time_column_vec, std::get<1>(hop_interval), std::get<1>(window_interval), time_zone); case IntervalKind::Year: return execute_hop( *time_column_vec, std::get<1>(hop_interval), std::get<1>(window_interval), time_zone); } __builtin_unreachable(); } template static ColumnPtr execute_hop(const ColumnUInt32 & time_column, UInt64 hop_num_units, UInt64 window_num_units, const DateLUTImpl & time_zone) { const auto & time_data = time_column.getData(); size_t size = time_column.size(); auto start = ColumnUInt32::create(size); auto end = ColumnUInt32::create(size); ColumnUInt32::Container & start_data = start->getData(); ColumnUInt32::Container & end_data = end->getData(); for (size_t i = 0; i < size; ++i) { UInt32 wstart = static_cast(ToStartOfTransform::execute(time_data[i], hop_num_units, time_zone)); UInt32 wend = AddTime::execute(wstart, hop_num_units, time_zone); wstart = AddTime::execute(wend, -1 * window_num_units, time_zone); UInt32 wend_ = wend; UInt32 wend_latest; do { wend_latest = wend_; wend_ = AddTime::execute(wend_, -1 * hop_num_units, time_zone); } while (wend_ > time_data[i]); end_data[i] = wend_latest; start_data[i] = AddTime::execute(wend_latest, -1 * window_num_units, time_zone); } MutableColumns result; result.emplace_back(std::move(start)); result.emplace_back(std::move(end)); return ColumnTuple::create(std::move(result)); } }; template <> struct WindowImpl { static constexpr auto name = "WINDOW_ID"; [[maybe_unused]] static DataTypePtr getReturnType(const ColumnsWithTypeAndName & arguments, const String & function_name) { if (!WhichDataType(arguments[0].type).isDateTime()) throw Exception( "Illegal type of first argument of function " + function_name + " should be DateTime", ErrorCodes::ILLEGAL_COLUMN); if (!WhichDataType(arguments[1].type).isInterval()) throw Exception( "Illegal type of second argument of function " + function_name + " should be Interval", ErrorCodes::ILLEGAL_COLUMN); if (arguments.size() >= 3 && !WhichDataType(arguments[2].type).isInterval()) throw Exception( "Illegal type of third argument of function " + function_name + " should be Interval", ErrorCodes::ILLEGAL_COLUMN); if (arguments.size() == 4 && !WhichDataType(arguments[3].type).isString()) throw Exception( "Illegal type " + arguments[3].type->getName() + " of argument of function " + function_name + ". This argument is optional and must be a constant string with timezone name", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); if (arguments.size() > 4) throw Exception( "Number of arguments for function " + function_name + " doesn't match: passed " + toString(arguments.size()) + ", should not larger than 4.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); return std::make_shared(); } [[maybe_unused]] static ColumnPtr dispatchForHopColumns(Block & block, const ColumnNumbers & arguments, const String & function_name) { const auto & time_column = block.getByPosition(arguments[0]); const auto & hop_interval_column = block.getByPosition(arguments[1]); const auto & window_interval_column = block.getByPosition(arguments[2]); const auto & from_datatype = *time_column.type.get(); const auto * time_column_vec = checkAndGetColumn(time_column.column.get()); const DateLUTImpl & time_zone = extractTimeZoneFromFunctionArguments(block, arguments, 3, 0); if (!WhichDataType(from_datatype).isDateTime() || !time_column_vec) throw Exception( "Illegal column " + time_column.name + " argument of function " + function_name + ". Must contain dates or dates with time", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); auto hop_interval = dispatchForIntervalColumns(hop_interval_column, function_name); auto window_interval = dispatchForIntervalColumns(window_interval_column, function_name); if (std::get<0>(hop_interval) != std::get<0>(window_interval)) throw Exception( "Interval type of window and hop column of function " + function_name + ", must be same.", ErrorCodes::ILLEGAL_COLUMN); if (std::get<1>(hop_interval) > std::get<1>(window_interval)) throw Exception( "Value for hop interval of function " + function_name + " must not larger than window interval.", ErrorCodes::ARGUMENT_OUT_OF_BOUND); switch (std::get<0>(window_interval)) { case IntervalKind::Second: return execute_hop_slice( *time_column_vec, std::get<1>(hop_interval), std::get<1>(window_interval), time_zone); case IntervalKind::Minute: return execute_hop_slice( *time_column_vec, std::get<1>(hop_interval), std::get<1>(window_interval), time_zone); case IntervalKind::Hour: return execute_hop_slice( *time_column_vec, std::get<1>(hop_interval), std::get<1>(window_interval), time_zone); case IntervalKind::Day: return execute_hop_slice( *time_column_vec, std::get<1>(hop_interval), std::get<1>(window_interval), time_zone); case IntervalKind::Week: return execute_hop_slice( *time_column_vec, std::get<1>(hop_interval), std::get<1>(window_interval), time_zone); case IntervalKind::Month: return execute_hop_slice( *time_column_vec, std::get<1>(hop_interval), std::get<1>(window_interval), time_zone); case IntervalKind::Quarter: return execute_hop_slice( *time_column_vec, std::get<1>(hop_interval), std::get<1>(window_interval), time_zone); case IntervalKind::Year: return execute_hop_slice( *time_column_vec, std::get<1>(hop_interval), std::get<1>(window_interval), time_zone); } __builtin_unreachable(); } template static ColumnPtr execute_hop_slice(const ColumnUInt32 & time_column, UInt64 hop_num_units, UInt64 window_num_units, const DateLUTImpl & time_zone) { Int64 gcd_num_units = std::gcd(hop_num_units, window_num_units); const auto & time_data = time_column.getData(); size_t size = time_column.size(); auto end = ColumnUInt32::create(size); ColumnUInt32::Container & end_data = end->getData(); for (size_t i = 0; i < size; ++i) { UInt32 wstart = static_cast(ToStartOfTransform::execute(time_data[i], hop_num_units, time_zone)); UInt32 wend = AddTime::execute(wstart, hop_num_units, time_zone); UInt32 wend_ = wend; UInt32 wend_latest; do { wend_latest = wend_; wend_ = AddTime::execute(wend_, -1 * gcd_num_units, time_zone); } while (wend_ > time_data[i]); end_data[i] = wend_latest; } return end; } [[maybe_unused]] static ColumnPtr dispatchForTumbleColumns(Block & block, const ColumnNumbers & arguments, const String & function_name) { ColumnPtr column = WindowImpl::dispatchForColumns(block, arguments, function_name); return executeWindowBound(column, 1, function_name); } [[maybe_unused]] static ColumnPtr dispatchForColumns(Block & block, const ColumnNumbers & arguments, const String & function_name) { const auto & third_column = block.getByPosition(arguments[2]); if (arguments.size() == 2 || (arguments.size() == 3 && WhichDataType(third_column.type).isString())) return dispatchForTumbleColumns(block, arguments, function_name); else return dispatchForHopColumns(block, arguments, function_name); } }; template <> struct WindowImpl { static constexpr auto name = "HOP_START"; static DataTypePtr getReturnType(const ColumnsWithTypeAndName & arguments, const String & function_name) { if (arguments.size() == 1) { auto type_ = WhichDataType(arguments[0].type); if (!type_.isTuple() && !type_.isUInt32()) throw Exception( "Illegal type of first argument of function " + function_name + " should be Tuple, Array, UInt8 or UInt32", ErrorCodes::ILLEGAL_COLUMN); return std::make_shared(); } else if (arguments.size() == 3 || arguments.size() == 4) { if (!WhichDataType(arguments[0].type).isDateTime()) throw Exception( "Illegal type of first argument of function " + function_name + " should be DateTime", ErrorCodes::ILLEGAL_COLUMN); if (!WhichDataType(arguments[1].type).isInterval()) throw Exception( "Illegal type of second argument of function " + function_name + " should be Interval", ErrorCodes::ILLEGAL_COLUMN); if (!WhichDataType(arguments[2].type).isInterval()) throw Exception( "Illegal type of third argument of function " + function_name + " should be Interval", ErrorCodes::ILLEGAL_COLUMN); if (arguments.size() == 4 && !WhichDataType(arguments[3].type).isString()) throw Exception( "Illegal type " + arguments[3].type->getName() + " of argument of function " + function_name + ". This argument is optional and must be a constant string with timezone name", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); return std::make_shared(); } else { throw Exception( "Number of arguments for function " + function_name + " doesn't match: passed " + toString(arguments.size()) + ", should be 1, 3 or 4.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); } } [[maybe_unused]] static ColumnPtr dispatchForColumns(Block & block, const ColumnNumbers & arguments, const String & function_name) { const auto & time_column = block.getByPosition(arguments[0]); const auto which_type = WhichDataType(time_column.type); ColumnPtr result_column_; if (arguments.size() == 1) { if (which_type.isUInt32()) return time_column.column; else //isTuple result_column_ = time_column.column; } else result_column_ = WindowImpl::dispatchForColumns(block, arguments, function_name); return executeWindowBound(result_column_, 0, function_name); } }; template <> struct WindowImpl { static constexpr auto name = "HOP_END"; [[maybe_unused]] static DataTypePtr getReturnType(const ColumnsWithTypeAndName & arguments, const String & function_name) { return WindowImpl::getReturnType(arguments, function_name); } [[maybe_unused]] static ColumnPtr dispatchForColumns(Block & block, const ColumnNumbers & arguments, const String & function_name) { const auto & time_column = block.getByPosition(arguments[0]); const auto which_type = WhichDataType(time_column.type); ColumnPtr result_column_; if (arguments.size() == 1) { if (which_type.isUInt32()) return time_column.column; else //isTuple result_column_ = time_column.column; } else result_column_ = WindowImpl::dispatchForColumns(block, arguments, function_name); return executeWindowBound(result_column_, 1, function_name); } }; }; template class FunctionWindow : public IFunction { public: static constexpr auto name = WindowImpl::name; static FunctionPtr create(const Context &) { return std::make_shared(); } String getName() const override { return name; } bool isVariadic() const override { return true; } size_t getNumberOfArguments() const override { return 0; } bool useDefaultImplementationForConstants() const override { return true; } ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {1, 2, 3}; } DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override { return WindowImpl::getReturnType(arguments, name); } void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/) override { auto result_column = WindowImpl::dispatchForColumns(block, arguments, name); block.getByPosition(result).column = std::move(result_column); } }; using FunctionTumble = FunctionWindow; using FunctionTumbleStart = FunctionWindow; using FunctionTumbleEnd = FunctionWindow; using FunctionHop = FunctionWindow; using FunctionWindowId = FunctionWindow; using FunctionHopStart = FunctionWindow; using FunctionHopEnd = FunctionWindow; }