Merge branch 'master' of https://github.com/ClickHouse/ClickHouse into taiyang-li-disk-reload

kssenii 2021-08-02 15:16:09 +00:00
commit 087ddeb0dc
84 changed files with 2337 additions and 477 deletions

.gitmodules vendored

@ -225,6 +225,15 @@
[submodule "contrib/yaml-cpp"]
path = contrib/yaml-cpp
url = https://github.com/ClickHouse-Extras/yaml-cpp.git
[submodule "contrib/libstemmer_c"]
path = contrib/libstemmer_c
url = https://github.com/ClickHouse-Extras/libstemmer_c.git
[submodule "contrib/wordnet-blast"]
path = contrib/wordnet-blast
url = https://github.com/ClickHouse-Extras/wordnet-blast.git
[submodule "contrib/lemmagen-c"]
path = contrib/lemmagen-c
url = https://github.com/ClickHouse-Extras/lemmagen-c.git
[submodule "contrib/libpqxx"]
path = contrib/libpqxx
url = https://github.com/ClickHouse-Extras/libpqxx.git


@ -542,6 +542,7 @@ include (cmake/find/libpqxx.cmake)
include (cmake/find/nuraft.cmake)
include (cmake/find/yaml-cpp.cmake)
include (cmake/find/s2geometry.cmake)
include (cmake/find/nlp.cmake)
if(NOT USE_INTERNAL_PARQUET_LIBRARY)
set (ENABLE_ORC OFF CACHE INTERNAL "")

cmake/find/nlp.cmake Normal file

@ -0,0 +1,32 @@
option(ENABLE_NLP "Enable NLP functions support" ${ENABLE_LIBRARIES})
if (NOT ENABLE_NLP)
message (STATUS "NLP functions disabled")
return()
endif()
if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/libstemmer_c/Makefile")
message (WARNING "submodule contrib/libstemmer_c is missing. to fix try run: \n git submodule update --init --recursive")
message (${RECONFIGURE_MESSAGE_LEVEL} "Can't find internal libstemmer_c library, NLP functions will be disabled")
set (USE_NLP 0)
return()
endif ()
if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/wordnet-blast/CMakeLists.txt")
message (WARNING "submodule contrib/wordnet-blast is missing. to fix try run: \n git submodule update --init --recursive")
message (${RECONFIGURE_MESSAGE_LEVEL} "Can't find internal wordnet-blast library, NLP functions will be disabled")
set (USE_NLP 0)
return()
endif ()
if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/lemmagen-c/README.md")
message (WARNING "submodule contrib/lemmagen-c is missing. to fix try run: \n git submodule update --init --recursive")
message (${RECONFIGURE_MESSAGE_LEVEL} "Can't find internal lemmagen-c library, NLP functions will be disabled")
set (USE_NLP 0)
return()
endif ()
set (USE_NLP 1)
message (STATUS "Using Libraries for NLP functions: contrib/wordnet-blast, contrib/libstemmer_c, contrib/lemmagen-c")


@ -328,6 +328,12 @@ endif()
add_subdirectory(fast_float)
if (USE_NLP)
add_subdirectory(libstemmer-c-cmake)
add_subdirectory(wordnet-blast-cmake)
add_subdirectory(lemmagen-c-cmake)
endif()
if (USE_SQLITE)
add_subdirectory(sqlite-cmake)
endif()

contrib/boost vendored

@ -1 +1 @@
Subproject commit 1ccbb5a522a571ce83b606dbc2e1011c42ecccfb
Subproject commit 9cf09dbfd55a5c6202dedbdf40781a51b02c2675


@ -13,11 +13,12 @@ if (NOT USE_INTERNAL_BOOST_LIBRARY)
regex
context
coroutine
graph
)
if(Boost_INCLUDE_DIR AND Boost_FILESYSTEM_LIBRARY AND Boost_IOSTREAMS_LIBRARY AND
Boost_PROGRAM_OPTIONS_LIBRARY AND Boost_REGEX_LIBRARY AND Boost_SYSTEM_LIBRARY AND Boost_CONTEXT_LIBRARY AND
Boost_COROUTINE_LIBRARY)
Boost_COROUTINE_LIBRARY AND Boost_GRAPH_LIBRARY)
set(EXTERNAL_BOOST_FOUND 1)
@ -32,6 +33,7 @@ if (NOT USE_INTERNAL_BOOST_LIBRARY)
add_library (_boost_system INTERFACE)
add_library (_boost_context INTERFACE)
add_library (_boost_coroutine INTERFACE)
add_library (_boost_graph INTERFACE)
target_link_libraries (_boost_filesystem INTERFACE ${Boost_FILESYSTEM_LIBRARY})
target_link_libraries (_boost_iostreams INTERFACE ${Boost_IOSTREAMS_LIBRARY})
@ -40,6 +42,7 @@ if (NOT USE_INTERNAL_BOOST_LIBRARY)
target_link_libraries (_boost_system INTERFACE ${Boost_SYSTEM_LIBRARY})
target_link_libraries (_boost_context INTERFACE ${Boost_CONTEXT_LIBRARY})
target_link_libraries (_boost_coroutine INTERFACE ${Boost_COROUTINE_LIBRARY})
target_link_libraries (_boost_graph INTERFACE ${Boost_GRAPH_LIBRARY})
add_library (boost::filesystem ALIAS _boost_filesystem)
add_library (boost::iostreams ALIAS _boost_iostreams)
@ -48,6 +51,7 @@ if (NOT USE_INTERNAL_BOOST_LIBRARY)
add_library (boost::system ALIAS _boost_system)
add_library (boost::context ALIAS _boost_context)
add_library (boost::coroutine ALIAS _boost_coroutine)
add_library (boost::graph ALIAS _boost_graph)
else()
set(EXTERNAL_BOOST_FOUND 0)
message (${RECONFIGURE_MESSAGE_LEVEL} "Can't find system boost")
@ -221,4 +225,17 @@ if (NOT EXTERNAL_BOOST_FOUND)
add_library (boost::coroutine ALIAS _boost_coroutine)
target_include_directories (_boost_coroutine PRIVATE ${LIBRARY_DIR})
target_link_libraries(_boost_coroutine PRIVATE _boost_context)
# graph
set (SRCS_GRAPH
"${LIBRARY_DIR}/libs/graph/src/graphml.cpp"
"${LIBRARY_DIR}/libs/graph/src/read_graphviz_new.cpp"
)
add_library (_boost_graph ${SRCS_GRAPH})
add_library (boost::graph ALIAS _boost_graph)
target_include_directories (_boost_graph PRIVATE ${LIBRARY_DIR})
target_link_libraries(_boost_graph PRIVATE _boost_regex)
endif ()

contrib/lemmagen-c vendored Submodule

@ -0,0 +1 @@
Subproject commit 59537bdcf57bbed17913292cb4502d15657231f1


@ -0,0 +1,9 @@
set(LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/lemmagen-c")
set(LEMMAGEN_INCLUDE_DIR "${LIBRARY_DIR}/include")
set(SRCS
"${LIBRARY_DIR}/src/RdrLemmatizer.cpp"
)
add_library(lemmagen STATIC ${SRCS})
target_include_directories(lemmagen PUBLIC "${LEMMAGEN_INCLUDE_DIR}")


@ -0,0 +1,31 @@
set(LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/libstemmer_c")
set(STEMMER_INCLUDE_DIR "${LIBRARY_DIR}/include")
FILE ( READ "${LIBRARY_DIR}/mkinc.mak" _CONTENT )
# join backslash-continued lines into one long line, prefixing each file with LIBRARY_DIR
STRING ( REGEX REPLACE "\\\\\n " " ${LIBRARY_DIR}/" _CONTENT "${_CONTENT}" )
# escape ';' (if any)
STRING ( REGEX REPLACE ";" "\\\\;" _CONTENT "${_CONTENT}" )
# now replace each newline with ';' (turning the content into a CMake list)
STRING ( REGEX REPLACE "\n" ";" _CONTENT "${_CONTENT}" )
FOREACH ( LINE ${_CONTENT} )
# skip comments (beginning with #)
IF ( NOT "${LINE}" MATCHES "^#.*" )
# parse 'name=value1 value2...': extract the 'name' part
STRING ( REGEX REPLACE "=.*$" "" _NAME "${LINE}" )
# extract the list of values part
STRING ( REGEX REPLACE "^.*=" "" _LIST "${LINE}" )
# replace runs of spaces with ';' (turning the values into a CMake list)
STRING ( REGEX REPLACE " +" ";" _LIST "${_LIST}" )
# finally get our two variables
IF ( "${_NAME}" MATCHES "snowball_sources" )
SET ( _SOURCES "${_LIST}" )
ELSEIF ( "${_NAME}" MATCHES "snowball_headers" )
SET ( _HEADERS "${_LIST}" )
ENDIF ()
ENDIF ()
ENDFOREACH ()
# all the sources parsed. Now just add the lib
add_library ( stemmer STATIC ${_SOURCES} ${_HEADERS} )
target_include_directories (stemmer PUBLIC "${STEMMER_INCLUDE_DIR}")

contrib/libstemmer_c vendored Submodule

@ -0,0 +1 @@
Subproject commit c753054304d87daf460057c1a649c482aa094835

contrib/wordnet-blast vendored Submodule

@ -0,0 +1 @@
Subproject commit 1d16ac28036e19fe8da7ba72c16a307fbdf8c87e


@ -0,0 +1,13 @@
set(LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/wordnet-blast")
set(SRCS
"${LIBRARY_DIR}/wnb/core/info_helper.cc"
"${LIBRARY_DIR}/wnb/core/load_wordnet.cc"
"${LIBRARY_DIR}/wnb/core/wordnet.cc"
)
add_library(wnb ${SRCS})
target_link_libraries(wnb PRIVATE boost::headers_only boost::graph)
target_include_directories(wnb PUBLIC "${LIBRARY_DIR}")


@ -23,6 +23,7 @@ RUN apt-get update \
libboost-regex-dev \
libboost-context-dev \
libboost-coroutine-dev \
libboost-graph-dev \
zlib1g-dev \
liblz4-dev \
libdouble-conversion-dev \


@ -311,6 +311,7 @@ function run_tests
01411_bayesian_ab_testing
01798_uniq_theta_sketch
01799_long_uniq_theta_sketch
01890_stem # depends on libstemmer_c
collate
collation
_orc_


@ -115,6 +115,7 @@ toc_title: Adopters
| <a href="http://english.sina.com/index.html" class="favicon">Sina</a> | News | — | — | — | [Slides in Chinese, October 2018](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup19/6.%20ClickHouse最佳实践%20高鹏_新浪.pdf) |
| <a href="https://smi2.ru/" class="favicon">SMI2</a> | News | Analytics | — | — | [Blog Post in Russian, November 2017](https://habr.com/ru/company/smi2/blog/314558/) |
| <a href="https://www.spark.co.nz/" class="favicon">Spark New Zealand</a> | Telecommunications | Security Operations | — | — | [Blog Post, Feb 2020](https://blog.n0p.me/2020/02/2020-02-05-dnsmonster/) |
| <a href="https://splitbee.io" class="favicon">Splitbee</a> | Analytics | Main Product | — | — | [Blog Post, Mai 2021](https://splitbee.io/blog/new-pricing) |
| <a href="https://www.splunk.com/" class="favicon">Splunk</a> | Business Analytics | Main product | — | — | [Slides in English, January 2018](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup12/splunk.pdf) |
| <a href="https://www.spotify.com" class="favicon">Spotify</a> | Music | Experimentation | — | — | [Slides, July 2018](https://www.slideshare.net/glebus/using-clickhouse-for-experimentation-104247173) |
| <a href="https://www.staffcop.ru/" class="favicon">Staffcop</a> | Information Security | Main Product | — | — | [Official website, Documentation](https://www.staffcop.ru/sce43) |


@ -0,0 +1,125 @@
---
toc_priority: 67
toc_title: NLP
---
# Natural Language Processing functions {#nlp-functions}
## stem {#stem}
Performs stemming on a previously tokenized text.
**Syntax**
``` sql
stem('language', word)
```
**Arguments**
- `language` — Language whose rules will be applied. Must be in lowercase. [String](../../sql-reference/data-types/string.md#string).
- `word` — Word that needs to be stemmed. Must be in lowercase. [String](../../sql-reference/data-types/string.md#string).
**Examples**
Query:
``` sql
SELECT arrayMap(x -> stem('en', x), ['I', 'think', 'it', 'is', 'a', 'blessing', 'in', 'disguise']) as res;
```
Result:
``` text
┌─res────────────────────────────────────────────────┐
│ ['I','think','it','is','a','bless','in','disguis'] │
└────────────────────────────────────────────────────┘
```
## lemmatize {#lemmatize}
Performs lemmatization on a given word.
**Syntax**
``` sql
lemmatize('language', word)
```
**Arguments**
- `language` — Language whose rules will be applied. [String](../../sql-reference/data-types/string.md#string).
- `word` — Word that needs to be lemmatized. Must be in lowercase. [String](../../sql-reference/data-types/string.md#string).
**Examples**
Query:
``` sql
SELECT lemmatize('en', 'wolves');
```
Result:
``` text
┌─lemmatize('en', 'wolves')─┐
│ "wolf"                    │
└───────────────────────────┘
```
Configuration:
``` xml
<lemmatizers>
<lemmatizer>
<lang>en</lang>
<path>en.bin</path>
</lemmatizer>
</lemmatizers>
```
## synonyms {#synonyms}
Finds synonyms for a given word.
**Syntax**
``` sql
synonyms('extension_name', word)
```
**Arguments**
- `extension_name` — Name of the extension in which the search will be performed. [String](../../sql-reference/data-types/string.md#string).
- `word` — Word that will be searched for in the extension. [String](../../sql-reference/data-types/string.md#string).
**Examples**
Query:
``` sql
SELECT synonyms('list', 'important');
```
Result:
``` text
┌─synonyms('list', 'important')────────────┐
│ ['important','big','critical','crucial'] │
└──────────────────────────────────────────┘
```
Configuration:
``` xml
<synonyms_extensions>
<extension>
<name>en</name>
<type>plain</type>
<path>en.txt</path>
</extension>
<extension>
<name>en</name>
<type>wordnet</type>
<path>en/</path>
</extension>
</synonyms_extensions>
```
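All of the functions above ship behind the experimental setting added in this commit, `allow_experimental_nlp_functions`, so a session would enable it before calling them. A minimal sketch:

``` sql
SET allow_experimental_nlp_functions = 1;

SELECT lemmatize('en', 'wolves');
```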


@ -145,6 +145,72 @@ Result:
└────────────────────────────┘
```
## splitByWhitespace(s) {#splitbywhitespaceseparator-s}
Splits a string into substrings separated by whitespace characters.
Returns an array of selected substrings.
**Syntax**
``` sql
splitByWhitespace(s)
```
**Arguments**
- `s` — The string to split. [String](../../sql-reference/data-types/string.md).
**Returned value(s)**
Returns an array of selected substrings.
Type: [Array](../../sql-reference/data-types/array.md)([String](../../sql-reference/data-types/string.md)).
**Example**
``` sql
SELECT splitByWhitespace(' 1! a, b. ');
```
``` text
┌─splitByWhitespace(' 1! a, b. ')─┐
│ ['1!','a,','b.']                │
└─────────────────────────────────┘
```
## splitByNonAlpha(s) {#splitbynonalphaseparator-s}
Splits a string into substrings separated by whitespace and punctuation characters.
Returns an array of selected substrings.
**Syntax**
``` sql
splitByNonAlpha(s)
```
**Arguments**
- `s` — The string to split. [String](../../sql-reference/data-types/string.md).
**Returned value(s)**
Returns an array of selected substrings.
Type: [Array](../../sql-reference/data-types/array.md)([String](../../sql-reference/data-types/string.md)).
**Example**
``` sql
SELECT splitByNonAlpha(' 1! a, b. ');
```
``` text
┌─splitByNonAlpha(' 1! a, b. ')─┐
│ ['1','a','b']                 │
└───────────────────────────────┘
```
## arrayStringConcat(arr\[, separator\]) {#arraystringconcatarr-separator}
Concatenates the strings listed in the array with the separator. `separator` is an optional parameter: a constant string, set to an empty string by default.
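For example, a minimal illustration:

``` sql
SELECT arrayStringConcat(['a', 'b', 'c'], ';');
```

returns `'a;b;c'`; with the separator omitted it returns `'abc'`.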


@ -0,0 +1,125 @@
---
toc_priority: 67
toc_title: NLP
---
# Natural Language Processing functions {#nlp-functions}
## stem {#stem}
Performs stemming on a given word.
**Syntax**
``` sql
stem('language', word)
```
**Arguments**
- `language` — Language whose rules will be applied for stemming. Must be in lowercase. [String](../../sql-reference/data-types/string.md#string).
- `word` — Word to be stemmed. Must be in lowercase. [String](../../sql-reference/data-types/string.md#string).
**Examples**
Query:
``` sql
SELECT arrayMap(x -> stem('en', x), ['I', 'think', 'it', 'is', 'a', 'blessing', 'in', 'disguise']) as res;
```
Result:
``` text
┌─res────────────────────────────────────────────────┐
│ ['I','think','it','is','a','bless','in','disguis'] │
└────────────────────────────────────────────────────┘
```
## lemmatize {#lemmatize}
Performs lemmatization on a given word.
**Syntax**
``` sql
lemmatize('language', word)
```
**Arguments**
- `language` — Language whose rules will be applied for lemmatization. [String](../../sql-reference/data-types/string.md#string).
- `word` — Word to be lemmatized. Must be in lowercase. [String](../../sql-reference/data-types/string.md#string).
**Examples**
Query:
``` sql
SELECT lemmatize('en', 'wolves');
```
Result:
``` text
┌─lemmatize('en', 'wolves')─┐
│ "wolf"                    │
└───────────────────────────┘
```
Configuration:
``` xml
<lemmatizers>
<lemmatizer>
<lang>en</lang>
<path>en.bin</path>
</lemmatizer>
</lemmatizers>
```
## synonyms {#synonyms}
Finds synonyms for a given word.
**Syntax**
``` sql
synonyms('extension_name', word)
```
**Arguments**
- `extension_name` — Name of the extension in which the search will be performed. [String](../../sql-reference/data-types/string.md#string).
- `word` — Word that will be searched for in the extension. [String](../../sql-reference/data-types/string.md#string).
**Examples**
Query:
``` sql
SELECT synonyms('list', 'important');
```
Result:
``` text
┌─synonyms('list', 'important')────────────┐
│ ['important','big','critical','crucial'] │
└──────────────────────────────────────────┘
```
Configuration:
``` xml
<synonyms_extensions>
<extension>
<name>en</name>
<type>plain</type>
<path>en.txt</path>
</extension>
<extension>
<name>en</name>
<type>wordnet</type>
<path>en/</path>
</extension>
</synonyms_extensions>
```


@ -146,6 +146,70 @@ SELECT splitByRegexp('', 'abcde');
└────────────────────────────┘
```
## splitByWhitespace(s) {#splitbywhitespaceseparator-s}
Splits a string into substrings separated by whitespace characters.
**Syntax**
``` sql
splitByWhitespace(s)
```
**Arguments**
- `s` — The string to split. [String](../../sql-reference/data-types/string.md).
**Returned value(s)**
Returns an array of selected substrings.
Type: [Array](../../sql-reference/data-types/array.md)([String](../../sql-reference/data-types/string.md)).
**Example**
``` sql
SELECT splitByWhitespace(' 1! a, b. ');
```
``` text
┌─splitByWhitespace(' 1! a, b. ')─┐
│ ['1!','a,','b.']                │
└─────────────────────────────────┘
```
## splitByNonAlpha(s) {#splitbynonalphaseparator-s}
Splits a string into substrings separated by whitespace and punctuation characters.
**Syntax**
``` sql
splitByNonAlpha(s)
```
**Arguments**
- `s` — The string to split. [String](../../sql-reference/data-types/string.md).
**Returned value(s)**
Returns an array of selected substrings.
Type: [Array](../../sql-reference/data-types/array.md)([String](../../sql-reference/data-types/string.md)).
**Example**
``` sql
SELECT splitByNonAlpha(' 1! a, b. ');
```
``` text
┌─splitByNonAlpha(' 1! a, b. ')─┐
│ ['1','a','b']                 │
└───────────────────────────────┘
```
## arrayStringConcat(arr\[, separator\]) {#arraystringconcatarr-separator}
Concatenates the strings listed in the array with the separator.


@ -473,6 +473,12 @@ endif ()
dbms_target_link_libraries(PRIVATE _boost_context)
if (USE_NLP)
dbms_target_link_libraries (PUBLIC stemmer)
dbms_target_link_libraries (PUBLIC wnb)
dbms_target_link_libraries (PUBLIC lemmagen)
endif()
include ("${ClickHouse_SOURCE_DIR}/cmake/add_check.cmake")
if (ENABLE_TESTS AND USE_GTEST)


@ -580,6 +580,12 @@ void Connection::sendPreparedData(ReadBuffer & input, size_t size, const String
void Connection::sendScalarsData(Scalars & data)
{
/// Avoid sending scalars to old servers. Note that this isn't a full fix. We didn't introduce a
/// dedicated revision after introducing scalars, so this will still break some versions with
/// revision 54428.
if (server_revision < DBMS_MIN_REVISION_WITH_SCALARS)
return;
if (data.empty())
return;


@ -71,6 +71,7 @@
/// Minimum revision supporting SettingsBinaryFormat::STRINGS.
#define DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS 54429
#define DBMS_MIN_REVISION_WITH_SCALARS 54429
/// Minimum revision supporting OpenTelemetry
#define DBMS_MIN_REVISION_WITH_OPENTELEMETRY 54442


@ -490,6 +490,7 @@ class IColumn;
\
/** Experimental functions */ \
M(Bool, allow_experimental_funnel_functions, false, "Enable experimental functions for funnel analysis.", 0) \
M(Bool, allow_experimental_nlp_functions, false, "Enable experimental functions for natural language processing.", 0) \
\
\
/** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \

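Since the new NLP functions are declared experimental, they sit behind this gate. A quick session-level check and opt-in (a sketch; nothing here is assumed beyond the setting name added above and the standard system.settings table):

``` sql
SELECT value FROM system.settings WHERE name = 'allow_experimental_nlp_functions';

SET allow_experimental_nlp_functions = 1;
```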

@ -15,4 +15,5 @@
#cmakedefine01 USE_LIBPQXX
#cmakedefine01 USE_SQLITE
#cmakedefine01 USE_NURAFT
#cmakedefine01 USE_NLP
#cmakedefine01 USE_KRB5


@ -17,27 +17,26 @@ namespace ErrorCodes
}
CacheDictionaryStorageConfiguration parseCacheStorageConfiguration(
const String & full_name,
const Poco::Util::AbstractConfiguration & config,
const String & layout_prefix,
const DictionaryLifetime & dict_lifetime,
DictionaryKeyType dictionary_key_type)
const String & full_name,
const String & layout_type,
const String & dictionary_layout_prefix,
const DictionaryLifetime & dict_lifetime)
{
String dictionary_type_prefix = (dictionary_key_type == DictionaryKeyType::complex) ? ".complex_key_cache." : ".cache.";
String dictionary_configuration_prefix = layout_prefix + dictionary_type_prefix;
const size_t size = config.getUInt64(dictionary_configuration_prefix + "size_in_cells");
size_t size = config.getUInt64(dictionary_layout_prefix + ".size_in_cells");
if (size == 0)
throw Exception(ErrorCodes::TOO_SMALL_BUFFER_SIZE,
"{}: cache dictionary cannot have 0 cells",
full_name);
throw Exception(ErrorCodes::TOO_SMALL_BUFFER_SIZE, "{}: dictionary of layout '{}' setting 'size_in_cells' must be greater than 0", full_name, layout_type);
size_t dict_lifetime_seconds = static_cast<size_t>(dict_lifetime.max_sec);
const size_t strict_max_lifetime_seconds = config.getUInt64(dictionary_configuration_prefix + "strict_max_lifetime_seconds", dict_lifetime_seconds);
size_t strict_max_lifetime_seconds = config.getUInt64(dictionary_layout_prefix + ".strict_max_lifetime_seconds", dict_lifetime_seconds);
size_t rounded_size = roundUpToPowerOfTwoOrZero(size);
CacheDictionaryStorageConfiguration storage_configuration{rounded_size, strict_max_lifetime_seconds, dict_lifetime};
CacheDictionaryStorageConfiguration storage_configuration
{
.max_size_in_cells = rounded_size,
.strict_max_lifetime_seconds = strict_max_lifetime_seconds,
.lifetime = dict_lifetime
};
return storage_configuration;
}
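For context, the `size_in_cells` and `strict_max_lifetime_seconds` values parsed here come from the dictionary's layout definition. A hedged DDL sketch (dictionary and source names are illustrative, not part of this commit):

``` sql
CREATE DICTIONARY hypothetical_cache_dict
(
    id UInt64,
    value String
)
PRIMARY KEY id
SOURCE(CLICKHOUSE(TABLE 'source_table'))
LIFETIME(MIN 0 MAX 300)
LAYOUT(CACHE(SIZE_IN_CELLS 1048576));
```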
@ -45,17 +44,13 @@ CacheDictionaryStorageConfiguration parseCacheStorageConfiguration(
#if defined(OS_LINUX) || defined(__FreeBSD__)
SSDCacheDictionaryStorageConfiguration parseSSDCacheStorageConfiguration(
const String & full_name,
const Poco::Util::AbstractConfiguration & config,
const String & layout_prefix,
const DictionaryLifetime & dict_lifetime,
DictionaryKeyType dictionary_key_type)
const String & full_name,
const String & layout_type,
const String & dictionary_layout_prefix,
const DictionaryLifetime & dict_lifetime)
{
String dictionary_type_prefix = dictionary_key_type == DictionaryKeyType::complex ? ".complex_key_ssd_cache." : ".ssd_cache.";
String dictionary_configuration_prefix = layout_prefix + dictionary_type_prefix;
const size_t strict_max_lifetime_seconds
= config.getUInt64(dictionary_configuration_prefix + "strict_max_lifetime_seconds", static_cast<size_t>(dict_lifetime.max_sec));
size_t strict_max_lifetime_seconds = config.getUInt64(dictionary_layout_prefix + ".strict_max_lifetime_seconds", static_cast<size_t>(dict_lifetime.max_sec));
static constexpr size_t DEFAULT_SSD_BLOCK_SIZE_BYTES = DEFAULT_AIO_FILE_BLOCK_SIZE;
static constexpr size_t DEFAULT_FILE_SIZE_BYTES = 4 * 1024 * 1024 * 1024ULL;
@ -64,44 +59,48 @@ SSDCacheDictionaryStorageConfiguration parseSSDCacheStorageConfiguration(
static constexpr size_t DEFAULT_PARTITIONS_COUNT = 16;
const size_t max_partitions_count
= config.getInt64(dictionary_configuration_prefix + "ssd_cache.max_partitions_count", DEFAULT_PARTITIONS_COUNT);
size_t max_partitions_count = config.getInt64(dictionary_layout_prefix + ".max_partitions_count", DEFAULT_PARTITIONS_COUNT);
const size_t block_size = config.getInt64(dictionary_configuration_prefix + "block_size", DEFAULT_SSD_BLOCK_SIZE_BYTES);
const size_t file_size = config.getInt64(dictionary_configuration_prefix + "file_size", DEFAULT_FILE_SIZE_BYTES);
size_t block_size = config.getInt64(dictionary_layout_prefix + ".block_size", DEFAULT_SSD_BLOCK_SIZE_BYTES);
size_t file_size = config.getInt64(dictionary_layout_prefix + ".file_size", DEFAULT_FILE_SIZE_BYTES);
if (file_size % block_size != 0)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"{}: file_size must be a multiple of block_size",
full_name);
"{}: dictionary of layout '{}' setting 'file_size' must be a multiple of block_size",
full_name,
layout_type);
const size_t read_buffer_size = config.getInt64(dictionary_configuration_prefix + "read_buffer_size", DEFAULT_READ_BUFFER_SIZE_BYTES);
size_t read_buffer_size = config.getInt64(dictionary_layout_prefix + ".read_buffer_size", DEFAULT_READ_BUFFER_SIZE_BYTES);
if (read_buffer_size % block_size != 0)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"{}: read_buffer_size must be a multiple of block_size",
full_name);
"{}: dictionary of layout '{}' setting 'read_buffer_size' must be a multiple of block_size",
full_name,
layout_type);
const size_t write_buffer_size
= config.getInt64(dictionary_configuration_prefix + "write_buffer_size", DEFAULT_WRITE_BUFFER_SIZE_BYTES);
size_t write_buffer_size = config.getInt64(dictionary_layout_prefix + ".write_buffer_size", DEFAULT_WRITE_BUFFER_SIZE_BYTES);
if (write_buffer_size % block_size != 0)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"{}: write_buffer_size must be a multiple of block_size",
full_name);
"{}: dictionary of layout '{}' setting 'write_buffer_size' must be a multiple of block_size",
full_name,
layout_type);
auto file_path = config.getString(dictionary_configuration_prefix + "path");
auto file_path = config.getString(dictionary_layout_prefix + ".path");
if (file_path.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"{}: ssd cache dictionary cannot have empty path",
full_name);
"{}: dictionary of layout '{}' setting 'path' must be specified",
full_name,
layout_type);
SSDCacheDictionaryStorageConfiguration configuration{
strict_max_lifetime_seconds,
dict_lifetime,
file_path,
max_partitions_count,
block_size,
file_size / block_size,
read_buffer_size / block_size,
write_buffer_size / block_size};
SSDCacheDictionaryStorageConfiguration configuration
{
.strict_max_lifetime_seconds = strict_max_lifetime_seconds,
.lifetime = dict_lifetime,
.file_path = file_path,
.max_partitions_count = max_partitions_count,
.block_size = block_size,
.file_blocks_size = file_size / block_size,
.read_buffer_blocks_size = read_buffer_size / block_size,
.write_buffer_blocks_size = write_buffer_size / block_size
};
return configuration;
}
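The SSD variant reads the same style of layout settings; note that `file_size`, `read_buffer_size`, and `write_buffer_size` must each be a multiple of `block_size`, and when the dictionary is created from DDL the path must be inside the server's user_files directory (see the `pathStartsWith` check below). An illustrative sketch with hypothetical names:

``` sql
CREATE DICTIONARY hypothetical_ssd_dict
(
    id UInt64,
    value String
)
PRIMARY KEY id
SOURCE(CLICKHOUSE(TABLE 'source_table'))
LIFETIME(MIN 0 MAX 300)
LAYOUT(SSD_CACHE(BLOCK_SIZE 4096 FILE_SIZE 16777216 PATH '/var/lib/clickhouse/user_files/ssd_dict'));
```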
@ -109,155 +108,131 @@ SSDCacheDictionaryStorageConfiguration parseSSDCacheStorageConfiguration(
#endif
CacheDictionaryUpdateQueueConfiguration parseCacheDictionaryUpdateQueueConfiguration(
const String & full_name,
const Poco::Util::AbstractConfiguration & config,
const String & layout_prefix,
DictionaryKeyType key_type)
const String & full_name,
const String & layout_type,
const String & dictionary_layout_prefix)
{
String layout_type = key_type == DictionaryKeyType::complex ? "complex_key_cache" : "cache";
const size_t max_update_queue_size = config.getUInt64(layout_prefix + ".cache.max_update_queue_size", 100000);
size_t max_update_queue_size = config.getUInt64(dictionary_layout_prefix + ".max_update_queue_size", 100000);
if (max_update_queue_size == 0)
throw Exception(ErrorCodes::TOO_SMALL_BUFFER_SIZE,
"{}: dictionary of layout '{}' cannot have empty update queue of size 0",
"{}: dictionary of layout '{}' setting 'max_update_queue_size' must be greater than 0",
full_name,
layout_type);
const size_t update_queue_push_timeout_milliseconds
= config.getUInt64(layout_prefix + ".cache.update_queue_push_timeout_milliseconds", 10);
size_t update_queue_push_timeout_milliseconds = config.getUInt64(dictionary_layout_prefix + ".update_queue_push_timeout_milliseconds", 10);
if (update_queue_push_timeout_milliseconds < 10)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"{}: dictionary of layout '{}' have too little update_queue_push_timeout",
"{}: dictionary of layout '{}' setting 'update_queue_push_timeout_milliseconds' must be greater or equal than 10",
full_name,
layout_type);
const size_t query_wait_timeout_milliseconds = config.getUInt64(layout_prefix + ".cache.query_wait_timeout_milliseconds", 60000);
size_t query_wait_timeout_milliseconds = config.getUInt64(dictionary_layout_prefix + ".query_wait_timeout_milliseconds", 60000);
const size_t max_threads_for_updates = config.getUInt64(layout_prefix + ".max_threads_for_updates", 4);
size_t max_threads_for_updates = config.getUInt64(dictionary_layout_prefix + ".max_threads_for_updates", 4);
if (max_threads_for_updates == 0)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"{}: dictionary of layout) '{}' cannot have zero threads for updates",
"{}: dictionary of layout '{}' setting 'max_threads_for_updates' must be greater than 0",
full_name,
layout_type);
CacheDictionaryUpdateQueueConfiguration update_queue_configuration{
max_update_queue_size, max_threads_for_updates, update_queue_push_timeout_milliseconds, query_wait_timeout_milliseconds};
CacheDictionaryUpdateQueueConfiguration update_queue_configuration
{
.max_update_queue_size = max_update_queue_size,
.max_threads_for_updates = max_threads_for_updates,
.update_queue_push_timeout_milliseconds = update_queue_push_timeout_milliseconds,
.query_wait_timeout_milliseconds = query_wait_timeout_milliseconds
};
return update_queue_configuration;
}
template <DictionaryKeyType dictionary_key_type>
template <DictionaryKeyType dictionary_key_type, bool ssd>
DictionaryPtr createCacheDictionaryLayout(
const String & full_name,
const DictionaryStructure & dict_struct,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
DictionarySourcePtr source_ptr)
{
static_assert(dictionary_key_type != DictionaryKeyType::range, "Range key type is not supported by CacheDictionary");
if constexpr (dictionary_key_type == DictionaryKeyType::simple)
{
if (dict_struct.key)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "'key' is not supported for dictionary of layout 'cache'");
}
else if constexpr (dictionary_key_type == DictionaryKeyType::complex)
{
if (dict_struct.id)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "'id' is not supported for dictionary of layout 'complex_key_cache'");
}
if (dict_struct.range_min || dict_struct.range_max)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"{}: elements .structure.range_min and .structure.range_max should be defined only "
"for a dictionary of layout 'range_hashed'",
full_name);
const bool require_nonempty = config.getBool(config_prefix + ".require_nonempty", false);
if (require_nonempty)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"{}: cache dictionary of layout cannot have 'require_nonempty' attribute set",
full_name);
const auto & layout_prefix = config_prefix + ".layout";
const auto dict_id = StorageID::fromDictionaryConfig(config, config_prefix);
const DictionaryLifetime dict_lifetime{config, config_prefix + ".lifetime"};
const bool allow_read_expired_keys = config.getBool(layout_prefix + ".cache.allow_read_expired_keys", false);
auto storage_configuration = parseCacheStorageConfiguration(full_name, config, layout_prefix, dict_lifetime, dictionary_key_type);
std::shared_ptr<ICacheDictionaryStorage> storage = std::make_shared<CacheDictionaryStorage<dictionary_key_type>>(dict_struct, storage_configuration);
auto update_queue_configuration = parseCacheDictionaryUpdateQueueConfiguration(full_name, config, layout_prefix, dictionary_key_type);
return std::make_unique<CacheDictionary<dictionary_key_type>>(
dict_id, dict_struct, std::move(source_ptr), storage, update_queue_configuration, dict_lifetime, allow_read_expired_keys);
}
#if defined(OS_LINUX) || defined(__FreeBSD__)
template <DictionaryKeyType dictionary_key_type>
DictionaryPtr createSSDCacheDictionaryLayout(
const String & full_name,
const DictionaryStructure & dict_struct,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
DictionarySourcePtr source_ptr,
ContextPtr context,
bool created_from_ddl)
ContextPtr context [[maybe_unused]],
bool created_from_ddl [[maybe_unused]])
{
static_assert(dictionary_key_type != DictionaryKeyType::range, "Range key type is not supported by CacheDictionary");
String layout_type;
if constexpr (dictionary_key_type == DictionaryKeyType::simple && !ssd)
layout_type = "cache";
else if constexpr (dictionary_key_type == DictionaryKeyType::simple && ssd)
layout_type = "ssd_cache";
else if constexpr (dictionary_key_type == DictionaryKeyType::complex && !ssd)
layout_type = "complex_key_cache";
else if constexpr (dictionary_key_type == DictionaryKeyType::complex && ssd)
layout_type = "complex_key_ssd_cache";
if constexpr (dictionary_key_type == DictionaryKeyType::simple)
{
if (dict_struct.key)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "'key' is not supported for dictionary of layout 'ssd_cache'");
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "{}: dictionary of layout '{}' 'key' is not supported", full_name, layout_type);
}
else if constexpr (dictionary_key_type == DictionaryKeyType::complex)
{
if (dict_struct.id)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "'id' is not supported for dictionary of layout 'complex_key_ssd_cache'");
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "{}: dictionary of layout '{}' 'id' is not supported", full_name, layout_type);
}
if (dict_struct.range_min || dict_struct.range_max)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"{}: elements .structure.range_min and .structure.range_max should be defined only "
"{}: dictionary of layout '{}' elements .structure.range_min and .structure.range_max must be defined only "
"for a dictionary of layout 'range_hashed'",
full_name);
full_name,
layout_type);
const bool require_nonempty = config.getBool(config_prefix + ".require_nonempty", false);
if (require_nonempty)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"{}: cache dictionary of layout cannot have 'require_nonempty' attribute set",
full_name);
const auto & layout_prefix = config_prefix + ".layout";
const auto dict_id = StorageID::fromDictionaryConfig(config, config_prefix);
"{}: cache dictionary of layout '{}' cannot have 'require_nonempty' attribute set",
full_name,
layout_type);
const auto dictionary_identifier = StorageID::fromDictionaryConfig(config, config_prefix);
const DictionaryLifetime dict_lifetime{config, config_prefix + ".lifetime"};
const bool allow_read_expired_keys = config.getBool(layout_prefix + ".cache.allow_read_expired_keys", false);
const auto & layout_prefix = config_prefix + ".layout";
const auto & dictionary_layout_prefix = layout_prefix + '.' + layout_type;
const bool allow_read_expired_keys = config.getBool(dictionary_layout_prefix + ".allow_read_expired_keys", false);
auto storage_configuration = parseSSDCacheStorageConfiguration(full_name, config, layout_prefix, dict_lifetime, dictionary_key_type);
auto update_queue_configuration = parseCacheDictionaryUpdateQueueConfiguration(config, full_name, layout_type, dictionary_layout_prefix);
if (created_from_ddl && !pathStartsWith(storage_configuration.file_path, context->getUserFilesPath()))
throw Exception(ErrorCodes::PATH_ACCESS_DENIED, "File path {} is not inside {}", storage_configuration.file_path, context->getUserFilesPath());
std::shared_ptr<ICacheDictionaryStorage> storage;
if constexpr (!ssd)
{
auto storage_configuration = parseCacheStorageConfiguration(config, full_name, layout_type, dictionary_layout_prefix, dict_lifetime);
storage = std::make_shared<CacheDictionaryStorage<dictionary_key_type>>(dict_struct, storage_configuration);
}
#if defined(OS_LINUX) || defined(__FreeBSD__)
else
{
auto storage_configuration = parseSSDCacheStorageConfiguration(config, full_name, layout_type, dictionary_layout_prefix, dict_lifetime);
if (created_from_ddl && !pathStartsWith(storage_configuration.file_path, context->getUserFilesPath()))
throw Exception(ErrorCodes::PATH_ACCESS_DENIED, "File path {} is not inside {}", storage_configuration.file_path, context->getUserFilesPath());
auto storage = std::make_shared<SSDCacheDictionaryStorage<dictionary_key_type>>(storage_configuration);
auto update_queue_configuration = parseCacheDictionaryUpdateQueueConfiguration(full_name, config, layout_prefix, dictionary_key_type);
return std::make_unique<CacheDictionary<dictionary_key_type>>(
dict_id, dict_struct, std::move(source_ptr), storage, update_queue_configuration, dict_lifetime, allow_read_expired_keys);
}
storage = std::make_shared<SSDCacheDictionaryStorage<dictionary_key_type>>(storage_configuration);
}
#endif
auto dictionary = std::make_unique<CacheDictionary<dictionary_key_type>>(
dictionary_identifier,
dict_struct,
std::move(source_ptr),
std::move(storage),
update_queue_configuration,
dict_lifetime,
allow_read_expired_keys);
return dictionary;
}
void registerDictionaryCache(DictionaryFactory & factory)
{
auto create_simple_cache_layout = [=](const String & full_name,
@ -265,10 +240,10 @@ void registerDictionaryCache(DictionaryFactory & factory)
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
DictionarySourcePtr source_ptr,
ContextPtr /* context */,
bool /* created_from_ddl */) -> DictionaryPtr
ContextPtr context,
bool created_from_ddl) -> DictionaryPtr
{
return createCacheDictionaryLayout<DictionaryKeyType::simple>(full_name, dict_struct, config, config_prefix, std::move(source_ptr));
return createCacheDictionaryLayout<DictionaryKeyType::simple, false/* ssd */>(full_name, dict_struct, config, config_prefix, std::move(source_ptr), std::move(context), created_from_ddl);
};
factory.registerLayout("cache", create_simple_cache_layout, false);
@ -278,10 +253,10 @@ void registerDictionaryCache(DictionaryFactory & factory)
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
DictionarySourcePtr source_ptr,
ContextPtr /* context */,
bool /* created_from_ddl */) -> DictionaryPtr
ContextPtr context,
bool created_from_ddl) -> DictionaryPtr
{
return createCacheDictionaryLayout<DictionaryKeyType::complex>(full_name, dict_struct, config, config_prefix, std::move(source_ptr));
return createCacheDictionaryLayout<DictionaryKeyType::complex, false /* ssd */>(full_name, dict_struct, config, config_prefix, std::move(source_ptr), std::move(context), created_from_ddl);
};
factory.registerLayout("complex_key_cache", create_complex_key_cache_layout, true);
@ -296,7 +271,7 @@ void registerDictionaryCache(DictionaryFactory & factory)
ContextPtr context,
bool created_from_ddl) -> DictionaryPtr
{
return createSSDCacheDictionaryLayout<DictionaryKeyType::simple>(full_name, dict_struct, config, config_prefix, std::move(source_ptr), context, created_from_ddl);
return createCacheDictionaryLayout<DictionaryKeyType::simple, true /* ssd */>(full_name, dict_struct, config, config_prefix, std::move(source_ptr), std::move(context), created_from_ddl);
};
factory.registerLayout("ssd_cache", create_simple_ssd_cache_layout, false);
@ -308,11 +283,13 @@ void registerDictionaryCache(DictionaryFactory & factory)
DictionarySourcePtr source_ptr,
ContextPtr context,
bool created_from_ddl) -> DictionaryPtr {
return createSSDCacheDictionaryLayout<DictionaryKeyType::complex>(full_name, dict_struct, config, config_prefix, std::move(source_ptr), context, created_from_ddl);
return createCacheDictionaryLayout<DictionaryKeyType::complex, true /* ssd */>(full_name, dict_struct, config, config_prefix, std::move(source_ptr), std::move(context), created_from_ddl);
};
factory.registerLayout("complex_key_ssd_cache", create_complex_key_ssd_cache_layout, true);
#endif
}
}


@ -16,6 +16,7 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
extern const int INCORRECT_DISK_INDEX;
extern const int DATA_ENCRYPTION_ERROR;
extern const int NOT_IMPLEMENTED;
}
namespace
@ -37,80 +38,121 @@ namespace
}
}
struct DiskEncryptedSettings
std::unique_ptr<const DiskEncryptedSettings> parseDiskEncryptedSettings(
const String & name, const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const DisksMap & map)
{
DiskPtr wrapped_disk;
String path_on_wrapped_disk;
std::unordered_map<UInt64, String> keys;
UInt64 current_key_id;
Algorithm current_algorithm;
DiskEncryptedSettings(
const String & disk_name, const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const DisksMap & map)
try
{
try
{
current_algorithm = DEFAULT_ENCRYPTION_ALGORITHM;
if (config.has(config_prefix + ".algorithm"))
parseFromString(current_algorithm, config.getString(config_prefix + ".algorithm"));
auto res = std::make_unique<DiskEncryptedSettings>();
res->current_algorithm = DEFAULT_ENCRYPTION_ALGORITHM;
if (config.has(config_prefix + ".algorithm"))
parseFromString(res->current_algorithm, config.getString(config_prefix + ".algorithm"));
Strings config_keys;
config.keys(config_prefix, config_keys);
for (const std::string & config_key : config_keys)
Strings config_keys;
config.keys(config_prefix, config_keys);
for (const std::string & config_key : config_keys)
{
String key;
UInt64 key_id;
if ((config_key == "key") || config_key.starts_with("key["))
{
String key;
UInt64 key_id;
if ((config_key == "key") || config_key.starts_with("key["))
{
key = config.getString(config_prefix + "." + config_key, "");
key_id = config.getUInt64(config_prefix + "." + config_key + "[@id]", 0);
}
else if ((config_key == "key_hex") || config_key.starts_with("key_hex["))
{
key = unhexKey(config.getString(config_prefix + "." + config_key, ""));
key_id = config.getUInt64(config_prefix + "." + config_key + "[@id]", 0);
}
else
continue;
auto it = keys.find(key_id);
if (it != keys.end())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Multiple keys have the same ID {}", key_id);
keys[key_id] = key;
key = config.getString(config_prefix + "." + config_key, "");
key_id = config.getUInt64(config_prefix + "." + config_key + "[@id]", 0);
}
else if ((config_key == "key_hex") || config_key.starts_with("key_hex["))
{
key = unhexKey(config.getString(config_prefix + "." + config_key, ""));
key_id = config.getUInt64(config_prefix + "." + config_key + "[@id]", 0);
}
else
continue;
if (keys.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "No keys, an encrypted disk needs keys to work", current_key_id);
current_key_id = config.getUInt64(config_prefix + ".current_key_id", 0);
if (!keys.contains(current_key_id))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Key with ID {} not found", current_key_id);
FileEncryption::checkKeySize(current_algorithm, keys[current_key_id].size());
String wrapped_disk_name = config.getString(config_prefix + ".disk", "");
if (wrapped_disk_name.empty())
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Name of the wrapped disk must not be empty. An encrypted disk is a wrapper over another disk");
auto wrapped_disk_it = map.find(wrapped_disk_name);
if (wrapped_disk_it == map.end())
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"The wrapped disk must have been announced earlier. No disk with name {}",
wrapped_disk_name);
wrapped_disk = wrapped_disk_it->second;
path_on_wrapped_disk = config.getString(config_prefix + ".path", "");
}
catch (Exception & e)
{
e.addMessage("Disk " + disk_name);
throw;
if (res->keys.contains(key_id))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Multiple keys have the same ID {}", key_id);
res->keys[key_id] = key;
}
if (res->keys.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "No keys, an encrypted disk needs keys to work");
res->current_key_id = config.getUInt64(config_prefix + ".current_key_id", 0);
if (!res->keys.contains(res->current_key_id))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Not found a key with the current ID {}", res->current_key_id);
FileEncryption::checkKeySize(res->current_algorithm, res->keys[res->current_key_id].size());
String wrapped_disk_name = config.getString(config_prefix + ".disk", "");
if (wrapped_disk_name.empty())
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Name of the wrapped disk must not be empty. Encrypted disk is a wrapper over another disk");
auto wrapped_disk_it = map.find(wrapped_disk_name);
if (wrapped_disk_it == map.end())
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"The wrapped disk must have been announced earlier. No disk with name {}",
wrapped_disk_name);
res->wrapped_disk = wrapped_disk_it->second;
res->disk_path = config.getString(config_prefix + ".path", "");
if (!res->disk_path.empty() && (res->disk_path.back() != '/'))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Disk path must ends with '/', but '{}' doesn't.", quoteString(res->disk_path));
return res;
}
};
catch (Exception & e)
{
e.addMessage("Disk " + name);
throw;
}
}
FileEncryption::Header readHeader(ReadBufferFromFileBase & read_buffer)
{
try
{
FileEncryption::Header header;
header.read(read_buffer);
return header;
}
catch (Exception & e)
{
e.addMessage("While reading the header of encrypted file " + quoteString(read_buffer.getFileName()));
throw;
}
}
String getCurrentKey(const String & path, const DiskEncryptedSettings & settings)
{
auto it = settings.keys.find(settings.current_key_id);
if (it == settings.keys.end())
throw Exception(
ErrorCodes::DATA_ENCRYPTION_ERROR,
"Not found a key with the current ID {} required to cipher file {}",
settings.current_key_id,
quoteString(path));
return it->second;
}
String getKey(const String & path, const FileEncryption::Header & header, const DiskEncryptedSettings & settings)
{
auto it = settings.keys.find(header.key_id);
if (it == settings.keys.end())
throw Exception(
ErrorCodes::DATA_ENCRYPTION_ERROR,
"Not found a key with ID {} required to decipher file {}",
header.key_id,
quoteString(path));
String key = it->second;
if (calculateKeyHash(key) != header.key_hash)
throw Exception(
ErrorCodes::DATA_ENCRYPTION_ERROR, "Wrong key with ID {}, could not decipher file {}", header.key_id, quoteString(path));
return key;
}
bool inline isSameDiskType(const IDisk & one, const IDisk & another)
{
@ -144,6 +186,22 @@ private:
std::unique_ptr<IReservation> reservation;
};
DiskEncrypted::DiskEncrypted(
const String & name_, const Poco::Util::AbstractConfiguration & config_, const String & config_prefix_, const DisksMap & map_)
: DiskEncrypted(name_, parseDiskEncryptedSettings(name_, config_, config_prefix_, map_))
{
}
DiskEncrypted::DiskEncrypted(const String & name_, std::unique_ptr<const DiskEncryptedSettings> settings_)
: DiskDecorator(settings_->wrapped_disk)
, name(name_)
, disk_path(settings_->disk_path)
, disk_absolute_path(settings_->wrapped_disk->getPath() + settings_->disk_path)
, current_settings(std::move(settings_))
{
delegate->createDirectories(disk_path);
}
ReservationPtr DiskEncrypted::reserve(UInt64 bytes)
{
auto reservation = delegate->reserve(bytes);
@ -152,58 +210,23 @@ ReservationPtr DiskEncrypted::reserve(UInt64 bytes)
return std::make_unique<DiskEncryptedReservation>(std::static_pointer_cast<DiskEncrypted>(shared_from_this()), std::move(reservation));
}
DiskEncrypted::DiskEncrypted(
const String & name_,
DiskPtr wrapped_disk_,
const String & path_on_wrapped_disk_,
const std::unordered_map<UInt64, String> & keys_,
UInt64 current_key_id_,
FileEncryption::Algorithm current_algorithm_)
: DiskDecorator(wrapped_disk_)
, name(name_)
, disk_path(path_on_wrapped_disk_)
, keys(keys_)
, current_key_id(current_key_id_)
, current_algorithm(current_algorithm_)
{
initialize();
}
void DiskEncrypted::initialize()
{
disk_absolute_path = delegate->getPath() + disk_path;
// use wrapped_disk as an EncryptedDisk store
if (disk_path.empty())
return;
if (disk_path.back() != '/')
throw Exception("Disk path must ends with '/', but '" + disk_path + "' doesn't.", ErrorCodes::BAD_ARGUMENTS);
delegate->createDirectories(disk_path);
}
String DiskEncrypted::getKey(UInt64 key_id) const
{
auto it = keys.find(key_id);
if (it == keys.end())
throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Key with ID {} not found", key_id);
return it->second;
}
void DiskEncrypted::copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path)
{
/// Check if we can copy the file without deciphering.
if (isSameDiskType(*this, *to_disk))
{
/// Disk type is the same, check if the key is the same too.
if (auto * to_encrypted_disk = typeid_cast<DiskEncrypted *>(to_disk.get()))
if (auto * to_disk_enc = typeid_cast<DiskEncrypted *>(to_disk.get()))
{
if (keys == to_encrypted_disk->keys)
auto from_settings = current_settings.get();
auto to_settings = to_disk_enc->current_settings.get();
if (from_settings->keys == to_settings->keys)
{
/// Keys are the same so we can simply copy the encrypted file.
delegate->copy(wrappedPath(from_path), to_encrypted_disk->delegate, to_encrypted_disk->wrappedPath(to_path));
auto wrapped_from_path = wrappedPath(from_path);
auto to_delegate = to_disk_enc->delegate;
auto wrapped_to_path = to_disk_enc->wrappedPath(to_path);
delegate->copy(wrapped_from_path, to_delegate, wrapped_to_path);
return;
}
}
@ -221,62 +244,43 @@ std::unique_ptr<ReadBufferFromFileBase> DiskEncrypted::readFile(
size_t mmap_threshold,
MMappedFileCache * mmap_cache) const
{
try
{
auto wrapped_path = wrappedPath(path);
auto buffer = delegate->readFile(wrapped_path, buf_size, estimated_size, aio_threshold, mmap_threshold, mmap_cache);
FileEncryption::Header header;
header.read(*buffer);
String key = getKey(header.key_id);
if (calculateKeyHash(key) != header.key_hash)
throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Wrong key, could not read file");
return std::make_unique<ReadBufferFromEncryptedFile>(buf_size, std::move(buffer), key, header);
}
catch (Exception & e)
{
e.addMessage("File " + quoteString(path));
throw;
}
auto wrapped_path = wrappedPath(path);
auto buffer = delegate->readFile(wrapped_path, buf_size, estimated_size, aio_threshold, mmap_threshold, mmap_cache);
auto settings = current_settings.get();
FileEncryption::Header header = readHeader(*buffer);
String key = getKey(path, header, *settings);
return std::make_unique<ReadBufferFromEncryptedFile>(buf_size, std::move(buffer), key, header);
}
std::unique_ptr<WriteBufferFromFileBase> DiskEncrypted::writeFile(const String & path, size_t buf_size, WriteMode mode)
{
try
auto wrapped_path = wrappedPath(path);
FileEncryption::Header header;
String key;
UInt64 old_file_size = 0;
auto settings = current_settings.get();
if (mode == WriteMode::Append && exists(path))
{
auto wrapped_path = wrappedPath(path);
FileEncryption::Header header;
String key;
UInt64 old_file_size = 0;
if (mode == WriteMode::Append && exists(path))
old_file_size = getFileSize(path);
if (old_file_size)
{
old_file_size = getFileSize(path);
if (old_file_size)
{
/// Append mode: we continue to use the same header.
auto read_buffer = delegate->readFile(wrapped_path, FileEncryption::Header::kSize);
header.read(*read_buffer);
key = getKey(header.key_id);
if (calculateKeyHash(key) != header.key_hash)
throw Exception(ErrorCodes::DATA_ENCRYPTION_ERROR, "Wrong key, could not append file");
}
/// Append mode: we continue to use the same header.
auto read_buffer = delegate->readFile(wrapped_path, FileEncryption::Header::kSize);
header = readHeader(*read_buffer);
key = getKey(path, header, *settings);
}
if (!old_file_size)
{
/// Rewrite mode: we generate a new header.
key = getKey(current_key_id);
header.algorithm = current_algorithm;
header.key_id = current_key_id;
header.key_hash = calculateKeyHash(key);
header.init_vector = InitVector::random();
}
auto buffer = delegate->writeFile(wrapped_path, buf_size, mode);
return std::make_unique<WriteBufferFromEncryptedFile>(buf_size, std::move(buffer), key, header, old_file_size);
}
catch (Exception & e)
if (!old_file_size)
{
e.addMessage("File " + quoteString(path));
throw;
/// Rewrite mode: we generate a new header.
key = getCurrentKey(path, *settings);
header.algorithm = settings->current_algorithm;
header.key_id = settings->current_key_id;
header.key_hash = calculateKeyHash(key);
header.init_vector = InitVector::random();
}
auto buffer = delegate->writeFile(wrapped_path, buf_size, mode);
return std::make_unique<WriteBufferFromEncryptedFile>(buf_size, std::move(buffer), key, header, old_file_size);
}
@ -303,15 +307,16 @@ void DiskEncrypted::applyNewSettings(
const Poco::Util::AbstractConfiguration & config,
ContextPtr /*context*/,
const String & config_prefix,
const DisksMap & map)
const DisksMap & disk_map)
{
DiskEncryptedSettings settings{name, config, config_prefix, map};
delegate = settings.wrapped_disk;
disk_path = settings.path_on_wrapped_disk;
keys = settings.keys;
current_key_id = settings.current_key_id;
current_algorithm = settings.current_algorithm;
initialize();
auto new_settings = parseDiskEncryptedSettings(name, config, config_prefix, disk_map);
if (new_settings->wrapped_disk != delegate)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Changing wrapped disk on the fly is not supported. Disk {}", name);
if (new_settings->disk_path != disk_path)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Changing disk path on the fly is not supported. Disk {}", name);
current_settings.set(std::move(new_settings));
}
void registerDiskEncrypted(DiskFactory & factory)
@ -322,14 +327,7 @@ void registerDiskEncrypted(DiskFactory & factory)
ContextPtr /*context*/,
const DisksMap & map) -> DiskPtr
{
DiskEncryptedSettings settings{name, config, config_prefix, map};
return std::make_shared<DiskEncrypted>(
name,
settings.wrapped_disk,
settings.path_on_wrapped_disk,
settings.keys,
settings.current_key_id,
settings.current_algorithm);
return std::make_shared<DiskEncrypted>(name, config, config_prefix, map);
};
factory.registerDiskType("encrypted", creator);
}


@ -7,6 +7,7 @@
#if USE_SSL
#include <Disks/IDisk.h>
#include <Disks/DiskDecorator.h>
#include <Common/MultiVersion.h>
namespace DB
@ -15,19 +16,23 @@ class ReadBufferFromFileBase;
class WriteBufferFromFileBase;
namespace FileEncryption { enum class Algorithm; }
struct DiskEncryptedSettings
{
DiskPtr wrapped_disk;
String disk_path;
std::unordered_map<UInt64, String> keys;
UInt64 current_key_id;
FileEncryption::Algorithm current_algorithm;
};
/// Encrypted disk ciphers all written files on the fly and writes the encrypted files to an underlying (normal) disk.
/// And when we read files from an encrypted disk it deciphers them automatically,
/// so we can work with an encrypted disk as if it were a normal disk.
class DiskEncrypted : public DiskDecorator
{
public:
DiskEncrypted(
const String & name_,
DiskPtr wrapped_disk_,
const String & path_on_wrapped_disk_,
const std::unordered_map<UInt64, String> & keys_,
UInt64 current_key_id_,
FileEncryption::Algorithm current_algorithm_);
DiskEncrypted(const String & name_, const Poco::Util::AbstractConfiguration & config_, const String & config_prefix_, const DisksMap & map_);
DiskEncrypted(const String & name_, std::unique_ptr<const DiskEncryptedSettings> settings_);
const String & getName() const override { return name; }
const String & getPath() const override { return disk_absolute_path; }
@ -215,8 +220,6 @@ public:
SyncGuardPtr getDirectorySyncGuard(const String & path) const override;
private:
void initialize();
String wrappedPath(const String & path) const
{
// if the path already starts with disk_path, it is already wrapped
@ -225,14 +228,10 @@ private:
return disk_path + path;
}
String getKey(UInt64 key_id) const;
String name;
String disk_path;
String disk_absolute_path;
std::unordered_map<UInt64, String> keys;
UInt64 current_key_id;
FileEncryption::Algorithm current_algorithm;
const String name;
const String disk_path;
const String disk_absolute_path;
MultiVersion<DiskEncryptedSettings> current_settings;
};
}


@ -9,6 +9,8 @@ void registerFunctionsStringArray(FunctionFactory & factory)
{
factory.registerFunction<FunctionExtractAll>();
factory.registerFunction<FunctionAlphaTokens>();
factory.registerFunction<FunctionSplitByNonAlpha>();
factory.registerFunction<FunctionSplitByWhitespace>();
factory.registerFunction<FunctionSplitByChar>();
factory.registerFunction<FunctionSplitByString>();
factory.registerFunction<FunctionSplitByRegexp>();


@ -33,6 +33,9 @@ namespace ErrorCodes
* splitByString(sep, s)
* splitByRegexp(regexp, s)
*
* splitByWhitespace(s) - split the string by whitespace characters
* splitByNonAlpha(s) - split the string by whitespace and punctuation characters
*
* extractAll(s, regexp) - select from the string the subsequences corresponding to the regexp.
* - first subpattern, if regexp has subpattern;
* - zero subpattern (the match part, otherwise);
@ -111,6 +114,121 @@ public:
}
};
class SplitByNonAlphaImpl
{
private:
Pos pos;
Pos end;
public:
/// Get the name of the function.
static constexpr auto name = "splitByNonAlpha";
static String getName() { return name; }
static size_t getNumberOfArguments() { return 1; }
/// Check the type of the function's arguments.
static void checkArguments(const DataTypes & arguments)
{
if (!isString(arguments[0]))
throw Exception("Illegal type " + arguments[0]->getName() + " of first argument of function " + getName() + ". Must be String.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
/// Initialize by the function arguments.
void init(const ColumnsWithTypeAndName & /*arguments*/) {}
/// Called for each next string.
void set(Pos pos_, Pos end_)
{
pos = pos_;
end = end_;
}
/// Returns the position of the argument, that is the column of strings
size_t getStringsArgumentPosition()
{
return 0;
}
/// Get the next token, if any, or return false.
bool get(Pos & token_begin, Pos & token_end)
{
/// Skip garbage
while (pos < end && (isWhitespaceASCII(*pos) || isPunctuationASCII(*pos)))
++pos;
if (pos == end)
return false;
token_begin = pos;
while (pos < end && !(isWhitespaceASCII(*pos) || isPunctuationASCII(*pos)))
++pos;
token_end = pos;
return true;
}
};
class SplitByWhitespaceImpl
{
private:
Pos pos;
Pos end;
public:
/// Get the name of the function.
static constexpr auto name = "splitByWhitespace";
static String getName() { return name; }
static size_t getNumberOfArguments() { return 1; }
/// Check the type of the function's arguments.
static void checkArguments(const DataTypes & arguments)
{
if (!isString(arguments[0]))
throw Exception("Illegal type " + arguments[0]->getName() + " of first argument of function " + getName() + ". Must be String.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
/// Initialize by the function arguments.
void init(const ColumnsWithTypeAndName & /*arguments*/) {}
/// Called for each next string.
void set(Pos pos_, Pos end_)
{
pos = pos_;
end = end_;
}
/// Returns the position of the argument, that is the column of strings
size_t getStringsArgumentPosition()
{
return 0;
}
/// Get the next token, if any, or return false.
bool get(Pos & token_begin, Pos & token_end)
{
/// Skip garbage
while (pos < end && isWhitespaceASCII(*pos))
++pos;
if (pos == end)
return false;
token_begin = pos;
while (pos < end && !isWhitespaceASCII(*pos))
++pos;
token_end = pos;
return true;
}
};
class SplitByCharImpl
{
@ -662,6 +780,8 @@ public:
using FunctionAlphaTokens = FunctionTokens<AlphaTokensImpl>;
using FunctionSplitByNonAlpha = FunctionTokens<SplitByNonAlphaImpl>;
using FunctionSplitByWhitespace = FunctionTokens<SplitByWhitespaceImpl>;
using FunctionSplitByChar = FunctionTokens<SplitByCharImpl>;
using FunctionSplitByString = FunctionTokens<SplitByStringImpl>;
using FunctionSplitByRegexp = FunctionTokens<SplitByRegexpImpl>;


@ -5,6 +5,7 @@
#include <DataTypes/DataTypeString.h>
#include <Common/SymbolIndex.h>
#include <Core/Field.h>
#include <Interpreters/Context.h>
namespace DB
@ -18,9 +19,13 @@ class FunctionBuildId : public IFunction
{
public:
static constexpr auto name = "buildId";
static FunctionPtr create(ContextPtr)
static FunctionPtr create(ContextPtr context)
{
return std::make_shared<FunctionBuildId>(context->isDistributed());
}
explicit FunctionBuildId(bool is_distributed_) : is_distributed(is_distributed_)
{
return std::make_shared<FunctionBuildId>();
}
String getName() const override
@ -33,6 +38,10 @@ public:
return 0;
}
bool isDeterministic() const override { return false; }
bool isDeterministicInScopeOfQuery() const override { return true; }
bool isSuitableForConstantFolding() const override { return !is_distributed; }
DataTypePtr getReturnTypeImpl(const DataTypes & /*arguments*/) const override
{
return std::make_shared<DataTypeString>();
@ -42,6 +51,9 @@ public:
{
return DataTypeString().createColumnConst(input_rows_count, SymbolIndex::instance()->getBuildIDHex());
}
private:
bool is_distributed;
};
}
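The is_distributed guard introduced here (and repeated below for hostName(), tcpPort(), timezone(), uptime() and version()) exists so that per-server constants are not folded away on the initiator of a distributed query. A hedged illustration, using the two_shards cluster defined in the integration tests further down:
SELECT hostName() FROM clusterAllReplicas('two_shards', system.one);
-- Each replica must report its own host name; folding hostName() into a
-- constant on the initiator would make every row show the initiator's name.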


@ -2,6 +2,7 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnString.h>
#include <Interpreters/Context.h>
#include <Common/Macros.h>
@ -60,11 +61,89 @@ private:
mutable ColumnWithTypeAndName scalar;
};
/** Get special scalar values
*/
template <typename Scalar>
class FunctionGetSpecialScalar : public IFunction
{
public:
static constexpr auto name = Scalar::name;
static FunctionPtr create(ContextPtr context_)
{
return std::make_shared<FunctionGetSpecialScalar<Scalar>>(context_);
}
static ColumnWithTypeAndName createScalar(ContextPtr context_)
{
if (const auto * block = context_->tryGetLocalScalar(Scalar::scalar_name))
return block->getByPosition(0);
else if (context_->hasQueryContext())
{
if (context_->getQueryContext()->hasScalar(Scalar::scalar_name))
return context_->getQueryContext()->getScalar(Scalar::scalar_name).getByPosition(0);
}
return {DataTypeUInt32().createColumnConst(1, 0), std::make_shared<DataTypeUInt32>(), Scalar::scalar_name};
}
explicit FunctionGetSpecialScalar(ContextPtr context_)
: scalar(createScalar(context_)), is_distributed(context_->isDistributed())
{
}
String getName() const override
{
return name;
}
bool isDeterministic() const override { return false; }
bool isDeterministicInScopeOfQuery() const override
{
return true;
}
bool isSuitableForConstantFolding() const override { return !is_distributed; }
size_t getNumberOfArguments() const override
{
return 0;
}
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName &) const override
{
return scalar.type;
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName &, const DataTypePtr &, size_t input_rows_count) const override
{
return ColumnConst::create(scalar.column, input_rows_count);
}
private:
ColumnWithTypeAndName scalar;
bool is_distributed;
};
struct GetShardNum
{
static constexpr auto name = "shardNum";
static constexpr auto scalar_name = "_shard_num";
};
struct GetShardCount
{
static constexpr auto name = "shardCount";
static constexpr auto scalar_name = "_shard_count";
};
}
void registerFunctionGetScalar(FunctionFactory & factory)
{
factory.registerFunction<FunctionGetScalar>();
factory.registerFunction<FunctionGetSpecialScalar<GetShardNum>>();
factory.registerFunction<FunctionGetSpecialScalar<GetShardCount>>();
}
}
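A sketch of how the new special scalars are expected to surface in SQL; _shard_num is evaluated per shard, while _shard_count is injected by the initiator (see executeQuery further down):
SELECT shardNum(), shardCount() FROM cluster('two_shards', system.one);
-- On a plain non-distributed SELECT the scalars are absent, and both
-- functions fall back to the constant default of 0 defined above.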


@ -3,6 +3,7 @@
#include <DataTypes/DataTypeString.h>
#include <Common/DNSResolver.h>
#include <Core/Field.h>
#include <Interpreters/Context.h>
namespace DB
@ -15,9 +16,13 @@ class FunctionHostName : public IFunction
{
public:
static constexpr auto name = "hostName";
static FunctionPtr create(ContextPtr)
static FunctionPtr create(ContextPtr context)
{
return std::make_shared<FunctionHostName>(context->isDistributed());
}
explicit FunctionHostName(bool is_distributed_) : is_distributed(is_distributed_)
{
return std::make_shared<FunctionHostName>();
}
String getName() const override
@ -29,10 +34,10 @@ public:
bool isDeterministicInScopeOfQuery() const override
{
return false;
return true;
}
bool isSuitableForConstantFolding() const override { return false; }
bool isSuitableForConstantFolding() const override { return !is_distributed; }
size_t getNumberOfArguments() const override
{
@ -44,14 +49,12 @@ public:
return std::make_shared<DataTypeString>();
}
/** convertToFullColumn needed because in distributed query processing,
* each server returns its own value.
*/
ColumnPtr executeImpl(const ColumnsWithTypeAndName &, const DataTypePtr & result_type, size_t input_rows_count) const override
{
return result_type->createColumnConst(
input_rows_count, DNSResolver::instance().getHostName())->convertToFullColumnIfConst();
return result_type->createColumnConst(input_rows_count, DNSResolver::instance().getHostName());
}
private:
bool is_distributed;
};
}

130
src/Functions/lemmatize.cpp Normal file

@ -0,0 +1,130 @@
#if !defined(ARCADIA_BUILD)
# include "config_core.h"
#endif
#if USE_NLP
#include <Columns/ColumnString.h>
#include <DataTypes/DataTypeString.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/IFunction.h>
#include <Interpreters/Context.h>
#include <Interpreters/Lemmatizers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int SUPPORT_IS_DISABLED;
}
namespace
{
struct LemmatizeImpl
{
static void vector(
const ColumnString::Chars & data,
const ColumnString::Offsets & offsets,
ColumnString::Chars & res_data,
ColumnString::Offsets & res_offsets,
Lemmatizers::LemmPtr & lemmatizer)
{
res_data.resize(data.size());
res_offsets.assign(offsets);
UInt64 data_size = 0;
for (UInt64 i = 0; i < offsets.size(); ++i)
{
/// lemmatize() uses the fact that each string ends with '\0'
auto result = lemmatizer->lemmatize(reinterpret_cast<const char *>(data.data() + offsets[i - 1]));
size_t new_size = strlen(result.get()) + 1;
if (data_size + new_size > res_data.size())
res_data.resize(data_size + new_size);
memcpy(res_data.data() + data_size, reinterpret_cast<const unsigned char *>(result.get()), new_size);
data_size += new_size;
res_offsets[i] = data_size;
}
res_data.resize(data_size);
}
};
class FunctionLemmatize : public IFunction
{
public:
static constexpr auto name = "lemmatize";
static FunctionPtr create(ContextPtr context)
{
if (!context->getSettingsRef().allow_experimental_nlp_functions)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Natural language processing function '{}' is experimental. Set `allow_experimental_nlp_functions` setting to enable it", name);
return std::make_shared<FunctionLemmatize>(context->getLemmatizers());
}
private:
Lemmatizers & lemmatizers;
public:
explicit FunctionLemmatize(Lemmatizers & lemmatizers_)
: lemmatizers(lemmatizers_) {}
String getName() const override { return name; }
size_t getNumberOfArguments() const override { return 2; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if (!isString(arguments[0]))
throw Exception(
"Illegal type " + arguments[0]->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if (!isString(arguments[1]))
throw Exception(
"Illegal type " + arguments[1]->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return arguments[1];
}
bool useDefaultImplementationForConstants() const override { return true; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {0}; }
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t) const override
{
const auto & langcolumn = arguments[0].column;
const auto & strcolumn = arguments[1].column;
const ColumnConst * lang_col = checkAndGetColumn<ColumnConst>(langcolumn.get());
const ColumnString * words_col = checkAndGetColumn<ColumnString>(strcolumn.get());
if (!lang_col)
throw Exception(
"Illegal column " + arguments[0].column->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_COLUMN);
if (!words_col)
throw Exception(
"Illegal column " + arguments[1].column->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_COLUMN);
String language = lang_col->getValue<String>();
auto lemmatizer = lemmatizers.getLemmatizer(language);
auto col_res = ColumnString::create();
LemmatizeImpl::vector(words_col->getChars(), words_col->getOffsets(), col_res->getChars(), col_res->getOffsets(), lemmatizer);
return col_res;
}
};
}
void registerFunctionLemmatize(FunctionFactory & factory)
{
factory.registerFunction<FunctionLemmatize>(FunctionFactory::CaseInsensitive);
}
}
#endif
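A usage sketch, assuming the 'en' lemmatizer dictionary from the integration-test config further down is loaded:
SET allow_experimental_nlp_functions = 1;
SELECT lemmatize('en', 'wolves');  -- 'wolf'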


@ -1,5 +1,6 @@
#if !defined(ARCADIA_BUILD)
# include "config_functions.h"
# include "config_core.h"
#endif
namespace DB
@ -39,13 +40,18 @@ void registerFunctionDecodeXMLComponent(FunctionFactory &);
void registerFunctionExtractTextFromHTML(FunctionFactory &);
void registerFunctionToStringCutToZero(FunctionFactory &);
#if USE_BASE64
void registerFunctionBase64Encode(FunctionFactory &);
void registerFunctionBase64Decode(FunctionFactory &);
void registerFunctionTryBase64Decode(FunctionFactory &);
#endif
#if USE_NLP
void registerFunctionStem(FunctionFactory &);
void registerFunctionSynonyms(FunctionFactory &);
void registerFunctionLemmatize(FunctionFactory &);
#endif
void registerFunctionsString(FunctionFactory & factory)
{
registerFunctionRepeat(factory);
@ -79,11 +85,18 @@ void registerFunctionsString(FunctionFactory & factory)
registerFunctionDecodeXMLComponent(factory);
registerFunctionExtractTextFromHTML(factory);
registerFunctionToStringCutToZero(factory);
#if USE_BASE64
registerFunctionBase64Encode(factory);
registerFunctionBase64Decode(factory);
registerFunctionTryBase64Decode(factory);
#endif
#if USE_NLP
registerFunctionStem(factory);
registerFunctionSynonyms(factory);
registerFunctionLemmatize(factory);
#endif
}
}

135
src/Functions/stem.cpp Normal file

@ -0,0 +1,135 @@
#if !defined(ARCADIA_BUILD)
# include "config_core.h"
#endif
#if USE_NLP
#include <Columns/ColumnString.h>
#include <DataTypes/DataTypeString.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/IFunction.h>
#include <Interpreters/Context.h>
#include <libstemmer.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int SUPPORT_IS_DISABLED;
}
namespace
{
struct StemImpl
{
static void vector(
const ColumnString::Chars & data,
const ColumnString::Offsets & offsets,
ColumnString::Chars & res_data,
ColumnString::Offsets & res_offsets,
const String & language)
{
sb_stemmer * stemmer = sb_stemmer_new(language.data(), "UTF_8");
if (stemmer == nullptr)
{
throw Exception(
"Language " + language + " is not supported for function stem",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
res_data.resize(data.size());
res_offsets.assign(offsets);
UInt64 data_size = 0;
for (UInt64 i = 0; i < offsets.size(); ++i)
{
/// Note that accessing -1th element is valid for PaddedPODArray.
size_t original_size = offsets[i] - offsets[i - 1];
const sb_symbol * result = sb_stemmer_stem(stemmer,
reinterpret_cast<const uint8_t *>(data.data() + offsets[i - 1]),
original_size - 1);
size_t new_size = sb_stemmer_length(stemmer) + 1;
memcpy(res_data.data() + data_size, result, new_size);
data_size += new_size;
res_offsets[i] = data_size;
}
res_data.resize(data_size);
sb_stemmer_delete(stemmer);
}
};
class FunctionStem : public IFunction
{
public:
static constexpr auto name = "stem";
static FunctionPtr create(ContextPtr context)
{
if (!context->getSettingsRef().allow_experimental_nlp_functions)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Natural language processing function '{}' is experimental. Set `allow_experimental_nlp_functions` setting to enable it", name);
return std::make_shared<FunctionStem>();
}
String getName() const override { return name; }
size_t getNumberOfArguments() const override { return 2; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if (!isString(arguments[0]))
throw Exception(
"Illegal type " + arguments[0]->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if (!isString(arguments[1]))
throw Exception(
"Illegal type " + arguments[1]->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return arguments[1];
}
bool useDefaultImplementationForConstants() const override { return true; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {0}; }
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t) const override
{
const auto & langcolumn = arguments[0].column;
const auto & strcolumn = arguments[1].column;
const ColumnConst * lang_col = checkAndGetColumn<ColumnConst>(langcolumn.get());
const ColumnString * words_col = checkAndGetColumn<ColumnString>(strcolumn.get());
if (!lang_col)
throw Exception(
"Illegal column " + arguments[0].column->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_COLUMN);
if (!words_col)
throw Exception(
"Illegal column " + arguments[1].column->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_COLUMN);
String language = lang_col->getValue<String>();
auto col_res = ColumnString::create();
StemImpl::vector(words_col->getChars(), words_col->getOffsets(), col_res->getChars(), col_res->getOffsets(), language);
return col_res;
}
};
}
void registerFunctionStem(FunctionFactory & factory)
{
factory.registerFunction<FunctionStem>(FunctionFactory::CaseInsensitive);
}
}
#endif
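A usage sketch; unlike lemmatize(), stemming is purely algorithmic (Snowball), so no dictionary needs to be configured:
SET allow_experimental_nlp_functions = 1;
SELECT stem('en', 'running');  -- expected to yield 'run' under the Snowball English rules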

128
src/Functions/synonyms.cpp Normal file

@ -0,0 +1,128 @@
#if !defined(ARCADIA_BUILD)
# include "config_core.h"
#endif
#if USE_NLP
#include <Columns/ColumnArray.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnVector.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeString.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/IFunction.h>
#include <Interpreters/Context.h>
#include <Interpreters/SynonymsExtensions.h>
#include <string_view>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int SUPPORT_IS_DISABLED;
}
class FunctionSynonyms : public IFunction
{
public:
static constexpr auto name = "synonyms";
static FunctionPtr create(ContextPtr context)
{
if (!context->getSettingsRef().allow_experimental_nlp_functions)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Natural language processing function '{}' is experimental. Set `allow_experimental_nlp_functions` setting to enable it", name);
return std::make_shared<FunctionSynonyms>(context->getSynonymsExtensions());
}
private:
SynonymsExtensions & extensions;
public:
explicit FunctionSynonyms(SynonymsExtensions & extensions_)
: extensions(extensions_) {}
String getName() const override { return name; }
size_t getNumberOfArguments() const override { return 2; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if (!isString(arguments[0]))
throw Exception(
"Illegal type " + arguments[0]->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if (!isString(arguments[1]))
throw Exception(
"Illegal type " + arguments[1]->getName() + " of argument of function " + getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>());
}
bool useDefaultImplementationForConstants() const override { return true; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {0}; }
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override
{
const auto & extcolumn = arguments[0].column;
const auto & strcolumn = arguments[1].column;
const ColumnConst * ext_col = checkAndGetColumn<ColumnConst>(extcolumn.get());
const ColumnString * word_col = checkAndGetColumn<ColumnString>(strcolumn.get());
if (!ext_col)
throw Exception(
"Illegal column " + arguments[0].column->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
if (!word_col)
throw Exception(
"Illegal column " + arguments[1].column->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
String ext_name = ext_col->getValue<String>();
auto extension = extensions.getExtension(ext_name);
/// Create and fill the result array.
const DataTypePtr & elem_type = static_cast<const DataTypeArray &>(*result_type).getNestedType();
auto out = ColumnArray::create(elem_type->createColumn());
IColumn & out_data = out->getData();
IColumn::Offsets & out_offsets = out->getOffsets();
const ColumnString::Chars & data = word_col->getChars();
const ColumnString::Offsets & offsets = word_col->getOffsets();
out_data.reserve(input_rows_count);
out_offsets.resize(input_rows_count);
IColumn::Offset current_offset = 0;
for (size_t i = 0; i < offsets.size(); ++i)
{
std::string_view word(reinterpret_cast<const char *>(data.data() + offsets[i - 1]), offsets[i] - offsets[i - 1] - 1);
const auto * synset = extension->getSynonyms(word);
if (synset)
{
for (const auto & token : *synset)
out_data.insert(Field(token.data(), token.size()));
current_offset += synset->size();
}
out_offsets[i] = current_offset;
}
return out;
}
};
void registerFunctionSynonyms(FunctionFactory & factory)
{
factory.registerFunction<FunctionSynonyms>(FunctionFactory::CaseInsensitive);
}
}
#endif
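A usage sketch, matching the plain 'en' extension exercised by the integration tests below:
SET allow_experimental_nlp_functions = 1;
SELECT synonyms('en', 'crucial');  -- ['important','big','critical','crucial','essential']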


@ -16,10 +16,10 @@ public:
static FunctionPtr create(ContextPtr context)
{
return std::make_shared<FunctionTcpPort>(context->getTCPPort());
return std::make_shared<FunctionTcpPort>(context->isDistributed(), context->getTCPPort());
}
explicit FunctionTcpPort(UInt16 port_) : port(port_)
explicit FunctionTcpPort(bool is_distributed_, UInt16 port_) : is_distributed(is_distributed_), port(port_)
{
}
@ -31,12 +31,17 @@ public:
bool isDeterministic() const override { return false; }
bool isDeterministicInScopeOfQuery() const override { return true; }
bool isSuitableForConstantFolding() const override { return !is_distributed; }
ColumnPtr executeImpl(const ColumnsWithTypeAndName &, const DataTypePtr &, size_t input_rows_count) const override
{
return DataTypeUInt16().createColumnConst(input_rows_count, port);
}
private:
bool is_distributed;
const UInt64 port;
};


@ -3,6 +3,7 @@
#include <common/DateLUT.h>
#include <Core/Field.h>
#include <DataTypes/DataTypeString.h>
#include <Interpreters/Context.h>
namespace DB
@ -16,9 +17,13 @@ class FunctionTimezone : public IFunction
{
public:
static constexpr auto name = "timezone";
static FunctionPtr create(ContextPtr)
static FunctionPtr create(ContextPtr context)
{
return std::make_shared<FunctionTimezone>(context->isDistributed());
}
explicit FunctionTimezone(bool is_distributed_) : is_distributed(is_distributed_)
{
return std::make_shared<FunctionTimezone>();
}
String getName() const override
@ -36,11 +41,15 @@ public:
}
bool isDeterministic() const override { return false; }
bool isDeterministicInScopeOfQuery() const override { return true; }
bool isSuitableForConstantFolding() const override { return !is_distributed; }
ColumnPtr executeImpl(const ColumnsWithTypeAndName &, const DataTypePtr &, size_t input_rows_count) const override
{
return DataTypeString().createColumnConst(input_rows_count, DateLUT::instance().getTimeZone());
}
private:
bool is_distributed;
};
}


@ -15,10 +15,10 @@ public:
static constexpr auto name = "uptime";
static FunctionPtr create(ContextPtr context)
{
return std::make_shared<FunctionUptime>(context->getUptimeSeconds());
return std::make_shared<FunctionUptime>(context->isDistributed(), context->getUptimeSeconds());
}
explicit FunctionUptime(time_t uptime_) : uptime(uptime_)
explicit FunctionUptime(bool is_distributed_, time_t uptime_) : is_distributed(is_distributed_), uptime(uptime_)
{
}
@ -38,6 +38,8 @@ public:
}
bool isDeterministic() const override { return false; }
bool isDeterministicInScopeOfQuery() const override { return true; }
bool isSuitableForConstantFolding() const override { return !is_distributed; }
ColumnPtr executeImpl(const ColumnsWithTypeAndName &, const DataTypePtr &, size_t input_rows_count) const override
{
@ -45,6 +47,7 @@ public:
}
private:
bool is_distributed;
time_t uptime;
};


@ -2,6 +2,7 @@
#include <Functions/FunctionFactory.h>
#include <DataTypes/DataTypeString.h>
#include <Core/Field.h>
#include <Interpreters/Context.h>
#if !defined(ARCADIA_BUILD)
# include <Common/config_version.h>
@ -16,9 +17,13 @@ class FunctionVersion : public IFunction
{
public:
static constexpr auto name = "version";
static FunctionPtr create(ContextPtr)
static FunctionPtr create(ContextPtr context)
{
return std::make_shared<FunctionVersion>(context->isDistributed());
}
explicit FunctionVersion(bool is_distributed_) : is_distributed(is_distributed_)
{
return std::make_shared<FunctionVersion>();
}
String getName() const override
@ -27,8 +32,8 @@ public:
}
bool isDeterministic() const override { return false; }
bool isDeterministicInScopeOfQuery() const override { return false; }
bool isSuitableForConstantFolding() const override { return false; }
bool isDeterministicInScopeOfQuery() const override { return true; }
bool isSuitableForConstantFolding() const override { return !is_distributed; }
size_t getNumberOfArguments() const override
{
@ -44,6 +49,8 @@ public:
{
return DataTypeString().createColumnConst(input_rows_count, VERSION_STRING);
}
private:
bool is_distributed;
};


@ -340,6 +340,7 @@ SRCS(
jumpConsistentHash.cpp
lcm.cpp
least.cpp
lemmatize.cpp
lengthUTF8.cpp
less.cpp
lessOrEquals.cpp
@ -481,6 +482,7 @@ SRCS(
sleepEachRow.cpp
sqrt.cpp
startsWith.cpp
stem.cpp
stringCutToZero.cpp
stringToH3.cpp
substring.cpp
@ -493,6 +495,7 @@ SRCS(
subtractWeeks.cpp
subtractYears.cpp
svg.cpp
synonyms.cpp
tan.cpp
tanh.cpp
tcpPort.cpp


@ -54,7 +54,8 @@ public:
const ASTPtr & table_func_ptr,
ContextPtr context,
std::vector<QueryPlanPtr> & local_plans,
Shards & remote_shards) = 0;
Shards & remote_shards,
UInt32 shard_count) = 0;
};
}


@ -47,61 +47,6 @@ SelectStreamFactory::SelectStreamFactory(
namespace
{
/// Special support for the case when `_shard_num` column is used in GROUP BY key expression.
/// This column is a constant for shard.
/// A constant expression with this column may be removed from the intermediate header.
/// However, this column is not constant for the initiator, which expects the intermediate header to contain it.
///
/// To fix it, the following trick is applied.
/// We check all GROUP BY keys which depend only on `_shard_num`.
/// Calculate such expression for current shard if it is used in header.
/// Those columns will be added to modified header as already known constants.
///
/// For the local shard, missing constants will be added by converting actions.
/// For remote shards, RemoteQueryExecutor will automatically add the missing constants.
Block evaluateConstantGroupByKeysWithShardNumber(
const ContextPtr & context, const ASTPtr & query_ast, const Block & header, UInt32 shard_num)
{
Block res;
ColumnWithTypeAndName shard_num_col;
shard_num_col.type = std::make_shared<DataTypeUInt32>();
shard_num_col.column = shard_num_col.type->createColumnConst(0, shard_num);
shard_num_col.name = "_shard_num";
if (auto group_by = query_ast->as<ASTSelectQuery &>().groupBy())
{
for (const auto & elem : group_by->children)
{
String key_name = elem->getColumnName();
if (header.has(key_name))
{
auto ast = elem->clone();
RequiredSourceColumnsVisitor::Data columns_context;
RequiredSourceColumnsVisitor(columns_context).visit(ast);
auto required_columns = columns_context.requiredColumns();
if (required_columns.size() != 1 || required_columns.count("_shard_num") == 0)
continue;
Block block({shard_num_col});
auto syntax_result = TreeRewriter(context).analyze(ast, {NameAndTypePair{shard_num_col.name, shard_num_col.type}});
ExpressionAnalyzer(ast, syntax_result, context).getActions(true, false)->execute(block);
res.insert(block.getByName(key_name));
}
}
}
/// We always add _shard_num constant just in case.
/// For initial query it is considered as a column from table, and may be required by intermediate block.
if (!res.has(shard_num_col.name))
res.insert(std::move(shard_num_col));
return res;
}
ActionsDAGPtr getConvertingDAG(const Block & block, const Block & header)
{
/// Convert header structure to expected.
@ -128,13 +73,16 @@ std::unique_ptr<QueryPlan> createLocalPlan(
const ASTPtr & query_ast,
const Block & header,
ContextPtr context,
QueryProcessingStage::Enum processed_stage)
QueryProcessingStage::Enum processed_stage,
UInt32 shard_num,
UInt32 shard_count)
{
checkStackSize();
auto query_plan = std::make_unique<QueryPlan>();
InterpreterSelectQuery interpreter(query_ast, context, SelectQueryOptions(processed_stage));
InterpreterSelectQuery interpreter(
query_ast, context, SelectQueryOptions(processed_stage).setShardInfo(shard_num, shard_count));
interpreter.buildQueryPlan(*query_plan);
addConvertingActions(*query_plan, header);
@ -151,38 +99,27 @@ void SelectStreamFactory::createForShard(
const ASTPtr & table_func_ptr,
ContextPtr context,
std::vector<QueryPlanPtr> & local_plans,
Shards & remote_shards)
Shards & remote_shards,
UInt32 shard_count)
{
auto modified_query_ast = query_ast->clone();
auto modified_header = header;
if (has_virtual_shard_num_column)
{
VirtualColumnUtils::rewriteEntityInAst(modified_query_ast, "_shard_num", shard_info.shard_num, "toUInt32");
auto shard_num_constants = evaluateConstantGroupByKeysWithShardNumber(context, query_ast, modified_header, shard_info.shard_num);
for (auto & col : shard_num_constants)
{
if (modified_header.has(col.name))
modified_header.getByName(col.name).column = std::move(col.column);
else
modified_header.insert(std::move(col));
}
}
auto emplace_local_stream = [&]()
{
local_plans.emplace_back(createLocalPlan(modified_query_ast, modified_header, context, processed_stage));
addConvertingActions(*local_plans.back(), header);
local_plans.emplace_back(createLocalPlan(modified_query_ast, header, context, processed_stage, shard_info.shard_num, shard_count));
};
auto emplace_remote_stream = [&]()
auto emplace_remote_stream = [&](bool lazy = false, UInt32 local_delay = 0)
{
remote_shards.emplace_back(Shard{
.query = modified_query_ast,
.header = modified_header,
.header = header,
.shard_num = shard_info.shard_num,
.pool = shard_info.pool,
.lazy = false
.lazy = lazy,
.local_delay = local_delay,
});
};
@ -273,15 +210,7 @@ void SelectStreamFactory::createForShard(
/// Try our luck with remote replicas, but if they are stale too, then fall back to the local replica.
/// Do it lazily to avoid connecting in the main thread.
remote_shards.emplace_back(Shard{
.query = modified_query_ast,
.header = modified_header,
.shard_num = shard_info.shard_num,
.pool = shard_info.pool,
.lazy = true,
.local_delay = local_delay
});
emplace_remote_stream(true /* lazy */, local_delay);
}
else
emplace_remote_stream();


@ -26,7 +26,8 @@ public:
const ASTPtr & table_func_ptr,
ContextPtr context,
std::vector<QueryPlanPtr> & local_plans,
Shards & remote_shards) override;
Shards & remote_shards,
UInt32 shard_count) override;
private:
const Block header;


@ -11,6 +11,7 @@
#include <Processors/QueryPlan/ReadFromRemote.h>
#include <Processors/QueryPlan/UnionStep.h>
#include <Storages/SelectQueryInfo.h>
#include <DataTypes/DataTypesNumber.h>
namespace DB
@ -165,12 +166,14 @@ void executeQuery(
stream_factory.createForShard(shard_info,
query_ast_for_shard, main_table, table_func_ptr,
new_context, plans, remote_shards);
new_context, plans, remote_shards, shards);
}
if (!remote_shards.empty())
{
const Scalars & scalars = context->hasQueryContext() ? context->getQueryContext()->getScalars() : Scalars{};
Scalars scalars = context->hasQueryContext() ? context->getQueryContext()->getScalars() : Scalars{};
scalars.emplace(
"_shard_count", Block{{DataTypeUInt32().createColumnConst(1, shards), std::make_shared<DataTypeUInt32>(), "_shard_count"}});
auto external_tables = context->getExternalTables();
auto plan = std::make_unique<QueryPlan>();
@ -182,9 +185,10 @@ void executeQuery(
table_func_ptr,
new_context,
throttler,
scalars,
std::move(scalars),
std::move(external_tables),
log);
log,
shards);
read_from_remote->setStepDescription("Read from remote replica");
plan->addStep(std::move(read_from_remote));


@ -77,6 +77,8 @@
#include <Interpreters/JIT/CompiledExpressionCache.h>
#include <Storages/MergeTree/BackgroundJobsExecutor.h>
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
#include <Interpreters/SynonymsExtensions.h>
#include <Interpreters/Lemmatizers.h>
#include <filesystem>
@ -349,6 +351,11 @@ struct ContextSharedPart
scope_guard dictionaries_xmls;
#if USE_NLP
mutable std::optional<SynonymsExtensions> synonyms_extensions;
mutable std::optional<Lemmatizers> lemmatizers;
#endif
String default_profile_name; /// Default profile name used for default values.
String system_profile_name; /// Profile used by system processes
String buffer_profile_name; /// Profile used by Buffer engine for flushing to the underlying table
@ -1000,6 +1007,13 @@ const Block & Context::getScalar(const String & name) const
return it->second;
}
const Block * Context::tryGetLocalScalar(const String & name) const
{
auto it = local_scalars.find(name);
if (local_scalars.end() == it)
return nullptr;
return &it->second;
}
Tables Context::getExternalTables() const
{
@ -1059,6 +1073,13 @@ void Context::addScalar(const String & name, const Block & block)
}
void Context::addLocalScalar(const String & name, const Block & block)
{
assert(!isGlobalContext() || getApplicationType() == ApplicationType::LOCAL);
local_scalars[name] = block;
}
bool Context::hasScalar(const String & name) const
{
assert(!isGlobalContext() || getApplicationType() == ApplicationType::LOCAL);
@ -1505,6 +1526,29 @@ void Context::loadDictionaries(const Poco::Util::AbstractConfiguration & config)
std::make_unique<ExternalLoaderXMLConfigRepository>(config, "dictionaries_config"));
}
#if USE_NLP
SynonymsExtensions & Context::getSynonymsExtensions() const
{
auto lock = getLock();
if (!shared->synonyms_extensions)
shared->synonyms_extensions.emplace(getConfigRef());
return *shared->synonyms_extensions;
}
Lemmatizers & Context::getLemmatizers() const
{
auto lock = getLock();
if (!shared->lemmatizers)
shared->lemmatizers.emplace(getConfigRef());
return *shared->lemmatizers;
}
#endif
void Context::setProgressCallback(ProgressCallback callback)
{
/// Callback is set to a session or to a query. In the session, only one query is processed at a time. Therefore, the lock is not needed.


@ -114,6 +114,11 @@ using VolumePtr = std::shared_ptr<IVolume>;
struct NamedSession;
struct BackgroundTaskSchedulingSettings;
#if USE_NLP
class SynonymsExtensions;
class Lemmatizers;
#endif
class Throttler;
using ThrottlerPtr = std::shared_ptr<Throttler>;
@ -191,11 +196,13 @@ private:
QueryStatus * process_list_elem = nullptr; /// For tracking total resource usage for query.
StorageID insertion_table = StorageID::createEmpty(); /// Saved insertion table in query context
bool is_distributed = false; /// Whether the current context is used for a distributed query
String default_format; /// Format, used when server formats data by itself and if query does not have FORMAT specification.
/// Thus, used in HTTP interface. If not specified - then some globally default format is used.
TemporaryTablesMapping external_tables_mapping;
Scalars scalars;
Scalars local_scalars;
/// Fields for distributed s3 function
std::optional<ReadTaskCallback> next_task_callback;
@ -454,6 +461,9 @@ public:
void addScalar(const String & name, const Block & block);
bool hasScalar(const String & name) const;
const Block * tryGetLocalScalar(const String & name) const;
void addLocalScalar(const String & name, const Block & block);
const QueryAccessInfo & getQueryAccessInfo() const { return query_access_info; }
void addQueryAccessInfo(
const String & quoted_database_name,
@ -500,6 +510,9 @@ public:
void setInsertionTable(StorageID db_and_table) { insertion_table = std::move(db_and_table); }
const StorageID & getInsertionTable() const { return insertion_table; }
void setDistributed(bool is_distributed_) { is_distributed = is_distributed_; }
bool isDistributed() const { return is_distributed; }
String getDefaultFormat() const; /// If default_format is not specified, some global default format is returned.
void setDefaultFormat(const String & name);
@ -534,6 +547,11 @@ public:
void tryCreateEmbeddedDictionaries() const;
void loadDictionaries(const Poco::Util::AbstractConfiguration & config);
#if USE_NLP
SynonymsExtensions & getSynonymsExtensions() const;
Lemmatizers & getLemmatizers() const;
#endif
void setExternalModelsConfig(const ConfigurationPtr & config, const std::string & config_name = "models_config");
/// I/O formats.


@ -126,6 +126,7 @@ ExpressionAnalyzerData::~ExpressionAnalyzerData() = default;
ExpressionAnalyzer::ExtractedSettings::ExtractedSettings(const Settings & settings_)
: use_index_for_in_with_subqueries(settings_.use_index_for_in_with_subqueries)
, size_limits_for_set(settings_.max_rows_in_set, settings_.max_bytes_in_set, settings_.set_overflow_mode)
, distributed_group_by_no_merge(settings_.distributed_group_by_no_merge)
{}
ExpressionAnalyzer::~ExpressionAnalyzer() = default;
@ -247,21 +248,25 @@ void ExpressionAnalyzer::analyzeAggregation()
if (!node)
throw Exception("Unknown identifier (in GROUP BY): " + column_name, ErrorCodes::UNKNOWN_IDENTIFIER);
/// Constant expressions have non-null column pointer at this stage.
if (node->column && isColumnConst(*node->column))
/// Only remove constant keys if this is the initiator or distributed_group_by_no_merge is enabled.
if (getContext()->getClientInfo().distributed_depth == 0 || settings.distributed_group_by_no_merge > 0)
{
select_query->group_by_with_constant_keys = true;
/// But don't remove last key column if no aggregate functions, otherwise aggregation will not work.
if (!aggregate_descriptions.empty() || size > 1)
/// Constant expressions have non-null column pointer at this stage.
if (node->column && isColumnConst(*node->column))
{
if (i + 1 < static_cast<ssize_t>(size))
group_asts[i] = std::move(group_asts.back());
select_query->group_by_with_constant_keys = true;
group_asts.pop_back();
/// But don't remove last key column if no aggregate functions, otherwise aggregation will not work.
if (!aggregate_descriptions.empty() || size > 1)
{
if (i + 1 < static_cast<ssize_t>(size))
group_asts[i] = std::move(group_asts.back());
--i;
continue;
group_asts.pop_back();
--i;
continue;
}
}
}
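A sketch of the case this guards against: on a shard, a GROUP BY key such as _shard_num is constant and used to be dropped, but the initiator still needs it as a real key to merge the per-shard intermediate blocks.
SELECT _shard_num, count()
FROM cluster('two_shards', system.one)
GROUP BY _shard_num;
-- With the old behavior the shards removed the constant key, producing
-- intermediate headers the initiator could not merge by shard.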


@ -90,6 +90,7 @@ private:
{
const bool use_index_for_in_with_subqueries;
const SizeLimits size_limits_for_set;
const UInt64 distributed_group_by_no_merge;
ExtractedSettings(const Settings & settings_);
};


@ -4,6 +4,7 @@
#include <Interpreters/IInterpreter.h>
#include <Interpreters/SelectQueryOptions.h>
#include <Parsers/IAST_fwd.h>
#include <DataTypes/DataTypesNumber.h>
namespace DB
{
@ -16,6 +17,14 @@ public:
, options(options_)
, max_streams(context->getSettingsRef().max_threads)
{
if (options.shard_num)
context->addLocalScalar(
"_shard_num",
Block{{DataTypeUInt32().createColumnConst(1, *options.shard_num), std::make_shared<DataTypeUInt32>(), "_shard_num"}});
if (options.shard_count)
context->addLocalScalar(
"_shard_count",
Block{{DataTypeUInt32().createColumnConst(1, *options.shard_count), std::make_shared<DataTypeUInt32>(), "_shard_count"}});
}
virtual void buildQueryPlan(QueryPlan & query_plan) = 0;


@ -387,6 +387,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
options, joined_tables.tablesWithColumns(), required_result_column_names, table_join);
query_info.syntax_analyzer_result = syntax_analyzer_result;
context->setDistributed(syntax_analyzer_result->is_remote_storage);
if (storage && !query.final() && storage->needRewriteQueryWithFinal(syntax_analyzer_result->requiredSourceColumns()))
query.setFinal();


@ -0,0 +1,100 @@
#if !defined(ARCADIA_BUILD)
# include "config_core.h"
#endif
#if USE_NLP
#include <Common/Exception.h>
#include <Interpreters/Lemmatizers.h>
#include <RdrLemmatizer.h>
#include <vector>
#include <filesystem>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
extern const int INVALID_CONFIG_PARAMETER;
}
class Lemmatizer : public ILemmatizer
{
private:
RdrLemmatizer lemmatizer;
public:
explicit Lemmatizer(const String & path) : lemmatizer(path.data()) {}
TokenPtr lemmatize(const char * token) override
{
return TokenPtr(lemmatizer.Lemmatize(token));
}
};
/// Duplicate of code from StringUtils.h. Copied here to avoid extra dependencies.
static bool startsWith(const std::string & s, const char * prefix)
{
return s.size() >= strlen(prefix) && 0 == memcmp(s.data(), prefix, strlen(prefix));
}
Lemmatizers::Lemmatizers(const Poco::Util::AbstractConfiguration & config)
{
String prefix = "lemmatizers";
Poco::Util::AbstractConfiguration::Keys keys;
if (!config.has(prefix))
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "No lemmatizers specified in server config on prefix '{}'", prefix);
config.keys(prefix, keys);
for (const auto & key : keys)
{
if (startsWith(key, "lemmatizer"))
{
const auto & lemm_name = config.getString(prefix + "." + key + ".lang", "");
const auto & lemm_path = config.getString(prefix + "." + key + ".path", "");
if (lemm_name.empty())
throw Exception("Lemmatizer language in config is not specified here: " + prefix + "." + key + ".lang",
ErrorCodes::INVALID_CONFIG_PARAMETER);
if (lemm_path.empty())
throw Exception("Path to lemmatizer in config is not specified here: " + prefix + "." + key + ".path",
ErrorCodes::INVALID_CONFIG_PARAMETER);
paths[lemm_name] = lemm_path;
}
else
throw Exception("Unknown element in config: " + prefix + "." + key + ", must be 'lemmatizer'",
ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
}
}
Lemmatizers::LemmPtr Lemmatizers::getLemmatizer(const String & name)
{
std::lock_guard guard(mutex);
if (lemmatizers.find(name) != lemmatizers.end())
return lemmatizers[name];
if (paths.find(name) != paths.end())
{
if (!std::filesystem::exists(paths[name]))
throw Exception("Incorrect path to lemmatizer: " + paths[name],
ErrorCodes::INVALID_CONFIG_PARAMETER);
lemmatizers[name] = std::make_shared<Lemmatizer>(paths[name]);
return lemmatizers[name];
}
throw Exception("Lemmatizer named: '" + name + "' is not found",
ErrorCodes::INVALID_CONFIG_PARAMETER);
}
}
#endif


@ -0,0 +1,48 @@
#pragma once
#if !defined(ARCADIA_BUILD)
# include "config_core.h"
#endif
#if USE_NLP
#include <common/types.h>
#include <Poco/Util/Application.h>
#include <mutex>
#include <unordered_map>
namespace DB
{
class ILemmatizer
{
public:
using TokenPtr = std::shared_ptr<char []>;
virtual TokenPtr lemmatize(const char * token) = 0;
virtual ~ILemmatizer() = default;
};
class Lemmatizers
{
public:
using LemmPtr = std::shared_ptr<ILemmatizer>;
private:
std::mutex mutex;
std::unordered_map<String, LemmPtr> lemmatizers;
std::unordered_map<String, String> paths;
public:
explicit Lemmatizers(const Poco::Util::AbstractConfiguration & config);
LemmPtr getLemmatizer(const String & name);
};
}
#endif


@ -6,6 +6,7 @@
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTKillQueryQuery.h>
#include <Parsers/queryNormalization.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Common/typeid_cast.h>
#include <Common/Exception.h>
#include <Common/CurrentThread.h>
@ -297,7 +298,10 @@ QueryStatus::QueryStatus(
{
}
QueryStatus::~QueryStatus() = default;
QueryStatus::~QueryStatus()
{
assert(executors.empty());
}
void QueryStatus::setQueryStreams(const BlockIO & io)
{
@ -351,6 +355,11 @@ CancellationCode QueryStatus::cancelQuery(bool kill)
BlockInputStreamPtr input_stream;
BlockOutputStreamPtr output_stream;
SCOPE_EXIT({
std::lock_guard lock(query_streams_mutex);
for (auto * e : executors)
e->cancel();
});
if (tryGetQueryStreams(input_stream, output_stream))
{
@ -366,6 +375,20 @@ CancellationCode QueryStatus::cancelQuery(bool kill)
return CancellationCode::CancelSent;
}
void QueryStatus::addPipelineExecutor(PipelineExecutor * e)
{
std::lock_guard lock(query_streams_mutex);
assert(std::find(executors.begin(), executors.end(), e) == executors.end());
executors.push_back(e);
}
void QueryStatus::removePipelineExecutor(PipelineExecutor * e)
{
std::lock_guard lock(query_streams_mutex);
assert(std::find(executors.begin(), executors.end(), e) != executors.end());
std::erase_if(executors, [e](PipelineExecutor * x) { return x == e; });
}
void QueryStatus::setUserProcessList(ProcessListForUser * user_process_list_)
{


@ -22,6 +22,7 @@
#include <mutex>
#include <shared_mutex>
#include <unordered_map>
#include <vector>
namespace CurrentMetrics
@ -34,6 +35,7 @@ namespace DB
struct Settings;
class IAST;
class PipelineExecutor;
struct ProcessListForUser;
class QueryStatus;
@ -109,6 +111,9 @@ protected:
BlockInputStreamPtr query_stream_in;
BlockOutputStreamPtr query_stream_out;
/// Array of PipelineExecutors to be cancelled when cancelQuery() is called
std::vector<PipelineExecutor *> executors;
enum QueryStreamsStatus
{
NotInitialized,
@ -183,6 +188,12 @@ public:
CancellationCode cancelQuery(bool kill);
bool isKilled() const { return is_killed; }
/// Adds a pipeline executor to the QueryStatus
void addPipelineExecutor(PipelineExecutor * e);
/// Removes a pipeline executor from the QueryStatus
void removePipelineExecutor(PipelineExecutor * e);
};
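A hedged usage note: with executors registered in QueryStatus, cancelling a query now reaches its running PipelineExecutors directly instead of only the legacy streams (the query_id below is hypothetical):
KILL QUERY WHERE query_id = 'some-query-id' ASYNC;
-- cancelQuery() walks the registered executors and calls cancel() on each
-- via the SCOPE_EXIT block added above.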


@ -1,6 +1,7 @@
#pragma once
#include <Core/QueryProcessingStage.h>
#include <optional>
namespace DB
{
@ -45,6 +46,12 @@ struct SelectQueryOptions
bool is_subquery = false; // non-subquery can also have subquery_depth > 0, e.g. insert select
bool with_all_cols = false; /// asterisk include materialized and aliased columns
/// These two fields are used to evaluate the shardNum() and shardCount() functions when
/// prefer_localhost_replica == 1 and the local instance is selected. They are needed because a local
/// instance might hold multiple shards, while scalars can only hold one value.
std::optional<UInt32> shard_num;
std::optional<UInt32> shard_count;
SelectQueryOptions(
QueryProcessingStage::Enum stage = QueryProcessingStage::Complete,
size_t depth = 0,
@ -124,6 +131,13 @@ struct SelectQueryOptions
with_all_cols = value;
return *this;
}
SelectQueryOptions & setShardInfo(UInt32 shard_num_, UInt32 shard_count_)
{
shard_num = shard_num_;
shard_count = shard_count_;
return *this;
}
};
}


@ -0,0 +1,157 @@
#if !defined(ARCADIA_BUILD)
# include "config_core.h"
#endif
#if USE_NLP
#include <Common/Exception.h>
#include <Interpreters/SynonymsExtensions.h>
#include <fstream>
#include <list>
#include <boost/algorithm/string.hpp>
#include <wnb/core/wordnet.hh>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
extern const int INVALID_CONFIG_PARAMETER;
}
class PlainSynonymsExtension : public ISynonymsExtension
{
private:
using Container = std::list<Synset>;
using LookupTable = std::unordered_map<std::string_view, Synset *>;
Container synsets;
LookupTable table;
public:
explicit PlainSynonymsExtension(const String & path)
{
std::ifstream file(path);
if (!file.is_open())
throw Exception("Cannot find synonyms extension at: " + path,
ErrorCodes::INVALID_CONFIG_PARAMETER);
String line;
while (std::getline(file, line))
{
Synset synset;
boost::split(synset, line, boost::is_any_of("\t "));
if (!synset.empty())
{
synsets.emplace_back(std::move(synset));
for (const auto &word : synsets.back())
table[word] = &synsets.back();
}
}
}
const Synset * getSynonyms(std::string_view token) const override
{
auto it = table.find(token);
if (it != table.end())
return (*it).second;
return nullptr;
}
};
class WordnetSynonymsExtension : public ISynonymsExtension
{
private:
wnb::wordnet wn;
public:
explicit WordnetSynonymsExtension(const String & path) : wn(path) {}
const Synset * getSynonyms(std::string_view token) const override
{
return wn.get_synset(std::string(token));
}
};
/// Duplicate of code from StringUtils.h. Copied here to avoid extra dependencies.
static bool startsWith(const std::string & s, const char * prefix)
{
return s.size() >= strlen(prefix) && 0 == memcmp(s.data(), prefix, strlen(prefix));
}
SynonymsExtensions::SynonymsExtensions(const Poco::Util::AbstractConfiguration & config)
{
String prefix = "synonyms_extensions";
Poco::Util::AbstractConfiguration::Keys keys;
if (!config.has(prefix))
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER,
"No synonims extensions specified in server config on prefix '{}'", prefix);
config.keys(prefix, keys);
for (const auto & key : keys)
{
if (startsWith(key, "extension"))
{
const auto & ext_name = config.getString(prefix + "." + key + ".name", "");
const auto & ext_path = config.getString(prefix + "." + key + ".path", "");
const auto & ext_type = config.getString(prefix + "." + key + ".type", "");
if (ext_name.empty())
throw Exception("Extension name in config is not specified here: " + prefix + "." + key + ".name",
ErrorCodes::INVALID_CONFIG_PARAMETER);
if (ext_path.empty())
throw Exception("Extension path in config is not specified here: " + prefix + "." + key + ".path",
ErrorCodes::INVALID_CONFIG_PARAMETER);
if (ext_type.empty())
throw Exception("Extension type in config is not specified here: " + prefix + "." + key + ".type",
ErrorCodes::INVALID_CONFIG_PARAMETER);
if (ext_type != "plain" && ext_type != "wordnet")
throw Exception("Unknown extension type in config: " + prefix + "." + key + ".type, must be 'plain' or 'wordnet'",
ErrorCodes::INVALID_CONFIG_PARAMETER);
info[ext_name].path = ext_path;
info[ext_name].type = ext_type;
}
else
throw Exception("Unknown element in config: " + prefix + "." + key + ", must be 'extension'",
ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
}
}
SynonymsExtensions::ExtPtr SynonymsExtensions::getExtension(const String & name)
{
std::lock_guard guard(mutex);
if (extensions.find(name) != extensions.end())
return extensions[name];
if (info.find(name) != info.end())
{
const Info & ext_info = info[name];
if (ext_info.type == "plain")
extensions[name] = std::make_shared<PlainSynonymsExtension>(ext_info.path);
else if (ext_info.type == "wordnet")
extensions[name] = std::make_shared<WordnetSynonymsExtension>(ext_info.path);
else
throw Exception("Unknown extension type: " + ext_info.type, ErrorCodes::LOGICAL_ERROR);
return extensions[name];
}
throw Exception("Extension named: '" + name + "' is not found",
ErrorCodes::INVALID_CONFIG_PARAMETER);
}
}
#endif


@ -0,0 +1,57 @@
#pragma once
#if !defined(ARCADIA_BUILD)
# include "config_core.h"
#endif
#if USE_NLP
#include <common/types.h>
#include <Poco/Util/Application.h>
#include <memory>
#include <mutex>
#include <string_view>
#include <vector>
#include <unordered_map>
namespace DB
{
class ISynonymsExtension
{
public:
using Synset = std::vector<String>;
virtual const Synset * getSynonyms(std::string_view token) const = 0;
virtual ~ISynonymsExtension() = default;
};
class SynonymsExtensions
{
public:
using ExtPtr = std::shared_ptr<ISynonymsExtension>;
explicit SynonymsExtensions(const Poco::Util::AbstractConfiguration & config);
ExtPtr getExtension(const String & name);
private:
struct Info
{
String path;
String type;
};
using ExtContainer = std::unordered_map<String, ExtPtr>;
using InfoContainer = std::unordered_map<String, Info>;
std::mutex mutex;
ExtContainer extensions;
InfoContainer info;
};
}
#endif


@ -108,6 +108,7 @@ SRCS(
JoinSwitcher.cpp
JoinToSubqueryTransformVisitor.cpp
JoinedTables.cpp
Lemmatizers.cpp
LogicalExpressionsOptimizer.cpp
MarkTableIdentifiersVisitor.cpp
MergeJoin.cpp
@ -145,6 +146,7 @@ SRCS(
SortedBlocksWriter.cpp
StorageID.cpp
SubqueryForSet.cpp
SynonymsExtensions.cpp
SystemLog.cpp
TableJoin.cpp
TablesStatus.cpp


@ -45,6 +45,8 @@ PipelineExecutor::PipelineExecutor(Processors & processors_, QueryStatus * elem)
try
{
graph = std::make_unique<ExecutingGraph>(processors);
if (process_list_element)
process_list_element->addPipelineExecutor(this);
}
catch (Exception & exception)
{
@ -59,6 +61,12 @@ PipelineExecutor::PipelineExecutor(Processors & processors_, QueryStatus * elem)
}
}
PipelineExecutor::~PipelineExecutor()
{
if (process_list_element)
process_list_element->removePipelineExecutor(this);
}
void PipelineExecutor::addChildlessProcessorsToStack(Stack & stack)
{
UInt64 num_processors = processors.size();


@ -31,6 +31,7 @@ public:
///
/// Explicit graph representation is built in constructor. Throws if graph is not correct.
explicit PipelineExecutor(Processors & processors_, QueryStatus * elem = nullptr);
~PipelineExecutor();
/// Execute pipeline in multiple threads. Must be called once.
/// In case of exception during execution throws any occurred.
@ -127,7 +128,7 @@ private:
ProcessorsMap processors_map;
/// Now it's used to check if query was killed.
QueryStatus * process_list_element = nullptr;
QueryStatus * const process_list_element = nullptr;
/// Graph related methods.
bool expandPipeline(Stack & stack, UInt64 pid);


@ -67,13 +67,16 @@ static std::unique_ptr<QueryPlan> createLocalPlan(
const ASTPtr & query_ast,
const Block & header,
ContextPtr context,
QueryProcessingStage::Enum processed_stage)
QueryProcessingStage::Enum processed_stage,
UInt32 shard_num,
UInt32 shard_count)
{
checkStackSize();
auto query_plan = std::make_unique<QueryPlan>();
InterpreterSelectQuery interpreter(query_ast, context, SelectQueryOptions(processed_stage));
InterpreterSelectQuery interpreter(
query_ast, context, SelectQueryOptions(processed_stage).setShardInfo(shard_num, shard_count));
interpreter.buildQueryPlan(*query_plan);
addConvertingActions(*query_plan, header);
@ -92,7 +95,8 @@ ReadFromRemote::ReadFromRemote(
ThrottlerPtr throttler_,
Scalars scalars_,
Tables external_tables_,
Poco::Logger * log_)
Poco::Logger * log_,
UInt32 shard_count_)
: ISourceStep(DataStream{.header = std::move(header_)})
, shards(std::move(shards_))
, stage(stage_)
@ -103,6 +107,7 @@ ReadFromRemote::ReadFromRemote(
, scalars(std::move(scalars_))
, external_tables(std::move(external_tables_))
, log(log_)
, shard_count(shard_count_)
{
}
@ -119,12 +124,12 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::IStreamFacto
}
auto lazily_create_stream = [
pool = shard.pool, shard_num = shard.shard_num, query = shard.query, header = shard.header,
pool = shard.pool, shard_num = shard.shard_num, shard_count = shard_count, query = shard.query, header = shard.header,
context = context, throttler = throttler,
main_table = main_table, table_func_ptr = table_func_ptr,
scalars = scalars, external_tables = external_tables,
stage = stage, local_delay = shard.local_delay,
add_agg_info, add_totals, add_extremes, async_read]()
add_agg_info, add_totals, add_extremes, async_read]() mutable
-> Pipe
{
auto current_settings = context->getSettingsRef();
@ -157,7 +162,7 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::IStreamFacto
if (try_results.empty() || local_delay < max_remote_delay)
{
auto plan = createLocalPlan(query, header, context, stage);
auto plan = createLocalPlan(query, header, context, stage, shard_num, shard_count);
return QueryPipeline::getPipe(std::move(*plan->buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(context),
BuildQueryPipelineSettings::fromContext(context))));
@ -171,6 +176,8 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::IStreamFacto
String query_string = formattedAST(query);
scalars["_shard_num"]
= Block{{DataTypeUInt32().createColumnConst(1, shard_num), std::make_shared<DataTypeUInt32>(), "_shard_num"}};
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
pool, std::move(connections), query_string, header, context, throttler, scalars, external_tables, stage);
@ -197,6 +204,8 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::IStreamFactory::
String query_string = formattedAST(shard.query);
scalars["_shard_num"]
= Block{{DataTypeUInt32().createColumnConst(1, shard.shard_num), std::make_shared<DataTypeUInt32>(), "_shard_num"}};
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
shard.pool, query_string, shard.header, context, throttler, scalars, external_tables, stage);
remote_query_executor->setLogger(log);


@ -29,7 +29,8 @@ public:
ThrottlerPtr throttler_,
Scalars scalars_,
Tables external_tables_,
Poco::Logger * log_);
Poco::Logger * log_,
UInt32 shard_count_);
String getName() const override { return "ReadFromRemote"; }
@ -50,6 +51,7 @@ private:
Poco::Logger * log;
UInt32 shard_count;
void addLazyPipe(Pipes & pipes, const ClusterProxy::IStreamFactory::Shard & shard);
void addPipe(Pipes & pipes, const ClusterProxy::IStreamFactory::Shard & shard);
};


@ -4124,7 +4124,8 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
candidate.before_aggregation = analysis_result.before_aggregation->clone();
auto required_columns = candidate.before_aggregation->foldActionsByProjection(keys, projection.sample_block_for_keys);
if (required_columns.empty() && !keys.empty())
// TODO Let's find out the exact required_columns for keys.
if (required_columns.empty() && (!keys.empty() && !candidate.before_aggregation->getRequiredColumns().empty()))
continue;
if (analysis_result.optimize_aggregation_in_order)


@ -123,7 +123,7 @@
<source>
<executable>
<command>echo "1\tValue"</command>
<command>printf "1\tValue\n"</command>
<format>TabSeparated</format>
<implicit_key>false</implicit_key>
</executable>
@ -197,7 +197,7 @@
<source>
<executable>
<command>echo "1\tFirstKey\tValue"</command>
<command>printf "1\tFirstKey\tValue\n"</command>
<format>TabSeparated</format>
<implicit_key>false</implicit_key>
</executable>


@ -0,0 +1,22 @@
<?xml version="1.0"?>
<yandex>
<synonyms_extensions>
<extension>
<name>en</name>
<type>plain</type>
<path>/etc/clickhouse-server/dictionaries/ext-en.txt</path>
</extension>
<extension>
<name>ru</name>
<type>plain</type>
<path>/etc/clickhouse-server/dictionaries/ext-ru.txt</path>
</extension>
</synonyms_extensions>
<lemmatizers>
<lemmatizer>
<lang>en</lang>
<path>/etc/clickhouse-server/dictionaries/lem-en.bin</path>
</lemmatizer>
</lemmatizers>
</yandex>


@ -0,0 +1,4 @@
important big critical crucial essential
happy cheerful delighted ecstatic
however nonetheless but yet
quiz query check exam


@ -0,0 +1,4 @@
важный большой высокий хороший главный
веселый счастливый живой яркий смешной
хотя однако но правда
экзамен испытание проверка

Binary file not shown.


@ -0,0 +1,47 @@
import os
import sys

import pytest

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))

from helpers.cluster import ClickHouseCluster

cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance', main_configs=['configs/dicts_config.xml'])


def copy_file_to_container(local_path, dist_path, container_id):
    os.system("docker cp {local} {cont_id}:{dist}".format(local=local_path, cont_id=container_id, dist=dist_path))


@pytest.fixture(scope="module")
def start_cluster():
    try:
        cluster.start()
        copy_file_to_container(os.path.join(SCRIPT_DIR, 'dictionaries/.'), '/etc/clickhouse-server/dictionaries', instance.docker_id)

        yield cluster
    finally:
        cluster.shutdown()


def test_lemmatize(start_cluster):
    assert instance.query("SELECT lemmatize('en', 'wolves')", settings={"allow_experimental_nlp_functions": 1}) == "wolf\n"
    assert instance.query("SELECT lemmatize('en', 'dogs')", settings={"allow_experimental_nlp_functions": 1}) == "dog\n"
    assert instance.query("SELECT lemmatize('en', 'looking')", settings={"allow_experimental_nlp_functions": 1}) == "look\n"
    assert instance.query("SELECT lemmatize('en', 'took')", settings={"allow_experimental_nlp_functions": 1}) == "take\n"
    assert instance.query("SELECT lemmatize('en', 'imported')", settings={"allow_experimental_nlp_functions": 1}) == "import\n"
    assert instance.query("SELECT lemmatize('en', 'tokenized')", settings={"allow_experimental_nlp_functions": 1}) == "tokenize\n"
    assert instance.query("SELECT lemmatize('en', 'flown')", settings={"allow_experimental_nlp_functions": 1}) == "fly\n"


def test_synonyms_extensions(start_cluster):
    assert instance.query("SELECT synonyms('en', 'crucial')", settings={"allow_experimental_nlp_functions": 1}) == "['important','big','critical','crucial','essential']\n"
    assert instance.query("SELECT synonyms('en', 'cheerful')", settings={"allow_experimental_nlp_functions": 1}) == "['happy','cheerful','delighted','ecstatic']\n"
    assert instance.query("SELECT synonyms('en', 'yet')", settings={"allow_experimental_nlp_functions": 1}) == "['however','nonetheless','but','yet']\n"
    assert instance.query("SELECT synonyms('en', 'quiz')", settings={"allow_experimental_nlp_functions": 1}) == "['quiz','query','check','exam']\n"
    assert instance.query("SELECT synonyms('ru', 'главный')", settings={"allow_experimental_nlp_functions": 1}) == "['важный','большой','высокий','хороший','главный']\n"
    assert instance.query("SELECT synonyms('ru', 'веселый')", settings={"allow_experimental_nlp_functions": 1}) == "['веселый','счастливый','живой','яркий','смешной']\n"
    assert instance.query("SELECT synonyms('ru', 'правда')", settings={"allow_experimental_nlp_functions": 1}) == "['хотя','однако','но','правда']\n"
    assert instance.query("SELECT synonyms('ru', 'экзамен')", settings={"allow_experimental_nlp_functions": 1}) == "['экзамен','испытание','проверка']\n"

View File

@ -0,0 +1,16 @@
<yandex>
    <remote_servers>
        <two_shards>
            <shard>
                <replica>
                    <host>node1</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>node2</host>
                    <port>9000</port>
                </replica>
            </shard>
        </two_shards>
    </remote_servers>
</yandex>

View File

@ -0,0 +1,30 @@
import pytest

from helpers.cluster import ClickHouseCluster

cluster = ClickHouseCluster(__file__)

node1 = cluster.add_instance(
    "node1", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)
node2 = cluster.add_instance(
    "node2", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)


@pytest.fixture(scope="module")
def start_cluster():
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()


def test_remote(start_cluster):
    assert (
        node1.query(
            """select hostName() h, tcpPort() p, count() from clusterAllReplicas("two_shards", system.one) group by h, p order by h, p"""
        )
        == "node1\t9000\t1\nnode2\t9000\t1\n"
    )
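
Note that two_shards above actually defines a single shard with two replicas; clusterAllReplicas() addresses every replica as its own source, which is why the query returns one row per node. The plain cluster() table function would instead pick one replica per shard. A sketch of the contrast (which replica the first query picks is not fixed, and row order is not guaranteed without ORDER BY):

select hostName() h from cluster('two_shards', system.one);            -- one row: a single replica is chosen
select hostName() h from clusterAllReplicas('two_shards', system.one); -- two rows: node1 and node2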

20
tests/performance/nlp.xml Normal file
View File

@ -0,0 +1,20 @@
<test>
    <settings>
        <allow_experimental_nlp_functions>1</allow_experimental_nlp_functions>
    </settings>

    <preconditions>
        <table_exists>hits_100m_single</table_exists>
    </preconditions>

    <create_query>CREATE TABLE hits_100m_words (words Array(String), UserID UInt64) ENGINE Memory</create_query>
    <create_query>CREATE TABLE hits_100m_words_ws (words Array(String), UserID UInt64) ENGINE Memory</create_query>

    <query>INSERT INTO hits_100m_words SELECT splitByNonAlpha(SearchPhrase) AS words, UserID FROM hits_100m_single WHERE length(words) > 0</query>
    <query>INSERT INTO hits_100m_words_ws SELECT splitByWhitespace(SearchPhrase) AS words, UserID FROM hits_100m_single WHERE length(words) > 0</query>

    <query>SELECT arrayMap(x -> stem('ru', x), words) FROM hits_100m_words FORMAT Null</query>

    <drop_query>DROP TABLE IF EXISTS hits_100m_words</drop_query>
    <drop_query>DROP TABLE IF EXISTS hits_100m_words_ws</drop_query>
</test>
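
The timed query above applies stem() element-wise through arrayMap. A standalone sketch of the same pattern (expected stems per the stem test reference further below):

SET allow_experimental_nlp_functions = 1;
SELECT arrayMap(x -> stem('en', x), splitByNonAlpha('packing studied commonplace'));
-- ['pack','studi','commonplac']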

View File

@ -11,6 +11,4 @@ SELECT _shard_num + dummy s, count() FROM remote('127.0.0.{1,2}', system.one) GR
SELECT _shard_num FROM remote('127.0.0.{1,2}', system.one) ORDER BY _shard_num;
SELECT _shard_num s FROM remote('127.0.0.{1,2}', system.one) ORDER BY _shard_num;
SELECT _shard_num s, count() FROM remote('127.0.0.{1,2}', system.one) GROUP BY s order by s;
select materialize(_shard_num), * from remote('127.{1,2}', system.one) limit 1 by dummy format Null;
SELECT _shard_num, count() FROM remote('127.0.0.{1,2}', system.one) GROUP BY _shard_num order by _shard_num;

View File

@ -0,0 +1,8 @@
['It','is','quite','a','wonderful','day','isn','t','it']
['There','is','so','much','to','learn']
['22','00','email','yandex','ru']
['Токенизация','каких','либо','других','языков']
['It','is','quite','a','wonderful','day,','isn\'t','it?']
['There','is....','so','much','to','learn!']
['22:00','email@yandex.ru']
['Токенизация','каких-либо','других','языков?']

View File

@ -0,0 +1,11 @@
SET allow_experimental_nlp_functions = 1;
SELECT splitByNonAlpha('It is quite a wonderful day, isn\'t it?');
SELECT splitByNonAlpha('There is.... so much to learn!');
SELECT splitByNonAlpha('22:00 email@yandex.ru');
SELECT splitByNonAlpha('Токенизация каких-либо других языков?');
SELECT splitByWhitespace('It is quite a wonderful day, isn\'t it?');
SELECT splitByWhitespace('There is.... so much to learn!');
SELECT splitByWhitespace('22:00 email@yandex.ru');
SELECT splitByWhitespace('Токенизация каких-либо других языков?');
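
As the reference above shows, splitByNonAlpha treats every non-alphanumeric character as a separator (so punctuation, ':' and '@' are dropped), while splitByWhitespace splits only on whitespace and keeps punctuation attached. Both behaviours on one input (outputs per the reference above):

SELECT splitByNonAlpha('22:00 email@yandex.ru');   -- ['22','00','email','yandex','ru']
SELECT splitByWhitespace('22:00 email@yandex.ru'); -- ['22:00','email@yandex.ru']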

View File

@ -0,0 +1,21 @@
given
combinatori
collect
possibl
studi
commonplac
pack
комбинаторн
получ
огранич
конечн
максимальн
суммарн
стоимост
remplissag
valeur
maximis
dépass
intens
étudi
peuvent

View File

@ -0,0 +1,25 @@
SET allow_experimental_nlp_functions = 1;
SELECT stem('en', 'given');
SELECT stem('en', 'combinatorial');
SELECT stem('en', 'collection');
SELECT stem('en', 'possibility');
SELECT stem('en', 'studied');
SELECT stem('en', 'commonplace');
SELECT stem('en', 'packing');
SELECT stem('ru', 'комбинаторной');
SELECT stem('ru', 'получила');
SELECT stem('ru', 'ограничена');
SELECT stem('ru', 'конечной');
SELECT stem('ru', 'максимальной');
SELECT stem('ru', 'суммарный');
SELECT stem('ru', 'стоимостью');
SELECT stem('fr', 'remplissage');
SELECT stem('fr', 'valeur');
SELECT stem('fr', 'maximiser');
SELECT stem('fr', 'dépasser');
SELECT stem('fr', 'intensivement');
SELECT stem('fr', 'étudié');
SELECT stem('fr', 'peuvent');

View File

@ -0,0 +1,2 @@
finished test_01948_tcp_default default SELECT * FROM\n (\n SELECT a.name as n\n FROM\n (\n SELECT \'Name\' as name, number FROM system.numbers LIMIT 2000000\n ) AS a,\n (\n SELECT \'Name\' as name, number FROM system.numbers LIMIT 2000000\n ) as b\n GROUP BY n\n )\n LIMIT 20\n FORMAT Null
finished test_01948_http_default default SELECT * FROM\n (\n SELECT a.name as n\n FROM\n (\n SELECT \'Name\' as name, number FROM system.numbers LIMIT 2000000\n ) AS a,\n (\n SELECT \'Name\' as name, number FROM system.numbers LIMIT 2000000\n ) as b\n GROUP BY n\n )\n LIMIT 20\n FORMAT Null

View File

@ -0,0 +1,54 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
set -e -o pipefail
function wait_for_query_to_start()
{
while [[ $($CLICKHOUSE_CLIENT -q "SELECT count() FROM system.processes WHERE query_id = '$1'") == 0 ]]; do sleep 0.1; done
}
# TCP CLIENT
$CLICKHOUSE_CLIENT --max_execution_time 10 --query_id "test_01948_tcp_$CLICKHOUSE_DATABASE" -q \
"SELECT * FROM
(
SELECT a.name as n
FROM
(
SELECT 'Name' as name, number FROM system.numbers LIMIT 2000000
) AS a,
(
SELECT 'Name' as name, number FROM system.numbers LIMIT 2000000
) as b
GROUP BY n
)
LIMIT 20
FORMAT Null" > /dev/null 2>&1 &
wait_for_query_to_start "test_01948_tcp_$CLICKHOUSE_DATABASE"
$CLICKHOUSE_CLIENT --max_execution_time 10 -q "KILL QUERY WHERE query_id = 'test_01948_tcp_$CLICKHOUSE_DATABASE' SYNC"
# HTTP CLIENT
${CLICKHOUSE_CURL_COMMAND} -q --max-time 10 -sS "$CLICKHOUSE_URL&query_id=test_01948_http_$CLICKHOUSE_DATABASE" -d \
"SELECT * FROM
(
SELECT a.name as n
FROM
(
SELECT 'Name' as name, number FROM system.numbers LIMIT 2000000
) AS a,
(
SELECT 'Name' as name, number FROM system.numbers LIMIT 2000000
) as b
GROUP BY n
)
LIMIT 20
FORMAT Null" > /dev/null 2>&1 &
wait_for_query_to_start "test_01948_http_$CLICKHOUSE_DATABASE"
$CLICKHOUSE_CURL --max-time 10 -sS "$CLICKHOUSE_URL" -d "KILL QUERY WHERE query_id = 'test_01948_http_$CLICKHOUSE_DATABASE' SYNC"
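
The script relies on a common pattern: start the long query in the background, poll system.processes until its query_id shows up, then kill it. The two server-side statements at its core (the query_id value here is illustrative, taken from the reference file above):

SELECT count() FROM system.processes WHERE query_id = 'test_01948_tcp_default'; -- non-zero once the query is running
KILL QUERY WHERE query_id = 'test_01948_tcp_default' SYNC;                      -- SYNC returns only after the query has stopped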

View File

@ -0,0 +1,7 @@
0 0
1 3
2 3
3 3
1 3
2 3
3 3

View File

@ -0,0 +1,3 @@
select shardNum() n, shardCount() c;
select shardNum() n, shardCount() c from remote('127.0.0.{1,2,3}', system.one) order by n settings prefer_localhost_replica = 0;
select shardNum() n, shardCount() c from remote('127.0.0.{1,2,3}', system.one) order by n settings prefer_localhost_replica = 1;
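
shardNum() and shardCount() are evaluated per shard of a distributed query and return 0 outside one, which is exactly what the reference above encodes: '0 0' for the local query, then shard numbers 1-3 with a count of 3 for each remote() query. By analogy, a two-address variant (expected output is an assumption, not taken from the tests):

select shardNum() n, shardCount() c from remote('127.0.0.{1,2}', system.one) order by n;
-- 1  2
-- 2  2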