diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp
index 0ed3f89b2c3..188f6366f8b 100644
--- a/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/src/Storages/MergeTree/MergeTreeData.cpp
@@ -4897,7 +4897,7 @@ void MergeTreeData::removeQueryId(const String & query_id) const
 {
     std::lock_guard lock(query_id_set_mutex);
     if (query_id_set.find(query_id) == query_id_set.end())
-        LOG_WARNING(log, "We have query_id removed but it's not recorded. This is a bug");
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "We have query_id removed but it's not recorded. This is a bug");
     else
         query_id_set.erase(query_id);
 }
@@ -5077,7 +5077,10 @@ CurrentlySubmergingEmergingTagger::~CurrentlySubmergingEmergingTagger()
     for (const auto & part : submerging_parts)
     {
         if (!storage.currently_submerging_big_parts.count(part))
-            LOG_WARNING(log, "currently_submerging_big_parts doesn't contain part {} to erase. This is a bug", part->name);
+        {
+            LOG_ERROR(log, "currently_submerging_big_parts doesn't contain part {} to erase. This is a bug", part->name);
+            assert(false);
+        }
         else
             storage.currently_submerging_big_parts.erase(part);
     }
diff --git a/src/Storages/MergeTree/MergeTreePartInfo.h b/src/Storages/MergeTree/MergeTreePartInfo.h
index e81143de8c2..66d5342b67f 100644
--- a/src/Storages/MergeTree/MergeTreePartInfo.h
+++ b/src/Storages/MergeTree/MergeTreePartInfo.h
@@ -71,6 +71,13 @@ struct MergeTreePartInfo
             || max_block < rhs.min_block;
     }
 
+    bool isFakeDropRangePart() const
+    {
+        /// Another max level was previously used for REPLACE/MOVE PARTITION
+        auto another_max_level = std::numeric_limits<decltype(level)>::max();
+        return level == MergeTreePartInfo::MAX_LEVEL || level == another_max_level;
+    }
+
     String getPartName() const;
     String getPartNameV0(DayNum left_date, DayNum right_date) const;
     UInt64 getBlocksCount() const
diff --git a/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp
index 09b2a23767c..35c42a7f325 100644
--- a/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp
+++ b/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp
@@ -18,6 +18,7 @@ namespace DB
 namespace ErrorCodes
 {
     extern const int TABLE_DIFFERS_TOO_MUCH;
+    extern const int LOGICAL_ERROR;
 }
 
 static const auto PART_CHECK_ERROR_SLEEP_MS = 5 * 1000;
@@ -190,7 +191,7 @@ void ReplicatedMergeTreePartCheckThread::searchForMissingPartAndFetchIfPossible(
     if (missing_part_search_result == MissingPartSearchResult::LostForever)
     {
         /// Is it in the replication queue? If there is - delete, because the task can not be processed.
-        if (!storage.queue.remove(zookeeper, part_name))
+        if (!storage.queue.markPartAsLostForever(zookeeper, part_name))
         {
             /// The part was not in our queue.
             LOG_WARNING(log, "Missing part {} is not in our queue, this can happen rarely.", part_name);
@@ -367,8 +368,8 @@ void ReplicatedMergeTreePartCheckThread::run()
            {
                if (!parts_set.empty())
                {
-                    LOG_ERROR(log, "Non-empty parts_set with empty parts_queue. This is a bug.");
                     parts_set.clear();
+                    throw Exception(ErrorCodes::LOGICAL_ERROR, "Non-empty parts_set with empty parts_queue. This is a bug.");
                 }
             }
             else
@@ -401,7 +402,7 @@ void ReplicatedMergeTreePartCheckThread::run()
 
             if (parts_queue.empty())
             {
-                LOG_ERROR(log, "Someone erased checking part from parts_queue. This is a bug.");
+                throw Exception(ErrorCodes::LOGICAL_ERROR, "Someone erased checking part from parts_queue. This is a bug.");
             }
             else
             {
diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp
index 6b7fb3bf17f..f3c5e88f54a 100644
--- a/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp
+++ b/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp
@@ -174,6 +174,9 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
     storage.partial_shutdown_called = false;
     storage.partial_shutdown_event.reset();
 
+    /// Start queue processing
+    storage.background_executor.start();
+
     storage.queue_updating_task->activateAndSchedule();
     storage.mutations_updating_task->activateAndSchedule();
     storage.mutations_finalizing_task->activateAndSchedule();
@@ -227,7 +230,7 @@ void ReplicatedMergeTreeRestartingThread::removeFailedQuorumParts()
         {
             LOG_DEBUG(log, "Found part {} with failed quorum. Moving to detached. This shouldn't happen often.", part_name);
             storage.forgetPartAndMoveToDetached(part, "noquorum");
-            storage.queue.removeFromVirtualParts(part->info);
+            storage.queue.removeFailedQuorumPart(part->info);
         }
     }
 }
@@ -352,6 +355,9 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown()
     storage.cleanup_thread.stop();
     storage.part_check_thread.stop();
 
+    /// Stop queue processing
+    storage.background_executor.finish();
+
     LOG_TRACE(log, "Threads finished");
 }
 
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index b3da3e2287b..23abb39eb3e 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -1465,6 +1465,12 @@ bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry)
         return true;
     }
 
