Implement countSubstrings()

Function to count number of substring occurrences in the string:
- in case of needle is multi char - counts non-intersecting substrings
- the code is based on position helpers.

The following new functions is available:
- countSubstrings()
- countSubstringsCaseInsensitive()
- countSubstringsCaseInsensitiveUTF8()

v0: substringCount()

v2:
- add substringCountCaseInsensitiveUTF8
- improve tests
- fix coding style issues
- fix multichar needle

v3: rename to countSubstrings (by analogy with countEqual())
This commit is contained in:
Azat Khuzhin 2020-11-26 21:16:07 +03:00
parent 9291bbb04b
commit 838596c7a4
10 changed files with 621 additions and 0 deletions

View File

@ -536,4 +536,58 @@ For case-insensitive search or/and in UTF-8 format use functions `ngramSearchCas
!!! note "Note"
For UTF-8 case we use 3-gram distance. All these are not perfectly fair n-gram distances. We use 2-byte hashes to hash n-grams and then calculate the (non-)symmetric difference between these hash tables collisions may occur. With UTF-8 case-insensitive format we do not use fair `tolower` function we zero the 5-th bit (starting from zero) of each codepoint byte and first bit of zeroth byte if bytes more than one this works for Latin and mostly for all Cyrillic letters.
## countSubstrings(haystack, needle) {#countSubstrings}
Count the number of substring occurrences
For a case-insensitive search, use the function `countSubstringsCaseInsensitive` (or `countSubstringsCaseInsensitiveUTF8`).
**Syntax**
``` sql
countSubstrings(haystack, needle[, start_pos])
```
**Parameters**
- `haystack` — The string to search in. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `needle` — The substring to search for. [String](../../sql-reference/syntax.md#syntax-string-literal).
- `start_pos` Optional parameter, position of the first character in the string to start search. [UInt](../../sql-reference/data-types/int-uint.md)
**Returned values**
- Number of occurrences.
Type: `Integer`.
**Examples**
Query:
``` sql
SELECT countSubstrings('foobar.com', '.')
```
Result:
``` text
┌─countSubstrings('foobar.com', '.')─┐
│ 1 │
└────────────────────────────────────┘
```
Query:
``` sql
SELECT countSubstrings('aaaa', 'aa')
```
Result:
``` text
┌─countSubstrings('aaaa', 'aa')─┐
│ 2 │
└───────────────────────────────┘
```
[Original article](https://clickhouse.tech/docs/en/query_language/functions/string_search_functions/) <!--hide-->

View File

@ -0,0 +1,232 @@
#pragma once
#include "PositionImpl.h"
#include <string>
#include <vector>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
}
/// Implementation of the countSubstrings() using helpers for position()
///
/// NOTE: Intersecting substrings in haystack accounted only once, i.e.:
///
/// countSubstrings('aaaa', 'aa') == 2
template <typename Impl>
struct CountSubstringsImpl
{
static constexpr bool use_default_implementation_for_constants = false;
static constexpr bool supports_start_pos = true;
using ResultType = UInt64;
/// Count occurrences of one substring in many strings.
static void vectorConstant(
const ColumnString::Chars & data,
const ColumnString::Offsets & offsets,
const std::string & needle,
const ColumnPtr & start_pos,
PaddedPODArray<UInt64> & res)
{
const UInt8 * begin = data.data();
const UInt8 * pos = begin;
const UInt8 * end = pos + data.size();
/// FIXME: suboptimal
memset(&res[0], 0, res.size() * sizeof(res[0]));
/// Current index in the array of strings.
size_t i = 0;
typename Impl::SearcherInBigHaystack searcher = Impl::createSearcherInBigHaystack(needle.data(), needle.size(), end - pos);
/// We will search for the next occurrence in all strings at once.
while (pos < end && end != (pos = searcher.search(pos, end - pos)))
{
/// Determine which index it refers to.
while (begin + offsets[i] <= pos)
++i;
auto start = start_pos != nullptr ? start_pos->getUInt(i) : 0;
/// We check that the entry does not pass through the boundaries of strings.
if (pos + needle.size() < begin + offsets[i])
{
auto res_pos = needle.size() + Impl::countChars(reinterpret_cast<const char *>(begin + offsets[i - 1]), reinterpret_cast<const char *>(pos));
if (res_pos >= start)
{
++res[i];
}
/// Intersecting substrings in haystack accounted only once
pos += needle.size();
continue;
}
pos = begin + offsets[i];
++i;
}
}
/// Count number of occurrences of substring in string.
static void constantConstantScalar(
std::string data,
std::string needle,
UInt64 start_pos,
UInt64 & res)
{
res = 0;
if (needle.size() == 0)
return;
auto start = std::max(start_pos, UInt64(1));
size_t start_byte = Impl::advancePos(data.data(), data.data() + data.size(), start - 1) - data.data();
size_t new_start_byte;
while ((new_start_byte = data.find(needle, start_byte)) != std::string::npos)
{
++res;
/// Intersecting substrings in haystack accounted only once
start_byte = new_start_byte + needle.size();
}
}
/// Count number of occurrences of substring in string starting from different positions.
static void constantConstant(
std::string data,
std::string needle,
const ColumnPtr & start_pos,
PaddedPODArray<UInt64> & res)
{
Impl::toLowerIfNeed(data);
Impl::toLowerIfNeed(needle);
if (start_pos == nullptr)
{
constantConstantScalar(data, needle, 0, res[0]);
return;
}
size_t haystack_size = Impl::countChars(data.data(), data.data() + data.size());
size_t size = start_pos != nullptr ? start_pos->size() : 0;
for (size_t i = 0; i < size; ++i)
{
auto start = start_pos->getUInt(i);
if (start > haystack_size + 1)
{
res[i] = 0;
continue;
}
constantConstantScalar(data, needle, start, res[i]);
}
}
/// Count number of occurrences of substring each time for a different inside each time different string.
static void vectorVector(
const ColumnString::Chars & haystack_data,
const ColumnString::Offsets & haystack_offsets,
const ColumnString::Chars & needle_data,
const ColumnString::Offsets & needle_offsets,
const ColumnPtr & start_pos,
PaddedPODArray<UInt64> & res)
{
ColumnString::Offset prev_haystack_offset = 0;
ColumnString::Offset prev_needle_offset = 0;
size_t size = haystack_offsets.size();
for (size_t i = 0; i < size; ++i)
{
size_t needle_size = needle_offsets[i] - prev_needle_offset - 1;
size_t haystack_size = haystack_offsets[i] - prev_haystack_offset - 1;
auto start = start_pos != nullptr ? std::max(start_pos->getUInt(i), UInt64(1)) : UInt64(1);
res[i] = 0;
if (start > haystack_size + 1)
{
/// 0 already
}
else if (0 == needle_size)
{
/// 0 already
}
else
{
/// It is assumed that the StringSearcher is not very difficult to initialize.
typename Impl::SearcherInSmallHaystack searcher = Impl::createSearcherInSmallHaystack(
reinterpret_cast<const char *>(&needle_data[prev_needle_offset]),
needle_offsets[i] - prev_needle_offset - 1); /// zero byte at the end
const UInt8 * end = reinterpret_cast<const UInt8 *>(&haystack_data[haystack_offsets[i] - 1]);
const UInt8 * beg = reinterpret_cast<const UInt8 *>(Impl::advancePos(reinterpret_cast<const char *>(&haystack_data[prev_haystack_offset]), reinterpret_cast<const char *>(end), start - 1));
const UInt8 * pos;
/// searcher returns a pointer to the found substring or to the end of `haystack`.
while ((pos = searcher.search(beg, end)) < end)
{
++res[i];
beg = pos + needle_size;
}
}
prev_haystack_offset = haystack_offsets[i];
prev_needle_offset = needle_offsets[i];
}
}
/// Count number of substrings occurrences in the single string.
static void constantVector(
const String & haystack,
const ColumnString::Chars & needle_data,
const ColumnString::Offsets & needle_offsets,
const ColumnPtr & start_pos,
PaddedPODArray<UInt64> & res)
{
/// NOTE You could use haystack indexing. But this is a rare case.
ColumnString::Offset prev_needle_offset = 0;
size_t size = needle_offsets.size();
for (size_t i = 0; i < size; ++i)
{
res[i] = 0;
auto start = start_pos != nullptr ? std::max(start_pos->getUInt(i), UInt64(1)) : UInt64(1);
if (start <= haystack.size() + 1)
{
const char * needle_beg = reinterpret_cast<const char *>(&needle_data[prev_needle_offset]);
size_t needle_size = needle_offsets[i] - prev_needle_offset - 1;
typename Impl::SearcherInSmallHaystack searcher = Impl::createSearcherInSmallHaystack(needle_beg, needle_size);
const UInt8 * end = reinterpret_cast<const UInt8 *>(haystack.data() + haystack.size());
const UInt8 * beg = reinterpret_cast<const UInt8 *>(Impl::advancePos(haystack.data(), reinterpret_cast<const char *>(end), start - 1));
const UInt8 * pos;
while ((pos = searcher.search(beg, end)) < end)
{
++res[i];
beg = pos + needle_size;
}
}
prev_needle_offset = needle_offsets[i];
}
}
template <typename... Args>
static void vectorFixedConstant(Args &&...)
{
throw Exception("Functions 'position' don't support FixedString haystack argument", ErrorCodes::ILLEGAL_COLUMN);
}
};
}

View File

@ -29,6 +29,9 @@ namespace DB
* multiMatchAnyIndex(haystack, [pattern_1, pattern_2, ..., pattern_n]) -- search by re2 regular expressions pattern_i; Returns index of any match or zero if none;
* multiMatchAllIndices(haystack, [pattern_1, pattern_2, ..., pattern_n]) -- search by re2 regular expressions pattern_i; Returns an array of matched indices in any order;
*
* countSubstrings(haystack, needle) -- count number of occurences of needle in haystack.
* countSubstringsCaseInsensitive(haystack, needle)
*
* Applies regexp re2 and pulls:
* - the first subpattern, if the regexp has a subpattern;
* - the zero subpattern (the match part, otherwise);

View File

@ -0,0 +1,24 @@
#include "FunctionsStringSearch.h"
#include "FunctionFactory.h"
#include "CountSubstringsImpl.h"
namespace DB
{
namespace
{
struct NameCountSubstrings
{
static constexpr auto name = "countSubstrings";
};
using FunctionCountSubstrings = FunctionsStringSearch<CountSubstringsImpl<PositionCaseSensitiveASCII>, NameCountSubstrings>;
}
void registerFunctionCountSubstrings(FunctionFactory & factory)
{
factory.registerFunction<FunctionCountSubstrings>(FunctionFactory::CaseInsensitive);
}
}

View File

@ -0,0 +1,24 @@
#include "FunctionsStringSearch.h"
#include "FunctionFactory.h"
#include "CountSubstringsImpl.h"
namespace DB
{
namespace
{
struct NameCountSubstringsCaseInsensitive
{
static constexpr auto name = "countSubstringsCaseInsensitive";
};
using FunctionCountSubstringsCaseInsensitive = FunctionsStringSearch<CountSubstringsImpl<PositionCaseInsensitiveASCII>, NameCountSubstringsCaseInsensitive>;
}
void registerFunctionCountSubstringsCaseInsensitive(FunctionFactory & factory)
{
factory.registerFunction<FunctionCountSubstringsCaseInsensitive>();
}
}

View File

@ -0,0 +1,24 @@
#include "FunctionsStringSearch.h"
#include "FunctionFactory.h"
#include "CountSubstringsImpl.h"
namespace DB
{
namespace
{
struct NameCountSubstringsCaseInsensitiveUTF8
{
static constexpr auto name = "countSubstringsCaseInsensitiveUTF8";
};
using FunctionCountSubstringsCaseInsensitiveUTF8 = FunctionsStringSearch<CountSubstringsImpl<PositionCaseInsensitiveUTF8>, NameCountSubstringsCaseInsensitiveUTF8>;
}
void registerFunctionCountSubstringsCaseInsensitiveUTF8(FunctionFactory & factory)
{
factory.registerFunction<FunctionCountSubstringsCaseInsensitiveUTF8>();
}
}

View File

@ -31,6 +31,10 @@ void registerFunctionMultiSearchAllPositionsCaseInsensitiveUTF8(FunctionFactory
void registerFunctionHasToken(FunctionFactory &);
void registerFunctionHasTokenCaseInsensitive(FunctionFactory &);
void registerFunctionCountSubstrings(FunctionFactory &);
void registerFunctionCountSubstringsCaseInsensitive(FunctionFactory &);
void registerFunctionCountSubstringsCaseInsensitiveUTF8(FunctionFactory &);
void registerFunctionsStringSearch(FunctionFactory & factory)
{
@ -61,6 +65,10 @@ void registerFunctionsStringSearch(FunctionFactory & factory)
registerFunctionHasToken(factory);
registerFunctionHasTokenCaseInsensitive(factory);
registerFunctionCountSubstrings(factory);
registerFunctionCountSubstringsCaseInsensitive(factory);
registerFunctionCountSubstringsCaseInsensitiveUTF8(factory);
}
}

View File

@ -208,6 +208,9 @@ SRCS(
cos.cpp
cosh.cpp
countDigits.cpp
countSubstrings.cpp
countSubstringsCaseInsensitive.cpp
countSubstringsCaseInsensitiveUTF8.cpp
currentDatabase.cpp
currentUser.cpp
dateDiff.cpp

View File

@ -0,0 +1,111 @@
# countSubstrings
CountSubstringsImpl::constantConstant
CountSubstringsImpl::constantConstantScalar
empty
0
0
0
char
1
2
3
word
1
1
1
2
3
intersect
2
CountSubstringsImpl::vectorVector
1
4
6
"intersect",4
CountSubstringsImpl::constantVector
2
1
0
3
5
"intersect",4
CountSubstringsImpl::vectorConstant
0
1
2
3
4
"intersect",4
# countSubstringsCaseInsensitive
CountSubstringsImpl::constantConstant
CountSubstringsImpl::constantConstantScalar
char
1
2
3
word
1
1
1
2
3
intersect
2
CountSubstringsImpl::vectorVector
1
3
5
CountSubstringsImpl::constantVector
2
1
0
3
5
CountSubstringsImpl::vectorConstant
1
0
0
# countSubstringsCaseInsensitiveUTF8
CountSubstringsImpl::constantConstant
CountSubstringsImpl::constantConstantScalar
char
1
2
3
word
1
1
1
2
3
intersect
2
CountSubstringsImpl::vectorVector
1
3
5
"intersect",4
CountSubstringsImpl::constantVector
2
3
5
"intersect",4
CountSubstringsImpl::vectorConstant
1
0
"intersect",4

View File

@ -0,0 +1,138 @@
--
-- countSubstrings
--
select '';
select '# countSubstrings';
select '';
select 'CountSubstringsImpl::constantConstant';
select 'CountSubstringsImpl::constantConstantScalar';
select 'empty';
select countSubstrings('', '.');
select countSubstrings('', '');
select countSubstrings('.', '');
select 'char';
select countSubstrings('foobar.com', '.');
select countSubstrings('www.foobar.com', '.');
select countSubstrings('.foobar.com.', '.');
select 'word';
select countSubstrings('foobar.com', 'com');
select countSubstrings('com.foobar', 'com');
select countSubstrings('foo.com.bar', 'com');
select countSubstrings('com.foobar.com', 'com');
select countSubstrings('com.foo.com.bar.com', 'com');
select 'intersect';
select countSubstrings('aaaa', 'aa');
select '';
select 'CountSubstringsImpl::vectorVector';
select countSubstrings(toString(number), toString(number)) from numbers(1);
select countSubstrings(concat(toString(number), '000111'), toString(number)) from numbers(1);
select countSubstrings(concat(toString(number), '000111001'), toString(number)) from numbers(1);
select 'intersect', countSubstrings(concat(toString(number), '0000000'), '00') from numbers(1) format CSV;
select '';
select 'CountSubstringsImpl::constantVector';
select countSubstrings('100', toString(number)) from numbers(3);
select countSubstrings('0100', toString(number)) from numbers(1);
select countSubstrings('010000', toString(number)) from numbers(1);
select 'intersect', countSubstrings('00000000', repeat(toString(number), 2)) from numbers(1) format CSV;
select '';
select 'CountSubstringsImpl::vectorConstant';
select countSubstrings(toString(number), '1') from system.numbers limit 3 offset 9;
select countSubstrings(concat(toString(number), '000111'), '1') from numbers(1);
select countSubstrings(concat(toString(number), '000111001'), '1') from numbers(1);
select 'intersect', countSubstrings(repeat(toString(number), 8), '00') from numbers(1) format CSV;
--
-- countSubstringsCaseInsensitive
--
select '';
select '# countSubstringsCaseInsensitive';
select '';
select 'CountSubstringsImpl::constantConstant';
select 'CountSubstringsImpl::constantConstantScalar';
select 'char';
select countSubstringsCaseInsensitive('aba', 'B');
select countSubstringsCaseInsensitive('bab', 'B');
select countSubstringsCaseInsensitive('BaBaB', 'b');
select 'word';
select countSubstringsCaseInsensitive('foobar.com', 'COM');
select countSubstringsCaseInsensitive('com.foobar', 'COM');
select countSubstringsCaseInsensitive('foo.com.bar', 'COM');
select countSubstringsCaseInsensitive('com.foobar.com', 'COM');
select countSubstringsCaseInsensitive('com.foo.com.bar.com', 'COM');
select 'intersect';
select countSubstringsCaseInsensitive('aaaa', 'AA');
select '';
select 'CountSubstringsImpl::vectorVector';
select countSubstringsCaseInsensitive(upper(char(number)), lower(char(number))) from numbers(100) where number = 0x41; -- A
select countSubstringsCaseInsensitive(concat(toString(number), 'aaa111'), char(number)) from numbers(100) where number = 0x41;
select countSubstringsCaseInsensitive(concat(toString(number), 'aaa111aa1'), char(number)) from numbers(100) where number = 0x41;
select '';
select 'CountSubstringsImpl::constantVector';
select countSubstringsCaseInsensitive('aab', char(number)) from numbers(100) where number >= 0x41 and number <= 0x43; -- A..C
select countSubstringsCaseInsensitive('abaa', char(number)) from numbers(100) where number = 0x41;
select countSubstringsCaseInsensitive('abaaaa', char(number)) from numbers(100) where number = 0x41;
select '';
select 'CountSubstringsImpl::vectorConstant';
select countSubstringsCaseInsensitive(char(number), 'a') from numbers(100) where number >= 0x41 and number <= 0x43;
--
-- countSubstringsCaseInsensitiveUTF8
--
select '';
select '# countSubstringsCaseInsensitiveUTF8';
select '';
select 'CountSubstringsImpl::constantConstant';
select 'CountSubstringsImpl::constantConstantScalar';
select 'char';
select countSubstringsCaseInsensitiveUTF8('фуу', 'Ф');
select countSubstringsCaseInsensitiveUTF8('ФуФ', 'ф');
select countSubstringsCaseInsensitiveUTF8('ФуФуФ', 'ф');
select 'word';
select countSubstringsCaseInsensitiveUTF8('подстрока.рф', 'РФ');
select countSubstringsCaseInsensitiveUTF8('рф.подстрока', 'рф');
select countSubstringsCaseInsensitiveUTF8('подстрока.рф.подстрока', 'РФ');
select countSubstringsCaseInsensitiveUTF8('рф.подстрока.рф', 'рф');
select countSubstringsCaseInsensitiveUTF8('рф.подстрока.рф.подстрока.рф', 'РФ');
select 'intersect';
select countSubstringsCaseInsensitiveUTF8('яяяя', 'ЯЯ');
select '';
select 'CountSubstringsImpl::vectorVector';
-- can't use any char, since this will not make valid UTF8
-- for the haystack we use number as-is, for needle we just add dependency from number to go to vectorVector code
select countSubstringsCaseInsensitiveUTF8(upperUTF8(concat(char(number), 'я')), lowerUTF8(concat(substringUTF8(char(number), 2), 'Я'))) from numbers(100) where number = 0x41; -- A
select countSubstringsCaseInsensitiveUTF8(concat(toString(number), 'ЯЯЯ111'), concat(substringUTF8(char(number), 2), 'я')) from numbers(100) where number = 0x41; -- A
select countSubstringsCaseInsensitiveUTF8(concat(toString(number), 'яяя111яя1'), concat(substringUTF8(char(number), 2), 'Я')) from numbers(100) where number = 0x41; -- A
select 'intersect', countSubstringsCaseInsensitiveUTF8(concat(toString(number), 'яяяяяяяя'), concat(substringUTF8(char(number), 2), 'Яя')) from numbers(100) where number = 0x41 format CSV; -- A
select '';
select 'CountSubstringsImpl::constantVector';
select countSubstringsCaseInsensitiveUTF8('ЯЯb', concat(substringUTF8(char(number), 2), 'я')) from numbers(100) where number = 0x41; -- A
select countSubstringsCaseInsensitiveUTF8('ЯbЯЯ', concat(substringUTF8(char(number), 2), 'я')) from numbers(100) where number = 0x41; -- A
select countSubstringsCaseInsensitiveUTF8('ЯbЯЯЯЯ', concat(substringUTF8(char(number), 2), 'я')) from numbers(100) where number = 0x41; -- A
select 'intersect', countSubstringsCaseInsensitiveUTF8('ЯЯЯЯЯЯЯЯ', concat(substringUTF8(char(number), 2), 'Яя')) from numbers(100) where number = 0x41 format CSV; -- A
select '';
select 'CountSubstringsImpl::vectorConstant';
select countSubstringsCaseInsensitiveUTF8(concat(char(number), 'я'), 'Я') from numbers(100) where number = 0x41; -- A
select countSubstringsCaseInsensitiveUTF8(concat(char(number), 'б'), 'Я') from numbers(100) where number = 0x41; -- A
select 'intersect', countSubstringsCaseInsensitiveUTF8(concat(char(number), repeat('я', 8)), 'яЯ') from numbers(100) where number = 0x41 format CSV; -- A