dbms: added support of tuple arguments to comparison functions [#METR-18149].

This commit is contained in:
Alexey Milovidov 2015-10-12 04:42:47 +03:00
parent 8ab8ab5a2b
commit 4201a685b5
4 changed files with 312 additions and 16 deletions

View File

@ -10,7 +10,9 @@
#include <DB/DataTypes/DataTypeDate.h>
#include <DB/DataTypes/DataTypeString.h>
#include <DB/DataTypes/DataTypeFixedString.h>
#include <DB/DataTypes/DataTypeTuple.h>
#include <DB/Functions/FunctionsLogical.h>
#include <DB/Functions/IFunction.h>
@ -384,6 +386,14 @@ template <typename A, typename B>
struct StringComparisonImpl<NotEqualsOp<A, B>> : StringEqualsImpl<false> {};
struct NameEquals { static constexpr auto name = "equals"; };
struct NameNotEquals { static constexpr auto name = "notEquals"; };
struct NameLess { static constexpr auto name = "less"; };
struct NameGreater { static constexpr auto name = "greater"; };
struct NameLessOrEquals { static constexpr auto name = "lessOrEquals"; };
struct NameGreaterOrEquals { static constexpr auto name = "greaterOrEquals"; };
template <
template <typename, typename> class Op,
typename Name>
@ -620,6 +630,108 @@ private:
}
}
void executeTuple(Block & block, size_t result, const IColumn * c0, const IColumn * c1)
{
/** Сравнивать кортежи будем лексикографически. Это делается следующим образом:
* x == y : x1 == y1 && x2 == y2 ...
* x != y : x1 != y1 || x2 != y2 ...
*
* x < y: x1 < y1 || (x1 == y1 && (x2 < y2 || (x2 == y2 ... && xn < yn))
* x > y: x1 > y1 || (x1 == y1 && (x2 > y2 || (x2 == y2 ... && xn > yn))
* x <= y: x1 < y1 || (x1 == y1 && (x2 < y2 || (x2 == y2 ... && xn <= yn))
*
* Рекурсивная запись:
* x <= y: x1 < y1 || (x1 == y1 && x_tail <= y_tail)
*
* x >= y: x1 > y1 || (x1 == y1 && (x2 > y2 || (x2 == y2 ... && xn >= yn))
*/
const size_t tuple_size = x->getData().columns();
if (0 == tuple_size)
throw Exception("Comparison of zero-sized tuples is not implemented.", ErrorCodes::NOT_IMPLEMENTED);
auto x = static_cast<const ColumnTuple *>(c0);
auto y = static_cast<const ColumnTuple *>(c1);
executeTupleImpl(block, result, x, y, tuple_size);
}
void executeTupleImpl(Block & block, size_t result, const ColumnTuple * x, const ColumnTuple * y, size_t tuple_size);
template <typename ComparisonFunction, typename ConvolutionFunction>
void executeTupleEqualityImpl(Block & block, size_t result, const ColumnTuple * x, const ColumnTuple * y, size_t tuple_size)
{
ComparisonFunction func_compare;
ConvolutionFunction func_convolution;
Block tmp_block;
for (size_t i = 0; i < tuple_size; ++i)
{
tmp_block.insert(x->getData().getByPosition(i));
tmp_block.insert(y->getData().getByPosition(i));
/// Сравнение элементов.
tmp_block.insert({ nullptr, new DataTypeUInt8, "" });
func_compare.execute(tmp_block, {i * 3, i * 3 + 1}, i * 3 + 2);
}
/// Логическая свёртка.
tmp_block.insert({ nullptr, new DataTypeUInt8, "" });
ColumnNumbers convolution_args(tuple_size);
for (size_t i = 0; i < tuple_size; ++i)
convolution_args[i] = i * 3 + 2;
func_convolution.execute(tmp_block, convolution_args, tuple_size * 3);
block.getByPosition(result).column = tmp_block.getByPosition(tuple_size * 3).column;
}
template <typename HeadComparisonFunction, typename TailComparisonFunction>
void executeTupleLessGreaterImpl(Block & block, size_t result, const ColumnTuple * x, const ColumnTuple * y, size_t tuple_size)
{
HeadComparisonFunction func_compare_head;
TailComparisonFunction func_compare_tail;
FunctionAnd func_and;
FunctionOr func_or;
FunctionComparison<EqualsOp, NameEquals> func_equals;
Block tmp_block;
/// Попарное сравнение на неравенство всех элементов; на равенство всех элементов кроме последнего.
for (size_t i = 0; i < tuple_size; ++i)
{
tmp_block.insert(x->getData().getByPosition(i));
tmp_block.insert(y->getData().getByPosition(i));
tmp_block.insert({ nullptr, new DataTypeUInt8, "" });
if (i + 1 != tuple_size)
{
func_compare_head.execute(tmp_block, {i * 4, i * 4 + 1}, i * 4 + 2);
tmp_block.insert({ nullptr, new DataTypeUInt8, "" });
func_equals.execute(tmp_block, {i * 4, i * 4 + 1}, i * 4 + 3);
}
else
func_compare_tail.execute(tmp_block, {i * 4, i * 4 + 1}, i * 4 + 2);
}
/// Комбинирование. Сложный код - сделайте рисунок. Можно заменить на рекурсивное сравнение кортежей.
size_t i = tuple_size - 1;
while (i > 0)
{
tmp_block.insert({ nullptr, new DataTypeUInt8, "" });
func_and.execute(tmp_block, { tmp_block.columns() - 2, (i - 1) * 4 + 3 }, tmp_block.columns() - 1);
tmp_block.insert({ nullptr, new DataTypeUInt8, "" });
func_or.execute(tmp_block, { tmp_block.columns() - 2, (i - 1) * 4 + 2 }, tmp_block.columns() - 1);
--i;
}
block.getByPosition(result).column = tmp_block.getByPosition(tmp_block.columns() - 1).column;
}
public:
/// Получить имя функции.
String getName() const override
@ -639,23 +751,27 @@ public:
bool left_is_date_time = false;
bool left_is_string = false;
bool left_is_fixed_string = false;
const DataTypeTuple * left_tuple = nullptr;
false
|| (left_is_date = typeid_cast<const DataTypeDate *>(arguments[0].get()))
|| (left_is_date_time = typeid_cast<const DataTypeDateTime *>(arguments[0].get()))
|| (left_is_string = typeid_cast<const DataTypeString *>(arguments[0].get()))
|| (left_is_fixed_string = typeid_cast<const DataTypeFixedString *>(arguments[0].get()));
|| (left_is_fixed_string = typeid_cast<const DataTypeFixedString *>(arguments[0].get()))
|| (left_tuple = typeid_cast<const DataTypeTuple *>(arguments[0].get()));
bool right_is_date = false;
bool right_is_date_time = false;
bool right_is_string = false;
bool right_is_fixed_string = false;
const DataTypeTuple * right_tuple = nullptr;
false
|| (right_is_date = typeid_cast<const DataTypeDate *>(arguments[1].get()))
|| (right_is_date_time = typeid_cast<const DataTypeDateTime *>(arguments[1].get()))
|| (right_is_string = typeid_cast<const DataTypeString *>(arguments[1].get()))
|| (right_is_fixed_string = typeid_cast<const DataTypeFixedString *>(arguments[1].get()));
|| (right_is_fixed_string = typeid_cast<const DataTypeFixedString *>(arguments[1].get()))
|| (right_tuple = typeid_cast<const DataTypeTuple *>(arguments[1].get()));
if (!( (arguments[0]->behavesAsNumber() && arguments[1]->behavesAsNumber())
|| ((left_is_string || left_is_fixed_string) && (right_is_string || right_is_fixed_string))
@ -664,10 +780,18 @@ public:
|| (left_is_string && right_is_date)
|| (left_is_date_time && right_is_date_time)
|| (left_is_date_time && right_is_string)
|| (left_is_string && right_is_date_time)))
|| (left_is_string && right_is_date_time)
|| (left_tuple && right_tuple && left_tuple->getElements().size() == right_tuple->getElements().size())))
throw Exception("Illegal types of arguments (" + arguments[0]->getName() + ", " + arguments[1]->getName() + ")"
" of function " + getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if (left_tuple && right_tuple)
{
size_t size = left_tuple->getElements().size();
for (size_t i = 0; i < size; ++i)
getReturnType({ left_tuple->getElements()[i], right_tuple->getElements()[i] });
}
return new DataTypeUInt8;
}
@ -696,25 +820,25 @@ public:
+ " of first argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
}
else if (!left_is_num && !right_is_num)
{
executeString(block, result, col_left_untyped, col_right_untyped);
}
else
{
executeDateOrDateTimeWithConstString(block, result, col_left_untyped, col_right_untyped, left_is_num, right_is_num);
if (typeid_cast<const ColumnTuple *>(col_left_untyped))
{
executeTuple(block, result, col_left_untyped, col_right_untyped);
}
else if (!left_is_num && !right_is_num)
{
executeString(block, result, col_left_untyped, col_right_untyped);
}
else
{
executeDateOrDateTimeWithConstString(block, result, col_left_untyped, col_right_untyped, left_is_num, right_is_num);
}
}
}
};
struct NameEquals { static constexpr auto name = "equals"; };
struct NameNotEquals { static constexpr auto name = "notEquals"; };
struct NameLess { static constexpr auto name = "less"; };
struct NameGreater { static constexpr auto name = "greater"; };
struct NameLessOrEquals { static constexpr auto name = "lessOrEquals"; };
struct NameGreaterOrEquals { static constexpr auto name = "greaterOrEquals"; };
typedef FunctionComparison<EqualsOp, NameEquals> FunctionEquals;
typedef FunctionComparison<NotEqualsOp, NameNotEquals> FunctionNotEquals;
typedef FunctionComparison<LessOp, NameLess> FunctionLess;
@ -722,4 +846,56 @@ typedef FunctionComparison<GreaterOp, NameGreater> FunctionGreater;
typedef FunctionComparison<LessOrEqualsOp, NameLessOrEquals> FunctionLessOrEquals;
typedef FunctionComparison<GreaterOrEqualsOp, NameGreaterOrEquals> FunctionGreaterOrEquals;
template <>
void FunctionComparison<EqualsOp, NameEquals>::executeTupleImpl(
Block & block, size_t result, const ColumnTuple * x, const ColumnTuple * y, size_t tuple_size)
{
return executeTupleEqualityImpl<FunctionComparison<EqualsOp, NameEquals>, FunctionAnd>(block, result, x, y, tuple_size);
}
template <>
void FunctionComparison<NotEqualsOp, NameNotEquals>::executeTupleImpl(
Block & block, size_t result, const ColumnTuple * x, const ColumnTuple * y, size_t tuple_size)
{
return executeTupleEqualityImpl<FunctionComparison<NotEqualsOp, NameNotEquals>, FunctionOr>(block, result, x, y, tuple_size);
}
template <>
void FunctionComparison<LessOp, NameLess>::executeTupleImpl(
Block & block, size_t result, const ColumnTuple * x, const ColumnTuple * y, size_t tuple_size)
{
return executeTupleLessGreaterImpl<
FunctionComparison<LessOp, NameLess>,
FunctionComparison<LessOp, NameLess>>(block, result, x, y, tuple_size);
}
template <>
void FunctionComparison<GreaterOp, NameGreater>::executeTupleImpl(
Block & block, size_t result, const ColumnTuple * x, const ColumnTuple * y, size_t tuple_size)
{
return executeTupleLessGreaterImpl<
FunctionComparison<GreaterOp, NameGreater>,
FunctionComparison<GreaterOp, NameGreater>>(block, result, x, y, tuple_size);
}
template <>
void FunctionComparison<LessOrEqualsOp, NameLessOrEquals>::executeTupleImpl(
Block & block, size_t result, const ColumnTuple * x, const ColumnTuple * y, size_t tuple_size)
{
return executeTupleLessGreaterImpl<
FunctionComparison<LessOp, NameLess>,
FunctionComparison<LessOrEqualsOp, NameLessOrEquals>>(block, result, x, y, tuple_size);
}
template <>
void FunctionComparison<GreaterOrEqualsOp, NameGreaterOrEquals>::executeTupleImpl(
Block & block, size_t result, const ColumnTuple * x, const ColumnTuple * y, size_t tuple_size)
{
return executeTupleLessGreaterImpl<
FunctionComparison<GreaterOp, NameGreater>,
FunctionComparison<GreaterOrEqualsOp, NameGreaterOrEquals>>(block, result, x, y, tuple_size);
}
}

