mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-01 12:01:58 +00:00
Merge pull request #53338 from azat/throttling-fixes
Fix IO throttling during copying whole directories
This commit is contained in:
commit
cf5ea46713
@ -59,7 +59,7 @@ public:
|
|||||||
String relative_path_from = validatePathAndGetAsRelative(path_from);
|
String relative_path_from = validatePathAndGetAsRelative(path_from);
|
||||||
String relative_path_to = validatePathAndGetAsRelative(path_to);
|
String relative_path_to = validatePathAndGetAsRelative(path_to);
|
||||||
|
|
||||||
disk_from->copyDirectoryContent(relative_path_from, disk_to, relative_path_to);
|
disk_from->copyDirectoryContent(relative_path_from, disk_to, relative_path_to, /* settings= */ {});
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@ -324,7 +324,7 @@ ReservationPtr DiskEncrypted::reserve(UInt64 bytes)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void DiskEncrypted::copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir)
|
void DiskEncrypted::copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir, const WriteSettings & settings)
|
||||||
{
|
{
|
||||||
/// Check if we can copy the file without deciphering.
|
/// Check if we can copy the file without deciphering.
|
||||||
if (isSameDiskType(*this, *to_disk))
|
if (isSameDiskType(*this, *to_disk))
|
||||||
@ -340,14 +340,14 @@ void DiskEncrypted::copyDirectoryContent(const String & from_dir, const std::sha
|
|||||||
auto wrapped_from_path = wrappedPath(from_dir);
|
auto wrapped_from_path = wrappedPath(from_dir);
|
||||||
auto to_delegate = to_disk_enc->delegate;
|
auto to_delegate = to_disk_enc->delegate;
|
||||||
auto wrapped_to_path = to_disk_enc->wrappedPath(to_dir);
|
auto wrapped_to_path = to_disk_enc->wrappedPath(to_dir);
|
||||||
delegate->copyDirectoryContent(wrapped_from_path, to_delegate, wrapped_to_path);
|
delegate->copyDirectoryContent(wrapped_from_path, to_delegate, wrapped_to_path, settings);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Copy the file through buffers with deciphering.
|
/// Copy the file through buffers with deciphering.
|
||||||
IDisk::copyDirectoryContent(from_dir, to_disk, to_dir);
|
IDisk::copyDirectoryContent(from_dir, to_disk, to_dir, settings);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::unique_ptr<ReadBufferFromFileBase> DiskEncrypted::readFile(
|
std::unique_ptr<ReadBufferFromFileBase> DiskEncrypted::readFile(
|
||||||
|
@ -112,7 +112,7 @@ public:
|
|||||||
delegate->listFiles(wrapped_path, file_names);
|
delegate->listFiles(wrapped_path, file_names);
|
||||||
}
|
}
|
||||||
|
|
||||||
void copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir) override;
|
void copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir, const WriteSettings & settings) override;
|
||||||
|
|
||||||
std::unique_ptr<ReadBufferFromFileBase> readFile(
|
std::unique_ptr<ReadBufferFromFileBase> readFile(
|
||||||
const String & path,
|
const String & path,
|
||||||
|
@ -53,11 +53,11 @@ String DiskEncryptedSettings::findKeyByFingerprint(UInt128 key_fingerprint, cons
|
|||||||
return it->second;
|
return it->second;
|
||||||
}
|
}
|
||||||
|
|
||||||
void DiskEncryptedTransaction::copyFile(const std::string & from_file_path, const std::string & to_file_path)
|
void DiskEncryptedTransaction::copyFile(const std::string & from_file_path, const std::string & to_file_path, const WriteSettings & settings)
|
||||||
{
|
{
|
||||||
auto wrapped_from_path = wrappedPath(from_file_path);
|
auto wrapped_from_path = wrappedPath(from_file_path);
|
||||||
auto wrapped_to_path = wrappedPath(to_file_path);
|
auto wrapped_to_path = wrappedPath(to_file_path);
|
||||||
delegate_transaction->copyFile(wrapped_from_path, wrapped_to_path);
|
delegate_transaction->copyFile(wrapped_from_path, wrapped_to_path, settings);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::unique_ptr<WriteBufferFromFileBase> DiskEncryptedTransaction::writeFile( // NOLINT
|
std::unique_ptr<WriteBufferFromFileBase> DiskEncryptedTransaction::writeFile( // NOLINT
|
||||||
|
@ -116,7 +116,7 @@ public:
|
|||||||
/// but it's impossible to implement correctly in transactions because other disk can
|
/// but it's impossible to implement correctly in transactions because other disk can
|
||||||
/// use different metadata storage.
|
/// use different metadata storage.
|
||||||
/// TODO: maybe remove it at all, we don't want copies
|
/// TODO: maybe remove it at all, we don't want copies
|
||||||
void copyFile(const std::string & from_file_path, const std::string & to_file_path) override;
|
void copyFile(const std::string & from_file_path, const std::string & to_file_path, const WriteSettings & settings) override;
|
||||||
|
|
||||||
/// Open the file for write and return WriteBufferFromFileBase object.
|
/// Open the file for write and return WriteBufferFromFileBase object.
|
||||||
std::unique_ptr<WriteBufferFromFileBase> writeFile( /// NOLINT
|
std::unique_ptr<WriteBufferFromFileBase> writeFile( /// NOLINT
|
||||||
|
@ -432,12 +432,13 @@ bool inline isSameDiskType(const IDisk & one, const IDisk & another)
|
|||||||
return typeid(one) == typeid(another);
|
return typeid(one) == typeid(another);
|
||||||
}
|
}
|
||||||
|
|
||||||
void DiskLocal::copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir)
|
void DiskLocal::copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir, const WriteSettings & settings)
|
||||||
{
|
{
|
||||||
if (isSameDiskType(*this, *to_disk))
|
/// If throttling was configured we cannot use copying directly.
|
||||||
|
if (isSameDiskType(*this, *to_disk) && !settings.local_throttler)
|
||||||
fs::copy(fs::path(disk_path) / from_dir, fs::path(to_disk->getPath()) / to_dir, fs::copy_options::recursive | fs::copy_options::overwrite_existing); /// Use more optimal way.
|
fs::copy(fs::path(disk_path) / from_dir, fs::path(to_disk->getPath()) / to_dir, fs::copy_options::recursive | fs::copy_options::overwrite_existing); /// Use more optimal way.
|
||||||
else
|
else
|
||||||
IDisk::copyDirectoryContent(from_dir, to_disk, to_dir);
|
IDisk::copyDirectoryContent(from_dir, to_disk, to_dir, settings);
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncGuardPtr DiskLocal::getDirectorySyncGuard(const String & path) const
|
SyncGuardPtr DiskLocal::getDirectorySyncGuard(const String & path) const
|
||||||
|
@ -65,7 +65,7 @@ public:
|
|||||||
|
|
||||||
void replaceFile(const String & from_path, const String & to_path) override;
|
void replaceFile(const String & from_path, const String & to_path) override;
|
||||||
|
|
||||||
void copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir) override;
|
void copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir, const WriteSettings & settings) override;
|
||||||
|
|
||||||
void listFiles(const String & path, std::vector<String> & file_names) const override;
|
void listFiles(const String & path, std::vector<String> & file_names) const override;
|
||||||
|
|
||||||
|
@ -54,9 +54,9 @@ public:
|
|||||||
disk.replaceFile(from_path, to_path);
|
disk.replaceFile(from_path, to_path);
|
||||||
}
|
}
|
||||||
|
|
||||||
void copyFile(const std::string & from_file_path, const std::string & to_file_path) override
|
void copyFile(const std::string & from_file_path, const std::string & to_file_path, const WriteSettings & settings) override
|
||||||
{
|
{
|
||||||
disk.copyFile(from_file_path, disk, to_file_path);
|
disk.copyFile(from_file_path, disk, to_file_path, settings);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::unique_ptr<WriteBufferFromFileBase> writeFile( /// NOLINT
|
std::unique_ptr<WriteBufferFromFileBase> writeFile( /// NOLINT
|
||||||
|
@ -3,6 +3,7 @@
|
|||||||
#include <IO/WriteBufferFromFileBase.h>
|
#include <IO/WriteBufferFromFileBase.h>
|
||||||
#include <IO/copyData.h>
|
#include <IO/copyData.h>
|
||||||
#include <Poco/Logger.h>
|
#include <Poco/Logger.h>
|
||||||
|
#include <Interpreters/Context.h>
|
||||||
#include <Common/logger_useful.h>
|
#include <Common/logger_useful.h>
|
||||||
#include <Common/setThreadName.h>
|
#include <Common/setThreadName.h>
|
||||||
#include <Core/ServerUUID.h>
|
#include <Core/ServerUUID.h>
|
||||||
@ -122,11 +123,10 @@ void asyncCopy(IDisk & from_disk, String from_path, IDisk & to_disk, String to_p
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void IDisk::copyThroughBuffers(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path, bool copy_root_dir)
|
void IDisk::copyThroughBuffers(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path, bool copy_root_dir, WriteSettings settings)
|
||||||
{
|
{
|
||||||
ResultsCollector results;
|
ResultsCollector results;
|
||||||
|
|
||||||
WriteSettings settings;
|
|
||||||
/// Disable parallel write. We already copy in parallel.
|
/// Disable parallel write. We already copy in parallel.
|
||||||
/// Avoid high memory usage. See test_s3_zero_copy_ttl/test.py::test_move_and_s3_memory_usage
|
/// Avoid high memory usage. See test_s3_zero_copy_ttl/test.py::test_move_and_s3_memory_usage
|
||||||
settings.s3_allow_parallel_part_upload = false;
|
settings.s3_allow_parallel_part_upload = false;
|
||||||
@ -140,12 +140,12 @@ void IDisk::copyThroughBuffers(const String & from_path, const std::shared_ptr<I
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void IDisk::copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir)
|
void IDisk::copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir, const WriteSettings & settings)
|
||||||
{
|
{
|
||||||
if (!to_disk->exists(to_dir))
|
if (!to_disk->exists(to_dir))
|
||||||
to_disk->createDirectories(to_dir);
|
to_disk->createDirectories(to_dir);
|
||||||
|
|
||||||
copyThroughBuffers(from_dir, to_disk, to_dir, /* copy_root_dir */ false);
|
copyThroughBuffers(from_dir, to_disk, to_dir, /* copy_root_dir= */ false, settings);
|
||||||
}
|
}
|
||||||
|
|
||||||
void IDisk::truncateFile(const String &, size_t)
|
void IDisk::truncateFile(const String &, size_t)
|
||||||
|
@ -193,7 +193,7 @@ public:
|
|||||||
virtual void replaceFile(const String & from_path, const String & to_path) = 0;
|
virtual void replaceFile(const String & from_path, const String & to_path) = 0;
|
||||||
|
|
||||||
/// Recursively copy files from from_dir to to_dir. Create to_dir if not exists.
|
/// Recursively copy files from from_dir to to_dir. Create to_dir if not exists.
|
||||||
virtual void copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir);
|
virtual void copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir, const WriteSettings & settings);
|
||||||
|
|
||||||
/// Copy file `from_file_path` to `to_file_path` located at `to_disk`.
|
/// Copy file `from_file_path` to `to_file_path` located at `to_disk`.
|
||||||
virtual void copyFile( /// NOLINT
|
virtual void copyFile( /// NOLINT
|
||||||
@ -470,7 +470,7 @@ protected:
|
|||||||
/// Base implementation of the function copy().
|
/// Base implementation of the function copy().
|
||||||
/// It just opens two files, reads data by portions from the first file, and writes it to the second one.
|
/// It just opens two files, reads data by portions from the first file, and writes it to the second one.
|
||||||
/// A derived class may override copy() to provide a faster implementation.
|
/// A derived class may override copy() to provide a faster implementation.
|
||||||
void copyThroughBuffers(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path, bool copy_root_dir = true);
|
void copyThroughBuffers(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path, bool copy_root_dir, WriteSettings settings);
|
||||||
|
|
||||||
virtual void checkAccessImpl(const String & path);
|
virtual void checkAccessImpl(const String & path);
|
||||||
|
|
||||||
|
@ -59,7 +59,7 @@ public:
|
|||||||
/// but it's impossible to implement correctly in transactions because other disk can
|
/// but it's impossible to implement correctly in transactions because other disk can
|
||||||
/// use different metadata storage.
|
/// use different metadata storage.
|
||||||
/// TODO: maybe remove it at all, we don't want copies
|
/// TODO: maybe remove it at all, we don't want copies
|
||||||
virtual void copyFile(const std::string & from_file_path, const std::string & to_file_path) = 0;
|
virtual void copyFile(const std::string & from_file_path, const std::string & to_file_path, const WriteSettings & settings = {}) = 0;
|
||||||
|
|
||||||
/// Open the file for write and return WriteBufferFromFileBase object.
|
/// Open the file for write and return WriteBufferFromFileBase object.
|
||||||
virtual std::unique_ptr<WriteBufferFromFileBase> writeFile( /// NOLINT
|
virtual std::unique_ptr<WriteBufferFromFileBase> writeFile( /// NOLINT
|
||||||
|
@ -5,6 +5,7 @@
|
|||||||
#include <ranges>
|
#include <ranges>
|
||||||
#include <Common/logger_useful.h>
|
#include <Common/logger_useful.h>
|
||||||
#include <Common/Exception.h>
|
#include <Common/Exception.h>
|
||||||
|
#include <base/defines.h>
|
||||||
|
|
||||||
#include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
|
#include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
|
||||||
|
|
||||||
@ -769,8 +770,11 @@ void DiskObjectStorageTransaction::createFile(const std::string & path)
|
|||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
void DiskObjectStorageTransaction::copyFile(const std::string & from_file_path, const std::string & to_file_path)
|
void DiskObjectStorageTransaction::copyFile(const std::string & from_file_path, const std::string & to_file_path, const WriteSettings & settings)
|
||||||
{
|
{
|
||||||
|
/// NOTE: For native copy we can ignore throttling, so no need to use WriteSettings
|
||||||
|
UNUSED(settings);
|
||||||
|
|
||||||
operations_to_execute.emplace_back(
|
operations_to_execute.emplace_back(
|
||||||
std::make_unique<CopyFileObjectStorageOperation>(object_storage, metadata_storage, from_file_path, to_file_path));
|
std::make_unique<CopyFileObjectStorageOperation>(object_storage, metadata_storage, from_file_path, to_file_path));
|
||||||
}
|
}
|
||||||
|
@ -86,7 +86,7 @@ public:
|
|||||||
|
|
||||||
void createFile(const String & path) override;
|
void createFile(const String & path) override;
|
||||||
|
|
||||||
void copyFile(const std::string & from_file_path, const std::string & to_file_path) override;
|
void copyFile(const std::string & from_file_path, const std::string & to_file_path, const WriteSettings & settings) override;
|
||||||
|
|
||||||
/// writeFile is a difficult function for transactions.
|
/// writeFile is a difficult function for transactions.
|
||||||
/// Now it's almost noop because metadata added to transaction in finalize method
|
/// Now it's almost noop because metadata added to transaction in finalize method
|
||||||
|
@ -416,6 +416,7 @@ void DataPartStorageOnDiskBase::backup(
|
|||||||
MutableDataPartStoragePtr DataPartStorageOnDiskBase::freeze(
|
MutableDataPartStoragePtr DataPartStorageOnDiskBase::freeze(
|
||||||
const std::string & to,
|
const std::string & to,
|
||||||
const std::string & dir_path,
|
const std::string & dir_path,
|
||||||
|
const WriteSettings & settings,
|
||||||
std::function<void(const DiskPtr &)> save_metadata_callback,
|
std::function<void(const DiskPtr &)> save_metadata_callback,
|
||||||
const ClonePartParams & params) const
|
const ClonePartParams & params) const
|
||||||
{
|
{
|
||||||
@ -425,8 +426,16 @@ MutableDataPartStoragePtr DataPartStorageOnDiskBase::freeze(
|
|||||||
else
|
else
|
||||||
disk->createDirectories(to);
|
disk->createDirectories(to);
|
||||||
|
|
||||||
localBackup(disk, getRelativePath(), fs::path(to) / dir_path, params.make_source_readonly, {}, params.copy_instead_of_hardlink,
|
localBackup(
|
||||||
params.files_to_copy_instead_of_hardlinks, params.external_transaction);
|
disk,
|
||||||
|
getRelativePath(),
|
||||||
|
fs::path(to) / dir_path,
|
||||||
|
settings,
|
||||||
|
params.make_source_readonly,
|
||||||
|
/* max_level= */ {},
|
||||||
|
params.copy_instead_of_hardlink,
|
||||||
|
params.files_to_copy_instead_of_hardlinks,
|
||||||
|
params.external_transaction);
|
||||||
|
|
||||||
if (save_metadata_callback)
|
if (save_metadata_callback)
|
||||||
save_metadata_callback(disk);
|
save_metadata_callback(disk);
|
||||||
@ -457,6 +466,7 @@ MutableDataPartStoragePtr DataPartStorageOnDiskBase::clonePart(
|
|||||||
const std::string & to,
|
const std::string & to,
|
||||||
const std::string & dir_path,
|
const std::string & dir_path,
|
||||||
const DiskPtr & dst_disk,
|
const DiskPtr & dst_disk,
|
||||||
|
const WriteSettings & write_settings,
|
||||||
Poco::Logger * log) const
|
Poco::Logger * log) const
|
||||||
{
|
{
|
||||||
String path_to_clone = fs::path(to) / dir_path / "";
|
String path_to_clone = fs::path(to) / dir_path / "";
|
||||||
@ -472,7 +482,7 @@ MutableDataPartStoragePtr DataPartStorageOnDiskBase::clonePart(
|
|||||||
try
|
try
|
||||||
{
|
{
|
||||||
dst_disk->createDirectories(to);
|
dst_disk->createDirectories(to);
|
||||||
src_disk->copyDirectoryContent(getRelativePath(), dst_disk, path_to_clone);
|
src_disk->copyDirectoryContent(getRelativePath(), dst_disk, path_to_clone, write_settings);
|
||||||
}
|
}
|
||||||
catch (...)
|
catch (...)
|
||||||
{
|
{
|
||||||
|
@ -63,6 +63,7 @@ public:
|
|||||||
MutableDataPartStoragePtr freeze(
|
MutableDataPartStoragePtr freeze(
|
||||||
const std::string & to,
|
const std::string & to,
|
||||||
const std::string & dir_path,
|
const std::string & dir_path,
|
||||||
|
const WriteSettings & settings,
|
||||||
std::function<void(const DiskPtr &)> save_metadata_callback,
|
std::function<void(const DiskPtr &)> save_metadata_callback,
|
||||||
const ClonePartParams & params) const override;
|
const ClonePartParams & params) const override;
|
||||||
|
|
||||||
@ -70,6 +71,7 @@ public:
|
|||||||
const std::string & to,
|
const std::string & to,
|
||||||
const std::string & dir_path,
|
const std::string & dir_path,
|
||||||
const DiskPtr & dst_disk,
|
const DiskPtr & dst_disk,
|
||||||
|
const WriteSettings & write_settings,
|
||||||
Poco::Logger * log) const override;
|
Poco::Logger * log) const override;
|
||||||
|
|
||||||
void rename(
|
void rename(
|
||||||
|
@ -252,6 +252,7 @@ public:
|
|||||||
virtual std::shared_ptr<IDataPartStorage> freeze(
|
virtual std::shared_ptr<IDataPartStorage> freeze(
|
||||||
const std::string & to,
|
const std::string & to,
|
||||||
const std::string & dir_path,
|
const std::string & dir_path,
|
||||||
|
const WriteSettings & settings,
|
||||||
std::function<void(const DiskPtr &)> save_metadata_callback,
|
std::function<void(const DiskPtr &)> save_metadata_callback,
|
||||||
const ClonePartParams & params) const = 0;
|
const ClonePartParams & params) const = 0;
|
||||||
|
|
||||||
@ -260,6 +261,7 @@ public:
|
|||||||
const std::string & to,
|
const std::string & to,
|
||||||
const std::string & dir_path,
|
const std::string & dir_path,
|
||||||
const DiskPtr & disk,
|
const DiskPtr & disk,
|
||||||
|
const WriteSettings & write_settings,
|
||||||
Poco::Logger * log) const = 0;
|
Poco::Logger * log) const = 0;
|
||||||
|
|
||||||
/// Change part's root. from_root should be a prefix path of current root path.
|
/// Change part's root. from_root should be a prefix path of current root path.
|
||||||
|
@ -1802,11 +1802,12 @@ DataPartStoragePtr IMergeTreeDataPart::makeCloneInDetached(const String & prefix
|
|||||||
return getDataPartStorage().freeze(
|
return getDataPartStorage().freeze(
|
||||||
storage.relative_data_path,
|
storage.relative_data_path,
|
||||||
*maybe_path_in_detached,
|
*maybe_path_in_detached,
|
||||||
|
Context::getGlobalContextInstance()->getWriteSettings(),
|
||||||
/* save_metadata_callback= */ {},
|
/* save_metadata_callback= */ {},
|
||||||
params);
|
params);
|
||||||
}
|
}
|
||||||
|
|
||||||
MutableDataPartStoragePtr IMergeTreeDataPart::makeCloneOnDisk(const DiskPtr & disk, const String & directory_name) const
|
MutableDataPartStoragePtr IMergeTreeDataPart::makeCloneOnDisk(const DiskPtr & disk, const String & directory_name, const WriteSettings & write_settings) const
|
||||||
{
|
{
|
||||||
assertOnDisk();
|
assertOnDisk();
|
||||||
|
|
||||||
@ -1816,7 +1817,7 @@ MutableDataPartStoragePtr IMergeTreeDataPart::makeCloneOnDisk(const DiskPtr & di
|
|||||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can not clone data part {} to empty directory.", name);
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can not clone data part {} to empty directory.", name);
|
||||||
|
|
||||||
String path_to_clone = fs::path(storage.relative_data_path) / directory_name / "";
|
String path_to_clone = fs::path(storage.relative_data_path) / directory_name / "";
|
||||||
return getDataPartStorage().clonePart(path_to_clone, getDataPartStorage().getPartDirectory(), disk, storage.log);
|
return getDataPartStorage().clonePart(path_to_clone, getDataPartStorage().getPartDirectory(), disk, write_settings, storage.log);
|
||||||
}
|
}
|
||||||
|
|
||||||
UInt64 IMergeTreeDataPart::getIndexSizeFromFile() const
|
UInt64 IMergeTreeDataPart::getIndexSizeFromFile() const
|
||||||
|
@ -377,7 +377,7 @@ public:
|
|||||||
const DiskTransactionPtr & disk_transaction) const;
|
const DiskTransactionPtr & disk_transaction) const;
|
||||||
|
|
||||||
/// Makes full clone of part in specified subdirectory (relative to storage data directory, e.g. "detached") on another disk
|
/// Makes full clone of part in specified subdirectory (relative to storage data directory, e.g. "detached") on another disk
|
||||||
MutableDataPartStoragePtr makeCloneOnDisk(const DiskPtr & disk, const String & directory_name) const;
|
MutableDataPartStoragePtr makeCloneOnDisk(const DiskPtr & disk, const String & directory_name, const WriteSettings & write_settings) const;
|
||||||
|
|
||||||
/// Checks that .bin and .mrk files exist.
|
/// Checks that .bin and .mrk files exist.
|
||||||
///
|
///
|
||||||
|
@ -4985,7 +4985,7 @@ void MergeTreeData::movePartitionToDisk(const ASTPtr & partition, const String &
|
|||||||
throw Exception(ErrorCodes::UNKNOWN_DISK, "All parts of partition '{}' are already on disk '{}'", partition_id, disk->getName());
|
throw Exception(ErrorCodes::UNKNOWN_DISK, "All parts of partition '{}' are already on disk '{}'", partition_id, disk->getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
MovePartsOutcome moves_outcome = movePartsToSpace(parts, std::static_pointer_cast<Space>(disk));
|
MovePartsOutcome moves_outcome = movePartsToSpace(parts, std::static_pointer_cast<Space>(disk), local_context->getWriteSettings());
|
||||||
switch (moves_outcome)
|
switch (moves_outcome)
|
||||||
{
|
{
|
||||||
case MovePartsOutcome::MovesAreCancelled:
|
case MovePartsOutcome::MovesAreCancelled:
|
||||||
@ -5048,7 +5048,7 @@ void MergeTreeData::movePartitionToVolume(const ASTPtr & partition, const String
|
|||||||
throw Exception(ErrorCodes::UNKNOWN_DISK, "All parts of partition '{}' are already on volume '{}'", partition_id, volume->getName());
|
throw Exception(ErrorCodes::UNKNOWN_DISK, "All parts of partition '{}' are already on volume '{}'", partition_id, volume->getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
MovePartsOutcome moves_outcome = movePartsToSpace(parts, std::static_pointer_cast<Space>(volume));
|
MovePartsOutcome moves_outcome = movePartsToSpace(parts, std::static_pointer_cast<Space>(volume), local_context->getWriteSettings());
|
||||||
switch (moves_outcome)
|
switch (moves_outcome)
|
||||||
{
|
{
|
||||||
case MovePartsOutcome::MovesAreCancelled:
|
case MovePartsOutcome::MovesAreCancelled:
|
||||||
@ -7401,7 +7401,8 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> MergeTreeData::cloneAn
|
|||||||
const String & tmp_part_prefix,
|
const String & tmp_part_prefix,
|
||||||
const MergeTreePartInfo & dst_part_info,
|
const MergeTreePartInfo & dst_part_info,
|
||||||
const StorageMetadataPtr & metadata_snapshot,
|
const StorageMetadataPtr & metadata_snapshot,
|
||||||
const IDataPartStorage::ClonePartParams & params)
|
const IDataPartStorage::ClonePartParams & params,
|
||||||
|
const WriteSettings & write_settings)
|
||||||
{
|
{
|
||||||
/// Check that the storage policy contains the disk where the src_part is located.
|
/// Check that the storage policy contains the disk where the src_part is located.
|
||||||
bool does_storage_policy_allow_same_disk = false;
|
bool does_storage_policy_allow_same_disk = false;
|
||||||
@ -7458,6 +7459,7 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> MergeTreeData::cloneAn
|
|||||||
auto dst_part_storage = src_part_storage->freeze(
|
auto dst_part_storage = src_part_storage->freeze(
|
||||||
relative_data_path,
|
relative_data_path,
|
||||||
tmp_dst_part_name,
|
tmp_dst_part_name,
|
||||||
|
write_settings,
|
||||||
/* save_metadata_callback= */ {},
|
/* save_metadata_callback= */ {},
|
||||||
params);
|
params);
|
||||||
|
|
||||||
@ -7715,6 +7717,7 @@ PartitionCommandsResultInfo MergeTreeData::freezePartitionsByMatcher(
|
|||||||
auto new_storage = data_part_storage->freeze(
|
auto new_storage = data_part_storage->freeze(
|
||||||
backup_part_path,
|
backup_part_path,
|
||||||
part->getDataPartStorage().getPartDirectory(),
|
part->getDataPartStorage().getPartDirectory(),
|
||||||
|
local_context->getWriteSettings(),
|
||||||
callback,
|
callback,
|
||||||
params);
|
params);
|
||||||
|
|
||||||
@ -7913,7 +7916,8 @@ bool MergeTreeData::scheduleDataMovingJob(BackgroundJobsAssignee & assignee)
|
|||||||
assignee.scheduleMoveTask(std::make_shared<ExecutableLambdaAdapter>(
|
assignee.scheduleMoveTask(std::make_shared<ExecutableLambdaAdapter>(
|
||||||
[this, moving_tagger] () mutable
|
[this, moving_tagger] () mutable
|
||||||
{
|
{
|
||||||
return moveParts(moving_tagger) == MovePartsOutcome::PartsMoved;
|
WriteSettings write_settings = Context::getGlobalContextInstance()->getWriteSettings();
|
||||||
|
return moveParts(moving_tagger, write_settings, /* wait_for_move_if_zero_copy= */ false) == MovePartsOutcome::PartsMoved;
|
||||||
}, moves_assignee_trigger, getStorageID()));
|
}, moves_assignee_trigger, getStorageID()));
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -7928,7 +7932,7 @@ bool MergeTreeData::areBackgroundMovesNeeded() const
|
|||||||
return policy->getVolumes().size() == 1 && policy->getVolumes()[0]->getDisks().size() > 1;
|
return policy->getVolumes().size() == 1 && policy->getVolumes()[0]->getDisks().size() > 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
MovePartsOutcome MergeTreeData::movePartsToSpace(const DataPartsVector & parts, SpacePtr space)
|
MovePartsOutcome MergeTreeData::movePartsToSpace(const DataPartsVector & parts, SpacePtr space, const WriteSettings & write_settings)
|
||||||
{
|
{
|
||||||
if (parts_mover.moves_blocker.isCancelled())
|
if (parts_mover.moves_blocker.isCancelled())
|
||||||
return MovePartsOutcome::MovesAreCancelled;
|
return MovePartsOutcome::MovesAreCancelled;
|
||||||
@ -7937,7 +7941,7 @@ MovePartsOutcome MergeTreeData::movePartsToSpace(const DataPartsVector & parts,
|
|||||||
if (moving_tagger->parts_to_move.empty())
|
if (moving_tagger->parts_to_move.empty())
|
||||||
return MovePartsOutcome::NothingToMove;
|
return MovePartsOutcome::NothingToMove;
|
||||||
|
|
||||||
return moveParts(moving_tagger, true);
|
return moveParts(moving_tagger, write_settings, /* wait_for_move_if_zero_copy= */ true);
|
||||||
}
|
}
|
||||||
|
|
||||||
MergeTreeData::CurrentlyMovingPartsTaggerPtr MergeTreeData::selectPartsForMove()
|
MergeTreeData::CurrentlyMovingPartsTaggerPtr MergeTreeData::selectPartsForMove()
|
||||||
@ -7992,7 +7996,7 @@ MergeTreeData::CurrentlyMovingPartsTaggerPtr MergeTreeData::checkPartsForMove(co
|
|||||||
return std::make_shared<CurrentlyMovingPartsTagger>(std::move(parts_to_move), *this);
|
return std::make_shared<CurrentlyMovingPartsTagger>(std::move(parts_to_move), *this);
|
||||||
}
|
}
|
||||||
|
|
||||||
MovePartsOutcome MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagger, bool wait_for_move_if_zero_copy)
|
MovePartsOutcome MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagger, const WriteSettings & write_settings, bool wait_for_move_if_zero_copy)
|
||||||
{
|
{
|
||||||
LOG_INFO(log, "Got {} parts to move.", moving_tagger->parts_to_move.size());
|
LOG_INFO(log, "Got {} parts to move.", moving_tagger->parts_to_move.size());
|
||||||
|
|
||||||
@ -8053,7 +8057,7 @@ MovePartsOutcome MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr &
|
|||||||
{
|
{
|
||||||
if (lock->isLocked())
|
if (lock->isLocked())
|
||||||
{
|
{
|
||||||
cloned_part = parts_mover.clonePart(moving_part);
|
cloned_part = parts_mover.clonePart(moving_part, write_settings);
|
||||||
parts_mover.swapClonedPart(cloned_part);
|
parts_mover.swapClonedPart(cloned_part);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -8080,7 +8084,7 @@ MovePartsOutcome MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr &
|
|||||||
}
|
}
|
||||||
else /// Ordinary move as it should be
|
else /// Ordinary move as it should be
|
||||||
{
|
{
|
||||||
cloned_part = parts_mover.clonePart(moving_part);
|
cloned_part = parts_mover.clonePart(moving_part, write_settings);
|
||||||
parts_mover.swapClonedPart(cloned_part);
|
parts_mover.swapClonedPart(cloned_part);
|
||||||
}
|
}
|
||||||
write_part_log({});
|
write_part_log({});
|
||||||
|
@ -63,6 +63,8 @@ using BackupEntries = std::vector<std::pair<String, std::shared_ptr<const IBacku
|
|||||||
class MergeTreeTransaction;
|
class MergeTreeTransaction;
|
||||||
using MergeTreeTransactionPtr = std::shared_ptr<MergeTreeTransaction>;
|
using MergeTreeTransactionPtr = std::shared_ptr<MergeTreeTransaction>;
|
||||||
|
|
||||||
|
struct WriteSettings;
|
||||||
|
|
||||||
/// Auxiliary struct holding information about the future merged or mutated part.
|
/// Auxiliary struct holding information about the future merged or mutated part.
|
||||||
struct EmergingPartInfo
|
struct EmergingPartInfo
|
||||||
{
|
{
|
||||||
@ -841,9 +843,12 @@ public:
|
|||||||
MergeTreeData & checkStructureAndGetMergeTreeData(IStorage & source_table, const StorageMetadataPtr & src_snapshot, const StorageMetadataPtr & my_snapshot) const;
|
MergeTreeData & checkStructureAndGetMergeTreeData(IStorage & source_table, const StorageMetadataPtr & src_snapshot, const StorageMetadataPtr & my_snapshot) const;
|
||||||
|
|
||||||
std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> cloneAndLoadDataPartOnSameDisk(
|
std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> cloneAndLoadDataPartOnSameDisk(
|
||||||
const MergeTreeData::DataPartPtr & src_part, const String & tmp_part_prefix,
|
const MergeTreeData::DataPartPtr & src_part,
|
||||||
const MergeTreePartInfo & dst_part_info, const StorageMetadataPtr & metadata_snapshot,
|
const String & tmp_part_prefix,
|
||||||
const IDataPartStorage::ClonePartParams & params);
|
const MergeTreePartInfo & dst_part_info,
|
||||||
|
const StorageMetadataPtr & metadata_snapshot,
|
||||||
|
const IDataPartStorage::ClonePartParams & params,
|
||||||
|
const WriteSettings & write_settings);
|
||||||
|
|
||||||
virtual std::vector<MergeTreeMutationStatus> getMutationsStatus() const = 0;
|
virtual std::vector<MergeTreeMutationStatus> getMutationsStatus() const = 0;
|
||||||
|
|
||||||
@ -1336,7 +1341,7 @@ protected:
|
|||||||
/// MergeTree because they store mutations in different way.
|
/// MergeTree because they store mutations in different way.
|
||||||
virtual std::map<int64_t, MutationCommands> getAlterMutationCommandsForPart(const DataPartPtr & part) const = 0;
|
virtual std::map<int64_t, MutationCommands> getAlterMutationCommandsForPart(const DataPartPtr & part) const = 0;
|
||||||
/// Moves part to specified space, used in ALTER ... MOVE ... queries
|
/// Moves part to specified space, used in ALTER ... MOVE ... queries
|
||||||
MovePartsOutcome movePartsToSpace(const DataPartsVector & parts, SpacePtr space);
|
MovePartsOutcome movePartsToSpace(const DataPartsVector & parts, SpacePtr space, const WriteSettings & write_settings);
|
||||||
|
|
||||||
struct PartBackupEntries
|
struct PartBackupEntries
|
||||||
{
|
{
|
||||||
@ -1489,7 +1494,7 @@ private:
|
|||||||
using CurrentlyMovingPartsTaggerPtr = std::shared_ptr<CurrentlyMovingPartsTagger>;
|
using CurrentlyMovingPartsTaggerPtr = std::shared_ptr<CurrentlyMovingPartsTagger>;
|
||||||
|
|
||||||
/// Move selected parts to corresponding disks
|
/// Move selected parts to corresponding disks
|
||||||
MovePartsOutcome moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagger, bool wait_for_move_if_zero_copy=false);
|
MovePartsOutcome moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagger, const WriteSettings & write_settings, bool wait_for_move_if_zero_copy);
|
||||||
|
|
||||||
/// Select parts for move and disks for them. Used in background moving processes.
|
/// Select parts for move and disks for them. Used in background moving processes.
|
||||||
CurrentlyMovingPartsTaggerPtr selectPartsForMove();
|
CurrentlyMovingPartsTaggerPtr selectPartsForMove();
|
||||||
|
@ -208,7 +208,7 @@ bool MergeTreePartsMover::selectPartsForMove(
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const MergeTreeMoveEntry & moving_part) const
|
MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const MergeTreeMoveEntry & moving_part, const WriteSettings & write_settings) const
|
||||||
{
|
{
|
||||||
if (moves_blocker.isCancelled())
|
if (moves_blocker.isCancelled())
|
||||||
throw Exception(ErrorCodes::ABORTED, "Cancelled moving parts.");
|
throw Exception(ErrorCodes::ABORTED, "Cancelled moving parts.");
|
||||||
@ -248,12 +248,13 @@ MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const Me
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
LOG_INFO(log, "Part {} was not fetched, we are the first who move it to another disk, so we will copy it", part->name);
|
LOG_INFO(log, "Part {} was not fetched, we are the first who move it to another disk, so we will copy it", part->name);
|
||||||
cloned_part_storage = part->getDataPartStorage().clonePart(path_to_clone, part->getDataPartStorage().getPartDirectory(), disk, log);
|
cloned_part_storage = part->getDataPartStorage().clonePart(
|
||||||
|
path_to_clone, part->getDataPartStorage().getPartDirectory(), disk, write_settings, log);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
cloned_part_storage = part->makeCloneOnDisk(disk, MergeTreeData::MOVING_DIR_NAME);
|
cloned_part_storage = part->makeCloneOnDisk(disk, MergeTreeData::MOVING_DIR_NAME, write_settings);
|
||||||
}
|
}
|
||||||
|
|
||||||
MergeTreeDataPartBuilder builder(*data, part->name, cloned_part_storage);
|
MergeTreeDataPartBuilder builder(*data, part->name, cloned_part_storage);
|
||||||
|
@ -65,7 +65,7 @@ public:
|
|||||||
const std::lock_guard<std::mutex> & moving_parts_lock);
|
const std::lock_guard<std::mutex> & moving_parts_lock);
|
||||||
|
|
||||||
/// Copies part to selected reservation in detached folder. Throws exception if part already exists.
|
/// Copies part to selected reservation in detached folder. Throws exception if part already exists.
|
||||||
TemporaryClonedPart clonePart(const MergeTreeMoveEntry & moving_part) const;
|
TemporaryClonedPart clonePart(const MergeTreeMoveEntry & moving_part, const WriteSettings & write_settings) const;
|
||||||
|
|
||||||
/// Replaces cloned part from detached directory into active data parts set.
|
/// Replaces cloned part from detached directory into active data parts set.
|
||||||
/// Replacing part changes state to DeleteOnDestroy and will be removed from disk after destructor of
|
/// Replacing part changes state to DeleteOnDestroy and will be removed from disk after destructor of
|
||||||
|
@ -1845,7 +1845,7 @@ bool MutateTask::prepare()
|
|||||||
.txn = ctx->txn, .hardlinked_files = &ctx->hardlinked_files,
|
.txn = ctx->txn, .hardlinked_files = &ctx->hardlinked_files,
|
||||||
.files_to_copy_instead_of_hardlinks = std::move(files_to_copy_instead_of_hardlinks), .keep_metadata_version = true
|
.files_to_copy_instead_of_hardlinks = std::move(files_to_copy_instead_of_hardlinks), .keep_metadata_version = true
|
||||||
};
|
};
|
||||||
auto [part, lock] = ctx->data->cloneAndLoadDataPartOnSameDisk(ctx->source_part, prefix, ctx->future_part->part_info, ctx->metadata_snapshot, clone_params);
|
auto [part, lock] = ctx->data->cloneAndLoadDataPartOnSameDisk(ctx->source_part, prefix, ctx->future_part->part_info, ctx->metadata_snapshot, clone_params, ctx->context->getWriteSettings());
|
||||||
part->getDataPartStorage().beginTransaction();
|
part->getDataPartStorage().beginTransaction();
|
||||||
|
|
||||||
ctx->temporary_directory_lock = std::move(lock);
|
ctx->temporary_directory_lock = std::move(lock);
|
||||||
|
@ -17,9 +17,15 @@ namespace
|
|||||||
{
|
{
|
||||||
|
|
||||||
void localBackupImpl(
|
void localBackupImpl(
|
||||||
const DiskPtr & disk, IDiskTransaction * transaction, const String & source_path,
|
const DiskPtr & disk,
|
||||||
const String & destination_path, bool make_source_readonly, size_t level,
|
IDiskTransaction * transaction,
|
||||||
std::optional<size_t> max_level, bool copy_instead_of_hardlinks,
|
const String & source_path,
|
||||||
|
const String & destination_path,
|
||||||
|
const WriteSettings & settings,
|
||||||
|
bool make_source_readonly,
|
||||||
|
size_t level,
|
||||||
|
std::optional<size_t> max_level,
|
||||||
|
bool copy_instead_of_hardlinks,
|
||||||
const NameSet & files_to_copy_instead_of_hardlinks)
|
const NameSet & files_to_copy_instead_of_hardlinks)
|
||||||
{
|
{
|
||||||
if (max_level && level > *max_level)
|
if (max_level && level > *max_level)
|
||||||
@ -51,11 +57,11 @@ void localBackupImpl(
|
|||||||
{
|
{
|
||||||
if (transaction)
|
if (transaction)
|
||||||
{
|
{
|
||||||
transaction->copyFile(source, destination);
|
transaction->copyFile(source, destination, settings);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
disk->copyFile(source, *disk, destination);
|
disk->copyFile(source, *disk, destination, settings);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -69,8 +75,16 @@ void localBackupImpl(
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
localBackupImpl(
|
localBackupImpl(
|
||||||
disk, transaction, source, destination, make_source_readonly, level + 1, max_level,
|
disk,
|
||||||
copy_instead_of_hardlinks, files_to_copy_instead_of_hardlinks);
|
transaction,
|
||||||
|
source,
|
||||||
|
destination,
|
||||||
|
settings,
|
||||||
|
make_source_readonly,
|
||||||
|
level + 1,
|
||||||
|
max_level,
|
||||||
|
copy_instead_of_hardlinks,
|
||||||
|
files_to_copy_instead_of_hardlinks);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -112,9 +126,15 @@ private:
|
|||||||
}
|
}
|
||||||
|
|
||||||
void localBackup(
|
void localBackup(
|
||||||
const DiskPtr & disk, const String & source_path,
|
const DiskPtr & disk,
|
||||||
const String & destination_path, bool make_source_readonly,
|
const String & source_path,
|
||||||
std::optional<size_t> max_level, bool copy_instead_of_hardlinks, const NameSet & files_to_copy_intead_of_hardlinks, DiskTransactionPtr disk_transaction)
|
const String & destination_path,
|
||||||
|
const WriteSettings & settings,
|
||||||
|
bool make_source_readonly,
|
||||||
|
std::optional<size_t> max_level,
|
||||||
|
bool copy_instead_of_hardlinks,
|
||||||
|
const NameSet & files_to_copy_intead_of_hardlinks,
|
||||||
|
DiskTransactionPtr disk_transaction)
|
||||||
{
|
{
|
||||||
if (disk->exists(destination_path) && !disk->isDirectoryEmpty(destination_path))
|
if (disk->exists(destination_path) && !disk->isDirectoryEmpty(destination_path))
|
||||||
{
|
{
|
||||||
@ -135,12 +155,22 @@ void localBackup(
|
|||||||
{
|
{
|
||||||
if (disk_transaction)
|
if (disk_transaction)
|
||||||
{
|
{
|
||||||
localBackupImpl(disk, disk_transaction.get(), source_path, destination_path, make_source_readonly, 0, max_level, copy_instead_of_hardlinks, files_to_copy_intead_of_hardlinks);
|
localBackupImpl(
|
||||||
|
disk,
|
||||||
|
disk_transaction.get(),
|
||||||
|
source_path,
|
||||||
|
destination_path,
|
||||||
|
settings,
|
||||||
|
make_source_readonly,
|
||||||
|
/* level= */ 0,
|
||||||
|
max_level,
|
||||||
|
copy_instead_of_hardlinks,
|
||||||
|
files_to_copy_intead_of_hardlinks);
|
||||||
}
|
}
|
||||||
else if (copy_instead_of_hardlinks)
|
else if (copy_instead_of_hardlinks)
|
||||||
{
|
{
|
||||||
CleanupOnFail cleanup([disk, destination_path]() { disk->removeRecursive(destination_path); });
|
CleanupOnFail cleanup([disk, destination_path]() { disk->removeRecursive(destination_path); });
|
||||||
disk->copyDirectoryContent(source_path, disk, destination_path);
|
disk->copyDirectoryContent(source_path, disk, destination_path, settings);
|
||||||
cleanup.success();
|
cleanup.success();
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -154,7 +184,17 @@ void localBackup(
|
|||||||
cleaner = [disk, destination_path]() { disk->removeRecursive(destination_path); };
|
cleaner = [disk, destination_path]() { disk->removeRecursive(destination_path); };
|
||||||
|
|
||||||
CleanupOnFail cleanup(std::move(cleaner));
|
CleanupOnFail cleanup(std::move(cleaner));
|
||||||
localBackupImpl(disk, disk_transaction.get(), source_path, destination_path, make_source_readonly, 0, max_level, false, files_to_copy_intead_of_hardlinks);
|
localBackupImpl(
|
||||||
|
disk,
|
||||||
|
disk_transaction.get(),
|
||||||
|
source_path,
|
||||||
|
destination_path,
|
||||||
|
settings,
|
||||||
|
make_source_readonly,
|
||||||
|
/* level= */ 0,
|
||||||
|
max_level,
|
||||||
|
/* copy_instead_of_hardlinks= */ false,
|
||||||
|
files_to_copy_intead_of_hardlinks);
|
||||||
cleanup.success();
|
cleanup.success();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -7,6 +7,8 @@
|
|||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
|
struct WriteSettings;
|
||||||
|
|
||||||
/** Creates a local (at the same mount point) backup (snapshot) directory.
|
/** Creates a local (at the same mount point) backup (snapshot) directory.
|
||||||
*
|
*
|
||||||
* In the specified destination directory, it creates hard links on all source-directory files
|
* In the specified destination directory, it creates hard links on all source-directory files
|
||||||
@ -22,6 +24,15 @@ namespace DB
|
|||||||
*
|
*
|
||||||
* If `transaction` is provided, the changes will be added to it instead of performend on disk.
|
* If `transaction` is provided, the changes will be added to it instead of performend on disk.
|
||||||
*/
|
*/
|
||||||
void localBackup(const DiskPtr & disk, const String & source_path, const String & destination_path, bool make_source_readonly = true, std::optional<size_t> max_level = {}, bool copy_instead_of_hardlinks = false, const NameSet & files_to_copy_intead_of_hardlinks = {}, DiskTransactionPtr disk_transaction = nullptr);
|
void localBackup(
|
||||||
|
const DiskPtr & disk,
|
||||||
|
const String & source_path,
|
||||||
|
const String & destination_path,
|
||||||
|
const WriteSettings & settings,
|
||||||
|
bool make_source_readonly = true,
|
||||||
|
std::optional<size_t> max_level = {},
|
||||||
|
bool copy_instead_of_hardlinks = false,
|
||||||
|
const NameSet & files_to_copy_intead_of_hardlinks = {},
|
||||||
|
DiskTransactionPtr disk_transaction = nullptr);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -2035,7 +2035,7 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
|
|||||||
MergeTreePartInfo dst_part_info(partition_id, temp_index, temp_index, src_part->info.level);
|
MergeTreePartInfo dst_part_info(partition_id, temp_index, temp_index, src_part->info.level);
|
||||||
|
|
||||||
IDataPartStorage::ClonePartParams clone_params{.txn = local_context->getCurrentTransaction()};
|
IDataPartStorage::ClonePartParams clone_params{.txn = local_context->getCurrentTransaction()};
|
||||||
auto [dst_part, part_lock] = cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, my_metadata_snapshot, clone_params);
|
auto [dst_part, part_lock] = cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, my_metadata_snapshot, clone_params, local_context->getWriteSettings());
|
||||||
dst_parts.emplace_back(std::move(dst_part));
|
dst_parts.emplace_back(std::move(dst_part));
|
||||||
dst_parts_locks.emplace_back(std::move(part_lock));
|
dst_parts_locks.emplace_back(std::move(part_lock));
|
||||||
}
|
}
|
||||||
@ -2134,7 +2134,7 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const
|
|||||||
MergeTreePartInfo dst_part_info(partition_id, temp_index, temp_index, src_part->info.level);
|
MergeTreePartInfo dst_part_info(partition_id, temp_index, temp_index, src_part->info.level);
|
||||||
|
|
||||||
IDataPartStorage::ClonePartParams clone_params{.txn = local_context->getCurrentTransaction()};
|
IDataPartStorage::ClonePartParams clone_params{.txn = local_context->getCurrentTransaction()};
|
||||||
auto [dst_part, part_lock] = dest_table_storage->cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, dest_metadata_snapshot, clone_params);
|
auto [dst_part, part_lock] = dest_table_storage->cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, dest_metadata_snapshot, clone_params, local_context->getWriteSettings());
|
||||||
dst_parts.emplace_back(std::move(dst_part));
|
dst_parts.emplace_back(std::move(dst_part));
|
||||||
dst_parts_locks.emplace_back(std::move(part_lock));
|
dst_parts_locks.emplace_back(std::move(part_lock));
|
||||||
}
|
}
|
||||||
|
@ -2465,7 +2465,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
|
|||||||
.metadata_version_to_write = metadata_snapshot->getMetadataVersion()
|
.metadata_version_to_write = metadata_snapshot->getMetadataVersion()
|
||||||
};
|
};
|
||||||
auto [res_part, temporary_part_lock] = cloneAndLoadDataPartOnSameDisk(
|
auto [res_part, temporary_part_lock] = cloneAndLoadDataPartOnSameDisk(
|
||||||
part_desc->src_table_part, TMP_PREFIX + "clone_", part_desc->new_part_info, metadata_snapshot, clone_params);
|
part_desc->src_table_part, TMP_PREFIX + "clone_", part_desc->new_part_info, metadata_snapshot, clone_params, getContext()->getWriteSettings());
|
||||||
part_desc->res_part = std::move(res_part);
|
part_desc->res_part = std::move(res_part);
|
||||||
part_desc->temporary_part_lock = std::move(temporary_part_lock);
|
part_desc->temporary_part_lock = std::move(temporary_part_lock);
|
||||||
}
|
}
|
||||||
@ -4560,7 +4560,7 @@ bool StorageReplicatedMergeTree::fetchPart(
|
|||||||
{
|
{
|
||||||
chassert(!is_zero_copy_part(part_to_clone));
|
chassert(!is_zero_copy_part(part_to_clone));
|
||||||
IDataPartStorage::ClonePartParams clone_params{ .keep_metadata_version = true };
|
IDataPartStorage::ClonePartParams clone_params{ .keep_metadata_version = true };
|
||||||
auto [cloned_part, lock] = cloneAndLoadDataPartOnSameDisk(part_to_clone, "tmp_clone_", part_info, metadata_snapshot, clone_params);
|
auto [cloned_part, lock] = cloneAndLoadDataPartOnSameDisk(part_to_clone, "tmp_clone_", part_info, metadata_snapshot, clone_params, getContext()->getWriteSettings());
|
||||||
part_directory_lock = std::move(lock);
|
part_directory_lock = std::move(lock);
|
||||||
return cloned_part;
|
return cloned_part;
|
||||||
};
|
};
|
||||||
@ -7606,7 +7606,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
|
|||||||
.copy_instead_of_hardlink = zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport(),
|
.copy_instead_of_hardlink = zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport(),
|
||||||
.metadata_version_to_write = metadata_snapshot->getMetadataVersion()
|
.metadata_version_to_write = metadata_snapshot->getMetadataVersion()
|
||||||
};
|
};
|
||||||
auto [dst_part, part_lock] = cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, metadata_snapshot, clone_params);
|
auto [dst_part, part_lock] = cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, metadata_snapshot, clone_params, query_context->getWriteSettings());
|
||||||
src_parts.emplace_back(src_part);
|
src_parts.emplace_back(src_part);
|
||||||
dst_parts.emplace_back(dst_part);
|
dst_parts.emplace_back(dst_part);
|
||||||
dst_parts_locks.emplace_back(std::move(part_lock));
|
dst_parts_locks.emplace_back(std::move(part_lock));
|
||||||
@ -7846,7 +7846,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
|
|||||||
.copy_instead_of_hardlink = zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport(),
|
.copy_instead_of_hardlink = zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport(),
|
||||||
.metadata_version_to_write = dest_metadata_snapshot->getMetadataVersion()
|
.metadata_version_to_write = dest_metadata_snapshot->getMetadataVersion()
|
||||||
};
|
};
|
||||||
auto [dst_part, dst_part_lock] = dest_table_storage->cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, dest_metadata_snapshot, clone_params);
|
auto [dst_part, dst_part_lock] = dest_table_storage->cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, dest_metadata_snapshot, clone_params, query_context->getWriteSettings());
|
||||||
|
|
||||||
src_parts.emplace_back(src_part);
|
src_parts.emplace_back(src_part);
|
||||||
dst_parts.emplace_back(dst_part);
|
dst_parts.emplace_back(dst_part);
|
||||||
|
@ -72,6 +72,12 @@
|
|||||||
</s3_cache_multi_2>
|
</s3_cache_multi_2>
|
||||||
</disks>
|
</disks>
|
||||||
<policies>
|
<policies>
|
||||||
|
<local_remote>
|
||||||
|
<volumes>
|
||||||
|
<local><disk>default</disk></local>
|
||||||
|
<remote><disk>s3_disk</disk></remote>
|
||||||
|
</volumes>
|
||||||
|
</local_remote>
|
||||||
<s3_cache>
|
<s3_cache>
|
||||||
<volumes>
|
<volumes>
|
||||||
<main>
|
<main>
|
||||||
|
@ -0,0 +1,3 @@
|
|||||||
|
default tuple() 1000000
|
||||||
|
Alter 1
|
||||||
|
s3_disk tuple() 1000000
|
@ -0,0 +1,12 @@
|
|||||||
|
-- Tags: no-random-merge-tree-settings, no-fasttest, no-replicated-database
|
||||||
|
-- Tag: no-fasttest -- requires S3
|
||||||
|
-- Tag: no-replicated-database -- ALTER MOVE PARTITION TO should not be replicated (will be fixed separatelly)
|
||||||
|
|
||||||
|
CREATE TABLE test_move_partition_throttling (key UInt64 CODEC(NONE)) ENGINE = MergeTree ORDER BY tuple() SETTINGS storage_policy='local_remote';
|
||||||
|
INSERT INTO test_move_partition_throttling SELECT number FROM numbers(1e6);
|
||||||
|
SELECT disk_name, partition, rows FROM system.parts WHERE database = currentDatabase() AND table = 'test_move_partition_throttling' and active;
|
||||||
|
ALTER TABLE test_move_partition_throttling MOVE PARTITION tuple() TO VOLUME 'remote' SETTINGS max_remote_write_network_bandwidth=1600000;
|
||||||
|
SYSTEM FLUSH LOGS;
|
||||||
|
-- (8e6-1600000)/1600000=4.0
|
||||||
|
SELECT query_kind, query_duration_ms>4e3 FROM system.query_log WHERE type = 'QueryFinish' AND current_database = currentDatabase() AND query_kind = 'Alter';
|
||||||
|
SELECT disk_name, partition, rows FROM system.parts WHERE database = currentDatabase() AND table = 'test_move_partition_throttling' and active;
|
Loading…
Reference in New Issue
Block a user