Merge remote-tracking branch 'rschu1ze/master' into is_deterministic

This commit is contained in:
Robert Schulze 2023-09-21 09:29:44 +00:00
commit 44b7fa1030
No known key found for this signature in database
GPG Key ID: 26703B55FB13728A
58 changed files with 378 additions and 129 deletions

View File

@ -57,7 +57,7 @@ public:
String relative_path_from = validatePathAndGetAsRelative(path_from); String relative_path_from = validatePathAndGetAsRelative(path_from);
String relative_path_to = validatePathAndGetAsRelative(path_to); String relative_path_to = validatePathAndGetAsRelative(path_to);
disk_from->copyDirectoryContent(relative_path_from, disk_to, relative_path_to, /* settings= */ {}); disk_from->copyDirectoryContent(relative_path_from, disk_to, relative_path_to, /* read_settings= */ {}, /* write_settings= */ {});
} }
}; };
} }

View File

@ -46,7 +46,7 @@ void BackupReaderDisk::copyFileToDisk(const String & path_in_backup, size_t file
{ {
/// Use more optimal way. /// Use more optimal way.
LOG_TRACE(log, "Copying file {} from disk {} to disk {}", path_in_backup, disk->getName(), destination_disk->getName()); LOG_TRACE(log, "Copying file {} from disk {} to disk {}", path_in_backup, disk->getName(), destination_disk->getName());
disk->copyFile(root_path / path_in_backup, *destination_disk, destination_path, write_settings); disk->copyFile(root_path / path_in_backup, *destination_disk, destination_path, read_settings, write_settings);
return; /// copied! return; /// copied!
} }
} }
@ -119,7 +119,7 @@ void BackupWriterDisk::copyFileFromDisk(const String & path_in_backup, DiskPtr s
LOG_TRACE(log, "Copying file {} from disk {} to disk {}", src_path, src_disk->getName(), disk->getName()); LOG_TRACE(log, "Copying file {} from disk {} to disk {}", src_path, src_disk->getName(), disk->getName());
auto dest_file_path = root_path / path_in_backup; auto dest_file_path = root_path / path_in_backup;
disk->createDirectories(dest_file_path.parent_path()); disk->createDirectories(dest_file_path.parent_path());
src_disk->copyFile(src_path, *disk, dest_file_path, write_settings); src_disk->copyFile(src_path, *disk, dest_file_path, read_settings, write_settings);
return; /// copied! return; /// copied!
} }
} }

View File

@ -170,6 +170,7 @@ void BackupReaderS3::copyFileToDisk(const String & path_in_backup, size_t file_s
/* dest_bucket= */ blob_path[1], /* dest_bucket= */ blob_path[1],
/* dest_key= */ blob_path[0], /* dest_key= */ blob_path[0],
request_settings, request_settings,
read_settings,
object_attributes, object_attributes,
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupReaderS3"), threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupReaderS3"),
/* for_disk_s3= */ true); /* for_disk_s3= */ true);
@ -230,6 +231,7 @@ void BackupWriterS3::copyFileFromDisk(const String & path_in_backup, DiskPtr src
s3_uri.bucket, s3_uri.bucket,
fs::path(s3_uri.key) / path_in_backup, fs::path(s3_uri.key) / path_in_backup,
request_settings, request_settings,
read_settings,
{}, {},
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupWriterS3")); threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupWriterS3"));
return; /// copied! return; /// copied!

View File

@ -103,6 +103,7 @@ static const size_t signal_pipe_buf_size =
+ sizeof(siginfo_t) + sizeof(siginfo_t)
+ sizeof(ucontext_t*) + sizeof(ucontext_t*)
+ sizeof(StackTrace) + sizeof(StackTrace)
+ sizeof(UInt64)
+ sizeof(UInt32) + sizeof(UInt32)
+ sizeof(void*); + sizeof(void*);

View File

@ -324,7 +324,7 @@ ReservationPtr DiskEncrypted::reserve(UInt64 bytes)
} }
void DiskEncrypted::copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir, const WriteSettings & settings) void DiskEncrypted::copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir, const ReadSettings & read_settings, const WriteSettings & write_settings)
{ {
/// Check if we can copy the file without deciphering. /// Check if we can copy the file without deciphering.
if (isSameDiskType(*this, *to_disk)) if (isSameDiskType(*this, *to_disk))
@ -340,14 +340,14 @@ void DiskEncrypted::copyDirectoryContent(const String & from_dir, const std::sha
auto wrapped_from_path = wrappedPath(from_dir); auto wrapped_from_path = wrappedPath(from_dir);
auto to_delegate = to_disk_enc->delegate; auto to_delegate = to_disk_enc->delegate;
auto wrapped_to_path = to_disk_enc->wrappedPath(to_dir); auto wrapped_to_path = to_disk_enc->wrappedPath(to_dir);
delegate->copyDirectoryContent(wrapped_from_path, to_delegate, wrapped_to_path, settings); delegate->copyDirectoryContent(wrapped_from_path, to_delegate, wrapped_to_path, read_settings, write_settings);
return; return;
} }
} }
} }
/// Copy the file through buffers with deciphering. /// Copy the file through buffers with deciphering.
IDisk::copyDirectoryContent(from_dir, to_disk, to_dir, settings); IDisk::copyDirectoryContent(from_dir, to_disk, to_dir, read_settings, write_settings);
} }
std::unique_ptr<ReadBufferFromFileBase> DiskEncrypted::readFile( std::unique_ptr<ReadBufferFromFileBase> DiskEncrypted::readFile(

View File

@ -112,7 +112,7 @@ public:
delegate->listFiles(wrapped_path, file_names); delegate->listFiles(wrapped_path, file_names);
} }
void copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir, const WriteSettings & settings) override; void copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir, const ReadSettings & read_settings, const WriteSettings & write_settings) override;
std::unique_ptr<ReadBufferFromFileBase> readFile( std::unique_ptr<ReadBufferFromFileBase> readFile(
const String & path, const String & path,

View File

@ -53,11 +53,11 @@ String DiskEncryptedSettings::findKeyByFingerprint(UInt128 key_fingerprint, cons
return it->second; return it->second;
} }
void DiskEncryptedTransaction::copyFile(const std::string & from_file_path, const std::string & to_file_path, const WriteSettings & settings) void DiskEncryptedTransaction::copyFile(const std::string & from_file_path, const std::string & to_file_path, const ReadSettings & read_settings, const WriteSettings & write_settings)
{ {
auto wrapped_from_path = wrappedPath(from_file_path); auto wrapped_from_path = wrappedPath(from_file_path);
auto wrapped_to_path = wrappedPath(to_file_path); auto wrapped_to_path = wrappedPath(to_file_path);
delegate_transaction->copyFile(wrapped_from_path, wrapped_to_path, settings); delegate_transaction->copyFile(wrapped_from_path, wrapped_to_path, read_settings, write_settings);
} }
std::unique_ptr<WriteBufferFromFileBase> DiskEncryptedTransaction::writeFile( // NOLINT std::unique_ptr<WriteBufferFromFileBase> DiskEncryptedTransaction::writeFile( // NOLINT

View File

@ -116,7 +116,7 @@ public:
/// but it's impossible to implement correctly in transactions because other disk can /// but it's impossible to implement correctly in transactions because other disk can
/// use different metadata storage. /// use different metadata storage.
/// TODO: maybe remove it at all, we don't want copies /// TODO: maybe remove it at all, we don't want copies
void copyFile(const std::string & from_file_path, const std::string & to_file_path, const WriteSettings & settings) override; void copyFile(const std::string & from_file_path, const std::string & to_file_path, const ReadSettings & read_settings, const WriteSettings & write_settings) override;
/// Open the file for write and return WriteBufferFromFileBase object. /// Open the file for write and return WriteBufferFromFileBase object.
std::unique_ptr<WriteBufferFromFileBase> writeFile( /// NOLINT std::unique_ptr<WriteBufferFromFileBase> writeFile( /// NOLINT

View File

@ -432,13 +432,13 @@ bool inline isSameDiskType(const IDisk & one, const IDisk & another)
return typeid(one) == typeid(another); return typeid(one) == typeid(another);
} }
void DiskLocal::copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir, const WriteSettings & settings) void DiskLocal::copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir, const ReadSettings & read_settings, const WriteSettings & write_settings)
{ {
/// If throttling was configured we cannot use copying directly. /// If throttling was configured we cannot use copying directly.
if (isSameDiskType(*this, *to_disk) && !settings.local_throttler) if (isSameDiskType(*this, *to_disk) && !read_settings.local_throttler && !write_settings.local_throttler)
fs::copy(fs::path(disk_path) / from_dir, fs::path(to_disk->getPath()) / to_dir, fs::copy_options::recursive | fs::copy_options::overwrite_existing); /// Use more optimal way. fs::copy(fs::path(disk_path) / from_dir, fs::path(to_disk->getPath()) / to_dir, fs::copy_options::recursive | fs::copy_options::overwrite_existing); /// Use more optimal way.
else else
IDisk::copyDirectoryContent(from_dir, to_disk, to_dir, settings); IDisk::copyDirectoryContent(from_dir, to_disk, to_dir, read_settings, write_settings);
} }
SyncGuardPtr DiskLocal::getDirectorySyncGuard(const String & path) const SyncGuardPtr DiskLocal::getDirectorySyncGuard(const String & path) const

View File

@ -65,7 +65,7 @@ public:
void replaceFile(const String & from_path, const String & to_path) override; void replaceFile(const String & from_path, const String & to_path) override;
void copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir, const WriteSettings & settings) override; void copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir, const ReadSettings & read_settings, const WriteSettings & write_settings) override;
void listFiles(const String & path, std::vector<String> & file_names) const override; void listFiles(const String & path, std::vector<String> & file_names) const override;

View File

