Don't allow to overwrite on startup

This commit is contained in:
Antonio Andelic 2022-07-25 12:38:48 +00:00
parent afb6cb6824
commit 04b03b6a90
4 changed files with 87 additions and 28 deletions

View File

@ -12,6 +12,8 @@ struct KeeperContext
};
Phase server_state{Phase::INIT};
bool ignore_system_path_on_startup{false};
bool digest_enabled{true};
};

View File

@ -114,6 +114,7 @@ KeeperServer::KeeperServer(
LOG_WARNING(log, "Quorum reads enabled, Keeper will work slower.");
keeper_context->digest_enabled = config.getBool("keeper_server.digest_enabled", false);
keeper_context->ignore_system_path_on_startup = config.getBool("keeper_server.ignore_system_path_on_startup", false);
state_machine = nuraft::cs_new<KeeperStateMachine>(
responses_queue_,

View File

@ -367,16 +367,36 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial
using enum PathMatchResult;
auto match_result = matchPath(path, keeper_system_path);
if ((match_result == EXACT && !is_node_empty(node)) || match_result == IS_CHILD)
const std::string error_msg = fmt::format("Cannot read node on path {} from a snapshot because it is used as a system node", path);
if (match_result == IS_CHILD)
{
LOG_ERROR(&Poco::Logger::get("KeeperSnapshotManager"), "Cannot read node on path {} from a snapshot because it is used as a system node.", path);
if (match_result == IS_CHILD)
if (keeper_context->ignore_system_path_on_startup || keeper_context->server_state != KeeperContext::Phase::INIT)
{
LOG_ERROR(&Poco::Logger::get("KeeperSnapshotManager"), "{}. Ignoring it", error_msg);
continue;
node = KeeperStorage::Node{};
}
else
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"{}. Ignoring it can lead to data loss. "
"If you still want to ignore it, you can set 'keeper_server.ignore_system_path_on_startup' to true",
error_msg);
}
else if (match_result == EXACT && !is_node_empty(node))
{
if (keeper_context->ignore_system_path_on_startup || keeper_context->server_state != KeeperContext::Phase::INIT)
{
LOG_ERROR(&Poco::Logger::get("KeeperSnapshotManager"), "{}. Ignoring it", error_msg);
node = KeeperStorage::Node{};
}
else
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"{}. Ignoring it can lead to data loss. "
"If you still want to ignore it, you can set 'keeper_server.ignore_system_path_on_startup' to true",
error_msg);
}
storage.container.insertOrReplace(path, node);
if (node.stat.ephemeralOwner != 0)

View File

