Fixed half of bad code [#CLICKHOUSE-2].

This commit is contained in:
Alexey Milovidov 2017-09-16 21:36:16 +03:00
parent 0002507598
commit ad13a8c79d
2 changed files with 150 additions and 638 deletions

View File

@ -19,7 +19,7 @@ namespace ErrorCodes
/** ColumnConst contains another column with single element,
* but looks like a column with arbitary amount of same elements.
*/
class ColumnConst : public IColumn
class ColumnConst final : public IColumn
{
private:
ColumnPtr data;

View File

@ -126,7 +126,6 @@ enum class ScaleMode
Positive, // round to a number with N decimal places after the decimal point
Negative, // round to an integer with N zero characters
Zero, // round to an integer
Null // return zero value
};
enum class RoundingMode
@ -150,6 +149,13 @@ enum class RoundingMode
template <typename T, RoundingMode rounding_mode, ScaleMode scale_mode>
struct IntegerRoundingComputation
{
static const size_t data_count = 1;
static size_t prepare(size_t scale)
{
return scale;
}
template <size_t scale>
static inline T computeImpl(T x)
{
@ -174,8 +180,6 @@ struct IntegerRoundingComputation
{
switch (scale_mode)
{
case ScaleMode::Null:
return 0;
case ScaleMode::Zero:
return x;
case ScaleMode::Positive:
@ -210,10 +214,17 @@ struct IntegerRoundingComputation
}
}
}
static inline void compute(const T * __restrict in, size_t scale, T * __restrict out)
{
*out = compute(*in, scale);
}
};
#if __SSE4_1__
template <typename T>
class BaseFloatRoundingComputation;
@ -221,26 +232,20 @@ template <>
class BaseFloatRoundingComputation<Float32>
{
public:
using Scale = __m128;
using ScalarType = Float32;
using VectorType = __m128;
static const size_t data_count = 4;
protected:
static inline const __m128 & getZero()
{
static const __m128 zero = _mm_set1_ps(0.0);
return zero;
}
static VectorType load(const ScalarType * in) { return _mm_loadu_ps(in); }
static VectorType load1(const ScalarType in) { return _mm_load1_ps(&in); }
static void store(ScalarType * out, VectorType val) { _mm_storeu_ps(out, val);}
static VectorType multiply(VectorType val, VectorType scale) { return _mm_mul_ps(val, scale); }
static VectorType divide(VectorType val, VectorType scale) { return _mm_div_ps(val, scale); }
template <RoundingMode mode> static VectorType apply(VectorType val) { return _mm_round_ps(val, int(mode)); }
static inline const __m128 & getOne()
static VectorType prepare(size_t scale)
{
static const __m128 one = _mm_set1_ps(1.0);
return one;
}
static inline const __m128 & getTwo()
{
static const __m128 two = _mm_set1_ps(2.0);
return two;
return load1(scale);
}
};
@ -248,625 +253,170 @@ template <>
class BaseFloatRoundingComputation<Float64>
{
public:
using Scale = __m128d;
using ScalarType = Float64;
using VectorType = __m128d;
static const size_t data_count = 2;
protected:
static inline const __m128d & getZero()
{
static const __m128d zero = _mm_set1_pd(0.0);
return zero;
}
static VectorType load(const ScalarType * in) { return _mm_loadu_pd(in); }
static VectorType load1(const ScalarType in) { return _mm_load1_pd(&in); }
static void store(ScalarType * out, VectorType val) { _mm_storeu_pd(out, val);}
static VectorType multiply(VectorType val, VectorType scale) { return _mm_mul_pd(val, scale); }
static VectorType divide(VectorType val, VectorType scale) { return _mm_div_pd(val, scale); }
template <RoundingMode mode> static VectorType apply(VectorType val) { return _mm_round_pd(val, int(mode)); }
static inline const __m128d & getOne()
static VectorType prepare(size_t scale)
{
static const __m128d one = _mm_set1_pd(1.0);
return one;
}
static inline const __m128d & getTwo()
{
static const __m128d two = _mm_set1_pd(2.0);
return two;
return load1(scale);
}
};
/** Implementation of low-level round-off functions for floating-point values.
*/
template <typename T, RoundingMode rounding_mode, ScaleMode scale_mode>
class FloatRoundingComputation;
template <RoundingMode rounding_mode>
class FloatRoundingComputation<Float32, rounding_mode, ScaleMode::Positive>
: public BaseFloatRoundingComputation<Float32>
{
public:
static inline void prepare(size_t scale, Scale & mm_scale)
{
Float32 fscale = static_cast<Float32>(scale);
mm_scale = _mm_load1_ps(&fscale);
}
static inline void compute(const Float32 * __restrict in, const Scale & scale, Float32 * __restrict out)
{
__m128 val = _mm_loadu_ps(in);
/// Rounding algorithm.
val = _mm_mul_ps(val, scale);
val = _mm_round_ps(val, int(rounding_mode));
val = _mm_div_ps(val, scale);
_mm_storeu_ps(out, val);
}
};
template <RoundingMode rounding_mode>
class FloatRoundingComputation<Float32, rounding_mode, ScaleMode::Negative>
: public BaseFloatRoundingComputation<Float32>
{
public:
static inline void prepare(size_t scale, Scale & mm_scale)
{
Float32 fscale = static_cast<Float32>(scale);
mm_scale = _mm_load1_ps(&fscale);
}
static inline void compute(const Float32 * __restrict in, const Scale & scale, Float32 * __restrict out)
{
__m128 val = _mm_loadu_ps(in);
/// Turn negative values into positive values.
__m128 sign = _mm_cmpge_ps(val, getZero());
sign = _mm_min_ps(sign, getTwo());
sign = _mm_sub_ps(sign, getOne());
val = _mm_mul_ps(val, sign);
/// Rounding algorithm.
val = _mm_div_ps(val, scale);
__m128 res = _mm_cmpge_ps(val, getOneTenth());
val = _mm_round_ps(val, int(rounding_mode));
val = _mm_mul_ps(val, scale);
val = _mm_and_ps(val, res);
/// Return the real signs of all values.
val = _mm_mul_ps(val, sign);
_mm_storeu_ps(out, val);
}
private:
static inline const __m128 & getOneTenth()
{
static const __m128 one_tenth = _mm_set1_ps(0.1);
return one_tenth;
}
};
template <RoundingMode rounding_mode>
class FloatRoundingComputation<Float32, rounding_mode, ScaleMode::Zero>
: public BaseFloatRoundingComputation<Float32>
{
public:
static inline void prepare(size_t scale, Scale & mm_scale)
{
}
static inline void compute(const Float32 * __restrict in, const Scale & scale, Float32 * __restrict out)
{
__m128 val = _mm_loadu_ps(in);
val = _mm_round_ps(val, int(rounding_mode));
_mm_storeu_ps(out, val);
}
};
template <RoundingMode rounding_mode>
class FloatRoundingComputation<Float64, rounding_mode, ScaleMode::Positive>
: public BaseFloatRoundingComputation<Float64>
{
public:
static inline void prepare(size_t scale, Scale & mm_scale)
{
Float64 fscale = static_cast<Float64>(scale);
mm_scale = _mm_load1_pd(&fscale);
}
static inline void compute(const Float64 * __restrict in, const Scale & scale, Float64 * __restrict out)
{
__m128d val = _mm_loadu_pd(in);
/// Rounding algorithm.
val = _mm_mul_pd(val, scale);
val = _mm_round_pd(val, int(rounding_mode));
val = _mm_div_pd(val, scale);
_mm_storeu_pd(out, val);
}
};
template <RoundingMode rounding_mode>
class FloatRoundingComputation<Float64, rounding_mode, ScaleMode::Negative>
: public BaseFloatRoundingComputation<Float64>
{
public:
static inline void prepare(size_t scale, Scale & mm_scale)
{
Float64 fscale = static_cast<Float64>(scale);
mm_scale = _mm_load1_pd(&fscale);
}
static inline void compute(const Float64 * __restrict in, const Scale & scale, Float64 * __restrict out)
{
__m128d val = _mm_loadu_pd(in);
/// Turn negative values into positive values.
__m128d sign = _mm_cmpge_pd(val, getZero());
sign = _mm_min_pd(sign, getTwo());
sign = _mm_sub_pd(sign, getOne());
val = _mm_mul_pd(val, sign);
/// Rounding algorithm.
val = _mm_div_pd(val, scale);
__m128d res = _mm_cmpge_pd(val, getOneTenth());
val = _mm_round_pd(val, int(rounding_mode));
val = _mm_mul_pd(val, scale);
val = _mm_and_pd(val, res);
/// Return the real signs of all values.
val = _mm_mul_pd(val, sign);
_mm_storeu_pd(out, val);
}
private:
static inline const __m128d & getOneTenth()
{
static const __m128d one_tenth = _mm_set1_pd(0.1);
return one_tenth;
}
};
template <RoundingMode rounding_mode>
class FloatRoundingComputation<Float64, rounding_mode, ScaleMode::Zero>
: public BaseFloatRoundingComputation<Float64>
{
public:
static inline void prepare(size_t scale, Scale & mm_scale)
{
}
static inline void compute(const Float64 * __restrict in, const Scale & scale, Float64 * __restrict out)
{
__m128d val = _mm_loadu_pd(in);
val = _mm_round_pd(val, int(rounding_mode));
_mm_storeu_pd(out, val);
}
};
#else
/// Implementation for ARM. Not vectorized. Does not fix negative zeros.
/// Implementation for ARM. Not vectorized.
template <RoundingMode rounding_mode> float roundWithMode(float x);
template <> float roundWithMode<RoundingMode::Round>(float x) { return roundf(x); }
template <> float roundWithMode<RoundingMode::Floor>(float x) { return floorf(x); }
template <> float roundWithMode<RoundingMode::Ceil>(float x) { return ceilf(x); }
template <> float roundWithMode<RoundingMode::Trunc>(float x) { return truncf(x); }
template <RoundingMode rounding_mode> double roundWithMode(double x);
template <> double roundWithMode<RoundingMode::Round>(double x) { return round(x); }
template <> double roundWithMode<RoundingMode::Floor>(double x) { return floor(x); }
template <> double roundWithMode<RoundingMode::Ceil>(double x) { return ceil(x); }
template <> double roundWithMode<RoundingMode::Trunc>(double x) { return trunc(x); }
inline float roundWithMode(float x, RoundingMode mode)
{
switch (mode)
{
case RoundingMode::Round: return roundf(x);
case RoundingMode::Floor: return floorf(x);
case RoundingMode::Ceil: return ceilf(x);
case RoundingMode::Trunc: return truncf(x);
}
}
inline double roundWithMode(double x, RoundingMode mode)
{
switch (mode)
{
case RoundingMode::Round: return round(x);
case RoundingMode::Floor: return floor(x);
case RoundingMode::Ceil: return ceil(x);
case RoundingMode::Trunc: return trunc(x);
}
}
template <typename T>
class BaseFloatRoundingComputation
{
public:
using Scale = T;
using ScalarType = T;
using VectorType = T;
static const size_t data_count = 1;
static inline void prepare(size_t scale, Scale & mm_scale)
static VectorType load(const ScalarType * in) { return *in; }
static VectorType load1(const ScalarType in) { return in; }
static VectorType store(ScalarType * out, ScalarType val) { return *out = val;}
static VectorType multiply(VectorType val, VectorType scale) { return val * scale; }
static VectorType divide(VectorType val, VectorType scale) { return val / scale; }
template <RoundingMode mode> static VectorType apply(VectorType val) { return roundWithMode(val, mode); }
static VectorType prepare(size_t scale)
{
mm_scale = static_cast<T>(scale);
return load1(scale);
}
};
template <typename T, RoundingMode rounding_mode, ScaleMode scale_mode>
class FloatRoundingComputation;
template <typename T, RoundingMode rounding_mode>
class FloatRoundingComputation<T, rounding_mode, ScaleMode::Positive>
: public BaseFloatRoundingComputation<T>
{
public:
static inline void compute(const T * __restrict in, const T & scale, T * __restrict out)
{
out[0] = roundWithMode<rounding_mode>(in[0] * scale) / scale;
}
};
template <typename T, RoundingMode rounding_mode>
class FloatRoundingComputation<T, rounding_mode, ScaleMode::Negative>
: public BaseFloatRoundingComputation<T>
{
public:
static inline void compute(const T * __restrict in, const T & scale, T * __restrict out)
{
out[0] = roundWithMode<rounding_mode>(in[0] / scale) * scale;
}
};
template <typename T, RoundingMode rounding_mode>
class FloatRoundingComputation<T, rounding_mode, ScaleMode::Zero>
: public BaseFloatRoundingComputation<T>
{
public:
static inline void prepare(size_t scale, T & mm_scale)
{
}
static inline void compute(const T * __restrict in, const T & scale, T * __restrict out)
{
out[0] = roundWithMode<rounding_mode>(in[0]);
}
};
#endif
/** Implementation of low-level round-off functions for floating-point values.
*/
template <typename T, RoundingMode rounding_mode, ScaleMode scale_mode>
class FloatRoundingComputation : public BaseFloatRoundingComputation<T>
{
using Base = BaseFloatRoundingComputation<T>;
public:
static inline void compute(const T * __restrict in, const typename Base::VectorType & scale, T * __restrict out)
{
auto val = Base::load(in);
if (scale_mode == ScaleMode::Positive)
val = Base::multiply(val, scale);
else if (scale_mode == ScaleMode::Negative)
val = Base::divide(val, scale);
val = Base::template apply<rounding_mode>(val);
if (scale_mode == ScaleMode::Positive)
val = Base::divide(val, scale);
else if (scale_mode == ScaleMode::Negative)
val = Base::multiply(val, scale);
Base::store(out, val);
}
};
/** Implementing high-level rounding functions.
*/
template <typename T, RoundingMode rounding_mode, ScaleMode scale_mode, typename Enable = void>
struct FunctionRoundingImpl;
/** Implement high-level rounding functions for integer values.
*/
template <typename T, RoundingMode rounding_mode, ScaleMode scale_mode>
struct FunctionRoundingImpl<T, rounding_mode, scale_mode,
typename std::enable_if<std::is_integral<T>::value && (scale_mode != ScaleMode::Null)>::type>
struct FunctionRoundingImpl
{
private:
using Op = IntegerRoundingComputation<T, rounding_mode, scale_mode>;
using Op = typename std::conditional<std::is_floating_point<T>::value,
FloatRoundingComputation<T, rounding_mode, scale_mode>,
IntegerRoundingComputation<T, rounding_mode, scale_mode>>::type;
public:
static inline void apply(const PaddedPODArray<T> & in, size_t scale, typename ColumnVector<T>::Container_t & out)
{
const T* begin_in = &in[0];
const T* end_in = begin_in + in.size();
T* __restrict p_out = &out[0];
for (const T* __restrict p_in = begin_in; p_in != end_in; ++p_in)
{
*p_out = Op::compute(*p_in, scale);
++p_out;
}
}
static inline T apply(T val, size_t scale)
{
return Op::compute(val, scale);
}
};
/** Implement high-level round-off functions for floating-point values.
*/
template <typename T, RoundingMode rounding_mode, ScaleMode scale_mode>
struct FunctionRoundingImpl<T, rounding_mode, scale_mode,
typename std::enable_if<std::is_floating_point<T>::value && (scale_mode != ScaleMode::Null)>::type>
{
private:
using Op = FloatRoundingComputation<T, rounding_mode, scale_mode>;
using Data = std::array<T, Op::data_count>;
using Scale = typename Op::Scale;
public:
static inline void apply(const PaddedPODArray<T> & in, size_t scale, typename ColumnVector<T>::Container_t & out)
static NO_INLINE void apply(const PaddedPODArray<T> & in, size_t scale, typename ColumnVector<T>::Container_t & out)
{
Scale mm_scale;
Op::prepare(scale, mm_scale);
auto mm_scale = Op::prepare(scale);
const size_t data_count = std::tuple_size<Data>();
const T* begin_in = &in[0];
const T* end_in = begin_in + in.size();
const T* end_in = in.data() + in.size();
const T* limit = in.data() + in.size() / data_count * data_count;
T* begin_out = &out[0];
const T* end_out = begin_out + out.size();
const T* __restrict p_in = in.data();
T* __restrict p_out = out.data();
const T* limit = begin_in + in.size() / data_count * data_count;
const T* __restrict p_in = begin_in;
T* __restrict p_out = begin_out;
for (; p_in < limit; p_in += data_count)
while (p_in < limit)
{
Op::compute(p_in, mm_scale, p_out);
p_in += data_count;
p_out += data_count;
}
if (p_in < end_in)
{
Data tmp{{}};
T* begin_tmp = &tmp[0];
const T* end_tmp = begin_tmp + data_count;
Data tmp_src{{}};
Data tmp_dst;
for (T* __restrict p_tmp = begin_tmp; (p_tmp != end_tmp) && (p_in != end_in); ++p_tmp)
{
*p_tmp = *p_in;
++p_in;
}
size_t tail_size_bytes = (end_in - p_in) * sizeof(*p_in);
Data res;
const T* begin_res = &res[0];
const T* end_res = begin_res + data_count;
Op::compute(reinterpret_cast<T *>(&tmp), mm_scale, reinterpret_cast<T *>(&res));
for (const T* __restrict p_res = begin_res; (p_res != end_res) && (p_out != end_out); ++p_res)
{
*p_out = *p_res;
++p_out;
}
}
}
static inline T apply(T val, size_t scale)
{
if (val == 0)
return val;
else
{
Scale mm_scale;
Op::prepare(scale, mm_scale);
Data tmp{{}};
tmp[0] = val;
Data res;
Op::compute(reinterpret_cast<T *>(&tmp), mm_scale, reinterpret_cast<T *>(&res));
return res[0];
memcpy(&tmp_src, p_in, tail_size_bytes);
Op::compute(reinterpret_cast<T *>(&tmp_src), mm_scale, reinterpret_cast<T *>(&tmp_dst));
memcpy(p_out, &tmp_dst, tail_size_bytes);
}
}
};
/** Implementation of high-level rounding functions in the case when a zero value is returned.
*/
template <typename T, RoundingMode rounding_mode, ScaleMode scale_mode>
struct FunctionRoundingImpl<T, rounding_mode, scale_mode,
typename std::enable_if<scale_mode == ScaleMode::Null>::type>
{
public:
static inline void apply(const PaddedPODArray<T> & in, size_t scale, typename ColumnVector<T>::Container_t & out)
{
::memset(reinterpret_cast<T *>(&out[0]), 0, in.size() * sizeof(T));
}
static inline T apply(T val, size_t scale)
{
return 0;
}
};
/// The following code generates a table of powers of 10 during the build.
namespace
{
/// Individual degrees of the number 10.
template <size_t N>
struct PowerOf10
{
static const size_t value = 10 * PowerOf10<N - 1>::value;
};
template <>
struct PowerOf10<0>
{
static const size_t value = 1;
};
}
/// Declaring and defining a container containing a table of powers of 10.
template <size_t... TArgs>
struct TableContainer
{
static const std::array<size_t, sizeof...(TArgs)> values;
};
template <size_t... TArgs>
const std::array<size_t, sizeof...(TArgs)> TableContainer<TArgs...>::values {{ TArgs... }};
/// The generator of the first N degrees.
template <size_t N, size_t... TArgs>
struct FillArrayImpl
{
using result = typename FillArrayImpl<N - 1, PowerOf10<N>::value, TArgs...>::result;
};
template <size_t... TArgs>
struct FillArrayImpl<0, TArgs...>
{
using result = TableContainer<PowerOf10<0>::value, TArgs...>;
};
template <size_t N>
struct FillArray
{
using result = typename FillArrayImpl<N - 1>::result;
};
/** This pattern defines the precision that the round/ceil/floor functions use,
* then converts it to a value that can be used in operations of
* multiplication and division. Therefore, it is called a scale.
/** Select the appropriate processing algorithm depending on the scale.
*/
template <typename T, typename U, typename Enable = void>
struct ScaleForRightType;
template <typename T, typename U>
struct ScaleForRightType<T, U,
typename std::enable_if<
std::is_floating_point<T>::value
&& std::is_signed<U>::value>::type>
template <typename T, RoundingMode rounding_mode>
struct Dispatcher
{
static inline bool apply(const ColumnPtr & column, ScaleMode & scale_mode, size_t & scale)
static void apply(Block & block, const ColumnVector<T> * col, const ColumnNumbers & arguments, size_t result)
{
using PowersOf10 = typename FillArray<std::numeric_limits<T>::digits10 + 1>::result;
ScaleMode scale_mode;
size_t scale = 1;
Int64 scale_arg = 0;
auto precision_col = checkAndGetColumnConst<ColumnVector<U>>(column.get());
if (!precision_col)
return false;
if (arguments.size() == 2)
{
const IColumn & scale_column = *block.getByPosition(arguments[1]).column;
if (!scale_column.isConst())
throw Exception("Scale argument for rounding functions must be constant.", ErrorCodes::ILLEGAL_COLUMN);
U val = precision_col->template getValue<U>();
if (val < 0)
{
if (val < -static_cast<U>(std::numeric_limits<T>::digits10))
{
scale_mode = ScaleMode::Null;
scale = 1;
}
else
{
scale_mode = ScaleMode::Negative;
scale = PowersOf10::values[-val];
}
}
else if (val == 0)
{
scale_mode = ScaleMode::Zero;
scale = 1;
}
else
{
scale_mode = ScaleMode::Positive;
if (val > std::numeric_limits<T>::digits10)
val = static_cast<U>(std::numeric_limits<T>::digits10);
scale = PowersOf10::values[val];
scale_arg = applyVisitor(FieldVisitorConvertToNumber<Int64>(),
static_cast<const ColumnConst &>(scale_column).getField());
}
return true;
}
};
template <typename T, typename U>
struct ScaleForRightType<T, U,
typename std::enable_if<
std::is_floating_point<T>::value
&& std::is_unsigned<U>::value>::type>
{
static inline bool apply(const ColumnPtr & column, ScaleMode & scale_mode, size_t & scale)
{
using PowersOf10 = typename FillArray<std::numeric_limits<T>::digits10 + 1>::result;
auto precision_col = checkAndGetColumnConst<ColumnVector<U>>(column.get());
if (!precision_col)
return false;
U val = precision_col->template getValue<U>();
if (val == 0)
{
scale_mode = ScaleMode::Zero;
scale = 1;
}
else
{
scale_mode = ScaleMode::Positive;
if (val > static_cast<U>(std::numeric_limits<T>::digits10))
val = static_cast<U>(std::numeric_limits<T>::digits10);
scale = PowersOf10::values[val];
}
return true;
}
};
template <typename T, typename U>
struct ScaleForRightType<T, U,
typename std::enable_if<
std::is_integral<T>::value
&& std::is_signed<U>::value>::type>
{
static inline bool apply(const ColumnPtr & column, ScaleMode & scale_mode, size_t & scale)
{
using PowersOf10 = typename FillArray<std::numeric_limits<T>::digits10 + 1>::result;
auto precision_col = checkAndGetColumnConst<ColumnVector<U>>(column.get());
if (!precision_col)
return false;
U val = precision_col->template getValue<U>();
if (val < 0)
{
if (val < -std::numeric_limits<T>::digits10)
{
scale_mode = ScaleMode::Null;
scale = 1;
}
else
{
scale_mode = ScaleMode::Negative;
scale = PowersOf10::values[-val];
}
}
else
{
scale_mode = ScaleMode::Zero;
scale = 1;
}
return true;
}
};
template <typename T, typename U>
struct ScaleForRightType<T, U,
typename std::enable_if<
std::is_integral<T>::value
&& std::is_unsigned<U>::value>::type>
{
static inline bool apply(const ColumnPtr & column, ScaleMode & scale_mode, size_t & scale)
{
auto precision_col = checkAndGetColumnConst<ColumnVector<U>>(column.get());
if (!precision_col)
return false;
scale_mode = ScaleMode::Zero;
scale = 1;
return true;
}
};
/** Turn the precision parameter into a scale.
*/
template <typename T>
struct ScaleForLeftType
{
static inline void apply(const ColumnPtr & column, ScaleMode & scale_mode, size_t & scale)
{
if (!( ScaleForRightType<T, UInt8>::apply(column, scale_mode, scale)
|| ScaleForRightType<T, UInt16>::apply(column, scale_mode, scale)
|| ScaleForRightType<T, UInt16>::apply(column, scale_mode, scale)
|| ScaleForRightType<T, UInt32>::apply(column, scale_mode, scale)
|| ScaleForRightType<T, UInt64>::apply(column, scale_mode, scale)
|| ScaleForRightType<T, Int8>::apply(column, scale_mode, scale)
|| ScaleForRightType<T, Int16>::apply(column, scale_mode, scale)
|| ScaleForRightType<T, Int32>::apply(column, scale_mode, scale)
|| ScaleForRightType<T, Int64>::apply(column, scale_mode, scale)
|| ScaleForRightType<T, Float32>::apply(column, scale_mode, scale)
|| ScaleForRightType<T, Float64>::apply(column, scale_mode, scale)))
{
throw Exception("Internal error", ErrorCodes::LOGICAL_ERROR);
}
}
};
/** The main template that applies the rounding function to a value or column.
*/
template <typename T, RoundingMode rounding_mode, ScaleMode scale_mode>
struct Cruncher
{
using Op = FunctionRoundingImpl<T, rounding_mode, scale_mode>;
static inline void apply(Block & block, const ColumnVector<T> * col, const ColumnNumbers & arguments, size_t result, size_t scale)
{
auto col_res = std::make_shared<ColumnVector<T>>();
block.getByPosition(result).column = col_res;
@ -876,45 +426,31 @@ struct Cruncher
if (vec_res.empty())
return;
Op::apply(col->getData(), scale, vec_res);
}
};
/** Select the appropriate processing algorithm depending on the scale.
*/
template <typename T, typename ColumnType, RoundingMode rounding_mode>
struct Dispatcher
{
static inline void apply(Block & block, const ColumnType * col, const ColumnNumbers & arguments, size_t result)
{
ScaleMode scale_mode;
size_t scale;
if (arguments.size() == 2)
ScaleForLeftType<T>::apply(block.getByPosition(arguments[1]).column, scale_mode, scale);
else
if (scale_arg == 0)
{
scale_mode = ScaleMode::Zero;
scale = 1;
FunctionRoundingImpl<T, rounding_mode, ScaleMode::Zero>::apply(col->getData(), scale, vec_res);
}
else if (scale_arg > 0)
{
scale_mode = ScaleMode::Positive;
scale = pow(10, scale_arg);
FunctionRoundingImpl<T, rounding_mode, ScaleMode::Positive>::apply(col->getData(), scale, vec_res);
}
if (scale_mode == ScaleMode::Positive)
Cruncher<T, rounding_mode, ScaleMode::Positive>::apply(block, col, arguments, result, scale);
else if (scale_mode == ScaleMode::Zero)
Cruncher<T, rounding_mode, ScaleMode::Zero>::apply(block, col, arguments, result, scale);
else if (scale_mode == ScaleMode::Negative)
Cruncher<T, rounding_mode, ScaleMode::Negative>::apply(block, col, arguments, result, scale);
else if (scale_mode == ScaleMode::Null)
Cruncher<T, rounding_mode, ScaleMode::Null>::apply(block, col, arguments, result, scale);
else
throw Exception("Illegal operation", ErrorCodes::LOGICAL_ERROR);
{
scale_mode = ScaleMode::Negative;
scale = pow(10, -scale_arg);
FunctionRoundingImpl<T, rounding_mode, ScaleMode::Negative>::apply(col->getData(), scale, vec_res);
}
}
};
/** A template for functions that round the value of an input parameter of type
* (U)Int8/16/32/64 or Float32/64, and accept an additional optional
* parameter (default is 0).
*/
* (U)Int8/16/32/64 or Float32/64, and accept an additional optional
* parameter (default is 0).
*/
template <typename Name, RoundingMode rounding_mode>
class FunctionRounding : public IFunction
{
@ -923,18 +459,12 @@ public:
static FunctionPtr create(const Context & context) { return std::make_shared<FunctionRounding>(); }
private:
template <typename T>
bool checkType(const IDataType * type) const
{
return typeid_cast<const T *>(type);
}
template <typename T>
bool executeForType(Block & block, const ColumnNumbers & arguments, size_t result)
{
if (auto col = checkAndGetColumn<ColumnVector<T>>(block.getByPosition(arguments[0]).column.get()))
{
Dispatcher<T, ColumnVector<T>, rounding_mode>::apply(block, col, arguments, result);
Dispatcher<T, rounding_mode>::apply(block, col, arguments, result);
return true;
}
return false;
@ -957,29 +487,10 @@ public:
+ toString(arguments.size()) + ", should be 1 or 2.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
if (arguments.size() == 2)
{
const IDataType * type = &*arguments[1];
if (!( checkType<DataTypeUInt8>(type)
|| checkType<DataTypeUInt16>(type)
|| checkType<DataTypeUInt32>(type)
|| checkType<DataTypeUInt64>(type)
|| checkType<DataTypeInt8>(type)
|| checkType<DataTypeInt16>(type)
|| checkType<DataTypeInt32>(type)
|| checkType<DataTypeInt64>(type)
|| checkType<DataTypeFloat32>(type)
|| checkType<DataTypeFloat64>(type)))
{
throw Exception("Illegal type in second argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
}
const IDataType * type = &*arguments[0];
if (!type->behavesAsNumber())
throw Exception("Illegal type " + arguments[0]->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
for (const auto & type : arguments)
if (!type->behavesAsNumber())
throw Exception("Illegal type " + arguments[0]->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return arguments[0];
}
@ -1017,6 +528,7 @@ public:
}
};
struct NameRoundToExp2 { static constexpr auto name = "roundToExp2"; };
struct NameRoundDuration { static constexpr auto name = "roundDuration"; };
struct NameRoundAge { static constexpr auto name = "roundAge"; };