@ -54,9 +54,9 @@ public:
disk.replaceFile(from_path, to_path); disk.replaceFile(from_path, to_path);
} }
void copyFile(const std::string & from_file_path, const std::string & to_file_path, const WriteSettings & settings) override void copyFile(const std::string & from_file_path, const std::string & to_file_path, const ReadSettings & read_settings, const WriteSettings & write_settings) override
{ {
disk.copyFile(from_file_path, disk, to_file_path, settings); disk.copyFile(from_file_path, disk, to_file_path, read_settings, write_settings);
} }
std::unique_ptr<WriteBufferFromFileBase> writeFile( /// NOLINT std::unique_ptr<WriteBufferFromFileBase> writeFile( /// NOLINT

View File

@ -24,13 +24,13 @@ bool IDisk::isDirectoryEmpty(const String & path) const
return !iterateDirectory(path)->isValid(); return !iterateDirectory(path)->isValid();
} }
void IDisk::copyFile(const String & from_file_path, IDisk & to_disk, const String & to_file_path, const WriteSettings & settings) /// NOLINT void IDisk::copyFile(const String & from_file_path, IDisk & to_disk, const String & to_file_path, const ReadSettings & read_settings, const WriteSettings & write_settings) /// NOLINT
{ {
LOG_DEBUG(&Poco::Logger::get("IDisk"), "Copying from {} (path: {}) {} to {} (path: {}) {}.", LOG_DEBUG(&Poco::Logger::get("IDisk"), "Copying from {} (path: {}) {} to {} (path: {}) {}.",
getName(), getPath(), from_file_path, to_disk.getName(), to_disk.getPath(), to_file_path); getName(), getPath(), from_file_path, to_disk.getName(), to_disk.getPath(), to_file_path);
auto in = readFile(from_file_path); auto in = readFile(from_file_path, read_settings);
auto out = to_disk.writeFile(to_file_path, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, settings); auto out = to_disk.writeFile(to_file_path, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, write_settings);
copyData(*in, *out); copyData(*in, *out);
out->finalize(); out->finalize();
} }
@ -80,7 +80,7 @@ UInt128 IDisk::getEncryptedFileIV(const String &) const
using ResultsCollector = std::vector<std::future<void>>; using ResultsCollector = std::vector<std::future<void>>;
void asyncCopy(IDisk & from_disk, String from_path, IDisk & to_disk, String to_path, ThreadPool & pool, ResultsCollector & results, bool copy_root_dir, const WriteSettings & settings) void asyncCopy(IDisk & from_disk, String from_path, IDisk & to_disk, String to_path, ThreadPool & pool, ResultsCollector & results, bool copy_root_dir, const ReadSettings & read_settings, const WriteSettings & write_settings)
{ {
if (from_disk.isFile(from_path)) if (from_disk.isFile(from_path))
{ {
@ -88,7 +88,7 @@ void asyncCopy(IDisk & from_disk, String from_path, IDisk & to_disk, String to_p
auto future = promise->get_future(); auto future = promise->get_future();
pool.scheduleOrThrowOnError( pool.scheduleOrThrowOnError(
[&from_disk, from_path, &to_disk, to_path, &settings, promise, thread_group = CurrentThread::getGroup()]() [&from_disk, from_path, &to_disk, to_path, &read_settings, &write_settings, promise, thread_group = CurrentThread::getGroup()]()
{ {
try try
{ {
@ -97,7 +97,7 @@ void asyncCopy(IDisk & from_disk, String from_path, IDisk & to_disk, String to_p
if (thread_group) if (thread_group)
CurrentThread::attachToGroup(thread_group); CurrentThread::attachToGroup(thread_group);
from_disk.copyFile(from_path, to_disk, fs::path(to_path) / fileName(from_path), settings); from_disk.copyFile(from_path, to_disk, fs::path(to_path) / fileName(from_path), read_settings, write_settings);
promise->set_value(); promise->set_value();
} }
catch (...) catch (...)
@ -119,19 +119,19 @@ void asyncCopy(IDisk & from_disk, String from_path, IDisk & to_disk, String to_p
} }
for (auto it = from_disk.iterateDirectory(from_path); it->isValid(); it->next()) for (auto it = from_disk.iterateDirectory(from_path); it->isValid(); it->next())
asyncCopy(from_disk, it->path(), to_disk, dest, pool, results, true, settings); asyncCopy(from_disk, it->path(), to_disk, dest, pool, results, true, read_settings, write_settings);
} }
} }
void IDisk::copyThroughBuffers(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path, bool copy_root_dir, WriteSettings settings) void IDisk::copyThroughBuffers(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path, bool copy_root_dir, const ReadSettings & read_settings, WriteSettings write_settings)
{ {
ResultsCollector results; ResultsCollector results;
/// Disable parallel write. We already copy in parallel. /// Disable parallel write. We already copy in parallel.
/// Avoid high memory usage. See test_s3_zero_copy_ttl/test.py::test_move_and_s3_memory_usage /// Avoid high memory usage. See test_s3_zero_copy_ttl/test.py::test_move_and_s3_memory_usage
settings.s3_allow_parallel_part_upload = false; write_settings.s3_allow_parallel_part_upload = false;
asyncCopy(*this, from_path, *to_disk, to_path, copying_thread_pool, results, copy_root_dir, settings); asyncCopy(*this, from_path, *to_disk, to_path, copying_thread_pool, results, copy_root_dir, read_settings, write_settings);
for (auto & result : results) for (auto & result : results)
result.wait(); result.wait();
@ -140,12 +140,12 @@ void IDisk::copyThroughBuffers(const String & from_path, const std::shared_ptr<I
} }
void IDisk::copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir, const WriteSettings & settings) void IDisk::copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir, const ReadSettings & read_settings, const WriteSettings & write_settings)
{ {
if (!to_disk->exists(to_dir)) if (!to_disk->exists(to_dir))
to_disk->createDirectories(to_dir); to_disk->createDirectories(to_dir);
copyThroughBuffers(from_dir, to_disk, to_dir, /* copy_root_dir= */ false, settings); copyThroughBuffers(from_dir, to_disk, to_dir, /* copy_root_dir= */ false, read_settings, write_settings);
} }
void IDisk::truncateFile(const String &, size_t) void IDisk::truncateFile(const String &, size_t)

View File

@ -193,14 +193,15 @@ public:
virtual void replaceFile(const String & from_path, const String & to_path) = 0; virtual void replaceFile(const String & from_path, const String & to_path) = 0;
/// Recursively copy files from from_dir to to_dir. Create to_dir if not exists. /// Recursively copy files from from_dir to to_dir. Create to_dir if not exists.
virtual void copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir, const WriteSettings & settings); virtual void copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir, const ReadSettings & read_settings, const WriteSettings & write_settings);
/// Copy file `from_file_path` to `to_file_path` located at `to_disk`. /// Copy file `from_file_path` to `to_file_path` located at `to_disk`.
virtual void copyFile( /// NOLINT virtual void copyFile( /// NOLINT
const String & from_file_path, const String & from_file_path,
IDisk & to_disk, IDisk & to_disk,
const String & to_file_path, const String & to_file_path,
const WriteSettings & settings = {}); const ReadSettings & read_settings = {},
const WriteSettings & write_settings = {});
/// List files at `path` and add their names to `file_names` /// List files at `path` and add their names to `file_names`
virtual void listFiles(const String & path, std::vector<String> & file_names) const = 0; virtual void listFiles(const String & path, std::vector<String> & file_names) const = 0;
@ -470,7 +471,7 @@ protected:
/// Base implementation of the function copy(). /// Base implementation of the function copy().
/// It just opens two files, reads data by portions from the first file, and writes it to the second one. /// It just opens two files, reads data by portions from the first file, and writes it to the second one.
/// A derived class may override copy() to provide a faster implementation. /// A derived class may override copy() to provide a faster implementation.
void copyThroughBuffers(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path, bool copy_root_dir, WriteSettings settings); void copyThroughBuffers(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path, bool copy_root_dir, const ReadSettings & read_settings, WriteSettings write_settings);
virtual void checkAccessImpl(const String & path); virtual void checkAccessImpl(const String & path);

View File

@ -59,7 +59,11 @@ public:
/// but it's impossible to implement correctly in transactions because other disk can /// but it's impossible to implement correctly in transactions because other disk can
/// use different metadata storage. /// use different metadata storage.
/// TODO: maybe remove it at all, we don't want copies /// TODO: maybe remove it at all, we don't want copies
virtual void copyFile(const std::string & from_file_path, const std::string & to_file_path, const WriteSettings & settings = {}) = 0; virtual void copyFile(
const std::string & from_file_path,
const std::string & to_file_path,
const ReadSettings & read_settings = {},
const WriteSettings & write_settings = {}) = 0;
/// Open the file for write and return WriteBufferFromFileBase object. /// Open the file for write and return WriteBufferFromFileBase object.
virtual std::unique_ptr<WriteBufferFromFileBase> writeFile( /// NOLINT virtual std::unique_ptr<WriteBufferFromFileBase> writeFile( /// NOLINT

View File

@ -357,6 +357,8 @@ ObjectMetadata AzureObjectStorage::getObjectMetadata(const std::string & path) c
void AzureObjectStorage::copyObject( /// NOLINT void AzureObjectStorage::copyObject( /// NOLINT
const StoredObject & object_from, const StoredObject & object_from,
const StoredObject & object_to, const StoredObject & object_to,
const ReadSettings &,
const WriteSettings &,
std::optional<ObjectAttributes> object_to_attributes) std::optional<ObjectAttributes> object_to_attributes)
{ {
auto client_ptr = client.get(); auto client_ptr = client.get();

View File

@ -100,6 +100,8 @@ public:
void copyObject( /// NOLINT void copyObject( /// NOLINT
const StoredObject & object_from, const StoredObject & object_from,
const StoredObject & object_to, const StoredObject & object_to,
const ReadSettings & read_settings,
const WriteSettings & write_settings,
std::optional<ObjectAttributes> object_to_attributes = {}) override; std::optional<ObjectAttributes> object_to_attributes = {}) override;
void shutdown() override {} void shutdown() override {}

View File

@ -160,16 +160,22 @@ void CachedObjectStorage::removeObjectsIfExist(const StoredObjects & objects)
void CachedObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT void CachedObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT
const StoredObject & object_from, const StoredObject & object_from,
const StoredObject & object_to, const StoredObject & object_to,
const ReadSettings & read_settings,
const WriteSettings & write_settings,
IObjectStorage & object_storage_to, IObjectStorage & object_storage_to,
std::optional<ObjectAttributes> object_to_attributes) std::optional<ObjectAttributes> object_to_attributes)
{ {
object_storage->copyObjectToAnotherObjectStorage(object_from, object_to, object_storage_to, object_to_attributes); object_storage->copyObjectToAnotherObjectStorage(object_from, object_to, read_settings, write_settings, object_storage_to, object_to_attributes);
} }
void CachedObjectStorage::copyObject( // NOLINT void CachedObjectStorage::copyObject( // NOLINT
const StoredObject & object_from, const StoredObject & object_to, std::optional<ObjectAttributes> object_to_attributes) const StoredObject & object_from,
const StoredObject & object_to,
const ReadSettings & read_settings,
const WriteSettings & write_settings,
std::optional<ObjectAttributes> object_to_attributes)
{ {
object_storage->copyObject(object_from, object_to, object_to_attributes); object_storage->copyObject(object_from, object_to, read_settings, write_settings, object_to_attributes);
} }
std::unique_ptr<IObjectStorage> CachedObjectStorage::cloneObjectStorage( std::unique_ptr<IObjectStorage> CachedObjectStorage::cloneObjectStorage(

View File

@ -57,11 +57,15 @@ public:
void copyObject( /// NOLINT void copyObject( /// NOLINT
const StoredObject & object_from, const StoredObject & object_from,
const StoredObject & object_to, const StoredObject & object_to,
const ReadSettings & read_settings,
const WriteSettings & write_settings,
std::optional<ObjectAttributes> object_to_attributes = {}) override; std::optional<ObjectAttributes> object_to_attributes = {}) override;
void copyObjectToAnotherObjectStorage( /// NOLINT void copyObjectToAnotherObjectStorage( /// NOLINT
const StoredObject & object_from, const StoredObject & object_from,
const StoredObject & object_to, const StoredObject & object_to,
const ReadSettings & read_settings,
const WriteSettings & write_settings,
IObjectStorage & object_storage_to, IObjectStorage & object_storage_to,
std::optional<ObjectAttributes> object_to_attributes = {}) override; std::optional<ObjectAttributes> object_to_attributes = {}) override;

View File

@ -68,7 +68,7 @@ DiskObjectStorage::DiskObjectStorage(
, send_metadata(config.getBool(config_prefix + ".send_metadata", false)) , send_metadata(config.getBool(config_prefix + ".send_metadata", false))
, read_resource_name(config.getString(config_prefix + ".read_resource", "")) , read_resource_name(config.getString(config_prefix + ".read_resource", ""))
, write_resource_name(config.getString(config_prefix + ".write_resource", "")) , write_resource_name(config.getString(config_prefix + ".write_resource", ""))
, metadata_helper(std::make_unique<DiskObjectStorageRemoteMetadataRestoreHelper>(this, ReadSettings{})) , metadata_helper(std::make_unique<DiskObjectStorageRemoteMetadataRestoreHelper>(this, ReadSettings{}, WriteSettings{}))
{} {}
StoredObjects DiskObjectStorage::getStorageObjects(const String & local_path) const StoredObjects DiskObjectStorage::getStorageObjects(const String & local_path) const
@ -180,7 +180,8 @@ void DiskObjectStorage::copyFile( /// NOLINT
const String & from_file_path, const String & from_file_path,
IDisk & to_disk, IDisk & to_disk,
const String & to_file_path, const String & to_file_path,
const WriteSettings & settings) const ReadSettings & read_settings,
const WriteSettings & write_settings)
{ {
if (this == &to_disk) if (this == &to_disk)
{ {
@ -192,7 +193,7 @@ void DiskObjectStorage::copyFile( /// NOLINT
else else
{ {
/// Copy through buffers /// Copy through buffers
IDisk::copyFile(from_file_path, to_disk, to_file_path, settings); IDisk::copyFile(from_file_path, to_disk, to_file_path, read_settings, write_settings);
} }
} }

View File

@ -162,7 +162,8 @@ public:
const String & from_file_path, const String & from_file_path,
IDisk & to_disk, IDisk & to_disk,
const String & to_file_path, const String & to_file_path,
const WriteSettings & settings = {}) override; const ReadSettings & read_settings = {},
const WriteSettings & write_settings = {}) override;
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context_, const String &, const DisksMap &) override; void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context_, const String &, const DisksMap &) override;

