Merge pull request #11071 (#11874)

* Adds hasSubstr array function

* Adds example in the English doc of hasSubstr

* Updates conditions to avoid prematurely calling isEqual

* Remove feathericons

Authored-by: Ryad ZENINE <ryad.zenine@contentsquare.com>
This commit is contained in:
Alexander Kazakov 2020-06-23 17:42:19 +03:00 committed by GitHub
parent 56a665290c
commit 0510911559
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 314 additions and 35 deletions

View File

@ -176,6 +176,54 @@ hasAny(array1, array2)
`SELECT hasAll([[1, 2], [3, 4]], [[1, 2], [1, 2]])` returns `1`.
## hasSubstr {#hassubstr}
Checks whether all the elements of array2 appear in array1 in the same exact order. Therefore, the function will return 1, if and only if `array1 = prefix + array2 + suffix`.
``` sql
hasSubstr(array1, array2)
```
In other words, the function checks whether all the elements of `array2` are contained in `array1`, like
the `hasAll` function. In addition, it checks that the elements appear contiguously and in the same order in both `array1` and `array2`.
For example:
- `hasSubstr([1,2,3,4], [2,3])` returns 1. However, `hasSubstr([1,2,3,4], [3,2])` will return `0`.
- `hasSubstr([1,2,3,4], [1,2,3])` returns 1. However, `hasSubstr([1,2,3,4], [1,2,4])` will return `0`.
**Parameters**
- `array1` Array of any type with a set of elements.
- `array2` Array of any type with a set of elements.
**Return values**
- `1`, if `array1` contains `array2`.
- `0`, otherwise.
**Peculiar properties**
- The function will return `1` if `array2` is empty.
- `Null` is processed as a value. In other words, `hasSubstr([1, 2, NULL, 3, 4], [2,3])` will return `0`. However, `hasSubstr([1, 2, NULL, 3, 4], [2,NULL,3])` will return `1`.
- The order of values in both arrays matters.
**Examples**
`SELECT hasSubstr([], [])` returns 1.
`SELECT hasSubstr([1, Null], [Null])` returns 1.
`SELECT hasSubstr([1.0, 2, 3, 4], [1, 3])` returns 0.
`SELECT hasSubstr(['a', 'b'], ['a'])` returns 1.
`SELECT hasSubstr(['a', 'b' , 'c'], ['a', 'b'])` returns 1.
`SELECT hasSubstr(['a', 'b' , 'c'], ['a', 'c'])` returns 0.
`SELECT hasSubstr([[1, 2], [3, 4], [5, 6]], [[1, 2], [3, 4]])` returns 1.
## indexOf(arr, x) {#indexofarr-x}
Returns the index of the first x element (starting from 1) if it is in the array, or 0 if it is not.

View File

