Merge remote-tracking branch 'origin/master' into fix-flaky-test-20

Alexey Milovidov 2021-07-20 21:47:22 +03:00
commit 4a0ec010af
67 changed files with 734 additions and 532 deletions

View File

@ -72,7 +72,10 @@ do
if [ "$DO_CHOWN" = "1" ]; then
# ensure proper directories permissions
chown -R "$USER:$GROUP" "$dir"
# but skip it if the directory already has proper permissions, because recursive chown may be slow
if [ "$(stat -c %u "$dir")" != "$USER" ] || [ "$(stat -c %g "$dir")" != "$GROUP" ]; then
chown -R "$USER:$GROUP" "$dir"
fi
elif ! $gosu test -d "$dir" -a -w "$dir" -a -r "$dir"; then
echo "Necessary directory '$dir' isn't accessible by user with id '$USER'"
exit 1

View File

@ -65,7 +65,7 @@ RUN apt-get update \
unixodbc \
--yes --no-install-recommends
RUN pip3 install numpy scipy pandas
RUN pip3 install numpy scipy pandas Jinja2
# This symlink required by gcc to find lld compiler
RUN ln -s /usr/bin/lld-${LLVM_VERSION} /usr/bin/ld.lld

View File

@ -76,6 +76,7 @@ RUN python3 -m pip install \
pytest \
pytest-timeout \
pytest-xdist \
pytest-repeat \
redis \
tzlocal \
urllib3 \

View File

@ -2,7 +2,7 @@ version: '2.3'
services:
postgres1:
image: postgres
command: ["postgres", "-c", "logging_collector=on", "-c", "log_directory=/postgres/logs", "-c", "log_filename=postgresql.log", "-c", "log_statement=all"]
command: ["postgres", "-c", "logging_collector=on", "-c", "log_directory=/postgres/logs", "-c", "log_filename=postgresql.log", "-c", "log_statement=all", "-c", "max_connections=200"]
restart: always
expose:
- ${POSTGRES_PORT}

View File

@ -32,7 +32,7 @@ RUN apt-get update -y \
postgresql-client \
sqlite3
RUN pip3 install numpy scipy pandas
RUN pip3 install numpy scipy pandas Jinja2
RUN mkdir -p /tmp/clickhouse-odbc-tmp \
&& wget -nv -O - ${odbc_driver_url} | tar --strip-components=1 -xz -C /tmp/clickhouse-odbc-tmp \

View File

@ -12,7 +12,7 @@ UNKNOWN_SIGN = "[ UNKNOWN "
SKIPPED_SIGN = "[ SKIPPED "
HUNG_SIGN = "Found hung queries in processlist"
NO_TASK_TIMEOUT_SIGN = "All tests have finished"
NO_TASK_TIMEOUT_SIGNS = ["All tests have finished", "No tests were run"]
RETRIES_SIGN = "Some tests were restarted"
@ -29,7 +29,7 @@ def process_test_log(log_path):
with open(log_path, 'r') as test_file:
for line in test_file:
line = line.strip()
if NO_TASK_TIMEOUT_SIGN in line:
if any(s in line for s in NO_TASK_TIMEOUT_SIGNS):
task_timeout = False
if HUNG_SIGN in line:
hung = True
@ -80,6 +80,7 @@ def process_result(result_path):
if result_path and os.path.exists(result_path):
total, skipped, unknown, failed, success, hung, task_timeout, retries, test_results = process_test_log(result_path)
is_flaky_check = 1 < int(os.environ.get('NUM_TRIES', 1))
logging.info("Is flaky check: %s", is_flaky_check)
# If no tests were run (success == 0) it indicates an error (e.g. server did not start or crashed immediately)
# But it's OK for "flaky checks": they may contain just a single test, which is marked as skipped.
if failed != 0 or unknown != 0 or (success == 0 and (not is_flaky_check)):

View File

@ -509,6 +509,23 @@ Possible values:
Default value: `ALL`.
## join_algorithm {#settings-join_algorithm}
Specifies [JOIN](../../sql-reference/statements/select/join.md) algorithm.
Possible values:
- `hash` — [Hash join algorithm](https://en.wikipedia.org/wiki/Hash_join) is used.
- `partial_merge` — [Sort-merge algorithm](https://en.wikipedia.org/wiki/Sort-merge_join) is used.
- `prefer_partial_merge` — ClickHouse always tries to use `merge` join if possible.
- `auto` — ClickHouse tries to change `hash` join to `merge` join on the fly to avoid running out of memory.
Default value: `hash`.
When the `hash` algorithm is used, the right part of `JOIN` is uploaded into RAM.
When the `partial_merge` algorithm is used, ClickHouse sorts the data and dumps it to disk. The `merge` algorithm in ClickHouse differs slightly from the classic implementation. First, ClickHouse sorts the right table by the [join key](../../sql-reference/statements/select/join.md#select-join) in blocks and creates a min-max index for the sorted blocks. Then it sorts parts of the left table by the join key and joins them over the right table. The min-max index is also used to skip right table blocks that are not needed.
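For illustration, a minimal sketch (hypothetical `left_table` and `right_table`) of switching the algorithm for a memory-heavy join:

```sql
-- Start with a hash join and let ClickHouse fall back to merge join under memory pressure.
SET join_algorithm = 'auto';

SELECT l.id, l.payload, r.attr
FROM left_table AS l
INNER JOIN right_table AS r ON l.id = r.id;
```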
## join_any_take_last_row {#settings-join_any_take_last_row}
Changes behaviour of join operations with `ANY` strictness.

View File

@ -9,8 +9,8 @@ toc_title: Map(key, value)
**Parameters**
- `key` — The key part of the pair. [String](../../sql-reference/data-types/string.md) or [Integer](../../sql-reference/data-types/int-uint.md).
- `value` — The value part of the pair. [String](../../sql-reference/data-types/string.md), [Integer](../../sql-reference/data-types/int-uint.md) or [Array](../../sql-reference/data-types/array.md).
- `key` — The key part of the pair. [String](../../sql-reference/data-types/string.md), [Integer](../../sql-reference/data-types/int-uint.md), [LowCardinality](../../sql-reference/data-types/lowcardinality.md), or [FixedString](../../sql-reference/data-types/fixedstring.md).
- `value` — The value part of the pair. [String](../../sql-reference/data-types/string.md), [Integer](../../sql-reference/data-types/int-uint.md), [Array](../../sql-reference/data-types/array.md), [LowCardinality](../../sql-reference/data-types/lowcardinality.md), or [FixedString](../../sql-reference/data-types/fixedstring.md).
To get the value from an `a Map('key', 'value')` column, use the `a['key']` syntax. This lookup currently works with linear complexity.
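For illustration, a minimal sketch assuming a hypothetical table `page_stats`:

```sql
-- Depending on the server version, SET allow_experimental_map_type = 1 may be required first.
CREATE TABLE page_stats (page String, counters Map(String, UInt64)) ENGINE = Memory;

INSERT INTO page_stats VALUES ('index', map('hits', 10, 'errors', 1));

-- Look a value up by key with the a['key'] syntax.
SELECT counters['hits'] FROM page_stats;
```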

View File

@ -36,14 +36,23 @@ Additional join types available in ClickHouse:
- `LEFT ANY JOIN`, `RIGHT ANY JOIN` and `INNER ANY JOIN`, partially (for opposite side of `LEFT` and `RIGHT`) or completely (for `INNER` and `FULL`) disables the cartesian product for standard `JOIN` types.
- `ASOF JOIN` and `LEFT ASOF JOIN`, joining sequences with a non-exact match. `ASOF JOIN` usage is described below.
## Setting {#join-settings}
## Settings {#join-settings}
!!! note "Note"
The default join type can be overridden using the [join_default_strictness](../../../operations/settings/settings.md#settings-join_default_strictness) setting.
The default join type can be overridden using the [join_default_strictness](../../../operations/settings/settings.md#settings-join_default_strictness) setting.
Also the behavior of ClickHouse server for `ANY JOIN` operations depends on the [any_join_distinct_right_table_keys](../../../operations/settings/settings.md#any_join_distinct_right_table_keys) setting.
The behavior of ClickHouse server for `ANY JOIN` operations depends on the [any_join_distinct_right_table_keys](../../../operations/settings/settings.md#any_join_distinct_right_table_keys) setting.
### ASOF JOIN Usage {#asof-join-usage}
**See also**
- [join_algorithm](../../../operations/settings/settings.md#settings-join_algorithm)
- [join_any_take_last_row](../../../operations/settings/settings.md#settings-join_any_take_last_row)
- [join_use_nulls](../../../operations/settings/settings.md#join_use_nulls)
- [partial_merge_join_optimizations](../../../operations/settings/settings.md#partial_merge_join_optimizations)
- [partial_merge_join_rows_in_right_blocks](../../../operations/settings/settings.md#partial_merge_join_rows_in_right_blocks)
- [join_on_disk_max_files_to_merge](../../../operations/settings/settings.md#join_on_disk_max_files_to_merge)
- [any_join_distinct_right_table_keys](../../../operations/settings/settings.md#any_join_distinct_right_table_keys)
## ASOF JOIN Usage {#asof-join-usage}
`ASOF JOIN` is useful when you need to join records that have no exact match.
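As a minimal sketch (hypothetical `trades` and `quotes` tables), the closest preceding match is taken on the inequality column:

```sql
-- For each trade, pick the latest quote whose time is not greater than the trade time.
SELECT t.symbol, t.time AS trade_time, q.time AS quote_time, q.price
FROM trades AS t
ASOF LEFT JOIN quotes AS q ON t.symbol = q.symbol AND t.time >= q.time;
```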
@ -93,7 +102,7 @@ For example, consider the following tables:
!!! note "Note"
`ASOF` join is **not** supported in the [Join](../../../engines/table-engines/special/join.md) table engine.
## Distributed Join {#global-join}
## Distributed JOIN {#global-join}
There are two ways to execute join involving distributed tables:
@ -102,6 +111,42 @@ There are two ways to execute join involving distributed tables:
Be careful when using `GLOBAL`. For more information, see the [Distributed subqueries](../../../sql-reference/operators/in.md#select-distributed-subqueries) section.
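A minimal sketch of the two forms, assuming a hypothetical Distributed table `hits_all` and a dimension table `dim`:

```sql
-- Plain JOIN: the right table is built on each remote server.
SELECT h.user_id, d.name
FROM hits_all AS h
INNER JOIN dim AS d ON h.user_id = d.user_id;

-- GLOBAL JOIN: the right table is built once on the initiator and sent to every remote server.
SELECT h.user_id, d.name
FROM hits_all AS h
GLOBAL INNER JOIN dim AS d ON h.user_id = d.user_id;
```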
## Implicit Type Conversion {#implicit-type-conversion}
`INNER JOIN`, `LEFT JOIN`, `RIGHT JOIN`, and `FULL JOIN` queries support implicit type conversion for join keys. However, the query cannot be executed if the join keys from the left and the right tables cannot be converted to a single type (for example, there is no data type that can hold all values from both `UInt64` and `Int64`, or `String` and `Int32`).
**Example**
Consider the table `t_1`:
```text
┌─a─┬─b─┬─toTypeName(a)─┬─toTypeName(b)─┐
│ 1 │ 1 │ UInt16 │ UInt8 │
│ 2 │ 2 │ UInt16 │ UInt8 │
└───┴───┴───────────────┴───────────────┘
```
and the table `t_2`:
```text
┌──a─┬────b─┬─toTypeName(a)─┬─toTypeName(b)───┐
│ -1 │ 1 │ Int16 │ Nullable(Int64) │
│ 1 │ -1 │ Int16 │ Nullable(Int64) │
│ 1 │ 1 │ Int16 │ Nullable(Int64) │
└────┴──────┴───────────────┴─────────────────┘
```
The query
```sql
SELECT a, b, toTypeName(a), toTypeName(b) FROM t_1 FULL JOIN t_2 USING (a, b);
```
returns the set:
```text
┌──a─┬────b─┬─toTypeName(a)─┬─toTypeName(b)───┐
│ 1 │ 1 │ Int32 │ Nullable(Int64) │
│ 2 │ 2 │ Int32 │ Nullable(Int64) │
│ -1 │ 1 │ Int32 │ Nullable(Int64) │
│ 1 │ -1 │ Int32 │ Nullable(Int64) │
└────┴──────┴───────────────┴─────────────────┘
```
## Usage Recommendations {#usage-recommendations}
### Processing of Empty or NULL Cells {#processing-of-empty-or-null-cells}
@ -139,9 +184,9 @@ If you need a `JOIN` for joining with dimension tables (these are relatively sma
### Memory Limitations {#memory-limitations}
By default, ClickHouse uses the [hash join](https://en.wikipedia.org/wiki/Hash_join) algorithm. ClickHouse takes the `<right_table>` and creates a hash table for it in RAM. After some threshold of memory consumption, ClickHouse falls back to merge join algorithm.
By default, ClickHouse uses the [hash join](https://en.wikipedia.org/wiki/Hash_join) algorithm. ClickHouse takes the right_table and creates a hash table for it in RAM. If `join_algorithm = 'auto'` is enabled, then after some threshold of memory consumption, ClickHouse falls back to the [merge](https://en.wikipedia.org/wiki/Sort-merge_join) join algorithm. For a description of the `JOIN` algorithms, see the [join_algorithm](../../../operations/settings/settings.md#settings-join_algorithm) setting.
If you need to restrict join operation memory consumption use the following settings:
If you need to restrict `JOIN` operation memory consumption, use the following settings:
- [max_rows_in_join](../../../operations/settings/query-complexity.md#settings-max_rows_in_join) — Limits number of rows in the hash table.
- [max_bytes_in_join](../../../operations/settings/query-complexity.md#settings-max_bytes_in_join) — Limits size of the hash table.
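A minimal sketch of applying these limits (the values are arbitrary and purely illustrative):

```sql
SET max_rows_in_join = 10000000;
SET max_bytes_in_join = 1000000000;
SET join_overflow_mode = 'break';  -- return a partial result instead of throwing when a limit is hit

SELECT count() FROM t1 INNER JOIN t2 ON t1.id = t2.id;
```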

View File

@ -3,6 +3,16 @@ toc_priority: 76
toc_title: Security Changelog
---
## Fixed in ClickHouse 21.4.3.21, 2021-04-12 {#fixed-in-clickhouse-release-21-4-3-21-2021-04-12}
### CVE-2021-25263 {#cve-2021-25263}
An attacker with the CREATE DICTIONARY privilege can read an arbitrary file outside the permitted directory.
The fix has been pushed to versions 20.8.18.32-lts, 21.1.9.41-stable, 21.2.9.41-stable, 21.3.6.55-lts, 21.4.3.21-stable and later.
Credits: [Vyacheslav Egoshin](https://twitter.com/vegoshin)
## Fixed in ClickHouse Release 19.14.3.3, 2019-09-10 {#fixed-in-clickhouse-release-19-14-3-3-2019-09-10}
### CVE-2019-15024 {#cve-2019-15024}

View File

@ -490,6 +490,23 @@ ClickHouse может парсить только базовый формат `Y
Значение по умолчанию: `ALL`.
## join_algorithm {#settings-join_algorithm}
Определяет алгоритм выполнения запроса [JOIN](../../sql-reference/statements/select/join.md).
Возможные значения:
- `hash` — используется [алгоритм соединения хешированием](https://ru.wikipedia.org/wiki/Алгоритм_соединения_хешированием).
- `partial_merge` — используется [алгоритм соединения слиянием сортированных списков](https://ru.wikipedia.org/wiki/Алгоритм_соединения_слиянием_сортированных_списков).
- `prefer_partial_merge` — используется алгоритм соединения слиянием сортированных списков, когда это возможно.
- `auto` — сервер ClickHouse пытается на лету заменить алгоритм `hash` на `merge`, чтобы избежать переполнения памяти.
Значение по умолчанию: `hash`.
При использовании алгоритма `hash` правая часть `JOIN` загружается в оперативную память.
При использовании алгоритма `partial_merge` сервер сортирует данные и сбрасывает их на диск. Работа алгоритма `merge` в ClickHouse немного отличается от классической реализации. Сначала ClickHouse сортирует правую таблицу по блокам на основе [ключей соединения](../../sql-reference/statements/select/join.md#select-join) и для отсортированных блоков строит индексы min-max. Затем он сортирует куски левой таблицы на основе ключей соединения и объединяет их с правой таблицей операцией `JOIN`. Созданные min-max индексы используются для пропуска тех блоков из правой таблицы, которые не участвуют в данной операции `JOIN`.
## join_any_take_last_row {#settings-join_any_take_last_row}
Изменяет поведение операций, выполняемых со строгостью `ANY`.

View File

@ -9,8 +9,8 @@ toc_title: Map(key, value)
**Параметры**
- `key` — ключ. [String](../../sql-reference/data-types/string.md) или [Integer](../../sql-reference/data-types/int-uint.md).
- `value` — значение. [String](../../sql-reference/data-types/string.md), [Integer](../../sql-reference/data-types/int-uint.md) или [Array](../../sql-reference/data-types/array.md).
- `key` — ключ. [String](../../sql-reference/data-types/string.md), [Integer](../../sql-reference/data-types/int-uint.md), [LowCardinality](../../sql-reference/data-types/lowcardinality.md) или [FixedString](../../sql-reference/data-types/fixedstring.md).
- `value` — значение. [String](../../sql-reference/data-types/string.md), [Integer](../../sql-reference/data-types/int-uint.md), [Array](../../sql-reference/data-types/array.md), [LowCardinality](../../sql-reference/data-types/lowcardinality.md) или [FixedString](../../sql-reference/data-types/fixedstring.md).
Чтобы получить значение из колонки `a Map('key', 'value')`, используйте синтаксис `a['key']`. В настоящее время такая подстановка работает по алгоритму с линейной сложностью.

View File

@ -4,7 +4,7 @@ toc_title: JOIN
# Секция JOIN {#select-join}
Join создаёт новую таблицу путем объединения столбцов из одной или нескольких таблиц с использованием общих для каждой из них значений. Это обычная операция в базах данных с поддержкой SQL, которая соответствует join из [реляционной алгебры](https://en.wikipedia.org/wiki/Relational_algebra#Joins_and_join-like_operators). Частный случай соединения одной таблицы часто называют «self-join».
`JOIN` создаёт новую таблицу путем объединения столбцов из одной или нескольких таблиц с использованием общих для каждой из них значений. Это обычная операция в базах данных с поддержкой SQL, которая соответствует join из [реляционной алгебры](https://en.wikipedia.org/wiki/Relational_algebra#Joins_and_join-like_operators). Частный случай соединения одной таблицы часто называют self-join.
Синтаксис:
@ -38,12 +38,21 @@ FROM <left_table>
## Настройки {#join-settings}
!!! note "Примечание"
Значение строгости по умолчанию может быть переопределено с помощью настройки [join_default_strictness](../../../operations/settings/settings.md#settings-join_default_strictness).
Значение строгости по умолчанию может быть переопределено с помощью настройки [join_default_strictness](../../../operations/settings/settings.md#settings-join_default_strictness).
Поведение сервера ClickHouse для операций `ANY JOIN` зависит от параметра [any_join_distinct_right_table_keys](../../../operations/settings/settings.md#any_join_distinct_right_table_keys).
### Использование ASOF JOIN {#asof-join-usage}
**См. также**
- [join_algorithm](../../../operations/settings/settings.md#settings-join_algorithm)
- [join_any_take_last_row](../../../operations/settings/settings.md#settings-join_any_take_last_row)
- [join_use_nulls](../../../operations/settings/settings.md#join_use_nulls)
- [partial_merge_join_optimizations](../../../operations/settings/settings.md#partial_merge_join_optimizations)
- [partial_merge_join_rows_in_right_blocks](../../../operations/settings/settings.md#partial_merge_join_rows_in_right_blocks)
- [join_on_disk_max_files_to_merge](../../../operations/settings/settings.md#join_on_disk_max_files_to_merge)
- [any_join_distinct_right_table_keys](../../../operations/settings/settings.md#any_join_distinct_right_table_keys)
## Использование ASOF JOIN {#asof-join-usage}
`ASOF JOIN` применим в том случае, когда необходимо объединять записи, которые не имеют точного совпадения.
@ -95,7 +104,7 @@ USING (equi_column1, ... equi_columnN, asof_column)
Чтобы задать значение строгости по умолчанию, используйте сессионный параметр [join_default_strictness](../../../operations/settings/settings.md#settings-join_default_strictness).
#### Распределённый join {#global-join}
## Распределённый JOIN {#global-join}
Есть два пути для выполнения соединения с участием распределённых таблиц:
@ -104,6 +113,42 @@ USING (equi_column1, ... equi_columnN, asof_column)
Будьте аккуратны при использовании `GLOBAL`. За дополнительной информацией обращайтесь в раздел [Распределенные подзапросы](../../../sql-reference/operators/in.md#select-distributed-subqueries).
## Неявные преобразования типов {#implicit-type-conversion}
Запросы `INNER JOIN`, `LEFT JOIN`, `RIGHT JOIN` и `FULL JOIN` поддерживают неявные преобразования типов для ключей соединения. Однако запрос не может быть выполнен, если не существует типа, к которому можно привести значения ключей с обеих сторон (например, нет типа, который бы одновременно вмещал в себя значения `UInt64` и `Int64`, или `String` и `Int32`).
**Пример**
Рассмотрим таблицу `t_1`:
```text
┌─a─┬─b─┬─toTypeName(a)─┬─toTypeName(b)─┐
│ 1 │ 1 │ UInt16 │ UInt8 │
│ 2 │ 2 │ UInt16 │ UInt8 │
└───┴───┴───────────────┴───────────────┘
```
и таблицу `t_2`:
```text
┌──a─┬────b─┬─toTypeName(a)─┬─toTypeName(b)───┐
│ -1 │ 1 │ Int16 │ Nullable(Int64) │
│ 1 │ -1 │ Int16 │ Nullable(Int64) │
│ 1 │ 1 │ Int16 │ Nullable(Int64) │
└────┴──────┴───────────────┴─────────────────┘
```
Запрос
```sql
SELECT a, b, toTypeName(a), toTypeName(b) FROM t_1 FULL JOIN t_2 USING (a, b);
```
вернёт результат:
```text
┌──a─┬────b─┬─toTypeName(a)─┬─toTypeName(b)───┐
│ 1 │ 1 │ Int32 │ Nullable(Int64) │
│ 2 │ 2 │ Int32 │ Nullable(Int64) │
│ -1 │ 1 │ Int32 │ Nullable(Int64) │
│ 1 │ -1 │ Int32 │ Nullable(Int64) │
└────┴──────┴───────────────┴─────────────────┘
```
## Рекомендации по использованию {#usage-recommendations}
### Обработка пустых ячеек и NULL {#processing-of-empty-or-null-cells}
@ -142,12 +187,14 @@ USING (equi_column1, ... equi_columnN, asof_column)
### Ограничения по памяти {#memory-limitations}
По умолчанию ClickHouse использует алгоритм [hash join](https://en.wikipedia.org/wiki/Hash_join). ClickHouse берет `<right_table>` и создает для него хэш-таблицу в оперативной памяти. После некоторого порога потребления памяти ClickHouse переходит к алгоритму merge join.
По умолчанию ClickHouse использует алгоритм [hash join](https://ru.wikipedia.org/wiki/Алгоритм_соединения_хешированием). ClickHouse берет правую таблицу и создает для нее хеш-таблицу в оперативной памяти. При включённой настройке `join_algorithm = 'auto'`, после некоторого порога потребления памяти ClickHouse переходит к алгоритму [merge join](https://ru.wikipedia.org/wiki/Алгоритм_соединения_слиянием_сортированных_списков). Описание алгоритмов `JOIN` см. в настройке [join_algorithm](../../../operations/settings/settings.md#settings-join_algorithm).
- [max_rows_in_join](../../../operations/settings/query-complexity.md#settings-max_rows_in_join) — ограничивает количество строк в хэш-таблице.
- [max_bytes_in_join](../../../operations/settings/query-complexity.md#settings-max_bytes_in_join) — ограничивает размер хэш-таблицы.
Если вы хотите ограничить потребление памяти во время выполнения операции `JOIN`, используйте настройки:
По достижении любого из этих ограничений, ClickHouse действует в соответствии с настройкой [join_overflow_mode](../../../operations/settings/query-complexity.md#settings-join_overflow_mode).
- [max_rows_in_join](../../../operations/settings/query-complexity.md#settings-max_rows_in_join) — ограничивает количество строк в хеш-таблице.
- [max_bytes_in_join](../../../operations/settings/query-complexity.md#settings-max_bytes_in_join) — ограничивает размер хеш-таблицы.
По достижении любого из этих ограничений ClickHouse действует в соответствии с настройкой [join_overflow_mode](../../../operations/settings/query-complexity.md#settings-join_overflow_mode).
## Примеры {#examples}

View File

@ -5,6 +5,17 @@ toc_title: Security Changelog
# Security Changelog {#security-changelog}
## Исправлено в релизе 21.4.3.21, 2021-04-12 {#fixed-in-clickhouse-release-21-4-3-21-2021-04-12}
### CVE-2021-25263 {#cve-2021-25263}
Злоумышленник с доступом к созданию словарей может читать файлы на файловой системе сервера Clickhouse.
Злоумышленник может обойти некорректную проверку пути к файлу словаря и загрузить часть любого файла как словарь. При этом, манипулируя опциями парсинга файла, можно получить следующую часть файла и пошагово прочитать весь файл.
Исправление доступно в версиях 20.8.18.32-lts, 21.1.9.41-stable, 21.2.9.41-stable, 21.3.6.55-lts, 21.4.3.21-stable и выше.
Обнаружено благодаря: [Вячеславу Егошину](https://twitter.com/vegoshin)
## Исправлено в релизе 19.14.3.3, 2019-09-10 {#ispravleno-v-relize-19-14-3-3-2019-09-10}
### CVE-2019-15024 {#cve-2019-15024}

View File

@ -305,12 +305,12 @@ void DatabaseLazy::clearExpiredTables() const
DatabaseLazyIterator::DatabaseLazyIterator(DatabaseLazy & database_, Strings && table_names_)
: database(database_)
: IDatabaseTablesIterator(database_.database_name)
, database(database_)
, table_names(std::move(table_names_))
, iterator(table_names.begin())
, current_storage(nullptr)
{
database_name = database.database_name;
}
void DatabaseLazyIterator::next()

View File

@ -45,6 +45,9 @@ public:
/// - it maintains a list of tables but tables are loaded lazily).
virtual const StoragePtr & table() const = 0;
IDatabaseTablesIterator(const String & database_name_) : database_name(database_name_) { }
IDatabaseTablesIterator(String && database_name_) : database_name(std::move(database_name_)) { }
virtual ~IDatabaseTablesIterator() = default;
virtual UUID uuid() const { return UUIDHelpers::Nil; }
@ -52,7 +55,7 @@ public:
const String & databaseName() const { assert(!database_name.empty()); return database_name; }
protected:
String database_name;
const String database_name;
};
/// Copies list of tables and iterates through such snapshot.
@ -64,26 +67,24 @@ private:
protected:
DatabaseTablesSnapshotIterator(DatabaseTablesSnapshotIterator && other)
: IDatabaseTablesIterator(std::move(other.database_name))
{
size_t idx = std::distance(other.tables.begin(), other.it);
std::swap(tables, other.tables);
other.it = other.tables.end();
it = tables.begin();
std::advance(it, idx);
database_name = std::move(other.database_name);
}
public:
DatabaseTablesSnapshotIterator(const Tables & tables_, const String & database_name_)
: tables(tables_), it(tables.begin())
: IDatabaseTablesIterator(database_name_), tables(tables_), it(tables.begin())
{
database_name = database_name_;
}
DatabaseTablesSnapshotIterator(Tables && tables_, String && database_name_)
: tables(std::move(tables_)), it(tables.begin())
: IDatabaseTablesIterator(std::move(database_name_)), tables(std::move(tables_)), it(tables.begin())
{
database_name = std::move(database_name_);
}
void next() override { ++it; }

View File

@ -66,6 +66,8 @@ public:
void assertCalledFromSyncThreadOrDrop(const char * method) const;
void shutdownSynchronizationThread();
friend class DatabaseMaterializeTablesIterator;
};

View File

@ -30,7 +30,7 @@ public:
UUID uuid() const override { return nested_iterator->uuid(); }
DatabaseMaterializeTablesIterator(DatabaseTablesIteratorPtr nested_iterator_, const IDatabase * database_)
: nested_iterator(std::move(nested_iterator_)), database(database_)
: IDatabaseTablesIterator(database_->getDatabaseName()), nested_iterator(std::move(nested_iterator_)), database(database_)
{
}

View File

@ -29,10 +29,12 @@ std::pair<bool, size_t> fileSegmentationEngineJSONEachRowImpl(ReadBuffer & in, D
if (quotes)
{
pos = find_first_symbols<'\\', '"'>(pos, in.buffer().end());
if (pos > in.buffer().end())
throw Exception("Position in buffer is out of bounds. There must be a bug.", ErrorCodes::LOGICAL_ERROR);
else if (pos == in.buffer().end())
continue;
if (*pos == '\\')
{
++pos;
@ -48,10 +50,12 @@ std::pair<bool, size_t> fileSegmentationEngineJSONEachRowImpl(ReadBuffer & in, D
else
{
pos = find_first_symbols<'{', '}', '\\', '"'>(pos, in.buffer().end());
if (pos > in.buffer().end())
throw Exception("Position in buffer is out of bounds. There must be a bug.", ErrorCodes::LOGICAL_ERROR);
else if (pos == in.buffer().end())
continue;
else if (*pos == '{')
{
++balance;

View File

@ -352,9 +352,12 @@ static ReturnType parseJSONEscapeSequence(Vector & s, ReadBuffer & buf)
};
++buf.position();
if (buf.eof())
return error("Cannot parse escape sequence", ErrorCodes::CANNOT_PARSE_ESCAPE_SEQUENCE);
assert(buf.hasPendingData());
switch (*buf.position())
{
case '"':
@ -1125,10 +1128,13 @@ void saveUpToPosition(ReadBuffer & in, DB::Memory<> & memory, char * current)
const size_t old_bytes = memory.size();
const size_t additional_bytes = current - in.position();
const size_t new_bytes = old_bytes + additional_bytes;
/// There are no new bytes to add to memory.
/// No need to do extra stuff.
if (new_bytes == 0)
return;
assert(in.position() + additional_bytes <= in.buffer().end());
memory.resize(new_bytes);
memcpy(memory.data() + old_bytes, in.position(), additional_bytes);
in.position() = current;

View File

@ -56,6 +56,13 @@ bool ZstdInflatingReadBuffer::nextImpl()
eof = true;
return !working_buffer.empty();
}
else if (output.pos == 0)
{
/// It is possible that the input buffer is not at EOF yet, but nothing was decompressed in the current iteration.
/// In some cases such behaviour is not allowed: if the input buffer is not at EOF, it has to be guaranteed
/// that working_buffer is not empty. So if it is empty, continue.
return nextImpl();
}
return true;
}

View File

@ -81,8 +81,12 @@ DictionaryStructure ExternalDictionariesLoader::getDictionaryStructure(const std
std::string ExternalDictionariesLoader::resolveDictionaryName(const std::string & dictionary_name, const std::string & current_database_name) const
{
bool has_dictionary = has(dictionary_name);
if (has_dictionary)
return dictionary_name;
std::string resolved_name = resolveDictionaryNameFromDatabaseCatalog(dictionary_name);
bool has_dictionary = has(resolved_name);
has_dictionary = has(resolved_name);
if (!has_dictionary)
{

View File

@ -388,6 +388,9 @@ InterpreterSelectQuery::InterpreterSelectQuery(
query_info.syntax_analyzer_result = syntax_analyzer_result;
if (storage && !query.final() && storage->needRewriteQueryWithFinal(syntax_analyzer_result->requiredSourceColumns()))
query.setFinal();
/// Save scalar sub queries's results in the query context
if (!options.only_analyze && context->hasQueryContext())
for (const auto & it : syntax_analyzer_result->getScalars())

View File

@ -438,4 +438,19 @@ ASTPtr & ASTSelectQuery::getExpression(Expression expr)
return children[positions[expr]];
}
void ASTSelectQuery::setFinal() // NOLINT method can be made const
{
auto & tables_in_select_query = tables()->as<ASTTablesInSelectQuery &>();
if (tables_in_select_query.children.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Tables list is empty, it's a bug");
auto & tables_element = tables_in_select_query.children[0]->as<ASTTablesInSelectQueryElement &>();
if (!tables_element.table_expression)
throw Exception(ErrorCodes::LOGICAL_ERROR, "There is no table expression, it's a bug");
tables_element.table_expression->as<ASTTableExpression &>().final = true;
}
}

View File

@ -93,6 +93,8 @@ public:
void addTableFunction(ASTPtr & table_function_ptr);
void updateTreeHashImpl(SipHash & hash_state) const override;
void setFinal();
protected:
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;

View File

@ -95,6 +95,7 @@ void MySQLHandler::run()
connection_context->getClientInfo().interface = ClientInfo::Interface::MYSQL;
connection_context->setDefaultFormat("MySQLWire");
connection_context->getClientInfo().connection_id = connection_id;
connection_context->getClientInfo().query_kind = ClientInfo::QueryKind::INITIAL_QUERY;
in = std::make_shared<ReadBufferFromPocoSocket>(socket());
out = std::make_shared<WriteBufferFromPocoSocket>(socket());

View File

@ -55,6 +55,7 @@ void PostgreSQLHandler::run()
connection_context->makeSessionContext();
connection_context->getClientInfo().interface = ClientInfo::Interface::POSTGRESQL;
connection_context->setDefaultFormat("PostgreSQLWire");
connection_context->getClientInfo().query_kind = ClientInfo::QueryKind::INITIAL_QUERY;
try
{

View File

@ -272,6 +272,10 @@ public:
throw Exception("Method watch is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
/// Returns true if FINAL modifier must be added to SELECT query depending on required columns.
/// It's needed for ReplacingMergeTree wrappers such as MaterializedMySQL and MaterializedPostgreSQL
virtual bool needRewriteQueryWithFinal(const Names & /*column_names*/) const { return false; }
/** Read a set of columns from the table.
* Accepts a list of columns to read, as well as a description of the query,
* from which information can be extracted about how to retrieve data

View File

@ -256,6 +256,12 @@ NamesAndTypesList StorageMaterializedPostgreSQL::getVirtuals() const
}
bool StorageMaterializedPostgreSQL::needRewriteQueryWithFinal(const Names & column_names) const
{
return needRewriteQueryWithFinalForStorage(column_names, getNested());
}
Pipe StorageMaterializedPostgreSQL::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,

View File

@ -82,6 +82,8 @@ public:
NamesAndTypesList getVirtuals() const override;
bool needRewriteQueryWithFinal(const Names & column_names) const override;
Pipe read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
@ -119,6 +121,8 @@ public:
/// for current table, set has_nested = true.
StoragePtr prepare();
bool supportsFinal() const override { return true; }
protected:
StorageMaterializedPostgreSQL(
const StorageID & table_id_,

View File

@ -57,11 +57,13 @@ void RabbitMQHandler::iterateLoop()
/// initial RabbitMQ setup - at this point there is no background loop thread.
void RabbitMQHandler::startBlockingLoop()
{
LOG_DEBUG(log, "Started blocking loop.");
uv_run(loop, UV_RUN_DEFAULT);
}
void RabbitMQHandler::stopLoop()
{
LOG_DEBUG(log, "Implicit loop stop.");
uv_stop(loop);
}

View File

@ -56,7 +56,11 @@ public:
ChannelPtr & getChannel() { return consumer_channel; }
void setupChannel();
bool needChannelUpdate();
void closeChannel() { consumer_channel->close(); }
void closeChannel()
{
if (consumer_channel)
consumer_channel->close();
}
void updateQueues(std::vector<String> & queues_) { queues = queues_; }
size_t queuesCount() { return queues.size(); }

View File

@ -264,6 +264,9 @@ size_t StorageRabbitMQ::getMaxBlockSize() const
void StorageRabbitMQ::initRabbitMQ()
{
if (stream_cancelled)
return;
if (use_user_setup)
{
queues.emplace_back(queue_base);
@ -703,10 +706,6 @@ void StorageRabbitMQ::shutdown()
while (!connection->closed() && cnt_retries++ != RETRIES_MAX)
event_handler->iterateLoop();
/// Should actually force closure, if not yet closed, but it generates distracting error logs
//if (!connection->closed())
// connection->close(true);
for (size_t i = 0; i < num_created_consumers; ++i)
popReadBuffer();
}
@ -719,6 +718,22 @@ void StorageRabbitMQ::cleanupRabbitMQ() const
if (use_user_setup)
return;
if (!event_handler->connectionRunning())
{
String queue_names;
for (const auto & queue : queues)
{
if (!queue_names.empty())
queue_names += ", ";
queue_names += queue;
}
LOG_WARNING(log,
"RabbitMQ clean up not done, because there is no connection in table's shutdown."
"There are {} queues ({}), which might need to be deleted manually. Exchanges will be auto-deleted",
queues.size(), queue_names);
return;
}
AMQP::TcpChannel rabbit_channel(connection.get());
for (const auto & queue : queues)
{

View File

@ -16,6 +16,14 @@
namespace DB
{
bool needRewriteQueryWithFinalForStorage(const Names & column_names, const StoragePtr & storage)
{
const StorageMetadataPtr & metadata = storage->getInMemoryMetadataPtr();
Block header = metadata->getSampleBlock();
ColumnWithTypeAndName & version_column = header.getByPosition(header.columns() - 1);
return std::find(column_names.begin(), column_names.end(), version_column.name) == column_names.end();
}
Pipe readFinalFromNestedStorage(
StoragePtr nested_storage,
const Names & column_names,
@ -32,20 +40,6 @@ Pipe readFinalFromNestedStorage(
Block nested_header = nested_metadata->getSampleBlock();
ColumnWithTypeAndName & sign_column = nested_header.getByPosition(nested_header.columns() - 2);
ColumnWithTypeAndName & version_column = nested_header.getByPosition(nested_header.columns() - 1);
if (ASTSelectQuery * select_query = query_info.query->as<ASTSelectQuery>(); select_query && !column_names_set.count(version_column.name))
{
auto & tables_in_select_query = select_query->tables()->as<ASTTablesInSelectQuery &>();
if (!tables_in_select_query.children.empty())
{
auto & tables_element = tables_in_select_query.children[0]->as<ASTTablesInSelectQueryElement &>();
if (tables_element.table_expression)
tables_element.table_expression->as<ASTTableExpression &>().final = true;
}
}
String filter_column_name;
Names require_columns_name = column_names;
@ -59,9 +53,6 @@ Pipe readFinalFromNestedStorage(
expressions->children.emplace_back(makeASTFunction("equals", sign_column_name, fetch_sign_value));
filter_column_name = expressions->children.back()->getColumnName();
for (const auto & column_name : column_names)
expressions->children.emplace_back(std::make_shared<ASTIdentifier>(column_name));
}
Pipe pipe = nested_storage->read(require_columns_name, nested_metadata, query_info, context, processed_stage, max_block_size, num_streams);

View File

@ -13,6 +13,8 @@
namespace DB
{
bool needRewriteQueryWithFinalForStorage(const Names & column_names, const StoragePtr & storage);
Pipe readFinalFromNestedStorage(
StoragePtr nested_storage,
const Names & column_names,

View File

@ -167,7 +167,8 @@ Pipe StorageDictionary::read(
const size_t max_block_size,
const unsigned /*threads*/)
{
auto dictionary = getContext()->getExternalDictionariesLoader().getDictionary(dictionary_name, local_context);
auto registered_dictionary_name = location == Location::SameDatabaseAndNameAsDictionary ? getStorageID().getInternalDictionaryName() : dictionary_name;
auto dictionary = getContext()->getExternalDictionariesLoader().getDictionary(registered_dictionary_name, local_context);
auto stream = dictionary->getBlockInputStream(column_names, max_block_size);
/// TODO: update dictionary interface for processors.
return Pipe(std::make_shared<SourceFromInputStream>(stream));

View File

@ -362,7 +362,7 @@ public:
}
else
{
nested_buffer = std::make_unique<ReadBufferFromFile>(current_path);
nested_buffer = std::make_unique<ReadBufferFromFile>(current_path, context->getSettingsRef().max_read_buffer_size);
method = chooseCompressionMethod(current_path, storage->compression_method);
}

View File

@ -36,6 +36,11 @@ StorageMaterializeMySQL::StorageMaterializeMySQL(const StoragePtr & nested_stora
setInMemoryMetadata(in_memory_metadata);
}
bool StorageMaterializeMySQL::needRewriteQueryWithFinal(const Names & column_names) const
{
return needRewriteQueryWithFinalForStorage(column_names, nested_storage);
}
Pipe StorageMaterializeMySQL::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
@ -47,6 +52,7 @@ Pipe StorageMaterializeMySQL::read(
{
/// If the background synchronization thread has exception.
rethrowSyncExceptionIfNeed(database);
return readFinalFromNestedStorage(nested_storage, column_names, metadata_snapshot,
query_info, context, processed_stage, max_block_size, num_streams);
}

View File

@ -24,6 +24,8 @@ public:
StorageMaterializeMySQL(const StoragePtr & nested_storage_, const IDatabase * database_);
bool needRewriteQueryWithFinal(const Names & column_names) const override;
Pipe read(
const Names & column_names, const StorageMetadataPtr & metadata_snapshot, SelectQueryInfo & query_info,
ContextPtr context, QueryProcessingStage::Enum processed_stage, size_t max_block_size, unsigned num_streams) override;

View File

@ -388,6 +388,13 @@ Pipe StorageMerge::createSources(
return pipe;
}
if (!modified_select.final() && storage->needRewriteQueryWithFinal(real_column_names))
{
/// NOTE: It may not work correctly in some cases, because query was analyzed without final.
/// However, it's needed for MaterializeMySQL and it's unlikely that someone will use it with Merge tables.
modified_select.setFinal();
}
auto storage_stage
= storage->getQueryProcessingStage(modified_context, QueryProcessingStage::Complete, metadata_snapshot, modified_query_info);
if (processed_stage <= storage_stage)

tests/.gitignore
View File

@ -3,3 +3,6 @@
*.error
*.dump
test_data
/queries/0_stateless/*.gen.sql
/queries/0_stateless/*.gen.reference

View File

@ -29,6 +29,13 @@ import string
import multiprocessing
from contextlib import closing
USE_JINJA = True
try:
import jinja2
except ImportError:
USE_JINJA = False
print('WARNING: jinja2 not installed! Template tests will be skipped.')
DISTRIBUTED_DDL_TIMEOUT_MSG = "is executing longer than distributed_ddl_task_timeout"
MESSAGES_TO_RETRY = [
@ -47,6 +54,8 @@ MESSAGES_TO_RETRY = [
MAX_RETRIES = 3
TEST_FILE_EXTENSIONS = ['.sql', '.sql.j2', '.sh', '.py', '.expect']
class Terminated(KeyboardInterrupt):
pass
@ -407,13 +416,13 @@ def run_tests_array(all_tests_with_params):
status = ''
if not is_concurrent:
sys.stdout.flush()
sys.stdout.write("{0:72}".format(name + ": "))
sys.stdout.write("{0:72}".format(removesuffix(name, ".gen", ".sql") + ": "))
# This flush is needed so you can see the test name of the long
# running test before it will finish. But don't do it in parallel
# mode, so that the lines don't mix.
sys.stdout.flush()
else:
status = "{0:72}".format(name + ": ")
status = "{0:72}".format(removesuffix(name, ".gen", ".sql") + ": ")
if args.skip and any(s in name for s in args.skip):
status += MSG_SKIPPED + " - skip\n"
@ -434,6 +443,9 @@ def run_tests_array(all_tests_with_params):
or 'race' in name):
status += MSG_SKIPPED + " - no long\n"
skipped_total += 1
elif not USE_JINJA and ext.endswith("j2"):
status += MSG_SKIPPED + " - no jinja\n"
skipped_total += 1
else:
disabled_file = os.path.join(suite_dir, name) + '.disabled'
@ -458,11 +470,10 @@ def run_tests_array(all_tests_with_params):
break
file_suffix = ('.' + str(os.getpid())) if is_concurrent and args.test_runs > 1 else ''
reference_file = os.path.join(suite_dir, name) + '.reference'
reference_file = get_reference_file(suite_dir, name)
stdout_file = os.path.join(suite_tmp_dir, name) + file_suffix + '.stdout'
stderr_file = os.path.join(suite_tmp_dir, name) + file_suffix + '.stderr'
testcase_args = configure_testcase_args(args, case_file, suite_tmp_dir, stderr_file)
proc, stdout, stderr, total_time = run_single_test(testcase_args, ext, server_logs_level, client_options, case_file, stdout_file, stderr_file)
@ -535,7 +546,7 @@ def run_tests_array(all_tests_with_params):
status += " - having exception:\n{}\n".format(
'\n'.join(stdout.split('\n')[:100]))
status += 'Database: ' + testcase_args.testcase_database
elif not os.path.isfile(reference_file):
elif reference_file is None:
status += MSG_UNKNOWN
status += print_test_time(total_time)
status += " - no reference file\n"
@ -760,6 +771,97 @@ def do_run_tests(jobs, suite, suite_dir, suite_tmp_dir, all_tests, parallel_test
return num_tests
def is_test_from_dir(suite_dir, case):
case_file = os.path.join(suite_dir, case)
# We could also test for executable files (os.access(case_file, os.X_OK),
# but it interferes with 01610_client_spawn_editor.editor, which is invoked
# as a query editor in the test, and must be marked as executable.
return os.path.isfile(case_file) and any(case_file.endswith(supported_ext) for supported_ext in TEST_FILE_EXTENSIONS)
def removesuffix(text, *suffixes):
"""
Added in python 3.9
https://www.python.org/dev/peps/pep-0616/
This version can work with several possible suffixes
"""
for suffix in suffixes:
if suffix and text.endswith(suffix):
return text[:-len(suffix)]
return text
def render_test_template(j2env, suite_dir, test_name):
"""
Render template for test and reference file if needed
"""
if j2env is None:
return test_name
test_base_name = removesuffix(test_name, ".sql.j2", ".sql")
reference_file_name = test_base_name + ".reference.j2"
reference_file_path = os.path.join(suite_dir, reference_file_name)
if os.path.isfile(reference_file_path):
tpl = j2env.get_template(reference_file_name)
tpl.stream().dump(os.path.join(suite_dir, test_base_name) + ".gen.reference")
if test_name.endswith(".sql.j2"):
tpl = j2env.get_template(test_name)
generated_test_name = test_base_name + ".gen.sql"
tpl.stream().dump(os.path.join(suite_dir, generated_test_name))
return generated_test_name
return test_name
def get_selected_tests(suite_dir, patterns):
"""
Find all files with tests, filter, render templates
"""
j2env = jinja2.Environment(
loader=jinja2.FileSystemLoader(suite_dir),
keep_trailing_newline=True,
) if USE_JINJA else None
for test_name in os.listdir(suite_dir):
if not is_test_from_dir(suite_dir, test_name):
continue
if patterns and not any(re.search(pattern, test_name) for pattern in patterns):
continue
if USE_JINJA and test_name.endswith(".gen.sql"):
continue
test_name = render_test_template(j2env, suite_dir, test_name)
yield test_name
def get_tests_list(suite_dir, patterns, test_runs, sort_key):
"""
Return list of tests file names to run
"""
all_tests = list(get_selected_tests(suite_dir, patterns))
all_tests = all_tests * test_runs
all_tests.sort(key=sort_key)
return all_tests
def get_reference_file(suite_dir, name):
"""
Returns reference file name for specified test
"""
name = removesuffix(name, ".gen")
for ext in ['.reference', '.gen.reference']:
reference_file = os.path.join(suite_dir, name) + ext
if os.path.isfile(reference_file):
return reference_file
return None
def main(args):
global server_died
global stop_time
@ -844,14 +946,6 @@ def main(args):
create_common_database(args, args.database)
create_common_database(args, "test")
def is_test_from_dir(suite_dir, case):
case_file = os.path.join(suite_dir, case)
(_, ext) = os.path.splitext(case)
# We could also test for executable files (os.access(case_file, os.X_OK),
# but it interferes with 01610_client_spawn_editor.editor, which is invoked
# as a query editor in the test, and must be marked as executable.
return os.path.isfile(case_file) and (ext in ['.sql', '.sh', '.py', '.expect'])
def sute_key_func(item):
if args.order == 'random':
return random.random()
@ -911,12 +1005,7 @@ def main(args):
except ValueError:
return 99997
all_tests = os.listdir(suite_dir)
all_tests = [case for case in all_tests if is_test_from_dir(suite_dir, case)]
if args.test:
all_tests = [t for t in all_tests if any(re.search(r, t) for r in args.test)]
all_tests = all_tests * args.test_runs
all_tests.sort(key=key_func)
all_tests = get_tests_list(suite_dir, args.test, args.test_runs, key_func)
jobs = args.jobs
parallel_tests = []

View File

@ -1,4 +1,3 @@
import subprocess
from helpers.cluster import run_and_check
import pytest
import logging

View File

@ -29,6 +29,8 @@ from dict2xml import dict2xml
from kazoo.client import KazooClient
from kazoo.exceptions import KazooException
from minio import Minio
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from helpers.test_tools import assert_eq_with_retry
from helpers import pytest_xdist_logging_to_separate_files
@ -332,12 +334,16 @@ class ClickHouseCluster:
# available when with_postgres == True
self.postgres_host = "postgres1"
self.postgres_ip = None
self.postgres_conn = None
self.postgres2_host = "postgres2"
self.postgres2_ip = None
self.postgres2_conn = None
self.postgres3_host = "postgres3"
self.postgres3_ip = None
self.postgres3_conn = None
self.postgres4_host = "postgres4"
self.postgres4_ip = None
self.postgres4_conn = None
self.postgres_port = 5432
self.postgres_dir = p.abspath(p.join(self.instances_dir, "postgres"))
self.postgres_logs_dir = os.path.join(self.postgres_dir, "postgres1")
@ -1077,8 +1083,9 @@ class ClickHouseCluster:
start = time.time()
while time.time() - start < timeout:
try:
conn = psycopg2.connect(host=self.postgres_ip, port=self.postgres_port, user='postgres', password='mysecretpassword')
conn.close()
self.postgres_conn = psycopg2.connect(host=self.postgres_ip, port=self.postgres_port, database='postgres', user='postgres', password='mysecretpassword')
self.postgres_conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
self.postgres_conn.autocommit = True
logging.debug("Postgres Started")
return
except Exception as ex:
@ -1092,16 +1099,36 @@ class ClickHouseCluster:
self.postgres3_ip = self.get_instance_ip(self.postgres3_host)
self.postgres4_ip = self.get_instance_ip(self.postgres4_host)
start = time.time()
for ip in [self.postgres2_ip, self.postgres3_ip, self.postgres4_ip]:
while time.time() - start < timeout:
try:
conn = psycopg2.connect(host=ip, port=self.postgres_port, user='postgres', password='mysecretpassword')
conn.close()
logging.debug("Postgres Cluster Started")
return
except Exception as ex:
logging.debug("Can't connect to Postgres " + str(ex))
time.sleep(0.5)
while time.time() - start < timeout:
try:
self.postgres2_conn = psycopg2.connect(host=self.postgres2_ip, port=self.postgres_port, database='postgres', user='postgres', password='mysecretpassword')
self.postgres2_conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
self.postgres2_conn.autocommit = True
logging.debug("Postgres Cluster host 2 started")
break
except Exception as ex:
logging.debug("Can't connect to Postgres host 2" + str(ex))
time.sleep(0.5)
while time.time() - start < timeout:
try:
self.postgres3_conn = psycopg2.connect(host=self.postgres3_ip, port=self.postgres_port, database='postgres', user='postgres', password='mysecretpassword')
self.postgres3_conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
self.postgres3_conn.autocommit = True
logging.debug("Postgres Cluster host 3 started")
break
except Exception as ex:
logging.debug("Can't connect to Postgres host 3" + str(ex))
time.sleep(0.5)
while time.time() - start < timeout:
try:
self.postgres4_conn = psycopg2.connect(host=self.postgres4_ip, port=self.postgres_port, database='postgres', user='postgres', password='mysecretpassword')
self.postgres4_conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
self.postgres4_conn.autocommit = True
logging.debug("Postgres Cluster host 4 started")
return
except Exception as ex:
logging.debug("Can't connect to Postgres host 4" + str(ex))
time.sleep(0.5)
raise Exception("Cannot wait Postgres container")

View File

@ -89,9 +89,9 @@ class Task1:
instance = cluster.instances['s0_0_0']
for cluster_num in ["0", "1"]:
ddl_check_query(instance, "DROP DATABASE IF EXISTS default ON CLUSTER cluster{}".format(cluster_num))
ddl_check_query(instance, "DROP DATABASE IF EXISTS default ON CLUSTER cluster{} SYNC".format(cluster_num))
ddl_check_query(instance,
"CREATE DATABASE IF NOT EXISTS default ON CLUSTER cluster{}".format(
"CREATE DATABASE default ON CLUSTER cluster{} ".format(
cluster_num))
ddl_check_query(instance, "CREATE TABLE hits ON CLUSTER cluster0 (d UInt64, d1 UInt64 MATERIALIZED d+1) " +
@ -105,11 +105,11 @@ class Task1:
settings={"insert_distributed_sync": 1})
def check(self):
assert TSV(self.cluster.instances['s0_0_0'].query("SELECT count() FROM hits_all")) == TSV("1002\n")
assert TSV(self.cluster.instances['s1_0_0'].query("SELECT count() FROM hits_all")) == TSV("1002\n")
assert self.cluster.instances['s0_0_0'].query("SELECT count() FROM hits_all").strip() == "1002"
assert self.cluster.instances['s1_0_0'].query("SELECT count() FROM hits_all").strip() == "1002"
assert TSV(self.cluster.instances['s1_0_0'].query("SELECT DISTINCT d % 2 FROM hits")) == TSV("1\n")
assert TSV(self.cluster.instances['s1_1_0'].query("SELECT DISTINCT d % 2 FROM hits")) == TSV("0\n")
assert self.cluster.instances['s1_0_0'].query("SELECT DISTINCT d % 2 FROM hits").strip() == "1"
assert self.cluster.instances['s1_1_0'].query("SELECT DISTINCT d % 2 FROM hits").strip() == "0"
instance = self.cluster.instances['s0_0_0']
ddl_check_query(instance, "DROP TABLE hits_all ON CLUSTER cluster0")

View File

@ -944,7 +944,22 @@ def move_to_prewhere_and_column_filtering(clickhouse_node, mysql_node, service_n
clickhouse_node.query("CREATE DATABASE cond_on_key_col ENGINE = MaterializeMySQL('{}:3306', 'cond_on_key_col', 'root', 'clickhouse')".format(service_name))
mysql_node.query("create table cond_on_key_col.products (id int primary key, product_id int not null, catalog_id int not null, brand_id int not null, name text)")
mysql_node.query("insert into cond_on_key_col.products (id, name, catalog_id, brand_id, product_id) values (915, 'ertyui', 5287, 15837, 0), (990, 'wer', 1053, 24390, 1), (781, 'qwerty', 1041, 1176, 2);")
mysql_node.query("create table cond_on_key_col.test (id int(11) NOT NULL AUTO_INCREMENT, a int(11) DEFAULT NULL, b int(11) DEFAULT NULL, PRIMARY KEY (id)) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4;")
mysql_node.query("insert into cond_on_key_col.test values (42, 123, 1);")
mysql_node.query("CREATE TABLE cond_on_key_col.balance_change_record (id bigint(20) NOT NULL AUTO_INCREMENT, type tinyint(4) DEFAULT NULL, value decimal(10,4) DEFAULT NULL, time timestamp NULL DEFAULT NULL, "
"initiative_id varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL, passivity_id varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL, "
"person_id varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL, tenant_code varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL, "
"created_time timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', updated_time timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, "
"value_snapshot decimal(10,4) DEFAULT NULL, PRIMARY KEY (id), KEY balance_change_record_initiative_id (person_id) USING BTREE, "
"KEY type (type) USING BTREE, KEY balance_change_record_type (time) USING BTREE, KEY initiative_id (initiative_id) USING BTREE, "
"KEY balance_change_record_tenant_code (passivity_id) USING BTREE, KEY tenant_code (tenant_code) USING BTREE) ENGINE=InnoDB AUTO_INCREMENT=1691049 DEFAULT CHARSET=utf8")
mysql_node.query("insert into cond_on_key_col.balance_change_record values (123, 1, 3.14, null, 'qwe', 'asd', 'zxc', 'rty', null, null, 2.7);")
mysql_node.query("CREATE TABLE cond_on_key_col.test1 (id int(11) NOT NULL AUTO_INCREMENT, c1 varchar(32) NOT NULL, c2 varchar(32), PRIMARY KEY (id)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4")
mysql_node.query("insert into cond_on_key_col.test1(c1,c2) values ('a','b'), ('c', null);")
check_query(clickhouse_node, "SELECT DISTINCT P.id, P.name, P.catalog_id FROM cond_on_key_col.products P WHERE P.name ILIKE '%e%' and P.catalog_id=5287", '915\tertyui\t5287\n')
check_query(clickhouse_node, "select count(a) from cond_on_key_col.test where b = 1;", "1\n")
check_query(clickhouse_node, "select id from cond_on_key_col.balance_change_record where type=1;", "123\n")
check_query(clickhouse_node, "select count(c1) from cond_on_key_col.test1 where c2='b';", "1\n")
clickhouse_node.query("DROP DATABASE cond_on_key_col")
mysql_node.query("DROP DATABASE cond_on_key_col")

View File

@ -99,8 +99,8 @@ def create_distributed_table(node, table_name):
def drop_distributed_table(node, table_name):
node.query("DROP TABLE IF EXISTS {} ON CLUSTER test_cluster".format(table_name))
node.query("DROP TABLE IF EXISTS {}_replicated ON CLUSTER test_cluster".format(table_name))
node.query("DROP TABLE IF EXISTS {} ON CLUSTER test_cluster SYNC".format(table_name))
node.query("DROP TABLE IF EXISTS {}_replicated ON CLUSTER test_cluster SYNC".format(table_name))
time.sleep(1)

View File

@ -17,7 +17,7 @@ def started_cluster():
def test_read_write_storage(started_cluster):
hdfs_api = started_cluster.hdfs_api
node1.query("drop table if exists SimpleHDFSStorage SYNC")
node1.query(
"create table SimpleHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/simple_storage', 'TSV')")
node1.query("insert into SimpleHDFSStorage values (1, 'Mark', 72.53)")

View File

@ -1,55 +1,18 @@
import time
import logging
import pytest
import psycopg2
from multiprocessing.dummy import Pool
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', with_postgres=True)
node2 = cluster.add_instance('node2', with_postgres_cluster=True)
def get_postgres_conn(cluster, ip, database=False):
if database == True:
conn_string = f"host={ip} port='{cluster.postgres_port}' dbname='clickhouse' user='postgres' password='mysecretpassword'"
else:
conn_string = f"host={ip} port='{cluster.postgres_port}' user='postgres' password='mysecretpassword'"
conn = psycopg2.connect(conn_string)
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
conn.autocommit = True
return conn
def create_postgres_db(conn, name):
cursor = conn.cursor()
cursor.execute("DROP DATABASE IF EXISTS {}".format(name))
cursor.execute("CREATE DATABASE {}".format(name))
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
postgres_conn = get_postgres_conn(cluster, ip=cluster.postgres_ip)
print("postgres connected")
create_postgres_db(postgres_conn, 'clickhouse')
postgres_conn = get_postgres_conn(cluster, ip=cluster.postgres2_ip)
print("postgres2 connected")
create_postgres_db(postgres_conn, 'clickhouse')
postgres_conn = get_postgres_conn(cluster, ip=cluster.postgres3_ip)
print("postgres2 connected")
create_postgres_db(postgres_conn, 'clickhouse')
postgres_conn = get_postgres_conn(cluster, ip=cluster.postgres4_ip)
print("postgres2 connected")
create_postgres_db(postgres_conn, 'clickhouse')
print("postgres connected")
yield cluster
finally:
@ -57,43 +20,51 @@ def started_cluster():
def test_postgres_select_insert(started_cluster):
conn = get_postgres_conn(started_cluster, started_cluster.postgres_ip, True)
cursor = conn.cursor()
cursor = started_cluster.postgres_conn.cursor()
table_name = 'test_many'
table = f'''postgresql('{started_cluster.postgres_ip}:{started_cluster.postgres_port}', 'clickhouse', '{table_name}', 'postgres', 'mysecretpassword')'''
cursor.execute('CREATE TABLE IF NOT EXISTS {} (a integer, b text, c integer)'.format(table_name))
table = f'''postgresql('{started_cluster.postgres_ip}:{started_cluster.postgres_port}', 'postgres', '{table_name}', 'postgres', 'mysecretpassword')'''
cursor.execute(f'DROP TABLE IF EXISTS {table_name}')
cursor.execute(f'CREATE TABLE {table_name} (a integer, b text, c integer)')
result = node1.query('''
INSERT INTO TABLE FUNCTION {}
SELECT number, concat('name_', toString(number)), 3 from numbers(10000)'''.format(table))
check1 = "SELECT count() FROM {}".format(table)
check2 = "SELECT Sum(c) FROM {}".format(table)
check3 = "SELECT count(c) FROM {} WHERE a % 2 == 0".format(table)
check4 = "SELECT count() FROM {} WHERE b LIKE concat('name_', toString(1))".format(table)
result = node1.query(f'''
INSERT INTO TABLE FUNCTION {table}
SELECT number, concat('name_', toString(number)), 3 from numbers(10000)''')
check1 = f"SELECT count() FROM {table}"
check2 = f"SELECT Sum(c) FROM {table}"
check3 = f"SELECT count(c) FROM {table} WHERE a % 2 == 0"
check4 = f"SELECT count() FROM {table} WHERE b LIKE concat('name_', toString(1))"
assert (node1.query(check1)).rstrip() == '10000'
assert (node1.query(check2)).rstrip() == '30000'
assert (node1.query(check3)).rstrip() == '5000'
assert (node1.query(check4)).rstrip() == '1'
# Triggers issue https://github.com/ClickHouse/ClickHouse/issues/26088
# for i in range(1, 1000):
# assert (node1.query(check1)).rstrip() == '10000', f"Failed on {i}"
cursor.execute(f'DROP TABLE {table_name} ')
def test_postgres_conversions(started_cluster):
conn = get_postgres_conn(started_cluster, started_cluster.postgres_ip, True)
cursor = conn.cursor()
cursor = started_cluster.postgres_conn.cursor()
cursor.execute(f'DROP TABLE IF EXISTS test_types')
cursor.execute(f'DROP TABLE IF EXISTS test_array_dimensions')
cursor.execute(
'''CREATE TABLE IF NOT EXISTS test_types (
'''CREATE TABLE test_types (
a smallint, b integer, c bigint, d real, e double precision, f serial, g bigserial,
h timestamp, i date, j decimal(5, 3), k numeric, l boolean)''')
node1.query('''
INSERT INTO TABLE FUNCTION postgresql('postgres1:5432', 'clickhouse', 'test_types', 'postgres', 'mysecretpassword') VALUES
INSERT INTO TABLE FUNCTION postgresql('postgres1:5432', 'postgres', 'test_types', 'postgres', 'mysecretpassword') VALUES
(-32768, -2147483648, -9223372036854775808, 1.12345, 1.1234567890, 2147483647, 9223372036854775807, '2000-05-12 12:12:12.012345', '2000-05-12', 22.222, 22.222, 1)''')
result = node1.query('''
SELECT a, b, c, d, e, f, g, h, i, j, toDecimal128(k, 3), l FROM postgresql('postgres1:5432', 'clickhouse', 'test_types', 'postgres', 'mysecretpassword')''')
SELECT a, b, c, d, e, f, g, h, i, j, toDecimal128(k, 3), l FROM postgresql('postgres1:5432', 'postgres', 'test_types', 'postgres', 'mysecretpassword')''')
assert(result == '-32768\t-2147483648\t-9223372036854775808\t1.12345\t1.123456789\t2147483647\t9223372036854775807\t2000-05-12 12:12:12.012345\t2000-05-12\t22.222\t22.222\t1\n')
cursor.execute("INSERT INTO test_types (l) VALUES (TRUE), (true), ('yes'), ('y'), ('1');")
cursor.execute("INSERT INTO test_types (l) VALUES (FALSE), (false), ('no'), ('off'), ('0');")
expected = "1\n1\n1\n1\n1\n1\n0\n0\n0\n0\n0\n"
result = node1.query('''SELECT l FROM postgresql('postgres1:5432', 'clickhouse', 'test_types', 'postgres', 'mysecretpassword')''')
result = node1.query('''SELECT l FROM postgresql('postgres1:5432', 'postgres', 'test_types', 'postgres', 'mysecretpassword')''')
assert(result == expected)
cursor.execute(
@ -112,7 +83,7 @@ def test_postgres_conversions(started_cluster):
)''')
result = node1.query('''
DESCRIBE TABLE postgresql('postgres1:5432', 'clickhouse', 'test_array_dimensions', 'postgres', 'mysecretpassword')''')
DESCRIBE TABLE postgresql('postgres1:5432', 'postgres', 'test_array_dimensions', 'postgres', 'mysecretpassword')''')
expected = ('a\tArray(Date)\t\t\t\t\t\n' +
'b\tArray(DateTime64(6))\t\t\t\t\t\n' +
'c\tArray(Array(Float32))\t\t\t\t\t\n' +
@ -126,7 +97,7 @@ def test_postgres_conversions(started_cluster):
)
assert(result.rstrip() == expected)
node1.query("INSERT INTO TABLE FUNCTION postgresql('postgres1:5432', 'clickhouse', 'test_array_dimensions', 'postgres', 'mysecretpassword') "
node1.query("INSERT INTO TABLE FUNCTION postgresql('postgres1:5432', 'postgres', 'test_array_dimensions', 'postgres', 'mysecretpassword') "
"VALUES ("
"['2000-05-12', '2000-05-12'], "
"['2000-05-12 12:12:12.012345', '2000-05-12 12:12:12.012345'], "
@ -141,7 +112,7 @@ def test_postgres_conversions(started_cluster):
")")
result = node1.query('''
SELECT * FROM postgresql('postgres1:5432', 'clickhouse', 'test_array_dimensions', 'postgres', 'mysecretpassword')''')
SELECT * FROM postgresql('postgres1:5432', 'postgres', 'test_array_dimensions', 'postgres', 'mysecretpassword')''')
expected = (
"['2000-05-12','2000-05-12']\t" +
"['2000-05-12 12:12:12.012345','2000-05-12 12:12:12.012345']\t" +
@ -156,25 +127,33 @@ def test_postgres_conversions(started_cluster):
)
assert(result == expected)
cursor.execute(f'DROP TABLE test_types')
cursor.execute(f'DROP TABLE test_array_dimensions')
def test_non_default_scema(started_cluster):
conn = get_postgres_conn(started_cluster, started_cluster.postgres_ip, True)
cursor = conn.cursor()
node1.query('DROP TABLE IF EXISTS test_pg_table_schema')
node1.query('DROP TABLE IF EXISTS test_pg_table_schema_with_dots')
cursor = started_cluster.postgres_conn.cursor()
cursor.execute('DROP SCHEMA IF EXISTS test_schema CASCADE')
cursor.execute('DROP SCHEMA IF EXISTS "test.nice.schema" CASCADE')
cursor.execute('CREATE SCHEMA test_schema')
cursor.execute('CREATE TABLE test_schema.test_table (a integer)')
cursor.execute('INSERT INTO test_schema.test_table SELECT i FROM generate_series(0, 99) as t(i)')
node1.query('''
CREATE TABLE test_pg_table_schema (a UInt32)
ENGINE PostgreSQL('postgres1:5432', 'clickhouse', 'test_table', 'postgres', 'mysecretpassword', 'test_schema');
ENGINE PostgreSQL('postgres1:5432', 'postgres', 'test_table', 'postgres', 'mysecretpassword', 'test_schema');
''')
result = node1.query('SELECT * FROM test_pg_table_schema')
expected = node1.query('SELECT number FROM numbers(100)')
assert(result == expected)
table_function = '''postgresql('postgres1:5432', 'clickhouse', 'test_table', 'postgres', 'mysecretpassword', 'test_schema')'''
result = node1.query('SELECT * FROM {}'.format(table_function))
table_function = '''postgresql('postgres1:5432', 'postgres', 'test_table', 'postgres', 'mysecretpassword', 'test_schema')'''
result = node1.query(f'SELECT * FROM {table_function}')
assert(result == expected)
cursor.execute('''CREATE SCHEMA "test.nice.schema"''')
@ -183,24 +162,28 @@ def test_non_default_scema(started_cluster):
node1.query('''
CREATE TABLE test_pg_table_schema_with_dots (a UInt32)
ENGINE PostgreSQL('postgres1:5432', 'clickhouse', 'test.nice.table', 'postgres', 'mysecretpassword', 'test.nice.schema');
ENGINE PostgreSQL('postgres1:5432', 'postgres', 'test.nice.table', 'postgres', 'mysecretpassword', 'test.nice.schema');
''')
result = node1.query('SELECT * FROM test_pg_table_schema_with_dots')
assert(result == expected)
cursor.execute('INSERT INTO "test_schema"."test_table" SELECT i FROM generate_series(100, 199) as t(i)')
result = node1.query('SELECT * FROM {}'.format(table_function))
result = node1.query(f'SELECT * FROM {table_function}')
expected = node1.query('SELECT number FROM numbers(200)')
assert(result == expected)
cursor.execute('DROP SCHEMA test_schema CASCADE')
cursor.execute('DROP SCHEMA "test.nice.schema" CASCADE')
node1.query('DROP TABLE test_pg_table_schema')
node1.query('DROP TABLE test_pg_table_schema_with_dots')
def test_concurrent_queries(started_cluster):
conn = get_postgres_conn(started_cluster, started_cluster.postgres_ip, True)
cursor = conn.cursor()
cursor = started_cluster.postgres_conn.cursor()
node1.query('''
CREATE TABLE test_table (key UInt32, value UInt32)
ENGINE = PostgreSQL('postgres1:5432', 'clickhouse', 'test_table', 'postgres', 'mysecretpassword')''')
ENGINE = PostgreSQL('postgres1:5432', 'postgres', 'test_table', 'postgres', 'mysecretpassword')''')
cursor.execute('CREATE TABLE test_table (key integer, value integer)')
@ -212,7 +195,7 @@ def test_concurrent_queries(started_cluster):
p = busy_pool.map_async(node_select, range(20))
p.wait()
count = node1.count_in_log('New connection to postgres1:5432')
print(count, prev_count)
logging.debug(f'count {count}, prev_count {prev_count}')
# 16 is the default size of the connection pool
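# Repeated concurrent SELECTs are expected to reuse pooled connections, so the number of 'New connection' log lines should grow by at most the pool size.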
assert(int(count) <= int(prev_count) + 16)
@ -224,7 +207,7 @@ def test_concurrent_queries(started_cluster):
p = busy_pool.map_async(node_insert, range(5))
p.wait()
result = node1.query("SELECT count() FROM test_table", user='default')
print(result)
logging.debug(result)
assert(int(result) == 5 * 5 * 1000)
def node_insert_select(_):
@ -236,44 +219,41 @@ def test_concurrent_queries(started_cluster):
p = busy_pool.map_async(node_insert_select, range(5))
p.wait()
result = node1.query("SELECT count() FROM test_table", user='default')
print(result)
logging.debug(result)
assert(int(result) == 5 * 5 * 1000 * 2)
node1.query('DROP TABLE test_table;')
cursor.execute('DROP TABLE test_table;')
count = node1.count_in_log('New connection to postgres1:5432')
print(count, prev_count)
logging.debug(f'count {count}, prev_count {prev_count}')
assert(int(count) <= int(prev_count) + 16)
def test_postgres_distributed(started_cluster):
conn0 = get_postgres_conn(started_cluster, started_cluster.postgres_ip, database=True)
conn1 = get_postgres_conn(started_cluster, started_cluster.postgres2_ip, database=True)
conn2 = get_postgres_conn(started_cluster, started_cluster.postgres3_ip, database=True)
conn3 = get_postgres_conn(started_cluster, started_cluster.postgres4_ip, database=True)
cursor0 = conn0.cursor()
cursor1 = conn1.cursor()
cursor2 = conn2.cursor()
cursor3 = conn3.cursor()
cursor0 = started_cluster.postgres_conn.cursor()
cursor1 = started_cluster.postgres2_conn.cursor()
cursor2 = started_cluster.postgres3_conn.cursor()
cursor3 = started_cluster.postgres4_conn.cursor()
cursors = [cursor0, cursor1, cursor2, cursor3]
for i in range(4):
cursors[i].execute('DROP TABLE IF EXISTS test_replicas')
cursors[i].execute('CREATE TABLE test_replicas (id Integer, name Text)')
cursors[i].execute("""INSERT INTO test_replicas select i, 'host{}' from generate_series(0, 99) as t(i);""".format(i + 1));
cursors[i].execute(f"""INSERT INTO test_replicas select i, 'host{i+1}' from generate_series(0, 99) as t(i);""");
# test multiple ports parsing
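# The `postgres{1|2|3}:5432` pattern expands to several replica addresses; a single replica is queried at a time, so the result below should contain the data of exactly one host.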
result = node2.query('''SELECT DISTINCT(name) FROM postgresql(`postgres{1|2|3}:5432`, 'clickhouse', 'test_replicas', 'postgres', 'mysecretpassword'); ''')
result = node2.query('''SELECT DISTINCT(name) FROM postgresql(`postgres{1|2|3}:5432`, 'postgres', 'test_replicas', 'postgres', 'mysecretpassword'); ''')
assert(result == 'host1\n' or result == 'host2\n' or result == 'host3\n')
result = node2.query('''SELECT DISTINCT(name) FROM postgresql(`postgres2:5431|postgres3:5432`, 'clickhouse', 'test_replicas', 'postgres', 'mysecretpassword'); ''')
result = node2.query('''SELECT DISTINCT(name) FROM postgresql(`postgres2:5431|postgres3:5432`, 'postgres', 'test_replicas', 'postgres', 'mysecretpassword'); ''')
assert(result == 'host3\n' or result == 'host2\n')
# Create storage with 3 replicas
node2.query('DROP TABLE IF EXISTS test_replicas')
node2.query('''
CREATE TABLE test_replicas
(id UInt32, name String)
ENGINE = PostgreSQL(`postgres{2|3|4}:5432`, 'clickhouse', 'test_replicas', 'postgres', 'mysecretpassword'); ''')
ENGINE = PostgreSQL(`postgres{2|3|4}:5432`, 'postgres', 'test_replicas', 'postgres', 'mysecretpassword'); ''')
# Check all replicas are traversed
query = "SELECT name FROM ("
@ -284,10 +264,12 @@ def test_postgres_distributed(started_cluster):
assert(result == 'host2\nhost3\nhost4\n')
# Create storage with two shards, each with 2 replicas
node2.query('DROP TABLE IF EXISTS test_shards')
node2.query('''
CREATE TABLE test_shards
(id UInt32, name String, age UInt32, money UInt32)
ENGINE = ExternalDistributed('PostgreSQL', `postgres{1|2}:5432,postgres{3|4}:5432`, 'clickhouse', 'test_replicas', 'postgres', 'mysecretpassword'); ''')
ENGINE = ExternalDistributed('PostgreSQL', `postgres{1|2}:5432,postgres{3|4}:5432`, 'postgres', 'test_replicas', 'postgres', 'mysecretpassword'); ''')
# Check only one replica in each shard is used
result = node2.query("SELECT DISTINCT(name) FROM test_shards ORDER BY name")
@ -306,26 +288,32 @@ def test_postgres_distributed(started_cluster):
result = node2.query("SELECT DISTINCT(name) FROM test_shards ORDER BY name")
started_cluster.unpause_container('postgres1')
assert(result == 'host2\nhost4\n' or result == 'host3\nhost4\n')
node2.query('DROP TABLE test_shards')
node2.query('DROP TABLE test_replicas')
def test_datetime_with_timezone(started_cluster):
conn = get_postgres_conn(started_cluster, started_cluster.postgres_ip, True)
cursor = conn.cursor()
cursor = started_cluster.postgres_conn.cursor()
cursor.execute("DROP TABLE IF EXISTS test_timezone")
node1.query("DROP TABLE IF EXISTS test_timezone")
cursor.execute("CREATE TABLE test_timezone (ts timestamp without time zone, ts_z timestamp with time zone)")
cursor.execute("insert into test_timezone select '2014-04-04 20:00:00', '2014-04-04 20:00:00'::timestamptz at time zone 'America/New_York';")
cursor.execute("select * from test_timezone")
result = cursor.fetchall()[0]
print(result[0], str(result[1])[:-6])
node1.query("create table test_timezone ( ts DateTime, ts_z DateTime('America/New_York')) ENGINE PostgreSQL('postgres1:5432', 'clickhouse', 'test_timezone', 'postgres', 'mysecretpassword');")
logging.debug(f'{result[0]}, {str(result[1])[:-6]}')
node1.query("create table test_timezone ( ts DateTime, ts_z DateTime('America/New_York')) ENGINE PostgreSQL('postgres1:5432', 'postgres', 'test_timezone', 'postgres', 'mysecretpassword');")
assert(node1.query("select ts from test_timezone").strip() == str(result[0]))
# [:-6] strips the timezone offset: 2014-04-04 16:00:00+00:00 -> 2014-04-04 16:00:00
assert(node1.query("select ts_z from test_timezone").strip() == str(result[1])[:-6])
assert(node1.query("select * from test_timezone") == "2014-04-04 20:00:00\t2014-04-04 16:00:00\n")
cursor.execute("DROP TABLE test_timezone")
node1.query("DROP TABLE test_timezone")
def test_postgres_ndim(started_cluster):
conn = get_postgres_conn(started_cluster, started_cluster.postgres_ip, True)
cursor = conn.cursor()
cursor = started_cluster.postgres_conn.cursor()
cursor.execute("DROP TABLE IF EXISTS arr1, arr2")
cursor.execute('CREATE TABLE arr1 (a Integer[])')
cursor.execute("INSERT INTO arr1 SELECT '{{1}, {2}}'")
@ -335,8 +323,9 @@ def test_postgres_ndim(started_cluster):
result = cursor.fetchall()[0]
assert(int(result[0]) == 0)
result = node1.query('''SELECT toTypeName(a) FROM postgresql('postgres1:5432', 'clickhouse', 'arr2', 'postgres', 'mysecretpassword')''')
result = node1.query('''SELECT toTypeName(a) FROM postgresql('postgres1:5432', 'postgres', 'arr2', 'postgres', 'mysecretpassword')''')
assert(result.strip() == "Array(Array(Nullable(Int32)))")
cursor.execute("DROP TABLE arr1, arr2")
if __name__ == '__main__':

View File

@ -2032,6 +2032,20 @@ def test_rabbitmq_queue_consume(rabbitmq_cluster):
instance.query('DROP TABLE test.rabbitmq_queue')
def test_rabbitmq_drop_table_with_unfinished_setup(rabbitmq_cluster):
rabbitmq_cluster.pause_container('rabbitmq1')
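# While the broker container is paused the table setup below cannot complete; the test checks that DROP TABLE still succeeds instead of hanging on the unfinished setup.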
instance.query('''
CREATE TABLE test.drop (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_exchange_name = 'drop',
rabbitmq_format = 'JSONEachRow';
''')
time.sleep(5)
instance.query('DROP TABLE test.drop;')
rabbitmq_cluster.unpause_container('rabbitmq1')
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")

View File

@ -198,12 +198,14 @@ def test_empty_put(started_cluster, auth):
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
drop_empty_table_query = "DROP TABLE IF EXISTS empty_table"
create_empty_table_query = """
CREATE TABLE empty_table (
{}
) ENGINE = Null()
""".format(table_format)
run_query(instance, drop_empty_table_query)
run_query(instance, create_empty_table_query)
filename = "empty_put_test.csv"
@ -305,22 +307,22 @@ def test_put_with_zero_redirect(started_cluster):
def test_put_get_with_globs(started_cluster):
# type: (ClickHouseCluster) -> None
unique_prefix = random.randint(1,10000)
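# A random prefix isolates the objects written by this test, so the glob query below only matches files created in this run.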
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
max_path = ""
for i in range(10):
for j in range(10):
path = "{}_{}/{}.csv".format(i, random.choice(['a', 'b', 'c', 'd']), j)
path = "{}/{}_{}/{}.csv".format(unique_prefix, i, random.choice(['a', 'b', 'c', 'd']), j)
max_path = max(path, max_path)
values = "({},{},{})".format(i, j, i + j)
query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values {}".format(
started_cluster.minio_ip, MINIO_INTERNAL_PORT, bucket, path, table_format, values)
run_query(instance, query)
query = "select sum(column1), sum(column2), sum(column3), min(_file), max(_path) from s3('http://{}:{}/{}/*_{{a,b,c,d}}/%3f.csv', 'CSV', '{}')".format(
started_cluster.minio_redirect_host, started_cluster.minio_redirect_port, bucket, table_format)
query = "select sum(column1), sum(column2), sum(column3), min(_file), max(_path) from s3('http://{}:{}/{}/{}/*_{{a,b,c,d}}/%3f.csv', 'CSV', '{}')".format(
started_cluster.minio_redirect_host, started_cluster.minio_redirect_port, bucket, unique_prefix, table_format)
assert run_query(instance, query).splitlines() == [
"450\t450\t900\t0.csv\t{bucket}/{max_path}".format(bucket=bucket, max_path=max_path)]
@ -479,6 +481,7 @@ def test_custom_auth_headers(started_cluster):
result = run_query(instance, get_query)
assert result == '1\t2\t3\n'
instance.query("DROP TABLE IF EXISTS test")
instance.query(
"CREATE TABLE test ({table_format}) ENGINE = S3('http://resolver:8080/{bucket}/{file}', 'CSV')".format(
bucket=started_cluster.minio_restricted_bucket,
@ -494,6 +497,7 @@ def test_custom_auth_headers(started_cluster):
replace_config("<header>Authorization: Bearer INVALID_TOKEN", "<header>Authorization: Bearer TOKEN")
instance.query("SYSTEM RELOAD CONFIG")
assert run_query(instance, "SELECT * FROM test") == '1\t2\t3\n'
instance.query("DROP TABLE test")
def test_custom_auth_headers_exclusion(started_cluster):
@ -551,6 +555,8 @@ def test_storage_s3_get_gzip(started_cluster, extension, method):
"Norman Ortega,33",
""
]
run_query(instance, f"DROP TABLE IF EXISTS {name}")
buf = io.BytesIO()
compressed = gzip.GzipFile(fileobj=buf, mode="wb")
compressed.write(("\n".join(data)).encode())
@ -562,7 +568,8 @@ def test_storage_s3_get_gzip(started_cluster, extension, method):
'CSV',
'{method}')""")
run_query(instance, "SELECT sum(id) FROM {}".format(name)).splitlines() == ["565"]
run_query(instance, f"SELECT sum(id) FROM {name}").splitlines() == ["565"]
run_query(instance, f"DROP TABLE {name}")
def test_storage_s3_get_unstable(started_cluster):

View File

@ -26,6 +26,7 @@ def start_cluster():
def test_mutate_and_upgrade(start_cluster):
for node in [node1, node2]:
node.query("DROP TABLE IF EXISTS mt")
node.query(
"CREATE TABLE mt (EventDate Date, id UInt64) ENGINE ReplicatedMergeTree('/clickhouse/tables/t', '{}') ORDER BY tuple()".format(
node.name))
@ -67,8 +68,13 @@ def test_mutate_and_upgrade(start_cluster):
assert node1.query("SELECT id FROM mt") == "1\n4\n"
assert node2.query("SELECT id FROM mt") == "1\n4\n"
for node in [node1, node2]:
node.query("DROP TABLE mt")
def test_upgrade_while_mutation(start_cluster):
node3.query("DROP TABLE IF EXISTS mt1")
node3.query(
"CREATE TABLE mt1 (EventDate Date, id UInt64) ENGINE ReplicatedMergeTree('/clickhouse/tables/t1', 'node3') ORDER BY tuple()")
@ -86,3 +92,5 @@ def test_upgrade_while_mutation(start_cluster):
# deletes nothing, but the previous asynchronous mutation will finish together with this query
assert_eq_with_retry(node3, "SELECT COUNT() from mt1", "50000\n")
node3.query("DROP TABLE mt1")

View File

@ -1,102 +0,0 @@
=== hash ===
= full =
1 1
2 2
-1 1
1 \N
1 257
1 -1
= left =
1 1
2 2
= right =
1 1
-1 1
1 \N
1 257
1 -1
= inner =
1 1
= full =
1 1 1 1
2 2 0 \N
0 0 -1 1
0 0 1 \N
0 0 1 257
0 0 1 -1
= left =
1 1 1 1
2 2 0 \N
= right =
1 1 1 1
0 0 -1 1
0 0 1 \N
0 0 1 257
0 0 1 -1
= inner =
1 1 1 1
= agg =
5 260
3 3
3 258
1 1
5 260
3 3
3 258
1 1
= types =
1
1
1
1
=== partial_merge ===
= full =
1 1
2 2
-1 1
1 \N
1 257
1 -1
= left =
1 1
2 2
= right =
1 1
-1 1
1 \N
1 257
1 -1
= inner =
1 1
= full =
1 1 1 1
2 2 0 \N
0 0 -1 1
0 0 1 \N
0 0 1 257
0 0 1 -1
= left =
1 1 1 1
2 2 0 \N
= right =
1 1 1 1
0 0 -1 1
0 0 1 \N
0 0 1 257
0 0 1 -1
= inner =
1 1 1 1
= agg =
5 260
3 3
3 258
1 1
5 260
3 3
3 258
1 1
= types =
1
1
1
1

View File

@ -0,0 +1,53 @@
{% for join_type in ['hash', 'partial_merge'] -%}
=== {{ join_type }} ===
= full =
1 1
2 2
-1 1
1 \N
1 257
1 -1
= left =
1 1
2 2
= right =
1 1
-1 1
1 \N
1 257
1 -1
= inner =
1 1
= full =
1 1 1 1
2 2 0 \N
0 0 -1 1
0 0 1 \N
0 0 1 257
0 0 1 -1
= left =
1 1 1 1
2 2 0 \N
= right =
1 1 1 1
0 0 -1 1
0 0 1 \N
0 0 1 257
0 0 1 -1
= inner =
1 1 1 1
= agg =
5 260
3 3
3 258
1 1
5 260
3 3
3 258
1 1
= types =
1
1
1
1
{% endfor -%}

View File

@ -6,9 +6,11 @@ CREATE TABLE t_ab2 (id Nullable(Int32), a Int16, b Nullable(Int64)) ENGINE = Tin
INSERT INTO t_ab1 VALUES (0, 1, 1), (1, 2, 2);
INSERT INTO t_ab2 VALUES (2, -1, 1), (3, 1, NULL), (4, 1, 257), (5, 1, -1), (6, 1, 1);
SELECT '=== hash ===';
{% for join_type in ['hash', 'partial_merge'] -%}
SET join_algorithm = 'hash';
SELECT '=== {{ join_type }} ===';
SET join_algorithm = '{{ join_type }}';
SELECT '= full =';
SELECT a, b FROM t_ab1 FULL JOIN t_ab2 USING (a, b) ORDER BY ifNull(t_ab1.id, t_ab2.id);
@ -49,48 +51,7 @@ SELECT any(toTypeName(a)) == 'Int32' AND any(toTypeName(b)) == 'Nullable(Int64)'
SELECT * FROM ( SELECT a, b as "CAST(a, Int32)" FROM t_ab1 ) t_ab1 FULL JOIN t_ab2 ON (t_ab1.a == t_ab2.a); -- { serverError 44 }
SELECT * FROM ( SELECT a, b as "CAST(a, Int32)" FROM t_ab1 ) t_ab1 FULL JOIN t_ab2 USING (a) FORMAT Null;
SELECT '=== partial_merge ===';
SET join_algorithm = 'partial_merge';
SELECT '= full =';
SELECT a, b FROM t_ab1 FULL JOIN t_ab2 USING (a, b) ORDER BY ifNull(t_ab1.id, t_ab2.id);
SELECT '= left =';
SELECT a, b FROM t_ab1 LEFT JOIN t_ab2 USING (a, b) ORDER BY ifNull(t_ab1.id, t_ab2.id);
SELECT '= right =';
SELECT a, b FROM t_ab1 RIGHT JOIN t_ab2 USING (a, b) ORDER BY ifNull(t_ab1.id, t_ab2.id);
SELECT '= inner =';
SELECT a, b FROM t_ab1 INNER JOIN t_ab2 USING (a, b) ORDER BY ifNull(t_ab1.id, t_ab2.id);
SELECT '= full =';
SELECT a, b, t_ab2.a, t_ab2.b FROM t_ab1 FULL JOIN t_ab2 ON (t_ab1.a == t_ab2.a AND t_ab1.b == t_ab2.b) ORDER BY ifNull(t_ab1.id, t_ab2.id);
SELECT '= left =';
SELECT a, b, t_ab2.a, t_ab2.b FROM t_ab1 LEFT JOIN t_ab2 ON (t_ab1.a == t_ab2.a AND t_ab1.b == t_ab2.b) ORDER BY ifNull(t_ab1.id, t_ab2.id);
SELECT '= right =';
SELECT a, b, t_ab2.a, t_ab2.b FROM t_ab1 RIGHT JOIN t_ab2 ON (t_ab1.a == t_ab2.a AND t_ab1.b == t_ab2.b) ORDER BY ifNull(t_ab1.id, t_ab2.id);
SELECT '= inner =';
SELECT a, b, t_ab2.a, t_ab2.b FROM t_ab1 INNER JOIN t_ab2 ON (t_ab1.a == t_ab2.a AND t_ab1.b == t_ab2.b) ORDER BY ifNull(t_ab1.id, t_ab2.id);
SELECT '= agg =';
SELECT sum(a), sum(b) FROM t_ab1 FULL JOIN t_ab2 USING (a, b);
SELECT sum(a), sum(b) FROM t_ab1 LEFT JOIN t_ab2 USING (a, b);
SELECT sum(a), sum(b) FROM t_ab1 RIGHT JOIN t_ab2 USING (a, b);
SELECT sum(a), sum(b) FROM t_ab1 INNER JOIN t_ab2 USING (a, b);
SELECT sum(a) + sum(t_ab2.a) - 1, sum(b) + sum(t_ab2.b) - 1 FROM t_ab1 FULL JOIN t_ab2 ON (t_ab1.a == t_ab2.a AND t_ab1.b == t_ab2.b);
SELECT sum(a) + sum(t_ab2.a) - 1, sum(b) + sum(t_ab2.b) - 1 FROM t_ab1 LEFT JOIN t_ab2 ON (t_ab1.a == t_ab2.a AND t_ab1.b == t_ab2.b);
SELECT sum(a) + sum(t_ab2.a) - 1, sum(b) + sum(t_ab2.b) - 1 FROM t_ab1 RIGHT JOIN t_ab2 ON (t_ab1.a == t_ab2.a AND t_ab1.b == t_ab2.b);
SELECT sum(a) + sum(t_ab2.a) - 1, sum(b) + sum(t_ab2.b) - 1 FROM t_ab1 INNER JOIN t_ab2 ON (t_ab1.a == t_ab2.a AND t_ab1.b == t_ab2.b);
SELECT '= types =';
SELECT any(toTypeName(a)) == 'Int32' AND any(toTypeName(b)) == 'Nullable(Int64)' FROM t_ab1 FULL JOIN t_ab2 USING (a, b);
SELECT any(toTypeName(a)) == 'Int32' AND any(toTypeName(b)) == 'Nullable(Int64)' FROM t_ab1 LEFT JOIN t_ab2 USING (a, b);
SELECT any(toTypeName(a)) == 'Int32' AND any(toTypeName(b)) == 'Nullable(Int64)' FROM t_ab1 RIGHT JOIN t_ab2 USING (a, b);
SELECT any(toTypeName(a)) == 'Int32' AND any(toTypeName(b)) == 'Nullable(Int64)' FROM t_ab1 INNER JOIN t_ab2 USING (a, b);
SELECT * FROM ( SELECT a, b as "CAST(a, Int32)" FROM t_ab1 ) t_ab1 FULL JOIN t_ab2 ON (t_ab1.a == t_ab2.a); -- { serverError 44 }
SELECT * FROM ( SELECT a, b as "CAST(a, Int32)" FROM t_ab1 ) t_ab1 FULL JOIN t_ab2 USING (a) FORMAT Null;
{% endfor %}
DROP TABLE IF EXISTS t_ab1;
DROP TABLE IF EXISTS t_ab2;

View File

@ -400,7 +400,7 @@
1
1
1
=== switch ===
=== auto ===
= full =
-4 0 196
-3 0 197

View File

@ -7,159 +7,14 @@ CREATE TABLE t2 (a Int16, b Nullable(Int64)) ENGINE = TinyLog;
INSERT INTO t1 SELECT number as a, 100 + number as b FROM system.numbers LIMIT 1, 10;
INSERT INTO t2 SELECT number - 5 as a, 200 + number - 5 as b FROM system.numbers LIMIT 1, 10;
SELECT '=== hash ===';
SET join_algorithm = 'hash';
{% for join_type in ['hash', 'partial_merge', 'auto'] -%}
SELECT '= full =';
SELECT a, b, t2.b FROM t1 FULL JOIN t2 USING (a) ORDER BY (a);
SELECT '= left =';
SELECT a, b, t2.b FROM t1 LEFT JOIN t2 USING (a) ORDER BY (a);
SELECT '= right =';
SELECT a, b, t2.b FROM t1 RIGHT JOIN t2 USING (a) ORDER BY (a);
SELECT '= inner =';
SELECT a, b, t2.b FROM t1 INNER JOIN t2 USING (a) ORDER BY (a);
SELECT '=== {{ join_type }} ===';
SET join_algorithm = '{{ join_type }}';
SELECT '= full =';
SELECT a, t1.a, t2.a FROM t1 FULL JOIN t2 USING (a) ORDER BY (t1.a, t2.a);
SELECT '= left =';
SELECT a, t1.a, t2.a FROM t1 LEFT JOIN t2 USING (a) ORDER BY (t1.a, t2.a);
SELECT '= right =';
SELECT a, t1.a, t2.a FROM t1 RIGHT JOIN t2 USING (a) ORDER BY (t1.a, t2.a);
SELECT '= inner =';
SELECT a, t1.a, t2.a FROM t1 INNER JOIN t2 USING (a) ORDER BY (t1.a, t2.a);
SELECT '= join on =';
SELECT '= full =';
SELECT a, b, t2.a, t2.b FROM t1 FULL JOIN t2 ON (t1.a == t2.a) ORDER BY (t1.a, t2.a);
SELECT '= left =';
SELECT a, b, t2.a, t2.b FROM t1 LEFT JOIN t2 ON (t1.a == t2.a) ORDER BY (t1.a, t2.a);
SELECT '= right =';
SELECT a, b, t2.a, t2.b FROM t1 RIGHT JOIN t2 ON (t1.a == t2.a) ORDER BY (t1.a, t2.a);
SELECT '= inner =';
SELECT a, b, t2.a, t2.b FROM t1 INNER JOIN t2 ON (t1.a == t2.a) ORDER BY (t1.a, t2.a);
SELECT '= full =';
SELECT * FROM t1 FULL JOIN t2 ON (t1.a + t1.b = t2.a + t2.b - 100) ORDER BY (t1.a, t2.a);
SELECT '= left =';
SELECT * FROM t1 LEFT JOIN t2 ON (t1.a + t1.b = t2.a + t2.b - 100) ORDER BY (t1.a, t2.a);
SELECT '= right =';
SELECT * FROM t1 RIGHT JOIN t2 ON (t1.a + t1.b = t2.a + t2.b - 100) ORDER BY (t1.a, t2.a);
SELECT '= inner =';
SELECT * FROM t1 INNER JOIN t2 ON (t1.a + t1.b = t2.a + t2.b - 100) ORDER BY (t1.a, t2.a);
-- Int64 and UInt64 have no supertype
SELECT * FROM t1 FULL JOIN t2 ON (t1.a + t1.b + 100 = t2.a + t2.b) ORDER BY (t1.a, t2.a); -- { serverError 53 }
SELECT * FROM t1 LEFT JOIN t2 ON (t1.a + t1.b + 100 = t2.a + t2.b) ORDER BY (t1.a, t2.a); -- { serverError 53 }
SELECT * FROM t1 RIGHT JOIN t2 ON (t1.a + t1.b + 100 = t2.a + t2.b) ORDER BY (t1.a, t2.a); -- { serverError 53 }
SELECT * FROM t1 INNER JOIN t2 ON (t1.a + t1.b + 100 = t2.a + t2.b) ORDER BY (t1.a, t2.a); -- { serverError 53 }
SELECT '= agg =';
SELECT sum(a) == 7 FROM t1 FULL JOIN t2 USING (a) WHERE b > 102 AND t2.b <= 204;
SELECT sum(a) == 7 FROM t1 INNER JOIN t2 USING (a) WHERE b > 102 AND t2.b <= 204;
SELECT sum(b) = 103 FROM t1 LEFT JOIN t2 USING (a) WHERE b > 102 AND t2.b < 204;
SELECT sum(t2.b) = 203 FROM t1 RIGHT JOIN t2 USING (a) WHERE b > 102 AND t2.b < 204;
SELECT sum(a) == 2 + 3 + 4 FROM t1 FULL JOIN t2 ON (t1.a + t1.b = t2.a + t2.b - 100) WHERE t1.b < 105 AND t2.b > 201;
SELECT sum(a) == 55 FROM t1 FULL JOIN t2 ON (t1.a + t1.b = t2.a + t2.b - 100) WHERE 1;
SELECT a > 0, sum(a), sum(b) FROM t1 FULL JOIN t2 USING (a) GROUP BY (a > 0) ORDER BY a > 0;
SELECT a > 0, sum(a), sum(t2.a), sum(b), sum(t2.b) FROM t1 FULL JOIN t2 ON (t1.a == t2.a) GROUP BY (a > 0) ORDER BY a > 0;
SELECT '= types =';
SELECT any(toTypeName(a)) == 'Int32' AND any(toTypeName(t2.a)) == 'Int32' FROM t1 FULL JOIN t2 USING (a);
SELECT any(toTypeName(a)) == 'Int32' AND any(toTypeName(t2.a)) == 'Int32' FROM t1 LEFT JOIN t2 USING (a);
SELECT any(toTypeName(a)) == 'Int32' AND any(toTypeName(t2.a)) == 'Int32' FROM t1 RIGHT JOIN t2 USING (a);
SELECT any(toTypeName(a)) == 'Int32' AND any(toTypeName(t2.a)) == 'Int32' FROM t1 INNER JOIN t2 USING (a);
SELECT toTypeName(any(a)) == 'Int32' AND toTypeName(any(t2.a)) == 'Int32' FROM t1 FULL JOIN t2 USING (a);
SELECT min(toTypeName(a) == 'Int32' AND toTypeName(t2.a) == 'Int32') FROM t1 FULL JOIN t2 USING (a);
SELECT any(toTypeName(a)) == 'UInt16' AND any(toTypeName(t2.a)) == 'Int16' FROM t1 FULL JOIN t2 ON (t1.a == t2.a);
SELECT any(toTypeName(a)) == 'UInt16' AND any(toTypeName(t2.a)) == 'Int16' FROM t1 LEFT JOIN t2 ON (t1.a == t2.a);
SELECT any(toTypeName(a)) == 'UInt16' AND any(toTypeName(t2.a)) == 'Int16' FROM t1 RIGHT JOIN t2 ON (t1.a == t2.a);
SELECT any(toTypeName(a)) == 'UInt16' AND any(toTypeName(t2.a)) == 'Int16' FROM t1 INNER JOIN t2 ON (t1.a == t2.a);
SELECT toTypeName(any(a)) == 'UInt16' AND toTypeName(any(t2.a)) == 'Int16' FROM t1 FULL JOIN t2 ON (t1.a == t2.a);
SELECT '=== partial_merge ===';
SET join_algorithm = 'partial_merge';
SELECT '= full =';
SELECT a, b, t2.b FROM t1 FULL JOIN t2 USING (a) ORDER BY (a);
SELECT '= left =';
SELECT a, b, t2.b FROM t1 LEFT JOIN t2 USING (a) ORDER BY (a);
SELECT '= right =';
SELECT a, b, t2.b FROM t1 RIGHT JOIN t2 USING (a) ORDER BY (a);
SELECT '= inner =';
SELECT a, b, t2.b FROM t1 INNER JOIN t2 USING (a) ORDER BY (a);
SELECT '= full =';
SELECT a, t1.a, t2.a FROM t1 FULL JOIN t2 USING (a) ORDER BY (t1.a, t2.a);
SELECT '= left =';
SELECT a, t1.a, t2.a FROM t1 LEFT JOIN t2 USING (a) ORDER BY (t1.a, t2.a);
SELECT '= right =';
SELECT a, t1.a, t2.a FROM t1 RIGHT JOIN t2 USING (a) ORDER BY (t1.a, t2.a);
SELECT '= inner =';
SELECT a, t1.a, t2.a FROM t1 INNER JOIN t2 USING (a) ORDER BY (t1.a, t2.a);
SELECT '= join on =';
SELECT '= full =';
SELECT a, b, t2.a, t2.b FROM t1 FULL JOIN t2 ON (t1.a == t2.a) ORDER BY (t1.a, t2.a);
SELECT '= left =';
SELECT a, b, t2.a, t2.b FROM t1 LEFT JOIN t2 ON (t1.a == t2.a) ORDER BY (t1.a, t2.a);
SELECT '= right =';
SELECT a, b, t2.a, t2.b FROM t1 RIGHT JOIN t2 ON (t1.a == t2.a) ORDER BY (t1.a, t2.a);
SELECT '= inner =';
SELECT a, b, t2.a, t2.b FROM t1 INNER JOIN t2 ON (t1.a == t2.a) ORDER BY (t1.a, t2.a);
SELECT '= full =';
SELECT * FROM t1 FULL JOIN t2 ON (t1.a + t1.b = t2.a + t2.b - 100) ORDER BY (t1.a, t2.a);
SELECT '= left =';
SELECT * FROM t1 LEFT JOIN t2 ON (t1.a + t1.b = t2.a + t2.b - 100) ORDER BY (t1.a, t2.a);
SELECT '= right =';
SELECT * FROM t1 RIGHT JOIN t2 ON (t1.a + t1.b = t2.a + t2.b - 100) ORDER BY (t1.a, t2.a);
SELECT '= inner =';
SELECT * FROM t1 INNER JOIN t2 ON (t1.a + t1.b = t2.a + t2.b - 100) ORDER BY (t1.a, t2.a);
-- Int64 and UInt64 have no supertype
SELECT * FROM t1 FULL JOIN t2 ON (t1.a + t1.b + 100 = t2.a + t2.b) ORDER BY (t1.a, t2.a); -- { serverError 53 }
SELECT * FROM t1 LEFT JOIN t2 ON (t1.a + t1.b + 100 = t2.a + t2.b) ORDER BY (t1.a, t2.a); -- { serverError 53 }
SELECT * FROM t1 RIGHT JOIN t2 ON (t1.a + t1.b + 100 = t2.a + t2.b) ORDER BY (t1.a, t2.a); -- { serverError 53 }
SELECT * FROM t1 INNER JOIN t2 ON (t1.a + t1.b + 100 = t2.a + t2.b) ORDER BY (t1.a, t2.a); -- { serverError 53 }
SELECT '= agg =';
SELECT sum(a) == 7 FROM t1 FULL JOIN t2 USING (a) WHERE b > 102 AND t2.b <= 204;
SELECT sum(a) == 7 FROM t1 INNER JOIN t2 USING (a) WHERE b > 102 AND t2.b <= 204;
SELECT sum(b) = 103 FROM t1 LEFT JOIN t2 USING (a) WHERE b > 102 AND t2.b < 204;
SELECT sum(t2.b) = 203 FROM t1 RIGHT JOIN t2 USING (a) WHERE b > 102 AND t2.b < 204;
SELECT sum(a) == 2 + 3 + 4 FROM t1 FULL JOIN t2 ON (t1.a + t1.b = t2.a + t2.b - 100) WHERE t1.b < 105 AND t2.b > 201;
SELECT sum(a) == 55 FROM t1 FULL JOIN t2 ON (t1.a + t1.b = t2.a + t2.b - 100) WHERE 1;
SELECT a > 0, sum(a), sum(b) FROM t1 FULL JOIN t2 USING (a) GROUP BY (a > 0) ORDER BY a > 0;
SELECT a > 0, sum(a), sum(t2.a), sum(b), sum(t2.b) FROM t1 FULL JOIN t2 ON (t1.a == t2.a) GROUP BY (a > 0) ORDER BY a > 0;
SELECT '= types =';
SELECT any(toTypeName(a)) == 'Int32' AND any(toTypeName(t2.a)) == 'Int32' FROM t1 FULL JOIN t2 USING (a);
SELECT any(toTypeName(a)) == 'Int32' AND any(toTypeName(t2.a)) == 'Int32' FROM t1 LEFT JOIN t2 USING (a);
SELECT any(toTypeName(a)) == 'Int32' AND any(toTypeName(t2.a)) == 'Int32' FROM t1 RIGHT JOIN t2 USING (a);
SELECT any(toTypeName(a)) == 'Int32' AND any(toTypeName(t2.a)) == 'Int32' FROM t1 INNER JOIN t2 USING (a);
SELECT toTypeName(any(a)) == 'Int32' AND toTypeName(any(t2.a)) == 'Int32' FROM t1 FULL JOIN t2 USING (a);
SELECT min(toTypeName(a) == 'Int32' AND toTypeName(t2.a) == 'Int32') FROM t1 FULL JOIN t2 USING (a);
SELECT any(toTypeName(a)) == 'UInt16' AND any(toTypeName(t2.a)) == 'Int16' FROM t1 FULL JOIN t2 ON (t1.a == t2.a);
SELECT any(toTypeName(a)) == 'UInt16' AND any(toTypeName(t2.a)) == 'Int16' FROM t1 LEFT JOIN t2 ON (t1.a == t2.a);
SELECT any(toTypeName(a)) == 'UInt16' AND any(toTypeName(t2.a)) == 'Int16' FROM t1 RIGHT JOIN t2 ON (t1.a == t2.a);
SELECT any(toTypeName(a)) == 'UInt16' AND any(toTypeName(t2.a)) == 'Int16' FROM t1 INNER JOIN t2 ON (t1.a == t2.a);
SELECT toTypeName(any(a)) == 'UInt16' AND toTypeName(any(t2.a)) == 'Int16' FROM t1 FULL JOIN t2 ON (t1.a == t2.a);
SELECT '=== switch ===';
SET join_algorithm = 'auto';
{% if join_type == 'auto' -%}
SET max_bytes_in_join = 100;
{% endif -%}
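-- Only the 'auto' iteration lowers max_bytes_in_join (it is reset again further below), so the on-the-fly switch of the 'auto' algorithm is exercised without affecting the hash and partial_merge runs.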
SELECT '= full =';
SELECT a, b, t2.b FROM t1 FULL JOIN t2 USING (a) ORDER BY (a);
@ -232,7 +87,11 @@ SELECT any(toTypeName(a)) == 'UInt16' AND any(toTypeName(t2.a)) == 'Int16' FROM
SELECT any(toTypeName(a)) == 'UInt16' AND any(toTypeName(t2.a)) == 'Int16' FROM t1 INNER JOIN t2 ON (t1.a == t2.a);
SELECT toTypeName(any(a)) == 'UInt16' AND toTypeName(any(t2.a)) == 'Int16' FROM t1 FULL JOIN t2 ON (t1.a == t2.a);
{% if join_type == 'auto' -%}
SET max_bytes_in_join = 0;
{% endif -%}
{% endfor -%}
SELECT '=== join use nulls ===';

View File

@ -1,16 +0,0 @@
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1

View File

@ -0,0 +1,3 @@
{% for i in range(24) -%}
1
{% endfor -%}

View File

@ -1,19 +0,0 @@
SET max_block_size = 6, join_algorithm = 'partial_merge';
SELECT count() == 4 FROM (SELECT 1 AS s) AS js1 ALL RIGHT JOIN (SELECT arrayJoin([2, 2, 2, 2]) AS s) AS js2 USING (s);
SELECT count() == 5 FROM (SELECT 1 AS s) AS js1 ALL RIGHT JOIN (SELECT arrayJoin([2, 2, 2, 2, 2]) AS s) AS js2 USING (s);
SELECT count() == 6 FROM (SELECT 1 AS s) AS js1 ALL RIGHT JOIN (SELECT arrayJoin([2, 2, 2, 2, 2, 2]) AS s) AS js2 USING (s);
SELECT count() == 7 FROM (SELECT 1 AS s) AS js1 ALL RIGHT JOIN (SELECT arrayJoin([2, 2, 2, 2, 2, 2, 2]) AS s) AS js2 USING (s);
SELECT count() == 8 FROM (SELECT 1 AS s) AS js1 ALL RIGHT JOIN (SELECT arrayJoin([2, 2, 2, 2, 2, 2, 2, 3]) AS s) AS js2 USING (s);
SELECT count() == 9 FROM (SELECT 1 AS s) AS js1 ALL RIGHT JOIN (SELECT arrayJoin([2, 2, 2, 2, 2, 2, 2, 3, 3]) AS s) AS js2 USING (s);
SELECT count() == 10 FROM (SELECT 1 AS s) AS js1 ALL RIGHT JOIN (SELECT arrayJoin([2, 2, 2, 2, 2, 2, 2, 3, 3, 3]) AS s) AS js2 USING (s);
SELECT count() == 11 FROM (SELECT 1 AS s) AS js1 ALL RIGHT JOIN (SELECT arrayJoin([2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3]) AS s) AS js2 USING (s);
SELECT count() == 12 FROM (SELECT 1 AS s) AS js1 ALL RIGHT JOIN (SELECT arrayJoin([2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3]) AS s) AS js2 USING (s);
SELECT count() == 13 FROM (SELECT 1 AS s) AS js1 ALL RIGHT JOIN (SELECT arrayJoin([2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3]) AS s) AS js2 USING (s);
SELECT count() == 14 FROM (SELECT 1 AS s) AS js1 ALL RIGHT JOIN (SELECT arrayJoin([2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3]) AS s) AS js2 USING (s);
SELECT count() == 15 FROM (SELECT 1 AS s) AS js1 ALL RIGHT JOIN (SELECT arrayJoin([2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3]) AS s) AS js2 USING (s);
SELECT count() == 8 FROM (SELECT 1 AS s) AS js1 FULL JOIN (SELECT arrayJoin([2, 2, 2, 2, 2, 2, 2]) AS s) AS js2 USING (s);
SELECT count() == 9 FROM (SELECT 1 AS s) AS js1 FULL JOIN (SELECT arrayJoin([2, 2, 2, 2, 2, 2, 2, 3]) AS s) AS js2 USING (s);
SELECT count() == 10 FROM (SELECT 1 AS s) AS js1 FULL JOIN (SELECT arrayJoin([2, 2, 2, 2, 2, 2, 2, 3, 3]) AS s) AS js2 USING (s);
SELECT count() == 11 FROM (SELECT 1 AS s) AS js1 FULL JOIN (SELECT arrayJoin([2, 2, 2, 2, 2, 2, 2, 3, 3, 3]) AS s) AS js2 USING (s);

View File

@ -0,0 +1,9 @@
SET max_block_size = 6, join_algorithm = 'partial_merge';
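-- Assumed intent: max_block_size = 6 makes the right-hand side arrive in several small blocks, so the partial_merge join is exercised across block boundaries for the row counts generated below.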
{% for i in range(4, 16) -%}
SELECT count() == {{ i }} FROM (SELECT 100 AS s) AS js1 ALL RIGHT JOIN ( SELECT number AS s FROM numbers({{ i }}) ) AS js2 USING (s);
{% endfor -%}
{% for i in range(4, 16) -%}
SELECT count() == {{ i + 1 }} FROM (SELECT 100 AS s) AS js1 ALL FULL JOIN ( SELECT number AS s FROM numbers({{ i }}) ) AS js2 USING (s);
{% endfor -%}

View File

@ -0,0 +1,20 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# See 01658_read_file_to_string_column.sh
user_files_path=$(clickhouse-client --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}')
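# The query above intentionally fails; the exception message carries the absolute user_files path, and awk recovers it by stripping the trailing /nonexist.txt from field 9.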
mkdir -p ${user_files_path}/
cp $CUR_DIR/data_zstd/test_01946.zstd ${user_files_path}/
${CLICKHOUSE_CLIENT} --multiline --multiquery --query "
set max_read_buffer_size = 65536;
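-- A small read buffer presumably places the end of the zstd frame near a buffer boundary; the same query is then run with parallel parsing disabled and enabled.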
set input_format_parallel_parsing = 0;
select * from file('test_01946.zstd', 'JSONEachRow', 'foo String') limit 30 format Null;
set input_format_parallel_parsing = 1;
select * from file('test_01946.zstd', 'JSONEachRow', 'foo String') limit 30 format Null;
"

View File

@ -0,0 +1,2 @@
0 Value
0 Value

View File

@ -0,0 +1,38 @@
DROP DATABASE IF EXISTS `01945.db`;
CREATE DATABASE `01945.db`;
CREATE TABLE `01945.db`.test_dictionary_values
(
id UInt64,
value String
) ENGINE=TinyLog;
INSERT INTO `01945.db`.test_dictionary_values VALUES (0, 'Value');
CREATE DICTIONARY `01945.db`.test_dictionary
(
id UInt64,
value String
)
PRIMARY KEY id
LAYOUT(DIRECT())
SOURCE(CLICKHOUSE(DB '01945.db' TABLE 'test_dictionary_values'));
SELECT * FROM `01945.db`.test_dictionary;
DROP DICTIONARY `01945.db`.test_dictionary;
CREATE DICTIONARY `01945.db`.`test_dictionary.test`
(
id UInt64,
value String
)
PRIMARY KEY id
LAYOUT(DIRECT())
SOURCE(CLICKHOUSE(DB '01945.db' TABLE 'test_dictionary_values'));
SELECT * FROM `01945.db`.`test_dictionary.test`;
DROP DICTIONARY `01945.db`.`test_dictionary.test`;
DROP TABLE `01945.db`.test_dictionary_values;
DROP DATABASE `01945.db`;

Binary file not shown.

View File

@ -516,6 +516,7 @@
"01925_test_storage_merge_aliases",
"01933_client_replxx_convert_history", /// Uses non unique history file
"01902_table_function_merge_db_repr",
"01946_test_zstd_decompression_with_escape_sequence_at_the_end_of_buffer",
"01946_test_wrong_host_name_access"
]
}