diff --git a/src/Core/Settings.h b/src/Core/Settings.h
index 2db081194f1..5c845b1bd65 100644
--- a/src/Core/Settings.h
+++ b/src/Core/Settings.h
@@ -92,6 +92,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
     M(Bool, s3_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables.", 0) \
     M(Bool, s3_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in s3 engine tables", 0) \
     M(Bool, s3_check_objects_after_upload, false, "Check each uploaded object to s3 with head request to be sure that upload was successful", 0) \
+    M(Bool, s3_allow_parallel_part_upload, true, "Use multiple threads for s3 multipart upload. It may lead to slightly higher memory usage", 0) \
     M(Bool, enable_s3_requests_logging, false, "Enable very explicit logging of S3 requests. Makes sense for debug only.", 0) \
     M(UInt64, hdfs_replication, 0, "The actual number of replications can be specified when the hdfs file is created.", 0) \
     M(Bool, hdfs_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables", 0) \
diff --git a/src/Disks/IDisk.cpp b/src/Disks/IDisk.cpp
index 3704a511478..8a6bea2565b 100644
--- a/src/Disks/IDisk.cpp
+++ b/src/Disks/IDisk.cpp
@@ -24,13 +24,13 @@ bool IDisk::isDirectoryEmpty(const String & path) const
     return !iterateDirectory(path)->isValid();
 }
 
-void IDisk::copyFile(const String & from_file_path, IDisk & to_disk, const String & to_file_path)
+void IDisk::copyFile(const String & from_file_path, IDisk & to_disk, const String & to_file_path, const WriteSettings & settings) /// NOLINT
 {
     LOG_DEBUG(&Poco::Logger::get("IDisk"), "Copying from {} (path: {}) {} to {} (path: {}) {}.",
               getName(), getPath(), from_file_path, to_disk.getName(), to_disk.getPath(), to_file_path);
 
     auto in = readFile(from_file_path);
-    auto out = to_disk.writeFile(to_file_path);
+    auto out = to_disk.writeFile(to_file_path, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, settings);
     copyData(*in, *out);
     out->finalize();
 }
@@ -56,15 +56,15 @@ void IDisk::removeSharedFiles(const RemoveBatchRequest & files, bool keep_all_ba
 
 using ResultsCollector = std::vector<std::future<void>>;
 
-void asyncCopy(IDisk & from_disk, String from_path, IDisk & to_disk, String to_path, Executor & exec, ResultsCollector & results, bool copy_root_dir)
+void asyncCopy(IDisk & from_disk, String from_path, IDisk & to_disk, String to_path, Executor & exec, ResultsCollector & results, bool copy_root_dir, const WriteSettings & settings)
 {
     if (from_disk.isFile(from_path))
     {
         auto result = exec.execute(
-            [&from_disk, from_path, &to_disk, to_path]()
+            [&from_disk, from_path, &to_disk, to_path, &settings]()
             {
                 setThreadName("DiskCopier");
-                from_disk.copyFile(from_path, to_disk, fs::path(to_path) / fileName(from_path));
+                from_disk.copyFile(from_path, to_disk, fs::path(to_path) / fileName(from_path), settings);
             });
 
         results.push_back(std::move(result));
@@ -80,7 +80,7 @@ void asyncCopy(IDisk & from_disk, String from_path, IDisk & to_disk, String to_p
         }
 
         for (auto it = from_disk.iterateDirectory(from_path); it->isValid(); it->next())
-            asyncCopy(from_disk, it->path(), to_disk, dest, exec, results, true);
+            asyncCopy(from_disk, it->path(), to_disk, dest, exec, results, true, settings);
     }
 }
 
@@ -89,7 +89,12 @@ void IDisk::copyThroughBuffers(const String & from_path, const std::shared_ptr<I
 {
     auto & exec = to_disk->getExecutor();
     ResultsCollector results;
-    asyncCopy(*this, from_path, *to_disk, to_path, exec, results, copy_root_dir);
+    WriteSettings settings;
+    /// Disable parallel write. We already copy in parallel.
+    /// Avoid high memory usage. See test_s3_zero_copy_ttl/test.py::test_move_and_s3_memory_usage
+    settings.s3_allow_parallel_part_upload = false;
+
+    asyncCopy(*this, from_path, *to_disk, to_path, exec, results, copy_root_dir, settings);
 
     for (auto & result : results)
         result.wait();
diff --git a/src/Disks/IDisk.h b/src/Disks/IDisk.h
index de7b9181533..c9d7808ddf2 100644
--- a/src/Disks/IDisk.h
+++ b/src/Disks/IDisk.h
@@ -174,7 +174,11 @@ public:
     virtual void copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir);
 
     /// Copy file `from_file_path` to `to_file_path` located at `to_disk`.
-    virtual void copyFile(const String & from_file_path, IDisk & to_disk, const String & to_file_path);
+    virtual void copyFile( /// NOLINT
+        const String & from_file_path,
+        IDisk & to_disk,
+        const String & to_file_path,
+        const WriteSettings & settings = {});
 
     /// List files at `path` and add their names to `file_names`
     virtual void listFiles(const String & path, std::vector<String> & file_names) const = 0;
diff --git a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp
index c2131a51b74..bba8f21e9d5 100644
--- a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp
+++ b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp
@@ -230,6 +230,10 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
         throw Exception(ErrorCodes::BAD_ARGUMENTS, "S3 doesn't support append to files");
 
     auto settings_ptr = s3_settings.get();
+    ScheduleFunc scheduler;
+    if (write_settings.s3_allow_parallel_part_upload)
+        scheduler = threadPoolCallbackRunner(getThreadPoolWriter());
+
     auto s3_buffer = std::make_unique<WriteBufferFromS3>(
         client.get(),
         bucket,
@@ -237,7 +241,7 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
         settings_ptr->s3_settings,
         attributes,
         buf_size,
-        threadPoolCallbackRunner(getThreadPoolWriter()),
+        std::move(scheduler),
         disk_write_settings);
 
     return std::make_unique<WriteIndirectBufferFromRemoteFS>(
diff --git a/src/IO/WriteSettings.h b/src/IO/WriteSettings.h
index 38a706997cf..a1f5b23fb97 100644
--- a/src/IO/WriteSettings.h
+++ b/src/IO/WriteSettings.h
@@ -15,6 +15,7 @@ struct WriteSettings
     bool enable_filesystem_cache_on_write_operations = false;
     bool enable_filesystem_cache_log = false;
     bool is_file_cache_persistent = false;
+    bool s3_allow_parallel_part_upload = true;
 
     /// Monitoring
     bool for_object_storage = false; // to choose which profile events should be incremented
diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp
index d69878e6af0..70ec13b01f8 100644
--- a/src/Interpreters/Context.cpp
+++ b/src/Interpreters/Context.cpp
@@ -3443,6 +3443,7 @@ WriteSettings Context::getWriteSettings() const
 
     res.enable_filesystem_cache_on_write_operations = settings.enable_filesystem_cache_on_write_operations;
     res.enable_filesystem_cache_log = settings.enable_filesystem_cache_log;
+    res.s3_allow_parallel_part_upload = settings.s3_allow_parallel_part_upload;
 
     res.remote_throttler = getRemoteWriteThrottler();
 
diff --git a/tests/integration/test_s3_zero_copy_ttl/test.py b/tests/integration/test_s3_zero_copy_ttl/test.py
index 9a782aacef6..843c216ce72 100644
--- a/tests/integration/test_s3_zero_copy_ttl/test.py
+++ b/tests/integration/test_s3_zero_copy_ttl/test.py
@@ -15,6 +15,11 @@ node3 = cluster.add_instance(
     "node3", main_configs=["configs/s3.xml"], with_minio=True, with_zookeeper=True
 )
 
+single_node_cluster = ClickHouseCluster(__file__)
+small_node = single_node_cluster.add_instance(
+    "small_node", main_configs=["configs/s3.xml"], with_minio=True
+)
+
 
 @pytest.fixture(scope="module")
 def started_cluster():
@@ -92,3 +97,52 @@ def test_ttl_move_and_s3(started_cluster):
         print(f"Attempts remaining: {attempt}")
 
     assert counter == 300
+
+
+@pytest.fixture(scope="module")
+def started_single_node_cluster():
+    try:
+        single_node_cluster.start()
+
+        yield single_node_cluster
+    finally:
+        single_node_cluster.shutdown()
+
+
+def test_move_and_s3_memory_usage(started_single_node_cluster):
+    if small_node.is_built_with_sanitizer() or small_node.is_debug_build():
+        pytest.skip("Disabled for debug and sanitizers. Too slow.")
+
+    small_node.query(
+        "CREATE TABLE s3_test_with_ttl (x UInt32, a String codec(NONE), b String codec(NONE), c String codec(NONE), d String codec(NONE), e String codec(NONE)) engine = MergeTree order by x partition by x SETTINGS storage_policy='s3_and_default'"
+    )
+
+    for _ in range(10):
+        small_node.query(
+            "insert into s3_test_with_ttl select 0, repeat('a', 100), repeat('b', 100), repeat('c', 100), repeat('d', 100), repeat('e', 100) from zeros(400000) settings max_block_size = 8192, max_insert_block_size=10000000, min_insert_block_size_rows=10000000"
+        )
+
+    # After this, we should have 5 columns of 10 * 100 * 400000 ~ 400 MB each; total ~2G of data in the partition
+    small_node.query("optimize table s3_test_with_ttl final")
+
+    small_node.query("system flush logs")
+    # Take the memory usage from metric_log.
+    # It is easier than specifying a total memory limit (insert queries can hit that limit).
+    small_node.query("truncate table system.metric_log")
+
+    small_node.query(
+        "alter table s3_test_with_ttl move partition 0 to volume 'external'",
+        settings={"send_logs_level": "error"},
+    )
+    small_node.query("system flush logs")
+    max_usage = small_node.query(
+        "select max(CurrentMetric_MemoryTracking) from system.metric_log"
+    )
+    # The 3G limit is generous; however, we can hit it anyway with parallel s3 writes enabled.
+    # The actual value can also be bigger because of memory drift.
+    # Increase it a little if the test fails.
+    assert int(max_usage) < 3e9
+    res = small_node.query(
+        "select * from system.errors where last_error_message like '%Memory limit%' limit 1"
+    )
+    assert res == ""
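
Note (not part of the diff above): a minimal usage sketch of the new s3_allow_parallel_part_upload setting, toggled per query from the same integration-test harness. The table names s3_table and source_table are hypothetical; small_node is the instance added in test.py above.

    # Hypothetical sketch: disable parallel multipart upload for a single INSERT,
    # trading upload speed for lower memory usage. Table names are assumptions.
    small_node.query(
        "INSERT INTO s3_table SELECT * FROM source_table",
        settings={"s3_allow_parallel_part_upload": 0},
    )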