Merge branch 'master' of https://github.com/ClickHouse/ClickHouse into filelog-engine

This commit is contained in:
feng lv 2021-10-11 06:09:43 +00:00
commit a57e97d2e0
199 changed files with 2015 additions and 1839 deletions

View File

@ -1,4 +1,4 @@
### ClickHouse release v21.10, 2021-10-08
### ClickHouse release v21.10, 2021-10-14
#### Backward Incompatible Change
@ -110,6 +110,7 @@
* Fix the issue that in case of some sophisticated query with column aliases identical to the names of expressions, bad cast may happen. This fixes [#25447](https://github.com/ClickHouse/ClickHouse/issues/25447). This fixes [#26914](https://github.com/ClickHouse/ClickHouse/issues/26914). This fix may introduce backward incompatibility: if there are different expressions with identical names, exception will be thrown. It may break some rare cases when `enable_optimize_predicate_expression` is set. [#26639](https://github.com/ClickHouse/ClickHouse/pull/26639) ([alexey-milovidov](https://github.com/alexey-milovidov)).
* Now, scalar subquery always returns `Nullable` result if it's type can be `Nullable`. It is needed because in case of empty subquery it's result should be `Null`. Previously, it was possible to get error about incompatible types (type deduction does not execute scalar subquery, and it could use not-nullable type). Scalar subquery with empty result which can't be converted to `Nullable` (like `Array` or `Tuple`) now throws error. Fixes [#25411](https://github.com/ClickHouse/ClickHouse/issues/25411). [#26423](https://github.com/ClickHouse/ClickHouse/pull/26423) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).
* Introduce syntax for here documents. Example `SELECT $doc$ VALUE $doc$`. [#26671](https://github.com/ClickHouse/ClickHouse/pull/26671) ([Maksim Kita](https://github.com/kitaisreal)). This change is backward incompatible if in query there are identifiers that contain `$` [#28768](https://github.com/ClickHouse/ClickHouse/issues/28768).
* Now indices can handle Nullable types, including `isNull` and `isNotNull`. [#12433](https://github.com/ClickHouse/ClickHouse/pull/12433) and [#12455](https://github.com/ClickHouse/ClickHouse/pull/12455) ([Amos Bird](https://github.com/amosbird)) and [#27250](https://github.com/ClickHouse/ClickHouse/pull/27250) ([Azat Khuzhin](https://github.com/azat)). But this was done with on-disk format changes, and even though new server can read old data, old server cannot. Also, in case you have `MINMAX` data skipping indices, you may get `Data after mutation/merge is not byte-identical` error, since new index will have `.idx2` extension while before it was `.idx`. That said, that you should not delay updating all existing replicas, in this case, otherwise, if old replica (<21.9) will download data from new replica with 21.9+ it will not be able to apply index for downloaded part.
#### New Feature
@ -179,7 +180,6 @@
* Add setting `log_formatted_queries` to log additional formatted query into `system.query_log`. It's useful for normalized query analysis because functions like `normalizeQuery` and `normalizeQueryKeepNames` don't parse/format queries in order to achieve better performance. [#27380](https://github.com/ClickHouse/ClickHouse/pull/27380) ([Amos Bird](https://github.com/amosbird)).
* Add two settings `max_hyperscan_regexp_length` and `max_hyperscan_regexp_total_length` to prevent huge regexp being used in hyperscan related functions, such as `multiMatchAny`. [#27378](https://github.com/ClickHouse/ClickHouse/pull/27378) ([Amos Bird](https://github.com/amosbird)).
* Memory consumed by bitmap aggregate functions now is taken into account for memory limits. This closes [#26555](https://github.com/ClickHouse/ClickHouse/issues/26555). [#27252](https://github.com/ClickHouse/ClickHouse/pull/27252) ([alexey-milovidov](https://github.com/alexey-milovidov)).
* Add new index data skipping minmax index format for proper Nullable support. [#27250](https://github.com/ClickHouse/ClickHouse/pull/27250) ([Azat Khuzhin](https://github.com/azat)).
* Add 10 seconds cache for S3 proxy resolver. [#27216](https://github.com/ClickHouse/ClickHouse/pull/27216) ([ianton-ru](https://github.com/ianton-ru)).
* Split global mutex into individual regexp construction. This helps avoid huge regexp construction blocking other related threads. [#27211](https://github.com/ClickHouse/ClickHouse/pull/27211) ([Amos Bird](https://github.com/amosbird)).
* Support schema for PostgreSQL database engine. Closes [#27166](https://github.com/ClickHouse/ClickHouse/issues/27166). [#27198](https://github.com/ClickHouse/ClickHouse/pull/27198) ([Kseniia Sumarokova](https://github.com/kssenii)).
@ -234,7 +234,6 @@
* Fix multiple block insertion into distributed table with `insert_distributed_one_random_shard = 1`. This is a marginal feature. Mark as improvement. [#23140](https://github.com/ClickHouse/ClickHouse/pull/23140) ([Amos Bird](https://github.com/amosbird)).
* Support `LowCardinality` and `FixedString` keys/values for `Map` type. [#21543](https://github.com/ClickHouse/ClickHouse/pull/21543) ([hexiaoting](https://github.com/hexiaoting)).
* Enable reloading of local disk config. [#19526](https://github.com/ClickHouse/ClickHouse/pull/19526) ([taiyang-li](https://github.com/taiyang-li)).
* Now KeyConditions can correctly skip nullable keys, including `isNull` and `isNotNull`. https://github.com/ClickHouse/ClickHouse/pull/12433. [#12455](https://github.com/ClickHouse/ClickHouse/pull/12455) ([Amos Bird](https://github.com/amosbird)).
#### Bug Fix

View File

@ -336,6 +336,10 @@ if (COMPILER_GCC OR COMPILER_CLANG)
set(COMPILER_FLAGS "${COMPILER_FLAGS} -falign-functions=32")
endif ()
if (COMPILER_GCC)
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fcoroutines")
endif ()
# Compiler-specific coverage flags e.g. -fcoverage-mapping for gcc
option(WITH_COVERAGE "Profile the resulting binary/binaries" OFF)

View File

@ -1,7 +1,7 @@
# Security Policy
## Security Announcements
Security fixes will be announced by posting them in the [security changelog](https://clickhouse.com/docs/en/whats-new/security-changelog/)
Security fixes will be announced by posting them in the [security changelog](https://clickhouse.com/docs/en/whats-new/security-changelog/).
## Scope and Supported Versions
@ -12,17 +12,7 @@ The following versions of ClickHouse server are currently being supported with s
| 1.x | :x: |
| 18.x | :x: |
| 19.x | :x: |
| 20.1 | :x: |
| 20.3 | :x: |
| 20.4 | :x: |
| 20.5 | :x: |
| 20.6 | :x: |
| 20.7 | :x: |
| 20.8 | :x: |
| 20.9 | :x: |
| 20.10 | :x: |
| 20.11 | :x: |
| 20.12 | :x: |
| 20.x | :x: |
| 21.1 | :x: |
| 21.2 | :x: |
| 21.3 | ✅ |

View File

@ -13,7 +13,6 @@ endif ()
if ((ARCH_ARM AND NOT ARCH_AARCH64) OR ARCH_I386)
message (FATAL_ERROR "32bit platforms are not supported")
endif ()
if (CMAKE_SYSTEM_PROCESSOR MATCHES "^(ppc64le.*|PPC64LE.*)")
set (ARCH_PPC64LE 1)
endif ()

View File

@ -1,14 +1,7 @@
if (CMAKE_VERSION VERSION_GREATER_EQUAL "3.12")
macro(add_glob cur_list)
file(GLOB __tmp RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} CONFIGURE_DEPENDS ${ARGN})
list(APPEND ${cur_list} ${__tmp})
endmacro()
else ()
macro(add_glob cur_list)
file(GLOB __tmp RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} ${ARGN})
list(APPEND ${cur_list} ${__tmp})
endmacro()
endif ()
macro(add_glob cur_list)
file(GLOB __tmp RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} ${ARGN})
list(APPEND ${cur_list} ${__tmp})
endmacro()
macro(add_headers_and_sources prefix common_path)
add_glob(${prefix}_headers ${CMAKE_CURRENT_SOURCE_DIR} ${common_path}/*.h)

2
contrib/cctz vendored

@ -1 +1 @@
Subproject commit c0f1bcb97fd2782f7c3f972fadd5aad5affac4b8
Subproject commit 9edd0861d8328b2ae77e8fb5f4d7dcd1cf33b42b

@ -1 +1 @@
Subproject commit a720b7105a610acbd7427eea475a5b6810c151eb
Subproject commit aa5429bf67a346e48ad60efd88bcefc286644bf3

2
contrib/libcxx vendored

@ -1 +1 @@
Subproject commit 2fa892f69acbaa40f8a18c6484854a6183a34482
Subproject commit 61e60294b1de01483caa9f5d00f437c99b674de6

View File

@ -67,7 +67,7 @@ RUN apt-get update \
unixodbc \
--yes --no-install-recommends
RUN pip3 install numpy scipy pandas Jinja2
RUN pip3 install numpy scipy pandas Jinja2 pandas clickhouse_driver
# This symlink required by gcc to find lld compiler
RUN ln -s /usr/bin/lld-${LLVM_VERSION} /usr/bin/ld.lld

View File

@ -27,7 +27,7 @@ RUN apt-get update \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
RUN pip3 install Jinja2
RUN pip3 install Jinja2 pandas clickhouse_driver
COPY * /

View File

@ -125,25 +125,9 @@ function fuzz
# interferes with gdb
export CLICKHOUSE_WATCHDOG_ENABLE=0
# NOTE: that $! cannot be used to obtain the server pid, since it will be
# the pid of the bash, due to piping the output of clickhouse-server to
# tail
PID_FILE=clickhouse-server.pid
clickhouse-server --pidfile=$PID_FILE --config-file db/config.xml -- --path db 2>&1 | tail -100000 > server.log &
server_pid=-1
for _ in {1..60}; do
if [ -s $PID_FILE ]; then
server_pid=$(cat $PID_FILE)
break
fi
sleep 1
done
if [ $server_pid = -1 ]; then
echo "Server did not started" >&2
exit 1
fi
# NOTE: we use process substitution here to preserve keep $! as a pid of clickhouse-server
clickhouse-server --config-file db/config.xml -- --path db > >(tail -100000 > server.log) 2>&1 &
server_pid=$!
kill -0 $server_pid

View File

@ -34,7 +34,7 @@ RUN apt-get update -y \
postgresql-client \
sqlite3
RUN pip3 install numpy scipy pandas Jinja2
RUN pip3 install numpy scipy pandas Jinja2 clickhouse_driver
RUN mkdir -p /tmp/clickhouse-odbc-tmp \
&& wget -nv -O - ${odbc_driver_url} | tar --strip-components=1 -xz -C /tmp/clickhouse-odbc-tmp \

View File

@ -10,7 +10,7 @@ RUN apt-get update && env DEBIAN_FRONTEND=noninteractive apt-get install --yes \
python3-pip \
pylint \
yamllint \
&& pip3 install codespell
&& pip3 install codespell pandas clickhouse_driver
COPY run.sh /
COPY process_style_check_result.py /

View File

@ -102,6 +102,7 @@ toc_title: Adopters
| <a href="https://www.rbinternational.com/" class="favicon">Raiffeisenbank</a> | Banking | Analytics | — | — | [Lecture in Russian, December 2020](https://cs.hse.ru/announcements/421965599.html) |
| <a href="https://rambler.ru" class="favicon">Rambler</a> | Internet services | Analytics | — | — | [Talk in Russian, April 2018](https://medium.com/@ramblertop/разработка-api-clickhouse-для-рамблер-топ-100-f4c7e56f3141) |
| <a href="https://retell.cc/" class="favicon">Retell</a> | Speech synthesis | Analytics | — | — | [Blog Article, August 2020](https://vc.ru/services/153732-kak-sozdat-audiostati-na-vashem-sayte-i-zachem-eto-nuzhno) |
| <a href="https://www.rollbar.com" class="favicon">Rollbar</a> | Software Development | Main Product | — | — | [Official Website](https://www.rollbar.com) |
| <a href="https://rspamd.com/" class="favicon">Rspamd</a> | Antispam | Analytics | — | — | [Official Website](https://rspamd.com/doc/modules/clickhouse.html) |
| <a href="https://rusiem.com/en" class="favicon">RuSIEM</a> | SIEM | Main Product | — | — | [Official Website](https://rusiem.com/en/products/architecture) |
| <a href="https://www.s7.ru" class="favicon">S7 Airlines</a> | Airlines | Metrics, Logging | — | — | [Talk in Russian, March 2019](https://www.youtube.com/watch?v=nwG68klRpPg&t=15s) |
@ -161,5 +162,6 @@ toc_title: Adopters
| <a href="https://zagravagames.com/en/" class="favicon">Zagrava Trading</a> | — | — | — | — | [Job offer, May 2021](https://twitter.com/datastackjobs/status/1394707267082063874) |
| <a href="https://beeline.ru/" class="favicon">Beeline</a> | Telecom | Data Platform | — | — | [Blog post, July 2021](https://habr.com/en/company/beeline/blog/567508/) |
| <a href="https://ecommpay.com/" class="favicon">Ecommpay</a> | Payment Processing | Logs | — | — | [Video, Nov 2019](https://www.youtube.com/watch?v=d3GdZTOWGLk) |
| <a href="https://omnicomm.ru/" class="favicon">Omnicomm</a> | Transportation Monitoring | — | — | — | [Facebook post, Oct 2021](https://www.facebook.com/OmnicommTeam/posts/2824479777774500) |
[Original article](https://clickhouse.com/docs/en/introduction/adopters/) <!--hide-->

View File

@ -3749,3 +3749,38 @@ Exception: Total regexp lengths too large.
**See Also**
- [max_hyperscan_regexp_length](#max-hyperscan-regexp-length)
## enable_positional_arguments {#enable-positional-arguments}
Enables or disables supporting positional arguments for [GROUP BY](../../sql-reference/statements/select/group-by.md), [LIMIT BY](../../sql-reference/statements/select/limit-by.md), [ORDER BY](../../sql-reference/statements/select/order-by.md) statements. When you want to use column numbers instead of column names in these clauses, set `enable_positional_arguments = 1`.
Possible values:
- 0 — Positional arguments aren't supported.
- 1 — Positional arguments are supported: column numbers can use instead of column names.
Default value: `0`.
**Example**
Query:
```sql
CREATE TABLE positional_arguments(one Int, two Int, three Int) ENGINE=Memory();
INSERT INTO positional_arguments VALUES (10, 20, 30), (20, 20, 10), (30, 10, 20);
SET enable_positional_arguments = 1;
SELECT * FROM positional_arguments ORDER BY 2,3;
```
Result:
```text
┌─one─┬─two─┬─three─┐
│ 30 │ 10 │ 20 │
│ 20 │ 20 │ 10 │
│ 10 │ 20 │ 30 │
└─────┴─────┴───────┘
```

View File

@ -10,6 +10,8 @@ toc_title: GROUP BY
- All the expressions in the [SELECT](../../../sql-reference/statements/select/index.md), [HAVING](../../../sql-reference/statements/select/having.md), and [ORDER BY](../../../sql-reference/statements/select/order-by.md) clauses **must** be calculated based on key expressions **or** on [aggregate functions](../../../sql-reference/aggregate-functions/index.md) over non-key expressions (including plain columns). In other words, each column selected from the table must be used either in a key expression or inside an aggregate function, but not both.
- Result of aggregating `SELECT` query will contain as many rows as there were unique values of “grouping key” in source table. Usually this signficantly reduces the row count, often by orders of magnitude, but not necessarily: row count stays the same if all “grouping key” values were distinct.
When you want to group data in the table by column numbers instead of column names, enable the setting [enable_positional_arguments](../../../operations/settings/settings.md#enable-positional-arguments).
!!! note "Note"
Theres an additional way to run aggregation over a table. If a query contains table columns only inside aggregate functions, the `GROUP BY clause` can be omitted, and aggregation by an empty set of keys is assumed. Such queries always return exactly one row.

View File

@ -144,7 +144,7 @@ Extreme values are calculated for rows before `LIMIT`, but after `LIMIT BY`. How
You can use synonyms (`AS` aliases) in any part of a query.
The `GROUP BY` and `ORDER BY` clauses do not support positional arguments. This contradicts MySQL, but conforms to standard SQL. For example, `GROUP BY 1, 2` will be interpreted as grouping by constants (i.e. aggregation of all rows into one).
The `GROUP BY`, `ORDER BY`, and `LIMIT BY` clauses can support positional arguments. To enable this, switch on the [enable_positional_arguments](../../../operations/settings/settings.md#enable-positional-arguments) setting. Then, for example, `ORDER BY 1,2` will be sorting rows in the table on the first and then the second column.
## Implementation Details {#implementation-details}

View File

@ -16,6 +16,9 @@ During query processing, ClickHouse selects data ordered by sorting key. The sor
!!! note "Note"
`LIMIT BY` is not related to [LIMIT](../../../sql-reference/statements/select/limit.md). They can both be used in the same query.
If you want to use column numbers instead of column names in the `LIMIT BY` clause, enable the setting [enable_positional_arguments](../../../operations/settings/settings.md#enable-positional-arguments).
## Examples {#examples}
Sample table:

View File

@ -4,7 +4,9 @@ toc_title: ORDER BY
# ORDER BY Clause {#select-order-by}
The `ORDER BY` clause contains a list of expressions, which can each be attributed with `DESC` (descending) or `ASC` (ascending) modifier which determine the sorting direction. If the direction is not specified, `ASC` is assumed, so its usually omitted. The sorting direction applies to a single expression, not to the entire list. Example: `ORDER BY Visits DESC, SearchPhrase`
The `ORDER BY` clause contains a list of expressions, which can each be attributed with `DESC` (descending) or `ASC` (ascending) modifier which determine the sorting direction. If the direction is not specified, `ASC` is assumed, so its usually omitted. The sorting direction applies to a single expression, not to the entire list. Example: `ORDER BY Visits DESC, SearchPhrase`.
If you want to sort by column numbers instead of column names, enable the setting [enable_positional_arguments](../../../operations/settings/settings.md#enable-positional-arguments).
Rows that have identical values for the list of sorting expressions are output in an arbitrary order, which can also be non-deterministic (different each time).
If the ORDER BY clause is omitted, the order of the rows is also undefined, and may be non-deterministic as well.

View File

@ -3538,3 +3538,38 @@ Exception: Total regexp lengths too large.
**См. также**
- [max_hyperscan_regexp_length](#max-hyperscan-regexp-length)
## enable_positional_arguments {#enable-positional-arguments}
Включает и отключает поддержку позиционных аргументов для [GROUP BY](../../sql-reference/statements/select/group-by.md), [LIMIT BY](../../sql-reference/statements/select/limit-by.md), [ORDER BY](../../sql-reference/statements/select/order-by.md). Если вы хотите использовать номера столбцов вместо названий в выражениях этих операторов, установите `enable_positional_arguments = 1`.
Возможные значения:
- 0 — Позиционные аргументы не поддерживаются.
- 1 — Позиционные аргументы поддерживаются: можно использовать номера столбцов вместо названий столбцов.
Значение по умолчанию: `0`.
**Пример**
Запрос:
```sql
CREATE TABLE positional_arguments(one Int, two Int, three Int) ENGINE=Memory();
INSERT INTO positional_arguments VALUES (10, 20, 30), (20, 20, 10), (30, 10, 20);
SET enable_positional_arguments = 1;
SELECT * FROM positional_arguments ORDER BY 2,3;
```
Результат:
```text
┌─one─┬─two─┬─three─┐
│ 30 │ 10 │ 20 │
│ 20 │ 20 │ 10 │
│ 10 │ 20 │ 30 │
└─────┴─────┴───────┘
```

View File

@ -10,6 +10,8 @@ toc_title: GROUP BY
- Все выражения в секциях [SELECT](index.md), [HAVING](having.md), и [ORDER BY](order-by.md) статьи **должны** быть вычисленными на основе ключевых выражений **или** на [агрегатных функций](../../../sql-reference/aggregate-functions/index.md) над неключевыми выражениями (включая столбцы). Другими словами, каждый столбец, выбранный из таблицы, должен использоваться либо в ключевом выражении, либо внутри агрегатной функции, но не в обоих.
- В результате агрегирования `SELECT` запрос будет содержать столько строк, сколько было уникальных значений ключа группировки в исходной таблице. Обычно агрегация значительно уменьшает количество строк, часто на порядки, но не обязательно: количество строк остается неизменным, если все исходные значения ключа группировки ценности были различны.
Если вы хотите для группировки данных в таблице указывать номера столбцов, а не названия, включите настройку [enable_positional_arguments](../../../operations/settings/settings.md#enable-positional-arguments).
!!! note "Примечание"
Есть ещё один способ запустить агрегацию по таблице. Если запрос содержит столбцы исходной таблицы только внутри агрегатных функций, то `GROUP BY` секцию можно опустить, и предполагается агрегирование по пустому набору ключей. Такие запросы всегда возвращают ровно одну строку.

View File

@ -140,8 +140,7 @@ Code: 42. DB::Exception: Received from localhost:9000. DB::Exception: Number of
Вы можете использовать синонимы (алиасы `AS`) в любом месте запроса.
В секциях `GROUP BY`, `ORDER BY`, в отличие от диалекта MySQL, и в соответствии со стандартным SQL, не поддерживаются позиционные аргументы.
Например, если вы напишите `GROUP BY 1, 2` - то это будет воспринято, как группировка по константам (то есть, агрегация всех строк в одну).
В секциях `GROUP BY`, `ORDER BY` и `LIMIT BY` можно использовать не названия столбцов, а номера. Для этого нужно включить настройку [enable_positional_arguments](../../../operations/settings/settings.md#enable-positional-arguments). Тогда, например, в запросе с `ORDER BY 1,2` будет выполнена сортировка сначала по первому, а затем по второму столбцу.
## Детали реализации {#implementation-details}

View File

@ -15,6 +15,8 @@ ClickHouse поддерживает следующий синтаксис:
`LIMIT BY` не связана с секцией `LIMIT`. Их можно использовать в одном запросе.
Если вы хотите использовать в секции `LIMIT BY` номера столбцов вместо названий, включите настройку [enable_positional_arguments](../../../operations/settings/settings.md#enable-positional-arguments).
## Примеры
Образец таблицы:

View File

@ -4,7 +4,9 @@ toc_title: ORDER BY
# Секция ORDER BY {#select-order-by}
Секция `ORDER BY` содержит список выражений, к каждому из которых также может быть приписано `DESC` или `ASC` (направление сортировки). Если ничего не приписано - это аналогично приписыванию `ASC`. `ASC` - сортировка по возрастанию, `DESC` - сортировка по убыванию. Обозначение направления сортировки действует на одно выражение, а не на весь список. Пример: `ORDER BY Visits DESC, SearchPhrase`
Секция `ORDER BY` содержит список выражений, к каждому из которых также может быть приписано `DESC` или `ASC` (направление сортировки). Если ничего не приписано - это аналогично приписыванию `ASC`. `ASC` - сортировка по возрастанию, `DESC` - сортировка по убыванию. Обозначение направления сортировки действует на одно выражение, а не на весь список. Пример: `ORDER BY Visits DESC, SearchPhrase`.
Если вы хотите для сортировки данных указывать номера столбцов, а не названия, включите настройку [enable_positional_arguments](../../../operations/settings/settings.md#enable-positional-arguments).
Строки, для которых список выражений, по которым производится сортировка, принимает одинаковые значения, выводятся в произвольном порядке, который может быть также недетерминированным (каждый раз разным).
Если секция ORDER BY отсутствует, то, аналогично, порядок, в котором идут строки, не определён, и может быть недетерминированным.

View File

@ -28,7 +28,7 @@
#include <IO/ConnectionTimeouts.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/UseSSL.h>
#include <DataStreams/RemoteBlockInputStream.h>
#include <DataStreams/RemoteQueryExecutor.h>
#include <Interpreters/Context.h>
#include <Client/Connection.h>
#include <Common/InterruptListener.h>
@ -424,20 +424,19 @@ private:
if (reconnect)
connection.disconnect();
RemoteBlockInputStream stream(
RemoteQueryExecutor executor(
connection, query, {}, global_context, nullptr, Scalars(), Tables(), query_processing_stage);
if (!query_id.empty())
stream.setQueryId(query_id);
executor.setQueryId(query_id);
Progress progress;
stream.setProgressCallback([&progress](const Progress & value) { progress.incrementPiecewiseAtomically(value); });
executor.setProgressCallback([&progress](const Progress & value) { progress.incrementPiecewiseAtomically(value); });
stream.readPrefix();
while (Block block = stream.read());
BlockStreamProfileInfo info;
while (Block block = executor.read())
info.update(block);
stream.readSuffix();
const BlockStreamProfileInfo & info = stream.getProfileInfo();
executor.finish();
double seconds = watch.elapsedSeconds();

View File

@ -14,7 +14,6 @@
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Executors/PushingPipelineExecutor.h>
#include <Processors/Sources/RemoteSource.h>
#include <DataStreams/ExpressionBlockInputStream.h>
namespace DB
{

View File

@ -49,7 +49,7 @@
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTExpressionList.h>
#include <Formats/FormatSettings.h>
#include <DataStreams/RemoteBlockInputStream.h>
#include <DataStreams/RemoteQueryExecutor.h>
#include <DataStreams/SquashingBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <DataStreams/NullBlockOutputStream.h>

View File

@ -1036,6 +1036,10 @@ if (ThreadFuzzer::instance().isEffective())
server.start();
SCOPE_EXIT({
/// Stop reloading of the main config. This must be done before `global_context->shutdown()` because
/// otherwise the reloading may pass a changed config to some destroyed parts of ContextSharedPart.
main_config_reloader.reset();
/** Ask to cancel background jobs all table engines,
* and also query_log.
* It is important to do early, not in destructor of Context, because
@ -1076,9 +1080,6 @@ if (ThreadFuzzer::instance().isEffective())
/// Wait server pool to avoid use-after-free of destroyed context in the handlers
server_pool.joinAll();
// Uses a raw pointer to global context for getting ZooKeeper.
main_config_reloader.reset();
/** Explicitly destroy Context. It is more convenient than in destructor of Server, because logger is still available.
* At this moment, no one could own shared part of Context.
*/
@ -1159,7 +1160,6 @@ if (ThreadFuzzer::instance().isEffective())
UInt64 total_memory_profiler_step = config().getUInt64("total_memory_profiler_step", 0);
if (total_memory_profiler_step)
{
total_memory_tracker.setOrRaiseProfilerLimit(total_memory_profiler_step);
total_memory_tracker.setProfilerStep(total_memory_profiler_step);
}

View File

@ -62,6 +62,27 @@
-->
</logger>
<!-- Add headers to response in options request. OPTIONS method is used in CORS preflight requests. -->
<!-- It is off by default. Next headers are obligate for CORS.-->
<!-- http_options_response>
<header>
<name>Access-Control-Allow-Origin</name>
<value>*</value>
</header>
<header>
<name>Access-Control-Allow-Headers</name>
<value>origin, x-requested-with</value>
</header>
<header>
<name>Access-Control-Allow-Methods</name>
<value>POST, GET, OPTIONS</value>
</header>
<header>
<name>Access-Control-Max-Age</name>
<value>86400</value>
</header>
</http_options_response -->
<!-- It is the name that will be shown in the clickhouse-client.
By default, anything with "production" will be highlighted in red in query prompt.
-->

View File

@ -48,7 +48,7 @@
#include <IO/CompressionMethod.h>
#include <DataStreams/NullBlockOutputStream.h>
#include <DataStreams/InternalTextLogsRowOutputStream.h>
#include <DataStreams/InternalTextLogs.h>
namespace fs = std::filesystem;
@ -95,6 +95,9 @@ void interruptSignalHandler(int signum)
_exit(signum);
}
ClientBase::~ClientBase() = default;
ClientBase::ClientBase() = default;
void ClientBase::setupSignalHandler()
{
exit_on_signal.test_and_set();
@ -393,8 +396,7 @@ void ClientBase::initLogsOutputStream()
}
}
logs_out_stream = std::make_shared<InternalTextLogsRowOutputStream>(*wb, stdout_is_a_tty);
logs_out_stream->writePrefix();
logs_out_stream = std::make_unique<InternalTextLogs>(*wb, stdout_is_a_tty);
}
}
@ -426,10 +428,8 @@ void ClientBase::processTextAsSingleQuery(const String & full_query)
catch (Exception & e)
{
if (!is_interactive)
{
e.addMessage("(in query: {})", full_query);
throw;
}
throw;
}
if (have_error)
@ -641,9 +641,6 @@ void ClientBase::onEndOfStream()
if (block_out_stream)
block_out_stream->writeSuffix();
if (logs_out_stream)
logs_out_stream->writeSuffix();
resetOutput();
if (is_interactive && !written_first_block)

View File

@ -32,12 +32,17 @@ enum MultiQueryProcessingStage
void interruptSignalHandler(int signum);
class InternalTextLogs;
class ClientBase : public Poco::Util::Application
{
public:
using Arguments = std::vector<String>;
ClientBase();
~ClientBase() override;
void init(int argc, char ** argv);
protected:
@ -177,7 +182,7 @@ protected:
/// The user could specify special file for server logs (stderr by default)
std::unique_ptr<WriteBuffer> out_logs_buf;
String server_logs_file;
BlockOutputStreamPtr logs_out_stream;
std::unique_ptr<InternalTextLogs> logs_out_stream;
String home_path;
String history_file; /// Path to a file containing command history.

View File

@ -9,8 +9,8 @@
#include <IO/WriteHelpers.h>
#include <IO/copyData.h>
#include <IO/TimeoutSetter.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/NativeReader.h>
#include <DataStreams/NativeWriter.h>
#include <Client/Connection.h>
#include <Client/ConnectionParameters.h>
#include <Common/ClickHouseRevision.h>
@ -58,6 +58,35 @@ namespace ErrorCodes
extern const int EMPTY_DATA_PASSED;
}
Connection::~Connection() = default;
Connection::Connection(const String & host_, UInt16 port_,
const String & default_database_,
const String & user_, const String & password_,
const String & cluster_,
const String & cluster_secret_,
const String & client_name_,
Protocol::Compression compression_,
Protocol::Secure secure_,
Poco::Timespan sync_request_timeout_)
: host(host_), port(port_), default_database(default_database_)
, user(user_), password(password_)
, cluster(cluster_)
, cluster_secret(cluster_secret_)
, client_name(client_name_)
, compression(compression_)
, secure(secure_)
, sync_request_timeout(sync_request_timeout_)
, log_wrapper(*this)
{
/// Don't connect immediately, only on first need.
if (user.empty())
user = "default";
setDescription();
}
void Connection::connect(const ConnectionTimeouts & timeouts)
{
@ -533,11 +562,11 @@ void Connection::sendData(const Block & block, const String & name, bool scalar)
if (!block_out)
{
if (compression == Protocol::Compression::Enable)
maybe_compressed_out = std::make_shared<CompressedWriteBuffer>(*out, compression_codec);
maybe_compressed_out = std::make_unique<CompressedWriteBuffer>(*out, compression_codec);
else
maybe_compressed_out = out;
block_out = std::make_shared<NativeBlockOutputStream>(*maybe_compressed_out, server_revision, block.cloneEmpty());
block_out = std::make_unique<NativeWriter>(*maybe_compressed_out, server_revision, block.cloneEmpty());
}
if (scalar)
@ -866,18 +895,18 @@ Packet Connection::receivePacket()
Block Connection::receiveData()
{
initBlockInput();
return receiveDataImpl(block_in);
return receiveDataImpl(*block_in);
}
Block Connection::receiveLogData()
{
initBlockLogsInput();
return receiveDataImpl(block_logs_in);
return receiveDataImpl(*block_logs_in);
}
Block Connection::receiveDataImpl(BlockInputStreamPtr & stream)
Block Connection::receiveDataImpl(NativeReader & reader)
{
String external_table_name;
readStringBinary(external_table_name, *in);
@ -885,7 +914,7 @@ Block Connection::receiveDataImpl(BlockInputStreamPtr & stream)
size_t prev_bytes = in->count();
/// Read one block from network.
Block res = stream->read();
Block res = reader.read();
if (throttler)
throttler->add(in->count() - prev_bytes);
@ -912,7 +941,7 @@ void Connection::initBlockInput()
maybe_compressed_in = in;
}
block_in = std::make_shared<NativeBlockInputStream>(*maybe_compressed_in, server_revision);
block_in = std::make_unique<NativeReader>(*maybe_compressed_in, server_revision);
}
}
@ -922,7 +951,7 @@ void Connection::initBlockLogsInput()
if (!block_logs_in)
{
/// Have to return superset of SystemLogsQueue::getSampleBlock() columns
block_logs_in = std::make_shared<NativeBlockInputStream>(*in, server_revision);
block_logs_in = std::make_unique<NativeReader>(*in, server_revision);
}
}

View File

@ -32,6 +32,9 @@ struct ConnectionParameters;
using ConnectionPtr = std::shared_ptr<Connection>;
using Connections = std::vector<ConnectionPtr>;
class NativeReader;
class NativeWriter;
/** Connection with database server, to use by client.
* How to use - see Core/Protocol.h
@ -53,25 +56,9 @@ public:
const String & client_name_,
Protocol::Compression compression_,
Protocol::Secure secure_,
Poco::Timespan sync_request_timeout_ = Poco::Timespan(DBMS_DEFAULT_SYNC_REQUEST_TIMEOUT_SEC, 0))
:
host(host_), port(port_), default_database(default_database_),
user(user_), password(password_),
cluster(cluster_),
cluster_secret(cluster_secret_),
client_name(client_name_),
compression(compression_),
secure(secure_),
sync_request_timeout(sync_request_timeout_),
log_wrapper(*this)
{
/// Don't connect immediately, only on first need.
Poco::Timespan sync_request_timeout_ = Poco::Timespan(DBMS_DEFAULT_SYNC_REQUEST_TIMEOUT_SEC, 0));
if (user.empty())
user = "default";
setDescription();
}
~Connection() override;
static ServerConnectionPtr createConnection(const ConnectionParameters & parameters, ContextPtr context);
@ -217,12 +204,12 @@ private:
/// From where to read query execution result.
std::shared_ptr<ReadBuffer> maybe_compressed_in;
BlockInputStreamPtr block_in;
BlockInputStreamPtr block_logs_in;
std::unique_ptr<NativeReader> block_in;
std::unique_ptr<NativeReader> block_logs_in;
/// Where to write data for INSERT.
std::shared_ptr<WriteBuffer> maybe_compressed_out;
BlockOutputStreamPtr block_out;
std::unique_ptr<NativeWriter> block_out;
/// Logger is created lazily, for avoid to run DNS request in constructor.
class LoggerWrapper
@ -261,7 +248,7 @@ private:
Block receiveData();
Block receiveLogData();
Block receiveDataImpl(BlockInputStreamPtr & stream);
Block receiveDataImpl(NativeReader & reader);
std::vector<String> receiveMultistringMessage(UInt64 msg_type) const;
std::unique_ptr<Exception> receiveException() const;

View File

@ -144,7 +144,7 @@ Field QueryFuzzer::fuzzField(Field field)
{
size_t pos = fuzz_rand() % arr.size();
arr.erase(arr.begin() + pos);
fprintf(stderr, "erased\n");
std::cerr << "erased\n";
}
if (fuzz_rand() % 5 == 0)
@ -153,12 +153,12 @@ Field QueryFuzzer::fuzzField(Field field)
{
size_t pos = fuzz_rand() % arr.size();
arr.insert(arr.begin() + pos, fuzzField(arr[pos]));
fprintf(stderr, "inserted (pos %zd)\n", pos);
std::cerr << fmt::format("inserted (pos {})\n", pos);
}
else
{
arr.insert(arr.begin(), getRandomField(0));
fprintf(stderr, "inserted (0)\n");
std::cerr << "inserted (0)\n";
}
}
@ -278,7 +278,7 @@ void QueryFuzzer::fuzzOrderByList(IAST * ast)
}
else
{
fprintf(stderr, "no random col!\n");
std::cerr << "No random column.\n";
}
}
@ -312,13 +312,9 @@ void QueryFuzzer::fuzzColumnLikeExpressionList(IAST * ast)
: impl->children.begin() + fuzz_rand() % impl->children.size();
auto col = getRandomColumnLike();
if (col)
{
impl->children.insert(pos, col);
}
else
{
fprintf(stderr, "no random col!\n");
}
std::cerr << "No random column.\n";
}
// We don't have to recurse here to fuzz the children, this is handled by

View File

@ -2,7 +2,7 @@
#include <Columns/ColumnsCommon.h>
#include <Columns/MaskOperations.h>
#include <Common/assert_cast.h>
#include <DataStreams/ColumnGathererStream.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <IO/WriteBufferFromArena.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>

View File

@ -13,7 +13,7 @@
#include <base/unaligned.h>
#include <base/sort.h>
#include <DataStreams/ColumnGathererStream.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <Common/Exception.h>
#include <Common/Arena.h>

View File

@ -16,7 +16,7 @@
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnCompressed.h>
#include <Columns/MaskOperations.h>
#include <DataStreams/ColumnGathererStream.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
template <typename T> bool decimalLess(T x, T y, UInt32 x_scale, UInt32 y_scale);

View File

@ -2,7 +2,7 @@
#include <Columns/ColumnsCommon.h>
#include <Columns/ColumnCompressed.h>
#include <DataStreams/ColumnGathererStream.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <IO/WriteHelpers.h>
#include <Common/Arena.h>
#include <Common/HashTable/Hash.h>
@ -248,31 +248,23 @@ ColumnPtr ColumnFixedString::filter(const IColumn::Filter & filt, ssize_t result
UInt16 mask = _mm_movemask_epi8(_mm_cmpeq_epi8(_mm_loadu_si128(reinterpret_cast<const __m128i *>(filt_pos)), zero16));
mask = ~mask;
if (0 == mask)
{
/// Nothing is inserted.
data_pos += chars_per_simd_elements;
}
else if (0xFFFF == mask)
if (0xFFFF == mask)
{
res->chars.insert(data_pos, data_pos + chars_per_simd_elements);
data_pos += chars_per_simd_elements;
}
else
{
size_t res_chars_size = res->chars.size();
for (size_t i = 0; i < SIMD_BYTES; ++i)
while (mask)
{
if (filt_pos[i])
{
res->chars.resize(res_chars_size + n);
memcpySmallAllowReadWriteOverflow15(&res->chars[res_chars_size], data_pos, n);
res_chars_size += n;
}
data_pos += n;
size_t index = __builtin_ctz(mask);
res->chars.resize(res_chars_size + n);
memcpySmallAllowReadWriteOverflow15(&res->chars[res_chars_size], data_pos + index * n, n);
res_chars_size += n;
mask = mask & (mask - 1);
}
}
data_pos += chars_per_simd_elements;
filt_pos += SIMD_BYTES;
}
#endif

View File

@ -2,7 +2,7 @@
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <DataStreams/ColumnGathererStream.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <DataTypes/NumberTraits.h>
#include <Common/HashTable/HashMap.h>
#include <Common/WeakHash.h>

View File

@ -1,7 +1,7 @@
#include <Columns/ColumnMap.h>
#include <Columns/ColumnCompressed.h>
#include <Columns/IColumnImpl.h>
#include <DataStreams/ColumnGathererStream.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <base/map.h>

View File

@ -8,7 +8,7 @@
#include <Columns/ColumnConst.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnCompressed.h>
#include <DataStreams/ColumnGathererStream.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
namespace DB

View File

@ -4,7 +4,7 @@
#include <Columns/ColumnsCommon.h>
#include <Columns/ColumnCompressed.h>
#include <Columns/MaskOperations.h>
#include <DataStreams/ColumnGathererStream.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <Common/Arena.h>
#include <Common/HashTable/Hash.h>
#include <Common/WeakHash.h>

View File

@ -3,7 +3,7 @@
#include <Columns/IColumnImpl.h>
#include <Columns/ColumnCompressed.h>
#include <Core/Field.h>
#include <DataStreams/ColumnGathererStream.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <Common/WeakHash.h>

View File

@ -4,7 +4,7 @@
#include <Columns/ColumnsCommon.h>
#include <Columns/ColumnCompressed.h>
#include <Columns/MaskOperations.h>
#include <DataStreams/ColumnGathererStream.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <IO/WriteHelpers.h>
#include <Common/Arena.h>
#include <Common/Exception.h>
@ -327,19 +327,18 @@ ColumnPtr ColumnVector<T>::filter(const IColumn::Filter & filt, ssize_t result_s
UInt16 mask = _mm_movemask_epi8(_mm_cmpeq_epi8(_mm_loadu_si128(reinterpret_cast<const __m128i *>(filt_pos)), zero16));
mask = ~mask;
if (0 == mask)
{
/// Nothing is inserted.
}
else if (0xFFFF == mask)
if (0xFFFF == mask)
{
res_data.insert(data_pos, data_pos + SIMD_BYTES);
}
else
{
for (size_t i = 0; i < SIMD_BYTES; ++i)
if (filt_pos[i])
res_data.push_back(data_pos[i]);
while (mask)
{
size_t index = __builtin_ctz(mask);
res_data.push_back(data_pos[index]);
mask = mask & (mask - 1);
}
}
filt_pos += SIMD_BYTES;

View File

@ -241,11 +241,7 @@ namespace
zero_vec));
mask = ~mask;
if (mask == 0)
{
/// SIMD_BYTES consecutive rows do not pass the filter
}
else if (mask == 0xffff)
if (mask == 0xffff)
{
/// SIMD_BYTES consecutive rows pass the filter
const auto first = offsets_pos == offsets_begin;
@ -262,9 +258,12 @@ namespace
}
else
{
for (size_t i = 0; i < SIMD_BYTES; ++i)
if (filt_pos[i])
copy_array(offsets_pos + i);
while (mask)
{
size_t index = __builtin_ctz(mask);
copy_array(offsets_pos + index);
mask = mask & (mask - 1);
}
}
filt_pos += SIMD_BYTES;

View File

@ -317,7 +317,7 @@ class ReverseIndex
{
public:
ReverseIndex(UInt64 num_prefix_rows_to_skip_, UInt64 base_index_)
: num_prefix_rows_to_skip(num_prefix_rows_to_skip_), base_index(base_index_), saved_hash_ptr(nullptr) {}
: num_prefix_rows_to_skip(num_prefix_rows_to_skip_), base_index(base_index_), external_saved_hash_ptr(nullptr) {}
void setColumn(ColumnType * column_);
@ -352,14 +352,14 @@ public:
if (!use_saved_hash)
return nullptr;
UInt64 * ptr = saved_hash_ptr.load();
UInt64 * ptr = external_saved_hash_ptr.load();
if (!ptr)
{
auto hash = calcHashes();
ptr = &hash->getData()[0];
UInt64 * expected = nullptr;
if (saved_hash_ptr.compare_exchange_strong(expected, ptr))
saved_hash = std::move(hash);
if (external_saved_hash_ptr.compare_exchange_strong(expected, ptr))
external_saved_hash = std::move(hash);
else
ptr = expected;
}
@ -379,7 +379,9 @@ private:
/// Lazy initialized.
std::unique_ptr<IndexMapType> index;
mutable ColumnUInt64::MutablePtr saved_hash;
mutable std::atomic<UInt64 *> saved_hash_ptr;
/// For usage during GROUP BY
mutable ColumnUInt64::MutablePtr external_saved_hash;
mutable std::atomic<UInt64 *> external_saved_hash_ptr;
void buildIndex();

View File

@ -200,11 +200,13 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
}
bool allocation_traced = false;
if (unlikely(current_profiler_limit && will_be > current_profiler_limit))
{
BlockerInThread untrack_lock(VariableContext::Global);
DB::TraceCollector::collect(DB::TraceType::Memory, StackTrace(), size);
setOrRaiseProfilerLimit((will_be + profiler_step - 1) / profiler_step * profiler_step);
allocation_traced = true;
}
std::bernoulli_distribution sample(sample_probability);
@ -212,6 +214,7 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
{
BlockerInThread untrack_lock(VariableContext::Global);
DB::TraceCollector::collect(DB::TraceType::MemorySample, StackTrace(), size);
allocation_traced = true;
}
if (unlikely(current_hard_limit && will_be > current_hard_limit) && memoryTrackerCanThrow(level, false) && throw_if_memory_exceeded)
@ -230,17 +233,24 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded)
formatReadableSizeWithBinarySuffix(current_hard_limit));
}
bool peak_updated;
if (throw_if_memory_exceeded)
{
/// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
BlockerInThread untrack_lock(VariableContext::Global);
bool log_memory_usage = true;
updatePeak(will_be, log_memory_usage);
peak_updated = updatePeak(will_be, log_memory_usage);
}
else
{
bool log_memory_usage = false;
updatePeak(will_be, log_memory_usage);
peak_updated = updatePeak(will_be, log_memory_usage);
}
if (peak_updated && allocation_traced)
{
BlockerInThread untrack_lock(VariableContext::Global);
DB::TraceCollector::collect(DB::TraceType::MemoryPeak, StackTrace(), will_be);
}
if (auto * loaded_next = parent.load(std::memory_order_relaxed))
@ -259,7 +269,7 @@ void MemoryTracker::allocNoThrow(Int64 size)
allocImpl(size, throw_if_memory_exceeded);
}
void MemoryTracker::updatePeak(Int64 will_be, bool log_memory_usage)
bool MemoryTracker::updatePeak(Int64 will_be, bool log_memory_usage)
{
auto peak_old = peak.load(std::memory_order_relaxed);
if (will_be > peak_old) /// Races doesn't matter. Could rewrite with CAS, but not worth.
@ -269,7 +279,10 @@ void MemoryTracker::updatePeak(Int64 will_be, bool log_memory_usage)
if (log_memory_usage && (level == VariableContext::Process || level == VariableContext::Global)
&& will_be / log_peak_memory_usage_every > peak_old / log_peak_memory_usage_every)
logMemoryUsage(will_be);
return true;
}
return false;
}

View File

@ -58,9 +58,11 @@ private:
/// This description will be used as prefix into log messages (if isn't nullptr)
std::atomic<const char *> description_ptr = nullptr;
void updatePeak(Int64 will_be, bool log_memory_usage);
bool updatePeak(Int64 will_be, bool log_memory_usage);
void logMemoryUsage(Int64 current) const;
void setOrRaiseProfilerLimit(Int64 value);
public:
explicit MemoryTracker(VariableContext level_ = VariableContext::Thread);
explicit MemoryTracker(MemoryTracker * parent_, VariableContext level_ = VariableContext::Thread);
@ -106,7 +108,6 @@ public:
* Otherwise, set limit to new value, if new value is greater than previous limit.
*/
void setOrRaiseHardLimit(Int64 value);
void setOrRaiseProfilerLimit(Int64 value);
void setFaultProbability(double value)
{
@ -121,6 +122,7 @@ public:
void setProfilerStep(Int64 value)
{
profiler_step = value;
setOrRaiseProfilerLimit(value);
}
/// next should be changed only once: from nullptr to some value.

View File

@ -1,19 +0,0 @@
#include <sys/resource.h>
#include "Stopwatch.h"
StopwatchRUsage::Timestamp StopwatchRUsage::Timestamp::current()
{
StopwatchRUsage::Timestamp res;
::rusage rusage {};
#if !defined(__APPLE__)
#if defined(OS_SUNOS)
::getrusage(RUSAGE_LWP, &rusage);
#else
::getrusage(RUSAGE_THREAD, &rusage);
#endif // OS_SUNOS
#endif // __APPLE__
res.user_ns = rusage.ru_utime.tv_sec * 1000000000UL + rusage.ru_utime.tv_usec * 1000UL;
res.sys_ns = rusage.ru_stime.tv_sec * 1000000000UL + rusage.ru_stime.tv_usec * 1000UL;
return res;
}

View File

@ -2,7 +2,9 @@
#include <base/time.h>
#include <base/types.h>
#include <base/defines.h>
#include <cassert>
#include <atomic>
#include <memory>
@ -14,6 +16,20 @@ inline UInt64 clock_gettime_ns(clockid_t clock_type = CLOCK_MONOTONIC)
return UInt64(ts.tv_sec * 1000000000LL + ts.tv_nsec);
}
/// Sometimes monotonic clock may not be monotonic (due to bug in kernel?).
/// It may cause some operations to fail with "Timeout exceeded: elapsed 18446744073.709553 seconds".
/// Takes previously returned value and returns it again if time stepped back for some reason.
inline UInt64 clock_gettime_ns_adjusted(UInt64 prev_time, clockid_t clock_type = CLOCK_MONOTONIC)
{
UInt64 current_time = clock_gettime_ns(clock_type);
if (likely(prev_time <= current_time))
return current_time;
/// Something probably went completely wrong if time stepped back for more than 1 second.
assert(prev_time - current_time <= 1000000000ULL);
return prev_time;
}
/** Differs from Poco::Stopwatch only by using 'clock_gettime' instead of 'gettimeofday',
* returns nanoseconds instead of microseconds, and also by other minor differencies.
*/
@ -41,7 +57,7 @@ private:
clockid_t clock_type;
bool is_running = false;
UInt64 nanoseconds() const { return clock_gettime_ns(clock_type); }
UInt64 nanoseconds() const { return clock_gettime_ns_adjusted(start_ns, clock_type); }
};
using StopwatchUniquePtr = std::unique_ptr<Stopwatch>;
@ -52,8 +68,12 @@ class AtomicStopwatch
public:
explicit AtomicStopwatch(clockid_t clock_type_ = CLOCK_MONOTONIC) : clock_type(clock_type_) { restart(); }
void restart() { start_ns = nanoseconds(); }
UInt64 elapsed() const { return nanoseconds() - start_ns; }
void restart() { start_ns = nanoseconds(0); }
UInt64 elapsed() const
{
UInt64 current_start_ns = start_ns;
return nanoseconds(current_start_ns) - current_start_ns;
}
UInt64 elapsedMilliseconds() const { return elapsed() / 1000000UL; }
double elapsedSeconds() const { return static_cast<double>(elapsed()) / 1000000000ULL; }
@ -64,8 +84,8 @@ public:
bool compareAndRestart(double seconds)
{
UInt64 threshold = static_cast<UInt64>(seconds * 1000000000.0);
UInt64 current_ns = nanoseconds();
UInt64 current_start_ns = start_ns;
UInt64 current_ns = nanoseconds(current_start_ns);
while (true)
{
@ -108,8 +128,8 @@ public:
Lock compareAndRestartDeferred(double seconds)
{
UInt64 threshold = UInt64(seconds * 1000000000.0);
UInt64 current_ns = nanoseconds();
UInt64 current_start_ns = start_ns;
UInt64 current_ns = nanoseconds(current_start_ns);
while (true)
{
@ -130,74 +150,6 @@ private:
clockid_t clock_type;
/// Most significant bit is a lock. When it is set, compareAndRestartDeferred method will return false.
UInt64 nanoseconds() const { return clock_gettime_ns(clock_type) & 0x7FFFFFFFFFFFFFFFULL; }
UInt64 nanoseconds(UInt64 prev_time) const { return clock_gettime_ns_adjusted(prev_time, clock_type) & 0x7FFFFFFFFFFFFFFFULL; }
};
/// Like ordinary StopWatch, but uses getrusage() system call
struct StopwatchRUsage
{
StopwatchRUsage() = default;
void start() { start_ts = Timestamp::current(); is_running = true; }
void stop() { stop_ts = Timestamp::current(); is_running = false; }
void reset() { start_ts = Timestamp(); stop_ts = Timestamp(); is_running = false; }
void restart() { start(); }
UInt64 elapsed(bool count_user = true, bool count_sys = true) const
{
return elapsedNanoseconds(count_user, count_sys);
}
UInt64 elapsedNanoseconds(bool count_user = true, bool count_sys = true) const
{
return (is_running ? Timestamp::current() : stop_ts).nanoseconds(count_user, count_sys) - start_ts.nanoseconds(count_user, count_sys);
}
UInt64 elapsedMicroseconds(bool count_user = true, bool count_sys = true) const
{
return elapsedNanoseconds(count_user, count_sys) / 1000UL;
}
UInt64 elapsedMilliseconds(bool count_user = true, bool count_sys = true) const
{
return elapsedNanoseconds(count_user, count_sys) / 1000000UL;
}
double elapsedSeconds(bool count_user = true, bool count_sys = true) const
{
return static_cast<double>(elapsedNanoseconds(count_user, count_sys)) / 1000000000.0;
}
private:
struct Timestamp
{
UInt64 user_ns = 0;
UInt64 sys_ns = 0;
static Timestamp current();
UInt64 nanoseconds(bool count_user = true, bool count_sys = true) const
{
return (count_user ? user_ns : 0) + (count_sys ? sys_ns : 0);
}
};
Timestamp start_ts;
Timestamp stop_ts;
bool is_running = false;
};
template <typename TStopwatch>
class StopwatchGuard : public TStopwatch
{
public:
explicit StopwatchGuard(UInt64 & elapsed_ns_) : elapsed_ns(elapsed_ns_) {}
~StopwatchGuard() { elapsed_ns += TStopwatch::elapsedNanoseconds(); }
private:
UInt64 & elapsed_ns;
};

View File

@ -35,7 +35,7 @@ void Throttler::add(size_t amount)
{
std::lock_guard lock(mutex);
auto now = clock_gettime_ns();
auto now = clock_gettime_ns_adjusted(prev_ns);
/// If prev_ns is equal to zero (first `add` call) we known nothing about speed
/// and don't track anything.
if (max_speed && prev_ns != 0)

View File

@ -20,7 +20,8 @@ enum class TraceType : uint8_t
Real,
CPU,
Memory,
MemorySample
MemorySample,
MemoryPeak,
};
class TraceCollector

View File

@ -289,7 +289,7 @@ ZooKeeper::~ZooKeeper()
{
try
{
finalize(false, false);
finalize(false, false, "destructor called");
if (send_thread.joinable())
send_thread.join();
@ -299,7 +299,7 @@ ZooKeeper::~ZooKeeper()
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log);
}
}
@ -317,6 +317,7 @@ ZooKeeper::ZooKeeper(
session_timeout(session_timeout_),
operation_timeout(std::min(operation_timeout_, session_timeout_))
{
log = &Poco::Logger::get("ZooKeeperClient");
std::atomic_store(&zk_log, std::move(zk_log_));
if (!root_path.empty())
@ -450,6 +451,10 @@ void ZooKeeper::connect(
message << fail_reasons.str() << "\n";
throw Exception(message.str(), Error::ZCONNECTIONLOSS);
}
else
{
LOG_TEST(log, "Connected to ZooKeeper at {} with session_id {}", socket.peerAddress().toString(), session_id);
}
}
@ -604,8 +609,8 @@ void ZooKeeper::sendThread()
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
finalize(true, false);
tryLogCurrentException(log);
finalize(true, false, "exception in sendThread");
}
}
@ -663,8 +668,8 @@ void ZooKeeper::receiveThread()
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
finalize(false, true);
tryLogCurrentException(log);
finalize(false, true, "exception in receiveThread");
}
}
@ -799,7 +804,7 @@ void ZooKeeper::receiveEvent()
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log);
/// Unrecoverable. Don't leave incorrect state in memory.
if (!response)
@ -819,7 +824,7 @@ void ZooKeeper::receiveEvent()
catch (...)
{
/// Throw initial exception, not exception from callback.
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log);
}
throw;
@ -832,10 +837,15 @@ void ZooKeeper::receiveEvent()
}
void ZooKeeper::finalize(bool error_send, bool error_receive)
void ZooKeeper::finalize(bool error_send, bool error_receive, const String & reason)
{
/// If some thread (send/receive) already finalizing session don't try to do it
if (finalization_started.exchange(true))
bool already_started = finalization_started.exchange(true);
LOG_TEST(log, "Finalizing session {}: finalization_started={}, queue_closed={}, reason={}",
session_id, already_started, requests_queue.isClosed(), reason);
if (already_started)
return;
auto expire_session_if_not_expired = [&]
@ -860,7 +870,7 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
/// This happens for example, when "Cannot push request to queue within operation timeout".
/// Just mark session expired in case of error on close request, otherwise sendThread may not stop.
expire_session_if_not_expired();
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log);
}
/// Send thread will exit after sending close request or on expired flag
@ -879,7 +889,7 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
catch (...)
{
/// We must continue to execute all callbacks, because the user is waiting for them.
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log);
}
if (!error_receive && receive_thread.joinable())
@ -908,7 +918,7 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
catch (...)
{
/// We must continue to all other callbacks, because the user is waiting for them.
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log);
}
}
}
@ -939,7 +949,7 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log);
}
}
}
@ -967,7 +977,7 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log);
}
}
}
@ -983,14 +993,14 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log);
}
}
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log);
}
}
@ -1028,7 +1038,7 @@ void ZooKeeper::pushRequest(RequestInfo && info)
}
catch (...)
{
finalize(false, false);
finalize(false, false, getCurrentExceptionMessage(false, false, false));
throw;
}