View File

@ -84,7 +84,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::saveSchemaVersion(const int &
{ {
StoredObject object{fs::path(disk->object_storage_root_path) / SCHEMA_VERSION_OBJECT}; StoredObject object{fs::path(disk->object_storage_root_path) / SCHEMA_VERSION_OBJECT};
auto buf = disk->object_storage->writeObject(object, WriteMode::Rewrite); auto buf = disk->object_storage->writeObject(object, WriteMode::Rewrite, /* attributes= */ {}, /* buf_size= */ DBMS_DEFAULT_BUFFER_SIZE, write_settings);
writeIntText(version, *buf); writeIntText(version, *buf);
buf->finalize(); buf->finalize();
@ -93,7 +93,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::saveSchemaVersion(const int &
void DiskObjectStorageRemoteMetadataRestoreHelper::updateObjectMetadata(const String & key, const ObjectAttributes & metadata) const void DiskObjectStorageRemoteMetadataRestoreHelper::updateObjectMetadata(const String & key, const ObjectAttributes & metadata) const
{ {
StoredObject object{key}; StoredObject object{key};
disk->object_storage->copyObject(object, object, metadata); disk->object_storage->copyObject(object, object, read_settings, write_settings, metadata);
} }
void DiskObjectStorageRemoteMetadataRestoreHelper::migrateFileToRestorableSchema(const String & path) const void DiskObjectStorageRemoteMetadataRestoreHelper::migrateFileToRestorableSchema(const String & path) const
@ -434,7 +434,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::processRestoreFiles(
/// Copy object if we restore to different bucket / path. /// Copy object if we restore to different bucket / path.
if (source_object_storage->getObjectsNamespace() != disk->object_storage->getObjectsNamespace() || disk->object_storage_root_path != source_path) if (source_object_storage->getObjectsNamespace() != disk->object_storage->getObjectsNamespace() || disk->object_storage_root_path != source_path)
source_object_storage->copyObjectToAnotherObjectStorage(object_from, object_to, *disk->object_storage); source_object_storage->copyObjectToAnotherObjectStorage(object_from, object_to, read_settings, write_settings, *disk->object_storage);
auto tx = disk->metadata_storage->createTransaction(); auto tx = disk->metadata_storage->createTransaction();
tx->addBlobToMetadata(path, relative_key, meta.size_bytes); tx->addBlobToMetadata(path, relative_key, meta.size_bytes);

View File

@ -24,9 +24,10 @@ public:
static constexpr UInt64 LATEST_REVISION = std::numeric_limits<UInt64>::max(); static constexpr UInt64 LATEST_REVISION = std::numeric_limits<UInt64>::max();
static constexpr UInt64 UNKNOWN_REVISION = 0; static constexpr UInt64 UNKNOWN_REVISION = 0;
DiskObjectStorageRemoteMetadataRestoreHelper(DiskObjectStorage * disk_, ReadSettings read_settings_) DiskObjectStorageRemoteMetadataRestoreHelper(DiskObjectStorage * disk_, ReadSettings read_settings_, WriteSettings write_settings_)
: disk(disk_) : disk(disk_)
, read_settings(std::move(read_settings_)) , read_settings(std::move(read_settings_))
, write_settings(std::move(write_settings_))
, operation_log_suffix("-" + getFQDNOrHostName()) , operation_log_suffix("-" + getFQDNOrHostName())
{ {
} }
@ -94,6 +95,7 @@ private:
ObjectStoragePtr object_storage_from_another_namespace; ObjectStoragePtr object_storage_from_another_namespace;
ReadSettings read_settings; ReadSettings read_settings;
WriteSettings write_settings;
String operation_log_suffix; String operation_log_suffix;
}; };

View File

@ -1,6 +1,7 @@
#include <Disks/ObjectStorages/DiskObjectStorageTransaction.h> #include <Disks/ObjectStorages/DiskObjectStorageTransaction.h>
#include <Disks/ObjectStorages/DiskObjectStorage.h> #include <Disks/ObjectStorages/DiskObjectStorage.h>
#include <Disks/IO/WriteBufferWithFinalizeCallback.h> #include <Disks/IO/WriteBufferWithFinalizeCallback.h>
#include <Interpreters/Context.h>
#include <Common/checkStackSize.h> #include <Common/checkStackSize.h>
#include <ranges> #include <ranges>
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
@ -474,6 +475,9 @@ struct WriteFileObjectStorageOperation final : public IDiskObjectStorageOperatio
struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation
{ {
ReadSettings read_settings;
WriteSettings write_settings;
/// Local paths /// Local paths
std::string from_path; std::string from_path;
std::string to_path; std::string to_path;
@ -483,9 +487,13 @@ struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation
CopyFileObjectStorageOperation( CopyFileObjectStorageOperation(
IObjectStorage & object_storage_, IObjectStorage & object_storage_,
IMetadataStorage & metadata_storage_, IMetadataStorage & metadata_storage_,
const ReadSettings & read_settings_,
const WriteSettings & write_settings_,
const std::string & from_path_, const std::string & from_path_,
const std::string & to_path_) const std::string & to_path_)
: IDiskObjectStorageOperation(object_storage_, metadata_storage_) : IDiskObjectStorageOperation(object_storage_, metadata_storage_)
, read_settings(read_settings_)
, write_settings(write_settings_)
, from_path(from_path_) , from_path(from_path_)
, to_path(to_path_) , to_path(to_path_)
{} {}
@ -505,7 +513,7 @@ struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation
std::string blob_name = object_storage.generateBlobNameForPath(to_path); std::string blob_name = object_storage.generateBlobNameForPath(to_path);
auto object_to = StoredObject(fs::path(metadata_storage.getObjectStorageRootPath()) / blob_name); auto object_to = StoredObject(fs::path(metadata_storage.getObjectStorageRootPath()) / blob_name);
object_storage.copyObject(object_from, object_to); object_storage.copyObject(object_from, object_to, read_settings, write_settings);
tx->addBlobToMetadata(to_path, blob_name, object_from.bytes_size); tx->addBlobToMetadata(to_path, blob_name, object_from.bytes_size);
@ -810,13 +818,10 @@ void DiskObjectStorageTransaction::createFile(const std::string & path)
})); }));
} }
void DiskObjectStorageTransaction::copyFile(const std::string & from_file_path, const std::string & to_file_path, const WriteSettings & settings) void DiskObjectStorageTransaction::copyFile(const std::string & from_file_path, const std::string & to_file_path, const ReadSettings & read_settings, const WriteSettings & write_settings)
{ {
/// NOTE: For native copy we can ignore throttling, so no need to use WriteSettings
UNUSED(settings);
operations_to_execute.emplace_back( operations_to_execute.emplace_back(
std::make_unique<CopyFileObjectStorageOperation>(object_storage, metadata_storage, from_file_path, to_file_path)); std::make_unique<CopyFileObjectStorageOperation>(object_storage, metadata_storage, read_settings, write_settings, from_file_path, to_file_path));
} }
void DiskObjectStorageTransaction::commit() void DiskObjectStorageTransaction::commit()

View File

@ -86,7 +86,7 @@ public:
void createFile(const String & path) override; void createFile(const String & path) override;
void copyFile(const std::string & from_file_path, const std::string & to_file_path, const WriteSettings & settings) override; void copyFile(const std::string & from_file_path, const std::string & to_file_path, const ReadSettings & read_settings, const WriteSettings &) override;
/// writeFile is a difficult function for transactions. /// writeFile is a difficult function for transactions.
/// Now it's almost noop because metadata added to transaction in finalize method /// Now it's almost noop because metadata added to transaction in finalize method

View File

@ -133,6 +133,8 @@ ObjectMetadata HDFSObjectStorage::getObjectMetadata(const std::string &) const
void HDFSObjectStorage::copyObject( /// NOLINT void HDFSObjectStorage::copyObject( /// NOLINT
const StoredObject & object_from, const StoredObject & object_from,
const StoredObject & object_to, const StoredObject & object_to,
const ReadSettings & read_settings,
const WriteSettings & write_settings,
std::optional<ObjectAttributes> object_to_attributes) std::optional<ObjectAttributes> object_to_attributes)
{ {
if (object_to_attributes.has_value()) if (object_to_attributes.has_value())
@ -140,8 +142,8 @@ void HDFSObjectStorage::copyObject( /// NOLINT
ErrorCodes::UNSUPPORTED_METHOD, ErrorCodes::UNSUPPORTED_METHOD,
"HDFS API doesn't support custom attributes/metadata for stored objects"); "HDFS API doesn't support custom attributes/metadata for stored objects");
auto in = readObject(object_from); auto in = readObject(object_from, read_settings);
auto out = writeObject(object_to, WriteMode::Rewrite); auto out = writeObject(object_to, WriteMode::Rewrite, /* attributes= */ {}, /* buf_size= */ DBMS_DEFAULT_BUFFER_SIZE, write_settings);
copyData(*in, *out); copyData(*in, *out);
out->finalize(); out->finalize();
} }

View File

@ -98,6 +98,8 @@ public:
void copyObject( /// NOLINT void copyObject( /// NOLINT
const StoredObject & object_from, const StoredObject & object_from,
const StoredObject & object_to, const StoredObject & object_to,
const ReadSettings & read_settings,
const WriteSettings & write_settings,
std::optional<ObjectAttributes> object_to_attributes = {}) override; std::optional<ObjectAttributes> object_to_attributes = {}) override;
void shutdown() override; void shutdown() override;

View File

@ -62,14 +62,16 @@ ThreadPool & IObjectStorage::getThreadPoolWriter()
void IObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT void IObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT
const StoredObject & object_from, const StoredObject & object_from,
const StoredObject & object_to, const StoredObject & object_to,
const ReadSettings & read_settings,
const WriteSettings & write_settings,
IObjectStorage & object_storage_to, IObjectStorage & object_storage_to,
std::optional<ObjectAttributes> object_to_attributes) std::optional<ObjectAttributes> object_to_attributes)
{ {
if (&object_storage_to == this) if (&object_storage_to == this)
copyObject(object_from, object_to, object_to_attributes); copyObject(object_from, object_to, read_settings, write_settings, object_to_attributes);
auto in = readObject(object_from); auto in = readObject(object_from, read_settings);
auto out = object_storage_to.writeObject(object_to, WriteMode::Rewrite); auto out = object_storage_to.writeObject(object_to, WriteMode::Rewrite, /* attributes= */ {}, /* buf_size= */ DBMS_DEFAULT_BUFFER_SIZE, write_settings);
copyData(*in, *out); copyData(*in, *out);
out->finalize(); out->finalize();
} }

View File

