From da8ac948bfb1b329bcc3d479981177ad0b2344f2 Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 3 Nov 2020 12:24:10 +0300 Subject: [PATCH] Disallow to drop parts with parallel quorum --- src/Storages/StorageMergeTree.cpp | 7 +- src/Storages/StorageReplicatedMergeTree.cpp | 94 ++++++------------- src/Storages/StorageReplicatedMergeTree.h | 6 +- .../0_stateless/01451_detach_drop_part.sql | 2 + ...eplicated_detach_drop_and_quorum.reference | 5 + ...1451_replicated_detach_drop_and_quorum.sql | 49 ++++++++++ .../01451_replicated_detach_drop_part.sql | 2 + 7 files changed, 98 insertions(+), 67 deletions(-) create mode 100644 tests/queries/0_stateless/01451_replicated_detach_drop_and_quorum.reference create mode 100644 tests/queries/0_stateless/01451_replicated_detach_drop_and_quorum.sql diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 844884714ab..8f32d784055 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -42,6 +42,7 @@ namespace ErrorCodes extern const int CANNOT_ASSIGN_OPTIMIZE; extern const int TIMEOUT_EXCEEDED; extern const int UNKNOWN_POLICY; + extern const int NO_SUCH_DATA_PART; } namespace ActionLocks @@ -1229,7 +1230,6 @@ void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, bool MergeTreeData::DataPartsVector parts_to_remove; - /// TODO: should we include PreComitted parts like in Replicated case? if (drop_part) { String part_name = partition->as().value.safeGet(); @@ -1237,7 +1237,10 @@ void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, bool if (part) parts_to_remove.push_back(part); - } else + else + throw Exception("Part " + part_name + " not found, won't try to drop it.", ErrorCodes::NO_SUCH_DATA_PART); + } + else { String partition_id = getPartitionIDFromQuery(partition, context); parts_to_remove = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id); diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index a8e2df4e8f9..2f3d3ae1010 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -115,6 +115,7 @@ namespace ErrorCodes extern const int DIRECTORY_ALREADY_EXISTS; extern const int ILLEGAL_TYPE_OF_ARGUMENT; extern const int UNKNOWN_POLICY; + extern const int NO_SUCH_DATA_PART; } namespace ActionLocks @@ -3275,61 +3276,34 @@ void StorageReplicatedMergeTree::cleanLastPartNode(const String & partition_id) } } -void StorageReplicatedMergeTree::updateLastPartNodeIfMatches(const String & partition_id, const String & old_part_name, const String & new_part_name) + +bool StorageReplicatedMergeTree::partIsInsertingWithParallelQuorum(const MergeTreePartInfo & part_info) const +{ + auto zookeeper = getZooKeeper(); + return zookeeper->exists(zookeeper_path + "/quorum/parallel/" + part_info.getPartName()); +} + +bool StorageReplicatedMergeTree::partIsLastQuorumPart(const MergeTreePartInfo & part_info) const { auto zookeeper = getZooKeeper(); - const String quorum_last_part_path = zookeeper_path + "/quorum/last_part"; + const String parts_with_quorum_path = zookeeper_path + "/quorum/last_part"; - while (true) - { - Coordination::Stat added_parts_stat; - String old_added_parts = zookeeper->get(quorum_last_part_path, &added_parts_stat); + String parts_with_quorum_str = zookeeper->get(parts_with_quorum_path); - ReplicatedMergeTreeQuorumAddedParts parts_with_quorum(format_version); + if (parts_with_quorum_str.empty()) + return false; - if (!old_added_parts.empty()) - parts_with_quorum.fromString(old_added_parts); + ReplicatedMergeTreeQuorumAddedParts parts_with_quorum(format_version); + parts_with_quorum.fromString(parts_with_quorum_str); - if (!parts_with_quorum.added_parts.count(partition_id)) - { - /// There is no information about partition at all. - break; - } + auto partition_it = parts_with_quorum.added_parts.find(part_info.partition_id); + if (partition_it == parts_with_quorum.added_parts.end()) + return false; - /// Part for which last quorum was reached in partition_id. - auto quorum_part_info = MergeTreePartInfo::fromPartName(parts_with_quorum.added_parts.at(partition_id), format_version); - auto old_part_info = MergeTreePartInfo::fromPartName(old_part_name, format_version); - - /// Update last part for which quorum was reached. - if (old_part_info.contains(quorum_part_info)) - parts_with_quorum.added_parts.emplace(partition_id, new_part_name); - - /// Serialize and try update. - String new_added_parts = parts_with_quorum.toString(); - - auto code = zookeeper->trySet(quorum_last_part_path, new_added_parts, added_parts_stat.version); - - if (code == Coordination::Error::ZOK) - { - break; - } - else if (code == Coordination::Error::ZNONODE) - { - /// Node is deleted. It is impossible, but it is Ok. - break; - } - else if (code == Coordination::Error::ZBADVERSION) - { - /// Node was updated meanwhile. We must re-read it and repeat all the actions. - continue; - } - else - throw Coordination::Exception(code, quorum_last_part_path); - } + return partition_it->second == part_info.getPartName(); } - bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const StorageMetadataPtr & metadata_snapshot, const String & source_replica_path, bool to_detached, size_t quorum, zkutil::ZooKeeper::Ptr zookeeper_) { @@ -4361,23 +4335,7 @@ void StorageReplicatedMergeTree::dropPartition(const ASTPtr & partition, bool de } } - bool drop_entire_partition = !drop_part; - - /// Cleaning possibly stored information about parts from /quorum/last_part node in ZooKeeper. - if (drop_part) - { - auto part_info = MergeTreePartInfo::fromPartName(partition->as().value.safeGet(), format_version); - auto data_parts_vec = getDataPartsVectorInPartition(DataPartState::Committed, part_info.partition_id); - std::sort(data_parts_vec.begin(), data_parts_vec.end(), LessDataPart()); - - auto prev_part = std::upper_bound(data_parts_vec.begin(), data_parts_vec.end(), part_info, LessDataPart()); - if (prev_part != data_parts_vec.end()) - updateLastPartNodeIfMatches(part_info.partition_id, part_info.getPartName(), (*prev_part)->info.getPartName()); - else if (data_parts_vec.empty()) - drop_entire_partition = true; - } - - if (drop_entire_partition) + if (!drop_part) { String partition_id = getPartitionIDFromQuery(partition, query_context); cleanLastPartNode(partition_id); @@ -6058,14 +6016,22 @@ bool StorageReplicatedMergeTree::dropPart( auto part = getPartIfExists(part_info, {MergeTreeDataPartState::Committed}); if (!part) - throw Exception("Part " + part_name + " not found locally, won't try to drop it.", ErrorCodes::NOT_IMPLEMENTED); + throw Exception("Part " + part_name + " not found locally, won't try to drop it.", ErrorCodes::NO_SUCH_DATA_PART); /// There isn't a lot we can do otherwise. Can't cancel merges because it is possible that a replica already /// finished the merge. if (partIsAssignedToBackgroundOperation(part)) throw Exception("Part " + part_name + " is currently participating in a background operation (mutation/merge)" - + ", try again later.", ErrorCodes::PART_IS_TEMPORARILY_LOCKED); + + ", try again later", ErrorCodes::PART_IS_TEMPORARILY_LOCKED); + + if (partIsLastQuorumPart(part->info)) + throw Exception("Part " + part_name + " is last inserted part with quorum in partition. Cannot drop", + ErrorCodes::NOT_IMPLEMENTED); + + if (partIsInsertingWithParallelQuorum(part->info)) + throw Exception("Part " + part_name + " is inserting with parallel quorum. Cannot drop", + ErrorCodes::NOT_IMPLEMENTED); Coordination::Requests ops; getClearBlocksInPartitionOps(ops, *zookeeper, part_info.partition_id, part_info.min_block, part_info.max_block); diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 2fc7848d87b..b72c3344f3e 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -502,7 +502,11 @@ private: /// Deletes info from quorum/last_part node for particular partition_id. void cleanLastPartNode(const String & partition_id); - void updateLastPartNodeIfMatches(const String & partition_id, const String & old_part_name, const String & new_part_name); + /// Part name is stored in quorum/last_part for corresponding partition_id. + bool partIsLastQuorumPart(const MergeTreePartInfo & part_info) const; + + /// Part currently inserting with quorum (node quorum/parallel/part_name exists) + bool partIsInsertingWithParallelQuorum(const MergeTreePartInfo & part_info) const; /// Creates new block number if block with such block_id does not exist std::optional allocateBlockNumber( diff --git a/tests/queries/0_stateless/01451_detach_drop_part.sql b/tests/queries/0_stateless/01451_detach_drop_part.sql index 47e61f2d924..7a2815f9a3e 100644 --- a/tests/queries/0_stateless/01451_detach_drop_part.sql +++ b/tests/queries/0_stateless/01451_detach_drop_part.sql @@ -9,6 +9,8 @@ INSERT INTO mt VALUES (2); SELECT v FROM mt ORDER BY v; +ALTER TABLE mt DETACH PART 'all_100_100_0'; -- { serverError 232 } + ALTER TABLE mt DETACH PART 'all_2_2_0'; SELECT v FROM mt ORDER BY v; diff --git a/tests/queries/0_stateless/01451_replicated_detach_drop_and_quorum.reference b/tests/queries/0_stateless/01451_replicated_detach_drop_and_quorum.reference new file mode 100644 index 00000000000..3768ac7b852 --- /dev/null +++ b/tests/queries/0_stateless/01451_replicated_detach_drop_and_quorum.reference @@ -0,0 +1,5 @@ +all_0_0_0 +all_2_2_0 +1 +all_2_2_0 +1 diff --git a/tests/queries/0_stateless/01451_replicated_detach_drop_and_quorum.sql b/tests/queries/0_stateless/01451_replicated_detach_drop_and_quorum.sql new file mode 100644 index 00000000000..fca14b81e27 --- /dev/null +++ b/tests/queries/0_stateless/01451_replicated_detach_drop_and_quorum.sql @@ -0,0 +1,49 @@ +SET replication_alter_partitions_sync = 2; + + +DROP TABLE IF EXISTS replica1; +DROP TABLE IF EXISTS replica2; + +CREATE TABLE replica1 (v UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/01451/quorum', 'r1') order by tuple() settings max_replicated_merges_in_queue = 0; +CREATE TABLE replica2 (v UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/01451/quorum', 'r2') order by tuple() settings max_replicated_merges_in_queue = 0; + +INSERT INTO replica1 VALUES (0); + +SYSTEM SYNC REPLICA replica2; + +SELECT name FROM system.parts WHERE table = 'replica2' and database = currentDatabase() and active = 1; + +ALTER TABLE replica2 DETACH PART 'all_0_0_0'; + +SELECT * FROM replica1; + +SELECT * FROM replica2; + +-- drop of empty partition works +ALTER TABLE replica2 DROP PARTITION ID 'all'; + +SET insert_quorum=2; + +INSERT INTO replica2 VALUES (1); + +SYSTEM SYNC REPLICA replica2; + +ALTER TABLE replica1 DETACH PART 'all_2_2_0'; --{serverError 48} + +SELECT name FROM system.parts WHERE table = 'replica1' and database = currentDatabase() and active = 1 ORDER BY name; + +SELECT COUNT() FROM replica1; + +SET insert_quorum_parallel=1; + +INSERT INTO replica2 VALUES (2); + +-- should work, parallel quorum nodes exists only during insert +ALTER TABLE replica1 DROP PART 'all_3_3_0'; + +SELECT name FROM system.parts WHERE table = 'replica1' and database = currentDatabase() and active = 1 ORDER BY name; + +SELECT COUNT() FROM replica1; + +DROP TABLE IF EXISTS replica1; +DROP TABLE IF EXISTS replica2; diff --git a/tests/queries/0_stateless/01451_replicated_detach_drop_part.sql b/tests/queries/0_stateless/01451_replicated_detach_drop_part.sql index 1209f11b68e..3cd9fc7bc7e 100644 --- a/tests/queries/0_stateless/01451_replicated_detach_drop_part.sql +++ b/tests/queries/0_stateless/01451_replicated_detach_drop_part.sql @@ -10,6 +10,8 @@ INSERT INTO replica1 VALUES (0); INSERT INTO replica1 VALUES (1); INSERT INTO replica1 VALUES (2); +ALTER TABLE replica1 DETACH PART 'all_100_100_0'; -- { serverError 232 } + SELECT v FROM replica1 ORDER BY v; SYSTEM SYNC REPLICA replica2;