@ -6,6 +6,7 @@
#include "Sinks.h"
#include <Core/AccurateComparison.h>
#include <ext/range.h>
#include "GatherUtils.h"
namespace DB::ErrorCodes
@ -394,11 +395,12 @@ void NO_INLINE conditional(SourceA && src_a, SourceB && src_b, Sink && sink, con
/// Methods to check if first array has elements from second array, overloaded for various combinations of types.
template <bool all, typename FirstSliceType, typename SecondSliceType,
template <
ArraySearchType search_type,
typename FirstSliceType,
typename SecondSliceType,
bool (*isEqual)(const FirstSliceType &, const SecondSliceType &, size_t, size_t)>
bool sliceHasImpl(const FirstSliceType & first, const SecondSliceType & second,
const UInt8 * first_null_map, const UInt8 * second_null_map)
bool sliceHasImplAnyAll(const FirstSliceType & first, const SecondSliceType & second, const UInt8 * first_null_map, const UInt8 * second_null_map)
{
const bool has_first_null_map = first_null_map != nullptr;
const bool has_second_null_map = second_null_map != nullptr;
@ -418,17 +420,113 @@ bool sliceHasImpl(const FirstSliceType & first, const SecondSliceType & second,
has = true;
}
if (has && !all)
if (has && search_type == ArraySearchType::Any)
return true;
if (!has && all)
if (!has && search_type == ArraySearchType::All)
return false;
}
return search_type == ArraySearchType::All;
}
/// Builds the "prefix function" used by the Knuth-Morris-Pratt matching algorithm, see
/// https://en.wikipedia.org/wiki/Knuth%E2%80%93Morris%E2%80%93Pratt_algorithm.
///
/// result[i] is the length of the longest proper prefix of `pattern` that is also a suffix
/// of the first i + 1 elements of `pattern`.
///
/// `pattern` only needs to expose a `size` member; elements are never accessed directly.
/// `isEqualFunc(pattern, i, j)` must return true when the i-th and j-th elements of the pattern are equal.
/// An empty pattern yields an empty vector.
template <typename SliceType, typename EqualityFunc>
std::vector<size_t> buildKMPPrefixFunction(const SliceType & pattern, const EqualityFunc & isEqualFunc)
{
    /// Value-initialised to zeros, so result[0] (and every entry without a match below) is already 0.
    /// Not touching result[0] explicitly keeps the empty pattern from writing out of bounds.
    std::vector<size_t> result(pattern.size);

    for (size_t i = 1; i < pattern.size; ++i)
    {
        /// Try candidate border lengths from longest to shortest: each step falls back to the
        /// border of the previous candidate until element i extends one of them or none is left.
        for (auto length = i; length > 0;)
        {
            length = result[length - 1];
            if (isEqualFunc(pattern, i, length))
            {
                result[i] = length + 1;
                break;
            }
        }
    }

    return result;
}
/// Checks whether `second` occurs inside `first` as a contiguous run of equal elements,
/// using Knuth-Morris-Pratt matching.
///
/// Template parameters:
///  - isEqual(first, second, i, j) compares the i-th element of `first` with the j-th element of `second`;
///  - isEqualUnary(second, i, j) compares two elements of `second` (needed to build the prefix function).
///
/// `first_null_map` / `second_null_map` may be nullptr, meaning the corresponding slice has no NULLs.
/// Otherwise a non-zero entry marks the element as NULL. A NULL matches only another NULL;
/// element values are compared only when both sides are non-NULL.
///
/// Returns true for an empty `second`.
template <
    typename FirstSliceType,
    typename SecondSliceType,
    bool (*isEqual)(const FirstSliceType &, const SecondSliceType &, size_t, size_t),
    bool (*isEqualUnary)(const SecondSliceType &, size_t, size_t)>
bool sliceHasImplSubstr(const FirstSliceType & first, const SecondSliceType & second, const UInt8 * first_null_map, const UInt8 * second_null_map)
{
    if (second.size == 0)
        return true;

    /// A pattern longer than the haystack can never match; answer before allocating the prefix table.
    if (first.size < second.size)
        return false;

    const bool has_first_null_map = first_null_map != nullptr;
    const bool has_second_null_map = second_null_map != nullptr;

    std::vector<size_t> prefix_function;
    if (has_second_null_map)
    {
        /// Two pattern elements are equal if both are NULL, or both are non-NULL with equal values.
        prefix_function = buildKMPPrefixFunction(second,
            [null_map = second_null_map](const SecondSliceType & pattern, size_t i, size_t j)
            {
                return !!null_map[i] == !!null_map[j] && (!!null_map[i] || isEqualUnary(pattern, i, j));
            });
    }
    else
    {
        prefix_function = buildKMPPrefixFunction(second,
            [](const SecondSliceType & pattern, size_t i, size_t j) { return isEqualUnary(pattern, i, j); });
    }

    size_t first_cur = 0;
    size_t second_cur = 0;
    while (first_cur < first.size && second_cur < second.size)
    {
        const bool is_first_null = has_first_null_map && first_null_map[first_cur];
        const bool is_second_null = has_second_null_map && second_null_map[second_cur];

        const bool cond_both_null_match = is_first_null && is_second_null;
        const bool cond_both_not_null = !is_first_null && !is_second_null;

        /// isEqual is evaluated only when neither element is NULL.
        if (cond_both_null_match || (cond_both_not_null && isEqual(first, second, first_cur, second_cur)))
        {
            ++first_cur;
            ++second_cur;
        }
        else if (second_cur > 0)
        {
            /// Mismatch after a partial match: fall back in the pattern without moving in `first`.
            second_cur = prefix_function[second_cur - 1];
        }
        else
        {
            ++first_cur;
        }
    }

    /// The loop stops either on a full match or when `first` is exhausted.
    return second_cur == second.size;
}
/// Common entry point for the array containment checks; picks the implementation at compile time
/// from `search_type`.
///  - Substr goes to sliceHasImplSubstr, which additionally needs `isEqualSecond` to compare
///    two elements of `second` with each other.
///  - Any other search type goes to sliceHasImplAnyAll; `isEqualSecond` is not used there.
template <
    ArraySearchType search_type,
    typename FirstSliceType,
    typename SecondSliceType,
    bool (*isEqual)(const FirstSliceType &, const SecondSliceType &, size_t, size_t),
    bool (*isEqualSecond)(const SecondSliceType &, size_t, size_t)>
