Merge pull request #28197 from ClickHouse/fix_requests_push

Fix rare case when watch response received before request response
This commit is contained in:
alesapin 2021-08-27 20:20:30 +03:00 committed by GitHub
commit fde3cc1315
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 25 additions and 29 deletions

View File

@ -106,31 +106,26 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
{ {
const Coordination::ZooKeeperSessionIDRequest & session_id_request = dynamic_cast<const Coordination::ZooKeeperSessionIDRequest &>(*request_for_session.request); const Coordination::ZooKeeperSessionIDRequest & session_id_request = dynamic_cast<const Coordination::ZooKeeperSessionIDRequest &>(*request_for_session.request);
int64_t session_id; int64_t session_id;
{
std::lock_guard lock(storage_lock);
session_id = storage->getSessionID(session_id_request.session_timeout_ms);
}
LOG_DEBUG(log, "Session ID response {} with timeout {}", session_id, session_id_request.session_timeout_ms);
std::shared_ptr<Coordination::ZooKeeperSessionIDResponse> response = std::make_shared<Coordination::ZooKeeperSessionIDResponse>(); std::shared_ptr<Coordination::ZooKeeperSessionIDResponse> response = std::make_shared<Coordination::ZooKeeperSessionIDResponse>();
response->internal_id = session_id_request.internal_id; response->internal_id = session_id_request.internal_id;
response->session_id = session_id;
response->server_id = session_id_request.server_id; response->server_id = session_id_request.server_id;
KeeperStorage::ResponseForSession response_for_session; KeeperStorage::ResponseForSession response_for_session;
response_for_session.session_id = -1; response_for_session.session_id = -1;
response_for_session.response = response; response_for_session.response = response;
responses_queue.push(response_for_session); {
std::lock_guard lock(storage_and_responses_lock);
session_id = storage->getSessionID(session_id_request.session_timeout_ms);
LOG_DEBUG(log, "Session ID response {} with timeout {}", session_id, session_id_request.session_timeout_ms);
response->session_id = session_id;
responses_queue.push(response_for_session);
}
} }
else else
{ {
KeeperStorage::ResponsesForSessions responses_for_sessions; std::lock_guard lock(storage_and_responses_lock);
{ KeeperStorage::ResponsesForSessions responses_for_sessions = storage->processRequest(request_for_session.request, request_for_session.session_id, log_idx);
std::lock_guard lock(storage_lock); for (auto & response_for_session : responses_for_sessions)
responses_for_sessions = storage->processRequest(request_for_session.request, request_for_session.session_id, log_idx); responses_queue.push(response_for_session);
for (auto & response_for_session : responses_for_sessions)
responses_queue.push(response_for_session);
}
} }
last_committed_idx = log_idx; last_committed_idx = log_idx;
@ -150,7 +145,7 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s)
} }
{ /// deserialize and apply snapshot to storage { /// deserialize and apply snapshot to storage
std::lock_guard lock(storage_lock); std::lock_guard lock(storage_and_responses_lock);
std::tie(latest_snapshot_meta, storage) = snapshot_manager.deserializeSnapshotFromBuffer(latest_snapshot_ptr); std::tie(latest_snapshot_meta, storage) = snapshot_manager.deserializeSnapshotFromBuffer(latest_snapshot_ptr);
} }
@ -175,7 +170,7 @@ void KeeperStateMachine::create_snapshot(
auto snapshot_meta_copy = nuraft::snapshot::deserialize(*snp_buf); auto snapshot_meta_copy = nuraft::snapshot::deserialize(*snp_buf);
CreateSnapshotTask snapshot_task; CreateSnapshotTask snapshot_task;
{ /// lock storage for a short period time to turn on "snapshot mode". After that we can read consistent storage state without locking. { /// lock storage for a short period time to turn on "snapshot mode". After that we can read consistent storage state without locking.
std::lock_guard lock(storage_lock); std::lock_guard lock(storage_and_responses_lock);
snapshot_task.snapshot = std::make_shared<KeeperStorageSnapshot>(storage.get(), snapshot_meta_copy); snapshot_task.snapshot = std::make_shared<KeeperStorageSnapshot>(storage.get(), snapshot_meta_copy);
} }
@ -198,7 +193,7 @@ void KeeperStateMachine::create_snapshot(
{ {
/// Must do it with lock (clearing elements from list) /// Must do it with lock (clearing elements from list)
std::lock_guard lock(storage_lock); std::lock_guard lock(storage_and_responses_lock);
/// Turn off "snapshot mode" and clear outdate part of storage state /// Turn off "snapshot mode" and clear outdate part of storage state
storage->clearGarbageAfterSnapshot(); storage->clearGarbageAfterSnapshot();
/// Destroy snapshot with lock /// Destroy snapshot with lock
@ -236,7 +231,7 @@ void KeeperStateMachine::save_logical_snp_obj(
nuraft::ptr<nuraft::snapshot> cloned_meta; nuraft::ptr<nuraft::snapshot> cloned_meta;
if (obj_id == 0) /// Fake snapshot required by NuRaft at startup if (obj_id == 0) /// Fake snapshot required by NuRaft at startup
{ {
std::lock_guard lock(storage_lock); std::lock_guard lock(storage_and_responses_lock);
KeeperStorageSnapshot snapshot(storage.get(), s.get_last_log_idx()); KeeperStorageSnapshot snapshot(storage.get(), s.get_last_log_idx());
cloned_buffer = snapshot_manager.serializeSnapshotToBuffer(snapshot); cloned_buffer = snapshot_manager.serializeSnapshotToBuffer(snapshot);
} }
@ -303,24 +298,21 @@ int KeeperStateMachine::read_logical_snp_obj(
void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSession & request_for_session) void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSession & request_for_session)
{ {
/// Pure local request, just process it with storage /// Pure local request, just process it with storage
KeeperStorage::ResponsesForSessions responses; std::lock_guard lock(storage_and_responses_lock);
{ auto responses = storage->processRequest(request_for_session.request, request_for_session.session_id, std::nullopt);
std::lock_guard lock(storage_lock);
responses = storage->processRequest(request_for_session.request, request_for_session.session_id, std::nullopt);
}
for (const auto & response : responses) for (const auto & response : responses)
responses_queue.push(response); responses_queue.push(response);
} }
std::unordered_set<int64_t> KeeperStateMachine::getDeadSessions() std::unordered_set<int64_t> KeeperStateMachine::getDeadSessions()
{ {
std::lock_guard lock(storage_lock); std::lock_guard lock(storage_and_responses_lock);
return storage->getDeadSessions(); return storage->getDeadSessions();
} }
void KeeperStateMachine::shutdownStorage() void KeeperStateMachine::shutdownStorage()
{ {
std::lock_guard lock(storage_lock); std::lock_guard lock(storage_and_responses_lock);
storage->finalize(); storage->finalize();
} }

View File

@ -99,8 +99,12 @@ private:
/// Mutex for snapshots /// Mutex for snapshots
std::mutex snapshots_lock; std::mutex snapshots_lock;
/// Lock for storage /// Lock for storage and responses_queue. It's important to process requests
std::mutex storage_lock; /// and push them to the responses queue while holding this lock. Otherwise
/// we can get strange cases when, for example client send read request with
/// watch and after that receive watch response and only receive response
/// for request.
std::mutex storage_and_responses_lock;
/// Last committed Raft log number. /// Last committed Raft log number.
std::atomic<uint64_t> last_committed_idx; std::atomic<uint64_t> last_committed_idx;