Disallow to drop parts with parallel quorum

This commit is contained in:
alesapin 2020-11-03 12:24:10 +03:00
parent 294489f6ad
commit da8ac948bf
7 changed files with 98 additions and 67 deletions

View File

@ -42,6 +42,7 @@ namespace ErrorCodes
extern const int CANNOT_ASSIGN_OPTIMIZE; extern const int CANNOT_ASSIGN_OPTIMIZE;
extern const int TIMEOUT_EXCEEDED; extern const int TIMEOUT_EXCEEDED;
extern const int UNKNOWN_POLICY; extern const int UNKNOWN_POLICY;
extern const int NO_SUCH_DATA_PART;
} }
namespace ActionLocks namespace ActionLocks
@ -1229,7 +1230,6 @@ void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, bool
MergeTreeData::DataPartsVector parts_to_remove; MergeTreeData::DataPartsVector parts_to_remove;
/// TODO: should we include PreComitted parts like in Replicated case?
if (drop_part) if (drop_part)
{ {
String part_name = partition->as<ASTLiteral &>().value.safeGet<String>(); String part_name = partition->as<ASTLiteral &>().value.safeGet<String>();
@ -1237,7 +1237,10 @@ void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, bool
if (part) if (part)
parts_to_remove.push_back(part); parts_to_remove.push_back(part);
} else else
throw Exception("Part " + part_name + " not found, won't try to drop it.", ErrorCodes::NO_SUCH_DATA_PART);
}
else
{ {
String partition_id = getPartitionIDFromQuery(partition, context); String partition_id = getPartitionIDFromQuery(partition, context);
parts_to_remove = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id); parts_to_remove = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);

View File

