#pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace DB { namespace ErrorCodes { extern const int TOO_LARGE_STRING_SIZE; extern const int ILLEGAL_COLUMN; extern const int ILLEGAL_TYPE_OF_ARGUMENT; extern const int LOGICAL_ERROR; extern const int NOT_IMPLEMENTED; } /** Comparison functions: ==, !=, <, >, <=, >=. * The comparison functions always return 0 or 1 (UInt8). * * You can compare the following types: * - numbers and decimals; * - strings and fixed strings; * - dates; * - datetimes; * within each group, but not from different groups; * - tuples (lexicographic comparison). * * Exception: You can compare the date and datetime with a constant string. Example: EventDate = '2015-01-01'. */ template struct NumComparisonImpl { /// If you don't specify NO_INLINE, the compiler will inline this function, but we don't need this as this function contains tight loop inside. static void NO_INLINE vector_vector(const PaddedPODArray & a, const PaddedPODArray & b, PaddedPODArray & c) { /** GCC 4.8.2 vectorizes a loop only if it is written in this form. * In this case, if you loop through the array index (the code will look simpler), * the loop will not be vectorized. */ size_t size = a.size(); const A * a_pos = a.data(); const B * b_pos = b.data(); UInt8 * c_pos = c.data(); const A * a_end = a_pos + size; while (a_pos < a_end) { *c_pos = Op::apply(*a_pos, *b_pos); ++a_pos; ++b_pos; ++c_pos; } } static void NO_INLINE vector_constant(const PaddedPODArray & a, B b, PaddedPODArray & c) { size_t size = a.size(); const A * a_pos = a.data(); UInt8 * c_pos = c.data(); const A * a_end = a_pos + size; while (a_pos < a_end) { *c_pos = Op::apply(*a_pos, b); ++a_pos; ++c_pos; } } static void constant_vector(A a, const PaddedPODArray & b, PaddedPODArray & c) { NumComparisonImpl::vector_constant(b, a, c); } static void constant_constant(A a, B b, UInt8 & c) { c = Op::apply(a, b); } }; template struct StringComparisonImpl { static void NO_INLINE string_vector_string_vector( const ColumnString::Chars & a_data, const ColumnString::Offsets & a_offsets, const ColumnString::Chars & b_data, const ColumnString::Offsets & b_offsets, PaddedPODArray & c) { size_t size = a_offsets.size(); ColumnString::Offset prev_a_offset = 0; ColumnString::Offset prev_b_offset = 0; for (size_t i = 0; i < size; ++i) { c[i] = Op::apply(memcmpSmallAllowOverflow15( a_data.data() + prev_a_offset, a_offsets[i] - prev_a_offset - 1, b_data.data() + prev_b_offset, b_offsets[i] - prev_b_offset - 1), 0); prev_a_offset = a_offsets[i]; prev_b_offset = b_offsets[i]; } } static void NO_INLINE string_vector_fixed_string_vector( const ColumnString::Chars & a_data, const ColumnString::Offsets & a_offsets, const ColumnString::Chars & b_data, ColumnString::Offset b_n, PaddedPODArray & c) { size_t size = a_offsets.size(); ColumnString::Offset prev_a_offset = 0; for (size_t i = 0; i < size; ++i) { c[i] = Op::apply(memcmpSmallAllowOverflow15( a_data.data() + prev_a_offset, a_offsets[i] - prev_a_offset - 1, b_data.data() + i * b_n, b_n), 0); prev_a_offset = a_offsets[i]; } } static void NO_INLINE string_vector_constant( const ColumnString::Chars & a_data, const ColumnString::Offsets & a_offsets, const ColumnString::Chars & b_data, ColumnString::Offset b_size, PaddedPODArray & c) { size_t size = a_offsets.size(); ColumnString::Offset prev_a_offset = 0; for (size_t i = 0; i < size; ++i) { c[i] = Op::apply(memcmpSmallAllowOverflow15( a_data.data() + prev_a_offset, a_offsets[i] - prev_a_offset - 1, b_data.data(), b_size), 0); prev_a_offset = a_offsets[i]; } } static void fixed_string_vector_string_vector( const ColumnString::Chars & a_data, ColumnString::Offset a_n, const ColumnString::Chars & b_data, const ColumnString::Offsets & b_offsets, PaddedPODArray & c) { StringComparisonImpl::string_vector_fixed_string_vector(b_data, b_offsets, a_data, a_n, c); } static void NO_INLINE fixed_string_vector_fixed_string_vector_16( const ColumnString::Chars & a_data, const ColumnString::Chars & b_data, PaddedPODArray & c) { size_t size = a_data.size(); for (size_t i = 0, j = 0; i < size; i += 16, ++j) c[j] = Op::apply(memcmp16(&a_data[i], &b_data[i]), 0); } static void NO_INLINE fixed_string_vector_constant_16( const ColumnString::Chars & a_data, const ColumnString::Chars & b_data, PaddedPODArray & c) { size_t size = a_data.size(); for (size_t i = 0, j = 0; i < size; i += 16, ++j) c[j] = Op::apply(memcmp16(&a_data[i], &b_data[0]), 0); } static void NO_INLINE fixed_string_vector_fixed_string_vector( const ColumnString::Chars & a_data, ColumnString::Offset a_n, const ColumnString::Chars & b_data, ColumnString::Offset b_n, PaddedPODArray & c) { if (a_n == 16 && b_n == 16) { /** Specialization if both sizes are 16. * To more efficient comparison of IPv6 addresses stored in FixedString(16). */ fixed_string_vector_fixed_string_vector_16(a_data, b_data, c); } else if (a_n == b_n) { size_t size = a_data.size(); for (size_t i = 0, j = 0; i < size; i += a_n, ++j) c[j] = Op::apply(memcmpSmallAllowOverflow15(a_data.data() + i, b_data.data() + i, a_n), 0); } else { size_t size = a_data.size() / a_n; for (size_t i = 0; i < size; ++i) c[i] = Op::apply(memcmpSmallAllowOverflow15(a_data.data() + i * a_n, a_n, b_data.data() + i * b_n, b_n), 0); } } static void NO_INLINE fixed_string_vector_constant( const ColumnString::Chars & a_data, ColumnString::Offset a_n, const ColumnString::Chars & b_data, ColumnString::Offset b_size, PaddedPODArray & c) { if (a_n == 16 && b_size == 16) { fixed_string_vector_constant_16(a_data, b_data, c); } else if (a_n == b_size) { size_t size = a_data.size(); for (size_t i = 0, j = 0; i < size; i += a_n, ++j) c[j] = Op::apply(memcmpSmallAllowOverflow15(a_data.data() + i, b_data.data(), a_n), 0); } else { size_t size = a_data.size(); for (size_t i = 0, j = 0; i < size; i += a_n, ++j) c[j] = Op::apply(memcmpSmallAllowOverflow15(a_data.data() + i, a_n, b_data.data(), b_size), 0); } } static void constant_string_vector( const ColumnString::Chars & a_data, ColumnString::Offset a_size, const ColumnString::Chars & b_data, const ColumnString::Offsets & b_offsets, PaddedPODArray & c) { StringComparisonImpl::string_vector_constant(b_data, b_offsets, a_data, a_size, c); } static void constant_fixed_string_vector( const ColumnString::Chars & a_data, ColumnString::Offset a_size, const ColumnString::Chars & b_data, ColumnString::Offset b_n, PaddedPODArray & c) { StringComparisonImpl::fixed_string_vector_constant(b_data, b_n, a_data, a_size, c); } static void constant_constant( const ColumnString::Chars & a_data, ColumnString::Offset a_size, const ColumnString::Chars & b_data, ColumnString::Offset b_size, UInt8 & c) { c = Op::apply(memcmpSmallAllowOverflow15(a_data.data(), a_size, b_data.data(), b_size), 0); } }; /// Comparisons for equality/inequality are implemented slightly more efficient. template struct StringEqualsImpl { static void NO_INLINE string_vector_string_vector( const ColumnString::Chars & a_data, const ColumnString::Offsets & a_offsets, const ColumnString::Chars & b_data, const ColumnString::Offsets & b_offsets, PaddedPODArray & c) { size_t size = a_offsets.size(); ColumnString::Offset prev_a_offset = 0; ColumnString::Offset prev_b_offset = 0; for (size_t i = 0; i < size; ++i) { auto a_size = a_offsets[i] - prev_a_offset - 1; auto b_size = b_offsets[i] - prev_b_offset - 1; c[i] = positive == memequalSmallAllowOverflow15( a_data.data() + prev_a_offset, a_size, b_data.data() + prev_b_offset, b_size); prev_a_offset = a_offsets[i]; prev_b_offset = b_offsets[i]; } } static void NO_INLINE string_vector_fixed_string_vector( const ColumnString::Chars & a_data, const ColumnString::Offsets & a_offsets, const ColumnString::Chars & b_data, ColumnString::Offset b_n, PaddedPODArray & c) { size_t size = a_offsets.size(); ColumnString::Offset prev_a_offset = 0; for (size_t i = 0; i < size; ++i) { auto a_size = a_offsets[i] - prev_a_offset - 1; c[i] = positive == memequalSmallAllowOverflow15( a_data.data() + prev_a_offset, a_size, b_data.data() + b_n * i, b_n); prev_a_offset = a_offsets[i]; } } static void NO_INLINE string_vector_constant( const ColumnString::Chars & a_data, const ColumnString::Offsets & a_offsets, const ColumnString::Chars & b_data, ColumnString::Offset b_size, PaddedPODArray & c) { size_t size = a_offsets.size(); ColumnString::Offset prev_a_offset = 0; for (size_t i = 0; i < size; ++i) { auto a_size = a_offsets[i] - prev_a_offset - 1; c[i] = positive == memequalSmallAllowOverflow15( a_data.data() + prev_a_offset, a_size, b_data.data(), b_size); prev_a_offset = a_offsets[i]; } } static void NO_INLINE fixed_string_vector_fixed_string_vector_16( const ColumnString::Chars & a_data, const ColumnString::Chars & b_data, PaddedPODArray & c) { size_t size = a_data.size() / 16; for (size_t i = 0; i < size; ++i) c[i] = positive == memequal16( a_data.data() + i * 16, b_data.data() + i * 16); } static void NO_INLINE fixed_string_vector_constant_16( const ColumnString::Chars & a_data, const ColumnString::Chars & b_data, PaddedPODArray & c) { size_t size = a_data.size() / 16; for (size_t i = 0; i < size; ++i) c[i] = positive == memequal16( a_data.data() + i * 16, b_data.data()); } static void NO_INLINE fixed_string_vector_fixed_string_vector( const ColumnString::Chars & a_data, ColumnString::Offset a_n, const ColumnString::Chars & b_data, ColumnString::Offset b_n, PaddedPODArray & c) { /** Specialization if both sizes are 16. * To more efficient comparison of IPv6 addresses stored in FixedString(16). */ if (a_n == 16 && b_n == 16) { fixed_string_vector_fixed_string_vector_16(a_data, b_data, c); } else { size_t size = a_data.size() / a_n; for (size_t i = 0; i < size; ++i) c[i] = positive == memequalSmallAllowOverflow15(a_data.data() + i * a_n, a_n, b_data.data() + i * b_n, b_n); } } static void NO_INLINE fixed_string_vector_constant( const ColumnString::Chars & a_data, ColumnString::Offset a_n, const ColumnString::Chars & b_data, ColumnString::Offset b_size, PaddedPODArray & c) { if (a_n == 16 && b_size == 16) { fixed_string_vector_constant_16(a_data, b_data, c); } else { size_t size = a_data.size() / a_n; for (size_t i = 0; i < size; ++i) c[i] = positive == memequalSmallAllowOverflow15(a_data.data() + i * a_n, a_n, b_data.data(), b_size); } } static void fixed_string_vector_string_vector( const ColumnString::Chars & a_data, ColumnString::Offset a_n, const ColumnString::Chars & b_data, const ColumnString::Offsets & b_offsets, PaddedPODArray & c) { string_vector_fixed_string_vector(b_data, b_offsets, a_data, a_n, c); } static void constant_string_vector( const ColumnString::Chars & a_data, ColumnString::Offset a_size, const ColumnString::Chars & b_data, const ColumnString::Offsets & b_offsets, PaddedPODArray & c) { string_vector_constant(b_data, b_offsets, a_data, a_size, c); } static void constant_fixed_string_vector( const ColumnString::Chars & a_data, ColumnString::Offset a_size, const ColumnString::Chars & b_data, ColumnString::Offset b_n, PaddedPODArray & c) { fixed_string_vector_constant(b_data, b_n, a_data, a_size, c); } static void constant_constant( const ColumnString::Chars & a_data, ColumnString::Offset a_size, const ColumnString::Chars & b_data, ColumnString::Offset b_size, UInt8 & c) { c = positive == memequalSmallAllowOverflow15(a_data.data(), a_size, b_data.data(), b_size); } }; template struct StringComparisonImpl> : StringEqualsImpl {}; template struct StringComparisonImpl> : StringEqualsImpl {}; /// Generic version, implemented for columns of same type. template struct GenericComparisonImpl { static void NO_INLINE vector_vector(const IColumn & a, const IColumn & b, PaddedPODArray & c) { for (size_t i = 0, size = a.size(); i < size; ++i) c[i] = Op::apply(a.compareAt(i, i, b, 1), 0); } static void NO_INLINE vector_constant(const IColumn & a, const IColumn & b, PaddedPODArray & c) { auto b_materialized = b.cloneResized(1)->convertToFullColumnIfConst(); for (size_t i = 0, size = a.size(); i < size; ++i) c[i] = Op::apply(a.compareAt(i, 0, *b_materialized, 1), 0); } static void constant_vector(const IColumn & a, const IColumn & b, PaddedPODArray & c) { GenericComparisonImpl::vector_constant(b, a, c); } static void constant_constant(const IColumn & a, const IColumn & b, UInt8 & c) { c = Op::apply(a.compareAt(0, 0, b, 1), 0); } }; #if USE_EMBEDDED_COMPILER template