Merge remote-tracking branch 'origin/master' into fix-wrong-code-in-aggregate-functions

This commit is contained in:
Alexey Milovidov 2021-06-08 03:08:03 +03:00
commit d6513b22cb
111 changed files with 953 additions and 739 deletions

1
.gitmodules vendored
View File

@ -228,7 +228,6 @@
[submodule "contrib/datasketches-cpp"] [submodule "contrib/datasketches-cpp"]
path = contrib/datasketches-cpp path = contrib/datasketches-cpp
url = https://github.com/ClickHouse-Extras/datasketches-cpp.git url = https://github.com/ClickHouse-Extras/datasketches-cpp.git
[submodule "contrib/yaml-cpp"] [submodule "contrib/yaml-cpp"]
path = contrib/yaml-cpp path = contrib/yaml-cpp
url = https://github.com/ClickHouse-Extras/yaml-cpp.git url = https://github.com/ClickHouse-Extras/yaml-cpp.git

View File

@ -528,7 +528,6 @@ include (cmake/find/libpqxx.cmake)
include (cmake/find/nuraft.cmake) include (cmake/find/nuraft.cmake)
include (cmake/find/yaml-cpp.cmake) include (cmake/find/yaml-cpp.cmake)
if(NOT USE_INTERNAL_PARQUET_LIBRARY) if(NOT USE_INTERNAL_PARQUET_LIBRARY)
set (ENABLE_ORC OFF CACHE INTERNAL "") set (ENABLE_ORC OFF CACHE INTERNAL "")
endif() endif()

View File

@ -4,6 +4,6 @@ if (NOT USE_YAML_CPP)
return() return()
endif() endif()
if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/yaml-cpp") if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/yaml-cpp/README.md")
message (ERROR "submodule contrib/yaml-cpp is missing. to fix try run: \n git submodule update --init --recursive") message (ERROR "submodule contrib/yaml-cpp is missing. to fix try run: \n git submodule update --init --recursive")
endif() endif()

View File

@ -61,7 +61,6 @@ endif()
add_subdirectory (poco-cmake) add_subdirectory (poco-cmake)
add_subdirectory (croaring-cmake) add_subdirectory (croaring-cmake)
# TODO: refactor the contrib libraries below this comment. # TODO: refactor the contrib libraries below this comment.
if (USE_INTERNAL_ZSTD_LIBRARY) if (USE_INTERNAL_ZSTD_LIBRARY)

2
contrib/avro vendored

@ -1 +1 @@
Subproject commit 1ee16d8c5a7808acff5cf0475f771195d9aa3faa Subproject commit e43c46e87fd32eafdc09471e95344555454c5ef8

View File

