update doc, add profile event WriteBufferFromS3WaitInflightLimitMicroseconds

Sema Checherinda 2023-05-24 13:43:48 +02:00
parent 7031796d3f
commit b0eff95388
5 changed files with 23 additions and 9 deletions

View File

@@ -139,6 +139,7 @@ The following settings can be set before query execution or placed into configuration file.
 - `s3_max_put_burst` — Max number of requests that can be issued simultaneously before hitting request per second limit. By default (`0` value) equals to `s3_max_put_rps`.
 - `s3_max_get_rps` — Maximum GET requests per second rate before throttling. Default value is `0` (unlimited).
 - `s3_max_get_burst` — Max number of requests that can be issued simultaneously before hitting request per second limit. By default (`0` value) equals to `s3_max_get_rps`.
+- `s3_max_inflight_parts_for_one_file` — Limits the number of PUT requests that can run concurrently for one object. The value `0` means unlimited. Default value is `20`.
 Security consideration: if a malicious user can specify arbitrary S3 URLs, `s3_max_redirects` must be set to zero to avoid [SSRF](https://en.wikipedia.org/wiki/Server-side_request_forgery) attacks; or alternatively, `remote_host_filter` must be specified in server configuration.
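
The `*_rps`/`*_burst` pairs above describe token-bucket throttling: tokens refill at the configured requests-per-second rate and may accumulate up to the burst size, so short spikes pass unthrottled. A minimal single-threaded sketch of that model (a hypothetical `TokenBucket` class for illustration, not ClickHouse's actual `Throttler`):

```cpp
#include <algorithm>
#include <chrono>
#include <cstdio>
#include <thread>

// Sketch of the token-bucket model behind s3_max_*_rps / s3_max_*_burst:
// tokens refill at `rps` per second, at most `burst` tokens accumulate,
// so up to `burst` requests may be issued back-to-back before pacing kicks in.
class TokenBucket
{
public:
    TokenBucket(double rps_, double burst_)
        : rps(rps_), burst(burst_), tokens(burst_), last(std::chrono::steady_clock::now()) {}

    // Block until one token is available, then consume it.
    void acquire()
    {
        while (true)
        {
            auto now = std::chrono::steady_clock::now();
            tokens = std::min(burst, tokens + std::chrono::duration<double>(now - last).count() * rps);
            last = now;
            if (tokens >= 1.0)
            {
                tokens -= 1.0;
                return;
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }

private:
    double rps;
    double burst;
    double tokens;
    std::chrono::steady_clock::time_point last;
};

int main()
{
    TokenBucket bucket(/*rps=*/100.0, /*burst=*/10.0);
    for (int i = 0; i < 25; ++i)
        bucket.acquire();  // first 10 pass instantly, the rest are paced at ~100/s
    std::puts("done");
}
```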

View File

@@ -1219,11 +1219,12 @@ Authentication parameters (the disk will try all available methods **and** Managed Identity Credential):
 * `account_name` and `account_key` - For authentication using Shared Key.
 Limit parameters (mainly for internal usage):
-* `max_single_part_upload_size` - Limits the size of a single block upload to Blob Storage.
+* `s3_max_single_part_upload_size` - Limits the size of a single block upload to Blob Storage.
 * `min_bytes_for_seek` - Limits the size of a seekable region.
 * `max_single_read_retries` - Limits the number of attempts to read a chunk of data from Blob Storage.
 * `max_single_download_retries` - Limits the number of attempts to download a readable buffer from Blob Storage.
 * `thread_pool_size` - Limits the number of threads with which `IDiskRemote` is instantiated.
+* `s3_max_inflight_parts_for_one_file` - Limits the number of PUT requests that can run concurrently for one object.
 Other parameters:
 * `metadata_path` - Path on local FS to store metadata files for Blob Storage. Default value is `/var/lib/clickhouse/disks/<disk_name>/`.

View File

@@ -366,7 +366,7 @@ The server successfully detected this situation and will download merged part from replica to avoid data duplication.
     M(WriteBufferFromS3Microseconds, "Time spent on writing to S3.") \
     M(WriteBufferFromS3Bytes, "Bytes written to S3.") \
     M(WriteBufferFromS3RequestsErrors, "Number of exceptions while writing to S3.") \
-    \
+    M(WriteBufferFromS3WaitInflightLimitMicroseconds, "Time spent waiting for in-flight requests to finish once their number reached the limit defined by s3_max_inflight_parts_for_one_file.") \
     M(QueryMemoryLimitExceeded, "Number of times when memory limit exceeded for query.") \
     \
     M(CachedReadBufferReadFromSourceMicroseconds, "Time reading from filesystem cache source (from remote filesystem, etc)") \
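
For context, ProfileEvents.cpp keeps every counter in one X-macro list like the fragment above: a single `M(name, documentation)` entry is expanded several times to produce both the event identifier and the description surfaced in `system.events`. A simplified, self-contained sketch of the pattern (illustrative only, not the real ClickHouse macros):

```cpp
#include <cstdio>

// One list of M(name, documentation) entries, expanded twice below.
#define APPLY_FOR_EVENTS(M) \
    M(WriteBufferFromS3Bytes, "Bytes written to S3.") \
    M(WriteBufferFromS3WaitInflightLimitMicroseconds, \
      "Time spent waiting for in-flight requests to finish.")

// First expansion: event identifiers.
enum Event
{
#define M(NAME, DOC) NAME,
    APPLY_FOR_EVENTS(M)
#undef M
    END
};

// Second expansion: documentation strings, index-aligned with the enum.
static const char * documentation[] =
{
#define M(NAME, DOC) DOC,
    APPLY_FOR_EVENTS(M)
#undef M
};

int main()
{
    for (int i = 0; i < END; ++i)
        std::printf("%d: %s\n", i, documentation[i]);
}
```

Adding a new counter, as this commit does, therefore only requires one new `M(...)` line in the list.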

View File

@@ -4,6 +4,11 @@
 #include <IO/WriteBufferFromS3TaskTracker.h>
+
+namespace ProfileEvents
+{
+    extern const Event WriteBufferFromS3WaitInflightLimitMicroseconds;
+}
 namespace DB
 {
@@ -125,10 +130,10 @@ void WriteBufferFromS3::TaskTracker::waitAny()
 {
     LOG_TEST(log, "waitAny, in queue {}", futures.size());
-    while (futures.size() > 0 && consumeReady() == 0)
+    while (!futures.empty() && consumeReady() == 0)
     {
         std::unique_lock lock(mutex);
-        cond_var.wait(lock, [&] () { return finished_futures.size() > 0; });
+        cond_var.wait(lock, [&] () { return !finished_futures.empty(); });
     }
     LOG_TEST(log, "waitAny ended, in queue {}", futures.size());
@@ -167,11 +172,16 @@ void WriteBufferFromS3::TaskTracker::waitInFlight()
     LOG_TEST(log, "waitInFlight, in queue {}", futures.size());
+
+    Stopwatch watch;
     while (futures.size() >= max_tasks_inflight)
     {
         waitAny();
     }
+    watch.stop();
+    ProfileEvents::increment(ProfileEvents::WriteBufferFromS3WaitInflightLimitMicroseconds, watch.elapsedMicroseconds());
     LOG_TEST(log, "waitInFlight ended, in queue {}", futures.size());
 }
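
The new counter measures exactly how long `waitInFlight()` stays blocked because the number of scheduled upload parts has reached `max_tasks_inflight` (i.e. `s3_max_inflight_parts_for_one_file`). A stand-alone sketch of that idea, assuming a plain mutex/condition-variable limiter rather than the real `TaskTracker`:

```cpp
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <cstdio>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for TaskTracker: a writer that wants to start one
// more upload part blocks while `inflight` is at the limit, and acquire()
// returns the blocked time in microseconds -- the quantity that
// WriteBufferFromS3WaitInflightLimitMicroseconds accumulates.
class InflightLimiter
{
public:
    explicit InflightLimiter(size_t limit_) : limit(limit_) {}

    // Wait for a free slot; return microseconds spent waiting.
    uint64_t acquire()
    {
        auto start = std::chrono::steady_clock::now();
        std::unique_lock lock(mutex);
        cond_var.wait(lock, [&] { return inflight < limit; });
        ++inflight;
        return std::chrono::duration_cast<std::chrono::microseconds>(
            std::chrono::steady_clock::now() - start).count();
    }

    void release()
    {
        {
            std::lock_guard lock(mutex);
            --inflight;
        }
        cond_var.notify_one();
    }

private:
    std::mutex mutex;
    std::condition_variable cond_var;
    size_t inflight = 0;
    const size_t limit;
};

int main()
{
    InflightLimiter limiter(4);  // analogous to s3_max_inflight_parts_for_one_file = 4
    uint64_t waited = 0;
    std::vector<std::thread> parts;
    for (int i = 0; i < 16; ++i)
    {
        waited += limiter.acquire();  // blocks once 4 parts are in flight
        parts.emplace_back([&]
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(10));  // fake upload
            limiter.release();
        });
    }
    for (auto & t : parts)
        t.join();
    std::printf("waited %llu us for free slots\n", static_cast<unsigned long long>(waited));
}
```

The integration test below drives the write buffer hard enough that this accumulated wait exceeds 10 seconds, which is what its `10 * 1000 * 1000` microseconds assertion checks.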

View File

@@ -1041,15 +1041,17 @@ def test_s3_engine_heavy_write_check_mem(cluster, node_name, in_flight_memory):
     node.query("SYSTEM FLUSH LOGS")
-    result = node.query(
-        "SELECT memory_usage"
+    memory_usage, wait_inflight = node.query(
+        "SELECT memory_usage, ProfileEvents['WriteBufferFromS3WaitInflightLimitMicroseconds']"
         " FROM system.query_log"
         f" WHERE query_id='{query_id}'"
         " AND type!='QueryStart'"
-    )
+    ).split()
-    assert int(result) < 1.1 * memory
-    assert int(result) > 0.9 * memory
+    assert int(memory_usage) < 1.1 * memory
+    assert int(memory_usage) > 0.9 * memory
+    assert int(wait_inflight) > 10 * 1000 * 1000
     check_no_objects_after_drop(cluster, node_name=node_name)