mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 01:25:21 +00:00
Revert "Add support for preprocessing ZooKeeper operations in clickhouse-keeper
"
This commit is contained in:
parent
b50d4549c9
commit
6a962549d5
@ -15,7 +15,6 @@
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <boost/algorithm/string.hpp>
|
||||
#include <libnuraft/cluster_config.hxx>
|
||||
#include <libnuraft/log_val_type.hxx>
|
||||
#include <libnuraft/raft_server.hxx>
|
||||
#include <Poco/Util/AbstractConfiguration.h>
|
||||
#include <Poco/Util/Application.h>
|
||||
@ -316,22 +315,6 @@ void KeeperServer::startup(const Poco::Util::AbstractConfiguration & config, boo
|
||||
|
||||
state_manager->loadLogStore(state_machine->last_commit_index() + 1, coordination_settings->reserved_log_items);
|
||||
|
||||
auto log_store = state_manager->load_log_store();
|
||||
auto next_log_idx = log_store->next_slot();
|
||||
if (next_log_idx > 0 && next_log_idx > state_machine->last_commit_index())
|
||||
{
|
||||
auto log_entries = log_store->log_entries(state_machine->last_commit_index() + 1, next_log_idx);
|
||||
|
||||
auto idx = state_machine->last_commit_index() + 1;
|
||||
for (const auto & entry : *log_entries)
|
||||
{
|
||||
if (entry && entry->get_val_type() == nuraft::log_val_type::app_log)
|
||||
state_machine->preprocess(idx, entry->get_buf());
|
||||
|
||||
++idx;
|
||||
}
|
||||
}
|
||||
|
||||
loadLatestConfig();
|
||||
|
||||
last_local_config = state_manager->parseServersConfiguration(config, true).cluster_config;
|
||||
|
@ -44,6 +44,7 @@ namespace
|
||||
else /// backward compatibility
|
||||
request_for_session.time = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
|
||||
|
||||
|
||||
return request_for_session;
|
||||
}
|
||||
}
|
||||
@ -113,21 +114,6 @@ void KeeperStateMachine::init()
|
||||
storage = std::make_unique<KeeperStorage>(coordination_settings->dead_session_check_period_ms.totalMilliseconds(), superdigest);
|
||||
}
|
||||
|
||||
nuraft::ptr<nuraft::buffer> KeeperStateMachine::pre_commit(uint64_t log_idx, nuraft::buffer & data)
|
||||
{
|
||||
preprocess(log_idx, data);
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
void KeeperStateMachine::preprocess(const uint64_t log_idx, nuraft::buffer & data)
|
||||
{
|
||||
auto request_for_session = parseRequest(data);
|
||||
if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID)
|
||||
return;
|
||||
std::lock_guard lock(storage_and_responses_lock);
|
||||
storage->preprocessRequest(request_for_session.request, request_for_session.session_id, request_for_session.time, log_idx);
|
||||
}
|
||||
|
||||
nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, nuraft::buffer & data)
|
||||
{
|
||||
auto request_for_session = parseRequest(data);
|
||||
@ -196,12 +182,6 @@ void KeeperStateMachine::commit_config(const uint64_t /* log_idx */, nuraft::ptr
|
||||
cluster_config = ClusterConfig::deserialize(*tmp);
|
||||
}
|
||||
|
||||
void KeeperStateMachine::rollback(uint64_t log_idx, nuraft::buffer & /*data*/)
|
||||
{
|
||||
std::lock_guard lock(storage_and_responses_lock);
|
||||
storage->rollbackRequest(log_idx);
|
||||
}
|
||||
|
||||
nuraft::ptr<nuraft::snapshot> KeeperStateMachine::last_snapshot()
|
||||
{
|
||||
/// Just return the latest snapshot.
|
||||
@ -363,7 +343,7 @@ void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSessi
|
||||
{
|
||||
/// Pure local request, just process it with storage
|
||||
std::lock_guard lock(storage_and_responses_lock);
|
||||
auto responses = storage->processRequest(request_for_session.request, request_for_session.session_id, request_for_session.time, std::nullopt, true /*check_acl*/, true /*is_local*/);
|
||||
auto responses = storage->processRequest(request_for_session.request, request_for_session.session_id, request_for_session.time, std::nullopt);
|
||||
for (const auto & response : responses)
|
||||
if (!responses_queue.push(response))
|
||||
throw Exception(ErrorCodes::SYSTEM_ERROR, "Could not push response with session id {} into responses queue", response.session_id);
|
||||
|
@ -27,16 +27,16 @@ public:
|
||||
/// Read state from the latest snapshot
|
||||
void init();
|
||||
|
||||
void preprocess(uint64_t log_idx, nuraft::buffer & data);
|
||||
|
||||
nuraft::ptr<nuraft::buffer> pre_commit(uint64_t log_idx, nuraft::buffer & data) override;
|
||||
/// Currently not supported
|
||||
nuraft::ptr<nuraft::buffer> pre_commit(const uint64_t /*log_idx*/, nuraft::buffer & /*data*/) override { return nullptr; }
|
||||
|
||||
nuraft::ptr<nuraft::buffer> commit(const uint64_t log_idx, nuraft::buffer & data) override; /// NOLINT
|
||||
|
||||
/// Save new cluster config to our snapshot (copy of the config stored in StateManager)
|
||||
void commit_config(const uint64_t log_idx, nuraft::ptr<nuraft::cluster_config> & new_conf) override; /// NOLINT
|
||||
|
||||
void rollback(uint64_t log_idx, nuraft::buffer & data) override;
|
||||
/// Currently not supported
|
||||
void rollback(const uint64_t /*log_idx*/, nuraft::buffer & /*data*/) override {}
|
||||
|
||||
uint64_t last_commit_index() override { return last_committed_idx; }
|
||||
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -1,14 +1,14 @@
|
||||
#pragma once
|
||||
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
#include <Coordination/ACLMap.h>
|
||||
#include <Common/ZooKeeper/IKeeper.h>
|
||||
#include <Common/ConcurrentBoundedQueue.h>
|
||||
#include <Common/ZooKeeper/ZooKeeperCommon.h>
|
||||
#include <Coordination/SessionExpiryQueue.h>
|
||||
#include <Coordination/ACLMap.h>
|
||||
#include <Coordination/SnapshotableHashTable.h>
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
#include <Common/ConcurrentBoundedQueue.h>
|
||||
#include <Common/ZooKeeper/IKeeper.h>
|
||||
#include <Common/ZooKeeper/ZooKeeperCommon.h>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
#include <absl/container/flat_hash_set.h>
|
||||
|
||||
@ -29,6 +29,7 @@ struct KeeperStorageSnapshot;
|
||||
class KeeperStorage
|
||||
{
|
||||
public:
|
||||
|
||||
struct Node
|
||||
{
|
||||
uint64_t acl_id = 0; /// 0 -- no ACL by default
|
||||
@ -40,18 +41,26 @@ public:
|
||||
Node() : size_bytes(sizeof(Node)) { }
|
||||
|
||||
/// Object memory size
|
||||
uint64_t sizeInBytes() const { return size_bytes; }
|
||||
uint64_t sizeInBytes() const
|
||||
{
|
||||
return size_bytes;
|
||||
}
|
||||
|
||||
void setData(String new_data);
|
||||
|
||||
const auto & getData() const noexcept { return data; }
|
||||
const auto & getData() const noexcept
|
||||
{
|
||||
return data;
|
||||
}
|
||||
|
||||
void addChild(StringRef child_path);
|
||||
|
||||
void removeChild(StringRef child_path);
|
||||
|
||||
const auto & getChildren() const noexcept { return children; }
|
||||
|
||||
const auto & getChildren() const noexcept
|
||||
{
|
||||
return children;
|
||||
}
|
||||
private:
|
||||
String data;
|
||||
ChildrenSet children{};
|
||||
@ -76,7 +85,10 @@ public:
|
||||
std::string scheme;
|
||||
std::string id;
|
||||
|
||||
bool operator==(const AuthID & other) const { return scheme == other.scheme && id == other.id; }
|
||||
bool operator==(const AuthID & other) const
|
||||
{
|
||||
return scheme == other.scheme && id == other.id;
|
||||
}
|
||||
};
|
||||
|
||||
using RequestsForSessions = std::vector<RequestForSession>;
|
||||
@ -100,146 +112,6 @@ public:
|
||||
/// container.
|
||||
Container container;
|
||||
|
||||
// Applying ZooKeeper request to storage consists of two steps:
|
||||
// - preprocessing which, instead of applying the changes directly to storage,
|
||||
// generates deltas with those changes, denoted with the request ZXID
|
||||
// - processing which applies deltas with the correct ZXID to the storage
|
||||
//
|
||||
// Delta objects allow us two things:
|
||||
// - fetch the latest, uncommitted state of an object by getting the committed
|
||||
// state of that same object from the storage and applying the deltas
|
||||
// in the same order as they are defined
|
||||
// - quickly commit the changes to the storage
|
||||
struct CreateNodeDelta
|
||||
{
|
||||
Coordination::Stat stat;
|
||||
bool is_ephemeral;
|
||||
bool is_sequental;
|
||||
Coordination::ACLs acls;
|
||||
String data;
|
||||
};
|
||||
|
||||
struct RemoveNodeDelta
|
||||
{
|
||||
int32_t version{-1};
|
||||
};
|
||||
|
||||
struct UpdateNodeDelta
|
||||
{
|
||||
std::function<void(Node &)> update_fn;
|
||||
int32_t version{-1};
|
||||
};
|
||||
|
||||
struct SetACLDelta
|
||||
{
|
||||
Coordination::ACLs acls;
|
||||
int32_t version{-1};
|
||||
};
|
||||
|
||||
struct ErrorDelta
|
||||
{
|
||||
Coordination::Error error;
|
||||
};
|
||||
|
||||
struct FailedMultiDelta
|
||||
{
|
||||
std::vector<Coordination::Error> error_codes;
|
||||
};
|
||||
|
||||
// Denotes end of a subrequest in multi request
|
||||
struct SubDeltaEnd
|
||||
{
|
||||
};
|
||||
|
||||
struct AddAuthDelta
|
||||
{
|
||||
int64_t session_id;
|
||||
AuthID auth_id;
|
||||
};
|
||||
|
||||
using Operation
|
||||
= std::variant<CreateNodeDelta, RemoveNodeDelta, UpdateNodeDelta, SetACLDelta, AddAuthDelta, ErrorDelta, SubDeltaEnd, FailedMultiDelta>;
|
||||
|
||||
struct Delta
|
||||
{
|
||||
Delta(String path_, int64_t zxid_, Operation operation_) : path(std::move(path_)), zxid(zxid_), operation(std::move(operation_)) { }
|
||||
|
||||
Delta(int64_t zxid_, Coordination::Error error) : Delta("", zxid_, ErrorDelta{error}) { }
|
||||
|
||||
Delta(int64_t zxid_, Operation subdelta) : Delta("", zxid_, subdelta) { }
|
||||
|
||||
String path;
|
||||
int64_t zxid;
|
||||
Operation operation;
|
||||
};
|
||||
|
||||
struct UncommittedState
|
||||
{
|
||||
explicit UncommittedState(KeeperStorage & storage_) : storage(storage_) { }
|
||||
|
||||
template <typename Visitor>
|
||||
void applyDeltas(StringRef path, const Visitor & visitor) const
|
||||
{
|
||||
for (const auto & delta : deltas)
|
||||
{
|
||||
if (path.empty() || delta.path == path)
|
||||
std::visit(visitor, delta.operation);
|
||||
}
|
||||
}
|
||||
|
||||
bool hasACL(int64_t session_id, bool is_local, std::function<bool(const AuthID &)> predicate)
|
||||
{
|
||||
for (const auto & session_auth : storage.session_and_auth[session_id])
|
||||
{
|
||||
if (predicate(session_auth))
|
||||
return true;
|
||||
}
|
||||
|
||||
if (is_local)
|
||||
return false;
|
||||
|
||||
|
||||
for (const auto & delta : deltas)
|
||||
{
|
||||
if (const auto * auth_delta = std::get_if<KeeperStorage::AddAuthDelta>(&delta.operation);
|
||||
auth_delta && auth_delta->session_id == session_id && predicate(auth_delta->auth_id))
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
std::shared_ptr<Node> getNode(StringRef path);
|
||||
bool hasNode(StringRef path) const;
|
||||
Coordination::ACLs getACLs(StringRef path) const;
|
||||
|
||||
std::deque<Delta> deltas;
|
||||
KeeperStorage & storage;
|
||||
};
|
||||
|
||||
UncommittedState uncommitted_state{*this};
|
||||
|
||||
Coordination::Error commit(int64_t zxid, int64_t session_id);
|
||||
|
||||
// Create node in the storage
|
||||
// Returns false if it failed to create the node, true otherwise
|
||||
// We don't care about the exact failure because we should've caught it during preprocessing
|
||||
bool createNode(
|
||||
const std::string & path,
|
||||
String data,
|
||||
const Coordination::Stat & stat,
|
||||
bool is_sequental,
|
||||
bool is_ephemeral,
|
||||
Coordination::ACLs node_acls,
|
||||
int64_t session_id);
|
||||
|
||||
// Remove node in the storage
|
||||
// Returns false if it failed to remove the node, true otherwise
|
||||
// We don't care about the exact failure because we should've caught it during preprocessing
|
||||
bool removeNode(const std::string & path, int32_t version);
|
||||
|
||||
bool checkACL(StringRef path, int32_t permissions, int64_t session_id, bool is_local);
|
||||
|
||||
/// Mapping session_id -> set of ephemeral nodes paths
|
||||
Ephemerals ephemerals;
|
||||
/// Mapping session_id -> set of watched nodes paths
|
||||
@ -258,12 +130,15 @@ public:
|
||||
|
||||
/// Currently active watches (node_path -> subscribed sessions)
|
||||
Watches watches;
|
||||
Watches list_watches; /// Watches for 'list' request (watches on children).
|
||||
Watches list_watches; /// Watches for 'list' request (watches on children).
|
||||
|
||||
void clearDeadWatches(int64_t session_id);
|
||||
|
||||
/// Get current zxid
|
||||
int64_t getZXID() const { return zxid; }
|
||||
int64_t getZXID() const
|
||||
{
|
||||
return zxid;
|
||||
}
|
||||
|
||||
const String superdigest;
|
||||
|
||||
@ -287,53 +162,78 @@ public:
|
||||
|
||||
/// Process user request and return response.
|
||||
/// check_acl = false only when converting data from ZooKeeper.
|
||||
ResponsesForSessions processRequest(
|
||||
const Coordination::ZooKeeperRequestPtr & request,
|
||||
int64_t session_id,
|
||||
int64_t time,
|
||||
std::optional<int64_t> new_last_zxid,
|
||||
bool check_acl = true,
|
||||
bool is_local = false);
|
||||
void preprocessRequest(
|
||||
const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, int64_t time, int64_t new_last_zxid, bool check_acl = true);
|
||||
void rollbackRequest(int64_t rollback_zxid);
|
||||
ResponsesForSessions processRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, int64_t time, std::optional<int64_t> new_last_zxid, bool check_acl = true);
|
||||
|
||||
void finalize();
|
||||
|
||||
/// Set of methods for creating snapshots
|
||||
|
||||
/// Turn on snapshot mode, so data inside Container is not deleted, but replaced with new version.
|
||||
void enableSnapshotMode(size_t up_to_version) { container.enableSnapshotMode(up_to_version); }
|
||||
void enableSnapshotMode(size_t up_to_version)
|
||||
{
|
||||
container.enableSnapshotMode(up_to_version);
|
||||
|
||||
}
|
||||
|
||||
/// Turn off snapshot mode.
|
||||
void disableSnapshotMode() { container.disableSnapshotMode(); }
|
||||
void disableSnapshotMode()
|
||||
{
|
||||
container.disableSnapshotMode();
|
||||
}
|
||||
|
||||
Container::const_iterator getSnapshotIteratorBegin() const { return container.begin(); }
|
||||
Container::const_iterator getSnapshotIteratorBegin() const
|
||||
{
|
||||
return container.begin();
|
||||
}
|
||||
|
||||
/// Clear outdated data from internal container.
|
||||
void clearGarbageAfterSnapshot() { container.clearOutdatedNodes(); }
|
||||
void clearGarbageAfterSnapshot()
|
||||
{
|
||||
container.clearOutdatedNodes();
|
||||
}
|
||||
|
||||
/// Get all active sessions
|
||||
const SessionAndTimeout & getActiveSessions() const { return session_and_timeout; }
|
||||
const SessionAndTimeout & getActiveSessions() const
|
||||
{
|
||||
return session_and_timeout;
|
||||
}
|
||||
|
||||
/// Get all dead sessions
|
||||
std::vector<int64_t> getDeadSessions() const { return session_expiry_queue.getExpiredSessions(); }
|
||||
std::vector<int64_t> getDeadSessions() const
|
||||
{
|
||||
return session_expiry_queue.getExpiredSessions();
|
||||
}
|
||||
|
||||
/// Introspection functions mostly used in 4-letter commands
|
||||
uint64_t getNodesCount() const { return container.size(); }
|
||||
uint64_t getNodesCount() const
|
||||
{
|
||||
return container.size();
|
||||
}
|
||||
|
||||
uint64_t getApproximateDataSize() const { return container.getApproximateDataSize(); }
|
||||
uint64_t getApproximateDataSize() const
|
||||
{
|
||||
return container.getApproximateDataSize();
|
||||
}
|
||||
|
||||
uint64_t getArenaDataSize() const { return container.keyArenaSize(); }
|
||||
uint64_t getArenaDataSize() const
|
||||
{
|
||||
return container.keyArenaSize();
|
||||
}
|
||||
|
||||
|
||||
uint64_t getTotalWatchesCount() const;
|
||||
|
||||
uint64_t getWatchedPathsCount() const { return watches.size() + list_watches.size(); }
|
||||
uint64_t getWatchedPathsCount() const
|
||||
{
|
||||
return watches.size() + list_watches.size();
|
||||
}
|
||||
|
||||
uint64_t getSessionsWithWatchesCount() const;
|
||||
|
||||
uint64_t getSessionWithEphemeralNodesCount() const { return ephemerals.size(); }
|
||||
uint64_t getSessionWithEphemeralNodesCount() const
|
||||
{
|
||||
return ephemerals.size();
|
||||
}
|
||||
uint64_t getTotalEphemeralNodesCount() const;
|
||||
|
||||
void dumpWatches(WriteBufferFromOwnString & buf) const;
|
||||
|
@ -12,6 +12,7 @@ public:
|
||||
WriteBufferFromNuraftBuffer();
|
||||
|
||||
nuraft::ptr<nuraft::buffer> getBuffer();
|
||||
bool isFinished() const { return finalized; }
|
||||
|
||||
~WriteBufferFromNuraftBuffer() override;
|
||||
|
||||
|
@ -520,7 +520,6 @@ bool deserializeTxn(KeeperStorage & storage, ReadBuffer & in, Poco::Logger * /*l
|
||||
if (request->getOpNum() == Coordination::OpNum::Multi && hasErrorsInMultiRequest(request))
|
||||
return true;
|
||||
|
||||
storage.preprocessRequest(request, session_id, time, zxid, /* check_acl = */ false);
|
||||
storage.processRequest(request, session_id, time, zxid, /* check_acl = */ false);
|
||||
}
|
||||
}
|
||||
|
@ -1,8 +1,6 @@
|
||||
#include <chrono>
|
||||
#include <gtest/gtest.h>
|
||||
#include "Common/ZooKeeper/IKeeper.h"
|
||||
|
||||
#include "Coordination/KeeperStorage.h"
|
||||
#include "config_core.h"
|
||||
|
||||
#if USE_NURAFT
|
||||
@ -1263,7 +1261,6 @@ void testLogAndStateMachine(Coordination::CoordinationSettingsPtr settings, uint
|
||||
changelog.append(entry);
|
||||
changelog.end_of_append_batch(0, 0);
|
||||
|
||||
state_machine->pre_commit(i, changelog.entry_at(i)->get_buf());
|
||||
state_machine->commit(i, changelog.entry_at(i)->get_buf());
|
||||
bool snapshot_created = false;
|
||||
if (i % settings->snapshot_distance == 0)
|
||||
@ -1308,7 +1305,6 @@ void testLogAndStateMachine(Coordination::CoordinationSettingsPtr settings, uint
|
||||
|
||||
for (size_t i = restore_machine->last_commit_index() + 1; i < restore_changelog.next_slot(); ++i)
|
||||
{
|
||||
restore_machine->pre_commit(i, changelog.entry_at(i)->get_buf());
|
||||
restore_machine->commit(i, changelog.entry_at(i)->get_buf());
|
||||
}
|
||||
|
||||
@ -1411,7 +1407,6 @@ TEST_P(CoordinationTest, TestEphemeralNodeRemove)
|
||||
request_c->path = "/hello";
|
||||
request_c->is_ephemeral = true;
|
||||
auto entry_c = getLogEntryFromZKRequest(0, 1, request_c);
|
||||
state_machine->pre_commit(1, entry_c->get_buf());
|
||||
state_machine->commit(1, entry_c->get_buf());
|
||||
const auto & storage = state_machine->getStorage();
|
||||
|
||||
@ -1420,7 +1415,6 @@ TEST_P(CoordinationTest, TestEphemeralNodeRemove)
|
||||
request_d->path = "/hello";
|
||||
/// Delete from other session
|
||||
auto entry_d = getLogEntryFromZKRequest(0, 2, request_d);
|
||||
state_machine->pre_commit(2, entry_d->get_buf());
|
||||
state_machine->commit(2, entry_d->get_buf());
|
||||
|
||||
EXPECT_EQ(storage.ephemerals.size(), 0);
|
||||
@ -1783,130 +1777,6 @@ TEST_P(CoordinationTest, TestLogGap)
|
||||
EXPECT_EQ(changelog1.next_slot(), 61);
|
||||
}
|
||||
|
||||
template <typename ResponseType>
|
||||
ResponseType getSingleResponse(const auto & responses)
|
||||
{
|
||||
EXPECT_FALSE(responses.empty());
|
||||
return dynamic_cast<ResponseType &>(*responses[0].response);
|
||||
}
|
||||
|
||||
TEST_P(CoordinationTest, TestUncommittedStateBasicCrud)
|
||||
{
|
||||
using namespace DB;
|
||||
using namespace Coordination;
|
||||
|
||||
DB::KeeperStorage storage{500, ""};
|
||||
|
||||
constexpr std::string_view path = "/test";
|
||||
|
||||
const auto get_committed_data = [&]() -> std::optional<String>
|
||||
{
|
||||
auto request = std::make_shared<ZooKeeperGetRequest>();
|
||||
request->path = path;
|
||||
auto responses = storage.processRequest(request, 0, 0, std::nullopt, true, true);
|
||||
const auto & get_response = getSingleResponse<ZooKeeperGetResponse>(responses);
|
||||
|
||||
if (get_response.error != Error::ZOK)
|
||||
return std::nullopt;
|
||||
|
||||
return get_response.data;
|
||||
};
|
||||
|
||||
const auto preprocess_get = [&](int64_t zxid)
|
||||
{
|
||||
auto get_request = std::make_shared<ZooKeeperGetRequest>();
|
||||
get_request->path = path;
|
||||
storage.preprocessRequest(get_request, 0, 0, zxid);
|
||||
return get_request;
|
||||
};
|
||||
|
||||
const auto create_request = std::make_shared<ZooKeeperCreateRequest>();
|
||||
create_request->path = path;
|
||||
create_request->data = "initial_data";
|
||||
storage.preprocessRequest(create_request, 0, 0, 1);
|
||||
storage.preprocessRequest(create_request, 0, 0, 2);
|
||||
|
||||
ASSERT_FALSE(get_committed_data());
|
||||
|
||||
const auto after_create_get = preprocess_get(3);
|
||||
|
||||
ASSERT_FALSE(get_committed_data());
|
||||
|
||||
const auto set_request = std::make_shared<ZooKeeperSetRequest>();
|
||||
set_request->path = path;
|
||||
set_request->data = "new_data";
|
||||
storage.preprocessRequest(set_request, 0, 0, 4);
|
||||
|
||||
const auto after_set_get = preprocess_get(5);
|
||||
|
||||
ASSERT_FALSE(get_committed_data());
|
||||
|
||||
const auto remove_request = std::make_shared<ZooKeeperRemoveRequest>();
|
||||
remove_request->path = path;
|
||||
storage.preprocessRequest(remove_request, 0, 0, 6);
|
||||
storage.preprocessRequest(remove_request, 0, 0, 7);
|
||||
|
||||
const auto after_remove_get = preprocess_get(8);
|
||||
|
||||
ASSERT_FALSE(get_committed_data());
|
||||
|
||||
{
|
||||
const auto responses = storage.processRequest(create_request, 0, 0, 1);
|
||||
const auto & create_response = getSingleResponse<ZooKeeperCreateResponse>(responses);
|
||||
ASSERT_EQ(create_response.error, Error::ZOK);
|
||||
}
|
||||
|
||||
{
|
||||
const auto responses = storage.processRequest(create_request, 0, 0, 2);
|
||||
const auto & create_response = getSingleResponse<ZooKeeperCreateResponse>(responses);
|
||||
ASSERT_EQ(create_response.error, Error::ZNODEEXISTS);
|
||||
}
|
||||
|
||||
{
|
||||
const auto responses = storage.processRequest(after_create_get, 0, 0, 3);
|
||||
const auto & get_response = getSingleResponse<ZooKeeperGetResponse>(responses);
|
||||
ASSERT_EQ(get_response.error, Error::ZOK);
|
||||
ASSERT_EQ(get_response.data, "initial_data");
|
||||
}
|
||||
|
||||
ASSERT_EQ(get_committed_data(), "initial_data");
|
||||
|
||||
{
|
||||
const auto responses = storage.processRequest(set_request, 0, 0, 4);
|
||||
const auto & create_response = getSingleResponse<ZooKeeperSetResponse>(responses);
|
||||
ASSERT_EQ(create_response.error, Error::ZOK);
|
||||
}
|
||||
|
||||
{
|
||||
const auto responses = storage.processRequest(after_set_get, 0, 0, 5);
|
||||
const auto & get_response = getSingleResponse<ZooKeeperGetResponse>(responses);
|
||||
ASSERT_EQ(get_response.error, Error::ZOK);
|
||||
ASSERT_EQ(get_response.data, "new_data");
|
||||
}
|
||||
|
||||
ASSERT_EQ(get_committed_data(), "new_data");
|
||||
|
||||
{
|
||||
const auto responses = storage.processRequest(remove_request, 0, 0, 6);
|
||||
const auto & create_response = getSingleResponse<ZooKeeperRemoveResponse>(responses);
|
||||
ASSERT_EQ(create_response.error, Error::ZOK);
|
||||
}
|
||||
|
||||
{
|
||||
const auto responses = storage.processRequest(remove_request, 0, 0, 7);
|
||||
const auto & create_response = getSingleResponse<ZooKeeperRemoveResponse>(responses);
|
||||
ASSERT_EQ(create_response.error, Error::ZNONODE);
|
||||
}
|
||||
|
||||
{
|
||||
const auto responses = storage.processRequest(after_remove_get, 0, 0, 8);
|
||||
const auto & get_response = getSingleResponse<ZooKeeperGetResponse>(responses);
|
||||
ASSERT_EQ(get_response.error, Error::ZNONODE);
|
||||
}
|
||||
|
||||
ASSERT_FALSE(get_committed_data());
|
||||
}
|
||||
|
||||
|
||||
INSTANTIATE_TEST_SUITE_P(CoordinationTestSuite,
|
||||
CoordinationTest,
|
||||
|
Loading…
Reference in New Issue
Block a user