bool sliceHasImpl(const FirstSliceType & first, const SecondSliceType & second, const UInt8 * first_null_map, const UInt8 * second_null_map)
{
    if constexpr (search_type != ArraySearchType::Substr)
    {
        return sliceHasImplAnyAll<search_type, FirstSliceType, SecondSliceType, isEqual>(
            first, second, first_null_map, second_null_map);
    }
    else
    {
        return sliceHasImplSubstr<FirstSliceType, SecondSliceType, isEqual, isEqualSecond>(
            first, second, first_null_map, second_null_map);
    }
}
template <typename T, typename U>
bool sliceEqualElements(const NumericArraySlice<T> & first [[maybe_unused]],
const NumericArraySlice<U> & second [[maybe_unused]],
@ -461,65 +559,95 @@ inline ALWAYS_INLINE bool sliceEqualElements(const GenericArraySlice & first, co
return first.elements->compareAt(first_ind + first.begin, second_ind + second.begin, *second.elements, -1) == 0;
}
template <bool all, typename T, typename U>
/// Compares two elements of the same numeric slice: first.data[first_ind] against first.data[second_ind].
/// For decimal types both values are converted to T::NativeType before the comparison.
template <typename T>
bool insliceEqualElements(const NumericArraySlice<T> & first [[maybe_unused]],
                          size_t first_ind [[maybe_unused]],
                          size_t second_ind [[maybe_unused]])
{
    if constexpr (!IsDecimalNumber<T>)
    {
        return accurate::equalsOp(first.data[first_ind], first.data[second_ind]);
    }
    else
    {
        using Native = typename T::NativeType;
        return accurate::equalsOp(Native(first.data[first_ind]), Native(first.data[second_ind]));
    }
}
/// Compares two elements of the same generic slice. Indices are relative to the slice,
/// so `first.begin` is added to both before asking the underlying column to compare the rows.
/// The final compareAt argument is -1, the same value the two-slice sliceEqualElements overload passes.
inline ALWAYS_INLINE bool insliceEqualElements(const GenericArraySlice & first, size_t first_ind, size_t second_ind)
{
    const auto lhs_row = first_ind + first.begin;
    const auto rhs_row = second_ind + first.begin;
    const auto comparison = first.elements->compareAt(lhs_row, rhs_row, *first.elements, -1);
    return comparison == 0;
}
template <ArraySearchType search_type, typename T, typename U>
bool sliceHas(const NumericArraySlice<T> & first, const NumericArraySlice<U> & second)
{
auto impl = sliceHasImpl<all, NumericArraySlice<T>, NumericArraySlice<U>, sliceEqualElements<T, U>>;
auto impl = sliceHasImpl<search_type, NumericArraySlice<T>, NumericArraySlice<U>, sliceEqualElements<T, U>, insliceEqualElements<U>>;
return impl(first, second, nullptr, nullptr);
}
template <bool all>
template <ArraySearchType search_type>
bool sliceHas(const GenericArraySlice & first, const GenericArraySlice & second)
{
/// Generic arrays should have the same type in order to use column.compareAt(...)
if (!first.elements->structureEquals(*second.elements))
return false;
auto impl = sliceHasImpl<all, GenericArraySlice, GenericArraySlice, sliceEqualElements>;
auto impl = sliceHasImpl<search_type, GenericArraySlice, GenericArraySlice, sliceEqualElements, insliceEqualElements>;
return impl(first, second, nullptr, nullptr);
}
template <bool all, typename U>
template <ArraySearchType search_type, typename U>
bool sliceHas(const GenericArraySlice & /*first*/, const NumericArraySlice<U> & /*second*/)
{
return false;
}
template <bool all, typename T>
template <ArraySearchType search_type, typename T>
bool sliceHas(const NumericArraySlice<T> & /*first*/, const GenericArraySlice & /*second*/)
{
return false;
}
template <bool all, typename FirstArraySlice, typename SecondArraySlice>
template <ArraySearchType search_type, typename FirstArraySlice, typename SecondArraySlice>
bool sliceHas(const FirstArraySlice & first, NullableSlice<SecondArraySlice> & second)
{
auto impl = sliceHasImpl<all, FirstArraySlice, SecondArraySlice, sliceEqualElements<FirstArraySlice, SecondArraySlice>>;
auto impl = sliceHasImpl<
search_type,
FirstArraySlice,
SecondArraySlice,
sliceEqualElements<FirstArraySlice, SecondArraySlice>,
insliceEqualElements<SecondArraySlice>>;
return impl(first, second, nullptr, second.null_map);
}
template <bool all, typename FirstArraySlice, typename SecondArraySlice>
template <ArraySearchType search_type, typename FirstArraySlice, typename SecondArraySlice>
bool sliceHas(const NullableSlice<FirstArraySlice> & first, SecondArraySlice & second)
{
auto impl = sliceHasImpl<all, FirstArraySlice, SecondArraySlice, sliceEqualElements<FirstArraySlice, SecondArraySlice>>;
auto impl = sliceHasImpl<
search_type,
FirstArraySlice,
SecondArraySlice,
sliceEqualElements<FirstArraySlice, SecondArraySlice>,
insliceEqualElements<SecondArraySlice>>;
return impl(first, second, first.null_map, nullptr);
}
template <bool all, typename FirstArraySlice, typename SecondArraySlice>
template <ArraySearchType search_type, typename FirstArraySlice, typename SecondArraySlice>
bool sliceHas(const NullableSlice<FirstArraySlice> & first, NullableSlice<SecondArraySlice> & second)
{
auto impl = sliceHasImpl<all, FirstArraySlice, SecondArraySlice, sliceEqualElements<FirstArraySlice, SecondArraySlice>>;
auto impl = sliceHasImpl<
search_type,
FirstArraySlice,
SecondArraySlice,
sliceEqualElements<FirstArraySlice, SecondArraySlice>,
insliceEqualElements<SecondArraySlice>>;
return impl(first, second, first.null_map, second.null_map);
}
template <bool all, typename FirstSource, typename SecondSource>
template <ArraySearchType search_type, typename FirstSource, typename SecondSource>
void NO_INLINE arrayAllAny(FirstSource && first, SecondSource && second, ColumnUInt8 & result)
{
auto size = result.size();
auto & data = result.getData();
for (auto row : ext::range(0, size))
{
data[row] = static_cast<UInt8>(sliceHas<all>(first.getWhole(), second.getWhole()) ? 1 : 0);
data[row] = static_cast<UInt8>(sliceHas<search_type>(first.getWhole(), second.getWhole()) ? 1 : 0);
first.next();
second.next();
}

View File

@ -30,6 +30,13 @@
namespace DB::GatherUtils
{
/// Kind of containment check performed by sliceHas.
enum class ArraySearchType
{
    /// Corresponds to the hasAny array function
    Any,
    /// Corresponds to the hasAll array function
    All,
    /// Corresponds to the hasSubstr array function
    Substr
};
std::unique_ptr<IArraySource> createArraySource(const ColumnArray & col, bool is_const, size_t total_rows);
std::unique_ptr<IValueSource> createValueSource(const IColumn & col, bool is_const, size_t total_rows);
std::unique_ptr<IArraySink> createArraySink(ColumnArray & col, size_t column_size);
@ -45,7 +52,7 @@ void sliceFromRightConstantOffsetBounded(IArraySource & src, IArraySink & sink,
void sliceDynamicOffsetUnbounded(IArraySource & src, IArraySink & sink, const IColumn & offset_column);
void sliceDynamicOffsetBounded(IArraySource & src, IArraySink & sink, const IColumn & offset_column, const IColumn & length_column);
void sliceHas(IArraySource & first, IArraySource & second, bool all, ColumnUInt8 & result);
void sliceHas(IArraySource & first, IArraySource & second, ArraySearchType & search_type, ColumnUInt8 & result);
void push(IArraySource & array_source, IValueSource & value_source, IArraySink & sink, bool push_front);

View File

@ -8,18 +8,28 @@ namespace DB::GatherUtils
struct ArrayHasSelectArraySourcePair : public ArraySourcePairSelector<ArrayHasSelectArraySourcePair>
{
template <typename FirstSource, typename SecondSource>
static void selectSourcePair(FirstSource && first, SecondSource && second, bool all, ColumnUInt8 & result)
static void selectSourcePair(FirstSource && first, SecondSource && second, ArraySearchType & search_type, ColumnUInt8 & result)
{
if (all)
arrayAllAny<true>(first, second, result);
else
arrayAllAny<false>(first, second, result);
switch (search_type)
{
case ArraySearchType::All:
arrayAllAny<ArraySearchType::All>(first, second, result);
break;
case ArraySearchType::Any:
arrayAllAny<ArraySearchType::Any>(first, second, result);
break;
case ArraySearchType::Substr:
arrayAllAny<ArraySearchType::Substr>(first, second, result);
break;
}
}
};
void sliceHas(IArraySource & first, IArraySource & second, bool all, ColumnUInt8 & result)
void sliceHas(IArraySource & first, IArraySource & second, ArraySearchType & search_type, ColumnUInt8 & result)
{
ArrayHasSelectArraySourcePair::select(first, second, all, result);
ArrayHasSelectArraySourcePair::select(first, second, search_type, result);
}
}

View File

@ -1,5 +1,6 @@
#include "hasAllAny.h"
#include <Functions/FunctionFactory.h>
#include <Functions/GatherUtils/GatherUtils.h>
namespace DB
@ -10,7 +11,7 @@ class FunctionArrayHasAll : public FunctionArrayHasAllAny
public:
static constexpr auto name = "hasAll";
static FunctionPtr create(const Context &) { return std::make_shared<FunctionArrayHasAll>(); }
FunctionArrayHasAll() : FunctionArrayHasAllAny(true, name) {}
FunctionArrayHasAll() : FunctionArrayHasAllAny(GatherUtils::ArraySearchType::All, name) {}
};
void registerFunctionHasAll(FunctionFactory & factory)

View File

@ -27,8 +27,8 @@ namespace ErrorCodes
class FunctionArrayHasAllAny : public IFunction
{
public:
FunctionArrayHasAllAny(bool all_, const char * name_)
: all(all_), name(name_) {}
FunctionArrayHasAllAny(GatherUtils::ArraySearchType search_type_, const char * name_)
: search_type(search_type_), name(name_) {}
String getName() const override { return name; }
@ -106,7 +106,7 @@ public:
auto result_column = ColumnUInt8::create(rows);
auto result_column_ptr = typeid_cast<ColumnUInt8 *>(result_column.get());
GatherUtils::sliceHas(*sources[0], *sources[1], all, *result_column_ptr);
GatherUtils::sliceHas(*sources[0], *sources[1], search_type, *result_column_ptr);
block.getByPosition(result).column = std::move(result_column);
}
@ -114,7 +114,7 @@ public:
bool useDefaultImplementationForConstants() const override { return true; }
private:
bool all;
GatherUtils::ArraySearchType search_type;
const char * name;
};

View File

@ -1,5 +1,6 @@
#include "hasAllAny.h"
#include <Functions/FunctionFactory.h>
#include <Functions/GatherUtils/GatherUtils.h>
namespace DB
@ -10,7 +11,7 @@ class FunctionArrayHasAny : public FunctionArrayHasAllAny
public:
static constexpr auto name = "hasAny";
static FunctionPtr create(const Context &) { return std::make_shared<FunctionArrayHasAny>(); }
FunctionArrayHasAny() : FunctionArrayHasAllAny(false, name) {}
FunctionArrayHasAny() : FunctionArrayHasAllAny(GatherUtils::ArraySearchType::Any, name) {}
};
void registerFunctionHasAny(FunctionFactory & factory)