@ -131,6 +131,8 @@ public:
virtual void copyObject( /// NOLINT virtual void copyObject( /// NOLINT
const StoredObject & object_from, const StoredObject & object_from,
const StoredObject & object_to, const StoredObject & object_to,
const ReadSettings & read_settings,
const WriteSettings & write_settings,
std::optional<ObjectAttributes> object_to_attributes = {}) = 0; std::optional<ObjectAttributes> object_to_attributes = {}) = 0;
/// Copy object to another instance of object storage /// Copy object to another instance of object storage
@ -139,6 +141,8 @@ public:
virtual void copyObjectToAnotherObjectStorage( /// NOLINT virtual void copyObjectToAnotherObjectStorage( /// NOLINT
const StoredObject & object_from, const StoredObject & object_from,
const StoredObject & object_to, const StoredObject & object_to,
const ReadSettings & read_settings,
const WriteSettings & write_settings,
IObjectStorage & object_storage_to, IObjectStorage & object_storage_to,
std::optional<ObjectAttributes> object_to_attributes = {}); std::optional<ObjectAttributes> object_to_attributes = {});

View File

@ -167,10 +167,14 @@ ObjectMetadata LocalObjectStorage::getObjectMetadata(const std::string & /* path
} }
void LocalObjectStorage::copyObject( // NOLINT void LocalObjectStorage::copyObject( // NOLINT
const StoredObject & object_from, const StoredObject & object_to, std::optional<ObjectAttributes> /* object_to_attributes */) const StoredObject & object_from,
const StoredObject & object_to,
const ReadSettings & read_settings,
const WriteSettings & write_settings,
std::optional<ObjectAttributes> /* object_to_attributes */)
{ {
auto in = readObject(object_from); auto in = readObject(object_from, read_settings);
auto out = writeObject(object_to, WriteMode::Rewrite); auto out = writeObject(object_to, WriteMode::Rewrite, /* attributes= */ {}, /* buf_size= */ DBMS_DEFAULT_BUFFER_SIZE, write_settings);
copyData(*in, *out); copyData(*in, *out);
out->finalize(); out->finalize();
} }

View File

@ -57,6 +57,8 @@ public:
void copyObject( /// NOLINT void copyObject( /// NOLINT
const StoredObject & object_from, const StoredObject & object_from,
const StoredObject & object_to, const StoredObject & object_to,
const ReadSettings & read_settings,
const WriteSettings & write_settings,
std::optional<ObjectAttributes> object_to_attributes = {}) override; std::optional<ObjectAttributes> object_to_attributes = {}) override;
void shutdown() override; void shutdown() override;

View File

@ -425,6 +425,8 @@ ObjectMetadata S3ObjectStorage::getObjectMetadata(const std::string & path) cons
void S3ObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT void S3ObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT
const StoredObject & object_from, const StoredObject & object_from,
const StoredObject & object_to, const StoredObject & object_to,
const ReadSettings & read_settings,
const WriteSettings & write_settings,
IObjectStorage & object_storage_to, IObjectStorage & object_storage_to,
std::optional<ObjectAttributes> object_to_attributes) std::optional<ObjectAttributes> object_to_attributes)
{ {
@ -435,24 +437,48 @@ void S3ObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT
auto settings_ptr = s3_settings.get(); auto settings_ptr = s3_settings.get();
auto size = S3::getObjectSize(*clients_->client, bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true); auto size = S3::getObjectSize(*clients_->client, bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
auto scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "S3ObjStor_copy"); auto scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "S3ObjStor_copy");
copyS3File(clients_->client, clients_->client_with_long_timeout, bucket, object_from.remote_path, 0, size, dest_s3->bucket, object_to.remote_path, copyS3File(clients_->client,
settings_ptr->request_settings, object_to_attributes, scheduler, /* for_disk_s3= */ true); clients_->client_with_long_timeout,
bucket,
object_from.remote_path,
0,
size,
dest_s3->bucket,
object_to.remote_path,
settings_ptr->request_settings,
patchSettings(read_settings),
object_to_attributes,
scheduler,
/* for_disk_s3= */ true);
} }
else else
{ IObjectStorage::copyObjectToAnotherObjectStorage(object_from, object_to, read_settings, write_settings, object_storage_to, object_to_attributes);
IObjectStorage::copyObjectToAnotherObjectStorage(object_from, object_to, object_storage_to, object_to_attributes);
}
} }
void S3ObjectStorage::copyObject( // NOLINT void S3ObjectStorage::copyObject( // NOLINT
const StoredObject & object_from, const StoredObject & object_to, std::optional<ObjectAttributes> object_to_attributes) const StoredObject & object_from,
const StoredObject & object_to,
const ReadSettings & read_settings,
const WriteSettings &,
std::optional<ObjectAttributes> object_to_attributes)
{ {
auto clients_ = clients.get(); auto clients_ = clients.get();
auto settings_ptr = s3_settings.get(); auto settings_ptr = s3_settings.get();
auto size = S3::getObjectSize(*clients_->client, bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true); auto size = S3::getObjectSize(*clients_->client, bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
auto scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "S3ObjStor_copy"); auto scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "S3ObjStor_copy");
copyS3File(clients_->client, clients_->client_with_long_timeout, bucket, object_from.remote_path, 0, size, bucket, object_to.remote_path, copyS3File(clients_->client,
settings_ptr->request_settings, object_to_attributes, scheduler, /* for_disk_s3= */ true); clients_->client_with_long_timeout,
bucket,
object_from.remote_path,
0,
size,
bucket,
object_to.remote_path,
settings_ptr->request_settings,
patchSettings(read_settings),
object_to_attributes,
scheduler,
/* for_disk_s3= */ true);
} }
void S3ObjectStorage::setNewSettings(std::unique_ptr<S3ObjectStorageSettings> && s3_settings_) void S3ObjectStorage::setNewSettings(std::unique_ptr<S3ObjectStorageSettings> && s3_settings_)

View File

@ -135,11 +135,15 @@ public:
void copyObject( /// NOLINT void copyObject( /// NOLINT
const StoredObject & object_from, const StoredObject & object_from,
const StoredObject & object_to, const StoredObject & object_to,
const ReadSettings & read_settings,
const WriteSettings & write_settings,
std::optional<ObjectAttributes> object_to_attributes = {}) override; std::optional<ObjectAttributes> object_to_attributes = {}) override;
void copyObjectToAnotherObjectStorage( /// NOLINT void copyObjectToAnotherObjectStorage( /// NOLINT
const StoredObject & object_from, const StoredObject & object_from,
const StoredObject & object_to, const StoredObject & object_to,
const ReadSettings & read_settings,
const WriteSettings & write_settings,
IObjectStorage & object_storage_to, IObjectStorage & object_storage_to,
std::optional<ObjectAttributes> object_to_attributes = {}) override; std::optional<ObjectAttributes> object_to_attributes = {}) override;

View File

@ -264,7 +264,7 @@ void WebObjectStorage::removeObjectsIfExist(const StoredObjects &)
throwNotAllowed(); throwNotAllowed();
} }
void WebObjectStorage::copyObject(const StoredObject &, const StoredObject &, std::optional<ObjectAttributes>) // NOLINT void WebObjectStorage::copyObject(const StoredObject &, const StoredObject &, const ReadSettings &, const WriteSettings &, std::optional<ObjectAttributes>) // NOLINT
{ {
throwNotAllowed(); throwNotAllowed();
} }

View File

@ -68,6 +68,8 @@ public:
void copyObject( /// NOLINT void copyObject( /// NOLINT
const StoredObject & object_from, const StoredObject & object_from,
const StoredObject & object_to, const StoredObject & object_to,
const ReadSettings & read_settings,
const WriteSettings & write_settings,
std::optional<ObjectAttributes> object_to_attributes = {}) override; std::optional<ObjectAttributes> object_to_attributes = {}) override;
void shutdown() override; void shutdown() override;

View File

@ -18,6 +18,10 @@
#include "vec_crc32.h" #include "vec_crc32.h"
#endif #endif
#if defined(__s390x__) && __BYTE_ORDER__==__ORDER_BIG_ENDIAN__
#include <crc32-s390x.h>
#endif
namespace DB namespace DB
{ {
@ -43,7 +47,7 @@ struct Hash
#elif (defined(__PPC64__) || defined(__powerpc64__)) && __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__ #elif (defined(__PPC64__) || defined(__powerpc64__)) && __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
return crc32_ppc(crc, reinterpret_cast<const unsigned char *>(&val), sizeof(val)); return crc32_ppc(crc, reinterpret_cast<const unsigned char *>(&val), sizeof(val));
#elif defined(__s390x__) && __BYTE_ORDER__==__ORDER_BIG_ENDIAN__ #elif defined(__s390x__) && __BYTE_ORDER__==__ORDER_BIG_ENDIAN__
return s390x_crc32(crc, val); return crc32c_le(static_cast<UInt32>(crc), reinterpret_cast<unsigned char *>(&val), sizeof(val));
#else #else
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "String hash is not implemented without sse4.2 support"); throw Exception(ErrorCodes::NOT_IMPLEMENTED, "String hash is not implemented without sse4.2 support");
#endif #endif
@ -58,7 +62,7 @@ struct Hash
#elif (defined(__PPC64__) || defined(__powerpc64__)) && __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__ #elif (defined(__PPC64__) || defined(__powerpc64__)) && __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
return crc32_ppc(crc, reinterpret_cast<const unsigned char *>(&val), sizeof(val)); return crc32_ppc(crc, reinterpret_cast<const unsigned char *>(&val), sizeof(val));
#elif defined(__s390x__) && __BYTE_ORDER__==__ORDER_BIG_ENDIAN__ #elif defined(__s390x__) && __BYTE_ORDER__==__ORDER_BIG_ENDIAN__
return s390x_crc32_u32(crc, val); return crc32c_le(static_cast<UInt32>(crc), reinterpret_cast<unsigned char *>(&val), sizeof(val));
#else #else
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "String hash is not implemented without sse4.2 support"); throw Exception(ErrorCodes::NOT_IMPLEMENTED, "String hash is not implemented without sse4.2 support");
#endif #endif
@ -73,7 +77,7 @@ struct Hash
#elif (defined(__PPC64__) || defined(__powerpc64__)) && __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__ #elif (defined(__PPC64__) || defined(__powerpc64__)) && __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
return crc32_ppc(crc, reinterpret_cast<const unsigned char *>(&val), sizeof(val)); return crc32_ppc(crc, reinterpret_cast<const unsigned char *>(&val), sizeof(val));
#elif defined(__s390x__) && __BYTE_ORDER__==__ORDER_BIG_ENDIAN__ #elif defined(__s390x__) && __BYTE_ORDER__==__ORDER_BIG_ENDIAN__
return s390x_crc32_u16(crc, val); return crc32c_le(static_cast<UInt32>(crc), reinterpret_cast<unsigned char *>(&val), sizeof(val));
#else #else
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "String hash is not implemented without sse4.2 support"); throw Exception(ErrorCodes::NOT_IMPLEMENTED, "String hash is not implemented without sse4.2 support");
#endif #endif
@ -88,7 +92,7 @@ struct Hash
#elif (defined(__PPC64__) || defined(__powerpc64__)) && __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__ #elif (defined(__PPC64__) || defined(__powerpc64__)) && __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
return crc32_ppc(crc, reinterpret_cast<const unsigned char *>(&val), sizeof(val)); return crc32_ppc(crc, reinterpret_cast<const unsigned char *>(&val), sizeof(val));
#elif defined(__s390x__) && __BYTE_ORDER__==__ORDER_BIG_ENDIAN__ #elif defined(__s390x__) && __BYTE_ORDER__==__ORDER_BIG_ENDIAN__
return s390x_crc32_u8(crc, val); return crc32c_le(static_cast<UInt32>(crc), reinterpret_cast<unsigned char *>(&val), sizeof(val));
#else #else
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "String hash is not implemented without sse4.2 support"); throw Exception(ErrorCodes::NOT_IMPLEMENTED, "String hash is not implemented without sse4.2 support");
#endif #endif

