Better string comparison (development)

This commit is contained in:
Alexey Milovidov 2019-03-03 23:08:39 +03:00
parent 9216fd2810
commit 315b6f3878
9 changed files with 429 additions and 214 deletions

View File

@ -4,6 +4,7 @@
#include <Common/Arena.h>
#include <Common/SipHash.h>
#include <Common/memcpySmall.h>
#include <Common/memcmpSmall.h>
#include <DataStreams/ColumnGathererStream.h>
@ -106,8 +107,7 @@ struct ColumnFixedString::less
explicit less(const ColumnFixedString & parent_) : parent(parent_) {}
bool operator()(size_t lhs, size_t rhs) const
{
/// TODO: memcmp slows down.
int res = memcmp(&parent.chars[lhs * parent.n], &parent.chars[rhs * parent.n], parent.n);
int res = memcmpSmallAllowOverflow15(parent.chars.data() + lhs * parent.n, parent.chars.data() + rhs * parent.n, parent.n);
return positive ? (res < 0) : (res > 0);
}
};

View File

@ -1,8 +1,7 @@
#pragma once
#include <string.h> // memcmp
#include <Common/PODArray.h>
#include <Common/memcmpSmall.h>
#include <Columns/IColumn.h>
#include <Columns/ColumnVectorHelper.h>
@ -98,7 +97,7 @@ public:
int compareAt(size_t p1, size_t p2, const IColumn & rhs_, int /*nan_direction_hint*/) const override
{
const ColumnFixedString & rhs = static_cast<const ColumnFixedString &>(rhs_);
return memcmp(&chars[p1 * n], &rhs.chars[p2 * n], n);
return memcmpSmallAllowOverflow15(chars.data() + p1 * n, rhs.chars.data() + p2 * n, n);
}
void getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const override;

View File

@ -1,5 +1,6 @@
#include <Core/Defines.h>
#include <Common/Arena.h>
#include <Common/memcmpSmall.h>
#include <Columns/Collator.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsCommon.h>
@ -239,15 +240,11 @@ struct ColumnString::less
explicit less(const ColumnString & parent_) : parent(parent_) {}
bool operator()(size_t lhs, size_t rhs) const
{
size_t left_len = parent.sizeAt(lhs);
size_t right_len = parent.sizeAt(rhs);
int res = memcmpSmallAllowOverflow15(
parent.chars.data() + parent.offsetAt(lhs), parent.sizeAt(lhs),
parent.chars.data() + parent.offsetAt(rhs), parent.sizeAt(rhs));
int res = memcmp(&parent.chars[parent.offsetAt(lhs)], &parent.chars[parent.offsetAt(rhs)], std::min(left_len, right_len));
if (res != 0)
return positive ? (res < 0) : (res > 0);
else
return positive ? (left_len < right_len) : (left_len > right_len);
return positive ? (res < 0) : (res > 0);
}
};

View File

@ -6,6 +6,7 @@
#include <Common/PODArray.h>
#include <Common/SipHash.h>
#include <Common/memcpySmall.h>
#include <Common/memcmpSmall.h>
class Collator;
@ -210,16 +211,7 @@ public:
int compareAt(size_t n, size_t m, const IColumn & rhs_, int /*nan_direction_hint*/) const override
{
const ColumnString & rhs = static_cast<const ColumnString &>(rhs_);
const size_t size = sizeAt(n);
const size_t rhs_size = rhs.sizeAt(m);
int cmp = memcmp(&chars[offsetAt(n)], &rhs.chars[rhs.offsetAt(m)], std::min(size, rhs_size));
if (cmp != 0)
return cmp;
else
return size > rhs_size ? 1 : (size < rhs_size ? -1 : 0);
return memcmpSmallAllowOverflow15(chars.data() + offsetAt(n), sizeAt(n), rhs.chars.data() + rhs.offsetAt(m), rhs.sizeAt(m));
}
/// Variant of compareAt for string comparison with respect of collation.

View File

@ -0,0 +1,219 @@
#pragma once
#include <string.h>
#include <algorithm>
#ifdef __SSE2__
#include <emmintrin.h>
namespace detail
{

/// Three-way comparison of two values: -1 if a < b, 1 if a > b, 0 otherwise.
template <typename T>
inline int cmp(T a, T b)
{
    const int is_less = a < b ? 1 : 0;
    const int is_greater = a > b ? 1 : 0;
    return is_greater - is_less;
}

}
/** All functions work under the following assumptions:
* - it is possible to read up to 15 excessive bytes past the end of both the 'a' and 'b' regions;
* - memory regions are relatively small, so extra loop unrolling is not worthwhile.
*/
/** Three-way comparison of two memory regions that may have different sizes.
  *
  * The common prefix (min(a_size, b_size) bytes) is compared in 16-byte chunks;
  * if it is identical, the shorter region is considered smaller.
  *
  * Up to 15 bytes past the end of each region may be read (but never influence the result).
  *
  * Returns -1, 0 or 1.
  */
