mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-26 01:22:04 +00:00
[update] Integrate cloneAndLoadDataPartOnSamePart and cloneAndLoadDataPart into a function, controlled by parameters; integrate remoteBackup and localBackup into a class, and change the name to Backup.
This commit is contained in:
parent
26182ca2c7
commit
6e83fe9015
1
data/default/local_table
Symbolic link
1
data/default/local_table
Symbolic link
@ -0,0 +1 @@
|
||||
/data/home/unashi/ck_issue/ClickHouse/store/39b/39b798e4-1787-4b2e-971f-c4f092bf0cde/
|
2
metadata/INFORMATION_SCHEMA.sql
Normal file
2
metadata/INFORMATION_SCHEMA.sql
Normal file
@ -0,0 +1,2 @@
|
||||
ATTACH DATABASE INFORMATION_SCHEMA
|
||||
ENGINE = Memory
|
1
metadata/default
Symbolic link
1
metadata/default
Symbolic link
@ -0,0 +1 @@
|
||||
/data/home/unashi/ck_issue/ClickHouse/store/c39/c3900d2c-f110-426e-b693-ceaf42d2362c/
|
2
metadata/default.sql
Normal file
2
metadata/default.sql
Normal file
@ -0,0 +1,2 @@
|
||||
ATTACH DATABASE _ UUID 'c3900d2c-f110-426e-b693-ceaf42d2362c'
|
||||
ENGINE = Atomic
|
2
metadata/information_schema.sql
Normal file
2
metadata/information_schema.sql
Normal file
@ -0,0 +1,2 @@
|
||||
ATTACH DATABASE information_schema
|
||||
ENGINE = Memory
|
1
metadata/system
Symbolic link
1
metadata/system
Symbolic link
@ -0,0 +1 @@
|
||||
/data/home/unashi/ck_issue/ClickHouse/store/f47/f47c2a69-345f-476e-ac54-5c1a9acc883b/
|
2
metadata/system.sql
Normal file
2
metadata/system.sql
Normal file
@ -0,0 +1,2 @@
|
||||
ATTACH DATABASE _ UUID 'f47c2a69-345f-476e-ac54-5c1a9acc883b'
|
||||
ENGINE = Atomic
|
44
preprocessed_configs/config.xml
Normal file
44
preprocessed_configs/config.xml
Normal file
@ -0,0 +1,44 @@
|
||||
<!-- This file was generated automatically.
|
||||
Do not edit it: it is likely to be discarded and generated again before it's read next time.
|
||||
Files used to generate this file:
|
||||
config.xml -->
|
||||
|
||||
<!-- Config that is used when server is run without config file. -->
|
||||
<clickhouse>
|
||||
<logger>
|
||||
<level>trace</level>
|
||||
<console>true</console>
|
||||
</logger>
|
||||
|
||||
<http_port>8123</http_port>
|
||||
<tcp_port>9000</tcp_port>
|
||||
<mysql_port>9004</mysql_port>
|
||||
|
||||
<path>./</path>
|
||||
|
||||
<mlock_executable>true</mlock_executable>
|
||||
|
||||
<users>
|
||||
<default>
|
||||
<password/>
|
||||
|
||||
<networks>
|
||||
<ip>::/0</ip>
|
||||
</networks>
|
||||
|
||||
<profile>default</profile>
|
||||
<quota>default</quota>
|
||||
|
||||
<access_management>1</access_management>
|
||||
<named_collection_control>1</named_collection_control>
|
||||
</default>
|
||||
</users>
|
||||
|
||||
<profiles>
|
||||
<default/>
|
||||
</profiles>
|
||||
|
||||
<quotas>
|
||||
<default/>
|
||||
</quotas>
|
||||
</clickhouse>
|
@ -1,4 +1,4 @@
|
||||
#include "localBackup.h"
|
||||
#include "Backup.h"
|
||||
|
||||
#include <Common/Exception.h>
|
||||
#include <string>
|
||||
@ -16,8 +16,9 @@ namespace ErrorCodes
|
||||
namespace
|
||||
{
|
||||
|
||||
void localBackupImpl(
|
||||
const DiskPtr & disk,
|
||||
void BackupImpl(
|
||||
const DiskPtr & src_disk,
|
||||
const DiskPtr & dst_disk,
|
||||
IDiskTransaction * transaction,
|
||||
const String & source_path,
|
||||
const String & destination_path,
|
||||
@ -38,41 +39,42 @@ void localBackupImpl(
|
||||
if (transaction)
|
||||
transaction->createDirectories(destination_path);
|
||||
else
|
||||
disk->createDirectories(destination_path);
|
||||
dst_disk->createDirectories(destination_path);
|
||||
|
||||
for (auto it = disk->iterateDirectory(source_path); it->isValid(); it->next())
|
||||
for (auto it = src_disk->iterateDirectory(source_path); it->isValid(); it->next())
|
||||
{
|
||||
auto source = it->path();
|
||||
auto destination = fs::path(destination_path) / it->name();
|
||||
|
||||
if (!disk->isDirectory(source))
|
||||
if (!src_disk->isDirectory(source))
|
||||
{
|
||||
if (make_source_readonly)
|
||||
{
|
||||
if (transaction)
|
||||
transaction->setReadOnly(source);
|
||||
else
|
||||
disk->setReadOnly(source);
|
||||
src_disk->setReadOnly(source);
|
||||
}
|
||||
if (copy_instead_of_hardlinks || files_to_copy_instead_of_hardlinks.contains(it->name()))
|
||||
{
|
||||
if (transaction)
|
||||
transaction->copyFile(source, destination, read_settings, write_settings);
|
||||
else
|
||||
disk->copyFile(source, *disk, destination, read_settings, write_settings);
|
||||
src_disk->copyFile(source, *dst_disk, destination, read_settings, write_settings);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (transaction)
|
||||
transaction->createHardLink(source, destination);
|
||||
else
|
||||
disk->createHardLink(source, destination);
|
||||
src_disk->createHardLink(source, destination);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
localBackupImpl(
|
||||
disk,
|
||||
BackupImpl(
|
||||
src_disk,
|
||||
dst_disk,
|
||||
transaction,
|
||||
source,
|
||||
destination,
|
||||
@ -123,8 +125,11 @@ private:
|
||||
};
|
||||
}
|
||||
|
||||
void localBackup(
|
||||
const DiskPtr & disk,
|
||||
/// src_disk and dst_disk can be the same disk when local backup.
|
||||
/// copy_instead_of_hardlinks must be true when remote backup.
|
||||
void Backup(
|
||||
const DiskPtr & src_disk,
|
||||
const DiskPtr & dst_disk,
|
||||
const String & source_path,
|
||||
const String & destination_path,
|
||||
const ReadSettings & read_settings,
|
||||
@ -135,10 +140,10 @@ void localBackup(
|
||||
const NameSet & files_to_copy_intead_of_hardlinks,
|
||||
DiskTransactionPtr disk_transaction)
|
||||
{
|
||||
if (disk->exists(destination_path) && !disk->isDirectoryEmpty(destination_path))
|
||||
if (dst_disk->exists(destination_path) && !dst_disk->isDirectoryEmpty(destination_path))
|
||||
{
|
||||
throw DB::Exception(ErrorCodes::DIRECTORY_ALREADY_EXISTS, "Directory {} already exists and is not empty.",
|
||||
DB::fullPath(disk, destination_path));
|
||||
DB::fullPath(dst_disk, destination_path));
|
||||
}
|
||||
|
||||
size_t try_no = 0;
|
||||
@ -154,8 +159,9 @@ void localBackup(
|
||||
{
|
||||
if (disk_transaction)
|
||||
{
|
||||
localBackupImpl(
|
||||
disk,
|
||||
BackupImpl(
|
||||
src_disk,
|
||||
dst_disk,
|
||||
disk_transaction.get(),
|
||||
source_path,
|
||||
destination_path,
|
||||
@ -165,27 +171,29 @@ void localBackup(
|
||||
/* level= */ 0,
|
||||
max_level,
|
||||
copy_instead_of_hardlinks,
|
||||
files_to_copy_intead_of_hardlinks);
|
||||
files_to_copy_intead_of_hardlinks
|
||||
);
|
||||
}
|
||||
else if (copy_instead_of_hardlinks)
|
||||
{
|
||||
CleanupOnFail cleanup([disk, destination_path]() { disk->removeRecursive(destination_path); });
|
||||
disk->copyDirectoryContent(source_path, disk, destination_path, read_settings, write_settings, /*cancellation_hook=*/{});
|
||||
CleanupOnFail cleanup([dst_disk, destination_path]() { dst_disk->removeRecursive(destination_path); });
|
||||
src_disk->copyDirectoryContent(source_path, dst_disk, destination_path, read_settings, write_settings, /*cancellation_hook=*/{});
|
||||
cleanup.success();
|
||||
}
|
||||
else
|
||||
{
|
||||
std::function<void()> cleaner;
|
||||
if (disk->supportZeroCopyReplication())
|
||||
if (dst_disk->supportZeroCopyReplication())
|
||||
/// Note: this code will create garbage on s3. We should always remove `copy_instead_of_hardlinks` files.
|
||||
/// The third argument should be a list of exceptions, but (looks like) it is ignored for keep_all_shared_data = true.
|
||||
cleaner = [disk, destination_path]() { disk->removeSharedRecursive(destination_path, /*keep_all_shared_data*/ true, {}); };
|
||||
cleaner = [dst_disk, destination_path]() { dst_disk->removeSharedRecursive(destination_path, /*keep_all_shared_data*/ true, {}); };
|
||||
else
|
||||
cleaner = [disk, destination_path]() { disk->removeRecursive(destination_path); };
|
||||
cleaner = [dst_disk, destination_path]() { dst_disk->removeRecursive(destination_path); };
|
||||
|
||||
CleanupOnFail cleanup(std::move(cleaner));
|
||||
localBackupImpl(
|
||||
disk,
|
||||
BackupImpl(
|
||||
src_disk,
|
||||
dst_disk,
|
||||
disk_transaction.get(),
|
||||
source_path,
|
||||
destination_path,
|
@ -24,8 +24,9 @@ struct WriteSettings;
|
||||
*
|
||||
* If `transaction` is provided, the changes will be added to it instead of performend on disk.
|
||||
*/
|
||||
void localBackup(
|
||||
const DiskPtr & disk,
|
||||
void Backup(
|
||||
const DiskPtr & src_disk,
|
||||
const DiskPtr & dst_disk,
|
||||
const String & source_path,
|
||||
const String & destination_path,
|
||||
const ReadSettings & read_settings,
|
@ -8,8 +8,7 @@
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/formatReadable.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Storages/MergeTree/localBackup.h>
|
||||
#include <Storages/MergeTree/remoteBackup.h>
|
||||
#include <Storages/MergeTree/Backup.h>
|
||||
#include <Backups/BackupEntryFromSmallFile.h>
|
||||
#include <Backups/BackupEntryFromImmutableFile.h>
|
||||
#include <Backups/BackupEntryWrappedWith.h>
|
||||
@ -460,7 +459,8 @@ MutableDataPartStoragePtr DataPartStorageOnDiskBase::freeze(
|
||||
else
|
||||
disk->createDirectories(to);
|
||||
|
||||
localBackup(
|
||||
Backup(
|
||||
disk,
|
||||
disk,
|
||||
getRelativePath(),
|
||||
fs::path(to) / dir_path,
|
||||
@ -512,7 +512,7 @@ MutableDataPartStoragePtr DataPartStorageOnDiskBase::freezeRemote(
|
||||
else
|
||||
dst_disk->createDirectories(to);
|
||||
|
||||
remoteBackup(
|
||||
Backup(
|
||||
src_disk,
|
||||
dst_disk,
|
||||
getRelativePath(),
|
||||
@ -521,6 +521,8 @@ MutableDataPartStoragePtr DataPartStorageOnDiskBase::freezeRemote(
|
||||
write_settings,
|
||||
params.make_source_readonly,
|
||||
/* max_level= */ {},
|
||||
true,
|
||||
{},
|
||||
params.external_transaction);
|
||||
|
||||
/// The save_metadata_callback function acts on the target dist.
|
||||
@ -545,7 +547,7 @@ MutableDataPartStoragePtr DataPartStorageOnDiskBase::freezeRemote(
|
||||
auto single_disk_volume = std::make_shared<SingleDiskVolume>(dst_disk->getName(), dst_disk, 0);
|
||||
|
||||
/// Do not initialize storage in case of DETACH because part may be broken.
|
||||
bool to_detached = dir_path.starts_with("detached/");
|
||||
bool to_detached = dir_path.starts_with(std::string_view((fs::path(MergeTreeData::DETACHED_DIR_NAME) / "").string()));
|
||||
return create(single_disk_volume, to, dir_path, /*initialize=*/ !to_detached && !params.external_transaction);
|
||||
}
|
||||
|
||||
|
@ -26,7 +26,7 @@
|
||||
#include <Storages/MergeTree/MergeTreeData.h>
|
||||
#include <Storages/MergeTree/PartMetadataManagerOrdinary.h>
|
||||
#include <Storages/MergeTree/checkDataPart.h>
|
||||
#include <Storages/MergeTree/localBackup.h>
|
||||
#include <Storages/MergeTree/Backup.h>
|
||||
#include <Storages/StorageReplicatedMergeTree.h>
|
||||
#include <base/JSON.h>
|
||||
#include <boost/algorithm/string/join.hpp>
|
||||
|
@ -7053,121 +7053,7 @@ MergeTreeData & MergeTreeData::checkStructureAndGetMergeTreeData(
|
||||
return checkStructureAndGetMergeTreeData(*source_table, src_snapshot, my_snapshot);
|
||||
}
|
||||
|
||||
std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> MergeTreeData::cloneAndLoadDataPartOnSameDisk(
|
||||
const MergeTreeData::DataPartPtr & src_part,
|
||||
const String & tmp_part_prefix,
|
||||
const MergeTreePartInfo & dst_part_info,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const IDataPartStorage::ClonePartParams & params,
|
||||
const ReadSettings & read_settings,
|
||||
const WriteSettings & write_settings)
|
||||
{
|
||||
chassert(!isStaticStorage());
|
||||
|
||||
/// Check that the storage policy contains the disk where the src_part is located.
|
||||
bool does_storage_policy_allow_same_disk = false;
|
||||
for (const DiskPtr & disk : getStoragePolicy()->getDisks())
|
||||
{
|
||||
if (disk->getName() == src_part->getDataPartStorage().getDiskName())
|
||||
{
|
||||
does_storage_policy_allow_same_disk = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!does_storage_policy_allow_same_disk)
|
||||
throw Exception(
|
||||
ErrorCodes::BAD_ARGUMENTS,
|
||||
"Could not clone and load part {} because disk does not belong to storage policy",
|
||||
quoteString(src_part->getDataPartStorage().getFullPath()));
|
||||
|
||||
String dst_part_name = src_part->getNewName(dst_part_info);
|
||||
String tmp_dst_part_name = tmp_part_prefix + dst_part_name;
|
||||
auto temporary_directory_lock = getTemporaryPartDirectoryHolder(tmp_dst_part_name);
|
||||
|
||||
/// Why it is needed if we only hardlink files?
|
||||
auto reservation = src_part->getDataPartStorage().reserve(src_part->getBytesOnDisk());
|
||||
auto src_part_storage = src_part->getDataPartStoragePtr();
|
||||
|
||||
scope_guard src_flushed_tmp_dir_lock;
|
||||
MergeTreeData::MutableDataPartPtr src_flushed_tmp_part;
|
||||
|
||||
String with_copy;
|
||||
if (params.copy_instead_of_hardlink)
|
||||
with_copy = " (copying data)";
|
||||
|
||||
auto dst_part_storage = src_part_storage->freeze(
|
||||
relative_data_path,
|
||||
tmp_dst_part_name,
|
||||
read_settings,
|
||||
write_settings,
|
||||
/* save_metadata_callback= */ {},
|
||||
params);
|
||||
|
||||
if (params.metadata_version_to_write.has_value())
|
||||
{
|
||||
chassert(!params.keep_metadata_version);
|
||||
auto out_metadata = dst_part_storage->writeFile(IMergeTreeDataPart::METADATA_VERSION_FILE_NAME, 4096, getContext()->getWriteSettings());
|
||||
writeText(metadata_snapshot->getMetadataVersion(), *out_metadata);
|
||||
out_metadata->finalize();
|
||||
if (getSettings()->fsync_after_insert)
|
||||
out_metadata->sync();
|
||||
}
|
||||
|
||||
LOG_DEBUG(log, "Clone{} part {} to {}{}",
|
||||
src_flushed_tmp_part ? " flushed" : "",
|
||||
src_part_storage->getFullPath(),
|
||||
std::string(fs::path(dst_part_storage->getFullRootPath()) / tmp_dst_part_name),
|
||||
with_copy);
|
||||
|
||||
auto dst_data_part = MergeTreeDataPartBuilder(*this, dst_part_name, dst_part_storage)
|
||||
.withPartFormatFromDisk()
|
||||
.build();
|
||||
|
||||
if (!params.copy_instead_of_hardlink && params.hardlinked_files)
|
||||
{
|
||||
params.hardlinked_files->source_part_name = src_part->name;
|
||||
params.hardlinked_files->source_table_shared_id = src_part->storage.getTableSharedID();
|
||||
|
||||
for (auto it = src_part->getDataPartStorage().iterate(); it->isValid(); it->next())
|
||||
{
|
||||
if (!params.files_to_copy_instead_of_hardlinks.contains(it->name())
|
||||
&& it->name() != IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME_DEPRECATED
|
||||
&& it->name() != IMergeTreeDataPart::TXN_VERSION_METADATA_FILE_NAME)
|
||||
{
|
||||
params.hardlinked_files->hardlinks_from_source_part.insert(it->name());
|
||||
}
|
||||
}
|
||||
|
||||
auto projections = src_part->getProjectionParts();
|
||||
for (const auto & [name, projection_part] : projections)
|
||||
{
|
||||
const auto & projection_storage = projection_part->getDataPartStorage();
|
||||
for (auto it = projection_storage.iterate(); it->isValid(); it->next())
|
||||
{
|
||||
auto file_name_with_projection_prefix = fs::path(projection_storage.getPartDirectory()) / it->name();
|
||||
if (!params.files_to_copy_instead_of_hardlinks.contains(file_name_with_projection_prefix)
|
||||
&& it->name() != IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME_DEPRECATED
|
||||
&& it->name() != IMergeTreeDataPart::TXN_VERSION_METADATA_FILE_NAME)
|
||||
{
|
||||
params.hardlinked_files->hardlinks_from_source_part.insert(file_name_with_projection_prefix);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// We should write version metadata on part creation to distinguish it from parts that were created without transaction.
|
||||
TransactionID tid = params.txn ? params.txn->tid : Tx::PrehistoricTID;
|
||||
dst_data_part->version.setCreationTID(tid, nullptr);
|
||||
dst_data_part->storeVersionMetadata();
|
||||
|
||||
dst_data_part->is_temp = true;
|
||||
|
||||
dst_data_part->loadColumnsChecksumsIndexes(require_part_metadata, true);
|
||||
dst_data_part->modification_time = dst_part_storage->getLastModified().epochTime();
|
||||
return std::make_pair(dst_data_part, std::move(temporary_directory_lock));
|
||||
}
|
||||
|
||||
/// Used only when attach partition; Both for same disk and different disk.
|
||||
/// must_on_same_disk=false is used only when attach partition; Both for same disk and different disk.
|
||||
std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> MergeTreeData::cloneAndLoadDataPart(
|
||||
const MergeTreeData::DataPartPtr & src_part,
|
||||
const String & tmp_part_prefix,
|
||||
@ -7175,7 +7061,8 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> MergeTreeData::cloneAn
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const IDataPartStorage::ClonePartParams & params,
|
||||
const ReadSettings & read_settings,
|
||||
const WriteSettings & write_settings)
|
||||
const WriteSettings & write_settings,
|
||||
bool must_on_same_disk)
|
||||
{
|
||||
chassert(!isStaticStorage());
|
||||
|
||||
@ -7189,12 +7076,16 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> MergeTreeData::cloneAn
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!on_same_disk && must_on_same_disk)
|
||||
throw Exception(
|
||||
ErrorCodes::BAD_ARGUMENTS,
|
||||
"Could not clone and load part {} because disk does not belong to storage policy",
|
||||
quoteString(src_part->getDataPartStorage().getFullPath()));
|
||||
|
||||
String dst_part_name = src_part->getNewName(dst_part_info);
|
||||
String tmp_dst_part_name = tmp_part_prefix + dst_part_name;
|
||||
auto temporary_directory_lock = getTemporaryPartDirectoryHolder(tmp_dst_part_name);
|
||||
|
||||
/// Why it is needed if we only hardlink files?
|
||||
auto reservation = src_part->getDataPartStorage().reserve(src_part->getBytesOnDisk());
|
||||
auto src_part_storage = src_part->getDataPartStoragePtr();
|
||||
|
||||
|
@ -842,15 +842,6 @@ public:
|
||||
MergeTreeData & checkStructureAndGetMergeTreeData(const StoragePtr & source_table, const StorageMetadataPtr & src_snapshot, const StorageMetadataPtr & my_snapshot) const;
|
||||
MergeTreeData & checkStructureAndGetMergeTreeData(IStorage & source_table, const StorageMetadataPtr & src_snapshot, const StorageMetadataPtr & my_snapshot) const;
|
||||
|
||||
std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> cloneAndLoadDataPartOnSameDisk(
|
||||
const MergeTreeData::DataPartPtr & src_part,
|
||||
const String & tmp_part_prefix,
|
||||
const MergeTreePartInfo & dst_part_info,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const IDataPartStorage::ClonePartParams & params,
|
||||
const ReadSettings & read_settings,
|
||||
const WriteSettings & write_settings);
|
||||
|
||||
std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> cloneAndLoadDataPart(
|
||||
const MergeTreeData::DataPartPtr & src_part,
|
||||
const String & tmp_part_prefix,
|
||||
@ -858,7 +849,8 @@ public:
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const IDataPartStorage::ClonePartParams & params,
|
||||
const ReadSettings & read_settings,
|
||||
const WriteSettings & write_settings);
|
||||
const WriteSettings & write_settings,
|
||||
bool must_on_same_disk);
|
||||
|
||||
virtual std::vector<MergeTreeMutationStatus> getMutationsStatus() const = 0;
|
||||
|
||||
|
@ -2146,8 +2146,8 @@ bool MutateTask::prepare()
|
||||
scope_guard lock;
|
||||
|
||||
{
|
||||
std::tie(part, lock) = ctx->data->cloneAndLoadDataPartOnSameDisk(
|
||||
ctx->source_part, prefix, ctx->future_part->part_info, ctx->metadata_snapshot, clone_params, ctx->context->getReadSettings(), ctx->context->getWriteSettings());
|
||||
std::tie(part, lock) = ctx->data->cloneAndLoadDataPart(
|
||||
ctx->source_part, prefix, ctx->future_part->part_info, ctx->metadata_snapshot, clone_params, ctx->context->getReadSettings(), ctx->context->getWriteSettings(), true/*must_on_same_disk*/);
|
||||
part->getDataPartStorage().beginTransaction();
|
||||
ctx->temporary_directory_lock = std::move(lock);
|
||||
}
|
||||
|
@ -1,195 +0,0 @@
|
||||
#include "remoteBackup.h"
|
||||
|
||||
#include <Common/Exception.h>
|
||||
#include <string>
|
||||
#include <cerrno>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int TOO_DEEP_RECURSION;
|
||||
extern const int DIRECTORY_ALREADY_EXISTS;
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
void remoteBackupImpl(
|
||||
const DiskPtr & src_disk,
|
||||
const DiskPtr & dst_disk,
|
||||
IDiskTransaction * transaction,
|
||||
const String & source_path,
|
||||
const String & destination_path,
|
||||
const ReadSettings & read_settings,
|
||||
const WriteSettings & write_settings,
|
||||
bool make_source_readonly,
|
||||
size_t level,
|
||||
std::optional<size_t> max_level)
|
||||
{
|
||||
if (max_level && level > *max_level)
|
||||
return;
|
||||
|
||||
if (level >= 1000)
|
||||
throw DB::Exception(DB::ErrorCodes::TOO_DEEP_RECURSION, "Too deep recursion");
|
||||
|
||||
if (transaction)
|
||||
transaction->createDirectories(destination_path);
|
||||
else
|
||||
dst_disk->createDirectories(destination_path);
|
||||
|
||||
for (auto it = src_disk->iterateDirectory(source_path); it->isValid(); it->next())
|
||||
{
|
||||
auto source = it->path();
|
||||
auto destination = fs::path(destination_path) / it->name();
|
||||
|
||||
if (!src_disk->isDirectory(source))
|
||||
{
|
||||
if (make_source_readonly)
|
||||
{
|
||||
if (transaction)
|
||||
transaction->setReadOnly(source);
|
||||
else
|
||||
src_disk->setReadOnly(source);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (transaction)
|
||||
transaction->copyFile(source, destination, read_settings, write_settings);
|
||||
else
|
||||
src_disk->copyFile(source, *dst_disk, destination, read_settings, write_settings);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
remoteBackupImpl(
|
||||
src_disk,
|
||||
dst_disk,
|
||||
transaction,
|
||||
source,
|
||||
destination,
|
||||
read_settings,
|
||||
write_settings,
|
||||
make_source_readonly,
|
||||
level + 1,
|
||||
max_level);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class CleanupOnFail
|
||||
{
|
||||
public:
|
||||
explicit CleanupOnFail(std::function<void()> && cleaner_)
|
||||
: cleaner(cleaner_)
|
||||
{}
|
||||
|
||||
~CleanupOnFail()
|
||||
{
|
||||
if (!is_success)
|
||||
{
|
||||
/// We are trying to handle race condition here. So if we was not
|
||||
/// able to backup directory try to remove garbage, but it's ok if
|
||||
/// it doesn't exist.
|
||||
try
|
||||
{
|
||||
cleaner();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void success()
|
||||
{
|
||||
is_success = true;
|
||||
}
|
||||
|
||||
private:
|
||||
std::function<void()> cleaner;
|
||||
bool is_success{false};
|
||||
};
|
||||
}
|
||||
|
||||
/// remoteBackup only supports copy
|
||||
void remoteBackup(
|
||||
const DiskPtr & src_disk,
|
||||
const DiskPtr & dst_disk,
|
||||
const String & source_path,
|
||||
const String & destination_path,
|
||||
const ReadSettings & read_settings,
|
||||
const WriteSettings & write_settings,
|
||||
bool make_source_readonly,
|
||||
std::optional<size_t> max_level,
|
||||
DiskTransactionPtr disk_transaction)
|
||||
{
|
||||
if (dst_disk->exists(destination_path) && !dst_disk->isDirectoryEmpty(destination_path))
|
||||
{
|
||||
throw DB::Exception(ErrorCodes::DIRECTORY_ALREADY_EXISTS, "Directory {} already exists and is not empty.",
|
||||
DB::fullPath(dst_disk, destination_path));
|
||||
}
|
||||
|
||||
size_t try_no = 0;
|
||||
const size_t max_tries = 10;
|
||||
|
||||
/** Files in the directory can be permanently added and deleted.
|
||||
* If some file is deleted during an attempt to make a backup, then try again,
|
||||
* because it's important to take into account any new files that might appear.
|
||||
*/
|
||||
while (true)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (disk_transaction)
|
||||
{
|
||||
remoteBackupImpl(
|
||||
src_disk,
|
||||
dst_disk,
|
||||
disk_transaction.get(),
|
||||
source_path,
|
||||
destination_path,
|
||||
read_settings,
|
||||
write_settings,
|
||||
make_source_readonly,
|
||||
/* level= */ 0,
|
||||
max_level);
|
||||
}
|
||||
else
|
||||
{
|
||||
/// roll back if fail
|
||||
CleanupOnFail cleanup([dst_disk, destination_path]() { dst_disk->removeRecursive(destination_path); });
|
||||
src_disk->copyDirectoryContent(source_path, dst_disk, destination_path, read_settings, write_settings, /*cancellation_hook=*/{});
|
||||
cleanup.success();
|
||||
}
|
||||
}
|
||||
catch (const DB::ErrnoException & e)
|
||||
{
|
||||
if (e.getErrno() != ENOENT)
|
||||
throw;
|
||||
|
||||
++try_no;
|
||||
if (try_no == max_tries)
|
||||
throw;
|
||||
|
||||
continue;
|
||||
}
|
||||
catch (const fs::filesystem_error & e)
|
||||
{
|
||||
if (e.code() == std::errc::no_such_file_or_directory)
|
||||
{
|
||||
++try_no;
|
||||
if (try_no == max_tries)
|
||||
throw;
|
||||
continue;
|
||||
}
|
||||
throw;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -1,38 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <optional>
|
||||
#include <base/types.h>
|
||||
#include <Disks/IDisk.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
struct WriteSettings;
|
||||
|
||||
/** Creates a local (at the same mount point) backup (snapshot) directory.
|
||||
*
|
||||
* In the specified destination directory, it creates hard links on all source-directory files
|
||||
* and in all nested directories, with saving (creating) all relative paths;
|
||||
* and also `chown`, removing the write permission.
|
||||
*
|
||||
* This protects data from accidental deletion or modification,
|
||||
* and is intended to be used as a simple means of protection against a human or program error,
|
||||
* but not from a hardware failure.
|
||||
*
|
||||
* If max_level is specified, than only files with depth relative source_path less or equal max_level will be copied.
|
||||
* So, if max_level=0 than only direct file child are copied.
|
||||
*
|
||||
* If `transaction` is provided, the changes will be added to it instead of performend on disk.
|
||||
*/
|
||||
void remoteBackup(
|
||||
const DiskPtr & src_disk,
|
||||
const DiskPtr & dst_disk,
|
||||
const String & source_path,
|
||||
const String & destination_path,
|
||||
const ReadSettings & read_settings,
|
||||
const WriteSettings & write_settings,
|
||||
bool make_source_readonly = true,
|
||||
std::optional<size_t> max_level = {},
|
||||
DiskTransactionPtr disk_transaction = nullptr);
|
||||
|
||||
}
|
@ -2123,14 +2123,15 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
|
||||
if (replace)
|
||||
{
|
||||
/// Replace can only work on the same disk
|
||||
auto [dst_part, part_lock] = cloneAndLoadDataPartOnSameDisk(
|
||||
auto [dst_part, part_lock] = cloneAndLoadDataPart(
|
||||
src_part,
|
||||
TMP_PREFIX,
|
||||
dst_part_info,
|
||||
my_metadata_snapshot,
|
||||
clone_params,
|
||||
local_context->getReadSettings(),
|
||||
local_context->getWriteSettings());
|
||||
local_context->getWriteSettings(),
|
||||
true/*must_on_same_disk*/);
|
||||
dst_parts.emplace_back(std::move(dst_part));
|
||||
dst_parts_locks.emplace_back(std::move(part_lock));
|
||||
}
|
||||
@ -2144,7 +2145,8 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
|
||||
my_metadata_snapshot,
|
||||
clone_params,
|
||||
local_context->getReadSettings(),
|
||||
local_context->getWriteSettings());
|
||||
local_context->getWriteSettings(),
|
||||
false/*must_on_same_disk*/);
|
||||
dst_parts.emplace_back(std::move(dst_part));
|
||||
dst_parts_locks.emplace_back(std::move(part_lock));
|
||||
}
|
||||
@ -2252,14 +2254,15 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const
|
||||
.copy_instead_of_hardlink = getSettings()->always_use_copy_instead_of_hardlinks,
|
||||
};
|
||||
|
||||
auto [dst_part, part_lock] = dest_table_storage->cloneAndLoadDataPartOnSameDisk(
|
||||
auto [dst_part, part_lock] = dest_table_storage->cloneAndLoadDataPart(
|
||||
src_part,
|
||||
TMP_PREFIX,
|
||||
dst_part_info,
|
||||
dest_metadata_snapshot,
|
||||
clone_params,
|
||||
local_context->getReadSettings(),
|
||||
local_context->getWriteSettings()
|
||||
local_context->getWriteSettings(),
|
||||
true/*must_on_same_disk*/
|
||||
);
|
||||
|
||||
dst_parts.emplace_back(std::move(dst_part));
|
||||
|
@ -2793,7 +2793,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(LogEntry & entry)
|
||||
|
||||
auto obtain_part = [&] (PartDescriptionPtr & part_desc)
|
||||
{
|
||||
/// Fetches with zero-copy-replication are cheap, but cloneAndLoadDataPartOnSameDisk will do full copy.
|
||||
/// Fetches with zero-copy-replication are cheap, but cloneAndLoadDataPart(must_on_same_disk=true) will do full copy.
|
||||
/// It's okay to check the setting for current table and disk for the source table, because src and dst part are on the same disk.
|
||||
bool prefer_fetch_from_other_replica = !part_desc->replica.empty() && storage_settings_ptr->allow_remote_fs_zero_copy_replication
|
||||
&& part_desc->src_table_part && part_desc->src_table_part->isStoredOnRemoteDiskWithZeroCopySupport();
|
||||
@ -2812,14 +2812,15 @@ bool StorageReplicatedMergeTree::executeReplaceRange(LogEntry & entry)
|
||||
.copy_instead_of_hardlink = storage_settings_ptr->always_use_copy_instead_of_hardlinks || ((our_zero_copy_enabled || source_zero_copy_enabled) && part_desc->src_table_part->isStoredOnRemoteDiskWithZeroCopySupport()),
|
||||
.metadata_version_to_write = metadata_snapshot->getMetadataVersion()
|
||||
};
|
||||
auto [res_part, temporary_part_lock] = cloneAndLoadDataPartOnSameDisk(
|
||||
auto [res_part, temporary_part_lock] = cloneAndLoadDataPart(
|
||||
part_desc->src_table_part,
|
||||
TMP_PREFIX + "clone_",
|
||||
part_desc->new_part_info,
|
||||
metadata_snapshot,
|
||||
clone_params,
|
||||
getContext()->getReadSettings(),
|
||||
getContext()->getWriteSettings());
|
||||
getContext()->getWriteSettings(),
|
||||
true/*must_on_same_disk*/);
|
||||
part_desc->res_part = std::move(res_part);
|
||||
part_desc->temporary_part_lock = std::move(temporary_part_lock);
|
||||
}
|
||||
@ -4893,14 +4894,15 @@ bool StorageReplicatedMergeTree::fetchPart(
|
||||
.keep_metadata_version = true,
|
||||
};
|
||||
|
||||
auto [cloned_part, lock] = cloneAndLoadDataPartOnSameDisk(
|
||||
auto [cloned_part, lock] = cloneAndLoadDataPart(
|
||||
part_to_clone,
|
||||
"tmp_clone_",
|
||||
part_info,
|
||||
metadata_snapshot,
|
||||
clone_params,
|
||||
getContext()->getReadSettings(),
|
||||
getContext()->getWriteSettings());
|
||||
getContext()->getWriteSettings(),
|
||||
true/*must_on_same_disk*/);
|
||||
|
||||
part_directory_lock = std::move(lock);
|
||||
return cloned_part;
|
||||
@ -8104,14 +8106,15 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
|
||||
if (replace)
|
||||
{
|
||||
/// Replace can only work on the same disk
|
||||
auto [dst_part, part_lock] = cloneAndLoadDataPartOnSameDisk(
|
||||
auto [dst_part, part_lock] = cloneAndLoadDataPart(
|
||||
src_part,
|
||||
TMP_PREFIX,
|
||||
dst_part_info,
|
||||
metadata_snapshot,
|
||||
clone_params,
|
||||
query_context->getReadSettings(),
|
||||
query_context->getWriteSettings());
|
||||
query_context->getWriteSettings(),
|
||||
true/*must_on_same_disk*/);
|
||||
dst_parts.emplace_back(std::move(dst_part));
|
||||
dst_parts_locks.emplace_back(std::move(part_lock));
|
||||
}
|
||||
@ -8125,7 +8128,8 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
|
||||
metadata_snapshot,
|
||||
clone_params,
|
||||
query_context->getReadSettings(),
|
||||
query_context->getWriteSettings());
|
||||
query_context->getWriteSettings(),
|
||||
false/*must_on_same_disk*/);
|
||||
dst_parts.emplace_back(std::move(dst_part));
|
||||
dst_parts_locks.emplace_back(std::move(part_lock));
|
||||
}
|
||||
@ -8385,14 +8389,15 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
|
||||
.copy_instead_of_hardlink = storage_settings_ptr->always_use_copy_instead_of_hardlinks || (zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport()),
|
||||
.metadata_version_to_write = dest_metadata_snapshot->getMetadataVersion()
|
||||
};
|
||||
auto [dst_part, dst_part_lock] = dest_table_storage->cloneAndLoadDataPartOnSameDisk(
|
||||
auto [dst_part, dst_part_lock] = dest_table_storage->cloneAndLoadDataPart(
|
||||
src_part,
|
||||
TMP_PREFIX,
|
||||
dst_part_info,
|
||||
dest_metadata_snapshot,
|
||||
clone_params,
|
||||
query_context->getReadSettings(),
|
||||
query_context->getWriteSettings());
|
||||
query_context->getWriteSettings(),
|
||||
true/*must_on_same_disk*/);
|
||||
|
||||
src_parts.emplace_back(src_part);
|
||||
dst_parts.emplace_back(dst_part);
|
||||
|
@ -0,0 +1 @@
|
||||
1
|
@ -0,0 +1,20 @@
|
||||
ATTACH TABLE _ UUID '39b798e4-1787-4b2e-971f-c4f092bf0cde'
|
||||
(
|
||||
`price` UInt32,
|
||||
`date` Date,
|
||||
`postcode1` LowCardinality(String),
|
||||
`postcode2` LowCardinality(String),
|
||||
`type` Enum8('other' = 0, 'terraced' = 1, 'semi-detached' = 2, 'detached' = 3, 'flat' = 4),
|
||||
`is_new` UInt8,
|
||||
`duration` Enum8('unknown' = 0, 'freehold' = 1, 'leasehold' = 2),
|
||||
`addr1` String,
|
||||
`addr2` String,
|
||||
`street` LowCardinality(String),
|
||||
`locality` LowCardinality(String),
|
||||
`town` LowCardinality(String),
|
||||
`district` LowCardinality(String),
|
||||
`county` LowCardinality(String)
|
||||
)
|
||||
ENGINE = MergeTree
|
||||
ORDER BY (postcode1, postcode2, addr1, addr2)
|
||||
SETTINGS index_granularity = 8192
|
@ -0,0 +1,20 @@
|
||||
ATTACH TABLE _ UUID 'cf712b4f-2ca8-435c-ac23-c4393efe52f7'
|
||||
(
|
||||
`price` UInt32,
|
||||
`date` Date,
|
||||
`postcode1` LowCardinality(String),
|
||||
`postcode2` LowCardinality(String),
|
||||
`type` Enum8('other' = 0, 'terraced' = 1, 'semi-detached' = 2, 'detached' = 3, 'flat' = 4),
|
||||
`is_new` UInt8,
|
||||
`duration` Enum8('unknown' = 0, 'freehold' = 1, 'leasehold' = 2),
|
||||
`addr1` String,
|
||||
`addr2` String,
|
||||
`street` LowCardinality(String),
|
||||
`locality` LowCardinality(String),
|
||||
`town` LowCardinality(String),
|
||||
`district` LowCardinality(String),
|
||||
`county` LowCardinality(String)
|
||||
)
|
||||
ENGINE = MergeTree
|
||||
ORDER BY (postcode1, postcode2, addr1, addr2)
|
||||
SETTINGS disk = disk(type = web, endpoint = 'https://raw.githubusercontent.com/ClickHouse/web-tables-demo/main/web/'), index_granularity = 8192
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -0,0 +1,16 @@
|
||||
columns format version: 1
|
||||
14 columns:
|
||||
`price` UInt32
|
||||
`date` Date
|
||||
`postcode1` LowCardinality(String)
|
||||
`postcode2` LowCardinality(String)
|
||||
`type` Enum8('other' = 0, 'terraced' = 1, 'semi-detached' = 2, 'detached' = 3, 'flat' = 4)
|
||||
`is_new` UInt8
|
||||
`duration` Enum8('unknown' = 0, 'freehold' = 1, 'leasehold' = 2)
|
||||
`addr1` String
|
||||
`addr2` String
|
||||
`street` LowCardinality(String)
|
||||
`locality` LowCardinality(String)
|
||||
`town` LowCardinality(String)
|
||||
`district` LowCardinality(String)
|
||||
`county` LowCardinality(String)
|
@ -0,0 +1 @@
|
||||
27910954
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -0,0 +1 @@
|
||||
CODEC(ZSTD(1))
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -0,0 +1 @@
|
||||
1
|
@ -0,0 +1,20 @@
|
||||
ATTACH TABLE _ UUID '9f761770-85bc-436f-8852-0f1f9b44bfd4'
|
||||
(
|
||||
`price` UInt32,
|
||||
`date` Date,
|
||||
`postcode1` LowCardinality(String),
|
||||
`postcode2` LowCardinality(String),
|
||||
`type` Enum8('other' = 0, 'terraced' = 1, 'semi-detached' = 2, 'detached' = 3, 'flat' = 4),
|
||||
`is_new` UInt8,
|
||||
`duration` Enum8('unknown' = 0, 'freehold' = 1, 'leasehold' = 2),
|
||||
`addr1` String,
|
||||
`addr2` String,
|
||||
`street` LowCardinality(String),
|
||||
`locality` LowCardinality(String),
|
||||
`town` LowCardinality(String),
|
||||
`district` LowCardinality(String),
|
||||
`county` LowCardinality(String)
|
||||
)
|
||||
ENGINE = MergeTree
|
||||
ORDER BY (postcode1, postcode2, addr1, addr2)
|
||||
SETTINGS index_granularity = 8192
|
@ -0,0 +1,20 @@
|
||||
ATTACH TABLE _ UUID 'cf712b4f-2ca8-435c-ac23-c4393efe52f7'
|
||||
(
|
||||
`price` UInt32,
|
||||
`date` Date,
|
||||
`postcode1` LowCardinality(String),
|
||||
`postcode2` LowCardinality(String),
|
||||
`type` Enum8('other' = 0, 'terraced' = 1, 'semi-detached' = 2, 'detached' = 3, 'flat' = 4),
|
||||
`is_new` UInt8,
|
||||
`duration` Enum8('unknown' = 0, 'freehold' = 1, 'leasehold' = 2),
|
||||
`addr1` String,
|
||||
`addr2` String,
|
||||
`street` LowCardinality(String),
|
||||
`locality` LowCardinality(String),
|
||||
`town` LowCardinality(String),
|
||||
`district` LowCardinality(String),
|
||||
`county` LowCardinality(String)
|
||||
)
|
||||
ENGINE = MergeTree
|
||||
ORDER BY (postcode1, postcode2, addr1, addr2)
|
||||
SETTINGS disk = disk(type = web, endpoint = 'https://raw.githubusercontent.com/ClickHouse/web-tables-demo/main/web/'), index_granularity = 8192
|
Loading…
Reference in New Issue
Block a user