s3_max_inflight_parts_for_one_file with tests

This commit is contained in:
Sema Checherinda 2023-05-17 15:40:36 +02:00
parent 6554e7438e
commit 4249bda449
10 changed files with 379 additions and 65 deletions

View File

@ -78,6 +78,7 @@ class IColumn;
M(UInt64, s3_max_upload_part_size, 5ull*1024*1024*1024, "The maximum size of part to upload during multipart upload to S3.", 0) \
M(UInt64, s3_upload_part_size_multiply_factor, 2, "Multiply s3_min_upload_part_size by this factor each time s3_multiply_parts_count_threshold parts were uploaded from a single write to S3.", 0) \
M(UInt64, s3_upload_part_size_multiply_parts_count_threshold, 500, "Each time this number of parts was uploaded to S3 s3_min_upload_part_size multiplied by s3_upload_part_size_multiply_factor.", 0) \
M(UInt64, s3_max_inflight_parts_for_one_file, 20, "The maximum number of a concurrent loaded parts in multipart upload request. 0 means unlimited.", 0) \
M(UInt64, s3_max_single_part_upload_size, 32*1024*1024, "The maximum size of object to upload using singlepart upload to S3.", 0) \
M(UInt64, s3_max_single_read_retries, 4, "The maximum number of retries during single S3 read.", 0) \
M(UInt64, s3_max_unexpected_write_error_retries, 4, "The maximum number of retries in case of unexpected errors during S3 write.", 0) \

View File

@ -92,8 +92,11 @@ WriteBufferFromS3::WriteBufferFromS3(
, write_settings(write_settings_)
, client_ptr(std::move(client_ptr_))
, object_metadata(std::move(object_metadata_))
, buffer_allocation_policy(ChooseBufferPolicy(request_settings_.getUploadSettings()))
, task_tracker(std::make_unique<WriteBufferFromS3::TaskTracker>(std::move(schedule_)))
, buffer_allocation_policy(ChooseBufferPolicy(upload_settings))
, task_tracker(
std::make_unique<WriteBufferFromS3::TaskTracker>(
std::move(schedule_),
upload_settings.s3_max_inflight_parts_for_one_file))
{
LOG_TRACE(log, "Create WriteBufferFromS3, {}", getLogDetails());
@ -110,7 +113,7 @@ void WriteBufferFromS3::nextImpl()
"Cannot write to prefinalized buffer for S3, the file could have been created with PutObjectRequest");
/// Make sense to call to before adding new async task to check if there is an exception
task_tracker->waitReady();
task_tracker->consumeReady();
hidePartialData();
@ -134,8 +137,6 @@ void WriteBufferFromS3::preFinalize()
LOG_TRACE(log, "preFinalize WriteBufferFromS3. {}", getLogDetails());
task_tracker->waitReady();
hidePartialData();
if (hidden_size > 0)
@ -234,7 +235,7 @@ WriteBufferFromS3::~WriteBufferFromS3()
{
LOG_TRACE(log, "Close WriteBufferFromS3. {}.", getLogDetails());
// That descructor could be call with finalized=false in case of exceptions
// That destructor could be call with finalized=false in case of exceptions
if (!finalized)
{
LOG_ERROR(log, "WriteBufferFromS3 is not finalized in destructor. It could be if an exception occurs. File is not written to S3. {}.", getLogDetails());

View File

@ -7,9 +7,10 @@
namespace DB
{
WriteBufferFromS3::TaskTracker::TaskTracker(ThreadPoolCallbackRunner<void> scheduler_)
WriteBufferFromS3::TaskTracker::TaskTracker(ThreadPoolCallbackRunner<void> scheduler_, size_t max_tasks_inflight_)
: is_async(bool(scheduler_))
, scheduler(scheduler_ ? std::move(scheduler_) : syncRunner())
, max_tasks_inflight(max_tasks_inflight_)
{}
WriteBufferFromS3::TaskTracker::~TaskTracker()
@ -28,34 +29,36 @@ ThreadPoolCallbackRunner<void> WriteBufferFromS3::TaskTracker::syncRunner()
};
}
void WriteBufferFromS3::TaskTracker::waitReady()
size_t WriteBufferFromS3::TaskTracker::consumeReady()
{
LOG_TEST(log, "waitReady, in queue {}", futures.size());
LOG_TEST(log, "consumeReady, in queue {}", futures.size());
size_t consumed = 0;
/// Exceptions are propagated
auto it = futures.begin();
while (it != futures.end())
{
chassert(it->valid());
if (it->wait_for(std::chrono::seconds(0)) != std::future_status::ready)
std::unique_lock lock(mutex);
for (auto it : finished_futures)
{
++it;
continue;
try
{
it->get();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
throw;
}
futures.erase(it);
}
try
{
it->get();
} catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
throw;
}
it = futures.erase(it);
consumed = finished_futures.size();
finished_futures.clear();
}
LOG_TEST(log, "waitReady ended, in queue {}", futures.size());
LOG_TEST(log, "consumeReady ended, in queue {}", futures.size());
return consumed;
}
void WriteBufferFromS3::TaskTracker::waitAll()
@ -75,6 +78,9 @@ void WriteBufferFromS3::TaskTracker::waitAll()
}
}
futures.clear();
/// no concurrent tasks, no mutex required
finished_futures.clear();
}
void WriteBufferFromS3::TaskTracker::safeWaitAll()
@ -106,25 +112,65 @@ void WriteBufferFromS3::TaskTracker::safeWaitAll()
}
}
futures.clear();
/// no concurrent tasks, no mutex required
finished_futures.clear();
LOG_TEST(log, "safeWaitAll ended, get in queue {}", futures.size());
}
void WriteBufferFromS3::TaskTracker::waitAny()
{
LOG_TEST(log, "waitAny, in queue {}", futures.size());
while (futures.size() > 0 && consumeReady() == 0)
{
std::unique_lock lock(mutex);
cond_var.wait(lock, [&] () { return finished_futures.size() > 0; });
}
LOG_TEST(log, "waitAny ended, in queue {}", futures.size());
}
void WriteBufferFromS3::TaskTracker::add(Callback && func)
{
LOG_TEST(log, "add, in queue {}", futures.size());
futures.emplace_back();
auto future_placeholder = std::prev(futures.end());
FinishedList pre_allocated_finished {future_placeholder};
auto future = scheduler(std::move(func), Priority{});
auto exit_scope = scope_guard(
[&future]()
{
future.wait();
}
);
Callback func_with_notification = [&, func=std::move(func), pre_allocated_finished=std::move(pre_allocated_finished)] () mutable
{
SCOPE_EXIT({
DENY_ALLOCATIONS_IN_SCOPE;
futures.push_back(std::move(future));
std::unique_lock lock(mutex);
finished_futures.splice(finished_futures.end(), pre_allocated_finished, pre_allocated_finished.begin());
cond_var.notify_one();
});
func();
};
*future_placeholder = scheduler(std::move(func_with_notification), Priority{});
exit_scope.release();
LOG_TEST(log, "add ended, in queue {}", futures.size());
waitInFlight();
}
void WriteBufferFromS3::TaskTracker::waitInFlight()
{
if (!max_tasks_inflight)
return;
LOG_TEST(log, "waitInFlight, in queue {}", futures.size());
while (futures.size() >= max_tasks_inflight)
{
waitAny();
}
LOG_TEST(log, "waitInFlight ended, in queue {}", futures.size());
}
bool WriteBufferFromS3::TaskTracker::isAsync() const