View File

@ -0,0 +1,22 @@
#include "hasAllAny.h"
#include <Functions/FunctionFactory.h>
#include <Functions/GatherUtils/GatherUtils.h>
namespace DB
{
/// The `hasSubstr` function: FunctionArrayHasAllAny constructed with the Substr search type.
class FunctionArrayHasSubstr : public FunctionArrayHasAllAny
{
public:
    static constexpr auto name = "hasSubstr";

    FunctionArrayHasSubstr()
        : FunctionArrayHasAllAny(GatherUtils::ArraySearchType::Substr, name)
    {
    }

    /// Creates a new instance; the context argument is not used.
    static FunctionPtr create(const Context &)
    {
        return std::make_shared<FunctionArrayHasSubstr>();
    }
};
/// Registers FunctionArrayHasSubstr in the given function factory.
void registerFunctionHasSubstr(FunctionFactory & factory)
{
factory.registerFunction<FunctionArrayHasSubstr>();
}
}

View File

@ -8,6 +8,7 @@ void registerFunctionArrayResize(FunctionFactory &);
void registerFunctionHas(FunctionFactory &);
void registerFunctionHasAll(FunctionFactory &);
void registerFunctionHasAny(FunctionFactory &);
void registerFunctionHasSubstr(FunctionFactory &);
void registerFunctionIndexOf(FunctionFactory &);
void registerFunctionCountEqual(FunctionFactory &);
void registerFunctionArrayIntersect(FunctionFactory &);
@ -43,6 +44,7 @@ void registerFunctionsArray(FunctionFactory & factory)
registerFunctionHas(factory);
registerFunctionHasAll(factory);
registerFunctionHasAny(factory);
registerFunctionHasSubstr(factory);
registerFunctionIndexOf(factory);
registerFunctionCountEqual(factory);
registerFunctionArrayIntersect(factory);

