dbms: Server: performance optimizations. [#METR-15210]

This commit is contained in:
Alexey Arno 2015-05-20 18:34:09 +03:00
parent 793d1accfc
commit 1143cfb7ad
2 changed files with 68 additions and 25 deletions

View File

@ -226,7 +226,6 @@ template<bool compute_marginal_moments>
class BaseCovarianceData
{
protected:
virtual ~BaseCovarianceData() = default;
void incrementMarginalMoments(Float64 left_incr, Float64 right_incr) {}
void mergeWith(const BaseCovarianceData & source) {}
void serialize(WriteBuffer & buf) const {}
@ -237,8 +236,6 @@ template<>
class BaseCovarianceData<true>
{
protected:
virtual ~BaseCovarianceData() = default;
void incrementMarginalMoments(Float64 left_incr, Float64 right_incr)
{
left_m2 += left_incr;

View File

@ -94,24 +94,34 @@ namespace DB
/// Реализация функций округления на низком уровне.
template<typename T, int rounding_mode>
template<typename T, int rounding_mode, bool with_scale>
struct RoundingComputation
{
};
template<int rounding_mode>
struct RoundingComputation<Float32, rounding_mode>
template<int rounding_mode, bool with_scale>
struct RoundingComputation<Float32, rounding_mode, with_scale>
{
using Data = std::array<Float32, 4>;
using Scale = __m128;
static inline void prepareScale(size_t scale, Scale & mm_scale)
template<bool with_scale2 = with_scale>
static inline void prepareScale(size_t scale, Scale & mm_scale,
typename std::enable_if<with_scale2>::type * = nullptr)
{
Float32 fscale = static_cast<Float32>(scale);
mm_scale = _mm_load1_ps(&fscale);
}
static inline void compute(const Data & in, const Scale & mm_scale, Data & out)
template<bool with_scale2 = with_scale>
static inline void prepareScale(size_t scale, Scale & mm_scale,
typename std::enable_if<!with_scale2>::type * = nullptr)
{
}
template<bool with_scale2 = with_scale>
static inline void compute(const Data & in, const Scale & mm_scale, Data & out,
typename std::enable_if<with_scale2>::type * = nullptr)
{
__m128 mm_value = _mm_loadu_ps(reinterpret_cast<const Float32 *>(&in));
mm_value = _mm_mul_ps(mm_value, mm_scale);
@ -119,21 +129,40 @@ namespace DB
mm_value = _mm_div_ps(mm_value, mm_scale);
_mm_storeu_ps(reinterpret_cast<Float32 *>(&out), mm_value);
}
template<bool with_scale2 = with_scale>
static inline void compute(const Data & in, const Scale & mm_scale, Data & out,
typename std::enable_if<!with_scale2>::type * = nullptr)
{
__m128 mm_value = _mm_loadu_ps(reinterpret_cast<const Float32 *>(&in));
mm_value = _mm_round_ps(mm_value, rounding_mode);
_mm_storeu_ps(reinterpret_cast<Float32 *>(&out), mm_value);
}
};
template<int rounding_mode>
struct RoundingComputation<Float64, rounding_mode>
template<int rounding_mode, bool with_scale>
struct RoundingComputation<Float64, rounding_mode, with_scale>
{
using Data = std::array<Float64, 2>;
using Scale = __m128d;
static inline void prepareScale(size_t scale, Scale & mm_scale)
template<bool with_scale2 = with_scale>
static inline void prepareScale(size_t scale, Scale & mm_scale,
typename std::enable_if<with_scale2>::type * = nullptr)
{
Float64 fscale = static_cast<Float64>(scale);
mm_scale = _mm_load1_pd(&fscale);
}
static inline void compute(const Data & in, const Scale & mm_scale, Data & out)
template<bool with_scale2 = with_scale>
static inline void prepareScale(size_t scale, Scale & mm_scale,
typename std::enable_if<!with_scale2>::type * = nullptr)
{
}
template<bool with_scale2 = with_scale>
static inline void compute(const Data & in, const Scale & mm_scale, Data & out,
typename std::enable_if<with_scale2>::type * = nullptr)
{
__m128d mm_value = _mm_loadu_pd(reinterpret_cast<const Float64 *>(&in));
mm_value = _mm_mul_pd(mm_value, mm_scale);
@ -141,18 +170,27 @@ namespace DB
mm_value = _mm_div_pd(mm_value, mm_scale);
_mm_storeu_pd(reinterpret_cast<Float64 *>(&out), mm_value);
}
template<bool with_scale2 = with_scale>
static inline void compute(const Data & in, const Scale & mm_scale, Data & out,
typename std::enable_if<!with_scale2>::type * = nullptr)
{
__m128d mm_value = _mm_loadu_pd(reinterpret_cast<const Float64 *>(&in));
mm_value = _mm_round_pd(mm_value, rounding_mode);
_mm_storeu_pd(reinterpret_cast<Float64 *>(&out), mm_value);
}
};
/// Реализация функций округления на высоком уровне.
template<typename T, int rounding_mode, typename Enable = void>
template<typename T, int rounding_mode, bool with_scale, typename Enable = void>
struct FunctionRoundingImpl
{
};
/// В случае целочисленных значений не выполяется округления.
template<typename T, int rounding_mode>
struct FunctionRoundingImpl<T, rounding_mode, typename std::enable_if<std::is_integral<T>::value>::type>
template<typename T, int rounding_mode, bool with_scale>
struct FunctionRoundingImpl<T, rounding_mode, with_scale, typename std::enable_if<std::is_integral<T>::value>::type>
{
static inline void apply(const PODArray<T> & in, size_t scale, typename ColumnVector<T>::Container_t & out)
{
@ -167,11 +205,11 @@ namespace DB
}
};
template<typename T, int rounding_mode>
struct FunctionRoundingImpl<T, rounding_mode, typename std::enable_if<std::is_floating_point<T>::value>::type>
template<typename T, int rounding_mode, bool with_scale>
struct FunctionRoundingImpl<T, rounding_mode, with_scale, typename std::enable_if<std::is_floating_point<T>::value>::type>
{
private:
using Op = RoundingComputation<T, rounding_mode>;
using Op = RoundingComputation<T, rounding_mode, with_scale>;
using Data = typename Op::Data;
using Scale = typename Op::Scale;
@ -347,21 +385,25 @@ namespace
template<typename T>
bool executeForType(Block & block, const ColumnNumbers & arguments, size_t result)
{
using Op = FunctionRoundingImpl<T, rounding_mode>;
using OpWithScale = FunctionRoundingImpl<T, rounding_mode, true>;
using OpWithoutScale = FunctionRoundingImpl<T, rounding_mode, false>;
if (ColumnVector<T> * col = typeid_cast<ColumnVector<T> *>(&*block.getByPosition(arguments[0]).column))
{
UInt8 precision = 0;
if (arguments.size() == 2)
precision = getPrecision<T>(block.getByPosition(arguments[1]).column);
ColumnVector<T> * col_res = new ColumnVector<T>;
block.getByPosition(result).column = col_res;
typename ColumnVector<T>::Container_t & vec_res = col_res->getData();
vec_res.resize(col->getData().size());
Op::apply(col->getData(), PowersOf10::values[precision], vec_res);
UInt8 precision = 0;
if (arguments.size() == 2)
precision = getPrecision<T>(block.getByPosition(arguments[1]).column);
if (precision > 0)
OpWithScale::apply(col->getData(), PowersOf10::values[precision], vec_res);
else
OpWithoutScale::apply(col->getData(), 0, vec_res);
return true;
}
@ -371,7 +413,11 @@ namespace
if (arguments.size() == 2)
precision = getPrecision<T>(block.getByPosition(arguments[1]).column);
T res = Op::apply(col->getData(), PowersOf10::values[precision]);
T res;
if (precision > 0)
res = OpWithScale::apply(col->getData(), PowersOf10::values[precision]);
else
res = OpWithoutScale::apply(col->getData(), 0);
ColumnConst<T> * col_res = new ColumnConst<T>(col->size(), res);
block.getByPosition(result).column = col_res;