Merge branch 'master' into it-grammar-fixes

This commit is contained in:
Ivan Takarlikov 2023-05-04 15:57:09 -03:00 committed by GitHub
commit 339fb0c2c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 95 additions and 13 deletions

View File

@ -289,15 +289,20 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s)
nuraft::ptr<nuraft::buffer> latest_snapshot_ptr;
{ /// save snapshot into memory
std::lock_guard lock(snapshots_lock);
if (s.get_last_log_idx() != latest_snapshot_meta->get_last_log_idx())
if (s.get_last_log_idx() > latest_snapshot_meta->get_last_log_idx())
{
ProfileEvents::increment(ProfileEvents::KeeperSnapshotApplysFailed);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Required to apply snapshot with last log index {}, but our last log index is {}",
"Required to apply snapshot with last log index {}, but last created snapshot was for smaller log index {}",
s.get_last_log_idx(),
latest_snapshot_meta->get_last_log_idx());
}
else if (s.get_last_log_idx() < latest_snapshot_meta->get_last_log_idx())
{
LOG_INFO(log, "A snapshot with a larger last log index ({}) was created, skipping applying this snapshot", latest_snapshot_meta->get_last_log_idx());
}
latest_snapshot_ptr = latest_snapshot_buf;
}
@ -372,6 +377,18 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res
{
{ /// Read storage data without locks and create snapshot
std::lock_guard lock(snapshots_lock);
if (latest_snapshot_meta && snapshot->snapshot_meta->get_last_log_idx() <= latest_snapshot_meta->get_last_log_idx())
{
LOG_INFO(
log,
"Will not create a snapshot with last log idx {} because a snapshot with bigger last log idx ({}) is already "
"created",
snapshot->snapshot_meta->get_last_log_idx(),
latest_snapshot_meta->get_last_log_idx());
}
else
{
auto [path, error_code] = snapshot_manager.serializeSnapshotToDisk(*snapshot);
if (error_code)
{
@ -386,6 +403,7 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res
ProfileEvents::increment(ProfileEvents::KeeperSnapshotCreations);
LOG_DEBUG(log, "Created persistent snapshot {} with path {}", latest_snapshot_meta->get_last_log_idx(), path);
}
}
{
/// Destroy snapshot with lock

View File

@ -0,0 +1,4 @@
<clickhouse>
<shutdown_wait_unfinished>30</shutdown_wait_unfinished>
<shutdown_wait_unfinished_queries>0</shutdown_wait_unfinished_queries>
</clickhouse>

View File

@ -0,0 +1,4 @@
<clickhouse>
<shutdown_wait_unfinished>30</shutdown_wait_unfinished>
<shutdown_wait_unfinished_queries>1</shutdown_wait_unfinished_queries>
</clickhouse>

View File

@ -0,0 +1,56 @@
import pytest
import threading
import time
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node_wait_queries = cluster.add_instance(
"node_wait_queries", main_configs=["configs/config_wait.xml"], stay_alive=True
)
node_kill_queries = cluster.add_instance(
"node_kill_queries", main_configs=["configs/config_kill.xml"], stay_alive=True
)
global result
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def do_long_query(node):
global result
result = node.query_and_get_answer_with_error(
"SELECT sleepEachRow(1) FROM system.numbers LIMIT 10",
settings={"send_logs_level": "trace"},
)
def test_shutdown_wait_unfinished_queries(start_cluster):
global result
long_query = threading.Thread(target=do_long_query, args=(node_wait_queries,))
long_query.start()
time.sleep(1)
node_wait_queries.stop_clickhouse(kill=False)
long_query.join()
assert result[0].count("0") == 10
long_query = threading.Thread(target=do_long_query, args=(node_kill_queries,))
long_query.start()
time.sleep(1)
node_kill_queries.stop_clickhouse(kill=False)
long_query.join()
assert "QUERY_WAS_CANCELLED" in result[1]

View File

@ -132,7 +132,7 @@ void Stats::writeJSON(DB::WriteBuffer & out, size_t concurrency, int64_t start_t
{
Value specific_results(kObjectType);
specific_results.AddMember("total_requests", Value(collector.requests), allocator);
specific_results.AddMember("total_requests", Value(static_cast<uint64_t>(collector.requests)), allocator);
auto [rps, bps] = collector.getThroughput(concurrency);
specific_results.AddMember("requests_per_second", Value(rps), allocator);