diff --git a/dbms/include/DB/Common/Volnitsky.h b/dbms/include/DB/Common/Volnitsky.h index 1dc2060730d..462517bfde9 100644 --- a/dbms/include/DB/Common/Volnitsky.h +++ b/dbms/include/DB/Common/Volnitsky.h @@ -36,33 +36,20 @@ protected: using offset_t = uint8_t; /// Смещение в needle. Для основного алгоритма, длина needle не должна быть больше 255. using ngram_t = uint16_t; /// n-грамма (2 байта). - const char * needle; - size_t needle_size; - const char * needle_end; - size_t step; /// Насколько двигаемся, если n-грамма из haystack не нашлась в хэш-таблице. + const UInt8 * const needle; + const size_t needle_size; + const UInt8 * const needle_end = needle + needle_size; + /// На сколько двигаемся, если n-грамма из haystack не нашлась в хэш-таблице. + const size_t step = needle_size - sizeof(ngram_t) + 1; + /** max needle length is 255, max distinct ngrams for case-sensitive is (255 - 1), case-insensitive is 4 * (255 - 1) + * storage of 64K ngrams (n = 2, 128 KB) should be large enough for both cases */ static const size_t hash_size = 64 * 1024; /// Помещается в L2-кэш. offset_t hash[hash_size]; /// Хэш-таблица. - bool fallback; /// Нужно ли использовать fallback алгоритм. - - /// fallback алгоритм - const char * search_fallback(const char * haystack, const size_t haystack_size) const - { - const char * pos = haystack; - const char * end = haystack + haystack_size; - - while (nullptr != (pos = static_cast(memchr(pos, needle[0], end - pos))) && - pos + needle_size <= end) - { - if (0 == memcmp(pos, needle, needle_size)) - return pos; - - ++pos; - } - - return end; - } + /// min haystack size to use main algorithm instead of fallback + static constexpr auto min_haystack_size_for_algorithm = 20000; + const bool fallback; /// Нужно ли использовать fallback алгоритм. public: /** haystack_size_hint - ожидаемый суммарный размер haystack при вызовах search. Можно не указывать. @@ -70,52 +57,36 @@ public: * так как считается, что тратить время на инициализацию хэш-таблицы не имеет смысла. */ VolnitskyBase(const char * const needle, const size_t needle_size, size_t haystack_size_hint = 0) - : needle{needle}, needle_size{needle_size}, needle_end{needle + needle_size}, - step{needle_size - sizeof(ngram_t) + 1} + : needle{reinterpret_cast(needle)}, needle_size{needle_size}, + fallback{ + needle_size < 2 * sizeof(ngram_t) or needle_size >= std::numeric_limits::max() or + (haystack_size_hint and haystack_size_hint < min_haystack_size_for_algorithm) + } { - if (needle_size < 2 * sizeof(ngram_t) - || needle_size >= std::numeric_limits::max() - || (haystack_size_hint && haystack_size_hint < 20000)) - { - fallback = true; + if (fallback) return; - } - else - fallback = false; memset(hash, 0, sizeof(hash)); - for (int i = needle_size - sizeof(ngram_t); i >= 0; --i) - self().putNGram(needle + i, i + 1); + /// int is used here because unsigned can't be used with condition like `i >= 0`, unsigned always >= 0 + for (auto i = static_cast(needle_size - sizeof(ngram_t)); i >= 0; --i) + self().putNGram(this->needle + i, i + 1); } /// Если не найдено - возвращается конец haystack. - const char * search(const char * haystack, size_t haystack_size) const + const UInt8 * search(const UInt8 * const haystack, const size_t haystack_size) const { if (needle_size == 0) return haystack; - const char * haystack_end = haystack + haystack_size; + const auto haystack_end = haystack + haystack_size; - if (needle_size == 1) - { - if (const auto res = static_cast( - memchr(haystack, /*CaseSensitive*/true ? needle[0] : std::tolower(needle[0]), haystack_size))) - return res; - -// if (!CaseSensitive) -// if (const auto res = static_cast(memchr(haystack, std::toupper(this->needle[0]), haystack_size))) -// return res; - - return haystack_end; - } - - if (fallback || haystack_size <= needle_size) - return self().search_fallback(haystack, haystack_size); + if (needle_size == 1 || fallback || haystack_size <= needle_size) + return self().search_fallback(haystack, haystack_end); /// Будем "прикладывать" needle к haystack и сравнивать n-грам из конца needle. - const char * pos = haystack + needle_size - sizeof(ngram_t); + const auto * pos = haystack + needle_size - sizeof(ngram_t); for (; pos <= haystack_end - needle_size; pos += step) { /// Смотрим все ячейки хэш-таблицы, которые могут соответствовать n-граму из haystack. @@ -123,30 +94,27 @@ public: cell_num = (cell_num + 1) % hash_size) { /// Когда нашли - сравниваем побайтово, используя смещение из хэш-таблицы. - const char * res = pos - (hash[cell_num] - 1); - for (size_t i = 0; i < needle_size;) - if (!self().compare(res + i, needle + i, i)) - goto next_hash_cell; + const auto res = pos - (hash[cell_num] - 1); - return res; - next_hash_cell:; + if (self().compare(res)) + return res; } } /// Оставшийся хвостик. - return self().search_fallback(pos - step + 1, haystack_end - (pos - step + 1)); + return self().search_fallback(pos - step + 1, haystack_end); } - const unsigned char * search(const unsigned char * haystack, size_t haystack_size) const + const char * search(const char * haystack, size_t haystack_size) const { - return reinterpret_cast(search(reinterpret_cast(haystack), haystack_size)); + return reinterpret_cast(search(reinterpret_cast(haystack), haystack_size)); } protected: CRTP & self() { return static_cast(*this); } const CRTP & self() const { return const_cast(this)->self(); } - static const ngram_t & toNGram(const char * const pos) + static const ngram_t & toNGram(const UInt8 * const pos) { return *reinterpret_cast(pos); } @@ -167,17 +135,205 @@ protected: /// Primary template for case sensitive comparison template struct VolnitskyImpl : VolnitskyBase> { - using VolnitskyBase>::VolnitskyBase; + VolnitskyImpl(const char * const needle, const size_t needle_size, const size_t haystack_size_hint = 0) + : VolnitskyBase>{needle, needle_size, haystack_size_hint}, + fallback_searcher{needle, needle_size} + { + } - void putNGram(const char * const pos, const int offset) + void putNGram(const UInt8 * const pos, const int offset) { this->putNGramBase(this->toNGram(pos), offset); } - static bool compare(const char * const lhs, const char * const rhs, std::size_t & offset) + bool compare(const UInt8 * const pos) const { - ++offset; - return *lhs == *rhs; + /// @todo: maybe just use memcmp for this case and rely on internal SSE optimization as in case with memcpy? + return fallback_searcher.compare(pos); + } + + class Searcher + { + static constexpr auto n = sizeof(__m128i); + + const int page_size = getpagesize(); + + /// string to be searched for + const char * const needle; + const std::size_t needle_size; + /// first character in `needle` + UInt8 first{}; + /// vector filled `first` for determining leftmost position of the first symbol + __m128i pattern; + /// vector of first 16 characters of `needle` + __m128i cache = _mm_setzero_si128(); + int cachemask{}; + + bool page_safe(const void * const ptr) const + { + return ((page_size - 1) & reinterpret_cast(ptr)) <= page_size - n; + } + + public: + Searcher(const char * const needle, const std::size_t needle_size) + : needle{needle}, needle_size{needle_size} + { + if (0 == needle_size) + return; + + auto needle_pos = needle; + + first = *needle_pos; + + pattern = _mm_set1_epi8(first); + + const auto needle_end = needle_pos + needle_size; + + for (const auto i : ext::range(0, n)) + { + cache = _mm_srli_si128(cache, 1); + + if (needle_pos != needle_end) + { + cache = _mm_insert_epi8(cache, *needle_pos, n - 1); + cachemask |= 1 << i; + ++needle_pos; + } + } + } + + bool compare(const UInt8 * pos) const + { + if (page_safe(pos)) + { + const auto v_haystack = _mm_loadu_si128(reinterpret_cast(pos)); + const auto v_against_cache = _mm_cmpeq_epi8(v_haystack, cache); + const auto mask = _mm_movemask_epi8(v_against_cache); + + if (0xffff == cachemask) + { + if (mask == cachemask) + { + pos += n; + auto needle_pos = needle + n; + const auto needle_end = needle + needle_size; + + while (needle_pos < needle_end && *pos == *needle_pos) + ++pos, ++needle_pos; + + if (needle_pos == needle_end) + return true; + } + } + else if ((mask & cachemask) == cachemask) + return true; + + return false; + } + + if (*pos == first) + { + ++pos; + auto needle_pos = needle + 1; + const auto needle_end = needle + needle_size; + + while (needle_pos < needle_end && *pos == *needle_pos) + ++pos, ++needle_pos; + + if (needle_pos == needle_end) + return true; + } + + return false; + } + + const UInt8 * find(const UInt8 * haystack, const UInt8 * const haystack_end) const + { + if (0 == needle_size) + return haystack; + + const auto needle_begin = reinterpret_cast(needle); + const auto needle_end = needle_begin + needle_size; + + while (haystack < haystack_end) + { + /// @todo supposedly for long strings spanning across multiple pages. Why don't we use this technique in other places? + if (haystack + n <= haystack_end && page_safe(haystack)) + { + /// find first character + const auto v_haystack = _mm_loadu_si128(reinterpret_cast(haystack)); + const auto v_against_pattern = _mm_cmpeq_epi8(v_haystack, pattern); + + const auto mask = _mm_movemask_epi8(v_against_pattern); + + /// first character not present in 16 octets starting at `haystack` + if (mask == 0) + { + haystack += n; + continue; + } + + const auto offset = _bit_scan_forward(mask); + haystack += offset; + + if (haystack < haystack_end && haystack + n <= haystack_end && page_safe(haystack)) + { + /// check for first 16 octets + const auto v_haystack = _mm_loadu_si128(reinterpret_cast(haystack)); + const auto v_against_cache = _mm_cmpeq_epi8(v_haystack, cache); + const auto mask = _mm_movemask_epi8(v_against_cache); + + if (0xffff == cachemask) + { + if (mask == cachemask) + { + auto haystack_pos = haystack + n; + auto needle_pos = needle_begin + n; + + while (haystack_pos < haystack_end && needle_pos < needle_end && + *haystack_pos == *needle_pos) + ++haystack_pos, ++needle_pos; + + if (needle_pos == needle_end) + return haystack; + } + } + else if ((mask & cachemask) == cachemask) + return haystack; + + ++haystack; + continue; + } + } + + if (haystack == haystack_end) + return haystack_end; + + if (*haystack == first) + { + auto haystack_pos = haystack + 1; + auto needle_pos = needle_begin + 1; + + while (haystack_pos < haystack_end && needle_pos < needle_end && + *haystack_pos == *needle_pos) + ++haystack_pos, ++needle_pos; + + if (needle_pos == needle_end) + return haystack; + } + + ++haystack; + } + + return haystack_end; + } + }; + + Searcher fallback_searcher; + + const UInt8 * search_fallback(const UInt8 * const haystack, const UInt8 * const haystack_end) const + { + return fallback_searcher.find(haystack, haystack_end); } }; @@ -185,11 +341,11 @@ template struct VolnitskyImpl : VolnitskyBase struct VolnitskyImpl : VolnitskyBase> { VolnitskyImpl(const char * const needle, const size_t needle_size, const size_t haystack_size_hint = 0) - : VolnitskyBase{needle, needle_size, haystack_size_hint}, fallback_searcher{needle, needle_size} + : VolnitskyBase{needle, needle_size, haystack_size_hint}, fallback_searcher{needle, needle_size} { } - void putNGram(const char * const pos, const int offset) + void putNGram(const UInt8 * const pos, const int offset) { union { ngram_t n; @@ -239,13 +395,12 @@ template <> struct VolnitskyImpl : VolnitskyBase struct VolnitskyImpl : VolnitskyBase struct VolnitskyImpl : VolnitskyBase(pos)); + const auto v_against_l = _mm_cmpeq_epi8(v_haystack, cachel); + const auto v_against_u = _mm_cmpeq_epi8(v_haystack, cacheu); + const auto v_against_l_or_u = _mm_or_si128(v_against_l, v_against_u); + const auto mask = _mm_movemask_epi8(v_against_l_or_u); + + if (0xffff == cachemask) + { + if (mask == cachemask) + { + pos += n; + auto needle_pos = needle + n; + const auto needle_end = needle + needle_size; + + while (needle_pos < needle_end && std::tolower(*pos) == std::tolower(*needle_pos)) + ++pos, ++needle_pos; + + if (needle_pos == needle_end) + return true; + } + } + else if ((mask & cachemask) == cachemask) + return true; + + return false; + } + + if (*pos == l || *pos == u) + { + ++pos; + auto needle_pos = needle + 1; + const auto needle_end = needle + needle_size; + + while (needle_pos < needle_end && std::tolower(*pos) == std::tolower(*needle_pos)) + ++pos, ++needle_pos; + + if (needle_pos == needle_end) + return true; + } + + return false; + } + const UInt8 * find(const UInt8 * haystack, const UInt8 * const haystack_end) const { if (0 == needle_size) @@ -383,12 +585,11 @@ template <> struct VolnitskyImpl : VolnitskyBase(fallback_searcher.find(reinterpret_cast(haystack), - reinterpret_cast(haystack) + haystack_size)); + return fallback_searcher.find(haystack, haystack_end); } }; @@ -400,7 +601,7 @@ template <> struct VolnitskyImpl : VolnitskyBase struct VolnitskyImpl : VolnitskyBase(pos); static const Poco::UTF8Encoding utf8; if (utf8_is_continuation_octet(c[1])) { /// ngram is inside a sequence - auto seq_pos = u_pos; + auto seq_pos = pos; utf8_sync_backward(seq_pos); const auto u32 = utf8.convert(seq_pos); @@ -476,7 +676,7 @@ template <> struct VolnitskyImpl : VolnitskyBase struct VolnitskyImpl : VolnitskyBase struct VolnitskyImpl : VolnitskyBase struct VolnitskyImpl : VolnitskyBase struct VolnitskyImpl : VolnitskyBase struct VolnitskyImpl : VolnitskyBase(lhs)); - - static const Poco::UTF8Encoding utf8; - - return Poco::Unicode::toLower(utf8.convert(reinterpret_cast(lhs))) == - Poco::Unicode::toLower(utf8.convert(reinterpret_cast(rhs))); + return fallback_searcher.compare(pos); } - class CaseInsensitiveSearcher + class Searcher { using UTF8SequenceBuffer = UInt8[6]; @@ -658,7 +853,7 @@ template <> struct VolnitskyImpl : VolnitskyBase struct VolnitskyImpl : VolnitskyBase(pos)); + const auto v_against_l = _mm_cmpeq_epi8(v_haystack, cachel); + const auto v_against_u = _mm_cmpeq_epi8(v_haystack, cacheu); + const auto v_against_l_or_u = _mm_or_si128(v_against_l, v_against_u); + const auto mask = _mm_movemask_epi8(v_against_l_or_u); + + if (0xffff == cachemask) + { + if (mask == cachemask) + { + pos += cache_valid_len; + auto needle_pos = needle + cache_valid_len; + const auto needle_end = needle + needle_size; + + while (needle_pos < needle_end && + Poco::Unicode::toLower(utf8.convert(pos)) == + Poco::Unicode::toLower(utf8.convert(reinterpret_cast(needle_pos)))) + { + /// @note assuming sequences for lowercase and uppercase have exact same length + const auto len = utf8_seq_length(*pos); + pos += len, needle_pos += len; + } + + if (needle_pos == needle_end) + return true; + } + } + else if ((mask & cachemask) == cachemask) + return true; + + return false; + } + + if (*pos == l || *pos == u) + { + pos += first_needle_symbol_is_ascii; + auto needle_pos = needle + first_needle_symbol_is_ascii; + const auto needle_end = needle + needle_size; + + while (needle_pos < needle_end && + Poco::Unicode::toLower(utf8.convert(pos)) == + Poco::Unicode::toLower(utf8.convert(reinterpret_cast(needle_pos)))) + { + const auto len = utf8_seq_length(*pos); + pos += len, needle_pos += len; + } + + if (needle_pos == needle_end) + return true; + } + + return false; + } + const UInt8 * find(const UInt8 * haystack, const UInt8 * const haystack_end) const { if (0 == needle_size) @@ -838,12 +1093,11 @@ template <> struct VolnitskyImpl : VolnitskyBase(fallback_searcher.find(reinterpret_cast(haystack), - reinterpret_cast(haystack) + haystack_size)); + return fallback_searcher.find(haystack, haystack_end); } };