From 57287a28c12f8b41521be1353463112a825408ef Mon Sep 17 00:00:00 2001 From: Alexey Arno Date: Mon, 25 May 2015 16:39:06 +0300 Subject: [PATCH] Merge --- dbms/include/DB/Functions/FunctionsRound.h | 683 +++++++++++++++------ 1 file changed, 490 insertions(+), 193 deletions(-) diff --git a/dbms/include/DB/Functions/FunctionsRound.h b/dbms/include/DB/Functions/FunctionsRound.h index ffe2769f24e..b96e3002f05 100644 --- a/dbms/include/DB/Functions/FunctionsRound.h +++ b/dbms/include/DB/Functions/FunctionsRound.h @@ -13,9 +13,14 @@ namespace DB * roundToExp2 - вниз до ближайшей степени двойки; * roundDuration - вниз до ближайшего из: 0, 1, 10, 30, 60, 120, 180, 240, 300, 600, 1200, 1800, 3600, 7200, 18000, 36000; * roundAge - вниз до ближайшего из: 0, 18, 25, 35, 45. - * round(x, N) - арифметическое округление (N - сколько знаков после запятой оставить; 0 по умолчанию). - * ceil(x, N) - наименьшее число, которое не меньше x (N - сколько знаков после запятой оставить; 0 по умолчанию). - * floor(x, N) - наибольшее число, которое не больше x (N - сколько знаков после запятой оставить; 0 по умолчанию). + * round(x, N) - арифметическое округление (N = 0 по умолчанию). + * ceil(x, N) - наименьшее число, которое не меньше x (N = 0 по умолчанию). + * floor(x, N) - наибольшее число, которое не больше x (N = 0 по умолчанию). + * + * Значение параметра N: + * - N > 0: округлять до числа с N десятичными знаками после запятой + * - N < 0: окурглять до целого числа с N нулевыми знаками + * - N = 0: округлять до целого числа */ template @@ -92,125 +97,275 @@ namespace DB } }; - /// Реализация функций округления на низком уровне. + /** Этот параметр контролирует поведение функций округления. + */ + enum ScaleMode + { + PositiveScale, // округлять до числа с N десятичными знаками после запятой + NegativeScale, // окурглять до целого числа с N нулевыми знаками + ZeroScale, // округлять до целого числа + NullScale // возвращать нулевое значение + }; - template - struct RoundingComputation + /** Реализация низкоуровневых функций округления для целочисленных значений. + */ + template + struct IntegerRoundingComputation { }; - template - struct RoundingComputation + template + struct IntegerRoundingComputation::value + && ((scale_mode == PositiveScale) || (scale_mode == ZeroScale))>::type> { - using Data = std::array; - using Scale = __m128; + static inline T compute(const T in, size_t scale) + { + return in; + } + }; - template - static inline void prepareScale(size_t scale, Scale & mm_scale, - typename std::enable_if::type * = nullptr) + template + struct IntegerRoundingComputation::value>::type> + { + static inline T compute(T in, size_t scale) + { + T rem = in % scale; + if (rem == in) + return 0; + + in -= rem; + if (static_cast(2 * rem) < scale) + return in; + else + return in + scale; + } + }; + + template + struct IntegerRoundingComputation::value>::type> + { + static inline T compute(const T in, size_t scale) + { + T rem = in % scale; + if (rem == in) + return 0; + + return in - rem + scale; + } + }; + + template + struct IntegerRoundingComputation::value>::type> + { + static inline T compute(const T in, size_t scale) + { + T rem = in % scale; + if (rem == in) + return 0; + + return in - rem; + } + }; + + template + struct BaseFloatRoundingComputation + { + }; + + template<> + struct BaseFloatRoundingComputation + { + using Scale = __m128; + static const size_t data_count = 4; + }; + + template<> + struct BaseFloatRoundingComputation + { + using Scale = __m128d; + static const size_t data_count = 2; + }; + + /** Реализация низкоуровневых функций округления для значений с плавающей точкой. + */ + template + struct FloatRoundingComputation : public BaseFloatRoundingComputation + { + }; + + template + struct FloatRoundingComputation + : public BaseFloatRoundingComputation + { + static inline void prepareScale(size_t scale, Scale & mm_scale) { Float32 fscale = static_cast(scale); mm_scale = _mm_load1_ps(&fscale); } - template - static inline void prepareScale(size_t scale, Scale & mm_scale, - typename std::enable_if::type * = nullptr) + static inline void compute(const Float32 * in, const Scale & scale, Float32 * out) { - } - - template - static inline void compute(const Data & in, const Scale & mm_scale, Data & out, - typename std::enable_if::type * = nullptr) - { - __m128 mm_value = _mm_loadu_ps(reinterpret_cast(&in)); - mm_value = _mm_mul_ps(mm_value, mm_scale); - mm_value = _mm_round_ps(mm_value, rounding_mode); - mm_value = _mm_div_ps(mm_value, mm_scale); - _mm_storeu_ps(reinterpret_cast(&out), mm_value); - } - - template - static inline void compute(const Data & in, const Scale & mm_scale, Data & out, - typename std::enable_if::type * = nullptr) - { - __m128 mm_value = _mm_loadu_ps(reinterpret_cast(&in)); - mm_value = _mm_round_ps(mm_value, rounding_mode); - _mm_storeu_ps(reinterpret_cast(&out), mm_value); + __m128 val = _mm_loadu_ps(in); + val = _mm_mul_ps(val, scale); + val = _mm_round_ps(val, rounding_mode); + val = _mm_div_ps(val, scale); + _mm_storeu_ps(out, val); } }; - template - struct RoundingComputation + template + struct FloatRoundingComputation + : public BaseFloatRoundingComputation { - using Data = std::array; - using Scale = __m128d; + static inline void prepareScale(size_t scale, Scale & mm_scale) + { + Float32 fscale = static_cast(scale); + mm_scale = _mm_load1_ps(&fscale); + } - template - static inline void prepareScale(size_t scale, Scale & mm_scale, - typename std::enable_if::type * = nullptr) + static inline void compute(const Float32 * in, const Scale & scale, Float32 * out) + { + __m128 val = _mm_loadu_ps(in); + val = _mm_div_ps(val, scale); + __m128 res = _mm_cmpge_ps(val, getOne()); + val = _mm_round_ps(val, rounding_mode); + val = _mm_mul_ps(val, scale); + val = _mm_and_ps(val, res); + _mm_storeu_ps(out, val); + } + + private: + static inline const __m128 & getOne() + { + static const __m128 one = _mm_set1_ps(1.0); + return one; + } + }; + + template + struct FloatRoundingComputation + : public BaseFloatRoundingComputation + { + static inline void prepareScale(size_t scale, Scale & mm_scale) + { + } + + static inline void compute(const Float32 * in, const Scale & scale, Float32 * out) + { + __m128 val = _mm_loadu_ps(in); + val = _mm_round_ps(val, rounding_mode); + _mm_storeu_ps(out, val); + } + }; + + template + struct FloatRoundingComputation + : public BaseFloatRoundingComputation + { + static inline void prepareScale(size_t scale, Scale & mm_scale) { Float64 fscale = static_cast(scale); mm_scale = _mm_load1_pd(&fscale); } - template - static inline void prepareScale(size_t scale, Scale & mm_scale, - typename std::enable_if::type * = nullptr) + static inline void compute(const Float64 * in, const Scale & scale, Float64 * out) { - } - - template - static inline void compute(const Data & in, const Scale & mm_scale, Data & out, - typename std::enable_if::type * = nullptr) - { - __m128d mm_value = _mm_loadu_pd(reinterpret_cast(&in)); - mm_value = _mm_mul_pd(mm_value, mm_scale); - mm_value = _mm_round_pd(mm_value, rounding_mode); - mm_value = _mm_div_pd(mm_value, mm_scale); - _mm_storeu_pd(reinterpret_cast(&out), mm_value); - } - - template - static inline void compute(const Data & in, const Scale & mm_scale, Data & out, - typename std::enable_if::type * = nullptr) - { - __m128d mm_value = _mm_loadu_pd(reinterpret_cast(&in)); - mm_value = _mm_round_pd(mm_value, rounding_mode); - _mm_storeu_pd(reinterpret_cast(&out), mm_value); + __m128d val = _mm_loadu_pd(in); + val = _mm_mul_pd(val, scale); + val = _mm_round_pd(val, rounding_mode); + val = _mm_div_pd(val, scale); + _mm_storeu_pd(out, val); } }; - /// Реализация функций округления на высоком уровне. + template + struct FloatRoundingComputation + : public BaseFloatRoundingComputation + { + static inline void prepareScale(size_t scale, Scale & mm_scale) + { + Float64 fscale = static_cast(scale); + mm_scale = _mm_load1_pd(&fscale); + } - template + static inline void compute(const Float64 * in, const Scale & scale, Float64 * out) + { + __m128d val = _mm_loadu_pd(in); + val = _mm_div_pd(val, scale); + __m128d res = _mm_cmpge_pd(val, getOne()); + val = _mm_round_pd(val, rounding_mode); + val = _mm_mul_pd(val, scale); + val = _mm_and_pd(val, res); + _mm_storeu_pd(out, val); + } + + private: + static inline const __m128d & getOne() + { + static const __m128d one = _mm_set1_pd(1.0); + return one; + } + }; + + template + struct FloatRoundingComputation + : public BaseFloatRoundingComputation + { + static inline void prepareScale(size_t scale, Scale & mm_scale) + { + } + + static inline void compute(const Float64 * in, const Scale & scale, Float64 * out) + { + __m128d val = _mm_loadu_pd(in); + val = _mm_round_pd(val, rounding_mode); + _mm_storeu_pd(out, val); + } + }; + + /** Реализация высокоуровневых функций округления. + */ + template struct FunctionRoundingImpl { }; - /// В случае целочисленных значений не выполяется округления. - template - struct FunctionRoundingImpl::value>::type> + /** Реализация высокоуровневых функций округления для целочисленных значений. + */ + template + struct FunctionRoundingImpl::value && (scale_mode != NullScale)>::type> { + private: + using Op = IntegerRoundingComputation; + + public: static inline void apply(const PODArray & in, size_t scale, typename ColumnVector::Container_t & out) { size_t size = in.size(); for (size_t i = 0; i < size; ++i) - out[i] = in[i]; + out[i] = Op::compute(in[i], scale); } static inline T apply(T val, size_t scale) { - return val; + return Op::compute(val, scale); } }; - template - struct FunctionRoundingImpl::value>::type> + /** Реализация высокоуровневых функций округления для значений с плавающей точкой. + */ + template + struct FunctionRoundingImpl::value && (scale_mode != NullScale)>::type> { private: - using Op = RoundingComputation; - using Data = typename Op::Data; + using Op = FloatRoundingComputation; + using Data = std::array; using Scale = typename Op::Scale; public: @@ -220,32 +375,22 @@ namespace DB Op::prepareScale(scale, mm_scale); const size_t size = in.size(); - const size_t data_size = std::tuple_size(); + const size_t data_count = std::tuple_size(); size_t i; - for (i = 0; i < (size - data_size + 1); i += data_size) - { - Data tmp; - for (size_t j = 0; j < data_size; ++j) - tmp[j] = in[i + j]; - - Data res; - Op::compute(tmp, mm_scale, res); - - for (size_t j = 0; j < data_size; ++j) - out[i + j] = res[j]; - } + for (i = 0; i < (size - data_count + 1); i += data_count) + Op::compute(reinterpret_cast(&in[i]), mm_scale, reinterpret_cast(&out[i])); if (i < size) { Data tmp{0}; - for (size_t j = 0; (j < data_size) && ((i + j) < size); ++j) + for (size_t j = 0; (j < data_count) && ((i + j) < size); ++j) tmp[j] = in[i + j]; Data res; - Op::compute(tmp, mm_scale, res); + Op::compute(reinterpret_cast(&tmp), mm_scale, reinterpret_cast(&res)); - for (size_t j = 0; (j < data_size) && ((i + j) < size); ++j) + for (size_t j = 0; (j < data_count) && ((i + j) < size); ++j) out[i + j] = res[j]; } } @@ -263,50 +408,29 @@ namespace DB tmp[0] = val; Data res; - Op::compute(tmp, mm_scale, res); + Op::compute(reinterpret_cast(&tmp), mm_scale, reinterpret_cast(&res)); return res[0]; } } }; - template - struct PrecisionForType + /** Реализация высокоуровневых функций округления в том случае, когда возвращается нулевое значение. + */ + template + struct FunctionRoundingImpl::type> { - template - static inline bool apply(const ColumnPtr & column, UInt8 & precision, - typename std::enable_if::value>::type * = nullptr) + public: + static inline void apply(const PODArray & in, size_t scale, typename ColumnVector::Container_t & out) { - using ColumnType = ColumnConst; - - const ColumnType * precision_col = typeid_cast(&*column); - if (precision_col == nullptr) - return false; - - U val = precision_col->getData(); - if (val < 0) - val = 0; - else if (val >= static_cast(std::numeric_limits::digits10)) - val = static_cast(std::numeric_limits::digits10); - - precision = static_cast(val); - - return true; + size_t size = in.size(); + for (size_t i = 0; i < size; ++i) + out[i] = 0; } - /// Для целых чисел точность не имеет значения. - template - static inline bool apply(const ColumnPtr & column, UInt8 & precision, - typename std::enable_if::value>::type * = nullptr) + static inline T apply(T val, size_t scale) { - using ColumnType = ColumnConst; - - const ColumnType * precision_col = typeid_cast(&*column); - if (precision_col == nullptr) - return false; - - precision = 0; - - return true; + return 0; } }; @@ -360,10 +484,238 @@ namespace using result = typename FillArrayImpl::result; }; - /** Шаблон для функций, которые вычисляют приближенное значение входного параметра - * типа (U)Int8/16/32/64 или Float32/64 и принимают дополнительный необязятельный - * параметр указывающий сколько знаков после запятой оставить (по умолчанию - 0). - * Op - функция (round/floor/ceil) + /** Этот шаблон определяет точность, которую используют функции round/ceil/floor, + * затем преобразовывает её в значение, которое можно использовать в операциях + * умножения и деления. Поэтому оно называется масштабом. + */ + template + struct ScaleForRightType + { + }; + + template + struct ScaleForRightType::value + && std::is_signed::value>::type> + { + static inline bool apply(const ColumnPtr & column, ScaleMode & scale_mode, size_t & scale) + { + using PowersOf10 = typename FillArray::digits10 + 1>::result; + using ColumnType = ColumnConst; + + const ColumnType * precision_col = typeid_cast(&*column); + if (precision_col == nullptr) + return false; + + U val = precision_col->getData(); + if (val < 0) + { + if (val < -static_cast(std::numeric_limits::digits10)) + { + scale_mode = NullScale; + scale = 1; + } + else + { + scale_mode = NegativeScale; + scale = PowersOf10::values[-val]; + } + } + else if (val == 0) + { + scale_mode = ZeroScale; + scale = 1; + } + else + { + scale_mode = PositiveScale; + if (val > std::numeric_limits::digits10) + val = static_cast(std::numeric_limits::digits10); + scale = PowersOf10::values[val]; + } + + return true; + } + }; + + template + struct ScaleForRightType::value + && std::is_unsigned::value>::type> + { + static inline bool apply(const ColumnPtr & column, ScaleMode & scale_mode, size_t & scale) + { + using PowersOf10 = typename FillArray::digits10 + 1>::result; + using ColumnType = ColumnConst; + + const ColumnType * precision_col = typeid_cast(&*column); + if (precision_col == nullptr) + return false; + + U val = precision_col->getData(); + if (val == 0) + { + scale_mode = ZeroScale; + scale = 1; + } + else + { + scale_mode = PositiveScale; + if (val > static_cast(std::numeric_limits::digits10)) + val = static_cast(std::numeric_limits::digits10); + scale = PowersOf10::values[val]; + } + + return true; + } + }; + + template + struct ScaleForRightType::value + && std::is_signed::value>::type> + { + static inline bool apply(const ColumnPtr & column, ScaleMode & scale_mode, size_t & scale) + { + using PowersOf10 = typename FillArray::digits10 + 1>::result; + using ColumnType = ColumnConst; + + const ColumnType * precision_col = typeid_cast(&*column); + if (precision_col == nullptr) + return false; + + U val = precision_col->getData(); + if (val < 0) + { + if (val < -std::numeric_limits::digits10) + { + scale_mode = NullScale; + scale = 1; + } + else + { + scale_mode = NegativeScale; + scale = PowersOf10::values[-val]; + } + } + else + { + scale_mode = ZeroScale; + scale = 1; + } + + return true; + } + }; + + template + struct ScaleForRightType::value + && std::is_unsigned::value>::type> + { + static inline bool apply(const ColumnPtr & column, ScaleMode & scale_mode, size_t & scale) + { + using ColumnType = ColumnConst; + + const ColumnType * precision_col = typeid_cast(&*column); + if (precision_col == nullptr) + return false; + + scale_mode = ZeroScale; + scale = 1; + + return true; + } + }; + + /** Превратить параметр точности в масштаб. + */ + template + struct ScaleForLeftType + { + static inline void apply(const ColumnPtr & column, ScaleMode & scale_mode, size_t & scale) + { + if (!( ScaleForRightType::apply(column, scale_mode, scale) + || ScaleForRightType::apply(column, scale_mode, scale) + || ScaleForRightType::apply(column, scale_mode, scale) + || ScaleForRightType::apply(column, scale_mode, scale) + || ScaleForRightType::apply(column, scale_mode, scale) + || ScaleForRightType::apply(column, scale_mode, scale) + || ScaleForRightType::apply(column, scale_mode, scale) + || ScaleForRightType::apply(column, scale_mode, scale) + || ScaleForRightType::apply(column, scale_mode, scale) + || ScaleForRightType::apply(column, scale_mode, scale) + || ScaleForRightType::apply(column, scale_mode, scale))) + { + throw Exception("Internal error", ErrorCodes::LOGICAL_ERROR); + } + } + }; + + /** Главный шаблон применяющий функцию округления к значению или столбцу. + */ + template + struct Cruncher + { + using Op = FunctionRoundingImpl; + + static inline void apply(Block & block, ColumnVector * col, const ColumnNumbers & arguments, size_t result, size_t scale) + { + ColumnVector * col_res = new ColumnVector; + block.getByPosition(result).column = col_res; + + typename ColumnVector::Container_t & vec_res = col_res->getData(); + vec_res.resize(col->getData().size()); + + Op::apply(col->getData(), scale, vec_res); + } + + static inline void apply(Block & block, ColumnConst * col, const ColumnNumbers & arguments, size_t result, size_t scale) + { + T res = Op::apply(col->getData(), scale); + ColumnConst * col_res = new ColumnConst(col->size(), res); + block.getByPosition(result).column = col_res; + } + }; + + /** Выбрать подходящий алгоритм обработки в зависимости от масштаба. + */ + template class U, int rounding_mode> + struct Dispatcher + { + static inline void apply(Block & block, U * col, const ColumnNumbers & arguments, size_t result) + { + ScaleMode scale_mode; + size_t scale; + + if (arguments.size() == 2) + ScaleForLeftType::apply(block.getByPosition(arguments[1]).column, scale_mode, scale); + else + { + scale_mode = ZeroScale; + scale = 1; + } + + if (scale_mode == PositiveScale) + Cruncher::apply(block, col, arguments, result, scale); + else if (scale_mode == ZeroScale) + Cruncher::apply(block, col, arguments, result, scale); + else if (scale_mode == NegativeScale) + Cruncher::apply(block, col, arguments, result, scale); + else if (scale_mode == NullScale) + Cruncher::apply(block, col, arguments, result, scale); + else + throw Exception("Illegal operation", ErrorCodes::LOGICAL_ERROR); + } + }; + + /** Шаблон для функций, которые округляют значение входного параметра типа + * (U)Int8/16/32/64 или Float32/64, и принимают дополнительный необязятельный + * параметр (по умолчанию - 0). */ template class FunctionRounding : public IFunction @@ -372,9 +724,6 @@ namespace static constexpr auto name = Name::name; static IFunction * create(const Context & context) { return new FunctionRounding; } - private: - using PowersOf10 = FillArray::digits10 + 1>::result; - private: template bool checkType(const IDataType * type) const @@ -385,72 +734,18 @@ namespace template bool executeForType(Block & block, const ColumnNumbers & arguments, size_t result) { - using OpWithScale = FunctionRoundingImpl; - using OpWithoutScale = FunctionRoundingImpl; - if (ColumnVector * col = typeid_cast *>(&*block.getByPosition(arguments[0]).column)) { - ColumnVector * col_res = new ColumnVector; - block.getByPosition(result).column = col_res; - - typename ColumnVector::Container_t & vec_res = col_res->getData(); - vec_res.resize(col->getData().size()); - - UInt8 precision = 0; - if (arguments.size() == 2) - precision = getPrecision(block.getByPosition(arguments[1]).column); - - if (precision > 0) - OpWithScale::apply(col->getData(), PowersOf10::values[precision], vec_res); - else - OpWithoutScale::apply(col->getData(), 0, vec_res); - + Dispatcher::apply(block, col, arguments, result); return true; } else if (ColumnConst * col = typeid_cast *>(&*block.getByPosition(arguments[0]).column)) { - UInt8 precision = 0; - if (arguments.size() == 2) - precision = getPrecision(block.getByPosition(arguments[1]).column); - - T res; - if (precision > 0) - res = OpWithScale::apply(col->getData(), PowersOf10::values[precision]); - else - res = OpWithoutScale::apply(col->getData(), 0); - - ColumnConst * col_res = new ColumnConst(col->size(), res); - block.getByPosition(result).column = col_res; - + Dispatcher::apply(block, col, arguments, result); return true; } - - return false; - } - - /// В зависимости от входного параметра, определить какая нужна точность - /// для результата. - template - UInt8 getPrecision(const ColumnPtr & column) - { - UInt8 precision = 0; - - if (!( PrecisionForType::apply(column, precision) - || PrecisionForType::apply(column, precision) - || PrecisionForType::apply(column, precision) - || PrecisionForType::apply(column, precision) - || PrecisionForType::apply(column, precision) - || PrecisionForType::apply(column, precision) - || PrecisionForType::apply(column, precision) - || PrecisionForType::apply(column, precision) - || PrecisionForType::apply(column, precision))) - { - throw Exception("Illegal column " + column->getName() - + " of second ('precision') argument of function " + getName(), - ErrorCodes::ILLEGAL_COLUMN); - } - - return precision; + else + return false; } public: @@ -478,7 +773,9 @@ namespace || checkType(type) || checkType(type) || checkType(type) - || checkType(type))) + || checkType(type) + || checkType(type) + || checkType(type))) { throw Exception("Illegal type in second argument of function " + getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);