Merge branch 'master' into run_func_tests_in_parallel

This commit is contained in:
alesapin 2020-10-09 10:25:35 +03:00
commit 2b66180c70
114 changed files with 1506 additions and 576 deletions

2
.gitmodules vendored
View File

@ -158,7 +158,7 @@
url = https://github.com/openldap/openldap.git
[submodule "contrib/AMQP-CPP"]
path = contrib/AMQP-CPP
url = https://github.com/CopernicaMarketingSoftware/AMQP-CPP.git
url = https://github.com/ClickHouse-Extras/AMQP-CPP.git
[submodule "contrib/cassandra"]
path = contrib/cassandra
url = https://github.com/ClickHouse-Extras/cpp-driver.git

View File

@ -300,6 +300,11 @@ if (COMPILER_CLANG)
option(ENABLE_THINLTO "Clang-specific link time optimization" ON)
endif()
# Set new experimental pass manager, it's a performance, build time and binary size win.
# Can be removed after https://reviews.llvm.org/D66490 merged and released to at least two versions of clang.
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fexperimental-new-pass-manager")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fexperimental-new-pass-manager")
# We cannot afford to use LTO when compiling unit tests, and it's not enough
# to only supply -fno-lto at the final linking stage. So we disable it
# completely.

View File

@ -21,8 +21,8 @@ void Pool::Entry::incrementRefCount()
{
if (!data)
return;
++data->ref_count;
if (data->ref_count == 1)
/// First reference, initialize thread
if (data->ref_count.fetch_add(1) == 0)
mysql_thread_init();
}
@ -30,12 +30,10 @@ void Pool::Entry::decrementRefCount()
{
if (!data)
return;
if (data->ref_count > 0)
{
--data->ref_count;
if (data->ref_count == 0)
mysql_thread_end();
}
/// We were the last user of this thread, deinitialize it
if (data->ref_count.fetch_sub(1) == 1)
mysql_thread_end();
}

View File

@ -3,6 +3,7 @@
#include <list>
#include <memory>
#include <mutex>
#include <atomic>
#include <Poco/Exception.h>
#include <mysqlxx/Connection.h>
@ -35,7 +36,9 @@ protected:
struct Connection
{
mysqlxx::Connection conn;
int ref_count = 0;
/// Ref count modified in constructor/descructor of Entry
/// but also read in pool code.
std::atomic<int> ref_count = 0;
};
public:

View File

@ -31,6 +31,7 @@ if (COMPILER_CLANG)
add_warning(pedantic)
no_warning(vla-extension)
no_warning(zero-length-array)
no_warning(c11-extensions)
add_warning(comma)
add_warning(conditional-uninitialized)

2
contrib/AMQP-CPP vendored

@ -1 +1 @@
Subproject commit 1c08399ab0ab9e4042ef8e2bbe9e208e5dcbc13b
Subproject commit d63e1f016582e9faaaf279aa24513087a07bc6e7

View File

