Added generic version for comparison operators [#CLICKHOUSE-2881].

This commit is contained in:
Alexey Milovidov 2017-03-12 07:22:52 +03:00
parent ef3e44e1bc
commit 03d525942a

View File

@ -5,6 +5,7 @@
#include <DB/Columns/ColumnString.h>
#include <DB/Columns/ColumnFixedString.h>
#include <DB/Columns/ColumnTuple.h>
#include <DB/Columns/ColumnArray.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeDateTime.h>
@ -513,6 +514,35 @@ template <typename A, typename B>
struct StringComparisonImpl<NotEqualsOp<A, B>> : StringEqualsImpl<false> {};
/// Generic version, implemented for columns of same type.
template <typename Op>
struct GenericComparisonImpl
{
static void NO_INLINE vector_vector(const IColumn & a, const IColumn & b, PaddedPODArray<UInt8> & c)
{
for (size_t i = 0, size = a.size(); i < size; ++i)
c[i] = Op::apply(a.compareAt(i, i, b, 1), 0);
}
static void NO_INLINE vector_constant(const IColumn & a, const IColumn & b, PaddedPODArray<UInt8> & c)
{
auto b_materialized = b.cloneResized(1)->convertToFullColumnIfConst();
for (size_t i = 0, size = a.size(); i < size; ++i)
c[i] = Op::apply(a.compareAt(i, 0, *b_materialized, 1), 0);
}
static void constant_vector(const IColumn & a, const IColumn & b, PaddedPODArray<UInt8> & c)
{
GenericComparisonImpl<typename Op::SymmetricOp>::vector_constant(b, a, c);
}
static void constant_constant(const IColumn & a, const IColumn & b, UInt8 & c)
{
c = Op::apply(a.compareAt(0, 0, b, 1), 0);
}
};
struct NameEquals { static constexpr auto name = "equals"; };
struct NameNotEquals { static constexpr auto name = "notEquals"; };
struct NameLess { static constexpr auto name = "less"; };
@ -631,7 +661,7 @@ private:
return false;
}
void executeString(Block & block, size_t result, const IColumn * c0, const IColumn * c1)
bool executeString(Block & block, size_t result, const IColumn * c0, const IColumn * c1)
{
const ColumnString * c0_string = typeid_cast<const ColumnString *>(c0);
const ColumnString * c1_string = typeid_cast<const ColumnString *>(c1);
@ -640,6 +670,9 @@ private:
const ColumnConstString * c0_const = typeid_cast<const ColumnConstString *>(c0);
const ColumnConstString * c1_const = typeid_cast<const ColumnConstString *>(c1);
if (!((c0_string || c0_fixed_string || c0_const) && (c1_string || c1_fixed_string || c1_const)))
return false;
using StringImpl = StringComparisonImpl<Op<int, int>>;
if (c0_const && c1_const)
@ -701,6 +734,8 @@ private:
+ " of arguments of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
}
return true;
}
void executeDateOrDateTimeOrEnumWithConstString(
@ -900,6 +935,32 @@ private:
block.safeGetByPosition(result).column = tmp_block.safeGetByPosition(tmp_block.columns() - 1).column;
}
void executeGeneric(Block & block, size_t result, const IColumn * c0, const IColumn * c1)
{
bool c0_const = c0->isConst();
bool c1_const = c1->isConst();
if (c0_const && c1_const)
{
auto c_res = std::make_shared<ColumnConstUInt8>(c0->size(), 0);
block.safeGetByPosition(result).column = c_res;
GenericComparisonImpl<Op<int, int>>::constant_constant(*c0, *c1, c_res->getData());
}
else
{
auto c_res = std::make_shared<ColumnUInt8>();
block.safeGetByPosition(result).column = c_res;
ColumnUInt8::Container_t & vec_res = c_res->getData();
vec_res.resize(c0->size());
if (c0_const)
GenericComparisonImpl<Op<int, int>>::constant_vector(*c0, *c1, vec_res);
else if (c1_const)
GenericComparisonImpl<Op<int, int>>::vector_constant(*c0, *c1, vec_res);
else
GenericComparisonImpl<Op<int, int>>::vector_vector(*c0, *c1, vec_res);
}
}
public:
String getName() const override
@ -964,7 +1025,8 @@ public:
|| (left_is_enum && right_is_enum && arguments[0]->getName() == arguments[1]->getName()) /// only equivalent enum type values can be compared against
|| (left_is_enum && right_is_string)
|| (left_is_string && right_is_enum)
|| (left_tuple && right_tuple && left_tuple->getElements().size() == right_tuple->getElements().size())))
|| (left_tuple && right_tuple && left_tuple->getElements().size() == right_tuple->getElements().size())
|| (arguments[0]->equals(*arguments[1]))))
throw Exception("Illegal types of arguments (" + arguments[0]->getName() + ", " + arguments[1]->getName() + ")"
" of function " + getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
@ -1006,8 +1068,10 @@ public:
}
else if (typeid_cast<const DataTypeTuple *>(col_with_type_and_name_left.type.get()))
executeTuple(block, result, col_left_untyped, col_right_untyped);
else if (!left_is_num && !right_is_num)
executeString(block, result, col_left_untyped, col_right_untyped);
else if (!left_is_num && !right_is_num && executeString(block, result, col_left_untyped, col_right_untyped))
;
else if (col_with_type_and_name_left.type->equals(*col_with_type_and_name_right.type))
executeGeneric(block, result, col_left_untyped, col_right_untyped);
else
executeDateOrDateTimeOrEnumWithConstString(
block, result, col_left_untyped, col_right_untyped,