View File

@ -610,6 +610,7 @@ namespace
const String & dest_bucket_, const String & dest_bucket_,
const String & dest_key_, const String & dest_key_,
const S3Settings::RequestSettings & request_settings_, const S3Settings::RequestSettings & request_settings_,
const ReadSettings & read_settings_,
const std::optional<std::map<String, String>> & object_metadata_, const std::optional<std::map<String, String>> & object_metadata_,
ThreadPoolCallbackRunner<void> schedule_, ThreadPoolCallbackRunner<void> schedule_,
bool for_disk_s3_) bool for_disk_s3_)
@ -619,6 +620,7 @@ namespace
, offset(src_offset_) , offset(src_offset_)
, size(src_size_) , size(src_size_)
, supports_multipart_copy(client_ptr_->supportsMultiPartCopy()) , supports_multipart_copy(client_ptr_->supportsMultiPartCopy())
, read_settings(read_settings_)
{ {
} }
@ -639,12 +641,13 @@ namespace
size_t offset; size_t offset;
size_t size; size_t size;
bool supports_multipart_copy; bool supports_multipart_copy;
const ReadSettings read_settings;
CreateReadBuffer getSourceObjectReadBuffer() CreateReadBuffer getSourceObjectReadBuffer()
{ {
return [&] return [&]
{ {
return std::make_unique<ReadBufferFromS3>(client_ptr, src_bucket, src_key, "", request_settings, Context::getGlobalContextInstance()->getReadSettings()); return std::make_unique<ReadBufferFromS3>(client_ptr, src_bucket, src_key, "", request_settings, read_settings);
}; };
} }
@ -826,20 +829,21 @@ void copyS3File(
const String & dest_bucket, const String & dest_bucket,
const String & dest_key, const String & dest_key,
const S3Settings::RequestSettings & settings, const S3Settings::RequestSettings & settings,
const ReadSettings & read_settings,
const std::optional<std::map<String, String>> & object_metadata, const std::optional<std::map<String, String>> & object_metadata,
ThreadPoolCallbackRunner<void> schedule, ThreadPoolCallbackRunner<void> schedule,
bool for_disk_s3) bool for_disk_s3)
{ {
if (settings.allow_native_copy) if (settings.allow_native_copy)
{ {
CopyFileHelper helper{s3_client, s3_client_with_long_timeout, src_bucket, src_key, src_offset, src_size, dest_bucket, dest_key, settings, object_metadata, schedule, for_disk_s3}; CopyFileHelper helper{s3_client, s3_client_with_long_timeout, src_bucket, src_key, src_offset, src_size, dest_bucket, dest_key, settings, read_settings, object_metadata, schedule, for_disk_s3};
helper.performCopy(); helper.performCopy();
} }
else else
{ {
auto create_read_buffer = [&] auto create_read_buffer = [&]
{ {
return std::make_unique<ReadBufferFromS3>(s3_client, src_bucket, src_key, "", settings, Context::getGlobalContextInstance()->getReadSettings()); return std::make_unique<ReadBufferFromS3>(s3_client, src_bucket, src_key, "", settings, read_settings);
}; };
copyDataToS3File(create_read_buffer, src_offset, src_size, s3_client, s3_client_with_long_timeout, dest_bucket, dest_key, settings, object_metadata, schedule, for_disk_s3); copyDataToS3File(create_read_buffer, src_offset, src_size, s3_client, s3_client_with_long_timeout, dest_bucket, dest_key, settings, object_metadata, schedule, for_disk_s3);
} }

View File

@ -31,6 +31,8 @@ using CreateReadBuffer = std::function<std::unique_ptr<SeekableReadBuffer>()>;
/// CompleteMultipartUpload requests. These requests need longer timeout because S3 servers often /// CompleteMultipartUpload requests. These requests need longer timeout because S3 servers often
/// block on them for multiple seconds without sending or receiving data from us (maybe the servers /// block on them for multiple seconds without sending or receiving data from us (maybe the servers
/// are copying data internally, or maybe throttling, idk). /// are copying data internally, or maybe throttling, idk).
///
/// read_settings - is used for throttling in case of native copy is not possible
void copyS3File( void copyS3File(
const std::shared_ptr<const S3::Client> & s3_client, const std::shared_ptr<const S3::Client> & s3_client,
const std::shared_ptr<const S3::Client> & s3_client_with_long_timeout, const std::shared_ptr<const S3::Client> & s3_client_with_long_timeout,
@ -41,6 +43,7 @@ void copyS3File(
const String & dest_bucket, const String & dest_bucket,
const String & dest_key, const String & dest_key,
const S3Settings::RequestSettings & settings, const S3Settings::RequestSettings & settings,
const ReadSettings & read_settings,
const std::optional<std::map<String, String>> & object_metadata = std::nullopt, const std::optional<std::map<String, String>> & object_metadata = std::nullopt,
ThreadPoolCallbackRunner<void> schedule_ = {}, ThreadPoolCallbackRunner<void> schedule_ = {},
bool for_disk_s3 = false); bool for_disk_s3 = false);

View File

@ -7,77 +7,88 @@ namespace DB
{ {
bool parseIntervalKind(IParser::Pos & pos, Expected & expected, IntervalKind & result) bool parseIntervalKind(IParser::Pos & pos, Expected & expected, IntervalKind & result)
{ {
if (ParserKeyword("NANOSECOND").ignore(pos, expected) || ParserKeyword("SQL_TSI_NANOSECOND").ignore(pos, expected) if (ParserKeyword("NANOSECOND").ignore(pos, expected) || ParserKeyword("NANOSECONDS").ignore(pos, expected)
|| ParserKeyword("SQL_TSI_NANOSECOND").ignore(pos, expected)
|| ParserKeyword("NS").ignore(pos, expected)) || ParserKeyword("NS").ignore(pos, expected))
{ {
result = IntervalKind::Nanosecond; result = IntervalKind::Nanosecond;
return true; return true;
} }
if (ParserKeyword("MICROSECOND").ignore(pos, expected) || ParserKeyword("SQL_TSI_MICROSECOND").ignore(pos, expected) if (ParserKeyword("MICROSECOND").ignore(pos, expected) || ParserKeyword("MICROSECONDS").ignore(pos, expected)
|| ParserKeyword("SQL_TSI_MICROSECOND").ignore(pos, expected)
|| ParserKeyword("MCS").ignore(pos, expected)) || ParserKeyword("MCS").ignore(pos, expected))
{ {
result = IntervalKind::Microsecond; result = IntervalKind::Microsecond;
return true; return true;
} }
if (ParserKeyword("MILLISECOND").ignore(pos, expected) || ParserKeyword("SQL_TSI_MILLISECOND").ignore(pos, expected) if (ParserKeyword("MILLISECOND").ignore(pos, expected) || ParserKeyword("MILLISECONDS").ignore(pos, expected)
|| ParserKeyword("SQL_TSI_MILLISECOND").ignore(pos, expected)
|| ParserKeyword("MS").ignore(pos, expected)) || ParserKeyword("MS").ignore(pos, expected))
{ {
result = IntervalKind::Millisecond; result = IntervalKind::Millisecond;
return true; return true;
} }
if (ParserKeyword("SECOND").ignore(pos, expected) || ParserKeyword("SQL_TSI_SECOND").ignore(pos, expected) if (ParserKeyword("SECOND").ignore(pos, expected) || ParserKeyword("SECONDS").ignore(pos, expected)
|| ParserKeyword("SQL_TSI_SECOND").ignore(pos, expected)
|| ParserKeyword("SS").ignore(pos, expected) || ParserKeyword("S").ignore(pos, expected)) || ParserKeyword("SS").ignore(pos, expected) || ParserKeyword("S").ignore(pos, expected))
{ {
result = IntervalKind::Second; result = IntervalKind::Second;
return true; return true;
} }
if (ParserKeyword("MINUTE").ignore(pos, expected) || ParserKeyword("SQL_TSI_MINUTE").ignore(pos, expected) if (ParserKeyword("MINUTE").ignore(pos, expected) || ParserKeyword("MINUTES").ignore(pos, expected)
|| ParserKeyword("SQL_TSI_MINUTE").ignore(pos, expected)
|| ParserKeyword("MI").ignore(pos, expected) || ParserKeyword("N").ignore(pos, expected)) || ParserKeyword("MI").ignore(pos, expected) || ParserKeyword("N").ignore(pos, expected))
{ {
result = IntervalKind::Minute; result = IntervalKind::Minute;
return true; return true;
} }
if (ParserKeyword("HOUR").ignore(pos, expected) || ParserKeyword("SQL_TSI_HOUR").ignore(pos, expected) if (ParserKeyword("HOUR").ignore(pos, expected) || ParserKeyword("HOURS").ignore(pos, expected)
|| ParserKeyword("SQL_TSI_HOUR").ignore(pos, expected)
|| ParserKeyword("HH").ignore(pos, expected) || ParserKeyword("H").ignore(pos, expected)) || ParserKeyword("HH").ignore(pos, expected) || ParserKeyword("H").ignore(pos, expected))
{ {
result = IntervalKind::Hour; result = IntervalKind::Hour;
return true; return true;
} }
if (ParserKeyword("DAY").ignore(pos, expected) || ParserKeyword("SQL_TSI_DAY").ignore(pos, expected) if (ParserKeyword("DAY").ignore(pos, expected) || ParserKeyword("DAYS").ignore(pos, expected)
|| ParserKeyword("SQL_TSI_DAY").ignore(pos, expected)
|| ParserKeyword("DD").ignore(pos, expected) || ParserKeyword("D").ignore(pos, expected)) || ParserKeyword("DD").ignore(pos, expected) || ParserKeyword("D").ignore(pos, expected))
{ {
result = IntervalKind::Day; result = IntervalKind::Day;
return true; return true;
} }
if (ParserKeyword("WEEK").ignore(pos, expected) || ParserKeyword("SQL_TSI_WEEK").ignore(pos, expected) if (ParserKeyword("WEEK").ignore(pos, expected) || ParserKeyword("WEEKS").ignore(pos, expected)
|| ParserKeyword("SQL_TSI_WEEK").ignore(pos, expected)
|| ParserKeyword("WK").ignore(pos, expected) || ParserKeyword("WW").ignore(pos, expected)) || ParserKeyword("WK").ignore(pos, expected) || ParserKeyword("WW").ignore(pos, expected))
{ {
result = IntervalKind::Week; result = IntervalKind::Week;
return true; return true;
} }
if (ParserKeyword("MONTH").ignore(pos, expected) || ParserKeyword("SQL_TSI_MONTH").ignore(pos, expected) if (ParserKeyword("MONTH").ignore(pos, expected) || ParserKeyword("MONTHS").ignore(pos, expected)
|| ParserKeyword("SQL_TSI_MONTH").ignore(pos, expected)
|| ParserKeyword("MM").ignore(pos, expected) || ParserKeyword("M").ignore(pos, expected)) || ParserKeyword("MM").ignore(pos, expected) || ParserKeyword("M").ignore(pos, expected))
{ {
result = IntervalKind::Month; result = IntervalKind::Month;
return true; return true;
} }
if (ParserKeyword("QUARTER").ignore(pos, expected) || ParserKeyword("SQL_TSI_QUARTER").ignore(pos, expected) if (ParserKeyword("QUARTER").ignore(pos, expected) || ParserKeyword("QUARTERS").ignore(pos, expected)
|| ParserKeyword("SQL_TSI_QUARTER").ignore(pos, expected)
|| ParserKeyword("QQ").ignore(pos, expected) || ParserKeyword("Q").ignore(pos, expected)) || ParserKeyword("QQ").ignore(pos, expected) || ParserKeyword("Q").ignore(pos, expected))
{ {
result = IntervalKind::Quarter; result = IntervalKind::Quarter;
return true; return true;
} }
if (ParserKeyword("YEAR").ignore(pos, expected) || ParserKeyword("SQL_TSI_YEAR").ignore(pos, expected) if (ParserKeyword("YEAR").ignore(pos, expected) || ParserKeyword("YEARS").ignore(pos, expected)
|| ParserKeyword("SQL_TSI_YEAR").ignore(pos, expected)
|| ParserKeyword("YYYY").ignore(pos, expected) || ParserKeyword("YY").ignore(pos, expected)) || ParserKeyword("YYYY").ignore(pos, expected) || ParserKeyword("YY").ignore(pos, expected))
{ {
result = IntervalKind::Year; result = IntervalKind::Year;

View File

@ -382,7 +382,7 @@ void KeeperTCPHandler::runImpl()
} }
auto response_fd = poll_wrapper->getResponseFD(); auto response_fd = poll_wrapper->getResponseFD();
auto response_callback = [this, response_fd] (const Coordination::ZooKeeperResponsePtr & response) auto response_callback = [responses = this->responses, response_fd](const Coordination::ZooKeeperResponsePtr & response)
{ {
if (!responses->push(response)) if (!responses->push(response))
throw Exception(ErrorCodes::SYSTEM_ERROR, throw Exception(ErrorCodes::SYSTEM_ERROR,

View File

@ -25,7 +25,7 @@ struct SocketInterruptablePollWrapper;
using SocketInterruptablePollWrapperPtr = std::unique_ptr<SocketInterruptablePollWrapper>; using SocketInterruptablePollWrapperPtr = std::unique_ptr<SocketInterruptablePollWrapper>;
using ThreadSafeResponseQueue = ConcurrentBoundedQueue<Coordination::ZooKeeperResponsePtr>; using ThreadSafeResponseQueue = ConcurrentBoundedQueue<Coordination::ZooKeeperResponsePtr>;
using ThreadSafeResponseQueuePtr = std::unique_ptr<ThreadSafeResponseQueue>; using ThreadSafeResponseQueuePtr = std::shared_ptr<ThreadSafeResponseQueue>;
struct LastOp; struct LastOp;
using LastOpMultiVersion = MultiVersion<LastOp>; using LastOpMultiVersion = MultiVersion<LastOp>;

View File

@ -416,7 +416,8 @@ void DataPartStorageOnDiskBase::backup(
MutableDataPartStoragePtr DataPartStorageOnDiskBase::freeze( MutableDataPartStoragePtr DataPartStorageOnDiskBase::freeze(
const std::string & to, const std::string & to,
const std::string & dir_path, const std::string & dir_path,
const WriteSettings & settings, const ReadSettings & read_settings,
const WriteSettings & write_settings,
std::function<void(const DiskPtr &)> save_metadata_callback, std::function<void(const DiskPtr &)> save_metadata_callback,
const ClonePartParams & params) const const ClonePartParams & params) const
{ {
@ -430,7 +431,8 @@ MutableDataPartStoragePtr DataPartStorageOnDiskBase::freeze(
disk, disk,
getRelativePath(), getRelativePath(),
fs::path(to) / dir_path, fs::path(to) / dir_path,
settings, read_settings,
write_settings,
params.make_source_readonly, params.make_source_readonly,
/* max_level= */ {}, /* max_level= */ {},
params.copy_instead_of_hardlink, params.copy_instead_of_hardlink,
@ -466,6 +468,7 @@ MutableDataPartStoragePtr DataPartStorageOnDiskBase::clonePart(
const std::string & to, const std::string & to,
const std::string & dir_path, const std::string & dir_path,
const DiskPtr & dst_disk, const DiskPtr & dst_disk,
const ReadSettings & read_settings,
const WriteSettings & write_settings, const WriteSettings & write_settings,
Poco::Logger * log) const Poco::Logger * log) const
{ {
@ -482,7 +485,7 @@ MutableDataPartStoragePtr DataPartStorageOnDiskBase::clonePart(
try try
{ {
dst_disk->createDirectories(to); dst_disk->createDirectories(to);
src_disk->copyDirectoryContent(getRelativePath(), dst_disk, path_to_clone, write_settings); src_disk->copyDirectoryContent(getRelativePath(), dst_disk, path_to_clone, read_settings, write_settings);
} }
catch (...) catch (...)
{ {

View File

@ -63,7 +63,8 @@ public:
MutableDataPartStoragePtr freeze( MutableDataPartStoragePtr freeze(
const std::string & to, const std::string & to,
const std::string & dir_path, const std::string & dir_path,
const WriteSettings & settings, const ReadSettings & read_settings,
const WriteSettings & write_settings,
std::function<void(const DiskPtr &)> save_metadata_callback, std::function<void(const DiskPtr &)> save_metadata_callback,
const ClonePartParams & params) const override; const ClonePartParams & params) const override;
@ -71,6 +72,7 @@ public:
const std::string & to, const std::string & to,
const std::string & dir_path, const std::string & dir_path,
const DiskPtr & dst_disk, const DiskPtr & dst_disk,
const ReadSettings & read_settings,
const WriteSettings & write_settings, const WriteSettings & write_settings,
Poco::Logger * log) const override; Poco::Logger * log) const override;

View File

@ -250,7 +250,8 @@ public:
virtual std::shared_ptr<IDataPartStorage> freeze( virtual std::shared_ptr<IDataPartStorage> freeze(
const std::string & to, const std::string & to,
const std::string & dir_path, const std::string & dir_path,
const WriteSettings & settings, const ReadSettings & read_settings,
const WriteSettings & write_settings,
std::function<void(const DiskPtr &)> save_metadata_callback, std::function<void(const DiskPtr &)> save_metadata_callback,
const ClonePartParams & params) const = 0; const ClonePartParams & params) const = 0;
@ -259,6 +260,7 @@ public:
const std::string & to, const std::string & to,
const std::string & dir_path, const std::string & dir_path,
const DiskPtr & disk, const DiskPtr & disk,
const ReadSettings & read_settings,
const WriteSettings & write_settings, const WriteSettings & write_settings,
Poco::Logger * log) const = 0; Poco::Logger * log) const = 0;

View File

@ -1793,12 +1793,13 @@ DataPartStoragePtr IMergeTreeDataPart::makeCloneInDetached(const String & prefix
return getDataPartStorage().freeze( return getDataPartStorage().freeze(
storage.relative_data_path, storage.relative_data_path,
*maybe_path_in_detached, *maybe_path_in_detached,
Context::getGlobalContextInstance()->getReadSettings(),
Context::getGlobalContextInstance()->getWriteSettings(), Context::getGlobalContextInstance()->getWriteSettings(),
/* save_metadata_callback= */ {}, /* save_metadata_callback= */ {},
params); params);
} }
MutableDataPartStoragePtr IMergeTreeDataPart::makeCloneOnDisk(const DiskPtr & disk, const String & directory_name, const WriteSettings & write_settings) const MutableDataPartStoragePtr IMergeTreeDataPart::makeCloneOnDisk(const DiskPtr & disk, const String & directory_name, const ReadSettings & read_settings, const WriteSettings & write_settings) const
{ {
assertOnDisk(); assertOnDisk();
@ -1808,7 +1809,7 @@ MutableDataPartStoragePtr IMergeTreeDataPart::makeCloneOnDisk(const DiskPtr & di
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can not clone data part {} to empty directory.", name); throw Exception(ErrorCodes::LOGICAL_ERROR, "Can not clone data part {} to empty directory.", name);
String path_to_clone = fs::path(storage.relative_data_path) / directory_name / ""; String path_to_clone = fs::path(storage.relative_data_path) / directory_name / "";
return getDataPartStorage().clonePart(path_to_clone, getDataPartStorage().getPartDirectory(), disk, write_settings, storage.log); return getDataPartStorage().clonePart(path_to_clone, getDataPartStorage().getPartDirectory(), disk, read_settings, write_settings, storage.log);
} }
UInt64 IMergeTreeDataPart::getIndexSizeFromFile() const UInt64 IMergeTreeDataPart::getIndexSizeFromFile() const

View File

@ -377,7 +377,7 @@ public:
const DiskTransactionPtr & disk_transaction) const; const DiskTransactionPtr & disk_transaction) const;
/// Makes full clone of part in specified subdirectory (relative to storage data directory, e.g. "detached") on another disk /// Makes full clone of part in specified subdirectory (relative to storage data directory, e.g. "detached") on another disk
MutableDataPartStoragePtr makeCloneOnDisk(const DiskPtr & disk, const String & directory_name, const WriteSettings & write_settings) const; MutableDataPartStoragePtr makeCloneOnDisk(const DiskPtr & disk, const String & directory_name, const ReadSettings & read_settings, const WriteSettings & write_settings) const;
/// Checks that .bin and .mrk files exist. /// Checks that .bin and .mrk files exist.
/// ///

View File

@ -4968,7 +4968,7 @@ void MergeTreeData::movePartitionToDisk(const ASTPtr & partition, const String &
throw Exception(ErrorCodes::UNKNOWN_DISK, "All parts of partition '{}' are already on disk '{}'", partition_id, disk->getName()); throw Exception(ErrorCodes::UNKNOWN_DISK, "All parts of partition '{}' are already on disk '{}'", partition_id, disk->getName());
} }
MovePartsOutcome moves_outcome = movePartsToSpace(parts, std::static_pointer_cast<Space>(disk), local_context->getWriteSettings()); MovePartsOutcome moves_outcome = movePartsToSpace(parts, std::static_pointer_cast<Space>(disk), local_context->getReadSettings(), local_context->getWriteSettings());
switch (moves_outcome) switch (moves_outcome)
{ {
case MovePartsOutcome::MovesAreCancelled: case MovePartsOutcome::MovesAreCancelled:
@ -5031,7 +5031,7 @@ void MergeTreeData::movePartitionToVolume(const ASTPtr & partition, const String
throw Exception(ErrorCodes::UNKNOWN_DISK, "All parts of partition '{}' are already on volume '{}'", partition_id, volume->getName()); throw Exception(ErrorCodes::UNKNOWN_DISK, "All parts of partition '{}' are already on volume '{}'", partition_id, volume->getName());
} }
MovePartsOutcome moves_outcome = movePartsToSpace(parts, std::static_pointer_cast<Space>(volume), local_context->getWriteSettings()); MovePartsOutcome moves_outcome = movePartsToSpace(parts, std::static_pointer_cast<Space>(volume), local_context->getReadSettings(), local_context->getWriteSettings());
switch (moves_outcome) switch (moves_outcome)
{ {
case MovePartsOutcome::MovesAreCancelled: case MovePartsOutcome::MovesAreCancelled:
@ -7488,6 +7488,7 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> MergeTreeData::cloneAn
const MergeTreePartInfo & dst_part_info, const MergeTreePartInfo & dst_part_info,
const StorageMetadataPtr & metadata_snapshot, const StorageMetadataPtr & metadata_snapshot,
const IDataPartStorage::ClonePartParams & params, const IDataPartStorage::ClonePartParams & params,
const ReadSettings & read_settings,
const WriteSettings & write_settings) const WriteSettings & write_settings)
{ {
/// Check that the storage policy contains the disk where the src_part is located. /// Check that the storage policy contains the disk where the src_part is located.
@ -7545,6 +7546,7 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> MergeTreeData::cloneAn
auto dst_part_storage = src_part_storage->freeze( auto dst_part_storage = src_part_storage->freeze(
relative_data_path, relative_data_path,
tmp_dst_part_name, tmp_dst_part_name,
read_settings,
write_settings, write_settings,
/* save_metadata_callback= */ {}, /* save_metadata_callback= */ {},
params); params);
@ -7803,6 +7805,7 @@ PartitionCommandsResultInfo MergeTreeData::freezePartitionsByMatcher(
auto new_storage = data_part_storage->freeze( auto new_storage = data_part_storage->freeze(
backup_part_path, backup_part_path,
part->getDataPartStorage().getPartDirectory(), part->getDataPartStorage().getPartDirectory(),
local_context->getReadSettings(),
local_context->getWriteSettings(), local_context->getWriteSettings(),
callback, callback,
params); params);
@ -8002,8 +8005,9 @@ bool MergeTreeData::scheduleDataMovingJob(BackgroundJobsAssignee & assignee)
assignee.scheduleMoveTask(std::make_shared<ExecutableLambdaAdapter>( assignee.scheduleMoveTask(std::make_shared<ExecutableLambdaAdapter>(
[this, moving_tagger] () mutable [this, moving_tagger] () mutable
{ {
ReadSettings read_settings = Context::getGlobalContextInstance()->getReadSettings();
WriteSettings write_settings = Context::getGlobalContextInstance()->getWriteSettings(); WriteSettings write_settings = Context::getGlobalContextInstance()->getWriteSettings();
return moveParts(moving_tagger, write_settings, /* wait_for_move_if_zero_copy= */ false) == MovePartsOutcome::PartsMoved; return moveParts(moving_tagger, read_settings, write_settings, /* wait_for_move_if_zero_copy= */ false) == MovePartsOutcome::PartsMoved;
}, moves_assignee_trigger, getStorageID())); }, moves_assignee_trigger, getStorageID()));
return true; return true;
} }
@ -8018,7 +8022,7 @@ bool MergeTreeData::areBackgroundMovesNeeded() const
return policy->getVolumes().size() == 1 && policy->getVolumes()[0]->getDisks().size() > 1; return policy->getVolumes().size() == 1 && policy->getVolumes()[0]->getDisks().size() > 1;
} }
MovePartsOutcome MergeTreeData::movePartsToSpace(const DataPartsVector & parts, SpacePtr space, const WriteSettings & write_settings) MovePartsOutcome MergeTreeData::movePartsToSpace(const DataPartsVector & parts, SpacePtr space, const ReadSettings & read_settings, const WriteSettings & write_settings)
{ {
if (parts_mover.moves_blocker.isCancelled()) if (parts_mover.moves_blocker.isCancelled())
return MovePartsOutcome::MovesAreCancelled; return MovePartsOutcome::MovesAreCancelled;
@ -8027,7 +8031,7 @@ MovePartsOutcome MergeTreeData::movePartsToSpace(const DataPartsVector & parts,
if (moving_tagger->parts_to_move.empty()) if (moving_tagger->parts_to_move.empty())
return MovePartsOutcome::NothingToMove; return MovePartsOutcome::NothingToMove;
return moveParts(moving_tagger, write_settings, /* wait_for_move_if_zero_copy= */ true); return moveParts(moving_tagger, read_settings, write_settings, /* wait_for_move_if_zero_copy= */ true);
} }
MergeTreeData::CurrentlyMovingPartsTaggerPtr MergeTreeData::selectPartsForMove() MergeTreeData::CurrentlyMovingPartsTaggerPtr MergeTreeData::selectPartsForMove()
@ -8082,7 +8086,7 @@ MergeTreeData::CurrentlyMovingPartsTaggerPtr MergeTreeData::checkPartsForMove(co
return std::make_shared<CurrentlyMovingPartsTagger>(std::move(parts_to_move), *this); return std::make_shared<CurrentlyMovingPartsTagger>(std::move(parts_to_move), *this);
} }
MovePartsOutcome MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagger, const WriteSettings & write_settings, bool wait_for_move_if_zero_copy) MovePartsOutcome MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagger, const ReadSettings & read_settings, const WriteSettings & write_settings, bool wait_for_move_if_zero_copy)
{ {
LOG_INFO(log, "Got {} parts to move.", moving_tagger->parts_to_move.size()); LOG_INFO(log, "Got {} parts to move.", moving_tagger->parts_to_move.size());
@ -8143,7 +8147,7 @@ MovePartsOutcome MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr &
{ {
if (lock->isLocked()) if (lock->isLocked())
{ {
cloned_part = parts_mover.clonePart(moving_part, write_settings); cloned_part = parts_mover.clonePart(moving_part, read_settings, write_settings);
parts_mover.swapClonedPart(cloned_part); parts_mover.swapClonedPart(cloned_part);
break; break;
} }
@ -8170,7 +8174,7 @@ MovePartsOutcome MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr &
} }
else /// Ordinary move as it should be else /// Ordinary move as it should be
{ {
cloned_part = parts_mover.clonePart(moving_part, write_settings); cloned_part = parts_mover.clonePart(moving_part, read_settings, write_settings);
parts_mover.swapClonedPart(cloned_part); parts_mover.swapClonedPart(cloned_part);
} }
write_part_log({}); write_part_log({});

View File

@ -848,6 +848,7 @@ public:
const MergeTreePartInfo & dst_part_info, const MergeTreePartInfo & dst_part_info,
const StorageMetadataPtr & metadata_snapshot, const StorageMetadataPtr & metadata_snapshot,
const IDataPartStorage::ClonePartParams & params, const IDataPartStorage::ClonePartParams & params,
const ReadSettings & read_settings,
const WriteSettings & write_settings); const WriteSettings & write_settings);
virtual std::vector<MergeTreeMutationStatus> getMutationsStatus() const = 0; virtual std::vector<MergeTreeMutationStatus> getMutationsStatus() const = 0;
@ -1340,7 +1341,7 @@ protected:
/// MergeTree because they store mutations in different way. /// MergeTree because they store mutations in different way.
virtual std::map<int64_t, MutationCommands> getAlterMutationCommandsForPart(const DataPartPtr & part) const = 0; virtual std::map<int64_t, MutationCommands> getAlterMutationCommandsForPart(const DataPartPtr & part) const = 0;
/// Moves part to specified space, used in ALTER ... MOVE ... queries /// Moves part to specified space, used in ALTER ... MOVE ... queries
MovePartsOutcome movePartsToSpace(const DataPartsVector & parts, SpacePtr space, const WriteSettings & write_settings); MovePartsOutcome movePartsToSpace(const DataPartsVector & parts, SpacePtr space, const ReadSettings & read_settings, const WriteSettings & write_settings);
struct PartBackupEntries struct PartBackupEntries
{ {
@ -1494,7 +1495,7 @@ private:
using CurrentlyMovingPartsTaggerPtr = std::shared_ptr<CurrentlyMovingPartsTagger>; using CurrentlyMovingPartsTaggerPtr = std::shared_ptr<CurrentlyMovingPartsTagger>;
/// Move selected parts to corresponding disks /// Move selected parts to corresponding disks
MovePartsOutcome moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagger, const WriteSettings & write_settings, bool wait_for_move_if_zero_copy); MovePartsOutcome moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagger, const ReadSettings & read_settings, const WriteSettings & write_settings, bool wait_for_move_if_zero_copy);
/// Select parts for move and disks for them. Used in background moving processes. /// Select parts for move and disks for them. Used in background moving processes.
CurrentlyMovingPartsTaggerPtr selectPartsForMove(); CurrentlyMovingPartsTaggerPtr selectPartsForMove();

View File

@ -208,7 +208,7 @@ bool MergeTreePartsMover::selectPartsForMove(
return false; return false;
} }
MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const MergeTreeMoveEntry & moving_part, const WriteSettings & write_settings) const MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const MergeTreeMoveEntry & moving_part, const ReadSettings & read_settings, const WriteSettings & write_settings) const
{ {
if (moves_blocker.isCancelled()) if (moves_blocker.isCancelled())
throw Exception(ErrorCodes::ABORTED, "Cancelled moving parts."); throw Exception(ErrorCodes::ABORTED, "Cancelled moving parts.");
@ -249,12 +249,12 @@ MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const Me
{ {
LOG_INFO(log, "Part {} was not fetched, we are the first who move it to another disk, so we will copy it", part->name); LOG_INFO(log, "Part {} was not fetched, we are the first who move it to another disk, so we will copy it", part->name);
cloned_part_storage = part->getDataPartStorage().clonePart( cloned_part_storage = part->getDataPartStorage().clonePart(
path_to_clone, part->getDataPartStorage().getPartDirectory(), disk, write_settings, log); path_to_clone, part->getDataPartStorage().getPartDirectory(), disk, read_settings, write_settings, log);
} }
} }
else else
{ {
cloned_part_storage = part->makeCloneOnDisk(disk, MergeTreeData::MOVING_DIR_NAME, write_settings); cloned_part_storage = part->makeCloneOnDisk(disk, MergeTreeData::MOVING_DIR_NAME, read_settings, write_settings);
} }
MergeTreeDataPartBuilder builder(*data, part->name, cloned_part_storage); MergeTreeDataPartBuilder builder(*data, part->name, cloned_part_storage);

View File

@ -65,7 +65,7 @@ public:
const std::lock_guard<std::mutex> & moving_parts_lock); const std::lock_guard<std::mutex> & moving_parts_lock);
/// Copies part to selected reservation in detached folder. Throws exception if part already exists. /// Copies part to selected reservation in detached folder. Throws exception if part already exists.
TemporaryClonedPart clonePart(const MergeTreeMoveEntry & moving_part, const WriteSettings & write_settings) const; TemporaryClonedPart clonePart(const MergeTreeMoveEntry & moving_part, const ReadSettings & read_settings, const WriteSettings & write_settings) const;
/// Replaces cloned part from detached directory into active data parts set. /// Replaces cloned part from detached directory into active data parts set.
/// Replacing part changes state to DeleteOnDestroy and will be removed from disk after destructor of /// Replacing part changes state to DeleteOnDestroy and will be removed from disk after destructor of

View File

@ -1845,7 +1845,14 @@ bool MutateTask::prepare()
.txn = ctx->txn, .hardlinked_files = &ctx->hardlinked_files, .txn = ctx->txn, .hardlinked_files = &ctx->hardlinked_files,
.files_to_copy_instead_of_hardlinks = std::move(files_to_copy_instead_of_hardlinks), .keep_metadata_version = true .files_to_copy_instead_of_hardlinks = std::move(files_to_copy_instead_of_hardlinks), .keep_metadata_version = true
}; };
auto [part, lock] = ctx->data->cloneAndLoadDataPartOnSameDisk(ctx->source_part, prefix, ctx->future_part->part_info, ctx->metadata_snapshot, clone_params, ctx->context->getWriteSettings()); auto [part, lock] = ctx->data->cloneAndLoadDataPartOnSameDisk(
ctx->source_part,
prefix,
ctx->future_part->part_info,
ctx->metadata_snapshot,
clone_params,
ctx->context->getReadSettings(),
ctx->context->getWriteSettings());
part->getDataPartStorage().beginTransaction(); part->getDataPartStorage().beginTransaction();
ctx->temporary_directory_lock = std::move(lock); ctx->temporary_directory_lock = std::move(lock);

View File

@ -21,7 +21,8 @@ void localBackupImpl(
IDiskTransaction * transaction, IDiskTransaction * transaction,
const String & source_path, const String & source_path,
const String & destination_path, const String & destination_path,
const WriteSettings & settings, const ReadSettings & read_settings,
const WriteSettings & write_settings,
bool make_source_readonly, bool make_source_readonly,
size_t level, size_t level,
std::optional<size_t> max_level, std::optional<size_t> max_level,
@ -56,13 +57,9 @@ void localBackupImpl(
if (copy_instead_of_hardlinks || files_to_copy_instead_of_hardlinks.contains(it->name())) if (copy_instead_of_hardlinks || files_to_copy_instead_of_hardlinks.contains(it->name()))
{ {
if (transaction) if (transaction)
{ transaction->copyFile(source, destination, read_settings, write_settings);
transaction->copyFile(source, destination, settings);
}
else else
{ disk->copyFile(source, *disk, destination, read_settings, write_settings);
disk->copyFile(source, *disk, destination, settings);
}
} }
else else
{ {
@ -79,7 +76,8 @@ void localBackupImpl(
transaction, transaction,
source, source,
destination, destination,
settings, read_settings,
write_settings,
make_source_readonly, make_source_readonly,
level + 1, level + 1,
max_level, max_level,
@ -129,7 +127,8 @@ void localBackup(
const DiskPtr & disk, const DiskPtr & disk,
const String & source_path, const String & source_path,
const String & destination_path, const String & destination_path,
const WriteSettings & settings, const ReadSettings & read_settings,
const WriteSettings & write_settings,
bool make_source_readonly, bool make_source_readonly,
std::optional<size_t> max_level, std::optional<size_t> max_level,
bool copy_instead_of_hardlinks, bool copy_instead_of_hardlinks,
@ -160,7 +159,8 @@ void localBackup(
disk_transaction.get(), disk_transaction.get(),
source_path, source_path,
destination_path, destination_path,
settings, read_settings,
write_settings,
make_source_readonly, make_source_readonly,
/* level= */ 0, /* level= */ 0,
max_level, max_level,
@ -170,7 +170,7 @@ void localBackup(
else if (copy_instead_of_hardlinks) else if (copy_instead_of_hardlinks)
{ {
CleanupOnFail cleanup([disk, destination_path]() { disk->removeRecursive(destination_path); }); CleanupOnFail cleanup([disk, destination_path]() { disk->removeRecursive(destination_path); });
disk->copyDirectoryContent(source_path, disk, destination_path, settings); disk->copyDirectoryContent(source_path, disk, destination_path, read_settings, write_settings);
cleanup.success(); cleanup.success();
} }
else else
@ -189,7 +189,8 @@ void localBackup(
disk_transaction.get(), disk_transaction.get(),
source_path, source_path,
destination_path, destination_path,
settings, read_settings,
write_settings,
make_source_readonly, make_source_readonly,
/* level= */ 0, /* level= */ 0,
max_level, max_level,

View File

@ -28,7 +28,8 @@ struct WriteSettings;
const DiskPtr & disk, const DiskPtr & disk,
const String & source_path, const String & source_path,
const String & destination_path, const String & destination_path,
const WriteSettings & settings, const ReadSettings & read_settings,
const WriteSettings & write_settings,
bool make_source_readonly = true, bool make_source_readonly = true,
std::optional<size_t> max_level = {}, std::optional<size_t> max_level = {},
bool copy_instead_of_hardlinks = false, bool copy_instead_of_hardlinks = false,

View File

@ -2043,7 +2043,14 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
MergeTreePartInfo dst_part_info(partition_id, temp_index, temp_index, src_part->info.level); MergeTreePartInfo dst_part_info(partition_id, temp_index, temp_index, src_part->info.level);
IDataPartStorage::ClonePartParams clone_params{.txn = local_context->getCurrentTransaction()}; IDataPartStorage::ClonePartParams clone_params{.txn = local_context->getCurrentTransaction()};
auto [dst_part, part_lock] = cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, my_metadata_snapshot, clone_params, local_context->getWriteSettings()); auto [dst_part, part_lock] = cloneAndLoadDataPartOnSameDisk(
src_part,
TMP_PREFIX,
dst_part_info,
my_metadata_snapshot,
clone_params,
local_context->getReadSettings(),
local_context->getWriteSettings());
dst_parts.emplace_back(std::move(dst_part)); dst_parts.emplace_back(std::move(dst_part));
dst_parts_locks.emplace_back(std::move(part_lock)); dst_parts_locks.emplace_back(std::move(part_lock));
} }
@ -2142,7 +2149,15 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const
MergeTreePartInfo dst_part_info(partition_id, temp_index, temp_index, src_part->info.level); MergeTreePartInfo dst_part_info(partition_id, temp_index, temp_index, src_part->info.level);
IDataPartStorage::ClonePartParams clone_params{.txn = local_context->getCurrentTransaction()}; IDataPartStorage::ClonePartParams clone_params{.txn = local_context->getCurrentTransaction()};
auto [dst_part, part_lock] = dest_table_storage->cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, dest_metadata_snapshot, clone_params, local_context->getWriteSettings()); auto [dst_part, part_lock] = dest_table_storage->cloneAndLoadDataPartOnSameDisk(
src_part,
TMP_PREFIX,
dst_part_info,
dest_metadata_snapshot,
clone_params,
local_context->getReadSettings(),
local_context->getWriteSettings()
);
dst_parts.emplace_back(std::move(dst_part)); dst_parts.emplace_back(std::move(dst_part));
dst_parts_locks.emplace_back(std::move(part_lock)); dst_parts_locks.emplace_back(std::move(part_lock));
} }

View File

@ -2473,7 +2473,13 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
.metadata_version_to_write = metadata_snapshot->getMetadataVersion() .metadata_version_to_write = metadata_snapshot->getMetadataVersion()
}; };
auto [res_part, temporary_part_lock] = cloneAndLoadDataPartOnSameDisk( auto [res_part, temporary_part_lock] = cloneAndLoadDataPartOnSameDisk(
part_desc->src_table_part, TMP_PREFIX + "clone_", part_desc->new_part_info, metadata_snapshot, clone_params, getContext()->getWriteSettings()); part_desc->src_table_part,
TMP_PREFIX + "clone_",
part_desc->new_part_info,
metadata_snapshot,
clone_params,
getContext()->getReadSettings(),
getContext()->getWriteSettings());
part_desc->res_part = std::move(res_part); part_desc->res_part = std::move(res_part);
part_desc->temporary_part_lock = std::move(temporary_part_lock); part_desc->temporary_part_lock = std::move(temporary_part_lock);
} }
@ -4568,7 +4574,14 @@ bool StorageReplicatedMergeTree::fetchPart(
{ {
chassert(!is_zero_copy_part(part_to_clone)); chassert(!is_zero_copy_part(part_to_clone));
IDataPartStorage::ClonePartParams clone_params{ .keep_metadata_version = true }; IDataPartStorage::ClonePartParams clone_params{ .keep_metadata_version = true };
auto [cloned_part, lock] = cloneAndLoadDataPartOnSameDisk(part_to_clone, "tmp_clone_", part_info, metadata_snapshot, clone_params, getContext()->getWriteSettings()); auto [cloned_part, lock] = cloneAndLoadDataPartOnSameDisk(
part_to_clone,
"tmp_clone_",
part_info,
metadata_snapshot,
clone_params,
getContext()->getReadSettings(),
getContext()->getWriteSettings());
part_directory_lock = std::move(lock); part_directory_lock = std::move(lock);
return cloned_part; return cloned_part;
}; };
@ -7656,7 +7669,14 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
.copy_instead_of_hardlink = zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport(), .copy_instead_of_hardlink = zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport(),
.metadata_version_to_write = metadata_snapshot->getMetadataVersion() .metadata_version_to_write = metadata_snapshot->getMetadataVersion()
}; };
auto [dst_part, part_lock] = cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, metadata_snapshot, clone_params, query_context->getWriteSettings()); auto [dst_part, part_lock] = cloneAndLoadDataPartOnSameDisk(
src_part,
TMP_PREFIX,
dst_part_info,
metadata_snapshot,
clone_params,
query_context->getReadSettings(),
query_context->getWriteSettings());
src_parts.emplace_back(src_part); src_parts.emplace_back(src_part);
dst_parts.emplace_back(dst_part); dst_parts.emplace_back(dst_part);
dst_parts_locks.emplace_back(std::move(part_lock)); dst_parts_locks.emplace_back(std::move(part_lock));
@ -7896,7 +7916,14 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
.copy_instead_of_hardlink = zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport(), .copy_instead_of_hardlink = zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport(),
.metadata_version_to_write = dest_metadata_snapshot->getMetadataVersion() .metadata_version_to_write = dest_metadata_snapshot->getMetadataVersion()
}; };
auto [dst_part, dst_part_lock] = dest_table_storage->cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, dest_metadata_snapshot, clone_params, query_context->getWriteSettings()); auto [dst_part, dst_part_lock] = dest_table_storage->cloneAndLoadDataPartOnSameDisk(
src_part,
TMP_PREFIX,
dst_part_info,
dest_metadata_snapshot,
clone_params,
query_context->getReadSettings(),
query_context->getWriteSettings());
src_parts.emplace_back(src_part); src_parts.emplace_back(src_part);
dst_parts.emplace_back(dst_part); dst_parts.emplace_back(dst_part);

