Merge remote-tracking branch 'ClickHouse/master' into qiangxuhui-loongarch64

Robert Schulze 2024-05-17 07:56:01 +00:00
commit 5fb8ea4c62
GPG Key ID: 26703B55FB13728A
257 changed files with 4530 additions and 1330 deletions

View File

@ -89,7 +89,7 @@ PenaltyBreakFirstLessLess: 120
PenaltyBreakString: 1000
PenaltyExcessCharacter: 1000000
PenaltyReturnTypeOnItsOwnLine: 60
RemoveBracesLLVM: true
RemoveBracesLLVM: false
SpaceAfterCStyleCast: false
SpaceBeforeAssignmentOperators: true
SpaceBeforeParens: ControlStatements

View File

@ -138,6 +138,8 @@ Checks: [
# This is a good check, but clang-tidy crashes, see https://github.com/llvm/llvm-project/issues/91872
'-modernize-use-constraints',
# https://github.com/abseil/abseil-cpp/issues/1667
'-clang-analyzer-optin.core.EnumCastOutOfRange'
]
WarningsAsErrors: '*'

View File

@ -85,4 +85,4 @@ At a minimum, the following information should be added (but add more as needed)
- [ ] <!---batch_2--> 3
- [ ] <!---batch_3--> 4
<details>
</details>

View File

@ -9,6 +9,12 @@ on: # yamllint disable-line rule:truthy
push:
branches:
- 'backport/**'
# Cancel the previous wf run in PRs.
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
jobs:
RunConfig:
runs-on: [self-hosted, style-checker-aarch64]

View File

@ -1,19 +0,0 @@
name: Cancel
env:
# Force the stdout and stderr streams to be unbuffered
PYTHONUNBUFFERED: 1
on: # yamllint disable-line rule:truthy
workflow_run:
workflows: ["PullRequestCI", "ReleaseBranchCI", "DocsCheck", "BackportPR"]
types:
- requested
jobs:
cancel:
runs-on: [self-hosted, style-checker]
steps:
- uses: styfle/cancel-workflow-action@0.9.1
with:
all_but_latest: true
workflow_id: ${{ github.event.workflow.id }}

View File

@ -1,11 +0,0 @@
# The CI for each commit, prints envs and content of GITHUB_EVENT_PATH
name: Debug
'on':
[push, pull_request, pull_request_review, release, workflow_dispatch, workflow_call]
jobs:
DebugInfo:
runs-on: ubuntu-latest
steps:
- uses: hmarr/debug-action@f7318c783045ac39ed9bb497e22ce835fdafbfe6

View File

@ -10,14 +10,13 @@ env:
workflow_dispatch:
jobs:
Debug:
# The task for having a preserved ENV and event.json for later investigation
uses: ./.github/workflows/debug.yml
RunConfig:
runs-on: [self-hosted, style-checker-aarch64]
outputs:
data: ${{ steps.runconfig.outputs.CI_DATA }}
steps:
- name: DebugInfo
uses: hmarr/debug-action@f7318c783045ac39ed9bb497e22ce835fdafbfe6
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:

View File

@ -14,6 +14,11 @@ on: # yamllint disable-line rule:truthy
branches:
- master
# Cancel the previous wf run in PRs.
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
jobs:
RunConfig:
runs-on: [self-hosted, style-checker-aarch64]

View File

@ -1,23 +0,0 @@
name: PullRequestApprovedCI
env:
# Force the stdout and stderr streams to be unbuffered
PYTHONUNBUFFERED: 1
on: # yamllint disable-line rule:truthy
pull_request_review:
types:
- submitted
jobs:
MergeOnApproval:
runs-on: [self-hosted, style-checker]
steps:
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:
clear-repository: true
- name: Merge approved PR
run: |
cd "$GITHUB_WORKSPACE/tests/ci"
python3 merge_pr.py --check-approved

View File

@ -31,7 +31,9 @@ add_library(unwind ${LIBUNWIND_SOURCES})
set_target_properties(unwind PROPERTIES FOLDER "contrib/libunwind-cmake")
target_include_directories(unwind SYSTEM BEFORE PUBLIC $<BUILD_INTERFACE:${LIBUNWIND_SOURCE_DIR}/include>)
target_compile_definitions(unwind PRIVATE -D_LIBUNWIND_NO_HEAP=1 -D_DEBUG -D_LIBUNWIND_IS_NATIVE_ONLY)
target_compile_definitions(unwind PRIVATE -D_LIBUNWIND_NO_HEAP=1)
# NOTE: sizeof(unw_context_t)/sizeof(unw_cursor_t) depend on this macro, so it should always be set
target_compile_definitions(unwind PUBLIC -D_LIBUNWIND_IS_NATIVE_ONLY)
# We should enable optimizations (otherwise it will be too slow in debug)
# and disable sanitizers (otherwise infinite loop may happen)

View File

@ -83,7 +83,7 @@ setup_minio() {
./mc alias set clickminio http://localhost:11111 clickhouse clickhouse
./mc admin user add clickminio test testtest
./mc admin policy set clickminio readwrite user=test
./mc mb clickminio/test
./mc mb --ignore-existing clickminio/test
if [ "$test_type" = "stateless" ]; then
./mc policy set public clickminio/test
fi

View File

@ -75,7 +75,7 @@ The supported formats are:
| [ArrowStream](#data-format-arrow-stream) | ✔ | ✔ |
| [ORC](#data-format-orc) | ✔ | ✔ |
| [One](#data-format-one) | ✔ | ✗ |
| [Npy](#data-format-npy) | ✔ | ✗ |
| [Npy](#data-format-npy) | ✔ | ✔ |
| [RowBinary](#rowbinary) | ✔ | ✔ |
| [RowBinaryWithNames](#rowbinarywithnamesandtypes) | ✔ | ✔ |
| [RowBinaryWithNamesAndTypes](#rowbinarywithnamesandtypes) | ✔ | ✔ |
@ -2466,23 +2466,22 @@ Result:
## Npy {#data-format-npy}
This function is designed to load a NumPy array from a .npy file into ClickHouse. The NumPy file format is a binary format used for efficiently storing arrays of numerical data. During import, ClickHouse treats top level dimension as an array of rows with single column. Supported Npy data types and their corresponding type in ClickHouse:
| Npy type | ClickHouse type |
|:--------:|:---------------:|
| b1 | UInt8 |
| i1 | Int8 |
| i2 | Int16 |
| i4 | Int32 |
| i8 | Int64 |
| u1 | UInt8 |
| u2 | UInt16 |
| u4 | UInt32 |
| u8 | UInt64 |
| f2 | Float32 |
| f4 | Float32 |
| f8 | Float64 |
| S | String |
| U | String |
This function is designed to load a NumPy array from a .npy file into ClickHouse. The NumPy file format is a binary format used for efficiently storing arrays of numerical data. During import, ClickHouse treats top level dimension as an array of rows with single column. Supported Npy data types and their corresponding type in ClickHouse:
| Npy data type (`INSERT`) | ClickHouse data type | Npy data type (`SELECT`) |
|--------------------------|-----------------------------------------------------------------|--------------------------|
| `i1` | [Int8](/docs/en/sql-reference/data-types/int-uint.md) | `i1` |
| `i2` | [Int16](/docs/en/sql-reference/data-types/int-uint.md) | `i2` |
| `i4` | [Int32](/docs/en/sql-reference/data-types/int-uint.md) | `i4` |
| `i8` | [Int64](/docs/en/sql-reference/data-types/int-uint.md) | `i8` |
| `u1`, `b1` | [UInt8](/docs/en/sql-reference/data-types/int-uint.md) | `u1` |
| `u2` | [UInt16](/docs/en/sql-reference/data-types/int-uint.md) | `u2` |
| `u4` | [UInt32](/docs/en/sql-reference/data-types/int-uint.md) | `u4` |
| `u8` | [UInt64](/docs/en/sql-reference/data-types/int-uint.md) | `u8` |
| `f2`, `f4` | [Float32](/docs/en/sql-reference/data-types/float.md) | `f4` |
| `f8` | [Float64](/docs/en/sql-reference/data-types/float.md) | `f8` |
| `S`, `U` | [String](/docs/en/sql-reference/data-types/string.md) | `S` |
| | [FixedString](/docs/en/sql-reference/data-types/fixedstring.md) | `S` |
**Example of saving an array in .npy format using Python**
@ -2509,6 +2508,14 @@ Result:
└───────────────┘
```
**Selecting Data**
You can select data from a ClickHouse table and save it into a file in the Npy format with the following command:
```bash
$ clickhouse-client --query="SELECT {column} FROM {some_table} FORMAT Npy" > {filename.npy}
```
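For the reverse direction, a `.npy` file can be read back with the `file` table function. A minimal sketch (the file name below is a placeholder, and the column type is assumed to be inferred from the Npy header):

```sql
-- Hypothetical file placed in the server's user_files directory.
SELECT * FROM file('example_array.npy', Npy) LIMIT 5;
```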
## LineAsString {#lineasstring}
In this format, every line of input data is interpreted as a single string value. This format can only be parsed for table with a single field of type [String](/docs/en/sql-reference/data-types/string.md). The remaining columns must be set to [DEFAULT](/docs/en/sql-reference/statements/create/table.md/#default) or [MATERIALIZED](/docs/en/sql-reference/statements/create/table.md/#materialized), or omitted.

View File

@ -67,6 +67,8 @@ generates merged configuration file:
</clickhouse>
```
### Using from_env and from_zk
To specify that a value of an element should be replaced by the value of an environment variable, you can use attribute `from_env`.
Example with `$MAX_QUERY_SIZE = 150000`:
@ -93,6 +95,59 @@ which is equal to
</clickhouse>
```
The same is possible using `from_zk`:
``` xml
<clickhouse>
<postgresql_port from_zk="/zk_configs/postgresql_port"/>
</clickhouse>
```
```
# clickhouse-keeper-client
/ :) touch /zk_configs
/ :) create /zk_configs/postgresql_port "9005"
/ :) get /zk_configs/postgresql_port
9005
```
which is equal to
``` xml
<clickhouse>
<postgresql_port>9005</postgresql_port>
</clickhouse>
```
#### Default values for from_env and from_zk attributes
Using the `replace="1"` attribute, you can specify a default value that is substituted only if the environment variable or ZooKeeper node is set.
Taking the previous example, but with `MAX_QUERY_SIZE` unset:
``` xml
<clickhouse>
<profiles>
<default>
<max_query_size from_env="MAX_QUERY_SIZE" replace="1">150000</max_query_size>
</default>
</profiles>
</clickhouse>
```
will take the default value
``` xml
<clickhouse>
<profiles>
<default>
<max_query_size>150000</max_query_size>
</default>
</profiles>
</clickhouse>
```
## Substituting Configuration {#substitution}
The config can define substitutions. There are two types of substitutions:

View File

@ -32,20 +32,21 @@ WHERE name LIKE '%thread_pool%'
```
``` text
┌─name────────────────────────────────────────┬─value─┬─default─┬─changed─┬─description─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬─type───┬─changeable_without_restart─┬─is_obsolete─┐
│ max_thread_pool_size │ 10000 │ 10000 │ 0 │ The maximum number of threads that could be allocated from the OS and used for query execution and background operations. │ UInt64 │ No │ 0 │
│ max_thread_pool_free_size │ 1000 │ 1000 │ 0 │ The maximum number of threads that will always stay in a global thread pool once allocated and remain idle in case of insufficient number of tasks. │ UInt64 │ No │ 0 │
│ thread_pool_queue_size │ 10000 │ 10000 │ 0 │ The maximum number of tasks that will be placed in a queue and wait for execution. │ UInt64 │ No │ 0 │
│ max_io_thread_pool_size │ 100 │ 100 │ 0 │ The maximum number of threads that would be used for IO operations │ UInt64 │ No │ 0 │
│ max_io_thread_pool_free_size │ 0 │ 0 │ 0 │ Max free size for IO thread pool. │ UInt64 │ No │ 0 │
│ io_thread_pool_queue_size │ 10000 │ 10000 │ 0 │ Queue size for IO thread pool. │ UInt64 │ No │ 0 │
│ max_active_parts_loading_thread_pool_size │ 64 │ 64 │ 0 │ The number of threads to load active set of data parts (Active ones) at startup. │ UInt64 │ No │ 0 │
│ max_outdated_parts_loading_thread_pool_size │ 32 │ 32 │ 0 │ The number of threads to load inactive set of data parts (Outdated ones) at startup. │ UInt64 │ No │ 0 │
│ max_parts_cleaning_thread_pool_size │ 128 │ 128 │ 0 │ The number of threads for concurrent removal of inactive data parts. │ UInt64 │ No │ 0 │
│ max_backups_io_thread_pool_size │ 1000 │ 1000 │ 0 │ The maximum number of threads that would be used for IO operations for BACKUP queries │ UInt64 │ No │ 0 │
│ max_backups_io_thread_pool_free_size │ 0 │ 0 │ 0 │ Max free size for backups IO thread pool. │ UInt64 │ No │ 0 │
│ backups_io_thread_pool_queue_size │ 0 │ 0 │ 0 │ Queue size for backups IO thread pool. │ UInt64 │ No │ 0 │
└─────────────────────────────────────────────┴───────┴─────────┴─────────┴─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┴────────┴────────────────────────────┴─────────────┘
┌─name──────────────────────────────────────────┬─value─┬─default─┬─changed─┬─description─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬─type───┬─changeable_without_restart─┬─is_obsolete─┐
│ max_thread_pool_size │ 10000 │ 10000 │ 0 │ The maximum number of threads that could be allocated from the OS and used for query execution and background operations. │ UInt64 │ No │ 0 │
│ max_thread_pool_free_size │ 1000 │ 1000 │ 0 │ The maximum number of threads that will always stay in a global thread pool once allocated and remain idle in case of insufficient number of tasks. │ UInt64 │ No │ 0 │
│ thread_pool_queue_size │ 10000 │ 10000 │ 0 │ The maximum number of tasks that will be placed in a queue and wait for execution. │ UInt64 │ No │ 0 │
│ max_io_thread_pool_size │ 100 │ 100 │ 0 │ The maximum number of threads that would be used for IO operations │ UInt64 │ No │ 0 │
│ max_io_thread_pool_free_size │ 0 │ 0 │ 0 │ Max free size for IO thread pool. │ UInt64 │ No │ 0 │
│ io_thread_pool_queue_size │ 10000 │ 10000 │ 0 │ Queue size for IO thread pool. │ UInt64 │ No │ 0 │
│ max_active_parts_loading_thread_pool_size │ 64 │ 64 │ 0 │ The number of threads to load active set of data parts (Active ones) at startup. │ UInt64 │ No │ 0 │
│ max_outdated_parts_loading_thread_pool_size │ 32 │ 32 │ 0 │ The number of threads to load inactive set of data parts (Outdated ones) at startup. │ UInt64 │ No │ 0 │
│ max_unexpected_parts_loading_thread_pool_size │ 32 │ 32 │ 0 │ The number of threads to load inactive set of data parts (Unexpected ones) at startup. │ UInt64 │ No │ 0 │
│ max_parts_cleaning_thread_pool_size │ 128 │ 128 │ 0 │ The number of threads for concurrent removal of inactive data parts. │ UInt64 │ No │ 0 │
│ max_backups_io_thread_pool_size │ 1000 │ 1000 │ 0 │ The maximum number of threads that would be used for IO operations for BACKUP queries │ UInt64 │ No │ 0 │
│ max_backups_io_thread_pool_free_size │ 0 │ 0 │ 0 │ Max free size for backups IO thread pool. │ UInt64 │ No │ 0 │
│ backups_io_thread_pool_queue_size │ 0 │ 0 │ 0 │ Queue size for backups IO thread pool. │ UInt64 │ No │ 0 │
└───────────────────────────────────────────────┴───────┴─────────┴─────────┴─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┴────────┴────────────────────────────┴─────────────┘
```
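To look at a single entry rather than the whole family of thread-pool settings, the same table can be filtered by exact name (shown here for the setting added above; the output columns follow the table format already shown):

```sql
SELECT name, value, description
FROM system.server_settings
WHERE name = 'max_unexpected_parts_loading_thread_pool_size';
```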

View File

@ -234,3 +234,34 @@ SELECT least(toDateTime32(now() + toIntervalDay(1)), toDateTime64(now(), 3))
:::note
The type returned is a DateTime64 as the DateTime32 must be promoted to 64 bit for the comparison.
:::
## clamp
Constrains the return value between `min` and `max`.
**Syntax**
``` sql
clamp(value, min, max)
```
**Arguments**
- `value` – Input value.
- `min` – Lower bound.
- `max` – Upper bound.
**Returned values**
If the value is less than the minimum, the minimum is returned; if it is greater than the maximum, the maximum is returned; otherwise, the value itself is returned.
Examples:
```sql
SELECT clamp(1, 2, 3) result, toTypeName(result) type;
```
```response
┌─result─┬─type────┐
│ 2 │ Float64 │
└────────┴─────────┘
```
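The example above shows the lower bound; a value above `max` is pulled down to the upper bound in the same way. A minimal sketch with arbitrary literals, expected to return 3:

```sql
SELECT clamp(10, 2, 3) AS result, toTypeName(result) AS type;
```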

View File

@ -3301,3 +3301,31 @@ The setting is not enabled by default for security reasons, because some headers
HTTP headers are case sensitive for this function.
If the function is used in the context of a distributed query, it returns non-empty result only on the initiator node.
## showCertificate
Shows information about the current server's Secure Sockets Layer (SSL) certificate if it has been configured. See [Configuring SSL-TLS](https://clickhouse.com/docs/en/guides/sre/configuring-ssl) for more information on how to configure ClickHouse to use OpenSSL certificates to validate connections.
**Syntax**
```sql
showCertificate()
```
**Returned value**
- Map of key-value pairs relating to the configured SSL certificate. [Map](../../sql-reference/data-types/map.md)([String](../../sql-reference/data-types/string.md), [String](../../sql-reference/data-types/string.md)).
**Example**
Query:
```sql
SELECT showCertificate() FORMAT LineAsString;
```
Result:
```response
{'version':'1','serial_number':'2D9071D64530052D48308473922C7ADAFA85D6C5','signature_algo':'sha256WithRSAEncryption','issuer':'/CN=marsnet.local CA','not_before':'May 7 17:01:21 2024 GMT','not_after':'May 7 17:01:21 2025 GMT','subject':'/CN=chnode1','pkey_algo':'rsaEncryption'}
```
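Because the result is a [Map](../../sql-reference/data-types/map.md), individual fields can be read with the map subscript operator. A small follow-up sketch using a key from the sample output above:

```sql
SELECT showCertificate()['serial_number'] AS serial_number;
```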

View File

@ -152,7 +152,7 @@ Configuration example:
**Syntax**
``` sql
cutToFirstSignificantSubdomain(URL, TLD)
cutToFirstSignificantSubdomainCustom(URL, TLD)
```
**Arguments**

View File

@ -151,6 +151,14 @@ Result:
Query with `INNER` type of a join and conditions with `OR` and `AND`:
:::note
By default, non-equal conditions are supported as long as they use columns from the same table.
For example, `t1.a = t2.key AND t1.b > 0 AND t2.b > t2.c` is allowed, because `t1.b > 0` uses columns only from `t1` and `t2.b > t2.c` uses columns only from `t2`.
However, you can try experimental support for conditions like `t1.a = t2.key AND t1.b > t2.key`; see the section below for more details.
:::
``` sql
SELECT a, b, val FROM t1 INNER JOIN t2 ON t1.a = t2.key OR t1.b = t2.key AND t2.val > 3;
```
@ -165,7 +173,7 @@ Result:
└───┴────┴─────┘
```
## [experimental] Join with inequality conditions
## [experimental] Join with inequality conditions for columns from different tables
:::note
This feature is experimental. To use it, set `allow_experimental_join_condition` to 1 in your configuration files or by using the `SET` command:
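```sql
-- The setting named in the note above, enabled for the current session.
SET allow_experimental_join_condition = 1;
```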

View File

@ -248,6 +248,25 @@ FROM s3(
LIMIT 5;
```
## Working with archives
Suppose that we have several archive files with the following URIs on S3:
- 'https://s3-us-west-1.amazonaws.com/umbrella-static/top-1m-2018-01-10.csv.zip'
- 'https://s3-us-west-1.amazonaws.com/umbrella-static/top-1m-2018-01-11.csv.zip'
- 'https://s3-us-west-1.amazonaws.com/umbrella-static/top-1m-2018-01-12.csv.zip'
Extracting data from these archives is possible using `::`. Globs can be used both in the URL part and in the part after `::` (which is responsible for the name of the file inside the archive).
``` sql
SELECT *
FROM s3(
'https://s3-us-west-1.amazonaws.com/umbrella-static/top-1m-2018-01-1{0..2}.csv.zip :: *.csv'
);
```
## Virtual Columns {#virtual-columns}
- `_path` — Path to the file. Type: `LowCardinalty(String)`.

View File

@ -1,5 +1,5 @@
---
slug: /en/operations/utilities/backupview
slug: /ru/operations/utilities/backupview
title: clickhouse_backupview
---

View File

@ -1,5 +1,5 @@
---
slug: /ru/sql-reference/functions/functions-for-nulls
slug: /ru/sql-reference/functions/null-functions
sidebar_position: 63
sidebar_label: "Функции для работы с Nullable-аргументами"
---

View File

@ -1,5 +1,5 @@
---
slug: /zh/sql-reference/functions/functions-for-nulls
slug: /zh/sql-reference/functions/null-functions
---
# Nullable处理函数 {#nullablechu-li-han-shu}

View File

@ -8,6 +8,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int KEEPER_EXCEPTION;
}
bool LSCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
String path;
@ -216,6 +221,8 @@ bool FindSuperNodes::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> &
node->args.push_back(threshold->as<ASTLiteral &>().value);
ParserToken{TokenType::Whitespace}.ignore(pos);
String path;
if (!parseKeeperPath(pos, expected, path))
path = ".";
@ -230,19 +237,23 @@ void FindSuperNodes::execute(const ASTKeeperQuery * query, KeeperClient * client
auto path = client->getAbsolutePath(query->args[1].safeGet<String>());
Coordination::Stat stat;
client->zookeeper->get(path, &stat);
if (!client->zookeeper->exists(path, &stat))
return; /// It is ok if node was deleted meanwhile
if (stat.numChildren >= static_cast<Int32>(threshold))
{
std::cout << static_cast<String>(path) << "\t" << stat.numChildren << "\n";
return;
}
auto children = client->zookeeper->getChildren(path);
Strings children;
auto status = client->zookeeper->tryGetChildren(path, children);
if (status == Coordination::Error::ZNONODE)
return; /// It is ok if node was deleted meanwhile
else if (status != Coordination::Error::ZOK)
throw DB::Exception(DB::ErrorCodes::KEEPER_EXCEPTION, "Error {} while getting children of {}", status, path.string());
std::sort(children.begin(), children.end());
auto next_query = *query;
for (const auto & child : children)
{
auto next_query = *query;
next_query.args[1] = DB::Field(path / child);
execute(&next_query, client);
}
@ -310,31 +321,34 @@ bool FindBigFamily::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> &
return true;
}
/// DFS the subtree and return the number of nodes in the subtree
static Int64 traverse(const fs::path & path, KeeperClient * client, std::vector<std::tuple<Int64, String>> & result)
{
Int64 nodes_in_subtree = 1;
Strings children;
auto status = client->zookeeper->tryGetChildren(path, children);
if (status == Coordination::Error::ZNONODE)
return 0;
else if (status != Coordination::Error::ZOK)
throw DB::Exception(DB::ErrorCodes::KEEPER_EXCEPTION, "Error {} while getting children of {}", status, path.string());
for (auto & child : children)
nodes_in_subtree += traverse(path / child, client, result);
result.emplace_back(nodes_in_subtree, path.string());
return nodes_in_subtree;
}
void FindBigFamily::execute(const ASTKeeperQuery * query, KeeperClient * client) const
{
auto path = client->getAbsolutePath(query->args[0].safeGet<String>());
auto n = query->args[1].safeGet<UInt64>();
std::vector<std::tuple<Int32, String>> result;
std::vector<std::tuple<Int64, String>> result;
std::queue<fs::path> queue;
queue.push(path);
while (!queue.empty())
{
auto next_path = queue.front();
queue.pop();
auto children = client->zookeeper->getChildren(next_path);
for (auto & child : children)
child = next_path / child;
auto response = client->zookeeper->get(children);
for (size_t i = 0; i < response.size(); ++i)
{
result.emplace_back(response[i].stat.numChildren, children[i]);
queue.push(children[i]);
}
}
traverse(path, client, result);
std::sort(result.begin(), result.end(), std::greater());
for (UInt64 i = 0; i < std::min(result.size(), static_cast<size_t>(n)); ++i)

View File

@ -160,6 +160,14 @@ void LocalServer::initialize(Poco::Util::Application & self)
getOutdatedPartsLoadingThreadPool().setMaxTurboThreads(active_parts_loading_threads);
const size_t unexpected_parts_loading_threads = config().getUInt("max_unexpected_parts_loading_thread_pool_size", 32);
getUnexpectedPartsLoadingThreadPool().initialize(
unexpected_parts_loading_threads,
0, // We don't need any threads once all the parts will be loaded
unexpected_parts_loading_threads);
getUnexpectedPartsLoadingThreadPool().setMaxTurboThreads(active_parts_loading_threads);
const size_t cleanup_threads = config().getUInt("max_parts_cleaning_thread_pool_size", 128);
getPartsCleaningThreadPool().initialize(
cleanup_threads,

View File

@ -885,6 +885,16 @@ try
server_settings.max_active_parts_loading_thread_pool_size
);
getUnexpectedPartsLoadingThreadPool().initialize(
server_settings.max_unexpected_parts_loading_thread_pool_size,
0, // We don't need any threads once all the parts will be loaded
server_settings.max_unexpected_parts_loading_thread_pool_size);
/// It could grow if we need to synchronously wait until all the data parts will be loaded.
getUnexpectedPartsLoadingThreadPool().setMaxTurboThreads(
server_settings.max_active_parts_loading_thread_pool_size
);
getPartsCleaningThreadPool().initialize(
server_settings.max_parts_cleaning_thread_pool_size,
0, // We don't need any threads once all the parts will be deleted

View File

@ -753,13 +753,21 @@ size_t getMaxArraySize()
return 0xFFFFFF;
}
bool hasLimitArraySize()
{
if (auto context = Context::getGlobalContextInstance())
return context->getServerSettings().aggregate_function_group_array_has_limit_size;
return false;
}
template <bool Tlast>
AggregateFunctionPtr createAggregateFunctionGroupArray(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
assertUnary(name, argument_types);
bool limit_size = false;
bool limit_size = hasLimitArraySize();
UInt64 max_elems = getMaxArraySize();
if (parameters.empty())

View File

@ -769,6 +769,7 @@ struct IdentifierResolveScope
/// Nodes with duplicated aliases
std::unordered_set<QueryTreeNodePtr> nodes_with_duplicated_aliases;
std::vector<QueryTreeNodePtr> cloned_nodes_with_duplicated_aliases;
/// Current scope expression in resolve process stack
ExpressionsStack expressions_in_resolve_process_stack;
@ -1031,6 +1032,14 @@ public:
return true;
}
private:
void addDuplicatingAlias(const QueryTreeNodePtr & node)
{
scope.nodes_with_duplicated_aliases.emplace(node);
auto cloned_node = node->clone();
scope.cloned_nodes_with_duplicated_aliases.emplace_back(cloned_node);
scope.nodes_with_duplicated_aliases.emplace(cloned_node);
}
void updateAliasesIfNeeded(const QueryTreeNodePtr & node, bool is_lambda_node)
{
if (!node->hasAlias())
@ -1045,21 +1054,21 @@ private:
if (is_lambda_node)
{
if (scope.alias_name_to_expression_node->contains(alias))
scope.nodes_with_duplicated_aliases.insert(node);
addDuplicatingAlias(node);
auto [_, inserted] = scope.alias_name_to_lambda_node.insert(std::make_pair(alias, node));
if (!inserted)
scope.nodes_with_duplicated_aliases.insert(node);
addDuplicatingAlias(node);
return;
}
if (scope.alias_name_to_lambda_node.contains(alias))
scope.nodes_with_duplicated_aliases.insert(node);
addDuplicatingAlias(node);
auto [_, inserted] = scope.alias_name_to_expression_node->insert(std::make_pair(alias, node));
if (!inserted)
scope.nodes_with_duplicated_aliases.insert(node);
addDuplicatingAlias(node);
/// If node is identifier put it also in scope alias name to lambda node map
if (node->getNodeType() == QueryTreeNodeType::IDENTIFIER)
@ -2329,7 +2338,7 @@ void QueryAnalyzer::replaceNodesWithPositionalArguments(QueryTreeNodePtr & node_
pos = value;
else
{
if (static_cast<size_t>(std::abs(value)) > projection_nodes.size())
if (value < -static_cast<Int64>(projection_nodes.size()))
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Negative positional argument number {} is out of bounds. Expected in range [-{}, -1]. In scope {}",
@ -4610,7 +4619,7 @@ QueryAnalyzer::QueryTreeNodesWithNames QueryAnalyzer::resolveUnqualifiedMatcher(
for (auto & table_expression : table_expressions_stack)
{
bool table_expression_in_resolve_process = scope.table_expressions_in_resolve_process.contains(table_expression.get());
bool table_expression_in_resolve_process = nearest_query_scope->table_expressions_in_resolve_process.contains(table_expression.get());
if (auto * array_join_node = table_expression->as<ArrayJoinNode>())
{
@ -4835,6 +4844,19 @@ ProjectionNames QueryAnalyzer::resolveMatcher(QueryTreeNodePtr & matcher_node, I
}
}
if (!scope.expressions_in_resolve_process_stack.hasAggregateFunction())
{
for (auto & [node, _] : matched_expression_nodes_with_names)
{
auto it = scope.nullable_group_by_keys.find(node);
if (it != scope.nullable_group_by_keys.end())
{
node = it->node->clone();
node->convertToNullable();
}
}
}
std::unordered_map<const IColumnTransformerNode *, std::unordered_set<std::string>> strict_transformer_to_used_column_names;
for (const auto & transformer : matcher_node_typed.getColumnTransformers().getNodes())
{
@ -5028,7 +5050,10 @@ ProjectionNames QueryAnalyzer::resolveMatcher(QueryTreeNodePtr & matcher_node, I
scope.scope_node->formatASTForErrorMessage());
}
auto original_ast = matcher_node->getOriginalAST();
matcher_node = std::move(list);
if (original_ast)
matcher_node->setOriginalAST(original_ast);
return result_projection_names;
}
@ -5599,9 +5624,13 @@ ProjectionNames QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, Identifi
/// Replace storage with values storage of insertion block
if (StoragePtr storage = scope.context->getViewSource())
{
if (auto * query_node = in_second_argument->as<QueryNode>())
QueryTreeNodePtr table_expression;
/// Process possibly nested sub-selects
for (auto * query_node = in_second_argument->as<QueryNode>(); query_node; query_node = table_expression->as<QueryNode>())
table_expression = extractLeftTableExpression(query_node->getJoinTree());
if (table_expression)
{
auto table_expression = extractLeftTableExpression(query_node->getJoinTree());
if (auto * query_table_node = table_expression->as<TableNode>())
{
if (query_table_node->getStorageID().getFullNameNotQuoted() == storage->getStorageID().getFullNameNotQuoted())
@ -6238,6 +6267,10 @@ ProjectionNames QueryAnalyzer::resolveExpressionNode(QueryTreeNodePtr & node, Id
result_projection_names.push_back(node_alias);
}
bool is_duplicated_alias = scope.nodes_with_duplicated_aliases.contains(node);
if (is_duplicated_alias)
scope.non_cached_identifier_lookups_during_expression_resolve.insert({Identifier{node_alias}, IdentifierLookupContext::EXPRESSION});
/** Do not use alias table if node has alias same as some other node.
* Example: WITH x -> x + 1 AS lambda SELECT 1 AS lambda;
* During 1 AS lambda resolve if we use alias table we replace node with x -> x + 1 AS lambda.
@ -6248,7 +6281,7 @@ ProjectionNames QueryAnalyzer::resolveExpressionNode(QueryTreeNodePtr & node, Id
* alias table because in alias table subquery could be evaluated as scalar.
*/
bool use_alias_table = true;
if (scope.nodes_with_duplicated_aliases.contains(node) || (allow_table_expression && isSubqueryNodeType(node->getNodeType())))
if (is_duplicated_alias || (allow_table_expression && isSubqueryNodeType(node->getNodeType())))
use_alias_table = false;
if (!node_alias.empty() && use_alias_table)
@ -6552,6 +6585,9 @@ ProjectionNames QueryAnalyzer::resolveExpressionNode(QueryTreeNodePtr & node, Id
}
}
if (is_duplicated_alias)
scope.non_cached_identifier_lookups_during_expression_resolve.erase({Identifier{node_alias}, IdentifierLookupContext::EXPRESSION});
resolved_expressions.emplace(node, result_projection_names);
scope.popExpressionNode();
@ -6584,7 +6620,6 @@ ProjectionNames QueryAnalyzer::resolveExpressionNodeList(QueryTreeNodePtr & node
{
auto node_to_resolve = node;
auto expression_node_projection_names = resolveExpressionNode(node_to_resolve, scope, allow_lambda_expression, allow_table_expression);
size_t expected_projection_names_size = 1;
if (auto * expression_list = node_to_resolve->as<ListNode>())
{
@ -8043,7 +8078,12 @@ void QueryAnalyzer::resolveQuery(const QueryTreeNodePtr & query_node, Identifier
window_node_typed.setParentWindowName({});
}
scope.window_name_to_window_node.emplace(window_node_typed.getAlias(), window_node);
auto [_, inserted] = scope.window_name_to_window_node.emplace(window_node_typed.getAlias(), window_node);
if (!inserted)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Window '{}' is already defined. In scope {}",
window_node_typed.getAlias(),
scope.scope_node->formatASTForErrorMessage());
}
/** Disable identifier cache during JOIN TREE resolve.
@ -8187,10 +8227,13 @@ void QueryAnalyzer::resolveQuery(const QueryTreeNodePtr & query_node, Identifier
* After scope nodes are resolved, we can compare node with duplicate alias with
* node from scope alias table.
*/
for (const auto & node_with_duplicated_alias : scope.nodes_with_duplicated_aliases)
for (const auto & node_with_duplicated_alias : scope.cloned_nodes_with_duplicated_aliases)
{
auto node = node_with_duplicated_alias;
auto node_alias = node->getAlias();
/// Add current alias to non cached set, because in case of cyclic alias identifier should not be substituted from cache.
/// See 02896_cyclic_aliases_crash.
resolveExpressionNode(node, scope, true /*allow_lambda_expression*/, false /*allow_table_expression*/);
bool has_node_in_alias_table = false;

View File

@ -26,6 +26,10 @@ namespace
void validateFilter(const QueryTreeNodePtr & filter_node, std::string_view exception_place_message, const QueryTreeNodePtr & query_node)
{
if (filter_node->getNodeType() == QueryTreeNodeType::LIST)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Unsupported expression '{}' in filter", filter_node->formatASTForErrorMessage());
auto filter_node_result_type = filter_node->getResultType();
if (!filter_node_result_type->canBeUsedInBooleanContext())
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER,

View File

@ -1242,8 +1242,9 @@ void QueryFuzzer::fuzz(ASTPtr & ast)
}
else if (auto * explain_query = typeid_cast<ASTExplainQuery *>(ast.get()))
{
const auto & explained_query = explain_query->getExplainedQuery();
/// Fuzzing EXPLAIN query to SELECT query randomly
if (fuzz_rand() % 20 == 0 && explain_query->getExplainedQuery()->getQueryKind() == IAST::QueryKind::Select)
if (explained_query && explained_query->getQueryKind() == IAST::QueryKind::Select && fuzz_rand() % 20 == 0)
{
auto select_query = explain_query->getExplainedQuery()->clone();
fuzz(select_query);

View File

@ -28,7 +28,7 @@ namespace ErrorCodes
extern const int USER_SESSION_LIMIT_EXCEEDED;
}
static String getLoadSuggestionQuery(Int32 suggestion_limit, bool basic_suggestion)
static String getLoadSuggestionQuery(Int32 suggestion_limit, bool basic_suggestion, UInt64 server_revision)
{
/// NOTE: Once you will update the completion list,
/// do not forget to update 01676_clickhouse_client_autocomplete.sh
@ -60,7 +60,9 @@ static String getLoadSuggestionQuery(Int32 suggestion_limit, bool basic_suggesti
add_column("name", "data_type_families", false, {});
add_column("name", "merge_tree_settings", false, {});
add_column("name", "settings", false, {});
add_column("keyword", "keywords", false, {});
if (server_revision >= DBMS_MIN_REVISION_WITH_SYSTEM_KEYWORDS_TABLE)
add_column("keyword", "keywords", false, {});
if (!basic_suggestion)
{
@ -101,7 +103,11 @@ void Suggest::load(ContextPtr context, const ConnectionParameters & connection_p
auto connection = ConnectionType::createConnection(connection_parameters, my_context);
fetch(*connection,
connection_parameters.timeouts,
getLoadSuggestionQuery(suggestion_limit, std::is_same_v<ConnectionType, LocalConnection>),
getLoadSuggestionQuery(
suggestion_limit,
std::is_same_v<ConnectionType, LocalConnection>,
connection->getServerRevision(connection_parameters.timeouts)
),
my_context->getClientInfo());
}
catch (const Exception & e)
@ -146,7 +152,7 @@ void Suggest::load(IServerConnection & connection,
{
try
{
fetch(connection, timeouts, getLoadSuggestionQuery(suggestion_limit, true), client_info);
fetch(connection, timeouts, getLoadSuggestionQuery(suggestion_limit, true, connection.getServerRevision(timeouts)), client_info);
}
catch (...)
{

View File

@ -1,7 +1,6 @@
#pragma once
#include <cstring>
#include <cassert>
#include <Columns/IColumn.h>
#include <Columns/IColumnImpl.h>
@ -12,6 +11,8 @@
#include <Common/assert_cast.h>
#include <Core/Field.h>
#include <base/defines.h>
class Collator;
@ -42,7 +43,11 @@ private:
size_t ALWAYS_INLINE offsetAt(ssize_t i) const { return offsets[i - 1]; }
/// Size of i-th element, including terminating zero.
size_t ALWAYS_INLINE sizeAt(ssize_t i) const { return offsets[i] - offsets[i - 1]; }
size_t ALWAYS_INLINE sizeAt(ssize_t i) const
{
chassert(offsets[i] > offsets[i - 1]);
return offsets[i] - offsets[i - 1];
}
struct ComparatorBase;
@ -79,7 +84,7 @@ public:
size_t byteSizeAt(size_t n) const override
{
assert(n < size());
chassert(n < size());
return sizeAt(n) + sizeof(offsets[0]);
}
@ -94,25 +99,25 @@ public:
Field operator[](size_t n) const override
{
assert(n < size());
chassert(n < size());
return Field(&chars[offsetAt(n)], sizeAt(n) - 1);
}
void get(size_t n, Field & res) const override
{
assert(n < size());
chassert(n < size());
res = std::string_view{reinterpret_cast<const char *>(&chars[offsetAt(n)]), sizeAt(n) - 1};
}
StringRef getDataAt(size_t n) const override
{
assert(n < size());
chassert(n < size());
return StringRef(&chars[offsetAt(n)], sizeAt(n) - 1);
}
bool isDefaultAt(size_t n) const override
{
assert(n < size());
chassert(n < size());
return sizeAt(n) == 1;
}

View File

@ -21,5 +21,8 @@ template class ColumnUnique<ColumnFloat64>;
template class ColumnUnique<ColumnString>;
template class ColumnUnique<ColumnFixedString>;
template class ColumnUnique<ColumnDateTime64>;
template class ColumnUnique<ColumnIPv4>;
template class ColumnUnique<ColumnIPv6>;
template class ColumnUnique<ColumnUUID>;
}

View File

@ -173,11 +173,6 @@ namespace DB
return true;
}
void CaresPTRResolver::cancel_requests(ares_channel channel)
{
ares_cancel(channel);
}
std::span<pollfd> CaresPTRResolver::get_readable_sockets(int * sockets, pollfd * pollfd, ares_channel channel)
{
int sockets_bitmask = ares_getsock(channel, sockets, ARES_GETSOCK_MAXNUM);

View File

@ -44,8 +44,6 @@ namespace DB
private:
bool wait_and_process(ares_channel channel);
void cancel_requests(ares_channel channel);
void resolve(const std::string & ip, std::unordered_set<std::string> & response, ares_channel channel);
void resolve_v6(const std::string & ip, std::unordered_set<std::string> & response, ares_channel channel);

View File

@ -177,6 +177,9 @@
M(MergeTreeOutdatedPartsLoaderThreads, "Number of threads in the threadpool for loading Outdated data parts.") \
M(MergeTreeOutdatedPartsLoaderThreadsActive, "Number of active threads in the threadpool for loading Outdated data parts.") \
M(MergeTreeOutdatedPartsLoaderThreadsScheduled, "Number of queued or active jobs in the threadpool for loading Outdated data parts.") \
M(MergeTreeUnexpectedPartsLoaderThreads, "Number of threads in the threadpool for loading Unexpected data parts.") \
M(MergeTreeUnexpectedPartsLoaderThreadsActive, "Number of active threads in the threadpool for loading Unexpected data parts.") \
M(MergeTreeUnexpectedPartsLoaderThreadsScheduled, "Number of queued or active jobs in the threadpool for loading Unexpected data parts.") \
M(MergeTreePartsCleanerThreads, "Number of threads in the MergeTree parts cleaner thread pool.") \
M(MergeTreePartsCleanerThreadsActive, "Number of threads in the MergeTree parts cleaner thread pool running a task.") \
M(MergeTreePartsCleanerThreadsScheduled, "Number of queued or active jobs in the MergeTree parts cleaner thread pool.") \

View File

@ -198,12 +198,18 @@ size_t PageCache::getPinnedSize() const
PageCache::MemoryStats PageCache::getResidentSetSize() const
{
MemoryStats stats;
#ifdef OS_LINUX
if (use_madv_free)
{
std::unordered_set<UInt64> cache_mmap_addrs;
{
std::lock_guard lock(global_mutex);
/// Don't spend time on reading smaps if page cache is not used.
if (mmaps.empty())
return stats;
for (const auto & m : mmaps)
cache_mmap_addrs.insert(reinterpret_cast<UInt64>(m.ptr));
}
@ -258,7 +264,7 @@ PageCache::MemoryStats PageCache::getResidentSetSize() const
UInt64 addr = unhexUInt<UInt64>(s.c_str());
current_range_is_cache = cache_mmap_addrs.contains(addr);
}
else if (s == "Rss:" || s == "LazyFree")
else if (s == "Rss:" || s == "LazyFree:")
{
skip_whitespace();
size_t val;

View File

@ -625,6 +625,8 @@ The server successfully detected this situation and will download merged part fr
M(S3QueueSetFileProcessingMicroseconds, "Time spent to set file as processing")\
M(S3QueueSetFileProcessedMicroseconds, "Time spent to set file as processed")\
M(S3QueueSetFileFailedMicroseconds, "Time spent to set file as failed")\
M(S3QueueFailedFiles, "Number of files which failed to be processed")\
M(S3QueueProcessedFiles, "Number of files which were processed")\
M(S3QueueCleanupMaxSetSizeOrTTLMicroseconds, "Time spent to set file as failed")\
M(S3QueuePullMicroseconds, "Time spent to read file data")\
M(S3QueueLockLocalFileStatusesMicroseconds, "Time spent to lock local file statuses")\

View File

@ -6,7 +6,8 @@
#include <Poco/Util/AbstractConfiguration.h>
#include <deque>
#include <boost/intrusive/list.hpp>
#include <mutex>
@ -15,6 +16,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int INVALID_SCHEDULER_NODE;
}
@ -42,7 +44,7 @@ public:
std::lock_guard lock(mutex);
queue_cost += request->cost;
bool was_empty = requests.empty();
requests.push_back(request);
requests.push_back(*request);
if (was_empty)
scheduleActivation();
}
@ -52,7 +54,7 @@ public:
std::lock_guard lock(mutex);
if (requests.empty())
return {nullptr, false};
ResourceRequest * result = requests.front();
ResourceRequest * result = &requests.front();
requests.pop_front();
if (requests.empty())
busy_periods++;
@ -65,19 +67,24 @@ public:
bool cancelRequest(ResourceRequest * request) override
{
std::lock_guard lock(mutex);
// TODO(serxa): reimplement queue as intrusive list of ResourceRequest to make this O(1) instead of O(N)
for (auto i = requests.begin(), e = requests.end(); i != e; ++i)
if (request->is_linked())
{
if (*i == request)
{
requests.erase(i);
if (requests.empty())
busy_periods++;
queue_cost -= request->cost;
canceled_requests++;
canceled_cost += request->cost;
return true;
}
// It's impossible to check that `request` is indeed inserted to this queue and not another queue.
// It's up to caller to make sure this is the case. Otherwise, list sizes will be corrupted.
// Not tracking list sizes is not an option, because another problem appears: removing from list w/o locking.
// Another possible solution - keep track if request `is_cancelable` guarded by `mutex`
// Simple check for list size corruption
if (requests.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "trying to cancel request (linked into another queue) from empty queue: {}", getPath());
requests.erase(requests.iterator_to(*request));
if (requests.empty())
busy_periods++;
queue_cost -= request->cost;
canceled_requests++;
canceled_cost += request->cost;
return true;
}
return false;
}
@ -124,7 +131,7 @@ public:
private:
std::mutex mutex;
Int64 queue_cost = 0;
std::deque<ResourceRequest *> requests; // TODO(serxa): reimplement it using intrusive list to avoid allocations/deallocations and O(N) during cancel
boost::intrusive::list<ResourceRequest> requests;
};
}

View File

@ -1,5 +1,6 @@
#pragma once
#include <boost/intrusive/list.hpp>
#include <base/types.h>
#include <limits>
@ -41,7 +42,7 @@ constexpr ResourceCost ResourceCostMax = std::numeric_limits<int>::max();
* Returning true means successful cancel and therefore steps (4) and (5) are not going to happen
* and step (6) MUST be omitted.
*/
class ResourceRequest
class ResourceRequest : public boost::intrusive::list_base_hook<>
{
public:
/// Cost of request execution; should be filled before request enqueueing.
@ -62,6 +63,7 @@ public:
{
cost = cost_;
constraint = nullptr;
// Note that list_base_hook should be reset independently (by intrusive list)
}
virtual ~ResourceRequest() = default;

View File

@ -51,16 +51,17 @@ namespace ErrorCodes
ThreadFuzzer::ThreadFuzzer()
{
initConfiguration();
if (needsSetup())
setup();
if (!isEffective())
{
/// It has no effect - disable it
stop();
return;
}
setup();
}
template <typename T>
static void initFromEnv(T & what, const char * name)
{
@ -133,10 +134,16 @@ void ThreadFuzzer::initConfiguration()
}
bool ThreadFuzzer::needsSetup() const
{
return cpu_time_period_us != 0
&& (yield_probability > 0 || migrate_probability > 0 || (sleep_probability > 0 && sleep_time_us_max > 0));
}
bool ThreadFuzzer::isEffective() const
{
if (!isStarted())
return false;
if (needsSetup())
return true;
#if THREAD_FUZZER_WRAP_PTHREAD
# define CHECK_WRAPPER_PARAMS(RET, NAME, ...) \
@ -163,10 +170,13 @@ bool ThreadFuzzer::isEffective() const
# undef INIT_WRAPPER_PARAMS
#endif
return cpu_time_period_us != 0
&& (yield_probability > 0
|| migrate_probability > 0
|| (sleep_probability > 0 && sleep_time_us_max > 0));
if (explicit_sleep_probability > 0 && sleep_time_us_max > 0)
return true;
if (explicit_memory_exception_probability > 0)
return true;
return false;
}
void ThreadFuzzer::stop()
@ -220,11 +230,9 @@ static void injectionImpl(
UNUSED(migrate_probability);
#endif
if (sleep_probability > 0
&& sleep_time_us_max > 0
&& std::bernoulli_distribution(sleep_probability)(thread_local_rng))
if (sleep_probability > 0 && sleep_time_us_max > 0.001 && std::bernoulli_distribution(sleep_probability)(thread_local_rng))
{
sleepForNanoseconds((thread_local_rng() % static_cast<uint64_t>(sleep_time_us_max)) * 1000); /*may sleep(0)*/
sleepForNanoseconds((thread_local_rng() % static_cast<uint64_t>(sleep_time_us_max * 1000)));
}
}

View File

@ -52,6 +52,7 @@ public:
}
bool isEffective() const;
bool needsSetup() const;
static void stop();
static void start();

View File

@ -5,7 +5,6 @@
#include <Common/OpenTelemetryTraceContext.h>
#include <Common/noexcept_scope.h>
#include <cassert>
#include <type_traits>
#include <Poco/Util/Application.h>
@ -437,6 +436,11 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
/// We don't run jobs after `shutdown` is set, but we have to properly dequeue all jobs and finish them.
if (shutdown)
{
{
ALLOW_ALLOCATIONS_IN_SCOPE;
/// job can contain packaged_task which can set exception during destruction
job_data.reset();
}
job_is_done = true;
continue;
}

View File

@ -79,6 +79,8 @@ static constexpr auto DBMS_MIN_REVISION_WITH_SSH_AUTHENTICATION = 54466;
/// Send read-only flag for Replicated tables as well
static constexpr auto DBMS_MIN_REVISION_WITH_TABLE_READ_ONLY_CHECK = 54467;
static constexpr auto DBMS_MIN_REVISION_WITH_SYSTEM_KEYWORDS_TABLE = 54468;
/// Version of ClickHouse TCP protocol.
///
/// Should be incremented manually on protocol changes.
@ -86,6 +88,6 @@ static constexpr auto DBMS_MIN_REVISION_WITH_TABLE_READ_ONLY_CHECK = 54467;
/// NOTE: DBMS_TCP_PROTOCOL_VERSION has nothing common with VERSION_REVISION,
/// later is just a number for server version (one number instead of commit SHA)
/// for simplicity (sometimes it may be more convenient in some use cases).
static constexpr auto DBMS_TCP_PROTOCOL_VERSION = 54467;
static constexpr auto DBMS_TCP_PROTOCOL_VERSION = 54468;
}

View File

@ -25,6 +25,7 @@ namespace DB
M(UInt64, io_thread_pool_queue_size, 10000, "Queue size for IO thread pool.", 0) \
M(UInt64, max_active_parts_loading_thread_pool_size, 64, "The number of threads to load active set of data parts (Active ones) at startup.", 0) \
M(UInt64, max_outdated_parts_loading_thread_pool_size, 32, "The number of threads to load inactive set of data parts (Outdated ones) at startup.", 0) \
M(UInt64, max_unexpected_parts_loading_thread_pool_size, 8, "The number of threads to load inactive set of data parts (Unexpected ones) at startup.", 0) \
M(UInt64, max_parts_cleaning_thread_pool_size, 128, "The number of threads for concurrent removal of inactive data parts.", 0) \
M(UInt64, max_mutations_bandwidth_for_server, 0, "The maximum read speed of all mutations on server in bytes per second. Zero means unlimited.", 0) \
M(UInt64, max_merges_bandwidth_for_server, 0, "The maximum read speed of all merges on server in bytes per second. Zero means unlimited.", 0) \
@ -50,6 +51,7 @@ namespace DB
M(UInt64, max_temporary_data_on_disk_size, 0, "The maximum amount of storage that could be used for external aggregation, joins or sorting., ", 0) \
M(String, temporary_data_in_cache, "", "Cache disk name for temporary data.", 0) \
M(UInt64, aggregate_function_group_array_max_element_size, 0xFFFFFF, "Max array element size in bytes for groupArray function. This limit is checked at serialization and help to avoid large state size.", 0) \
M(Bool, aggregate_function_group_array_has_limit_size, false, "When the max array element size is exceeded, a `Too large array size` exception will be thrown by default. When set to true, no exception will be thrown, and the excess elements will be discarded.", 0) \
M(UInt64, max_server_memory_usage, 0, "Maximum total memory usage of the server in bytes. Zero means unlimited.", 0) \
M(Double, max_server_memory_usage_to_ram_ratio, 0.9, "Same as max_server_memory_usage but in to RAM ratio. Allows to lower max memory on low-memory systems.", 0) \
M(UInt64, merges_mutations_memory_usage_soft_limit, 0, "Maximum total memory usage for merges and mutations in bytes. Zero means unlimited.", 0) \
@ -57,7 +59,7 @@ namespace DB
M(Bool, allow_use_jemalloc_memory, true, "Allows to use jemalloc memory.", 0) \
M(UInt64, cgroups_memory_usage_observer_wait_time, 15, "Polling interval in seconds to read the current memory usage from cgroups. Zero means disabled.", 0) \
M(Double, cgroup_memory_watcher_hard_limit_ratio, 0.95, "Hard memory limit ratio for cgroup memory usage observer", 0) \
M(Double, cgroup_memory_watcher_soft_limit_ratio, 0.9, "Sort memory limit ratio limit for cgroup memory usage observer", 0) \
M(Double, cgroup_memory_watcher_soft_limit_ratio, 0.9, "Soft memory limit ratio limit for cgroup memory usage observer", 0) \
M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \
M(Bool, async_insert_queue_flush_on_shutdown, true, "If true queue of asynchronous inserts is flushed on graceful shutdown", 0) \
M(Bool, ignore_empty_sql_security_in_create_view_query, true, "If true, ClickHouse doesn't write defaults for empty SQL security statement in CREATE VIEW queries. This setting is only necessary for the migration period and will become obsolete in 24.4", 0) \

View File

@ -799,8 +799,8 @@ class IColumn;
M(UInt64, filesystem_cache_max_download_size, (128UL * 1024 * 1024 * 1024), "Max remote filesystem cache size that can be downloaded by a single query", 0) \
M(Bool, throw_on_error_from_cache_on_write_operations, false, "Ignore error from cache when caching on write operations (INSERT, merges)", 0) \
M(UInt64, filesystem_cache_segments_batch_size, 20, "Limit on size of a single batch of file segments that a read buffer can request from cache. Too low value will lead to excessive requests to cache, too large may slow down eviction from cache", 0) \
M(UInt64, filesystem_cache_reserve_space_wait_lock_timeout_milliseconds, 1000, "Wait time to lock cache for sapce reservation in filesystem cache", 0) \
M(UInt64, temporary_data_in_cache_reserve_space_wait_lock_timeout_milliseconds, (10 * 60 * 1000), "Wait time to lock cache for sapce reservation for temporary data in filesystem cache", 0) \
M(UInt64, filesystem_cache_reserve_space_wait_lock_timeout_milliseconds, 1000, "Wait time to lock cache for space reservation in filesystem cache", 0) \
M(UInt64, temporary_data_in_cache_reserve_space_wait_lock_timeout_milliseconds, (10 * 60 * 1000), "Wait time to lock cache for space reservation for temporary data in filesystem cache", 0) \
\
M(Bool, use_page_cache_for_disks_without_file_cache, false, "Use userspace page cache for remote disks that don't have filesystem cache enabled.", 0) \
M(Bool, read_from_page_cache_if_exists_otherwise_bypass_cache, false, "Use userspace page cache in passive mode, similar to read_from_filesystem_cache_if_exists_otherwise_bypass_cache.", 0) \

View File

@ -92,7 +92,7 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
{"cross_join_min_bytes_to_compress", 0, 1_GiB, "A new setting."},
{"prefer_external_sort_block_bytes", 0, DEFAULT_BLOCK_SIZE * 256, "Prefer maximum block bytes for external sort, reduce the memory usage during merging."},
{"input_format_force_null_for_omitted_fields", false, false, "Disable type-defaults for omitted fields when needed"},
{"output_format_pretty_preserve_border_for_multiline_string", 1, 1, "Applies better rendering for multiline strings."},
{"output_format_pretty_preserve_border_for_multiline_string", 0, 1, "Applies better rendering for multiline strings."},
}},
{"24.4", {{"input_format_json_throw_on_bad_escape_sequence", true, true, "Allow to save JSON strings with bad escape sequences"},
{"max_parsing_threads", 0, 0, "Add a separate setting to control number of threads in parallel parsing from files"},

View File

@ -13,6 +13,7 @@ class ASTStorage;
M(UInt64, wait_entry_commited_timeout_sec, 3600, "Replicas will try to cancel query if timeout exceed, but initiator host has not executed it yet", 0) \
M(String, collection_name, "", "A name of a collection defined in server's config where all info for cluster authentication is defined", 0) \
M(Bool, check_consistency, true, "Check consistency of local metadata and metadata in Keeper, do replica recovery on inconsistency", 0) \
M(UInt64, max_retries_before_automatic_recovery, 100, "Max number of attempts to execute a queue entry before marking the replica as lost and recovering it from snapshot (0 means infinite)", 0) \
DECLARE_SETTINGS_TRAITS(DatabaseReplicatedSettingsTraits, LIST_OF_DATABASE_REPLICATED_SETTINGS)

View File

@ -18,6 +18,8 @@ namespace ErrorCodes
extern const int UNFINISHED;
}
static constexpr const char * FORCE_AUTO_RECOVERY_DIGEST = "42";
DatabaseReplicatedDDLWorker::DatabaseReplicatedDDLWorker(DatabaseReplicated * db, ContextPtr context_)
: DDLWorker(/* pool_size */ 1, db->zookeeper_path + "/log", context_, nullptr, {}, fmt::format("DDLWorker({})", db->getDatabaseName()))
, database(db)
@ -44,6 +46,26 @@ bool DatabaseReplicatedDDLWorker::initializeMainThread()
/// NOTE It will not stop cleanup thread until DDLWorker::shutdown() call (cleanup thread will just do nothing)
break;
}
if (database->db_settings.max_retries_before_automatic_recovery &&
database->db_settings.max_retries_before_automatic_recovery <= subsequent_errors_count)
{
String current_task_name;
{
std::unique_lock lock{mutex};
current_task_name = current_task;
}
LOG_WARNING(log, "Database got stuck at processing task {}: it failed {} times in a row with the same error. "
"Will reset digest to mark our replica as lost, and trigger recovery from the most up-to-date metadata "
"from ZooKeeper. See max_retries_before_automatic_recovery setting. The error: {}",
current_task, subsequent_errors_count, last_unexpected_error);
String digest_str;
zookeeper->tryGet(database->replica_path + "/digest", digest_str);
LOG_WARNING(log, "Resetting digest from {} to {}", digest_str, FORCE_AUTO_RECOVERY_DIGEST);
zookeeper->trySet(database->replica_path + "/digest", FORCE_AUTO_RECOVERY_DIGEST);
}
initializeReplication();
initialized = true;
return true;

View File

@ -139,7 +139,11 @@ namespace
S3::URI getS3URI(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, const ContextPtr & context)
{
String endpoint = context->getMacros()->expand(config.getString(config_prefix + ".endpoint"));
S3::URI uri(endpoint);
String endpoint_subpath;
if (config.has(config_prefix + ".endpoint_subpath"))
endpoint_subpath = context->getMacros()->expand(config.getString(config_prefix + ".endpoint_subpath"));
S3::URI uri(fs::path(endpoint) / endpoint_subpath);
/// An empty key remains empty.
if (!uri.key.empty() && !uri.key.ends_with('/'))

View File

@ -1,10 +1,12 @@
#pragma once
#include <cstddef>
#include <Storages/NamedCollectionsHelpers.h>
#include <IO/WriteBufferFromString.h>
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int NOT_IMPLEMENTED;
}
enum class NumpyDataTypeIndex : uint8_t
@ -29,9 +31,9 @@ class NumpyDataType
public:
enum Endianness
{
LITTLE,
BIG,
NONE,
LITTLE = '<',
BIG = '>',
NONE = '|',
};
NumpyDataTypeIndex type_index;
@ -41,15 +43,18 @@ public:
Endianness getEndianness() const { return endianness; }
virtual NumpyDataTypeIndex getTypeIndex() const = 0;
virtual size_t getSize() const { throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Function getSize() is not implemented"); }
virtual void setSize(size_t) { throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Function setSize() is not implemented"); }
virtual String str() const { throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Function str() is not implemented"); }
private:
protected:
Endianness endianness;
};
class NumpyDataTypeInt : public NumpyDataType
{
public:
NumpyDataTypeInt(Endianness endianness, size_t size_, bool is_signed_) : NumpyDataType(endianness), size(size_), is_signed(is_signed_)
NumpyDataTypeInt(Endianness endianness_, size_t size_, bool is_signed_) : NumpyDataType(endianness_), size(size_), is_signed(is_signed_)
{
switch (size)
{
@ -67,6 +72,14 @@ public:
return type_index;
}
bool isSigned() const { return is_signed; }
String str() const override
{
DB::WriteBufferFromOwnString buf;
writeChar(static_cast<char>(endianness), buf);
writeChar(is_signed ? 'i' : 'u', buf);
writeIntText(size, buf);
return buf.str();
}
private:
size_t size;
@ -76,7 +89,7 @@ private:
class NumpyDataTypeFloat : public NumpyDataType
{
public:
NumpyDataTypeFloat(Endianness endianness, size_t size_) : NumpyDataType(endianness), size(size_)
NumpyDataTypeFloat(Endianness endianness_, size_t size_) : NumpyDataType(endianness_), size(size_)
{
switch (size)
{
@ -92,6 +105,14 @@ public:
{
return type_index;
}
String str() const override
{
DB::WriteBufferFromOwnString buf;
writeChar(static_cast<char>(endianness), buf);
writeChar('f', buf);
writeIntText(size, buf);
return buf.str();
}
private:
size_t size;
};
@ -99,13 +120,22 @@ private:
class NumpyDataTypeString : public NumpyDataType
{
public:
NumpyDataTypeString(Endianness endianness, size_t size_) : NumpyDataType(endianness), size(size_)
NumpyDataTypeString(Endianness endianness_, size_t size_) : NumpyDataType(endianness_), size(size_)
{
type_index = NumpyDataTypeIndex::String;
}
NumpyDataTypeIndex getTypeIndex() const override { return type_index; }
size_t getSize() const { return size; }
size_t getSize() const override { return size; }
void setSize(size_t size_) override { size = size_; }
String str() const override
{
DB::WriteBufferFromOwnString buf;
writeChar(static_cast<char>(endianness), buf);
writeChar('S', buf);
writeIntText(size, buf);
return buf.str();
}
private:
size_t size;
};
@ -113,13 +143,13 @@ private:
class NumpyDataTypeUnicode : public NumpyDataType
{
public:
NumpyDataTypeUnicode(Endianness endianness, size_t size_) : NumpyDataType(endianness), size(size_)
NumpyDataTypeUnicode(Endianness endianness_, size_t size_) : NumpyDataType(endianness_), size(size_)
{
type_index = NumpyDataTypeIndex::Unicode;
}
NumpyDataTypeIndex getTypeIndex() const override { return type_index; }
size_t getSize() const { return size * 4; }
size_t getSize() const override { return size * 4; }
private:
size_t size;
};
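For reference, the new str() overrides above serialize a NumPy dtype descriptor such as "<i4", "<f8" or "|S16". A minimal, self-contained sketch of that formatting, using plain iostreams instead of the DB::WriteBuffer helpers (numpyDescr is an invented name, not part of the commit):
#include <iostream>
#include <sstream>
#include <string>
// Illustrative stand-in for the str() overrides above: endianness code
// ('<', '>' or '|'), kind letter ('i', 'u', 'f' or 'S'), then the size.
std::string numpyDescr(char endianness, char kind, size_t size)
{
    std::ostringstream buf;
    buf << endianness << kind << size;
    return buf.str();
}
int main()
{
    std::cout << numpyDescr('<', 'i', 4) << '\n';  // "<i4"  (little-endian Int32)
    std::cout << numpyDescr('<', 'f', 8) << '\n';  // "<f8"  (little-endian Float64)
    std::cout << numpyDescr('|', 'S', 16) << '\n'; // "|S16" (16-byte fixed string)
}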

View File

@ -76,6 +76,8 @@ void registerInputFormatCustomSeparated(FormatFactory & factory);
void registerOutputFormatCustomSeparated(FormatFactory & factory);
void registerInputFormatCapnProto(FormatFactory & factory);
void registerOutputFormatCapnProto(FormatFactory & factory);
void registerInputFormatNpy(FormatFactory & factory);
void registerOutputFormatNpy(FormatFactory & factory);
void registerInputFormatForm(FormatFactory & factory);
/// Output only (presentational) formats.
@ -104,7 +106,6 @@ void registerInputFormatMySQLDump(FormatFactory & factory);
void registerInputFormatParquetMetadata(FormatFactory & factory);
void registerInputFormatDWARF(FormatFactory & factory);
void registerInputFormatOne(FormatFactory & factory);
void registerInputFormatNpy(FormatFactory & factory);
#if USE_HIVE
void registerInputFormatHiveText(FormatFactory & factory);
@ -224,6 +225,8 @@ void registerFormats()
registerOutputFormatAvro(factory);
registerInputFormatArrow(factory);
registerOutputFormatArrow(factory);
registerInputFormatNpy(factory);
registerOutputFormatNpy(factory);
registerOutputFormatPretty(factory);
registerOutputFormatPrettyCompact(factory);
@ -254,7 +257,6 @@ void registerFormats()
registerInputFormatParquetMetadata(factory);
registerInputFormatDWARF(factory);
registerInputFormatOne(factory);
registerInputFormatNpy(factory);
registerNonTrivialPrefixAndSuffixCheckerJSONEachRow(factory);
registerNonTrivialPrefixAndSuffixCheckerJSONAsString(factory);

View File

@ -80,7 +80,7 @@ ColumnWithTypeAndName columnGetNested(const ColumnWithTypeAndName & col)
return ColumnWithTypeAndName{ nullable_res, nested_type, col.name };
}
else
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Illegal column for DataTypeNullable");
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Illegal column {} for DataTypeNullable", col.dumpStructure());
}
return col;
}

View File

@ -7,6 +7,7 @@
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnLowCardinality.h>
#include <Columns/ColumnMap.h>
#include <Columns/ColumnNothing.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnObject.h>
#include <Columns/ColumnString.h>
@ -3770,6 +3771,12 @@ private:
}
else if (const auto * from_array = typeid_cast<const DataTypeArray *>(from_type_untyped.get()))
{
if (typeid_cast<const DataTypeNothing *>(from_array->getNestedType().get()))
return [nested = to_type->getNestedType()](ColumnsWithTypeAndName &, const DataTypePtr &, const ColumnNullable *, size_t size)
{
return ColumnMap::create(nested->createColumnConstWithDefaultValue(size)->convertToFullColumnIfConst());
};
const auto * nested_tuple = typeid_cast<const DataTypeTuple *>(from_array->getNestedType().get());
if (!nested_tuple || nested_tuple->getElements().size() != 2)
throw Exception(

src/Functions/clamp.cpp  Normal file  (69 lines)
View File

@ -0,0 +1,69 @@
#include <Functions/IFunction.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <DataTypes/getLeastSupertype.h>
#include <Interpreters/castColumn.h>
#include <Interpreters/Context.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int BAD_ARGUMENTS;
}
class FunctionClamp : public IFunction
{
public:
static constexpr auto name = "clamp";
String getName() const override { return name; }
size_t getNumberOfArguments() const override { return 3; }
bool useDefaultImplementationForConstants() const override { return true; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionClamp>(); }
DataTypePtr getReturnTypeImpl(const DataTypes & types) const override
{
if (types.size() != 3)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Function {} requires 3 arguments", getName());
return getLeastSupertype(types);
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override
{
size_t arg_size = arguments.size();
Columns converted_columns(arg_size);
for (size_t arg = 0; arg < arg_size; ++arg)
converted_columns[arg] = castColumn(arguments[arg], result_type)->convertToFullColumnIfConst();
auto result_column = result_type->createColumn();
for (size_t row_num = 0; row_num < input_rows_count; ++row_num)
{
if (converted_columns[1]->compareAt(row_num, row_num, *converted_columns[2], 1) > 0)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "The minimum value cannot be greater than the maximum value for function {}", getName());
size_t best_arg = 0;
if (converted_columns[1]->compareAt(row_num, row_num, *converted_columns[best_arg], 1) > 0)
best_arg = 1;
else if (converted_columns[2]->compareAt(row_num, row_num, *converted_columns[best_arg], 1) < 0)
best_arg = 2;
result_column->insertFrom(*converted_columns[best_arg], row_num);
}
return result_column;
}
};
REGISTER_FUNCTION(Clamp)
{
factory.registerFunction<FunctionClamp>();
}
}
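The per-row behaviour of the new clamp function above can be summarised with a small free-standing sketch (scalar values instead of IColumn; clampRow is an invented name used only for illustration):
#include <iostream>
#include <stdexcept>
// Mirrors the best_arg selection in FunctionClamp::executeImpl above:
// reject inverted bounds, otherwise return min, max or the value itself.
template <typename T>
T clampRow(T value, T min, T max)
{
    if (min > max)
        throw std::invalid_argument("The minimum value cannot be greater than the maximum value");
    if (min > value)
        return min;   // best_arg = 1
    if (max < value)
        return max;   // best_arg = 2
    return value;     // best_arg = 0
}
int main()
{
    std::cout << clampRow(10, 0, 5) << '\n'; // 5
    std::cout << clampRow(-3, 0, 5) << '\n'; // 0
    std::cout << clampRow(2, 0, 5) << '\n';  // 2
}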

View File

@ -73,8 +73,9 @@ bool ParallelReadBuffer::addReaderToPool()
auto worker = read_workers.emplace_back(std::make_shared<ReadWorker>(input, range_start, size));
++active_working_readers;
schedule([this, my_worker = std::move(worker)]() mutable { readerThreadFunction(std::move(my_worker)); }, Priority{});
/// increase number of workers only after we are sure that the reader was scheduled
++active_working_readers;
return true;
}

View File

@ -191,10 +191,14 @@ size_t ReadBufferFromS3::readBigAt(char * to, size_t n, size_t range_begin, cons
result = sendRequest(attempt, range_begin, range_begin + n - 1);
std::istream & istr = result->GetBody();
copyFromIStreamWithProgressCallback(istr, to, n, progress_callback, &bytes_copied);
bool cancelled = false;
copyFromIStreamWithProgressCallback(istr, to, n, progress_callback, &bytes_copied, &cancelled);
ProfileEvents::increment(ProfileEvents::ReadBufferFromS3Bytes, bytes_copied);
if (cancelled)
return initial_n - n + bytes_copied;
if (read_settings.remote_throttler)
read_settings.remote_throttler->add(bytes_copied, ProfileEvents::RemoteReadThrottlerBytes, ProfileEvents::RemoteReadThrottlerSleepMicroseconds);

View File

@ -1,8 +1,7 @@
#include <IO/S3/URI.h>
#include <Poco/URI.h>
#include "Common/Macros.h"
#include <Interpreters/Context.h>
#include <Storages/NamedCollectionsHelpers.h>
#include "Common/Macros.h"
#if USE_AWS_S3
#include <Common/Exception.h>
#include <Common/quoteString.h>
@ -55,7 +54,11 @@ URI::URI(const std::string & uri_)
static constexpr auto OSS = "OSS";
static constexpr auto EOS = "EOS";
uri = Poco::URI(uri_);
if (containsArchive(uri_))
std::tie(uri_str, archive_pattern) = getPathToArchiveAndArchivePattern(uri_);
else
uri_str = uri_;
uri = Poco::URI(uri_str);
std::unordered_map<std::string, std::string> mapper;
auto context = Context::getGlobalContextInstance();
@ -126,9 +129,10 @@ URI::URI(const std::string & uri_)
boost::to_upper(name);
/// For S3Express it will look like s3express-eun1-az1, i.e. contain region and AZ info
if (name != S3 && !name.starts_with(S3EXPRESS) && name != COS && name != OBS && name != OSS && name != EOS)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Object storage system name is unrecognized in virtual hosted style S3 URI: {}",
quoteString(name));
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Object storage system name is unrecognized in virtual hosted style S3 URI: {}",
quoteString(name));
if (name == COS)
storage_name = COSN;
@ -156,10 +160,40 @@ void URI::validateBucket(const String & bucket, const Poco::URI & uri)
/// S3 specification requires at least 3 and at most 63 characters in bucket name.
/// https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-s3-bucket-naming-requirements.html
if (bucket.length() < 3 || bucket.length() > 63)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Bucket name length is out of bounds in virtual hosted style S3 URI: {}{}",
quoteString(bucket), !uri.empty() ? " (" + uri.toString() + ")" : "");
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Bucket name length is out of bounds in virtual hosted style S3 URI: {}{}",
quoteString(bucket),
!uri.empty() ? " (" + uri.toString() + ")" : "");
}
bool URI::containsArchive(const std::string & source)
{
size_t pos = source.find("::");
return (pos != std::string::npos);
}
std::pair<std::string, std::string> URI::getPathToArchiveAndArchivePattern(const std::string & source)
{
size_t pos = source.find("::");
assert(pos != std::string::npos);
std::string path_to_archive = source.substr(0, pos);
while ((!path_to_archive.empty()) && path_to_archive.ends_with(' '))
path_to_archive.pop_back();
if (path_to_archive.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Path to archive is empty");
std::string_view path_in_archive_view = std::string_view{source}.substr(pos + 2);
while (path_in_archive_view.front() == ' ')
path_in_archive_view.remove_prefix(1);
if (path_in_archive_view.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Filename is empty");
return {path_to_archive, std::string{path_in_archive_view}};
}
}
}
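A self-contained sketch of the "path :: pattern" split introduced above (std::string only; splitArchiveURI is an invented name, not part of the commit):
#include <cassert>
#include <iostream>
#include <string>
#include <utility>
// Same convention as URI::getPathToArchiveAndArchivePattern above:
// everything before "::" is the archive path (trailing spaces trimmed),
// everything after it is the filename pattern (leading spaces trimmed).
std::pair<std::string, std::string> splitArchiveURI(const std::string & source)
{
    size_t pos = source.find("::");
    assert(pos != std::string::npos);
    std::string path = source.substr(0, pos);
    while (!path.empty() && path.back() == ' ')
        path.pop_back();
    std::string pattern = source.substr(pos + 2);
    while (!pattern.empty() && pattern.front() == ' ')
        pattern.erase(pattern.begin());
    return {path, pattern};
}
int main()
{
    auto [path, pattern] = splitArchiveURI("https://bucket.s3.amazonaws.com/backups/data.zip :: *.csv");
    std::cout << path << '\n';    // https://bucket.s3.amazonaws.com/backups/data.zip
    std::cout << pattern << '\n'; // *.csv
}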

View File

@ -1,5 +1,6 @@
#pragma once
#include <optional>
#include <string>
#include "config.h"
@ -28,6 +29,8 @@ struct URI
std::string key;
std::string version_id;
std::string storage_name;
std::optional<std::string> archive_pattern;
std::string uri_str;
bool is_virtual_hosted_style;
@ -36,6 +39,10 @@ struct URI
void addRegionToURI(const std::string & region);
static void validateBucket(const std::string & bucket, const Poco::URI & uri);
private:
bool containsArchive(const std::string & source);
std::pair<std::string, std::string> getPathToArchiveAndArchivePattern(const std::string & source);
};
}

View File

@ -20,6 +20,9 @@ namespace CurrentMetrics
extern const Metric MergeTreeOutdatedPartsLoaderThreads;
extern const Metric MergeTreeOutdatedPartsLoaderThreadsActive;
extern const Metric MergeTreeOutdatedPartsLoaderThreadsScheduled;
extern const Metric MergeTreeUnexpectedPartsLoaderThreads;
extern const Metric MergeTreeUnexpectedPartsLoaderThreadsActive;
extern const Metric MergeTreeUnexpectedPartsLoaderThreadsScheduled;
extern const Metric DatabaseReplicatedCreateTablesThreads;
extern const Metric DatabaseReplicatedCreateTablesThreadsActive;
extern const Metric DatabaseReplicatedCreateTablesThreadsScheduled;
@ -151,6 +154,12 @@ StaticThreadPool & getOutdatedPartsLoadingThreadPool()
return instance;
}
StaticThreadPool & getUnexpectedPartsLoadingThreadPool()
{
static StaticThreadPool instance("MergeTreeUnexpectedPartsLoaderThreadPool", CurrentMetrics::MergeTreeUnexpectedPartsLoaderThreads, CurrentMetrics::MergeTreeUnexpectedPartsLoaderThreadsActive, CurrentMetrics::MergeTreeUnexpectedPartsLoaderThreadsScheduled);
return instance;
}
StaticThreadPool & getDatabaseReplicatedCreateTablesThreadPool()
{
static StaticThreadPool instance("CreateTablesThreadPool", CurrentMetrics::DatabaseReplicatedCreateTablesThreads, CurrentMetrics::DatabaseReplicatedCreateTablesThreadsActive, CurrentMetrics::DatabaseReplicatedCreateTablesThreadsScheduled);

View File

@ -64,6 +64,8 @@ StaticThreadPool & getPartsCleaningThreadPool();
/// the number of threads by calling enableTurboMode() :-)
StaticThreadPool & getOutdatedPartsLoadingThreadPool();
StaticThreadPool & getUnexpectedPartsLoadingThreadPool();
/// ThreadPool used for creating tables in DatabaseReplicated.
StaticThreadPool & getDatabaseReplicatedCreateTablesThreadPool();

View File

@ -21,6 +21,9 @@
#include <base/sort.h>
#include <Common/JSONBuilder.h>
#include <absl/container/flat_hash_map.h>
#include <absl/container/inlined_vector.h>
namespace DB
{
@ -708,16 +711,18 @@ static ColumnWithTypeAndName executeActionForPartialResult(const ActionsDAG::Nod
return res_column;
}
Block ActionsDAG::updateHeader(Block header) const
Block ActionsDAG::updateHeader(const Block & header) const
{
IntermediateExecutionResult node_to_column;
std::set<size_t> pos_to_remove;
{
std::unordered_map<std::string_view, std::list<size_t>> input_positions;
using inline_vector = absl::InlinedVector<size_t, 7>; // 64B, holding max 7 size_t elements inlined
absl::flat_hash_map<std::string_view, inline_vector> input_positions;
for (size_t pos = 0; pos < inputs.size(); ++pos)
input_positions[inputs[pos]->result_name].emplace_back(pos);
/// We insert from last to first into the InlinedVector so it's easier to pop_back matches later
for (size_t pos = inputs.size(); pos != 0; pos--)
input_positions[inputs[pos - 1]->result_name].emplace_back(pos - 1);
for (size_t pos = 0; pos < header.columns(); ++pos)
{
@ -725,10 +730,11 @@ Block ActionsDAG::updateHeader(Block header) const
auto it = input_positions.find(col.name);
if (it != input_positions.end() && !it->second.empty())
{
auto & list = it->second;
pos_to_remove.insert(pos);
node_to_column[inputs[list.front()]] = col;
list.pop_front();
auto & v = it->second;
node_to_column[inputs[v.back()]] = col;
v.pop_back();
}
}
}
@ -746,18 +752,21 @@ Block ActionsDAG::updateHeader(Block header) const
throw;
}
if (isInputProjected())
header.clear();
else
header.erase(pos_to_remove);
Block res;
res.reserve(result_columns.size());
for (auto & col : result_columns)
res.insert(std::move(col));
for (auto && item : header)
res.insert(std::move(item));
if (isInputProjected())
return res;
res.reserve(header.columns() - pos_to_remove.size());
for (size_t i = 0; i < header.columns(); i++)
{
if (!pos_to_remove.contains(i))
res.insert(header.data[i]);
}
return res;
}
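The updateHeader() rewrite above fills each position vector from last to first so that pop_back() later hands the positions out in their original order. A minimal sketch of that trick, with std::vector standing in for absl::InlinedVector:
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>
int main()
{
    std::vector<std::string> inputs = {"a", "b", "a"};
    // Insert from last to first, as in the code above, so pop_back()
    // yields matches in ascending position order (0 before 2 for "a").
    std::unordered_map<std::string, std::vector<size_t>> input_positions;
    for (size_t pos = inputs.size(); pos != 0; pos--)
        input_positions[inputs[pos - 1]].push_back(pos - 1);
    auto & v = input_positions["a"];
    std::cout << v.back() << '\n'; // 0
    v.pop_back();
    std::cout << v.back() << '\n'; // 2
}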

View File

@ -272,7 +272,7 @@ public:
///
/// In addition, check that result constants are constants according to DAG.
/// In case if function return constant, but arguments are not constant, materialize it.
Block updateHeader(Block header) const;
Block updateHeader(const Block & header) const;
using IntermediateExecutionResult = std::unordered_map<const Node *, ColumnWithTypeAndName>;
static ColumnsWithTypeAndName evaluatePartialResult(

View File

@ -667,11 +667,7 @@ namespace
using TimePoint = std::chrono::time_point<std::chrono::system_clock>;
void appendElementsToLogSafe(
AsynchronousInsertLog & log,
std::vector<AsynchronousInsertLogElement> elements,
TimePoint flush_time,
const String & flush_query_id,
const String & flush_exception)
AsynchronousInsertLog & log, std::vector<AsynchronousInsertLogElement> elements, TimePoint flush_time, const String & flush_exception)
try
{
using Status = AsynchronousInsertLogElement::Status;
@ -680,7 +676,6 @@ try
{
elem.flush_time = timeInSeconds(flush_time);
elem.flush_time_microseconds = timeInMicroseconds(flush_time);
elem.flush_query_id = flush_query_id;
elem.exception = flush_exception;
elem.status = flush_exception.empty() ? Status::Ok : Status::FlushError;
log.add(std::move(elem));
@ -808,12 +803,12 @@ try
throw;
}
auto add_entry_to_log = [&](const auto & entry,
const auto & entry_query_for_logging,
const auto & exception,
size_t num_rows,
size_t num_bytes,
Milliseconds timeout_ms)
auto add_entry_to_asynchronous_insert_log = [&](const auto & entry,
const auto & entry_query_for_logging,
const auto & exception,
size_t num_rows,
size_t num_bytes,
Milliseconds timeout_ms)
{
if (!async_insert_log)
return;
@ -831,6 +826,7 @@ try
elem.exception = exception;
elem.data_kind = entry->chunk.getDataKind();
elem.timeout_milliseconds = timeout_ms.count();
elem.flush_query_id = insert_query_id;
/// If there was a parsing error,
/// the entry won't be flushed anyway,
@ -857,7 +853,7 @@ try
if (!log_elements.empty())
{
auto flush_time = std::chrono::system_clock::now();
appendElementsToLogSafe(*async_insert_log, std::move(log_elements), flush_time, insert_query_id, "");
appendElementsToLogSafe(*async_insert_log, std::move(log_elements), flush_time, "");
}
};
@ -865,15 +861,27 @@ try
auto header = pipeline.getHeader();
if (key.data_kind == DataKind::Parsed)
chunk = processEntriesWithParsing(key, data, header, insert_context, log, add_entry_to_log);
chunk = processEntriesWithParsing(key, data, header, insert_context, log, add_entry_to_asynchronous_insert_log);
else
chunk = processPreprocessedEntries(key, data, header, insert_context, add_entry_to_log);
chunk = processPreprocessedEntries(key, data, header, insert_context, add_entry_to_asynchronous_insert_log);
ProfileEvents::increment(ProfileEvents::AsyncInsertRows, chunk.getNumRows());
auto log_and_add_finish_to_query_log = [&](size_t num_rows, size_t num_bytes)
{
LOG_DEBUG(log, "Flushed {} rows, {} bytes for query '{}'", num_rows, num_bytes, key.query_str);
queue_shard_flush_time_history.updateWithCurrentTime();
bool pulling_pipeline = false;
logQueryFinish(
query_log_elem, insert_context, key.query, pipeline, pulling_pipeline, query_span, QueryCache::Usage::None, internal);
};
if (chunk.getNumRows() == 0)
{
finish_entries();
log_and_add_finish_to_query_log(0, 0);
return;
}
@ -888,12 +896,7 @@ try
CompletedPipelineExecutor completed_executor(pipeline);
completed_executor.execute();
LOG_INFO(log, "Flushed {} rows, {} bytes for query '{}'", num_rows, num_bytes, key.query_str);
queue_shard_flush_time_history.updateWithCurrentTime();
bool pulling_pipeline = false;
logQueryFinish(query_log_elem, insert_context, key.query, pipeline, pulling_pipeline, query_span, QueryCache::Usage::None, internal);
log_and_add_finish_to_query_log(num_rows, num_bytes);
}
catch (...)
{
@ -903,7 +906,7 @@ try
{
auto exception = getCurrentExceptionMessage(false);
auto flush_time = std::chrono::system_clock::now();
appendElementsToLogSafe(*async_insert_log, std::move(log_elements), flush_time, insert_query_id, exception);
appendElementsToLogSafe(*async_insert_log, std::move(log_elements), flush_time, exception);
}
throw;
}

View File

@ -1607,6 +1607,21 @@ Tables Context::getExternalTables() const
void Context::addExternalTable(const String & table_name, TemporaryTableHolder && temporary_table)
{
addExternalTable(table_name, std::make_shared<TemporaryTableHolder>(std::move(temporary_table)));
}
void Context::updateExternalTable(const String & table_name, TemporaryTableHolder && temporary_table)
{
updateExternalTable(table_name, std::make_shared<TemporaryTableHolder>(std::move(temporary_table)));
}
void Context::addOrUpdateExternalTable(const String & table_name, TemporaryTableHolder && temporary_table)
{
addOrUpdateExternalTable(table_name, std::make_shared<TemporaryTableHolder>(std::move(temporary_table)));
}
void Context::addExternalTable(const String & table_name, std::shared_ptr<TemporaryTableHolder> temporary_table)
{
if (isGlobalContext())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context cannot have external tables");
@ -1614,34 +1629,32 @@ void Context::addExternalTable(const String & table_name, TemporaryTableHolder &
std::lock_guard lock(mutex);
if (external_tables_mapping.end() != external_tables_mapping.find(table_name))
throw Exception(ErrorCodes::TABLE_ALREADY_EXISTS, "Temporary table {} already exists", backQuoteIfNeed(table_name));
external_tables_mapping.emplace(table_name, std::make_shared<TemporaryTableHolder>(std::move(temporary_table)));
external_tables_mapping.emplace(table_name, std::move(temporary_table));
}
void Context::updateExternalTable(const String & table_name, TemporaryTableHolder && temporary_table)
void Context::updateExternalTable(const String & table_name, std::shared_ptr<TemporaryTableHolder> temporary_table)
{
if (isGlobalContext())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context cannot have external tables");
auto temporary_table_ptr = std::make_shared<TemporaryTableHolder>(std::move(temporary_table));
std::lock_guard lock(mutex);
auto it = external_tables_mapping.find(table_name);
if (it == external_tables_mapping.end())
throw Exception(ErrorCodes::TABLE_ALREADY_EXISTS, "Temporary table {} does not exist", backQuoteIfNeed(table_name));
it->second = std::move(temporary_table_ptr);
it->second = std::move(temporary_table);
}
void Context::addOrUpdateExternalTable(const String & table_name, TemporaryTableHolder && temporary_table)
void Context::addOrUpdateExternalTable(const String & table_name, std::shared_ptr<TemporaryTableHolder> temporary_table)
{
if (isGlobalContext())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context cannot have external tables");
auto temporary_table_ptr = std::make_shared<TemporaryTableHolder>(std::move(temporary_table));
std::lock_guard lock(mutex);
auto [it, inserted] = external_tables_mapping.emplace(table_name, temporary_table_ptr);
auto [it, inserted] = external_tables_mapping.emplace(table_name, temporary_table);
if (!inserted)
it->second = std::move(temporary_table_ptr);
it->second = std::move(temporary_table);
}
std::shared_ptr<TemporaryTableHolder> Context::findExternalTable(const String & table_name) const
@ -4467,7 +4480,7 @@ void Context::setApplicationType(ApplicationType type)
/// Lock isn't required, you should set it at start
shared->application_type = type;
if (type == ApplicationType::LOCAL || type == ApplicationType::SERVER)
if (type == ApplicationType::LOCAL || type == ApplicationType::SERVER || type == ApplicationType::DISKS)
shared->server_settings.loadSettingsFromConfig(Poco::Util::Application::instance().config());
if (type == ApplicationType::SERVER)

View File

@ -685,6 +685,9 @@ public:
void addExternalTable(const String & table_name, TemporaryTableHolder && temporary_table);
void updateExternalTable(const String & table_name, TemporaryTableHolder && temporary_table);
void addOrUpdateExternalTable(const String & table_name, TemporaryTableHolder && temporary_table);
void addExternalTable(const String & table_name, std::shared_ptr<TemporaryTableHolder> temporary_table);
void updateExternalTable(const String & table_name, std::shared_ptr<TemporaryTableHolder> temporary_table);
void addOrUpdateExternalTable(const String & table_name, std::shared_ptr<TemporaryTableHolder> temporary_table);
std::shared_ptr<TemporaryTableHolder> findExternalTable(const String & table_name) const;
std::shared_ptr<TemporaryTableHolder> removeExternalTable(const String & table_name);

View File

@ -676,7 +676,8 @@ void DDLWorker::processTask(DDLTaskBase & task, const ZooKeeperPtr & zookeeper)
if (task.execution_status.code != 0)
{
bool status_written_by_table_or_db = task.ops.empty();
if (status_written_by_table_or_db)
bool is_replicated_database_task = dynamic_cast<DatabaseReplicatedTask *>(&task);
if (status_written_by_table_or_db || is_replicated_database_task)
{
throw Exception(ErrorCodes::UNFINISHED, "Unexpected error: {}", task.execution_status.message);
}
@ -710,6 +711,7 @@ void DDLWorker::processTask(DDLTaskBase & task, const ZooKeeperPtr & zookeeper)
task.createSyncedNodeIfNeed(zookeeper);
updateMaxDDLEntryID(task.entry_name);
task.completely_processed = true;
subsequent_errors_count = 0;
}
@ -791,6 +793,8 @@ bool DDLWorker::tryExecuteQueryOnLeaderReplica(
bool executed_by_us = false;
bool executed_by_other_leader = false;
bool extra_attempt_for_replicated_database = false;
/// Defensive programming. One hour is more than enough to execute almost all DDL queries.
/// If it is a very long query, like an ALTER DELETE for a huge table, it will still be executed,
/// but the DDL worker can continue processing other queries.
@ -835,7 +839,14 @@ bool DDLWorker::tryExecuteQueryOnLeaderReplica(
/// Checking and incrementing counter exclusively.
size_t counter = parse<int>(zookeeper->get(tries_to_execute_path));
if (counter > MAX_TRIES_TO_EXECUTE)
break;
{
/// Replicated databases have their own retries; limiting retries here would break the outer retries
bool is_replicated_database_task = dynamic_cast<DatabaseReplicatedTask *>(&task);
if (is_replicated_database_task)
extra_attempt_for_replicated_database = true;
else
break;
}
zookeeper->set(tries_to_execute_path, toString(counter + 1));
@ -849,6 +860,8 @@ bool DDLWorker::tryExecuteQueryOnLeaderReplica(
executed_by_us = true;
break;
}
else if (extra_attempt_for_replicated_database)
break;
}
/// Waiting for someone who will execute query and change is_executed_path node
@ -892,7 +905,9 @@ bool DDLWorker::tryExecuteQueryOnLeaderReplica(
else /// If we exceeded amount of tries
{
LOG_WARNING(log, "Task {} was not executed by anyone, maximum number of retries exceeded", task.entry_name);
task.execution_status = ExecutionStatus(ErrorCodes::UNFINISHED, "Cannot execute replicated DDL query, maximum retries exceeded");
bool keep_original_error = extra_attempt_for_replicated_database && task.execution_status.code;
if (!keep_original_error)
task.execution_status = ExecutionStatus(ErrorCodes::UNFINISHED, "Cannot execute replicated DDL query, maximum retries exceeded");
}
return false;
}
@ -1144,12 +1159,14 @@ void DDLWorker::runMainThread()
cleanup_event->set();
scheduleTasks(reinitialized);
subsequent_errors_count = 0;
LOG_DEBUG(log, "Waiting for queue updates");
queue_updated_event->wait();
}
catch (const Coordination::Exception & e)
{
subsequent_errors_count = 0;
if (Coordination::isHardwareError(e.code))
{
initialized = false;
@ -1167,9 +1184,32 @@ void DDLWorker::runMainThread()
}
catch (...)
{
tryLogCurrentException(log, "Unexpected error, will try to restart main thread");
reset_state();
String message = getCurrentExceptionMessage(/*with_stacktrace*/ true);
if (subsequent_errors_count)
{
if (last_unexpected_error == message)
{
++subsequent_errors_count;
}
else
{
subsequent_errors_count = 1;
last_unexpected_error = message;
}
}
else
{
subsequent_errors_count = 1;
last_unexpected_error = message;
}
LOG_ERROR(log, "Unexpected error ({} times in a row), will try to restart main thread: {}", subsequent_errors_count, message);
/// Sleep before retrying
sleepForSeconds(5);
/// Reset state after sleeping, so DatabaseReplicated::canExecuteReplicatedMetadataAlter()
/// will have a chance even when the database got stuck in infinite retries
reset_state();
}
}
}

View File

@ -194,6 +194,9 @@ protected:
ConcurrentSet entries_to_skip;
std::atomic_uint64_t subsequent_errors_count = 0;
String last_unexpected_error;
const CurrentMetrics::Metric * max_entry_metric;
const CurrentMetrics::Metric * max_pushed_entry_metric;
};

View File

@ -35,10 +35,17 @@
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Common/formatReadable.h>
#include "Core/Joins.h"
#include "Interpreters/TemporaryDataOnDisk.h"
#include <Functions/FunctionHelpers.h>
#include <Interpreters/castColumn.h>
namespace CurrentMetrics
{
extern const Metric TemporaryFilesForJoin;
}
namespace DB
{
@ -63,6 +70,7 @@ struct NotProcessedCrossJoin : public ExtraBlock
{
size_t left_position;
size_t right_block;
std::unique_ptr<TemporaryFileStream::Reader> reader;
};
@ -249,6 +257,10 @@ HashJoin::HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_s
, instance_id(instance_id_)
, asof_inequality(table_join->getAsofInequality())
, data(std::make_shared<RightTableData>())
, tmp_data(
table_join_->getTempDataOnDisk()
? std::make_unique<TemporaryDataOnDisk>(table_join_->getTempDataOnDisk(), CurrentMetrics::TemporaryFilesForJoin)
: nullptr)
, right_sample_block(right_sample_block_)
, max_joined_block_rows(table_join->maxJoinedBlockRows())
, instance_log_id(!instance_id_.empty() ? "(" + instance_id_ + ") " : "")
@ -827,6 +839,21 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
if (shrink_blocks)
block_to_save = block_to_save.shrinkToFit();
size_t max_bytes_in_join = table_join->sizeLimits().max_bytes;
size_t max_rows_in_join = table_join->sizeLimits().max_rows;
if (kind == JoinKind::Cross && tmp_data
&& (tmp_stream || (max_bytes_in_join && getTotalByteCount() + block_to_save.allocatedBytes() >= max_bytes_in_join)
|| (max_rows_in_join && getTotalRowCount() + block_to_save.rows() >= max_rows_in_join)))
{
if (tmp_stream == nullptr)
{
tmp_stream = &tmp_data->createStream(right_sample_block);
}
tmp_stream->write(block_to_save);
return true;
}
size_t total_rows = 0;
size_t total_bytes = 0;
{
@ -944,7 +971,6 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
shrinkStoredBlocksToFit(total_bytes);
return table_join->sizeLimits().check(total_rows, total_bytes, "JOIN", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED);
}
@ -2238,11 +2264,13 @@ void HashJoin::joinBlockImplCross(Block & block, ExtraBlockPtr & not_processed)
{
size_t start_left_row = 0;
size_t start_right_block = 0;
std::unique_ptr<TemporaryFileStream::Reader> reader = nullptr;
if (not_processed)
{
auto & continuation = static_cast<NotProcessedCrossJoin &>(*not_processed);
start_left_row = continuation.left_position;
start_right_block = continuation.right_block;
reader = std::move(continuation.reader);
not_processed.reset();
}
@ -2271,18 +2299,12 @@ void HashJoin::joinBlockImplCross(Block & block, ExtraBlockPtr & not_processed)
size_t rows_left = block.rows();
size_t rows_added = 0;
for (size_t left_row = start_left_row; left_row < rows_left; ++left_row)
{
size_t block_number = 0;
for (const Block & compressed_block_right : data->blocks)
auto process_right_block = [&](const Block & block_right)
{
++block_number;
if (block_number < start_right_block)
continue;
auto block_right = compressed_block_right.decompress();
size_t rows_right = block_right.rows();
rows_added += rows_right;
@ -2294,6 +2316,44 @@ void HashJoin::joinBlockImplCross(Block & block, ExtraBlockPtr & not_processed)
const IColumn & column_right = *block_right.getByPosition(col_num).column;
dst_columns[num_existing_columns + col_num]->insertRangeFrom(column_right, 0, rows_right);
}
};
for (const Block & compressed_block_right : data->blocks)
{
++block_number;
if (block_number < start_right_block)
continue;
auto block_right = compressed_block_right.decompress();
process_right_block(block_right);
if (rows_added > max_joined_block_rows)
{
break;
}
}
if (tmp_stream && rows_added <= max_joined_block_rows)
{
if (reader == nullptr)
{
tmp_stream->finishWritingAsyncSafe();
reader = tmp_stream->getReadStream();
}
while (auto block_right = reader->read())
{
++block_number;
process_right_block(block_right);
if (rows_added > max_joined_block_rows)
{
break;
}
}
/// It means that reader->read() returned {}
if (rows_added <= max_joined_block_rows)
{
reader.reset();
}
}
start_right_block = 0;
@ -2301,7 +2361,7 @@ void HashJoin::joinBlockImplCross(Block & block, ExtraBlockPtr & not_processed)
if (rows_added > max_joined_block_rows)
{
not_processed = std::make_shared<NotProcessedCrossJoin>(
NotProcessedCrossJoin{{block.cloneEmpty()}, left_row, block_number + 1});
NotProcessedCrossJoin{{block.cloneEmpty()}, left_row, block_number + 1, std::move(reader)});
not_processed->block.swap(block);
break;
}
@ -2427,10 +2487,15 @@ HashJoin::~HashJoin()
{
if (!data)
{
LOG_TRACE(log, "{}Join data has been already released", instance_log_id);
LOG_TEST(log, "{}Join data has been already released", instance_log_id);
return;
}
LOG_TRACE(log, "{}Join data is being destroyed, {} bytes and {} rows in hash table", instance_log_id, getTotalByteCount(), getTotalRowCount());
LOG_TEST(
log,
"{}Join data is being destroyed, {} bytes and {} rows in hash table",
instance_log_id,
getTotalByteCount(),
getTotalRowCount());
}
template <typename Mapped>
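The addBlockToJoin() change above spills right-table blocks of a CROSS JOIN to a temporary stream once either size limit would be exceeded, and joinBlockImplCross() later replays them from disk. A condensed, self-contained sketch of the spill decision only (Block and TmpStream here are hypothetical stand-ins, not the real HashJoin members):
#include <cstddef>
#include <iostream>
struct Block
{
    size_t rows_ = 0;
    size_t bytes_ = 0;
    size_t rows() const { return rows_; }
    size_t allocatedBytes() const { return bytes_; }
};
struct TmpStream
{
    void write(const Block &) { /* would serialize the block to disk */ }
};
// Mirrors the CROSS JOIN branch above: once a stream exists, or the next
// block would push the totals past max_bytes/max_rows, write the block to
// disk instead of keeping it in memory.
bool maybeSpill(const Block & block, size_t total_rows, size_t total_bytes,
                size_t max_rows, size_t max_bytes,
                TmpStream *& tmp_stream, TmpStream & storage)
{
    bool over_bytes = max_bytes && total_bytes + block.allocatedBytes() >= max_bytes;
    bool over_rows = max_rows && total_rows + block.rows() >= max_rows;
    if (tmp_stream || over_bytes || over_rows)
    {
        if (!tmp_stream)
            tmp_stream = &storage; // lazily created on the first spill
        tmp_stream->write(block);
        return true;
    }
    return false;
}
int main()
{
    TmpStream storage;
    TmpStream * tmp_stream = nullptr;
    Block block{1000, 1 << 20};
    std::cout << maybeSpill(block, 0, 0, 0, 512 * 1024, tmp_stream, storage) << '\n'; // 1: spilled
}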

View File

@ -26,6 +26,7 @@
#include <Storages/IStorage_fwd.h>
#include <Interpreters/IKeyValueEntity.h>
#include <Interpreters/TemporaryDataOnDisk.h>
namespace DB
{
@ -442,6 +443,10 @@ private:
RightTableDataPtr data;
std::vector<Sizes> key_sizes;
/// Needed to do external cross join
TemporaryDataOnDiskPtr tmp_data;
TemporaryFileStream* tmp_stream{nullptr};
/// Block with columns from the right-side table.
Block right_sample_block;
/// Block with columns from the right-side table except key columns.

View File

@ -505,7 +505,7 @@ ASTPtr InterpreterCreateQuery::formatProjections(const ProjectionsDescription &
}
ColumnsDescription InterpreterCreateQuery::getColumnsDescription(
const ASTExpressionList & columns_ast, ContextPtr context_, LoadingStrictnessLevel mode)
const ASTExpressionList & columns_ast, ContextPtr context_, LoadingStrictnessLevel mode, bool is_restore_from_backup)
{
/// First, deduce implicit types.
@ -514,7 +514,7 @@ ColumnsDescription InterpreterCreateQuery::getColumnsDescription(
ASTPtr default_expr_list = std::make_shared<ASTExpressionList>();
NamesAndTypesList column_names_and_types;
bool make_columns_nullable = mode <= LoadingStrictnessLevel::CREATE && context_->getSettingsRef().data_type_default_nullable;
bool make_columns_nullable = mode <= LoadingStrictnessLevel::SECONDARY_CREATE && !is_restore_from_backup && context_->getSettingsRef().data_type_default_nullable;
bool has_columns_with_default_without_type = false;
for (const auto & ast : columns_ast.children)
@ -694,7 +694,7 @@ ColumnsDescription InterpreterCreateQuery::getColumnsDescription(
res.add(std::move(column));
}
if (mode <= LoadingStrictnessLevel::CREATE && context_->getSettingsRef().flatten_nested)
if (mode <= LoadingStrictnessLevel::SECONDARY_CREATE && !is_restore_from_backup && context_->getSettingsRef().flatten_nested)
res.flattenNested();
@ -739,7 +739,7 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::getTableProperti
if (create.columns_list->columns)
{
properties.columns = getColumnsDescription(*create.columns_list->columns, getContext(), mode);
properties.columns = getColumnsDescription(*create.columns_list->columns, getContext(), mode, is_restore_from_backup);
}
if (create.columns_list->indices)

View File

@ -74,7 +74,7 @@ public:
/// Obtain information about columns, their types, default values and column comments,
/// for case when columns in CREATE query is specified explicitly.
static ColumnsDescription getColumnsDescription(const ASTExpressionList & columns, ContextPtr context, LoadingStrictnessLevel mode);
static ColumnsDescription getColumnsDescription(const ASTExpressionList & columns, ContextPtr context, LoadingStrictnessLevel mode, bool is_restore_from_backup = false);
static ConstraintsDescription getConstraintsDescription(const ASTExpressionList * constraints);
static void prepareOnClusterQuery(ASTCreateQuery & create, ContextPtr context, const String & cluster_name);

View File

@ -5,6 +5,8 @@
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTSubquery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <DataTypes/DataTypesNumber.h>
@ -38,22 +40,47 @@ namespace ErrorCodes
namespace
{
ASTPtr normalizeAndValidateQuery(const ASTPtr & query)
ASTPtr normalizeAndValidateQuery(const ASTPtr & query, const Names & column_names)
{
ASTPtr result_query;
if (query->as<ASTSelectWithUnionQuery>() || query->as<ASTSelectQuery>())
{
return query;
}
result_query = query;
else if (auto * subquery = query->as<ASTSubquery>())
{
return subquery->children[0];
}
result_query = subquery->children[0];
else
{
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"Expected ASTSelectWithUnionQuery or ASTSelectQuery. Actual {}",
query->formatForErrorMessage());
}
if (column_names.empty())
return result_query;
/// The initial query the VIEW references is wrapped here in another SELECT query to allow reading only the necessary columns.
auto select_query = std::make_shared<ASTSelectQuery>();
auto result_table_expression_ast = std::make_shared<ASTTableExpression>();
result_table_expression_ast->children.push_back(std::make_shared<ASTSubquery>(std::move(result_query)));
result_table_expression_ast->subquery = result_table_expression_ast->children.back();
auto tables_in_select_query_element_ast = std::make_shared<ASTTablesInSelectQueryElement>();
tables_in_select_query_element_ast->children.push_back(std::move(result_table_expression_ast));
tables_in_select_query_element_ast->table_expression = tables_in_select_query_element_ast->children.back();
ASTPtr tables_in_select_query_ast = std::make_shared<ASTTablesInSelectQuery>();
tables_in_select_query_ast->children.push_back(std::move(tables_in_select_query_element_ast));
select_query->setExpression(ASTSelectQuery::Expression::TABLES, std::move(tables_in_select_query_ast));
auto projection_expression_list_ast = std::make_shared<ASTExpressionList>();
projection_expression_list_ast->children.reserve(column_names.size());
for (const auto & column_name : column_names)
projection_expression_list_ast->children.push_back(std::make_shared<ASTIdentifier>(column_name));
select_query->setExpression(ASTSelectQuery::Expression::SELECT, std::move(projection_expression_list_ast));
return select_query;
}
ContextMutablePtr buildContext(const ContextPtr & context, const SelectQueryOptions & select_query_options)
@ -125,8 +152,9 @@ QueryTreeNodePtr buildQueryTreeAndRunPasses(const ASTPtr & query,
InterpreterSelectQueryAnalyzer::InterpreterSelectQueryAnalyzer(
const ASTPtr & query_,
const ContextPtr & context_,
const SelectQueryOptions & select_query_options_)
: query(normalizeAndValidateQuery(query_))
const SelectQueryOptions & select_query_options_,
const Names & column_names)
: query(normalizeAndValidateQuery(query_, column_names))
, context(buildContext(context_, select_query_options_))
, select_query_options(select_query_options_)
, query_tree(buildQueryTreeAndRunPasses(query, select_query_options, context, nullptr /*storage*/))
@ -138,8 +166,9 @@ InterpreterSelectQueryAnalyzer::InterpreterSelectQueryAnalyzer(
const ASTPtr & query_,
const ContextPtr & context_,
const StoragePtr & storage_,
const SelectQueryOptions & select_query_options_)
: query(normalizeAndValidateQuery(query_))
const SelectQueryOptions & select_query_options_,
const Names & column_names)
: query(normalizeAndValidateQuery(query_, column_names))
, context(buildContext(context_, select_query_options_))
, select_query_options(select_query_options_)
, query_tree(buildQueryTreeAndRunPasses(query, select_query_options, context, storage_))
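Conceptually, the new column_names parameter above makes the interpreter read the view through a wrapper of the form SELECT <columns> FROM (<original query>). A self-contained sketch of that wrapping at the text level (the real code builds the AST nodes directly, as shown above; wrapWithColumns is an invented name):
#include <iostream>
#include <string>
#include <vector>
// Text-level illustration of normalizeAndValidateQuery() with column names:
// wrap the original query so only the requested columns are read.
std::string wrapWithColumns(const std::string & inner_query, const std::vector<std::string> & column_names)
{
    if (column_names.empty())
        return inner_query;
    std::string projection;
    for (const auto & name : column_names)
    {
        if (!projection.empty())
            projection += ", ";
        projection += name;
    }
    return "SELECT " + projection + " FROM (" + inner_query + ")";
}
int main()
{
    std::cout << wrapWithColumns("SELECT * FROM source_table", {"id", "value"}) << '\n';
    // SELECT id, value FROM (SELECT * FROM source_table)
}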

View File

@ -16,7 +16,8 @@ public:
/// Initialize interpreter with query AST
InterpreterSelectQueryAnalyzer(const ASTPtr & query_,
const ContextPtr & context_,
const SelectQueryOptions & select_query_options_);
const SelectQueryOptions & select_query_options_,
const Names & column_names = {});
/** Initialize interpreter with query AST and storage.
* After query tree is built left most table expression is replaced with table node that
@ -25,7 +26,8 @@ public:
InterpreterSelectQueryAnalyzer(const ASTPtr & query_,
const ContextPtr & context_,
const StoragePtr & storage_,
const SelectQueryOptions & select_query_options_);
const SelectQueryOptions & select_query_options_,
const Names & column_names = {});
/** Initialize interpreter with query tree.
* No query tree passes are applied.

View File

@ -310,7 +310,7 @@ std::shared_ptr<TableJoin> JoinedTables::makeTableJoin(const ASTSelectQuery & se
auto settings = context->getSettingsRef();
MultiEnum<JoinAlgorithm> join_algorithm = settings.join_algorithm;
bool try_use_direct_join = join_algorithm.isSet(JoinAlgorithm::DIRECT) || join_algorithm.isSet(JoinAlgorithm::DEFAULT);
auto table_join = std::make_shared<TableJoin>(settings, context->getGlobalTemporaryVolume());
auto table_join = std::make_shared<TableJoin>(settings, context->getGlobalTemporaryVolume(), context->getTempDataOnDisk());
const ASTTablesInSelectQueryElement * ast_join = select_query_.join();
const auto & table_to_join = ast_join->table_expression->as<ASTTableExpression &>();

View File

@ -700,8 +700,10 @@ void MergeJoin::joinBlock(Block & block, ExtraBlockPtr & not_processed)
/// We need to check the types of masks before `addConditionJoinColumn`, because it assumes that the types are correct
JoinCommon::checkTypesOfMasks(block, mask_column_name_left, right_sample_block, mask_column_name_right);
/// Add auxiliary column, will be removed after joining
addConditionJoinColumn(block, JoinTableSide::Left);
if (!not_processed)
/// Add an auxiliary column, which will be removed after joining
/// We do not need to add it twice when we are continuing to process the block from the previous iteration
addConditionJoinColumn(block, JoinTableSide::Left);
/// Types of keys can be checked only after `checkTypesOfKeys`
JoinCommon::checkTypesOfKeys(block, key_names_left, right_table_keys, key_names_right);

View File

@ -103,7 +103,7 @@ bool forAllKeys(OnExpr & expressions, Func callback)
}
TableJoin::TableJoin(const Settings & settings, VolumePtr tmp_volume_)
TableJoin::TableJoin(const Settings & settings, VolumePtr tmp_volume_, TemporaryDataOnDiskScopePtr tmp_data_)
: size_limits(SizeLimits{settings.max_rows_in_join, settings.max_bytes_in_join, settings.join_overflow_mode})
, default_max_bytes(settings.default_max_bytes_in_join)
, join_use_nulls(settings.join_use_nulls)
@ -117,6 +117,7 @@ TableJoin::TableJoin(const Settings & settings, VolumePtr tmp_volume_)
, temporary_files_codec(settings.temporary_files_codec)
, max_memory_usage(settings.max_memory_usage)
, tmp_volume(tmp_volume_)
, tmp_data(tmp_data_)
{
}

View File

@ -9,6 +9,7 @@
#include <QueryPipeline/SizeLimits.h>
#include <DataTypes/getLeastSupertype.h>
#include <Interpreters/IKeyValueEntity.h>
#include <Interpreters/TemporaryDataOnDisk.h>
#include <Common/Exception.h>
#include <Parsers/IAST_fwd.h>
@ -188,6 +189,8 @@ private:
VolumePtr tmp_volume;
TemporaryDataOnDiskScopePtr tmp_data;
std::shared_ptr<StorageJoin> right_storage_join;
std::shared_ptr<const IKeyValueEntity> right_kv_storage;
@ -233,7 +236,7 @@ private:
public:
TableJoin() = default;
TableJoin(const Settings & settings, VolumePtr tmp_volume_);
TableJoin(const Settings & settings, VolumePtr tmp_volume_, TemporaryDataOnDiskScopePtr tmp_data_);
/// for StorageJoin
TableJoin(SizeLimits limits, bool use_nulls, JoinKind kind, JoinStrictness strictness,
@ -259,6 +262,8 @@ public:
VolumePtr getGlobalTemporaryVolume() { return tmp_volume; }
TemporaryDataOnDiskScopePtr getTempDataOnDisk() { return tmp_data; }
ActionsDAGPtr createJoinedBlockActions(ContextPtr context) const;
const std::vector<JoinAlgorithm> & getEnabledJoinAlgorithms() const { return join_algorithm; }

View File

@ -1,12 +1,11 @@
#include <atomic>
#include <mutex>
#include <Interpreters/TemporaryDataOnDisk.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/ReadBufferFromFile.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Compression/CompressedReadBuffer.h>
#include <Interpreters/Cache/FileCache.h>
#include <Formats/NativeWriter.h>
#include <Formats/NativeReader.h>
#include <Core/ProtocolDefines.h>
#include <Disks/SingleDiskVolume.h>
#include <Disks/DiskLocal.h>
@ -14,6 +13,7 @@
#include <Core/Defines.h>
#include <Interpreters/Cache/WriteBufferToFileSegment.h>
#include "Common/Exception.h"
namespace ProfileEvents
{
@ -224,33 +224,26 @@ struct TemporaryFileStream::OutputWriter
bool finalized = false;
};
struct TemporaryFileStream::InputReader
TemporaryFileStream::Reader::Reader(const String & path, const Block & header_, size_t size)
: in_file_buf(path, size ? std::min<size_t>(DBMS_DEFAULT_BUFFER_SIZE, size) : DBMS_DEFAULT_BUFFER_SIZE)
, in_compressed_buf(in_file_buf)
, in_reader(in_compressed_buf, header_, DBMS_TCP_PROTOCOL_VERSION)
{
InputReader(const String & path, const Block & header_, size_t size = 0)
: in_file_buf(path, size ? std::min<size_t>(DBMS_DEFAULT_BUFFER_SIZE, size) : DBMS_DEFAULT_BUFFER_SIZE)
, in_compressed_buf(in_file_buf)
, in_reader(in_compressed_buf, header_, DBMS_TCP_PROTOCOL_VERSION)
{
LOG_TEST(getLogger("TemporaryFileStream"), "Reading {} from {}", header_.dumpStructure(), path);
}
LOG_TEST(getLogger("TemporaryFileStream"), "Reading {} from {}", header_.dumpStructure(), path);
}
explicit InputReader(const String & path, size_t size = 0)
: in_file_buf(path, size ? std::min<size_t>(DBMS_DEFAULT_BUFFER_SIZE, size) : DBMS_DEFAULT_BUFFER_SIZE)
, in_compressed_buf(in_file_buf)
, in_reader(in_compressed_buf, DBMS_TCP_PROTOCOL_VERSION)
{
LOG_TEST(getLogger("TemporaryFileStream"), "Reading from {}", path);
}
TemporaryFileStream::Reader::Reader(const String & path, size_t size)
: in_file_buf(path, size ? std::min<size_t>(DBMS_DEFAULT_BUFFER_SIZE, size) : DBMS_DEFAULT_BUFFER_SIZE)
, in_compressed_buf(in_file_buf)
, in_reader(in_compressed_buf, DBMS_TCP_PROTOCOL_VERSION)
{
LOG_TEST(getLogger("TemporaryFileStream"), "Reading from {}", path);
}
Block read()
{
return in_reader.read();
}
ReadBufferFromFile in_file_buf;
CompressedReadBuffer in_compressed_buf;
NativeReader in_reader;
};
Block TemporaryFileStream::Reader::read()
{
return in_reader.read();
}
TemporaryFileStream::TemporaryFileStream(TemporaryFileOnDiskHolder file_, const Block & header_, TemporaryDataOnDisk * parent_)
: parent(parent_)
@ -310,6 +303,12 @@ TemporaryFileStream::Stat TemporaryFileStream::finishWriting()
return stat;
}
TemporaryFileStream::Stat TemporaryFileStream::finishWritingAsyncSafe()
{
std::call_once(finish_writing, [this]{ finishWriting(); });
return stat;
}
bool TemporaryFileStream::isWriteFinished() const
{
assert(in_reader == nullptr || out_writer == nullptr);
@ -326,7 +325,7 @@ Block TemporaryFileStream::read()
if (!in_reader)
{
in_reader = std::make_unique<InputReader>(getPath(), header, getSize());
in_reader = std::make_unique<Reader>(getPath(), header, getSize());
}
Block block = in_reader->read();
@ -338,6 +337,17 @@ Block TemporaryFileStream::read()
return block;
}
std::unique_ptr<TemporaryFileStream::Reader> TemporaryFileStream::getReadStream()
{
if (!isWriteFinished())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Writing has been not finished");
if (isEof())
return nullptr;
return std::make_unique<Reader>(getPath(), header, getSize());
}
void TemporaryFileStream::updateAllocAndCheck()
{
assert(out_writer);

View File

@ -1,7 +1,12 @@
#pragma once
#include <atomic>
#include <mutex>
#include <boost/noncopyable.hpp>
#include <IO/ReadBufferFromFile.h>
#include <Compression/CompressedReadBuffer.h>
#include <Formats/NativeReader.h>
#include <Core/Block.h>
#include <Disks/IVolume.h>
#include <Disks/TemporaryFileOnDisk.h>
@ -132,12 +137,25 @@ private:
/*
* Data can be written into this stream and then read.
* After finish writing, call `finishWriting` and then `read` to read the data.
* After writing is finished, call `finishWriting` and then call either `read` or `getReadStream` (only one of the two) to read the data.
* Accounts the amount of data written to disk in the parent scope.
*/
class TemporaryFileStream : boost::noncopyable
{
public:
struct Reader
{
Reader(const String & path, const Block & header_, size_t size = 0);
explicit Reader(const String & path, size_t size = 0);
Block read();
ReadBufferFromFile in_file_buf;
CompressedReadBuffer in_compressed_buf;
NativeReader in_reader;
};
struct Stat
{
/// Statistics for file
@ -154,8 +172,11 @@ public:
void flush();
Stat finishWriting();
Stat finishWritingAsyncSafe();
bool isWriteFinished() const;
std::unique_ptr<Reader> getReadStream();
Block read();
String getPath() const;
@ -184,11 +205,12 @@ private:
Stat stat;
std::once_flag finish_writing;
struct OutputWriter;
std::unique_ptr<OutputWriter> out_writer;
struct InputReader;
std::unique_ptr<InputReader> in_reader;
std::unique_ptr<Reader> in_reader;
};
}

View File

@ -8,6 +8,7 @@
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Common/ClickHouseRevision.h>
#include <Common/SymbolIndex.h>
namespace DB
@ -53,6 +54,18 @@ ColumnsDescription TraceLogElement::getColumnsDescription()
};
}
NamesAndAliases TraceLogElement::getNamesAndAliases()
{
String build_id_hex;
#if defined(__ELF__) && !defined(OS_FREEBSD)
build_id_hex = SymbolIndex::instance().getBuildIDHex();
#endif
return
{
{"build_id", std::make_shared<DataTypeString>(), "\'" + build_id_hex + "\'"},
};
}
void TraceLogElement::appendToBlock(MutableColumns & columns) const
{
size_t i = 0;

View File

@ -39,7 +39,7 @@ struct TraceLogElement
static std::string name() { return "TraceLog"; }
static ColumnsDescription getColumnsDescription();
static NamesAndAliases getNamesAndAliases() { return {}; }
static NamesAndAliases getNamesAndAliases();
void appendToBlock(MutableColumns & columns) const;
};

View File

@ -808,12 +808,14 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
bool is_create_parameterized_view = false;
if (const auto * create_query = ast->as<ASTCreateQuery>())
{
is_create_parameterized_view = create_query->isParameterizedView();
}
else if (const auto * explain_query = ast->as<ASTExplainQuery>())
{
assert(!explain_query->children.empty());
if (const auto * create_of_explain_query = explain_query->children[0]->as<ASTCreateQuery>())
is_create_parameterized_view = create_of_explain_query->isParameterizedView();
if (!explain_query->children.empty())
if (const auto * create_of_explain_query = explain_query->children[0]->as<ASTCreateQuery>())
is_create_parameterized_view = create_of_explain_query->isParameterizedView();
}
/// Replace ASTQueryParameter with ASTLiteral for prepared statements.

View File

@ -51,7 +51,7 @@ public:
{
/// We allow to not hide type of the disk, e.g. disk(type = s3, ...)
/// and also nested disk, e.g. disk = 'disk_name'
return arg_name != "type" && arg_name != "disk";
return arg_name != "type" && arg_name != "disk" && arg_name != "name" ;
};
for (const auto & arg : disk_function_args)

View File

@ -107,6 +107,9 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
if (!columns_p.parse(pos, columns, expected))
return false;
/// Optional trailing comma
ParserToken(TokenType::Comma).ignore(pos);
if (!s_rparen.ignore(pos, expected))
return false;
}

View File

@ -4,20 +4,6 @@
namespace DB
{
Tokens::Tokens(const char * begin, const char * end, size_t max_query_size, bool skip_insignificant)
{
Lexer lexer(begin, end, max_query_size);
bool stop = false;
do
{
Token token = lexer.nextToken();
stop = token.isEnd() || token.type == TokenType::ErrorMaxQuerySizeExceeded;
if (token.isSignificant() || (!skip_insignificant && !data.empty() && data.back().isSignificant()))
data.emplace_back(std::move(token));
} while (!stop);
}
UnmatchedParentheses checkUnmatchedParentheses(TokenIterator begin)
{
/// We have just two kind of parentheses: () and [].

View File

@ -15,25 +15,44 @@ namespace DB
*/
/** Used as an input for parsers.
* All whitespace and comment tokens are transparently skipped.
* All whitespace and comment tokens are transparently skipped if `skip_insignificant`.
*/
class Tokens
{
private:
std::vector<Token> data;
std::size_t last_accessed_index = 0;
Lexer lexer;
bool skip_insignificant;
public:
Tokens(const char * begin, const char * end, size_t max_query_size = 0, bool skip_insignificant = true);
ALWAYS_INLINE inline const Token & operator[](size_t index)
Tokens(const char * begin, const char * end, size_t max_query_size = 0, bool skip_insignificant_ = true)
: lexer(begin, end, max_query_size), skip_insignificant(skip_insignificant_)
{
assert(index < data.size());
last_accessed_index = std::max(last_accessed_index, index);
return data[index];
}
ALWAYS_INLINE inline const Token & max() { return data[last_accessed_index]; }
const Token & operator[] (size_t index)
{
while (true)
{
if (index < data.size())
return data[index];
if (!data.empty() && data.back().isEnd())
return data.back();
Token token = lexer.nextToken();
if (!skip_insignificant || token.isSignificant())
data.emplace_back(token);
}
}
const Token & max()
{
if (data.empty())
return (*this)[0];
return data.back();
}
};
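The Tokens rewrite above switches from eager lexing to lazy, cached lexing: operator[] pulls tokens from the Lexer only when a not-yet-seen index is requested. A self-contained sketch of that pattern with a toy token source (names invented; not the ClickHouse Lexer):
#include <cstddef>
#include <iostream>
#include <string>
#include <vector>
// Toy "lexer" that hands out one character per call and then an empty
// end marker, mimicking Lexer::nextToken() returning an end token.
struct ToyLexer
{
    std::string text;
    size_t pos = 0;
    std::string next()
    {
        if (pos >= text.size())
            return ""; // end token
        return std::string(1, text[pos++]);
    }
};
// Lazily materialized token stream, as in Tokens::operator[] above:
// keep pulling from the lexer until the requested index is cached.
struct LazyTokens
{
    ToyLexer lexer;
    std::vector<std::string> data;
    const std::string & operator[](size_t index)
    {
        while (true)
        {
            if (index < data.size())
                return data[index];
            if (!data.empty() && data.back().empty())
                return data.back(); // already reached the end token
            data.push_back(lexer.next());
        }
    }
};
int main()
{
    LazyTokens tokens{ToyLexer{"abc"}};
    std::cout << tokens[1] << '\n'; // "b"  (lexes "a", "b" on demand)
    std::cout << tokens[5] << '\n'; // ""   (end token, returned for out-of-range)
    std::cout << tokens.data.size() << '\n'; // 4 ("a", "b", "c", "")
}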

View File

@ -36,6 +36,12 @@ public:
void visitImpl(const QueryTreeNodePtr & node)
{
if (const auto * constant_node = node->as<ConstantNode>())
/// Collect sets from source expression as well.
/// Most likely we will not build them, but those sets could be requested during analysis.
if (constant_node->hasSourceExpression())
collectSets(constant_node->getSourceExpression(), planner_context);
auto * function_node = node->as<FunctionNode>();
if (!function_node || !isNameOfInFunction(function_node->getFunctionName()))
return;

View File

@ -1229,8 +1229,9 @@ void Planner::buildQueryPlanIfNeeded()
if (query_plan.isInitialized())
return;
LOG_TRACE(getLogger("Planner"), "Query {} to stage {}{}",
query_tree->formatConvertedASTForErrorMessage(),
LOG_TRACE(
getLogger("Planner"),
"Query to stage {}{}",
QueryProcessingStage::toString(select_query_options.to_stage),
select_query_options.only_analyze ? " only analyze" : "");
@ -1506,8 +1507,9 @@ void Planner::buildPlanForQueryNode()
auto & mapping = join_tree_query_plan.query_node_to_plan_step_mapping;
query_node_to_plan_step_mapping.insert(mapping.begin(), mapping.end());
LOG_TRACE(getLogger("Planner"), "Query {} from stage {} to stage {}{}",
query_tree->formatConvertedASTForErrorMessage(),
LOG_TRACE(
getLogger("Planner"),
"Query from stage {} to stage {}{}",
QueryProcessingStage::toString(from_stage),
QueryProcessingStage::toString(select_query_options.to_stage),
select_query_options.only_analyze ? " only analyze" : "");

View File

@ -1207,7 +1207,7 @@ JoinTreeQueryPlan buildQueryPlanForJoinNode(const QueryTreeNodePtr & join_table_
}
}
auto table_join = std::make_shared<TableJoin>(settings, query_context->getGlobalTemporaryVolume());
auto table_join = std::make_shared<TableJoin>(settings, query_context->getGlobalTemporaryVolume(), query_context->getTempDataOnDisk());
table_join->getTableJoin() = join_node.toASTTableJoin()->as<ASTTableJoin &>();
if (join_constant)

View File

@ -328,7 +328,7 @@ void buildJoinClause(
{
throw Exception(
ErrorCodes::INVALID_JOIN_ON_EXPRESSION,
"JOIN {} join expression contains column from left and right table",
"JOIN {} join expression contains column from left and right table, you may try experimental support of this feature by `SET allow_experimental_join_condition = 1`",
join_node.formatASTForErrorMessage());
}
}
@ -363,7 +363,7 @@ void buildJoinClause(
{
throw Exception(
ErrorCodes::INVALID_JOIN_ON_EXPRESSION,
"JOIN {} join expression contains column from left and right table",
"JOIN {} join expression contains column from left and right table, you may try experimental support of this feature by `SET allow_experimental_join_condition = 1`",
join_node.formatASTForErrorMessage());
}
}

View File

@ -1,24 +1,25 @@
#include <Planner/findQueryForParallelReplicas.h>
#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Processors/QueryPlan/JoinStep.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <Storages/buildQueryTreeForShard.h>
#include <Interpreters/ClusterProxy/executeQuery.h>
#include <Planner/PlannerJoinTree.h>
#include <Planner/Utils.h>
#include <Analyzer/ArrayJoinNode.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/JoinNode.h>
#include <Analyzer/QueryNode.h>
#include <Analyzer/TableNode.h>
#include <Analyzer/UnionNode.h>
#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
#include <Interpreters/ClusterProxy/executeQuery.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Parsers/ASTSubquery.h>
#include <Parsers/queryToString.h>
#include <Planner/PlannerJoinTree.h>
#include <Planner/Utils.h>
#include <Planner/findQueryForParallelReplicas.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/JoinStep.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/StorageDummy.h>
#include <Storages/StorageMaterializedView.h>
#include <Storages/buildQueryTreeForShard.h>
namespace DB
{
@ -316,7 +317,8 @@ static const TableNode * findTableForParallelReplicas(const IQueryTreeNode * que
case QueryTreeNodeType::TABLE:
{
const auto & table_node = query_tree_node->as<TableNode &>();
const auto & storage = table_node.getStorage();
const auto * as_mat_view = typeid_cast<const StorageMaterializedView *>(table_node.getStorage().get());
const auto & storage = as_mat_view ? as_mat_view->getTargetTable() : table_node.getStorage();
if (std::dynamic_pointer_cast<MergeTreeData>(storage) || typeid_cast<const StorageDummy *>(storage.get()))
return &table_node;

View File

@ -0,0 +1,269 @@
#include <Processors/Formats/Impl/NpyOutputFormat.h>
#include <Core/TypeId.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeArray.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnArray.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <Formats/FormatFactory.h>
#include <Common/assert_cast.h>
namespace DB
{
namespace ErrorCodes
{
extern const int TOO_MANY_COLUMNS;
extern const int BAD_ARGUMENTS;
extern const int ILLEGAL_COLUMN;
}
namespace
{
template <typename ColumnType, typename ValueType>
void writeNumpyNumbers(const ColumnPtr & column, WriteBuffer & buf)
{
const auto * number_column = assert_cast<const ColumnType *>(column.get());
for (size_t i = 0; i < number_column->size(); ++i)
writeBinaryLittleEndian(ValueType(number_column->getElement(i)), buf);
}
template <typename ColumnType>
void writeNumpyStrings(const ColumnPtr & column, size_t length, WriteBuffer & buf)
{
const auto * string_column = assert_cast<const ColumnType *>(column.get());
for (size_t i = 0; i < string_column->size(); ++i)
{
auto data = string_column->getDataAt(i);
buf.write(data.data, data.size);
writeChar(0, length - data.size, buf);
}
}
}
String NpyOutputFormat::shapeStr() const
{
WriteBufferFromOwnString shape;
writeIntText(num_rows, shape);
writeChar(',', shape);
for (UInt64 dim : numpy_shape)
{
writeIntText(dim, shape);
writeChar(',', shape);
}
return shape.str();
}
NpyOutputFormat::NpyOutputFormat(WriteBuffer & out_, const Block & header_) : IOutputFormat(header_, out_)
{
const auto & header = getPort(PortKind::Main).getHeader();
auto data_types = header.getDataTypes();
if (data_types.size() > 1)
throw Exception(ErrorCodes::TOO_MANY_COLUMNS, "Expected single column for Npy output format, got {}", data_types.size());
data_type = data_types[0];
if (!getNumpyDataType(data_type))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Type {} is not supported for Npy output format", nested_data_type->getName());
}
bool NpyOutputFormat::getNumpyDataType(const DataTypePtr & type)
{
switch (type->getTypeId())
{
case TypeIndex::Int8:
numpy_data_type = std::make_shared<NumpyDataTypeInt>(NumpyDataType::Endianness::LITTLE, sizeof(Int8), true);
break;
case TypeIndex::Int16:
numpy_data_type = std::make_shared<NumpyDataTypeInt>(NumpyDataType::Endianness::LITTLE, sizeof(Int16), true);
break;
case TypeIndex::Int32:
numpy_data_type = std::make_shared<NumpyDataTypeInt>(NumpyDataType::Endianness::LITTLE, sizeof(Int32), true);
break;
case TypeIndex::Int64:
numpy_data_type = std::make_shared<NumpyDataTypeInt>(NumpyDataType::Endianness::LITTLE, sizeof(Int64), true);
break;
case TypeIndex::UInt8:
numpy_data_type = std::make_shared<NumpyDataTypeInt>(NumpyDataType::Endianness::LITTLE, sizeof(UInt8), false);
break;
case TypeIndex::UInt16:
numpy_data_type = std::make_shared<NumpyDataTypeInt>(NumpyDataType::Endianness::LITTLE, sizeof(UInt16), false);
break;
case TypeIndex::UInt32:
numpy_data_type = std::make_shared<NumpyDataTypeInt>(NumpyDataType::Endianness::LITTLE, sizeof(UInt32), false);
break;
case TypeIndex::UInt64:
numpy_data_type = std::make_shared<NumpyDataTypeInt>(NumpyDataType::Endianness::LITTLE, sizeof(UInt64), false);
break;
case TypeIndex::Float32:
numpy_data_type = std::make_shared<NumpyDataTypeFloat>(NumpyDataType::Endianness::LITTLE, sizeof(Float32));
break;
case TypeIndex::Float64:
numpy_data_type = std::make_shared<NumpyDataTypeFloat>(NumpyDataType::Endianness::LITTLE, sizeof(Float64));
break;
case TypeIndex::FixedString:
numpy_data_type = std::make_shared<NumpyDataTypeString>(
NumpyDataType::Endianness::NONE, assert_cast<const DataTypeFixedString *>(type.get())->getN());
break;
case TypeIndex::String:
numpy_data_type = std::make_shared<NumpyDataTypeString>(NumpyDataType::Endianness::NONE, 0);
break;
case TypeIndex::Array:
return getNumpyDataType(assert_cast<const DataTypeArray *>(type.get())->getNestedType());
default:
nested_data_type = type;
return false;
}
nested_data_type = type;
return true;
}
void NpyOutputFormat::consume(Chunk chunk)
{
if (!invalid_shape)
{
num_rows += chunk.getNumRows();
const auto & column = chunk.getColumns()[0];
if (!is_initialized)
{
initShape(column);
is_initialized = true;
}
ColumnPtr nested_column = column;
checkShape(nested_column);
updateSizeIfTypeString(nested_column);
columns.push_back(nested_column);
}
}
void NpyOutputFormat::initShape(const ColumnPtr & column)
{
ColumnPtr nested_column = column;
while (const auto * array_column = typeid_cast<const ColumnArray *>(nested_column.get()))
{
auto dim = array_column->getOffsets()[0];
invalid_shape = dim == 0;
numpy_shape.push_back(dim);
nested_column = array_column->getDataPtr();
}
if (invalid_shape)
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Shape ({}) is invalid, as dimension size cannot be 0", shapeStr());
}
void NpyOutputFormat::checkShape(ColumnPtr & column)
{
int dim = 0;
while (const auto * array_column = typeid_cast<const ColumnArray *>(column.get()))
{
const auto & array_offset = array_column->getOffsets();
for (size_t i = 0; i < array_offset.size(); ++i)
if (array_offset[i] - array_offset[i - 1] != numpy_shape[dim])
{
invalid_shape = true;
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "ClickHouse doesn't support object types, cannot format ragged nested sequences (which is a list of arrays with different shapes)");
}
column = array_column->getDataPtr();
dim += 1;
}
}
void NpyOutputFormat::updateSizeIfTypeString(const ColumnPtr & column)
{
if (nested_data_type->getTypeId() == TypeIndex::String)
{
const auto & string_offsets = assert_cast<const ColumnString *>(column.get())->getOffsets();
for (size_t i = 0; i < string_offsets.size(); ++i)
{
size_t string_length = static_cast<size_t>(string_offsets[i] - 1 - string_offsets[i - 1]);
if (numpy_data_type->getSize() < string_length)
numpy_data_type->setSize(string_length);
}
}
}
void NpyOutputFormat::finalizeImpl()
{
if (!invalid_shape)
{
writeHeader();
writeColumns();
}
}
void NpyOutputFormat::writeHeader()
{
String dict = "{'descr':'" + numpy_data_type->str() + "','fortran_order':False,'shape':(" + shapeStr() + "),}";
String padding = "\n";
/// Pad the header so that its total length is divisible by 64.
size_t dict_length = dict.length() + 1;
size_t header_length = STATIC_HEADER_LENGTH + sizeof(UInt32) + dict_length;
if (header_length % 64)
{
header_length = ((header_length / 64) + 1) * 64;
dict_length = header_length - STATIC_HEADER_LENGTH - sizeof(UInt32);
padding = std::string(dict_length - dict.length(), '\x20');
padding.back() = '\n';
}
out.write(STATIC_HEADER, STATIC_HEADER_LENGTH);
writeBinaryLittleEndian(static_cast<UInt32>(dict_length), out);
out.write(dict.data(), dict.length());
out.write(padding.data(), padding.length());
}
void NpyOutputFormat::writeColumns()
{
for (const auto & column : columns)
{
switch (nested_data_type->getTypeId())
{
case TypeIndex::Int8: writeNumpyNumbers<ColumnInt8, Int8>(column, out); break;
case TypeIndex::Int16: writeNumpyNumbers<ColumnInt16, Int16>(column, out); break;
case TypeIndex::Int32: writeNumpyNumbers<ColumnInt32, Int32>(column, out); break;
case TypeIndex::Int64: writeNumpyNumbers<ColumnInt64, Int64>(column, out); break;
case TypeIndex::UInt8: writeNumpyNumbers<ColumnUInt8, UInt8>(column, out); break;
case TypeIndex::UInt16: writeNumpyNumbers<ColumnUInt16, UInt16>(column, out); break;
case TypeIndex::UInt32: writeNumpyNumbers<ColumnUInt32, UInt32>(column, out); break;
case TypeIndex::UInt64: writeNumpyNumbers<ColumnUInt64, UInt64>(column, out); break;
case TypeIndex::Float32: writeNumpyNumbers<ColumnFloat32, Float32>(column, out); break;
case TypeIndex::Float64: writeNumpyNumbers<ColumnFloat64, Float64>(column, out); break;
case TypeIndex::FixedString:
writeNumpyStrings<ColumnFixedString>(column, numpy_data_type->getSize(), out);
break;
case TypeIndex::String:
writeNumpyStrings<ColumnString>(column, numpy_data_type->getSize(), out);
break;
default:
break;
}
}
}
void registerOutputFormatNpy(FormatFactory & factory)
{
factory.registerOutputFormat("Npy",[](
WriteBuffer & buf,
const Block & sample,
const FormatSettings &)
{
return std::make_shared<NpyOutputFormat>(buf, sample);
});
factory.markFormatHasNoAppendSupport("Npy");
}
}
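The header produced by writeHeader() above follows the standard .npy version 3.0 layout: an 8-byte magic/version prefix, a 4-byte little-endian length field, the Python-dict metadata string, and space padding ending in '\n' so that the whole header becomes a multiple of 64 bytes; the dict's 'descr' entry carries the numpy dtype string built by NumpyDataType::str() (for a little-endian Int32 column that should be '<i4'). The following standalone sketch is not part of the commit — the example dict and shape are invented — it only reproduces the same padding arithmetic so the 64-byte rounding can be checked in isolation:

// Recomputes the .npy v3.0 header layout used above, for a hypothetical
// Int32 column whose data has shape (3, 2).
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <string>

int main()
{
    const std::string magic("\x93NUMPY\x03\x00", 8);   // 8 bytes: magic string + version 3.0
    const std::string dict = "{'descr':'<i4','fortran_order':False,'shape':(3,2,),}";

    std::size_t dict_length = dict.size() + 1;         // +1 for the mandatory trailing '\n'
    std::size_t header_length = magic.size() + sizeof(std::uint32_t) + dict_length;
    if (header_length % 64)                            // round the whole header up to a multiple of 64
    {
        header_length = (header_length / 64 + 1) * 64;
        dict_length = header_length - magic.size() - sizeof(std::uint32_t);
    }

    std::string padding(dict_length - dict.size(), ' ');
    padding.back() = '\n';

    // The file would start with: magic, dict_length as little-endian uint32, dict, padding.
    std::cout << "dict: " << dict.size() << " bytes, dict + padding: " << dict_length
              << " bytes, total header: " << header_length << " bytes\n";   // 54, 116, 128
}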


@ -0,0 +1,60 @@
#pragma once
#include <Core/Block.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteBufferFromVector.h>
#include <Processors/Formats/IRowOutputFormat.h>
#include <Formats/FormatSettings.h>
#include <Formats/NumpyDataTypes.h>
#include <Columns/IColumn.h>
#include <Common/PODArray_fwd.h>
#include <vector>
#include <string>
namespace DB
{
/** Stream for output data in Npy format.
* https://numpy.org/doc/stable/reference/generated/numpy.lib.format.html
*/
class NpyOutputFormat : public IOutputFormat
{
public:
NpyOutputFormat(WriteBuffer & out_, const Block & header_);
String getName() const override { return "NpyOutputFormat"; }
String getContentType() const override { return "application/octet-stream"; }
private:
String shapeStr() const;
bool getNumpyDataType(const DataTypePtr & type);
void consume(Chunk) override;
void initShape(const ColumnPtr & column);
void checkShape(ColumnPtr & column);
void updateSizeIfTypeString(const ColumnPtr & column);
void finalizeImpl() override;
void writeHeader();
void writeColumns();
bool is_initialized = false;
bool invalid_shape = false;
DataTypePtr data_type;
DataTypePtr nested_data_type;
std::shared_ptr<NumpyDataType> numpy_data_type;
UInt64 num_rows = 0;
std::vector<UInt64> numpy_shape;
Columns columns;
/// static header (version 3.0)
constexpr static auto STATIC_HEADER = "\x93NUMPY\x03\x00";
constexpr static size_t STATIC_HEADER_LENGTH = 8;
};
}


@ -13,10 +13,14 @@ namespace ErrorCodes
{
extern const int INCORRECT_DATA;
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
}
RegexpFieldExtractor::RegexpFieldExtractor(const FormatSettings & format_settings) : regexp(format_settings.regexp.regexp), skip_unmatched(format_settings.regexp.skip_unmatched)
RegexpFieldExtractor::RegexpFieldExtractor(const FormatSettings & format_settings) : regexp_str(format_settings.regexp.regexp), regexp(regexp_str), skip_unmatched(format_settings.regexp.skip_unmatched)
{
if (regexp_str.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "The regular expression is not set for the `Regexp` format. It requires setting the value of the `format_regexp` setting.");
size_t fields_count = regexp.NumberOfCapturingGroups();
matched_fields.resize(fields_count);
re2_arguments.resize(fields_count);
@ -58,8 +62,8 @@ bool RegexpFieldExtractor::parseRow(PeekableReadBuffer & buf)
static_cast<int>(re2_arguments_ptrs.size()));
if (!match && !skip_unmatched)
throw Exception(ErrorCodes::INCORRECT_DATA, "Line \"{}\" doesn't match the regexp.",
std::string(buf.position(), line_to_match));
throw Exception(ErrorCodes::INCORRECT_DATA, "Line \"{}\" doesn't match the regexp: `{}`",
std::string(buf.position(), line_to_match), regexp_str);
buf.position() += line_size;
if (!buf.eof() && !checkChar('\n', buf))
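For context on the constructor change above: RegexpFieldExtractor compiles format_regexp once and extracts one field per RE2 capturing group through the N-argument match API, which is why an empty pattern is now rejected up front and the mismatch error now carries the pattern itself. A minimal standalone sketch of that RE2 usage follows (the pattern and input line are invented for illustration; this is not code from the commit):

// One field per capturing group, matched with RE2::FullMatchN — the same shape
// of API call RegexpFieldExtractor builds its re2_arguments / matched_fields around.
#include <re2/re2.h>

#include <iostream>
#include <string>
#include <vector>

int main()
{
    re2::RE2 regexp("(\\d+)\\s+(\\w+)");                 // stands in for format_regexp
    const int groups = regexp.NumberOfCapturingGroups(); // 2

    std::vector<std::string> fields(groups);
    std::vector<re2::RE2::Arg> args;
    std::vector<const re2::RE2::Arg *> arg_ptrs;
    args.reserve(groups);
    for (int i = 0; i < groups; ++i)
        args.emplace_back(&fields[i]);
    for (const auto & arg : args)
        arg_ptrs.push_back(&arg);

    const std::string line = "42 hello";
    if (re2::RE2::FullMatchN(line, regexp, arg_ptrs.data(), groups))
        std::cout << fields[0] << " / " << fields[1] << '\n';   // 42 / hello
    else
        std::cout << "Line \"" << line << "\" doesn't match the regexp: `"
                  << regexp.pattern() << "`\n";                 // mirrors the improved error message
}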


@ -31,6 +31,7 @@ public:
size_t getNumberOfGroups() const { return regexp.NumberOfCapturingGroups(); }
private:
String regexp_str;
const re2::RE2 regexp;
// The vector of fields extracted from line using regexp.
std::vector<std::string_view> matched_fields;


@ -572,9 +572,16 @@ bool ValuesBlockInputFormat::checkDelimiterAfterValue(size_t column_idx)
skipWhitespaceIfAny(*buf);
if (likely(column_idx + 1 != num_columns))
{
return checkChar(',', *buf);
}
else
{
/// Optional trailing comma.
if (checkChar(',', *buf))
skipWhitespaceIfAny(*buf);
return checkChar(')', *buf);
}
}
bool ValuesBlockInputFormat::shouldDeduceNewTemplate(size_t column_idx)
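The hunk above changes the delimiter check after a row's last value: a comma may now optionally appear before the closing ')'. A standalone sketch of just that rule (helper names are invented; this is not the ClickHouse parser):

// The per-row delimiter rule after the change: after the last column, accept an
// optional ',' (plus whitespace) before the closing ')'.
#include <cctype>
#include <iostream>
#include <string>

static bool checkChar(char expected, const char *& pos, const char * end)
{
    if (pos != end && *pos == expected)
    {
        ++pos;
        return true;
    }
    return false;
}

static void skipWhitespace(const char *& pos, const char * end)
{
    while (pos != end && std::isspace(static_cast<unsigned char>(*pos)))
        ++pos;
}

static bool checkDelimiterAfterLastValue(const char *& pos, const char * end)
{
    skipWhitespace(pos, end);
    if (checkChar(',', pos, end))   // optional trailing comma
        skipWhitespace(pos, end);
    return checkChar(')', pos, end);
}

int main()
{
    for (std::string tail : {" )", ", )", ",)", " , x"})
    {
        const char * pos = tail.data();
        bool ok = checkDelimiterAfterLastValue(pos, tail.data() + tail.size());
        std::cout << "\"" << tail << "\" -> " << (ok ? "ok" : "error") << '\n';   // only the last one fails
    }
}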


@ -59,6 +59,7 @@ public:
const Aggregator::Params & getParams() const { return params; }
const auto & getGroupingSetsParamsList() const { return grouping_sets_params; }
bool isGroupByUseNulls() const { return group_by_use_nulls; }
bool inOrder() const { return !sort_description_for_merging.empty(); }
bool explicitSortingRequired() const { return explicit_sorting_required_for_aggregation_in_order; }


@ -262,10 +262,6 @@ static size_t tryPushDownOverJoinStep(QueryPlan::Node * parent_node, QueryPlan::
{
const auto & left_table_key_name = join_clause.key_names_left[i];
const auto & right_table_key_name = join_clause.key_names_right[i];
if (!join_header.has(left_table_key_name) || !join_header.has(right_table_key_name))
continue;
const auto & left_table_column = left_stream_input_header.getByName(left_table_key_name);
const auto & right_table_column = right_stream_input_header.getByName(right_table_key_name);
@ -338,9 +334,9 @@ static size_t tryPushDownOverJoinStep(QueryPlan::Node * parent_node, QueryPlan::
auto join_filter_push_down_actions = filter->getExpression()->splitActionsForJOINFilterPushDown(filter->getFilterColumnName(),
filter->removesFilterColumn(),
left_stream_available_columns_to_push_down,
left_stream_input_header.getColumnsWithTypeAndName(),
left_stream_input_header,
right_stream_available_columns_to_push_down,
right_stream_input_header.getColumnsWithTypeAndName(),
right_stream_input_header,
equivalent_columns_to_push_down,
equivalent_left_stream_column_to_right_stream_column,
equivalent_right_stream_column_to_left_stream_column);
@ -428,6 +424,9 @@ size_t tryPushDownFilter(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes
/// of the grouping sets, we could not push the filter down.
if (aggregating->isGroupingSets())
{
/// Cannot push down filter if type has been changed.
if (aggregating->isGroupByUseNulls())
return 0;
const auto & actions = filter->getExpression();
const auto & filter_node = actions->findInOutputs(filter->getFilterColumnName());


@ -579,8 +579,6 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes &
auto candidates = getAggregateProjectionCandidates(node, *aggregating, *reading, max_added_blocks, allow_implicit_projections);
const auto & parts = reading->getParts();
const auto & alter_conversions = reading->getAlterConvertionsForParts();
const auto & query_info = reading->getQueryInfo();
const auto metadata = reading->getStorageMetadata();
ContextPtr context = reading->getContext();
@ -592,7 +590,7 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes &
}
else if (!candidates.real.empty())
{
auto ordinary_reading_select_result = reading->selectRangesToRead(parts, alter_conversions);
auto ordinary_reading_select_result = reading->selectRangesToRead();
size_t ordinary_reading_marks = ordinary_reading_select_result->selected_marks;
/// Nothing to read. Ignore projections.


@ -136,12 +136,10 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
NormalProjectionCandidate * best_candidate = nullptr;
const Names & required_columns = reading->getAllColumnNames();
const auto & parts = reading->getParts();
const auto & alter_conversions = reading->getAlterConvertionsForParts();
const auto & query_info = reading->getQueryInfo();
MergeTreeDataSelectExecutor reader(reading->getMergeTreeData());
auto ordinary_reading_select_result = reading->selectRangesToRead(parts, alter_conversions);
auto ordinary_reading_select_result = reading->selectRangesToRead();
size_t ordinary_reading_marks = ordinary_reading_select_result->selected_marks;
/// Nothing to read. Ignore projections.

Some files were not shown because too many files have changed in this diff.