Merge branch 'master' into consistent_metadata4

This commit is contained in:
alesapin 2020-06-09 18:59:59 +03:00
commit f7a397242f
25 changed files with 642 additions and 95 deletions

View File

@ -1,6 +1,8 @@
#pragma once
#include <functional>
#include <type_traits>
#include <utility>
template <class T, class Tag>
struct StrongTypedef

View File

@ -60,7 +60,7 @@ Engines in the family:
- [Distributed](special/distributed.md#distributed)
- [MaterializedView](special/materializedview.md#materializedview)
- [Dictionary](special/dictionary.md#dictionary)
- [Merge](special/merge.md#merge
- [Merge](special/merge.md#merge)
- [File](special/file.md#file)
- [Null](special/null.md#null)
- [Set](special/set.md#set)

View File

@ -5,10 +5,13 @@ toc_title: SYSTEM
# SYSTEM Queries {#query-language-system}
- [RELOAD EMBEDDED DICTIONARIES](#query_language-system-reload-emdedded-dictionaries)
- [RELOAD DICTIONARIES](#query_language-system-reload-dictionaries)
- [RELOAD DICTIONARY](#query_language-system-reload-dictionary)
- [DROP DNS CACHE](#query_language-system-drop-dns-cache)
- [DROP MARK CACHE](#query_language-system-drop-mark-cache)
- [DROP UNCOMPRESSED CACHE](#query_language-system-drop-uncompressed-cache)
- [DROP COMPILED EXPRESSION CACHE](#query_language-system-drop-compiled-expression-cache)
- [FLUSH LOGS](#query_language-system-flush_logs)
- [RELOAD CONFIG](#query_language-system-reload-config)
- [SHUTDOWN](#query_language-system-shutdown)
@ -18,7 +21,25 @@ toc_title: SYSTEM
- [START DISTRIBUTED SENDS](#query_language-system-start-distributed-sends)
- [STOP MERGES](#query_language-system-stop-merges)
- [START MERGES](#query_language-system-start-merges)
- [STOP TTL MERGES](#query_language-stop-ttl-merges)
- [START TTL MERGES](#query_language-start-ttl-merges)
- [STOP MOVES](#query_language-stop-moves)
- [START MOVES](#query_language-start-moves)
- [STOP FETCHES](#query_language-system-stop-fetches)
- [START FETCHES](#query_language-system-start-fetches)
- [STOP REPLICATED SENDS](#query_language-system-start-replicated-sends)
- [START REPLICATED SENDS](#query_language-system-start-replicated-sends)
- [STOP REPLICATION QUEUES](#query_language-system-stop-replication-queues)
- [START REPLICATION QUEUES](#query_language-system-start-replication-queues)
- [SYNC REPLICA](#query_language-system-sync-replica)
- [RESTART REPLICA](#query_language-system-restart-replica)
- [RESTART REPLICAS](#query_language-system-restart-replicas)
## RELOAD EMBEDDED DICTIONARIES] {#query_language-system-reload-emdedded-dictionaries}
Reload all [Internal dictionaries](../dictionaries/internal-dicts.md).
By default, internal dictionaries are disabled.
Always returns `Ok.` regardless of the result of the internal dictionary update.
## RELOAD DICTIONARIES {#query_language-system-reload-dictionaries}
Reloads all dictionaries that have been successfully loaded before.
@ -45,6 +66,16 @@ For more convenient (automatic) cache management, see disable\_internal\_dns\_ca
Resets the mark cache. Used in development of ClickHouse and performance tests.
## DROP UNCOMPRESSED CACHE {#query_language-system-drop-uncompressed-cache}
Reset the uncompressed data cache. Used in development of ClickHouse and performance tests.
For manage uncompressed data cache parameters use following server level settings [uncompressed_cache_size](../../operations/server-configuration-parameters/settings.md#server-settings-uncompressed_cache_size) and query/user/profile level settings [use_uncompressed_cache](../../operations/settings/settings.md#setting-use_uncompressed_cache)
## DROP COMPILED EXPRESSION CACHE {#query_language-system-drop-compiled-expression-cache}
Reset the compiled expression cache. Used in development of ClickHouse and performance tests.
Complied expression cache used when query/user/profile enable option [compile](../../operations/settings/settings.md#compile)
## FLUSH LOGS {#query_language-system-flush_logs}
Flushes buffers of log messages to system tables (e.g. system.query\_log). Allows you to not wait 7.5 seconds when debugging.
@ -89,6 +120,10 @@ Enables background data distribution when inserting data into distributed tables
SYSTEM START DISTRIBUTED SENDS [db.]<distributed_table_name>
```
## Managing MergeTree Tables {#query-language-system-mergetree}
ClickHouse can manage background processes in [MergeTree](../../engines/table-engines/mergetree-family/mergetree.md) tables.
### STOP MERGES {#query_language-system-stop-merges}
Provides possibility to stop background merges for tables in the MergeTree family:
@ -108,4 +143,110 @@ Provides possibility to start background merges for tables in the MergeTree fami
SYSTEM START MERGES [[db.]merge_tree_family_table_name]
```
### STOP TTL MERGES {#query_language-stop-ttl-merges}
Provides possibility to stop background delete old data according to [TTL expression](../../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-ttl) for tables in the MergeTree family:
Return `Ok.` even table doesn't exists or table have not MergeTree engine. Return error when database doesn't exists:
``` sql
SYSTEM STOP TTL MERGES [[db.]merge_tree_family_table_name]
```
### START TTL MERGES {#query_language-start-ttl-merges}
Provides possibility to start background delete old data according to [TTL expression](../../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-ttl) for tables in the MergeTree family:
Return `Ok.` even table doesn't exists. Return error when database doesn't exists:
``` sql
SYSTEM START TTL MERGES [[db.]merge_tree_family_table_name]
```
### STOP MOVES {#query_language-stop-moves}
Provides possibility to stop background move data according to [TTL table expression with TO VOLUME or TO DISK clause](../../engines/table-engines/mergetree-family/mergetree.md#mergetree-table-ttl) for tables in the MergeTree family:
Return `Ok.` even table doesn't exists. Return error when database doesn't exists:
``` sql
SYSTEM STOP MOVES [[db.]merge_tree_family_table_name]
```
### START MOVES {#query_language-start-moves}
Provides possibility to start background move data according to [TTL table expression with TO VOLUME and TO DISK clause](../../engines/table-engines/mergetree-family/mergetree.md#mergetree-table-ttl) for tables in the MergeTree family:
Return `Ok.` even table doesn't exists. Return error when database doesn't exists:
``` sql
SYSTEM STOP MOVES [[db.]merge_tree_family_table_name]
```
## Managing ReplicatedMergeTree Tables {#query-language-system-replicated}
ClickHouse can manage background replication related processes in [ReplicatedMergeTree](../../engines/table-engines/mergetree-family/replacingmergetree.md) tables.
### STOP FETCHES {#query_language-system-stop-fetches}
Provides possibility to stop background fetches for inserted parts for tables in the `ReplicatedMergeTree` family:
Always returns `Ok.` regardless of the table engine and even table or database doesn't exists.
``` sql
SYSTEM STOP FETCHES [[db.]replicated_merge_tree_family_table_name]
```
### START FETCHES {#query_language-system-start-fetches}
Provides possibility to start background fetches for inserted parts for tables in the `ReplicatedMergeTree` family:
Always returns `Ok.` regardless of the table engine and even table or database doesn't exists.
``` sql
SYSTEM START FETCHES [[db.]replicated_merge_tree_family_table_name]
```
### STOP REPLICATED SENDS {#query_language-system-start-replicated-sends}
Provides possibility to stop background sends to other replicas in cluster for new inserted parts for tables in the `ReplicatedMergeTree` family:
``` sql
SYSTEM STOP REPLICATED SENDS [[db.]replicated_merge_tree_family_table_name]
```
### START REPLICATED SENDS {#query_language-system-start-replicated-sends}
Provides possibility to start background sends to other replicas in cluster for new inserted parts for tables in the `ReplicatedMergeTree` family:
``` sql
SYSTEM START REPLICATED SENDS [[db.]replicated_merge_tree_family_table_name]
```
### STOP REPLICATION QUEUES {#query_language-system-stop-replication-queues}
Provides possibility to stop background fetch tasks from replication queues which stored in Zookeeper for tables in the `ReplicatedMergeTree` family. Possible background tasks types - merges, fetches, mutation, DDL statements with ON CLUSTER clause:
``` sql
SYSTEM STOP REPLICATION QUEUES [[db.]replicated_merge_tree_family_table_name]
```
### START REPLICATION QUEUES {#query_language-system-start-replication-queues}
Provides possibility to start background fetch tasks from replication queues which stored in Zookeeper for tables in the `ReplicatedMergeTree` family. Possible background tasks types - merges, fetches, mutation, DDL statements with ON CLUSTER clause:
``` sql
SYSTEM START REPLICATION QUEUES [[db.]replicated_merge_tree_family_table_name]
```
### SYNC REPLICA {#query_language-system-sync-replica}
Wait until a `ReplicatedMergeTree` table will be synced with other replicas in a cluster. Will run until `receive_timeout` if fetches currently disabled for the table.
``` sql
SYSTEM SYNC REPLICA [db.]replicated_merge_tree_family_table_name
```
### RESTART REPLICA {#query_language-system-restart-replica}
Provides possibility to reinitialize Zookeeper sessions state for `ReplicatedMergeTree` table, will compare current state with Zookeeper as source of true and add tasks to Zookeeper queue if needed
Initialization replication quene based on ZooKeeper date happens in the same way as `ATTACH TABLE` statement. For a short time the table will be unavailable for any operations.
``` sql
SYSTEM RESTART REPLICA [db.]replicated_merge_tree_family_table_name
```
### RESTART REPLICAS {#query_language-system-restart-replicas}
Provides possibility to reinitialize Zookeeper sessions state for all `ReplicatedMergeTree` tables, will compare current state with Zookeeper as source of true and add tasks to Zookeeper queue if needed
``` sql
SYSTEM RESTART QUEUES [db.]replicated_merge_tree_family_table_name
```
[Original article](https://clickhouse.tech/docs/en/query_language/system/) <!--hide-->

View File

@ -1,9 +1,12 @@
# Запросы SYSTEM {#query-language-system}
- [RELOAD EMBEDDED DICTIONARIES](#query_language-system-reload-emdedded-dictionaries)
- [RELOAD DICTIONARIES](#query_language-system-reload-dictionaries)
- [RELOAD DICTIONARY](#query_language-system-reload-dictionary)
- [DROP DNS CACHE](#query_language-system-drop-dns-cache)
- [DROP MARK CACHE](#query_language-system-drop-mark-cache)
- [DROP UNCOMPRESSED CACHE](#query_language-system-drop-uncompressed-cache)
- [DROP COMPILED EXPRESSION CACHE](#query_language-system-drop-compiled-expression-cache)
- [FLUSH LOGS](#query_language-system-flush_logs)
- [RELOAD CONFIG](#query_language-system-reload-config)
- [SHUTDOWN](#query_language-system-shutdown)
@ -13,7 +16,25 @@
- [START DISTRIBUTED SENDS](#query_language-system-start-distributed-sends)
- [STOP MERGES](#query_language-system-stop-merges)
- [START MERGES](#query_language-system-start-merges)
- [STOP TTL MERGES](#query_language-stop-ttl-merges)
- [START TTL MERGES](#query_language-start-ttl-merges)
- [STOP MOVES](#query_language-stop-moves)
- [START MOVES](#query_language-start-moves)
- [STOP FETCHES](#query_language-system-stop-fetches)
- [START FETCHES](#query_language-system-start-fetches)
- [STOP REPLICATED SENDS](#query_language-system-start-replicated-sends)
- [START REPLICATED SENDS](#query_language-system-start-replicated-sends)
- [STOP REPLICATION QUEUES](#query_language-system-stop-replication-queues)
- [START REPLICATION QUEUES](#query_language-system-start-replication-queues)
- [SYNC REPLICA](#query_language-system-sync-replica)
- [RESTART REPLICA](#query_language-system-restart-replica)
- [RESTART REPLICAS](#query_language-system-restart-replicas)
## RELOAD EMBEDDED DICTIONARIES] {#query_language-system-reload-emdedded-dictionaries}
Перегружет все [Встроенные словари](../dictionaries/internal-dicts.md).
По умолчанию встроенные словари выключены.
Всегда возвращает `Ok.`, вне зависимости от результата обновления встроенных словарей.
## RELOAD DICTIONARIES {#query_language-system-reload-dictionaries}
Перегружает все словари, которые были успешно загружены до этого.
@ -40,6 +61,16 @@ SELECT name, status FROM system.dictionaries;
Сбрасывает кеш «засечек» (`mark cache`). Используется при разработке ClickHouse и тестах производительности.
## DROP UNCOMPRESSED CACHE {#query_language-system-drop-uncompressed-cache}
Сбрасывает кеш не сжатых данных. Используется при разработке ClickHouse и тестах производительности.
Для управления кешем не сжатых данных используйте следующие настройки уровня сервера [uncompressed_cache_size](../../operations/server-configuration-parameters/settings.md#server-settings-uncompressed_cache_size) и настройки уровня запрос/пользователь/профиль [use_uncompressed_cache](../../operations/settings/settings.md#setting-use_uncompressed_cache)
## DROP COMPILED EXPRESSION CACHE {#query_language-system-drop-compiled-expression-cache}
Сбрасывает кеш скомпилированных выражений. Используется при разработке ClickHouse и тестах производительности.
Компилированные выражения используются когда включена настройка уровня запрос/пользователь/профиль [compile](../../operations/settings/settings.md#compile)
## FLUSH LOGS {#query_language-system-flush_logs}
Записывает буферы логов в системные таблицы (например system.query\_log). Позволяет не ждать 7.5 секунд при отладке.
@ -84,6 +115,10 @@ SYSTEM FLUSH DISTRIBUTED [db.]<distributed_table_name>
SYSTEM START DISTRIBUTED SENDS [db.]<distributed_table_name>
```
## Managing MergeTree Tables {#query-language-system-mergetree}
ClickHouse может управлять фоновыми процессами в [MergeTree](../../engines/table-engines/mergetree-family/mergetree.md) таблицах.
### STOP MERGES {#query_language-system-stop-merges}
Позволяет остановить фоновые мержи для таблиц семейства MergeTree:
@ -103,4 +138,110 @@ SYSTEM STOP MERGES [[db.]merge_tree_family_table_name]
SYSTEM START MERGES [[db.]merge_tree_family_table_name]
```
### STOP TTL MERGES {#query_language-stop-ttl-merges}
Позволяет остановить фоновые процессы удаления старых данных основанные на [выражениях TTL](../../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-ttl) для таблиц семейства MergeTree:
Возвращает `Ok.` даже если указана несуществующая таблица или таблица имеет тип отличный от MergeTree. Возвращает ошибку если указана не существующая база данных:
``` sql
SYSTEM STOP TTL MERGES [[db.]merge_tree_family_table_name]
```
### START TTL MERGES {#query_language-start-ttl-merges}
Запускает фоновые процессы удаления старых данных основанные на [выражениях TTL](../../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-ttl) для таблиц семейства MergeTree:
Возвращает `Ok.` даже если указана несуществующая таблица или таблица имеет тип отличный от MergeTree. Возвращает ошибку если указана не существующая база данных:
``` sql
SYSTEM START TTL MERGES [[db.]merge_tree_family_table_name]
```
### STOP MOVES {#query_language-stop-moves}
Позволяет остановить фоновые процессы переноса данных основанные [табличных выражениях TTL с использованием TO VOLUME или TO DISK](../../engines/table-engines/mergetree-family/mergetree.md#mergetree-table-ttl) for tables in the MergeTree family:
Возвращает `Ok.` даже если указана несуществующая таблица или таблица имеет тип отличный от MergeTree. Возвращает ошибку если указана не существующая база данных:
``` sql
SYSTEM STOP MOVES [[db.]merge_tree_family_table_name]
```
### START MOVES {#query_language-start-moves}
Запускает фоновые процессы переноса данных основанные [табличных выражениях TTL с использованием TO VOLUME или TO DISK](../../engines/table-engines/mergetree-family/mergetree.md#mergetree-table-ttl) for tables in the MergeTree family:
Возвращает `Ok.` даже если указана несуществующая таблица или таблица имеет тип отличный от MergeTree. Возвращает ошибку если указана не существующая база данных:
``` sql
SYSTEM STOP MOVES [[db.]merge_tree_family_table_name]
```
## Managing ReplicatedMergeTree Tables {#query-language-system-replicated}
ClickHouse может управлять фоновыми процессами связанными c репликацией в таблицах семейства [ReplicatedMergeTree](../../engines/table-engines/mergetree-family/replacingmergetree.md).
### STOP FETCHES {#query_language-system-stop-fetches}
Позволяет остановить фоновые процессы синхронизации новыми вставленными кусками данных с другими репликами в кластере для таблиц семейства `ReplicatedMergeTree`:
Всегда возвращает `Ok.` вне зависимости от типа таблицы и даже если таблица или база данных не существет.
``` sql
SYSTEM STOP FETCHES [[db.]replicated_merge_tree_family_table_name]
```
### START FETCHES {#query_language-system-start-fetches}
Позволяет запустить фоновые процессы синхронизации новыми вставленными кусками данных с другими репликами в кластере для таблиц семейства `ReplicatedMergeTree`:
Всегда возвращает `Ok.` вне зависимости от типа таблицы и даже если таблица или база данных не существет.
``` sql
SYSTEM START FETCHES [[db.]replicated_merge_tree_family_table_name]
```
### STOP REPLICATED SENDS {#query_language-system-start-replicated-sends}
Позволяет остановить фоновые процессы отсылки новых вставленных кусков данных другим репликам в кластере для таблиц семейства `ReplicatedMergeTree`:
``` sql
SYSTEM STOP REPLICATED SENDS [[db.]replicated_merge_tree_family_table_name]
```
### START REPLICATED SENDS {#query_language-system-start-replicated-sends}
Позволяет запустить фоновые процессы отсылки новых вставленных кусков данных другим репликам в кластере для таблиц семейства `ReplicatedMergeTree`:
``` sql
SYSTEM START REPLICATED SENDS [[db.]replicated_merge_tree_family_table_name]
```
### STOP REPLICATION QUEUES {#query_language-system-stop-replication-queues}
Останавливает фоновые процессы разбора заданий из очереди репликации которая хранится в Zookeeper для таблиц семейства `ReplicatedMergeTree`. Возможные типы заданий - merges, fetches, mutation, DDL запросы с ON CLUSTER:
``` sql
SYSTEM STOP REPLICATION QUEUES [[db.]replicated_merge_tree_family_table_name]
```
### START REPLICATION QUEUES {#query_language-system-start-replication-queues}
Запускает фоновые процессы разбора заданий из очереди репликации которая хранится в Zookeeper для таблиц семейства `ReplicatedMergeTree`. Возможные типы заданий - merges, fetches, mutation, DDL запросы с ON CLUSTER:
``` sql
SYSTEM START REPLICATION QUEUES [[db.]replicated_merge_tree_family_table_name]
```
### SYNC REPLICA {#query_language-system-sync-replica}
Ждет когда таблица семейства `ReplicatedMergeTree` будет синхронизирована с другими репликами в кластере, будет работать до достижения `receive_timeout`, если синхронизация для таблицы отключена в настоящий момент времени:
``` sql
SYSTEM SYNC REPLICA [db.]replicated_merge_tree_family_table_name
```
### RESTART REPLICA {#query_language-system-restart-replica}
Реинициализация состояния Zookeeper сессий для таблицы семейства `ReplicatedMergeTree`, сравнивает текущее состояние с тем что хранится в Zookeeper как источник правды и добавляет задачи Zookeeper очередь если необходимо
Инициализация очереди репликации на основе данных ZooKeeper, происходит так же как при attach table. На короткое время таблица станет недоступной для любых операций.
``` sql
SYSTEM RESTART QUEUES [db.]replicated_merge_tree_family_table_name
```
### RESTART REPLICAS {#query_language-system-restart-replicas}
Реинициализация состояния Zookeeper сессий для всех `ReplicatedMergeTree` таблиц, сравнивает текущее состояние с тем что хранится в Zookeeper как источник правды и добавляет задачи Zookeeper очередь если необходимо
``` sql
SYSTEM RESTART QUEUES [db.]replicated_merge_tree_family_table_name
```
[Оригинальная статья](https://clickhouse.tech/docs/ru/query_language/system/) <!--hide-->

View File

@ -2,7 +2,7 @@ Babel==2.8.0
backports-abc==0.5
backports.functools-lru-cache==1.6.1
beautifulsoup4==4.9.1
certifi==2020.4.5.1
certifi==2020.4.5.2
chardet==3.0.4
click==7.1.2
closure==20191111
@ -13,7 +13,7 @@ idna==2.9
Jinja2==2.11.2
jinja2-highlight==0.6.1
jsmin==2.2.2
livereload==2.6.1
livereload==2.6.2
Markdown==3.2.1
MarkupSafe==1.1.1
mkdocs==1.1.2

View File

@ -1,5 +1,5 @@
Babel==2.8.0
certifi==2020.4.5.1
certifi==2020.4.5.2
chardet==3.0.4
googletrans==2.4.0
idna==2.9

View File

@ -116,8 +116,6 @@ void Suggest::loadImpl(Connection & connection, const ConnectionTimeouts & timeo
<< " UNION ALL "
"SELECT DISTINCT name FROM system.dictionaries LIMIT " << limit_str
<< " UNION ALL "
"SELECT DISTINCT name FROM system.users LIMIT " << limit_str
<< " UNION ALL "
"SELECT DISTINCT name FROM system.columns LIMIT " << limit_str;
}

View File

@ -6,6 +6,7 @@
#include <IO/WriteBufferFromArena.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <Common/FieldVisitors.h>
#include <Common/SipHash.h>
#include <Common/AlignedBuffer.h>
#include <Common/typeid_cast.h>
@ -27,6 +28,51 @@ namespace ErrorCodes
}
static std::string getTypeString(const AggregateFunctionPtr & func)
{
WriteBufferFromOwnString stream;
stream << "AggregateFunction(" << func->getName();
const auto & parameters = func->getParameters();
const auto & argument_types = func->getArgumentTypes();
if (!parameters.empty())
{
stream << '(';
for (size_t i = 0; i < parameters.size(); ++i)
{
if (i)
stream << ", ";
stream << applyVisitor(FieldVisitorToString(), parameters[i]);
}
stream << ')';
}
for (const auto & argument_type : argument_types)
stream << ", " << argument_type->getName();
stream << ')';
return stream.str();
}
ColumnAggregateFunction::ColumnAggregateFunction(const AggregateFunctionPtr & func_)
: func(func_), type_string(getTypeString(func))
{
}
ColumnAggregateFunction::ColumnAggregateFunction(const AggregateFunctionPtr & func_, const ConstArenas & arenas_)
: foreign_arenas(arenas_), func(func_), type_string(getTypeString(func))
{
}
void ColumnAggregateFunction::set(const AggregateFunctionPtr & func_)
{
func = func_;
type_string = getTypeString(func);
}
ColumnAggregateFunction::~ColumnAggregateFunction()
{
if (!func->hasTrivialDestructor() && !src)
@ -336,15 +382,10 @@ MutableColumnPtr ColumnAggregateFunction::cloneEmpty() const
return create(func);
}
String ColumnAggregateFunction::getTypeString() const
{
return DataTypeAggregateFunction(func, func->getArgumentTypes(), func->getParameters()).getName();
}
Field ColumnAggregateFunction::operator[](size_t n) const
{
Field field = AggregateFunctionStateData();
field.get<AggregateFunctionStateData &>().name = getTypeString();
field.get<AggregateFunctionStateData &>().name = type_string;
{
WriteBufferFromString buffer(field.get<AggregateFunctionStateData &>().data);
func->serialize(data[n], buffer);
@ -355,7 +396,7 @@ Field ColumnAggregateFunction::operator[](size_t n) const
void ColumnAggregateFunction::get(size_t n, Field & res) const
{
res = AggregateFunctionStateData();
res.get<AggregateFunctionStateData &>().name = getTypeString();
res.get<AggregateFunctionStateData &>().name = type_string;
{
WriteBufferFromString buffer(res.get<AggregateFunctionStateData &>().data);
func->serialize(data[n], buffer);
@ -425,8 +466,6 @@ static void pushBackAndCreateState(ColumnAggregateFunction::Container & data, Ar
void ColumnAggregateFunction::insert(const Field & x)
{
String type_string = getTypeString();
if (x.getType() != Field::Types::AggregateFunctionState)
throw Exception(String("Inserting field of type ") + x.getTypeName() + " into ColumnAggregateFunction. "
"Expected " + Field::Types::toString(Field::Types::AggregateFunctionState), ErrorCodes::LOGICAL_ERROR);
@ -564,7 +603,7 @@ void ColumnAggregateFunction::getExtremes(Field & min, Field & max) const
AggregateDataPtr place = place_buffer.data();
AggregateFunctionStateData serialized;
serialized.name = getTypeString();
serialized.name = type_string;
func->create(place);
try

View File

@ -74,6 +74,9 @@ private:
/// Array of pointers to aggregation states, that are placed in arenas.
Container data;
/// Name of the type to distinguish different aggregation states.
String type_string;
ColumnAggregateFunction() {}
/// Create a new column that has another column as a source.
@ -84,29 +87,17 @@ private:
/// but ownership of different elements cannot be mixed by different columns.
void ensureOwnership();
ColumnAggregateFunction(const AggregateFunctionPtr & func_)
: func(func_)
{
}
ColumnAggregateFunction(const AggregateFunctionPtr & func_);
ColumnAggregateFunction(const AggregateFunctionPtr & func_,
const ConstArenas & arenas_)
: foreign_arenas(arenas_), func(func_)
{
}
const ConstArenas & arenas_);
ColumnAggregateFunction(const ColumnAggregateFunction & src_);
String getTypeString() const;
public:
~ColumnAggregateFunction() override;
void set(const AggregateFunctionPtr & func_)
{
func = func_;
}
void set(const AggregateFunctionPtr & func_);
AggregateFunctionPtr getAggregateFunction() { return func; }
AggregateFunctionPtr getAggregateFunction() const { return func; }

View File

@ -14,6 +14,8 @@
#include <Formats/ProtobufWriter.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <DataTypes/DataTypeFactory.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <Parsers/ASTFunction.h>
@ -36,25 +38,25 @@ namespace ErrorCodes
std::string DataTypeAggregateFunction::doGetName() const
{
std::stringstream stream;
WriteBufferFromOwnString stream;
stream << "AggregateFunction(" << function->getName();
if (!parameters.empty())
{
stream << "(";
stream << '(';
for (size_t i = 0; i < parameters.size(); ++i)
{
if (i)
stream << ", ";
stream << applyVisitor(DB::FieldVisitorToString(), parameters[i]);
}
stream << ")";
stream << ')';
}
for (const auto & argument_type : argument_types)
stream << ", " << argument_type->getName();
stream << ")";
stream << ')';
return stream.str();
}

View File

@ -999,6 +999,73 @@ void Aggregator::convertToBlockImpl(
data.clearAndShrink();
}
template <typename Mapped>
inline void Aggregator::insertAggregatesIntoColumns(
Mapped & mapped,
MutableColumns & final_aggregate_columns) const
{
/** Final values of aggregate functions are inserted to columns.
* Then states of aggregate functions, that are not longer needed, are destroyed.
*
* We mark already destroyed states with "nullptr" in data,
* so they will not be destroyed in destructor of Aggregator
* (other values will be destroyed in destructor in case of exception).
*
* But it becomes tricky, because we have multiple aggregate states pointed by a single pointer in data.
* So, if exception is thrown in the middle of moving states for different aggregate functions,
* we have to catch exceptions and destroy all the states that are no longer needed,
* to keep the data in consistent state.
*
* It is also tricky, because there are aggregate functions with "-State" modifier.
* When we call "insertResultInto" for them, they insert a pointer to the state to ColumnAggregateFunction
* and ColumnAggregateFunction will take ownership of this state.
* So, for aggregate functions with "-State" modifier, the state must not be destroyed
* after it has been transferred to ColumnAggregateFunction.
* But we should mark that the data no longer owns these states.
*/
size_t insert_i = 0;
std::exception_ptr exception;
try
{
/// Insert final values of aggregate functions into columns.
for (; insert_i < params.aggregates_size; ++insert_i)
aggregate_functions[insert_i]->insertResultInto(
mapped + offsets_of_aggregate_states[insert_i],
*final_aggregate_columns[insert_i]);
}
catch (...)
{
exception = std::current_exception();
}
/** Destroy states that are no longer needed. This loop does not throw.
*
* Don't destroy states for "-State" aggregate functions,
* because the ownership of this state is transferred to ColumnAggregateFunction
* and ColumnAggregateFunction will take care.
*
* But it's only for states that has been transferred to ColumnAggregateFunction
* before exception has been thrown;
*/
for (size_t destroy_i = 0; destroy_i < params.aggregates_size; ++destroy_i)
{
/// If ownership was not transferred to ColumnAggregateFunction.
if (!(destroy_i < insert_i && aggregate_functions[destroy_i]->isState()))
aggregate_functions[destroy_i]->destroy(
mapped + offsets_of_aggregate_states[destroy_i]);
}
/// Mark the cell as destroyed so it will not be destroyed in destructor.
mapped = nullptr;
if (exception)
std::rethrow_exception(exception);
}
template <typename Method, typename Table>
void NO_INLINE Aggregator::convertToBlockImplFinal(
Method & method,
@ -1011,25 +1078,15 @@ void NO_INLINE Aggregator::convertToBlockImplFinal(
if (data.hasNullKeyData())
{
key_columns[0]->insertDefault();
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->insertResultInto(
data.getNullKeyData() + offsets_of_aggregate_states[i],
*final_aggregate_columns[i]);
insertAggregatesIntoColumns(data.getNullKeyData(), final_aggregate_columns);
}
}
data.forEachValue([&](const auto & key, auto & mapped)
{
method.insertKeyIntoColumns(key, key_columns, key_sizes);
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->insertResultInto(
mapped + offsets_of_aggregate_states[i],
*final_aggregate_columns[i]);
insertAggregatesIntoColumns(mapped, final_aggregate_columns);
});
destroyImpl<Method>(data);
}
template <typename Method, typename Table>
@ -1047,6 +1104,8 @@ void NO_INLINE Aggregator::convertToBlockImplNotFinal(
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_columns[i]->push_back(data.getNullKeyData() + offsets_of_aggregate_states[i]);
data.getNullKeyData() = nullptr;
}
}
@ -1187,16 +1246,16 @@ Block Aggregator::prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_va
{
AggregatedDataWithoutKey & data = data_variants.without_key;
for (size_t i = 0; i < params.aggregates_size; ++i)
{
if (!final_)
aggregate_columns[i]->push_back(data + offsets_of_aggregate_states[i]);
else
aggregate_functions[i]->insertResultInto(data + offsets_of_aggregate_states[i], *final_aggregate_columns[i]);
}
if (!final_)
{
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_columns[i]->push_back(data + offsets_of_aggregate_states[i]);
data = nullptr;
}
else
{
insertAggregatesIntoColumns(data, final_aggregate_columns);
}
if (params.overflow_row)
for (size_t i = 0; i < params.keys_size; ++i)
@ -2387,8 +2446,7 @@ void NO_INLINE Aggregator::destroyImpl(Table & table) const
return;
for (size_t i = 0; i < params.aggregates_size; ++i)
if (!aggregate_functions[i]->isState())
aggregate_functions[i]->destroy(data + offsets_of_aggregate_states[i]);
aggregate_functions[i]->destroy(data + offsets_of_aggregate_states[i]);
data = nullptr;
});
@ -2402,8 +2460,7 @@ void Aggregator::destroyWithoutKey(AggregatedDataVariants & result) const
if (nullptr != res_data)
{
for (size_t i = 0; i < params.aggregates_size; ++i)
if (!aggregate_functions[i]->isState())
aggregate_functions[i]->destroy(res_data + offsets_of_aggregate_states[i]);
aggregate_functions[i]->destroy(res_data + offsets_of_aggregate_states[i]);
res_data = nullptr;
}

View File

@ -1166,6 +1166,11 @@ protected:
MutableColumns & final_aggregate_columns,
bool final) const;
template <typename Mapped>
void insertAggregatesIntoColumns(
Mapped & mapped,
MutableColumns & final_aggregate_columns) const;
template <typename Method, typename Table>
void convertToBlockImplFinal(
Method & method,

View File

@ -1776,7 +1776,7 @@ void MergeTreeData::removePartsFromWorkingSet(const MergeTreeData::DataPartsVect
part->remove_time.store(remove_time, std::memory_order_relaxed);
if (part->state != IMergeTreeDataPart::State::Outdated)
modifyPartState(part,IMergeTreeDataPart::State::Outdated);
modifyPartState(part, IMergeTreeDataPart::State::Outdated);
}
}

View File

@ -141,16 +141,6 @@ void StorageMergeTree::shutdown()
mutation_wait_event.notify_all();
}
try
{
clearOldPartsFromFilesystem(true);
}
catch (...)
{
/// Example: the case of readonly filesystem, we have failure removing old parts.
/// Should not prevent table shutdown.
tryLogCurrentException(log);
}
merger_mutator.merges_blocker.cancelForever();
parts_mover.moves_blocker.cancelForever();
@ -160,6 +150,23 @@ void StorageMergeTree::shutdown()
if (moving_task_handle)
global_context.getBackgroundMovePool().removeTask(moving_task_handle);
try
{
/// We clear all old parts after stopping all background operations.
/// It's important, because background operations can produce temporary
/// parts which will remove themselves in their descrutors. If so, we
/// may have race condition between our remove call and background
/// process.
clearOldPartsFromFilesystem(true);
}
catch (...)
{
/// Example: the case of readonly filesystem, we have failure removing old parts.
/// Should not prevent table shutdown.
tryLogCurrentException(log);
}
}

View File

@ -2964,7 +2964,6 @@ void StorageReplicatedMergeTree::startup()
void StorageReplicatedMergeTree::shutdown()
{
clearOldPartsFromFilesystem(true);
/// Cancel fetches, merges and mutations to force the queue_task to finish ASAP.
fetcher.blocker.cancelForever();
merger_mutator.merges_blocker.cancelForever();
@ -3000,6 +2999,12 @@ void StorageReplicatedMergeTree::shutdown()
std::unique_lock lock(data_parts_exchange_endpoint->rwlock);
}
data_parts_exchange_endpoint.reset();
/// We clear all old parts after stopping all background operations. It's
/// important, because background operations can produce temporary parts
/// which will remove themselves in their descrutors. If so, we may have
/// race condition between our remove call and background process.
clearOldPartsFromFilesystem(true);
}

View File

@ -1,5 +1,6 @@
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/Context.h>
#include <TableFunctions/parseColumnsListForTableFunction.h>
@ -11,27 +12,20 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int SYNTAX_ERROR;
}
ColumnsDescription parseColumnsListFromString(const std::string & structure, const Context & context)
{
Expected expected;
Tokens tokens(structure.c_str(), structure.c_str() + structure.size());
IParser::Pos token_iterator(tokens, context.getSettingsRef().max_parser_depth);
ParserColumnDeclarationList parser;
ASTPtr columns_list_raw;
const Settings & settings = context.getSettingsRef();
if (!parser.parse(token_iterator, columns_list_raw, expected))
throw Exception("Cannot parse columns declaration list.", ErrorCodes::SYNTAX_ERROR);
ASTPtr columns_list_raw = parseQuery(parser, structure, "columns declaration list", settings.max_query_size, settings.max_parser_depth);
auto * columns_list = dynamic_cast<ASTExpressionList *>(columns_list_raw.get());
if (!columns_list)
throw Exception("Could not cast AST to ASTExpressionList", ErrorCodes::LOGICAL_ERROR);
return InterpreterCreateQuery::getColumnsDescription(*columns_list, context, !context.getSettingsRef().allow_suspicious_codecs);
return InterpreterCreateQuery::getColumnsDescription(*columns_list, context, !settings.allow_suspicious_codecs);
}
}

View File

@ -61,7 +61,7 @@ def get_used_disks_for_table(node, table_name, partition=None):
def check_used_disks_with_retry(node, table_name, expected_disks, retries):
for _ in range(retries):
used_disks = get_used_disks_for_table(node, table_name)
if set(used_disks) == expected_disks:
if set(used_disks).issubset(expected_disks):
return True
time.sleep(0.5)
return False
@ -830,7 +830,8 @@ def test_concurrent_alter_with_ttl_move(started_cluster, name, engine):
def optimize_table(num):
for i in range(num):
try: # optimize may throw after concurrent alter
node1.query("OPTIMIZE TABLE {} FINAL".format(name))
node1.query("OPTIMIZE TABLE {} FINAL".format(name), settings={'optimize_throw_if_noop': '1'})
break
except:
pass
@ -903,3 +904,93 @@ def test_double_move_while_select(started_cluster, name, positive):
finally:
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
@pytest.mark.parametrize("name,engine,positive", [
("mt_test_alter_with_merge_do_not_work","MergeTree()",0),
("replicated_mt_test_alter_with_merge_do_not_work","ReplicatedMergeTree('/clickhouse/replicated_test_alter_with_merge_do_not_work', '1')",0),
("mt_test_alter_with_merge_work","MergeTree()",1),
("replicated_mt_test_alter_with_merge_work","ReplicatedMergeTree('/clickhouse/replicated_test_alter_with_merge_work', '1')",1),
])
def test_alter_with_merge_work(started_cluster, name, engine, positive):
"""Copyright 2019, Altinity LTD
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License."""
"""Check that TTL expressions are re-evaluated for
existing parts after ALTER command changes TTL expressions
and parts are merged.
"""
try:
node1.query("""
CREATE TABLE {name} (
s1 String,
d1 DateTime
) ENGINE = {engine}
ORDER BY tuple()
TTL d1 + INTERVAL 3000 SECOND TO DISK 'jbod2',
d1 + INTERVAL 6000 SECOND TO VOLUME 'external'
SETTINGS storage_policy='jbods_with_external', merge_with_ttl_timeout=0
""".format(name=name, engine=engine))
def optimize_table(num):
for i in range(num):
try: # optimize may throw after concurrent alter
node1.query("OPTIMIZE TABLE {} FINAL".format(name), settings={'optimize_throw_if_noop': '1'})
break
except:
pass
for p in range(3):
data = [] # 6MB in total
now = time.time()
for i in range(2):
s1 = get_random_string(1024 * 1024) # 1MB
d1 = now - 1 if positive else now + 300
data.append("('{}', toDateTime({}))".format(s1, d1))
values = ",".join(data)
node1.query("INSERT INTO {name} (s1, d1) VALUES {values}".format(name=name, values=values))
used_disks = get_used_disks_for_table(node1, name)
assert set(used_disks) == {"jbod1", "jbod2"}
node1.query("SELECT count() FROM {name}".format(name=name)).splitlines() == ["6"]
node1.query("""
ALTER TABLE {name} MODIFY
TTL d1 + INTERVAL 0 SECOND TO DISK 'jbod2',
d1 + INTERVAL 5 SECOND TO VOLUME 'external',
d1 + INTERVAL 10 SECOND DELETE
""".format(name=name))
optimize_table(20)
assert node1.query("SELECT count() FROM system.parts WHERE table = '{name}' AND active = 1".format(name=name)) == "1\n"
time.sleep(5)
optimize_table(20)
if positive:
assert check_used_disks_with_retry(node1, name, set(["external"]), 50)
else:
assert check_used_disks_with_retry(node1, name, set(["jbod1", "jbod2"]), 50)
time.sleep(5)
optimize_table(20)
if positive:
assert node1.query("SELECT count() FROM {name}".format(name=name)) == "0\n"
else:
assert node1.query("SELECT count() FROM {name}".format(name=name)) == "6\n"
finally:
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))

View File

@ -0,0 +1,30 @@
<test>
<create_query>
CREATE TABLE test(
t UInt64,
q1 AggregateFunction(quantilesTiming(0.50, 0.75, 0.90, 0.99), Float64),
q2 AggregateFunction(quantilesTiming(0.50, 0.75, 0.90, 0.99), Float64),
q3 AggregateFunction(quantilesTiming(0.50, 0.75, 0.90, 0.99), Float64),
q4 AggregateFunction(quantilesTiming(0.50, 0.75, 0.90, 0.99), Float64),
q5 AggregateFunction(quantilesTiming(0.50, 0.75, 0.90, 0.99), Float64)
) ENGINE=SummingMergeTree()
ORDER BY t
</create_query>
<create_query>
INSERT INTO test
SELECT
number / 10 as t,
quantilesTimingState(0.50, 0.75, 0.90, 0.99)(number/1000) as q1,
quantilesTimingState(0.50, 0.75, 0.90, 0.99)(number/1000) as q2,
quantilesTimingState(0.50, 0.75, 0.90, 0.99)(number/1000) as q3,
quantilesTimingState(0.50, 0.75, 0.90, 0.99)(number/1000) as q4,
quantilesTimingState(0.50, 0.75, 0.90, 0.99)(number/1000) as q5
FROM numbers(1000 * 1000)
GROUP BY t
</create_query>
<query>OPTIMIZE TABLE test FINAL</query>
<drop_query>DROP TABLE test</drop_query>
</test>

View File

@ -100,8 +100,14 @@ wait
echo "Finishing alters"
# This alter will finish all previous, but replica 1 maybe still not up-to-date
while [[ $(timeout 30 $CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_alter_mt_1 MODIFY COLUMN value1 String SETTINGS replication_alter_partitions_sync=2" 2>&1) ]]; do
# This alter will finish all previous, but replica 1 maybe still not up-to-date.
# If query will throw something, than we will sleep 1 and retry. If timeout
# happened we will silently go out of loop and probably fail tests in the
# following for loop.
#
# 120 seconds is more than enough, but in rare cases for slow builds (debug,
# thread) it maybe necessary.
while [[ $(timeout 120 $CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_alter_mt_1 MODIFY COLUMN value1 String SETTINGS replication_alter_partitions_sync=2" 2>&1) ]]; do
sleep 1
done

View File

@ -33,11 +33,11 @@ LIMIT 10;
SELECT '-';
SELECT
toTypeName(i)s
FROM generateRandom('i Nullable(Enum16(\'h\' = 1, \'w\' = 5 , \'o\' = -200)))')
FROM generateRandom('i Nullable(Enum16(\'h\' = 1, \'w\' = 5 , \'o\' = -200))')
LIMIT 1;
SELECT
i
FROM generateRandom('i Nullable(Enum16(\'h\' = 1, \'w\' = 5 , \'o\' = -200)))', 1, 10, 10)
FROM generateRandom('i Nullable(Enum16(\'h\' = 1, \'w\' = 5 , \'o\' = -200))', 1, 10, 10)
LIMIT 10;
SELECT '-';
SELECT

View File

@ -17,14 +17,16 @@ function read_numbers_func()
function show_processes_func()
{
sleep 0.1;
# These two system metrics for the generating query above are guaranteed to be nonzero when ProcFS is mounted at /proc
$CLICKHOUSE_CLIENT -q "
SELECT count() > 0 FROM system.processes\
WHERE has(ProfileEvents.Names, 'OSCPUVirtualTimeMicroseconds') AND has(ProfileEvents.Names, 'OSReadChars')\
SETTINGS max_threads = 1
";
while true; do
sleep 0.1;
# These two system metrics for the generating query above are guaranteed to be nonzero when ProcFS is mounted at /proc
$CLICKHOUSE_CLIENT -q "
SELECT count() > 0 FROM system.processes\
WHERE has(ProfileEvents.Names, 'OSCPUVirtualTimeMicroseconds') AND has(ProfileEvents.Names, 'OSReadChars')\
SETTINGS max_threads = 1
" | grep '1' && break;
done
}

View File

@ -0,0 +1,2 @@
Memory limit (for query) exceeded
Ok

View File

@ -0,0 +1,17 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
function test()
{
for i in {1..1000}; do
$CLICKHOUSE_CLIENT --max_memory_usage 1G <<< "SELECT uniqExactState(number) FROM system.numbers_mt GROUP BY number % 10";
done
}
export -f test;
# If the memory leak exists, it will lead to OOM fairly quickly.
timeout 30 bash -c test 2>&1 | grep -o -F 'Memory limit (for query) exceeded' | uniq
echo 'Ok'

View File

@ -0,0 +1,16 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
function test()
{
for i in {1..250}; do
$CLICKHOUSE_CLIENT --query "SELECT groupArrayIfState(('Hello, world' AS s) || s || s || s || s || s || s || s || s || s, NOT throwIf(number > 10000000, 'Ok')) FROM system.numbers_mt GROUP BY number % 10";
done
}
export -f test;
# If the memory leak exists, it will lead to OOM fairly quickly.
timeout 30 bash -c test 2>&1 | grep -o -F 'Ok' | uniq