mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 00:52:02 +00:00
Merge pull request #41075 from ClickHouse/keeper-fix-possible-segfault
Fix possible segfault during Keeper shutdown
This commit is contained in:
commit
69d08b1007
@ -442,9 +442,9 @@ void KeeperServer::shutdownRaftServer()
|
|||||||
|
|
||||||
void KeeperServer::shutdown()
|
void KeeperServer::shutdown()
|
||||||
{
|
{
|
||||||
state_machine->shutdownStorage();
|
|
||||||
state_manager->flushAndShutDownLogStore();
|
state_manager->flushAndShutDownLogStore();
|
||||||
shutdownRaftServer();
|
shutdownRaftServer();
|
||||||
|
state_machine->shutdownStorage();
|
||||||
}
|
}
|
||||||
|
|
||||||
namespace
|
namespace
|
||||||
@ -617,7 +617,9 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
|
|||||||
auto & entry_buf = entry->get_buf();
|
auto & entry_buf = entry->get_buf();
|
||||||
auto request_for_session = state_machine->parseRequest(entry_buf);
|
auto request_for_session = state_machine->parseRequest(entry_buf);
|
||||||
request_for_session.zxid = next_zxid;
|
request_for_session.zxid = next_zxid;
|
||||||
state_machine->preprocess(request_for_session);
|
if (!state_machine->preprocess(request_for_session))
|
||||||
|
return nuraft::cb_func::ReturnCode::ReturnNull;
|
||||||
|
|
||||||
request_for_session.digest = state_machine->getNodesDigest();
|
request_for_session.digest = state_machine->getNodesDigest();
|
||||||
entry = nuraft::cs_new<nuraft::log_entry>(entry->get_term(), getZooKeeperLogEntry(request_for_session), entry->get_val_type());
|
entry = nuraft::cs_new<nuraft::log_entry>(entry->get_term(), getZooKeeperLogEntry(request_for_session), entry->get_val_type());
|
||||||
break;
|
break;
|
||||||
|
@ -191,12 +191,16 @@ KeeperStorage::RequestForSession KeeperStateMachine::parseRequest(nuraft::buffer
|
|||||||
return request_for_session;
|
return request_for_session;
|
||||||
}
|
}
|
||||||
|
|
||||||
void KeeperStateMachine::preprocess(const KeeperStorage::RequestForSession & request_for_session)
|
bool KeeperStateMachine::preprocess(const KeeperStorage::RequestForSession & request_for_session)
|
||||||
{
|
{
|
||||||
if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID)
|
if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID)
|
||||||
return;
|
return true;
|
||||||
|
|
||||||
std::lock_guard lock(storage_and_responses_lock);
|
std::lock_guard lock(storage_and_responses_lock);
|
||||||
|
|
||||||
|
if (storage->isFinalized())
|
||||||
|
return false;
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
storage->preprocessRequest(
|
storage->preprocessRequest(
|
||||||
@ -215,6 +219,8 @@ void KeeperStateMachine::preprocess(const KeeperStorage::RequestForSession & req
|
|||||||
|
|
||||||
if (keeper_context->digest_enabled && request_for_session.digest)
|
if (keeper_context->digest_enabled && request_for_session.digest)
|
||||||
assertDigest(*request_for_session.digest, storage->getNodesDigest(false), *request_for_session.request, false);
|
assertDigest(*request_for_session.digest, storage->getNodesDigest(false), *request_for_session.request, false);
|
||||||
|
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, nuraft::buffer & data)
|
nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, nuraft::buffer & data)
|
||||||
|
@ -33,7 +33,7 @@ public:
|
|||||||
|
|
||||||
static KeeperStorage::RequestForSession parseRequest(nuraft::buffer & data);
|
static KeeperStorage::RequestForSession parseRequest(nuraft::buffer & data);
|
||||||
|
|
||||||
void preprocess(const KeeperStorage::RequestForSession & request_for_session);
|
bool preprocess(const KeeperStorage::RequestForSession & request_for_session);
|
||||||
|
|
||||||
nuraft::ptr<nuraft::buffer> pre_commit(uint64_t log_idx, nuraft::buffer & data) override;
|
nuraft::ptr<nuraft::buffer> pre_commit(uint64_t log_idx, nuraft::buffer & data) override;
|
||||||
|
|
||||||
|
@ -516,7 +516,7 @@ void KeeperStorage::UncommittedState::rollback(int64_t rollback_zxid)
|
|||||||
|
|
||||||
std::shared_ptr<KeeperStorage::Node> KeeperStorage::UncommittedState::getNode(StringRef path) const
|
std::shared_ptr<KeeperStorage::Node> KeeperStorage::UncommittedState::getNode(StringRef path) const
|
||||||
{
|
{
|
||||||
if (auto node_it = nodes.find(std::string{path}); node_it != nodes.end())
|
if (auto node_it = nodes.find(path.toView()); node_it != nodes.end())
|
||||||
return node_it->second.node;
|
return node_it->second.node;
|
||||||
|
|
||||||
return tryGetNodeFromStorage(path);
|
return tryGetNodeFromStorage(path);
|
||||||
@ -524,7 +524,7 @@ std::shared_ptr<KeeperStorage::Node> KeeperStorage::UncommittedState::getNode(St
|
|||||||
|
|
||||||
Coordination::ACLs KeeperStorage::UncommittedState::getACLs(StringRef path) const
|
Coordination::ACLs KeeperStorage::UncommittedState::getACLs(StringRef path) const
|
||||||
{
|
{
|
||||||
if (auto node_it = nodes.find(std::string{path}); node_it != nodes.end())
|
if (auto node_it = nodes.find(path.toView()); node_it != nodes.end())
|
||||||
return node_it->second.acls;
|
return node_it->second.acls;
|
||||||
|
|
||||||
auto node_it = storage.container.find(path);
|
auto node_it = storage.container.find(path);
|
||||||
@ -830,7 +830,9 @@ bool KeeperStorage::checkACL(StringRef path, int32_t permission, int64_t session
|
|||||||
void KeeperStorage::unregisterEphemeralPath(int64_t session_id, const std::string & path)
|
void KeeperStorage::unregisterEphemeralPath(int64_t session_id, const std::string & path)
|
||||||
{
|
{
|
||||||
auto ephemerals_it = ephemerals.find(session_id);
|
auto ephemerals_it = ephemerals.find(session_id);
|
||||||
assert(ephemerals_it != ephemerals.end());
|
if (ephemerals_it == ephemerals.end())
|
||||||
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Session {} is missing ephemeral path");
|
||||||
|
|
||||||
ephemerals_it->second.erase(path);
|
ephemerals_it->second.erase(path);
|
||||||
if (ephemerals_it->second.empty())
|
if (ephemerals_it->second.empty())
|
||||||
ephemerals.erase(ephemerals_it);
|
ephemerals.erase(ephemerals_it);
|
||||||
@ -1825,6 +1827,11 @@ void KeeperStorage::finalize()
|
|||||||
session_expiry_queue.clear();
|
session_expiry_queue.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool KeeperStorage::isFinalized() const
|
||||||
|
{
|
||||||
|
return finalized;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
class KeeperStorageRequestProcessorsFactory final : private boost::noncopyable
|
class KeeperStorageRequestProcessorsFactory final : private boost::noncopyable
|
||||||
{
|
{
|
||||||
@ -1892,7 +1899,7 @@ UInt64 KeeperStorage::calculateNodesDigest(UInt64 current_digest, const std::vec
|
|||||||
if (!keeper_context->digest_enabled)
|
if (!keeper_context->digest_enabled)
|
||||||
return current_digest;
|
return current_digest;
|
||||||
|
|
||||||
std::unordered_map<std::string, std::shared_ptr<Node>> updated_nodes;
|
std::unordered_map<std::string_view, std::shared_ptr<Node>> updated_nodes;
|
||||||
|
|
||||||
for (const auto & delta : new_deltas)
|
for (const auto & delta : new_deltas)
|
||||||
{
|
{
|
||||||
|
@ -425,6 +425,8 @@ public:
|
|||||||
|
|
||||||
void finalize();
|
void finalize();
|
||||||
|
|
||||||
|
bool isFinalized() const;
|
||||||
|
|
||||||
/// Set of methods for creating snapshots
|
/// Set of methods for creating snapshots
|
||||||
|
|
||||||
/// Turn on snapshot mode, so data inside Container is not deleted, but replaced with new version.
|
/// Turn on snapshot mode, so data inside Container is not deleted, but replaced with new version.
|
||||||
|
Loading…
Reference in New Issue
Block a user