mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 01:25:21 +00:00
Merge branch 'master' into stress-thread-fuzzer
This commit is contained in:
commit
51e98f747c
49
SECURITY.md
49
SECURITY.md
@ -1,9 +1,11 @@
|
||||
# Security Policy
|
||||
|
||||
## Supported Versions
|
||||
## Security Announcements
|
||||
Security fixes will be announced by posting them in the [security changelog](https://clickhouse.tech/docs/en/whats-new/security-changelog/)
|
||||
|
||||
The following versions of ClickHouse server are
|
||||
currently being supported with security updates:
|
||||
## Scope and Supported Versions
|
||||
|
||||
The following versions of ClickHouse server are currently being supported with security updates:
|
||||
|
||||
| Version | Supported |
|
||||
| ------- | ------------------ |
|
||||
@ -11,18 +13,49 @@ currently being supported with security updates:
|
||||
| 18.x | :x: |
|
||||
| 19.x | :x: |
|
||||
| 20.1 | :x: |
|
||||
| 20.3 | :white_check_mark: |
|
||||
| 20.3 | :x: |
|
||||
| 20.4 | :x: |
|
||||
| 20.5 | :x: |
|
||||
| 20.6 | :x: |
|
||||
| 20.7 | :x: |
|
||||
| 20.8 | :white_check_mark: |
|
||||
| 20.8 | :x: |
|
||||
| 20.9 | :x: |
|
||||
| 20.10 | :x: |
|
||||
| 20.11 | :white_check_mark: |
|
||||
| 20.12 | :white_check_mark: |
|
||||
| 21.1 | :white_check_mark: |
|
||||
| 20.11 | :x: |
|
||||
| 20.12 | :x: |
|
||||
| 21.1 | :x: |
|
||||
| 21.2 | :x: |
|
||||
| 21.3 | ✅ |
|
||||
| 21.4 | :x: |
|
||||
| 21.5 | :x: |
|
||||
| 21.6 | ✅ |
|
||||
| 21.7 | ✅ |
|
||||
| 21.8 | ✅ |
|
||||
|
||||
## Reporting a Vulnerability
|
||||
|
||||
We're extremely grateful for security researchers and users that report vulnerabilities to the ClickHouse Open Source Community. All reports are thoroughly investigated by developers.
|
||||
|
||||
To report a potential vulnerability in ClickHouse please send the details about it to [clickhouse-feedback@yandex-team.com](mailto:clickhouse-feedback@yandex-team.com).
|
||||
|
||||
### When Should I Report a Vulnerability?
|
||||
|
||||
- You think you discovered a potential security vulnerability in ClickHouse
|
||||
- You are unsure how a vulnerability affects ClickHouse
|
||||
|
||||
### When Should I NOT Report a Vulnerability?
|
||||
|
||||
- You need help tuning ClickHouse components for security
|
||||
- You need help applying security related updates
|
||||
- Your issue is not security related
|
||||
|
||||
## Security Vulnerability Response
|
||||
|
||||
Each report is acknowledged and analyzed by ClickHouse maintainers within 5 working days.
|
||||
As the security issue moves from triage, to identified fix, to release planning we will keep the reporter updated.
|
||||
|
||||
## Public Disclosure Timing
|
||||
|
||||
A public disclosure date is negotiated by the ClickHouse maintainers and the bug submitter. We prefer to fully disclose the bug as soon as possible once a user mitigation is available. It is reasonable to delay disclosure when the bug or the fix is not yet fully understood, the solution is not well-tested, or for vendor coordination. The timeframe for disclosure is from immediate (especially if it's already publicly known) to 90 days. For a vulnerability with a straightforward mitigation, we expect report date to disclosure date to be on the order of 7 days.
|
||||
|
||||
|
||||
|
10
base/common/unit.h
Normal file
10
base/common/unit.h
Normal file
@ -0,0 +1,10 @@
|
||||
#pragma once
|
||||
#include <cstddef>
|
||||
|
||||
constexpr size_t KiB = 1024;
|
||||
constexpr size_t MiB = 1024 * KiB;
|
||||
constexpr size_t GiB = 1024 * MiB;
|
||||
|
||||
constexpr size_t operator"" _KiB(unsigned long long val) { return val * KiB; }
|
||||
constexpr size_t operator"" _MiB(unsigned long long val) { return val * MiB; }
|
||||
constexpr size_t operator"" _GiB(unsigned long long val) { return val * GiB; }
|
@ -2,6 +2,8 @@ FROM ubuntu:20.04
|
||||
|
||||
ENV DEBIAN_FRONTEND=noninteractive LLVM_VERSION=11
|
||||
|
||||
RUN sed -i 's|http://archive|http://ru.archive|g' /etc/apt/sources.list
|
||||
|
||||
RUN apt-get update \
|
||||
&& apt-get install ca-certificates lsb-release wget gnupg apt-transport-https \
|
||||
--yes --no-install-recommends --verbose-versions \
|
||||
|
@ -3,6 +3,8 @@ FROM ubuntu:18.04
|
||||
ARG repository="deb https://repo.clickhouse.tech/deb/stable/ main/"
|
||||
ARG version=21.10.1.*
|
||||
|
||||
RUN sed -i 's|http://archive|http://ru.archive|g' /etc/apt/sources.list
|
||||
|
||||
RUN apt-get update \
|
||||
&& apt-get install --yes --no-install-recommends \
|
||||
apt-transport-https \
|
||||
|
@ -3,6 +3,8 @@ FROM ubuntu:20.04
|
||||
|
||||
ENV DEBIAN_FRONTEND=noninteractive LLVM_VERSION=11
|
||||
|
||||
RUN sed -i 's|http://archive|http://ru.archive|g' /etc/apt/sources.list
|
||||
|
||||
RUN apt-get update \
|
||||
&& apt-get install \
|
||||
apt-transport-https \
|
||||
|
@ -3,6 +3,8 @@ FROM ubuntu:20.04
|
||||
|
||||
ENV DEBIAN_FRONTEND=noninteractive LLVM_VERSION=11
|
||||
|
||||
RUN sed -i 's|http://archive|http://ru.archive|g' /etc/apt/sources.list
|
||||
|
||||
RUN apt-get update \
|
||||
&& apt-get install ca-certificates lsb-release wget gnupg apt-transport-https \
|
||||
--yes --no-install-recommends --verbose-versions \
|
||||
|
@ -5,6 +5,8 @@ RUN export CODENAME="$(lsb_release --codename --short | tr 'A-Z' 'a-z')" \
|
||||
&& wget -nv -O /tmp/arrow-keyring.deb "https://apache.jfrog.io/artifactory/arrow/ubuntu/apache-arrow-apt-source-latest-${CODENAME}.deb" \
|
||||
&& dpkg -i /tmp/arrow-keyring.deb
|
||||
|
||||
RUN sed -i 's|http://archive|http://ru.archive|g' /etc/apt/sources.list
|
||||
|
||||
# Libraries from OS are only needed to test the "unbundled" build (that is not used in production).
|
||||
RUN apt-get update \
|
||||
&& apt-get install \
|
||||
|
@ -26,6 +26,8 @@ ARG DEBIAN_FRONTEND=noninteractive
|
||||
# installed to prevent picking those uid / gid by some unrelated software.
|
||||
# The same uid / gid (101) is used both for alpine and ubuntu.
|
||||
|
||||
RUN sed -i 's|http://archive|http://ru.archive|g' /etc/apt/sources.list
|
||||
|
||||
RUN groupadd -r clickhouse --gid=101 \
|
||||
&& useradd -r -g clickhouse --uid=101 --home-dir=/var/lib/clickhouse --shell=/bin/bash clickhouse \
|
||||
&& apt-get update \
|
||||
|
@ -3,6 +3,8 @@ FROM ubuntu:20.04
|
||||
|
||||
ENV DEBIAN_FRONTEND=noninteractive LLVM_VERSION=11
|
||||
|
||||
RUN sed -i 's|http://archive|http://ru.archive|g' /etc/apt/sources.list
|
||||
|
||||
RUN apt-get update \
|
||||
&& apt-get install ca-certificates lsb-release wget gnupg apt-transport-https \
|
||||
--yes --no-install-recommends --verbose-versions \
|
||||
|
@ -2,6 +2,8 @@
|
||||
# docker run --volume=path_to_repo:/repo_folder --volume=path_to_result:/test_output yandex/clickhouse-codebrowser
|
||||
FROM yandex/clickhouse-binary-builder
|
||||
|
||||
RUN sed -i 's|http://archive|http://ru.archive|g' /etc/apt/sources.list
|
||||
|
||||
RUN apt-get update && apt-get --yes --allow-unauthenticated install clang-9 libllvm9 libclang-9-dev
|
||||
|
||||
# repo versions doesn't work correctly with C++17
|
||||
|
@ -3,6 +3,8 @@ FROM ubuntu:20.04
|
||||
|
||||
ENV DEBIAN_FRONTEND=noninteractive LLVM_VERSION=11
|
||||
|
||||
RUN sed -i 's|http://archive|http://ru.archive|g' /etc/apt/sources.list
|
||||
|
||||
RUN apt-get update \
|
||||
&& apt-get install ca-certificates lsb-release wget gnupg apt-transport-https \
|
||||
--yes --no-install-recommends --verbose-versions \
|
||||
|
@ -5,6 +5,8 @@ ENV LANG=C.UTF-8
|
||||
ENV TZ=Europe/Moscow
|
||||
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
|
||||
|
||||
RUN sed -i 's|http://archive|http://ru.archive|g' /etc/apt/sources.list
|
||||
|
||||
RUN apt-get update \
|
||||
&& DEBIAN_FRONTEND=noninteractive apt-get install --yes --no-install-recommends \
|
||||
ca-certificates \
|
||||
|
@ -1,6 +1,8 @@
|
||||
# docker build -t yandex/clickhouse-integration-tests-runner .
|
||||
FROM ubuntu:20.04
|
||||
|
||||
RUN sed -i 's|http://archive|http://ru.archive|g' /etc/apt/sources.list
|
||||
|
||||
RUN apt-get update \
|
||||
&& env DEBIAN_FRONTEND=noninteractive apt-get install --yes \
|
||||
ca-certificates \
|
||||
|
@ -5,6 +5,8 @@ ENV LANG=C.UTF-8
|
||||
ENV TZ=Europe/Moscow
|
||||
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
|
||||
|
||||
RUN sed -i 's|http://archive|http://ru.archive|g' /etc/apt/sources.list
|
||||
|
||||
RUN apt-get update \
|
||||
&& DEBIAN_FRONTEND=noninteractive apt-get install --yes --no-install-recommends \
|
||||
bash \
|
||||
|
@ -1,6 +1,8 @@
|
||||
# docker build -t yandex/clickhouse-sqlancer-test .
|
||||
FROM ubuntu:20.04
|
||||
|
||||
RUN sed -i 's|http://archive|http://ru.archive|g' /etc/apt/sources.list
|
||||
|
||||
RUN apt-get update --yes && env DEBIAN_FRONTEND=noninteractive apt-get install wget unzip git openjdk-14-jdk maven python3 --yes --no-install-recommends
|
||||
RUN wget https://github.com/sqlancer/sqlancer/archive/master.zip -O /sqlancer.zip
|
||||
RUN mkdir /sqlancer && \
|
||||
|
@ -1,6 +1,8 @@
|
||||
# docker build -t yandex/clickhouse-style-test .
|
||||
FROM ubuntu:20.04
|
||||
|
||||
RUN sed -i 's|http://archive|http://ru.archive|g' /etc/apt/sources.list
|
||||
|
||||
RUN apt-get update && env DEBIAN_FRONTEND=noninteractive apt-get install --yes \
|
||||
shellcheck \
|
||||
libxml2-utils \
|
||||
|
@ -1,6 +1,8 @@
|
||||
# docker build -t yandex/clickhouse-testflows-runner .
|
||||
FROM ubuntu:20.04
|
||||
|
||||
RUN sed -i 's|http://archive|http://ru.archive|g' /etc/apt/sources.list
|
||||
|
||||
RUN apt-get update \
|
||||
&& env DEBIAN_FRONTEND=noninteractive apt-get install --yes \
|
||||
ca-certificates \
|
||||
|
@ -2041,10 +2041,25 @@ Default value: 0.
|
||||
|
||||
## input_format_parallel_parsing {#input-format-parallel-parsing}
|
||||
|
||||
- Type: bool
|
||||
- Default value: True
|
||||
Enables or disables order-preserving parallel parsing of data formats. Supported only for [TSV](../../interfaces/formats.md#tabseparated), [TKSV](../../interfaces/formats.md#tskv), [CSV](../../interfaces/formats.md#csv) and [JSONEachRow](../../interfaces/formats.md#jsoneachrow) formats.
|
||||
|
||||
Enable order-preserving parallel parsing of data formats. Supported only for TSV, TKSV, CSV, and JSONEachRow formats.
|
||||
Possible values:
|
||||
|
||||
- 1 — Enabled.
|
||||
- 0 — Disabled.
|
||||
|
||||
Default value: `0`.
|
||||
|
||||
## output_format_parallel_formatting {#output-format-parallel-formatting}
|
||||
|
||||
Enables or disables parallel formatting of data formats. Supported only for [TSV](../../interfaces/formats.md#tabseparated), [TKSV](../../interfaces/formats.md#tskv), [CSV](../../interfaces/formats.md#csv) and [JSONEachRow](../../interfaces/formats.md#jsoneachrow) formats.
|
||||
|
||||
Possible values:
|
||||
|
||||
- 1 — Enabled.
|
||||
- 0 — Disabled.
|
||||
|
||||
Default value: `0`.
|
||||
|
||||
## min_chunk_bytes_for_parallel_parsing {#min-chunk-bytes-for-parallel-parsing}
|
||||
|
||||
|
@ -6,7 +6,7 @@ toc_title: JOIN
|
||||
|
||||
Join produces a new table by combining columns from one or multiple tables by using values common to each. It is a common operation in databases with SQL support, which corresponds to [relational algebra](https://en.wikipedia.org/wiki/Relational_algebra#Joins_and_join-like_operators) join. The special case of one table join is often referred to as “self-join”.
|
||||
|
||||
Syntax:
|
||||
**Syntax**
|
||||
|
||||
``` sql
|
||||
SELECT <expr_list>
|
||||
@ -38,7 +38,7 @@ Additional join types available in ClickHouse:
|
||||
|
||||
## Settings {#join-settings}
|
||||
|
||||
The default join type can be overriden using [join_default_strictness](../../../operations/settings/settings.md#settings-join_default_strictness) setting.
|
||||
The default join type can be overridden using [join_default_strictness](../../../operations/settings/settings.md#settings-join_default_strictness) setting.
|
||||
|
||||
The behavior of ClickHouse server for `ANY JOIN` operations depends on the [any_join_distinct_right_table_keys](../../../operations/settings/settings.md#any_join_distinct_right_table_keys) setting.
|
||||
|
||||
@ -52,6 +52,61 @@ The behavior of ClickHouse server for `ANY JOIN` operations depends on the [any_
|
||||
- [join_on_disk_max_files_to_merge](../../../operations/settings/settings.md#join_on_disk_max_files_to_merge)
|
||||
- [any_join_distinct_right_table_keys](../../../operations/settings/settings.md#any_join_distinct_right_table_keys)
|
||||
|
||||
## ON Section Conditions {on-section-conditions}
|
||||
|
||||
An `ON` section can contain several conditions combined using the `AND` operator. Conditions specifying join keys must refer both left and right tables and must use the equality operator. Other conditions may use other logical operators but they must refer either the left or the right table of a query.
|
||||
Rows are joined if the whole complex condition is met. If the conditions are not met, still rows may be included in the result depending on the `JOIN` type. Note that if the same conditions are placed in a `WHERE` section and they are not met, then rows are always filtered out from the result.
|
||||
|
||||
!!! note "Note"
|
||||
The `OR` operator inside an `ON` section is not supported yet.
|
||||
|
||||
!!! note "Note"
|
||||
If a condition refers columns from different tables, then only the equality operator (`=`) is supported so far.
|
||||
|
||||
**Example**
|
||||
|
||||
Consider `table_1` and `table_2`:
|
||||
|
||||
```
|
||||
┌─Id─┬─name─┐ ┌─Id─┬─text───────────┬─scores─┐
|
||||
│ 1 │ A │ │ 1 │ Text A │ 10 │
|
||||
│ 2 │ B │ │ 1 │ Another text A │ 12 │
|
||||
│ 3 │ C │ │ 2 │ Text B │ 15 │
|
||||
└────┴──────┘ └────┴────────────────┴────────┘
|
||||
```
|
||||
|
||||
Query with one join key condition and an additional condition for `table_2`:
|
||||
|
||||
``` sql
|
||||
SELECT name, text FROM table_1 LEFT OUTER JOIN table_2
|
||||
ON table_1.Id = table_2.Id AND startsWith(table_2.text, 'Text');
|
||||
```
|
||||
|
||||
Note that the result contains the row with the name `C` and the empty text column. It is included into the result because an `OUTER` type of a join is used.
|
||||
|
||||
```
|
||||
┌─name─┬─text───┐
|
||||
│ A │ Text A │
|
||||
│ B │ Text B │
|
||||
│ C │ │
|
||||
└──────┴────────┘
|
||||
```
|
||||
|
||||
Query with `INNER` type of a join and multiple conditions:
|
||||
|
||||
``` sql
|
||||
SELECT name, text, scores FROM table_1 INNER JOIN table_2
|
||||
ON table_1.Id = table_2.Id AND table_2.scores > 10 AND startsWith(table_2.text, 'Text');
|
||||
```
|
||||
|
||||
Result:
|
||||
|
||||
```
|
||||
┌─name─┬─text───┬─scores─┐
|
||||
│ B │ Text B │ 15 │
|
||||
└──────┴────────┴────────┘
|
||||
```
|
||||
|
||||
## ASOF JOIN Usage {#asof-join-usage}
|
||||
|
||||
`ASOF JOIN` is useful when you need to join records that have no exact match.
|
||||
@ -59,7 +114,7 @@ The behavior of ClickHouse server for `ANY JOIN` operations depends on the [any_
|
||||
Algorithm requires the special column in tables. This column:
|
||||
|
||||
- Must contain an ordered sequence.
|
||||
- Can be one of the following types: [Int*, UInt*](../../../sql-reference/data-types/int-uint.md), [Float\*](../../../sql-reference/data-types/float.md), [Date](../../../sql-reference/data-types/date.md), [DateTime](../../../sql-reference/data-types/datetime.md), [Decimal\*](../../../sql-reference/data-types/decimal.md).
|
||||
- Can be one of the following types: [Int, UInt](../../../sql-reference/data-types/int-uint.md), [Float](../../../sql-reference/data-types/float.md), [Date](../../../sql-reference/data-types/date.md), [DateTime](../../../sql-reference/data-types/datetime.md), [Decimal](../../../sql-reference/data-types/decimal.md).
|
||||
- Can’t be the only column in the `JOIN` clause.
|
||||
|
||||
Syntax `ASOF JOIN ... ON`:
|
||||
@ -84,7 +139,7 @@ ASOF JOIN table_2
|
||||
USING (equi_column1, ... equi_columnN, asof_column)
|
||||
```
|
||||
|
||||
`ASOF JOIN` uses `equi_columnX` for joining on equality and `asof_column` for joining on the closest match with the `table_1.asof_column >= table_2.asof_column` condition. The `asof_column` column always the last one in the `USING` clause.
|
||||
`ASOF JOIN` uses `equi_columnX` for joining on equality and `asof_column` for joining on the closest match with the `table_1.asof_column >= table_2.asof_column` condition. The `asof_column` column is always the last one in the `USING` clause.
|
||||
|
||||
For example, consider the following tables:
|
||||
|
||||
|
@ -1865,10 +1865,25 @@ ClickHouse генерирует исключение
|
||||
|
||||
## input_format_parallel_parsing {#input-format-parallel-parsing}
|
||||
|
||||
- Тип: bool
|
||||
- Значение по умолчанию: True
|
||||
Включает или отключает режим, при котором входящие данные разбиваются на части, парсинг каждой из которых осуществляется параллельно с сохранением исходного порядка. Поддерживается только для форматов [TSV](../../interfaces/formats.md#tabseparated), [TKSV](../../interfaces/formats.md#tskv), [CSV](../../interfaces/formats.md#csv) и [JSONEachRow](../../interfaces/formats.md#jsoneachrow).
|
||||
|
||||
Включает режим, при котором входящие данные парсятся параллельно, но с сохранением исходного порядка следования. Поддерживается только для форматов TSV, TKSV, CSV и JSONEachRow.
|
||||
Возможные значения:
|
||||
|
||||
- 1 — включен режим параллельного разбора.
|
||||
- 0 — отключен режим параллельного разбора.
|
||||
|
||||
Значение по умолчанию: `0`.
|
||||
|
||||
## output_format_parallel_formatting {#output-format-parallel-formatting}
|
||||
|
||||
Включает или отключает режим, при котором исходящие данные форматируются параллельно с сохранением исходного порядка. Поддерживается только для форматов [TSV](../../interfaces/formats.md#tabseparated), [TKSV](../../interfaces/formats.md#tskv), [CSV](../../interfaces/formats.md#csv) и [JSONEachRow](../../interfaces/formats.md#jsoneachrow).
|
||||
|
||||
Возможные значения:
|
||||
|
||||
- 1 — включен режим параллельного форматирования.
|
||||
- 0 — отключен режим параллельного форматирования.
|
||||
|
||||
Значение по умолчанию: `0`.
|
||||
|
||||
## min_chunk_bytes_for_parallel_parsing {#min-chunk-bytes-for-parallel-parsing}
|
||||
|
||||
|
@ -6,7 +6,7 @@ toc_title: JOIN
|
||||
|
||||
`JOIN` создаёт новую таблицу путем объединения столбцов из одной или нескольких таблиц с использованием общих для каждой из них значений. Это обычная операция в базах данных с поддержкой SQL, которая соответствует join из [реляционной алгебры](https://en.wikipedia.org/wiki/Relational_algebra#Joins_and_join-like_operators). Частный случай соединения одной таблицы часто называют self-join.
|
||||
|
||||
Синтаксис:
|
||||
**Синтаксис**
|
||||
|
||||
``` sql
|
||||
SELECT <expr_list>
|
||||
@ -19,7 +19,7 @@ FROM <left_table>
|
||||
|
||||
## Поддерживаемые типы соединения {#select-join-types}
|
||||
|
||||
Все типы из стандартого [SQL JOIN](https://en.wikipedia.org/wiki/Join_(SQL)) поддерживаются:
|
||||
Все типы из стандартного [SQL JOIN](https://en.wikipedia.org/wiki/Join_(SQL)) поддерживаются:
|
||||
|
||||
- `INNER JOIN`, возвращаются только совпадающие строки.
|
||||
- `LEFT OUTER JOIN`, не совпадающие строки из левой таблицы возвращаются в дополнение к совпадающим строкам.
|
||||
@ -33,7 +33,7 @@ FROM <left_table>
|
||||
|
||||
- `LEFT SEMI JOIN` и `RIGHT SEMI JOIN`, белый список по ключам соединения, не производит декартово произведение.
|
||||
- `LEFT ANTI JOIN` и `RIGHT ANTI JOIN`, черный список по ключам соединения, не производит декартово произведение.
|
||||
- `LEFT ANY JOIN`, `RIGHT ANY JOIN` и `INNER ANY JOIN`, Частично (для противоположных сторон `LEFT` и `RIGHT`) или полностью (для `INNER` и `FULL`) отключает декартово произведение для стандартых видов `JOIN`.
|
||||
- `LEFT ANY JOIN`, `RIGHT ANY JOIN` и `INNER ANY JOIN`, Частично (для противоположных сторон `LEFT` и `RIGHT`) или полностью (для `INNER` и `FULL`) отключает декартово произведение для стандартных видов `JOIN`.
|
||||
- `ASOF JOIN` и `LEFT ASOF JOIN`, Для соединения последовательностей по нечеткому совпадению. Использование `ASOF JOIN` описано ниже.
|
||||
|
||||
## Настройки {#join-settings}
|
||||
@ -52,6 +52,61 @@ FROM <left_table>
|
||||
- [join_on_disk_max_files_to_merge](../../../operations/settings/settings.md#join_on_disk_max_files_to_merge)
|
||||
- [any_join_distinct_right_table_keys](../../../operations/settings/settings.md#any_join_distinct_right_table_keys)
|
||||
|
||||
## Условия в секции ON {on-section-conditions}
|
||||
|
||||
Секция `ON` может содержать несколько условий, связанных оператором `AND`. Условия, задающие ключи соединения, должны содержать столбцы левой и правой таблицы и должны использовать оператор равенства. Прочие условия могут использовать другие логические операторы, но в отдельном условии могут использоваться столбцы либо только левой, либо только правой таблицы.
|
||||
Строки объединяются только тогда, когда всё составное условие выполнено. Если оно не выполнено, то строки могут попасть в результат в зависимости от типа `JOIN`. Обратите внимание, что если то же самое условие поместить в секцию `WHERE`, то строки, для которых оно не выполняется, никогда не попаду в результат.
|
||||
|
||||
!!! note "Примечание"
|
||||
Оператор `OR` внутри секции `ON` пока не поддерживается.
|
||||
|
||||
!!! note "Примечание"
|
||||
Если в условии использованы столбцы из разных таблиц, то пока поддерживается только оператор равенства (`=`).
|
||||
|
||||
**Пример**
|
||||
|
||||
Рассмотрим `table_1` и `table_2`:
|
||||
|
||||
```
|
||||
┌─Id─┬─name─┐ ┌─Id─┬─text───────────┬─scores─┐
|
||||
│ 1 │ A │ │ 1 │ Text A │ 10 │
|
||||
│ 2 │ B │ │ 1 │ Another text A │ 12 │
|
||||
│ 3 │ C │ │ 2 │ Text B │ 15 │
|
||||
└────┴──────┘ └────┴────────────────┴────────┘
|
||||
```
|
||||
|
||||
Запрос с одним условием, задающим ключ соединения, и дополнительным условием для `table_2`:
|
||||
|
||||
``` sql
|
||||
SELECT name, text FROM table_1 LEFT OUTER JOIN table_2
|
||||
ON table_1.Id = table_2.Id AND startsWith(table_2.text, 'Text');
|
||||
```
|
||||
|
||||
Обратите внимание, что результат содержит строку с именем `C` и пустым текстом. Строка включена в результат, потому что использован тип соединения `OUTER`.
|
||||
|
||||
```
|
||||
┌─name─┬─text───┐
|
||||
│ A │ Text A │
|
||||
│ B │ Text B │
|
||||
│ C │ │
|
||||
└──────┴────────┘
|
||||
```
|
||||
|
||||
Запрос с типом соединения `INNER` и несколькими условиями:
|
||||
|
||||
``` sql
|
||||
SELECT name, text, scores FROM table_1 INNER JOIN table_2
|
||||
ON table_1.Id = table_2.Id AND table_2.scores > 10 AND startsWith(table_2.text, 'Text');
|
||||
```
|
||||
|
||||
Результат:
|
||||
|
||||
```
|
||||
┌─name─┬─text───┬─scores─┐
|
||||
│ B │ Text B │ 15 │
|
||||
└──────┴────────┴────────┘
|
||||
```
|
||||
|
||||
## Использование ASOF JOIN {#asof-join-usage}
|
||||
|
||||
`ASOF JOIN` применим в том случае, когда необходимо объединять записи, которые не имеют точного совпадения.
|
||||
@ -59,7 +114,7 @@ FROM <left_table>
|
||||
Для работы алгоритма необходим специальный столбец в таблицах. Этот столбец:
|
||||
|
||||
- Должен содержать упорядоченную последовательность.
|
||||
- Может быть одного из следующих типов: [Int*, UInt*](../../data-types/int-uint.md), [Float*](../../data-types/float.md), [Date](../../data-types/date.md), [DateTime](../../data-types/datetime.md), [Decimal*](../../data-types/decimal.md).
|
||||
- Может быть одного из следующих типов: [Int, UInt](../../data-types/int-uint.md), [Float](../../data-types/float.md), [Date](../../data-types/date.md), [DateTime](../../data-types/datetime.md), [Decimal](../../data-types/decimal.md).
|
||||
- Не может быть единственным столбцом в секции `JOIN`.
|
||||
|
||||
Синтаксис `ASOF JOIN ... ON`:
|
||||
|
@ -376,8 +376,8 @@ void LocalServer::processQueries()
|
||||
throw Exception("Cannot parse and execute the following part of query: " + String(parse_res.first), ErrorCodes::SYNTAX_ERROR);
|
||||
|
||||
/// Authenticate and create a context to execute queries.
|
||||
Session session{global_context, ClientInfo::Interface::TCP};
|
||||
session.authenticate("default", "", Poco::Net::SocketAddress{});
|
||||
Session session{global_context, ClientInfo::Interface::LOCAL};
|
||||
session.authenticate("default", "", {});
|
||||
|
||||
/// Use the same context for all queries.
|
||||
auto context = session.makeQueryContext();
|
||||
|
@ -56,6 +56,8 @@ template <typename Value, bool float_return> using FuncQuantilesTDigestWeighted
|
||||
template <typename Value, bool float_return> using FuncQuantileBFloat16 = AggregateFunctionQuantile<Value, QuantileBFloat16Histogram<Value>, NameQuantileBFloat16, false, std::conditional_t<float_return, Float64, void>, false>;
|
||||
template <typename Value, bool float_return> using FuncQuantilesBFloat16 = AggregateFunctionQuantile<Value, QuantileBFloat16Histogram<Value>, NameQuantilesBFloat16, false, std::conditional_t<float_return, Float64, void>, true>;
|
||||
|
||||
template <typename Value, bool float_return> using FuncQuantileBFloat16Weighted = AggregateFunctionQuantile<Value, QuantileBFloat16Histogram<Value>, NameQuantileBFloat16Weighted, true, std::conditional_t<float_return, Float64, void>, false>;
|
||||
template <typename Value, bool float_return> using FuncQuantilesBFloat16Weighted = AggregateFunctionQuantile<Value, QuantileBFloat16Histogram<Value>, NameQuantilesBFloat16Weighted, true, std::conditional_t<float_return, Float64, void>, true>;
|
||||
|
||||
template <template <typename, bool> class Function>
|
||||
static constexpr bool supportDecimal()
|
||||
@ -167,6 +169,9 @@ void registerAggregateFunctionsQuantile(AggregateFunctionFactory & factory)
|
||||
factory.registerFunction(NameQuantileBFloat16::name, createAggregateFunctionQuantile<FuncQuantileBFloat16>);
|
||||
factory.registerFunction(NameQuantilesBFloat16::name, { createAggregateFunctionQuantile<FuncQuantilesBFloat16>, properties });
|
||||
|
||||
factory.registerFunction(NameQuantileBFloat16Weighted::name, createAggregateFunctionQuantile<FuncQuantileBFloat16Weighted>);
|
||||
factory.registerFunction(NameQuantilesBFloat16Weighted::name, createAggregateFunctionQuantile<FuncQuantilesBFloat16Weighted>);
|
||||
|
||||
/// 'median' is an alias for 'quantile'
|
||||
factory.registerAlias("median", NameQuantile::name);
|
||||
factory.registerAlias("medianDeterministic", NameQuantileDeterministic::name);
|
||||
@ -179,6 +184,7 @@ void registerAggregateFunctionsQuantile(AggregateFunctionFactory & factory)
|
||||
factory.registerAlias("medianTDigest", NameQuantileTDigest::name);
|
||||
factory.registerAlias("medianTDigestWeighted", NameQuantileTDigestWeighted::name);
|
||||
factory.registerAlias("medianBFloat16", NameQuantileBFloat16::name);
|
||||
factory.registerAlias("medianBFloat16Weighted", NameQuantileBFloat16Weighted::name);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -237,5 +237,7 @@ struct NameQuantilesTDigestWeighted { static constexpr auto name = "quantilesTDi
|
||||
|
||||
struct NameQuantileBFloat16 { static constexpr auto name = "quantileBFloat16"; };
|
||||
struct NameQuantilesBFloat16 { static constexpr auto name = "quantilesBFloat16"; };
|
||||
struct NameQuantileBFloat16Weighted { static constexpr auto name = "quantileBFloat16Weighted"; };
|
||||
struct NameQuantilesBFloat16Weighted { static constexpr auto name = "quantilesBFloat16Weighted"; };
|
||||
|
||||
}
|
||||
|
@ -256,8 +256,8 @@ void registerDictionarySourceClickHouse(DictionarySourceFactory & factory)
|
||||
if (configuration.is_local)
|
||||
{
|
||||
/// Start local session in case when the dictionary is loaded in-process (without TCP communication).
|
||||
local_session = std::make_shared<Session>(global_context, ClientInfo::Interface::TCP);
|
||||
local_session->authenticate(configuration.user, configuration.password, Poco::Net::SocketAddress{"127.0.0.1", 0});
|
||||
local_session = std::make_shared<Session>(global_context, ClientInfo::Interface::LOCAL);
|
||||
local_session->authenticate(configuration.user, configuration.password, {});
|
||||
context = local_session->makeQueryContext();
|
||||
context->applySettingsChanges(readSettingsFromDictionaryConfig(config, config_prefix));
|
||||
}
|
||||
|
@ -6,25 +6,37 @@
|
||||
#include <bitset>
|
||||
#include <random>
|
||||
#include <utility>
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <IO/ReadBufferFromS3.h>
|
||||
|
||||
#include <boost/algorithm/string.hpp>
|
||||
|
||||
#include <common/unit.h>
|
||||
|
||||
#include <Common/checkStackSize.h>
|
||||
#include <Common/createHardLink.h>
|
||||
#include <Common/quoteString.h>
|
||||
#include <Common/thread_local_rng.h>
|
||||
|
||||
#include <Disks/ReadIndirectBufferFromRemoteFS.h>
|
||||
#include <Disks/WriteIndirectBufferFromRemoteFS.h>
|
||||
|
||||
#include <Interpreters/Context.h>
|
||||
|
||||
#include <IO/ReadBufferFromS3.h>
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/SeekAvoidingReadBuffer.h>
|
||||
#include <IO/WriteBufferFromS3.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Common/createHardLink.h>
|
||||
#include <Common/quoteString.h>
|
||||
#include <Common/thread_local_rng.h>
|
||||
#include <Common/checkStackSize.h>
|
||||
#include <boost/algorithm/string.hpp>
|
||||
|
||||
#include <aws/s3/model/CopyObjectRequest.h> // Y_IGNORE
|
||||
#include <aws/s3/model/DeleteObjectsRequest.h> // Y_IGNORE
|
||||
#include <aws/s3/model/GetObjectRequest.h> // Y_IGNORE
|
||||
#include <aws/s3/model/ListObjectsV2Request.h> // Y_IGNORE
|
||||
#include <aws/s3/model/HeadObjectRequest.h> // Y_IGNORE
|
||||
#include <aws/s3/model/CreateMultipartUploadRequest.h> // Y_IGNORE
|
||||
#include <aws/s3/model/CompleteMultipartUploadRequest.h> // Y_IGNORE
|
||||
#include <aws/s3/model/UploadPartCopyRequest.h> // Y_IGNORE
|
||||
#include <aws/s3/model/AbortMultipartUploadRequest.h> // Y_IGNORE
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -388,16 +400,7 @@ void DiskS3::saveSchemaVersion(const int & version)
|
||||
|
||||
void DiskS3::updateObjectMetadata(const String & key, const ObjectMetadata & metadata)
|
||||
{
|
||||
auto settings = current_settings.get();
|
||||
Aws::S3::Model::CopyObjectRequest request;
|
||||
request.SetCopySource(bucket + "/" + key);
|
||||
request.SetBucket(bucket);
|
||||
request.SetKey(key);
|
||||
request.SetMetadata(metadata);
|
||||
request.SetMetadataDirective(Aws::S3::Model::MetadataDirective::REPLACE);
|
||||
|
||||
auto outcome = settings->client->CopyObject(request);
|
||||
throwIfError(outcome);
|
||||
copyObjectImpl(bucket, key, bucket, key, std::nullopt, metadata);
|
||||
}
|
||||
|
||||
void DiskS3::migrateFileToRestorableSchema(const String & path)
|
||||
@ -553,18 +556,124 @@ void DiskS3::listObjects(const String & source_bucket, const String & source_pat
|
||||
} while (outcome.GetResult().GetIsTruncated());
|
||||
}
|
||||
|
||||
void DiskS3::copyObject(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key) const
|
||||
void DiskS3::copyObject(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key,
|
||||
std::optional<Aws::S3::Model::HeadObjectResult> head) const
|
||||
{
|
||||
if (head && (head->GetContentLength() >= static_cast<Int64>(5_GiB)))
|
||||
copyObjectMultipartImpl(src_bucket, src_key, dst_bucket, dst_key, head);
|
||||
else
|
||||
copyObjectImpl(src_bucket, src_key, dst_bucket, dst_key);
|
||||
}
|
||||
|
||||
void DiskS3::copyObjectImpl(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key,
|
||||
std::optional<Aws::S3::Model::HeadObjectResult> head,
|
||||
std::optional<std::reference_wrapper<const ObjectMetadata>> metadata) const
|
||||
{
|
||||
auto settings = current_settings.get();
|
||||
Aws::S3::Model::CopyObjectRequest request;
|
||||
request.SetCopySource(src_bucket + "/" + src_key);
|
||||
request.SetBucket(dst_bucket);
|
||||
request.SetKey(dst_key);
|
||||
if (metadata)
|
||||
{
|
||||
request.SetMetadata(*metadata);
|
||||
request.SetMetadataDirective(Aws::S3::Model::MetadataDirective::REPLACE);
|
||||
}
|
||||
|
||||
auto outcome = settings->client->CopyObject(request);
|
||||
|
||||
if (!outcome.IsSuccess() && outcome.GetError().GetExceptionName() == "EntityTooLarge")
|
||||
{ // Can't come here with MinIO, MinIO allows single part upload for large objects.
|
||||
copyObjectMultipartImpl(src_bucket, src_key, dst_bucket, dst_key, head, metadata);
|
||||
return;
|
||||
}
|
||||
|
||||
throwIfError(outcome);
|
||||
}
|
||||
|
||||
void DiskS3::copyObjectMultipartImpl(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key,
|
||||
std::optional<Aws::S3::Model::HeadObjectResult> head,
|
||||
std::optional<std::reference_wrapper<const ObjectMetadata>> metadata) const
|
||||
{
|
||||
LOG_DEBUG(log, "Multipart copy upload has created. Src Bucket: {}, Src Key: {}, Dst Bucket: {}, Dst Key: {}, Metadata: {}",
|
||||
src_bucket, src_key, dst_bucket, dst_key, metadata ? "REPLACE" : "NOT_SET");
|
||||
|
||||
auto settings = current_settings.get();
|
||||
|
||||
if (!head)
|
||||
head = headObject(src_bucket, src_key);
|
||||
|
||||
size_t size = head->GetContentLength();
|
||||
|
||||
String multipart_upload_id;
|
||||
|
||||
{
|
||||
Aws::S3::Model::CreateMultipartUploadRequest request;
|
||||
request.SetBucket(dst_bucket);
|
||||
request.SetKey(dst_key);
|
||||
if (metadata)
|
||||
request.SetMetadata(*metadata);
|
||||
|
||||
auto outcome = settings->client->CreateMultipartUpload(request);
|
||||
|
||||
throwIfError(outcome);
|
||||
|
||||
multipart_upload_id = outcome.GetResult().GetUploadId();
|
||||
}
|
||||
|
||||
std::vector<String> part_tags;
|
||||
|
||||
size_t upload_part_size = settings->s3_min_upload_part_size;
|
||||
for (size_t position = 0, part_number = 1; position < size; ++part_number, position += upload_part_size)
|
||||
{
|
||||
Aws::S3::Model::UploadPartCopyRequest part_request;
|
||||
part_request.SetCopySource(src_bucket + "/" + src_key);
|
||||
part_request.SetBucket(dst_bucket);
|
||||
part_request.SetKey(dst_key);
|
||||
part_request.SetUploadId(multipart_upload_id);
|
||||
part_request.SetPartNumber(part_number);
|
||||
part_request.SetCopySourceRange(fmt::format("bytes={}-{}", position, std::min(size, position + upload_part_size) - 1));
|
||||
|
||||
auto outcome = settings->client->UploadPartCopy(part_request);
|
||||
if (!outcome.IsSuccess())
|
||||
{
|
||||
Aws::S3::Model::AbortMultipartUploadRequest abort_request;
|
||||
abort_request.SetBucket(dst_bucket);
|
||||
abort_request.SetKey(dst_key);
|
||||
abort_request.SetUploadId(multipart_upload_id);
|
||||
settings->client->AbortMultipartUpload(abort_request);
|
||||
// In error case we throw exception later with first error from UploadPartCopy
|
||||
}
|
||||
throwIfError(outcome);
|
||||
|
||||
auto etag = outcome.GetResult().GetCopyPartResult().GetETag();
|
||||
part_tags.push_back(etag);
|
||||
}
|
||||
|
||||
{
|
||||
Aws::S3::Model::CompleteMultipartUploadRequest req;
|
||||
req.SetBucket(dst_bucket);
|
||||
req.SetKey(dst_key);
|
||||
req.SetUploadId(multipart_upload_id);
|
||||
|
||||
Aws::S3::Model::CompletedMultipartUpload multipart_upload;
|
||||
for (size_t i = 0; i < part_tags.size(); ++i)
|
||||
{
|
||||
Aws::S3::Model::CompletedPart part;
|
||||
multipart_upload.AddParts(part.WithETag(part_tags[i]).WithPartNumber(i + 1));
|
||||
}
|
||||
|
||||
req.SetMultipartUpload(multipart_upload);
|
||||
|
||||
auto outcome = settings->client->CompleteMultipartUpload(req);
|
||||
|
||||
throwIfError(outcome);
|
||||
|
||||
LOG_DEBUG(log, "Multipart copy upload has completed. Src Bucket: {}, Src Key: {}, Dst Bucket: {}, Dst Key: {}, "
|
||||
"Upload_id: {}, Parts: {}", src_bucket, src_key, dst_bucket, dst_key, multipart_upload_id, part_tags.size());
|
||||
}
|
||||
}
|
||||
|
||||
struct DiskS3::RestoreInformation
|
||||
{
|
||||
UInt64 revision = LATEST_REVISION;
|
||||
@ -757,7 +866,7 @@ void DiskS3::processRestoreFiles(const String & source_bucket, const String & so
|
||||
|
||||
/// Copy object if we restore to different bucket / path.
|
||||
if (bucket != source_bucket || remote_fs_root_path != source_path)
|
||||
copyObject(source_bucket, key, bucket, remote_fs_root_path + relative_key);
|
||||
copyObject(source_bucket, key, bucket, remote_fs_root_path + relative_key, head_result);
|
||||
|
||||
metadata.addObject(relative_key, head_result.GetContentLength());
|
||||
metadata.save();
|
||||
|
@ -7,6 +7,7 @@
|
||||
#if USE_AWS_S3
|
||||
|
||||
#include <atomic>
|
||||
#include <optional>
|
||||
#include <common/logger_useful.h>
|
||||
#include "Disks/DiskFactory.h"
|
||||
#include "Disks/Executor.h"
|
||||
@ -131,7 +132,15 @@ private:
|
||||
|
||||
Aws::S3::Model::HeadObjectResult headObject(const String & source_bucket, const String & key) const;
|
||||
void listObjects(const String & source_bucket, const String & source_path, std::function<bool(const Aws::S3::Model::ListObjectsV2Result &)> callback) const;
|
||||
void copyObject(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key) const;
|
||||
void copyObject(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key,
|
||||
std::optional<Aws::S3::Model::HeadObjectResult> head = std::nullopt) const;
|
||||
|
||||
void copyObjectImpl(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key,
|
||||
std::optional<Aws::S3::Model::HeadObjectResult> head = std::nullopt,
|
||||
std::optional<std::reference_wrapper<const ObjectMetadata>> metadata = std::nullopt) const;
|
||||
void copyObjectMultipartImpl(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key,
|
||||
std::optional<Aws::S3::Model::HeadObjectResult> head = std::nullopt,
|
||||
std::optional<std::reference_wrapper<const ObjectMetadata>> metadata = std::nullopt) const;
|
||||
|
||||
/// Restore S3 metadata files on file system.
|
||||
void restore();
|
||||
|
@ -61,7 +61,7 @@ namespace Regexps
|
||||
template <bool like, bool no_capture, bool case_insensitive = false>
|
||||
inline Pool::Pointer get(const std::string & pattern)
|
||||
{
|
||||
/// C++11 has thread-safe function-local statics on most modern compilers.
|
||||
/// C++11 has thread-safe function-local static on most modern compilers.
|
||||
static Pool known_regexps; /// Different variables for different pattern parameters.
|
||||
|
||||
return known_regexps.get(pattern, [&pattern]
|
||||
@ -257,7 +257,7 @@ namespace MultiRegexps
|
||||
template <bool save_indices, bool CompileForEditDistance>
|
||||
inline Regexps * get(const std::vector<StringRef> & patterns, std::optional<UInt32> edit_distance)
|
||||
{
|
||||
/// C++11 has thread-safe function-local statics on most modern compilers.
|
||||
/// C++11 has thread-safe function-local static on most modern compilers.
|
||||
static Pool known_regexps; /// Different variables for different pattern parameters.
|
||||
|
||||
std::vector<String> str_patterns;
|
||||
|
@ -188,7 +188,7 @@ public:
|
||||
/// Preprocessing can be computationally heavy but dramatically speeds up matching.
|
||||
|
||||
using Pool = ObjectPoolMap<PointInConstPolygonImpl, UInt128>;
|
||||
/// C++11 has thread-safe function-local statics.
|
||||
/// C++11 has thread-safe function-local static.
|
||||
static Pool known_polygons;
|
||||
|
||||
auto factory = [&polygon]()
|
||||
|
@ -85,7 +85,7 @@ private:
|
||||
{
|
||||
auto h3index = h3index_source.getWhole();
|
||||
|
||||
// covert to std::string and get the c_str to have the delimiting \0 at the end.
|
||||
// convert to std::string and get the c_str to have the delimiting \0 at the end.
|
||||
auto h3index_str = StringRef(h3index.data, h3index.size).toString();
|
||||
res_data[row_num] = stringToH3(h3index_str.c_str());
|
||||
|
||||
|
@ -28,6 +28,8 @@ public:
|
||||
GRPC = 3,
|
||||
MYSQL = 4,
|
||||
POSTGRESQL = 5,
|
||||
LOCAL = 6,
|
||||
TCP_INTERSERVER = 7,
|
||||
};
|
||||
|
||||
enum class HTTPMethod : uint8_t
|
||||
|
@ -276,10 +276,14 @@ void Session::authenticate(const Credentials & credentials_, const Poco::Net::So
|
||||
if (session_context)
|
||||
throw Exception("If there is a session context it must be created after authentication", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
user_id = global_context->getAccessControlManager().login(credentials_, address_.host());
|
||||
auto address = address_;
|
||||
if ((address == Poco::Net::SocketAddress{}) && (prepared_client_info->interface == ClientInfo::Interface::LOCAL))
|
||||
address = Poco::Net::SocketAddress{"127.0.0.1", 0};
|
||||
|
||||
user_id = global_context->getAccessControlManager().login(credentials_, address.host());
|
||||
|
||||
prepared_client_info->current_user = credentials_.getUserName();
|
||||
prepared_client_info->current_address = address_;
|
||||
prepared_client_info->current_address = address;
|
||||
|
||||
#if defined(ARCADIA_BUILD)
|
||||
/// This is harmful field that is used only in foreign "Arcadia" build.
|
||||
|
@ -979,6 +979,7 @@ void TCPHandler::receiveHello()
|
||||
is_interserver_mode = (user == USER_INTERSERVER_MARKER);
|
||||
if (is_interserver_mode)
|
||||
{
|
||||
client_info.interface = ClientInfo::Interface::TCP_INTERSERVER;
|
||||
receiveClusterNameAndSalt();
|
||||
return;
|
||||
}
|
||||
|
@ -679,6 +679,9 @@ bool isMetadataOnlyConversion(const IDataType * from, const IDataType * to)
|
||||
|
||||
while (true)
|
||||
{
|
||||
if (from->equals(*to))
|
||||
return true;
|
||||
|
||||
auto it_range = ALLOWED_CONVERSIONS.equal_range(typeid(*from));
|
||||
for (auto it = it_range.first; it != it_range.second; ++it)
|
||||
{
|
||||
@ -697,9 +700,9 @@ bool isMetadataOnlyConversion(const IDataType * from, const IDataType * to)
|
||||
|
||||
const auto * nullable_from = typeid_cast<const DataTypeNullable *>(from);
|
||||
const auto * nullable_to = typeid_cast<const DataTypeNullable *>(to);
|
||||
if (nullable_from && nullable_to)
|
||||
if (nullable_to)
|
||||
{
|
||||
from = nullable_from->getNestedType().get();
|
||||
from = nullable_from ? nullable_from->getNestedType().get() : from;
|
||||
to = nullable_to->getNestedType().get();
|
||||
continue;
|
||||
}
|
||||
|
@ -65,7 +65,7 @@ struct ReplicatedMergeTreeQuorumAddedParts
|
||||
added_parts = readV1(in);
|
||||
}
|
||||
|
||||
/// Read added bloks when node in ZooKeeper supports only one partition.
|
||||
/// Read added blocks when node in ZooKeeper supports only one partition.
|
||||
PartitionIdToPartName readV1(ReadBuffer & in)
|
||||
{
|
||||
PartitionIdToPartName parts_in_quorum;
|
||||
|
@ -49,6 +49,7 @@ namespace ErrorCodes
|
||||
{
|
||||
extern const int BAD_ARGUMENTS;
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
extern const int CANNOT_FSTAT;
|
||||
extern const int CANNOT_TRUNCATE_FILE;
|
||||
extern const int DATABASE_ACCESS_DENIED;
|
||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||
@ -164,6 +165,12 @@ bool StorageFile::isColumnOriented() const
|
||||
StorageFile::StorageFile(int table_fd_, CommonArguments args)
|
||||
: StorageFile(args)
|
||||
{
|
||||
struct stat buf;
|
||||
int res = fstat(table_fd_, &buf);
|
||||
if (-1 == res)
|
||||
throwFromErrno("Cannot execute fstat", res, ErrorCodes::CANNOT_FSTAT);
|
||||
total_bytes_to_read = buf.st_size;
|
||||
|
||||
if (args.getContext()->getApplicationType() == Context::ApplicationType::SERVER)
|
||||
throw Exception("Using file descriptor as source of storage isn't allowed for server daemons", ErrorCodes::DATABASE_ACCESS_DENIED);
|
||||
if (args.format_name == "Distributed")
|
||||
@ -208,6 +215,8 @@ StorageFile::StorageFile(const std::string & relative_table_dir_path, CommonArgu
|
||||
String table_dir_path = fs::path(base_path) / relative_table_dir_path / "";
|
||||
fs::create_directories(table_dir_path);
|
||||
paths = {getTablePath(table_dir_path, format_name)};
|
||||
if (fs::exists(paths[0]))
|
||||
total_bytes_to_read = fs::file_size(paths[0]);
|
||||
}
|
||||
|
||||
StorageFile::StorageFile(CommonArguments args)
|
||||
|
@ -8,6 +8,13 @@
|
||||
<secret_access_key>minio123</secret_access_key>
|
||||
<s3_max_single_part_upload_size>33554432</s3_max_single_part_upload_size>
|
||||
</s3>
|
||||
<unstable_s3>
|
||||
<type>s3</type>
|
||||
<endpoint>http://resolver:8081/root/data/</endpoint>
|
||||
<access_key_id>minio</access_key_id>
|
||||
<secret_access_key>minio123</secret_access_key>
|
||||
<s3_max_single_read_retries>10</s3_max_single_read_retries>
|
||||
</unstable_s3>
|
||||
<hdd>
|
||||
<type>local</type>
|
||||
<path>/</path>
|
||||
@ -24,6 +31,13 @@
|
||||
</external>
|
||||
</volumes>
|
||||
</s3>
|
||||
<unstable_s3>
|
||||
<volumes>
|
||||
<main>
|
||||
<disk>unstable_s3</disk>
|
||||
</main>
|
||||
</volumes>
|
||||
</unstable_s3>
|
||||
</policies>
|
||||
</storage_configuration>
|
||||
|
||||
|
@ -0,0 +1,64 @@
|
||||
import http.client
|
||||
import http.server
|
||||
import random
|
||||
import socketserver
|
||||
import sys
|
||||
import urllib.parse
|
||||
|
||||
|
||||
UPSTREAM_HOST = "minio1:9001"
|
||||
random.seed("Unstable proxy/1.0")
|
||||
|
||||
|
||||
def request(command, url, headers={}, data=None):
|
||||
""" Mini-requests. """
|
||||
class Dummy:
|
||||
pass
|
||||
|
||||
parts = urllib.parse.urlparse(url)
|
||||
c = http.client.HTTPConnection(parts.hostname, parts.port)
|
||||
c.request(command, urllib.parse.urlunparse(parts._replace(scheme='', netloc='')), headers=headers, body=data)
|
||||
r = c.getresponse()
|
||||
result = Dummy()
|
||||
result.status_code = r.status
|
||||
result.headers = r.headers
|
||||
result.content = r.read()
|
||||
return result
|
||||
|
||||
|
||||
class RequestHandler(http.server.BaseHTTPRequestHandler):
|
||||
def do_GET(self):
|
||||
if self.path == "/":
|
||||
self.send_response(200)
|
||||
self.send_header("Content-Type", "text/plain")
|
||||
self.end_headers()
|
||||
self.wfile.write(b"OK")
|
||||
else:
|
||||
self.do_HEAD()
|
||||
|
||||
def do_PUT(self):
|
||||
self.do_HEAD()
|
||||
|
||||
def do_POST(self):
|
||||
self.do_HEAD()
|
||||
|
||||
def do_HEAD(self):
|
||||
content_length = self.headers.get("Content-Length")
|
||||
data = self.rfile.read(int(content_length)) if content_length else None
|
||||
r = request(self.command, f"http://{UPSTREAM_HOST}{self.path}", headers=self.headers, data=data)
|
||||
self.send_response(r.status_code)
|
||||
for k, v in r.headers.items():
|
||||
self.send_header(k, v)
|
||||
self.end_headers()
|
||||
if random.random() < 0.25 and len(r.content) > 1024*1024:
|
||||
r.content = r.content[:len(r.content)//2]
|
||||
self.wfile.write(r.content)
|
||||
self.wfile.close()
|
||||
|
||||
|
||||
class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
|
||||
"""Handle requests in a separate thread."""
|
||||
|
||||
|
||||
httpd = ThreadedHTTPServer(("0.0.0.0", int(sys.argv[1])), RequestHandler)
|
||||
httpd.serve_forever()
|
@ -54,6 +54,7 @@ def cluster():
|
||||
logging.info("Starting cluster...")
|
||||
cluster.start()
|
||||
logging.info("Cluster started")
|
||||
run_s3_mocks(cluster)
|
||||
|
||||
yield cluster
|
||||
finally:
|
||||
@ -77,11 +78,17 @@ def generate_values(date_str, count, sign=1):
|
||||
return ",".join(["('{}',{},'{}')".format(x, y, z) for x, y, z in data])
|
||||
|
||||
|
||||
def create_table(cluster, table_name, additional_settings=None):
|
||||
def create_table(cluster, table_name, **additional_settings):
|
||||
node = cluster.instances["node"]
|
||||
settings = {
|
||||
"storage_policy": "s3",
|
||||
"old_parts_lifetime": 0,
|
||||
"index_granularity": 512
|
||||
}
|
||||
settings.update(additional_settings)
|
||||
|
||||
create_table_statement = """
|
||||
CREATE TABLE {} (
|
||||
create_table_statement = f"""
|
||||
CREATE TABLE {table_name} (
|
||||
dt Date,
|
||||
id Int64,
|
||||
data String,
|
||||
@ -89,19 +96,40 @@ def create_table(cluster, table_name, additional_settings=None):
|
||||
) ENGINE=MergeTree()
|
||||
PARTITION BY dt
|
||||
ORDER BY (dt, id)
|
||||
SETTINGS
|
||||
storage_policy='s3',
|
||||
old_parts_lifetime=0,
|
||||
index_granularity=512
|
||||
""".format(table_name)
|
||||
|
||||
if additional_settings:
|
||||
create_table_statement += ","
|
||||
create_table_statement += additional_settings
|
||||
SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))}"""
|
||||
|
||||
node.query(create_table_statement)
|
||||
|
||||
|
||||
def run_s3_mocks(cluster):
|
||||
logging.info("Starting s3 mocks")
|
||||
mocks = (
|
||||
("unstable_proxy.py", "resolver", "8081"),
|
||||
)
|
||||
for mock_filename, container, port in mocks:
|
||||
container_id = cluster.get_container_id(container)
|
||||
current_dir = os.path.dirname(__file__)
|
||||
cluster.copy_file_to_container(container_id, os.path.join(current_dir, "s3_mocks", mock_filename), mock_filename)
|
||||
cluster.exec_in_container(container_id, ["python", mock_filename, port], detach=True)
|
||||
|
||||
# Wait for S3 mocks to start
|
||||
for mock_filename, container, port in mocks:
|
||||
num_attempts = 100
|
||||
for attempt in range(num_attempts):
|
||||
ping_response = cluster.exec_in_container(cluster.get_container_id(container),
|
||||
["curl", "-s", f"http://localhost:{port}/"], nothrow=True)
|
||||
if ping_response != "OK":
|
||||
if attempt == num_attempts - 1:
|
||||
assert ping_response == "OK", f'Expected "OK", but got "{ping_response}"'
|
||||
else:
|
||||
time.sleep(1)
|
||||
else:
|
||||
logging.debug(f"mock {mock_filename} ({port}) answered {ping_response} on attempt {attempt}")
|
||||
break
|
||||
|
||||
logging.info("S3 mocks started")
|
||||
|
||||
|
||||
def wait_for_delete_s3_objects(cluster, expected, timeout=30):
|
||||
minio = cluster.minio_client
|
||||
while timeout > 0:
|
||||
@ -136,7 +164,7 @@ def drop_table(cluster):
|
||||
]
|
||||
)
|
||||
def test_simple_insert_select(cluster, min_rows_for_wide_part, files_per_part):
|
||||
create_table(cluster, "s3_test", additional_settings="min_rows_for_wide_part={}".format(min_rows_for_wide_part))
|
||||
create_table(cluster, "s3_test", min_rows_for_wide_part=min_rows_for_wide_part)
|
||||
|
||||
node = cluster.instances["node"]
|
||||
minio = cluster.minio_client
|
||||
@ -158,13 +186,12 @@ def test_simple_insert_select(cluster, min_rows_for_wide_part, files_per_part):
|
||||
"merge_vertical", [False, True]
|
||||
)
|
||||
def test_insert_same_partition_and_merge(cluster, merge_vertical):
|
||||
settings = None
|
||||
settings = {}
|
||||
if merge_vertical:
|
||||
settings = """
|
||||
vertical_merge_algorithm_min_rows_to_activate=0,
|
||||
vertical_merge_algorithm_min_columns_to_activate=0
|
||||
"""
|
||||
create_table(cluster, "s3_test", additional_settings=settings)
|
||||
settings['vertical_merge_algorithm_min_rows_to_activate'] = 0
|
||||
settings['vertical_merge_algorithm_min_columns_to_activate'] = 0
|
||||
|
||||
create_table(cluster, "s3_test", **settings)
|
||||
|
||||
node = cluster.instances["node"]
|
||||
minio = cluster.minio_client
|
||||
@ -459,3 +486,13 @@ def test_s3_disk_restart_during_load(cluster):
|
||||
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
|
||||
|
||||
def test_s3_disk_reads_on_unstable_connection(cluster):
|
||||
create_table(cluster, "s3_test", storage_policy='unstable_s3')
|
||||
node = cluster.instances["node"]
|
||||
node.query("INSERT INTO s3_test SELECT today(), *, toString(*) FROM system.numbers LIMIT 9000000")
|
||||
for i in range(30):
|
||||
print(f"Read sequence {i}")
|
||||
assert node.query("SELECT sum(id) FROM s3_test").splitlines() == ["40499995500000"]
|
||||
|
||||
|
@ -16,3 +16,5 @@
|
||||
['2016-06-15 23:00:16']
|
||||
2016-04-02 17:23:12
|
||||
['2016-04-02 17:23:12']
|
||||
2016-04-02 17:23:12
|
||||
['2016-04-02 17:23:12']
|
||||
|
@ -30,4 +30,7 @@ SELECT quantilesTDigestWeighted(0.2)(d, 1) FROM datetime;
|
||||
SELECT quantileBFloat16(0.2)(d) FROM datetime;
|
||||
SELECT quantilesBFloat16(0.2)(d) FROM datetime;
|
||||
|
||||
SELECT quantileBFloat16Weighted(0.2)(d, 1) FROM datetime;
|
||||
SELECT quantilesBFloat16Weighted(0.2)(d, 1) FROM datetime;
|
||||
|
||||
DROP TABLE datetime;
|
||||
|
Loading…
Reference in New Issue
Block a user