Merge branch 'master' into recreate_kafka_table_case

mergify[bot] 2021-06-09 10:30:32 +00:00 committed by GitHub
commit bc012b9c1a
198 changed files with 3736 additions and 1352 deletions

5
.gitignore vendored

@ -14,6 +14,11 @@
/build-*
/tests/venv
# logs
*.log
*.stderr
*.stdout
/docs/build
/docs/publish
/docs/edit

1
.gitmodules vendored

@ -228,7 +228,6 @@
[submodule "contrib/datasketches-cpp"]
path = contrib/datasketches-cpp
url = https://github.com/ClickHouse-Extras/datasketches-cpp.git
[submodule "contrib/yaml-cpp"]
path = contrib/yaml-cpp
url = https://github.com/ClickHouse-Extras/yaml-cpp.git


@ -2,6 +2,7 @@
#### Upgrade Notes
* Do not upgrade if you have a partition key with `UUID`.
* `zstd` compression library is updated to v1.5.0. You may get "checksum does not match" messages in replication. These messages are expected due to the update of the compression algorithm; they are informational and do not indicate any kind of undesired behaviour, so you can ignore them.
* The setting `compile_expressions` is enabled by default. Although it has been heavily tested on a variety of scenarios, if you find some undesired behaviour on your servers, you can try turning this setting off.
* Values of `UUID` type cannot be compared with integers. For example, instead of writing `uuid != 0`, write `uuid != '00000000-0000-0000-0000-000000000000'`.
@ -763,6 +764,7 @@
* Allow using extended integer types (`Int128`, `Int256`, `UInt256`) in `avg` and `avgWeighted` functions. Also allow using different types (integer, decimal, floating point) for value and for weight in `avgWeighted` function. This is a backward-incompatible change: now the `avg` and `avgWeighted` functions always return `Float64` (as documented). Before this change the return type for `Decimal` arguments was also `Decimal`. [#15419](https://github.com/ClickHouse/ClickHouse/pull/15419) ([Mike](https://github.com/myrrc)).
* Expression `toUUID(N)` no longer works. Replace it with `toUUID('00000000-0000-0000-0000-000000000000')`. This change is motivated by the non-obvious results of `toUUID(N)` where N is non-zero.
* SSL certificates with incorrect "key usage" are rejected. In previous versions they used to work. See [#19262](https://github.com/ClickHouse/ClickHouse/issues/19262).
* `incl` references to the substitutions file (`/etc/metrika.xml`) were removed from the default config (`<remote_servers>`, `<zookeeper>`, `<macros>`, `<compression>`, `<networks>`). If you were using the substitutions file and relying on those implicit references, you should put them back manually and explicitly by adding the corresponding sections with `incl="..."` attributes before the update. See [#18740](https://github.com/ClickHouse/ClickHouse/pull/18740) ([alexey-milovidov](https://github.com/alexey-milovidov)).
#### New Feature


@ -528,7 +528,6 @@ include (cmake/find/libpqxx.cmake)
include (cmake/find/nuraft.cmake)
include (cmake/find/yaml-cpp.cmake)
if(NOT USE_INTERNAL_PARQUET_LIBRARY)
set (ENABLE_ORC OFF CACHE INTERNAL "")
endif()


@ -4,12 +4,14 @@
#include <Core/Block.h>
#include <Interpreters/InternalTextLogsQueue.h>
#include <Interpreters/TextLog.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <sys/time.h>
#include <Poco/Message.h>
#include <Common/CurrentThread.h>
#include <Common/DNSResolver.h>
#include <common/getThreadId.h>
#include <Common/SensitiveDataMasker.h>
#include <Common/IO.h>
namespace DB
{
@ -26,16 +28,48 @@ void OwnSplitChannel::log(const Poco::Message & msg)
auto matches = masker->wipeSensitiveData(message_text);
if (matches > 0)
{
logSplit({msg, message_text}); // we will continue with the copy of original message with text modified
tryLogSplit({msg, message_text}); // we will continue with the copy of original message with text modified
return;
}
}
logSplit(msg);
tryLogSplit(msg);
}
void OwnSplitChannel::tryLogSplit(const Poco::Message & msg)
{
try
{
logSplit(msg);
}
/// It is better to catch the errors here in order to avoid
/// breaking some functionality because of unexpected "File not
/// found" (or similar) error.
///
/// For example, StorageDistributedDirectoryMonitor will mark a batch
/// as broken; some MergeTree code can also be affected.
///
/// Also note that we cannot log the exception here with the regular
/// tryLogCurrentException(), since this would lead to recursion;
/// let's log it to stderr at least.
catch (...)
{
MemoryTracker::LockExceptionInThread lock_memory_tracker(VariableContext::Global);
const std::string & exception_message = getCurrentExceptionMessage(true);
const std::string & message = msg.getText();
/// NOTE: errors are ignored, since nothing can be done.
writeRetry(STDERR_FILENO, "Cannot add message to the log: ");
writeRetry(STDERR_FILENO, message.data(), message.size());
writeRetry(STDERR_FILENO, "\n");
writeRetry(STDERR_FILENO, exception_message.data(), exception_message.size());
writeRetry(STDERR_FILENO, "\n");
}
}
void OwnSplitChannel::logSplit(const Poco::Message & msg)
{
ExtendedLogMessage msg_ext = ExtendedLogMessage::getFrom(msg);


@ -24,6 +24,7 @@ public:
private:
void logSplit(const Poco::Message & msg);
void tryLogSplit(const Poco::Message & msg);
using ChannelPtr = Poco::AutoPtr<Poco::Channel>;
/// Handler and its pointer cast to the extended interface


@ -4,6 +4,6 @@ if (NOT USE_YAML_CPP)
return()
endif()
if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/yaml-cpp")
if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/yaml-cpp/README.md")
message (ERROR "submodule contrib/yaml-cpp is missing. to fix try run: \n git submodule update --init --recursive")
endif()


@ -61,7 +61,6 @@ endif()
add_subdirectory (poco-cmake)
add_subdirectory (croaring-cmake)
# TODO: refactor the contrib libraries below this comment.
if (USE_INTERNAL_ZSTD_LIBRARY)

2
contrib/avro vendored

@ -1 +1 @@
Subproject commit 1ee16d8c5a7808acff5cf0475f771195d9aa3faa
Subproject commit e43c46e87fd32eafdc09471e95344555454c5ef8


@ -1,6 +1,6 @@
if (SANITIZE OR NOT (
((OS_LINUX OR OS_FREEBSD) AND (ARCH_AMD64 OR ARCH_ARM OR ARCH_PPC64LE)) OR
(OS_DARWIN AND CMAKE_BUILD_TYPE STREQUAL "RelWithDebInfo")
(OS_DARWIN AND (CMAKE_BUILD_TYPE STREQUAL "RelWithDebInfo" OR CMAKE_BUILD_TYPE STREQUAL "Debug"))
))
if (ENABLE_JEMALLOC)
message (${RECONFIGURE_MESSAGE_LEVEL}


@ -229,6 +229,7 @@ status()
case "$1" in
status)
status
exit 0
;;
esac


@ -22,7 +22,7 @@ services:
entrypoint: /etc/bootstrap.sh -d
hdfskerberos:
image: yandex/clickhouse-kerberos-kdc:${DOCKER_KERBEROS_KDC_TAG}
image: yandex/clickhouse-kerberos-kdc:${DOCKER_KERBEROS_KDC_TAG:-latest}
hostname: hdfskerberos
volumes:
- ${KERBERIZED_HDFS_DIR}/secrets:/tmp/keytab


@ -8,6 +8,7 @@ RUN apt-get update -y && \
python3-wheel \
brotli \
netcat-openbsd \
postgresql-client \
zstd
RUN python3 -m pip install \


@ -32,7 +32,7 @@ CREATE TABLE `ontime`
`Reporting_Airline` String,
`DOT_ID_Reporting_Airline` Int32,
`IATA_CODE_Reporting_Airline` String,
`Tail_Number` Int32,
`Tail_Number` String,
`Flight_Number_Reporting_Airline` String,
`OriginAirportID` Int32,
`OriginAirportSeqID` Int32,


@ -18,6 +18,8 @@ To apply a CatBoost model in ClickHouse:
For more information about training CatBoost models, see [Training and applying models](https://catboost.ai/docs/features/training.html#training).
If the configuration was updated, you can reload CatBoost models without restarting the server using the [RELOAD MODEL](../sql-reference/statements/system.md#query_language-system-reload-model) and [RELOAD MODELS](../sql-reference/statements/system.md#query_language-system-reload-models) system queries.
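For example (a minimal sketch; the model name `amazon_train` is only an assumption and must match a model defined in your configuration):

``` sql
SYSTEM RELOAD MODELS;
SYSTEM RELOAD MODEL amazon_train;
```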
## Prerequisites {#prerequisites}
If you do not have [Docker](https://docs.docker.com/install/) yet, install it.


@ -0,0 +1,107 @@
---
toc_priority: 146
toc_title: intervalLengthSum
---
# intervalLengthSum {#agg_function-intervallengthsum}
Calculates the total length of the union of all ranges (segments on a numeric axis).
**Syntax**
``` sql
intervalLengthSum(start, end)
```
**Arguments**
- `start` — The starting value of the interval. [Int32](../../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64), [Int64](../../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64), [UInt32](../../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64), [UInt64](../../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64), [Float32](../../../sql-reference/data-types/float.md#float32-float64), [Float64](../../../sql-reference/data-types/float.md#float32-float64), [DateTime](../../../sql-reference/data-types/datetime.md#data_type-datetime) or [Date](../../../sql-reference/data-types/date.md#data_type-date).
- `end` — The ending value of the interval. [Int32](../../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64), [Int64](../../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64), [UInt32](../../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64), [UInt64](../../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64), [Float32](../../../sql-reference/data-types/float.md#float32-float64), [Float64](../../../sql-reference/data-types/float.md#float32-float64), [DateTime](../../../sql-reference/data-types/datetime.md#data_type-datetime) or [Date](../../../sql-reference/data-types/date.md#data_type-date).
!!! info "Note"
Arguments must be of the same data type. Otherwise, an exception will be thrown.
**Returned value**
- Total length of the union of all ranges (segments on a numeric axis). Depending on the type of the arguments, the return value may be of the [UInt64](../../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64) or [Float64](../../../sql-reference/data-types/float.md#float32-float64) type.
**Examples**
1. Input table:
``` text
┌─id─┬─start─┬─end─┐
│ a │ 1.1 │ 2.9 │
│ a │ 2.5 │ 3.2 │
│ a │ 4 │ 5 │
└────┴───────┴─────┘
```
In this example, arguments of the Float32 type are used. The function returns a value of the Float64 type.
The result is the sum of the lengths of the intervals `[1.1, 3.2]` (the union of `[1.1, 2.9]` and `[2.5, 3.2]`) and `[4, 5]`: 2.1 + 1 = 3.1.
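To reproduce this example, the input table can be created as follows (a minimal sketch; the `Memory` engine and the exact column types are assumptions, since the DDL is not shown here):

``` sql
CREATE TABLE fl_interval (id String, start Float32, end Float32) ENGINE = Memory;
INSERT INTO fl_interval VALUES ('a', 1.1, 2.9), ('a', 2.5, 3.2), ('a', 4, 5);
```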
Query:
``` sql
SELECT id, intervalLengthSum(start, end), toTypeName(intervalLengthSum(start, end)) FROM fl_interval GROUP BY id ORDER BY id;
```
Result:
``` text
┌─id─┬─intervalLengthSum(start, end)─┬─toTypeName(intervalLengthSum(start, end))─┐
│ a │ 3.1 │ Float64 │
└────┴───────────────────────────────┴───────────────────────────────────────────┘
```
2. Input table:
``` text
┌─id─┬───────────────start─┬─────────────────end─┐
│ a │ 2020-01-01 01:12:30 │ 2020-01-01 02:10:10 │
│ a │ 2020-01-01 02:05:30 │ 2020-01-01 02:50:31 │
│ a │ 2020-01-01 03:11:22 │ 2020-01-01 03:23:31 │
└────┴─────────────────────┴─────────────────────┘
```
In this example, the arguments of the DateTime type are used. The function returns a value in seconds.
Query:
``` sql
SELECT id, intervalLengthSum(start, end), toTypeName(intervalLengthSum(start, end)) FROM dt_interval GROUP BY id ORDER BY id;
```
Result:
``` text
┌─id─┬─intervalLengthSum(start, end)─┬─toTypeName(intervalLengthSum(start, end))─┐
│ a │ 6610 │ UInt64 │
└────┴───────────────────────────────┴───────────────────────────────────────────┘
```
3. Input table:
``` text
┌─id─┬──────start─┬────────end─┐
│ a │ 2020-01-01 │ 2020-01-04 │
│ a │ 2020-01-12 │ 2020-01-18 │
└────┴────────────┴────────────┘
```
In this example, the arguments of the Date type are used. The function returns a value in days.
Query:
``` sql
SELECT id, intervalLengthSum(start, end), toTypeName(intervalLengthSum(start, end)) FROM date_interval GROUP BY id ORDER BY id;
```
Result:
``` text
┌─id─┬─intervalLengthSum(start, end)─┬─toTypeName(intervalLengthSum(start, end))─┐
│ a │ 9 │ UInt64 │
└────┴───────────────────────────────┴───────────────────────────────────────────┘
```


@ -159,7 +159,7 @@ Configuration fields:
| Tag | Description | Required |
|------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|
| `name` | Column name. | Yes |
| `type` | ClickHouse data type.<br/>ClickHouse tries to cast value from dictionary to the specified data type. For example, for MySQL, the field might be `TEXT`, `VARCHAR`, or `BLOB` in the MySQL source table, but it can be uploaded as `String` in ClickHouse.<br/>[Nullable](../../../sql-reference/data-types/nullable.md) is currently supported for [Flat](external-dicts-dict-layout.md#flat), [Hashed](external-dicts-dict-layout.md#dicts-external_dicts_dict_layout-hashed), [ComplexKeyHashed](external-dicts-dict-layout.md#complex-key-hashed), [Direct](external-dicts-dict-layout.md#direct), [ComplexKeyDirect](external-dicts-dict-layout.md#complex-key-direct), [RangeHashed](external-dicts-dict-layout.md#range-hashed), [Polygon](external-dicts-dict-polygon.md), [Cache](external-dicts-dict-layout.md#cache), [ComplexKeyCache](external-dicts-dict-layout.md#complex-key-cache), [SSDCache](external-dicts-dict-layout.md#ssd-cache), [SSDComplexKeyCache](external-dicts-dict-layout.md#complex-key-ssd-cache) dictionaries. In [IPTrie](external-dicts-dict-layout.md#ip-trie) dictionaries `Nullable` types are not supported. | Yes |
| `type` | ClickHouse data type: [UInt8](../../../sql-reference/data-types/int-uint.md), [UInt16](../../../sql-reference/data-types/int-uint.md), [UInt32](../../../sql-reference/data-types/int-uint.md), [UInt64](../../../sql-reference/data-types/int-uint.md), [Int8](../../../sql-reference/data-types/int-uint.md), [Int16](../../../sql-reference/data-types/int-uint.md), [Int32](../../../sql-reference/data-types/int-uint.md), [Int64](../../../sql-reference/data-types/int-uint.md), [Float32](../../../sql-reference/data-types/float.md), [Float64](../../../sql-reference/data-types/float.md), [UUID](../../../sql-reference/data-types/uuid.md), [Decimal32](../../../sql-reference/data-types/decimal.md), [Decimal64](../../../sql-reference/data-types/decimal.md), [Decimal128](../../../sql-reference/data-types/decimal.md), [Decimal256](../../../sql-reference/data-types/decimal.md), [String](../../../sql-reference/data-types/string.md).<br/>ClickHouse tries to cast value from dictionary to the specified data type. For example, for MySQL, the field might be `TEXT`, `VARCHAR`, or `BLOB` in the MySQL source table, but it can be uploaded as `String` in ClickHouse.<br/>[Nullable](../../../sql-reference/data-types/nullable.md) is currently supported for [Flat](external-dicts-dict-layout.md#flat), [Hashed](external-dicts-dict-layout.md#dicts-external_dicts_dict_layout-hashed), [ComplexKeyHashed](external-dicts-dict-layout.md#complex-key-hashed), [Direct](external-dicts-dict-layout.md#direct), [ComplexKeyDirect](external-dicts-dict-layout.md#complex-key-direct), [RangeHashed](external-dicts-dict-layout.md#range-hashed), [Polygon](external-dicts-dict-polygon.md), [Cache](external-dicts-dict-layout.md#cache), [ComplexKeyCache](external-dicts-dict-layout.md#complex-key-cache), [SSDCache](external-dicts-dict-layout.md#ssd-cache), [SSDComplexKeyCache](external-dicts-dict-layout.md#complex-key-ssd-cache) dictionaries. In [IPTrie](external-dicts-dict-layout.md#ip-trie) dictionaries `Nullable` types are not supported. | Yes |
| `null_value` | Default value for a non-existing element.<br/>In the example, it is an empty string. [NULL](../../syntax.md#null-literal) value can be used only for the `Nullable` types (see the previous line with types description). | Yes |
| `expression` | [Expression](../../../sql-reference/syntax.md#syntax-expressions) that ClickHouse executes on the value.<br/>The expression can be a column name in the remote SQL database. Thus, you can use it to create an alias for the remote column.<br/><br/>Default value: no expression. | No |
| <a name="hierarchical-dict-attr"></a> `hierarchical` | If `true`, the attribute contains the value of a parent key for the current key. See [Hierarchical Dictionaries](../../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-hierarchical.md).<br/><br/>Default value: `false`. | No |
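For reference, the same attribute properties can also be expressed with DDL when creating a dictionary (a rough sketch; the dictionary name, source, layout, and attribute names below are assumptions, not part of this page):

``` sql
CREATE DICTIONARY country_dict
(
    id UInt64,
    name String DEFAULT '',                        -- `type` and `null_value`
    code String DEFAULT '' EXPRESSION upper(code), -- `expression`
    parent_id UInt64 DEFAULT 0 HIERARCHICAL        -- `hierarchical`
)
PRIMARY KEY id
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' DB 'default' TABLE 'countries'))
LAYOUT(HASHED())
LIFETIME(MIN 300 MAX 600);
```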


@ -265,7 +265,7 @@ Result:
```
!!! attention "Attention"
The return type `toStartOf*` functions described below is `Date` or `DateTime`. Though these functions can take `DateTime64` as an argument, passing them a `DateTime64` that is out of normal range (years 1970 - 2105) will give incorrect result.
The return type of the `toStartOf*` functions described below is `Date` or `DateTime`. Though these functions can take `DateTime64` as an argument, passing them a `DateTime64` that is out of the normal range (years 1925 - 2283) will give an incorrect result.
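A minimal illustration of this caveat (the specific function and the out-of-range value below are just an example):

``` sql
SELECT toStartOfMonth(toDateTime64('1900-01-01 12:00:00', 3, 'UTC'));
-- 1900 is outside the supported 1925 - 2283 range, so the result may be incorrect.
```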
## toStartOfYear {#tostartofyear}


@ -6,7 +6,7 @@ toc_title: RENAME
# RENAME Statement {#misc_operations-rename}
## RENAME DATABASE {#misc_operations-rename_database}
Renames database, support only for Atomic database engine
Renames a database. It is supported only for the Atomic database engine.
```
RENAME DATABASE atomic_database1 TO atomic_database2 [ON CLUSTER cluster]


@ -10,6 +10,8 @@ The list of available `SYSTEM` statements:
- [RELOAD EMBEDDED DICTIONARIES](#query_language-system-reload-emdedded-dictionaries)
- [RELOAD DICTIONARIES](#query_language-system-reload-dictionaries)
- [RELOAD DICTIONARY](#query_language-system-reload-dictionary)
- [RELOAD MODELS](#query_language-system-reload-models)
- [RELOAD MODEL](#query_language-system-reload-model)
- [DROP DNS CACHE](#query_language-system-drop-dns-cache)
- [DROP MARK CACHE](#query_language-system-drop-mark-cache)
- [DROP UNCOMPRESSED CACHE](#query_language-system-drop-uncompressed-cache)
@ -60,6 +62,26 @@ The status of the dictionary can be checked by querying the `system.dictionaries
SELECT name, status FROM system.dictionaries;
```
## RELOAD MODELS {#query_language-system-reload-models}
If the configuration was updated, reloads all [CatBoost](../../guides/apply-catboost-model.md#applying-catboost-model-in-clickhouse) models without restarting the server.
**Syntax**
```sql
SYSTEM RELOAD MODELS
```
## RELOAD MODEL {#query_language-system-reload-model}
If the configuration was updated, completely reloads the CatBoost model `model_name` without restarting the server.
**Syntax**
```sql
SYSTEM RELOAD MODEL <model_name>
```
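For example, to reload a model configured under the (hypothetical) name `my_model`:

```sql
SYSTEM RELOAD MODEL my_model
```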
## DROP DNS CACHE {#query_language-system-drop-dns-cache}
Resets ClickHouse's internal DNS cache. Sometimes (for old ClickHouse versions) it is necessary to use this command when changing the infrastructure (changing the IP address of another ClickHouse server or of a server used by dictionaries).
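For example, after a server referenced by a dictionary changes its IP address, you might run:

```sql
SYSTEM DROP DNS CACHE
```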


@ -114,14 +114,14 @@ FROM s3('https://storage.yandexcloud.net/my-test-bucket-768/big_prefix/file-{000
Insert data into file `test-data.csv.gz`:
``` sql
INSERT INTO s3('https://storage.yandexcloud.net/my-test-bucket-768/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip')
INSERT INTO FUNCTION s3('https://storage.yandexcloud.net/my-test-bucket-768/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip')
VALUES ('test-data', 1), ('test-data-2', 2);
```
Insert data into file `test-data.csv.gz` from existing table:
``` sql
INSERT INTO s3('https://storage.yandexcloud.net/my-test-bucket-768/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip')
INSERT INTO FUNCTION s3('https://storage.yandexcloud.net/my-test-bucket-768/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip')
SELECT name, value FROM existing_table;
```


@ -6,9 +6,7 @@ toc_title: Window Functions
# [experimental] Window Functions
!!! warning "Warning"
This is an experimental feature that is currently in development and is not ready
for general use. It will change in unpredictable backwards-incompatible ways in
the future releases. Set `allow_experimental_window_functions = 1` to enable it.
This is an experimental feature that is currently in development and is not ready for general use. It will change in unpredictable backwards-incompatible ways in future releases. Set `allow_experimental_window_functions = 1` to enable it.
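A minimal sketch of enabling and using the feature (the query below is an illustration, not taken from this page):

``` sql
SET allow_experimental_window_functions = 1;

SELECT number, sum(number) OVER (ORDER BY number) AS running_total
FROM numbers(10);
```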
ClickHouse supports the standard grammar for defining windows and window functions. The following features are currently supported:


@ -18,6 +18,8 @@ toc_title: "Применение модели CatBoost в ClickHouse"
Подробнее об обучении моделей в CatBoost, см. [Обучение и применение моделей](https://catboost.ai/docs/features/training.html#training).
Вы можете перегрузить модели CatBoost, если их конфигурация была обновлена, без перезагрузки сервера. Для этого используйте системные запросы [RELOAD MODEL](../sql-reference/statements/system.md#query_language-system-reload-model) и [RELOAD MODELS](../sql-reference/statements/system.md#query_language-system-reload-models).
## Перед началом работы {#prerequisites}
Если у вас еще нет [Docker](https://docs.docker.com/install/), установите его.


@ -0,0 +1,107 @@
---
toc_priority: 146
toc_title: intervalLengthSum
---
# intervalLengthSum {#agg_function-intervallengthsum}
Вычисляет длину объединения интервалов (отрезков на числовой оси).
**Синтаксис**
``` sql
intervalLengthSum(start, end)
```
**Аргументы**
- `start` — начальное значение интервала. [Int32](../../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64), [Int64](../../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64), [UInt32](../../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64), [UInt64](../../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64), [Float32](../../../sql-reference/data-types/float.md#float32-float64), [Float64](../../../sql-reference/data-types/float.md#float32-float64), [DateTime](../../../sql-reference/data-types/datetime.md#data_type-datetime) или [Date](../../../sql-reference/data-types/date.md#data_type-date).
- `end` — конечное значение интервала. [Int32](../../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64), [Int64](../../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64), [UInt32](../../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64), [UInt64](../../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64), [Float32](../../../sql-reference/data-types/float.md#float32-float64), [Float64](../../../sql-reference/data-types/float.md#float32-float64), [DateTime](../../../sql-reference/data-types/datetime.md#data_type-datetime) или [Date](../../../sql-reference/data-types/date.md#data_type-date).
!!! info "Примечание"
Аргументы должны быть одного типа. В противном случае ClickHouse сгенерирует исключение.
**Возвращаемое значение**
- Длина объединения всех интервалов (отрезков на числовой оси). В зависимости от типа аргумента возвращаемое значение может быть типа [UInt64](../../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64) или [Float64](../../../sql-reference/data-types/float.md#float32-float64).
**Примеры**
1. Входная таблица:
``` text
┌─id─┬─start─┬─end─┐
│ a │ 1.1 │ 2.9 │
│ a │ 2.5 │ 3.2 │
│ a │ 4 │ 5 │
└────┴───────┴─────┘
```
В этом примере используются аргументы типа Float32. Функция возвращает значение типа Float64.
Результатом функции будет сумма длин интервалов `[1.1, 3.2]` (объединение `[1.1, 2.9]` и `[2.5, 3.2]`) и `[4, 5]`
Запрос:
``` sql
SELECT id, intervalLengthSum(start, end), toTypeName(intervalLengthSum(start, end)) FROM fl_interval GROUP BY id ORDER BY id;
```
Результат:
``` text
┌─id─┬─intervalLengthSum(start, end)─┬─toTypeName(intervalLengthSum(start, end))─┐
│ a │ 3.1 │ Float64 │
└────┴───────────────────────────────┴───────────────────────────────────────────┘
```
2. Входная таблица:
``` text
┌─id─┬───────────────start─┬─────────────────end─┐
│ a │ 2020-01-01 01:12:30 │ 2020-01-01 02:10:10 │
│ a │ 2020-01-01 02:05:30 │ 2020-01-01 02:50:31 │
│ a │ 2020-01-01 03:11:22 │ 2020-01-01 03:23:31 │
└────┴─────────────────────┴─────────────────────┘
```
В этом примере используются аргументы типа DateTime. Функция возвращает значение, выраженное в секундах.
Запрос:
``` sql
SELECT id, intervalLengthSum(start, end), toTypeName(intervalLengthSum(start, end)) FROM dt_interval GROUP BY id ORDER BY id;
```
Результат:
``` text
┌─id─┬─intervalLengthSum(start, end)─┬─toTypeName(intervalLengthSum(start, end))─┐
│ a │ 6610 │ UInt64 │
└────┴───────────────────────────────┴───────────────────────────────────────────┘
```
3. Входная таблица:
``` text
┌─id─┬──────start─┬────────end─┐
│ a │ 2020-01-01 │ 2020-01-04 │
│ a │ 2020-01-12 │ 2020-01-18 │
└────┴────────────┴────────────┘
```
В этом примере используются аргументы типа Date. Функция возвращает значение, выраженное в днях.
Запрос:
``` sql
SELECT id, intervalLengthSum(start, end), toTypeName(intervalLengthSum(start, end)) FROM date_interval GROUP BY id ORDER BY id;
```
Результат:
``` text
┌─id─┬─intervalLengthSum(start, end)─┬─toTypeName(intervalLengthSum(start, end))─┐
│ a │ 9 │ UInt64 │
└────┴───────────────────────────────┴───────────────────────────────────────────┘
```


@ -159,7 +159,7 @@ CREATE DICTIONARY somename (
| Тег | Описание | Обязательный |
|------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------|
| `name` | Имя столбца. | Да |
| `type` | Тип данных ClickHouse.<br/>ClickHouse пытается привести значение из словаря к заданному типу данных. Например, в случае MySQL, в таблице-источнике поле может быть `TEXT`, `VARCHAR`, `BLOB`, но загружено может быть как `String`. <br/>[Nullable](../../../sql-reference/data-types/nullable.md) в настоящее время поддерживается для словарей [Flat](external-dicts-dict-layout.md#flat), [Hashed](external-dicts-dict-layout.md#dicts-external_dicts_dict_layout-hashed), [ComplexKeyHashed](external-dicts-dict-layout.md#complex-key-hashed), [Direct](external-dicts-dict-layout.md#direct), [ComplexKeyDirect](external-dicts-dict-layout.md#complex-key-direct), [RangeHashed](external-dicts-dict-layout.md#range-hashed), [Polygon](external-dicts-dict-polygon.md), [Cache](external-dicts-dict-layout.md#cache), [ComplexKeyCache](external-dicts-dict-layout.md#complex-key-cache), [SSDCache](external-dicts-dict-layout.md#ssd-cache), [SSDComplexKeyCache](external-dicts-dict-layout.md#complex-key-ssd-cache). Для словарей [IPTrie](external-dicts-dict-layout.md#ip-trie) `Nullable`-типы не поддерживаются. | Да |
| `type` | Тип данных ClickHouse: [UInt8](../../../sql-reference/data-types/int-uint.md), [UInt16](../../../sql-reference/data-types/int-uint.md), [UInt32](../../../sql-reference/data-types/int-uint.md), [UInt64](../../../sql-reference/data-types/int-uint.md), [Int8](../../../sql-reference/data-types/int-uint.md), [Int16](../../../sql-reference/data-types/int-uint.md), [Int32](../../../sql-reference/data-types/int-uint.md), [Int64](../../../sql-reference/data-types/int-uint.md), [Float32](../../../sql-reference/data-types/float.md), [Float64](../../../sql-reference/data-types/float.md), [UUID](../../../sql-reference/data-types/uuid.md), [Decimal32](../../../sql-reference/data-types/decimal.md), [Decimal64](../../../sql-reference/data-types/decimal.md), [Decimal128](../../../sql-reference/data-types/decimal.md), [Decimal256](../../../sql-reference/data-types/decimal.md), [String](../../../sql-reference/data-types/string.md).<br/>ClickHouse пытается привести значение из словаря к заданному типу данных. Например, в случае MySQL, в таблице-источнике поле может быть `TEXT`, `VARCHAR`, `BLOB`, но загружено может быть как `String`. <br/>[Nullable](../../../sql-reference/data-types/nullable.md) в настоящее время поддерживается для словарей [Flat](external-dicts-dict-layout.md#flat), [Hashed](external-dicts-dict-layout.md#dicts-external_dicts_dict_layout-hashed), [ComplexKeyHashed](external-dicts-dict-layout.md#complex-key-hashed), [Direct](external-dicts-dict-layout.md#direct), [ComplexKeyDirect](external-dicts-dict-layout.md#complex-key-direct), [RangeHashed](external-dicts-dict-layout.md#range-hashed), [Polygon](external-dicts-dict-polygon.md), [Cache](external-dicts-dict-layout.md#cache), [ComplexKeyCache](external-dicts-dict-layout.md#complex-key-cache), [SSDCache](external-dicts-dict-layout.md#ssd-cache), [SSDComplexKeyCache](external-dicts-dict-layout.md#complex-key-ssd-cache). Для словарей [IPTrie](external-dicts-dict-layout.md#ip-trie) `Nullable`-типы не поддерживаются. | Да |
| `null_value` | Значение по умолчанию для несуществующего элемента.<br/>В примере это пустая строка. Значение [NULL](../../syntax.md#null-literal) можно указывать только для типов `Nullable` (см. предыдущую строку с описанием типов). | Да |
| `expression` | [Выражение](../../syntax.md#syntax-expressions), которое ClickHouse выполняет со значением.<br/>Выражением может быть имя столбца в удаленной SQL базе. Таким образом, вы можете использовать его для создания псевдонима удаленного столбца.<br/><br/>Значение по умолчанию: нет выражения. | Нет |
| <a name="hierarchical-dict-attr"></a> `hierarchical` | Если `true`, то атрибут содержит ключ предка для текущего элемента. Смотрите [Иерархические словари](external-dicts-dict-hierarchical.md).<br/><br/>Значение по умолчанию: `false`. | Нет |


@ -264,6 +264,9 @@ SELECT toUnixTimestamp('2017-11-05 08:07:47', 'Asia/Tokyo') AS unix_timestamp;
└────────────────┘
```
!!! attention "Attention"
`Date` или `DateTime` это возвращаемый тип функций `toStartOf*`, который описан ниже. Несмотря на то, что эти функции могут принимать `DateTime64` в качестве аргумента, если переданное значение типа `DateTime64` выходит за пределы нормального диапазона (с 1925 по 2283 год), то это даст неверный результат.
## toStartOfYear {#tostartofyear}
Округляет дату или дату-с-временем вниз до первого дня года.


@ -6,7 +6,7 @@ toc_title: RENAME
# RENAME Statement {#misc_operations-rename}
## RENAME DATABASE {#misc_operations-rename_database}
Переименование базы данных
Переименовывает базу данных, поддерживается только для движка базы данных Atomic.
```
RENAME DATABASE atomic_database1 TO atomic_database2 [ON CLUSTER cluster]


@ -8,6 +8,8 @@ toc_title: SYSTEM
- [RELOAD EMBEDDED DICTIONARIES](#query_language-system-reload-emdedded-dictionaries)
- [RELOAD DICTIONARIES](#query_language-system-reload-dictionaries)
- [RELOAD DICTIONARY](#query_language-system-reload-dictionary)
- [RELOAD MODELS](#query_language-system-reload-models)
- [RELOAD MODEL](#query_language-system-reload-model)
- [DROP DNS CACHE](#query_language-system-drop-dns-cache)
- [DROP MARK CACHE](#query_language-system-drop-mark-cache)
- [DROP UNCOMPRESSED CACHE](#query_language-system-drop-uncompressed-cache)
@ -37,7 +39,7 @@ toc_title: SYSTEM
- [RESTART REPLICAS](#query_language-system-restart-replicas)
## RELOAD EMBEDDED DICTIONARIES] {#query_language-system-reload-emdedded-dictionaries}
Перегружет все [Встроенные словари](../dictionaries/internal-dicts.md).
Перегружает все [Встроенные словари](../dictionaries/internal-dicts.md).
По умолчанию встроенные словари выключены.
Всегда возвращает `Ok.`, вне зависимости от результата обновления встроенных словарей.
@ -57,6 +59,26 @@ toc_title: SYSTEM
SELECT name, status FROM system.dictionaries;
```
## RELOAD MODELS {#query_language-system-reload-models}
Перегружает все модели [CatBoost](../../guides/apply-catboost-model.md#applying-catboost-model-in-clickhouse), если их конфигурация была обновлена, без перезагрузки сервера.
**Синтаксис**
```sql
SYSTEM RELOAD MODELS
```
## RELOAD MODEL {#query_language-system-reload-model}
Полностью перегружает модель [CatBoost](../../guides/apply-catboost-model.md#applying-catboost-model-in-clickhouse) `model_name`, если ее конфигурация была обновлена, без перезагрузки сервера.
**Синтаксис**
```sql
SYSTEM RELOAD MODEL <model_name>
```
## DROP DNS CACHE {#query_language-system-drop-dns-cache}
Сбрасывает внутренний DNS кеш ClickHouse. Иногда (для старых версий ClickHouse) необходимо использовать эту команду при изменении инфраструктуры (смене IP адреса у другого ClickHouse сервера или сервера, используемого словарями).

File diff suppressed because it is too large.


@ -18,12 +18,13 @@ public:
ClusterCopier(const String & task_path_,
const String & host_id_,
const String & proxy_database_name_,
ContextMutablePtr context_)
ContextMutablePtr context_,
Poco::Logger * log_)
: WithMutableContext(context_),
task_zookeeper_path(task_path_),
host_id(host_id_),
working_database_name(proxy_database_name_),
log(&Poco::Logger::get("ClusterCopier")) {}
log(log_) {}
void init();
@ -117,14 +118,14 @@ protected:
TaskStatus tryMoveAllPiecesToDestinationTable(const TaskTable & task_table, const String & partition_name);
/// Removes MATERIALIZED, ALIAS and TTL columns from the create table query
static ASTPtr removeAliasColumnsFromCreateQuery(const ASTPtr & query_ast);
static ASTPtr removeAliasMaterializedAndTTLColumnsFromCreateQuery(const ASTPtr & query_ast, bool allow_to_copy_alias_and_materialized_columns);
bool tryDropPartitionPiece(ShardPartition & task_partition, size_t current_piece_number,
const zkutil::ZooKeeperPtr & zookeeper, const CleanStateClock & clean_state_clock);
static constexpr UInt64 max_table_tries = 3;
static constexpr UInt64 max_shard_partition_tries = 3;
static constexpr UInt64 max_shard_partition_piece_tries_for_alter = 3;
static constexpr UInt64 max_shard_partition_piece_tries_for_alter = 10;
bool tryProcessTable(const ConnectionTimeouts & timeouts, TaskTable & task_table);
@ -189,9 +190,7 @@ protected:
const ClusterPtr & cluster,
const String & query,
const Settings & current_settings,
PoolMode pool_mode = PoolMode::GET_ALL,
ClusterExecutionMode execution_mode = ClusterExecutionMode::ON_EACH_SHARD,
UInt64 max_successful_executions_per_shard = 0) const;
ClusterExecutionMode execution_mode = ClusterExecutionMode::ON_EACH_SHARD) const;
private:
String task_zookeeper_path;
@ -208,7 +207,6 @@ private:
ConfigurationPtr task_cluster_initial_config;
ConfigurationPtr task_cluster_current_config;
Coordination::Stat task_description_current_stat{};
std::unique_ptr<TaskCluster> task_cluster;


@ -22,8 +22,9 @@ void ClusterCopierApp::initialize(Poco::Util::Application & self)
config_xml_path = config().getString("config-file");
task_path = config().getString("task-path");
log_level = config().getString("log-level", "trace");
log_level = config().getString("log-level", "info");
is_safe_mode = config().has("safe-mode");
is_status_mode = config().has("status");
if (config().has("copy-fault-probability"))
copy_fault_probability = std::max(std::min(config().getDouble("copy-fault-probability"), 1.0), 0.0);
if (config().has("move-fault-probability"))
@ -97,6 +98,7 @@ void ClusterCopierApp::defineOptions(Poco::Util::OptionSet & options)
.argument("base-dir").binding("base-dir"));
options.addOption(Poco::Util::Option("experimental-use-sample-offset", "", "Use SAMPLE OFFSET query instead of cityHash64(PRIMARY KEY) % n == k")
.argument("experimental-use-sample-offset").binding("experimental-use-sample-offset"));
options.addOption(Poco::Util::Option("status", "", "Get for status for current execution").binding("status"));
using Me = std::decay_t<decltype(*this)>;
options.addOption(Poco::Util::Option("help", "", "produce this help message").binding("help")
@ -106,6 +108,25 @@ void ClusterCopierApp::defineOptions(Poco::Util::OptionSet & options)
void ClusterCopierApp::mainImpl()
{
/// Status command
{
if (is_status_mode)
{
SharedContextHolder shared_context = Context::createShared();
auto context = Context::createGlobal(shared_context.get());
context->makeGlobalContext();
SCOPE_EXIT_SAFE(context->shutdown());
auto zookeeper = context->getZooKeeper();
auto status_json = zookeeper->get(task_path + "/status");
LOG_INFO(&logger(), "{}", status_json);
std::cout << status_json << std::endl;
context->resetZooKeeper();
return;
}
}
StatusFile status_file(process_path + "/status", StatusFile::write_full_info);
ThreadStatus thread_status;
@ -136,7 +157,7 @@ void ClusterCopierApp::mainImpl()
/// Initialize query scope just in case.
CurrentThread::QueryScope query_scope(context);
auto copier = std::make_unique<ClusterCopier>(task_path, host_id, default_database, context);
auto copier = std::make_unique<ClusterCopier>(task_path, host_id, default_database, context, log);
copier->setSafeMode(is_safe_mode);
copier->setCopyFaultProbability(copy_fault_probability);
copier->setMoveFaultProbability(move_fault_probability);


@ -76,8 +76,9 @@ private:
std::string config_xml_path;
std::string task_path;
std::string log_level = "trace";
std::string log_level = "info";
bool is_safe_mode = false;
bool is_status_mode = false;
double copy_fault_probability = 0.0;
double move_fault_probability = 0.0;
bool is_help = false;


@ -0,0 +1,65 @@
#pragma once
#include <Poco/JSON/Parser.h>
#include <Poco/JSON/JSON.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Stringifier.h>
#include <unordered_map>
#include <memory>
#include <string>
#include <iostream>
namespace DB
{
class StatusAccumulator
{
public:
struct TableStatus
{
size_t all_partitions_count;
size_t processed_partitions_count;
};
using Map = std::unordered_map<std::string, TableStatus>;
using MapPtr = std::shared_ptr<Map>;
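/// Parses a JSON object mapping table names to serialized per-table statuses
/// and sums the partition counters into a single map keyed by table name.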
static MapPtr fromJSON(std::string state_json)
{
Poco::JSON::Parser parser;
auto state = parser.parse(state_json).extract<Poco::JSON::Object::Ptr>();
MapPtr result_ptr = std::make_shared<Map>();
for (const auto & table_name : state->getNames())
{
auto table_status_json = state->getValue<std::string>(table_name);
auto table_status = parser.parse(table_status_json).extract<Poco::JSON::Object::Ptr>();
/// Map entry will be created if it is absent
auto & map_table_status = (*result_ptr)[table_name];
map_table_status.all_partitions_count += table_status->getValue<size_t>("all_partitions_count");
map_table_status.processed_partitions_count += table_status->getValue<size_t>("processed_partitions_count");
}
return result_ptr;
}
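/// Serializes the per-table statuses back into a JSON string
/// of the form {table: {all_partitions_count, processed_partitions_count}}.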
static std::string serializeToJSON(MapPtr statuses)
{
Poco::JSON::Object result_json;
for (const auto & [table_name, table_status] : *statuses)
{
Poco::JSON::Object status_json;
status_json.set("all_partitions_count", table_status.all_partitions_count);
status_json.set("processed_partitions_count", table_status.processed_partitions_count);
result_json.set(table_name, status_json);
}
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(result_json, oss);
auto result = oss.str();
return result;
}
};
}


@ -77,6 +77,8 @@ inline void DB::TaskCluster::reloadSettings(const Poco::Util::AbstractConfigurat
if (config.has(prefix + "settings"))
settings_common.loadSettingsFromConfig(prefix + "settings", config);
settings_common.prefer_localhost_replica = 0;
settings_pull = settings_common;
if (config.has(prefix + "settings_pull"))
settings_pull.loadSettingsFromConfig(prefix + "settings_pull", config);
@ -92,11 +94,15 @@ inline void DB::TaskCluster::reloadSettings(const Poco::Util::AbstractConfigurat
/// Override important settings
settings_pull.readonly = 1;
settings_push.insert_distributed_sync = 1;
settings_pull.prefer_localhost_replica = false;
settings_push.insert_distributed_sync = true;
settings_push.prefer_localhost_replica = false;
set_default_value(settings_pull.load_balancing, LoadBalancing::NEAREST_HOSTNAME);
set_default_value(settings_pull.max_threads, 1);
set_default_value(settings_pull.max_block_size, 8192UL);
set_default_value(settings_pull.preferred_block_size_bytes, 0);
set_default_value(settings_push.insert_distributed_timeout, 0);
set_default_value(settings_push.replication_alter_partitions_sync, 2);
}


@ -36,27 +36,33 @@ struct TaskTable
String getPartitionAttachIsDonePath(const String & partition_name) const;
String getPartitionPiecePath(const String & partition_name, const size_t piece_number) const;
String getPartitionPiecePath(const String & partition_name, size_t piece_number) const;
String getCertainPartitionIsDirtyPath(const String & partition_name) const;
String getCertainPartitionPieceIsDirtyPath(const String & partition_name, const size_t piece_number) const;
String getCertainPartitionPieceIsDirtyPath(const String & partition_name, size_t piece_number) const;
String getCertainPartitionIsCleanedPath(const String & partition_name) const;
String getCertainPartitionPieceIsCleanedPath(const String & partition_name, const size_t piece_number) const;
String getCertainPartitionPieceIsCleanedPath(const String & partition_name, size_t piece_number) const;
String getCertainPartitionTaskStatusPath(const String & partition_name) const;
String getCertainPartitionPieceTaskStatusPath(const String & partition_name, const size_t piece_number) const;
String getCertainPartitionPieceTaskStatusPath(const String & partition_name, size_t piece_number) const;
bool isReplicatedTable() const { return is_replicated_table; }
/// These nodes are used for check-status option
String getStatusAllPartitionCount() const;
String getStatusProcessedPartitionsCount() const;
/// Partitions will be split into number-of-splits pieces.
/// Each piece will be copied independently. (3 by default)
size_t number_of_splits;
bool allow_to_copy_alias_and_materialized_columns{false};
bool allow_to_drop_target_partitions{false};
String name_in_config;
/// Used as task ID
@ -83,7 +89,7 @@ struct TaskTable
String engine_push_zk_path;
bool is_replicated_table;
ASTPtr rewriteReplicatedCreateQueryToPlain();
ASTPtr rewriteReplicatedCreateQueryToPlain() const;
/*
* A Distributed table definition used to split data
@ -181,6 +187,7 @@ struct TaskShard
/// Last CREATE TABLE query of the table of the shard
ASTPtr current_pull_table_create_query;
ASTPtr current_push_table_create_query;
/// Internal distributed tables
DatabaseAndTableName table_read_shard;
@ -242,6 +249,16 @@ inline String TaskTable::getCertainPartitionPieceTaskStatusPath(const String & p
return getPartitionPiecePath(partition_name, piece_number) + "/shards";
}
inline String TaskTable::getStatusAllPartitionCount() const
{
return task_cluster.task_zookeeper_path + "/status/all_partitions_count";
}
inline String TaskTable::getStatusProcessedPartitionsCount() const
{
return task_cluster.task_zookeeper_path + "/status/processed_partitions_count";
}
inline TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfiguration & config,
const String & prefix_, const String & table_key)
: task_cluster(parent)
@ -250,7 +267,10 @@ inline TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConf
name_in_config = table_key;
number_of_splits = config.getUInt64(table_prefix + "number_of_splits", 10);
number_of_splits = config.getUInt64(table_prefix + "number_of_splits", 3);
allow_to_copy_alias_and_materialized_columns = config.getBool(table_prefix + "allow_to_copy_alias_and_materialized_columns", false);
allow_to_drop_target_partitions = config.getBool(table_prefix + "allow_to_drop_target_partitions", false);
cluster_pull_name = config.getString(table_prefix + "cluster_pull");
cluster_push_name = config.getString(table_prefix + "cluster_push");
@ -343,7 +363,7 @@ inline void TaskTable::initShards(RandomEngine && random_engine)
std::uniform_int_distribution<UInt8> get_urand(0, std::numeric_limits<UInt8>::max());
// Compute the priority
for (auto & shard_info : cluster_pull->getShardsInfo())
for (const auto & shard_info : cluster_pull->getShardsInfo())
{
TaskShardPtr task_shard = std::make_shared<TaskShard>(*this, shard_info);
const auto & replicas = cluster_pull->getShardsAddresses().at(task_shard->indexInCluster());
@ -369,7 +389,7 @@ inline void TaskTable::initShards(RandomEngine && random_engine)
local_shards.assign(all_shards.begin(), it_first_remote);
}
inline ASTPtr TaskTable::rewriteReplicatedCreateQueryToPlain()
inline ASTPtr TaskTable::rewriteReplicatedCreateQueryToPlain() const
{
ASTPtr prev_engine_push_ast = engine_push_ast->clone();
@ -383,10 +403,16 @@ inline ASTPtr TaskTable::rewriteReplicatedCreateQueryToPlain()
{
auto & replicated_table_arguments = new_engine_ast.arguments->children;
/// In some cases of Atomic database engine usage ReplicatedMergeTree tables
/// could be created without arguments.
if (!replicated_table_arguments.empty())
{
/// Delete first two arguments of Replicated...MergeTree() table.
replicated_table_arguments.erase(replicated_table_arguments.begin());
replicated_table_arguments.erase(replicated_table_arguments.begin());
}
}
return new_storage_ast.clone();
}
@ -400,7 +426,7 @@ inline String DB::TaskShard::getDescription() const
inline String DB::TaskShard::getHostNameExample() const
{
auto & replicas = task_table.cluster_pull->getShardsAddresses().at(indexInCluster());
const auto & replicas = task_table.cluster_pull->getShardsAddresses().at(indexInCluster());
return replicas.at(0).readableString();
}


@ -19,6 +19,7 @@
#include <Common/StringUtils/StringUtils.h>
#include <Common/getHashOfLoadedBinary.h>
#include <Common/IO.h>
#include <common/phdr_cache.h>
#include <ext/scope_guard.h>
@ -172,11 +173,11 @@ enum class InstructionFail
AVX512 = 8
};
std::pair<const char *, size_t> instructionFailToString(InstructionFail fail)
auto instructionFailToString(InstructionFail fail)
{
switch (fail)
{
#define ret(x) return std::make_pair(x, ARRAY_SIZE(x) - 1)
#define ret(x) return std::make_tuple(STDERR_FILENO, x, ARRAY_SIZE(x) - 1)
case InstructionFail::NONE:
ret("NONE");
case InstructionFail::SSE3:
@ -260,28 +261,12 @@ void checkRequiredInstructionsImpl(volatile InstructionFail & fail)
fail = InstructionFail::NONE;
}
/// This function is safe to use in static initializers.
void writeErrorLen(const char * data, size_t size)
{
while (size != 0)
{
ssize_t res = ::write(STDERR_FILENO, data, size);
if ((-1 == res || 0 == res) && errno != EINTR)
_Exit(1);
if (res > 0)
{
data += res;
size -= res;
}
}
}
/// Macro to avoid using strlen(), since it may fail if SSE is not supported.
#define writeError(data) do \
{ \
static_assert(__builtin_constant_p(data)); \
writeErrorLen(data, ARRAY_SIZE(data) - 1); \
if (!writeRetry(STDERR_FILENO, data, ARRAY_SIZE(data) - 1)) \
_Exit(1); \
} while (false)
/// Check SSE and others instructions availability. Calls exit on fail.
@ -310,7 +295,8 @@ void checkRequiredInstructions()
if (sigsetjmp(jmpbuf, 1))
{
writeError("Instruction check fail. The CPU does not support ");
std::apply(writeErrorLen, instructionFailToString(fail));
if (!std::apply(writeRetry, instructionFailToString(fail)))
_Exit(1);
writeError(" instruction set.\n");
_Exit(1);
}


@ -705,6 +705,8 @@
"yandex.ru" -> "yandex.ru:443", "yandex.ru:80" etc. is allowed, but "yandex.ru:80" -> only "yandex.ru:80" is allowed.
If the host is specified as IP address, it is checked as specified in URL. Example: "[2a02:6b8:a::a]".
If there are redirects and support for redirects is enabled, every redirect (the Location field) is checked.
Host should be specified using the host xml tag:
<host>yandex.ru</host>
-->
<!-- Regular expression can be specified. RE2 engine is used for regexps.


@ -1,5 +1,5 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionSegmentLengthSum.h>
#include <AggregateFunctions/AggregateFunctionIntervalLengthSum.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include <AggregateFunctions/Helpers.h>
#include <DataTypes/DataTypeDate.h>
@ -22,7 +22,7 @@ namespace
{
template <template <typename> class Data>
AggregateFunctionPtr
createAggregateFunctionSegmentLengthSum(const std::string & name, const DataTypes & arguments, const Array &, const Settings *)
createAggregateFunctionIntervalLengthSum(const std::string & name, const DataTypes & arguments, const Array &, const Settings *)
{
if (arguments.size() != 2)
throw Exception(
@ -32,35 +32,35 @@ namespace
if (WhichDataType{args.begin()[0]}.idx != WhichDataType{args.begin()[1]}.idx)
throw Exception(
"Illegal type " + args.begin()[0]->getName() + " and " + args.begin()[1]->getName() + " of arguments of aggregate function "
+ name + ", there two arguments should have same DataType",
"Illegal types " + args.begin()[0]->getName() + " and " + args.begin()[1]->getName() + " of arguments of aggregate function "
+ name + ", both arguments should have same data type",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
for (const auto & arg : args)
{
if (!isNativeNumber(arg) && !isDate(arg) && !isDateTime(arg) && !isDateTime64(arg))
if (!isNativeNumber(arg) && !isDate(arg) && !isDateTime(arg))
throw Exception(
"Illegal type " + arg->getName() + " of argument of aggregate function " + name
+ ", must be Number, Date, DateTime or DateTime64",
+ ", must be native integral type, Date/DateTime or Float",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
AggregateFunctionPtr res(createWithBasicNumberOrDateOrDateTime<AggregateFunctionSegmentLengthSum, Data>(*arguments[0], arguments));
AggregateFunctionPtr res(createWithBasicNumberOrDateOrDateTime<AggregateFunctionIntervalLengthSum, Data>(*arguments[0], arguments));
if (res)
return res;
throw Exception(
"Illegal type " + arguments.front().get()->getName() + " of first argument of aggregate function " + name
+ ", must be Native Unsigned Number",
"Illegal type " + arguments.front().get()->getName() + " of argument of aggregate function " + name
+ ", must be native integral type, Date/DateTime or Float",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
}
void registerAggregateFunctionSegmentLengthSum(AggregateFunctionFactory & factory)
void registerAggregateFunctionIntervalLengthSum(AggregateFunctionFactory & factory)
{
factory.registerFunction("segmentLengthSum", createAggregateFunctionSegmentLengthSum<AggregateFunctionSegmentLengthSumData>);
factory.registerFunction("intervalLengthSum", createAggregateFunctionIntervalLengthSum<AggregateFunctionIntervalLengthSumData>);
}
}


@ -1,22 +1,39 @@
#pragma once
#include <unordered_set>
#include <AggregateFunctions/AggregateFunctionNull.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypesNumber.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Common/ArenaAllocator.h>
#include <Common/assert_cast.h>
#include <AggregateFunctions/AggregateFunctionNull.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypesNumber.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <unordered_set>
namespace DB
{
template <typename T>
struct AggregateFunctionSegmentLengthSumData
namespace ErrorCodes
{
extern const int TOO_LARGE_ARRAY_SIZE;
}
/**
* Calculates the total length of the union of the intervals (overlapping parts are counted only once). Each interval is a pair of numbers [begin, end].
* Returns UInt64 for integral types (UInt*/Int*, Date/DateTime) and Float64 for Float*.
*
* The implementation simply stores the intervals sorted by their beginning and sums the lengths at the end.
*/
template <typename T>
struct AggregateFunctionIntervalLengthSumData
{
constexpr static size_t MAX_ARRAY_SIZE = 0xFFFFFF;
using Segment = std::pair<T, T>;
using Segments = PODArrayWithStackMemory<Segment, 64>;
@ -24,18 +41,16 @@ struct AggregateFunctionSegmentLengthSumData
Segments segments;
size_t size() const { return segments.size(); }
void add(T start, T end)
void add(T begin, T end)
{
if (sorted && segments.size() > 0)
if (sorted && !segments.empty())
{
sorted = segments.back().first <= start;
sorted = segments.back().first <= begin;
}
segments.emplace_back(start, end);
segments.emplace_back(begin, end);
}
void merge(const AggregateFunctionSegmentLengthSumData & other)
void merge(const AggregateFunctionIntervalLengthSumData & other)
{
if (other.segments.empty())
return;
@ -46,7 +61,9 @@ struct AggregateFunctionSegmentLengthSumData
/// either sort whole container or do so partially merging ranges afterwards
if (!sorted && !other.sorted)
std::stable_sort(std::begin(segments), std::end(segments));
{
std::sort(std::begin(segments), std::end(segments));
}
else
{
const auto begin = std::begin(segments);
@ -54,10 +71,10 @@ struct AggregateFunctionSegmentLengthSumData
const auto end = std::end(segments);
if (!sorted)
std::stable_sort(begin, middle);
std::sort(begin, middle);
if (!other.sorted)
std::stable_sort(middle, end);
std::sort(middle, end);
std::inplace_merge(begin, middle, end);
}
@ -69,7 +86,7 @@ struct AggregateFunctionSegmentLengthSumData
{
if (!sorted)
{
std::stable_sort(std::begin(segments), std::end(segments));
std::sort(std::begin(segments), std::end(segments));
sorted = true;
}
}
@ -93,28 +110,30 @@ struct AggregateFunctionSegmentLengthSumData
size_t size;
readBinary(size, buf);
if (unlikely(size > MAX_ARRAY_SIZE))
throw Exception("Too large array size", ErrorCodes::TOO_LARGE_ARRAY_SIZE);
segments.clear();
segments.reserve(size);
T start, end;
Segment segment;
for (size_t i = 0; i < size; ++i)
{
readBinary(start, buf);
readBinary(end, buf);
segments.emplace_back(start, end);
readBinary(segment.first, buf);
readBinary(segment.second, buf);
segments.emplace_back(segment);
}
}
};
template <typename T, typename Data>
class AggregateFunctionSegmentLengthSum final : public IAggregateFunctionDataHelper<Data, AggregateFunctionSegmentLengthSum<T, Data>>
class AggregateFunctionIntervalLengthSum final : public IAggregateFunctionDataHelper<Data, AggregateFunctionIntervalLengthSum<T, Data>>
{
private:
template <typename TResult>
TResult getSegmentLengthSum(Data & data) const
TResult getIntervalLengthSum(Data & data) const
{
if (data.size() == 0)
if (data.segments.empty())
return 0;
data.sort();
@ -123,8 +142,9 @@ private:
typename Data::Segment cur_segment = data.segments[0];
for (size_t i = 1; i < data.segments.size(); ++i)
for (size_t i = 1, sz = data.segments.size(); i < sz; ++i)
{
/// If the current interval does not intersect the next one, add its length to the result and move on; otherwise extend the current interval's end.
if (cur_segment.second < data.segments[i].first)
{
res += cur_segment.second - cur_segment.first;
@ -140,10 +160,10 @@ private:
}
public:
String getName() const override { return "segmentLengthSum"; }
String getName() const override { return "intervalLengthSum"; }
explicit AggregateFunctionSegmentLengthSum(const DataTypes & arguments)
: IAggregateFunctionDataHelper<Data, AggregateFunctionSegmentLengthSum<T, Data>>(arguments, {})
explicit AggregateFunctionIntervalLengthSum(const DataTypes & arguments)
: IAggregateFunctionDataHelper<Data, AggregateFunctionIntervalLengthSum<T, Data>>(arguments, {})
{
}
@ -167,9 +187,9 @@ public:
void add(AggregateDataPtr __restrict place, const IColumn ** columns, const size_t row_num, Arena *) const override
{
auto start = assert_cast<const ColumnVector<T> *>(columns[0])->getData()[row_num];
auto begin = assert_cast<const ColumnVector<T> *>(columns[0])->getData()[row_num];
auto end = assert_cast<const ColumnVector<T> *>(columns[1])->getData()[row_num];
this->data(place).add(start, end);
this->data(place).add(begin, end);
}
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override
@ -190,9 +210,9 @@ public:
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
{
if constexpr (std::is_floating_point_v<T>)
assert_cast<ColumnFloat64 &>(to).getData().push_back(getSegmentLengthSum<Float64>(this->data(place)));
assert_cast<ColumnFloat64 &>(to).getData().push_back(getIntervalLengthSum<Float64>(this->data(place)));
else
assert_cast<ColumnUInt64 &>(to).getData().push_back(getSegmentLengthSum<UInt64>(this->data(place)));
assert_cast<ColumnUInt64 &>(to).getData().push_back(getIntervalLengthSum<UInt64>(this->data(place)));
}
};


@ -637,7 +637,7 @@ struct AggregateFunctionAnyLastData : Data
template <typename Data>
struct AggregateFunctionAnyHeavyData : Data
{
size_t counter = 0;
UInt64 counter = 0;
using Self = AggregateFunctionAnyHeavyData;


@ -64,7 +64,7 @@ void registerAggregateFunctionCombinatorDistinct(AggregateFunctionCombinatorFact
void registerWindowFunctions(AggregateFunctionFactory & factory);
void registerAggregateFunctionSegmentLengthSum(AggregateFunctionFactory &);
void registerAggregateFunctionIntervalLengthSum(AggregateFunctionFactory &);
void registerAggregateFunctions()
{
@ -116,7 +116,7 @@ void registerAggregateFunctions()
registerWindowFunctions(factory);
registerAggregateFunctionSegmentLengthSum(factory);
registerAggregateFunctionIntervalLengthSum(factory);
}
{

View File

@ -31,6 +31,7 @@ SRCS(
AggregateFunctionGroupUniqArray.cpp
AggregateFunctionHistogram.cpp
AggregateFunctionIf.cpp
AggregateFunctionIntervalLengthSum.cpp
AggregateFunctionMLMethod.cpp
AggregateFunctionMannWhitney.cpp
AggregateFunctionMax.cpp
@ -43,7 +44,6 @@ SRCS(
AggregateFunctionRankCorrelation.cpp
AggregateFunctionResample.cpp
AggregateFunctionRetention.cpp
AggregateFunctionSegmentLengthSum.cpp
AggregateFunctionSequenceMatch.cpp
AggregateFunctionSequenceNextNode.cpp
AggregateFunctionSimpleLinearRegression.cpp

View File

@ -424,7 +424,7 @@ void Connection::sendQuery(
if (method == "ZSTD")
level = settings->network_zstd_compression_level;
CompressionCodecFactory::instance().validateCodec(method, level, !settings->allow_suspicious_codecs);
CompressionCodecFactory::instance().validateCodec(method, level, !settings->allow_suspicious_codecs, settings->allow_experimental_codecs);
compression_codec = CompressionCodecFactory::instance().get(method, level);
}
else

27
src/Common/IO.cpp Normal file
View File

@ -0,0 +1,27 @@
#include <Common/IO.h>
#include <unistd.h>
#include <errno.h>
#include <cstring>
bool writeRetry(int fd, const char * data, size_t size)
{
if (!size)
size = strlen(data);
while (size != 0)
{
ssize_t res = ::write(fd, data, size);
if ((-1 == res || 0 == res) && errno != EINTR)
return false;
if (res > 0)
{
data += res;
size -= res;
}
}
return true;
}

13
src/Common/IO.h Normal file
View File

@ -0,0 +1,13 @@
#pragma once
#include <cstddef>
/// IO helpers
/// Write loop with EINTR handling.
///
/// This function is safe to use in static initializers.
///
/// @param size - length of @data, or 0 to use strlen()
/// @return true if the write succeeded, otherwise false.
bool writeRetry(int fd, const char * data, size_t size = 0);
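A hypothetical usage sketch for the helper declared above; the function name and the error message are made up for illustration. Passing no size makes writeRetry() fall back to strlen(), and the EINTR retry loop makes it usable from places where higher-level IO machinery is not available yet.

#include <Common/IO.h>
#include <unistd.h>

void reportStartupFailure()
{
    /// The size argument is omitted, so strlen() is applied to the message.
    writeRetry(STDERR_FILENO, "Cannot initialize logging subsystem\n");
}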

View File

@ -14,6 +14,6 @@
#cmakedefine01 USE_SENTRY
#cmakedefine01 USE_GRPC
#cmakedefine01 USE_STATS
#cmakedefine01 CLICKHOUSE_SPLIT_BINARY
#cmakedefine01 USE_DATASKETCHES
#cmakedefine01 USE_YAML_CPP
#cmakedefine01 CLICKHOUSE_SPLIT_BINARY

View File

@ -46,6 +46,7 @@ SRCS(
ExternalLoaderStatus.cpp
FieldVisitors.cpp
FileChecker.cpp
IO.cpp
IPv6ToBinary.cpp
IntervalKind.cpp
JSONBuilder.cpp

View File

@ -1,4 +1,4 @@
#include <Compression/CompressionCodecDelta.h>
#include <Compression/ICompressionCodec.h>
#include <Compression/CompressionInfo.h>
#include <Compression/CompressionFactory.h>
#include <common/unaligned.h>
@ -11,6 +11,29 @@
namespace DB
{
class CompressionCodecDelta : public ICompressionCodec
{
public:
explicit CompressionCodecDelta(UInt8 delta_bytes_size_);
uint8_t getMethodByte() const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override { return uncompressed_size + 2; }
bool isCompression() const override { return false; }
bool isGenericCompression() const override { return false; }
private:
UInt8 delta_bytes_size;
};
namespace ErrorCodes
{
extern const int CANNOT_COMPRESS;

View File

@ -1,32 +0,0 @@
#pragma once
#include <Compression/ICompressionCodec.h>
namespace DB
{
class CompressionCodecDelta : public ICompressionCodec
{
public:
CompressionCodecDelta(UInt8 delta_bytes_size_);
uint8_t getMethodByte() const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override { return uncompressed_size + 2; }
bool isCompression() const override { return false; }
bool isGenericCompression() const override { return false; }
private:
UInt8 delta_bytes_size;
};
}

View File

@ -1,4 +1,4 @@
#include <Compression/CompressionCodecDoubleDelta.h>
#include <Compression/ICompressionCodec.h>
#include <Compression/CompressionInfo.h>
#include <Compression/CompressionFactory.h>
#include <common/unaligned.h>
@ -15,9 +15,126 @@
#include <type_traits>
#include <limits>
namespace DB
{
/** NOTE DoubleDelta is a surprisingly bad name. The only excuse is that it comes from an academic paper.
 * Most people will think that "double delta" is just applying the delta transform twice.
 * But in fact it is something more than applying the delta transform twice.
*/
/** DoubleDelta column codec implementation.
*
* Based on Gorilla paper: http://www.vldb.org/pvldb/vol8/p1816-teller.pdf, which was extended
* to support 64bit types. The drawback is 1 extra bit for 32-byte wide deltas: 5-bit prefix
* instead of 4-bit prefix.
*
* This codec is best used against monotonic integer sequences with constant (or almost constant)
* stride, like event timestamp for some monitoring application.
*
* Given input sequence a: [a0, a1, ... an]:
*
* First, write number of items (sizeof(int32)*8 bits): n
* Then write first item as is (sizeof(a[0])*8 bits): a[0]
* Second item is written as delta (sizeof(a[0])*8 bits): a[1] - a[0]
* Loop over remaining items and calculate double delta:
* double_delta = a[i] - 2 * a[i - 1] + a[i - 2]
* Write it in compact binary form with `BitWriter`
* if double_delta == 0:
* write 1bit: 0
* else if -63 < double_delta < 64:
* write 2 bit prefix: 10
* write sign bit (1 if signed): x
* write 7-1 bits of abs(double_delta - 1): xxxxxx
* else if -255 < double_delta < 256:
* write 3 bit prefix: 110
* write sign bit (1 if signed): x
* write 9-1 bits of abs(double_delta - 1): xxxxxxxx
* else if -2047 < double_delta < 2048:
* write 4 bit prefix: 1110
* write sign bit (1 if signed): x
* write 12-1 bits of abs(double_delta - 1): xxxxxxxxxxx
* else if double_delta fits into 32-bit int:
* write 5 bit prefix: 11110
* write sign bit (1 if signed): x
* write 32-1 bits of abs(double_delta - 1): xxxxxxxxxxx...
* else
* write 5 bit prefix: 11111
* write sign bit (1 if signed): x
* write 64-1 bits of abs(double_delta - 1): xxxxxxxxxxx...
*
* @example sequence of UInt8 values [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] is encoded as (codec header is omitted):
*
* .- 4-byte little-endian sequence length (10 == 0xa)
* | .- 1 byte (sizeof(UInt8) a[0] : 0x01
* | | .- 1 byte of delta: a[1] - a[0] = 2 - 1 = 1 : 0x01
* | | | .- 8 zero bits since double delta for remaining 8 elements was 0 : 0x00
* v_______________v___v___v___
* \x0a\x00\x00\x00\x01\x01\x00
*
* @example sequence of Int16 values [-10, 10, -20, 20, -40, 40] is encoded as:
*
* .- 4-byte little endian sequence length = 6 : 0x00000006
* | .- 2 bytes (sizeof(Int16) a[0] as UInt16 = -10 : 0xfff6
* | | .- 2 bytes of delta: a[1] - a[0] = 10 - (-10) = 20 : 0x0014
* | | | .- 4 encoded double deltas (see below)
* v_______________ v______ v______ v______________________
* \x06\x00\x00\x00\xf6\xff\x14\x00\xb8\xe2\x2e\xb1\xe4\x58
*
* 4 binary encoded double deltas (\xb8\xe2\x2e\xb1\xe4\x58):
* double_delta (DD) = -20 - 2 * 10 + (-10) = -50
* .- 2-bit prefix : 0b10
* | .- sign-bit : 0b1
* | |.- abs(DD - 1) = 49 : 0b110001
* | ||
* | || DD = 20 - 2 * (-20) + 10 = 70
* | || .- 3-bit prefix : 0b110
* | || | .- sign bit : 0b0
* | || | |.- abs(DD - 1) = 69 : 0b1000101
* | || | ||
* | || | || DD = -40 - 2 * 20 + (-20) = -100
* | || | || .- 3-bit prefix : 0b110
* | || | || | .- sign-bit : 0b0
* | || | || | |.- abs(DD - 1) = 99 : 0b1100011
* | || | || | ||
* | || | || | || DD = 40 - 2 * (-40) + 20 = 140
* | || | || | || .- 3-bit prefix : 0b110
* | || | || | || | .- sign bit : 0b0
* | || | || | || | |.- abs(DD - 1) = 139 : 0b10001011
* | || | || | || | ||
* V_vv______V__vv________V____vv_______V__vv________,- padding bits
* 10111000 11100010 00101110 10110001 11100100 01011000
*
* Please also see unit tests for:
* * Examples on what output `BitWriter` produces on predefined input.
* * Compatibility tests solidifying encoded binary output on set of predefined sequences.
*/
class CompressionCodecDoubleDelta : public ICompressionCodec
{
public:
explicit CompressionCodecDoubleDelta(UInt8 data_bytes_size_);
uint8_t getMethodByte() const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return false; }
private:
UInt8 data_bytes_size;
};
namespace ErrorCodes
{
extern const int CANNOT_COMPRESS;
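The arithmetic core of the transform described above can be tried in isolation. A minimal sketch (not the codec itself: the variable-length bit packing with BitWriter is omitted, and plain signed arithmetic is used here for readability):

#include <cstdint>
#include <iostream>
#include <vector>

/// double_delta[i] = a[i] - 2 * a[i - 1] + a[i - 2], starting from the third element.
std::vector<int64_t> doubleDeltas(const std::vector<int64_t> & a)
{
    std::vector<int64_t> out;
    for (size_t i = 2; i < a.size(); ++i)
        out.push_back(a[i] - 2 * a[i - 1] + a[i - 2]);
    return out;
}

int main()
{
    /// For a sequence with a constant stride all double deltas are zero,
    /// which is why the codec compresses regular timestamps so well.
    for (int64_t dd : doubleDeltas({10, 20, 30, 40, 55}))
        std::cout << dd << ' ';   /// prints: 0 0 5
    std::cout << '\n';
    return 0;
}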

View File

@ -1,118 +0,0 @@
#pragma once
#include <Compression/ICompressionCodec.h>
namespace DB
{
/** DoubleDelta column codec implementation.
*
* Based on Gorilla paper: http://www.vldb.org/pvldb/vol8/p1816-teller.pdf, which was extended
* to support 64bit types. The drawback is 1 extra bit for 32-byte wide deltas: 5-bit prefix
* instead of 4-bit prefix.
*
* This codec is best used against monotonic integer sequences with constant (or almost constant)
* stride, like event timestamp for some monitoring application.
*
* Given input sequence a: [a0, a1, ... an]:
*
* First, write number of items (sizeof(int32)*8 bits): n
* Then write first item as is (sizeof(a[0])*8 bits): a[0]
* Second item is written as delta (sizeof(a[0])*8 bits): a[1] - a[0]
* Loop over remaining items and calculate double delta:
* double_delta = a[i] - 2 * a[i - 1] + a[i - 2]
* Write it in compact binary form with `BitWriter`
* if double_delta == 0:
* write 1bit: 0
* else if -63 < double_delta < 64:
* write 2 bit prefix: 10
* write sign bit (1 if signed): x
* write 7-1 bits of abs(double_delta - 1): xxxxxx
* else if -255 < double_delta < 256:
* write 3 bit prefix: 110
* write sign bit (1 if signed): x
* write 9-1 bits of abs(double_delta - 1): xxxxxxxx
* else if -2047 < double_delta < 2048:
* write 4 bit prefix: 1110
* write sign bit (1 if signed): x
* write 12-1 bits of abs(double_delta - 1): xxxxxxxxxxx
* else if double_delta fits into 32-bit int:
* write 5 bit prefix: 11110
* write sign bit (1 if signed): x
* write 32-1 bits of abs(double_delta - 1): xxxxxxxxxxx...
* else
* write 5 bit prefix: 11111
* write sign bit (1 if signed): x
* write 64-1 bits of abs(double_delta - 1): xxxxxxxxxxx...
*
* @example sequence of UInt8 values [1, 2, 3, 4, 5, 6, 7, 8, 9 10] is encoded as (codec header is omitted):
*
* .- 4-byte little-endian sequence length (10 == 0xa)
* | .- 1 byte (sizeof(UInt8) a[0] : 0x01
* | | .- 1 byte of delta: a[1] - a[0] = 2 - 1 = 1 : 0x01
* | | | .- 8 zero bits since double delta for remaining 8 elements was 0 : 0x00
* v_______________v___v___v___
* \x0a\x00\x00\x00\x01\x01\x00
*
* @example sequence of Int16 values [-10, 10, -20, 20, -40, 40] is encoded as:
*
* .- 4-byte little endian sequence length = 6 : 0x00000006
* | .- 2 bytes (sizeof(Int16) a[0] as UInt16 = -10 : 0xfff6
* | | .- 2 bytes of delta: a[1] - a[0] = 10 - (-10) = 20 : 0x0014
* | | | .- 4 encoded double deltas (see below)
* v_______________ v______ v______ v______________________
* \x06\x00\x00\x00\xf6\xff\x14\x00\xb8\xe2\x2e\xb1\xe4\x58
*
* 4 binary encoded double deltas (\xb8\xe2\x2e\xb1\xe4\x58):
* double_delta (DD) = -20 - 2 * 10 + (-10) = -50
* .- 2-bit prefix : 0b10
* | .- sign-bit : 0b1
* | |.- abs(DD - 1) = 49 : 0b110001
* | ||
* | || DD = 20 - 2 * (-20) + 10 = 70
* | || .- 3-bit prefix : 0b110
* | || | .- sign bit : 0b0
* | || | |.- abs(DD - 1) = 69 : 0b1000101
* | || | ||
* | || | || DD = -40 - 2 * 20 + (-20) = -100
* | || | || .- 3-bit prefix : 0b110
* | || | || | .- sign-bit : 0b0
* | || | || | |.- abs(DD - 1) = 99 : 0b1100011
* | || | || | ||
* | || | || | || DD = 40 - 2 * (-40) + 20 = 140
* | || | || | || .- 3-bit prefix : 0b110
* | || | || | || | .- sign bit : 0b0
* | || | || | || | |.- abs(DD - 1) = 139 : 0b10001011
* | || | || | || | ||
* V_vv______V__vv________V____vv_______V__vv________,- padding bits
* 10111000 11100010 00101110 10110001 11100100 01011000
*
* Please also see unit tests for:
* * Examples on what output `BitWriter` produces on predefined input.
* * Compatibility tests solidifying encoded binary output on set of predefined sequences.
*/
class CompressionCodecDoubleDelta : public ICompressionCodec
{
public:
CompressionCodecDoubleDelta(UInt8 data_bytes_size_);
uint8_t getMethodByte() const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return false; }
private:
UInt8 data_bytes_size;
};
}

View File

@ -1,4 +1,4 @@
#include <Compression/CompressionCodecGorilla.h>
#include <Compression/ICompressionCodec.h>
#include <Compression/CompressionInfo.h>
#include <Compression/CompressionFactory.h>
#include <common/unaligned.h>
@ -14,9 +14,118 @@
#include <bitset>
namespace DB
{
/** Gorilla column codec implementation.
*
* Based on Gorilla paper: http://www.vldb.org/pvldb/vol8/p1816-teller.pdf
*
* This codec is best used against monotonic floating sequences, like CPU usage percentage
* or any other gauge.
*
* Given input sequence a: [a0, a1, ... an]
*
* First, write number of items (sizeof(int32)*8 bits): n
* Then write first item as is (sizeof(a[0])*8 bits): a[0]
* Loop over remaining items and calculate xor_diff:
* xor_diff = a[i] ^ a[i - 1] (e.g. 00000011'10110100)
* Write it in compact binary form with `BitWriter`
* if xor_diff == 0:
* write 1 bit: 0
* else:
* calculate leading zero bits (lzb)
* and trailing zero bits (tzb) of xor_diff,
* compare to lzb and tzb of previous xor_diff
* (X = sizeof(a[i]) * 8, e.g. X = 16, lzb = 6, tzb = 2)
* if lzb >= prev_lzb && tzb >= prev_tzb:
* (e.g. prev_lzb=4, prev_tzb=1)
* write 2 bit prefix: 0b10
* write xor_diff >> prev_tzb (X - prev_lzb - prev_tzb bits):0b00111011010
* (where X = sizeof(a[i]) * 8, e.g. 16)
* else:
* write 2 bit prefix: 0b11
* write 5 bits of lzb: 0b00110
* write 6 bits of (X - lzb - tzb)=(16-6-2)=8: 0b001000
* write (X - lzb - tzb) non-zero bits of xor_diff: 0b11101101
* prev_lzb = lzb
* prev_tzb = tzb
*
* @example sequence of Float32 values [0.1, 0.1, 0.11, 0.2, 0.1] is encoded as:
*
* .- 4-byte little endian sequence length: 5 : 0x00000005
* | .- 4 byte (sizeof(Float32) a[0] as UInt32 : -10 : 0xcdcccc3d
* | | .- 4 encoded xor diffs (see below)
* v_______________ v______________ v__________________________________________________
* \x05\x00\x00\x00\xcd\xcc\xcc\x3d\x6a\x5a\xd8\xb6\x3c\xcd\x75\xb1\x6c\x77\x00\x00\x00
*
* 4 binary encoded xor diffs (\x6a\x5a\xd8\xb6\x3c\xcd\x75\xb1\x6c\x77\x00\x00\x00):
*
* ...........................................
* a[i-1] = 00111101110011001100110011001101
* a[i] = 00111101110011001100110011001101
* xor_diff = 00000000000000000000000000000000
* .- 1-bit prefix : 0b0
* |
* | ...........................................
* | a[i-1] = 00111101110011001100110011001101
* ! a[i] = 00111101111000010100011110101110
* | xor_diff = 00000000001011011000101101100011
* | lzb = 10
* | tzb = 0
* |.- 2-bit prefix : 0b11
* || .- lzb (10) : 0b1010
* || | .- data length (32-10-0): 22 : 0b010110
* || | | .- data : 0b1011011000101101100011
* || | | |
* || | | | ...........................................
* || | | | a[i-1] = 00111101111000010100011110101110
* || | | | a[i] = 00111110010011001100110011001101
* || | | | xor_diff = 00000011101011011000101101100011
* || | | | .- 2-bit prefix : 0b11
* || | | | | .- lzb = 6 : 0b00110
* || | | | | | .- data length = (32 - 6) = 26 : 0b011010
* || | | | | | | .- data : 0b11101011011000101101100011
* || | | | | | | |
* || | | | | | | | ...........................................
* || | | | | | | | a[i-1] = 00111110010011001100110011001101
* || | | | | | | | a[i] = 00111101110011001100110011001101
* || | | | | | | | xor_diff = 00000011100000000000000000000000
* || | | | | | | | .- 2-bit prefix : 0b10
* || | | | | | | | | .- data : 0b11100000000000000000000000
* VV_v____ v_____v________________________V_v_____v______v____________________________V_v_____________________________
* 01101010 01011010 11011000 10110110 00111100 11001101 01110101 10110001 01101100 01110111 00000000 00000000 00000000
*
* Please also see unit tests for:
* * Examples on what output `BitWriter` produces on predefined input.
* * Compatibility tests solidifying encoded binary output on set of predefined sequences.
*/
class CompressionCodecGorilla : public ICompressionCodec
{
public:
explicit CompressionCodecGorilla(UInt8 data_bytes_size_);
uint8_t getMethodByte() const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return false; }
private:
UInt8 data_bytes_size;
};
namespace ErrorCodes
{
extern const int CANNOT_COMPRESS;
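A self-contained sketch of the XOR step the scheme above builds on (not the codec itself; the bit packing is omitted): consecutive values are XOR-ed bit-wise, and only the window between the leading and trailing zero bits needs to be stored. __builtin_clz/__builtin_ctz are GCC/Clang builtins used here for brevity.

#include <cstdint>
#include <cstring>
#include <iostream>

int main()
{
    float prev = 0.1f;
    float cur = 0.11f;

    uint32_t prev_bits;
    uint32_t cur_bits;
    std::memcpy(&prev_bits, &prev, sizeof(prev_bits));
    std::memcpy(&cur_bits, &cur, sizeof(cur_bits));

    uint32_t xor_diff = prev_bits ^ cur_bits;
    if (xor_diff == 0)
    {
        /// Equal consecutive values cost a single 0 bit in the encoded stream.
        std::cout << "values are equal\n";
    }
    else
    {
        int lzb = __builtin_clz(xor_diff);   /// leading zero bits
        int tzb = __builtin_ctz(xor_diff);   /// trailing zero bits
        std::cout << "leading zeros: " << lzb
                  << ", trailing zeros: " << tzb
                  << ", meaningful bits: " << 32 - lzb - tzb << '\n';
    }
    return 0;
}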

View File

@ -1,115 +0,0 @@
#pragma once
#include <Compression/ICompressionCodec.h>
namespace DB
{
/** Gorilla column codec implementation.
*
* Based on Gorilla paper: http://www.vldb.org/pvldb/vol8/p1816-teller.pdf
*
* This codec is best used against monotonic floating sequences, like CPU usage percentage
* or any other gauge.
*
* Given input sequence a: [a0, a1, ... an]
*
* First, write number of items (sizeof(int32)*8 bits): n
* Then write first item as is (sizeof(a[0])*8 bits): a[0]
* Loop over remaining items and calculate xor_diff:
* xor_diff = a[i] ^ a[i - 1] (e.g. 00000011'10110100)
* Write it in compact binary form with `BitWriter`
* if xor_diff == 0:
* write 1 bit: 0
* else:
* calculate leading zero bits (lzb)
* and trailing zero bits (tzb) of xor_diff,
* compare to lzb and tzb of previous xor_diff
* (X = sizeof(a[i]) * 8, e.g. X = 16, lzb = 6, tzb = 2)
* if lzb >= prev_lzb && tzb >= prev_tzb:
* (e.g. prev_lzb=4, prev_tzb=1)
* write 2 bit prefix: 0b10
* write xor_diff >> prev_tzb (X - prev_lzb - prev_tzb bits):0b00111011010
* (where X = sizeof(a[i]) * 8, e.g. 16)
* else:
* write 2 bit prefix: 0b11
* write 5 bits of lzb: 0b00110
* write 6 bits of (X - lzb - tzb)=(16-6-2)=8: 0b001000
* write (X - lzb - tzb) non-zero bits of xor_diff: 0b11101101
* prev_lzb = lzb
* prev_tzb = tzb
*
* @example sequence of Float32 values [0.1, 0.1, 0.11, 0.2, 0.1] is encoded as:
*
* .- 4-byte little endian sequence length: 5 : 0x00000005
* | .- 4 byte (sizeof(Float32) a[0] as UInt32 : -10 : 0xcdcccc3d
* | | .- 4 encoded xor diffs (see below)
* v_______________ v______________ v__________________________________________________
* \x05\x00\x00\x00\xcd\xcc\xcc\x3d\x6a\x5a\xd8\xb6\x3c\xcd\x75\xb1\x6c\x77\x00\x00\x00
*
* 4 binary encoded xor diffs (\x6a\x5a\xd8\xb6\x3c\xcd\x75\xb1\x6c\x77\x00\x00\x00):
*
* ...........................................
* a[i-1] = 00111101110011001100110011001101
* a[i] = 00111101110011001100110011001101
* xor_diff = 00000000000000000000000000000000
* .- 1-bit prefix : 0b0
* |
* | ...........................................
* | a[i-1] = 00111101110011001100110011001101
* ! a[i] = 00111101111000010100011110101110
* | xor_diff = 00000000001011011000101101100011
* | lzb = 10
* | tzb = 0
* |.- 2-bit prefix : 0b11
* || .- lzb (10) : 0b1010
* || | .- data length (32-10-0): 22 : 0b010110
* || | | .- data : 0b1011011000101101100011
* || | | |
* || | | | ...........................................
* || | | | a[i-1] = 00111101111000010100011110101110
* || | | | a[i] = 00111110010011001100110011001101
* || | | | xor_diff = 00000011101011011000101101100011
* || | | | .- 2-bit prefix : 0b11
* || | | | | .- lzb = 6 : 0b00110
* || | | | | | .- data length = (32 - 6) = 26 : 0b011010
* || | | | | | | .- data : 0b11101011011000101101100011
* || | | | | | | |
* || | | | | | | | ...........................................
* || | | | | | | | a[i-1] = 00111110010011001100110011001101
* || | | | | | | | a[i] = 00111101110011001100110011001101
* || | | | | | | | xor_diff = 00000011100000000000000000000000
* || | | | | | | | .- 2-bit prefix : 0b10
* || | | | | | | | | .- data : 0b11100000000000000000000000
* VV_v____ v_____v________________________V_v_____v______v____________________________V_v_____________________________
* 01101010 01011010 11011000 10110110 00111100 11001101 01110101 10110001 01101100 01110111 00000000 00000000 00000000
*
* Please also see unit tests for:
* * Examples on what output `BitWriter` produces on predefined input.
* * Compatibility tests solidifying encoded binary output on set of predefined sequences.
*/
class CompressionCodecGorilla : public ICompressionCodec
{
public:
CompressionCodecGorilla(UInt8 data_bytes_size_);
uint8_t getMethodByte() const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return false; }
private:
UInt8 data_bytes_size;
};
}

View File

@ -1,7 +1,7 @@
#include "CompressionCodecLZ4.h"
#include <lz4.h>
#include <lz4hc.h>
#include <Compression/ICompressionCodec.h>
#include <Compression/CompressionInfo.h>
#include <Compression/CompressionFactory.h>
#include <Compression/LZ4_decompress_faster.h>
@ -9,7 +9,9 @@
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>
#include <IO/BufferWithOwnMemory.h>
#pragma GCC diagnostic ignored "-Wold-style-cast"
@ -17,6 +19,46 @@
namespace DB
{
class CompressionCodecLZ4 : public ICompressionCodec
{
public:
explicit CompressionCodecLZ4();
uint8_t getMethodByte() const override;
UInt32 getAdditionalSizeAtTheEndOfBuffer() const override { return LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER; }
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return true; }
private:
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;
mutable LZ4::PerformanceStatistics lz4_stat;
ASTPtr codec_desc;
};
class CompressionCodecLZ4HC : public CompressionCodecLZ4
{
public:
explicit CompressionCodecLZ4HC(int level_);
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
private:
const int level;
};
namespace ErrorCodes
{
extern const int CANNOT_COMPRESS;

View File

@ -1,52 +0,0 @@
#pragma once
#include <IO/WriteBuffer.h>
#include <Compression/ICompressionCodec.h>
#include <IO/BufferWithOwnMemory.h>
#include <Parsers/StringRange.h>
#include <Compression/LZ4_decompress_faster.h>
#include <Parsers/IAST_fwd.h>
namespace DB
{
class CompressionCodecLZ4 : public ICompressionCodec
{
public:
CompressionCodecLZ4();
uint8_t getMethodByte() const override;
UInt32 getAdditionalSizeAtTheEndOfBuffer() const override { return LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER; }
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return true; }
private:
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;
mutable LZ4::PerformanceStatistics lz4_stat;
ASTPtr codec_desc;
};
class CompressionCodecLZ4HC : public CompressionCodecLZ4
{
public:
CompressionCodecLZ4HC(int level_);
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
private:
const int level;
};
}

View File

@ -1,6 +1,6 @@
#include <cstring>
#include <Compression/CompressionCodecT64.h>
#include <Compression/ICompressionCodec.h>
#include <Compression/CompressionFactory.h>
#include <common/unaligned.h>
#include <Parsers/IAST.h>
@ -8,11 +8,56 @@
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTFunction.h>
#include <IO/WriteHelpers.h>
#include <Core/Types.h>
namespace DB
{
/// Gets 64 integer values, makes a 64x64 bit matrix, transposes it and crops the unused bits (most significant zeroes).
/// For example, if we have UInt8 values with only 0 and 1 inside, 64xUInt8 would be compressed into 1xUInt64.
/// It detects unused bits by calculating min and max values of the data part, saving them in the header during the compression phase.
/// There's a special case for signed integer parts with data crossing zero. Here it stores one more bit to detect the sign of a value.
class CompressionCodecT64 : public ICompressionCodec
{
public:
static constexpr UInt32 HEADER_SIZE = 1 + 2 * sizeof(UInt64);
static constexpr UInt32 MAX_COMPRESSED_BLOCK_SIZE = sizeof(UInt64) * 64;
/// There're 2 compression variants:
/// Byte - transpose bit matrix by bytes (only the last not full byte is transposed by bits). It's default.
/// Bits - full bit-transpose of the bit matrix. It uses more resources and leads to better compression with ZSTD (but worse with LZ4).
enum class Variant
{
Byte,
Bit
};
CompressionCodecT64(TypeIndex type_idx_, Variant variant_);
uint8_t getMethodByte() const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * src, UInt32 src_size, char * dst) const override;
void doDecompressData(const char * src, UInt32 src_size, char * dst, UInt32 uncompressed_size) const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override
{
/// uncompressed_size - (uncompressed_size % (sizeof(T) * 64)) + sizeof(UInt64) * sizeof(T) + header_size
return uncompressed_size + MAX_COMPRESSED_BLOCK_SIZE + HEADER_SIZE;
}
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return false; }
private:
TypeIndex type_idx;
Variant variant;
};
namespace ErrorCodes
{
extern const int CANNOT_COMPRESS;
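A minimal sketch of the idea stated in the comment above: gather bit k of each of 64 values into one 64-bit word, i.e. one column of the transposed bit matrix. With values that are only 0 or 1, every column except bit 0 is all zeroes and can be cropped, so 64 bytes of input shrink to a single UInt64. This illustrates only the transpose step, not the codec.

#include <cstdint>
#include <iostream>

/// Collect bit `bit` of each of the 64 values into one 64-bit column.
uint64_t gatherBitColumn(const uint8_t values[64], unsigned bit)
{
    uint64_t column = 0;
    for (unsigned i = 0; i < 64; ++i)
        column |= static_cast<uint64_t>((values[i] >> bit) & 1u) << i;
    return column;
}

int main()
{
    uint8_t values[64] = {};
    values[0] = 1;
    values[63] = 1;

    /// Only the bit-0 column carries information: prints 8000000000000001.
    std::cout << std::hex << gatherBitColumn(values, 0) << '\n';

    for (unsigned bit = 1; bit < 8; ++bit)
        if (gatherBitColumn(values, bit) != 0)
            std::cout << "unexpected non-zero column\n";
    return 0;
}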

View File

@ -1,53 +0,0 @@
#pragma once
#include <Core/Types.h>
#include <Compression/ICompressionCodec.h>
namespace DB
{
/// Get 64 integer values, makes 64x64 bit matrix, transpose it and crop unused bits (most significant zeroes).
/// In example, if we have UInt8 with only 0 and 1 inside 64xUInt8 would be compressed into 1xUInt64.
/// It detects unused bits by calculating min and max values of data part, saving them in header in compression phase.
/// There's a special case with signed integers parts with crossing zero data. Here it stores one more bit to detect sign of value.
class CompressionCodecT64 : public ICompressionCodec
{
public:
static constexpr UInt32 HEADER_SIZE = 1 + 2 * sizeof(UInt64);
static constexpr UInt32 MAX_COMPRESSED_BLOCK_SIZE = sizeof(UInt64) * 64;
/// There're 2 compression variants:
/// Byte - transpose bit matrix by bytes (only the last not full byte is transposed by bits). It's default.
/// Bits - full bit-transpose of the bit matrix. It uses more resources and leads to better compression with ZSTD (but worse with LZ4).
enum class Variant
{
Byte,
Bit
};
CompressionCodecT64(TypeIndex type_idx_, Variant variant_);
uint8_t getMethodByte() const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * src, UInt32 src_size, char * dst) const override;
void doDecompressData(const char * src, UInt32 src_size, char * dst, UInt32 uncompressed_size) const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override
{
/// uncompressed_size - (uncompressed_size % (sizeof(T) * 64)) + sizeof(UInt64) * sizeof(T) + header_size
return uncompressed_size + MAX_COMPRESSED_BLOCK_SIZE + HEADER_SIZE;
}
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return false; }
private:
TypeIndex type_idx;
Variant variant;
};
}

View File

@ -1,4 +1,4 @@
#include <Compression/CompressionCodecZSTD.h>
#include <Compression/ICompressionCodec.h>
#include <Compression/CompressionInfo.h>
#include <Compression/CompressionFactory.h>
#include <zstd.h>
@ -7,11 +7,44 @@
#include <Parsers/ASTFunction.h>
#include <Common/typeid_cast.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBuffer.h>
#include <IO/BufferWithOwnMemory.h>
namespace DB
{
class CompressionCodecZSTD : public ICompressionCodec
{
public:
static constexpr auto ZSTD_DEFAULT_LEVEL = 1;
static constexpr auto ZSTD_DEFAULT_LOG_WINDOW = 24;
explicit CompressionCodecZSTD(int level_);
CompressionCodecZSTD(int level_, int window_log);
uint8_t getMethodByte() const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return true; }
private:
const int level;
const bool enable_long_range;
const int window_log;
};
namespace ErrorCodes
{
extern const int CANNOT_COMPRESS;

View File

@ -1,42 +0,0 @@
#pragma once
#include <IO/WriteBuffer.h>
#include <Compression/ICompressionCodec.h>
#include <IO/BufferWithOwnMemory.h>
#include <Parsers/StringRange.h>
namespace DB
{
class CompressionCodecZSTD : public ICompressionCodec
{
public:
static constexpr auto ZSTD_DEFAULT_LEVEL = 1;
static constexpr auto ZSTD_DEFAULT_LOG_WINDOW = 24;
CompressionCodecZSTD(int level_);
CompressionCodecZSTD(int level_, int window_log);
uint8_t getMethodByte() const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return true; }
private:
const int level;
const bool enable_long_range;
const int window_log;
};
}

View File

@ -1,3 +1,7 @@
#if !defined(ARCADIA_BUILD)
# include "config_core.h"
#endif
#include <Compression/CompressionFactory.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
@ -13,6 +17,7 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
@ -34,8 +39,8 @@ CompressionCodecPtr CompressionCodecFactory::get(const String & family_name, std
{
if (level)
{
auto literal = std::make_shared<ASTLiteral>(static_cast<UInt64>(*level));
return get(makeASTFunction("CODEC", makeASTFunction(Poco::toUpper(family_name), literal)), {});
auto level_literal = std::make_shared<ASTLiteral>(static_cast<UInt64>(*level));
return get(makeASTFunction("CODEC", makeASTFunction(Poco::toUpper(family_name), level_literal)), {});
}
else
{
@ -44,7 +49,8 @@ CompressionCodecPtr CompressionCodecFactory::get(const String & family_name, std
}
}
void CompressionCodecFactory::validateCodec(const String & family_name, std::optional<int> level, bool sanity_check) const
void CompressionCodecFactory::validateCodec(
const String & family_name, std::optional<int> level, bool sanity_check, bool allow_experimental_codecs) const
{
if (family_name.empty())
throw Exception("Compression codec name cannot be empty", ErrorCodes::BAD_ARGUMENTS);
@ -52,16 +58,19 @@ void CompressionCodecFactory::validateCodec(const String & family_name, std::opt
if (level)
{
auto literal = std::make_shared<ASTLiteral>(static_cast<UInt64>(*level));
validateCodecAndGetPreprocessedAST(makeASTFunction("CODEC", makeASTFunction(Poco::toUpper(family_name), literal)), {}, sanity_check);
validateCodecAndGetPreprocessedAST(makeASTFunction("CODEC", makeASTFunction(Poco::toUpper(family_name), literal)),
{}, sanity_check, allow_experimental_codecs);
}
else
{
auto identifier = std::make_shared<ASTIdentifier>(Poco::toUpper(family_name));
validateCodecAndGetPreprocessedAST(makeASTFunction("CODEC", identifier), {}, sanity_check);
validateCodecAndGetPreprocessedAST(makeASTFunction("CODEC", identifier),
{}, sanity_check, allow_experimental_codecs);
}
}
ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(const ASTPtr & ast, const IDataType * column_type, bool sanity_check) const
ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(
const ASTPtr & ast, const IDataType * column_type, bool sanity_check, bool allow_experimental_codecs) const
{
if (const auto * func = ast->as<ASTFunction>())
{
@ -72,7 +81,7 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(const ASTPtr
std::optional<size_t> generic_compression_codec_pos;
bool can_substitute_codec_arguments = true;
for (size_t i = 0; i < func->arguments->children.size(); ++i)
for (size_t i = 0, size = func->arguments->children.size(); i < size; ++i)
{
const auto & inner_codec_ast = func->arguments->children[i];
String codec_family_name;
@ -107,7 +116,8 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(const ASTPtr
if (column_type)
{
CompressionCodecPtr prev_codec;
IDataType::StreamCallbackWithType callback = [&](const ISerialization::SubstreamPath & substream_path, const IDataType & substream_type)
IDataType::StreamCallbackWithType callback = [&](
const ISerialization::SubstreamPath & substream_path, const IDataType & substream_type)
{
if (ISerialization::isSpecialCompressionAllowed(substream_path))
{
@ -132,6 +142,12 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(const ASTPtr
result_codec = getImpl(codec_family_name, codec_arguments, nullptr);
}
if (!allow_experimental_codecs && result_codec->isExperimental())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Codec {} is experimental and not meant to be used in production."
" You can enable it with the 'allow_experimental_codecs' setting.",
codec_family_name);
codecs_descriptions->children.emplace_back(result_codec->getCodecDesc());
}
@ -172,6 +188,7 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(const ASTPtr
" (Note: you can enable setting 'allow_suspicious_codecs' to skip this check).", ErrorCodes::BAD_ARGUMENTS);
}
/// For columns with nested types like Tuple(UInt32, UInt64) we
/// obviously cannot substitute parameters for codecs which depend on
/// data type, because for the first column Delta(4) is suitable and
@ -195,7 +212,9 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(const ASTPtr
throw Exception("Unknown codec family: " + queryToString(ast), ErrorCodes::UNKNOWN_CODEC);
}
CompressionCodecPtr CompressionCodecFactory::get(const ASTPtr & ast, const IDataType * column_type, CompressionCodecPtr current_default, bool only_generic) const
CompressionCodecPtr CompressionCodecFactory::get(
const ASTPtr & ast, const IDataType * column_type, CompressionCodecPtr current_default, bool only_generic) const
{
if (current_default == nullptr)
current_default = default_codec;
@ -246,6 +265,7 @@ CompressionCodecPtr CompressionCodecFactory::get(const ASTPtr & ast, const IData
throw Exception("Unexpected AST structure for compression codec: " + queryToString(ast), ErrorCodes::UNEXPECTED_AST_STRUCTURE);
}
CompressionCodecPtr CompressionCodecFactory::get(const uint8_t byte_code) const
{
const auto family_code_and_creator = family_code_with_codec.find(byte_code);
@ -303,7 +323,7 @@ void CompressionCodecFactory::registerSimpleCompressionCodec(
registerCompressionCodec(family_name, byte_code, [family_name, creator](const ASTPtr & ast)
{
if (ast)
throw Exception("Compression codec " + family_name + " cannot have arguments", ErrorCodes::DATA_TYPE_CANNOT_HAVE_ARGUMENTS);
throw Exception(ErrorCodes::DATA_TYPE_CANNOT_HAVE_ARGUMENTS, "Compression codec {} cannot have arguments", family_name);
return creator();
});
}

View File

@ -38,16 +38,16 @@ public:
CompressionCodecPtr getDefaultCodec() const;
/// Validate codecs AST specified by user and parses codecs description (substitute default parameters)
ASTPtr validateCodecAndGetPreprocessedAST(const ASTPtr & ast, const IDataType * column_type, bool sanity_check) const;
ASTPtr validateCodecAndGetPreprocessedAST(const ASTPtr & ast, const IDataType * column_type, bool sanity_check, bool allow_experimental_codecs) const;
/// Just wrapper for previous method.
ASTPtr validateCodecAndGetPreprocessedAST(const ASTPtr & ast, const DataTypePtr & column_type, bool sanity_check) const
ASTPtr validateCodecAndGetPreprocessedAST(const ASTPtr & ast, const DataTypePtr & column_type, bool sanity_check, bool allow_experimental_codecs) const
{
return validateCodecAndGetPreprocessedAST(ast, column_type.get(), sanity_check);
return validateCodecAndGetPreprocessedAST(ast, column_type.get(), sanity_check, allow_experimental_codecs);
}
/// Validate codecs AST specified by user
void validateCodec(const String & family_name, std::optional<int> level, bool sanity_check) const;
void validateCodec(const String & family_name, std::optional<int> level, bool sanity_check, bool allow_experimental_codecs) const;
/// Get codec by AST and possible column_type. Some codecs can use
/// information about type to improve inner settings, but every codec should

View File

@ -73,6 +73,10 @@ public:
/// Is it a generic compression algorithm like lz4, zstd? Usually it does not make sense to apply generic compression more than a single time.
virtual bool isGenericCompression() const = 0;
/// It is a codec available only for evaluation purposes and not meant to be used in production.
/// It cannot be used unless the user turns off the safety switch.
virtual bool isExperimental() const { return false; }
/// If it does nothing.
virtual bool isNone() const { return false; }

View File

@ -80,7 +80,7 @@ class IColumn;
M(UInt64, background_pool_size, 16, "Number of threads performing background work for tables (for example, merging in merge tree). Only has meaning at server startup.", 0) \
M(UInt64, background_move_pool_size, 8, "Number of threads performing background moves for tables. Only has meaning at server startup.", 0) \
M(UInt64, background_fetches_pool_size, 8, "Number of threads performing background fetches for replicated tables. Only has meaning at server startup.", 0) \
M(UInt64, background_schedule_pool_size, 16, "Number of threads performing background tasks for replicated tables, dns cache updates. Only has meaning at server startup.", 0) \
M(UInt64, background_schedule_pool_size, 128, "Number of threads performing background tasks for replicated tables, dns cache updates. Only has meaning at server startup.", 0) \
M(UInt64, background_message_broker_schedule_pool_size, 16, "Number of threads performing background tasks for message streaming. Only has meaning at server startup.", 0) \
M(UInt64, background_distributed_schedule_pool_size, 16, "Number of threads performing background tasks for distributed sends. Only has meaning at server startup.", 0) \
M(UInt64, max_replicated_fetches_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for replicated fetches. Zero means unlimited. Only has meaning at server startup.", 0) \
@ -240,6 +240,7 @@ class IColumn;
M(Bool, empty_result_for_aggregation_by_empty_set, false, "Return empty result when aggregating without keys on empty set.", 0) \
M(Bool, allow_distributed_ddl, true, "If it is set to true, then a user is allowed to execute distributed DDL queries.", 0) \
M(Bool, allow_suspicious_codecs, false, "If it is set to true, allow to specify meaningless compression codecs.", 0) \
M(Bool, allow_experimental_codecs, false, "If it is set to true, allow to specify experimental compression codecs (but we don't have those yet and this option does nothing).", 0) \
M(UInt64, odbc_max_field_size, 1024, "Max size of a field that can be read from an ODBC dictionary. Long strings are truncated.", 0) \
M(UInt64, query_profiler_real_time_period_ns, 1000000000, "Period for real clock timer of query profiler (in nanoseconds). Set 0 value to turn off the real clock query profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \
M(UInt64, query_profiler_cpu_time_period_ns, 1000000000, "Period for CPU clock timer of query profiler (in nanoseconds). Set 0 value to turn off the CPU clock query profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \
@ -402,6 +403,7 @@ class IColumn;
M(Bool, optimize_if_chain_to_multiif, false, "Replace if(cond1, then1, if(cond2, ...)) chains to multiIf. Currently it's not beneficial for numeric types.", 0) \
M(Bool, optimize_if_transform_strings_to_enum, false, "Replaces string-type arguments in If and Transform with enums. Disabled by default because it could make an inconsistent change in a distributed query that would lead to its failure.", 0) \
M(Bool, optimize_monotonous_functions_in_order_by, true, "Replace monotonous function with its argument in ORDER BY", 0) \
M(Bool, optimize_functions_to_subcolumns, false, "Transform functions to subcolumns, if possible, to reduce amount of read data. E.g. 'length(arr)' -> 'arr.size0', 'col IS NULL' -> 'col.null' ", 0) \
M(Bool, normalize_function_names, true, "Normalize function names to their canonical names", 0) \
M(Bool, allow_experimental_alter_materialized_view_structure, false, "Allow atomic alter on Materialized views. Work in progress.", 0) \
M(Bool, enable_early_constant_folding, true, "Enable query optimization where we analyze function and subqueries results and rewrite query if there're constants there", 0) \

View File

@ -7,7 +7,6 @@ PEERDIR(
clickhouse/src/Common
)
SRCS(
DiskCacheWrapper.cpp
DiskDecorator.cpp
@ -16,13 +15,21 @@ SRCS(
DiskMemory.cpp
DiskRestartProxy.cpp
DiskSelector.cpp
HDFS/DiskHDFS.cpp
IDisk.cpp
IDiskRemote.cpp
IVolume.cpp
LocalDirectorySyncGuard.cpp
ReadIndirectBufferFromRemoteFS.cpp
S3/DiskS3.cpp
S3/ProxyListConfiguration.cpp
S3/ProxyResolverConfiguration.cpp
S3/registerDiskS3.cpp
SingleDiskVolume.cpp
StoragePolicy.cpp
VolumeJBOD.cpp
VolumeRAID1.cpp
WriteIndirectBufferFromRemoteFS.cpp
createVolume.cpp
registerDisks.cpp

View File

@ -6,7 +6,7 @@ PEERDIR(
clickhouse/src/Common
)
SRCS(
<? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -F 'S3|HDFS' | sed 's/^\.\// /' | sort ?>
)

View File

@ -182,18 +182,20 @@ public:
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override
{
const ColumnMap * col_map = typeid_cast<const ColumnMap *>(arguments[0].column.get());
bool is_const = isColumnConst(*arguments[0].column);
const ColumnMap * col_map = is_const ? checkAndGetColumnConstData<ColumnMap>(arguments[0].column.get()) : checkAndGetColumn<ColumnMap>(arguments[0].column.get());
if (!col_map)
return nullptr;
throw Exception{"First argument for function " + getName() + " must be a map", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
const auto & nested_column = col_map->getNestedColumn();
const auto & keys_data = col_map->getNestedData().getColumn(0);
/// Prepare arguments to call arrayIndex to check whether the array has the element.
ColumnPtr column_array = ColumnArray::create(keys_data.getPtr(), nested_column.getOffsetsPtr());
ColumnsWithTypeAndName new_arguments =
{
{
ColumnArray::create(keys_data.getPtr(), nested_column.getOffsetsPtr()),
is_const ? ColumnConst::create(std::move(column_array), keys_data.size()) : std::move(column_array),
std::make_shared<DataTypeArray>(result_type),
""
},

View File

@ -1894,11 +1894,11 @@ void NO_INLINE Aggregator::mergeWithoutKeyStreamsImpl(
res = place;
}
if (block.rows() > 0)
for (size_t row = 0, rows = block.rows(); row < rows; ++row)
{
/// Adding Values
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->merge(res + offsets_of_aggregate_states[i], (*aggregate_columns[i])[0], result.aggregates_pool);
aggregate_functions[i]->merge(res + offsets_of_aggregate_states[i], (*aggregate_columns[i])[row], result.aggregates_pool);
}
/// Early release memory.

View File

@ -83,6 +83,17 @@ ContextMutablePtr updateSettingsForCluster(const Cluster & cluster, ContextPtr c
}
}
if (settings.offset)
{
new_settings.offset = 0;
new_settings.offset.changed = false;
}
if (settings.limit)
{
new_settings.limit = 0;
new_settings.limit.changed = false;
}
auto new_context = Context::createCopy(context);
new_context->setSettings(new_settings);
return new_context;

View File

@ -85,7 +85,8 @@ public:
/// If this is already an external table, you do not need to add anything. Just remember its presence.
auto temporary_table_name = getIdentifierName(subquery_or_table_name);
bool exists_in_local_map = external_tables.end() != external_tables.find(temporary_table_name);
bool exists_in_context = getContext()->tryResolveStorageID(StorageID("", temporary_table_name), Context::ResolveExternal);
bool exists_in_context = static_cast<bool>(getContext()->tryResolveStorageID(
StorageID("", temporary_table_name), Context::ResolveExternal));
if (exists_in_local_map || exists_in_context)
return;
}

View File

@ -447,6 +447,8 @@ ColumnsDescription InterpreterCreateQuery::getColumnsDescription(
defaults_sample_block = validateColumnsDefaultsAndGetSampleBlock(default_expr_list, column_names_and_types, context_);
bool sanity_check_compression_codecs = !attach && !context_->getSettingsRef().allow_suspicious_codecs;
bool allow_experimental_codecs = attach || context_->getSettingsRef().allow_experimental_codecs;
ColumnsDescription res;
auto name_type_it = column_names_and_types.begin();
for (auto ast_it = columns_ast.children.begin(); ast_it != columns_ast.children.end(); ++ast_it, ++name_type_it)
@ -481,7 +483,7 @@ ColumnsDescription InterpreterCreateQuery::getColumnsDescription(
if (col_decl.default_specifier == "ALIAS")
throw Exception{"Cannot specify codec for column type ALIAS", ErrorCodes::BAD_ARGUMENTS};
column.codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(
col_decl.codec, column.type, sanity_check_compression_codecs);
col_decl.codec, column.type, sanity_check_compression_codecs, allow_experimental_codecs);
}
if (col_decl.ttl)

View File

@ -44,7 +44,8 @@ BlockInputStreamPtr InterpreterExistsQuery::executeImpl()
{
if (exists_query->temporary)
{
result = getContext()->tryResolveStorageID({"", exists_query->table}, Context::ResolveExternal);
result = static_cast<bool>(getContext()->tryResolveStorageID(
{"", exists_query->table}, Context::ResolveExternal));
}
else
{

View File

@ -80,6 +80,9 @@ void QueryNormalizer::visit(ASTIdentifier & node, ASTPtr & ast, Data & data)
/// If it is an alias, but not a parent alias (for constructs like "SELECT column + 1 AS column").
auto it_alias = data.aliases.find(node.name());
if (!data.allow_self_aliases && current_alias == node.name())
throw Exception(ErrorCodes::CYCLIC_ALIASES, "Self referencing of {} to {}. Cyclic alias", backQuote(current_alias), backQuote(node.name()));
if (it_alias != data.aliases.end() && current_alias != node.name())
{
if (!IdentifierSemantic::canBeAlias(node))

View File

@ -48,18 +48,22 @@ public:
MapOfASTs finished_asts; /// already processed vertices (and by what they replaced)
SetOfASTs current_asts; /// vertices in the current call stack of this method
std::string current_alias; /// the alias referring to the ancestor of ast (the deepest ancestor with aliases)
bool ignore_alias; /// normalize query without any aliases
const bool ignore_alias; /// normalize query without any aliases
Data(const Aliases & aliases_, const NameSet & source_columns_set_, bool ignore_alias_, ExtractedSettings && settings_)
/// It's Ok to have "c + 1 AS c" in queries, but not in table definition
const bool allow_self_aliases; /// for constructs like "SELECT column + 1 AS column"
Data(const Aliases & aliases_, const NameSet & source_columns_set_, bool ignore_alias_, ExtractedSettings && settings_, bool allow_self_aliases_)
: aliases(aliases_)
, source_columns_set(source_columns_set_)
, settings(settings_)
, level(0)
, ignore_alias(ignore_alias_)
, allow_self_aliases(allow_self_aliases_)
{}
};
QueryNormalizer(Data & data)
explicit QueryNormalizer(Data & data)
: visitor_data(data)
{}

View File

@ -0,0 +1,80 @@
#include <Interpreters/RewriteFunctionToSubcolumnVisitor.h>
#include <DataTypes/NestedUtils.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
namespace DB
{
namespace
{
ASTPtr transformToSubcolumn(const String & name_in_storage, const String & subcolumn_name)
{
return std::make_shared<ASTIdentifier>(Nested::concatenateName(name_in_storage, subcolumn_name));
}
ASTPtr transformEmptyToSubcolumn(const String & name_in_storage, const String & subcolumn_name)
{
auto ast = transformToSubcolumn(name_in_storage, subcolumn_name);
return makeASTFunction("equals", ast, std::make_shared<ASTLiteral>(0u));
}
ASTPtr transformNotEmptyToSubcolumn(const String & name_in_storage, const String & subcolumn_name)
{
auto ast = transformToSubcolumn(name_in_storage, subcolumn_name);
return makeASTFunction("notEquals", ast, std::make_shared<ASTLiteral>(0u));
}
ASTPtr transformIsNotNullToSubcolumn(const String & name_in_storage, const String & subcolumn_name)
{
auto ast = transformToSubcolumn(name_in_storage, subcolumn_name);
return makeASTFunction("not", ast);
}
ASTPtr transformCountNullableToSubcolumn(const String & name_in_storage, const String & subcolumn_name)
{
auto ast = transformToSubcolumn(name_in_storage, subcolumn_name);
return makeASTFunction("sum", makeASTFunction("not", ast));
}
const std::unordered_map<String, std::tuple<TypeIndex, String, decltype(&transformToSubcolumn)>> function_to_subcolumn =
{
{"length", {TypeIndex::Array, "size0", transformToSubcolumn}},
{"empty", {TypeIndex::Array, "size0", transformEmptyToSubcolumn}},
{"notEmpty", {TypeIndex::Array, "size0", transformNotEmptyToSubcolumn}},
{"isNull", {TypeIndex::Nullable, "null", transformToSubcolumn}},
{"isNotNull", {TypeIndex::Nullable, "null", transformIsNotNullToSubcolumn}},
{"count", {TypeIndex::Nullable, "null", transformCountNullableToSubcolumn}},
{"mapKeys", {TypeIndex::Map, "keys", transformToSubcolumn}},
{"mapValues", {TypeIndex::Map, "values", transformToSubcolumn}},
};
}
void RewriteFunctionToSubcolumnData::visit(ASTFunction & function, ASTPtr & ast) const
{
const auto & arguments = function.arguments->children;
if (arguments.size() != 1)
return;
const auto * identifier = arguments[0]->as<ASTIdentifier>();
if (!identifier)
return;
auto it = function_to_subcolumn.find(function.name);
if (it == function_to_subcolumn.end())
return;
const auto & [type_id, subcolumn_name, transformer] = it->second;
const auto & columns = metadata_snapshot->getColumns();
const auto & name_in_storage = identifier->name();
if (columns.has(name_in_storage)
&& columns.get(name_in_storage).type->getTypeId() == type_id)
{
ast = transformer(name_in_storage, subcolumn_name);
}
}
}

View File

@ -0,0 +1,24 @@
#pragma once
#include <Parsers/ASTFunction.h>
#include <Interpreters/InDepthNodeVisitor.h>
#include <Storages/StorageInMemoryMetadata.h>
namespace DB
{
/// Rewrites functions to subcolumns, if possible, to reduce amount of read data.
/// E.g. 'length(arr)' -> 'arr.size0', 'col IS NULL' -> 'col.null'
class RewriteFunctionToSubcolumnData
{
public:
using TypeToVisit = ASTFunction;
void visit(ASTFunction & function, ASTPtr & ast) const;
StorageMetadataPtr metadata_snapshot;
};
using RewriteFunctionToSubcolumnMatcher = OneTypeMatcher<RewriteFunctionToSubcolumnData>;
using RewriteFunctionToSubcolumnVisitor = InDepthNodeVisitor<RewriteFunctionToSubcolumnMatcher, true>;
}

View File

@ -543,6 +543,40 @@ BoolMask MergeTreeSetIndex::checkInRange(const std::vector<Range> & key_ranges,
auto left_lower = std::lower_bound(indices.begin(), indices.end(), left_point, less);
auto right_lower = std::lower_bound(indices.begin(), indices.end(), right_point, less);
/// A special case of 1-element KeyRange. It's useful for partition pruning
bool one_element_range = true;
for (size_t i = 0; i < tuple_size; ++i)
{
auto & left = left_point[i];
auto & right = right_point[i];
if (left.getType() == right.getType())
{
if (left.getType() == ValueWithInfinity::NORMAL)
{
if (0 != left.getColumnIfFinite().compareAt(0, 0, right.getColumnIfFinite(), 1))
{
one_element_range = false;
break;
}
}
}
else
{
one_element_range = false;
break;
}
}
if (one_element_range)
{
/// Here we know that there is one element in range.
/// The main difference from the normal case is that we can definitely say that
/// the condition in this range is always TRUE (can_be_false = 0) xor always FALSE (can_be_true = 0).
if (left_lower != indices.end() && equals(*left_lower, left_point))
return {true, false};
else
return {false, true};
}
return
{
left_lower != right_lower

View File

@ -79,6 +79,15 @@ bool StorageID::operator<(const StorageID & rhs) const
return !hasUUID();
}
bool StorageID::operator==(const StorageID & rhs) const
{
assertNotEmpty();
if (hasUUID() && rhs.hasUUID())
return uuid == rhs.uuid;
else
return std::tie(database_name, table_name) == std::tie(rhs.database_name, rhs.table_name);
}
String StorageID::getFullTableName() const
{
return backQuoteIfNeed(getDatabaseName()) + "." + backQuoteIfNeed(table_name);

View File

@ -54,7 +54,7 @@ struct StorageID
String getNameForLogs() const;
operator bool () const
explicit operator bool () const
{
return !empty();
}
@ -70,6 +70,7 @@ struct StorageID
}
bool operator<(const StorageID & rhs) const;
bool operator==(const StorageID & rhs) const;
void assertNotEmpty() const
{

View File

@ -1,6 +1,7 @@
#include <Core/Settings.h>
#include <Interpreters/TreeOptimizer.h>
#include <Interpreters/TreeRewriter.h>
#include <Interpreters/OptimizeIfChains.h>
#include <Interpreters/OptimizeIfWithConstantConditionVisitor.h>
#include <Interpreters/ArithmeticOperationsInAgrFuncOptimize.h>
@ -14,6 +15,7 @@
#include <Interpreters/MonotonicityCheckVisitor.h>
#include <Interpreters/ConvertStringsToEnumVisitor.h>
#include <Interpreters/PredicateExpressionsOptimizer.h>
#include <Interpreters/RewriteFunctionToSubcolumnVisitor.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExternalDictionariesLoader.h>
@ -27,7 +29,7 @@
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Functions/FunctionFactory.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <Storages/IStorage.h>
#include <Interpreters/RewriteSumIfFunctionVisitor.h>
@ -579,6 +581,12 @@ void transformIfStringsIntoEnum(ASTPtr & query)
ConvertStringsToEnumVisitor(convert_data).visit(query);
}
void optimizeFunctionsToSubcolumns(ASTPtr & query, const StorageMetadataPtr & metadata_snapshot)
{
RewriteFunctionToSubcolumnVisitor::Data data{metadata_snapshot};
RewriteFunctionToSubcolumnVisitor(data).visit(query);
}
}
void TreeOptimizer::optimizeIf(ASTPtr & query, Aliases & aliases, bool if_chain_to_multiif)
@ -590,10 +598,8 @@ void TreeOptimizer::optimizeIf(ASTPtr & query, Aliases & aliases, bool if_chain_
OptimizeIfChainsVisitor().visit(query);
}
void TreeOptimizer::apply(ASTPtr & query, Aliases & aliases, const NameSet & source_columns_set,
const std::vector<TableWithColumnNamesAndTypes> & tables_with_columns,
ContextConstPtr context, const StorageMetadataPtr & metadata_snapshot,
bool & rewrite_subqueries)
void TreeOptimizer::apply(ASTPtr & query, TreeRewriterResult & result,
const std::vector<TableWithColumnNamesAndTypes> & tables_with_columns, ContextConstPtr context)
{
const auto & settings = context->getSettingsRef();
@ -601,17 +607,21 @@ void TreeOptimizer::apply(ASTPtr & query, Aliases & aliases, const NameSet & sou
if (!select_query)
throw Exception("Select analyze for not select asts.", ErrorCodes::LOGICAL_ERROR);
optimizeIf(query, aliases, settings.optimize_if_chain_to_multiif);
if (settings.optimize_functions_to_subcolumns && result.storage
&& result.storage->supportsSubcolumns() && result.metadata_snapshot)
optimizeFunctionsToSubcolumns(query, result.metadata_snapshot);
optimizeIf(query, result.aliases, settings.optimize_if_chain_to_multiif);
/// Move arithmetic operations out of aggregation functions
if (settings.optimize_arithmetic_operations_in_aggregate_functions)
optimizeAggregationFunctions(query);
/// Push the predicate expression down to the subqueries.
rewrite_subqueries = PredicateExpressionsOptimizer(context, tables_with_columns, settings).optimize(*select_query);
result.rewrite_subqueries = PredicateExpressionsOptimizer(context, tables_with_columns, settings).optimize(*select_query);
/// GROUP BY injective function elimination.
optimizeGroupBy(select_query, source_columns_set, context);
optimizeGroupBy(select_query, result.source_columns_set, context);
/// GROUP BY functions of other keys elimination.
if (settings.optimize_group_by_function_keys)
@ -658,7 +668,7 @@ void TreeOptimizer::apply(ASTPtr & query, Aliases & aliases, const NameSet & sou
/// Replace monotonous functions with its argument
if (settings.optimize_monotonous_functions_in_order_by)
optimizeMonotonousFunctionsInOrderBy(select_query, context, tables_with_columns,
metadata_snapshot ? metadata_snapshot->getSortingKeyColumns() : Names{});
result.metadata_snapshot ? result.metadata_snapshot->getSortingKeyColumns() : Names{});
/// Remove duplicate items from ORDER BY.
/// Execute it after all order by optimizations,


@ -8,8 +8,7 @@
namespace DB
{
struct StorageInMemoryMetadata;
using StorageMetadataPtr = std::shared_ptr<const StorageInMemoryMetadata>;
struct TreeRewriterResult;
/// Part of the Tree Rewriter (SyntaxAnalyzer) that optimizes the AST.
/// The query should be ready to execute either before or after it, but the resulting query could be faster.
@ -18,12 +17,9 @@ class TreeOptimizer
public:
static void apply(
ASTPtr & query,
Aliases & aliases,
const NameSet & source_columns_set,
TreeRewriterResult & result,
const std::vector<TableWithColumnNamesAndTypes> & tables_with_columns,
ContextConstPtr context,
const StorageMetadataPtr & metadata_snapshot,
bool & rewrite_subqueries);
ContextConstPtr context);
static void optimizeIf(ASTPtr & query, Aliases & aliases, bool if_chain_to_multiif);
};


@ -913,7 +913,7 @@ TreeRewriterResultPtr TreeRewriter::analyzeSelect(
all_source_columns_set.insert(name);
}
normalize(query, result.aliases, all_source_columns_set, select_options.ignore_alias, settings);
normalize(query, result.aliases, all_source_columns_set, select_options.ignore_alias, settings, /* allow_self_aliases = */ true);
/// Remove unneeded columns according to 'required_result_columns'.
/// Leave all selected columns in case of DISTINCT, as well as columns that contain the arrayJoin function.
@ -924,8 +924,7 @@ TreeRewriterResultPtr TreeRewriter::analyzeSelect(
/// Executing scalar subqueries - replacing them with constant values.
executeScalarSubqueries(query, getContext(), subquery_depth, result.scalars, select_options.only_analyze);
TreeOptimizer::apply(
query, result.aliases, source_columns_set, tables_with_columns, getContext(), result.metadata_snapshot, result.rewrite_subqueries);
TreeOptimizer::apply(query, result, tables_with_columns, getContext());
/// array_join_alias_to_name, array_join_result_to_source.
getArrayJoinedColumns(query, result, select_query, result.source_columns, source_columns_set);
@ -959,7 +958,8 @@ TreeRewriterResultPtr TreeRewriter::analyze(
const NamesAndTypesList & source_columns,
ConstStoragePtr storage,
const StorageMetadataPtr & metadata_snapshot,
bool allow_aggregations) const
bool allow_aggregations,
bool allow_self_aliases) const
{
if (query->as<ASTSelectQuery>())
throw Exception("Not select analyze for select asts.", ErrorCodes::LOGICAL_ERROR);
@ -968,7 +968,7 @@ TreeRewriterResultPtr TreeRewriter::analyze(
TreeRewriterResult result(source_columns, storage, metadata_snapshot, false);
normalize(query, result.aliases, result.source_columns_set, false, settings);
normalize(query, result.aliases, result.source_columns_set, false, settings, allow_self_aliases);
/// Executing scalar subqueries. Column defaults could be a scalar subquery.
executeScalarSubqueries(query, getContext(), 0, result.scalars, false);
@ -994,7 +994,7 @@ TreeRewriterResultPtr TreeRewriter::analyze(
}
void TreeRewriter::normalize(
ASTPtr & query, Aliases & aliases, const NameSet & source_columns_set, bool ignore_alias, const Settings & settings)
ASTPtr & query, Aliases & aliases, const NameSet & source_columns_set, bool ignore_alias, const Settings & settings, bool allow_self_aliases)
{
CustomizeCountDistinctVisitor::Data data_count_distinct{settings.count_distinct_implementation};
CustomizeCountDistinctVisitor(data_count_distinct).visit(query);
@ -1054,7 +1054,7 @@ void TreeRewriter::normalize(
FunctionNameNormalizer().visit(query.get());
/// Common subexpression elimination. Rewrite rules.
QueryNormalizer::Data normalizer_data(aliases, source_columns_set, ignore_alias, settings);
QueryNormalizer::Data normalizer_data(aliases, source_columns_set, ignore_alias, settings, allow_self_aliases);
QueryNormalizer(normalizer_data).visit(query);
}


@ -103,7 +103,8 @@ public:
const NamesAndTypesList & source_columns_,
ConstStoragePtr storage = {},
const StorageMetadataPtr & metadata_snapshot = {},
bool allow_aggregations = false) const;
bool allow_aggregations = false,
bool allow_self_aliases = true) const;
/// Analyze and rewrite select query
TreeRewriterResultPtr analyzeSelect(
@ -115,7 +116,7 @@ public:
std::shared_ptr<TableJoin> table_join = {}) const;
private:
static void normalize(ASTPtr & query, Aliases & aliases, const NameSet & source_columns_set, bool ignore_alias, const Settings & settings);
static void normalize(ASTPtr & query, Aliases & aliases, const NameSet & source_columns_set, bool ignore_alias, const Settings & settings, bool allow_self_aliases);
};
}


@ -224,8 +224,9 @@ namespace
}
}
}
else if (const auto * tuple_literal = right->as<ASTLiteral>();
tuple_literal && tuple_literal->value.getType() == Field::Types::Tuple)
else if (const auto * tuple_literal = right->as<ASTLiteral>(); tuple_literal)
{
if (tuple_literal->value.getType() == Field::Types::Tuple)
{
const auto & tuple = tuple_literal->value.get<const Tuple &>();
for (const auto & child : tuple)
@ -243,6 +244,9 @@ namespace
}
}
}
else
return analyzeEquals(identifier, tuple_literal, expr);
}
else
{
return {};
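/// Hedged reading of the change above (assuming this analysis feeds shard/partition
/// pruning such as optimize_skip_unused_shards):
///     key IN (1, 2, 3)   -- the right side is a Tuple literal, handled element by element;
///     key IN 42          -- the right side is a plain literal; previously this fell through
///                           to the generic fallback, now it is analyzed like `key = 42`
///                           via analyzeEquals(identifier, tuple_literal, expr).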


@ -9,6 +9,21 @@
using namespace DB;
TEST(QueryNormalizer, SimpleLoopAlias)
{
String query = "a as a";
ParserExpressionList parser(false);
ASTPtr ast = parseQuery(parser, query, 0, 0);
Aliases aliases;
aliases["a"] = parseQuery(parser, "a as a", 0, 0)->children[0];
Settings settings;
QueryNormalizer::Data normalizer_data(aliases, {}, false, settings, false);
EXPECT_THROW(QueryNormalizer(normalizer_data).visit(ast), Exception);
}
TEST(QueryNormalizer, SimpleCycleAlias)
{
String query = "a as b, b as a";
@ -20,6 +35,6 @@ TEST(QueryNormalizer, SimpleCycleAlias)
aliases["b"] = parseQuery(parser, "a as b", 0, 0)->children[0];
Settings settings;
QueryNormalizer::Data normalizer_data(aliases, {}, false, settings);
QueryNormalizer::Data normalizer_data(aliases, {}, false, settings, true);
EXPECT_THROW(QueryNormalizer(normalizer_data).visit(ast), Exception);
}


@ -1398,7 +1398,7 @@ void TCPHandler::initBlockOutput(const Block & block)
if (state.compression == Protocol::Compression::Enable)
{
CompressionCodecFactory::instance().validateCodec(method, level, !query_settings.allow_suspicious_codecs);
CompressionCodecFactory::instance().validateCodec(method, level, !query_settings.allow_suspicious_codecs, query_settings.allow_experimental_codecs);
state.maybe_compressed_out = std::make_shared<CompressedWriteBuffer>(
*out, CompressionCodecFactory::instance().get(method, level));


@ -348,7 +348,7 @@ void AlterCommand::apply(StorageInMemoryMetadata & metadata, ContextPtr context)
column.comment = *comment;
if (codec)
column.codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(codec, data_type, false);
column.codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(codec, data_type, false, true);
column.ttl = ttl;
@ -389,7 +389,7 @@ void AlterCommand::apply(StorageInMemoryMetadata & metadata, ContextPtr context)
else
{
if (codec)
column.codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(codec, data_type ? data_type : column.type, false);
column.codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(codec, data_type ? data_type : column.type, false, true);
if (comment)
column.comment = *comment;
@ -995,7 +995,7 @@ void AlterCommands::validate(const StorageInMemoryMetadata & metadata, ContextPt
ErrorCodes::BAD_ARGUMENTS};
if (command.codec)
CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(command.codec, command.data_type, !context->getSettingsRef().allow_suspicious_codecs);
CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(command.codec, command.data_type, !context->getSettingsRef().allow_suspicious_codecs, context->getSettingsRef().allow_experimental_codecs);
all_columns.add(ColumnDescription(column_name, command.data_type));
}
@ -1015,7 +1015,7 @@ void AlterCommands::validate(const StorageInMemoryMetadata & metadata, ContextPt
ErrorCodes::NOT_IMPLEMENTED};
if (command.codec)
CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(command.codec, command.data_type, !context->getSettingsRef().allow_suspicious_codecs);
CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(command.codec, command.data_type, !context->getSettingsRef().allow_suspicious_codecs, context->getSettingsRef().allow_experimental_codecs);
auto column_default = all_columns.getDefault(column_name);
if (column_default)
{


@ -128,7 +128,7 @@ void ColumnDescription::readText(ReadBuffer & buf)
comment = col_ast->comment->as<ASTLiteral &>().value.get<String>();
if (col_ast->codec)
codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(col_ast->codec, type, false);
codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(col_ast->codec, type, false, true);
if (col_ast->ttl)
ttl = col_ast->ttl;
@ -590,7 +590,7 @@ Block validateColumnsDefaultsAndGetSampleBlock(ASTPtr default_expr_list, const N
try
{
auto syntax_analyzer_result = TreeRewriter(context).analyze(default_expr_list, all_columns);
auto syntax_analyzer_result = TreeRewriter(context).analyze(default_expr_list, all_columns, {}, {}, false, /* allow_self_aliases = */ false);
const auto actions = ExpressionAnalyzer(default_expr_list, syntax_analyzer_result, context).getActions(true);
for (const auto & action : actions->getActions())
if (action.node->type == ActionsDAG::ActionType::ARRAY_JOIN)
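/// Hedged illustration of why allow_self_aliases = false is passed here:
/// a column default that refers to the column it defines, e.g.
///     CREATE TABLE t (a UInt64, b UInt64 DEFAULT b + 1)
/// produces a self-referencing alias ("b + 1 AS b") in default_expr_list;
/// with allow_self_aliases = false QueryNormalizer now rejects such loops
/// (see the SimpleLoopAlias test above).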


@ -632,7 +632,7 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
if (compression_method == "ZSTD")
compression_level = settings.network_zstd_compression_level;
CompressionCodecFactory::instance().validateCodec(compression_method, compression_level, !settings.allow_suspicious_codecs);
CompressionCodecFactory::instance().validateCodec(compression_method, compression_level, !settings.allow_suspicious_codecs, settings.allow_experimental_codecs);
CompressionCodecPtr compression_codec = CompressionCodecFactory::instance().get(compression_method, compression_level);
/// tmp directory is used to ensure atomicity of transactions


@ -8,9 +8,8 @@
namespace DB
{
MergeListElement::MergeListElement(const std::string & database_, const std::string & table_, const FutureMergedMutatedPart & future_part)
: database{database_}
, table{table_}
MergeListElement::MergeListElement(const StorageID & table_id_, const FutureMergedMutatedPart & future_part)
: table_id{table_id_}
, partition_id{future_part.part_info.partition_id}
, result_part_name{future_part.name}
, result_part_path{future_part.path}
@ -60,8 +59,8 @@ MergeListElement::MergeListElement(const std::string & database_, const std::str
MergeInfo MergeListElement::getInfo() const
{
MergeInfo res;
res.database = database;
res.table = table;
res.database = table_id.getDatabaseName();
res.table = table_id.getTableName();
res.result_part_name = result_part_name;
res.result_part_path = result_part_path;
res.partition_id = partition_id;


@ -8,6 +8,7 @@
#include <Storages/MergeTree/MergeType.h>
#include <Storages/MergeTree/MergeAlgorithm.h>
#include <Storages/MergeTree/BackgroundProcessList.h>
#include <Interpreters/StorageID.h>
#include <boost/noncopyable.hpp>
#include <memory>
#include <list>
@ -54,8 +55,7 @@ struct FutureMergedMutatedPart;
struct MergeListElement : boost::noncopyable
{
const std::string database;
const std::string table;
const StorageID table_id;
std::string partition_id;
const std::string result_part_name;
@ -94,7 +94,7 @@ struct MergeListElement : boost::noncopyable
/// Detected after merge already started
std::atomic<MergeAlgorithm> merge_algorithm;
MergeListElement(const std::string & database, const std::string & table, const FutureMergedMutatedPart & future_part);
MergeListElement(const StorageID & table_id_, const FutureMergedMutatedPart & future_part);
MergeInfo getInfo() const;
@ -122,12 +122,13 @@ public:
--merges_with_ttl_counter;
}
void cancelPartMutations(const String & partition_id, Int64 mutation_version)
void cancelPartMutations(const StorageID & table_id, const String & partition_id, Int64 mutation_version)
{
std::lock_guard lock{mutex};
for (auto & merge_element : entries)
{
if ((partition_id.empty() || merge_element.partition_id == partition_id)
&& merge_element.table_id == table_id
&& merge_element.source_data_version < mutation_version
&& merge_element.result_data_version >= mutation_version)
merge_element.is_cancelled = true;


@ -234,7 +234,6 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
select.setExpression(ASTSelectQuery::Expression::WHERE, given_select.where()->clone());
if (given_select.prewhere())
select.setExpression(ASTSelectQuery::Expression::WHERE, given_select.prewhere()->clone());
// TODO will row policy filter work?
// After overriding the group by clause, we finish the possible aggregations directly
if (processed_stage >= QueryProcessingStage::Enum::WithMergeableState && given_select.groupBy())


@ -189,7 +189,12 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const Stor
if (action_type == ActionType::ADD_PART)
{
MergedBlockOutputStream part_out(part, metadata_snapshot, block.getNamesAndTypesList(), {}, CompressionCodecFactory::instance().get("NONE", {}));
MergedBlockOutputStream part_out(
part,
metadata_snapshot,
block.getNamesAndTypesList(),
{},
CompressionCodecFactory::instance().get("NONE", {}));
part->minmax_idx.update(block, storage.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));
part->partition.create(metadata_snapshot, block, 0, context);


@ -441,10 +441,6 @@ Strings ReplicatedMergeTreeLogEntryData::getVirtualPartNames(MergeTreeDataFormat
if (type == DROP_RANGE)
return {new_part_name};
/// CLEAR_COLUMN and CLEAR_INDEX are deprecated since 20.3
if (type == CLEAR_COLUMN || type == CLEAR_INDEX)
return {};
if (type == REPLACE_RANGE)
{
Strings res = replace_range_entry->new_part_names;


@ -140,18 +140,6 @@ struct ReplicatedMergeTreeLogEntryData
/// selection of merges. These parts are added to queue.virtual_parts.
Strings getVirtualPartNames(MergeTreeDataFormatVersion format_version) const;
/// Returns set of parts that denote the block number ranges that should be blocked during the entry execution.
/// These parts are added to future_parts.
Strings getBlockingPartNames(MergeTreeDataFormatVersion format_version) const
{
Strings res = getVirtualPartNames(format_version);
if (type == CLEAR_COLUMN)
res.emplace_back(new_part_name);
return res;
}
/// Returns fake part for drop range (for DROP_RANGE and REPLACE_RANGE)
std::optional<String> getDropRange(MergeTreeDataFormatVersion format_version) const;


@ -1024,17 +1024,11 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
{
/// If our entry produces a part which is already covered by
/// some other entry that is currently executing, then we can postpone this entry.
if (entry.type == LogEntry::MERGE_PARTS
|| entry.type == LogEntry::GET_PART
|| entry.type == LogEntry::ATTACH_PART
|| entry.type == LogEntry::MUTATE_PART)
{
for (const String & new_part_name : entry.getBlockingPartNames(format_version))
for (const String & new_part_name : entry.getVirtualPartNames(format_version))
{
if (!isNotCoveredByFuturePartsImpl(entry.znode_name, new_part_name, out_postpone_reason, state_lock))
return false;
}
}
/// Check that fetches pool is not overloaded
if ((entry.type == LogEntry::GET_PART || entry.type == LogEntry::ATTACH_PART)
@ -1247,7 +1241,7 @@ ReplicatedMergeTreeQueue::CurrentlyExecuting::CurrentlyExecuting(const Replicate
++entry->num_tries;
entry->last_attempt_time = time(nullptr);
for (const String & new_part_name : entry->getBlockingPartNames(queue.format_version))
for (const String & new_part_name : entry->getVirtualPartNames(queue.format_version))
{
if (!queue.future_parts.emplace(new_part_name, entry).second)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Tagging already tagged future part {}. This is a bug. "
@ -1288,7 +1282,7 @@ ReplicatedMergeTreeQueue::CurrentlyExecuting::~CurrentlyExecuting()
entry->currently_executing = false;
entry->execution_complete.notify_all();
for (const String & new_part_name : entry->getBlockingPartNames(queue.format_version))
for (const String & new_part_name : entry->getVirtualPartNames(queue.format_version))
{
if (!queue.future_parts.erase(new_part_name))
{


@ -609,7 +609,7 @@ void StorageDistributed::read(
ClusterProxy::executeQuery(query_plan, select_stream_factory, log,
modified_query_ast, local_context, query_info,
sharding_key_expr, sharding_key_column_name,
getCluster());
query_info.cluster);
/// This is a bug; it is possible only when there are no shards to query, and this is handled earlier.
if (!query_plan.isInitialized())
@ -1042,7 +1042,7 @@ ClusterPtr StorageDistributed::skipUnusedShards(
if (!limit)
{
LOG_TRACE(log,
LOG_DEBUG(log,
"Number of values for sharding key exceeds optimize_skip_unused_shards_limit={}, "
"try to increase it, but note that this may increase query processing time.",
local_context->getSettingsRef().optimize_skip_unused_shards_limit);


@ -625,7 +625,7 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id)
if (!to_kill)
return CancellationCode::NotFound;
getContext()->getMergeList().cancelPartMutations({}, to_kill->block_number);
getContext()->getMergeList().cancelPartMutations(getStorageID(), {}, to_kill->block_number);
to_kill->removeFile();
LOG_TRACE(log, "Cancelled part mutations and removed mutation file {}", mutation_id);
{
@ -817,9 +817,8 @@ bool StorageMergeTree::mergeSelectedParts(
auto & future_part = merge_mutate_entry.future_part;
Stopwatch stopwatch;
MutableDataPartPtr new_part;
auto table_id = getStorageID();
auto merge_list_entry = getContext()->getMergeList().insert(table_id.database_name, table_id.table_name, future_part);
auto merge_list_entry = getContext()->getMergeList().insert(getStorageID(), future_part);
auto write_part_log = [&] (const ExecutionStatus & execution_status)
{
@ -964,9 +963,8 @@ std::shared_ptr<StorageMergeTree::MergeMutateSelectedEntry> StorageMergeTree::se
bool StorageMergeTree::mutateSelectedPart(const StorageMetadataPtr & metadata_snapshot, MergeMutateSelectedEntry & merge_mutate_entry, TableLockHolder & table_lock_holder)
{
auto & future_part = merge_mutate_entry.future_part;
auto table_id = getStorageID();
auto merge_list_entry = getContext()->getMergeList().insert(table_id.database_name, table_id.table_name, future_part);
auto merge_list_entry = getContext()->getMergeList().insert(getStorageID(), future_part);
Stopwatch stopwatch;
MutableDataPartPtr new_part;


@ -1726,7 +1726,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
auto table_id = getStorageID();
/// Add merge to list
MergeList::EntryPtr merge_entry = getContext()->getMergeList().insert(table_id.database_name, table_id.table_name, future_merged_part);
MergeList::EntryPtr merge_entry = getContext()->getMergeList().insert(getStorageID(), future_merged_part);
Transaction transaction(*this);
MutableDataPartPtr part;
@ -1871,9 +1871,7 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
future_mutated_part.updatePath(*this, reserved_space);
future_mutated_part.type = source_part->getType();
auto table_id = getStorageID();
MergeList::EntryPtr merge_entry = getContext()->getMergeList().insert(
table_id.database_name, table_id.table_name, future_mutated_part);
MergeList::EntryPtr merge_entry = getContext()->getMergeList().insert(getStorageID(), future_mutated_part);
Stopwatch stopwatch;
@ -2647,24 +2645,6 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
{
String source_path = fs::path(zookeeper_path) / "replicas" / source_replica;
/** TODO: it will be deleted! (It exists only to support old versions of the CH server.)
* In the current code, the replica is created in a single transaction.
* If the reference/master replica is not yet fully created, let's wait.
*/
while (!zookeeper->exists(fs::path(source_path) / "columns"))
{
LOG_INFO(log, "Waiting for replica {} to be fully created", source_path);
zkutil::EventPtr event = std::make_shared<Poco::Event>();
if (zookeeper->exists(fs::path(source_path) / "columns", nullptr, event))
{
LOG_WARNING(log, "Oops, a watch has leaked");
break;
}
event->wait();
}
/// The order of the following three actions is important.
Strings source_queue_names;
@ -5375,56 +5355,6 @@ bool StorageReplicatedMergeTree::waitForTableReplicaToProcessLogEntry(
event->tryWait(event_wait_timeout_ms);
}
}
else if (startsWith(entry.znode_name, "queue-"))
{
/** In this case, the number of the `log` node is unknown. You need to look through everything from `log_pointer` to the end,
* looking for a node with the same content. And if we do not find it, then the replica has already taken this entry into its queue.
*/
String log_pointer = getZooKeeper()->get(fs::path(table_zookeeper_path) / "replicas" / replica / "log_pointer");
Strings log_entries = getZooKeeper()->getChildren(fs::path(table_zookeeper_path) / "log");
UInt64 log_index = 0;
bool found = false;
for (const String & log_entry_name : log_entries)
{
log_index = parse<UInt64>(log_entry_name.substr(log_entry_name.size() - 10));
if (!log_pointer.empty() && log_index < parse<UInt64>(log_pointer))
continue;
String log_entry_str;
bool exists = getZooKeeper()->tryGet(fs::path(table_zookeeper_path) / "log" / log_entry_name, log_entry_str);
if (exists && entry_str == log_entry_str)
{
found = true;
log_node_name = log_entry_name;
break;
}
}
if (found)
{
LOG_DEBUG(log, "Waiting for {} to pull {} to queue", replica, log_node_name);
/// Let's wait until the entry gets into the replica queue.
while (!stop_waiting())
{
zkutil::EventPtr event = std::make_shared<Poco::Event>();
String log_pointer_new = getZooKeeper()->get(fs::path(table_zookeeper_path) / "replicas" / replica / "log_pointer", nullptr, event);
if (!log_pointer_new.empty() && parse<UInt64>(log_pointer_new) > log_index)
break;
/// Wait with a timeout because we may already be shut down, but not dropped.
/// In that case the log_pointer node will exist, but we will never update it because all background threads have already stopped.
/// This can lead to a query hang: the table drop query can wait for some query (ALTER, OPTIMIZE, etc.) which called this method,
/// but that query will never finish because the drop has already shut down the table.
event->tryWait(event_wait_timeout_ms);
}
}
}
else
throw Exception("Logical error: unexpected name of log node: " + entry.znode_name, ErrorCodes::LOGICAL_ERROR);
@ -6002,7 +5932,7 @@ CancellationCode StorageReplicatedMergeTree::killMutation(const String & mutatio
{
const String & partition_id = pair.first;
Int64 block_number = pair.second;
getContext()->getMergeList().cancelPartMutations(partition_id, block_number);
getContext()->getMergeList().cancelPartMutations(getStorageID(), partition_id, block_number);
}
return CancellationCode::CancelSent;
}


@ -3,8 +3,9 @@
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeArray.h>
#include <Storages/System/StorageSystemErrors.h>
#include <Common/ErrorCodes.h>
#include <Interpreters/Context.h>
#include <Common/ErrorCodes.h>
namespace DB
{


@ -289,7 +289,7 @@ TTLDescription TTLDescription::getTTLFromAST(
{
result.recompression_codec =
CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(
ttl_element->recompression_codec, {}, !context->getSettingsRef().allow_suspicious_codecs);
ttl_element->recompression_codec, {}, !context->getSettingsRef().allow_suspicious_codecs, context->getSettingsRef().allow_experimental_codecs);
}
}

Some files were not shown because too many files have changed in this diff.