decimal comparisons

This commit is contained in:
chertus 2018-07-25 22:38:21 +03:00
parent af00724c52
commit c1c149d74b
11 changed files with 397 additions and 88 deletions

View File

@ -30,29 +30,29 @@ namespace ErrorCodes
}
template <typename T>
StringRef ColumnVector<T>::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
template <typename T, bool _s>
StringRef ColumnVector<T, _s>::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
{
auto pos = arena.allocContinue(sizeof(T), begin);
memcpy(pos, &data[n], sizeof(T));
return StringRef(pos, sizeof(T));
}
template <typename T>
const char * ColumnVector<T>::deserializeAndInsertFromArena(const char * pos)
template <typename T, bool _s>
const char * ColumnVector<T, _s>::deserializeAndInsertFromArena(const char * pos)
{
data.push_back(*reinterpret_cast<const T *>(pos));
return pos + sizeof(T);
}
template <typename T>
void ColumnVector<T>::updateHashWithValue(size_t n, SipHash & hash) const
template <typename T, bool _s>
void ColumnVector<T, _s>::updateHashWithValue(size_t n, SipHash & hash) const
{
hash.update(data[n]);
}
template <typename T>
struct ColumnVector<T>::less
template <typename T, bool _s>
struct ColumnVector<T, _s>::less
{
const Self & parent;
int nan_direction_hint;
@ -60,8 +60,8 @@ struct ColumnVector<T>::less
bool operator()(size_t lhs, size_t rhs) const { return CompareHelper<T>::less(parent.data[lhs], parent.data[rhs], nan_direction_hint); }
};
template <typename T>
struct ColumnVector<T>::greater
template <typename T, bool _s>
struct ColumnVector<T, _s>::greater
{
const Self & parent;
int nan_direction_hint;
@ -69,8 +69,8 @@ struct ColumnVector<T>::greater
bool operator()(size_t lhs, size_t rhs) const { return CompareHelper<T>::greater(parent.data[lhs], parent.data[rhs], nan_direction_hint); }
};
template <typename T>
void ColumnVector<T>::getPermutation(bool reverse, size_t limit, int nan_direction_hint, IColumn::Permutation & res) const
template <typename T, bool _s>
void ColumnVector<T, _s>::getPermutation(bool reverse, size_t limit, int nan_direction_hint, IColumn::Permutation & res) const
{
size_t s = data.size();
res.resize(s);
@ -96,14 +96,14 @@ void ColumnVector<T>::getPermutation(bool reverse, size_t limit, int nan_directi
}
}
template <typename T>
const char * ColumnVector<T>::getFamilyName() const
template <typename T, bool _s>
const char * ColumnVector<T, _s>::getFamilyName() const
{
return TypeName<T>::get();
}
template <typename T>
MutableColumnPtr ColumnVector<T>::cloneResized(size_t size) const
template <typename T, bool _s>
MutableColumnPtr ColumnVector<T, _s>::cloneResized(size_t size) const
{
auto res = this->create();
@ -122,14 +122,14 @@ MutableColumnPtr ColumnVector<T>::cloneResized(size_t size) const
return std::move(res);
}
template <typename T>
UInt64 ColumnVector<T>::get64(size_t n) const
template <typename T, bool _s>
UInt64 ColumnVector<T, _s>::get64(size_t n) const
{
return ext::bit_cast<UInt64>(data[n]);
}
template <typename T>
void ColumnVector<T>::insertRangeFrom(const IColumn & src, size_t start, size_t length)
template <typename T, bool _s>
void ColumnVector<T, _s>::insertRangeFrom(const IColumn & src, size_t start, size_t length)
{
const ColumnVector & src_vec = static_cast<const ColumnVector &>(src);
@ -145,8 +145,8 @@ void ColumnVector<T>::insertRangeFrom(const IColumn & src, size_t start, size_t
memcpy(&data[old_size], &src_vec.data[start], length * sizeof(data[0]));
}
template <typename T>
ColumnPtr ColumnVector<T>::filter(const IColumn::Filter & filt, ssize_t result_size_hint) const
template <typename T, bool _s>
ColumnPtr ColumnVector<T, _s>::filter(const IColumn::Filter & filt, ssize_t result_size_hint) const
{
size_t size = data.size();
if (size != filt.size())
@ -209,8 +209,8 @@ ColumnPtr ColumnVector<T>::filter(const IColumn::Filter & filt, ssize_t result_s
return std::move(res);
}
template <typename T>
ColumnPtr ColumnVector<T>::permute(const IColumn::Permutation & perm, size_t limit) const
template <typename T, bool _s>
ColumnPtr ColumnVector<T, _s>::permute(const IColumn::Permutation & perm, size_t limit) const
{
size_t size = data.size();
@ -230,8 +230,8 @@ ColumnPtr ColumnVector<T>::permute(const IColumn::Permutation & perm, size_t lim
return std::move(res);
}
template <typename T>
ColumnPtr ColumnVector<T>::replicate(const IColumn::Offsets & offsets) const
template <typename T, bool _s>
ColumnPtr ColumnVector<T, _s>::replicate(const IColumn::Offsets & offsets) const
{
size_t size = data.size();
if (size != offsets.size())
@ -257,14 +257,14 @@ ColumnPtr ColumnVector<T>::replicate(const IColumn::Offsets & offsets) const
return std::move(res);
}
template <typename T>
void ColumnVector<T>::gather(ColumnGathererStream & gatherer)
template <typename T, bool _s>
void ColumnVector<T, _s>::gather(ColumnGathererStream & gatherer)
{
gatherer.gather(*this);
}
template <typename T>
void ColumnVector<T>::getExtremes(Field & min, Field & max) const
template <typename T, bool _s>
void ColumnVector<T, _s>::getExtremes(Field & min, Field & max) const
{
size_t size = data.size();
@ -310,16 +310,20 @@ void ColumnVector<T>::getExtremes(Field & min, Field & max) const
}
/// Explicit template instantiations - to avoid code bloat in headers.
template class ColumnVector<UInt8>;
template class ColumnVector<UInt16>;
template class ColumnVector<UInt32>;
template class ColumnVector<UInt64>;
template class ColumnVector<UInt128>;
template class ColumnVector<Int8>;
template class ColumnVector<Int16>;
template class ColumnVector<Int32>;
template class ColumnVector<Int64>;
template class ColumnVector<Int128>;
template class ColumnVector<Float32>;
template class ColumnVector<Float64>;
template class ColumnVector<UInt8, true>;
template class ColumnVector<UInt16, true>;
template class ColumnVector<UInt32, true>;
template class ColumnVector<UInt64, true>;
template class ColumnVector<UInt128, true>;
template class ColumnVector<Int8, true>;
template class ColumnVector<Int16, true>;
template class ColumnVector<Int32, true>;
template class ColumnVector<Int64, true>;
template class ColumnVector<Int128, true>;
template class ColumnVector<Float32, true>;
template class ColumnVector<Float64, true>;
template class ColumnVector<Int32, false>;
template class ColumnVector<Int64, false>;
template class ColumnVector<Int128, false>;
}

View File

@ -118,14 +118,14 @@ template <> inline UInt64 unionCastToUInt64(Float32 x)
/** A template for columns that use a simple array to store.
*/
template <typename T>
class ColumnVector final : public COWPtrHelper<IColumn, ColumnVector<T>>
* If _simpleType then T and columnType are the same.
*/
template <typename T, bool _simpleType = true>
class ColumnVector final : public COWPtrHelper<IColumn, ColumnVector<T, _simpleType>>
{
private:
friend class COWPtrHelper<IColumn, ColumnVector<T>>;
using Self = ColumnVector<T>;
using Self = ColumnVector<T, _simpleType>;
friend class COWPtrHelper<IColumn, Self>;
struct less;
struct greater;
@ -144,7 +144,7 @@ private:
ColumnVector(std::initializer_list<T> il) : data{il} {}
public:
bool isNumeric() const override { return IsNumber<T>; }
bool isNumeric() const override { return _simpleType && IsNumber<T>; }
size_t size() const override
{

View File

@ -48,8 +48,10 @@ std::enable_if_t<std::is_class_v<T>, T> NaNOrZero()
return T{};
}
#if 1 /// __int128
template <typename T>
std::enable_if_t<std::is_same_v<T, __int128>, __int128> NaNOrZero()
{
return __int128(0);
}
#endif

View File

@ -139,7 +139,6 @@ inline bool_if_double_can_be_used<TAInt, TAFloat> equalsOpTmpl(TAFloat a, TAInt
return static_cast<double>(a) == static_cast<double>(b);
}
/* Final realiztions */
@ -333,6 +332,15 @@ inline bool equalsOp<DB::Float32, DB::UInt128>(DB::Float32 f, DB::UInt128 u)
return equalsOp(static_cast<DB::Float64>(f), u);
}
inline bool greaterOp(DB::Int128 i, DB::Float64 f) { return static_cast<DB::Int128>(f) < i; }
inline bool greaterOp(DB::Int128 i, DB::Float32 f) { return static_cast<DB::Int128>(f) < i; }
inline bool greaterOp(DB::Float64 f, DB::Int128 i) { return static_cast<DB::Int128>(f) > i; }
inline bool greaterOp(DB::Float32 f, DB::Int128 i) { return static_cast<DB::Int128>(f) > i; }
inline bool equalsOp(DB::Int128 i, DB::Float64 f) { return i == static_cast<DB::Int128>(f) && static_cast<DB::Float64>(i) == f; }
inline bool equalsOp(DB::Int128 i, DB::Float32 f) { return i == static_cast<DB::Int128>(f) && static_cast<DB::Float32>(i) == f; }
inline bool equalsOp(DB::Float64 f, DB::Int128 i) { return equalsOp(i, f); }
inline bool equalsOp(DB::Float32 f, DB::Int128 i) { return equalsOp(i, f); }
template <typename A, typename B>
inline bool_if_not_safe_conversion<A, B> notEqualsOp(A a, B b)

View File

@ -21,7 +21,6 @@ using Int8 = int8_t;
using Int16 = int16_t;
using Int32 = int32_t;
using Int64 = int64_t;
using Int128 = __int128;
using Float32 = float;
using Float64 = double;
@ -54,7 +53,6 @@ template <> struct TypeName<Int8> { static const char * get() { return "Int8"
template <> struct TypeName<Int16> { static const char * get() { return "Int16"; } };
template <> struct TypeName<Int32> { static const char * get() { return "Int32"; } };
template <> struct TypeName<Int64> { static const char * get() { return "Int64"; } };
template <> struct TypeName<Int128> { static const char * get() { return "Int128"; } };
template <> struct TypeName<Float32> { static const char * get() { return "Float32"; } };
template <> struct TypeName<Float64> { static const char * get() { return "Float64"; } };
template <> struct TypeName<String> { static const char * get() { return "String"; } };
@ -64,3 +62,39 @@ template <> struct TypeName<String> { static const char * get() { return "Strin
using Strings = std::vector<String>;
}
#if 1 /// __int128
namespace DB
{
using Int128 = __int128;
template <> constexpr bool IsNumber<Int128> = true;
template <> struct TypeName<Int128> { static const char * get() { return "Int128"; } };
}
namespace std
{
template <> struct is_signed<__int128>
{
static constexpr bool value = true;
};
template <> struct is_unsigned<__int128>
{
static constexpr bool value = false;
};
template <> struct is_integral<__int128>
{
static constexpr bool value = true;
};
template <> struct is_arithmetic<__int128>
{
static constexpr bool value = true;
};
}
#endif

View File

@ -2,7 +2,6 @@
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypeFactory.h>
#include <Columns/ColumnVector.h>
//#include <Columns/ColumnConst.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
@ -35,13 +34,15 @@ std::string DataTypeDecimal<T>::getName() const
template <typename T>
bool DataTypeDecimal<T>::equals(const IDataType & rhs) const
{
return typeid(rhs) == typeid(*this) && getName() == rhs.getName();
if (auto * ptype = typeid_cast<const DataTypeDecimal<T> *>(&rhs))
return scale == ptype->getScale();
return false;
}
template <typename T>
void DataTypeDecimal<T>::serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings &) const
{
const T & value = static_cast<const ColumnVector<T> &>(column).getData()[row_num];
const T & value = static_cast<const ColumnType &>(column).getData()[row_num];
writeIntText(wholePart(value), ostr);
writeChar('.', ostr);
@ -56,42 +57,42 @@ void DataTypeDecimal<T>::deserializeText(IColumn & column, ReadBuffer & istr, co
UInt32 unread_scale = scale;
readDecimalText(istr, x, precision, unread_scale);
x *= getScaleMultiplier(unread_scale);
static_cast<ColumnVector<T> &>(column).getData().push_back(x);
static_cast<ColumnType &>(column).getData().push_back(x);
}
template <typename T>
void DataTypeDecimal<T>::serializeBinary(const Field & field, WriteBuffer & ostr) const
{
/// ColumnVector<T>::value_type is a narrower type. For example, UInt8, when the Field type is UInt64
typename ColumnVector<T>::value_type x = get<FieldType>(field);
/// ColumnType::value_type is a narrower type. For example, UInt8, when the Field type is UInt64
typename ColumnType::value_type x = get<FieldType>(field);
writeBinary(x, ostr);
}
template <typename T>
void DataTypeDecimal<T>::serializeBinary(const IColumn & column, size_t row_num, WriteBuffer & ostr) const
{
writeBinary(static_cast<const ColumnVector<T> &>(column).getData()[row_num], ostr);
writeBinary(static_cast<const ColumnType &>(column).getData()[row_num], ostr);
}
template <typename T>
void DataTypeDecimal<T>::serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const
{
const typename ColumnVector<T>::Container & x = typeid_cast<const ColumnVector<T> &>(column).getData();
const typename ColumnType::Container & x = typeid_cast<const ColumnType &>(column).getData();
size_t size = x.size();
if (limit == 0 || offset + limit > size)
limit = size - offset;
ostr.write(reinterpret_cast<const char *>(&x[offset]), sizeof(typename ColumnVector<T>::value_type) * limit);
ostr.write(reinterpret_cast<const char *>(&x[offset]), sizeof(typename ColumnType::value_type) * limit);
}
template <typename T>
void DataTypeDecimal<T>::deserializeBinary(Field & field, ReadBuffer & istr) const
{
typename ColumnVector<T>::value_type x;
typename ColumnType::value_type x;
readBinary(x, istr);
field = FieldType(x);
}
@ -99,19 +100,19 @@ void DataTypeDecimal<T>::deserializeBinary(Field & field, ReadBuffer & istr) con
template <typename T>
void DataTypeDecimal<T>::deserializeBinary(IColumn & column, ReadBuffer & istr) const
{
typename ColumnVector<T>::value_type x;
typename ColumnType::value_type x;
readBinary(x, istr);
static_cast<ColumnVector<T> &>(column).getData().push_back(x);
static_cast<ColumnType &>(column).getData().push_back(x);
}
template <typename T>
void DataTypeDecimal<T>::deserializeBinaryBulk(IColumn & column, ReadBuffer & istr, size_t limit, double ) const
{
typename ColumnVector<T>::Container & x = typeid_cast<ColumnVector<T> &>(column).getData();
typename ColumnType::Container & x = typeid_cast<ColumnType &>(column).getData();
size_t initial_size = x.size();
x.resize(initial_size + limit);
size_t size = istr.readBig(reinterpret_cast<char*>(&x[initial_size]), sizeof(typename ColumnVector<T>::value_type) * limit);
x.resize(initial_size + size / sizeof(typename ColumnVector<T>::value_type));
size_t size = istr.readBig(reinterpret_cast<char*>(&x[initial_size]), sizeof(typename ColumnType::value_type) * limit);
x.resize(initial_size + size / sizeof(typename ColumnType::value_type));
}
@ -125,7 +126,7 @@ Field DataTypeDecimal<T>::getDefault() const
template <typename T>
MutableColumnPtr DataTypeDecimal<T>::createColumn() const
{
return ColumnVector<T>::create();
return ColumnType::create();
}

View File

@ -1,6 +1,8 @@
#pragma once
#include <common/likely.h>
#include <Common/typeid_cast.h>
#include <DataTypes/IDataType.h>
#include <Columns/ColumnVector.h>
namespace DB
@ -58,6 +60,16 @@ class DataTypeSimpleSerialization : public IDataType
};
/// Enum for IDataType to DataTypeDecimal convertion.
enum class DecimalPrecision
{
None = 0,
I32 = 9,
I64 = 18,
I128 = 38,
};
static constexpr size_t minDecimalPrecision() { return 1; }
template <typename T> static constexpr size_t maxDecimalPrecision();
template <> constexpr size_t maxDecimalPrecision<Int32>() { return 9; }
@ -81,6 +93,7 @@ class DataTypeDecimal final : public DataTypeSimpleSerialization
public:
using UnderlyingType = T;
using FieldType = typename NearestFieldType<T>::Type;
using ColumnType = ColumnVector<T, false>;
static constexpr bool is_parametric = true;
@ -166,11 +179,24 @@ public:
return true;
}
/// @returns multiplier for T to become UnderlyingType of result_type with correct scale
template <typename R>
R scaleFactor(const DataTypeDecimal<R> & result_type) const
{
if (getScale() > result_type.getScale())
throw Exception("Decimal result's scale is less then argiment's one", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
UInt32 scale_delta = result_type.getScale() - getScale(); /// scale_delta >= 0
return DataTypeDecimal<R>::getScaleMultiplier(scale_delta);
}
private:
const UInt32 precision;
const UInt32 scale; /// TODO: should we support scales out of [0, precision]?
static T getScaleMultiplier(UInt32 scale);
template <typename>
friend class DataTypeDecimal;
};
@ -178,15 +204,50 @@ template <typename T, typename U>
typename std::enable_if_t<(sizeof(T) >= sizeof(U)), const DataTypeDecimal<T>>
decimalResultType(const DataTypeDecimal<T> & tx, const DataTypeDecimal<U> & ty)
{
return DataTypeDecimal<T>(maxDecimalPrecision<T>(), max(tx.getScale(), ty.getScale()));
return DataTypeDecimal<T>(maxDecimalPrecision<T>(), (tx.getScale() > ty.getScale() ? tx.getScale() : ty.getScale()));
}
template <typename T, typename U>
typename std::enable_if_t<(sizeof(T) < sizeof(U)), const DataTypeDecimal<U>>
decimalResultType(const DataTypeDecimal<T> & tx, const DataTypeDecimal<U> & ty)
{
return DataTypeDecimal<U>(maxDecimalPrecision<U>(), max(tx.getScale(), ty.getScale()));
return DataTypeDecimal<U>(maxDecimalPrecision<U>(), (tx.getScale() > ty.getScale() ? tx.getScale() : ty.getScale()));
}
template <typename T>
inline const DataTypeDecimal<T> * checkDecimal(const IDataType & data_type)
{
return typeid_cast<const DataTypeDecimal<T> *>(&data_type);
}
inline DecimalPrecision checkDecimal(const IDataType & data_type)
{
if (typeid_cast<const DataTypeDecimal<Int32> *>(&data_type))
return DecimalPrecision::I32;
if (typeid_cast<const DataTypeDecimal<Int64> *>(&data_type))
return DecimalPrecision::I64;
if (typeid_cast<const DataTypeDecimal<Int128> *>(&data_type))
return DecimalPrecision::I128;
return DecimalPrecision::None;
}
inline bool isDecimal(DecimalPrecision precision) { return precision != DecimalPrecision::None; }
inline bool isDecimal(const IDataType & data_type) { return isDecimal(checkDecimal(data_type)); }
///
inline bool notDecimalButComparableToDecimal(const IDataType & data_type)
{
if (data_type.isInteger())
return true;
return false;
}
///
inline bool comparableToDecimal(const IDataType & data_type)
{
if (data_type.isInteger())
return true;
return isDecimal(data_type);
}
}

View File

@ -8,6 +8,7 @@
#include <Columns/ColumnArray.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeString.h>
@ -39,7 +40,7 @@ namespace DB
* The comparison functions always return 0 or 1 (UInt8).
*
* You can compare the following types:
* - numbers;
* - numbers and decimals;
* - strings and fixed strings;
* - dates;
* - datetimes;
@ -188,6 +189,174 @@ struct NumComparisonImpl
}
};
///
template <typename A, typename B, typename Op>
class DecimalComparison
{
public:
static bool apply(Block & block, size_t result, const ColumnWithTypeAndName & col_left, const ColumnWithTypeAndName & col_right)
{
Shift shift;
if (!allowed(col_left, col_right, shift))
return false;
if (ColumnPtr c_res = apply(col_left, col_right, shift))
{
block.getByPosition(result).column = std::move(c_res);
return true;
}
return false;
}
private:
struct Shift
{
Int128 a = 1;
Int128 b = 1;
};
/// @returns false if either of decimals has another type.
static bool allowed(const ColumnWithTypeAndName & col_left, const ColumnWithTypeAndName & col_right, Shift & shift)
{
const DataTypePtr & left_type = col_left.type;
const DataTypePtr & right_type = col_right.type;
if (const DataTypeDecimal<A> * decimal0 = checkDecimal<A>(*left_type))
{
if (const DataTypeDecimal<B> * decimal1 = checkDecimal<B>(*right_type))
{
shift.a = decimal0->scaleFactor(decimalResultType(*decimal0, *decimal1));
shift.b = decimal1->scaleFactor(decimalResultType(*decimal0, *decimal1));
}
else if (notDecimalButComparableToDecimal(*right_type))
shift.b = decimal0->getScaleMultiplier();
else
return false;
}
else if (const DataTypeDecimal<B> * decimal1 = checkDecimal<B>(*right_type))
{
if (!notDecimalButComparableToDecimal(*left_type))
shift.a = decimal1->getScaleMultiplier();
else
return false;
}
else
return false;
return true;
}
static ColumnPtr apply(const ColumnWithTypeAndName & col_left, const ColumnWithTypeAndName & col_right, const Shift & shift)
{
const ColumnPtr & c0 = col_left.column;
const ColumnPtr & c1 = col_right.column;
bool c0_const = c0->isColumnConst();
bool c1_const = c1->isColumnConst();
if (c0_const && c1_const)
{
const ColumnConst * c0_const = checkAndGetColumnConst<ColumnVector<A>>(c0.get());
const ColumnConst * c1_const = checkAndGetColumnConst<ColumnVector<B>>(c1.get());
UInt8 res = apply(c0_const->template getValue<A>(), c1_const->template getValue<B>(), shift);
return DataTypeUInt8().createColumnConst(c0->size(), toField(res));
}
auto c_res = ColumnUInt8::create();
ColumnUInt8::Container & vec_res = c_res->getData();
vec_res.resize(c0->size());
if (c0_const)
{
const ColumnConst * c0_const = checkAndGetColumnConst<ColumnVector<A>>(c0.get());
if (const ColumnVector<B> * c1_vec = checkAndGetColumn<ColumnVector<B>>(c1.get()))
constant_vector(c0_const->template getValue<A>(), c1_vec->getData(), vec_res, shift);
else if (const ColumnVector<B, false> * c1_vec = checkAndGetColumn<ColumnVector<B, false>>(c1.get()))
constant_vector(c0_const->template getValue<A>(), c1_vec->getData(), vec_res, shift);
}
else if (c1_const)
{
const ColumnConst * c1_const = checkAndGetColumnConst<ColumnVector<B>>(c1.get());
if (const ColumnVector<A> * c0_vec = checkAndGetColumn<ColumnVector<A>>(c0.get()))
vector_constant(c0_vec->getData(), c1_const->template getValue<B>(), vec_res, shift);
else if (const ColumnVector<A, false> * c0_vec = checkAndGetColumn<ColumnVector<A, false>>(c0.get()))
vector_constant(c0_vec->getData(), c1_const->template getValue<B>(), vec_res, shift);
}
else
{
if (const ColumnVector<A> * c0_vec = checkAndGetColumn<ColumnVector<A>>(c0.get()))
{
if (const ColumnVector<B> * c1_vec = checkAndGetColumn<ColumnVector<B>>(c1.get()))
vector_vector(c0_vec->getData(), c1_vec->getData(), vec_res, shift);
else if (const ColumnVector<B, false> * c1_vec = checkAndGetColumn<ColumnVector<B, false>>(c1.get()))
vector_vector(c0_vec->getData(), c1_vec->getData(), vec_res, shift);
}
else if (const ColumnVector<A, false> * c0_vec = checkAndGetColumn<ColumnVector<A, false>>(c0.get()))
{
if (const ColumnVector<B> * c1_vec = checkAndGetColumn<ColumnVector<B>>(c1.get()))
vector_vector(c0_vec->getData(), c1_vec->getData(), vec_res, shift);
else if (const ColumnVector<B, false> * c1_vec = checkAndGetColumn<ColumnVector<B, false>>(c1.get()))
vector_vector(c0_vec->getData(), c1_vec->getData(), vec_res, shift);
}
}
return c_res;
}
static NO_INLINE UInt8 apply(A a, B b, const Shift & shift)
{
return Op::apply(a * shift.a, b * shift.b);
}
static void NO_INLINE vector_vector(const PaddedPODArray<A> & a, const PaddedPODArray<B> & b, PaddedPODArray<UInt8> & c,
const Shift & shift)
{
size_t size = a.size();
const A * a_pos = &a[0];
const B * b_pos = &b[0];
UInt8 * c_pos = &c[0];
const A * a_end = a_pos + size;
while (a_pos < a_end)
{
*c_pos = apply(*a_pos, *b_pos, shift);
++a_pos;
++b_pos;
++c_pos;
}
}
static void NO_INLINE vector_constant(const PaddedPODArray<A> & a, B b, PaddedPODArray<UInt8> & c, const Shift & shift)
{
size_t size = a.size();
const A * a_pos = &a[0];
UInt8 * c_pos = &c[0];
const A * a_end = a_pos + size;
while (a_pos < a_end)
{
*c_pos = apply(*a_pos, b, shift);
++a_pos;
++c_pos;
}
}
static void NO_INLINE constant_vector(A a, const PaddedPODArray<B> & b, PaddedPODArray<UInt8> & c, const Shift & shift)
{
size_t size = b.size();
const B * b_pos = &b[0];
UInt8 * c_pos = &c[0];
const B * b_end = b_pos + size;
while (b_pos < b_end)
{
*c_pos = apply(a, *b_pos, shift);
++b_pos;
++c_pos;
}
}
};
inline int memcmp16(const void * a, const void * b)
{
@ -696,6 +865,7 @@ private:
|| executeNumRightType<T0, Int16>(block, result, col_left, col_right_untyped)
|| executeNumRightType<T0, Int32>(block, result, col_left, col_right_untyped)
|| executeNumRightType<T0, Int64>(block, result, col_left, col_right_untyped)
|| executeNumRightType<T0, Int128>(block, result, col_left, col_right_untyped)
|| executeNumRightType<T0, Float32>(block, result, col_left, col_right_untyped)
|| executeNumRightType<T0, Float64>(block, result, col_left, col_right_untyped))
return true;
@ -715,6 +885,7 @@ private:
|| executeNumConstRightType<T0, Int16>(block, result, col_left, col_right_untyped)
|| executeNumConstRightType<T0, Int32>(block, result, col_left, col_right_untyped)
|| executeNumConstRightType<T0, Int64>(block, result, col_left, col_right_untyped)
|| executeNumConstRightType<T0, Int128>(block, result, col_left, col_right_untyped)
|| executeNumConstRightType<T0, Float32>(block, result, col_left, col_right_untyped)
|| executeNumConstRightType<T0, Float64>(block, result, col_left, col_right_untyped))
return true;
@ -727,6 +898,28 @@ private:
return false;
}
bool executeDecimal(Block & block, size_t result, const ColumnWithTypeAndName & col_left, const ColumnWithTypeAndName & col_right)
{
if (DecimalComparison<Int32, Int32, Op<Int128, Int128>>::apply(block, result, col_left, col_right) ||
DecimalComparison<Int32, Int64, Op<Int128, Int128>>::apply(block, result, col_left, col_right) ||
DecimalComparison<Int32, Int128, Op<Int128, Int128>>::apply(block, result, col_left, col_right) ||
DecimalComparison<Int64, Int32, Op<Int128, Int128>>::apply(block, result, col_left, col_right) ||
DecimalComparison<Int64, Int64, Op<Int128, Int128>>::apply(block, result, col_left, col_right) ||
DecimalComparison<Int64, Int128, Op<Int128, Int128>>::apply(block, result, col_left, col_right) ||
DecimalComparison<Int128, Int32, Op<Int128, Int128>>::apply(block, result, col_left, col_right) ||
DecimalComparison<Int128, Int64, Op<Int128, Int128>>::apply(block, result, col_left, col_right) ||
DecimalComparison<Int128, Int128, Op<Int128, Int128>>::apply(block, result, col_left, col_right))
{
return true;
}
throw Exception("Not implemented " + getName() + " between " + col_left.type->getName() + " and " + col_right.type->getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
bool executeString(Block & block, size_t result, const IColumn * c0, const IColumn * c1)
{
const ColumnString * c0_string = checkAndGetColumn<ColumnString>(c0);
@ -1172,6 +1365,8 @@ public:
const auto & col_with_type_and_name_right = block.getByPosition(arguments[1]);
const IColumn * col_left_untyped = col_with_type_and_name_left.column.get();
const IColumn * col_right_untyped = col_with_type_and_name_right.column.get();
const DataTypePtr & left_type = col_with_type_and_name_left.type;
const DataTypePtr & right_type = col_with_type_and_name_right.type;
const bool left_is_num = col_left_untyped->isNumeric();
const bool right_is_num = col_right_untyped->isNumeric();
@ -1187,26 +1382,35 @@ public:
|| executeNumLeftType<Int16>(block, result, col_left_untyped, col_right_untyped)
|| executeNumLeftType<Int32>(block, result, col_left_untyped, col_right_untyped)
|| executeNumLeftType<Int64>(block, result, col_left_untyped, col_right_untyped)
|| executeNumLeftType<Int128>(block, result, col_left_untyped, col_right_untyped)
|| executeNumLeftType<Float32>(block, result, col_left_untyped, col_right_untyped)
|| executeNumLeftType<Float64>(block, result, col_left_untyped, col_right_untyped)))
throw Exception("Illegal column " + col_left_untyped->getName()
+ " of first argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
}
else if (checkAndGetDataType<DataTypeTuple>(col_with_type_and_name_left.type.get()))
else if (checkAndGetDataType<DataTypeTuple>(left_type.get()))
{
executeTuple(block, result, col_with_type_and_name_left, col_with_type_and_name_right, input_rows_count);
}
else if (isDecimal(*left_type) || isDecimal(*right_type))
{
if (!comparableToDecimal(*left_type) || !comparableToDecimal(*right_type))
throw Exception("No operation " + getName() + " between " + left_type->getName() + " and " + right_type->getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
executeDecimal(block, result, col_with_type_and_name_left, col_with_type_and_name_right);
}
else if (!left_is_num && !right_is_num && executeString(block, result, col_left_untyped, col_right_untyped))
{
}
else if (col_with_type_and_name_left.type->equals(*col_with_type_and_name_right.type))
else if (left_type->equals(*right_type))
{
executeGenericIdenticalTypes(block, result, col_left_untyped, col_right_untyped);
}
else if (executeDateOrDateTimeOrEnumOrUUIDWithConstString(
block, result, col_left_untyped, col_right_untyped,
col_with_type_and_name_left.type, col_with_type_and_name_right.type,
left_type, right_type,
left_is_num, input_rows_count))
{
}

View File

@ -185,6 +185,7 @@ namespace detail
writeUIntText(static_cast<std::make_unsigned_t<T>>(x), buf);
}
#if 1
inline void writeSIntText(__int128 x, WriteBuffer & buf)
{
if (unlikely((x - 1) == 0))
@ -201,6 +202,7 @@ namespace detail
writeUIntText(static_cast<unsigned __int128>(x), buf);
}
#endif
}
@ -208,13 +210,9 @@ template <typename T>
std::enable_if_t<std::is_signed_v<T>, void> writeIntText(T x, WriteBuffer & buf)
{
detail::writeSIntText(x, buf);
}
template <typename T>
std::enable_if_t<std::is_same_v<T, Int128>, void> writeIntText(T x, WriteBuffer & buf)
{
detail::writeSIntText(x, buf);
}
template <typename T>
std::enable_if_t<std::is_unsigned_v<T>, void> writeIntText(T x, WriteBuffer & buf)

