Correcting new hasAll implementation for the case with null elements present in 'second' and absent in 'first', refactoring the outer loop remainder into a separate function, improving null checking in the default implementation

This commit is contained in:
Jakub Kuklis 2021-08-19 14:13:30 +02:00 committed by youenn lebras
parent 439082aa98
commit 763bd006a7
No known key found for this signature in database
GPG Key ID: E1DF98A69CABD2A5

View File

@ -7,10 +7,12 @@
#include <Core/AccurateComparison.h>
#include <base/range.h>
#include "GatherUtils.h"
#if defined(__AVX2__) || defined(__SSE4_2__)
#include <immintrin.h>
#include <immintrin.h>
#endif
namespace DB::ErrorCodes
{
extern const int LOGICAL_ERROR;
@ -495,6 +497,20 @@ std::vector<size_t> buildKMPPrefixFunction(const SliceType & pattern, const Equa
}
inline ALWAYS_INLINE bool hasNull(const UInt8 * null_map, size_t null_map_size)
{
if (null_map != nullptr)
{
for (size_t i = 0; i < null_map_size; ++i)
{
if (null_map[i])
return true;
}
}
return false;
}
/// Methods to check if first array has elements from second array, overloaded for various combinations of types.
template <
ArraySearchType search_type,
@ -506,19 +522,35 @@ bool sliceHasImplAnyAll(const FirstSliceType & first, const SecondSliceType & se
const bool has_first_null_map = first_null_map != nullptr;
const bool has_second_null_map = second_null_map != nullptr;
const bool has_second_null = hasNull(second_null_map, second.size);
if (has_second_null)
{
const bool has_first_null = hasNull(first_null_map, first.size);
if (has_first_null && search_type == ArraySearchType::Any)
return true;
if (!has_first_null && search_type == ArraySearchType::All)
return false;
}
for (size_t i = 0; i < second.size; ++i)
{
if (has_second_null_map && second_null_map[i])
continue;
bool has = false;
for (unsigned j = 0; j < first.size && !has; ++j)
for (size_t j = 0; j < first.size && !has; ++j)
{
const bool is_first_null = has_first_null_map && first_null_map[j];
const bool is_second_null = has_second_null_map && second_null_map[i];
if (has_first_null_map && first_null_map[j])
continue;
if (is_first_null && is_second_null)
has = true;
if (!is_first_null && !is_second_null && isEqual(first, second, j, i))
if (isEqual(first, second, j, i))
{
has = true;
break;
}
}
if (has && search_type == ArraySearchType::Any)
@ -531,21 +563,60 @@ bool sliceHasImplAnyAll(const FirstSliceType & first, const SecondSliceType & se
}
#if defined(__AVX2__) || defined(__SSE4_2__)
template<class T>
inline ALWAYS_INLINE bool hasAllIntegralLoopRemainder(
size_t j, const NumericArraySlice<T> & first, const NumericArraySlice<T> & second, const UInt8 * first_null_map, const UInt8 * second_null_map)
{
const bool has_first_null_map = first_null_map != nullptr;
const bool has_second_null_map = second_null_map != nullptr;
for (; j < second.size; ++j)
{
// skip null elements since both have at least one - assuming it was checked earlier that at least one element in 'first' is null
if (has_second_null_map && second_null_map[j])
continue;
bool found = false;
for (size_t i = 0; i < first.size; ++i)
{
if (has_first_null_map && first_null_map[i])
continue;
if (first.data[i] == second.data[j])
{
found = true;
break;
}
}
if (!found)
return false;
}
return true;
}
#endif
#if defined(__AVX2__)
// AVX2 - Int specialization
// AVX2 Int specialization
template <>
inline ALWAYS_INLINE bool sliceHasImplAnyAll<ArraySearchType::All, NumericArraySlice<int>, NumericArraySlice<int>, sliceEqualElements<int,int> >(
const NumericArraySlice<int> & first, const NumericArraySlice<int> & second, const UInt8 * second_null_map, const UInt8 * first_null_map)
const NumericArraySlice<int> & first, const NumericArraySlice<int> & second, const UInt8 * first_null_map, const UInt8 * second_null_map)
{
if (second.size == 0)
return true;
const bool has_first_null_map = first_null_map != nullptr;
const bool has_second_null_map = second_null_map != nullptr;
if (!has_first_null_map && has_second_null_map)
if (!hasNull(first_null_map, first.size) && hasNull(second_null_map, second.size))
return false;
unsigned j = 0;
const bool has_first_null_map = first_null_map != nullptr;
const bool has_second_null_map = second_null_map != nullptr;
size_t j = 0;
short has_mask = 1;
const int full = -1, none = 0;
const __m256i ones = _mm256_set1_epi32(full);
@ -625,28 +696,16 @@ inline ALWAYS_INLINE bool sliceHasImplAnyAll<ArraySearchType::All, NumericArrayS
continue;
__m256i v_i = _mm256_set1_epi32(first.data[i]);
bitmask = _mm256_or_si256(bitmask, _mm256_cmpeq_epi32(f_data, v_i));
has_mask = _mm256_testc_si256 (bitmask, ones);
has_mask = _mm256_testc_si256(bitmask, ones);
}
}
}
}
bool found = false;
// Loop(j)-jam
for (; j < second.size && has_mask; ++j)
{
// skip null elements since both have at least one - assuming at least one element in the first null map is set
found = (has_second_null_map && second_null_map[j]) ? true : false;
for (unsigned i = 0; i < first.size && !found; ++i)
{
if (has_first_null_map && first_null_map[i])
continue;
found = (first.data[i] == second.data[j]);
}
if (!found)
return false;
}
return has_mask || found;
if (!has_mask)
return false;
return hasAllIntegralLoopRemainder(j, first, second, first_null_map, second_null_map);
}
// TODO: Discuss about
@ -655,26 +714,27 @@ inline ALWAYS_INLINE bool sliceHasImplAnyAll<ArraySearchType::All, NumericArrayS
// AVX2 UInt specialization
// template <>
// inline ALWAYS_INLINE bool sliceHasImplAnyAll<ArraySearchType::All, NumericArraySlice<unsigned>, NumericArraySlice<unsigned>, sliceEqualElements<unsigned,unsigned> >(
// const NumericArraySlice<unsigned> & second, const NumericArraySlice<unsigned> & first, const UInt8 * second_null_map, const UInt8 * first_null_map)
// const NumericArraySlice<unsigned> & second, const NumericArraySlice<unsigned> & first, const UInt8 * first_null_map, const UInt8 * second_null_map)
// {
// return sliceHasImplAnyAll<ArraySearchType::All, NumericArraySlice<int>, NumericArraySlice<int>, sliceEqualElements<int,int> > (
// return sliceHasImplAnyAll<ArraySearchType::All, NumericArraySlice<int>, NumericArraySlice<int>, sliceEqualElements<int,int> > (
// static_cast<const NumericArraySlice<int> &>(second), static_cast<const NumericArraySlice<int> &>(first), second_null_map, first_null_map);
// }
// AVX2 Int64 specialization
template <>
inline ALWAYS_INLINE bool sliceHasImplAnyAll<ArraySearchType::All, NumericArraySlice<Int64>, NumericArraySlice<Int64>, sliceEqualElements<Int64,Int64> >(
const NumericArraySlice<Int64> & first, const NumericArraySlice<Int64> & second, const UInt8 * second_null_map, const UInt8 * first_null_map)
const NumericArraySlice<Int64> & first, const NumericArraySlice<Int64> & second, const UInt8 * first_null_map, const UInt8 * second_null_map)
{
if (second.size == 0)
return true;
const bool has_first_null_map = first_null_map != nullptr;
const bool has_second_null_map = second_null_map != nullptr;
if (!has_first_null_map && has_second_null_map)
if (!hasNull(first_null_map, first.size) && hasNull(second_null_map, second.size))
return false;
unsigned j = 0;
const bool has_first_null_map = first_null_map != nullptr;
const bool has_second_null_map = second_null_map != nullptr;
size_t j = 0;
short has_mask = 1;
const int full = -1, none = 0;
const __m256i ones = _mm256_set1_epi64x(full);
@ -694,7 +754,7 @@ inline ALWAYS_INLINE bool sliceHasImplAnyAll<ArraySearchType::All, NumericArrayS
: zeros;
unsigned i = 0;
for (; i < first.size - 3 && !has_mask; has_mask = _mm256_testc_si256 (bitmask, ones), i += 4)
for (; i < first.size - 3 && !has_mask; has_mask = _mm256_testc_si256(bitmask, ones), i += 4)
{
const __m256i s_data = _mm256_lddqu_si256(reinterpret_cast<const __m256i*>(first.data + i));
const __m256i first_nm_mask = _mm256_set_m128i(
@ -729,42 +789,33 @@ inline ALWAYS_INLINE bool sliceHasImplAnyAll<ArraySearchType::All, NumericArrayS
continue;
__m256i v_i = _mm256_set1_epi64x(first.data[i]);
bitmask = _mm256_or_si256(bitmask, _mm256_cmpeq_epi64(f_data, v_i));
has_mask = _mm256_testc_si256 (bitmask, ones);
has_mask = _mm256_testc_si256(bitmask, ones);
}
}
}
}
bool found = false;
for (; j < second.size && (has_mask || second.size <= 2); ++j)
{
found = (has_second_null_map && second_null_map[j]) ? true: false;
for (unsigned i = 0; i < first.size && !found; ++i)
{
if (has_first_null_map && first_null_map[i])
continue;
found = (first.data[i] == second.data[j]);
}
if (!found)
return false;
}
return has_mask || found;
if (!has_mask && second.size > 2)
return false;
return hasAllIntegralLoopRemainder(j, first, second, first_null_map, second_null_map);
}
// AVX2 Int16_t specialization
template <>
inline ALWAYS_INLINE bool sliceHasImplAnyAll<ArraySearchType::All, NumericArraySlice<int16_t>, NumericArraySlice<int16_t>, sliceEqualElements<int16_t,int16_t> >(
const NumericArraySlice<int16_t> & first, const NumericArraySlice<int16_t> & second, const UInt8 * second_null_map, const UInt8 * first_null_map)
const NumericArraySlice<int16_t> & first, const NumericArraySlice<int16_t> & second, const UInt8 * first_null_map, const UInt8 * second_null_map)
{
if (second.size == 0)
return true;
const bool has_first_null_map = first_null_map != nullptr;
const bool has_second_null_map = second_null_map != nullptr;
if (!has_first_null_map && has_second_null_map)
if (!hasNull(first_null_map, first.size) && hasNull(second_null_map, second.size))
return false;
unsigned j = 0;
const bool has_first_null_map = first_null_map != nullptr;
const bool has_second_null_map = second_null_map != nullptr;
size_t j = 0;
short has_mask = 1;
const int full = -1, none = 0;
const __m256i ones = _mm256_set1_epi16(full);
@ -787,7 +838,7 @@ inline ALWAYS_INLINE bool sliceHasImplAnyAll<ArraySearchType::All, NumericArrayS
(second_null_map[j + 1]) ? full : none, (second_null_map[j]) ? full : none)
: zeros;
unsigned i = 0;
for (; i < first.size - 15 && !has_mask; has_mask = _mm256_testc_si256 (bitmask, ones), i += 16)
for (; i < first.size - 15 && !has_mask; has_mask = _mm256_testc_si256(bitmask, ones), i += 16)
{
const __m256i s_data = _mm256_lddqu_si256(reinterpret_cast<const __m256i*>(first.data + i));
const __m256i first_nm_mask = _mm256_set_m128i(
@ -874,26 +925,16 @@ inline ALWAYS_INLINE bool sliceHasImplAnyAll<ArraySearchType::All, NumericArrayS
continue;
__m256i v_i = _mm256_set1_epi16(first.data[i]);
bitmask = _mm256_or_si256(bitmask, _mm256_cmpeq_epi16(f_data, v_i));
has_mask = _mm256_testc_si256 (bitmask, ones);
has_mask = _mm256_testc_si256(bitmask, ones);
}
}
}
}
bool found = false;
for (; j < second.size && (has_mask || second.size <= 2); ++j)
{
found = (has_second_null_map && second_null_map[j])? true: false;
for (unsigned i = 0; i < first.size && !found; ++i)
{
if (has_first_null_map && first_null_map[i])
continue;
found = (first.data[i] == second.data[j]);
}
if (!found)
return false;
}
return has_mask || found;
if (!has_mask && second.size > 2)
return false;
return hasAllIntegralLoopRemainder(j, first, second, first_null_map, second_null_map);
}
#elif defined(__SSE4_2__)
@ -901,17 +942,18 @@ inline ALWAYS_INLINE bool sliceHasImplAnyAll<ArraySearchType::All, NumericArrayS
// SSE4.2 Int specialization
template <>
inline ALWAYS_INLINE bool sliceHasImplAnyAll<ArraySearchType::All, NumericArraySlice<int>, NumericArraySlice<int>, sliceEqualElements<int,int> >(
const NumericArraySlice<int> & first, const NumericArraySlice<int> & second, const UInt8 * second_null_map, const UInt8 * first_null_map)
const NumericArraySlice<int> & first, const NumericArraySlice<int> & second, const UInt8 * first_null_map, const UInt8 * second_null_map)
{
if (second.size == 0)
return true;
const bool has_first_null_map = first_null_map != nullptr;
const bool has_second_null_map = second_null_map != nullptr;
if (!has_first_null_map && has_second_null_map)
if (!hasNull(first_null_map, first.size) && hasNull(second_null_map, second.size))
return false;
unsigned j = 0;
const bool has_first_null_map = first_null_map != nullptr;
const bool has_second_null_map = second_null_map != nullptr;
size_t j = 0;
short has_mask = 1;
const __m128i zeros = _mm_setzero_si128();
if (second.size > 3 && first.size > 2)
@ -972,36 +1014,27 @@ inline ALWAYS_INLINE bool sliceHasImplAnyAll<ArraySearchType::All, NumericArrayS
}
}
bool found = false;
for (; j < second.size && has_mask; ++j)
{
found = (has_second_null_map && second_null_map[j]) ? true: false;
for (unsigned i = 0; i < first.size && !found; ++i)
{
if (has_first_null_map && first_null_map[i])
continue;
found = (first.data[i] == second.data[j]);
}
if (!found)
return false;
}
return has_mask || found;
if (!has_mask)
return false;
return hasAllIntegralLoopRemainder(j, first, second, first_null_map, second_null_map);
}
// SSE4.2 Int64 specialization
template <>
inline ALWAYS_INLINE bool sliceHasImplAnyAll<ArraySearchType::All, NumericArraySlice<Int64>, NumericArraySlice<Int64>, sliceEqualElements<Int64,Int64> >(
const NumericArraySlice<Int64> & first, const NumericArraySlice<Int64> & second, const UInt8 * second_null_map, const UInt8 * first_null_map)
const NumericArraySlice<Int64> & first, const NumericArraySlice<Int64> & second, const UInt8 * first_null_map, const UInt8 * second_null_map)
{
if (second.size == 0)
return true;
const bool has_first_null_map = first_null_map != nullptr;
const bool has_second_null_map = second_null_map != nullptr;
if (!has_first_null_map && has_second_null_map)
if (!hasNull(first_null_map, first.size) && hasNull(second_null_map, second.size))
return false;
unsigned j = 0;
const bool has_first_null_map = first_null_map != nullptr;
const bool has_second_null_map = second_null_map != nullptr;
size_t j = 0;
short has_mask = 1;
const Int64 full = -1, none = 0;
const __m128i zeros = _mm_setzero_si128();
@ -1046,36 +1079,27 @@ inline ALWAYS_INLINE bool sliceHasImplAnyAll<ArraySearchType::All, NumericArrayS
}
}
bool found = false;
for (; j < second.size && has_mask; j++)
{
found = (has_second_null_map && second_null_map[j]) ? true : false;
for (unsigned i = 0; i < first.size && !found; ++i)
{
if (has_first_null_map && first_null_map[i])
continue;
found = (first.data[i] == second.data[j]);
}
if (!found)
return false;
}
return has_mask || found;
if (!has_mask)
return false;
return hasAllIntegralLoopRemainder(j, first, second, first_null_map, second_null_map);
}
// SSE4.2 Int16_t specialization
template <>
inline ALWAYS_INLINE bool sliceHasImplAnyAll<ArraySearchType::All, NumericArraySlice<int16_t>, NumericArraySlice<int16_t>, sliceEqualElements<int16_t,int16_t> >(
const NumericArraySlice<int16_t> & first, const NumericArraySlice<int16_t> & second, const UInt8 * second_null_map, const UInt8 * first_null_map)
const NumericArraySlice<int16_t> & first, const NumericArraySlice<int16_t> & second, const UInt8 * first_null_map, const UInt8 * second_null_map)
{
if (second.size == 0)
return true;
const bool has_first_null_map = first_null_map != nullptr;
const bool has_second_null_map = second_null_map != nullptr;
if (!has_first_null_map && has_second_null_map)
if (!hasNull(first_null_map, first.size) && hasNull(second_null_map, second.size))
return false;
unsigned j = 0;
const bool has_first_null_map = first_null_map != nullptr;
const bool has_second_null_map = second_null_map != nullptr;
size_t j = 0;
short has_mask = 1;
const int16_t full = -1, none = 0;
const __m128i zeros = _mm_setzero_si128();
@ -1151,36 +1175,27 @@ inline ALWAYS_INLINE bool sliceHasImplAnyAll<ArraySearchType::All, NumericArrayS
}
}
bool found = false;
for (; j < second.size && (has_mask || second.size <= 2); ++j)
{
found = (has_second_null_map && second_null_map[j]) ? true : false;
for (unsigned i = 0; i < first.size && !found; ++i)
{
if (has_first_null_map && first_null_map[i])
continue;
found = (first.data[i] == second.data[j]);
}
if (!found)
return false;
}
return has_mask || found;
if (!has_mask && second.size > 2)
return false;
return hasAllIntegralLoopRemainder(j, first, second, first_null_map, second_null_map);
}
// SSE4.2 Int8_t specialization
template <>
inline ALWAYS_INLINE bool sliceHasImplAnyAll<ArraySearchType::All, NumericArraySlice<int8_t>, NumericArraySlice<int8_t>, sliceEqualElements<int8_t,int8_t> >(
const NumericArraySlice<int8_t> & first, const NumericArraySlice<int8_t> & second, const UInt8 * second_null_map, const UInt8 * first_null_map)
const NumericArraySlice<int8_t> & first, const NumericArraySlice<int8_t> & second, const UInt8 * first_null_map, const UInt8 * second_null_map)
{
if (second.size == 0)
return true;
const bool has_first_null_map = first_null_map != nullptr;
const bool has_second_null_map = second_null_map != nullptr;
if (!has_first_null_map && has_second_null_map)
if (!hasNull(first_null_map, first.size) && hasNull(second_null_map, second.size))
return false;
unsigned j = 0;
const bool has_first_null_map = first_null_map != nullptr;
const bool has_second_null_map = second_null_map != nullptr;
size_t j = 0;
short has_mask = 1;
const int full = -1, none = 0;
const __m128i zeros = _mm_setzero_si128();
@ -1291,21 +1306,10 @@ inline ALWAYS_INLINE bool sliceHasImplAnyAll<ArraySearchType::All, NumericArrayS
}
}
bool found = false;
for (; j < second.size && has_mask; ++j)
{
found = (has_second_null_map && second_null_map[j]) ? true : false;
for (unsigned i = 0; i < first.size && !found; ++i)
{
if (has_first_null_map && first_null_map[i])
continue;
found = (first.data[i] == second.data[j]);
}
if (!found)
return false;
}
if (!has_mask)
return false;
return has_mask || found;
return hasAllIntegralLoopRemainder(j, first, second, first_null_map, second_null_map);
}
#endif
@ -1566,4 +1570,3 @@ void resizeConstantSize(ArraySource && array_source, ValueSource && value_source
}
}