Merge remote-tracking branch 'upstream/master' into fix-logical-error-executable

This commit is contained in:
Nikolay Degterinsky 2023-04-13 04:58:32 +00:00
commit 1be35ff821
216 changed files with 997 additions and 633 deletions

View File

@ -155,13 +155,13 @@ struct common_type<wide::integer<Bits, Signed>, Arithmetic>
std::is_floating_point_v<Arithmetic>, std::is_floating_point_v<Arithmetic>,
Arithmetic, Arithmetic,
std::conditional_t< std::conditional_t<
sizeof(Arithmetic) < Bits * sizeof(long), sizeof(Arithmetic) * 8 < Bits,
wide::integer<Bits, Signed>, wide::integer<Bits, Signed>,
std::conditional_t< std::conditional_t<
Bits * sizeof(long) < sizeof(Arithmetic), Bits < sizeof(Arithmetic) * 8,
Arithmetic, Arithmetic,
std::conditional_t< std::conditional_t<
Bits * sizeof(long) == sizeof(Arithmetic) && (std::is_same_v<Signed, signed> || std::is_signed_v<Arithmetic>), Bits == sizeof(Arithmetic) * 8 && (std::is_same_v<Signed, signed> || std::is_signed_v<Arithmetic>),
Arithmetic, Arithmetic,
wide::integer<Bits, Signed>>>>>; wide::integer<Bits, Signed>>>>>;
}; };

2
contrib/cctz vendored

@ -1 +1 @@
Subproject commit 7c78edd52b4d65acc103c2f195818ffcabe6fe0d Subproject commit 5e05432420f9692418e2e12aff09859e420b14a2

2
contrib/krb5 vendored

@ -1 +1 @@
Subproject commit 9453aec0d50e5aff9b189051611b321b40935d02 Subproject commit b56ce6ba690e1f320df1a64afa34980c3e462617

View File