template <typename Char>
inline int memcmpSmallAllowOverflow15(const Char * a, size_t a_size, const Char * b, size_t b_size)
{
    const size_t min_size = std::min(a_size, b_size);

    /// 'offset' only takes multiples of 16 in the loop condition, so this visits every chunk that overlaps the prefix.
    for (size_t offset = 0; offset < min_size; offset += 16)
    {
        /// Bit i is set iff the bytes at position offset + i differ.
        /// _mm_movemask_epi8 returns an int with only the low 16 bits meaningful; negating it as int sets
        /// the upper 16 bits, so the result must be masked - otherwise identical chunks look different.
        const int mask = 0xFFFF & ~_mm_movemask_epi8(_mm_cmpeq_epi8(
            _mm_loadu_si128(reinterpret_cast<const __m128i *>(a + offset)),
            _mm_loadu_si128(reinterpret_cast<const __m128i *>(b + offset))));

        if (mask)
        {
            offset += __builtin_ctz(mask);

            /// The first difference lies in the overflow area past the common prefix: ignore it.
            if (offset >= min_size)
                break;

            return detail::cmp(a[offset], b[offset]);
        }
    }

    return detail::cmp(a_size, b_size);
}
/** Three-way comparison of two memory regions of the same size.
  *
  * Up to 15 bytes past the end of each region may be read (but never influence the result).
  *
  * Returns -1, 0 or 1.
  *
  * TODO Check if the compiler can optimize the previous function when the caller passes identical sizes.
  */
template <typename Char>
inline int memcmpSmallAllowOverflow15(const Char * a, const Char * b, size_t size)
{
    /// 'offset' only takes multiples of 16 in the loop condition, so this visits every chunk that overlaps the region.
    for (size_t offset = 0; offset < size; offset += 16)
    {
        /// Bit i is set iff the bytes at position offset + i differ.
        /// The negated movemask must be cut to its 16 meaningful bits, otherwise the upper bits of the int
        /// are always set and identical chunks are reported as different.
        const int mask = 0xFFFF & ~_mm_movemask_epi8(_mm_cmpeq_epi8(
            _mm_loadu_si128(reinterpret_cast<const __m128i *>(a + offset)),
            _mm_loadu_si128(reinterpret_cast<const __m128i *>(b + offset))));

        if (mask)
        {
            offset += __builtin_ctz(mask);

            /// The first difference lies in the overflow area past the end: regions are equal.
            if (offset >= size)
                return 0;

            return detail::cmp(a[offset], b[offset]);
        }
    }

    return 0;
}
/** Compare memory regions for equality.
  *
  * Regions of different sizes are never equal.
  * Up to 15 bytes past the end of each region may be read (but never influence the result).
  */
template <typename Char>
inline bool memequalSmallAllowOverflow15(const Char * a, size_t a_size, const Char * b, size_t b_size)
{
    if (a_size != b_size)
        return false;

    /// 'offset' only takes multiples of 16 in the loop condition, so this visits every chunk that overlaps the region.
    for (size_t offset = 0; offset < a_size; offset += 16)
    {
        /// Bit i is set iff the bytes at position offset + i differ.
        /// The negated movemask must be cut to its 16 meaningful bits, otherwise the upper bits of the int
        /// are always set and every region longer than 16 bytes is reported as unequal.
        const int mask = 0xFFFF & ~_mm_movemask_epi8(_mm_cmpeq_epi8(
            _mm_loadu_si128(reinterpret_cast<const __m128i *>(a + offset)),
            _mm_loadu_si128(reinterpret_cast<const __m128i *>(b + offset))));

        if (mask)
        {
            offset += __builtin_ctz(mask);

            /// A difference only counts if it is inside the region, not in the overflow area.
            return offset >= a_size;
        }
    }

    return true;
}
/** Three-way comparison for the case when the caller knows in advance that the size is a multiple of 16.
  *
  * No bytes outside of the regions are read.
  *
  * Returns -1, 0 or 1.
  */
template <typename Char>
inline int memcmpSmallMultipleOf16(const Char * a, const Char * b, size_t size)
{
    for (size_t offset = 0; offset < size; offset += 16)
    {
        /// Bit i is set iff the bytes at position offset + i differ.
        /// The negated movemask must be cut to its 16 meaningful bits, otherwise the upper bits of the int
        /// are always set, identical chunks look different and a byte past the chunk gets compared.
        const int mask = 0xFFFF & ~_mm_movemask_epi8(_mm_cmpeq_epi8(
            _mm_loadu_si128(reinterpret_cast<const __m128i *>(a + offset)),
            _mm_loadu_si128(reinterpret_cast<const __m128i *>(b + offset))));

        if (mask)
        {
            offset += __builtin_ctz(mask);
            return detail::cmp(a[offset], b[offset]);
        }
    }

    return 0;
}
/** Three-way comparison for the case when the size is exactly 16 bytes.
  *
  * Returns -1, 0 or 1.
  */
template <typename Char>
inline int memcmp16(const Char * a, const Char * b)
{
    /// Bit i is set iff the bytes at position i differ.
    /// The negated movemask must be cut to its 16 meaningful bits, otherwise the upper bits of the int
    /// are always set and equal inputs lead to comparing a[16] with b[16], which is out of bounds.
    const int mask = 0xFFFF & ~_mm_movemask_epi8(_mm_cmpeq_epi8(
        _mm_loadu_si128(reinterpret_cast<const __m128i *>(a)),
        _mm_loadu_si128(reinterpret_cast<const __m128i *>(b))));

    if (mask)
    {
        const auto offset = __builtin_ctz(mask);
        return detail::cmp(a[offset], b[offset]);
    }

    return 0;
}
/** Equality check for two memory regions of exactly 16 bytes each.
  * Neither pointer has to be aligned.
  */
