Merge branch 'master' of https://github.com/ClickHouse/ClickHouse into fix_create_znode

This commit is contained in:
zhangxiao871 2021-08-05 10:21:16 +08:00
commit 9fc4e42766
69 changed files with 1044 additions and 304 deletions

View File

@ -7,6 +7,6 @@ assignees: ''
---
Make sure to check documentation https://clickhouse.yandex/docs/en/ first. If the question is concise and probably has a short answer, asking it in Telegram chat https://telegram.me/clickhouse_en is probably the fastest way to find the answer. For more complicated questions, consider asking them on StackOverflow with "clickhouse" tag https://stackoverflow.com/questions/tagged/clickhouse
> Make sure to check documentation https://clickhouse.yandex/docs/en/ first. If the question is concise and probably has a short answer, asking it in Telegram chat https://telegram.me/clickhouse_en is probably the fastest way to find the answer. For more complicated questions, consider asking them on StackOverflow with "clickhouse" tag https://stackoverflow.com/questions/tagged/clickhouse
If you still prefer GitHub issues, remove all this text and ask your question here.
> If you still prefer GitHub issues, remove all this text and ask your question here.

View File

@ -7,16 +7,20 @@ assignees: ''
---
(you don't have to strictly follow this form)
> (you don't have to strictly follow this form)
**Use case**
A clear and concise description of what the intended usage scenario is.
> A clear and concise description of what the intended usage scenario is.
**Describe the solution you'd like**
A clear and concise description of what you want to happen.
> A clear and concise description of what you want to happen.
**Describe alternatives you've considered**
A clear and concise description of any alternative solutions or features you've considered.
> A clear and concise description of any alternative solutions or features you've considered.
**Additional context**
Add any other context or screenshots about the feature request here.
> Add any other context or screenshots about the feature request here.

View File

@ -7,11 +7,11 @@ assignees: ''
---
You have to provide the following information whenever possible.
> You have to provide the following information whenever possible.
**Describe the bug**
A clear and concise description of what does not work as it is supposed to.
> A clear and concise description of what does not work as it is supposed to.
**Does it reproduce on recent release?**
@ -19,7 +19,7 @@ A clear and concise description of what works not as it is supposed to.
**Enable crash reporting**
If possible, change "enabled" to true in "send_crash_reports" section in `config.xml`:
> If possible, change "enabled" to true in "send_crash_reports" section in `config.xml`:
```
<send_crash_reports>
@ -39,12 +39,12 @@ If possible, change "enabled" to true in "send_crash_reports" section in `config
**Expected behavior**
A clear and concise description of what you expected to happen.
> A clear and concise description of what you expected to happen.
**Error message and/or stacktrace**
If applicable, add screenshots to help explain your problem.
> If applicable, add screenshots to help explain your problem.
**Additional context**
Add any other context about the problem here.
> Add any other context about the problem here.

View File

@ -7,10 +7,11 @@ assignees: ''
---
Make sure that `git diff` result is empty and you've just pulled fresh master. Try cleaning up cmake cache. Just in case, official build instructions are published here: https://clickhouse.yandex/docs/en/development/build/
> Make sure that `git diff` result is empty and you've just pulled fresh master. Try cleaning up cmake cache. Just in case, official build instructions are published here: https://clickhouse.yandex/docs/en/development/build/
**Operating system**
OS kind or distribution, specific version/release, non-standard kernel if any. If you are trying to build inside a virtual machine, please mention it too.
> OS kind or distribution, specific version/release, non-standard kernel if any. If you are trying to build inside a virtual machine, please mention it too.
**Cmake version**

contrib/AMQP-CPP vendored

@ -1 +1 @@
Subproject commit 03781aaff0f10ef41f902b8cf865fe0067180c10
Subproject commit 1a6c51f4ac51ac56610fa95081bd2f349911375a

View File

@ -10,11 +10,12 @@ set (SRCS
"${LIBRARY_DIR}/src/deferredconsumer.cpp"
"${LIBRARY_DIR}/src/deferredextreceiver.cpp"
"${LIBRARY_DIR}/src/deferredget.cpp"
"${LIBRARY_DIR}/src/deferredpublisher.cpp"
"${LIBRARY_DIR}/src/deferredrecall.cpp"
"${LIBRARY_DIR}/src/deferredreceiver.cpp"
"${LIBRARY_DIR}/src/field.cpp"
"${LIBRARY_DIR}/src/flags.cpp"
"${LIBRARY_DIR}/src/linux_tcp/openssl.cpp"
"${LIBRARY_DIR}/src/linux_tcp/sslerrorprinter.cpp"
"${LIBRARY_DIR}/src/linux_tcp/tcpconnection.cpp"
"${LIBRARY_DIR}/src/inbuffer.cpp"
"${LIBRARY_DIR}/src/receivedframe.cpp"

contrib/arrow vendored

@ -1 +1 @@
Subproject commit debf751a129bdda9ff4d1e895e08957ff77000a1
Subproject commit 078e21bad344747b7656ef2d7a4f7410a0a303eb

View File

@ -194,9 +194,18 @@ set(ARROW_SRCS
"${LIBRARY_DIR}/compute/cast.cc"
"${LIBRARY_DIR}/compute/exec.cc"
"${LIBRARY_DIR}/compute/function.cc"
"${LIBRARY_DIR}/compute/function_internal.cc"
"${LIBRARY_DIR}/compute/kernel.cc"
"${LIBRARY_DIR}/compute/registry.cc"
"${LIBRARY_DIR}/compute/exec/exec_plan.cc"
"${LIBRARY_DIR}/compute/exec/expression.cc"
"${LIBRARY_DIR}/compute/exec/key_compare.cc"
"${LIBRARY_DIR}/compute/exec/key_encode.cc"
"${LIBRARY_DIR}/compute/exec/key_hash.cc"
"${LIBRARY_DIR}/compute/exec/key_map.cc"
"${LIBRARY_DIR}/compute/exec/util.cc"
"${LIBRARY_DIR}/compute/kernels/aggregate_basic.cc"
"${LIBRARY_DIR}/compute/kernels/aggregate_mode.cc"
"${LIBRARY_DIR}/compute/kernels/aggregate_quantile.cc"
@ -207,6 +216,7 @@ set(ARROW_SRCS
"${LIBRARY_DIR}/compute/kernels/scalar_arithmetic.cc"
"${LIBRARY_DIR}/compute/kernels/scalar_boolean.cc"
"${LIBRARY_DIR}/compute/kernels/scalar_cast_boolean.cc"
"${LIBRARY_DIR}/compute/kernels/scalar_cast_dictionary.cc"
"${LIBRARY_DIR}/compute/kernels/scalar_cast_internal.cc"
"${LIBRARY_DIR}/compute/kernels/scalar_cast_nested.cc"
"${LIBRARY_DIR}/compute/kernels/scalar_cast_numeric.cc"
@ -214,15 +224,18 @@ set(ARROW_SRCS
"${LIBRARY_DIR}/compute/kernels/scalar_cast_temporal.cc"
"${LIBRARY_DIR}/compute/kernels/scalar_compare.cc"
"${LIBRARY_DIR}/compute/kernels/scalar_fill_null.cc"
"${LIBRARY_DIR}/compute/kernels/scalar_if_else.cc"
"${LIBRARY_DIR}/compute/kernels/scalar_nested.cc"
"${LIBRARY_DIR}/compute/kernels/scalar_set_lookup.cc"
"${LIBRARY_DIR}/compute/kernels/scalar_string.cc"
"${LIBRARY_DIR}/compute/kernels/scalar_temporal.cc"
"${LIBRARY_DIR}/compute/kernels/scalar_validity.cc"
"${LIBRARY_DIR}/compute/kernels/util_internal.cc"
"${LIBRARY_DIR}/compute/kernels/vector_hash.cc"
"${LIBRARY_DIR}/compute/kernels/vector_nested.cc"
"${LIBRARY_DIR}/compute/kernels/vector_replace.cc"
"${LIBRARY_DIR}/compute/kernels/vector_selection.cc"
"${LIBRARY_DIR}/compute/kernels/vector_sort.cc"
"${LIBRARY_DIR}/compute/kernels/util_internal.cc"
"${LIBRARY_DIR}/csv/chunker.cc"
"${LIBRARY_DIR}/csv/column_builder.cc"
@ -231,6 +244,7 @@ set(ARROW_SRCS
"${LIBRARY_DIR}/csv/options.cc"
"${LIBRARY_DIR}/csv/parser.cc"
"${LIBRARY_DIR}/csv/reader.cc"
"${LIBRARY_DIR}/csv/writer.cc"
"${LIBRARY_DIR}/ipc/dictionary.cc"
"${LIBRARY_DIR}/ipc/feather.cc"
@ -247,6 +261,7 @@ set(ARROW_SRCS
"${LIBRARY_DIR}/io/interfaces.cc"
"${LIBRARY_DIR}/io/memory.cc"
"${LIBRARY_DIR}/io/slow.cc"
"${LIBRARY_DIR}/io/stdio.cc"
"${LIBRARY_DIR}/io/transform.cc"
"${LIBRARY_DIR}/tensor/coo_converter.cc"
@ -257,9 +272,9 @@ set(ARROW_SRCS
"${LIBRARY_DIR}/util/bit_block_counter.cc"
"${LIBRARY_DIR}/util/bit_run_reader.cc"
"${LIBRARY_DIR}/util/bit_util.cc"
"${LIBRARY_DIR}/util/bitmap.cc"
"${LIBRARY_DIR}/util/bitmap_builders.cc"
"${LIBRARY_DIR}/util/bitmap_ops.cc"
"${LIBRARY_DIR}/util/bitmap.cc"
"${LIBRARY_DIR}/util/bpacking.cc"
"${LIBRARY_DIR}/util/cancel.cc"
"${LIBRARY_DIR}/util/compression.cc"

View File

@ -0,0 +1,13 @@
version: '2.3'
services:
mongo1:
image: mongo:3.6
restart: always
environment:
MONGO_INITDB_ROOT_USERNAME: root
MONGO_INITDB_ROOT_PASSWORD: clickhouse
volumes:
- ${MONGO_CONFIG_PATH}:/mongo/
ports:
- ${MONGO_EXTERNAL_PORT}:${MONGO_INTERNAL_PORT}
command: --config /mongo/mongo_secure.conf --profile=2 --verbose
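For reference, a minimal sketch of a ClickHouse table reading from this secured instance via the new `options` parameter documented below; the database and collection names are illustrative:

``` sql
-- hypothetical table over the mongo1 service above; 'ssl=true' enables TLS
CREATE TABLE mongo_table_secure
(
    key UInt64,
    data String
) ENGINE = MongoDB('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', 'ssl=true');
```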

View File

@ -15,7 +15,7 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name
name1 [type1],
name2 [type2],
...
) ENGINE = MongoDB(host:port, database, collection, user, password);
) ENGINE = MongoDB(host:port, database, collection, user, password [, options]);
```
**Engine Parameters**
@ -30,9 +30,11 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name
- `password` — User password.
- `options` — MongoDB connection string options (optional parameter).
## Usage Example {#usage-example}
Table in ClickHouse which allows to read data from MongoDB collection:
Create a table in ClickHouse which allows reading data from a MongoDB collection:
``` text
CREATE TABLE mongo_table
@ -42,6 +44,16 @@ CREATE TABLE mongo_table
) ENGINE = MongoDB('mongo1:27017', 'test', 'simple_table', 'testuser', 'clickhouse');
```
To read from an SSL-secured MongoDB server:
``` text
CREATE TABLE mongo_table_ssl
(
key UInt64,
data String
) ENGINE = MongoDB('mongo2:27017', 'test', 'simple_table', 'testuser', 'clickhouse', 'ssl=true');
```
Query:
``` sql

View File

@ -84,6 +84,8 @@ Features:
- Table data preview.
- Full-text search.
By default, DBeaver does not connect using a session (the CLI, for example, does). If you require session support (for example, to set settings for your session), edit the driver connection properties and set session_id to a random string (it uses the HTTP connection under the hood). Then you can use any setting from the query window, as shown below.
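A minimal sketch, assuming `session_id` has already been set in the driver properties:

``` sql
-- with a session established, settings persist across statements in the query window
SET max_threads = 4;
SELECT value FROM system.settings WHERE name = 'max_threads';
```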
### clickhouse-cli {#clickhouse-cli}
[clickhouse-cli](https://github.com/hatarist/clickhouse-cli) is an alternative command-line client for ClickHouse, written in Python 3.

View File

@ -28,7 +28,7 @@ Structure of the `users` section:
<profile>profile_name</profile>
<quota>default</quota>
<default_database>default</default_database>
<databases>
<database_name>
<table_name>

View File

@ -15,6 +15,7 @@ CREATE USER [IF NOT EXISTS | OR REPLACE] name1 [ON CLUSTER cluster_name1]
[NOT IDENTIFIED | IDENTIFIED {[WITH {no_password | plaintext_password | sha256_password | sha256_hash | double_sha1_password | double_sha1_hash}] BY {'password' | 'hash'}} | {WITH ldap SERVER 'server_name'} | {WITH kerberos [REALM 'realm']}]
[HOST {LOCAL | NAME 'name' | REGEXP 'name_regexp' | IP 'address' | LIKE 'pattern'} [,...] | ANY | NONE]
[DEFAULT ROLE role [,...]]
[DEFAULT DATABASE database | NONE]
[GRANTEES {user | role | ANY | NONE} [,...] [EXCEPT {user | role} [,...]]]
[SETTINGS variable [= value] [MIN [=] min_value] [MAX [=] max_value] [READONLY | WRITABLE] | PROFILE 'profile_name'] [,...]
```
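A minimal sketch of the new clause in use; the user and database names are hypothetical:

``` sql
CREATE USER mira IDENTIFIED WITH sha256_password BY 'qwerty' DEFAULT DATABASE analytics;
```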

View File

@ -274,28 +274,28 @@ This modifier also can be combined with [LIMIT … WITH TIES modifier](../../../
`WITH FILL` modifier can be set after `ORDER BY expr` with optional `FROM expr`, `TO expr` and `STEP expr` parameters.
All missing values of the `expr` column will be filled sequentially, and other columns will be filled with defaults.
Use following syntax for filling multiple columns add `WITH FILL` modifier with optional parameters after each field name in `ORDER BY` section.
To fill multiple columns, add the `WITH FILL` modifier with optional parameters after each field name in the `ORDER BY` section.
``` sql
ORDER BY expr [WITH FILL] [FROM const_expr] [TO const_expr] [STEP const_numeric_expr], ... exprN [WITH FILL] [FROM expr] [TO expr] [STEP numeric_expr]
```
`WITH FILL` can be applied only for fields with Numeric (all kind of float, decimal, int) or Date/DateTime types.
`WITH FILL` can be applied to fields with Numeric (all kinds of float, decimal, int) or Date/DateTime types. When applied to `String` fields, missing values are filled with empty strings.
When `FROM const_expr` is not defined, the sequence of filling uses the minimal `expr` field value from `ORDER BY`.
When `TO const_expr` is not defined, the sequence of filling uses the maximum `expr` field value from `ORDER BY`.
When `STEP const_numeric_expr` is defined, `const_numeric_expr` is interpreted `as is` for numeric types, as `days` for the Date type, and as `seconds` for the DateTime type.
When `STEP const_numeric_expr` is omitted, the sequence of filling uses `1.0` for numeric types, `1 day` for the Date type, and `1 second` for the DateTime type.
For example, the following query
Example of a query without `WITH FILL`:
``` sql
SELECT n, source FROM (
SELECT toFloat32(number % 10) AS n, 'original' AS source
FROM numbers(10) WHERE number % 3 = 1
) ORDER BY n
) ORDER BY n;
```
returns
Result:
``` text
┌─n─┬─source───┐
@ -305,16 +305,16 @@ returns
└───┴──────────┘
```
but after apply `WITH FILL` modifier
Same query after applying `WITH FILL` modifier:
``` sql
SELECT n, source FROM (
SELECT toFloat32(number % 10) AS n, 'original' AS source
FROM numbers(10) WHERE number % 3 = 1
) ORDER BY n WITH FILL FROM 0 TO 5.51 STEP 0.5
) ORDER BY n WITH FILL FROM 0 TO 5.51 STEP 0.5;
```
returns
Result:
``` text
┌───n─┬─source───┐
@ -334,7 +334,7 @@ returns
└─────┴──────────┘
```
For the case when we have multiple fields `ORDER BY field2 WITH FILL, field1 WITH FILL` order of filling will follow the order of fields in `ORDER BY` clause.
For the case with multiple fields, `ORDER BY field2 WITH FILL, field1 WITH FILL`, the order of filling will follow the order of fields in the `ORDER BY` clause.
Example:
@ -350,7 +350,7 @@ ORDER BY
d1 WITH FILL STEP 5;
```
returns
Result:
``` text
┌───d1───────┬───d2───────┬─source───┐
@ -364,9 +364,9 @@ returns
└────────────┴────────────┴──────────┘
```
Field `d1` does not fill and use default value cause we do not have repeated values for `d2` value, and sequence for `d1` cant be properly calculated.
Field `d1` is not filled in and uses the default value because we do not have repeated values for `d2`, and the sequence for `d1` can't be properly calculated.
The following query with a changed field in `ORDER BY`
The following query with the changed field in `ORDER BY`:
``` sql
SELECT
@ -380,7 +380,7 @@ ORDER BY
d2 WITH FILL;
```
returns
Result:
``` text
┌───d1───────┬───d2───────┬─source───┐

View File

@ -87,7 +87,7 @@ toc_title: "Введение"
Виртуальный столбец — это неотъемлемый атрибут движка таблиц, определенный в исходном коде движка.
Виртуальные столбцы не надо указывать в запросе `CREATE TABLE` и их не отображаются в результатах запросов `SHOW CREATE TABLE` и `DESCRIBE TABLE`. Также виртуальные столбцы доступны только для чтения, поэтому вы не можете вставлять в них данные.
Виртуальные столбцы не надо указывать в запросе `CREATE TABLE` и они не отображаются в результатах запросов `SHOW CREATE TABLE` и `DESCRIBE TABLE`. Также виртуальные столбцы доступны только для чтения, поэтому вы не можете вставлять в них данные.
Чтобы получить данные из виртуального столбца, необходимо указать его название в запросе `SELECT`. `SELECT *` не отображает данные из виртуальных столбцов.

View File

@ -111,7 +111,7 @@ toc_title: "Визуальные интерфейсы от сторонних р
### DataGrip {#datagrip}
[DataGrip](https://www.jetbrains.com/datagrip/) — это IDE для баз данных о JetBrains с выделенной поддержкой ClickHouse. Он также встроен в другие инструменты на основе IntelliJ: PyCharm, IntelliJ IDEA, GoLand, PhpStorm и другие.
[DataGrip](https://www.jetbrains.com/datagrip/) — это IDE для баз данных от JetBrains с выделенной поддержкой ClickHouse. Он также встроен в другие инструменты на основе IntelliJ: PyCharm, IntelliJ IDEA, GoLand, PhpStorm и другие.
Основные возможности:

View File

@ -271,8 +271,8 @@ SELECT * FROM collate_test ORDER BY s ASC COLLATE 'en';
Этот модификатор также может быть скомбинирован с модификатором [LIMIT ... WITH TIES](../../../sql-reference/statements/select/limit.md#limit-with-ties).
`WITH FILL` модификатор может быть установлен после `ORDER BY expr` с опциональными параметрами `FROM expr`, `TO expr` и `STEP expr`.
Все пропущенные значнеия для колонки `expr` будут заполненые значениями соответсвующими предполагаемой последовательности значений колонки, другие колонки будут заполнены значенями по умолчанию.
Модификатор `WITH FILL` может быть установлен после `ORDER BY expr` с опциональными параметрами `FROM expr`, `TO expr` и `STEP expr`.
Все пропущенные значения для колонки `expr` будут заполнены значениями, соответствующими предполагаемой последовательности значений колонки, другие колонки будут заполнены значениями по умолчанию.
Используйте следующую конструкцию для заполнения нескольких колонок с модификатором `WITH FILL` с необязательными параметрами после каждого имени поля в секции `ORDER BY`.
@ -280,22 +280,22 @@ SELECT * FROM collate_test ORDER BY s ASC COLLATE 'en';
ORDER BY expr [WITH FILL] [FROM const_expr] [TO const_expr] [STEP const_numeric_expr], ... exprN [WITH FILL] [FROM expr] [TO expr] [STEP numeric_expr]
```
`WITH FILL` может быть применене только к полям с числовыми (все разновидности float, int, decimal) или временными (все разновидности Date, DateTime) типами.
`WITH FILL` может быть применен к полям с числовыми (все разновидности float, int, decimal) или временными (все разновидности Date, DateTime) типами. В случае применения к полям типа `String` недостающие значения заполняются пустой строкой.
Когда не определен `FROM const_expr`, последовательность заполнения использует минимальное значение поля `expr` из `ORDER BY`.
Когда не определен `TO const_expr`, последовательность заполнения использует максимальное значение поля `expr` из `ORDER BY`.
Когда `STEP const_numeric_expr` определен, тогда `const_numeric_expr` интерпретируется `как есть` для числовых типов, как `дни` для типа Date и как `секунды` для типа DateTime.
Когда `STEP const_numeric_expr` определен, `const_numeric_expr` интерпретируется "как есть" для числовых типов, как "дни" для типа `Date` и как "секунды" для типа `DateTime`.
Когда `STEP const_numeric_expr` не указан, тогда используется `1.0` для числовых типов, `1 день` для типа Date и `1 секунда` для типа DateTime.
Для примера, следующий запрос
Пример запроса без использования `WITH FILL`:
```sql
SELECT n, source FROM (
SELECT toFloat32(number % 10) AS n, 'original' AS source
FROM numbers(10) WHERE number % 3 = 1
) ORDER BY n
) ORDER BY n;
```
возвращает
Результат:
```text
┌─n─┬─source───┐
│ 1 │ original │
@ -304,7 +304,7 @@ SELECT n, source FROM (
└───┴──────────┘
```
но после применения модификатора `WITH FILL`
Тот же запрос после применения модификатора `WITH FILL`:
```sql
SELECT n, source FROM (
SELECT toFloat32(number % 10) AS n, 'original' AS source
@ -312,7 +312,8 @@ SELECT n, source FROM (
) ORDER BY n WITH FILL FROM 0 TO 5.51 STEP 0.5
```
возвращает
Результат:
```text
┌───n─┬─source───┐
│ 0 │ │
@ -331,13 +332,13 @@ SELECT n, source FROM (
└─────┴──────────┘
```
Для случая когда у нас есть несколько полей `ORDER BY field2 WITH FILL, field1 WITH FILL` порядок заполнения будет следовать порядку полей в секции `ORDER BY`.
Для случая с несколькими полями `ORDER BY field2 WITH FILL, field1 WITH FILL` порядок заполнения будет соответствовать порядку полей в секции `ORDER BY`.
Пример:
```sql
SELECT
toDate((number * 10) * 86400) AS d1,
toDate(number * 86400) AS d2,
'original' AS source
FROM numbers(10)
WHERE (number % 3) = 1
@ -346,7 +347,7 @@ ORDER BY
d1 WITH FILL STEP 5;
```
возвращает
Результат:
```text
┌───d1───────┬───d2───────┬─source───┐
│ 1970-01-11 │ 1970-01-02 │ original │
@ -359,9 +360,9 @@ ORDER BY
└────────────┴────────────┴──────────┘
```
Поле `d1` не заполняет и используется значение по умолчанию поскольку у нас нет повторяющихся значения для `d2` поэтому мы не можем правильно рассчитать последователность заполнения для`d1`.
Поле `d1` не заполняется и использует значение по умолчанию. Поскольку у нас нет повторяющихся значений для `d2`, мы не можем правильно рассчитать последовательность заполнения для `d1`.
Следующий запрос (с измененым порядком в ORDER BY)
Следующий запрос (с измененным порядком в ORDER BY):
```sql
SELECT
toDate((number * 10) * 86400) AS d1,
@ -374,7 +375,7 @@ ORDER BY
d2 WITH FILL;
```
возвращает
Результат:
```text
┌───d1───────┬───d2───────┬─source───┐
│ 1970-01-11 │ 1970-01-02 │ original │

View File

@ -12,8 +12,8 @@ namespace DB
Poco::URI uri{request.getURI()};
LOG_DEBUG(log, "Request URI: {}", uri.toString());
if (uri == "/ping" && request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET)
return std::make_unique<PingHandler>(keep_alive_timeout);
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET)
return std::make_unique<LibraryExistsHandler>(keep_alive_timeout, getContext());
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST)
return std::make_unique<LibraryRequestHandler>(keep_alive_timeout, getContext());

View File

@ -17,8 +17,24 @@
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_REQUEST_PARAMETER;
}
namespace
{
void processError(HTTPServerResponse & response, const std::string & message)
{
response.setStatusAndReason(HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
*response.send() << message << std::endl;
LOG_WARNING(&Poco::Logger::get("LibraryBridge"), message);
}
std::shared_ptr<Block> parseColumns(std::string && column_string)
{
auto sample_block = std::make_shared<Block>();
@ -30,9 +46,8 @@ namespace
return sample_block;
}
std::vector<uint64_t> parseIdsFromBinary(const std::string & ids_string)
std::vector<uint64_t> parseIdsFromBinary(ReadBuffer & buf)
{
ReadBufferFromString buf(ids_string);
std::vector<uint64_t> ids;
readVectorBinary(ids, buf);
return ids;
@ -67,13 +82,36 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
std::string method = params.get("method");
std::string dictionary_id = params.get("dictionary_id");
LOG_TRACE(log, "Library method: '{}', dictionary id: {}", method, dictionary_id);
LOG_TRACE(log, "Library method: '{}', dictionary id: {}", method, dictionary_id);
WriteBufferFromHTTPServerResponse out(response, request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD, keep_alive_timeout);
try
{
if (method == "libNew")
bool lib_new = (method == "libNew");
if (method == "libClone")
{
if (!params.has("from_dictionary_id"))
{
processError(response, "No 'from_dictionary_id' in request URL");
return;
}
std::string from_dictionary_id = params.get("from_dictionary_id");
bool cloned = false;
cloned = SharedLibraryHandlerFactory::instance().clone(from_dictionary_id, dictionary_id);
if (cloned)
{
writeStringBinary("1", out);
}
else
{
LOG_TRACE(log, "Cannot clone from dictionary with id: {}, will call libNew instead");
lib_new = true;
}
}
if (lib_new)
{
auto & read_buf = request.getStream();
params.read(read_buf);
@ -92,6 +130,8 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
std::string library_path = params.get("library_path");
const auto & settings_string = params.get("library_settings");
LOG_DEBUG(log, "Parsing library settings from binary string");
std::vector<std::string> library_settings = parseNamesFromBinary(settings_string);
/// Needed for library dictionary
@ -102,6 +142,8 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
}
const auto & attributes_string = params.get("attributes_names");
LOG_DEBUG(log, "Parsing attributes names from binary string");
std::vector<std::string> attributes_names = parseNamesFromBinary(attributes_string);
/// Needed to parse block from binary string format
@ -140,54 +182,63 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
SharedLibraryHandlerFactory::instance().create(dictionary_id, library_path, library_settings, sample_block_with_nulls, attributes_names);
writeStringBinary("1", out);
}
else if (method == "libClone")
{
if (!params.has("from_dictionary_id"))
{
processError(response, "No 'from_dictionary_id' in request URL");
return;
}
std::string from_dictionary_id = params.get("from_dictionary_id");
LOG_TRACE(log, "Calling libClone from {} to {}", from_dictionary_id, dictionary_id);
SharedLibraryHandlerFactory::instance().clone(from_dictionary_id, dictionary_id);
writeStringBinary("1", out);
}
else if (method == "libDelete")
{
SharedLibraryHandlerFactory::instance().remove(dictionary_id);
auto deleted = SharedLibraryHandlerFactory::instance().remove(dictionary_id);
/// Do not throw, a warning is ok.
if (!deleted)
LOG_WARNING(log, "Cannot delete library for with dictionary id: {}, because such id was not found.", dictionary_id);
writeStringBinary("1", out);
}
else if (method == "isModified")
{
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
if (!library_handler)
throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER, "Not found dictionary with id: {}", dictionary_id);
bool res = library_handler->isModified();
writeStringBinary(std::to_string(res), out);
}
else if (method == "supportsSelectiveLoad")
{
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
if (!library_handler)
throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER, "Not found dictionary with id: {}", dictionary_id);
bool res = library_handler->supportsSelectiveLoad();
writeStringBinary(std::to_string(res), out);
}
else if (method == "loadAll")
{
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
if (!library_handler)
throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER, "Not found dictionary with id: {}", dictionary_id);
const auto & sample_block = library_handler->getSampleBlock();
LOG_DEBUG(log, "Calling loadAll() for dictionary id: {}", dictionary_id);
auto input = library_handler->loadAll();
LOG_DEBUG(log, "Started sending result data for dictionary id: {}", dictionary_id);
BlockOutputStreamPtr output = FormatFactory::instance().getOutputStream(FORMAT, out, sample_block, getContext());
copyData(*input, *output);
}
else if (method == "loadIds")
{
LOG_DEBUG(log, "Getting diciontary ids for dictionary with id: {}", dictionary_id);
String ids_string;
readString(ids_string, request.getStream());
std::vector<uint64_t> ids = parseIdsFromBinary(ids_string);
std::vector<uint64_t> ids = parseIdsFromBinary(request.getStream());
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
if (!library_handler)
throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER, "Not found dictionary with id: {}", dictionary_id);
const auto & sample_block = library_handler->getSampleBlock();
LOG_DEBUG(log, "Calling loadIds() for dictionary id: {}", dictionary_id);
auto input = library_handler->loadIds(ids);
LOG_DEBUG(log, "Started sending result data for dictionary id: {}", dictionary_id);
BlockOutputStreamPtr output = FormatFactory::instance().getOutputStream(FORMAT, out, sample_block, getContext());
copyData(*input, *output);
}
@ -219,8 +270,14 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
auto block = reader->read();
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
if (!library_handler)
throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER, "Not found dictionary with id: {}", dictionary_id);
const auto & sample_block = library_handler->getSampleBlock();
LOG_DEBUG(log, "Calling loadKeys() for dictionary id: {}", dictionary_id);
auto input = library_handler->loadKeys(block.getColumns());
LOG_DEBUG(log, "Started sending result data for dictionary id: {}", dictionary_id);
BlockOutputStreamPtr output = FormatFactory::instance().getOutputStream(FORMAT, out, sample_block, getContext());
copyData(*input, *output);
}
@ -228,8 +285,9 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
catch (...)
{
auto message = getCurrentExceptionMessage(true);
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR, message); // can't call process_error, because the response may already be partially sent
LOG_ERROR(log, "Failed to process request for dictionary_id: {}. Error: {}", dictionary_id, message);
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR, message); // can't call process_error, because the response may already be partially sent
try
{
writeStringBinary(message, out);
@ -239,8 +297,6 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
{
tryLogCurrentException(log);
}
tryLogCurrentException(log);
}
try
@ -254,24 +310,30 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
}
void LibraryRequestHandler::processError(HTTPServerResponse & response, const std::string & message)
{
response.setStatusAndReason(HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
*response.send() << message << std::endl;
LOG_WARNING(log, message);
}
void PingHandler::handleRequest(HTTPServerRequest & /* request */, HTTPServerResponse & response)
void LibraryExistsHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
{
try
{
LOG_TRACE(log, "Request URI: {}", request.getURI());
HTMLForm params(getContext()->getSettingsRef(), request);
if (!params.has("dictionary_id"))
{
processError(response, "No 'dictionary_id' in request URL");
return;
}
std::string dictionary_id = params.get("dictionary_id");
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
String res;
if (library_handler)
res = "1";
else
res = "0";
setResponseDefaultHeaders(response, keep_alive_timeout);
const char * data = "Ok.\n";
response.sendBuffer(data, strlen(data));
LOG_TRACE(log, "Senging ping response: {} (dictionary id: {})", res, dictionary_id);
response.sendBuffer(res.data(), res.size());
}
catch (...)
{

View File

@ -22,8 +22,7 @@ class LibraryRequestHandler : public HTTPRequestHandler, WithContext
public:
LibraryRequestHandler(
size_t keep_alive_timeout_,
ContextPtr context_)
size_t keep_alive_timeout_, ContextPtr context_)
: WithContext(context_)
, log(&Poco::Logger::get("LibraryRequestHandler"))
, keep_alive_timeout(keep_alive_timeout_)
@ -35,18 +34,18 @@ public:
private:
static constexpr inline auto FORMAT = "RowBinary";
void processError(HTTPServerResponse & response, const std::string & message);
Poco::Logger * log;
size_t keep_alive_timeout;
};
class PingHandler : public HTTPRequestHandler
class LibraryExistsHandler : public HTTPRequestHandler, WithContext
{
public:
explicit PingHandler(size_t keep_alive_timeout_)
: keep_alive_timeout(keep_alive_timeout_)
explicit LibraryExistsHandler(size_t keep_alive_timeout_, ContextPtr context_)
: WithContext(context_)
, keep_alive_timeout(keep_alive_timeout_)
, log(&Poco::Logger::get("LibraryRequestHandler"))
{
}
@ -54,6 +53,8 @@ public:
private:
const size_t keep_alive_timeout;
Poco::Logger * log;
};
}

View File

@ -4,12 +4,6 @@
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
}
SharedLibraryHandlerPtr SharedLibraryHandlerFactory::get(const std::string & dictionary_id)
{
std::lock_guard lock(mutex);
@ -18,7 +12,7 @@ SharedLibraryHandlerPtr SharedLibraryHandlerFactory::get(const std::string & dic
if (library_handler != library_handlers.end())
return library_handler->second;
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Not found dictionary with id: {}", dictionary_id);
return nullptr;
}
@ -30,32 +24,32 @@ void SharedLibraryHandlerFactory::create(
const std::vector<std::string> & attributes_names)
{
std::lock_guard lock(mutex);
library_handlers[dictionary_id] = std::make_shared<SharedLibraryHandler>(library_path, library_settings, sample_block, attributes_names);
if (!library_handlers.count(dictionary_id))
library_handlers.emplace(std::make_pair(dictionary_id, std::make_shared<SharedLibraryHandler>(library_path, library_settings, sample_block, attributes_names)));
else
LOG_WARNING(&Poco::Logger::get("SharedLibraryHandlerFactory"), "Library handler with dictionary id {} already exists", dictionary_id);
}
void SharedLibraryHandlerFactory::clone(const std::string & from_dictionary_id, const std::string & to_dictionary_id)
bool SharedLibraryHandlerFactory::clone(const std::string & from_dictionary_id, const std::string & to_dictionary_id)
{
std::lock_guard lock(mutex);
auto from_library_handler = library_handlers.find(from_dictionary_id);
/// This is not supposed to happen, as libClone is called from the copy constructor of the LibraryDictionarySource
/// object, and the shared library handler of from_dictionary is removed only in its destructor.
/// And if from_dictionary had no shared library handler, it would have received an exception in
/// its constructor, so no libClone would be made from it.
if (from_library_handler == library_handlers.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "No shared library handler found");
return false;
/// libClone method will be called in copy constructor
library_handlers[to_dictionary_id] = std::make_shared<SharedLibraryHandler>(*from_library_handler->second);
return true;
}
void SharedLibraryHandlerFactory::remove(const std::string & dictionary_id)
bool SharedLibraryHandlerFactory::remove(const std::string & dictionary_id)
{
std::lock_guard lock(mutex);
/// libDelete is called in destructor.
library_handlers.erase(dictionary_id);
return library_handlers.erase(dictionary_id);
}

View File

@ -24,9 +24,9 @@ public:
const Block & sample_block,
const std::vector<std::string> & attributes_names);
void clone(const std::string & from_dictionary_id, const std::string & to_dictionary_id);
bool clone(const std::string & from_dictionary_id, const std::string & to_dictionary_id);
void remove(const std::string & dictionary_id);
bool remove(const std::string & dictionary_id);
private:
/// map: dict_id -> sharedLibraryHandler

View File

@ -33,24 +33,9 @@ Poco::URI IBridgeHelper::getPingURI() const
}
bool IBridgeHelper::checkBridgeIsRunning() const
void IBridgeHelper::startBridgeSync()
{
try
{
ReadWriteBufferFromHTTP buf(
getPingURI(), Poco::Net::HTTPRequest::HTTP_GET, {}, ConnectionTimeouts::getHTTPTimeouts(getContext()));
return checkString(PING_OK_ANSWER, buf);
}
catch (...)
{
return false;
}
}
void IBridgeHelper::startBridgeSync() const
{
if (!checkBridgeIsRunning())
if (!bridgeHandShake())
{
LOG_TRACE(getLog(), "{} is not running, will try to start it", serviceAlias());
startBridge(startBridgeCommand());
@ -64,7 +49,7 @@ void IBridgeHelper::startBridgeSync() const
++counter;
LOG_TRACE(getLog(), "Checking {} is running, try {}", serviceAlias(), counter);
if (checkBridgeIsRunning())
if (bridgeHandShake())
{
started = true;
break;
@ -81,7 +66,7 @@ void IBridgeHelper::startBridgeSync() const
}
std::unique_ptr<ShellCommand> IBridgeHelper::startBridgeCommand() const
std::unique_ptr<ShellCommand> IBridgeHelper::startBridgeCommand()
{
if (startBridgeManually())
throw Exception(serviceAlias() + " is not running. Please, start it manually", ErrorCodes::EXTERNAL_SERVER_IS_NOT_RESPONDING);

View File

@ -28,16 +28,19 @@ public:
static const inline std::string MAIN_METHOD = Poco::Net::HTTPRequest::HTTP_POST;
explicit IBridgeHelper(ContextPtr context_) : WithContext(context_) {}
virtual ~IBridgeHelper() = default;
void startBridgeSync() const;
virtual ~IBridgeHelper() = default;
Poco::URI getMainURI() const;
Poco::URI getPingURI() const;
void startBridgeSync();
protected:
/// Check that the bridge is running. Can also check something else in the meantime.
virtual bool bridgeHandShake() = 0;
/// clickhouse-odbc-bridge, clickhouse-library-bridge
virtual String serviceAlias() const = 0;
@ -61,9 +64,7 @@ protected:
private:
bool checkBridgeIsRunning() const;
std::unique_ptr<ShellCommand> startBridgeCommand() const;
std::unique_ptr<ShellCommand> startBridgeCommand();
};
}

View File

@ -1,6 +1,5 @@
#include "LibraryBridgeHelper.h"
#include <IO/ReadHelpers.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/OwningBlockInputStream.h>
#include <DataStreams/formatBlock.h>
@ -8,6 +7,8 @@
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <IO/WriteBufferFromOStream.h>
#include <IO/WriteBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Formats/FormatFactory.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/ShellCommand.h>
@ -20,16 +21,25 @@
namespace DB
{
namespace ErrorCodes
{
extern const int EXTERNAL_LIBRARY_ERROR;
extern const int LOGICAL_ERROR;
}
LibraryBridgeHelper::LibraryBridgeHelper(
ContextPtr context_,
const Block & sample_block_,
const Field & dictionary_id_)
const Field & dictionary_id_,
const LibraryInitData & library_data_)
: IBridgeHelper(context_->getGlobalContext())
, log(&Poco::Logger::get("LibraryBridgeHelper"))
, sample_block(sample_block_)
, config(context_->getConfigRef())
, http_timeout(context_->getGlobalContext()->getSettingsRef().http_receive_timeout.value)
, library_data(library_data_)
, dictionary_id(dictionary_id_)
, http_timeouts(ConnectionTimeouts::getHTTPTimeouts(context_))
{
bridge_port = config.getUInt("library_bridge.port", DEFAULT_PORT);
bridge_host = config.getString("library_bridge.host", DEFAULT_HOST);
@ -61,26 +71,91 @@ void LibraryBridgeHelper::startBridge(std::unique_ptr<ShellCommand> cmd) const
}
bool LibraryBridgeHelper::initLibrary(const std::string & library_path, const std::string library_settings, const std::string attributes_names)
bool LibraryBridgeHelper::bridgeHandShake()
{
startBridgeSync();
auto uri = createRequestURI(LIB_NEW_METHOD);
String result;
try
{
ReadWriteBufferFromHTTP buf(createRequestURI(PING), Poco::Net::HTTPRequest::HTTP_GET, {}, http_timeouts);
readString(result, buf);
}
catch (...)
{
return false;
}
/*
* When pinging the bridge we also pass the current dictionary_id. The bridge will check whether such a
* dictionary exists. The dictionary_id can be absent only in two cases:
* 1. The dictionary source is being created and the library handler has not yet been initialized on the bridge side.
* 2. The bridge crashed or restarted for some reason while the server did not.
**/
if (result.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected message from library bridge: {}. Check bridge and server have the same version.", result);
UInt8 dictionary_id_exists;
auto parsed = tryParse<UInt8>(dictionary_id_exists, result);
if (!parsed || (dictionary_id_exists != 0 && dictionary_id_exists != 1))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected message from library bridge: {} ({}). Check bridge and server have the same version.",
result, parsed ? toString(dictionary_id_exists) : "failed to parse");
LOG_TRACE(log, "dictionary_id: {}, dictionary_id_exists on bridge side: {}, library confirmed to be initialized on server side: {}",
toString(dictionary_id), toString(dictionary_id_exists), library_initialized);
if (dictionary_id_exists && !library_initialized)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Library was not initialized, but bridge responded to already have dictionary id: {}", dictionary_id);
/// Here we want to tell the bridge to recreate the library handler for the current dictionary,
/// because it reports that it has lost it, but we know that it was already created (a direct result of a bridge crash).
if (!dictionary_id_exists && library_initialized)
{
LOG_WARNING(log, "Library bridge does not have library handler with dictionaty id: {}. It will be reinitialized.", dictionary_id);
bool reinitialized = false;
try
{
auto uri = createRequestURI(LIB_NEW_METHOD);
reinitialized = executeRequest(uri, getInitLibraryCallback());
}
catch (...)
{
tryLogCurrentException(log);
return false;
}
if (!reinitialized)
throw Exception(ErrorCodes::EXTERNAL_LIBRARY_ERROR,
"Failed to reinitialize library handler on bridge side for dictionary with id: {}", dictionary_id);
}
return true;
}
ReadWriteBufferFromHTTP::OutStreamCallback LibraryBridgeHelper::getInitLibraryCallback() const
{
/// Sample block must contain null values
WriteBufferFromOwnString out;
auto output_stream = getContext()->getOutputStream(LibraryBridgeHelper::DEFAULT_FORMAT, out, sample_block);
formatBlock(output_stream, sample_block);
auto block_string = out.str();
auto out_stream_callback = [library_path, library_settings, attributes_names, block_string, this](std::ostream & os)
return [block_string, this](std::ostream & os)
{
os << "library_path=" << escapeForFileName(library_path) << "&";
os << "library_settings=" << escapeForFileName(library_settings) << "&";
os << "attributes_names=" << escapeForFileName(attributes_names) << "&";
os << "library_path=" << escapeForFileName(library_data.library_path) << "&";
os << "library_settings=" << escapeForFileName(library_data.library_settings) << "&";
os << "attributes_names=" << escapeForFileName(library_data.dict_attributes) << "&";
os << "sample_block=" << escapeForFileName(sample_block.getNamesAndTypesList().toString()) << "&";
os << "null_values=" << escapeForFileName(block_string);
};
return executeRequest(uri, out_stream_callback);
}
bool LibraryBridgeHelper::initLibrary()
{
startBridgeSync();
auto uri = createRequestURI(LIB_NEW_METHOD);
library_initialized = executeRequest(uri, getInitLibraryCallback());
return library_initialized;
}
@ -89,15 +164,23 @@ bool LibraryBridgeHelper::cloneLibrary(const Field & other_dictionary_id)
startBridgeSync();
auto uri = createRequestURI(LIB_CLONE_METHOD);
uri.addQueryParameter("from_dictionary_id", toString(other_dictionary_id));
return executeRequest(uri);
/// We also pass initialization settings in order to create a library handler
/// in case from_dictionary_id does not exist on the bridge side (possible in case of a bridge crash).
library_initialized = executeRequest(uri, getInitLibraryCallback());
return library_initialized;
}
bool LibraryBridgeHelper::removeLibrary()
{
startBridgeSync();
auto uri = createRequestURI(LIB_DELETE_METHOD);
return executeRequest(uri);
/// Do not force a bridge restart if it is not running in the case of removeLibrary,
/// because after a restart it would not have this dictionary id in memory anyway.
if (bridgeHandShake())
{
auto uri = createRequestURI(LIB_DELETE_METHOD);
return executeRequest(uri);
}
return true;
}
@ -125,10 +208,12 @@ BlockInputStreamPtr LibraryBridgeHelper::loadAll()
}
BlockInputStreamPtr LibraryBridgeHelper::loadIds(const std::string ids_string)
BlockInputStreamPtr LibraryBridgeHelper::loadIds(const std::vector<uint64_t> & ids)
{
startBridgeSync();
auto uri = createRequestURI(LOAD_IDS_METHOD);
uri.addQueryParameter("ids_num", toString(ids.size())); /// Not used parameter, but helpful
auto ids_string = getDictIdsString(ids);
return loadBase(uri, [ids_string](std::ostream & os) { os << ids_string; });
}
@ -149,13 +234,13 @@ BlockInputStreamPtr LibraryBridgeHelper::loadKeys(const Block & requested_block)
}
bool LibraryBridgeHelper::executeRequest(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback)
bool LibraryBridgeHelper::executeRequest(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback) const
{
ReadWriteBufferFromHTTP buf(
uri,
Poco::Net::HTTPRequest::HTTP_POST,
std::move(out_stream_callback),
ConnectionTimeouts::getHTTPTimeouts(getContext()));
http_timeouts);
bool res;
readBoolText(res, buf);
@ -169,7 +254,7 @@ BlockInputStreamPtr LibraryBridgeHelper::loadBase(const Poco::URI & uri, ReadWri
uri,
Poco::Net::HTTPRequest::HTTP_POST,
std::move(out_stream_callback),
ConnectionTimeouts::getHTTPTimeouts(getContext()),
http_timeouts,
0,
Poco::Net::HTTPBasicCredentials{},
DBMS_DEFAULT_BUFFER_SIZE,
@ -179,4 +264,13 @@ BlockInputStreamPtr LibraryBridgeHelper::loadBase(const Poco::URI & uri, ReadWri
return std::make_shared<OwningBlockInputStream<ReadWriteBufferFromHTTP>>(input_stream, std::move(read_buf_ptr));
}
String LibraryBridgeHelper::getDictIdsString(const std::vector<UInt64> & ids)
{
WriteBufferFromOwnString out;
writeVectorBinary(ids, out);
return out.str();
}
}

View File

@ -15,11 +15,18 @@ class LibraryBridgeHelper : public IBridgeHelper
{
public:
struct LibraryInitData
{
String library_path;
String library_settings;
String dict_attributes;
};
static constexpr inline size_t DEFAULT_PORT = 9012;
LibraryBridgeHelper(ContextPtr context_, const Block & sample_block, const Field & dictionary_id_);
LibraryBridgeHelper(ContextPtr context_, const Block & sample_block, const Field & dictionary_id_, const LibraryInitData & library_data_);
bool initLibrary(const std::string & library_path, std::string library_settings, std::string attributes_names);
bool initLibrary();
bool cloneLibrary(const Field & other_dictionary_id);
@ -31,16 +38,19 @@ public:
BlockInputStreamPtr loadAll();
BlockInputStreamPtr loadIds(std::string ids_string);
BlockInputStreamPtr loadIds(const std::vector<uint64_t> & ids);
BlockInputStreamPtr loadKeys(const Block & requested_block);
BlockInputStreamPtr loadBase(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = {});
bool executeRequest(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = {});
bool executeRequest(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = {}) const;
LibraryInitData getLibraryData() const { return library_data; }
protected:
bool bridgeHandShake() override;
void startBridge(std::unique_ptr<ShellCommand> cmd) const override;
String serviceAlias() const override { return "clickhouse-library-bridge"; }
@ -61,6 +71,8 @@ protected:
Poco::URI createBaseURI() const override;
ReadWriteBufferFromHTTP::OutStreamCallback getInitLibraryCallback() const;
private:
static constexpr inline auto LIB_NEW_METHOD = "libNew";
static constexpr inline auto LIB_CLONE_METHOD = "libClone";
@ -69,18 +81,24 @@ private:
static constexpr inline auto LOAD_IDS_METHOD = "loadIds";
static constexpr inline auto LOAD_KEYS_METHOD = "loadKeys";
static constexpr inline auto IS_MODIFIED_METHOD = "isModified";
static constexpr inline auto PING = "ping";
static constexpr inline auto SUPPORTS_SELECTIVE_LOAD_METHOD = "supportsSelectiveLoad";
Poco::URI createRequestURI(const String & method) const;
static String getDictIdsString(const std::vector<UInt64> & ids);
Poco::Logger * log;
const Block sample_block;
const Poco::Util::AbstractConfiguration & config;
const Poco::Timespan http_timeout;
LibraryInitData library_data;
Field dictionary_id;
std::string bridge_host;
size_t bridge_port;
bool library_initialized = false;
ConnectionTimeouts http_timeouts;
};
}

View File

@ -60,20 +60,33 @@ public:
static constexpr inline auto SCHEMA_ALLOWED_HANDLER = "/schema_allowed";
XDBCBridgeHelper(
ContextPtr context_,
Poco::Timespan http_timeout_,
const std::string & connection_string_)
: IXDBCBridgeHelper(context_->getGlobalContext())
, log(&Poco::Logger::get(BridgeHelperMixin::getName() + "BridgeHelper"))
, connection_string(connection_string_)
, http_timeout(http_timeout_)
, config(context_->getGlobalContext()->getConfigRef())
{
bridge_host = config.getString(BridgeHelperMixin::configPrefix() + ".host", DEFAULT_HOST);
bridge_port = config.getUInt(BridgeHelperMixin::configPrefix() + ".port", DEFAULT_PORT);
}
protected:
bool bridgeHandShake() override
{
try
{
ReadWriteBufferFromHTTP buf(getPingURI(), Poco::Net::HTTPRequest::HTTP_GET, {}, ConnectionTimeouts::getHTTPTimeouts(getContext()));
return checkString(PING_OK_ANSWER, buf);
}
catch (...)
{
return false;
}
}
auto getConnectionString() const { return connection_string; }
String getName() const override { return BridgeHelperMixin::getName(); }

View File

@ -41,6 +41,9 @@ LibraryDictionarySource::LibraryDictionarySource(
, sample_block{sample_block_}
, context(Context::createCopy(context_))
{
if (fs::path(path).is_relative())
path = fs::canonical(path);
if (created_from_ddl && !pathStartsWith(path, context->getDictionariesLibPath()))
throw Exception(ErrorCodes::PATH_ACCESS_DENIED, "File path {} is not inside {}", path, context->getDictionariesLibPath());
@ -48,17 +51,32 @@ LibraryDictionarySource::LibraryDictionarySource(
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "LibraryDictionarySource: Can't load library {}: file doesn't exist", path);
description.init(sample_block);
bridge_helper = std::make_shared<LibraryBridgeHelper>(context, description.sample_block, dictionary_id);
auto res = bridge_helper->initLibrary(path, getLibrarySettingsString(config, config_prefix + ".settings"), getDictAttributesString());
if (!res)
LibraryBridgeHelper::LibraryInitData library_data
{
.library_path = path,
.library_settings = getLibrarySettingsString(config, config_prefix + ".settings"),
.dict_attributes = getDictAttributesString()
};
bridge_helper = std::make_shared<LibraryBridgeHelper>(context, description.sample_block, dictionary_id, library_data);
if (!bridge_helper->initLibrary())
throw Exception(ErrorCodes::EXTERNAL_LIBRARY_ERROR, "Failed to create shared library from path: {}", path);
}
LibraryDictionarySource::~LibraryDictionarySource()
{
bridge_helper->removeLibrary();
try
{
bridge_helper->removeLibrary();
}
catch (...)
{
tryLogCurrentException("LibraryDictionarySource");
}
}
@ -72,8 +90,9 @@ LibraryDictionarySource::LibraryDictionarySource(const LibraryDictionarySource &
, context(other.context)
, description{other.description}
{
bridge_helper = std::make_shared<LibraryBridgeHelper>(context, description.sample_block, dictionary_id);
bridge_helper->cloneLibrary(other.dictionary_id);
bridge_helper = std::make_shared<LibraryBridgeHelper>(context, description.sample_block, dictionary_id, other.bridge_helper->getLibraryData());
if (!bridge_helper->cloneLibrary(other.dictionary_id))
throw Exception(ErrorCodes::EXTERNAL_LIBRARY_ERROR, "Failed to clone library");
}
@ -99,7 +118,7 @@ BlockInputStreamPtr LibraryDictionarySource::loadAll()
BlockInputStreamPtr LibraryDictionarySource::loadIds(const std::vector<UInt64> & ids)
{
LOG_TRACE(log, "loadIds {} size = {}", toString(), ids.size());
return bridge_helper->loadIds(getDictIdsString(ids));
return bridge_helper->loadIds(ids);
}
@ -147,14 +166,6 @@ String LibraryDictionarySource::getLibrarySettingsString(const Poco::Util::Abstr
}
String LibraryDictionarySource::getDictIdsString(const std::vector<UInt64> & ids)
{
WriteBufferFromOwnString out;
writeVectorBinary(ids, out);
return out.str();
}
String LibraryDictionarySource::getDictAttributesString()
{
std::vector<String> attributes_names(dict_struct.attributes.size());

View File

@ -70,8 +70,6 @@ public:
std::string toString() const override;
private:
static String getDictIdsString(const std::vector<UInt64> & ids);
String getDictAttributesString();
static String getLibrarySettingsString(const Poco::Util::AbstractConfiguration & config, const std::string & config_root);
@ -82,7 +80,7 @@ private:
const DictionaryStructure dict_struct;
const std::string config_prefix;
const std::string path;
std::string path;
const Field dictionary_id;
Block sample_block;

View File

@ -31,6 +31,56 @@ std::mutex DiskLocal::reservation_mutex;
using DiskLocalPtr = std::shared_ptr<DiskLocal>;
static void loadDiskLocalConfig(const String & name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
String & path,
UInt64 & keep_free_space_bytes)
{
path = config.getString(config_prefix + ".path", "");
if (name == "default")
{
if (!path.empty())
throw Exception(
"\"default\" disk path should be provided in <path> not it <storage_configuration>",
ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
path = context->getPath();
}
else
{
if (path.empty())
throw Exception("Disk path can not be empty. Disk " + name, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
if (path.back() != '/')
throw Exception("Disk path must end with /. Disk " + name, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
}
if (!FS::canRead(path) || !FS::canWrite(path))
throw Exception("There is no RW access to the disk " + name + " (" + path + ")", ErrorCodes::PATH_ACCESS_DENIED);
bool has_space_ratio = config.has(config_prefix + ".keep_free_space_ratio");
if (config.has(config_prefix + ".keep_free_space_bytes") && has_space_ratio)
throw Exception(
"Only one of 'keep_free_space_bytes' and 'keep_free_space_ratio' can be specified",
ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
keep_free_space_bytes = config.getUInt64(config_prefix + ".keep_free_space_bytes", 0);
if (has_space_ratio)
{
auto ratio = config.getDouble(config_prefix + ".keep_free_space_ratio");
if (ratio < 0 || ratio > 1)
throw Exception("'keep_free_space_ratio' have to be between 0 and 1", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
String tmp_path = path;
if (tmp_path.empty())
tmp_path = context->getPath();
// Create tmp disk for getting total disk space.
keep_free_space_bytes = static_cast<UInt64>(DiskLocal("tmp", tmp_path, 0).getTotalSpace() * ratio);
}
}
class DiskLocalReservation : public IReservation
{
public:
@ -317,6 +367,21 @@ SyncGuardPtr DiskLocal::getDirectorySyncGuard(const String & path) const
return std::make_unique<LocalDirectorySyncGuard>(fs::path(disk_path) / path);
}
void DiskLocal::applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap &)
{
String new_disk_path;
UInt64 new_keep_free_space_bytes;
loadDiskLocalConfig(name, config, config_prefix, context, new_disk_path, new_keep_free_space_bytes);
if (disk_path != new_disk_path)
throw Exception("Disk path can't be updated from config " + name, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
if (keep_free_space_bytes != new_keep_free_space_bytes)
keep_free_space_bytes = new_keep_free_space_bytes;
}
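Since `applyNewSettings` runs when the server re-reads its configuration, an updated `keep_free_space_bytes` should presumably take effect without a restart, e.g. after forcing a reload:

``` sql
-- assumes the storage configuration on disk was already edited
SYSTEM RELOAD CONFIG;
```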
DiskPtr DiskLocalReservation::getDisk(size_t i) const
{
if (i != 0)
@ -334,7 +399,6 @@ void DiskLocalReservation::update(UInt64 new_size)
disk->reserved_bytes += size;
}
DiskLocalReservation::~DiskLocalReservation()
{
try
@ -369,48 +433,9 @@ void registerDiskLocal(DiskFactory & factory)
const String & config_prefix,
ContextPtr context,
const DisksMap & /*map*/) -> DiskPtr {
String path = config.getString(config_prefix + ".path", "");
if (name == "default")
{
if (!path.empty())
throw Exception(
"\"default\" disk path should be provided in <path> not it <storage_configuration>",
ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
path = context->getPath();
}
else
{
if (path.empty())
throw Exception("Disk path can not be empty. Disk " + name, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
if (path.back() != '/')
throw Exception("Disk path must end with /. Disk " + name, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
}
if (!FS::canRead(path) || !FS::canWrite(path))
throw Exception("There is no RW access to the disk " + name + " (" + path + ")", ErrorCodes::PATH_ACCESS_DENIED);
bool has_space_ratio = config.has(config_prefix + ".keep_free_space_ratio");
if (config.has(config_prefix + ".keep_free_space_bytes") && has_space_ratio)
throw Exception(
"Only one of 'keep_free_space_bytes' and 'keep_free_space_ratio' can be specified",
ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
UInt64 keep_free_space_bytes = config.getUInt64(config_prefix + ".keep_free_space_bytes", 0);
if (has_space_ratio)
{
auto ratio = config.getDouble(config_prefix + ".keep_free_space_ratio");
if (ratio < 0 || ratio > 1)
throw Exception("'keep_free_space_ratio' have to be between 0 and 1", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
String tmp_path = path;
if (tmp_path.empty())
tmp_path = context->getPath();
// Create tmp disk for getting total disk space.
keep_free_space_bytes = static_cast<UInt64>(DiskLocal("tmp", tmp_path, 0).getTotalSpace() * ratio);
}
String path;
UInt64 keep_free_space_bytes;
loadDiskLocalConfig(name, config, config_prefix, context, path, keep_free_space_bytes);
return std::make_shared<DiskLocal>(name, path, keep_free_space_bytes);
};
factory.registerDiskType("local", creator);

View File

@ -5,6 +5,7 @@
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/WriteBufferFromFile.h>
#include <Poco/Util/AbstractConfiguration.h>
namespace DB
@ -104,13 +105,15 @@ public:
SyncGuardPtr getDirectorySyncGuard(const String & path) const override;
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap &) override;
private:
bool tryReserve(UInt64 bytes);
private:
const String name;
const String disk_path;
const UInt64 keep_free_space_bytes;
std::atomic<UInt64> keep_free_space_bytes;
UInt64 reserved_bytes = 0;
UInt64 reservation_count = 0;
@ -120,4 +123,5 @@ private:
Poco::Logger * log = &Poco::Logger::get("DiskLocal");
};
}

View File

@ -32,7 +32,7 @@ public:
/// Get all disks with names
const DisksMap & getDisksMap() const { return disks; }
void addToDiskMap(String name, DiskPtr disk)
void addToDiskMap(const String & name, DiskPtr disk)
{
disks.emplace(name, disk);
}

View File

@ -13,9 +13,9 @@
#include <mutex>
#include <utility>
#include <boost/noncopyable.hpp>
#include "Poco/Util/AbstractConfiguration.h"
#include <Poco/Timestamp.h>
#include <filesystem>
#include "Poco/Util/AbstractConfiguration.h"
namespace fs = std::filesystem;

View File

@ -363,7 +363,8 @@ int DiskS3::readSchemaVersion(const String & source_bucket, const String & sourc
settings->client,
source_bucket,
source_path + SCHEMA_VERSION_OBJECT,
settings->s3_max_single_read_retries);
settings->s3_max_single_read_retries,
DBMS_DEFAULT_BUFFER_SIZE);
readIntText(version, buffer);

View File

@ -1218,17 +1218,36 @@ public:
{
return res;
}
else if ((isColumnedAsDecimal(left_type) || isColumnedAsDecimal(right_type))
// Comparing Date and DateTime64 requires implicit conversion,
// otherwise Date is treated as a number.
&& !(date_and_datetime && (isDate(left_type) || isDate(right_type))))
else if ((isColumnedAsDecimal(left_type) || isColumnedAsDecimal(right_type)))
{
// compare
if (!allowDecimalComparison(left_type, right_type) && !date_and_datetime)
throw Exception("No operation " + getName() + " between " + left_type->getName() + " and " + right_type->getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
// Comparing Date and DateTime64 requires implicit conversion, otherwise Date is treated as a number.
if (date_and_datetime && (isDate(left_type) || isDate(right_type)))
{
DataTypePtr common_type = getLeastSupertype({left_type, right_type});
ColumnPtr c0_converted = castColumn(col_with_type_and_name_left, common_type);
ColumnPtr c1_converted = castColumn(col_with_type_and_name_right, common_type);
return executeDecimal({c0_converted, common_type, "left"}, {c1_converted, common_type, "right"});
}
else
{
// compare
if (!allowDecimalComparison(left_type, right_type) && !date_and_datetime)
throw Exception(
"No operation " + getName() + " between " + left_type->getName() + " and " + right_type->getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return executeDecimal(col_with_type_and_name_left, col_with_type_and_name_right);
}
return executeDecimal(col_with_type_and_name_left, col_with_type_and_name_right);
}
else if (date_and_datetime)
{
DataTypePtr common_type = getLeastSupertype({left_type, right_type});
ColumnPtr c0_converted = castColumn(col_with_type_and_name_left, common_type);
ColumnPtr c1_converted = castColumn(col_with_type_and_name_right, common_type);
if (!((res = executeNumLeftType<UInt32>(c0_converted.get(), c1_converted.get()))
|| (res = executeNumLeftType<UInt64>(c0_converted.get(), c1_converted.get()))))
throw Exception("Date related common types can only be UInt32 or UInt64", ErrorCodes::LOGICAL_ERROR);
return res;
}
else if (left_type->equals(*right_type))
{
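
The restructured branch handles Date vs. DateTime(64) comparisons by casting both columns to their least supertype before comparing, instead of treating Date as a bare number. A minimal sketch of the underlying conversion, using the fact that Date is stored as days and DateTime as seconds since the epoch (time zones ignored for brevity; not the actual ClickHouse code path):

```
#include <cstdint>

// Date is days since epoch (UInt16), DateTime is seconds since epoch (UInt32).
bool dateEqualsDateTime(uint16_t date_days, uint32_t datetime_seconds)
{
    // promote Date to the common supertype (seconds) before comparing
    const uint64_t date_as_seconds = static_cast<uint64_t>(date_days) * 86400;
    return date_as_seconds == static_cast<uint64_t>(datetime_seconds);
}
```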

View File

@ -43,7 +43,7 @@ public:
const String & bucket_,
const String & key_,
UInt64 max_single_read_retries_,
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE);
size_t buffer_size_);
bool nextImpl() override;
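
Dropping the default value of `buffer_size_` turns a silently applied default into a compile-time error at every call site, which is why the `DiskS3` and `StorageS3` hunks in this commit now pass `DBMS_DEFAULT_BUFFER_SIZE` explicitly. A stand-alone sketch of the effect (stand-in types, not the real reader):

```
#include <cstddef>
#include <memory>

struct Reader
{
    // before: explicit Reader(size_t buffer_size = 8192);  // easy to forget at call sites
    explicit Reader(size_t buffer_size) : size(buffer_size) {}
    size_t size;
};

int main()
{
    constexpr size_t DEFAULT_BUFFER_SIZE = 1 << 20;  // stand-in for DBMS_DEFAULT_BUFFER_SIZE
    auto reader = std::make_unique<Reader>(DEFAULT_BUFFER_SIZE);  // size is always spelled out
    return reader->size != 0 ? 0 : 1;
}
```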

View File

@ -63,7 +63,7 @@ public:
return;
bool is_table = false;
ASTPtr subquery_or_table_name = ast; /// ASTTableIdentifier | ASTSubquery | ASTTableExpression
ASTPtr subquery_or_table_name; /// ASTTableIdentifier | ASTSubquery | ASTTableExpression
if (const auto * ast_table_expr = ast->as<ASTTableExpression>())
{
@ -76,7 +76,14 @@ public:
}
}
else if (ast->as<ASTTableIdentifier>())
{
subquery_or_table_name = ast;
is_table = true;
}
else if (ast->as<ASTSubquery>())
{
subquery_or_table_name = ast;
}
if (!subquery_or_table_name)
throw Exception("Global subquery requires subquery or table name", ErrorCodes::WRONG_GLOBAL_SUBQUERY);

View File

@ -37,7 +37,7 @@ public:
virtual size_t getTotalRowCount() const = 0;
virtual size_t getTotalByteCount() const = 0;
virtual bool alwaysReturnsEmptySet() const { return false; }
virtual bool alwaysReturnsEmptySet() const = 0;
/// StorageJoin/Dictionary is already filled. No need to call addJoinedBlock.
/// Different query plan is used for such joins.

View File

@ -551,8 +551,6 @@ std::vector<TableNeededColumns> normalizeColumnNamesExtractNeeded(
else
needed_columns[*table_pos].no_clashes.emplace(ident->shortName());
}
else if (!got_alias)
throw Exception("Unknown column name '" + ident->name() + "'", ErrorCodes::UNKNOWN_IDENTIFIER);
}
return needed_columns;

View File

@ -32,6 +32,8 @@ public:
size_t getTotalRowCount() const override { return right_blocks.row_count; }
size_t getTotalByteCount() const override { return right_blocks.bytes; }
/// Has to be called only after setTotals()/mergeRightBlocks()
bool alwaysReturnsEmptySet() const override { return (is_right || is_inner) && min_max_right_blocks.empty(); }
BlockInputStreamPtr createStreamWithNonJoinedRows(const Block & result_sample_block, UInt64 max_block_size) const override;
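
Making `alwaysReturnsEmptySet()` pure virtual removes the interface's silent `return false`, so every join implementation must now state its own answer; the MergeJoin override above reports an empty result whenever an inner/right join has no right-side blocks. A condensed sketch of the pattern (stand-in classes):

```
struct IJoinSketch
{
    virtual ~IJoinSketch() = default;
    virtual bool alwaysReturnsEmptySet() const = 0;  // no default implementation
};

struct MergeJoinSketch : IJoinSketch
{
    bool is_inner_or_right = true;
    bool right_blocks_empty = true;

    // an INNER/RIGHT join with nothing on the right can never produce rows
    bool alwaysReturnsEmptySet() const override
    {
        return is_inner_or_right && right_blocks_empty;
    }
};
```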

View File

@ -5,6 +5,7 @@
#include <Columns/ColumnsNumber.h>
#include <Common/CurrentThread.h>
#include <Common/SettingsChanges.h>
#include <Common/setThreadName.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <DataStreams/AsynchronousBlockInputStream.h>
#include <DataStreams/PushingToSinkBlockOutputStream.h>
@ -480,6 +481,40 @@ namespace
};
/// A boolean state protected by a mutex, able to wait until another thread sets it to a specific value.
class BoolState
{
public:
explicit BoolState(bool initial_value) : value(initial_value) {}
bool get() const
{
std::lock_guard lock{mutex};
return value;
}
void set(bool new_value)
{
std::lock_guard lock{mutex};
if (value == new_value)
return;
value = new_value;
changed.notify_all();
}
void wait(bool wanted_value) const
{
std::unique_lock lock{mutex};
changed.wait(lock, [this, wanted_value]() { return value == wanted_value; });
}
private:
bool value;
mutable std::mutex mutex;
mutable std::condition_variable changed;
};
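
A hypothetical use of `BoolState` as defined above: the call thread marks a result as in flight before handing it to gRPC, and the completion callback clears the flag, waking the waiter. This is the pattern that replaces the `dummy_mutex`/`condition_variable` pairs in the hunks below:

```
#include <thread>

void boolStateExample()
{
    BoolState sending_result{false};

    sending_result.set(true);        // mark the write as in flight
    std::thread queue_thread([&]
    {
        // ... the completion callback would run here ...
        sending_result.set(false);   // wakes anyone blocked in wait(false)
    });

    sending_result.wait(false);      // block until the write has finished
    queue_thread.join();
}
```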
/// Handles a connection after a responder is started (i.e. after getting a new call).
class Call
{
@ -564,18 +599,15 @@ namespace
UInt64 waited_for_client_writing = 0;
/// The following fields are accessed both from call_thread and queue_thread.
std::atomic<bool> reading_query_info = false;
BoolState reading_query_info{false};
std::atomic<bool> failed_to_read_query_info = false;
GRPCQueryInfo next_query_info_while_reading;
std::atomic<bool> want_to_cancel = false;
std::atomic<bool> check_query_info_contains_cancel_only = false;
std::atomic<bool> sending_result = false;
BoolState sending_result{false};
std::atomic<bool> failed_to_send_result = false;
ThreadFromGlobalPool call_thread;
std::condition_variable read_finished;
std::condition_variable write_finished;
std::mutex dummy_mutex; /// Doesn't protect anything.
};
Call::Call(CallType call_type_, std::unique_ptr<BaseResponder> responder_, IServer & iserver_, Poco::Logger * log_)
@ -610,6 +642,7 @@ namespace
{
try
{
setThreadName("GRPCServerCall");
receiveQuery();
executeQuery();
processInput();
@ -1230,8 +1263,7 @@ namespace
{
auto start_reading = [&]
{
assert(!reading_query_info);
reading_query_info = true;
reading_query_info.set(true);
responder->read(next_query_info_while_reading, [this](bool ok)
{
/// Called on queue_thread.
@ -1256,18 +1288,16 @@ namespace
/// on queue_thread.
failed_to_read_query_info = true;
}
reading_query_info = false;
read_finished.notify_one();
reading_query_info.set(false);
});
};
auto finish_reading = [&]
{
if (reading_query_info)
if (reading_query_info.get())
{
Stopwatch client_writing_watch;
std::unique_lock lock{dummy_mutex};
read_finished.wait(lock, [this] { return !reading_query_info; });
reading_query_info.wait(false);
waited_for_client_writing += client_writing_watch.elapsedNanoseconds();
}
throwIfFailedToReadQueryInfo();
@ -1430,11 +1460,10 @@ namespace
/// Wait for previous write to finish.
/// (gRPC doesn't allow sending another result while the previous one is still being sent.)
if (sending_result)
if (sending_result.get())
{
Stopwatch client_reading_watch;
std::unique_lock lock{dummy_mutex};
write_finished.wait(lock, [this] { return !sending_result; });
sending_result.wait(false);
waited_for_client_reading += client_reading_watch.elapsedNanoseconds();
}
throwIfFailedToSendResult();
@ -1445,14 +1474,13 @@ namespace
if (write_buffer)
write_buffer->finalize();
sending_result = true;
sending_result.set(true);
auto callback = [this](bool ok)
{
/// Called on queue_thread.
if (!ok)
failed_to_send_result = true;
sending_result = false;
write_finished.notify_one();
sending_result.set(false);
};
Stopwatch client_reading_final_watch;
@ -1472,8 +1500,7 @@ namespace
if (send_final_message)
{
/// Wait until the result is actually sent.
std::unique_lock lock{dummy_mutex};
write_finished.wait(lock, [this] { return !sending_result; });
sending_result.wait(false);
waited_for_client_reading += client_reading_final_watch.elapsedNanoseconds();
throwIfFailedToSendResult();
LOG_TRACE(log, "Final result has been sent to the client");
@ -1584,7 +1611,7 @@ private:
{
/// Called on call_thread. That's why we can't destroy the `call` right now
/// (a thread can't join itself). Thus here we only move the `call` from
/// `current_call` to `finished_calls` and run() will actually destroy the `call`.
/// `current_calls` to `finished_calls` and run() will actually destroy the `call`.
std::lock_guard lock{mutex};
auto it = current_calls.find(call);
finished_calls.push_back(std::move(it->second));
@ -1593,6 +1620,7 @@ private:
void run()
{
setThreadName("GRPCServerQueue");
while (true)
{
{

View File

@ -1,4 +1,5 @@
#include "StorageMongoDB.h"
#include "StorageMongoDBSocketFactory.h"
#include <Poco/MongoDB/Connection.h>
#include <Poco/MongoDB/Cursor.h>
@ -33,6 +34,7 @@ StorageMongoDB::StorageMongoDB(
const std::string & collection_name_,
const std::string & username_,
const std::string & password_,
const std::string & options_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const String & comment)
@ -43,6 +45,8 @@ StorageMongoDB::StorageMongoDB(
, collection_name(collection_name_)
, username(username_)
, password(password_)
, options(options_)
, uri("mongodb://" + host_ + ":" + std::to_string(port_) + "/" + database_name_ + "?" + options_)
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
@ -56,7 +60,10 @@ void StorageMongoDB::connectIfNotConnected()
{
std::lock_guard lock{connection_mutex};
if (!connection)
connection = std::make_shared<Poco::MongoDB::Connection>(host, port);
{
StorageMongoDBSocketFactory factory;
connection = std::make_shared<Poco::MongoDB::Connection>(uri, factory);
}
if (!authenticated)
{
@ -102,9 +109,9 @@ void registerStorageMongoDB(StorageFactory & factory)
{
ASTs & engine_args = args.engine_args;
if (engine_args.size() != 5)
if (engine_args.size() < 5 || engine_args.size() > 6)
throw Exception(
"Storage MongoDB requires 5 parameters: MongoDB('host:port', database, collection, 'user', 'password').",
"Storage MongoDB requires from 5 to 6 parameters: MongoDB('host:port', database, collection, 'user', 'password' [, 'options']).",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (auto & engine_arg : engine_args)
@ -118,6 +125,11 @@ void registerStorageMongoDB(StorageFactory & factory)
const String & username = engine_args[3]->as<ASTLiteral &>().value.safeGet<String>();
const String & password = engine_args[4]->as<ASTLiteral &>().value.safeGet<String>();
String options;
if (engine_args.size() >= 6)
options = engine_args[5]->as<ASTLiteral &>().value.safeGet<String>();
return StorageMongoDB::create(
args.table_id,
parsed_host_port.first,
@ -126,6 +138,7 @@ void registerStorageMongoDB(StorageFactory & factory)
collection,
username,
password,
options,
args.columns,
args.constraints,
args.comment);
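
The argument handling above keeps existing 5-argument table definitions working while accepting an optional sixth `options` string. A minimal sketch of that parse step, with plain strings standing in for the AST literals:

```
#include <stdexcept>
#include <string>
#include <vector>

std::string parseMongoOptions(const std::vector<std::string> & engine_args)
{
    if (engine_args.size() < 5 || engine_args.size() > 6)
        throw std::invalid_argument("Storage MongoDB requires 5 or 6 parameters");
    // host:port, database, collection, user, password [, options]
    return engine_args.size() == 6 ? engine_args[5] : std::string{};
}
```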

View File

@ -26,6 +26,7 @@ public:
const std::string & collection_name_,
const std::string & username_,
const std::string & password_,
const std::string & options_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const String & comment);
@ -50,6 +51,8 @@ private:
const std::string collection_name;
const std::string username;
const std::string password;
const std::string options;
const std::string uri;
std::shared_ptr<Poco::MongoDB::Connection> connection;
bool authenticated = false;

View File

@ -0,0 +1,55 @@
#include "StorageMongoDBSocketFactory.h"
#include <Common/Exception.h>
#if !defined(ARCADIA_BUILD)
# include <Common/config.h>
#endif
#include <Poco/Net/IPAddress.h>
#include <Poco/Net/SocketAddress.h>
#if USE_SSL
# include <Poco/Net/SecureStreamSocket.h>
#endif
namespace DB
{
namespace ErrorCodes
{
extern const int FEATURE_IS_NOT_ENABLED_AT_BUILD_TIME;
}
Poco::Net::StreamSocket StorageMongoDBSocketFactory::createSocket(const std::string & host, int port, Poco::Timespan connectTimeout, bool secure)
{
return secure ? createSecureSocket(host, port, connectTimeout) : createPlainSocket(host, port, connectTimeout);
}
Poco::Net::StreamSocket StorageMongoDBSocketFactory::createPlainSocket(const std::string & host, int port, Poco::Timespan connectTimeout)
{
Poco::Net::SocketAddress address(host, port);
Poco::Net::StreamSocket socket;
socket.connect(address, connectTimeout);
return socket;
}
Poco::Net::StreamSocket StorageMongoDBSocketFactory::createSecureSocket(const std::string & host [[maybe_unused]], int port [[maybe_unused]], Poco::Timespan connectTimeout [[maybe_unused]])
{
#if USE_SSL
Poco::Net::SocketAddress address(host, port);
Poco::Net::SecureStreamSocket socket;
socket.connect(address, connectTimeout);
return socket;
#else
throw Exception("SSL is not enabled at build time.", ErrorCodes::FEATURE_IS_NOT_ENABLED_AT_BUILD_TIME);
#endif
}
}
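
A hypothetical caller of the factory, mirroring the `connectIfNotConnected()` hunk above: the connection URI carries the options string, and an `ssl=true` option is what makes Poco ask the factory for a secure socket (assuming Poco's URI-based `Connection` constructor, which parses the query part):

```
#include "StorageMongoDBSocketFactory.h"

#include <Poco/MongoDB/Connection.h>

void connectSketch()
{
    DB::StorageMongoDBSocketFactory factory;
    // "ssl=true" routes the connection through createSecureSocket()
    Poco::MongoDB::Connection connection("mongodb://mongo1:27017/test?ssl=true", factory);
    // ... authenticate and run commands as usual ...
}
```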

View File

@ -0,0 +1,19 @@
#pragma once
#include <Poco/MongoDB/Connection.h>
namespace DB
{
class StorageMongoDBSocketFactory : public Poco::MongoDB::Connection::SocketFactory
{
public:
virtual Poco::Net::StreamSocket createSocket(const std::string & host, int port, Poco::Timespan connectTimeout, bool secure) override;
private:
static Poco::Net::StreamSocket createPlainSocket(const std::string & host, int port, Poco::Timespan connectTimeout);
static Poco::Net::StreamSocket createSecureSocket(const std::string & host, int port, Poco::Timespan connectTimeout);
};
}

View File

@ -208,7 +208,8 @@ bool StorageS3Source::initialize()
file_path = fs::path(bucket) / current_key;
read_buf = wrapReadBufferWithCompressionMethod(
std::make_unique<ReadBufferFromS3>(client, bucket, current_key, max_single_read_retries), chooseCompressionMethod(current_key, compression_hint));
std::make_unique<ReadBufferFromS3>(client, bucket, current_key, max_single_read_retries, DBMS_DEFAULT_BUFFER_SIZE),
chooseCompressionMethod(current_key, compression_hint));
auto input_format = FormatFactory::instance().getInput(format, *read_buf, sample_block, getContext(), max_block_size);
pipeline = std::make_unique<QueryPipeline>();
pipeline->init(Pipe(input_format));

View File

@ -50,6 +50,7 @@ NamesAndTypesList StorageSystemUsers::getNamesAndTypes()
{"grantees_any", std::make_shared<DataTypeUInt8>()},
{"grantees_list", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
{"grantees_except", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
{"default_database", std::make_shared<DataTypeString>()},
};
return names_and_types;
}
@ -85,6 +86,7 @@ void StorageSystemUsers::fillData(MutableColumns & res_columns, ContextPtr conte
auto & column_grantees_list_offsets = assert_cast<ColumnArray &>(*res_columns[column_index++]).getOffsets();
auto & column_grantees_except = assert_cast<ColumnString &>(assert_cast<ColumnArray &>(*res_columns[column_index]).getData());
auto & column_grantees_except_offsets = assert_cast<ColumnArray &>(*res_columns[column_index++]).getOffsets();
auto & column_default_database = assert_cast<ColumnString &>(*res_columns[column_index++]);
auto add_row = [&](const String & name,
const UUID & id,
@ -92,7 +94,8 @@ void StorageSystemUsers::fillData(MutableColumns & res_columns, ContextPtr conte
const Authentication & authentication,
const AllowedClientHosts & allowed_hosts,
const RolesOrUsersSet & default_roles,
const RolesOrUsersSet & grantees)
const RolesOrUsersSet & grantees,
const String & default_database)
{
column_name.insertData(name.data(), name.length());
column_id.push_back(id.toUnderType());
@ -180,6 +183,8 @@ void StorageSystemUsers::fillData(MutableColumns & res_columns, ContextPtr conte
for (const auto & except_name : grantees_ast->except_names)
column_grantees_except.insertData(except_name.data(), except_name.length());
column_grantees_except_offsets.push_back(column_grantees_except.size());
column_default_database.insertData(default_database.data(), default_database.length());
};
for (const auto & id : ids)
@ -192,7 +197,8 @@ void StorageSystemUsers::fillData(MutableColumns & res_columns, ContextPtr conte
if (!storage)
continue;
add_row(user->getName(), id, storage->getStorageName(), user->authentication, user->allowed_client_hosts, user->default_roles, user->grantees);
add_row(user->getName(), id, storage->getStorageName(), user->authentication, user->allowed_client_hosts,
user->default_roles, user->grantees, user->default_database);
}
}
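
The two hunks above must stay aligned: a column appended to `getNamesAndTypes()` has to be filled at the same position in `fillData()`, because result columns are consumed by a running `column_index` rather than looked up by name. A tiny sketch of the invariant:

```
#include <cassert>
#include <string>
#include <vector>

void checkColumnsAligned(const std::vector<std::string> & declared,  // from getNamesAndTypes()
                         size_t filled_count)                        // columns consumed in fillData()
{
    // "default_database" was appended on both sides in this commit
    assert(declared.size() == filled_count);
}
```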

View File

@ -27,6 +27,7 @@ except ImportError:
import random
import string
import multiprocessing
import socket
from contextlib import closing
USE_JINJA = True
@ -267,6 +268,9 @@ def run_single_test(args, ext, server_logs_level, client_options, case_file, std
os.system("LC_ALL=C sed -i -e 's|/auto_{{shard}}||g' {file}".format(file=stdout_file))
os.system("LC_ALL=C sed -i -e 's|auto_{{replica}}||g' {file}".format(file=stdout_file))
# Normalize hostname in stdout file.
os.system("LC_ALL=C sed -i -e 's/{hostname}/localhost/g' {file}".format(hostname=socket.gethostname(), file=stdout_file))
stdout = open(stdout_file, 'rb').read() if os.path.exists(stdout_file) else b''
stdout = str(stdout, errors='replace', encoding='utf-8')
stderr = open(stderr_file, 'rb').read() if os.path.exists(stderr_file) else b''

View File

@ -110,6 +110,7 @@ def subprocess_check_call(args, detach=False, nothrow=False):
#logging.info('run:' + ' '.join(args))
return run_and_check(args, detach=detach, nothrow=nothrow)
def get_odbc_bridge_path():
path = os.environ.get('CLICKHOUSE_TESTS_ODBC_BRIDGE_BIN_PATH')
if path is None:
@ -261,6 +262,7 @@ class ClickHouseCluster:
self.with_hdfs = False
self.with_kerberized_hdfs = False
self.with_mongo = False
self.with_mongo_secure = False
self.with_net_trics = False
self.with_redis = False
self.with_cassandra = False
@ -548,7 +550,6 @@ class ClickHouseCluster:
return self.base_mysql_client_cmd
def setup_mysql_cmd(self, instance, env_variables, docker_compose_yml_dir):
self.with_mysql = True
env_variables['MYSQL_HOST'] = self.mysql_host
@ -680,6 +681,17 @@ class ClickHouseCluster:
'--file', p.join(docker_compose_yml_dir, 'docker_compose_rabbitmq.yml')]
return self.base_rabbitmq_cmd
def setup_mongo_secure_cmd(self, instance, env_variables, docker_compose_yml_dir):
self.with_mongo = self.with_mongo_secure = True
env_variables['MONGO_HOST'] = self.mongo_host
env_variables['MONGO_EXTERNAL_PORT'] = str(self.mongo_port)
env_variables['MONGO_INTERNAL_PORT'] = "27017"
env_variables['MONGO_CONFIG_PATH'] = HELPERS_DIR
self.base_cmd.extend(['--file', p.join(docker_compose_yml_dir, 'docker_compose_mongo_secure.yml')])
self.base_mongo_cmd = ['docker-compose', '--env-file', instance.env_file, '--project-name', self.project_name,
'--file', p.join(docker_compose_yml_dir, 'docker_compose_mongo_secure.yml')]
return self.base_mongo_cmd
def setup_mongo_cmd(self, instance, env_variables, docker_compose_yml_dir):
self.with_mongo = True
env_variables['MONGO_HOST'] = self.mongo_host
@ -723,7 +735,8 @@ class ClickHouseCluster:
macros=None, with_zookeeper=False, with_zookeeper_secure=False,
with_mysql_client=False, with_mysql=False, with_mysql8=False, with_mysql_cluster=False,
with_kafka=False, with_kerberized_kafka=False, with_rabbitmq=False, clickhouse_path_dir=None,
with_odbc_drivers=False, with_postgres=False, with_postgres_cluster=False, with_hdfs=False, with_kerberized_hdfs=False, with_mongo=False,
with_odbc_drivers=False, with_postgres=False, with_postgres_cluster=False, with_hdfs=False,
with_kerberized_hdfs=False, with_mongo=False, with_mongo_secure=False,
with_redis=False, with_minio=False, with_cassandra=False, with_jdbc_bridge=False,
hostname=None, env_variables=None, image="yandex/clickhouse-integration-test", tag=None,
stay_alive=False, ipv4_address=None, ipv6_address=None, with_installed_binary=False, tmpfs=None,
@ -776,7 +789,7 @@ class ClickHouseCluster:
with_kerberized_kafka=with_kerberized_kafka,
with_rabbitmq=with_rabbitmq,
with_kerberized_hdfs=with_kerberized_hdfs,
with_mongo=with_mongo,
with_mongo=with_mongo or with_mongo_secure,
with_redis=with_redis,
with_minio=with_minio,
with_cassandra=with_cassandra,
@ -861,8 +874,11 @@ class ClickHouseCluster:
if with_kerberized_hdfs and not self.with_kerberized_hdfs:
cmds.append(self.setup_kerberized_hdfs_cmd(instance, env_variables, docker_compose_yml_dir))
if with_mongo and not self.with_mongo:
cmds.append(self.setup_mongo_cmd(instance, env_variables, docker_compose_yml_dir))
if (with_mongo or with_mongo_secure) and not (self.with_mongo or self.with_mongo_secure):
if with_mongo_secure:
cmds.append(self.setup_mongo_secure_cmd(instance, env_variables, docker_compose_yml_dir))
else:
cmds.append(self.setup_mongo_cmd(instance, env_variables, docker_compose_yml_dir))
if self.with_net_trics:
for cmd in cmds:
@ -1234,7 +1250,6 @@ class ClickHouseCluster:
logging.debug("Waiting for Kafka to start up")
time.sleep(1)
def wait_hdfs_to_start(self, timeout=300, check_marker=False):
start = time.time()
while time.time() - start < timeout:
@ -1251,9 +1266,11 @@ class ClickHouseCluster:
raise Exception("Can't wait HDFS to start")
def wait_mongo_to_start(self, timeout=180):
def wait_mongo_to_start(self, timeout=30, secure=False):
connection_str = 'mongodb://{user}:{password}@{host}:{port}'.format(
host='localhost', port=self.mongo_port, user='root', password='clickhouse')
if secure:
connection_str += '/?tls=true&tlsAllowInvalidCertificates=true'
connection = pymongo.MongoClient(connection_str)
start = time.time()
while time.time() - start < timeout:
@ -1320,7 +1337,6 @@ class ClickHouseCluster:
raise Exception("Can't wait Schema Registry to start")
def wait_cassandra_to_start(self, timeout=180):
self.cassandra_ip = self.get_instance_ip(self.cassandra_host)
cass_client = cassandra.cluster.Cluster([self.cassandra_ip], port=self.cassandra_port, load_balancing_policy=RoundRobinPolicy())
@ -1505,7 +1521,7 @@ class ClickHouseCluster:
if self.with_mongo and self.base_mongo_cmd:
logging.debug('Setup Mongo')
run_and_check(self.base_mongo_cmd + common_opts)
self.wait_mongo_to_start(30)
self.wait_mongo_to_start(30, secure=self.with_mongo_secure)
if self.with_redis and self.base_redis_cmd:
logging.debug('Setup Redis')

View File

@ -0,0 +1,44 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAtz2fpa8hyUff8u8jYlh20HbkOO8hQi64Ke2Prack2Br0lhOr
1MI6I8nVk5iDrt+7ix2Cnt+2aZKb6HJv0CG1V25yWg+jgsXeIT1KHTJf8rTmYxhb
t+ye+S1Z0h/Rt+xqSd9XXfzOLPGHYfyx6ZQ4AumO/HoEFD4IH/qiREjwtOfRXuhz
CohqtUTyYR7pJmZqBSuGac461WVRisnjfKRxeVa3itc84/RgktgYej2x4PQBFk13
xAXKrWmHkwdgWklTuuK8Gtoqz65Y4/J9CSl+Bd08QDdRnaVvq1u1eNTZg1BVyeRv
jFYBMSathKASrng5nK66Fdilw6tO/9khaP0SDQIDAQABAoIBAAm/5qGrKtIJ1/mW
Dbzq1g+Lc+MvngZmc/gPIsjrjsNM09y0WT0txGgpEgsTX1ZLoy/otw16+7qsSU1Z
4WcilAJ95umx0VJg8suz9iCNkJtaUrPNFPw5Q9AgQJo0hTUTCCi8EGr4y4OKqlhl
WJYEA+LryGbYmyT0k/wXmtClTOFjKS09mK4deQ1DqbBxayR9MUZgRJzEODA8eGXs
Rc6fJUenMVNMzIVLgpossRtKImoZNcf5UtCKL3HECunndQeMu4zuqLMU+EzL1F/o
iHKF7v3CVmsK0OxNJfOfT0abN3XaJttFwTJyghQjgP8OX1IKjlj3vo9xwEDfVUEf
GVIER0UCgYEA2j+kjaT3Dw2lNlBEsA8uvVlLJHBc9JBtMGduGYsIEgsL/iStqXv4
xoA9N3CwkN/rrQpDfi/16DMXAAYdjGulPwiMNFBY22TCzhtPC2mAnfaSForxwZCs
lwc3KkIloo3N5XvN78AuZf8ewiS+bOEj+HHHqqSb1+u/csuaXO9neesCgYEA1u/I
Mlt/pxJkH+c3yOskwCh/CNhq9szi8G9LXROIQ58BT2ydJSEPpt7AhUTtQGimQQTW
KLiffJSkjuVaFckR1GjCoAmFGYw9wUb+TmFNScz5pJ2dXse8aBysAMIQfEIcRAEa
gKnkLBH6nw3+/Hm3xwoBc35t8Pa2ek7LsWDfbecCgYBhilQW4gVw+t49uf4Y2ZBA
G+pTbMx+mRXTrkYssFB5D+raOLZMqxVyUdoKLxkahpkkCxRDD1hN4JeE8Ta/jVSb
KUzQDKDJ3OybhOT86rgK4SpFXO/TXL9l+FmVT17WmZ3N1Fkjr7aM60pp5lYc/zo+
TUu5XjwwcjJsMcbZhj2u5QKBgQCDNuUn4PYAP9kCJPzIWs0XxmEvPDeorZIJqFgA
3XC9n2+EVlFlHlbYz3oGofqY7Io6fUJkn7k1q+T+G4QwcozA+KeAXe90lkoJGVcc
8IfnewwYc+RjvVoG0SIsYE0CHrX0yhus2oqiYON4gGnfJkuMZk5WfKOPjH4AEuSF
SBd+lwKBgQCHG/DA6u2mYmezPF9hebWFoyAVSr2PDXDhu8cNNHCpx9GewJXhuK/P
tW8mazHzUuJKBvmaUXDIXFh4K6FNhjH16p5jR1w3hsPE7NEZhjfVRaUYPmBqaOYR
jp8H+Sh5g4Rwbtfp6Qhu6UAKi/y6Vozs5GkJtSiNrjNDVrD+sGGrXA==
-----END RSA PRIVATE KEY-----
-----BEGIN CERTIFICATE-----
MIICqDCCAZACFBdaMnuT0pWhmrh05UT3HXJ+kI0yMA0GCSqGSIb3DQEBCwUAMA0x
CzAJBgNVBAMMAmNhMB4XDTIxMDQwNjE3MDQxNVoXDTIyMDQwNjE3MDQxNVowFDES
MBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKC
AQEAtz2fpa8hyUff8u8jYlh20HbkOO8hQi64Ke2Prack2Br0lhOr1MI6I8nVk5iD
rt+7ix2Cnt+2aZKb6HJv0CG1V25yWg+jgsXeIT1KHTJf8rTmYxhbt+ye+S1Z0h/R
t+xqSd9XXfzOLPGHYfyx6ZQ4AumO/HoEFD4IH/qiREjwtOfRXuhzCohqtUTyYR7p
JmZqBSuGac461WVRisnjfKRxeVa3itc84/RgktgYej2x4PQBFk13xAXKrWmHkwdg
WklTuuK8Gtoqz65Y4/J9CSl+Bd08QDdRnaVvq1u1eNTZg1BVyeRvjFYBMSathKAS
rng5nK66Fdilw6tO/9khaP0SDQIDAQABMA0GCSqGSIb3DQEBCwUAA4IBAQAct2If
isMLHIqyL9GjY4b0xcxF4svFU/DUwNanStmoFMW1ifPf1cCqeMzyQOxBCDdMs0RT
hBbDYHW0BMXDqYIr3Ktbu38/3iVyr3pb56YOCKy8yHXpmKEaUBhCknSLcQyvNfeS
tM+DWsKFTZfyR5px+WwXbGKVMYwLaTON+/wcv1MeKMig3CxluaCpEJVYYwAiUc4K
sgvQNAunwGmPLPoXtUnpR2ZWiQA5R6yjS1oIe+8vpryFP6kjhWs0HR0jZEtLulV5
WXUuxkqTXiBIvYpsmusoR44e9rptwLbV1wL/LUScRt9ttqFM3N5/Pof+2UwkSjGB
GAyPmw0Pkqtt+lva
-----END CERTIFICATE-----

View File

@ -0,0 +1,5 @@
net:
ssl:
mode: requireSSL
PEMKeyFile: /mongo/mongo_cert.pem
allowConnectionsWithoutCertificates: true

View File

@ -8,5 +8,9 @@
<count>10</count>
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
<library_bridge_log>/var/log/clickhouse-server/clickhouse-library-bridge.log</library_bridge_log>
<library_bridge_errlog>/var/log/clickhouse-server/clickhouse-library-bridge.err.log</library_bridge_errlog>
<library_bridge_level>trace</library_bridge_level>
</logger>
</yandex>

View File

@ -2,14 +2,30 @@ import os
import os.path as p
import pytest
import time
import logging
from helpers.cluster import ClickHouseCluster, run_and_check
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance',
dictionaries=['configs/dictionaries/dict1.xml'],
main_configs=['configs/config.d/config.xml'])
dictionaries=['configs/dictionaries/dict1.xml'], main_configs=['configs/config.d/config.xml'], stay_alive=True)
def create_dict_simple():
instance.query('DROP DICTIONARY IF EXISTS lib_dict_c')
instance.query('''
CREATE DICTIONARY lib_dict_c (key UInt64, value1 UInt64, value2 UInt64, value3 UInt64)
PRIMARY KEY key SOURCE(library(PATH '/etc/clickhouse-server/config.d/dictionaries_lib/dict_lib.so'))
LAYOUT(CACHE(
SIZE_IN_CELLS 10000000
BLOCK_SIZE 4096
FILE_SIZE 16777216
READ_BUFFER_SIZE 1048576
MAX_STORED_KEYS 1048576))
LIFETIME(2);
''')
@pytest.fixture(scope="module")
def ch_cluster():
@ -98,6 +114,10 @@ def test_load_ids(ch_cluster):
result = instance.query('''select dictGet(lib_dict_c, 'value1', toUInt64(0));''')
assert(result.strip() == '100')
# Just check the bridge is OK with a large vector of random ids
instance.query('''select number, dictGet(lib_dict_c, 'value1', toUInt64(rand())) from numbers(1000);''')
result = instance.query('''select dictGet(lib_dict_c, 'value1', toUInt64(1));''')
assert(result.strip() == '101')
instance.query('DROP DICTIONARY lib_dict_c')
@ -160,6 +180,91 @@ def test_null_values(ch_cluster):
assert(result == expected)
def test_recover_after_bridge_crash(ch_cluster):
if instance.is_built_with_memory_sanitizer():
pytest.skip("Memory Sanitizer cannot work with third-party shared libraries")
create_dict_simple()
result = instance.query('''select dictGet(lib_dict_c, 'value1', toUInt64(0));''')
assert(result.strip() == '100')
result = instance.query('''select dictGet(lib_dict_c, 'value1', toUInt64(1));''')
assert(result.strip() == '101')
instance.exec_in_container(['bash', '-c', 'kill -9 `pidof clickhouse-library-bridge`'], user='root')
instance.query('SYSTEM RELOAD DICTIONARY lib_dict_c')
result = instance.query('''select dictGet(lib_dict_c, 'value1', toUInt64(0));''')
assert(result.strip() == '100')
result = instance.query('''select dictGet(lib_dict_c, 'value1', toUInt64(1));''')
assert(result.strip() == '101')
instance.exec_in_container(['bash', '-c', 'kill -9 `pidof clickhouse-library-bridge`'], user='root')
instance.query('DROP DICTIONARY lib_dict_c')
def test_server_restart_bridge_might_be_still_alive(ch_cluster):
if instance.is_built_with_memory_sanitizer():
pytest.skip("Memory Sanitizer cannot work with third-party shared libraries")
create_dict_simple()
result = instance.query('''select dictGet(lib_dict_c, 'value1', toUInt64(1));''')
assert(result.strip() == '101')
instance.restart_clickhouse()
result = instance.query('''select dictGet(lib_dict_c, 'value1', toUInt64(1));''')
assert(result.strip() == '101')
instance.exec_in_container(['bash', '-c', 'kill -9 `pidof clickhouse-library-bridge`'], user='root')
instance.restart_clickhouse()
result = instance.query('''select dictGet(lib_dict_c, 'value1', toUInt64(1));''')
assert(result.strip() == '101')
instance.query('DROP DICTIONARY lib_dict_c')
def test_bridge_dies_with_parent(ch_cluster):
if instance.is_built_with_memory_sanitizer():
pytest.skip("Memory Sanitizer cannot work with third-party shared libraries")
if instance.is_built_with_address_sanitizer():
pytest.skip("Leak sanitizer falsely reports about a leak of 16 bytes in clickhouse-odbc-bridge")
create_dict_simple()
result = instance.query('''select dictGet(lib_dict_c, 'value1', toUInt64(1));''')
assert(result.strip() == '101')
clickhouse_pid = instance.get_process_pid("clickhouse server")
bridge_pid = instance.get_process_pid("library-bridge")
assert clickhouse_pid is not None
assert bridge_pid is not None
while clickhouse_pid is not None:
try:
instance.exec_in_container(["kill", str(clickhouse_pid)], privileged=True, user='root')
except:
pass
clickhouse_pid = instance.get_process_pid("clickhouse server")
time.sleep(1)
for i in range(30):
time.sleep(1)
bridge_pid = instance.get_process_pid("library-bridge")
if bridge_pid is None:
break
if bridge_pid:
out = instance.exec_in_container(["gdb", "-p", str(bridge_pid), "--ex", "thread apply all bt", "--ex", "q"],
privileged=True, user='root')
logging.debug(f"Bridge is running, gdb output:\n{out}")
assert clickhouse_pid is None
assert bridge_pid is None
instance.start_clickhouse(20)
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")

View File

@ -7,8 +7,10 @@ import xml.etree.ElementTree as ET
import helpers.client
import helpers.cluster
from helpers.test_tools import TSV
import pytest
cluster = helpers.cluster.ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1',
@ -76,6 +78,37 @@ def add_disk(node, name, path, separate_file=False):
else:
tree.write(os.path.join(node.config_d_dir, "storage_configuration.xml"))
def update_disk(node, name, path, keep_free_space_bytes, separate_file=False):
separate_configuration_path = os.path.join(node.config_d_dir,
"separate_configuration.xml")
try:
if separate_file:
tree = ET.parse(separate_configuration_path)
else:
tree = ET.parse(
os.path.join(node.config_d_dir, "storage_configuration.xml"))
except:
tree = ET.ElementTree(
ET.fromstring('<yandex><storage_configuration><disks/><policies/></storage_configuration></yandex>'))
root = tree.getroot()
disk = root.find("storage_configuration").find("disks").find(name)
assert disk is not None
new_path = disk.find("path")
assert new_path is not None
new_path.text = path
new_keep_free_space_bytes = disk.find("keep_free_space_bytes")
assert new_keep_free_space_bytes is not None
new_keep_free_space_bytes.text = keep_free_space_bytes
if separate_file:
tree.write(separate_configuration_path)
else:
tree.write(os.path.join(node.config_d_dir, "storage_configuration.xml"))
def add_policy(node, name, volumes):
tree = ET.parse(os.path.join(node.config_d_dir, "storage_configuration.xml"))
@ -123,6 +156,36 @@ def test_add_disk(started_cluster):
except:
""""""
def test_update_disk(started_cluster):
try:
name = "test_update_disk"
engine = "MergeTree()"
start_over()
node1.restart_clickhouse(kill=True)
time.sleep(2)
node1.query("""
CREATE TABLE {name} (
d UInt64
) ENGINE = {engine}
ORDER BY d
SETTINGS storage_policy='jbods_with_external'
""".format(name=name, engine=engine))
assert node1.query("SELECT path, keep_free_space FROM system.disks where name = 'jbod2'") == TSV([
["/jbod2/", "10485760"]])
update_disk(node1, "jbod2", "/jbod2/", "20971520")
node1.query("SYSTEM RELOAD CONFIG")
assert node1.query("SELECT path, keep_free_space FROM system.disks where name = 'jbod2'") == TSV([
["/jbod2/", "20971520"]])
finally:
try:
node1.query("DROP TABLE IF EXISTS {}".format(name))
except:
""""""
def test_add_disk_to_separate_config(started_cluster):
try:

View File

@ -0,0 +1,8 @@
<yandex>
<openSSL>
<client>
<!-- For self-signed certificate -->
<verificationMode>none</verificationMode>
</client>
</openSSL>
</yandex>

View File

@ -5,24 +5,29 @@ from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance('node', with_mongo=True)
@pytest.fixture(scope="module")
def started_cluster():
def started_cluster(request):
try:
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance('node',
main_configs=["configs_secure/config.d/ssl_conf.xml"],
with_mongo=True,
with_mongo_secure=request.param)
cluster.start()
yield cluster
finally:
cluster.shutdown()
def get_mongo_connection(started_cluster):
def get_mongo_connection(started_cluster, secure=False):
connection_str = 'mongodb://root:clickhouse@localhost:{}'.format(started_cluster.mongo_port)
if secure:
connection_str += '/?tls=true&tlsAllowInvalidCertificates=true'
return pymongo.MongoClient(connection_str)
@pytest.mark.parametrize('started_cluster', [False], indirect=['started_cluster'])
def test_simple_select(started_cluster):
mongo_connection = get_mongo_connection(started_cluster)
db = mongo_connection['test']
@ -33,6 +38,7 @@ def test_simple_select(started_cluster):
data.append({'key': i, 'data': hex(i * i)})
simple_mongo_table.insert_many(data)
node = started_cluster.instances['node']
node.query(
"CREATE TABLE simple_mongo_table(key UInt64, data String) ENGINE = MongoDB('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse')")
@ -42,6 +48,7 @@ def test_simple_select(started_cluster):
assert node.query("SELECT data from simple_mongo_table where key = 42") == hex(42 * 42) + '\n'
@pytest.mark.parametrize('started_cluster', [False], indirect=['started_cluster'])
def test_complex_data_type(started_cluster):
mongo_connection = get_mongo_connection(started_cluster)
db = mongo_connection['test']
@ -52,6 +59,7 @@ def test_complex_data_type(started_cluster):
data.append({'key': i, 'data': hex(i * i), 'dict': {'a': i, 'b': str(i)}})
incomplete_mongo_table.insert_many(data)
node = started_cluster.instances['node']
node.query(
"CREATE TABLE incomplete_mongo_table(key UInt64, data String) ENGINE = MongoDB('mongo1:27017', 'test', 'complex_table', 'root', 'clickhouse')")
@ -61,6 +69,7 @@ def test_complex_data_type(started_cluster):
assert node.query("SELECT data from incomplete_mongo_table where key = 42") == hex(42 * 42) + '\n'
@pytest.mark.parametrize('started_cluster', [False], indirect=['started_cluster'])
def test_incorrect_data_type(started_cluster):
mongo_connection = get_mongo_connection(started_cluster)
db = mongo_connection['test']
@ -71,6 +80,7 @@ def test_incorrect_data_type(started_cluster):
data.append({'key': i, 'data': hex(i * i), 'aaaa': 'Hello'})
strange_mongo_table.insert_many(data)
node = started_cluster.instances['node']
node.query(
"CREATE TABLE strange_mongo_table(key String, data String) ENGINE = MongoDB('mongo1:27017', 'test', 'strange_table', 'root', 'clickhouse')")
@ -85,3 +95,24 @@ def test_incorrect_data_type(started_cluster):
with pytest.raises(QueryRuntimeException):
node.query("SELECT bbbb FROM strange_mongo_table2")
@pytest.mark.parametrize('started_cluster', [True], indirect=['started_cluster'])
def test_secure_connection(started_cluster):
mongo_connection = get_mongo_connection(started_cluster, secure=True)
db = mongo_connection['test']
db.add_user('root', 'clickhouse')
simple_mongo_table = db['simple_table']
data = []
for i in range(0, 100):
data.append({'key': i, 'data': hex(i * i)})
simple_mongo_table.insert_many(data)
node = started_cluster.instances['node']
node.query(
"CREATE TABLE simple_mongo_table(key UInt64, data String) ENGINE = MongoDB('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse', 'ssl=true')")
assert node.query("SELECT COUNT() FROM simple_mongo_table") == '100\n'
assert node.query("SELECT sum(key) FROM simple_mongo_table") == str(sum(range(0, 100))) + '\n'
assert node.query("SELECT data from simple_mongo_table where key = 42") == hex(42 * 42) + '\n'

View File

@ -2,4 +2,5 @@
<query>SELECT count() FROM numbers(1000000000) WHERE materialize(now()) > toString(toDateTime('2020-09-30 00:00:00'))</query>
<query>SELECT count() FROM numbers(1000000000) WHERE materialize(now()) > toUInt32(toDateTime('2020-09-30 00:00:00'))</query>
<query>SELECT count() FROM numbers(1000000000) WHERE materialize(now()) > toDateTime('2020-09-30 00:00:00')</query>
<query>SELECT count() FROM numbers(1000000000) WHERE materialize(now()) > toDate('2020-09-30 00:00:00')</query>
</test>

View File

@ -48,3 +48,4 @@
6 6 60 60 66 66 120
12 12 120 120 132 132 240
18 18 180 180 198 198 360
1

View File

@ -2,6 +2,7 @@ DROP TABLE IF EXISTS table1;
DROP TABLE IF EXISTS table2;
DROP TABLE IF EXISTS table3;
DROP TABLE IF EXISTS table5;
DROP TABLE IF EXISTS table_set;
CREATE TABLE table1 (a UInt32) ENGINE = Memory;
CREATE TABLE table2 (a UInt32, b UInt32) ENGINE = Memory;
@ -76,6 +77,18 @@ join table3 as t3 on t2_b = t3_b;
--join table2 as t2 on t1_t2_x = t2.a
--join table3 as t3 on t1_t3_x = t2_t3_x;
CREATE TABLE table_set ( x UInt32 ) ENGINE = Set;
INSERT INTO table_set VALUES (0), (1), (2);
select count()
from table1 as t1
join table2 as t2 on t1.a = t2.a
join table3 as t3 on t2.b = t3.b
join table5 as t5 on t3.c = t5.c
WHERE t1.a in table_set;
DROP TABLE table_set;
DROP TABLE table1;
DROP TABLE table2;
DROP TABLE table3;

View File

@ -15,3 +15,20 @@ multiple sets INNER JOIN empty set AND IN non-empty set 0
multiple sets INNER JOIN non-empty set AND IN non-empty set 1
IN empty set equals 0 10
IN empty set sum if 10
IN empty set 0
IN non-empty set 1
NOT IN empty set 10
INNER JOIN empty set 0
INNER JOIN non-empty set 1
RIGHT JOIN empty set 0
RIGHT JOIN non-empty set 1
LEFT JOIN empty set 10
LEFT JOIN non-empty set 10
multiple sets IN empty set OR IN non-empty set 1
multiple sets IN empty set OR NOT IN non-empty set 9
multiple sets NOT IN empty set AND IN non-empty set 1
multiple sets INNER JOIN empty set AND IN empty set 0
multiple sets INNER JOIN empty set AND IN non-empty set 0
multiple sets INNER JOIN non-empty set AND IN non-empty set 1
IN empty set equals 0 10
IN empty set sum if 10

View File

@ -1,5 +1,9 @@
SET joined_subquery_requires_alias = 0;
{% for join_algorithm in ['partial_merge', 'hash'] -%}
SET join_algorithm = '{{ join_algorithm }}';
SELECT 'IN empty set',count() FROM system.numbers WHERE number IN (SELECT toUInt64(1) WHERE 0);
SELECT 'IN non-empty set',count() FROM (SELECT number FROM system.numbers LIMIT 10) t1 WHERE t1.number IN (SELECT toUInt64(1) WHERE 1);
SELECT 'NOT IN empty set',count() FROM (SELECT number FROM system.numbers LIMIT 10) WHERE number NOT IN (SELECT toUInt64(1) WHERE 0);
@ -22,3 +26,5 @@ SELECT 'multiple sets INNER JOIN non-empty set AND IN non-empty set',count() FRO
SELECT 'IN empty set equals 0', count() FROM numbers(10) WHERE (number IN (SELECT toUInt64(1) WHERE 0)) = 0;
SELECT 'IN empty set sum if', sum(if(number IN (SELECT toUInt64(1) WHERE 0), 2, 1)) FROM numbers(10);
{% endfor -%}

View File

@ -75,8 +75,8 @@ $CLICKHOUSE_CLIENT --query="DROP TABLE dst;"
$CLICKHOUSE_CLIENT --query="SELECT 'MOVE incompatible schema different order by';"
$CLICKHOUSE_CLIENT --query="CREATE TABLE src (p UInt64, k String, d UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/$CURR_DATABASE/src3', '1') PARTITION BY p ORDER BY (p, k, d);"
$CLICKHOUSE_CLIENT --query="CREATE TABLE dst (p UInt64, k String, d UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/$CURR_DATABASE/dst3', '1') PARTITION BY p ORDER BY (d, k, p);"
$CLICKHOUSE_CLIENT --query="CREATE TABLE src (p UInt64, k String, d UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/src3', '1') PARTITION BY p ORDER BY (p, k, d);"
$CLICKHOUSE_CLIENT --query="CREATE TABLE dst (p UInt64, k String, d UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/dst3', '1') PARTITION BY p ORDER BY (d, k, p);"
$CLICKHOUSE_CLIENT --query="INSERT INTO src VALUES (0, '0', 1);"

View File

@ -0,0 +1,3 @@
create user if not exists u_02001 default database system;
select default_database from system.users where name = 'u_02001';
drop user if exists u_02001;

View File

@ -0,0 +1,2 @@
localhost
localhost 2

View File

@ -0,0 +1,2 @@
select hostname();
select hostName() h, count() from cluster(test_cluster_two_shards, system.one) group by h;

View File

@ -0,0 +1,5 @@
SELECT
cityHash64(number GLOBAL IN (NULL, -2147483648, -9223372036854775808), nan, 1024, NULL, NULL, 1.000100016593933, NULL),
(NULL, cityHash64(inf, -2147483648, NULL, NULL, 10.000100135803223), cityHash64(1.1754943508222875e-38, NULL, NULL, NULL), 2147483647)
FROM cluster(test_cluster_two_shards_localhost, numbers((NULL, cityHash64(0., 65536, NULL, NULL, 10000000000., NULL), 0) GLOBAL IN (some_identifier), 65536))
WHERE number GLOBAL IN [1025] --{serverError 284}

View File

@ -526,6 +526,7 @@
"01902_table_function_merge_db_repr",
"01946_test_zstd_decompression_with_escape_sequence_at_the_end_of_buffer",
"01946_test_wrong_host_name_access",
"01213_alter_rename_with_default_zookeeper" /// Warning: Removing leftovers from table.
"01213_alter_rename_with_default_zookeeper", /// Warning: Removing leftovers from table.
"02001_add_default_database_to_system_users" ///create user
]
}