View File

@ -6,6 +6,8 @@
#include "WriteBufferFromS3.h"
#include <list>
namespace DB
{
@ -20,22 +22,33 @@ class WriteBufferFromS3::TaskTracker
public:
using Callback = std::function<void()>;
explicit TaskTracker(ThreadPoolCallbackRunner<void> scheduler_);
explicit TaskTracker(ThreadPoolCallbackRunner<void> scheduler_, size_t max_tasks_inflight_ = 0);
~TaskTracker();
static ThreadPoolCallbackRunner<void> syncRunner();
bool isAsync() const;
void waitReady();
size_t consumeReady();
void waitAny();
void waitAll();
void safeWaitAll();
void add(Callback && func);
private:
bool is_async;
void waitInFlight();
const bool is_async;
ThreadPoolCallbackRunner<void> scheduler;
std::list<std::future<void>> futures;
const size_t max_tasks_inflight;
using FutureList = std::list<std::future<void>>;
FutureList futures;
Poco::Logger * log = &Poco::Logger::get("TaskTracker");
std::mutex mutex;
std::condition_variable cond_var;
using FinishedList = std::list<FutureList::iterator>;
FinishedList finished_futures;
};
}

View File

@ -37,6 +37,7 @@ S3Settings::RequestSettings::PartUploadSettings::PartUploadSettings(
max_upload_part_size = config.getUInt64(key + "max_upload_part_size", max_upload_part_size);
upload_part_size_multiply_factor = config.getUInt64(key + "upload_part_size_multiply_factor", upload_part_size_multiply_factor);
upload_part_size_multiply_parts_count_threshold = config.getUInt64(key + "upload_part_size_multiply_parts_count_threshold", upload_part_size_multiply_parts_count_threshold);
s3_max_inflight_parts_for_one_file = config.getUInt64(key + "s3_max_inflight_parts_for_one_file", s3_max_inflight_parts_for_one_file);
max_part_number = config.getUInt64(key + "max_part_number", max_part_number);
max_single_part_upload_size = config.getUInt64(key + "max_single_part_upload_size", max_single_part_upload_size);
max_single_operation_copy_size = config.getUInt64(key + "max_single_operation_copy_size", max_single_operation_copy_size);
@ -55,6 +56,7 @@ S3Settings::RequestSettings::PartUploadSettings::PartUploadSettings(const NamedC
max_single_part_upload_size = collection.getOrDefault<UInt64>("max_single_part_upload_size", max_single_part_upload_size);
upload_part_size_multiply_factor = collection.getOrDefault<UInt64>("upload_part_size_multiply_factor", upload_part_size_multiply_factor);
upload_part_size_multiply_parts_count_threshold = collection.getOrDefault<UInt64>("upload_part_size_multiply_parts_count_threshold", upload_part_size_multiply_parts_count_threshold);
s3_max_inflight_parts_for_one_file = collection.getOrDefault<UInt64>("s3_max_inflight_parts_for_one_file", s3_max_inflight_parts_for_one_file);
/// This configuration is only applicable to s3. Other types of object storage are not applicable or have different meanings.
storage_class_name = collection.getOrDefault<String>("s3_storage_class", storage_class_name);
@ -80,6 +82,9 @@ void S3Settings::RequestSettings::PartUploadSettings::updateFromSettingsImpl(con
if (!if_changed || settings.s3_upload_part_size_multiply_parts_count_threshold.changed)
upload_part_size_multiply_parts_count_threshold = settings.s3_upload_part_size_multiply_parts_count_threshold;
if (!if_changed || settings.s3_max_inflight_parts_for_one_file.changed)
s3_max_inflight_parts_for_one_file = settings.s3_max_inflight_parts_for_one_file;
if (!if_changed || settings.s3_max_single_part_upload_size.changed)
max_single_part_upload_size = settings.s3_max_single_part_upload_size;
}

View File

@ -33,6 +33,7 @@ struct S3Settings
size_t max_upload_part_size = 5ULL * 1024 * 1024 * 1024;
size_t upload_part_size_multiply_factor = 2;
size_t upload_part_size_multiply_parts_count_threshold = 500;
size_t s3_max_inflight_parts_for_one_file = 20;
size_t max_part_number = 10000;
size_t max_single_part_upload_size = 32 * 1024 * 1024;
size_t max_single_operation_copy_size = 5ULL * 1024 * 1024 * 1024;

View File

@ -29,6 +29,8 @@
<secret_access_key>minio123</secret_access_key>
<s3_retry_attempts>1</s3_retry_attempts>
<connect_timeout_ms>20000</connect_timeout_ms>
<request_timeout_ms>30000</request_timeout_ms>
<skip_access_check>true</skip_access_check>
</broken_s3>
<hdd>
<type>local</type>

View File

@ -1,3 +1,14 @@
<clickhouse>
<profiles>
<default>
<s3_check_objects_after_upload> 1 </s3_check_objects_after_upload>
<enable_s3_requests_logging> 1 </enable_s3_requests_logging>
<http_send_timeout>60</http_send_timeout>
<http_receive_timeout>60</http_receive_timeout>
<send_timeout>60</send_timeout>
<receive_timeout>60</receive_timeout>
</default>
</profiles>
<enable_system_unfreeze>true</enable_system_unfreeze>
</clickhouse>

View File

@ -1,4 +1,8 @@
import logging
import sys
import threading
import random
import time
import urllib.parse
import http.server
import socketserver
@ -9,16 +13,66 @@ UPSTREAM_PORT = 9001
class ServerRuntime:
class SlowPut:
def __init__(
self, probability_=None, timeout_=None, minimal_length_=None, count_=None
):
self.probability = probability_ if probability_ is not None else 1
self.timeout = timeout_ if timeout_ is not None else 0.1
self.minimal_length = minimal_length_ if minimal_length_ is not None else 0
self.count = count_ if count_ is not None else 2**32
def __str__(self):
return (
f"probability:{self.probability}"
f" timeout:{self.timeout}"
f" minimal_length:{self.minimal_length}"
f" count:{self.count}"
)
def get_timeout(self, content_length):
if content_length > self.minimal_length:
if self.count > 0:
if (
runtime.slow_put.probability == 1
or random.random() <= runtime.slow_put.probability
):
self.count -= 1
return runtime.slow_put.timeout
return None
def __init__(self):
self.lock = threading.Lock()
self.error_at_put_when_length_bigger = None
self.fake_put_when_length_bigger = None
self.fake_uploads = dict()
self.slow_put = None
def register_fake_upload(self, upload_id, key):
with self.lock:
self.fake_uploads[upload_id] = key
def is_fake_upload(self, upload_id, key):
with self.lock:
if upload_id in self.fake_uploads:
return self.fake_uploads[upload_id] == key
return False
def reset(self):
self.error_at_put_when_length_bigger = None
self.fake_put_when_length_bigger = None
self.fake_uploads = dict()
self.slow_put = None
runtime = ServerRuntime()
def and_then(value, func):
assert callable(func)
return None if value is None else func(value)
class RequestHandler(http.server.BaseHTTPRequestHandler):
def _ok(self):
self.send_response(200)
@ -55,51 +109,124 @@ class RequestHandler(http.server.BaseHTTPRequestHandler):
self.send_response(500)
self.send_header("Content-Type", "text/xml")
self.end_headers()
self.wfile.write(data)
self.wfile.write(bytes(data, "UTF-8"))
def _fake_put_ok(self):
self._read_out()
self.send_response(200)
self.send_header("Content-Type", "text/xml")
self.send_header("ETag", "b54357faf0632cce46e942fa68356b38")
self.send_header("Content-Length", 0)
self.end_headers()
def _fake_post_ok(self, path):
self._read_out()
parts = [x for x in path.split("/") if x]
bucket = parts[0]
key = "/".join(parts[1:])
location = "http://Example-Bucket.s3.Region.amazonaws.com/" + path
data = (
'<?xml version="1.0" encoding="UTF-8"?>\n'
"<CompleteMultipartUploadResult>\n"
f"<Location>{location}</Location>\n"
f"<Bucket>{bucket}</Bucket>\n"
f"<Key>{key}</Key>\n"
f'<ETag>"3858f62230ac3c915f300c664312c11f-9"</ETag>\n'
f"</CompleteMultipartUploadResult>\n"
)
self.send_response(200)
self.send_header("Content-Type", "text/xml")
self.send_header("Content-Length", len(data))
self.end_headers()
self.wfile.write(bytes(data, "UTF-8"))
def _mock_settings(self):
parts = urllib.parse.urlsplit(self.path)
path = [x for x in parts.path.split("/") if x]
assert path[0] == "mock_settings", path
if len(path) < 2:
return self._error("_mock_settings: wrong command")
if path[1] == "error_at_put":
params = urllib.parse.parse_qs(parts.query, keep_blank_values=False)
runtime.error_at_put_when_length_bigger = int(
params.get("when_length_bigger", [1024 * 1024])[0]
)
self._ok()
elif path[1] == "reset":
return self._ok()
if path[1] == "fake_put":
params = urllib.parse.parse_qs(parts.query, keep_blank_values=False)
runtime.fake_put_when_length_bigger = int(
params.get("when_length_bigger", [1024 * 1024])[0]
)
return self._ok()
if path[1] == "slow_put":
params = urllib.parse.parse_qs(parts.query, keep_blank_values=False)
runtime.slow_put = ServerRuntime.SlowPut(
minimal_length_=and_then(params.get("minimal_length", [None])[0], int),
probability_=and_then(params.get("probability", [None])[0], float),
timeout_=and_then(params.get("timeout", [None])[0], float),
count_=and_then(params.get("count", [None])[0], int),
)
self.log_message("set slow put %s", runtime.slow_put)
return self._ok()
if path[1] == "reset":
runtime.reset()
self._ok()
else:
self._error("_mock_settings: wrong command")
return self._ok()
return self._error("_mock_settings: wrong command")
def do_GET(self):
if self.path == "/":
self._ping()
elif self.path.startswith("/mock_settings"):
self._mock_settings()
else:
self._redirect()
return self._ping()
if self.path.startswith("/mock_settings"):
return self._mock_settings()
return self._redirect()
def do_PUT(self):
content_length = int(self.headers.get("Content-Length", 0))
if runtime.slow_put is not None:
timeout = runtime.slow_put.get_timeout(content_length)
if timeout is not None:
self.log_message("slow put %s", timeout)
time.sleep(timeout)
if runtime.error_at_put_when_length_bigger is not None:
content_length = int(self.headers.get("Content-Length", 0))
if content_length > runtime.error_at_put_when_length_bigger:
self._error(
b'<?xml version="1.0" encoding="UTF-8"?>'
b"<Error>"
b"<Code>ExpectedError</Code>"
b"<Message>mock s3 injected error</Message>"
b"<RequestId>txfbd566d03042474888193-00608d7537</RequestId>"
b"</Error>"
return self._error(
'<?xml version="1.0" encoding="UTF-8"?>'
"<Error>"
"<Code>ExpectedError</Code>"
"<Message>mock s3 injected error</Message>"
"<RequestId>txfbd566d03042474888193-00608d7537</RequestId>"
"</Error>"
)
else:
self._redirect()
else:
self._redirect()
parts = urllib.parse.urlsplit(self.path)
params = urllib.parse.parse_qs(parts.query, keep_blank_values=False)
upload_id = params.get("uploadId", [None])[0]
if runtime.fake_put_when_length_bigger is not None and upload_id is not None:
if content_length > runtime.fake_put_when_length_bigger:
runtime.register_fake_upload(upload_id, parts.path)
return self._fake_put_ok()
return self._redirect()
def do_POST(self):
self._redirect()
parts = urllib.parse.urlsplit(self.path)
params = urllib.parse.parse_qs(parts.query, keep_blank_values=False)
upload_id = params.get("uploadId", [None])[0]
if runtime.is_fake_upload(upload_id, parts.path):
return self._fake_post_ok(parts.path)
return self._redirect()
def do_HEAD(self):
self._redirect()

View File

@ -961,3 +961,110 @@ def test_merge_canceled_by_s3_errors_when_move(cluster, node_name):
check_no_objects_after_drop(
cluster, table_name="merge_canceled_by_s3_errors_when_move", node_name=node_name
)
def value_or(value, default):
assert default is not None
return value if value is not None else default
class BrokenS3:
@staticmethod
def reset(cluster):
response = cluster.exec_in_container(
cluster.get_container_id("resolver"),
[
"curl",
"-s",
f"http://localhost:8083/mock_settings/reset",
],
nothrow=True,
)
assert response == "OK"
@staticmethod
def setup_fake_upload(cluster, part_length):
response = cluster.exec_in_container(
cluster.get_container_id("resolver"),
[
"curl",
"-s",
f"http://localhost:8083/mock_settings/fake_put?when_length_bigger={part_length}",
],
nothrow=True,
)
assert response == "OK"
@staticmethod
def setup_slow_answers(
cluster, minimal_length=0, timeout=None, probability=None, count=None
):
url = f"http://localhost:8083/" \
f"mock_settings/slow_put" \
f"?minimal_length={minimal_length}"
if timeout is not None:
url += f"&timeout={timeout}"
if probability is not None:
url += f"&probability={probability}"
if count is not None:
url += f"&count={count}"
response = cluster.exec_in_container(
cluster.get_container_id("resolver"),
["curl", "-s", url],
nothrow=True,
)
assert response == "OK"
@pytest.fixture(autouse=True, scope="function")
def reset_broken_s3(cluster):
BrokenS3.reset(cluster)
yield
@pytest.mark.parametrize("node_name", ["node"])
@pytest.mark.parametrize(
"in_flight_memory", [(10, 245918115), (5, 156786752), (1, 106426187)]
)
def test_heavy_write_check_mem(cluster, node_name, in_flight_memory):
in_flight = in_flight_memory[0]
memory = in_flight_memory[1]
node = cluster.instances[node_name]
node.query("DROP TABLE IF EXISTS s3_test SYNC")
node.query(
"CREATE TABLE s3_test"
" ("
" key UInt32 CODEC(NONE), value String CODEC(NONE)"
" )"
" ENGINE S3('http://resolver:8083/root/data/test-upload.csv', 'minio', 'minio123', 'CSV')",
)
BrokenS3.setup_fake_upload(cluster, 1000)
BrokenS3.setup_slow_answers(cluster, 10 * 1024 * 1024, timeout=10, count=10)
query_id = f"INSERT_INTO_S3_QUERY_ID_{in_flight}"
node.query(
"INSERT INTO s3_test SELECT number, toString(number) FROM numbers(50000000)"
f" SETTINGS max_memory_usage={2*memory}, s3_max_inflight_parts_for_one_file={in_flight}",
query_id=query_id,
)
node.query("SYSTEM FLUSH LOGS")
result = node.query(
"SELECT memory_usage"
" FROM system.query_log"
f" WHERE query_id='{query_id}'"
" AND type!='QueryStart'"
)
assert int(result) < 1.1 * memory
assert int(result) > 0.9 * memory
check_no_objects_after_drop(cluster, node_name=node_name)