+    if (entry.type == LogEntry::PART_IS_LOST)
+    {
+        queue.executePartIsLost(getZooKeeper(), entry);
+        return true;
+    }
+
     const bool is_get_or_attach = entry.type == LogEntry::GET_PART || entry.type == LogEntry::ATTACH_PART;
 
     if (is_get_or_attach || entry.type == LogEntry::MERGE_PARTS || entry.type == LogEntry::MUTATE_PART)
@@ -2043,7 +2049,7 @@ bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry)
                     if (code == Coordination::Error::ZOK)
                     {
                         LOG_DEBUG(log, "Marked quorum for part {} as failed.", entry.new_part_name);
-                        queue.removeFromVirtualParts(part_info);
+                        queue.removeFailedQuorumPart(part_info);
                         return true;
                     }
                     else if (code == Coordination::Error::ZBADVERSION || code == Coordination::Error::ZNONODE || code == Coordination::Error::ZNODEEXISTS)
@@ -2055,7 +2061,10 @@ bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry)
                     }
                     else
                     {
-                        LOG_WARNING(log, "No active replica has part {}, but that part needs quorum and /quorum/status contains entry about another part {}. It means that part was successfully written to {} replicas, but then all of them goes offline. Or it is a bug.", entry.new_part_name, quorum_entry.part_name, entry.quorum);
+                        LOG_WARNING(log, "No active replica has part {}, "
+                                    "but that part needs quorum and /quorum/status contains entry about another part {}. "
+                                    "It means that part was successfully written to {} replicas, but then all of them goes offline. "
+                                    "Or it is a bug.", entry.new_part_name, quorum_entry.part_name, entry.quorum);
                     }
                 }
             }
