Merge pull request #56533 from ClickHouse/rmt_check_shutdown_flags_in_retry_loops

ReplicatedMergeTree: check shutdown flags in retry loops
This commit is contained in:
Alexander Tokmakov 2023-11-28 15:24:40 +01:00 committed by GitHub
commit e40c71a74c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 24 additions and 1 deletions

View File

@ -1803,6 +1803,9 @@ MergeTreeData::DataPartsVector StorageReplicatedMergeTree::checkPartChecksumsAnd
while (true) while (true)
{ {
if (shutdown_called || partial_shutdown_called)
throw Exception(ErrorCodes::ABORTED, "Cannot commit part because shutdown called");
Coordination::Requests ops; Coordination::Requests ops;
size_t num_check_ops; size_t num_check_ops;
getOpsToCheckPartChecksumsAndCommit(zookeeper, part, hardlinked_files, replace_zero_copy_lock, ops, num_check_ops); getOpsToCheckPartChecksumsAndCommit(zookeeper, part, hardlinked_files, replace_zero_copy_lock, ops, num_check_ops);
@ -2833,6 +2836,9 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
/// we can possibly duplicate entries in queue of cloned replica. /// we can possibly duplicate entries in queue of cloned replica.
while (true) while (true)
{ {
if (shutdown_called)
throw Exception(ErrorCodes::ABORTED, "Cannot clone replica because shutdown called");
Coordination::Stat log_pointer_stat; Coordination::Stat log_pointer_stat;
String raw_log_pointer = zookeeper->get(fs::path(source_path) / "log_pointer", &log_pointer_stat); String raw_log_pointer = zookeeper->get(fs::path(source_path) / "log_pointer", &log_pointer_stat);
@ -3199,6 +3205,9 @@ void StorageReplicatedMergeTree::cloneMetadataIfNeeded(const String & source_rep
String source_columns; String source_columns;
while (true) while (true)
{ {
if (shutdown_called)
throw Exception(ErrorCodes::ABORTED, "Cannot clone metadata because shutdown called");
Coordination::Stat metadata_stat; Coordination::Stat metadata_stat;
Coordination::Stat columns_stat; Coordination::Stat columns_stat;
source_metadata = zookeeper->get(source_path + "/metadata", &metadata_stat); source_metadata = zookeeper->get(source_path + "/metadata", &metadata_stat);
@ -4028,6 +4037,8 @@ void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_n
while (true) while (true)
{ {
if (shutdown_called || partial_shutdown_called)
throw Exception(ErrorCodes::ABORTED, "Cannot remove part because shutdown called");
Coordination::Requests ops; Coordination::Requests ops;
@ -4525,6 +4536,9 @@ void StorageReplicatedMergeTree::cleanLastPartNode(const String & partition_id)
while (true) while (true)
{ {
if (shutdown_called || partial_shutdown_called)
throw Exception(ErrorCodes::ABORTED, "Cannot clean last part node because shutdown called");
Coordination::Stat added_parts_stat; Coordination::Stat added_parts_stat;
String old_added_parts = zookeeper->get(quorum_last_part_path, &added_parts_stat); String old_added_parts = zookeeper->get(quorum_last_part_path, &added_parts_stat);
@ -7256,6 +7270,9 @@ void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, Conte
/// Should work well if the number of concurrent mutation requests is small. /// Should work well if the number of concurrent mutation requests is small.
while (true) while (true)
{ {
if (shutdown_called || partial_shutdown_called)
throw Exception(ErrorCodes::ABORTED, "Cannot assign mutation because shutdown called");
Coordination::Stat mutations_stat; Coordination::Stat mutations_stat;
zookeeper->get(mutations_path, &mutations_stat); zookeeper->get(mutations_path, &mutations_stat);
@ -8526,6 +8543,9 @@ bool StorageReplicatedMergeTree::dropPartImpl(
while (true) while (true)
{ {
if (shutdown_called || partial_shutdown_called)
throw Exception(ErrorCodes::ABORTED, "Cannot drop part because shutdown called");
ReplicatedMergeTreeMergePredicate merge_pred = queue.getMergePredicate(zookeeper, PartitionIdsHint{part_info.partition_id}); ReplicatedMergeTreeMergePredicate merge_pred = queue.getMergePredicate(zookeeper, PartitionIdsHint{part_info.partition_id});
auto part = getPartIfExists(part_info, {MergeTreeDataPartState::Active}); auto part = getPartIfExists(part_info, {MergeTreeDataPartState::Active});
@ -9852,6 +9872,9 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP
while (true) while (true)
{ {
if (shutdown_called || partial_shutdown_called)
throw Exception(ErrorCodes::ABORTED, "Cannot create an empty part because shutdown called");
/// We should be careful when creating an empty part, because we are not sure that this part is still needed. /// We should be careful when creating an empty part, because we are not sure that this part is still needed.
/// For example, it's possible that part (or partition) was dropped (or replaced) concurrently. /// For example, it's possible that part (or partition) was dropped (or replaced) concurrently.
/// We can enqueue part for check from DataPartExchange or SelectProcessor /// We can enqueue part for check from DataPartExchange or SelectProcessor

View File

@ -69,7 +69,7 @@ function alter_table()
if [ -z "$table" ]; then continue; fi if [ -z "$table" ]; then continue; fi
$CLICKHOUSE_CLIENT --distributed_ddl_task_timeout=0 -q \ $CLICKHOUSE_CLIENT --distributed_ddl_task_timeout=0 -q \
"alter table $table update n = n + (select max(n) from merge(REGEXP('${CLICKHOUSE_DATABASE}.*'), '.*')) where 1 settings allow_nondeterministic_mutations=1" \ "alter table $table update n = n + (select max(n) from merge(REGEXP('${CLICKHOUSE_DATABASE}.*'), '.*')) where 1 settings allow_nondeterministic_mutations=1" \
2>&1| grep -Fa "Exception: " | grep -Fv "Cannot enqueue query" | grep -Fv "ZooKeeper session expired" | grep -Fv UNKNOWN_DATABASE | grep -Fv UNKNOWN_TABLE | grep -Fv TABLE_IS_READ_ONLY | grep -Fv TABLE_IS_DROPPED | grep -Fv "Error while executing table function merge" 2>&1| grep -Fa "Exception: " | grep -Fv "Cannot enqueue query" | grep -Fv "ZooKeeper session expired" | grep -Fv UNKNOWN_DATABASE | grep -Fv UNKNOWN_TABLE | grep -Fv TABLE_IS_READ_ONLY | grep -Fv TABLE_IS_DROPPED | grep -Fv ABORTED | grep -Fv "Error while executing table function merge"
sleep 0.$RANDOM sleep 0.$RANDOM
done done
} }