View File

@ -0,0 +1,2 @@
native_copy 0
no_native_copy 1

View File

@ -0,0 +1,36 @@
#!/usr/bin/env bash
# Tags: no-fasttest
# Tag: no-fasttest - requires S3
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -nm -q "
drop table if exists data;
create table data (key UInt64 CODEC(NONE)) engine=MergeTree() order by tuple() settings min_bytes_for_wide_part=1e9, disk='s3_disk';
-- reading 1e6*8 bytes with 1M bandwith it should take (8-1)/1=7 seconds
insert into data select * from numbers(1e6);
"
query_id=$(random_str 10)
$CLICKHOUSE_CLIENT --query_id "$query_id" -q "backup table data to S3(s3_conn, 'backups/$CLICKHOUSE_DATABASE/data/backup2') SETTINGS allow_s3_native_copy=1" --max_backup_bandwidth=1M > /dev/null
$CLICKHOUSE_CLIENT -nm -q "
SYSTEM FLUSH LOGS;
SELECT
'native_copy',
query_duration_ms >= 7e3
FROM system.query_log
WHERE current_database = '$CLICKHOUSE_DATABASE' AND query_id = '$query_id' AND type != 'QueryStart'
"
query_id=$(random_str 10)
$CLICKHOUSE_CLIENT --query_id "$query_id" -q "backup table data to S3(s3_conn, 'backups/$CLICKHOUSE_DATABASE/data/backup3') SETTINGS allow_s3_native_copy=0" --max_backup_bandwidth=1M > /dev/null
$CLICKHOUSE_CLIENT -nm -q "
SYSTEM FLUSH LOGS;
SELECT
'no_native_copy',
query_duration_ms >= 7e3
FROM system.query_log
WHERE current_database = '$CLICKHOUSE_DATABASE' AND query_id = '$query_id' AND type != 'QueryStart'
"