@ -1,6 +1,6 @@
if (SANITIZE OR NOT ( if (SANITIZE OR NOT (
((OS_LINUX OR OS_FREEBSD) AND (ARCH_AMD64 OR ARCH_ARM OR ARCH_PPC64LE)) OR ((OS_LINUX OR OS_FREEBSD) AND (ARCH_AMD64 OR ARCH_ARM OR ARCH_PPC64LE)) OR
(OS_DARWIN AND CMAKE_BUILD_TYPE STREQUAL "RelWithDebInfo") (OS_DARWIN AND (CMAKE_BUILD_TYPE STREQUAL "RelWithDebInfo" OR CMAKE_BUILD_TYPE STREQUAL "Debug"))
)) ))
if (ENABLE_JEMALLOC) if (ENABLE_JEMALLOC)
message (${RECONFIGURE_MESSAGE_LEVEL} message (${RECONFIGURE_MESSAGE_LEVEL}

View File

@ -22,7 +22,7 @@ services:
entrypoint: /etc/bootstrap.sh -d entrypoint: /etc/bootstrap.sh -d
hdfskerberos: hdfskerberos:
image: yandex/clickhouse-kerberos-kdc:${DOCKER_KERBEROS_KDC_TAG} image: yandex/clickhouse-kerberos-kdc:${DOCKER_KERBEROS_KDC_TAG:-latest}
hostname: hdfskerberos hostname: hdfskerberos
volumes: volumes:
- ${KERBERIZED_HDFS_DIR}/secrets:/tmp/keytab - ${KERBERIZED_HDFS_DIR}/secrets:/tmp/keytab

View File

@ -18,6 +18,8 @@ To apply a CatBoost model in ClickHouse:
For more information about training CatBoost models, see [Training and applying models](https://catboost.ai/docs/features/training.html#training). For more information about training CatBoost models, see [Training and applying models](https://catboost.ai/docs/features/training.html#training).
You can reload CatBoost models if the configuration was updated without restarting the server using [RELOAD MODEL](../sql-reference/statements/system.md#query_language-system-reload-model) and [RELOAD MODELS](../sql-reference/statements/system.md#query_language-system-reload-models) system queries.
## Prerequisites {#prerequisites} ## Prerequisites {#prerequisites}
If you do not have the [Docker](https://docs.docker.com/install/) yet, install it. If you do not have the [Docker](https://docs.docker.com/install/) yet, install it.

View File

@ -265,7 +265,7 @@ Result:
``` ```
!!! attention "Attention" !!! attention "Attention"
The return type `toStartOf*` functions described below is `Date` or `DateTime`. Though these functions can take `DateTime64` as an argument, passing them a `DateTime64` that is out of normal range (years 1970 - 2105) will give incorrect result. The return type `toStartOf*` functions described below is `Date` or `DateTime`. Though these functions can take `DateTime64` as an argument, passing them a `DateTime64` that is out of the normal range (years 1925 - 2283) will give an incorrect result.
## toStartOfYear {#tostartofyear} ## toStartOfYear {#tostartofyear}

View File

@ -6,7 +6,7 @@ toc_title: RENAME
# RENAME Statement {#misc_operations-rename} # RENAME Statement {#misc_operations-rename}
## RENAME DATABASE {#misc_operations-rename_database} ## RENAME DATABASE {#misc_operations-rename_database}
Renames database, support only for Atomic database engine Renames database, it is supported only for Atomic database engine.
``` ```
RENAME DATABASE atomic_database1 TO atomic_database2 [ON CLUSTER cluster] RENAME DATABASE atomic_database1 TO atomic_database2 [ON CLUSTER cluster]

View File

@ -10,6 +10,8 @@ The list of available `SYSTEM` statements:
- [RELOAD EMBEDDED DICTIONARIES](#query_language-system-reload-emdedded-dictionaries) - [RELOAD EMBEDDED DICTIONARIES](#query_language-system-reload-emdedded-dictionaries)
- [RELOAD DICTIONARIES](#query_language-system-reload-dictionaries) - [RELOAD DICTIONARIES](#query_language-system-reload-dictionaries)
- [RELOAD DICTIONARY](#query_language-system-reload-dictionary) - [RELOAD DICTIONARY](#query_language-system-reload-dictionary)
- [RELOAD MODELS](#query_language-system-reload-models)
- [RELOAD MODEL](#query_language-system-reload-model)
- [DROP DNS CACHE](#query_language-system-drop-dns-cache) - [DROP DNS CACHE](#query_language-system-drop-dns-cache)
- [DROP MARK CACHE](#query_language-system-drop-mark-cache) - [DROP MARK CACHE](#query_language-system-drop-mark-cache)
- [DROP UNCOMPRESSED CACHE](#query_language-system-drop-uncompressed-cache) - [DROP UNCOMPRESSED CACHE](#query_language-system-drop-uncompressed-cache)
@ -60,6 +62,26 @@ The status of the dictionary can be checked by querying the `system.dictionaries
SELECT name, status FROM system.dictionaries; SELECT name, status FROM system.dictionaries;
``` ```
## RELOAD MODELS {#query_language-system-reload-models}
Reloads all [CatBoost](../../guides/apply-catboost-model.md#applying-catboost-model-in-clickhouse) models if the configuration was updated without restarting the server.
**Syntax**
```sql
SYSTEM RELOAD MODELS
```
## RELOAD MODEL {#query_language-system-reload-model}
Completely reloads a CatBoost model `model_name` if the configuration was updated without restarting the server.
**Syntax**
```sql
SYSTEM RELOAD MODEL <model_name>
```
## DROP DNS CACHE {#query_language-system-drop-dns-cache} ## DROP DNS CACHE {#query_language-system-drop-dns-cache}
Resets ClickHouses internal DNS cache. Sometimes (for old ClickHouse versions) it is necessary to use this command when changing the infrastructure (changing the IP address of another ClickHouse server or the server used by dictionaries). Resets ClickHouses internal DNS cache. Sometimes (for old ClickHouse versions) it is necessary to use this command when changing the infrastructure (changing the IP address of another ClickHouse server or the server used by dictionaries).

View File

@ -114,14 +114,14 @@ FROM s3('https://storage.yandexcloud.net/my-test-bucket-768/big_prefix/file-{000
Insert data into file `test-data.csv.gz`: Insert data into file `test-data.csv.gz`:
``` sql ``` sql
INSERT INTO s3('https://storage.yandexcloud.net/my-test-bucket-768/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip') INSERT INTO FUNCTION s3('https://storage.yandexcloud.net/my-test-bucket-768/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip')
VALUES ('test-data', 1), ('test-data-2', 2); VALUES ('test-data', 1), ('test-data-2', 2);
``` ```
Insert data into file `test-data.csv.gz` from existing table: Insert data into file `test-data.csv.gz` from existing table:
``` sql ``` sql
INSERT INTO s3('https://storage.yandexcloud.net/my-test-bucket-768/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip') INSERT INTO FUNCTION s3('https://storage.yandexcloud.net/my-test-bucket-768/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip')
SELECT name, value FROM existing_table; SELECT name, value FROM existing_table;
``` ```

View File

@ -6,9 +6,7 @@ toc_title: Window Functions
# [experimental] Window Functions # [experimental] Window Functions
!!! warning "Warning" !!! warning "Warning"
This is an experimental feature that is currently in development and is not ready This is an experimental feature that is currently in development and is not ready for general use. It will change in unpredictable backwards-incompatible ways in the future releases. Set `allow_experimental_window_functions = 1` to enable it.
for general use. It will change in unpredictable backwards-incompatible ways in
the future releases. Set `allow_experimental_window_functions = 1` to enable it.
ClickHouse supports the standard grammar for defining windows and window functions. The following features are currently supported: ClickHouse supports the standard grammar for defining windows and window functions. The following features are currently supported:

View File

@ -18,6 +18,8 @@ toc_title: "Применение модели CatBoost в ClickHouse"
Подробнее об обучении моделей в CatBoost, см. [Обучение и применение моделей](https://catboost.ai/docs/features/training.html#training). Подробнее об обучении моделей в CatBoost, см. [Обучение и применение моделей](https://catboost.ai/docs/features/training.html#training).
Вы можете перегрузить модели CatBoost, если их конфигурация была обновлена, без перезагрузки сервера. Для этого используйте системные запросы [RELOAD MODEL](../sql-reference/statements/system.md#query_language-system-reload-model) и [RELOAD MODELS](../sql-reference/statements/system.md#query_language-system-reload-models).
## Перед началом работы {#prerequisites} ## Перед началом работы {#prerequisites}
Если у вас еще нет [Docker](https://docs.docker.com/install/), установите его. Если у вас еще нет [Docker](https://docs.docker.com/install/), установите его.

View File

@ -264,6 +264,9 @@ SELECT toUnixTimestamp('2017-11-05 08:07:47', 'Asia/Tokyo') AS unix_timestamp;
└────────────────┘ └────────────────┘
``` ```
!!! attention "Attention"
`Date` или `DateTime` это возвращаемый тип функций `toStartOf*`, который описан ниже. Несмотря на то, что эти функции могут принимать `DateTime64` в качестве аргумента, если переданное значение типа `DateTime64` выходит за пределы нормального диапазона (с 1925 по 2283 год), то это даст неверный результат.
## toStartOfYear {#tostartofyear} ## toStartOfYear {#tostartofyear}
Округляет дату или дату-с-временем вниз до первого дня года. Округляет дату или дату-с-временем вниз до первого дня года.

View File

@ -6,7 +6,7 @@ toc_title: RENAME
# RENAME Statement {#misc_operations-rename} # RENAME Statement {#misc_operations-rename}
## RENAME DATABASE {#misc_operations-rename_database} ## RENAME DATABASE {#misc_operations-rename_database}
Переименование базы данных Переименовывает базу данных, поддерживается только для движка базы данных Atomic.
``` ```
RENAME DATABASE atomic_database1 TO atomic_database2 [ON CLUSTER cluster] RENAME DATABASE atomic_database1 TO atomic_database2 [ON CLUSTER cluster]

View File

@ -8,6 +8,8 @@ toc_title: SYSTEM
- [RELOAD EMBEDDED DICTIONARIES](#query_language-system-reload-emdedded-dictionaries) - [RELOAD EMBEDDED DICTIONARIES](#query_language-system-reload-emdedded-dictionaries)
- [RELOAD DICTIONARIES](#query_language-system-reload-dictionaries) - [RELOAD DICTIONARIES](#query_language-system-reload-dictionaries)
- [RELOAD DICTIONARY](#query_language-system-reload-dictionary) - [RELOAD DICTIONARY](#query_language-system-reload-dictionary)
- [RELOAD MODELS](#query_language-system-reload-models)
- [RELOAD MODEL](#query_language-system-reload-model)
- [DROP DNS CACHE](#query_language-system-drop-dns-cache) - [DROP DNS CACHE](#query_language-system-drop-dns-cache)
- [DROP MARK CACHE](#query_language-system-drop-mark-cache) - [DROP MARK CACHE](#query_language-system-drop-mark-cache)
- [DROP UNCOMPRESSED CACHE](#query_language-system-drop-uncompressed-cache) - [DROP UNCOMPRESSED CACHE](#query_language-system-drop-uncompressed-cache)
@ -37,7 +39,7 @@ toc_title: SYSTEM
- [RESTART REPLICAS](#query_language-system-restart-replicas) - [RESTART REPLICAS](#query_language-system-restart-replicas)
## RELOAD EMBEDDED DICTIONARIES] {#query_language-system-reload-emdedded-dictionaries} ## RELOAD EMBEDDED DICTIONARIES] {#query_language-system-reload-emdedded-dictionaries}
Перегружет все [Встроенные словари](../dictionaries/internal-dicts.md). Перегружает все [Встроенные словари](../dictionaries/internal-dicts.md).
По умолчанию встроенные словари выключены. По умолчанию встроенные словари выключены.
Всегда возвращает `Ok.`, вне зависимости от результата обновления встроенных словарей. Всегда возвращает `Ok.`, вне зависимости от результата обновления встроенных словарей.
@ -57,6 +59,26 @@ toc_title: SYSTEM
SELECT name, status FROM system.dictionaries; SELECT name, status FROM system.dictionaries;
``` ```
## RELOAD MODELS {#query_language-system-reload-models}
Перегружает все модели [CatBoost](../../guides/apply-catboost-model.md#applying-catboost-model-in-clickhouse), если их конфигурация была обновлена, без перезагрузки сервера.
**Синтаксис**
```sql
SYSTEM RELOAD MODELS
```
## RELOAD MODEL {#query_language-system-reload-model}
Полностью перегружает модель [CatBoost](../../guides/apply-catboost-model.md#applying-catboost-model-in-clickhouse) `model_name`, если ее конфигурация была обновлена, без перезагрузки сервера.
**Синтаксис**
```sql
SYSTEM RELOAD MODEL <model_name>
```
## DROP DNS CACHE {#query_language-system-drop-dns-cache} ## DROP DNS CACHE {#query_language-system-drop-dns-cache}
Сбрасывает внутренний DNS кеш ClickHouse. Иногда (для старых версий ClickHouse) необходимо использовать эту команду при изменении инфраструктуры (смене IP адреса у другого ClickHouse сервера или сервера, используемого словарями). Сбрасывает внутренний DNS кеш ClickHouse. Иногда (для старых версий ClickHouse) необходимо использовать эту команду при изменении инфраструктуры (смене IP адреса у другого ClickHouse сервера или сервера, используемого словарями).

View File

@ -705,6 +705,8 @@
"yandex.ru" -> "yandex.ru:443", "yandex.ru:80" etc. is allowed, but "yandex.ru:80" -> only "yandex.ru:80" is allowed. "yandex.ru" -> "yandex.ru:443", "yandex.ru:80" etc. is allowed, but "yandex.ru:80" -> only "yandex.ru:80" is allowed.
If the host is specified as IP address, it is checked as specified in URL. Example: "[2a02:6b8:a::a]". If the host is specified as IP address, it is checked as specified in URL. Example: "[2a02:6b8:a::a]".
If there are redirects and support for redirects is enabled, every redirect (the Location field) is checked. If there are redirects and support for redirects is enabled, every redirect (the Location field) is checked.
Host should be specified using the host xml tag:
<host>yandex.ru</host>
--> -->
<!-- Regular expression can be specified. RE2 engine is used for regexps. <!-- Regular expression can be specified. RE2 engine is used for regexps.

View File

@ -1,5 +1,5 @@
#include <AggregateFunctions/AggregateFunctionFactory.h> #include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionSegmentLengthSum.h> #include <AggregateFunctions/AggregateFunctionIntervalLengthSum.h>
#include <AggregateFunctions/FactoryHelpers.h> #include <AggregateFunctions/FactoryHelpers.h>
#include <AggregateFunctions/Helpers.h> #include <AggregateFunctions/Helpers.h>
#include <DataTypes/DataTypeDate.h> #include <DataTypes/DataTypeDate.h>
@ -22,7 +22,7 @@ namespace
{ {
template <template <typename> class Data> template <template <typename> class Data>
AggregateFunctionPtr AggregateFunctionPtr
createAggregateFunctionSegmentLengthSum(const std::string & name, const DataTypes & arguments, const Array &, const Settings *) createAggregateFunctionIntervalLengthSum(const std::string & name, const DataTypes & arguments, const Array &, const Settings *)
{ {
if (arguments.size() != 2) if (arguments.size() != 2)
throw Exception( throw Exception(
@ -32,35 +32,35 @@ namespace
if (WhichDataType{args.begin()[0]}.idx != WhichDataType{args.begin()[1]}.idx) if (WhichDataType{args.begin()[0]}.idx != WhichDataType{args.begin()[1]}.idx)
throw Exception( throw Exception(
"Illegal type " + args.begin()[0]->getName() + " and " + args.begin()[1]->getName() + " of arguments of aggregate function " "Illegal types " + args.begin()[0]->getName() + " and " + args.begin()[1]->getName() + " of arguments of aggregate function "
+ name + ", there two arguments should have same DataType", + name + ", both arguments should have same data type",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
for (const auto & arg : args) for (const auto & arg : args)
{ {
if (!isNativeNumber(arg) && !isDate(arg) && !isDateTime(arg) && !isDateTime64(arg)) if (!isNativeNumber(arg) && !isDate(arg) && !isDateTime(arg))
throw Exception( throw Exception(
"Illegal type " + arg->getName() + " of argument of aggregate function " + name "Illegal type " + arg->getName() + " of argument of aggregate function " + name
+ ", must be Number, Date, DateTime or DateTime64", + ", must be native integral type, Date/DateTime or Float",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
} }
AggregateFunctionPtr res(createWithBasicNumberOrDateOrDateTime<AggregateFunctionSegmentLengthSum, Data>(*arguments[0], arguments)); AggregateFunctionPtr res(createWithBasicNumberOrDateOrDateTime<AggregateFunctionIntervalLengthSum, Data>(*arguments[0], arguments));
if (res) if (res)
return res; return res;
throw Exception( throw Exception(
"Illegal type " + arguments.front().get()->getName() + " of first argument of aggregate function " + name "Illegal type " + arguments.front().get()->getName() + " of argument of aggregate function " + name
+ ", must be Native Unsigned Number", + ", must be native integral type, Date/DateTime or Float",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
} }
} }
void registerAggregateFunctionSegmentLengthSum(AggregateFunctionFactory & factory) void registerAggregateFunctionIntervalLengthSum(AggregateFunctionFactory & factory)
{ {
factory.registerFunction("segmentLengthSum", createAggregateFunctionSegmentLengthSum<AggregateFunctionSegmentLengthSumData>); factory.registerFunction("intervalLengthSum", createAggregateFunctionIntervalLengthSum<AggregateFunctionIntervalLengthSumData>);
} }
} }

View File

@ -1,22 +1,39 @@
#pragma once #pragma once
#include <unordered_set> #include <AggregateFunctions/AggregateFunctionNull.h>
#include <Columns/ColumnsNumber.h> #include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypesNumber.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Common/ArenaAllocator.h> #include <Common/ArenaAllocator.h>
#include <Common/assert_cast.h> #include <Common/assert_cast.h>
#include <AggregateFunctions/AggregateFunctionNull.h> #include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypesNumber.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <unordered_set>
namespace DB namespace DB
{ {
template <typename T> namespace ErrorCodes
struct AggregateFunctionSegmentLengthSumData
{ {
extern const int TOO_LARGE_ARRAY_SIZE;
}
/**
* Calculate total length of intervals without intersections. Each interval is the pair of numbers [begin, end];
* Return UInt64 for integral types (UInt/Int*, Date/DateTime) and return Float64 for Float*.
*
* Implementation simply stores intervals sorted by beginning and sums lengths at final.
*/
template <typename T>
struct AggregateFunctionIntervalLengthSumData
{
constexpr static size_t MAX_ARRAY_SIZE = 0xFFFFFF;
using Segment = std::pair<T, T>; using Segment = std::pair<T, T>;
using Segments = PODArrayWithStackMemory<Segment, 64>; using Segments = PODArrayWithStackMemory<Segment, 64>;
@ -24,18 +41,16 @@ struct AggregateFunctionSegmentLengthSumData
Segments segments; Segments segments;
size_t size() const { return segments.size(); } void add(T begin, T end)
void add(T start, T end)
{ {
if (sorted && segments.size() > 0) if (sorted && !segments.empty())
{ {
sorted = segments.back().first <= start; sorted = segments.back().first <= begin;
} }
segments.emplace_back(start, end); segments.emplace_back(begin, end);
} }
void merge(const AggregateFunctionSegmentLengthSumData & other) void merge(const AggregateFunctionIntervalLengthSumData & other)
{ {
if (other.segments.empty()) if (other.segments.empty())
return; return;
@ -46,7 +61,9 @@ struct AggregateFunctionSegmentLengthSumData
/// either sort whole container or do so partially merging ranges afterwards /// either sort whole container or do so partially merging ranges afterwards
if (!sorted && !other.sorted) if (!sorted && !other.sorted)
std::stable_sort(std::begin(segments), std::end(segments)); {
std::sort(std::begin(segments), std::end(segments));
}
else else
{ {
const auto begin = std::begin(segments); const auto begin = std::begin(segments);
@ -54,10 +71,10 @@ struct AggregateFunctionSegmentLengthSumData
const auto end = std::end(segments); const auto end = std::end(segments);
if (!sorted) if (!sorted)
std::stable_sort(begin, middle); std::sort(begin, middle);
if (!other.sorted) if (!other.sorted)
std::stable_sort(middle, end); std::sort(middle, end);
std::inplace_merge(begin, middle, end); std::inplace_merge(begin, middle, end);
} }
@ -69,7 +86,7 @@ struct AggregateFunctionSegmentLengthSumData
{ {
if (!sorted) if (!sorted)
{ {
std::stable_sort(std::begin(segments), std::end(segments)); std::sort(std::begin(segments), std::end(segments));
sorted = true; sorted = true;
} }
} }
@ -93,28 +110,30 @@ struct AggregateFunctionSegmentLengthSumData
size_t size; size_t size;
readBinary(size, buf); readBinary(size, buf);
if (unlikely(size > MAX_ARRAY_SIZE))
throw Exception("Too large array size", ErrorCodes::TOO_LARGE_ARRAY_SIZE);
segments.clear(); segments.clear();
segments.reserve(size); segments.reserve(size);
T start, end; Segment segment;
for (size_t i = 0; i < size; ++i) for (size_t i = 0; i < size; ++i)
{ {
readBinary(start, buf); readBinary(segment.first, buf);
readBinary(end, buf); readBinary(segment.second, buf);
segments.emplace_back(start, end); segments.emplace_back(segment);
} }
} }
}; };
template <typename T, typename Data> template <typename T, typename Data>
class AggregateFunctionSegmentLengthSum final : public IAggregateFunctionDataHelper<Data, AggregateFunctionSegmentLengthSum<T, Data>> class AggregateFunctionIntervalLengthSum final : public IAggregateFunctionDataHelper<Data, AggregateFunctionIntervalLengthSum<T, Data>>
{ {
private: private:
template <typename TResult> template <typename TResult>
TResult getSegmentLengthSum(Data & data) const TResult getIntervalLengthSum(Data & data) const
{ {
if (data.size() == 0) if (data.segments.empty())
return 0; return 0;
data.sort(); data.sort();
@ -123,8 +142,9 @@ private:
typename Data::Segment cur_segment = data.segments[0]; typename Data::Segment cur_segment = data.segments[0];
for (size_t i = 1; i < data.segments.size(); ++i) for (size_t i = 1, sz = data.segments.size(); i < sz; ++i)
{ {
/// Check if current interval intersect with next one then add length, otherwise advance interval end
if (cur_segment.second < data.segments[i].first) if (cur_segment.second < data.segments[i].first)
{ {
res += cur_segment.second - cur_segment.first; res += cur_segment.second - cur_segment.first;
@ -140,10 +160,10 @@ private:
} }
public: public:
String getName() const override { return "segmentLengthSum"; } String getName() const override { return "intervalLengthSum"; }
explicit AggregateFunctionSegmentLengthSum(const DataTypes & arguments) explicit AggregateFunctionIntervalLengthSum(const DataTypes & arguments)
: IAggregateFunctionDataHelper<Data, AggregateFunctionSegmentLengthSum<T, Data>>(arguments, {}) : IAggregateFunctionDataHelper<Data, AggregateFunctionIntervalLengthSum<T, Data>>(arguments, {})
{ {
} }
@ -167,9 +187,9 @@ public:
void add(AggregateDataPtr __restrict place, const IColumn ** columns, const size_t row_num, Arena *) const override void add(AggregateDataPtr __restrict place, const IColumn ** columns, const size_t row_num, Arena *) const override
{ {
auto start = assert_cast<const ColumnVector<T> *>(columns[0])->getData()[row_num]; auto begin = assert_cast<const ColumnVector<T> *>(columns[0])->getData()[row_num];
auto end = assert_cast<const ColumnVector<T> *>(columns[1])->getData()[row_num]; auto end = assert_cast<const ColumnVector<T> *>(columns[1])->getData()[row_num];
this->data(place).add(start, end); this->data(place).add(begin, end);
} }
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override
@ -190,9 +210,9 @@ public:
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
{ {
if constexpr (std::is_floating_point_v<T>) if constexpr (std::is_floating_point_v<T>)
assert_cast<ColumnFloat64 &>(to).getData().push_back(getSegmentLengthSum<Float64>(this->data(place))); assert_cast<ColumnFloat64 &>(to).getData().push_back(getIntervalLengthSum<Float64>(this->data(place)));
else else
assert_cast<ColumnUInt64 &>(to).getData().push_back(getSegmentLengthSum<UInt64>(this->data(place))); assert_cast<ColumnUInt64 &>(to).getData().push_back(getIntervalLengthSum<UInt64>(this->data(place)));
} }
}; };

View File

@ -64,7 +64,7 @@ void registerAggregateFunctionCombinatorDistinct(AggregateFunctionCombinatorFact
void registerWindowFunctions(AggregateFunctionFactory & factory); void registerWindowFunctions(AggregateFunctionFactory & factory);
void registerAggregateFunctionSegmentLengthSum(AggregateFunctionFactory &); void registerAggregateFunctionIntervalLengthSum(AggregateFunctionFactory &);
void registerAggregateFunctions() void registerAggregateFunctions()
{ {
@ -116,7 +116,7 @@ void registerAggregateFunctions()
registerWindowFunctions(factory); registerWindowFunctions(factory);
registerAggregateFunctionSegmentLengthSum(factory); registerAggregateFunctionIntervalLengthSum(factory);
} }
{ {

View File

@ -31,6 +31,7 @@ SRCS(
AggregateFunctionGroupUniqArray.cpp AggregateFunctionGroupUniqArray.cpp
AggregateFunctionHistogram.cpp AggregateFunctionHistogram.cpp
AggregateFunctionIf.cpp AggregateFunctionIf.cpp
AggregateFunctionIntervalLengthSum.cpp
AggregateFunctionMLMethod.cpp AggregateFunctionMLMethod.cpp
AggregateFunctionMannWhitney.cpp AggregateFunctionMannWhitney.cpp
AggregateFunctionMax.cpp AggregateFunctionMax.cpp
@ -43,7 +44,6 @@ SRCS(
AggregateFunctionRankCorrelation.cpp AggregateFunctionRankCorrelation.cpp
AggregateFunctionResample.cpp AggregateFunctionResample.cpp
AggregateFunctionRetention.cpp AggregateFunctionRetention.cpp
AggregateFunctionSegmentLengthSum.cpp
AggregateFunctionSequenceMatch.cpp AggregateFunctionSequenceMatch.cpp
AggregateFunctionSequenceNextNode.cpp AggregateFunctionSequenceNextNode.cpp
AggregateFunctionSimpleLinearRegression.cpp AggregateFunctionSimpleLinearRegression.cpp

View File

@ -424,7 +424,7 @@ void Connection::sendQuery(
if (method == "ZSTD") if (method == "ZSTD")
level = settings->network_zstd_compression_level; level = settings->network_zstd_compression_level;
CompressionCodecFactory::instance().validateCodec(method, level, !settings->allow_suspicious_codecs); CompressionCodecFactory::instance().validateCodec(method, level, !settings->allow_suspicious_codecs, settings->allow_experimental_codecs);
compression_codec = CompressionCodecFactory::instance().get(method, level); compression_codec = CompressionCodecFactory::instance().get(method, level);
} }
else else

View File

@ -14,6 +14,6 @@
#cmakedefine01 USE_SENTRY #cmakedefine01 USE_SENTRY
#cmakedefine01 USE_GRPC #cmakedefine01 USE_GRPC
#cmakedefine01 USE_STATS #cmakedefine01 USE_STATS
#cmakedefine01 CLICKHOUSE_SPLIT_BINARY
#cmakedefine01 USE_DATASKETCHES #cmakedefine01 USE_DATASKETCHES
#cmakedefine01 USE_YAML_CPP #cmakedefine01 USE_YAML_CPP
#cmakedefine01 CLICKHOUSE_SPLIT_BINARY

View File

@ -1,4 +1,4 @@
#include <Compression/CompressionCodecDelta.h> #include <Compression/ICompressionCodec.h>
#include <Compression/CompressionInfo.h> #include <Compression/CompressionInfo.h>
#include <Compression/CompressionFactory.h> #include <Compression/CompressionFactory.h>
#include <common/unaligned.h> #include <common/unaligned.h>
@ -11,6 +11,29 @@
namespace DB namespace DB
{ {
class CompressionCodecDelta : public ICompressionCodec
{
public:
explicit CompressionCodecDelta(UInt8 delta_bytes_size_);
uint8_t getMethodByte() const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override { return uncompressed_size + 2; }
bool isCompression() const override { return false; }
bool isGenericCompression() const override { return false; }
private:
UInt8 delta_bytes_size;
};
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int CANNOT_COMPRESS; extern const int CANNOT_COMPRESS;

View File

@ -1,32 +0,0 @@
#pragma once
#include <Compression/ICompressionCodec.h>
namespace DB
{
class CompressionCodecDelta : public ICompressionCodec
{
public:
CompressionCodecDelta(UInt8 delta_bytes_size_);
uint8_t getMethodByte() const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override { return uncompressed_size + 2; }
bool isCompression() const override { return false; }
bool isGenericCompression() const override { return false; }
private:
UInt8 delta_bytes_size;
};
}

View File

@ -1,4 +1,4 @@
#include <Compression/CompressionCodecDoubleDelta.h> #include <Compression/ICompressionCodec.h>
#include <Compression/CompressionInfo.h> #include <Compression/CompressionInfo.h>
#include <Compression/CompressionFactory.h> #include <Compression/CompressionFactory.h>
#include <common/unaligned.h> #include <common/unaligned.h>
@ -15,9 +15,126 @@
#include <type_traits> #include <type_traits>
#include <limits> #include <limits>
namespace DB namespace DB
{ {
/** NOTE DoubleDelta is surprisingly bad name. The only excuse is that it comes from an academic paper.
* Most people will think that "double delta" is just applying delta transform twice.
* But in fact it is something more than applying delta transform twice.
*/
/** DoubleDelta column codec implementation.
*
* Based on Gorilla paper: http://www.vldb.org/pvldb/vol8/p1816-teller.pdf, which was extended
* to support 64bit types. The drawback is 1 extra bit for 32-byte wide deltas: 5-bit prefix
* instead of 4-bit prefix.
*
* This codec is best used against monotonic integer sequences with constant (or almost constant)
* stride, like event timestamp for some monitoring application.
*
* Given input sequence a: [a0, a1, ... an]:
*
* First, write number of items (sizeof(int32)*8 bits): n
* Then write first item as is (sizeof(a[0])*8 bits): a[0]
* Second item is written as delta (sizeof(a[0])*8 bits): a[1] - a[0]
* Loop over remaining items and calculate double delta:
* double_delta = a[i] - 2 * a[i - 1] + a[i - 2]
* Write it in compact binary form with `BitWriter`
* if double_delta == 0:
* write 1bit: 0
* else if -63 < double_delta < 64:
* write 2 bit prefix: 10
* write sign bit (1 if signed): x
* write 7-1 bits of abs(double_delta - 1): xxxxxx
* else if -255 < double_delta < 256:
* write 3 bit prefix: 110
* write sign bit (1 if signed): x
* write 9-1 bits of abs(double_delta - 1): xxxxxxxx
* else if -2047 < double_delta < 2048:
* write 4 bit prefix: 1110
* write sign bit (1 if signed): x
* write 12-1 bits of abs(double_delta - 1): xxxxxxxxxxx
* else if double_delta fits into 32-bit int:
* write 5 bit prefix: 11110
* write sign bit (1 if signed): x
* write 32-1 bits of abs(double_delta - 1): xxxxxxxxxxx...
* else
* write 5 bit prefix: 11111
* write sign bit (1 if signed): x
* write 64-1 bits of abs(double_delta - 1): xxxxxxxxxxx...
*
* @example sequence of UInt8 values [1, 2, 3, 4, 5, 6, 7, 8, 9 10] is encoded as (codec header is omitted):
*
* .- 4-byte little-endian sequence length (10 == 0xa)
* | .- 1 byte (sizeof(UInt8) a[0] : 0x01
* | | .- 1 byte of delta: a[1] - a[0] = 2 - 1 = 1 : 0x01
* | | | .- 8 zero bits since double delta for remaining 8 elements was 0 : 0x00
* v_______________v___v___v___
* \x0a\x00\x00\x00\x01\x01\x00
*
* @example sequence of Int16 values [-10, 10, -20, 20, -40, 40] is encoded as:
*
* .- 4-byte little endian sequence length = 6 : 0x00000006
* | .- 2 bytes (sizeof(Int16) a[0] as UInt16 = -10 : 0xfff6
* | | .- 2 bytes of delta: a[1] - a[0] = 10 - (-10) = 20 : 0x0014
* | | | .- 4 encoded double deltas (see below)
* v_______________ v______ v______ v______________________
* \x06\x00\x00\x00\xf6\xff\x14\x00\xb8\xe2\x2e\xb1\xe4\x58
*
* 4 binary encoded double deltas (\xb8\xe2\x2e\xb1\xe4\x58):
* double_delta (DD) = -20 - 2 * 10 + (-10) = -50
* .- 2-bit prefix : 0b10
* | .- sign-bit : 0b1
* | |.- abs(DD - 1) = 49 : 0b110001
* | ||
* | || DD = 20 - 2 * (-20) + 10 = 70
* | || .- 3-bit prefix : 0b110
* | || | .- sign bit : 0b0
* | || | |.- abs(DD - 1) = 69 : 0b1000101
* | || | ||
* | || | || DD = -40 - 2 * 20 + (-20) = -100
* | || | || .- 3-bit prefix : 0b110
* | || | || | .- sign-bit : 0b0
* | || | || | |.- abs(DD - 1) = 99 : 0b1100011
* | || | || | ||
* | || | || | || DD = 40 - 2 * (-40) + 20 = 140
* | || | || | || .- 3-bit prefix : 0b110
* | || | || | || | .- sign bit : 0b0
* | || | || | || | |.- abs(DD - 1) = 139 : 0b10001011
* | || | || | || | ||
* V_vv______V__vv________V____vv_______V__vv________,- padding bits
* 10111000 11100010 00101110 10110001 11100100 01011000
*
* Please also see unit tests for:
* * Examples on what output `BitWriter` produces on predefined input.
* * Compatibility tests solidifying encoded binary output on set of predefined sequences.
*/
class CompressionCodecDoubleDelta : public ICompressionCodec
{
public:
explicit CompressionCodecDoubleDelta(UInt8 data_bytes_size_);
uint8_t getMethodByte() const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return false; }
private:
UInt8 data_bytes_size;
};
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int CANNOT_COMPRESS; extern const int CANNOT_COMPRESS;

View File

@ -1,118 +0,0 @@
#pragma once
#include <Compression/ICompressionCodec.h>
namespace DB
{
/** DoubleDelta column codec implementation.
*
* Based on Gorilla paper: http://www.vldb.org/pvldb/vol8/p1816-teller.pdf, which was extended
* to support 64bit types. The drawback is 1 extra bit for 32-byte wide deltas: 5-bit prefix
* instead of 4-bit prefix.
*
* This codec is best used against monotonic integer sequences with constant (or almost constant)
* stride, like event timestamp for some monitoring application.
*
* Given input sequence a: [a0, a1, ... an]:
*
* First, write number of items (sizeof(int32)*8 bits): n
* Then write first item as is (sizeof(a[0])*8 bits): a[0]
* Second item is written as delta (sizeof(a[0])*8 bits): a[1] - a[0]
* Loop over remaining items and calculate double delta:
* double_delta = a[i] - 2 * a[i - 1] + a[i - 2]
* Write it in compact binary form with `BitWriter`
* if double_delta == 0:
* write 1bit: 0
* else if -63 < double_delta < 64:
* write 2 bit prefix: 10
* write sign bit (1 if signed): x
* write 7-1 bits of abs(double_delta - 1): xxxxxx
* else if -255 < double_delta < 256:
* write 3 bit prefix: 110
* write sign bit (1 if signed): x
* write 9-1 bits of abs(double_delta - 1): xxxxxxxx
* else if -2047 < double_delta < 2048:
* write 4 bit prefix: 1110
* write sign bit (1 if signed): x
* write 12-1 bits of abs(double_delta - 1): xxxxxxxxxxx
* else if double_delta fits into 32-bit int:
* write 5 bit prefix: 11110
* write sign bit (1 if signed): x
* write 32-1 bits of abs(double_delta - 1): xxxxxxxxxxx...
* else
* write 5 bit prefix: 11111
* write sign bit (1 if signed): x
* write 64-1 bits of abs(double_delta - 1): xxxxxxxxxxx...
*
* @example sequence of UInt8 values [1, 2, 3, 4, 5, 6, 7, 8, 9 10] is encoded as (codec header is omitted):
*
* .- 4-byte little-endian sequence length (10 == 0xa)
* | .- 1 byte (sizeof(UInt8) a[0] : 0x01
* | | .- 1 byte of delta: a[1] - a[0] = 2 - 1 = 1 : 0x01
* | | | .- 8 zero bits since double delta for remaining 8 elements was 0 : 0x00
* v_______________v___v___v___
* \x0a\x00\x00\x00\x01\x01\x00
*
* @example sequence of Int16 values [-10, 10, -20, 20, -40, 40] is encoded as:
*
* .- 4-byte little endian sequence length = 6 : 0x00000006
* | .- 2 bytes (sizeof(Int16) a[0] as UInt16 = -10 : 0xfff6
* | | .- 2 bytes of delta: a[1] - a[0] = 10 - (-10) = 20 : 0x0014
* | | | .- 4 encoded double deltas (see below)
* v_______________ v______ v______ v______________________
* \x06\x00\x00\x00\xf6\xff\x14\x00\xb8\xe2\x2e\xb1\xe4\x58
*
* 4 binary encoded double deltas (\xb8\xe2\x2e\xb1\xe4\x58):
* double_delta (DD) = -20 - 2 * 10 + (-10) = -50
* .- 2-bit prefix : 0b10
* | .- sign-bit : 0b1
* | |.- abs(DD - 1) = 49 : 0b110001
* | ||
* | || DD = 20 - 2 * (-20) + 10 = 70
* | || .- 3-bit prefix : 0b110
* | || | .- sign bit : 0b0
* | || | |.- abs(DD - 1) = 69 : 0b1000101
* | || | ||
* | || | || DD = -40 - 2 * 20 + (-20) = -100
* | || | || .- 3-bit prefix : 0b110
* | || | || | .- sign-bit : 0b0
* | || | || | |.- abs(DD - 1) = 99 : 0b1100011
* | || | || | ||
* | || | || | || DD = 40 - 2 * (-40) + 20 = 140
* | || | || | || .- 3-bit prefix : 0b110
* | || | || | || | .- sign bit : 0b0
* | || | || | || | |.- abs(DD - 1) = 139 : 0b10001011
* | || | || | || | ||
* V_vv______V__vv________V____vv_______V__vv________,- padding bits
* 10111000 11100010 00101110 10110001 11100100 01011000
*
* Please also see unit tests for:
* * Examples on what output `BitWriter` produces on predefined input.
* * Compatibility tests solidifying encoded binary output on set of predefined sequences.
*/
class CompressionCodecDoubleDelta : public ICompressionCodec
{
public:
CompressionCodecDoubleDelta(UInt8 data_bytes_size_);
uint8_t getMethodByte() const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return false; }
private:
UInt8 data_bytes_size;
};
}

View File

@ -1,4 +1,4 @@
#include <Compression/CompressionCodecGorilla.h> #include <Compression/ICompressionCodec.h>
#include <Compression/CompressionInfo.h> #include <Compression/CompressionInfo.h>
#include <Compression/CompressionFactory.h> #include <Compression/CompressionFactory.h>
#include <common/unaligned.h> #include <common/unaligned.h>
@ -14,9 +14,118 @@
#include <bitset> #include <bitset>
namespace DB namespace DB
{ {
/** Gorilla column codec implementation.
*
* Based on Gorilla paper: http://www.vldb.org/pvldb/vol8/p1816-teller.pdf
*
* This codec is best used against monotonic floating sequences, like CPU usage percentage
* or any other gauge.
*
* Given input sequence a: [a0, a1, ... an]
*
* First, write number of items (sizeof(int32)*8 bits): n
* Then write first item as is (sizeof(a[0])*8 bits): a[0]
* Loop over remaining items and calculate xor_diff:
* xor_diff = a[i] ^ a[i - 1] (e.g. 00000011'10110100)
* Write it in compact binary form with `BitWriter`
* if xor_diff == 0:
* write 1 bit: 0
* else:
* calculate leading zero bits (lzb)
* and trailing zero bits (tzb) of xor_diff,
* compare to lzb and tzb of previous xor_diff
* (X = sizeof(a[i]) * 8, e.g. X = 16, lzb = 6, tzb = 2)
* if lzb >= prev_lzb && tzb >= prev_tzb:
* (e.g. prev_lzb=4, prev_tzb=1)
* write 2 bit prefix: 0b10
* write xor_diff >> prev_tzb (X - prev_lzb - prev_tzb bits):0b00111011010
* (where X = sizeof(a[i]) * 8, e.g. 16)
* else:
* write 2 bit prefix: 0b11
* write 5 bits of lzb: 0b00110
* write 6 bits of (X - lzb - tzb)=(16-6-2)=8: 0b001000
* write (X - lzb - tzb) non-zero bits of xor_diff: 0b11101101
* prev_lzb = lzb
* prev_tzb = tzb
*
* @example sequence of Float32 values [0.1, 0.1, 0.11, 0.2, 0.1] is encoded as:
*
* .- 4-byte little endian sequence length: 5 : 0x00000005
* | .- 4 byte (sizeof(Float32) a[0] as UInt32 : -10 : 0xcdcccc3d
* | | .- 4 encoded xor diffs (see below)
* v_______________ v______________ v__________________________________________________
* \x05\x00\x00\x00\xcd\xcc\xcc\x3d\x6a\x5a\xd8\xb6\x3c\xcd\x75\xb1\x6c\x77\x00\x00\x00
*
* 4 binary encoded xor diffs (\x6a\x5a\xd8\xb6\x3c\xcd\x75\xb1\x6c\x77\x00\x00\x00):
*
* ...........................................
* a[i-1] = 00111101110011001100110011001101
* a[i] = 00111101110011001100110011001101
* xor_diff = 00000000000000000000000000000000
* .- 1-bit prefix : 0b0
* |
* | ...........................................
* | a[i-1] = 00111101110011001100110011001101
* ! a[i] = 00111101111000010100011110101110
* | xor_diff = 00000000001011011000101101100011
* | lzb = 10
* | tzb = 0
* |.- 2-bit prefix : 0b11
* || .- lzb (10) : 0b1010
* || | .- data length (32-10-0): 22 : 0b010110
* || | | .- data : 0b1011011000101101100011
* || | | |
* || | | | ...........................................
* || | | | a[i-1] = 00111101111000010100011110101110
* || | | | a[i] = 00111110010011001100110011001101
* || | | | xor_diff = 00000011101011011000101101100011
* || | | | .- 2-bit prefix : 0b11
* || | | | | .- lzb = 6 : 0b00110
* || | | | | | .- data length = (32 - 6) = 26 : 0b011010
* || | | | | | | .- data : 0b11101011011000101101100011
* || | | | | | | |
* || | | | | | | | ...........................................
* || | | | | | | | a[i-1] = 00111110010011001100110011001101
* || | | | | | | | a[i] = 00111101110011001100110011001101
* || | | | | | | | xor_diff = 00000011100000000000000000000000
* || | | | | | | | .- 2-bit prefix : 0b10
* || | | | | | | | | .- data : 0b11100000000000000000000000
* VV_v____ v_____v________________________V_v_____v______v____________________________V_v_____________________________
* 01101010 01011010 11011000 10110110 00111100 11001101 01110101 10110001 01101100 01110111 00000000 00000000 00000000
*
* Please also see unit tests for:
* * Examples on what output `BitWriter` produces on predefined input.
* * Compatibility tests solidifying encoded binary output on set of predefined sequences.
*/
class CompressionCodecGorilla : public ICompressionCodec
{
public:
explicit CompressionCodecGorilla(UInt8 data_bytes_size_);
uint8_t getMethodByte() const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return false; }
private:
UInt8 data_bytes_size;
};
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int CANNOT_COMPRESS; extern const int CANNOT_COMPRESS;

View File

@ -1,115 +0,0 @@
#pragma once
#include <Compression/ICompressionCodec.h>
namespace DB
{
/** Gorilla column codec implementation.
*
* Based on Gorilla paper: http://www.vldb.org/pvldb/vol8/p1816-teller.pdf
*
* This codec is best used against monotonic floating sequences, like CPU usage percentage
* or any other gauge.
*
* Given input sequence a: [a0, a1, ... an]
*
* First, write number of items (sizeof(int32)*8 bits): n
* Then write first item as is (sizeof(a[0])*8 bits): a[0]
* Loop over remaining items and calculate xor_diff:
* xor_diff = a[i] ^ a[i - 1] (e.g. 00000011'10110100)
* Write it in compact binary form with `BitWriter`
* if xor_diff == 0:
* write 1 bit: 0
* else:
* calculate leading zero bits (lzb)
* and trailing zero bits (tzb) of xor_diff,
* compare to lzb and tzb of previous xor_diff
* (X = sizeof(a[i]) * 8, e.g. X = 16, lzb = 6, tzb = 2)
* if lzb >= prev_lzb && tzb >= prev_tzb:
* (e.g. prev_lzb=4, prev_tzb=1)
* write 2 bit prefix: 0b10
* write xor_diff >> prev_tzb (X - prev_lzb - prev_tzb bits):0b00111011010
* (where X = sizeof(a[i]) * 8, e.g. 16)
* else:
* write 2 bit prefix: 0b11
* write 5 bits of lzb: 0b00110
* write 6 bits of (X - lzb - tzb)=(16-6-2)=8: 0b001000
* write (X - lzb - tzb) non-zero bits of xor_diff: 0b11101101
* prev_lzb = lzb
* prev_tzb = tzb
*
* @example sequence of Float32 values [0.1, 0.1, 0.11, 0.2, 0.1] is encoded as:
*
* .- 4-byte little endian sequence length: 5 : 0x00000005
* | .- 4 byte (sizeof(Float32) a[0] as UInt32 : -10 : 0xcdcccc3d
* | | .- 4 encoded xor diffs (see below)
* v_______________ v______________ v__________________________________________________
* \x05\x00\x00\x00\xcd\xcc\xcc\x3d\x6a\x5a\xd8\xb6\x3c\xcd\x75\xb1\x6c\x77\x00\x00\x00
*
* 4 binary encoded xor diffs (\x6a\x5a\xd8\xb6\x3c\xcd\x75\xb1\x6c\x77\x00\x00\x00):
*
* ...........................................
* a[i-1] = 00111101110011001100110011001101
* a[i] = 00111101110011001100110011001101
* xor_diff = 00000000000000000000000000000000
* .- 1-bit prefix : 0b0
* |
* | ...........................................
* | a[i-1] = 00111101110011001100110011001101
* ! a[i] = 00111101111000010100011110101110
* | xor_diff = 00000000001011011000101101100011
* | lzb = 10
* | tzb = 0
* |.- 2-bit prefix : 0b11
* || .- lzb (10) : 0b1010
* || | .- data length (32-10-0): 22 : 0b010110
* || | | .- data : 0b1011011000101101100011
* || | | |
* || | | | ...........................................
* || | | | a[i-1] = 00111101111000010100011110101110
* || | | | a[i] = 00111110010011001100110011001101
* || | | | xor_diff = 00000011101011011000101101100011
* || | | | .- 2-bit prefix : 0b11
* || | | | | .- lzb = 6 : 0b00110
* || | | | | | .- data length = (32 - 6) = 26 : 0b011010
* || | | | | | | .- data : 0b11101011011000101101100011
* || | | | | | | |
* || | | | | | | | ...........................................
* || | | | | | | | a[i-1] = 00111110010011001100110011001101
* || | | | | | | | a[i] = 00111101110011001100110011001101
* || | | | | | | | xor_diff = 00000011100000000000000000000000
* || | | | | | | | .- 2-bit prefix : 0b10
* || | | | | | | | | .- data : 0b11100000000000000000000000
* VV_v____ v_____v________________________V_v_____v______v____________________________V_v_____________________________
* 01101010 01011010 11011000 10110110 00111100 11001101 01110101 10110001 01101100 01110111 00000000 00000000 00000000
*
* Please also see unit tests for:
* * Examples on what output `BitWriter` produces on predefined input.
* * Compatibility tests solidifying encoded binary output on set of predefined sequences.
*/
class CompressionCodecGorilla : public ICompressionCodec
{
public:
CompressionCodecGorilla(UInt8 data_bytes_size_);
uint8_t getMethodByte() const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return false; }
private:
UInt8 data_bytes_size;
};
}

View File

@ -1,7 +1,7 @@
#include "CompressionCodecLZ4.h"
#include <lz4.h> #include <lz4.h>
#include <lz4hc.h> #include <lz4hc.h>
#include <Compression/ICompressionCodec.h>
#include <Compression/CompressionInfo.h> #include <Compression/CompressionInfo.h>
#include <Compression/CompressionFactory.h> #include <Compression/CompressionFactory.h>
#include <Compression/LZ4_decompress_faster.h> #include <Compression/LZ4_decompress_faster.h>
@ -9,7 +9,9 @@
#include <Parsers/ASTLiteral.h> #include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h> #include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h> #include <Parsers/ASTIdentifier.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <IO/BufferWithOwnMemory.h>
#pragma GCC diagnostic ignored "-Wold-style-cast" #pragma GCC diagnostic ignored "-Wold-style-cast"
@ -17,11 +19,51 @@
namespace DB namespace DB
{ {
class CompressionCodecLZ4 : public ICompressionCodec
{
public:
explicit CompressionCodecLZ4();
uint8_t getMethodByte() const override;
UInt32 getAdditionalSizeAtTheEndOfBuffer() const override { return LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER; }
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return true; }
private:
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;
mutable LZ4::PerformanceStatistics lz4_stat;
ASTPtr codec_desc;
};
class CompressionCodecLZ4HC : public CompressionCodecLZ4
{
public:
explicit CompressionCodecLZ4HC(int level_);
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
private:
const int level;
};
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int CANNOT_COMPRESS; extern const int CANNOT_COMPRESS;
extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE; extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE;
extern const int ILLEGAL_CODEC_PARAMETER; extern const int ILLEGAL_CODEC_PARAMETER;
} }
CompressionCodecLZ4::CompressionCodecLZ4() CompressionCodecLZ4::CompressionCodecLZ4()

View File

@ -1,52 +0,0 @@
#pragma once
#include <IO/WriteBuffer.h>
#include <Compression/ICompressionCodec.h>
#include <IO/BufferWithOwnMemory.h>
#include <Parsers/StringRange.h>
#include <Compression/LZ4_decompress_faster.h>
#include <Parsers/IAST_fwd.h>
namespace DB
{
class CompressionCodecLZ4 : public ICompressionCodec
{
public:
CompressionCodecLZ4();
uint8_t getMethodByte() const override;
UInt32 getAdditionalSizeAtTheEndOfBuffer() const override { return LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER; }
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return true; }
private:
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;
mutable LZ4::PerformanceStatistics lz4_stat;
ASTPtr codec_desc;
};
class CompressionCodecLZ4HC : public CompressionCodecLZ4
{
public:
CompressionCodecLZ4HC(int level_);
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
private:
const int level;
};
}

View File

@ -1,6 +1,6 @@
#include <cstring> #include <cstring>
#include <Compression/CompressionCodecT64.h> #include <Compression/ICompressionCodec.h>
#include <Compression/CompressionFactory.h> #include <Compression/CompressionFactory.h>
#include <common/unaligned.h> #include <common/unaligned.h>
#include <Parsers/IAST.h> #include <Parsers/IAST.h>
@ -8,18 +8,63 @@
#include <Parsers/ASTIdentifier.h> #include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTFunction.h> #include <Parsers/ASTFunction.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <Core/Types.h>
namespace DB namespace DB
{ {
/// Get 64 integer values, makes 64x64 bit matrix, transpose it and crop unused bits (most significant zeroes).
/// In example, if we have UInt8 with only 0 and 1 inside 64xUInt8 would be compressed into 1xUInt64.
/// It detects unused bits by calculating min and max values of data part, saving them in header in compression phase.
/// There's a special case with signed integers parts with crossing zero data. Here it stores one more bit to detect sign of value.
class CompressionCodecT64 : public ICompressionCodec
{
public:
static constexpr UInt32 HEADER_SIZE = 1 + 2 * sizeof(UInt64);
static constexpr UInt32 MAX_COMPRESSED_BLOCK_SIZE = sizeof(UInt64) * 64;
/// There're 2 compression variants:
/// Byte - transpose bit matrix by bytes (only the last not full byte is transposed by bits). It's default.
/// Bits - full bit-transpose of the bit matrix. It uses more resources and leads to better compression with ZSTD (but worse with LZ4).
enum class Variant
{
Byte,
Bit
};
CompressionCodecT64(TypeIndex type_idx_, Variant variant_);
uint8_t getMethodByte() const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * src, UInt32 src_size, char * dst) const override;
void doDecompressData(const char * src, UInt32 src_size, char * dst, UInt32 uncompressed_size) const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override
{
/// uncompressed_size - (uncompressed_size % (sizeof(T) * 64)) + sizeof(UInt64) * sizeof(T) + header_size
return uncompressed_size + MAX_COMPRESSED_BLOCK_SIZE + HEADER_SIZE;
}
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return false; }
private:
TypeIndex type_idx;
Variant variant;
};
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int CANNOT_COMPRESS; extern const int CANNOT_COMPRESS;
extern const int CANNOT_DECOMPRESS; extern const int CANNOT_DECOMPRESS;
extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE; extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE;
extern const int ILLEGAL_CODEC_PARAMETER; extern const int ILLEGAL_CODEC_PARAMETER;
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
} }
namespace namespace

View File

@ -1,53 +0,0 @@
#pragma once
#include <Core/Types.h>
#include <Compression/ICompressionCodec.h>
namespace DB
{
/// Get 64 integer values, makes 64x64 bit matrix, transpose it and crop unused bits (most significant zeroes).
/// In example, if we have UInt8 with only 0 and 1 inside 64xUInt8 would be compressed into 1xUInt64.
/// It detects unused bits by calculating min and max values of data part, saving them in header in compression phase.
/// There's a special case with signed integers parts with crossing zero data. Here it stores one more bit to detect sign of value.
class CompressionCodecT64 : public ICompressionCodec
{
public:
static constexpr UInt32 HEADER_SIZE = 1 + 2 * sizeof(UInt64);
static constexpr UInt32 MAX_COMPRESSED_BLOCK_SIZE = sizeof(UInt64) * 64;
/// There're 2 compression variants:
/// Byte - transpose bit matrix by bytes (only the last not full byte is transposed by bits). It's default.
/// Bits - full bit-transpose of the bit matrix. It uses more resources and leads to better compression with ZSTD (but worse with LZ4).
enum class Variant
{
Byte,
Bit
};
CompressionCodecT64(TypeIndex type_idx_, Variant variant_);
uint8_t getMethodByte() const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * src, UInt32 src_size, char * dst) const override;
void doDecompressData(const char * src, UInt32 src_size, char * dst, UInt32 uncompressed_size) const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override
{
/// uncompressed_size - (uncompressed_size % (sizeof(T) * 64)) + sizeof(UInt64) * sizeof(T) + header_size
return uncompressed_size + MAX_COMPRESSED_BLOCK_SIZE + HEADER_SIZE;
}
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return false; }
private:
TypeIndex type_idx;
Variant variant;
};
}

View File

@ -1,4 +1,4 @@
#include <Compression/CompressionCodecZSTD.h> #include <Compression/ICompressionCodec.h>
#include <Compression/CompressionInfo.h> #include <Compression/CompressionInfo.h>
#include <Compression/CompressionFactory.h> #include <Compression/CompressionFactory.h>
#include <zstd.h> #include <zstd.h>
@ -7,11 +7,44 @@
#include <Parsers/ASTFunction.h> #include <Parsers/ASTFunction.h>
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <IO/WriteBuffer.h>
#include <IO/BufferWithOwnMemory.h>
namespace DB namespace DB
{ {
class CompressionCodecZSTD : public ICompressionCodec
{
public:
static constexpr auto ZSTD_DEFAULT_LEVEL = 1;
static constexpr auto ZSTD_DEFAULT_LOG_WINDOW = 24;
explicit CompressionCodecZSTD(int level_);
CompressionCodecZSTD(int level_, int window_log);
uint8_t getMethodByte() const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return true; }
private:
const int level;
const bool enable_long_range;
const int window_log;
};
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int CANNOT_COMPRESS; extern const int CANNOT_COMPRESS;

View File

@ -1,42 +0,0 @@
#pragma once
#include <IO/WriteBuffer.h>
#include <Compression/ICompressionCodec.h>
#include <IO/BufferWithOwnMemory.h>
#include <Parsers/StringRange.h>
namespace DB
{
class CompressionCodecZSTD : public ICompressionCodec
{
public:
static constexpr auto ZSTD_DEFAULT_LEVEL = 1;
static constexpr auto ZSTD_DEFAULT_LOG_WINDOW = 24;
CompressionCodecZSTD(int level_);
CompressionCodecZSTD(int level_, int window_log);
uint8_t getMethodByte() const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return true; }
private:
const int level;
const bool enable_long_range;
const int window_log;
};
}

View File

@ -1,3 +1,7 @@
#if !defined(ARCADIA_BUILD)
# include "config_core.h"
#endif
#include <Compression/CompressionFactory.h> #include <Compression/CompressionFactory.h>
#include <Parsers/ASTFunction.h> #include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h> #include <Parsers/ASTIdentifier.h>
@ -13,6 +17,7 @@
namespace DB namespace DB
{ {
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
@ -34,8 +39,8 @@ CompressionCodecPtr CompressionCodecFactory::get(const String & family_name, std
{ {
if (level) if (level)
{ {
auto literal = std::make_shared<ASTLiteral>(static_cast<UInt64>(*level)); auto level_literal = std::make_shared<ASTLiteral>(static_cast<UInt64>(*level));
return get(makeASTFunction("CODEC", makeASTFunction(Poco::toUpper(family_name), literal)), {}); return get(makeASTFunction("CODEC", makeASTFunction(Poco::toUpper(family_name), level_literal)), {});
} }
else else
{ {
@ -44,7 +49,8 @@ CompressionCodecPtr CompressionCodecFactory::get(const String & family_name, std
} }
} }
void CompressionCodecFactory::validateCodec(const String & family_name, std::optional<int> level, bool sanity_check) const void CompressionCodecFactory::validateCodec(
const String & family_name, std::optional<int> level, bool sanity_check, bool allow_experimental_codecs) const
{ {
if (family_name.empty()) if (family_name.empty())
throw Exception("Compression codec name cannot be empty", ErrorCodes::BAD_ARGUMENTS); throw Exception("Compression codec name cannot be empty", ErrorCodes::BAD_ARGUMENTS);
@ -52,16 +58,19 @@ void CompressionCodecFactory::validateCodec(const String & family_name, std::opt
if (level) if (level)
{ {
auto literal = std::make_shared<ASTLiteral>(static_cast<UInt64>(*level)); auto literal = std::make_shared<ASTLiteral>(static_cast<UInt64>(*level));
validateCodecAndGetPreprocessedAST(makeASTFunction("CODEC", makeASTFunction(Poco::toUpper(family_name), literal)), {}, sanity_check); validateCodecAndGetPreprocessedAST(makeASTFunction("CODEC", makeASTFunction(Poco::toUpper(family_name), literal)),
{}, sanity_check, allow_experimental_codecs);
} }
else else
{ {
auto identifier = std::make_shared<ASTIdentifier>(Poco::toUpper(family_name)); auto identifier = std::make_shared<ASTIdentifier>(Poco::toUpper(family_name));
validateCodecAndGetPreprocessedAST(makeASTFunction("CODEC", identifier), {}, sanity_check); validateCodecAndGetPreprocessedAST(makeASTFunction("CODEC", identifier),
{}, sanity_check, allow_experimental_codecs);
} }
} }
ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(const ASTPtr & ast, const IDataType * column_type, bool sanity_check) const ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(
const ASTPtr & ast, const IDataType * column_type, bool sanity_check, bool allow_experimental_codecs) const
{ {
if (const auto * func = ast->as<ASTFunction>()) if (const auto * func = ast->as<ASTFunction>())
{ {
@ -72,7 +81,7 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(const ASTPtr
std::optional<size_t> generic_compression_codec_pos; std::optional<size_t> generic_compression_codec_pos;
bool can_substitute_codec_arguments = true; bool can_substitute_codec_arguments = true;
for (size_t i = 0; i < func->arguments->children.size(); ++i) for (size_t i = 0, size = func->arguments->children.size(); i < size; ++i)
{ {
const auto & inner_codec_ast = func->arguments->children[i]; const auto & inner_codec_ast = func->arguments->children[i];
String codec_family_name; String codec_family_name;
@ -107,7 +116,8 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(const ASTPtr
if (column_type) if (column_type)
{ {
CompressionCodecPtr prev_codec; CompressionCodecPtr prev_codec;
IDataType::StreamCallbackWithType callback = [&](const ISerialization::SubstreamPath & substream_path, const IDataType & substream_type) IDataType::StreamCallbackWithType callback = [&](
const ISerialization::SubstreamPath & substream_path, const IDataType & substream_type)
{ {
if (ISerialization::isSpecialCompressionAllowed(substream_path)) if (ISerialization::isSpecialCompressionAllowed(substream_path))
{ {
@ -132,6 +142,12 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(const ASTPtr
result_codec = getImpl(codec_family_name, codec_arguments, nullptr); result_codec = getImpl(codec_family_name, codec_arguments, nullptr);
} }
if (!allow_experimental_codecs && result_codec->isExperimental())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Codec {} is experimental and not meant to be used in production."
" You can enable it with the 'allow_experimental_codecs' setting.",
codec_family_name);
codecs_descriptions->children.emplace_back(result_codec->getCodecDesc()); codecs_descriptions->children.emplace_back(result_codec->getCodecDesc());
} }
@ -172,6 +188,7 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(const ASTPtr
" (Note: you can enable setting 'allow_suspicious_codecs' to skip this check).", ErrorCodes::BAD_ARGUMENTS); " (Note: you can enable setting 'allow_suspicious_codecs' to skip this check).", ErrorCodes::BAD_ARGUMENTS);
} }
/// For columns with nested types like Tuple(UInt32, UInt64) we /// For columns with nested types like Tuple(UInt32, UInt64) we
/// obviously cannot substitute parameters for codecs which depend on /// obviously cannot substitute parameters for codecs which depend on
/// data type, because for the first column Delta(4) is suitable and /// data type, because for the first column Delta(4) is suitable and
@ -195,7 +212,9 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(const ASTPtr
throw Exception("Unknown codec family: " + queryToString(ast), ErrorCodes::UNKNOWN_CODEC); throw Exception("Unknown codec family: " + queryToString(ast), ErrorCodes::UNKNOWN_CODEC);
} }
CompressionCodecPtr CompressionCodecFactory::get(const ASTPtr & ast, const IDataType * column_type, CompressionCodecPtr current_default, bool only_generic) const
CompressionCodecPtr CompressionCodecFactory::get(
const ASTPtr & ast, const IDataType * column_type, CompressionCodecPtr current_default, bool only_generic) const
{ {
if (current_default == nullptr) if (current_default == nullptr)
current_default = default_codec; current_default = default_codec;
@ -246,6 +265,7 @@ CompressionCodecPtr CompressionCodecFactory::get(const ASTPtr & ast, const IData
throw Exception("Unexpected AST structure for compression codec: " + queryToString(ast), ErrorCodes::UNEXPECTED_AST_STRUCTURE); throw Exception("Unexpected AST structure for compression codec: " + queryToString(ast), ErrorCodes::UNEXPECTED_AST_STRUCTURE);
} }
CompressionCodecPtr CompressionCodecFactory::get(const uint8_t byte_code) const CompressionCodecPtr CompressionCodecFactory::get(const uint8_t byte_code) const
{ {
const auto family_code_and_creator = family_code_with_codec.find(byte_code); const auto family_code_and_creator = family_code_with_codec.find(byte_code);
@ -303,7 +323,7 @@ void CompressionCodecFactory::registerSimpleCompressionCodec(
registerCompressionCodec(family_name, byte_code, [family_name, creator](const ASTPtr & ast) registerCompressionCodec(family_name, byte_code, [family_name, creator](const ASTPtr & ast)
{ {
if (ast) if (ast)
throw Exception("Compression codec " + family_name + " cannot have arguments", ErrorCodes::DATA_TYPE_CANNOT_HAVE_ARGUMENTS); throw Exception(ErrorCodes::DATA_TYPE_CANNOT_HAVE_ARGUMENTS, "Compression codec {} cannot have arguments", family_name);
return creator(); return creator();
}); });
} }

View File

@ -38,16 +38,16 @@ public:
CompressionCodecPtr getDefaultCodec() const; CompressionCodecPtr getDefaultCodec() const;
/// Validate codecs AST specified by user and parses codecs description (substitute default parameters) /// Validate codecs AST specified by user and parses codecs description (substitute default parameters)
ASTPtr validateCodecAndGetPreprocessedAST(const ASTPtr & ast, const IDataType * column_type, bool sanity_check) const; ASTPtr validateCodecAndGetPreprocessedAST(const ASTPtr & ast, const IDataType * column_type, bool sanity_check, bool allow_experimental_codecs) const;
/// Just wrapper for previous method. /// Just wrapper for previous method.
ASTPtr validateCodecAndGetPreprocessedAST(const ASTPtr & ast, const DataTypePtr & column_type, bool sanity_check) const ASTPtr validateCodecAndGetPreprocessedAST(const ASTPtr & ast, const DataTypePtr & column_type, bool sanity_check, bool allow_experimental_codecs) const
{ {
return validateCodecAndGetPreprocessedAST(ast, column_type.get(), sanity_check); return validateCodecAndGetPreprocessedAST(ast, column_type.get(), sanity_check, allow_experimental_codecs);
} }
/// Validate codecs AST specified by user /// Validate codecs AST specified by user
void validateCodec(const String & family_name, std::optional<int> level, bool sanity_check) const; void validateCodec(const String & family_name, std::optional<int> level, bool sanity_check, bool allow_experimental_codecs) const;
/// Get codec by AST and possible column_type. Some codecs can use /// Get codec by AST and possible column_type. Some codecs can use
/// information about type to improve inner settings, but every codec should /// information about type to improve inner settings, but every codec should

View File

@ -73,6 +73,10 @@ public:
/// Is it a generic compression algorithm like lz4, zstd. Usually it does not make sense to apply generic compression more than single time. /// Is it a generic compression algorithm like lz4, zstd. Usually it does not make sense to apply generic compression more than single time.
virtual bool isGenericCompression() const = 0; virtual bool isGenericCompression() const = 0;
/// It is a codec available only for evaluation purposes and not meant to be used in production.
/// It will not be allowed to use unless the user will turn off the safety switch.
virtual bool isExperimental() const { return false; }
/// If it does nothing. /// If it does nothing.
virtual bool isNone() const { return false; } virtual bool isNone() const { return false; }

View File

@ -83,12 +83,10 @@
#define DBMS_MIN_REVISION_WITH_X_FORWARDED_FOR_IN_CLIENT_INFO 54443 #define DBMS_MIN_REVISION_WITH_X_FORWARDED_FOR_IN_CLIENT_INFO 54443
#define DBMS_MIN_REVISION_WITH_REFERER_IN_CLIENT_INFO 54447 #define DBMS_MIN_REVISION_WITH_REFERER_IN_CLIENT_INFO 54447
#define DBMS_MIN_PROTOCOL_VERSION_WITH_DISTRIBUTED_DEPTH 54448
/// Version of ClickHouse TCP protocol. Increment it manually when you change the protocol. /// Version of ClickHouse TCP protocol. Increment it manually when you change the protocol.
#define DBMS_TCP_PROTOCOL_VERSION 54449 #define DBMS_TCP_PROTOCOL_VERSION 54448
#define DBMS_MIN_PROTOCOL_VERSION_WITH_INITIAL_QUERY_START_TIME 54449 #define DBMS_MIN_PROTOCOL_VERSION_WITH_DISTRIBUTED_DEPTH 54448
/// The boundary on which the blocks for asynchronous file operations should be aligned. /// The boundary on which the blocks for asynchronous file operations should be aligned.
#define DEFAULT_AIO_FILE_BLOCK_SIZE 4096 #define DEFAULT_AIO_FILE_BLOCK_SIZE 4096

View File

@ -240,6 +240,7 @@ class IColumn;
M(Bool, empty_result_for_aggregation_by_empty_set, false, "Return empty result when aggregating without keys on empty set.", 0) \ M(Bool, empty_result_for_aggregation_by_empty_set, false, "Return empty result when aggregating without keys on empty set.", 0) \
M(Bool, allow_distributed_ddl, true, "If it is set to true, then a user is allowed to executed distributed DDL queries.", 0) \ M(Bool, allow_distributed_ddl, true, "If it is set to true, then a user is allowed to executed distributed DDL queries.", 0) \
M(Bool, allow_suspicious_codecs, false, "If it is set to true, allow to specify meaningless compression codecs.", 0) \ M(Bool, allow_suspicious_codecs, false, "If it is set to true, allow to specify meaningless compression codecs.", 0) \
M(Bool, allow_experimental_codecs, false, "If it is set to true, allow to specify experimental compression codecs (but we don't have those yet and this option does nothing).", 0) \
M(UInt64, odbc_max_field_size, 1024, "Max size of filed can be read from ODBC dictionary. Long strings are truncated.", 0) \ M(UInt64, odbc_max_field_size, 1024, "Max size of filed can be read from ODBC dictionary. Long strings are truncated.", 0) \
M(UInt64, query_profiler_real_time_period_ns, 1000000000, "Period for real clock timer of query profiler (in nanoseconds). Set 0 value to turn off the real clock query profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \ M(UInt64, query_profiler_real_time_period_ns, 1000000000, "Period for real clock timer of query profiler (in nanoseconds). Set 0 value to turn off the real clock query profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \
M(UInt64, query_profiler_cpu_time_period_ns, 1000000000, "Period for CPU clock timer of query profiler (in nanoseconds). Set 0 value to turn off the CPU clock query profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \ M(UInt64, query_profiler_cpu_time_period_ns, 1000000000, "Period for CPU clock timer of query profiler (in nanoseconds). Set 0 value to turn off the CPU clock query profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \

View File

@ -7,7 +7,6 @@ PEERDIR(
clickhouse/src/Common clickhouse/src/Common
) )
SRCS( SRCS(
DiskCacheWrapper.cpp DiskCacheWrapper.cpp
DiskDecorator.cpp DiskDecorator.cpp
@ -16,13 +15,21 @@ SRCS(
DiskMemory.cpp DiskMemory.cpp
DiskRestartProxy.cpp DiskRestartProxy.cpp
DiskSelector.cpp DiskSelector.cpp
HDFS/DiskHDFS.cpp
IDisk.cpp IDisk.cpp
IDiskRemote.cpp
IVolume.cpp IVolume.cpp
LocalDirectorySyncGuard.cpp LocalDirectorySyncGuard.cpp
ReadIndirectBufferFromRemoteFS.cpp
S3/DiskS3.cpp
S3/ProxyListConfiguration.cpp
S3/ProxyResolverConfiguration.cpp
S3/registerDiskS3.cpp
SingleDiskVolume.cpp SingleDiskVolume.cpp
StoragePolicy.cpp StoragePolicy.cpp
VolumeJBOD.cpp VolumeJBOD.cpp
VolumeRAID1.cpp VolumeRAID1.cpp
WriteIndirectBufferFromRemoteFS.cpp
createVolume.cpp createVolume.cpp
registerDisks.cpp registerDisks.cpp

View File

@ -6,7 +6,7 @@ PEERDIR(
clickhouse/src/Common clickhouse/src/Common
) )
SRCS(
<? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -F 'S3|HDFS' | sed 's/^\.\// /' | sort ?> <? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -F 'S3|HDFS' | sed 's/^\.\// /' | sort ?>
) )

View File

@ -34,12 +34,6 @@ void ClientInfo::write(WriteBuffer & out, const UInt64 server_protocol_revision)
writeBinary(initial_query_id, out); writeBinary(initial_query_id, out);
writeBinary(initial_address.toString(), out); writeBinary(initial_address.toString(), out);
if (server_protocol_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_INITIAL_QUERY_START_TIME)
{
writeBinary(initial_query_start_time, out);
writeBinary(initial_query_start_time_microseconds, out);
}
writeBinary(UInt8(interface), out); writeBinary(UInt8(interface), out);
if (interface == Interface::TCP) if (interface == Interface::TCP)
@ -115,12 +109,6 @@ void ClientInfo::read(ReadBuffer & in, const UInt64 client_protocol_revision)
readBinary(initial_address_string, in); readBinary(initial_address_string, in);
initial_address = Poco::Net::SocketAddress(initial_address_string); initial_address = Poco::Net::SocketAddress(initial_address_string);
if (client_protocol_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_INITIAL_QUERY_START_TIME)
{
readBinary(initial_query_start_time, in);
readBinary(initial_query_start_time_microseconds, in);
}
UInt8 read_interface = 0; UInt8 read_interface = 0;
readBinary(read_interface, in); readBinary(read_interface, in);
interface = Interface(read_interface); interface = Interface(read_interface);

View File

@ -61,8 +61,6 @@ public:
String initial_user; String initial_user;
String initial_query_id; String initial_query_id;
Poco::Net::SocketAddress initial_address; Poco::Net::SocketAddress initial_address;
time_t initial_query_start_time{};
Decimal64 initial_query_start_time_microseconds{};
// OpenTelemetry trace context we received from client, or which we are going // OpenTelemetry trace context we received from client, or which we are going
// to send to server. // to send to server.

View File

@ -83,6 +83,17 @@ ContextMutablePtr updateSettingsForCluster(const Cluster & cluster, ContextPtr c
} }
} }
if (settings.offset)
{
new_settings.offset = 0;
new_settings.offset.changed = false;
}
if (settings.limit)
{
new_settings.limit = 0;
new_settings.limit.changed = false;
}
auto new_context = Context::createCopy(context); auto new_context = Context::createCopy(context);
new_context->setSettings(new_settings); new_context->setSettings(new_settings);
return new_context; return new_context;

View File

@ -447,6 +447,8 @@ ColumnsDescription InterpreterCreateQuery::getColumnsDescription(
defaults_sample_block = validateColumnsDefaultsAndGetSampleBlock(default_expr_list, column_names_and_types, context_); defaults_sample_block = validateColumnsDefaultsAndGetSampleBlock(default_expr_list, column_names_and_types, context_);
bool sanity_check_compression_codecs = !attach && !context_->getSettingsRef().allow_suspicious_codecs; bool sanity_check_compression_codecs = !attach && !context_->getSettingsRef().allow_suspicious_codecs;
bool allow_experimental_codecs = attach || context_->getSettingsRef().allow_experimental_codecs;
ColumnsDescription res; ColumnsDescription res;
auto name_type_it = column_names_and_types.begin(); auto name_type_it = column_names_and_types.begin();
for (auto ast_it = columns_ast.children.begin(); ast_it != columns_ast.children.end(); ++ast_it, ++name_type_it) for (auto ast_it = columns_ast.children.begin(); ast_it != columns_ast.children.end(); ++ast_it, ++name_type_it)
@ -481,7 +483,7 @@ ColumnsDescription InterpreterCreateQuery::getColumnsDescription(
if (col_decl.default_specifier == "ALIAS") if (col_decl.default_specifier == "ALIAS")
throw Exception{"Cannot specify codec for column type ALIAS", ErrorCodes::BAD_ARGUMENTS}; throw Exception{"Cannot specify codec for column type ALIAS", ErrorCodes::BAD_ARGUMENTS};
column.codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST( column.codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(
col_decl.codec, column.type, sanity_check_compression_codecs); col_decl.codec, column.type, sanity_check_compression_codecs, allow_experimental_codecs);
} }
if (col_decl.ttl) if (col_decl.ttl)

View File

@ -78,8 +78,6 @@ Block QueryLogElement::createBlock()
{std::make_shared<DataTypeString>(), "initial_query_id"}, {std::make_shared<DataTypeString>(), "initial_query_id"},
{DataTypeFactory::instance().get("IPv6"), "initial_address"}, {DataTypeFactory::instance().get("IPv6"), "initial_address"},
{std::make_shared<DataTypeUInt16>(), "initial_port"}, {std::make_shared<DataTypeUInt16>(), "initial_port"},
{std::make_shared<DataTypeDateTime>(), "initial_query_start_time"},
{std::make_shared<DataTypeDateTime64>(6), "initial_query_start_time_microseconds"},
{std::make_shared<DataTypeUInt8>(), "interface"}, {std::make_shared<DataTypeUInt8>(), "interface"},
{std::make_shared<DataTypeString>(), "os_user"}, {std::make_shared<DataTypeString>(), "os_user"},
{std::make_shared<DataTypeString>(), "client_hostname"}, {std::make_shared<DataTypeString>(), "client_hostname"},
@ -258,8 +256,6 @@ void QueryLogElement::appendClientInfo(const ClientInfo & client_info, MutableCo
columns[i++]->insert(client_info.initial_query_id); columns[i++]->insert(client_info.initial_query_id);
columns[i++]->insertData(IPv6ToBinary(client_info.initial_address.host()).data(), 16); columns[i++]->insertData(IPv6ToBinary(client_info.initial_address.host()).data(), 16);
columns[i++]->insert(client_info.initial_address.port()); columns[i++]->insert(client_info.initial_address.port());
columns[i++]->insert(client_info.initial_query_start_time);
columns[i++]->insert(client_info.initial_query_start_time_microseconds);
columns[i++]->insert(UInt64(client_info.interface)); columns[i++]->insert(UInt64(client_info.interface));

View File

@ -51,8 +51,6 @@ Block QueryThreadLogElement::createBlock()
{std::make_shared<DataTypeString>(), "initial_query_id"}, {std::make_shared<DataTypeString>(), "initial_query_id"},
{DataTypeFactory::instance().get("IPv6"), "initial_address"}, {DataTypeFactory::instance().get("IPv6"), "initial_address"},
{std::make_shared<DataTypeUInt16>(), "initial_port"}, {std::make_shared<DataTypeUInt16>(), "initial_port"},
{std::make_shared<DataTypeDateTime>(), "initial_query_start_time"},
{std::make_shared<DataTypeDateTime64>(6), "initial_query_start_time_microseconds"},
{std::make_shared<DataTypeUInt8>(), "interface"}, {std::make_shared<DataTypeUInt8>(), "interface"},
{std::make_shared<DataTypeString>(), "os_user"}, {std::make_shared<DataTypeString>(), "os_user"},
{std::make_shared<DataTypeString>(), "client_hostname"}, {std::make_shared<DataTypeString>(), "client_hostname"},

View File

@ -543,6 +543,40 @@ BoolMask MergeTreeSetIndex::checkInRange(const std::vector<Range> & key_ranges,
auto left_lower = std::lower_bound(indices.begin(), indices.end(), left_point, less); auto left_lower = std::lower_bound(indices.begin(), indices.end(), left_point, less);
auto right_lower = std::lower_bound(indices.begin(), indices.end(), right_point, less); auto right_lower = std::lower_bound(indices.begin(), indices.end(), right_point, less);
/// A special case of 1-element KeyRange. It's useful for partition pruning
bool one_element_range = true;
for (size_t i = 0; i < tuple_size; ++i)
{
auto & left = left_point[i];
auto & right = right_point[i];
if (left.getType() == right.getType())
{
if (left.getType() == ValueWithInfinity::NORMAL)
{
if (0 != left.getColumnIfFinite().compareAt(0, 0, right.getColumnIfFinite(), 1))
{
one_element_range = false;
break;
}
}
}
else
{
one_element_range = false;
break;
}
}
if (one_element_range)
{
/// Here we know that there is one element in range.
/// The main difference with the normal case is that we can definitely say that
/// condition in this range always TRUE (can_be_false = 0) xor always FALSE (can_be_true = 0).
if (left_lower != indices.end() && equals(*left_lower, left_point))
return {true, false};
else
return {false, true};
}
return return
{ {
left_lower != right_lower left_lower != right_lower

View File

@ -224,8 +224,9 @@ namespace
} }
} }
} }
else if (const auto * tuple_literal = right->as<ASTLiteral>(); else if (const auto * tuple_literal = right->as<ASTLiteral>(); tuple_literal)
tuple_literal && tuple_literal->value.getType() == Field::Types::Tuple) {
if (tuple_literal->value.getType() == Field::Types::Tuple)
{ {
const auto & tuple = tuple_literal->value.get<const Tuple &>(); const auto & tuple = tuple_literal->value.get<const Tuple &>();
for (const auto & child : tuple) for (const auto & child : tuple)
@ -243,6 +244,9 @@ namespace
} }
} }
} }
else
return analyzeEquals(identifier, tuple_literal, expr);
}
else else
{ {
return {}; return {};

View File

@ -355,15 +355,6 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
{ {
const auto current_time = std::chrono::system_clock::now(); const auto current_time = std::chrono::system_clock::now();
auto & client_info = context->getClientInfo();
// If it's an initial query, set to current_time
if (client_info.initial_query_start_time == 0)
{
client_info.initial_query_start_time = time_in_seconds(current_time);
client_info.initial_query_start_time_microseconds = time_in_microseconds(current_time);
}
#if !defined(ARCADIA_BUILD) #if !defined(ARCADIA_BUILD)
assert(internal || CurrentThread::get().getQueryContext()); assert(internal || CurrentThread::get().getQueryContext());
assert(internal || CurrentThread::get().getQueryContext()->getCurrentQueryId() == CurrentThread::getQueryId()); assert(internal || CurrentThread::get().getQueryContext()->getCurrentQueryId() == CurrentThread::getQueryId());
@ -652,7 +643,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
elem.query = query_for_logging; elem.query = query_for_logging;
elem.normalized_query_hash = normalizedQueryHash<false>(query_for_logging); elem.normalized_query_hash = normalizedQueryHash<false>(query_for_logging);
elem.client_info = client_info; elem.client_info = context->getClientInfo();
bool log_queries = settings.log_queries && !internal; bool log_queries = settings.log_queries && !internal;

View File

@ -1135,8 +1135,7 @@ void TCPHandler::receiveQuery()
/// Per query settings are also passed via TCP. /// Per query settings are also passed via TCP.
/// We need to check them before applying due to they can violate the settings constraints. /// We need to check them before applying due to they can violate the settings constraints.
auto settings_format = (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS) auto settings_format = (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS) ? SettingsWriteFormat::STRINGS_WITH_FLAGS
? SettingsWriteFormat::STRINGS_WITH_FLAGS
: SettingsWriteFormat::BINARY; : SettingsWriteFormat::BINARY;
Settings passed_settings; Settings passed_settings;
passed_settings.read(*in, settings_format); passed_settings.read(*in, settings_format);
@ -1399,7 +1398,7 @@ void TCPHandler::initBlockOutput(const Block & block)
if (state.compression == Protocol::Compression::Enable) if (state.compression == Protocol::Compression::Enable)
{ {
CompressionCodecFactory::instance().validateCodec(method, level, !query_settings.allow_suspicious_codecs); CompressionCodecFactory::instance().validateCodec(method, level, !query_settings.allow_suspicious_codecs, query_settings.allow_experimental_codecs);
state.maybe_compressed_out = std::make_shared<CompressedWriteBuffer>( state.maybe_compressed_out = std::make_shared<CompressedWriteBuffer>(
*out, CompressionCodecFactory::instance().get(method, level)); *out, CompressionCodecFactory::instance().get(method, level));

View File

@ -348,7 +348,7 @@ void AlterCommand::apply(StorageInMemoryMetadata & metadata, ContextPtr context)
column.comment = *comment; column.comment = *comment;
if (codec) if (codec)
column.codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(codec, data_type, false); column.codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(codec, data_type, false, true);
column.ttl = ttl; column.ttl = ttl;
@ -389,7 +389,7 @@ void AlterCommand::apply(StorageInMemoryMetadata & metadata, ContextPtr context)
else else
{ {
if (codec) if (codec)
column.codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(codec, data_type ? data_type : column.type, false); column.codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(codec, data_type ? data_type : column.type, false, true);
if (comment) if (comment)
column.comment = *comment; column.comment = *comment;
@ -995,7 +995,7 @@ void AlterCommands::validate(const StorageInMemoryMetadata & metadata, ContextPt
ErrorCodes::BAD_ARGUMENTS}; ErrorCodes::BAD_ARGUMENTS};
if (command.codec) if (command.codec)
CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(command.codec, command.data_type, !context->getSettingsRef().allow_suspicious_codecs); CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(command.codec, command.data_type, !context->getSettingsRef().allow_suspicious_codecs, context->getSettingsRef().allow_experimental_codecs);
all_columns.add(ColumnDescription(column_name, command.data_type)); all_columns.add(ColumnDescription(column_name, command.data_type));
} }
@ -1015,7 +1015,7 @@ void AlterCommands::validate(const StorageInMemoryMetadata & metadata, ContextPt
ErrorCodes::NOT_IMPLEMENTED}; ErrorCodes::NOT_IMPLEMENTED};
if (command.codec) if (command.codec)
CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(command.codec, command.data_type, !context->getSettingsRef().allow_suspicious_codecs); CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(command.codec, command.data_type, !context->getSettingsRef().allow_suspicious_codecs, context->getSettingsRef().allow_experimental_codecs);
auto column_default = all_columns.getDefault(column_name); auto column_default = all_columns.getDefault(column_name);
if (column_default) if (column_default)
{ {

View File

@ -128,7 +128,7 @@ void ColumnDescription::readText(ReadBuffer & buf)
comment = col_ast->comment->as<ASTLiteral &>().value.get<String>(); comment = col_ast->comment->as<ASTLiteral &>().value.get<String>();
if (col_ast->codec) if (col_ast->codec)
codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(col_ast->codec, type, false); codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(col_ast->codec, type, false, true);
if (col_ast->ttl) if (col_ast->ttl)
ttl = col_ast->ttl; ttl = col_ast->ttl;

View File

@ -632,7 +632,7 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
if (compression_method == "ZSTD") if (compression_method == "ZSTD")
compression_level = settings.network_zstd_compression_level; compression_level = settings.network_zstd_compression_level;
CompressionCodecFactory::instance().validateCodec(compression_method, compression_level, !settings.allow_suspicious_codecs); CompressionCodecFactory::instance().validateCodec(compression_method, compression_level, !settings.allow_suspicious_codecs, settings.allow_experimental_codecs);
CompressionCodecPtr compression_codec = CompressionCodecFactory::instance().get(compression_method, compression_level); CompressionCodecPtr compression_codec = CompressionCodecFactory::instance().get(compression_method, compression_level);
/// tmp directory is used to ensure atomicity of transactions /// tmp directory is used to ensure atomicity of transactions

View File

@ -189,7 +189,12 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const Stor
if (action_type == ActionType::ADD_PART) if (action_type == ActionType::ADD_PART)
{ {
MergedBlockOutputStream part_out(part, metadata_snapshot, block.getNamesAndTypesList(), {}, CompressionCodecFactory::instance().get("NONE", {})); MergedBlockOutputStream part_out(
part,
metadata_snapshot,
block.getNamesAndTypesList(),
{},
CompressionCodecFactory::instance().get("NONE", {}));
part->minmax_idx.update(block, storage.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey())); part->minmax_idx.update(block, storage.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));
part->partition.create(metadata_snapshot, block, 0, context); part->partition.create(metadata_snapshot, block, 0, context);

View File

@ -441,10 +441,6 @@ Strings ReplicatedMergeTreeLogEntryData::getVirtualPartNames(MergeTreeDataFormat
if (type == DROP_RANGE) if (type == DROP_RANGE)
return {new_part_name}; return {new_part_name};
/// CLEAR_COLUMN and CLEAR_INDEX are deprecated since 20.3
if (type == CLEAR_COLUMN || type == CLEAR_INDEX)
return {};
if (type == REPLACE_RANGE) if (type == REPLACE_RANGE)
{ {
Strings res = replace_range_entry->new_part_names; Strings res = replace_range_entry->new_part_names;

View File

@ -140,18 +140,6 @@ struct ReplicatedMergeTreeLogEntryData
/// selection of merges. These parts are added to queue.virtual_parts. /// selection of merges. These parts are added to queue.virtual_parts.
Strings getVirtualPartNames(MergeTreeDataFormatVersion format_version) const; Strings getVirtualPartNames(MergeTreeDataFormatVersion format_version) const;
/// Returns set of parts that denote the block number ranges that should be blocked during the entry execution.
/// These parts are added to future_parts.
Strings getBlockingPartNames(MergeTreeDataFormatVersion format_version) const
{
Strings res = getVirtualPartNames(format_version);
if (type == CLEAR_COLUMN)
res.emplace_back(new_part_name);
return res;
}
/// Returns fake part for drop range (for DROP_RANGE and REPLACE_RANGE) /// Returns fake part for drop range (for DROP_RANGE and REPLACE_RANGE)
std::optional<String> getDropRange(MergeTreeDataFormatVersion format_version) const; std::optional<String> getDropRange(MergeTreeDataFormatVersion format_version) const;

View File

@ -1024,17 +1024,11 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
{ {
/// If our entry produce part which is already covered by /// If our entry produce part which is already covered by
/// some other entry which is currently executing, then we can postpone this entry. /// some other entry which is currently executing, then we can postpone this entry.
if (entry.type == LogEntry::MERGE_PARTS for (const String & new_part_name : entry.getVirtualPartNames(format_version))
|| entry.type == LogEntry::GET_PART
|| entry.type == LogEntry::ATTACH_PART
|| entry.type == LogEntry::MUTATE_PART)
{
for (const String & new_part_name : entry.getBlockingPartNames(format_version))
{ {
if (!isNotCoveredByFuturePartsImpl(entry.znode_name, new_part_name, out_postpone_reason, state_lock)) if (!isNotCoveredByFuturePartsImpl(entry.znode_name, new_part_name, out_postpone_reason, state_lock))
return false; return false;
} }
}
/// Check that fetches pool is not overloaded /// Check that fetches pool is not overloaded
if ((entry.type == LogEntry::GET_PART || entry.type == LogEntry::ATTACH_PART) if ((entry.type == LogEntry::GET_PART || entry.type == LogEntry::ATTACH_PART)
@ -1247,7 +1241,7 @@ ReplicatedMergeTreeQueue::CurrentlyExecuting::CurrentlyExecuting(const Replicate
++entry->num_tries; ++entry->num_tries;
entry->last_attempt_time = time(nullptr); entry->last_attempt_time = time(nullptr);
for (const String & new_part_name : entry->getBlockingPartNames(queue.format_version)) for (const String & new_part_name : entry->getVirtualPartNames(queue.format_version))
{ {
if (!queue.future_parts.emplace(new_part_name, entry).second) if (!queue.future_parts.emplace(new_part_name, entry).second)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Tagging already tagged future part {}. This is a bug. " throw Exception(ErrorCodes::LOGICAL_ERROR, "Tagging already tagged future part {}. This is a bug. "
@ -1288,7 +1282,7 @@ ReplicatedMergeTreeQueue::CurrentlyExecuting::~CurrentlyExecuting()
entry->currently_executing = false; entry->currently_executing = false;
entry->execution_complete.notify_all(); entry->execution_complete.notify_all();
for (const String & new_part_name : entry->getBlockingPartNames(queue.format_version)) for (const String & new_part_name : entry->getVirtualPartNames(queue.format_version))
{ {
if (!queue.future_parts.erase(new_part_name)) if (!queue.future_parts.erase(new_part_name))
{ {

View File

@ -609,7 +609,7 @@ void StorageDistributed::read(
ClusterProxy::executeQuery(query_plan, select_stream_factory, log, ClusterProxy::executeQuery(query_plan, select_stream_factory, log,
modified_query_ast, local_context, query_info, modified_query_ast, local_context, query_info,
sharding_key_expr, sharding_key_column_name, sharding_key_expr, sharding_key_column_name,
getCluster()); query_info.cluster);
/// This is a bug, it is possible only when there is no shards to query, and this is handled earlier. /// This is a bug, it is possible only when there is no shards to query, and this is handled earlier.
if (!query_plan.isInitialized()) if (!query_plan.isInitialized())

View File

@ -5357,56 +5357,6 @@ bool StorageReplicatedMergeTree::waitForTableReplicaToProcessLogEntry(
event->tryWait(event_wait_timeout_ms); event->tryWait(event_wait_timeout_ms);
} }
} }
else if (startsWith(entry.znode_name, "queue-"))
{
/** In this case, the number of `log` node is unknown. You need look through everything from `log_pointer` to the end,
* looking for a node with the same content. And if we do not find it - then the replica has already taken this entry in its queue.
*/
String log_pointer = getZooKeeper()->get(fs::path(table_zookeeper_path) / "replicas" / replica / "log_pointer");
Strings log_entries = getZooKeeper()->getChildren(fs::path(table_zookeeper_path) / "log");
UInt64 log_index = 0;
bool found = false;
for (const String & log_entry_name : log_entries)
{
log_index = parse<UInt64>(log_entry_name.substr(log_entry_name.size() - 10));
if (!log_pointer.empty() && log_index < parse<UInt64>(log_pointer))
continue;
String log_entry_str;
bool exists = getZooKeeper()->tryGet(fs::path(table_zookeeper_path) / "log" / log_entry_name, log_entry_str);
if (exists && entry_str == log_entry_str)
{
found = true;
log_node_name = log_entry_name;
break;
}
}
if (found)
{
LOG_DEBUG(log, "Waiting for {} to pull {} to queue", replica, log_node_name);
/// Let's wait until the entry gets into the replica queue.
while (!stop_waiting())
{
zkutil::EventPtr event = std::make_shared<Poco::Event>();
String log_pointer_new = getZooKeeper()->get(fs::path(table_zookeeper_path) / "replicas" / replica / "log_pointer", nullptr, event);
if (!log_pointer_new.empty() && parse<UInt64>(log_pointer_new) > log_index)
break;
/// Wait with timeout because we can be already shut down, but not dropped.
/// So log_pointer node will exist, but we will never update it because all background threads already stopped.
/// It can lead to query hung because table drop query can wait for some query (alter, optimize, etc) which called this method,
/// but the query will never finish because the drop already shut down the table.
event->tryWait(event_wait_timeout_ms);
}
}
}
else else
throw Exception("Logical error: unexpected name of log node: " + entry.znode_name, ErrorCodes::LOGICAL_ERROR); throw Exception("Logical error: unexpected name of log node: " + entry.znode_name, ErrorCodes::LOGICAL_ERROR);

View File

@ -3,8 +3,9 @@
#include <DataTypes/DataTypeDateTime.h> #include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeArray.h> #include <DataTypes/DataTypeArray.h>
#include <Storages/System/StorageSystemErrors.h> #include <Storages/System/StorageSystemErrors.h>
#include <Common/ErrorCodes.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Common/ErrorCodes.h>
namespace DB namespace DB
{ {

View File

@ -289,7 +289,7 @@ TTLDescription TTLDescription::getTTLFromAST(
{ {
result.recompression_codec = result.recompression_codec =
CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST( CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(
ttl_element->recompression_codec, {}, !context->getSettingsRef().allow_suspicious_codecs); ttl_element->recompression_codec, {}, !context->getSettingsRef().allow_suspicious_codecs, context->getSettingsRef().allow_experimental_codecs);
} }
} }

View File

@ -1,6 +1,32 @@
import subprocess
import pytest
import logging
from helpers.test_tools import TSV from helpers.test_tools import TSV
from helpers.network import _NetworkManager
@pytest.fixture(autouse=True, scope="session")
def cleanup_environment():
_NetworkManager.clean_all_user_iptables_rules()
try:
result = subprocess.run(['docker', 'container', 'list', '-a', '|', 'wc', '-l'])
if result.returncode != 0:
logging.error(f"docker ps returned error:{str(result.stderr)}")
else:
if int(result.stdout) > 1:
if env["PYTEST_CLEANUP_CONTAINERS"] != 1:
logging.warning(f"Docker containters({result.stdout}) are running before tests run. They can be left from previous pytest run and cause test failures.\n"\
"You can set env PYTEST_CLEANUP_CONTAINERS=1 or use runner with --cleanup-containers argument to enable automatic containers cleanup.")
else:
logging.debug("Trying to kill unstopped containers...")
subprocess.run(['docker', 'kill', f'`docker container list -a`'])
subprocess.run(['docker', 'rm', f'`docker container list -a`'])
logging.debug("Unstopped containers killed")
r = subprocess.run(['docker-compose', 'ps', '--services', '--all'])
logging.debug(f"Docker ps before start:{r.stdout}")
else:
logging.debug(f"No running containers")
except Exception as e:
logging.error(f"cleanup_environment:{str(e)}")
pass
def pytest_assertrepr_compare(op, left, right): yield
if isinstance(left, TSV) and isinstance(right, TSV) and op == '==':
return ['TabSeparated values differ: '] + left.diff(right)

View File

@ -373,7 +373,7 @@ class ClickHouseCluster:
def cleanup(self): def cleanup(self):
# Just in case kill unstopped containers from previous launch # Just in case kill unstopped containers from previous launch
try: try:
result = run_and_check(['docker', 'container', 'list', '-a', '-f name={self.project_name}']) result = run_and_check(['docker', 'container', 'list', '-a', '-f name={self.project_name}', '|', 'wc', '-l'])
if int(result) > 1: if int(result) > 1:
logging.debug("Trying to kill unstopped containers...") logging.debug("Trying to kill unstopped containers...")
run_and_check(['docker', 'kill', f'`docker container list -a -f name={self.project_name}`']) run_and_check(['docker', 'kill', f'`docker container list -a -f name={self.project_name}`'])
@ -663,7 +663,6 @@ class ClickHouseCluster:
'--file', p.join(docker_compose_yml_dir, 'docker_compose_cassandra.yml')] '--file', p.join(docker_compose_yml_dir, 'docker_compose_cassandra.yml')]
return self.base_cassandra_cmd return self.base_cassandra_cmd
def add_instance(self, name, base_config_dir=None, main_configs=None, user_configs=None, dictionaries=None, def add_instance(self, name, base_config_dir=None, main_configs=None, user_configs=None, dictionaries=None,
macros=None, with_zookeeper=False, with_zookeeper_secure=False, macros=None, with_zookeeper=False, with_zookeeper_secure=False,
with_mysql_client=False, with_mysql=False, with_mysql8=False, with_mysql_cluster=False, with_mysql_client=False, with_mysql=False, with_mysql8=False, with_mysql_cluster=False,

View File

@ -136,6 +136,17 @@ class _NetworkManager:
cmd.extend(self._iptables_cmd_suffix(**kwargs)) cmd.extend(self._iptables_cmd_suffix(**kwargs))
self._exec_run_with_retry(cmd, retry_count=3, privileged=True) self._exec_run_with_retry(cmd, retry_count=3, privileged=True)
@staticmethod
def clean_all_user_iptables_rules():
for i in range(1000):
iptables_iter = i
# when rules will be empty, it will return error
res = subprocess.run("iptables -D DOCKER-USER 1", shell=True)
if res.returncode != 0:
logging.info("All iptables rules cleared, " + str(iptables_iter) + "iterations, last error: " + str(res.stderr))
return
@staticmethod @staticmethod
def _iptables_cmd_suffix( def _iptables_cmd_suffix(
source=None, destination=None, source=None, destination=None,

View File

@ -183,6 +183,13 @@ if __name__ == "__main__":
dest="tmpfs", dest="tmpfs",
help="Use tmpfs for dockerd files") help="Use tmpfs for dockerd files")
parser.add_argument(
"--cleanup-containers",
action='store_true',
default=False,
dest="cleanup_containers",
help="Remove all running containers on test session start")
parser.add_argument( parser.add_argument(
"--dockerd-volume-dir", "--dockerd-volume-dir",
action='store', action='store',
@ -241,6 +248,10 @@ if __name__ == "__main__":
subprocess.check_call('docker volume create {name}_volume'.format(name=CONTAINER_NAME), shell=True) subprocess.check_call('docker volume create {name}_volume'.format(name=CONTAINER_NAME), shell=True)
dockerd_internal_volume = "--volume={}_volume:/var/lib/docker".format(CONTAINER_NAME) dockerd_internal_volume = "--volume={}_volume:/var/lib/docker".format(CONTAINER_NAME)
# If enabled we kill and remove containers before pytest session run.
env_cleanup = ""
if args.cleanup_containers:
env_cleanup = "-e PYTEST_CLEANUP_CONTAINERS=1"
# enable tty mode & interactive for docker if we have real tty # enable tty mode & interactive for docker if we have real tty
tty = "" tty = ""
if sys.stdout.isatty() and sys.stdin.isatty(): if sys.stdout.isatty() and sys.stdin.isatty():
@ -253,7 +264,7 @@ if __name__ == "__main__":
--volume={base_cfg}:/clickhouse-config --volume={cases_dir}:/ClickHouse/tests/integration \ --volume={base_cfg}:/clickhouse-config --volume={cases_dir}:/ClickHouse/tests/integration \
--volume={src_dir}/Server/grpc_protos:/ClickHouse/src/Server/grpc_protos \ --volume={src_dir}/Server/grpc_protos:/ClickHouse/src/Server/grpc_protos \
{dockerd_internal_volume} -e DOCKER_CLIENT_TIMEOUT=300 -e COMPOSE_HTTP_TIMEOUT=600 \ {dockerd_internal_volume} -e DOCKER_CLIENT_TIMEOUT=300 -e COMPOSE_HTTP_TIMEOUT=600 \
{env_tags} -e PYTEST_OPTS='{parallel} {opts} {tests_list}' {img} {command}".format( {env_tags} {env_cleanup} -e PYTEST_OPTS='{parallel} {opts} {tests_list}' {img} {command}".format(
net=net, net=net,
tty=tty, tty=tty,
bin=args.binary, bin=args.binary,
@ -263,6 +274,7 @@ if __name__ == "__main__":
cases_dir=args.cases_dir, cases_dir=args.cases_dir,
src_dir=args.src_dir, src_dir=args.src_dir,
env_tags=env_tags, env_tags=env_tags,
env_cleanup=env_cleanup,
parallel=parallel_args, parallel=parallel_args,
opts=' '.join(args.pytest_args), opts=' '.join(args.pytest_args),
tests_list=' '.join(args.tests_list), tests_list=' '.join(args.tests_list),

View File

@ -17,11 +17,12 @@ def run_endpoint(cluster):
cluster.exec_in_container(container_id, ["python", "endpoint.py"], detach=True) cluster.exec_in_container(container_id, ["python", "endpoint.py"], detach=True)
# Wait for S3 endpoint start # Wait for S3 endpoint start
for attempt in range(10): num_attempts = 100
for attempt in range(num_attempts):
ping_response = cluster.exec_in_container(cluster.get_container_id('resolver'), ping_response = cluster.exec_in_container(cluster.get_container_id('resolver'),
["curl", "-s", "http://resolver:8080/"], nothrow=True) ["curl", "-s", "http://resolver:8080/"], nothrow=True)
if ping_response != 'OK': if ping_response != 'OK':
if attempt == 9: if attempt == num_attempts - 1:
assert ping_response == 'OK', 'Expected "OK", but got "{}"'.format(ping_response) assert ping_response == 'OK', 'Expected "OK", but got "{}"'.format(ping_response)
else: else:
time.sleep(1) time.sleep(1)

View File

@ -0,0 +1,7 @@
<yandex>
<profiles>
<default>
<allow_experimental_codecs>1</allow_experimental_codecs>
</default>
</profiles>
</yandex>

View File

@ -17,7 +17,8 @@ node4 = cluster.add_instance('node4', user_configs=['configs/enable_uncompressed
node5 = cluster.add_instance('node5', main_configs=['configs/zstd_compression_by_default.xml'], node5 = cluster.add_instance('node5', main_configs=['configs/zstd_compression_by_default.xml'],
user_configs=['configs/enable_uncompressed_cache.xml', user_configs=['configs/enable_uncompressed_cache.xml',
'configs/allow_suspicious_codecs.xml']) 'configs/allow_suspicious_codecs.xml'])
node6 = cluster.add_instance('node6', main_configs=['configs/allow_experimental_codecs.xml'],
user_configs=['configs/allow_suspicious_codecs.xml'])
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def start_cluster(): def start_cluster():

View File

@ -22,6 +22,8 @@ def started_cluster():
finally: finally:
cluster.shutdown() cluster.shutdown()
# TODO Remove it and enable test
@pytest.mark.skip(reason="Don't work in parallel mode for some reason")
def test_read_table(started_cluster): def test_read_table(started_cluster):
hdfs_api = started_cluster.make_hdfs_api(kerberized=True) hdfs_api = started_cluster.make_hdfs_api(kerberized=True)
@ -34,7 +36,8 @@ def test_read_table(started_cluster):
select_read = node1.query("select * from hdfs('hdfs://kerberizedhdfs1:9010/simple_table_function', 'TSV', 'id UInt64, text String, number Float64')") select_read = node1.query("select * from hdfs('hdfs://kerberizedhdfs1:9010/simple_table_function', 'TSV', 'id UInt64, text String, number Float64')")
assert select_read == data assert select_read == data
# TODO Remove it and enable test
@pytest.mark.skip(reason="Don't work in parallel mode for some reason")
def test_read_write_storage(started_cluster): def test_read_write_storage(started_cluster):
hdfs_api = started_cluster.make_hdfs_api(kerberized=True) hdfs_api = started_cluster.make_hdfs_api(kerberized=True)
@ -47,7 +50,8 @@ def test_read_write_storage(started_cluster):
select_read = node1.query("select * from SimpleHDFSStorage2") select_read = node1.query("select * from SimpleHDFSStorage2")
assert select_read == "1\tMark\t72.53\n" assert select_read == "1\tMark\t72.53\n"
# TODO Remove it and enable test
@pytest.mark.skip(reason="Don't work in parallel mode for some reason")
def test_write_storage_not_expired(started_cluster): def test_write_storage_not_expired(started_cluster):
hdfs_api = started_cluster.make_hdfs_api(kerberized=True) hdfs_api = started_cluster.make_hdfs_api(kerberized=True)
@ -62,7 +66,8 @@ def test_write_storage_not_expired(started_cluster):
select_read = node1.query("select * from SimpleHDFSStorageNotExpired") select_read = node1.query("select * from SimpleHDFSStorageNotExpired")
assert select_read == "1\tMark\t72.53\n" assert select_read == "1\tMark\t72.53\n"
# TODO Remove it and enable test
@pytest.mark.skip(reason="Don't work in parallel mode for some reason")
def test_two_users(started_cluster): def test_two_users(started_cluster):
hdfs_api = started_cluster.make_hdfs_api(kerberized=True) hdfs_api = started_cluster.make_hdfs_api(kerberized=True)
@ -76,6 +81,8 @@ def test_two_users(started_cluster):
select_read_2 = node1.query("select * from hdfs('hdfs://suser@kerberizedhdfs1:9010/storage_user_one', 'TSV', 'id UInt64, text String, number Float64')") select_read_2 = node1.query("select * from hdfs('hdfs://suser@kerberizedhdfs1:9010/storage_user_one', 'TSV', 'id UInt64, text String, number Float64')")
# TODO Remove it and enable test
@pytest.mark.skip(reason="Don't work in parallel mode for some reason")
def test_read_table_expired(started_cluster): def test_read_table_expired(started_cluster):
hdfs_api = started_cluster.make_hdfs_api(kerberized=True) hdfs_api = started_cluster.make_hdfs_api(kerberized=True)
@ -93,7 +100,8 @@ def test_read_table_expired(started_cluster):
started_cluster.unpause_container('hdfskerberos') started_cluster.unpause_container('hdfskerberos')
# TODO Remove it and enable test
@pytest.mark.skip(reason="Don't work in parallel mode for some reason")
def test_prohibited(started_cluster): def test_prohibited(started_cluster):
node1.query("create table HDFSStorTwoProhibited (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://suser@kerberizedhdfs1:9010/storage_user_two_prohibited', 'TSV')") node1.query("create table HDFSStorTwoProhibited (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://suser@kerberizedhdfs1:9010/storage_user_two_prohibited', 'TSV')")
try: try:
@ -102,7 +110,8 @@ def test_prohibited(started_cluster):
except Exception as ex: except Exception as ex:
assert "Unable to open HDFS file: /storage_user_two_prohibited error: Permission denied: user=specuser, access=WRITE" in str(ex) assert "Unable to open HDFS file: /storage_user_two_prohibited error: Permission denied: user=specuser, access=WRITE" in str(ex)
# TODO Remove it and enable test
@pytest.mark.skip(reason="Don't work in parallel mode for some reason")
def test_cache_path(started_cluster): def test_cache_path(started_cluster):
node1.query("create table HDFSStorCachePath (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://dedicatedcachepath@kerberizedhdfs1:9010/storage_dedicated_cache_path', 'TSV')") node1.query("create table HDFSStorCachePath (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://dedicatedcachepath@kerberizedhdfs1:9010/storage_dedicated_cache_path', 'TSV')")
try: try:

View File

@ -440,11 +440,12 @@ def run_s3_mocks(started_cluster):
# Wait for S3 mocks to start # Wait for S3 mocks to start
for mock_filename, container, port in mocks: for mock_filename, container, port in mocks:
for attempt in range(10): num_attempts = 100
for attempt in range(num_attempts):
ping_response = started_cluster.exec_in_container(started_cluster.get_container_id(container), ping_response = started_cluster.exec_in_container(started_cluster.get_container_id(container),
["curl", "-s", f"http://localhost:{port}/"], nothrow=True) ["curl", "-s", f"http://localhost:{port}/"], nothrow=True)
if ping_response != 'OK': if ping_response != 'OK':
if attempt == 9: if attempt == num_attempts - 1:
assert ping_response == 'OK', 'Expected "OK", but got "{}"'.format(ping_response) assert ping_response == 'OK', 'Expected "OK", but got "{}"'.format(ping_response)
else: else:
time.sleep(1) time.sleep(1)

View File

@ -1,2 +0,0 @@
*.stderr
*.stdout

View File

@ -3,9 +3,11 @@ DROP TABLE IF EXISTS ES;
create table ES(A String) Engine=MergeTree order by tuple(); create table ES(A String) Engine=MergeTree order by tuple();
insert into ES select toString(number) from numbers(10000000); insert into ES select toString(number) from numbers(10000000);
SET max_execution_time = 100, max_execution_speed = 1000000; SET max_execution_time = 100,
SET max_threads = 1; timeout_before_checking_execution_speed = 100,
SET max_block_size = 1000000; max_execution_speed = 1000000,
max_threads = 1,
max_block_size = 1000000;
-- Exception about execution speed is not thrown from these queries. -- Exception about execution speed is not thrown from these queries.
SELECT * FROM ES LIMIT 1 format Null; SELECT * FROM ES LIMIT 1 format Null;

View File

@ -25,3 +25,5 @@ $CLICKHOUSE_CLIENT -q "select x + y + 1, argMax(y, x), sum(x - y) as s from test
$CLICKHOUSE_CLIENT -q "select x + y, sum(x - y) as s from test_agg_proj prewhere (x + y) % 2 = 1 group by x + y order by s desc limit 5 settings allow_experimental_projection_optimization=1" $CLICKHOUSE_CLIENT -q "select x + y, sum(x - y) as s from test_agg_proj prewhere (x + y) % 2 = 1 group by x + y order by s desc limit 5 settings allow_experimental_projection_optimization=1"
$CLICKHOUSE_CLIENT -q "select x + y, sum(x - y) as s from test_agg_proj prewhere (x + y) % 2 = 1 group by x + y order by s desc limit 5 settings allow_experimental_projection_optimization=1 format JSON" | grep "rows_read" $CLICKHOUSE_CLIENT -q "select x + y, sum(x - y) as s from test_agg_proj prewhere (x + y) % 2 = 1 group by x + y order by s desc limit 5 settings allow_experimental_projection_optimization=1 format JSON" | grep "rows_read"
$CLICKHOUSE_CLIENT -q "drop table test_agg_proj"

View File

@ -77,3 +77,4 @@ echo "optimize_move_to_prewhere = 1, allow_experimental_projection_optimization
$CLICKHOUSE_CLIENT -q "SELECT * FROM test_sort_proj WHERE y > 4294967286 order by x FORMAT JSON $CLICKHOUSE_CLIENT -q "SELECT * FROM test_sort_proj WHERE y > 4294967286 order by x FORMAT JSON
SETTINGS optimize_move_to_prewhere = 1, allow_experimental_projection_optimization = 1" | grep rows_read SETTINGS optimize_move_to_prewhere = 1, allow_experimental_projection_optimization = 1" | grep rows_read
$CLICKHOUSE_CLIENT -q "DROP TABLE test_sort_proj"

View File

@ -12,6 +12,8 @@ WITH CAST(\'default\', \'String\') AS id_2 SELECT one.dummy, ignore(id_2) FROM s
optimize_skip_unused_shards_rewrite_in(0,) optimize_skip_unused_shards_rewrite_in(0,)
0 0 0 0
WITH CAST(\'default\', \'String\') AS id_0 SELECT one.dummy, ignore(id_0) FROM system.one WHERE dummy IN tuple(0) WITH CAST(\'default\', \'String\') AS id_0 SELECT one.dummy, ignore(id_0) FROM system.one WHERE dummy IN tuple(0)
0
0
errors errors
others others
0 0

View File

@ -81,18 +81,18 @@ select query from system.query_log where
type = 'QueryFinish' type = 'QueryFinish'
order by query; order by query;
-- not tuple
select * from dist_01756 where dummy in (0);
select * from dist_01756 where dummy in ('0');
-- --
-- errors -- errors
-- --
select 'errors'; select 'errors';
-- not tuple
select * from dist_01756 where dummy in (0); -- { serverError 507 }
-- optimize_skip_unused_shards does not support non-constants -- optimize_skip_unused_shards does not support non-constants
select * from dist_01756 where dummy in (select * from system.one); -- { serverError 507 } select * from dist_01756 where dummy in (select * from system.one); -- { serverError 507 }
select * from dist_01756 where dummy in (toUInt8(0)); -- { serverError 507 } select * from dist_01756 where dummy in (toUInt8(0)); -- { serverError 507 }
-- wrong type (tuple)
select * from dist_01756 where dummy in ('0'); -- { serverError 507 }
-- intHash64 does not accept string -- intHash64 does not accept string
select * from dist_01756 where dummy in ('0', '2'); -- { serverError 43 } select * from dist_01756 where dummy in ('0', '2'); -- { serverError 43 }
-- NOT IN does not supported -- NOT IN does not supported

View File

@ -12,3 +12,6 @@ SELECT 1025 == count(n) FROM foo AS t1 ANY LEFT JOIN foo_lc AS t2 ON t1.n == t2.
SELECT 1025 == count(n) FROM foo_lc AS t1 ANY LEFT JOIN foo AS t2 ON t1.n == t2.n; SELECT 1025 == count(n) FROM foo_lc AS t1 ANY LEFT JOIN foo AS t2 ON t1.n == t2.n;
SELECT 1025 == count(n) FROM foo_lc AS t1 ALL LEFT JOIN foo_lc AS t2 ON t1.n == t2.n; SELECT 1025 == count(n) FROM foo_lc AS t1 ALL LEFT JOIN foo_lc AS t2 ON t1.n == t2.n;
DROP TABLE foo;
DROP TABLE foo_lc;

View File

@ -41,3 +41,5 @@ SELECT 'after row policy with plaintext_password';
psql "postgresql://postgresql_user:qwerty@localhost:${CLICKHOUSE_PORT_POSTGRESQL}/db01802" -c "SELECT * FROM postgresql;" psql "postgresql://postgresql_user:qwerty@localhost:${CLICKHOUSE_PORT_POSTGRESQL}/db01802" -c "SELECT * FROM postgresql;"
$CLICKHOUSE_CLIENT -q "DROP TABLE db01802.postgresql"
$CLICKHOUSE_CLIENT -q "DROP DATABASE db01802"

View File

@ -5,3 +5,5 @@ drop table if exists data_01832;
create table data_01832 (key Int) Engine=Memory; create table data_01832 (key Int) Engine=Memory;
insert into data_01832 values (1); insert into data_01832 values (1);
select * from data_01832; select * from data_01832;
drop table data_01832;

View File

@ -24,3 +24,6 @@ LAYOUT(COMPLEX_KEY_DIRECT());
SELECT 'complex key'; SELECT 'complex key';
SELECT name, key FROM system.dictionaries WHERE name='example_complex_key_dictionary' AND database=currentDatabase(); SELECT name, key FROM system.dictionaries WHERE name='example_complex_key_dictionary' AND database=currentDatabase();
DROP DICTIONARY example_complex_key_dictionary;
DROP DICTIONARY example_simple_key_dictionary;

View File

@ -28,3 +28,5 @@ CREATE TABLE test_null_as_default (a Int8, b Int64 DEFAULT c - 500, c Int32 DEFA
INSERT INTO test_null_as_default(a, c) SELECT 1, NULL UNION ALL SELECT 2, NULL; INSERT INTO test_null_as_default(a, c) SELECT 1, NULL UNION ALL SELECT 2, NULL;
SELECT * FROM test_null_as_default ORDER BY a; SELECT * FROM test_null_as_default ORDER BY a;
DROP TABLE test_null_as_default;

View File

@ -4,5 +4,4 @@
# shellcheck source=../shell_config.sh # shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh . "$CUR_DIR"/../shell_config.sh
${CLICKHOUSE_LOCAL} -q "select col1, initializeAggregation('argMaxState', col2, insertTime) as col2, now() as insertTime FROM generateRandom('col1 String, col2 Array(Float64)') LIMIT 1000000 FORMAT CSV" | curl -s 'http://localhost:8123/?query=INSERT%20INTO%20non_existing_table%20SELECT%20col1%2C%20initializeAggregation(%27argMaxState%27%2C%20col2%2C%20insertTime)%20as%20col2%2C%20now()%20as%20insertTime%20FROM%20input(%27col1%20String%2C%20col2%20Array(Float64)%27)%20FORMAT%20CSV' --data-binary @- | grep -q "Table default.non_existing_table doesn't exist" && echo 'Ok.' || echo 'FAIL' ||: ${CLICKHOUSE_LOCAL} -q "select col1, initializeAggregation('argMaxState', col2, insertTime) as col2, now() as insertTime FROM generateRandom('col1 String, col2 Array(Float64)') LIMIT 1000000 FORMAT CSV" | ${CLICKHOUSE_CURL} -s 'http://localhost:8123/?query=INSERT%20INTO%20non_existing_table%20SELECT%20col1%2C%20initializeAggregation(%27argMaxState%27%2C%20col2%2C%20insertTime)%20as%20col2%2C%20now()%20as%20insertTime%20FROM%20input(%27col1%20String%2C%20col2%20Array(Float64)%27)%20FORMAT%20CSV' --data-binary @- | grep -q "Table default.non_existing_table doesn't exist" && echo 'Ok.' || echo 'FAIL' ||:

View File

@ -13,3 +13,6 @@ insert into dist_01850 values (1); -- { serverError 60 }
drop table if exists dist_01850; drop table if exists dist_01850;
drop table shard_0.data_01850; drop table shard_0.data_01850;
drop database shard_0;
drop database shard_1;

View File

@ -307,3 +307,4 @@ SELECT name, found_rate FROM system.dictionaries WHERE database = currentDatabas
DROP TABLE polygons_01862; DROP TABLE polygons_01862;
DROP TABLE points_01862; DROP TABLE points_01862;
DROP DICTIONARY polygon_dictionary_01862;

View File

@ -14,3 +14,6 @@ INSERT INTO test_jit_nullable VALUES (0), (1), (NULL);
SELECT 'test_jit_nullable'; SELECT 'test_jit_nullable';
SELECT value, multiIf(value = 1, 2, value, 1, 0), if (value, 1, 0) FROM test_jit_nullable; SELECT value, multiIf(value = 1, 2, value, 1, 0), if (value, 1, 0) FROM test_jit_nullable;
DROP TABLE test_jit_nonnull;
DROP TABLE test_jit_nullable;

View File

@ -36,6 +36,7 @@ function run_test_once()
$CLICKHOUSE_CLIENT -nm -q " $CLICKHOUSE_CLIENT -nm -q "
DROP DICTIONARY simple_key_cache_dictionary_01863; DROP DICTIONARY simple_key_cache_dictionary_01863;
DROP TABLE simple_key_source_table_01863;
" "
if [ "$prev" == "$curr" ]; then if [ "$prev" == "$curr" ]; then

View File

@ -5,3 +5,5 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../shell_config.sh . "$CURDIR"/../shell_config.sh
python3 "$CURDIR"/01854_HTTP_dict_decompression.python python3 "$CURDIR"/01854_HTTP_dict_decompression.python
$CLICKHOUSE_CLIENT -q "DROP DICTIONARY test_table_select"

View File

@ -0,0 +1,26 @@
DROP TABLE IF EXISTS interval;
DROP TABLE IF EXISTS fl_interval;
DROP TABLE IF EXISTS dt_interval;
DROP TABLE IF EXISTS date_interval;
CREATE TABLE interval ( `id` String, `start` Int64, `end` Int64 ) ENGINE = MergeTree ORDER BY start;
INSERT INTO interval VALUES ('a', 1, 3), ('a', 1, 3), ('a', 2, 4), ('a', 1, 1), ('a', 5, 6), ('a', 5, 7), ('b', 10, 12), ('b', 13, 19), ('b', 14, 16), ('c', -1, 1), ('c', -2, -1);
CREATE TABLE fl_interval ( `id` String, `start` Float, `end` Float ) ENGINE = MergeTree ORDER BY start;
INSERT INTO fl_interval VALUES ('a', 1.1, 3.2), ('a', 1.5, 3.6), ('a', 4.0, 5.0);
CREATE TABLE dt_interval ( `id` String, `start` DateTime, `end` DateTime ) ENGINE = MergeTree ORDER BY start;
INSERT INTO dt_interval VALUES ('a', '2020-01-01 02:11:22', '2020-01-01 03:12:31'), ('a', '2020-01-01 01:12:30', '2020-01-01 02:50:11');
CREATE TABLE date_interval ( `id` String, `start` Date, `end` Date ) ENGINE = MergeTree ORDER BY start;
INSERT INTO date_interval VALUES ('a', '2020-01-01', '2020-01-04'), ('a', '2020-01-03', '2020-01-08 02:50:11');
SELECT id, intervalLengthSum(start, end), toTypeName(intervalLengthSum(start, end)) FROM interval GROUP BY id ORDER BY id;
SELECT id, 3.4 < intervalLengthSum(start, end) AND intervalLengthSum(start, end) < 3.6, toTypeName(intervalLengthSum(start, end)) FROM fl_interval GROUP BY id ORDER BY id;
SELECT id, intervalLengthSum(start, end), toTypeName(intervalLengthSum(start, end)) FROM dt_interval GROUP BY id ORDER BY id;
SELECT id, intervalLengthSum(start, end), toTypeName(intervalLengthSum(start, end)) FROM date_interval GROUP BY id ORDER BY id;
DROP TABLE interval;
DROP TABLE fl_interval;
DROP TABLE dt_interval;
DROP TABLE date_interval;

View File

@ -1,26 +0,0 @@
DROP TABLE IF EXISTS segment;
DROP TABLE IF EXISTS fl_segment;
DROP TABLE IF EXISTS dt_segment;
DROP TABLE IF EXISTS date_segment;
CREATE TABLE segment ( `id` String, `start` Int64, `end` Int64 ) ENGINE = MergeTree ORDER BY start;
INSERT INTO segment VALUES ('a', 1, 3), ('a', 1, 3), ('a', 2, 4), ('a', 1, 1), ('a', 5, 6), ('a', 5, 7), ('b', 10, 12), ('b', 13, 19), ('b', 14, 16), ('c', -1, 1), ('c', -2, -1);
CREATE TABLE fl_segment ( `id` String, `start` Float, `end` Float ) ENGINE = MergeTree ORDER BY start;
INSERT INTO fl_segment VALUES ('a', 1.1, 3.2), ('a', 1.5, 3.6), ('a', 4.0, 5.0);
CREATE TABLE dt_segment ( `id` String, `start` DateTime, `end` DateTime ) ENGINE = MergeTree ORDER BY start;
INSERT INTO dt_segment VALUES ('a', '2020-01-01 02:11:22', '2020-01-01 03:12:31'), ('a', '2020-01-01 01:12:30', '2020-01-01 02:50:11');
CREATE TABLE date_segment ( `id` String, `start` Date, `end` Date ) ENGINE = MergeTree ORDER BY start;
INSERT INTO date_segment VALUES ('a', '2020-01-01', '2020-01-04'), ('a', '2020-01-03', '2020-01-08 02:50:11');
SELECT id, segmentLengthSum(start, end), toTypeName(segmentLengthSum(start, end)) FROM segment GROUP BY id ORDER BY id;
SELECT id, 3.4 < segmentLengthSum(start, end) AND segmentLengthSum(start, end) < 3.6, toTypeName(segmentLengthSum(start, end)) FROM fl_segment GROUP BY id ORDER BY id;
SELECT id, segmentLengthSum(start, end), toTypeName(segmentLengthSum(start, end)) FROM dt_segment GROUP BY id ORDER BY id;
SELECT id, segmentLengthSum(start, end), toTypeName(segmentLengthSum(start, end)) FROM date_segment GROUP BY id ORDER BY id;
DROP TABLE segment;
DROP TABLE fl_segment;
DROP TABLE dt_segment;
DROP TABLE date_segment;

View File

@ -38,3 +38,5 @@ SELECT 'dt64 != const dt' FROM dt64test WHERE dt64_column != toDateTime('2020-01
SELECT 'dt64 != dt' FROM dt64test WHERE dt64_column != materialize(toDateTime('2020-01-13 13:37:00')); SELECT 'dt64 != dt' FROM dt64test WHERE dt64_column != materialize(toDateTime('2020-01-13 13:37:00'));
SELECT 'dt != const dt64' FROM dt64test WHERE dt_column != toDateTime64('2020-01-13 13:37:00', 3); SELECT 'dt != const dt64' FROM dt64test WHERE dt_column != toDateTime64('2020-01-13 13:37:00', 3);
SELECT 'dt != dt64' FROM dt64test WHERE dt_column != materialize(toDateTime64('2020-01-13 13:37:00', 3)); SELECT 'dt != dt64' FROM dt64test WHERE dt_column != materialize(toDateTime64('2020-01-13 13:37:00', 3));
DROP TABLE dt64test;

View File

@ -48,3 +48,7 @@ SELECT count() FROM table4 WHERE id % 10 = 7;
SELECT 'comparison:'; SELECT 'comparison:';
SELECT v, v-205 as vv, modulo(vv, 200), moduloLegacy(vv, 200) FROM table1 ORDER BY v; SELECT v, v-205 as vv, modulo(vv, 200), moduloLegacy(vv, 200) FROM table1 ORDER BY v;
DROP TABLE table1;
DROP TABLE table2;
DROP TABLE table3;
DROP TABLE table4;

View File

@ -1,27 +0,0 @@
#!/usr/bin/env bash
set -ue
# this test doesn't need 'current_database = currentDatabase()',
unset CLICKHOUSE_LOG_COMMENT
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
${CLICKHOUSE_CLIENT} -q "drop table if exists m"
${CLICKHOUSE_CLIENT} -q "create table m (dummy UInt8) ENGINE = Distributed('test_cluster_two_shards', 'system', 'one')"
query_id=$(${CLICKHOUSE_CLIENT} -q "select lower(hex(reverse(reinterpretAsString(generateUUIDv4()))))")
${CLICKHOUSE_CLIENT} -q "select * from m format Null" "--query_id=$query_id"
${CLICKHOUSE_CLIENT} -n -q "
system flush logs;
select
anyIf(initial_query_start_time, is_initial_query) = anyIf(initial_query_start_time, not is_initial_query),
anyIf(initial_query_start_time_microseconds, is_initial_query) = anyIf(initial_query_start_time_microseconds, not is_initial_query)
from system.query_log
where event_date = today() and event_time >= now() - interval 1 hour and initial_query_id = '$query_id' and type = 'QueryFinish';
"
${CLICKHOUSE_CLIENT} -q "drop table m"

View File

@ -14,3 +14,6 @@ select
/ /
(select count() from nums_in_mem_dist where rand() > 0) (select count() from nums_in_mem_dist where rand() > 0)
from system.one; -- { serverError 158 } from system.one; -- { serverError 158 }
drop table nums_in_mem;
drop table nums_in_mem_dist;

View File

@ -51,3 +51,5 @@ check_background_query &
$CLICKHOUSE_CLIENT -q "select *, sleepEachRow(1) from data_01882" --max_threads=1 --format Null --query_id="$QUERY_ID" --max_block_size=1 $CLICKHOUSE_CLIENT -q "select *, sleepEachRow(1) from data_01882" --max_threads=1 --format Null --query_id="$QUERY_ID" --max_block_size=1
wait wait
$CLICKHOUSE_CLIENT -q "drop table data_01882"

View File

@ -0,0 +1,6 @@
7
0 100
6 106
7 107
8 108
9 109

Some files were not shown because too many files have changed in this diff Show More