@@ -2743,7 +2752,6 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
     {
         if (active_parts_set.getContainingPart(part).empty())
         {
-            queue.remove(zookeeper, part);
             parts_to_remove_from_zk.emplace_back(part);
             LOG_WARNING(log, "Source replica does not have part {}. Removing it from ZooKeeper.", part);
         }
@@ -2988,6 +2996,7 @@ void StorageReplicatedMergeTree::cloneReplicaIfNeeded(zkutil::ZooKeeperPtr zooke
 
     /// Clear obsolete queue that we no longer need.
     zookeeper->removeChildren(replica_path + "/queue");
+    queue.clear();
 
     /// Will do repair from the selected replica.
     cloneReplica(source_replica, source_is_lost_stat, zookeeper);
diff --git a/tests/integration/test_consistent_parts_after_clone_replica/test.py b/tests/integration/test_consistent_parts_after_clone_replica/test.py
index 784f94397af..459bba42028 100644
--- a/tests/integration/test_consistent_parts_after_clone_replica/test.py
+++ b/tests/integration/test_consistent_parts_after_clone_replica/test.py
@@ -37,12 +37,17 @@ def start_cluster():
 def test_inconsistent_parts_if_drop_while_replica_not_active(start_cluster):
     with PartitionManager() as pm:
         # insert into all replicas
-        for i in range(50):
+        for i in range(10):
             node1.query("INSERT INTO test_table VALUES ('2019-08-16', {})".format(i))
         assert_eq_with_retry(node2, "SELECT count(*) FROM test_table", node1.query("SELECT count(*) FROM test_table"))
 
-        # disable network on the first replica
+        # partition the first replica from the second one and (later) from zk
         pm.partition_instances(node1, node2)
+
+        # insert some parts on the second replica only, we will drop these parts
+        for i in range(10):
+            node2.query("INSERT INTO test_table VALUES ('2019-08-16', {})".format(10 + i))
+
         pm.drop_instance_zk_connections(node1)
 
         # drop all parts on the second replica
@@ -51,9 +56,14 @@ def test_inconsistent_parts_if_drop_while_replica_not_active(start_cluster):
 
         # insert into the second replica
         # DROP_RANGE will be removed from the replication log and the first replica will be lost
-        for i in range(50):
-            node2.query("INSERT INTO test_table VALUES ('2019-08-16', {})".format(50 + i))
+        for i in range(20):
+            node2.query("INSERT INTO test_table VALUES ('2019-08-16', {})".format(20 + i))
 
         # the first replica will be cloned from the second
         pm.heal_all()
         assert_eq_with_retry(node1, "SELECT count(*) FROM test_table", node2.query("SELECT count(*) FROM test_table"))
+        # ensure replica was cloned
+        assert node1.contains_in_log("Will mimic node2")
+        # queue must be empty (except some merges that are possibly executing right now)
+        assert node1.query("SELECT count() FROM system.replication_queue WHERE type != 'MERGE_PARTS'") == "0\n"
+        assert node2.query("SELECT count() FROM system.replication_queue WHERE type != 'MERGE_PARTS'") == "0\n"
diff --git a/tests/queries/0_stateless/01307_multiple_leaders_zookeeper.sh b/tests/queries/0_stateless/01307_multiple_leaders_zookeeper.sh
index 21fc88d7c2d..37ee5bf7ad7 100755
--- a/tests/queries/0_stateless/01307_multiple_leaders_zookeeper.sh
+++ b/tests/queries/0_stateless/01307_multiple_leaders_zookeeper.sh
@@ -12,7 +12,7 @@ DATA_SIZE=200
 SEQ=$(seq 0 $(($NUM_REPLICAS - 1)))
 
 for REPLICA in $SEQ; do $CLICKHOUSE_CLIENT -n --query "DROP TABLE IF EXISTS r$REPLICA"; done
-for REPLICA in $SEQ; do $CLICKHOUSE_CLIENT -n --query "CREATE TABLE r$REPLICA (x UInt64) ENGINE = ReplicatedMergeTree('/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/table', 'r$REPLICA') ORDER BY x SETTINGS min_bytes_for_wide_part = '10M';"; done
+for REPLICA in $SEQ; do $CLICKHOUSE_CLIENT -n --query "CREATE TABLE r$REPLICA (x UInt64) ENGINE = ReplicatedMergeTree('/test/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/table', 'r$REPLICA') ORDER BY x SETTINGS min_bytes_for_wide_part = '10M';"; done
 
 function thread()
 {
diff --git a/tests/queries/0_stateless/01320_create_sync_race_condition_zookeeper.sh b/tests/queries/0_stateless/01320_create_sync_race_condition_zookeeper.sh
index 97c200c651f..678a3aa12fe 100755
--- a/tests/queries/0_stateless/01320_create_sync_race_condition_zookeeper.sh
+++ b/tests/queries/0_stateless/01320_create_sync_race_condition_zookeeper.sh
@@ -11,7 +11,7 @@ $CLICKHOUSE_CLIENT --query "CREATE DATABASE test_01320 ENGINE=Ordinary" # Diff
 
 function thread1()
 {
-    while true; do $CLICKHOUSE_CLIENT -n --query "CREATE TABLE test_01320.r (x UInt64) ENGINE = ReplicatedMergeTree('/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/table', 'r') ORDER BY x; DROP TABLE test_01320.r;"; done
+    while true; do $CLICKHOUSE_CLIENT -n --query "CREATE TABLE test_01320.r (x UInt64) ENGINE = ReplicatedMergeTree('/test/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/table', 'r') ORDER BY x; DROP TABLE test_01320.r;"; done
 }
 
 function thread2()
diff --git a/tests/queries/0_stateless/01417_freeze_partition_verbose_zookeeper.sh b/tests/queries/0_stateless/01417_freeze_partition_verbose_zookeeper.sh
index bb935a950ff..16b39db6e84 100755
--- a/tests/queries/0_stateless/01417_freeze_partition_verbose_zookeeper.sh
+++ b/tests/queries/0_stateless/01417_freeze_partition_verbose_zookeeper.sh
@@ -11,7 +11,7 @@ FREEZE_OUT_STRUCTURE='backup_name String, backup_path String , part_backup_path
 
 # setup
 ${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS table_for_freeze_replicated;"
-${CLICKHOUSE_CLIENT} --query "CREATE TABLE table_for_freeze_replicated (key UInt64, value String) ENGINE = ReplicatedMergeTree('/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/table_for_freeze_replicated', '1') ORDER BY key PARTITION BY key % 10;"
+${CLICKHOUSE_CLIENT} --query "CREATE TABLE table_for_freeze_replicated (key UInt64, value String) ENGINE = ReplicatedMergeTree('/test/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/table_for_freeze_replicated', '1') ORDER BY key PARTITION BY key % 10;"
 ${CLICKHOUSE_CLIENT} --query "INSERT INTO table_for_freeze_replicated SELECT number, toString(number) from numbers(10);"
 
 ${CLICKHOUSE_CLIENT} --query "ALTER TABLE table_for_freeze_replicated FREEZE WITH NAME 'test_01417' FORMAT TSVWithNames SETTINGS alter_partition_verbose_result = 1;" \