fixed string LIKE: initial implementation

This commit is contained in:
Alexey Milovidov 2020-03-26 21:55:41 +03:00
parent aca6d99d42
commit 8c0aa1bd6a
4 changed files with 174 additions and 0 deletions

View File

@ -245,6 +245,156 @@ struct MatchImpl
}
}
/** Evaluates a constant LIKE / regexp pattern against every row of a fixed-width string column.
  *
  * data    - the bytes of all rows laid out back to back; row `i` occupies bytes [i * n, (i + 1) * n).
  * n       - the width of one row in bytes.
  * pattern - the constant pattern (a LIKE pattern when `like` is set, otherwise a regexp).
  * res     - receives one value per row: `!revert` for a matching row, `revert` otherwise.
  *           It is indexed by row number and is not resized here, so it must already hold one element per row.
  *
  * Every row of `res` is written explicitly: rows that the substring search jumps over are filled with
  * "no match" as the search advances, and the rows after the last occurrence are filled at the end.
  */
static void vectorFixedConstant(
    const ColumnString::Chars & data, size_t n, const std::string & pattern, PaddedPODArray<UInt8> & res)
{
    if (data.empty())
        return;

    String strstr_pattern;

    /// A simple case where the LIKE expression reduces to finding a substring in a string
    if (like && likePatternIsStrstr(pattern, strstr_pattern))
    {
        const UInt8 * const begin = data.data();
        const UInt8 * const end = begin + data.size();
        const UInt8 * pos = begin;

        /// Index of the first row that has no result yet, and the address where that row starts.
        size_t i = 0;
        const UInt8 * row_begin = begin;

        /// If pattern is larger than string size - it cannot be found.
        if (strstr_pattern.size() <= n)
        {
            Volnitsky searcher(strstr_pattern.data(), strstr_pattern.size(), end - pos);

            /// We will search for the next occurrence in all rows at once.
            while (pos < end && end != (pos = searcher.search(pos, end - pos)))
            {
                /// Rows lying entirely before the occurrence contain no match.
                while (row_begin + n <= pos)
                {
                    res[i] = revert;
                    row_begin += n;
                    ++i;
                }

                const UInt8 * const row_end = row_begin + n;

                /// We check that the entry does not pass through the boundaries of strings.
                /// A row is exactly `n` bytes with nothing after it, so an occurrence that ends
                /// exactly at `row_end` still lies inside the row.
                if (pos + strstr_pattern.size() <= row_end)
                    res[i] = !revert;
                else
                    res[i] = revert;

                /// The row is decided; continue searching from the next one.
                pos = row_end;
                row_begin = row_end;
                ++i;
            }
        }

        /// Tail, in which there can be no substring.
        /// The row counter is used here because `pos` has already been overwritten with `end`
        /// by the failed search that terminated the loop.
        if (i < res.size())
            memset(&res[i], revert, (res.size() - i) * sizeof(res[0]));
    }
    else
    {
        const size_t size = data.size() / n;

        const auto & regexp = Regexps::get<like, true>(pattern);

        std::string required_substring;
        bool is_trivial = false;
        bool required_substring_is_prefix = false; /// for `anchored` execution of the regexp.

        regexp->getAnalyzeResult(required_substring, is_trivial, required_substring_is_prefix);

        if (required_substring.empty())
        {
            if (!regexp->getRE2()) /// An empty regexp. Always matches.
            {
                /// "Always matches" is `!revert` in the result encoding used everywhere else in this function.
                if (size)
                    memset(res.data(), !revert, size * sizeof(res[0]));
            }
            else
            {
                /// No substring to pre-filter with: run the regexp on every row.
                size_t offset = 0;
                for (size_t i = 0; i < size; ++i)
                {
                    res[i] = revert
                        ^ regexp->getRE2()->Match(
                              re2_st::StringPiece(reinterpret_cast<const char *>(&data[offset]), n),
                              0,
                              n,
                              re2_st::RE2::UNANCHORED,
                              nullptr,
                              0);

                    offset += n;
                }
            }
        }
        else
        {
            /// NOTE This almost matches with the case of LikePatternIsStrstr.

            const UInt8 * const begin = data.data();
            const UInt8 * const end = begin + data.size();
            const UInt8 * pos = begin;

            /// Index of the first row that has no result yet, and the address where that row starts.
            size_t i = 0;
            const UInt8 * row_begin = begin;

            /// If required substring is larger than string size - it cannot be found.
            if (required_substring.size() <= n)
            {
                Volnitsky searcher(required_substring.data(), required_substring.size(), end - pos);

                /// We will search for the next occurrence in all rows at once.
                while (pos < end && end != (pos = searcher.search(pos, end - pos)))
                {
                    /// Rows lying entirely before the occurrence cannot match: they lack the required substring.
                    while (row_begin + n <= pos)
                    {
                        res[i] = revert;
                        row_begin += n;
                        ++i;
                    }

                    const UInt8 * const row_end = row_begin + n;

                    /// We check that the entry does not pass through the boundaries of strings.
                    if (pos + required_substring.size() <= row_end)
                    {
                        /// And if it does not, if necessary, we check the regexp.
                        if (is_trivial)
                            res[i] = !revert;
                        else
                        {
                            const char * str_data = reinterpret_cast<const char *>(row_begin);

                            /** Even in the case of `required_substring_is_prefix` use UNANCHORED check for regexp,
                              *  so that it can match when `required_substring` occurs into the string several times,
                              *  and at the first occurrence, the regexp is not a match.
                              */
                            const size_t start_pos = required_substring_is_prefix
                                ? static_cast<size_t>(reinterpret_cast<const char *>(pos) - str_data)
                                : 0;

                            res[i] = revert
                                ^ regexp->getRE2()->Match(
                                      re2_st::StringPiece(str_data, n),
                                      start_pos,
                                      n,
                                      re2_st::RE2::UNANCHORED,
                                      nullptr,
                                      0);
                        }
                    }
                    else
                        res[i] = revert;

                    /// The row is decided; continue searching from the next one.
                    pos = row_end;
                    row_begin = row_end;
                    ++i;
                }
            }

            /// Tail, in which there can be no substring.
            if (i < res.size())
                memset(&res[i], revert, (res.size() - i) * sizeof(res[0]));
        }
    }
}
template <typename... Args>
static void vectorVector(Args &&...)
{

View File

@ -296,6 +296,12 @@ struct PositionImpl
prev_needle_offset = needle_offsets[i];
}
}
/// Fixed-width haystacks are not handled by this implementation: the call is rejected
/// unconditionally, whatever arguments it receives.
template <typename... Args>
static void vectorFixedConstant(Args &&...)
{
    throw Exception(
        "Functions 'position' don't support FixedString haystack argument",
        ErrorCodes::ILLEGAL_COLUMN);
}
};
template <typename Impl>
@ -519,6 +525,12 @@ struct HasTokenImpl
{
throw Exception("Function 'hasToken' does not support non-constant needle argument", ErrorCodes::ILLEGAL_COLUMN);
}
/// Fixed-width haystacks are not handled by this implementation: the call is rejected
/// unconditionally, whatever arguments it receives.
template <typename... Args>
static void vectorFixedConstant(Args &&...)
{
    throw Exception(
        "Functions 'hasToken' don't support FixedString haystack argument",
        ErrorCodes::ILLEGAL_COLUMN);
}
};

