mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-28 18:42:26 +00:00
fix
This commit is contained in:
commit
3690294ea4
2
.gitmodules
vendored
2
.gitmodules
vendored
@ -158,7 +158,7 @@
|
||||
url = https://github.com/openldap/openldap.git
|
||||
[submodule "contrib/AMQP-CPP"]
|
||||
path = contrib/AMQP-CPP
|
||||
url = https://github.com/CopernicaMarketingSoftware/AMQP-CPP.git
|
||||
url = https://github.com/ClickHouse-Extras/AMQP-CPP.git
|
||||
[submodule "contrib/cassandra"]
|
||||
path = contrib/cassandra
|
||||
url = https://github.com/ClickHouse-Extras/cpp-driver.git
|
||||
|
@ -300,6 +300,11 @@ if (COMPILER_CLANG)
|
||||
option(ENABLE_THINLTO "Clang-specific link time optimization" ON)
|
||||
endif()
|
||||
|
||||
# Set new experimental pass manager, it's a performance, build time and binary size win.
|
||||
# Can be removed after https://reviews.llvm.org/D66490 merged and released to at least two versions of clang.
|
||||
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fexperimental-new-pass-manager")
|
||||
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fexperimental-new-pass-manager")
|
||||
|
||||
# We cannot afford to use LTO when compiling unit tests, and it's not enough
|
||||
# to only supply -fno-lto at the final linking stage. So we disable it
|
||||
# completely.
|
||||
|
2
contrib/AMQP-CPP
vendored
2
contrib/AMQP-CPP
vendored
@ -1 +1 @@
|
||||
Subproject commit 1c08399ab0ab9e4042ef8e2bbe9e208e5dcbc13b
|
||||
Subproject commit d63e1f016582e9faaaf279aa24513087a07bc6e7
|
@ -16,6 +16,7 @@ set (SRCS
|
||||
${LIBRARY_DIR}/src/flags.cpp
|
||||
${LIBRARY_DIR}/src/linux_tcp/openssl.cpp
|
||||
${LIBRARY_DIR}/src/linux_tcp/tcpconnection.cpp
|
||||
${LIBRARY_DIR}/src/inbuffer.cpp
|
||||
${LIBRARY_DIR}/src/receivedframe.cpp
|
||||
${LIBRARY_DIR}/src/table.cpp
|
||||
${LIBRARY_DIR}/src/watchable.cpp
|
||||
|
2
contrib/libhdfs3
vendored
2
contrib/libhdfs3
vendored
@ -1 +1 @@
|
||||
Subproject commit 1b666578c85094306b061352078022f6350bfab8
|
||||
Subproject commit 24b058c356794ef6cc2d31323dc9adf0386652ff
|
@ -11,7 +11,7 @@ RUN apt-get update \
|
||||
&& echo "${LLVM_PUBKEY_HASH} /tmp/llvm-snapshot.gpg.key" | sha384sum -c \
|
||||
&& apt-key add /tmp/llvm-snapshot.gpg.key \
|
||||
&& export CODENAME="$(lsb_release --codename --short | tr 'A-Z' 'a-z')" \
|
||||
&& echo "deb [trusted=yes] http://apt.llvm.org/${CODENAME}/ llvm-toolchain-${CODENAME}-${LLVM_VERSION} main" >> \
|
||||
&& echo "deb [trusted=yes] https://apt.llvm.org/${CODENAME}/ llvm-toolchain-${CODENAME}-${LLVM_VERSION} main" >> \
|
||||
/etc/apt/sources.list
|
||||
|
||||
# initial packages
|
||||
|
@ -70,6 +70,35 @@ Works with tables in the MergeTree family.
|
||||
|
||||
If `force_primary_key=1`, ClickHouse checks to see if the query has a primary key condition that can be used for restricting data ranges. If there is no suitable condition, it throws an exception. However, it does not check whether the condition reduces the amount of data to read. For more information about data ranges in MergeTree tables, see [MergeTree](../../engines/table-engines/mergetree-family/mergetree.md).
|
||||
|
||||
## force\_data\_skipping\_indices {#settings-force_data_skipping_indices}
|
||||
|
||||
Disables query execution if passed data skipping indices wasn't used.
|
||||
|
||||
Consider the following example:
|
||||
|
||||
```sql
|
||||
CREATE TABLE data
|
||||
(
|
||||
key Int,
|
||||
d1 Int,
|
||||
d1_null Nullable(Int),
|
||||
INDEX d1_idx d1 TYPE minmax GRANULARITY 1,
|
||||
INDEX d1_null_idx assumeNotNull(d1_null) TYPE minmax GRANULARITY 1
|
||||
)
|
||||
Engine=MergeTree()
|
||||
ORDER BY key;
|
||||
|
||||
SELECT * FROM data_01515;
|
||||
SELECT * FROM data_01515 SETTINGS force_data_skipping_indices=''; -- query will produce CANNOT_PARSE_TEXT error.
|
||||
SELECT * FROM data_01515 SETTINGS force_data_skipping_indices='d1_idx'; -- query will produce INDEX_NOT_USED error.
|
||||
SELECT * FROM data_01515 WHERE d1 = 0 SETTINGS force_data_skipping_indices='d1_idx'; -- Ok.
|
||||
SELECT * FROM data_01515 WHERE d1 = 0 SETTINGS force_data_skipping_indices='`d1_idx`'; -- Ok (example of full featured parser).
|
||||
SELECT * FROM data_01515 WHERE d1 = 0 SETTINGS force_data_skipping_indices='`d1_idx`, d1_null_idx'; -- query will produce INDEX_NOT_USED error, since d1_null_idx is not used.
|
||||
SELECT * FROM data_01515 WHERE d1 = 0 AND assumeNotNull(d1_null) = 0 SETTINGS force_data_skipping_indices='`d1_idx`, d1_null_idx'; -- Ok.
|
||||
```
|
||||
|
||||
Works with tables in the MergeTree family.
|
||||
|
||||
## format\_schema {#format-schema}
|
||||
|
||||
This parameter is useful when you are using formats that require a schema definition, such as [Cap’n Proto](https://capnproto.org/) or [Protobuf](https://developers.google.com/protocol-buffers/). The value depends on the format.
|
||||
|
@ -107,4 +107,8 @@ SELECT toDecimal32(1, 8) < 100
|
||||
DB::Exception: Can't compare.
|
||||
```
|
||||
|
||||
**See also**
|
||||
- [isDecimalOverflow](../../sql-reference/functions/other-functions.md#is-decimal-overflow)
|
||||
- [countDigits](../../sql-reference/functions/other-functions.md#count-digits)
|
||||
|
||||
[Original article](https://clickhouse.tech/docs/en/data_types/decimal/) <!--hide-->
|
||||
|
@ -1526,5 +1526,80 @@ SELECT getSetting('custom_a');
|
||||
|
||||
- [Custom Settings](../../operations/settings/index.md#custom_settings)
|
||||
|
||||
## isDecimalOverflow {#is-decimal-overflow}
|
||||
|
||||
Checks whether the [Decimal](../../sql-reference/data-types/decimal.md#decimalp-s-decimal32s-decimal64s-decimal128s) value is out of its (or specified) precision.
|
||||
|
||||
**Syntax**
|
||||
|
||||
``` sql
|
||||
isDecimalOverflow(d, [p])
|
||||
```
|
||||
|
||||
**Parameters**
|
||||
|
||||
- `d` — value. [Decimal](../../sql-reference/data-types/decimal.md#decimalp-s-decimal32s-decimal64s-decimal128s).
|
||||
- `p` — precision. Optional. If omitted, the initial presicion of the first argument is used. Using of this paratemer could be helpful for data extraction to another DBMS or file. [UInt8](../../sql-reference/data-types/int-uint.md#uint-ranges).
|
||||
|
||||
**Returned values**
|
||||
|
||||
- `1` — Decimal value has more digits then it's precision allow,
|
||||
- `0` — Decimal value satisfies the specified precision.
|
||||
|
||||
**Example**
|
||||
|
||||
Query:
|
||||
|
||||
``` sql
|
||||
SELECT isDecimalOverflow(toDecimal32(1000000000, 0), 9),
|
||||
isDecimalOverflow(toDecimal32(1000000000, 0)),
|
||||
isDecimalOverflow(toDecimal32(-1000000000, 0), 9),
|
||||
isDecimalOverflow(toDecimal32(-1000000000, 0));
|
||||
```
|
||||
|
||||
Result:
|
||||
|
||||
``` text
|
||||
1 1 1 1
|
||||
```
|
||||
|
||||
## countDigits {#count-digits}
|
||||
|
||||
Returns number of decimal digits you need to represent the value.
|
||||
|
||||
**Syntax**
|
||||
|
||||
``` sql
|
||||
countDigits(x)
|
||||
```
|
||||
|
||||
**Parameters**
|
||||
|
||||
- `x` — [Int](../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64) or [Decimal](../../sql-reference/data-types/decimal.md#decimalp-s-decimal32s-decimal64s-decimal128s) value.
|
||||
|
||||
**Returned value**
|
||||
|
||||
Number of digits.
|
||||
|
||||
Type: [UInt8](../../sql-reference/data-types/int-uint.md#uint-ranges).
|
||||
|
||||
!!! note "Note"
|
||||
For `Decimal` values takes into account their scales: calculates result over underlying integer type which is `(value * scale)`. For example: `countDigits(42) = 2`, `countDigits(42.000) = 5`, `countDigits(0.04200) = 4`. I.e. you may check decimal overflow for `Decimal64` with `countDecimal(x) > 18`. It's a slow variant of [isDecimalOverflow](#is-decimal-overflow).
|
||||
|
||||
**Example**
|
||||
|
||||
Query:
|
||||
|
||||
``` sql
|
||||
SELECT countDigits(toDecimal32(1, 9)), countDigits(toDecimal32(-1, 9)),
|
||||
countDigits(toDecimal64(1, 18)), countDigits(toDecimal64(-1, 18)),
|
||||
countDigits(toDecimal128(1, 38)), countDigits(toDecimal128(-1, 38));
|
||||
```
|
||||
|
||||
Result:
|
||||
|
||||
``` text
|
||||
10 10 19 19 39 39
|
||||
```
|
||||
|
||||
[Original article](https://clickhouse.tech/docs/en/query_language/functions/other_functions/) <!--hide-->
|
||||
|
@ -360,6 +360,89 @@ Extracts a fragment of a string using a regular expression. If ‘haystack’ do
|
||||
|
||||
Extracts all the fragments of a string using a regular expression. If ‘haystack’ doesn’t match the ‘pattern’ regex, an empty string is returned. Returns an array of strings consisting of all matches to the regex. In general, the behavior is the same as the ‘extract’ function (it takes the first subpattern, or the entire expression if there isn’t a subpattern).
|
||||
|
||||
## extractAllGroupsHorizontal {#extractallgroups-horizontal}
|
||||
|
||||
Matches all groups of the `haystack` string using the `pattern` regular expression. Returns an array of arrays, where the first array includes all fragments matching the first group, the second array - matching the second group, etc.
|
||||
|
||||
!!! note "Note"
|
||||
`extractAllGroupsHorizontal` function is slower than [extractAllGroupsVertical](#extractallgroups-vertical).
|
||||
|
||||
**Syntax**
|
||||
|
||||
``` sql
|
||||
extractAllGroupsHorizontal(haystack, pattern)
|
||||
```
|
||||
|
||||
**Parameters**
|
||||
|
||||
- `haystack` — Input string. Type: [String](../../sql-reference/data-types/string.md).
|
||||
- `pattern` — Regular expression with [re2 syntax](https://github.com/google/re2/wiki/Syntax). Must contain groups, each group enclosed in parentheses. If `pattern` contains no groups, an exception is thrown. Type: [String](../../sql-reference/data-types/string.md).
|
||||
|
||||
**Returned value**
|
||||
|
||||
- Type: [Array](../../sql-reference/data-types/array.md).
|
||||
|
||||
If `haystack` doesn’t match the `pattern` regex, an array of empty arrays is returned.
|
||||
|
||||
**Example**
|
||||
|
||||
Query:
|
||||
|
||||
``` sql
|
||||
SELECT extractAllGroupsHorizontal('abc=111, def=222, ghi=333', '("[^"]+"|\\w+)=("[^"]+"|\\w+)')
|
||||
```
|
||||
|
||||
Result:
|
||||
|
||||
``` text
|
||||
┌─extractAllGroupsHorizontal('abc=111, def=222, ghi=333', '("[^"]+"|\\w+)=("[^"]+"|\\w+)')─┐
|
||||
│ [['abc','def','ghi'],['111','222','333']] │
|
||||
└──────────────────────────────────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
**See also**
|
||||
- [extractAllGroupsVertical](#extractallgroups-vertical)
|
||||
|
||||
## extractAllGroupsVertical {#extractallgroups-vertical}
|
||||
|
||||
Matches all groups of the `haystack` string using the `pattern` regular expression. Returns an array of arrays, where each array includes matching fragments from every group. Fragments are grouped in order of appearance in the `haystack`.
|
||||
|
||||
**Syntax**
|
||||
|
||||
``` sql
|
||||
extractAllGroupsVertical(haystack, pattern)
|
||||
```
|
||||
|
||||
**Parameters**
|
||||
|
||||
- `haystack` — Input string. Type: [String](../../sql-reference/data-types/string.md).
|
||||
- `pattern` — Regular expression with [re2 syntax](https://github.com/google/re2/wiki/Syntax). Must contain groups, each group enclosed in parentheses. If `pattern` contains no groups, an exception is thrown. Type: [String](../../sql-reference/data-types/string.md).
|
||||
|
||||
**Returned value**
|
||||
|
||||
- Type: [Array](../../sql-reference/data-types/array.md).
|
||||
|
||||
If `haystack` doesn’t match the `pattern` regex, an empty array is returned.
|
||||
|
||||
**Example**
|
||||
|
||||
Query:
|
||||
|
||||
``` sql
|
||||
SELECT extractAllGroupsVertical('abc=111, def=222, ghi=333', '("[^"]+"|\\w+)=("[^"]+"|\\w+)')
|
||||
```
|
||||
|
||||
Result:
|
||||
|
||||
``` text
|
||||
┌─extractAllGroupsVertical('abc=111, def=222, ghi=333', '("[^"]+"|\\w+)=("[^"]+"|\\w+)')─┐
|
||||
│ [['abc','111'],['def','222'],['ghi','333']] │
|
||||
└────────────────────────────────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
**See also**
|
||||
- [extractAllGroupsHorizontal](#extractallgroups-horizontal)
|
||||
|
||||
## like(haystack, pattern), haystack LIKE pattern operator {#function-like}
|
||||
|
||||
Checks whether a string matches a simple regular expression.
|
||||
|
@ -102,4 +102,9 @@ SELECT toDecimal32(1, 8) < 100
|
||||
DB::Exception: Can't compare.
|
||||
```
|
||||
|
||||
**Смотрите также**
|
||||
- [isDecimalOverflow](../../sql-reference/functions/other-functions.md#is-decimal-overflow)
|
||||
- [countDigits](../../sql-reference/functions/other-functions.md#count-digits)
|
||||
|
||||
|
||||
[Оригинальная статья](https://clickhouse.tech/docs/ru/data_types/decimal/) <!--hide-->
|
||||
|
@ -1431,5 +1431,80 @@ SELECT randomStringUTF8(13)
|
||||
|
||||
```
|
||||
|
||||
## isDecimalOverflow {#is-decimal-overflow}
|
||||
|
||||
Проверяет, находится ли число [Decimal](../../sql-reference/data-types/decimal.md#decimalp-s-decimal32s-decimal64s-decimal128s) вне собственной (или заданной) области значений.
|
||||
|
||||
**Синтаксис**
|
||||
|
||||
``` sql
|
||||
isDecimalOverflow(d, [p])
|
||||
```
|
||||
|
||||
**Параметры**
|
||||
|
||||
- `d` — число. [Decimal](../../sql-reference/data-types/decimal.md#decimalp-s-decimal32s-decimal64s-decimal128s).
|
||||
- `p` — точность. Необязательный параметр. Если опущен, используется исходная точность первого аргумента. Использование этого параметра может быть полезно для извлечения данных в другую СУБД или файл. [UInt8](../../sql-reference/data-types/int-uint.md#uint-ranges).
|
||||
|
||||
**Возвращаемое значение**
|
||||
|
||||
- `1` — число имеет больше цифр, чем позволяет точность.
|
||||
- `0` — число удовлетворяет заданной точности.
|
||||
|
||||
**Пример**
|
||||
|
||||
Запрос:
|
||||
|
||||
``` sql
|
||||
SELECT isDecimalOverflow(toDecimal32(1000000000, 0), 9),
|
||||
isDecimalOverflow(toDecimal32(1000000000, 0)),
|
||||
isDecimalOverflow(toDecimal32(-1000000000, 0), 9),
|
||||
isDecimalOverflow(toDecimal32(-1000000000, 0));
|
||||
```
|
||||
|
||||
Результат:
|
||||
|
||||
``` text
|
||||
1 1 1 1
|
||||
```
|
||||
|
||||
## countDigits {#count-digits}
|
||||
|
||||
Возвращает количество десятичных цифр, необходимых для представления значения.
|
||||
|
||||
**Синтаксис**
|
||||
|
||||
``` sql
|
||||
countDigits(x)
|
||||
```
|
||||
|
||||
**Параметры**
|
||||
|
||||
- `x` — [целое](../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64) или [дробное](../../sql-reference/data-types/decimal.md#decimalp-s-decimal32s-decimal64s-decimal128s) число.
|
||||
|
||||
**Возвращаемое значение**
|
||||
|
||||
Количество цифр.
|
||||
|
||||
Тип: [UInt8](../../sql-reference/data-types/int-uint.md#uint-ranges).
|
||||
|
||||
!!! note "Примечание"
|
||||
Для `Decimal` значений учитывается их масштаб: вычисляется результат по базовому целочисленному типу, полученному как `(value * scale)`. Например: `countDigits(42) = 2`, `countDigits(42.000) = 5`, `countDigits(0.04200) = 4`. То есть вы можете проверить десятичное переполнение для `Decimal64` с помощью `countDecimal(x) > 18`. Это медленный вариант [isDecimalOverflow](#is-decimal-overflow).
|
||||
|
||||
**Пример**
|
||||
|
||||
Запрос:
|
||||
|
||||
``` sql
|
||||
SELECT countDigits(toDecimal32(1, 9)), countDigits(toDecimal32(-1, 9)),
|
||||
countDigits(toDecimal64(1, 18)), countDigits(toDecimal64(-1, 18)),
|
||||
countDigits(toDecimal128(1, 38)), countDigits(toDecimal128(-1, 38));
|
||||
```
|
||||
|
||||
Результат:
|
||||
|
||||
``` text
|
||||
10 10 19 19 39 39
|
||||
```
|
||||
|
||||
[Оригинальная статья](https://clickhouse.tech/docs/ru/query_language/functions/other_functions/) <!--hide-->
|
||||
|
@ -341,6 +341,89 @@ Result:
|
||||
|
||||
Извлечение всех фрагментов строки по регулярному выражению. Если haystack не соответствует регулярному выражению pattern, то возвращается пустая строка. Возвращается массив строк, состоящий из всех соответствий регулярному выражению. В остальном, поведение аналогично функции extract (по прежнему, вынимается первый subpattern, или всё выражение, если subpattern-а нет).
|
||||
|
||||
## extractAllGroupsHorizontal {#extractallgroups-horizontal}
|
||||
|
||||
Разбирает строку `haystack` на фрагменты, соответствующие группам регулярного выражения `pattern`. Возвращает массив массивов, где первый массив содержит все фрагменты, соответствующие первой группе регулярного выражения, второй массив - соответствующие второй группе, и т.д.
|
||||
|
||||
!!! note "Замечание"
|
||||
Функция `extractAllGroupsHorizontal` работает медленнее, чем функция [extractAllGroupsVertical](#extractallgroups-vertical).
|
||||
|
||||
**Синтаксис**
|
||||
|
||||
``` sql
|
||||
extractAllGroupsHorizontal(haystack, pattern)
|
||||
```
|
||||
|
||||
**Параметры**
|
||||
|
||||
- `haystack` — строка для разбора. Тип: [String](../../sql-reference/data-types/string.md).
|
||||
- `pattern` — регулярное выражение, построенное по синтаксическим правилам [re2](https://github.com/google/re2/wiki/Syntax). Выражение должно содержать группы, заключенные в круглые скобки. Если выражение не содержит групп, генерируется исключение. Тип: [String](../../sql-reference/data-types/string.md).
|
||||
|
||||
**Возвращаемое значение**
|
||||
|
||||
- Тип: [Array](../../sql-reference/data-types/array.md).
|
||||
|
||||
Если в строке `haystack` нет групп, соответствующих регулярному выражению `pattern`, возвращается массив пустых массивов.
|
||||
|
||||
**Пример**
|
||||
|
||||
Запрос:
|
||||
|
||||
``` sql
|
||||
SELECT extractAllGroupsHorizontal('abc=111, def=222, ghi=333', '("[^"]+"|\\w+)=("[^"]+"|\\w+)')
|
||||
```
|
||||
|
||||
Результат:
|
||||
|
||||
``` text
|
||||
┌─extractAllGroupsHorizontal('abc=111, def=222, ghi=333', '("[^"]+"|\\w+)=("[^"]+"|\\w+)')─┐
|
||||
│ [['abc','def','ghi'],['111','222','333']] │
|
||||
└──────────────────────────────────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
**См. также**
|
||||
- функция [extractAllGroupsVertical](#extractallgroups-vertical)
|
||||
|
||||
## extractAllGroupsVertical {#extractallgroups-vertical}
|
||||
|
||||
Разбирает строку `haystack` на фрагменты, соответствующие группам регулярного выражения `pattern`. Возвращает массив массивов, где каждый массив содержит по одному фрагменту, соответствующему каждой группе регулярного выражения. Фрагменты группируются в массивы в соответствии с порядком появления в исходной строке.
|
||||
|
||||
**Синтаксис**
|
||||
|
||||
``` sql
|
||||
extractAllGroupsVertical(haystack, pattern)
|
||||
```
|
||||
|
||||
**Параметры**
|
||||
|
||||
- `haystack` — строка для разбора. Тип: [String](../../sql-reference/data-types/string.md).
|
||||
- `pattern` — регулярное выражение, построенное по синтаксическим правилам [re2](https://github.com/google/re2/wiki/Syntax). Выражение должно содержать группы, заключенные в круглые скобки. Если выражение не содержит групп, генерируется исключение. Тип: [String](../../sql-reference/data-types/string.md).
|
||||
|
||||
**Возвращаемое значение**
|
||||
|
||||
- Тип: [Array](../../sql-reference/data-types/array.md).
|
||||
|
||||
Если в строке `haystack` нет групп, соответствующих регулярному выражению `pattern`, возвращается пустой массив.
|
||||
|
||||
**Пример**
|
||||
|
||||
Запрос:
|
||||
|
||||
``` sql
|
||||
SELECT extractAllGroupsVertical('abc=111, def=222, ghi=333', '("[^"]+"|\\w+)=("[^"]+"|\\w+)')
|
||||
```
|
||||
|
||||
Результат:
|
||||
|
||||
``` text
|
||||
┌─extractAllGroupsVertical('abc=111, def=222, ghi=333', '("[^"]+"|\\w+)=("[^"]+"|\\w+)')─┐
|
||||
│ [['abc','111'],['def','222'],['ghi','333']] │
|
||||
└────────────────────────────────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
**См. также**
|
||||
- функция [extractAllGroupsHorizontal](#extractallgroups-horizontal)
|
||||
|
||||
## like(haystack, pattern), оператор haystack LIKE pattern {#function-like}
|
||||
|
||||
Проверка строки на соответствие простому регулярному выражению.
|
||||
|
@ -267,12 +267,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
|
||||
registerDictionaries();
|
||||
registerDisks();
|
||||
|
||||
#if !defined(ARCADIA_BUILD)
|
||||
#if USE_OPENCL
|
||||
BitonicSort::getInstance().configure();
|
||||
#endif
|
||||
#endif
|
||||
|
||||
CurrentMetrics::set(CurrentMetrics::Revision, ClickHouseRevision::getVersionRevision());
|
||||
CurrentMetrics::set(CurrentMetrics::VersionInteger, ClickHouseRevision::getVersionInteger());
|
||||
|
||||
|
@ -152,12 +152,14 @@ inline typename DecimalType::NativeType getFractionalPartWithScaleMultiplier(
|
||||
{
|
||||
using T = typename DecimalType::NativeType;
|
||||
|
||||
T result = decimal.value;
|
||||
/// There's UB with min integer value here. But it does not matter for Decimals cause they use not full integer ranges.
|
||||
/// Anycase we make modulo before compare to make scale_multiplier > 1 unaffected.
|
||||
T result = decimal.value % scale_multiplier;
|
||||
if constexpr (!keep_sign)
|
||||
if (result < T(0))
|
||||
result = -result;
|
||||
|
||||
return result % scale_multiplier;
|
||||
return result;
|
||||
}
|
||||
|
||||
/** Get fractional part from decimal
|
||||
|
@ -135,6 +135,7 @@ class IColumn;
|
||||
\
|
||||
M(Bool, force_index_by_date, 0, "Throw an exception if there is a partition key in a table, and it is not used.", 0) \
|
||||
M(Bool, force_primary_key, 0, "Throw an exception if there is primary key in a table, and it is not used.", 0) \
|
||||
M(String, force_data_skipping_indices, "", "Comma separated list of strings or literals with the name of the data skipping indices that should be used during query execution, otherwise an exception will be thrown.", 0) \
|
||||
\
|
||||
M(Float, max_streams_to_max_threads_ratio, 1, "Allows you to use more sources than the number of threads - to more evenly distribute work across threads. It is assumed that this is a temporary solution, since it will be possible in the future to make the number of sources equal to the number of threads, but for each source to dynamically select available work for itself.", 0) \
|
||||
M(Float, max_streams_multiplier_for_merge_tables, 5, "Ask more streams when reading from Merge table. Streams will be spread across tables that Merge table will use. This allows more even distribution of work across threads and especially helpful when merged tables differ in size.", 0) \
|
||||
|
@ -194,6 +194,7 @@ DiskCacheWrapper::writeFile(const String & path, size_t buf_size, WriteMode mode
|
||||
auto src_buffer = cache_disk->readFile(path, buf_size, estimated_size, aio_threshold, 0);
|
||||
auto dst_buffer = DiskDecorator::writeFile(path, buf_size, mode, estimated_size, aio_threshold);
|
||||
copyData(*src_buffer, *dst_buffer);
|
||||
dst_buffer->finalize();
|
||||
},
|
||||
buf_size);
|
||||
}
|
||||
|
@ -27,6 +27,7 @@ void copyFile(IDisk & from_disk, const String & from_path, IDisk & to_disk, cons
|
||||
auto in = from_disk.readFile(from_path);
|
||||
auto out = to_disk.writeFile(to_path);
|
||||
copyData(*in, *out);
|
||||
out->finalize();
|
||||
}
|
||||
|
||||
|
||||
|
@ -1,3 +1,4 @@
|
||||
#include <aws/core/client/DefaultRetryStrategy.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/S3Common.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
@ -123,6 +124,9 @@ void registerDiskS3(DiskFactory & factory)
|
||||
if (proxy_config)
|
||||
cfg.perRequestConfiguration = [proxy_config](const auto & request) { return proxy_config->getConfiguration(request); };
|
||||
|
||||
cfg.retryStrategy = std::make_shared<Aws::Client::DefaultRetryStrategy>(
|
||||
config.getUInt(config_prefix + ".retry_attempts", 10));
|
||||
|
||||
auto client = S3::ClientFactory::instance().create(
|
||||
cfg,
|
||||
uri.is_virtual_hosted_style,
|
||||
|
@ -5,7 +5,6 @@ namespace DB
|
||||
{
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
@ -18,10 +17,7 @@ struct DivideFloatingImpl
|
||||
template <typename Result = ResultType>
|
||||
static inline NO_SANITIZE_UNDEFINED Result apply(A a [[maybe_unused]], B b [[maybe_unused]])
|
||||
{
|
||||
if constexpr (is_big_int_v<A> || is_big_int_v<B>)
|
||||
throw Exception("DivideFloatingImpl are not implemented for big integers", ErrorCodes::NOT_IMPLEMENTED);
|
||||
else
|
||||
return static_cast<Result>(a) / b;
|
||||
return static_cast<Result>(a) / b;
|
||||
}
|
||||
|
||||
#if USE_EMBEDDED_COMPILER
|
||||
|
@ -3,6 +3,7 @@
|
||||
#if USE_HDFS
|
||||
#include <IO/HDFSCommon.h>
|
||||
#include <hdfs/hdfs.h>
|
||||
#include <mutex>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -17,6 +18,9 @@ ReadBufferFromHDFS::~ReadBufferFromHDFS() = default;
|
||||
|
||||
struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl
|
||||
{
|
||||
/// HDFS create/open functions are not thread safe
|
||||
static std::mutex hdfs_init_mutex;
|
||||
|
||||
std::string hdfs_uri;
|
||||
hdfsFile fin;
|
||||
HDFSBuilderPtr builder;
|
||||
@ -24,9 +28,11 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl
|
||||
|
||||
explicit ReadBufferFromHDFSImpl(const std::string & hdfs_name_)
|
||||
: hdfs_uri(hdfs_name_)
|
||||
, builder(createHDFSBuilder(hdfs_uri))
|
||||
, fs(createHDFSFS(builder.get()))
|
||||
{
|
||||
std::lock_guard lock(hdfs_init_mutex);
|
||||
|
||||
builder = createHDFSBuilder(hdfs_uri);
|
||||
fs = createHDFSFS(builder.get());
|
||||
const size_t begin_of_path = hdfs_uri.find('/', hdfs_uri.find("//") + 2);
|
||||
const std::string path = hdfs_uri.substr(begin_of_path);
|
||||
fin = hdfsOpenFile(fs.get(), path.c_str(), O_RDONLY, 0, 0, 0);
|
||||
@ -47,10 +53,13 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl
|
||||
|
||||
~ReadBufferFromHDFSImpl()
|
||||
{
|
||||
std::lock_guard lock(hdfs_init_mutex);
|
||||
hdfsCloseFile(fs.get(), fin);
|
||||
}
|
||||
};
|
||||
|
||||
std::mutex ReadBufferFromHDFS::ReadBufferFromHDFSImpl::hdfs_init_mutex;
|
||||
|
||||
ReadBufferFromHDFS::ReadBufferFromHDFS(const std::string & hdfs_name_, size_t buf_size)
|
||||
: BufferWithOwnMemory<ReadBuffer>(buf_size)
|
||||
, impl(std::make_unique<ReadBufferFromHDFSImpl>(hdfs_name_))
|
||||
|
@ -1114,10 +1114,6 @@ ExpressionActionsPtr ExpressionActions::splitActionsBeforeArrayJoin(const NameSe
|
||||
input_columns = split_actions->getSampleBlock().getNamesAndTypesList();
|
||||
input_columns.insert(input_columns.end(), inputs_from_array_join.begin(), inputs_from_array_join.end());
|
||||
|
||||
/// Remove not needed columns.
|
||||
if (!actions.empty())
|
||||
prependProjectInput();
|
||||
|
||||
return split_actions;
|
||||
}
|
||||
|
||||
|
@ -27,7 +27,9 @@ namespace
|
||||
|
||||
StoragePtr tryGetTable(const ASTPtr & database_and_table, const Context & context)
|
||||
{
|
||||
auto table_id = context.resolveStorageID(database_and_table);
|
||||
auto table_id = context.tryResolveStorageID(database_and_table);
|
||||
if (!table_id)
|
||||
return {};
|
||||
return DatabaseCatalog::instance().tryGetTable(table_id, context);
|
||||
}
|
||||
|
||||
|
@ -566,7 +566,7 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, size_t num_threads, st
|
||||
}
|
||||
|
||||
if (node->exception)
|
||||
finish();
|
||||
cancel();
|
||||
|
||||
if (finished)
|
||||
break;
|
||||
|
@ -101,7 +101,7 @@ public:
|
||||
void setQuota(const std::shared_ptr<const EnabledQuota> & quota);
|
||||
|
||||
/// Do not allow to change the table while the processors of pipe are alive.
|
||||
void addTableLock(const TableLockHolder & lock) { holder.table_locks.push_back(lock); }
|
||||
void addTableLock(TableLockHolder lock) { holder.table_locks.emplace_back(std::move(lock)); }
|
||||
/// This methods are from QueryPipeline. Needed to make conversion from pipeline to pipe possible.
|
||||
void addInterpreterContext(std::shared_ptr<Context> context) { holder.interpreter_context.emplace_back(std::move(context)); }
|
||||
void addStorageHolder(StoragePtr storage) { holder.storage_holders.emplace_back(std::move(storage)); }
|
||||
|
@ -101,7 +101,7 @@ public:
|
||||
|
||||
const Block & getHeader() const { return pipe.getHeader(); }
|
||||
|
||||
void addTableLock(const TableLockHolder & lock) { pipe.addTableLock(lock); }
|
||||
void addTableLock(TableLockHolder lock) { pipe.addTableLock(std::move(lock)); }
|
||||
void addInterpreterContext(std::shared_ptr<Context> context) { pipe.addInterpreterContext(std::move(context)); }
|
||||
void addStorageHolder(StoragePtr storage) { pipe.addStorageHolder(std::move(storage)); }
|
||||
void addQueryPlan(std::unique_ptr<QueryPlan> plan) { pipe.addQueryPlan(std::move(plan)); }
|
||||
|
@ -12,30 +12,18 @@ namespace DB
|
||||
{
|
||||
|
||||
ReadFromStorageStep::ReadFromStorageStep(
|
||||
TableLockHolder table_lock_,
|
||||
StorageMetadataPtr metadata_snapshot_,
|
||||
StreamLocalLimits & limits_,
|
||||
SizeLimits & leaf_limits_,
|
||||
std::shared_ptr<const EnabledQuota> quota_,
|
||||
StoragePtr storage_,
|
||||
const Names & required_columns_,
|
||||
const SelectQueryInfo & query_info_,
|
||||
std::shared_ptr<Context> context_,
|
||||
QueryProcessingStage::Enum processing_stage_,
|
||||
size_t max_block_size_,
|
||||
size_t max_streams_)
|
||||
: table_lock(std::move(table_lock_))
|
||||
, metadata_snapshot(std::move(metadata_snapshot_))
|
||||
, limits(limits_)
|
||||
, leaf_limits(leaf_limits_)
|
||||
, quota(std::move(quota_))
|
||||
, storage(std::move(storage_))
|
||||
, required_columns(required_columns_)
|
||||
, query_info(query_info_)
|
||||
, context(std::move(context_))
|
||||
, processing_stage(processing_stage_)
|
||||
, max_block_size(max_block_size_)
|
||||
, max_streams(max_streams_)
|
||||
TableLockHolder table_lock,
|
||||
StorageMetadataPtr metadata_snapshot,
|
||||
StreamLocalLimits & limits,
|
||||
SizeLimits & leaf_limits,
|
||||
std::shared_ptr<const EnabledQuota> quota,
|
||||
StoragePtr storage,
|
||||
const Names & required_columns,
|
||||
const SelectQueryInfo & query_info,
|
||||
std::shared_ptr<Context> context,
|
||||
QueryProcessingStage::Enum processing_stage,
|
||||
size_t max_block_size,
|
||||
size_t max_streams)
|
||||
{
|
||||
/// Note: we read from storage in constructor of step because we don't know real header before reading.
|
||||
/// It will be fixed when storage return QueryPlanStep itself.
|
||||
@ -83,9 +71,6 @@ ReadFromStorageStep::ReadFromStorageStep(
|
||||
pipeline = std::make_unique<QueryPipeline>();
|
||||
QueryPipelineProcessorsCollector collector(*pipeline, this);
|
||||
|
||||
/// Table lock is stored inside pipeline here.
|
||||
pipeline->addTableLock(table_lock);
|
||||
|
||||
pipe.setLimits(limits);
|
||||
|
||||
/**
|
||||
@ -103,8 +88,11 @@ ReadFromStorageStep::ReadFromStorageStep(
|
||||
|
||||
pipeline->init(std::move(pipe));
|
||||
|
||||
/// Add resources to pipeline. The order is important.
|
||||
/// Add in reverse order of destruction. Pipeline will be destroyed at the end in case of exception.
|
||||
pipeline->addInterpreterContext(std::move(context));
|
||||
pipeline->addStorageHolder(std::move(storage));
|
||||
pipeline->addTableLock(std::move(table_lock));
|
||||
|
||||
processors = collector.detachProcessors();
|
||||
|
||||
|
@ -45,20 +45,6 @@ public:
|
||||
void describePipeline(FormatSettings & settings) const override;
|
||||
|
||||
private:
|
||||
TableLockHolder table_lock;
|
||||
StorageMetadataPtr metadata_snapshot;
|
||||
StreamLocalLimits limits;
|
||||
SizeLimits leaf_limits;
|
||||
std::shared_ptr<const EnabledQuota> quota;
|
||||
|
||||
StoragePtr storage;
|
||||
const Names & required_columns;
|
||||
const SelectQueryInfo & query_info;
|
||||
std::shared_ptr<Context> context;
|
||||
QueryProcessingStage::Enum processing_stage;
|
||||
size_t max_block_size;
|
||||
size_t max_streams;
|
||||
|
||||
QueryPipelinePtr pipeline;
|
||||
Processors processors;
|
||||
};
|
||||
|
@ -276,7 +276,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
|
||||
ReadBufferFromString ttl_infos_buffer(ttl_infos_string);
|
||||
assertString("ttl format version: 1\n", ttl_infos_buffer);
|
||||
ttl_infos.read(ttl_infos_buffer);
|
||||
reservation = data.reserveSpacePreferringTTLRules(sum_files_size, ttl_infos, std::time(nullptr), 0, true);
|
||||
reservation = data.reserveSpacePreferringTTLRules(metadata_snapshot, sum_files_size, ttl_infos, std::time(nullptr), 0, true);
|
||||
}
|
||||
else
|
||||
reservation = data.reserveSpace(sum_files_size);
|
||||
|
@ -88,6 +88,7 @@ void IMergeTreeDataPart::MinMaxIndex::store(
|
||||
out_hashing.next();
|
||||
out_checksums.files[file_name].file_size = out_hashing.count();
|
||||
out_checksums.files[file_name].file_hash = out_hashing.getHash();
|
||||
out->finalize();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3037,28 +3037,31 @@ ReservationPtr MergeTreeData::tryReserveSpace(UInt64 expected_size, SpacePtr spa
|
||||
return space->reserve(expected_size);
|
||||
}
|
||||
|
||||
ReservationPtr MergeTreeData::reserveSpacePreferringTTLRules(UInt64 expected_size,
|
||||
const IMergeTreeDataPart::TTLInfos & ttl_infos,
|
||||
time_t time_of_move,
|
||||
size_t min_volume_index,
|
||||
bool is_insert) const
|
||||
ReservationPtr MergeTreeData::reserveSpacePreferringTTLRules(
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
UInt64 expected_size,
|
||||
const IMergeTreeDataPart::TTLInfos & ttl_infos,
|
||||
time_t time_of_move,
|
||||
size_t min_volume_index,
|
||||
bool is_insert) const
|
||||
{
|
||||
expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
|
||||
|
||||
ReservationPtr reservation = tryReserveSpacePreferringTTLRules(expected_size, ttl_infos, time_of_move, min_volume_index, is_insert);
|
||||
ReservationPtr reservation = tryReserveSpacePreferringTTLRules(metadata_snapshot, expected_size, ttl_infos, time_of_move, min_volume_index, is_insert);
|
||||
|
||||
return checkAndReturnReservation(expected_size, std::move(reservation));
|
||||
}
|
||||
|
||||
ReservationPtr MergeTreeData::tryReserveSpacePreferringTTLRules(UInt64 expected_size,
|
||||
const IMergeTreeDataPart::TTLInfos & ttl_infos,
|
||||
time_t time_of_move,
|
||||
size_t min_volume_index,
|
||||
bool is_insert) const
|
||||
ReservationPtr MergeTreeData::tryReserveSpacePreferringTTLRules(
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
UInt64 expected_size,
|
||||
const IMergeTreeDataPart::TTLInfos & ttl_infos,
|
||||
time_t time_of_move,
|
||||
size_t min_volume_index,
|
||||
bool is_insert) const
|
||||
{
|
||||
expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
|
||||
|
||||
auto metadata_snapshot = getInMemoryMetadataPtr();
|
||||
ReservationPtr reservation;
|
||||
|
||||
auto move_ttl_entry = selectTTLDescriptionForTTLInfos(metadata_snapshot->getMoveTTLs(), ttl_infos.moves_ttl, time_of_move, true);
|
||||
|
@ -632,6 +632,7 @@ public:
|
||||
|
||||
/// Reserves space at least 1MB preferring best destination according to `ttl_infos`.
|
||||
ReservationPtr reserveSpacePreferringTTLRules(
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
UInt64 expected_size,
|
||||
const IMergeTreeDataPart::TTLInfos & ttl_infos,
|
||||
time_t time_of_move,
|
||||
@ -639,6 +640,7 @@ public:
|
||||
bool is_insert = false) const;
|
||||
|
||||
ReservationPtr tryReserveSpacePreferringTTLRules(
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
UInt64 expected_size,
|
||||
const IMergeTreeDataPart::TTLInfos & ttl_infos,
|
||||
time_t time_of_move,
|
||||
|
@ -182,6 +182,10 @@ std::optional<TTLDescription> selectTTLDescriptionForTTLInfos(const TTLDescripti
|
||||
for (auto ttl_entry_it = descriptions.begin(); ttl_entry_it != descriptions.end(); ++ttl_entry_it)
|
||||
{
|
||||
auto ttl_info_it = ttl_info_map.find(ttl_entry_it->result_column);
|
||||
|
||||
if (ttl_info_it == ttl_info_map.end())
|
||||
continue;
|
||||
|
||||
time_t ttl_time;
|
||||
|
||||
if (use_max)
|
||||
@ -190,8 +194,7 @@ std::optional<TTLDescription> selectTTLDescriptionForTTLInfos(const TTLDescripti
|
||||
ttl_time = ttl_info_it->second.min;
|
||||
|
||||
/// Prefer TTL rule which went into action last.
|
||||
if (ttl_info_it != ttl_info_map.end()
|
||||
&& ttl_time <= current_time
|
||||
if (ttl_time <= current_time
|
||||
&& best_ttl_time <= ttl_time)
|
||||
{
|
||||
best_entry_it = ttl_entry_it;
|
||||
|
@ -229,6 +229,8 @@ void MergeTreeDataPartWriterCompact::finishDataSerialization(IMergeTreeDataPart:
|
||||
marks.next();
|
||||
addToChecksums(checksums);
|
||||
|
||||
plain_file->finalize();
|
||||
marks_file->finalize();
|
||||
if (sync)
|
||||
{
|
||||
plain_file->sync();
|
||||
|
@ -17,8 +17,12 @@ namespace
|
||||
void MergeTreeDataPartWriterOnDisk::Stream::finalize()
|
||||
{
|
||||
compressed.next();
|
||||
plain_file->next();
|
||||
/// 'compressed_buf' doesn't call next() on underlying buffer ('plain_hashing'). We should do it manually.
|
||||
plain_hashing.next();
|
||||
marks.next();
|
||||
|
||||
plain_file->finalize();
|
||||
marks_file->finalize();
|
||||
}
|
||||
|
||||
void MergeTreeDataPartWriterOnDisk::Stream::sync() const
|
||||
@ -331,6 +335,7 @@ void MergeTreeDataPartWriterOnDisk::finishPrimaryIndexSerialization(
|
||||
index_stream->next();
|
||||
checksums.files["primary.idx"].file_size = index_stream->count();
|
||||
checksums.files["primary.idx"].file_hash = index_stream->getHash();
|
||||
index_file_stream->finalize();
|
||||
if (sync)
|
||||
index_file_stream->sync();
|
||||
index_stream = nullptr;
|
||||
|
@ -1,6 +1,7 @@
|
||||
#include <boost/rational.hpp> /// For calculations related to sampling coefficients.
|
||||
#include <ext/scope_guard.h>
|
||||
#include <optional>
|
||||
#include <unordered_set>
|
||||
|
||||
#include <Poco/File.h>
|
||||
|
||||
@ -18,6 +19,7 @@
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
#include <Parsers/ASTFunction.h>
|
||||
#include <Parsers/ASTSampleRatio.h>
|
||||
#include <Parsers/parseIdentifierOrStringLiteral.h>
|
||||
#include <Interpreters/ExpressionAnalyzer.h>
|
||||
#include <Interpreters/Context.h>
|
||||
|
||||
@ -59,6 +61,7 @@ namespace ErrorCodes
|
||||
extern const int ILLEGAL_COLUMN;
|
||||
extern const int ARGUMENT_OUT_OF_BOUND;
|
||||
extern const int TOO_MANY_ROWS;
|
||||
extern const int CANNOT_PARSE_TEXT;
|
||||
}
|
||||
|
||||
|
||||
@ -552,6 +555,38 @@ Pipe MergeTreeDataSelectExecutor::readFromParts(
|
||||
useful_indices.emplace_back(index_helper, condition);
|
||||
}
|
||||
|
||||
if (settings.force_data_skipping_indices.changed)
|
||||
{
|
||||
const auto & indices = settings.force_data_skipping_indices.toString();
|
||||
|
||||
Strings forced_indices;
|
||||
{
|
||||
Tokens tokens(&indices[0], &indices[indices.size()], settings.max_query_size);
|
||||
IParser::Pos pos(tokens, settings.max_parser_depth);
|
||||
Expected expected;
|
||||
if (!parseIdentifiersOrStringLiterals(pos, expected, forced_indices))
|
||||
throw Exception(ErrorCodes::CANNOT_PARSE_TEXT,
|
||||
"Cannot parse force_data_skipping_indices ('{}')", indices);
|
||||
}
|
||||
|
||||
if (forced_indices.empty())
|
||||
throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "No indices parsed from force_data_skipping_indices ('{}')", indices);
|
||||
|
||||
std::unordered_set<std::string> useful_indices_names;
|
||||
for (const auto & useful_index : useful_indices)
|
||||
useful_indices_names.insert(useful_index.first->index.name);
|
||||
|
||||
for (const auto & index_name : forced_indices)
|
||||
{
|
||||
if (!useful_indices_names.count(index_name))
|
||||
{
|
||||
throw Exception(ErrorCodes::INDEX_NOT_USED,
|
||||
"Index {} is not used and setting 'force_data_skipping_indices' contains it",
|
||||
backQuote(index_name));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
RangesInDataParts parts_with_ranges(parts.size());
|
||||
size_t sum_marks = 0;
|
||||
std::atomic<size_t> sum_marks_pk = 0;
|
||||
|
@ -237,7 +237,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
|
||||
updateTTL(ttl_entry, move_ttl_infos, move_ttl_infos.moves_ttl[ttl_entry.result_column], block, false);
|
||||
|
||||
NamesAndTypesList columns = metadata_snapshot->getColumns().getAllPhysical().filter(block.getNames());
|
||||
ReservationPtr reservation = data.reserveSpacePreferringTTLRules(expected_size, move_ttl_infos, time(nullptr), 0, true);
|
||||
ReservationPtr reservation = data.reserveSpacePreferringTTLRules(metadata_snapshot, expected_size, move_ttl_infos, time(nullptr), 0, true);
|
||||
VolumePtr volume = data.getStoragePolicy()->getVolume(0);
|
||||
|
||||
auto new_data_part = data.createPart(
|
||||
|
@ -156,6 +156,7 @@ void MergeTreePartition::store(const Block & partition_key_sample, const DiskPtr
|
||||
out_hashing.next();
|
||||
checksums.files["partition.dat"].file_size = out_hashing.count();
|
||||
checksums.files["partition.dat"].file_hash = out_hashing.getHash();
|
||||
out->finalize();
|
||||
}
|
||||
|
||||
void MergeTreePartition::create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row)
|
||||
|
@ -148,6 +148,7 @@ void MergedBlockOutputStream::finalizePartOnDisk(
|
||||
count_out_hashing.next();
|
||||
checksums.files["count.txt"].file_size = count_out_hashing.count();
|
||||
checksums.files["count.txt"].file_hash = count_out_hashing.getHash();
|
||||
count_out->finalize();
|
||||
if (sync)
|
||||
count_out->sync();
|
||||
}
|
||||
@ -160,6 +161,7 @@ void MergedBlockOutputStream::finalizePartOnDisk(
|
||||
new_part->ttl_infos.write(out_hashing);
|
||||
checksums.files["ttl.txt"].file_size = out_hashing.count();
|
||||
checksums.files["ttl.txt"].file_hash = out_hashing.getHash();
|
||||
out->finalize();
|
||||
if (sync)
|
||||
out->sync();
|
||||
}
|
||||
@ -170,6 +172,7 @@ void MergedBlockOutputStream::finalizePartOnDisk(
|
||||
/// Write a file with a description of columns.
|
||||
auto out = volume->getDisk()->writeFile(part_path + "columns.txt", 4096);
|
||||
part_columns.writeText(*out);
|
||||
out->finalize();
|
||||
if (sync)
|
||||
out->sync();
|
||||
}
|
||||
@ -178,6 +181,7 @@ void MergedBlockOutputStream::finalizePartOnDisk(
|
||||
{
|
||||
auto out = volume->getDisk()->writeFile(part_path + IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096);
|
||||
DB::writeText(queryToString(default_codec->getFullCodecDesc()), *out);
|
||||
out->finalize();
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -189,6 +193,7 @@ void MergedBlockOutputStream::finalizePartOnDisk(
|
||||
/// Write file with checksums.
|
||||
auto out = volume->getDisk()->writeFile(part_path + "checksums.txt", 4096);
|
||||
checksums.write(*out);
|
||||
out->finalize();
|
||||
if (sync)
|
||||
out->sync();
|
||||
}
|
||||
|
@ -715,6 +715,16 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C
|
||||
for (const String & produced_part_name : queue_entry->getVirtualPartNames())
|
||||
{
|
||||
auto part_info = MergeTreePartInfo::fromPartName(produced_part_name, format_version);
|
||||
|
||||
/// Oddly enough, getVirtualPartNames() may return _virtual_ part name.
|
||||
/// Such parts do not exist and will never appear, so we should not add virtual parts to parts_to_do list.
|
||||
/// Fortunately, it's easy to distinguish virtual parts from normal parts by part level.
|
||||
/// See StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition(...)
|
||||
auto max_level = MergeTreePartInfo::MAX_LEVEL; /// DROP/DETACH PARTITION
|
||||
auto another_max_level = std::numeric_limits<decltype(part_info.level)>::max(); /// REPLACE/MOVE PARTITION
|
||||
if (part_info.level == max_level || part_info.level == another_max_level)
|
||||
continue;
|
||||
|
||||
auto it = entry->block_numbers.find(part_info.partition_id);
|
||||
if (it != entry->block_numbers.end() && it->second > part_info.getDataVersion())
|
||||
mutation.parts_to_do.add(produced_part_name);
|
||||
|
@ -22,74 +22,64 @@ namespace ErrorCodes
|
||||
|
||||
class MemorySource : public SourceWithProgress
|
||||
{
|
||||
using InitializerFunc = std::function<void(BlocksList::const_iterator &, size_t &)>;
|
||||
public:
|
||||
/// We use range [first, last] which includes right border.
|
||||
/// Blocks are stored in std::list which may be appended in another thread.
|
||||
/// We don't use synchronisation here, because elements in range [first, last] won't be modified.
|
||||
/// We use pointer to the beginning of the list and its current size.
|
||||
/// We don't need synchronisation in this reader, because while we hold SharedLock on storage,
|
||||
/// only new elements can be added to the back of the list, so our iterators remain valid
|
||||
|
||||
MemorySource(
|
||||
Names column_names_,
|
||||
BlocksList::const_iterator first_,
|
||||
size_t num_blocks_,
|
||||
const StorageMemory & storage,
|
||||
const StorageMetadataPtr & metadata_snapshot)
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
InitializerFunc initializer_func_ = [](BlocksList::const_iterator &, size_t &) {})
|
||||
: SourceWithProgress(metadata_snapshot->getSampleBlockForColumns(column_names_, storage.getVirtuals(), storage.getStorageID()))
|
||||
, column_names(std::move(column_names_))
|
||||
, current_it(first_)
|
||||
, num_blocks(num_blocks_)
|
||||
, initializer_func(std::move(initializer_func_))
|
||||
{
|
||||
}
|
||||
|
||||
/// If called, will initialize the number of blocks at first read.
|
||||
/// It allows to read data which was inserted into memory table AFTER Storage::read was called.
|
||||
/// This hack is needed for global subqueries.
|
||||
void delayInitialization(std::shared_ptr<const BlocksList> data_) { data = data_; }
|
||||
|
||||
String getName() const override { return "Memory"; }
|
||||
|
||||
protected:
|
||||
Chunk generate() override
|
||||
{
|
||||
if (data)
|
||||
if (!postponed_init_done)
|
||||
{
|
||||
current_it = data->begin();
|
||||
num_blocks = data->size();
|
||||
is_finished = num_blocks == 0;
|
||||
|
||||
data = nullptr;
|
||||
initializer_func(current_it, num_blocks);
|
||||
postponed_init_done = true;
|
||||
}
|
||||
|
||||
if (is_finished)
|
||||
{
|
||||
if (current_block_idx == num_blocks)
|
||||
return {};
|
||||
}
|
||||
else
|
||||
{
|
||||
const Block & src = *current_it;
|
||||
Columns columns;
|
||||
columns.reserve(column_names.size());
|
||||
|
||||
/// Add only required columns to `res`.
|
||||
for (const auto & name : column_names)
|
||||
columns.emplace_back(src.getByName(name).column);
|
||||
const Block & src = *current_it;
|
||||
Columns columns;
|
||||
columns.reserve(column_names.size());
|
||||
|
||||
++current_block_idx;
|
||||
/// Add only required columns to `res`.
|
||||
for (const auto & name : column_names)
|
||||
columns.push_back(src.getByName(name).column);
|
||||
|
||||
if (current_block_idx == num_blocks)
|
||||
is_finished = true;
|
||||
else
|
||||
++current_it;
|
||||
if (++current_block_idx < num_blocks)
|
||||
++current_it;
|
||||
|
||||
return Chunk(std::move(columns), src.rows());
|
||||
}
|
||||
return Chunk(std::move(columns), src.rows());
|
||||
}
|
||||
private:
|
||||
Names column_names;
|
||||
BlocksList::const_iterator current_it;
|
||||
size_t current_block_idx = 0;
|
||||
size_t num_blocks;
|
||||
bool is_finished = false;
|
||||
|
||||
std::shared_ptr<const BlocksList> data = nullptr;
|
||||
private:
|
||||
const Names column_names;
|
||||
BlocksList::const_iterator current_it;
|
||||
size_t num_blocks;
|
||||
size_t current_block_idx = 0;
|
||||
|
||||
bool postponed_init_done = false;
|
||||
InitializerFunc initializer_func;
|
||||
};
|
||||
|
||||
|
||||
@ -107,10 +97,20 @@ public:
|
||||
|
||||
void write(const Block & block) override
|
||||
{
|
||||
const auto size_bytes_diff = block.allocatedBytes();
|
||||
const auto size_rows_diff = block.rows();
|
||||
|
||||
metadata_snapshot->check(block, true);
|
||||
auto new_data = std::make_unique<BlocksList>(*(storage.data.get()));
|
||||
new_data->push_back(block);
|
||||
storage.data.set(std::move(new_data));
|
||||
{
|
||||
std::lock_guard lock(storage.mutex);
|
||||
auto new_data = std::make_unique<BlocksList>(*(storage.data.get()));
|
||||
new_data->push_back(block);
|
||||
storage.data.set(std::move(new_data));
|
||||
|
||||
storage.total_size_bytes.fetch_add(size_bytes_diff, std::memory_order_relaxed);
|
||||
storage.total_size_rows.fetch_add(size_rows_diff, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
}
|
||||
private:
|
||||
StorageMemory & storage;
|
||||
@ -151,9 +151,19 @@ Pipe StorageMemory::read(
|
||||
/// set for IN or hash table for JOIN, which can't be done concurrently.
|
||||
/// Since no other manipulation with data is done, multiple sources shouldn't give any profit.
|
||||
|
||||
auto source = std::make_shared<MemorySource>(column_names, current_data->begin(), current_data->size(), *this, metadata_snapshot);
|
||||
source->delayInitialization(current_data);
|
||||
return Pipe(std::move(source));
|
||||
return Pipe(std::make_shared<MemorySource>(
|
||||
column_names,
|
||||
current_data->end(),
|
||||
0,
|
||||
*this,
|
||||
metadata_snapshot,
|
||||
/// This hack is needed for global subqueries.
|
||||
/// It allows to set up this Source for read AFTER Storage::read() has been called and just before actual reading
|
||||
[this](BlocksList::const_iterator & current_it, size_t & num_blocks) {
|
||||
std::lock_guard guard(mutex);
|
||||
current_it = current_data->begin();
|
||||
num_blocks = current_data->size();
|
||||
}));
|
||||
}
|
||||
|
||||
size_t size = current_data->size();
|
||||
@ -163,7 +173,7 @@ Pipe StorageMemory::read(
|
||||
|
||||
Pipes pipes;
|
||||
|
||||
auto it = current_data->begin();
|
||||
BlocksList::const_iterator it = current_data->begin();
|
||||
|
||||
size_t offset = 0;
|
||||
for (size_t stream = 0; stream < num_streams; ++stream)
|
||||
@ -194,7 +204,10 @@ BlockOutputStreamPtr StorageMemory::write(const ASTPtr & /*query*/, const Storag
|
||||
|
||||
void StorageMemory::drop()
|
||||
{
|
||||
data.set(std::make_unique<BlocksList>());
|
||||
std::lock_guard lock(mutex);
|
||||
data.set(std::make_unique<BlocksList>());
|
||||
total_size_bytes.store(0, std::memory_order_relaxed);
|
||||
total_size_rows.store(0, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
static inline void updateBlockData(Block & old_block, const Block & new_block)
|
||||
@ -209,6 +222,7 @@ static inline void updateBlockData(Block & old_block, const Block & new_block)
|
||||
|
||||
void StorageMemory::mutate(const MutationCommands & commands, const Context & context)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
auto metadata_snapshot = getInMemoryMetadataPtr();
|
||||
auto storage = getStorageID();
|
||||
auto storage_ptr = DatabaseCatalog::instance().getTable(storage, context);
|
||||
@ -227,7 +241,16 @@ void StorageMemory::mutate(const MutationCommands & commands, const Context & co
|
||||
// all column affected
|
||||
if (interpreter->isAffectingAllColumns())
|
||||
{
|
||||
size_t rows = 0;
|
||||
size_t bytes = 0;
|
||||
for (const auto & buffer : out)
|
||||
{
|
||||
rows += buffer.rows();
|
||||
bytes += buffer.bytes();
|
||||
}
|
||||
data.set(std::make_unique<BlocksList>(out));
|
||||
total_size_bytes.store(rows, std::memory_order_relaxed);
|
||||
total_size_rows.store(bytes, std::memory_order_relaxed);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -240,32 +263,41 @@ void StorageMemory::mutate(const MutationCommands & commands, const Context & co
|
||||
++data_it;
|
||||
++out_it;
|
||||
}
|
||||
size_t rows = 0;
|
||||
size_t bytes = 0;
|
||||
for (const auto & buffer : *new_data)
|
||||
{
|
||||
rows += buffer.rows();
|
||||
bytes += buffer.bytes();
|
||||
}
|
||||
total_size_bytes.store(rows, std::memory_order_relaxed);
|
||||
total_size_rows.store(bytes, std::memory_order_relaxed);
|
||||
data.set(std::move(new_data));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void StorageMemory::truncate(
|
||||
const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder &)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
data.set(std::make_unique<BlocksList>());
|
||||
total_size_bytes.store(0, std::memory_order_relaxed);
|
||||
total_size_rows.store(0, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
|
||||
std::optional<UInt64> StorageMemory::totalRows() const
|
||||
{
|
||||
UInt64 rows = 0;
|
||||
auto current_data = data.get();
|
||||
for (const auto & buffer : *current_data)
|
||||
rows += buffer.rows();
|
||||
return rows;
|
||||
/// All modifications of these counters are done under mutex which automatically guarantees synchronization/consistency
|
||||
/// When run concurrently we are fine with any value: "before" or "after"
|
||||
return total_size_rows.load(std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
|
||||
std::optional<UInt64> StorageMemory::totalBytes() const
|
||||
{
|
||||
UInt64 bytes = 0;
|
||||
auto current_data = data.get();
|
||||
for (const auto & buffer : *current_data)
|
||||
bytes += buffer.allocatedBytes();
|
||||
return bytes;
|
||||
return total_size_bytes.load(std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
|
||||
|
@ -1,5 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <atomic>
|
||||
#include <optional>
|
||||
#include <mutex>
|
||||
|
||||
#include <ext/shared_ptr_helper.h>
|
||||
@ -94,6 +96,9 @@ private:
|
||||
|
||||
bool delay_read_for_global_subqueries = false;
|
||||
|
||||
std::atomic<size_t> total_size_bytes = 0;
|
||||
std::atomic<size_t> total_size_rows = 0;
|
||||
|
||||
protected:
|
||||
StorageMemory(const StorageID & table_id_, ColumnsDescription columns_description_, ConstraintsDescription constraints_);
|
||||
};
|
||||
|
@ -286,7 +286,12 @@ struct CurrentlyMergingPartsTagger
|
||||
StorageMergeTree & storage;
|
||||
|
||||
public:
|
||||
CurrentlyMergingPartsTagger(FutureMergedMutatedPart & future_part_, size_t total_size, StorageMergeTree & storage_, bool is_mutation)
|
||||
CurrentlyMergingPartsTagger(
|
||||
FutureMergedMutatedPart & future_part_,
|
||||
size_t total_size,
|
||||
StorageMergeTree & storage_,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
bool is_mutation)
|
||||
: future_part(future_part_), storage(storage_)
|
||||
{
|
||||
/// Assume mutex is already locked, because this method is called from mergeTask.
|
||||
@ -304,7 +309,7 @@ public:
|
||||
max_volume_index = std::max(max_volume_index, storage.getStoragePolicy()->getVolumeIndexByDisk(part_ptr->volume->getDisk()));
|
||||
}
|
||||
|
||||
reserved_space = storage.tryReserveSpacePreferringTTLRules(total_size, ttl_infos, time(nullptr), max_volume_index);
|
||||
reserved_space = storage.tryReserveSpacePreferringTTLRules(metadata_snapshot, total_size, ttl_infos, time(nullptr), max_volume_index);
|
||||
}
|
||||
if (!reserved_space)
|
||||
{
|
||||
@ -715,7 +720,7 @@ bool StorageMergeTree::merge(
|
||||
return false;
|
||||
}
|
||||
|
||||
merging_tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part.parts), *this, false);
|
||||
merging_tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part.parts), *this, metadata_snapshot, false);
|
||||
auto table_id = getStorageID();
|
||||
merge_entry = global_context.getMergeList().insert(table_id.database_name, table_id.table_name, future_part);
|
||||
}
|
||||
@ -856,7 +861,7 @@ bool StorageMergeTree::tryMutatePart()
|
||||
future_part.name = part->getNewName(new_part_info);
|
||||
future_part.type = part->getType();
|
||||
|
||||
tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, true);
|
||||
tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, metadata_snapshot, true);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -1416,11 +1416,11 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
|
||||
ttl_infos.update(part_ptr->ttl_infos);
|
||||
max_volume_index = std::max(max_volume_index, getStoragePolicy()->getVolumeIndexByDisk(part_ptr->volume->getDisk()));
|
||||
}
|
||||
ReservationPtr reserved_space = reserveSpacePreferringTTLRules(estimated_space_for_merge,
|
||||
ttl_infos, time(nullptr), max_volume_index);
|
||||
|
||||
auto table_lock = lockForShare(RWLockImpl::NO_QUERY, storage_settings_ptr->lock_acquire_timeout_for_background_operations);
|
||||
|
||||
StorageMetadataPtr metadata_snapshot = getInMemoryMetadataPtr();
|
||||
ReservationPtr reserved_space = reserveSpacePreferringTTLRules(
|
||||
metadata_snapshot, estimated_space_for_merge, ttl_infos, time(nullptr), max_volume_index);
|
||||
|
||||
FutureMergedMutatedPart future_merged_part(parts, entry.new_part_type);
|
||||
if (future_merged_part.name != entry.new_part_name)
|
||||
@ -4053,6 +4053,7 @@ Pipe StorageReplicatedMergeTree::alterPartition(
|
||||
|
||||
|
||||
/// If new version returns ordinary name, else returns part name containing the first and last month of the month
|
||||
/// NOTE: use it in pair with getFakePartCoveringAllPartsInPartition(...)
|
||||
static String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info)
|
||||
{
|
||||
if (format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
|
||||
@ -4068,7 +4069,7 @@ static String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version,
|
||||
return part_info.getPartName();
|
||||
}
|
||||
|
||||
bool StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition(const String & partition_id, MergeTreePartInfo & part_info)
|
||||
bool StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition(const String & partition_id, MergeTreePartInfo & part_info, bool for_replace_partition)
|
||||
{
|
||||
/// Even if there is no data in the partition, you still need to mark the range for deletion.
|
||||
/// - Because before executing DETACH, tasks for downloading parts to this partition can be executed.
|
||||
@ -4091,14 +4092,21 @@ bool StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition(const St
|
||||
mutation_version = queue.getCurrentMutationVersion(partition_id, right);
|
||||
}
|
||||
|
||||
/// Empty partition.
|
||||
if (right == 0)
|
||||
return false;
|
||||
/// REPLACE PARTITION uses different max level and does not decrement max_block of DROP_RANGE for unknown (probably historical) reason.
|
||||
auto max_level = std::numeric_limits<decltype(part_info.level)>::max();
|
||||
if (!for_replace_partition)
|
||||
{
|
||||
max_level = MergeTreePartInfo::MAX_LEVEL;
|
||||
|
||||
--right;
|
||||
/// Empty partition.
|
||||
if (right == 0)
|
||||
return false;
|
||||
|
||||
--right;
|
||||
}
|
||||
|
||||
/// Artificial high level is chosen, to make this part "covering" all parts inside.
|
||||
part_info = MergeTreePartInfo(partition_id, left, right, MergeTreePartInfo::MAX_LEVEL, mutation_version);
|
||||
part_info = MergeTreePartInfo(partition_id, left, right, max_level, mutation_version);
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -5305,11 +5313,11 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
|
||||
/// Firstly, generate last block number and compute drop_range
|
||||
/// NOTE: Even if we make ATTACH PARTITION instead of REPLACE PARTITION drop_range will not be empty, it will contain a block.
|
||||
/// So, such case has special meaning, if drop_range contains only one block it means that nothing to drop.
|
||||
/// TODO why not to add normal DROP_RANGE entry to replication queue if `replace` is true?
|
||||
MergeTreePartInfo drop_range;
|
||||
drop_range.partition_id = partition_id;
|
||||
drop_range.max_block = allocateBlockNumber(partition_id, zookeeper)->getNumber();
|
||||
drop_range.min_block = replace ? 0 : drop_range.max_block;
|
||||
drop_range.level = std::numeric_limits<decltype(drop_range.level)>::max();
|
||||
getFakePartCoveringAllPartsInPartition(partition_id, drop_range, true);
|
||||
if (!replace)
|
||||
drop_range.min_block = drop_range.max_block;
|
||||
|
||||
String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range);
|
||||
|
||||
@ -5388,6 +5396,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
|
||||
}
|
||||
|
||||
/// We are almost ready to commit changes, remove fetches and merges from drop range
|
||||
/// FIXME it's unsafe to remove queue entries before we actually commit REPLACE_RANGE to replication log
|
||||
queue.removePartProducingOpsInRange(zookeeper, drop_range, entry);
|
||||
|
||||
/// Remove deduplication block_ids of replacing parts
|
||||
@ -5502,11 +5511,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
|
||||
/// A range for log entry to remove parts from the source table (myself).
|
||||
|
||||
MergeTreePartInfo drop_range;
|
||||
drop_range.partition_id = partition_id;
|
||||
drop_range.max_block = allocateBlockNumber(partition_id, zookeeper)->getNumber();
|
||||
drop_range.min_block = 0;
|
||||
drop_range.level = std::numeric_limits<decltype(drop_range.level)>::max();
|
||||
|
||||
getFakePartCoveringAllPartsInPartition(partition_id, drop_range, true);
|
||||
String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range);
|
||||
|
||||
if (drop_range.getBlocksCount() > 1)
|
||||
@ -5561,6 +5566,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
|
||||
drop_range_dest.max_block = drop_range.max_block;
|
||||
drop_range_dest.min_block = drop_range.max_block;
|
||||
drop_range_dest.level = drop_range.level;
|
||||
drop_range_dest.mutation = drop_range.mutation;
|
||||
|
||||
entry.type = ReplicatedMergeTreeLogEntryData::REPLACE_RANGE;
|
||||
entry.source_replica = dest_table_storage->replica_name;
|
||||
|
@ -526,7 +526,7 @@ private:
|
||||
|
||||
/// Produce an imaginary part info covering all parts in the specified partition (at the call moment).
|
||||
/// Returns false if the partition doesn't exist yet.
|
||||
bool getFakePartCoveringAllPartsInPartition(const String & partition_id, MergeTreePartInfo & part_info);
|
||||
bool getFakePartCoveringAllPartsInPartition(const String & partition_id, MergeTreePartInfo & part_info, bool for_replace_partition = false);
|
||||
|
||||
/// Check for a node in ZK. If it is, remember this information, and then immediately answer true.
|
||||
std::unordered_set<std::string> existing_nodes_cache;
|
||||
|
@ -32,8 +32,13 @@ Pipe StorageValues::read(
|
||||
{
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
|
||||
|
||||
Chunk chunk(res_block.getColumns(), res_block.rows());
|
||||
return Pipe(std::make_shared<SourceFromSingleChunk>(res_block.cloneEmpty(), std::move(chunk)));
|
||||
/// Get only required columns.
|
||||
Block block;
|
||||
for (const auto & name : column_names)
|
||||
block.insert(res_block.getByName(name));
|
||||
|
||||
Chunk chunk(block.getColumns(), block.rows());
|
||||
return Pipe(std::make_shared<SourceFromSingleChunk>(block.cloneEmpty(), std::move(chunk)));
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -102,7 +102,7 @@
|
||||
"with_coverage": false
|
||||
},
|
||||
{
|
||||
"compiler": "clang-10",
|
||||
"compiler": "clang-11",
|
||||
"build-type": "",
|
||||
"sanitizer": "",
|
||||
"package-type": "binary",
|
||||
@ -134,7 +134,7 @@
|
||||
"with_coverage": false
|
||||
},
|
||||
{
|
||||
"compiler": "clang-10-darwin",
|
||||
"compiler": "clang-11-darwin",
|
||||
"build-type": "",
|
||||
"sanitizer": "",
|
||||
"package-type": "binary",
|
||||
@ -144,7 +144,7 @@
|
||||
"with_coverage": false
|
||||
},
|
||||
{
|
||||
"compiler": "clang-10-aarch64",
|
||||
"compiler": "clang-11-aarch64",
|
||||
"build-type": "",
|
||||
"sanitizer": "",
|
||||
"package-type": "binary",
|
||||
@ -154,7 +154,7 @@
|
||||
"with_coverage": false
|
||||
},
|
||||
{
|
||||
"compiler": "clang-10-freebsd",
|
||||
"compiler": "clang-11-freebsd",
|
||||
"build-type": "",
|
||||
"sanitizer": "",
|
||||
"package-type": "binary",
|
||||
|
@ -0,0 +1,12 @@
|
||||
<yandex>
|
||||
<shutdown_wait_unfinished>3</shutdown_wait_unfinished>
|
||||
<logger>
|
||||
<level>trace</level>
|
||||
<log>/var/log/clickhouse-server/log.log</log>
|
||||
<errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
|
||||
<size>1000M</size>
|
||||
<count>10</count>
|
||||
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
|
||||
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
|
||||
</logger>
|
||||
</yandex>
|
@ -0,0 +1,26 @@
|
||||
<yandex>
|
||||
<storage_configuration>
|
||||
<disks>
|
||||
<s3>
|
||||
<type>s3</type>
|
||||
<!-- Use custom S3 endpoint -->
|
||||
<endpoint>http://resolver:8080/root/data/</endpoint>
|
||||
<access_key_id>minio</access_key_id>
|
||||
<secret_access_key>minio123</secret_access_key>
|
||||
<!-- ClickHouse starts earlier than custom S3 endpoint. Skip access check to avoid fail on start-up -->
|
||||
<skip_access_check>true</skip_access_check>
|
||||
<!-- Avoid extra retries to speed up tests -->
|
||||
<retry_attempts>0</retry_attempts>
|
||||
</s3>
|
||||
</disks>
|
||||
<policies>
|
||||
<s3>
|
||||
<volumes>
|
||||
<main>
|
||||
<disk>s3</disk>
|
||||
</main>
|
||||
</volumes>
|
||||
</s3>
|
||||
</policies>
|
||||
</storage_configuration>
|
||||
</yandex>
|
@ -0,0 +1,5 @@
|
||||
<yandex>
|
||||
<profiles>
|
||||
<default/>
|
||||
</profiles>
|
||||
</yandex>
|
@ -0,0 +1,20 @@
|
||||
<?xml version="1.0"?>
|
||||
<yandex>
|
||||
<tcp_port>9000</tcp_port>
|
||||
<listen_host>127.0.0.1</listen_host>
|
||||
|
||||
<openSSL>
|
||||
<client>
|
||||
<cacheSessions>true</cacheSessions>
|
||||
<verificationMode>none</verificationMode>
|
||||
<invalidCertificateHandler>
|
||||
<name>AcceptCertificateHandler</name>
|
||||
</invalidCertificateHandler>
|
||||
</client>
|
||||
</openSSL>
|
||||
|
||||
<max_concurrent_queries>500</max_concurrent_queries>
|
||||
<mark_cache_size>5368709120</mark_cache_size>
|
||||
<path>./clickhouse/</path>
|
||||
<users_config>users.xml</users_config>
|
||||
</yandex>
|
@ -0,0 +1,49 @@
|
||||
from bottle import request, route, run, response
|
||||
|
||||
|
||||
# Endpoint can be configured to throw 500 error on N-th request attempt.
|
||||
# In usual situation just redirects to original Minio server.
|
||||
|
||||
# Dict to the number of request should be failed.
|
||||
cache = {}
|
||||
|
||||
|
||||
@route('/fail_request/<_request_number>')
|
||||
def fail_request(_request_number):
|
||||
request_number = int(_request_number)
|
||||
if request_number > 0:
|
||||
cache['request_number'] = request_number
|
||||
else:
|
||||
cache.pop('request_number', None)
|
||||
return 'OK'
|
||||
|
||||
|
||||
# Handle for MultipleObjectsDelete.
|
||||
@route('/<_bucket>', ['POST'])
|
||||
def delete(_bucket):
|
||||
response.set_header("Location", "http://minio1:9001/" + _bucket + "?" + request.query_string)
|
||||
response.status = 307
|
||||
return 'Redirected'
|
||||
|
||||
|
||||
@route('/<_bucket>/<_path:path>', ['GET', 'POST', 'PUT', 'DELETE'])
|
||||
def server(_bucket, _path):
|
||||
if cache.get('request_number', None):
|
||||
request_number = cache.pop('request_number') - 1
|
||||
if request_number > 0:
|
||||
cache['request_number'] = request_number
|
||||
else:
|
||||
response.status = 500
|
||||
return 'Expected Error'
|
||||
|
||||
response.set_header("Location", "http://minio1:9001/" + _bucket + '/' + _path)
|
||||
response.status = 307
|
||||
return 'Redirected'
|
||||
|
||||
|
||||
@route('/')
|
||||
def ping():
|
||||
return 'OK'
|
||||
|
||||
|
||||
run(host='0.0.0.0', port=8080)
|
117
tests/integration/test_merge_tree_s3_failover/test.py
Normal file
117
tests/integration/test_merge_tree_s3_failover/test.py
Normal file
@ -0,0 +1,117 @@
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
|
||||
import pytest
|
||||
|
||||
from helpers.client import QueryRuntimeException
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
|
||||
logging.getLogger().setLevel(logging.INFO)
|
||||
logging.getLogger().addHandler(logging.StreamHandler())
|
||||
|
||||
|
||||
# Runs custom python-based S3 endpoint.
|
||||
def run_endpoint(cluster):
|
||||
logging.info("Starting custom S3 endpoint")
|
||||
container_id = cluster.get_container_id('resolver')
|
||||
current_dir = os.path.dirname(__file__)
|
||||
cluster.copy_file_to_container(container_id, os.path.join(current_dir, "s3_endpoint", "endpoint.py"), "endpoint.py")
|
||||
cluster.exec_in_container(container_id, ["python", "endpoint.py"], detach=True)
|
||||
|
||||
# Wait for S3 endpoint start
|
||||
for attempt in range(10):
|
||||
ping_response = cluster.exec_in_container(cluster.get_container_id('resolver'),
|
||||
["curl", "-s", "http://resolver:8080/"], nothrow=True)
|
||||
if ping_response != 'OK':
|
||||
if attempt == 9:
|
||||
assert ping_response == 'OK', 'Expected "OK", but got "{}"'.format(ping_response)
|
||||
else:
|
||||
time.sleep(1)
|
||||
else:
|
||||
break
|
||||
|
||||
logging.info("S3 endpoint started")
|
||||
|
||||
|
||||
def fail_request(cluster, request):
|
||||
response = cluster.exec_in_container(cluster.get_container_id('resolver'),
|
||||
["curl", "-s", "http://resolver:8080/fail_request/{}".format(request)])
|
||||
assert response == 'OK', 'Expected "OK", but got "{}"'.format(response)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def cluster():
|
||||
try:
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
cluster.add_instance("node",
|
||||
main_configs=["configs/config.d/log_conf.xml", "configs/config.d/storage_conf.xml"],
|
||||
with_minio=True)
|
||||
logging.info("Starting cluster...")
|
||||
cluster.start()
|
||||
logging.info("Cluster started")
|
||||
|
||||
run_endpoint(cluster)
|
||||
|
||||
yield cluster
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def drop_table(cluster):
|
||||
yield
|
||||
node = cluster.instances["node"]
|
||||
node.query("DROP TABLE IF EXISTS s3_failover_test NO DELAY")
|
||||
|
||||
|
||||
# S3 request will be failed for an appropriate part file write.
|
||||
FILES_PER_PART_BASE = 5 # partition.dat, default_compression_codec.txt, count.txt, columns.txt, checksums.txt
|
||||
FILES_PER_PART_WIDE = FILES_PER_PART_BASE + 1 + 1 + 3 * 2 # Primary index, MinMax, Mark and data file for column(s)
|
||||
FILES_PER_PART_COMPACT = FILES_PER_PART_BASE + 1 + 1 + 2
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"min_bytes_for_wide_part,request_count",
|
||||
[
|
||||
(0, FILES_PER_PART_WIDE),
|
||||
(1024 * 1024, FILES_PER_PART_COMPACT)
|
||||
]
|
||||
)
|
||||
def test_write_failover(cluster, min_bytes_for_wide_part, request_count):
|
||||
node = cluster.instances["node"]
|
||||
|
||||
node.query(
|
||||
"""
|
||||
CREATE TABLE s3_failover_test (
|
||||
dt Date,
|
||||
id Int64,
|
||||
data String
|
||||
) ENGINE=MergeTree()
|
||||
ORDER BY id
|
||||
PARTITION BY dt
|
||||
SETTINGS storage_policy='s3', min_bytes_for_wide_part={}
|
||||
"""
|
||||
.format(min_bytes_for_wide_part)
|
||||
)
|
||||
|
||||
for request in range(request_count + 1):
|
||||
# Fail N-th request to S3.
|
||||
fail_request(cluster, request + 1)
|
||||
|
||||
data = "('2020-03-01',0,'data'),('2020-03-01',1,'data')"
|
||||
positive = request == request_count
|
||||
try:
|
||||
node.query("INSERT INTO s3_failover_test VALUES {}".format(data))
|
||||
|
||||
assert positive, "Insert query should be failed, request {}".format(request)
|
||||
except QueryRuntimeException as e:
|
||||
assert not positive, "Insert query shouldn't be failed, request {}".format(request)
|
||||
assert str(e).find("Expected Error") != -1, "Unexpected error {}".format(str(e))
|
||||
|
||||
if positive:
|
||||
# Disable request failing.
|
||||
fail_request(cluster, 0)
|
||||
|
||||
assert node.query("CHECK TABLE s3_failover_test") == '1\n'
|
||||
assert node.query("SELECT * FROM s3_failover_test FORMAT Values") == data
|
@ -6,7 +6,7 @@ from helpers.hdfs_api import HDFSApi
|
||||
|
||||
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
node1 = cluster.add_instance('node1', with_hdfs=True, user_configs=[], main_configs=['configs/log_conf.xml'])
|
||||
node1 = cluster.add_instance('node1', with_hdfs=True, main_configs=['configs/log_conf.xml'])
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
|
@ -0,0 +1,14 @@
|
||||
1 1
|
||||
2 2
|
||||
3 3
|
||||
4 4
|
||||
mt 0 0_1_1_0 2
|
||||
rmt 0 0_0_0_0 2
|
||||
1 1
|
||||
2 2
|
||||
mt 0 0_1_1_0 2
|
||||
rmt 0 0_3_3_0 2
|
||||
0000000000 UPDATE s = concat(\'s\', toString(n)) WHERE 1 [] 0 1
|
||||
0000000001 DROP COLUMN s [] 0 1
|
||||
3
|
||||
4
|
@ -0,0 +1,39 @@
|
||||
drop table if exists mt;
|
||||
drop table if exists rmt sync;
|
||||
|
||||
create table mt (n UInt64, s String) engine = MergeTree partition by intDiv(n, 10) order by n;
|
||||
insert into mt values (3, '3'), (4, '4');
|
||||
|
||||
create table rmt (n UInt64, s String) engine = ReplicatedMergeTree('/clickhouse/test_01149/rmt', 'r1') partition by intDiv(n, 10) order by n;
|
||||
insert into rmt values (1,'1'), (2, '2');
|
||||
|
||||
select * from rmt;
|
||||
select * from mt;
|
||||
select table, partition_id, name, rows from system.parts where database=currentDatabase() and table in ('mt', 'rmt') and active=1 order by table, name;
|
||||
|
||||
alter table rmt update s = 's'||toString(n) where 1;
|
||||
|
||||
select * from rmt;
|
||||
alter table rmt replace partition '0' from mt;
|
||||
|
||||
select table, partition_id, name, rows from system.parts where database=currentDatabase() and table in ('mt', 'rmt') and active=1 order by table, name;
|
||||
|
||||
alter table rmt drop column s;
|
||||
|
||||
select mutation_id, command, parts_to_do_names, parts_to_do, is_done from system.mutations where database=currentDatabase() and table='rmt';
|
||||
select * from rmt;
|
||||
|
||||
drop table rmt sync;
|
||||
|
||||
set replication_alter_partitions_sync=0;
|
||||
create table rmt (n UInt64, s String) engine = ReplicatedMergeTree('/clickhouse/test_01149/rmt', 'r1') partition by intDiv(n, 10) order by n;
|
||||
insert into rmt values (1,'1'), (2, '2');
|
||||
|
||||
alter table rmt update s = 's'||toString(n) where 1;
|
||||
alter table rmt drop partition '0';
|
||||
|
||||
set replication_alter_partitions_sync=1;
|
||||
alter table rmt drop column s;
|
||||
|
||||
drop table mt;
|
||||
drop table rmt sync;
|
@ -42,3 +42,14 @@
|
||||
1 1 Int128 Int256
|
||||
1 1 Int128 Int256
|
||||
-1 -1 Int128 Int256
|
||||
1 1 Float64 Float64
|
||||
1 1 Float64 Float64
|
||||
1 1 Float64 Float64
|
||||
1 1 Float64 Float64
|
||||
-1 -1 Float64 Float64
|
||||
-1 -1 Float64 Float64
|
||||
-1 -1 Float64 Float64
|
||||
-1 -1 Float64 Float64
|
||||
1 1 Float64 Float64
|
||||
1 1 Float64 Float64
|
||||
-1 -1 Float64 Float64
|
||||
|
@ -58,16 +58,16 @@ select intDiv(toInt128(-1), toInt256(-1)) x, intDiv(toInt256(-1), toInt256(-1))
|
||||
select intDiv(toInt128(-1), toUInt256(1)) x, intDiv(toInt256(-1), toUInt256(1)) y, toTypeName(x), toTypeName(y);
|
||||
|
||||
|
||||
-- select (toInt128(-1) / toInt8(1)) x, (toInt256(-1) / toInt8(1)) y, toTypeName(x), toTypeName(y);
|
||||
-- select (toInt128(-1) / toInt16(1)) x, (toInt256(-1) / toInt16(1)) y, toTypeName(x), toTypeName(y);
|
||||
-- select (toInt128(-1) / toInt32(1)) x, (toInt256(-1) / toInt32(1)) y, toTypeName(x), toTypeName(y);
|
||||
-- select (toInt128(-1) / toInt64(1)) x, (toInt256(-1) / toInt64(1)) y, toTypeName(x), toTypeName(y);
|
||||
-- select (toInt128(-1) / toUInt8(1)) x, (toInt256(-1) / toUInt8(1)) y, toTypeName(x), toTypeName(y);
|
||||
-- select (toInt128(-1) / toUInt16(1)) x, (toInt256(-1) / toUInt16(1)) y, toTypeName(x), toTypeName(y);
|
||||
-- select (toInt128(-1) / toUInt32(1)) x, (toInt256(-1) / toUInt32(1)) y, toTypeName(x), toTypeName(y);
|
||||
-- select (toInt128(-1) / toUInt64(1)) x, (toInt256(-1) / toUInt64(1)) y, toTypeName(x), toTypeName(y);
|
||||
select (toInt128(-1) / toInt8(-1)) x, (toInt256(-1) / toInt8(-1)) y, toTypeName(x), toTypeName(y);
|
||||
select (toInt128(-1) / toInt16(-1)) x, (toInt256(-1) / toInt16(-1)) y, toTypeName(x), toTypeName(y);
|
||||
select (toInt128(-1) / toInt32(-1)) x, (toInt256(-1) / toInt32(-1)) y, toTypeName(x), toTypeName(y);
|
||||
select (toInt128(-1) / toInt64(-1)) x, (toInt256(-1) / toInt64(-1)) y, toTypeName(x), toTypeName(y);
|
||||
select (toInt128(-1) / toUInt8(1)) x, (toInt256(-1) / toUInt8(1)) y, toTypeName(x), toTypeName(y);
|
||||
select (toInt128(-1) / toUInt16(1)) x, (toInt256(-1) / toUInt16(1)) y, toTypeName(x), toTypeName(y);
|
||||
select (toInt128(-1) / toUInt32(1)) x, (toInt256(-1) / toUInt32(1)) y, toTypeName(x), toTypeName(y);
|
||||
select (toInt128(-1) / toUInt64(1)) x, (toInt256(-1) / toUInt64(1)) y, toTypeName(x), toTypeName(y);
|
||||
|
||||
-- select (toInt128(-1) * toInt128(1)) x, (toInt256(-1) * toInt128(1)) y, toTypeName(x), toTypeName(y);
|
||||
-- select (toInt128(-1) * toInt256(1)) x, (toInt256(-1) * toInt256(1)) y, toTypeName(x), toTypeName(y);
|
||||
-- --select (toInt128(-1) * toUInt128(1)) x, (toInt256(-1) * toUInt128(1)) y, toTypeName(x), toTypeName(y);
|
||||
-- select (toInt128(-1) * toUInt256(1)) x, (toInt256(-1) * toUInt256(1)) y, toTypeName(x), toTypeName(y);
|
||||
select (toInt128(-1) / toInt128(-1)) x, (toInt256(-1) / toInt128(-1)) y, toTypeName(x), toTypeName(y);
|
||||
select (toInt128(-1) / toInt256(-1)) x, (toInt256(-1) / toInt256(-1)) y, toTypeName(x), toTypeName(y);
|
||||
--select (toInt128(-1) / toUInt128(1)) x, (toInt256(-1) / toUInt128(1)) y, toTypeName(x), toTypeName(y);
|
||||
select (toInt128(-1) / toUInt256(1)) x, (toInt256(-1) / toUInt256(1)) y, toTypeName(x), toTypeName(y);
|
||||
|
@ -0,0 +1,2 @@
|
||||
0
|
||||
1
|
@ -0,0 +1,36 @@
|
||||
CREATE DATABASE IF NOT EXISTS shard_0;
|
||||
CREATE DATABASE IF NOT EXISTS shard_1;
|
||||
CREATE DATABASE IF NOT EXISTS main_01487;
|
||||
CREATE DATABASE IF NOT EXISTS test_01487;
|
||||
|
||||
USE main_01487;
|
||||
|
||||
DROP TABLE IF EXISTS shard_0.l;
|
||||
DROP TABLE IF EXISTS shard_1.l;
|
||||
DROP TABLE IF EXISTS d;
|
||||
DROP TABLE IF EXISTS t;
|
||||
|
||||
CREATE TABLE shard_0.l (value UInt8) ENGINE = MergeTree ORDER BY value;
|
||||
CREATE TABLE shard_1.l (value UInt8) ENGINE = MergeTree ORDER BY value;
|
||||
CREATE TABLE t (value UInt8) ENGINE = Memory;
|
||||
|
||||
INSERT INTO shard_0.l VALUES (0);
|
||||
INSERT INTO shard_1.l VALUES (1);
|
||||
INSERT INTO t VALUES (0), (1), (2);
|
||||
|
||||
CREATE TABLE d AS t ENGINE = Distributed(test_cluster_two_shards_different_databases, currentDatabase(), t);
|
||||
|
||||
USE test_01487;
|
||||
DROP DATABASE test_01487;
|
||||
|
||||
SELECT * FROM main_01487.d WHERE value IN (SELECT l.value FROM l) ORDER BY value;
|
||||
|
||||
USE main_01487;
|
||||
|
||||
DROP TABLE IF EXISTS shard_0.l;
|
||||
DROP TABLE IF EXISTS shard_1.l;
|
||||
DROP TABLE IF EXISTS d;
|
||||
DROP TABLE IF EXISTS t;
|
||||
|
||||
DROP DATABASE shard_0;
|
||||
DROP DATABASE shard_1;
|
21
tests/queries/0_stateless/01514_distributed_cancel_query_on_error.sh
Executable file
21
tests/queries/0_stateless/01514_distributed_cancel_query_on_error.sh
Executable file
@ -0,0 +1,21 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
. "$CURDIR"/../shell_config.sh
|
||||
|
||||
# _shard_num:
|
||||
# 1 on 127.2
|
||||
# 2 on 127.3
|
||||
# max_block_size to fail faster
|
||||
# max_memory_usage/_shard_num/repeat() will allow failure on the first shard earlier.
|
||||
opts=(
|
||||
--max_memory_usage=3G
|
||||
--max_block_size=50
|
||||
--max_threads=1
|
||||
--max_distributed_connections=2
|
||||
)
|
||||
${CLICKHOUSE_CLIENT} "${opts[@]}" -q "SELECT groupArray(repeat('a', 1000*_shard_num)), number%100000 k from remote('127.{2,3}', system.numbers) GROUP BY k LIMIT 10e6" |& {
|
||||
# the query should fail earlier on 127.3 and 127.2 should not even go to the memory limit exceeded error.
|
||||
fgrep -q 'DB::Exception: Received from 127.3:9000. DB::Exception: Memory limit (for query) exceeded:'
|
||||
# while if this will not correctly then it will got the exception from the 127.2:9000 and fail
|
||||
}
|
@ -0,0 +1,31 @@
|
||||
DROP TABLE IF EXISTS data_01515;
|
||||
CREATE TABLE data_01515
|
||||
(
|
||||
key Int,
|
||||
d1 Int,
|
||||
d1_null Nullable(Int),
|
||||
INDEX d1_idx d1 TYPE minmax GRANULARITY 1,
|
||||
INDEX d1_null_idx assumeNotNull(d1_null) TYPE minmax GRANULARITY 1
|
||||
)
|
||||
Engine=MergeTree()
|
||||
ORDER BY key;
|
||||
|
||||
SELECT * FROM data_01515;
|
||||
SELECT * FROM data_01515 SETTINGS force_data_skipping_indices=''; -- { serverError 6 }
|
||||
SELECT * FROM data_01515 SETTINGS force_data_skipping_indices='d1_idx'; -- { serverError 277 }
|
||||
SELECT * FROM data_01515 SETTINGS force_data_skipping_indices='d1_null_idx'; -- { serverError 277 }
|
||||
|
||||
SELECT * FROM data_01515 WHERE d1 = 0 SETTINGS force_data_skipping_indices='d1_idx';
|
||||
SELECT * FROM data_01515 WHERE d1 = 0 SETTINGS force_data_skipping_indices='`d1_idx`';
|
||||
SELECT * FROM data_01515 WHERE d1 = 0 SETTINGS force_data_skipping_indices=' d1_idx ';
|
||||
SELECT * FROM data_01515 WHERE d1 = 0 SETTINGS force_data_skipping_indices=' d1_idx ';
|
||||
SELECT * FROM data_01515 WHERE d1 = 0 SETTINGS force_data_skipping_indices='d1_idx,d1_null_idx'; -- { serverError 277 }
|
||||
SELECT * FROM data_01515 WHERE d1 = 0 SETTINGS force_data_skipping_indices='d1_null_idx,d1_idx'; -- { serverError 277 }
|
||||
SELECT * FROM data_01515 WHERE d1 = 0 SETTINGS force_data_skipping_indices='d1_null_idx,d1_idx,,'; -- { serverError 277 }
|
||||
SELECT * FROM data_01515 WHERE d1 = 0 SETTINGS force_data_skipping_indices=' d1_null_idx,d1_idx'; -- { serverError 277 }
|
||||
SELECT * FROM data_01515 WHERE d1 = 0 SETTINGS force_data_skipping_indices=' `d1_null_idx`,d1_idx'; -- { serverError 277 }
|
||||
SELECT * FROM data_01515 WHERE d1 = 0 SETTINGS force_data_skipping_indices='d1_null_idx'; -- { serverError 277 }
|
||||
SELECT * FROM data_01515 WHERE d1 = 0 SETTINGS force_data_skipping_indices=' d1_null_idx '; -- { serverError 277 }
|
||||
|
||||
SELECT * FROM data_01515 WHERE d1_null = 0 SETTINGS force_data_skipping_indices='d1_null_idx'; -- { serverError 277 }
|
||||
SELECT * FROM data_01515 WHERE assumeNotNull(d1_null) = 0 SETTINGS force_data_skipping_indices='d1_null_idx';
|
@ -0,0 +1,48 @@
|
||||
CREATE TABLE visits
|
||||
(
|
||||
`CounterID` UInt32,
|
||||
`StartDate` Date,
|
||||
`StartTime` DateTime,
|
||||
`GoalsID` Array(UInt32),
|
||||
`Sign` Int8
|
||||
)
|
||||
ENGINE = Null;
|
||||
|
||||
|
||||
CREATE MATERIALIZED VIEW goal_view TO goal
|
||||
(
|
||||
`CounterID` UInt32,
|
||||
`StartDate` Date,
|
||||
`GoalID` UInt32,
|
||||
`Visits` AggregateFunction(sumIf, Int8, UInt8),
|
||||
`GoalReaches` AggregateFunction(sum, Int8)
|
||||
) AS
|
||||
SELECT
|
||||
CounterID,
|
||||
StartDate,
|
||||
GoalID,
|
||||
sumIfState(Sign, _uniq = 1) AS Visits,
|
||||
sumState(Sign) AS GoalReaches
|
||||
FROM visits
|
||||
ARRAY JOIN
|
||||
GoalsID AS GoalID,
|
||||
arrayEnumerateUniq(GoalsID) AS _uniq
|
||||
GROUP BY
|
||||
CounterID,
|
||||
StartDate,
|
||||
GoalID
|
||||
ORDER BY
|
||||
CounterID ASC,
|
||||
StartDate ASC,
|
||||
GoalID ASC;
|
||||
|
||||
CREATE TABLE goal
|
||||
(
|
||||
`CounterID` UInt32,
|
||||
`StartDate` Date,
|
||||
`GoalID` UInt32,
|
||||
`Visits` AggregateFunction(sumIf, Int8, UInt8),
|
||||
`GoalReaches` AggregateFunction(sum, Int8)
|
||||
) ENGINE = AggregatingMergeTree PARTITION BY toStartOfMonth(StartDate) ORDER BY (CounterID, StartDate, GoalID) SETTINGS index_granularity = 256;
|
||||
|
||||
INSERT INTO visits (`CounterID`,`StartDate`,`StartTime`,`Sign`,`GoalsID`) VALUES (1, toDate('2000-01-01'), toDateTime(toDate('2000-01-01')), 1, [1]);
|
@ -149,3 +149,4 @@
|
||||
00609_mv_index_in_in
|
||||
00510_materizlized_view_and_deduplication_zookeeper
|
||||
00738_lock_for_inner_table
|
||||
01515_force_data_skipping_indices
|
||||
|
@ -9,4 +9,10 @@ if (USE_PROTOBUF)
|
||||
target_link_libraries (ProtobufDelimitedMessagesSerializer PRIVATE ${Protobuf_LIBRARY} boost::program_options)
|
||||
get_filename_component(ProtobufDelimitedMessagesSerializer_OutputDir "${CMAKE_CURRENT_LIST_DIR}/../../tests/queries/0_stateless" REALPATH)
|
||||
target_compile_definitions(ProtobufDelimitedMessagesSerializer PRIVATE OUTPUT_DIR="${ProtobufDelimitedMessagesSerializer_OutputDir}")
|
||||
|
||||
# Protoc generates substandard code.
|
||||
check_cxx_compiler_flag("-Wsuggest-destructor-override" HAS_SUGGEST_DESTRUCTOR_OVERRIDE)
|
||||
if (HAS_SUGGEST_OVERRIDE)
|
||||
target_compile_options(ProtobufDelimitedMessagesSerializer PRIVATE -Wno-suggest-destructor-override)
|
||||
endif()
|
||||
endif ()
|
||||
|
@ -1,188 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <Core/Types.h>
|
||||
#include <Common/HashTable/HashMap.h>
|
||||
#include <Common/Arena.h>
|
||||
#include <ext/bit_cast.h>
|
||||
#include <common/StringRef.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
|
||||
class MarkovModel
|
||||
{
|
||||
private:
|
||||
using NGramHash = UInt32;
|
||||
|
||||
struct HistogramElement
|
||||
{
|
||||
UInt8 byte;
|
||||
UInt32 count;
|
||||
};
|
||||
|
||||
struct Histogram
|
||||
{
|
||||
UInt32 total = 0;
|
||||
std::vector<HistogramElement> data;
|
||||
|
||||
void add(UInt8 byte)
|
||||
{
|
||||
++total;
|
||||
|
||||
for (auto & elem : data)
|
||||
{
|
||||
if (elem.byte == byte)
|
||||
{
|
||||
++elem.count;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
data.emplace_back(HistogramElement{.byte = byte, .count = 1});
|
||||
}
|
||||
|
||||
UInt8 sample(UInt32 random) const
|
||||
{
|
||||
random %= total;
|
||||
|
||||
UInt32 sum = 0;
|
||||
for (const auto & elem : data)
|
||||
{
|
||||
sum += elem.count;
|
||||
if (sum > random)
|
||||
return elem.byte;
|
||||
}
|
||||
|
||||
__builtin_unreachable();
|
||||
}
|
||||
};
|
||||
|
||||
using Table = HashMap<NGramHash, Histogram, TrivialHash>;
|
||||
Table table;
|
||||
|
||||
size_t n;
|
||||
|
||||
|
||||
NGramHash hashContext(const char * pos, const char * data, size_t size) const
|
||||
{
|
||||
if (pos >= data + n)
|
||||
return CRC32Hash()(StringRef(pos - n, n));
|
||||
else
|
||||
return CRC32Hash()(StringRef(data, pos - data));
|
||||
}
|
||||
|
||||
public:
|
||||
explicit MarkovModel(size_t n_) : n(n_) {}
|
||||
MarkovModel() {}
|
||||
|
||||
void consume(const char * data, size_t size)
|
||||
{
|
||||
const char * pos = data;
|
||||
const char * end = data + size;
|
||||
|
||||
while (pos < end)
|
||||
{
|
||||
table[hashContext(pos, data, size)].add(*pos);
|
||||
++pos;
|
||||
}
|
||||
|
||||
/// Mark end of string as zero byte.
|
||||
table[hashContext(pos, data, size)].add(0);
|
||||
}
|
||||
|
||||
|
||||
template <typename Random>
|
||||
size_t generate(char * data, size_t size, Random && random) const
|
||||
{
|
||||
char * pos = data;
|
||||
char * end = data + size;
|
||||
|
||||
while (pos < end)
|
||||
{
|
||||
auto it = table.find(hashContext(pos, data, size));
|
||||
if (table.end() == it)
|
||||
return pos - data;
|
||||
|
||||
*pos = it->getMapped().sample(random());
|
||||
|
||||
/// Zero byte marks end of string.
|
||||
if (0 == *pos)
|
||||
return pos - data;
|
||||
|
||||
++pos;
|
||||
}
|
||||
|
||||
return size;
|
||||
}
|
||||
|
||||
|
||||
/// Allows to add random noise to frequencies.
|
||||
template <typename Transform>
|
||||
void modifyCounts(Transform && transform)
|
||||
{
|
||||
for (auto & elem : table)
|
||||
{
|
||||
UInt32 new_total = 0;
|
||||
for (auto & frequency : elem.getMapped().data)
|
||||
{
|
||||
frequency.count = transform(frequency.count);
|
||||
new_total += frequency.count;
|
||||
}
|
||||
elem.getMapped().total = new_total;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void write(WriteBuffer & out) const
|
||||
{
|
||||
writeBinary(UInt8(n), out);
|
||||
writeVarUInt(table.size(), out);
|
||||
|
||||
for (const auto & elem : table)
|
||||
{
|
||||
writeBinary(elem.getKey(), out);
|
||||
writeBinary(UInt8(elem.getMapped().data.size()), out);
|
||||
|
||||
for (const auto & frequency : elem.getMapped().data)
|
||||
{
|
||||
writeBinary(frequency.byte, out);
|
||||
writeVarUInt(frequency.count, out);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void read(ReadBuffer & in)
|
||||
{
|
||||
table.clear();
|
||||
|
||||
UInt8 read_n = 0;
|
||||
readBinary(read_n, in);
|
||||
n = read_n;
|
||||
|
||||
size_t read_size = 0;
|
||||
readVarUInt(read_size, in);
|
||||
|
||||
for (size_t i = 0; i < read_size; ++i)
|
||||
{
|
||||
NGramHash key = 0;
|
||||
UInt8 historgam_size = 0;
|
||||
readBinary(key, in);
|
||||
readBinary(historgam_size, in);
|
||||
|
||||
Histogram & histogram = table[key];
|
||||
histogram.data.resize(historgam_size);
|
||||
|
||||
for (size_t j = 0; j < historgam_size; ++j)
|
||||
{
|
||||
readBinary(histogram.data[j].byte, in);
|
||||
readVarUInt(histogram.data[j].count, in);
|
||||
histogram.total += histogram.data[j].count;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user