Handle backwards compatibility

This commit is contained in:
Antonio Andelic 2023-05-26 09:30:57 +00:00
parent 767193caf5
commit 3d98e591ba
5 changed files with 78 additions and 21 deletions

View File

@ -48,7 +48,7 @@ struct Settings;
M(UInt64, configuration_change_tries_count, 20, "How many times we will try to apply configuration change (add/remove server) to the cluster", 0) \
M(UInt64, max_log_file_size, 50 * 1024 * 1024, "Max size of the Raft log file. If possible, each created log file will preallocate this amount of bytes on disk. Set to 0 to disable the limit", 0) \
M(UInt64, log_file_overallocate_size, 50 * 1024 * 1024, "If max_log_file_size is not set to 0, this value will be added to it for preallocating bytes on disk. If a log record is larger than this value, it could lead to uncaught out-of-space issues so a larger value is preferred", 0) \
M(UInt64, min_request_size_for_cache, 50 * 1024, "Minimal size of the request to cache the deserialization result. Caching can have negative effect on latency for smaller requests", 0)
M(UInt64, min_request_size_for_cache, 50 * 1024, "Minimal size of the request to cache the deserialization result. Caching can have negative effect on latency for smaller requests, set to 0 to disable", 0)
DECLARE_SETTINGS_TRAITS(CoordinationSettingsTraits, LIST_OF_COORDINATION_SETTINGS)

View File

@ -478,9 +478,11 @@ nuraft::ptr<nuraft::buffer> getZooKeeperLogEntry(const KeeperStorage::RequestFor
DB::writeIntBinary(request_for_session.session_id, write_buf);
request_for_session.request->write(write_buf);
DB::writeIntBinary(request_for_session.time, write_buf);
DB::writeIntBinary(static_cast<int64_t>(0), write_buf);
DB::writeIntBinary(KeeperStorage::DigestVersion::NO_DIGEST, write_buf);
DB::writeIntBinary(static_cast<uint64_t>(0), write_buf);
/// we fill with dummy values to eliminate unecessary copy later on when we will write correct values
DB::writeIntBinary(static_cast<int64_t>(0), write_buf); /// zxid
DB::writeIntBinary(KeeperStorage::DigestVersion::NO_DIGEST, write_buf); /// digest version or NO_DIGEST flag
DB::writeIntBinary(static_cast<uint64_t>(0), write_buf); /// digest value
/// if new fields are added, update KeeperStateMachine::ZooKeeperLogSerializationVersion along with parseRequest function and PreAppendLog callback handler
return write_buf.getBuffer();
}
@ -619,19 +621,47 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
assert(entry->get_val_type() == nuraft::app_log);
auto next_zxid = state_machine->getNextZxid();
auto & entry_buf = entry->get_buf();
auto request_for_session = state_machine->parseRequest(entry_buf);
auto entry_buf = entry->get_buf_ptr();
KeeperStateMachine::ZooKeeperLogSerializationVersion serialization_version;
auto request_for_session = state_machine->parseRequest(*entry_buf, /*final=*/false, &serialization_version);
request_for_session->zxid = next_zxid;
if (!state_machine->preprocess(*request_for_session))
return nuraft::cb_func::ReturnCode::ReturnNull;
request_for_session->digest = state_machine->getNodesDigest();
static constexpr size_t write_buffer_size
= sizeof(request_for_session->zxid) + sizeof(request_for_session->digest->version) + sizeof(request_for_session->digest->value);
auto * buffer_start = reinterpret_cast<BufferBase::Position>(entry_buf.data_begin() + entry_buf.size() - write_buffer_size);
using enum KeeperStateMachine::ZooKeeperLogSerializationVersion;
/// older versions of Keeper can send logs that are missing some fields
size_t bytes_missing = 0;
if (serialization_version < WITH_TIME)
bytes_missing += sizeof(request_for_session->time);
if (serialization_version < WITH_ZXID_DIGEST)
bytes_missing += sizeof(request_for_session->zxid) + sizeof(request_for_session->digest->version) + sizeof(request_for_session->digest->value);
if (bytes_missing != 0)
{
auto new_buffer = nuraft::buffer::alloc(entry_buf->size() + bytes_missing);
memcpy(new_buffer->data_begin(), entry_buf->data_begin(), entry_buf->size());
entry_buf = std::move(new_buffer);
entry = nuraft::cs_new<nuraft::log_entry>(entry->get_term(), entry_buf, entry->get_val_type());
}
size_t write_buffer_header_size
= sizeof(request_for_session->zxid) + sizeof(request_for_session->digest->version) + sizeof(request_for_session->digest->value);
if (serialization_version < WITH_TIME)
write_buffer_header_size += sizeof(request_for_session->time);
auto * buffer_start = reinterpret_cast<BufferBase::Position>(entry_buf->data_begin() + entry_buf->size() - write_buffer_header_size);
WriteBuffer write_buf(buffer_start, write_buffer_header_size);
if (serialization_version < WITH_TIME)
writeIntBinary(request_for_session->time, write_buf);
WriteBuffer write_buf(buffer_start, write_buffer_size);
writeIntBinary(request_for_session->zxid, write_buf);
writeIntBinary(request_for_session->digest->version, write_buf);
if (request_for_session->digest->version != KeeperStorage::NO_DIGEST)

