From abf36065b7bbddeba2b80f76ad966a9167852089 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Thu, 6 Jul 2023 17:24:06 +0200 Subject: [PATCH] fix --- .../ReplicatedMergeTreePartCheckThread.cpp | 89 ++++++++++--------- .../ReplicatedMergeTreePartCheckThread.h | 4 +- src/Storages/StorageReplicatedMergeTree.cpp | 17 +++- .../__init__.py | 0 .../configs/testkeeper.xml | 6 -- .../test.py | 65 -------------- .../02254_projection_broken_part.reference | 6 ++ .../02254_projection_broken_part.sh | 44 +++++++++ 8 files changed, 115 insertions(+), 116 deletions(-) delete mode 100644 tests/integration/test_projection_report_broken_part/__init__.py delete mode 100644 tests/integration/test_projection_report_broken_part/configs/testkeeper.xml delete mode 100644 tests/integration/test_projection_report_broken_part/test.py create mode 100644 tests/queries/0_stateless/02254_projection_broken_part.reference create mode 100755 tests/queries/0_stateless/02254_projection_broken_part.sh diff --git a/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp index 1cc3736bd2e..ffe3f883f80 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp @@ -63,6 +63,7 @@ void ReplicatedMergeTreePartCheckThread::enqueuePart(const String & name, time_t if (parts_set.contains(name)) return; + LOG_TRACE(log, "Enqueueing {} for check after after {}s", name, delay_to_check_seconds); parts_queue.emplace_back(name, time(nullptr) + delay_to_check_seconds); parts_set.insert(name); task->schedule(); @@ -423,7 +424,7 @@ ReplicatedCheckResult ReplicatedMergeTreePartCheckThread::checkPartImpl(const St } -CheckResult ReplicatedMergeTreePartCheckThread::checkPartAndFix(const String & part_name) +CheckResult ReplicatedMergeTreePartCheckThread::checkPartAndFix(const String & part_name, std::optional * recheck_after) { LOG_INFO(log, "Checking part {}", part_name); ProfileEvents::increment(ProfileEvents::ReplicatedPartChecks); @@ -438,7 +439,11 @@ CheckResult ReplicatedMergeTreePartCheckThread::checkPartAndFix(const String & p break; case ReplicatedCheckResult::RecheckLater: - enqueuePart(part_name, result.recheck_after); + /// NOTE We cannot enqueue it from the check thread itself + if (recheck_after) + *recheck_after = result.recheck_after; + else + enqueuePart(part_name, result.recheck_after); break; case ReplicatedCheckResult::DetachUnexpected: @@ -471,10 +476,22 @@ CheckResult ReplicatedMergeTreePartCheckThread::checkPartAndFix(const String & p /// Part is not in ZooKeeper and not on disk (so there's nothing to detach or remove from ZooKeeper). /// Probably we cannot execute some entry from the replication queue (so don't need to enqueue another one). - /// Either all replicas having the part are not active, or the part is lost forever. + /// Either all replicas having the part are not active... bool found_something = searchForMissingPartOnOtherReplicas(part_name); - if (!found_something) - onPartIsLostForever(part_name); + if (found_something) + break; + + /// ... or the part is lost forever + bool handled_lost_part = onPartIsLostForever(part_name); + if (handled_lost_part) + break; + + /// We failed to create empty part, need retry + constexpr time_t retry_after_seconds = 30; + if (recheck_after) + *recheck_after = retry_after_seconds; + else + enqueuePart(part_name, retry_after_seconds); break; } @@ -483,7 +500,7 @@ CheckResult ReplicatedMergeTreePartCheckThread::checkPartAndFix(const String & p return result.status; } -void ReplicatedMergeTreePartCheckThread::onPartIsLostForever(const String & part_name) +bool ReplicatedMergeTreePartCheckThread::onPartIsLostForever(const String & part_name) { auto lost_part_info = MergeTreePartInfo::fromPartName(part_name, storage.format_version); if (lost_part_info.level != 0 || lost_part_info.mutation != 0) @@ -499,7 +516,7 @@ void ReplicatedMergeTreePartCheckThread::onPartIsLostForever(const String & part for (const String & source_part_name : source_parts) enqueuePart(source_part_name); - return; + return true; } } @@ -512,13 +529,11 @@ void ReplicatedMergeTreePartCheckThread::onPartIsLostForever(const String & part */ LOG_ERROR(log, "Part {} is lost forever.", part_name); ProfileEvents::increment(ProfileEvents::ReplicatedDataLoss); + return true; } - else - { - LOG_WARNING(log, "Cannot create empty part {} instead of lost. Will retry later", part_name); - constexpr time_t retry_after_seconds = 30; - enqueuePart(part_name, retry_after_seconds); - } + + LOG_WARNING(log, "Cannot create empty part {} instead of lost. Will retry later", part_name); + return false; } @@ -533,42 +548,29 @@ void ReplicatedMergeTreePartCheckThread::run() /// Take part from the queue for verification. PartsToCheckQueue::iterator selected = parts_queue.end(); /// end from std::list is not get invalidated - time_t min_check_time = std::numeric_limits::max(); { std::lock_guard lock(parts_mutex); - if (parts_queue.empty()) + if (parts_queue.empty() && !parts_set.empty()) { - if (!parts_set.empty()) - { - parts_set.clear(); - throw Exception(ErrorCodes::LOGICAL_ERROR, "Non-empty parts_set with empty parts_queue. This is a bug."); - } + parts_set.clear(); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Non-empty parts_set with empty parts_queue. This is a bug."); } - else - { - for (auto it = parts_queue.begin(); it != parts_queue.end(); ++it) - { - if (it->second <= current_time) - { - selected = it; - break; - } - if (it->second < min_check_time) - { - min_check_time = it->second; - selected = it; - } - } - } + selected = std::find_if(parts_queue.begin(), parts_queue.end(), [current_time](const auto & elem) + { + return elem.second <= current_time; + }); + if (selected == parts_queue.end()) + return; + + /// Move selected part to the end of the queue + parts_queue.splice(parts_queue.end(), parts_queue, selected); } - if (selected == parts_queue.end()) - return; - - checkPartAndFix(selected->first); + std::optional recheck_after; + checkPartAndFix(selected->first, &recheck_after); if (need_stop) return; @@ -581,6 +583,11 @@ void ReplicatedMergeTreePartCheckThread::run() { throw Exception(ErrorCodes::LOGICAL_ERROR, "Someone erased checking part from parts_queue. This is a bug."); } + else if (recheck_after.has_value()) + { + LOG_TRACE(log, "Will recheck part {} after after {}s", selected->first, *recheck_after); + selected->second = time(nullptr) + *recheck_after; + } else { parts_set.erase(selected->first); @@ -596,7 +603,7 @@ void ReplicatedMergeTreePartCheckThread::run() { tryLogCurrentException(log, __PRETTY_FUNCTION__); - if (e.code == Coordination::Error::ZSESSIONEXPIRED) + if (Coordination::isHardwareError(e.code)) return; task->scheduleAfter(PART_CHECK_ERROR_SLEEP_MS); diff --git a/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.h b/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.h index 0a8fbc75c05..fc76cbad4ed 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.h @@ -65,7 +65,7 @@ public: size_t size() const; /// Check part by name - CheckResult checkPartAndFix(const String & part_name); + CheckResult checkPartAndFix(const String & part_name, std::optional * recheck_after = nullptr); ReplicatedCheckResult checkPartImpl(const String & part_name); @@ -77,7 +77,7 @@ public: private: void run(); - void onPartIsLostForever(const String & part_name); + bool onPartIsLostForever(const String & part_name); std::pair findLocalPart(const String & part_name); diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index ff319e47946..e8176ac1d5f 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -3366,6 +3366,10 @@ bool StorageReplicatedMergeTree::canExecuteFetch(const ReplicatedMergeTreeLogEnt { disable_reason = fmt::format("Not executing fetch of part {} because we still have broken part with that name. " "Waiting for the broken part to be removed first.", entry.new_part_name); + + constexpr time_t min_interval_to_wakeup_cleanup_s = 30; + if (entry.last_postpone_time + min_interval_to_wakeup_cleanup_s < time(nullptr)) + const_cast(this)->cleanup_thread.wakeup(); return false; } } @@ -3753,11 +3757,13 @@ void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_n DataPartPtr broken_part; auto outdate_broken_part = [this, &broken_part]() { - if (broken_part) + if (!broken_part) return; DataPartsLock lock = lockParts(); if (broken_part->getState() == DataPartState::Active) removePartsFromWorkingSet(NO_TRANSACTION_RAW, {broken_part}, true, &lock); + broken_part.reset(); + cleanup_thread.wakeup(); }; /// We don't know exactly what happened to broken part @@ -3767,6 +3773,7 @@ void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_n auto partition_range = getDataPartsVectorInPartitionForInternalUsage({MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated}, broken_part_info.partition_id); + Strings detached_parts; for (const auto & part : partition_range) { if (!broken_part_info.contains(part->info)) @@ -3784,7 +3791,9 @@ void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_n { part->makeCloneInDetached("covered-by-broken", getInMemoryMetadataPtr()); } + detached_parts.push_back(part->name); } + LOG_WARNING(log, "Detached {} parts covered by broken part {}: {}", detached_parts.size(), part_name, fmt::join(detached_parts, ", ")); ThreadFuzzer::maybeInjectSleep(); ThreadFuzzer::maybeInjectMemoryLimitException(); @@ -3873,10 +3882,14 @@ void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_n zkutil::KeeperMultiException::check(rc, ops, results); + String path_created = dynamic_cast(*results.back()).path_created; + log_entry->znode_name = path_created.substr(path_created.find_last_of('/') + 1); + LOG_DEBUG(log, "Created entry {} to fetch missing part {}", log_entry->znode_name, part_name); + queue.insert(zookeeper, log_entry); + /// Make the part outdated after creating the log entry. /// Otherwise, if we failed to create the entry, cleanup thread could remove the part from ZooKeeper (leading to diverged replicas) outdate_broken_part(); - queue_updating_task->schedule(); return; } } diff --git a/tests/integration/test_projection_report_broken_part/__init__.py b/tests/integration/test_projection_report_broken_part/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/tests/integration/test_projection_report_broken_part/configs/testkeeper.xml b/tests/integration/test_projection_report_broken_part/configs/testkeeper.xml deleted file mode 100644 index 617371b13fa..00000000000 --- a/tests/integration/test_projection_report_broken_part/configs/testkeeper.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - - testkeeper - - diff --git a/tests/integration/test_projection_report_broken_part/test.py b/tests/integration/test_projection_report_broken_part/test.py deleted file mode 100644 index f376adf4f1a..00000000000 --- a/tests/integration/test_projection_report_broken_part/test.py +++ /dev/null @@ -1,65 +0,0 @@ -# pylint: disable=unused-argument -# pylint: disable=redefined-outer-name -# pylint: disable=line-too-long - -import pytest -import time - -from helpers.client import QueryRuntimeException -from helpers.cluster import ClickHouseCluster - -cluster = ClickHouseCluster(__file__) -node = cluster.add_instance( - "node", - main_configs=[ - "configs/testkeeper.xml", - ], -) - - -@pytest.fixture(scope="module", autouse=True) -def start_cluster(): - try: - cluster.start() - yield cluster - finally: - cluster.shutdown() - - -def test_projection_broken_part(): - node.query( - """ - create table test_projection_broken_parts_1 (a int, b int, projection ab (select a, sum(b) group by a)) - engine = ReplicatedMergeTree('/clickhouse-tables/test_projection_broken_parts', 'r1') - order by a settings index_granularity = 1; - - create table test_projection_broken_parts_2 (a int, b int, projection ab (select a, sum(b) group by a)) - engine ReplicatedMergeTree('/clickhouse-tables/test_projection_broken_parts', 'r2') - order by a settings index_granularity = 1; - - insert into test_projection_broken_parts_1 values (1, 1), (1, 2), (1, 3); - - system sync replica test_projection_broken_parts_2; - """ - ) - - # break projection part - node.exec_in_container( - [ - "bash", - "-c", - "rm /var/lib/clickhouse/data/default/test_projection_broken_parts_1/all_0_0_0/ab.proj/data.bin", - ] - ) - - expected_error = "No such file or directory" - assert expected_error in node.query_and_get_error( - "select sum(b) from test_projection_broken_parts_1 group by a" - ) - - time.sleep(2) - - assert ( - int(node.query("select sum(b) from test_projection_broken_parts_1 group by a")) - == 6 - ) diff --git a/tests/queries/0_stateless/02254_projection_broken_part.reference b/tests/queries/0_stateless/02254_projection_broken_part.reference new file mode 100644 index 00000000000..68538fd31ea --- /dev/null +++ b/tests/queries/0_stateless/02254_projection_broken_part.reference @@ -0,0 +1,6 @@ +1 1 1 all_0_0_0 +1 1 2 all_0_0_0 +1 1 3 all_0_0_0 +2 6 +0 +5 6 diff --git a/tests/queries/0_stateless/02254_projection_broken_part.sh b/tests/queries/0_stateless/02254_projection_broken_part.sh new file mode 100755 index 00000000000..d276c67f8de --- /dev/null +++ b/tests/queries/0_stateless/02254_projection_broken_part.sh @@ -0,0 +1,44 @@ +#!/usr/bin/env bash +# Tags: long, zookeeper + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +$CLICKHOUSE_CLIENT -q "drop table if exists projection_broken_parts_1 sync;" +$CLICKHOUSE_CLIENT -q "drop table if exists projection_broken_parts_1 sync;" + +$CLICKHOUSE_CLIENT -q "create table projection_broken_parts_1 (a int, b int, projection ab (select a, sum(b) group by a)) + engine = ReplicatedMergeTree('/test/02369/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/rmt', 'r1') + order by a settings index_granularity = 1;" + +$CLICKHOUSE_CLIENT -q "create table projection_broken_parts_2 (a int, b int, projection ab (select a, sum(b) group by a)) + engine = ReplicatedMergeTree('/test/02369/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/rmt', 'r2') + order by a settings index_granularity = 1;" + +$CLICKHOUSE_CLIENT --insert_keeper_fault_injection_probability=0 -q "insert into projection_broken_parts_1 values (1, 1), (1, 2), (1, 3);" +$CLICKHOUSE_CLIENT -q "system sync replica projection_broken_parts_2;" +$CLICKHOUSE_CLIENT -q "select 1, *, _part from projection_broken_parts_2 order by b;" +$CLICKHOUSE_CLIENT -q "select 2, sum(b) from projection_broken_parts_2 group by a;" + +path=$($CLICKHOUSE_CLIENT -q "select path from system.parts where database='$CLICKHOUSE_DATABASE' and table='projection_broken_parts_1' and name='all_0_0_0'") +# ensure that path is absolute before removing +$CLICKHOUSE_CLIENT -q "select throwIf(substring('$path', 1, 1) != '/', 'Path is relative: $path')" || exit +rm -f "$path/ab.proj/data.bin" + +$CLICKHOUSE_CLIENT -q "select 3, sum(b) from projection_broken_parts_1 group by a;" 2>/dev/null + +num_tries=0 +while ! $CLICKHOUSE_CLIENT -q "select 4, sum(b) from projection_broken_parts_1 group by a format Null;" 2>/dev/null; do + sleep 1; + num_tries=$((num_tries+1)) + if [ $num_tries -eq 60 ]; then + break + fi +done + +$CLICKHOUSE_CLIENT -q "system sync replica projection_broken_parts_1;" +$CLICKHOUSE_CLIENT -q "select 5, sum(b) from projection_broken_parts_1 group by a;" + +$CLICKHOUSE_CLIENT -q "drop table if exists projection_broken_parts_1 sync;" +$CLICKHOUSE_CLIENT -q "drop table if exists projection_broken_parts_1 sync;"