View File

@ -187,7 +187,7 @@ public:
/// it will do read in another session, that read may not see the
/// already performed write.
void finalize() override { finalize(false, false); }
void finalize() override { finalize(false, false, "unknown"); }
void setZooKeeperLog(std::shared_ptr<DB::ZooKeeperLog> zk_log_);
@ -240,6 +240,8 @@ private:
ThreadFromGlobalPool send_thread;
ThreadFromGlobalPool receive_thread;
Poco::Logger * log;
void connect(
const Nodes & node,
Poco::Timespan connection_timeout);
@ -257,7 +259,7 @@ private:
void close();
/// Call all remaining callbacks and watches, passing errors to them.
void finalize(bool error_send, bool error_receive);
void finalize(bool error_send, bool error_receive, const String & reason);
template <typename T>
void write(const T &);

View File

@ -30,7 +30,6 @@ void CachedCompressedReadBuffer::initInput()
bool CachedCompressedReadBuffer::nextImpl()
{
/// Let's check for the presence of a decompressed block in the cache, grab the ownership of this block, if it exists.
UInt128 key = cache->hash(path, file_pos);
@ -60,6 +59,13 @@ bool CachedCompressedReadBuffer::nextImpl()
working_buffer = Buffer(owned_cell->data.data(), owned_cell->data.data() + owned_cell->data.size() - owned_cell->additional_bytes);
/// nextimpl_working_buffer_offset is set in the seek function (lazy seek). So we have to
/// check that we are not seeking beyond working buffer.
if (nextimpl_working_buffer_offset > working_buffer.size())
throw Exception("Seek position is beyond the decompressed block"
" (pos: " + toString(nextimpl_working_buffer_offset) + ", block size: " + toString(working_buffer.size()) + ")",
ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
file_pos += owned_cell->compressed_size;
return true;
@ -74,28 +80,29 @@ CachedCompressedReadBuffer::CachedCompressedReadBuffer(
void CachedCompressedReadBuffer::seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block)
{
/// Nothing to do if we already at required position
if (file_pos == offset_in_compressed_file
&& (offset() == offset_in_decompressed_block ||
nextimpl_working_buffer_offset == offset_in_decompressed_block))
return;
if (owned_cell &&
offset_in_compressed_file == file_pos - owned_cell->compressed_size &&
offset_in_decompressed_block <= working_buffer.size())
{
bytes += offset();
pos = working_buffer.begin() + offset_in_decompressed_block;
bytes -= offset();
}
else
{
/// Remember position in compressed file (will be moved in nextImpl)
file_pos = offset_in_compressed_file;
/// We will discard our working_buffer, but have to account rest bytes
bytes += offset();
nextImpl();
if (offset_in_decompressed_block > working_buffer.size())
throw Exception("Seek position is beyond the decompressed block"
" (pos: " + toString(offset_in_decompressed_block) + ", block size: " + toString(working_buffer.size()) + ")",
ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
pos = working_buffer.begin() + offset_in_decompressed_block;
bytes -= offset();
/// No data, everything discarded
pos = working_buffer.end();
/// Remember required offset in decompressed block which will be set in
/// the next ReadBuffer::next() call
nextimpl_working_buffer_offset = offset_in_decompressed_block;
}
}

View File

@ -15,7 +15,7 @@ namespace DB
* The external cache is passed as an argument to the constructor.
* Allows you to increase performance in cases where the same blocks are often read.
* Disadvantages:
* - in case you need to read a lot of data in a row, but of them only a part is cached, you have to do seek-and.
* - in case you need to read a lot of data in a row, but some of them only a part is cached, you have to do seek-and.
*/
class CachedCompressedReadBuffer : public CompressedReadBufferBase, public ReadBuffer
{
@ -25,6 +25,8 @@ private:
std::unique_ptr<ReadBufferFromFileBase> file_in;
const std::string path;
/// Current position in file_in
size_t file_pos;
/// A piece of data from the cache, or a piece of read data that we put into the cache.
@ -37,9 +39,15 @@ private:
ReadBufferFromFileBase::ProfileCallback profile_callback;
clockid_t clock_type {};
/// Check comment in CompressedReadBuffer
/* size_t nextimpl_working_buffer_offset; */
public:
CachedCompressedReadBuffer(const std::string & path, std::function<std::unique_ptr<ReadBufferFromFileBase>()> file_in_creator, UncompressedCache * cache_, bool allow_different_codecs_ = false);
/// Seek is lazy. It doesn't move the position anywhere, just remember them and perform actual
/// seek inside nextImpl.
void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block);
void setProfileCallback(const ReadBufferFromFileBase::ProfileCallback & profile_callback_, clockid_t clock_type_ = CLOCK_MONOTONIC_COARSE)

View File

@ -33,6 +33,13 @@ bool CompressedReadBufferFromFile::nextImpl()
decompress(working_buffer, size_decompressed, size_compressed_without_checksum);
/// nextimpl_working_buffer_offset is set in the seek function (lazy seek). So we have to
/// check that we are not seeking beyond working buffer.
if (nextimpl_working_buffer_offset > working_buffer.size())
throw Exception("Required to move position beyond the decompressed block"
" (pos: " + toString(nextimpl_working_buffer_offset) + ", block size: " + toString(working_buffer.size()) + ")",
ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
return true;
}
@ -67,33 +74,34 @@ CompressedReadBufferFromFile::CompressedReadBufferFromFile(
void CompressedReadBufferFromFile::seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block)
{
/// Nothing to do if we already at required position
if (!size_compressed && static_cast<size_t>(file_in.getPosition()) == offset_in_compressed_file && /// correct position in compressed file
(offset() == offset_in_decompressed_block /// correct position in buffer or
|| nextimpl_working_buffer_offset == offset_in_decompressed_block)) /// we will move our position to correct one
return;
/// Our seek is within working_buffer, so just move the position
if (size_compressed &&
offset_in_compressed_file == file_in.getPosition() - size_compressed &&
offset_in_decompressed_block <= working_buffer.size())
{
bytes += offset();
pos = working_buffer.begin() + offset_in_decompressed_block;
/// `bytes` can overflow and get negative, but in `count()` everything will overflow back and get right.
bytes -= offset();
}
else
else /// Our seek outside working buffer, so perform "lazy seek"
{
/// Actually seek compressed file
file_in.seek(offset_in_compressed_file, SEEK_SET);
/// We will discard our working_buffer, but have to account rest bytes
bytes += offset();
nextImpl();
if (offset_in_decompressed_block > working_buffer.size())
throw Exception("Seek position is beyond the decompressed block"
" (pos: " + toString(offset_in_decompressed_block) + ", block size: " + toString(working_buffer.size()) + ")",
ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
pos = working_buffer.begin() + offset_in_decompressed_block;
bytes -= offset();
/// No data, everything discarded
pos = working_buffer.end();
size_compressed = 0;
/// Remember required offset in decompressed block which will be set in
/// the next ReadBuffer::next() call
nextimpl_working_buffer_offset = offset_in_decompressed_block;
}
}
size_t CompressedReadBufferFromFile::readBig(char * to, size_t n)
{
size_t bytes_read = 0;
@ -115,9 +123,11 @@ size_t CompressedReadBufferFromFile::readBig(char * to, size_t n)
auto additional_size_at_the_end_of_buffer = codec->getAdditionalSizeAtTheEndOfBuffer();
/// If the decompressed block fits entirely where it needs to be copied.
if (size_decompressed + additional_size_at_the_end_of_buffer <= n - bytes_read)
/// If the decompressed block fits entirely where it needs to be copied and we don't
/// need to skip some bytes in decompressed data (seek happened before readBig call).
if (nextimpl_working_buffer_offset == 0 && size_decompressed + additional_size_at_the_end_of_buffer <= n - bytes_read)
{
decompressTo(to + bytes_read, size_decompressed, size_compressed_without_checksum);
bytes_read += size_decompressed;
bytes += size_decompressed;
@ -134,7 +144,11 @@ size_t CompressedReadBufferFromFile::readBig(char * to, size_t n)
working_buffer = Buffer(memory.data(), &memory[size_decompressed]);
decompress(working_buffer, size_decompressed, size_compressed_without_checksum);
pos = working_buffer.begin();
/// Manually take nextimpl_working_buffer_offset into account, because we don't use
/// nextImpl in this method.
pos = working_buffer.begin() + nextimpl_working_buffer_offset;
nextimpl_working_buffer_offset = 0;
bytes_read += read(to + bytes_read, n - bytes_read);
break;

View File

@ -28,6 +28,19 @@ private:
ReadBufferFromFileBase & file_in;
size_t size_compressed = 0;
/// This field inherited from ReadBuffer. It's used to perform "lazy" seek, so in seek() call we:
/// 1) actually seek only underlying compressed file_in to offset_in_compressed_file;
/// 2) reset current working_buffer;
/// 3) remember the position in decompressed block in nextimpl_working_buffer_offset.
/// After following ReadBuffer::next() -> nextImpl call we will read new data into working_buffer and
/// ReadBuffer::next() will move our position in the fresh working_buffer to nextimpl_working_buffer_offset and
/// reset it to zero.
///
/// NOTE: We have independent readBig implementation, so we have to take
/// nextimpl_working_buffer_offset into account there as well.
///
/* size_t nextimpl_working_buffer_offset; */
bool nextImpl() override;
void prefetch() override;
@ -37,6 +50,9 @@ public:
CompressedReadBufferFromFile(
const std::string & path, const ReadSettings & settings, size_t estimated_size, bool allow_different_codecs_ = false);
/// Seek is lazy in some sense. We move position in compressed file_in to offset_in_compressed_file, but don't
/// read data into working_buffer and don't shit our position to offset_in_decompressed_block. Instead
/// we store this offset inside nextimpl_working_buffer_offset.
void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block);
size_t readBig(char * to, size_t n) override;

View File

@ -13,3 +13,6 @@ target_link_libraries (mysql_protocol PRIVATE dbms)
if(USE_SSL)
target_include_directories (mysql_protocol SYSTEM PRIVATE ${OPENSSL_INCLUDE_DIR})
endif()
add_executable (coro coro.cpp)
target_link_libraries (coro PRIVATE clickhouse_common_io)

189
src/Core/examples/coro.cpp Normal file
View File

@ -0,0 +1,189 @@
#include <cassert>
#include <iostream>
#include <string>
#include <optional>
#include <Common/Exception.h>
#include <base/logger_useful.h>
#include <Poco/ConsoleChannel.h>
#include <Poco/Logger.h>
#include <Poco/AutoPtr.h>
#if defined(__clang__)
#include <experimental/coroutine>
namespace std
{
using namespace experimental::coroutines_v1;
}
#else
#include <coroutine>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wzero-as-null-pointer-constant"
#endif
template <typename T>
struct suspend_value // NOLINT(readability-identifier-naming)
{
constexpr bool await_ready() const noexcept { return true; } // NOLINT(readability-identifier-naming)
constexpr void await_suspend(std::coroutine_handle<>) const noexcept {} // NOLINT(readability-identifier-naming)
constexpr T await_resume() const noexcept // NOLINT(readability-identifier-naming)
{
std::cout << " ret " << val << std::endl;
return val;
}
T val;
};
template <typename T>
struct Task
{
struct promise_type // NOLINT(readability-identifier-naming)
{
using coro_handle = std::coroutine_handle<promise_type>;
auto get_return_object() { return coro_handle::from_promise(*this); } // NOLINT(readability-identifier-naming)
auto initial_suspend() { return std::suspend_never(); } // NOLINT(readability-identifier-naming)
auto final_suspend() noexcept { return suspend_value<T>{*r->value}; } // NOLINT(readability-identifier-naming)
//void return_void() {}
void return_value(T value_) { r->value = value_; } // NOLINT(readability-identifier-naming)
void unhandled_exception() // NOLINT(readability-identifier-naming)
{
DB::tryLogCurrentException("Logger");
r->exception = std::current_exception(); // NOLINT(bugprone-throw-keyword-missing)
}
explicit promise_type(std::string tag_) : tag(tag_) {}
~promise_type() { std::cout << "~promise_type " << tag << std::endl; }
std::string tag;
coro_handle next;
Task * r = nullptr;
};
using coro_handle = std::coroutine_handle<promise_type>;
bool await_ready() const noexcept { return false; } // NOLINT(readability-identifier-naming)
void await_suspend(coro_handle g) noexcept // NOLINT(readability-identifier-naming)
{
std::cout << " await_suspend " << my.promise().tag << std::endl;
std::cout << " g tag " << g.promise().tag << std::endl;
g.promise().next = my;
}
T await_resume() noexcept // NOLINT(readability-identifier-naming)
{
std::cout << " await_res " << my.promise().tag << std::endl;
return *value;
}
Task(coro_handle handle) : my(handle), tag(handle.promise().tag) // NOLINT(google-explicit-constructor)
{
assert(handle);
my.promise().r = this;
std::cout << " Task " << tag << std::endl;
}
Task(Task &) = delete;
Task(Task &&rhs) : my(rhs.my), tag(rhs.tag)
{
rhs.my = {};
std::cout << " Task&& " << tag << std::endl;
}
static bool resumeImpl(Task *r)
{
if (r->value)
return false;
auto & next = r->my.promise().next;
if (next)
{
if (resumeImpl(next.promise().r))
return true;
next = {};
}
if (!r->value)
{
r->my.resume();
if (r->exception)
std::rethrow_exception(r->exception);
}
return !r->value;
}
bool resume()
{
return resumeImpl(this);
}
T res()
{
return *value;
}
~Task()
{
std::cout << " ~Task " << tag << std::endl;
}
private:
coro_handle my;
std::string tag;
std::optional<T> value;
std::exception_ptr exception;
};
Task<int> boo([[maybe_unused]] std::string tag)
{
std::cout << "x" << std::endl;
co_await std::suspend_always();
std::cout << StackTrace().toString();
std::cout << "y" << std::endl;
co_return 1;
}
Task<int> bar([[maybe_unused]] std::string tag)
{
std::cout << "a" << std::endl;
int res1 = co_await boo("boo1");
std::cout << "b " << res1 << std::endl;
int res2 = co_await boo("boo2");
if (res2 == 1)
throw DB::Exception(1, "hello");
std::cout << "c " << res2 << std::endl;
co_return res1 + res2; // 1 + 1 = 2
}
Task<int> foo([[maybe_unused]] std::string tag)
{
std::cout << "Hello" << std::endl;
auto res1 = co_await bar("bar1");
std::cout << "Coro " << res1 << std::endl;
auto res2 = co_await bar("bar2");
std::cout << "World " << res2 << std::endl;
co_return res1 * res2; // 2 * 2 = 4
}
int main()
{
Poco::AutoPtr<Poco::ConsoleChannel> app_channel(new Poco::ConsoleChannel(std::cerr));
Poco::Logger::root().setChannel(app_channel);
Poco::Logger::root().setLevel("trace");
LOG_INFO(&Poco::Logger::get(""), "Starting");
try
{
auto t = foo("foo");
std::cout << ".. started" << std::endl;
while (t.resume())
std::cout << ".. yielded" << std::endl;
std::cout << ".. done: " << t.res() << std::endl;
}
catch (DB::Exception & e)
{
std::cout << "Got exception " << e.what() << std::endl;
std::cout << e.getStackTraceString() << std::endl;
}
}

View File

@ -1,114 +0,0 @@
#include <DataStreams/ColumnGathererStream.h>
#include <base/logger_useful.h>
#include <Common/typeid_cast.h>
#include <Common/formatReadable.h>
#include <IO/WriteHelpers.h>
#include <iomanip>
namespace DB
{
namespace ErrorCodes
{
extern const int INCOMPATIBLE_COLUMNS;
extern const int INCORRECT_NUMBER_OF_COLUMNS;
extern const int EMPTY_DATA_PASSED;
extern const int RECEIVED_EMPTY_DATA;
}
ColumnGathererStream::ColumnGathererStream(
const String & column_name_, const BlockInputStreams & source_streams, ReadBuffer & row_sources_buf_,
size_t block_preferred_size_)
: column_name(column_name_), sources(source_streams.size()), row_sources_buf(row_sources_buf_)
, block_preferred_size(block_preferred_size_), log(&Poco::Logger::get("ColumnGathererStream"))
{
if (source_streams.empty())
throw Exception("There are no streams to gather", ErrorCodes::EMPTY_DATA_PASSED);
children.assign(source_streams.begin(), source_streams.end());
for (size_t i = 0; i < children.size(); ++i)
{
const Block & header = children[i]->getHeader();
/// Sometimes MergeTreeReader injects additional column with partitioning key
if (header.columns() > 2)
throw Exception(
"Block should have 1 or 2 columns, but contains " + toString(header.columns()),
ErrorCodes::INCORRECT_NUMBER_OF_COLUMNS);
if (i == 0)
{
column.name = column_name;
column.type = header.getByName(column_name).type;
column.column = column.type->createColumn();
}
else if (header.getByName(column_name).column->getName() != column.column->getName())
throw Exception("Column types don't match", ErrorCodes::INCOMPATIBLE_COLUMNS);
}
}
Block ColumnGathererStream::readImpl()
{
/// Special case: single source and there are no skipped rows
if (children.size() == 1 && row_sources_buf.eof() && !source_to_fully_copy)
return children[0]->read();
if (!source_to_fully_copy && row_sources_buf.eof())
return Block();
MutableColumnPtr output_column = column.column->cloneEmpty();
output_block = Block{column.cloneEmpty()};
/// Surprisingly this call may directly change output_block, bypassing
/// output_column. See ColumnGathererStream::gather.
output_column->gather(*this);
if (!output_column->empty())
output_block.getByPosition(0).column = std::move(output_column);
return output_block;
}
void ColumnGathererStream::fetchNewBlock(Source & source, size_t source_num)
{
try
{
source.block = children[source_num]->read();
source.update(column_name);
}
catch (Exception & e)
{
e.addMessage("Cannot fetch required block. Stream " + children[source_num]->getName() + ", part " + toString(source_num));
throw;
}
if (0 == source.size)
{
throw Exception("Fetched block is empty. Stream " + children[source_num]->getName() + ", part " + toString(source_num),
ErrorCodes::RECEIVED_EMPTY_DATA);
}
}
void ColumnGathererStream::readSuffixImpl()
{
const BlockStreamProfileInfo & profile_info = getProfileInfo();
/// Don't print info for small parts (< 10M rows)
if (profile_info.rows < 10000000)
return;
double seconds = profile_info.total_stopwatch.elapsedSeconds();
if (!seconds)
LOG_DEBUG(log, "Gathered column {} ({} bytes/elem.) in 0 sec.",
column_name, static_cast<double>(profile_info.bytes) / profile_info.rows);
else
LOG_DEBUG(log, "Gathered column {} ({} bytes/elem.) in {} sec., {} rows/sec., {}/sec.",
column_name, static_cast<double>(profile_info.bytes) / profile_info.rows, seconds,
profile_info.rows / seconds, ReadableSize(profile_info.bytes / seconds));
}
}

View File

@ -1,39 +0,0 @@
#include <Interpreters/ExpressionActions.h>
#include <DataStreams/ExpressionBlockInputStream.h>
namespace DB
{
ExpressionBlockInputStream::ExpressionBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_)
: expression(expression_)
{
children.push_back(input);
cached_header = children.back()->getHeader();
expression->execute(cached_header, true);
}
String ExpressionBlockInputStream::getName() const { return "Expression"; }
Block ExpressionBlockInputStream::getTotals()
{
totals = children.back()->getTotals();
expression->execute(totals);
return totals;
}
Block ExpressionBlockInputStream::getHeader() const
{
return cached_header.cloneEmpty();
}
Block ExpressionBlockInputStream::readImpl()
{
Block res = children.back()->read();
if (res)
expression->execute(res);
return res;
}
}

View File

@ -1,52 +0,0 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
namespace DB
{
class ExpressionActions;
/** Executes a certain expression over the block.
* The expression consists of column identifiers from the block, constants, common functions.
* For example: hits * 2 + 3, url LIKE '%yandex%'
* The expression processes each row independently of the others.
*/
class ExpressionBlockInputStream : public IBlockInputStream
{
public:
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
ExpressionBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_);
String getName() const override;
Block getTotals() override;
Block getHeader() const override;
protected:
ExpressionActionsPtr expression;
Block readImpl() override;
private:
Block cached_header;
};
/// ExpressionBlockInputStream that could generate many out blocks for single input block.
class InflatingExpressionBlockInputStream : public ExpressionBlockInputStream
{
public:
InflatingExpressionBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_)
: ExpressionBlockInputStream(input, expression_)
{}
protected:
Block readImpl() override;
private:
ExtraBlockPtr not_processed;
size_t action_number = 0;
};
}

View File

@ -1,4 +1,4 @@
#include "InternalTextLogsRowOutputStream.h"
#include "InternalTextLogs.h"
#include <Core/Block.h>
#include <Interpreters/InternalTextLogsQueue.h>
#include <Common/typeid_cast.h>
@ -13,12 +13,7 @@
namespace DB
{
Block InternalTextLogsRowOutputStream::getHeader() const
{
return InternalTextLogsQueue::getSampleBlock();
}
void InternalTextLogsRowOutputStream::write(const Block & block)
void InternalTextLogs::write(const Block & block)
{
const auto & array_event_time = typeid_cast<const ColumnUInt32 &>(*block.getByName("event_time").column).getData();
const auto & array_microseconds = typeid_cast<const ColumnUInt32 &>(*block.getByName("event_time_microseconds").column).getData();

View File

@ -9,16 +9,15 @@ namespace DB
/// Prints internal server logs
/// Input blocks have to have the same structure as SystemLogsQueue::getSampleBlock()
/// NOTE: IRowOutputFormat does not suite well for this case
class InternalTextLogsRowOutputStream : public IBlockOutputStream
class InternalTextLogs
{
public:
InternalTextLogsRowOutputStream(WriteBuffer & buf_out, bool color_) : wb(buf_out), color(color_) {}
InternalTextLogs(WriteBuffer & buf_out, bool color_) : wb(buf_out), color(color_) {}
Block getHeader() const override;
void write(const Block & block) override;
void write(const Block & block);
void flush() override
void flush()
{
wb.next();
}

View File

@ -1,28 +0,0 @@
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/materializeBlock.h>
namespace DB
{
MaterializingBlockInputStream::MaterializingBlockInputStream(const BlockInputStreamPtr & input)
{
children.push_back(input);
}
String MaterializingBlockInputStream::getName() const
{
return "Materializing";
}
Block MaterializingBlockInputStream::getHeader() const
{
return materializeBlock(children.back()->getHeader());
}
Block MaterializingBlockInputStream::readImpl()
{
return materializeBlock(children.back()->read());
}
}

View File

@ -1,21 +0,0 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
namespace DB
{
/** Converts columns-constants to full columns ("materializes" them).
*/
class MaterializingBlockInputStream : public IBlockInputStream
{
public:
MaterializingBlockInputStream(const BlockInputStreamPtr & input);
String getName() const override;
Block getHeader() const override;
protected:
Block readImpl() override;
};
}

View File

@ -8,7 +8,7 @@
#include <Common/typeid_cast.h>
#include <base/range.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/NativeReader.h>
#include <DataTypes/DataTypeLowCardinality.h>
@ -23,17 +23,17 @@ namespace ErrorCodes
}
NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_)
NativeReader::NativeReader(ReadBuffer & istr_, UInt64 server_revision_)
: istr(istr_), server_revision(server_revision_)
{
}
NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, const Block & header_, UInt64 server_revision_)
NativeReader::NativeReader(ReadBuffer & istr_, const Block & header_, UInt64 server_revision_)
: istr(istr_), header(header_), server_revision(server_revision_)
{
}
NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_,
NativeReader::NativeReader(ReadBuffer & istr_, UInt64 server_revision_,
IndexForNativeFormat::Blocks::const_iterator index_block_it_,
IndexForNativeFormat::Blocks::const_iterator index_block_end_)
: istr(istr_), server_revision(server_revision_),
@ -57,21 +57,13 @@ NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, UInt64 server
}
// also resets few vars from IBlockInputStream (I didn't want to propagate resetParser upthere)
void NativeBlockInputStream::resetParser()
void NativeReader::resetParser()
{
istr_concrete = nullptr;
use_index = false;
#ifndef NDEBUG
read_prefix_is_called = false;
read_suffix_is_called = false;
#endif
is_cancelled.store(false);
is_killed.store(false);
}
void NativeBlockInputStream::readData(const IDataType & type, ColumnPtr & column, ReadBuffer & istr, size_t rows, double avg_value_size_hint)
void NativeReader::readData(const IDataType & type, ColumnPtr & column, ReadBuffer & istr, size_t rows, double avg_value_size_hint)
{
ISerialization::DeserializeBinaryBulkSettings settings;
settings.getter = [&](ISerialization::SubstreamPath) -> ReadBuffer * { return &istr; };
@ -91,13 +83,13 @@ void NativeBlockInputStream::readData(const IDataType & type, ColumnPtr & column
}
Block NativeBlockInputStream::getHeader() const
Block NativeReader::getHeader() const
{
return header;
}
Block NativeBlockInputStream::readImpl()
Block NativeReader::read()
{
Block res;
@ -215,7 +207,7 @@ Block NativeBlockInputStream::readImpl()
return res;
}
void NativeBlockInputStream::updateAvgValueSizeHints(const Block & block)
void NativeReader::updateAvgValueSizeHints(const Block & block)
{
auto rows = block.rows();
if (rows < 10)

View File

@ -57,32 +57,28 @@ struct IndexForNativeFormat
* Can also be used to store data on disk.
* In this case, can use the index.
*/
class NativeBlockInputStream : public IBlockInputStream
class NativeReader
{
public:
/// If a non-zero server_revision is specified, additional block information may be expected and read.
NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_);
NativeReader(ReadBuffer & istr_, UInt64 server_revision_);
/// For cases when data structure (header) is known in advance.
/// NOTE We may use header for data validation and/or type conversions. It is not implemented.
NativeBlockInputStream(ReadBuffer & istr_, const Block & header_, UInt64 server_revision_);
NativeReader(ReadBuffer & istr_, const Block & header_, UInt64 server_revision_);
/// For cases when we have an index. It allows to skip columns. Only columns specified in the index will be read.
NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_,
NativeReader(ReadBuffer & istr_, UInt64 server_revision_,
IndexForNativeFormat::Blocks::const_iterator index_block_it_,
IndexForNativeFormat::Blocks::const_iterator index_block_end_);
String getName() const override { return "Native"; }
static void readData(const IDataType & type, ColumnPtr & column, ReadBuffer & istr, size_t rows, double avg_value_size_hint);
Block getHeader() const override;
Block getHeader() const;
void resetParser();
protected:
Block readImpl() override;
Block read();
private:
ReadBuffer & istr;

View File

@ -6,7 +6,7 @@
#include <Compression/CompressedWriteBuffer.h>
#include <DataStreams/MarkInCompressedFile.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/NativeWriter.h>
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeLowCardinality.h>
@ -20,7 +20,7 @@ namespace ErrorCodes
}
NativeBlockOutputStream::NativeBlockOutputStream(
NativeWriter::NativeWriter(
WriteBuffer & ostr_, UInt64 client_revision_, const Block & header_, bool remove_low_cardinality_,
WriteBuffer * index_ostr_, size_t initial_size_of_file_)
: ostr(ostr_), client_revision(client_revision_), header(header_),
@ -35,7 +35,7 @@ NativeBlockOutputStream::NativeBlockOutputStream(
}
void NativeBlockOutputStream::flush()
void NativeWriter::flush()
{
ostr.next();
}
@ -62,7 +62,7 @@ static void writeData(const IDataType & type, const ColumnPtr & column, WriteBuf
}
void NativeBlockOutputStream::write(const Block & block)
void NativeWriter::write(const Block & block)
{
/// Additional information about the block.
if (client_revision > 0)

View File

@ -1,8 +1,8 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <base/types.h>
#include <DataTypes/IDataType.h>
#include <Core/Block.h>
namespace DB
{
@ -17,20 +17,20 @@ class CompressedWriteBuffer;
* A stream can be specified to write the index. The index contains offsets to each part of each column.
* If an `append` is made to an existing file, and you need to write the index, then specify `initial_size_of_file`.
*/
class NativeBlockOutputStream : public IBlockOutputStream
class NativeWriter
{
public:
/** If non-zero client_revision is specified, additional block information can be written.
*/
NativeBlockOutputStream(
NativeWriter(
WriteBuffer & ostr_, UInt64 client_revision_, const Block & header_, bool remove_low_cardinality_ = false,
WriteBuffer * index_ostr_ = nullptr, size_t initial_size_of_file_ = 0);
Block getHeader() const override { return header; }
void write(const Block & block) override;
void flush() override;
Block getHeader() const { return header; }
void write(const Block & block);
void flush();
String getContentType() const override { return "application/octet-stream"; }
static String getContentType() { return "application/octet-stream"; }
private:
WriteBuffer & ostr;

View File

@ -1,69 +0,0 @@
#include <DataStreams/RemoteBlockInputStream.h>
#include <Interpreters/Context.h>
namespace DB
{
RemoteBlockInputStream::RemoteBlockInputStream(
Connection & connection,
const String & query_, const Block & header_, ContextPtr context_,
const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_)
: query_executor(connection, query_, header_, context_, throttler, scalars_, external_tables_, stage_)
{
init();
}
RemoteBlockInputStream::RemoteBlockInputStream(
const ConnectionPoolWithFailoverPtr & pool,
std::vector<IConnectionPool::Entry> && connections,
const String & query_, const Block & header_, ContextPtr context_,
const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_)
: query_executor(pool, std::move(connections), query_, header_, context_, throttler, scalars_, external_tables_, stage_)
{
init();
}
RemoteBlockInputStream::RemoteBlockInputStream(
const ConnectionPoolWithFailoverPtr & pool,
const String & query_, const Block & header_, ContextPtr context_,
const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_)
: query_executor(pool, query_, header_, context_, throttler, scalars_, external_tables_, stage_)
{
init();
}
void RemoteBlockInputStream::init()
{
query_executor.setProgressCallback([this](const Progress & progress) { progressImpl(progress); });
query_executor.setProfileInfoCallback([this](const BlockStreamProfileInfo & info_) { info.setFrom(info_, true); });
query_executor.setLogger(log);
}
void RemoteBlockInputStream::cancel(bool kill)
{
if (kill)
is_killed = true;
bool old_val = false;
if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
return;
query_executor.cancel();
}
Block RemoteBlockInputStream::readImpl()
{
auto block = query_executor.read();
if (isCancelledOrThrowIfKilled())
return Block();
return block;
}
void RemoteBlockInputStream::readSuffixImpl()
{
query_executor.finish();
}
}

View File

@ -1,78 +0,0 @@
#pragma once
#include <optional>
#include <base/logger_useful.h>
#include <DataStreams/IBlockInputStream.h>
#include <Common/Throttler.h>
#include <Client/ConnectionPool.h>
#include <Client/MultiplexedConnections.h>
#include <Interpreters/Cluster.h>
#include <DataStreams/RemoteQueryExecutor.h>
namespace DB
{
class Context;
/** This class allows one to launch queries on remote replicas of one shard and get results
*/
class RemoteBlockInputStream : public IBlockInputStream
{
public:
/// Takes already set connection.
RemoteBlockInputStream(
Connection & connection,
const String & query_, const Block & header_, ContextPtr context_,
const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
/// Accepts several connections already taken from pool.
RemoteBlockInputStream(
const ConnectionPoolWithFailoverPtr & pool,
std::vector<IConnectionPool::Entry> && connections,
const String & query_, const Block & header_, ContextPtr context_,
const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
/// Takes a pool and gets one or several connections from it.
RemoteBlockInputStream(
const ConnectionPoolWithFailoverPtr & pool,
const String & query_, const Block & header_, ContextPtr context_,
const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
/// Set the query_id. For now, used by performance test to later find the query
/// in the server query_log. Must be called before sending the query to the server.
void setQueryId(const std::string & query_id) { query_executor.setQueryId(query_id); }
/// Specify how we allocate connections on a shard.
void setPoolMode(PoolMode pool_mode) { query_executor.setPoolMode(pool_mode); }
void setMainTable(StorageID main_table_) { query_executor.setMainTable(std::move(main_table_)); }
/// Prevent default progress notification because progress' callback is called by its own.
void progress(const Progress & /*value*/) override {}
void cancel(bool kill) override;
String getName() const override { return "Remote"; }
Block getHeader() const override { return query_executor.getHeader(); }
Block getTotals() override { return query_executor.getTotals(); }
Block getExtremes() override { return query_executor.getExtremes(); }
protected:
Block readImpl() override;
void readSuffixImpl() override;
private:
RemoteQueryExecutor query_executor;
Poco::Logger * log = &Poco::Logger::get("RemoteBlockInputStream");
void init();
};
}

View File

@ -16,18 +16,17 @@
namespace DB
{
TTLBlockInputStream::TTLBlockInputStream(
const BlockInputStreamPtr & input_,
TTLTransform::TTLTransform(
const Block & header_,
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const MergeTreeData::MutableDataPartPtr & data_part_,
time_t current_time_,
bool force_)
: data_part(data_part_)
, log(&Poco::Logger::get(storage_.getLogName() + " (TTLBlockInputStream)"))
: IAccumulatingTransform(header_, header_)
, data_part(data_part_)
, log(&Poco::Logger::get(storage_.getLogName() + " (TTLTransform)"))
{
children.push_back(input_);
header = children.at(0)->getHeader();
auto old_ttl_infos = data_part->ttl_infos;
if (metadata_snapshot_->hasRowsTTL())
@ -50,7 +49,7 @@ TTLBlockInputStream::TTLBlockInputStream(
for (const auto & group_by_ttl : metadata_snapshot_->getGroupByTTLs())
algorithms.emplace_back(std::make_unique<TTLAggregationAlgorithm>(
group_by_ttl, old_ttl_infos.group_by_ttl[group_by_ttl.result_column], current_time_, force_, header, storage_));
group_by_ttl, old_ttl_infos.group_by_ttl[group_by_ttl.result_column], current_time_, force_, getInputPort().getHeader(), storage_));
if (metadata_snapshot_->hasAnyColumnTTL())
{
@ -98,22 +97,40 @@ Block reorderColumns(Block block, const Block & header)
return res;
}
Block TTLBlockInputStream::readImpl()
void TTLTransform::consume(Chunk chunk)
{
if (all_data_dropped)
return {};
{
finishConsume();
return;
}
auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
auto block = children.at(0)->read();
for (const auto & algorithm : algorithms)
algorithm->execute(block);
if (!block)
return block;
return;
return reorderColumns(std::move(block), header);
size_t num_rows = block.rows();
setReadyChunk(Chunk(reorderColumns(std::move(block), getOutputPort().getHeader()).getColumns(), num_rows));
}
void TTLBlockInputStream::readSuffixImpl()
Chunk TTLTransform::generate()
{
Block block;
for (const auto & algorithm : algorithms)
algorithm->execute(block);
if (!block)
return {};
size_t num_rows = block.rows();
return Chunk(reorderColumns(std::move(block), getOutputPort().getHeader()).getColumns(), num_rows);
}
void TTLTransform::finalize()
{
data_part->ttl_infos = {};
for (const auto & algorithm : algorithms)
@ -126,4 +143,13 @@ void TTLBlockInputStream::readSuffixImpl()
}
}
IProcessor::Status TTLTransform::prepare()
{
auto status = IAccumulatingTransform::prepare();
if (status == Status::Finished)
finalize();
return status;
}
}

View File

@ -1,5 +1,5 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Processors/IAccumulatingTransform.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Core/Block.h>
@ -12,11 +12,11 @@
namespace DB
{
class TTLBlockInputStream : public IBlockInputStream
class TTLTransform : public IAccumulatingTransform
{
public:
TTLBlockInputStream(
const BlockInputStreamPtr & input_,
TTLTransform(
const Block & header_,
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const MergeTreeData::MutableDataPartPtr & data_part_,
@ -25,13 +25,15 @@ public:
);
String getName() const override { return "TTL"; }
Block getHeader() const override { return header; }
Status prepare() override;
protected:
Block readImpl() override;
void consume(Chunk chunk) override;
Chunk generate() override;
/// Finalizes ttl infos and updates data part
void readSuffixImpl() override;
void finalize();
private:
std::vector<TTLAlgorithmPtr> algorithms;
@ -41,7 +43,6 @@ private:
/// ttl_infos and empty_columns are updating while reading
const MergeTreeData::MutableDataPartPtr & data_part;
Poco::Logger * log;
Block header;
};
}

View File

@ -4,18 +4,17 @@
namespace DB
{
TTLCalcInputStream::TTLCalcInputStream(
const BlockInputStreamPtr & input_,
TTLCalcTransform::TTLCalcTransform(
const Block & header_,
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const MergeTreeData::MutableDataPartPtr & data_part_,
time_t current_time_,
bool force_)
: data_part(data_part_)
, log(&Poco::Logger::get(storage_.getLogName() + " (TTLCalcInputStream)"))
: IAccumulatingTransform(header_, header_)
, data_part(data_part_)
, log(&Poco::Logger::get(storage_.getLogName() + " (TTLCalcTransform)"))
{
children.push_back(input_);
header = children.at(0)->getHeader();
auto old_ttl_infos = data_part->ttl_infos;
if (metadata_snapshot_->hasRowsTTL())
@ -51,27 +50,52 @@ TTLCalcInputStream::TTLCalcInputStream(
recompression_ttl, TTLUpdateField::RECOMPRESSION_TTL, recompression_ttl.result_column, old_ttl_infos.recompression_ttl[recompression_ttl.result_column], current_time_, force_));
}
Block TTLCalcInputStream::readImpl()
void TTLCalcTransform::consume(Chunk chunk)
{
auto block = children.at(0)->read();
auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
for (const auto & algorithm : algorithms)
algorithm->execute(block);
if (!block)
return block;
return;
Block res;
for (const auto & col : header)
res.insert(block.getByName(col.name));
Chunk res;
for (const auto & col : getOutputPort().getHeader())
res.addColumn(block.getByName(col.name).column);
setReadyChunk(std::move(res));
}
Chunk TTLCalcTransform::generate()
{
Block block;
for (const auto & algorithm : algorithms)
algorithm->execute(block);
if (!block)
return {};
Chunk res;
for (const auto & col : getOutputPort().getHeader())
res.addColumn(block.getByName(col.name).column);
return res;
}
void TTLCalcInputStream::readSuffixImpl()
void TTLCalcTransform::finalize()
{
data_part->ttl_infos = {};
for (const auto & algorithm : algorithms)
algorithm->finalize(data_part);
}
IProcessor::Status TTLCalcTransform::prepare()
{
auto status = IAccumulatingTransform::prepare();
if (status == Status::Finished)
finalize();
return status;
}
}

View File

@ -1,5 +1,5 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Processors/IAccumulatingTransform.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Core/Block.h>
@ -11,11 +11,11 @@
namespace DB
{
class TTLCalcInputStream : public IBlockInputStream
class TTLCalcTransform : public IAccumulatingTransform
{
public:
TTLCalcInputStream(
const BlockInputStreamPtr & input_,
TTLCalcTransform(
const Block & header_,
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const MergeTreeData::MutableDataPartPtr & data_part_,
@ -24,13 +24,14 @@ public:
);
String getName() const override { return "TTL_CALC"; }
Block getHeader() const override { return header; }
Status prepare() override;
protected:
Block readImpl() override;
void consume(Chunk chunk) override;
Chunk generate() override;
/// Finalizes ttl infos and updates data part
void readSuffixImpl() override;
void finalize();
private:
std::vector<TTLAlgorithmPtr> algorithms;
@ -38,7 +39,6 @@ private:
/// ttl_infos and empty_columns are updating while reading
const MergeTreeData::MutableDataPartPtr & data_part;
Poco::Logger * log;
Block header;
};
}

View File

@ -1,7 +1,7 @@
#include <DataStreams/TemporaryFileStream.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/NativeReader.h>
#include <DataStreams/NativeWriter.h>
#include <DataStreams/copyData.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/ISource.h>
@ -17,13 +17,13 @@ namespace DB
TemporaryFileStream::TemporaryFileStream(const std::string & path)
: file_in(path)
, compressed_in(file_in)
, block_in(std::make_shared<NativeBlockInputStream>(compressed_in, DBMS_TCP_PROTOCOL_VERSION))
, block_in(std::make_unique<NativeReader>(compressed_in, DBMS_TCP_PROTOCOL_VERSION))
{}
TemporaryFileStream::TemporaryFileStream(const std::string & path, const Block & header_)
: file_in(path)
, compressed_in(file_in)
, block_in(std::make_shared<NativeBlockInputStream>(compressed_in, header_, 0))
, block_in(std::make_unique<NativeReader>(compressed_in, header_, 0))
{}
/// Flush data from input stream into file for future reading
@ -31,18 +31,15 @@ void TemporaryFileStream::write(const std::string & path, const Block & header,
{
WriteBufferFromFile file_buf(path);
CompressedWriteBuffer compressed_buf(file_buf, CompressionCodecFactory::instance().get(codec, {}));
NativeBlockOutputStream output(compressed_buf, 0, header);
NativeWriter output(compressed_buf, 0, header);
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
PullingPipelineExecutor executor(pipeline);
output.writePrefix();
Block block;
while (executor.pull(block))
output.write(block);
output.writeSuffix();
compressed_buf.finalize();
}

View File

@ -5,6 +5,7 @@
#include <Compression/CompressedReadBuffer.h>
#include <IO/ReadBufferFromFile.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <DataStreams/NativeReader.h>
namespace DB
{
@ -14,7 +15,7 @@ struct TemporaryFileStream
{
ReadBufferFromFile file_in;
CompressedReadBuffer compressed_in;
BlockInputStreamPtr block_in;
std::unique_ptr<NativeReader> block_in;
explicit TemporaryFileStream(const std::string & path);
TemporaryFileStream(const std::string & path, const Block & header_);

View File

@ -16,6 +16,8 @@ void DDLDependencyVisitor::visit(const ASTPtr & ast, Data & data)
visit(*function, data);
else if (const auto * dict_source = ast->as<ASTFunctionWithKeyValueArguments>())
visit(*dict_source, data);
else if (const auto * storage = ast->as<ASTStorage>())
visit(*storage, data);
}
bool DDLDependencyVisitor::needChildVisit(const ASTPtr & node, const ASTPtr & child)
@ -66,6 +68,16 @@ void DDLDependencyVisitor::visit(const ASTFunctionWithKeyValueArguments & dict_s
data.dependencies.emplace(std::move(info->table_name));
}
void DDLDependencyVisitor::visit(const ASTStorage & storage, Data & data)
{
if (!storage.engine)
return;
if (storage.engine->name != "Dictionary")
return;
extractTableNameFromArgument(*storage.engine, data, 0);
}
void DDLDependencyVisitor::extractTableNameFromArgument(const ASTFunction & function, Data & data, size_t arg_idx)
{

View File

@ -8,6 +8,7 @@ namespace DB
class ASTFunction;
class ASTFunctionWithKeyValueArguments;
class ASTStorage;
/// Visits ASTCreateQuery and extracts names of table (or dictionary) dependencies
/// from column default expressions (joinGet, dictGet, etc)
@ -33,6 +34,7 @@ public:
private:
static void visit(const ASTFunction & function, Data & data);
static void visit(const ASTFunctionWithKeyValueArguments & dict_source, Data & data);
static void visit(const ASTStorage & storage, Data & data);
static void extractTableNameFromArgument(const ASTFunction & function, Data & data, size_t arg_idx);
};

View File

@ -250,7 +250,7 @@ PostgreSQLTableStructure fetchPostgreSQLTableStructure(
"and i.oid = ix.indexrelid "
"and a.attrelid = t.oid "
"and a.attnum = ANY(ix.indkey) "
"and t.relkind = 'r' " /// simple tables
"and t.relkind in ('r', 'p') " /// simple tables
"and t.relname = {} " /// Connection is already done to a needed database, only table name is needed.
"and ix.indisreplident = 't' " /// index is is replica identity index
"ORDER BY a.attname", /// column names

View File

@ -67,7 +67,7 @@ void registerDictionarySourceMongoDB(DictionarySourceFactory & factory)
// Poco/MongoDB/BSONWriter.h:54: void writeCString(const std::string & value);
// src/IO/WriteHelpers.h:146 #define writeCString(s, buf)
#include <IO/WriteHelpers.h>
#include <DataStreams/MongoDBSource.h>
#include <Processors/Transforms/MongoDBSource.h>
namespace DB

View File

@ -8,7 +8,7 @@
#if USE_LIBPQXX
#include <Columns/ColumnString.h>
#include <DataTypes/DataTypeString.h>
#include <DataStreams/PostgreSQLSource.h>
#include <Processors/Transforms/PostgreSQLSource.h>
#include "readInvalidateQuery.h"
#include <Interpreters/Context.h>
#include <Processors/QueryPipeline.h>

View File

@ -11,7 +11,6 @@
#include <Processors/Formats/OutputStreamToOutputFormat.h>
#include <Processors/Formats/Impl/ValuesBlockInputFormat.h>
#include <Processors/Formats/Impl/MySQLOutputFormat.h>
#include <Processors/Formats/Impl/NativeFormat.h>
#include <Processors/Formats/Impl/ParallelParsingInputFormat.h>
#include <Processors/Formats/Impl/ParallelFormattingOutputFormat.h>
#include <Poco/URI.h>
@ -138,9 +137,6 @@ InputFormatPtr FormatFactory::getInput(
UInt64 max_block_size,
const std::optional<FormatSettings> & _format_settings) const
{
if (name == "Native")
return std::make_shared<NativeInputFormatFromNativeBlockInputStream>(sample, buf);
auto format_settings = _format_settings
? *_format_settings : getFormatSettings(context);

View File

@ -1,33 +1,108 @@
#include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/NativeReader.h>
#include <DataStreams/NativeWriter.h>
#include <Formats/FormatFactory.h>
#include <Processors/Formats/IInputFormat.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Transforms/AggregatingTransform.h>
namespace DB
{
class NativeInputFormat final : public IInputFormat
{
public:
NativeInputFormat(ReadBuffer & buf, const Block & header)
: IInputFormat(header, buf)
, reader(buf, header, 0) {}
String getName() const override { return "Native"; }
void resetParser() override
{
IInputFormat::resetParser();
reader.resetParser();
}
Chunk generate() override
{
auto block = reader.read();
if (!block)
return {};
assertBlocksHaveEqualStructure(getPort().getHeader(), block, getName());
block.checkNumberOfRows();
size_t num_rows = block.rows();
return Chunk(block.getColumns(), num_rows);
}
private:
NativeReader reader;
};
class NativeOutputFormat final : public IOutputFormat
{
public:
NativeOutputFormat(WriteBuffer & buf, const Block & header)
: IOutputFormat(header, buf)
, writer(buf, 0, header)
{
}
String getName() const override { return "Native"; }
std::string getContentType() const override
{
return writer.getContentType();
}
protected:
void consume(Chunk chunk) override
{
if (chunk)
{
auto block = getPort(PortKind::Main).getHeader();
// const auto & info = chunk.getChunkInfo();
// const auto * agg_info = typeid_cast<const AggregatedChunkInfo *>(info.get());
// if (agg_info)
// {
// block.info.bucket_num = agg_info->bucket_num;
// block.info.is_overflows = agg_info->is_overflows;
// }
block.setColumns(chunk.detachColumns());
writer.write(block);
}
}
private:
NativeWriter writer;
};
void registerInputFormatNative(FormatFactory & factory)
{
factory.registerInputFormat("Native", [](
factory.registerInputFormatProcessor("Native", [](
ReadBuffer & buf,
const Block & sample,
UInt64 /* max_block_size */,
FormatFactory::ReadCallback /* callback */,
const RowInputFormatParams &,
const FormatSettings &)
{
return std::make_shared<NativeBlockInputStream>(buf, sample, 0);
return std::make_shared<NativeInputFormat>(buf, sample);
});
}
void registerOutputFormatNative(FormatFactory & factory)
{
factory.registerOutputFormat("Native", [](
factory.registerOutputFormatProcessor("Native", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback,
const RowOutputFormatParams &,
const FormatSettings &)
{
return std::make_shared<NativeBlockOutputStream>(buf, 0, sample);
return std::make_shared<NativeOutputFormat>(buf, sample);
});
}

View File

@ -21,8 +21,6 @@ void registerFileSegmentationEngineJSONAsString(FormatFactory & factory);
void registerInputFormatNative(FormatFactory & factory);
void registerOutputFormatNative(FormatFactory & factory);
void registerInputFormatProcessorNative(FormatFactory & factory);
void registerOutputFormatProcessorNative(FormatFactory & factory);
void registerInputFormatProcessorRowBinary(FormatFactory & factory);
void registerOutputFormatProcessorRowBinary(FormatFactory & factory);
void registerInputFormatProcessorTabSeparated(FormatFactory & factory);
@ -96,8 +94,6 @@ void registerFormats()
registerInputFormatNative(factory);
registerOutputFormatNative(factory);
registerInputFormatProcessorNative(factory);
registerOutputFormatProcessorNative(factory);
registerInputFormatProcessorRowBinary(factory);
registerOutputFormatProcessorRowBinary(factory);
registerInputFormatProcessorTabSeparated(factory);

View File

@ -12,18 +12,9 @@ template<typename Derived, typename T, typename ColumnT>
class FunctionConstantBase : public IFunction
{
public:
/// For server-level constants (uptime(), version(), etc)
explicit FunctionConstantBase(ContextPtr context, T && constant_value_)
: is_distributed(context->isDistributed())
, constant_value(std::forward<T>(constant_value_))
{
}
/// For real constants (pi(), e(), etc)
explicit FunctionConstantBase(const T & constant_value_)
: is_distributed(false)
, constant_value(constant_value_)
template <typename U>
explicit FunctionConstantBase(U && constant_value_, bool is_distributed_ = false)
: constant_value(std::forward<U>(constant_value_)), is_distributed(is_distributed_)
{
}
@ -56,8 +47,8 @@ public:
}
private:
bool is_distributed;
const T constant_value;
bool is_distributed;
};
}

View File

@ -81,6 +81,7 @@ void registerFunctionQueryID(FunctionFactory & factory);
void registerFunctionInitialQueryID(FunctionFactory & factory);
void registerFunctionServerUUID(FunctionFactory &);
void registerFunctionZooKeeperSessionUptime(FunctionFactory &);
void registerFunctionGetOSKernelVersion(FunctionFactory &);
#if USE_ICU
void registerFunctionConvertCharset(FunctionFactory &);
@ -162,6 +163,7 @@ void registerFunctionsMiscellaneous(FunctionFactory & factory)
registerFunctionInitialQueryID(factory);
registerFunctionServerUUID(factory);
registerFunctionZooKeeperSessionUptime(factory);
registerFunctionGetOSKernelVersion(factory);
#if USE_ICU
registerFunctionConvertCharset(factory);

View File

@ -7,6 +7,10 @@
#include <Common/DNSResolver.h>
#include <base/DateLUT.h>
#if defined(OS_LINUX)
# include <Poco/Environment.h>
#endif
#if !defined(ARCADIA_BUILD)
# include <Common/config_version.h>
#endif
@ -24,7 +28,7 @@ namespace
public:
static constexpr auto name = "buildId";
static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionBuildId>(context); }
explicit FunctionBuildId(ContextPtr context) : FunctionConstantBase(context, SymbolIndex::instance()->getBuildIDHex()) {}
explicit FunctionBuildId(ContextPtr context) : FunctionConstantBase(SymbolIndex::instance()->getBuildIDHex(), context->isDistributed()) {}
};
#endif
@ -35,7 +39,7 @@ namespace
public:
static constexpr auto name = "hostName";
static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionHostName>(context); }
explicit FunctionHostName(ContextPtr context) : FunctionConstantBase(context, DNSResolver::instance().getHostName()) {}
explicit FunctionHostName(ContextPtr context) : FunctionConstantBase(DNSResolver::instance().getHostName(), context->isDistributed()) {}
};
@ -44,7 +48,7 @@ namespace
public:
static constexpr auto name = "serverUUID";
static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionServerUUID>(context); }
explicit FunctionServerUUID(ContextPtr context) : FunctionConstantBase(context, ServerUUID::get()) {}
explicit FunctionServerUUID(ContextPtr context) : FunctionConstantBase(ServerUUID::get(), context->isDistributed()) {}
};
@ -53,7 +57,7 @@ namespace
public:
static constexpr auto name = "tcpPort";
static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionTcpPort>(context); }
explicit FunctionTcpPort(ContextPtr context) : FunctionConstantBase(context, context->getTCPPort()) {}
explicit FunctionTcpPort(ContextPtr context) : FunctionConstantBase(context->getTCPPort(), context->isDistributed()) {}
};
@ -63,7 +67,7 @@ namespace
public:
static constexpr auto name = "timezone";
static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionTimezone>(context); }
explicit FunctionTimezone(ContextPtr context) : FunctionConstantBase(context, String{DateLUT::instance().getTimeZone()}) {}
explicit FunctionTimezone(ContextPtr context) : FunctionConstantBase(String{DateLUT::instance().getTimeZone()}, context->isDistributed()) {}
};
@ -73,7 +77,7 @@ namespace
public:
static constexpr auto name = "uptime";
static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionUptime>(context); }
explicit FunctionUptime(ContextPtr context) : FunctionConstantBase(context, context->getUptimeSeconds()) {}
explicit FunctionUptime(ContextPtr context) : FunctionConstantBase(context->getUptimeSeconds(), context->isDistributed()) {}
};
@ -83,16 +87,30 @@ namespace
public:
static constexpr auto name = "version";
static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionVersion>(context); }
explicit FunctionVersion(ContextPtr context) : FunctionConstantBase(context, VERSION_STRING) {}
explicit FunctionVersion(ContextPtr context) : FunctionConstantBase(VERSION_STRING, context->isDistributed()) {}
};
class FunctionZooKeeperSessionUptime : public FunctionConstantBase<FunctionZooKeeperSessionUptime, UInt32, DataTypeUInt32>
{
public:
static constexpr auto name = "zookeeperSessionUptime";
explicit FunctionZooKeeperSessionUptime(ContextPtr context) : FunctionConstantBase(context, context->getZooKeeperSessionUptime()) {}
explicit FunctionZooKeeperSessionUptime(ContextPtr context)
: FunctionConstantBase(context->getZooKeeperSessionUptime(), context->isDistributed())
{
}
static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionZooKeeperSessionUptime>(context); }
};
#if defined(OS_LINUX)
class FunctionGetOSKernelVersion : public FunctionConstantBase<FunctionGetOSKernelVersion, String, DataTypeString>
{
public:
static constexpr auto name = "getOSKernelVersion";
explicit FunctionGetOSKernelVersion(ContextPtr context) : FunctionConstantBase(Poco::Environment::osName() + " " + Poco::Environment::osVersion(), context->isDistributed()) {}
static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionGetOSKernelVersion>(context); }
};
#endif
}
@ -140,5 +158,14 @@ void registerFunctionZooKeeperSessionUptime(FunctionFactory & factory)
factory.registerFunction<FunctionZooKeeperSessionUptime>();
}
void registerFunctionGetOSKernelVersion([[maybe_unused]] FunctionFactory & factory)
{
#if defined(OS_LINUX)
factory.registerFunction<FunctionGetOSKernelVersion>();
#endif
}
}