View File

@ -8,7 +8,7 @@
#include <DB/Columns/ColumnExpression.h>
#include <DB/Functions/IFunction.h>
#include "FunctionsMiscellaneous.h"
#include <DB/Functions/FunctionsMiscellaneous.h>
namespace DB

View File

@ -0,0 +1,15 @@
1 0 0 0 1 1
0 1 1 0 1 0
0 1 1 0 1 0
0 1 1 0 1 0
0 1 0 1 0 1
0 1 0 1 0 1
0 1 0 1 0 1
1 0 0 0 1 1
0 1 1 0 1 0
0 1 1 0 1 0
0 1 0 1 0 1
0 1 0 1 0 1
1 0 0 0 1 1
0 1 1 0 1 0
0 1 0 1 0 1

View File

@ -0,0 +1,105 @@
SELECT
(1, 'Hello', 23) = (1, 'Hello', 23),
(1, 'Hello', 23) != (1, 'Hello', 23),
(1, 'Hello', 23) < (1, 'Hello', 23),
(1, 'Hello', 23) > (1, 'Hello', 23),
(1, 'Hello', 23) <= (1, 'Hello', 23),
(1, 'Hello', 23) >= (1, 'Hello', 23);
SELECT
(1, 'Hello', 23) = (2, 'Hello', 23),
(1, 'Hello', 23) != (2, 'Hello', 23),
(1, 'Hello', 23) < (2, 'Hello', 23),
(1, 'Hello', 23) > (2, 'Hello', 23),
(1, 'Hello', 23) <= (2, 'Hello', 23),
(1, 'Hello', 23) >= (2, 'Hello', 23);
SELECT
(1, 'Hello', 23) = (1, 'World', 23),
(1, 'Hello', 23) != (1, 'World', 23),
(1, 'Hello', 23) < (1, 'World', 23),
(1, 'Hello', 23) > (1, 'World', 23),
(1, 'Hello', 23) <= (1, 'World', 23),
(1, 'Hello', 23) >= (1, 'World', 23);
SELECT
(1, 'Hello', 23) = (1, 'Hello', 24),
(1, 'Hello', 23) != (1, 'Hello', 24),
(1, 'Hello', 23) < (1, 'Hello', 24),
(1, 'Hello', 23) > (1, 'Hello', 24),
(1, 'Hello', 23) <= (1, 'Hello', 24),
(1, 'Hello', 23) >= (1, 'Hello', 24);
SELECT
(2, 'Hello', 23) = (1, 'Hello', 23),
(2, 'Hello', 23) != (1, 'Hello', 23),
(2, 'Hello', 23) < (1, 'Hello', 23),
(2, 'Hello', 23) > (1, 'Hello', 23),
(2, 'Hello', 23) <= (1, 'Hello', 23),
(2, 'Hello', 23) >= (1, 'Hello', 23);
SELECT
(1, 'World', 23) = (1, 'Hello', 23),
(1, 'World', 23) != (1, 'Hello', 23),
(1, 'World', 23) < (1, 'Hello', 23),
(1, 'World', 23) > (1, 'Hello', 23),
(1, 'World', 23) <= (1, 'Hello', 23),
(1, 'World', 23) >= (1, 'Hello', 23);
SELECT
(1, 'Hello', 24) = (1, 'Hello', 23),
(1, 'Hello', 24) != (1, 'Hello', 23),
(1, 'Hello', 24) < (1, 'Hello', 23),
(1, 'Hello', 24) > (1, 'Hello', 23),
(1, 'Hello', 24) <= (1, 'Hello', 23),
(1, 'Hello', 24) >= (1, 'Hello', 23);
SELECT
(1, 'Hello') = (1, 'Hello'),
(1, 'Hello') != (1, 'Hello'),
(1, 'Hello') < (1, 'Hello'),
(1, 'Hello') > (1, 'Hello'),
(1, 'Hello') <= (1, 'Hello'),
(1, 'Hello') >= (1, 'Hello');
SELECT
(1, 'Hello') = (2, 'Hello'),
(1, 'Hello') != (2, 'Hello'),
(1, 'Hello') < (2, 'Hello'),
(1, 'Hello') > (2, 'Hello'),
(1, 'Hello') <= (2, 'Hello'),
(1, 'Hello') >= (2, 'Hello');
SELECT
(1, 'Hello') = (1, 'World'),
(1, 'Hello') != (1, 'World'),
(1, 'Hello') < (1, 'World'),
(1, 'Hello') > (1, 'World'),
(1, 'Hello') <= (1, 'World'),
(1, 'Hello') >= (1, 'World');
SELECT
(2, 'Hello') = (1, 'Hello'),
(2, 'Hello') != (1, 'Hello'),
(2, 'Hello') < (1, 'Hello'),
(2, 'Hello') > (1, 'Hello'),
(2, 'Hello') <= (1, 'Hello'),
(2, 'Hello') >= (1, 'Hello');
SELECT
(1, 'World') = (1, 'Hello'),
(1, 'World') != (1, 'Hello'),
(1, 'World') < (1, 'Hello'),
(1, 'World') > (1, 'Hello'),
(1, 'World') <= (1, 'Hello'),
(1, 'World') >= (1, 'Hello');
SELECT
tuple(1) = tuple(1),
tuple(1) != tuple(1),
tuple(1) < tuple(1),
tuple(1) > tuple(1),
tuple(1) <= tuple(1),
tuple(1) >= tuple(1);
SELECT
tuple(1) = tuple(2),
tuple(1) != tuple(2),
tuple(1) < tuple(2),
tuple(1) > tuple(2),
tuple(1) <= tuple(2),
tuple(1) >= tuple(2);
SELECT
tuple(2) = tuple(1),
tuple(2) != tuple(1),
tuple(2) < tuple(1),
tuple(2) > tuple(1),
tuple(2) <= tuple(1),
tuple(2) >= tuple(1);