Fix compatibility

This commit is contained in:
Antonio Andelic 2022-07-21 09:31:06 +00:00
parent 179d04518a
commit 7d30ab80c4
5 changed files with 57 additions and 16 deletions

View File

@ -39,9 +39,11 @@ int mainEntryClickHouseKeeperConverter(int argc, char ** argv)
try
{
DB::KeeperStorage storage(500, "", true);
DB::KeeperStorage storage(500, "", true, false);
DB::deserializeKeeperStorageFromSnapshotsDir(storage, options["zookeeper-snapshots-dir"].as<std::string>(), logger);
storage.initializeSystemNodes();
DB::deserializeLogsAndApplyToStorage(storage, options["zookeeper-logs-dir"].as<std::string>(), logger);
DB::SnapshotMetadataPtr snapshot_meta = std::make_shared<DB::SnapshotMetadata>(storage.getZXID(), 1, std::make_shared<nuraft::cluster_config>());
DB::KeeperStorageSnapshot snapshot(&storage, snapshot_meta);

View File

@ -13,6 +13,7 @@
#include <filesystem>
#include <memory>
#include <Common/logger_useful.h>
#include <Coordination/KeeperConstants.h>
namespace DB
{
@ -188,6 +189,7 @@ void KeeperStorageSnapshot::serialize(const KeeperStorageSnapshot & snapshot, Wr
{
const auto & path = it->key;
const auto & node = it->value;
/// Benign race condition possible while taking snapshot: NuRaft decide to create snapshot at some log id
/// and only after some time we lock storage and enable snapshot mode. So snapshot_container_size can be
/// slightly bigger than required.
@ -323,6 +325,7 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial
readBinary(path, in);
KeeperStorage::Node node{};
readNode(node, in, current_version, storage.acl_map);
storage.container.insertOrReplace(path, node);
if (node.stat.ephemeralOwner != 0)
storage.ephemerals[node.stat.ephemeralOwner].insert(path);
@ -583,8 +586,9 @@ SnapshotDeserializationResult KeeperSnapshotManager::deserializeSnapshotFromBuff
compressed_reader = std::make_unique<CompressedReadBuffer>(*reader);
SnapshotDeserializationResult result;
result.storage = std::make_unique<KeeperStorage>(storage_tick_time, superdigest, digest_enabled);
result.storage = std::make_unique<KeeperStorage>(storage_tick_time, superdigest, digest_enabled, false);
KeeperStorageSnapshot::deserialize(result, *compressed_reader);
result.storage->initializeSystemNodes();
return result;
}

View File

@ -227,15 +227,27 @@ void KeeperStorage::Node::shallowCopy(const KeeperStorage::Node & other)
cached_digest = other.cached_digest;
}
KeeperStorage::KeeperStorage(int64_t tick_time_ms, const String & superdigest_, const bool digest_enabled_)
KeeperStorage::KeeperStorage(
int64_t tick_time_ms, const String & superdigest_, const bool digest_enabled_, const bool initialize_system_nodes)
: session_expiry_queue(tick_time_ms), digest_enabled(digest_enabled_), superdigest(superdigest_)
{
Node root_node;
container.insert("/", root_node);
addDigest(root_node, "/");
if (initialize_system_nodes)
initializeSystemNodes();
}
void KeeperStorage::initializeSystemNodes()
{
if (initialized)
throw Exception(ErrorCodes::LOGICAL_ERROR, "KeeperStorage system nodes initialized twice");
const auto create_system_node = [&](const auto & path, auto data)
{
auto node_it = container.find(path);
if (node_it == container.end())
{
// we update numChildren during preprocessing so and createNode is called during
// commit so we need to update it manually here
@ -247,6 +259,17 @@ KeeperStorage::KeeperStorage(int64_t tick_time_ms, const String & superdigest_,
}
);
createNode(path, std::move(data), {}, false, {});
}
else
{
container.updateValue(
path,
[data = std::move(data)](KeeperStorage::Node & node)
{
node.setData(std::move(data));
}
);
}
};
create_system_node(keeper_system_path, "");
@ -254,6 +277,8 @@ KeeperStorage::KeeperStorage(int64_t tick_time_ms, const String & superdigest_,
assert(keeper_api_version_path.starts_with(keeper_system_path));
auto api_version_data = toString(static_cast<uint8_t>(current_keeper_api_version));
create_system_node(keeper_api_version_path, std::move(api_version_data));
initialized = true;
}
template <class... Ts>
@ -1847,6 +1872,9 @@ void KeeperStorage::preprocessRequest(
bool check_acl,
std::optional<Digest> digest)
{
if (!initialized)
throw Exception(ErrorCodes::LOGICAL_ERROR, "KeeperStorage system nodes are not initialized");
int64_t last_zxid = getNextZXID() - 1;
if (uncommitted_transactions.empty())
@ -1932,6 +1960,9 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
bool check_acl,
bool is_local)
{
if (!initialized)
throw Exception(ErrorCodes::LOGICAL_ERROR, "KeeperStorage system nodes are not initialized");
if (new_last_zxid)
{
if (uncommitted_transactions.empty())

View File

@ -340,7 +340,11 @@ public:
const String superdigest;
KeeperStorage(int64_t tick_time_ms, const String & superdigest_, bool digest_enabled_);
bool initialized{false};
KeeperStorage(int64_t tick_time_ms, const String & superdigest_, bool digest_enabled_, bool initialize_system_nodes = true);
void initializeSystemNodes();
/// Allocate new session id with the specified timeouts
int64_t getSessionID(int64_t session_timeout_ms)

View File

@ -137,7 +137,7 @@ int64_t deserializeStorageData(KeeperStorage & storage, ReadBuffer & in, Poco::L
for (const auto & itr : storage.container)
{
if (itr.key != "/" && !itr.key.toView().starts_with(keeper_system_path))
if (itr.key != "/")
{
auto parent_path = parentPath(itr.key);
storage.container.updateValue(parent_path, [path = itr.key] (KeeperStorage::Node & value) { value.addChild(getBaseName(path)); value.stat.numChildren++; });