Mirror of https://github.com/ClickHouse/ClickHouse.git, synced 2024-11-27 10:02:01 +00:00
Revert "Merge pull request #41832 from ClickHouse/make_copy_instead_of_hardlink"
This reverts commit0097f15ee7
, reversing changes made to4e422b8046
.
This commit is contained in:
parent
fc301e8f2d
commit
f50d46fb8c
@@ -742,14 +742,12 @@ DataPartStoragePtr DataPartStorageOnDisk::freeze(
     const std::string & dir_path,
     bool make_source_readonly,
     std::function<void(const DiskPtr &)> save_metadata_callback,
-    bool copy_instead_of_hardlink,
-    const NameSet & files_to_copy_instead_of_hardlinks) const
+    bool copy_instead_of_hardlink) const
 {
     auto disk = volume->getDisk();
     disk->createDirectories(to);
 
-    localBackup(disk, getRelativePath(), fs::path(to) / dir_path, make_source_readonly, {}, copy_instead_of_hardlink, files_to_copy_instead_of_hardlinks);
+    localBackup(disk, getRelativePath(), fs::path(to) / dir_path, make_source_readonly, {}, copy_instead_of_hardlink);
 
     if (save_metadata_callback)
         save_metadata_callback(disk);
@@ -100,8 +100,7 @@ public:
         const std::string & dir_path,
         bool make_source_readonly,
         std::function<void(const DiskPtr &)> save_metadata_callback,
-        bool copy_instead_of_hardlink,
-        const NameSet & files_to_copy_instead_of_hardlinks) const override;
+        bool copy_instead_of_hardlink) const override;
 
     DataPartStoragePtr clone(
         const std::string & to,
@@ -192,17 +192,12 @@ public:
 
     /// Creates hardlinks into 'to/dir_path' for every file in data part.
     /// Callback is called after hardlinks are created, but before 'delete-on-destroy.txt' marker is removed.
-    /// Some files can be copied instead of hardlinks. It's because of details of zero copy replication
-    /// implementation which relies on paths of some blobs in S3. For example if we want to hardlink
-    /// the whole part during mutation we shouldn't hardlink checksums.txt, because otherwise
-    /// zero-copy locks for different parts will be on the same path in zookeeper.
     virtual std::shared_ptr<IDataPartStorage> freeze(
         const std::string & to,
         const std::string & dir_path,
         bool make_source_readonly,
         std::function<void(const DiskPtr &)> save_metadata_callback,
-        bool copy_instead_of_hardlink,
-        const NameSet & files_to_copy_instead_of_hardlinks) const = 0;
+        bool copy_instead_of_hardlink) const = 0;
 
     /// Make a full copy of a data part into 'to/dir_path' (possibly to a different disk).
     virtual std::shared_ptr<IDataPartStorage> clone(
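The four deleted comment lines state the rationale for the feature being reverted: zero-copy replication derives its ZooKeeper lock path from the S3 blob behind checksums.txt, so if a mutation hardlinks checksums.txt, the source part and the mutated part end up holding the same lock. A toy Python sketch of that collision, reusing the example paths from the MutateTask comment further down (the zk_lock_path helper and its exact mapping are illustrative, not ClickHouse's actual code):

    def zk_lock_path(table_id, blob_path, replica):
        # Illustrative mapping: /s3/blobs/<key> -> /zero_copy/<tbl>/s3_blobs_<key>/<replica>
        return f"/zero_copy/{table_id}/{blob_path.strip('/').replace('/', '_')}/{replica}"

    blob = "/s3/blobs/shjfgsaasdasdasdasdasdas"  # backs all_0_0_0/checksums.txt
    hardlinked = blob                            # all_0_0_0_1 hardlinks the same blob
    assert zk_lock_path("tbl_id", blob, "replica_name") == zk_lock_path("tbl_id", hardlinked, "replica_name")

    copied = "/s3/blobs/freshblobnamefromcopy"   # hypothetical new blob after a real copy
    assert zk_lock_path("tbl_id", blob, "replica_name") != zk_lock_path("tbl_id", copied, "replica_name")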
@@ -1516,19 +1516,12 @@ void IMergeTreeDataPart::renameToDetached(const String & prefix, DataPartStorage
 
 void IMergeTreeDataPart::makeCloneInDetached(const String & prefix, const StorageMetadataPtr & /*metadata_snapshot*/) const
 {
-    auto storage_settings = storage.getSettings();
-
-    /// In case of zero-copy replication we copy directory instead of hardlinks
-    /// because hardlinks tracking doesn't work for detached parts.
-    bool copy_instead_of_hardlink = isStoredOnRemoteDiskWithZeroCopySupport() && storage.supportsReplication() && storage_settings->allow_remote_fs_zero_copy_replication;
-
     data_part_storage->freeze(
         storage.relative_data_path,
         getRelativePathForDetachedPart(prefix),
         /*make_source_readonly*/ true,
         {},
-        copy_instead_of_hardlink,
-        {});
+        /*copy_instead_of_hardlink*/ false);
 }
 
 DataPartStoragePtr IMergeTreeDataPart::makeCloneOnDisk(const DiskPtr & disk, const String & directory_name) const
@@ -6177,8 +6177,7 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> MergeTreeData::cloneAndLoadDataPartOnSameDisk(
     const StorageMetadataPtr & metadata_snapshot,
     const MergeTreeTransactionPtr & txn,
     HardlinkedFiles * hardlinked_files,
-    bool copy_instead_of_hardlink,
-    const NameSet & files_to_copy_instead_of_hardlinks)
+    bool copy_instead_of_hardlink)
 {
     /// Check that the storage policy contains the disk where the src_part is located.
     bool does_storage_policy_allow_same_disk = false;
@@ -6222,7 +6221,7 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> MergeTreeData::cloneAndLoadDataPartOnSameDisk(
         std::string(fs::path(src_part_storage->getFullRootPath()) / tmp_dst_part_name),
         with_copy);
 
-    auto dst_part_storage = src_part_storage->freeze(relative_data_path, tmp_dst_part_name, /* make_source_readonly */ false, {}, copy_instead_of_hardlink, files_to_copy_instead_of_hardlinks);
+    auto dst_part_storage = src_part_storage->freeze(relative_data_path, tmp_dst_part_name, /* make_source_readonly */ false, {}, /* copy_instead_of_hardlinks */ copy_instead_of_hardlink);
 
     auto dst_data_part = createPart(dst_part_name, dst_part_info, dst_part_storage);
 
@@ -6233,9 +6232,7 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> MergeTreeData::cloneAndLoadDataPartOnSameDisk(
 
         for (auto it = src_part->data_part_storage->iterate(); it->isValid(); it->next())
         {
-            if (!files_to_copy_instead_of_hardlinks.contains(it->name())
-                && it->name() != IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME
-                && it->name() != IMergeTreeDataPart::TXN_VERSION_METADATA_FILE_NAME)
+            if (it->name() != IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME && it->name() != IMergeTreeDataPart::TXN_VERSION_METADATA_FILE_NAME)
                 hardlinked_files->hardlinks_from_source_part.insert(it->name());
         }
     }
@@ -6410,8 +6407,7 @@ PartitionCommandsResultInfo MergeTreeData::freezePartitionsByMatcher(
             part->data_part_storage->getPartDirectory(),
             /*make_source_readonly*/ true,
             callback,
-            /*copy_instead_of_hardlink*/ false,
-            {});
+            /*copy_instead_of_hardlink*/ false);
 
         part->is_frozen.store(true, std::memory_order_relaxed);
         result.push_back(PartitionCommandResultInfo{
@@ -790,7 +790,7 @@ public:
         const MergeTreeData::DataPartPtr & src_part, const String & tmp_part_prefix,
         const MergeTreePartInfo & dst_part_info, const StorageMetadataPtr & metadata_snapshot,
         const MergeTreeTransactionPtr & txn, HardlinkedFiles * hardlinked_files,
-        bool copy_instead_of_hardlink, const NameSet & files_to_copy_instead_of_hardlinks);
+        bool copy_instead_of_hardlink);
 
     virtual std::vector<MergeTreeMutationStatus> getMutationsStatus() const = 0;
 
@@ -1507,24 +1507,8 @@ bool MutateTask::prepare()
     if (ctx->source_part->isStoredOnDisk() && !isStorageTouchedByMutations(
         ctx->storage_from_source_part, ctx->metadata_snapshot, ctx->commands_for_part, Context::createCopy(context_for_reading)))
     {
-        NameSet files_to_copy_instead_of_hardlinks;
-        auto settings_ptr = ctx->data->getSettings();
-        /// In zero-copy replication checksums file path in s3 (blob path) is used for zero copy locks in ZooKeeper. If we will hardlink checksums file, we will have the same blob path
-        /// and two different parts (source and new mutated part) will use the same locks in ZooKeeper. To avoid this we copy checksums.txt to generate new blob path.
-        /// Example:
-        ///     part: all_0_0_0/checksums.txt -> /s3/blobs/shjfgsaasdasdasdasdasdas
-        ///     locks path in zk: /zero_copy/tbl_id/s3_blobs_shjfgsaasdasdasdasdasdas/replica_name
-        ///                                         ^ part name don't participate in lock path
-        /// In case of full hardlink we will have:
-        ///     part: all_0_0_0_1/checksums.txt -> /s3/blobs/shjfgsaasdasdasdasdasdas
-        ///     locks path in zk: /zero_copy/tbl_id/s3_blobs_shjfgsaasdasdasdasdasdas/replica_name
-        /// So we need to copy to have a new name
-        bool copy_checksumns = ctx->data->supportsReplication() && settings_ptr->allow_remote_fs_zero_copy_replication && ctx->source_part->isStoredOnRemoteDiskWithZeroCopySupport();
-        if (copy_checksumns)
-            files_to_copy_instead_of_hardlinks.insert(IMergeTreeDataPart::FILE_FOR_REFERENCES_CHECK);
-
         LOG_TRACE(ctx->log, "Part {} doesn't change up to mutation version {}", ctx->source_part->name, ctx->future_part->part_info.mutation);
-        auto [part, lock] = ctx->data->cloneAndLoadDataPartOnSameDisk(ctx->source_part, "tmp_clone_", ctx->future_part->part_info, ctx->metadata_snapshot, ctx->txn, &ctx->hardlinked_files, false, files_to_copy_instead_of_hardlinks);
+        auto [part, lock] = ctx->data->cloneAndLoadDataPartOnSameDisk(ctx->source_part, "tmp_clone_", ctx->future_part->part_info, ctx->metadata_snapshot, ctx->txn, &ctx->hardlinked_files, false);
         ctx->temporary_directory_lock = std::move(lock);
         promise.set_value(std::move(part));
         return false;
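The block removed above decided, per mutation, which files to copy rather than hardlink: when the table is replicated, allow_remote_fs_zero_copy_replication is enabled, and the source part sits on a zero-copy-capable remote disk, FILE_FOR_REFERENCES_CHECK (checksums.txt) went into the set so the cloned part gets its own blob and therefore its own ZooKeeper lock. A condensed sketch of that decision, with plain booleans standing in for the ctx->data accessors:

    def files_to_copy_instead_of_hardlinks(supports_replication, allow_remote_fs_zero_copy_replication, on_remote_zero_copy_disk):
        # checksums.txt seeds the zero-copy lock path, so copying it (instead of
        # hardlinking) gives the mutated part a distinct lock in ZooKeeper.
        copy_checksums = (supports_replication
                          and allow_remote_fs_zero_copy_replication
                          and on_remote_zero_copy_disk)
        return {"checksums.txt"} if copy_checksums else set()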
@@ -1637,24 +1621,7 @@ bool MutateTask::prepare()
         LOG_TRACE(ctx->log, "Part {} doesn't change up to mutation version {} (optimized)", ctx->source_part->name, ctx->future_part->part_info.mutation);
         /// new_data_part is not used here, another part is created instead (see the comment above)
         ctx->temporary_directory_lock = {};
-
-        /// In zero-copy replication checksums file path in s3 (blob path) is used for zero copy locks in ZooKeeper. If we will hardlink checksums file, we will have the same blob path
-        /// and two different parts (source and new mutated part) will use the same locks in ZooKeeper. To avoid this we copy checksums.txt to generate new blob path.
-        /// Example:
-        ///     part: all_0_0_0/checksums.txt -> /s3/blobs/shjfgsaasdasdasdasdasdas
-        ///     locks path in zk: /zero_copy/tbl_id/s3_blobs_shjfgsaasdasdasdasdasdas/replica_name
-        ///                                         ^ part name don't participate in lock path
-        /// In case of full hardlink we will have:
-        ///     part: all_0_0_0_1/checksums.txt -> /s3/blobs/shjfgsaasdasdasdasdasdas
-        ///     locks path in zk: /zero_copy/tbl_id/s3_blobs_shjfgsaasdasdasdasdasdas/replica_name
-        /// So we need to copy to have a new name
-        NameSet files_to_copy_instead_of_hardlinks;
-        auto settings_ptr = ctx->data->getSettings();
-        bool copy_checksumns = ctx->data->supportsReplication() && settings_ptr->allow_remote_fs_zero_copy_replication && ctx->source_part->isStoredOnRemoteDiskWithZeroCopySupport();
-        if (copy_checksumns)
-            files_to_copy_instead_of_hardlinks.insert(IMergeTreeDataPart::FILE_FOR_REFERENCES_CHECK);
-
-        auto [part, lock] = ctx->data->cloneAndLoadDataPartOnSameDisk(ctx->source_part, "tmp_mut_", ctx->future_part->part_info, ctx->metadata_snapshot, ctx->txn, &ctx->hardlinked_files, false, files_to_copy_instead_of_hardlinks);
+        auto [part, lock] = ctx->data->cloneAndLoadDataPartOnSameDisk(ctx->source_part, "tmp_mut_", ctx->future_part->part_info, ctx->metadata_snapshot, ctx->txn, &ctx->hardlinked_files, false);
         ctx->temporary_directory_lock = std::move(lock);
         promise.set_value(std::move(part));
         return false;
|
@ -19,7 +19,7 @@ namespace
|
|||||||
void localBackupImpl(
|
void localBackupImpl(
|
||||||
const DiskPtr & disk, const String & source_path,
|
const DiskPtr & disk, const String & source_path,
|
||||||
const String & destination_path, bool make_source_readonly, size_t level,
|
const String & destination_path, bool make_source_readonly, size_t level,
|
||||||
std::optional<size_t> max_level, const NameSet & files_to_copy_instead_of_hardlinks)
|
std::optional<size_t> max_level)
|
||||||
{
|
{
|
||||||
if (max_level && level > *max_level)
|
if (max_level && level > *max_level)
|
||||||
return;
|
return;
|
||||||
@@ -38,14 +38,11 @@ void localBackupImpl(
         {
             if (make_source_readonly)
                 disk->setReadOnly(source);
-            if (files_to_copy_instead_of_hardlinks.contains(it->name()))
-                disk->copyFile(source, *disk, destination);
-            else
-                disk->createHardLink(source, destination);
+            disk->createHardLink(source, destination);
         }
         else
         {
-            localBackupImpl(disk, source, destination, make_source_readonly, level + 1, max_level, files_to_copy_instead_of_hardlinks);
+            localBackupImpl(disk, source, destination, make_source_readonly, level + 1, max_level);
         }
     }
 }
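Before this revert, localBackupImpl chose per file between a real copy and a hardlink; after it, every file is hardlinked again (the directory-level copy_instead_of_hardlinks switch in localBackup below is unaffected). A minimal Python sketch of the pre-revert per-file behavior, including the max_level depth cutoff documented in localBackup's header (os-level stand-ins, not the IDisk API):

    import os
    import shutil

    def local_backup(src_dir, dst_dir, files_to_copy, level=0, max_level=None):
        # Mirror src_dir into dst_dir: hardlink every file, except names listed
        # in files_to_copy, which get real copies (a fresh inode, hence a fresh blob).
        if max_level is not None and level > max_level:
            return
        os.makedirs(dst_dir, exist_ok=True)
        for entry in os.scandir(src_dir):
            destination = os.path.join(dst_dir, entry.name)
            if entry.is_dir():
                local_backup(entry.path, destination, files_to_copy, level + 1, max_level)
            elif entry.name in files_to_copy:
                shutil.copyfile(entry.path, destination)
            else:
                os.link(entry.path, destination)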
@@ -89,7 +86,7 @@ private:
 void localBackup(
     const DiskPtr & disk, const String & source_path,
     const String & destination_path, bool make_source_readonly,
-    std::optional<size_t> max_level, bool copy_instead_of_hardlinks, const NameSet & files_to_copy_intead_of_hardlinks)
+    std::optional<size_t> max_level, bool copy_instead_of_hardlinks)
 {
     if (disk->exists(destination_path) && !disk->isDirectoryEmpty(destination_path))
     {
@@ -112,7 +109,7 @@ void localBackup(
         if (copy_instead_of_hardlinks)
             disk->copyDirectoryContent(source_path, disk, destination_path);
         else
-            localBackupImpl(disk, source_path, destination_path, make_source_readonly, 0, max_level, files_to_copy_intead_of_hardlinks);
+            localBackupImpl(disk, source_path, destination_path, make_source_readonly, 0, max_level);
     }
     catch (const DB::ErrnoException & e)
     {
@@ -20,6 +20,6 @@ namespace DB
   * If max_level is specified, than only files which depth relative source_path less or equal max_level will be copied.
   * So, if max_level=0 than only direct file child are copied.
   */
-void localBackup(const DiskPtr & disk, const String & source_path, const String & destination_path, bool make_source_readonly = true, std::optional<size_t> max_level = {}, bool copy_instead_of_hardlinks = false, const NameSet & files_to_copy_intead_of_hardlinks = {});
+void localBackup(const DiskPtr & disk, const String & source_path, const String & destination_path, bool make_source_readonly = true, std::optional<size_t> max_level = {}, bool copy_instead_of_hardlinks = false);
 
 }
@@ -1569,7 +1569,7 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, const
         Int64 temp_index = insert_increment.get();
         MergeTreePartInfo dst_part_info(partition_id, temp_index, temp_index, src_part->info.level);
 
-        auto [dst_part, part_lock] = cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, my_metadata_snapshot, local_context->getCurrentTransaction(), {}, false, {});
+        auto [dst_part, part_lock] = cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, my_metadata_snapshot, local_context->getCurrentTransaction(), {}, false);
         dst_parts.emplace_back(std::move(dst_part));
         dst_parts_locks.emplace_back(std::move(part_lock));
     }
@@ -1664,7 +1664,7 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const
         Int64 temp_index = insert_increment.get();
         MergeTreePartInfo dst_part_info(partition_id, temp_index, temp_index, src_part->info.level);
 
-        auto [dst_part, part_lock] = dest_table_storage->cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, dest_metadata_snapshot, local_context->getCurrentTransaction(), {}, false, {});
+        auto [dst_part, part_lock] = dest_table_storage->cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, dest_metadata_snapshot, local_context->getCurrentTransaction(), {}, false);
         dst_parts.emplace_back(std::move(dst_part));
         dst_parts_locks.emplace_back(std::move(part_lock));
     }
|
@ -2164,7 +2164,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
|
|||||||
throw Exception("Checksums of " + part_desc->src_table_part->name + " is suddenly changed", ErrorCodes::UNFINISHED);
|
throw Exception("Checksums of " + part_desc->src_table_part->name + " is suddenly changed", ErrorCodes::UNFINISHED);
|
||||||
|
|
||||||
auto [res_part, temporary_part_lock] = cloneAndLoadDataPartOnSameDisk(
|
auto [res_part, temporary_part_lock] = cloneAndLoadDataPartOnSameDisk(
|
||||||
part_desc->src_table_part, TMP_PREFIX + "clone_", part_desc->new_part_info, metadata_snapshot, NO_TRANSACTION_PTR, &part_desc->hardlinked_files, false, {});
|
part_desc->src_table_part, TMP_PREFIX + "clone_", part_desc->new_part_info, metadata_snapshot, NO_TRANSACTION_PTR, &part_desc->hardlinked_files, false);
|
||||||
part_desc->res_part = std::move(res_part);
|
part_desc->res_part = std::move(res_part);
|
||||||
part_desc->temporary_part_lock = std::move(temporary_part_lock);
|
part_desc->temporary_part_lock = std::move(temporary_part_lock);
|
||||||
}
|
}
|
||||||
@@ -3914,7 +3914,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
     {
         get_part = [&, part_to_clone]()
         {
-            auto [cloned_part, lock] = cloneAndLoadDataPartOnSameDisk(part_to_clone, "tmp_clone_", part_info, metadata_snapshot, NO_TRANSACTION_PTR, &hardlinked_files, false, {});
+            auto [cloned_part, lock] = cloneAndLoadDataPartOnSameDisk(part_to_clone, "tmp_clone_", part_info, metadata_snapshot, NO_TRANSACTION_PTR, &hardlinked_files, false);
             part_to_clone_lock = std::move(lock);
             return cloned_part;
         };
@@ -6495,7 +6495,8 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
             bool copy_instead_of_hardlink = storage_settings_ptr->allow_remote_fs_zero_copy_replication
                 && src_part->isStoredOnRemoteDiskWithZeroCopySupport();
 
-            auto [dst_part, part_lock] = cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, metadata_snapshot, NO_TRANSACTION_PTR, &hardlinked_files, copy_instead_of_hardlink, {});
+            auto [dst_part, part_lock] = cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, metadata_snapshot, NO_TRANSACTION_PTR, &hardlinked_files, copy_instead_of_hardlink);
 
             src_parts.emplace_back(src_part);
             dst_parts.emplace_back(dst_part);
             dst_parts_locks.emplace_back(std::move(part_lock));
@@ -6725,7 +6726,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_table
                 bool copy_instead_of_hardlink = storage_settings_ptr->allow_remote_fs_zero_copy_replication
                     && src_part->isStoredOnRemoteDiskWithZeroCopySupport();
 
-                auto [dst_part, dst_part_lock] = dest_table_storage->cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, dest_metadata_snapshot, NO_TRANSACTION_PTR, &hardlinked_files, copy_instead_of_hardlink, {});
+                auto [dst_part, dst_part_lock] = dest_table_storage->cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, dest_metadata_snapshot, NO_TRANSACTION_PTR, &hardlinked_files, copy_instead_of_hardlink);
 
                 src_parts.emplace_back(src_part);
                 dst_parts.emplace_back(dst_part);
@@ -816,6 +816,7 @@ class ClickHouseCluster:
             env_variables[f"keeper_config_dir{i}"] = configs_dir
             env_variables[f"keeper_db_dir{i}"] = coordination_dir
             self.zookeeper_dirs_to_create += [logs_dir, configs_dir, coordination_dir]
+        logging.debug(f"DEBUG KEEPER: {self.zookeeper_dirs_to_create}")
 
         self.with_zookeeper = True
         self.base_cmd.extend(["--file", keeper_docker_compose_path])
@@ -4107,9 +4108,6 @@ class ClickHouseInstance:
     def get_backuped_s3_objects(self, disk, backup_name):
         path = f"/var/lib/clickhouse/disks/{disk}/shadow/{backup_name}/store"
         self.wait_for_path_exists(path, 10)
-        return self.get_s3_objects(path)
-
-    def get_s3_objects(self, path):
         command = [
             "find",
             path,
@@ -4122,45 +4120,8 @@ class ClickHouseInstance:
             "{}",
             ";",
         ]
 
         return self.exec_in_container(command).split("\n")
 
-    def get_s3_data_objects(self, path):
-        command = [
-            "find",
-            path,
-            "-type",
-            "f",
-            "-name",
-            "*.bin",
-            "-exec",
-            "grep",
-            "-o",
-            "r[01]\\{64\\}-file-[[:lower:]]\\{32\\}",
-            "{}",
-            ";",
-        ]
-        return self.exec_in_container(command).split("\n")
-
-    def get_table_objects(self, table, database=None):
-        objects = []
-        database_query = ""
-        if database:
-            database_query = f"AND database='{database}'"
-        data_paths = self.query(
-            f"""
-            SELECT arrayJoin(data_paths)
-            FROM system.tables
-            WHERE name='{table}'
-            {database_query}
-            """
-        )
-        paths = data_paths.split("\n")
-        for path in paths:
-            if path:
-                objects = objects + self.get_s3_data_objects(path)
-        return objects
-
-
 class ClickHouseKiller(object):
     def __init__(self, clickhouse_node):
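The deleted helpers located the S3 objects behind a table by grepping every *.bin metadata file under the table's data paths for remote object keys of the form r<64 binary digits>-file-<32 lowercase letters>. The grep pattern above is POSIX BRE; an equivalent extraction in plain Python, run on a fabricated sample line:

    import re

    # r[01]\{64\}-file-[[:lower:]]\{32\} rewritten in Python regex syntax.
    OBJECT_KEY_RE = re.compile(r"r[01]{64}-file-[a-z]{32}")

    sample = "3 " + "r" + "01" * 32 + "-file-" + "ab" * 16 + " 1024"  # made-up metadata line
    print(OBJECT_KEY_RE.findall(sample))  # -> ['r0101...-file-abab...']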
@@ -8,12 +8,11 @@ from helpers.cluster import ClickHouseCluster
 logging.getLogger().setLevel(logging.INFO)
 logging.getLogger().addHandler(logging.StreamHandler())
 
-cluster = ClickHouseCluster(__file__)
-
 
 @pytest.fixture(scope="module")
-def started_cluster():
+def cluster():
     try:
+        cluster = ClickHouseCluster(__file__)
         cluster.add_instance(
             "node1",
             main_configs=["configs/config.d/s3.xml"],
@@ -97,7 +96,7 @@ def wait_for_active_parts(node, num_expected_parts, table_name, timeout=30):
 # Result of `get_large_objects_count` can be changed in other tests, so run this case at the beginning
 @pytest.mark.order(0)
 @pytest.mark.parametrize("policy", ["s3"])
-def test_s3_zero_copy_replication(started_cluster, policy):
+def test_s3_zero_copy_replication(cluster, policy):
     node1 = cluster.instances["node1"]
     node2 = cluster.instances["node2"]
 
@@ -154,7 +153,7 @@ def test_s3_zero_copy_replication(started_cluster, policy):
 
 
 @pytest.mark.skip(reason="Test is flaky (and never was stable)")
-def test_s3_zero_copy_on_hybrid_storage(started_cluster):
+def test_s3_zero_copy_on_hybrid_storage(cluster):
     node1 = cluster.instances["node1"]
     node2 = cluster.instances["node2"]
 
@@ -269,9 +268,7 @@ def insert_large_data(node, table):
         ("tiered_copy", True, 3),
     ],
 )
-def test_s3_zero_copy_with_ttl_move(
-    started_cluster, storage_policy, large_data, iterations
-):
+def test_s3_zero_copy_with_ttl_move(cluster, storage_policy, large_data, iterations):
     node1 = cluster.instances["node1"]
     node2 = cluster.instances["node2"]
 
@@ -336,7 +333,7 @@ def test_s3_zero_copy_with_ttl_move(
         (True, 3),
     ],
 )
-def test_s3_zero_copy_with_ttl_delete(started_cluster, large_data, iterations):
+def test_s3_zero_copy_with_ttl_delete(cluster, large_data, iterations):
     node1 = cluster.instances["node1"]
     node2 = cluster.instances["node2"]
 
@@ -418,22 +415,6 @@ def wait_mutations(node, table, seconds):
     assert mutations == "0\n"
 
 
-def wait_for_clean_old_parts(node, table, seconds):
-    time.sleep(1)
-    while seconds > 0:
-        seconds -= 1
-        parts = node.query(
-            f"SELECT count() FROM system.parts WHERE table='{table}' AND active=0"
-        )
-        if parts == "0\n":
-            return
-        time.sleep(1)
-    parts = node.query(
-        f"SELECT count() FROM system.parts WHERE table='{table}' AND active=0"
-    )
-    assert parts == "0\n"
-
-
 def s3_zero_copy_unfreeze_base(cluster, unfreeze_query_template):
     node1 = cluster.instances["node1"]
     node2 = cluster.instances["node2"]
@@ -454,8 +435,6 @@ def s3_zero_copy_unfreeze_base(cluster, unfreeze_query_template):
 
     node1.query("INSERT INTO unfreeze_test VALUES (0)")
 
-    wait_for_active_parts(node2, 1, "unfreeze_test")
-
     node1.query("ALTER TABLE unfreeze_test FREEZE WITH NAME 'freeze_backup1'")
     node2.query("ALTER TABLE unfreeze_test FREEZE WITH NAME 'freeze_backup2'")
     wait_mutations(node1, "unfreeze_test", 10)
@@ -493,11 +472,11 @@ def s3_zero_copy_unfreeze_base(cluster, unfreeze_query_template):
     node2.query("DROP TABLE IF EXISTS unfreeze_test NO DELAY")
 
 
-def test_s3_zero_copy_unfreeze_alter(started_cluster):
+def test_s3_zero_copy_unfreeze_alter(cluster):
     s3_zero_copy_unfreeze_base(cluster, "ALTER TABLE unfreeze_test UNFREEZE WITH NAME")
 
 
-def test_s3_zero_copy_unfreeze_system(started_cluster):
+def test_s3_zero_copy_unfreeze_system(cluster):
     s3_zero_copy_unfreeze_base(cluster, "SYSTEM UNFREEZE WITH NAME")
 
 
@@ -586,17 +565,17 @@ def s3_zero_copy_drop_detached(cluster, unfreeze_query_template):
     check_objects_not_exisis(cluster, objects1)
 
 
-def test_s3_zero_copy_drop_detached_alter(started_cluster):
+def test_s3_zero_copy_drop_detached_alter(cluster):
     s3_zero_copy_drop_detached(
         cluster, "ALTER TABLE drop_detached_test UNFREEZE WITH NAME"
     )
 
 
-def test_s3_zero_copy_drop_detached_system(started_cluster):
+def test_s3_zero_copy_drop_detached_system(cluster):
     s3_zero_copy_drop_detached(cluster, "SYSTEM UNFREEZE WITH NAME")
 
 
-def test_s3_zero_copy_concurrent_merge(started_cluster):
+def test_s3_zero_copy_concurrent_merge(cluster):
     node1 = cluster.instances["node1"]
     node2 = cluster.instances["node2"]
 
@@ -641,119 +620,3 @@ def test_s3_zero_copy_concurrent_merge(started_cluster):
 
     for node in (node1, node2):
         assert node.query("select sum(id) from concurrent_merge").strip() == "1600"
-
-
-def test_s3_zero_copy_keeps_data_after_mutation(started_cluster):
-    node1 = cluster.instances["node1"]
-    node2 = cluster.instances["node2"]
-
-    node1.query("DROP TABLE IF EXISTS zero_copy_mutation NO DELAY")
-    node2.query("DROP TABLE IF EXISTS zero_copy_mutation NO DELAY")
-
-    node1.query(
-        """
-        CREATE TABLE zero_copy_mutation (id UInt64, value1 String, value2 String, value3 String)
-        ENGINE=ReplicatedMergeTree('/clickhouse/tables/zero_copy_mutation', '{replica}')
-        ORDER BY id
-        PARTITION BY (id % 4)
-        SETTINGS storage_policy='s3',
-        old_parts_lifetime=1000
-        """
-    )
-
-    node2.query(
-        """
-        CREATE TABLE zero_copy_mutation (id UInt64, value1 String, value2 String, value3 String)
-        ENGINE=ReplicatedMergeTree('/clickhouse/tables/zero_copy_mutation', '{replica}')
-        ORDER BY id
-        PARTITION BY (id % 4)
-        SETTINGS storage_policy='s3',
-        old_parts_lifetime=1000
-        """
-    )
-
-    node1.query(
-        """
-        INSERT INTO zero_copy_mutation
-        SELECT * FROM generateRandom('id UInt64, value1 String, value2 String, value3 String') limit 1000000
-        """
-    )
-
-    wait_for_active_parts(node2, 4, "zero_copy_mutation")
-
-    objects1 = node1.get_table_objects("zero_copy_mutation")
-    check_objects_exisis(cluster, objects1)
-
-    node1.query(
-        """
-        ALTER TABLE zero_copy_mutation
-        ADD COLUMN valueX String MATERIALIZED value1
-        """
-    )
-
-    node1.query(
-        """
-        ALTER TABLE zero_copy_mutation
-        MATERIALIZE COLUMN valueX
-        """
-    )
-
-    wait_mutations(node1, "zero_copy_mutation", 10)
-    wait_mutations(node2, "zero_copy_mutation", 10)
-
-    # If bug present at least one node has metadata with incorrect ref_count values.
-    # But it may be any node depends on mutation execution order.
-    # We can try to find one, but this required knowledge about internal metadata structure.
-    # It can be change in future, so we do not find this node here.
-    # And with the bug test may be success sometimes.
-    nodeX = node1
-    nodeY = node2
-
-    objectsY = nodeY.get_table_objects("zero_copy_mutation")
-    check_objects_exisis(cluster, objectsY)
-
-    nodeX.query(
-        """
-        ALTER TABLE zero_copy_mutation
-        DETACH PARTITION '0'
-        """
-    )
-
-    nodeX.query(
-        """
-        ALTER TABLE zero_copy_mutation
-        ATTACH PARTITION '0'
-        """
-    )
-
-    wait_mutations(node1, "zero_copy_mutation", 10)
-    wait_mutations(node2, "zero_copy_mutation", 10)
-
-    nodeX.query(
-        """
-        DROP TABLE zero_copy_mutation SYNC
-        """
-    )
-
-    # time to remove objects
-    time.sleep(10)
-
-    nodeY.query(
-        """
-        SELECT count() FROM zero_copy_mutation
-        WHERE value3 LIKE '%ab%'
-        """
-    )
-
-    check_objects_exisis(cluster, objectsY)
-
-    nodeY.query(
-        """
-        DROP TABLE zero_copy_mutation SYNC
-        """
-    )
-
-    # time to remove objects
-    time.sleep(10)
-
-    check_objects_not_exisis(cluster, objectsY)