Merge branch 'master' into non-blocking-connect

Kruglov Pavel, 2023-03-24 15:59:40 +01:00, committed by GitHub
commit f3f93dd06c
169 changed files with 5694 additions and 1643 deletions

@ -1 +1 @@
Subproject commit d80af319f5f047067b956b2fe93a6c00038c1e0d
Subproject commit 4bfaeb31dd0ef13f025221f93c138974a3e0a22a

contrib/vectorscan vendored

@ -1 +1 @@
Subproject commit f6250ae3e5a3085000239313ad0689cc1e00cdc2
Subproject commit b4bba94b1a250603b0b198e0394946e32f6c3f30

View File

@ -26,6 +26,7 @@ logging.basicConfig(
total_start_seconds = time.perf_counter()
stage_start_seconds = total_start_seconds
# Thread executor that does not hide an exception that happens during function
# execution, and rethrows it after join()
class SafeThread(Thread):
@ -158,6 +159,7 @@ for e in subst_elems:
available_parameters[name] = values
# Takes parallel lists of templates, substitutes them with all combos of
# parameters. The set of parameters is determined based on the first list.
# Note: keep the order of queries -- sometimes we have DROP IF EXISTS

View File

@ -670,7 +670,6 @@ if args.report == "main":
)
elif args.report == "all-queries":
print((header_template.format()))
add_tested_commits()

View File

@ -10,31 +10,38 @@ import requests
import tempfile
DEFAULT_URL = 'https://clickhouse-datasets.s3.amazonaws.com'
DEFAULT_URL = "https://clickhouse-datasets.s3.amazonaws.com"
AVAILABLE_DATASETS = {
'hits': 'hits_v1.tar',
'visits': 'visits_v1.tar',
"hits": "hits_v1.tar",
"visits": "visits_v1.tar",
}
RETRIES_COUNT = 5
def _get_temp_file_name():
return os.path.join(tempfile._get_default_tempdir(), next(tempfile._get_candidate_names()))
return os.path.join(
tempfile._get_default_tempdir(), next(tempfile._get_candidate_names())
)
def build_url(base_url, dataset):
return os.path.join(base_url, dataset, 'partitions', AVAILABLE_DATASETS[dataset])
return os.path.join(base_url, dataset, "partitions", AVAILABLE_DATASETS[dataset])
def dowload_with_progress(url, path):
logging.info("Downloading from %s to temp path %s", url, path)
for i in range(RETRIES_COUNT):
try:
with open(path, 'wb') as f:
with open(path, "wb") as f:
response = requests.get(url, stream=True)
response.raise_for_status()
total_length = response.headers.get('content-length')
total_length = response.headers.get("content-length")
if total_length is None or int(total_length) == 0:
logging.info("No content-length, will download file without progress")
logging.info(
"No content-length, will download file without progress"
)
f.write(response.content)
else:
dl = 0
@ -46,7 +53,11 @@ def dowload_with_progress(url, path):
if sys.stdout.isatty():
done = int(50 * dl / total_length)
percent = int(100 * float(dl) / total_length)
sys.stdout.write("\r[{}{}] {}%".format('=' * done, ' ' * (50-done), percent))
sys.stdout.write(
"\r[{}{}] {}%".format(
"=" * done, " " * (50 - done), percent
)
)
sys.stdout.flush()
break
except Exception as ex:
@ -56,14 +67,21 @@ def dowload_with_progress(url, path):
if os.path.exists(path):
os.remove(path)
else:
raise Exception("Cannot download dataset from {}, all retries exceeded".format(url))
raise Exception(
"Cannot download dataset from {}, all retries exceeded".format(url)
)
sys.stdout.write("\n")
logging.info("Downloading finished")
def unpack_to_clickhouse_directory(tar_path, clickhouse_path):
logging.info("Will unpack data from temp path %s to clickhouse db %s", tar_path, clickhouse_path)
with tarfile.open(tar_path, 'r') as comp_file:
logging.info(
"Will unpack data from temp path %s to clickhouse db %s",
tar_path,
clickhouse_path,
)
with tarfile.open(tar_path, "r") as comp_file:
comp_file.extractall(path=clickhouse_path)
logging.info("Unpack finished")
@ -72,15 +90,21 @@ if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
parser = argparse.ArgumentParser(
description="Simple tool for dowloading datasets for clickhouse from S3")
description="Simple tool for dowloading datasets for clickhouse from S3"
)
parser.add_argument('--dataset-names', required=True, nargs='+', choices=list(AVAILABLE_DATASETS.keys()))
parser.add_argument('--url-prefix', default=DEFAULT_URL)
parser.add_argument('--clickhouse-data-path', default='/var/lib/clickhouse/')
parser.add_argument(
"--dataset-names",
required=True,
nargs="+",
choices=list(AVAILABLE_DATASETS.keys()),
)
parser.add_argument("--url-prefix", default=DEFAULT_URL)
parser.add_argument("--clickhouse-data-path", default="/var/lib/clickhouse/")
args = parser.parse_args()
datasets = args.dataset_names
logging.info("Will fetch following datasets: %s", ', '.join(datasets))
logging.info("Will fetch following datasets: %s", ", ".join(datasets))
for dataset in datasets:
logging.info("Processing %s", dataset)
temp_archive_path = _get_temp_file_name()
@ -92,10 +116,11 @@ if __name__ == "__main__":
logging.info("Some exception occured %s", str(ex))
raise
finally:
logging.info("Will remove downloaded file %s from filesystem if it exists", temp_archive_path)
logging.info(
"Will remove downloaded file %s from filesystem if it exists",
temp_archive_path,
)
if os.path.exists(temp_archive_path):
os.remove(temp_archive_path)
logging.info("Processing of %s finished", dataset)
logging.info("Fetch finished, enjoy your tables!")

View File

@ -170,6 +170,7 @@ if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]
fi
rg -Fa "<Fatal>" /var/log/clickhouse-server/clickhouse-server.log ||:
rg -A50 -Fa "============" /var/log/clickhouse-server/stderr.log ||:
zstd --threads=0 < /var/log/clickhouse-server/clickhouse-server.log > /test_output/clickhouse-server.log.zst &
# Compress tables.

View File

@ -11,13 +11,14 @@ RUN apt-get update && env DEBIAN_FRONTEND=noninteractive apt-get install --yes \
aspell \
curl \
git \
file \
libxml2-utils \
moreutils \
python3-fuzzywuzzy \
python3-pip \
shellcheck \
yamllint \
&& pip3 install black==22.8.0 boto3 codespell==2.2.1 dohq-artifactory mypy PyGithub unidiff pylint==2.6.2 \
&& pip3 install black==23.1.0 boto3 codespell==2.2.1 dohq-artifactory mypy PyGithub unidiff pylint==2.6.2 \
&& apt-get clean \
&& rm -rf /root/.cache/pip

View File

@ -377,8 +377,9 @@ CREATE TABLE table_name
i32 Int32,
s String,
...
INDEX a (u64 * i32, s) TYPE minmax GRANULARITY 3,
INDEX b (u64 * length(s)) TYPE set(1000) GRANULARITY 4
INDEX idx1 u64 TYPE bloom_filter GRANULARITY 3,
INDEX idx2 u64 * i32 TYPE minmax GRANULARITY 3,
INDEX idx3 u64 * length(s) TYPE set(1000) GRANULARITY 4
) ENGINE = MergeTree()
...
```
@ -386,8 +387,9 @@ CREATE TABLE table_name
Indices from the example can be used by ClickHouse to reduce the amount of data to read from disk in the following queries:
``` sql
SELECT count() FROM table WHERE s < 'z'
SELECT count() FROM table WHERE u64 * i32 == 10 AND u64 * length(s) >= 1234
SELECT count() FROM table WHERE u64 == 10;
SELECT count() FROM table WHERE u64 * i32 >= 1234
SELECT count() FROM table WHERE u64 * length(s) == 1234
```
Data skipping indexes can also be created on composite columns:
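As a hedged sketch (hypothetical table and column names, assuming a `Map(String, String)` column), such an index might be added like this:
``` sql
-- Hypothetical: a data skipping index over the keys of a Map(String, String) column.
ALTER TABLE my_table ADD INDEX attrs_key_idx mapKeys(attrs) TYPE bloom_filter GRANULARITY 1;
```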

View File

@ -1,7 +0,0 @@
position: 1
label: 'Example Datasets'
collapsible: true
collapsed: true
link:
type: doc
id: en/getting-started/example-datasets/

View File

@ -154,7 +154,7 @@ Arrays are written as a list of comma-separated values in square brackets. Numbe
In input data, ENUM values can be represented as names or as ids. First, we try to match the input value to the ENUM name. If we fail and the input value is a number, we try to match this number to ENUM id.
If input data contains only ENUM ids, it's recommended to enable the setting [input_format_tsv_enum_as_number](/docs/en/operations/settings/settings-formats.md/#input_format_tsv_enum_as_number) to optimize ENUM parsing.
Each element of [Nested](/docs/en/sql-reference/data-types/nested-data-structures/nested.md) structures is represented as an array.
Each element of [Nested](/docs/en/sql-reference/data-types/nested-data-structures/index.md) structures is represented as an array.
For example:
@ -1150,7 +1150,7 @@ Any set of bytes can be output in the strings. Use the `JSONEachRow` format if y
### Usage of Nested Structures {#jsoneachrow-nested}
If you have a table with [Nested](/docs/en/sql-reference/data-types/nested-data-structures/nested.md) data type columns, you can insert JSON data with the same structure. Enable this feature with the [input_format_import_nested_json](/docs/en/operations/settings/settings-formats.md/#input_format_import_nested_json) setting.
If you have a table with [Nested](/docs/en/sql-reference/data-types/nested-data-structures/index.md) data type columns, you can insert JSON data with the same structure. Enable this feature with the [input_format_import_nested_json](/docs/en/operations/settings/settings-formats.md/#input_format_import_nested_json) setting.
For example, consider the following table:
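A minimal sketch (hypothetical table and column names) of such a table and an insert with `input_format_import_nested_json` enabled:
``` sql
SET input_format_import_nested_json = 1;

CREATE TABLE json_nested (n Nested(s String, i Int32)) ENGINE = Memory;

-- Each nested JSON object becomes the parallel arrays n.s and n.i.
INSERT INTO json_nested FORMAT JSONEachRow {"n": {"s": ["abc", "def"], "i": [1, 23]}}
```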
@ -1776,7 +1776,7 @@ message MessageType {
```
ClickHouse tries to find a column named `x.y.z` (or `x_y_z` or `X.y_Z` and so on).
Nested messages are suitable for input or output of [nested data structures](/docs/en/sql-reference/data-types/nested-data-structures/nested.md).
Nested messages are suitable for input or output of [nested data structures](/docs/en/sql-reference/data-types/nested-data-structures/index.md).
Default values defined in a protobuf schema like this
@ -1978,7 +1978,7 @@ To exchange data with Hadoop, you can use [HDFS table engine](/docs/en/engines/t
- [output_format_parquet_row_group_size](/docs/en/operations/settings/settings-formats.md/#output_format_parquet_row_group_size) - row group size in rows while data output. Default value - `1000000`.
- [output_format_parquet_string_as_string](/docs/en/operations/settings/settings-formats.md/#output_format_parquet_string_as_string) - use Parquet String type instead of Binary for String columns. Default value - `false`.
- [input_format_parquet_import_nested](/docs/en/operations/settings/settings-formats.md/#input_format_parquet_import_nested) - allow inserting array of structs into [Nested](/docs/en/sql-reference/data-types/nested-data-structures/nested.md) table in Parquet input format. Default value - `false`.
- [input_format_parquet_import_nested](/docs/en/operations/settings/settings-formats.md/#input_format_parquet_import_nested) - allow inserting array of structs into [Nested](/docs/en/sql-reference/data-types/nested-data-structures/index.md) table in Parquet input format. Default value - `false`.
- [input_format_parquet_case_insensitive_column_matching](/docs/en/operations/settings/settings-formats.md/#input_format_parquet_case_insensitive_column_matching) - ignore case when matching Parquet columns with ClickHouse columns. Default value - `false`.
- [input_format_parquet_allow_missing_columns](/docs/en/operations/settings/settings-formats.md/#input_format_parquet_allow_missing_columns) - allow missing columns while reading Parquet data. Default value - `false`.
- [input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference](/docs/en/operations/settings/settings-formats.md/#input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference) - allow skipping columns with unsupported types while schema inference for Parquet format. Default value - `false`.

View File

@ -6,7 +6,7 @@ keywords: [clickhouse, network, interfaces, http, tcp, grpc, command-line, clien
description: ClickHouse provides three network interfaces
---
# Interfaces
# Drivers and Interfaces
ClickHouse provides three network interfaces (they can be optionally wrapped in TLS for additional security):

View File

@ -331,7 +331,7 @@ It is also possible to `BACKUP`/`RESTORE` to S3 by configuring an S3 disk in the
<s3>
<volumes>
<main>
<disk>s3</disk>
<disk>s3_plain</disk>
</main>
</volumes>
</s3>

View File

@ -964,7 +964,7 @@ Default value: 1.
### input_format_arrow_import_nested {#input_format_arrow_import_nested}
Enables or disables the ability to insert the data into [Nested](../../sql-reference/data-types/nested-data-structures/nested.md) columns as an array of structs in [Arrow](../../interfaces/formats.md/#data_types-matching-arrow) input format.
Enables or disables the ability to insert the data into [Nested](../../sql-reference/data-types/nested-data-structures/index.md) columns as an array of structs in [Arrow](../../interfaces/formats.md/#data_types-matching-arrow) input format.
Possible values:
@ -1024,7 +1024,7 @@ Default value: `none`.
### input_format_orc_import_nested {#input_format_orc_import_nested}
Enables or disables the ability to insert the data into [Nested](../../sql-reference/data-types/nested-data-structures/nested.md) columns as an array of structs in [ORC](../../interfaces/formats.md/#data-format-orc) input format.
Enables or disables the ability to insert the data into [Nested](../../sql-reference/data-types/nested-data-structures/index.md) columns as an array of structs in [ORC](../../interfaces/formats.md/#data-format-orc) input format.
Possible values:
@ -1073,7 +1073,7 @@ Default value: `none`.
### input_format_parquet_import_nested {#input_format_parquet_import_nested}
Enables or disables the ability to insert the data into [Nested](../../sql-reference/data-types/nested-data-structures/nested.md) columns as an array of structs in [Parquet](../../interfaces/formats.md/#data-format-parquet) input format.
Enables or disables the ability to insert the data into [Nested](../../sql-reference/data-types/nested-data-structures/index.md) columns as an array of structs in [Parquet](../../interfaces/formats.md/#data-format-parquet) input format.
Possible values:
@ -1538,6 +1538,6 @@ Default value: `1GiB`.
### input_format_native_allow_types_conversion {#input_format_native_allow_types_conversion}
Allow types conversion in Native input format between columns from input data and requested columns.
Allow types conversion in Native input format between columns from input data and requested columns.
Enabled by default.
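A hedged usage sketch (only the setting name comes from this section):
``` sql
-- Allow the types in Native input data to be converted to the requested column types.
SET input_format_native_allow_types_conversion = 1;
```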

View File

@ -3438,7 +3438,7 @@ Default value: `throw`.
## flatten_nested {#flatten-nested}
Sets the data format of [nested](../../sql-reference/data-types/nested-data-structures/nested.md) columns.
Sets the data format of [nested](../../sql-reference/data-types/nested-data-structures/index.md) columns.
Possible values:

View File

@ -1,7 +1,7 @@
---
slug: /en/operations/utilities/
sidebar_position: 56
sidebar_label: Utilities
sidebar_label: List of tools and utilities
pagination_next: 'en/operations/utilities/clickhouse-copier'
---

View File

@ -1,13 +1,33 @@
---
slug: /en/sql-reference/data-types/
sidebar_label: Data Types
sidebar_label: List of data types
sidebar_position: 37
---
# Data Types
# ClickHouse Data Types
ClickHouse can store various kinds of data in table cells.
ClickHouse can store various kinds of data in table cells. This section describes the supported data types and any special considerations for using and/or implementing them.
This section describes the supported data types and any special considerations for using and/or implementing them.
:::note
You can check whether a data type name is case-sensitive in the [system.data_type_families](../../operations/system-tables/data_type_families.md#system_tables-data_type_families) table.
:::
You can check whether data type name is case-sensitive in the [system.data_type_families](../../operations/system-tables/data_type_families.md#system_tables-data_type_families) table.
ClickHouse data types include:
- **Integer types**: [signed and unsigned integers](./int-uint.md) (`UInt8`, `UInt16`, `UInt32`, `UInt64`, `UInt128`, `UInt256`, `Int8`, `Int16`, `Int32`, `Int64`, `Int128`, `Int256`)
- **Floating-point numbers**: [floats](./float.md) (`Float32` and `Float64`) and [`Decimal` values](./decimal.md)
- **Boolean**: ClickHouse has a [`Boolean` type](./boolean.md)
- **Strings**: [`String`](./string.md) and [`FixedString`](./fixedstring.md)
- **Dates**: use [`Date`](./date.md) and [`Date32`](./date32.md) for days, and [`DateTime`](./datetime.md) and [`DateTime64`](./datetime64.md) for instants in time
- **JSON**: the [`JSON` object](./json.md) stores a JSON document in a single column
- **UUID**: a performant option for storing [`UUID` values](./uuid.md)
- **Low cardinality types**: use an [`Enum`](./enum.md) when you have a handful of unique values, or use [`LowCardinality`](./lowcardinality.md) when you have up to 10,000 unique values of a column
- **Arrays**: any column can be defined as an [`Array` of values](./array.md)
- **Maps**: use [`Map`](./map.md) for storing key/value pairs
- **Aggregation function types**: use [`SimpleAggregateFunction`](./simpleaggregatefunction.md) and [`AggregateFunction`](./aggregatefunction.md) for storing the intermediate status of aggregate function results
- **Nested data structures**: A [`Nested` data structure](./nested-data-structures/index.md) is like a table inside a cell
- **Tuples**: A [`Tuple` of elements](./tuple.md), each having an individual type.
- **Nullable**: [`Nullable`](./nullable.md) allows you to store a value as `NULL` when a value is "missing" (instead of the column getting its default value for the data type)
- **IP addresses**: use [`IPv4`](./domains/ipv4.md) and [`IPv6`](./domains/ipv6.md) to efficiently store IP addresses
- **Geo types**: for [geographical data](./geo.md), including `Point`, `Ring`, `Polygon` and `MultiPolygon`
- **Special data types**: including [`Expression`](./special-data-types/expression.md), [`Set`](./special-data-types/set.md), [`Nothing`](./special-data-types/nothing.md) and [`Interval`](./special-data-types/interval.md)
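To make this list concrete, here is a hedged sketch (hypothetical table and column names, not part of the original page) combining several of the types above in a single table definition:
``` sql
-- Illustrative only: one table using several of the data types listed above.
CREATE TABLE example_types
(
    id UInt64,                              -- unsigned integer
    name String,                            -- string
    price Decimal(18, 2),                   -- decimal number
    created DateTime,                       -- instant in time
    tags Array(LowCardinality(String)),     -- array of low-cardinality strings
    attrs Map(String, String),              -- key/value pairs
    comment Nullable(String)                -- may be NULL
)
ENGINE = MergeTree
ORDER BY id;
```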

View File

@ -1,7 +1,105 @@
---
slug: /en/sql-reference/data-types/nested-data-structures/
sidebar_label: Nested Data Structures
sidebar_position: 54
slug: /en/sql-reference/data-types/nested-data-structures/nested
sidebar_position: 57
sidebar_label: Nested(Name1 Type1, Name2 Type2, ...)
---
# Nested Data Structures
# Nested
## Nested(name1 Type1, Name2 Type2, …)
A nested data structure is like a table inside a cell. The parameters of a nested data structure (the column names and types) are specified the same way as in a [CREATE TABLE](../../../sql-reference/statements/create/table.md) query. Each table row can correspond to any number of rows in a nested data structure.
Example:
``` sql
CREATE TABLE test.visits
(
CounterID UInt32,
StartDate Date,
Sign Int8,
IsNew UInt8,
VisitID UInt64,
UserID UInt64,
...
Goals Nested
(
ID UInt32,
Serial UInt32,
EventTime DateTime,
Price Int64,
OrderID String,
CurrencyID UInt32
),
...
) ENGINE = CollapsingMergeTree(StartDate, intHash32(UserID), (CounterID, StartDate, intHash32(UserID), VisitID), 8192, Sign)
```
This example declares the `Goals` nested data structure, which contains data about conversions (goals reached). Each row in the visits table can correspond to zero or any number of conversions.
When [flatten_nested](../../../operations/settings/settings.md#flatten-nested) is set to `0` (which is not the default), arbitrary levels of nesting are supported.
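A minimal sketch of this behavior (hypothetical table name):
``` sql
SET flatten_nested = 0;

-- With flatten_nested = 0 the Nested column is kept as a single column,
-- so a Nested column may itself contain another Nested column.
CREATE TABLE nested_levels
(
    n Nested(a UInt32, inner Nested(b UInt32, c String))
) ENGINE = Memory;
```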
In most cases, when working with a nested data structure, its columns are specified with column names separated by a dot. These columns make up an array of matching types. All the column arrays of a single nested data structure have the same length.
Example:
``` sql
SELECT
Goals.ID,
Goals.EventTime
FROM test.visits
WHERE CounterID = 101500 AND length(Goals.ID) < 5
LIMIT 10
```
``` text
┌─Goals.ID───────────────────────┬─Goals.EventTime───────────────────────────────────────────────────────────────────────────┐
│ [1073752,591325,591325] │ ['2014-03-17 16:38:10','2014-03-17 16:38:48','2014-03-17 16:42:27'] │
│ [1073752] │ ['2014-03-17 00:28:25'] │
│ [1073752] │ ['2014-03-17 10:46:20'] │
│ [1073752,591325,591325,591325] │ ['2014-03-17 13:59:20','2014-03-17 22:17:55','2014-03-17 22:18:07','2014-03-17 22:18:51'] │
│ [] │ [] │
│ [1073752,591325,591325] │ ['2014-03-17 11:37:06','2014-03-17 14:07:47','2014-03-17 14:36:21'] │
│ [] │ [] │
│ [] │ [] │
│ [591325,1073752] │ ['2014-03-17 00:46:05','2014-03-17 00:46:05'] │
│ [1073752,591325,591325,591325] │ ['2014-03-17 13:28:33','2014-03-17 13:30:26','2014-03-17 18:51:21','2014-03-17 18:51:45'] │
└────────────────────────────────┴───────────────────────────────────────────────────────────────────────────────────────────┘
```
It is easiest to think of a nested data structure as a set of multiple column arrays of the same length.
The only place where a SELECT query can specify the name of an entire nested data structure instead of individual columns is the ARRAY JOIN clause. For more information, see “ARRAY JOIN clause”. Example:
``` sql
SELECT
Goal.ID,
Goal.EventTime
FROM test.visits
ARRAY JOIN Goals AS Goal
WHERE CounterID = 101500 AND length(Goals.ID) < 5
LIMIT 10
```
``` text
┌─Goal.ID─┬──────Goal.EventTime─┐
│ 1073752 │ 2014-03-17 16:38:10 │
│ 591325 │ 2014-03-17 16:38:48 │
│ 591325 │ 2014-03-17 16:42:27 │
│ 1073752 │ 2014-03-17 00:28:25 │
│ 1073752 │ 2014-03-17 10:46:20 │
│ 1073752 │ 2014-03-17 13:59:20 │
│ 591325 │ 2014-03-17 22:17:55 │
│ 591325 │ 2014-03-17 22:18:07 │
│ 591325 │ 2014-03-17 22:18:51 │
│ 1073752 │ 2014-03-17 11:37:06 │
└─────────┴─────────────────────┘
```
You can't perform SELECT for an entire nested data structure. You can only explicitly list individual columns that are part of it.
For an INSERT query, you should pass all the component column arrays of a nested data structure separately (as if they were individual column arrays). During insertion, the system checks that they have the same length.
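A hedged illustration of such an INSERT (the values are invented and only some of the table's columns are listed; the component arrays of `Goals` in one row must have equal lengths):
``` sql
INSERT INTO test.visits
    (CounterID, StartDate, Sign, IsNew, VisitID, UserID,
     `Goals.ID`, `Goals.Serial`, `Goals.EventTime`, `Goals.Price`, `Goals.OrderID`, `Goals.CurrencyID`)
VALUES
    (101500, '2014-03-17', 1, 0, 1, 1,
     [1073752, 591325], [1, 2],
     ['2014-03-17 16:38:10', '2014-03-17 16:38:48'],
     [0, 0], ['', ''], [643, 643]);
```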
For a DESCRIBE query, the columns in a nested data structure are listed separately in the same way.
The ALTER query for elements in a nested data structure has limitations.

View File

@ -1,105 +0,0 @@
---
slug: /en/sql-reference/data-types/nested-data-structures/nested
sidebar_position: 57
sidebar_label: Nested(Name1 Type1, Name2 Type2, ...)
---
# Nested
## Nested(name1 Type1, Name2 Type2, …)
A nested data structure is like a table inside a cell. The parameters of a nested data structure (the column names and types) are specified the same way as in a [CREATE TABLE](../../../sql-reference/statements/create/table.md) query. Each table row can correspond to any number of rows in a nested data structure.
Example:
``` sql
CREATE TABLE test.visits
(
CounterID UInt32,
StartDate Date,
Sign Int8,
IsNew UInt8,
VisitID UInt64,
UserID UInt64,
...
Goals Nested
(
ID UInt32,
Serial UInt32,
EventTime DateTime,
Price Int64,
OrderID String,
CurrencyID UInt32
),
...
) ENGINE = CollapsingMergeTree(StartDate, intHash32(UserID), (CounterID, StartDate, intHash32(UserID), VisitID), 8192, Sign)
```
This example declares the `Goals` nested data structure, which contains data about conversions (goals reached). Each row in the visits table can correspond to zero or any number of conversions.
When [flatten_nested](../../../operations/settings/settings.md#flatten-nested) is set to `0` (which is not the default), arbitrary levels of nesting are supported.
In most cases, when working with a nested data structure, its columns are specified with column names separated by a dot. These columns make up an array of matching types. All the column arrays of a single nested data structure have the same length.
Example:
``` sql
SELECT
Goals.ID,
Goals.EventTime
FROM test.visits
WHERE CounterID = 101500 AND length(Goals.ID) < 5
LIMIT 10
```
``` text
┌─Goals.ID───────────────────────┬─Goals.EventTime───────────────────────────────────────────────────────────────────────────┐
│ [1073752,591325,591325] │ ['2014-03-17 16:38:10','2014-03-17 16:38:48','2014-03-17 16:42:27'] │
│ [1073752] │ ['2014-03-17 00:28:25'] │
│ [1073752] │ ['2014-03-17 10:46:20'] │
│ [1073752,591325,591325,591325] │ ['2014-03-17 13:59:20','2014-03-17 22:17:55','2014-03-17 22:18:07','2014-03-17 22:18:51'] │
│ [] │ [] │
│ [1073752,591325,591325] │ ['2014-03-17 11:37:06','2014-03-17 14:07:47','2014-03-17 14:36:21'] │
│ [] │ [] │
│ [] │ [] │
│ [591325,1073752] │ ['2014-03-17 00:46:05','2014-03-17 00:46:05'] │
│ [1073752,591325,591325,591325] │ ['2014-03-17 13:28:33','2014-03-17 13:30:26','2014-03-17 18:51:21','2014-03-17 18:51:45'] │
└────────────────────────────────┴───────────────────────────────────────────────────────────────────────────────────────────┘
```
It is easiest to think of a nested data structure as a set of multiple column arrays of the same length.
The only place where a SELECT query can specify the name of an entire nested data structure instead of individual columns is the ARRAY JOIN clause. For more information, see “ARRAY JOIN clause”. Example:
``` sql
SELECT
Goal.ID,
Goal.EventTime
FROM test.visits
ARRAY JOIN Goals AS Goal
WHERE CounterID = 101500 AND length(Goals.ID) < 5
LIMIT 10
```
``` text
┌─Goal.ID─┬──────Goal.EventTime─┐
│ 1073752 │ 2014-03-17 16:38:10 │
│ 591325 │ 2014-03-17 16:38:48 │
│ 591325 │ 2014-03-17 16:42:27 │
│ 1073752 │ 2014-03-17 00:28:25 │
│ 1073752 │ 2014-03-17 10:46:20 │
│ 1073752 │ 2014-03-17 13:59:20 │
│ 591325 │ 2014-03-17 22:17:55 │
│ 591325 │ 2014-03-17 22:18:07 │
│ 591325 │ 2014-03-17 22:18:51 │
│ 1073752 │ 2014-03-17 11:37:06 │
└─────────┴─────────────────────┘
```
You can't perform SELECT for an entire nested data structure. You can only explicitly list individual columns that are part of it.
For an INSERT query, you should pass all the component column arrays of a nested data structure separately (as if they were individual column arrays). During insertion, the system checks that they have the same length.
For a DESCRIBE query, the columns in a nested data structure are listed separately in the same way.
The ALTER query for elements in a nested data structure has limitations.

View File

@ -1232,12 +1232,14 @@ SELECT timeSlots(toDateTime64('1980-12-12 21:01:02.1234', 4, 'UTC'), toDecimal64
└───────────────────────────────────────────────────────────────────────────────────────────────────────────┘
```
## formatDateTime
## formatDateTime {#date_time_functions-formatDateTime}
Formats a Time according to the given Format string. Format is a constant expression, so you cannot have multiple formats for a single result column.
formatDateTime uses the MySQL datetime format style; refer to https://dev.mysql.com/doc/refman/8.0/en/date-and-time-functions.html#function_date-format.
The opposite operation of this function is [parseDateTime](/docs/en/sql-reference/functions/type-conversion-functions.md#type_conversion_functions-parseDateTime).
Alias: `DATE_FORMAT`.
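A short, hedged usage sketch (the format string is chosen only for illustration; the full syntax and the table of replacement fields follow below):
``` sql
-- Formats a DateTime using MySQL-style specifiers (%i is the minute).
SELECT formatDateTime(toDateTime('2018-01-02 22:33:44'), '%Y-%m-%d %H:%i:%S');
```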
**Syntax**
@ -1257,7 +1259,7 @@ Using replacement fields, you can define a pattern for the resulting string. “
|----------|---------------------------------------------------------|------------|
| %a | abbreviated weekday name (Mon-Sun) | Mon |
| %b | abbreviated month name (Jan-Dec) | Jan |
| %c | month as a decimal number (01-12) | 01 |
| %c | month as an integer number (01-12) | 01 |
| %C | year divided by 100 and truncated to integer (00-99) | 20 |
| %d | day of the month, zero-padded (01-31) | 02 |
| %D | Short MM/DD/YY date, equivalent to %m/%d/%y | 01/02/18 |
@ -1273,7 +1275,7 @@ Using replacement fields, you can define a pattern for the resulting string. “
| %j | day of the year (001-366) | 002 |
| %k | hour in 24h format (00-23) | 22 |
| %l | hour in 12h format (01-12) | 09 |
| %m | month as a decimal number (01-12) | 01 |
| %m | month as an integer number (01-12) | 01 |
| %M | minute (00-59) | 33 |
| %n | new-line character () | |
| %p | AM or PM designation | PM |
@ -1286,7 +1288,7 @@ Using replacement fields, you can define a pattern for the resulting string. “
| %T | ISO 8601 time format (HH:MM:SS), equivalent to %H:%M:%S | 22:33:44 |
| %u | ISO 8601 weekday as number with Monday as 1 (1-7) | 2 |
| %V | ISO 8601 week number (01-53) | 01 |
| %w | weekday as a decimal number with Sunday as 0 (0-6) | 2 |
| %w | weekday as an integer number with Sunday as 0 (0-6) | 2 |
| %W | full weekday name (Monday-Sunday) | Monday |
| %y | Year, last two digits (00-99) | 18 |
| %Y | Year | 2018 |
@ -1328,10 +1330,11 @@ Result:
- [formatDateTimeInJodaSyntax](##formatDateTimeInJodaSyntax)
## formatDateTimeInJodaSyntax
## formatDateTimeInJodaSyntax {#date_time_functions-formatDateTimeInJodaSyntax}
Similar to formatDateTime, except that it formats datetime in Joda style instead of MySQL style. Refer to https://joda-time.sourceforge.net/apidocs/org/joda/time/format/DateTimeFormat.html.
The opposite operation of this function is [parseDateTimeInJodaSyntax](/docs/en/sql-reference/functions/type-conversion-functions.md#type_conversion_functions-parseDateTimeInJodaSyntax).
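A short, hedged usage sketch (the pattern is chosen only for illustration; the supported replacement fields are listed below):
``` sql
-- Formats a DateTime using a Joda-style pattern.
SELECT formatDateTimeInJodaSyntax(toDateTime('2023-02-24 14:53:31'), 'yyyy-MM-dd HH:mm:ss');
```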
**Replacement fields**

View File

@ -1148,6 +1148,85 @@ Result:
└───────────────────────────┴──────────────────────────────┘
```
## parseDateTime {#type_conversion_functions-parseDateTime}
Converts a [String](/docs/en/sql-reference/data-types/string.md) to [DateTime](/docs/en/sql-reference/data-types/datetime.md) according to a [MySQL format string](https://dev.mysql.com/doc/refman/8.0/en/date-and-time-functions.html#function_date-format).
This function is the opposite operation of function [formatDateTime](/docs/en/sql-reference/functions/date-time-functions.md#date_time_functions-formatDateTime).
**Syntax**
``` sql
parseDateTime(str, format[, timezone])
```
**Arguments**
- `str` — the String to be parsed
- `format` — the format string
- `timezone` — [Timezone](/docs/en/operations/server-configuration-parameters/settings.md/#server_configuration_parameters-timezone). Optional.
**Returned value(s)**
Returns DateTime values parsed from input string according to a MySQL style format string.
**Supported format specifiers**
All format specifiers listed in [formatDateTime](/docs/en/sql-reference/functions/date-time-functions.md#date_time_functions-formatDateTime) except:
- %f: fractional second
- %Q: Quarter (1-4)
**Example**
``` sql
SELECT parseDateTime('2021-01-04+23:00:00', '%Y-%m-%d+%H:%i:%s')
┌─parseDateTime('2021-01-04+23:00:00', '%Y-%m-%d+%H:%i:%s')─┐
│ 2021-01-04 23:00:00 │
└───────────────────────────────────────────────────────────┘
```
Alias: `TO_TIMESTAMP`.
## parseDateTimeInJodaSyntax {#type_conversion_functions-parseDateTimeInJodaSyntax}
Similar to [parseDateTime](#parsedatetime), except that the format string is in [Joda](https://joda-time.sourceforge.net/apidocs/org/joda/time/format/DateTimeFormat.html) instead of MySQL syntax.
This function is the opposite operation of function [formatDateTimeInJodaSyntax](/docs/en/sql-reference/functions/date-time-functions.md#date_time_functions-formatDateTimeInJodaSyntax).
**Syntax**
``` sql
parseDateTimeInJodaSyntax(str, format[, timezone])
```
**Arguments**
- `str` — the String to be parsed
- `format` — the format string
- `timezone` — [Timezone](/docs/en/operations/server-configuration-parameters/settings.md/#server_configuration_parameters-timezone). Optional.
**Returned value(s)**
Returns DateTime values parsed from input string according to a Joda style format.
**Supported format specifiers**
All format specifiers listed in [formatDateTimeInJoda](/docs/en/sql-reference/functions/date-time-functions.md#date_time_functions-formatDateTime) are supported, except:
- S: fraction of second
- z: time zone
- Z: time zone offset/id
**Example**
``` sql
SELECT parseDateTimeInJodaSyntax('2023-02-24 14:53:31', 'yyyy-MM-dd HH:mm:ss', 'Europe/Minsk')
┌─parseDateTimeInJodaSyntax('2023-02-24 14:53:31', 'yyyy-MM-dd HH:mm:ss', 'Europe/Minsk')─┐
│ 2023-02-24 14:53:31 │
└─────────────────────────────────────────────────────────────────────────────────────────┘
```
## parseDateTimeBestEffort
## parseDateTime32BestEffort
@ -1351,7 +1430,6 @@ Same as for [parseDateTime64BestEffort](#parsedatetime64besteffort), except that
Same as for [parseDateTime64BestEffort](#parsedatetime64besteffort), except that this function prefers US date format (`MM/DD/YYYY` etc.) in case of ambiguity and returns zero date or zero date time when it encounters a date format that cannot be processed.
## toLowCardinality
Converts input parameter to the [LowCardinality](/docs/en/sql-reference/data-types/lowcardinality.md) version of same data type.

View File

@ -24,9 +24,9 @@ The `DESCRIBE` statement returns a row for each table column with the following
- `ttl_expression` — A [TTL](../../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-ttl) expression.
- `is_subcolumn` — A flag that equals `1` for internal subcolumns. It is included into the result only if subcolumn description is enabled by the [describe_include_subcolumns](../../operations/settings/settings.md#describe_include_subcolumns) setting.
All columns in [Nested](../../sql-reference/data-types/nested-data-structures/nested.md) data structures are described separately. The name of each column is prefixed with a parent column name and a dot.
All columns in [Nested](../../sql-reference/data-types/nested-data-structures/index.md) data structures are described separately. The name of each column is prefixed with a parent column name and a dot.
To show internal subcolumns of other data types, use the [describe_include_subcolumns](../../operations/settings/settings.md#describe_include_subcolumns) setting.
To show internal subcolumns of other data types, use the [describe_include_subcolumns](../../operations/settings/settings.md#describe_include_subcolumns) setting.
**Example**

View File

@ -1,10 +1,10 @@
---
slug: /en/sql-reference/statements/
sidebar_position: 1
sidebar_label: Statements
sidebar_label: List of statements
---
# ClickHouse SQL Statements
# ClickHouse SQL Statements
Statements represent various kinds of action you can perform using SQL queries. Each kind of statement has its own syntax and usage details that are described separately:

View File

@ -185,7 +185,7 @@ SETTINGS enable_unaligned_array_join = 1;
## ARRAY JOIN with Nested Data Structure
`ARRAY JOIN` also works with [nested data structures](../../../sql-reference/data-types/nested-data-structures/nested.md):
`ARRAY JOIN` also works with [nested data structures](../../../sql-reference/data-types/nested-data-structures/index.md):
``` sql
CREATE TABLE nested_test

View File

@ -97,7 +97,7 @@ CREATE DATABASE mysql ENGINE = MaterializeMySQL('localhost:3306', 'db', 'user',
### DDL Queries {#ddl-queries}
MySQL DDL queries are converted into the corresponding ClickHouse DDL queries ([ALTER](../../sql-reference/statements/alter/index.md), [CREATE](../../sql-reference/statements/create/index.md), [DROP](../../sql-reference/statements/drop.md), [RENAME](../../sql-reference/statements/rename.md)). If ClickHouse cannot parse a DDL query, the query is ignored.
MySQL DDL queries are converted into the corresponding ClickHouse DDL queries ([ALTER](../../sql-reference/statements/alter/index.md), [CREATE](../../sql-reference/statements/create.md), [DROP](../../sql-reference/statements/drop.md), [RENAME](../../sql-reference/statements/rename.md)). If ClickHouse cannot parse a DDL query, the query is ignored.
### Data Replication {#data-replication}

View File

@ -109,7 +109,7 @@ The Time type in MySQL is converted by ClickHouse to microseconds for storage
### DDL Queries {#ddl-queries}
MySQL DDL statements are converted into the corresponding ClickHouse DDL statements, for example: ([ALTER](../../sql-reference/statements/alter/index.md), [CREATE](../../sql-reference/statements/create/index.md), [DROP](../../sql-reference/statements/drop.md), [RENAME](../../sql-reference/statements/rename.md)). If ClickHouse cannot parse a DDL statement, the operation is skipped.
MySQL DDL statements are converted into the corresponding ClickHouse DDL statements, for example: ([ALTER](../../sql-reference/statements/alter/index.md), [CREATE](../../sql-reference/statements/create.md), [DROP](../../sql-reference/statements/drop.md), [RENAME](../../sql-reference/statements/rename.md)). If ClickHouse cannot parse a DDL statement, the operation is skipped.
### Data Replication {#data-replication}

View File

@ -1,5 +1,5 @@
---
slug: /zh/faq/general
slug: /zh/faq/general/overview
---
# Frequently Asked Questions {#chang-jian-wen-ti}

View File

@ -21,8 +21,7 @@ sidebar_label: General
- [How do I contribute code to ClickHouse?](../../faq/general/how-do-i-contribute-code-to-clickhouse.md)
!!! info "Didn't find what you were looking for?"
See the [other F.A.Q. categories](../../faq/) or browse the rest of the documentation in the left navigation bar
See the [other F.A.Q. categories](../../faq/index.md) or browse the rest of the documentation in the left navigation bar
{## [Original article](https://clickhouse.com/docs/en/faq/general/) ##}

View File

@ -338,6 +338,12 @@ UserID.binURL.bin和EventTime.bin是<font face = "monospace">UserID</font>
:::note
- The last granule (granule 1082) contains fewer than 8192 rows.
- As mentioned in "Details of the DDL statement" at the beginning of this guide, we disabled adaptive index granularity (to simplify the discussion in this guide and to make the diagrams and results reproducible).
Therefore, all granules of the example table (except the last one) have the same size.
- For tables with adaptive index granularity (index granularity is adaptive by default), the size of some granules can be less than 8192 rows, depending on the row data sizes.
- We marked some column values from the primary key columns (<font face = "monospace">UserID</font>, <font face = "monospace">URL</font>) in orange.
These orange-marked column values are the minimum value of each primary key column within each granule. The exception is the last granule (granule 1082 in the diagram above), for which we marked the maximum values.

View File

@ -1,10 +0,0 @@
---
slug: /zh/sql-reference/functions/geo/
sidebar_label: Geo
sidebar_position: 62
title: "Geo Functions"
---
import Content from '@site/docs/en/sql-reference/functions/geo/index.md';
<Content />

View File

@ -1,5 +1,5 @@
---
slug: /zh/sql-reference/statements/alter/
slug: /zh/sql-reference/statements/alter/overview
sidebar_position: 35
sidebar_label: ALTER
---

View File

@ -1,11 +0,0 @@
---
slug: /zh/sql-reference/statements/create/
sidebar_label: CREATE
sidebar_position: 34
---
# CREATE Queries {#create-queries}
CREATE queries comprise the following subtypes:
- [DATABASE](../../../sql-reference/statements/create/database.md)

View File

@ -10,7 +10,7 @@ sidebar_position: 31
- [SELECT](../../sql-reference/statements/select/index.md)
- [INSERT INTO](../../sql-reference/statements/insert-into.md)
- [CREATE](../../sql-reference/statements/create/index.md)
- [CREATE](../../sql-reference/statements/create.md)
- [ALTER](../../sql-reference/statements/alter/index.md)
- [SYSTEM](../../sql-reference/statements/system.md)
- [SHOW](../../sql-reference/statements/show.md)

View File

@ -810,12 +810,9 @@ bool BackupCoordinationRemote::hasConcurrentBackups(const std::atomic<size_t> &)
if (existing_backup_uuid == toString(backup_uuid))
continue;
String status;
if (zk->tryGet(root_zookeeper_path + "/" + existing_backup_path + "/stage", status))
{
if (status != Stage::COMPLETED)
return true;
}
const auto status = zk->get(root_zookeeper_path + "/" + existing_backup_path + "/stage");
if (status != Stage::COMPLETED)
return true;
}
zk->createIfNotExists(backup_stage_path, "");

View File

@ -349,12 +349,14 @@ The server successfully detected this situation and will download merged part fr
M(DiskS3PutObject, "Number of DiskS3 API PutObject calls.") \
M(DiskS3GetObject, "Number of DiskS3 API GetObject calls.") \
\
M(ReadBufferFromS3Microseconds, "Time spend in reading from S3.") \
M(ReadBufferFromS3InitMicroseconds, "Time spend initializing connection to S3.") \
M(ReadBufferFromS3Microseconds, "Time spent on reading from S3.") \
M(ReadBufferFromS3InitMicroseconds, "Time spent initializing connection to S3.") \
M(ReadBufferFromS3Bytes, "Bytes read from S3.") \
M(ReadBufferFromS3RequestsErrors, "Number of exceptions while reading from S3.") \
\
M(WriteBufferFromS3Microseconds, "Time spent on writing to S3.") \
M(WriteBufferFromS3Bytes, "Bytes written to S3.") \
M(WriteBufferFromS3RequestsErrors, "Number of exceptions while writing to S3.") \
\
M(QueryMemoryLimitExceeded, "Number of times when memory limit exceeded for query.") \
\

View File

@ -13,6 +13,7 @@
#include <Functions/IFunction.h>
#include <Functions/castTypeToEither.h>
#include <Functions/extractTimeZoneFromFunctionArguments.h>
#include <Functions/numLiteralChars.h>
#include <IO/WriteHelpers.h>
@ -54,55 +55,19 @@ struct FormatDateTimeTraits
};
template <typename DataType> struct ActionValueTypeMap {};
template <> struct ActionValueTypeMap<DataTypeInt8> { using ActionValueType = UInt32; };
template <> struct ActionValueTypeMap<DataTypeUInt8> { using ActionValueType = UInt32; };
template <> struct ActionValueTypeMap<DataTypeInt16> { using ActionValueType = UInt32; };
template <> struct ActionValueTypeMap<DataTypeUInt16> { using ActionValueType = UInt32; };
template <> struct ActionValueTypeMap<DataTypeInt32> { using ActionValueType = UInt32; };
template <> struct ActionValueTypeMap<DataTypeUInt32> { using ActionValueType = UInt32; };
template <> struct ActionValueTypeMap<DataTypeInt64> { using ActionValueType = UInt32; };
template <> struct ActionValueTypeMap<DataTypeUInt64> { using ActionValueType = UInt32; };
template <> struct ActionValueTypeMap<DataTypeDate> { using ActionValueType = UInt16; };
template <> struct ActionValueTypeMap<DataTypeDate32> { using ActionValueType = Int32; };
template <> struct ActionValueTypeMap<DataTypeDateTime> { using ActionValueType = UInt32; };
template <> struct ActionValueTypeMap<DataTypeDateTime64> { using ActionValueType = Int64; };
/// Counts the number of literal characters in a Joda format string until the next closing single quote
/// of the literal sequence. Returns -1 if no closing single quote was found.
/// In a Joda format string (https://joda-time.sourceforge.net/apidocs/org/joda/time/format/DateTimeFormat.html),
/// literal content must be quoted with single quotes, and two single quotes represent one literal single quote.
/// For example:
/// Format string: "'aaaa'", unescaped literal: "aaaa";
/// Format string: "'aa''aa'", unescaped literal: "aa'aa";
/// Format string: "'aaa''aa" is not valid because the closing single quote is missing.
Int64 numLiteralChars(const char * cur, const char * end)
{
bool found = false;
Int64 count = 0;
while (cur < end)
{
if (*cur == '\'')
{
if (cur + 1 < end && *(cur + 1) == '\'')
{
count += 2;
cur += 2;
}
else
{
found = true;
break;
}
}
else
{
++count;
++cur;
}
}
return found ? count : -1;
}
template <typename DataType> struct InstructionValueTypeMap {};
template <> struct InstructionValueTypeMap<DataTypeInt8> { using InstructionValueType = UInt32; };
template <> struct InstructionValueTypeMap<DataTypeUInt8> { using InstructionValueType = UInt32; };
template <> struct InstructionValueTypeMap<DataTypeInt16> { using InstructionValueType = UInt32; };
template <> struct InstructionValueTypeMap<DataTypeUInt16> { using InstructionValueType = UInt32; };
template <> struct InstructionValueTypeMap<DataTypeInt32> { using InstructionValueType = UInt32; };
template <> struct InstructionValueTypeMap<DataTypeUInt32> { using InstructionValueType = UInt32; };
template <> struct InstructionValueTypeMap<DataTypeInt64> { using InstructionValueType = UInt32; };
template <> struct InstructionValueTypeMap<DataTypeUInt64> { using InstructionValueType = UInt32; };
template <> struct InstructionValueTypeMap<DataTypeDate> { using InstructionValueType = UInt16; };
template <> struct InstructionValueTypeMap<DataTypeDate32> { using InstructionValueType = Int32; };
template <> struct InstructionValueTypeMap<DataTypeDateTime> { using InstructionValueType = UInt32; };
template <> struct InstructionValueTypeMap<DataTypeDateTime64> { using InstructionValueType = Int64; };
/// Cast value from integer to string, padding with leading '0's so that the number of digits in the result string is no less than min_digits.
String padValue(UInt32 val, size_t min_digits)
@ -184,7 +149,7 @@ private:
}
template <typename Time>
class Action
class Instruction
{
public:
/// Using std::function will cause performance degradation in MySQL format by 0.45x.
@ -201,8 +166,8 @@ private:
/// extra_shift is only used in MySQL format syntax. It is always 0 in Joda format syntax.
size_t extra_shift = 0;
/// Action for appending date/time related number in specified format.
explicit Action(Func && func_) : func(std::move(func_)) {}
/// Instruction for appending date/time related number in specified format.
explicit Instruction(Func && func_) : func(std::move(func_)) {}
void perform(char *& dest, Time source, UInt64 fractional_second, UInt32 scale, const DateLUTImpl & timezone)
{
@ -825,8 +790,8 @@ public:
if constexpr (std::is_same_v<DataType, DataTypeDateTime64>)
scale = times->getScale();
using T = typename ActionValueTypeMap<DataType>::ActionValueType;
std::vector<Action<T>> instructions;
using T = typename InstructionValueTypeMap<DataType>::InstructionValueType;
std::vector<Instruction<T>> instructions;
String out_template;
auto result_size = parseFormat(format, instructions, scale, out_template);
@ -898,27 +863,25 @@ public:
}
template <typename T>
size_t parseFormat(const String & format, std::vector<Action<T>> & instructions, UInt32 scale, String & out_template) const
size_t parseFormat(const String & format, std::vector<Instruction<T>> & instructions, UInt32 scale, String & out_template) const
{
static_assert(
format_syntax == FormatDateTimeTraits::FormatSyntax::MySQL || format_syntax == FormatDateTimeTraits::FormatSyntax::Joda,
"format syntax must be one of MySQL or Joda");
if constexpr (format_syntax == FormatDateTimeTraits::FormatSyntax::MySQL)
return parseMySQLFormat(format, instructions, scale, out_template);
else if constexpr (format_syntax == FormatDateTimeTraits::FormatSyntax::Joda)
return parseJodaFormat(format, instructions, scale, out_template);
else
throw Exception(
ErrorCodes::NOT_IMPLEMENTED,
"Unknown datetime format style {} in function {}",
magic_enum::enum_name(format_syntax),
getName());
return parseJodaFormat(format, instructions, scale, out_template);
}
template <typename T>
size_t parseMySQLFormat(const String & format, std::vector<Action<T>> & instructions, UInt32 scale, String & out_template) const
size_t parseMySQLFormat(const String & format, std::vector<Instruction<T>> & instructions, UInt32 scale, String & out_template) const
{
auto add_extra_shift = [&](size_t amount)
{
if (instructions.empty())
instructions.emplace_back(&Action<T>::mysqlNoop);
instructions.emplace_back(&Instruction<T>::mysqlNoop);
instructions.back().extra_shift += amount;
};
@ -931,7 +894,7 @@ public:
};
const char * pos = format.data();
const char * const end = pos + format.size();
const char * const end = format.data() + format.size();
while (true)
{
@ -953,43 +916,43 @@ public:
{
// Abbreviated weekday [Mon...Sun]
case 'a':
instructions.emplace_back(&Action<T>::mysqlDayOfWeekTextShort);
instructions.emplace_back(&Instruction<T>::mysqlDayOfWeekTextShort);
out_template += "Mon";
break;
// Abbreviated month [Jan...Dec]
case 'b':
instructions.emplace_back(&Action<T>::mysqlMonthOfYearTextShort);
instructions.emplace_back(&Instruction<T>::mysqlMonthOfYearTextShort);
out_template += "Jan";
break;
// Month as a decimal number (01-12)
// Month as an integer number (01-12)
case 'c':
instructions.emplace_back(&Action<T>::mysqlMonth);
instructions.emplace_back(&Instruction<T>::mysqlMonth);
out_template += "00";
break;
// Year, divided by 100, zero-padded
case 'C':
instructions.emplace_back(&Action<T>::mysqlCentury);
instructions.emplace_back(&Instruction<T>::mysqlCentury);
out_template += "00";
break;
// Day of month, zero-padded (01-31)
case 'd':
instructions.emplace_back(&Action<T>::mysqlDayOfMonth);
instructions.emplace_back(&Instruction<T>::mysqlDayOfMonth);
out_template += "00";
break;
// Short MM/DD/YY date, equivalent to %m/%d/%y
case 'D':
instructions.emplace_back(&Action<T>::mysqlAmericanDate);
instructions.emplace_back(&Instruction<T>::mysqlAmericanDate);
out_template += "00/00/00";
break;
// Day of month, space-padded ( 1-31) 23
case 'e':
instructions.emplace_back(&Action<T>::mysqlDayOfMonthSpacePadded);
instructions.emplace_back(&Instruction<T>::mysqlDayOfMonthSpacePadded);
out_template += " 0";
break;
@ -997,86 +960,86 @@ public:
case 'f':
{
/// If the time data type has no fractional part, then we print '0' as the fractional part.
instructions.emplace_back(&Action<T>::mysqlFractionalSecond);
instructions.emplace_back(&Instruction<T>::mysqlFractionalSecond);
out_template += String(std::max<UInt32>(1, scale), '0');
break;
}
// Short YYYY-MM-DD date, equivalent to %Y-%m-%d 2001-08-23
case 'F':
instructions.emplace_back(&Action<T>::mysqlISO8601Date);
instructions.emplace_back(&Instruction<T>::mysqlISO8601Date);
out_template += "0000-00-00";
break;
// Last two digits of year of ISO 8601 week number (see %G)
case 'g':
instructions.emplace_back(&Action<T>::mysqlISO8601Year2);
instructions.emplace_back(&Instruction<T>::mysqlISO8601Year2);
out_template += "00";
break;
// Year of ISO 8601 week number (see %V)
case 'G':
instructions.emplace_back(&Action<T>::mysqlISO8601Year4);
instructions.emplace_back(&Instruction<T>::mysqlISO8601Year4);
out_template += "0000";
break;
// Day of the year (001-366) 235
case 'j':
instructions.emplace_back(&Action<T>::mysqlDayOfYear);
instructions.emplace_back(&Instruction<T>::mysqlDayOfYear);
out_template += "000";
break;
// Month as a decimal number (01-12)
// Month as an integer number (01-12)
case 'm':
instructions.emplace_back(&Action<T>::mysqlMonth);
instructions.emplace_back(&Instruction<T>::mysqlMonth);
out_template += "00";
break;
// ISO 8601 weekday as number with Monday as 1 (1-7)
case 'u':
instructions.emplace_back(&Action<T>::mysqlDayOfWeek);
instructions.emplace_back(&Instruction<T>::mysqlDayOfWeek);
out_template += "0";
break;
// ISO 8601 week number (01-53)
case 'V':
instructions.emplace_back(&Action<T>::mysqlISO8601Week);
instructions.emplace_back(&Instruction<T>::mysqlISO8601Week);
out_template += "00";
break;
// Weekday as a decimal number with Sunday as 0 (0-6) 4
// Weekday as an integer number with Sunday as 0 (0-6) 4
case 'w':
instructions.emplace_back(&Action<T>::mysqlDayOfWeek0To6);
instructions.emplace_back(&Instruction<T>::mysqlDayOfWeek0To6);
out_template += "0";
break;
// Full weekday [Monday...Sunday]
case 'W':
instructions.emplace_back(&Action<T>::mysqlDayOfWeekTextLong);
instructions.emplace_back(&Instruction<T>::mysqlDayOfWeekTextLong);
out_template += "Monday";
break;
// Two digits year
case 'y':
instructions.emplace_back(&Action<T>::mysqlYear2);
instructions.emplace_back(&Instruction<T>::mysqlYear2);
out_template += "00";
break;
// Four digits year
case 'Y':
instructions.emplace_back(&Action<T>::mysqlYear4);
instructions.emplace_back(&Instruction<T>::mysqlYear4);
out_template += "0000";
break;
// Quarter (1-4)
case 'Q':
instructions.template emplace_back(&Action<T>::mysqlQuarter);
instructions.template emplace_back(&Instruction<T>::mysqlQuarter);
out_template += "0";
break;
// Offset from UTC timezone as +hhmm or -hhmm
case 'z':
instructions.emplace_back(&Action<T>::mysqlTimezoneOffset);
instructions.emplace_back(&Instruction<T>::mysqlTimezoneOffset);
out_template += "+0000";
break;
@ -1084,79 +1047,79 @@ public:
// Minute (00-59)
case 'M':
add_instruction_or_extra_shift(&Action<T>::mysqlMinute, 2);
add_instruction_or_extra_shift(&Instruction<T>::mysqlMinute, 2);
out_template += "00";
break;
// AM or PM
case 'p':
add_instruction_or_extra_shift(&Action<T>::mysqlAMPM, 2);
add_instruction_or_extra_shift(&Instruction<T>::mysqlAMPM, 2);
out_template += "AM";
break;
// 12-hour HH:MM time, equivalent to %h:%i %p 2:55 PM
case 'r':
add_instruction_or_extra_shift(&Action<T>::mysqlHHMM12, 8);
add_instruction_or_extra_shift(&Instruction<T>::mysqlHHMM12, 8);
out_template += "12:00 AM";
break;
// 24-hour HH:MM time, equivalent to %H:%i 14:55
case 'R':
add_instruction_or_extra_shift(&Action<T>::mysqlHHMM24, 5);
add_instruction_or_extra_shift(&Instruction<T>::mysqlHHMM24, 5);
out_template += "00:00";
break;
// Seconds
case 's':
add_instruction_or_extra_shift(&Action<T>::mysqlSecond, 2);
add_instruction_or_extra_shift(&Instruction<T>::mysqlSecond, 2);
out_template += "00";
break;
// Seconds
case 'S':
add_instruction_or_extra_shift(&Action<T>::mysqlSecond, 2);
add_instruction_or_extra_shift(&Instruction<T>::mysqlSecond, 2);
out_template += "00";
break;
// ISO 8601 time format (HH:MM:SS), equivalent to %H:%i:%S 14:55:02
case 'T':
add_instruction_or_extra_shift(&Action<T>::mysqlISO8601Time, 8);
add_instruction_or_extra_shift(&Instruction<T>::mysqlISO8601Time, 8);
out_template += "00:00:00";
break;
// Hour in 12h format (01-12)
case 'h':
add_instruction_or_extra_shift(&Action<T>::mysqlHour12, 2);
add_instruction_or_extra_shift(&Instruction<T>::mysqlHour12, 2);
out_template += "12";
break;
// Hour in 24h format (00-23)
case 'H':
add_instruction_or_extra_shift(&Action<T>::mysqlHour24, 2);
add_instruction_or_extra_shift(&Instruction<T>::mysqlHour24, 2);
out_template += "00";
break;
// Minute of hour range [0, 59]
case 'i':
add_instruction_or_extra_shift(&Action<T>::mysqlMinute, 2);
add_instruction_or_extra_shift(&Instruction<T>::mysqlMinute, 2);
out_template += "00";
break;
// Hour in 12h format (01-12)
case 'I':
add_instruction_or_extra_shift(&Action<T>::mysqlHour12, 2);
add_instruction_or_extra_shift(&Instruction<T>::mysqlHour12, 2);
out_template += "12";
break;
// Hour in 24h format (00-23)
case 'k':
add_instruction_or_extra_shift(&Action<T>::mysqlHour24, 2);
add_instruction_or_extra_shift(&Instruction<T>::mysqlHour24, 2);
out_template += "00";
break;
// Hour in 12h format (01-12)
case 'l':
add_instruction_or_extra_shift(&Action<T>::mysqlHour12, 2);
add_instruction_or_extra_shift(&Instruction<T>::mysqlHour12, 2);
out_template += "12";
break;
@ -1209,7 +1172,7 @@ public:
}
template <typename T>
size_t parseJodaFormat(const String & format, std::vector<Action<T>> & instructions, UInt32, String &) const
size_t parseJodaFormat(const String & format, std::vector<Instruction<T>> & instructions, UInt32, String &) const
{
/// If the argument was DateTime, add instruction for printing. If it was date, just append default literal
auto add_instruction = [&](auto && func [[maybe_unused]], const String & default_literal [[maybe_unused]])
@ -1217,13 +1180,12 @@ public:
if constexpr (std::is_same_v<T, UInt32> || std::is_same_v<T, Int64>)
instructions.emplace_back(func);
else
instructions.emplace_back(std::bind_front(&Action<T>::template jodaLiteral<String>, default_literal));
instructions.emplace_back(std::bind_front(&Instruction<T>::template jodaLiteral<String>, default_literal));
};
size_t reserve_size = 0;
const char * pos = format.data();
const char * end = pos + format.size();
const char * end = format.data() + format.size();
while (pos < end)
{
const char * cur_token = pos;
@ -1235,7 +1197,7 @@ public:
if (pos + 1 < end && *(pos + 1) == '\'')
{
std::string_view literal(cur_token, 1);
instructions.emplace_back(std::bind_front(&Action<T>::template jodaLiteral<decltype(literal)>, literal));
instructions.emplace_back(std::bind_front(&Instruction<T>::template jodaLiteral<decltype(literal)>, literal));
++reserve_size;
pos += 2;
}
@ -1251,7 +1213,7 @@ public:
{
std::string_view literal(cur_token + i, 1);
instructions.emplace_back(
std::bind_front(&Action<T>::template jodaLiteral<decltype(literal)>, literal));
std::bind_front(&Instruction<T>::template jodaLiteral<decltype(literal)>, literal));
++reserve_size;
if (*(cur_token + i) == '\'')
i += 1;
@ -1272,115 +1234,115 @@ public:
switch (*cur_token)
{
case 'G':
instructions.emplace_back(std::bind_front(&Action<T>::jodaEra, repetitions));
instructions.emplace_back(std::bind_front(&Instruction<T>::jodaEra, repetitions));
reserve_size += repetitions <= 3 ? 2 : 13;
break;
case 'C':
instructions.emplace_back(std::bind_front(&Action<T>::jodaCenturyOfEra, repetitions));
instructions.emplace_back(std::bind_front(&Instruction<T>::jodaCenturyOfEra, repetitions));
/// Year range [1900, 2299]
reserve_size += std::max(repetitions, 2);
break;
case 'Y':
instructions.emplace_back(std::bind_front(&Action<T>::jodaYearOfEra, repetitions));
instructions.emplace_back(std::bind_front(&Instruction<T>::jodaYearOfEra, repetitions));
/// Year range [1900, 2299]
reserve_size += repetitions == 2 ? 2 : std::max(repetitions, 4);
break;
case 'x':
instructions.emplace_back(std::bind_front(&Action<T>::jodaWeekYear, repetitions));
instructions.emplace_back(std::bind_front(&Instruction<T>::jodaWeekYear, repetitions));
/// weekyear range [1900, 2299]
reserve_size += std::max(repetitions, 4);
break;
case 'w':
instructions.emplace_back(std::bind_front(&Action<T>::jodaWeekOfWeekYear, repetitions));
instructions.emplace_back(std::bind_front(&Instruction<T>::jodaWeekOfWeekYear, repetitions));
/// Week of weekyear range [1, 52]
reserve_size += std::max(repetitions, 2);
break;
case 'e':
instructions.emplace_back(std::bind_front(&Action<T>::jodaDayOfWeek1Based, repetitions));
instructions.emplace_back(std::bind_front(&Instruction<T>::jodaDayOfWeek1Based, repetitions));
/// Day of week range [1, 7]
reserve_size += std::max(repetitions, 1);
break;
case 'E':
instructions.emplace_back(std::bind_front(&Action<T>::jodaDayOfWeekText, repetitions));
instructions.emplace_back(std::bind_front(&Instruction<T>::jodaDayOfWeekText, repetitions));
/// Maximum length of short name is 3, maximum length of full name is 9.
reserve_size += repetitions <= 3 ? 3 : 9;
break;
case 'y':
instructions.emplace_back(std::bind_front(&Action<T>::jodaYear, repetitions));
instructions.emplace_back(std::bind_front(&Instruction<T>::jodaYear, repetitions));
/// Year range [1900, 2299]
reserve_size += repetitions == 2 ? 2 : std::max(repetitions, 4);
break;
case 'D':
instructions.emplace_back(std::bind_front(&Action<T>::jodaDayOfYear, repetitions));
instructions.emplace_back(std::bind_front(&Instruction<T>::jodaDayOfYear, repetitions));
/// Day of year range [1, 366]
reserve_size += std::max(repetitions, 3);
break;
case 'M':
if (repetitions <= 2)
{
instructions.emplace_back(std::bind_front(&Action<T>::jodaMonthOfYear, repetitions));
instructions.emplace_back(std::bind_front(&Instruction<T>::jodaMonthOfYear, repetitions));
/// Month of year range [1, 12]
reserve_size += 2;
}
else
{
instructions.emplace_back(std::bind_front(&Action<T>::jodaMonthOfYearText, repetitions));
instructions.emplace_back(std::bind_front(&Instruction<T>::jodaMonthOfYearText, repetitions));
/// Maximum length of short name is 3, maximum length of full name is 9.
reserve_size += repetitions <= 3 ? 3 : 9;
}
break;
case 'd':
instructions.emplace_back(std::bind_front(&Action<T>::jodaDayOfMonth, repetitions));
instructions.emplace_back(std::bind_front(&Instruction<T>::jodaDayOfMonth, repetitions));
/// Day of month range [1, 31]
reserve_size += std::max(repetitions, 3);
break;
case 'a':
/// Default half day of day is "AM"
add_instruction(std::bind_front(&Action<T>::jodaHalfDayOfDay, repetitions), "AM");
add_instruction(std::bind_front(&Instruction<T>::jodaHalfDayOfDay, repetitions), "AM");
reserve_size += 2;
break;
case 'K':
/// Default hour of half day is 0
add_instruction(
std::bind_front(&Action<T>::jodaHourOfHalfDay, repetitions), padValue(0, repetitions));
std::bind_front(&Instruction<T>::jodaHourOfHalfDay, repetitions), padValue(0, repetitions));
/// Hour of half day range [0, 11]
reserve_size += std::max(repetitions, 2);
break;
case 'h':
/// Default clock hour of half day is 12
add_instruction(
std::bind_front(&Action<T>::jodaClockHourOfHalfDay, repetitions),
std::bind_front(&Instruction<T>::jodaClockHourOfHalfDay, repetitions),
padValue(12, repetitions));
/// Clock hour of half day range [1, 12]
reserve_size += std::max(repetitions, 2);
break;
case 'H':
/// Default hour of day is 0
add_instruction(std::bind_front(&Action<T>::jodaHourOfDay, repetitions), padValue(0, repetitions));
add_instruction(std::bind_front(&Instruction<T>::jodaHourOfDay, repetitions), padValue(0, repetitions));
/// Hour of day range [0, 23]
reserve_size += std::max(repetitions, 2);
break;
case 'k':
/// Default clock hour of day is 24
add_instruction(std::bind_front(&Action<T>::jodaClockHourOfDay, repetitions), padValue(24, repetitions));
add_instruction(std::bind_front(&Instruction<T>::jodaClockHourOfDay, repetitions), padValue(24, repetitions));
/// Clock hour of day range [1, 24]
reserve_size += std::max(repetitions, 2);
break;
case 'm':
/// Default minute of hour is 0
add_instruction(std::bind_front(&Action<T>::jodaMinuteOfHour, repetitions), padValue(0, repetitions));
add_instruction(std::bind_front(&Instruction<T>::jodaMinuteOfHour, repetitions), padValue(0, repetitions));
/// Minute of hour range [0, 59]
reserve_size += std::max(repetitions, 2);
break;
case 's':
/// Default second of minute is 0
add_instruction(std::bind_front(&Action<T>::jodaSecondOfMinute, repetitions), padValue(0, repetitions));
add_instruction(std::bind_front(&Instruction<T>::jodaSecondOfMinute, repetitions), padValue(0, repetitions));
/// Second of minute range [0, 59]
reserve_size += std::max(repetitions, 2);
break;
case 'S':
/// Default fraction of second is 0
instructions.emplace_back(std::bind_front(&Action<T>::jodaFractionOfSecond, repetitions));
instructions.emplace_back(std::bind_front(&Instruction<T>::jodaFractionOfSecond, repetitions));
/// 'S' repetitions range [0, 9]
reserve_size += repetitions <= 9 ? repetitions : 9;
break;
@ -1388,7 +1350,7 @@ public:
if (repetitions <= 3)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Short name time zone is not yet supported");
instructions.emplace_back(std::bind_front(&Action<T>::jodaTimezone, repetitions));
instructions.emplace_back(std::bind_front(&Instruction<T>::jodaTimezone, repetitions));
/// Longest length of full name of time zone is 32.
reserve_size += 32;
break;
@ -1399,7 +1361,7 @@ public:
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "format is not supported for {}", String(cur_token, repetitions));
std::string_view literal(cur_token, pos - cur_token);
instructions.emplace_back(std::bind_front(&Action<T>::template jodaLiteral<decltype(literal)>, literal));
instructions.emplace_back(std::bind_front(&Instruction<T>::template jodaLiteral<decltype(literal)>, literal));
reserve_size += pos - cur_token;
break;
}

View File

@ -0,0 +1,44 @@
#pragma once
#include <base/types.h>
namespace DB
{
/// Counts the number of literal characters in a Joda format string until the next closing
/// single quote. Returns -1 if no closing single quote was found.
/// In a Joda format string (https://joda-time.sourceforge.net/apidocs/org/joda/time/format/DateTimeFormat.html)
/// literal content must be quoted with single quotes, and two consecutive single quotes represent
/// a literal single quote.
/// For example:
/// Format string: "'aaaa'", unescaped literal: "aaaa";
/// Format string: "'aa''aa'", unescaped literal: "aa'aa";
/// Format string: "'aaa''aa" is invalid because the closing single quote is missing.
inline Int64 numLiteralChars(const char * cur, const char * end)
{
bool found = false;
Int64 count = 0;
while (cur < end)
{
if (*cur == '\'')
{
if (cur + 1 < end && *(cur + 1) == '\'')
{
count += 2;
cur += 2;
}
else
{
found = true;
break;
}
}
else
{
++count;
++cur;
}
}
return found ? count : -1;
}
}
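A quick usage sketch for the helper above, assuming the header is reachable under a Functions/ include path (the include path and the test strings are illustrative only, taken from the examples in the comment). The inputs are the portions of a Joda format string that follow the opening single quote:

#include <cassert>
#include <string_view>
#include <Functions/numLiteralChars.h> /// assumed location of the header above

int main()
{
    auto count = [](std::string_view s)
    {
        /// The parser passes the part of the format string that follows the opening single quote.
        return DB::numLiteralChars(s.data(), s.data() + s.size());
    };

    assert(count("aaaa'") == 4);    /// "'aaaa'"   -> 4 input characters before the closing quote
    assert(count("aa''aa'") == 6);  /// "'aa''aa'" -> 6; a doubled quote consumes two input characters
    assert(count("aaa''aa") == -1); /// "'aaa''aa" -> no closing quote, so the format is invalid
    return 0;
}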

File diff suppressed because it is too large

View File

@ -159,6 +159,8 @@ namespace detail
if (out_stream_callback)
request.setChunkedTransferEncoding(true);
else if (method == Poco::Net::HTTPRequest::HTTP_POST)
request.setContentLength(0); /// No callback - no body
for (auto & [header, value] : http_header_entries)
request.set(header, value);

View File

@ -23,6 +23,8 @@
namespace ProfileEvents
{
extern const Event WriteBufferFromS3Bytes;
extern const Event WriteBufferFromS3Microseconds;
extern const Event WriteBufferFromS3RequestsErrors;
extern const Event S3WriteBytes;
extern const Event S3CreateMultipartUpload;
@ -200,7 +202,11 @@ void WriteBufferFromS3::createMultipartUpload()
if (write_settings.for_object_storage)
ProfileEvents::increment(ProfileEvents::DiskS3CreateMultipartUpload);
Stopwatch watch;
auto outcome = client_ptr->CreateMultipartUpload(req);
watch.stop();
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds());
if (outcome.IsSuccess())
{
@ -208,7 +214,10 @@ void WriteBufferFromS3::createMultipartUpload()
LOG_TRACE(log, "Multipart upload has created. Bucket: {}, Key: {}, Upload id: {}", bucket, key, multipart_upload_id);
}
else
{
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3RequestsErrors, 1);
throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType());
}
}
void WriteBufferFromS3::writePart()
@ -345,9 +354,13 @@ void WriteBufferFromS3::processUploadRequest(UploadPartTask & task)
ResourceCost cost = task.req.GetContentLength();
ResourceGuard rlock(write_settings.resource_link, cost);
Stopwatch watch;
auto outcome = client_ptr->UploadPart(task.req);
watch.stop();
rlock.unlock(); // Avoid acquiring other locks under resource lock
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds());
if (outcome.IsSuccess())
{
task.tag = outcome.GetResult().GetETag();
@ -356,6 +369,7 @@ void WriteBufferFromS3::processUploadRequest(UploadPartTask & task)
}
else
{
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3RequestsErrors, 1);
write_settings.resource_link.accumulate(cost); // We assume no resource was used in case of failure
throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType());
}
@ -391,25 +405,34 @@ void WriteBufferFromS3::completeMultipartUpload()
if (write_settings.for_object_storage)
ProfileEvents::increment(ProfileEvents::DiskS3CompleteMultipartUpload);
Stopwatch watch;
auto outcome = client_ptr->CompleteMultipartUpload(req);
watch.stop();
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds());
if (outcome.IsSuccess())
{
LOG_TRACE(log, "Multipart upload has completed. Bucket: {}, Key: {}, Upload_id: {}, Parts: {}", bucket, key, multipart_upload_id, tags.size());
return;
}
else if (outcome.GetError().GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY)
{
/// For unknown reason, at least MinIO can respond with NO_SUCH_KEY for put requests
/// BTW, NO_SUCH_UPLOAD is expected error and we shouldn't retry it
LOG_INFO(log, "Multipart upload failed with NO_SUCH_KEY error for Bucket: {}, Key: {}, Upload_id: {}, Parts: {}, will retry", bucket, key, multipart_upload_id, tags.size());
}
else
{
throw S3Exception(
outcome.GetError().GetErrorType(),
"Message: {}, Key: {}, Bucket: {}, Tags: {}",
outcome.GetError().GetMessage(), key, bucket, fmt::join(tags.begin(), tags.end(), " "));
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3RequestsErrors, 1);
if (outcome.GetError().GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY)
{
/// For unknown reason, at least MinIO can respond with NO_SUCH_KEY for put requests
/// BTW, NO_SUCH_UPLOAD is expected error and we shouldn't retry it
LOG_INFO(log, "Multipart upload failed with NO_SUCH_KEY error for Bucket: {}, Key: {}, Upload_id: {}, Parts: {}, will retry", bucket, key, multipart_upload_id, tags.size());
}
else
{
throw S3Exception(
outcome.GetError().GetErrorType(),
"Message: {}, Key: {}, Bucket: {}, Tags: {}",
outcome.GetError().GetMessage(), key, bucket, fmt::join(tags.begin(), tags.end(), " "));
}
}
}
@ -506,28 +529,36 @@ void WriteBufferFromS3::processPutRequest(const PutObjectTask & task)
ResourceCost cost = task.req.GetContentLength();
ResourceGuard rlock(write_settings.resource_link, cost);
Stopwatch watch;
auto outcome = client_ptr->PutObject(task.req);
watch.stop();
rlock.unlock();
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds());
bool with_pool = static_cast<bool>(schedule);
if (outcome.IsSuccess())
{
LOG_TRACE(log, "Single part upload has completed. Bucket: {}, Key: {}, Object size: {}, WithPool: {}", bucket, key, task.req.GetContentLength(), with_pool);
return;
}
else if (outcome.GetError().GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY)
{
write_settings.resource_link.accumulate(cost); // We assume no resource was used in case of failure
/// For unknown reason, at least MinIO can respond with NO_SUCH_KEY for put requests
LOG_INFO(log, "Single part upload failed with NO_SUCH_KEY error for Bucket: {}, Key: {}, Object size: {}, WithPool: {}, will retry", bucket, key, task.req.GetContentLength(), with_pool);
}
else
{
write_settings.resource_link.accumulate(cost); // We assume no resource was used in case of failure
throw S3Exception(
outcome.GetError().GetErrorType(),
"Message: {}, Key: {}, Bucket: {}, Object size: {}, WithPool: {}",
outcome.GetError().GetMessage(), key, bucket, task.req.GetContentLength(), with_pool);
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3RequestsErrors, 1);
if (outcome.GetError().GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY)
{
write_settings.resource_link.accumulate(cost); // We assume no resource was used in case of failure
/// For unknown reason, at least MinIO can respond with NO_SUCH_KEY for put requests
LOG_INFO(log, "Single part upload failed with NO_SUCH_KEY error for Bucket: {}, Key: {}, Object size: {}, WithPool: {}, will retry", bucket, key, task.req.GetContentLength(), with_pool);
}
else
{
write_settings.resource_link.accumulate(cost); // We assume no resource was used in case of failure
throw S3Exception(
outcome.GetError().GetErrorType(),
"Message: {}, Key: {}, Bucket: {}, Object size: {}, WithPool: {}",
outcome.GetError().GetMessage(), key, bucket, task.req.GetContentLength(), with_pool);
}
}
}
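The three S3 call sites above share the same instrumentation pattern: time the request with a stopwatch, add the elapsed microseconds to WriteBufferFromS3Microseconds, and bump WriteBufferFromS3RequestsErrors on failure. A minimal standalone sketch of that pattern, with std::chrono and plain counters standing in for ClickHouse's Stopwatch and ProfileEvents (sendRequest and both counter names are hypothetical):

#include <chrono>
#include <cstdint>
#include <iostream>

namespace
{
    uint64_t s3_request_microseconds = 0; /// stand-in for ProfileEvents::WriteBufferFromS3Microseconds
    uint64_t s3_request_errors = 0;       /// stand-in for ProfileEvents::WriteBufferFromS3RequestsErrors

    bool sendRequest() { return true; }   /// pretend S3 call; always succeeds in this sketch
}

int main()
{
    auto start = std::chrono::steady_clock::now();
    bool ok = sendRequest();
    auto elapsed = std::chrono::duration_cast<std::chrono::microseconds>(
        std::chrono::steady_clock::now() - start).count();

    s3_request_microseconds += static_cast<uint64_t>(elapsed);
    if (!ok)
        ++s3_request_errors;

    std::cout << "spent " << s3_request_microseconds << " us, "
              << s3_request_errors << " errors" << std::endl;
    return 0;
}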

View File

@ -6,6 +6,7 @@ namespace DB
namespace ErrorCodes
{
extern const int ZLIB_INFLATE_FAILED;
extern const int ARGUMENT_OUT_OF_BOUND;
}
ZlibInflatingReadBuffer::ZlibInflatingReadBuffer(
@ -17,6 +18,11 @@ ZlibInflatingReadBuffer::ZlibInflatingReadBuffer(
: CompressedReadBufferWrapper(std::move(in_), buf_size, existing_memory, alignment)
, eof_flag(false)
{
if (buf_size > max_buffer_size)
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND,
"Zlib does not support decompression with buffer size greater than {}, got buffer size: {}",
max_buffer_size, buf_size);
zstr.zalloc = nullptr;
zstr.zfree = nullptr;
zstr.opaque = nullptr;
@ -31,10 +37,7 @@ ZlibInflatingReadBuffer::ZlibInflatingReadBuffer(
window_bits += 16;
}
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wold-style-cast"
int rc = inflateInit2(&zstr, window_bits);
#pragma GCC diagnostic pop
if (rc != Z_OK)
throw Exception(ErrorCodes::ZLIB_INFLATE_FAILED, "inflateInit2 failed: {}; zlib version: {}.", zError(rc), ZLIB_VERSION);
@ -61,16 +64,22 @@ bool ZlibInflatingReadBuffer::nextImpl()
{
in->nextIfAtEnd();
zstr.next_in = reinterpret_cast<unsigned char *>(in->position());
zstr.avail_in = static_cast<unsigned>(in->buffer().end() - in->position());
zstr.avail_in = static_cast<BufferSizeType>(std::min(
static_cast<UInt64>(in->buffer().end() - in->position()),
static_cast<UInt64>(max_buffer_size)));
}
/// init output bytes (the place where decompressed data will be written)
zstr.next_out = reinterpret_cast<unsigned char *>(internal_buffer.begin());
zstr.avail_out = static_cast<unsigned>(internal_buffer.size());
zstr.avail_out = static_cast<BufferSizeType>(internal_buffer.size());
size_t old_total_in = zstr.total_in;
int rc = inflate(&zstr, Z_NO_FLUSH);
/// move the input stream to the position where reading stopped
in->position() = in->buffer().end() - zstr.avail_in;
size_t bytes_read = zstr.total_in - old_total_in;
in->position() += bytes_read;
/// change the size of the working buffer (its size equals the internal_buffer size minus the unused uncompressed bytes)
working_buffer.resize(internal_buffer.size() - zstr.avail_out);
@ -94,9 +103,10 @@ bool ZlibInflatingReadBuffer::nextImpl()
return true;
}
}
/// If it is not end and not OK, something went wrong, throw exception
if (rc != Z_OK)
throw Exception(ErrorCodes::ZLIB_INFLATE_FAILED, "inflateReset failed: {}", zError(rc));
throw Exception(ErrorCodes::ZLIB_INFLATE_FAILED, "inflate failed: {}", zError(rc));
}
while (working_buffer.empty());

View File

@ -4,6 +4,7 @@
#include <IO/CompressedReadBufferWrapper.h>
#include <IO/CompressionMethod.h>
#include <limits>
#include <zlib.h>
@ -33,6 +34,11 @@ private:
z_stream zstr;
bool eof_flag;
/// Limit size of buffer because zlib uses
/// UInt32 for sizes of internal buffers.
using BufferSizeType = decltype(zstr.avail_in);
static constexpr auto max_buffer_size = std::numeric_limits<BufferSizeType>::max();
};
}
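The new max_buffer_size guard exists because z_stream::avail_in and avail_out are 32-bit, so any 64-bit buffer has to be fed to inflate() in clamped chunks. A standalone sketch of the clamping arithmetic only (the 8 GiB input size is hypothetical):

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <limits>

int main()
{
    using BufferSizeType = uint32_t; /// stands in for decltype(z_stream::avail_in)
    constexpr uint64_t max_chunk = std::numeric_limits<BufferSizeType>::max();

    uint64_t pending = uint64_t(1) << 33; /// 8 GiB of compressed input (hypothetical)
    while (pending > 0)
    {
        /// Feed at most 2^32 - 1 bytes per inflate() call, mirroring the std::min clamp above.
        auto chunk = static_cast<BufferSizeType>(std::min(pending, max_chunk));
        pending -= chunk;
        std::cout << "fed " << chunk << " bytes, " << pending << " remaining" << std::endl;
    }
    return 0;
}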

View File

@ -2874,8 +2874,10 @@ void InterpreterSelectQuery::executeMergeSorted(QueryPlan & query_plan, const st
SortDescription sort_description = getSortDescription(query, context);
const UInt64 limit = getLimitForSorting(query, context);
const auto max_block_size = context->getSettingsRef().max_block_size;
const auto exact_rows_before_limit = context->getSettingsRef().exact_rows_before_limit;
auto merging_sorted = std::make_unique<SortingStep>(query_plan.getCurrentDataStream(), std::move(sort_description), max_block_size, limit);
auto merging_sorted = std::make_unique<SortingStep>(
query_plan.getCurrentDataStream(), std::move(sort_description), max_block_size, limit, exact_rows_before_limit);
merging_sorted->setStepDescription("Merge sorted streams " + description);
query_plan.addStep(std::move(merging_sorted));
}

View File

@ -525,7 +525,8 @@ void addMergeSortingStep(QueryPlan & query_plan,
auto merging_sorted = std::make_unique<SortingStep>(query_plan.getCurrentDataStream(),
sort_description,
max_block_size,
query_analysis_result.partial_sorting_limit);
query_analysis_result.partial_sorting_limit,
settings.exact_rows_before_limit);
merging_sorted->setStepDescription("Merge sorted streams " + description);
query_plan.addStep(std::move(merging_sorted));
}

View File

@ -39,7 +39,7 @@ public:
virtual void setRowsBeforeLimit(size_t /*rows_before_limit*/) {}
/// Counter to calculate rows_before_limit_at_least in the processors pipeline.
void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) { rows_before_limit_counter.swap(counter); }
void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) override { rows_before_limit_counter.swap(counter); }
/// Notify about progress. Method could be called from different threads.
/// Passed value are delta, that must be summarized.

View File

@ -21,6 +21,9 @@ class IQueryPlanStep;
struct StorageLimits;
using StorageLimitsList = std::list<StorageLimits>;
class RowsBeforeLimitCounter;
using RowsBeforeLimitCounterPtr = std::shared_ptr<RowsBeforeLimitCounter>;
class IProcessor;
using ProcessorPtr = std::shared_ptr<IProcessor>;
using Processors = std::vector<ProcessorPtr>;
@ -357,6 +360,10 @@ public:
/// You should zero internal counters in the call, in order to make it idempotent.
virtual std::optional<ReadProgress> getReadProgress() { return std::nullopt; }
/// Set the rows_before_limit counter for the current processor.
/// This counter is used to calculate the number of rows right before any filtering done by LimitTransform.
virtual void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr /* counter */) {}
protected:
virtual void onCancel() {}

View File

@ -183,7 +183,7 @@ LimitTransform::Status LimitTransform::preparePair(PortsData & data)
auto rows = data.current_chunk.getNumRows();
if (rows_before_limit_at_least)
if (rows_before_limit_at_least && !data.input_port_has_counter)
rows_before_limit_at_least->add(rows);
/// Skip block (for 'always_read_till_end' case).

View File

@ -41,6 +41,11 @@ private:
InputPort * input_port = nullptr;
OutputPort * output_port = nullptr;
bool is_finished = false;
/// This flag is used to avoid counting rows multiple times before applying a limit
/// condition, which can happen through certain input ports like PartialSortingTransform and
/// RemoteSource.
bool input_port_has_counter = false;
};
std::vector<PortsData> ports_data;
@ -66,7 +71,8 @@ public:
InputPort & getInputPort() { return inputs.front(); }
OutputPort & getOutputPort() { return outputs.front(); }
void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) { rows_before_limit_at_least.swap(counter); }
void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) override { rows_before_limit_at_least.swap(counter); }
void setInputPortHasCounter(size_t pos) { ports_data[pos].input_port_has_counter = true; }
};
}

View File

@ -16,7 +16,7 @@ public:
const Block & header, size_t num_inputs,
SortDescription description_, size_t max_block_size)
: IMergingTransform(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0,
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
header,
num_inputs,
std::move(description_),

View File

@ -20,7 +20,7 @@ public:
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false)
: IMergingTransform(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0,
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
header,
num_inputs,
std::move(description_),

View File

@ -20,7 +20,7 @@ public:
size_t max_block_size,
size_t max_block_bytes)
: IMergingTransform(
num_inputs, header, {}, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0,
num_inputs, header, {}, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
header,
num_inputs,
params,

View File

@ -15,7 +15,7 @@ public:
SortDescription description_, size_t max_block_size,
Graphite::Params params_, time_t time_of_merge_)
: IMergingTransform(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0,
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
header,
num_inputs,
std::move(description_),

View File

@ -14,10 +14,12 @@ IMergingTransformBase::IMergingTransformBase(
const Block & input_header,
const Block & output_header,
bool have_all_inputs_,
UInt64 limit_hint_)
UInt64 limit_hint_,
bool always_read_till_end_)
: IProcessor(InputPorts(num_inputs, input_header), {output_header})
, have_all_inputs(have_all_inputs_)
, limit_hint(limit_hint_)
, always_read_till_end(always_read_till_end_)
{
}
@ -33,10 +35,12 @@ IMergingTransformBase::IMergingTransformBase(
const Blocks & input_headers,
const Block & output_header,
bool have_all_inputs_,
UInt64 limit_hint_)
UInt64 limit_hint_,
bool always_read_till_end_)
: IProcessor(createPorts(input_headers), {output_header})
, have_all_inputs(have_all_inputs_)
, limit_hint(limit_hint_)
, always_read_till_end(always_read_till_end_)
{
}
@ -98,7 +102,7 @@ IProcessor::Status IMergingTransformBase::prepareInitializeInputs()
/// (e.g. with optimized 'ORDER BY primary_key LIMIT n' and small 'n')
/// we won't have to read any chunks anymore;
auto chunk = input.pull(limit_hint != 0);
if (limit_hint && chunk.getNumRows() < limit_hint)
if ((limit_hint && chunk.getNumRows() < limit_hint) || always_read_till_end)
input.setNeeded();
if (!chunk.hasRows())
@ -164,6 +168,21 @@ IProcessor::Status IMergingTransformBase::prepare()
if (is_port_full)
return Status::PortFull;
if (always_read_till_end)
{
for (auto & input : inputs)
{
if (!input.isFinished())
{
input.setNeeded();
if (input.hasData())
std::ignore = input.pull();
return Status::NeedData;
}
}
}
for (auto & input : inputs)
input.close();

View File

@ -17,13 +17,15 @@ public:
const Block & input_header,
const Block & output_header,
bool have_all_inputs_,
UInt64 limit_hint_);
UInt64 limit_hint_,
bool always_read_till_end_);
IMergingTransformBase(
const Blocks & input_headers,
const Block & output_header,
bool have_all_inputs_,
UInt64 limit_hint_);
UInt64 limit_hint_,
bool always_read_till_end_);
OutputPort & getOutputPort() { return outputs.front(); }
@ -67,6 +69,7 @@ private:
std::atomic<bool> have_all_inputs;
bool is_initialized = false;
UInt64 limit_hint = 0;
bool always_read_till_end = false;
IProcessor::Status prepareInitializeInputs();
};
@ -83,8 +86,9 @@ public:
const Block & output_header,
bool have_all_inputs_,
UInt64 limit_hint_,
bool always_read_till_end_,
Args && ... args)
: IMergingTransformBase(num_inputs, input_header, output_header, have_all_inputs_, limit_hint_)
: IMergingTransformBase(num_inputs, input_header, output_header, have_all_inputs_, limit_hint_, always_read_till_end_)
, algorithm(std::forward<Args>(args) ...)
{
}
@ -95,9 +99,10 @@ public:
const Block & output_header,
bool have_all_inputs_,
UInt64 limit_hint_,
bool always_read_till_end_,
bool empty_chunk_on_finish_,
Args && ... args)
: IMergingTransformBase(input_headers, output_header, have_all_inputs_, limit_hint_)
: IMergingTransformBase(input_headers, output_header, have_all_inputs_, limit_hint_, always_read_till_end_)
, empty_chunk_on_finish(empty_chunk_on_finish_)
, algorithm(std::forward<Args>(args) ...)
{

View File

@ -14,6 +14,7 @@ MergingSortedTransform::MergingSortedTransform(
size_t max_block_size,
SortingQueueStrategy sorting_queue_strategy,
UInt64 limit_,
bool always_read_till_end_,
WriteBuffer * out_row_sources_buf_,
bool quiet_,
bool use_average_block_sizes,
@ -24,6 +25,7 @@ MergingSortedTransform::MergingSortedTransform(
header,
have_all_inputs_,
limit_,
always_read_till_end_,
header,
num_inputs,
description_,

View File

@ -18,6 +18,7 @@ public:
size_t max_block_size,
SortingQueueStrategy sorting_queue_strategy,
UInt64 limit_ = 0,
bool always_read_till_end_ = false,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool quiet_ = false,
bool use_average_block_sizes = false,

View File

@ -20,7 +20,7 @@ public:
bool use_average_block_sizes = false,
bool cleanup = false)
: IMergingTransform(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0,
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
header,
num_inputs,
std::move(description_),

View File

@ -19,7 +19,7 @@ public:
const Names & partition_key_columns,
size_t max_block_size)
: IMergingTransform(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0,
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
header,
num_inputs,
std::move(description_),

View File

@ -19,7 +19,7 @@ public:
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false)
: IMergingTransform(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0,
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
header,
num_inputs,
std::move(description_),

View File

@ -45,7 +45,7 @@ public:
InputPort & getInputPort() { return inputs.front(); }
OutputPort & getOutputPort() { return outputs.front(); }
void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) { rows_before_limit_at_least.swap(counter); }
void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) override { rows_before_limit_at_least.swap(counter); }
};
}

View File

@ -55,6 +55,10 @@ std::unique_ptr<QueryPlan> createLocalPlan(
auto query_plan = std::make_unique<QueryPlan>();
auto new_context = Context::createCopy(context);
/// Do not push down limit to local plan, as it will break `rows_before_limit_at_least` counter.
if (processed_stage == QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit)
processed_stage = QueryProcessingStage::WithMergeableStateAfterAggregation;
/// Do not apply AST optimizations, because query
/// is already optimized and some optimizations
/// can be applied only for non-distributed tables

View File

@ -98,11 +98,13 @@ SortingStep::SortingStep(
const DataStream & input_stream,
SortDescription sort_description_,
size_t max_block_size_,
UInt64 limit_)
UInt64 limit_,
bool always_read_till_end_)
: ITransformingStep(input_stream, input_stream.header, getTraits(limit_))
, type(Type::MergingSorted)
, result_description(std::move(sort_description_))
, limit(limit_)
, always_read_till_end(always_read_till_end_)
, sort_settings(max_block_size_)
{
sort_settings.max_block_size = max_block_size_;
@ -175,7 +177,8 @@ void SortingStep::mergingSorted(QueryPipelineBuilder & pipeline, const SortDescr
result_sort_desc,
sort_settings.max_block_size,
SortingQueueStrategy::Batch,
limit_);
limit_,
always_read_till_end);
pipeline.addTransform(std::move(transform));
}
@ -262,7 +265,13 @@ void SortingStep::fullSort(
if (pipeline.getNumStreams() > 1)
{
auto transform = std::make_shared<MergingSortedTransform>(
pipeline.getHeader(), pipeline.getNumStreams(), result_sort_desc, sort_settings.max_block_size, SortingQueueStrategy::Batch, limit_);
pipeline.getHeader(),
pipeline.getNumStreams(),
result_sort_desc,
sort_settings.max_block_size,
SortingQueueStrategy::Batch,
limit_,
always_read_till_end);
pipeline.addTransform(std::move(transform));
}

View File

@ -53,7 +53,9 @@ public:
const DataStream & input_stream,
SortDescription sort_description_,
size_t max_block_size_,
UInt64 limit_ = 0);
UInt64 limit_ = 0,
bool always_read_till_end_ = false
);
String getName() const override { return "Sorting"; }
@ -100,6 +102,7 @@ private:
SortDescription prefix_description;
const SortDescription result_description;
UInt64 limit;
bool always_read_till_end = false;
Settings sort_settings;

View File

@ -107,8 +107,13 @@ std::optional<Chunk> RemoteSource::tryGenerate()
/// Get rows_before_limit result for remote query from ProfileInfo packet.
query_executor->setProfileInfoCallback([this](const ProfileInfo & info)
{
if (rows_before_limit && info.hasAppliedLimit())
rows_before_limit->set(info.getRowsBeforeLimit());
if (rows_before_limit)
{
if (info.hasAppliedLimit())
rows_before_limit->add(info.getRowsBeforeLimit());
else
manually_add_rows_before_limit_counter = true; /// Remote subquery doesn't contain a limit
}
});
if (async_query_sending)
@ -162,11 +167,15 @@ std::optional<Chunk> RemoteSource::tryGenerate()
if (!block)
{
if (manually_add_rows_before_limit_counter)
rows_before_limit->add(rows);
query_executor->finish();
return {};
}
UInt64 num_rows = block.rows();
rows += num_rows;
Chunk chunk(block.getColumns(), num_rows);
if (add_aggregation_info)

View File

@ -3,7 +3,7 @@
#include <Processors/ISource.h>
#include <Processors/RowsBeforeLimitCounter.h>
#include <QueryPipeline/Pipe.h>
#include "Core/UUID.h"
#include <Core/UUID.h>
#include <atomic>
namespace DB
@ -27,7 +27,7 @@ public:
void connectToScheduler(InputPort & input_port);
void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) { rows_before_limit.swap(counter); }
void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) override { rows_before_limit.swap(counter); }
UUID getParallelReplicasGroupUUID();
@ -56,6 +56,8 @@ private:
bool is_async_state = false;
UUID uuid;
int fd = -1;
size_t rows = 0;
bool manually_add_rows_before_limit_counter = false;
};
/// Totals source from RemoteQueryExecutor.

View File

@ -126,7 +126,7 @@ ColumnGathererTransform::ColumnGathererTransform(
ReadBuffer & row_sources_buf_,
size_t block_preferred_size_)
: IMergingTransform<ColumnGathererStream>(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0,
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
num_inputs, row_sources_buf_, block_preferred_size_)
, log(&Poco::Logger::get("ColumnGathererStream"))
{

View File

@ -109,6 +109,7 @@ void FinishSortingTransform::generate()
generated_prefix = true;
}
// TODO: Here we should also consider LIMIT optimization.
generated_chunk = merge_sorter->read();
if (!generated_chunk)

View File

@ -844,6 +844,7 @@ MergeJoinTransform::MergeJoinTransform(
output_header,
/* have_all_inputs_= */ true,
limit_hint_,
/* always_read_till_end_= */ false,
/* empty_chunk_on_finish_= */ true,
table_join, input_headers, max_block_size)
, log(&Poco::Logger::get("MergeJoinTransform"))

View File

@ -187,6 +187,7 @@ void MergeSortingTransform::consume(Chunk chunk)
max_merged_block_size,
SortingQueueStrategy::Batch,
limit,
/*always_read_till_end_=*/ false,
nullptr,
quiet,
use_average_block_sizes,

View File

@ -20,7 +20,7 @@ public:
String getName() const override { return "PartialSortingTransform"; }
void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) { read_rows.swap(counter); }
void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr counter) override { read_rows.swap(counter); }
protected:
void transform(Chunk & chunk) override;

View File

@ -42,6 +42,8 @@ public:
Status prepare() override;
void work() override;
bool hasFilter() const { return !filter_column_name.empty(); }
static Block transformHeader(Block block, const ActionsDAG * expression, const std::string & filter_column_name, bool remove_filter, bool final, const ColumnsMask & aggregates_mask);
protected:

View File

@ -20,6 +20,7 @@
#include <Processors/Transforms/PartialSortingTransform.h>
#include <Processors/Transforms/StreamInQueryCacheTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/TotalsHavingTransform.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
@ -129,50 +130,79 @@ static void checkCompleted(Processors & processors)
static void initRowsBeforeLimit(IOutputFormat * output_format)
{
RowsBeforeLimitCounterPtr rows_before_limit_at_least;
/// TODO: add setRowsBeforeLimitCounter as virtual method to IProcessor.
std::vector<LimitTransform *> limits;
std::vector<RemoteSource *> remote_sources;
std::vector<IProcessor *> processors;
std::map<LimitTransform *, std::vector<size_t>> limit_candidates;
std::unordered_set<IProcessor *> visited;
bool has_limit = false;
struct QueuedEntry
{
IProcessor * processor;
bool visited_limit;
LimitTransform * limit_processor;
ssize_t limit_input_port;
};
std::queue<QueuedEntry> queue;
queue.push({ output_format, false });
queue.push({ output_format, nullptr, -1 });
visited.emplace(output_format);
while (!queue.empty())
{
auto * processor = queue.front().processor;
auto visited_limit = queue.front().visited_limit;
auto * limit_processor = queue.front().limit_processor;
auto limit_input_port = queue.front().limit_input_port;
queue.pop();
if (!visited_limit)
/// Set counter based on the following cases:
/// 1. Remote: Set counter on Remote
/// 2. Limit ... PartialSorting: Set counter on PartialSorting
/// 3. Limit ... TotalsHaving(with filter) ... Remote: Set counter on the input port of Limit
/// 4. Limit ... Remote: Set counter on Remote
/// 5. Limit ... : Set counter on the input port of Limit
/// Case 1.
if (typeid_cast<RemoteSource *>(processor) && !limit_processor)
{
if (auto * limit = typeid_cast<LimitTransform *>(processor))
processors.emplace_back(processor);
continue;
}
if (auto * limit = typeid_cast<LimitTransform *>(processor))
{
has_limit = true;
/// Ignore child limits
if (limit_processor)
continue;
limit_processor = limit;
limit_candidates[limit_processor] = {};
}
else if (limit_processor)
{
/// Case 2.
if (typeid_cast<PartialSortingTransform *>(processor))
{
visited_limit = true;
limits.emplace_back(limit);
processors.emplace_back(processor);
limit_candidates[limit_processor].push_back(limit_input_port);
continue;
}
if (auto * source = typeid_cast<RemoteSource *>(processor))
remote_sources.emplace_back(source);
}
else if (auto * sorting = typeid_cast<PartialSortingTransform *>(processor))
{
if (!rows_before_limit_at_least)
rows_before_limit_at_least = std::make_shared<RowsBeforeLimitCounter>();
/// Case 3.
if (auto * having = typeid_cast<TotalsHavingTransform *>(processor))
{
if (having->hasFilter())
continue;
}
sorting->setRowsBeforeLimitCounter(rows_before_limit_at_least);
/// Don't go to children. Take rows_before_limit from last PartialSortingTransform.
continue;
/// Case 4.
if (typeid_cast<RemoteSource *>(processor))
{
processors.emplace_back(processor);
limit_candidates[limit_processor].push_back(limit_input_port);
continue;
}
}
/// Skip totals and extremes port for output format.
@ -180,37 +210,58 @@ static void initRowsBeforeLimit(IOutputFormat * output_format)
{
auto * child_processor = &format->getPort(IOutputFormat::PortKind::Main).getOutputPort().getProcessor();
if (visited.emplace(child_processor).second)
queue.push({ child_processor, visited_limit });
queue.push({ child_processor, limit_processor, limit_input_port });
continue;
}
for (auto & child_port : processor->getInputs())
if (limit_processor == processor)
{
auto * child_processor = &child_port.getOutputPort().getProcessor();
if (visited.emplace(child_processor).second)
queue.push({ child_processor, visited_limit });
ssize_t i = 0;
for (auto & child_port : processor->getInputs())
{
auto * child_processor = &child_port.getOutputPort().getProcessor();
if (visited.emplace(child_processor).second)
queue.push({ child_processor, limit_processor, i });
++i;
}
}
else
{
for (auto & child_port : processor->getInputs())
{
auto * child_processor = &child_port.getOutputPort().getProcessor();
if (visited.emplace(child_processor).second)
queue.push({ child_processor, limit_processor, limit_input_port });
}
}
}
if (!rows_before_limit_at_least && (!limits.empty() || !remote_sources.empty()))
/// Case 5.
for (auto && [limit, ports] : limit_candidates)
{
rows_before_limit_at_least = std::make_shared<RowsBeforeLimitCounter>();
for (auto & limit : limits)
limit->setRowsBeforeLimitCounter(rows_before_limit_at_least);
for (auto & source : remote_sources)
source->setRowsBeforeLimitCounter(rows_before_limit_at_least);
/// If there are some input ports which don't have the counter, add it to LimitTransform.
if (ports.size() < limit->getInputs().size())
{
processors.push_back(limit);
for (auto port : ports)
limit->setInputPortHasCounter(port);
}
}
/// If there is a limit, then enable rows_before_limit_at_least
/// It is needed when zero rows are read, but we still want rows_before_limit_at_least in the result.
if (!limits.empty())
rows_before_limit_at_least->add(0);
if (!processors.empty())
{
rows_before_limit_at_least = std::make_shared<RowsBeforeLimitCounter>();
for (auto & processor : processors)
processor->setRowsBeforeLimitCounter(rows_before_limit_at_least);
/// If there is a limit, then enable rows_before_limit_at_least
/// It is needed when zero rows are read, but we still want rows_before_limit_at_least in the result.
if (has_limit)
rows_before_limit_at_least->add(0);
if (rows_before_limit_at_least)
output_format->setRowsBeforeLimitCounter(rows_before_limit_at_least);
}
}

View File

@ -83,7 +83,7 @@ TEST(MergingSortedTest, SimpleBlockSizeTest)
EXPECT_EQ(pipe.numOutputPorts(), 3);
auto transform = std::make_shared<MergingSortedTransform>(pipe.getHeader(), pipe.numOutputPorts(), sort_description,
DEFAULT_MERGE_BLOCK_SIZE, SortingQueueStrategy::Batch, 0, nullptr, false, true);
DEFAULT_MERGE_BLOCK_SIZE, SortingQueueStrategy::Batch, 0, false, nullptr, false, true);
pipe.addTransform(std::move(transform));
@ -125,7 +125,7 @@ TEST(MergingSortedTest, MoreInterestingBlockSizes)
EXPECT_EQ(pipe.numOutputPorts(), 3);
auto transform = std::make_shared<MergingSortedTransform>(pipe.getHeader(), pipe.numOutputPorts(), sort_description,
DEFAULT_MERGE_BLOCK_SIZE, SortingQueueStrategy::Batch, 0, nullptr, false, true);
DEFAULT_MERGE_BLOCK_SIZE, SortingQueueStrategy::Batch, 0, false, nullptr, false, true);
pipe.addTransform(std::move(transform));

View File

@ -63,11 +63,9 @@ HTTPServerRequest::HTTPServerRequest(HTTPContextPtr context, HTTPServerResponse
}
else if (getMethod() != HTTPRequest::HTTP_GET && getMethod() != HTTPRequest::HTTP_HEAD && getMethod() != HTTPRequest::HTTP_DELETE)
{
/// That check for has_body may be false-negative in rare cases, but it's okay
bool has_body = in->hasPendingData();
stream = std::move(in);
if (!startsWith(getContentType(), "multipart/form-data") && has_body)
LOG_WARNING(&Poco::Logger::get("HTTPServerRequest"), "Got an HTTP request with no content length "
if (!startsWith(getContentType(), "multipart/form-data"))
LOG_WARNING(LogFrequencyLimiter(&Poco::Logger::get("HTTPServerRequest"), 10), "Got an HTTP request with no content length "
"and no chunked/multipart encoding, it may be impossible to distinguish graceful EOF from abnormal connection loss");
}
else

View File

@ -58,6 +58,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int TIMEOUT_EXCEEDED;
extern const int TOO_LARGE_DISTRIBUTED_DEPTH;
extern const int ABORTED;
}
static Block adoptBlock(const Block & header, const Block & block, Poco::Logger * log)
@ -295,6 +296,10 @@ DistributedSink::runWritingJob(JobReplica & job, const Block & current_block, si
auto thread_group = CurrentThread::getGroup();
return [this, thread_group, &job, &current_block, num_shards]()
{
/// Avoid Logical error: 'Pipeline for PushingPipelineExecutor was finished before all data was inserted' (whatever it means)
if (isCancelled())
throw Exception(ErrorCodes::ABORTED, "Writing job was cancelled");
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachFromGroupIfNotDetached();

View File

@ -929,7 +929,16 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
{
case MergeTreeData::MergingParams::Ordinary:
merged_transform = std::make_shared<MergingSortedTransform>(
header, pipes.size(), sort_description, merge_block_size, SortingQueueStrategy::Default, 0, ctx->rows_sources_write_buf.get(), true, ctx->blocks_are_granules_size);
header,
pipes.size(),
sort_description,
merge_block_size,
SortingQueueStrategy::Default,
/* limit_= */0,
/* always_read_till_end_= */false,
ctx->rows_sources_write_buf.get(),
true,
ctx->blocks_are_granules_size);
break;
case MergeTreeData::MergingParams::Collapsing:

View File

@ -76,50 +76,52 @@ std::unique_lock<std::mutex> ReplicatedMergeTreePartCheckThread::pausePartsCheck
void ReplicatedMergeTreePartCheckThread::cancelRemovedPartsCheck(const MergeTreePartInfo & drop_range_info)
{
Strings removed_names;
Strings parts_to_remove;
{
std::lock_guard lock(parts_mutex);
removed_names.reserve(parts_queue.size()); /// Avoid memory limit in the middle
for (auto it = parts_queue.begin(); it != parts_queue.end();)
{
if (drop_range_info.contains(MergeTreePartInfo::fromPartName(it->first, storage.format_version)))
{
/// Remove part from the queue to avoid part resurrection
/// if we will check it and enqueue fetch after DROP/REPLACE execution.
removed_names.push_back(it->first);
parts_set.erase(it->first);
it = parts_queue.erase(it);
}
else
{
++it;
}
}
for (const auto & elem : parts_queue)
if (drop_range_info.contains(MergeTreePartInfo::fromPartName(elem.first, storage.format_version)))
parts_to_remove.push_back(elem.first);
}
/// This filtering is not necessary
auto new_end = std::remove_if(removed_names.begin(), removed_names.end(), [this](const String & part_name)
/// We have to remove parts that were not removed by removePartAndEnqueueFetch
LOG_INFO(log, "Removing broken parts from ZooKeeper: {}", fmt::join(parts_to_remove, ", "));
storage.removePartsFromZooKeeperWithRetries(parts_to_remove); /// May throw
/// Now we can remove parts from the check queue.
/// It's not atomic (because it's a bad idea to hold the mutex while removing something from zk with retries),
/// but the check thread is currently paused, and no new parts in drop_range_info can be enqueued
/// while the corresponding DROP_RANGE/REPLACE_RANGE exists, so it should be okay. We will recheck it just in case.
StringSet removed_parts;
for (auto & part : parts_to_remove)
removed_parts.emplace(std::move(part));
size_t count = 0;
std::lock_guard lock(parts_mutex);
for (const auto & elem : parts_queue)
{
auto part = storage.getPartIfExists(part_name, {MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated, MergeTreeDataPartState::Deleting});
/// The rest of parts will be removed normally
return part && !part->outdated_because_broken;
bool is_removed = removed_parts.contains(elem.first);
bool should_have_been_removed = drop_range_info.contains(MergeTreePartInfo::fromPartName(elem.first, storage.format_version));
if (is_removed != should_have_been_removed)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Inconsistent parts_queue: name={}, is_removed={}, should_have_been_removed={}",
elem.first, is_removed, should_have_been_removed);
count += is_removed;
}
if (count != parts_to_remove.size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected number of parts to remove from parts_queue: should be {}, got {}",
parts_to_remove.size(), count);
auto new_end = std::remove_if(parts_queue.begin(), parts_queue.end(), [&removed_parts] (const auto & elem)
{
return removed_parts.contains(elem.first);
});
removed_names.erase(new_end, removed_names.end());
if (removed_names.empty())
return;
try
{
/// We have to remove parts that were not removed by removePartAndEnqueueFetch
LOG_INFO(log, "Removing broken parts from ZooKeeper: {}", fmt::join(removed_names, ", "));
storage.removePartsFromZooKeeperWithRetries(removed_names, /* max_retries */ 100);
}
catch (...)
{
/// It's highly unlikely to happen on normal use cases. And if it happens it's easier to restart and reinitialize
LOG_FATAL(log, "Failed to remove parts [{}] from ZooKeeper: {}", fmt::join(removed_names, ", "), getCurrentExceptionMessage(/* with_stacktrace = */ true));
std::terminate();
}
parts_queue.erase(new_end, parts_queue.end());
for (const auto & elem : removed_parts)
parts_set.erase(elem);
}
size_t ReplicatedMergeTreePartCheckThread::size() const

View File

@ -49,6 +49,7 @@ StorageS3Cluster::StorageS3Cluster(
ContextPtr context_,
bool structure_argument_was_provided_)
: IStorageCluster(table_id_)
, log(&Poco::Logger::get("StorageS3Cluster (" + table_id_.table_name + ")"))
, s3_configuration{configuration_}
, cluster_name(configuration_.cluster_name)
, format_name(configuration_.format)
@ -156,6 +157,7 @@ Pipe StorageS3Cluster::read(
processed_stage,
extension);
remote_query_executor->setLogger(log);
pipes.emplace_back(std::make_shared<RemoteSource>(remote_query_executor, add_agg_info, false, false));
}
}

View File

@ -48,6 +48,7 @@ public:
ClusterPtr getCluster(ContextPtr context) const override;
private:
Poco::Logger * log;
StorageS3::Configuration s3_configuration;
String cluster_name;
String format_name;

View File

@ -141,7 +141,6 @@ def prepare_tests_results_for_clickhouse(
report_url: str,
check_name: str,
) -> List[dict]:
pull_request_url = "https://github.com/ClickHouse/ClickHouse/commits/master"
base_ref = "master"
head_ref = "master"

View File

@ -96,7 +96,6 @@ def get_images_dict(repo_path: str, image_file_path: str) -> ImagesDict:
def get_changed_docker_images(
pr_info: PRInfo, images_dict: ImagesDict
) -> Set[DockerImage]:
if not images_dict:
return set()

View File

@ -51,7 +51,6 @@ def find_previous_release(
for release in releases:
if release.version < server_version:
# Check if the artifact exists on GitHub.
# It can be not true for a short period of time
# after creating a tag for a new release before uploading the packages.

View File

@ -473,7 +473,7 @@ def create_build_html_report(
commit_url: str,
) -> str:
rows = ""
for (build_result, build_log_url, artifact_urls) in zip(
for build_result, build_log_url, artifact_urls in zip(
build_results, build_logs_urls, artifact_urls_list
):
row = "<tr>"

View File

@ -6,6 +6,7 @@ import os
import argparse
import logging
import time
import random
def get_options(i, upgrade_check):
@ -43,6 +44,10 @@ def get_options(i, upgrade_check):
client_options.append("join_algorithm='auto'")
client_options.append("max_rows_in_join=1000")
if i > 0 and random.random() < 1 / 3:
client_options.append("allow_experimental_query_cache=1")
client_options.append("use_query_cache=1")
if i % 5 == 1:
client_options.append("memory_tracker_fault_probability=0.001")

View File

@ -77,7 +77,7 @@ def trim_for_log(s):
return s
lines = s.splitlines()
if len(lines) > 10000:
separator = "-" * 40 + str(len(lines) - 10000) + " lines are hidden" + "-" * 40
separator = "-" * 40 + str(len(lines) - 10000) + " lines are hidden" + "-" * 40
return "\n".join(lines[:5000] + [] + [separator] + [] + lines[-5000:])
else:
return "\n".join(lines)
@ -95,7 +95,13 @@ class HTTPError(Exception):
# Helpers to execute queries via HTTP interface.
def clickhouse_execute_http(
base_args, query, timeout=30, settings=None, default_format=None, max_http_retries=5, retry_error_codes=False
base_args,
query,
timeout=30,
settings=None,
default_format=None,
max_http_retries=5,
retry_error_codes=False,
):
if args.secure:
client = http.client.HTTPSConnection(
@ -146,12 +152,36 @@ def clickhouse_execute_http(
return data
def clickhouse_execute(base_args, query, timeout=30, settings=None, max_http_retries=5, retry_error_codes=False):
return clickhouse_execute_http(base_args, query, timeout, settings, max_http_retries=max_http_retries, retry_error_codes=retry_error_codes).strip()
def clickhouse_execute(
base_args,
query,
timeout=30,
settings=None,
max_http_retries=5,
retry_error_codes=False,
):
return clickhouse_execute_http(
base_args,
query,
timeout,
settings,
max_http_retries=max_http_retries,
retry_error_codes=retry_error_codes,
).strip()
def clickhouse_execute_json(base_args, query, timeout=60, settings=None, max_http_retries=5):
data = clickhouse_execute_http(base_args, query, timeout, settings, "JSONEachRow", max_http_retries=max_http_retries)
def clickhouse_execute_json(
base_args, query, timeout=60, settings=None, max_http_retries=5
):
data = clickhouse_execute_http(
base_args,
query,
timeout,
settings,
"JSONEachRow",
max_http_retries=max_http_retries,
)
if not data:
return None
rows = []
@ -648,7 +678,9 @@ class TestCase:
clickhouse_execute(
args,
"CREATE DATABASE IF NOT EXISTS " + database + get_db_engine(testcase_args, database),
"CREATE DATABASE IF NOT EXISTS "
+ database
+ get_db_engine(testcase_args, database),
settings=get_create_database_settings(args, testcase_args),
)
@ -831,7 +863,8 @@ class TestCase:
# TODO: remove checking "no-upgrade-check" after 23.1
elif args.upgrade_check and (
"no-upgrade-check" in tags or "no-upgrade-check" in tags):
"no-upgrade-check" in tags or "no-upgrade-check" in tags
):
return FailureReason.NO_UPGRADE_CHECK
elif tags and ("no-s3-storage" in tags) and args.s3_storage:
@ -1051,7 +1084,11 @@ class TestCase:
@staticmethod
def send_test_name_failed(suite: str, case: str):
pid = os.getpid()
clickhouse_execute(args, f"SELECT 'Running test {suite}/{case} from pid={pid}'", retry_error_codes=True)
clickhouse_execute(
args,
f"SELECT 'Running test {suite}/{case} from pid={pid}'",
retry_error_codes=True,
)
def run_single_test(
self, server_logs_level, client_options
@ -2217,6 +2254,7 @@ def find_binary(name):
raise Exception(f"{name} was not found in PATH")
def find_clickhouse_command(binary, command):
symlink = binary + "-" + command
if os.access(symlink, os.X_OK):
@ -2225,6 +2263,7 @@ def find_clickhouse_command(binary, command):
# To avoid requiring symlinks (in case you download binary from CI)
return binary + " " + command
def get_additional_client_options(args):
if args.client_option:
return " ".join("--" + option for option in args.client_option)
@ -2566,7 +2605,9 @@ if __name__ == "__main__":
"WARNING: --extract_from_config option is deprecated and will be removed the the future",
file=sys.stderr,
)
args.extract_from_config = find_clickhouse_command(args.binary, "extract-from-config")
args.extract_from_config = find_clickhouse_command(
args.binary, "extract-from-config"
)
if args.configclient:
args.client += " --config-file=" + args.configclient

View File

@ -63,6 +63,7 @@ DEFAULT_ENV_NAME = ".env"
SANITIZER_SIGN = "=================="
# to create docker-compose env file
def _create_env_file(path, variables):
logging.debug(f"Env {variables} stored in {path}")
@ -1454,7 +1455,6 @@ class ClickHouseCluster:
config_root_name="clickhouse",
extra_configs=[],
) -> "ClickHouseInstance":
"""Add an instance to the cluster.
name - the name of the instance directory and the value of the 'instance' macro in ClickHouse.
@ -3089,7 +3089,6 @@ class ClickHouseInstance:
config_root_name="clickhouse",
extra_configs=[],
):
self.name = name
self.base_cmd = cluster.base_cmd
self.docker_id = cluster.get_instance_docker_id(self.name)

View File

@ -216,7 +216,6 @@ class _NetworkManager:
container_exit_timeout=60,
docker_api_version=os.environ.get("DOCKER_API_VERSION"),
):
self.container_expire_timeout = container_expire_timeout
self.container_exit_timeout = container_exit_timeout
@ -232,7 +231,6 @@ class _NetworkManager:
def _ensure_container(self):
if self._container is None or self._container_expire_time <= time.time():
for i in range(5):
if self._container is not None:
try:

View File

@ -1,6 +1,7 @@
import logging
import os.path
# Makes the parallel workers of pytest-xdist log to separate files.
# Without this function all workers will log to the same log file
# and mix everything together making it much more difficult for troubleshooting.

View File

@ -243,11 +243,18 @@ if __name__ == "__main__":
)
parser.add_argument(
"--no-random", action="store", dest="no_random", help="Disable tests order randomization"
"--no-random",
action="store",
dest="no_random",
help="Disable tests order randomization",
)
parser.add_argument(
"--pre-pull", action="store_true", default=False, dest="pre_pull", help="Pull images for docker_compose before all other actions"
"--pre-pull",
action="store_true",
default=False,
dest="pre_pull",
help="Pull images for docker_compose before all other actions",
)
parser.add_argument(
@ -306,7 +313,6 @@ if __name__ == "__main__":
# if not args.no_random:
# rand_args += f"--random-seed={os.getpid()}"
net = ""
if args.network:
net = "--net={}".format(args.network)
@ -416,8 +422,11 @@ if __name__ == "__main__":
name=CONTAINER_NAME,
)
cmd = cmd_base + " " + args.command
cmd_pre_pull = cmd_base + " find /compose -name docker_compose_*.yml -exec docker-compose -f '{}' pull \;"
cmd = cmd_base + " " + args.command
cmd_pre_pull = (
cmd_base
+ " find /compose -name docker_compose_*.yml -exec docker-compose -f '{}' pull \;"
)
containers = subprocess.check_output(
f"docker ps --all --quiet --filter name={CONTAINER_NAME} --format={{{{.ID}}}}",

View File

@ -9,14 +9,13 @@ from helpers.test_tools import TSV, assert_eq_with_retry
cluster = ClickHouseCluster(__file__)
num_nodes = 4
ddl_task_timeout = 640
num_nodes = 10
def generate_cluster_def():
path = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
"./_gen/cluster_for_disallow_concurrency_test.xml",
"./_gen/cluster_for_concurrency_test.xml",
)
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w") as f:
@ -86,7 +85,7 @@ def drop_after_test():
node0.query(
"DROP TABLE IF EXISTS tbl ON CLUSTER 'cluster' NO DELAY",
settings={
"distributed_ddl_task_timeout": ddl_task_timeout,
"distributed_ddl_task_timeout": 360,
},
)
@ -101,7 +100,6 @@ def new_backup_name():
def create_and_fill_table():
node0.query("SET mutations_sync=2")
node0.query(
"CREATE TABLE tbl ON CLUSTER 'cluster' ("
"x UInt64"
@ -109,10 +107,7 @@ def create_and_fill_table():
"ORDER BY x"
)
for i in range(num_nodes):
nodes[i].query(f"INSERT INTO tbl SELECT number FROM numbers(100000000)")
nodes[i].query(
f"INSERT INTO tbl SELECT number+100000000 FROM numbers(100000000)"
)
nodes[i].query(f"INSERT INTO tbl SELECT number FROM numbers(40000000)")
# All the tests have concurrent backup/restores with same backup names
@ -143,8 +138,6 @@ def test_concurrent_backups_on_same_node():
nodes[0],
f"SELECT status FROM system.backups WHERE status == 'BACKUP_CREATED' AND id = '{id}'",
"BACKUP_CREATED",
retry_count=100,
sleep_time=1,
)
# This restore part is added to confirm creating an internal backup & restore work
@ -152,11 +145,10 @@ def test_concurrent_backups_on_same_node():
nodes[0].query(
f"DROP TABLE tbl ON CLUSTER 'cluster' NO DELAY",
settings={
"distributed_ddl_task_timeout": ddl_task_timeout,
"distributed_ddl_task_timeout": 360,
},
)
nodes[0].query(f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name}")
nodes[0].query("SYSTEM SYNC REPLICA ON CLUSTER 'cluster' tbl")
def test_concurrent_backups_on_different_nodes():
@ -181,8 +173,6 @@ def test_concurrent_backups_on_different_nodes():
nodes[1],
f"SELECT status FROM system.backups WHERE status == 'BACKUP_CREATED' AND id = '{id}'",
"BACKUP_CREATED",
retry_count=100,
sleep_time=1,
)
@ -206,14 +196,12 @@ def test_concurrent_restores_on_same_node():
nodes[0],
f"SELECT status FROM system.backups WHERE status == 'BACKUP_CREATED' AND id = '{id}'",
"BACKUP_CREATED",
retry_count=100,
sleep_time=1,
)
nodes[0].query(
f"DROP TABLE tbl ON CLUSTER 'cluster' NO DELAY",
settings={
"distributed_ddl_task_timeout": ddl_task_timeout,
"distributed_ddl_task_timeout": 360,
},
)
restore_id = (
@ -237,46 +225,44 @@ def test_concurrent_restores_on_different_node():
backup_name = new_backup_name()
id = (
nodes[1]
nodes[0]
.query(f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_name} ASYNC")
.split("\t")[0]
)
assert_eq_with_retry(
nodes[1],
nodes[0],
f"SELECT status FROM system.backups WHERE status == 'CREATING_BACKUP' AND id = '{id}'",
"CREATING_BACKUP",
)
assert_eq_with_retry(
nodes[1],
nodes[0],
f"SELECT status FROM system.backups WHERE status == 'BACKUP_CREATED' AND id = '{id}'",
"BACKUP_CREATED",
retry_count=100,
sleep_time=1,
)
nodes[1].query(
nodes[0].query(
f"DROP TABLE tbl ON CLUSTER 'cluster' NO DELAY",
settings={
"distributed_ddl_task_timeout": ddl_task_timeout,
"distributed_ddl_task_timeout": 360,
},
)
restore_id = (
nodes[1]
nodes[0]
.query(f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name} ASYNC")
.split("\t")[0]
)
assert_eq_with_retry(
nodes[1],
f"SELECT status FROM system.backups WHERE status == 'RESTORING' AND id == '{restore_id}'",
nodes[0],
f"SELECT status FROM system.backups WHERE status == 'RESTORING'",
"RESTORING",
)
assert "Concurrent restores not supported" in nodes[0].query_and_get_error(
assert "Concurrent restores not supported" in nodes[1].query_and_get_error(
f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name}"
)
assert_eq_with_retry(
nodes[1],
nodes[0],
f"SELECT status FROM system.backups WHERE status == 'RESTORED' AND id == '{restore_id}'",
"RESTORED",
)

View File

@ -24,7 +24,6 @@ def start_cluster():
def test_detach_part_wrong_partition_id(start_cluster):
# Here we create table with partition by UUID.
node_21_6.query(
"create table tab (id UUID, value UInt32) engine = MergeTree PARTITION BY (id) order by tuple()"

View File

@ -19,7 +19,6 @@ cluster = ClickHouseCluster(__file__)
def started_cluster():
global cluster
try:
for name in ["first", "second", "third"]:
cluster.add_instance(
name,

View File

@ -19,7 +19,6 @@ cluster = ClickHouseCluster(__file__)
def started_cluster():
global cluster
try:
for name in ["first_of_two", "second_of_two"]:
instance = cluster.add_instance(
name,

View File

@ -63,7 +63,6 @@ def netcat(hostname, port, content):
def test_connections():
client = Client(server.ip_address, 9000, command=cluster.client_bin_path)
assert client.query("SELECT 1") == "1\n"

View File

@ -25,7 +25,6 @@ def start_cluster():
def test_create_query_const_constraints():
instance.query("CREATE USER u_const SETTINGS max_threads = 1 CONST")
instance.query("GRANT ALL ON *.* TO u_const")
@ -57,7 +56,6 @@ def test_create_query_const_constraints():
def test_create_query_minmax_constraints():
instance.query("CREATE USER u_minmax SETTINGS max_threads = 4 MIN 2 MAX 6")
instance.query("GRANT ALL ON *.* TO u_minmax")

View File

@ -348,7 +348,6 @@ class RangedLayoutTester(BaseLayoutTester):
self.layouts = LAYOUTS_RANGED
def execute(self, layout_name, node):
if layout_name not in self.layout_to_dictionary:
raise RuntimeError("Source doesn't support layout: {}".format(layout_name))

View File

@ -7,7 +7,6 @@ import pytest
def started_cluster():
global cluster
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"disks_app_test", main_configs=["config.xml"], with_minio=True

View File

@ -10,6 +10,7 @@ from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
# By default, exceptions thrown in threads will be ignored
# (they will not mark the test as failed, only be printed to stderr).
#

View File

@ -18,7 +18,6 @@ def started_cluster():
def test_huge_column(started_cluster):
if (
node.is_built_with_thread_sanitizer()
or node.is_built_with_memory_sanitizer()

View File

@ -13,7 +13,6 @@ number_of_iterations = 100
def perform_request():
buffer = BytesIO()
crl = pycurl.Curl()
crl.setopt(pycurl.INTERFACE, client_ip)

View File

@ -45,7 +45,6 @@ def start_cluster():
def check_balance(node, table):
partitions = node.query(
"""
WITH

Some files were not shown because too many files have changed in this diff