@ -78,7 +78,7 @@ The supported formats are:
| [Null](#null) | ✗ | ✔ | | [Null](#null) | ✗ | ✔ |
| [XML](#xml) | ✗ | ✔ | | [XML](#xml) | ✗ | ✔ |
| [CapnProto](#capnproto) | ✔ | ✔ | | [CapnProto](#capnproto) | ✔ | ✔ |
| [LineAsString](#lineasstring) | ✔ | | | [LineAsString](#lineasstring) | ✔ | |
| [Regexp](#data-format-regexp) | ✔ | ✗ | | [Regexp](#data-format-regexp) | ✔ | ✗ |
| [RawBLOB](#rawblob) | ✔ | ✔ | | [RawBLOB](#rawblob) | ✔ | ✔ |
| [MsgPack](#msgpack) | ✔ | ✔ | | [MsgPack](#msgpack) | ✔ | ✔ |
@ -1877,6 +1877,13 @@ Column names must:
Output Avro file compression and sync interval can be configured with [output_format_avro_codec](/docs/en/operations/settings/settings-formats.md/#output_format_avro_codec) and [output_format_avro_sync_interval](/docs/en/operations/settings/settings-formats.md/#output_format_avro_sync_interval) respectively. Output Avro file compression and sync interval can be configured with [output_format_avro_codec](/docs/en/operations/settings/settings-formats.md/#output_format_avro_codec) and [output_format_avro_sync_interval](/docs/en/operations/settings/settings-formats.md/#output_format_avro_sync_interval) respectively.
### Example Data {#example-data-avro}
Using the ClickHouse [DESCRIBE](/docs/en/sql-reference/statements/describe-table) function, you can quickly view the inferred format of an Avro file like the following example. This example includes the URL of a publicly accessible Avro file in the ClickHouse S3 public bucket:
``` DESCRIBE url('https://clickhouse-public-datasets.s3.eu-central-1.amazonaws.com/hits.avro','Avro');
```
## AvroConfluent {#data-format-avro-confluent} ## AvroConfluent {#data-format-avro-confluent}
AvroConfluent supports decoding single-object Avro messages commonly used with [Kafka](https://kafka.apache.org/) and [Confluent Schema Registry](https://docs.confluent.io/current/schema-registry/index.html). AvroConfluent supports decoding single-object Avro messages commonly used with [Kafka](https://kafka.apache.org/) and [Confluent Schema Registry](https://docs.confluent.io/current/schema-registry/index.html).

View File

@ -50,6 +50,7 @@ last_queue_update: 2021-10-12 14:50:08
absolute_delay: 99 absolute_delay: 99
total_replicas: 5 total_replicas: 5
active_replicas: 5 active_replicas: 5
lost_part_count: 0
last_queue_update_exception: last_queue_update_exception:
zookeeper_exception: zookeeper_exception:
replica_is_active: {'r1':1,'r2':1} replica_is_active: {'r1':1,'r2':1}
@ -90,6 +91,7 @@ The next 4 columns have a non-zero value only where there is an active session w
- `absolute_delay` (`UInt64`) - How big lag in seconds the current replica has. - `absolute_delay` (`UInt64`) - How big lag in seconds the current replica has.
- `total_replicas` (`UInt8`) - The total number of known replicas of this table. - `total_replicas` (`UInt8`) - The total number of known replicas of this table.
- `active_replicas` (`UInt8`) - The number of replicas of this table that have a session in ClickHouse Keeper (i.e., the number of functioning replicas). - `active_replicas` (`UInt8`) - The number of replicas of this table that have a session in ClickHouse Keeper (i.e., the number of functioning replicas).
- `lost_part_count` (`UInt64`) - The number of data parts lost in the table by all replicas in total since table creation. Value is persisted in ClickHouse Keeper and can only increase.
- `last_queue_update_exception` (`String`) - When the queue contains broken entries. Especially important when ClickHouse breaks backward compatibility between versions and log entries written by newer versions aren't parseable by old versions. - `last_queue_update_exception` (`String`) - When the queue contains broken entries. Especially important when ClickHouse breaks backward compatibility between versions and log entries written by newer versions aren't parseable by old versions.
- `zookeeper_exception` (`String`) - The last exception message, got if the error happened when fetching the info from ClickHouse Keeper. - `zookeeper_exception` (`String`) - The last exception message, got if the error happened when fetching the info from ClickHouse Keeper.
- `replica_is_active` ([Map(String, UInt8)](../../sql-reference/data-types/map.md)) — Map between replica name and is replica active. - `replica_is_active` ([Map(String, UInt8)](../../sql-reference/data-types/map.md)) — Map between replica name and is replica active.

View File

@ -11,8 +11,16 @@ Columns:
- `volume_name` ([String](../../sql-reference/data-types/string.md)) — Volume name defined in the storage policy. - `volume_name` ([String](../../sql-reference/data-types/string.md)) — Volume name defined in the storage policy.
- `volume_priority` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Volume order number in the configuration, the data fills the volumes according this priority, i.e. data during inserts and merges is written to volumes with a lower priority (taking into account other rules: TTL, `max_data_part_size`, `move_factor`). - `volume_priority` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Volume order number in the configuration, the data fills the volumes according this priority, i.e. data during inserts and merges is written to volumes with a lower priority (taking into account other rules: TTL, `max_data_part_size`, `move_factor`).
- `disks` ([Array(String)](../../sql-reference/data-types/array.md)) — Disk names, defined in the storage policy. - `disks` ([Array(String)](../../sql-reference/data-types/array.md)) — Disk names, defined in the storage policy.
- `volume_type` ([Enum8](../../sql-reference/data-types/enum.md)) — Type of volume. Can have one of the following values:
- `JBOD`
- `SINGLE_DISK`
- `UNKNOWN`
- `max_data_part_size` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Maximum size of a data part that can be stored on volume disks (0 — no limit). - `max_data_part_size` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Maximum size of a data part that can be stored on volume disks (0 — no limit).
- `move_factor` ([Float64](../../sql-reference/data-types/float.md)) — Ratio of free disk space. When the ratio exceeds the value of configuration parameter, ClickHouse start to move data to the next volume in order. - `move_factor` ([Float64](../../sql-reference/data-types/float.md)) — Ratio of free disk space. When the ratio exceeds the value of configuration parameter, ClickHouse start to move data to the next volume in order.
- `prefer_not_to_merge` ([UInt8](../../sql-reference/data-types/int-uint.md)) — Value of the `prefer_not_to_merge` setting. When this setting is enabled, merging data on this volume is not allowed. This allows controlling how ClickHouse works with slow disks. - `prefer_not_to_merge` ([UInt8](../../sql-reference/data-types/int-uint.md)) — Value of the `prefer_not_to_merge` setting. When this setting is enabled, merging data on this volume is not allowed. This allows controlling how ClickHouse works with slow disks.
- `perform_ttl_move_on_insert` ([UInt8](../../sql-reference/data-types/int-uint.md)) — Value of the `perform_ttl_move_on_insert` setting. — Disables TTL move on data part INSERT. By default if we insert a data part that already expired by the TTL move rule it immediately goes to a volume/disk declared in move rule. This can significantly slowdown insert in case if destination volume/disk is slow (e.g. S3).
- `load_balancing` ([Enum8](../../sql-reference/data-types/enum.md)) — Policy for disk balancing. Can have one of the following values:
- `ROUND_ROBIN`
- `LEAST_USED`
If the storage policy contains more then one volume, then information for each volume is stored in the individual row of the table. If the storage policy contains more then one volume, then information for each volume is stored in the individual row of the table.

View File

@ -14,7 +14,7 @@ The `INSERT` query uses both parsers:
INSERT INTO t VALUES (1, 'Hello, world'), (2, 'abc'), (3, 'def') INSERT INTO t VALUES (1, 'Hello, world'), (2, 'abc'), (3, 'def')
``` ```
The `INSERT INTO t VALUES` fragment is parsed by the full parser, and the data `(1, 'Hello, world'), (2, 'abc'), (3, 'def')` is parsed by the fast stream parser. You can also turn on the full parser for the data by using the [input_format_values_interpret_expressions](../operations/settings/settings-formats.md#settings-input_format_values_interpret_expressions) setting. When `input_format_values_interpret_expressions = 1`, ClickHouse first tries to parse values with the fast stream parser. If it fails, ClickHouse tries to use the full parser for the data, treating it like an SQL [expression](#syntax-expressions). The `INSERT INTO t VALUES` fragment is parsed by the full parser, and the data `(1, 'Hello, world'), (2, 'abc'), (3, 'def')` is parsed by the fast stream parser. You can also turn on the full parser for the data by using the [input_format_values_interpret_expressions](../operations/settings/settings-formats.md#input_format_values_interpret_expressions) setting. When `input_format_values_interpret_expressions = 1`, ClickHouse first tries to parse values with the fast stream parser. If it fails, ClickHouse tries to use the full parser for the data, treating it like an SQL [expression](#expressions).
Data can have any format. When a query is received, the server calculates no more than [max_query_size](../operations/settings/settings.md#settings-max_query_size) bytes of the request in RAM (by default, 1 MB), and the rest is stream parsed. Data can have any format. When a query is received, the server calculates no more than [max_query_size](../operations/settings/settings.md#settings-max_query_size) bytes of the request in RAM (by default, 1 MB), and the rest is stream parsed.
It allows for avoiding issues with large `INSERT` queries. It allows for avoiding issues with large `INSERT` queries.
@ -45,7 +45,7 @@ You can check whether a data type name is case-sensitive in the [system.data_typ
In contrast to standard SQL, all other keywords (including functions names) are **case-sensitive**. In contrast to standard SQL, all other keywords (including functions names) are **case-sensitive**.
Keywords are not reserved; they are treated as such only in the corresponding context. If you use [identifiers](#syntax-identifiers) with the same name as the keywords, enclose them into double-quotes or backticks. For example, the query `SELECT "FROM" FROM table_name` is valid if the table `table_name` has column with the name `"FROM"`. Keywords are not reserved; they are treated as such only in the corresponding context. If you use [identifiers](#identifiers) with the same name as the keywords, enclose them into double-quotes or backticks. For example, the query `SELECT "FROM" FROM table_name` is valid if the table `table_name` has column with the name `"FROM"`.
## Identifiers ## Identifiers
@ -54,7 +54,7 @@ Identifiers are:
- Cluster, database, table, partition, and column names. - Cluster, database, table, partition, and column names.
- Functions. - Functions.
- Data types. - Data types.
- [Expression aliases](#syntax-expression_aliases). - [Expression aliases](#expression_aliases).
Identifiers can be quoted or non-quoted. The latter is preferred. Identifiers can be quoted or non-quoted. The latter is preferred.
@ -108,7 +108,7 @@ Depending on the data format (input or output), `NULL` may have a different repr
There are many nuances to processing `NULL`. For example, if at least one of the arguments of a comparison operation is `NULL`, the result of this operation is also `NULL`. The same is true for multiplication, addition, and other operations. For more information, read the documentation for each operation. There are many nuances to processing `NULL`. For example, if at least one of the arguments of a comparison operation is `NULL`, the result of this operation is also `NULL`. The same is true for multiplication, addition, and other operations. For more information, read the documentation for each operation.
In queries, you can check `NULL` using the [IS NULL](../sql-reference/operators/index.md#operator-is-null) and [IS NOT NULL](../sql-reference/operators/index.md) operators and the related functions `isNull` and `isNotNull`. In queries, you can check `NULL` using the [IS NULL](../sql-reference/operators/index.md#is-null) and [IS NOT NULL](../sql-reference/operators/index.md#is-not-null) operators and the related functions `isNull` and `isNotNull`.
### Heredoc ### Heredoc
@ -149,7 +149,7 @@ For example, the following SQL defines parameters named `a`, `b`, `c` and `d` -
SET param_a = 13; SET param_a = 13;
SET param_b = 'str'; SET param_b = 'str';
SET param_c = '2022-08-04 18:30:53'; SET param_c = '2022-08-04 18:30:53';
SET param_d = {'10': [11, 12], '13': [14, 15]}'; SET param_d = {'10': [11, 12], '13': [14, 15]};
SELECT SELECT
{a: UInt32}, {a: UInt32},
@ -166,7 +166,7 @@ Result:
If you are using `clickhouse-client`, the parameters are specified as `--param_name=value`. For example, the following parameter has the name `message` and it is retrieved as a `String`: If you are using `clickhouse-client`, the parameters are specified as `--param_name=value`. For example, the following parameter has the name `message` and it is retrieved as a `String`:
```sql ```bash
clickhouse-client --param_message='hello' --query="SELECT {message: String}" clickhouse-client --param_message='hello' --query="SELECT {message: String}"
``` ```
@ -190,7 +190,7 @@ Query parameters are not general text substitutions which can be used in arbitra
## Functions ## Functions
Function calls are written like an identifier with a list of arguments (possibly empty) in round brackets. In contrast to standard SQL, the brackets are required, even for an empty argument list. Example: `now()`. Function calls are written like an identifier with a list of arguments (possibly empty) in round brackets. In contrast to standard SQL, the brackets are required, even for an empty argument list. Example: `now()`.
There are regular and aggregate functions (see the section “Aggregate functions”). Some aggregate functions can contain two lists of arguments in brackets. Example: `quantile (0.9) (x)`. These aggregate functions are called “parametric” functions, and the arguments in the first list are called “parameters”. The syntax of aggregate functions without parameters is the same as for regular functions. There are regular and aggregate functions (see the section [Aggregate functions](/docs/en/sql-reference/aggregate-functions/index.md)). Some aggregate functions can contain two lists of arguments in brackets. Example: `quantile (0.9) (x)`. These aggregate functions are called “parametric” functions, and the arguments in the first list are called “parameters”. The syntax of aggregate functions without parameters is the same as for regular functions.
## Operators ## Operators
@ -199,7 +199,7 @@ For example, the expression `1 + 2 * 3 + 4` is transformed to `plus(plus(1, mult
## Data Types and Database Table Engines ## Data Types and Database Table Engines
Data types and table engines in the `CREATE` query are written the same way as identifiers or functions. In other words, they may or may not contain an argument list in brackets. For more information, see the sections “Data types,” “Table engines,” and “CREATE”. Data types and table engines in the `CREATE` query are written the same way as identifiers or functions. In other words, they may or may not contain an argument list in brackets. For more information, see the sections [Data types](/docs/en/sql-reference/data-types/index.md), [Table engines](/docs/en/engines/table-engines/index.md), and [CREATE](/docs/en/sql-reference/statements/create/index.md).
## Expression Aliases ## Expression Aliases
@ -211,17 +211,17 @@ expr AS alias
- `AS` — The keyword for defining aliases. You can define the alias for a table name or a column name in a `SELECT` clause without using the `AS` keyword. - `AS` — The keyword for defining aliases. You can define the alias for a table name or a column name in a `SELECT` clause without using the `AS` keyword.
For example, `SELECT table_name_alias.column_name FROM table_name table_name_alias`. For example, `SELECT table_name_alias.column_name FROM table_name table_name_alias`.
In the [CAST](./functions/type-conversion-functions.md#type_conversion_function-cast) function, the `AS` keyword has another meaning. See the description of the function. In the [CAST](./functions/type-conversion-functions.md#castx-t) function, the `AS` keyword has another meaning. See the description of the function.
- `expr` — Any expression supported by ClickHouse. - `expr` — Any expression supported by ClickHouse.
For example, `SELECT column_name * 2 AS double FROM some_table`. For example, `SELECT column_name * 2 AS double FROM some_table`.
- `alias` — Name for `expr`. Aliases should comply with the [identifiers](#syntax-identifiers) syntax. - `alias` — Name for `expr`. Aliases should comply with the [identifiers](#identifiers) syntax.
For example, `SELECT "table t".column_name FROM table_name AS "table t"`. For example, `SELECT "table t".column_name FROM table_name AS "table t"`.
### Notes on Usage ### Notes on Usage
@ -254,11 +254,11 @@ Received exception from server (version 18.14.17):
Code: 184. DB::Exception: Received from localhost:9000, 127.0.0.1. DB::Exception: Aggregate function sum(b) is found inside another aggregate function in query. Code: 184. DB::Exception: Received from localhost:9000, 127.0.0.1. DB::Exception: Aggregate function sum(b) is found inside another aggregate function in query.
``` ```
In this example, we declared table `t` with column `b`. Then, when selecting data, we defined the `sum(b) AS b` alias. As aliases are global, ClickHouse substituted the literal `b` in the expression `argMax(a, b)` with the expression `sum(b)`. This substitution caused the exception. You can change this default behavior by setting [prefer_column_name_to_alias](../operations/settings/settings.md#prefer_column_name_to_alias) to `1`. In this example, we declared table `t` with column `b`. Then, when selecting data, we defined the `sum(b) AS b` alias. As aliases are global, ClickHouse substituted the literal `b` in the expression `argMax(a, b)` with the expression `sum(b)`. This substitution caused the exception. You can change this default behavior by setting [prefer_column_name_to_alias](../operations/settings/settings.md#prefer-column-name-to-alias) to `1`.
## Asterisk ## Asterisk
In a `SELECT` query, an asterisk can replace the expression. For more information, see the section “SELECT”. In a `SELECT` query, an asterisk can replace the expression. For more information, see the section [SELECT](/docs/en/sql-reference/statements/select/index.md#asterisk).
## Expressions ## Expressions

View File

@ -20,7 +20,7 @@ A key advantage between ordinary UDF functions and the `executable` table functi
The `executable` table function requires three parameters and accepts an optional list of input queries: The `executable` table function requires three parameters and accepts an optional list of input queries:
```sql ```sql
executable(script_name, format, structure, [input_query...]) executable(script_name, format, structure, [input_query...] [,SETTINGS ...])
``` ```
- `script_name`: the file name of the script. saved in the `user_scripts` folder (the default folder of the `user_scripts_path` setting) - `script_name`: the file name of the script. saved in the `user_scripts` folder (the default folder of the `user_scripts_path` setting)
@ -83,6 +83,15 @@ The response looks like:
└────┴────────────┘ └────┴────────────┘
``` ```
## Settings
- `send_chunk_header` - controls whether to send row count before sending a chunk of data to process. Default value is `false`.
- `pool_size` — Size of pool. If 0 is specified as `pool_size` then there is no pool size restrictions. Default value is `16`.
- `max_command_execution_time` — Maximum executable script command execution time for processing block of data. Specified in seconds. Default value is 10.
- `command_termination_timeout` — executable script should contain main read-write loop. After table function is destroyed, pipe is closed, and executable file will have `command_termination_timeout` seconds to shutdown, before ClickHouse will send SIGTERM signal to child process. Specified in seconds. Default value is 10.
- `command_read_timeout` - timeout for reading data from command stdout in milliseconds. Default value 10000.
- `command_write_timeout` - timeout for writing data to command stdin in milliseconds. Default value 10000.
## Passing Query Results to a Script ## Passing Query Results to a Script
Be sure to check out the example in the `Executable` table engine on [how to pass query results to a script](../../engines/table-engines/special/executable.md#passing-query-results-to-a-script). Here is how you execute the same script in that example using the `executable` table function: Be sure to check out the example in the `Executable` table engine on [how to pass query results to a script](../../engines/table-engines/special/executable.md#passing-query-results-to-a-script). Here is how you execute the same script in that example using the `executable` table function:

View File

@ -375,15 +375,22 @@ int mainEntryClickHouseInstall(int argc, char ** argv)
try try
{ {
ReadBufferFromFile in(binary_self_path.string()); String source = binary_self_path.string();
WriteBufferFromFile out(main_bin_tmp_path.string()); String destination = main_bin_tmp_path.string();
copyData(in, out);
out.sync();
if (0 != fchmod(out.getFD(), S_IRUSR | S_IRGRP | S_IROTH | S_IXUSR | S_IXGRP | S_IXOTH)) /// Try to make a hard link first, as an optimization.
/// It is possible if the source and the destination are on the same filesystems.
if (0 != link(source.c_str(), destination.c_str()))
{
ReadBufferFromFile in(binary_self_path.string());
WriteBufferFromFile out(main_bin_tmp_path.string());
copyData(in, out);
out.sync();
out.finalize();
}
if (0 != chmod(destination.c_str(), S_IRUSR | S_IRGRP | S_IROTH | S_IXUSR | S_IXGRP | S_IXOTH))
throwFromErrno(fmt::format("Cannot chmod {}", main_bin_tmp_path.string()), ErrorCodes::SYSTEM_ERROR); throwFromErrno(fmt::format("Cannot chmod {}", main_bin_tmp_path.string()), ErrorCodes::SYSTEM_ERROR);
out.finalize();
} }
catch (const Exception & e) catch (const Exception & e)
{ {

View File

@ -1,6 +1,5 @@
#pragma once #pragma once
#include <Common/logger_useful.h>
#include <base/sort.h> #include <base/sort.h>
#include <DataTypes/DataTypesNumber.h> #include <DataTypes/DataTypesNumber.h>

View File

@ -11,7 +11,6 @@
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <Columns/ColumnString.h> #include <Columns/ColumnString.h>
#include <Common/PODArray.h> #include <Common/PODArray.h>
#include <Common/logger_useful.h>
#include <IO/ReadBufferFromString.h> #include <IO/ReadBufferFromString.h>
#include <Common/HashTable/HashMap.h> #include <Common/HashTable/HashMap.h>
#include <Columns/IColumn.h> #include <Columns/IColumn.h>

View File

@ -18,7 +18,6 @@
#include <AggregateFunctions/IAggregateFunction.h> #include <AggregateFunctions/IAggregateFunction.h>
#include <AggregateFunctions/FactoryHelpers.h> #include <AggregateFunctions/FactoryHelpers.h>
#include <map> #include <map>
#include <Common/logger_useful.h>
#include <Common/ClickHouseRevision.h> #include <Common/ClickHouseRevision.h>

View File

@ -5,7 +5,6 @@
#include <Poco/Util/AbstractConfiguration.h> #include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Net/HTTPRequest.h> #include <Poco/Net/HTTPRequest.h>
#include <Common/ShellCommand.h> #include <Common/ShellCommand.h>
#include <Common/logger_useful.h>
namespace DB namespace DB

View File

@ -11,7 +11,6 @@
#include <Poco/Util/AbstractConfiguration.h> #include <Poco/Util/AbstractConfiguration.h>
#include <Common/BridgeProtocolVersion.h> #include <Common/BridgeProtocolVersion.h>
#include <Common/ShellCommand.h> #include <Common/ShellCommand.h>
#include <Common/logger_useful.h>
#include <IO/ConnectionTimeouts.h> #include <IO/ConnectionTimeouts.h>
#include <base/range.h> #include <base/range.h>
#include <BridgeHelper/IBridgeHelper.h> #include <BridgeHelper/IBridgeHelper.h>

View File

@ -22,7 +22,8 @@
#include <Common/StringUtils/StringUtils.h> #include <Common/StringUtils/StringUtils.h>
#include <Common/OpenSSLHelpers.h> #include <Common/OpenSSLHelpers.h>
#include <Common/randomSeed.h> #include <Common/randomSeed.h>
#include "Core/Block.h" #include <Common/logger_useful.h>
#include <Core/Block.h>
#include <Interpreters/ClientInfo.h> #include <Interpreters/ClientInfo.h>
#include <Interpreters/OpenTelemetrySpanLog.h> #include <Interpreters/OpenTelemetrySpanLog.h>
#include <Compression/CompressionFactory.h> #include <Compression/CompressionFactory.h>

View File

@ -1,6 +1,5 @@
#pragma once #pragma once
#include <Common/logger_useful.h>
#include <Poco/Net/StreamSocket.h> #include <Poco/Net/StreamSocket.h>

View File

@ -6,6 +6,7 @@
#include <Processors/Executors/PushingAsyncPipelineExecutor.h> #include <Processors/Executors/PushingAsyncPipelineExecutor.h>
#include <Storages/IStorage.h> #include <Storages/IStorage.h>
#include <Common/ConcurrentBoundedQueue.h> #include <Common/ConcurrentBoundedQueue.h>
#include <Common/CurrentThread.h>
#include <Core/Protocol.h> #include <Core/Protocol.h>

View File

@ -7,6 +7,7 @@
#include <Interpreters/Session.h> #include <Interpreters/Session.h>
#include <Interpreters/ProfileEventsExt.h> #include <Interpreters/ProfileEventsExt.h>
#include <Storages/ColumnsDescription.h> #include <Storages/ColumnsDescription.h>
#include <Common/CurrentThread.h>
namespace DB namespace DB

View File

@ -3,6 +3,7 @@
#include <Common/setThreadName.h> #include <Common/setThreadName.h>
#include <Common/CurrentMetrics.h> #include <Common/CurrentMetrics.h>
#include <Common/filesystemHelpers.h> #include <Common/filesystemHelpers.h>
#include <Common/logger_useful.h>
#include <IO/UncompressedCache.h> #include <IO/UncompressedCache.h>
#include <IO/MMappedFileCache.h> #include <IO/MMappedFileCache.h>
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>

View File

@ -12,7 +12,6 @@
#include <mutex> #include <mutex>
#include <unordered_map> #include <unordered_map>
#include <Common/logger_useful.h>
#include <base/defines.h> #include <base/defines.h>

View File

@ -21,6 +21,7 @@
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Common/getResource.h> #include <Common/getResource.h>
#include <Common/XMLUtils.h> #include <Common/XMLUtils.h>
#include <Common/logger_useful.h>
#include <base/errnoToString.h> #include <base/errnoToString.h>
#include <base/sort.h> #include <base/sort.h>
#include <IO/WriteBufferFromString.h> #include <IO/WriteBufferFromString.h>

View File

@ -16,9 +16,10 @@
#include <Poco/DirectoryIterator.h> #include <Poco/DirectoryIterator.h>
#include <Poco/ConsoleChannel.h> #include <Poco/ConsoleChannel.h>
#include <Poco/Util/AbstractConfiguration.h> #include <Poco/Util/AbstractConfiguration.h>
#include <Common/logger_useful.h>
namespace Poco { class Logger; }
namespace zkutil namespace zkutil
{ {
class ZooKeeperNodeCache; class ZooKeeperNodeCache;

View File

@ -2,11 +2,11 @@
#include "config.h" #include "config.h"
#include <string> #include <Common/ErrorCodes.h>
#include <Common/Exception.h>
#include <base/types.h>
#include <Poco/DOM/Document.h> #include <Poco/DOM/Document.h>
#include "Poco/DOM/AutoPtr.h" #include <Poco/DOM/AutoPtr.h>
#include <Common/logger_useful.h>
#if USE_YAML_CPP #if USE_YAML_CPP

View File

@ -4,6 +4,7 @@
#include "ConfigProcessor.h" #include "ConfigProcessor.h"
#include <filesystem> #include <filesystem>
#include <iostream> #include <iostream>
#include <base/types.h>
namespace fs = std::filesystem; namespace fs = std::filesystem;

View File

@ -90,7 +90,7 @@ void CurrentThread::attachInternalTextLogsQueue(const std::shared_ptr<InternalTe
} }
ThreadGroupStatusPtr CurrentThread::getGroup() ThreadGroupPtr CurrentThread::getGroup()
{ {
if (unlikely(!current_thread)) if (unlikely(!current_thread))
return nullptr; return nullptr;

View File

@ -39,7 +39,7 @@ public:
static ThreadStatus & get(); static ThreadStatus & get();
/// Group to which belongs current thread /// Group to which belongs current thread
static ThreadGroupStatusPtr getGroup(); static ThreadGroupPtr getGroup();
/// A logs queue used by TCPHandler to pass logs to a client /// A logs queue used by TCPHandler to pass logs to a client
static void attachInternalTextLogsQueue(const std::shared_ptr<InternalTextLogsQueue> & logs_queue, static void attachInternalTextLogsQueue(const std::shared_ptr<InternalTextLogsQueue> & logs_queue,
@ -69,9 +69,9 @@ public:
/// You must call one of these methods when create a query child thread: /// You must call one of these methods when create a query child thread:
/// Add current thread to a group associated with the thread group /// Add current thread to a group associated with the thread group
static void attachToGroup(const ThreadGroupStatusPtr & thread_group); static void attachToGroup(const ThreadGroupPtr & thread_group);
/// Is useful for a ThreadPool tasks /// Is useful for a ThreadPool tasks
static void attachToGroupIfDetached(const ThreadGroupStatusPtr & thread_group); static void attachToGroupIfDetached(const ThreadGroupPtr & thread_group);
/// Non-master threads call this method in destructor automatically /// Non-master threads call this method in destructor automatically
static void detachFromGroupIfNotDetached(); static void detachFromGroupIfNotDetached();

View File

@ -3,6 +3,7 @@
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Common/ProfileEvents.h> #include <Common/ProfileEvents.h>
#include <Common/thread_local_rng.h> #include <Common/thread_local_rng.h>
#include <Common/logger_useful.h>
#include <Core/Names.h> #include <Core/Names.h>
#include <base/types.h> #include <base/types.h>
#include <Poco/Net/IPAddress.h> #include <Poco/Net/IPAddress.h>

View File

@ -5,9 +5,10 @@
#include <base/types.h> #include <base/types.h>
#include <Core/Names.h> #include <Core/Names.h>
#include <boost/noncopyable.hpp> #include <boost/noncopyable.hpp>
#include <Common/logger_useful.h>
namespace Poco { class Logger; }
namespace DB namespace DB
{ {

View File

@ -1,7 +1,6 @@
#pragma once #pragma once
#include <Poco/ErrorHandler.h> #include <Poco/ErrorHandler.h>
#include <Common/logger_useful.h>
#include <Common/Exception.h> #include <Common/Exception.h>

View File

@ -1,5 +1,7 @@
#include <Common/FileChecker.h> #include <Common/FileChecker.h>
#include <Common/escapeForFileName.h> #include <Common/escapeForFileName.h>
#include <Common/logger_useful.h>
#include <Common/ErrorCodes.h>
#include <Disks/IDisk.h> #include <Disks/IDisk.h>
#include <IO/WriteBufferFromFile.h> #include <IO/WriteBufferFromFile.h>
#include <IO/ReadBufferFromFile.h> #include <IO/ReadBufferFromFile.h>
@ -25,7 +27,9 @@ FileChecker::FileChecker(const String & file_info_path_) : FileChecker(nullptr,
{ {
} }
FileChecker::FileChecker(DiskPtr disk_, const String & file_info_path_) : disk(std::move(disk_)) FileChecker::FileChecker(DiskPtr disk_, const String & file_info_path_)
: disk(std::move(disk_))
, log(&Poco::Logger::get("FileChecker"))
{ {
setPath(file_info_path_); setPath(file_info_path_);
try try

View File

@ -1,8 +1,10 @@
#pragma once #pragma once
#include <Common/logger_useful.h>
#include <Storages/CheckResults.h> #include <Storages/CheckResults.h>
#include <map>
#include <base/types.h>
namespace Poco { class Logger; }
namespace DB namespace DB
{ {
@ -46,7 +48,7 @@ private:
size_t getRealFileSize(const String & path_) const; size_t getRealFileSize(const String & path_) const;
const DiskPtr disk; const DiskPtr disk;
const Poco::Logger * log = &Poco::Logger::get("FileChecker"); const Poco::Logger * log;
String files_info_path; String files_info_path;
std::map<String, size_t> map; std::map<String, size_t> map;

View File

@ -5,8 +5,6 @@
#include <list> #include <list>
#include <unordered_map> #include <unordered_map>
#include <Common/logger_useful.h>
namespace DB namespace DB
{ {
/// Cache policy LRU evicts entries which are not used for a long time. /// Cache policy LRU evicts entries which are not used for a long time.
@ -174,7 +172,7 @@ private:
auto it = cells.find(key); auto it = cells.find(key);
if (it == cells.end()) if (it == cells.end())
{ {
LOG_ERROR(&Poco::Logger::get("LRUCache"), "LRUCache became inconsistent. There must be a bug in it."); // Queue became inconsistent
abort(); abort();
} }
@ -192,7 +190,7 @@ private:
if (current_size_in_bytes > (1ull << 63)) if (current_size_in_bytes > (1ull << 63))
{ {
LOG_ERROR(&Poco::Logger::get("LRUCache"), "LRUCache became inconsistent. There must be a bug in it."); // Queue became inconsistent
abort(); abort();
} }
} }

View File

@ -1,7 +1,7 @@
#pragma once #pragma once
#include <Common/logger_useful.h>
#include <base/types.h> #include <base/types.h>
#include <Core/Types.h>
#include <boost/core/noncopyable.hpp> #include <boost/core/noncopyable.hpp>
#include <Poco/Logger.h> #include <Poco/Logger.h>
#include <cassert> #include <cassert>

View File

@ -144,12 +144,17 @@ public:
return Entry(*items.back()); return Entry(*items.back());
} }
LOG_INFO(log, "No free connections in pool. Waiting.");
if (timeout < 0) if (timeout < 0)
{
LOG_INFO(log, "No free connections in pool. Waiting undefinitelly.");
available.wait(lock); available.wait(lock);
}
else else
available.wait_for(lock, std::chrono::microseconds(timeout)); {
auto timeout_ms = std::chrono::microseconds(timeout);
LOG_INFO(log, "No free connections in pool. Waiting {} ms.", timeout_ms.count());
available.wait_for(lock, timeout_ms);
}
} }
} }

View File

@ -5,8 +5,6 @@
#include <list> #include <list>
#include <unordered_map> #include <unordered_map>
#include <Common/logger_useful.h>
namespace DB namespace DB
{ {
@ -236,7 +234,7 @@ private:
auto it = cells.find(key); auto it = cells.find(key);
if (it == cells.end()) if (it == cells.end())
{ {
LOG_ERROR(&Poco::Logger::get("SLRUCache"), "SLRUCache became inconsistent. There must be a bug in it."); // Queue became inconsistent
abort(); abort();
} }
@ -264,7 +262,7 @@ private:
if (current_size_in_bytes > (1ull << 63)) if (current_size_in_bytes > (1ull << 63))
{ {
LOG_ERROR(&Poco::Logger::get("SLRUCache"), "SLRUCache became inconsistent. There must be a bug in it."); // Queue became inconsistent
abort(); abort();
} }
} }

View File

@ -23,6 +23,7 @@
#include <boost/algorithm/string/split.hpp> #include <boost/algorithm/string/split.hpp>
#include <base/errnoToString.h> #include <base/errnoToString.h>
#include <Common/logger_useful.h>
namespace ProfileEvents namespace ProfileEvents

View File

@ -2,11 +2,13 @@
#include <base/types.h> #include <base/types.h>
#include <base/getThreadId.h> #include <base/getThreadId.h>
#include <base/defines.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h> #include <Common/ProfileEvents.h>
#include <sys/time.h> #include <sys/time.h>
#include <sys/resource.h> #include <sys/resource.h>
#include <pthread.h> #include <pthread.h>
#include <Common/logger_useful.h> #include <boost/noncopyable.hpp>
#if defined(OS_LINUX) #if defined(OS_LINUX)

View File

@ -2,6 +2,8 @@
#include <Common/ThreadProfileEvents.h> #include <Common/ThreadProfileEvents.h>
#include <Common/QueryProfiler.h> #include <Common/QueryProfiler.h>
#include <Common/ThreadStatus.h> #include <Common/ThreadStatus.h>
#include <Common/CurrentThread.h>
#include <Common/logger_useful.h>
#include <base/errnoToString.h> #include <base/errnoToString.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
@ -61,7 +63,7 @@ static thread_local ThreadStack alt_stack;
static thread_local bool has_alt_stack = false; static thread_local bool has_alt_stack = false;
#endif #endif
ThreadGroupStatus::ThreadGroupStatus() ThreadGroup::ThreadGroup()
: master_thread_id(CurrentThread::get().thread_id) : master_thread_id(CurrentThread::get().thread_id)
{} {}
@ -119,7 +121,7 @@ ThreadStatus::ThreadStatus()
#endif #endif
} }
ThreadGroupStatusPtr ThreadStatus::getThreadGroup() const ThreadGroupPtr ThreadStatus::getThreadGroup() const
{ {
return thread_group; return thread_group;
} }
@ -139,7 +141,7 @@ ContextPtr ThreadStatus::getGlobalContext() const
return global_context.lock(); return global_context.lock();
} }
void ThreadGroupStatus::attachInternalTextLogsQueue(const InternalTextLogsQueuePtr & logs_queue, LogsLevel logs_level) void ThreadGroup::attachInternalTextLogsQueue(const InternalTextLogsQueuePtr & logs_queue, LogsLevel logs_level)
{ {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
shared_data.logs_queue_ptr = logs_queue; shared_data.logs_queue_ptr = logs_queue;

View File

@ -41,7 +41,6 @@ class TaskStatsInfoGetter;
class InternalTextLogsQueue; class InternalTextLogsQueue;
struct ViewRuntimeData; struct ViewRuntimeData;
class QueryViewsLog; class QueryViewsLog;
class ThreadGroupSwitcher;
using InternalTextLogsQueuePtr = std::shared_ptr<InternalTextLogsQueue>; using InternalTextLogsQueuePtr = std::shared_ptr<InternalTextLogsQueue>;
using InternalTextLogsQueueWeakPtr = std::weak_ptr<InternalTextLogsQueue>; using InternalTextLogsQueueWeakPtr = std::weak_ptr<InternalTextLogsQueue>;
@ -58,15 +57,15 @@ using ThreadStatusPtr = ThreadStatus *;
* Create via CurrentThread::initializeQuery (for queries) or directly (for various background tasks). * Create via CurrentThread::initializeQuery (for queries) or directly (for various background tasks).
* Use via CurrentThread::getGroup. * Use via CurrentThread::getGroup.
*/ */
class ThreadGroupStatus; class ThreadGroup;
using ThreadGroupStatusPtr = std::shared_ptr<ThreadGroupStatus>; using ThreadGroupPtr = std::shared_ptr<ThreadGroup>;
class ThreadGroupStatus class ThreadGroup
{ {
public: public:
ThreadGroupStatus(); ThreadGroup();
using FatalErrorCallback = std::function<void()>; using FatalErrorCallback = std::function<void()>;
ThreadGroupStatus(ContextPtr query_context_, FatalErrorCallback fatal_error_callback_ = {}); ThreadGroup(ContextPtr query_context_, FatalErrorCallback fatal_error_callback_ = {});
/// The first thread created this thread group /// The first thread created this thread group
const UInt64 master_thread_id; const UInt64 master_thread_id;
@ -104,9 +103,9 @@ public:
void attachInternalProfileEventsQueue(const InternalProfileEventsQueuePtr & profile_queue); void attachInternalProfileEventsQueue(const InternalProfileEventsQueuePtr & profile_queue);
/// When new query starts, new thread group is created for it, current thread becomes master thread of the query /// When new query starts, new thread group is created for it, current thread becomes master thread of the query
static ThreadGroupStatusPtr createForQuery(ContextPtr query_context_, FatalErrorCallback fatal_error_callback_ = {}); static ThreadGroupPtr createForQuery(ContextPtr query_context_, FatalErrorCallback fatal_error_callback_ = {});
static ThreadGroupStatusPtr createForBackgroundProcess(ContextPtr storage_context); static ThreadGroupPtr createForBackgroundProcess(ContextPtr storage_context);
std::vector<UInt64> getInvolvedThreadIds() const; std::vector<UInt64> getInvolvedThreadIds() const;
void linkThread(UInt64 thread_it); void linkThread(UInt64 thread_it);
@ -120,6 +119,21 @@ private:
std::unordered_set<UInt64> thread_ids; std::unordered_set<UInt64> thread_ids;
}; };
/**
* Since merge is executed with multiple threads, this class
* switches the parent MemoryTracker as part of the thread group to account all the memory used.
*/
class ThreadGroupSwitcher : private boost::noncopyable
{
public:
explicit ThreadGroupSwitcher(ThreadGroupPtr thread_group);
~ThreadGroupSwitcher();
private:
ThreadGroupPtr prev_thread_group;
};
/** /**
* We use **constinit** here to tell the compiler the current_thread variable is initialized. * We use **constinit** here to tell the compiler the current_thread variable is initialized.
* If we didn't help the compiler, then it would most likely add a check before every use of the variable to initialize it if needed. * If we didn't help the compiler, then it would most likely add a check before every use of the variable to initialize it if needed.
@ -163,7 +177,7 @@ public:
private: private:
/// Group of threads, to which this thread attached /// Group of threads, to which this thread attached
ThreadGroupStatusPtr thread_group; ThreadGroupPtr thread_group;
/// Is set once /// Is set once
ContextWeakPtr global_context; ContextWeakPtr global_context;
@ -174,7 +188,7 @@ private:
using FatalErrorCallback = std::function<void()>; using FatalErrorCallback = std::function<void()>;
FatalErrorCallback fatal_error_callback; FatalErrorCallback fatal_error_callback;
ThreadGroupStatus::SharedData local_data; ThreadGroup::SharedData local_data;
bool performance_counters_finalized = false; bool performance_counters_finalized = false;
@ -215,7 +229,7 @@ public:
ThreadStatus(); ThreadStatus();
~ThreadStatus(); ~ThreadStatus();
ThreadGroupStatusPtr getThreadGroup() const; ThreadGroupPtr getThreadGroup() const;
const String & getQueryId() const; const String & getQueryId() const;
@ -239,7 +253,7 @@ public:
void setInternalThread(); void setInternalThread();
/// Attaches slave thread to existing thread group /// Attaches slave thread to existing thread group
void attachToGroup(const ThreadGroupStatusPtr & thread_group_, bool check_detached = true); void attachToGroup(const ThreadGroupPtr & thread_group_, bool check_detached = true);
/// Detaches thread from the thread group and the query, dumps performance counters if they have not been dumped /// Detaches thread from the thread group and the query, dumps performance counters if they have not been dumped
void detachFromGroup(); void detachFromGroup();
@ -287,7 +301,7 @@ private:
void logToQueryThreadLog(QueryThreadLog & thread_log, const String & current_database); void logToQueryThreadLog(QueryThreadLog & thread_log, const String & current_database);
void attachToGroupImpl(const ThreadGroupStatusPtr & thread_group_); void attachToGroupImpl(const ThreadGroupPtr & thread_group_);
}; };
/** /**

View File

@ -3,12 +3,10 @@
#include <functional> #include <functional>
#include <Common/ZooKeeper/ZooKeeper.h> #include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/ZooKeeperWithFaultInjection.h>
namespace zkutil namespace zkutil
{ {
using GetZooKeeper = std::function<ZooKeeperPtr()>; using GetZooKeeper = std::function<ZooKeeperPtr()>;
using GetZooKeeperWithFaultInjection = std::function<Coordination::ZooKeeperWithFaultInjection::Ptr()>;
} }

View File

@ -15,6 +15,7 @@
#include "Common/ZooKeeper/IKeeper.h" #include "Common/ZooKeeper/IKeeper.h"
#include <Common/StringUtils/StringUtils.h> #include <Common/StringUtils/StringUtils.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Common/logger_useful.h>
#include <Poco/Net/NetException.h> #include <Poco/Net/NetException.h>
#include <Poco/Net/DNS.h> #include <Poco/Net/DNS.h>

View File

@ -7,7 +7,6 @@
#include <memory> #include <memory>
#include <mutex> #include <mutex>
#include <string> #include <string>
#include <Common/logger_useful.h>
#include <Common/ProfileEvents.h> #include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h> #include <Common/CurrentMetrics.h>
#include <Common/Stopwatch.h> #include <Common/Stopwatch.h>

View File

@ -1,4 +1,6 @@
#include <Common/ZooKeeper/ZooKeeperLock.h> #include <Common/ZooKeeper/ZooKeeperLock.h>
#include <Common/logger_useful.h>
#include <Common/ErrorCodes.h>
#include <filesystem> #include <filesystem>
namespace DB namespace DB

View File

@ -3,7 +3,8 @@
#include <Common/ZooKeeper/KeeperException.h> #include <Common/ZooKeeper/KeeperException.h>
#include <memory> #include <memory>
#include <string> #include <string>
#include <Common/logger_useful.h>
namespace Poco { class Logger; }
namespace zkutil namespace zkutil
{ {

View File

@ -4,6 +4,7 @@
#include <Common/ZooKeeper/Types.h> #include <Common/ZooKeeper/Types.h>
#include <Common/ZooKeeper/ZooKeeper.h> #include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h> #include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/logger_useful.h>
#include <Common/randomSeed.h> #include <Common/randomSeed.h>
namespace DB namespace DB

View File

@ -1,7 +1,6 @@
#pragma once #pragma once
#include <base/scope_guard.h> #include <base/scope_guard.h>
#include <Common/logger_useful.h>
#include <Common/LockMemoryExceptionInThread.h> #include <Common/LockMemoryExceptionInThread.h>
/// Same as SCOPE_EXIT() but block the MEMORY_LIMIT_EXCEEDED errors. /// Same as SCOPE_EXIT() but block the MEMORY_LIMIT_EXCEEDED errors.

View File

@ -10,6 +10,7 @@
#include <Common/checkStackSize.h> #include <Common/checkStackSize.h>
#include <Common/CurrentMetrics.h> #include <Common/CurrentMetrics.h>
#include <Common/ProfileEvents.h> #include <Common/ProfileEvents.h>
#include <Common/logger_useful.h>
#include <future> #include <future>
#include <chrono> #include <chrono>

View File

@ -9,7 +9,6 @@
#include <Common/ConcurrentBoundedQueue.h> #include <Common/ConcurrentBoundedQueue.h>
#include <Poco/Util/AbstractConfiguration.h> #include <Poco/Util/AbstractConfiguration.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Common/logger_useful.h>
#include <functional> #include <functional>
#include <Coordination/KeeperServer.h> #include <Coordination/KeeperServer.h>
#include <Coordination/CoordinationSettings.h> #include <Coordination/CoordinationSettings.h>

View File

@ -1,5 +1,6 @@
#include <Coordination/KeeperLogStore.h> #include <Coordination/KeeperLogStore.h>
#include <IO/CompressionMethod.h> #include <IO/CompressionMethod.h>
#include <Common/logger_useful.h>
namespace DB namespace DB
{ {

View File

@ -4,7 +4,6 @@
#include <mutex> #include <mutex>
#include <Core/Types.h> #include <Core/Types.h>
#include <Coordination/Changelog.h> #include <Coordination/Changelog.h>
#include <Common/logger_useful.h>
#include <base/defines.h> #include <base/defines.h>
namespace DB namespace DB

View File

@ -9,7 +9,6 @@
#if USE_AWS_S3 #if USE_AWS_S3
#include <Common/ConcurrentBoundedQueue.h> #include <Common/ConcurrentBoundedQueue.h>
#include <Common/ThreadPool.h> #include <Common/ThreadPool.h>
#include <Common/logger_useful.h>
#include <string> #include <string>
#endif #endif

View File

@ -8,9 +8,10 @@
#include <Coordination/WriteBufferFromNuraftBuffer.h> #include <Coordination/WriteBufferFromNuraftBuffer.h>
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <sys/mman.h> #include <sys/mman.h>
#include "Common/ZooKeeper/ZooKeeperCommon.h" #include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/ZooKeeper/ZooKeeperIO.h> #include <Common/ZooKeeper/ZooKeeperIO.h>
#include <Common/ProfileEvents.h> #include <Common/ProfileEvents.h>
#include <Common/logger_useful.h>
#include "Coordination/KeeperStorage.h" #include "Coordination/KeeperStorage.h"

View File

@ -8,7 +8,6 @@
#include <libnuraft/nuraft.hxx> #include <libnuraft/nuraft.hxx>
#include <Common/ConcurrentBoundedQueue.h> #include <Common/ConcurrentBoundedQueue.h>
#include <Common/logger_useful.h>
namespace DB namespace DB

View File

@ -7,6 +7,7 @@
#include <Common/isLocalAddress.h> #include <Common/isLocalAddress.h>
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <Common/getMultipleKeysFromConfig.h> #include <Common/getMultipleKeysFromConfig.h>
#include <Common/logger_useful.h>
namespace DB namespace DB
{ {

View File

@ -6,6 +6,7 @@
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <Common/ZooKeeper/ZooKeeperIO.h> #include <Common/ZooKeeper/ZooKeeperIO.h>
#include <Common/logger_useful.h>
#include <IO/ReadBufferFromFile.h> #include <IO/ReadBufferFromFile.h>
#include <Coordination/pathUtils.h> #include <Coordination/pathUtils.h>

View File

@ -1,7 +1,6 @@
#pragma once #pragma once
#include <string> #include <string>
#include <Coordination/KeeperStorage.h> #include <Coordination/KeeperStorage.h>
#include <Common/logger_useful.h>
namespace DB namespace DB
{ {

View File

@ -6,6 +6,7 @@
#include <Common/parseRemoteDescription.h> #include <Common/parseRemoteDescription.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Common/quoteString.h> #include <Common/quoteString.h>
#include <Common/logger_useful.h>
#include <IO/WriteBufferFromString.h> #include <IO/WriteBufferFromString.h>
#include <IO/Operators.h> #include <IO/Operators.h>

View File

@ -8,7 +8,6 @@
#include "ConnectionHolder.h" #include "ConnectionHolder.h"
#include <mutex> #include <mutex>
#include <Poco/Util/AbstractConfiguration.h> #include <Poco/Util/AbstractConfiguration.h>
#include <Common/logger_useful.h>
#include <Storages/ExternalDataSourceConfiguration.h> #include <Storages/ExternalDataSourceConfiguration.h>
#include <Storages/StoragePostgreSQL.h> #include <Storages/StoragePostgreSQL.h>

View File

@ -4,6 +4,7 @@
#include <Common/JSONBuilder.h> #include <Common/JSONBuilder.h>
#include <Common/SipHash.h> #include <Common/SipHash.h>
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
#include <Common/logger_useful.h>
#if USE_EMBEDDED_COMPILER #if USE_EMBEDDED_COMPILER
#include <DataTypes/Native.h> #include <DataTypes/Native.h>

View File

@ -16,7 +16,6 @@
#include <Poco/Util/ServerApplication.h> #include <Poco/Util/ServerApplication.h>
#include <Poco/Net/SocketAddress.h> #include <Poco/Net/SocketAddress.h>
#include <base/types.h> #include <base/types.h>
#include <Common/logger_useful.h>
#include <base/getThreadId.h> #include <base/getThreadId.h>
#include <Daemon/GraphiteWriter.h> #include <Daemon/GraphiteWriter.h>
#include <Common/Config/ConfigProcessor.h> #include <Common/Config/ConfigProcessor.h>

View File

@ -9,7 +9,6 @@
#include <pcg_random.hpp> #include <pcg_random.hpp>
#include <Common/logger_useful.h>
#include <Common/randomSeed.h> #include <Common/randomSeed.h>
#include <Common/ThreadPool.h> #include <Common/ThreadPool.h>

View File

@ -1,6 +1,5 @@
#pragma once #pragma once
#include <Common/logger_useful.h>
#include <Core/Block.h> #include <Core/Block.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>

View File

@ -1,6 +1,5 @@
#pragma once #pragma once
#include <Common/logger_useful.h>
#include <Core/Block.h> #include <Core/Block.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>

View File

@ -114,9 +114,18 @@ public:
~ParallelDictionaryLoader() ~ParallelDictionaryLoader()
{ {
for (auto & queue : shards_queues) try
queue->clearAndFinish(); {
pool.wait(); for (auto & queue : shards_queues)
queue->clearAndFinish();
/// NOTE: It is OK to not pass the exception next, since on success finish() should be called which will call wait()
pool.wait();
}
catch (...)
{
tryLogCurrentException(dictionary.log, "Exception had been thrown during parallel load of the dictionary");
}
} }
private: private:

View File

@ -5,6 +5,7 @@
#include <Common/IPv6ToBinary.h> #include <Common/IPv6ToBinary.h>
#include <Common/memcmpSmall.h> #include <Common/memcmpSmall.h>
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
#include <Common/logger_useful.h>
#include <DataTypes/DataTypeFixedString.h> #include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeString.h> #include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesDecimal.h> #include <DataTypes/DataTypesDecimal.h>

View File

@ -11,7 +11,6 @@
#include <Columns/ColumnVector.h> #include <Columns/ColumnVector.h>
#include <Poco/Net/IPAddress.h> #include <Poco/Net/IPAddress.h>
#include <base/StringRef.h> #include <base/StringRef.h>
#include <Common/logger_useful.h>
#include "DictionaryStructure.h" #include "DictionaryStructure.h"
#include "IDictionary.h" #include "IDictionary.h"
#include "IDictionarySource.h" #include "IDictionarySource.h"

View File

@ -13,6 +13,7 @@
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <QueryPipeline/QueryPipeline.h> #include <QueryPipeline/QueryPipeline.h>
#include <Storages/ExternalDataSourceConfiguration.h> #include <Storages/ExternalDataSourceConfiguration.h>
#include <Common/logger_useful.h>
#endif #endif

View File

@ -8,7 +8,6 @@
#include "ExternalQueryBuilder.h" #include "ExternalQueryBuilder.h"
#include <Core/Block.h> #include <Core/Block.h>
#include <Common/LocalDateTime.h> #include <Common/LocalDateTime.h>
#include <Common/logger_useful.h>
#include <Core/PostgreSQL/PoolWithFailover.h> #include <Core/PostgreSQL/PoolWithFailover.h>

View File

@ -1,6 +1,5 @@
#pragma once #pragma once
#include <Common/logger_useful.h>
#include <Disks/DiskLocalCheckThread.h> #include <Disks/DiskLocalCheckThread.h>
#include <Disks/IDisk.h> #include <Disks/IDisk.h>
#include <IO/ReadBufferFromFile.h> #include <IO/ReadBufferFromFile.h>

View File

@ -4,8 +4,9 @@
#include <IO/ReadBufferFromFile.h> #include <IO/ReadBufferFromFile.h>
#include <base/scope_guard.h> #include <base/scope_guard.h>
#include <Common/assert_cast.h> #include <Common/assert_cast.h>
#include <base/hex.h>
#include <Common/getRandomASCIIString.h> #include <Common/getRandomASCIIString.h>
#include <Common/logger_useful.h>
#include <base/hex.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>

View File

@ -1,7 +1,6 @@
#pragma once #pragma once
#include <Interpreters/Cache/FileCache.h> #include <Interpreters/Cache/FileCache.h>
#include <Common/logger_useful.h>
#include <IO/SeekableReadBuffer.h> #include <IO/SeekableReadBuffer.h>
#include <IO/WriteBufferFromFile.h> #include <IO/WriteBufferFromFile.h>
#include <IO/ReadSettings.h> #include <IO/ReadSettings.h>

View File

@ -9,7 +9,6 @@
#include <memory> #include <memory>
#include <Storages/StorageS3Settings.h> #include <Storages/StorageS3Settings.h>
#include <Common/MultiVersion.h> #include <Common/MultiVersion.h>
#include <Common/logger_useful.h>
namespace DB namespace DB

View File

@ -7,6 +7,7 @@
#include <Common/escapeForFileName.h> #include <Common/escapeForFileName.h>
#include <Common/formatReadable.h> #include <Common/formatReadable.h>
#include <Common/quoteString.h> #include <Common/quoteString.h>
#include <Common/logger_useful.h>
#include <set> #include <set>

View File

@ -10,7 +10,6 @@
#include <Common/CurrentMetrics.h> #include <Common/CurrentMetrics.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Common/formatReadable.h> #include <Common/formatReadable.h>
#include <Common/logger_useful.h>
#include <memory> #include <memory>
#include <mutex> #include <mutex>

View File

@ -10,7 +10,6 @@
#include <Functions/castTypeToEither.h> #include <Functions/castTypeToEither.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <Common/logger_useful.h>
#include <Poco/Logger.h> #include <Poco/Logger.h>
#include <Loggers/Loggers.h> #include <Loggers/Loggers.h>

View File

@ -8,6 +8,7 @@
#include <Columns/ColumnsNumber.h> #include <Columns/ColumnsNumber.h>
#include <Columns/ColumnString.h> #include <Columns/ColumnString.h>
#include <Interpreters/AggregationCommon.h> #include <Interpreters/AggregationCommon.h>
#include <Interpreters/Context_fwd.h>
#include <Common/HashTable/ClearableHashMap.h> #include <Common/HashTable/ClearableHashMap.h>
#include <Common/ColumnsHashing.h> #include <Common/ColumnsHashing.h>

View File

@ -9,6 +9,7 @@
#include <Functions/FunctionHelpers.h> #include <Functions/FunctionHelpers.h>
#include <Functions/IFunction.h> #include <Functions/IFunction.h>
#include <Interpreters/AggregationCommon.h> #include <Interpreters/AggregationCommon.h>
#include <Interpreters/Context_fwd.h>
#include <Common/ColumnsHashing.h> #include <Common/ColumnsHashing.h>
#include <Common/HashTable/ClearableHashMap.h> #include <Common/HashTable/ClearableHashMap.h>

View File

@ -18,7 +18,6 @@
#include <Interpreters/castColumn.h> #include <Interpreters/castColumn.h>
#include <cmath> #include <cmath>
#include <Common/logger_useful.h>
namespace DB namespace DB
{ {

View File

@ -2,7 +2,10 @@
#if USE_AWS_S3 #if USE_AWS_S3
#include <Core/SettingsEnums.h>
#include <Common/logger_useful.h>
#include <aws/core/utils/logging/LogLevel.h> #include <aws/core/utils/logging/LogLevel.h>
#include <Poco/Logger.h>
namespace namespace
{ {

View File

@ -4,8 +4,10 @@
#if USE_AWS_S3 #if USE_AWS_S3
#include <aws/core/utils/logging/LogSystemInterface.h> #include <aws/core/utils/logging/LogSystemInterface.h>
#include <base/types.h>
#include <unordered_map>
#include <Common/logger_useful.h> namespace Poco { class Logger; }
namespace DB::S3 namespace DB::S3
{ {

View File

@ -4,7 +4,6 @@
#if USE_AWS_S3 #if USE_AWS_S3
#include <Common/logger_useful.h>
#include <Common/assert_cast.h> #include <Common/assert_cast.h>
#include <base/scope_guard.h> #include <base/scope_guard.h>

View File

@ -9,7 +9,6 @@
# include <aws/core/auth/AWSCredentialsProvider.h> # include <aws/core/auth/AWSCredentialsProvider.h>
# include <aws/core/auth/AWSCredentialsProviderChain.h> # include <aws/core/auth/AWSCredentialsProviderChain.h>
# include <Common/logger_useful.h>
# include <IO/S3/PocoHTTPClient.h> # include <IO/S3/PocoHTTPClient.h>

View File

@ -9,7 +9,6 @@
#include <list> #include <list>
#include <base/types.h> #include <base/types.h>
#include <Common/logger_useful.h>
#include <IO/BufferWithOwnMemory.h> #include <IO/BufferWithOwnMemory.h>
#include <IO/WriteBuffer.h> #include <IO/WriteBuffer.h>
#include <IO/WriteSettings.h> #include <IO/WriteSettings.h>

View File

@ -8,9 +8,6 @@
#endif #endif
#include <base/sort.h> #include <base/sort.h>
#include <Common/Stopwatch.h>
#include <Common/setThreadName.h>
#include <Common/formatReadable.h>
#include <DataTypes/DataTypeAggregateFunction.h> #include <DataTypes/DataTypeAggregateFunction.h>
#include <DataTypes/DataTypeNullable.h> #include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeLowCardinality.h> #include <DataTypes/DataTypeLowCardinality.h>
@ -21,14 +18,6 @@
#include <IO/WriteBufferFromFile.h> #include <IO/WriteBufferFromFile.h>
#include <Compression/CompressedWriteBuffer.h> #include <Compression/CompressedWriteBuffer.h>
#include <Interpreters/Aggregator.h> #include <Interpreters/Aggregator.h>
#include <Common/CacheBase.h>
#include <Common/MemoryTracker.h>
#include <Common/CurrentThread.h>
#include <Common/CurrentMetrics.h>
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Common/JSONBuilder.h>
#include <Common/filesystemHelpers.h>
#include <AggregateFunctions/AggregateFunctionArray.h> #include <AggregateFunctions/AggregateFunctionArray.h>
#include <AggregateFunctions/AggregateFunctionState.h> #include <AggregateFunctions/AggregateFunctionState.h>
#include <IO/Operators.h> #include <IO/Operators.h>
@ -37,6 +26,18 @@
#include <Core/ProtocolDefines.h> #include <Core/ProtocolDefines.h>
#include <Disks/TemporaryFileOnDisk.h> #include <Disks/TemporaryFileOnDisk.h>
#include <Interpreters/TemporaryDataOnDisk.h> #include <Interpreters/TemporaryDataOnDisk.h>
#include <Common/Stopwatch.h>
#include <Common/setThreadName.h>
#include <Common/formatReadable.h>
#include <Common/logger_useful.h>
#include <Common/CacheBase.h>
#include <Common/MemoryTracker.h>
#include <Common/CurrentThread.h>
#include <Common/CurrentMetrics.h>
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Common/JSONBuilder.h>
#include <Common/filesystemHelpers.h>
#include <Common/scope_guard_safe.h> #include <Common/scope_guard_safe.h>
#include <Parsers/ASTSelectQuery.h> #include <Parsers/ASTSelectQuery.h>
@ -2315,7 +2316,7 @@ BlocksList Aggregator::prepareBlocksAndFillTwoLevelImpl(
std::atomic<UInt32> next_bucket_to_merge = 0; std::atomic<UInt32> next_bucket_to_merge = 0;
auto converter = [&](size_t thread_id, ThreadGroupStatusPtr thread_group) auto converter = [&](size_t thread_id, ThreadGroupPtr thread_group)
{ {
SCOPE_EXIT_SAFE( SCOPE_EXIT_SAFE(
if (thread_group) if (thread_group)
@ -3043,7 +3044,7 @@ void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVari
LOG_TRACE(log, "Merging partially aggregated two-level data."); LOG_TRACE(log, "Merging partially aggregated two-level data.");
auto merge_bucket = [&bucket_to_blocks, &result, this](Int32 bucket, Arena * aggregates_pool, ThreadGroupStatusPtr thread_group) auto merge_bucket = [&bucket_to_blocks, &result, this](Int32 bucket, Arena * aggregates_pool, ThreadGroupPtr thread_group)
{ {
SCOPE_EXIT_SAFE( SCOPE_EXIT_SAFE(
if (thread_group) if (thread_group)

View File

@ -5,7 +5,6 @@
#include <mutex> #include <mutex>
#include <type_traits> #include <type_traits>
#include <Common/logger_useful.h>
#include <base/StringRef.h> #include <base/StringRef.h>
#include <Common/Arena.h> #include <Common/Arena.h>

View File

@ -12,7 +12,6 @@
#include <boost/noncopyable.hpp> #include <boost/noncopyable.hpp>
#include <Core/Types.h> #include <Core/Types.h>
#include <Common/logger_useful.h>
#include <Common/ThreadPool.h> #include <Common/ThreadPool.h>
#include <IO/ReadSettings.h> #include <IO/ReadSettings.h>
#include <Interpreters/Cache/IFileCachePriority.h> #include <Interpreters/Cache/IFileCachePriority.h>

View File

@ -1,5 +1,6 @@
#include <Interpreters/Cache/LRUFileCachePriority.h> #include <Interpreters/Cache/LRUFileCachePriority.h>
#include <Common/CurrentMetrics.h> #include <Common/CurrentMetrics.h>
#include <Common/logger_useful.h>
namespace CurrentMetrics namespace CurrentMetrics
{ {

View File

@ -2,7 +2,6 @@
#include <list> #include <list>
#include <Interpreters/Cache/IFileCachePriority.h> #include <Interpreters/Cache/IFileCachePriority.h>
#include <Common/logger_useful.h>
namespace DB namespace DB
{ {

View File

@ -2,6 +2,7 @@
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Common/DNSResolver.h> #include <Common/DNSResolver.h>
#include <Common/logger_useful.h>
namespace DB namespace DB

View File

@ -1,7 +1,7 @@
#include <Interpreters/DirectJoin.h> #include <Interpreters/DirectJoin.h>
#include <Columns/ColumnNullable.h>
#include <Interpreters/castColumn.h> #include <Interpreters/castColumn.h>
#include <Columns/ColumnNullable.h>
#include <Common/logger_useful.h>
namespace DB namespace DB
{ {

View File

@ -1,6 +1,5 @@
#pragma once #pragma once
#include <Common/logger_useful.h>
#include <Core/Block.h> #include <Core/Block.h>

View File

@ -9,8 +9,9 @@
#include <Common/randomSeed.h> #include <Common/randomSeed.h>
#include <Common/setThreadName.h> #include <Common/setThreadName.h>
#include <Common/StatusInfo.h> #include <Common/StatusInfo.h>
#include <base/chrono_io.h>
#include <Common/scope_guard_safe.h> #include <Common/scope_guard_safe.h>
#include <Common/logger_useful.h>
#include <base/chrono_io.h>
#include <boost/range/adaptor/map.hpp> #include <boost/range/adaptor/map.hpp>
#include <boost/range/algorithm/copy.hpp> #include <boost/range/algorithm/copy.hpp>
#include <unordered_set> #include <unordered_set>
@ -967,7 +968,7 @@ private:
} }
/// Does the loading, possibly in the separate thread. /// Does the loading, possibly in the separate thread.
void doLoading(const String & name, size_t loading_id, bool forced_to_reload, size_t min_id_to_finish_loading_dependencies_, bool async, ThreadGroupStatusPtr thread_group = {}) void doLoading(const String & name, size_t loading_id, bool forced_to_reload, size_t min_id_to_finish_loading_dependencies_, bool async, ThreadGroupPtr thread_group = {})
{ {
SCOPE_EXIT_SAFE( SCOPE_EXIT_SAFE(
if (thread_group) if (thread_group)

View File

@ -6,10 +6,11 @@
#include <base/types.h> #include <base/types.h>
#include <Interpreters/IExternalLoadable.h> #include <Interpreters/IExternalLoadable.h>
#include <Interpreters/IExternalLoaderConfigRepository.h> #include <Interpreters/IExternalLoaderConfigRepository.h>
#include <Common/logger_useful.h>
#include <base/scope_guard.h> #include <base/scope_guard.h>
#include <Common/ExternalLoaderStatus.h> #include <Common/ExternalLoaderStatus.h>
#include <Core/Types.h>
namespace Poco { class Logger; }
namespace DB namespace DB
{ {

View File

@ -8,7 +8,6 @@
#include <DataTypes/DataTypeTuple.h> #include <DataTypes/DataTypeTuple.h>
#include <Interpreters/SystemLog.h> #include <Interpreters/SystemLog.h>
#include <Interpreters/TransactionVersionMetadata.h> #include <Interpreters/TransactionVersionMetadata.h>
#include <Common/logger_useful.h>
namespace DB namespace DB
{ {

View File

@ -4,6 +4,7 @@
#include <Interpreters/TableJoin.h> #include <Interpreters/TableJoin.h>
#include <DataTypes/DataTypeNullable.h> #include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeLowCardinality.h> #include <DataTypes/DataTypeLowCardinality.h>
#include <Common/logger_useful.h>
#include <Poco/Logger.h> #include <Poco/Logger.h>
namespace DB namespace DB

View File

@ -16,7 +16,6 @@
#include <Common/HashTable/HashMap.h> #include <Common/HashTable/HashMap.h>
#include <Common/HashTable/FixedHashMap.h> #include <Common/HashTable/FixedHashMap.h>
#include <Storages/TableLockHolder.h> #include <Storages/TableLockHolder.h>
#include <Common/logger_useful.h>
#include <Columns/ColumnString.h> #include <Columns/ColumnString.h>
#include <Columns/ColumnFixedString.h> #include <Columns/ColumnFixedString.h>

View File

@ -7,7 +7,6 @@
#include <Core/Block.h> #include <Core/Block.h>
#include <Columns/IColumn.h> #include <Columns/IColumn.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Common/logger_useful.h>
namespace DB namespace DB
{ {

View File

@ -32,6 +32,7 @@
#include <Storages/StorageMaterializedView.h> #include <Storages/StorageMaterializedView.h>
#include <Storages/WindowView/StorageWindowView.h> #include <Storages/WindowView/StorageWindowView.h>
#include <TableFunctions/TableFunctionFactory.h> #include <TableFunctions/TableFunctionFactory.h>
#include <Common/ThreadStatus.h>
#include <Common/checkStackSize.h> #include <Common/checkStackSize.h>
@ -233,8 +234,14 @@ Chain InterpreterInsertQuery::buildChain(
ThreadStatusesHolderPtr thread_status_holder, ThreadStatusesHolderPtr thread_status_holder,
std::atomic_uint64_t * elapsed_counter_ms) std::atomic_uint64_t * elapsed_counter_ms)
{ {
ThreadGroupPtr running_group;
if (current_thread)
running_group = current_thread->getThreadGroup();
if (!running_group)
running_group = std::make_shared<ThreadGroup>(getContext());
auto sample = getSampleBlock(columns, table, metadata_snapshot); auto sample = getSampleBlock(columns, table, metadata_snapshot);
return buildChainImpl(table, metadata_snapshot, sample, thread_status_holder, elapsed_counter_ms); return buildChainImpl(table, metadata_snapshot, sample, thread_status_holder, running_group, elapsed_counter_ms);
} }
Chain InterpreterInsertQuery::buildChainImpl( Chain InterpreterInsertQuery::buildChainImpl(
@ -242,6 +249,7 @@ Chain InterpreterInsertQuery::buildChainImpl(
const StorageMetadataPtr & metadata_snapshot, const StorageMetadataPtr & metadata_snapshot,
const Block & query_sample_block, const Block & query_sample_block,
ThreadStatusesHolderPtr thread_status_holder, ThreadStatusesHolderPtr thread_status_holder,
ThreadGroupPtr running_group,
std::atomic_uint64_t * elapsed_counter_ms) std::atomic_uint64_t * elapsed_counter_ms)
{ {
ThreadStatus * thread_status = current_thread; ThreadStatus * thread_status = current_thread;
@ -273,7 +281,9 @@ Chain InterpreterInsertQuery::buildChainImpl(
} }
else else
{ {
out = buildPushingToViewsChain(table, metadata_snapshot, context_ptr, query_ptr, no_destination, thread_status_holder, elapsed_counter_ms); out = buildPushingToViewsChain(table, metadata_snapshot, context_ptr,
query_ptr, no_destination,
thread_status_holder, running_group, elapsed_counter_ms);
} }
/// Note that we wrap transforms one on top of another, so we write them in reverse of data processing order. /// Note that we wrap transforms one on top of another, so we write them in reverse of data processing order.
@ -461,9 +471,17 @@ BlockIO InterpreterInsertQuery::execute()
pipeline = interpreter_watch.buildQueryPipeline(); pipeline = interpreter_watch.buildQueryPipeline();
} }
ThreadGroupPtr running_group;
if (current_thread)
running_group = current_thread->getThreadGroup();
if (!running_group)
running_group = std::make_shared<ThreadGroup>(getContext());
for (size_t i = 0; i < out_streams_size; ++i) for (size_t i = 0; i < out_streams_size; ++i)
{ {
auto out = buildChainImpl(table, metadata_snapshot, query_sample_block, nullptr, nullptr); auto out = buildChainImpl(table, metadata_snapshot, query_sample_block,
/* thread_status_holder= */ nullptr,
running_group,
/* elapsed_counter_ms= */ nullptr);
out_chains.emplace_back(std::move(out)); out_chains.emplace_back(std::move(out));
} }
} }

View File

@ -4,6 +4,7 @@
#include <Interpreters/IInterpreter.h> #include <Interpreters/IInterpreter.h>
#include <Parsers/ASTInsertQuery.h> #include <Parsers/ASTInsertQuery.h>
#include <Storages/StorageInMemoryMetadata.h> #include <Storages/StorageInMemoryMetadata.h>
#include <Common/ThreadStatus.h>
namespace DB namespace DB
{ {
@ -70,6 +71,7 @@ private:
const StorageMetadataPtr & metadata_snapshot, const StorageMetadataPtr & metadata_snapshot,
const Block & query_sample_block, const Block & query_sample_block,
ThreadStatusesHolderPtr thread_status_holder, ThreadStatusesHolderPtr thread_status_holder,
ThreadGroupPtr running_group,
std::atomic_uint64_t * elapsed_counter_ms); std::atomic_uint64_t * elapsed_counter_ms);
}; };

View File

@ -2,7 +2,6 @@
#include <Poco/Util/AbstractConfiguration.h> #include <Poco/Util/AbstractConfiguration.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Common/logger_useful.h>
#include <unordered_set> #include <unordered_set>
namespace DB namespace DB

Some files were not shown because too many files have changed in this diff Show More