View File

@ -76,7 +76,7 @@ static Field convertNumericType(const Field & from, const IDataType & type)
template <typename From, typename To>
static Field convertIntToDecimalTypeImpl(const Field & from, const To & type)
{
using FieldType = typename NearestFieldType<typename To::UnderlyingType>::Type;
using FieldType = typename To::FieldType;
From value = from.get<From>();
if (!type.canStoreWhole(value))
@ -150,12 +150,9 @@ Field convertFieldToTypeImpl(const Field & src, const IDataType & type)
if (typeid_cast<const DataTypeInt64 *>(&type)) return convertNumericType<Int64>(src, type);
if (typeid_cast<const DataTypeFloat32 *>(&type)) return convertNumericType<Float32>(src, type);
if (typeid_cast<const DataTypeFloat64 *>(&type)) return convertNumericType<Float64>(src, type);
if (typeid_cast<const DataTypeDecimal<Int32> *>(&type))
return convertIntToDecimalType(src, typeid_cast<const DataTypeDecimal<Int32> &>(type));
if (typeid_cast<const DataTypeDecimal<Int64> *>(&type))
return convertIntToDecimalType(src, typeid_cast<const DataTypeDecimal<Int64> &>(type));
if (typeid_cast<const DataTypeDecimal<Int128> *>(&type))
return convertIntToDecimalType(src, typeid_cast<const DataTypeDecimal<Int128> &>(type));
if (auto * ptype = typeid_cast<const DataTypeDecimal<Int32> *>(&type)) return convertIntToDecimalType(src, *ptype);
if (auto * ptype = typeid_cast<const DataTypeDecimal<Int64> *>(&type)) return convertIntToDecimalType(src, *ptype);
if (auto * ptype = typeid_cast<const DataTypeDecimal<Int128> *>(&type)) return convertIntToDecimalType(src, *ptype);
const bool is_date = typeid_cast<const DataTypeDate *>(&type);
bool is_datetime = false;

View File

@ -7,9 +7,9 @@
namespace DB
{
template <typename T>
template <typename T, bool>
class ColumnVector;
using ColumnUInt8 = ColumnVector<UInt8>;
using ColumnUInt8 = ColumnVector<UInt8, true>;
class MergeTreeReader;