Merge branch 'master' into ADQM-880

This commit is contained in:
Alexey Gerasimchuk 2023-05-30 07:45:09 +10:00 committed by GitHub
commit 9714fa011e
52 changed files with 971 additions and 273 deletions

View File

@ -25,6 +25,9 @@ message(STATUS "Intel QPL version: ${QPL_VERSION}")
# Generate 8 library targets: middle_layer_lib, isal, isal_asm, qplcore_px, qplcore_avx512, qplcore_sw_dispatcher, core_iaa, middle_layer_lib.
# Output ch_contrib::qpl by linking with 8 library targets.
# The qpl submodule comes with its own version of isal. It contains code which does not exist in upstream isal. It would be nice to link
# only upstream isal (ch_contrib::isal) but at this point we can't.
include("${QPL_PROJECT_DIR}/cmake/CompileOptions.cmake")
# check nasm compiler
@ -308,7 +311,7 @@ target_include_directories(middle_layer_lib
target_compile_definitions(middle_layer_lib PUBLIC -DQPL_LIB)
# [SUBDIR]c_api
file(GLOB_RECURSE QPL_C_API_SRC
file(GLOB_RECURSE QPL_C_API_SRC
${QPL_SRC_DIR}/c_api/*.c
${QPL_SRC_DIR}/c_api/*.cpp)

View File

@ -131,14 +131,17 @@ CREATE TABLE table_with_asterisk (name String, value UInt32)
The following settings can be set before query execution or placed into the configuration file.
- `s3_max_single_part_upload_size` — The maximum size of object to upload using singlepart upload to S3. Default value is `64Mb`.
- `s3_min_upload_part_size` — The minimum size of part to upload during multipart upload to [S3 Multipart upload](https://docs.aws.amazon.com/AmazonS3/latest/dev/uploadobjusingmpu.html). Default value is `512Mb`.
- `s3_max_single_part_upload_size` — The maximum size of object to upload using singlepart upload to S3. Default value is `32Mb`.
- `s3_min_upload_part_size` — The minimum size of part to upload during multipart upload to [S3 Multipart upload](https://docs.aws.amazon.com/AmazonS3/latest/dev/uploadobjusingmpu.html). Default value is `16Mb`.
- `s3_max_redirects` — Max number of S3 redirect hops allowed. Default value is `10`.
- `s3_single_read_retries` — The maximum number of attempts during single read. Default value is `4`.
- `s3_max_put_rps` — Maximum PUT requests per second rate before throttling. Default value is `0` (unlimited).
- `s3_max_put_burst` — Max number of requests that can be issued simultaneously before hitting request per second limit. By default (`0` value) equals to `s3_max_put_rps`.
- `s3_max_get_rps` — Maximum GET requests per second rate before throttling. Default value is `0` (unlimited).
- `s3_max_get_burst` — Max number of requests that can be issued simultaneously before hitting request per second limit. By default (`0` value) equals to `s3_max_get_rps`.
- `s3_upload_part_size_multiply_factor` - Multiply `s3_min_upload_part_size` by this factor each time `s3_upload_part_size_multiply_parts_count_threshold` parts were uploaded from a single write to S3. Default value is `2`.
- `s3_upload_part_size_multiply_parts_count_threshold` - Each time this number of parts is uploaded to S3, `s3_min_upload_part_size` is multiplied by `s3_upload_part_size_multiply_factor`. Default value is `500`.
- `s3_max_inflight_parts_for_one_file` - Limits the number of PUT requests that can run concurrently for one object. The value `0` means unlimited. Default value is `20`. Each in-flight part has a buffer of size `s3_min_upload_part_size` for the first `s3_upload_part_size_multiply_factor` parts, and larger buffers once the file is big enough (see `s3_upload_part_size_multiply_factor`). With default settings, one uploaded file consumes at most `320Mb` (20 in-flight parts × the `16Mb` minimum part size) for a file smaller than `8G`; consumption is greater for larger files.
Security consideration: if a malicious user can specify arbitrary S3 URLs, `s3_max_redirects` must be set to zero to avoid [SSRF](https://en.wikipedia.org/wiki/Server-side_request_forgery) attacks; alternatively, `remote_host_filter` must be specified in the server configuration.
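For intuition, the `*_rps`/`*_burst` pairs above describe a token-bucket style limiter: the bucket holds up to `burst` tokens, refills at `rps` tokens per second, and each request consumes one token. The sketch below only illustrates that model; the class and names are invented here and it is not ClickHouse's actual throttler.

```cpp
#include <algorithm>
#include <chrono>
#include <thread>

// Single-threaded illustration of the rps/burst model; a real limiter (including
// the one inside ClickHouse) also needs synchronization and is implemented differently.
class TokenBucket
{
public:
    TokenBucket(double rps_, double burst_)
        : rps(rps_), burst(burst_), tokens(burst_), last(std::chrono::steady_clock::now()) {}

    /// Blocks until one token is available, then consumes it (call before each request).
    void acquire()
    {
        for (;;)
        {
            refill();
            if (tokens >= 1.0)
            {
                tokens -= 1.0;
                return;
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }

private:
    void refill()
    {
        auto now = std::chrono::steady_clock::now();
        double elapsed = std::chrono::duration<double>(now - last).count();
        tokens = std::min(burst, tokens + elapsed * rps);
        last = now;
    }

    double rps;    /// steady refill rate, e.g. s3_max_put_rps
    double burst;  /// bucket capacity, e.g. s3_max_put_burst (defaults to rps when 0)
    double tokens;
    std::chrono::steady_clock::time_point last;
};
```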

View File

@ -1219,11 +1219,12 @@ Authentication parameters (the disk will try all available methods **and** Manag
* `account_name` and `account_key` - For authentication using Shared Key.
Limit parameters (mainly for internal usage):
* `max_single_part_upload_size` - Limits the size of a single block upload to Blob Storage.
* `s3_max_single_part_upload_size` - Limits the size of a single block upload to Blob Storage.
* `min_bytes_for_seek` - Limits the size of a seekable region.
* `max_single_read_retries` - Limits the number of attempts to read a chunk of data from Blob Storage.
* `max_single_download_retries` - Limits the number of attempts to download a readable buffer from Blob Storage.
* `thread_pool_size` - Limits the number of threads with which `IDiskRemote` is instantiated.
* `s3_max_inflight_parts_for_one_file` - Limits the number of PUT requests that can run concurrently for one object.
Other parameters:
* `metadata_path` - Path on local FS to store metadata files for Blob Storage. Default value is `/var/lib/clickhouse/disks/<disk_name>/`.

View File

@ -1341,12 +1341,14 @@ Queries are logged in the [system.part_log](../../operations/system-tables/part_
Use the following parameters to configure logging:
- `database` Name of the database.
- `table` Name of the system table.
- `partition_by` — [Custom partitioning key](../../engines/table-engines/mergetree-family/custom-partitioning-key.md) for a system table. Can't be used if `engine` defined.
- `engine` - [MergeTree Engine Definition](../../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-creating-a-table) for a system table. Can't be used if `partition_by` defined.
- `flush_interval_milliseconds` Interval for flushing data from the buffer in memory to the table.
- `storage_policy` Name of storage policy to use for the table (optional)
- `database` - Name of the database.
- `table` - Name of the system table.
- `partition_by` - [Custom partitioning key](../../engines/table-engines/mergetree-family/custom-partitioning-key.md) for a system table. Can't be used if `engine` defined.
- `order_by` - [Custom sorting key](../../engines/table-engines/mergetree-family/mergetree.md#order_by) for a system table. Can't be used if `engine` defined.
- `engine` - [MergeTree Engine Definition](../../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-creating-a-table) for a system table. Can't be used if `partition_by` or `order_by` defined.
- `flush_interval_milliseconds` - Interval for flushing data from the buffer in memory to the table.
- `storage_policy` - Name of storage policy to use for the table (optional).
- `settings` - [Additional parameters](../../engines/table-engines/mergetree-family/mergetree#settings) that control the behavior of the MergeTree (optional).
**Example**
@ -1417,12 +1419,14 @@ Queries are logged in the [system.query_log](../../operations/system-tables/quer
Use the following parameters to configure logging:
- `database` Name of the database.
- `table` Name of the system table the queries will be logged in.
- `partition_by` — [Custom partitioning key](../../engines/table-engines/mergetree-family/custom-partitioning-key.md) for a system table. Can't be used if `engine` defined.
- `engine` - [MergeTree Engine Definition](../../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-creating-a-table) for a system table. Can't be used if `partition_by` defined.
- `flush_interval_milliseconds` Interval for flushing data from the buffer in memory to the table.
- `storage_policy` Name of storage policy to use for the table (optional)
- `database` - Name of the database.
- `table` - Name of the system table the queries will be logged in.
- `partition_by` - [Custom partitioning key](../../engines/table-engines/mergetree-family/custom-partitioning-key.md) for a system table. Can't be used if `engine` defined.
- `order_by` - [Custom sorting key](../../engines/table-engines/mergetree-family/mergetree.md#order_by) for a system table. Can't be used if `engine` defined.
- `engine` - [MergeTree Engine Definition](../../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-creating-a-table) for a system table. Can't be used if `partition_by` or `order_by` defined.
- `flush_interval_milliseconds` - Interval for flushing data from the buffer in memory to the table.
- `storage_policy` - Name of storage policy to use for the table (optional).
- `settings` - [Additional parameters](../../engines/table-engines/mergetree-family/mergetree#settings) that control the behavior of the MergeTree (optional).
If the table does not exist, ClickHouse will create it. If the structure of the query log changed when the ClickHouse server was updated, the table with the old structure is renamed, and a new table is created automatically.
@ -1473,12 +1477,14 @@ Queries are logged in the [system.query_thread_log](../../operations/system-tabl
Use the following parameters to configure logging:
- `database` Name of the database.
- `table` Name of the system table the queries will be logged in.
- `partition_by` — [Custom partitioning key](../../engines/table-engines/mergetree-family/custom-partitioning-key.md) for a system table. Can't be used if `engine` defined.
- `engine` - [MergeTree Engine Definition](../../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-creating-a-table) for a system table. Can't be used if `partition_by` defined.
- `flush_interval_milliseconds` Interval for flushing data from the buffer in memory to the table.
- `storage_policy` Name of storage policy to use for the table (optional)
- `database` - Name of the database.
- `table` - Name of the system table the queries will be logged in.
- `partition_by` - [Custom partitioning key](../../engines/table-engines/mergetree-family/custom-partitioning-key.md) for a system table. Can't be used if `engine` defined.
- `order_by` - [Custom sorting key](../../engines/table-engines/mergetree-family/mergetree.md#order_by) for a system table. Can't be used if `engine` defined.
- `engine` - [MergeTree Engine Definition](../../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-creating-a-table) for a system table. Can't be used if `partition_by` or `order_by` defined.
- `flush_interval_milliseconds` - Interval for flushing data from the buffer in memory to the table.
- `storage_policy` - Name of storage policy to use for the table (optional).
- `settings` - [Additional parameters](../../engines/table-engines/mergetree-family/mergetree#settings) that control the behavior of the MergeTree (optional).
If the table does not exist, ClickHouse will create it. If the structure of the query thread log changed when the ClickHouse server was updated, the table with the old structure is renamed, and a new table is created automatically.
@ -1501,12 +1507,14 @@ Queries are logged in the [system.query_views_log](../../operations/system-table
Use the following parameters to configure logging:
- `database` Name of the database.
- `table` Name of the system table the queries will be logged in.
- `partition_by` — [Custom partitioning key](../../engines/table-engines/mergetree-family/custom-partitioning-key.md) for a system table. Can't be used if `engine` defined.
- `engine` - [MergeTree Engine Definition](../../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-creating-a-table) for a system table. Can't be used if `partition_by` defined.
- `flush_interval_milliseconds` Interval for flushing data from the buffer in memory to the table.
- `storage_policy` Name of storage policy to use for the table (optional)
- `database` - Name of the database.
- `table` - Name of the system table the queries will be logged in.
- `partition_by` - [Custom partitioning key](../../engines/table-engines/mergetree-family/custom-partitioning-key.md) for a system table. Can't be used if `engine` defined.
- `order_by` - [Custom sorting key](../../engines/table-engines/mergetree-family/mergetree.md#order_by) for a system table. Can't be used if `engine` defined.
- `engine` - [MergeTree Engine Definition](../../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-creating-a-table) for a system table. Can't be used if `partition_by` or `order_by` defined.
- `flush_interval_milliseconds` - Interval for flushing data from the buffer in memory to the table.
- `storage_policy` - Name of storage policy to use for the table (optional).
- `settings` - [Additional parameters](../../engines/table-engines/mergetree-family/mergetree#settings) that control the behavior of the MergeTree (optional).
If the table does not exist, ClickHouse will create it. If the structure of the query views log changed when the ClickHouse server was updated, the table with the old structure is renamed, and a new table is created automatically.
@ -1527,13 +1535,15 @@ Settings for the [text_log](../../operations/system-tables/text_log.md#system_ta
Parameters:
- `level` — Maximum Message Level (by default `Trace`) which will be stored in a table.
- `database` — Database name.
- `table` — Table name.
- `partition_by` — [Custom partitioning key](../../engines/table-engines/mergetree-family/custom-partitioning-key.md) for a system table. Can't be used if `engine` defined.
- `engine` - [MergeTree Engine Definition](../../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-creating-a-table) for a system table. Can't be used if `partition_by` defined.
- `flush_interval_milliseconds` — Interval for flushing data from the buffer in memory to the table.
- `storage_policy` Name of storage policy to use for the table (optional)
- `level` - Maximum Message Level (by default `Trace`) which will be stored in a table.
- `database` - Database name.
- `table` - Table name.
- `partition_by` - [Custom partitioning key](../../engines/table-engines/mergetree-family/custom-partitioning-key.md) for a system table. Can't be used if `engine` defined.
- `order_by` - [Custom sorting key](../../engines/table-engines/mergetree-family/mergetree.md#order_by) for a system table. Can't be used if `engine` defined.
- `engine` - [MergeTree Engine Definition](../../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-creating-a-table) for a system table. Can't be used if `partition_by` or `order_by` defined.
- `flush_interval_milliseconds` - Interval for flushing data from the buffer in memory to the table.
- `storage_policy` - Name of storage policy to use for the table (optional).
- `settings` - [Additional parameters](../../engines/table-engines/mergetree-family/mergetree#settings) that control the behavior of the MergeTree (optional).
**Example**
```xml
@ -1556,12 +1566,14 @@ Settings for the [trace_log](../../operations/system-tables/trace_log.md#system_
Parameters:
- `database` — Database for storing a table.
- `table` — Table name.
- `partition_by` — [Custom partitioning key](../../engines/table-engines/mergetree-family/custom-partitioning-key.md) for a system table. Can't be used if `engine` defined.
- `engine` - [MergeTree Engine Definition](../../engines/table-engines/mergetree-family/index.md) for a system table. Can't be used if `partition_by` defined.
- `flush_interval_milliseconds` — Interval for flushing data from the buffer in memory to the table.
- `storage_policy` Name of storage policy to use for the table (optional)
- `database` - Database for storing a table.
- `table` - Table name.
- `partition_by` - [Custom partitioning key](../../engines/table-engines/mergetree-family/custom-partitioning-key.md) for a system table. Can't be used if `engine` defined.
- `order_by` - [Custom sorting key](../../engines/table-engines/mergetree-family/mergetree.md#order_by) for a system table. Can't be used if `engine` defined.
- `engine` - [MergeTree Engine Definition](../../engines/table-engines/mergetree-family/index.md) for a system table. Can't be used if `partition_by` or `order_by` defined.
- `flush_interval_milliseconds` - Interval for flushing data from the buffer in memory to the table.
- `storage_policy` - Name of storage policy to use for the table (optional).
- `settings` - [Additional parameters](../../engines/table-engines/mergetree-family/mergetree#settings) that control the behavior of the MergeTree (optional).
The default server configuration file `config.xml` contains the following settings section:

View File

@ -1187,6 +1187,36 @@ Disable limit on kafka_num_consumers that depends on the number of available CPU
Default value: false.
## postgresql_connection_pool_size {#postgresql-connection-pool-size}
Connection pool size for PostgreSQL table engine and database engine.
Default value: 16
## postgresql_connection_pool_wait_timeout {#postgresql-connection-pool-wait-timeout}
Connection pool push/pop timeout on empty pool for PostgreSQL table engine and database engine. By default it will block on an empty pool.
Default value: 5000
## postgresql_connection_pool_auto_close_connection {#postgresql-connection-pool-auto-close-connection}
Close connection before returning connection to the pool.
Default value: true.
## odbc_bridge_connection_pool_size {#odbc-bridge-connection-pool-size}
Connection pool size for each connection settings string in ODBC bridge.
Default value: 16
## odbc_bridge_use_connection_pooling {#odbc-bridge-use-connection-pooling}
Use connection pooling in ODBC bridge. If set to false, a new connection is created every time.
Default value: true
## use_uncompressed_cache {#setting-use_uncompressed_cache}
Whether to use a cache of uncompressed blocks. Accepts 0 or 1. By default, 0 (disabled).
@ -3563,7 +3593,7 @@ SETTINGS index_granularity = 8192 │
## external_table_functions_use_nulls {#external-table-functions-use-nulls}
Defines how [mysql](../../sql-reference/table-functions/mysql.md), [postgresql](../../sql-reference/table-functions/postgresql.md) and [odbc](../../sql-reference/table-functions/odbc.md)] table functions use Nullable columns.
Defines how [mysql](../../sql-reference/table-functions/mysql.md), [postgresql](../../sql-reference/table-functions/postgresql.md) and [odbc](../../sql-reference/table-functions/odbc.md) table functions use Nullable columns.
Possible values:

View File

@ -2234,7 +2234,7 @@ Result:
## Regular Expression Tree Dictionary {#regexp-tree-dictionary}
Regular expression tree dictionaries are a special type of dictionary which represent the mapping from key to attributes using a tree of regular expressions. There are some use cases, e.g. parsing of (user agent)[https://en.wikipedia.org/wiki/User_agent] strings, which can be expressed elegantly with regexp tree dictionaries.
Regular expression tree dictionaries are a special type of dictionary which represent the mapping from key to attributes using a tree of regular expressions. There are some use cases, e.g. parsing of [user agent](https://en.wikipedia.org/wiki/User_agent) strings, which can be expressed elegantly with regexp tree dictionaries.
### Use Regular Expression Tree Dictionary in ClickHouse Open-Source
@ -2280,7 +2280,7 @@ This config consists of a list of regular expression tree nodes. Each node has t
- The value of an attribute may contain **back references**, referring to capture groups of the matched regular expression. In the example, the value of attribute `version` in the first node consists of a back-reference `\1` to capture group `(\d+[\.\d]*)` in the regular expression. Back-reference numbers range from 1 to 9 and are written as `$1` or `\1` (for number 1). The back reference is replaced by the matched capture group during query execution.
- **child nodes**: a list of children of a regexp tree node, each of which has its own attributes and (potentially) children nodes. String matching proceeds in a depth-first fashion. If a string matches a regexp node, the dictionary checks if it also matches the nodes' child nodes. If that is the case, the attributes of the deepest matching node are assigned. Attributes of a child node overwrite equally named attributes of parent nodes. The name of child nodes in YAML files can be arbitrary, e.g. `versions` in above example.
Regexp tree dictionaries only allow access using functions `dictGet`, `dictGetOrDefault` and `dictGetOrNull`.
Regexp tree dictionaries only allow access using the functions `dictGet` and `dictGetOrDefault`.
Example:

View File

@ -34,7 +34,7 @@ For the `SAMPLE` clause the following syntax is supported:
| `SAMPLE k OFFSET m` | Here `k` and `m` are the numbers from 0 to 1. The query is executed on a sample of `k` fraction of the data. The data used for the sample is offset by `m` fraction. [Read more](#select-sample-offset) |
## SAMPLE K
## SAMPLE K {#select-sample-k}
Here `k` is the number from 0 to 1 (both fractional and decimal notations are supported). For example, `SAMPLE 1/2` or `SAMPLE 0.5`.
@ -54,7 +54,7 @@ ORDER BY PageViews DESC LIMIT 1000
In this example, the query is executed on a sample from 0.1 (10%) of data. Values of aggregate functions are not corrected automatically, so to get an approximate result, the value `count()` is manually multiplied by 10.
## SAMPLE N
## SAMPLE N {#select-sample-n}
Here `n` is a sufficiently large integer. For example, `SAMPLE 10000000`.
@ -90,7 +90,7 @@ FROM visits
SAMPLE 10000000
```
## SAMPLE K OFFSET M
## SAMPLE K OFFSET M {#select-sample-offset}
Here `k` and `m` are numbers from 0 to 1. Examples are shown below.

View File

@ -1137,6 +1137,16 @@
<ttl>event_date + INTERVAL 30 DAY DELETE</ttl>
-->
<!--
ORDER BY expr: https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/mergetree#order_by
Example:
event_date, event_time
event_date, type, query_id
event_date, event_time, initial_query_id
<order_by>event_date, event_time, initial_query_id</order_by>
-->
<!-- Instead of partition_by, you can provide full engine expression (starting with ENGINE = ) with parameters,
Example: <engine>ENGINE = MergeTree PARTITION BY toYYYYMM(event_date) ORDER BY (event_date, event_time) SETTINGS index_granularity = 1024</engine>
-->

View File

@ -152,6 +152,13 @@ public:
nested_func->merge(place, rhs, arena);
}
bool isAbleToParallelizeMerge() const override { return nested_func->isAbleToParallelizeMerge(); }
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, ThreadPool & thread_pool, Arena * arena) const override
{
nested_func->merge(place, rhs, thread_pool, arena);
}
void mergeBatch(
size_t row_begin,
size_t row_end,

View File

@ -433,7 +433,7 @@ const String & AsyncLoader::getPoolName(size_t pool) const
return pools[pool].name; // NOTE: lock is not needed because `name` is const and `pools` are immutable
}
ssize_t AsyncLoader::getPoolPriority(size_t pool) const
Priority AsyncLoader::getPoolPriority(size_t pool) const
{
return pools[pool].priority; // NOTE: lock is not needed because `priority` is const and `pools` are immutable
}
@ -576,7 +576,7 @@ void AsyncLoader::prioritize(const LoadJobPtr & job, size_t new_pool_id, std::un
{
Pool & old_pool = pools[job->pool_id];
Pool & new_pool = pools[new_pool_id];
if (old_pool.priority >= new_pool.priority)
if (old_pool.priority <= new_pool.priority)
return; // Never lower priority or change pool leaving the same priority
// Update priority and push job forward through ready queue if needed
@ -590,7 +590,7 @@ void AsyncLoader::prioritize(const LoadJobPtr & job, size_t new_pool_id, std::un
spawn(new_pool, lock);
}
// Set user-facing pool and priority (may affect executing jobs)
// Set user-facing pool (may affect executing jobs)
job->pool_id.store(new_pool_id);
// Recurse into dependencies
@ -621,7 +621,7 @@ bool AsyncLoader::canSpawnWorker(Pool & pool, std::unique_lock<std::mutex> &)
return is_running
&& !pool.ready_queue.empty()
&& pool.workers < pool.max_threads
&& (!current_priority || *current_priority <= pool.priority);
&& (!current_priority || *current_priority >= pool.priority);
}
bool AsyncLoader::canWorkerLive(Pool & pool, std::unique_lock<std::mutex> &)
@ -629,17 +629,17 @@ bool AsyncLoader::canWorkerLive(Pool & pool, std::unique_lock<std::mutex> &)
return is_running
&& !pool.ready_queue.empty()
&& pool.workers <= pool.max_threads
&& (!current_priority || *current_priority <= pool.priority);
&& (!current_priority || *current_priority >= pool.priority);
}
void AsyncLoader::updateCurrentPriorityAndSpawn(std::unique_lock<std::mutex> & lock)
{
// Find current priority.
// NOTE: We assume low number of pools, so O(N) scans are fine.
std::optional<ssize_t> priority;
std::optional<Priority> priority;
for (Pool & pool : pools)
{
if (pool.isActive() && (!priority || *priority < pool.priority))
if (pool.isActive() && (!priority || *priority > pool.priority))
priority = pool.priority;
}
current_priority = priority;
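The flipped comparisons above follow from `Priority` meaning "lower value is higher priority". A minimal sketch of that convention, assuming `Common/Priority.h` is essentially an ordered wrapper around an `Int64` (the exact definition and the helper function below are assumptions, not the real code):

```cpp
#include <compare>
#include <cstdint>
#include <optional>

// Assumed shape of Common/Priority.h: an ordered wrapper around an Int64 where a
// LOWER value means a HIGHER priority.
struct Priority
{
    int64_t value = 0;
    auto operator<=>(const Priority &) const = default;
};

// Mirrors the canSpawnWorker()/canWorkerLive() checks after this change: a pool may
// run work only if no strictly higher-priority (lower-valued) pool is currently active.
bool mayRunInPool(std::optional<Priority> current_priority, Priority pool_priority)
{
    return !current_priority || *current_priority >= pool_priority;
}

// With FgPool{0} active, BgPool{1} must wait:
//   mayRunInPool(Priority{0}, Priority{1}) == false
//   mayRunInPool(Priority{0}, Priority{0}) == true
static_assert(Priority{0} < Priority{1}, "lower value sorts first, i.e. higher priority");
```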

View File

@ -11,6 +11,7 @@
#include <boost/noncopyable.hpp>
#include <base/types.h>
#include <Common/CurrentMetrics.h>
#include <Common/Priority.h>
#include <Common/Stopwatch.h>
#include <Common/ThreadPool_fwd.h>
@ -268,10 +269,10 @@ inline LoadTaskPtrs joinTasks(const LoadTaskPtrs & tasks1, const LoadTaskPtrs &
// `AsyncLoader` is a scheduler for DAG of `LoadJob`s. It tracks job dependencies and priorities.
// Basic usage example:
// // Start async_loader with two thread pools (0=bg, 1=fg):
// // Start async_loader with two thread pools (0=fg, 1=bg):
// AsyncLoader async_loader({
// {"BgPool", CurrentMetrics::AsyncLoaderThreads, CurrentMetrics::AsyncLoaderThreadsActive, .max_threads = 1, .priority = 0}
// {"FgPool", CurrentMetrics::AsyncLoaderThreads, CurrentMetrics::AsyncLoaderThreadsActive, .max_threads = 2, .priority = 1}
// {"FgPool", CurrentMetrics::AsyncLoaderThreads, CurrentMetrics::AsyncLoaderThreadsActive, .max_threads = 2, .priority{0}}
// {"BgPool", CurrentMetrics::AsyncLoaderThreads, CurrentMetrics::AsyncLoaderThreadsActive, .max_threads = 1, .priority{1}}
// });
//
// // Create and schedule a task consisting of three jobs. Job1 has no dependencies and is run first.
@ -279,19 +280,19 @@ inline LoadTaskPtrs joinTasks(const LoadTaskPtrs & tasks1, const LoadTaskPtrs &
// auto job_func = [&] (const LoadJobPtr & self) {
// LOG_TRACE(log, "Executing load job '{}' in pool '{}'", self->name, async_loader->getPoolName(self->pool()));
// };
// auto job1 = makeLoadJob({}, "job1", /* pool_id = */ 0, job_func);
// auto job2 = makeLoadJob({ job1 }, "job2", /* pool_id = */ 0, job_func);
// auto job3 = makeLoadJob({ job1 }, "job3", /* pool_id = */ 0, job_func);
// auto job1 = makeLoadJob({}, "job1", /* pool_id = */ 1, job_func);
// auto job2 = makeLoadJob({ job1 }, "job2", /* pool_id = */ 1, job_func);
// auto job3 = makeLoadJob({ job1 }, "job3", /* pool_id = */ 1, job_func);
// auto task = makeLoadTask(async_loader, { job1, job2, job3 });
// task.schedule();
//
// // Another thread may prioritize a job by changing its pool and wait for it:
// async_loader->prioritize(job3, /* pool_id = */ 1); // higher priority jobs are run first, default priority is zero.
// job3->wait(); // blocks until job completion or cancellation and rethrow an exception (if any)
// async_loader->prioritize(job3, /* pool_id = */ 0); // Increase priority: 1 -> 0 (lower is better)
// job3->wait(); // Blocks until job completion or cancellation and rethrow an exception (if any)
//
// Every job has a pool associated with it. AsyncLoader starts every job in its thread pool.
// Each pool has a constant priority and a mutable maximum number of threads.
// Higher priority (greater `pool.priority` value) jobs are run first.
// Higher priority (lower `pool.priority` value) jobs are run first.
// No job with lower priority is started while there is at least one higher priority job ready or running.
//
// Job priority can be elevated (but cannot be lowered)
@ -301,7 +302,8 @@ inline LoadTaskPtrs joinTasks(const LoadTaskPtrs & tasks1, const LoadTaskPtrs &
// this also leads to a priority inheritance for all the dependencies.
// Value stored in load job `pool_id` field is atomic and can be changed even during job execution.
// Job is, of course, not moved from its initial thread pool, but it should use `self->pool()` for
// all new jobs it create to avoid priority inversion.
// all new jobs it creates to avoid priority inversion. To obtain the pool in which the job is being executed,
// call `self->execution_pool()` instead.
//
// === IMPLEMENTATION DETAILS ===
// All possible states and statuses of a job:
@ -335,7 +337,7 @@ private:
struct Pool
{
const String name;
const ssize_t priority;
const Priority priority;
std::unique_ptr<ThreadPool> thread_pool; // NOTE: we avoid using a `ThreadPool` queue to be able to move jobs between pools.
std::map<UInt64, LoadJobPtr> ready_queue; // FIFO queue of jobs to be executed in this pool. Map is used for faster erasing. Key is `ready_seqno`
size_t max_threads; // Max number of workers to be spawn
@ -367,7 +369,7 @@ public:
Metric metric_threads;
Metric metric_active_threads;
size_t max_threads;
ssize_t priority;
Priority priority;
};
AsyncLoader(std::vector<PoolInitializer> pool_initializers, bool log_failures_, bool log_progress_);
@ -412,7 +414,7 @@ public:
size_t getMaxThreads(size_t pool) const;
const String & getPoolName(size_t pool) const;
ssize_t getPoolPriority(size_t pool) const;
Priority getPoolPriority(size_t pool) const;
size_t getScheduledJobCount() const;
@ -451,7 +453,7 @@ private:
mutable std::mutex mutex; // Guards all the fields below.
bool is_running = true;
std::optional<ssize_t> current_priority; // highest priority among active pools
std::optional<Priority> current_priority; // highest priority among active pools
UInt64 last_ready_seqno = 0; // Increasing counter for ready queue keys.
std::unordered_map<LoadJobPtr, Info> scheduled_jobs; // Full set of scheduled pending jobs along with scheduling info.
std::vector<Pool> pools; // Thread pools for job execution and ready queues

View File

@ -366,7 +366,7 @@ The server successfully detected this situation and will download merged part fr
M(WriteBufferFromS3Microseconds, "Time spent on writing to S3.") \
M(WriteBufferFromS3Bytes, "Bytes written to S3.") \
M(WriteBufferFromS3RequestsErrors, "Number of exceptions while writing to S3.") \
\
M(WriteBufferFromS3WaitInflightLimitMicroseconds, "Time spent waiting while some of the current requests finish when their number reaches the limit defined by s3_max_inflight_parts_for_one_file.") \
M(QueryMemoryLimitExceeded, "Number of times when memory limit exceeded for query.") \
\
M(CachedReadBufferReadFromSourceMicroseconds, "Time reading from filesystem cache source (from remote filesystem, etc)") \

View File

@ -92,7 +92,7 @@ public:
String getName() const override { return LogElement::name(); }
static const char * getDefaultOrderBy() { return "(event_date, event_time)"; }
static const char * getDefaultOrderBy() { return "event_date, event_time"; }
protected:
Poco::Logger * log;

View File

@ -32,7 +32,7 @@ namespace DB::ErrorCodes
struct Initializer {
size_t max_threads = 1;
ssize_t priority = 0;
Priority priority;
};
struct AsyncLoaderTest
@ -144,11 +144,11 @@ struct AsyncLoaderTest
TEST(AsyncLoader, Smoke)
{
AsyncLoaderTest t({
{.max_threads = 2, .priority = 0},
{.max_threads = 2, .priority = -1},
{.max_threads = 2, .priority = Priority{0}},
{.max_threads = 2, .priority = Priority{1}},
});
static constexpr ssize_t low_priority_pool = 1;
static constexpr size_t low_priority_pool = 1;
std::atomic<size_t> jobs_done{0};
std::atomic<size_t> low_priority_jobs_done{0};
@ -419,6 +419,8 @@ TEST(AsyncLoader, CancelExecutingTask)
}
}
// This test is disabled due to `MemorySanitizer: use-of-uninitialized-value` issue in `collectSymbolsFromProgramHeaders` function
// More details: https://github.com/ClickHouse/ClickHouse/pull/48923#issuecomment-1545415482
TEST(AsyncLoader, DISABLED_JobFailure)
{
AsyncLoaderTest t;
@ -595,16 +597,16 @@ TEST(AsyncLoader, TestOverload)
TEST(AsyncLoader, StaticPriorities)
{
AsyncLoaderTest t({
{.max_threads = 1, .priority = 0},
{.max_threads = 1, .priority = 1},
{.max_threads = 1, .priority = 2},
{.max_threads = 1, .priority = 3},
{.max_threads = 1, .priority = 4},
{.max_threads = 1, .priority = 5},
{.max_threads = 1, .priority = 6},
{.max_threads = 1, .priority = 7},
{.max_threads = 1, .priority = 8},
{.max_threads = 1, .priority = 9},
{.max_threads = 1, .priority{0}},
{.max_threads = 1, .priority{-1}},
{.max_threads = 1, .priority{-2}},
{.max_threads = 1, .priority{-3}},
{.max_threads = 1, .priority{-4}},
{.max_threads = 1, .priority{-5}},
{.max_threads = 1, .priority{-6}},
{.max_threads = 1, .priority{-7}},
{.max_threads = 1, .priority{-8}},
{.max_threads = 1, .priority{-9}},
});
std::string schedule;
@ -614,6 +616,15 @@ TEST(AsyncLoader, StaticPriorities)
schedule += fmt::format("{}{}", self->name, self->pool());
};
// Job DAG with priorities. After priority inheritance from H9, jobs D9 and E9 can be
// executed in undefined order (Tested further in DynamicPriorities)
// A0(9) -+-> B3
//        |
//        `-> C4
//        |
//        `-> D1(9) -.
//        |           +-> F0(9) --> G0(9) --> H9
//        `-> E2(9) -'
std::vector<LoadJobPtr> jobs;
jobs.push_back(makeLoadJob({}, 0, "A", job_func)); // 0
jobs.push_back(makeLoadJob({ jobs[0] }, 3, "B", job_func)); // 1
@ -627,16 +638,15 @@ TEST(AsyncLoader, StaticPriorities)
t.loader.start();
t.loader.wait();
ASSERT_EQ(schedule, "A9E9D9F9G9H9C4B3");
ASSERT_TRUE(schedule == "A9E9D9F9G9H9C4B3" || schedule == "A9D9E9F9G9H9C4B3");
}
TEST(AsyncLoader, SimplePrioritization)
{
AsyncLoaderTest t({
{.max_threads = 1, .priority = 0},
{.max_threads = 1, .priority = 1},
{.max_threads = 1, .priority = 2},
{.max_threads = 1, .priority{0}},
{.max_threads = 1, .priority{-1}},
{.max_threads = 1, .priority{-2}},
});
t.loader.start();
@ -674,16 +684,16 @@ TEST(AsyncLoader, SimplePrioritization)
TEST(AsyncLoader, DynamicPriorities)
{
AsyncLoaderTest t({
{.max_threads = 1, .priority = 0},
{.max_threads = 1, .priority = 1},
{.max_threads = 1, .priority = 2},
{.max_threads = 1, .priority = 3},
{.max_threads = 1, .priority = 4},
{.max_threads = 1, .priority = 5},
{.max_threads = 1, .priority = 6},
{.max_threads = 1, .priority = 7},
{.max_threads = 1, .priority = 8},
{.max_threads = 1, .priority = 9},
{.max_threads = 1, .priority{0}},
{.max_threads = 1, .priority{-1}},
{.max_threads = 1, .priority{-2}},
{.max_threads = 1, .priority{-3}},
{.max_threads = 1, .priority{-4}},
{.max_threads = 1, .priority{-5}},
{.max_threads = 1, .priority{-6}},
{.max_threads = 1, .priority{-7}},
{.max_threads = 1, .priority{-8}},
{.max_threads = 1, .priority{-9}},
});
for (bool prioritize : {false, true})
@ -890,8 +900,8 @@ TEST(AsyncLoader, DynamicPools)
const size_t max_threads[] { 2, 10 };
const int jobs_in_chain = 16;
AsyncLoaderTest t({
{.max_threads = max_threads[0], .priority = 0},
{.max_threads = max_threads[1], .priority = 1},
{.max_threads = max_threads[0], .priority{0}},
{.max_threads = max_threads[1], .priority{-1}},
});
t.loader.start();

View File

@ -631,14 +631,12 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
request_for_session->digest = state_machine->getNodesDigest();
using enum KeeperStateMachine::ZooKeeperLogSerializationVersion;
/// older versions of Keeper can send logs that are missing some fields
size_t bytes_missing = 0;
if (serialization_version < WITH_TIME)
if (serialization_version < KeeperStateMachine::ZooKeeperLogSerializationVersion::WITH_TIME)
bytes_missing += sizeof(request_for_session->time);
if (serialization_version < WITH_ZXID_DIGEST)
if (serialization_version < KeeperStateMachine::ZooKeeperLogSerializationVersion::WITH_ZXID_DIGEST)
bytes_missing += sizeof(request_for_session->zxid) + sizeof(request_for_session->digest->version) + sizeof(request_for_session->digest->value);
if (bytes_missing != 0)
@ -652,14 +650,14 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
size_t write_buffer_header_size
= sizeof(request_for_session->zxid) + sizeof(request_for_session->digest->version) + sizeof(request_for_session->digest->value);
if (serialization_version < WITH_TIME)
if (serialization_version < KeeperStateMachine::ZooKeeperLogSerializationVersion::WITH_TIME)
write_buffer_header_size += sizeof(request_for_session->time);
auto * buffer_start = reinterpret_cast<BufferBase::Position>(entry_buf->data_begin() + entry_buf->size() - write_buffer_header_size);
WriteBuffer write_buf(buffer_start, write_buffer_header_size);
if (serialization_version < WITH_TIME)
if (serialization_version < KeeperStateMachine::ZooKeeperLogSerializationVersion::WITH_TIME)
writeIntBinary(request_for_session->time, write_buf);
writeIntBinary(request_for_session->zxid, write_buf);

View File

@ -78,6 +78,7 @@ class IColumn;
M(UInt64, s3_max_upload_part_size, 5ull*1024*1024*1024, "The maximum size of part to upload during multipart upload to S3.", 0) \
M(UInt64, s3_upload_part_size_multiply_factor, 2, "Multiply s3_min_upload_part_size by this factor each time s3_multiply_parts_count_threshold parts were uploaded from a single write to S3.", 0) \
M(UInt64, s3_upload_part_size_multiply_parts_count_threshold, 500, "Each time this number of parts was uploaded to S3 s3_min_upload_part_size multiplied by s3_upload_part_size_multiply_factor.", 0) \
M(UInt64, s3_max_inflight_parts_for_one_file, 20, "The maximum number of concurrently loaded parts in a multipart upload request. 0 means unlimited.", 0) \
M(UInt64, s3_max_single_part_upload_size, 32*1024*1024, "The maximum size of object to upload using singlepart upload to S3.", 0) \
M(UInt64, s3_max_single_read_retries, 4, "The maximum number of retries during single S3 read.", 0) \
M(UInt64, s3_max_unexpected_write_error_retries, 4, "The maximum number of retries in case of unexpected errors during S3 write.", 0) \
@ -93,6 +94,7 @@ class IColumn;
M(Bool, s3_check_objects_after_upload, false, "Check each uploaded object to s3 with head request to be sure that upload was successful", 0) \
M(Bool, s3_allow_parallel_part_upload, true, "Use multiple threads for s3 multipart upload. It may lead to slightly higher memory usage", 0) \
M(Bool, s3_throw_on_zero_files_match, false, "Throw an error, when ListObjects request cannot match any files", 0) \
M(UInt64, s3_retry_attempts, 10, "Setting for Aws::Client::RetryStrategy, Aws::Client does retries itself, 0 means no retries", 0) \
M(Bool, enable_s3_requests_logging, false, "Enable very explicit logging of S3 requests. Makes sense for debug only.", 0) \
M(UInt64, hdfs_replication, 0, "The actual number of replications can be specified when the hdfs file is created.", 0) \
M(Bool, hdfs_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables", 0) \

View File

@ -188,12 +188,12 @@ try
try
{
file->write(payload.data(), payload.size());
file->finalize();
}
catch (...)
{
/// Log current exception, because finalize() can throw a different exception.
tryLogCurrentException(__PRETTY_FUNCTION__);
file->finalize();
throw;
}
}

View File

@ -146,7 +146,8 @@ std::unique_ptr<S3::Client> getClient(
S3::ServerSideEncryptionKMSConfig sse_kms_config = S3::getSSEKMSConfig(config_prefix, config);
client_configuration.retryStrategy
= std::make_shared<Aws::Client::DefaultRetryStrategy>(config.getUInt(config_prefix + ".retry_attempts", 10));
= std::make_shared<Aws::Client::DefaultRetryStrategy>(
config.getUInt64(config_prefix + ".retry_attempts", settings.request_settings.retry_attempts));
return S3::ClientFactory::instance().create(
client_configuration,

View File

@ -2,6 +2,7 @@
#include <Common/ErrorCodes.h>
#include <Common/Exception.h>
#include <Common/Priority.h>
#include <IO/ResourceRequest.h>
#include <Poco/Util/AbstractConfiguration.h>
@ -37,7 +38,7 @@ inline const Poco::Util::AbstractConfiguration & emptyConfig()
struct SchedulerNodeInfo
{
double weight = 1.0; /// Weight of this node among it's siblings
Int64 priority = 0; /// Priority of this node among it's siblings (higher value means higher priority)
Priority priority; /// Priority of this node among it's siblings (lower value means higher priority)
/// Arbitrary data accessed/stored by parent
union {
@ -65,7 +66,7 @@ struct SchedulerNodeInfo
void setPriority(Int64 value)
{
priority = value;
priority.value = value;
}
};

View File

@ -26,12 +26,12 @@ class PriorityPolicy : public ISchedulerNode
struct Item
{
ISchedulerNode * child = nullptr;
Int64 priority = 0; // higher value means higher priority
Priority priority; // lower value means higher priority
/// For max-heap by priority
bool operator<(const Item& rhs) const noexcept
{
return priority < rhs.priority;
return priority > rhs.priority; // Reversed for heap top to yield highest priority (lowest value) child first
}
};
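Reversing `operator<` makes the max-heap pop the child with the smallest `priority` value (i.e. the highest priority) first. A self-contained illustration of the same trick with `std::priority_queue` (illustrative types, not the scheduler's):

```cpp
#include <cstdint>
#include <iostream>
#include <queue>
#include <string>

struct Item
{
    std::string name;
    int64_t priority; // lower value means higher priority

    // Reversed comparison: the "largest" element of the max-heap is the one with the
    // smallest priority value, so it is popped first.
    bool operator<(const Item & rhs) const noexcept { return priority > rhs.priority; }
};

int main()
{
    std::priority_queue<Item> heap;
    heap.push({"background", 1});
    heap.push({"foreground", 0});
    heap.push({"batch", 2});

    while (!heap.empty())
    {
        std::cout << heap.top().name << '\n'; // prints foreground, background, batch
        heap.pop();
    }
}
```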

View File

@ -22,9 +22,9 @@ TEST(IOResourcePriorityPolicy, Priorities)
ResourceTest t;
t.add<PriorityPolicy>("/");
t.add<FifoQueue>("/A", "<priority>1</priority>");
t.add<FifoQueue>("/A", "<priority>3</priority>");
t.add<FifoQueue>("/B", "<priority>2</priority>");
t.add<FifoQueue>("/C", "<priority>3</priority>");
t.add<FifoQueue>("/C", "<priority>1</priority>");
t.enqueue("/A", {10, 10, 10});
t.enqueue("/B", {10, 10, 10});
@ -56,9 +56,9 @@ TEST(IOResourcePriorityPolicy, Activation)
ResourceTest t;
t.add<PriorityPolicy>("/");
t.add<FifoQueue>("/A", "<priority>1</priority>");
t.add<FifoQueue>("/A", "<priority>3</priority>");
t.add<FifoQueue>("/B", "<priority>2</priority>");
t.add<FifoQueue>("/C", "<priority>3</priority>");
t.add<FifoQueue>("/C", "<priority>1</priority>");
t.enqueue("/A", {10, 10, 10, 10, 10, 10});
t.enqueue("/B", {10});

View File

@ -49,7 +49,7 @@ TEST(IOResourceStaticResourceManager, Prioritization)
{
// Lock is not required here because this is called during request execution and we have max_requests = 1
if (last_priority)
EXPECT_TRUE(priority <= *last_priority); // Should be true if every queue arrived at the same time at busy period start
EXPECT_TRUE(priority >= *last_priority); // Should be true if every queue arrived at the same time at busy period start
last_priority = priority;
};
@ -63,8 +63,8 @@ TEST(IOResourceStaticResourceManager, Prioritization)
<res1>
<node path="/"> <type>inflight_limit</type><max_requests>1</max_requests></node>
<node path="/prio"> <type>priority</type></node>
<node path="/prio/A"> <priority>-1</priority></node>
<node path="/prio/B"> <priority>1</priority></node>
<node path="/prio/A"> <priority>1</priority></node>
<node path="/prio/B"> <priority>-1</priority></node>
<node path="/prio/C"> </node>
<node path="/prio/D"> </node>
<node path="/prio/leader"></node>

View File

@ -92,8 +92,11 @@ WriteBufferFromS3::WriteBufferFromS3(
, write_settings(write_settings_)
, client_ptr(std::move(client_ptr_))
, object_metadata(std::move(object_metadata_))
, buffer_allocation_policy(ChooseBufferPolicy(request_settings_.getUploadSettings()))
, task_tracker(std::make_unique<WriteBufferFromS3::TaskTracker>(std::move(schedule_)))
, buffer_allocation_policy(ChooseBufferPolicy(upload_settings))
, task_tracker(
std::make_unique<WriteBufferFromS3::TaskTracker>(
std::move(schedule_),
upload_settings.max_inflight_parts_for_one_file))
{
LOG_TRACE(log, "Create WriteBufferFromS3, {}", getLogDetails());
@ -109,8 +112,11 @@ void WriteBufferFromS3::nextImpl()
ErrorCodes::LOGICAL_ERROR,
"Cannot write to prefinalized buffer for S3, the file could have been created with PutObjectRequest");
/// Make sense to call to before adding new async task to check if there is an exception
task_tracker->waitReady();
/// It makes sense to call waitIfAny before adding a new async task, to check if there is an exception
/// The faster the exception is propagated, the less time is spent on cancellation
/// Despite the fact that `task_tracker->add()` collects task statuses and propagates their exceptions,
/// that call is necessary for the case when there is no in-flight limitation and therefore `task_tracker->add()` doesn't wait for anything
task_tracker->waitIfAny();
hidePartialData();
@ -134,7 +140,8 @@ void WriteBufferFromS3::preFinalize()
LOG_TRACE(log, "preFinalize WriteBufferFromS3. {}", getLogDetails());
task_tracker->waitReady();
/// This function should not be run again if an exception has occurred
is_prefinalized = true;
hidePartialData();
@ -166,8 +173,6 @@ void WriteBufferFromS3::preFinalize()
{
writeMultipartUpload();
}
is_prefinalized = true;
}
void WriteBufferFromS3::finalizeImpl()
@ -212,8 +217,8 @@ String WriteBufferFromS3::getLogDetails() const
multipart_upload_details = fmt::format(", upload id {}, upload has finished {}"
, multipart_upload_id, multipart_upload_finished);
return fmt::format("Details: bucket {}, key {}, total size {}, count {}, hidden_size {}, offset {}, with pool: {}, finalized {}{}",
bucket, key, total_size, count(), hidden_size, offset(), task_tracker->isAsync(), finalized, multipart_upload_details);
return fmt::format("Details: bucket {}, key {}, total size {}, count {}, hidden_size {}, offset {}, with pool: {}, prefinalized {}, finalized {}{}",
bucket, key, total_size, count(), hidden_size, offset(), task_tracker->isAsync(), is_prefinalized, finalized, multipart_upload_details);
}
void WriteBufferFromS3::tryToAbortMultipartUpload()
@ -234,7 +239,7 @@ WriteBufferFromS3::~WriteBufferFromS3()
{
LOG_TRACE(log, "Close WriteBufferFromS3. {}.", getLogDetails());
// That descructor could be call with finalized=false in case of exceptions
// That destructor could be called with finalized=false in case of exceptions
if (!finalized)
{
LOG_ERROR(log, "WriteBufferFromS3 is not finalized in destructor. It could be if an exception occurs. File is not written to S3. {}.", getLogDetails());

View File

@ -4,12 +4,18 @@
#include <IO/WriteBufferFromS3TaskTracker.h>
namespace ProfileEvents
{
extern const Event WriteBufferFromS3WaitInflightLimitMicroseconds;
}
namespace DB
{
WriteBufferFromS3::TaskTracker::TaskTracker(ThreadPoolCallbackRunner<void> scheduler_)
WriteBufferFromS3::TaskTracker::TaskTracker(ThreadPoolCallbackRunner<void> scheduler_, size_t max_tasks_inflight_)
: is_async(bool(scheduler_))
, scheduler(scheduler_ ? std::move(scheduler_) : syncRunner())
, max_tasks_inflight(max_tasks_inflight_)
{}
WriteBufferFromS3::TaskTracker::~TaskTracker()
@ -28,36 +34,6 @@ ThreadPoolCallbackRunner<void> WriteBufferFromS3::TaskTracker::syncRunner()
};
}
void WriteBufferFromS3::TaskTracker::waitReady()
{
LOG_TEST(log, "waitReady, in queue {}", futures.size());
/// Exceptions are propagated
auto it = futures.begin();
while (it != futures.end())
{
chassert(it->valid());
if (it->wait_for(std::chrono::seconds(0)) != std::future_status::ready)
{
++it;
continue;
}
try
{
it->get();
} catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
throw;
}
it = futures.erase(it);
}
LOG_TEST(log, "waitReady ended, in queue {}", futures.size());
}
void WriteBufferFromS3::TaskTracker::waitAll()
{
LOG_TEST(log, "waitAll, in queue {}", futures.size());
@ -65,66 +41,145 @@ void WriteBufferFromS3::TaskTracker::waitAll()
/// Exceptions are propagated
for (auto & future : futures)
{
try
{
future.get();
} catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
throw;
}
future.get();
}
futures.clear();
std::lock_guard lock(mutex);
finished_futures.clear();
}
void WriteBufferFromS3::TaskTracker::safeWaitAll()
{
LOG_TEST(log, "safeWaitAll, wait in queue {}", futures.size());
/// Exceptions are not propagated
for (auto & future : futures)
{
LOG_TEST(log, "safeWaitAll, wait future");
if (future.valid())
future.wait();
}
LOG_TEST(log, "safeWaitAll, get in queue {}", futures.size());
for (auto & future : futures)
{
if (future.valid())
{
try
{
/// Exceptions are not propagated
future.get();
} catch (...)
{
/// But at least they are printed
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
futures.clear();
LOG_TEST(log, "safeWaitAll ended, get in queue {}", futures.size());
std::lock_guard lock(mutex);
finished_futures.clear();
}
void WriteBufferFromS3::TaskTracker::waitIfAny()
{
LOG_TEST(log, "waitIfAny, in queue {}", futures.size());
if (futures.empty())
return;
Stopwatch watch;
{
std::lock_guard lock(mutex);
for (auto & it : finished_futures)
{
/// Actually that call might block this thread until the future is finally set
/// However that won't block us for long: the task is about to finish when its pointer appears in `finished_futures`
it->get();
/// In case of an exception in `it->get()`
/// it is not necessary to remove `it` from the list `futures`.
/// `TaskTracker` has to be destroyed after any exception occurs; for this `safeWaitAll` is called.
/// `safeWaitAll` handles invalid futures in the list `futures`
futures.erase(it);
}
finished_futures.clear();
}
watch.stop();
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3WaitInflightLimitMicroseconds, watch.elapsedMicroseconds());
LOG_TEST(log, "waitIfAny ended, in queue {}", futures.size());
}
void WriteBufferFromS3::TaskTracker::add(Callback && func)
{
LOG_TEST(log, "add, in queue {}", futures.size());
/// All this fuss is about two things. This is the most critical place of TaskTracker.
/// The first is not to fail insertion in the list `futures`.
/// To handle it, the element is allocated at the end of the list `futures` in advance.
/// The second is not to fail the notification of the task.
/// To handle it, the list element which will be inserted into the list `finished_futures`
/// is allocated in advance as another list `pre_allocated_finished` with one element inside.
auto future = scheduler(std::move(func), Priority{});
auto exit_scope = scope_guard(
[&future]()
/// preallocation for the first issue
futures.emplace_back();
auto future_placeholder = std::prev(futures.end());
/// preallocation for the second issue
FinishedList pre_allocated_finished {future_placeholder};
Callback func_with_notification = [&, func=std::move(func), pre_allocated_finished=std::move(pre_allocated_finished)] () mutable
{
SCOPE_EXIT({
DENY_ALLOCATIONS_IN_SCOPE;
std::lock_guard lock(mutex);
finished_futures.splice(finished_futures.end(), pre_allocated_finished);
has_finished.notify_one();
});
func();
};
/// this move is nothrow
*future_placeholder = scheduler(std::move(func_with_notification), Priority{});
LOG_TEST(log, "add ended, in queue {}, limit {}", futures.size(), max_tasks_inflight);
waitTilInflightShrink();
}
void WriteBufferFromS3::TaskTracker::waitTilInflightShrink()
{
if (!max_tasks_inflight)
return;
LOG_TEST(log, "waitTilInflightShrink, in queue {}", futures.size());
Stopwatch watch;
/// An alternative approach is to wait until at least futures.size() - max_tasks_inflight elements are finished
/// However, the faster a finished task is collected, the faster CH checks if there is an exception
/// The faster an exception is propagated, the less time is spent on cancellation
while (futures.size() >= max_tasks_inflight)
{
std::unique_lock lock(mutex);
has_finished.wait(lock, [this] () TSA_REQUIRES(mutex) { return !finished_futures.empty(); });
for (auto & it : finished_futures)
{
future.wait();
SCOPE_EXIT({
/// According to basic exception safety, TaskTracker has to be destroyed after an exception
/// If that were true, this SCOPE_EXIT would be superfluous
/// However, WriteBufferWithFinalizeCallback and WriteBufferFromFileDecorator do call finalize in the d-tor
/// TaskTracker has to cope with this until the issue with finalizing in the d-tor is addressed in #50274
futures.erase(it);
});
it->get();
}
);
futures.push_back(std::move(future));
finished_futures.clear();
}
exit_scope.release();
LOG_TEST(log, "add ended, in queue {}", futures.size());
watch.stop();
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3WaitInflightLimitMicroseconds, watch.elapsedMicroseconds());
LOG_TEST(log, "waitTilInflightShrink ended, in queue {}", futures.size());
}
bool WriteBufferFromS3::TaskTracker::isAsync() const
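The core of the new `add()`/`waitTilInflightShrink()` pair is that both the future's slot in `futures` and the node later spliced into `finished_futures` are allocated up front, so the completion hook only relinks a pre-built node under the mutex and notifies; it never allocates. Below is a stripped-down, standalone sketch of that pattern (a hypothetical `MiniTracker`, using `std::async` in place of the thread-pool scheduler; it is not the real `TaskTracker`):

```cpp
#include <condition_variable>
#include <cstddef>
#include <exception>
#include <functional>
#include <future>
#include <list>
#include <mutex>

// Pre-allocate the future slot and the "finished" node, then splice + notify from
// the completion hook without allocating. Call waitAll() before destruction,
// because running tasks capture `this`.
class MiniTracker
{
    using FutureIt = std::list<std::future<void>>::iterator;

public:
    explicit MiniTracker(std::size_t max_inflight_) : max_inflight(max_inflight_) {}

    void add(std::function<void()> func)
    {
        futures.emplace_back();                         // pre-allocated future slot
        auto placeholder = std::prev(futures.end());
        std::list<FutureIt> pre_allocated{placeholder}; // pre-allocated "finished" node

        auto wrapped = [this, func = std::move(func), pre = std::move(pre_allocated)]() mutable
        {
            std::exception_ptr error;
            try { func(); }
            catch (...) { error = std::current_exception(); }
            {
                std::lock_guard lock(mutex);
                finished.splice(finished.end(), pre);   // nothrow: just relinks the node
                has_finished.notify_one();
            }
            if (error)
                std::rethrow_exception(error);          // stored in the future, seen by get()
        };

        *placeholder = std::async(std::launch::async, std::move(wrapped));
        waitIfTooMany();
    }

    void waitAll()
    {
        for (auto & f : futures)
            if (f.valid())
                f.get();
        futures.clear();
        std::lock_guard lock(mutex);
        finished.clear();
    }

private:
    void waitIfTooMany()
    {
        if (max_inflight == 0)
            return;                                     // 0 means unlimited
        while (futures.size() >= max_inflight)
        {
            std::unique_lock lock(mutex);
            has_finished.wait(lock, [this] { return !finished.empty(); });
            for (FutureIt it : finished)
            {
                it->get();                              // propagates the task's exception
                futures.erase(it);
            }
            finished.clear();
        }
    }

    const std::size_t max_inflight;
    std::list<std::future<void>> futures;               // owner-thread only
    std::mutex mutex;
    std::condition_variable has_finished;
    std::list<FutureIt> finished;                       // guarded by mutex
};
```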

View File

@ -6,36 +6,61 @@
#include "WriteBufferFromS3.h"
#include <list>
namespace DB
{
/// That class is used only in WriteBufferFromS3 for now.
/// Therefore it is declared as a part of WriteBufferFromS3.
/// TaskTracker takes a Callback which is run by scheduler in some external shared ThreadPool.
/// TaskTracker brings the methods waitReady, waitAll/safeWaitAll
/// TaskTracker brings the methods waitIfAny, waitAll/safeWaitAll
/// to help with coordination of the running tasks.
/// Basic exception safety is provided. If an exception occurs, the object has to be destroyed.
/// No thread safety is provided. Use this object with no concurrency.
class WriteBufferFromS3::TaskTracker
{
public:
using Callback = std::function<void()>;
explicit TaskTracker(ThreadPoolCallbackRunner<void> scheduler_);
TaskTracker(ThreadPoolCallbackRunner<void> scheduler_, size_t max_tasks_inflight_);
~TaskTracker();
static ThreadPoolCallbackRunner<void> syncRunner();
bool isAsync() const;
void waitReady();
/// waitIfAny collects statuses from already finished tasks
/// There could be no finished tasks yet, in which case waitIfAny does nothing useful
/// The first exception is thrown if any task has failed
void waitIfAny();
/// waitAll waits for all the tasks until they finish and collects their statuses
void waitAll();
/// safeWaitAll does the same as waitAll but mutes the exceptions
void safeWaitAll();
void add(Callback && func);
private:
bool is_async;
/// waitTilInflightShrink waits until the number of in-flight tasks shrinks below the limit `max_tasks_inflight`.
void waitTilInflightShrink() TSA_NO_THREAD_SAFETY_ANALYSIS;
const bool is_async;
ThreadPoolCallbackRunner<void> scheduler;
std::list<std::future<void>> futures;
const size_t max_tasks_inflight;
using FutureList = std::list<std::future<void>>;
FutureList futures;
Poco::Logger * log = &Poco::Logger::get("TaskTracker");
std::mutex mutex;
std::condition_variable has_finished TSA_GUARDED_BY(mutex);
using FinishedList = std::list<FutureList::iterator>;
FinishedList finished_futures TSA_GUARDED_BY(mutex);
};
}

View File

@ -45,7 +45,7 @@ public:
using SystemLog<AsynchronousInsertLogElement>::SystemLog;
/// This table is usually queried for fixed table name.
static const char * getDefaultOrderBy() { return "(database, table, event_date, event_time)"; }
static const char * getDefaultOrderBy() { return "database, table, event_date, event_time"; }
};
}

View File

@ -49,7 +49,7 @@ public:
void addValues(const AsynchronousMetricValues &);
/// This table is usually queried for fixed metric name.
static const char * getDefaultOrderBy() { return "(metric, event_date, event_time)"; }
static const char * getDefaultOrderBy() { return "metric, event_date, event_time"; }
};
}

View File

@ -143,28 +143,58 @@ std::shared_ptr<TSystemLog> createSystemLog(
"If 'engine' is specified for system table, PARTITION BY parameters should "
"be specified directly inside 'engine' and 'partition_by' setting doesn't make sense");
if (config.has(config_prefix + ".ttl"))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "If 'engine' is specified for system table, "
"TTL parameters should be specified directly inside 'engine' and 'ttl' setting doesn't make sense");
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"If 'engine' is specified for system table, TTL parameters should "
"be specified directly inside 'engine' and 'ttl' setting doesn't make sense");
if (config.has(config_prefix + ".order_by"))
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"If 'engine' is specified for system table, ORDER BY parameters should "
"be specified directly inside 'engine' and 'order_by' setting doesn't make sense");
if (config.has(config_prefix + ".storage_policy"))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "If 'engine' is specified for system table, SETTINGS storage_policy = '...' "
"should be specified directly inside 'engine' and 'storage_policy' setting doesn't make sense");
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"If 'engine' is specified for system table, SETTINGS storage_policy = '...' should "
"be specified directly inside 'engine' and 'storage_policy' setting doesn't make sense");
if (config.has(config_prefix + ".settings"))
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"If 'engine' is specified for system table, SETTINGS parameters should "
"be specified directly inside 'engine' and 'settings' setting doesn't make sense");
engine = config.getString(config_prefix + ".engine");
}
else
{
String partition_by = config.getString(config_prefix + ".partition_by", "toYYYYMM(event_date)");
/// ENGINE expr is necessary.
engine = "ENGINE = MergeTree";
/// PARTITION expr is not necessary.
String partition_by = config.getString(config_prefix + ".partition_by", "toYYYYMM(event_date)");
if (!partition_by.empty())
engine += " PARTITION BY (" + partition_by + ")";
/// TTL expr is not necessary.
String ttl = config.getString(config_prefix + ".ttl", "");
if (!ttl.empty())
engine += " TTL " + ttl;
engine += " ORDER BY ";
engine += TSystemLog::getDefaultOrderBy();
/// ORDER BY expr is necessary.
String order_by = config.getString(config_prefix + ".order_by", TSystemLog::getDefaultOrderBy());
engine += " ORDER BY (" + order_by + ")";
/// SETTINGS expr is not necessary.
/// https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/mergetree#settings
///
/// STORAGE POLICY expr is retained for backward compatibility.
String storage_policy = config.getString(config_prefix + ".storage_policy", "");
if (!storage_policy.empty())
engine += " SETTINGS storage_policy = " + quoteString(storage_policy);
String settings = config.getString(config_prefix + ".settings", "");
if (!storage_policy.empty() || !settings.empty())
{
engine += " SETTINGS";
/// If 'storage_policy' is specified in both places, the value from 'settings' takes precedence.
if (!storage_policy.empty())
engine += " storage_policy = " + quoteString(storage_policy);
if (!settings.empty())
engine += (storage_policy.empty() ? " " : ", ") + settings;
}
}
/// Validate engine definition syntax to prevent some configuration errors.
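For illustration, the stand-alone C++ sketch below applies the same concatenation rules to hypothetical config values (mirroring the query_log test config further down) and prints the resulting ENGINE clause; it is a simplified model of the branch above, not the ClickHouse code itself.

#include <iostream>
#include <string>

int main()
{
    /// Hypothetical values of <partition_by>, <ttl>, <order_by>, <storage_policy> and <settings>.
    std::string partition_by = "toYYYYMM(event_date)";
    std::string ttl = "event_date + INTERVAL 30 DAY DELETE";
    std::string order_by = "event_date, event_time, initial_query_id";
    std::string storage_policy = "policy1";
    std::string settings = "storage_policy='policy2', ttl_only_drop_parts=1";

    std::string engine = "ENGINE = MergeTree";
    if (!partition_by.empty())
        engine += " PARTITION BY (" + partition_by + ")";
    if (!ttl.empty())
        engine += " TTL " + ttl;
    engine += " ORDER BY (" + order_by + ")";
    if (!storage_policy.empty() || !settings.empty())
    {
        engine += " SETTINGS";
        if (!storage_policy.empty())
            engine += " storage_policy = '" + storage_policy + "'";
        if (!settings.empty())
            engine += (storage_policy.empty() ? " " : ", ") + settings;
    }

    /// Prints (wrapped here for readability):
    /// ENGINE = MergeTree PARTITION BY (toYYYYMM(event_date)) TTL event_date + INTERVAL 30 DAY DELETE
    /// ORDER BY (event_date, event_time, initial_query_id)
    /// SETTINGS storage_policy = 'policy1', storage_policy='policy2', ttl_only_drop_parts=1
    std::cout << engine << '\n';
}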

View File

@ -1192,13 +1192,14 @@ void Planner::buildPlanForQueryNode()
const auto & settings = query_context->getSettingsRef();
if (planner_context->getTableExpressionNodeToData().size() > 1
&& (!settings.parallel_replicas_custom_key.value.empty() || settings.allow_experimental_parallel_reading_from_replicas > 0))
/// Check support for JOIN for parallel replicas with custom key
if (planner_context->getTableExpressionNodeToData().size() > 1)
{
if (settings.allow_experimental_parallel_reading_from_replicas == 1)
if (settings.allow_experimental_parallel_reading_from_replicas == 1 || !settings.parallel_replicas_custom_key.value.empty())
{
LOG_WARNING(
&Poco::Logger::get("Planner"), "JOINs are not supported with parallel replicas. Query will be executed without using them.");
LOG_WARNING(
&Poco::Logger::get("Planner"),
"JOINs are not supported with parallel replicas. Query will be executed without using them.");
auto & mutable_context = planner_context->getMutableQueryContext();
mutable_context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));

View File

@ -76,7 +76,7 @@ public:
bool checkEndOfRow();
bool checkForSuffixImpl(bool check_eof);
inline void skipSpaces() { if (ignore_spaces) skipWhitespaceIfAny(*buf); }
inline void skipSpaces() { if (ignore_spaces) skipWhitespaceIfAny(*buf, true); }
EscapingRule getEscapingRule() const override { return format_settings.custom.escaping_rule; }

View File

@ -1263,6 +1263,11 @@ void StorageS3::Configuration::connect(ContextPtr context)
if (!headers_from_ast.empty())
headers.insert(headers.end(), headers_from_ast.begin(), headers_from_ast.end());
client_configuration.requestTimeoutMs = request_settings.request_timeout_ms;
client_configuration.retryStrategy
= std::make_shared<Aws::Client::DefaultRetryStrategy>(request_settings.retry_attempts);
auto credentials = Aws::Auth::AWSCredentials(auth_settings.access_key_id, auth_settings.secret_access_key);
client = S3::ClientFactory::instance().create(
client_configuration,
@ -1273,11 +1278,11 @@ void StorageS3::Configuration::connect(ContextPtr context)
auth_settings.server_side_encryption_kms_config,
std::move(headers),
S3::CredentialsConfiguration{
auth_settings.use_environment_credentials.value_or(context->getConfigRef().getBool("s3.use_environment_credentials", true)),
auth_settings.use_insecure_imds_request.value_or(context->getConfigRef().getBool("s3.use_insecure_imds_request", false)),
auth_settings.expiration_window_seconds.value_or(
context->getConfigRef().getUInt64("s3.expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS)),
auth_settings.no_sign_request.value_or(context->getConfigRef().getBool("s3.no_sign_request", false)),
auth_settings.use_environment_credentials.value_or(context->getConfigRef().getBool("s3.use_environment_credentials", true)),
auth_settings.use_insecure_imds_request.value_or(context->getConfigRef().getBool("s3.use_insecure_imds_request", false)),
auth_settings.expiration_window_seconds.value_or(
context->getConfigRef().getUInt64("s3.expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS)),
auth_settings.no_sign_request.value_or(context->getConfigRef().getBool("s3.no_sign_request", false)),
});
}

View File

@ -37,6 +37,7 @@ S3Settings::RequestSettings::PartUploadSettings::PartUploadSettings(
max_upload_part_size = config.getUInt64(key + "max_upload_part_size", max_upload_part_size);
upload_part_size_multiply_factor = config.getUInt64(key + "upload_part_size_multiply_factor", upload_part_size_multiply_factor);
upload_part_size_multiply_parts_count_threshold = config.getUInt64(key + "upload_part_size_multiply_parts_count_threshold", upload_part_size_multiply_parts_count_threshold);
max_inflight_parts_for_one_file = config.getUInt64(key + "max_inflight_parts_for_one_file", max_inflight_parts_for_one_file);
max_part_number = config.getUInt64(key + "max_part_number", max_part_number);
max_single_part_upload_size = config.getUInt64(key + "max_single_part_upload_size", max_single_part_upload_size);
max_single_operation_copy_size = config.getUInt64(key + "max_single_operation_copy_size", max_single_operation_copy_size);
@ -55,6 +56,7 @@ S3Settings::RequestSettings::PartUploadSettings::PartUploadSettings(const NamedC
max_single_part_upload_size = collection.getOrDefault<UInt64>("max_single_part_upload_size", max_single_part_upload_size);
upload_part_size_multiply_factor = collection.getOrDefault<UInt64>("upload_part_size_multiply_factor", upload_part_size_multiply_factor);
upload_part_size_multiply_parts_count_threshold = collection.getOrDefault<UInt64>("upload_part_size_multiply_parts_count_threshold", upload_part_size_multiply_parts_count_threshold);
max_inflight_parts_for_one_file = collection.getOrDefault<UInt64>("max_inflight_parts_for_one_file", max_inflight_parts_for_one_file);
/// This configuration is only applicable to s3. For other types of object storage it is either not applicable or has a different meaning.
storage_class_name = collection.getOrDefault<String>("s3_storage_class", storage_class_name);
@ -80,6 +82,9 @@ void S3Settings::RequestSettings::PartUploadSettings::updateFromSettingsImpl(con
if (!if_changed || settings.s3_upload_part_size_multiply_parts_count_threshold.changed)
upload_part_size_multiply_parts_count_threshold = settings.s3_upload_part_size_multiply_parts_count_threshold;
if (!if_changed || settings.s3_max_inflight_parts_for_one_file.changed)
max_inflight_parts_for_one_file = settings.s3_max_inflight_parts_for_one_file;
if (!if_changed || settings.s3_max_single_part_upload_size.changed)
max_single_part_upload_size = settings.s3_max_single_part_upload_size;
}
@ -193,6 +198,8 @@ S3Settings::RequestSettings::RequestSettings(
check_objects_after_upload = config.getBool(key + "check_objects_after_upload", settings.s3_check_objects_after_upload);
list_object_keys_size = config.getUInt64(key + "list_object_keys_size", settings.s3_list_object_keys_size);
throw_on_zero_files_match = config.getBool(key + "throw_on_zero_files_match", settings.s3_throw_on_zero_files_match);
retry_attempts = config.getUInt64(key + "retry_attempts", settings.s3_retry_attempts);
request_timeout_ms = config.getUInt64(key + "request_timeout_ms", request_timeout_ms);
/// NOTE: it would be better to reuse old throttlers to avoid losing token bucket state on every config reload,
/// which could lead to exceeding the limit for a short time. But it is good enough unless very high `burst` values are used.
@ -243,8 +250,11 @@ void S3Settings::RequestSettings::updateFromSettingsImpl(const Settings & settin
put_request_throttler = std::make_shared<Throttler>(
settings.s3_max_put_rps, settings.s3_max_put_burst ? settings.s3_max_put_burst : Throttler::default_burst_seconds * settings.s3_max_put_rps);
if (!if_changed || settings.s3_throw_on_zero_files_match)
if (!if_changed || settings.s3_throw_on_zero_files_match.changed)
throw_on_zero_files_match = settings.s3_throw_on_zero_files_match;
if (!if_changed || settings.s3_retry_attempts.changed)
retry_attempts = settings.s3_retry_attempts;
}
void S3Settings::RequestSettings::updateFromSettings(const Settings & settings)
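The NOTE above concerns token-bucket state. A small illustrative token-bucket throttler (names and fields are invented; this is not the actual Throttler class) shows why recreating the object on a config reload matters: the bucket starts full, so a fresh instance will briefly admit up to `burst` requests regardless of how depleted the old bucket was.

#include <algorithm>
#include <chrono>
#include <mutex>
#include <thread>

/// Illustrative only: admits `rate` requests per second with bursts of up to `burst`.
class TokenBucket
{
public:
    TokenBucket(double rate_, double burst_)
        : rate(rate_), burst(burst_), tokens(burst_), last(std::chrono::steady_clock::now()) {}

    /// Blocks until one token is available, then consumes it.
    void acquire()
    {
        std::unique_lock lock(mutex);
        for (;;)
        {
            refill();
            if (tokens >= 1.0)
            {
                tokens -= 1.0;
                return;
            }
            lock.unlock();
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
            lock.lock();
        }
    }

private:
    void refill()
    {
        auto now = std::chrono::steady_clock::now();
        tokens = std::min(burst, tokens + std::chrono::duration<double>(now - last).count() * rate);
        last = now;
    }

    const double rate;
    const double burst;
    double tokens;
    std::chrono::steady_clock::time_point last;
    std::mutex mutex;
};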

View File

@ -33,6 +33,7 @@ struct S3Settings
size_t max_upload_part_size = 5ULL * 1024 * 1024 * 1024;
size_t upload_part_size_multiply_factor = 2;
size_t upload_part_size_multiply_parts_count_threshold = 500;
size_t max_inflight_parts_for_one_file = 20;
size_t max_part_number = 10000;
size_t max_single_part_upload_size = 32 * 1024 * 1024;
size_t max_single_operation_copy_size = 5ULL * 1024 * 1024 * 1024;
@ -67,6 +68,8 @@ struct S3Settings
size_t list_object_keys_size = 1000;
ThrottlerPtr get_request_throttler;
ThrottlerPtr put_request_throttler;
size_t retry_attempts = 10;
size_t request_timeout_ms = 30000;
bool throw_on_zero_files_match = false;

View File

@ -27,8 +27,11 @@
<endpoint>http://resolver:8083/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<s3_retry_attempts>1</s3_retry_attempts>
<skip_access_check>true</skip_access_check>
<retry_attempts>0</retry_attempts>
<connect_timeout_ms>20000</connect_timeout_ms>
<request_timeout_ms>20000</request_timeout_ms>
<s3_max_inflight_parts_for_one_file>1</s3_max_inflight_parts_for_one_file>
</broken_s3>
<hdd>
<type>local</type>

View File

@ -0,0 +1,8 @@
<clickhouse>
<profiles>
<default>
<enable_s3_requests_logging>1</enable_s3_requests_logging>
<s3_max_inflight_parts_for_one_file>20</s3_max_inflight_parts_for_one_file>
</default>
</profiles>
</clickhouse>

View File

@ -1,3 +1,11 @@
<clickhouse>
<s3>
<broken_s3>
<endpoint>http://resolver:8083/root/data/</endpoint>
<retry_attempts>0</retry_attempts>
<request_timeout_ms>20000</request_timeout_ms>
</broken_s3>
</s3>
<enable_system_unfreeze>true</enable_system_unfreeze>
</clickhouse>

View File

@ -1,4 +1,8 @@
import logging
import sys
import threading
import random
import time
import urllib.parse
import http.server
import socketserver
@ -9,16 +13,66 @@ UPSTREAM_PORT = 9001
class ServerRuntime:
class SlowPut:
def __init__(
self, probability_=None, timeout_=None, minimal_length_=None, count_=None
):
self.probability = probability_ if probability_ is not None else 1
self.timeout = timeout_ if timeout_ is not None else 0.1
self.minimal_length = minimal_length_ if minimal_length_ is not None else 0
self.count = count_ if count_ is not None else 2**32
def __str__(self):
return (
f"probability:{self.probability}"
f" timeout:{self.timeout}"
f" minimal_length:{self.minimal_length}"
f" count:{self.count}"
)
def get_timeout(self, content_length):
if content_length > self.minimal_length:
if self.count > 0:
if (
runtime.slow_put.probability == 1
or random.random() <= runtime.slow_put.probability
):
self.count -= 1
return runtime.slow_put.timeout
return None
def __init__(self):
self.lock = threading.Lock()
self.error_at_put_when_length_bigger = None
self.fake_put_when_length_bigger = None
self.fake_uploads = dict()
self.slow_put = None
def register_fake_upload(self, upload_id, key):
with self.lock:
self.fake_uploads[upload_id] = key
def is_fake_upload(self, upload_id, key):
with self.lock:
if upload_id in self.fake_uploads:
return self.fake_uploads[upload_id] == key
return False
def reset(self):
self.error_at_put_when_length_bigger = None
self.fake_put_when_length_bigger = None
self.fake_uploads = dict()
self.slow_put = None
runtime = ServerRuntime()
def and_then(value, func):
assert callable(func)
return None if value is None else func(value)
class RequestHandler(http.server.BaseHTTPRequestHandler):
def _ok(self):
self.send_response(200)
@ -55,51 +109,124 @@ class RequestHandler(http.server.BaseHTTPRequestHandler):
self.send_response(500)
self.send_header("Content-Type", "text/xml")
self.end_headers()
self.wfile.write(data)
self.wfile.write(bytes(data, "UTF-8"))
def _fake_put_ok(self):
self._read_out()
self.send_response(200)
self.send_header("Content-Type", "text/xml")
self.send_header("ETag", "b54357faf0632cce46e942fa68356b38")
self.send_header("Content-Length", 0)
self.end_headers()
def _fake_post_ok(self, path):
self._read_out()
parts = [x for x in path.split("/") if x]
bucket = parts[0]
key = "/".join(parts[1:])
location = "http://Example-Bucket.s3.Region.amazonaws.com/" + path
data = (
'<?xml version="1.0" encoding="UTF-8"?>\n'
"<CompleteMultipartUploadResult>\n"
f"<Location>{location}</Location>\n"
f"<Bucket>{bucket}</Bucket>\n"
f"<Key>{key}</Key>\n"
f'<ETag>"3858f62230ac3c915f300c664312c11f-9"</ETag>\n'
f"</CompleteMultipartUploadResult>\n"
)
self.send_response(200)
self.send_header("Content-Type", "text/xml")
self.send_header("Content-Length", len(data))
self.end_headers()
self.wfile.write(bytes(data, "UTF-8"))
def _mock_settings(self):
parts = urllib.parse.urlsplit(self.path)
path = [x for x in parts.path.split("/") if x]
assert path[0] == "mock_settings", path
if len(path) < 2:
return self._error("_mock_settings: wrong command")
if path[1] == "error_at_put":
params = urllib.parse.parse_qs(parts.query, keep_blank_values=False)
runtime.error_at_put_when_length_bigger = int(
params.get("when_length_bigger", [1024 * 1024])[0]
)
self._ok()
elif path[1] == "reset":
return self._ok()
if path[1] == "fake_put":
params = urllib.parse.parse_qs(parts.query, keep_blank_values=False)
runtime.fake_put_when_length_bigger = int(
params.get("when_length_bigger", [1024 * 1024])[0]
)
return self._ok()
if path[1] == "slow_put":
params = urllib.parse.parse_qs(parts.query, keep_blank_values=False)
runtime.slow_put = ServerRuntime.SlowPut(
minimal_length_=and_then(params.get("minimal_length", [None])[0], int),
probability_=and_then(params.get("probability", [None])[0], float),
timeout_=and_then(params.get("timeout", [None])[0], float),
count_=and_then(params.get("count", [None])[0], int),
)
self.log_message("set slow put %s", runtime.slow_put)
return self._ok()
if path[1] == "reset":
runtime.reset()
self._ok()
else:
self._error("_mock_settings: wrong command")
return self._ok()
return self._error("_mock_settings: wrong command")
def do_GET(self):
if self.path == "/":
self._ping()
elif self.path.startswith("/mock_settings"):
self._mock_settings()
else:
self._redirect()
return self._ping()
if self.path.startswith("/mock_settings"):
return self._mock_settings()
return self._redirect()
def do_PUT(self):
content_length = int(self.headers.get("Content-Length", 0))
if runtime.slow_put is not None:
timeout = runtime.slow_put.get_timeout(content_length)
if timeout is not None:
self.log_message("slow put %s", timeout)
time.sleep(timeout)
if runtime.error_at_put_when_length_bigger is not None:
content_length = int(self.headers.get("Content-Length", 0))
if content_length > runtime.error_at_put_when_length_bigger:
self._error(
b'<?xml version="1.0" encoding="UTF-8"?>'
b"<Error>"
b"<Code>ExpectedError</Code>"
b"<Message>mock s3 injected error</Message>"
b"<RequestId>txfbd566d03042474888193-00608d7537</RequestId>"
b"</Error>"
return self._error(
'<?xml version="1.0" encoding="UTF-8"?>'
"<Error>"
"<Code>ExpectedError</Code>"
"<Message>mock s3 injected error</Message>"
"<RequestId>txfbd566d03042474888193-00608d7537</RequestId>"
"</Error>"
)
else:
self._redirect()
else:
self._redirect()
parts = urllib.parse.urlsplit(self.path)
params = urllib.parse.parse_qs(parts.query, keep_blank_values=False)
upload_id = params.get("uploadId", [None])[0]
if runtime.fake_put_when_length_bigger is not None and upload_id is not None:
if content_length > runtime.fake_put_when_length_bigger:
runtime.register_fake_upload(upload_id, parts.path)
return self._fake_put_ok()
return self._redirect()
def do_POST(self):
self._redirect()
parts = urllib.parse.urlsplit(self.path)
params = urllib.parse.parse_qs(parts.query, keep_blank_values=False)
upload_id = params.get("uploadId", [None])[0]
if runtime.is_fake_upload(upload_id, parts.path):
return self._fake_post_ok(parts.path)
return self._redirect()
def do_HEAD(self):
self._redirect()

View File

@ -26,6 +26,9 @@ def cluster():
"configs/config.d/storage_conf.xml",
"configs/config.d/bg_processing_pool_conf.xml",
],
user_configs=[
"configs/config.d/users.xml",
],
stay_alive=True,
with_minio=True,
)
@ -139,14 +142,76 @@ def clear_minio(cluster):
yield
class BrokenS3:
@staticmethod
def reset(cluster):
response = cluster.exec_in_container(
cluster.get_container_id("resolver"),
[
"curl",
"-s",
f"http://localhost:8083/mock_settings/reset",
],
nothrow=True,
)
assert response == "OK"
@staticmethod
def setup_fail_upload(cluster, part_length):
response = cluster.exec_in_container(
cluster.get_container_id("resolver"),
[
"curl",
"-s",
f"http://localhost:8083/mock_settings/error_at_put?when_length_bigger={part_length}",
],
nothrow=True,
)
assert response == "OK"
@staticmethod
def setup_fake_upload(cluster, part_length):
response = cluster.exec_in_container(
cluster.get_container_id("resolver"),
[
"curl",
"-s",
f"http://localhost:8083/mock_settings/fake_put?when_length_bigger={part_length}",
],
nothrow=True,
)
assert response == "OK"
@staticmethod
def setup_slow_answers(
cluster, minimal_length=0, timeout=None, probability=None, count=None
):
url = (
f"http://localhost:8083/"
f"mock_settings/slow_put"
f"?minimal_length={minimal_length}"
)
if timeout is not None:
url += f"&timeout={timeout}"
if probability is not None:
url += f"&probability={probability}"
if count is not None:
url += f"&count={count}"
response = cluster.exec_in_container(
cluster.get_container_id("resolver"),
["curl", "-s", url],
nothrow=True,
)
assert response == "OK"
@pytest.fixture(autouse=True, scope="function")
def reset_mock_broken_s3(cluster):
response = cluster.exec_in_container(
cluster.get_container_id("resolver"),
["curl", "-s", f"http://localhost:8083/mock_settings/reset"],
nothrow=True,
)
assert response == "OK"
def reset_broken_s3(cluster):
BrokenS3.reset(cluster)
yield
@ -886,16 +951,7 @@ def test_merge_canceled_by_s3_errors(cluster, node_name):
min_key = node.query("SELECT min(key) FROM test_merge_canceled_by_s3_errors")
assert int(min_key) == 0, min_key
response = cluster.exec_in_container(
cluster.get_container_id("resolver"),
[
"curl",
"-s",
f"http://localhost:8083/mock_settings/error_at_put?when_length_bigger=50000",
],
nothrow=True,
)
assert response == "OK"
BrokenS3.setup_fail_upload(cluster, 50000)
node.query("SYSTEM START MERGES test_merge_canceled_by_s3_errors")
@ -938,16 +994,7 @@ def test_merge_canceled_by_s3_errors_when_move(cluster, node_name):
settings={"materialize_ttl_after_modify": 0},
)
response = cluster.exec_in_container(
cluster.get_container_id("resolver"),
[
"curl",
"-s",
f"http://localhost:8083/mock_settings/error_at_put?when_length_bigger=10000",
],
nothrow=True,
)
assert response == "OK"
BrokenS3.setup_fail_upload(cluster, 10000)
node.query("SYSTEM START MERGES merge_canceled_by_s3_errors_when_move")
@ -961,3 +1008,95 @@ def test_merge_canceled_by_s3_errors_when_move(cluster, node_name):
check_no_objects_after_drop(
cluster, table_name="merge_canceled_by_s3_errors_when_move", node_name=node_name
)
@pytest.mark.parametrize("node_name", ["node"])
@pytest.mark.parametrize(
"in_flight_memory", [(10, 245918115), (5, 156786752), (1, 106426187)]
)
def test_s3_engine_heavy_write_check_mem(cluster, node_name, in_flight_memory):
in_flight = in_flight_memory[0]
memory = in_flight_memory[1]
node = cluster.instances[node_name]
node.query("DROP TABLE IF EXISTS s3_test SYNC")
node.query(
"CREATE TABLE s3_test"
" ("
" key UInt32 CODEC(NONE), value String CODEC(NONE)"
" )"
" ENGINE S3('http://resolver:8083/root/data/test-upload.csv', 'minio', 'minio123', 'CSV')",
)
BrokenS3.setup_fake_upload(cluster, 1000)
BrokenS3.setup_slow_answers(cluster, 10 * 1024 * 1024, timeout=15, count=10)
query_id = f"INSERT_INTO_S3_ENGINE_QUERY_ID_{in_flight}"
node.query(
"INSERT INTO s3_test SELECT number, toString(number) FROM numbers(50000000)"
f" SETTINGS max_memory_usage={2*memory}"
f", s3_max_inflight_parts_for_one_file={in_flight}",
query_id=query_id,
)
node.query("SYSTEM FLUSH LOGS")
memory_usage, wait_inflight = node.query(
"SELECT memory_usage, ProfileEvents['WriteBufferFromS3WaitInflightLimitMicroseconds']"
" FROM system.query_log"
f" WHERE query_id='{query_id}'"
" AND type!='QueryStart'"
).split()
assert int(memory_usage) < 1.1 * memory
assert int(memory_usage) > 0.9 * memory
assert int(wait_inflight) > 10 * 1000 * 1000
check_no_objects_after_drop(cluster, node_name=node_name)
@pytest.mark.parametrize("node_name", ["node"])
def test_s3_disk_heavy_write_check_mem(cluster, node_name):
memory = 2279055040
node = cluster.instances[node_name]
node.query("DROP TABLE IF EXISTS s3_test SYNC")
node.query(
"CREATE TABLE s3_test"
" ("
" key UInt32, value String"
" )"
" ENGINE=MergeTree()"
" ORDER BY key"
" SETTINGS"
" storage_policy='broken_s3'",
)
node.query("SYSTEM STOP MERGES s3_test")
BrokenS3.setup_fake_upload(cluster, 1000)
BrokenS3.setup_slow_answers(cluster, 10 * 1024 * 1024, timeout=10, count=50)
query_id = f"INSERT_INTO_S3_DISK_QUERY_ID"
node.query(
"INSERT INTO s3_test SELECT number, toString(number) FROM numbers(50000000)"
f" SETTINGS max_memory_usage={2*memory}"
f", max_insert_block_size=50000000"
f", min_insert_block_size_rows=50000000"
f", min_insert_block_size_bytes=1000000000000",
query_id=query_id,
)
node.query("SYSTEM FLUSH LOGS")
result = node.query(
"SELECT memory_usage"
" FROM system.query_log"
f" WHERE query_id='{query_id}'"
" AND type!='QueryStart'"
)
assert int(result) < 1.1 * memory
assert int(result) > 0.9 * memory
check_no_objects_after_drop(cluster, node_name=node_name)

View File

@ -32,7 +32,7 @@
<!-- ClickHouse starts earlier than custom S3 endpoint. Skip access check to avoid fail on start-up -->
<skip_access_check>true</skip_access_check>
<!-- Avoid extra retries to speed up tests -->
<s3_retry_attempts>1</s3_retry_attempts>
<retry_attempts>1</retry_attempts>
<s3_max_single_read_retries>1</s3_max_single_read_retries>
<connect_timeout_ms>20000</connect_timeout_ms>
</s3_no_retries>

View File

@ -183,7 +183,7 @@ def test_postgres_conversions(started_cluster):
cursor.execute(f"DROP TABLE test_array_dimensions")
def test_non_default_scema(started_cluster):
def test_non_default_schema(started_cluster):
node1.query("DROP TABLE IF EXISTS test_pg_table_schema")
node1.query("DROP TABLE IF EXISTS test_pg_table_schema_with_dots")

View File

@ -0,0 +1,28 @@
<clickhouse>
<storage_configuration>
<disks>
<disk1>
<path>/var/lib/clickhouse1/</path>
</disk1>
<disk2>
<path>/var/lib/clickhouse2/</path>
</disk2>
</disks>
<policies>
<policy1>
<volumes>
<volume1>
<disk>disk1</disk>
</volume1>
</volumes>
</policy1>
<policy2>
<volumes>
<volume1>
<disk>disk2</disk>
</volume1>
</volumes>
</policy2>
</policies>
</storage_configuration>
</clickhouse>

View File

@ -0,0 +1,5 @@
<clickhouse>
<query_log>
<engine>Engine = MergeTree PARTITION BY event_date ORDER BY event_time TTL event_date + INTERVAL 30 day SETTINGS storage_policy='policy2', ttl_only_drop_parts=1</engine>
</query_log>
</clickhouse>

View File

@ -0,0 +1,8 @@
<clickhouse>
<query_log>
<order_by>event_date, event_time, initial_query_id</order_by>
</query_log>
<query_thread_log>
<order_by>event_date, event_time, query_id</order_by>
</query_thread_log>
</clickhouse>

View File

@ -0,0 +1,10 @@
<clickhouse>
<query_log>
<partition_by>toYYYYMM(event_date)</partition_by>
<ttl>event_date + INTERVAL 30 DAY DELETE</ttl>
<order_by>event_date, event_time, initial_query_id</order_by>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<storage_policy>policy1</storage_policy>
<settings>storage_policy='policy2', ttl_only_drop_parts=1</settings>
</query_log>
</clickhouse>

View File

@ -0,0 +1,15 @@
<clickhouse>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/log.log</log>
<errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
<size>1000M</size>
<count>10</count>
</logger>
<http_port>8123</http_port>
<tcp_port>9000</tcp_port>
<listen_host>127.0.0.1</listen_host>
<interserver_http_port>9009</interserver_http_port>
<interserver_http_host>127.0.0.1</interserver_http_host>
</clickhouse>

View File

@ -0,0 +1,38 @@
<clickhouse>
<profiles>
<default>
<max_memory_usage>10000000000</max_memory_usage>
<max_block_size>64999</max_block_size>
<load_balancing>random</load_balancing>
</default>
<readonly>
<readonly>1</readonly>
</readonly>
</profiles>
<users>
<default>
<password></password>
<networks>
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</default>
</users>
<quotas>
<default>
<interval>
<duration>3600</duration>
<queries>0</queries>
<errors>0</errors>
<result_rows>0</result_rows>
<read_rows>0</read_rows>
<execution_time>0</execution_time>
</interval>
</default>
</quotas>
</clickhouse>

View File

@ -0,0 +1,90 @@
# pylint: disable=line-too-long
# pylint: disable=unused-argument
# pylint: disable=redefined-outer-name
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1",
base_config_dir="configs",
main_configs=["configs/config.d/system_logs_order_by.xml"],
stay_alive=True,
)
node2 = cluster.add_instance(
"node2",
base_config_dir="configs",
main_configs=[
"configs/config.d/system_logs_engine.xml",
"configs/config.d/disks.xml",
],
stay_alive=True,
)
node3 = cluster.add_instance(
"node3",
base_config_dir="configs",
main_configs=[
"configs/config.d/system_logs_settings.xml",
"configs/config.d/disks.xml",
],
stay_alive=True,
)
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_system_logs_order_by_expr(start_cluster):
node1.query("SET log_query_threads = 1")
node1.query("SELECT count() FROM system.tables")
node1.query("SYSTEM FLUSH LOGS")
# Check 'sorting_key' of system.query_log.
assert (
node1.query(
"SELECT sorting_key FROM system.tables WHERE database='system' and name='query_log'"
)
== "event_date, event_time, initial_query_id\n"
)
# Check 'sorting_key' of system.query_thread_log.
assert (
node1.query(
"SELECT sorting_key FROM system.tables WHERE database='system' and name='query_thread_log'"
)
== "event_date, event_time, query_id\n"
)
def test_system_logs_engine_expr(start_cluster):
node2.query("SET log_query_threads = 1")
node2.query("SELECT count() FROM system.tables")
node2.query("SYSTEM FLUSH LOGS")
# Check 'engine_full' of system.query_log.
expected = "MergeTree PARTITION BY event_date ORDER BY event_time TTL event_date + toIntervalDay(30) SETTINGS storage_policy = \\'policy2\\', ttl_only_drop_parts = 1"
assert expected in node2.query(
"SELECT engine_full FROM system.tables WHERE database='system' and name='query_log'"
)
def test_system_logs_settings_expr(start_cluster):
node3.query("SET log_query_threads = 1")
node3.query("SELECT count() FROM system.tables")
node3.query("SYSTEM FLUSH LOGS")
# Check 'engine_full' of system.query_log.
expected = "MergeTree PARTITION BY toYYYYMM(event_date) ORDER BY (event_date, event_time, initial_query_id) TTL event_date + toIntervalDay(30) SETTINGS storage_policy = \\'policy1\\', storage_policy = \\'policy2\\', ttl_only_drop_parts = 1"
assert expected in node3.query(
"SELECT engine_full FROM system.tables WHERE database='system' and name='query_log'"
)

View File

@ -0,0 +1,3 @@
<test>
<query>SELECT uniqExactIf(number, 1) FROM numbers_mt(1e6)</query>
</test>

View File

@ -0,0 +1 @@
unquoted_string

View File

@ -0,0 +1 @@
select * from format(CustomSeparatedIgnoreSpaces, 'x String', ' unquoted_string\n') settings format_custom_escaping_rule='CSV';

View File

@ -6,6 +6,6 @@ CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
out="explain1.$CLICKHOUSE_TEST_UNIQUE_NAME.out"
# only EXPLAIN triggers the problem under MSan
$CLICKHOUSE_CLIENT -q "explain select * from numbers(1) into outfile '$out'"
$CLICKHOUSE_CLIENT --allow_experimental_analyzer=0 -q "explain select * from numbers(1) into outfile '$out'"
cat "$out"
rm -f "$out"