Merge remote-tracking branch 'upstream/master' into group_by_all

Commit 66f101e451 by taofengliu, 2022-11-19 22:36:31 +08:00
50 changed files with 1572 additions and 340 deletions

File diff suppressed because it is too large.


@ -38,7 +38,7 @@ jobs:
with:
ref: master
fetch-depth: 0
- name: Generate versions
- name: Update versions, docker version, changelog, security
env:
GITHUB_TOKEN: ${{ secrets.ROBOT_CLICKHOUSE_COMMIT_TOKEN }}
run: |
@ -51,6 +51,7 @@ jobs:
--gh-user-or-token="$GITHUB_TOKEN" --jobs=5 \
--output="/ClickHouse/docs/changelogs/${GITHUB_TAG}.md" "${GITHUB_TAG}"
git add "./docs/changelogs/${GITHUB_TAG}.md"
python ./utils/security-generator/generate_security.py > SECURITY.md
git diff HEAD
- name: Create Pull Request
uses: peter-evans/create-pull-request@v3


@ -1,3 +1,6 @@
<!--
the file is autogenerated by utils/security-generator/generate_security.py
-->
# Security Policy
@ -62,5 +65,5 @@ As the security issue moves from triage, to identified fix, to release planning
## Public Disclosure Timing
A public disclosure date is negotiated by the ClickHouse maintainers and the bug submitter. We prefer to fully disclose the bug as soon as possible once a user mitigation is available. It is reasonable to delay disclosure when the bug or the fix is not yet fully understood, the solution is not well-tested, or for vendor coordination. The timeframe for disclosure is from immediate (especially if it's already publicly known) to 90 days. For a vulnerability with a straightforward mitigation, we expect the report date to disclosure date to be on the order of 7 days.


@ -254,7 +254,7 @@ sudo chgrp clickhouse /etc/clickhouse-server/config.d/s3_storage_policy_by_defau
start
./stress --hung-check --drop-databases --output-folder test_output --skip-func-tests "$SKIP_TESTS_OPTION" \
./stress --hung-check --drop-databases --output-folder test_output --skip-func-tests "$SKIP_TESTS_OPTION" --global-time-limit 1200 \
&& echo -e 'Test script exit code\tOK' >> /test_output/test_results.tsv \
|| echo -e 'Test script failed\tFAIL' >> /test_output/test_results.tsv


@ -127,6 +127,10 @@ The following settings can be set before query execution or placed into configur
- `s3_min_upload_part_size` — The minimum size of a part to upload during [multipart upload to S3](https://docs.aws.amazon.com/AmazonS3/latest/dev/uploadobjusingmpu.html). Default value is `512Mb`.
- `s3_max_redirects` — Max number of S3 redirect hops allowed. Default value is `10`.
- `s3_single_read_retries` — The maximum number of attempts during a single read. Default value is `4`.
- `s3_max_put_rps` — Maximum PUT requests per second rate before throttling. Default value is `0` (unlimited).
- `s3_max_put_burst` — Max number of requests that can be issued simultaneously before hitting the request per second limit. By default (`0`) it equals `s3_max_put_rps`.
- `s3_max_get_rps` — Maximum GET requests per second rate before throttling. Default value is `0` (unlimited).
- `s3_max_get_burst` — Max number of requests that can be issued simultaneously before hitting the request per second limit. By default (`0`) it equals `s3_max_get_rps`.
Security consideration: if a malicious user can specify arbitrary S3 URLs, `s3_max_redirects` must be set to zero to avoid [SSRF](https://en.wikipedia.org/wiki/Server-side_request_forgery) attacks; alternatively, `remote_host_filter` must be specified in the server configuration.
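For illustration, a minimal sketch of how the per-query limits could be placed in a settings profile in the server configuration; the profile name and the numbers are hypothetical:

``` xml
<clickhouse>
    <profiles>
        <default>
            <!-- Hypothetical limits: 100 GET/s with a burst of 200, and 20 PUT/s
                 (the PUT burst defaults to s3_max_put_rps). -->
            <s3_max_get_rps>100</s3_max_get_rps>
            <s3_max_get_burst>200</s3_max_get_burst>
            <s3_max_put_rps>20</s3_max_put_rps>
        </default>
    </profiles>
</clickhouse>
```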
@ -142,6 +146,7 @@ The following settings can be specified in configuration file for given endpoint
- `header` — Adds the specified HTTP header to a request to the given endpoint. Optional, can be specified multiple times.
- `server_side_encryption_customer_key_base64` — If specified, required headers for accessing S3 objects with SSE-C encryption will be set. Optional.
- `max_single_read_retries` — The maximum number of attempts during a single read. Default value is `4`. Optional.
- `max_put_rps`, `max_put_burst`, `max_get_rps` and `max_get_burst` — Throttling settings (see the description above) to use for a specific endpoint instead of the per-query ones. Optional.
**Example:**


@ -940,6 +940,10 @@ Optional parameters:
- `cache_path` — Path on local FS where to store cached mark and index files. Default value is `/var/lib/clickhouse/disks/<disk_name>/cache/`.
- `skip_access_check` — If true, disk access checks will not be performed on disk start-up. Default value is `false`.
- `server_side_encryption_customer_key_base64` — If specified, required headers for accessing S3 objects with SSE-C encryption will be set.
- `s3_max_put_rps` — Maximum PUT requests per second rate before throttling. Default value is `0` (unlimited).
- `s3_max_put_burst` — Max number of requests that can be issued simultaneously before hitting the request per second limit. By default (`0`) it equals `s3_max_put_rps`.
- `s3_max_get_rps` — Maximum GET requests per second rate before throttling. Default value is `0` (unlimited).
- `s3_max_get_burst` — Max number of requests that can be issued simultaneously before hitting the request per second limit. By default (`0`) it equals `s3_max_get_rps`.
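For illustration, a minimal sketch of a disk declaration using these keys; the disk name, endpoint and numbers are placeholders:

``` xml
<clickhouse>
    <storage_configuration>
        <disks>
            <s3_throttled>
                <type>s3</type>
                <endpoint>https://storage.example.com/bucket/root/</endpoint>
                <!-- Placeholder limits: 100 GET/s and 20 PUT/s for this disk only. -->
                <s3_max_get_rps>100</s3_max_get_rps>
                <s3_max_put_rps>20</s3_max_put_rps>
            </s3_throttled>
        </disks>
    </storage_configuration>
</clickhouse>
```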
S3 disk can be configured as `main` or `cold` storage:
``` xml


@ -189,10 +189,12 @@ preAllocSize=131072
# especially if there are a lot of clients. To prevent ZooKeeper from running
# out of memory due to queued requests, ZooKeeper will throttle clients so that
# there is no more than globalOutstandingLimit outstanding requests in the
# system. The default limit is 1,000.ZooKeeper logs transactions to a
# transaction log. After snapCount transactions are written to a log file a
# snapshot is started and a new transaction log file is started. The default
# snapCount is 10,000.
# system. The default limit is 1000.
# globalOutstandingLimit=1000
# ZooKeeper logs transactions to a transaction log. After snapCount transactions
# are written to a log file a snapshot is started and a new transaction log file
# is started. The default snapCount is 100000.
snapCount=3000000
# If this option is defined, requests will be logged to a trace file named


@ -46,7 +46,7 @@ namespace
context->getRemoteHostFilter(),
static_cast<unsigned>(context->getGlobalContext()->getSettingsRef().s3_max_redirects),
context->getGlobalContext()->getSettingsRef().enable_s3_requests_logging,
/* for_disk_s3 = */ false);
/* for_disk_s3 = */ false, /* get_request_throttler = */ {}, /* put_request_throttler = */ {});
client_configuration.endpointOverride = s3_uri.endpoint;
client_configuration.maxConnections = static_cast<unsigned>(context->getSettingsRef().s3_max_connections);
@ -86,9 +86,10 @@ BackupReaderS3::BackupReaderS3(
const S3::URI & s3_uri_, const String & access_key_id_, const String & secret_access_key_, const ContextPtr & context_)
: s3_uri(s3_uri_)
, client(makeS3Client(s3_uri_, access_key_id_, secret_access_key_, context_))
, max_single_read_retries(context_->getSettingsRef().s3_max_single_read_retries)
, read_settings(context_->getReadSettings())
, request_settings(context_->getStorageS3Settings().getSettings(s3_uri.uri.toString()).request_settings)
{
request_settings.max_single_read_retries = context_->getSettingsRef().s3_max_single_read_retries; // FIXME: Avoid taking value for endpoint
}
DataSourceDescription BackupReaderS3::getDataSourceDescription() const
@ -115,7 +116,7 @@ UInt64 BackupReaderS3::getFileSize(const String & file_name)
std::unique_ptr<SeekableReadBuffer> BackupReaderS3::readFile(const String & file_name)
{
return std::make_unique<ReadBufferFromS3>(
client, s3_uri.bucket, fs::path(s3_uri.key) / file_name, s3_uri.version_id, max_single_read_retries, read_settings);
client, s3_uri.bucket, fs::path(s3_uri.key) / file_name, s3_uri.version_id, request_settings, read_settings);
}
@ -123,12 +124,12 @@ BackupWriterS3::BackupWriterS3(
const S3::URI & s3_uri_, const String & access_key_id_, const String & secret_access_key_, const ContextPtr & context_)
: s3_uri(s3_uri_)
, client(makeS3Client(s3_uri_, access_key_id_, secret_access_key_, context_))
, max_single_read_retries(context_->getSettingsRef().s3_max_single_read_retries)
, read_settings(context_->getReadSettings())
, rw_settings(context_->getStorageS3Settings().getSettings(s3_uri.uri.toString()).rw_settings)
, request_settings(context_->getStorageS3Settings().getSettings(s3_uri.uri.toString()).request_settings)
, log(&Poco::Logger::get("BackupWriterS3"))
{
rw_settings.updateFromSettingsIfEmpty(context_->getSettingsRef());
request_settings.updateFromSettingsIfEmpty(context_->getSettingsRef());
request_settings.max_single_read_retries = context_->getSettingsRef().s3_max_single_read_retries; // FIXME: Avoid taking value for endpoint
}
DataSourceDescription BackupWriterS3::getDataSourceDescription() const
@ -216,7 +217,7 @@ void BackupWriterS3::copyObjectMultipartImpl(
std::vector<String> part_tags;
size_t position = 0;
size_t upload_part_size = rw_settings.min_upload_part_size;
size_t upload_part_size = request_settings.min_upload_part_size;
for (size_t part_number = 1; position < size; ++part_number)
{
@ -248,10 +249,10 @@ void BackupWriterS3::copyObjectMultipartImpl(
position = next_position;
if (part_number % rw_settings.upload_part_size_multiply_parts_count_threshold == 0)
if (part_number % request_settings.upload_part_size_multiply_parts_count_threshold == 0)
{
upload_part_size *= rw_settings.upload_part_size_multiply_factor;
upload_part_size = std::min(upload_part_size, rw_settings.max_upload_part_size);
upload_part_size *= request_settings.upload_part_size_multiply_factor;
upload_part_size = std::min(upload_part_size, request_settings.max_upload_part_size);
}
}
@ -294,7 +295,7 @@ void BackupWriterS3::copyFileNative(DiskPtr from_disk, const String & file_name_
auto file_path = fs::path(s3_uri.key) / file_name_to;
auto head = requestObjectHeadData(source_bucket, objects[0].absolute_path).GetResult();
if (static_cast<size_t>(head.GetContentLength()) < rw_settings.max_single_operation_copy_size)
if (static_cast<size_t>(head.GetContentLength()) < request_settings.max_single_operation_copy_size)
{
copyObjectImpl(
source_bucket, objects[0].absolute_path, s3_uri.bucket, file_path, head);
@ -331,7 +332,7 @@ bool BackupWriterS3::fileContentsEqual(const String & file_name, const String &
try
{
auto in = std::make_unique<ReadBufferFromS3>(
client, s3_uri.bucket, fs::path(s3_uri.key) / file_name, s3_uri.version_id, max_single_read_retries, read_settings);
client, s3_uri.bucket, fs::path(s3_uri.key) / file_name, s3_uri.version_id, request_settings, read_settings);
String actual_file_contents(expected_file_contents.size(), ' ');
return (in->read(actual_file_contents.data(), actual_file_contents.size()) == actual_file_contents.size())
&& (actual_file_contents == expected_file_contents) && in->eof();
@ -349,7 +350,7 @@ std::unique_ptr<WriteBuffer> BackupWriterS3::writeFile(const String & file_name)
client,
s3_uri.bucket,
fs::path(s3_uri.key) / file_name,
rw_settings,
request_settings,
std::nullopt,
DBMS_DEFAULT_BUFFER_SIZE,
threadPoolCallbackRunner<void>(IOThreadPool::get(), "BackupWriterS3"));


@ -39,8 +39,8 @@ public:
private:
S3::URI s3_uri;
std::shared_ptr<Aws::S3::S3Client> client;
UInt64 max_single_read_retries;
ReadSettings read_settings;
S3Settings::RequestSettings request_settings;
};
@ -81,9 +81,8 @@ private:
S3::URI s3_uri;
std::shared_ptr<Aws::S3::S3Client> client;
UInt64 max_single_read_retries;
ReadSettings read_settings;
S3Settings::ReadWriteSettings rw_settings;
S3Settings::RequestSettings request_settings;
Poco::Logger * log;
};


@ -62,7 +62,7 @@
M(NetworkSendElapsedMicroseconds, "Total time spent waiting for data to send to network or sending data to network. Only ClickHouse-related network interaction is included, not by 3rd party libraries.") \
M(NetworkReceiveBytes, "Total number of bytes received from network. Only ClickHouse-related network interaction is included, not by 3rd party libraries.") \
M(NetworkSendBytes, "Total number of bytes sent to network. Only ClickHouse-related network interaction is included, not by 3rd party libraries.") \
M(ThrottlerSleepMicroseconds, "Total time a query was sleeping to conform the 'max_network_bandwidth' setting.") \
M(ThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_network_bandwidth' and other throttling settings.") \
\
M(QueryMaskingRulesMatch, "Number of times query masking rules were successfully matched.") \
\


@ -20,8 +20,6 @@ namespace ErrorCodes
/// Just 10^9.
static constexpr auto NS = 1000000000UL;
static const size_t default_burst_seconds = 1;
Throttler::Throttler(size_t max_speed_, const std::shared_ptr<Throttler> & parent_)
: max_speed(max_speed_)
, max_burst(max_speed_ * default_burst_seconds)


@ -17,6 +17,8 @@ namespace DB
class Throttler
{
public:
static const size_t default_burst_seconds = 1;
Throttler(size_t max_speed_, size_t max_burst_, const std::shared_ptr<Throttler> & parent_ = nullptr)
: max_speed(max_speed_), max_burst(max_burst_), limit_exceeded_exception_message(""), tokens(max_burst), parent(parent_) {}
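To make these two parameters concrete, here is a self-contained, simplified sketch of the token-bucket behaviour behind `max_speed` and `max_burst`; it is an illustration only, not the actual `Throttler` implementation (which also supports parent throttlers and the limit-exceeded exception seen in the constructor above):

``` cpp
#include <algorithm>
#include <chrono>
#include <thread>

/// Simplified token bucket: refills at `max_speed` tokens per second,
/// holds at most `max_burst` tokens, and sleeps when the bucket is overdrawn.
class TokenBucket
{
public:
    TokenBucket(double max_speed_, double max_burst_)
        : max_speed(max_speed_), max_burst(max_burst_), tokens(max_burst_)
        , prev(std::chrono::steady_clock::now()) {}

    void add(double amount)
    {
        auto now = std::chrono::steady_clock::now();
        double elapsed = std::chrono::duration<double>(now - prev).count();
        prev = now;
        /// Refill proportionally to elapsed time, capped by the burst size.
        tokens = std::min(max_burst, tokens + elapsed * max_speed) - amount;
        if (tokens < 0.0)
            /// Overdrawn: sleep until the deficit is repaid at the average rate.
            std::this_thread::sleep_for(std::chrono::duration<double>(-tokens / max_speed));
    }

private:
    const double max_speed;   /// Average rate, e.g. requests per second.
    const double max_burst;   /// How many requests may pass back-to-back.
    double tokens;            /// Current bucket level (may go negative).
    std::chrono::steady_clock::time_point prev;
};

int main()
{
    TokenBucket limiter(/* max_speed = */ 10.0, /* max_burst = */ 10.0);
    for (int i = 0; i < 100; ++i)
        limiter.add(1.0);  /// Settles at ~10 calls per second after the initial burst.
}
```

With the one-second default burst (`max_burst = max_speed * default_burst_seconds`), roughly one second's worth of requests passes instantly, after which the rate converges to `max_speed`.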


@ -135,6 +135,9 @@ void CompressionCodecDelta::doDecompressData(const char * source, UInt32 source_
if (source_size < 2)
throw Exception("Cannot decompress. File has wrong header", ErrorCodes::CANNOT_DECOMPRESS);
if (uncompressed_size == 0)
return;
UInt8 bytes_size = source[0];
if (bytes_size == 0)


@ -93,7 +93,7 @@ void KeeperSnapshotManagerS3::updateS3Configuration(const Poco::Util::AbstractCo
auth_settings.region,
RemoteHostFilter(), s3_max_redirects,
enable_s3_requests_logging,
/* for_disk_s3 = */ false);
/* for_disk_s3 = */ false, /* get_request_throttler = */ {}, /* put_request_throttler = */ {});
client_configuration.endpointOverride = new_uri.endpoint;
@ -135,8 +135,8 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const std::string & snapshot_pa
if (s3_client == nullptr)
return;
S3Settings::ReadWriteSettings read_write_settings;
read_write_settings.upload_part_size_multiply_parts_count_threshold = 10000;
S3Settings::RequestSettings request_settings_1;
request_settings_1.upload_part_size_multiply_parts_count_threshold = 10000;
const auto create_writer = [&](const auto & key)
{
@ -145,7 +145,7 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const std::string & snapshot_pa
s3_client->client,
s3_client->uri.bucket,
key,
read_write_settings
request_settings_1
};
};
@ -194,13 +194,15 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const std::string & snapshot_pa
lock_writer.finalize();
// We read back the written UUID, if it's the same we can upload the file
S3Settings::RequestSettings request_settings_2;
request_settings_2.max_single_read_retries = 1;
ReadBufferFromS3 lock_reader
{
s3_client->client,
s3_client->uri.bucket,
lock_file,
"",
1,
request_settings_2,
{}
};


@ -90,6 +90,10 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(UInt64, s3_max_unexpected_write_error_retries, 4, "The maximum number of retries in case of unexpected errors during S3 write.", 0) \
M(UInt64, s3_max_redirects, 10, "Max number of S3 redirects hops allowed.", 0) \
M(UInt64, s3_max_connections, 1024, "The maximum number of connections per server.", 0) \
M(UInt64, s3_max_get_rps, 0, "Limit on S3 GET request per second rate before throttling. Zero means unlimited.", 0) \
M(UInt64, s3_max_get_burst, 0, "Max number of requests that can be issued simultaneously before hitting the request per second limit. By default (0) it equals `s3_max_get_rps`.", 0) \
M(UInt64, s3_max_put_rps, 0, "Limit on S3 PUT request per second rate before throttling. Zero means unlimited.", 0) \
M(UInt64, s3_max_put_burst, 0, "Max number of requests that can be issued simultaneously before hitting the request per second limit. By default (0) it equals `s3_max_put_rps`.", 0) \
M(Bool, s3_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables.", 0) \
M(Bool, s3_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in s3 engine tables", 0) \
M(Bool, s3_check_objects_after_upload, false, "Check each uploaded object to s3 with head request to be sure that upload was successful", 0) \


@ -175,7 +175,7 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
bucket,
path,
version_id,
settings_ptr->s3_settings.max_single_read_retries,
settings_ptr->request_settings,
disk_read_settings,
/* use_external_buffer */true,
/* offset */0,
@ -212,7 +212,7 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObject( /// NOLINT
bucket,
object.absolute_path,
version_id,
settings_ptr->s3_settings.max_single_read_retries,
settings_ptr->request_settings,
patchSettings(read_settings));
}
@ -238,7 +238,7 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
client.get(),
bucket,
object.absolute_path,
settings_ptr->s3_settings,
settings_ptr->request_settings,
attributes,
buf_size,
std::move(scheduler),
@ -489,7 +489,7 @@ void S3ObjectStorage::copyObjectImpl(
throwIfError(outcome);
auto settings_ptr = s3_settings.get();
if (settings_ptr->s3_settings.check_objects_after_upload)
if (settings_ptr->request_settings.check_objects_after_upload)
{
auto object_head = requestObjectHeadData(dst_bucket, dst_key);
if (!object_head.IsSuccess())
@ -533,7 +533,7 @@ void S3ObjectStorage::copyObjectMultipartImpl(
std::vector<String> part_tags;
size_t upload_part_size = settings_ptr->s3_settings.min_upload_part_size;
size_t upload_part_size = settings_ptr->request_settings.min_upload_part_size;
for (size_t position = 0, part_number = 1; position < size; ++part_number, position += upload_part_size)
{
ProfileEvents::increment(ProfileEvents::S3UploadPartCopy);
@ -586,7 +586,7 @@ void S3ObjectStorage::copyObjectMultipartImpl(
throwIfError(outcome);
}
if (settings_ptr->s3_settings.check_objects_after_upload)
if (settings_ptr->request_settings.check_objects_after_upload)
{
auto object_head = requestObjectHeadData(dst_bucket, dst_key);
if (!object_head.IsSuccess())
@ -643,17 +643,20 @@ void S3ObjectStorage::startup()
void S3ObjectStorage::applyNewSettings(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context)
{
s3_settings.set(getSettings(config, config_prefix, context));
client.set(getClient(config, config_prefix, context));
auto new_s3_settings = getSettings(config, config_prefix, context);
auto new_client = getClient(config, config_prefix, context, *new_s3_settings);
s3_settings.set(std::move(new_s3_settings));
client.set(std::move(new_client));
applyRemoteThrottlingSettings(context);
}
std::unique_ptr<IObjectStorage> S3ObjectStorage::cloneObjectStorage(
const std::string & new_namespace, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context)
{
auto new_s3_settings = getSettings(config, config_prefix, context);
auto new_client = getClient(config, config_prefix, context, *new_s3_settings);
return std::make_unique<S3ObjectStorage>(
getClient(config, config_prefix, context),
getSettings(config, config_prefix, context),
std::move(new_client), std::move(new_s3_settings),
version_id, s3_capabilities, new_namespace,
S3::URI(Poco::URI(config.getString(config_prefix + ".endpoint"))).endpoint);
}


@ -23,17 +23,17 @@ struct S3ObjectStorageSettings
S3ObjectStorageSettings() = default;
S3ObjectStorageSettings(
const S3Settings::ReadWriteSettings & s3_settings_,
const S3Settings::RequestSettings & request_settings_,
uint64_t min_bytes_for_seek_,
int32_t list_object_keys_size_,
int32_t objects_chunk_size_to_delete_)
: s3_settings(s3_settings_)
: request_settings(request_settings_)
, min_bytes_for_seek(min_bytes_for_seek_)
, list_object_keys_size(list_object_keys_size_)
, objects_chunk_size_to_delete(objects_chunk_size_to_delete_)
{}
S3Settings::ReadWriteSettings s3_settings;
S3Settings::RequestSettings request_settings;
uint64_t min_bytes_for_seek;
int32_t list_object_keys_size;


@ -4,6 +4,7 @@
#include <Common/StringUtils/StringUtils.h>
#include <Common/logger_useful.h>
#include <Common/Throttler.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
@ -32,17 +33,26 @@ namespace ErrorCodes
std::unique_ptr<S3ObjectStorageSettings> getSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr context)
{
S3Settings::ReadWriteSettings rw_settings;
rw_settings.max_single_read_retries = config.getUInt64(config_prefix + ".s3_max_single_read_retries", context->getSettingsRef().s3_max_single_read_retries);
rw_settings.min_upload_part_size = config.getUInt64(config_prefix + ".s3_min_upload_part_size", context->getSettingsRef().s3_min_upload_part_size);
rw_settings.upload_part_size_multiply_factor = config.getUInt64(config_prefix + ".s3_upload_part_size_multiply_factor", context->getSettingsRef().s3_upload_part_size_multiply_factor);
rw_settings.upload_part_size_multiply_parts_count_threshold = config.getUInt64(config_prefix + ".s3_upload_part_size_multiply_parts_count_threshold", context->getSettingsRef().s3_upload_part_size_multiply_parts_count_threshold);
rw_settings.max_single_part_upload_size = config.getUInt64(config_prefix + ".s3_max_single_part_upload_size", context->getSettingsRef().s3_max_single_part_upload_size);
rw_settings.check_objects_after_upload = config.getUInt64(config_prefix + ".s3_check_objects_after_upload", context->getSettingsRef().s3_check_objects_after_upload);
rw_settings.max_unexpected_write_error_retries = config.getUInt64(config_prefix + ".s3_max_unexpected_write_error_retries", context->getSettingsRef().s3_max_unexpected_write_error_retries);
const Settings & settings = context->getSettingsRef();
S3Settings::RequestSettings request_settings;
request_settings.max_single_read_retries = config.getUInt64(config_prefix + ".s3_max_single_read_retries", settings.s3_max_single_read_retries);
request_settings.min_upload_part_size = config.getUInt64(config_prefix + ".s3_min_upload_part_size", settings.s3_min_upload_part_size);
request_settings.upload_part_size_multiply_factor = config.getUInt64(config_prefix + ".s3_upload_part_size_multiply_factor", settings.s3_upload_part_size_multiply_factor);
request_settings.upload_part_size_multiply_parts_count_threshold = config.getUInt64(config_prefix + ".s3_upload_part_size_multiply_parts_count_threshold", settings.s3_upload_part_size_multiply_parts_count_threshold);
request_settings.max_single_part_upload_size = config.getUInt64(config_prefix + ".s3_max_single_part_upload_size", settings.s3_max_single_part_upload_size);
request_settings.check_objects_after_upload = config.getUInt64(config_prefix + ".s3_check_objects_after_upload", settings.s3_check_objects_after_upload);
request_settings.max_unexpected_write_error_retries = config.getUInt64(config_prefix + ".s3_max_unexpected_write_error_retries", settings.s3_max_unexpected_write_error_retries);
// NOTE: it would be better to reuse old throttlers to avoid losing token bucket state on every config reload, which could lead to exceeding limit for short time. But it is good enough unless very high `burst` values are used.
if (UInt64 max_get_rps = config.getUInt64(config_prefix + ".s3_max_get_rps", settings.s3_max_get_rps))
request_settings.get_request_throttler = std::make_shared<Throttler>(
max_get_rps, config.getUInt64(config_prefix + ".s3_max_get_burst", settings.s3_max_get_burst ? settings.s3_max_get_burst : Throttler::default_burst_seconds * max_get_rps));
if (UInt64 max_put_rps = config.getUInt64(config_prefix + ".s3_max_put_rps", settings.s3_max_put_rps))
request_settings.put_request_throttler = std::make_shared<Throttler>(
max_put_rps, config.getUInt64(config_prefix + ".s3_max_put_burst", settings.s3_max_put_burst ? settings.s3_max_put_burst : Throttler::default_burst_seconds * max_put_rps));
return std::make_unique<S3ObjectStorageSettings>(
rw_settings,
request_settings,
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
config.getInt(config_prefix + ".list_object_keys_size", 1000),
config.getInt(config_prefix + ".objects_chunk_size_to_delete", 1000));
@ -112,14 +122,20 @@ std::shared_ptr<S3::ProxyConfiguration> getProxyConfiguration(const String & pre
}
std::unique_ptr<Aws::S3::S3Client> getClient(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr context)
std::unique_ptr<Aws::S3::S3Client> getClient(
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
ContextPtr context,
const S3ObjectStorageSettings & settings)
{
S3::PocoHTTPClientConfiguration client_configuration = S3::ClientFactory::instance().createClientConfiguration(
config.getString(config_prefix + ".region", ""),
context->getRemoteHostFilter(),
static_cast<int>(context->getGlobalContext()->getSettingsRef().s3_max_redirects),
context->getGlobalContext()->getSettingsRef().enable_s3_requests_logging,
/* for_disk_s3 = */ true);
/* for_disk_s3 = */ true,
settings.request_settings.get_request_throttler,
settings.request_settings.put_request_throttler);
S3::URI uri(Poco::URI(config.getString(config_prefix + ".endpoint")));
if (uri.key.back() != '/')

View File

@ -22,7 +22,7 @@ struct S3ObjectStorageSettings;
std::unique_ptr<S3ObjectStorageSettings> getSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr context);
std::unique_ptr<Aws::S3::S3Client> getClient(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr context);
std::unique_ptr<Aws::S3::S3Client> getClient(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr context, const S3ObjectStorageSettings & settings);
}


@ -1,27 +0,0 @@
#pragma once
#include "config.h"
#if USE_AWS_S3
#include <aws/core/client/DefaultRetryStrategy.h>
#include <IO/S3Common.h>
#include <Storages/StorageS3Settings.h>
#include <Disks/ObjectStorages/S3/ProxyConfiguration.h>
#include <Disks/ObjectStorages/S3/ProxyListConfiguration.h>
#include <Disks/ObjectStorages/S3/ProxyResolverConfiguration.h>
#include <Disks/DiskRestartProxy.h>
#include <Disks/DiskLocal.h>
#include <Disks/ObjectStorages/DiskObjectStorageCommon.h>
namespace DB
{
std::unique_ptr<DiskS3Settings> getSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr context);
std::shared_ptr<Aws::S3::S3Client> getClient(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr context);
}


@ -130,21 +130,16 @@ void registerDiskS3(DiskFactory & factory)
chassert(type == "s3" || type == "s3_plain");
MetadataStoragePtr metadata_storage;
auto settings = getSettings(config, config_prefix, context);
auto client = getClient(config, config_prefix, context, *settings);
if (type == "s3_plain")
{
s3_storage = std::make_shared<S3PlainObjectStorage>(
getClient(config, config_prefix, context),
getSettings(config, config_prefix, context),
uri.version_id, s3_capabilities, uri.bucket, uri.endpoint);
s3_storage = std::make_shared<S3PlainObjectStorage>(std::move(client), std::move(settings), uri.version_id, s3_capabilities, uri.bucket, uri.endpoint);
metadata_storage = std::make_shared<MetadataStorageFromPlainObjectStorage>(s3_storage, uri.key);
}
else
{
s3_storage = std::make_shared<S3ObjectStorage>(
getClient(config, config_prefix, context),
getSettings(config, config_prefix, context),
uri.version_id, s3_capabilities, uri.bucket, uri.endpoint);
s3_storage = std::make_shared<S3ObjectStorage>(std::move(client), std::move(settings), uri.version_id, s3_capabilities, uri.bucket, uri.endpoint);
auto [metadata_path, metadata_disk] = prepareForLocalMetadata(name, config, config_prefix, context);
metadata_storage = std::make_shared<MetadataStorageFromDisk>(metadata_disk, uri.key);
}


@ -45,7 +45,7 @@ ReadBufferFromS3::ReadBufferFromS3(
const String & bucket_,
const String & key_,
const String & version_id_,
UInt64 max_single_read_retries_,
const S3Settings::RequestSettings & request_settings_,
const ReadSettings & settings_,
bool use_external_buffer_,
size_t offset_,
@ -56,7 +56,7 @@ ReadBufferFromS3::ReadBufferFromS3(
, bucket(bucket_)
, key(key_)
, version_id(version_id_)
, max_single_read_retries(max_single_read_retries_)
, request_settings(request_settings_)
, offset(offset_)
, read_until_position(read_until_position_)
, read_settings(settings_)
@ -105,7 +105,7 @@ bool ReadBufferFromS3::nextImpl()
}
size_t sleep_time_with_backoff_milliseconds = 100;
for (size_t attempt = 0; (attempt < max_single_read_retries) && !next_result; ++attempt)
for (size_t attempt = 0; attempt < request_settings.max_single_read_retries && !next_result; ++attempt)
{
Stopwatch watch;
try
@ -166,7 +166,7 @@ bool ReadBufferFromS3::nextImpl()
attempt,
e.message());
if (attempt + 1 == max_single_read_retries)
if (attempt + 1 == request_settings.max_single_read_retries)
throw;
/// Pause before next attempt.
@ -349,7 +349,7 @@ SeekableReadBufferPtr ReadBufferS3Factory::getReader()
bucket,
key,
version_id,
s3_max_single_read_retries,
request_settings,
read_settings,
false /*use_external_buffer*/,
next_range->first,


@ -1,6 +1,7 @@
#pragma once
#include <Common/RangeGenerator.h>
#include <Storages/StorageS3Settings.h>
#include "config.h"
#if USE_AWS_S3
@ -33,7 +34,7 @@ private:
String bucket;
String key;
String version_id;
UInt64 max_single_read_retries;
const S3Settings::RequestSettings request_settings;
/// These variables are atomic because they can be used for `logging only`
/// (where it is not important to get consistent result)
@ -52,7 +53,7 @@ public:
const String & bucket_,
const String & key_,
const String & version_id_,
UInt64 max_single_read_retries_,
const S3Settings::RequestSettings & request_settings_,
const ReadSettings & settings_,
bool use_external_buffer = false,
size_t offset_ = 0,
@ -100,7 +101,7 @@ public:
const String & version_id_,
size_t range_step_,
size_t object_size_,
UInt64 s3_max_single_read_retries_,
const S3Settings::RequestSettings & request_settings_,
const ReadSettings & read_settings_)
: client_ptr(client_ptr_)
, bucket(bucket_)
@ -110,7 +111,7 @@ public:
, range_generator(object_size_, range_step_)
, range_step(range_step_)
, object_size(object_size_)
, s3_max_single_read_retries(s3_max_single_read_retries_)
, request_settings(request_settings_)
{
assert(range_step > 0);
assert(range_step < object_size);
@ -135,7 +136,7 @@ private:
size_t range_step;
size_t object_size;
UInt64 s3_max_single_read_retries;
const S3Settings::RequestSettings request_settings;
};
}


@ -11,6 +11,7 @@
#include <Common/logger_useful.h>
#include <Common/Stopwatch.h>
#include <Common/Throttler.h>
#include <IO/HTTPCommon.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
@ -76,12 +77,16 @@ PocoHTTPClientConfiguration::PocoHTTPClientConfiguration(
const RemoteHostFilter & remote_host_filter_,
unsigned int s3_max_redirects_,
bool enable_s3_requests_logging_,
bool for_disk_s3_)
bool for_disk_s3_,
const ThrottlerPtr & get_request_throttler_,
const ThrottlerPtr & put_request_throttler_)
: force_region(force_region_)
, remote_host_filter(remote_host_filter_)
, s3_max_redirects(s3_max_redirects_)
, enable_s3_requests_logging(enable_s3_requests_logging_)
, for_disk_s3(for_disk_s3_)
, get_request_throttler(get_request_throttler_)
, put_request_throttler(put_request_throttler_)
{
}
@ -128,6 +133,8 @@ PocoHTTPClient::PocoHTTPClient(const PocoHTTPClientConfiguration & client_config
, s3_max_redirects(client_configuration.s3_max_redirects)
, enable_s3_requests_logging(client_configuration.enable_s3_requests_logging)
, for_disk_s3(client_configuration.for_disk_s3)
, get_request_throttler(client_configuration.get_request_throttler)
, put_request_throttler(client_configuration.put_request_throttler)
, extra_headers(client_configuration.extra_headers)
{
}
@ -245,6 +252,23 @@ void PocoHTTPClient::makeRequestInternal(
if (enable_s3_requests_logging)
LOG_TEST(log, "Make request to: {}", uri);
switch (request.GetMethod())
{
case Aws::Http::HttpMethod::HTTP_GET:
case Aws::Http::HttpMethod::HTTP_HEAD:
if (get_request_throttler)
get_request_throttler->add(1);
break;
case Aws::Http::HttpMethod::HTTP_PUT:
case Aws::Http::HttpMethod::HTTP_POST:
case Aws::Http::HttpMethod::HTTP_PATCH:
if (put_request_throttler)
put_request_throttler->add(1);
break;
case Aws::Http::HttpMethod::HTTP_DELETE:
break; // Not throttled
}
addMetric(request, S3MetricType::Count);
CurrentMetrics::Increment metric_increment{CurrentMetrics::S3Requests};


@ -8,6 +8,7 @@
#if USE_AWS_S3
#include <Common/RemoteHostFilter.h>
#include <Common/Throttler_fwd.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/HTTPCommon.h>
#include <IO/S3/SessionAwareIOStream.h>
@ -48,6 +49,8 @@ struct PocoHTTPClientConfiguration : public Aws::Client::ClientConfiguration
unsigned int s3_max_redirects;
bool enable_s3_requests_logging;
bool for_disk_s3;
ThrottlerPtr get_request_throttler;
ThrottlerPtr put_request_throttler;
HeaderCollection extra_headers;
void updateSchemeAndRegion();
@ -60,7 +63,9 @@ private:
const RemoteHostFilter & remote_host_filter_,
unsigned int s3_max_redirects_,
bool enable_s3_requests_logging_,
bool for_disk_s3_
bool for_disk_s3_,
const ThrottlerPtr & get_request_throttler_,
const ThrottlerPtr & put_request_throttler_
);
/// Constructor of Aws::Client::ClientConfiguration must be called after AWS SDK initialization.
@ -154,6 +159,16 @@ private:
unsigned int s3_max_redirects;
bool enable_s3_requests_logging;
bool for_disk_s3;
/// Limits the per-second rate of GET, SELECT and all other requests, excluding those throttled by the put throttler
/// (i.e. throttles GetObject, HeadObject)
ThrottlerPtr get_request_throttler;
/// Limits the per-second rate of PUT, COPY, POST and LIST requests
/// (i.e. throttles PutObject, CopyObject, ListObjects, CreateMultipartUpload, UploadPartCopy, UploadPart, CompleteMultipartUpload)
/// NOTE: DELETE and CANCEL requests are throttled by neither the put nor the get throttler
ThrottlerPtr put_request_throttler;
const HeaderCollection extra_headers;
};


@ -88,7 +88,9 @@ TEST(IOTestAwsS3Client, AppendExtraSSECHeaders)
remote_host_filter,
s3_max_redirects,
enable_s3_requests_logging,
/* for_disk_s3 = */ false
/* for_disk_s3 = */ false,
/* get_request_throttler = */ {},
/* put_request_throttler = */ {}
);
client_configuration.endpointOverride = uri.endpoint;
@ -113,12 +115,14 @@ TEST(IOTestAwsS3Client, AppendExtraSSECHeaders)
ASSERT_TRUE(client);
DB::ReadSettings read_settings;
DB::S3Settings::RequestSettings request_settings;
request_settings.max_single_read_retries = max_single_read_retries;
DB::ReadBufferFromS3 read_buffer(
client,
uri.bucket,
uri.key,
version_id,
max_single_read_retries,
request_settings,
read_settings
);


@ -573,7 +573,14 @@ public:
/// AWS API tries credentials providers one by one. Some of providers (like ProfileConfigFileAWSCredentialsProvider) can be
/// quite verbose even if nobody configured them. So we use our provider first and only after it use default providers.
{
DB::S3::PocoHTTPClientConfiguration aws_client_configuration = DB::S3::ClientFactory::instance().createClientConfiguration(configuration.region, configuration.remote_host_filter, configuration.s3_max_redirects, configuration.enable_s3_requests_logging, configuration.for_disk_s3);
DB::S3::PocoHTTPClientConfiguration aws_client_configuration = DB::S3::ClientFactory::instance().createClientConfiguration(
configuration.region,
configuration.remote_host_filter,
configuration.s3_max_redirects,
configuration.enable_s3_requests_logging,
configuration.for_disk_s3,
configuration.get_request_throttler,
configuration.put_request_throttler);
AddProvider(std::make_shared<AwsAuthSTSAssumeRoleWebIdentityCredentialsProvider>(aws_client_configuration));
}
@ -610,7 +617,14 @@ public:
}
else if (Aws::Utils::StringUtils::ToLower(ec2_metadata_disabled.c_str()) != "true")
{
DB::S3::PocoHTTPClientConfiguration aws_client_configuration = DB::S3::ClientFactory::instance().createClientConfiguration(configuration.region, configuration.remote_host_filter, configuration.s3_max_redirects, configuration.enable_s3_requests_logging, configuration.for_disk_s3);
DB::S3::PocoHTTPClientConfiguration aws_client_configuration = DB::S3::ClientFactory::instance().createClientConfiguration(
configuration.region,
configuration.remote_host_filter,
configuration.s3_max_redirects,
configuration.enable_s3_requests_logging,
configuration.for_disk_s3,
configuration.get_request_throttler,
configuration.put_request_throttler);
/// See MakeDefaultHttpResourceClientConfiguration().
/// This is part of EC2 metadata client, but unfortunately it can't be accessed from outside
@ -731,9 +745,18 @@ namespace S3
const RemoteHostFilter & remote_host_filter,
unsigned int s3_max_redirects,
bool enable_s3_requests_logging,
bool for_disk_s3)
bool for_disk_s3,
const ThrottlerPtr & get_request_throttler,
const ThrottlerPtr & put_request_throttler)
{
return PocoHTTPClientConfiguration(force_region, remote_host_filter, s3_max_redirects, enable_s3_requests_logging, for_disk_s3);
return PocoHTTPClientConfiguration(
force_region,
remote_host_filter,
s3_max_redirects,
enable_s3_requests_logging,
for_disk_s3,
get_request_throttler,
put_request_throttler);
}
URI::URI(const Poco::URI & uri_)


@ -17,6 +17,7 @@
#include <Poco/URI.h>
#include <Common/Exception.h>
#include <Common/Throttler_fwd.h>
namespace Aws::S3
{
@ -88,7 +89,9 @@ public:
const RemoteHostFilter & remote_host_filter,
unsigned int s3_max_redirects,
bool enable_s3_requests_logging,
bool for_disk_s3);
bool for_disk_s3,
const ThrottlerPtr & get_request_throttler,
const ThrottlerPtr & put_request_throttler);
private:
ClientFactory();


@ -71,7 +71,7 @@ WriteBufferFromS3::WriteBufferFromS3(
std::shared_ptr<const Aws::S3::S3Client> client_ptr_,
const String & bucket_,
const String & key_,
const S3Settings::ReadWriteSettings & s3_settings_,
const S3Settings::RequestSettings & request_settings_,
std::optional<std::map<String, String>> object_metadata_,
size_t buffer_size_,
ThreadPoolCallbackRunner<void> schedule_,
@ -79,10 +79,10 @@ WriteBufferFromS3::WriteBufferFromS3(
: BufferWithOwnMemory<WriteBuffer>(buffer_size_, nullptr, 0)
, bucket(bucket_)
, key(key_)
, s3_settings(s3_settings_)
, request_settings(request_settings_)
, client_ptr(std::move(client_ptr_))
, object_metadata(std::move(object_metadata_))
, upload_part_size(s3_settings_.min_upload_part_size)
, upload_part_size(request_settings_.min_upload_part_size)
, schedule(std::move(schedule_))
, write_settings(write_settings_)
{
@ -107,7 +107,7 @@ void WriteBufferFromS3::nextImpl()
write_settings.remote_throttler->add(offset());
/// Data size exceeds singlepart upload threshold, need to use multipart upload.
if (multipart_upload_id.empty() && last_part_size > s3_settings.max_single_part_upload_size)
if (multipart_upload_id.empty() && last_part_size > request_settings.max_single_part_upload_size)
createMultipartUpload();
if (!multipart_upload_id.empty() && last_part_size > upload_part_size)
@ -122,10 +122,10 @@ void WriteBufferFromS3::nextImpl()
void WriteBufferFromS3::allocateBuffer()
{
if (total_parts_uploaded != 0 && total_parts_uploaded % s3_settings.upload_part_size_multiply_parts_count_threshold == 0)
if (total_parts_uploaded != 0 && total_parts_uploaded % request_settings.upload_part_size_multiply_parts_count_threshold == 0)
{
upload_part_size *= s3_settings.upload_part_size_multiply_factor;
upload_part_size = std::min(upload_part_size, s3_settings.max_upload_part_size);
upload_part_size *= request_settings.upload_part_size_multiply_factor;
upload_part_size = std::min(upload_part_size, request_settings.max_upload_part_size);
}
temporary_buffer = Aws::MakeShared<Aws::StringStream>("temporary buffer");
@ -180,7 +180,7 @@ void WriteBufferFromS3::finalizeImpl()
if (!multipart_upload_id.empty())
completeMultipartUpload();
if (s3_settings.check_objects_after_upload)
if (request_settings.check_objects_after_upload)
{
LOG_TRACE(log, "Checking object {} exists after upload", key);
@ -370,7 +370,7 @@ void WriteBufferFromS3::completeMultipartUpload()
req.SetMultipartUpload(multipart_upload);
size_t max_retry = std::max(s3_settings.max_unexpected_write_error_retries, 1UL);
size_t max_retry = std::max(request_settings.max_unexpected_write_error_retries, 1UL);
for (size_t i = 0; i < max_retry; ++i)
{
ProfileEvents::increment(ProfileEvents::S3CompleteMultipartUpload);
@ -476,7 +476,7 @@ void WriteBufferFromS3::fillPutRequest(Aws::S3::Model::PutObjectRequest & req)
void WriteBufferFromS3::processPutRequest(const PutObjectTask & task)
{
size_t max_retry = std::max(s3_settings.max_unexpected_write_error_retries, 1UL);
size_t max_retry = std::max(request_settings.max_unexpected_write_error_retries, 1UL);
for (size_t i = 0; i < max_retry; ++i)
{
ProfileEvents::increment(ProfileEvents::S3PutObject);


@ -50,7 +50,7 @@ public:
std::shared_ptr<const Aws::S3::S3Client> client_ptr_,
const String & bucket_,
const String & key_,
const S3Settings::ReadWriteSettings & s3_settings_,
const S3Settings::RequestSettings & request_settings_,
std::optional<std::map<String, String>> object_metadata_ = std::nullopt,
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
ThreadPoolCallbackRunner<void> schedule_ = {},
@ -88,7 +88,7 @@ private:
const String bucket;
const String key;
const S3Settings::ReadWriteSettings s3_settings;
const S3Settings::RequestSettings request_settings;
const std::shared_ptr<const Aws::S3::S3Client> client_ptr;
const std::optional<std::map<String, String>> object_metadata;


@ -625,8 +625,6 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
OpenTelemetry::TracingContextHolderPtr thread_trace_context;
Stopwatch watch;
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(storage.getContext()->getSettingsRef());
try
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
@ -644,6 +642,7 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
thread_trace_context->root_span.addAttribute("clickhouse.rows", distributed_header.rows);
thread_trace_context->root_span.addAttribute("clickhouse.bytes", distributed_header.bytes);
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(distributed_header.insert_settings);
auto connection = pool->get(timeouts, &distributed_header.insert_settings);
LOG_DEBUG(log, "Sending `{}` to {} ({} rows, {} bytes)",
file_path,
@ -780,14 +779,6 @@ struct StorageDistributedDirectoryMonitor::Batch
fs::rename(tmp_file, parent.current_batch_file_path);
}
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(parent.storage.getContext()->getSettingsRef());
auto connection = parent.pool->get(timeouts);
LOG_DEBUG(parent.log, "Sending a batch of {} files to {} ({} rows, {} bytes).",
file_indices.size(),
connection->getDescription(),
formatReadableQuantity(total_rows),
formatReadableSizeWithBinarySuffix(total_bytes));
bool batch_broken = false;
bool batch_marked_as_broken = false;
@ -795,14 +786,14 @@ struct StorageDistributedDirectoryMonitor::Batch
{
try
{
sendBatch(*connection, timeouts);
sendBatch();
}
catch (const Exception & e)
{
if (split_batch_on_failure && isSplittableErrorCode(e.code(), e.isRemoteException()))
if (split_batch_on_failure && file_indices.size() > 1 && isSplittableErrorCode(e.code(), e.isRemoteException()))
{
tryLogCurrentException(parent.log, "Trying to split batch due to");
sendSeparateFiles(*connection, timeouts);
sendSeparateFiles();
}
else
throw;
@ -882,9 +873,12 @@ struct StorageDistributedDirectoryMonitor::Batch
}
private:
void sendBatch(Connection & connection, const ConnectionTimeouts & timeouts)
void sendBatch()
{
std::unique_ptr<RemoteInserter> remote;
bool compression_expected = false;
IConnectionPool::Entry connection;
for (UInt64 file_idx : file_indices)
{
@ -902,12 +896,21 @@ private:
if (!remote)
{
remote = std::make_unique<RemoteInserter>(connection, timeouts,
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(distributed_header.insert_settings);
connection = parent.pool->get(timeouts);
compression_expected = connection->getCompression() == Protocol::Compression::Enable;
LOG_DEBUG(parent.log, "Sending a batch of {} files to {} ({} rows, {} bytes).",
file_indices.size(),
connection->getDescription(),
formatReadableQuantity(total_rows),
formatReadableSizeWithBinarySuffix(total_bytes));
remote = std::make_unique<RemoteInserter>(*connection, timeouts,
distributed_header.insert_query,
distributed_header.insert_settings,
distributed_header.client_info);
}
bool compression_expected = connection.getCompression() == Protocol::Compression::Enable;
writeRemoteConvert(distributed_header, *remote, compression_expected, in, parent.log);
}
@ -915,7 +918,7 @@ private:
remote->onFinish();
}
void sendSeparateFiles(Connection & connection, const ConnectionTimeouts & timeouts)
void sendSeparateFiles()
{
size_t broken_files = 0;
@ -939,11 +942,15 @@ private:
distributed_header.client_info.client_trace_context,
parent.storage.getContext()->getOpenTelemetrySpanLog());
RemoteInserter remote(connection, timeouts,
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(distributed_header.insert_settings);
auto connection = parent.pool->get(timeouts);
bool compression_expected = connection->getCompression() == Protocol::Compression::Enable;
RemoteInserter remote(*connection, timeouts,
distributed_header.insert_query,
distributed_header.insert_settings,
distributed_header.client_info);
bool compression_expected = connection.getCompression() == Protocol::Compression::Enable;
writeRemoteConvert(distributed_header, remote, compression_expected, in, parent.log);
remote.onFinish();
}


@ -118,7 +118,7 @@ struct URLBasedDataSourceConfiguration
struct StorageS3Configuration : URLBasedDataSourceConfiguration
{
S3::AuthSettings auth_settings;
S3Settings::ReadWriteSettings rw_settings;
S3Settings::RequestSettings request_settings;
};


@ -282,8 +282,8 @@ MergeTreeData::MergeTreeData(
checkTTLExpressions(metadata_, metadata_);
/// format_file always contained on any data path
PathWithDisk version_file;
const auto format_version_path = fs::path(relative_data_path) / MergeTreeData::FORMAT_VERSION_FILE_NAME;
std::optional<UInt32> read_format_version;
/// Creating directories, if not exist.
for (const auto & disk : getDisks())
{
@ -292,42 +292,44 @@ MergeTreeData::MergeTreeData(
disk->createDirectories(relative_data_path);
disk->createDirectories(fs::path(relative_data_path) / MergeTreeData::DETACHED_DIR_NAME);
String current_version_file_path = fs::path(relative_data_path) / MergeTreeData::FORMAT_VERSION_FILE_NAME;
if (disk->exists(current_version_file_path))
if (disk->exists(format_version_path))
{
if (!version_file.first.empty())
throw Exception(ErrorCodes::CORRUPTED_DATA, "Duplication of version file {} and {}", fullPath(version_file.second, version_file.first), current_version_file_path);
version_file = {current_version_file_path, disk};
auto buf = disk->readFile(format_version_path);
UInt32 current_format_version{0};
readIntText(current_format_version, *buf);
if (!buf->eof())
throw Exception(ErrorCodes::CORRUPTED_DATA, "Bad version file: {}", fullPath(disk, format_version_path));
if (!read_format_version.has_value())
read_format_version = current_format_version;
else if (*read_format_version != current_format_version)
throw Exception(ErrorCodes::CORRUPTED_DATA, "Version file on {} contains version {} expected version is {}.", fullPath(disk, format_version_path), current_format_version, *read_format_version);
}
}
/// If not choose any
if (version_file.first.empty())
version_file = {fs::path(relative_data_path) / MergeTreeData::FORMAT_VERSION_FILE_NAME, getStoragePolicy()->getAnyDisk()};
bool version_file_exists = version_file.second->exists(version_file.first);
// When the data path or the version file does not exist, ignore the format_version check
if (!attach || !version_file_exists)
if (!attach || !read_format_version)
{
format_version = min_format_version;
if (!version_file.second->isReadOnly())
// try to write to first non-readonly disk
for (const auto & disk : getStoragePolicy()->getDisks())
{
auto buf = version_file.second->writeFile(version_file.first, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, context_->getWriteSettings());
writeIntText(format_version.toUnderType(), *buf);
if (getContext()->getSettingsRef().fsync_metadata)
buf->sync();
if (!disk->isReadOnly())
{
auto buf = disk->writeFile(format_version_path, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, context_->getWriteSettings());
writeIntText(format_version.toUnderType(), *buf);
if (getContext()->getSettingsRef().fsync_metadata)
buf->sync();
break;
}
}
}
else
{
auto buf = version_file.second->readFile(version_file.first);
UInt32 read_format_version;
readIntText(read_format_version, *buf);
format_version = read_format_version;
if (!buf->eof())
throw Exception("Bad version file: " + fullPath(version_file.second, version_file.first), ErrorCodes::CORRUPTED_DATA);
format_version = *read_format_version;
}
if (format_version < min_format_version)


@ -314,18 +314,32 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
if (metadata_snapshot->hasAnyTTL() && merge_with_ttl_allowed && !ttl_merges_blocker.isCancelled())
{
/// TTL delete is preferred to recompression
TTLDeleteMergeSelector delete_ttl_selector(
TTLDeleteMergeSelector drop_ttl_selector(
next_delete_ttl_merge_times_by_partition,
current_time,
data_settings->merge_with_ttl_timeout,
data_settings->ttl_only_drop_parts);
true);
parts_to_merge = delete_ttl_selector.select(parts_ranges, max_total_size_to_merge);
/// Completely expired parts selected for a TTL drop are constrained neither by merge pressure nor by available storage space
parts_to_merge = drop_ttl_selector.select(parts_ranges, data_settings->max_bytes_to_merge_at_max_space_in_pool);
if (!parts_to_merge.empty())
{
future_part->merge_type = MergeType::TTLDelete;
}
else if (metadata_snapshot->hasAnyRecompressionTTL())
else if (!data_settings->ttl_only_drop_parts)
{
TTLDeleteMergeSelector delete_ttl_selector(
next_delete_ttl_merge_times_by_partition,
current_time,
data_settings->merge_with_ttl_timeout,
false);
parts_to_merge = delete_ttl_selector.select(parts_ranges, max_total_size_to_merge);
if (!parts_to_merge.empty())
future_part->merge_type = MergeType::TTLDelete;
}
if (parts_to_merge.empty() && metadata_snapshot->hasAnyRecompressionTTL())
{
TTLRecompressMergeSelector recompress_ttl_selector(
next_recompress_ttl_merge_times_by_partition,
@ -621,8 +635,16 @@ MergeTreeData::DataPartPtr MergeTreeDataMergerMutator::renameMergedTemporaryPart
size_t MergeTreeDataMergerMutator::estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts)
{
size_t res = 0;
time_t current_time = std::time(nullptr);
for (const MergeTreeData::DataPartPtr & part : source_parts)
{
/// Exclude expired parts
time_t part_max_ttl = part->ttl_infos.part_max_ttl;
if (part_max_ttl && part_max_ttl <= current_time)
continue;
res += part->getBytesOnDisk();
}
return static_cast<size_t>(res * DISK_USAGE_COEFFICIENT_TO_RESERVE);
}


@ -151,12 +151,14 @@ std::vector<String> JsonMetadataGetter::getJsonLogFiles()
std::shared_ptr<ReadBuffer> JsonMetadataGetter::createS3ReadBuffer(const String & key, ContextPtr context)
{
/// TODO: add parallel downloads
S3Settings::RequestSettings request_settings;
request_settings.max_single_read_retries = 10;
return std::make_shared<ReadBufferFromS3>(
base_configuration.client,
base_configuration.uri.bucket,
key,
base_configuration.uri.version_id,
/* max single read retries */10,
request_settings,
context->getReadSettings());
}
@ -187,7 +189,7 @@ StorageDelta::StorageDelta(
ContextPtr context_,
std::optional<FormatSettings> format_settings_)
: IStorage(table_id_)
, base_configuration{configuration_.url, configuration_.auth_settings, configuration_.rw_settings, configuration_.headers}
, base_configuration{configuration_.url, configuration_.auth_settings, configuration_.request_settings, configuration_.headers}
, log(&Poco::Logger::get("StorageDeltaLake (" + table_id_.table_name + ")"))
, table_path(base_configuration.uri.key)
{


@ -37,7 +37,7 @@ StorageHudi::StorageHudi(
ContextPtr context_,
std::optional<FormatSettings> format_settings_)
: IStorage(table_id_)
, base_configuration{configuration_.url, configuration_.auth_settings, configuration_.rw_settings, configuration_.headers}
, base_configuration{configuration_.url, configuration_.auth_settings, configuration_.request_settings, configuration_.headers}
, log(&Poco::Logger::get("StorageHudi (" + table_id_.table_name + ")"))
, table_path(base_configuration.uri.key)
{


@ -4507,6 +4507,9 @@ void StorageReplicatedMergeTree::assertNotReadonly() const
SinkToStoragePtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
{
if (!initialization_done)
throw Exception(ErrorCodes::NOT_INITIALIZED, "Table is not initialized yet");
/// If table is read-only because it doesn't have metadata in zk yet, then it's not possible to insert into it
/// Without this check, we'll write data parts on disk, and afterwards will remove them since we'll fail to commit them into zk
/// In case of remote storage like s3, it'll generate unnecessary PUT requests


@ -100,7 +100,8 @@ public:
const Block & virtual_header_,
ContextPtr context_,
std::unordered_map<String, S3::ObjectInfo> * object_infos_,
Strings * read_keys_)
Strings * read_keys_,
const S3Settings::RequestSettings & request_settings_)
: WithContext(context_)
, client(client_)
, globbed_uri(globbed_uri_)
@ -108,6 +109,7 @@ public:
, virtual_header(virtual_header_)
, object_infos(object_infos_)
, read_keys(read_keys_)
, request_settings(request_settings_)
{
if (globbed_uri.bucket.find_first_of("*?{") != globbed_uri.bucket.npos)
throw Exception("Expression can not have wildcards inside bucket name", ErrorCodes::UNEXPECTED_EXPRESSION);
@ -258,6 +260,7 @@ private:
bool is_finished{false};
std::unordered_map<String, S3::ObjectInfo> * object_infos;
Strings * read_keys;
S3Settings::RequestSettings request_settings;
};
StorageS3Source::DisclosedGlobIterator::DisclosedGlobIterator(
@ -267,8 +270,9 @@ StorageS3Source::DisclosedGlobIterator::DisclosedGlobIterator(
const Block & virtual_header,
ContextPtr context,
std::unordered_map<String, S3::ObjectInfo> * object_infos_,
Strings * read_keys_)
: pimpl(std::make_shared<StorageS3Source::DisclosedGlobIterator::Impl>(client_, globbed_uri_, query, virtual_header, context, object_infos_, read_keys_))
Strings * read_keys_,
const S3Settings::RequestSettings & request_settings_)
: pimpl(std::make_shared<StorageS3Source::DisclosedGlobIterator::Impl>(client_, globbed_uri_, query, virtual_header, context, object_infos_, read_keys_, request_settings_))
{
}
@ -381,7 +385,7 @@ StorageS3Source::StorageS3Source(
std::optional<FormatSettings> format_settings_,
const ColumnsDescription & columns_,
UInt64 max_block_size_,
UInt64 max_single_read_retries_,
const S3Settings::RequestSettings & request_settings_,
String compression_hint_,
const std::shared_ptr<const Aws::S3::S3Client> & client_,
const String & bucket_,
@ -397,7 +401,7 @@ StorageS3Source::StorageS3Source(
, format(format_)
, columns_desc(columns_)
, max_block_size(max_block_size_)
, max_single_read_retries(max_single_read_retries_)
, request_settings(request_settings_)
, compression_hint(std::move(compression_hint_))
, client(client_)
, sample_block(sample_block_)
@ -463,7 +467,7 @@ std::unique_ptr<ReadBuffer> StorageS3Source::createS3ReadBuffer(const String & k
if (!use_parallel_download || object_too_small)
{
LOG_TRACE(log, "Downloading object of size {} from S3 in single thread", object_size);
return std::make_unique<ReadBufferFromS3>(client, bucket, key, version_id, max_single_read_retries, getContext()->getReadSettings());
return std::make_unique<ReadBufferFromS3>(client, bucket, key, version_id, request_settings, getContext()->getReadSettings());
}
assert(object_size > 0);
@ -475,7 +479,7 @@ std::unique_ptr<ReadBuffer> StorageS3Source::createS3ReadBuffer(const String & k
}
auto factory = std::make_unique<ReadBufferS3Factory>(
client, bucket, key, version_id, download_buffer_size, object_size, max_single_read_retries, getContext()->getReadSettings());
client, bucket, key, version_id, download_buffer_size, object_size, request_settings, getContext()->getReadSettings());
LOG_TRACE(
log, "Downloading from S3 in {} threads. Object size: {}, Range size: {}.", download_thread_num, object_size, download_buffer_size);
@ -585,7 +589,7 @@ public:
s3_configuration_.client,
bucket,
key,
s3_configuration_.rw_settings,
s3_configuration_.request_settings,
std::nullopt,
DBMS_DEFAULT_BUFFER_SIZE,
threadPoolCallbackRunner<void>(IOThreadPool::get(), "S3ParallelRead"),
@ -749,7 +753,7 @@ StorageS3::StorageS3(
bool distributed_processing_,
ASTPtr partition_by_)
: IStorage(table_id_)
, s3_configuration{configuration_.url, configuration_.auth_settings, configuration_.rw_settings, configuration_.headers}
, s3_configuration{configuration_.url, configuration_.auth_settings, configuration_.request_settings, configuration_.headers}
, keys({s3_configuration.uri.key})
, format_name(configuration_.format)
, compression_method(configuration_.compression_method)
@ -815,7 +819,7 @@ std::shared_ptr<StorageS3Source::IteratorWrapper> StorageS3::createFileIterator(
{
/// Iterate through disclosed globs and make a source for each file
auto glob_iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(
*s3_configuration.client, s3_configuration.uri, query, virtual_block, local_context, object_infos, read_keys);
*s3_configuration.client, s3_configuration.uri, query, virtual_block, local_context, object_infos, read_keys, s3_configuration.request_settings);
return std::make_shared<StorageS3Source::IteratorWrapper>([glob_iterator]() { return glob_iterator->next(); });
}
else
@ -905,7 +909,7 @@ Pipe StorageS3::read(
format_settings,
columns_description,
max_block_size,
s3_configuration.rw_settings.max_single_read_retries,
s3_configuration.request_settings,
compression_method,
s3_configuration.client,
s3_configuration.uri.bucket,
@ -1022,12 +1026,10 @@ void StorageS3::truncate(const ASTPtr & /* query */, const StorageMetadataPtr &,
void StorageS3::updateS3Configuration(ContextPtr ctx, StorageS3::S3Configuration & upd)
{
auto settings = ctx->getStorageS3Settings().getSettings(upd.uri.uri.toString());
const auto & config_rw_settings = settings.rw_settings;
if (upd.request_settings != settings.request_settings)
upd.request_settings = settings.request_settings;
if (upd.rw_settings != config_rw_settings)
upd.rw_settings = settings.rw_settings;
upd.rw_settings.updateFromSettingsIfEmpty(ctx->getSettings());
upd.request_settings.updateFromSettingsIfEmpty(ctx->getSettings());
if (upd.client)
{
@ -1045,10 +1047,12 @@ void StorageS3::updateS3Configuration(ContextPtr ctx, StorageS3::S3Configuration
ctx->getRemoteHostFilter(),
static_cast<unsigned>(ctx->getGlobalContext()->getSettingsRef().s3_max_redirects),
ctx->getGlobalContext()->getSettingsRef().enable_s3_requests_logging,
/* for_disk_s3 = */ false);
/* for_disk_s3 = */ false,
upd.request_settings.get_request_throttler,
upd.request_settings.put_request_throttler);
client_configuration.endpointOverride = upd.uri.endpoint;
client_configuration.maxConnections = static_cast<unsigned>(upd.rw_settings.max_connections);
client_configuration.maxConnections = static_cast<unsigned>(upd.request_settings.max_connections);
auto credentials = Aws::Auth::AWSCredentials(upd.auth_settings.access_key_id, upd.auth_settings.secret_access_key);
auto headers = upd.auth_settings.headers;
@ -1080,17 +1084,17 @@ void StorageS3::processNamedCollectionResult(StorageS3Configuration & configurat
else if (arg_name == "use_environment_credentials")
configuration.auth_settings.use_environment_credentials = checkAndGetLiteralArgument<UInt8>(arg_value, "use_environment_credentials");
else if (arg_name == "max_single_read_retries")
configuration.rw_settings.max_single_read_retries = checkAndGetLiteralArgument<UInt64>(arg_value, "max_single_read_retries");
configuration.request_settings.max_single_read_retries = checkAndGetLiteralArgument<UInt64>(arg_value, "max_single_read_retries");
else if (arg_name == "min_upload_part_size")
configuration.rw_settings.max_single_read_retries = checkAndGetLiteralArgument<UInt64>(arg_value, "min_upload_part_size");
configuration.request_settings.min_upload_part_size = checkAndGetLiteralArgument<UInt64>(arg_value, "min_upload_part_size");
else if (arg_name == "upload_part_size_multiply_factor")
configuration.rw_settings.max_single_read_retries = checkAndGetLiteralArgument<UInt64>(arg_value, "upload_part_size_multiply_factor");
configuration.request_settings.upload_part_size_multiply_factor = checkAndGetLiteralArgument<UInt64>(arg_value, "upload_part_size_multiply_factor");
else if (arg_name == "upload_part_size_multiply_parts_count_threshold")
configuration.rw_settings.max_single_read_retries = checkAndGetLiteralArgument<UInt64>(arg_value, "upload_part_size_multiply_parts_count_threshold");
configuration.request_settings.upload_part_size_multiply_parts_count_threshold = checkAndGetLiteralArgument<UInt64>(arg_value, "upload_part_size_multiply_parts_count_threshold");
else if (arg_name == "max_single_part_upload_size")
configuration.rw_settings.max_single_read_retries = checkAndGetLiteralArgument<UInt64>(arg_value, "max_single_part_upload_size");
configuration.request_settings.max_single_part_upload_size = checkAndGetLiteralArgument<UInt64>(arg_value, "max_single_part_upload_size");
else if (arg_name == "max_connections")
configuration.rw_settings.max_single_read_retries = checkAndGetLiteralArgument<UInt64>(arg_value, "max_connections");
configuration.request_settings.max_connections = checkAndGetLiteralArgument<UInt64>(arg_value, "max_connections");
else
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Unknown key-value argument `{}` for StorageS3, expected: url, [access_key_id, secret_access_key], name of used format and [compression_method].",
@ -1166,7 +1170,7 @@ ColumnsDescription StorageS3::getTableStructureFromData(
S3Configuration s3_configuration{
configuration.url,
configuration.auth_settings,
S3Settings::ReadWriteSettings(ctx->getSettingsRef()),
S3Settings::RequestSettings(ctx->getSettingsRef()),
configuration.headers};
updateS3Configuration(ctx, s3_configuration);
@ -1228,7 +1232,7 @@ ColumnsDescription StorageS3::getTableStructureFromDataImpl(
int zstd_window_log_max = static_cast<int>(ctx->getSettingsRef().zstd_window_log_max);
return wrapReadBufferWithCompressionMethod(
std::make_unique<ReadBufferFromS3>(
s3_configuration.client, s3_configuration.uri.bucket, key, s3_configuration.uri.version_id, s3_configuration.rw_settings.max_single_read_retries, ctx->getReadSettings()),
s3_configuration.client, s3_configuration.uri.bucket, key, s3_configuration.uri.version_id, s3_configuration.request_settings, ctx->getReadSettings()),
chooseCompressionMethod(key, compression_method),
zstd_window_log_max);
};

View File

@ -43,7 +43,8 @@ public:
const Block & virtual_header,
ContextPtr context,
std::unordered_map<String, S3::ObjectInfo> * object_infos = nullptr,
Strings * read_keys_ = nullptr);
Strings * read_keys_ = nullptr,
const S3Settings::RequestSettings & request_settings_ = {});
String next();
@ -79,7 +80,7 @@ public:
std::optional<FormatSettings> format_settings_,
const ColumnsDescription & columns_,
UInt64 max_block_size_,
UInt64 max_single_read_retries_,
const S3Settings::RequestSettings & request_settings_,
String compression_hint_,
const std::shared_ptr<const Aws::S3::S3Client> & client_,
const String & bucket,
@ -102,7 +103,7 @@ private:
String format;
ColumnsDescription columns_desc;
UInt64 max_block_size;
UInt64 max_single_read_retries;
S3Settings::RequestSettings request_settings;
String compression_hint;
std::shared_ptr<const Aws::S3::S3Client> client;
Block sample_block;
@ -186,7 +187,7 @@ public:
std::shared_ptr<const Aws::S3::S3Client> client;
S3::AuthSettings auth_settings;
S3Settings::ReadWriteSettings rw_settings;
S3Settings::RequestSettings request_settings;
/// If s3 configuration was passed from ast, then it is static.
/// If from config - it can be changed with config reload.
@ -198,11 +199,11 @@ public:
S3Configuration(
const String & url_,
const S3::AuthSettings & auth_settings_,
const S3Settings::ReadWriteSettings & rw_settings_,
const S3Settings::RequestSettings & request_settings_,
const HeaderCollection & headers_from_ast_)
: uri(S3::URI(url_))
, auth_settings(auth_settings_)
, rw_settings(rw_settings_)
, request_settings(request_settings_)
, static_configuration(!auth_settings_.access_key_id.empty())
, headers_from_ast(headers_from_ast_) {}
};

View File

@ -46,7 +46,7 @@ StorageS3Cluster::StorageS3Cluster(
const ConstraintsDescription & constraints_,
ContextPtr context_)
: IStorage(table_id_)
, s3_configuration{configuration_.url, configuration_.auth_settings, configuration_.rw_settings, configuration_.headers}
, s3_configuration{configuration_.url, configuration_.auth_settings, configuration_.request_settings, configuration_.headers}
, filename(configuration_.url)
, cluster_name(configuration_.cluster_name)
, format_name(configuration_.format)

View File

@ -4,6 +4,7 @@
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/Exception.h>
#include <Common/Throttler.h>
#include <Interpreters/Context.h>
#include <base/unit.h>
#include <boost/algorithm/string/predicate.hpp>
@ -57,18 +58,26 @@ void StorageS3Settings::loadFromConfig(const String & config_elem, const Poco::U
auto auth_settings = S3::AuthSettings::loadFromConfig(config_elem + "." + key, config);
S3Settings::ReadWriteSettings rw_settings;
rw_settings.max_single_read_retries = get_uint_for_key(key, "max_single_read_retries", true, settings.s3_max_single_read_retries);
rw_settings.min_upload_part_size = get_uint_for_key(key, "min_upload_part_size", true, settings.s3_min_upload_part_size);
rw_settings.max_upload_part_size = get_uint_for_key(key, "max_upload_part_size", true, DEFAULT_MAX_UPLOAD_PART_SIZE);
rw_settings.upload_part_size_multiply_factor = get_uint_for_key(key, "upload_part_size_multiply_factor", true, settings.s3_upload_part_size_multiply_factor);
rw_settings.upload_part_size_multiply_parts_count_threshold = get_uint_for_key(key, "upload_part_size_multiply_parts_count_threshold", true, settings.s3_upload_part_size_multiply_parts_count_threshold);
rw_settings.max_single_part_upload_size = get_uint_for_key(key, "max_single_part_upload_size", true, settings.s3_max_single_part_upload_size);
rw_settings.max_single_operation_copy_size = get_uint_for_key(key, "max_single_operation_copy_size", true, DEFAULT_MAX_SINGLE_OPERATION_COPY_SIZE);
rw_settings.max_connections = get_uint_for_key(key, "max_connections", true, settings.s3_max_connections);
rw_settings.check_objects_after_upload = get_bool_for_key(key, "check_objects_after_upload", true, false);
S3Settings::RequestSettings request_settings;
request_settings.max_single_read_retries = get_uint_for_key(key, "max_single_read_retries", true, settings.s3_max_single_read_retries);
request_settings.min_upload_part_size = get_uint_for_key(key, "min_upload_part_size", true, settings.s3_min_upload_part_size);
request_settings.max_upload_part_size = get_uint_for_key(key, "max_upload_part_size", true, DEFAULT_MAX_UPLOAD_PART_SIZE);
request_settings.upload_part_size_multiply_factor = get_uint_for_key(key, "upload_part_size_multiply_factor", true, settings.s3_upload_part_size_multiply_factor);
request_settings.upload_part_size_multiply_parts_count_threshold = get_uint_for_key(key, "upload_part_size_multiply_parts_count_threshold", true, settings.s3_upload_part_size_multiply_parts_count_threshold);
request_settings.max_single_part_upload_size = get_uint_for_key(key, "max_single_part_upload_size", true, settings.s3_max_single_part_upload_size);
request_settings.max_single_operation_copy_size = get_uint_for_key(key, "max_single_operation_copy_size", true, DEFAULT_MAX_SINGLE_OPERATION_COPY_SIZE);
request_settings.max_connections = get_uint_for_key(key, "max_connections", true, settings.s3_max_connections);
request_settings.check_objects_after_upload = get_bool_for_key(key, "check_objects_after_upload", true, false);
s3_settings.emplace(endpoint, S3Settings{std::move(auth_settings), std::move(rw_settings)});
// NOTE: it would be better to reuse old throttlers to avoid losing token bucket state on every config reload, which could lead to exceeding limit for short time. But it is good enough unless very high `burst` values are used.
if (UInt64 max_get_rps = get_uint_for_key(key, "max_get_rps", true, settings.s3_max_get_rps))
request_settings.get_request_throttler = std::make_shared<Throttler>(
max_get_rps, get_uint_for_key(key, "max_get_burst", true, settings.s3_max_get_burst ? settings.s3_max_get_burst : Throttler::default_burst_seconds * max_get_rps));
if (UInt64 max_put_rps = get_uint_for_key(key, "max_put_rps", true, settings.s3_max_put_rps))
request_settings.put_request_throttler = std::make_shared<Throttler>(
max_put_rps, get_uint_for_key(key, "max_put_burst", true, settings.s3_max_put_burst ? settings.s3_max_put_burst : Throttler::default_burst_seconds * max_put_rps));
s3_settings.emplace(endpoint, S3Settings{std::move(auth_settings), std::move(request_settings)});
}
}
}
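
For reference, the per-endpoint throttling keys read above could be configured like this (a hypothetical <s3> server config section; the endpoint name and the numbers are placeholders):

    <clickhouse>
        <s3>
            <my_endpoint>
                <endpoint>https://my-bucket.s3.amazonaws.com/</endpoint>
                <max_get_rps>100</max_get_rps>
                <max_get_burst>200</max_get_burst>
                <max_put_rps>10</max_put_rps>
                <max_put_burst>20</max_put_burst>
            </my_endpoint>
        </s3>
    </clickhouse>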
@ -89,7 +98,7 @@ S3Settings StorageS3Settings::getSettings(const String & endpoint) const
return {};
}
S3Settings::ReadWriteSettings::ReadWriteSettings(const Settings & settings)
S3Settings::RequestSettings::RequestSettings(const Settings & settings)
{
max_single_read_retries = settings.s3_max_single_read_retries;
min_upload_part_size = settings.s3_min_upload_part_size;
@ -99,9 +108,15 @@ S3Settings::ReadWriteSettings::ReadWriteSettings(const Settings & settings)
max_connections = settings.s3_max_connections;
check_objects_after_upload = settings.s3_check_objects_after_upload;
max_unexpected_write_error_retries = settings.s3_max_unexpected_write_error_retries;
if (settings.s3_max_get_rps)
get_request_throttler = std::make_shared<Throttler>(
settings.s3_max_get_rps, settings.s3_max_get_burst ? settings.s3_max_get_burst : Throttler::default_burst_seconds * settings.s3_max_get_rps);
if (settings.s3_max_put_rps)
put_request_throttler = std::make_shared<Throttler>(
settings.s3_max_put_rps, settings.s3_max_put_burst ? settings.s3_max_put_burst : Throttler::default_burst_seconds * settings.s3_max_put_rps);
}
void S3Settings::ReadWriteSettings::updateFromSettingsIfEmpty(const Settings & settings)
void S3Settings::RequestSettings::updateFromSettingsIfEmpty(const Settings & settings)
{
if (!max_single_read_retries)
max_single_read_retries = settings.s3_max_single_read_retries;
@ -122,6 +137,12 @@ void S3Settings::ReadWriteSettings::updateFromSettingsIfEmpty(const Settings & s
if (!max_unexpected_write_error_retries)
max_unexpected_write_error_retries = settings.s3_max_unexpected_write_error_retries;
check_objects_after_upload = settings.s3_check_objects_after_upload;
if (!get_request_throttler && settings.s3_max_get_rps)
get_request_throttler = std::make_shared<Throttler>(
settings.s3_max_get_rps, settings.s3_max_get_burst ? settings.s3_max_get_burst : Throttler::default_burst_seconds * settings.s3_max_get_rps);
if (!put_request_throttler && settings.s3_max_put_rps)
put_request_throttler = std::make_shared<Throttler>(
settings.s3_max_put_rps, settings.s3_max_put_burst ? settings.s3_max_put_burst : Throttler::default_burst_seconds * settings.s3_max_put_rps);
}
}
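
To illustrate the burst semantics used throughout this file, and the NOTE above about losing token bucket state on config reload, a minimal self-contained sketch (assumed semantics, not ClickHouse's actual Throttler implementation):

    #include <algorithm>
    #include <chrono>

    class TokenBucket
    {
    public:
        TokenBucket(double rps_, double burst_) : rps(rps_), burst(burst_), tokens(burst_) {}

        /// Delay the caller must honour before issuing `amount` requests.
        std::chrono::duration<double> acquire(double amount, std::chrono::steady_clock::time_point now)
        {
            refill(now);
            tokens -= amount;
            if (tokens >= 0)
                return std::chrono::duration<double>{0};         // within burst: no waiting
            return std::chrono::duration<double>{-tokens / rps}; // pay the deficit back at `rps`
        }

    private:
        void refill(std::chrono::steady_clock::time_point now)
        {
            double elapsed = std::chrono::duration<double>(now - last).count();
            tokens = std::min(burst, tokens + elapsed * rps); // the bucket never holds more than `burst`
            last = now;
        }

        double rps;    // steady-state requests per second
        double burst;  // capacity; defaulted above to default_burst_seconds * rps when not configured
        double tokens; // a fresh bucket starts full, which is why rebuilding the throttlers on
                       // every config reload can briefly exceed the limit (see the NOTE above)
        std::chrono::steady_clock::time_point last = std::chrono::steady_clock::now();
    };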

View File

@ -7,6 +7,7 @@
#include <vector>
#include <base/types.h>
#include <Interpreters/Context_fwd.h>
#include <Common/Throttler_fwd.h>
#include <Storages/HeaderCollection.h>
#include <IO/S3Common.h>
@ -23,7 +24,7 @@ struct Settings;
struct S3Settings
{
struct ReadWriteSettings
struct RequestSettings
{
size_t max_single_read_retries = 0;
size_t min_upload_part_size = 0;
@ -35,11 +36,13 @@ struct S3Settings
size_t max_connections = 0;
bool check_objects_after_upload = false;
size_t max_unexpected_write_error_retries = 0;
ThrottlerPtr get_request_throttler;
ThrottlerPtr put_request_throttler;
ReadWriteSettings() = default;
explicit ReadWriteSettings(const Settings & settings);
RequestSettings() = default;
explicit RequestSettings(const Settings & settings);
inline bool operator==(const ReadWriteSettings & other) const
inline bool operator==(const RequestSettings & other) const
{
return max_single_read_retries == other.max_single_read_retries
&& min_upload_part_size == other.min_upload_part_size
@ -50,18 +53,20 @@ struct S3Settings
&& max_single_operation_copy_size == other.max_single_operation_copy_size
&& max_connections == other.max_connections
&& check_objects_after_upload == other.check_objects_after_upload
&& max_unexpected_write_error_retries == other.max_unexpected_write_error_retries;
&& max_unexpected_write_error_retries == other.max_unexpected_write_error_retries
&& get_request_throttler == other.get_request_throttler
&& put_request_throttler == other.put_request_throttler;
}
void updateFromSettingsIfEmpty(const Settings & settings);
};
S3::AuthSettings auth_settings;
ReadWriteSettings rw_settings;
RequestSettings request_settings;
inline bool operator==(const S3Settings & other) const
{
return auth_settings == other.auth_settings && rw_settings == other.rw_settings;
return auth_settings == other.auth_settings && request_settings == other.request_settings;
}
};
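
Note that the throttler comparison in operator== above is shared_ptr identity, not a comparison of rates: ThrottlerPtr is a std::shared_ptr, so two RequestSettings compare equal only when they reference the very same throttler instances. A small illustration of that semantics (standard shared_ptr behaviour, shown on int for brevity):

    #include <cassert>
    #include <memory>

    int main()
    {
        auto a = std::make_shared<int>(42);
        auto b = std::make_shared<int>(42);
        assert(a != b); // same value, different instances: not equal
        assert(a == a); // equality means "same object"
        // Consequently, settings built from the same config but holding freshly
        // constructed throttlers compare as different, triggering the update path.
    }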

View File

@ -0,0 +1,18 @@
<clickhouse>
<storage_configuration>
<disks>
<disk2>
<path>/var/lib/clickhouse2/</path>
</disk2>
</disks>
<policies>
<test_policy>
<volumes>
<volume2>
<disk>disk2</disk>
</volume2>
</volumes>
</test_policy>
</policies>
</storage_configuration>
</clickhouse>

View File

@ -0,0 +1,24 @@
<clickhouse>
<storage_configuration>
<disks>
<disk1>
<path>/var/lib/clickhouse1/</path>
</disk1>
<disk2>
<path>/var/lib/clickhouse2/</path>
</disk2>
</disks>
<policies>
<test_policy>
<volumes>
<volume1>
<disk>disk1</disk>
</volume1>
<volume2>
<disk>disk2</disk>
</volume2>
</volumes>
</test_policy>
</policies>
</storage_configuration>
</clickhouse>

View File

@ -0,0 +1,40 @@
import os
import pytest
from helpers.test_tools import TSV
from helpers.cluster import ClickHouseCluster
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
CONFIG_DIR = os.path.join(SCRIPT_DIR, "configs")
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance("node", main_configs=["configs/disks.xml"], stay_alive=True)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_storage_policy_configuration_change(started_cluster):
# Create a table that uses a storage policy spanning disk1 and disk2
node.query(
"CREATE TABLE a (x UInt64) ENGINE = MergeTree ORDER BY x SETTINGS storage_policy = 'test_policy'"
)
node.stop_clickhouse()
# Switch to a config where disk1 is removed from the policy; the server must still start
node.copy_file_to_container(
os.path.join(CONFIG_DIR, "disk2_only.xml"),
"/etc/clickhouse-server/config.d/disks.xml",
)
node.start_clickhouse()
node.stop_clickhouse()
# Restore the original two-disk configuration and make sure the server starts again
node.copy_file_to_container(
os.path.join(CONFIG_DIR, "disks.xml"),
"/etc/clickhouse-server/config.d/disks.xml",
)
node.start_clickhouse()

View File

@ -0,0 +1,2 @@
1
1 1 1

View File

@ -0,0 +1,33 @@
#!/usr/bin/env bash
# Tags: no-fasttest
# Tag no-fasttest: needs s3
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -nq "
-- Limit S3 PUT request per second rate
SET s3_max_put_rps = 2;
SET s3_max_put_burst = 1;
CREATE TEMPORARY TABLE times (t DateTime);
-- The INSERT issues 3 PUT-throttled requests (CreateMultipartUpload, UploadPart, CompleteMultipartUpload; see the ProfileEvents check below).
-- At 2 rps with burst = 1 only the first request passes unthrottled and each following request waits 1/rps = 0.5 second, so the insert takes at least 1 second in total
INSERT INTO times SELECT now();
INSERT INTO TABLE FUNCTION s3('http://localhost:11111/test/request-throttler.csv', 'test', 'testtest', 'CSV', 'number UInt64') SELECT number FROM numbers(1000000) SETTINGS s3_max_single_part_upload_size = 10000, s3_truncate_on_insert = 1;
INSERT INTO times SELECT now();
SELECT max(t) - min(t) >= 1 FROM times;
SYSTEM FLUSH LOGS;
SELECT ProfileEvents['S3CreateMultipartUpload'] == 1,
ProfileEvents['S3UploadPart'] == 1,
ProfileEvents['S3CompleteMultipartUpload'] == 1
FROM system.query_log
WHERE query LIKE '%request-throttler.csv%'
AND type = 'QueryFinish'
AND current_database = currentDatabase()
ORDER BY query_start_time DESC
LIMIT 1;
"

View File

@ -1,80 +0,0 @@
#!/bin/bash
# This is a script to automate the SECURITY.md generation in the repository root.
# The logic is the following:
# We support the latest ClickHouse Y.M stable release,
# the two releases before the latest stable,
# and the two latest LTS releases (which may be already included by the criteria above).
# The LTS releases are every Y.3 and Y.8 stable release.
echo "
# Security Policy
## Security Announcements
Security fixes will be announced by posting them in the [security changelog](https://clickhouse.com/docs/en/whats-new/security-changelog/).
## Scope and Supported Versions
The following versions of ClickHouse server are currently being supported with security updates:
"
clickhouse-local --query "
SELECT
y::String || '.' || (y < toYear(today()) - 2000 - 1 ? '*' : m::String) AS Version,
(n <= 3 OR (is_lts AND lts_n <= 2)) ? '✔️' : '❌' AS Supported
FROM
(
SELECT
y,
m,
count() OVER (ORDER BY y DESC, m DESC) AS n,
m IN (3, 8) AS is_lts,
countIf(is_lts) OVER (ORDER BY y DESC, m DESC) AS lts_n
FROM
(
WITH
extractGroups(version, 'v(\\d+)\\.(\\d+)') AS v,
v[1]::UInt8 AS y,
v[2]::UInt8 AS m
SELECT
y,
m
FROM file('$(dirname "${BASH_SOURCE[0]}")/../list-versions/version_date.tsv', TSV, 'version String, date String')
ORDER BY
y DESC,
m DESC
LIMIT 1 BY
y,
m
)
)
LIMIT 1 BY Version
FORMAT Markdown"
echo "
## Reporting a Vulnerability
We're extremely grateful for security researchers and users that report vulnerabilities to the ClickHouse Open Source Community. All reports are thoroughly investigated by developers.
To report a potential vulnerability in ClickHouse please send the details about it to [security@clickhouse.com](mailto:security@clickhouse.com). We do not offer any financial rewards for reporting issues to us using this method. Alternatively, you can also submit your findings through our public bug bounty program hosted by [Bugcrowd](https://bugcrowd.com/clickhouse) and be rewarded for it as per the program scope and rules of engagement.
### When Should I Report a Vulnerability?
- You think you discovered a potential security vulnerability in ClickHouse
- You are unsure how a vulnerability affects ClickHouse
### When Should I NOT Report a Vulnerability?
- You need help tuning ClickHouse components for security
- You need help applying security related updates
- Your issue is not security related
## Security Vulnerability Response
Each report is acknowledged and analyzed by ClickHouse maintainers within 5 working days.
As the security issue moves from triage, to identified fix, to release planning we will keep the reporter updated.
## Public Disclosure Timing
A public disclosure date is negotiated by the ClickHouse maintainers and the bug submitter. We prefer to fully disclose the bug as soon as possible once a user mitigation is available. It is reasonable to delay disclosure when the bug or the fix is not yet fully understood, the solution is not well-tested, or for vendor coordination. The timeframe for disclosure is from immediate (especially if it's already publicly known) to 90 days. For a vulnerability with a straightforward mitigation, we expect the report date to disclosure date to be on the order of 7 days.
"

View File

@ -0,0 +1,102 @@
#!/usr/bin/env python
from pathlib import Path
from typing import List
VERSIONS_FILE = (
Path(__file__).absolute().parent.parent / "list-versions" / "version_date.tsv"
)
HEADER = """<!--
the file is autogenerated by utils/security-generator/generate_security.py
-->
# Security Policy
## Security Announcements
Security fixes will be announced by posting them in the [security changelog](https://clickhouse.com/docs/en/whats-new/security-changelog/).
## Scope and Supported Versions
The following versions of ClickHouse server are currently being supported with security updates:
"""
FOOTER = """## Reporting a Vulnerability
We're extremely grateful for security researchers and users that report vulnerabilities to the ClickHouse Open Source Community. All reports are thoroughly investigated by developers.
To report a potential vulnerability in ClickHouse please send the details about it to [security@clickhouse.com](mailto:security@clickhouse.com). We do not offer any financial rewards for reporting issues to us using this method. Alternatively, you can also submit your findings through our public bug bounty program hosted by [Bugcrowd](https://bugcrowd.com/clickhouse) and be rewarded for it as per the program scope and rules of engagement.
### When Should I Report a Vulnerability?
- You think you discovered a potential security vulnerability in ClickHouse
- You are unsure how a vulnerability affects ClickHouse
### When Should I NOT Report a Vulnerability?
- You need help tuning ClickHouse components for security
- You need help applying security related updates
- Your issue is not security related
## Security Vulnerability Response
Each report is acknowledged and analyzed by ClickHouse maintainers within 5 working days.
As the security issue moves from triage, to identified fix, to release planning we will keep the reporter updated.
## Public Disclosure Timing
A public disclosure date is negotiated by the ClickHouse maintainers and the bug submitter. We prefer to fully disclose the bug as soon as possible once a user mitigation is available. It is reasonable to delay disclosure when the bug or the fix is not yet fully understood, the solution is not well-tested, or for vendor coordination. The timeframe for disclosure is from immediate (especially if it's already publicly known) to 90 days. For a vulnerability with a straightforward mitigation, we expect the report date to disclosure date to be on the order of 7 days.
"""
def generate_supported_versions():
with open(VERSIONS_FILE, "r", encoding="utf-8") as fd:
versions = [line.split(maxsplit=1)[0][1:] for line in fd.readlines()]
# The versions in VERSIONS_FILE are ordered descending (newest first), so the first one is
# the greatest one. We may still have supported versions in the previous year
unsupported_year = int(versions[0].split(".", maxsplit=1)[0]) - 2
# 3 supported versions
supported = [] # type: List[str]
# 2 LTS versions, one of them could be in supported
lts = [] # type: List[str]
# The rest are unsupported
unsupported = [] # type: List[str]
table = [
"| Version | Supported |",
"|:-|:-|",
]
for version in versions:
year = int(version.split(".")[0])
month = int(version.split(".")[1])
version = f"{year}.{month}"
if version in supported or version in lts:
continue
if len(supported) < 3:
supported.append(version)
if len(lts) < 2 and month in [3, 8]:
# The version can be LTS as well
lts.append(version)
table.append(f"| {version} | ✔️ |")
continue
if len(lts) < 2 and month in [3, 8]:
lts.append(version)
table.append(f"| {version} | ✔️ |")
continue
if year <= unsupported_year:
# The whole year is unsupported
version = f"{year}.*"
if version not in unsupported:
unsupported.append(version)
table.append(f"| {version} | ❌ |")
return "\n".join(table) + "\n"
def main():
print(HEADER)
print(generate_supported_versions())
print(FOOTER)
if __name__ == "__main__":
main()