Small changes for Keeper

This commit is contained in:
Antonio Andelic 2022-04-05 06:27:03 +00:00
parent c3c284e6e6
commit d296eeee2d
12 changed files with 117 additions and 106 deletions

View File

@ -846,7 +846,7 @@ void ZooKeeper::receiveEvent()
void ZooKeeper::finalize(bool error_send, bool error_receive, const String & reason)
{
/// If some thread (send/receive) already finalizing session don't try to do it
bool already_started = finalization_started.exchange(true);
bool already_started = finalization_started.test_and_set();
LOG_TEST(log, "Finalizing session {}: finalization_started={}, queue_finished={}, reason={}",
session_id, already_started, requests_queue.isFinished(), reason);

View File

@ -209,7 +209,7 @@ private:
std::atomic<XID> next_xid {1};
/// Mark session finalization start. Used to avoid simultaneous
/// finalization from different threads. One-shot flag.
std::atomic<bool> finalization_started {false};
std::atomic_flag finalization_started;
using clock = std::chrono::steady_clock;

View File

@ -222,8 +222,8 @@ public:
}
/// Check for duplicated changelog ids
if (logs.count(record.header.index) != 0)
std::erase_if(logs, [record] (const auto & item) { return item.first >= record.header.index; });
if (logs.contains(record.header.index))
std::erase_if(logs, [&record] (const auto & item) { return item.first >= record.header.index; });
result.total_entries_read_from_log += 1;
@ -659,6 +659,7 @@ LogEntryPtr Changelog::getLatestConfigChange() const
nuraft::ptr<nuraft::buffer> Changelog::serializeEntriesToBuffer(uint64_t index, int32_t count)
{
std::vector<nuraft::ptr<nuraft::buffer>> returned_logs;
returned_logs.reserve(count);
uint64_t size_total = 0;
for (uint64_t i = index; i < index + count; ++i)
@ -669,7 +670,7 @@ nuraft::ptr<nuraft::buffer> Changelog::serializeEntriesToBuffer(uint64_t index,
nuraft::ptr<nuraft::buffer> buf = entry->second->serialize();
size_total += buf->size();
returned_logs.push_back(buf);
returned_logs.push_back(std::move(buf));
}
nuraft::ptr<nuraft::buffer> buf_out = nuraft::buffer::alloc(sizeof(int32_t) + count * sizeof(int32_t) + size_total);
@ -678,9 +679,8 @@ nuraft::ptr<nuraft::buffer> Changelog::serializeEntriesToBuffer(uint64_t index,
for (auto & entry : returned_logs)
{
nuraft::ptr<nuraft::buffer> & bb = entry;
buf_out->put(static_cast<int32_t>(bb->size()));
buf_out->put(*bb);
buf_out->put(static_cast<int32_t>(entry->size()));
buf_out->put(*entry);
}
return buf_out;
}
@ -699,7 +699,7 @@ void Changelog::applyEntriesFromBuffer(uint64_t index, nuraft::buffer & buffer)
buffer.get(buf_local);
LogEntryPtr log_entry = nuraft::log_entry::deserialize(*buf_local);
if (i == 0 && logs.count(cur_index))
if (i == 0 && logs.contains(cur_index))
writeAt(cur_index, log_entry);
else
appendEntry(cur_index, log_entry);

View File

@ -121,7 +121,7 @@ void KeeperDispatcher::requestThread()
current_batch.clear();
}
prev_batch = current_batch;
prev_batch = std::move(current_batch);
prev_result = result;
}

View File