inline bool memequal16(const void * a, const void * b)
{
    const __m128i lhs = _mm_loadu_si128(reinterpret_cast<const __m128i *>(a));
    const __m128i rhs = _mm_loadu_si128(reinterpret_cast<const __m128i *>(b));

    /// One bit per byte; all 16 bits are set iff every byte pair matches.
    const int equal_bytes = _mm_movemask_epi8(_mm_cmpeq_epi8(lhs, rhs));
    return equal_bytes == 0xFFFF;
}
/** Check that a memory region consists solely of zero bytes.
  *
  * Up to 15 bytes past the end of the region may be read (but never influence the result).
  * An empty region is considered zero.
  */
inline bool memoryIsZeroSmallAllowOverflow15(const void * data, size_t size)
{
    /// Integer vector type: _mm_cmpeq_epi8 takes __m128i, not the floating-point __m128.
    const __m128i zero16 = _mm_setzero_si128();
    const char * bytes = reinterpret_cast<const char *>(data);

    for (size_t offset = 0; offset < size; offset += 16)
    {
        /// Bit i is set iff the byte at position offset + i is non-zero.
        /// The negated movemask must be cut to its 16 meaningful bits, otherwise the upper bits of the int
        /// are always set and an all-zero chunk is treated as containing a non-zero byte.
        const int mask = 0xFFFF & ~_mm_movemask_epi8(_mm_cmpeq_epi8(zero16,
            _mm_loadu_si128(reinterpret_cast<const __m128i *>(bytes + offset))));

        if (mask)
        {
            offset += __builtin_ctz(mask);

            /// A non-zero byte only counts if it is inside the region, not in the overflow area.
            return offset >= size;
        }
    }

    return true;
}
#else
/** Portable fallback: three-way comparison of two memory regions that may have different sizes.
  *
  * The common prefix is compared with memcmp; if it is identical, the shorter region is considered smaller
  * (same contract as the SSE2 implementation). Only the sign of the result is meaningful.
  */
template <typename Char>
inline int memcmpSmallAllowOverflow15(const Char * a, size_t a_size, const Char * b, size_t b_size)
{
    if (const int res = memcmp(a, b, std::min(a_size, b_size)))
        return res;

    /// Identical common prefix: the order is determined by the sizes.
    return a_size < b_size ? -1 : (a_size > b_size ? 1 : 0);
}

/** Portable fallback: compare memory regions for equality.
  * Regions of different sizes are never equal.
  */
template <typename Char>
inline bool memequalSmallAllowOverflow15(const Char * a, size_t a_size, const Char * b, size_t b_size)
{
    return a_size == b_size && 0 == memcmp(a, b, a_size);
}
/** Portable fallback: three-way comparison of two memory regions of the same size.
  * Simply delegates to memcmp; only the sign of the result is meaningful.
  */
template <typename Char>
inline int memcmpSmallAllowOverflow15(const Char * a, const Char * b, size_t size)
{
    const int result = memcmp(a, b, size);
    return result;
}
/** Portable fallback: three-way comparison when the size is known to be a multiple of 16.
  * Only the sign of the result is meaningful.
  */
template <typename Char>
inline int memcmpSmallMultipleOf16(const Char * a, const Char * b, size_t size)
{
    return memcmp(a, b, size);
}

/** Portable fallback: three-way comparison when the size is exactly 16 bytes.
  * Provided so that callers of the SSE2 variant also build without SSE2.
  * Only the sign of the result is meaningful.
  */
template <typename Char>
inline int memcmp16(const Char * a, const Char * b)
{
    return memcmp(a, b, 16);
}

/** Portable fallback: equality check when the size is exactly 16 bytes.
  * Provided so that callers of the SSE2 variant also build without SSE2.
  */
inline bool memequal16(const void * a, const void * b)
{
    return 0 == memcmp(a, b, 16);
}
/** Portable fallback: check that a memory region consists solely of zero bytes.
  * Scans byte by byte and never reads outside of the region; an empty region is considered zero.
  */
inline bool memoryIsZeroSmallAllowOverflow15(const void * data, size_t size)
{
    const char * bytes = reinterpret_cast<const char *>(data);

    for (size_t i = 0; i != size; ++i)
    {
        if (bytes[i] != 0)
            return false;
    }

    return true;
}
#endif

View File

@ -1,7 +1,6 @@
#pragma once
#include <string.h>
#include <Core/Defines.h>
#ifdef __SSE2__
#include <emmintrin.h>

View File

@ -1,5 +1,6 @@
#pragma once
#include <cstring>
#include <Common/memcmpSmall.h>
#include <Columns/ColumnString.h>
#include <Functions/FunctionFactory.h>
@ -38,11 +39,9 @@ struct EmptyImpl
/// For every fixed-size value (n bytes each, stored back to back in `data`) writes into `res`
/// whether the value consists solely of zero bytes, XOR-ed with `negative`.
/// NOTE(review): assumes `res` already holds one element per value and that `data` is padded so that
/// memoryIsZeroSmallAllowOverflow15 may read up to 15 bytes past each value - confirm at the call site.
static void vector_fixed_to_vector(const ColumnString::Chars & data, size_t n, PaddedPODArray<UInt8> & res)
{
    /// Iterate by row number, not by byte offset: `res` is indexed by row,
    /// so writing res[byte_offset] would fill wrong rows and run past its end.
    const size_t rows = data.size() / n;
    for (size_t row = 0; row < rows; ++row)
        res[row] = negative ^ memoryIsZeroSmallAllowOverflow15(data.data() + row * n, n);
}
static void array(const ColumnString::Offsets & offsets, PaddedPODArray<UInt8> & res)

