mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-03 13:02:00 +00:00
Merge branch 'master' of https://github.com/ClickHouse/ClickHouse into nanodbc
This commit is contained in:
commit
7a287e6fe9
2
.gitmodules
vendored
2
.gitmodules
vendored
@ -133,7 +133,7 @@
|
||||
url = https://github.com/unicode-org/icu.git
|
||||
[submodule "contrib/flatbuffers"]
|
||||
path = contrib/flatbuffers
|
||||
url = https://github.com/google/flatbuffers.git
|
||||
url = https://github.com/ClickHouse-Extras/flatbuffers.git
|
||||
[submodule "contrib/libc-headers"]
|
||||
path = contrib/libc-headers
|
||||
url = https://github.com/ClickHouse-Extras/libc-headers.git
|
||||
|
@ -1,6 +1,6 @@
|
||||
## ClickHouse release 21.4
|
||||
|
||||
### ClickHouse release 21.4.1 2021-04-08
|
||||
### ClickHouse release 21.4.1 2021-04-12
|
||||
|
||||
#### Backward Incompatible Change
|
||||
|
||||
|
@ -200,8 +200,8 @@ int IBridge::main(const std::vector<std::string> & /*args*/)
|
||||
http_params->setKeepAliveTimeout(keep_alive_timeout);
|
||||
|
||||
auto shared_context = Context::createShared();
|
||||
Context context(Context::createGlobal(shared_context.get()));
|
||||
context.makeGlobalContext();
|
||||
auto context = Context::createGlobal(shared_context.get());
|
||||
context->makeGlobalContext();
|
||||
|
||||
if (config().has("query_masking_rules"))
|
||||
SensitiveDataMasker::setInstance(std::make_unique<SensitiveDataMasker>(config(), "query_masking_rules"));
|
||||
|
@ -2,10 +2,11 @@
|
||||
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Server/HTTP/HTTPRequestHandlerFactory.h>
|
||||
#include <Poco/Util/ServerApplication.h>
|
||||
#include <Poco/Logger.h>
|
||||
#include <daemon/BaseDaemon.h>
|
||||
|
||||
#include <Poco/Logger.h>
|
||||
#include <Poco/Util/ServerApplication.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -29,9 +30,9 @@ protected:
|
||||
|
||||
int main(const std::vector<std::string> & args) override;
|
||||
|
||||
virtual const std::string bridgeName() const = 0;
|
||||
virtual std::string bridgeName() const = 0;
|
||||
|
||||
virtual HandlerFactoryPtr getHandlerFactoryPtr(Context & context) const = 0;
|
||||
virtual HandlerFactoryPtr getHandlerFactoryPtr(ContextPtr context) const = 0;
|
||||
|
||||
size_t keep_alive_timeout;
|
||||
|
||||
|
@ -9,6 +9,7 @@
|
||||
#include <common/getMemoryAmount.h>
|
||||
#include <common/logger_useful.h>
|
||||
|
||||
#include <Common/formatReadable.h>
|
||||
#include <Common/SymbolIndex.h>
|
||||
#include <Common/StackTrace.h>
|
||||
#include <Common/getNumberOfPhysicalCPUCores.h>
|
||||
|
@ -1,11 +1,11 @@
|
||||
set (DEFAULT_LIBS "-nodefaultlibs")
|
||||
|
||||
if (NOT COMPILER_CLANG)
|
||||
message (FATAL_ERROR "Darwin build is supported only for Clang")
|
||||
endif ()
|
||||
|
||||
set (DEFAULT_LIBS "${DEFAULT_LIBS} ${COVERAGE_OPTION} -lc -lm -lpthread -ldl")
|
||||
|
||||
if (COMPILER_GCC)
|
||||
set (DEFAULT_LIBS "${DEFAULT_LIBS} -lgcc_eh")
|
||||
endif ()
|
||||
|
||||
message(STATUS "Default libraries: ${DEFAULT_LIBS}")
|
||||
|
||||
set(CMAKE_CXX_STANDARD_LIBRARIES ${DEFAULT_LIBS})
|
||||
|
@ -1,3 +1,8 @@
|
||||
if (OS_DARWIN AND COMPILER_GCC)
|
||||
# AMQP-CPP requires libuv which cannot be built with GCC in macOS due to a bug: https://gcc.gnu.org/bugzilla/show_bug.cgi?id=93082
|
||||
set (ENABLE_AMQPCPP OFF CACHE INTERNAL "")
|
||||
endif()
|
||||
|
||||
option(ENABLE_AMQPCPP "Enalbe AMQP-CPP" ${ENABLE_LIBRARIES})
|
||||
|
||||
if (NOT ENABLE_AMQPCPP)
|
||||
|
@ -1,3 +1,8 @@
|
||||
if (OS_DARWIN AND COMPILER_GCC)
|
||||
# Cassandra requires libuv which cannot be built with GCC in macOS due to a bug: https://gcc.gnu.org/bugzilla/show_bug.cgi?id=93082
|
||||
set (ENABLE_CASSANDRA OFF CACHE INTERNAL "")
|
||||
endif()
|
||||
|
||||
option(ENABLE_CASSANDRA "Enable Cassandra" ${ENABLE_LIBRARIES})
|
||||
|
||||
if (NOT ENABLE_CASSANDRA)
|
||||
|
2
contrib/antlr4-runtime
vendored
2
contrib/antlr4-runtime
vendored
@ -1 +1 @@
|
||||
Subproject commit a2fa7b76e2ee16d2ad955e9214a90bbf79da66fc
|
||||
Subproject commit 672643e9a427ef803abf13bc8cb4989606553d64
|
2
contrib/boringssl
vendored
2
contrib/boringssl
vendored
@ -1 +1 @@
|
||||
Subproject commit fd9ce1a0406f571507068b9555d0b545b8a18332
|
||||
Subproject commit 83c1cda8a0224dc817cbad2966c7ed4acc35f02a
|
@ -16,7 +16,7 @@ endif()
|
||||
|
||||
if(CMAKE_COMPILER_IS_GNUCXX OR CLANG)
|
||||
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fvisibility=hidden -fno-common -fno-exceptions -fno-rtti")
|
||||
if(APPLE)
|
||||
if(APPLE AND CLANG)
|
||||
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -stdlib=libc++")
|
||||
endif()
|
||||
|
||||
|
2
contrib/flatbuffers
vendored
2
contrib/flatbuffers
vendored
@ -1 +1 @@
|
||||
Subproject commit 6df40a2471737b27271bdd9b900ab5f3aec746c7
|
||||
Subproject commit 22e3ffc66d2d7d72d1414390aa0f04ffd114a5a1
|
@ -121,12 +121,14 @@ target_include_directories(jemalloc SYSTEM PRIVATE
|
||||
target_compile_definitions(jemalloc PRIVATE -DJEMALLOC_NO_PRIVATE_NAMESPACE)
|
||||
|
||||
if (CMAKE_BUILD_TYPE_UC STREQUAL "DEBUG")
|
||||
target_compile_definitions(jemalloc PRIVATE -DJEMALLOC_DEBUG=1 -DJEMALLOC_PROF=1)
|
||||
target_compile_definitions(jemalloc PRIVATE -DJEMALLOC_DEBUG=1)
|
||||
endif ()
|
||||
|
||||
if (USE_UNWIND)
|
||||
target_compile_definitions (jemalloc PRIVATE -DJEMALLOC_PROF_LIBUNWIND=1)
|
||||
target_link_libraries (jemalloc PRIVATE unwind)
|
||||
endif ()
|
||||
target_compile_definitions(jemalloc PRIVATE -DJEMALLOC_PROF=1)
|
||||
|
||||
if (USE_UNWIND)
|
||||
target_compile_definitions (jemalloc PRIVATE -DJEMALLOC_PROF_LIBUNWIND=1)
|
||||
target_link_libraries (jemalloc PRIVATE unwind)
|
||||
endif ()
|
||||
|
||||
target_compile_options(jemalloc PRIVATE -Wno-redundant-decls)
|
||||
|
2
contrib/libcxx
vendored
2
contrib/libcxx
vendored
@ -1 +1 @@
|
||||
Subproject commit 8b80a151d12b98ffe2d0c22f7cec12c3b9ff88d7
|
||||
Subproject commit 2fa892f69acbaa40f8a18c6484854a6183a34482
|
@ -56,6 +56,11 @@ if (USE_UNWIND)
|
||||
target_compile_definitions(cxx PUBLIC -DSTD_EXCEPTION_HAS_STACK_TRACE=1)
|
||||
endif ()
|
||||
|
||||
# Override the deduced attribute support that causes error.
|
||||
if (OS_DARWIN AND COMPILER_GCC)
|
||||
add_compile_definitions(_LIBCPP_INIT_PRIORITY_MAX)
|
||||
endif ()
|
||||
|
||||
target_compile_options(cxx PUBLIC $<$<COMPILE_LANGUAGE:CXX>:-nostdinc++>)
|
||||
|
||||
# Third party library may have substandard code.
|
||||
|
@ -56,6 +56,27 @@ Default value: 150.
|
||||
|
||||
ClickHouse artificially executes `INSERT` longer (adds ‘sleep’) so that the background merge process can merge parts faster than they are added.
|
||||
|
||||
## inactive_parts_to_throw_insert {#inactive-parts-to-throw-insert}
|
||||
|
||||
If the number of inactive parts in a single partition more than the `inactive_parts_to_throw_insert` value, `INSERT` is interrupted with the `Too many inactive parts (N). Parts cleaning are processing significantly slower than inserts` exception.
|
||||
|
||||
|
||||
Possible values:
|
||||
|
||||
- Any positive integer.
|
||||
|
||||
Default value: 0 (unlimited).
|
||||
|
||||
## inactive_parts_to_delay_insert {#inactive-parts-to-delay-insert}
|
||||
|
||||
If the number of inactive parts in a single partition in the table at least that many the `inactive_parts_to_delay_insert` value, an `INSERT` artificially slows down. It is useful when a server fails to clean up parts quickly enough.
|
||||
|
||||
Possible values:
|
||||
|
||||
- Any positive integer.
|
||||
|
||||
Default value: 0 (unlimited).
|
||||
|
||||
## max_delay_to_insert {#max-delay-to-insert}
|
||||
|
||||
The value in seconds, which is used to calculate the `INSERT` delay, if the number of active parts in a single partition exceeds the [parts_to_delay_insert](#parts-to-delay-insert) value.
|
||||
|
@ -649,3 +649,65 @@ Result:
|
||||
- [List of XML and HTML character entity references](https://en.wikipedia.org/wiki/List_of_XML_and_HTML_character_entity_references)
|
||||
|
||||
|
||||
|
||||
## extractTextFromHTML {#extracttextfromhtml}
|
||||
|
||||
A function to extract text from HTML or XHTML.
|
||||
It does not necessarily 100% conform to any of the HTML, XML or XHTML standards, but the implementation is reasonably accurate and it is fast. The rules are the following:
|
||||
|
||||
1. Comments are skipped. Example: `<!-- test -->`. Comment must end with `-->`. Nested comments are not possible.
|
||||
Note: constructions like `<!-->` and `<!--->` are not valid comments in HTML but they are skipped by other rules.
|
||||
2. CDATA is pasted verbatim. Note: CDATA is XML/XHTML specific. But it is processed for "best-effort" approach.
|
||||
3. `script` and `style` elements are removed with all their content. Note: it is assumed that closing tag cannot appear inside content. For example, in JS string literal has to be escaped like `"<\/script>"`.
|
||||
Note: comments and CDATA are possible inside `script` or `style` - then closing tags are not searched inside CDATA. Example: `<script><![CDATA[</script>]]></script>`. But they are still searched inside comments. Sometimes it becomes complicated: `<script>var x = "<!--"; </script> var y = "-->"; alert(x + y);</script>`
|
||||
Note: `script` and `style` can be the names of XML namespaces - then they are not treated like usual `script` or `style` elements. Example: `<script:a>Hello</script:a>`.
|
||||
Note: whitespaces are possible after closing tag name: `</script >` but not before: `< / script>`.
|
||||
4. Other tags or tag-like elements are skipped without inner content. Example: `<a>.</a>`
|
||||
Note: it is expected that this HTML is illegal: `<a test=">"></a>`
|
||||
Note: it also skips something like tags: `<>`, `<!>`, etc.
|
||||
Note: tag without end is skipped to the end of input: `<hello `
|
||||
5. HTML and XML entities are not decoded. They must be processed by separate function.
|
||||
6. Whitespaces in the text are collapsed or inserted by specific rules.
|
||||
- Whitespaces at the beginning and at the end are removed.
|
||||
- Consecutive whitespaces are collapsed.
|
||||
- But if the text is separated by other elements and there is no whitespace, it is inserted.
|
||||
- It may cause unnatural examples: `Hello<b>world</b>`, `Hello<!-- -->world` - there is no whitespace in HTML, but the function inserts it. Also consider: `Hello<p>world</p>`, `Hello<br>world`. This behavior is reasonable for data analysis, e.g. to convert HTML to a bag of words.
|
||||
7. Also note that correct handling of whitespaces requires the support of `<pre></pre>` and CSS `display` and `white-space` properties.
|
||||
|
||||
**Syntax**
|
||||
|
||||
``` sql
|
||||
extractTextFromHTML(x)
|
||||
```
|
||||
|
||||
**Arguments**
|
||||
|
||||
- `x` — input text. [String](../../sql-reference/data-types/string.md).
|
||||
|
||||
**Returned value**
|
||||
|
||||
- Extracted text.
|
||||
|
||||
Type: [String](../../sql-reference/data-types/string.md).
|
||||
|
||||
**Example**
|
||||
|
||||
The first example contains several tags and a comment and also shows whitespace processing.
|
||||
The second example shows `CDATA` and `script` tag processing.
|
||||
In the third example text is extracted from the full HTML response received by the [url](../../sql-reference/table-functions/url.md) function.
|
||||
|
||||
Query:
|
||||
|
||||
``` sql
|
||||
SELECT extractTextFromHTML(' <p> A text <i>with</i><b>tags</b>. <!-- comments --> </p> ');
|
||||
SELECT extractTextFromHTML('<![CDATA[The content within <b>CDATA</b>]]> <script>alert("Script");</script>');
|
||||
SELECT extractTextFromHTML(html) FROM url('http://www.donothingfor2minutes.com/', RawBLOB, 'html String');
|
||||
```
|
||||
|
||||
Result:
|
||||
|
||||
``` text
|
||||
A text with tags .
|
||||
The content within <b>CDATA</b>
|
||||
Do Nothing for 2 Minutes 2:00
|
||||
```
|
||||
|
@ -5,6 +5,14 @@ toc_title: RENAME
|
||||
|
||||
# RENAME Statement {#misc_operations-rename}
|
||||
|
||||
## RENAME DATABASE {#misc_operations-rename_database}
|
||||
Renames database, support only for Atomic database engine
|
||||
|
||||
```
|
||||
RENAME DATABASE atomic_database1 TO atomic_database2 [ON CLUSTER cluster]
|
||||
```
|
||||
|
||||
## RENAME TABLE {#misc_operations-rename_table}
|
||||
Renames one or more tables.
|
||||
|
||||
``` sql
|
||||
|
@ -645,3 +645,66 @@ SELECT decodeXMLComponent('< Σ >');
|
||||
|
||||
- [Мнемоники в HTML](https://ru.wikipedia.org/wiki/%D0%9C%D0%BD%D0%B5%D0%BC%D0%BE%D0%BD%D0%B8%D0%BA%D0%B8_%D0%B2_HTML)
|
||||
|
||||
|
||||
|
||||
## extractTextFromHTML {#extracttextfromhtml}
|
||||
|
||||
Функция для извлечения текста из HTML или XHTML.
|
||||
Она не соответствует всем HTML, XML или XHTML стандартам на 100%, но ее реализация достаточно точная и быстрая. Правила обработки следующие:
|
||||
|
||||
1. Комментарии удаляются. Пример: `<!-- test -->`. Комментарий должен оканчиваться символами `-->`. Вложенные комментарии недопустимы.
|
||||
Примечание: конструкции наподобие `<!-->` и `<!--->` не являются допустимыми комментариями в HTML, но они будут удалены согласно другим правилам.
|
||||
2. Содержимое CDATA вставляется дословно. Примечание: формат CDATA специфичен для XML/XHTML. Но он обрабатывается всегда по принципу "наилучшего возможного результата".
|
||||
3. Элементы `script` и `style` удаляются вместе со всем содержимым. Примечание: предполагается, что закрывающий тег не может появиться внутри содержимого. Например, в JS строковый литерал должен быть экранирован как `"<\/script>"`.
|
||||
Примечание: комментарии и CDATA возможны внутри `script` или `style` - тогда закрывающие теги не ищутся внутри CDATA. Пример: `<script><![CDATA[</script>]]></script>`. Но они ищутся внутри комментариев. Иногда возникают сложные случаи: `<script>var x = "<!--"; </script> var y = "-->"; alert(x + y);</script>`
|
||||
Примечание: `script` и `style` могут быть названиями пространств имен XML - тогда они не обрабатываются как обычные элементы `script` или `style`. Пример: `<script:a>Hello</script:a>`.
|
||||
Примечание: пробелы возможны после имени закрывающего тега: `</script >`, но не перед ним: `< / script>`.
|
||||
4. Другие теги или элементы, подобные тегам, удаляются, а их внутреннее содержимое остается. Пример: `<a>.</a>`
|
||||
Примечание: ожидается, что такой HTML является недопустимым: `<a test=">"></a>`
|
||||
Примечание: функция также удаляет подобные тегам элементы: `<>`, `<!>`, и т. д.
|
||||
Примечание: если встречается тег без завершающего символа `>`, то удаляется этот тег и весь следующий за ним текст: `<hello `
|
||||
5. Мнемоники HTML и XML не декодируются. Они должны быть обработаны отдельной функцией.
|
||||
6. Пробелы в тексте удаляются и добавляются по следующим правилам:
|
||||
- Пробелы в начале и в конце извлеченного текста удаляются.
|
||||
- Несколько пробелов подряд заменяются одним пробелом.
|
||||
- Если текст разделен другими удаляемыми элементами и в этом месте нет пробела, он добавляется.
|
||||
- Это может привести к появлению неестественного написания, например: `Hello<b>world</b>`, `Hello<!-- -->world` — в HTML нет пробелов, но функция вставляет их. Также следует учитывать такие варианты написания: `Hello<p>world</p>`, `Hello<br>world`. Подобные результаты выполнения функции могут использоваться для анализа данных, например, для преобразования HTML-текста в набор используемых слов.
|
||||
7. Также обратите внимание, что правильная обработка пробелов требует поддержки `<pre></pre>` и свойств CSS `display` и `white-space`.
|
||||
|
||||
**Синтаксис**
|
||||
|
||||
``` sql
|
||||
extractTextFromHTML(x)
|
||||
```
|
||||
|
||||
**Аргументы**
|
||||
|
||||
- `x` — текст для обработки. [String](../../sql-reference/data-types/string.md).
|
||||
|
||||
**Возвращаемое значение**
|
||||
|
||||
- Извлеченный текст.
|
||||
|
||||
Тип: [String](../../sql-reference/data-types/string.md).
|
||||
|
||||
**Пример**
|
||||
|
||||
Первый пример содержит несколько тегов и комментарий. На этом примере также видно, как обрабатываются пробелы.
|
||||
Второй пример показывает обработку `CDATA` и тега `script`.
|
||||
В третьем примере текст выделяется из полного HTML ответа, полученного с помощью функции [url](../../sql-reference/table-functions/url.md).
|
||||
|
||||
Запрос:
|
||||
|
||||
``` sql
|
||||
SELECT extractTextFromHTML(' <p> A text <i>with</i><b>tags</b>. <!-- comments --> </p> ');
|
||||
SELECT extractTextFromHTML('<![CDATA[The content within <b>CDATA</b>]]> <script>alert("Script");</script>');
|
||||
SELECT extractTextFromHTML(html) FROM url('http://www.donothingfor2minutes.com/', RawBLOB, 'html String');
|
||||
```
|
||||
|
||||
Результат:
|
||||
|
||||
``` text
|
||||
A text with tags .
|
||||
The content within <b>CDATA</b>
|
||||
Do Nothing for 2 Minutes 2:00
|
||||
```
|
||||
|
@ -3,8 +3,16 @@ toc_priority: 48
|
||||
toc_title: RENAME
|
||||
---
|
||||
|
||||
# RENAME {#misc_operations-rename}
|
||||
# RENAME Statement {#misc_operations-rename}
|
||||
|
||||
## RENAME DATABASE {#misc_operations-rename_database}
|
||||
Переименование базы данных
|
||||
|
||||
```
|
||||
RENAME DATABASE atomic_database1 TO atomic_database2 [ON CLUSTER cluster]
|
||||
```
|
||||
|
||||
## RENAME TABLE {#misc_operations-rename_table}
|
||||
Переименовывает одну или несколько таблиц.
|
||||
|
||||
``` sql
|
||||
@ -12,5 +20,3 @@ RENAME TABLE [db11.]name11 TO [db12.]name12, [db21.]name21 TO [db22.]name22, ...
|
||||
```
|
||||
|
||||
Переименовывание таблицы является лёгкой операцией. Если вы указали после `TO` другую базу данных, то таблица будет перенесена в эту базу данных. При этом, директории с базами данных должны быть расположены в одной файловой системе (иначе возвращается ошибка). В случае переименования нескольких таблиц в одном запросе — это неатомарная операция, может выполнится частично, запросы в других сессиях могут получить ошибку `Table ... doesn't exist...`.
|
||||
|
||||
|
||||
|
@ -95,8 +95,8 @@ public:
|
||||
comparison_info_total.emplace_back(std::make_shared<Stats>());
|
||||
}
|
||||
|
||||
global_context.makeGlobalContext();
|
||||
global_context.setSettings(settings);
|
||||
global_context->makeGlobalContext();
|
||||
global_context->setSettings(settings);
|
||||
|
||||
std::cerr << std::fixed << std::setprecision(3);
|
||||
|
||||
@ -159,7 +159,7 @@ private:
|
||||
bool print_stacktrace;
|
||||
const Settings & settings;
|
||||
SharedContextHolder shared_context;
|
||||
Context global_context;
|
||||
ContextPtr global_context;
|
||||
QueryProcessingStage::Enum query_processing_stage;
|
||||
|
||||
/// Don't execute new queries after timelimit or SIGINT or exception
|
||||
|
@ -191,7 +191,7 @@ private:
|
||||
bool has_vertical_output_suffix = false; /// Is \G present at the end of the query string?
|
||||
|
||||
SharedContextHolder shared_context = Context::createShared();
|
||||
Context context = Context::createGlobal(shared_context.get());
|
||||
ContextPtr context = Context::createGlobal(shared_context.get());
|
||||
|
||||
/// Buffer that reads from stdin in batch mode.
|
||||
ReadBufferFromFileDescriptor std_in {STDIN_FILENO};
|
||||
@ -274,20 +274,20 @@ private:
|
||||
|
||||
configReadClient(config(), home_path);
|
||||
|
||||
context.setApplicationType(Context::ApplicationType::CLIENT);
|
||||
context.setQueryParameters(query_parameters);
|
||||
context->setApplicationType(Context::ApplicationType::CLIENT);
|
||||
context->setQueryParameters(query_parameters);
|
||||
|
||||
/// settings and limits could be specified in config file, but passed settings has higher priority
|
||||
for (const auto & setting : context.getSettingsRef().allUnchanged())
|
||||
for (const auto & setting : context->getSettingsRef().allUnchanged())
|
||||
{
|
||||
const auto & name = setting.getName();
|
||||
if (config().has(name))
|
||||
context.setSetting(name, config().getString(name));
|
||||
context->setSetting(name, config().getString(name));
|
||||
}
|
||||
|
||||
/// Set path for format schema files
|
||||
if (config().has("format_schema_path"))
|
||||
context.setFormatSchemaPath(Poco::Path(config().getString("format_schema_path")).toString());
|
||||
context->setFormatSchemaPath(Poco::Path(config().getString("format_schema_path")).toString());
|
||||
|
||||
/// Initialize query_id_formats if any
|
||||
if (config().has("query_id_formats"))
|
||||
@ -538,15 +538,15 @@ private:
|
||||
else
|
||||
format = config().getString("format", is_interactive ? "PrettyCompact" : "TabSeparated");
|
||||
|
||||
format_max_block_size = config().getInt("format_max_block_size", context.getSettingsRef().max_block_size);
|
||||
format_max_block_size = config().getInt("format_max_block_size", context->getSettingsRef().max_block_size);
|
||||
|
||||
insert_format = "Values";
|
||||
|
||||
/// Setting value from cmd arg overrides one from config
|
||||
if (context.getSettingsRef().max_insert_block_size.changed)
|
||||
insert_format_max_block_size = context.getSettingsRef().max_insert_block_size;
|
||||
if (context->getSettingsRef().max_insert_block_size.changed)
|
||||
insert_format_max_block_size = context->getSettingsRef().max_insert_block_size;
|
||||
else
|
||||
insert_format_max_block_size = config().getInt("insert_format_max_block_size", context.getSettingsRef().max_insert_block_size);
|
||||
insert_format_max_block_size = config().getInt("insert_format_max_block_size", context->getSettingsRef().max_insert_block_size);
|
||||
|
||||
if (!is_interactive)
|
||||
{
|
||||
@ -555,7 +555,7 @@ private:
|
||||
ignore_error = config().getBool("ignore-error", false);
|
||||
}
|
||||
|
||||
ClientInfo & client_info = context.getClientInfo();
|
||||
ClientInfo & client_info = context->getClientInfo();
|
||||
client_info.setInitialQuery();
|
||||
client_info.quota_key = config().getString("quota_key", "");
|
||||
|
||||
@ -563,7 +563,7 @@ private:
|
||||
|
||||
/// Initialize DateLUT here to avoid counting time spent here as query execution time.
|
||||
const auto local_tz = DateLUT::instance().getTimeZone();
|
||||
if (!context.getSettingsRef().use_client_time_zone)
|
||||
if (!context->getSettingsRef().use_client_time_zone)
|
||||
{
|
||||
const auto & time_zone = connection->getServerTimezone(connection_parameters.timeouts);
|
||||
if (!time_zone.empty())
|
||||
@ -738,7 +738,7 @@ private:
|
||||
{
|
||||
auto query_id = config().getString("query_id", "");
|
||||
if (!query_id.empty())
|
||||
context.setCurrentQueryId(query_id);
|
||||
context->setCurrentQueryId(query_id);
|
||||
|
||||
nonInteractive();
|
||||
|
||||
@ -1038,7 +1038,7 @@ private:
|
||||
{
|
||||
Tokens tokens(this_query_begin, all_queries_end);
|
||||
IParser::Pos token_iterator(tokens,
|
||||
context.getSettingsRef().max_parser_depth);
|
||||
context->getSettingsRef().max_parser_depth);
|
||||
if (!token_iterator.isValid())
|
||||
{
|
||||
break;
|
||||
@ -1087,7 +1087,7 @@ private:
|
||||
if (ignore_error)
|
||||
{
|
||||
Tokens tokens(this_query_begin, all_queries_end);
|
||||
IParser::Pos token_iterator(tokens, context.getSettingsRef().max_parser_depth);
|
||||
IParser::Pos token_iterator(tokens, context->getSettingsRef().max_parser_depth);
|
||||
while (token_iterator->type != TokenType::Semicolon && token_iterator.isValid())
|
||||
++token_iterator;
|
||||
this_query_begin = token_iterator->end;
|
||||
@ -1133,7 +1133,7 @@ private:
|
||||
// beneficial so that we see proper trailing comments in "echo" and
|
||||
// server log.
|
||||
adjustQueryEnd(this_query_end, all_queries_end,
|
||||
context.getSettingsRef().max_parser_depth);
|
||||
context->getSettingsRef().max_parser_depth);
|
||||
|
||||
// full_query is the query + inline INSERT data + trailing comments
|
||||
// (the latter is our best guess for now).
|
||||
@ -1173,7 +1173,7 @@ private:
|
||||
{
|
||||
this_query_end = insert_ast->end;
|
||||
adjustQueryEnd(this_query_end, all_queries_end,
|
||||
context.getSettingsRef().max_parser_depth);
|
||||
context->getSettingsRef().max_parser_depth);
|
||||
}
|
||||
|
||||
// Now we know for sure where the query ends.
|
||||
@ -1290,7 +1290,7 @@ private:
|
||||
// Prints changed settings to stderr. Useful for debugging fuzzing failures.
|
||||
void printChangedSettings() const
|
||||
{
|
||||
const auto & changes = context.getSettingsRef().changes();
|
||||
const auto & changes = context->getSettingsRef().changes();
|
||||
if (!changes.empty())
|
||||
{
|
||||
fmt::print(stderr, "Changed settings: ");
|
||||
@ -1590,11 +1590,11 @@ private:
|
||||
if (is_interactive)
|
||||
{
|
||||
// Generate a new query_id
|
||||
context.setCurrentQueryId("");
|
||||
context->setCurrentQueryId("");
|
||||
for (const auto & query_id_format : query_id_formats)
|
||||
{
|
||||
writeString(query_id_format.first, std_out);
|
||||
writeString(fmt::format(query_id_format.second, fmt::arg("query_id", context.getCurrentQueryId())), std_out);
|
||||
writeString(fmt::format(query_id_format.second, fmt::arg("query_id", context->getCurrentQueryId())), std_out);
|
||||
writeChar('\n', std_out);
|
||||
std_out.next();
|
||||
}
|
||||
@ -1610,12 +1610,12 @@ private:
|
||||
{
|
||||
/// Temporarily apply query settings to context.
|
||||
std::optional<Settings> old_settings;
|
||||
SCOPE_EXIT_SAFE({ if (old_settings) context.setSettings(*old_settings); });
|
||||
SCOPE_EXIT_SAFE({ if (old_settings) context->setSettings(*old_settings); });
|
||||
auto apply_query_settings = [&](const IAST & settings_ast)
|
||||
{
|
||||
if (!old_settings)
|
||||
old_settings.emplace(context.getSettingsRef());
|
||||
context.applySettingsChanges(settings_ast.as<ASTSetQuery>()->changes);
|
||||
old_settings.emplace(context->getSettingsRef());
|
||||
context->applySettingsChanges(settings_ast.as<ASTSetQuery>()->changes);
|
||||
};
|
||||
const auto * insert = parsed_query->as<ASTInsertQuery>();
|
||||
if (insert && insert->settings_ast)
|
||||
@ -1653,7 +1653,7 @@ private:
|
||||
if (change.name == "profile")
|
||||
current_profile = change.value.safeGet<String>();
|
||||
else
|
||||
context.applySettingChange(change);
|
||||
context->applySettingChange(change);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1725,10 +1725,10 @@ private:
|
||||
connection->sendQuery(
|
||||
connection_parameters.timeouts,
|
||||
query_to_send,
|
||||
context.getCurrentQueryId(),
|
||||
context->getCurrentQueryId(),
|
||||
query_processing_stage,
|
||||
&context.getSettingsRef(),
|
||||
&context.getClientInfo(),
|
||||
&context->getSettingsRef(),
|
||||
&context->getClientInfo(),
|
||||
true);
|
||||
|
||||
sendExternalTables();
|
||||
@ -1766,10 +1766,10 @@ private:
|
||||
connection->sendQuery(
|
||||
connection_parameters.timeouts,
|
||||
query_to_send,
|
||||
context.getCurrentQueryId(),
|
||||
context->getCurrentQueryId(),
|
||||
query_processing_stage,
|
||||
&context.getSettingsRef(),
|
||||
&context.getClientInfo(),
|
||||
&context->getSettingsRef(),
|
||||
&context->getClientInfo(),
|
||||
true);
|
||||
|
||||
sendExternalTables();
|
||||
@ -1792,7 +1792,7 @@ private:
|
||||
ParserQuery parser(end);
|
||||
ASTPtr res;
|
||||
|
||||
const auto & settings = context.getSettingsRef();
|
||||
const auto & settings = context->getSettingsRef();
|
||||
size_t max_length = 0;
|
||||
if (!allow_multi_statements)
|
||||
max_length = settings.max_query_size;
|
||||
@ -1880,7 +1880,7 @@ private:
|
||||
current_format = insert->format;
|
||||
}
|
||||
|
||||
BlockInputStreamPtr block_input = context.getInputFormat(
|
||||
BlockInputStreamPtr block_input = context->getInputFormat(
|
||||
current_format, buf, sample, insert_format_max_block_size);
|
||||
|
||||
if (columns_description.hasDefaults())
|
||||
@ -2204,9 +2204,9 @@ private:
|
||||
|
||||
/// It is not clear how to write progress with parallel formatting. It may increase code complexity significantly.
|
||||
if (!need_render_progress)
|
||||
block_out_stream = context.getOutputStreamParallelIfPossible(current_format, *out_buf, block);
|
||||
block_out_stream = context->getOutputStreamParallelIfPossible(current_format, *out_buf, block);
|
||||
else
|
||||
block_out_stream = context.getOutputStream(current_format, *out_buf, block);
|
||||
block_out_stream = context->getOutputStream(current_format, *out_buf, block);
|
||||
|
||||
block_out_stream->writePrefix();
|
||||
}
|
||||
@ -2710,12 +2710,12 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
context.makeGlobalContext();
|
||||
context.setSettings(cmd_settings);
|
||||
context->makeGlobalContext();
|
||||
context->setSettings(cmd_settings);
|
||||
|
||||
/// Copy settings-related program options to config.
|
||||
/// TODO: Is this code necessary?
|
||||
for (const auto & setting : context.getSettingsRef().all())
|
||||
for (const auto & setting : context->getSettingsRef().all())
|
||||
{
|
||||
const auto & name = setting.getName();
|
||||
if (options.count(name))
|
||||
@ -2807,7 +2807,7 @@ public:
|
||||
{
|
||||
std::string traceparent = options["opentelemetry-traceparent"].as<std::string>();
|
||||
std::string error;
|
||||
if (!context.getClientInfo().client_trace_context.parseTraceparentHeader(
|
||||
if (!context->getClientInfo().client_trace_context.parseTraceparentHeader(
|
||||
traceparent, error))
|
||||
{
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS,
|
||||
@ -2818,7 +2818,7 @@ public:
|
||||
|
||||
if (options.count("opentelemetry-tracestate"))
|
||||
{
|
||||
context.getClientInfo().client_trace_context.tracestate =
|
||||
context->getClientInfo().client_trace_context.tracestate =
|
||||
options["opentelemetry-tracestate"].as<std::string>();
|
||||
}
|
||||
|
||||
|
@ -22,7 +22,7 @@ namespace ErrorCodes
|
||||
|
||||
void ClusterCopier::init()
|
||||
{
|
||||
auto zookeeper = context.getZooKeeper();
|
||||
auto zookeeper = getContext()->getZooKeeper();
|
||||
|
||||
task_description_watch_callback = [this] (const Coordination::WatchResponse & response)
|
||||
{
|
||||
@ -39,14 +39,14 @@ void ClusterCopier::init()
|
||||
task_cluster_initial_config = task_cluster_current_config;
|
||||
|
||||
task_cluster->loadTasks(*task_cluster_initial_config);
|
||||
context.setClustersConfig(task_cluster_initial_config, task_cluster->clusters_prefix);
|
||||
getContext()->setClustersConfig(task_cluster_initial_config, task_cluster->clusters_prefix);
|
||||
|
||||
/// Set up shards and their priority
|
||||
task_cluster->random_engine.seed(task_cluster->random_device());
|
||||
for (auto & task_table : task_cluster->table_tasks)
|
||||
{
|
||||
task_table.cluster_pull = context.getCluster(task_table.cluster_pull_name);
|
||||
task_table.cluster_push = context.getCluster(task_table.cluster_push_name);
|
||||
task_table.cluster_pull = getContext()->getCluster(task_table.cluster_pull_name);
|
||||
task_table.cluster_push = getContext()->getCluster(task_table.cluster_push_name);
|
||||
task_table.initShards(task_cluster->random_engine);
|
||||
}
|
||||
|
||||
@ -206,7 +206,7 @@ void ClusterCopier::uploadTaskDescription(const std::string & task_path, const s
|
||||
if (task_config_str.empty())
|
||||
return;
|
||||
|
||||
auto zookeeper = context.getZooKeeper();
|
||||
auto zookeeper = getContext()->getZooKeeper();
|
||||
|
||||
zookeeper->createAncestors(local_task_description_path);
|
||||
auto code = zookeeper->tryCreate(local_task_description_path, task_config_str, zkutil::CreateMode::Persistent);
|
||||
@ -219,7 +219,7 @@ void ClusterCopier::uploadTaskDescription(const std::string & task_path, const s
|
||||
|
||||
void ClusterCopier::reloadTaskDescription()
|
||||
{
|
||||
auto zookeeper = context.getZooKeeper();
|
||||
auto zookeeper = getContext()->getZooKeeper();
|
||||
task_description_watch_zookeeper = zookeeper;
|
||||
|
||||
String task_config_str;
|
||||
@ -235,7 +235,7 @@ void ClusterCopier::reloadTaskDescription()
|
||||
|
||||
/// Setup settings
|
||||
task_cluster->reloadSettings(*config);
|
||||
context.setSettings(task_cluster->settings_common);
|
||||
getContext()->setSettings(task_cluster->settings_common);
|
||||
|
||||
task_cluster_current_config = config;
|
||||
task_description_current_stat = stat;
|
||||
@ -440,7 +440,7 @@ bool ClusterCopier::checkPartitionPieceIsDone(const TaskTable & task_table, cons
|
||||
{
|
||||
LOG_DEBUG(log, "Check that all shards processed partition {} piece {} successfully", partition_name, piece_number);
|
||||
|
||||
auto zookeeper = context.getZooKeeper();
|
||||
auto zookeeper = getContext()->getZooKeeper();
|
||||
|
||||
/// Collect all shards that contain partition piece number piece_number.
|
||||
Strings piece_status_paths;
|
||||
@ -532,7 +532,7 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
|
||||
|
||||
LOG_DEBUG(log, "Try to move {} to destination table", partition_name);
|
||||
|
||||
auto zookeeper = context.getZooKeeper();
|
||||
auto zookeeper = getContext()->getZooKeeper();
|
||||
|
||||
const auto current_partition_attach_is_active = task_table.getPartitionAttachIsActivePath(partition_name);
|
||||
const auto current_partition_attach_is_done = task_table.getPartitionAttachIsDonePath(partition_name);
|
||||
@ -1095,7 +1095,7 @@ TaskStatus ClusterCopier::tryCreateDestinationTable(const ConnectionTimeouts & t
|
||||
= rewriteCreateQueryStorage(task_shard->current_pull_table_create_query, task_table.table_push, task_table.engine_push_ast);
|
||||
auto & create = create_query_push_ast->as<ASTCreateQuery &>();
|
||||
create.if_not_exists = true;
|
||||
InterpreterCreateQuery::prepareOnClusterQuery(create, context, task_table.cluster_push_name);
|
||||
InterpreterCreateQuery::prepareOnClusterQuery(create, getContext(), task_table.cluster_push_name);
|
||||
String query = queryToString(create_query_push_ast);
|
||||
|
||||
LOG_DEBUG(log, "Create destination tables. Query: {}", query);
|
||||
@ -1211,7 +1211,7 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
|
||||
|
||||
auto split_table_for_current_piece = task_shard.list_of_split_tables_on_shard[current_piece_number];
|
||||
|
||||
auto zookeeper = context.getZooKeeper();
|
||||
auto zookeeper = getContext()->getZooKeeper();
|
||||
|
||||
const String piece_is_dirty_flag_path = partition_piece.getPartitionPieceIsDirtyPath();
|
||||
const String piece_is_dirty_cleaned_path = partition_piece.getPartitionPieceIsCleanedPath();
|
||||
@ -1262,7 +1262,7 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
|
||||
|
||||
ParserQuery p_query(query.data() + query.size());
|
||||
|
||||
const auto & settings = context.getSettingsRef();
|
||||
const auto & settings = getContext()->getSettingsRef();
|
||||
return parseQuery(p_query, query, settings.max_query_size, settings.max_parser_depth);
|
||||
};
|
||||
|
||||
@ -1366,10 +1366,10 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
|
||||
ASTPtr query_select_ast = get_select_query(split_table_for_current_piece, "count()", /*enable_splitting*/ true);
|
||||
UInt64 count;
|
||||
{
|
||||
Context local_context = context;
|
||||
auto local_context = Context::createCopy(context);
|
||||
// Use pull (i.e. readonly) settings, but fetch data from destination servers
|
||||
local_context.setSettings(task_cluster->settings_pull);
|
||||
local_context.setSetting("skip_unavailable_shards", true);
|
||||
local_context->setSettings(task_cluster->settings_pull);
|
||||
local_context->setSetting("skip_unavailable_shards", true);
|
||||
|
||||
Block block = getBlockWithAllStreamData(InterpreterFactory::get(query_select_ast, local_context)->execute().getInputStream());
|
||||
count = (block) ? block.safeGetByPosition(0).column->getUInt(0) : 0;
|
||||
@ -1468,7 +1468,7 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
|
||||
query += "INSERT INTO " + getQuotedTable(split_table_for_current_piece) + " VALUES ";
|
||||
|
||||
ParserQuery p_query(query.data() + query.size());
|
||||
const auto & settings = context.getSettingsRef();
|
||||
const auto & settings = getContext()->getSettingsRef();
|
||||
query_insert_ast = parseQuery(p_query, query, settings.max_query_size, settings.max_parser_depth);
|
||||
|
||||
LOG_DEBUG(log, "Executing INSERT query: {}", query);
|
||||
@ -1476,18 +1476,18 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
|
||||
|
||||
try
|
||||
{
|
||||
std::unique_ptr<Context> context_select = std::make_unique<Context>(context);
|
||||
auto context_select = Context::createCopy(context);
|
||||
context_select->setSettings(task_cluster->settings_pull);
|
||||
|
||||
std::unique_ptr<Context> context_insert = std::make_unique<Context>(context);
|
||||
auto context_insert = Context::createCopy(context);
|
||||
context_insert->setSettings(task_cluster->settings_push);
|
||||
|
||||
/// Custom INSERT SELECT implementation
|
||||
BlockInputStreamPtr input;
|
||||
BlockOutputStreamPtr output;
|
||||
{
|
||||
BlockIO io_select = InterpreterFactory::get(query_select_ast, *context_select)->execute();
|
||||
BlockIO io_insert = InterpreterFactory::get(query_insert_ast, *context_insert)->execute();
|
||||
BlockIO io_select = InterpreterFactory::get(query_select_ast, context_select)->execute();
|
||||
BlockIO io_insert = InterpreterFactory::get(query_insert_ast, context_insert)->execute();
|
||||
|
||||
input = io_select.getInputStream();
|
||||
output = io_insert.out;
|
||||
@ -1581,7 +1581,7 @@ void ClusterCopier::dropAndCreateLocalTable(const ASTPtr & create_ast)
|
||||
const auto & create = create_ast->as<ASTCreateQuery &>();
|
||||
dropLocalTableIfExists({create.database, create.table});
|
||||
|
||||
InterpreterCreateQuery interpreter(create_ast, context);
|
||||
InterpreterCreateQuery interpreter(create_ast, getContext());
|
||||
interpreter.execute();
|
||||
}
|
||||
|
||||
@ -1592,7 +1592,7 @@ void ClusterCopier::dropLocalTableIfExists(const DatabaseAndTableName & table_na
|
||||
drop_ast->database = table_name.first;
|
||||
drop_ast->table = table_name.second;
|
||||
|
||||
InterpreterDropQuery interpreter(drop_ast, context);
|
||||
InterpreterDropQuery interpreter(drop_ast, getContext());
|
||||
interpreter.execute();
|
||||
}
|
||||
|
||||
@ -1654,8 +1654,8 @@ void ClusterCopier::dropParticularPartitionPieceFromAllHelpingTables(const TaskT
|
||||
|
||||
String ClusterCopier::getRemoteCreateTable(const DatabaseAndTableName & table, Connection & connection, const Settings & settings)
|
||||
{
|
||||
Context remote_context(context);
|
||||
remote_context.setSettings(settings);
|
||||
auto remote_context = Context::createCopy(context);
|
||||
remote_context->setSettings(settings);
|
||||
|
||||
String query = "SHOW CREATE TABLE " + getQuotedTable(table);
|
||||
Block block = getBlockWithAllStreamData(std::make_shared<RemoteBlockInputStream>(
|
||||
@ -1674,7 +1674,7 @@ ASTPtr ClusterCopier::getCreateTableForPullShard(const ConnectionTimeouts & time
|
||||
task_cluster->settings_pull);
|
||||
|
||||
ParserCreateQuery parser_create_query;
|
||||
const auto & settings = context.getSettingsRef();
|
||||
const auto & settings = getContext()->getSettingsRef();
|
||||
return parseQuery(parser_create_query, create_query_pull_str, settings.max_query_size, settings.max_parser_depth);
|
||||
}
|
||||
|
||||
@ -1703,7 +1703,7 @@ void ClusterCopier::createShardInternalTables(const ConnectionTimeouts & timeout
|
||||
/// Create special cluster with single shard
|
||||
String shard_read_cluster_name = read_shard_prefix + task_table.cluster_pull_name;
|
||||
ClusterPtr cluster_pull_current_shard = task_table.cluster_pull->getClusterWithSingleShard(task_shard.indexInCluster());
|
||||
context.setCluster(shard_read_cluster_name, cluster_pull_current_shard);
|
||||
getContext()->setCluster(shard_read_cluster_name, cluster_pull_current_shard);
|
||||
|
||||
auto storage_shard_ast = createASTStorageDistributed(shard_read_cluster_name, task_table.table_pull.first, task_table.table_pull.second);
|
||||
|
||||
@ -1763,13 +1763,13 @@ std::set<String> ClusterCopier::getShardPartitions(const ConnectionTimeouts & ti
|
||||
}
|
||||
|
||||
ParserQuery parser_query(query.data() + query.size());
|
||||
const auto & settings = context.getSettingsRef();
|
||||
const auto & settings = getContext()->getSettingsRef();
|
||||
ASTPtr query_ast = parseQuery(parser_query, query, settings.max_query_size, settings.max_parser_depth);
|
||||
|
||||
LOG_DEBUG(log, "Computing destination partition set, executing query: {}", query);
|
||||
|
||||
Context local_context = context;
|
||||
local_context.setSettings(task_cluster->settings_pull);
|
||||
auto local_context = Context::createCopy(context);
|
||||
local_context->setSettings(task_cluster->settings_pull);
|
||||
Block block = getBlockWithAllStreamData(InterpreterFactory::get(query_ast, local_context)->execute().getInputStream());
|
||||
|
||||
if (block)
|
||||
@ -1809,11 +1809,11 @@ bool ClusterCopier::checkShardHasPartition(const ConnectionTimeouts & timeouts,
|
||||
LOG_DEBUG(log, "Checking shard {} for partition {} existence, executing query: {}", task_shard.getDescription(), partition_quoted_name, query);
|
||||
|
||||
ParserQuery parser_query(query.data() + query.size());
|
||||
const auto & settings = context.getSettingsRef();
|
||||
const auto & settings = getContext()->getSettingsRef();
|
||||
ASTPtr query_ast = parseQuery(parser_query, query, settings.max_query_size, settings.max_parser_depth);
|
||||
|
||||
Context local_context = context;
|
||||
local_context.setSettings(task_cluster->settings_pull);
|
||||
auto local_context = Context::createCopy(context);
|
||||
local_context->setSettings(task_cluster->settings_pull);
|
||||
return InterpreterFactory::get(query_ast, local_context)->execute().getInputStream()->read().rows() != 0;
|
||||
}
|
||||
|
||||
@ -1848,11 +1848,11 @@ bool ClusterCopier::checkPresentPartitionPiecesOnCurrentShard(const ConnectionTi
|
||||
LOG_DEBUG(log, "Checking shard {} for partition {} piece {} existence, executing query: {}", task_shard.getDescription(), partition_quoted_name, std::to_string(current_piece_number), query);
|
||||
|
||||
ParserQuery parser_query(query.data() + query.size());
|
||||
const auto & settings = context.getSettingsRef();
|
||||
const auto & settings = getContext()->getSettingsRef();
|
||||
ASTPtr query_ast = parseQuery(parser_query, query, settings.max_query_size, settings.max_parser_depth);
|
||||
|
||||
Context local_context = context;
|
||||
local_context.setSettings(task_cluster->settings_pull);
|
||||
auto local_context = Context::createCopy(context);
|
||||
local_context->setSettings(task_cluster->settings_pull);
|
||||
auto result = InterpreterFactory::get(query_ast, local_context)->execute().getInputStream()->read().rows();
|
||||
if (result != 0)
|
||||
LOG_DEBUG(log, "Partition {} piece number {} is PRESENT on shard {}", partition_quoted_name, std::to_string(current_piece_number), task_shard.getDescription());
|
||||
@ -1908,7 +1908,7 @@ UInt64 ClusterCopier::executeQueryOnCluster(
|
||||
/// In that case we don't have local replicas, but do it just in case
|
||||
for (UInt64 i = 0; i < num_local_replicas; ++i)
|
||||
{
|
||||
auto interpreter = InterpreterFactory::get(query_ast, context);
|
||||
auto interpreter = InterpreterFactory::get(query_ast, getContext());
|
||||
interpreter->execute();
|
||||
|
||||
if (increment_and_check_exit())
|
||||
@ -1923,8 +1923,8 @@ UInt64 ClusterCopier::executeQueryOnCluster(
|
||||
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(shard_settings).getSaturated(shard_settings.max_execution_time);
|
||||
auto connections = shard.pool->getMany(timeouts, &shard_settings, pool_mode);
|
||||
|
||||
Context shard_context(context);
|
||||
shard_context.setSettings(shard_settings);
|
||||
auto shard_context = Context::createCopy(context);
|
||||
shard_context->setSettings(shard_settings);
|
||||
|
||||
for (auto & connection : connections)
|
||||
{
|
||||
|
@ -12,18 +12,17 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class ClusterCopier
|
||||
class ClusterCopier : WithContext
|
||||
{
|
||||
public:
|
||||
ClusterCopier(const String & task_path_,
|
||||
const String & host_id_,
|
||||
const String & proxy_database_name_,
|
||||
Context & context_)
|
||||
:
|
||||
ContextPtr context_)
|
||||
: WithContext(context_),
|
||||
task_zookeeper_path(task_path_),
|
||||
host_id(host_id_),
|
||||
working_database_name(proxy_database_name_),
|
||||
context(context_),
|
||||
log(&Poco::Logger::get("ClusterCopier")) {}
|
||||
|
||||
void init();
|
||||
@ -36,7 +35,7 @@ public:
|
||||
/// Compute set of partitions, assume set of partitions aren't changed during the processing
|
||||
void discoverTablePartitions(const ConnectionTimeouts & timeouts, TaskTable & task_table, UInt64 num_threads = 0);
|
||||
|
||||
void uploadTaskDescription(const std::string & task_path, const std::string & task_file, const bool force);
|
||||
void uploadTaskDescription(const std::string & task_path, const std::string & task_file, bool force);
|
||||
|
||||
void reloadTaskDescription();
|
||||
|
||||
@ -120,7 +119,7 @@ protected:
|
||||
/// Removes MATERIALIZED and ALIAS columns from create table query
|
||||
static ASTPtr removeAliasColumnsFromCreateQuery(const ASTPtr & query_ast);
|
||||
|
||||
bool tryDropPartitionPiece(ShardPartition & task_partition, const size_t current_piece_number,
|
||||
bool tryDropPartitionPiece(ShardPartition & task_partition, size_t current_piece_number,
|
||||
const zkutil::ZooKeeperPtr & zookeeper, const CleanStateClock & clean_state_clock);
|
||||
|
||||
static constexpr UInt64 max_table_tries = 3;
|
||||
@ -141,7 +140,7 @@ protected:
|
||||
|
||||
TaskStatus processPartitionPieceTaskImpl(const ConnectionTimeouts & timeouts,
|
||||
ShardPartition & task_partition,
|
||||
const size_t current_piece_number,
|
||||
size_t current_piece_number,
|
||||
bool is_unprioritized_task);
|
||||
|
||||
void dropAndCreateLocalTable(const ASTPtr & create_ast);
|
||||
@ -219,7 +218,6 @@ private:
|
||||
|
||||
bool experimental_use_sample_offset{false};
|
||||
|
||||
Context & context;
|
||||
Poco::Logger * log;
|
||||
|
||||
std::chrono::milliseconds default_sleep_time{1000};
|
||||
|
@ -111,7 +111,7 @@ void ClusterCopierApp::mainImpl()
|
||||
LOG_INFO(log, "Starting clickhouse-copier (id {}, host_id {}, path {}, revision {})", process_id, host_id, process_path, ClickHouseRevision::getVersionRevision());
|
||||
|
||||
SharedContextHolder shared_context = Context::createShared();
|
||||
auto context = std::make_unique<Context>(Context::createGlobal(shared_context.get()));
|
||||
auto context = Context::createGlobal(shared_context.get());
|
||||
context->makeGlobalContext();
|
||||
SCOPE_EXIT_SAFE(context->shutdown());
|
||||
|
||||
@ -128,13 +128,13 @@ void ClusterCopierApp::mainImpl()
|
||||
registerFormats();
|
||||
|
||||
static const std::string default_database = "_local";
|
||||
DatabaseCatalog::instance().attachDatabase(default_database, std::make_shared<DatabaseMemory>(default_database, *context));
|
||||
DatabaseCatalog::instance().attachDatabase(default_database, std::make_shared<DatabaseMemory>(default_database, context));
|
||||
context->setCurrentDatabase(default_database);
|
||||
|
||||
/// Initialize query scope just in case.
|
||||
CurrentThread::QueryScope query_scope(*context);
|
||||
CurrentThread::QueryScope query_scope(context);
|
||||
|
||||
auto copier = std::make_unique<ClusterCopier>(task_path, host_id, default_database, *context);
|
||||
auto copier = std::make_unique<ClusterCopier>(task_path, host_id, default_database, context);
|
||||
copier->setSafeMode(is_safe_mode);
|
||||
copier->setCopyFaultProbability(copy_fault_probability);
|
||||
copier->setMoveFaultProbability(move_fault_probability);
|
||||
|
@ -102,8 +102,8 @@ int mainEntryClickHouseFormat(int argc, char ** argv)
|
||||
}
|
||||
|
||||
SharedContextHolder shared_context = Context::createShared();
|
||||
Context context = Context::createGlobal(shared_context.get());
|
||||
context.makeGlobalContext();
|
||||
auto context = Context::createGlobal(shared_context.get());
|
||||
context->makeGlobalContext();
|
||||
|
||||
registerFunctions();
|
||||
registerAggregateFunctions();
|
||||
|
@ -16,7 +16,7 @@ namespace DB
|
||||
return std::make_unique<PingHandler>(keep_alive_timeout);
|
||||
|
||||
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST)
|
||||
return std::make_unique<LibraryRequestHandler>(keep_alive_timeout, context);
|
||||
return std::make_unique<LibraryRequestHandler>(keep_alive_timeout, getContext());
|
||||
|
||||
return nullptr;
|
||||
}
|
||||
|
@ -12,17 +12,17 @@ class SharedLibraryHandler;
|
||||
using SharedLibraryHandlerPtr = std::shared_ptr<SharedLibraryHandler>;
|
||||
|
||||
/// Factory for '/ping', '/' handlers.
|
||||
class LibraryBridgeHandlerFactory : public HTTPRequestHandlerFactory
|
||||
class LibraryBridgeHandlerFactory : public HTTPRequestHandlerFactory, WithContext
|
||||
{
|
||||
public:
|
||||
LibraryBridgeHandlerFactory(
|
||||
const std::string & name_,
|
||||
size_t keep_alive_timeout_,
|
||||
Context & context_)
|
||||
: log(&Poco::Logger::get(name_))
|
||||
ContextPtr context_)
|
||||
: WithContext(context_)
|
||||
, log(&Poco::Logger::get(name_))
|
||||
, name(name_)
|
||||
, keep_alive_timeout(keep_alive_timeout_)
|
||||
, context(context_)
|
||||
{
|
||||
}
|
||||
|
||||
@ -32,7 +32,6 @@ private:
|
||||
Poco::Logger * log;
|
||||
std::string name;
|
||||
size_t keep_alive_timeout;
|
||||
Context & context;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -131,7 +131,7 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
|
||||
}
|
||||
|
||||
ReadBufferFromString read_block_buf(params.get("null_values"));
|
||||
auto format = FormatFactory::instance().getInput(FORMAT, read_block_buf, *sample_block, context, DEFAULT_BLOCK_SIZE);
|
||||
auto format = FormatFactory::instance().getInput(FORMAT, read_block_buf, *sample_block, getContext(), DEFAULT_BLOCK_SIZE);
|
||||
auto reader = std::make_shared<InputStreamFromInputFormat>(format);
|
||||
auto sample_block_with_nulls = reader->read();
|
||||
|
||||
@ -176,7 +176,7 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
|
||||
const auto & sample_block = library_handler->getSampleBlock();
|
||||
auto input = library_handler->loadAll();
|
||||
|
||||
BlockOutputStreamPtr output = FormatFactory::instance().getOutputStream(FORMAT, out, sample_block, context);
|
||||
BlockOutputStreamPtr output = FormatFactory::instance().getOutputStream(FORMAT, out, sample_block, getContext());
|
||||
copyData(*input, *output);
|
||||
}
|
||||
else if (method == "loadIds")
|
||||
@ -193,7 +193,7 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
|
||||
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
|
||||
const auto & sample_block = library_handler->getSampleBlock();
|
||||
auto input = library_handler->loadIds(ids);
|
||||
BlockOutputStreamPtr output = FormatFactory::instance().getOutputStream(FORMAT, out, sample_block, context);
|
||||
BlockOutputStreamPtr output = FormatFactory::instance().getOutputStream(FORMAT, out, sample_block, getContext());
|
||||
copyData(*input, *output);
|
||||
}
|
||||
else if (method == "loadKeys")
|
||||
@ -219,14 +219,14 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
|
||||
}
|
||||
|
||||
auto & read_buf = request.getStream();
|
||||
auto format = FormatFactory::instance().getInput(FORMAT, read_buf, *requested_sample_block, context, DEFAULT_BLOCK_SIZE);
|
||||
auto format = FormatFactory::instance().getInput(FORMAT, read_buf, *requested_sample_block, getContext(), DEFAULT_BLOCK_SIZE);
|
||||
auto reader = std::make_shared<InputStreamFromInputFormat>(format);
|
||||
auto block = reader->read();
|
||||
|
||||
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
|
||||
const auto & sample_block = library_handler->getSampleBlock();
|
||||
auto input = library_handler->loadKeys(block.getColumns());
|
||||
BlockOutputStreamPtr output = FormatFactory::instance().getOutputStream(FORMAT, out, sample_block, context);
|
||||
BlockOutputStreamPtr output = FormatFactory::instance().getOutputStream(FORMAT, out, sample_block, getContext());
|
||||
copyData(*input, *output);
|
||||
}
|
||||
}
|
||||
|
@ -17,16 +17,16 @@ namespace DB
|
||||
/// names of dictionary attributes, sample block to parse block of null values, block of null values. Everything is
|
||||
/// passed in binary format and is urlencoded. When dictionary is cloned, a new handler is created.
|
||||
/// Each handler is unique to dictionary.
|
||||
class LibraryRequestHandler : public HTTPRequestHandler
|
||||
class LibraryRequestHandler : public HTTPRequestHandler, WithContext
|
||||
{
|
||||
public:
|
||||
|
||||
LibraryRequestHandler(
|
||||
size_t keep_alive_timeout_,
|
||||
Context & context_)
|
||||
: log(&Poco::Logger::get("LibraryRequestHandler"))
|
||||
ContextPtr context_)
|
||||
: WithContext(context_)
|
||||
, log(&Poco::Logger::get("LibraryRequestHandler"))
|
||||
, keep_alive_timeout(keep_alive_timeout_)
|
||||
, context(context_)
|
||||
{
|
||||
}
|
||||
|
||||
@ -39,7 +39,6 @@ private:
|
||||
|
||||
Poco::Logger * log;
|
||||
size_t keep_alive_timeout;
|
||||
Context & context;
|
||||
};
|
||||
|
||||
|
||||
|
@ -12,12 +12,12 @@ class LibraryBridge : public IBridge
|
||||
{
|
||||
|
||||
protected:
|
||||
const std::string bridgeName() const override
|
||||
std::string bridgeName() const override
|
||||
{
|
||||
return "LibraryBridge";
|
||||
}
|
||||
|
||||
HandlerFactoryPtr getHandlerFactoryPtr(Context & context) const override
|
||||
HandlerFactoryPtr getHandlerFactoryPtr(ContextPtr context) const override
|
||||
{
|
||||
return std::make_shared<LibraryBridgeHandlerFactory>("LibraryRequestHandlerFactory-factory", keep_alive_timeout, context);
|
||||
}
|
||||
|
@ -99,9 +99,9 @@ void LocalServer::initialize(Poco::Util::Application & self)
|
||||
}
|
||||
}
|
||||
|
||||
void LocalServer::applyCmdSettings(Context & context)
|
||||
void LocalServer::applyCmdSettings(ContextPtr context)
|
||||
{
|
||||
context.applySettingsChanges(cmd_settings.changes());
|
||||
context->applySettingsChanges(cmd_settings.changes());
|
||||
}
|
||||
|
||||
/// If path is specified and not empty, will try to setup server environment and load existing metadata
|
||||
@ -176,7 +176,7 @@ void LocalServer::tryInitPath()
|
||||
}
|
||||
|
||||
|
||||
static void attachSystemTables(const Context & context)
|
||||
static void attachSystemTables(ContextPtr context)
|
||||
{
|
||||
DatabasePtr system_database = DatabaseCatalog::instance().tryGetDatabase(DatabaseCatalog::SYSTEM_DATABASE);
|
||||
if (!system_database)
|
||||
@ -211,7 +211,7 @@ try
|
||||
}
|
||||
|
||||
shared_context = Context::createShared();
|
||||
global_context = std::make_unique<Context>(Context::createGlobal(shared_context.get()));
|
||||
global_context = Context::createGlobal(shared_context.get());
|
||||
global_context->makeGlobalContext();
|
||||
global_context->setApplicationType(Context::ApplicationType::LOCAL);
|
||||
tryInitPath();
|
||||
@ -274,9 +274,9 @@ try
|
||||
* if such tables will not be dropped, clickhouse-server will not be able to load them due to security reasons.
|
||||
*/
|
||||
std::string default_database = config().getString("default_database", "_local");
|
||||
DatabaseCatalog::instance().attachDatabase(default_database, std::make_shared<DatabaseMemory>(default_database, *global_context));
|
||||
DatabaseCatalog::instance().attachDatabase(default_database, std::make_shared<DatabaseMemory>(default_database, global_context));
|
||||
global_context->setCurrentDatabase(default_database);
|
||||
applyCmdOptions(*global_context);
|
||||
applyCmdOptions(global_context);
|
||||
|
||||
if (config().has("path"))
|
||||
{
|
||||
@ -288,15 +288,15 @@ try
|
||||
LOG_DEBUG(log, "Loading metadata from {}", path);
|
||||
Poco::File(path + "data/").createDirectories();
|
||||
Poco::File(path + "metadata/").createDirectories();
|
||||
loadMetadataSystem(*global_context);
|
||||
attachSystemTables(*global_context);
|
||||
loadMetadata(*global_context);
|
||||
loadMetadataSystem(global_context);
|
||||
attachSystemTables(global_context);
|
||||
loadMetadata(global_context);
|
||||
DatabaseCatalog::instance().loadDatabases();
|
||||
LOG_DEBUG(log, "Loaded metadata.");
|
||||
}
|
||||
else if (!config().has("no-system-tables"))
|
||||
{
|
||||
attachSystemTables(*global_context);
|
||||
attachSystemTables(global_context);
|
||||
}
|
||||
|
||||
processQueries();
|
||||
@ -375,13 +375,13 @@ void LocalServer::processQueries()
|
||||
|
||||
/// we can't mutate global global_context (can lead to races, as it was already passed to some background threads)
|
||||
/// so we can't reuse it safely as a query context and need a copy here
|
||||
auto context = Context(*global_context);
|
||||
auto context = Context::createCopy(global_context);
|
||||
|
||||
context.makeSessionContext();
|
||||
context.makeQueryContext();
|
||||
context->makeSessionContext();
|
||||
context->makeQueryContext();
|
||||
|
||||
context.setUser("default", "", Poco::Net::SocketAddress{});
|
||||
context.setCurrentQueryId("");
|
||||
context->setUser("default", "", Poco::Net::SocketAddress{});
|
||||
context->setCurrentQueryId("");
|
||||
applyCmdSettings(context);
|
||||
|
||||
/// Use the same query_id (and thread group) for all queries
|
||||
@ -618,9 +618,9 @@ void LocalServer::init(int argc, char ** argv)
|
||||
argsToConfig(arguments, config(), 100);
|
||||
}
|
||||
|
||||
void LocalServer::applyCmdOptions(Context & context)
|
||||
void LocalServer::applyCmdOptions(ContextPtr context)
|
||||
{
|
||||
context.setDefaultFormat(config().getString("output-format", config().getString("format", "TSV")));
|
||||
context->setDefaultFormat(config().getString("output-format", config().getString("format", "TSV")));
|
||||
applyCmdSettings(context);
|
||||
}
|
||||
|
||||
|
@ -36,15 +36,15 @@ private:
|
||||
std::string getInitialCreateTableQuery();
|
||||
|
||||
void tryInitPath();
|
||||
void applyCmdOptions(Context & context);
|
||||
void applyCmdSettings(Context & context);
|
||||
void applyCmdOptions(ContextPtr context);
|
||||
void applyCmdSettings(ContextPtr context);
|
||||
void processQueries();
|
||||
void setupUsers();
|
||||
void cleanup();
|
||||
|
||||
protected:
|
||||
SharedContextHolder shared_context;
|
||||
std::unique_ptr<Context> global_context;
|
||||
ContextPtr global_context;
|
||||
|
||||
/// Settings specified via command line args
|
||||
Settings cmd_settings;
|
||||
|
@ -1129,8 +1129,8 @@ try
|
||||
}
|
||||
|
||||
SharedContextHolder shared_context = Context::createShared();
|
||||
Context context = Context::createGlobal(shared_context.get());
|
||||
context.makeGlobalContext();
|
||||
ContextPtr context = Context::createGlobal(shared_context.get());
|
||||
context->makeGlobalContext();
|
||||
|
||||
ReadBufferFromFileDescriptor file_in(STDIN_FILENO);
|
||||
WriteBufferFromFileDescriptor file_out(STDOUT_FILENO);
|
||||
@ -1152,7 +1152,7 @@ try
|
||||
if (!silent)
|
||||
std::cerr << "Training models\n";
|
||||
|
||||
BlockInputStreamPtr input = context.getInputFormat(input_format, file_in, header, max_block_size);
|
||||
BlockInputStreamPtr input = context->getInputFormat(input_format, file_in, header, max_block_size);
|
||||
|
||||
input->readPrefix();
|
||||
while (Block block = input->read())
|
||||
@ -1179,8 +1179,8 @@ try
|
||||
|
||||
file_in.seek(0, SEEK_SET);
|
||||
|
||||
BlockInputStreamPtr input = context.getInputFormat(input_format, file_in, header, max_block_size);
|
||||
BlockOutputStreamPtr output = context.getOutputStreamParallelIfPossible(output_format, file_out, header);
|
||||
BlockInputStreamPtr input = context->getInputFormat(input_format, file_in, header, max_block_size);
|
||||
BlockOutputStreamPtr output = context->getOutputStreamParallelIfPossible(output_format, file_out, header);
|
||||
|
||||
if (processed_rows + source_rows > limit)
|
||||
input = std::make_shared<LimitBlockInputStream>(input, limit - processed_rows, 0);
|
||||
|
@ -107,7 +107,7 @@ void ODBCColumnsInfoHandler::handleRequest(HTTPServerRequest & request, HTTPServ
|
||||
|
||||
auto connection = ODBCConnectionFactory::instance().get(
|
||||
validateODBCConnectionString(connection_string),
|
||||
context.getSettingsRef().odbc_bridge_connection_pool_size);
|
||||
getContext()->getSettingsRef().odbc_bridge_connection_pool_size);
|
||||
|
||||
nanodbc::catalog catalog(*connection);
|
||||
std::string catalog_name;
|
||||
|
@ -2,6 +2,7 @@
|
||||
|
||||
#if USE_ODBC
|
||||
|
||||
#include <Interpreters/Context_fwd.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Server/HTTP/HTTPRequestHandler.h>
|
||||
#include <Common/config.h>
|
||||
@ -11,13 +12,13 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class ODBCColumnsInfoHandler : public HTTPRequestHandler
|
||||
class ODBCColumnsInfoHandler : public HTTPRequestHandler, WithContext
|
||||
{
|
||||
public:
|
||||
ODBCColumnsInfoHandler(size_t keep_alive_timeout_, const Context & context_)
|
||||
: log(&Poco::Logger::get("ODBCColumnsInfoHandler"))
|
||||
ODBCColumnsInfoHandler(size_t keep_alive_timeout_, ContextPtr context_)
|
||||
: WithContext(context_)
|
||||
, log(&Poco::Logger::get("ODBCColumnsInfoHandler"))
|
||||
, keep_alive_timeout(keep_alive_timeout_)
|
||||
, context(context_)
|
||||
{
|
||||
}
|
||||
|
||||
@ -26,7 +27,6 @@ public:
|
||||
private:
|
||||
Poco::Logger * log;
|
||||
size_t keep_alive_timeout;
|
||||
const Context & context;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -21,26 +21,26 @@ std::unique_ptr<HTTPRequestHandler> ODBCBridgeHandlerFactory::createRequestHandl
|
||||
|
||||
if (uri.getPath() == "/columns_info")
|
||||
#if USE_ODBC
|
||||
return std::make_unique<ODBCColumnsInfoHandler>(keep_alive_timeout, context);
|
||||
return std::make_unique<ODBCColumnsInfoHandler>(keep_alive_timeout, getContext());
|
||||
#else
|
||||
return nullptr;
|
||||
#endif
|
||||
else if (uri.getPath() == "/identifier_quote")
|
||||
#if USE_ODBC
|
||||
return std::make_unique<IdentifierQuoteHandler>(keep_alive_timeout, context);
|
||||
return std::make_unique<IdentifierQuoteHandler>(keep_alive_timeout, getContext());
|
||||
#else
|
||||
return nullptr;
|
||||
#endif
|
||||
else if (uri.getPath() == "/schema_allowed")
|
||||
#if USE_ODBC
|
||||
return std::make_unique<SchemaAllowedHandler>(keep_alive_timeout, context);
|
||||
return std::make_unique<SchemaAllowedHandler>(keep_alive_timeout, getContext());
|
||||
#else
|
||||
return nullptr;
|
||||
#endif
|
||||
else if (uri.getPath() == "/write")
|
||||
return std::make_unique<ODBCHandler>(keep_alive_timeout, context, "write");
|
||||
return std::make_unique<ODBCHandler>(keep_alive_timeout, getContext(), "write");
|
||||
else
|
||||
return std::make_unique<ODBCHandler>(keep_alive_timeout, context, "read");
|
||||
return std::make_unique<ODBCHandler>(keep_alive_timeout, getContext(), "read");
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
@ -1,6 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/Context_fwd.h>
|
||||
#include <Server/HTTP/HTTPRequestHandlerFactory.h>
|
||||
#include "ColumnInfoHandler.h"
|
||||
#include "IdentifierQuoteHandler.h"
|
||||
@ -14,11 +14,14 @@ namespace DB
|
||||
/** Factory for '/ping', '/', '/columns_info', '/identifier_quote', '/schema_allowed' handlers.
|
||||
* Also stores Session pools for ODBC connections
|
||||
*/
|
||||
class ODBCBridgeHandlerFactory : public HTTPRequestHandlerFactory
|
||||
class ODBCBridgeHandlerFactory : public HTTPRequestHandlerFactory, WithContext
|
||||
{
|
||||
public:
|
||||
ODBCBridgeHandlerFactory(const std::string & name_, size_t keep_alive_timeout_, Context & context_)
|
||||
: log(&Poco::Logger::get(name_)), name(name_), keep_alive_timeout(keep_alive_timeout_), context(context_)
|
||||
ODBCBridgeHandlerFactory(const std::string & name_, size_t keep_alive_timeout_, ContextPtr context_)
|
||||
: WithContext(context_)
|
||||
, log(&Poco::Logger::get(name_))
|
||||
, name(name_)
|
||||
, keep_alive_timeout(keep_alive_timeout_)
|
||||
{
|
||||
}
|
||||
|
||||
@ -28,7 +31,6 @@ private:
|
||||
Poco::Logger * log;
|
||||
std::string name;
|
||||
size_t keep_alive_timeout;
|
||||
Context & context;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -44,7 +44,7 @@ void IdentifierQuoteHandler::handleRequest(HTTPServerRequest & request, HTTPServ
|
||||
|
||||
auto connection = ODBCConnectionFactory::instance().get(
|
||||
validateODBCConnectionString(connection_string),
|
||||
context.getSettingsRef().odbc_bridge_connection_pool_size);
|
||||
getContext()->getSettingsRef().odbc_bridge_connection_pool_size);
|
||||
|
||||
auto identifier = getIdentifierQuote(*connection);
|
||||
|
||||
|
@ -11,11 +11,13 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class IdentifierQuoteHandler : public HTTPRequestHandler
|
||||
class IdentifierQuoteHandler : public HTTPRequestHandler, WithContext
|
||||
{
|
||||
public:
|
||||
IdentifierQuoteHandler(size_t keep_alive_timeout_, const Context & context_)
|
||||
: log(&Poco::Logger::get("IdentifierQuoteHandler")), keep_alive_timeout(keep_alive_timeout_), context(context_)
|
||||
IdentifierQuoteHandler(size_t keep_alive_timeout_, ContextPtr context_)
|
||||
: WithContext(context_)
|
||||
, log(&Poco::Logger::get("IdentifierQuoteHandler"))
|
||||
, keep_alive_timeout(keep_alive_timeout_)
|
||||
{
|
||||
}
|
||||
|
||||
@ -24,7 +26,6 @@ public:
|
||||
private:
|
||||
Poco::Logger * log;
|
||||
size_t keep_alive_timeout;
|
||||
const Context & context;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -112,7 +112,7 @@ void ODBCHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
|
||||
{
|
||||
auto connection = ODBCConnectionFactory::instance().get(
|
||||
validateODBCConnectionString(connection_string),
|
||||
context.getSettingsRef().odbc_bridge_connection_pool_size);
|
||||
getContext()->getSettingsRef().odbc_bridge_connection_pool_size);
|
||||
|
||||
if (mode == "write")
|
||||
{
|
||||
@ -135,9 +135,9 @@ void ODBCHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
|
||||
quoting_style = getQuotingStyle(*connection);
|
||||
#endif
|
||||
auto & read_buf = request.getStream();
|
||||
auto input_format = FormatFactory::instance().getInput(format, read_buf, *sample_block, context, max_block_size);
|
||||
auto input_format = FormatFactory::instance().getInput(format, read_buf, *sample_block, getContext(), max_block_size);
|
||||
auto input_stream = std::make_shared<InputStreamFromInputFormat>(input_format);
|
||||
ODBCBlockOutputStream output_stream(*connection, db_name, table_name, *sample_block, context, quoting_style);
|
||||
ODBCBlockOutputStream output_stream(*connection, db_name, table_name, *sample_block, getContext(), quoting_style);
|
||||
copyData(*input_stream, output_stream);
|
||||
writeStringBinary("Ok.", out);
|
||||
}
|
||||
@ -146,7 +146,7 @@ void ODBCHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
|
||||
std::string query = params.get("query");
|
||||
LOG_TRACE(log, "Query: {}", query);
|
||||
|
||||
BlockOutputStreamPtr writer = FormatFactory::instance().getOutputStreamParallelIfPossible(format, out, *sample_block, context);
|
||||
BlockOutputStreamPtr writer = FormatFactory::instance().getOutputStreamParallelIfPossible(format, out, *sample_block, getContext());
|
||||
ODBCBlockInputStream inp(*connection, query, *sample_block, max_block_size);
|
||||
copyData(inp, *writer);
|
||||
}
|
||||
|
@ -1,10 +1,14 @@
|
||||
#pragma once
|
||||
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/Context_fwd.h>
|
||||
#include <Server/HTTP/HTTPRequestHandler.h>
|
||||
#include <Poco/Logger.h>
|
||||
|
||||
|
||||
#include <mutex>
|
||||
#include <unordered_map>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
/** Main handler for requests to ODBC driver
|
||||
@ -12,16 +16,16 @@ namespace DB
|
||||
* and also query in request body
|
||||
* response in RowBinary format
|
||||
*/
|
||||
class ODBCHandler : public HTTPRequestHandler
|
||||
class ODBCHandler : public HTTPRequestHandler, WithContext
|
||||
{
|
||||
public:
|
||||
ODBCHandler(
|
||||
size_t keep_alive_timeout_,
|
||||
const Context & context_,
|
||||
ContextPtr context_,
|
||||
const String & mode_)
|
||||
: log(&Poco::Logger::get("ODBCHandler"))
|
||||
: WithContext(context_)
|
||||
, log(&Poco::Logger::get("ODBCHandler"))
|
||||
, keep_alive_timeout(keep_alive_timeout_)
|
||||
, context(context_)
|
||||
, mode(mode_)
|
||||
{
|
||||
}
|
||||
@ -32,7 +36,6 @@ private:
|
||||
Poco::Logger * log;
|
||||
|
||||
size_t keep_alive_timeout;
|
||||
const Context & context;
|
||||
String mode;
|
||||
|
||||
static inline std::mutex mutex;
|
||||
|
@ -44,14 +44,14 @@ ODBCBlockOutputStream::ODBCBlockOutputStream(nanodbc::connection & connection_,
|
||||
const std::string & remote_database_name_,
|
||||
const std::string & remote_table_name_,
|
||||
const Block & sample_block_,
|
||||
const Context & context_,
|
||||
ContextPtr local_context_,
|
||||
IdentifierQuotingStyle quoting_)
|
||||
: log(&Poco::Logger::get("ODBCBlockOutputStream"))
|
||||
, connection(connection_)
|
||||
, db_name(remote_database_name_)
|
||||
, table_name(remote_table_name_)
|
||||
, sample_block(sample_block_)
|
||||
, context(context_)
|
||||
, local_context(local_context_)
|
||||
, quoting(quoting_)
|
||||
{
|
||||
description.init(sample_block);
|
||||
@ -65,7 +65,7 @@ Block ODBCBlockOutputStream::getHeader() const
|
||||
void ODBCBlockOutputStream::write(const Block & block)
|
||||
{
|
||||
WriteBufferFromOwnString values_buf;
|
||||
auto writer = FormatFactory::instance().getOutputStream("Values", values_buf, sample_block, context);
|
||||
auto writer = FormatFactory::instance().getOutputStream("Values", values_buf, sample_block, local_context);
|
||||
writer->write(block);
|
||||
|
||||
std::string query = getInsertQuery(db_name, table_name, block.getColumnsWithTypeAndName(), quoting) + values_buf.str();
|
||||
|
@ -4,6 +4,7 @@
|
||||
#include <DataStreams/IBlockOutputStream.h>
|
||||
#include <Core/ExternalResultDescription.h>
|
||||
#include <Parsers/IdentifierQuotingStyle.h>
|
||||
#include <Interpreters/Context_fwd.h>
|
||||
#include <nanodbc/nanodbc.h>
|
||||
|
||||
|
||||
@ -19,7 +20,7 @@ public:
|
||||
const std::string & remote_database_name_,
|
||||
const std::string & remote_table_name_,
|
||||
const Block & sample_block_,
|
||||
const Context & context_,
|
||||
ContextPtr local_context_,
|
||||
IdentifierQuotingStyle quoting);
|
||||
|
||||
Block getHeader() const override;
|
||||
@ -32,7 +33,7 @@ private:
|
||||
std::string db_name;
|
||||
std::string table_name;
|
||||
Block sample_block;
|
||||
const Context & context;
|
||||
ContextPtr local_context;
|
||||
IdentifierQuotingStyle quoting;
|
||||
|
||||
ExternalResultDescription description;
|
||||
|
@ -13,12 +13,12 @@ class ODBCBridge : public IBridge
|
||||
{
|
||||
|
||||
protected:
|
||||
const std::string bridgeName() const override
|
||||
std::string bridgeName() const override
|
||||
{
|
||||
return "ODBCBridge";
|
||||
}
|
||||
|
||||
HandlerFactoryPtr getHandlerFactoryPtr(Context & context) const override
|
||||
HandlerFactoryPtr getHandlerFactoryPtr(ContextPtr context) const override
|
||||
{
|
||||
return std::make_shared<ODBCBridgeHandlerFactory>("ODBCRequestHandlerFactory-factory", keep_alive_timeout, context);
|
||||
}
|
||||
|
@ -51,7 +51,7 @@ void SchemaAllowedHandler::handleRequest(HTTPServerRequest & request, HTTPServer
|
||||
|
||||
auto connection = ODBCConnectionFactory::instance().get(
|
||||
validateODBCConnectionString(connection_string),
|
||||
context.getSettingsRef().odbc_bridge_connection_pool_size);
|
||||
getContext()->getSettingsRef().odbc_bridge_connection_pool_size);
|
||||
|
||||
bool result = isSchemaAllowed(*connection);
|
||||
|
||||
|
@ -13,13 +13,13 @@ namespace DB
|
||||
class Context;
|
||||
|
||||
/// This handler establishes connection to database, and retrieves whether schema is allowed.
|
||||
class SchemaAllowedHandler : public HTTPRequestHandler
|
||||
class SchemaAllowedHandler : public HTTPRequestHandler, WithContext
|
||||
{
|
||||
public:
|
||||
SchemaAllowedHandler(size_t keep_alive_timeout_, const Context & context_)
|
||||
: log(&Poco::Logger::get("SchemaAllowedHandler"))
|
||||
SchemaAllowedHandler(size_t keep_alive_timeout_, ContextPtr context_)
|
||||
: WithContext(context_)
|
||||
, log(&Poco::Logger::get("SchemaAllowedHandler"))
|
||||
, keep_alive_timeout(keep_alive_timeout_)
|
||||
, context(context_)
|
||||
{
|
||||
}
|
||||
|
||||
@ -28,7 +28,6 @@ public:
|
||||
private:
|
||||
Poco::Logger * log;
|
||||
size_t keep_alive_timeout;
|
||||
const Context & context;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -426,8 +426,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
|
||||
* settings, available functions, data types, aggregate functions, databases, ...
|
||||
*/
|
||||
auto shared_context = Context::createShared();
|
||||
auto global_context = std::make_unique<Context>(Context::createGlobal(shared_context.get()));
|
||||
global_context_ptr = global_context.get();
|
||||
global_context = Context::createGlobal(shared_context.get());
|
||||
|
||||
global_context->makeGlobalContext();
|
||||
global_context->setApplicationType(Context::ApplicationType::SERVER);
|
||||
@ -934,7 +933,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
|
||||
/** Explicitly destroy Context. It is more convenient than in destructor of Server, because logger is still available.
|
||||
* At this moment, no one could own shared part of Context.
|
||||
*/
|
||||
global_context_ptr = nullptr;
|
||||
global_context.reset();
|
||||
shared_context.reset();
|
||||
LOG_DEBUG(log, "Destroyed global context.");
|
||||
@ -948,14 +946,14 @@ int Server::main(const std::vector<std::string> & /*args*/)
|
||||
|
||||
try
|
||||
{
|
||||
loadMetadataSystem(*global_context);
|
||||
loadMetadataSystem(global_context);
|
||||
/// After attaching system databases we can initialize system log.
|
||||
global_context->initializeSystemLogs();
|
||||
auto & database_catalog = DatabaseCatalog::instance();
|
||||
/// After the system database is created, attach virtual system tables (in addition to query_log and part_log)
|
||||
attachSystemTablesServer(*database_catalog.getSystemDatabase(), has_zookeeper);
|
||||
/// Then, load remaining databases
|
||||
loadMetadata(*global_context, default_database);
|
||||
loadMetadata(global_context, default_database);
|
||||
database_catalog.loadDatabases();
|
||||
/// After loading validate that default database exists
|
||||
database_catalog.assertDatabaseExists(default_database);
|
||||
@ -1035,7 +1033,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
|
||||
else
|
||||
{
|
||||
/// Initialize a watcher periodically updating DNS cache
|
||||
dns_cache_updater = std::make_unique<DNSCacheUpdater>(*global_context, config().getInt("dns_cache_update_period", 15));
|
||||
dns_cache_updater = std::make_unique<DNSCacheUpdater>(global_context, config().getInt("dns_cache_update_period", 15));
|
||||
}
|
||||
|
||||
#if defined(OS_LINUX)
|
||||
@ -1067,7 +1065,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
|
||||
{
|
||||
/// This object will periodically calculate some metrics.
|
||||
AsynchronousMetrics async_metrics(
|
||||
*global_context, config().getUInt("asynchronous_metrics_update_period_s", 60), servers_to_start_before_tables, servers);
|
||||
global_context, config().getUInt("asynchronous_metrics_update_period_s", 60), servers_to_start_before_tables, servers);
|
||||
attachSystemTablesAsync(*DatabaseCatalog::instance().getSystemDatabase(), async_metrics);
|
||||
|
||||
for (const auto & listen_host : listen_hosts)
|
||||
@ -1330,7 +1328,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
|
||||
int pool_size = config().getInt("distributed_ddl.pool_size", 1);
|
||||
if (pool_size < 1)
|
||||
throw Exception("distributed_ddl.pool_size should be greater then 0", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
|
||||
global_context->setDDLWorker(std::make_unique<DDLWorker>(pool_size, ddl_zookeeper_path, *global_context, &config(),
|
||||
global_context->setDDLWorker(std::make_unique<DDLWorker>(pool_size, ddl_zookeeper_path, global_context, &config(),
|
||||
"distributed_ddl", "DDLWorker", &CurrentMetrics::MaxDDLEntryID));
|
||||
}
|
||||
|
||||
|
@ -40,9 +40,9 @@ public:
|
||||
return BaseDaemon::logger();
|
||||
}
|
||||
|
||||
Context & context() const override
|
||||
ContextPtr context() const override
|
||||
{
|
||||
return *global_context_ptr;
|
||||
return global_context;
|
||||
}
|
||||
|
||||
bool isCancelled() const override
|
||||
@ -64,8 +64,7 @@ protected:
|
||||
std::string getDefaultCorePath() const override;
|
||||
|
||||
private:
|
||||
Context * global_context_ptr = nullptr;
|
||||
|
||||
ContextPtr global_context;
|
||||
Poco::Net::SocketAddress socketBindListen(Poco::Net::ServerSocket & socket, const std::string & host, UInt16 port, [[maybe_unused]] bool secure = false) const;
|
||||
|
||||
using CreateServerFunc = std::function<void(UInt16)>;
|
||||
|
@ -60,6 +60,8 @@ public:
|
||||
return std::make_shared<DataTypeUInt8>();
|
||||
}
|
||||
|
||||
bool allocatesMemoryInArena() const override { return false; }
|
||||
|
||||
void create(AggregateDataPtr __restrict place) const override
|
||||
{
|
||||
if (std::uniform_real_distribution<>(0.0, 1.0)(thread_local_rng) <= throw_probability)
|
||||
|
@ -98,6 +98,8 @@ public:
|
||||
|
||||
DataTypePtr getReturnType() const final { return std::make_shared<DataTypeNumber<Float64>>(); }
|
||||
|
||||
bool allocatesMemoryInArena() const override { return false; }
|
||||
|
||||
void NO_SANITIZE_UNDEFINED merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override
|
||||
{
|
||||
this->data(place).numerator += this->data(rhs).numerator;
|
||||
|
@ -54,6 +54,8 @@ public:
|
||||
return std::make_shared<DataTypeNumber<T>>();
|
||||
}
|
||||
|
||||
bool allocatesMemoryInArena() const override { return false; }
|
||||
|
||||
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
|
||||
{
|
||||
this->data(place).update(assert_cast<const ColumnVector<T> &>(*columns[0]).getData()[row_num]);
|
||||
|
@ -127,6 +127,8 @@ public:
|
||||
return std::make_shared<DataTypeFloat64>();
|
||||
}
|
||||
|
||||
bool allocatesMemoryInArena() const override { return false; }
|
||||
|
||||
void add(AggregateDataPtr __restrict place, const IColumn ** columns, const size_t row_num, Arena *) const override
|
||||
{
|
||||
/// NOTE Slightly inefficient.
|
||||
|
@ -33,6 +33,8 @@ public:
|
||||
return "categoricalInformationValue";
|
||||
}
|
||||
|
||||
bool allocatesMemoryInArena() const override { return false; }
|
||||
|
||||
void create(AggregateDataPtr __restrict place) const override
|
||||
{
|
||||
memset(place, 0, sizeOfData());
|
||||
|
@ -38,6 +38,8 @@ public:
|
||||
return std::make_shared<DataTypeUInt64>();
|
||||
}
|
||||
|
||||
bool allocatesMemoryInArena() const override { return false; }
|
||||
|
||||
void add(AggregateDataPtr __restrict place, const IColumn **, size_t, Arena *) const override
|
||||
{
|
||||
++data(place).count;
|
||||
@ -126,6 +128,8 @@ public:
|
||||
return std::make_shared<DataTypeUInt64>();
|
||||
}
|
||||
|
||||
bool allocatesMemoryInArena() const override { return false; }
|
||||
|
||||
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
|
||||
{
|
||||
data(place).count += !assert_cast<const ColumnNullable &>(*columns[0]).isNullAt(row_num);
|
||||
|
@ -43,6 +43,8 @@ public:
|
||||
|
||||
DataTypePtr getReturnType() const override { return std::make_shared<DataTypeNumber<T>>(); }
|
||||
|
||||
bool allocatesMemoryInArena() const override { return false; }
|
||||
|
||||
void NO_SANITIZE_UNDEFINED ALWAYS_INLINE add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
|
||||
{
|
||||
auto value = assert_cast<const ColumnVector<T> &>(*columns[0]).getData()[row_num];
|
||||
|
@ -103,6 +103,8 @@ public:
|
||||
return std::make_shared<DataTypeNumber<Float64>>();
|
||||
}
|
||||
|
||||
bool allocatesMemoryInArena() const override { return false; }
|
||||
|
||||
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
|
||||
{
|
||||
if constexpr (!std::is_same_v<UInt128, Value>)
|
||||
|
@ -121,7 +121,7 @@ AggregateFunctionPtr AggregateFunctionFactory::getImpl(
|
||||
is_case_insensitive = true;
|
||||
}
|
||||
|
||||
const Context * query_context = nullptr;
|
||||
ContextPtr query_context;
|
||||
if (CurrentThread::isInitialized())
|
||||
query_context = CurrentThread::get().getQueryContext();
|
||||
|
||||
|
@ -104,6 +104,8 @@ public:
|
||||
return std::make_shared<DataTypeArray>(type);
|
||||
}
|
||||
|
||||
bool allocatesMemoryInArena() const override { return false; }
|
||||
|
||||
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
|
||||
{
|
||||
/// TODO Do positions need to be 1-based for this function?
|
||||
|
@ -22,6 +22,8 @@ public:
|
||||
|
||||
DataTypePtr getReturnType() const override { return std::make_shared<DataTypeNumber<T>>(); }
|
||||
|
||||
bool allocatesMemoryInArena() const override { return false; }
|
||||
|
||||
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
|
||||
{
|
||||
this->data(place).rbs.add(assert_cast<const ColumnVector<T> &>(*columns[0]).getData()[row_num]);
|
||||
@ -56,6 +58,8 @@ public:
|
||||
|
||||
DataTypePtr getReturnType() const override { return std::make_shared<DataTypeNumber<T>>(); }
|
||||
|
||||
bool allocatesMemoryInArena() const override { return false; }
|
||||
|
||||
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
|
||||
{
|
||||
Data & data_lhs = this->data(place);
|
||||
|
@ -59,6 +59,8 @@ public:
|
||||
return std::make_shared<DataTypeArray>(this->argument_types[0]);
|
||||
}
|
||||
|
||||
bool allocatesMemoryInArena() const override { return false; }
|
||||
|
||||
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
|
||||
{
|
||||
if (limit_num_elems && this->data(place).value.size() >= max_elems)
|
||||
|
@ -332,6 +332,8 @@ public:
|
||||
return std::make_shared<DataTypeArray>(tuple);
|
||||
}
|
||||
|
||||
bool allocatesMemoryInArena() const override { return false; }
|
||||
|
||||
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
|
||||
{
|
||||
auto val = assert_cast<const ColumnVector<T> &>(*columns[0]).getData()[row_num];
|
||||
|
@ -146,7 +146,7 @@ void LinearModelData::predict(
|
||||
const ColumnsWithTypeAndName & arguments,
|
||||
size_t offset,
|
||||
size_t limit,
|
||||
const Context & context) const
|
||||
ContextPtr context) const
|
||||
{
|
||||
gradient_computer->predict(container, arguments, offset, limit, weights, bias, context);
|
||||
}
|
||||
@ -453,7 +453,7 @@ void LogisticRegression::predict(
|
||||
size_t limit,
|
||||
const std::vector<Float64> & weights,
|
||||
Float64 bias,
|
||||
const Context & /*context*/) const
|
||||
ContextPtr /*context*/) const
|
||||
{
|
||||
size_t rows_num = arguments.front().column->size();
|
||||
|
||||
@ -521,7 +521,7 @@ void LinearRegression::predict(
|
||||
size_t limit,
|
||||
const std::vector<Float64> & weights,
|
||||
Float64 bias,
|
||||
const Context & /*context*/) const
|
||||
ContextPtr /*context*/) const
|
||||
{
|
||||
if (weights.size() + 1 != arguments.size())
|
||||
{
|
||||
|
@ -3,10 +3,10 @@
|
||||
#include <Columns/ColumnVector.h>
|
||||
#include <Columns/ColumnsCommon.h>
|
||||
#include <Columns/ColumnsNumber.h>
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <DataTypes/DataTypeTuple.h>
|
||||
#include <DataTypes/DataTypeArray.h>
|
||||
#include <DataTypes/DataTypeTuple.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <Common/typeid_cast.h>
|
||||
#include "IAggregateFunction.h"
|
||||
|
||||
namespace DB
|
||||
@ -44,7 +44,7 @@ public:
|
||||
size_t limit,
|
||||
const std::vector<Float64> & weights,
|
||||
Float64 bias,
|
||||
const Context & context) const = 0;
|
||||
ContextPtr context) const = 0;
|
||||
};
|
||||
|
||||
|
||||
@ -69,7 +69,7 @@ public:
|
||||
size_t limit,
|
||||
const std::vector<Float64> & weights,
|
||||
Float64 bias,
|
||||
const Context & context) const override;
|
||||
ContextPtr context) const override;
|
||||
};
|
||||
|
||||
|
||||
@ -94,7 +94,7 @@ public:
|
||||
size_t limit,
|
||||
const std::vector<Float64> & weights,
|
||||
Float64 bias,
|
||||
const Context & context) const override;
|
||||
ContextPtr context) const override;
|
||||
};
|
||||
|
||||
|
||||
@ -264,7 +264,7 @@ public:
|
||||
const ColumnsWithTypeAndName & arguments,
|
||||
size_t offset,
|
||||
size_t limit,
|
||||
const Context & context) const;
|
||||
ContextPtr context) const;
|
||||
|
||||
void returnWeights(IColumn & to) const;
|
||||
private:
|
||||
@ -323,6 +323,8 @@ public:
|
||||
return std::make_shared<DataTypeArray>(std::make_shared<DataTypeFloat64>());
|
||||
}
|
||||
|
||||
bool allocatesMemoryInArena() const override { return false; }
|
||||
|
||||
/// This function is called from evalMLMethod function for correct predictValues call
|
||||
DataTypePtr getReturnTypeToPredict() const override
|
||||
{
|
||||
@ -363,7 +365,7 @@ public:
|
||||
const ColumnsWithTypeAndName & arguments,
|
||||
size_t offset,
|
||||
size_t limit,
|
||||
const Context & context) const override
|
||||
ContextPtr context) const override
|
||||
{
|
||||
if (arguments.size() != param_num + 1)
|
||||
throw Exception(
|
||||
|
@ -87,6 +87,8 @@ public:
|
||||
return std::make_shared<DataTypeNumber<PointType>>();
|
||||
}
|
||||
|
||||
bool allocatesMemoryInArena() const override { return false; }
|
||||
|
||||
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override
|
||||
{
|
||||
PointType left = assert_cast<const ColumnVector<PointType> &>(*columns[0]).getData()[row_num];
|
||||
|
@ -28,6 +28,8 @@ public:
|
||||
return argument_types.front();
|
||||
}
|
||||
|
||||
bool allocatesMemoryInArena() const override { return false; }
|
||||
|
||||
void create(AggregateDataPtr) const override
|
||||
{
|
||||
}
|
||||
|
@ -103,6 +103,8 @@ public:
|
||||
return res;
|
||||
}
|
||||
|
||||
bool allocatesMemoryInArena() const override { return false; }
|
||||
|
||||
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
|
||||
{
|
||||
auto value = static_cast<const ColVecType &>(*columns[0]).getData()[row_num];
|
||||
|
@ -94,6 +94,8 @@ public:
|
||||
return std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt8>());
|
||||
}
|
||||
|
||||
bool allocatesMemoryInArena() const override { return false; }
|
||||
|
||||
void add(AggregateDataPtr __restrict place, const IColumn ** columns, const size_t row_num, Arena *) const override
|
||||
{
|
||||
for (const auto i : ext::range(0, events_size))
|
||||
|
@ -560,6 +560,8 @@ public:
|
||||
|
||||
DataTypePtr getReturnType() const override { return std::make_shared<DataTypeUInt8>(); }
|
||||
|
||||
bool allocatesMemoryInArena() const override { return false; }
|
||||
|
||||
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
|
||||
{
|
||||
this->data(place).sort();
|
||||
@ -588,6 +590,8 @@ public:
|
||||
|
||||
DataTypePtr getReturnType() const override { return std::make_shared<DataTypeUInt64>(); }
|
||||
|
||||
bool allocatesMemoryInArena() const override { return false; }
|
||||
|
||||
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
|
||||
{
|
||||
this->data(place).sort();
|
||||
|
@ -168,6 +168,8 @@ public:
|
||||
);
|
||||
}
|
||||
|
||||
bool allocatesMemoryInArena() const override { return false; }
|
||||
|
||||
void insertResultInto(
|
||||
AggregateDataPtr place,
|
||||
IColumn & to,
|
||||
|
@ -123,6 +123,8 @@ public:
|
||||
return std::make_shared<DataTypeFloat64>();
|
||||
}
|
||||
|
||||
bool allocatesMemoryInArena() const override { return false; }
|
||||
|
||||
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
|
||||
{
|
||||
this->data(place).update(*columns[0], row_num);
|
||||
@ -375,6 +377,8 @@ public:
|
||||
return std::make_shared<DataTypeFloat64>();
|
||||
}
|
||||
|
||||
bool allocatesMemoryInArena() const override { return false; }
|
||||
|
||||
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
|
||||
{
|
||||
this->data(place).update(*columns[0], *columns[1], row_num);
|
||||
|
@ -121,6 +121,8 @@ public:
|
||||
return std::make_shared<DataTypeNumber<ResultType>>();
|
||||
}
|
||||
|
||||
bool allocatesMemoryInArena() const override { return false; }
|
||||
|
||||
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
|
||||
{
|
||||
if constexpr (StatFunc::num_args == 2)
|
||||
|
@ -314,6 +314,8 @@ public:
|
||||
return std::make_shared<ResultDataType>();
|
||||
}
|
||||
|
||||
bool allocatesMemoryInArena() const override { return false; }
|
||||
|
||||
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
|
||||
{
|
||||
const auto & column = assert_cast<const ColVecType &>(*columns[0]);
|
||||
|
@ -140,6 +140,8 @@ public:
|
||||
return std::make_shared<DataTypeTuple>(types);
|
||||
}
|
||||
|
||||
bool allocatesMemoryInArena() const override { return false; }
|
||||
|
||||
static const auto & getArgumentColumns(const IColumn**& columns)
|
||||
{
|
||||
if constexpr (tuple_argument)
|
||||
|
@ -109,6 +109,8 @@ public:
|
||||
);
|
||||
}
|
||||
|
||||
bool allocatesMemoryInArena() const override { return false; }
|
||||
|
||||
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
|
||||
{
|
||||
Float64 value = columns[0]->getFloat64(row_num);
|
||||
|
@ -50,6 +50,8 @@ public:
|
||||
return std::make_shared<DataTypeArray>(this->argument_types[0]);
|
||||
}
|
||||
|
||||
bool allocatesMemoryInArena() const override { return false; }
|
||||
|
||||
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
|
||||
{
|
||||
auto & set = this->data(place).value;
|
||||
|
@ -210,6 +210,8 @@ public:
|
||||
return std::make_shared<DataTypeUInt64>();
|
||||
}
|
||||
|
||||
bool allocatesMemoryInArena() const override { return false; }
|
||||
|
||||
/// ALWAYS_INLINE is required to have better code layout for uniqHLL12 function
|
||||
void ALWAYS_INLINE add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
|
||||
{
|
||||
@ -265,6 +267,8 @@ public:
|
||||
return std::make_shared<DataTypeUInt64>();
|
||||
}
|
||||
|
||||
bool allocatesMemoryInArena() const override { return false; }
|
||||
|
||||
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
|
||||
{
|
||||
this->data(place).set.insert(typename Data::Set::value_type(
|
||||
|
@ -141,6 +141,8 @@ public:
|
||||
return std::make_shared<DataTypeUInt64>();
|
||||
}
|
||||
|
||||
bool allocatesMemoryInArena() const override { return false; }
|
||||
|
||||
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
|
||||
{
|
||||
if constexpr (!std::is_same_v<T, String>)
|
||||
@ -211,6 +213,8 @@ public:
|
||||
return std::make_shared<DataTypeUInt64>();
|
||||
}
|
||||
|
||||
bool allocatesMemoryInArena() const override { return false; }
|
||||
|
||||
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
|
||||
{
|
||||
this->data(place).set.insert(typename AggregateFunctionUniqCombinedData<UInt64, K, HashValueType>::Set::value_type(
|
||||
|
@ -184,6 +184,8 @@ public:
|
||||
return std::make_shared<DataTypeUInt64>();
|
||||
}
|
||||
|
||||
bool allocatesMemoryInArena() const override { return false; }
|
||||
|
||||
/// ALWAYS_INLINE is required to have better code layout for uniqUpTo function
|
||||
void ALWAYS_INLINE add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
|
||||
{
|
||||
@ -247,6 +249,8 @@ public:
|
||||
return std::make_shared<DataTypeUInt64>();
|
||||
}
|
||||
|
||||
bool allocatesMemoryInArena() const override { return false; }
|
||||
|
||||
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
|
||||
{
|
||||
this->data(place).insert(UInt64(UniqVariadicHash<is_exact, argument_is_tuple>::apply(num_args, columns, row_num)), threshold);
|
||||
|
@ -247,6 +247,8 @@ public:
|
||||
return std::make_shared<DataTypeUInt8>();
|
||||
}
|
||||
|
||||
bool allocatesMemoryInArena() const override { return false; }
|
||||
|
||||
AggregateFunctionPtr getOwnNullAdapter(
|
||||
const AggregateFunctionPtr & nested_function, const DataTypes & arguments, const Array & params,
|
||||
const AggregateFunctionProperties & /*properties*/) const override
|
||||
|
@ -1,21 +1,23 @@
|
||||
#pragma once
|
||||
|
||||
#include <Columns/ColumnTuple.h>
|
||||
#include <Columns/ColumnsNumber.h>
|
||||
#include <Core/Block.h>
|
||||
#include <Core/ColumnNumbers.h>
|
||||
#include <Core/Field.h>
|
||||
#include <Interpreters/Context_fwd.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <common/types.h>
|
||||
|
||||
#include <cstddef>
|
||||
#include <memory>
|
||||
#include <type_traits>
|
||||
#include <vector>
|
||||
|
||||
#include <common/types.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Core/Block.h>
|
||||
#include <Core/ColumnNumbers.h>
|
||||
#include <Core/Field.h>
|
||||
#include <Columns/ColumnTuple.h>
|
||||
#include <Columns/ColumnsNumber.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
@ -104,7 +106,7 @@ public:
|
||||
virtual void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, Arena * arena) const = 0;
|
||||
|
||||
/// Returns true if a function requires Arena to handle own states (see add(), merge(), deserialize()).
|
||||
virtual bool allocatesMemoryInArena() const { return false; }
|
||||
virtual bool allocatesMemoryInArena() const = 0;
|
||||
|
||||
/// Inserts results into a column. This method might modify the state (e.g.
|
||||
/// sort an array), so must be called once, from single thread. The state
|
||||
@ -122,7 +124,7 @@ public:
|
||||
const ColumnsWithTypeAndName & /*arguments*/,
|
||||
size_t /*offset*/,
|
||||
size_t /*limit*/,
|
||||
const Context & /*context*/) const
|
||||
ContextPtr /*context*/) const
|
||||
{
|
||||
throw Exception("Method predictValues is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
|
||||
}
|
||||
|
@ -14,7 +14,7 @@ namespace DB
|
||||
|
||||
/// Common base class for XDBC and Library bridge helpers.
|
||||
/// Contains helper methods to check/start bridge sync.
|
||||
class IBridgeHelper
|
||||
class IBridgeHelper: protected WithContext
|
||||
{
|
||||
|
||||
public:
|
||||
@ -27,6 +27,7 @@ public:
|
||||
static const inline std::string PING_METHOD = Poco::Net::HTTPRequest::HTTP_GET;
|
||||
static const inline std::string MAIN_METHOD = Poco::Net::HTTPRequest::HTTP_POST;
|
||||
|
||||
explicit IBridgeHelper(ContextPtr context_) : WithContext(context_) {}
|
||||
virtual ~IBridgeHelper() = default;
|
||||
|
||||
void startBridgeSync() const;
|
||||
@ -38,9 +39,9 @@ public:
|
||||
|
||||
protected:
|
||||
/// clickhouse-odbc-bridge, clickhouse-library-bridge
|
||||
virtual const String serviceAlias() const = 0;
|
||||
virtual String serviceAlias() const = 0;
|
||||
|
||||
virtual const String serviceFileName() const = 0;
|
||||
virtual String serviceFileName() const = 0;
|
||||
|
||||
virtual size_t getDefaultPort() const = 0;
|
||||
|
||||
@ -48,9 +49,7 @@ protected:
|
||||
|
||||
virtual void startBridge(std::unique_ptr<ShellCommand> cmd) const = 0;
|
||||
|
||||
virtual const String configPrefix() const = 0;
|
||||
|
||||
virtual const Context & getContext() const = 0;
|
||||
virtual String configPrefix() const = 0;
|
||||
|
||||
virtual const Poco::Util::AbstractConfiguration & getConfig() const = 0;
|
||||
|
||||
|
@ -21,14 +21,14 @@ namespace DB
|
||||
{
|
||||
|
||||
LibraryBridgeHelper::LibraryBridgeHelper(
|
||||
const Context & context_,
|
||||
ContextPtr context_,
|
||||
const Block & sample_block_,
|
||||
const Field & dictionary_id_)
|
||||
: log(&Poco::Logger::get("LibraryBridgeHelper"))
|
||||
, context(context_)
|
||||
: IBridgeHelper(context_)
|
||||
, log(&Poco::Logger::get("LibraryBridgeHelper"))
|
||||
, sample_block(sample_block_)
|
||||
, config(context.getConfigRef())
|
||||
, http_timeout(context.getSettingsRef().http_receive_timeout.value.totalSeconds())
|
||||
, config(context_->getConfigRef())
|
||||
, http_timeout(context_->getSettingsRef().http_receive_timeout.value.totalSeconds())
|
||||
, dictionary_id(dictionary_id_)
|
||||
{
|
||||
bridge_port = config.getUInt("library_bridge.port", DEFAULT_PORT);
|
||||
@ -57,7 +57,7 @@ Poco::URI LibraryBridgeHelper::createBaseURI() const
|
||||
|
||||
void LibraryBridgeHelper::startBridge(std::unique_ptr<ShellCommand> cmd) const
|
||||
{
|
||||
context.addBridgeCommand(std::move(cmd));
|
||||
getContext()->addBridgeCommand(std::move(cmd));
|
||||
}
|
||||
|
||||
|
||||
@ -68,7 +68,7 @@ bool LibraryBridgeHelper::initLibrary(const std::string & library_path, const st
|
||||
|
||||
/// Sample block must contain null values
|
||||
WriteBufferFromOwnString out;
|
||||
auto output_stream = context.getOutputStream(LibraryBridgeHelper::DEFAULT_FORMAT, out, sample_block);
|
||||
auto output_stream = getContext()->getOutputStream(LibraryBridgeHelper::DEFAULT_FORMAT, out, sample_block);
|
||||
formatBlock(output_stream, sample_block);
|
||||
auto block_string = out.str();
|
||||
|
||||
@ -142,8 +142,7 @@ BlockInputStreamPtr LibraryBridgeHelper::loadKeys(const Block & requested_block)
|
||||
ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [requested_block, this](std::ostream & os)
|
||||
{
|
||||
WriteBufferFromOStream out_buffer(os);
|
||||
auto output_stream = context.getOutputStream(
|
||||
LibraryBridgeHelper::DEFAULT_FORMAT, out_buffer, sample_block);
|
||||
auto output_stream = getContext()->getOutputStream(LibraryBridgeHelper::DEFAULT_FORMAT, out_buffer, sample_block);
|
||||
formatBlock(output_stream, requested_block);
|
||||
};
|
||||
return loadBase(uri, out_stream_callback);
|
||||
@ -156,7 +155,7 @@ bool LibraryBridgeHelper::executeRequest(const Poco::URI & uri, ReadWriteBufferF
|
||||
uri,
|
||||
Poco::Net::HTTPRequest::HTTP_POST,
|
||||
std::move(out_stream_callback),
|
||||
ConnectionTimeouts::getHTTPTimeouts(context));
|
||||
ConnectionTimeouts::getHTTPTimeouts(getContext()));
|
||||
|
||||
bool res;
|
||||
readBoolText(res, buf);
|
||||
@ -170,13 +169,13 @@ BlockInputStreamPtr LibraryBridgeHelper::loadBase(const Poco::URI & uri, ReadWri
|
||||
uri,
|
||||
Poco::Net::HTTPRequest::HTTP_POST,
|
||||
std::move(out_stream_callback),
|
||||
ConnectionTimeouts::getHTTPTimeouts(context),
|
||||
ConnectionTimeouts::getHTTPTimeouts(getContext()),
|
||||
0,
|
||||
Poco::Net::HTTPBasicCredentials{},
|
||||
DBMS_DEFAULT_BUFFER_SIZE,
|
||||
ReadWriteBufferFromHTTP::HTTPHeaderEntries{});
|
||||
|
||||
auto input_stream = context.getInputFormat(LibraryBridgeHelper::DEFAULT_FORMAT, *read_buf_ptr, sample_block, DEFAULT_BLOCK_SIZE);
|
||||
auto input_stream = getContext()->getInputFormat(LibraryBridgeHelper::DEFAULT_FORMAT, *read_buf_ptr, sample_block, DEFAULT_BLOCK_SIZE);
|
||||
return std::make_shared<OwningBlockInputStream<ReadWriteBufferFromHTTP>>(input_stream, std::move(read_buf_ptr));
|
||||
}
|
||||
|
||||
|
@ -17,9 +17,9 @@ class LibraryBridgeHelper : public IBridgeHelper
|
||||
public:
|
||||
static constexpr inline size_t DEFAULT_PORT = 9012;
|
||||
|
||||
LibraryBridgeHelper(const Context & context_, const Block & sample_block, const Field & dictionary_id_);
|
||||
LibraryBridgeHelper(ContextPtr context_, const Block & sample_block, const Field & dictionary_id_);
|
||||
|
||||
bool initLibrary(const std::string & library_path, const std::string library_settings, const std::string attributes_names);
|
||||
bool initLibrary(const std::string & library_path, std::string library_settings, std::string attributes_names);
|
||||
|
||||
bool cloneLibrary(const Field & other_dictionary_id);
|
||||
|
||||
@ -31,7 +31,7 @@ public:
|
||||
|
||||
BlockInputStreamPtr loadAll();
|
||||
|
||||
BlockInputStreamPtr loadIds(const std::string ids_string);
|
||||
BlockInputStreamPtr loadIds(std::string ids_string);
|
||||
|
||||
BlockInputStreamPtr loadKeys(const Block & requested_block);
|
||||
|
||||
@ -43,17 +43,15 @@ public:
|
||||
protected:
|
||||
void startBridge(std::unique_ptr<ShellCommand> cmd) const override;
|
||||
|
||||
const String serviceAlias() const override { return "clickhouse-library-bridge"; }
|
||||
String serviceAlias() const override { return "clickhouse-library-bridge"; }
|
||||
|
||||
const String serviceFileName() const override { return serviceAlias(); }
|
||||
String serviceFileName() const override { return serviceAlias(); }
|
||||
|
||||
size_t getDefaultPort() const override { return DEFAULT_PORT; }
|
||||
|
||||
bool startBridgeManually() const override { return false; }
|
||||
|
||||
const String configPrefix() const override { return "library_bridge"; }
|
||||
|
||||
const Context & getContext() const override { return context; }
|
||||
String configPrefix() const override { return "library_bridge"; }
|
||||
|
||||
const Poco::Util::AbstractConfiguration & getConfig() const override { return config; }
|
||||
|
||||
@ -76,7 +74,6 @@ private:
|
||||
Poco::URI createRequestURI(const String & method) const;
|
||||
|
||||
Poco::Logger * log;
|
||||
const Context & context;
|
||||
const Block sample_block;
|
||||
const Poco::Util::AbstractConfiguration & config;
|
||||
const Poco::Timespan http_timeout;
|
||||
@ -85,4 +82,5 @@ private:
|
||||
std::string bridge_host;
|
||||
size_t bridge_port;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -35,6 +35,8 @@ class IXDBCBridgeHelper : public IBridgeHelper
|
||||
{
|
||||
|
||||
public:
|
||||
explicit IXDBCBridgeHelper(ContextPtr context_) : IBridgeHelper(context_) {}
|
||||
|
||||
virtual std::vector<std::pair<std::string, std::string>> getURLParams(const std::string & cols, UInt64 max_block_size) const = 0;
|
||||
|
||||
virtual Poco::URI getColumnsInfoURI() const = 0;
|
||||
@ -43,7 +45,7 @@ public:
|
||||
|
||||
virtual bool isSchemaAllowed() = 0;
|
||||
|
||||
virtual const String getName() const = 0;
|
||||
virtual String getName() const = 0;
|
||||
};
|
||||
|
||||
using BridgeHelperPtr = std::shared_ptr<IXDBCBridgeHelper>;
|
||||
@ -60,14 +62,14 @@ public:
|
||||
static constexpr inline auto SCHEMA_ALLOWED_HANDLER = "/schema_allowed";
|
||||
|
||||
XDBCBridgeHelper(
|
||||
const Context & global_context_,
|
||||
ContextPtr global_context_,
|
||||
const Poco::Timespan & http_timeout_,
|
||||
const std::string & connection_string_)
|
||||
: log(&Poco::Logger::get(BridgeHelperMixin::getName() + "BridgeHelper"))
|
||||
: IXDBCBridgeHelper(global_context_)
|
||||
, log(&Poco::Logger::get(BridgeHelperMixin::getName() + "BridgeHelper"))
|
||||
, connection_string(connection_string_)
|
||||
, http_timeout(http_timeout_)
|
||||
, context(global_context_)
|
||||
, config(context.getConfigRef())
|
||||
, config(global_context_->getConfigRef())
|
||||
{
|
||||
bridge_host = config.getString(BridgeHelperMixin::configPrefix() + ".host", DEFAULT_HOST);
|
||||
bridge_port = config.getUInt(BridgeHelperMixin::configPrefix() + ".port", DEFAULT_PORT);
|
||||
@ -77,18 +79,16 @@ public:
|
||||
protected:
|
||||
auto getConnectionString() const { return connection_string; }
|
||||
|
||||
const String getName() const override { return BridgeHelperMixin::getName(); }
|
||||
String getName() const override { return BridgeHelperMixin::getName(); }
|
||||
|
||||
size_t getDefaultPort() const override { return DEFAULT_PORT; }
|
||||
|
||||
const String serviceAlias() const override { return BridgeHelperMixin::serviceAlias(); }
|
||||
String serviceAlias() const override { return BridgeHelperMixin::serviceAlias(); }
|
||||
|
||||
/// Same for odbc and jdbc
|
||||
const String serviceFileName() const override { return "clickhouse-odbc-bridge"; }
|
||||
String serviceFileName() const override { return "clickhouse-odbc-bridge"; }
|
||||
|
||||
const String configPrefix() const override { return BridgeHelperMixin::configPrefix(); }
|
||||
|
||||
const Context & getContext() const override { return context; }
|
||||
String configPrefix() const override { return BridgeHelperMixin::configPrefix(); }
|
||||
|
||||
const Poco::Timespan & getHTTPTimeout() const override { return http_timeout; }
|
||||
|
||||
@ -109,7 +109,7 @@ protected:
|
||||
|
||||
void startBridge(std::unique_ptr<ShellCommand> cmd) const override
|
||||
{
|
||||
context.addBridgeCommand(std::move(cmd));
|
||||
getContext()->addBridgeCommand(std::move(cmd));
|
||||
}
|
||||
|
||||
|
||||
@ -122,7 +122,6 @@ private:
|
||||
std::string bridge_host;
|
||||
size_t bridge_port;
|
||||
|
||||
const Context & context;
|
||||
const Configuration & config;
|
||||
|
||||
std::optional<IdentifierQuotingStyle> quote_style;
|
||||
@ -160,8 +159,7 @@ protected:
|
||||
uri.setPath(SCHEMA_ALLOWED_HANDLER);
|
||||
uri.addQueryParameter("connection_string", getConnectionString());
|
||||
|
||||
ReadWriteBufferFromHTTP buf(
|
||||
uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(context));
|
||||
ReadWriteBufferFromHTTP buf(uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(getContext()));
|
||||
|
||||
bool res;
|
||||
readBoolText(res, buf);
|
||||
@ -181,12 +179,14 @@ protected:
|
||||
uri.setPath(IDENTIFIER_QUOTE_HANDLER);
|
||||
uri.addQueryParameter("connection_string", getConnectionString());
|
||||
|
||||
ReadWriteBufferFromHTTP buf(
|
||||
uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(context));
|
||||
ReadWriteBufferFromHTTP buf(uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(getContext()));
|
||||
|
||||
std::string character;
|
||||
readStringBinary(character, buf);
|
||||
if (character.length() > 1)
|
||||
throw Exception("Failed to parse quoting style from '" + character + "' for service " + BridgeHelperMixin::serviceAlias(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
|
||||
throw Exception(
|
||||
"Failed to parse quoting style from '" + character + "' for service " + BridgeHelperMixin::serviceAlias(),
|
||||
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
|
||||
else if (character.length() == 0)
|
||||
quote_style = IdentifierQuotingStyle::None;
|
||||
else if (character[0] == '`')
|
||||
@ -206,17 +206,17 @@ struct JDBCBridgeMixin
|
||||
{
|
||||
static constexpr inline auto DEFAULT_PORT = 9019;
|
||||
|
||||
static const String configPrefix()
|
||||
static String configPrefix()
|
||||
{
|
||||
return "jdbc_bridge";
|
||||
}
|
||||
|
||||
static const String serviceAlias()
|
||||
static String serviceAlias()
|
||||
{
|
||||
return "clickhouse-jdbc-bridge";
|
||||
}
|
||||
|
||||
static const String getName()
|
||||
static String getName()
|
||||
{
|
||||
return "JDBC";
|
||||
}
|
||||
@ -237,17 +237,17 @@ struct ODBCBridgeMixin
|
||||
{
|
||||
static constexpr inline auto DEFAULT_PORT = 9018;
|
||||
|
||||
static const String configPrefix()
|
||||
static String configPrefix()
|
||||
{
|
||||
return "odbc_bridge";
|
||||
}
|
||||
|
||||
static const String serviceAlias()
|
||||
static String serviceAlias()
|
||||
{
|
||||
return "clickhouse-odbc-bridge";
|
||||
}
|
||||
|
||||
static const String getName()
|
||||
static String getName()
|
||||
{
|
||||
return "ODBC";
|
||||
}
|
||||
|
@ -508,5 +508,10 @@ if (ENABLE_TESTS AND USE_GTEST)
|
||||
clickhouse_common_zookeeper
|
||||
string_utils)
|
||||
|
||||
# For __udivmodti4 referenced in Core/tests/gtest_DecimalFunctions.cpp
|
||||
if (OS_DARWIN AND COMPILER_GCC)
|
||||
target_link_libraries(unit_tests_dbms PRIVATE gcc)
|
||||
endif ()
|
||||
|
||||
add_check(unit_tests_dbms)
|
||||
endif ()
|
||||
|
@ -21,6 +21,7 @@
|
||||
#include <IO/ReadBufferFromPocoSocket.h>
|
||||
|
||||
#include <Interpreters/TablesStatus.h>
|
||||
#include <Interpreters/Context_fwd.h>
|
||||
|
||||
#include <Compression/ICompressionCodec.h>
|
||||
|
||||
@ -52,8 +53,6 @@ class Connection;
|
||||
using ConnectionPtr = std::shared_ptr<Connection>;
|
||||
using Connections = std::vector<ConnectionPtr>;
|
||||
|
||||
using Scalars = std::map<String, Block>;
|
||||
|
||||
|
||||
/// Packet that could be received from server.
|
||||
struct Packet
|
||||
@ -111,7 +110,7 @@ public:
|
||||
setDescription();
|
||||
}
|
||||
|
||||
virtual ~Connection() {}
|
||||
virtual ~Connection() = default;
|
||||
|
||||
/// Set throttler of network traffic. One throttler could be used for multiple connections to limit total traffic.
|
||||
void setThrottler(const ThrottlerPtr & throttler_)
|
||||
|
@ -162,7 +162,7 @@ MutableColumnPtr ColumnAggregateFunction::convertToValues(MutableColumnPtr colum
|
||||
return res;
|
||||
}
|
||||
|
||||
MutableColumnPtr ColumnAggregateFunction::predictValues(const ColumnsWithTypeAndName & arguments, const Context & context) const
|
||||
MutableColumnPtr ColumnAggregateFunction::predictValues(const ColumnsWithTypeAndName & arguments, ContextPtr context) const
|
||||
{
|
||||
MutableColumnPtr res = func->getReturnTypeToPredict()->createColumn();
|
||||
res->reserve(data.size());
|
||||
|
@ -82,7 +82,7 @@ private:
|
||||
/// Name of the type to distinguish different aggregation states.
|
||||
String type_string;
|
||||
|
||||
ColumnAggregateFunction() {}
|
||||
ColumnAggregateFunction() = default;
|
||||
|
||||
/// Create a new column that has another column as a source.
|
||||
MutablePtr createView() const;
|
||||
@ -119,7 +119,7 @@ public:
|
||||
const char * getFamilyName() const override { return "AggregateFunction"; }
|
||||
TypeIndex getDataType() const override { return TypeIndex::AggregateFunction; }
|
||||
|
||||
MutableColumnPtr predictValues(const ColumnsWithTypeAndName & arguments, const Context & context) const;
|
||||
MutableColumnPtr predictValues(const ColumnsWithTypeAndName & arguments, ContextPtr context) const;
|
||||
|
||||
size_t size() const override
|
||||
{
|
||||
|
@ -1,11 +1,12 @@
|
||||
#pragma once
|
||||
|
||||
#include <Interpreters/Context_fwd.h>
|
||||
#include <Common/ThreadStatus.h>
|
||||
#include <common/StringRef.h>
|
||||
|
||||
#include <memory>
|
||||
#include <string>
|
||||
|
||||
#include <common/StringRef.h>
|
||||
#include <Common/ThreadStatus.h>
|
||||
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
@ -18,7 +19,6 @@ class MemoryTracker;
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class Context;
|
||||
class QueryStatus;
|
||||
struct Progress;
|
||||
class InternalTextLogsQueue;
|
||||
@ -87,7 +87,7 @@ public:
|
||||
/// Initializes query with current thread as master thread in constructor, and detaches it in destructor
|
||||
struct QueryScope
|
||||
{
|
||||
explicit QueryScope(Context & query_context);
|
||||
explicit QueryScope(ContextPtr query_context);
|
||||
~QueryScope();
|
||||
|
||||
void logPeakMemoryUsage();
|
||||
@ -99,7 +99,7 @@ private:
|
||||
|
||||
/// Sets query_context for current thread group
|
||||
/// Can by used only through QueryScope
|
||||
static void attachQueryContext(Context & query_context);
|
||||
static void attachQueryContext(ContextPtr query_context);
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -548,6 +548,7 @@
|
||||
M(578, INVALID_FORMAT_INSERT_QUERY_WITH_DATA) \
|
||||
M(579, INCORRECT_PART_TYPE) \
|
||||
M(580, CANNOT_SET_ROUNDING_MODE) \
|
||||
M(581, TOO_LARGE_DISTRIBUTED_DEPTH) \
|
||||
\
|
||||
M(998, POSTGRESQL_CONNECTION_FAILURE) \
|
||||
M(999, KEEPER_EXCEPTION) \
|
||||
|
@ -11,7 +11,9 @@
|
||||
|
||||
#ifdef __APPLE__
|
||||
// ucontext is not available without _XOPEN_SOURCE
|
||||
# pragma clang diagnostic ignored "-Wreserved-id-macro"
|
||||
# ifdef __clang__
|
||||
# pragma clang diagnostic ignored "-Wreserved-id-macro"
|
||||
# endif
|
||||
# define _XOPEN_SOURCE 700
|
||||
#endif
|
||||
#include <ucontext.h>
|
||||
|
@ -101,7 +101,8 @@ ThreadStatus::~ThreadStatus()
|
||||
|
||||
#if !defined(ARCADIA_BUILD)
|
||||
/// It may cause segfault if query_context was destroyed, but was not detached
|
||||
assert((!query_context && query_id.empty()) || (query_context && query_id == query_context->getCurrentQueryId()));
|
||||
auto query_context_ptr = query_context.lock();
|
||||
assert((!query_context_ptr && query_id.empty()) || (query_context_ptr && query_id == query_context_ptr->getCurrentQueryId()));
|
||||
#endif
|
||||
|
||||
if (deleter)
|
||||
|
@ -1,20 +1,20 @@
|
||||
#pragma once
|
||||
|
||||
#include <common/StringRef.h>
|
||||
#include <Common/ProfileEvents.h>
|
||||
#include <Core/SettingsEnums.h>
|
||||
#include <Interpreters/Context_fwd.h>
|
||||
#include <IO/Progress.h>
|
||||
#include <Common/MemoryTracker.h>
|
||||
#include <Common/OpenTelemetryTraceContext.h>
|
||||
#include <Common/ProfileEvents.h>
|
||||
#include <common/StringRef.h>
|
||||
|
||||
#include <Core/SettingsEnums.h>
|
||||
#include <boost/noncopyable.hpp>
|
||||
|
||||
#include <IO/Progress.h>
|
||||
|
||||
#include <memory>
|
||||
#include <functional>
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <shared_mutex>
|
||||
#include <functional>
|
||||
#include <boost/noncopyable.hpp>
|
||||
|
||||
|
||||
namespace Poco
|
||||
@ -26,7 +26,6 @@ namespace Poco
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class Context;
|
||||
class QueryStatus;
|
||||
class ThreadStatus;
|
||||
class QueryProfilerReal;
|
||||
@ -58,8 +57,8 @@ public:
|
||||
ProfileEvents::Counters performance_counters{VariableContext::Process};
|
||||
MemoryTracker memory_tracker{VariableContext::Process};
|
||||
|
||||
Context * query_context = nullptr;
|
||||
Context * global_context = nullptr;
|
||||
ContextWeakPtr query_context;
|
||||
ContextWeakPtr global_context;
|
||||
|
||||
InternalTextLogsQueueWeakPtr logs_queue_ptr;
|
||||
std::function<void()> fatal_error_callback;
|
||||
@ -122,9 +121,9 @@ protected:
|
||||
std::atomic<int> thread_state{ThreadState::DetachedFromQuery};
|
||||
|
||||
/// Is set once
|
||||
Context * global_context = nullptr;
|
||||
ContextWeakPtr global_context;
|
||||
/// Use it only from current thread
|
||||
Context * query_context = nullptr;
|
||||
ContextWeakPtr query_context;
|
||||
|
||||
String query_id;
|
||||
|
||||
@ -178,9 +177,9 @@ public:
|
||||
return query_id;
|
||||
}
|
||||
|
||||
const Context * getQueryContext() const
|
||||
auto getQueryContext() const
|
||||
{
|
||||
return query_context;
|
||||
return query_context.lock();
|
||||
}
|
||||
|
||||
/// Starts new query and create new thread group for it, current thread becomes master thread of the query
|
||||
@ -203,7 +202,7 @@ public:
|
||||
|
||||
/// Sets query context for current master thread and its thread group
|
||||
/// NOTE: query_context have to be alive until detachQuery() is called
|
||||
void attachQueryContext(Context & query_context);
|
||||
void attachQueryContext(ContextPtr query_context);
|
||||
|
||||
/// Update several ProfileEvents counters
|
||||
void updatePerformanceCounters();
|
||||
|
@ -5,14 +5,14 @@
|
||||
struct ContextHolder
|
||||
{
|
||||
DB::SharedContextHolder shared_context;
|
||||
DB::Context context;
|
||||
DB::ContextPtr context;
|
||||
|
||||
ContextHolder()
|
||||
: shared_context(DB::Context::createShared())
|
||||
, context(DB::Context::createGlobal(shared_context.get()))
|
||||
{
|
||||
context.makeGlobalContext();
|
||||
context.setPath("./");
|
||||
context->makeGlobalContext();
|
||||
context->setPath("./");
|
||||
}
|
||||
|
||||
ContextHolder(ContextHolder &&) = default;
|
||||
|
@ -1,15 +1,15 @@
|
||||
#pragma once
|
||||
|
||||
#include <vector>
|
||||
#include <list>
|
||||
#include <set>
|
||||
#include <map>
|
||||
#include <initializer_list>
|
||||
|
||||
#include <Core/BlockInfo.h>
|
||||
#include <Core/NamesAndTypes.h>
|
||||
#include <Core/ColumnWithTypeAndName.h>
|
||||
#include <Core/ColumnsWithTypeAndName.h>
|
||||
#include <Core/NamesAndTypes.h>
|
||||
|
||||
#include <initializer_list>
|
||||
#include <list>
|
||||
#include <map>
|
||||
#include <set>
|
||||
#include <vector>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -22,8 +22,6 @@ namespace DB
|
||||
* Allows to insert, remove columns in arbitrary position, to change order of columns.
|
||||
*/
|
||||
|
||||
class Context;
|
||||
|
||||
class Block
|
||||
{
|
||||
private:
|
||||
|
@ -1,9 +1,10 @@
|
||||
#pragma once
|
||||
|
||||
#include <unordered_map>
|
||||
|
||||
#include <common/types.h>
|
||||
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
@ -81,8 +81,9 @@
|
||||
#define DBMS_MIN_REVISION_WITH_REFERER_IN_CLIENT_INFO 54447
|
||||
|
||||
/// Version of ClickHouse TCP protocol. Increment it manually when you change the protocol.
|
||||
#define DBMS_TCP_PROTOCOL_VERSION 54447
|
||||
#define DBMS_TCP_PROTOCOL_VERSION 54448
|
||||
|
||||
#define DBMS_MIN_PROTOCOL_VERSION_WITH_DISTRIBUTED_DEPTH 54448
|
||||
/// The boundary on which the blocks for asynchronous file operations should be aligned.
|
||||
#define DEFAULT_AIO_FILE_BLOCK_SIZE 4096
|
||||
|
||||
|
@ -31,11 +31,11 @@ namespace ErrorCodes
|
||||
}
|
||||
|
||||
|
||||
ExternalTableDataPtr BaseExternalTable::getData(const Context & context)
|
||||
ExternalTableDataPtr BaseExternalTable::getData(ContextPtr context)
|
||||
{
|
||||
initReadBuffer();
|
||||
initSampleBlock();
|
||||
auto input = context.getInputFormat(format, *read_buffer, sample_block, DEFAULT_BLOCK_SIZE);
|
||||
auto input = context->getInputFormat(format, *read_buffer, sample_block, DEFAULT_BLOCK_SIZE);
|
||||
auto stream = std::make_shared<AsynchronousBlockInputStream>(input);
|
||||
|
||||
auto data = std::make_unique<ExternalTableData>();
|
||||
@ -127,7 +127,7 @@ ExternalTable::ExternalTable(const boost::program_options::variables_map & exter
|
||||
|
||||
void ExternalTablesHandler::handlePart(const Poco::Net::MessageHeader & header, ReadBuffer & stream)
|
||||
{
|
||||
const Settings & settings = context.getSettingsRef();
|
||||
const Settings & settings = getContext()->getSettingsRef();
|
||||
|
||||
if (settings.http_max_multipart_form_data_size)
|
||||
read_buffer = std::make_unique<LimitReadBuffer>(
|
||||
@ -152,14 +152,14 @@ void ExternalTablesHandler::handlePart(const Poco::Net::MessageHeader & header,
|
||||
else
|
||||
throw Exception("Neither structure nor types have not been provided for external table " + name + ". Use fields " + name + "_structure or " + name + "_types to do so.", ErrorCodes::BAD_ARGUMENTS);
|
||||
|
||||
ExternalTableDataPtr data = getData(context);
|
||||
ExternalTableDataPtr data = getData(getContext());
|
||||
|
||||
/// Create table
|
||||
NamesAndTypesList columns = sample_block.getNamesAndTypesList();
|
||||
auto temporary_table = TemporaryTableHolder(context, ColumnsDescription{columns}, {});
|
||||
auto temporary_table = TemporaryTableHolder(getContext(), ColumnsDescription{columns}, {});
|
||||
auto storage = temporary_table.getTable();
|
||||
context.addExternalTable(data->table_name, std::move(temporary_table));
|
||||
BlockOutputStreamPtr output = storage->write(ASTPtr(), storage->getInMemoryMetadataPtr(), context);
|
||||
getContext()->addExternalTable(data->table_name, std::move(temporary_table));
|
||||
BlockOutputStreamPtr output = storage->write(ASTPtr(), storage->getInMemoryMetadataPtr(), getContext());
|
||||
|
||||
/// Write data
|
||||
data->pipe->resize(1);
|
||||
|
@ -2,6 +2,7 @@
|
||||
|
||||
#include <Client/Connection.h>
|
||||
#include <Core/Block.h>
|
||||
#include <Interpreters/Context_fwd.h>
|
||||
#include <IO/ReadBuffer.h>
|
||||
#include <Server/HTTP/HTMLForm.h>
|
||||
|
||||
@ -11,30 +12,21 @@
|
||||
#include <vector>
|
||||
|
||||
|
||||
namespace Poco
|
||||
namespace Poco::Net
|
||||
{
|
||||
namespace Net
|
||||
{
|
||||
class NameValueCollection;
|
||||
class MessageHeader;
|
||||
}
|
||||
class NameValueCollection;
|
||||
class MessageHeader;
|
||||
}
|
||||
|
||||
namespace boost
|
||||
namespace boost::program_options
|
||||
{
|
||||
namespace program_options
|
||||
{
|
||||
class variables_map;
|
||||
}
|
||||
class variables_map;
|
||||
}
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class Context;
|
||||
|
||||
|
||||
/// The base class containing the basic information about external table and
|
||||
/// basic functions for extracting this information from text fields.
|
||||
class BaseExternalTable
|
||||
@ -56,7 +48,7 @@ public:
|
||||
virtual void initReadBuffer() {}
|
||||
|
||||
/// Get the table data - a pair (a stream with the contents of the table, the name of the table)
|
||||
ExternalTableDataPtr getData(const Context & context);
|
||||
ExternalTableDataPtr getData(ContextPtr context);
|
||||
|
||||
protected:
|
||||
/// Clear all accumulated information
|
||||
@ -88,15 +80,14 @@ public:
|
||||
/// Parsing of external table used when sending tables via http
|
||||
/// The `handlePart` function will be called for each table passed,
|
||||
/// so it's also necessary to call `clean` at the end of the `handlePart`.
|
||||
class ExternalTablesHandler : public HTMLForm::PartHandler, BaseExternalTable
|
||||
class ExternalTablesHandler : public HTMLForm::PartHandler, BaseExternalTable, WithContext
|
||||
{
|
||||
public:
|
||||
ExternalTablesHandler(Context & context_, const Poco::Net::NameValueCollection & params_) : context(context_), params(params_) {}
|
||||
ExternalTablesHandler(ContextPtr context_, const Poco::Net::NameValueCollection & params_) : WithContext(context_), params(params_) {}
|
||||
|
||||
void handlePart(const Poco::Net::MessageHeader & header, ReadBuffer & stream) override;
|
||||
|
||||
private:
|
||||
Context & context;
|
||||
const Poco::Net::NameValueCollection & params;
|
||||
};
|
||||
|
||||
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue
Block a user