This commit is contained in:
Evgeniy Gatov 2015-10-17 21:16:20 +03:00
commit c2818d97c9
27 changed files with 1346 additions and 606 deletions

View File

@ -230,10 +230,16 @@ public:
if (size == 0)
return res;
res_->getData().reserve(size);
auto & res_data = res_->getData();
res_data.reserve(size);
for (size_t i = 0; i < size; ++i)
if (filter[i])
res_->getData().push_back(getData()[i]);
res_data.push_back(getData()[i]);
/// Для экономии оперативки в случае слишком сильной фильтрации.
if (res_data.size() * 2 < res_data.capacity())
res_data = Container_t(res_data.cbegin(), res_data.cend());
return res;
}

View File

@ -233,7 +233,13 @@ public:
void getExtremes(Field & min, Field & max) const override
{
throw Exception("Method getExtremes is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
size_t tuple_size = columns.size();
min = Array(tuple_size);
max = Array(tuple_size);
for (size_t i = 0; i < tuple_size; ++i)
columns[i]->getExtremes(min.get<Array &>()[i], max.get<Array &>()[i]);
}

View File

@ -67,7 +67,7 @@ private:
return n;
}
static size_t to_size(size_t n) { return byte_size(std::max(POD_ARRAY_INITIAL_SIZE, round_up_to_power_of_two(n))); }
static size_t to_size(size_t n) { return byte_size(round_up_to_power_of_two(n)); }
void alloc(size_t n)
{
@ -162,6 +162,8 @@ public:
iterator end() { return t_end(); }
const_iterator begin() const { return t_start(); }
const_iterator end() const { return t_end(); }
const_iterator cbegin() const { return t_start(); }
const_iterator cend() const { return t_end(); }
void reserve(size_t n)
{

View File

@ -0,0 +1,639 @@
#pragma once
#include <DB/Common/UTF8Helpers.h>
#include <ext/range.hpp>
#include <Poco/UTF8Encoding.h>
#include <Poco/Unicode.h>
#include <stdint.h>
#include <string.h>
namespace DB
{
/// Performs case-sensitive and case-insensitive search of UTF-8 strings
template <bool CaseSensitive, bool ASCII> class StringSearcher;
/** Case-insensitive UTF-8 searcher.
  *
  * The constructor precomputes the lowercase and uppercase encodings (via Poco::Unicode::toLower/toUpper)
  * of the leading octets of `needle`, at most sizeof(__m128i) of them. `compare` and `search` test
  * 16 octets at a time against these caches with SSE instructions and compare the remaining part of
  * the needle code point by code point.
  *
  * Only a pointer to the needle is stored; the needle itself is not copied.
  *
  * NOTE(review): the SSE pre-check accepts, for every octet independently, either the lowercase or the
  * uppercase variant. For code points whose two encodings differ in more than one octet this looks like
  * it could accept a mixed sequence that is neither of them - confirm.
  */
template <> class StringSearcher<false, false>
{
/// scratch buffer that receives one encoded code point from Poco::UTF8Encoding::convert
using UTF8SequenceBuffer = UInt8[6];
/// number of octets in one SSE register
static constexpr auto n = sizeof(__m128i);
/// memory page size, queried once per searcher instance and used by page_safe()
const int page_size = getpagesize();
/// string to be searched for
const UInt8 * const needle;
const std::size_t needle_size;
const UInt8 * const needle_end = needle + needle_size;
/// lower and uppercase variants of the first octet of the first character in `needle`
bool first_needle_symbol_is_ascii{};
UInt8 l{};
UInt8 u{};
/// vectors filled with `l` and `u`, for determining leftmost position of the first symbol
__m128i patl, patu;
/// lower and uppercase vectors of first 16 characters of `needle`
__m128i cachel = _mm_setzero_si128(), cacheu = _mm_setzero_si128();
/// bit i is set when octet i of the caches holds an octet of the needle (0xffff: all 16 are in use)
int cachemask{};
/// offset in `needle` from which code-point comparison resumes after a full 16-octet cache match;
/// it only grows while the running total of sequence lengths stays below `n`
std::size_t cache_valid_len{};
/// running total of the lengths of the needle sequences visited while filling the caches
std::size_t cache_actual_len{};
/// true when the offset of `ptr` inside its page is at most page_size - n, i.e. a 16-octet load
/// starting at `ptr` stays within that page (presumably page_size is a power of two)
bool page_safe(const void * const ptr) const
{
return ((page_size - 1) & reinterpret_cast<std::uintptr_t>(ptr)) <= page_size - n;
}
public:
/// Prepares the first-octet patterns and the 16-octet caches for `needle_`.
/// Throws DB::Exception (UNSUPPORTED_PARAMETER) when a cached code point has lowercase and uppercase
/// encodings whose lengths differ from each other or from the original.
/// For an empty needle nothing is prepared; `search` then returns its `haystack` argument.
StringSearcher(const char * const needle_, const std::size_t needle_size)
: needle{reinterpret_cast<const UInt8 *>(needle_)}, needle_size{needle_size}
{
if (0 == needle_size)
return;
static const Poco::UTF8Encoding utf8;
UTF8SequenceBuffer l_seq, u_seq;
if (*needle < 0x80u)
{
first_needle_symbol_is_ascii = true;
l = static_cast<const UInt8>(std::tolower(*needle));
u = static_cast<const UInt8>(std::toupper(*needle));
}
else
{
const auto first_u32 = utf8.convert(needle);
const auto first_l_u32 = Poco::Unicode::toLower(first_u32);
const auto first_u_u32 = Poco::Unicode::toUpper(first_u32);
/// lower and uppercase variants of the first octet of the first character in `needle`
utf8.convert(first_l_u32, l_seq, sizeof(l_seq));
l = l_seq[0];
utf8.convert(first_u_u32, u_seq, sizeof(u_seq));
u = u_seq[0];
}
/// for detecting leftmost position of the first symbol
patl = _mm_set1_epi8(l);
patu = _mm_set1_epi8(u);
/// lower and uppercase vectors of first 16 octets of `needle`
auto needle_pos = needle;
/// every iteration step shifts the caches right by one octet and, while the needle lasts, inserts
/// the next octet at the top position, so after n steps needle octet i sits at position i
for (std::size_t i = 0; i < n;)
{
/// needle is shorter than 16 octets: keep shifting so the octets already stored end up in place
if (needle_pos == needle_end)
{
cachel = _mm_srli_si128(cachel, 1);
cacheu = _mm_srli_si128(cacheu, 1);
++i;
continue;
}
const auto src_len = DB::UTF8::seqLength(*needle_pos);
const auto c_u32 = utf8.convert(needle_pos);
const auto c_l_u32 = Poco::Unicode::toLower(c_u32);
const auto c_u_u32 = Poco::Unicode::toUpper(c_u32);
const auto dst_l_len = static_cast<UInt8>(utf8.convert(c_l_u32, l_seq, sizeof(l_seq)));
const auto dst_u_len = static_cast<UInt8>(utf8.convert(c_u_u32, u_seq, sizeof(u_seq)));
/// @note Unicode standard states it is a rare but possible occasion
if (!(dst_l_len == dst_u_len && dst_u_len == src_len))
throw DB::Exception{
"UTF8 sequences with different lowercase and uppercase lengths are not supported",
DB::ErrorCodes::UNSUPPORTED_PARAMETER
};
cache_actual_len += src_len;
if (cache_actual_len < n)
cache_valid_len += src_len;
/// copy the octets of this sequence; stops early when the 16 cache positions are exhausted
for (std::size_t j = 0; j < src_len && i < n; ++j, ++i)
{
cachel = _mm_srli_si128(cachel, 1);
cacheu = _mm_srli_si128(cacheu, 1);
if (needle_pos != needle_end)
{
cachel = _mm_insert_epi8(cachel, l_seq[j], n - 1);
cacheu = _mm_insert_epi8(cacheu, u_seq[j], n - 1);
cachemask |= 1 << i;
++needle_pos;
}
}
}
}
/// Returns true if the needle matches (ignoring case) at `pos`.
/// No end pointer is taken: on the page-safe path 16 octets are loaded from `pos`, and the
/// code-point loops read from `pos` until the needle is exhausted or a mismatch is found,
/// so the caller has to guarantee that this memory is readable.
bool compare(const UInt8 * pos) const
{
static const Poco::UTF8Encoding utf8;
if (page_safe(pos))
{
const auto v_haystack = _mm_loadu_si128(reinterpret_cast<const __m128i *>(pos));
const auto v_against_l = _mm_cmpeq_epi8(v_haystack, cachel);
const auto v_against_u = _mm_cmpeq_epi8(v_haystack, cacheu);
const auto v_against_l_or_u = _mm_or_si128(v_against_l, v_against_u);
const auto mask = _mm_movemask_epi8(v_against_l_or_u);
/// all 16 cache positions are occupied: the needle may continue past the cache
if (0xffff == cachemask)
{
if (mask == cachemask)
{
pos += cache_valid_len;
auto needle_pos = needle + cache_valid_len;
while (needle_pos < needle_end &&
Poco::Unicode::toLower(utf8.convert(pos)) ==
Poco::Unicode::toLower(utf8.convert(needle_pos)))
{
/// @note assuming sequences for lowercase and uppercase have exact same length
const auto len = DB::UTF8::seqLength(*pos);
pos += len, needle_pos += len;
}
if (needle_pos == needle_end)
return true;
}
}
/// needle is shorter than 16 octets: only the positions marked in cachemask have to match
else if ((mask & cachemask) == cachemask)
return true;
return false;
}
/// a 16-octet load is not taken here: check the first octet, then compare code point by code point
if (*pos == l || *pos == u)
{
/// an ASCII first symbol is already verified by the octet test above, so skip it;
/// a non-ASCII one is compared again in full, starting from its first octet
pos += first_needle_symbol_is_ascii;
auto needle_pos = needle + first_needle_symbol_is_ascii;
while (needle_pos < needle_end &&
Poco::Unicode::toLower(utf8.convert(pos)) ==
Poco::Unicode::toLower(utf8.convert(needle_pos)))
{
const auto len = DB::UTF8::seqLength(*pos);
pos += len, needle_pos += len;
}
if (needle_pos == needle_end)
return true;
}
return false;
}
/// Returns a pointer to the first case-insensitive occurrence of the needle in
/// [haystack, haystack_end), or `haystack_end` if there is none.
/// An empty needle yields `haystack`.
const UInt8 * search(const UInt8 * haystack, const UInt8 * const haystack_end) const
{
if (0 == needle_size)
return haystack;
static const Poco::UTF8Encoding utf8;
while (haystack < haystack_end)
{
/// SSE path: taken only when 16 octets are available before haystack_end and within the page
if (haystack + n <= haystack_end && page_safe(haystack))
{
const auto v_haystack = _mm_loadu_si128(reinterpret_cast<const __m128i *>(haystack));
const auto v_against_l = _mm_cmpeq_epi8(v_haystack, patl);
const auto v_against_u = _mm_cmpeq_epi8(v_haystack, patu);
const auto v_against_l_or_u = _mm_or_si128(v_against_l, v_against_u);
const auto mask = _mm_movemask_epi8(v_against_l_or_u);
/// neither variant of the first octet occurs in these 16 octets: skip them and
/// step over continuation octets so that the scan resumes at the start of a sequence
if (mask == 0)
{
haystack += n;
DB::UTF8::syncForward(haystack, haystack_end);
continue;
}
/// jump to the leftmost candidate position
const auto offset = _bit_scan_forward(mask);
haystack += offset;
if (haystack < haystack_end && haystack + n <= haystack_end && page_safe(haystack))
{
const auto v_haystack = _mm_loadu_si128(reinterpret_cast<const __m128i *>(haystack));
const auto v_against_l = _mm_cmpeq_epi8(v_haystack, cachel);
const auto v_against_u = _mm_cmpeq_epi8(v_haystack, cacheu);
const auto v_against_l_or_u = _mm_or_si128(v_against_l, v_against_u);
const auto mask = _mm_movemask_epi8(v_against_l_or_u);
if (0xffff == cachemask)
{
if (mask == cachemask)
{
auto haystack_pos = haystack + cache_valid_len;
auto needle_pos = needle + cache_valid_len;
while (haystack_pos < haystack_end && needle_pos < needle_end &&
Poco::Unicode::toLower(utf8.convert(haystack_pos)) ==
Poco::Unicode::toLower(utf8.convert(needle_pos)))
{
/// @note assuming sequences for lowercase and uppercase have exact same length
const auto len = DB::UTF8::seqLength(*haystack_pos);
haystack_pos += len, needle_pos += len;
}
if (needle_pos == needle_end)
return haystack;
}
}
else if ((mask & cachemask) == cachemask)
return haystack;
/// first octet was ok, but not the first 16, move to start of next sequence and reapply
haystack += DB::UTF8::seqLength(*haystack);
continue;
}
}
/// scalar path: reached when one of the 16-octet loads above was not taken
if (haystack == haystack_end)
return haystack_end;
if (*haystack == l || *haystack == u)
{
auto haystack_pos = haystack + first_needle_symbol_is_ascii;
auto needle_pos = needle + first_needle_symbol_is_ascii;
while (haystack_pos < haystack_end && needle_pos < needle_end &&
Poco::Unicode::toLower(utf8.convert(haystack_pos)) ==
Poco::Unicode::toLower(utf8.convert(needle_pos)))
{
const auto len = DB::UTF8::seqLength(*haystack_pos);
haystack_pos += len, needle_pos += len;
}
if (needle_pos == needle_end)
return haystack;
}
/// advance to the start of the next sequence
haystack += DB::UTF8::seqLength(*haystack);
}
return haystack_end;
}
};
/** Case-insensitive ASCII searcher.
  *
  * Octets are compared after std::tolower / std::toupper, one octet per character.
  * The constructor caches the lowercase and uppercase forms of the first 16 octets of `needle`;
  * `compare` and `search` test 16 octets at a time against them with SSE instructions and
  * compare the rest of the needle octet by octet.
  *
  * Only a pointer to the needle is stored; the needle itself is not copied.
  */
template <> class StringSearcher<false, true>
{
/// number of octets in one SSE register
static constexpr auto n = sizeof(__m128i);
/// memory page size, queried once per searcher instance and used by page_safe()
const int page_size = getpagesize();
/// string to be searched for
const UInt8 * const needle;
const std::size_t needle_size;
const UInt8 * const needle_end = needle + needle_size;
/// lower and uppercase variants of the first character in `needle`
UInt8 l{};
UInt8 u{};
/// vectors filled with `l` and `u`, for determining leftmost position of the first symbol
__m128i patl, patu;
/// lower and uppercase vectors of first 16 characters of `needle`
__m128i cachel = _mm_setzero_si128(), cacheu = _mm_setzero_si128();
/// bit i is set when octet i of the caches holds an octet of the needle (0xffff: all 16 are in use)
int cachemask{};
/// true when the offset of `ptr` inside its page is at most page_size - n, i.e. a 16-octet load
/// starting at `ptr` stays within that page (presumably page_size is a power of two)
bool page_safe(const void * const ptr) const
{
return ((page_size - 1) & reinterpret_cast<std::uintptr_t>(ptr)) <= page_size - n;
}
public:
/// Prepares the first-character patterns and the 16-octet caches for `needle_`.
/// For an empty needle nothing is prepared; `search` then returns its `haystack` argument.
StringSearcher(const char * const needle_, const std::size_t needle_size)
: needle{reinterpret_cast<const UInt8 *>(needle_)}, needle_size{needle_size}
{
if (0 == needle_size)
return;
l = static_cast<UInt8>(std::tolower(*needle));
u = static_cast<UInt8>(std::toupper(*needle));
patl = _mm_set1_epi8(l);
patu = _mm_set1_epi8(u);
auto needle_pos = needle;
/// each of the n steps shifts the caches right by one octet and, while the needle lasts, inserts
/// the next octet at the top position, so after n steps needle octet i sits at position i
for (const auto i : ext::range(0, n))
{
cachel = _mm_srli_si128(cachel, 1);
cacheu = _mm_srli_si128(cacheu, 1);
if (needle_pos != needle_end)
{
cachel = _mm_insert_epi8(cachel, std::tolower(*needle_pos), n - 1);
cacheu = _mm_insert_epi8(cacheu, std::toupper(*needle_pos), n - 1);
cachemask |= 1 << i;
++needle_pos;
}
}
}
/// Returns true if the needle matches (ignoring ASCII case) at `pos`.
/// No end pointer is taken: on the page-safe path 16 octets are loaded from `pos`, and the
/// scalar loops read from `pos` until the needle is exhausted or a mismatch is found,
/// so the caller has to guarantee that this memory is readable.
bool compare(const UInt8 * pos) const
{
if (page_safe(pos))
{
const auto v_haystack = _mm_loadu_si128(reinterpret_cast<const __m128i *>(pos));
const auto v_against_l = _mm_cmpeq_epi8(v_haystack, cachel);
const auto v_against_u = _mm_cmpeq_epi8(v_haystack, cacheu);
const auto v_against_l_or_u = _mm_or_si128(v_against_l, v_against_u);
const auto mask = _mm_movemask_epi8(v_against_l_or_u);
/// all 16 cache positions are occupied: the needle may continue past the cache
if (0xffff == cachemask)
{
if (mask == cachemask)
{
/// first 16 octets matched, compare the remaining ones one by one
pos += n;
auto needle_pos = needle + n;
while (needle_pos < needle_end && std::tolower(*pos) == std::tolower(*needle_pos))
++pos, ++needle_pos;
if (needle_pos == needle_end)
return true;
}
}
/// needle is shorter than 16 octets: only the positions marked in cachemask have to match
else if ((mask & cachemask) == cachemask)
return true;
return false;
}
/// a 16-octet load is not taken here: plain octet-by-octet comparison
if (*pos == l || *pos == u)
{
++pos;
auto needle_pos = needle + 1;
while (needle_pos < needle_end && std::tolower(*pos) == std::tolower(*needle_pos))
++pos, ++needle_pos;
if (needle_pos == needle_end)
return true;
}
return false;
}
/// Returns a pointer to the first case-insensitive occurrence of the needle in
/// [haystack, haystack_end), or `haystack_end` if there is none.
/// An empty needle yields `haystack`.
const UInt8 * search(const UInt8 * haystack, const UInt8 * const haystack_end) const
{
if (0 == needle_size)
return haystack;
while (haystack < haystack_end)
{
/// SSE path: taken only when 16 octets are available before haystack_end and within the page
if (haystack + n <= haystack_end && page_safe(haystack))
{
const auto v_haystack = _mm_loadu_si128(reinterpret_cast<const __m128i *>(haystack));
const auto v_against_l = _mm_cmpeq_epi8(v_haystack, patl);
const auto v_against_u = _mm_cmpeq_epi8(v_haystack, patu);
const auto v_against_l_or_u = _mm_or_si128(v_against_l, v_against_u);
const auto mask = _mm_movemask_epi8(v_against_l_or_u);
/// neither variant of the first character occurs in these 16 octets
if (mask == 0)
{
haystack += n;
continue;
}
/// jump to the leftmost candidate position
const auto offset = _bit_scan_forward(mask);
haystack += offset;
if (haystack < haystack_end && haystack + n <= haystack_end && page_safe(haystack))
{
const auto v_haystack = _mm_loadu_si128(reinterpret_cast<const __m128i *>(haystack));
const auto v_against_l = _mm_cmpeq_epi8(v_haystack, cachel);
const auto v_against_u = _mm_cmpeq_epi8(v_haystack, cacheu);
const auto v_against_l_or_u = _mm_or_si128(v_against_l, v_against_u);
const auto mask = _mm_movemask_epi8(v_against_l_or_u);
if (0xffff == cachemask)
{
if (mask == cachemask)
{
auto haystack_pos = haystack + n;
auto needle_pos = needle + n;
while (haystack_pos < haystack_end && needle_pos < needle_end &&
std::tolower(*haystack_pos) == std::tolower(*needle_pos))
++haystack_pos, ++needle_pos;
if (needle_pos == needle_end)
return haystack;
}
}
else if ((mask & cachemask) == cachemask)
return haystack;
/// candidate rejected, continue from the next octet
++haystack;
continue;
}
}
/// scalar path: reached when one of the 16-octet loads above was not taken
if (haystack == haystack_end)
return haystack_end;
if (*haystack == l || *haystack == u)
{
auto haystack_pos = haystack + 1;
auto needle_pos = needle + 1;
while (haystack_pos < haystack_end && needle_pos < needle_end &&
std::tolower(*haystack_pos) == std::tolower(*needle_pos))
++haystack_pos, ++needle_pos;
if (needle_pos == needle_end)
return haystack;
}
++haystack;
}
return haystack_end;
}
};
/** Case-sensitive searcher (both ASCII and UTF-8).
  *
  * Octets are compared for exact equality, so the `ASCII` parameter is not used by this
  * specialization. The constructor caches the first 16 octets of `needle`; `compare` and `search`
  * test 16 octets at a time against the cache with SSE instructions and compare the rest of the
  * needle octet by octet.
  *
  * Only a pointer to the needle is stored; the needle itself is not copied.
  */
template <bool ASCII> class StringSearcher<true, ASCII>
{
/// number of octets in one SSE register
static constexpr auto n = sizeof(__m128i);
/// memory page size, queried once per searcher instance and used by page_safe()
const int page_size = getpagesize();
/// string to be searched for
const UInt8 * const needle;
const std::size_t needle_size;
const UInt8 * const needle_end = needle + needle_size;
/// first character in `needle`
UInt8 first{};
/// vector filled `first` for determining leftmost position of the first symbol
__m128i pattern;
/// vector of first 16 characters of `needle`
__m128i cache = _mm_setzero_si128();
/// bit i is set when octet i of the cache holds an octet of the needle (0xffff: all 16 are in use)
int cachemask{};
/// true when the offset of `ptr` inside its page is at most page_size - n, i.e. a 16-octet load
/// starting at `ptr` stays within that page (presumably page_size is a power of two)
bool page_safe(const void * const ptr) const
{
return ((page_size - 1) & reinterpret_cast<std::uintptr_t>(ptr)) <= page_size - n;
}
public:
/// Prepares the first-character pattern and the 16-octet cache for `needle_`.
/// For an empty needle nothing is prepared; `search` then returns its `haystack` argument.
StringSearcher(const char * const needle_, const std::size_t needle_size)
: needle{reinterpret_cast<const UInt8 *>(needle_)}, needle_size{needle_size}
{
if (0 == needle_size)
return;
first = *needle;
pattern = _mm_set1_epi8(first);
auto needle_pos = needle;
/// each of the n steps shifts the cache right by one octet and, while the needle lasts, inserts
/// the next octet at the top position, so after n steps needle octet i sits at position i
for (const auto i : ext::range(0, n))
{
cache = _mm_srli_si128(cache, 1);
if (needle_pos != needle_end)
{
cache = _mm_insert_epi8(cache, *needle_pos, n - 1);
cachemask |= 1 << i;
++needle_pos;
}
}
}
/// Returns true if the needle matches exactly at `pos`.
/// No end pointer is taken: on the page-safe path 16 octets are loaded from `pos`, and the
/// scalar loops read from `pos` until the needle is exhausted or a mismatch is found,
/// so the caller has to guarantee that this memory is readable.
bool compare(const UInt8 * pos) const
{
if (page_safe(pos))
{
const auto v_haystack = _mm_loadu_si128(reinterpret_cast<const __m128i *>(pos));
const auto v_against_cache = _mm_cmpeq_epi8(v_haystack, cache);
const auto mask = _mm_movemask_epi8(v_against_cache);
/// all 16 cache positions are occupied: the needle may continue past the cache
if (0xffff == cachemask)
{
if (mask == cachemask)
{
/// first 16 octets matched, compare the remaining ones one by one
pos += n;
auto needle_pos = needle + n;
while (needle_pos < needle_end && *pos == *needle_pos)
++pos, ++needle_pos;
if (needle_pos == needle_end)
return true;
}
}
/// needle is shorter than 16 octets: only the positions marked in cachemask have to match
else if ((mask & cachemask) == cachemask)
return true;
return false;
}
/// a 16-octet load is not taken here: plain octet-by-octet comparison
if (*pos == first)
{
++pos;
auto needle_pos = needle + 1;
while (needle_pos < needle_end && *pos == *needle_pos)
++pos, ++needle_pos;
if (needle_pos == needle_end)
return true;
}
return false;
}
/// Returns a pointer to the first occurrence of the needle in [haystack, haystack_end),
/// or `haystack_end` if there is none. An empty needle yields `haystack`.
const UInt8 * search(const UInt8 * haystack, const UInt8 * const haystack_end) const
{
if (0 == needle_size)
return haystack;
while (haystack < haystack_end)
{
/// SSE path: taken only when 16 octets are available before haystack_end and within the page
if (haystack + n <= haystack_end && page_safe(haystack))
{
/// find first character
const auto v_haystack = _mm_loadu_si128(reinterpret_cast<const __m128i *>(haystack));
const auto v_against_pattern = _mm_cmpeq_epi8(v_haystack, pattern);
const auto mask = _mm_movemask_epi8(v_against_pattern);
/// first character not present in 16 octets starting at `haystack`
if (mask == 0)
{
haystack += n;
continue;
}
/// jump to the leftmost candidate position
const auto offset = _bit_scan_forward(mask);
haystack += offset;
if (haystack < haystack_end && haystack + n <= haystack_end && page_safe(haystack))
{
/// check for first 16 octets
const auto v_haystack = _mm_loadu_si128(reinterpret_cast<const __m128i *>(haystack));
const auto v_against_cache = _mm_cmpeq_epi8(v_haystack, cache);
const auto mask = _mm_movemask_epi8(v_against_cache);
if (0xffff == cachemask)
{
if (mask == cachemask)
{
auto haystack_pos = haystack + n;
auto needle_pos = needle + n;
while (haystack_pos < haystack_end && needle_pos < needle_end &&
*haystack_pos == *needle_pos)
++haystack_pos, ++needle_pos;
if (needle_pos == needle_end)
return haystack;
}
}
else if ((mask & cachemask) == cachemask)
return haystack;
/// candidate rejected, continue from the next octet
++haystack;
continue;
}
}
/// scalar path: reached when one of the 16-octet loads above was not taken
if (haystack == haystack_end)
return haystack_end;
if (*haystack == first)
{
auto haystack_pos = haystack + 1;
auto needle_pos = needle + 1;
while (haystack_pos < haystack_end && needle_pos < needle_end &&
*haystack_pos == *needle_pos)
++haystack_pos, ++needle_pos;
if (needle_pos == needle_end)
return haystack;
}
++haystack;
}
return haystack_end;
}
};
/// Convenience names for the four <CaseSensitive, ASCII> combinations of StringSearcher.
/// Both case-sensitive names resolve to the same partial specialization StringSearcher<true, ASCII>.
using ASCIICaseSensitiveStringSearcher = StringSearcher<true, true>;
using ASCIICaseInsensitiveStringSearcher = StringSearcher<false, true>;
using UTF8CaseSensitiveStringSearcher = StringSearcher<true, false>;
using UTF8CaseInsensitiveStringSearcher = StringSearcher<false, false>;
}

View File

@ -0,0 +1,54 @@
#pragma once
#include <DB/Core/Types.h>
#include <x86intrin.h>
namespace DB
{
namespace UTF8
{
/// Mask selecting the two most significant bits of an octet.
static const UInt8 CONTINUATION_OCTET_MASK = 0b11000000u;
/// Value of the masked bits (binary 10) that identifies a UTF-8 continuation octet.
static const UInt8 CONTINUATION_OCTET = 0b10000000u;
/// Tells whether `octet` is a UTF-8 continuation octet, i.e. whether its two top bits are binary 10.
inline bool isContinuationOctet(const UInt8 octet)
{
    const auto top_bits = octet & CONTINUATION_OCTET_MASK;
    return top_bits == CONTINUATION_OCTET;
}
/// Steps `s` backward to the nearest octet that is not a continuation octet
/// (`s` is left untouched when `*s` already is such an octet).
/// There is no lower bound: the caller must guarantee that such an octet exists at or before `s`.
inline void syncBackward(const UInt8 * & s)
{
    for (; isContinuationOctet(*s); --s)
    {
    }
}
/// Steps `s` forward past continuation octets; stops at the first octet that is not one,
/// or at `end`, whichever comes first.
inline void syncForward(const UInt8 * & s, const UInt8 * const end)
{
    for (; s < end && isContinuationOctet(*s); ++s)
    {
    }
}
/** Returns the length in octets of a UTF-8 sequence, judging by its first octet.
  *
  * The length is the number of leading one bits of `first_octet`:
  * the position of the highest zero bit is found and subtracted from 7.
  * ASCII octets (below 0x80) yield 1. A continuation octet (binary 10xxxxxx) also yields 1,
  * so a caller that advances by the result always makes progress.
  *
  * The octet 0xFF has no zero bit at all: its complement is 0 and `_bit_scan_reverse(0)`
  * is undefined. It can never start a valid sequence, so it is treated as a single octet.
  */
inline std::size_t seqLength(const UInt8 first_octet)
{
    if (first_octet < 0x80u || first_octet == 0xFFu)
        return 1;

    const std::size_t bits = 8;
    /// index of the highest zero bit of `first_octet`; the argument is non-zero here
    const auto first_zero = _bit_scan_reverse(static_cast<UInt8>(~first_octet));
    return bits - 1 - first_zero;
}
}
}

View File

@ -1,5 +1,10 @@
#pragma once
#include <DB/Common/StringSearcher.h>
#include <Poco/UTF8Encoding.h>
#include <Poco/Unicode.h>
#include <ext/range.hpp>
#include <x86intrin.h>
#include <stdint.h>
#include <string.h>
@ -23,111 +28,426 @@
*
* Используется невыровненный доступ к памяти.
*/
class Volnitsky
namespace DB
{
private:
typedef uint8_t offset_t; /// Смещение в needle. Для основного алгоритма, длина needle не должна быть больше 255.
typedef uint16_t ngram_t; /// n-грамма (2 байта).
const char * needle;
size_t needle_size;
const char * needle_end;
size_t step; /// Насколько двигаемся, если n-грамма из haystack не нашлась в хэш-таблице.
/// @todo store lowercase needle to speed up in case there are numerous occurrences of bigrams from needle in haystack
template <typename CRTP>
class VolnitskyBase
{
protected:
using offset_t = uint8_t; /// Смещение в needle. Для основного алгоритма, длина needle не должна быть больше 255.
using ngram_t = uint16_t; /// n-грамма (2 байта).
const UInt8 * const needle;
const size_t needle_size;
const UInt8 * const needle_end = needle + needle_size;
/// На сколько двигаемся, если n-грамма из haystack не нашлась в хэш-таблице.
const size_t step = needle_size - sizeof(ngram_t) + 1;
/** max needle length is 255, max distinct ngrams for case-sensitive is (255 - 1), case-insensitive is 4 * (255 - 1)
* storage of 64K ngrams (n = 2, 128 KB) should be large enough for both cases */
static const size_t hash_size = 64 * 1024; /// Помещается в L2-кэш.
offset_t hash[hash_size]; /// Хэш-таблица.
bool fallback; /// Нужно ли использовать fallback алгоритм.
/// fallback алгоритм
static const char * naive_memmem(const char * haystack, size_t haystack_size, const char * needle, size_t needle_size)
{
const char * pos = haystack;
const char * end = haystack + haystack_size;
while (nullptr != (pos = reinterpret_cast<const char *>(memchr(pos, needle[0], end - pos))) && pos + needle_size <= end)
{
if (0 == memcmp(pos, needle, needle_size))
return pos;
else
++pos;
}
return end;
}
/// min haystack size to use main algorithm instead of fallback
static constexpr auto min_haystack_size_for_algorithm = 20000;
const bool fallback; /// Нужно ли использовать fallback алгоритм.
public:
/** haystack_size_hint - ожидаемый суммарный размер haystack при вызовах search. Можно не указывать.
* Если указать его достаточно маленьким, то будет использован fallback алгоритм,
* так как считается, что тратить время на инициализацию хэш-таблицы не имеет смысла.
*/
Volnitsky(const char * needle_, size_t needle_size_, size_t haystack_size_hint = 0)
: needle(needle_), needle_size(needle_size_), needle_end(needle + needle_size), step(needle_size - sizeof(ngram_t) + 1)
VolnitskyBase(const char * const needle, const size_t needle_size, size_t haystack_size_hint = 0)
: needle{reinterpret_cast<const UInt8 *>(needle)}, needle_size{needle_size},
fallback{
needle_size < 2 * sizeof(ngram_t) or needle_size >= std::numeric_limits<offset_t>::max() or
(haystack_size_hint and haystack_size_hint < min_haystack_size_for_algorithm)
}
{
if (needle_size < 2 * sizeof(ngram_t)
|| needle_size >= std::numeric_limits<offset_t>::max()
|| (haystack_size_hint && haystack_size_hint < 20000))
{
fallback = true;
if (fallback)
return;
}
else
fallback = false;
memset(hash, 0, hash_size * sizeof(hash[0]));
memset(hash, 0, sizeof(hash));
for (int i = needle_size - sizeof(ngram_t); i >= 0; --i)
{
/// Кладём смещение для n-грама в соответствующую ему ячейку или ближайшую свободную.
size_t cell_num = *reinterpret_cast<const ngram_t *>(needle + i) % hash_size;
while (hash[cell_num])
cell_num = (cell_num + 1) % hash_size; /// Поиск следующей свободной ячейки.
hash[cell_num] = i + 1;
}
/// int is used here because unsigned can't be used with condition like `i >= 0`, unsigned always >= 0
for (auto i = static_cast<int>(needle_size - sizeof(ngram_t)); i >= 0; --i)
self().putNGram(this->needle + i, i + 1);
}
/// Если не найдено - возвращается конец haystack.
const char * search(const char * haystack, size_t haystack_size) const
const UInt8 * search(const UInt8 * const haystack, const size_t haystack_size) const
{
if (needle_size == 0)
return haystack;
const char * haystack_end = haystack + haystack_size;
const auto haystack_end = haystack + haystack_size;
if (needle_size == 1)
{
const char * res = reinterpret_cast<const char *>(memchr(haystack, needle[0], haystack_size));
return res ? res : haystack_end;
}
if (fallback || haystack_size <= needle_size)
{
return naive_memmem(haystack, haystack_size, needle, needle_size);
}
if (needle_size == 1 || fallback || haystack_size <= needle_size)
return self().search_fallback(haystack, haystack_end);
/// Будем "прикладывать" needle к haystack и сравнивать n-грам из конца needle.
const char * pos = haystack + needle_size - sizeof(ngram_t);
const auto * pos = haystack + needle_size - sizeof(ngram_t);
for (; pos <= haystack_end - needle_size; pos += step)
{
/// Смотрим все ячейки хэш-таблицы, которые могут соответствовать n-граму из haystack.
for (size_t cell_num = *reinterpret_cast<const ngram_t *>(pos) % hash_size; hash[cell_num]; cell_num = (cell_num + 1) % hash_size)
for (size_t cell_num = toNGram(pos) % hash_size; hash[cell_num];
cell_num = (cell_num + 1) % hash_size)
{
/// Когда нашли - сравниваем побайтово, используя смещение из хэш-таблицы.
const char * res = pos - (hash[cell_num] - 1);
for (size_t i = 0; i < needle_size; ++i)
if (res[i] != needle[i])
goto next_hash_cell;
const auto res = pos - (hash[cell_num] - 1);
return res;
next_hash_cell:;
if (self().compare(res))
return res;
}
}
/// Оставшийся хвостик.
return naive_memmem(pos - step + 1, haystack_end - (pos - step + 1), needle, needle_size);
return self().search_fallback(pos - step + 1, haystack_end);
}
const unsigned char * search(const unsigned char * haystack, size_t haystack_size) const
const char * search(const char * haystack, size_t haystack_size) const
{
return reinterpret_cast<const unsigned char *>(search(reinterpret_cast<const char *>(haystack), haystack_size));
return reinterpret_cast<const char *>(search(reinterpret_cast<const UInt8 *>(haystack), haystack_size));
}
protected:
/// CRTP accessor: views this object as the derived implementation type.
CRTP & self()
{
    return static_cast<CRTP &>(*this);
}
/// Const flavour of the accessor; constness is cast away only to reuse the overload above,
/// and the result is returned as a const reference.
const CRTP & self() const
{
    return const_cast<VolnitskyBase *>(this)->self();
}
/// Reinterprets the octets starting at `pos` as an n-gram; the memory is read in place
/// (no copy is made and no alignment is required of `pos` by this code).
static const ngram_t & toNGram(const UInt8 * const pos)
{
    const auto * const ngram_ptr = reinterpret_cast<const ngram_t *>(pos);
    return *ngram_ptr;
}
/// Stores `offset` for `ngram` in the hash table: the cell the n-gram maps to is used if it is
/// free (holds zero), otherwise the nearest following free cell, wrapping around at the table end.
void putNGramBase(const ngram_t ngram, const int offset)
{
    size_t cell_num = ngram % hash_size;

    /// Look for the next free cell.
    for (; hash[cell_num]; cell_num = (cell_num + 1) % hash_size)
    {
    }

    hash[cell_num] = offset;
}
};
template <bool CaseSensitive, bool ASCII> struct VolnitskyImpl;
/// Case-sensitive flavour of the algorithm: n-grams are registered exactly as they appear in the
/// needle, and both verification and fallback search are delegated to a case-sensitive StringSearcher.
template <bool ASCII> struct VolnitskyImpl<true, ASCII> : VolnitskyBase<VolnitskyImpl<true, ASCII>>
{
    /// Verifies candidate positions and serves as the fallback search algorithm.
    ASCIICaseSensitiveStringSearcher fallback_searcher;

    VolnitskyImpl(const char * const needle, const size_t needle_size, const size_t haystack_size_hint = 0)
        : VolnitskyBase<VolnitskyImpl<true, ASCII>>{needle, needle_size, haystack_size_hint},
        fallback_searcher{needle, needle_size}
    {
    }

    /// Registers the n-gram located at `pos` under the given `offset`.
    void putNGram(const UInt8 * const pos, const int offset)
    {
        const auto & ngram = this->toNGram(pos);
        this->putNGramBase(ngram, offset);
    }

    /// Tells whether the needle occurs at `pos`.
    bool compare(const UInt8 * const pos) const
    {
        /// @todo: maybe just use memcmp for this case and rely on internal SSE optimization as in case with memcpy?
        const bool matched = fallback_searcher.compare(pos);
        return matched;
    }

    /// Searches [haystack, haystack_end) with the StringSearcher instead of the hash table.
    const UInt8 * search_fallback(const UInt8 * const haystack, const UInt8 * const haystack_end) const
    {
        const UInt8 * const found = fallback_searcher.search(haystack, haystack_end);
        return found;
    }
};
/// Case-insensitive ASCII flavour: every n-gram of the needle is registered in all of its
/// letter-case variants; verification and fallback search are delegated to a case-insensitive
/// ASCII StringSearcher.
template <> struct VolnitskyImpl<false, true> : VolnitskyBase<VolnitskyImpl<false, true>>
{
    /// Verifies candidate positions and serves as the fallback search algorithm.
    ASCIICaseInsensitiveStringSearcher fallback_searcher;

    VolnitskyImpl(const char * const needle, const size_t needle_size, const size_t haystack_size_hint = 0)
        : VolnitskyBase{needle, needle_size, haystack_size_hint}, fallback_searcher{needle, needle_size}
    {
    }

    /// Registers the n-gram at `pos` under `offset`, once per distinct case variant
    /// (four when both octets are letters, two when one is, one otherwise).
    void putNGram(const UInt8 * const pos, const int offset)
    {
        /// the union gives octet-wise access to the n-gram being assembled
        union {
            ngram_t n;
            UInt8 c[2];
        };
        n = toNGram(pos);

        const bool first_is_letter = std::isalpha(c[0]);
        const bool second_is_letter = std::isalpha(c[1]);

        if (!first_is_letter && !second_is_letter)
        {
            /// 1 combination: 01
            putNGramBase(n, offset);
            return;
        }

        if (first_is_letter && second_is_letter)
        {
            /// 4 combinations: AB, aB, Ab, ab
            c[0] = std::tolower(c[0]);
            c[1] = std::tolower(c[1]);
            putNGramBase(n, offset);

            c[0] = std::toupper(c[0]);
            putNGramBase(n, offset);

            c[1] = std::toupper(c[1]);
            putNGramBase(n, offset);

            c[0] = std::tolower(c[0]);
            putNGramBase(n, offset);
            return;
        }

        /// exactly one octet is a letter, 2 combinations: A1, a1 or 0B, 0b
        UInt8 & letter = first_is_letter ? c[0] : c[1];

        letter = std::tolower(letter);
        putNGramBase(n, offset);

        letter = std::toupper(letter);
        putNGramBase(n, offset);
    }

    /// Tells whether the needle occurs at `pos`, ignoring ASCII case.
    bool compare(const UInt8 * const pos) const
    {
        const bool matched = fallback_searcher.compare(pos);
        return matched;
    }

    /// Searches [haystack, haystack_end) with the StringSearcher instead of the hash table.
    const UInt8 * search_fallback(const UInt8 * const haystack, const UInt8 * const haystack_end) const
    {
        const UInt8 * const found = fallback_searcher.search(haystack, haystack_end);
        return found;
    }
};
/// Case-sensitive UTF-8
template <> struct VolnitskyImpl<false, false> : VolnitskyBase<VolnitskyImpl<false, false>>
{
VolnitskyImpl(const char * const needle, const size_t needle_size, const size_t haystack_size_hint = 0)
: VolnitskyBase{needle, needle_size, haystack_size_hint}, fallback_searcher{needle, needle_size}
{
}
void putNGram(const UInt8 * const pos, const int offset)
{
union
{
ngram_t n;
UInt8 c[2];
};
n = toNGram(pos);
if (isascii(c[0]) && isascii(c[1]))
{
const auto c0_al = std::isalpha(c[0]);
const auto c1_al = std::isalpha(c[1]);
if (c0_al && c1_al)
{
/// 4 combinations: AB, aB, Ab, ab
c[0] = std::tolower(c[0]);
c[1] = std::tolower(c[1]);
putNGramBase(n, offset);
c[0] = std::toupper(c[0]);
putNGramBase(n, offset);
c[1] = std::toupper(c[1]);
putNGramBase(n, offset);
c[0] = std::tolower(c[0]);
putNGramBase(n, offset);
}
else if (c0_al)
{
/// 2 combinations: A1, a1
c[0] = std::tolower(c[0]);
putNGramBase(n, offset);
c[0] = std::toupper(c[0]);
putNGramBase(n, offset);
}
else if (c1_al)
{
/// 2 combinations: 0B, 0b
c[1] = std::tolower(c[1]);
putNGramBase(n, offset);
c[1] = std::toupper(c[1]);
putNGramBase(n, offset);
}
else
/// 1 combination: 01
putNGramBase(n, offset);
}
else
{
using Seq = UInt8[6];
static const Poco::UTF8Encoding utf8;
if (UTF8::isContinuationOctet(c[1]))
{
/// ngram is inside a sequence
auto seq_pos = pos;
UTF8::syncBackward(seq_pos);
const auto u32 = utf8.convert(seq_pos);
const auto l_u32 = Poco::Unicode::toLower(u32);
const auto u_u32 = Poco::Unicode::toUpper(u32);
/// symbol is case-independent
if (l_u32 == u_u32)
putNGramBase(n, offset);
else
{
/// where is the given ngram in respect to UTF-8 sequence start?
const auto seq_ngram_offset = pos - seq_pos;
Seq seq;
/// put ngram from lowercase
utf8.convert(l_u32, seq, sizeof(seq));
c[0] = seq[seq_ngram_offset];
c[1] = seq[seq_ngram_offset + 1];
putNGramBase(n, offset);
/// put ngram for uppercase
utf8.convert(u_u32, seq, sizeof(seq));
c[0] = seq[seq_ngram_offset];
c[1] = seq[seq_ngram_offset + 1];
putNGramBase(n, offset);
}
}
else
{
/// ngram is on the boundary of two sequences
/// first sequence may start before u_pos if it is not ASCII
auto first_seq_pos = pos;
UTF8::syncBackward(first_seq_pos);
const auto first_u32 = utf8.convert(first_seq_pos);
const auto first_l_u32 = Poco::Unicode::toLower(first_u32);
const auto first_u_u32 = Poco::Unicode::toUpper(first_u32);
/// second sequence always start immediately after u_pos
auto second_seq_pos = pos + 1;
const auto second_u32 = utf8.convert(second_seq_pos);
const auto second_l_u32 = Poco::Unicode::toLower(second_u32);
const auto second_u_u32 = Poco::Unicode::toUpper(second_u32);
/// both symbols are case-independent
if (first_l_u32 == first_u_u32 && second_l_u32 == second_u_u32)
putNGramBase(n, offset);
else if (first_l_u32 == first_u_u32)
{
/// first symbol is case-independent
Seq seq;
/// put ngram for lowercase
utf8.convert(second_l_u32, seq, sizeof(seq));
c[1] = seq[0];
putNGramBase(n, offset);
/// put ngram from uppercase
utf8.convert(second_u_u32, seq, sizeof(seq));
c[1] = seq[0];
putNGramBase(n, offset);
}
else if (second_l_u32 == second_u_u32)
{
/// second symbol is case-independent
/// where is the given ngram in respect to the first UTF-8 sequence start?
const auto seq_ngram_offset = pos - first_seq_pos;
Seq seq;
/// put ngram for lowercase
utf8.convert(second_l_u32, seq, sizeof(seq));
c[0] = seq[seq_ngram_offset];
putNGramBase(n, offset);
/// put ngram for uppercase
utf8.convert(second_u_u32, seq, sizeof(seq));
c[0] = seq[seq_ngram_offset];
putNGramBase(n, offset);
}
else
{
/// where is the given ngram in respect to the first UTF-8 sequence start?
const auto seq_ngram_offset = pos - first_seq_pos;
Seq first_l_seq, first_u_seq, second_l_seq, second_u_seq;
utf8.convert(first_l_u32, first_l_seq, sizeof(first_l_seq));
utf8.convert(first_u_u32, first_u_seq, sizeof(first_u_seq));
utf8.convert(second_l_u32, second_l_seq, sizeof(second_l_seq));
utf8.convert(second_u_u32, second_u_seq, sizeof(second_u_seq));
/// ngram for ll
c[0] = first_l_seq[seq_ngram_offset];
c[1] = second_l_seq[0];
putNGramBase(n, offset);
/// ngram for lU
c[0] = first_l_seq[seq_ngram_offset];
c[1] = second_u_seq[0];
putNGramBase(n, offset);
/// ngram for Ul
c[0] = first_u_seq[seq_ngram_offset];
c[1] = second_l_seq[0];
putNGramBase(n, offset);
/// ngram for UU
c[0] = first_u_seq[seq_ngram_offset];
c[1] = second_u_seq[0];
putNGramBase(n, offset);
}
}
}
}
bool compare(const UInt8 * const pos) const
{
return fallback_searcher.compare(pos);
}
/// Runs the search over [haystack, haystack_end) with `fallback_searcher` instead of the n-gram table
/// and returns whatever that searcher returns.
/// NOTE(review): the return value is presumably a pointer to the first match or `haystack_end`
/// when nothing is found -- confirm against UTF8CaseInsensitiveStringSearcher::search.
const UInt8 * search_fallback(const UInt8 * const haystack, const UInt8 * const haystack_end) const
{
return fallback_searcher.search(haystack, haystack_end);
}
UTF8CaseInsensitiveStringSearcher fallback_searcher;
};
/// Convenience aliases for the four VolnitskyImpl instantiations.
/// NOTE(review): the template arguments appear to be <CaseSensitive, ASCII> -- confirm against the VolnitskyImpl declaration.
using Volnitsky = VolnitskyImpl<true, true>;
using VolnitskyUTF8 = VolnitskyImpl<true, false>; /// exactly the same as Volnitsky
using VolnitskyCaseInsensitive = VolnitskyImpl<false, true>; /// ignores non-ASCII bytes
using VolnitskyCaseInsensitiveUTF8 = VolnitskyImpl<false, false>;
}

View File

@ -13,6 +13,7 @@
#include <DB/Columns/ColumnConst.h>
#include <DB/Common/Volnitsky.h>
#include <DB/Functions/IFunction.h>
#include <DB/Common/StringSearcher.h>
#include <re2/re2.h>
#include <re2/stringpiece.h>
#include <Poco/UTF8Encoding.h>
@ -52,6 +53,7 @@ namespace DB
*/
template <bool CaseSensitive>
struct PositionImpl
{
typedef UInt64 ResultType;
@ -69,7 +71,7 @@ struct PositionImpl
/// Текущий индекс в массиве строк.
size_t i = 0;
Volnitsky searcher(needle.data(), needle.size(), end - pos);
VolnitskyImpl<CaseSensitive, true> searcher(needle.data(), needle.size(), end - pos);
/// Искать будем следующее вхождение сразу во всех строках.
while (pos < end && end != (pos = searcher.search(pos, end - pos)))
@ -94,8 +96,14 @@ struct PositionImpl
memset(&res[i], 0, (res.size() - i) * sizeof(res[0]));
}
static void constant(const std::string & data, const std::string & needle, UInt64 & res)
static void constant(std::string data, std::string needle, UInt64 & res)
{
if (!CaseSensitive)
{
std::transform(std::begin(data), std::end(data), std::begin(data), tolower);
std::transform(std::begin(needle), std::end(needle), std::begin(needle), tolower);
}
res = data.find(needle);
if (res == std::string::npos)
res = 0;
@ -105,43 +113,7 @@ struct PositionImpl
};
namespace
{
    /// Mask that selects the two most significant bits of an octet.
    const UInt8 utf8_continuation_octet_mask = 0b11000000u;
    /// Value of those two bits (binary 10) in a UTF-8 continuation octet.
    const UInt8 utf8_continuation_octet = 0b10000000u;

    /// Returns true if the binary representation of `octet` starts with 10,
    /// i.e. the octet is a continuation of a UTF-8 sequence.
    bool utf8_is_continuation_octet(const UInt8 octet)
    {
        return (octet & utf8_continuation_octet_mask) == utf8_continuation_octet;
    }

    /// Moves `s` forward until either the first non-continuation octet or `end` is reached.
    /// NOTE: with the default `end == nullptr` the condition `s < end` does not hold in practice,
    /// so the call leaves `s` untouched; callers are expected to pass the real end of the buffer.
    void utf8_sync_forward(const UInt8 * & s, const UInt8 * const end = nullptr)
    {
        while (s < end && utf8_is_continuation_octet(*s))
            ++s;
    }

    /// Returns the length of a UTF-8 code point sequence judging by its first octet:
    /// 1 for octets below 0x80, otherwise the number of leading one bits of `first_octet`.
    /// The octet 0xFF (no zero bit at all) cannot start a sequence and is reported as length 1,
    /// so that callers advancing by the returned length always make progress by a defined amount.
    std::size_t utf8_seq_length(const UInt8 first_octet)
    {
        if (first_octet < 0x80u)
            return 1;

        const auto inverted = static_cast<UInt8>(~first_octet);

        /// `_bit_scan_reverse` has an undefined result for a zero argument, which is what 0xFF inverts to.
        if (inverted == 0)
            return 1;

        const std::size_t bits = 8;
        /// index of the highest zero bit of `first_octet`
        const auto first_zero = _bit_scan_reverse(inverted);

        return bits - 1 - first_zero;
    }
}
template <bool CaseSensitive>
struct PositionUTF8Impl
{
typedef UInt64 ResultType;
@ -157,7 +129,7 @@ struct PositionUTF8Impl
/// Текущий индекс в массиве строк.
size_t i = 0;
Volnitsky searcher(needle.data(), needle.size(), end - pos);
VolnitskyImpl<CaseSensitive, false> searcher(needle.data(), needle.size(), end - pos);
/// Искать будем следующее вхождение сразу во всех строках.
while (pos < end && end != (pos = searcher.search(pos, end - pos)))
@ -175,7 +147,7 @@ struct PositionUTF8Impl
/// А теперь надо найти, сколько кодовых точек находится перед pos.
res[i] = 1;
for (const UInt8 * c = begin + (i != 0 ? offsets[i - 1] : 0); c < pos; ++c)
if (!utf8_is_continuation_octet(*c))
if (!UTF8::isContinuationOctet(*c))
++res[i];
}
else
@ -188,15 +160,36 @@ struct PositionUTF8Impl
memset(&res[i], 0, (res.size() - i) * sizeof(res[0]));
}
static void constant(const std::string & data, const std::string & needle, UInt64 & res)
static void constant(std::string data, std::string needle, UInt64 & res)
{
if (!CaseSensitive)
{
static const Poco::UTF8Encoding utf8;
auto data_pos = reinterpret_cast<UInt8 *>(&data[0]);
const auto data_end = data_pos + data.size();
while (data_pos < data_end)
{
const auto len = utf8.convert(Poco::Unicode::toLower(utf8.convert(data_pos)), data_pos, data_end - data_pos);
data_pos += len;
}
auto needle_pos = reinterpret_cast<UInt8 *>(&needle[0]);
const auto needle_end = needle_pos + needle.size();
while (needle_pos < needle_end)
{
const auto len = utf8.convert(Poco::Unicode::toLower(utf8.convert(needle_pos)), needle_pos, needle_end - needle_pos);
needle_pos += len;
}
}
const auto pos = data.find(needle);
if (pos != std::string::npos)
{
/// А теперь надо найти, сколько кодовых точек находится перед pos.
res = 1;
for (const auto i : ext::range(0, pos))
if (!utf8_is_continuation_octet(static_cast<UInt8>(data[i])))
if (!UTF8::isContinuationOctet(static_cast<UInt8>(data[i])))
++res;
}
else
@ -207,143 +200,6 @@ struct PositionUTF8Impl
struct PositionCaseInsensitiveImpl
{
private:
/// Case-insensitive substring searcher for single-byte characters.
/// Uses 16-byte SSE comparisons whenever a full 16-byte load is possible and a byte-by-byte
/// comparison otherwise. The needle is kept by reference, so the string passed to the
/// constructor has to outlive the searcher.
class CaseInsensitiveSearcher
{
/// size of one SSE register in bytes; also the number of needle bytes cached below
static constexpr auto n = sizeof(__m128i);
/// page size as reported by getpagesize(), used by page_safe()
const int page_size = getpagesize();
/// string to be searched for
const std::string & needle;
/// lower and uppercase variants of the first character in `needle`
UInt8 l{};
UInt8 u{};
/// vectors filled with `l` and `u`, for determining leftmost position of the first symbol
__m128i patl, patu;
/// lower and uppercase vectors of first 16 characters of `needle`
__m128i cachel = _mm_setzero_si128(), cacheu = _mm_setzero_si128();
/// bit i is set if the needle has a byte at position i (i < 16); equals 0xffff for needles of 16+ bytes
int cachemask{};
/// Returns true if the offset of `ptr` inside its page is at most `page_size - n`,
/// i.e. an n-byte load starting at `ptr` stays within that page
/// (the masking with `page_size - 1` assumes the page size is a power of two).
bool page_safe(const void * const ptr) const
{
return ((page_size - 1) & reinterpret_cast<std::uintptr_t>(ptr)) <= page_size - n;
}
public:
/// Precomputes the lower/uppercase first character, the broadcast patterns and the
/// lower/uppercase cache of the first 16 needle bytes. Does nothing for an empty needle.
CaseInsensitiveSearcher(const std::string & needle) : needle(needle)
{
if (needle.empty())
return;
auto needle_pos = needle.data();
l = std::tolower(*needle_pos);
u = std::toupper(*needle_pos);
patl = _mm_set1_epi8(l);
patu = _mm_set1_epi8(u);
const auto needle_end = needle_pos + needle.size();
/// Every iteration shifts the caches one byte towards the low end and inserts the next needle
/// byte (if any) at the highest position, so after n iterations needle byte 0 sits in byte 0.
for (const auto i : ext::range(0, n))
{
cachel = _mm_srli_si128(cachel, 1);
cacheu = _mm_srli_si128(cacheu, 1);
if (needle_pos != needle_end)
{
cachel = _mm_insert_epi8(cachel, std::tolower(*needle_pos), n - 1);
cacheu = _mm_insert_epi8(cacheu, std::toupper(*needle_pos), n - 1);
cachemask |= 1 << i;
++needle_pos;
}
}
}
/// Looks for the needle in [haystack, haystack_end) ignoring case.
/// Returns a pointer to the first occurrence, `haystack_end` if there is none,
/// and `haystack` itself for an empty needle.
const UInt8 * find(const UInt8 * haystack, const UInt8 * const haystack_end) const
{
if (needle.empty())
return haystack;
const auto needle_begin = reinterpret_cast<const UInt8 *>(needle.data());
const auto needle_end = needle_begin + needle.size();
while (haystack < haystack_end)
{
/// @todo supposedly for long strings spanning across multiple pages. Why don't we use this technique in other places?
if (haystack + n <= haystack_end && page_safe(haystack))
{
/// find the leftmost byte among the next 16 that equals the first needle character in either case
const auto v_haystack = _mm_loadu_si128(reinterpret_cast<const __m128i *>(haystack));
const auto v_against_l = _mm_cmpeq_epi8(v_haystack, patl);
const auto v_against_u = _mm_cmpeq_epi8(v_haystack, patu);
const auto v_against_l_or_u = _mm_or_si128(v_against_l, v_against_u);
const auto mask = _mm_movemask_epi8(v_against_l_or_u);
/// no candidate in these 16 bytes, skip them all
if (mask == 0)
{
haystack += n;
continue;
}
const auto offset = _bit_scan_forward(mask);
haystack += offset;
/// compare 16 bytes at the candidate position against the cached needle prefix
if (haystack < haystack_end && haystack + n <= haystack_end && page_safe(haystack))
{
const auto v_haystack = _mm_loadu_si128(reinterpret_cast<const __m128i *>(haystack));
const auto v_against_l = _mm_cmpeq_epi8(v_haystack, cachel);
const auto v_against_u = _mm_cmpeq_epi8(v_haystack, cacheu);
const auto v_against_l_or_u = _mm_or_si128(v_against_l, v_against_u);
const auto mask = _mm_movemask_epi8(v_against_l_or_u);
/// the needle has at least 16 bytes: all 16 must match, then the remainder is compared byte by byte
if (0xffff == cachemask)
{
if (mask == cachemask)
{
auto haystack_pos = haystack + n;
auto needle_pos = needle_begin + n;
while (haystack_pos < haystack_end && needle_pos < needle_end &&
std::tolower(*haystack_pos) == std::tolower(*needle_pos))
++haystack_pos, ++needle_pos;
if (needle_pos == needle_end)
return haystack;
}
}
/// the needle is shorter than 16 bytes: only the positions covered by the needle have to match
else if ((mask & cachemask) == cachemask)
return haystack;
++haystack;
continue;
}
}
if (haystack == haystack_end)
return haystack_end;
/// byte-by-byte path, taken when a 16-byte load at `haystack` is not possible
if (*haystack == l || *haystack == u)
{
auto haystack_pos = haystack + 1;
auto needle_pos = needle_begin + 1;
while (haystack_pos < haystack_end && needle_pos < needle_end &&
std::tolower(*haystack_pos) == std::tolower(*needle_pos))
++haystack_pos, ++needle_pos;
if (needle_pos == needle_end)
return haystack;
}
++haystack;
}
return haystack_end;
}
};
public:
using ResultType = UInt64;
@ -351,7 +207,7 @@ public:
const ColumnString::Chars_t & data, const ColumnString::Offsets_t & offsets, const std::string & needle,
PODArray<UInt64> & res)
{
const CaseInsensitiveSearcher searcher{needle};
const ASCIICaseInsensitiveStringSearcher searcher{needle.data(), needle.size()};
const UInt8 * begin = &data[0];
const UInt8 * pos = begin;
@ -361,7 +217,7 @@ public:
size_t i = 0;
/// Искать будем следующее вхождение сразу во всех строках.
while (pos < end && end != (pos = searcher.find(pos, end)))
while (pos < end && end != (pos = searcher.search(pos, end)))
{
/// Определим, к какому индексу оно относится.
while (begin + offsets[i] <= pos)
@ -397,297 +253,6 @@ public:
};
/// Implementation of a case-insensitive substring position search over UTF-8 strings.
/// `vector` handles a column of strings with a constant needle, `constant` handles two constant strings.
/// Both report the 1-based position of the match counted in code points, or 0 if there is no match.
struct PositionCaseInsensitiveUTF8Impl
{
private:
/// Case-insensitive UTF-8 substring searcher that uses 16-byte SSE comparisons whenever a full
/// 16-byte load is possible and a code-point-by-code-point comparison otherwise.
/// The needle is kept by reference, so the string passed to the constructor has to outlive the searcher.
class CaseInsensitiveSearcher
{
/// scratch buffer for one encoded UTF-8 sequence
using UTF8SequenceBuffer = UInt8[6];
/// size of one SSE register in bytes; also the number of needle octets cached below
static constexpr auto n = sizeof(__m128i);
/// page size as reported by getpagesize(), used by page_safe()
const int page_size = getpagesize();
/// string to be searched for
const std::string & needle;
/// true if the first octet of `needle` is below 0x80
bool first_needle_symbol_is_ascii{};
/// lower and uppercase variants of the first octet of the first character in `needle`
UInt8 l{};
UInt8 u{};
/// vectors filled with `l` and `u`, for determining leftmost position of the first symbol
__m128i patl, patu;
/// lower and uppercase vectors of first 16 characters of `needle`
__m128i cachel = _mm_setzero_si128(), cacheu = _mm_setzero_si128();
/// bit i is set if cache position i holds a needle octet; equals 0xffff when all 16 positions are filled
int cachemask{};
/// total length of the leading needle sequences that end strictly before octet n of the needle
std::size_t cache_valid_len{};
/// total length of all needle sequences visited while filling the cache
std::size_t cache_actual_len{};
/// Returns true if the offset of `ptr` inside its page is at most `page_size - n`,
/// i.e. an n-byte load starting at `ptr` stays within that page
/// (the masking with `page_size - 1` assumes the page size is a power of two).
bool page_safe(const void * const ptr) const
{
return ((page_size - 1) & reinterpret_cast<std::uintptr_t>(ptr)) <= page_size - n;
}
public:
/// Precomputes the lower/uppercase first octet, the broadcast patterns and the lower/uppercase
/// cache of the first 16 needle octets. Does nothing for an empty needle.
/// Throws Exception (UNSUPPORTED_PARAMETER) if some cached needle character has a lowercase or
/// uppercase form whose encoded length differs from the length of the original sequence.
CaseInsensitiveSearcher(const std::string & needle) : needle(needle)
{
if (needle.empty())
return;
static const Poco::UTF8Encoding utf8;
UTF8SequenceBuffer l_seq, u_seq;
auto needle_pos = reinterpret_cast<const UInt8 *>(needle.data());
if (*needle_pos < 0x80u)
{
first_needle_symbol_is_ascii = true;
l = std::tolower(*needle_pos);
u = std::toupper(*needle_pos);
}
else
{
const auto first_u32 = utf8.convert(needle_pos);
const auto first_l_u32 = Poco::Unicode::toLower(first_u32);
const auto first_u_u32 = Poco::Unicode::toUpper(first_u32);
/// lower and uppercase variants of the first octet of the first character in `needle`
utf8.convert(first_l_u32, l_seq, sizeof(l_seq));
l = l_seq[0];
utf8.convert(first_u_u32, u_seq, sizeof(u_seq));
u = u_seq[0];
}
/// for detecting leftmost position of the first symbol
patl = _mm_set1_epi8(l);
patu = _mm_set1_epi8(u);
/// lower and uppercase vectors of first 16 octets of `needle`
const auto needle_end = needle_pos + needle.size();
for (std::size_t i = 0; i < n;)
{
/// the needle is exhausted: keep shifting so that needle octet 0 ends up in byte 0 of the cache
if (needle_pos == needle_end)
{
cachel = _mm_srli_si128(cachel, 1);
cacheu = _mm_srli_si128(cacheu, 1);
++i;
continue;
}
const auto src_len = utf8_seq_length(*needle_pos);
const auto c_u32 = utf8.convert(needle_pos);
const auto c_l_u32 = Poco::Unicode::toLower(c_u32);
const auto c_u_u32 = Poco::Unicode::toUpper(c_u32);
const auto dst_l_len = static_cast<UInt8>(utf8.convert(c_l_u32, l_seq, sizeof(l_seq)));
const auto dst_u_len = static_cast<UInt8>(utf8.convert(c_u_u32, u_seq, sizeof(u_seq)));
/// @note Unicode standard states it is a rare but possible occasion
if (!(dst_l_len == dst_u_len && dst_u_len == src_len))
throw Exception{
"UTF8 sequences with different lowercase and uppercase lengths are not supported",
ErrorCodes::UNSUPPORTED_PARAMETER
};
cache_actual_len += src_len;
/// only sequences that end strictly before octet n count as validated by the cache
if (cache_actual_len < n)
cache_valid_len += src_len;
/// append the octets of this sequence to the caches, stopping once 16 positions are filled
for (std::size_t j = 0; j < src_len && i < n; ++j, ++i)
{
cachel = _mm_srli_si128(cachel, 1);
cacheu = _mm_srli_si128(cacheu, 1);
if (needle_pos != needle_end)
{
cachel = _mm_insert_epi8(cachel, l_seq[j], n - 1);
cacheu = _mm_insert_epi8(cacheu, u_seq[j], n - 1);
cachemask |= 1 << i;
++needle_pos;
}
}
}
}
/// Looks for the needle in [haystack, haystack_end) ignoring case.
/// Returns a pointer to the first occurrence found, `haystack_end` if there is none,
/// and `haystack` itself for an empty needle.
const UInt8 * find(const UInt8 * haystack, const UInt8 * const haystack_end) const
{
if (needle.empty())
return haystack;
static const Poco::UTF8Encoding utf8;
const auto needle_begin = reinterpret_cast<const UInt8 *>(needle.data());
const auto needle_end = needle_begin + needle.size();
while (haystack < haystack_end)
{
if (haystack + n <= haystack_end && page_safe(haystack))
{
/// find the leftmost octet among the next 16 that equals the first needle octet in either case
const auto v_haystack = _mm_loadu_si128(reinterpret_cast<const __m128i *>(haystack));
const auto v_against_l = _mm_cmpeq_epi8(v_haystack, patl);
const auto v_against_u = _mm_cmpeq_epi8(v_haystack, patu);
const auto v_against_l_or_u = _mm_or_si128(v_against_l, v_against_u);
const auto mask = _mm_movemask_epi8(v_against_l_or_u);
/// no candidate in these 16 octets: skip them and step over continuation octets
if (mask == 0)
{
haystack += n;
utf8_sync_forward(haystack, haystack_end);
continue;
}
const auto offset = _bit_scan_forward(mask);
haystack += offset;
/// compare 16 octets at the candidate position against the cached needle prefix
if (haystack < haystack_end && haystack + n <= haystack_end && page_safe(haystack))
{
const auto v_haystack = _mm_loadu_si128(reinterpret_cast<const __m128i *>(haystack));
const auto v_against_l = _mm_cmpeq_epi8(v_haystack, cachel);
const auto v_against_u = _mm_cmpeq_epi8(v_haystack, cacheu);
const auto v_against_l_or_u = _mm_or_si128(v_against_l, v_against_u);
const auto mask = _mm_movemask_epi8(v_against_l_or_u);
/// all 16 cache positions are filled: they must all match, then the rest of the needle
/// is compared code point by code point starting at `cache_valid_len`
if (0xffff == cachemask)
{
if (mask == cachemask)
{
auto haystack_pos = haystack + cache_valid_len;
auto needle_pos = needle_begin + cache_valid_len;
while (haystack_pos < haystack_end && needle_pos < needle_end &&
Poco::Unicode::toLower(utf8.convert(haystack_pos)) ==
Poco::Unicode::toLower(utf8.convert(needle_pos)))
{
/// @note assuming sequences for lowercase and uppercase have exact same length
const auto len = utf8_seq_length(*haystack_pos);
haystack_pos += len, needle_pos += len;
}
if (needle_pos == needle_end)
return haystack;
}
}
/// the needle occupies fewer than 16 cache positions: only those positions have to match
else if ((mask & cachemask) == cachemask)
return haystack;
/// first octet was ok, but not the first 16, move to start of next sequence and reapply
haystack += utf8_seq_length(*haystack);
continue;
}
}
if (haystack == haystack_end)
return haystack_end;
/// path without SSE, taken when a 16-octet load at `haystack` is not possible
if (*haystack == l || *haystack == u)
{
/// an ASCII first character is already verified by the check above and is skipped;
/// otherwise the comparison restarts from the first code point
auto haystack_pos = haystack + first_needle_symbol_is_ascii;
auto needle_pos = needle_begin + first_needle_symbol_is_ascii;
while (haystack_pos < haystack_end && needle_pos < needle_end &&
Poco::Unicode::toLower(utf8.convert(haystack_pos)) ==
Poco::Unicode::toLower(utf8.convert(needle_pos)))
{
const auto len = utf8_seq_length(*haystack_pos);
haystack_pos += len, needle_pos += len;
}
if (needle_pos == needle_end)
return haystack;
}
/// advance to the start of the next sequence
haystack += utf8_seq_length(*haystack);
}
return haystack_end;
}
};
public:
using ResultType = UInt64;
/// Searches `needle` in every string stored back to back in `data`; the start of string i is taken
/// from `offsets[i - 1]` (0 for the first one). Writes into `res[i]` the 1-based position of the
/// match in code points, or 0 if string i has no match.
/// NOTE(review): the strict `<` in the boundary check presumably accounts for a terminating byte
/// stored after each string -- confirm against ColumnString.
static void vector(
const ColumnString::Chars_t & data, const ColumnString::Offsets_t & offsets, const std::string & needle,
PODArray<UInt64> & res)
{
const CaseInsensitiveSearcher searcher{needle};
const UInt8 * begin = &data[0];
const UInt8 * pos = begin;
const UInt8 * end = pos + data.size();
/// Current index in the array of strings.
size_t i = 0;
/// We search for the next occurrence in all strings at once.
while (pos < end && end != (pos = searcher.find(pos, end)))
{
/// Determine which index it belongs to.
while (begin + offsets[i] <= pos)
{
res[i] = 0;
++i;
}
/// Check that the occurrence does not cross string boundaries.
if (pos + needle.size() < begin + offsets[i])
{
/// Now we need to find how many code points precede pos.
res[i] = 1;
for (const UInt8 * c = begin + (i != 0 ? offsets[i - 1] : 0); c < pos; ++c)
if (!utf8_is_continuation_octet(*c))
++res[i];
}
else
res[i] = 0;
/// continue the search from the beginning of the next string
pos = begin + offsets[i];
++i;
}
/// the remaining strings contain no occurrence
memset(&res[i], 0, (res.size() - i) * sizeof(res[0]));
}
/// Lowercases the local copies of `data` and `needle` code point by code point, then does an exact search.
/// Writes into `res` the 1-based position of the match in code points, or 0 if there is no match.
/// NOTE(review): the lowercase sequence is written back over the original one, which presumably
/// relies on both having the same encoded length -- confirm.
static void constant(std::string data, std::string needle, UInt64 & res)
{
static const Poco::UTF8Encoding utf8;
auto data_pos = reinterpret_cast<UInt8 *>(&data[0]);
const auto data_end = data_pos + data.size();
while (data_pos < data_end)
{
const auto len = utf8.convert(Poco::Unicode::toLower(utf8.convert(data_pos)), data_pos, data_end - data_pos);
data_pos += len;
}
auto needle_pos = reinterpret_cast<UInt8 *>(&needle[0]);
const auto needle_end = needle_pos + needle.size();
while (needle_pos < needle_end)
{
const auto len = utf8.convert(Poco::Unicode::toLower(utf8.convert(needle_pos)), needle_pos, needle_end - needle_pos);
needle_pos += len;
}
const auto pos = data.find(needle);
if (pos != std::string::npos)
{
/// Now we need to find how many code points precede pos.
res = 1;
for (const auto i : ext::range(0, pos))
if (!utf8_is_continuation_octet(static_cast<UInt8>(data[i])))
++res;
}
else
res = 0;
}
};
/// Переводит выражение LIKE в regexp re2. Например, abc%def -> ^abc.*def$
inline String likePatternToRegexp(const String & pattern)
{
@ -1744,10 +1309,11 @@ struct NameReplaceAll { static constexpr auto name = "replaceAll"; };
struct NameReplaceRegexpOne { static constexpr auto name = "replaceRegexpOne"; };
struct NameReplaceRegexpAll { static constexpr auto name = "replaceRegexpAll"; };
typedef FunctionsStringSearch<PositionImpl, NamePosition> FunctionPosition;
typedef FunctionsStringSearch<PositionUTF8Impl, NamePositionUTF8> FunctionPositionUTF8;
typedef FunctionsStringSearch<PositionImpl<true>, NamePosition> FunctionPosition;
typedef FunctionsStringSearch<PositionUTF8Impl<true>, NamePositionUTF8> FunctionPositionUTF8;
typedef FunctionsStringSearch<PositionCaseInsensitiveImpl, NamePositionCaseInsensitive> FunctionPositionCaseInsensitive;
typedef FunctionsStringSearch<PositionCaseInsensitiveUTF8Impl, NamePositionCaseInsensitiveUTF8> FunctionPositionCaseInsensitiveUTF8;
typedef FunctionsStringSearch<PositionUTF8Impl<false>, NamePositionCaseInsensitiveUTF8> FunctionPositionCaseInsensitiveUTF8;
typedef FunctionsStringSearch<MatchImpl<false>, NameMatch> FunctionMatch;
typedef FunctionsStringSearch<MatchImpl<true>, NameLike> FunctionLike;
typedef FunctionsStringSearch<MatchImpl<true, true>, NameNotLike> FunctionNotLike;

View File

@ -83,7 +83,6 @@ public:
/// Массив шардов. Для каждого шарда - массив адресов реплик (серверов, считающихся идентичными).
typedef std::vector<Addresses> AddressesWithFailover;
public:
const Addresses & getShardsInfo() const { return addresses; }
const AddressesWithFailover & getShardsWithFailoverInfo() const { return addresses_with_failover; }
const Addresses & getLocalShardsInfo() const { return local_addresses; }

View File

@ -166,6 +166,8 @@ struct Settings
/** Для запросов SELECT из реплицируемой таблицы, кидать исключение, если на реплике нет куска, записанного с кворумом; \
* не читать куски, которые ещё не были записаны с кворумом. */ \
M(SettingUInt64, select_sequential_consistency, 0) \
/** Максимальное количество различных шардов и максимальное количество реплик одного шарда в функции remote. */ \
M(SettingUInt64, table_function_remote_max_addresses, 1000) \
/// Всевозможные ограничения на выполнение запроса.
Limits limits;

View File

@ -36,7 +36,7 @@ inline void evaluateMissingDefaults(Block & block,
return;
/** ExpressionAnalyzer eliminates "unused" columns, in order to ensure their safety
* we are going to operate on a copy instead of the original block */
* we are going to operate on a copy instead of the original block */
Block copy_block{block};
/// evaluate default values for defaulted columns
ExpressionAnalyzer{default_expr_list, context, {}, required_columns}.getActions(true)->execute(copy_block);

View File

@ -1,13 +1,13 @@
#pragma once
#include <DB/Parsers/IAST.h>
#include <DB/Parsers/ASTQueryWithOutput.h>
namespace DB
{
struct ASTCheckQuery : public IAST
struct ASTCheckQuery : public ASTQueryWithOutput
{
ASTCheckQuery(StringRange range_ = StringRange()) : IAST(range_) {};
ASTCheckQuery(StringRange range_ = StringRange()) : ASTQueryWithOutput(range_) {};
/** Получить текст, который идентифицирует этот элемент. */
String getID() const override { return ("CheckQuery_" + database + "_" + table); };

View File

@ -99,10 +99,9 @@ public:
private:
virtual const NamesAndTypesList & getColumnsListImpl() const = 0;
using ColumnsListRange = boost::range::joined_range<
const boost::iterator_range<NamesAndTypesList::const_iterator>,
const boost::iterator_range<NamesAndTypesList::const_iterator>>;
ColumnsListRange getColumnsListIterator() const;
using ColumnsListRange = boost::range::joined_range<const NamesAndTypesList, const NamesAndTypesList>;
/// Returns a lazily joined range of table's ordinary and materialized columns, without unnecessary copying
ColumnsListRange getColumnsListRange() const;
};
}

View File

@ -24,9 +24,6 @@ namespace DB
class TableFunctionRemote : public ITableFunction
{
public:
/// Максимальное количество различных шардов и максимальное количество реплик одного шарда
const size_t MAX_ADDRESSES = 1000; /// TODO Перенести в Settings.
std::string getName() const override { return "remote"; }
StoragePtr execute(ASTPtr ast_function, Context & context) const override
@ -109,11 +106,13 @@ public:
if (ASTIdentifier * id = typeid_cast<ASTIdentifier *>(arg.get()))
id->kind = ASTIdentifier::Table;
size_t max_addresses = context.getSettingsRef().table_function_remote_max_addresses;
std::vector<std::vector<String>> names;
std::vector<String> shards = parseDescription(description, 0, description.size(), ',');
std::vector<String> shards = parseDescription(description, 0, description.size(), ',', max_addresses);
for (size_t i = 0; i < shards.size(); ++i)
names.push_back(parseDescription(shards[i], 0, shards[i].size(), '|'));
names.push_back(parseDescription(shards[i], 0, shards[i].size(), '|', max_addresses));
if (names.empty())
throw Exception("Shard list is empty after parsing first argument", ErrorCodes::BAD_ARGUMENTS);
@ -164,7 +163,7 @@ private:
}
/// Декартово произведение двух множеств строк, результат записываем на место первого аргумента
void append(std::vector<String> & to, const std::vector<String> & what) const
void append(std::vector<String> & to, const std::vector<String> & what, size_t max_addresses) const
{
if (what.empty()) return;
if (to.empty())
@ -172,7 +171,7 @@ private:
to = what;
return;
}
if (what.size() * to.size() > MAX_ADDRESSES)
if (what.size() * to.size() > max_addresses)
throw Exception("Storage Distributed, first argument generates too many result addresses",
ErrorCodes::BAD_ARGUMENTS);
std::vector<String> res;
@ -209,7 +208,7 @@ private:
* abc{1..9}de{f,g,h} - прямое произведение, 27 шардов.
* abc{1..9}de{0|1} - прямое произведение, 9 шардов, в каждом 2 реплики.
*/
std::vector<String> parseDescription(const String & description, size_t l, size_t r, char separator) const
std::vector<String> parseDescription(const String & description, size_t l, size_t r, char separator, size_t max_addresses) const
{
std::vector<String> res;
std::vector<String> cur;
@ -263,7 +262,7 @@ private:
throw Exception("Storage Distributed, incorrect argument in braces (left number is greater then right): "
+ description.substr(i, m - i + 1),
ErrorCodes::BAD_ARGUMENTS);
if (right - left + 1 > MAX_ADDRESSES)
if (right - left + 1 > max_addresses)
throw Exception("Storage Distributed, first argument generates too many result addresses",
ErrorCodes::BAD_ARGUMENTS);
bool add_leading_zeroes = false;
@ -282,25 +281,29 @@ private:
buffer.push_back(cur);
}
} else if (have_splitter) /// Если внутри есть текущий разделитель, то сгенерировать множество получаемых строк
buffer = parseDescription(description, i + 1, m, separator);
buffer = parseDescription(description, i + 1, m, separator, max_addresses);
else /// Иначе просто скопировать, порождение произойдет при вызове с правильным разделителем
buffer.push_back(description.substr(i, m - i + 1));
/// К текущему множеству строк добавить все возможные полученные продолжения
append(cur, buffer);
append(cur, buffer, max_addresses);
i = m;
} else if (description[i] == separator) {
}
else if (description[i] == separator)
{
/// Если разделитель, то добавляем в ответ найденные строки
res.insert(res.end(), cur.begin(), cur.end());
cur.clear();
} else {
}
else
{
/// Иначе просто дописываем символ к текущим строкам
std::vector<String> buffer;
buffer.push_back(description.substr(i, 1));
append(cur, buffer);
append(cur, buffer, max_addresses);
}
}
res.insert(res.end(), cur.begin(), cur.end());
if (res.size() > MAX_ADDRESSES)
if (res.size() > max_addresses)
throw Exception("Storage Distributed, first argument generates too many result addresses",
ErrorCodes::BAD_ARGUMENTS);
return res;

View File

@ -1,6 +1,7 @@
#include <DB/Interpreters/Cluster.h>
#include <DB/Common/escapeForFileName.h>
#include <DB/Common/isLocalAddress.h>
#include <DB/Common/SimpleCache.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Util/Application.h>
@ -8,17 +9,42 @@ namespace DB
{
/// For caching DNS queries.
/// Builds a socket address from a separate host and port; this is the uncached function
/// that the cache in resolveSocketAddress(host, port) wraps.
static Poco::Net::SocketAddress resolveSocketAddressImpl1(const String & host, UInt16 port)
{
return Poco::Net::SocketAddress(host, port);
}
/// Builds a socket address from a single "host and port" string; this is the uncached function
/// that the cache in resolveSocketAddress(host_and_port) wraps.
static Poco::Net::SocketAddress resolveSocketAddressImpl2(const String & host_and_port)
{
return Poco::Net::SocketAddress(host_and_port);
}
/// Resolves `host`:`port` through a function-local SimpleCache built around resolveSocketAddressImpl1.
/// NOTE(review): SimpleCache presumably memoizes the result per argument set -- confirm in SimpleCache.h.
static Poco::Net::SocketAddress resolveSocketAddress(const String & host, UInt16 port)
{
    using CachedResolver = SimpleCache<decltype(resolveSocketAddressImpl1), &resolveSocketAddressImpl1>;
    static CachedResolver resolver;

    return resolver(host, port);
}
/// Resolves a combined "host and port" string through a function-local SimpleCache
/// built around resolveSocketAddressImpl2.
/// NOTE(review): SimpleCache presumably memoizes the result per argument set -- confirm in SimpleCache.h.
static Poco::Net::SocketAddress resolveSocketAddress(const String & host_and_port)
{
    using CachedResolver = SimpleCache<decltype(resolveSocketAddressImpl2), &resolveSocketAddressImpl2>;
    static CachedResolver resolver;

    return resolver(host_and_port);
}
Cluster::Address::Address(const String & config_prefix)
{
auto & config = Poco::Util::Application::instance().config();
host_name = config.getString(config_prefix + ".host");
port = config.getInt(config_prefix + ".port");
resolved_address = Poco::Net::SocketAddress(host_name, port);
resolved_address = resolveSocketAddress(host_name, port);
user = config.getString(config_prefix + ".user", "default");
password = config.getString(config_prefix + ".password", "");
}
Cluster::Address::Address(const String & host_port_, const String & user_, const String & password_)
: user(user_), password(password_)
{
@ -27,18 +53,19 @@ Cluster::Address::Address(const String & host_port_, const String & user_, const
/// Похоже на то, что строка host_port_ содержит порт. Если условие срабатывает - не обязательно значит, что порт есть (пример: [::]).
if (nullptr != strchr(host_port_.c_str(), ':') || !default_port)
{
resolved_address = Poco::Net::SocketAddress(host_port_);
resolved_address = resolveSocketAddress(host_port_);
host_name = host_port_.substr(0, host_port_.find(':'));
port = resolved_address.port();
}
else
{
resolved_address = Poco::Net::SocketAddress(host_port_, default_port);
resolved_address = resolveSocketAddress(host_port_, default_port);
host_name = host_port_;
port = default_port;
}
}
namespace
{
inline std::string addressToDirName(const Cluster::Address & address)
@ -67,6 +94,8 @@ Clusters::Clusters(const Settings & settings, const String & config_name)
Cluster::Cluster(const Settings & settings, const String & cluster_name)
{
/// Создать кластер.
Poco::Util::AbstractConfiguration & config = Poco::Util::Application::instance().config();
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(cluster_name, config_keys);
@ -161,22 +190,25 @@ Cluster::Cluster(const Settings & settings, const String & cluster_name)
++current_shard_num;
}
/// Создать соответствующие пулы соединений.
if (!addresses_with_failover.empty() && !addresses.empty())
throw Exception("There must be either 'node' or 'shard' elements in config", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
if (addresses_with_failover.size())
if (!addresses_with_failover.empty())
{
for (const auto & shard : addresses_with_failover)
{
ConnectionPools replicas;
replicas.reserve(shard.size());
bool has_local_replics = false;
bool has_local_replica = false;
for (const auto & replica : shard)
{
if (isLocal(replica))
{
has_local_replics = true;
has_local_replica = true;
local_addresses.push_back(replica);
break;
}
@ -193,13 +225,13 @@ Cluster::Cluster(const Settings & settings, const String & cluster_name)
}
}
if (has_local_replics)
if (has_local_replica)
++local_nodes_num;
else
pools.emplace_back(new ConnectionPoolWithFailover(replicas, settings.load_balancing, settings.connections_with_failover_max_tries));
}
}
else if (addresses.size())
else if (!addresses.empty())
{
for (const auto & address : addresses)
{
@ -234,15 +266,13 @@ Cluster::Cluster(const Settings & settings, std::vector<std::vector<String>> nam
Addresses current;
for (auto & replica : shard)
current.emplace_back(replica, username, password);
addresses_with_failover.emplace_back(current);
}
for (const auto & shard : addresses_with_failover)
{
ConnectionPools replicas;
replicas.reserve(shard.size());
replicas.reserve(current.size());
for (const auto & replica : shard)
for (const auto & replica : current)
{
replicas.emplace_back(new ConnectionPool(
settings.distributed_connections_pool_size,

View File

@ -781,6 +781,9 @@ void ExpressionActions::finalize(const Names & output_columns)
for (const auto & name : action.prerequisite_names)
++columns_refcount[name];
for (const auto & name_alias : action.projection)
++columns_refcount[name_alias.first];
}
Actions new_actions;
@ -809,6 +812,8 @@ void ExpressionActions::finalize(const Names & output_columns)
for (const auto & name : action.prerequisite_names)
process(name);
/// Для projection тут нет уменьшения refcount, так как действие project заменяет имена у столбцов, по сути, уже удаляя их под старыми именами.
}
actions.swap(new_actions);

View File

@ -141,12 +141,18 @@ void ExpressionAnalyzer::init()
/// Удалить ненужное из списка columns. Создать unknown_required_columns. Сформировать columns_added_by_join.
collectUsedColumns();
/// has_aggregation, aggregation_keys, aggregate_descriptions, aggregated_columns.
analyzeAggregation();
/// external_tables, subqueries_for_sets для глобальных подзапросов.
/// Заменяет глобальные подзапросы на сгенерированные имена временных таблиц, которые будут отправлены на удалённые серверы.
initGlobalSubqueriesAndExternalTables();
/// has_aggregation, aggregation_keys, aggregate_descriptions, aggregated_columns.
/// Этот анализ надо провести после обработки глобальных подзапросов, потому что в противном случае,
/// если агрегатная функция содержит глобальный подзапрос, то метод analyzeAggregation сохранит
/// в aggregate_descriptions информацию о параметрах этой агрегатной функции, среди которых окажется
/// глобальный подзапрос. Затем при вызове метода initGlobalSubqueriesAndExternalTables, этот
/// глобальный подзапрос будет заменён на временную таблицу, в результате чего aggregate_descriptions
/// будет содержать устаревшую информацию, что приведёт к ошибке при выполнении запроса.
analyzeAggregation();
}

View File

@ -215,8 +215,8 @@ BlockIO InterpreterCheckQuery::execute()
{
status_column->insert(static_cast<UInt64>(status_value));
structure_class_column->insert(static_cast<UInt64>(desc.structure_class));
host_name_column->insert(desc.extra_info.resolved_address);
host_address_column->insert(desc.extra_info.host);
host_name_column->insert(desc.extra_info.host);
host_address_column->insert(desc.extra_info.resolved_address);
port_column->insert(static_cast<UInt64>(desc.extra_info.port));
user_column->insert(desc.extra_info.user);
structure_column->insert(desc.names_with_types);

View File

@ -11,6 +11,7 @@ bool ParserCheckQuery::parseImpl(IParser::Pos & pos, IParser::Pos end, ASTPtr &
ParserWhiteSpaceOrComments ws;
ParserString s_check("CHECK", true, true);
ParserString s_table("TABLE", true, true);
ParserString s_format("FORMAT", true, true);
ParserString s_dot(".");
ParserIdentifier table_parser;
@ -46,6 +47,22 @@ bool ParserCheckQuery::parseImpl(IParser::Pos & pos, IParser::Pos end, ASTPtr &
query->table = typeid_cast<ASTIdentifier &>(*table).name;
}
ws.ignore(pos, end);
/// FORMAT format_name
if (s_format.ignore(pos, end, max_parsed_pos, expected))
{
ws.ignore(pos, end);
ParserIdentifier format_p;
if (!format_p.parse(pos, end, query->format, max_parsed_pos, expected))
return false;
typeid_cast<ASTIdentifier &>(*query->format).kind = ASTIdentifier::Format;
ws.ignore(pos, end);
}
node = query;
return true;
}

View File

@ -5,6 +5,9 @@
#include <DB/Parsers/ASTIdentifier.h>
#include <DB/Parsers/ASTNameTypePair.h>
#include <DB/Interpreters/Context.h>
#include <ext/map.hpp>
#include <ext/identity.hpp>
#include <ext/collection_cast.hpp>
namespace DB
@ -12,25 +15,19 @@ namespace DB
NamesAndTypesList ITableDeclaration::getColumnsList() const
{
auto columns = getColumnsListImpl();
columns.insert(std::end(columns), std::begin(materialized_columns), std::end(materialized_columns));
return columns;
return ext::collection_cast<NamesAndTypesList>(getColumnsListRange());
}
ITableDeclaration::ColumnsListRange ITableDeclaration::getColumnsListIterator() const
ITableDeclaration::ColumnsListRange ITableDeclaration::getColumnsListRange() const
{
const auto & columns = getColumnsListImpl();
return boost::join(
boost::iterator_range<NamesAndTypesList::const_iterator>(columns.begin(), columns.end()),
boost::iterator_range<NamesAndTypesList::const_iterator>(std::begin(materialized_columns), std::end(materialized_columns)));
return boost::join(getColumnsListImpl(), materialized_columns);
}
bool ITableDeclaration::hasRealColumn(const String & column_name) const
{
for (auto & it : getColumnsListIterator())
for (auto & it : getColumnsListRange())
if (it.name == column_name)
return true;
return false;
@ -39,16 +36,13 @@ bool ITableDeclaration::hasRealColumn(const String & column_name) const
Names ITableDeclaration::getColumnNamesList() const
{
Names res;
for (auto & it : getColumnsListIterator())
res.push_back(it.name);
return res;
return ext::map<Names>(getColumnsListRange(), [] (const auto & it) { return it.name; });
}
NameAndTypePair ITableDeclaration::getRealColumn(const String & column_name) const
{
for (auto & it : getColumnsListIterator())
for (auto & it : getColumnsListRange())
if (it.name == column_name)
return it;
throw Exception("There is no column " + column_name + " in table.", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
@ -85,7 +79,7 @@ NameAndTypePair ITableDeclaration::getColumn(const String & column_name) const
const DataTypePtr ITableDeclaration::getDataTypeByName(const String & column_name) const
{
for (const auto & column : getColumnsListIterator())
for (const auto & column : getColumnsListRange())
if (column.name == column_name)
return column.type;
@ -97,7 +91,7 @@ Block ITableDeclaration::getSampleBlock() const
{
Block res;
for (const auto & col : getColumnsListIterator())
for (const auto & col : getColumnsListRange())
res.insert({ col.type->createColumn(), col.type, col.name });
return res;

View File

@ -0,0 +1,6 @@
-- Test: a GLOBAL IN subquery used inside the argument of an aggregate function (sum).
-- The outer query always reads test.storage through remote() on two loopback addresses;
-- the first SELECT's subquery also goes through remote(), the second reads the table directly.
DROP TABLE IF EXISTS test.storage;
CREATE TABLE test.storage(UserID UInt64) ENGINE=Memory;
INSERT INTO test.storage(UserID) values (6460432721393873721)(6460432721393873721)(6460432721393873721)(6460432721393873721)(6460432721393873721)(6460432721393873721)(6460432721393873721)(402895971392036118)(402895971392036118)(402895971392036118);
SELECT sum(UserID GLOBAL IN (SELECT UserID FROM remote('127.0.0.{1,2}', test.storage))) FROM remote('127.0.0.{1,2}', test.storage);
SELECT sum(UserID GLOBAL IN (SELECT UserID FROM test.storage)) FROM remote('127.0.0.{1,2}', test.storage);

View File

@ -0,0 +1,2 @@
1 0 0
2015-01-01 2015-01-01 01:02:03 111 123 456 789 456 9434005089510819894 9434005089510819894

View File

@ -0,0 +1,23 @@
-- Test: column defaults that depend on other defaulted columns.
-- Part 1: only `a` is inserted; `b` has a constant default and `c` defaults to an expression over `b`.
DROP TABLE IF EXISTS test.defaults;
CREATE TABLE test.defaults (a UInt8, b DEFAULT 0, c DEFAULT identity(b)) ENGINE = Memory;
INSERT INTO test.defaults (a) VALUES (1);
SELECT * FROM test.defaults;
DROP TABLE test.defaults;
-- Part 2: a MergeTree table with a chain of dependent defaults
-- (date <- uts, eff_uid <- adf_uid/ya_uid, page_session <- eff_uid/pr) and an ALIAS column
-- (sample_key -> page_session). Only the non-defaulted columns are inserted.
DROP TABLE IF EXISTS test.elog_cut;
CREATE TABLE test.elog_cut
(
date Date DEFAULT toDate(uts),
uts DateTime,
pr UInt64,
ya_uid UInt64,
adf_uid UInt64,
owner_id UInt32,
eff_uid UInt64 DEFAULT if(adf_uid != 0, adf_uid, ya_uid),
page_session UInt64 DEFAULT cityHash64(eff_uid, pr),
sample_key UInt64 ALIAS page_session
) ENGINE = MergeTree(date, cityHash64(adf_uid, ya_uid, pr), (owner_id, date, cityHash64(adf_uid, ya_uid, pr)), 8192);
INSERT INTO test.elog_cut (uts, pr, ya_uid, adf_uid, owner_id) VALUES ('2015-01-01 01:02:03', 111, 123, 456, 789);
SELECT date, uts, pr, ya_uid, adf_uid, owner_id, eff_uid, page_session, sample_key FROM test.elog_cut;
DROP TABLE test.elog_cut;

View File

@ -0,0 +1,13 @@
0 (0,'2015-01-01')
1 (1,'2015-01-02')
2 (2,'2015-01-03')
3 (3,'2015-01-04')
4 (4,'2015-01-05')
5 (5,'2015-01-06')
6 (6,'2015-01-07')
7 (7,'2015-01-08')
8 (8,'2015-01-09')
9 (9,'2015-01-10')
0 (0,'2015-01-01')
9 (9,'2015-01-10')

View File

@ -0,0 +1 @@
-- Test: selecting a tuple-valued column (number, date) with the `extremes` setting enabled.
SELECT number, (number, toDate('2015-01-01') + number) FROM system.numbers LIMIT 10 SETTINGS extremes = 1;

View File

@ -0,0 +1,23 @@
#pragma once
#include <iterator>

namespace ext
{
	/** \brief Returns a collection of the specified container template.
	  * The element type is taken from the source (Collection::value_type) and the result
	  * is constructed from the source's [begin, end) iterator range.
	  * Example: collection_cast<std::list>(vector_of_int) yields std::list<int>. */
	template <template <typename...> class ResultCollection, typename Collection>
	auto collection_cast(const Collection & collection)
	{
		using value_type = typename Collection::value_type;

		return ResultCollection<value_type>(std::begin(collection), std::end(collection));
	}

	/** \brief Returns a collection of the specified (fully spelled out) type.
	  * The result is constructed from the source's [begin, end) iterator range, so elements
	  * are implicitly converted from the source value_type to the result value_type,
	  * if such a conversion is available and required. */
	template <typename ResultCollection, typename Collection>
	auto collection_cast(const Collection & collection)
	{
		return ResultCollection(std::begin(collection), std::end(collection));
	}
}

View File

@ -0,0 +1,22 @@
#pragma once
#include <utility>

namespace ext
{
	/// \brief Identity function object for use with other algorithms as a pass-through.
	class identity
	{
		/** \brief Function pointer type used when converting identity to a function pointer.
		  * Presumably useless, provided for completeness. */
		template <typename T> using function_ptr_t = T &&(*)(T &&);

		/** \brief Static implementation of identity, so that its address can be taken
		  * and returned by the conversion operator below. */
		template <typename T> static T && invoke(T && t) { return std::forward<T>(t); }

	public:
		/** \brief Returns the sole argument unchanged, using perfect forwarding
		  * (the value category of the argument is preserved). */
		template <typename T> T && operator()(T && t) const { return std::forward<T>(t); }

		/** \brief Allows conversion of an identity instance to a plain function pointer
		  * of type T && (*)(T &&). */
		template <typename T> operator function_ptr_t<T>() const { return &invoke<T>; }
	};
}