From 88f77c686d6ab3f8a4fb6a7c0f3b922019c69053 Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Thu, 6 Apr 2023 10:41:44 +0000 Subject: [PATCH 1/3] Correctly handle concurrent snapshots --- src/Coordination/KeeperStateMachine.cpp | 42 ++++++++++++++++++------- 1 file changed, 30 insertions(+), 12 deletions(-) diff --git a/src/Coordination/KeeperStateMachine.cpp b/src/Coordination/KeeperStateMachine.cpp index 632aaec6b54..efd1c10e568 100644 --- a/src/Coordination/KeeperStateMachine.cpp +++ b/src/Coordination/KeeperStateMachine.cpp @@ -288,15 +288,20 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s) nuraft::ptr latest_snapshot_ptr; { /// save snapshot into memory std::lock_guard lock(snapshots_lock); - if (s.get_last_log_idx() != latest_snapshot_meta->get_last_log_idx()) + if (s.get_last_log_idx() > latest_snapshot_meta->get_last_log_idx()) { ProfileEvents::increment(ProfileEvents::KeeperSnapshotApplysFailed); throw Exception( ErrorCodes::LOGICAL_ERROR, - "Required to apply snapshot with last log index {}, but our last log index is {}", + "Required to apply snapshot with last log index {}, but last created snapshot was for smaller log index {}", s.get_last_log_idx(), latest_snapshot_meta->get_last_log_idx()); } + else if (s.get_last_log_idx() < latest_snapshot_meta->get_last_log_idx()) + { + LOG_INFO(log, "A snapshot with a larger last log index ({}) was created, skipping applying this snapshot", latest_snapshot_meta->get_last_log_idx()); + } + latest_snapshot_ptr = latest_snapshot_buf; } @@ -371,19 +376,32 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res { { /// Read storage data without locks and create snapshot std::lock_guard lock(snapshots_lock); - auto [path, error_code] = snapshot_manager.serializeSnapshotToDisk(*snapshot); - if (error_code) + + if (latest_snapshot_meta && snapshot->snapshot_meta->get_last_log_idx() <= latest_snapshot_meta->get_last_log_idx()) { - throw Exception( - ErrorCodes::SYSTEM_ERROR, 
- "Snapshot {} was created failed, error: {}", + LOG_INFO( + log, + "Will not create a snapshot with last log idx {} because a snapshot with bigger last log idx ({}) is already " + "created", snapshot->snapshot_meta->get_last_log_idx(), - error_code.message()); + latest_snapshot_meta->get_last_log_idx()); + } + else + { + auto [path, error_code] = snapshot_manager.serializeSnapshotToDisk(*snapshot); + if (error_code) + { + throw Exception( + ErrorCodes::SYSTEM_ERROR, + "Snapshot {} was created failed, error: {}", + snapshot->snapshot_meta->get_last_log_idx(), + error_code.message()); + } + latest_snapshot_path = path; + latest_snapshot_meta = snapshot->snapshot_meta; + ProfileEvents::increment(ProfileEvents::KeeperSnapshotCreations); + LOG_DEBUG(log, "Created persistent snapshot {} with path {}", latest_snapshot_meta->get_last_log_idx(), path); } - latest_snapshot_path = path; - latest_snapshot_meta = snapshot->snapshot_meta; - ProfileEvents::increment(ProfileEvents::KeeperSnapshotCreations); - LOG_DEBUG(log, "Created persistent snapshot {} with path {}", latest_snapshot_meta->get_last_log_idx(), path); } { From 6d6f3bc58b62812139667b77571f62378fd78b71 Mon Sep 17 00:00:00 2001 From: Manas Alekar Date: Wed, 3 May 2023 18:11:17 -0700 Subject: [PATCH 2/3] Static cast std::atomic to uint64_t to serialize. There are no viable constructors for the atomic in rapidJSON. 
--- utils/keeper-bench/Stats.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/keeper-bench/Stats.cpp b/utils/keeper-bench/Stats.cpp index f5e5f84ba14..3e7e92db713 100644 --- a/utils/keeper-bench/Stats.cpp +++ b/utils/keeper-bench/Stats.cpp @@ -132,7 +132,7 @@ void Stats::writeJSON(DB::WriteBuffer & out, size_t concurrency, int64_t start_t { Value specific_results(kObjectType); - specific_results.AddMember("total_requests", Value(collector.requests), allocator); + specific_results.AddMember("total_requests", Value(static_cast<uint64_t>(collector.requests)), allocator); auto [rps, bps] = collector.getThroughput(concurrency); specific_results.AddMember("requests_per_second", Value(rps), allocator); From c93a1310197ebba10484c8440453af223412793d Mon Sep 17 00:00:00 2001 From: Konstantin Bogdanov Date: Thu, 4 May 2023 18:21:08 +0300 Subject: [PATCH 3/3] Add an integration test for `shutdown_wait_unfinished_queries` (#49469) --- .../__init__.py | 0 .../configs/config_kill.xml | 4 ++ .../configs/config_wait.xml | 4 ++ .../test.py | 56 +++++++++++++++++++ 4 files changed, 64 insertions(+) create mode 100644 tests/integration/test_shutdown_wait_unfinished_queries/__init__.py create mode 100644 tests/integration/test_shutdown_wait_unfinished_queries/configs/config_kill.xml create mode 100644 tests/integration/test_shutdown_wait_unfinished_queries/configs/config_wait.xml create mode 100644 tests/integration/test_shutdown_wait_unfinished_queries/test.py diff --git a/tests/integration/test_shutdown_wait_unfinished_queries/__init__.py b/tests/integration/test_shutdown_wait_unfinished_queries/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_shutdown_wait_unfinished_queries/configs/config_kill.xml b/tests/integration/test_shutdown_wait_unfinished_queries/configs/config_kill.xml new file mode 100644 index 00000000000..9708c93b5f2 --- /dev/null +++ 
b/tests/integration/test_shutdown_wait_unfinished_queries/configs/config_kill.xml @@ -0,0 +1,4 @@ + + 30 + 0 + diff --git a/tests/integration/test_shutdown_wait_unfinished_queries/configs/config_wait.xml b/tests/integration/test_shutdown_wait_unfinished_queries/configs/config_wait.xml new file mode 100644 index 00000000000..f8e21c2684e --- /dev/null +++ b/tests/integration/test_shutdown_wait_unfinished_queries/configs/config_wait.xml @@ -0,0 +1,4 @@ + + 30 + 1 + diff --git a/tests/integration/test_shutdown_wait_unfinished_queries/test.py b/tests/integration/test_shutdown_wait_unfinished_queries/test.py new file mode 100644 index 00000000000..ae0710149de --- /dev/null +++ b/tests/integration/test_shutdown_wait_unfinished_queries/test.py @@ -0,0 +1,56 @@ +import pytest + +import threading +import time +from helpers.cluster import ClickHouseCluster + +cluster = ClickHouseCluster(__file__) +node_wait_queries = cluster.add_instance( + "node_wait_queries", main_configs=["configs/config_wait.xml"], stay_alive=True +) +node_kill_queries = cluster.add_instance( + "node_kill_queries", main_configs=["configs/config_kill.xml"], stay_alive=True +) + +global result + + +@pytest.fixture(scope="module") +def start_cluster(): + try: + cluster.start() + yield cluster + finally: + cluster.shutdown() + + +def do_long_query(node): + global result + + result = node.query_and_get_answer_with_error( + "SELECT sleepEachRow(1) FROM system.numbers LIMIT 10", + settings={"send_logs_level": "trace"}, + ) + + +def test_shutdown_wait_unfinished_queries(start_cluster): + global result + + long_query = threading.Thread(target=do_long_query, args=(node_wait_queries,)) + long_query.start() + + time.sleep(1) + node_wait_queries.stop_clickhouse(kill=False) + + long_query.join() + + assert result[0].count("0") == 10 + + long_query = threading.Thread(target=do_long_query, args=(node_kill_queries,)) + long_query.start() + + time.sleep(1) + node_kill_queries.stop_clickhouse(kill=False) + + 
long_query.join() + assert "QUERY_WAS_CANCELLED" in result[1]