Merge branch 'master' into configurable_max_parts_to_merge

alesapin 2021-05-26 11:30:39 +03:00
commit 08c37a4c9e
60 changed files with 965 additions and 223 deletions

contrib/libunwind vendored (2 lines changed)

@ -1 +1 @@
Subproject commit 1e4a2e5ce77be1af12e918a3c15dccf2bbac587d
Subproject commit a491c27b33109a842d577c0f7ac5f5f218859181

View File

@ -374,6 +374,7 @@ function run_tests
01801_s3_cluster
# Depends on LLVM JIT
01072_nullable_jit
01852_jit_if
01865_jit_comparison_constant_result
01871_merge_tree_compile_expressions

View File

@ -27,6 +27,10 @@ while true; do
done
set -e
# cleanup for retry run if volume is not recreated
docker kill "$(docker ps -aq)" || true
docker rm "$(docker ps -aq)" || true
echo "Start tests"
export CLICKHOUSE_TESTS_SERVER_BIN_PATH=/clickhouse
export CLICKHOUSE_TESTS_CLIENT_BIN_PATH=/clickhouse

View File

@ -41,6 +41,14 @@ toc_title: Cloud
- Built-in monitoring and database management platform
- Professional database expert technical support and service
## SberCloud {#sbercloud}
[SberCloud.Advanced](https://sbercloud.ru/en/advanced) provides [MapReduce Service (MRS)](https://docs.sbercloud.ru/mrs/ug/topics/ug__clickhouse.html), a reliable, secure, and easy-to-use enterprise-level platform for storing, processing, and analyzing big data. MRS allows you to quickly create and manage ClickHouse clusters.
- A ClickHouse cluster consists of three ZooKeeper nodes and multiple ClickHouse nodes. The Dedicated Replica mode keeps two copies of the data for high reliability.
- MRS provides smooth, elastic scaling to quickly meet service growth when cluster storage capacity or CPU resources run short. When you expand the capacity of ClickHouse nodes in a cluster, MRS provides a one-click data balancing tool and lets you decide when and how to balance data based on service characteristics, so scaling stays smooth and the service stays available.
- MRS uses Elastic Load Balance (ELB) in a high-availability deployment architecture to automatically distribute user access traffic across multiple backend nodes, improving fault tolerance. With the ELB polling mechanism, data is written to local tables and read from distributed tables on different nodes, so read/write load is balanced and application access remains highly available.
## Tencent Cloud {#tencent-cloud}
[Tencent Managed Service for ClickHouse](https://cloud.tencent.com/product/cdwch) provides the following key features:

View File

@ -101,6 +101,8 @@ For very large clusters, you can use different ZooKeeper clusters for different
Replication is asynchronous and multi-master. `INSERT` queries (as well as `ALTER`) can be sent to any available server. Data is inserted on the server where the query is run, and then it is copied to the other servers. Because it is asynchronous, recently inserted data appears on the other replicas with some latency. If part of the replicas are not available, the data is written when they become available. If a replica is available, the latency is the amount of time it takes to transfer the block of compressed data over the network. The number of threads performing background tasks for replicated tables can be set by [background_schedule_pool_size](../../../operations/settings/settings.md#background_schedule_pool_size) setting.
`ReplicatedMergeTree` engine uses a separate thread pool for replicated fetches. Size of the pool is limited by the [background_fetches_pool_size](../../../operations/settings/settings.md#background_fetches_pool_size) setting which can be tuned with a server restart.
By default, an INSERT query waits for confirmation of writing the data from only one replica. If the data was successfully written to only one replica and the server with this replica ceases to exist, the stored data will be lost. To enable getting confirmation of data writes from multiple replicas, use the `insert_quorum` option.
Each block of data is written atomically. The INSERT query is divided into blocks up to `max_insert_block_size = 1048576` rows. In other words, if the `INSERT` query has less than 1048576 rows, it is made atomically.
@ -284,6 +286,7 @@ If the data in ZooKeeper was lost or damaged, you can save data by moving it to
**See Also**
- [background_schedule_pool_size](../../../operations/settings/settings.md#background_schedule_pool_size)
- [background_fetches_pool_size](../../../operations/settings/settings.md#background_fetches_pool_size)
- [execute_merges_on_single_replica_time_threshold](../../../operations/settings/settings.md#execute-merges-on-single-replica-time-threshold)
[Original article](https://clickhouse.tech/docs/en/operations/table_engines/replication/) <!--hide-->

View File

@ -2034,6 +2034,16 @@ Possible values:
Default value: 16.
## background_fetches_pool_size {#background_fetches_pool_size}
Sets the number of threads performing background fetches for [replicated](../../engines/table-engines/mergetree-family/replication.md) tables. This setting is applied at ClickHouse server start and can't be changed in a user session. For production usage with frequent small insertions or a slow ZooKeeper cluster, it is recommended to use the default value.
Possible values:
- Any positive integer.
Default value: 8.
## always_fetch_merged_part {#always_fetch_merged_part}
Prohibits data parts merging in [Replicated\*MergeTree](../../engines/table-engines/mergetree-family/replication.md)-engine tables.

View File

@ -13,7 +13,7 @@ Returns an array of selected substrings. Empty substrings may be selected if the
**Syntax**
``` sql
splitByChar(<separator>, <s>)
splitByChar(separator, s)
```
**Arguments**
@ -29,12 +29,12 @@ Returns an array of selected substrings. Empty substrings may be selected when:
- There are multiple consecutive separators;
- The original string `s` is empty.
Type: [Array](../../sql-reference/data-types/array.md) of [String](../../sql-reference/data-types/string.md).
Type: [Array](../../sql-reference/data-types/array.md)([String](../../sql-reference/data-types/string.md)).
**Example**
``` sql
SELECT splitByChar(',', '1,2,3,abcde')
SELECT splitByChar(',', '1,2,3,abcde');
```
``` text
@ -50,7 +50,7 @@ Splits a string into substrings separated by a string. It uses a constant string
**Syntax**
``` sql
splitByString(<separator>, <s>)
splitByString(separator, s)
```
**Arguments**
@ -62,7 +62,7 @@ splitByString(<separator>, <s>)
Returns an array of selected substrings. Empty substrings may be selected when:
Type: [Array](../../sql-reference/data-types/array.md) of [String](../../sql-reference/data-types/string.md).
Type: [Array](../../sql-reference/data-types/array.md)([String](../../sql-reference/data-types/string.md)).
- A non-empty separator occurs at the beginning or end of the string;
- There are multiple consecutive non-empty separators;
@ -71,7 +71,7 @@ Type: [Array](../../sql-reference/data-types/array.md) of [String](../../sql-ref
**Example**
``` sql
SELECT splitByString(', ', '1, 2 3, 4,5, abcde')
SELECT splitByString(', ', '1, 2 3, 4,5, abcde');
```
``` text
@ -81,7 +81,7 @@ SELECT splitByString(', ', '1, 2 3, 4,5, abcde')
```
``` sql
SELECT splitByString('', 'abcde')
SELECT splitByString('', 'abcde');
```
``` text
@ -92,12 +92,12 @@ SELECT splitByString('', 'abcde')
## splitByRegexp(regexp, s) {#splitbyregexpseparator-s}
Splits a string into substrings separated by a regular expression. It uses a regular expression string `regexp` as the separator. If the `regexp` is empty, it will split the string s into an array of single characters. If no match is found for this regex expression, the string `s` won't be split.
Splits a string into substrings separated by a regular expression. It uses a regular expression string `regexp` as the separator. If the `regexp` is empty, it will split the string `s` into an array of single characters. If no match is found for this regular expression, the string `s` won't be split.
**Syntax**
``` sql
splitByRegexp(<regexp>, <s>)
splitByRegexp(regexp, s)
```
**Arguments**
@ -109,28 +109,36 @@ splitByRegexp(<regexp>, <s>)
Returns an array of selected substrings. Empty substrings may be selected when:
- A non-empty regular expression match occurs at the beginning or end of the string;
- There are multiple consecutive non-empty regular expression matches;
- The original string `s` is empty while the regular expression is not empty.
Type: [Array](../../sql-reference/data-types/array.md) of [String](../../sql-reference/data-types/string.md).
Type: [Array](../../sql-reference/data-types/array.md)([String](../../sql-reference/data-types/string.md)).
**Example**
Query:
``` sql
SELECT splitByRegexp('\\d+', 'a12bc23de345f')
SELECT splitByRegexp('\\d+', 'a12bc23de345f');
```
Result:
``` text
┌─splitByRegexp('\\d+', 'a12bc23de345f')─┐
│ ['a','bc','de','f'] │
└────────────────────────────────────────┘
```
Query:
``` sql
SELECT splitByRegexp('', 'abcde')
SELECT splitByRegexp('', 'abcde');
```
Result:
``` text
┌─splitByRegexp('', 'abcde')─┐
│ ['a','b','c','d','e'] │
@ -149,7 +157,7 @@ Selects substrings of consecutive bytes from the ranges a-z and A-Z.Returns an a
**Example**
``` sql
SELECT alphaTokens('abca1abc')
SELECT alphaTokens('abca1abc');
```
``` text

View File

@ -65,6 +65,8 @@ ClickHouse хранит метаинформацию о репликах в [Apa
Репликация асинхронная, мульти-мастер. Запросы `INSERT` и `ALTER` можно направлять на любой доступный сервер. Данные вставятся на сервер, где выполнен запрос, а затем скопируются на остальные серверы. В связи с асинхронностью, только что вставленные данные появляются на остальных репликах с небольшой задержкой. Если часть реплик недоступна, данные на них запишутся тогда, когда они станут доступны. Если реплика доступна, то задержка составляет столько времени, сколько требуется для передачи блока сжатых данных по сети. Количество потоков для выполнения фоновых задач можно задать с помощью настройки [background_schedule_pool_size](../../../operations/settings/settings.md#background_schedule_pool_size).
Движок `ReplicatedMergeTree` использует отдельный пул потоков для скачивания кусков данных. Размер пула ограничен настройкой [background_fetches_pool_size](../../../operations/settings/settings.md#background_fetches_pool_size), которую можно указать при перезапуске сервера.
По умолчанию, запрос INSERT ждёт подтверждения записи только от одной реплики. Если данные были успешно записаны только на одну реплику, и сервер с этой репликой перестал существовать, то записанные данные будут потеряны. Вы можете включить подтверждение записи от нескольких реплик, используя настройку `insert_quorum`.
Каждый блок данных записывается атомарно. Запрос INSERT разбивается на блоки данных размером до `max_insert_block_size = 1048576` строк. То есть, если в запросе `INSERT` менее 1048576 строк, то он делается атомарно.
@ -249,5 +251,6 @@ $ sudo -u clickhouse touch /var/lib/clickhouse/flags/force_restore_data
**Смотрите также**
- [background_schedule_pool_size](../../../operations/settings/settings.md#background_schedule_pool_size)
- [background_fetches_pool_size](../../../operations/settings/settings.md#background_fetches_pool_size)
- [execute_merges_on_single_replica_time_threshold](../../../operations/settings/settings.md#execute-merges-on-single-replica-time-threshold)

View File

@ -2043,6 +2043,16 @@ SELECT idx, i FROM null_in WHERE i IN (1, NULL) SETTINGS transform_null_in = 1;
Значение по умолчанию: 16.
## background_fetches_pool_size {#background_fetches_pool_size}
Задает количество потоков для скачивания кусков данных для [реплицируемых](../../engines/table-engines/mergetree-family/replication.md) таблиц. Настройка применяется при запуске сервера ClickHouse и не может быть изменена в пользовательском сеансе. Для использования в продакшене с частыми небольшими вставками или медленным кластером ZooKeeper рекомендуется использовать значение по умолчанию.
Допустимые значения:
- Положительное целое число.
Значение по умолчанию: 8.
## background_distributed_schedule_pool_size {#background_distributed_schedule_pool_size}
Задает количество потоков для выполнения фоновых задач. Работает для таблиц с движком [Distributed](../../engines/table-engines/special/distributed.md). Настройка применяется при запуске сервера ClickHouse и не может быть изменена в пользовательском сеансе.

View File

@ -18,37 +18,37 @@ toc_title: JSON
Проверяет наличие поля с именем `name`.
Алиас: `simpleJSONHas`.
Синоним: `simpleJSONHas`.
## visitParamExtractUInt(params, name) {#visitparamextractuintparams-name}
Пытается выделить число типа UInt64 из значения поля с именем `name`. Если поле строковое, пытается выделить число из начала строки. Если такого поля нет, или если оно есть, но содержит не число, то возвращает 0.
Алиас: `simpleJSONExtractUInt`.
Синоним: `simpleJSONExtractUInt`.
## visitParamExtractInt(params, name) {#visitparamextractintparams-name}
Аналогично для Int64.
Алиас: `simpleJSONExtractInt`.
Синоним: `simpleJSONExtractInt`.
## visitParamExtractFloat(params, name) {#visitparamextractfloatparams-name}
Аналогично для Float64.
Алиас: `simpleJSONExtractFloat`.
Синоним: `simpleJSONExtractFloat`.
## visitParamExtractBool(params, name) {#visitparamextractboolparams-name}
Пытается выделить значение true/false. Результат — UInt8.
Алиас: `simpleJSONExtractBool`.
Синоним: `simpleJSONExtractBool`.
## visitParamExtractRaw(params, name) {#visitparamextractrawparams-name}
Возвращает значение поля, включая разделители.
Алиас: `simpleJSONExtractRaw`.
Синоним: `simpleJSONExtractRaw`.
Примеры:
@ -61,7 +61,7 @@ visitParamExtractRaw('{"abc":{"def":[1,2,3]}}', 'abc') = '{"def":[1,2,3]}';
Разбирает строку в двойных кавычках. У значения убирается экранирование. Если убрать экранированные символы не удалось, то возвращается пустая строка.
Алиас: `simpleJSONExtractString`.
Синоним: `simpleJSONExtractString`.
Примеры:

View File

@ -14,7 +14,7 @@ separator должен быть константной строкой из ро
**Синтаксис**
``` sql
splitByChar(<separator>, <s>)
splitByChar(separator, s)
```
**Аргументы**
@ -30,12 +30,12 @@ splitByChar(<separator>, <s>)
- Задано несколько последовательных разделителей;
- Исходная строка `s` пуста.
Type: [Array](../../sql-reference/data-types/array.md) of [String](../../sql-reference/data-types/string.md).
Тип: [Array](../../sql-reference/data-types/array.md)([String](../../sql-reference/data-types/string.md)).
**Пример**
``` sql
SELECT splitByChar(',', '1,2,3,abcde')
SELECT splitByChar(',', '1,2,3,abcde');
```
``` text
@ -67,12 +67,12 @@ splitByString(separator, s)
- Задано несколько последовательных разделителей;
- Исходная строка `s` пуста.
Тип: [Array](../../sql-reference/data-types/array.md) of [String](../../sql-reference/data-types/string.md).
Тип: [Array](../../sql-reference/data-types/array.md)([String](../../sql-reference/data-types/string.md)).
**Примеры**
``` sql
SELECT splitByString(', ', '1, 2 3, 4,5, abcde')
SELECT splitByString(', ', '1, 2 3, 4,5, abcde');
```
``` text
@ -82,7 +82,7 @@ SELECT splitByString(', ', '1, 2 3, 4,5, abcde')
```
``` sql
SELECT splitByString('', 'abcde')
SELECT splitByString('', 'abcde');
```
``` text
@ -91,6 +91,60 @@ SELECT splitByString('', 'abcde')
└────────────────────────────┘
```
## splitByRegexp(regexp, s) {#splitbyregexpseparator-s}
Разбивает строку на подстроки, разделенные регулярным выражением. В качестве разделителя используется строка регулярного выражения `regexp`. Если `regexp` пустая, функция разделит строку `s` на массив одиночных символов. Если для регулярного выражения совпадения не найдено, строка `s` не будет разбита.
**Синтаксис**
``` sql
splitByRegexp(regexp, s)
```
**Аргументы**
- `regexp` — регулярное выражение. Константа. [String](../data-types/string.md) или [FixedString](../data-types/fixedstring.md).
- `s` — разбиваемая строка. [String](../../sql-reference/data-types/string.md).
**Возвращаемые значения**
Возвращает массив выбранных подстрок. Пустая подстрока может быть возвращена, если:
- Непустое совпадение с регулярным выражением происходит в начале или конце строки;
- Имеется несколько последовательных совпадений c непустым регулярным выражением;
- Исходная строка `s` пуста, а регулярное выражение не пустое.
Тип: [Array](../../sql-reference/data-types/array.md)([String](../../sql-reference/data-types/string.md)).
**Примеры**
Запрос:
``` sql
SELECT splitByRegexp('\\d+', 'a12bc23de345f');
```
Результат:
``` text
┌─splitByRegexp('\\d+', 'a12bc23de345f')─┐
│ ['a','bc','de','f'] │
└────────────────────────────────────────┘
```
Запрос:
``` sql
SELECT splitByRegexp('', 'abcde');
```
Результат:
``` text
┌─splitByRegexp('', 'abcde')─┐
│ ['a','b','c','d','e'] │
└────────────────────────────┘
```
## arrayStringConcat(arr\[, separator\]) {#arraystringconcatarr-separator}
@ -106,7 +160,7 @@ separator - необязательный параметр, константна
**Пример:**
``` sql
SELECT alphaTokens('abca1abc')
SELECT alphaTokens('abca1abc');
```
``` text
@ -114,4 +168,3 @@ SELECT alphaTokens('abca1abc')
│ ['abca','abc'] │
└─────────────────────────┘
```

View File

@ -116,6 +116,12 @@ template <typename A, typename B> struct ResultOfModulo
using Type = std::conditional_t<std::is_floating_point_v<A> || std::is_floating_point_v<B>, Float64, Type0>;
};
template <typename A, typename B> struct ResultOfModuloLegacy
{
using Type0 = typename Construct<is_signed_v<A> || is_signed_v<B>, false, sizeof(B)>::Type;
using Type = std::conditional_t<std::is_floating_point_v<A> || std::is_floating_point_v<B>, Float64, Type0>;
};
template <typename A> struct ResultOfNegate
{
using Type = typename Construct<

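The `ResultOfModuloLegacy` trait added above keeps the old rule: the integer result is sized by the divisor type `B`, signed if either argument is signed, with a `Float64` fallback when either operand is floating point. A minimal standalone sketch of that type-selection idea in plain standard C++ (the `MakeInt` helper is a toy stand-in for ClickHouse's `Construct`, not the real trait):

``` cpp
#include <cstddef>
#include <cstdint>
#include <type_traits>

// Toy stand-in for NumberTraits::Construct: pick an integer type by signedness and byte size.
template <bool is_signed, std::size_t size>
struct MakeInt;
template <> struct MakeInt<true, 1>  { using Type = std::int8_t; };
template <> struct MakeInt<true, 2>  { using Type = std::int16_t; };
template <> struct MakeInt<false, 1> { using Type = std::uint8_t; };
template <> struct MakeInt<false, 2> { using Type = std::uint16_t; };

// Legacy modulo result type: signed if either argument is signed, sized by the
// divisor B, double (Float64) if anything is floating point.
template <typename A, typename B>
struct LegacyModuloResult
{
    using Int = typename MakeInt<std::is_signed_v<A> || std::is_signed_v<B>, sizeof(B)>::Type;
    using Type = std::conditional_t<std::is_floating_point_v<A> || std::is_floating_point_v<B>, double, Int>;
};

static_assert(std::is_same_v<LegacyModuloResult<std::uint32_t, std::uint8_t>::Type, std::uint8_t>);
static_assert(std::is_same_v<LegacyModuloResult<std::int32_t, std::uint8_t>::Type, std::int8_t>);
static_assert(std::is_same_v<LegacyModuloResult<float, std::uint8_t>::Type, double>);

int main() {}
```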
View File

@ -54,6 +54,7 @@ class DiskS3::AwsS3KeyKeeper : public std::list<Aws::Vector<Aws::S3::Model::Obje
{
public:
void addKey(const String & key);
static String getChunkKeys(const Aws::Vector<Aws::S3::Model::ObjectIdentifier> & chunk);
private:
/// limit for one DeleteObject request
@ -74,6 +75,19 @@ void DiskS3::AwsS3KeyKeeper::addKey(const String & key)
back().push_back(obj);
}
String DiskS3::AwsS3KeyKeeper::getChunkKeys(const Aws::Vector<Aws::S3::Model::ObjectIdentifier> & chunk)
{
String res;
for (const auto & obj : chunk)
{
const auto & key = obj.GetKey();
if (!res.empty())
res.append(", ");
res.append(key.c_str(), key.size());
}
return res;
}
String getRandomName()
{
std::uniform_int_distribution<int> distribution('a', 'z');
@ -794,6 +808,8 @@ void DiskS3::removeAws(const AwsS3KeyKeeper & keys)
for (const auto & chunk : keys)
{
LOG_DEBUG(log, "Remove AWS keys {}", AwsS3KeyKeeper::getChunkKeys(chunk));
Aws::S3::Model::Delete delkeys;
delkeys.SetObjects(chunk);

View File

@ -172,4 +172,10 @@ struct ModuloImpl
#endif
};
template <typename A, typename B>
struct ModuloLegacyImpl : ModuloImpl<A, B>
{
using ResultType = typename NumberTraits::ResultOfModuloLegacy<A, B>::Type;
};
}

View File

@ -96,6 +96,11 @@ struct ModuloByConstantImpl
}
};
template <typename A, typename B>
struct ModuloLegacyByConstantImpl : ModuloByConstantImpl<A, B>
{
using Op = ModuloLegacyImpl<A, B>;
};
}
/** Specializations are specified for dividing numbers of the type UInt64 and UInt32 by the numbers of the same sign.
@ -134,4 +139,12 @@ void registerFunctionModulo(FunctionFactory & factory)
factory.registerAlias("mod", "modulo", FunctionFactory::CaseInsensitive);
}
struct NameModuloLegacy { static constexpr auto name = "moduloLegacy"; };
using FunctionModuloLegacy = BinaryArithmeticOverloadResolver<ModuloLegacyImpl, NameModuloLegacy, false>;
void registerFunctionModuloLegacy(FunctionFactory & factory)
{
factory.registerFunction<FunctionModuloLegacy>();
}
}

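The hunk above wires `moduloLegacy` into the arithmetic-function registration path next to `modulo`. As a rough illustration of that register-by-name factory pattern, here is a self-contained toy (not ClickHouse's real `FunctionFactory` API; the classes below are invented for the sketch):

``` cpp
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <string>

struct Function
{
    virtual ~Function() = default;
    virtual long execute(long a, long b) const = 0;
};

struct Modulo : Function
{
    long execute(long a, long b) const override { return a % b; }
};

// "Legacy" variant reuses the implementation; in ClickHouse only the result type differs.
struct ModuloLegacy : Modulo {};

// Toy factory: maps a function name to a creator callback.
class FunctionFactory
{
public:
    using Creator = std::function<std::unique_ptr<Function>()>;

    void registerFunction(const std::string & name, Creator creator) { creators[name] = std::move(creator); }
    std::unique_ptr<Function> get(const std::string & name) const { return creators.at(name)(); }

private:
    std::map<std::string, Creator> creators;
};

int main()
{
    FunctionFactory factory;
    factory.registerFunction("modulo", [] { return std::make_unique<Modulo>(); });
    factory.registerFunction("moduloLegacy", [] { return std::make_unique<ModuloLegacy>(); });

    std::cout << factory.get("moduloLegacy")->execute(7, 3) << '\n'; // 1
}
```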
View File

@ -11,6 +11,7 @@ void registerFunctionIntDiv(FunctionFactory & factory);
void registerFunctionIntDivOrZero(FunctionFactory & factory);
void registerFunctionModulo(FunctionFactory & factory);
void registerFunctionModuloOrZero(FunctionFactory & factory);
void registerFunctionModuloLegacy(FunctionFactory & factory);
void registerFunctionNegate(FunctionFactory & factory);
void registerFunctionAbs(FunctionFactory & factory);
void registerFunctionBitAnd(FunctionFactory & factory);
@ -51,6 +52,7 @@ void registerFunctionsArithmetic(FunctionFactory & factory)
registerFunctionIntDivOrZero(factory);
registerFunctionModulo(factory);
registerFunctionModuloOrZero(factory);
registerFunctionModuloLegacy(factory);
registerFunctionNegate(factory);
registerFunctionAbs(factory);
registerFunctionBitAnd(factory);

View File

@ -463,12 +463,6 @@ struct ContextSharedPart
dictionaries_xmls.reset();
delete_system_logs = std::move(system_logs);
#if USE_EMBEDDED_COMPILER
if (auto * cache = CompiledExpressionCacheFactory::instance().tryGetCache())
cache->reset();
#endif
embedded_dictionaries.reset();
external_dictionaries_loader.reset();
models_repository_guard.reset();

View File

@ -28,7 +28,6 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int NOT_IMPLEMENTED;
}
static CHJIT & getJITInstance()
@ -43,13 +42,36 @@ static Poco::Logger * getLogger()
return &logger;
}
class CompiledFunction
{
public:
CompiledFunction(void * compiled_function_, CHJIT::CompiledModuleInfo module_info_)
: compiled_function(compiled_function_)
, module_info(std::move(module_info_))
{}
void * getCompiledFunction() const { return compiled_function; }
~CompiledFunction()
{
getJITInstance().deleteCompiledModule(module_info);
}
private:
void * compiled_function;
CHJIT::CompiledModuleInfo module_info;
};
class LLVMExecutableFunction : public IExecutableFunction
{
public:
explicit LLVMExecutableFunction(const std::string & name_, JITCompiledFunction function_)
explicit LLVMExecutableFunction(const std::string & name_, std::shared_ptr<CompiledFunction> compiled_function_)
: name(name_)
, function(function_)
, compiled_function(compiled_function_)
{
}
@ -81,7 +103,9 @@ public:
}
columns[arguments.size()] = getColumnData(result_column.get());
function(input_rows_count, columns.data());
JITCompiledFunction jit_compiled_function_typed = reinterpret_cast<JITCompiledFunction>(compiled_function->getCompiledFunction());
jit_compiled_function_typed(input_rows_count, columns.data());
#if defined(MEMORY_SANITIZER)
/// Memory sanitizer don't know about stores from JIT-ed code.
@ -111,7 +135,7 @@ public:
private:
std::string name;
JITCompiledFunction function = nullptr;
std::shared_ptr<CompiledFunction> compiled_function;
};
class LLVMFunction : public IFunctionBase
@ -131,17 +155,13 @@ public:
else if (node.type == CompileDAG::CompileType::INPUT)
argument_types.emplace_back(node.result_type);
}
module_info = compileFunction(getJITInstance(), *this);
}
~LLVMFunction() override
void setCompiledFunction(std::shared_ptr<CompiledFunction> compiled_function_)
{
getJITInstance().deleteCompiledModule(module_info);
compiled_function = compiled_function_;
}
size_t getCompiledSize() const { return module_info.size; }
bool isCompilable() const override { return true; }
llvm::Value * compile(llvm::IRBuilderBase & builder, Values values) const override
@ -157,13 +177,10 @@ public:
ExecutableFunctionPtr prepare(const ColumnsWithTypeAndName &) const override
{
void * function = getJITInstance().findCompiledFunction(module_info, name);
if (!compiled_function)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Compiled function was not initialized {}", name);
if (!function)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot find compiled function {}", name);
JITCompiledFunction function_typed = reinterpret_cast<JITCompiledFunction>(function);
return std::make_unique<LLVMExecutableFunction>(name, function_typed);
return std::make_unique<LLVMExecutableFunction>(name, compiled_function);
}
bool isDeterministic() const override
@ -252,7 +269,7 @@ private:
CompileDAG dag;
DataTypes argument_types;
std::vector<FunctionBasePtr> nested_functions;
CHJIT::CompiledModuleInfo module_info;
std::shared_ptr<CompiledFunction> compiled_function;
};
static FunctionBasePtr compile(
@ -271,43 +288,42 @@ static FunctionBasePtr compile(
LOG_TRACE(getLogger(), "Try to compile expression {}", dag.dump());
FunctionBasePtr fn;
auto llvm_function = std::make_shared<LLVMFunction>(dag);
if (auto * compilation_cache = CompiledExpressionCacheFactory::instance().tryGetCache())
{
auto [compiled_function, was_inserted] = compilation_cache->getOrSet(hash_key, [&dag] ()
auto [compiled_function_cache_entry, was_inserted] = compilation_cache->getOrSet(hash_key, [&] ()
{
auto llvm_function = std::make_unique<LLVMFunction>(dag);
size_t compiled_size = llvm_function->getCompiledSize();
CHJIT::CompiledModuleInfo compiled_module_info = compileFunction(getJITInstance(), *llvm_function);
auto * compiled_jit_function = getJITInstance().findCompiledFunction(compiled_module_info, llvm_function->getName());
auto compiled_function = std::make_shared<CompiledFunction>(compiled_jit_function, compiled_module_info);
CompiledFunction function
{
.function = std::move(llvm_function),
.compiled_size = compiled_size
};
return std::make_shared<CompiledFunction>(function);
return std::make_shared<CompiledFunctionCacheEntry>(std::move(compiled_function), compiled_module_info.size);
});
if (was_inserted)
LOG_TRACE(getLogger(),
"Put compiled expression {} in cache used cache size {} total cache size {}",
compiled_function->function->getName(),
llvm_function->getName(),
compilation_cache->weight(),
compilation_cache->maxSize());
else
LOG_TRACE(getLogger(), "Get compiled expression {} from cache", compiled_function->function->getName());
LOG_TRACE(getLogger(), "Get compiled expression {} from cache", llvm_function->getName());
fn = compiled_function->function;
llvm_function->setCompiledFunction(compiled_function_cache_entry->getCompiledFunction());
}
else
{
fn = std::make_unique<LLVMFunction>(dag);
CHJIT::CompiledModuleInfo compiled_module_info = compileFunction(getJITInstance(), *llvm_function);
auto * compiled_jit_function = getJITInstance().findCompiledFunction(compiled_module_info, llvm_function->getName());
auto compiled_function = std::make_shared<CompiledFunction>(compiled_jit_function, compiled_module_info);
llvm_function->setCompiledFunction(compiled_function);
}
LOG_TRACE(getLogger(), "Use compiled expression {}", fn->getName());
LOG_TRACE(getLogger(), "Use compiled expression {}", llvm_function->getName());
return fn;
return llvm_function;
}
static bool isCompilableConstant(const ActionsDAG::Node & node)
@ -334,16 +350,11 @@ static bool isCompilableFunction(const ActionsDAG::Node & node)
return function.isCompilable();
}
static bool isCompilableInput(const ActionsDAG::Node & node)
{
return node.type == ActionsDAG::ActionType::INPUT || node.type == ActionsDAG::ActionType::ALIAS;
}
static CompileDAG getCompilableDAG(
const ActionsDAG::Node * root,
ActionsDAG::NodeRawConstPtrs & children)
{
/// Extract CompileDAG from root actions dag node, it is important that each root child is compilable.
/// Extract CompileDAG from root actions dag node.
CompileDAG dag;
@ -363,6 +374,32 @@ static CompileDAG getCompilableDAG(
auto & frame = stack.top();
const auto * node = frame.node;
bool is_compilable_constant = isCompilableConstant(*node);
bool is_compilable_function = isCompilableFunction(*node);
if (!is_compilable_function || is_compilable_constant)
{
CompileDAG::Node compile_node;
compile_node.function = node->function_base;
compile_node.result_type = node->result_type;
if (is_compilable_constant)
{
compile_node.type = CompileDAG::CompileType::CONSTANT;
compile_node.column = node->column;
}
else
{
compile_node.type = CompileDAG::CompileType::INPUT;
children.emplace_back(node);
}
visited_node_to_compile_dag_position[node] = dag.getNodesCount();
dag.addNode(std::move(compile_node));
stack.pop();
continue;
}
while (frame.next_child_to_visit < node->children.size())
{
const auto & child = node->children[frame.next_child_to_visit];
@ -382,26 +419,15 @@ static CompileDAG getCompilableDAG(
if (!all_children_visited)
continue;
/// Here we process only functions that are not compiled constants
CompileDAG::Node compile_node;
compile_node.function = node->function_base;
compile_node.result_type = node->result_type;
compile_node.type = CompileDAG::CompileType::FUNCTION;
if (isCompilableConstant(*node))
{
compile_node.type = CompileDAG::CompileType::CONSTANT;
compile_node.column = node->column;
}
else if (node->type == ActionsDAG::ActionType::FUNCTION)
{
compile_node.type = CompileDAG::CompileType::FUNCTION;
for (const auto * child : node->children)
compile_node.arguments.push_back(visited_node_to_compile_dag_position[child]);
}
else
{
compile_node.type = CompileDAG::CompileType::INPUT;
children.emplace_back(node);
}
for (const auto * child : node->children)
compile_node.arguments.push_back(visited_node_to_compile_dag_position[child]);
visited_node_to_compile_dag_position[node] = dag.getNodesCount();
@ -417,8 +443,8 @@ void ActionsDAG::compileFunctions(size_t min_count_to_compile_expression)
struct Data
{
bool is_compilable_in_isolation = false;
bool all_children_compilable = false;
bool all_parents_compilable = true;
size_t compilable_children_size = 0;
size_t children_size = 0;
};
@ -428,7 +454,7 @@ void ActionsDAG::compileFunctions(size_t min_count_to_compile_expression)
for (const auto & node : nodes)
{
bool node_is_compilable_in_isolation = isCompilableConstant(node) || isCompilableFunction(node) || isCompilableInput(node);
bool node_is_compilable_in_isolation = isCompilableFunction(node) && !isCompilableConstant(node);
node_to_data[&node].is_compilable_in_isolation = node_is_compilable_in_isolation;
}
@ -441,8 +467,7 @@ void ActionsDAG::compileFunctions(size_t min_count_to_compile_expression)
std::stack<Frame> stack;
std::unordered_set<const Node *> visited_nodes;
/** Algorithm is to iterate over each node in ActionsDAG, and update node compilable status.
* Node is compilable if all its children are compilable and node is also compilable.
/** Algorithm is to iterate over each node in ActionsDAG, and update node compilable_children_size.
* After this procedure data for each node is initialized.
*/
@ -479,14 +504,18 @@ void ActionsDAG::compileFunctions(size_t min_count_to_compile_expression)
auto & current_node_data = node_to_data[current_node];
current_node_data.all_children_compilable = true;
if (current_node_data.is_compilable_in_isolation)
{
for (const auto * child : current_node->children)
{
current_node_data.all_children_compilable &= node_to_data[child].is_compilable_in_isolation;
current_node_data.all_children_compilable &= node_to_data[child].all_children_compilable;
auto & child_data = node_to_data[child];
if (child_data.is_compilable_in_isolation)
{
current_node_data.compilable_children_size += child_data.compilable_children_size;
current_node_data.compilable_children_size += 1;
}
current_node_data.children_size += node_to_data[child].children_size;
}
@ -501,10 +530,10 @@ void ActionsDAG::compileFunctions(size_t min_count_to_compile_expression)
for (const auto & node : nodes)
{
auto & node_data = node_to_data[&node];
bool is_compilable = node_data.is_compilable_in_isolation && node_data.all_children_compilable;
bool node_is_valid_for_compilation = node_data.is_compilable_in_isolation && node_data.compilable_children_size > 0;
for (const auto & child : node.children)
node_to_data[child].all_parents_compilable &= is_compilable;
node_to_data[child].all_parents_compilable &= node_is_valid_for_compilation;
}
for (const auto & node : index)
@ -519,11 +548,10 @@ void ActionsDAG::compileFunctions(size_t min_count_to_compile_expression)
{
auto & node_data = node_to_data[&node];
bool node_is_valid_for_compilation = !isCompilableConstant(node) && node.children.size() > 1;
bool can_be_compiled = node_data.is_compilable_in_isolation && node_data.all_children_compilable && node_is_valid_for_compilation;
bool node_is_valid_for_compilation = node_data.is_compilable_in_isolation && node_data.compilable_children_size > 0;
/// If all parents are compilable then this node should not be standalone compiled
bool should_compile = can_be_compiled && !node_data.all_parents_compilable;
bool should_compile = node_is_valid_for_compilation && !node_data.all_parents_compilable;
if (!should_compile)
continue;

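The refactoring above moves ownership of the JIT-compiled module into a small `CompiledFunction` holder whose destructor releases the module, shared via `shared_ptr` between the cache entry and every `LLVMExecutableFunction` that uses it. A minimal sketch of that lifetime scheme with toy names (not the real `CHJIT` interface): the module is freed only when the last owner lets go, so evicting an entry from the cache cannot pull compiled code out from under a running query.

``` cpp
#include <iostream>
#include <memory>
#include <string>
#include <unordered_map>

// Toy stand-in for a JIT engine that owns compiled modules.
struct Jit
{
    int compile(const std::string & name)
    {
        std::cout << "compile " << name << '\n';
        return next_id++;
    }
    void deleteModule(int id) { std::cout << "delete module " << id << '\n'; }
    int next_id = 0;
};

Jit & jit() { static Jit instance; return instance; }

// RAII holder: destroying the last shared_ptr to it frees the module.
class CompiledFunctionHolder
{
public:
    explicit CompiledFunctionHolder(int module_id_) : module_id(module_id_) {}
    ~CompiledFunctionHolder() { jit().deleteModule(module_id); }
    int module_id;
};

int main()
{
    // "Cache": keeps one reference to the holder.
    std::unordered_map<std::string, std::shared_ptr<CompiledFunctionHolder>> cache;
    auto holder = std::make_shared<CompiledFunctionHolder>(jit().compile("plus(a, b)"));
    cache.emplace("plus(a, b)", holder);

    // A running query keeps its own reference.
    std::shared_ptr<CompiledFunctionHolder> in_use = cache["plus(a, b)"];

    cache.clear();                 // eviction: module is NOT deleted yet
    std::cout << "cache cleared\n";
    in_use.reset();                // last user done: module is deleted here
}
```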
View File

@ -5,35 +5,47 @@
#endif
#if USE_EMBEDDED_COMPILER
# include <Functions/IFunction.h>
# include <Common/LRUCache.h>
# include <Common/HashTable/Hash.h>
namespace DB
{
struct CompiledFunction
class CompiledFunction;
class CompiledFunctionCacheEntry
{
FunctionBasePtr function;
size_t compiled_size;
public:
CompiledFunctionCacheEntry(std::shared_ptr<CompiledFunction> compiled_function_, size_t compiled_function_size_)
: compiled_function(std::move(compiled_function_))
, compiled_function_size(compiled_function_size_)
{}
std::shared_ptr<CompiledFunction> getCompiledFunction() const { return compiled_function; }
size_t getCompiledFunctionSize() const { return compiled_function_size; }
private:
std::shared_ptr<CompiledFunction> compiled_function;
size_t compiled_function_size;
};
struct CompiledFunctionWeightFunction
{
size_t operator()(const CompiledFunction & compiled_function) const
size_t operator()(const CompiledFunctionCacheEntry & compiled_function) const
{
return compiled_function.compiled_size;
return compiled_function.getCompiledFunctionSize();
}
};
/** This child of LRUCache breaks one of its invariants: total weight may be changed after insertion.
* We have to do so, because we don't know the real memory consumption of generated LLVM code for every function.
*/
class CompiledExpressionCache : public LRUCache<UInt128, CompiledFunction, UInt128Hash, CompiledFunctionWeightFunction>
class CompiledExpressionCache : public LRUCache<UInt128, CompiledFunctionCacheEntry, UInt128Hash, CompiledFunctionWeightFunction>
{
public:
using Base = LRUCache<UInt128, CompiledFunction, UInt128Hash, CompiledFunctionWeightFunction>;
using Base = LRUCache<UInt128, CompiledFunctionCacheEntry, UInt128Hash, CompiledFunctionWeightFunction>;
using Base::Base;
};

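`CompiledExpressionCache` above is an `LRUCache` keyed by a 128-bit hash, and `CompiledFunctionWeightFunction` makes each entry weigh as much as its compiled code size. A rough sketch of a size-weighted cache with the same eviction idea (simplified FIFO order and toy types rather than the real `LRUCache` template):

``` cpp
#include <cstddef>
#include <iostream>
#include <list>
#include <memory>
#include <string>
#include <unordered_map>

struct CacheEntry
{
    std::string name;
    std::size_t compiled_size = 0; // weight: bytes of generated code
};

// Cache bounded by total weight (sum of compiled sizes), evicting oldest entries first.
class WeightBoundedCache
{
public:
    explicit WeightBoundedCache(std::size_t max_weight_) : max_weight(max_weight_) {}

    void set(const std::string & key, std::shared_ptr<CacheEntry> entry)
    {
        total_weight += entry->compiled_size;
        order.push_back(key);
        entries[key] = std::move(entry);

        while (total_weight > max_weight && !order.empty())
        {
            auto oldest = order.front();
            order.pop_front();
            auto it = entries.find(oldest);
            if (it == entries.end())
                continue;
            std::cout << "evict " << oldest << '\n';
            total_weight -= it->second->compiled_size;
            entries.erase(it);
        }
    }

    std::shared_ptr<CacheEntry> get(const std::string & key) const
    {
        auto it = entries.find(key);
        return it == entries.end() ? nullptr : it->second;
    }

private:
    std::size_t max_weight;
    std::size_t total_weight = 0;
    std::list<std::string> order;
    std::unordered_map<std::string, std::shared_ptr<CacheEntry>> entries;
};

int main()
{
    WeightBoundedCache cache(1000);
    cache.set("a + b", std::make_shared<CacheEntry>(CacheEntry{"a + b", 600}));
    cache.set("a * b + 1", std::make_shared<CacheEntry>(CacheEntry{"a * b + 1", 600})); // evicts "a + b"
    std::cout << (cache.get("a + b") ? "hit" : "miss") << '\n';                          // miss
}
```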
View File

@ -302,7 +302,7 @@ Block InterpreterKillQueryQuery::getSelectResult(const String & columns, const S
if (where_expression)
select_query += " WHERE " + queryToString(where_expression);
auto stream = executeQuery(select_query, getContext()->getGlobalContext(), true).getInputStream();
auto stream = executeQuery(select_query, getContext(), true).getInputStream();
Block res = stream->read();
if (res && stream->read())

View File

@ -2,6 +2,7 @@
#include <Functions/IFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTFunction.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/TreeRewriter.h>
@ -86,6 +87,30 @@ KeyDescription KeyDescription::getKeyFromAST(
return getSortingKeyFromAST(definition_ast, columns, context, {});
}
bool KeyDescription::moduloToModuloLegacyRecursive(ASTPtr node_expr)
{
if (!node_expr)
return false;
auto * function_expr = node_expr->as<ASTFunction>();
bool modulo_in_ast = false;
if (function_expr)
{
if (function_expr->name == "modulo")
{
function_expr->name = "moduloLegacy";
modulo_in_ast = true;
}
if (function_expr->arguments)
{
auto children = function_expr->arguments->children;
for (const auto & child : children)
modulo_in_ast |= moduloToModuloLegacyRecursive(child);
}
}
return modulo_in_ast;
}
KeyDescription KeyDescription::getSortingKeyFromAST(
const ASTPtr & definition_ast,
const ColumnsDescription & columns,

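`KeyDescription::moduloToModuloLegacyRecursive` above walks a function AST, renames every `modulo` call to `moduloLegacy`, and reports whether anything changed. A standalone sketch of the same recursion over a toy expression tree (not ClickHouse's `ASTFunction`):

``` cpp
#include <iostream>
#include <memory>
#include <string>
#include <vector>

// Toy expression node: either a function call with children, or a leaf (column/literal).
struct Node
{
    std::string name;
    bool is_function = false;
    std::vector<std::shared_ptr<Node>> children;
};

// Rename every `modulo` to `moduloLegacy`, return true if at least one was renamed.
bool moduloToModuloLegacyRecursive(const std::shared_ptr<Node> & node)
{
    if (!node || !node->is_function)
        return false;

    bool modulo_in_ast = false;
    if (node->name == "modulo")
    {
        node->name = "moduloLegacy";
        modulo_in_ast = true;
    }

    for (const auto & child : node->children)
        modulo_in_ast |= moduloToModuloLegacyRecursive(child);

    return modulo_in_ast;
}

int main()
{
    // (toYYYYMM(date), id % 16): only the modulo part must become moduloLegacy.
    auto id = std::make_shared<Node>(Node{"id"});
    auto sixteen = std::make_shared<Node>(Node{"16"});
    auto mod = std::make_shared<Node>(Node{"modulo", true, {id, sixteen}});
    auto date = std::make_shared<Node>(Node{"date"});
    auto to_month = std::make_shared<Node>(Node{"toYYYYMM", true, {date}});
    auto tuple = std::make_shared<Node>(Node{"tuple", true, {to_month, mod}});

    std::cout << moduloToModuloLegacyRecursive(tuple) << ' ' << mod->name << '\n'; // 1 moduloLegacy
}
```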
View File

@ -69,6 +69,9 @@ struct KeyDescription
/// unintentionally share AST variables and modify them.
KeyDescription(const KeyDescription & other);
KeyDescription & operator=(const KeyDescription & other);
/// Substitute modulo with moduloLegacy. Used in KeyCondition to allow proper comparison with keys.
static bool moduloToModuloLegacyRecursive(ASTPtr node_expr);
};
}

View File

@ -379,6 +379,7 @@ MergeTreeData::DataPartPtr Service::findPart(const String & name)
MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
const String & part_name,
const String & replica_path,
const String & host,
@ -470,9 +471,36 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
size_t sum_files_size = 0;
readBinary(sum_files_size, in);
IMergeTreeDataPart::TTLInfos ttl_infos;
/// Skip ttl infos, not required for S3 metadata
String ttl_infos_string;
readBinary(ttl_infos_string, in);
ReadBufferFromString ttl_infos_buffer(ttl_infos_string);
assertString("ttl format version: 1\n", ttl_infos_buffer);
ttl_infos.read(ttl_infos_buffer);
ReservationPtr reservation
= data.balancedReservation(metadata_snapshot, sum_files_size, 0, part_name, part_info, {}, tagger_ptr, &ttl_infos, true);
if (!reservation)
reservation
= data.reserveSpacePreferringTTLRules(metadata_snapshot, sum_files_size, ttl_infos, std::time(nullptr), 0, true);
if (reservation)
{
/// With multi-volume storage, one of the volumes is chosen depending on TTL, free space, etc.
/// The chosen volume may or may not be S3.
DiskPtr disk = reservation->getDisk();
if (disk && disk->getType() == DiskType::Type::S3)
{
for (const auto & d : disks_s3)
{
if (d->getPath() == disk->getPath())
{
Disks disks_tmp = { disk };
disks_s3.swap(disks_tmp);
break;
}
}
}
}
String part_type = "Wide";
readStringBinary(part_type, in);
if (part_type == "InMemory")
@ -493,7 +521,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
if (e.code() != ErrorCodes::S3_ERROR)
throw;
/// Try again but without S3 copy
return fetchPart(metadata_snapshot, part_name, replica_path, host, port, timeouts,
return fetchPart(metadata_snapshot, context, part_name, replica_path, host, port, timeouts,
user, password, interserver_scheme, to_detached, tmp_prefix_, nullptr, false);
}
}
@ -557,7 +585,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
MergeTreeData::DataPart::Checksums checksums;
return part_type == "InMemory"
? downloadPartToMemory(part_name, part_uuid, metadata_snapshot, std::move(reservation), in, projections)
? downloadPartToMemory(part_name, part_uuid, metadata_snapshot, context, std::move(reservation), in, projections)
: downloadPartToDisk(part_name, replica_path, to_detached, tmp_prefix_, sync, reservation->getDisk(), in, projections, checksums);
}
@ -565,6 +593,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
const String & part_name,
const UUID & part_uuid,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
ReservationPtr reservation,
PooledReadWriteBufferFromHTTP & in,
size_t projections)
@ -619,7 +648,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
new_data_part->is_temp = true;
new_data_part->setColumns(block.getNamesAndTypesList());
new_data_part->minmax_idx.update(block, data.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));
new_data_part->partition.create(metadata_snapshot, block, 0);
new_data_part->partition.create(metadata_snapshot, block, 0, context);
MergedBlockOutputStream part_out(
new_data_part, metadata_snapshot, block.getNamesAndTypesList(), {}, CompressionCodecFactory::instance().get("NONE", {}));
@ -795,7 +824,6 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToS3(
readBinary(files, in);
auto volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, disk);
MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(part_name, volume, part_relative_path);
for (size_t i = 0; i < files; ++i)
{
@ -805,7 +833,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToS3(
readStringBinary(file_name, in);
readBinary(file_size, in);
String data_path = new_data_part->getFullRelativePath() + file_name;
String data_path = part_download_path + file_name;
String metadata_file = fullPath(disk, data_path);
{
@ -837,6 +865,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToS3(
assertEOF(in);
MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(part_name, volume, part_relative_path);
new_data_part->is_temp = true;
new_data_part->modification_time = time(nullptr);
new_data_part->loadColumnsChecksumsIndexes(true, false);

View File

@ -65,6 +65,7 @@ public:
/// Downloads a part to tmp_directory. If to_detached - downloads to the `detached` directory.
MergeTreeData::MutableDataPartPtr fetchPart(
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
const String & part_name,
const String & replica_path,
const String & host,
@ -106,6 +107,7 @@ private:
const String & part_name,
const UUID & part_uuid,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
ReservationPtr reservation,
PooledReadWriteBufferFromHTTP & in,
size_t projections);

View File

@ -429,9 +429,9 @@ void IMergeTreeDataPart::removeIfNeeded()
}
if (parent_part)
projectionRemove(parent_part->getFullRelativePath());
projectionRemove(parent_part->getFullRelativePath(), keep_s3_on_delete);
else
remove(false);
remove(keep_s3_on_delete);
if (state == State::DeleteOnDestroy)
{
@ -1108,7 +1108,7 @@ void IMergeTreeDataPart::remove(bool keep_s3) const
if (isProjectionPart())
{
LOG_WARNING(storage.log, "Projection part {} should be removed by its parent {}.", name, parent_part->name);
projectionRemove(parent_part->getFullRelativePath());
projectionRemove(parent_part->getFullRelativePath(), keep_s3);
return;
}
@ -1158,7 +1158,7 @@ void IMergeTreeDataPart::remove(bool keep_s3) const
std::unordered_set<String> projection_directories;
for (const auto & [p_name, projection_part] : projection_parts)
{
projection_part->projectionRemove(to);
projection_part->projectionRemove(to, keep_s3);
projection_directories.emplace(p_name + ".proj");
}
@ -1207,7 +1207,7 @@ void IMergeTreeDataPart::remove(bool keep_s3) const
}
void IMergeTreeDataPart::projectionRemove(const String & parent_to) const
void IMergeTreeDataPart::projectionRemove(const String & parent_to, bool keep_s3) const
{
String to = parent_to + "/" + relative_path;
auto disk = volume->getDisk();
@ -1219,7 +1219,7 @@ void IMergeTreeDataPart::projectionRemove(const String & parent_to) const
"Cannot quickly remove directory {} by removing files; fallback to recursive removal. Reason: checksums.txt is missing",
fullPath(disk, to));
/// If the part is not completely written, we cannot use fast path by listing files.
disk->removeRecursive(to + "/");
disk->removeSharedRecursive(to + "/", keep_s3);
}
else
{
@ -1232,17 +1232,17 @@ void IMergeTreeDataPart::projectionRemove(const String & parent_to) const
# pragma GCC diagnostic ignored "-Wunused-variable"
#endif
for (const auto & [file, _] : checksums.files)
disk->removeFile(to + "/" + file);
disk->removeSharedFile(to + "/" + file, keep_s3);
#if !defined(__clang__)
# pragma GCC diagnostic pop
#endif
for (const auto & file : {"checksums.txt", "columns.txt"})
disk->removeFile(to + "/" + file);
disk->removeFileIfExists(to + "/" + DEFAULT_COMPRESSION_CODEC_FILE_NAME);
disk->removeFileIfExists(to + "/" + DELETE_ON_DESTROY_MARKER_FILE_NAME);
disk->removeSharedFile(to + "/" + file, keep_s3);
disk->removeSharedFileIfExists(to + "/" + DEFAULT_COMPRESSION_CODEC_FILE_NAME, keep_s3);
disk->removeSharedFileIfExists(to + "/" + DELETE_ON_DESTROY_MARKER_FILE_NAME, keep_s3);
disk->removeDirectory(to);
disk->removeSharedRecursive(to, keep_s3);
}
catch (...)
{
@ -1250,7 +1250,7 @@ void IMergeTreeDataPart::projectionRemove(const String & parent_to) const
LOG_ERROR(storage.log, "Cannot quickly remove directory {} by removing files; fallback to recursive removal. Reason: {}", fullPath(disk, to), getCurrentExceptionMessage(false));
disk->removeRecursive(to + "/");
disk->removeSharedRecursive(to + "/", keep_s3);
}
}
}

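The change above threads a `keep_s3` flag through the part-removal paths, replacing plain `removeFile`/`removeRecursive` calls with their `removeShared*` counterparts so that, under zero-copy replication over S3, local metadata can be dropped while the shared S3 objects stay in place. A simplified sketch of that idea on a toy disk interface (the types below are invented for illustration; only the `removeSharedFile` name mirrors the diff):

``` cpp
#include <iostream>
#include <string>

// Toy disk: for an S3-backed disk, each file is local metadata pointing at shared S3 objects.
struct Disk
{
    // keep_s3 == true: remove only the local metadata and leave the S3 objects alone
    // (another replica still references them under zero-copy replication).
    void removeSharedFile(const std::string & path, bool keep_s3)
    {
        std::cout << "remove metadata " << path;
        if (!keep_s3)
            std::cout << " and its S3 objects";
        std::cout << '\n';
    }
};

// Part removal passes its keep_s3 decision down to every delete call.
void removePart(Disk & disk, const std::string & part_path, bool keep_s3)
{
    for (const auto * file : {"data.bin", "checksums.txt", "columns.txt"})
        disk.removeSharedFile(part_path + "/" + file, keep_s3);
}

int main()
{
    Disk disk;
    removePart(disk, "store/all_1_1_0", /*keep_s3=*/true);  // data still shared with another replica
    removePart(disk, "store/all_2_2_0", /*keep_s3=*/false); // last owner: drop S3 objects too
}
```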
View File

@ -130,7 +130,7 @@ public:
void remove(bool keep_s3 = false) const;
void projectionRemove(const String & parent_to) const;
void projectionRemove(const String & parent_to, bool keep_s3 = false) const;
/// Initialize columns (from columns.txt if exists, or create from column files if not).
/// Load checksums from checksums.txt if exists. Load index if required.
@ -199,18 +199,21 @@ public:
/// Frozen by ALTER TABLE ... FREEZE ... It is used for information purposes in system.parts table.
mutable std::atomic<bool> is_frozen {false};
/// Flag to keep S3 data when zero-copy replication over S3 is turned on.
mutable bool keep_s3_on_delete = false;
/**
* Part state is a stage of its lifetime. States are ordered and state of a part could be increased only.
* Part state should be modified under data_parts mutex.
*
* Possible state transitions:
* Temporary -> Precommitted: we are trying to commit a fetched, inserted or merged part to active set
* Precommitted -> Outdated: we could not add a part to active set and are doing a rollback (for example it is duplicated part)
* Precommitted -> Committed: we successfully committed a part to active dataset
* Precommitted -> Outdated: a part was replaced by a covering part or DROP PARTITION
* Outdated -> Deleting: a cleaner selected this part for deletion
* Deleting -> Outdated: if an ZooKeeper error occurred during the deletion, we will retry deletion
* Committed -> DeleteOnDestroy if part was moved to another disk
* Temporary -> Precommitted: we are trying to commit a fetched, inserted or merged part to active set
* Precommitted -> Outdated: we could not add a part to active set and are doing a rollback (for example it is duplicated part)
* Precommitted -> Committed: we successfully committed a part to active dataset
* Precommitted -> Outdated: a part was replaced by a covering part or DROP PARTITION
* Outdated -> Deleting: a cleaner selected this part for deletion
* Deleting -> Outdated: if a ZooKeeper error occurred during the deletion, we will retry deletion
* Committed -> DeleteOnDestroy: if part was moved to another disk
*/
enum class State
{

View File

@ -21,6 +21,7 @@
#include <Parsers/ASTIdentifier.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <Storages/KeyDescription.h>
#include <cassert>
#include <stack>
@ -591,6 +592,30 @@ void KeyCondition::traverseAST(const ASTPtr & node, ContextPtr context, Block &
rpn.emplace_back(std::move(element));
}
bool KeyCondition::canConstantBeWrapped(const ASTPtr & node, const String & expr_name, String & result_expr_name)
{
const auto & sample_block = key_expr->getSampleBlock();
/// sample_block from key_expr cannot contain modulo and moduloLegacy at the same time.
/// For partition key it is always moduloLegacy.
if (sample_block.has(expr_name))
{
result_expr_name = expr_name;
}
else
{
auto adjusted_ast = node->clone();
KeyDescription::moduloToModuloLegacyRecursive(adjusted_ast);
String adjusted_expr_name = adjusted_ast->getColumnName();
if (!sample_block.has(adjusted_expr_name))
return false;
result_expr_name = adjusted_expr_name;
}
return true;
}
bool KeyCondition::canConstantBeWrappedByMonotonicFunctions(
const ASTPtr & node,
@ -600,11 +625,13 @@ bool KeyCondition::canConstantBeWrappedByMonotonicFunctions(
DataTypePtr & out_type)
{
// Constant expr should use alias names if any
String expr_name = node->getColumnName();
const auto & sample_block = key_expr->getSampleBlock();
if (!sample_block.has(expr_name))
String passed_expr_name = node->getColumnName();
String expr_name;
if (!canConstantBeWrapped(node, passed_expr_name, expr_name))
return false;
const auto & sample_block = key_expr->getSampleBlock();
/// TODO Nullable index is not yet landed.
if (out_value.isNull())
return false;
@ -668,11 +695,13 @@ bool KeyCondition::canConstantBeWrappedByFunctions(
const ASTPtr & ast, size_t & out_key_column_num, DataTypePtr & out_key_column_type, Field & out_value, DataTypePtr & out_type)
{
// Constant expr should use alias names if any
String expr_name = ast->getColumnName();
const auto & sample_block = key_expr->getSampleBlock();
if (!sample_block.has(expr_name))
String passed_expr_name = ast->getColumnName();
String expr_name;
if (!canConstantBeWrapped(ast, passed_expr_name, expr_name))
return false;
const auto & sample_block = key_expr->getSampleBlock();
/// TODO Nullable index is not yet landed.
if (out_value.isNull())
return false;

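The new `canConstantBeWrapped` helper first looks the expression up under its original column name and, if that fails, retries with a clone of the AST where `modulo` has been rewritten to `moduloLegacy`, since the partition key expression always goes through that substitution. A small sketch of the same two-step lookup over a set of known key expression names (the rewrite here is done on the printed name instead of a real AST):

``` cpp
#include <iostream>
#include <optional>
#include <set>
#include <string>

// Toy stand-in for "rewrite modulo -> moduloLegacy in the expression", done on the printed name.
std::string adjustName(std::string name)
{
    const std::string from = "modulo(";
    const std::string to = "moduloLegacy(";
    if (auto pos = name.find(from); pos != std::string::npos)
        name.replace(pos, from.size(), to);
    return name;
}

// Try the original expression name first, then the adjusted one, as canConstantBeWrapped does.
std::optional<std::string> resolveKeyExpressionName(const std::set<std::string> & key_sample_block, const std::string & expr_name)
{
    if (key_sample_block.count(expr_name))
        return expr_name;

    std::string adjusted = adjustName(expr_name);
    if (key_sample_block.count(adjusted))
        return adjusted;

    return std::nullopt; // cannot be used for key comparison
}

int main()
{
    // The partition key sample block always contains the legacy form.
    std::set<std::string> key_sample_block = {"moduloLegacy(id, 16)"};

    auto resolved = resolveKeyExpressionName(key_sample_block, "modulo(id, 16)");
    std::cout << (resolved ? *resolved : "not found") << '\n'; // moduloLegacy(id, 16)
}
```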
View File

@ -419,6 +419,12 @@ private:
bool canConstantBeWrappedByFunctions(
const ASTPtr & ast, size_t & out_key_column_num, DataTypePtr & out_key_column_type, Field & out_value, DataTypePtr & out_type);
/// Check if ASTPtr node, passed to canConstantBeWrappedBy*, can be used by them for further checks.
/// Always call this method at the start of other methods that require key comparison, because it also checks whether the adjusted
/// key expression (with modulo substituted by moduloLegacy) can be used. This is needed because the partition key
/// is always modified when passed into KeyCondition, with recursive substitution from modulo to moduloLegacy.
bool canConstantBeWrapped(const ASTPtr & node, const String & expr_name, String & result_expr_name);
/// If it's possible to make an RPNElement
/// that will filter values (possibly tuples) by the content of 'prepared_set',
/// do it and return true.

View File

@ -23,7 +23,7 @@ void MergeTreeBlockOutputStream::writePrefix()
void MergeTreeBlockOutputStream::write(const Block & block)
{
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot);
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context);
for (auto & current_block : part_blocks)
{
Stopwatch watch;

View File

@ -773,7 +773,7 @@ std::optional<UInt64> MergeTreeData::totalRowsByPartitionPredicateImpl(
// Generate valid expressions for filtering
bool valid = VirtualColumnUtils::prepareFilterBlockWithQuery(query_info.query, local_context, virtual_columns_block, expression_ast);
PartitionPruner partition_pruner(metadata_snapshot->getPartitionKey(), query_info, local_context, true /* strict */);
PartitionPruner partition_pruner(metadata_snapshot, query_info, local_context, true /* strict */);
if (partition_pruner.isUseless() && !valid)
return {};
@ -877,13 +877,13 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
{
/// Create and correctly initialize global WAL object
write_ahead_log = std::make_shared<MergeTreeWriteAheadLog>(*this, disk_ptr, it->name());
for (auto && part : write_ahead_log->restore(metadata_snapshot))
for (auto && part : write_ahead_log->restore(metadata_snapshot, getContext()))
parts_from_wal.push_back(std::move(part));
}
else if (settings->in_memory_parts_enable_wal)
{
MergeTreeWriteAheadLog wal(*this, disk_ptr, it->name());
for (auto && part : wal.restore(metadata_snapshot))
for (auto && part : wal.restore(metadata_snapshot, getContext()))
parts_from_wal.push_back(std::move(part));
}
}
@ -2726,6 +2726,22 @@ void MergeTreeData::swapActivePart(MergeTreeData::DataPartPtr part_copy)
if (active_part_it == data_parts_by_info.end())
throw Exception("Cannot swap part '" + part_copy->name + "', no such active part.", ErrorCodes::NO_SUCH_DATA_PART);
/// We do not check allow_s3_zero_copy_replication here because data may be shared
/// when allow_s3_zero_copy_replication turned on and off again
original_active_part->keep_s3_on_delete = false;
if (original_active_part->volume->getDisk()->getType() == DiskType::Type::S3)
{
if (part_copy->volume->getDisk()->getType() == DiskType::Type::S3
&& original_active_part->getUniqueId() == part_copy->getUniqueId())
{ /// May be when several volumes use the same S3 storage
original_active_part->keep_s3_on_delete = true;
}
else
original_active_part->keep_s3_on_delete = !unlockSharedData(*original_active_part);
}
modifyPartState(original_active_part, DataPartState::DeleteOnDestroy);
data_parts_indexes.erase(active_part_it);

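The new logic in `swapActivePart` decides whether the replaced part's S3 data must outlive the part object: if both parts live on S3 and report the same unique id, the blobs are shared and kept; otherwise they are kept only when `unlockSharedData` reports that another replica still references them. A condensed sketch of that decision with toy structs (`unlockSharedData` below is a placeholder for the real zero-copy reference check):

``` cpp
#include <iostream>
#include <string>

enum class DiskType { Local, S3 };

struct Part
{
    DiskType disk_type;
    std::string unique_id; // identifies the underlying S3 blobs
};

// Placeholder: returns true if no other replica references the part's S3 data anymore.
bool unlockSharedData(const Part &) { return false; }

bool keepS3OnDelete(const Part & original_active_part, const Part & part_copy)
{
    if (original_active_part.disk_type != DiskType::S3)
        return false; // nothing shared, plain delete

    // Several volumes may point at the same S3 storage: same blobs, keep them.
    if (part_copy.disk_type == DiskType::S3 && original_active_part.unique_id == part_copy.unique_id)
        return true;

    // Otherwise keep the data only if some other replica still holds a reference.
    return !unlockSharedData(original_active_part);
}

int main()
{
    Part old_part{DiskType::S3, "blob-123"};
    Part moved_copy{DiskType::S3, "blob-123"};
    std::cout << keepS3OnDelete(old_part, moved_copy) << '\n'; // 1: same blobs, keep S3 data
}
```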
View File

@ -503,7 +503,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
minmax_idx_condition.emplace(
query_info, context, minmax_columns_names, data.getMinMaxExpr(partition_key, ExpressionActionsSettings::fromContext(context)));
partition_pruner.emplace(metadata_snapshot_base->getPartitionKey(), query_info, context, false /* strict */);
partition_pruner.emplace(metadata_snapshot_base, query_info, context, false /* strict */);
if (settings.force_index_by_date && (minmax_idx_condition->alwaysUnknownOrTrue() && partition_pruner->isUseless()))
{

View File

@ -140,7 +140,8 @@ void updateTTL(
}
BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot)
BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(
const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
{
BlocksWithPartition result;
if (!block || !block.rows())
@ -155,12 +156,12 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block
}
Block block_copy = block;
const auto & partition_key = metadata_snapshot->getPartitionKey();
partition_key.expression->execute(block_copy);
/// After expression execution, partition key columns are added to block_copy with names derived from the partition functions.
auto partition_key_names_and_types = MergeTreePartition::executePartitionByExpression(metadata_snapshot, block_copy, context);
ColumnRawPtrs partition_columns;
partition_columns.reserve(partition_key.sample_block.columns());
for (const ColumnWithTypeAndName & element : partition_key.sample_block)
partition_columns.reserve(partition_key_names_and_types.size());
for (const auto & element : partition_key_names_and_types)
partition_columns.emplace_back(block_copy.getByName(element.name).column.get());
PODArray<size_t> partition_num_to_first_row;

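`splitBlockIntoParts` now executes the adjusted partition key expression on a copy of the block and groups rows by the resulting partition column values, producing one block per partition. A toy version of just the grouping step, with each row represented by its precomputed partition key value (the real code works on columns and also enforces `max_parts`):

``` cpp
#include <iostream>
#include <map>
#include <string>
#include <vector>

// One "block with partition": the partition key value and the row indices that fall into it.
struct BlockWithPartition
{
    std::string partition_value;
    std::vector<size_t> rows;
};

std::vector<BlockWithPartition> splitRowsIntoParts(const std::vector<std::string> & partition_value_per_row)
{
    std::map<std::string, size_t> partition_to_result_index;
    std::vector<BlockWithPartition> result;

    for (size_t row = 0; row < partition_value_per_row.size(); ++row)
    {
        const auto & value = partition_value_per_row[row];
        auto [it, inserted] = partition_to_result_index.emplace(value, result.size());
        if (inserted)
            result.push_back({value, {}});
        result[it->second].rows.push_back(row);
    }
    return result;
}

int main()
{
    // e.g. partition key = moduloLegacy(id, 2), rendered as a string per row
    auto parts = splitRowsIntoParts({"0", "1", "0", "1", "1"});
    for (const auto & part : parts)
        std::cout << "partition " << part.partition_value << ": " << part.rows.size() << " rows\n";
}
```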
View File

@ -39,7 +39,7 @@ public:
* (split rows by partition)
* Works deterministically: if same block was passed, function will return same result in same order.
*/
static BlocksWithPartition splitBlockIntoParts(const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot);
static BlocksWithPartition splitBlockIntoParts(const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context);
/** All rows must correspond to same partition.
* Returns part with unique name starting with 'tmp_', yet not added to MergeTreeData.

View File

@ -129,7 +129,7 @@ void MergeTreePartition::load(const MergeTreeData & storage, const DiskPtr & dis
if (!metadata_snapshot->hasPartitionKey())
return;
const auto & partition_key_sample = metadata_snapshot->getPartitionKey().sample_block;
const auto & partition_key_sample = adjustPartitionKey(metadata_snapshot, storage.getContext()).sample_block;
auto partition_file_path = part_path + "partition.dat";
auto file = openForReading(disk, partition_file_path);
value.resize(partition_key_sample.columns());
@ -140,7 +140,7 @@ void MergeTreePartition::load(const MergeTreeData & storage, const DiskPtr & dis
void MergeTreePartition::store(const MergeTreeData & storage, const DiskPtr & disk, const String & part_path, MergeTreeDataPartChecksums & checksums) const
{
auto metadata_snapshot = storage.getInMemoryMetadataPtr();
const auto & partition_key_sample = metadata_snapshot->getPartitionKey().sample_block;
const auto & partition_key_sample = adjustPartitionKey(metadata_snapshot, storage.getContext()).sample_block;
store(partition_key_sample, disk, part_path, checksums);
}
@ -153,28 +153,62 @@ void MergeTreePartition::store(const Block & partition_key_sample, const DiskPtr
HashingWriteBuffer out_hashing(*out);
for (size_t i = 0; i < value.size(); ++i)
partition_key_sample.getByPosition(i).type->getDefaultSerialization()->serializeBinary(value[i], out_hashing);
out_hashing.next();
checksums.files["partition.dat"].file_size = out_hashing.count();
checksums.files["partition.dat"].file_hash = out_hashing.getHash();
out->finalize();
}
void MergeTreePartition::create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row)
void MergeTreePartition::create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row, ContextPtr context)
{
if (!metadata_snapshot->hasPartitionKey())
return;
const auto & partition_key = metadata_snapshot->getPartitionKey();
partition_key.expression->execute(block);
size_t partition_columns_num = partition_key.sample_block.columns();
value.resize(partition_columns_num);
auto partition_key_names_and_types = executePartitionByExpression(metadata_snapshot, block, context);
value.resize(partition_key_names_and_types.size());
for (size_t i = 0; i < partition_columns_num; ++i)
/// Executing the partition_by expression adds new columns to the passed block according to the partition functions.
/// The block is passed by reference and is used afterwards. `moduloLegacy` needs to be substituted back
/// with just `modulo`, because it was a temporary substitution.
static constexpr auto modulo_legacy_function_name = "moduloLegacy";
size_t i = 0;
for (const auto & element : partition_key_names_and_types)
{
const auto & column_name = partition_key.sample_block.getByPosition(i).name;
const auto & partition_column = block.getByName(column_name).column;
partition_column->get(row, value[i]);
auto & partition_column = block.getByName(element.name);
if (element.name.starts_with(modulo_legacy_function_name))
partition_column.name = "modulo" + partition_column.name.substr(std::strlen(modulo_legacy_function_name));
partition_column.column->get(row, value[i++]);
}
}
NamesAndTypesList MergeTreePartition::executePartitionByExpression(const StorageMetadataPtr & metadata_snapshot, Block & block, ContextPtr context)
{
auto adjusted_partition_key = adjustPartitionKey(metadata_snapshot, context);
adjusted_partition_key.expression->execute(block);
return adjusted_partition_key.sample_block.getNamesAndTypesList();
}
KeyDescription MergeTreePartition::adjustPartitionKey(const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
{
const auto & partition_key = metadata_snapshot->getPartitionKey();
if (!partition_key.definition_ast)
return partition_key;
ASTPtr ast_copy = partition_key.definition_ast->clone();
/// The implementation of the modulo function was changed from an 8-bit result type to 16-bit. For backward compatibility, the partition-by expression is always
/// calculated according to the previous version, i.e. with `moduloLegacy`.
if (KeyDescription::moduloToModuloLegacyRecursive(ast_copy))
{
auto adjusted_partition_key = KeyDescription::getKeyFromAST(ast_copy, metadata_snapshot->columns, context);
return adjusted_partition_key;
}
return partition_key;
}
}
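A note on the substitution above: `moduloLegacy` reproduces the old behaviour, in which the result of `modulo` was computed with the narrower (8-bit) result type, so partition values written by older servers remain readable and prunable; the two functions agree for small non-negative arguments but can diverge otherwise. A minimal SQL illustration, with values taken from the reference outputs of the new 01869/01870 tests further below:

SELECT modulo(199, 200), moduloLegacy(199, 200);   -- 199 and 199
SELECT modulo(-199, 200), moduloLegacy(-199, 200); -- -199 and 57 (legacy 8-bit wrap-around)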

View File

@ -3,9 +3,9 @@
#include <common/types.h>
#include <Disks/IDisk.h>
#include <IO/WriteBuffer.h>
#include <Storages/KeyDescription.h>
#include <Core/Field.h>
namespace DB
{
@ -41,7 +41,13 @@ public:
void assign(const MergeTreePartition & other) { value = other.value; }
void create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row);
void create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row, ContextPtr context);
/// Adjust the partition key and execute its expression on the block. Returns the names and types of the adjusted sample block.
static NamesAndTypesList executePartitionByExpression(const StorageMetadataPtr & metadata_snapshot, Block & block, ContextPtr context);
/// Make a modified partition key with modulo substituted by moduloLegacy. Used in PartitionPruner.
static KeyDescription adjustPartitionKey(const StorageMetadataPtr & metadata_snapshot, ContextPtr context);
};
}

View File

@ -111,7 +111,7 @@ void MergeTreeWriteAheadLog::rotate(const std::unique_lock<std::mutex> &)
init();
}
MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const StorageMetadataPtr & metadata_snapshot)
MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
{
std::unique_lock lock(write_mutex);
@ -192,7 +192,7 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const Stor
MergedBlockOutputStream part_out(part, metadata_snapshot, block.getNamesAndTypesList(), {}, CompressionCodecFactory::instance().get("NONE", {}));
part->minmax_idx.update(block, storage.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));
part->partition.create(metadata_snapshot, block, 0);
part->partition.create(metadata_snapshot, block, 0, context);
if (metadata_snapshot->hasSortingKey())
metadata_snapshot->getSortingKey().expression->execute(block);

View File

@ -62,7 +62,7 @@ public:
void addPart(DataPartInMemoryPtr & part);
void dropPart(const String & part_name);
std::vector<MergeTreeMutableDataPartPtr> restore(const StorageMetadataPtr & metadata_snapshot);
std::vector<MergeTreeMutableDataPartPtr> restore(const StorageMetadataPtr & metadata_snapshot, ContextPtr context);
using MinMaxBlockNumber = std::pair<Int64, Int64>;
static std::optional<MinMaxBlockNumber> tryParseMinMaxBlockNumber(const String & filename);

View File

@ -14,15 +14,18 @@ class PartitionPruner
{
private:
std::unordered_map<String, bool> partition_filter_map;
const KeyDescription & partition_key;
/// partition_key is adjusted here (with substitution from modulo to moduloLegacy).
KeyDescription partition_key;
KeyCondition partition_condition;
bool useless;
using DataPart = IMergeTreeDataPart;
using DataPartPtr = std::shared_ptr<const DataPart>;
public:
PartitionPruner(const KeyDescription & partition_key_, const SelectQueryInfo & query_info, ContextPtr context, bool strict)
: partition_key(partition_key_)
PartitionPruner(const StorageMetadataPtr & metadata, const SelectQueryInfo & query_info, ContextPtr context, bool strict)
: partition_key(MergeTreePartition::adjustPartitionKey(metadata, context))
, partition_condition(
query_info, context, partition_key.column_names, partition_key.expression, true /* single_point */, strict)
, useless(strict ? partition_condition.anyUnknownOrAlwaysTrue() : partition_condition.alwaysUnknownOrTrue())

View File

@ -136,7 +136,7 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
if (quorum)
checkQuorumPrecondition(zookeeper);
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot);
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context);
for (auto & current_block : part_blocks)
{

View File

@ -268,8 +268,7 @@ void ReplicatedMergeTreeQueue::removeCoveredPartsFromMutations(const String & pa
bool some_mutations_are_probably_done = false;
auto from_it = in_partition->second.lower_bound(part_info.getDataVersion());
for (auto it = from_it; it != in_partition->second.end(); ++it)
for (auto it = in_partition->second.begin(); it != in_partition->second.end(); ++it)
{
MutationStatus & status = *it->second;

View File

@ -2500,7 +2500,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
throw Exception("Interserver schemas are different '" + interserver_scheme + "' != '" + address.scheme + "', can't fetch part from " + address.host, ErrorCodes::LOGICAL_ERROR);
part_desc->res_part = fetcher.fetchPart(
metadata_snapshot, part_desc->found_new_part_name, source_replica_path,
metadata_snapshot, getContext(), part_desc->found_new_part_name, source_replica_path,
address.host, address.replication_port, timeouts, credentials->getUser(), credentials->getPassword(), interserver_scheme, false, TMP_PREFIX + "fetch_");
/// TODO: check columns_version of fetched part
@ -2616,7 +2616,7 @@ void StorageReplicatedMergeTree::executeClonePartFromShard(const LogEntry & entr
ErrorCodes::LOGICAL_ERROR);
return fetcher.fetchPart(
metadata_snapshot, entry.new_part_name, source_replica_path,
metadata_snapshot, getContext(), entry.new_part_name, source_replica_path,
address.host, address.replication_port,
timeouts, credentials->getUser(), credentials->getPassword(), interserver_scheme, true);
};
@ -4016,6 +4016,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
return fetcher.fetchPart(
metadata_snapshot,
getContext(),
part_name,
source_replica_path,
address.host,
@ -4171,7 +4172,7 @@ bool StorageReplicatedMergeTree::fetchExistsPart(const String & part_name, const
ErrorCodes::INTERSERVER_SCHEME_DOESNT_MATCH);
return fetcher.fetchPart(
metadata_snapshot, part_name, source_replica_path,
metadata_snapshot, getContext(), part_name, source_replica_path,
address.host, address.replication_port,
timeouts, credentials->getUser(), credentials->getPassword(), interserver_scheme, false, "", nullptr, true,
replaced_disk);
@ -4999,7 +5000,10 @@ bool StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition(const St
auto zookeeper = getZooKeeper();
delimiting_block_lock = allocateBlockNumber(partition_id, zookeeper);
right = delimiting_block_lock->getNumber();
mutation_version = queue.getCurrentMutationVersion(partition_id, right);
/// Make sure we cover all parts in drop range.
/// There might be parts with mutation version greater than current block number
/// if some part mutation has been assigned after block number allocation, but before creation of DROP_RANGE entry.
mutation_version = MergeTreePartInfo::MAX_BLOCK_NUMBER;
}
if (for_replace_range)

View File

@ -8,6 +8,18 @@
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s31>
<s31_again>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s31_again>
<s32>
<type>s3</type>
<endpoint>http://minio1:9001/root/data2/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s32>
</disks>
<policies>
<s3>
@ -28,11 +40,31 @@
</volumes>
<move_factor>0.0</move_factor>
</hybrid>
<tiered>
<volumes>
<main>
<disk>s31</disk>
</main>
<external>
<disk>s32</disk>
</external>
</volumes>
</tiered>
<tiered_copy>
<volumes>
<main>
<disk>s31</disk>
</main>
<external>
<disk>s31_again</disk>
</external>
</volumes>
</tiered_copy>
</policies>
</storage_configuration>
<merge_tree>
<min_bytes_for_wide_part>0</min_bytes_for_wide_part>
<min_bytes_for_wide_part>1024</min_bytes_for_wide_part>
<old_parts_lifetime>1</old_parts_lifetime>
<allow_s3_zero_copy_replication>1</allow_s3_zero_copy_replication>
</merge_tree>

View File

@ -1,3 +1,4 @@
import datetime
import logging
import time
@ -27,10 +28,10 @@ def cluster():
cluster.shutdown()
def get_large_objects_count(cluster, size=100):
def get_large_objects_count(cluster, size=100, folder='data'):
minio = cluster.minio_client
counter = 0
for obj in minio.list_objects(cluster.minio_bucket, 'data/'):
for obj in minio.list_objects(cluster.minio_bucket, '{}/'.format(folder)):
if obj.size >= size:
counter = counter + 1
return counter
@ -38,11 +39,11 @@ def get_large_objects_count(cluster, size=100):
def wait_for_large_objects_count(cluster, expected, size=100, timeout=30):
while timeout > 0:
if get_large_objects_count(cluster, size) == expected:
if get_large_objects_count(cluster, size=size) == expected:
return
timeout -= 1
time.sleep(1)
assert get_large_objects_count(cluster, size) == expected
assert get_large_objects_count(cluster, size=size) == expected
@pytest.mark.parametrize(
@ -63,7 +64,7 @@ def test_s3_zero_copy_replication(cluster, policy):
)
node1.query("INSERT INTO s3_test VALUES (0,'data'),(1,'data')")
time.sleep(1)
node2.query("SYSTEM SYNC REPLICA s3_test")
assert node1.query("SELECT * FROM s3_test order by id FORMAT Values") == "(0,'data'),(1,'data')"
assert node2.query("SELECT * FROM s3_test order by id FORMAT Values") == "(0,'data'),(1,'data')"
@ -71,14 +72,15 @@ def test_s3_zero_copy_replication(cluster, policy):
assert get_large_objects_count(cluster) == 1
node2.query("INSERT INTO s3_test VALUES (2,'data'),(3,'data')")
time.sleep(1)
node1.query("SYSTEM SYNC REPLICA s3_test")
assert node2.query("SELECT * FROM s3_test order by id FORMAT Values") == "(0,'data'),(1,'data'),(2,'data'),(3,'data')"
assert node1.query("SELECT * FROM s3_test order by id FORMAT Values") == "(0,'data'),(1,'data'),(2,'data'),(3,'data')"
# Based on version 20.x - two parts
wait_for_large_objects_count(cluster, 2)
node1.query("OPTIMIZE TABLE s3_test")
node1.query("OPTIMIZE TABLE s3_test FINAL")
# Based on version 20.x - after merge, two old parts and one merged
wait_for_large_objects_count(cluster, 3)
@ -105,8 +107,7 @@ def test_s3_zero_copy_on_hybrid_storage(cluster):
)
node1.query("INSERT INTO hybrid_test VALUES (0,'data'),(1,'data')")
time.sleep(1)
node2.query("SYSTEM SYNC REPLICA hybrid_test")
assert node1.query("SELECT * FROM hybrid_test ORDER BY id FORMAT Values") == "(0,'data'),(1,'data')"
assert node2.query("SELECT * FROM hybrid_test ORDER BY id FORMAT Values") == "(0,'data'),(1,'data')"
@ -120,7 +121,7 @@ def test_s3_zero_copy_on_hybrid_storage(cluster):
assert node2.query("SELECT partition_id,disk_name FROM system.parts WHERE table='hybrid_test' FORMAT Values") == "('all','default')"
# Total objects in S3
s3_objects = get_large_objects_count(cluster, 0)
s3_objects = get_large_objects_count(cluster, size=0)
node2.query("ALTER TABLE hybrid_test MOVE PARTITION ID 'all' TO DISK 's31'")
@ -135,3 +136,115 @@ def test_s3_zero_copy_on_hybrid_storage(cluster):
node1.query("DROP TABLE IF EXISTS hybrid_test NO DELAY")
node2.query("DROP TABLE IF EXISTS hybrid_test NO DELAY")
def insert_data_time(node, table, number_of_mb, time, start=0):
values = ','.join(f"({x},{time})" for x in range(start, int((1024 * 1024 * number_of_mb) / 8) + start + 1))
node.query(f"INSERT INTO {table} VALUES {values}")
def insert_large_data(node, table):
tm = time.mktime((datetime.date.today() - datetime.timedelta(days=7)).timetuple())
insert_data_time(node, table, 1, tm, 0)
tm = time.mktime((datetime.date.today() - datetime.timedelta(days=3)).timetuple())
insert_data_time(node, table, 1, tm, 1024*1024)
tm = time.mktime(datetime.date.today().timetuple())
insert_data_time(node, table, 10, tm, 1024*1024*2)
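# Note on the expected counts asserted below: insert_data_time inserts int(1024 * 1024 * number_of_mb / 8) + 1 rows,
# so insert_large_data produces 131073 + 131073 + 1310721 = 1572867 rows in total (the count checked in the TTL move test);
# only the newest 10 MB batch (1310721 rows) outlives the 2-day delete TTL in the TTL delete test.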
@pytest.mark.parametrize(
("storage_policy", "large_data", "iterations"),
[
("tiered", False, 10),
("tiered_copy", False, 10),
("tiered", True, 3),
("tiered_copy", True, 3),
]
)
def test_s3_zero_copy_with_ttl_move(cluster, storage_policy, large_data, iterations):
node1 = cluster.instances["node1"]
node2 = cluster.instances["node2"]
node1.query("DROP TABLE IF EXISTS ttl_move_test NO DELAY")
node2.query("DROP TABLE IF EXISTS ttl_move_test NO DELAY")
for i in range(iterations):
node1.query(
"""
CREATE TABLE ttl_move_test ON CLUSTER test_cluster (d UInt64, d1 DateTime)
ENGINE=ReplicatedMergeTree('/clickhouse/tables/ttl_move_test', '{}')
ORDER BY d
TTL d1 + INTERVAL 2 DAY TO VOLUME 'external'
SETTINGS storage_policy='{}'
"""
.format('{replica}', storage_policy)
)
if large_data:
insert_large_data(node1, 'ttl_move_test')
else:
node1.query("INSERT INTO ttl_move_test VALUES (10, now() - INTERVAL 3 DAY)")
node1.query("INSERT INTO ttl_move_test VALUES (11, now() - INTERVAL 1 DAY)")
node1.query("OPTIMIZE TABLE ttl_move_test FINAL")
node2.query("SYSTEM SYNC REPLICA ttl_move_test")
if large_data:
assert node1.query("SELECT count() FROM ttl_move_test FORMAT Values") == "(1572867)"
assert node2.query("SELECT count() FROM ttl_move_test FORMAT Values") == "(1572867)"
else:
assert node1.query("SELECT count() FROM ttl_move_test FORMAT Values") == "(2)"
assert node2.query("SELECT count() FROM ttl_move_test FORMAT Values") == "(2)"
assert node1.query("SELECT d FROM ttl_move_test ORDER BY d FORMAT Values") == "(10),(11)"
assert node2.query("SELECT d FROM ttl_move_test ORDER BY d FORMAT Values") == "(10),(11)"
node1.query("DROP TABLE IF EXISTS ttl_move_test NO DELAY")
node2.query("DROP TABLE IF EXISTS ttl_move_test NO DELAY")
@pytest.mark.parametrize(
("large_data", "iterations"),
[
(False, 10),
(True, 3),
]
)
def test_s3_zero_copy_with_ttl_delete(cluster, large_data, iterations):
node1 = cluster.instances["node1"]
node2 = cluster.instances["node2"]
node1.query("DROP TABLE IF EXISTS ttl_delete_test NO DELAY")
node2.query("DROP TABLE IF EXISTS ttl_delete_test NO DELAY")
for i in range(iterations):
node1.query(
"""
CREATE TABLE ttl_delete_test ON CLUSTER test_cluster (d UInt64, d1 DateTime)
ENGINE=ReplicatedMergeTree('/clickhouse/tables/ttl_delete_test', '{}')
ORDER BY d
TTL d1 + INTERVAL 2 DAY
SETTINGS storage_policy='tiered'
"""
.format('{replica}')
)
if large_data:
insert_large_data(node1, 'ttl_delete_test')
else:
node1.query("INSERT INTO ttl_delete_test VALUES (10, now() - INTERVAL 3 DAY)")
node1.query("INSERT INTO ttl_delete_test VALUES (11, now() - INTERVAL 1 DAY)")
node1.query("OPTIMIZE TABLE ttl_delete_test FINAL")
node2.query("SYSTEM SYNC REPLICA ttl_delete_test")
if large_data:
assert node1.query("SELECT count() FROM ttl_delete_test FORMAT Values") == "(1310721)"
assert node2.query("SELECT count() FROM ttl_delete_test FORMAT Values") == "(1310721)"
else:
assert node1.query("SELECT count() FROM ttl_delete_test FORMAT Values") == "(1)"
assert node2.query("SELECT count() FROM ttl_delete_test FORMAT Values") == "(1)"
assert node1.query("SELECT d FROM ttl_delete_test ORDER BY d FORMAT Values") == "(11)"
assert node2.query("SELECT d FROM ttl_delete_test ORDER BY d FORMAT Values") == "(11)"
node1.query("DROP TABLE IF EXISTS ttl_delete_test NO DELAY")
node2.query("DROP TABLE IF EXISTS ttl_delete_test NO DELAY")

View File

@ -0,0 +1,28 @@
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', with_zookeeper=True, image='yandex/clickhouse-server', tag='21.2', with_installed_binary=True, stay_alive=True)
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_modulo_partition_key_after_update(start_cluster):
node1.query("CREATE TABLE test (id Int64, v UInt64, value String) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/table1', '1', v) PARTITION BY id % 20 ORDER BY (id, v)")
node1.query("INSERT INTO test SELECT number, number, toString(number) FROM numbers(10)")
expected = node1.query("SELECT number, number, toString(number) FROM numbers(10)")
partition_data = node1.query("SELECT partition, name FROM system.parts WHERE table='test' ORDER BY partition")
assert(expected == node1.query("SELECT * FROM test ORDER BY id"))
node1.restart_with_latest_version(signal=9)
assert(expected == node1.query("SELECT * FROM test ORDER BY id"))
assert(partition_data == node1.query("SELECT partition, name FROM system.parts WHERE table='test' ORDER BY partition"))

View File

@ -1,3 +1,5 @@
SET compile_expressions = 1;
DROP TABLE IF EXISTS foo;
CREATE TABLE foo (
@ -12,7 +14,7 @@ CREATE TABLE foo (
INSERT INTO foo VALUES (1, 0.5, 0.2, 0.3, 0.8);
SELECT divide(sum(a) + sum(b), nullIf(sum(c) + sum(d), 0)) FROM foo SETTINGS compile_expressions = 1;
SELECT divide(sum(a) + sum(b), nullIf(sum(c) + sum(d), 0)) FROM foo SETTINGS compile_expressions = 1;
SELECT divide(sum(a) + sum(b), nullIf(sum(c) + sum(d), 0)) FROM foo;
SELECT divide(sum(a) + sum(b), nullIf(sum(c) + sum(d), 0)) FROM foo;
DROP TABLE foo;

View File

@ -1,9 +0,0 @@
0
0
0
0
0
0
0
0
0

View File

@ -4,11 +4,18 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
opts=(
--max_distributed_connections 9
--max_threads 1
--query "SELECT sleepEachRow(1) FROM remote('127.{2..10}', system.one)"
)
# 5 is less than 9 seconds (9 streams), but long enough to cover possible load peaks
# "$@" left to pass manual options (like --experimental_use_processors 0) during manual testing
timeout 5s ${CLICKHOUSE_CLIENT} "${opts[@]}" "$@"
# Sometimes five seconds is not enough due to system overload.
# But if it can run in less than five seconds at least sometimes, that is enough for the test.
while true
do
opts=(
--max_distributed_connections 9
--max_threads 1
--query "SELECT sleepEachRow(1) FROM remote('127.{2..10}', system.one)"
--format Null
)
# 5 is less than 9 seconds (9 streams), but long enough to cover possible load peaks
# "$@" left to pass manual options (like --experimental_use_processors 0) during manual testing
timeout 5s ${CLICKHOUSE_CLIENT} "${opts[@]}" "$@" && break
done

View File

@ -4,6 +4,12 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
query="SELECT sleepEachRow(1) FROM remote('127.{2,3}', system.one)"
# 1.8 is less than 2 seconds, but long enough to cover possible load peaks
timeout 1.8s ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_distributed_connections=2&max_threads=1" -d "$query"
# Sometimes 1.8 seconds is not enough due to system overload.
# But if it can run in less than 1.8 seconds at least sometimes, that is enough for the test.
while true
do
query="SELECT sleepEachRow(1) FROM remote('127.{2,3}', system.one) FORMAT Null"
# 1.8 is less than 2 seconds, but long enough to cover possible load peaks
timeout 1.8s ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&max_distributed_connections=2&max_threads=1" -d "$query" && break
done

View File

@ -0,0 +1,7 @@
waiting default rmt 0000000002 UPDATE m = m * toInt8(s) WHERE n = 3
1 4 2
2 15 5
3 7 fail
4 11 13
0000000000 UPDATE m = m * toInt8(s) WHERE 1 [] 0 1
0000000001 UPDATE m = m * toInt8(s) WHERE 1 [] 0 1

View File

@ -0,0 +1,34 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# shellcheck source=./mergetree_mutations.lib
. "$CURDIR"/mergetree_mutations.lib
${CLICKHOUSE_CLIENT} -q "drop table if exists rmt;"
${CLICKHOUSE_CLIENT} -q "create table rmt (n int, m int, s String) engine=ReplicatedMergeTree('/test/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/rmt', '1')
order by n settings max_replicated_mutations_in_queue=0;"
${CLICKHOUSE_CLIENT} -q "insert into rmt values (1, 1, '2');" # 0_0_0_0
${CLICKHOUSE_CLIENT} --mutations_sync=0 -q "alter table rmt update m = m*toInt8(s) where 1;" # 0000000000
${CLICKHOUSE_CLIENT} -q "insert into rmt values (2, 3, '5');" # 0_2_2_0
${CLICKHOUSE_CLIENT} --mutations_sync=0 -q "alter table rmt update m = m*toInt8(s) where 1;" # 0000000001
${CLICKHOUSE_CLIENT} -q "insert into rmt values (3, 7, 'fail');" # 0_4_4_0
${CLICKHOUSE_CLIENT} --mutations_sync=0 -q "alter table rmt update m = m*toInt8(s) where n=3;" # 0000000002, will fail to mutate 0_4_4_0 to 0_4_4_0_5
${CLICKHOUSE_CLIENT} -q "insert into rmt values (4, 11, '13');" # 0_6_6_0
${CLICKHOUSE_CLIENT} -q "alter table rmt modify setting max_replicated_mutations_in_queue=1;"
sleep 5 # test does not rely on this, but it may help to reproduce a bug
${CLICKHOUSE_CLIENT} -q "kill mutation where database=currentDatabase() and table='rmt' and mutation_id='0000000002'";
${CLICKHOUSE_CLIENT} -q "system sync replica rmt;"
# now check that mutations 0 and 1 are finished
wait_for_mutation "rmt" "0000000001"
${CLICKHOUSE_CLIENT} -q "select * from rmt order by n;"
${CLICKHOUSE_CLIENT} -q "select mutation_id, command, parts_to_do_names, parts_to_do, is_done from system.mutations where database=currentDatabase() and table='rmt';"
${CLICKHOUSE_CLIENT} -q "drop table rmt;"

View File

@ -1,4 +1,5 @@
42
Hello
waiting default mutation_table mutation_3.txt MODIFY COLUMN `value` UInt64
42
Hello

View File

@ -0,0 +1,2 @@
199
57

View File

@ -0,0 +1,2 @@
SELECT moduloLegacy(199, 200);
SELECT moduloLegacy(-199, 200);

View File

@ -0,0 +1,130 @@
simple partition key:
-61
-60
-59
-58
-57
-5
-4
-3
-2
-1
0
0
1
2
3
4
57
58
59
60
where id % 200 = +-2:
-202
202
where id % 200 > 0:
195
196
197
198
199
201
202
203
204
where id % 200 < 0:
-205
-204
-203
-202
-201
-199
-198
-197
-196
tuple as partition key:
(-1,-1)
(-1,0)
(-2,-2)
(-2,-3)
(-2,59)
(-2,60)
(0,-4)
(0,-5)
(0,-57)
(0,-58)
(0,4)
(0,57)
(0,58)
(1,-61)
(1,0)
(1,1)
(2,-59)
(2,-60)
(2,2)
(2,3)
recursive modulo partition key:
(-1,-1,0)
(-2,-2,-1)
(-3,-3,-2)
(-4,-4,-2)
(-5,-5,-2)
(-57,-7,-28)
(-58,-8,-29)
(-59,-9,-30)
(-60,0,-30)
(-61,-1,-30)
(0,0,0)
(0,0,0)
(1,1,0)
(2,2,1)
(3,3,2)
(4,4,2)
(57,7,28)
(58,8,29)
(59,9,30)
(60,0,30)
After detach:
(-1,-1,0)
(-2,-2,-1)
(-3,-3,-2)
(-4,-4,-2)
(-5,-5,-2)
(-57,-7,-28)
(-58,-8,-29)
(-59,-9,-30)
(-60,0,-30)
(-61,-1,-30)
(0,0,0)
(0,0,0)
(1,1,0)
(2,2,1)
(3,3,2)
(4,4,2)
(57,7,28)
(58,8,29)
(59,9,30)
(60,0,30)
Indexes:
100
comparison:
0 -205 -5 -5
1 -204 -4 -4
2 -203 -3 -3
3 -202 -2 -2
4 -201 -1 -1
5 -200 0 0
6 -199 -199 57
7 -198 -198 58
8 -197 -197 59
9 -196 -196 60
400 195 195 -61
401 196 196 -60
402 197 197 -59
403 198 198 -58
404 199 199 -57
405 200 0 0
406 201 1 1
407 202 2 2
408 203 3 3
409 204 4 4

View File

@ -0,0 +1,50 @@
SELECT 'simple partition key:';
DROP TABLE IF EXISTS table1 SYNC;
CREATE TABLE table1 (id Int64, v UInt64)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/test/tables/table12', '1', v)
PARTITION BY id % 200 ORDER BY id;
INSERT INTO table1 SELECT number-205, number FROM numbers(10);
INSERT INTO table1 SELECT number-205, number FROM numbers(400, 10);
SELECT toInt64(partition) as p FROM system.parts WHERE table='table1' and database=currentDatabase() ORDER BY p;
select 'where id % 200 = +-2:';
select id from table1 where id % 200 = 2 OR id % 200 = -2 order by id;
select 'where id % 200 > 0:';
select id from table1 where id % 200 > 0 order by id;
select 'where id % 200 < 0:';
select id from table1 where id % 200 < 0 order by id;
SELECT 'tuple as partition key:';
DROP TABLE IF EXISTS table2;
CREATE TABLE table2 (id Int64, v UInt64)
ENGINE = MergeTree()
PARTITION BY (toInt32(id / 2) % 3, id % 200) ORDER BY id;
INSERT INTO table2 SELECT number-205, number FROM numbers(10);
INSERT INTO table2 SELECT number-205, number FROM numbers(400, 10);
SELECT partition as p FROM system.parts WHERE table='table2' and database=currentDatabase() ORDER BY p;
SELECT 'recursive modulo partition key:';
DROP TABLE IF EXISTS table3;
CREATE TABLE table3 (id Int64, v UInt64)
ENGINE = MergeTree()
PARTITION BY (id % 200, (id % 200) % 10, toInt32(round((id % 200) / 2, 0))) ORDER BY id;
INSERT INTO table3 SELECT number-205, number FROM numbers(10);
INSERT INTO table3 SELECT number-205, number FROM numbers(400, 10);
SELECT partition as p FROM system.parts WHERE table='table3' and database=currentDatabase() ORDER BY p;
DETACH TABLE table3;
ATTACH TABLE table3;
SELECT 'After detach:';
SELECT partition as p FROM system.parts WHERE table='table3' and database=currentDatabase() ORDER BY p;
SELECT 'Indexes:';
DROP TABLE IF EXISTS table4;
CREATE TABLE table4 (id Int64, v UInt64, s String,
INDEX a (id * 2, s) TYPE minmax GRANULARITY 3
) ENGINE = MergeTree() PARTITION BY id % 10 ORDER BY v;
INSERT INTO table4 SELECT number, number, toString(number) FROM numbers(1000);
SELECT count() FROM table4 WHERE id % 10 = 7;
SELECT 'comparison:';
SELECT v, v-205 as vv, modulo(vv, 200), moduloLegacy(vv, 200) FROM table1 ORDER BY v;

View File

@ -6,9 +6,9 @@ SET compile_expressions=true;
-- CREATE TABLE will use global profile with default min_count_to_compile_expression=3
-- so retry 3 times
CREATE TABLE data_01875_1 Engine=MergeTree ORDER BY number PARTITION BY bitShiftRight(number,8) AS SELECT * FROM numbers(16384);
CREATE TABLE data_01875_2 Engine=MergeTree ORDER BY number PARTITION BY bitShiftRight(number,8) AS SELECT * FROM numbers(16384);
CREATE TABLE data_01875_3 Engine=MergeTree ORDER BY number PARTITION BY bitShiftRight(number,8) AS SELECT * FROM numbers(16384);
CREATE TABLE data_01875_1 Engine=MergeTree ORDER BY number PARTITION BY bitShiftRight(number, 8) + 1 AS SELECT * FROM numbers(16384);
CREATE TABLE data_01875_2 Engine=MergeTree ORDER BY number PARTITION BY bitShiftRight(number, 8) + 1 AS SELECT * FROM numbers(16384);
CREATE TABLE data_01875_3 Engine=MergeTree ORDER BY number PARTITION BY bitShiftRight(number, 8) + 1 AS SELECT * FROM numbers(16384);
SELECT number FROM data_01875_3 WHERE number = 999;

View File

@ -723,6 +723,7 @@
"01850_dist_INSERT_preserve_error", // uses cluster with different static databases shard_0/shard_1
"01821_table_comment",
"01710_projection_fetch",
"01870_modulo_partition_key",
"01870_buffer_flush" // creates database
]
}

View File

@ -4,6 +4,7 @@ v21.4.6.55-stable 2021-04-30
v21.4.5.46-stable 2021-04-24
v21.4.4.30-stable 2021-04-16
v21.4.3.21-stable 2021-04-12
v21.3.12.2-lts 2021-05-25
v21.3.11.5-lts 2021-05-14
v21.3.10.1-lts 2021-05-09
v21.3.9.83-lts 2021-04-28
