Don't allocate Executor instance on each disk->getExecutor() call.

This commit is contained in:
Pavel Kovalenko 2020-08-11 22:08:32 +03:00
parent bece61b1cc
commit ea46ee74ae
5 changed files with 36 additions and 42 deletions

View File

@ -14,4 +14,29 @@ public:
virtual std::future<void> execute(std::function<void()> task) = 0;
};
/// Executes task synchronously in case when disk doesn't support async operations.
class SyncExecutor : public Executor
{
public:
SyncExecutor() = default;
std::future<void> execute(std::function<void()> task) override
{
auto promise = std::make_shared<std::promise<void>>();
try
{
task();
promise->set_value();
}
catch (...)
{
try
{
promise->set_exception(std::current_exception());
}
catch (...) { }
}
return promise->get_future();
}
};
}

View File

@ -59,10 +59,10 @@ void asyncCopy(IDisk & from_disk, String from_path, IDisk & to_disk, String to_p
void IDisk::copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path)
{
auto exec = to_disk->getExecutor();
auto & exec = to_disk->getExecutor();
ResultsCollector results;
asyncCopy(*this, from_path, *to_disk, to_path, *exec, results);
asyncCopy(*this, from_path, *to_disk, to_path, exec, results);
for (auto & result : results)
result.wait();
@ -70,36 +70,6 @@ void IDisk::copy(const String & from_path, const std::shared_ptr<IDisk> & to_dis
result.get();
}
/// Executes task synchronously in case when disk doesn't support async operations.
class SyncExecutor : public Executor
{
public:
SyncExecutor() = default;
std::future<void> execute(std::function<void()> task) override
{
auto promise = std::make_shared<std::promise<void>>();
try
{
task();
promise->set_value();
}
catch (...)
{
try
{
promise->set_exception(std::current_exception());
}
catch (...) { }
}
return promise->get_future();
}
};
std::unique_ptr<Executor> IDisk::getExecutor()
{
return std::make_unique<SyncExecutor>();
}
void IDisk::truncateFile(const String &, size_t)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncate operation is not implemented for disk of type {}", getType());

View File

@ -4,6 +4,7 @@
#include <Core/Types.h>
#include <Common/CurrentMetrics.h>
#include <Common/Exception.h>
#include <Disks/Executor.h>
#include <memory>
#include <mutex>
@ -29,7 +30,6 @@ using Reservations = std::vector<ReservationPtr>;
class ReadBufferFromFileBase;
class WriteBufferFromFileBase;
class Executor;
/**
* Mode of opening a file for write.
@ -67,6 +67,9 @@ using SpacePtr = std::shared_ptr<Space>;
class IDisk : public Space
{
public:
/// Default constructor.
explicit IDisk(std::unique_ptr<Executor> executor_ = std::make_unique<SyncExecutor>()) : executor(std::move(executor_)) { }
/// Root path for all files stored on the disk.
/// It's not required to be a local filesystem path.
virtual const String & getPath() const = 0;
@ -182,7 +185,9 @@ public:
private:
/// Returns executor to perform asynchronous operations.
virtual std::unique_ptr<Executor> getExecutor();
Executor & getExecutor() { return *executor; }
std::unique_ptr<Executor> executor;
};
using DiskPtr = std::shared_ptr<IDisk>;

View File

@ -483,7 +483,8 @@ DiskS3::DiskS3(
size_t min_upload_part_size_,
size_t min_multi_part_upload_size_,
size_t min_bytes_for_seek_)
: name(std::move(name_))
: IDisk(std::make_unique<AsyncExecutor>())
, name(std::move(name_))
, client(std::move(client_))
, proxy_configuration(std::move(proxy_configuration_))
, bucket(std::move(bucket_))
@ -745,9 +746,4 @@ void DiskS3::setReadOnly(const String & path)
Poco::File(metadata_path + path).setReadOnly(true);
}
std::unique_ptr<Executor> DiskS3::getExecutor()
{
return std::make_unique<AsyncExecutor>();
}
}

View File

@ -105,8 +105,6 @@ public:
private:
bool tryReserve(UInt64 bytes);
std::unique_ptr<Executor> getExecutor() override;
private:
const String name;
std::shared_ptr<Aws::S3::S3Client> client;