@ -16,6 +16,7 @@ set (SRCS
${LIBRARY_DIR}/src/flags.cpp
${LIBRARY_DIR}/src/linux_tcp/openssl.cpp
${LIBRARY_DIR}/src/linux_tcp/tcpconnection.cpp
${LIBRARY_DIR}/src/inbuffer.cpp
${LIBRARY_DIR}/src/receivedframe.cpp
${LIBRARY_DIR}/src/table.cpp
${LIBRARY_DIR}/src/watchable.cpp

2
contrib/libhdfs3 vendored

@ -1 +1 @@
Subproject commit 1b666578c85094306b061352078022f6350bfab8
Subproject commit 24b058c356794ef6cc2d31323dc9adf0386652ff

@ -1 +1 @@
Subproject commit 3f512fedf0ba0f769a1b4852b4bac542d92c5b20
Subproject commit f5638e954a79f50bac7c7a5deaa5a241e0ce8b5f

View File

@ -11,7 +11,7 @@ RUN apt-get update \
&& echo "${LLVM_PUBKEY_HASH} /tmp/llvm-snapshot.gpg.key" | sha384sum -c \
&& apt-key add /tmp/llvm-snapshot.gpg.key \
&& export CODENAME="$(lsb_release --codename --short | tr 'A-Z' 'a-z')" \
&& echo "deb [trusted=yes] http://apt.llvm.org/${CODENAME}/ llvm-toolchain-${CODENAME}-${LLVM_VERSION} main" >> \
&& echo "deb [trusted=yes] https://apt.llvm.org/${CODENAME}/ llvm-toolchain-${CODENAME}-${LLVM_VERSION} main" >> \
/etc/apt/sources.list
# initial packages

View File

@ -20,6 +20,7 @@ toc_title: Client Libraries
- [simpod/clickhouse-client](https://packagist.org/packages/simpod/clickhouse-client)
- [seva-code/php-click-house-client](https://packagist.org/packages/seva-code/php-click-house-client)
- [SeasClick C++ client](https://github.com/SeasX/SeasClick)
- [one-ck](https://github.com/lizhichao/one-ck)
- Go
- [clickhouse](https://github.com/kshvakov/clickhouse/)
- [go-clickhouse](https://github.com/roistat/go-clickhouse)

View File

@ -70,6 +70,35 @@ Works with tables in the MergeTree family.
If `force_primary_key=1`, ClickHouse checks to see if the query has a primary key condition that can be used for restricting data ranges. If there is no suitable condition, it throws an exception. However, it does not check whether the condition reduces the amount of data to read. For more information about data ranges in MergeTree tables, see [MergeTree](../../engines/table-engines/mergetree-family/mergetree.md).
## force\_data\_skipping\_indices {#settings-force_data_skipping_indices}
Disables query execution if passed data skipping indices wasn't used.
Consider the following example:
```sql
CREATE TABLE data
(
key Int,
d1 Int,
d1_null Nullable(Int),
INDEX d1_idx d1 TYPE minmax GRANULARITY 1,
INDEX d1_null_idx assumeNotNull(d1_null) TYPE minmax GRANULARITY 1
)
Engine=MergeTree()
ORDER BY key;
SELECT * FROM data_01515;
SELECT * FROM data_01515 SETTINGS force_data_skipping_indices=''; -- query will produce CANNOT_PARSE_TEXT error.
SELECT * FROM data_01515 SETTINGS force_data_skipping_indices='d1_idx'; -- query will produce INDEX_NOT_USED error.
SELECT * FROM data_01515 WHERE d1 = 0 SETTINGS force_data_skipping_indices='d1_idx'; -- Ok.
SELECT * FROM data_01515 WHERE d1 = 0 SETTINGS force_data_skipping_indices='`d1_idx`'; -- Ok (example of full featured parser).
SELECT * FROM data_01515 WHERE d1 = 0 SETTINGS force_data_skipping_indices='`d1_idx`, d1_null_idx'; -- query will produce INDEX_NOT_USED error, since d1_null_idx is not used.
SELECT * FROM data_01515 WHERE d1 = 0 AND assumeNotNull(d1_null) = 0 SETTINGS force_data_skipping_indices='`d1_idx`, d1_null_idx'; -- Ok.
```
Works with tables in the MergeTree family.
## format\_schema {#format-schema}
This parameter is useful when you are using formats that require a schema definition, such as [Capn Proto](https://capnproto.org/) or [Protobuf](https://developers.google.com/protocol-buffers/). The value depends on the format.
@ -1565,7 +1594,7 @@ See also:
## allow\_introspection\_functions {#settings-allow_introspection_functions}
Enables of disables [introspections functions](../../sql-reference/functions/introspection.md) for query profiling.
Enables or disables [introspections functions](../../sql-reference/functions/introspection.md) for query profiling.
Possible values:
@ -2027,3 +2056,14 @@ Result:
```
[Original article](https://clickhouse.tech/docs/en/operations/settings/settings/) <!-- hide -->
## allow_experimental_bigint_types {#allow_experimental_bigint_types}
Enables or disables integer values exceeding the range that is supported by the int data type.
Possible values:
- 1 — The bigint data type is enabled.
- 0 — The bigint data type is disabled.
Default value: `0`.

View File

@ -3,7 +3,7 @@ toc_priority: 42
toc_title: Decimal
---
# Decimal(P, S), Decimal32(S), Decimal64(S), Decimal128(S), Decimal256(S) {#decimalp-s-decimal32s-decimal64s-decimal128s}
# Decimal(P, S), Decimal32(S), Decimal64(S), Decimal128(S), Decimal256(S) {#decimalp-s-decimal32s-decimal64s-decimal128s-decimal256s}
Signed fixed-point numbers that keep precision during add, subtract and multiply operations. For division least significant digits are discarded (not rounded).
@ -107,4 +107,8 @@ SELECT toDecimal32(1, 8) < 100
DB::Exception: Can't compare.
```
**See also**
- [isDecimalOverflow](../../sql-reference/functions/other-functions.md#is-decimal-overflow)
- [countDigits](../../sql-reference/functions/other-functions.md#count-digits)
[Original article](https://clickhouse.tech/docs/en/data_types/decimal/) <!--hide-->

View File

@ -3,7 +3,7 @@ toc_priority: 40
toc_title: UInt8, UInt16, UInt32, UInt64, UInt256, Int8, Int16, Int32, Int64, Int128, Int256
---
# UInt8, UInt16, UInt32, UInt64, UInt256, Int8, Int16, Int32, Int64, Int128, Int256 {#uint8-uint16-uint32-uint64-int8-int16-int32-int64}
# UInt8, UInt16, UInt32, UInt64, UInt256, Int8, Int16, Int32, Int64, Int128, Int256 {#uint8-uint16-uint32-uint64-uint256-int8-int16-int32-int64-int128-int256}
Fixed-length integers, with or without a sign.

View File

@ -1526,5 +1526,80 @@ SELECT getSetting('custom_a');
- [Custom Settings](../../operations/settings/index.md#custom_settings)
## isDecimalOverflow {#is-decimal-overflow}
Checks whether the [Decimal](../../sql-reference/data-types/decimal.md#decimalp-s-decimal32s-decimal64s-decimal128s) value is out of its (or specified) precision.
**Syntax**
``` sql
isDecimalOverflow(d, [p])
```
**Parameters**
- `d` — value. [Decimal](../../sql-reference/data-types/decimal.md#decimalp-s-decimal32s-decimal64s-decimal128s).
- `p` — precision. Optional. If omitted, the initial presicion of the first argument is used. Using of this paratemer could be helpful for data extraction to another DBMS or file. [UInt8](../../sql-reference/data-types/int-uint.md#uint-ranges).
**Returned values**
- `1` — Decimal value has more digits then it's precision allow,
- `0` — Decimal value satisfies the specified precision.
**Example**
Query:
``` sql
SELECT isDecimalOverflow(toDecimal32(1000000000, 0), 9),
isDecimalOverflow(toDecimal32(1000000000, 0)),
isDecimalOverflow(toDecimal32(-1000000000, 0), 9),
isDecimalOverflow(toDecimal32(-1000000000, 0));
```
Result:
``` text
1 1 1 1
```
## countDigits {#count-digits}
Returns number of decimal digits you need to represent the value.
**Syntax**
``` sql
countDigits(x)
```
**Parameters**
- `x` — [Int](../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64) or [Decimal](../../sql-reference/data-types/decimal.md#decimalp-s-decimal32s-decimal64s-decimal128s) value.
**Returned value**
Number of digits.
Type: [UInt8](../../sql-reference/data-types/int-uint.md#uint-ranges).
!!! note "Note"
For `Decimal` values takes into account their scales: calculates result over underlying integer type which is `(value * scale)`. For example: `countDigits(42) = 2`, `countDigits(42.000) = 5`, `countDigits(0.04200) = 4`. I.e. you may check decimal overflow for `Decimal64` with `countDecimal(x) > 18`. It's a slow variant of [isDecimalOverflow](#is-decimal-overflow).
**Example**
Query:
``` sql
SELECT countDigits(toDecimal32(1, 9)), countDigits(toDecimal32(-1, 9)),
countDigits(toDecimal64(1, 18)), countDigits(toDecimal64(-1, 18)),
countDigits(toDecimal128(1, 38)), countDigits(toDecimal128(-1, 38));
```
Result:
``` text
10 10 19 19 39 39
```
[Original article](https://clickhouse.tech/docs/en/query_language/functions/other_functions/) <!--hide-->

View File

@ -360,6 +360,89 @@ Extracts a fragment of a string using a regular expression. If haystack do
Extracts all the fragments of a string using a regular expression. If haystack doesnt match the pattern regex, an empty string is returned. Returns an array of strings consisting of all matches to the regex. In general, the behavior is the same as the extract function (it takes the first subpattern, or the entire expression if there isnt a subpattern).
## extractAllGroupsHorizontal {#extractallgroups-horizontal}
Matches all groups of the `haystack` string using the `pattern` regular expression. Returns an array of arrays, where the first array includes all fragments matching the first group, the second array - matching the second group, etc.
!!! note "Note"
`extractAllGroupsHorizontal` function is slower than [extractAllGroupsVertical](#extractallgroups-vertical).
**Syntax**
``` sql
extractAllGroupsHorizontal(haystack, pattern)
```
**Parameters**
- `haystack` — Input string. Type: [String](../../sql-reference/data-types/string.md).
- `pattern` — Regular expression with [re2 syntax](https://github.com/google/re2/wiki/Syntax). Must contain groups, each group enclosed in parentheses. If `pattern` contains no groups, an exception is thrown. Type: [String](../../sql-reference/data-types/string.md).
**Returned value**
- Type: [Array](../../sql-reference/data-types/array.md).
If `haystack` doesnt match the `pattern` regex, an array of empty arrays is returned.
**Example**
Query:
``` sql
SELECT extractAllGroupsHorizontal('abc=111, def=222, ghi=333', '("[^"]+"|\\w+)=("[^"]+"|\\w+)')
```
Result:
``` text
┌─extractAllGroupsHorizontal('abc=111, def=222, ghi=333', '("[^"]+"|\\w+)=("[^"]+"|\\w+)')─┐
│ [['abc','def','ghi'],['111','222','333']] │
└──────────────────────────────────────────────────────────────────────────────────────────┘
```
**See also**
- [extractAllGroupsVertical](#extractallgroups-vertical)
## extractAllGroupsVertical {#extractallgroups-vertical}
Matches all groups of the `haystack` string using the `pattern` regular expression. Returns an array of arrays, where each array includes matching fragments from every group. Fragments are grouped in order of appearance in the `haystack`.
**Syntax**
``` sql
extractAllGroupsVertical(haystack, pattern)
```
**Parameters**
- `haystack` — Input string. Type: [String](../../sql-reference/data-types/string.md).
- `pattern` — Regular expression with [re2 syntax](https://github.com/google/re2/wiki/Syntax). Must contain groups, each group enclosed in parentheses. If `pattern` contains no groups, an exception is thrown. Type: [String](../../sql-reference/data-types/string.md).
**Returned value**
- Type: [Array](../../sql-reference/data-types/array.md).
If `haystack` doesnt match the `pattern` regex, an empty array is returned.
**Example**
Query:
``` sql
SELECT extractAllGroupsVertical('abc=111, def=222, ghi=333', '("[^"]+"|\\w+)=("[^"]+"|\\w+)')
```
Result:
``` text
┌─extractAllGroupsVertical('abc=111, def=222, ghi=333', '("[^"]+"|\\w+)=("[^"]+"|\\w+)')─┐
│ [['abc','111'],['def','222'],['ghi','333']] │
└────────────────────────────────────────────────────────────────────────────────────────┘
```
**See also**
- [extractAllGroupsHorizontal](#extractallgroups-horizontal)
## like(haystack, pattern), haystack LIKE pattern operator {#function-like}
Checks whether a string matches a simple regular expression.

View File

@ -11,7 +11,8 @@ When you convert a value from one to another data type, you should remember that
ClickHouse has the [same behavior as C++ programs](https://en.cppreference.com/w/cpp/language/implicit_conversion).
## toInt(8\|16\|32\|64\|128\|256) {#toint8163264}
## toInt(8\|16\|32\|64\|128\|256) {#toint8163264128256}
Converts an input value to the [Int](../../sql-reference/data-types/int-uint.md) data type. This function family includes:
@ -62,7 +63,7 @@ select toInt64OrZero('123123'), toInt8OrZero('123qwe123')
└─────────────────────────┴───────────────────────────┘
```
## toInt(8\|16\|32\|64\|128\|256)OrNull {#toint8163264ornull}
## toInt(8\|16\|32\|64\|128\|256)OrNull {#toint8163264128256ornull}
It takes an argument of type String and tries to parse it into Int (8 \| 16 \| 32 \| 64 \| 128 \| 256). If failed, returns NULL.
@ -78,7 +79,7 @@ select toInt64OrNull('123123'), toInt8OrNull('123qwe123')
└─────────────────────────┴───────────────────────────┘
```
## toUInt(8\|16\|32\|64\|256) {#touint8163264}
## toUInt(8\|16\|32\|64\|256) {#touint8163264256}
Converts an input value to the [UInt](../../sql-reference/data-types/int-uint.md) data type. This function family includes:
@ -112,9 +113,9 @@ SELECT toUInt64(nan), toUInt32(-32), toUInt16('16'), toUInt8(8.8)
└─────────────────────┴───────────────┴────────────────┴──────────────┘
```
## toUInt(8\|16\|32\|64\|256)OrZero {#touint8163264orzero}
## toUInt(8\|16\|32\|64\|256)OrZero {#touint8163264256orzero}
## toUInt(8\|16\|32\|64\|256)OrNull {#touint8163264ornull}
## toUInt(8\|16\|32\|64\|256)OrNull {#touint8163264256ornull}
## toFloat(32\|64) {#tofloat3264}
@ -134,7 +135,7 @@ SELECT toUInt64(nan), toUInt32(-32), toUInt16('16'), toUInt8(8.8)
## toDateTimeOrNull {#todatetimeornull}
## toDecimal(32\|64\|128\|256) {#todecimal3264128}
## toDecimal(32\|64\|128\|256) {#todecimal3264128256}
Converts `value` to the [Decimal](../../sql-reference/data-types/decimal.md) data type with precision of `S`. The `value` can be a number or a string. The `S` (scale) parameter specifies the number of decimal places.
@ -143,7 +144,7 @@ Converts `value` to the [Decimal](../../sql-reference/data-types/decimal.md) dat
- `toDecimal128(value, S)`
- `toDecimal256(value, S)`
## toDecimal(32\|64\|128\|256)OrNull {#todecimal3264128ornull}
## toDecimal(32\|64\|128\|256)OrNull {#todecimal3264128256ornull}
Converts an input string to a [Nullable(Decimal(P,S))](../../sql-reference/data-types/decimal.md) data type value. This family of functions include:
@ -188,7 +189,9 @@ SELECT toDecimal32OrNull(toString(-1.111), 2) AS val, toTypeName(val)
└──────┴────────────────────────────────────────────────────┘
```
## toDecimal(32\|64\|128\|256)OrZero {#todecimal3264128orzero}
## toDecimal(32\|64\|128\'|256)OrZero {#todecimal3264128256orzero}
Converts an input value to the [Decimal(P,S)](../../sql-reference/data-types/decimal.md) data type. This family of functions include:

View File

@ -102,4 +102,9 @@ SELECT toDecimal32(1, 8) < 100
DB::Exception: Can't compare.
```
**Смотрите также**
- [isDecimalOverflow](../../sql-reference/functions/other-functions.md#is-decimal-overflow)
- [countDigits](../../sql-reference/functions/other-functions.md#count-digits)
[Оригинальная статья](https://clickhouse.tech/docs/ru/data_types/decimal/) <!--hide-->

View File

@ -1431,5 +1431,80 @@ SELECT randomStringUTF8(13)
```
## isDecimalOverflow {#is-decimal-overflow}
Проверяет, находится ли число [Decimal](../../sql-reference/data-types/decimal.md#decimalp-s-decimal32s-decimal64s-decimal128s) вне собственной (или заданной) области значений.
**Синтаксис**
``` sql
isDecimalOverflow(d, [p])
```
**Параметры**
- `d` — число. [Decimal](../../sql-reference/data-types/decimal.md#decimalp-s-decimal32s-decimal64s-decimal128s).
- `p` — точность. Необязательный параметр. Если опущен, используется исходная точность первого аргумента. Использование этого параметра может быть полезно для извлечения данных в другую СУБД или файл. [UInt8](../../sql-reference/data-types/int-uint.md#uint-ranges).
**Возвращаемое значение**
- `1` — число имеет больше цифр, чем позволяет точность.
- `0` — число удовлетворяет заданной точности.
**Пример**
Запрос:
``` sql
SELECT isDecimalOverflow(toDecimal32(1000000000, 0), 9),
isDecimalOverflow(toDecimal32(1000000000, 0)),
isDecimalOverflow(toDecimal32(-1000000000, 0), 9),
isDecimalOverflow(toDecimal32(-1000000000, 0));
```
Результат:
``` text
1 1 1 1
```
## countDigits {#count-digits}
Возвращает количество десятичных цифр, необходимых для представления значения.
**Синтаксис**
``` sql
countDigits(x)
```
**Параметры**
- `x` — [целое](../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64) или [дробное](../../sql-reference/data-types/decimal.md#decimalp-s-decimal32s-decimal64s-decimal128s) число.
**Возвращаемое значение**
Количество цифр.
Тип: [UInt8](../../sql-reference/data-types/int-uint.md#uint-ranges).
!!! note "Примечание"
Для `Decimal` значений учитывается их масштаб: вычисляется результат по базовому целочисленному типу, полученному как `(value * scale)`. Например: `countDigits(42) = 2`, `countDigits(42.000) = 5`, `countDigits(0.04200) = 4`. То есть вы можете проверить десятичное переполнение для `Decimal64` с помощью `countDecimal(x) > 18`. Это медленный вариант [isDecimalOverflow](#is-decimal-overflow).
**Пример**
Запрос:
``` sql
SELECT countDigits(toDecimal32(1, 9)), countDigits(toDecimal32(-1, 9)),
countDigits(toDecimal64(1, 18)), countDigits(toDecimal64(-1, 18)),
countDigits(toDecimal128(1, 38)), countDigits(toDecimal128(-1, 38));
```
Результат:
``` text
10 10 19 19 39 39
```
[Оригинальная статья](https://clickhouse.tech/docs/ru/query_language/functions/other_functions/) <!--hide-->

View File

@ -341,6 +341,89 @@ Result:
Извлечение всех фрагментов строки по регулярному выражению. Если haystack не соответствует регулярному выражению pattern, то возвращается пустая строка. Возвращается массив строк, состоящий из всех соответствий регулярному выражению. В остальном, поведение аналогично функции extract (по прежнему, вынимается первый subpattern, или всё выражение, если subpattern-а нет).
## extractAllGroupsHorizontal {#extractallgroups-horizontal}
Разбирает строку `haystack` на фрагменты, соответствующие группам регулярного выражения `pattern`. Возвращает массив массивов, где первый массив содержит все фрагменты, соответствующие первой группе регулярного выражения, второй массив - соответствующие второй группе, и т.д.
!!! note "Замечание"
Функция `extractAllGroupsHorizontal` работает медленнее, чем функция [extractAllGroupsVertical](#extractallgroups-vertical).
**Синтаксис**
``` sql
extractAllGroupsHorizontal(haystack, pattern)
```
**Параметры**
- `haystack` — строка для разбора. Тип: [String](../../sql-reference/data-types/string.md).
- `pattern` — регулярное выражение, построенное по синтаксическим правилам [re2](https://github.com/google/re2/wiki/Syntax). Выражение должно содержать группы, заключенные в круглые скобки. Если выражение не содержит групп, генерируется исключение. Тип: [String](../../sql-reference/data-types/string.md).
**Возвращаемое значение**
- Тип: [Array](../../sql-reference/data-types/array.md).
Если в строке `haystack` нет групп, соответствующих регулярному выражению `pattern`, возвращается массив пустых массивов.
**Пример**
Запрос:
``` sql
SELECT extractAllGroupsHorizontal('abc=111, def=222, ghi=333', '("[^"]+"|\\w+)=("[^"]+"|\\w+)')
```
Результат:
``` text
┌─extractAllGroupsHorizontal('abc=111, def=222, ghi=333', '("[^"]+"|\\w+)=("[^"]+"|\\w+)')─┐
│ [['abc','def','ghi'],['111','222','333']] │
└──────────────────────────────────────────────────────────────────────────────────────────┘
```
**См. также**
- функция [extractAllGroupsVertical](#extractallgroups-vertical)
## extractAllGroupsVertical {#extractallgroups-vertical}
Разбирает строку `haystack` на фрагменты, соответствующие группам регулярного выражения `pattern`. Возвращает массив массивов, где каждый массив содержит по одному фрагменту, соответствующему каждой группе регулярного выражения. Фрагменты группируются в массивы в соответствии с порядком появления в исходной строке.
**Синтаксис**
``` sql
extractAllGroupsVertical(haystack, pattern)
```
**Параметры**
- `haystack` — строка для разбора. Тип: [String](../../sql-reference/data-types/string.md).
- `pattern` — регулярное выражение, построенное по синтаксическим правилам [re2](https://github.com/google/re2/wiki/Syntax). Выражение должно содержать группы, заключенные в круглые скобки. Если выражение не содержит групп, генерируется исключение. Тип: [String](../../sql-reference/data-types/string.md).
**Возвращаемое значение**
- Тип: [Array](../../sql-reference/data-types/array.md).
Если в строке `haystack` нет групп, соответствующих регулярному выражению `pattern`, возвращается пустой массив.
**Пример**
Запрос:
``` sql
SELECT extractAllGroupsVertical('abc=111, def=222, ghi=333', '("[^"]+"|\\w+)=("[^"]+"|\\w+)')
```
Результат:
``` text
┌─extractAllGroupsVertical('abc=111, def=222, ghi=333', '("[^"]+"|\\w+)=("[^"]+"|\\w+)')─┐
│ [['abc','111'],['def','222'],['ghi','333']] │
└────────────────────────────────────────────────────────────────────────────────────────┘
```
**См. также**
- функция [extractAllGroupsHorizontal](#extractallgroups-horizontal)
## like(haystack, pattern), оператор haystack LIKE pattern {#function-like}
Проверка строки на соответствие простому регулярному выражению.

View File

@ -24,14 +24,14 @@ toc_title: "\u0412\u0432\u0435\u0434\u0435\u043D\u0438\u0435"
| Функция | Описание |
|-----------------------|---------------------------------------------------------------------------------------------------------------------------------------|
| [file](file.md) | Создаёт таблицу с движком [File](../../sql-reference/table-functions/index.md). |
| [merge](merge.md) | Создаёт таблицу с движком [Merge](../../sql-reference/table-functions/index.md). |
| [file](file.md) | Создаёт таблицу с движком [File](../../engines/table-engines/special/file.md). |
| [merge](merge.md) | Создаёт таблицу с движком [Merge](../../engines/table-engines/special/merge.md). |
| [numbers](numbers.md) | Создаёт таблицу с единственным столбцом, заполненным целыми числами. |
| [remote](remote.md) | Предоставляет доступ к удалённым серверам, не создавая таблицу с движком [Distributed](../../sql-reference/table-functions/index.md). |
| [url](url.md) | Создаёт таблицу с движком [Url](../../sql-reference/table-functions/index.md). |
| [mysql](mysql.md) | Создаёт таблицу с движком [MySQL](../../sql-reference/table-functions/index.md). |
| [jdbc](jdbc.md) | Создаёт таблицу с дижком [JDBC](../../sql-reference/table-functions/index.md). |
| [odbc](odbc.md) | Создаёт таблицу с движком [ODBC](../../sql-reference/table-functions/index.md). |
| [hdfs](hdfs.md) | Создаёт таблицу с движком [HDFS](../../sql-reference/table-functions/index.md). |
| [remote](remote.md) | Предоставляет доступ к удалённым серверам, не создавая таблицу с движком [Distributed](../../engines/table-engines/special/distributed.md). |
| [url](url.md) | Создаёт таблицу с движком [Url](../../engines/table-engines/special/url.md). |
| [mysql](mysql.md) | Создаёт таблицу с движком [MySQL](../../engines/table-engines/integrations/mysql.md). |
| [jdbc](jdbc.md) | Создаёт таблицу с дижком [JDBC](../../engines/table-engines/integrations/jdbc.md). |
| [odbc](odbc.md) | Создаёт таблицу с движком [ODBC](../../engines/table-engines/integrations/odbc.md). |
| [hdfs](hdfs.md) | Создаёт таблицу с движком [HDFS](../../engines/table-engines/integrations/hdfs.md). |
[Оригинальная статья](https://clickhouse.tech/docs/ru/query_language/table_functions/) <!--hide-->

View File

@ -267,12 +267,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
registerDictionaries();
registerDisks();
#if !defined(ARCADIA_BUILD)
#if USE_OPENCL
BitonicSort::getInstance().configure();
#endif
#endif
CurrentMetrics::set(CurrentMetrics::Revision, ClickHouseRevision::getVersionRevision());
CurrentMetrics::set(CurrentMetrics::VersionInteger, ClickHouseRevision::getVersionInteger());

View File

@ -82,13 +82,6 @@ inline UInt64 getCurrentTimeNanoseconds(clockid_t clock_type = CLOCK_MONOTONIC)
return ts.tv_sec * 1000000000ULL + ts.tv_nsec;
}
inline UInt64 getCurrentTimeMicroseconds()
{
struct timeval tv;
gettimeofday(&tv, nullptr);
return (tv.tv_sec) * 1000000U + (tv.tv_usec);
}
struct RUsageCounters
{
/// In nanoseconds
@ -115,13 +108,6 @@ struct RUsageCounters
hard_page_faults = static_cast<UInt64>(rusage.ru_majflt);
}
static RUsageCounters zeros(UInt64 real_time_ = getCurrentTimeNanoseconds())
{
RUsageCounters res;
res.real_time = real_time_;
return res;
}
static RUsageCounters current(UInt64 real_time_ = getCurrentTimeNanoseconds())
{
::rusage rusage {};

View File

@ -152,12 +152,14 @@ inline typename DecimalType::NativeType getFractionalPartWithScaleMultiplier(
{
using T = typename DecimalType::NativeType;
T result = decimal.value;
/// There's UB with min integer value here. But it does not matter for Decimals cause they use not full integer ranges.
/// Anycase we make modulo before compare to make scale_multiplier > 1 unaffected.
T result = decimal.value % scale_multiplier;
if constexpr (!keep_sign)
if (result < T(0))
result = -result;
return result % scale_multiplier;
return result;
}
/** Get fractional part from decimal

View File

@ -135,6 +135,7 @@ class IColumn;
\
M(Bool, force_index_by_date, 0, "Throw an exception if there is a partition key in a table, and it is not used.", 0) \
M(Bool, force_primary_key, 0, "Throw an exception if there is primary key in a table, and it is not used.", 0) \
M(String, force_data_skipping_indices, "", "Comma separated list of strings or literals with the name of the data skipping indices that should be used during query execution, otherwise an exception will be thrown.", 0) \
\
M(Float, max_streams_to_max_threads_ratio, 1, "Allows you to use more sources than the number of threads - to more evenly distribute work across threads. It is assumed that this is a temporary solution, since it will be possible in the future to make the number of sources equal to the number of threads, but for each source to dynamically select available work for itself.", 0) \
M(Float, max_streams_multiplier_for_merge_tables, 5, "Ask more streams when reading from Merge table. Streams will be spread across tables that Merge table will use. This allows more even distribution of work across threads and especially helpful when merged tables differ in size.", 0) \
@ -389,14 +390,6 @@ class IColumn;
\
/** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
\
M(Bool, allow_experimental_low_cardinality_type, true, "Obsolete setting, does nothing. Will be removed after 2019-08-13", 0) \
M(Bool, compile, false, "Whether query compilation is enabled. Will be removed after 2020-03-13", 0) \
M(UInt64, min_count_to_compile, 0, "Obsolete setting, does nothing. Will be removed after 2020-03-13", 0) \
M(Bool, allow_experimental_multiple_joins_emulation, true, "Obsolete setting, does nothing. Will be removed after 2020-05-31", 0) \
M(Bool, allow_experimental_cross_to_join_conversion, true, "Obsolete setting, does nothing. Will be removed after 2020-05-31", 0) \
M(Bool, allow_experimental_data_skipping_indices, true, "Obsolete setting, does nothing. Will be removed after 2020-05-31", 0) \
M(Bool, merge_tree_uniform_read_distribution, true, "Obsolete setting, does nothing. Will be removed after 2020-05-20", 0) \
M(UInt64, mark_cache_min_lifetime, 0, "Obsolete setting, does nothing. Will be removed after 2020-05-31", 0) \
M(Bool, partial_merge_join, false, "Obsolete. Use join_algorithm='prefer_partial_merge' instead.", 0) \
M(UInt64, max_memory_usage_for_all_queries, 0, "Obsolete. Will be removed after 2020-10-20", 0) \
M(UInt64, multiple_joins_rewriter_version, 0, "Obsolete setting, does nothing. Will be removed after 2021-03-31", 0) \

View File

@ -194,6 +194,7 @@ DiskCacheWrapper::writeFile(const String & path, size_t buf_size, WriteMode mode
auto src_buffer = cache_disk->readFile(path, buf_size, estimated_size, aio_threshold, 0);
auto dst_buffer = DiskDecorator::writeFile(path, buf_size, mode, estimated_size, aio_threshold);
copyData(*src_buffer, *dst_buffer);
dst_buffer->finalize();
},
buf_size);
}

View File

@ -27,6 +27,7 @@ void copyFile(IDisk & from_disk, const String & from_path, IDisk & to_disk, cons
auto in = from_disk.readFile(from_path);
auto out = to_disk.writeFile(to_path);
copyData(*in, *out);
out->finalize();
}

View File

@ -1,3 +1,4 @@
#include <aws/core/client/DefaultRetryStrategy.h>
#include <IO/ReadHelpers.h>
#include <IO/S3Common.h>
#include <IO/WriteHelpers.h>
@ -123,6 +124,9 @@ void registerDiskS3(DiskFactory & factory)
if (proxy_config)
cfg.perRequestConfiguration = [proxy_config](const auto & request) { return proxy_config->getConfiguration(request); };
cfg.retryStrategy = std::make_shared<Aws::Client::DefaultRetryStrategy>(
config.getUInt(config_prefix + ".retry_attempts", 10));
auto client = S3::ClientFactory::instance().create(
cfg,
uri.is_virtual_hosted_style,

View File

@ -5,7 +5,6 @@ namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int LOGICAL_ERROR;
}
@ -18,10 +17,7 @@ struct DivideFloatingImpl
template <typename Result = ResultType>
static inline NO_SANITIZE_UNDEFINED Result apply(A a [[maybe_unused]], B b [[maybe_unused]])
{
if constexpr (is_big_int_v<A> || is_big_int_v<B>)
throw Exception("DivideFloatingImpl are not implemented for big integers", ErrorCodes::NOT_IMPLEMENTED);
else
return static_cast<Result>(a) / b;
return static_cast<Result>(a) / b;
}
#if USE_EMBEDDED_COMPILER

View File

@ -3,6 +3,7 @@
#if USE_HDFS
#include <IO/HDFSCommon.h>
#include <hdfs/hdfs.h>
#include <mutex>
namespace DB
@ -17,6 +18,9 @@ ReadBufferFromHDFS::~ReadBufferFromHDFS() = default;
struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl
{
/// HDFS create/open functions are not thread safe
static std::mutex hdfs_init_mutex;
std::string hdfs_uri;
hdfsFile fin;
HDFSBuilderPtr builder;
@ -24,9 +28,11 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl
explicit ReadBufferFromHDFSImpl(const std::string & hdfs_name_)
: hdfs_uri(hdfs_name_)
, builder(createHDFSBuilder(hdfs_uri))
, fs(createHDFSFS(builder.get()))
{
std::lock_guard lock(hdfs_init_mutex);
builder = createHDFSBuilder(hdfs_uri);
fs = createHDFSFS(builder.get());
const size_t begin_of_path = hdfs_uri.find('/', hdfs_uri.find("//") + 2);
const std::string path = hdfs_uri.substr(begin_of_path);
fin = hdfsOpenFile(fs.get(), path.c_str(), O_RDONLY, 0, 0, 0);
@ -47,10 +53,13 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl
~ReadBufferFromHDFSImpl()
{
std::lock_guard lock(hdfs_init_mutex);
hdfsCloseFile(fs.get(), fin);
}
};
std::mutex ReadBufferFromHDFS::ReadBufferFromHDFSImpl::hdfs_init_mutex;
ReadBufferFromHDFS::ReadBufferFromHDFS(const std::string & hdfs_name_, size_t buf_size)
: BufferWithOwnMemory<ReadBuffer>(buf_size)
, impl(std::make_unique<ReadBufferFromHDFSImpl>(hdfs_name_))

View File

@ -74,7 +74,7 @@ static size_t getTypeDepth(const DataTypePtr & type)
}
template<typename Collection>
static Block createBlockFromCollection(const Collection & collection, const DataTypes & types, const Context & context)
static Block createBlockFromCollection(const Collection & collection, const DataTypes & types, bool transform_null_in)
{
size_t columns_num = types.size();
MutableColumns columns(columns_num);
@ -87,7 +87,8 @@ static Block createBlockFromCollection(const Collection & collection, const Data
if (columns_num == 1)
{
auto field = convertFieldToType(value, *types[0]);
if (!field.isNull() || context.getSettingsRef().transform_null_in)
bool need_insert_null = transform_null_in && types[0]->isNullable();
if (!field.isNull() || need_insert_null)
columns[0]->insert(std::move(field));
}
else
@ -110,7 +111,8 @@ static Block createBlockFromCollection(const Collection & collection, const Data
for (; i < tuple_size; ++i)
{
tuple_values[i] = convertFieldToType(tuple[i], *types[i]);
if (tuple_values[i].isNull() && !context.getSettingsRef().transform_null_in)
bool need_insert_null = transform_null_in && types[i]->isNullable();
if (tuple_values[i].isNull() && !need_insert_null)
break;
}
@ -155,6 +157,7 @@ static Block createBlockFromAST(const ASTPtr & node, const DataTypes & types, co
DataTypePtr tuple_type;
Row tuple_values;
const auto & list = node->as<ASTExpressionList &>();
bool transform_null_in = context.getSettingsRef().transform_null_in;
for (const auto & elem : list.children)
{
if (num_columns == 1)
@ -162,8 +165,9 @@ static Block createBlockFromAST(const ASTPtr & node, const DataTypes & types, co
/// One column at the left of IN.
Field value = extractValueFromNode(elem, *types[0], context);
bool need_insert_null = transform_null_in && types[0]->isNullable();
if (!value.isNull() || context.getSettingsRef().transform_null_in)
if (!value.isNull() || need_insert_null)
columns[0]->insert(value);
}
else if (elem->as<ASTFunction>() || elem->as<ASTLiteral>())
@ -217,9 +221,11 @@ static Block createBlockFromAST(const ASTPtr & node, const DataTypes & types, co
Field value = tuple ? convertFieldToType((*tuple)[i], *types[i])
: extractValueFromNode(func->arguments->children[i], *types[i], context);
bool need_insert_null = transform_null_in && types[i]->isNullable();
/// If at least one of the elements of the tuple has an impossible (outside the range of the type) value,
/// then the entire tuple too.
if (value.isNull() && !context.getSettings().transform_null_in)
if (value.isNull() && !need_insert_null)
break;
tuple_values[i] = value;
@ -254,20 +260,22 @@ Block createBlockForSet(
};
Block block;
bool tranform_null_in = context.getSettingsRef().transform_null_in;
/// 1 in 1; (1, 2) in (1, 2); identity(tuple(tuple(tuple(1)))) in tuple(tuple(tuple(1))); etc.
if (left_type_depth == right_type_depth)
{
Array array{right_arg_value};
block = createBlockFromCollection(array, set_element_types, context);
block = createBlockFromCollection(array, set_element_types, tranform_null_in);
}
/// 1 in (1, 2); (1, 2) in ((1, 2), (3, 4)); etc.
else if (left_type_depth + 1 == right_type_depth)
{
auto type_index = right_arg_type->getTypeId();
if (type_index == TypeIndex::Tuple)
block = createBlockFromCollection(DB::get<const Tuple &>(right_arg_value), set_element_types, context);
block = createBlockFromCollection(DB::get<const Tuple &>(right_arg_value), set_element_types, tranform_null_in);
else if (type_index == TypeIndex::Array)
block = createBlockFromCollection(DB::get<const Array &>(right_arg_value), set_element_types, context);
block = createBlockFromCollection(DB::get<const Array &>(right_arg_value), set_element_types, tranform_null_in);
else
throw_unsupported_type(right_arg_type);
}

View File

@ -1114,10 +1114,6 @@ ExpressionActionsPtr ExpressionActions::splitActionsBeforeArrayJoin(const NameSe
input_columns = split_actions->getSampleBlock().getNamesAndTypesList();
input_columns.insert(input_columns.end(), inputs_from_array_join.begin(), inputs_from_array_join.end());
/// Remove not needed columns.
if (!actions.empty())
prependProjectInput();
return split_actions;
}

View File

@ -27,7 +27,9 @@ namespace
StoragePtr tryGetTable(const ASTPtr & database_and_table, const Context & context)
{
auto table_id = context.resolveStorageID(database_and_table);
auto table_id = context.tryResolveStorageID(database_and_table);
if (!table_id)
return {};
return DatabaseCatalog::instance().tryGetTable(table_id, context);
}

View File

@ -171,7 +171,8 @@ void ThreadStatus::initPerformanceCounters()
query_start_time_microseconds = time_in_microseconds(now);
++queries_started;
*last_rusage = RUsageCounters::current(query_start_time_nanoseconds);
// query_start_time_nanoseconds cannot be used here since RUsageCounters expect CLOCK_MONOTONIC
*last_rusage = RUsageCounters::current();
if (query_context)
{

View File

@ -213,6 +213,7 @@ void TranslateQualifiedNamesMatcher::visit(ASTExpressionList & node, const ASTPt
for (const auto & child : old_children)
{
ASTs columns;
if (const auto * asterisk = child->as<ASTAsterisk>())
{
bool first_table = true;
@ -222,7 +223,7 @@ void TranslateQualifiedNamesMatcher::visit(ASTExpressionList & node, const ASTPt
{
if (first_table || !data.join_using_columns.count(column.name))
{
addIdentifier(node.children, table.table, column.name);
addIdentifier(columns, table.table, column.name);
}
}
@ -230,7 +231,7 @@ void TranslateQualifiedNamesMatcher::visit(ASTExpressionList & node, const ASTPt
}
for (const auto & transformer : asterisk->children)
{
IASTColumnsTransformer::transform(transformer, node.children);
IASTColumnsTransformer::transform(transformer, columns);
}
}
else if (const auto * asterisk_pattern = child->as<ASTColumnsMatcher>())
@ -238,7 +239,7 @@ void TranslateQualifiedNamesMatcher::visit(ASTExpressionList & node, const ASTPt
if (asterisk_pattern->column_list)
{
for (const auto & ident : asterisk_pattern->column_list->children)
node.children.emplace_back(ident->clone());
columns.emplace_back(ident->clone());
}
else
{
@ -249,7 +250,7 @@ void TranslateQualifiedNamesMatcher::visit(ASTExpressionList & node, const ASTPt
{
if (asterisk_pattern->isColumnMatching(column.name) && (first_table || !data.join_using_columns.count(column.name)))
{
addIdentifier(node.children, table.table, column.name);
addIdentifier(columns, table.table, column.name);
}
}
@ -259,7 +260,7 @@ void TranslateQualifiedNamesMatcher::visit(ASTExpressionList & node, const ASTPt
// ColumnsMatcher's transformers start to appear at child 1
for (auto it = asterisk_pattern->children.begin() + 1; it != asterisk_pattern->children.end(); ++it)
{
IASTColumnsTransformer::transform(*it, node.children);
IASTColumnsTransformer::transform(*it, columns);
}
}
else if (const auto * qualified_asterisk = child->as<ASTQualifiedAsterisk>())
@ -272,7 +273,7 @@ void TranslateQualifiedNamesMatcher::visit(ASTExpressionList & node, const ASTPt
{
for (const auto & column : table.columns)
{
addIdentifier(node.children, table.table, column.name);
addIdentifier(columns, table.table, column.name);
}
break;
}
@ -280,11 +281,16 @@ void TranslateQualifiedNamesMatcher::visit(ASTExpressionList & node, const ASTPt
// QualifiedAsterisk's transformers start to appear at child 1
for (auto it = qualified_asterisk->children.begin() + 1; it != qualified_asterisk->children.end(); ++it)
{
IASTColumnsTransformer::transform(*it, node.children);
IASTColumnsTransformer::transform(*it, columns);
}
}
else
node.children.emplace_back(child);
columns.emplace_back(child);
node.children.insert(
node.children.end(),
std::make_move_iterator(columns.begin()),
std::make_move_iterator(columns.end()));
}
}

View File

@ -32,7 +32,10 @@ void ASTColumnsMatcher::formatImpl(const FormatSettings & settings, FormatState
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << "COLUMNS" << (settings.hilite ? hilite_none : "") << "(";
if (column_list)
{
frame.expression_list_prepend_whitespace = false;
column_list->formatImpl(settings, state, frame);
}
else
settings.ostr << quoteString(original_pattern);
settings.ostr << ")";

View File

@ -566,7 +566,7 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, size_t num_threads, st
}
if (node->exception)
finish();
cancel();
if (finished)
break;

View File

@ -101,7 +101,7 @@ public:
void setQuota(const std::shared_ptr<const EnabledQuota> & quota);
/// Do not allow to change the table while the processors of pipe are alive.
void addTableLock(const TableLockHolder & lock) { holder.table_locks.push_back(lock); }
void addTableLock(TableLockHolder lock) { holder.table_locks.emplace_back(std::move(lock)); }
/// This methods are from QueryPipeline. Needed to make conversion from pipeline to pipe possible.
void addInterpreterContext(std::shared_ptr<Context> context) { holder.interpreter_context.emplace_back(std::move(context)); }
void addStorageHolder(StoragePtr storage) { holder.storage_holders.emplace_back(std::move(storage)); }

View File

@ -101,7 +101,7 @@ public:
const Block & getHeader() const { return pipe.getHeader(); }
void addTableLock(const TableLockHolder & lock) { pipe.addTableLock(lock); }
void addTableLock(TableLockHolder lock) { pipe.addTableLock(std::move(lock)); }
void addInterpreterContext(std::shared_ptr<Context> context) { pipe.addInterpreterContext(std::move(context)); }
void addStorageHolder(StoragePtr storage) { pipe.addStorageHolder(std::move(storage)); }
void addQueryPlan(std::unique_ptr<QueryPlan> plan) { pipe.addQueryPlan(std::move(plan)); }

View File

@ -12,30 +12,18 @@ namespace DB
{
ReadFromStorageStep::ReadFromStorageStep(
TableLockHolder table_lock_,
StorageMetadataPtr metadata_snapshot_,
StreamLocalLimits & limits_,
SizeLimits & leaf_limits_,
std::shared_ptr<const EnabledQuota> quota_,
StoragePtr storage_,
const Names & required_columns_,
const SelectQueryInfo & query_info_,
std::shared_ptr<Context> context_,
QueryProcessingStage::Enum processing_stage_,
size_t max_block_size_,
size_t max_streams_)
: table_lock(std::move(table_lock_))
, metadata_snapshot(std::move(metadata_snapshot_))
, limits(limits_)
, leaf_limits(leaf_limits_)
, quota(std::move(quota_))
, storage(std::move(storage_))
, required_columns(required_columns_)
, query_info(query_info_)
, context(std::move(context_))
, processing_stage(processing_stage_)
, max_block_size(max_block_size_)
, max_streams(max_streams_)
TableLockHolder table_lock,
StorageMetadataPtr metadata_snapshot,
StreamLocalLimits & limits,
SizeLimits & leaf_limits,
std::shared_ptr<const EnabledQuota> quota,
StoragePtr storage,
const Names & required_columns,
const SelectQueryInfo & query_info,
std::shared_ptr<Context> context,
QueryProcessingStage::Enum processing_stage,
size_t max_block_size,
size_t max_streams)
{
/// Note: we read from storage in constructor of step because we don't know real header before reading.
/// It will be fixed when storage return QueryPlanStep itself.
@ -83,9 +71,6 @@ ReadFromStorageStep::ReadFromStorageStep(
pipeline = std::make_unique<QueryPipeline>();
QueryPipelineProcessorsCollector collector(*pipeline, this);
/// Table lock is stored inside pipeline here.
pipeline->addTableLock(table_lock);
pipe.setLimits(limits);
/**
@ -103,8 +88,11 @@ ReadFromStorageStep::ReadFromStorageStep(
pipeline->init(std::move(pipe));
/// Add resources to pipeline. The order is important.
/// Add in reverse order of destruction. Pipeline will be destroyed at the end in case of exception.
pipeline->addInterpreterContext(std::move(context));
pipeline->addStorageHolder(std::move(storage));
pipeline->addTableLock(std::move(table_lock));
processors = collector.detachProcessors();

View File

@ -45,20 +45,6 @@ public:
void describePipeline(FormatSettings & settings) const override;
private:
TableLockHolder table_lock;
StorageMetadataPtr metadata_snapshot;
StreamLocalLimits limits;
SizeLimits leaf_limits;
std::shared_ptr<const EnabledQuota> quota;
StoragePtr storage;
const Names & required_columns;
const SelectQueryInfo & query_info;
std::shared_ptr<Context> context;
QueryProcessingStage::Enum processing_stage;
size_t max_block_size;
size_t max_streams;
QueryPipelinePtr pipeline;
Processors processors;
};

View File

@ -50,6 +50,96 @@ namespace ErrorCodes
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
struct StorageKafkaInterceptors
{
static rd_kafka_resp_err_t rdKafkaOnThreadStart(rd_kafka_t *, rd_kafka_thread_type_t thread_type, const char *, void * ctx)
{
StorageKafka * self = reinterpret_cast<StorageKafka *>(ctx);
const auto & storage_id = self->getStorageID();
const auto & table = storage_id.getTableName();
switch (thread_type)
{
case RD_KAFKA_THREAD_MAIN:
setThreadName(("rdk:m/" + table.substr(0, 9)).c_str());
break;
case RD_KAFKA_THREAD_BACKGROUND:
setThreadName(("rdk:bg/" + table.substr(0, 8)).c_str());
break;
case RD_KAFKA_THREAD_BROKER:
setThreadName(("rdk:b/" + table.substr(0, 9)).c_str());
break;
}
/// Create ThreadStatus to track memory allocations from librdkafka threads.
//
/// And store them in a separate list (thread_statuses) to make sure that they will be destroyed,
/// regardless how librdkafka calls the hooks.
/// But this can trigger use-after-free if librdkafka will not destroy threads after rd_kafka_wait_destroyed()
auto thread_status = std::make_shared<ThreadStatus>();
std::lock_guard lock(self->thread_statuses_mutex);
self->thread_statuses.emplace_back(std::move(thread_status));
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
static rd_kafka_resp_err_t rdKafkaOnThreadExit(rd_kafka_t *, rd_kafka_thread_type_t, const char *, void * ctx)
{
StorageKafka * self = reinterpret_cast<StorageKafka *>(ctx);
std::lock_guard lock(self->thread_statuses_mutex);
const auto it = std::find_if(self->thread_statuses.begin(), self->thread_statuses.end(), [](const auto & thread_status_ptr)
{
return thread_status_ptr.get() == current_thread;
});
if (it == self->thread_statuses.end())
throw Exception("No thread status for this librdkafka thread.", ErrorCodes::LOGICAL_ERROR);
self->thread_statuses.erase(it);
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
static rd_kafka_resp_err_t rdKafkaOnNew(rd_kafka_t * rk, const rd_kafka_conf_t *, void * ctx, char * /*errstr*/, size_t /*errstr_size*/)
{
StorageKafka * self = reinterpret_cast<StorageKafka *>(ctx);
rd_kafka_resp_err_t status;
status = rd_kafka_interceptor_add_on_thread_start(rk, "init-thread", rdKafkaOnThreadStart, ctx);
if (status != RD_KAFKA_RESP_ERR_NO_ERROR)
{
LOG_ERROR(self->log, "Cannot set on thread start interceptor due to {} error", status);
return status;
}
status = rd_kafka_interceptor_add_on_thread_exit(rk, "exit-thread", rdKafkaOnThreadExit, ctx);
if (status != RD_KAFKA_RESP_ERR_NO_ERROR)
LOG_ERROR(self->log, "Cannot set on thread exit interceptor due to {} error", status);
return status;
}
static rd_kafka_resp_err_t rdKafkaOnConfDup(rd_kafka_conf_t * new_conf, const rd_kafka_conf_t * /*old_conf*/, size_t /*filter_cnt*/, const char ** /*filter*/, void * ctx)
{
StorageKafka * self = reinterpret_cast<StorageKafka *>(ctx);
rd_kafka_resp_err_t status;
// cppkafka copies configuration multiple times
status = rd_kafka_conf_interceptor_add_on_conf_dup(new_conf, "init", rdKafkaOnConfDup, ctx);
if (status != RD_KAFKA_RESP_ERR_NO_ERROR)
{
LOG_ERROR(self->log, "Cannot set on conf dup interceptor due to {} error", status);
return status;
}
status = rd_kafka_conf_interceptor_add_on_new(new_conf, "init", rdKafkaOnNew, ctx);
if (status != RD_KAFKA_RESP_ERR_NO_ERROR)
LOG_ERROR(self->log, "Cannot set on conf new interceptor due to {} error", status);
return status;
}
};
namespace
{
const auto RESCHEDULE_MS = 500;
@ -76,46 +166,6 @@ namespace
conf.set(key_name, config.getString(key_path));
}
}
rd_kafka_resp_err_t rdKafkaOnThreadStart(rd_kafka_t *, rd_kafka_thread_type_t thread_type, const char *, void * ctx)
{
StorageKafka * self = reinterpret_cast<StorageKafka *>(ctx);
const auto & storage_id = self->getStorageID();
const auto & table = storage_id.getTableName();
switch (thread_type)
{
case RD_KAFKA_THREAD_MAIN:
setThreadName(("rdk:m/" + table.substr(0, 9)).c_str());
break;
case RD_KAFKA_THREAD_BACKGROUND:
setThreadName(("rdk:bg/" + table.substr(0, 8)).c_str());
break;
case RD_KAFKA_THREAD_BROKER:
setThreadName(("rdk:b/" + table.substr(0, 9)).c_str());
break;
}
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
rd_kafka_resp_err_t rdKafkaOnNew(rd_kafka_t * rk, const rd_kafka_conf_t *, void * ctx, char * /*errstr*/, size_t /*errstr_size*/)
{
return rd_kafka_interceptor_add_on_thread_start(rk, "setThreadName", rdKafkaOnThreadStart, ctx);
}
rd_kafka_resp_err_t rdKafkaOnConfDup(rd_kafka_conf_t * new_conf, const rd_kafka_conf_t * /*old_conf*/, size_t /*filter_cnt*/, const char ** /*filter*/, void * ctx)
{
rd_kafka_resp_err_t status;
// cppkafka copies configuration multiple times
status = rd_kafka_conf_interceptor_add_on_conf_dup(new_conf, "setThreadName", rdKafkaOnConfDup, ctx);
if (status != RD_KAFKA_RESP_ERR_NO_ERROR)
return status;
status = rd_kafka_conf_interceptor_add_on_new(new_conf, "setThreadName", rdKafkaOnNew, ctx);
return status;
}
}
StorageKafka::StorageKafka(
@ -443,14 +493,16 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & conf)
int status;
status = rd_kafka_conf_interceptor_add_on_new(conf.get_handle(), "setThreadName", rdKafkaOnNew, self);
status = rd_kafka_conf_interceptor_add_on_new(conf.get_handle(),
"init", StorageKafkaInterceptors::rdKafkaOnNew, self);
if (status != RD_KAFKA_RESP_ERR_NO_ERROR)
LOG_ERROR(log, "Cannot set new interceptor");
LOG_ERROR(log, "Cannot set new interceptor due to {} error", status);
// cppkafka always copy the configuration
status = rd_kafka_conf_interceptor_add_on_conf_dup(conf.get_handle(), "setThreadName", rdKafkaOnConfDup, self);
status = rd_kafka_conf_interceptor_add_on_conf_dup(conf.get_handle(),
"init", StorageKafkaInterceptors::rdKafkaOnConfDup, self);
if (status != RD_KAFKA_RESP_ERR_NO_ERROR)
LOG_ERROR(log, "Cannot set dup conf interceptor");
LOG_ERROR(log, "Cannot set dup conf interceptor due to {} error", status);
}
}

View File

@ -11,6 +11,7 @@
#include <ext/shared_ptr_helper.h>
#include <mutex>
#include <list>
#include <atomic>
namespace cppkafka
@ -23,12 +24,16 @@ class Configuration;
namespace DB
{
struct StorageKafkaInterceptors;
/** Implements a Kafka queue table engine that can be used as a persistent queue / buffer,
* or as a basic building block for creating pipelines with a continuous insertion / ETL.
*/
class StorageKafka final : public ext::shared_ptr_helper<StorageKafka>, public IStorage
{
friend struct ext::shared_ptr_helper<StorageKafka>;
friend struct StorageKafkaInterceptors;
public:
std::string getName() const override { return "Kafka"; }
@ -105,11 +110,14 @@ private:
std::vector<std::shared_ptr<TaskContext>> tasks;
bool thread_per_consumer = false;
/// For memory accounting in the librdkafka threads.
std::mutex thread_statuses_mutex;
std::list<std::shared_ptr<ThreadStatus>> thread_statuses;
SettingsChanges createSettingsAdjustments();
ConsumerBufferPtr createReadBuffer(const size_t consumer_number);
// Update Kafka configuration with values from CH user configuration.
void updateConfiguration(cppkafka::Configuration & conf);
void threadFunc(size_t idx);

View File

@ -276,7 +276,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
ReadBufferFromString ttl_infos_buffer(ttl_infos_string);
assertString("ttl format version: 1\n", ttl_infos_buffer);
ttl_infos.read(ttl_infos_buffer);
reservation = data.reserveSpacePreferringTTLRules(sum_files_size, ttl_infos, std::time(nullptr), 0, true);
reservation = data.reserveSpacePreferringTTLRules(metadata_snapshot, sum_files_size, ttl_infos, std::time(nullptr), 0, true);
}
else
reservation = data.reserveSpace(sum_files_size);

View File

@ -88,6 +88,7 @@ void IMergeTreeDataPart::MinMaxIndex::store(
out_hashing.next();
out_checksums.files[file_name].file_size = out_hashing.count();
out_checksums.files[file_name].file_hash = out_hashing.getHash();
out->finalize();
}
}

View File

@ -3037,28 +3037,31 @@ ReservationPtr MergeTreeData::tryReserveSpace(UInt64 expected_size, SpacePtr spa
return space->reserve(expected_size);
}
ReservationPtr MergeTreeData::reserveSpacePreferringTTLRules(UInt64 expected_size,
const IMergeTreeDataPart::TTLInfos & ttl_infos,
time_t time_of_move,
size_t min_volume_index,
bool is_insert) const
ReservationPtr MergeTreeData::reserveSpacePreferringTTLRules(
const StorageMetadataPtr & metadata_snapshot,
UInt64 expected_size,
const IMergeTreeDataPart::TTLInfos & ttl_infos,
time_t time_of_move,
size_t min_volume_index,
bool is_insert) const
{
expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
ReservationPtr reservation = tryReserveSpacePreferringTTLRules(expected_size, ttl_infos, time_of_move, min_volume_index, is_insert);
ReservationPtr reservation = tryReserveSpacePreferringTTLRules(metadata_snapshot, expected_size, ttl_infos, time_of_move, min_volume_index, is_insert);
return checkAndReturnReservation(expected_size, std::move(reservation));
}
ReservationPtr MergeTreeData::tryReserveSpacePreferringTTLRules(UInt64 expected_size,
const IMergeTreeDataPart::TTLInfos & ttl_infos,
time_t time_of_move,
size_t min_volume_index,
bool is_insert) const
ReservationPtr MergeTreeData::tryReserveSpacePreferringTTLRules(
const StorageMetadataPtr & metadata_snapshot,
UInt64 expected_size,
const IMergeTreeDataPart::TTLInfos & ttl_infos,
time_t time_of_move,
size_t min_volume_index,
bool is_insert) const
{
expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
auto metadata_snapshot = getInMemoryMetadataPtr();
ReservationPtr reservation;
auto move_ttl_entry = selectTTLDescriptionForTTLInfos(metadata_snapshot->getMoveTTLs(), ttl_infos.moves_ttl, time_of_move, true);

View File

@ -632,6 +632,7 @@ public:
/// Reserves space at least 1MB preferring best destination according to `ttl_infos`.
ReservationPtr reserveSpacePreferringTTLRules(
const StorageMetadataPtr & metadata_snapshot,
UInt64 expected_size,
const IMergeTreeDataPart::TTLInfos & ttl_infos,
time_t time_of_move,
@ -639,6 +640,7 @@ public:
bool is_insert = false) const;
ReservationPtr tryReserveSpacePreferringTTLRules(
const StorageMetadataPtr & metadata_snapshot,
UInt64 expected_size,
const IMergeTreeDataPart::TTLInfos & ttl_infos,
time_t time_of_move,

View File

@ -624,6 +624,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
MergeList::Entry & merge_entry,
TableLockHolder &,
time_t time_of_merge,
const Context & context,
const ReservationPtr & space_reservation,
bool deduplicate)
{
@ -706,7 +707,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
MergeAlgorithm merge_alg = chooseMergeAlgorithm(parts, sum_input_rows_upper_bound, gathering_columns, deduplicate, need_remove_expired_values);
merge_entry->merge_algorithm = merge_alg;
LOG_DEBUG(log, "Selected MergeAlgorithm: {}", ((merge_alg == MergeAlgorithm::Vertical) ? "Vertical" : "Horizontal"));
LOG_DEBUG(log, "Selected MergeAlgorithm: {}", toString(merge_alg));
/// Note: this is done before creating input streams, because otherwise data.data_parts_mutex
/// (which is locked in data.getTotalActiveSizeInBytes())
@ -715,7 +716,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
/// deadlock is impossible.
auto compression_codec = data.getCompressionCodecForPart(merge_entry->total_size_bytes_compressed, new_data_part->ttl_infos, time_of_merge);
/// TODO: Should it go through IDisk interface?
auto tmp_disk = context.getTemporaryVolume()->getDisk();
String rows_sources_file_path;
std::unique_ptr<WriteBufferFromFileBase> rows_sources_uncompressed_write_buf;
std::unique_ptr<WriteBuffer> rows_sources_write_buf;
@ -723,9 +724,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
if (merge_alg == MergeAlgorithm::Vertical)
{
disk->createDirectories(new_part_tmp_path);
tmp_disk->createDirectories(new_part_tmp_path);
rows_sources_file_path = new_part_tmp_path + "rows_sources";
rows_sources_uncompressed_write_buf = disk->writeFile(rows_sources_file_path);
rows_sources_uncompressed_write_buf = tmp_disk->writeFile(rows_sources_file_path);
rows_sources_write_buf = std::make_unique<CompressedWriteBuffer>(*rows_sources_uncompressed_write_buf);
for (const MergeTreeData::DataPartPtr & part : parts)
@ -955,7 +956,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
+ ") differs from number of bytes written to rows_sources file (" + toString(rows_sources_count)
+ "). It is a bug.", ErrorCodes::LOGICAL_ERROR);
CompressedReadBufferFromFile rows_sources_read_buf(disk->readFile(rows_sources_file_path));
CompressedReadBufferFromFile rows_sources_read_buf(tmp_disk->readFile(rows_sources_file_path));
IMergedBlockOutputStream::WrittenOffsetColumns written_offset_columns;
for (size_t column_num = 0, gathering_column_names_size = gathering_column_names.size();
@ -1027,7 +1028,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
merge_entry->progress.store(progress_before + column_sizes->columnWeight(column_name), std::memory_order_relaxed);
}
disk->remove(rows_sources_file_path);
tmp_disk->remove(rows_sources_file_path);
}
for (const auto & part : parts)

View File

@ -114,6 +114,7 @@ public:
MergeListEntry & merge_entry,
TableLockHolder & table_lock_holder,
time_t time_of_merge,
const Context & context,
const ReservationPtr & space_reservation,
bool deduplicate);

View File

@ -182,6 +182,10 @@ std::optional<TTLDescription> selectTTLDescriptionForTTLInfos(const TTLDescripti
for (auto ttl_entry_it = descriptions.begin(); ttl_entry_it != descriptions.end(); ++ttl_entry_it)
{
auto ttl_info_it = ttl_info_map.find(ttl_entry_it->result_column);
if (ttl_info_it == ttl_info_map.end())
continue;
time_t ttl_time;
if (use_max)
@ -190,8 +194,7 @@ std::optional<TTLDescription> selectTTLDescriptionForTTLInfos(const TTLDescripti
ttl_time = ttl_info_it->second.min;
/// Prefer TTL rule which went into action last.
if (ttl_info_it != ttl_info_map.end()
&& ttl_time <= current_time
if (ttl_time <= current_time
&& best_ttl_time <= ttl_time)
{
best_entry_it = ttl_entry_it;

View File

@ -229,6 +229,8 @@ void MergeTreeDataPartWriterCompact::finishDataSerialization(IMergeTreeDataPart:
marks.next();
addToChecksums(checksums);
plain_file->finalize();
marks_file->finalize();
if (sync)
{
plain_file->sync();

View File

@ -17,8 +17,12 @@ namespace
void MergeTreeDataPartWriterOnDisk::Stream::finalize()
{
compressed.next();
plain_file->next();
/// 'compressed_buf' doesn't call next() on underlying buffer ('plain_hashing'). We should do it manually.
plain_hashing.next();
marks.next();
plain_file->finalize();
marks_file->finalize();
}
void MergeTreeDataPartWriterOnDisk::Stream::sync() const
@ -331,6 +335,7 @@ void MergeTreeDataPartWriterOnDisk::finishPrimaryIndexSerialization(
index_stream->next();
checksums.files["primary.idx"].file_size = index_stream->count();
checksums.files["primary.idx"].file_hash = index_stream->getHash();
index_file_stream->finalize();
if (sync)
index_file_stream->sync();
index_stream = nullptr;

View File

@ -1,6 +1,7 @@
#include <boost/rational.hpp> /// For calculations related to sampling coefficients.
#include <ext/scope_guard.h>
#include <optional>
#include <unordered_set>
#include <Poco/File.h>
@ -18,6 +19,7 @@
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSampleRatio.h>
#include <Parsers/parseIdentifierOrStringLiteral.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/Context.h>
@ -59,6 +61,7 @@ namespace ErrorCodes
extern const int ILLEGAL_COLUMN;
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int TOO_MANY_ROWS;
extern const int CANNOT_PARSE_TEXT;
}
@ -552,6 +555,38 @@ Pipe MergeTreeDataSelectExecutor::readFromParts(
useful_indices.emplace_back(index_helper, condition);
}
if (settings.force_data_skipping_indices.changed)
{
const auto & indices = settings.force_data_skipping_indices.toString();
Strings forced_indices;
{
Tokens tokens(&indices[0], &indices[indices.size()], settings.max_query_size);
IParser::Pos pos(tokens, settings.max_parser_depth);
Expected expected;
if (!parseIdentifiersOrStringLiterals(pos, expected, forced_indices))
throw Exception(ErrorCodes::CANNOT_PARSE_TEXT,
"Cannot parse force_data_skipping_indices ('{}')", indices);
}
if (forced_indices.empty())
throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "No indices parsed from force_data_skipping_indices ('{}')", indices);
std::unordered_set<std::string> useful_indices_names;
for (const auto & useful_index : useful_indices)
useful_indices_names.insert(useful_index.first->index.name);
for (const auto & index_name : forced_indices)
{
if (!useful_indices_names.count(index_name))
{
throw Exception(ErrorCodes::INDEX_NOT_USED,
"Index {} is not used and setting 'force_data_skipping_indices' contains it",
backQuote(index_name));
}
}
}
RangesInDataParts parts_with_ranges(parts.size());
size_t sum_marks = 0;
std::atomic<size_t> sum_marks_pk = 0;

View File

@ -237,7 +237,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
updateTTL(ttl_entry, move_ttl_infos, move_ttl_infos.moves_ttl[ttl_entry.result_column], block, false);
NamesAndTypesList columns = metadata_snapshot->getColumns().getAllPhysical().filter(block.getNames());
ReservationPtr reservation = data.reserveSpacePreferringTTLRules(expected_size, move_ttl_infos, time(nullptr), 0, true);
ReservationPtr reservation = data.reserveSpacePreferringTTLRules(metadata_snapshot, expected_size, move_ttl_infos, time(nullptr), 0, true);
VolumePtr volume = data.getStoragePolicy()->getVolume(0);
auto new_data_part = data.createPart(

View File

@ -156,6 +156,7 @@ void MergeTreePartition::store(const Block & partition_key_sample, const DiskPtr
out_hashing.next();
checksums.files["partition.dat"].file_size = out_hashing.count();
checksums.files["partition.dat"].file_hash = out_hashing.getHash();
out->finalize();
}
void MergeTreePartition::create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row)

View File

@ -58,7 +58,7 @@ struct Settings;
/** Replication settings. */ \
M(UInt64, replicated_deduplication_window, 100, "How many last blocks of hashes should be kept in ZooKeeper (old blocks will be deleted).", 0) \
M(UInt64, replicated_deduplication_window_seconds, 7 * 24 * 60 * 60 /* one week */, "Similar to \"replicated_deduplication_window\", but determines old blocks by their lifetime. Hash of an inserted block will be deleted (and the block will not be deduplicated after) if it outside of one \"window\". You can set very big replicated_deduplication_window to avoid duplicating INSERTs during that period of time.", 0) \
M(UInt64, max_replicated_logs_to_keep, 100, "How many records may be in log, if there is inactive replica.", 0) \
M(UInt64, max_replicated_logs_to_keep, 1000, "How many records may be in log, if there is inactive replica. Inactive replica becomes lost when when this number exceed.", 0) \
M(UInt64, min_replicated_logs_to_keep, 10, "Keep about this number of last records in ZooKeeper log, even if they are obsolete. It doesn't affect work of tables: used only to diagnose ZooKeeper log before cleaning.", 0) \
M(Seconds, prefer_fetch_merged_part_time_threshold, 3600, "If time passed after replication log entry creation exceeds this threshold and sum size of parts is greater than \"prefer_fetch_merged_part_size_threshold\", prefer fetching merged part from replica instead of doing merge locally. To speed up very long merges.", 0) \
M(UInt64, prefer_fetch_merged_part_size_threshold, 10ULL * 1024 * 1024 * 1024, "If sum size of parts exceeds this threshold and time passed after replication log entry creation is greater than \"prefer_fetch_merged_part_time_threshold\", prefer fetching merged part from replica instead of doing merge locally. To speed up very long merges.", 0) \
@ -75,6 +75,7 @@ struct Settings;
M(UInt64, replicated_max_parallel_sends_for_table, 0, "Limit parallel sends for one table.", 0) \
M(Bool, replicated_can_become_leader, true, "If true, Replicated tables replicas on this node will try to acquire leadership.", 0) \
M(Seconds, zookeeper_session_expiration_check_period, 60, "ZooKeeper session expiration check period, in seconds.", 0) \
M(Bool, detach_old_local_parts_when_cloning_replica, 1, "Do not remove old local parts when repairing lost replica.", 0) \
\
/** Check delay of replicas settings. */ \
M(UInt64, min_relative_delay_to_measure, 120, "Calculate relative replica delay only if absolute delay is not less that this value.", 0) \

View File

@ -148,6 +148,7 @@ void MergedBlockOutputStream::finalizePartOnDisk(
count_out_hashing.next();
checksums.files["count.txt"].file_size = count_out_hashing.count();
checksums.files["count.txt"].file_hash = count_out_hashing.getHash();
count_out->finalize();
if (sync)
count_out->sync();
}
@ -160,6 +161,7 @@ void MergedBlockOutputStream::finalizePartOnDisk(
new_part->ttl_infos.write(out_hashing);
checksums.files["ttl.txt"].file_size = out_hashing.count();
checksums.files["ttl.txt"].file_hash = out_hashing.getHash();
out->finalize();
if (sync)
out->sync();
}
@ -170,6 +172,7 @@ void MergedBlockOutputStream::finalizePartOnDisk(
/// Write a file with a description of columns.
auto out = volume->getDisk()->writeFile(part_path + "columns.txt", 4096);
part_columns.writeText(*out);
out->finalize();
if (sync)
out->sync();
}
@ -178,6 +181,7 @@ void MergedBlockOutputStream::finalizePartOnDisk(
{
auto out = volume->getDisk()->writeFile(part_path + IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096);
DB::writeText(queryToString(default_codec->getFullCodecDesc()), *out);
out->finalize();
}
else
{
@ -189,6 +193,7 @@ void MergedBlockOutputStream::finalizePartOnDisk(
/// Write file with checksums.
auto out = volume->getDisk()->writeFile(part_path + "checksums.txt", 4096);
checksums.write(*out);
out->finalize();
if (sync)
out->sync();
}

View File

@ -274,6 +274,7 @@ void ReplicatedMergeTreeCleanupThread::markLostReplicas(const std::unordered_map
for (const auto & pair : log_pointers_candidate_lost_replicas)
{
String replica = pair.first;
LOG_WARNING(log, "Will mark replica {} as lost, because it has stale log pointer: {}", replica, pair.second);
Coordination::Requests ops;
/// If host changed version we can not mark replicas, because replica started to be active.
ops.emplace_back(zkutil::makeCheckRequest(

View File

@ -715,6 +715,16 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C
for (const String & produced_part_name : queue_entry->getVirtualPartNames())
{
auto part_info = MergeTreePartInfo::fromPartName(produced_part_name, format_version);
/// Oddly enough, getVirtualPartNames() may return _virtual_ part name.
/// Such parts do not exist and will never appear, so we should not add virtual parts to parts_to_do list.
/// Fortunately, it's easy to distinguish virtual parts from normal parts by part level.
/// See StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition(...)
auto max_level = MergeTreePartInfo::MAX_LEVEL; /// DROP/DETACH PARTITION
auto another_max_level = std::numeric_limits<decltype(part_info.level)>::max(); /// REPLACE/MOVE PARTITION
if (part_info.level == max_level || part_info.level == another_max_level)
continue;
auto it = entry->block_numbers.find(part_info.partition_id);
if (it != entry->block_numbers.end() && it->second > part_info.getDataVersion())
mutation.parts_to_do.add(produced_part_name);

View File

@ -21,81 +21,64 @@ namespace ErrorCodes
class MemorySource : public SourceWithProgress
{
using InitializerFunc = std::function<void(BlocksList::const_iterator &, size_t &)>;
public:
/// We use range [first, last] which includes right border.
/// Blocks are stored in std::list which may be appended in another thread.
/// We don't use synchronisation here, because elements in range [first, last] won't be modified.
/// We use pointer to the beginning of the list and its current size.
/// We don't need synchronisation in this reader, because while we hold SharedLock on storage,
/// only new elements can be added to the back of the list, so our iterators remain valid
MemorySource(
Names column_names_,
BlocksList::iterator first_,
BlocksList::const_iterator first_,
size_t num_blocks_,
const StorageMemory & storage,
const StorageMetadataPtr & metadata_snapshot)
const StorageMetadataPtr & metadata_snapshot,
InitializerFunc initializer_func_ = [](BlocksList::const_iterator &, size_t &) {})
: SourceWithProgress(metadata_snapshot->getSampleBlockForColumns(column_names_, storage.getVirtuals(), storage.getStorageID()))
, column_names(std::move(column_names_))
, current_it(first_)
, num_blocks(num_blocks_)
, initializer_func(std::move(initializer_func_))
{
}
/// If called, will initialize the number of blocks at first read.
/// It allows to read data which was inserted into memory table AFTER Storage::read was called.
/// This hack is needed for global subqueries.
void delayInitialization(BlocksList * data_, std::mutex * mutex_)
{
data = data_;
mutex = mutex_;
}
String getName() const override { return "Memory"; }
protected:
Chunk generate() override
{
if (data)
if (!postponed_init_done)
{
std::lock_guard guard(*mutex);
current_it = data->begin();
num_blocks = data->size();
is_finished = num_blocks == 0;
data = nullptr;
mutex = nullptr;
initializer_func(current_it, num_blocks);
postponed_init_done = true;
}
if (is_finished)
{
if (current_block_idx == num_blocks)
return {};
}
else
{
const Block & src = *current_it;
Columns columns;
columns.reserve(column_names.size());
/// Add only required columns to `res`.
for (const auto & name : column_names)
columns.emplace_back(src.getByName(name).column);
const Block & src = *current_it;
Columns columns;
columns.reserve(column_names.size());
++current_block_idx;
/// Add only required columns to `res`.
for (const auto & name : column_names)
columns.push_back(src.getByName(name).column);
if (current_block_idx == num_blocks)
is_finished = true;
else
++current_it;
if (++current_block_idx < num_blocks)
++current_it;
return Chunk(std::move(columns), src.rows());
}
return Chunk(std::move(columns), src.rows());
}
private:
Names column_names;
BlocksList::iterator current_it;
size_t current_block_idx = 0;
size_t num_blocks;
bool is_finished = false;
BlocksList * data = nullptr;
std::mutex * mutex = nullptr;
private:
const Names column_names;
BlocksList::const_iterator current_it;
size_t num_blocks;
size_t current_block_idx = 0;
bool postponed_init_done = false;
InitializerFunc initializer_func;
};
@ -113,9 +96,18 @@ public:
void write(const Block & block) override
{
const auto size_bytes_diff = block.allocatedBytes();
const auto size_rows_diff = block.rows();
metadata_snapshot->check(block, true);
std::lock_guard lock(storage.mutex);
storage.data.push_back(block);
{
std::lock_guard lock(storage.mutex);
storage.data.push_back(block);
storage.total_size_bytes.fetch_add(size_bytes_diff, std::memory_order_relaxed);
storage.total_size_rows.fetch_add(size_rows_diff, std::memory_order_relaxed);
}
}
private:
StorageMemory & storage;
@ -144,8 +136,6 @@ Pipe StorageMemory::read(
{
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
std::lock_guard lock(mutex);
if (delay_read_for_global_subqueries)
{
/// Note: for global subquery we use single source.
@ -156,11 +146,22 @@ Pipe StorageMemory::read(
/// set for IN or hash table for JOIN, which can't be done concurrently.
/// Since no other manipulation with data is done, multiple sources shouldn't give any profit.
auto source = std::make_shared<MemorySource>(column_names, data.begin(), data.size(), *this, metadata_snapshot);
source->delayInitialization(&data, &mutex);
return Pipe(std::move(source));
return Pipe(
std::make_shared<MemorySource>(
column_names, data.end(), 0, *this, metadata_snapshot,
/// This hack is needed for global subqueries.
/// It allows to set up this Source for read AFTER Storage::read() has been called and just before actual reading
[this](BlocksList::const_iterator & current_it, size_t & num_blocks)
{
std::lock_guard guard(mutex);
current_it = data.begin();
num_blocks = data.size();
}
));
}
std::lock_guard lock(mutex);
size_t size = data.size();
if (num_streams > size)
@ -168,7 +169,7 @@ Pipe StorageMemory::read(
Pipes pipes;
BlocksList::iterator it = data.begin();
BlocksList::const_iterator it = data.begin();
size_t offset = 0;
for (size_t stream = 0; stream < num_streams; ++stream)
@ -201,31 +202,32 @@ void StorageMemory::drop()
{
std::lock_guard lock(mutex);
data.clear();
total_size_bytes.store(0, std::memory_order_relaxed);
total_size_rows.store(0, std::memory_order_relaxed);
}
void StorageMemory::truncate(
const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder &)
{
std::lock_guard lock(mutex);
data.clear();
total_size_bytes.store(0, std::memory_order_relaxed);
total_size_rows.store(0, std::memory_order_relaxed);
}
std::optional<UInt64> StorageMemory::totalRows() const
{
UInt64 rows = 0;
std::lock_guard lock(mutex);
for (const auto & buffer : data)
rows += buffer.rows();
return rows;
/// All modifications of these counters are done under mutex which automatically guarantees synchronization/consistency
/// When run concurrently we are fine with any value: "before" or "after"
return total_size_rows.load(std::memory_order_relaxed);
}
std::optional<UInt64> StorageMemory::totalBytes() const
{
UInt64 bytes = 0;
std::lock_guard lock(mutex);
for (const auto & buffer : data)
bytes += buffer.allocatedBytes();
return bytes;
return total_size_bytes.load(std::memory_order_relaxed);
}

View File

@ -1,5 +1,7 @@
#pragma once
#include <atomic>
#include <optional>
#include <mutex>
#include <ext/shared_ptr_helper.h>
@ -93,6 +95,9 @@ private:
bool delay_read_for_global_subqueries = false;
std::atomic<size_t> total_size_bytes = 0;
std::atomic<size_t> total_size_rows = 0;
protected:
StorageMemory(const StorageID & table_id_, ColumnsDescription columns_description_, ConstraintsDescription constraints_);
};

View File

@ -286,7 +286,12 @@ struct CurrentlyMergingPartsTagger
StorageMergeTree & storage;
public:
CurrentlyMergingPartsTagger(FutureMergedMutatedPart & future_part_, size_t total_size, StorageMergeTree & storage_, bool is_mutation)
CurrentlyMergingPartsTagger(
FutureMergedMutatedPart & future_part_,
size_t total_size,
StorageMergeTree & storage_,
const StorageMetadataPtr & metadata_snapshot,
bool is_mutation)
: future_part(future_part_), storage(storage_)
{
/// Assume mutex is already locked, because this method is called from mergeTask.
@ -304,7 +309,7 @@ public:
max_volume_index = std::max(max_volume_index, storage.getStoragePolicy()->getVolumeIndexByDisk(part_ptr->volume->getDisk()));
}
reserved_space = storage.tryReserveSpacePreferringTTLRules(total_size, ttl_infos, time(nullptr), max_volume_index);
reserved_space = storage.tryReserveSpacePreferringTTLRules(metadata_snapshot, total_size, ttl_infos, time(nullptr), max_volume_index);
}
if (!reserved_space)
{
@ -715,7 +720,7 @@ bool StorageMergeTree::merge(
return false;
}
merging_tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part.parts), *this, false);
merging_tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part.parts), *this, metadata_snapshot, false);
auto table_id = getStorageID();
merge_entry = global_context.getMergeList().insert(table_id.database_name, table_id.table_name, future_part);
}
@ -739,7 +744,7 @@ bool StorageMergeTree::merge(
try
{
new_part = merger_mutator.mergePartsToTemporaryPart(
future_part, metadata_snapshot, *merge_entry, table_lock_holder, time(nullptr),
future_part, metadata_snapshot, *merge_entry, table_lock_holder, time(nullptr), global_context,
merging_tagger->reserved_space, deduplicate);
merger_mutator.renameMergedTemporaryPart(new_part, future_part.parts, nullptr);
@ -856,7 +861,7 @@ bool StorageMergeTree::tryMutatePart()
future_part.name = part->getNewName(new_part_info);
future_part.type = part->getType();
tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, true);
tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, metadata_snapshot, true);
break;
}
}

View File

@ -721,7 +721,10 @@ void StorageReplicatedMergeTree::dropReplica(zkutil::ZooKeeperPtr zookeeper, con
throw Exception("Table was not dropped because ZooKeeper session has expired.", ErrorCodes::TABLE_WAS_NOT_DROPPED);
auto remote_replica_path = zookeeper_path + "/replicas/" + replica;
LOG_INFO(logger, "Removing replica {}", remote_replica_path);
LOG_INFO(logger, "Removing replica {}, marking it as lost", remote_replica_path);
/// Mark itself lost before removing, because the following recursive removal may fail
/// and partially dropped replica may be considered as alive one (until someone will mark it lost)
zookeeper->trySet(zookeeper_path + "/replicas/" + replica + "/is_lost", "1");
/// It may left some garbage if replica_path subtree are concurrently modified
zookeeper->tryRemoveRecursive(remote_replica_path);
if (zookeeper->exists(remote_replica_path))
@ -1416,11 +1419,11 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
ttl_infos.update(part_ptr->ttl_infos);
max_volume_index = std::max(max_volume_index, getStoragePolicy()->getVolumeIndexByDisk(part_ptr->volume->getDisk()));
}
ReservationPtr reserved_space = reserveSpacePreferringTTLRules(estimated_space_for_merge,
ttl_infos, time(nullptr), max_volume_index);
auto table_lock = lockForShare(RWLockImpl::NO_QUERY, storage_settings_ptr->lock_acquire_timeout_for_background_operations);
StorageMetadataPtr metadata_snapshot = getInMemoryMetadataPtr();
ReservationPtr reserved_space = reserveSpacePreferringTTLRules(
metadata_snapshot, estimated_space_for_merge, ttl_infos, time(nullptr), max_volume_index);
FutureMergedMutatedPart future_merged_part(parts, entry.new_part_type);
if (future_merged_part.name != entry.new_part_name)
@ -1450,7 +1453,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
{
part = merger_mutator.mergePartsToTemporaryPart(
future_merged_part, metadata_snapshot, *merge_entry,
table_lock, entry.create_time, reserved_space, entry.deduplicate);
table_lock, entry.create_time, global_context, reserved_space, entry.deduplicate);
merger_mutator.renameMergedTemporaryPart(part, parts, &transaction);
@ -2182,8 +2185,6 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coordination::Stat source_is_lost_stat, zkutil::ZooKeeperPtr & zookeeper)
{
LOG_INFO(log, "Will mimic {}", source_replica);
String source_path = zookeeper_path + "/replicas/" + source_replica;
/** TODO: it will be deleted! (It is only to support old version of CH server).
@ -2309,6 +2310,17 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
LOG_WARNING(log, "Source replica does not have part {}. Removing it from working set.", part->name);
}
}
if (getSettings()->detach_old_local_parts_when_cloning_replica)
{
auto metadata_snapshot = getInMemoryMetadataPtr();
for (const auto & part : parts_to_remove_from_working_set)
{
LOG_INFO(log, "Detaching {}", part->relative_path);
part->makeCloneInDetached("clone", metadata_snapshot);
}
}
removePartsFromWorkingSet(parts_to_remove_from_working_set, true);
for (const String & name : active_parts)
@ -2336,47 +2348,119 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
void StorageReplicatedMergeTree::cloneReplicaIfNeeded(zkutil::ZooKeeperPtr zookeeper)
{
Coordination::Stat is_lost_stat;
bool is_new_replica = true;
String res;
if (zookeeper->tryGet(replica_path + "/is_lost", res))
if (zookeeper->tryGet(replica_path + "/is_lost", res, &is_lost_stat))
{
if (res == "0")
return;
if (is_lost_stat.version)
is_new_replica = false;
}
else
{
/// Replica was created by old version of CH, so me must create "/is_lost".
/// Note that in old version of CH there was no "lost" replicas possible.
/// TODO is_lost node should always exist since v18.12, maybe we can replace `tryGet` with `get` and remove old code?
zookeeper->create(replica_path + "/is_lost", "0", zkutil::CreateMode::Persistent);
return;
}
/// is_lost is "1": it means that we are in repair mode.
String source_replica;
Coordination::Stat source_is_lost_stat;
source_is_lost_stat.version = -1;
for (const String & source_replica_name : zookeeper->getChildren(zookeeper_path + "/replicas"))
/// Try choose source replica to clone.
/// Source replica must not be lost and should have minimal queue size and maximal log pointer.
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
std::vector<zkutil::ZooKeeper::FutureGet> futures;
for (const String & source_replica_name : replicas)
{
/// Do not clone from myself.
if (source_replica_name == replica_name)
continue;
String source_replica_path = zookeeper_path + "/replicas/" + source_replica_name;
/// Do not clone from myself.
if (source_replica_path != replica_path)
/// Obviously the following get operations are not atomic, but it's ok to choose good enough replica, not the best one.
/// NOTE: We may count some entries twice if log_pointer is moved.
futures.emplace_back(zookeeper->asyncTryGet(source_replica_path + "/is_lost"));
futures.emplace_back(zookeeper->asyncTryGet(source_replica_path + "/log_pointer"));
futures.emplace_back(zookeeper->asyncTryGet(source_replica_path + "/queue"));
}
/// Wait for results before getting log entries
for (auto & future : futures)
future.wait();
Strings log_entries = zookeeper->getChildren(zookeeper_path + "/log");
size_t max_log_entry = 0;
if (!log_entries.empty())
{
String last_entry = *std::max_element(log_entries.begin(), log_entries.end());
max_log_entry = parse<UInt64>(last_entry.substr(strlen("log-")));
}
/// log_pointer can point to future entry, which was not created yet
++max_log_entry;
size_t min_replication_lag = std::numeric_limits<size_t>::max();
String source_replica;
Coordination::Stat source_is_lost_stat;
size_t future_num = 0;
for (const String & source_replica_name : replicas)
{
if (source_replica_name == replica_name)
continue;
auto get_is_lost = futures[future_num++].get();
auto get_log_pointer = futures[future_num++].get();
auto get_queue = futures[future_num++].get();
if (get_is_lost.error != Coordination::Error::ZOK)
{
/// Do not clone from lost replicas.
String source_replica_is_lost_value;
if (!zookeeper->tryGet(source_replica_path + "/is_lost", source_replica_is_lost_value, &source_is_lost_stat)
|| source_replica_is_lost_value == "0")
{
source_replica = source_replica_name;
break;
}
LOG_INFO(log, "Not cloning {}, cannot get '/is_lost': {}", Coordination::errorMessage(get_is_lost.error));
continue;
}
else if (get_is_lost.data != "0")
{
LOG_INFO(log, "Not cloning {}, it's lost");
continue;
}
if (get_log_pointer.error != Coordination::Error::ZOK)
{
LOG_INFO(log, "Not cloning {}, cannot get '/log_pointer': {}", Coordination::errorMessage(get_log_pointer.error));
continue;
}
if (get_queue.error != Coordination::Error::ZOK)
{
LOG_INFO(log, "Not cloning {}, cannot get '/queue': {}", Coordination::errorMessage(get_queue.error));
continue;
}
/// Replica is not lost and we can clone it. Let's calculate approx replication lag.
size_t source_log_pointer = get_log_pointer.data.empty() ? 0 : parse<UInt64>(get_log_pointer.data);
assert(source_log_pointer <= max_log_entry);
size_t replica_queue_lag = max_log_entry - source_log_pointer;
size_t replica_queue_size = get_queue.stat.numChildren;
size_t replication_lag = replica_queue_lag + replica_queue_size;
LOG_INFO(log, "Replica {} has log pointer '{}', approximate {} queue lag and {} queue size",
source_replica_name, get_log_pointer.data, replica_queue_lag, replica_queue_size);
if (replication_lag < min_replication_lag)
{
source_replica = source_replica_name;
source_is_lost_stat = get_is_lost.stat;
min_replication_lag = replication_lag;
}
}
if (source_replica.empty())
throw Exception("All replicas are lost", ErrorCodes::ALL_REPLICAS_LOST);
if (is_new_replica)
LOG_INFO(log, "Will mimic {}", source_replica);
else
LOG_WARNING(log, "Will mimic {}", source_replica);
/// Clear obsolete queue that we no longer need.
zookeeper->removeChildren(replica_path + "/queue");
@ -4053,6 +4137,7 @@ Pipe StorageReplicatedMergeTree::alterPartition(
/// If new version returns ordinary name, else returns part name containing the first and last month of the month
/// NOTE: use it in pair with getFakePartCoveringAllPartsInPartition(...)
static String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info)
{
if (format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
@ -4068,7 +4153,7 @@ static String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version,
return part_info.getPartName();
}
bool StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition(const String & partition_id, MergeTreePartInfo & part_info)
bool StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition(const String & partition_id, MergeTreePartInfo & part_info, bool for_replace_partition)
{
/// Even if there is no data in the partition, you still need to mark the range for deletion.
/// - Because before executing DETACH, tasks for downloading parts to this partition can be executed.
@ -4091,14 +4176,21 @@ bool StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition(const St
mutation_version = queue.getCurrentMutationVersion(partition_id, right);
}
/// Empty partition.
if (right == 0)
return false;
/// REPLACE PARTITION uses different max level and does not decrement max_block of DROP_RANGE for unknown (probably historical) reason.
auto max_level = std::numeric_limits<decltype(part_info.level)>::max();
if (!for_replace_partition)
{
max_level = MergeTreePartInfo::MAX_LEVEL;
--right;
/// Empty partition.
if (right == 0)
return false;
--right;
}
/// Artificial high level is chosen, to make this part "covering" all parts inside.
part_info = MergeTreePartInfo(partition_id, left, right, MergeTreePartInfo::MAX_LEVEL, mutation_version);
part_info = MergeTreePartInfo(partition_id, left, right, max_level, mutation_version);
return true;
}
@ -5305,11 +5397,11 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
/// Firstly, generate last block number and compute drop_range
/// NOTE: Even if we make ATTACH PARTITION instead of REPLACE PARTITION drop_range will not be empty, it will contain a block.
/// So, such case has special meaning, if drop_range contains only one block it means that nothing to drop.
/// TODO why not to add normal DROP_RANGE entry to replication queue if `replace` is true?
MergeTreePartInfo drop_range;
drop_range.partition_id = partition_id;
drop_range.max_block = allocateBlockNumber(partition_id, zookeeper)->getNumber();
drop_range.min_block = replace ? 0 : drop_range.max_block;
drop_range.level = std::numeric_limits<decltype(drop_range.level)>::max();
getFakePartCoveringAllPartsInPartition(partition_id, drop_range, true);
if (!replace)
drop_range.min_block = drop_range.max_block;
String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range);
@ -5388,6 +5480,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
}
/// We are almost ready to commit changes, remove fetches and merges from drop range
/// FIXME it's unsafe to remove queue entries before we actually commit REPLACE_RANGE to replication log
queue.removePartProducingOpsInRange(zookeeper, drop_range, entry);
/// Remove deduplication block_ids of replacing parts
@ -5502,11 +5595,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
/// A range for log entry to remove parts from the source table (myself).
MergeTreePartInfo drop_range;
drop_range.partition_id = partition_id;
drop_range.max_block = allocateBlockNumber(partition_id, zookeeper)->getNumber();
drop_range.min_block = 0;
drop_range.level = std::numeric_limits<decltype(drop_range.level)>::max();
getFakePartCoveringAllPartsInPartition(partition_id, drop_range, true);
String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range);
if (drop_range.getBlocksCount() > 1)
@ -5561,6 +5650,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
drop_range_dest.max_block = drop_range.max_block;
drop_range_dest.min_block = drop_range.max_block;
drop_range_dest.level = drop_range.level;
drop_range_dest.mutation = drop_range.mutation;
entry.type = ReplicatedMergeTreeLogEntryData::REPLACE_RANGE;
entry.source_replica = dest_table_storage->replica_name;

View File

@ -526,7 +526,7 @@ private:
/// Produce an imaginary part info covering all parts in the specified partition (at the call moment).
/// Returns false if the partition doesn't exist yet.
bool getFakePartCoveringAllPartsInPartition(const String & partition_id, MergeTreePartInfo & part_info);
bool getFakePartCoveringAllPartsInPartition(const String & partition_id, MergeTreePartInfo & part_info, bool for_replace_partition = false);
/// Check for a node in ZK. If it is, remember this information, and then immediately answer true.
std::unordered_set<std::string> existing_nodes_cache;

View File

@ -32,8 +32,13 @@ Pipe StorageValues::read(
{
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
Chunk chunk(res_block.getColumns(), res_block.rows());
return Pipe(std::make_shared<SourceFromSingleChunk>(res_block.cloneEmpty(), std::move(chunk)));
/// Get only required columns.
Block block;
for (const auto & name : column_names)
block.insert(res_block.getByName(name));
Chunk chunk(block.getColumns(), block.rows());
return Pipe(std::make_shared<SourceFromSingleChunk>(block.cloneEmpty(), std::move(chunk)));
}
}

View File

@ -102,7 +102,7 @@
"with_coverage": false
},
{
"compiler": "clang-10",
"compiler": "clang-11",
"build-type": "",
"sanitizer": "",
"package-type": "binary",
@ -134,7 +134,7 @@
"with_coverage": false
},
{
"compiler": "clang-10-darwin",
"compiler": "clang-11-darwin",
"build-type": "",
"sanitizer": "",
"package-type": "binary",
@ -144,7 +144,7 @@
"with_coverage": false
},
{
"compiler": "clang-10-aarch64",
"compiler": "clang-11-aarch64",
"build-type": "",
"sanitizer": "",
"package-type": "binary",
@ -154,7 +154,7 @@
"with_coverage": false
},
{
"compiler": "clang-10-freebsd",
"compiler": "clang-11-freebsd",
"build-type": "",
"sanitizer": "",
"package-type": "binary",

View File

@ -512,6 +512,15 @@ def collect_build_flags(client):
else:
raise Exception("Cannot get inforamtion about build from server errorcode {}, stderr {}".format(clickhouse_proc.returncode, stderr))
clickhouse_proc = Popen(shlex.split(client), stdin=PIPE, stdout=PIPE, stderr=PIPE)
(stdout, stderr) = clickhouse_proc.communicate(b"SELECT value FROM system.merge_tree_settings WHERE name = 'min_bytes_for_wide_part'")
if clickhouse_proc.returncode == 0:
if stdout == b'0\n':
result.append(BuildFlags.POLYMORPHIC_PARTS)
else:
raise Exception("Cannot get inforamtion about build from server errorcode {}, stderr {}".format(clickhouse_proc.returncode, stderr))
return result

View File

@ -1,5 +1,5 @@
<yandex>
<merge_tree>
<min_bytes_for_wide_part>10485760</min_bytes_for_wide_part>
<min_bytes_for_wide_part>0</min_bytes_for_wide_part>
</merge_tree>
</yandex>

View File

@ -0,0 +1,12 @@
<?xml version="1.0" encoding="utf-8"?>
<yandex>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/log.log</log>
<errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
</logger>
</yandex>

View File

@ -42,7 +42,8 @@ def setup_module(module):
dictionaries = []
main_configs = []
main_configs.append(os.path.join('configs', 'disable_ssl_verification.xml'))
main_configs.append(os.path.join('configs', 'log_conf.xml'))
for fname in os.listdir(DICT_CONFIG_PATH):
dictionaries.append(os.path.join(DICT_CONFIG_PATH, fname))
@ -69,14 +70,20 @@ def started_cluster():
finally:
cluster.shutdown()
# We have a lot of race conditions in cassandra library
# https://github.com/ClickHouse/ClickHouse/issues/15754.
# TODO fix them and enable tests as soon as possible.
@pytest.mark.parametrize("layout_name", LAYOUTS_SIMPLE)
def test_simple(started_cluster, layout_name):
simple_tester.execute(layout_name, node)
if not node.is_built_with_thread_sanitizer():
simple_tester.execute(layout_name, node)
@pytest.mark.parametrize("layout_name", LAYOUTS_COMPLEX)
def test_complex(started_cluster, layout_name):
complex_tester.execute(layout_name, node)
if not node.is_built_with_thread_sanitizer():
complex_tester.execute(layout_name, node)
@pytest.mark.parametrize("layout_name", LAYOUTS_RANGED)
def test_ranged(started_cluster, layout_name):
ranged_tester.execute(layout_name, node)
if not node.is_built_with_thread_sanitizer():
ranged_tester.execute(layout_name, node)

View File

@ -42,6 +42,7 @@ def setup_module(module):
dictionaries = []
main_configs = []
main_configs.append(os.path.join('configs', 'disable_ssl_verification.xml'))
main_configs.append(os.path.join('configs', 'log_conf.xml'))
for fname in os.listdir(DICT_CONFIG_PATH):
dictionaries.append(os.path.join(DICT_CONFIG_PATH, fname))

View File

@ -0,0 +1,12 @@
<?xml version="1.0" encoding="utf-8"?>
<yandex>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/log.log</log>
<errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
</logger>
</yandex>

View File

@ -5,7 +5,7 @@ from helpers.cluster import ClickHouseCluster
CONFIG_FILES = ['configs/dictionaries/mysql_dict1.xml', 'configs/dictionaries/mysql_dict2.xml',
'configs/remote_servers.xml']
CONFIG_FILES += ['configs/enable_dictionaries.xml']
CONFIG_FILES += ['configs/enable_dictionaries.xml', 'configs/log_conf.xml']
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance', main_configs=CONFIG_FILES, with_mysql=True)

View File

@ -0,0 +1,11 @@
<yandex>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/log.log</log>
<errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
</logger>
</yandex>

View File

@ -3,7 +3,7 @@ from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance')
instance = cluster.add_instance('instance', main_configs=['configs/log_conf.xml'])
@pytest.fixture(scope="module", autouse=True)

View File

@ -0,0 +1,12 @@
<yandex>
<shutdown_wait_unfinished>3</shutdown_wait_unfinished>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/log.log</log>
<errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
</logger>
</yandex>

View File

@ -0,0 +1,26 @@
<yandex>
<storage_configuration>
<disks>
<s3>
<type>s3</type>
<!-- Use custom S3 endpoint -->
<endpoint>http://resolver:8080/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<!-- ClickHouse starts earlier than custom S3 endpoint. Skip access check to avoid fail on start-up -->
<skip_access_check>true</skip_access_check>
<!-- Avoid extra retries to speed up tests -->
<retry_attempts>0</retry_attempts>
</s3>
</disks>
<policies>
<s3>
<volumes>
<main>
<disk>s3</disk>
</main>
</volumes>
</s3>
</policies>
</storage_configuration>
</yandex>

View File

@ -0,0 +1,5 @@
<yandex>
<profiles>
<default/>
</profiles>
</yandex>

View File

@ -0,0 +1,20 @@
<?xml version="1.0"?>
<yandex>
<tcp_port>9000</tcp_port>
<listen_host>127.0.0.1</listen_host>
<openSSL>
<client>
<cacheSessions>true</cacheSessions>
<verificationMode>none</verificationMode>
<invalidCertificateHandler>
<name>AcceptCertificateHandler</name>
</invalidCertificateHandler>
</client>
</openSSL>
<max_concurrent_queries>500</max_concurrent_queries>
<mark_cache_size>5368709120</mark_cache_size>
<path>./clickhouse/</path>
<users_config>users.xml</users_config>
</yandex>

View File

@ -0,0 +1,49 @@
from bottle import request, route, run, response
# Endpoint can be configured to throw 500 error on N-th request attempt.
# In usual situation just redirects to original Minio server.
# Dict to the number of request should be failed.
cache = {}
@route('/fail_request/<_request_number>')
def fail_request(_request_number):
request_number = int(_request_number)
if request_number > 0:
cache['request_number'] = request_number
else:
cache.pop('request_number', None)
return 'OK'
# Handle for MultipleObjectsDelete.
@route('/<_bucket>', ['POST'])
def delete(_bucket):
response.set_header("Location", "http://minio1:9001/" + _bucket + "?" + request.query_string)
response.status = 307
return 'Redirected'
@route('/<_bucket>/<_path:path>', ['GET', 'POST', 'PUT', 'DELETE'])
def server(_bucket, _path):
if cache.get('request_number', None):
request_number = cache.pop('request_number') - 1
if request_number > 0:
cache['request_number'] = request_number
else:
response.status = 500
return 'Expected Error'
response.set_header("Location", "http://minio1:9001/" + _bucket + '/' + _path)
response.status = 307
return 'Redirected'
@route('/')
def ping():
return 'OK'
run(host='0.0.0.0', port=8080)

View File

@ -0,0 +1,117 @@
import logging
import os
import time
import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())
# Runs custom python-based S3 endpoint.
def run_endpoint(cluster):
logging.info("Starting custom S3 endpoint")
container_id = cluster.get_container_id('resolver')
current_dir = os.path.dirname(__file__)
cluster.copy_file_to_container(container_id, os.path.join(current_dir, "s3_endpoint", "endpoint.py"), "endpoint.py")
cluster.exec_in_container(container_id, ["python", "endpoint.py"], detach=True)
# Wait for S3 endpoint start
for attempt in range(10):
ping_response = cluster.exec_in_container(cluster.get_container_id('resolver'),
["curl", "-s", "http://resolver:8080/"], nothrow=True)
if ping_response != 'OK':
if attempt == 9:
assert ping_response == 'OK', 'Expected "OK", but got "{}"'.format(ping_response)
else:
time.sleep(1)
else:
break
logging.info("S3 endpoint started")
def fail_request(cluster, request):
response = cluster.exec_in_container(cluster.get_container_id('resolver'),
["curl", "-s", "http://resolver:8080/fail_request/{}".format(request)])
assert response == 'OK', 'Expected "OK", but got "{}"'.format(response)
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance("node",
main_configs=["configs/config.d/log_conf.xml", "configs/config.d/storage_conf.xml"],
with_minio=True)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
run_endpoint(cluster)
yield cluster
finally:
cluster.shutdown()
@pytest.fixture(autouse=True)
def drop_table(cluster):
yield
node = cluster.instances["node"]
node.query("DROP TABLE IF EXISTS s3_failover_test NO DELAY")
# S3 request will be failed for an appropriate part file write.
FILES_PER_PART_BASE = 5 # partition.dat, default_compression_codec.txt, count.txt, columns.txt, checksums.txt
FILES_PER_PART_WIDE = FILES_PER_PART_BASE + 1 + 1 + 3 * 2 # Primary index, MinMax, Mark and data file for column(s)
FILES_PER_PART_COMPACT = FILES_PER_PART_BASE + 1 + 1 + 2
@pytest.mark.parametrize(
"min_bytes_for_wide_part,request_count",
[
(0, FILES_PER_PART_WIDE),
(1024 * 1024, FILES_PER_PART_COMPACT)
]
)
def test_write_failover(cluster, min_bytes_for_wide_part, request_count):
node = cluster.instances["node"]
node.query(
"""
CREATE TABLE s3_failover_test (
dt Date,
id Int64,
data String
) ENGINE=MergeTree()
ORDER BY id
PARTITION BY dt
SETTINGS storage_policy='s3', min_bytes_for_wide_part={}
"""
.format(min_bytes_for_wide_part)
)
for request in range(request_count + 1):
# Fail N-th request to S3.
fail_request(cluster, request + 1)
data = "('2020-03-01',0,'data'),('2020-03-01',1,'data')"
positive = request == request_count
try:
node.query("INSERT INTO s3_failover_test VALUES {}".format(data))
assert positive, "Insert query should be failed, request {}".format(request)
except QueryRuntimeException as e:
assert not positive, "Insert query shouldn't be failed, request {}".format(request)
assert str(e).find("Expected Error") != -1, "Unexpected error {}".format(str(e))
if positive:
# Disable request failing.
fail_request(cluster, 0)
assert node.query("CHECK TABLE s3_failover_test") == '1\n'
assert node.query("SELECT * FROM s3_failover_test FORMAT Values") == data

View File

@ -1,19 +0,0 @@
<yandex>
<remote_servers>
<test_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<default_database>shard_0</default_database>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<default_database>shard_0</default_database>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</yandex>

View File

@ -9,16 +9,15 @@ def fill_nodes(nodes, shard):
for node in nodes:
node.query(
'''
CREATE DATABASE test;
CREATE TABLE test_table(date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test{shard}/replicated', '{replica}') ORDER BY id PARTITION BY toYYYYMM(date) SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0;
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/replicated', '{replica}') ORDER BY id PARTITION BY toYYYYMM(date) SETTINGS min_replicated_logs_to_keep=3, max_replicated_logs_to_keep=5, cleanup_delay_period=0, cleanup_delay_period_random_add=0;
'''.format(shard=shard, replica=node.name))
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
node1 = cluster.add_instance('node1', with_zookeeper=True)
node2 = cluster.add_instance('node2', with_zookeeper=True)
node3 = cluster.add_instance('node3', with_zookeeper=True)
@pytest.fixture(scope="module")
@ -26,7 +25,7 @@ def start_cluster():
try:
cluster.start()
fill_nodes([node1, node2], 1)
fill_nodes([node1, node2, node3], 1)
yield cluster
@ -49,3 +48,29 @@ def test_recovery(start_cluster):
check_callback=lambda x: len(node2.query("select * from test_table")) > 0)
assert_eq_with_retry(node2, "SELECT count(*) FROM test_table", node1.query("SELECT count(*) FROM test_table"))
lost_marker = "Will mark replica node2 as lost"
assert node1.contains_in_log(lost_marker) or node3.contains_in_log(lost_marker)
def test_choose_source_replica(start_cluster):
node3.query("INSERT INTO test_table VALUES (2, 1)")
time.sleep(1)
node2.query("DETACH TABLE test_table")
node1.query("SYSTEM STOP FETCHES test_table") # node1 will have many entries in queue, so node2 will clone node3
for i in range(100):
node3.query("INSERT INTO test_table VALUES (2, {})".format(i))
node2.query_with_retry("ATTACH TABLE test_table",
check_callback=lambda x: len(node2.query("select * from test_table")) > 0)
node1.query("SYSTEM START FETCHES test_table")
node1.query("SYSTEM SYNC REPLICA test_table")
node2.query("SYSTEM SYNC REPLICA test_table")
assert node1.query("SELECT count(*) FROM test_table") == node3.query("SELECT count(*) FROM test_table")
assert node2.query("SELECT count(*) FROM test_table") == node3.query("SELECT count(*) FROM test_table")
lost_marker = "Will mark replica node2 as lost"
assert node1.contains_in_log(lost_marker) or node3.contains_in_log(lost_marker)
assert node2.contains_in_log("Will mimic node3")

View File

@ -6,7 +6,7 @@ from helpers.hdfs_api import HDFSApi
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', with_hdfs=True, user_configs=[], main_configs=['configs/log_conf.xml'])
node1 = cluster.add_instance('node1', with_hdfs=True, main_configs=['configs/log_conf.xml'])
@pytest.fixture(scope="module")

View File

@ -4,4 +4,4 @@ set -e
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
env CLICKHOUSE_CLIENT_OPT="--merge_tree_uniform_read_distribution=0" bash "$CURDIR"/00443_optimize_final_vertical_merge.sh
bash "$CURDIR"/00443_optimize_final_vertical_merge.sh

View File

@ -41,10 +41,10 @@ popd > /dev/null
#SCRIPTDIR=`dirname "$SCRIPTPATH"`
SCRIPTDIR=$SCRIPTPATH
cat "$SCRIPTDIR"/00282_merging.sql | $CLICKHOUSE_CLIENT --preferred_block_size_bytes=10 --merge_tree_uniform_read_distribution=1 -n > "${CLICKHOUSE_TMP}"/preferred_block_size_bytes.stdout 2>&1
cat "$SCRIPTDIR"/00282_merging.sql | $CLICKHOUSE_CLIENT --preferred_block_size_bytes=10 -n > "${CLICKHOUSE_TMP}"/preferred_block_size_bytes.stdout 2>&1
cmp "$SCRIPTDIR"/00282_merging.reference "${CLICKHOUSE_TMP}"/preferred_block_size_bytes.stdout && echo PASSED || echo FAILED
cat "$SCRIPTDIR"/00282_merging.sql | $CLICKHOUSE_CLIENT --preferred_block_size_bytes=20 --merge_tree_uniform_read_distribution=0 -n > "${CLICKHOUSE_TMP}"/preferred_block_size_bytes.stdout 2>&1
cat "$SCRIPTDIR"/00282_merging.sql | $CLICKHOUSE_CLIENT --preferred_block_size_bytes=20 -n > "${CLICKHOUSE_TMP}"/preferred_block_size_bytes.stdout 2>&1
cmp "$SCRIPTDIR"/00282_merging.reference "${CLICKHOUSE_TMP}"/preferred_block_size_bytes.stdout && echo PASSED || echo FAILED
rm "${CLICKHOUSE_TMP}"/preferred_block_size_bytes.stdout

View File

@ -5,18 +5,12 @@ INSERT INTO merge_tree SELECT 0 FROM numbers(1000000);
SET max_threads = 4;
SET max_rows_to_read = 1100000;
SET merge_tree_uniform_read_distribution = 1;
SELECT count() FROM merge_tree;
SET merge_tree_uniform_read_distribution = 0;
SELECT count() FROM merge_tree;
SET max_rows_to_read = 900000;
SET merge_tree_uniform_read_distribution = 1;
SELECT count() FROM merge_tree WHERE not ignore(); -- { serverError 158 }
SET merge_tree_uniform_read_distribution = 0;
SELECT count() FROM merge_tree WHERE not ignore(); -- { serverError 158 }
DROP TABLE merge_tree;

View File

@ -1,6 +1,3 @@
SET allow_experimental_data_skipping_indices = 1;
DROP TABLE IF EXISTS bloom_filter_null_array;
CREATE TABLE bloom_filter_null_array (v Array(LowCardinality(Nullable(String))), INDEX idx v TYPE bloom_filter(0.1) GRANULARITY 1) ENGINE = MergeTree() ORDER BY v;

View File

@ -1,5 +1,3 @@
SET allow_experimental_data_skipping_indices = 1;
DROP TABLE IF EXISTS test;
CREATE TABLE test (`int8` Int8, `int16` Int16, `int32` Int32, `int64` Int64, INDEX idx (`int8`, `int16`, `int32`, `int64`) TYPE bloom_filter(0.01) GRANULARITY 8192 ) ENGINE = MergeTree() ORDER BY `int8`;

View File

@ -1,5 +1,4 @@
CREATE TABLE old_syntax_01071_test (date Date, id UInt8) ENGINE = MergeTree(date, id, 8192);
SET allow_experimental_data_skipping_indices=1;
ALTER TABLE old_syntax_01071_test ADD INDEX id_minmax id TYPE minmax GRANULARITY 1; -- { serverError 36 }
CREATE TABLE new_syntax_01071_test (date Date, id UInt8) ENGINE = MergeTree() ORDER BY id;
ALTER TABLE new_syntax_01071_test ADD INDEX id_minmax id TYPE minmax GRANULARITY 1;

View File

@ -12,7 +12,7 @@ CREATE TABLE foo (
INSERT INTO foo VALUES (1, 0.5, 0.2, 0.3, 0.8);
SELECT divide(sum(a) + sum(b), nullIf(sum(c) + sum(d), 0)) FROM foo SETTINGS compile_expressions = 1, min_count_to_compile = 0;
SELECT divide(sum(a) + sum(b), nullIf(sum(c) + sum(d), 0)) FROM foo SETTINGS compile_expressions = 1, min_count_to_compile = 0;
SELECT divide(sum(a) + sum(b), nullIf(sum(c) + sum(d), 0)) FROM foo SETTINGS compile_expressions = 1;
SELECT divide(sum(a) + sum(b), nullIf(sum(c) + sum(d), 0)) FROM foo SETTINGS compile_expressions = 1;
DROP TABLE foo;

View File

@ -0,0 +1,14 @@
1 1
2 2
3 3
4 4
mt 0 0_1_1_0 2
rmt 0 0_0_0_0 2
1 1
2 2
mt 0 0_1_1_0 2
rmt 0 0_3_3_0 2
0000000000 UPDATE s = concat(\'s\', toString(n)) WHERE 1 [] 0 1
0000000001 DROP COLUMN s [] 0 1
3
4

View File

@ -0,0 +1,39 @@
drop table if exists mt;
drop table if exists rmt sync;
create table mt (n UInt64, s String) engine = MergeTree partition by intDiv(n, 10) order by n;
insert into mt values (3, '3'), (4, '4');
create table rmt (n UInt64, s String) engine = ReplicatedMergeTree('/clickhouse/test_01149/rmt', 'r1') partition by intDiv(n, 10) order by n;
insert into rmt values (1,'1'), (2, '2');
select * from rmt;
select * from mt;
select table, partition_id, name, rows from system.parts where database=currentDatabase() and table in ('mt', 'rmt') and active=1 order by table, name;
alter table rmt update s = 's'||toString(n) where 1;
select * from rmt;
alter table rmt replace partition '0' from mt;
select table, partition_id, name, rows from system.parts where database=currentDatabase() and table in ('mt', 'rmt') and active=1 order by table, name;
alter table rmt drop column s;
select mutation_id, command, parts_to_do_names, parts_to_do, is_done from system.mutations where database=currentDatabase() and table='rmt';
select * from rmt;
drop table rmt sync;
set replication_alter_partitions_sync=0;
create table rmt (n UInt64, s String) engine = ReplicatedMergeTree('/clickhouse/test_01149/rmt', 'r1') partition by intDiv(n, 10) order by n;
insert into rmt values (1,'1'), (2, '2');
alter table rmt update s = 's'||toString(n) where 1;
alter table rmt drop partition '0';
set replication_alter_partitions_sync=1;
alter table rmt drop column s;
drop table mt;
drop table rmt sync;

View File

@ -6,11 +6,11 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
url="http://${CLICKHOUSE_HOST}:${CLICKHOUSE_PORT_HTTP}/?session_id=test_01194"
rnd=$RANDOM
${CLICKHOUSE_CURL} -sS "$url&query=SELECT+$rnd,1" > /dev/null
${CLICKHOUSE_CURL} -sS "$url&query=SELECT+$rnd,2" > /dev/null
${CLICKHOUSE_CURL} -sS "$url" --data "SELECT $rnd,3" > /dev/null
${CLICKHOUSE_CURL} -sS "$url" --data "SELECT $rnd,4" > /dev/null
${CLICKHOUSE_CURL} -sS "$url&query=SELECT+'test_01194',$rnd,1" > /dev/null
${CLICKHOUSE_CURL} -sS "$url&query=SELECT+'test_01194',$rnd,2" > /dev/null
${CLICKHOUSE_CURL} -sS "$url" --data "SELECT 'test_01194',$rnd,3" > /dev/null
${CLICKHOUSE_CURL} -sS "$url" --data "SELECT 'test_01194',$rnd,4" > /dev/null
${CLICKHOUSE_CURL} -sS "$url" --data "SYSTEM FLUSH LOGS"
${CLICKHOUSE_CURL} -sS "$url&query=SELECT+count(DISTINCT+query_id)+FROM+system.query_log+WHERE+query+LIKE+'SELECT+$rnd%25'"
${CLICKHOUSE_CURL} -sS "$url&query=SELECT+count(DISTINCT+query_id)+FROM+system.query_log+WHERE+query+LIKE+'SELECT+''test_01194'',$rnd%25'"

View File

@ -42,3 +42,14 @@
1 1 Int128 Int256
1 1 Int128 Int256
-1 -1 Int128 Int256
1 1 Float64 Float64
1 1 Float64 Float64
1 1 Float64 Float64
1 1 Float64 Float64
-1 -1 Float64 Float64
-1 -1 Float64 Float64
-1 -1 Float64 Float64
-1 -1 Float64 Float64
1 1 Float64 Float64
1 1 Float64 Float64
-1 -1 Float64 Float64

View File

@ -58,16 +58,16 @@ select intDiv(toInt128(-1), toInt256(-1)) x, intDiv(toInt256(-1), toInt256(-1))
select intDiv(toInt128(-1), toUInt256(1)) x, intDiv(toInt256(-1), toUInt256(1)) y, toTypeName(x), toTypeName(y);
-- select (toInt128(-1) / toInt8(1)) x, (toInt256(-1) / toInt8(1)) y, toTypeName(x), toTypeName(y);
-- select (toInt128(-1) / toInt16(1)) x, (toInt256(-1) / toInt16(1)) y, toTypeName(x), toTypeName(y);
-- select (toInt128(-1) / toInt32(1)) x, (toInt256(-1) / toInt32(1)) y, toTypeName(x), toTypeName(y);
-- select (toInt128(-1) / toInt64(1)) x, (toInt256(-1) / toInt64(1)) y, toTypeName(x), toTypeName(y);
-- select (toInt128(-1) / toUInt8(1)) x, (toInt256(-1) / toUInt8(1)) y, toTypeName(x), toTypeName(y);
-- select (toInt128(-1) / toUInt16(1)) x, (toInt256(-1) / toUInt16(1)) y, toTypeName(x), toTypeName(y);
-- select (toInt128(-1) / toUInt32(1)) x, (toInt256(-1) / toUInt32(1)) y, toTypeName(x), toTypeName(y);
-- select (toInt128(-1) / toUInt64(1)) x, (toInt256(-1) / toUInt64(1)) y, toTypeName(x), toTypeName(y);
select (toInt128(-1) / toInt8(-1)) x, (toInt256(-1) / toInt8(-1)) y, toTypeName(x), toTypeName(y);
select (toInt128(-1) / toInt16(-1)) x, (toInt256(-1) / toInt16(-1)) y, toTypeName(x), toTypeName(y);
select (toInt128(-1) / toInt32(-1)) x, (toInt256(-1) / toInt32(-1)) y, toTypeName(x), toTypeName(y);
select (toInt128(-1) / toInt64(-1)) x, (toInt256(-1) / toInt64(-1)) y, toTypeName(x), toTypeName(y);
select (toInt128(-1) / toUInt8(1)) x, (toInt256(-1) / toUInt8(1)) y, toTypeName(x), toTypeName(y);
select (toInt128(-1) / toUInt16(1)) x, (toInt256(-1) / toUInt16(1)) y, toTypeName(x), toTypeName(y);
select (toInt128(-1) / toUInt32(1)) x, (toInt256(-1) / toUInt32(1)) y, toTypeName(x), toTypeName(y);
select (toInt128(-1) / toUInt64(1)) x, (toInt256(-1) / toUInt64(1)) y, toTypeName(x), toTypeName(y);
-- select (toInt128(-1) * toInt128(1)) x, (toInt256(-1) * toInt128(1)) y, toTypeName(x), toTypeName(y);
-- select (toInt128(-1) * toInt256(1)) x, (toInt256(-1) * toInt256(1)) y, toTypeName(x), toTypeName(y);
-- --select (toInt128(-1) * toUInt128(1)) x, (toInt256(-1) * toUInt128(1)) y, toTypeName(x), toTypeName(y);
-- select (toInt128(-1) * toUInt256(1)) x, (toInt256(-1) * toUInt256(1)) y, toTypeName(x), toTypeName(y);
select (toInt128(-1) / toInt128(-1)) x, (toInt256(-1) / toInt128(-1)) y, toTypeName(x), toTypeName(y);
select (toInt128(-1) / toInt256(-1)) x, (toInt256(-1) / toInt256(-1)) y, toTypeName(x), toTypeName(y);
--select (toInt128(-1) / toUInt128(1)) x, (toInt256(-1) / toUInt128(1)) y, toTypeName(x), toTypeName(y);
select (toInt128(-1) / toUInt256(1)) x, (toInt256(-1) / toUInt256(1)) y, toTypeName(x), toTypeName(y);

View File

@ -67,3 +67,13 @@ SELECT
sum(j),
sum(k)
FROM columns_transformers
100 10 100 10 324 10
120 8 120 8 23 8
SELECT
i,
j,
toFloat64(i),
toFloat64(j),
toFloat64(k),
j
FROM columns_transformers

View File

@ -37,4 +37,8 @@ EXPLAIN SYNTAX SELECT * REPLACE(i + 1 AS i) REPLACE(i + 1 AS i) from columns_tra
SELECT COLUMNS(i, j, k) APPLY(sum) from columns_transformers;
EXPLAIN SYNTAX SELECT COLUMNS(i, j, k) APPLY(sum) from columns_transformers;
-- Multiple column matchers and transformers
SELECT i, j, COLUMNS(i, j, k) APPLY(toFloat64), COLUMNS(i, j) EXCEPT (i) from columns_transformers;
EXPLAIN SYNTAX SELECT i, j, COLUMNS(i, j, k) APPLY(toFloat64), COLUMNS(i, j) EXCEPT (i) from columns_transformers;
DROP TABLE columns_transformers;

Some files were not shown because too many files have changed in this diff Show More