diff --git a/CHANGELOG.md b/CHANGELOG.md
index babb5ebca8d..718aa751cc2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,4 @@
-### ClickHouse release v21.10, 2021-10-08
+### ClickHouse release v21.10, 2021-10-14
#### Backward Incompatible Change
@@ -110,6 +110,7 @@
* Fix the issue that in case of some sophisticated query with column aliases identical to the names of expressions, a bad cast could happen. This fixes [#25447](https://github.com/ClickHouse/ClickHouse/issues/25447). This fixes [#26914](https://github.com/ClickHouse/ClickHouse/issues/26914). This fix may introduce backward incompatibility: if there are different expressions with identical names, an exception will be thrown. It may break some rare cases when `enable_optimize_predicate_expression` is set. [#26639](https://github.com/ClickHouse/ClickHouse/pull/26639) ([alexey-milovidov](https://github.com/alexey-milovidov)).
* Now, a scalar subquery always returns a `Nullable` result if its type can be `Nullable`. It is needed because in case of an empty subquery its result should be `Null`. Previously, it was possible to get an error about incompatible types (type deduction does not execute the scalar subquery, and it could use a non-nullable type). A scalar subquery with an empty result which can't be converted to `Nullable` (like `Array` or `Tuple`) now throws an error. Fixes [#25411](https://github.com/ClickHouse/ClickHouse/issues/25411). [#26423](https://github.com/ClickHouse/ClickHouse/pull/26423) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).
* Introduce syntax for here documents. Example `SELECT $doc$ VALUE $doc$`. [#26671](https://github.com/ClickHouse/ClickHouse/pull/26671) ([Maksim Kita](https://github.com/kitaisreal)). This change is backward incompatible if a query contains identifiers that contain `$` [#28768](https://github.com/ClickHouse/ClickHouse/issues/28768).
+* Now indices can handle Nullable types, including `isNull` and `isNotNull`. [#12433](https://github.com/ClickHouse/ClickHouse/pull/12433) and [#12455](https://github.com/ClickHouse/ClickHouse/pull/12455) ([Amos Bird](https://github.com/amosbird)) and [#27250](https://github.com/ClickHouse/ClickHouse/pull/27250) ([Azat Khuzhin](https://github.com/azat)). This required on-disk format changes: even though the new server can read old data, the old server cannot read data written by the new one. Also, if you have `MINMAX` data skipping indices, you may get a `Data after mutation/merge is not byte-identical` error, since the new index uses the `.idx2` extension instead of the previous `.idx`. For that reason, do not delay updating all existing replicas: if an old replica (<21.9) downloads data from a new replica (21.9+), it will not be able to apply the index for the downloaded part.
#### New Feature
@@ -179,7 +180,6 @@
* Add setting `log_formatted_queries` to log additional formatted query into `system.query_log`. It's useful for normalized query analysis because functions like `normalizeQuery` and `normalizeQueryKeepNames` don't parse/format queries in order to achieve better performance. [#27380](https://github.com/ClickHouse/ClickHouse/pull/27380) ([Amos Bird](https://github.com/amosbird)).
* Add two settings `max_hyperscan_regexp_length` and `max_hyperscan_regexp_total_length` to prevent huge regexp being used in hyperscan related functions, such as `multiMatchAny`. [#27378](https://github.com/ClickHouse/ClickHouse/pull/27378) ([Amos Bird](https://github.com/amosbird)).
* Memory consumed by bitmap aggregate functions is now taken into account for memory limits. This closes [#26555](https://github.com/ClickHouse/ClickHouse/issues/26555). [#27252](https://github.com/ClickHouse/ClickHouse/pull/27252) ([alexey-milovidov](https://github.com/alexey-milovidov)).
-* Add new index data skipping minmax index format for proper Nullable support. [#27250](https://github.com/ClickHouse/ClickHouse/pull/27250) ([Azat Khuzhin](https://github.com/azat)).
* Add 10 seconds cache for S3 proxy resolver. [#27216](https://github.com/ClickHouse/ClickHouse/pull/27216) ([ianton-ru](https://github.com/ianton-ru)).
* Split global mutex into individual regexp construction. This helps avoid huge regexp construction blocking other related threads. [#27211](https://github.com/ClickHouse/ClickHouse/pull/27211) ([Amos Bird](https://github.com/amosbird)).
* Support schema for PostgreSQL database engine. Closes [#27166](https://github.com/ClickHouse/ClickHouse/issues/27166). [#27198](https://github.com/ClickHouse/ClickHouse/pull/27198) ([Kseniia Sumarokova](https://github.com/kssenii)).
@@ -234,7 +234,6 @@
* Fix multiple block insertion into distributed table with `insert_distributed_one_random_shard = 1`. This is a marginal feature. Mark as improvement. [#23140](https://github.com/ClickHouse/ClickHouse/pull/23140) ([Amos Bird](https://github.com/amosbird)).
* Support `LowCardinality` and `FixedString` keys/values for `Map` type. [#21543](https://github.com/ClickHouse/ClickHouse/pull/21543) ([hexiaoting](https://github.com/hexiaoting)).
* Enable reloading of local disk config. [#19526](https://github.com/ClickHouse/ClickHouse/pull/19526) ([taiyang-li](https://github.com/taiyang-li)).
-* Now KeyConditions can correctly skip nullable keys, including `isNull` and `isNotNull`. https://github.com/ClickHouse/ClickHouse/pull/12433. [#12455](https://github.com/ClickHouse/ClickHouse/pull/12455) ([Amos Bird](https://github.com/amosbird)).
#### Bug Fix
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 66df4d3124a..685b2c25a0d 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -336,6 +336,10 @@ if (COMPILER_GCC OR COMPILER_CLANG)
set(COMPILER_FLAGS "${COMPILER_FLAGS} -falign-functions=32")
endif ()
+if (COMPILER_GCC)
+ set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fcoroutines")
+endif ()
+
# Compiler-specific coverage flags e.g. -fcoverage-mapping for gcc
option(WITH_COVERAGE "Profile the resulting binary/binaries" OFF)
diff --git a/contrib/cctz b/contrib/cctz
index c0f1bcb97fd..9edd0861d83 160000
--- a/contrib/cctz
+++ b/contrib/cctz
@@ -1 +1 @@
-Subproject commit c0f1bcb97fd2782f7c3f972fadd5aad5affac4b8
+Subproject commit 9edd0861d8328b2ae77e8fb5f4d7dcd1cf33b42b
diff --git a/contrib/libcxx b/contrib/libcxx
index 2fa892f69ac..61e60294b1d 160000
--- a/contrib/libcxx
+++ b/contrib/libcxx
@@ -1 +1 @@
-Subproject commit 2fa892f69acbaa40f8a18c6484854a6183a34482
+Subproject commit 61e60294b1de01483caa9f5d00f437c99b674de6
diff --git a/docker/test/fasttest/Dockerfile b/docker/test/fasttest/Dockerfile
index 798910fb952..f50c65bb9f2 100644
--- a/docker/test/fasttest/Dockerfile
+++ b/docker/test/fasttest/Dockerfile
@@ -67,7 +67,7 @@ RUN apt-get update \
unixodbc \
--yes --no-install-recommends
-RUN pip3 install numpy scipy pandas Jinja2
+RUN pip3 install numpy scipy pandas Jinja2 clickhouse_driver
# This symlink required by gcc to find lld compiler
RUN ln -s /usr/bin/lld-${LLVM_VERSION} /usr/bin/ld.lld
diff --git a/docker/test/fuzzer/Dockerfile b/docker/test/fuzzer/Dockerfile
index 6444e745c47..13353bc2960 100644
--- a/docker/test/fuzzer/Dockerfile
+++ b/docker/test/fuzzer/Dockerfile
@@ -27,7 +27,7 @@ RUN apt-get update \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
-RUN pip3 install Jinja2
+RUN pip3 install Jinja2 pandas clickhouse_driver
COPY * /
diff --git a/docker/test/fuzzer/run-fuzzer.sh b/docker/test/fuzzer/run-fuzzer.sh
index 3c3fcd42fde..15590902b68 100755
--- a/docker/test/fuzzer/run-fuzzer.sh
+++ b/docker/test/fuzzer/run-fuzzer.sh
@@ -125,25 +125,9 @@ function fuzz
# interferes with gdb
export CLICKHOUSE_WATCHDOG_ENABLE=0
- # NOTE: that $! cannot be used to obtain the server pid, since it will be
- # the pid of the bash, due to piping the output of clickhouse-server to
- # tail
- PID_FILE=clickhouse-server.pid
- clickhouse-server --pidfile=$PID_FILE --config-file db/config.xml -- --path db 2>&1 | tail -100000 > server.log &
-
- server_pid=-1
- for _ in {1..60}; do
- if [ -s $PID_FILE ]; then
- server_pid=$(cat $PID_FILE)
- break
- fi
- sleep 1
- done
-
- if [ $server_pid = -1 ]; then
- echo "Server did not started" >&2
- exit 1
- fi
+    # NOTE: we use process substitution here to keep $! as the pid of clickhouse-server
+ clickhouse-server --config-file db/config.xml -- --path db > >(tail -100000 > server.log) 2>&1 &
+ server_pid=$!
kill -0 $server_pid
diff --git a/docker/test/stateless/Dockerfile b/docker/test/stateless/Dockerfile
index 7de8c061673..a5733d11dd2 100644
--- a/docker/test/stateless/Dockerfile
+++ b/docker/test/stateless/Dockerfile
@@ -34,7 +34,7 @@ RUN apt-get update -y \
postgresql-client \
sqlite3
-RUN pip3 install numpy scipy pandas Jinja2
+RUN pip3 install numpy scipy pandas Jinja2 clickhouse_driver
RUN mkdir -p /tmp/clickhouse-odbc-tmp \
&& wget -nv -O - ${odbc_driver_url} | tar --strip-components=1 -xz -C /tmp/clickhouse-odbc-tmp \
diff --git a/docker/test/style/Dockerfile b/docker/test/style/Dockerfile
index 33cdb9db57a..64cc0c9c7b7 100644
--- a/docker/test/style/Dockerfile
+++ b/docker/test/style/Dockerfile
@@ -10,7 +10,7 @@ RUN apt-get update && env DEBIAN_FRONTEND=noninteractive apt-get install --yes \
python3-pip \
pylint \
yamllint \
- && pip3 install codespell
+ && pip3 install codespell pandas clickhouse_driver
COPY run.sh /
COPY process_style_check_result.py /
diff --git a/docs/en/introduction/adopters.md b/docs/en/introduction/adopters.md
index 72ebe33292f..20bf9a10986 100644
--- a/docs/en/introduction/adopters.md
+++ b/docs/en/introduction/adopters.md
@@ -102,6 +102,7 @@ toc_title: Adopters
| Raiffeisenbank | Banking | Analytics | — | — | [Lecture in Russian, December 2020](https://cs.hse.ru/announcements/421965599.html) |
| Rambler | Internet services | Analytics | — | — | [Talk in Russian, April 2018](https://medium.com/@ramblertop/разработка-api-clickhouse-для-рамблер-топ-100-f4c7e56f3141) |
| Retell | Speech synthesis | Analytics | — | — | [Blog Article, August 2020](https://vc.ru/services/153732-kak-sozdat-audiostati-na-vashem-sayte-i-zachem-eto-nuzhno) |
+| Rollbar | Software Development | Main Product | — | — | [Official Website](https://www.rollbar.com) |
| Rspamd | Antispam | Analytics | — | — | [Official Website](https://rspamd.com/doc/modules/clickhouse.html) |
| RuSIEM | SIEM | Main Product | — | — | [Official Website](https://rusiem.com/en/products/architecture) |
| S7 Airlines | Airlines | Metrics, Logging | — | — | [Talk in Russian, March 2019](https://www.youtube.com/watch?v=nwG68klRpPg&t=15s) |
diff --git a/docs/en/operations/settings/settings.md b/docs/en/operations/settings/settings.md
index de7a1835038..f78fbc8a2bc 100644
--- a/docs/en/operations/settings/settings.md
+++ b/docs/en/operations/settings/settings.md
@@ -3749,3 +3749,38 @@ Exception: Total regexp lengths too large.
**See Also**
- [max_hyperscan_regexp_length](#max-hyperscan-regexp-length)
+
+## enable_positional_arguments {#enable-positional-arguments}
+
+Enables or disables support for positional arguments in the [GROUP BY](../../sql-reference/statements/select/group-by.md), [LIMIT BY](../../sql-reference/statements/select/limit-by.md), and [ORDER BY](../../sql-reference/statements/select/order-by.md) clauses. To use column numbers instead of column names in these clauses, set `enable_positional_arguments = 1`.
+
+Possible values:
+
+- 0 — Positional arguments aren't supported.
+- 1 — Positional arguments are supported: column numbers can be used instead of column names.
+
+Default value: `0`.
+
+**Example**
+
+Query:
+
+```sql
+CREATE TABLE positional_arguments(one Int, two Int, three Int) ENGINE=Memory();
+
+INSERT INTO positional_arguments VALUES (10, 20, 30), (20, 20, 10), (30, 10, 20);
+
+SET enable_positional_arguments = 1;
+
+SELECT * FROM positional_arguments ORDER BY 2,3;
+```
+
+Result:
+
+```text
+┌─one─┬─two─┬─three─┐
+│ 30 │ 10 │ 20 │
+│ 20 │ 20 │ 10 │
+│ 10 │ 20 │ 30 │
+└─────┴─────┴───────┘
+```
\ No newline at end of file
diff --git a/docs/en/sql-reference/statements/select/group-by.md b/docs/en/sql-reference/statements/select/group-by.md
index 7c2d3a20f43..26dd51d806d 100644
--- a/docs/en/sql-reference/statements/select/group-by.md
+++ b/docs/en/sql-reference/statements/select/group-by.md
@@ -10,6 +10,8 @@ toc_title: GROUP BY
- All the expressions in the [SELECT](../../../sql-reference/statements/select/index.md), [HAVING](../../../sql-reference/statements/select/having.md), and [ORDER BY](../../../sql-reference/statements/select/order-by.md) clauses **must** be calculated based on key expressions **or** on [aggregate functions](../../../sql-reference/aggregate-functions/index.md) over non-key expressions (including plain columns). In other words, each column selected from the table must be used either in a key expression or inside an aggregate function, but not both.
- The result of an aggregating `SELECT` query will contain as many rows as there were unique values of the “grouping key” in the source table. Usually this significantly reduces the row count, often by orders of magnitude, but not necessarily: the row count stays the same if all “grouping key” values were distinct.
+To group data by column numbers instead of column names, enable the [enable_positional_arguments](../../../operations/settings/settings.md#enable-positional-arguments) setting.
+
!!! note "Note"
There’s an additional way to run aggregation over a table. If a query contains table columns only inside aggregate functions, the `GROUP BY clause` can be omitted, and aggregation by an empty set of keys is assumed. Such queries always return exactly one row.
diff --git a/docs/en/sql-reference/statements/select/index.md b/docs/en/sql-reference/statements/select/index.md
index b3cc7555d91..0427764475a 100644
--- a/docs/en/sql-reference/statements/select/index.md
+++ b/docs/en/sql-reference/statements/select/index.md
@@ -144,7 +144,7 @@ Extreme values are calculated for rows before `LIMIT`, but after `LIMIT BY`. How
You can use synonyms (`AS` aliases) in any part of a query.
-The `GROUP BY` and `ORDER BY` clauses do not support positional arguments. This contradicts MySQL, but conforms to standard SQL. For example, `GROUP BY 1, 2` will be interpreted as grouping by constants (i.e. aggregation of all rows into one).
+The `GROUP BY`, `ORDER BY`, and `LIMIT BY` clauses support positional arguments when the [enable_positional_arguments](../../../operations/settings/settings.md#enable-positional-arguments) setting is enabled. Then, for example, `ORDER BY 1,2` sorts the rows first by the first column and then by the second.
## Implementation Details {#implementation-details}
diff --git a/docs/en/sql-reference/statements/select/limit-by.md b/docs/en/sql-reference/statements/select/limit-by.md
index 34645b68b03..e1ca58cdec8 100644
--- a/docs/en/sql-reference/statements/select/limit-by.md
+++ b/docs/en/sql-reference/statements/select/limit-by.md
@@ -16,6 +16,9 @@ During query processing, ClickHouse selects data ordered by sorting key. The sor
!!! note "Note"
`LIMIT BY` is not related to [LIMIT](../../../sql-reference/statements/select/limit.md). They can both be used in the same query.
+To use column numbers instead of column names in the `LIMIT BY` clause, enable the [enable_positional_arguments](../../../operations/settings/settings.md#enable-positional-arguments) setting.
+
## Examples {#examples}
Sample table:
diff --git a/docs/en/sql-reference/statements/select/order-by.md b/docs/en/sql-reference/statements/select/order-by.md
index 030f04d5e83..ee6893812cc 100644
--- a/docs/en/sql-reference/statements/select/order-by.md
+++ b/docs/en/sql-reference/statements/select/order-by.md
@@ -4,7 +4,9 @@ toc_title: ORDER BY
# ORDER BY Clause {#select-order-by}
-The `ORDER BY` clause contains a list of expressions, which can each be attributed with `DESC` (descending) or `ASC` (ascending) modifier which determine the sorting direction. If the direction is not specified, `ASC` is assumed, so it’s usually omitted. The sorting direction applies to a single expression, not to the entire list. Example: `ORDER BY Visits DESC, SearchPhrase`
+The `ORDER BY` clause contains a list of expressions, each of which can be assigned a `DESC` (descending) or `ASC` (ascending) modifier that determines the sorting direction. If the direction is not specified, `ASC` is assumed, so it's usually omitted. The sorting direction applies to a single expression, not to the entire list. Example: `ORDER BY Visits DESC, SearchPhrase`.
+
+To sort by column numbers instead of column names, enable the [enable_positional_arguments](../../../operations/settings/settings.md#enable-positional-arguments) setting.
Rows that have identical values for the list of sorting expressions are output in an arbitrary order, which can also be non-deterministic (different each time).
If the ORDER BY clause is omitted, the order of the rows is also undefined, and may be non-deterministic as well.
diff --git a/docs/ru/operations/settings/settings.md b/docs/ru/operations/settings/settings.md
index e639c0a0df2..500485aea2f 100644
--- a/docs/ru/operations/settings/settings.md
+++ b/docs/ru/operations/settings/settings.md
@@ -3538,3 +3538,38 @@ Exception: Total regexp lengths too large.
**См. также**
- [max_hyperscan_regexp_length](#max-hyperscan-regexp-length)
+
+## enable_positional_arguments {#enable-positional-arguments}
+
+Включает и отключает поддержку позиционных аргументов для [GROUP BY](../../sql-reference/statements/select/group-by.md), [LIMIT BY](../../sql-reference/statements/select/limit-by.md), [ORDER BY](../../sql-reference/statements/select/order-by.md). Если вы хотите использовать номера столбцов вместо названий в выражениях этих операторов, установите `enable_positional_arguments = 1`.
+
+Возможные значения:
+
+- 0 — Позиционные аргументы не поддерживаются.
+- 1 — Позиционные аргументы поддерживаются: можно использовать номера столбцов вместо названий столбцов.
+
+Значение по умолчанию: `0`.
+
+**Пример**
+
+Запрос:
+
+```sql
+CREATE TABLE positional_arguments(one Int, two Int, three Int) ENGINE=Memory();
+
+INSERT INTO positional_arguments VALUES (10, 20, 30), (20, 20, 10), (30, 10, 20);
+
+SET enable_positional_arguments = 1;
+
+SELECT * FROM positional_arguments ORDER BY 2,3;
+```
+
+Результат:
+
+```text
+┌─one─┬─two─┬─three─┐
+│ 30 │ 10 │ 20 │
+│ 20 │ 20 │ 10 │
+│ 10 │ 20 │ 30 │
+└─────┴─────┴───────┘
+```
\ No newline at end of file
diff --git a/docs/ru/sql-reference/statements/select/group-by.md b/docs/ru/sql-reference/statements/select/group-by.md
index 2f0cabd14fb..8bc1b765ad3 100644
--- a/docs/ru/sql-reference/statements/select/group-by.md
+++ b/docs/ru/sql-reference/statements/select/group-by.md
@@ -10,6 +10,8 @@ toc_title: GROUP BY
- Все выражения в секциях [SELECT](index.md), [HAVING](having.md) и [ORDER BY](order-by.md) **должны** вычисляться на основе ключевых выражений **или** [агрегатных функций](../../../sql-reference/aggregate-functions/index.md) над неключевыми выражениями (включая столбцы). Другими словами, каждый столбец, выбираемый из таблицы, должен использоваться либо в ключевом выражении, либо внутри агрегатной функции, но не в обоих случаях.
- В результате агрегирования запрос `SELECT` будет содержать столько строк, сколько было уникальных значений ключа группировки в исходной таблице. Обычно агрегация значительно уменьшает количество строк, часто на порядки, но не обязательно: количество строк остается неизменным, если все исходные значения ключа группировки были различны.
+Если вы хотите для группировки данных в таблице указывать номера столбцов, а не названия, включите настройку [enable_positional_arguments](../../../operations/settings/settings.md#enable-positional-arguments).
+
!!! note "Примечание"
Есть ещё один способ запустить агрегацию по таблице. Если запрос содержит столбцы исходной таблицы только внутри агрегатных функций, то `GROUP BY` секцию можно опустить, и предполагается агрегирование по пустому набору ключей. Такие запросы всегда возвращают ровно одну строку.
diff --git a/docs/ru/sql-reference/statements/select/index.md b/docs/ru/sql-reference/statements/select/index.md
index c2820bc7be4..ffaae74b1d9 100644
--- a/docs/ru/sql-reference/statements/select/index.md
+++ b/docs/ru/sql-reference/statements/select/index.md
@@ -140,8 +140,7 @@ Code: 42. DB::Exception: Received from localhost:9000. DB::Exception: Number of
Вы можете использовать синонимы (алиасы `AS`) в любом месте запроса.
-В секциях `GROUP BY`, `ORDER BY`, в отличие от диалекта MySQL, и в соответствии со стандартным SQL, не поддерживаются позиционные аргументы.
-Например, если вы напишите `GROUP BY 1, 2` - то это будет воспринято, как группировка по константам (то есть, агрегация всех строк в одну).
+В секциях `GROUP BY`, `ORDER BY` и `LIMIT BY` можно использовать не названия столбцов, а номера. Для этого нужно включить настройку [enable_positional_arguments](../../../operations/settings/settings.md#enable-positional-arguments). Тогда, например, в запросе с `ORDER BY 1,2` будет выполнена сортировка сначала по первому, а затем по второму столбцу.
## Детали реализации {#implementation-details}
diff --git a/docs/ru/sql-reference/statements/select/limit-by.md b/docs/ru/sql-reference/statements/select/limit-by.md
index fba81c023b5..861d88dcafb 100644
--- a/docs/ru/sql-reference/statements/select/limit-by.md
+++ b/docs/ru/sql-reference/statements/select/limit-by.md
@@ -15,6 +15,8 @@ ClickHouse поддерживает следующий синтаксис:
`LIMIT BY` не связана с секцией `LIMIT`. Их можно использовать в одном запросе.
+Если вы хотите использовать в секции `LIMIT BY` номера столбцов вместо названий, включите настройку [enable_positional_arguments](../../../operations/settings/settings.md#enable-positional-arguments).
+
## Примеры
Образец таблицы:
diff --git a/docs/ru/sql-reference/statements/select/order-by.md b/docs/ru/sql-reference/statements/select/order-by.md
index d7d2e9c7574..190a46dacc9 100644
--- a/docs/ru/sql-reference/statements/select/order-by.md
+++ b/docs/ru/sql-reference/statements/select/order-by.md
@@ -4,7 +4,9 @@ toc_title: ORDER BY
# Секция ORDER BY {#select-order-by}
-Секция `ORDER BY` содержит список выражений, к каждому из которых также может быть приписано `DESC` или `ASC` (направление сортировки). Если ничего не приписано - это аналогично приписыванию `ASC`. `ASC` - сортировка по возрастанию, `DESC` - сортировка по убыванию. Обозначение направления сортировки действует на одно выражение, а не на весь список. Пример: `ORDER BY Visits DESC, SearchPhrase`
+Секция `ORDER BY` содержит список выражений, к каждому из которых также может быть приписано `DESC` или `ASC` (направление сортировки). Если ничего не приписано - это аналогично приписыванию `ASC`. `ASC` - сортировка по возрастанию, `DESC` - сортировка по убыванию. Обозначение направления сортировки действует на одно выражение, а не на весь список. Пример: `ORDER BY Visits DESC, SearchPhrase`.
+
+Если вы хотите для сортировки данных указывать номера столбцов, а не названия, включите настройку [enable_positional_arguments](../../../operations/settings/settings.md#enable-positional-arguments).
Строки, для которых список выражений, по которым производится сортировка, принимает одинаковые значения, выводятся в произвольном порядке, который может быть также недетерминированным (каждый раз разным).
Если секция ORDER BY отсутствует, то, аналогично, порядок, в котором идут строки, не определён, и может быть недетерминированным.
diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp
index cd5d72cfba4..4ed5b114082 100644
--- a/programs/server/Server.cpp
+++ b/programs/server/Server.cpp
@@ -1159,7 +1159,6 @@ if (ThreadFuzzer::instance().isEffective())
UInt64 total_memory_profiler_step = config().getUInt64("total_memory_profiler_step", 0);
if (total_memory_profiler_step)
{
- total_memory_tracker.setOrRaiseProfilerLimit(total_memory_profiler_step);
total_memory_tracker.setProfilerStep(total_memory_profiler_step);
}
diff --git a/programs/server/config.xml b/programs/server/config.xml
index ba4c680d765..e38a6daeaed 100644
--- a/programs/server/config.xml
+++ b/programs/server/config.xml
@@ -62,6 +62,27 @@
-->
+
+
+
+
diff --git a/src/Client/ClientBase.cpp b/src/Client/ClientBase.cpp
index cde5a5f9977..b68df11fd60 100644
--- a/src/Client/ClientBase.cpp
+++ b/src/Client/ClientBase.cpp
@@ -426,10 +426,8 @@ void ClientBase::processTextAsSingleQuery(const String & full_query)
catch (Exception & e)
{
if (!is_interactive)
- {
e.addMessage("(in query: {})", full_query);
- throw;
- }
+ throw;
}
if (have_error)
diff --git a/src/Client/QueryFuzzer.cpp b/src/Client/QueryFuzzer.cpp
index aa9f89e47b5..ec267e44a99 100644
--- a/src/Client/QueryFuzzer.cpp
+++ b/src/Client/QueryFuzzer.cpp
@@ -144,7 +144,7 @@ Field QueryFuzzer::fuzzField(Field field)
{
size_t pos = fuzz_rand() % arr.size();
arr.erase(arr.begin() + pos);
- fprintf(stderr, "erased\n");
+ std::cerr << "erased\n";
}
if (fuzz_rand() % 5 == 0)
@@ -153,12 +153,12 @@ Field QueryFuzzer::fuzzField(Field field)
{
size_t pos = fuzz_rand() % arr.size();
arr.insert(arr.begin() + pos, fuzzField(arr[pos]));
- fprintf(stderr, "inserted (pos %zd)\n", pos);
+ std::cerr << fmt::format("inserted (pos {})\n", pos);
}
else
{
arr.insert(arr.begin(), getRandomField(0));
- fprintf(stderr, "inserted (0)\n");
+ std::cerr << "inserted (0)\n";
}
}
@@ -278,7 +278,7 @@ void QueryFuzzer::fuzzOrderByList(IAST * ast)
}
else
{
- fprintf(stderr, "no random col!\n");
+ std::cerr << "No random column.\n";
}
}
@@ -312,13 +312,9 @@ void QueryFuzzer::fuzzColumnLikeExpressionList(IAST * ast)
: impl->children.begin() + fuzz_rand() % impl->children.size();
auto col = getRandomColumnLike();
if (col)
- {
impl->children.insert(pos, col);
- }
else
- {
- fprintf(stderr, "no random col!\n");
- }
+ std::cerr << "No random column.\n";
}
// We don't have to recurse here to fuzz the children, this is handled by
diff --git a/src/Columns/ColumnFixedString.cpp b/src/Columns/ColumnFixedString.cpp
index ce39ab0994c..733ecaa979a 100644
--- a/src/Columns/ColumnFixedString.cpp
+++ b/src/Columns/ColumnFixedString.cpp
@@ -248,31 +248,23 @@ ColumnPtr ColumnFixedString::filter(const IColumn::Filter & filt, ssize_t result
        UInt16 mask = _mm_movemask_epi8(_mm_cmpeq_epi8(_mm_loadu_si128(reinterpret_cast<const __m128i *>(filt_pos)), zero16));
mask = ~mask;
- if (0 == mask)
- {
- /// Nothing is inserted.
- data_pos += chars_per_simd_elements;
- }
- else if (0xFFFF == mask)
+ if (0xFFFF == mask)
{
res->chars.insert(data_pos, data_pos + chars_per_simd_elements);
- data_pos += chars_per_simd_elements;
}
else
{
size_t res_chars_size = res->chars.size();
- for (size_t i = 0; i < SIMD_BYTES; ++i)
+ while (mask)
{
- if (filt_pos[i])
- {
- res->chars.resize(res_chars_size + n);
- memcpySmallAllowReadWriteOverflow15(&res->chars[res_chars_size], data_pos, n);
- res_chars_size += n;
- }
- data_pos += n;
+ size_t index = __builtin_ctz(mask);
+ res->chars.resize(res_chars_size + n);
+ memcpySmallAllowReadWriteOverflow15(&res->chars[res_chars_size], data_pos + index * n, n);
+ res_chars_size += n;
+ mask = mask & (mask - 1);
}
}
-
+ data_pos += chars_per_simd_elements;
filt_pos += SIMD_BYTES;
}
#endif
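A note on the rewritten filter loops (here and in `ColumnVector.cpp` and `ColumnsCommon.cpp` below): instead of testing all 16 filter bytes per SSE block, they visit only the set bits of the movemask. A minimal standalone sketch of the idiom, using plain arrays instead of ClickHouse columns (illustrative names, not repository code):

```cpp
#include <cstdint>
#include <vector>

/// Sketch, not repository code. `mask` has one bit per element, as produced
/// by _mm_movemask_epi8 over a 16-byte filter block. __builtin_ctz finds the
/// lowest set bit and mask & (mask - 1) clears it, so the body runs once per
/// selected element rather than once per position.
void copySelected(const int32_t * data, uint16_t mask, std::vector<int32_t> & out)
{
    while (mask)
    {
        size_t index = __builtin_ctz(mask);
        out.push_back(data[index]);
        mask = mask & (mask - 1);
    }
}
```

The all-ones mask (`0xFFFF`) keeps its dedicated bulk-copy branch because a single range insert is cheaper than sixteen single-element copies.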
diff --git a/src/Columns/ColumnVector.cpp b/src/Columns/ColumnVector.cpp
index 7f3cdaeec7f..a769cd93037 100644
--- a/src/Columns/ColumnVector.cpp
+++ b/src/Columns/ColumnVector.cpp
@@ -327,19 +327,18 @@ ColumnPtr ColumnVector::filter(const IColumn::Filter & filt, ssize_t result_s
        UInt16 mask = _mm_movemask_epi8(_mm_cmpeq_epi8(_mm_loadu_si128(reinterpret_cast<const __m128i *>(filt_pos)), zero16));
mask = ~mask;
- if (0 == mask)
- {
- /// Nothing is inserted.
- }
- else if (0xFFFF == mask)
+ if (0xFFFF == mask)
{
res_data.insert(data_pos, data_pos + SIMD_BYTES);
}
else
{
- for (size_t i = 0; i < SIMD_BYTES; ++i)
- if (filt_pos[i])
- res_data.push_back(data_pos[i]);
+ while (mask)
+ {
+ size_t index = __builtin_ctz(mask);
+ res_data.push_back(data_pos[index]);
+ mask = mask & (mask - 1);
+ }
}
filt_pos += SIMD_BYTES;
diff --git a/src/Columns/ColumnsCommon.cpp b/src/Columns/ColumnsCommon.cpp
index 41933ed08ed..a4d7de34382 100644
--- a/src/Columns/ColumnsCommon.cpp
+++ b/src/Columns/ColumnsCommon.cpp
@@ -241,11 +241,7 @@ namespace
zero_vec));
mask = ~mask;
- if (mask == 0)
- {
- /// SIMD_BYTES consecutive rows do not pass the filter
- }
- else if (mask == 0xffff)
+ if (mask == 0xffff)
{
/// SIMD_BYTES consecutive rows pass the filter
const auto first = offsets_pos == offsets_begin;
@@ -262,9 +258,12 @@ namespace
}
else
{
- for (size_t i = 0; i < SIMD_BYTES; ++i)
- if (filt_pos[i])
- copy_array(offsets_pos + i);
+ while (mask)
+ {
+ size_t index = __builtin_ctz(mask);
+ copy_array(offsets_pos + index);
+ mask = mask & (mask - 1);
+ }
}
filt_pos += SIMD_BYTES;
diff --git a/src/Common/MemoryTracker.cpp b/src/Common/MemoryTracker.cpp
index 205771a5f6c..013005442be 100644
--- a/src/Common/MemoryTracker.cpp
+++ b/src/Common/MemoryTracker.cpp
@@ -200,11 +200,13 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
}
+ bool allocation_traced = false;
if (unlikely(current_profiler_limit && will_be > current_profiler_limit))
{
BlockerInThread untrack_lock(VariableContext::Global);
DB::TraceCollector::collect(DB::TraceType::Memory, StackTrace(), size);
setOrRaiseProfilerLimit((will_be + profiler_step - 1) / profiler_step * profiler_step);
+ allocation_traced = true;
}
std::bernoulli_distribution sample(sample_probability);
@@ -212,6 +214,7 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
{
BlockerInThread untrack_lock(VariableContext::Global);
DB::TraceCollector::collect(DB::TraceType::MemorySample, StackTrace(), size);
+ allocation_traced = true;
}
if (unlikely(current_hard_limit && will_be > current_hard_limit) && memoryTrackerCanThrow(level, false) && throw_if_memory_exceeded)
@@ -230,17 +233,24 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
formatReadableSizeWithBinarySuffix(current_hard_limit));
}
+ bool peak_updated;
if (throw_if_memory_exceeded)
{
/// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
BlockerInThread untrack_lock(VariableContext::Global);
bool log_memory_usage = true;
- updatePeak(will_be, log_memory_usage);
+ peak_updated = updatePeak(will_be, log_memory_usage);
}
else
{
bool log_memory_usage = false;
- updatePeak(will_be, log_memory_usage);
+ peak_updated = updatePeak(will_be, log_memory_usage);
+ }
+
+ if (peak_updated && allocation_traced)
+ {
+ BlockerInThread untrack_lock(VariableContext::Global);
+ DB::TraceCollector::collect(DB::TraceType::MemoryPeak, StackTrace(), will_be);
}
if (auto * loaded_next = parent.load(std::memory_order_relaxed))
@@ -259,7 +269,7 @@ void MemoryTracker::allocNoThrow(Int64 size)
allocImpl(size, throw_if_memory_exceeded);
}
-void MemoryTracker::updatePeak(Int64 will_be, bool log_memory_usage)
+bool MemoryTracker::updatePeak(Int64 will_be, bool log_memory_usage)
{
auto peak_old = peak.load(std::memory_order_relaxed);
if (will_be > peak_old) /// Races doesn't matter. Could rewrite with CAS, but not worth.
@@ -269,7 +279,10 @@ void MemoryTracker::updatePeak(Int64 will_be, bool log_memory_usage)
if (log_memory_usage && (level == VariableContext::Process || level == VariableContext::Global)
&& will_be / log_peak_memory_usage_every > peak_old / log_peak_memory_usage_every)
logMemoryUsage(will_be);
+
+ return true;
}
+ return false;
}
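For context, the profiler limit raised in `allocImpl` is simply the current watermark rounded up to the next multiple of `profiler_step`, so one `Memory` trace fires per step of growth. A minimal sketch of that rounding (illustrative names, not the MemoryTracker API):

```cpp
#include <cstdint>

/// Sketch, not repository code: round the watermark `will_be` up to the next
/// multiple of `step`. After one allocation is traced, the next Memory trace
/// fires only once a full additional step of memory has been allocated.
int64_t nextProfilerLimit(int64_t will_be, int64_t step)
{
    return (will_be + step - 1) / step * step;
}
```

For example, with a 4 MiB step and a 9 MiB watermark the next limit is 12 MiB.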
diff --git a/src/Common/MemoryTracker.h b/src/Common/MemoryTracker.h
index 36560fec334..b860c611be2 100644
--- a/src/Common/MemoryTracker.h
+++ b/src/Common/MemoryTracker.h
@@ -58,9 +58,11 @@ private:
/// This description will be used as prefix into log messages (if isn't nullptr)
    std::atomic<const char *> description_ptr = nullptr;
- void updatePeak(Int64 will_be, bool log_memory_usage);
+ bool updatePeak(Int64 will_be, bool log_memory_usage);
void logMemoryUsage(Int64 current) const;
+ void setOrRaiseProfilerLimit(Int64 value);
+
public:
explicit MemoryTracker(VariableContext level_ = VariableContext::Thread);
explicit MemoryTracker(MemoryTracker * parent_, VariableContext level_ = VariableContext::Thread);
@@ -106,7 +108,6 @@ public:
* Otherwise, set limit to new value, if new value is greater than previous limit.
*/
void setOrRaiseHardLimit(Int64 value);
- void setOrRaiseProfilerLimit(Int64 value);
void setFaultProbability(double value)
{
@@ -121,6 +122,7 @@ public:
void setProfilerStep(Int64 value)
{
profiler_step = value;
+ setOrRaiseProfilerLimit(value);
}
/// next should be changed only once: from nullptr to some value.
diff --git a/src/Common/TraceCollector.h b/src/Common/TraceCollector.h
index 86e9d659d0a..d3bbc74726e 100644
--- a/src/Common/TraceCollector.h
+++ b/src/Common/TraceCollector.h
@@ -20,7 +20,8 @@ enum class TraceType : uint8_t
Real,
CPU,
Memory,
- MemorySample
+ MemorySample,
+ MemoryPeak,
};
class TraceCollector
diff --git a/src/Common/ZooKeeper/ZooKeeperImpl.cpp b/src/Common/ZooKeeper/ZooKeeperImpl.cpp
index d79a94169b2..cf607a3d70e 100644
--- a/src/Common/ZooKeeper/ZooKeeperImpl.cpp
+++ b/src/Common/ZooKeeper/ZooKeeperImpl.cpp
@@ -289,7 +289,7 @@ ZooKeeper::~ZooKeeper()
{
try
{
- finalize(false, false);
+ finalize(false, false, "destructor called");
if (send_thread.joinable())
send_thread.join();
@@ -299,7 +299,7 @@ ZooKeeper::~ZooKeeper()
}
catch (...)
{
- tryLogCurrentException(__PRETTY_FUNCTION__);
+ tryLogCurrentException(log);
}
}
@@ -317,6 +317,7 @@ ZooKeeper::ZooKeeper(
session_timeout(session_timeout_),
operation_timeout(std::min(operation_timeout_, session_timeout_))
{
+ log = &Poco::Logger::get("ZooKeeperClient");
std::atomic_store(&zk_log, std::move(zk_log_));
if (!root_path.empty())
@@ -450,6 +451,10 @@ void ZooKeeper::connect(
message << fail_reasons.str() << "\n";
throw Exception(message.str(), Error::ZCONNECTIONLOSS);
}
+ else
+ {
+ LOG_TEST(log, "Connected to ZooKeeper at {} with session_id {}", socket.peerAddress().toString(), session_id);
+ }
}
@@ -604,8 +609,8 @@ void ZooKeeper::sendThread()
}
catch (...)
{
- tryLogCurrentException(__PRETTY_FUNCTION__);
- finalize(true, false);
+ tryLogCurrentException(log);
+ finalize(true, false, "exception in sendThread");
}
}
@@ -663,8 +668,8 @@ void ZooKeeper::receiveThread()
}
catch (...)
{
- tryLogCurrentException(__PRETTY_FUNCTION__);
- finalize(false, true);
+ tryLogCurrentException(log);
+ finalize(false, true, "exception in receiveThread");
}
}
@@ -799,7 +804,7 @@ void ZooKeeper::receiveEvent()
}
catch (...)
{
- tryLogCurrentException(__PRETTY_FUNCTION__);
+ tryLogCurrentException(log);
/// Unrecoverable. Don't leave incorrect state in memory.
if (!response)
@@ -819,7 +824,7 @@ void ZooKeeper::receiveEvent()
catch (...)
{
/// Throw initial exception, not exception from callback.
- tryLogCurrentException(__PRETTY_FUNCTION__);
+ tryLogCurrentException(log);
}
throw;
@@ -832,10 +837,15 @@ void ZooKeeper::receiveEvent()
}
-void ZooKeeper::finalize(bool error_send, bool error_receive)
+void ZooKeeper::finalize(bool error_send, bool error_receive, const String & reason)
{
/// If some thread (send/receive) already finalizing session don't try to do it
- if (finalization_started.exchange(true))
+ bool already_started = finalization_started.exchange(true);
+
+ LOG_TEST(log, "Finalizing session {}: finalization_started={}, queue_closed={}, reason={}",
+ session_id, already_started, requests_queue.isClosed(), reason);
+
+ if (already_started)
return;
auto expire_session_if_not_expired = [&]
@@ -860,7 +870,7 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
/// This happens for example, when "Cannot push request to queue within operation timeout".
/// Just mark session expired in case of error on close request, otherwise sendThread may not stop.
expire_session_if_not_expired();
- tryLogCurrentException(__PRETTY_FUNCTION__);
+ tryLogCurrentException(log);
}
/// Send thread will exit after sending close request or on expired flag
@@ -879,7 +889,7 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
catch (...)
{
/// We must continue to execute all callbacks, because the user is waiting for them.
- tryLogCurrentException(__PRETTY_FUNCTION__);
+ tryLogCurrentException(log);
}
if (!error_receive && receive_thread.joinable())
@@ -908,7 +918,7 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
catch (...)
{
/// We must continue to all other callbacks, because the user is waiting for them.
- tryLogCurrentException(__PRETTY_FUNCTION__);
+ tryLogCurrentException(log);
}
}
}
@@ -939,7 +949,7 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
}
catch (...)
{
- tryLogCurrentException(__PRETTY_FUNCTION__);
+ tryLogCurrentException(log);
}
}
}
@@ -967,7 +977,7 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
}
catch (...)
{
- tryLogCurrentException(__PRETTY_FUNCTION__);
+ tryLogCurrentException(log);
}
}
}
@@ -983,14 +993,14 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
}
catch (...)
{
- tryLogCurrentException(__PRETTY_FUNCTION__);
+ tryLogCurrentException(log);
}
}
}
}
catch (...)
{
- tryLogCurrentException(__PRETTY_FUNCTION__);
+ tryLogCurrentException(log);
}
}
@@ -1028,7 +1038,7 @@ void ZooKeeper::pushRequest(RequestInfo && info)
}
catch (...)
{
- finalize(false, false);
+ finalize(false, false, getCurrentExceptionMessage(false, false, false));
throw;
}
diff --git a/src/Common/ZooKeeper/ZooKeeperImpl.h b/src/Common/ZooKeeper/ZooKeeperImpl.h
index ce37ca7b650..53908e5b0c7 100644
--- a/src/Common/ZooKeeper/ZooKeeperImpl.h
+++ b/src/Common/ZooKeeper/ZooKeeperImpl.h
@@ -187,7 +187,7 @@ public:
/// it will do read in another session, that read may not see the
/// already performed write.
- void finalize() override { finalize(false, false); }
+ void finalize() override { finalize(false, false, "unknown"); }
    void setZooKeeperLog(std::shared_ptr<DB::ZooKeeperLog> zk_log_);
@@ -240,6 +240,8 @@ private:
ThreadFromGlobalPool send_thread;
ThreadFromGlobalPool receive_thread;
+ Poco::Logger * log;
+
void connect(
const Nodes & node,
Poco::Timespan connection_timeout);
@@ -257,7 +259,7 @@ private:
void close();
/// Call all remaining callbacks and watches, passing errors to them.
- void finalize(bool error_send, bool error_receive);
+ void finalize(bool error_send, bool error_receive, const String & reason);
    template <typename T>
    void write(const T &);
diff --git a/src/Core/examples/CMakeLists.txt b/src/Core/examples/CMakeLists.txt
index 6b07dfbbfa6..c8846eb1743 100644
--- a/src/Core/examples/CMakeLists.txt
+++ b/src/Core/examples/CMakeLists.txt
@@ -13,3 +13,6 @@ target_link_libraries (mysql_protocol PRIVATE dbms)
if(USE_SSL)
target_include_directories (mysql_protocol SYSTEM PRIVATE ${OPENSSL_INCLUDE_DIR})
endif()
+
+add_executable (coro coro.cpp)
+target_link_libraries (coro PRIVATE clickhouse_common_io)
diff --git a/src/Core/examples/coro.cpp b/src/Core/examples/coro.cpp
new file mode 100644
index 00000000000..0f152d8090a
--- /dev/null
+++ b/src/Core/examples/coro.cpp
@@ -0,0 +1,189 @@
+#include <cassert>
+#include <iostream>
+#include <string>
+#include <optional>
+
+#include <Common/Exception.h>
+#include <common/logger_useful.h>
+#include <Poco/ConsoleChannel.h>
+#include <Poco/Logger.h>
+#include <Poco/AutoPtr.h>
+
+#if defined(__clang__)
+#include <experimental/coroutine>
+
+namespace std
+{
+ using namespace experimental::coroutines_v1;
+}
+
+#else
+#include <coroutine>
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wzero-as-null-pointer-constant"
+#endif
+
+
+template <typename T>
+struct suspend_value // NOLINT(readability-identifier-naming)
+{
+ constexpr bool await_ready() const noexcept { return true; } // NOLINT(readability-identifier-naming)
+ constexpr void await_suspend(std::coroutine_handle<>) const noexcept {} // NOLINT(readability-identifier-naming)
+ constexpr T await_resume() const noexcept // NOLINT(readability-identifier-naming)
+ {
+ std::cout << " ret " << val << std::endl;
+ return val;
+ }
+
+ T val;
+};
+
+template <typename T>
+struct Task
+{
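+    /// Coroutine state for Task<T>: the coroutine starts eagerly
+    /// (initial_suspend returns suspend_never), co_return stores its value
+    /// into the owning Task, and unhandled exceptions are saved so that
+    /// resumeImpl can rethrow them on the caller's side.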
+ struct promise_type // NOLINT(readability-identifier-naming)
+ {
+        using coro_handle = std::coroutine_handle<promise_type>;
+ auto get_return_object() { return coro_handle::from_promise(*this); } // NOLINT(readability-identifier-naming)
+ auto initial_suspend() { return std::suspend_never(); } // NOLINT(readability-identifier-naming)
+        auto final_suspend() noexcept { return suspend_value<T>{*r->value}; } // NOLINT(readability-identifier-naming)
+ //void return_void() {}
+ void return_value(T value_) { r->value = value_; } // NOLINT(readability-identifier-naming)
+ void unhandled_exception() // NOLINT(readability-identifier-naming)
+ {
+ DB::tryLogCurrentException("Logger");
+ r->exception = std::current_exception(); // NOLINT(bugprone-throw-keyword-missing)
+ }
+
+ explicit promise_type(std::string tag_) : tag(tag_) {}
+ ~promise_type() { std::cout << "~promise_type " << tag << std::endl; }
+ std::string tag;
+ coro_handle next;
+ Task * r = nullptr;
+ };
+
+    using coro_handle = std::coroutine_handle<promise_type>;
+
+ bool await_ready() const noexcept { return false; } // NOLINT(readability-identifier-naming)
+ void await_suspend(coro_handle g) noexcept // NOLINT(readability-identifier-naming)
+ {
+ std::cout << " await_suspend " << my.promise().tag << std::endl;
+ std::cout << " g tag " << g.promise().tag << std::endl;
+ g.promise().next = my;
+ }
+ T await_resume() noexcept // NOLINT(readability-identifier-naming)
+ {
+ std::cout << " await_res " << my.promise().tag << std::endl;
+ return *value;
+ }
+
+ Task(coro_handle handle) : my(handle), tag(handle.promise().tag) // NOLINT(google-explicit-constructor)
+ {
+ assert(handle);
+ my.promise().r = this;
+ std::cout << " Task " << tag << std::endl;
+ }
+ Task(Task &) = delete;
+ Task(Task &&rhs) : my(rhs.my), tag(rhs.tag)
+ {
+ rhs.my = {};
+ std::cout << " Task&& " << tag << std::endl;
+ }
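+
+    /// Resume the deepest suspended coroutine in the await chain first;
+    /// returns true while the task has not yet produced its value,
+    /// i.e. while more resume() calls are needed.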
+ static bool resumeImpl(Task *r)
+ {
+ if (r->value)
+ return false;
+
+ auto & next = r->my.promise().next;
+
+ if (next)
+ {
+ if (resumeImpl(next.promise().r))
+ return true;
+ next = {};
+ }
+
+ if (!r->value)
+ {
+ r->my.resume();
+ if (r->exception)
+ std::rethrow_exception(r->exception);
+ }
+ return !r->value;
+ }
+
+ bool resume()
+ {
+ return resumeImpl(this);
+ }
+
+ T res()
+ {
+ return *value;
+ }
+
+ ~Task()
+ {
+ std::cout << " ~Task " << tag << std::endl;
+ }
+
+private:
+ coro_handle my;
+ std::string tag;
+    std::optional<T> value;
+ std::exception_ptr exception;
+};
+
+Task<int> boo([[maybe_unused]] std::string tag)
+{
+ std::cout << "x" << std::endl;
+ co_await std::suspend_always();
+ std::cout << StackTrace().toString();
+ std::cout << "y" << std::endl;
+ co_return 1;
+}
+
+Task<int> bar([[maybe_unused]] std::string tag)
+{
+ std::cout << "a" << std::endl;
+ int res1 = co_await boo("boo1");
+ std::cout << "b " << res1 << std::endl;
+ int res2 = co_await boo("boo2");
+ if (res2 == 1)
+ throw DB::Exception(1, "hello");
+ std::cout << "c " << res2 << std::endl;
+ co_return res1 + res2; // 1 + 1 = 2
+}
+
+Task<int> foo([[maybe_unused]] std::string tag)
+{
+ std::cout << "Hello" << std::endl;
+ auto res1 = co_await bar("bar1");
+ std::cout << "Coro " << res1 << std::endl;
+ auto res2 = co_await bar("bar2");
+ std::cout << "World " << res2 << std::endl;
+ co_return res1 * res2; // 2 * 2 = 4
+}
+
+int main()
+{
+    Poco::AutoPtr<Poco::ConsoleChannel> app_channel(new Poco::ConsoleChannel(std::cerr));
+ Poco::Logger::root().setChannel(app_channel);
+ Poco::Logger::root().setLevel("trace");
+
+ LOG_INFO(&Poco::Logger::get(""), "Starting");
+
+ try
+ {
+ auto t = foo("foo");
+ std::cout << ".. started" << std::endl;
+ while (t.resume())
+ std::cout << ".. yielded" << std::endl;
+ std::cout << ".. done: " << t.res() << std::endl;
+ }
+ catch (DB::Exception & e)
+ {
+ std::cout << "Got exception " << e.what() << std::endl;
+ std::cout << e.getStackTraceString() << std::endl;
+ }
+}
diff --git a/src/DataStreams/ColumnGathererStream.cpp b/src/DataStreams/ColumnGathererStream.cpp
index 9018870f3a4..9b2fac79bb0 100644
--- a/src/DataStreams/ColumnGathererStream.cpp
+++ b/src/DataStreams/ColumnGathererStream.cpp
@@ -11,104 +11,157 @@ namespace DB
namespace ErrorCodes
{
- extern const int INCOMPATIBLE_COLUMNS;
extern const int INCORRECT_NUMBER_OF_COLUMNS;
extern const int EMPTY_DATA_PASSED;
extern const int RECEIVED_EMPTY_DATA;
}
ColumnGathererStream::ColumnGathererStream(
- const String & column_name_, const BlockInputStreams & source_streams, ReadBuffer & row_sources_buf_,
- size_t block_preferred_size_)
- : column_name(column_name_), sources(source_streams.size()), row_sources_buf(row_sources_buf_)
- , block_preferred_size(block_preferred_size_), log(&Poco::Logger::get("ColumnGathererStream"))
+ size_t num_inputs, ReadBuffer & row_sources_buf_, size_t block_preferred_size_)
+ : sources(num_inputs), row_sources_buf(row_sources_buf_)
+ , block_preferred_size(block_preferred_size_)
{
- if (source_streams.empty())
+ if (num_inputs == 0)
throw Exception("There are no streams to gather", ErrorCodes::EMPTY_DATA_PASSED);
+}
- children.assign(source_streams.begin(), source_streams.end());
-
- for (size_t i = 0; i < children.size(); ++i)
+void ColumnGathererStream::initialize(Inputs inputs)
+{
+ for (size_t i = 0; i < inputs.size(); ++i)
{
- const Block & header = children[i]->getHeader();
-
- /// Sometimes MergeTreeReader injects additional column with partitioning key
- if (header.columns() > 2)
- throw Exception(
- "Block should have 1 or 2 columns, but contains " + toString(header.columns()),
- ErrorCodes::INCORRECT_NUMBER_OF_COLUMNS);
-
- if (i == 0)
+ if (inputs[i].chunk)
{
- column.name = column_name;
- column.type = header.getByName(column_name).type;
- column.column = column.type->createColumn();
+ sources[i].update(inputs[i].chunk.detachColumns().at(0));
+ if (!result_column)
+ result_column = sources[i].column->cloneEmpty();
}
- else if (header.getByName(column_name).column->getName() != column.column->getName())
- throw Exception("Column types don't match", ErrorCodes::INCOMPATIBLE_COLUMNS);
}
}
-
-Block ColumnGathererStream::readImpl()
+IMergingAlgorithm::Status ColumnGathererStream::merge()
{
+ /// Nothing to read after initialize.
+ if (!result_column)
+ return Status(Chunk(), true);
+
+ if (source_to_fully_copy) /// Was set on a previous iteration
+ {
+ Chunk res;
+ res.addColumn(source_to_fully_copy->column);
+ merged_rows += source_to_fully_copy->size;
+ source_to_fully_copy->pos = source_to_fully_copy->size;
+ source_to_fully_copy = nullptr;
+ return Status(std::move(res));
+ }
+
/// Special case: single source and there are no skipped rows
- if (children.size() == 1 && row_sources_buf.eof() && !source_to_fully_copy)
- return children[0]->read();
+ /// Note: looks like this should never happen because row_sources_buf cannot just skip row info.
+ if (sources.size() == 1 && row_sources_buf.eof())
+ {
+ if (sources.front().pos < sources.front().size)
+ {
+ next_required_source = 0;
+ Chunk res;
+ merged_rows += sources.front().column->size();
+ merged_bytes += sources.front().column->allocatedBytes();
+ res.addColumn(std::move(sources.front().column));
+ sources.front().pos = sources.front().size = 0;
+ return Status(std::move(res));
+ }
- if (!source_to_fully_copy && row_sources_buf.eof())
- return Block();
+ if (next_required_source == -1)
+ return Status(Chunk(), true);
- MutableColumnPtr output_column = column.column->cloneEmpty();
- output_block = Block{column.cloneEmpty()};
- /// Surprisingly this call may directly change output_block, bypassing
+ next_required_source = 0;
+ return Status(next_required_source);
+ }
+
+ if (next_required_source != -1 && sources[next_required_source].size == 0)
+ throw Exception("Cannot fetch required block. Source " + toString(next_required_source), ErrorCodes::RECEIVED_EMPTY_DATA);
+
+    /// Surprisingly this call may directly change some internal state of ColumnGathererStream,
+    /// bypassing result_column. See ColumnGathererStream::gather.
- output_column->gather(*this);
- if (!output_column->empty())
- output_block.getByPosition(0).column = std::move(output_column);
+ result_column->gather(*this);
- return output_block;
+ if (next_required_source != -1)
+ return Status(next_required_source);
+
+ if (source_to_fully_copy && result_column->empty())
+ {
+ Chunk res;
+ merged_rows += source_to_fully_copy->column->size();
+ merged_bytes += source_to_fully_copy->column->allocatedBytes();
+ res.addColumn(source_to_fully_copy->column);
+ source_to_fully_copy->pos = source_to_fully_copy->size;
+ source_to_fully_copy = nullptr;
+ return Status(std::move(res));
+ }
+
+ auto col = result_column->cloneEmpty();
+ result_column.swap(col);
+
+ Chunk res;
+ merged_rows += col->size();
+ merged_bytes += col->allocatedBytes();
+ res.addColumn(std::move(col));
+ return Status(std::move(res), row_sources_buf.eof() && !source_to_fully_copy);
}
-void ColumnGathererStream::fetchNewBlock(Source & source, size_t source_num)
+void ColumnGathererStream::consume(Input & input, size_t source_num)
{
- try
- {
- source.block = children[source_num]->read();
- source.update(column_name);
- }
- catch (Exception & e)
- {
- e.addMessage("Cannot fetch required block. Stream " + children[source_num]->getName() + ", part " + toString(source_num));
- throw;
- }
+ auto & source = sources[source_num];
+ if (input.chunk)
+ source.update(input.chunk.getColumns().at(0));
if (0 == source.size)
{
- throw Exception("Fetched block is empty. Stream " + children[source_num]->getName() + ", part " + toString(source_num),
+ throw Exception("Fetched block is empty. Source " + toString(source_num),
ErrorCodes::RECEIVED_EMPTY_DATA);
}
}
-
-void ColumnGathererStream::readSuffixImpl()
+ColumnGathererTransform::ColumnGathererTransform(
+ const Block & header,
+ size_t num_inputs,
+ ReadBuffer & row_sources_buf_,
+ size_t block_preferred_size_)
+    : IMergingTransform<ColumnGathererStream>(
+ num_inputs, header, header, /*have_all_inputs_=*/ true, /*has_limit_below_one_block_=*/ false,
+ num_inputs, row_sources_buf_, block_preferred_size_)
+ , log(&Poco::Logger::get("ColumnGathererStream"))
{
- const BlockStreamProfileInfo & profile_info = getProfileInfo();
+ if (header.columns() != 1)
+ throw Exception(
+ "Header should have 1 column, but contains " + toString(header.columns()),
+ ErrorCodes::INCORRECT_NUMBER_OF_COLUMNS);
+}
+void ColumnGathererTransform::work()
+{
+ Stopwatch stopwatch;
+ IMergingTransform::work();
+ elapsed_ns += stopwatch.elapsedNanoseconds();
+}
+
+void ColumnGathererTransform::onFinish()
+{
+ auto merged_rows = algorithm.getMergedRows();
+    auto merged_bytes = algorithm.getMergedBytes();
/// Don't print info for small parts (< 10M rows)
- if (profile_info.rows < 10000000)
+ if (merged_rows < 10000000)
return;
- double seconds = profile_info.total_stopwatch.elapsedSeconds();
+    double seconds = static_cast<double>(elapsed_ns) / 1000000000ULL;
+ const auto & column_name = getOutputPort().getHeader().getByPosition(0).name;
if (!seconds)
LOG_DEBUG(log, "Gathered column {} ({} bytes/elem.) in 0 sec.",
-            column_name, static_cast<double>(profile_info.bytes) / profile_info.rows);
+            column_name, static_cast<double>(merged_bytes) / merged_rows);
else
LOG_DEBUG(log, "Gathered column {} ({} bytes/elem.) in {} sec., {} rows/sec., {}/sec.",
-            column_name, static_cast<double>(profile_info.bytes) / profile_info.rows, seconds,
-            profile_info.rows / seconds, ReadableSize(profile_info.bytes / seconds));
+            column_name, static_cast<double>(merged_bytes) / merged_rows, seconds,
+            merged_rows / seconds, ReadableSize(merged_bytes / seconds));
}
}
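For context, the algorithm above is driven by `row_sources_buf`: a stream of marks naming, for each output row, the input that contributes it. A minimal sketch of that core idea, stripped of chunking and the IMergingAlgorithm plumbing (all names illustrative, not repository code):

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

/// Sketch, not repository code. row_sources[i] names the input that
/// contributes output row i; each input is consumed strictly in order,
/// which is how rows of merged parts are stitched back together per column.
std::vector<int64_t> gatherByRowSources(
    const std::vector<std::vector<int64_t>> & inputs,
    const std::vector<uint8_t> & row_sources)
{
    std::vector<size_t> pos(inputs.size(), 0); /// next unread row per input
    std::vector<int64_t> result;
    result.reserve(row_sources.size());
    for (uint8_t source : row_sources)
        result.push_back(inputs[source][pos[source]++]);
    return result;
}
```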
diff --git a/src/DataStreams/ColumnGathererStream.h b/src/DataStreams/ColumnGathererStream.h
index 05665ab3f42..2d013e596ce 100644
--- a/src/DataStreams/ColumnGathererStream.h
+++ b/src/DataStreams/ColumnGathererStream.h
@@ -1,8 +1,9 @@
#pragma once
-#include <DataStreams/IBlockInputStream.h>
#include <Common/PODArray.h>
#include <IO/ReadBuffer.h>
+#include <Processors/Merges/Algorithms/IMergingAlgorithm.h>
+#include <Processors/Merges/IMergingTransform.h>
namespace Poco { class Logger; }
using MergedRowSources = PODArray<RowSourcePart>;
* Stream mask maps row number to index of source stream.
* Streams should contain exactly one column.
*/
-class ColumnGathererStream : public IBlockInputStream
+class ColumnGathererStream final : public IMergingAlgorithm
{
public:
- ColumnGathererStream(
- const String & column_name_, const BlockInputStreams & source_streams, ReadBuffer & row_sources_buf_,
- size_t block_preferred_size_ = DEFAULT_BLOCK_SIZE);
+ ColumnGathererStream(size_t num_inputs, ReadBuffer & row_sources_buf_, size_t block_preferred_size_ = DEFAULT_BLOCK_SIZE);
- String getName() const override { return "ColumnGatherer"; }
-
- Block readImpl() override;
-
- void readSuffixImpl() override;
-
- Block getHeader() const override { return children.at(0)->getHeader(); }
+ void initialize(Inputs inputs) override;
+ void consume(Input & input, size_t source_num) override;
+ Status merge() override;
/// for use in implementations of IColumn::gather()
    template <typename Column>
void gather(Column & column_res);
+ UInt64 getMergedRows() const { return merged_rows; }
+ UInt64 getMergedBytes() const { return merged_bytes; }
+
private:
/// Cache required fields
struct Source
{
- const IColumn * column = nullptr;
+ ColumnPtr column;
size_t pos = 0;
size_t size = 0;
- Block block;
- void update(const String & name)
+ void update(ColumnPtr column_)
{
- column = block.getByName(name).column.get();
- size = block.rows();
+ column = std::move(column_);
+ size = column->size();
pos = 0;
}
};
- void fetchNewBlock(Source & source, size_t source_num);
-
- String column_name;
- ColumnWithTypeAndName column;
+ MutableColumnPtr result_column;
    std::vector<Source> sources;