ClickHouse/dbms/include/DB/Functions/FunctionsRound.h
proller 8b4b1534e3 Cmake: Test on having sse (#405)
* Cmake: Test on having sse

* fix

* wip

* wip

* wip

* wip

* wip

* wip

* fix

* fix

* fix

* fix

* fix

* fix
2017-01-27 23:55:33 +04:00

1168 lines
34 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#pragma once
#include <DB/Functions/FunctionsArithmetic.h>
#include <cmath>
#include <type_traits>
#include <array>
#if __SSE4_1__
#include <smmintrin.h>
#endif
namespace DB
{
/** Функции округления:
* roundToExp2 - вниз до ближайшей степени двойки;
* roundDuration - вниз до ближайшего из: 0, 1, 10, 30, 60, 120, 180, 240, 300, 600, 1200, 1800, 3600, 7200, 18000, 36000;
* roundAge - вниз до ближайшего из: 0, 18, 25, 35, 45.
*
* round(x, N) - арифметическое округление (N = 0 по умолчанию).
* ceil(x, N) - наименьшее число, которое не меньше x (N = 0 по умолчанию).
* floor(x, N) - наибольшее число, которое не больше x (N = 0 по умолчанию).
*
* Значение параметра N:
* - N > 0: округлять до числа с N десятичными знаками после запятой
* - N < 0: окурглять до целого числа с N нулевыми знаками
* - N = 0: округлять до целого числа
*/
template<typename A>
struct RoundToExp2Impl
{
using ResultType = A;
static inline A apply(A x)
{
return x <= 0 ? static_cast<A>(0) : (static_cast<A>(1) << static_cast<UInt64>(log2(static_cast<double>(x))));
}
};
template<>
struct RoundToExp2Impl<Float32>
{
using ResultType = Float32;
static inline Float32 apply(Float32 x)
{
return static_cast<Float32>(x < 1 ? 0. : pow(2., floor(log2(x))));
}
};
template<>
struct RoundToExp2Impl<Float64>
{
using ResultType = Float64;
static inline Float64 apply(Float64 x)
{
return x < 1 ? 0. : pow(2., floor(log2(x)));
}
};
template<typename A>
struct RoundDurationImpl
{
using ResultType = UInt16;
static inline ResultType apply(A x)
{
return x < 1 ? 0
: (x < 10 ? 1
: (x < 30 ? 10
: (x < 60 ? 30
: (x < 120 ? 60
: (x < 180 ? 120
: (x < 240 ? 180
: (x < 300 ? 240
: (x < 600 ? 300
: (x < 1200 ? 600
: (x < 1800 ? 1200
: (x < 3600 ? 1800
: (x < 7200 ? 3600
: (x < 18000 ? 7200
: (x < 36000 ? 18000
: 36000))))))))))))));
}
};
template<typename A>
struct RoundAgeImpl
{
using ResultType = UInt8;
static inline ResultType apply(A x)
{
return x < 1 ? 0
: (x < 18 ? 17
: (x < 25 ? 18
: (x < 35 ? 25
: (x < 45 ? 35
: 45))));
}
};
/** Быстрое вычисление остатка от деления для применения к округлению целых чисел.
* Без проверки, потому что делитель всегда положительный.
*/
template<typename T, typename Enable = void>
struct FastModulo;
template<typename T>
struct FastModulo<T, typename std::enable_if<std::is_integral<T>::value>::type>
{
private:
template<typename InputType, typename Enable = void>
struct Extend;
template<typename InputType>
struct Extend<InputType,
typename std::enable_if<std::is_same<InputType, Int8>::value
|| std::is_same<InputType, Int16>::value>::type>
{
using Type = Int64;
};
template<typename InputType>
struct Extend<InputType,
typename std::enable_if<std::is_same<InputType, UInt8>::value
|| std::is_same<InputType, UInt16>::value>::type>
{
using Type = UInt64;
};
template<typename InputType>
struct Extend<InputType,
typename std::enable_if<std::is_integral<InputType>::value
&& (sizeof(InputType) >= 4)>::type>
{
using Type = InputType;
};
using U = typename Extend<T>::Type;
public:
using Divisor = std::pair<size_t, typename libdivide::divider<U> >;
static inline Divisor prepare(size_t b)
{
return std::make_pair(b, libdivide::divider<U>(b));
}
static inline T compute(T a, const Divisor & divisor)
{
U val = static_cast<U>(a);
U rem = val - (val / divisor.second) * static_cast<U>(divisor.first);
return static_cast<T>(rem);
}
};
/** Этот параметр контролирует поведение функций округления.
*/
enum ScaleMode
{
PositiveScale, // округлять до числа с N десятичными знаками после запятой
NegativeScale, // окурглять до целого числа с N нулевыми знаками
ZeroScale, // округлять до целого числа
NullScale // возвращать нулевое значение
};
#if !defined(_MM_FROUND_NINT)
#define _MM_FROUND_NINT 0
#define _MM_FROUND_FLOOR 1
#define _MM_FROUND_CEIL 2
#endif
/** Реализация низкоуровневых функций округления для целочисленных значений.
*/
template<typename T, int rounding_mode, ScaleMode scale_mode, typename Enable = void>
struct IntegerRoundingComputation;
template<typename T, int rounding_mode, ScaleMode scale_mode>
struct IntegerRoundingComputation<T, rounding_mode, scale_mode,
typename std::enable_if<std::is_integral<T>::value
&& ((scale_mode == PositiveScale) || (scale_mode == ZeroScale))>::type>
{
using Divisor = int;
static inline Divisor prepare(size_t scale)
{
return 0;
}
static inline T compute(T in, const Divisor & scale)
{
return in;
}
};
template<typename T>
struct IntegerRoundingComputation<T, _MM_FROUND_NINT, NegativeScale,
typename std::enable_if<std::is_integral<T>::value>::type>
{
using Op = FastModulo<T>;
using Divisor = typename Op::Divisor;
static inline Divisor prepare(size_t scale)
{
return Op::prepare(scale);
}
static inline T compute(T in, const Divisor & scale)
{
T factor = (in < 0) ? -1 : 1;
in *= factor;
T rem = Op::compute(in, scale);
in -= rem;
T res;
if ((2 * rem) < static_cast<T>(scale.first))
res = in;
else
res = in + scale.first;
return factor * res;
}
};
template<typename T>
struct IntegerRoundingComputation<T, _MM_FROUND_CEIL, NegativeScale,
typename std::enable_if<std::is_integral<T>::value>::type>
{
using Op = FastModulo<T>;
using Divisor = typename Op::Divisor;
static inline Divisor prepare(size_t scale)
{
return Op::prepare(scale);
}
static inline T compute(T in, const Divisor & scale)
{
T factor = (in < 0) ? -1 : 1;
in *= factor;
T rem = Op::compute(in, scale);
T res = in - rem + scale.first;
return factor * res;
}
};
template<typename T>
struct IntegerRoundingComputation<T, _MM_FROUND_FLOOR, NegativeScale,
typename std::enable_if<std::is_integral<T>::value>::type>
{
using Op = FastModulo<T>;
using Divisor = typename Op::Divisor;
static inline Divisor prepare(size_t scale)
{
return Op::prepare(scale);
}
static inline T compute(T in, const Divisor & scale)
{
T factor = (in < 0) ? -1 : 1;
in *= factor;
T rem = Op::compute(in, scale);
T res = in - rem;
return factor * res;
}
};
#if __SSE4_1__
template <typename T>
class BaseFloatRoundingComputation;
template <>
class BaseFloatRoundingComputation<Float32>
{
public:
using Scale = __m128;
static const size_t data_count = 4;
protected:
/// Предотвратить появление отрицательных нолей определённых в стандарте IEEE-754.
static inline void normalize(__m128 & val, const __m128 & mask)
{
__m128 mask1 = _mm_cmpeq_ps(val, getZero());
__m128 mask2 = _mm_and_ps(mask, mask1);
mask2 = _mm_cmpeq_ps(mask2, getZero());
mask2 = _mm_min_ps(mask2, getTwo());
mask2 = _mm_sub_ps(mask2, getOne());
val = _mm_mul_ps(val, mask2);
}
static inline const __m128 & getZero()
{
static const __m128 zero = _mm_set1_ps(0.0);
return zero;
}
static inline const __m128 & getOne()
{
static const __m128 one = _mm_set1_ps(1.0);
return one;
}
static inline const __m128 & getTwo()
{
static const __m128 two = _mm_set1_ps(2.0);
return two;
}
};
template <>
class BaseFloatRoundingComputation<Float64>
{
public:
using Scale = __m128d;
static const size_t data_count = 2;
protected:
/// Предотвратить появление отрицательных нолей определённых в стандарте IEEE-754.
static inline void normalize(__m128d & val, const __m128d & mask)
{
__m128d mask1 = _mm_cmpeq_pd(val, getZero());
__m128d mask2 = _mm_and_pd(mask, mask1);
mask2 = _mm_cmpeq_pd(mask2, getZero());
mask2 = _mm_min_pd(mask2, getTwo());
mask2 = _mm_sub_pd(mask2, getOne());
val = _mm_mul_pd(val, mask2);
}
static inline const __m128d & getZero()
{
static const __m128d zero = _mm_set1_pd(0.0);
return zero;
}
static inline const __m128d & getOne()
{
static const __m128d one = _mm_set1_pd(1.0);
return one;
}
static inline const __m128d & getTwo()
{
static const __m128d two = _mm_set1_pd(2.0);
return two;
}
};
/** Реализация низкоуровневых функций округления для значений с плавающей точкой.
*/
template<typename T, int rounding_mode, ScaleMode scale_mode>
class FloatRoundingComputation;
template<int rounding_mode>
class FloatRoundingComputation<Float32, rounding_mode, PositiveScale>
: public BaseFloatRoundingComputation<Float32>
{
public:
static inline void prepare(size_t scale, Scale & mm_scale)
{
Float32 fscale = static_cast<Float32>(scale);
mm_scale = _mm_load1_ps(&fscale);
}
static inline void compute(const Float32 * __restrict in, const Scale & scale, Float32 * __restrict out)
{
__m128 val = _mm_loadu_ps(in);
__m128 mask = _mm_cmplt_ps(val, getZero());
/// Алгоритм округления.
val = _mm_mul_ps(val, scale);
val = _mm_round_ps(val, rounding_mode);
val = _mm_div_ps(val, scale);
normalize(val, mask);
_mm_storeu_ps(out, val);
}
};
template<int rounding_mode>
class FloatRoundingComputation<Float32, rounding_mode, NegativeScale>
: public BaseFloatRoundingComputation<Float32>
{
public:
static inline void prepare(size_t scale, Scale & mm_scale)
{
Float32 fscale = static_cast<Float32>(scale);
mm_scale = _mm_load1_ps(&fscale);
}
static inline void compute(const Float32 * __restrict in, const Scale & scale, Float32 * __restrict out)
{
__m128 val = _mm_loadu_ps(in);
__m128 mask = _mm_cmplt_ps(val, getZero());
/// Превратить отрицательные значения в положительные.
__m128 factor = _mm_cmpge_ps(val, getZero());
factor = _mm_min_ps(factor, getTwo());
factor = _mm_sub_ps(factor, getOne());
val = _mm_mul_ps(val, factor);
/// Алгоритм округления.
val = _mm_div_ps(val, scale);
__m128 res = _mm_cmpge_ps(val, getOneTenth());
val = _mm_round_ps(val, rounding_mode);
val = _mm_mul_ps(val, scale);
val = _mm_and_ps(val, res);
/// Вернуть настоящие знаки всех значений.
val = _mm_mul_ps(val, factor);
normalize(val, mask);
_mm_storeu_ps(out, val);
}
private:
static inline const __m128 & getOneTenth()
{
static const __m128 one_tenth = _mm_set1_ps(0.1);
return one_tenth;
}
};
template<int rounding_mode>
class FloatRoundingComputation<Float32, rounding_mode, ZeroScale>
: public BaseFloatRoundingComputation<Float32>
{
public:
static inline void prepare(size_t scale, Scale & mm_scale)
{
}
static inline void compute(const Float32 * __restrict in, const Scale & scale, Float32 * __restrict out)
{
__m128 val = _mm_loadu_ps(in);
__m128 mask = _mm_cmplt_ps(val, getZero());
val = _mm_round_ps(val, rounding_mode);
normalize(val, mask);
_mm_storeu_ps(out, val);
}
};
template<int rounding_mode>
class FloatRoundingComputation<Float64, rounding_mode, PositiveScale>
: public BaseFloatRoundingComputation<Float64>
{
public:
static inline void prepare(size_t scale, Scale & mm_scale)
{
Float64 fscale = static_cast<Float64>(scale);
mm_scale = _mm_load1_pd(&fscale);
}
static inline void compute(const Float64 * __restrict in, const Scale & scale, Float64 * __restrict out)
{
__m128d val = _mm_loadu_pd(in);
__m128d mask = _mm_cmplt_pd(val, getZero());
/// Алгоритм округления.
val = _mm_mul_pd(val, scale);
val = _mm_round_pd(val, rounding_mode);
val = _mm_div_pd(val, scale);
normalize(val, mask);
_mm_storeu_pd(out, val);
}
};
template<int rounding_mode>
class FloatRoundingComputation<Float64, rounding_mode, NegativeScale>
: public BaseFloatRoundingComputation<Float64>
{
public:
static inline void prepare(size_t scale, Scale & mm_scale)
{
Float64 fscale = static_cast<Float64>(scale);
mm_scale = _mm_load1_pd(&fscale);
}
static inline void compute(const Float64 * __restrict in, const Scale & scale, Float64 * __restrict out)
{
__m128d val = _mm_loadu_pd(in);
__m128d mask = _mm_cmplt_pd(val, getZero());
/// Превратить отрицательные значения в положительные.
__m128d factor = _mm_cmpge_pd(val, getZero());
factor = _mm_min_pd(factor, getTwo());
factor = _mm_sub_pd(factor, getOne());
val = _mm_mul_pd(val, factor);
/// Алгоритм округления.
val = _mm_div_pd(val, scale);
__m128d res = _mm_cmpge_pd(val, getOneTenth());
val = _mm_round_pd(val, rounding_mode);
val = _mm_mul_pd(val, scale);
val = _mm_and_pd(val, res);
/// Вернуть настоящие знаки всех значений.
val = _mm_mul_pd(val, factor);
normalize(val, mask);
_mm_storeu_pd(out, val);
}
private:
static inline const __m128d & getOneTenth()
{
static const __m128d one_tenth = _mm_set1_pd(0.1);
return one_tenth;
}
};
template<int rounding_mode>
class FloatRoundingComputation<Float64, rounding_mode, ZeroScale>
: public BaseFloatRoundingComputation<Float64>
{
public:
static inline void prepare(size_t scale, Scale & mm_scale)
{
}
static inline void compute(const Float64 * __restrict in, const Scale & scale, Float64 * __restrict out)
{
__m128d val = _mm_loadu_pd(in);
__m128d mask = _mm_cmplt_pd(val, getZero());
val = _mm_round_pd(val, rounding_mode);
normalize(val, mask);
_mm_storeu_pd(out, val);
}
};
#else
/// Реализация для ARM. Не векторизована. Не исправляет отрицательные нули.
template <int mode>
float roundWithMode(float x)
{
if (mode == _MM_FROUND_NINT) return roundf(x);
if (mode == _MM_FROUND_FLOOR) return floorf(x);
if (mode == _MM_FROUND_CEIL) return ceilf(x);
__builtin_unreachable();
}
template <int mode>
double roundWithMode(double x)
{
if (mode == _MM_FROUND_NINT) return round(x);
if (mode == _MM_FROUND_FLOOR) return floor(x);
if (mode == _MM_FROUND_CEIL) return ceil(x);
__builtin_unreachable();
}
template <typename T>
class BaseFloatRoundingComputation
{
public:
using Scale = T;
static const size_t data_count = 1;
static inline void prepare(size_t scale, Scale & mm_scale)
{
mm_scale = static_cast<T>(scale);
}
};
template <typename T, int rounding_mode, ScaleMode scale_mode>
class FloatRoundingComputation;
template <typename T, int rounding_mode>
class FloatRoundingComputation<T, rounding_mode, PositiveScale>
: public BaseFloatRoundingComputation<T>
{
public:
static inline void compute(const T * __restrict in, const T & scale, T * __restrict out)
{
out[0] = roundWithMode<rounding_mode>(in[0] * scale) / scale;
}
};
template <typename T, int rounding_mode>
class FloatRoundingComputation<T, rounding_mode, NegativeScale>
: public BaseFloatRoundingComputation<T>
{
public:
static inline void compute(const T * __restrict in, const T & scale, T * __restrict out)
{
out[0] = roundWithMode<rounding_mode>(in[0] / scale) * scale;
}
};
template <typename T, int rounding_mode>
class FloatRoundingComputation<T, rounding_mode, ZeroScale>
: public BaseFloatRoundingComputation<T>
{
public:
static inline void prepare(size_t scale, T & mm_scale)
{
}
static inline void compute(const T * __restrict in, const T & scale, T * __restrict out)
{
out[0] = roundWithMode<rounding_mode>(in[0]);
}
};
#endif
/** Реализация высокоуровневых функций округления.
*/
template<typename T, int rounding_mode, ScaleMode scale_mode, typename Enable = void>
struct FunctionRoundingImpl;
/** Реализация высокоуровневых функций округления для целочисленных значений.
*/
template<typename T, int rounding_mode, ScaleMode scale_mode>
struct FunctionRoundingImpl<T, rounding_mode, scale_mode,
typename std::enable_if<std::is_integral<T>::value && (scale_mode != NullScale)>::type>
{
private:
using Op = IntegerRoundingComputation<T, rounding_mode, scale_mode>;
public:
static inline void apply(const PaddedPODArray<T> & in, size_t scale, typename ColumnVector<T>::Container_t & out)
{
auto divisor = Op::prepare(scale);
const T* begin_in = &in[0];
const T* end_in = begin_in + in.size();
T* __restrict p_out = &out[0];
for (const T* __restrict p_in = begin_in; p_in != end_in; ++p_in)
{
*p_out = Op::compute(*p_in, divisor);
++p_out;
}
}
static inline T apply(T val, size_t scale)
{
auto divisor = Op::prepare(scale);
return Op::compute(val, divisor);
}
};
/** Реализация высокоуровневых функций округления для значений с плавающей точкой.
*/
template<typename T, int rounding_mode, ScaleMode scale_mode>
struct FunctionRoundingImpl<T, rounding_mode, scale_mode,
typename std::enable_if<std::is_floating_point<T>::value && (scale_mode != NullScale)>::type>
{
private:
using Op = FloatRoundingComputation<T, rounding_mode, scale_mode>;
using Data = std::array<T, Op::data_count>;
using Scale = typename Op::Scale;
public:
static inline void apply(const PaddedPODArray<T> & in, size_t scale, typename ColumnVector<T>::Container_t & out)
{
Scale mm_scale;
Op::prepare(scale, mm_scale);
const size_t data_count = std::tuple_size<Data>();
const T* begin_in = &in[0];
const T* end_in = begin_in + in.size();
T* begin_out = &out[0];
const T* end_out = begin_out + out.size();
const T* limit = begin_in + in.size() / data_count * data_count;
const T* __restrict p_in = begin_in;
T* __restrict p_out = begin_out;
for (; p_in < limit; p_in += data_count)
{
Op::compute(p_in, mm_scale, p_out);
p_out += data_count;
}
if (p_in < end_in)
{
Data tmp{{}};
T* begin_tmp = &tmp[0];
const T* end_tmp = begin_tmp + data_count;
for (T* __restrict p_tmp = begin_tmp; (p_tmp != end_tmp) && (p_in != end_in); ++p_tmp)
{
*p_tmp = *p_in;
++p_in;
}
Data res;
const T* begin_res = &res[0];
const T* end_res = begin_res + data_count;
Op::compute(reinterpret_cast<T *>(&tmp), mm_scale, reinterpret_cast<T *>(&res));
for (const T* __restrict p_res = begin_res; (p_res != end_res) && (p_out != end_out); ++p_res)
{
*p_out = *p_res;
++p_out;
}
}
}
static inline T apply(T val, size_t scale)
{
if (val == 0)
return val;
else
{
Scale mm_scale;
Op::prepare(scale, mm_scale);
Data tmp{{}};
tmp[0] = val;
Data res;
Op::compute(reinterpret_cast<T *>(&tmp), mm_scale, reinterpret_cast<T *>(&res));
return res[0];
}
}
};
/** Реализация высокоуровневых функций округления в том случае, когда возвращается нулевое значение.
*/
template<typename T, int rounding_mode, ScaleMode scale_mode>
struct FunctionRoundingImpl<T, rounding_mode, scale_mode,
typename std::enable_if<scale_mode == NullScale>::type>
{
public:
static inline void apply(const PaddedPODArray<T> & in, size_t scale, typename ColumnVector<T>::Container_t & out)
{
::memset(reinterpret_cast<T *>(&out[0]), 0, in.size() * sizeof(T));
}
static inline T apply(T val, size_t scale)
{
return 0;
}
};
/// Следующий код генерирует во время сборки таблицу степеней числа 10.
namespace
{
/// Отдельные степени числа 10.
template<size_t N>
struct PowerOf10
{
static const size_t value = 10 * PowerOf10<N - 1>::value;
};
template<>
struct PowerOf10<0>
{
static const size_t value = 1;
};
}
/// Объявление и определение контейнера содержащего таблицу степеней числа 10.
template<size_t... TArgs>
struct TableContainer
{
static const std::array<size_t, sizeof...(TArgs)> values;
};
template<size_t... TArgs>
const std::array<size_t, sizeof...(TArgs)> TableContainer<TArgs...>::values {{ TArgs... }};
/// Генератор первых N степеней.
template<size_t N, size_t... TArgs>
struct FillArrayImpl
{
using result = typename FillArrayImpl<N - 1, PowerOf10<N>::value, TArgs...>::result;
};
template<size_t... TArgs>
struct FillArrayImpl<0, TArgs...>
{
using result = TableContainer<PowerOf10<0>::value, TArgs...>;
};
template<size_t N>
struct FillArray
{
using result = typename FillArrayImpl<N - 1>::result;
};
/** Этот шаблон определяет точность, которую используют функции round/ceil/floor,
* затем преобразовывает её в значение, которое можно использовать в операциях
* умножения и деления. Поэтому оно называется масштабом.
*/
template<typename T, typename U, typename Enable = void>
struct ScaleForRightType;
template<typename T, typename U>
struct ScaleForRightType<T, U,
typename std::enable_if<
std::is_floating_point<T>::value
&& std::is_signed<U>::value>::type>
{
static inline bool apply(const ColumnPtr & column, ScaleMode & scale_mode, size_t & scale)
{
using PowersOf10 = typename FillArray<std::numeric_limits<T>::digits10 + 1>::result;
using ColumnType = ColumnConst<U>;
const ColumnType * precision_col = typeid_cast<const ColumnType *>(&*column);
if (precision_col == nullptr)
return false;
U val = precision_col->getData();
if (val < 0)
{
if (val < -static_cast<U>(std::numeric_limits<T>::digits10))
{
scale_mode = NullScale;
scale = 1;
}
else
{
scale_mode = NegativeScale;
scale = PowersOf10::values[-val];
}
}
else if (val == 0)
{
scale_mode = ZeroScale;
scale = 1;
}
else
{
scale_mode = PositiveScale;
if (val > std::numeric_limits<T>::digits10)
val = static_cast<U>(std::numeric_limits<T>::digits10);
scale = PowersOf10::values[val];
}
return true;
}
};
template<typename T, typename U>
struct ScaleForRightType<T, U,
typename std::enable_if<
std::is_floating_point<T>::value
&& std::is_unsigned<U>::value>::type>
{
static inline bool apply(const ColumnPtr & column, ScaleMode & scale_mode, size_t & scale)
{
using PowersOf10 = typename FillArray<std::numeric_limits<T>::digits10 + 1>::result;
using ColumnType = ColumnConst<U>;
const ColumnType * precision_col = typeid_cast<const ColumnType *>(&*column);
if (precision_col == nullptr)
return false;
U val = precision_col->getData();
if (val == 0)
{
scale_mode = ZeroScale;
scale = 1;
}
else
{
scale_mode = PositiveScale;
if (val > static_cast<U>(std::numeric_limits<T>::digits10))
val = static_cast<U>(std::numeric_limits<T>::digits10);
scale = PowersOf10::values[val];
}
return true;
}
};
template<typename T, typename U>
struct ScaleForRightType<T, U,
typename std::enable_if<
std::is_integral<T>::value
&& std::is_signed<U>::value>::type>
{
static inline bool apply(const ColumnPtr & column, ScaleMode & scale_mode, size_t & scale)
{
using PowersOf10 = typename FillArray<std::numeric_limits<T>::digits10 + 1>::result;
using ColumnType = ColumnConst<U>;
const ColumnType * precision_col = typeid_cast<const ColumnType *>(&*column);
if (precision_col == nullptr)
return false;
U val = precision_col->getData();
if (val < 0)
{
if (val < -std::numeric_limits<T>::digits10)
{
scale_mode = NullScale;
scale = 1;
}
else
{
scale_mode = NegativeScale;
scale = PowersOf10::values[-val];
}
}
else
{
scale_mode = ZeroScale;
scale = 1;
}
return true;
}
};
template<typename T, typename U>
struct ScaleForRightType<T, U,
typename std::enable_if<
std::is_integral<T>::value
&& std::is_unsigned<U>::value>::type>
{
static inline bool apply(const ColumnPtr & column, ScaleMode & scale_mode, size_t & scale)
{
using ColumnType = ColumnConst<U>;
const ColumnType * precision_col = typeid_cast<const ColumnType *>(&*column);
if (precision_col == nullptr)
return false;
scale_mode = ZeroScale;
scale = 1;
return true;
}
};
/** Превратить параметр точности в масштаб.
*/
template<typename T>
struct ScaleForLeftType
{
static inline void apply(const ColumnPtr & column, ScaleMode & scale_mode, size_t & scale)
{
if (!( ScaleForRightType<T, UInt8>::apply(column, scale_mode, scale)
|| ScaleForRightType<T, UInt16>::apply(column, scale_mode, scale)
|| ScaleForRightType<T, UInt16>::apply(column, scale_mode, scale)
|| ScaleForRightType<T, UInt32>::apply(column, scale_mode, scale)
|| ScaleForRightType<T, UInt64>::apply(column, scale_mode, scale)
|| ScaleForRightType<T, Int8>::apply(column, scale_mode, scale)
|| ScaleForRightType<T, Int16>::apply(column, scale_mode, scale)
|| ScaleForRightType<T, Int32>::apply(column, scale_mode, scale)
|| ScaleForRightType<T, Int64>::apply(column, scale_mode, scale)
|| ScaleForRightType<T, Float32>::apply(column, scale_mode, scale)
|| ScaleForRightType<T, Float64>::apply(column, scale_mode, scale)))
{
throw Exception("Internal error", ErrorCodes::LOGICAL_ERROR);
}
}
};
/** Главный шаблон применяющий функцию округления к значению или столбцу.
*/
template<typename T, int rounding_mode, ScaleMode scale_mode>
struct Cruncher
{
using Op = FunctionRoundingImpl<T, rounding_mode, scale_mode>;
static inline void apply(Block & block, ColumnVector<T> * col, const ColumnNumbers & arguments, size_t result, size_t scale)
{
auto col_res = std::make_shared<ColumnVector<T>>();
block.safeGetByPosition(result).column = col_res;
typename ColumnVector<T>::Container_t & vec_res = col_res->getData();
vec_res.resize(col->getData().size());
if (vec_res.empty())
return;
Op::apply(col->getData(), scale, vec_res);
}
static inline void apply(Block & block, ColumnConst<T> * col, const ColumnNumbers & arguments, size_t result, size_t scale)
{
T res = Op::apply(col->getData(), scale);
auto col_res = std::make_shared<ColumnConst<T>>(col->size(), res);
block.safeGetByPosition(result).column = col_res;
}
};
/** Выбрать подходящий алгоритм обработки в зависимости от масштаба.
*/
template<typename T, template <typename> class U, int rounding_mode>
struct Dispatcher
{
static inline void apply(Block & block, U<T> * col, const ColumnNumbers & arguments, size_t result)
{
ScaleMode scale_mode;
size_t scale;
if (arguments.size() == 2)
ScaleForLeftType<T>::apply(block.safeGetByPosition(arguments[1]).column, scale_mode, scale);
else
{
scale_mode = ZeroScale;
scale = 1;
}
if (scale_mode == PositiveScale)
Cruncher<T, rounding_mode, PositiveScale>::apply(block, col, arguments, result, scale);
else if (scale_mode == ZeroScale)
Cruncher<T, rounding_mode, ZeroScale>::apply(block, col, arguments, result, scale);
else if (scale_mode == NegativeScale)
Cruncher<T, rounding_mode, NegativeScale>::apply(block, col, arguments, result, scale);
else if (scale_mode == NullScale)
Cruncher<T, rounding_mode, NullScale>::apply(block, col, arguments, result, scale);
else
throw Exception("Illegal operation", ErrorCodes::LOGICAL_ERROR);
}
};
/** Шаблон для функций, которые округляют значение входного параметра типа
* (U)Int8/16/32/64 или Float32/64, и принимают дополнительный необязятельный
* параметр (по умолчанию - 0).
*/
template<typename Name, int rounding_mode>
class FunctionRounding : public IFunction
{
public:
static constexpr auto name = Name::name;
static FunctionPtr create(const Context & context) { return std::make_shared<FunctionRounding>(); }
private:
template<typename T>
bool checkType(const IDataType * type) const
{
return typeid_cast<const T *>(type) != nullptr;
}
template<typename T>
bool executeForType(Block & block, const ColumnNumbers & arguments, size_t result)
{
if (ColumnVector<T> * col = typeid_cast<ColumnVector<T> *>(block.safeGetByPosition(arguments[0]).column.get()))
{
Dispatcher<T, ColumnVector, rounding_mode>::apply(block, col, arguments, result);
return true;
}
else if (ColumnConst<T> * col = typeid_cast<ColumnConst<T> *>(block.safeGetByPosition(arguments[0]).column.get()))
{
Dispatcher<T, ColumnConst, rounding_mode>::apply(block, col, arguments, result);
return true;
}
else
return false;
}
public:
/// Получить имя функции.
String getName() const override
{
return name;
}
bool isVariadic() const override { return true; }
size_t getNumberOfArguments() const override { return 0; }
/// Получить типы результата по типам аргументов. Если функция неприменима для данных аргументов - кинуть исключение.
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if ((arguments.size() < 1) || (arguments.size() > 2))
throw Exception("Number of arguments for function " + getName() + " doesn't match: passed "
+ toString(arguments.size()) + ", should be 1 or 2.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
if (arguments.size() == 2)
{
const IDataType * type = &*arguments[1];
if (!( checkType<DataTypeUInt8>(type)
|| checkType<DataTypeUInt16>(type)
|| checkType<DataTypeUInt32>(type)
|| checkType<DataTypeUInt64>(type)
|| checkType<DataTypeInt8>(type)
|| checkType<DataTypeInt16>(type)
|| checkType<DataTypeInt32>(type)
|| checkType<DataTypeInt64>(type)
|| checkType<DataTypeFloat32>(type)
|| checkType<DataTypeFloat64>(type)))
{
throw Exception("Illegal type in second argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
}
const IDataType * type = &*arguments[0];
if (!type->behavesAsNumber())
throw Exception("Illegal type " + arguments[0]->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return arguments[0];
}
/// Выполнить функцию над блоком.
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result) override
{
if (!( executeForType<UInt8>(block, arguments, result)
|| executeForType<UInt16>(block, arguments, result)
|| executeForType<UInt32>(block, arguments, result)
|| executeForType<UInt64>(block, arguments, result)
|| executeForType<Int8>(block, arguments, result)
|| executeForType<Int16>(block, arguments, result)
|| executeForType<Int32>(block, arguments, result)
|| executeForType<Int64>(block, arguments, result)
|| executeForType<Float32>(block, arguments, result)
|| executeForType<Float64>(block, arguments, result)))
{
throw Exception("Illegal column " + block.safeGetByPosition(arguments[0]).column->getName()
+ " of argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
}
}
bool hasInformationAboutMonotonicity() const override
{
return true;
}
Monotonicity getMonotonicityForRange(const IDataType & type, const Field & left, const Field & right) const override
{
return { true };
}
};
struct NameRoundToExp2 { static constexpr auto name = "roundToExp2"; };
struct NameRoundDuration { static constexpr auto name = "roundDuration"; };
struct NameRoundAge { static constexpr auto name = "roundAge"; };
struct NameRound { static constexpr auto name = "round"; };
struct NameCeil { static constexpr auto name = "ceil"; };
struct NameFloor { static constexpr auto name = "floor"; };
using FunctionRoundToExp2 = FunctionUnaryArithmetic<RoundToExp2Impl, NameRoundToExp2, false>;
using FunctionRoundDuration = FunctionUnaryArithmetic<RoundDurationImpl, NameRoundDuration, false>;
using FunctionRoundAge = FunctionUnaryArithmetic<RoundAgeImpl, NameRoundAge, false>;
using FunctionRound = FunctionRounding<NameRound, _MM_FROUND_NINT>;
using FunctionFloor = FunctionRounding<NameFloor, _MM_FROUND_FLOOR>;
using FunctionCeil = FunctionRounding<NameCeil, _MM_FROUND_CEIL>;
struct PositiveMonotonicity
{
static bool has() { return true; }
static IFunction::Monotonicity get(const Field & left, const Field & right)
{
return { true };
}
};
template <> struct FunctionUnaryArithmeticMonotonicity<NameRoundToExp2> : PositiveMonotonicity {};
template <> struct FunctionUnaryArithmeticMonotonicity<NameRoundDuration> : PositiveMonotonicity {};
template <> struct FunctionUnaryArithmeticMonotonicity<NameRoundAge> : PositiveMonotonicity {};
}