View File

@ -9,7 +9,7 @@
#include <DataTypes/DataTypeLowCardinality.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnTuple.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/NativeWriter.h>
#include <DataStreams/materializeBlock.h>
#include <IO/WriteBufferFromFile.h>
#include <Compression/CompressedWriteBuffer.h>
@ -1068,7 +1068,7 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, co
const std::string & path = file->path();
WriteBufferFromFile file_buf(path);
CompressedWriteBuffer compressed_buf(file_buf);
NativeBlockOutputStream block_out(compressed_buf, DBMS_TCP_PROTOCOL_VERSION, getHeader(false));
NativeWriter block_out(compressed_buf, DBMS_TCP_PROTOCOL_VERSION, getHeader(false));
LOG_DEBUG(log, "Writing part of aggregation data into temporary file {}.", path);
ProfileEvents::increment(ProfileEvents::ExternalAggregationWritePart);
@ -1193,7 +1193,7 @@ template <typename Method>
void Aggregator::writeToTemporaryFileImpl(
AggregatedDataVariants & data_variants,
Method & method,
IBlockOutputStream & out) const
NativeWriter & out) const
{
size_t max_temporary_block_size_rows = 0;
size_t max_temporary_block_size_bytes = 0;

View File

@ -853,6 +853,7 @@ using ManyAggregatedDataVariants = std::vector<AggregatedDataVariantsPtr>;
using ManyAggregatedDataVariantsPtr = std::shared_ptr<ManyAggregatedDataVariants>;
class CompiledAggregateFunctionsHolder;
class NativeWriter;
/** How are "total" values calculated with WITH TOTALS?
* (For more details, see TotalsHavingTransform.)
@ -1150,7 +1151,7 @@ private:
void writeToTemporaryFileImpl(
AggregatedDataVariants & data_variants,
Method & method,
IBlockOutputStream & out) const;
NativeWriter & out) const;
/// Merge NULL key data from hash table `src` into `dst`.
template <typename Method, typename Table>

View File

@ -35,8 +35,9 @@ public:
enum class HTTPMethod : uint8_t
{
UNKNOWN = 0,
GET = 1,
POST = 2,
GET = 1,
POST = 2,
OPTIONS = 3
};
enum class QueryKind : uint8_t

View File

@ -2,7 +2,7 @@
#include <Access/AccessFlags.h>
#include <Columns/ColumnNullable.h>
#include <DataStreams/PushingToViewsBlockOutputStream.h>
#include <Processors/Transforms/buildPushingToViewsChain.h>
#include <DataStreams/SquashingBlockOutputStream.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypeNullable.h>

View File

@ -65,6 +65,7 @@
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/FilterTransform.h>
@ -1815,8 +1816,8 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
Block block_with_count{
{std::move(column), std::make_shared<DataTypeAggregateFunction>(func, argument_types, desc.parameters), desc.column_name}};
auto istream = std::make_shared<OneBlockInputStream>(block_with_count);
auto prepared_count = std::make_unique<ReadFromPreparedSource>(Pipe(std::make_shared<SourceFromInputStream>(istream)), context);
auto source = std::make_shared<SourceFromSingleChunk>(block_with_count);
auto prepared_count = std::make_unique<ReadFromPreparedSource>(Pipe(std::move(source)), context);
prepared_count->setStepDescription("Optimized trivial count");
query_plan.addStep(std::move(prepared_count));
from_stage = QueryProcessingStage::WithMergeableState;

View File

@ -214,7 +214,7 @@ bool isStorageTouchedByMutations(
ASTPtr select_query = prepareQueryAffectedAST(commands, storage, context_copy);
/// Interpreter must be alive, when we use result of execute() method.
/// For some reason it may copy context and and give it into ExpressionBlockInputStream
/// For some reason it may copy context and and give it into ExpressionTransform
/// after that we will use context from destroyed stack frame in our stream.
InterpreterSelectQuery interpreter(select_query, context_copy, storage, metadata_snapshot, SelectQueryOptions().ignoreLimits());
auto io = interpreter.execute();
@ -932,7 +932,7 @@ void MutationsInterpreter::validate()
auto pipeline = addStreamsForLaterStages(stages, plan);
}
BlockInputStreamPtr MutationsInterpreter::execute()
QueryPipeline MutationsInterpreter::execute()
{
if (!can_execute)
throw Exception("Cannot execute mutations interpreter because can_execute flag set to false", ErrorCodes::LOGICAL_ERROR);
@ -956,12 +956,11 @@ BlockInputStreamPtr MutationsInterpreter::execute()
}
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
BlockInputStreamPtr result_stream = std::make_shared<PipelineExecutingBlockInputStream>(std::move(pipeline));
if (!updated_header)
updated_header = std::make_unique<Block>(result_stream->getHeader());
updated_header = std::make_unique<Block>(pipeline.getHeader());
return result_stream;
return pipeline;
}
Block MutationsInterpreter::getUpdatedHeader() const

View File

@ -50,7 +50,7 @@ public:
size_t evaluateCommandsSize();
/// The resulting stream will return blocks containing only changed columns and columns, that we need to recalculate indices.
BlockInputStreamPtr execute();
QueryPipeline execute();
/// Only changed columns.
Block getUpdatedHeader() const;

View File

@ -120,7 +120,7 @@ static NamesAndTypesList getColumnsList(const ASTExpressionList * columns_defini
auto * literal = child->as<ASTLiteral>();
new_child->arguments = std::make_shared<ASTExpressionList>();
new_child->arguments->children.push_back(std::make_shared<ASTLiteral>(literal->value.get<String>()));
new_child->arguments->children.push_back(std::make_shared<ASTLiteral>(literal->value.safeGet<String>()));
new_child->arguments->children.push_back(std::make_shared<ASTLiteral>(Int16(++i)));
child = new_child;
}

View File

@ -203,7 +203,6 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as
if (query_context->hasTraceCollector())
{
/// Set up memory profiling
thread_group->memory_tracker.setOrRaiseProfilerLimit(settings.memory_profiler_step);
thread_group->memory_tracker.setProfilerStep(settings.memory_profiler_step);
thread_group->memory_tracker.setSampleProbability(settings.memory_profiler_sample_probability);
}

View File

@ -527,7 +527,7 @@ void SystemLog<LogElement>::prepareTable()
auto alias_columns = LogElement::getNamesAndAliases();
auto current_query = InterpreterCreateQuery::formatColumns(ordinary_columns, alias_columns);
if (old_query->getTreeHash() != current_query->getTreeHash())
if (serializeAST(*old_query) != serializeAST(*current_query))
{
/// Rename the existing table.
int suffix = 0;

Some files were not shown because too many files have changed in this diff Show More