View File

@ -150,7 +150,7 @@ void assertDigest(
nuraft::ptr<nuraft::buffer> KeeperStateMachine::pre_commit(uint64_t log_idx, nuraft::buffer & data)
{
auto request_for_session = parseRequest(data);
auto request_for_session = parseRequest(data, /*final=*/false);
if (!request_for_session->zxid)
request_for_session->zxid = log_idx;
@ -158,7 +158,7 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::pre_commit(uint64_t log_idx, nur
return nullptr;
}
std::shared_ptr<KeeperStorage::RequestForSession> KeeperStateMachine::parseRequest(nuraft::buffer & data, bool final)
std::shared_ptr<KeeperStorage::RequestForSession> KeeperStateMachine::parseRequest(nuraft::buffer & data, bool final, ZooKeeperLogSerializationVersion * serialization_version)
{
ReadBufferFromNuraftBuffer buffer(data);
auto request_for_session = std::make_shared<KeeperStorage::RequestForSession>();
@ -177,10 +177,10 @@ std::shared_ptr<KeeperStorage::RequestForSession> KeeperStateMachine::parseReque
Coordination::CLOSE_XID,
};
const bool should_cache = request_for_session->session_id != -1 && data.size() > min_request_size_to_cache
&& std::all_of(non_cacheable_xids.begin(),
non_cacheable_xids.end(),
[&](const auto non_cacheable_xid) { return xid != non_cacheable_xid; });
const bool should_cache
= min_request_size_to_cache != 0 && request_for_session->session_id != -1 && data.size() >= min_request_size_to_cache
&& std::all_of(
non_cacheable_xids.begin(), non_cacheable_xids.end(), [&](const auto non_cacheable_xid) { return xid != non_cacheable_xid; });
if (should_cache)
{
@ -212,23 +212,35 @@ std::shared_ptr<KeeperStorage::RequestForSession> KeeperStateMachine::parseReque
request_for_session->request->xid = xid;
request_for_session->request->readImpl(buffer);
using enum ZooKeeperLogSerializationVersion;
ZooKeeperLogSerializationVersion version = INITIAL;
if (!buffer.eof())
{
version = WITH_TIME;
readIntBinary(request_for_session->time, buffer);
else /// backward compatibility
}
else
request_for_session->time
= std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
if (!buffer.eof())
{
version = WITH_ZXID_DIGEST;
readIntBinary(request_for_session->zxid, buffer);
if (!buffer.eof())
{
chassert(!buffer.eof());
request_for_session->digest.emplace();
readIntBinary(request_for_session->digest->version, buffer);
if (request_for_session->digest->version != KeeperStorage::DigestVersion::NO_DIGEST || !buffer.eof())
readIntBinary(request_for_session->digest->value, buffer);
}
if (serialization_version)
*serialization_version = version;
if (should_cache && !final)
{
std::lock_guard lock(request_cache_mutex);

View File

@ -36,7 +36,22 @@ public:
/// Read state from the latest snapshot
void init();
std::shared_ptr<KeeperStorage::RequestForSession> parseRequest(nuraft::buffer & data, bool final = false);
enum ZooKeeperLogSerializationVersion
{
INITIAL = 0,
WITH_TIME = 1,
WITH_ZXID_DIGEST = 2,
};
/// lifetime of a parsed request is:
/// [preprocess/PreAppendLog -> commit]
/// [preprocess/PreAppendLog -> rollback]
/// on events like commit and rollback we can remove the parsed request to keep the memory usage at minimum
/// request cache is also cleaned on session close in case somethign strange happened
///
/// final - whether it's the final time we will fetch the request so we can safely remove it from cache
/// serialization_version - information about which fields were parsed from the buffer so we can modify the buffer accordingly
std::shared_ptr<KeeperStorage::RequestForSession> parseRequest(nuraft::buffer & data, bool final, ZooKeeperLogSerializationVersion * serialization_version = nullptr);
bool preprocess(const KeeperStorage::RequestForSession & request_for_session);

View File

@ -110,7 +110,7 @@ public:
struct RequestForSession
{
int64_t session_id;
int64_t time;
int64_t time{0};
Coordination::ZooKeeperRequestPtr request;
int64_t zxid{0};
std::optional<Digest> digest;