Use ThreadPoolCallbackRunnerLocal

This commit is contained in:
Antonio Andelic 2024-05-08 15:05:06 +02:00
parent 16eb12a321
commit 3d5c8db1e0

View File

@ -4,6 +4,7 @@
#include <IO/copyData.h>
#include <Poco/Logger.h>
#include <Interpreters/Context.h>
#include <Common/threadPoolCallbackRunner.h>
#include <Common/logger_useful.h>
#include <Common/setThreadName.h>
#include <Core/ServerUUID.h>
@ -84,16 +85,12 @@ UInt128 IDisk::getEncryptedFileIV(const String &) const
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "File encryption is not implemented for disk of type {}", getDataSourceDescription().type);
}
using ResultsCollector = std::vector<std::future<void>>;
void asyncCopy(
IDisk & from_disk,
String from_path,
IDisk & to_disk,
String to_path,
ThreadPool & pool,
ResultsCollector & results,
ThreadPoolCallbackRunnerLocal<void> & runner,
bool copy_root_dir,
const ReadSettings & read_settings,
const WriteSettings & write_settings,
@ -101,29 +98,11 @@ void asyncCopy(
{
if (from_disk.isFile(from_path))
{
auto promise = std::make_shared<std::promise<void>>();
auto future = promise->get_future();
pool.scheduleOrThrowOnError(
[&from_disk, from_path, &to_disk, to_path, &read_settings, &write_settings, promise, thread_group = CurrentThread::getGroup(), &cancellation_hook]()
{
try
{
SCOPE_EXIT_SAFE(if (thread_group) CurrentThread::detachFromGroupIfNotDetached(););
if (thread_group)
CurrentThread::attachToGroup(thread_group);
from_disk.copyFile(from_path, to_disk, fs::path(to_path) / fileName(from_path), read_settings, write_settings, cancellation_hook);
promise->set_value();
}
catch (...)
{
promise->set_exception(std::current_exception());
}
runner(
[&from_disk, from_path, &to_disk, to_path, &read_settings, &write_settings, &cancellation_hook] {
from_disk.copyFile(
from_path, to_disk, fs::path(to_path) / fileName(from_path), read_settings, write_settings, cancellation_hook);
});
results.push_back(std::move(future));
}
else
{
@ -136,7 +115,7 @@ void asyncCopy(
}
for (auto it = from_disk.iterateDirectory(from_path); it->isValid(); it->next())
asyncCopy(from_disk, it->path(), to_disk, dest, pool, results, true, read_settings, write_settings, cancellation_hook);
asyncCopy(from_disk, it->path(), to_disk, dest, runner, true, read_settings, write_settings, cancellation_hook);
}
}
@ -149,21 +128,16 @@ void IDisk::copyThroughBuffers(
WriteSettings write_settings,
const std::function<void()> & cancellation_hook)
{
ResultsCollector results;
SCOPE_EXIT_SAFE(
for (auto & result : results)
result.wait();
for (auto & result : results)
result.get(); /// May rethrow an exception
);
ThreadPoolCallbackRunnerLocal<void> runner(copying_thread_pool, "AsyncCopy");
/// Disable parallel write. We already copy in parallel.
/// Avoid high memory usage. See test_s3_zero_copy_ttl/test.py::test_move_and_s3_memory_usage
write_settings.s3_allow_parallel_part_upload = false;
write_settings.azure_allow_parallel_part_upload = false;
asyncCopy(*this, from_path, *to_disk, to_path, copying_thread_pool, results, copy_root_dir, read_settings, write_settings, cancellation_hook);
asyncCopy(*this, from_path, *to_disk, to_path, runner, copy_root_dir, read_settings, write_settings, cancellation_hook);
runner.waitForAllToFinishAndRethrowFirstError();
}