Disable parallel s3 multipart upload for part moves. (#41268)

* Disable parallel s3 multipart upload for part moves.

* Add setting s3_allow_parallel_part_upload
Nikolai Kochetov, 2022-09-21 19:10:32 +02:00 (committed by GitHub)
commit 0414d95878 (parent 930d050b55)
7 changed files with 79 additions and 9 deletions

src/Core/Settings.h

@@ -92,6 +92,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, s3_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables.", 0) \
M(Bool, s3_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in s3 engine tables", 0) \
M(Bool, s3_check_objects_after_upload, false, "Check each uploaded object to s3 with head request to be sure that upload was successful", 0) \
M(Bool, s3_allow_parallel_part_upload, true, "Use multiple threads for s3 multipart upload. It may lead to slightly higher memory usage", 0) \
M(Bool, enable_s3_requests_logging, false, "Enable very explicit logging of S3 requests. Makes sense for debug only.", 0) \
M(UInt64, hdfs_replication, 0, "The actual number of replications can be specified when the hdfs file is created.", 0) \
M(Bool, hdfs_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables", 0) \
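
The setting above is declared through the same M(type, name, default, description, flags) macro list as its neighbours. As a rough, self-contained illustration of how such an X-macro list can expand into typed members with defaults (not part of the diff; the trimmed-down names below are hypothetical, not the real Settings machinery):

/// Hypothetical, trimmed-down settings list in the same M(...) shape as above.
#define APPLY_FOR_EXAMPLE_SETTINGS(M) \
    M(bool, s3_check_objects_after_upload, false, "Check each uploaded object with a HEAD request", 0) \
    M(bool, s3_allow_parallel_part_upload, true, "Use multiple threads for s3 multipart upload", 0)

struct ExampleSettings
{
    /// Every M(...) entry becomes a member initialised to its default value.
#define DECLARE_MEMBER(TYPE, NAME, DEFAULT, DESCRIPTION, FLAGS) TYPE NAME = DEFAULT;
    APPLY_FOR_EXAMPLE_SETTINGS(DECLARE_MEMBER)
#undef DECLARE_MEMBER
};

static_assert(ExampleSettings{}.s3_allow_parallel_part_upload == true);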

src/Disks/IDisk.cpp

@@ -24,13 +24,13 @@ bool IDisk::isDirectoryEmpty(const String & path) const
return !iterateDirectory(path)->isValid();
}
void IDisk::copyFile(const String & from_file_path, IDisk & to_disk, const String & to_file_path)
void IDisk::copyFile(const String & from_file_path, IDisk & to_disk, const String & to_file_path, const WriteSettings & settings) /// NOLINT
{
LOG_DEBUG(&Poco::Logger::get("IDisk"), "Copying from {} (path: {}) {} to {} (path: {}) {}.",
getName(), getPath(), from_file_path, to_disk.getName(), to_disk.getPath(), to_file_path);
auto in = readFile(from_file_path);
auto out = to_disk.writeFile(to_file_path);
auto out = to_disk.writeFile(to_file_path, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, settings);
copyData(*in, *out);
out->finalize();
}
@@ -56,15 +56,15 @@ void IDisk::removeSharedFiles(const RemoveBatchRequest & files, bool keep_all_ba
using ResultsCollector = std::vector<std::future<void>>;
void asyncCopy(IDisk & from_disk, String from_path, IDisk & to_disk, String to_path, Executor & exec, ResultsCollector & results, bool copy_root_dir)
void asyncCopy(IDisk & from_disk, String from_path, IDisk & to_disk, String to_path, Executor & exec, ResultsCollector & results, bool copy_root_dir, const WriteSettings & settings)
{
if (from_disk.isFile(from_path))
{
auto result = exec.execute(
[&from_disk, from_path, &to_disk, to_path]()
[&from_disk, from_path, &to_disk, to_path, &settings]()
{
setThreadName("DiskCopier");
from_disk.copyFile(from_path, to_disk, fs::path(to_path) / fileName(from_path));
from_disk.copyFile(from_path, to_disk, fs::path(to_path) / fileName(from_path), settings);
});
results.push_back(std::move(result));
@@ -80,7 +80,7 @@ void asyncCopy(IDisk & from_disk, String from_path, IDisk & to_disk, String to_p
}
for (auto it = from_disk.iterateDirectory(from_path); it->isValid(); it->next())
asyncCopy(from_disk, it->path(), to_disk, dest, exec, results, true);
asyncCopy(from_disk, it->path(), to_disk, dest, exec, results, true, settings);
}
}
@@ -89,7 +89,12 @@ void IDisk::copyThroughBuffers(const String & from_path, const std::shared_ptr<I
auto & exec = to_disk->getExecutor();
ResultsCollector results;
asyncCopy(*this, from_path, *to_disk, to_path, exec, results, copy_root_dir);
WriteSettings settings;
/// Disable parallel write. We already copy in parallel.
/// Avoid high memory usage. See test_s3_zero_copy_ttl/test.py::test_move_and_s3_memory_usage
settings.s3_allow_parallel_part_upload = false;
asyncCopy(*this, from_path, *to_disk, to_path, exec, results, copy_root_dir, settings);
for (auto & result : results)
result.wait();
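
The hunk above threads a single WriteSettings object through the whole recursive copy: the copy already fans out across the executor, so each per-file write runs with parallel multipart upload switched off to keep peak memory flat. A self-contained sketch of that pattern, assuming simplified stand-in types rather than the real IDisk/Executor interfaces:

#include <future>
#include <iostream>
#include <string>
#include <vector>

/// Stand-in for WriteSettings with only the field relevant to this change.
struct WriteSettingsSketch { bool s3_allow_parallel_part_upload = true; };

void copyOneFile(const std::string & path, const WriteSettingsSketch & settings)
{
    std::cout << "copy " << path << ", parallel part upload: "
              << std::boolalpha << settings.s3_allow_parallel_part_upload << '\n';
}

int main()
{
    const std::vector<std::string> files{"part_0/a.bin", "part_0/b.bin", "part_0/c.bin"};

    WriteSettingsSketch settings;
    settings.s3_allow_parallel_part_upload = false;  /// The copy below is already parallel.

    /// Fan out one task per file, all sharing the same settings, then wait for the results.
    std::vector<std::future<void>> results;
    for (const auto & file : files)
        results.push_back(std::async(std::launch::async, copyOneFile, file, std::cref(settings)));

    for (auto & result : results)
        result.wait();
}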

src/Disks/IDisk.h

@@ -174,7 +174,11 @@ public:
virtual void copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir);
/// Copy file `from_file_path` to `to_file_path` located at `to_disk`.
virtual void copyFile(const String & from_file_path, IDisk & to_disk, const String & to_file_path);
virtual void copyFile( /// NOLINT
const String & from_file_path,
IDisk & to_disk,
const String & to_file_path,
const WriteSettings & settings = {});
/// List files at `path` and add their names to `file_names`
virtual void listFiles(const String & path, std::vector<String> & file_names) const = 0;
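
The /// NOLINT on the new declaration presumably silences the clang-tidy warning about default arguments on virtual functions. That warning exists because a default argument is picked from the static type of the object expression, not its dynamic type, which can surprise callers of an override. A short self-contained illustration with hypothetical types:

#include <iostream>

struct Base
{
    virtual ~Base() = default;
    virtual void copy(int buffer_size = 1) { std::cout << "Base, buffer " << buffer_size << '\n'; }
};

struct Derived : Base
{
    void copy(int buffer_size = 2) override { std::cout << "Derived, buffer " << buffer_size << '\n'; }
};

int main()
{
    Derived derived;
    Base & base = derived;
    base.copy();  /// Prints "Derived, buffer 1": Derived's body runs with Base's default argument.
}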

src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp

@@ -230,6 +230,10 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
throw Exception(ErrorCodes::BAD_ARGUMENTS, "S3 doesn't support append to files");
auto settings_ptr = s3_settings.get();
ScheduleFunc scheduler;
if (write_settings.s3_allow_parallel_part_upload)
scheduler = threadPoolCallbackRunner(getThreadPoolWriter());
auto s3_buffer = std::make_unique<WriteBufferFromS3>(
client.get(),
bucket,
@@ -237,7 +241,7 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
settings_ptr->s3_settings,
attributes,
buf_size,
threadPoolCallbackRunner(getThreadPoolWriter()),
std::move(scheduler),
disk_write_settings);
return std::make_unique<WriteIndirectBufferFromRemoteFS>(
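
The effect of leaving scheduler empty can be sketched in isolation: with no callback runner supplied, each part upload runs inline on the calling thread, trading upload parallelism for a flat memory profile. The sketch below uses simplified stand-ins, not the real WriteBufferFromS3 or ScheduleFunc API:

#include <functional>
#include <future>
#include <iostream>
#include <vector>

int main()
{
    bool allow_parallel_part_upload = false;

    /// Empty by default, like ScheduleFunc in the hunk above: no scheduler means serial uploads.
    std::function<std::future<void>(std::function<void()>)> scheduler;
    if (allow_parallel_part_upload)
        scheduler = [](std::function<void()> task) { return std::async(std::launch::async, std::move(task)); };

    std::vector<std::future<void>> in_flight;
    for (int part = 0; part < 3; ++part)
    {
        auto upload = [part] { std::cout << "upload part " << part << '\n'; };
        if (scheduler)
            in_flight.push_back(scheduler(upload));  /// Scheduled on a background thread.
        else
            upload();                                /// Runs inline, one part at a time.
    }

    for (auto & future : in_flight)
        future.wait();
}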

src/IO/WriteSettings.h

@@ -15,6 +15,7 @@ struct WriteSettings
bool enable_filesystem_cache_on_write_operations = false;
bool enable_filesystem_cache_log = false;
bool is_file_cache_persistent = false;
bool s3_allow_parallel_part_upload = true;
/// Monitoring
bool for_object_storage = false; // to choose which profile events should be incremented

src/Interpreters/Context.cpp

@@ -3443,6 +3443,7 @@ WriteSettings Context::getWriteSettings() const
res.enable_filesystem_cache_on_write_operations = settings.enable_filesystem_cache_on_write_operations;
res.enable_filesystem_cache_log = settings.enable_filesystem_cache_log;
res.s3_allow_parallel_part_upload = settings.s3_allow_parallel_part_upload;
res.remote_throttler = getRemoteWriteThrottler();

tests/integration/test_s3_zero_copy_ttl/test.py

@@ -15,6 +15,11 @@ node3 = cluster.add_instance(
"node3", main_configs=["configs/s3.xml"], with_minio=True, with_zookeeper=True
)
single_node_cluster = ClickHouseCluster(__file__)
small_node = single_node_cluster.add_instance(
"small_node", main_configs=["configs/s3.xml"], with_minio=True
)
@pytest.fixture(scope="module")
def started_cluster():
@@ -92,3 +97,52 @@ def test_ttl_move_and_s3(started_cluster):
print(f"Attempts remaining: {attempt}")
assert counter == 300
@pytest.fixture(scope="module")
def started_single_node_cluster():
try:
single_node_cluster.start()
yield single_node_cluster
finally:
single_node_cluster.shutdown()
def test_move_and_s3_memory_usage(started_single_node_cluster):
if small_node.is_built_with_sanitizer() or small_node.is_debug_build():
pytest.skip("Disabled for debug and sanitizers. Too slow.")
small_node.query(
"CREATE TABLE s3_test_with_ttl (x UInt32, a String codec(NONE), b String codec(NONE), c String codec(NONE), d String codec(NONE), e String codec(NONE)) engine = MergeTree order by x partition by x SETTINGS storage_policy='s3_and_default'"
)
for _ in range(10):
small_node.query(
"insert into s3_test_with_ttl select 0, repeat('a', 100), repeat('b', 100), repeat('c', 100), repeat('d', 100), repeat('e', 100) from zeros(400000) settings max_block_size = 8192, max_insert_block_size=10000000, min_insert_block_size_rows=10000000"
)
# After this, each of the 5 columns holds about 10 * 100 * 400000 bytes ~ 400 MB, i.e. ~2 GB of data in the partition in total
small_node.query("optimize table s3_test_with_ttl final")
small_node.query("system flush logs")
# Take the memory usage from system.metric_log.
# It is easier than specifying a total memory limit (insert queries can hit that limit).
small_node.query("truncate table system.metric_log")
small_node.query(
"alter table s3_test_with_ttl move partition 0 to volume 'external'",
settings={"send_logs_level": "error"},
)
small_node.query("system flush logs")
max_usage = small_node.query(
"select max(CurrentMetric_MemoryTracking) from system.metric_log"
)
# A 3 GB limit is generous, yet it can still be hit with parallel s3 writes enabled.
# The actual value can also be higher because of memory drift.
# Increase it a little if the test starts to fail.
assert int(max_usage) < 3e9
res = small_node.query(
"select * from system.errors where last_error_message like '%Memory limit%' limit 1"
)
assert res == ""