@ -666,7 +666,7 @@ struct KeeperStorageRequestProcessor
explicit KeeperStorageRequestProcessor(const Coordination::ZooKeeperRequestPtr & zk_request_) : zk_request(zk_request_) { }
virtual Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, int64_t zxid) const = 0;
virtual std::vector<KeeperStorage::Delta>
preprocess(KeeperStorage & /*storage*/, int64_t /*zxid*/, int64_t /*session_id*/, int64_t /*time*/, uint64_t & /*digest*/) const
preprocess(KeeperStorage & /*storage*/, int64_t /*zxid*/, int64_t /*session_id*/, int64_t /*time*/, uint64_t & /*digest*/, const KeeperContext & /*keeper_context*/) const
{
return {};
}
@ -782,7 +782,7 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr
}
std::vector<KeeperStorage::Delta>
preprocess(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time, uint64_t & digest) const override
preprocess(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time, uint64_t & digest, const KeeperContext & keeper_context) const override
{
Coordination::ZooKeeperCreateRequest & request = dynamic_cast<Coordination::ZooKeeperCreateRequest &>(*zk_request);
@ -808,15 +808,24 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr
path_created += seq_num_str.str();
}
if (storage.uncommitted_state.getNode(path_created))
return {KeeperStorage::Delta{zxid, Coordination::Error::ZNODEEXISTS}};
if (path_created.starts_with(keeper_system_path))
{
LOG_ERROR(&Poco::Logger::get("KeeperStorage"), "Trying to create a node inside the internal Keeper path ({}) which is not allowed. Path: {}", keeper_system_path, path_created);
auto error_msg = fmt::format("Trying to create a node inside the internal Keeper path ({}) which is not allowed. Path: {}", keeper_system_path, path_created);
if (keeper_context.server_state == KeeperContext::Phase::INIT && !keeper_context.ignore_system_path_on_startup)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"{}. Ignoring it can lead to data loss. "
"If you still want to ignore it, you can set 'keeper_server.ignore_system_path_on_startup' to true.",
error_msg);
LOG_ERROR(&Poco::Logger::get("KeeperStorage"), fmt::runtime(error_msg));
return {KeeperStorage::Delta{zxid, Coordination::Error::ZBADARGUMENTS}};
}
if (storage.uncommitted_state.getNode(path_created))
return {KeeperStorage::Delta{zxid, Coordination::Error::ZNODEEXISTS}};
if (getBaseName(path_created).size == 0)
return {KeeperStorage::Delta{zxid, Coordination::Error::ZBADARGUMENTS}};
@ -901,7 +910,7 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
std::vector<KeeperStorage::Delta>
preprocess(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/, uint64_t & /*digest*/) const override
preprocess(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/, uint64_t & /*digest*/, const KeeperContext & /*keeper_context*/) const override
{
Coordination::ZooKeeperGetRequest & request = dynamic_cast<Coordination::ZooKeeperGetRequest &>(*zk_request);
@ -970,7 +979,7 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
std::vector<KeeperStorage::Delta>
preprocess(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/, uint64_t & digest) const override
preprocess(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/, uint64_t & digest, const KeeperContext & keeper_context) const override
{
Coordination::ZooKeeperRemoveRequest & request = dynamic_cast<Coordination::ZooKeeperRemoveRequest &>(*zk_request);
@ -978,7 +987,16 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr
if (request.path.starts_with(keeper_system_path))
{
LOG_ERROR(&Poco::Logger::get("KeeperStorage"), "Trying to delete an internal Keeper path ({}) which is not allowed", request.path);
auto error_msg = fmt::format("Trying to delete an internal Keeper path ({}) which is not allowed", request.path);
if (keeper_context.server_state == KeeperContext::Phase::INIT && !keeper_context.ignore_system_path_on_startup)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"{}. Ignoring it can lead to data loss. "
"If you still want to ignore it, you can set 'keeper_server.ignore_system_path_on_startup' to true.",
error_msg);
LOG_ERROR(&Poco::Logger::get("KeeperStorage"), fmt::runtime(error_msg));
return {KeeperStorage::Delta{zxid, Coordination::Error::ZBADARGUMENTS}};
}
@ -1058,7 +1076,7 @@ struct KeeperStorageExistsRequestProcessor final : public KeeperStorageRequestPr
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
std::vector<KeeperStorage::Delta>
preprocess(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/, uint64_t & /*digest*/) const override
preprocess(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/, uint64_t & /*digest*/, const KeeperContext & /*keeper_context*/) const override
{
Coordination::ZooKeeperExistsRequest & request = dynamic_cast<Coordination::ZooKeeperExistsRequest &>(*zk_request);
@ -1122,7 +1140,7 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
std::vector<KeeperStorage::Delta>
preprocess(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t time, uint64_t & digest) const override
preprocess(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t time, uint64_t & digest, const KeeperContext & keeper_context) const override
{
Coordination::ZooKeeperSetRequest & request = dynamic_cast<Coordination::ZooKeeperSetRequest &>(*zk_request);
@ -1130,7 +1148,16 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce
if (request.path.starts_with(keeper_system_path))
{
LOG_ERROR(&Poco::Logger::get("KeeperStorage"), "Trying to update an internal Keeper path ({}) which is not allowed", request.path);
auto error_msg = fmt::format("Trying to update an internal Keeper path ({}) which is not allowed", request.path);
if (keeper_context.server_state == KeeperContext::Phase::INIT && !keeper_context.ignore_system_path_on_startup)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"{}. Ignoring it can lead to data loss. "
"If you still want to ignore it, you can set 'keeper_server.ignore_system_path_on_startup' to true.",
error_msg);
LOG_ERROR(&Poco::Logger::get("KeeperStorage"), fmt::runtime(error_msg));
return {KeeperStorage::Delta{zxid, Coordination::Error::ZBADARGUMENTS}};
}
@ -1212,7 +1239,7 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
std::vector<KeeperStorage::Delta>
preprocess(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/, uint64_t & /*digest*/) const override
preprocess(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/, uint64_t & /*digest*/, const KeeperContext & /*keeper_context*/) const override
{
Coordination::ZooKeeperListRequest & request = dynamic_cast<Coordination::ZooKeeperListRequest &>(*zk_request);
@ -1313,7 +1340,7 @@ struct KeeperStorageCheckRequestProcessor final : public KeeperStorageRequestPro
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
std::vector<KeeperStorage::Delta>
preprocess(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/, uint64_t & /*digest*/) const override
preprocess(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/, uint64_t & /*digest*/, const KeeperContext & /*keeper_context*/) const override
{
Coordination::ZooKeeperCheckRequest & request = dynamic_cast<Coordination::ZooKeeperCheckRequest &>(*zk_request);
@ -1391,13 +1418,22 @@ struct KeeperStorageSetACLRequestProcessor final : public KeeperStorageRequestPr
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
std::vector<KeeperStorage::Delta>
preprocess(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t /*time*/, uint64_t & digest) const override
preprocess(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t /*time*/, uint64_t & digest, const KeeperContext & keeper_context) const override
{
Coordination::ZooKeeperSetACLRequest & request = dynamic_cast<Coordination::ZooKeeperSetACLRequest &>(*zk_request);
if (request.path.starts_with(keeper_system_path))
{
LOG_ERROR(&Poco::Logger::get("KeeperStorage"), "Trying to update an internal Keeper path ({}) which is not allowed", request.path);
auto error_msg = fmt::format("Trying to update an internal Keeper path ({}) which is not allowed", request.path);
if (keeper_context.server_state == KeeperContext::Phase::INIT && !keeper_context.ignore_system_path_on_startup)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"{}. Ignoring it can lead to data loss. "
"If you still want to ignore it, you can set 'keeper_server.ignore_system_path_on_startup' to true.",
error_msg);
LOG_ERROR(&Poco::Logger::get("KeeperStorage"), fmt::runtime(error_msg));
return {KeeperStorage::Delta{zxid, Coordination::Error::ZBADARGUMENTS}};
}
@ -1471,7 +1507,7 @@ struct KeeperStorageGetACLRequestProcessor final : public KeeperStorageRequestPr
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
std::vector<KeeperStorage::Delta>
preprocess(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/, uint64_t & /*digest*/) const override
preprocess(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/, uint64_t & /*digest*/, const KeeperContext & /*keeper_context*/) const override
{
Coordination::ZooKeeperGetACLRequest & request = dynamic_cast<Coordination::ZooKeeperGetACLRequest &>(*zk_request);
@ -1568,14 +1604,14 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro
}
std::vector<KeeperStorage::Delta>
preprocess(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time, uint64_t & digest) const override
preprocess(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time, uint64_t & digest, const KeeperContext & keeper_context) const override
{
std::vector<Coordination::Error> response_errors;
response_errors.reserve(concrete_requests.size());
uint64_t current_digest = digest;
for (size_t i = 0; i < concrete_requests.size(); ++i)
{
auto new_deltas = concrete_requests[i]->preprocess(storage, zxid, session_id, time, current_digest);
auto new_deltas = concrete_requests[i]->preprocess(storage, zxid, session_id, time, current_digest, keeper_context);
if (!new_deltas.empty())
{
@ -1694,7 +1730,7 @@ struct KeeperStorageAuthRequestProcessor final : public KeeperStorageRequestProc
{
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
std::vector<KeeperStorage::Delta>
preprocess(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t /*time*/, uint64_t & /*digest*/) const override
preprocess(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t /*time*/, uint64_t & /*digest*/, const KeeperContext & /*keeper_context*/) const override
{
Coordination::ZooKeeperAuthRequest & auth_request = dynamic_cast<Coordination::ZooKeeperAuthRequest &>(*zk_request);
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
@ -1955,7 +1991,7 @@ void KeeperStorage::preprocessRequest(
return;
}
new_deltas = request_processor->preprocess(*this, transaction.zxid, session_id, time, new_digest);
new_deltas = request_processor->preprocess(*this, transaction.zxid, session_id, time, new_digest, *keeper_context);
}
KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(