add test with merge tree table

Sema Checherinda 2023-05-21 12:42:28 +02:00
parent 4249bda449
commit 206efee9b7
12 changed files with 173 additions and 124 deletions

View File

@ -94,6 +94,7 @@ class IColumn;
M(Bool, s3_check_objects_after_upload, false, "Check each uploaded object to s3 with head request to be sure that upload was successful", 0) \
M(Bool, s3_allow_parallel_part_upload, true, "Use multiple threads for s3 multipart upload. It may lead to slightly higher memory usage", 0) \
M(Bool, s3_throw_on_zero_files_match, false, "Throw an error, when ListObjects request cannot match any files", 0) \
M(UInt64, s3_retry_attempts, 10, "Setting for Aws::Client::RetryStrategy, Aws::Client does retries itself, 0 means no retries", 0) \
M(Bool, enable_s3_requests_logging, false, "Enable very explicit logging of S3 requests. Makes sense for debug only.", 0) \
M(UInt64, hdfs_replication, 0, "The actual number of replications can be specified when the hdfs file is created.", 0) \
M(Bool, hdfs_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables", 0) \
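The new s3_retry_attempts setting above is query-level, so it can be tuned per statement. A minimal sketch in the style of the integration tests further down in this diff, assuming a `node` instance from helpers/cluster.py; the table name is illustrative:

def insert_with_retries_disabled(node):
    # Illustrative only: override the new query-level setting for one INSERT.
    # Per the setting description, 0 means the AWS client performs no retries.
    node.query(
        "INSERT INTO s3_test SELECT number, toString(number) FROM numbers(1000)",
        settings={"s3_retry_attempts": 0},
    )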

View File

@ -146,7 +146,8 @@ std::unique_ptr<S3::Client> getClient(
S3::ServerSideEncryptionKMSConfig sse_kms_config = S3::getSSEKMSConfig(config_prefix, config);
client_configuration.retryStrategy
- = std::make_shared<Aws::Client::DefaultRetryStrategy>(config.getUInt(config_prefix + ".retry_attempts", 10));
+ = std::make_shared<Aws::Client::DefaultRetryStrategy>(
+     config.getUInt64(config_prefix + ".retry_attempts", settings.request_settings.retry_attempts));
return S3::ClientFactory::instance().create(
client_configuration,
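The per-disk <retry_attempts> key now defaults to the request-level retry_attempts (fed by s3_retry_attempts) instead of a hard-coded 10. A rough Python model of that precedence; the function and dict layout are illustrative, not ClickHouse API:

def effective_retry_attempts(disk_config, request_settings, compiled_default=10):
    # Per-disk <retry_attempts> wins, then the request-level setting
    # (populated from s3_retry_attempts), then the compiled-in default.
    return disk_config.get(
        "retry_attempts",
        request_settings.get("retry_attempts", compiled_default),
    )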

View File

@ -96,7 +96,7 @@ WriteBufferFromS3::WriteBufferFromS3(
, task_tracker(
std::make_unique<WriteBufferFromS3::TaskTracker>(
std::move(schedule_),
- upload_settings.s3_max_inflight_parts_for_one_file))
+ upload_settings.max_inflight_parts_for_one_file))
{
LOG_TRACE(log, "Create WriteBufferFromS3, {}", getLogDetails());

View File

@ -153,7 +153,7 @@ void WriteBufferFromS3::TaskTracker::add(Callback && func)
*future_placeholder = scheduler(std::move(func_with_notification), Priority{});
LOG_TEST(log, "add ended, in queue {}", futures.size());
LOG_TEST(log, "add ended, in queue {}, limit {}", futures.size(), max_tasks_inflight);
waitInFlight();
}
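For context, max_tasks_inflight in the new log line is the cap TaskTracker enforces on concurrently scheduled part uploads. A rough Python analogue of that back-pressure, under the assumption that each scheduled part holds a buffer in memory:

import threading
from concurrent.futures import ThreadPoolExecutor

def upload_parts(parts, upload_one, max_inflight=20):
    # At most `max_inflight` part uploads are scheduled at any moment; the
    # producer blocks (cf. waitInFlight) until a slot frees up, which bounds
    # the memory held by in-flight buffers.
    slots = threading.Semaphore(max_inflight)

    def run(part):
        try:
            upload_one(part)
        finally:
            slots.release()

    with ThreadPoolExecutor(max_workers=max_inflight) as pool:
        futures = []
        for part in parts:
            slots.acquire()
            futures.append(pool.submit(run, part))
        for future in futures:
            future.result()  # re-raise any upload error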

View File

@ -1263,6 +1263,11 @@ void StorageS3::Configuration::connect(ContextPtr context)
if (!headers_from_ast.empty())
headers.insert(headers.end(), headers_from_ast.begin(), headers_from_ast.end());
client_configuration.requestTimeoutMs = request_settings.request_timeout_ms;
client_configuration.retryStrategy
= std::make_shared<Aws::Client::DefaultRetryStrategy>(request_settings.retry_attempts);
auto credentials = Aws::Auth::AWSCredentials(auth_settings.access_key_id, auth_settings.secret_access_key);
client = S3::ClientFactory::instance().create(
client_configuration,
@ -1273,11 +1278,11 @@ void StorageS3::Configuration::connect(ContextPtr context)
auth_settings.server_side_encryption_kms_config,
std::move(headers),
S3::CredentialsConfiguration{
auth_settings.use_environment_credentials.value_or(context->getConfigRef().getBool("s3.use_environment_credentials", true)),
auth_settings.use_insecure_imds_request.value_or(context->getConfigRef().getBool("s3.use_insecure_imds_request", false)),
auth_settings.expiration_window_seconds.value_or(
context->getConfigRef().getUInt64("s3.expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS)),
auth_settings.no_sign_request.value_or(context->getConfigRef().getBool("s3.no_sign_request", false)),
});
}
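The same retry strategy now also applies to clients created in StorageS3::Configuration::connect(), i.e. the S3 table engine. A sketch of a table that would go through this code path, reusing the mock endpoint and credentials from the test configs below; table and file names are illustrative:

def create_s3_engine_table(node):
    # The client created in connect() for this table picks up
    # request_timeout_ms and retry_attempts from the request settings.
    node.query(
        "CREATE TABLE s3_test (key UInt32, value String)"
        " ENGINE=S3('http://resolver:8083/root/data/test.csv', 'minio', 'minio123', 'CSV')"
    )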

View File

@ -37,7 +37,7 @@ S3Settings::RequestSettings::PartUploadSettings::PartUploadSettings(
max_upload_part_size = config.getUInt64(key + "max_upload_part_size", max_upload_part_size);
upload_part_size_multiply_factor = config.getUInt64(key + "upload_part_size_multiply_factor", upload_part_size_multiply_factor);
upload_part_size_multiply_parts_count_threshold = config.getUInt64(key + "upload_part_size_multiply_parts_count_threshold", upload_part_size_multiply_parts_count_threshold);
- s3_max_inflight_parts_for_one_file = config.getUInt64(key + "s3_max_inflight_parts_for_one_file", s3_max_inflight_parts_for_one_file);
+ max_inflight_parts_for_one_file = config.getUInt64(key + "max_inflight_parts_for_one_file", max_inflight_parts_for_one_file);
max_part_number = config.getUInt64(key + "max_part_number", max_part_number);
max_single_part_upload_size = config.getUInt64(key + "max_single_part_upload_size", max_single_part_upload_size);
max_single_operation_copy_size = config.getUInt64(key + "max_single_operation_copy_size", max_single_operation_copy_size);
@ -56,7 +56,7 @@ S3Settings::RequestSettings::PartUploadSettings::PartUploadSettings(const NamedC
max_single_part_upload_size = collection.getOrDefault<UInt64>("max_single_part_upload_size", max_single_part_upload_size);
upload_part_size_multiply_factor = collection.getOrDefault<UInt64>("upload_part_size_multiply_factor", upload_part_size_multiply_factor);
upload_part_size_multiply_parts_count_threshold = collection.getOrDefault<UInt64>("upload_part_size_multiply_parts_count_threshold", upload_part_size_multiply_parts_count_threshold);
- s3_max_inflight_parts_for_one_file = collection.getOrDefault<UInt64>("s3_max_inflight_parts_for_one_file", s3_max_inflight_parts_for_one_file);
+ max_inflight_parts_for_one_file = collection.getOrDefault<UInt64>("max_inflight_parts_for_one_file", max_inflight_parts_for_one_file);
/// This configuration is only applicable to s3. Other types of object storage are not applicable or have different meanings.
storage_class_name = collection.getOrDefault<String>("s3_storage_class", storage_class_name);
@ -83,7 +83,7 @@ void S3Settings::RequestSettings::PartUploadSettings::updateFromSettingsImpl(con
upload_part_size_multiply_parts_count_threshold = settings.s3_upload_part_size_multiply_parts_count_threshold;
if (!if_changed || settings.s3_max_inflight_parts_for_one_file.changed)
- s3_max_inflight_parts_for_one_file = settings.s3_max_inflight_parts_for_one_file;
+ max_inflight_parts_for_one_file = settings.s3_max_inflight_parts_for_one_file;
if (!if_changed || settings.s3_max_single_part_upload_size.changed)
max_single_part_upload_size = settings.s3_max_single_part_upload_size;
@ -198,6 +198,8 @@ S3Settings::RequestSettings::RequestSettings(
check_objects_after_upload = config.getBool(key + "check_objects_after_upload", settings.s3_check_objects_after_upload);
list_object_keys_size = config.getUInt64(key + "list_object_keys_size", settings.s3_list_object_keys_size);
throw_on_zero_files_match = config.getBool(key + "throw_on_zero_files_match", settings.s3_throw_on_zero_files_match);
retry_attempts = config.getUInt64(key + "retry_attempts", settings.s3_retry_attempts);
request_timeout_ms = config.getUInt64(key + "request_timeout_ms", request_timeout_ms);
/// NOTE: it would be better to reuse old throttlers to avoid losing token bucket state on every config reload,
/// which could lead to exceeding limit for short time. But it is good enough unless very high `burst` values are used.
@ -248,8 +250,11 @@ void S3Settings::RequestSettings::updateFromSettingsImpl(const Settings & settin
put_request_throttler = std::make_shared<Throttler>(
settings.s3_max_put_rps, settings.s3_max_put_burst ? settings.s3_max_put_burst : Throttler::default_burst_seconds * settings.s3_max_put_rps);
- if (!if_changed || settings.s3_throw_on_zero_files_match)
+ if (!if_changed || settings.s3_throw_on_zero_files_match.changed)
throw_on_zero_files_match = settings.s3_throw_on_zero_files_match;
if (!if_changed || settings.s3_retry_attempts.changed)
retry_attempts = settings.s3_retry_attempts;
}
void S3Settings::RequestSettings::updateFromSettings(const Settings & settings)
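A quick way to confirm that the new setting actually reaches a session is to read it back from system.settings; a small helper sketch in the style of the tests below:

def current_s3_retry_attempts(node):
    # Reads the effective value of the new setting for the current profile.
    return int(
        node.query(
            "SELECT value FROM system.settings WHERE name = 's3_retry_attempts'"
        ).strip()
    )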

View File

@ -33,7 +33,7 @@ struct S3Settings
size_t max_upload_part_size = 5ULL * 1024 * 1024 * 1024;
size_t upload_part_size_multiply_factor = 2;
size_t upload_part_size_multiply_parts_count_threshold = 500;
- size_t s3_max_inflight_parts_for_one_file = 20;
+ size_t max_inflight_parts_for_one_file = 20;
size_t max_part_number = 10000;
size_t max_single_part_upload_size = 32 * 1024 * 1024;
size_t max_single_operation_copy_size = 5ULL * 1024 * 1024 * 1024;
@ -68,6 +68,8 @@ struct S3Settings
size_t list_object_keys_size = 1000;
ThrottlerPtr get_request_throttler;
ThrottlerPtr put_request_throttler;
size_t retry_attempts = 10;
size_t request_timeout_ms = 30000;
bool throw_on_zero_files_match = false;

View File

@ -27,10 +27,10 @@
<endpoint>http://resolver:8083/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
- <s3_retry_attempts>1</s3_retry_attempts>
+ <retry_attempts>0</retry_attempts>
<connect_timeout_ms>20000</connect_timeout_ms>
- <request_timeout_ms>30000</request_timeout_ms>
- <skip_access_check>true</skip_access_check>
+ <request_timeout_ms>20000</request_timeout_ms>
+ <s3_max_inflight_parts_for_one_file>1</s3_max_inflight_parts_for_one_file>
</broken_s3>
<hdd>
<type>local</type>

View File

@ -0,0 +1,8 @@
<clickhouse>
<profiles>
<default>
<enable_s3_requests_logging>1</enable_s3_requests_logging>
<s3_max_inflight_parts_for_one_file>20</s3_max_inflight_parts_for_one_file>
</default>
</profiles>
</clickhouse>

View File

@ -1,14 +1,11 @@
<clickhouse>
<profiles>
<default>
<s3_check_objects_after_upload> 1 </s3_check_objects_after_upload>
<enable_s3_requests_logging> 1 </enable_s3_requests_logging>
<http_send_timeout>60</http_send_timeout>
<http_receive_timeout>60</http_receive_timeout>
<send_timeout>60</send_timeout>
<receive_timeout>60</receive_timeout>
</default>
</profiles>
<s3>
<broken_s3>
<endpoint>http://resolver:8083/root/data/</endpoint>
<retry_attempts>0</retry_attempts>
<request_timeout_ms>20000</request_timeout_ms>
</broken_s3>
</s3>
<enable_system_unfreeze>true</enable_system_unfreeze>
</clickhouse>

View File

@ -26,6 +26,9 @@ def cluster():
"configs/config.d/storage_conf.xml",
"configs/config.d/bg_processing_pool_conf.xml",
],
user_configs=[
"configs/config.d/users.xml",
],
stay_alive=True,
with_minio=True,
)
@ -139,14 +142,76 @@ def clear_minio(cluster):
yield
class BrokenS3:
@staticmethod
def reset(cluster):
response = cluster.exec_in_container(
cluster.get_container_id("resolver"),
[
"curl",
"-s",
f"http://localhost:8083/mock_settings/reset",
],
nothrow=True,
)
assert response == "OK"
@staticmethod
def setup_fail_upload(cluster, part_length):
response = cluster.exec_in_container(
cluster.get_container_id("resolver"),
[
"curl",
"-s",
f"http://localhost:8083/mock_settings/error_at_put?when_length_bigger={part_length}",
],
nothrow=True,
)
assert response == "OK"
@staticmethod
def setup_fake_upload(cluster, part_length):
response = cluster.exec_in_container(
cluster.get_container_id("resolver"),
[
"curl",
"-s",
f"http://localhost:8083/mock_settings/fake_put?when_length_bigger={part_length}",
],
nothrow=True,
)
assert response == "OK"
@staticmethod
def setup_slow_answers(
cluster, minimal_length=0, timeout=None, probability=None, count=None
):
url = (
f"http://localhost:8083/"
f"mock_settings/slow_put"
f"?minimal_length={minimal_length}"
)
if timeout is not None:
url += f"&timeout={timeout}"
if probability is not None:
url += f"&probability={probability}"
if count is not None:
url += f"&count={count}"
response = cluster.exec_in_container(
cluster.get_container_id("resolver"),
["curl", "-s", url],
nothrow=True,
)
assert response == "OK"
@pytest.fixture(autouse=True, scope="function")
- def reset_mock_broken_s3(cluster):
- response = cluster.exec_in_container(
- cluster.get_container_id("resolver"),
- ["curl", "-s", f"http://localhost:8083/mock_settings/reset"],
- nothrow=True,
- )
- assert response == "OK"
+ def reset_broken_s3(cluster):
+ BrokenS3.reset(cluster)
yield
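For readability, a sketch of how the BrokenS3 helper above is meant to be used from a test; the table, threshold and expected error text are illustrative, and the standard query_and_get_error helper is assumed:

def test_insert_fails_when_put_is_broken(cluster):
    node = cluster.instances["node"]
    # Make the mock fail every PUT whose body is larger than ~50 KB,
    # then expect the INSERT to surface an S3 error instead of hanging.
    BrokenS3.setup_fail_upload(cluster, 50000)
    error = node.query_and_get_error(
        "INSERT INTO s3_test SELECT number, toString(number) FROM numbers(1000000)"
    )
    assert "S3" in error or "Code: 499" in error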
@ -886,16 +951,7 @@ def test_merge_canceled_by_s3_errors(cluster, node_name):
min_key = node.query("SELECT min(key) FROM test_merge_canceled_by_s3_errors")
assert int(min_key) == 0, min_key
- response = cluster.exec_in_container(
- cluster.get_container_id("resolver"),
- [
- "curl",
- "-s",
- f"http://localhost:8083/mock_settings/error_at_put?when_length_bigger=50000",
- ],
- nothrow=True,
- )
- assert response == "OK"
+ BrokenS3.setup_fail_upload(cluster, 50000)
node.query("SYSTEM START MERGES test_merge_canceled_by_s3_errors")
@ -938,16 +994,7 @@ def test_merge_canceled_by_s3_errors_when_move(cluster, node_name):
settings={"materialize_ttl_after_modify": 0},
)
- response = cluster.exec_in_container(
- cluster.get_container_id("resolver"),
- [
- "curl",
- "-s",
- f"http://localhost:8083/mock_settings/error_at_put?when_length_bigger=10000",
- ],
- nothrow=True,
- )
- assert response == "OK"
+ BrokenS3.setup_fail_upload(cluster, 10000)
node.query("SYSTEM START MERGES merge_canceled_by_s3_errors_when_move")
@ -963,75 +1010,11 @@ def test_merge_canceled_by_s3_errors_when_move(cluster, node_name):
)
def value_or(value, default):
assert default is not None
return value if value is not None else default
class BrokenS3:
@staticmethod
def reset(cluster):
response = cluster.exec_in_container(
cluster.get_container_id("resolver"),
[
"curl",
"-s",
f"http://localhost:8083/mock_settings/reset",
],
nothrow=True,
)
assert response == "OK"
@staticmethod
def setup_fake_upload(cluster, part_length):
response = cluster.exec_in_container(
cluster.get_container_id("resolver"),
[
"curl",
"-s",
f"http://localhost:8083/mock_settings/fake_put?when_length_bigger={part_length}",
],
nothrow=True,
)
assert response == "OK"
@staticmethod
def setup_slow_answers(
cluster, minimal_length=0, timeout=None, probability=None, count=None
):
url = f"http://localhost:8083/" \
f"mock_settings/slow_put" \
f"?minimal_length={minimal_length}"
if timeout is not None:
url += f"&timeout={timeout}"
if probability is not None:
url += f"&probability={probability}"
if count is not None:
url += f"&count={count}"
response = cluster.exec_in_container(
cluster.get_container_id("resolver"),
["curl", "-s", url],
nothrow=True,
)
assert response == "OK"
@pytest.fixture(autouse=True, scope="function")
def reset_broken_s3(cluster):
BrokenS3.reset(cluster)
yield
@pytest.mark.parametrize("node_name", ["node"])
@pytest.mark.parametrize(
"in_flight_memory", [(10, 245918115), (5, 156786752), (1, 106426187)]
)
- def test_heavy_write_check_mem(cluster, node_name, in_flight_memory):
+ def test_s3_engine_heavy_write_check_mem(cluster, node_name, in_flight_memory):
in_flight = in_flight_memory[0]
memory = in_flight_memory[1]
@ -1046,12 +1029,13 @@ def test_heavy_write_check_mem(cluster, node_name, in_flight_memory):
)
BrokenS3.setup_fake_upload(cluster, 1000)
- BrokenS3.setup_slow_answers(cluster, 10 * 1024 * 1024, timeout=10, count=10)
+ BrokenS3.setup_slow_answers(cluster, 10 * 1024 * 1024, timeout=15, count=10)
query_id = f"INSERT_INTO_S3_QUERY_ID_{in_flight}"
query_id = f"INSERT_INTO_S3_ENGINE_QUERY_ID_{in_flight}"
node.query(
"INSERT INTO s3_test SELECT number, toString(number) FROM numbers(50000000)"
f" SETTINGS max_memory_usage={2*memory}, s3_max_inflight_parts_for_one_file={in_flight}",
f" SETTINGS max_memory_usage={2*memory}"
f", s3_max_inflight_parts_for_one_file={in_flight}",
query_id=query_id,
)
@ -1064,7 +1048,53 @@ def test_heavy_write_check_mem(cluster, node_name, in_flight_memory):
" AND type!='QueryStart'"
)
- assert int(result) < 1.1 * memory
- assert int(result) > 0.9 * memory
+ assert int(result) < 1.01 * memory
+ assert int(result) > 0.99 * memory
check_no_objects_after_drop(cluster, node_name=node_name)
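The tightened 1% tolerance above could be factored into a helper shared by both heavy-write tests; a sketch, assuming the same system.query_log columns the test already reads:

def assert_memory_usage_close(node, query_id, expected, tolerance=0.01):
    node.query("SYSTEM FLUSH LOGS")
    used = int(
        node.query(
            "SELECT memory_usage FROM system.query_log"
            f" WHERE query_id='{query_id}' AND type!='QueryStart'"
        )
    )
    # memory_usage must land within +/- tolerance of the expected figure.
    assert (1 - tolerance) * expected < used < (1 + tolerance) * expected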
@pytest.mark.parametrize("node_name", ["node"])
def test_s3_disk_heavy_write_check_mem(cluster, node_name):
memory = 2279055040
node = cluster.instances[node_name]
node.query("DROP TABLE IF EXISTS s3_test SYNC")
node.query(
"CREATE TABLE s3_test"
" ("
" key UInt32, value String"
" )"
" ENGINE=MergeTree()"
" ORDER BY key"
" SETTINGS"
" storage_policy='broken_s3'",
)
node.query("SYSTEM STOP MERGES s3_test")
BrokenS3.setup_fake_upload(cluster, 1000)
BrokenS3.setup_slow_answers(cluster, 10 * 1024 * 1024, timeout=10, count=50)
query_id = f"INSERT_INTO_S3_DISK_QUERY_ID"
node.query(
"INSERT INTO s3_test SELECT number, toString(number) FROM numbers(50000000)"
f" SETTINGS max_memory_usage={2*memory}"
f", max_insert_block_size=50000000"
f", min_insert_block_size_rows=50000000"
f", min_insert_block_size_bytes=1000000000000",
query_id=query_id,
)
node.query("SYSTEM FLUSH LOGS")
result = node.query(
"SELECT memory_usage"
" FROM system.query_log"
f" WHERE query_id='{query_id}'"
" AND type!='QueryStart'"
)
assert int(result) < 1.01 * memory
assert int(result) > 0.99 * memory
check_no_objects_after_drop(cluster, node_name=node_name)

View File

@ -32,7 +32,7 @@
<!-- ClickHouse starts earlier than custom S3 endpoint. Skip access check to avoid fail on start-up -->
<skip_access_check>true</skip_access_check>
<!-- Avoid extra retries to speed up tests -->
- <s3_retry_attempts>1</s3_retry_attempts>
+ <retry_attempts>1</retry_attempts>
<s3_max_single_read_retries>1</s3_max_single_read_retries>
<connect_timeout_ms>20000</connect_timeout_ms>
</s3_no_retries>