View File

@ -0,0 +1,23 @@
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2009-02-14 01:31:30
2009-02-14 01:31:30
2009-02-15 23:31:30
2009-02-15 23:31:30
2009-02-15 23:31:30

View File

@ -0,0 +1,23 @@
SELECT INTERVAL 2 year;
SELECT INTERVAL 2 years;
SELECT INTERVAL '2 years';
SELECT INTERVAL 2 month;
SELECT INTERVAL 2 months;
SELECT INTERVAL '2 months';
SELECT INTERVAL 2 week;
SELECT INTERVAL 2 weeks;
SELECT INTERVAL '2 weeks';
SELECT INTERVAL 2 day;
SELECT INTERVAL 2 days;
SELECT INTERVAL '2 days';
SELECT INTERVAL 2 hour;
SELECT INTERVAL 2 hours;
SELECT INTERVAL '2 hours';
SELECT INTERVAL 2 minute;
SELECT INTERVAL 2 minutes;
SELECT INTERVAL '2 minutes';
SELECT DATE_ADD(hour, 2, toDateTime(1234567890, 'UTC'));
SELECT DATE_ADD(hours, 2, toDateTime(1234567890, 'UTC'));
SELECT DATE_ADD(toDateTime(1234567890, 'UTC'), INTERVAL 2 day);
SELECT DATE_ADD(toDateTime(1234567890, 'UTC'), INTERVAL 2 days);
SELECT DATE_ADD(toDateTime(1234567890, 'UTC'), INTERVAL '2 days');