View File

@ -1,5 +1,7 @@
#pragma once
#include <Common/memcmpSmall.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnDecimal.h>
@ -115,30 +117,6 @@ struct NumComparisonImpl
};
inline int memcmp16(const void * a, const void * b)
{
/// Assuming little endian.
UInt64 a_hi = __builtin_bswap64(unalignedLoad<UInt64>(a));
UInt64 b_hi = __builtin_bswap64(unalignedLoad<UInt64>(b));
if (a_hi < b_hi)
return -1;
if (a_hi > b_hi)
return 1;
UInt64 a_lo = __builtin_bswap64(unalignedLoad<UInt64>(reinterpret_cast<const char *>(a) + 8));
UInt64 b_lo = __builtin_bswap64(unalignedLoad<UInt64>(reinterpret_cast<const char *>(b) + 8));
if (a_lo < b_lo)
return -1;
if (a_lo > b_lo)
return 1;
return 0;
}
template <typename Op>
struct StringComparisonImpl
{
@ -148,27 +126,17 @@ struct StringComparisonImpl
PaddedPODArray<UInt8> & c)
{
size_t size = a_offsets.size();
ColumnString::Offset prev_a_offset = 0;
ColumnString::Offset prev_b_offset = 0;
for (size_t i = 0; i < size; ++i)
{
/// Trailing zero byte of the smaller string is included in the comparison.
size_t a_size;
size_t b_size;
int res;
if (i == 0)
{
a_size = a_offsets[0];
b_size = b_offsets[0];
res = memcmp(a_data.data(), b_data.data(), std::min(a_size, b_size));
}
else
{
a_size = a_offsets[i] - a_offsets[i - 1];
b_size = b_offsets[i] - b_offsets[i - 1];
res = memcmp(&a_data[a_offsets[i - 1]], &b_data[b_offsets[i - 1]], std::min(a_size, b_size));
}
c[i] = Op::apply(memcmpSmallAllowOverflow15(
a_data.data() + prev_a_offset, a_offsets[i] - prev_a_offset - 1,
b_data.data() + prev_b_offset, b_offsets[i] - prev_b_offset - 1), 0);
c[i] = Op::apply(res, 0) || (res == 0 && Op::apply(a_size, b_size));
prev_a_offset = a_offsets[i];
prev_b_offset = b_offsets[i];
}
}
@ -178,43 +146,33 @@ struct StringComparisonImpl
PaddedPODArray<UInt8> & c)
{
size_t size = a_offsets.size();
ColumnString::Offset prev_a_offset = 0;
for (size_t i = 0; i < size; ++i)
{
if (i == 0)
{
int res = memcmp(a_data.data(), b_data.data(), std::min(a_offsets[0] - 1, b_n));
c[i] = Op::apply(res, 0) || (res == 0 && Op::apply(a_offsets[0], b_n + 1));
}
else
{
int res = memcmp(&a_data[a_offsets[i - 1]], &b_data[i * b_n],
std::min(a_offsets[i] - a_offsets[i - 1] - 1, b_n));
c[i] = Op::apply(res, 0) || (res == 0 && Op::apply(a_offsets[i] - a_offsets[i - 1], b_n + 1));
}
c[i] = Op::apply(memcmpSmallAllowOverflow15(
a_data.data() + prev_a_offset, a_offsets[i] - prev_a_offset - 1,
b_data.data() + i * b_n, b_n), 0);
prev_a_offset = a_offsets[i];
}
}
static void NO_INLINE string_vector_constant(
const ColumnString::Chars & a_data, const ColumnString::Offsets & a_offsets,
const std::string & b,
const ColumnString::Chars & b_data, ColumnString::Offset b_size,
PaddedPODArray<UInt8> & c)
{
size_t size = a_offsets.size();
ColumnString::Offset b_size = b.size() + 1;
const UInt8 * b_data = reinterpret_cast<const UInt8 *>(b.data());
ColumnString::Offset prev_a_offset = 0;
for (size_t i = 0; i < size; ++i)
{
/// Trailing zero byte of the smaller string is included in the comparison.
if (i == 0)
{
int res = memcmp(a_data.data(), b_data, std::min(a_offsets[0], b_size));
c[i] = Op::apply(res, 0) || (res == 0 && Op::apply(a_offsets[0], b_size));
}
else
{
int res = memcmp(&a_data[a_offsets[i - 1]], b_data, std::min(a_offsets[i] - a_offsets[i - 1], b_size));
c[i] = Op::apply(res, 0) || (res == 0 && Op::apply(a_offsets[i] - a_offsets[i - 1], b_size));
}
c[i] = Op::apply(memcmpSmallAllowOverflow15(
a_data.data() + prev_a_offset, a_offsets[i] - prev_a_offset - 1,
b_data.data(), b_size), 0);
prev_a_offset = a_offsets[i];
}
}
@ -239,13 +197,13 @@ struct StringComparisonImpl
static void NO_INLINE fixed_string_vector_constant_16(
const ColumnString::Chars & a_data,
const std::string & b,
const ColumnString::Chars & b_data,
PaddedPODArray<UInt8> & c)
{
size_t size = a_data.size();
for (size_t i = 0, j = 0; i < size; i += 16, ++j)
c[j] = Op::apply(memcmp16(&a_data[i], b.data()), 0);
c[j] = Op::apply(memcmp16(&a_data[i], &b_data[0]), 0);
}
static void NO_INLINE fixed_string_vector_fixed_string_vector(
@ -253,74 +211,73 @@ struct StringComparisonImpl
const ColumnString::Chars & b_data, ColumnString::Offset b_n,
PaddedPODArray<UInt8> & c)
{
/** Specialization if both sizes are 16.
* To more efficient comparison of IPv6 addresses stored in FixedString(16).
*/
if (a_n == 16 && b_n == 16)
{
/** Specialization if both sizes are 16.
* To more efficient comparison of IPv6 addresses stored in FixedString(16).
*/
fixed_string_vector_fixed_string_vector_16(a_data, b_data, c);
}
else if (a_n == b_n)
{
size_t size = a_data.size();
for (size_t i = 0, j = 0; i < size; i += a_n, ++j)
c[j] = Op::apply(memcmpSmallAllowOverflow15(a_data.data() + i, b_data.data() + i, a_n), 0);
}
else
{
/// Generic implementation, less efficient.
size_t size = a_data.size();
size_t size = a_data.size() / a_n;
for (size_t i = 0, j = 0; i < size; i += a_n, ++j)
{
int res = memcmp(&a_data[i], &b_data[i], std::min(a_n, b_n));
c[j] = Op::apply(res, 0) || (res == 0 && Op::apply(a_n, b_n));
}
for (size_t i = 0; i < size; ++i)
c[i] = Op::apply(memcmpSmallAllowOverflow15(a_data.data() + i * a_n, a_n, b_data.data() + i * b_n, b_n), 0);
}
}
static void NO_INLINE fixed_string_vector_constant(
const ColumnString::Chars & a_data, ColumnString::Offset a_n,
const std::string & b,
const ColumnString::Chars & b_data, ColumnString::Offset b_size,
PaddedPODArray<UInt8> & c)
{
ColumnString::Offset b_n = b.size();
if (a_n == 16 && b_n == 16)
if (a_n == 16 && b_size == 16)
{
fixed_string_vector_constant_16(a_data, b, c);
fixed_string_vector_constant_16(a_data, b_data, c);
}
else if (a_n == b_size)
{
size_t size = a_data.size();
for (size_t i = 0, j = 0; i < size; i += a_n, ++j)
c[j] = Op::apply(memcmpSmallAllowOverflow15(a_data.data() + i, b_data.data(), a_n), 0);
}
else
{
size_t size = a_data.size();
const UInt8 * b_data = reinterpret_cast<const UInt8 *>(b.data());
for (size_t i = 0, j = 0; i < size; i += a_n, ++j)
{
int res = memcmp(&a_data[i], b_data, std::min(a_n, b_n));
c[j] = Op::apply(res, 0) || (res == 0 && Op::apply(a_n, b_n));
}
c[j] = Op::apply(memcmpSmallAllowOverflow15(a_data.data() + i, a_n, b_data.data(), b_size), 0);
}
}
static void constant_string_vector(
const std::string & a,
const ColumnString::Chars & a_data, ColumnString::Offset a_size,
const ColumnString::Chars & b_data, const ColumnString::Offsets & b_offsets,
PaddedPODArray<UInt8> & c)
{
StringComparisonImpl<typename Op::SymmetricOp>::string_vector_constant(b_data, b_offsets, a, c);
StringComparisonImpl<typename Op::SymmetricOp>::string_vector_constant(b_data, b_offsets, a_data, a_size, c);
}
static void constant_fixed_string_vector(
const std::string & a,
const ColumnString::Chars & a_data, ColumnString::Offset a_size,
const ColumnString::Chars & b_data, ColumnString::Offset b_n,
PaddedPODArray<UInt8> & c)
{
StringComparisonImpl<typename Op::SymmetricOp>::fixed_string_vector_constant(b_data, b_n, a, c);
StringComparisonImpl<typename Op::SymmetricOp>::fixed_string_vector_constant(b_data, b_n, a_data, a_size, c);
}
static void constant_constant(
const std::string & a,
const std::string & b,
const ColumnString::Chars & a_data, ColumnString::Offset a_size,
const ColumnString::Chars & b_data, ColumnString::Offset b_size,
UInt8 & c)
{
size_t a_n = a.size();
size_t b_n = b.size();
int res = memcmp(a.data(), b.data(), std::min(a_n, b_n));
c = Op::apply(res, 0) || (res == 0 && Op::apply(a_n, b_n));
c = Op::apply(memcmpSmallAllowOverflow15(a_data.data(), a_size, b_data.data(), b_size), 0);
}
};
@ -335,11 +292,21 @@ struct StringEqualsImpl
PaddedPODArray<UInt8> & c)
{
size_t size = a_offsets.size();
ColumnString::Offset prev_a_offset = 0;
ColumnString::Offset prev_b_offset = 0;
for (size_t i = 0; i < size; ++i)
c[i] = positive == ((i == 0)
? (a_offsets[0] == b_offsets[0] && !memcmp(a_data.data(), b_data.data(), a_offsets[0] - 1))
: (a_offsets[i] - a_offsets[i - 1] == b_offsets[i] - b_offsets[i - 1]
&& !memcmp(&a_data[a_offsets[i - 1]], &b_data[b_offsets[i - 1]], a_offsets[i] - a_offsets[i - 1] - 1)));
{
auto a_size = a_offsets[i] - prev_a_offset - 1;
auto b_size = b_offsets[i] - prev_b_offset - 1;
c[i] = positive == memequalSmallAllowOverflow15(
a_data.data() + prev_a_offset, a_size,
b_data.data() + prev_b_offset, b_size);
prev_a_offset = a_offsets[i];
prev_b_offset = b_offsets[i];
}
}
static void NO_INLINE string_vector_fixed_string_vector(
@ -348,76 +315,65 @@ struct StringEqualsImpl
PaddedPODArray<UInt8> & c)
{
size_t size = a_offsets.size();
ColumnString::Offset prev_a_offset = 0;
for (size_t i = 0; i < size; ++i)
c[i] = positive == ((i == 0)
? (a_offsets[0] == b_n + 1 && !memcmp(a_data.data(), b_data.data(), b_n))
: (a_offsets[i] - a_offsets[i - 1] == b_n + 1
&& !memcmp(&a_data[a_offsets[i - 1]], &b_data[b_n * i], b_n)));
{
auto a_size = a_offsets[i] - prev_a_offset - 1;
c[i] = positive == memequalSmallAllowOverflow15(
a_data.data() + prev_a_offset, a_size,
b_data.data() + b_n * i, b_n);
prev_a_offset = a_offsets[i];
}
}
static void NO_INLINE string_vector_constant(
const ColumnString::Chars & a_data, const ColumnString::Offsets & a_offsets,
const std::string & b,
const ColumnString::Chars & b_data, ColumnString::Offset b_size,
PaddedPODArray<UInt8> & c)
{
size_t size = a_offsets.size();
ColumnString::Offset b_n = b.size();
const UInt8 * b_data = reinterpret_cast<const UInt8 *>(b.data());
ColumnString::Offset prev_a_offset = 0;
for (size_t i = 0; i < size; ++i)
c[i] = positive == ((i == 0)
? (a_offsets[0] == b_n + 1 && !memcmp(a_data.data(), b_data, b_n))
: (a_offsets[i] - a_offsets[i - 1] == b_n + 1
&& !memcmp(&a_data[a_offsets[i - 1]], b_data, b_n)));
{
auto a_size = a_offsets[i] - prev_a_offset - 1;
c[i] = positive == memequalSmallAllowOverflow15(
a_data.data() + prev_a_offset, a_size,
b_data.data(), b_size);
prev_a_offset = a_offsets[i];
}
}
#ifdef __SSE2__
static void NO_INLINE fixed_string_vector_fixed_string_vector_16(
const ColumnString::Chars & a_data,
const ColumnString::Chars & b_data,
PaddedPODArray<UInt8> & c)
{
size_t size = c.size();
size_t size = a_data.size() / 16;
const __m128i * a_pos = reinterpret_cast<const __m128i *>(a_data.data());
const __m128i * b_pos = reinterpret_cast<const __m128i *>(b_data.data());
UInt8 * c_pos = c.data();
UInt8 * c_end = c_pos + size;
while (c_pos < c_end)
{
*c_pos = positive == (0xFFFF == _mm_movemask_epi8(_mm_cmpeq_epi8(
_mm_loadu_si128(a_pos),
_mm_loadu_si128(b_pos))));
++a_pos;
++b_pos;
++c_pos;
}
for (size_t i = 0; i < size; ++i)
c[i] = positive == memequal16(
a_data.data() + i * 16,
b_data.data() + i * 16);
}
static void NO_INLINE fixed_string_vector_constant_16(
const ColumnString::Chars & a_data,
const std::string & b,
const ColumnString::Chars & b_data,
PaddedPODArray<UInt8> & c)
{
size_t size = c.size();
size_t size = a_data.size() / 16;
const __m128i * a_pos = reinterpret_cast<const __m128i *>(a_data.data());
const __m128i b_value = _mm_loadu_si128(reinterpret_cast<const __m128i *>(b.data()));
UInt8 * c_pos = c.data();
UInt8 * c_end = c_pos + size;
while (c_pos < c_end)
{
*c_pos = positive == (0xFFFF == _mm_movemask_epi8(_mm_cmpeq_epi8(
_mm_loadu_si128(a_pos),
b_value)));
++a_pos;
++c_pos;
}
for (size_t i = 0; i < size; ++i)
c[i] = positive == memequal16(
a_data.data() + i * 16,
b_data.data());
}
#endif
static void NO_INLINE fixed_string_vector_fixed_string_vector(
const ColumnString::Chars & a_data, ColumnString::Offset a_n,
@ -427,38 +383,32 @@ struct StringEqualsImpl
/** Specialization if both sizes are 16.
* To more efficient comparison of IPv6 addresses stored in FixedString(16).
*/
#ifdef __SSE2__
if (a_n == 16 && b_n == 16)
{
fixed_string_vector_fixed_string_vector_16(a_data, b_data, c);
}
else
#endif
{
size_t size = a_data.size();
for (size_t i = 0, j = 0; i < size; i += a_n, ++j)
c[j] = positive == (a_n == b_n && !memcmp(&a_data[i], &b_data[i], a_n));
size_t size = a_data.size() / a_n;
for (size_t i = 0; i < size; ++i)
c[i] = positive == memequalSmallAllowOverflow15(a_data.data() + i * a_n, a_n, b_data.data() + i * b_n, b_n);
}
}
static void NO_INLINE fixed_string_vector_constant(
const ColumnString::Chars & a_data, ColumnString::Offset a_n,
const std::string & b,
const ColumnString::Chars & b_data, ColumnString::Offset b_size,
PaddedPODArray<UInt8> & c)
{
ColumnString::Offset b_n = b.size();
#ifdef __SSE2__
if (a_n == 16 && b_n == 16)
if (a_n == 16 && b_size == 16)
{
fixed_string_vector_constant_16(a_data, b, c);
fixed_string_vector_constant_16(a_data, b_data, c);
}
else
#endif
{
size_t size = a_data.size();
const UInt8 * b_data = reinterpret_cast<const UInt8 *>(b.data());
for (size_t i = 0, j = 0; i < size; i += a_n, ++j)
c[j] = positive == (a_n == b_n && !memcmp(&a_data[i], b_data, a_n));
size_t size = a_data.size() / a_n;
for (size_t i = 0; i < size; ++i)
c[i] = positive == memequalSmallAllowOverflow15(a_data.data() + i * a_n, a_n, b_data.data(), b_size);
}
}
@ -471,27 +421,27 @@ struct StringEqualsImpl
}
static void constant_string_vector(
const std::string & a,
const ColumnString::Chars & a_data, ColumnString::Offset a_size,
const ColumnString::Chars & b_data, const ColumnString::Offsets & b_offsets,
PaddedPODArray<UInt8> & c)
{
string_vector_constant(b_data, b_offsets, a, c);
string_vector_constant(b_data, b_offsets, a_data, a_size, c);
}
static void constant_fixed_string_vector(
const std::string & a,
const ColumnString::Chars & a_data, ColumnString::Offset a_size,
const ColumnString::Chars & b_data, ColumnString::Offset b_n,
PaddedPODArray<UInt8> & c)
{
fixed_string_vector_constant(b_data, b_n, a, c);
fixed_string_vector_constant(b_data, b_n, a_data, a_size, c);
}
static void constant_constant(
const std::string & a,
const std::string & b,
const ColumnString::Chars & a_data, ColumnString::Offset a_size,
const ColumnString::Chars & b_data, ColumnString::Offset b_size,
UInt8 & c)
{
c = positive == (a == b);
c = positive == memequalSmallAllowOverflow15(a_data.data(), a_size, b_data.data(), b_size);
}
};
@ -744,18 +694,62 @@ private:
const ColumnString * c1_string = checkAndGetColumn<ColumnString>(c1);
const ColumnFixedString * c0_fixed_string = checkAndGetColumn<ColumnFixedString>(c0);
const ColumnFixedString * c1_fixed_string = checkAndGetColumn<ColumnFixedString>(c1);
const ColumnConst * c0_const = checkAndGetColumnConstStringOrFixedString(c0);
const ColumnConst * c1_const = checkAndGetColumnConstStringOrFixedString(c1);
if (!((c0_string || c0_fixed_string || c0_const) && (c1_string || c1_fixed_string || c1_const)))
return false;
const ColumnString::Chars * c0_const_chars = nullptr;
const ColumnString::Chars * c1_const_chars = nullptr;
ColumnString::Offset c0_const_size = 0;
ColumnString::Offset c1_const_size = 0;
if (c0_const)
{
const ColumnString * c0_const_string = checkAndGetColumn<ColumnString>(&c0_const->getDataColumn());
const ColumnFixedString * c0_const_fixed_string = checkAndGetColumn<ColumnFixedString>(&c0_const->getDataColumn());
if (c0_const_string)
{
c0_const_chars = &c0_const_string->getChars();
c0_const_size = c0_const_string->getDataAt(0).size;
}
else if (c0_const_fixed_string)
{
c0_const_chars = &c0_const_fixed_string->getChars();
c0_const_size = c0_const_fixed_string->getN();
}
else
throw Exception("Logical error: ColumnConst contains not String nor FixedString column", ErrorCodes::ILLEGAL_COLUMN);
}
if (c1_const)
{
const ColumnString * c1_const_string = checkAndGetColumn<ColumnString>(&c1_const->getDataColumn());
const ColumnFixedString * c1_const_fixed_string = checkAndGetColumn<ColumnFixedString>(&c1_const->getDataColumn());
if (c1_const_string)
{
c1_const_chars = &c1_const_string->getChars();
c1_const_size = c1_const_string->getDataAt(0).size;
}
else if (c1_const_fixed_string)
{
c1_const_chars = &c1_const_fixed_string->getChars();
c1_const_size = c1_const_fixed_string->getN();
}
else
throw Exception("Logical error: ColumnConst contains not String nor FixedString column", ErrorCodes::ILLEGAL_COLUMN);
}
using StringImpl = StringComparisonImpl<Op<int, int>>;
if (c0_const && c1_const)
{
UInt8 res = 0;
StringImpl::constant_constant(c0_const->getValue<String>(), c1_const->getValue<String>(), res);
StringImpl::constant_constant(*c0_const_chars, c0_const_size, *c1_const_chars, c1_const_size, res);
block.getByPosition(result).column = block.getByPosition(result).type->createColumnConst(c0_const->size(), toField(res));
return true;
}
@ -778,7 +772,7 @@ private:
else if (c0_string && c1_const)
StringImpl::string_vector_constant(
c0_string->getChars(), c0_string->getOffsets(),
c1_const->getValue<String>(),
*c1_const_chars, c1_const_size,
c_res->getData());
else if (c0_fixed_string && c1_string)
StringImpl::fixed_string_vector_string_vector(
@ -793,16 +787,16 @@ private:
else if (c0_fixed_string && c1_const)
StringImpl::fixed_string_vector_constant(
c0_fixed_string->getChars(), c0_fixed_string->getN(),
c1_const->getValue<String>(),
*c1_const_chars, c1_const_size,
c_res->getData());
else if (c0_const && c1_string)
StringImpl::constant_string_vector(
c0_const->getValue<String>(),
*c0_const_chars, c0_const_size,
c1_string->getChars(), c1_string->getOffsets(),
c_res->getData());
else if (c0_const && c1_fixed_string)
StringImpl::constant_fixed_string_vector(
c0_const->getValue<String>(),
*c0_const_chars, c0_const_size,
c1_fixed_string->getChars(), c1_fixed_string->getN(),
c_res->getData());
else

View File

@ -6,9 +6,11 @@
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnNullable.h>
#include <Common/FieldVisitors.h>
#include <Common/memcmpSmall.h>
namespace DB
@ -272,8 +274,7 @@ struct ArrayIndexNumNullImpl
}
};
/// Implementation for arrays of strings when the 2nd function argument
/// is a NULL value.
/// Implementation for arrays of strings when the 2nd function argument is a NULL value.
template <typename IndexConv>
struct ArrayIndexStringNullImpl
{
@ -311,12 +312,11 @@ struct ArrayIndexStringImpl
{
static void vector_const(
const ColumnString::Chars & data, const ColumnArray::Offsets & offsets, const ColumnString::Offsets & string_offsets,
const String & value,
const ColumnString::Chars & value, ColumnString::Offset value_size,
PaddedPODArray<typename IndexConv::ResultType> & result,
const PaddedPODArray<UInt8> * null_map_data)
{
const auto size = offsets.size();
const auto value_size = value.size();
result.resize(size);
ColumnArray::Offset current_offset = 0;
@ -331,12 +331,12 @@ struct ArrayIndexStringImpl
? 0
: string_offsets[current_offset + j - 1];
ColumnArray::Offset string_size = string_offsets[current_offset + j] - string_pos;
ColumnArray::Offset string_size = string_offsets[current_offset + j] - string_pos - 1;
if (null_map_data && (*null_map_data)[current_offset + j])
{
}
else if (string_size == value_size + 1 && 0 == memcmp(value.data(), &data[string_pos], value_size))
else if (memequalSmallAllowOverflow15(value.data(), value_size, &data[string_pos], string_size))
{
if (!IndexConv::apply(j, current))
break;
@ -381,7 +381,7 @@ struct ArrayIndexStringImpl
if (null_map_item && (*null_map_item)[i])
hit = true;
}
else if (string_size == value_size && 0 == memcmp(&item_values[value_pos], &data[string_pos], value_size))
else if (memequalSmallAllowOverflow15(&item_values[value_pos], value_size, &data[string_pos], string_size))
hit = true;
if (hit)
@ -708,16 +708,32 @@ private:
const auto item_arg = block.getByPosition(arguments[1]).column.get();
if (item_arg->onlyNull())
{
ArrayIndexStringNullImpl<IndexConv>::vector_const(col_nested->getChars(), col_array->getOffsets(),
col_nested->getOffsets(), col_res->getData(), null_map_data);
}
else if (const auto item_arg_const = checkAndGetColumnConstStringOrFixedString(item_arg))
ArrayIndexStringImpl<IndexConv>::vector_const(col_nested->getChars(), col_array->getOffsets(),
col_nested->getOffsets(), item_arg_const->getValue<String>(), col_res->getData(),
null_map_data);
{
const ColumnString * item_const_string = checkAndGetColumn<ColumnString>(item_arg_const);
const ColumnFixedString * item_const_fixedstring = checkAndGetColumn<ColumnFixedString>(item_arg_const);
if (item_const_string)
ArrayIndexStringImpl<IndexConv>::vector_const(col_nested->getChars(), col_array->getOffsets(), col_nested->getOffsets(),
item_const_string->getChars(), item_const_string->getDataAt(0).size,
col_res->getData(), null_map_data);
else if (item_const_fixedstring)
ArrayIndexStringImpl<IndexConv>::vector_const(col_nested->getChars(), col_array->getOffsets(), col_nested->getOffsets(),
item_const_fixedstring->getChars(), item_const_fixedstring->getN(),
col_res->getData(), null_map_data);
else
throw Exception("Logical error: ColumnConst contains not String nor FixedString column", ErrorCodes::ILLEGAL_COLUMN);
}
else if (const auto item_arg_vector = checkAndGetColumn<ColumnString>(item_arg))
{
ArrayIndexStringImpl<IndexConv>::vector_vector(col_nested->getChars(), col_array->getOffsets(),
col_nested->getOffsets(), item_arg_vector->getChars(), item_arg_vector->getOffsets(),
col_res->getData(), null_map_data, null_map_item);
}
else
return false;