Commit abf36065b7 (parent 607a8a1c46)
Alexander Tokmakov, 2023-07-06 17:24:06 +02:00
8 changed files with 115 additions and 116 deletions


@@ -63,6 +63,7 @@ void ReplicatedMergeTreePartCheckThread::enqueuePart(const String & name, time_t
if (parts_set.contains(name))
return;
LOG_TRACE(log, "Enqueueing {} for check after after {}s", name, delay_to_check_seconds);
parts_queue.emplace_back(name, time(nullptr) + delay_to_check_seconds);
parts_set.insert(name);
task->schedule();
@@ -423,7 +424,7 @@ ReplicatedCheckResult ReplicatedMergeTreePartCheckThread::checkPartImpl(const St
}
CheckResult ReplicatedMergeTreePartCheckThread::checkPartAndFix(const String & part_name)
CheckResult ReplicatedMergeTreePartCheckThread::checkPartAndFix(const String & part_name, std::optional<time_t> * recheck_after)
{
LOG_INFO(log, "Checking part {}", part_name);
ProfileEvents::increment(ProfileEvents::ReplicatedPartChecks);
@@ -438,7 +439,11 @@ CheckResult ReplicatedMergeTreePartCheckThread::checkPartAndFix(const String & p
break;
case ReplicatedCheckResult::RecheckLater:
enqueuePart(part_name, result.recheck_after);
/// NOTE We cannot enqueue it from the check thread itself
if (recheck_after)
*recheck_after = result.recheck_after;
else
enqueuePart(part_name, result.recheck_after);
break;
case ReplicatedCheckResult::DetachUnexpected:
@@ -471,10 +476,22 @@ CheckResult ReplicatedMergeTreePartCheckThread::checkPartAndFix(const String & p
/// Part is not in ZooKeeper and not on disk (so there's nothing to detach or remove from ZooKeeper).
/// Probably we cannot execute some entry from the replication queue (so don't need to enqueue another one).
/// Either all replicas having the part are not active, or the part is lost forever.
/// Either all replicas having the part are not active...
bool found_something = searchForMissingPartOnOtherReplicas(part_name);
if (!found_something)
onPartIsLostForever(part_name);
if (found_something)
break;
/// ... or the part is lost forever
bool handled_lost_part = onPartIsLostForever(part_name);
if (handled_lost_part)
break;
/// We failed to create an empty part, need to retry
constexpr time_t retry_after_seconds = 30;
if (recheck_after)
*recheck_after = retry_after_seconds;
else
enqueuePart(part_name, retry_after_seconds);
break;
}
@@ -483,7 +500,7 @@ CheckResult ReplicatedMergeTreePartCheckThread::checkPartAndFix(const String & p
return result.status;
}
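The signature change above is the core of this refactoring: checkPartAndFix() used to call enqueuePart() even when invoked from the check thread's own run() loop, mutating parts_queue while run() was iterating it. The new optional out-parameter lets the thread-internal caller receive the delay and reschedule the already-selected queue entry itself. A minimal standalone sketch of the pattern, with simplified names standing in for the real class members:

#include <ctime>
#include <optional>
#include <string>

/// Stand-in for the real member: would push {name, now + delay} into parts_queue.
void enqueuePart(const std::string & name, time_t delay) { (void)name; (void)delay; }

void checkPart(const std::string & part_name, std::optional<time_t> * recheck_after)
{
    constexpr time_t retry_after_seconds = 30;
    if (recheck_after)
        *recheck_after = retry_after_seconds;           /// internal caller: run() reschedules the entry
    else
        enqueuePart(part_name, retry_after_seconds);    /// external callers keep the old behaviour
}

int main()
{
    std::optional<time_t> recheck_after;
    checkPart("all_0_0_0", &recheck_after);  /// as called from run()
    checkPart("all_0_0_0", nullptr);         /// as called from outside the thread
}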
void ReplicatedMergeTreePartCheckThread::onPartIsLostForever(const String & part_name)
bool ReplicatedMergeTreePartCheckThread::onPartIsLostForever(const String & part_name)
{
auto lost_part_info = MergeTreePartInfo::fromPartName(part_name, storage.format_version);
if (lost_part_info.level != 0 || lost_part_info.mutation != 0)
@@ -499,7 +516,7 @@ void ReplicatedMergeTreePartCheckThread::onPartIsLostForever(const String & part
for (const String & source_part_name : source_parts)
enqueuePart(source_part_name);
return;
return true;
}
}
@@ -512,13 +529,11 @@ void ReplicatedMergeTreePartCheckThread::onPartIsLostForever(const String & part
*/
LOG_ERROR(log, "Part {} is lost forever.", part_name);
ProfileEvents::increment(ProfileEvents::ReplicatedDataLoss);
return true;
}
else
{
LOG_WARNING(log, "Cannot create empty part {} instead of lost. Will retry later", part_name);
constexpr time_t retry_after_seconds = 30;
enqueuePart(part_name, retry_after_seconds);
}
LOG_WARNING(log, "Cannot create empty part {} instead of lost. Will retry later", part_name);
return false;
}
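Returning bool from onPartIsLostForever() moves the retry policy out of the handler: previously it enqueued its own retry on failure, now the caller decides whether to retry via recheck_after or via enqueuePart(). A hypothetical miniature of the same refactoring, runnable on its own:

#include <iostream>

/// The handler only reports whether it fully dealt with the lost part;
/// it no longer schedules its own retry.
bool handleLostPart(bool created_empty_part)
{
    if (!created_empty_part)
        return false;   /// "Cannot create empty part ... Will retry later"
    return true;        /// lost part replaced (or source parts re-enqueued)
}

int main()
{
    if (!handleLostPart(false))
        std::cout << "will recheck in 30s\n";   /// caller-owned retry
}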
@@ -533,42 +548,29 @@ void ReplicatedMergeTreePartCheckThread::run()
/// Take part from the queue for verification.
PartsToCheckQueue::iterator selected = parts_queue.end(); /// end() of std::list does not get invalidated
time_t min_check_time = std::numeric_limits<time_t>::max();
{
std::lock_guard lock(parts_mutex);
if (parts_queue.empty())
if (parts_queue.empty() && !parts_set.empty())
{
if (!parts_set.empty())
{
parts_set.clear();
throw Exception(ErrorCodes::LOGICAL_ERROR, "Non-empty parts_set with empty parts_queue. This is a bug.");
}
parts_set.clear();
throw Exception(ErrorCodes::LOGICAL_ERROR, "Non-empty parts_set with empty parts_queue. This is a bug.");
}
else
{
for (auto it = parts_queue.begin(); it != parts_queue.end(); ++it)
{
if (it->second <= current_time)
{
selected = it;
break;
}
if (it->second < min_check_time)
{
min_check_time = it->second;
selected = it;
}
}
}
selected = std::find_if(parts_queue.begin(), parts_queue.end(), [current_time](const auto & elem)
{
return elem.second <= current_time;
});
if (selected == parts_queue.end())
return;
/// Move selected part to the end of the queue
parts_queue.splice(parts_queue.end(), parts_queue, selected);
}
if (selected == parts_queue.end())
return;
checkPartAndFix(selected->first);
std::optional<time_t> recheck_after;
checkPartAndFix(selected->first, &recheck_after);
if (need_stop)
return;
@@ -581,6 +583,11 @@ void ReplicatedMergeTreePartCheckThread::run()
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Someone erased checking part from parts_queue. This is a bug.");
}
else if (recheck_after.has_value())
{
LOG_TRACE(log, "Will recheck part {} after after {}s", selected->first, *recheck_after);
selected->second = time(nullptr) + *recheck_after;
}
else
{
parts_set.erase(selected->first);
@@ -596,7 +603,7 @@ void ReplicatedMergeTreePartCheckThread::run()
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
if (e.code == Coordination::Error::ZSESSIONEXPIRED)
if (Coordination::isHardwareError(e.code))
return;
task->scheduleAfter(PART_CHECK_ERROR_SLEEP_MS);
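Two details of the rewritten run() loop are easy to miss. First, std::list::splice() moves the selected node to the back of the queue in O(1) without invalidating iterators or references to it, which is why selected can still be dereferenced after the lock is released and updated with the new recheck time. Second, replacing the ZSESSIONEXPIRED comparison with Coordination::isHardwareError() widens the early return to cover connection loss and timeouts, not only expired sessions. The splice guarantee as a self-contained demonstration:

#include <cassert>
#include <ctime>
#include <iterator>
#include <list>
#include <string>
#include <utility>

int main()
{
    std::list<std::pair<std::string, time_t>> parts_queue{
        {"all_0_0_0", 1}, {"all_1_1_0", 2}, {"all_2_2_0", 3}};

    auto selected = std::next(parts_queue.begin());   /// points at all_1_1_0

    /// Relink the selected node to the end of the same list; no element is
    /// copied or destroyed, so `selected` remains a valid iterator to it.
    parts_queue.splice(parts_queue.end(), parts_queue, selected);

    assert(selected->first == "all_1_1_0");
    assert(parts_queue.back().first == "all_1_1_0");
}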


@@ -65,7 +65,7 @@ public:
size_t size() const;
/// Check part by name
CheckResult checkPartAndFix(const String & part_name);
CheckResult checkPartAndFix(const String & part_name, std::optional<time_t> * recheck_after = nullptr);
ReplicatedCheckResult checkPartImpl(const String & part_name);
@@ -77,7 +77,7 @@ public:
private:
void run();
void onPartIsLostForever(const String & part_name);
bool onPartIsLostForever(const String & part_name);
std::pair<bool, MergeTreeDataPartPtr> findLocalPart(const String & part_name);


@@ -3366,6 +3366,10 @@ bool StorageReplicatedMergeTree::canExecuteFetch(const ReplicatedMergeTreeLogEnt
{
disable_reason = fmt::format("Not executing fetch of part {} because we still have broken part with that name. "
"Waiting for the broken part to be removed first.", entry.new_part_name);
constexpr time_t min_interval_to_wakeup_cleanup_s = 30;
if (entry.last_postpone_time + min_interval_to_wakeup_cleanup_s < time(nullptr))
const_cast<StorageReplicatedMergeTree *>(this)->cleanup_thread.wakeup();
return false;
}
}
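The wakeup added to canExecuteFetch() is rate-limited: the cleanup thread is only woken once the entry has been postponed for at least 30 seconds, so a fetch retried in a tight loop does not wake it on every attempt (the const_cast is needed because canExecuteFetch() is const while wakeup() is not). The guard condition in isolation, as a sketch with a hypothetical Entry stand-in:

#include <cassert>
#include <ctime>

struct Entry { time_t last_postpone_time = 0; };  /// hypothetical stand-in

/// Wake the cleanup worker only after the entry has been stuck long enough.
bool shouldWakeCleanup(const Entry & entry, time_t now)
{
    constexpr time_t min_interval_to_wakeup_cleanup_s = 30;
    return entry.last_postpone_time + min_interval_to_wakeup_cleanup_s < now;
}

int main()
{
    Entry entry{100};
    assert(!shouldWakeCleanup(entry, 120));  /// postponed 20s ago: stay quiet
    assert(shouldWakeCleanup(entry, 140));   /// postponed 40s ago: wake it
}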
@@ -3753,11 +3757,13 @@ void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_n
DataPartPtr broken_part;
auto outdate_broken_part = [this, &broken_part]()
{
if (broken_part)
if (!broken_part)
return;
DataPartsLock lock = lockParts();
if (broken_part->getState() == DataPartState::Active)
removePartsFromWorkingSet(NO_TRANSACTION_RAW, {broken_part}, true, &lock);
broken_part.reset();
cleanup_thread.wakeup();
};
/// We don't know exactly what happened to broken part
@@ -3767,6 +3773,7 @@ void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_n
auto partition_range = getDataPartsVectorInPartitionForInternalUsage({MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated},
broken_part_info.partition_id);
Strings detached_parts;
for (const auto & part : partition_range)
{
if (!broken_part_info.contains(part->info))
@@ -3784,7 +3791,9 @@ void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_n
{
part->makeCloneInDetached("covered-by-broken", getInMemoryMetadataPtr());
}
detached_parts.push_back(part->name);
}
LOG_WARNING(log, "Detached {} parts covered by broken part {}: {}", detached_parts.size(), part_name, fmt::join(detached_parts, ", "));
ThreadFuzzer::maybeInjectSleep();
ThreadFuzzer::maybeInjectMemoryLimitException();
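Rather than logging each detached part separately, the loop above collects the names into detached_parts and emits one summary line. The same fmt idiom in a runnable form (fmt::join is provided by <fmt/ranges.h> in current versions of the library):

#include <fmt/ranges.h>
#include <string>
#include <vector>

int main()
{
    std::vector<std::string> detached_parts{"all_0_0_0", "all_1_1_0"};
    fmt::print("Detached {} parts covered by broken part {}: {}\n",
               detached_parts.size(), "all_0_1_1", fmt::join(detached_parts, ", "));
}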
@@ -3873,10 +3882,14 @@ void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_n
zkutil::KeeperMultiException::check(rc, ops, results);
String path_created = dynamic_cast<const Coordination::CreateResponse &>(*results.back()).path_created;
log_entry->znode_name = path_created.substr(path_created.find_last_of('/') + 1);
LOG_DEBUG(log, "Created entry {} to fetch missing part {}", log_entry->znode_name, part_name);
queue.insert(zookeeper, log_entry);
/// Make the part outdated after creating the log entry.
/// Otherwise, if we failed to create the entry, cleanup thread could remove the part from ZooKeeper (leading to diverged replicas)
outdate_broken_part();
queue_updating_task->schedule();
return;
}
}
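The reordering in the last hunk is load-bearing: the log entry to fetch the missing part is created in ZooKeeper first, and only then is the local broken part made Outdated (which also wakes the cleanup thread). If the steps ran in the opposite order and entry creation failed, the cleanup thread could remove the part from ZooKeeper with no replacement fetch queued, diverging the replicas. A toy model of the invariant, with hypothetical function names:

#include <iostream>
#include <stdexcept>

/// Record the durable intent to refetch before retiring the local copy.
void createFetchLogEntry(bool zk_fails) { if (zk_fails) throw std::runtime_error("zk error"); }
void outdateBrokenPart() { std::cout << "part retired, cleanup may now drop it\n"; }

void removePartAndEnqueueFetch(bool zk_fails)
{
    createFetchLogEntry(zk_fails);  /// 1. durable intent in ZooKeeper
    outdateBrokenPart();            /// 2. only now safe to retire the part
}

int main()
{
    try { removePartAndEnqueueFetch(/*zk_fails=*/true); }
    catch (const std::exception & e)
    {
        /// Entry creation failed, but the part was never outdated, so the
        /// cleanup thread cannot remove it prematurely.
        std::cout << "kept broken part after: " << e.what() << '\n';
    }
}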


@@ -1,6 +0,0 @@
<clickhouse>
<zookeeper>
<!-- Don't need real [Zoo]Keeper for this test -->
<implementation>testkeeper</implementation>
</zookeeper>
</clickhouse>


@@ -1,65 +0,0 @@
# pylint: disable=unused-argument
# pylint: disable=redefined-outer-name
# pylint: disable=line-too-long
import pytest
import time
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node",
main_configs=[
"configs/testkeeper.xml",
],
)
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_projection_broken_part():
node.query(
"""
create table test_projection_broken_parts_1 (a int, b int, projection ab (select a, sum(b) group by a))
engine = ReplicatedMergeTree('/clickhouse-tables/test_projection_broken_parts', 'r1')
order by a settings index_granularity = 1;
create table test_projection_broken_parts_2 (a int, b int, projection ab (select a, sum(b) group by a))
engine = ReplicatedMergeTree('/clickhouse-tables/test_projection_broken_parts', 'r2')
order by a settings index_granularity = 1;
insert into test_projection_broken_parts_1 values (1, 1), (1, 2), (1, 3);
system sync replica test_projection_broken_parts_2;
"""
)
# break projection part
node.exec_in_container(
[
"bash",
"-c",
"rm /var/lib/clickhouse/data/default/test_projection_broken_parts_1/all_0_0_0/ab.proj/data.bin",
]
)
expected_error = "No such file or directory"
assert expected_error in node.query_and_get_error(
"select sum(b) from test_projection_broken_parts_1 group by a"
)
time.sleep(2)
assert (
int(node.query("select sum(b) from test_projection_broken_parts_1 group by a"))
== 6
)


@@ -0,0 +1,6 @@
1 1 1 all_0_0_0
1 1 2 all_0_0_0
1 1 3 all_0_0_0
2 6
0
5 6


@@ -0,0 +1,44 @@
#!/usr/bin/env bash
# Tags: long, zookeeper
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q "drop table if exists projection_broken_parts_1 sync;"
$CLICKHOUSE_CLIENT -q "drop table if exists projection_broken_parts_1 sync;"
$CLICKHOUSE_CLIENT -q "create table projection_broken_parts_1 (a int, b int, projection ab (select a, sum(b) group by a))
engine = ReplicatedMergeTree('/test/02369/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/rmt', 'r1')
order by a settings index_granularity = 1;"
$CLICKHOUSE_CLIENT -q "create table projection_broken_parts_2 (a int, b int, projection ab (select a, sum(b) group by a))
engine = ReplicatedMergeTree('/test/02369/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/rmt', 'r2')
order by a settings index_granularity = 1;"
$CLICKHOUSE_CLIENT --insert_keeper_fault_injection_probability=0 -q "insert into projection_broken_parts_1 values (1, 1), (1, 2), (1, 3);"
$CLICKHOUSE_CLIENT -q "system sync replica projection_broken_parts_2;"
$CLICKHOUSE_CLIENT -q "select 1, *, _part from projection_broken_parts_2 order by b;"
$CLICKHOUSE_CLIENT -q "select 2, sum(b) from projection_broken_parts_2 group by a;"
path=$($CLICKHOUSE_CLIENT -q "select path from system.parts where database='$CLICKHOUSE_DATABASE' and table='projection_broken_parts_1' and name='all_0_0_0'")
# ensure that path is absolute before removing
$CLICKHOUSE_CLIENT -q "select throwIf(substring('$path', 1, 1) != '/', 'Path is relative: $path')" || exit
rm -f "$path/ab.proj/data.bin"
$CLICKHOUSE_CLIENT -q "select 3, sum(b) from projection_broken_parts_1 group by a;" 2>/dev/null
num_tries=0
while ! $CLICKHOUSE_CLIENT -q "select 4, sum(b) from projection_broken_parts_1 group by a format Null;" 2>/dev/null; do
sleep 1;
num_tries=$((num_tries+1))
if [ $num_tries -eq 60 ]; then
break
fi
done
$CLICKHOUSE_CLIENT -q "system sync replica projection_broken_parts_1;"
$CLICKHOUSE_CLIENT -q "select 5, sum(b) from projection_broken_parts_1 group by a;"
$CLICKHOUSE_CLIENT -q "drop table if exists projection_broken_parts_1 sync;"
$CLICKHOUSE_CLIENT -q "drop table if exists projection_broken_parts_1 sync;"