View File

@ -92,6 +92,7 @@ SRCS(
array/emptyArrayToSingle.cpp
array/hasAll.cpp
array/hasAny.cpp
array/hasSubstr.cpp
array/has.cpp
array/indexOf.cpp
array/length.cpp

View File

@ -0,0 +1,27 @@
1
0
0
1
0
1
0
1
1
0
-
1
-
0
0
1
0
-
1
0
1
-
0
0
1
1
0

View File

@ -0,0 +1,32 @@
select hasSubstr([], []);
select hasSubstr([], [1]);
select hasSubstr([], [NULL]);
select hasSubstr([Null], [Null]);
select hasSubstr([Null], [Null, 1]);
select hasSubstr([1], []);
select hasSubstr([1], [Null]);
select hasSubstr([1, Null], [Null]);
select hasSubstr([1, Null, 3, 4, Null, 5, 7], [3, 4, Null]);
select hasSubstr([1, Null], [3, 4, Null]);
select '-';
select hasSubstr([1], emptyArrayUInt8());
select '-';
select hasSubstr([1, 2, 3, 4], [1, 3]);
select hasSubstr([1, 2, 3, 4], [1, 3, 5]);
select hasSubstr([-128, 1., 512], [1.]);
select hasSubstr([-128, 1.0, 512], [.3]);
select '-';
select hasSubstr(['a'], ['a']);
select hasSubstr(['a', 'b'], ['a', 'c']);
select hasSubstr(['a', 'c', 'b'], ['a', 'c']);
select '-';
select hasSubstr([1], ['a']);
select hasSubstr([[1, 2], [3, 4]], ['a', 'c']);
select hasSubstr([[1, 2], [3, 4], [5, 8]], [[3, 4]]);
select hasSubstr([[1, 2], [3, 4], [5, 8]], [[3, 4], [5, 8]]);
select hasSubstr([[1, 2], [3, 4], [5, 8]], [[1, 2], [5, 8]]);