mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 00:30:49 +00:00
Merge pull request #65970 from ClickHouse/keeper-fix-race-snapshot
Fix data race for Keeper snapshot queue
This commit is contained in:
commit
4066d82ca3
@ -1,8 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <deque>
|
||||
#include <type_traits>
|
||||
#include <atomic>
|
||||
#include <condition_variable>
|
||||
#include <mutex>
|
||||
#include <optional>
|
||||
@ -200,22 +198,18 @@ public:
|
||||
*/
|
||||
bool finish()
|
||||
{
|
||||
bool was_finished_before = false;
|
||||
|
||||
{
|
||||
std::lock_guard lock(queue_mutex);
|
||||
|
||||
if (is_finished)
|
||||
return true;
|
||||
|
||||
was_finished_before = is_finished;
|
||||
is_finished = true;
|
||||
}
|
||||
|
||||
pop_condition.notify_all();
|
||||
push_condition.notify_all();
|
||||
|
||||
return was_finished_before;
|
||||
return false;
|
||||
}
|
||||
|
||||
/// Returns if queue is finished
|
||||
|
@ -334,19 +334,13 @@ void KeeperDispatcher::snapshotThread()
|
||||
{
|
||||
setThreadName("KeeperSnpT");
|
||||
const auto & shutdown_called = keeper_context->isShutdownCalled();
|
||||
while (!shutdown_called)
|
||||
CreateSnapshotTask task;
|
||||
while (snapshots_queue.pop(task))
|
||||
{
|
||||
CreateSnapshotTask task;
|
||||
if (!snapshots_queue.pop(task))
|
||||
break;
|
||||
|
||||
try
|
||||
{
|
||||
auto snapshot_file_info = task.create_snapshot(std::move(task.snapshot), /*execute_only_cleanup=*/shutdown_called);
|
||||
|
||||
if (shutdown_called)
|
||||
break;
|
||||
|
||||
if (!snapshot_file_info)
|
||||
continue;
|
||||
|
||||
|
@ -597,7 +597,7 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res
|
||||
snapshot_task.create_snapshot = [this, when_done](KeeperStorageSnapshotPtr && snapshot, bool execute_only_cleanup)
|
||||
{
|
||||
nuraft::ptr<std::exception> exception(nullptr);
|
||||
bool ret = true;
|
||||
bool ret = false;
|
||||
if (!execute_only_cleanup)
|
||||
{
|
||||
try
|
||||
@ -627,7 +627,8 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res
|
||||
else
|
||||
{
|
||||
auto snapshot_buf = snapshot_manager.serializeSnapshotToBuffer(*snapshot);
|
||||
auto snapshot_info = snapshot_manager.serializeSnapshotBufferToDisk(*snapshot_buf, snapshot->snapshot_meta->get_last_log_idx());
|
||||
auto snapshot_info = snapshot_manager.serializeSnapshotBufferToDisk(
|
||||
*snapshot_buf, snapshot->snapshot_meta->get_last_log_idx());
|
||||
latest_snapshot_info = std::move(snapshot_info);
|
||||
latest_snapshot_buf = std::move(snapshot_buf);
|
||||
}
|
||||
@ -640,13 +641,14 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res
|
||||
latest_snapshot_info->path);
|
||||
}
|
||||
}
|
||||
|
||||
ret = true;
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::KeeperSnapshotCreationsFailed);
|
||||
LOG_TRACE(log, "Exception happened during snapshot");
|
||||
tryLogCurrentException(log);
|
||||
ret = false;
|
||||
}
|
||||
}
|
||||
{
|
||||
|
Loading…
Reference in New Issue
Block a user