@ -115,6 +115,7 @@ namespace ErrorCodes
extern const int DIRECTORY_ALREADY_EXISTS; extern const int DIRECTORY_ALREADY_EXISTS;
extern const int ILLEGAL_TYPE_OF_ARGUMENT; extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int UNKNOWN_POLICY; extern const int UNKNOWN_POLICY;
extern const int NO_SUCH_DATA_PART;
} }
namespace ActionLocks namespace ActionLocks
@ -3275,61 +3276,34 @@ void StorageReplicatedMergeTree::cleanLastPartNode(const String & partition_id)
} }
} }
void StorageReplicatedMergeTree::updateLastPartNodeIfMatches(const String & partition_id, const String & old_part_name, const String & new_part_name)
bool StorageReplicatedMergeTree::partIsInsertingWithParallelQuorum(const MergeTreePartInfo & part_info) const
{
auto zookeeper = getZooKeeper();
return zookeeper->exists(zookeeper_path + "/quorum/parallel/" + part_info.getPartName());
}
bool StorageReplicatedMergeTree::partIsLastQuorumPart(const MergeTreePartInfo & part_info) const
{ {
auto zookeeper = getZooKeeper(); auto zookeeper = getZooKeeper();
const String quorum_last_part_path = zookeeper_path + "/quorum/last_part"; const String parts_with_quorum_path = zookeeper_path + "/quorum/last_part";
while (true) String parts_with_quorum_str = zookeeper->get(parts_with_quorum_path);
{
Coordination::Stat added_parts_stat; if (parts_with_quorum_str.empty())
String old_added_parts = zookeeper->get(quorum_last_part_path, &added_parts_stat); return false;
ReplicatedMergeTreeQuorumAddedParts parts_with_quorum(format_version); ReplicatedMergeTreeQuorumAddedParts parts_with_quorum(format_version);
parts_with_quorum.fromString(parts_with_quorum_str);
if (!old_added_parts.empty()) auto partition_it = parts_with_quorum.added_parts.find(part_info.partition_id);
parts_with_quorum.fromString(old_added_parts); if (partition_it == parts_with_quorum.added_parts.end())
return false;
if (!parts_with_quorum.added_parts.count(partition_id)) return partition_it->second == part_info.getPartName();
{
/// There is no information about partition at all.
break;
}
/// Part for which last quorum was reached in partition_id.
auto quorum_part_info = MergeTreePartInfo::fromPartName(parts_with_quorum.added_parts.at(partition_id), format_version);
auto old_part_info = MergeTreePartInfo::fromPartName(old_part_name, format_version);
/// Update last part for which quorum was reached.
if (old_part_info.contains(quorum_part_info))
parts_with_quorum.added_parts.emplace(partition_id, new_part_name);
/// Serialize and try update.
String new_added_parts = parts_with_quorum.toString();
auto code = zookeeper->trySet(quorum_last_part_path, new_added_parts, added_parts_stat.version);
if (code == Coordination::Error::ZOK)
{
break;
}
else if (code == Coordination::Error::ZNONODE)
{
/// Node is deleted. It is impossible, but it is Ok.
break;
}
else if (code == Coordination::Error::ZBADVERSION)
{
/// Node was updated meanwhile. We must re-read it and repeat all the actions.
continue;
}
else
throw Coordination::Exception(code, quorum_last_part_path);
}
} }
bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const StorageMetadataPtr & metadata_snapshot, bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const StorageMetadataPtr & metadata_snapshot,
const String & source_replica_path, bool to_detached, size_t quorum, zkutil::ZooKeeper::Ptr zookeeper_) const String & source_replica_path, bool to_detached, size_t quorum, zkutil::ZooKeeper::Ptr zookeeper_)
{ {
@ -4361,23 +4335,7 @@ void StorageReplicatedMergeTree::dropPartition(const ASTPtr & partition, bool de
} }
} }
bool drop_entire_partition = !drop_part; if (!drop_part)
/// Cleaning possibly stored information about parts from /quorum/last_part node in ZooKeeper.
if (drop_part)
{
auto part_info = MergeTreePartInfo::fromPartName(partition->as<ASTLiteral &>().value.safeGet<String>(), format_version);
auto data_parts_vec = getDataPartsVectorInPartition(DataPartState::Committed, part_info.partition_id);
std::sort(data_parts_vec.begin(), data_parts_vec.end(), LessDataPart());
auto prev_part = std::upper_bound(data_parts_vec.begin(), data_parts_vec.end(), part_info, LessDataPart());
if (prev_part != data_parts_vec.end())
updateLastPartNodeIfMatches(part_info.partition_id, part_info.getPartName(), (*prev_part)->info.getPartName());
else if (data_parts_vec.empty())
drop_entire_partition = true;
}
if (drop_entire_partition)
{ {
String partition_id = getPartitionIDFromQuery(partition, query_context); String partition_id = getPartitionIDFromQuery(partition, query_context);
cleanLastPartNode(partition_id); cleanLastPartNode(partition_id);
@ -6058,14 +6016,22 @@ bool StorageReplicatedMergeTree::dropPart(
auto part = getPartIfExists(part_info, {MergeTreeDataPartState::Committed}); auto part = getPartIfExists(part_info, {MergeTreeDataPartState::Committed});
if (!part) if (!part)
throw Exception("Part " + part_name + " not found locally, won't try to drop it.", ErrorCodes::NOT_IMPLEMENTED); throw Exception("Part " + part_name + " not found locally, won't try to drop it.", ErrorCodes::NO_SUCH_DATA_PART);
/// There isn't a lot we can do otherwise. Can't cancel merges because it is possible that a replica already /// There isn't a lot we can do otherwise. Can't cancel merges because it is possible that a replica already
/// finished the merge. /// finished the merge.
if (partIsAssignedToBackgroundOperation(part)) if (partIsAssignedToBackgroundOperation(part))
throw Exception("Part " + part_name throw Exception("Part " + part_name
+ " is currently participating in a background operation (mutation/merge)" + " is currently participating in a background operation (mutation/merge)"
+ ", try again later.", ErrorCodes::PART_IS_TEMPORARILY_LOCKED); + ", try again later", ErrorCodes::PART_IS_TEMPORARILY_LOCKED);
if (partIsLastQuorumPart(part->info))
throw Exception("Part " + part_name + " is last inserted part with quorum in partition. Cannot drop",
ErrorCodes::NOT_IMPLEMENTED);
if (partIsInsertingWithParallelQuorum(part->info))
throw Exception("Part " + part_name + " is inserting with parallel quorum. Cannot drop",
ErrorCodes::NOT_IMPLEMENTED);
Coordination::Requests ops; Coordination::Requests ops;
getClearBlocksInPartitionOps(ops, *zookeeper, part_info.partition_id, part_info.min_block, part_info.max_block); getClearBlocksInPartitionOps(ops, *zookeeper, part_info.partition_id, part_info.min_block, part_info.max_block);

View File

@ -502,7 +502,11 @@ private:
/// Deletes info from quorum/last_part node for particular partition_id. /// Deletes info from quorum/last_part node for particular partition_id.
void cleanLastPartNode(const String & partition_id); void cleanLastPartNode(const String & partition_id);
void updateLastPartNodeIfMatches(const String & partition_id, const String & old_part_name, const String & new_part_name); /// Part name is stored in quorum/last_part for corresponding partition_id.
bool partIsLastQuorumPart(const MergeTreePartInfo & part_info) const;
/// Part currently inserting with quorum (node quorum/parallel/part_name exists)
bool partIsInsertingWithParallelQuorum(const MergeTreePartInfo & part_info) const;
/// Creates new block number if block with such block_id does not exist /// Creates new block number if block with such block_id does not exist
std::optional<EphemeralLockInZooKeeper> allocateBlockNumber( std::optional<EphemeralLockInZooKeeper> allocateBlockNumber(

View File

@ -9,6 +9,8 @@ INSERT INTO mt VALUES (2);
SELECT v FROM mt ORDER BY v; SELECT v FROM mt ORDER BY v;
ALTER TABLE mt DETACH PART 'all_100_100_0'; -- { serverError 232 }
ALTER TABLE mt DETACH PART 'all_2_2_0'; ALTER TABLE mt DETACH PART 'all_2_2_0';
SELECT v FROM mt ORDER BY v; SELECT v FROM mt ORDER BY v;

View File

@ -0,0 +1,5 @@
all_0_0_0
all_2_2_0
1
all_2_2_0
1

View File

@ -0,0 +1,49 @@
SET replication_alter_partitions_sync = 2;
DROP TABLE IF EXISTS replica1;
DROP TABLE IF EXISTS replica2;
CREATE TABLE replica1 (v UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/01451/quorum', 'r1') order by tuple() settings max_replicated_merges_in_queue = 0;
CREATE TABLE replica2 (v UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/01451/quorum', 'r2') order by tuple() settings max_replicated_merges_in_queue = 0;
INSERT INTO replica1 VALUES (0);
SYSTEM SYNC REPLICA replica2;
SELECT name FROM system.parts WHERE table = 'replica2' and database = currentDatabase() and active = 1;
ALTER TABLE replica2 DETACH PART 'all_0_0_0';
SELECT * FROM replica1;
SELECT * FROM replica2;
-- drop of empty partition works
ALTER TABLE replica2 DROP PARTITION ID 'all';
SET insert_quorum=2;
INSERT INTO replica2 VALUES (1);
SYSTEM SYNC REPLICA replica2;
ALTER TABLE replica1 DETACH PART 'all_2_2_0'; --{serverError 48}
SELECT name FROM system.parts WHERE table = 'replica1' and database = currentDatabase() and active = 1 ORDER BY name;
SELECT COUNT() FROM replica1;
SET insert_quorum_parallel=1;
INSERT INTO replica2 VALUES (2);
-- should work, parallel quorum nodes exists only during insert
ALTER TABLE replica1 DROP PART 'all_3_3_0';
SELECT name FROM system.parts WHERE table = 'replica1' and database = currentDatabase() and active = 1 ORDER BY name;
SELECT COUNT() FROM replica1;
DROP TABLE IF EXISTS replica1;
DROP TABLE IF EXISTS replica2;

View File

@ -10,6 +10,8 @@ INSERT INTO replica1 VALUES (0);
INSERT INTO replica1 VALUES (1); INSERT INTO replica1 VALUES (1);
INSERT INTO replica1 VALUES (2); INSERT INTO replica1 VALUES (2);
ALTER TABLE replica1 DETACH PART 'all_100_100_0'; -- { serverError 232 }
SELECT v FROM replica1 ORDER BY v; SELECT v FROM replica1 ORDER BY v;
SYSTEM SYNC REPLICA replica2; SYSTEM SYNC REPLICA replica2;