Merge branch 'master' into jit-compilation-migration-to-llvm-15

Maksim Kita 2022-10-07 18:45:53 +03:00 committed by GitHub
commit 930bda9c72
39 changed files with 171 additions and 600 deletions

View File

@ -16,5 +16,5 @@ ClickHouse® is an open-source column-oriented database management system that a
* [Contacts](https://clickhouse.com/company/contact) can help to get your questions answered if there are any.
## Upcoming events
* [**v22.9 Release Webinar**](https://clickhouse.com/company/events/v22-9-release-webinar) Original creator, co-founder, and CTO of ClickHouse Alexey Milovidov will walk us through the highlights of the release, provide live demos, and share a vision of what is coming in the roadmap.
* [**ClickHouse for Analytics @ Barracuda Networks**](https://www.meetup.com/clickhouse-silicon-valley-meetup-group/events/288140358/) Join us for this in-person meetup hosted by our friends at Barracuda in the Bay Area.
* [**v22.10 Release Webinar**](https://clickhouse.com/company/events/v22-10-release-webinar) Original creator, co-founder, and CTO of ClickHouse Alexey Milovidov will walk us through the highlights of the release, provide live demos, and share a vision of what is coming in the roadmap.
* [**Introducing ClickHouse Cloud**](https://clickhouse.com/company/events/cloud-beta) Introducing ClickHouse as a service, built by the creators and maintainers of the fastest OLAP database on earth. Join Tanya Bragin for a detailed walkthrough of ClickHouse Cloud's capabilities, as well as a peek behind the curtain to understand the unique architecture that makes our service tick.

View File

@ -1599,7 +1599,7 @@ Right now it requires `optimize_skip_unused_shards` (the reason behind this is t
## optimize_throw_if_noop {#setting-optimize_throw_if_noop}
Enables or disables throwing an exception if an [OPTIMIZE](../../sql-reference/statements/misc.md#misc_operations-optimize) query didn't perform a merge.
Enables or disables throwing an exception if an [OPTIMIZE](../../sql-reference/statements/optimize.md) query didn't perform a merge.
By default, `OPTIMIZE` returns successfully even if it didn't do anything. This setting lets you differentiate these situations and get the reason in an exception message.
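A minimal sketch of the setting in action; `hits_v1` is only an illustrative table name, and the exact error code may differ:

``` sql
-- Illustrative only: hits_v1 stands for any MergeTree table.
SET optimize_throw_if_noop = 1;

-- If there is nothing to merge, this now raises an exception with the reason
-- (e.g. CANNOT_ASSIGN_OPTIMIZE) instead of silently returning OK.
OPTIMIZE TABLE hits_v1;
```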

View File

@ -5,7 +5,7 @@ slug: /en/operations/system-tables/columns
Contains information about columns in all the tables.
You can use this table to get information similar to the [DESCRIBE TABLE](../../sql-reference/statements/misc.md#misc-describe-table) query, but for multiple tables at once.
You can use this table to get information similar to the [DESCRIBE TABLE](../../sql-reference/statements/describe-table.md) query, but for multiple tables at once.
Columns from [temporary tables](../../sql-reference/statements/create/table.md#temporary-tables) are visible in `system.columns` only in the sessions where they have been created. They are shown with an empty `database` field.
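For illustration, a query along these lines lists the columns of every table in one database at once; the database name is a placeholder:

``` sql
-- Hypothetical example: inspect the columns of all tables in one database.
SELECT database, table, name, type, default_expression
FROM system.columns
WHERE database = 'default'
ORDER BY table, position;
```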

View File

@ -127,7 +127,7 @@ Adds a comment to the column. If the `IF EXISTS` clause is specified, the query
Each column can have one comment. If a comment already exists for the column, a new comment overwrites the previous comment.
Comments are stored in the `comment_expression` column returned by the [DESCRIBE TABLE](../../../sql-reference/statements/misc.md#misc-describe-table) query.
Comments are stored in the `comment_expression` column returned by the [DESCRIBE TABLE](../../../sql-reference/statements/describe-table.md) query.
Example:
@ -253,7 +253,7 @@ The `ALTER` query lets you create and delete separate elements (columns) in nest
There is no support for deleting columns in the primary key or the sampling key (columns that are used in the `ENGINE` expression). Changing the type for columns that are included in the primary key is only possible if this change does not cause the data to be modified (for example, you are allowed to add values to an Enum or to change a type from `DateTime` to `UInt32`).
If the `ALTER` query is not sufficient to make the table changes you need, you can create a new table, copy the data to it using the [INSERT SELECT](../../../sql-reference/statements/insert-into.md#insert_query_insert-select) query, then switch the tables using the [RENAME](../../../sql-reference/statements/misc.md#misc_operations-rename) query and delete the old table. You can use the [clickhouse-copier](../../../operations/utilities/clickhouse-copier.md) as an alternative to the `INSERT SELECT` query.
If the `ALTER` query is not sufficient to make the table changes you need, you can create a new table, copy the data to it using the [INSERT SELECT](../../../sql-reference/statements/insert-into.md#insert_query_insert-select) query, then switch the tables using the [RENAME](../../../sql-reference/statements/rename.md#rename-table) query and delete the old table. You can use the [clickhouse-copier](../../../operations/utilities/clickhouse-copier.md) as an alternative to the `INSERT SELECT` query.
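A rough sketch of the rebuild workflow just described; all table and column names are made up for illustration:

``` sql
-- Create a new table with the desired structure.
CREATE TABLE events_new (id UInt64, ts DateTime, payload String)
ENGINE = MergeTree ORDER BY id;

-- Copy the data, swap the tables, and drop the old one.
INSERT INTO events_new SELECT id, ts, payload FROM events;
RENAME TABLE events TO events_old, events_new TO events;
DROP TABLE events_old;
```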
The `ALTER` query blocks all reads and writes for the table. In other words, if a long `SELECT` is running at the time of the `ALTER` query, the `ALTER` query will wait for it to complete. At the same time, all new queries to the same table will wait while this `ALTER` is running.

View File

@ -44,7 +44,7 @@ For `*MergeTree` tables mutations execute by **rewriting whole data parts**. The
Mutations are totally ordered by their creation order and are applied to each part in that order. Mutations are also partially ordered with `INSERT INTO` queries: data that was inserted into the table before the mutation was submitted will be mutated and data that was inserted after that will not be mutated. Note that mutations do not block inserts in any way.
A mutation query returns immediately after the mutation entry is added (to ZooKeeper for replicated tables, to the filesystem for non-replicated tables). The mutation itself executes asynchronously using the system profile settings. To track the progress of mutations you can use the [`system.mutations`](../../../operations/system-tables/mutations.md#system_tables-mutations) table. A mutation that was successfully submitted will continue to execute even if ClickHouse servers are restarted. There is no way to roll back a mutation once it is submitted, but if the mutation is stuck for some reason it can be cancelled with the [`KILL MUTATION`](../../../sql-reference/statements/misc.md#kill-mutation) query.
A mutation query returns immediately after the mutation entry is added (to ZooKeeper for replicated tables, to the filesystem for non-replicated tables). The mutation itself executes asynchronously using the system profile settings. To track the progress of mutations you can use the [`system.mutations`](../../../operations/system-tables/mutations.md#system_tables-mutations) table. A mutation that was successfully submitted will continue to execute even if ClickHouse servers are restarted. There is no way to roll back a mutation once it is submitted, but if the mutation is stuck for some reason it can be cancelled with the [`KILL MUTATION`](../../../sql-reference/statements/kill.md#kill-mutation) query.
Entries for finished mutations are not deleted right away (the number of preserved entries is determined by the `finished_mutations_to_keep` storage engine parameter). Older mutation entries are deleted.
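A brief sketch of submitting, tracking, and cancelling a mutation; the table name and the `mutation_id` value are placeholders:

``` sql
-- Submit a mutation; affected parts are rewritten asynchronously.
ALTER TABLE events DELETE WHERE ts < '2020-01-01';

-- Track its progress.
SELECT mutation_id, command, parts_to_do, is_done, latest_fail_reason
FROM system.mutations
WHERE table = 'events' AND NOT is_done;

-- Cancel a stuck mutation (the mutation_id here is made up).
KILL MUTATION WHERE database = 'default' AND table = 'events' AND mutation_id = 'mutation_3.txt';
```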

View File

@ -319,7 +319,7 @@ You can specify the partition expression in `ALTER ... PARTITION` queries in dif
Usage of quotes when specifying the partition depends on the type of partition expression. For example, for the `String` type, you have to specify its name in quotes (`'`). For the `Date` and `Int*` types no quotes are needed.
All the rules above are also true for the [OPTIMIZE](../../../sql-reference/statements/misc.md#misc_operations-optimize) query. If you need to specify the only partition when optimizing a non-partitioned table, set the expression `PARTITION tuple()`. For example:
All the rules above are also true for the [OPTIMIZE](../../../sql-reference/statements/optimize.md) query. If you need to specify the only partition when optimizing a non-partitioned table, set the expression `PARTITION tuple()`. For example:
``` sql
OPTIMIZE TABLE table_not_partitioned PARTITION tuple() FINAL;

View File

@ -1,7 +1,7 @@
---
slug: /en/sql-reference/statements/check-table
sidebar_position: 41
sidebar_label: CHECK
sidebar_label: CHECK TABLE
title: "CHECK TABLE Statement"
---

View File

@ -1,7 +1,7 @@
---
slug: /en/sql-reference/statements/describe-table
sidebar_position: 42
sidebar_label: DESCRIBE
sidebar_label: DESCRIBE TABLE
title: "DESCRIBE TABLE"
---

View File

@ -221,7 +221,7 @@ By default, a user account or a role has no privileges.
If a user or a role has no privileges, it is displayed as the [NONE](#grant-none) privilege.
Some queries require a particular set of privileges due to their implementation. For example, to execute the [RENAME](../../sql-reference/statements/misc.md#misc_operations-rename) query you need the following privileges: `SELECT`, `CREATE TABLE`, `INSERT`, and `DROP TABLE`.
Some queries require a particular set of privileges due to their implementation. For example, to execute the [RENAME](../../sql-reference/statements/rename.md) query you need the following privileges: `SELECT`, `CREATE TABLE`, `INSERT`, and `DROP TABLE`.
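For illustration, granting exactly that set of privileges to a hypothetical user `john` on a database `db`:

``` sql
-- Hypothetical example: the privileges john needs before running
-- RENAME TABLE db.t1 TO db.t2.
GRANT SELECT, CREATE TABLE, INSERT, DROP TABLE ON db.* TO john;
```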
### SELECT
@ -304,11 +304,11 @@ Examples of how this hierarchy is treated:
- The `MODIFY SETTING` privilege allows modifying table engine settings. It does not affect settings or server configuration parameters.
- The `ATTACH` operation needs the [CREATE](#grant-create) privilege.
- The `DETACH` operation needs the [DROP](#grant-drop) privilege.
- To stop a mutation with the [KILL MUTATION](../../sql-reference/statements/misc.md#kill-mutation) query, you need the privilege to start that mutation. For example, if you want to stop an `ALTER UPDATE` query, you need the `ALTER UPDATE`, `ALTER TABLE`, or `ALTER` privilege.
- To stop a mutation with the [KILL MUTATION](../../sql-reference/statements/kill.md#kill-mutation) query, you need the privilege to start that mutation. For example, if you want to stop an `ALTER UPDATE` query, you need the `ALTER UPDATE`, `ALTER TABLE`, or `ALTER` privilege (see the sketch after this list).
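A sketch under the assumption that a hypothetical user `john` should be able both to start and to stop `ALTER UPDATE` mutations on `db.table`:

``` sql
-- Hypothetical example: the ALTER UPDATE privilege is enough both to submit
-- an UPDATE mutation and to cancel it later with KILL MUTATION.
GRANT ALTER UPDATE ON db.table TO john;
```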
### CREATE
Allows executing [CREATE](../../sql-reference/statements/create/index.md) and [ATTACH](../../sql-reference/statements/misc.md#attach) DDL-queries according to the following hierarchy of privileges:
Allows executing [CREATE](../../sql-reference/statements/create/index.md) and [ATTACH](../../sql-reference/statements/attach.md) DDL-queries according to the following hierarchy of privileges:
- `CREATE`. Level: `GROUP`
- `CREATE DATABASE`. Level: `DATABASE`
@ -323,7 +323,7 @@ Allows executing [CREATE](../../sql-reference/statements/create/index.md) and [A
### DROP
Allows executing [DROP](../../sql-reference/statements/misc.md#drop) and [DETACH](../../sql-reference/statements/misc.md#detach) queries according to the following hierarchy of privileges:
Allows executing [DROP](../../sql-reference/statements/drop.md) and [DETACH](../../sql-reference/statements/detach.md) queries according to the following hierarchy of privileges:
- `DROP`. Level: `GROUP`
- `DROP DATABASE`. Level: `DATABASE`
@ -333,13 +333,13 @@ Allows executing [DROP](../../sql-reference/statements/misc.md#drop) and [DETACH
### TRUNCATE
Allows executing [TRUNCATE](../../sql-reference/statements/misc.md#truncate-statement) queries.
Allows executing [TRUNCATE](../../sql-reference/statements/truncate.md) queries.
Privilege level: `TABLE`.
### OPTIMIZE
Allows executing [OPTIMIZE TABLE](../../sql-reference/statements/misc.md#misc_operations-optimize) queries.
Allows executing [OPTIMIZE TABLE](../../sql-reference/statements/optimize.md) queries.
Privilege level: `TABLE`.
@ -359,7 +359,7 @@ A user has the `SHOW` privilege if it has any other privilege concerning the spe
### KILL QUERY
Allows executing [KILL](../../sql-reference/statements/misc.md#kill-query-statement) queries according to the following hierarchy of privileges:
Allows executing [KILL](../../sql-reference/statements/kill.md#kill-query) queries according to the following hierarchy of privileges:
Privilege level: `GLOBAL`.

View File

@ -488,7 +488,7 @@ FORMAT TSV
max_insert_block_size 1048576 0 "The maximum block size for insertion, if we control the creation of blocks for insertion."
```
Optionally you can [OPTIMIZE](../sql-reference/statements/misc.md#misc_operations-optimize) the tables after import. Tables that are configured with an engine from the MergeTree family always merge data parts in the background to optimize data storage (or at least check whether it makes sense). These queries force the table engine to do the storage optimization right now instead of some time later:
Optionally you can [OPTIMIZE](../sql-reference/statements/optimize.md) the tables after import. Tables that are configured with an engine from the MergeTree family always merge data parts in the background to optimize data storage (or at least check whether it makes sense). These queries force the table engine to do the storage optimization right now instead of some time later:
``` bash
clickhouse-client --query "OPTIMIZE TABLE tutorial.hits_v1 FINAL"

View File

@ -64,7 +64,7 @@ ClickHouse supports access control based on
- [CREATE USER](../sql-reference/statements/create/user.md#create-user-statement)
- [ALTER USER](../sql-reference/statements/alter/user.md)
- [DROP USER](../sql-reference/statements/misc.md#drop-user-statement)
- [DROP USER](../sql-reference/statements/drop.md#drop-user)
- [SHOW CREATE USER](../sql-reference/statements/show.md#show-create-user-statement)
### Applying settings {#access-control-settings-applying}
@ -91,9 +91,9 @@ ClickHouse supports access control based on
- [CREATE ROLE](../sql-reference/statements/create/index.md#create-role-statement)
- [ALTER ROLE](../sql-reference/statements/alter/role.md)
- [DROP ROLE](../sql-reference/statements/misc.md#drop-role-statement)
- [SET ROLE](../sql-reference/statements/misc.md#set-role-statement)
- [SET DEFAULT ROLE](../sql-reference/statements/misc.md#set-default-role-statement)
- [DROP ROLE](../sql-reference/statements/drop.md#drop-role)
- [SET ROLE](../sql-reference/statements/set-role.md)
- [SET DEFAULT ROLE](../sql-reference/statements/set-role.md#set-default-role)
- [SHOW CREATE ROLE](../sql-reference/statements/show.md#show-create-role-statement)
Privileges can be assigned to a role with the [GRANT](../sql-reference/statements/grant.md) query. To revoke privileges from a role, ClickHouse provides the [REVOKE](../sql-reference/statements/revoke.md) query.
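As an illustration of this workflow (the role, user, and database names are placeholders):

``` sql
-- Hypothetical example: create a role, grant it privileges,
-- assign it to a user, then revoke a privilege from the role.
CREATE ROLE accountant;
GRANT SELECT ON db.* TO accountant;
GRANT accountant TO mira;
REVOKE SELECT ON db.* FROM accountant;
```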
@ -106,7 +106,7 @@ ClickHouse supports access control based on
- [CREATE ROW POLICY](../sql-reference/statements/create/index.md#create-row-policy-statement)
- [ALTER ROW POLICY](../sql-reference/statements/alter/row-policy.md)
- [DROP ROW POLICY](../sql-reference/statements/misc.md#drop-row-policy-statement)
- [DROP ROW POLICY](../sql-reference/statements/drop.md#drop-row-policy)
- [SHOW CREATE ROW POLICY](../sql-reference/statements/show.md#show-create-row-policy-statement)
@ -118,7 +118,7 @@ ClickHouse supports access control based on
- [CREATE SETTINGS PROFILE](../sql-reference/statements/create/index.md#create-settings-profile-statement)
- [ALTER SETTINGS PROFILE](../sql-reference/statements/alter/settings-profile.md)
- [DROP SETTINGS PROFILE](../sql-reference/statements/misc.md#drop-settings-profile-statement)
- [DROP SETTINGS PROFILE](../sql-reference/statements/drop.md#drop-settings-profile)
- [SHOW CREATE SETTINGS PROFILE](../sql-reference/statements/show.md#show-create-settings-profile-statement)
@ -132,7 +132,7 @@ ClickHouse supports access control based on
- [CREATE QUOTA](../sql-reference/statements/create/index.md#create-quota-statement)
- [ALTER QUOTA](../sql-reference/statements/alter/quota.md)
- [DROP QUOTA](../sql-reference/statements/misc.md#drop-quota-statement)
- [DROP QUOTA](../sql-reference/statements/drop.md#drop-quota)
- [SHOW CREATE QUOTA](../sql-reference/statements/show.md#show-create-quota-statement)

View File

@ -1986,7 +1986,7 @@ SELECT * FROM test_table
## optimize_throw_if_noop {#setting-optimize_throw_if_noop}
Enables or disables throwing an exception when an [OPTIMIZE](../../sql-reference/statements/misc.md#misc_operations-optimize) query does not perform a merge.
Enables or disables throwing an exception when an [OPTIMIZE](../../sql-reference/statements/optimize.md) query does not perform a merge.
By default, `OPTIMIZE` returns successfully even when it did nothing. This setting lets you distinguish such cases and get the reason in an exception message.

View File

@ -5,7 +5,7 @@ slug: /ru/operations/system-tables/columns
Contains information about the columns of all tables.
You can use this table to get information similar to the [DESCRIBE TABLE](../../sql-reference/statements/misc.md#misc-describe-table) query, but for many tables at once.
You can use this table to get information similar to the [DESCRIBE TABLE](../../sql-reference/statements/describe-table.md) query, but for many tables at once.
Columns of [temporary tables](../../sql-reference/statements/create/table.md#temporary-tables) appear in `system.columns` only in the sessions where those tables were created. The `database` field of such columns is empty.

View File

@ -10,5 +10,4 @@ sidebar_position: 28
- [INSERT INTO](statements/insert-into.md)
- [CREATE](statements/create/index.md)
- [ALTER](statements/alter/index.md#query_language_queries_alter)
- [Other types of queries](statements/misc.md)

View File

@ -128,7 +128,7 @@ COMMENT COLUMN [IF EXISTS] name 'Text comment'
Each column can have only one comment. When the query is executed, an existing comment is replaced with the new one.
Comments can be viewed in the `comment_expression` column returned by the [DESCRIBE TABLE](../misc.md#misc-describe-table) query.
Comments can be viewed in the `comment_expression` column returned by the [DESCRIBE TABLE](../describe-table.md) query.
Example:
@ -254,7 +254,7 @@ SELECT groupArray(x), groupArray(s) FROM tmp;
There is no way to delete columns that are part of the primary key or the sampling key (in general, columns that are used in the `ENGINE` expression). Changing the type of a column that is part of the primary key is only possible if the change does not cause the data to be modified (for example, you are allowed to add a value to an Enum or to change a type from `DateTime` to `UInt32`).
If the `ALTER` query is not sufficient to make the table changes you need, you can create a new table, copy the data to it with the [INSERT SELECT](../insert-into.md#insert_query_insert-select) query, then switch the tables with the [RENAME](../misc.md#misc_operations-rename) query and delete the old table. As an alternative to the `INSERT SELECT` query, you can use the [clickhouse-copier](../../../operations/utilities/clickhouse-copier.md) tool.
If the `ALTER` query is not sufficient to make the table changes you need, you can create a new table, copy the data to it with the [INSERT SELECT](../insert-into.md#insert_query_insert-select) query, then switch the tables with the [RENAME](../rename.md#rename-table) query and delete the old table. As an alternative to the `INSERT SELECT` query, you can use the [clickhouse-copier](../../../operations/utilities/clickhouse-copier.md) tool.
The `ALTER` query blocks all reads and writes to the table. In other words, if a long `SELECT` is running at the time of the `ALTER` query, the `ALTER` query will first wait for it to complete. During that time, all new queries to the same table will wait until this `ALTER` finishes.

View File

@ -1,7 +1,7 @@
---
slug: /ru/sql-reference/statements/check-table
sidebar_position: 41
sidebar_label: CHECK
sidebar_label: CHECK TABLE
---
# CHECK TABLE Statement {#check-table}

View File

@ -17,13 +17,13 @@ CREATE ROLE [IF NOT EXISTS | OR REPLACE] name1 [ON CLUSTER cluster_name1] [, nam
## Managing roles {#managing-roles}
A user can be assigned multiple roles. Users can apply the assigned roles in arbitrary combinations with the [SET ROLE](../misc.md#set-role-statement) statement. The resulting scope of privileges is the combination of all privileges of all applied roles. If a user also has privileges granted directly to their account, those are added to the privileges granted through roles.
A user can be assigned multiple roles. Users can apply the assigned roles in arbitrary combinations with the [SET ROLE](../set-role.md) statement. The resulting scope of privileges is the combination of all privileges of all applied roles. If a user also has privileges granted directly to their account, those are added to the privileges granted through roles.
Default roles are applied when the user logs in. They can be set with the [SET DEFAULT ROLE](../misc.md#set-default-role-statement) or [ALTER USER](../alter/index.md#alter-user-statement) statements.
Default roles are applied when the user logs in. They can be set with the [SET DEFAULT ROLE](../set-role.md#set-default-role) or [ALTER USER](../alter/index.md#alter-user-statement) statements.
To revoke a role, use the [REVOKE](../../../sql-reference/statements/revoke.md) statement.
To delete a role, use the [DROP ROLE](../misc.md#drop-role-statement) statement. A deleted role is automatically revoked from all users it was assigned to.
To delete a role, use the [DROP ROLE](../drop.md#drop-role) statement. A deleted role is automatically revoked from all users it was assigned to.
## Examples {#create-role-examples}

View File

@ -1,7 +1,7 @@
---
slug: /ru/sql-reference/statements/describe-table
sidebar_position: 42
sidebar_label: DESCRIBE
sidebar_label: DESCRIBE TABLE
---
# DESCRIBE TABLE {#misc-describe-table}

View File

@ -221,7 +221,7 @@ GRANT SELECT(x,y) ON db.table TO john WITH GRANT OPTION
If a user or a role has no privileges, it is displayed as the [NONE](#grant-none) privilege.
Some queries require a particular set of privileges due to their implementation. For example, to execute the [RENAME](misc.md#misc_operations-rename) query, you need the following privileges: `SELECT`, `CREATE TABLE`, `INSERT`, and `DROP TABLE`.
Some queries require a particular set of privileges due to their implementation. For example, to execute the [RENAME](rename.md#rename-table) query, you need the following privileges: `SELECT`, `CREATE TABLE`, `INSERT`, and `DROP TABLE`.
### SELECT {#grant-select}
@ -309,7 +309,7 @@ GRANT INSERT(x,y) ON db.table TO john
### CREATE {#grant-create}
Allows executing [CREATE](../../sql-reference/statements/create/index.md) and [ATTACH](misc.md#attach) DDL queries according to the following hierarchy of privileges:
Allows executing [CREATE](../../sql-reference/statements/create/index.md) and [ATTACH](attach.md) DDL queries according to the following hierarchy of privileges:
- `CREATE`. Level: `GROUP`
- `CREATE DATABASE`. Level: `DATABASE`
@ -324,7 +324,7 @@ GRANT INSERT(x,y) ON db.table TO john
### DROP {#grant-drop}
Allows executing [DROP](misc.md#drop) and [DETACH](misc.md#detach-statement) queries according to the following hierarchy of privileges:
Allows executing [DROP](drop.md) and [DETACH](detach.md) queries according to the following hierarchy of privileges:
- `DROP`. Level: `GROUP`
- `DROP DATABASE`. Level: `DATABASE`
@ -340,7 +340,7 @@ GRANT INSERT(x,y) ON db.table TO john
### OPTIMIZE {#grant-optimize}
Allows executing [OPTIMIZE TABLE](misc.md#misc_operations-optimize) queries.
Allows executing [OPTIMIZE TABLE](optimize.md) queries.
Level: `TABLE`.

View File

@ -4,7 +4,6 @@
#include <Common/logger_useful.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
#include <IO/ReadSettings.h>
namespace CurrentMetrics
@ -42,6 +41,7 @@ AsynchronousReadIndirectBufferFromRemoteFS::AsynchronousReadIndirectBufferFromRe
std::shared_ptr<ReadBufferFromRemoteFSGather> impl_,
size_t min_bytes_for_seek_)
: ReadBufferFromFileBase(settings_.remote_fs_buffer_size, nullptr, 0)
, read_settings(settings_)
, reader(reader_)
, priority(settings_.priority)
, impl(impl_)
@ -125,6 +125,7 @@ void AsynchronousReadIndirectBufferFromRemoteFS::prefetch()
return;
/// Prefetch even in case hasPendingData() == true.
chassert(prefetch_buffer.size() == read_settings.remote_fs_buffer_size);
prefetch_future = asyncReadInto(prefetch_buffer.data(), prefetch_buffer.size());
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetches);
}
@ -199,6 +200,7 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::nextImpl()
{
ProfileEvents::increment(ProfileEvents::RemoteFSUnprefetchedReads);
chassert(memory.size() == read_settings.remote_fs_buffer_size);
auto result = asyncReadInto(memory.data(), memory.size()).get();
size = result.size;
auto offset = result.offset;

View File

@ -3,6 +3,7 @@
#include "config.h"
#include <IO/ReadBufferFromFile.h>
#include <IO/AsynchronousReader.h>
#include <IO/ReadSettings.h>
#include <utility>
namespace Poco { class Logger; }
@ -11,7 +12,6 @@ namespace DB
{
class ReadBufferFromRemoteFSGather;
struct ReadSettings;
/**
* Reads data from S3/HDFS/Web using stored paths in metadata.
@ -64,6 +64,8 @@ private:
std::future<IAsynchronousReader::Result> asyncReadInto(char * data, size_t size);
ReadSettings read_settings;
IAsynchronousReader & reader;
Int32 priority;

View File

@ -1,7 +1,6 @@
#include "ReadIndirectBufferFromRemoteFS.h"
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <IO/ReadSettings.h>
namespace DB
@ -17,6 +16,7 @@ ReadIndirectBufferFromRemoteFS::ReadIndirectBufferFromRemoteFS(
std::shared_ptr<ReadBufferFromRemoteFSGather> impl_, const ReadSettings & settings)
: ReadBufferFromFileBase(settings.remote_fs_buffer_size, nullptr, 0)
, impl(impl_)
, read_settings(settings)
{
}
@ -92,24 +92,27 @@ off_t ReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence)
bool ReadIndirectBufferFromRemoteFS::nextImpl()
{
/// Transfer current position and working_buffer to actual ReadBuffer
swap(*impl);
chassert(internal_buffer.size() == read_settings.remote_fs_buffer_size);
chassert(file_offset_of_buffer_end <= impl->getFileSize());
assert(!impl->hasPendingData());
/// Position and working_buffer will be updated in next() call
auto result = impl->next();
/// and assigned to current buffer.
swap(*impl);
auto [size, offset] = impl->readInto(internal_buffer.begin(), internal_buffer.size(), file_offset_of_buffer_end, /* ignore */0);
if (result)
chassert(offset <= size);
chassert(size <= internal_buffer.size());
if (size)
{
file_offset_of_buffer_end += available();
BufferBase::set(working_buffer.begin() + offset(), available(), 0);
file_offset_of_buffer_end = impl->getFileOffsetOfBufferEnd();
working_buffer = Buffer(internal_buffer.begin() + offset, internal_buffer.begin() + size);
}
assert(file_offset_of_buffer_end == impl->file_offset_of_buffer_end);
/// In case of multiple files for the same file in clickhouse (i.e. log family)
/// file_offset_of_buffer_end will not match getImplementationBufferOffset()
/// so we use [impl->getImplementationBufferOffset(), impl->getFileSize()]
chassert(file_offset_of_buffer_end >= impl->getImplementationBufferOffset());
chassert(file_offset_of_buffer_end <= impl->getFileSize());
return result;
return size;
}
}

View File

@ -2,6 +2,7 @@
#include "config.h"
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadSettings.h>
#include <utility>
@ -9,7 +10,6 @@ namespace DB
{
class ReadBufferFromRemoteFSGather;
struct ReadSettings;
/**
* Reads data from S3/HDFS/Web using stored paths in metadata.
@ -40,6 +40,8 @@ private:
std::shared_ptr<ReadBufferFromRemoteFSGather> impl;
ReadSettings read_settings;
size_t file_offset_of_buffer_end = 0;
};

View File

@ -43,7 +43,6 @@ REGISTER_FUNCTION(Hashing)
factory.registerFunction<FunctionWyHash64>();
#if USE_BLAKE3
factory.registerFunction<FunctionBLAKE3>(
{
R"(
@ -53,10 +52,9 @@ The function is rather fast and shows approximately two times faster performance
It returns a BLAKE3 hash as a byte array with type FixedString(32).
)",
Documentation::Examples{
{"hash", "SELECT hex(blake3('ABC'))"}},
{"hash", "SELECT hex(BLAKE3('ABC'))"}},
Documentation::Categories{"Hash"}
},
FunctionFactory::CaseSensitive);
#endif
}
}

View File

@ -61,6 +61,7 @@ namespace ErrorCodes
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int NOT_IMPLEMENTED;
extern const int ILLEGAL_COLUMN;
extern const int SUPPORT_IS_DISABLED;
}
@ -618,13 +619,20 @@ struct ImplXxHash64
static constexpr bool use_int_hash_for_pods = false;
};
#if USE_BLAKE3
struct ImplBLAKE3
{
static constexpr auto name = "blake3";
static constexpr auto name = "BLAKE3";
enum { length = 32 };
#if !USE_BLAKE3
[[noreturn]] static void apply(const char * begin, const size_t size, unsigned char* out_char_data)
{
UNUSED(begin);
UNUSED(size);
UNUSED(out_char_data);
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "BLAKE3 is not available. Rust code or BLAKE3 itself may be disabled.");
}
#else
static void apply(const char * begin, const size_t size, unsigned char* out_char_data)
{
#if defined(MEMORY_SANITIZER)
@ -640,9 +648,8 @@ struct ImplBLAKE3
throw Exception("Function returned error message: " + std::string(err_msg), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
}
#endif
};
#endif
template <typename Impl>
class FunctionStringHashFixedString : public IFunction
@ -1502,9 +1509,5 @@ using FunctionXxHash32 = FunctionAnyHash<ImplXxHash32>;
using FunctionXxHash64 = FunctionAnyHash<ImplXxHash64>;
using FunctionWyHash64 = FunctionAnyHash<ImplWyHash64>;
#if USE_BLAKE3
using FunctionBLAKE3 = FunctionStringHashFixedString<ImplBLAKE3>;
#endif
using FunctionBLAKE3 = FunctionStringHashFixedString<ImplBLAKE3>;
}

View File

@ -331,7 +331,7 @@ BlockIO InterpreterInsertQuery::execute()
if (!query.table_function)
getContext()->checkAccess(AccessType::INSERT, query.table_id, query_sample_block.getNames());
if (query.select && settings.parallel_distributed_insert_select)
if (query.select && table->isRemote() && settings.parallel_distributed_insert_select)
// Distributed INSERT SELECT
distributed_pipeline = table->distributedWrite(query, getContext());

View File

@ -1,5 +1,4 @@
#include "config.h"
#include "Interpreters/Context_fwd.h"
#if USE_HDFS
@ -42,7 +41,7 @@ StorageHDFSCluster::StorageHDFSCluster(
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const String & compression_method_)
: IStorageCluster(table_id_)
: IStorage(table_id_)
, cluster_name(cluster_name_)
, uri(uri_)
, format_name(format_name_)
@ -75,8 +74,13 @@ Pipe StorageHDFSCluster::read(
size_t /*max_block_size*/,
unsigned /*num_streams*/)
{
auto cluster = getCluster(context);
auto extension = getTaskIteratorExtension(query_info.query, context);
auto cluster = context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef());
auto iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(context, uri);
auto callback = std::make_shared<HDFSSource::IteratorWrapper>([iterator]() mutable -> String
{
return iterator->next();
});
/// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)
Block header =
@ -113,7 +117,7 @@ Pipe StorageHDFSCluster::read(
scalars,
Tables(),
processed_stage,
extension);
RemoteQueryExecutor::Extension{.task_iterator = callback});
pipes.emplace_back(std::make_shared<RemoteSource>(remote_query_executor, add_agg_info, false));
}
@ -136,18 +140,6 @@ QueryProcessingStage::Enum StorageHDFSCluster::getQueryProcessingStage(
}
ClusterPtr StorageHDFSCluster::getCluster(ContextPtr context) const
{
return context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef());
}
RemoteQueryExecutor::Extension StorageHDFSCluster::getTaskIteratorExtension(ASTPtr, ContextPtr context) const
{
auto iterator = std::make_shared<HDFSSource::DisclosedGlobIterator>(context, uri);
auto callback = std::make_shared<HDFSSource::IteratorWrapper>([iter = std::move(iterator)]() mutable -> String { return iter->next(); });
return RemoteQueryExecutor::Extension{.task_iterator = std::move(callback)};
}
NamesAndTypesList StorageHDFSCluster::getVirtuals() const
{
return NamesAndTypesList{

View File

@ -9,7 +9,6 @@
#include <Client/Connection.h>
#include <Interpreters/Cluster.h>
#include <Storages/IStorageCluster.h>
#include <Storages/HDFS/StorageHDFS.h>
namespace DB
@ -17,7 +16,7 @@ namespace DB
class Context;
class StorageHDFSCluster : public IStorageCluster
class StorageHDFSCluster : public IStorage
{
public:
StorageHDFSCluster(
@ -40,9 +39,6 @@ public:
NamesAndTypesList getVirtuals() const override;
ClusterPtr getCluster(ContextPtr context) const override;
RemoteQueryExecutor::Extension getTaskIteratorExtension(ASTPtr query, ContextPtr context) const override;
private:
String cluster_name;
String uri;

View File

@ -1,29 +0,0 @@
#pragma once
#include <Storages/IStorage.h>
#include <Interpreters/Cluster.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
namespace DB
{
/**
* Base cluster for Storages used in table functions like s3Cluster and hdfsCluster
* Needed for code simplification around parallel_distributed_insert_select
*/
class IStorageCluster : public IStorage
{
public:
explicit IStorageCluster(const StorageID & table_id_) : IStorage(table_id_) {}
virtual ClusterPtr getCluster(ContextPtr context) const = 0;
/// Query is needed for pruning by virtual columns (_file, _path)
virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(ASTPtr query, ContextPtr context) const = 0;
bool isRemote() const override { return true; }
};
}

View File

@ -59,8 +59,6 @@
#include <TableFunctions/TableFunctionView.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Storages/IStorageCluster.h>
#include <Processors/Executors/PushingPipelineExecutor.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
@ -761,35 +759,55 @@ SinkToStoragePtr StorageDistributed::write(const ASTPtr &, const StorageMetadata
}
std::optional<QueryPipeline> StorageDistributed::distributedWriteBetweenDistributedTables(const StorageDistributed & src_distributed, const ASTInsertQuery & query, ContextPtr local_context) const
std::optional<QueryPipeline> StorageDistributed::distributedWrite(const ASTInsertQuery & query, ContextPtr local_context)
{
const auto & settings = local_context->getSettingsRef();
QueryPipeline pipeline;
const Settings & settings = local_context->getSettingsRef();
if (settings.max_distributed_depth && local_context->getClientInfo().distributed_depth >= settings.max_distributed_depth)
throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH);
std::shared_ptr<StorageDistributed> storage_src;
auto & select = query.select->as<ASTSelectWithUnionQuery &>();
auto new_query = std::dynamic_pointer_cast<ASTInsertQuery>(query.clone());
/// Unwrap view() function.
if (src_distributed.remote_table_function_ptr)
if (select.list_of_selects->children.size() == 1)
{
const TableFunctionPtr src_table_function =
TableFunctionFactory::instance().get(src_distributed.remote_table_function_ptr, local_context);
const TableFunctionView * view_function =
assert_cast<const TableFunctionView *>(src_table_function.get());
new_query->select = view_function->getSelectQuery().clone();
}
else
{
const auto select_with_union_query = std::make_shared<ASTSelectWithUnionQuery>();
select_with_union_query->list_of_selects = std::make_shared<ASTExpressionList>();
if (auto * select_query = select.list_of_selects->children.at(0)->as<ASTSelectQuery>())
{
JoinedTables joined_tables(Context::createCopy(local_context), *select_query);
auto * select = query.select->as<ASTSelectWithUnionQuery &>().list_of_selects->children.at(0)->as<ASTSelectQuery>();
auto new_select_query = std::dynamic_pointer_cast<ASTSelectQuery>(select->clone());
select_with_union_query->list_of_selects->children.push_back(new_select_query);
if (joined_tables.tablesCount() == 1)
{
storage_src = std::dynamic_pointer_cast<StorageDistributed>(joined_tables.getLeftTableStorage());
if (storage_src)
{
/// Unwrap view() function.
if (storage_src->remote_table_function_ptr)
{
const TableFunctionPtr src_table_function =
TableFunctionFactory::instance().get(storage_src->remote_table_function_ptr, local_context);
const TableFunctionView * view_function =
assert_cast<const TableFunctionView *>(src_table_function.get());
new_query->select = view_function->getSelectQuery().clone();
}
else
{
const auto select_with_union_query = std::make_shared<ASTSelectWithUnionQuery>();
select_with_union_query->list_of_selects = std::make_shared<ASTExpressionList>();
new_select_query->replaceDatabaseAndTable(src_distributed.getRemoteDatabaseName(), src_distributed.getRemoteTableName());
auto new_select_query = std::dynamic_pointer_cast<ASTSelectQuery>(select_query->clone());
select_with_union_query->list_of_selects->children.push_back(new_select_query);
new_query->select = select_with_union_query;
new_select_query->replaceDatabaseAndTable(storage_src->getRemoteDatabaseName(), storage_src->getRemoteTableName());
new_query->select = select_with_union_query;
}
}
}
}
}
const Cluster::AddressesWithFailover & src_addresses = src_distributed.getCluster()->getShardsAddresses();
const Cluster::AddressesWithFailover & src_addresses = storage_src ? storage_src->getCluster()->getShardsAddresses() : Cluster::AddressesWithFailover{};
const Cluster::AddressesWithFailover & dst_addresses = getCluster()->getShardsAddresses();
/// Compare addresses instead of cluster name, to handle remote()/cluster().
/// (since for remote()/cluster() the getClusterName() is empty string)
@ -804,7 +822,7 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteBetweenDistribu
LOG_WARNING(log,
"Parallel distributed INSERT SELECT is not possible "
"(source cluster={} ({} addresses), destination cluster={} ({} addresses))",
src_distributed.getClusterName(),
storage_src ? storage_src->getClusterName() : "<not a Distributed table>",
src_addresses.size(),
getClusterName(),
dst_addresses.size());
@ -831,7 +849,6 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteBetweenDistribu
new_query_str = buf.str();
}
QueryPipeline pipeline;
ContextMutablePtr query_context = Context::createCopy(local_context);
++query_context->getClientInfo().distributed_depth;
@ -865,120 +882,6 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteBetweenDistribu
}
std::optional<QueryPipeline> StorageDistributed::distributedWriteFromClusterStorage(const IStorageCluster & src_storage_cluster, const ASTInsertQuery & query, ContextPtr local_context) const
{
const auto & settings = local_context->getSettingsRef();
auto & select = query.select->as<ASTSelectWithUnionQuery &>();
/// Select query is needed for pruning by virtual columns
auto extension = src_storage_cluster.getTaskIteratorExtension(
select.list_of_selects->children.at(0)->as<ASTSelectQuery>()->clone(),
local_context);
auto dst_cluster = getCluster();
auto new_query = std::dynamic_pointer_cast<ASTInsertQuery>(query.clone());
if (settings.parallel_distributed_insert_select == PARALLEL_DISTRIBUTED_INSERT_SELECT_ALL)
{
new_query->table_id = StorageID(getRemoteDatabaseName(), getRemoteTableName());
/// Reset table function for INSERT INTO remote()/cluster()
new_query->table_function.reset();
}
String new_query_str;
{
WriteBufferFromOwnString buf;
IAST::FormatSettings ast_format_settings(buf, /*one_line*/ true);
ast_format_settings.always_quote_identifiers = true;
new_query->IAST::format(ast_format_settings);
new_query_str = buf.str();
}
QueryPipeline pipeline;
ContextMutablePtr query_context = Context::createCopy(local_context);
++query_context->getClientInfo().distributed_depth;
/// Here we take addresses from destination cluster and assume source table exists on these nodes
for (const auto & replicas : getCluster()->getShardsAddresses())
{
/// There will be only one replica, because we consider each replica as a shard
for (const auto & node : replicas)
{
auto connection = std::make_shared<Connection>(
node.host_name, node.port, query_context->getGlobalContext()->getCurrentDatabase(),
node.user, node.password, node.quota_key, node.cluster, node.cluster_secret,
"ParallelInsertSelectInititiator",
node.compression,
node.secure
);
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
connection,
new_query_str,
Block{},
query_context,
/*throttler=*/nullptr,
Scalars{},
Tables{},
QueryProcessingStage::Complete,
extension);
QueryPipeline remote_pipeline(std::make_shared<RemoteSource>(remote_query_executor, false, settings.async_socket_for_remote));
remote_pipeline.complete(std::make_shared<EmptySink>(remote_query_executor->getHeader()));
pipeline.addCompletedPipeline(std::move(remote_pipeline));
}
}
return pipeline;
}
std::optional<QueryPipeline> StorageDistributed::distributedWrite(const ASTInsertQuery & query, ContextPtr local_context)
{
const Settings & settings = local_context->getSettingsRef();
if (settings.max_distributed_depth && local_context->getClientInfo().distributed_depth >= settings.max_distributed_depth)
throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH);
auto & select = query.select->as<ASTSelectWithUnionQuery &>();
StoragePtr src_storage;
/// Distributed write only works in the most trivial case INSERT ... SELECT
/// without any unions or joins on the right side
if (select.list_of_selects->children.size() == 1)
{
if (auto * select_query = select.list_of_selects->children.at(0)->as<ASTSelectQuery>())
{
JoinedTables joined_tables(Context::createCopy(local_context), *select_query);
if (joined_tables.tablesCount() == 1)
{
src_storage = joined_tables.getLeftTableStorage();
}
}
}
if (!src_storage)
return {};
if (auto src_distributed = std::dynamic_pointer_cast<StorageDistributed>(src_storage))
{
return distributedWriteBetweenDistributedTables(*src_distributed, query, local_context);
}
if (auto src_storage_cluster = std::dynamic_pointer_cast<IStorageCluster>(src_storage))
{
return distributedWriteFromClusterStorage(*src_storage_cluster, query, local_context);
}
if (local_context->getClientInfo().distributed_depth == 0)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Parallel distributed INSERT SELECT is not possible. "\
"Reason: distributed reading is supported only from Distributed engine or *Cluster table functions, but got {} storage", src_storage->getName());
}
return {};
}
void StorageDistributed::checkAlterIsPossible(const AlterCommands & commands, ContextPtr local_context) const
{
auto name_deps = getDependentViewsByColumn(local_context);

View File

@ -1,7 +1,6 @@
#pragma once
#include <Storages/IStorage.h>
#include <Storages/IStorageCluster.h>
#include <Storages/Distributed/DirectoryMonitor.h>
#include <Storages/Distributed/DistributedSettings.h>
#include <Storages/getStructureOfRemoteTable.h>
@ -208,9 +207,6 @@ private:
void delayInsertOrThrowIfNeeded() const;
std::optional<QueryPipeline> distributedWriteFromClusterStorage(const IStorageCluster & src_storage_cluster, const ASTInsertQuery & query, ContextPtr context) const;
std::optional<QueryPipeline> distributedWriteBetweenDistributedTables(const StorageDistributed & src_distributed, const ASTInsertQuery & query, ContextPtr context) const;
String remote_database;
String remote_table;
ASTPtr remote_table_function_ptr;

View File

@ -47,13 +47,11 @@
#include <Parsers/formatAST.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTOptimizeQuery.h>
#include <Parsers/ASTPartition.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/queryToString.h>
#include <Parsers/ASTCheckQuery.h>
#include <Parsers/ExpressionListParsers.h>
@ -63,7 +61,6 @@
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/Sinks/EmptySink.h>
#include <IO/ReadBufferFromString.h>
#include <IO/Operators.h>
@ -78,7 +75,6 @@
#include <Interpreters/InterserverCredentials.h>
#include <Interpreters/SelectQueryOptions.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/JoinedTables.h>
#include <Backups/BackupEntriesCollector.h>
#include <Backups/IBackup.h>
@ -167,7 +163,6 @@ namespace ErrorCodes
extern const int CONCURRENT_ACCESS_NOT_SUPPORTED;
extern const int CHECKSUM_DOESNT_MATCH;
extern const int NOT_INITIALIZED;
extern const int TOO_LARGE_DISTRIBUTED_DEPTH;
}
namespace ActionLocks
@ -4458,106 +4453,6 @@ SinkToStoragePtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/, con
}
std::optional<QueryPipeline> StorageReplicatedMergeTree::distributedWriteFromClusterStorage(const std::shared_ptr<IStorageCluster> & src_storage_cluster, const ASTInsertQuery & query, ContextPtr local_context)
{
const auto & settings = local_context->getSettingsRef();
auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, local_context);
/// Here we won't check that the cluster formed from table replicas is a subset of a cluster specified in s3Cluster/hdfsCluster table function
auto src_cluster = src_storage_cluster->getCluster(local_context);
/// Actually the query doesn't change, we just serialize it to string
String query_str;
{
WriteBufferFromOwnString buf;
IAST::FormatSettings ast_format_settings(buf, /*one_line*/ true);
ast_format_settings.always_quote_identifiers = true;
query.IAST::format(ast_format_settings);
query_str = buf.str();
}
QueryPipeline pipeline;
ContextMutablePtr query_context = Context::createCopy(local_context);
++query_context->getClientInfo().distributed_depth;
for (const auto & replicas : src_cluster->getShardsAddresses())
{
/// There will be only one replica, because we consider each replica as a shard
for (const auto & node : replicas)
{
auto connection = std::make_shared<Connection>(
node.host_name, node.port, query_context->getGlobalContext()->getCurrentDatabase(),
node.user, node.password, node.quota_key, node.cluster, node.cluster_secret,
"ParallelInsertSelectInititiator",
node.compression,
node.secure
);
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
connection,
query_str,
Block{},
query_context,
/*throttler=*/nullptr,
Scalars{},
Tables{},
QueryProcessingStage::Complete,
extension);
QueryPipeline remote_pipeline(std::make_shared<RemoteSource>(remote_query_executor, false, settings.async_socket_for_remote));
remote_pipeline.complete(std::make_shared<EmptySink>(remote_query_executor->getHeader()));
pipeline.addCompletedPipeline(std::move(remote_pipeline));
}
}
return pipeline;
}
std::optional<QueryPipeline> StorageReplicatedMergeTree::distributedWrite(const ASTInsertQuery & query, ContextPtr local_context)
{
/// Do not enable parallel distributed INSERT SELECT in case when query probably comes from another server
if (local_context->getClientInfo().query_kind != ClientInfo::QueryKind::INITIAL_QUERY)
return {};
const Settings & settings = local_context->getSettingsRef();
if (settings.max_distributed_depth && local_context->getClientInfo().distributed_depth >= settings.max_distributed_depth)
throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH);
auto & select = query.select->as<ASTSelectWithUnionQuery &>();
StoragePtr src_storage;
if (select.list_of_selects->children.size() == 1)
{
if (auto * select_query = select.list_of_selects->children.at(0)->as<ASTSelectQuery>())
{
JoinedTables joined_tables(Context::createCopy(local_context), *select_query);
if (joined_tables.tablesCount() == 1)
{
src_storage = joined_tables.getLeftTableStorage();
}
}
}
if (!src_storage)
return {};
if (auto src_distributed = std::dynamic_pointer_cast<IStorageCluster>(src_storage))
{
return distributedWriteFromClusterStorage(src_distributed, query, local_context);
}
else if (local_context->getClientInfo().distributed_depth == 0)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Parallel distributed INSERT SELECT is not possible. Reason: distributed "
"reading into Replicated table is supported only from *Cluster table functions, but got {} storage", src_storage->getName());
}
return {};
}
bool StorageReplicatedMergeTree::optimize(
const ASTPtr &,
const StorageMetadataPtr &,

View File

@ -4,7 +4,6 @@
#include <atomic>
#include <pcg_random.hpp>
#include <Storages/IStorage.h>
#include <Storages/IStorageCluster.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
#include <Storages/MergeTree/MergeTreePartsMover.h>
@ -140,8 +139,6 @@ public:
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context) override;
std::optional<QueryPipeline> distributedWrite(const ASTInsertQuery & /*query*/, ContextPtr /*context*/) override;
bool optimize(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
@ -486,8 +483,6 @@ private:
std::mutex last_broken_disks_mutex;
std::set<String> last_broken_disks;
static std::optional<QueryPipeline> distributedWriteFromClusterStorage(const std::shared_ptr<IStorageCluster> & src_storage_cluster, const ASTInsertQuery & query, ContextPtr context);
template <class Func>
void foreachActiveParts(Func && func, bool select_sequential_consistency) const;

View File

@ -51,7 +51,7 @@ StorageS3Cluster::StorageS3Cluster(
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
ContextPtr context_)
: IStorageCluster(table_id_)
: IStorage(table_id_)
, s3_configuration{configuration_.url, configuration_.auth_settings, configuration_.rw_settings, configuration_.headers}
, filename(configuration_.url)
, cluster_name(configuration_.cluster_name)
@ -101,8 +101,11 @@ Pipe StorageS3Cluster::read(
{
StorageS3::updateS3Configuration(context, s3_configuration);
auto cluster = getCluster(context);
auto extension = getTaskIteratorExtension(query_info.query, context);
auto cluster = context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef());
auto iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(
*s3_configuration.client, s3_configuration.uri, query_info.query, virtual_block, context);
auto callback = std::make_shared<StorageS3Source::IteratorWrapper>([iterator]() mutable -> String { return iterator->next(); });
/// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)
Block header =
@ -127,6 +130,7 @@ Pipe StorageS3Cluster::read(
node.secure
);
/// For an unknown reason the global context is passed to the IStorage::read() method,
/// so the task_identifier is passed as a constructor argument instead. It is more explicit.
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
@ -138,7 +142,7 @@ Pipe StorageS3Cluster::read(
scalars,
Tables(),
processed_stage,
extension);
RemoteQueryExecutor::Extension{.task_iterator = callback});
pipes.emplace_back(std::make_shared<RemoteSource>(remote_query_executor, add_agg_info, false));
}
@ -161,19 +165,6 @@ QueryProcessingStage::Enum StorageS3Cluster::getQueryProcessingStage(
}
ClusterPtr StorageS3Cluster::getCluster(ContextPtr context) const
{
return context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef());
}
RemoteQueryExecutor::Extension StorageS3Cluster::getTaskIteratorExtension(ASTPtr query, ContextPtr context) const
{
auto iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(
*s3_configuration.client, s3_configuration.uri, query, virtual_block, context);
auto callback = std::make_shared<StorageS3Source::IteratorWrapper>([iter = std::move(iterator)]() mutable -> String { return iter->next(); });
return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) };
}
NamesAndTypesList StorageS3Cluster::getVirtuals() const
{
return virtual_columns;

View File

@ -10,7 +10,6 @@
#include "Client/Connection.h"
#include <Interpreters/Cluster.h>
#include <IO/S3Common.h>
#include <Storages/IStorageCluster.h>
#include <Storages/StorageS3.h>
namespace DB
@ -18,7 +17,7 @@ namespace DB
class Context;
class StorageS3Cluster : public IStorageCluster
class StorageS3Cluster : public IStorage
{
public:
StorageS3Cluster(
@ -38,11 +37,9 @@ public:
NamesAndTypesList getVirtuals() const override;
RemoteQueryExecutor::Extension getTaskIteratorExtension(ASTPtr query, ContextPtr context) const override;
ClusterPtr getCluster(ContextPtr context) const override;
private:
StorageS3::S3Configuration s3_configuration;
String filename;
String cluster_name;
String format_name;

View File

@ -20,23 +20,8 @@
</shard>
</cluster_simple>
<!-- A part of the cluster above, represents only one shard-->
<first_shard>
<shard>
<replica>
<host>s0_0_0</host>
<port>9000</port>
</replica>
<replica>
<host>s0_0_1</host>
<port>9000</port>
</replica>
</shard>
</first_shard>
</remote_servers>
<macros>
<default_cluster_macro>cluster_simple</default_cluster_macro>
</macros>
</clickhouse>
</clickhouse>

View File

@ -1,9 +1,5 @@
from email.errors import HeaderParseError
import logging
import os
import csv
import shutil
import time
import pytest
from helpers.cluster import ClickHouseCluster
@ -23,21 +19,6 @@ S3_DATA = [
def create_buckets_s3(cluster):
minio = cluster.minio_client
for file_number in range(100):
file_name = f"data/generated/file_{file_number}.csv"
os.makedirs(os.path.join(SCRIPT_DIR, "data/generated/"), exist_ok=True)
S3_DATA.append(file_name)
with open(os.path.join(SCRIPT_DIR, file_name), "w+", encoding="utf-8") as f:
# a String, b UInt64
data = []
for number in range(100):
data.append([str(number) * 10, number])
writer = csv.writer(f)
writer.writerows(data)
for file in S3_DATA:
minio.fput_object(
bucket_name=cluster.minio_bucket,
@ -53,24 +34,10 @@ def started_cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"s0_0_0",
main_configs=["configs/cluster.xml"],
macros={"replica": "node1", "shard": "shard1"},
with_minio=True,
with_zookeeper=True,
)
cluster.add_instance(
"s0_0_1",
main_configs=["configs/cluster.xml"],
macros={"replica": "replica2", "shard": "shard1"},
with_zookeeper=True,
)
cluster.add_instance(
"s0_1_0",
main_configs=["configs/cluster.xml"],
macros={"replica": "replica1", "shard": "shard2"},
with_zookeeper=True,
"s0_0_0", main_configs=["configs/cluster.xml"], with_minio=True
)
cluster.add_instance("s0_0_1", main_configs=["configs/cluster.xml"])
cluster.add_instance("s0_1_0", main_configs=["configs/cluster.xml"])
logging.info("Starting cluster...")
cluster.start()
@ -80,7 +47,6 @@ def started_cluster():
yield cluster
finally:
shutil.rmtree(os.path.join(SCRIPT_DIR, "data/generated/"))
cluster.shutdown()
@ -89,17 +55,17 @@ def test_select_all(started_cluster):
pure_s3 = node.query(
"""
SELECT * from s3(
'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
ORDER BY (name, value, polygon)"""
)
# print(pure_s3)
s3_distibuted = node.query(
"""
SELECT * from s3Cluster(
'cluster_simple',
'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
'cluster_simple',
'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') ORDER BY (name, value, polygon)"""
)
# print(s3_distibuted)
@ -112,15 +78,15 @@ def test_count(started_cluster):
pure_s3 = node.query(
"""
SELECT count(*) from s3(
'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV',
'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')"""
)
# print(pure_s3)
s3_distibuted = node.query(
"""
SELECT count(*) from s3Cluster(
'cluster_simple', 'http://minio1:9001/root/data/{clickhouse,database}/*',
'cluster_simple', 'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')"""
)
@ -159,13 +125,13 @@ def test_union_all(started_cluster):
SELECT * FROM
(
SELECT * from s3(
'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
UNION ALL
SELECT * from s3(
'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV',
'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
)
ORDER BY (name, value, polygon)
@ -177,13 +143,13 @@ def test_union_all(started_cluster):
SELECT * FROM
(
SELECT * from s3Cluster(
'cluster_simple',
'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
'cluster_simple',
'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
UNION ALL
SELECT * from s3Cluster(
'cluster_simple',
'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
'cluster_simple',
'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
)
ORDER BY (name, value, polygon)
@ -200,12 +166,12 @@ def test_wrong_cluster(started_cluster):
"""
SELECT count(*) from s3Cluster(
'non_existent_cluster',
'http://minio1:9001/root/data/{clickhouse,database}/*',
'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
UNION ALL
SELECT count(*) from s3Cluster(
'non_existent_cluster',
'http://minio1:9001/root/data/{clickhouse,database}/*',
'http://minio1:9001/root/data/{clickhouse,database}/*',
'minio', 'minio123', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
"""
)
@ -218,139 +184,14 @@ def test_ambiguous_join(started_cluster):
result = node.query(
"""
SELECT l.name, r.value from s3Cluster(
'cluster_simple',
'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
'cluster_simple',
'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') as l
JOIN s3Cluster(
'cluster_simple',
'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
'cluster_simple',
'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') as r
ON l.name = r.name
"""
)
assert "AMBIGUOUS_COLUMN_NAME" not in result
def test_distributed_insert_select(started_cluster):
first_replica_first_shard = started_cluster.instances["s0_0_0"]
second_replica_first_shard = started_cluster.instances["s0_0_1"]
first_replica_second_shard = started_cluster.instances["s0_1_0"]
first_replica_first_shard.query(
"""DROP TABLE IF EXISTS insert_select_local ON CLUSTER 'cluster_simple';"""
)
first_replica_first_shard.query(
"""DROP TABLE IF EXISTS insert_select_distributed ON CLUSTER 'cluster_simple';"""
)
first_replica_first_shard.query(
"""
CREATE TABLE insert_select_local ON CLUSTER 'cluster_simple' (a String, b UInt64)
ENGINE=ReplicatedMergeTree('/clickhouse/tables/{shard}/insert_select', '{replica}')
ORDER BY (a, b);
"""
)
first_replica_first_shard.query(
"""
CREATE TABLE insert_select_distributed ON CLUSTER 'cluster_simple' as insert_select_local
ENGINE = Distributed('cluster_simple', default, insert_select_local, b % 2);
"""
)
first_replica_first_shard.query(
"""
INSERT INTO insert_select_distributed SETTINGS insert_distributed_sync=1 SELECT * FROM s3Cluster(
'cluster_simple',
'http://minio1:9001/root/data/generated/*.csv', 'minio', 'minio123', 'CSV','a String, b UInt64'
) SETTINGS parallel_distributed_insert_select=1, insert_distributed_sync=1;
"""
)
for line in (
first_replica_first_shard.query("""SELECT * FROM insert_select_local;""")
.strip()
.split("\n")
):
_, b = line.split()
assert int(b) % 2 == 0
for line in (
second_replica_first_shard.query("""SELECT * FROM insert_select_local;""")
.strip()
.split("\n")
):
_, b = line.split()
assert int(b) % 2 == 0
for line in (
first_replica_second_shard.query("""SELECT * FROM insert_select_local;""")
.strip()
.split("\n")
):
_, b = line.split()
assert int(b) % 2 == 1
first_replica_first_shard.query(
"""DROP TABLE IF EXISTS insert_select_local ON CLUSTER 'cluster_simple';"""
)
first_replica_first_shard.query(
"""DROP TABLE IF EXISTS insert_select_distributed ON CLUSTER 'cluster_simple';"""
)
def test_distributed_insert_select_with_replicated(started_cluster):
first_replica_first_shard = started_cluster.instances["s0_0_0"]
second_replica_first_shard = started_cluster.instances["s0_0_1"]
first_replica_first_shard.query(
"""DROP TABLE IF EXISTS insert_select_replicated_local ON CLUSTER 'first_shard';"""
)
first_replica_first_shard.query(
"""
CREATE TABLE insert_select_replicated_local ON CLUSTER 'first_shard' (a String, b UInt64)
ENGINE=ReplicatedMergeTree('/clickhouse/tables/{shard}/insert_select_with_replicated', '{replica}')
ORDER BY (a, b);
"""
)
for replica in [first_replica_first_shard, second_replica_first_shard]:
replica.query(
"""
SYSTEM STOP FETCHES;
"""
)
replica.query(
"""
SYSTEM STOP MERGES;
"""
)
first_replica_first_shard.query(
"""
INSERT INTO insert_select_replicated_local SELECT * FROM s3Cluster(
'first_shard',
'http://minio1:9001/root/data/generated_replicated/*.csv', 'minio', 'minio123', 'CSV','a String, b UInt64'
) SETTINGS parallel_distributed_insert_select=1;
"""
)
for replica in [first_replica_first_shard, second_replica_first_shard]:
replica.query(
"""
SYSTEM FLUSH LOGS;
"""
)
second = int(
second_replica_first_shard.query(
"""SELECT count(*) FROM system.query_log WHERE not is_initial_query and query like '%s3Cluster%';"""
).strip()
)
assert second != 0
first_replica_first_shard.query(
"""DROP TABLE IF EXISTS insert_select_replicated_local ON CLUSTER 'first_shard';"""
)

View File

@ -8,7 +8,7 @@
<value>SHA224</value>
<value>SHA256</value>
<value>halfMD5</value>
<value>blake3</value>
<value>BLAKE3</value>
</values>
</substitution>
<substitution>

View File

@ -1,5 +1,5 @@
-- Tags: no-fasttest
SELECT hex(blake3('test_1'));
SELECT hex(blake3('test_2'));
SELECT hex(blake3('test_3'));
SELECT hex(BLAKE3('test_1'));
SELECT hex(BLAKE3('test_2'));
SELECT hex(BLAKE3('test_3'));