View File

@ -3,6 +3,7 @@
#include <Columns/ColumnArray.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnVector.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeString.h>
@ -13,6 +14,7 @@
#include <Interpreters/Context.h>
#include <common/StringRef.h>
namespace DB
{
/** Search and replace functions in strings:
@ -132,6 +134,7 @@ public:
vec_res.resize(column_haystack->size());
const ColumnString * col_haystack_vector = checkAndGetColumn<ColumnString>(&*column_haystack);
const ColumnFixedString * col_haystack_vector_fixed = checkAndGetColumn<ColumnFixedString>(&*column_haystack);
const ColumnString * col_needle_vector = checkAndGetColumn<ColumnString>(&*column_needle);
if (col_haystack_vector && col_needle_vector)
@ -144,6 +147,9 @@ public:
else if (col_haystack_vector && col_needle_const)
Impl::vectorConstant(
col_haystack_vector->getChars(), col_haystack_vector->getOffsets(), col_needle_const->getValue<String>(), vec_res);
else if (col_haystack_vector_fixed && col_needle_const)
Impl::vectorFixedConstant(
col_haystack_vector_fixed->getChars(), col_haystack_vector_fixed->getN(), col_needle_const->getValue<String>(), vec_res);
else if (col_haystack_const && col_needle_vector)
Impl::constantVector(
col_haystack_const->getValue<String>(), col_needle_vector->getChars(), col_needle_vector->getOffsets(), vec_res);

View File

@ -130,6 +130,12 @@ struct ExtractParamImpl
{
throw Exception("Functions 'visitParamHas' and 'visitParamExtract*' doesn't support non-constant needle argument", ErrorCodes::ILLEGAL_COLUMN);
}
/// Fixed-width haystacks are not handled by this implementation: the call is rejected
/// unconditionally, whatever arguments it receives.
/// The message names the same set of functions as the neighbouring "non-constant needle" error,
/// so a caller of any 'visitParamExtract*' function is not told about 'visitParamHas' only.
template <typename... Args>
static void vectorFixedConstant(Args &&...)
{
    throw Exception(
        "Functions 'visitParamHas' and 'visitParamExtract*' don't support FixedString haystack argument",
        ErrorCodes::ILLEGAL_COLUMN);
}
};