Merge pull request #48862 from ClickHouse/fix-zero-copy-replication-drop-detached-parts

Fix unexpected part name error when trying to drop a ignored detached part with zero copy replication
This commit is contained in:
alesapin 2023-04-24 12:57:25 +02:00 committed by GitHub
commit 516a0c9784
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 158 additions and 28 deletions

View File

@ -14,37 +14,40 @@
#include <Common/thread_local_rng.h>
#include <Common/typeid_cast.h>
#include <Common/ThreadFuzzer.h>
#include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
#include <Disks/ObjectStorages/IMetadataStorage.h>
#include <base/sort.h>
#include <Storages/AlterCommands.h>
#include <Storages/PartitionCommands.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/Freeze.h>
#include <Storages/MergeTree/AsyncBlockIDsCache.h>
#include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
#include <Storages/MergeTree/extractZkPathFromCreateQuery.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/LeaderElection.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/MergeFromLogEntryTask.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/MergeTreeBackgroundExecutor.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/PinnedPartUUIDs.h>
#include <Storages/MergeTree/ReplicatedMergeTreeAttachThread.h>
#include <Storages/MergeTree/ReplicatedMergeTreeTableMetadata.h>
#include <Storages/MergeTree/ReplicatedMergeTreeSink.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeAddress.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumAddedParts.h>
#include <Storages/MergeTree/ReplicatedMergeTreePartHeader.h>
#include <Storages/MergeTree/MergeFromLogEntryTask.h>
#include <Storages/MergeTree/MutateFromLogEntryTask.h>
#include <Storages/VirtualColumnUtils.h>
#include <Storages/MergeTree/MergeTreeDataFormatVersion.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Storages/MergeTree/MergeTreeReaderCompact.h>
#include <Storages/MergeTree/LeaderElection.h>
#include <Storages/MergeTree/MutateFromLogEntryTask.h>
#include <Storages/MergeTree/PinnedPartUUIDs.h>
#include <Storages/MergeTree/ReplicatedMergeTreeAddress.h>
#include <Storages/MergeTree/ReplicatedMergeTreeAttachThread.h>
#include <Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreePartHeader.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumAddedParts.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeSink.h>
#include <Storages/MergeTree/ReplicatedMergeTreeTableMetadata.h>
#include <Storages/MergeTree/ZeroCopyLock.h>
#include <Storages/MergeTree/extractZkPathFromCreateQuery.h>
#include <Storages/Freeze.h>
#include <Storages/PartitionCommands.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/VirtualColumnUtils.h>
#include <Databases/DatabaseOnDisk.h>
#include <Databases/DatabaseReplicated.h>
@ -96,7 +99,6 @@
#include <base/scope_guard.h>
#include <Common/scope_guard_safe.h>
#include <Storages/MergeTree/AsyncBlockIDsCache.h>
#include <boost/algorithm/string/join.hpp>
#include <boost/algorithm/string/replace.hpp>
@ -149,6 +151,7 @@ namespace ErrorCodes
extern const int REPLICA_IS_NOT_IN_QUORUM;
extern const int TABLE_IS_READ_ONLY;
extern const int NOT_FOUND_NODE;
extern const int BAD_DATA_PART_NAME;
extern const int NO_ACTIVE_REPLICAS;
extern const int NOT_A_LEADER;
extern const int TABLE_WAS_NOT_DROPPED;
@ -8281,7 +8284,7 @@ StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & part, co
return std::make_pair(true, NameSet{});
return unlockSharedDataByID(
part.getUniqueId(), shared_id, part.name, replica_name,
part.getUniqueId(), shared_id, part.info, replica_name,
part.getDataPartStorage().getDiskType(), zookeeper, *getSettings(), log, zookeeper_path, format_version);
}
@ -8298,11 +8301,10 @@ namespace
/// But sometimes we need an opposite. When we deleting all_0_0_0_1 it can be non replicated to other replicas, so we are the only owner of this part.
/// In this case when we will drop all_0_0_0_1 we will drop blobs for all_0_0_0. But it will lead to dataloss. For such case we need to check that other replicas
/// still need parent part.
std::pair<bool, NameSet> getParentLockedBlobs(const ZooKeeperWithFaultInjectionPtr & zookeeper_ptr, const std::string & zero_copy_part_path_prefix, const std::string & part_info_str, MergeTreeDataFormatVersion format_version, Poco::Logger * log)
std::pair<bool, NameSet> getParentLockedBlobs(const ZooKeeperWithFaultInjectionPtr & zookeeper_ptr, const std::string & zero_copy_part_path_prefix, const MergeTreePartInfo & part_info, MergeTreeDataFormatVersion format_version, Poco::Logger * log)
{
NameSet files_not_to_remove;
MergeTreePartInfo part_info = MergeTreePartInfo::fromPartName(part_info_str, format_version);
/// No mutations -- no hardlinks -- no issues
if (part_info.mutation == 0)
return {false, files_not_to_remove};
@ -8336,7 +8338,7 @@ std::pair<bool, NameSet> getParentLockedBlobs(const ZooKeeperWithFaultInjectionP
/// We are mutation child of this parent
if (part_info.isMutationChildOf(parent_candidate_info))
{
LOG_TRACE(log, "Found mutation parent {} for part {}", part_candidate_info_str, part_info_str);
LOG_TRACE(log, "Found mutation parent {} for part {}", part_candidate_info_str, part_info.getPartNameV1());
/// Get hardlinked files
String files_not_to_remove_str;
Coordination::Error code;
@ -8359,12 +8361,14 @@ std::pair<bool, NameSet> getParentLockedBlobs(const ZooKeeperWithFaultInjectionP
}
std::pair<bool, NameSet> StorageReplicatedMergeTree::unlockSharedDataByID(
String part_id, const String & table_uuid, const String & part_name,
String part_id, const String & table_uuid, const MergeTreePartInfo & part_info,
const String & replica_name_, const std::string & disk_type, const ZooKeeperWithFaultInjectionPtr & zookeeper_ptr, const MergeTreeSettings & settings,
Poco::Logger * logger, const String & zookeeper_path_old, MergeTreeDataFormatVersion data_format_version)
{
boost::replace_all(part_id, "/", "_");
auto part_name = part_info.getPartNameV1();
Strings zc_zookeeper_paths = getZeroCopyPartPath(settings, disk_type, table_uuid, part_name, zookeeper_path_old);
bool part_has_no_more_locks = true;
@ -8380,7 +8384,7 @@ std::pair<bool, NameSet> StorageReplicatedMergeTree::unlockSharedDataByID(
boost::split(files_not_to_remove, files_not_to_remove_str, boost::is_any_of("\n "));
auto [has_parent, parent_not_to_remove] = getParentLockedBlobs(
zookeeper_ptr, fs::path(zc_zookeeper_path).parent_path(), part_name, data_format_version, logger);
zookeeper_ptr, fs::path(zc_zookeeper_path).parent_path(), part_info, data_format_version, logger);
files_not_to_remove.insert(parent_not_to_remove.begin(), parent_not_to_remove.end());
String zookeeper_part_uniq_node = fs::path(zc_zookeeper_path) / part_id;
@ -9035,6 +9039,11 @@ bool StorageReplicatedMergeTree::removeSharedDetachedPart(DiskPtr disk, const St
NameSet files_not_to_remove;
// zero copy replication is only available since format version 1 so we can safely use it here
auto part_info = DetachedPartInfo::parseDetachedPartName(disk, part_name, MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING);
if (!part_info.valid_name)
throw Exception(ErrorCodes::BAD_DATA_PART_NAME, "Invalid detached part name {} on disk {}", path, disk->getName());
fs::path checksums = fs::path(path) / IMergeTreeDataPart::FILE_FOR_REFERENCES_CHECK;
if (disk->exists(checksums))
{
@ -9043,7 +9052,7 @@ bool StorageReplicatedMergeTree::removeSharedDetachedPart(DiskPtr disk, const St
String id = disk->getUniqueId(checksums);
bool can_remove = false;
std::tie(can_remove, files_not_to_remove) = StorageReplicatedMergeTree::unlockSharedDataByID(
id, table_uuid, part_name,
id, table_uuid, part_info,
detached_replica_name,
toString(disk->getDataSourceDescription().type),
std::make_shared<ZooKeeperWithFaultInjection>(zookeeper), local_context->getReplicatedMergeTreeSettings(),

View File

@ -267,7 +267,7 @@ public:
static std::pair<bool, NameSet> unlockSharedDataByID(
String part_id,
const String & table_uuid,
const String & part_name,
const MergeTreePartInfo & part_info,
const String & replica_name_,
const std::string & disk_type,
const ZooKeeperWithFaultInjectionPtr & zookeeper_,

View File

@ -0,0 +1 @@
#!/usr/bin/env python3

View File

@ -0,0 +1,30 @@
<clickhouse>
<logger>
<level>test</level>
</logger>
<storage_configuration>
<disks>
<s3>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3>
</disks>
<policies>
<s3>
<volumes>
<main>
<disk>s3</disk>
</main>
</volumes>
</s3>
</policies>
</storage_configuration>
<merge_tree>
<storage_policy>s3</storage_policy>
<allow_remote_fs_zero_copy_replication>true</allow_remote_fs_zero_copy_replication>
</merge_tree>
</clickhouse>

View File

@ -0,0 +1,90 @@
#!/usr/bin/env python3
import logging
import random
import string
import time
import os
from multiprocessing.dummy import Pool
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.add_instance(
"node1",
main_configs=["configs/storage_conf.xml"],
with_minio=True,
with_zookeeper=True,
)
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_drop_detached_part(started_cluster):
node1 = cluster.instances["node1"]
node1.query(
"""
CREATE TABLE test1 (EventDate Date, CounterID UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse-tables/test1', 'r1')
ORDER BY (CounterID, EventDate)"""
)
node1.query(
"INSERT INTO test1 SELECT toDate('2023-01-01') + toIntervalDay(number), number + 1000 from system.numbers limit 20"
)
node1.query("ALTER TABLE test1 DETACH PART 'all_0_0_0'")
def get_path_to_detached_part(query_result):
part_to_disk = {}
for row in query_result.strip().split("\n"):
print(row)
return row
path_to_detached_part = get_path_to_detached_part(
node1.query("SELECT path FROM system.detached_parts where table = 'test1'")
)
new_part_name = "ignored_" + os.path.basename(path_to_detached_part)
new_path_to_detached_part_name = (
os.path.dirname(path_to_detached_part) + os.sep + new_part_name
)
node1.exec_in_container(
[
"bash",
"-c",
f"mv {path_to_detached_part} {new_path_to_detached_part_name}",
],
privileged=True,
user="root",
)
assert (
node1.query(
"SELECT path FROM system.detached_parts where table = 'test1'"
).strip()
== new_path_to_detached_part_name
)
node1.query(
f"ALTER TABLE test1 DROP DETACHED PART '{new_part_name}'",
settings={"allow_drop_detached": 1},
)
assert (
node1.query(
"SELECT path FROM system.detached_parts where table = 'test1'"
).strip()
== ""
)