diff --git a/dbms/include/DB/Storages/MergeTree/PKCondition.h b/dbms/include/DB/Storages/MergeTree/PKCondition.h index 2dde667a681..d34bd8c0036 100644 --- a/dbms/include/DB/Storages/MergeTree/PKCondition.h +++ b/dbms/include/DB/Storages/MergeTree/PKCondition.h @@ -219,7 +219,7 @@ class PKCondition public: /// Словарь, содержащий действия к соответствующим функциям по превращению их в RPNElement - using AtomMap = std::unordered_map; + using AtomMap = std::unordered_map; static const AtomMap atom_map; /// Не учитывает секцию SAMPLE. all_columns - набор всех столбцов таблицы. diff --git a/dbms/src/Storages/MergeTree/PKCondition.cpp b/dbms/src/Storages/MergeTree/PKCondition.cpp index 190f1c5a5d4..bdf02736524 100644 --- a/dbms/src/Storages/MergeTree/PKCondition.cpp +++ b/dbms/src/Storages/MergeTree/PKCondition.cpp @@ -12,6 +12,63 @@ namespace DB { +/// Пример: для строки Hello\_World%... возвращает Hello_World, а для строки %test% возвращает пустую строку. +static String extractFixedPrefixFromLikePattern(const String & like_pattern) +{ + String fixed_prefix; + + const char * pos = like_pattern.data(); + const char * end = pos + like_pattern.size(); + while (pos < end) + { + switch (*pos) + { + case '%': + case '_': + return fixed_prefix; + + case '\\': + ++pos; + if (pos == end) + break; + default: + fixed_prefix += *pos; + break; + } + + ++pos; + } + + return fixed_prefix; +} + + +/** Для заданной строки получить минимальную строку, которая строго больше всех строк с таким префиксом, + * или вернуть пустую строку, если таких строк не существует. + */ +static String firstStringThatIsGreaterThanAllStringsWithPrefix(const String & prefix) +{ + /** Увеличиваем последний байт префикса на единицу. Но если он равен 255, то убираем его и увеличиваем предыдущий. + * Пример (для удобства, представим, что максимальное значение байта равно z): + * abcx -> abcy + * abcz -> abd + * zzz -> пустая строка + * z -> пустая строка + */ + + String res = prefix; + + while (!res.empty() && static_cast(res.back()) == 255) + res.pop_back(); + + if (res.empty()) + return res; + + res.back() = static_cast(1 + static_cast(res.back())); + return res; +} + + const PKCondition::AtomMap PKCondition::atom_map{ { "notEquals", @@ -19,6 +76,7 @@ const PKCondition::AtomMap PKCondition::atom_map{ { out.function = RPNElement::FUNCTION_NOT_IN_RANGE; out.range = Range(value); + return true; } }, { @@ -27,6 +85,7 @@ const PKCondition::AtomMap PKCondition::atom_map{ { out.function = RPNElement::FUNCTION_IN_RANGE; out.range = Range(value); + return true; } }, { @@ -35,6 +94,7 @@ const PKCondition::AtomMap PKCondition::atom_map{ { out.function = RPNElement::FUNCTION_IN_RANGE; out.range = Range::createRightBounded(value, false); + return true; } }, { @@ -43,6 +103,7 @@ const PKCondition::AtomMap PKCondition::atom_map{ { out.function = RPNElement::FUNCTION_IN_RANGE; out.range = Range::createLeftBounded(value, false); + return true; } }, { @@ -51,6 +112,7 @@ const PKCondition::AtomMap PKCondition::atom_map{ { out.function = RPNElement::FUNCTION_IN_RANGE; out.range = Range::createRightBounded(value, true); + return true; } }, { @@ -59,6 +121,7 @@ const PKCondition::AtomMap PKCondition::atom_map{ { out.function = RPNElement::FUNCTION_IN_RANGE; out.range = Range::createLeftBounded(value, true); + return true; } }, { @@ -67,6 +130,7 @@ const PKCondition::AtomMap PKCondition::atom_map{ { out.function = RPNElement::FUNCTION_IN_SET; out.in_function = node; + return true; } }, { @@ -75,6 +139,28 @@ const PKCondition::AtomMap PKCondition::atom_map{ { out.function = RPNElement::FUNCTION_NOT_IN_SET; out.in_function = node; + return true; + } + }, + { + "like", + [] (RPNElement & out, const Field & value, ASTPtr & node) + { + if (value.getType() != Field::Types::String) + return false; + + String prefix = extractFixedPrefixFromLikePattern(value.get()); + if (prefix.empty()) + return false; + + String right_bound = firstStringThatIsGreaterThanAllStringsWithPrefix(prefix); + + out.function = RPNElement::FUNCTION_IN_RANGE; + out.range = !right_bound.empty() + ? Range(prefix, true, right_bound, false) + : Range::createLeftBounded(prefix, true); + + return true; } } }; @@ -281,15 +367,18 @@ bool PKCondition::atomFromAST(ASTPtr & node, const Context & context, Block & bl size_t column; RPNElement::MonotonicFunctionsChain chain; - if (getConstant(args[1], block_with_constants, value) && isPrimaryKeyPossiblyWrappedByMonotonicFunctions(args[0], context, column, chain)) + if (getConstant(args[1], block_with_constants, value) + && isPrimaryKeyPossiblyWrappedByMonotonicFunctions(args[0], context, column, chain)) { inverted = false; } - else if (getConstant(args[0], block_with_constants, value) && isPrimaryKeyPossiblyWrappedByMonotonicFunctions(args[1], context, column, chain)) + else if (getConstant(args[0], block_with_constants, value) + && isPrimaryKeyPossiblyWrappedByMonotonicFunctions(args[1], context, column, chain)) { inverted = true; } - else if (typeid_cast(args[1].get()) && isPrimaryKeyPossiblyWrappedByMonotonicFunctions(args[0], context, column, chain)) + else if (typeid_cast(args[1].get()) + && isPrimaryKeyPossiblyWrappedByMonotonicFunctions(args[0], context, column, chain)) { inverted = false; } @@ -309,7 +398,7 @@ bool PKCondition::atomFromAST(ASTPtr & node, const Context & context, Block & bl func_name = "lessOrEquals"; else if (func_name == "lessOrEquals") func_name = "greaterOrEquals"; - else if (func_name == "in" || func_name == "notIn") + else if (func_name == "in" || func_name == "notIn" || func_name == "like") { /// const IN x не имеет смысла (в отличие от x IN const). return false; @@ -323,9 +412,7 @@ bool PKCondition::atomFromAST(ASTPtr & node, const Context & context, Block & bl if (atom_it == std::end(atom_map)) return false; - atom_it->second(out, value, node); - - return true; + return atom_it->second(out, value, node); } else if (getConstant(node, block_with_constants, value)) /// Для случаев, когда написано, например, WHERE 0 AND something {