mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
Fix rare case when watch response received before request response
This commit is contained in:
parent
e42b029155
commit
23325c3fa6
@ -106,31 +106,26 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
|
||||
{
|
||||
const Coordination::ZooKeeperSessionIDRequest & session_id_request = dynamic_cast<const Coordination::ZooKeeperSessionIDRequest &>(*request_for_session.request);
|
||||
int64_t session_id;
|
||||
{
|
||||
std::lock_guard lock(storage_lock);
|
||||
session_id = storage->getSessionID(session_id_request.session_timeout_ms);
|
||||
}
|
||||
LOG_DEBUG(log, "Session ID response {} with timeout {}", session_id, session_id_request.session_timeout_ms);
|
||||
|
||||
std::shared_ptr<Coordination::ZooKeeperSessionIDResponse> response = std::make_shared<Coordination::ZooKeeperSessionIDResponse>();
|
||||
response->internal_id = session_id_request.internal_id;
|
||||
response->session_id = session_id;
|
||||
response->server_id = session_id_request.server_id;
|
||||
|
||||
KeeperStorage::ResponseForSession response_for_session;
|
||||
response_for_session.session_id = -1;
|
||||
response_for_session.response = response;
|
||||
responses_queue.push(response_for_session);
|
||||
{
|
||||
std::lock_guard lock(storage_and_responses_lock);
|
||||
session_id = storage->getSessionID(session_id_request.session_timeout_ms);
|
||||
LOG_DEBUG(log, "Session ID response {} with timeout {}", session_id, session_id_request.session_timeout_ms);
|
||||
response->session_id = session_id;
|
||||
responses_queue.push(response_for_session);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
KeeperStorage::ResponsesForSessions responses_for_sessions;
|
||||
{
|
||||
std::lock_guard lock(storage_lock);
|
||||
responses_for_sessions = storage->processRequest(request_for_session.request, request_for_session.session_id, log_idx);
|
||||
for (auto & response_for_session : responses_for_sessions)
|
||||
responses_queue.push(response_for_session);
|
||||
}
|
||||
std::lock_guard lock(storage_and_responses_lock);
|
||||
KeeperStorage::ResponsesForSessions responses_for_sessions = storage->processRequest(request_for_session.request, request_for_session.session_id, log_idx);
|
||||
for (auto & response_for_session : responses_for_sessions)
|
||||
responses_queue.push(response_for_session);
|
||||
}
|
||||
|
||||
last_committed_idx = log_idx;
|
||||
@ -150,7 +145,7 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s)
|
||||
}
|
||||
|
||||
{ /// deserialize and apply snapshot to storage
|
||||
std::lock_guard lock(storage_lock);
|
||||
std::lock_guard lock(storage_and_responses_lock);
|
||||
std::tie(latest_snapshot_meta, storage) = snapshot_manager.deserializeSnapshotFromBuffer(latest_snapshot_ptr);
|
||||
}
|
||||
|
||||
@ -175,7 +170,7 @@ void KeeperStateMachine::create_snapshot(
|
||||
auto snapshot_meta_copy = nuraft::snapshot::deserialize(*snp_buf);
|
||||
CreateSnapshotTask snapshot_task;
|
||||
{ /// lock storage for a short period time to turn on "snapshot mode". After that we can read consistent storage state without locking.
|
||||
std::lock_guard lock(storage_lock);
|
||||
std::lock_guard lock(storage_and_responses_lock);
|
||||
snapshot_task.snapshot = std::make_shared<KeeperStorageSnapshot>(storage.get(), snapshot_meta_copy);
|
||||
}
|
||||
|
||||
@ -198,7 +193,7 @@ void KeeperStateMachine::create_snapshot(
|
||||
|
||||
{
|
||||
/// Must do it with lock (clearing elements from list)
|
||||
std::lock_guard lock(storage_lock);
|
||||
std::lock_guard lock(storage_and_responses_lock);
|
||||
/// Turn off "snapshot mode" and clear outdate part of storage state
|
||||
storage->clearGarbageAfterSnapshot();
|
||||
/// Destroy snapshot with lock
|
||||
@ -236,7 +231,7 @@ void KeeperStateMachine::save_logical_snp_obj(
|
||||
nuraft::ptr<nuraft::snapshot> cloned_meta;
|
||||
if (obj_id == 0) /// Fake snapshot required by NuRaft at startup
|
||||
{
|
||||
std::lock_guard lock(storage_lock);
|
||||
std::lock_guard lock(storage_and_responses_lock);
|
||||
KeeperStorageSnapshot snapshot(storage.get(), s.get_last_log_idx());
|
||||
cloned_buffer = snapshot_manager.serializeSnapshotToBuffer(snapshot);
|
||||
}
|
||||
@ -303,24 +298,21 @@ int KeeperStateMachine::read_logical_snp_obj(
|
||||
void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSession & request_for_session)
|
||||
{
|
||||
/// Pure local request, just process it with storage
|
||||
KeeperStorage::ResponsesForSessions responses;
|
||||
{
|
||||
std::lock_guard lock(storage_lock);
|
||||
responses = storage->processRequest(request_for_session.request, request_for_session.session_id, std::nullopt);
|
||||
}
|
||||
std::lock_guard lock(storage_and_responses_lock);
|
||||
auto responses = storage->processRequest(request_for_session.request, request_for_session.session_id, std::nullopt);
|
||||
for (const auto & response : responses)
|
||||
responses_queue.push(response);
|
||||
}
|
||||
|
||||
std::unordered_set<int64_t> KeeperStateMachine::getDeadSessions()
|
||||
{
|
||||
std::lock_guard lock(storage_lock);
|
||||
std::lock_guard lock(storage_and_responses_lock);
|
||||
return storage->getDeadSessions();
|
||||
}
|
||||
|
||||
void KeeperStateMachine::shutdownStorage()
|
||||
{
|
||||
std::lock_guard lock(storage_lock);
|
||||
std::lock_guard lock(storage_and_responses_lock);
|
||||
storage->finalize();
|
||||
}
|
||||
|
||||
|
@ -99,8 +99,12 @@ private:
|
||||
/// Mutex for snapshots
|
||||
std::mutex snapshots_lock;
|
||||
|
||||
/// Lock for storage
|
||||
std::mutex storage_lock;
|
||||
/// Lock for storage and responses_queue. It's important to process requests
|
||||
/// and push them to the responses queue while holding this lock. Otherwise
|
||||
/// we can get strange cases when, for example client send read request with
|
||||
/// watch and after that receive watch response and only receive response
|
||||
/// for request.
|
||||
std::mutex storage_and_responses_lock;
|
||||
|
||||
/// Last committed Raft log number.
|
||||
std::atomic<uint64_t> last_committed_idx;
|
||||
|
Loading…
Reference in New Issue
Block a user