@ -43,7 +43,7 @@ namespace
void writeNode(const KeeperStorage::Node & node, SnapshotVersion version, WriteBuffer & out)
{
writeBinary(node.data, out);
writeBinary(node.getData(), out);
/// Serialize ACL
writeBinary(node.acl_id, out);
@ -71,7 +71,9 @@ namespace
void readNode(KeeperStorage::Node & node, ReadBuffer & in, SnapshotVersion version, ACLMap & acl_map)
{
readBinary(node.data, in);
String new_data;
readBinary(new_data, in);
node.setData(std::move(new_data));
if (version >= SnapshotVersion::V1)
{
@ -281,7 +283,7 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial
if (itr.key != "/")
{
auto parent_path = parentPath(itr.key);
storage.container.updateValue(parent_path, [path = itr.key] (KeeperStorage::Node & value) { value.children.insert(getBaseName(path)); });
storage.container.updateValue(parent_path, [path = itr.key] (KeeperStorage::Node & value) { value.addChild(getBaseName(path)); });
}
}

View File

@ -259,22 +259,9 @@ void KeeperStateMachine::save_logical_snp_obj(
{
LOG_DEBUG(log, "Saving snapshot {} obj_id {}", s.get_last_log_idx(), obj_id);
nuraft::ptr<nuraft::buffer> cloned_buffer;
nuraft::ptr<nuraft::snapshot> cloned_meta;
if (obj_id == 0) /// Fake snapshot required by NuRaft at startup
{
std::lock_guard lock(storage_and_responses_lock);
KeeperStorageSnapshot snapshot(storage.get(), s.get_last_log_idx(), getClusterConfig());
cloned_buffer = snapshot_manager.serializeSnapshotToBuffer(snapshot);
}
else
{
/// copy snapshot into memory
}
/// copy snapshot meta into memory
nuraft::ptr<nuraft::buffer> snp_buf = s.serialize();
cloned_meta = nuraft::snapshot::deserialize(*snp_buf);
nuraft::ptr<nuraft::snapshot> cloned_meta = nuraft::snapshot::deserialize(*snp_buf);
try
{
@ -332,31 +319,22 @@ int KeeperStateMachine::read_logical_snp_obj(
{
LOG_DEBUG(log, "Reading snapshot {} obj_id {}", s.get_last_log_idx(), obj_id);
if (obj_id == 0) /// Fake snapshot required by NuRaft at startup
std::lock_guard lock(snapshots_lock);
/// Our snapshot is not equal to required. Maybe we still creating it in the background.
/// Let's wait and NuRaft will retry this call.
if (s.get_last_log_idx() != latest_snapshot_meta->get_last_log_idx())
{
data_out = nuraft::buffer::alloc(sizeof(int32_t));
nuraft::buffer_serializer bs(data_out);
bs.put_i32(0);
is_last_obj = false;
LOG_WARNING(log, "Required to apply snapshot with last log index {}, but our last log index is {}. Will ignore this one and retry",
s.get_last_log_idx(), latest_snapshot_meta->get_last_log_idx());
return -1;
}
else
if (bufferFromFile(log, latest_snapshot_path, data_out))
{
std::lock_guard lock(snapshots_lock);
/// Our snapshot is not equal to required. Maybe we still creating it in the background.
/// Let's wait and NuRaft will retry this call.
if (s.get_last_log_idx() != latest_snapshot_meta->get_last_log_idx())
{
LOG_WARNING(log, "Required to apply snapshot with last log index {}, but our last log index is {}. Will ignore this one and retry",
s.get_last_log_idx(), latest_snapshot_meta->get_last_log_idx());
return -1;
}
if (bufferFromFile(log, latest_snapshot_path, data_out))
{
LOG_WARNING(log, "Error reading snapshot {} from {}", s.get_last_log_idx(), latest_snapshot_path);
return -1;
}
is_last_obj = true;
LOG_WARNING(log, "Error reading snapshot {} from {}", s.get_last_log_idx(), latest_snapshot_path);
return -1;
}
is_last_obj = true;
return 1;
}

View File

@ -84,7 +84,7 @@ public:
bool shouldStartAsFollower() const
{
std::lock_guard lock(configuration_wrapper_mutex);
return configuration_wrapper.servers_start_as_followers.count(my_server_id);
return configuration_wrapper.servers_start_as_followers.contains(my_server_id);
}
bool isSecure() const

View File

@ -24,7 +24,10 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
static String base64Encode(const String & decoded)
namespace
{
String base64Encode(const String & decoded)
{
std::ostringstream ostr; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
ostr.exceptions(std::ios::failbit);
@ -35,7 +38,7 @@ static String base64Encode(const String & decoded)
return ostr.str();
}
static String getSHA1(const String & userdata)
String getSHA1(const String & userdata)
{
Poco::SHA1Engine engine;
engine.update(userdata);
@ -43,14 +46,14 @@ static String getSHA1(const String & userdata)
return String{digest_id.begin(), digest_id.end()};
}
static String generateDigest(const String & userdata)
String generateDigest(const String & userdata)
{
std::vector<String> user_password;
boost::split(user_password, userdata, [](char c) { return c == ':'; });
return user_password[0] + ":" + base64Encode(getSHA1(userdata));
}
static bool checkACL(int32_t permission, const Coordination::ACLs & node_acls, const std::vector<KeeperStorage::AuthID> & session_auths)
bool checkACL(int32_t permission, const Coordination::ACLs & node_acls, const std::vector<KeeperStorage::AuthID> & session_auths)
{
if (node_acls.empty())
return true;
@ -77,7 +80,7 @@ static bool checkACL(int32_t permission, const Coordination::ACLs & node_acls, c
return false;
}
static bool fixupACL(
bool fixupACL(
const std::vector<Coordination::ACL> & request_acls,
const std::vector<KeeperStorage::AuthID> & current_ids,
std::vector<Coordination::ACL> & result_acls)
@ -119,7 +122,7 @@ static bool fixupACL(
return valid_found;
}
static KeeperStorage::ResponsesForSessions processWatchesImpl(const String & path, KeeperStorage::Watches & watches, KeeperStorage::Watches & list_watches, Coordination::Event event_type)
KeeperStorage::ResponsesForSessions processWatchesImpl(const String & path, KeeperStorage::Watches & watches, KeeperStorage::Watches & list_watches, Coordination::Event event_type)
{
KeeperStorage::ResponsesForSessions result;
auto it = watches.find(path);
@ -174,6 +177,25 @@ static KeeperStorage::ResponsesForSessions processWatchesImpl(const String & pat
}
return result;
}
}
void KeeperStorage::Node::setData(String new_data)
{
size_bytes = size_bytes - data.size() + new_data.size();
data = std::move(new_data);
}
void KeeperStorage::Node::addChild(StringRef child_path)
{
size_bytes += sizeof child_path;
children.insert(child_path);
}
void KeeperStorage::Node::removeChild(StringRef child_path)
{
size_bytes -= sizeof child_path;
children.erase(child_path);
}
KeeperStorage::KeeperStorage(int64_t tick_time_ms, const String & superdigest_)
: session_expiry_queue(tick_time_ms)
@ -314,8 +336,8 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr
created_node.stat.numChildren = 0;
created_node.stat.dataLength = request.data.length();
created_node.stat.ephemeralOwner = request.is_ephemeral ? session_id : 0;
created_node.data = request.data;
created_node.is_sequental = request.is_sequential;
created_node.setData(std::move(request.data));
auto [map_key, _] = container.insert(path_created, created_node);
/// Take child path from key owned by map.
@ -327,8 +349,7 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr
container.updateValue(parent_path, [child_path, zxid, &prev_parent_zxid,
parent_cversion, &prev_parent_cversion] (KeeperStorage::Node & parent)
{
parent.children.insert(child_path);
parent.size_bytes += child_path.size;
parent.addChild(child_path);
prev_parent_cversion = parent.stat.cversion;
prev_parent_zxid = parent.stat.pzxid;
@ -363,8 +384,7 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr
--undo_parent.seq_num;
undo_parent.stat.cversion = prev_parent_cversion;
undo_parent.stat.pzxid = prev_parent_zxid;
undo_parent.children.erase(child_path);
undo_parent.size_bytes -= child_path.size;
undo_parent.removeChild(child_path);
});
storage.container.erase(path_created);
@ -409,7 +429,7 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce
else
{
response.stat = it->value.stat;
response.data = it->value.data;
response.data = it->value.getData();
response.error = Coordination::Error::ZOK;
}
@ -498,8 +518,7 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr
{
--parent.stat.numChildren;
++parent.stat.cversion;
parent.children.erase(child_basename);
parent.size_bytes -= child_basename.size;
parent.removeChild(child_basename);
});
response.error = Coordination::Error::ZOK;
@ -520,8 +539,7 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr
{
++parent.stat.numChildren;
--parent.stat.cversion;
parent.children.insert(child_name);
parent.size_bytes += child_name.size;
parent.addChild(child_name);
});
};
}
@ -598,14 +616,13 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce
auto prev_node = it->value;
auto itr = container.updateValue(request.path, [zxid, request, time] (KeeperStorage::Node & value)
auto itr = container.updateValue(request.path, [zxid, request, time] (KeeperStorage::Node & value) mutable
{
value.stat.version++;
value.stat.mzxid = zxid;
value.stat.mtime = time;
value.stat.dataLength = request.data.length();
value.size_bytes = value.size_bytes + request.data.size() - value.data.size();
value.data = request.data;
value.setData(std::move(request.data));
});
container.updateValue(parentPath(request.path), [] (KeeperStorage::Node & parent)
@ -675,9 +692,10 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc
if (path_prefix.empty())
throw DB::Exception("Logical error: path cannot be empty", ErrorCodes::LOGICAL_ERROR);
response.names.reserve(it->value.children.size());
const auto & children = it->value.getChildren();
response.names.reserve(children.size());
for (const auto child : it->value.children)
for (const auto child : children)
response.names.push_back(child.toString());
response.stat = it->value.stat;
@ -856,24 +874,23 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro
for (const auto & sub_request : request.requests)
{
auto sub_zk_request = std::dynamic_pointer_cast<Coordination::ZooKeeperRequest>(sub_request);
if (sub_zk_request->getOpNum() == Coordination::OpNum::Create)
switch (sub_zk_request->getOpNum())
{
concrete_requests.push_back(std::make_shared<KeeperStorageCreateRequestProcessor>(sub_zk_request));
case Coordination::OpNum::Create:
concrete_requests.push_back(std::make_shared<KeeperStorageCreateRequestProcessor>(sub_zk_request));
break;
case Coordination::OpNum::Remove:
concrete_requests.push_back(std::make_shared<KeeperStorageRemoveRequestProcessor>(sub_zk_request));
break;
case Coordination::OpNum::Set:
concrete_requests.push_back(std::make_shared<KeeperStorageSetRequestProcessor>(sub_zk_request));
break;
case Coordination::OpNum::Check:
concrete_requests.push_back(std::make_shared<KeeperStorageCheckRequestProcessor>(sub_zk_request));
break;
default:
throw DB::Exception(ErrorCodes::BAD_ARGUMENTS, "Illegal command as part of multi ZooKeeper request {}", sub_zk_request->getOpNum());
}
else if (sub_zk_request->getOpNum() == Coordination::OpNum::Remove)
{
concrete_requests.push_back(std::make_shared<KeeperStorageRemoveRequestProcessor>(sub_zk_request));
}
else if (sub_zk_request->getOpNum() == Coordination::OpNum::Set)
{
concrete_requests.push_back(std::make_shared<KeeperStorageSetRequestProcessor>(sub_zk_request));
}
else if (sub_zk_request->getOpNum() == Coordination::OpNum::Check)
{
concrete_requests.push_back(std::make_shared<KeeperStorageCheckRequestProcessor>(sub_zk_request));
}
else
throw DB::Exception(ErrorCodes::BAD_ARGUMENTS, "Illegal command as part of multi ZooKeeper request {}", sub_zk_request->getOpNum());
}
}
@ -1092,8 +1109,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordina
--parent.stat.numChildren;
++parent.stat.cversion;
auto base_name = getBaseName(ephemeral_path);
parent.children.erase(base_name);
parent.size_bytes -= base_name.size;
parent.removeChild(base_name);
});
container.erase(ephemeral_path);

View File

@ -32,28 +32,38 @@ public:
struct Node
{
String data;
uint64_t acl_id = 0; /// 0 -- no ACL by default
bool is_sequental = false;
Coordination::Stat stat{};
int32_t seq_num = 0;
ChildrenSet children{};
uint64_t size_bytes; // save size to avoid calculate every time
Node()
{
size_bytes = sizeof(size_bytes);
size_bytes += data.size();
size_bytes += sizeof(acl_id);
size_bytes += sizeof(is_sequental);
size_bytes += sizeof(stat);
size_bytes += sizeof(seq_num);
}
Node() : size_bytes(sizeof(Node)) { }
/// Object memory size
uint64_t sizeInBytes() const
{
return size_bytes;
}
void setData(String new_data);
const auto & getData() const noexcept
{
return data;
}
void addChild(StringRef child_path);
void removeChild(StringRef child_path);
const auto & getChildren() const noexcept
{
return children;
}
private:
String data;
ChildrenSet children{};
};
struct ResponseForSession
@ -104,7 +114,7 @@ public:
/// Mapping session_id -> set of ephemeral nodes paths
Ephemerals ephemerals;
/// Mapping sessuib_id -> set of watched nodes paths
/// Mapping session_id -> set of watched nodes paths
SessionAndWatcher sessions_and_watchers;
/// Expiration queue for session, allows to get dead sessions at some point of time
SessionExpiryQueue session_expiry_queue;

View File

@ -80,7 +80,7 @@ private:
approximate_data_size += value_size;
if (!snapshot_mode)
{
approximate_data_size += key_size;
approximate_data_size -= key_size;
approximate_data_size -= old_value_size;
}
}
@ -132,7 +132,6 @@ public:
if (!it)
{
ListElem elem{copyStringInArena(arena, key), value, current_version};
auto itr = list.insert(list.end(), std::move(elem));
bool inserted;
@ -228,7 +227,7 @@ public:
/// We in snapshot mode but updating some node which is already more
/// fresh than snapshot distance. So it will not participate in
/// snapshot and we don't need to copy it.
if (snapshot_mode && list_itr->version <= snapshot_up_to_version)
if (list_itr->version <= snapshot_up_to_version)
{
auto elem_copy = *(list_itr);
list_itr->active_in_map = false;

View File

@ -98,7 +98,9 @@ int64_t deserializeStorageData(KeeperStorage & storage, ReadBuffer & in, Poco::L
while (path != "/")
{
KeeperStorage::Node node{};
Coordination::read(node.data, in);
String data;
Coordination::read(data, in);
node.setData(std::move(data));
Coordination::read(node.acl_id, in);
/// Deserialize stat
@ -117,7 +119,7 @@ int64_t deserializeStorageData(KeeperStorage & storage, ReadBuffer & in, Poco::L
Coordination::read(node.stat.pzxid, in);
if (!path.empty())
{
node.stat.dataLength = node.data.length();
node.stat.dataLength = node.getData().length();
node.seq_num = node.stat.cversion;
storage.container.insertOrReplace(path, node);
@ -137,7 +139,7 @@ int64_t deserializeStorageData(KeeperStorage & storage, ReadBuffer & in, Poco::L
if (itr.key != "/")
{
auto parent_path = parentPath(itr.key);
storage.container.updateValue(parent_path, [path = itr.key] (KeeperStorage::Node & value) { value.children.insert(getBaseName(path)); value.stat.numChildren++; });
storage.container.updateValue(parent_path, [path = itr.key] (KeeperStorage::Node & value) { value.addChild(getBaseName(path)); value.stat.numChildren++; });
}
}

View File

@ -946,6 +946,8 @@ TEST_P(CoordinationTest, SnapshotableHashMapDataSize)
EXPECT_EQ(hello.getApproximateDataSize(), 9);
hello.updateValue("hello", [](IntNode & value) { value = 2; });
EXPECT_EQ(hello.getApproximateDataSize(), 9);
hello.insertOrReplace("hello", 3);
EXPECT_EQ(hello.getApproximateDataSize(), 9);
hello.erase("hello");
EXPECT_EQ(hello.getApproximateDataSize(), 0);
@ -958,6 +960,8 @@ TEST_P(CoordinationTest, SnapshotableHashMapDataSize)
EXPECT_EQ(hello.getApproximateDataSize(), 9);
hello.updateValue("hello", [](IntNode & value) { value = 2; });
EXPECT_EQ(hello.getApproximateDataSize(), 18);
hello.insertOrReplace("hello", 1);
EXPECT_EQ(hello.getApproximateDataSize(), 27);
hello.clearOutdatedNodes();
EXPECT_EQ(hello.getApproximateDataSize(), 9);