Merge pull request #21677 from ClickHouse/jepsen_for_nukeeper

Jepsen for nukeeper
This commit is contained in:
alesapin 2021-03-27 10:18:42 +03:00 committed by GitHub
commit ced6d8e6bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
36 changed files with 1731 additions and 75 deletions

2
contrib/NuRaft vendored

@ -1 +1 @@
Subproject commit 3d3683e77753cfe015a05fae95ddf418e19f59e1
Subproject commit 70468326ad5d72e9497944838484c591dae054ea

View File

@ -31,6 +31,8 @@ struct Settings;
M(UInt64, rotate_log_storage_interval, 10000, "How many records will be stored in one log storage file", 0) \
M(UInt64, snapshots_to_keep, 3, "How many compressed snapshots to keep on disk", 0) \
M(UInt64, stale_log_gap, 10000, "When node became stale and should receive snapshots from leader", 0) \
M(UInt64, fresh_log_gap, 200, "When node became fresh", 0) \
M(Bool, quorum_reads, false, "Execute read requests as writes through whole RAFT consesus with similar speed", 0) \
M(Bool, force_sync, true, " Call fsync on each change in RAFT changelog", 0)
DECLARE_SETTINGS_TRAITS(CoordinationSettingsTraits, LIST_OF_COORDINATION_SETTINGS)

View File

@ -30,6 +30,8 @@ NuKeeperServer::NuKeeperServer(
, state_manager(nuraft::cs_new<NuKeeperStateManager>(server_id, "test_keeper_server", config, coordination_settings))
, responses_queue(responses_queue_)
{
if (coordination_settings->quorum_reads)
LOG_WARNING(&Poco::Logger::get("NuKeeperServer"), "Quorum reads enabled, NuKeeper will work slower.");
}
void NuKeeperServer::startup()
@ -59,6 +61,7 @@ void NuKeeperServer::startup()
params.reserved_log_items_ = coordination_settings->reserved_log_items;
params.snapshot_distance_ = coordination_settings->snapshot_distance;
params.stale_log_gap_ = coordination_settings->stale_log_gap;
params.fresh_log_gap_ = coordination_settings->fresh_log_gap;
params.client_req_timeout_ = coordination_settings->operation_timeout_ms.totalMilliseconds();
params.auto_forwarding_ = coordination_settings->auto_forwarding;
params.auto_forwarding_req_timeout_ = coordination_settings->operation_timeout_ms.totalMilliseconds() * 2;
@ -106,7 +109,7 @@ nuraft::ptr<nuraft::buffer> getZooKeeperLogEntry(int64_t session_id, const Coord
void NuKeeperServer::putRequest(const NuKeeperStorage::RequestForSession & request_for_session)
{
auto [session_id, request] = request_for_session;
if (isLeaderAlive() && request->isReadRequest())
if (!coordination_settings->quorum_reads && isLeaderAlive() && request->isReadRequest())
{
state_machine->processReadRequest(request_for_session);
}
@ -185,6 +188,9 @@ nuraft::cb_func::ReturnCode NuKeeperServer::callbackFunc(nuraft::cb_func::Type t
if (next_index < last_commited || next_index - last_commited <= 1)
commited_store = true;
if (initialized_flag)
return nuraft::cb_func::ReturnCode::Ok;
auto set_initialized = [this] ()
{
std::unique_lock lock(initialized_mutex);
@ -196,10 +202,27 @@ nuraft::cb_func::ReturnCode NuKeeperServer::callbackFunc(nuraft::cb_func::Type t
{
case nuraft::cb_func::BecomeLeader:
{
if (commited_store) /// We become leader and store is empty, ready to serve requests
/// We become leader and store is empty or we already committed it
if (commited_store || initial_batch_committed)
set_initialized();
return nuraft::cb_func::ReturnCode::Ok;
}
case nuraft::cb_func::BecomeFollower:
case nuraft::cb_func::GotAppendEntryReqFromLeader:
{
if (isLeaderAlive())
{
auto leader_index = raft_instance->get_leader_committed_log_idx();
auto our_index = raft_instance->get_committed_log_idx();
/// This may happen when we start RAFT cluster from scratch.
/// Node first became leader, and after that some other node became leader.
/// BecameFresh for this node will not be called because it was already fresh
/// when it was leader.
if (leader_index < our_index + coordination_settings->fresh_log_gap)
set_initialized();
}
return nuraft::cb_func::ReturnCode::Ok;
}
case nuraft::cb_func::BecomeFresh:
{
set_initialized(); /// We are fresh follower, ready to serve requests.
@ -209,6 +232,7 @@ nuraft::cb_func::ReturnCode NuKeeperServer::callbackFunc(nuraft::cb_func::Type t
{
if (isLeader()) /// We have committed our log store and we are leader, ready to serve requests.
set_initialized();
initial_batch_committed = true;
return nuraft::cb_func::ReturnCode::Ok;
}
default: /// ignore other events
@ -220,7 +244,7 @@ void NuKeeperServer::waitInit()
{
std::unique_lock lock(initialized_mutex);
int64_t timeout = coordination_settings->startup_timeout.totalMilliseconds();
if (!initialized_cv.wait_for(lock, std::chrono::milliseconds(timeout), [&] { return initialized_flag; }))
if (!initialized_cv.wait_for(lock, std::chrono::milliseconds(timeout), [&] { return initialized_flag.load(); }))
throw Exception(ErrorCodes::RAFT_ERROR, "Failed to wait RAFT initialization");
}

View File

@ -31,8 +31,9 @@ private:
ResponsesQueue & responses_queue;
std::mutex initialized_mutex;
bool initialized_flag = false;
std::atomic<bool> initialized_flag = false;
std::condition_variable initialized_cv;
std::atomic<bool> initial_batch_committed = false;
nuraft::cb_func::ReturnCode callbackFunc(nuraft::cb_func::Type type, nuraft::cb_func::Param * param);

View File

@ -241,9 +241,10 @@ NuKeeperStorageSnapshot::~NuKeeperStorageSnapshot()
storage->disableSnapshotMode();
}
NuKeeperSnapshotManager::NuKeeperSnapshotManager(const std::string & snapshots_path_, size_t snapshots_to_keep_)
NuKeeperSnapshotManager::NuKeeperSnapshotManager(const std::string & snapshots_path_, size_t snapshots_to_keep_, size_t storage_tick_time_)
: snapshots_path(snapshots_path_)
, snapshots_to_keep(snapshots_to_keep_)
, storage_tick_time(storage_tick_time_)
{
namespace fs = std::filesystem;
@ -325,22 +326,24 @@ nuraft::ptr<nuraft::buffer> NuKeeperSnapshotManager::serializeSnapshotToBuffer(c
return writer.getBuffer();
}
SnapshotMetadataPtr NuKeeperSnapshotManager::deserializeSnapshotFromBuffer(NuKeeperStorage * storage, nuraft::ptr<nuraft::buffer> buffer)
SnapshotMetaAndStorage NuKeeperSnapshotManager::deserializeSnapshotFromBuffer(nuraft::ptr<nuraft::buffer> buffer) const
{
ReadBufferFromNuraftBuffer reader(buffer);
CompressedReadBuffer compressed_reader(reader);
return NuKeeperStorageSnapshot::deserialize(*storage, compressed_reader);
auto storage = std::make_unique<NuKeeperStorage>(storage_tick_time);
auto snapshot_metadata = NuKeeperStorageSnapshot::deserialize(*storage, compressed_reader);
return std::make_pair(snapshot_metadata, std::move(storage));
}
SnapshotMetadataPtr NuKeeperSnapshotManager::restoreFromLatestSnapshot(NuKeeperStorage * storage)
SnapshotMetaAndStorage NuKeeperSnapshotManager::restoreFromLatestSnapshot()
{
if (existing_snapshots.empty())
return nullptr;
return {};
auto buffer = deserializeLatestSnapshotBufferFromDisk();
if (!buffer)
return nullptr;
return deserializeSnapshotFromBuffer(storage, buffer);
return {};
return deserializeSnapshotFromBuffer(buffer);
}
void NuKeeperSnapshotManager::removeOutdatedSnapshotsIfNeeded()

View File

@ -40,17 +40,20 @@ public:
using NuKeeperStorageSnapshotPtr = std::shared_ptr<NuKeeperStorageSnapshot>;
using CreateSnapshotCallback = std::function<void(NuKeeperStorageSnapshotPtr &&)>;
using SnapshotMetaAndStorage = std::pair<SnapshotMetadataPtr, NuKeeperStoragePtr>;
class NuKeeperSnapshotManager
{
public:
NuKeeperSnapshotManager(const std::string & snapshots_path_, size_t snapshots_to_keep_);
NuKeeperSnapshotManager(const std::string & snapshots_path_, size_t snapshots_to_keep_, size_t storage_tick_time_ = 500);
SnapshotMetadataPtr restoreFromLatestSnapshot(NuKeeperStorage * storage);
SnapshotMetaAndStorage restoreFromLatestSnapshot();
static nuraft::ptr<nuraft::buffer> serializeSnapshotToBuffer(const NuKeeperStorageSnapshot & snapshot);
std::string serializeSnapshotBufferToDisk(nuraft::buffer & buffer, size_t up_to_log_idx);
static SnapshotMetadataPtr deserializeSnapshotFromBuffer(NuKeeperStorage * storage, nuraft::ptr<nuraft::buffer> buffer);
SnapshotMetaAndStorage deserializeSnapshotFromBuffer(nuraft::ptr<nuraft::buffer> buffer) const;
nuraft::ptr<nuraft::buffer> deserializeSnapshotBufferFromDisk(size_t up_to_log_idx) const;
nuraft::ptr<nuraft::buffer> deserializeLatestSnapshotBufferFromDisk();
@ -74,6 +77,7 @@ private:
const std::string snapshots_path;
const size_t snapshots_to_keep;
std::map<size_t, std::string> existing_snapshots;
size_t storage_tick_time;
};
struct CreateSnapshotTask

View File

@ -4,6 +4,7 @@
#include <IO/ReadHelpers.h>
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <Coordination/NuKeeperSnapshotManager.h>
#include <future>
namespace DB
{
@ -37,8 +38,7 @@ NuKeeperStorage::RequestForSession parseRequest(nuraft::buffer & data)
NuKeeperStateMachine::NuKeeperStateMachine(ResponsesQueue & responses_queue_, SnapshotsQueue & snapshots_queue_, const std::string & snapshots_path_, const CoordinationSettingsPtr & coordination_settings_)
: coordination_settings(coordination_settings_)
, storage(coordination_settings->dead_session_check_period_ms.totalMilliseconds())
, snapshot_manager(snapshots_path_, coordination_settings->snapshots_to_keep)
, snapshot_manager(snapshots_path_, coordination_settings->snapshots_to_keep, coordination_settings->dead_session_check_period_ms.totalMicroseconds())
, responses_queue(responses_queue_)
, snapshots_queue(snapshots_queue_)
, last_committed_idx(0)
@ -60,7 +60,7 @@ void NuKeeperStateMachine::init()
try
{
latest_snapshot_buf = snapshot_manager.deserializeSnapshotBufferFromDisk(latest_log_index);
latest_snapshot_meta = snapshot_manager.deserializeSnapshotFromBuffer(&storage, latest_snapshot_buf);
std::tie(latest_snapshot_meta, storage) = snapshot_manager.deserializeSnapshotFromBuffer(latest_snapshot_buf);
last_committed_idx = latest_snapshot_meta->get_last_log_idx();
loaded = true;
break;
@ -83,6 +83,9 @@ void NuKeeperStateMachine::init()
{
LOG_DEBUG(log, "No existing snapshots, last committed log index {}", last_committed_idx);
}
if (!storage)
storage = std::make_unique<NuKeeperStorage>(coordination_settings->dead_session_check_period_ms.totalMilliseconds());
}
nuraft::ptr<nuraft::buffer> NuKeeperStateMachine::commit(const size_t log_idx, nuraft::buffer & data)
@ -96,7 +99,7 @@ nuraft::ptr<nuraft::buffer> NuKeeperStateMachine::commit(const size_t log_idx, n
nuraft::buffer_serializer bs(response);
{
std::lock_guard lock(storage_lock);
session_id = storage.getSessionID(session_timeout_ms);
session_id = storage->getSessionID(session_timeout_ms);
bs.put_i64(session_id);
}
LOG_DEBUG(log, "Session ID response {} with timeout {}", session_id, session_timeout_ms);
@ -109,7 +112,7 @@ nuraft::ptr<nuraft::buffer> NuKeeperStateMachine::commit(const size_t log_idx, n
NuKeeperStorage::ResponsesForSessions responses_for_sessions;
{
std::lock_guard lock(storage_lock);
responses_for_sessions = storage.processRequest(request_for_session.request, request_for_session.session_id, log_idx);
responses_for_sessions = storage->processRequest(request_for_session.request, request_for_session.session_id, log_idx);
for (auto & response_for_session : responses_for_sessions)
responses_queue.push(response_for_session);
}
@ -133,7 +136,7 @@ bool NuKeeperStateMachine::apply_snapshot(nuraft::snapshot & s)
{
std::lock_guard lock(storage_lock);
snapshot_manager.deserializeSnapshotFromBuffer(&storage, latest_snapshot_ptr);
std::tie(latest_snapshot_meta, storage) = snapshot_manager.deserializeSnapshotFromBuffer(latest_snapshot_ptr);
}
last_committed_idx = s.get_last_log_idx();
return true;
@ -157,7 +160,7 @@ void NuKeeperStateMachine::create_snapshot(
CreateSnapshotTask snapshot_task;
{
std::lock_guard lock(storage_lock);
snapshot_task.snapshot = std::make_shared<NuKeeperStorageSnapshot>(&storage, snapshot_meta_copy);
snapshot_task.snapshot = std::make_shared<NuKeeperStorageSnapshot>(storage.get(), snapshot_meta_copy);
}
snapshot_task.create_snapshot = [this, when_done] (NuKeeperStorageSnapshotPtr && snapshot)
@ -179,7 +182,7 @@ void NuKeeperStateMachine::create_snapshot(
{
/// Must do it with lock (clearing elements from list)
std::lock_guard lock(storage_lock);
storage.clearGarbageAfterSnapshot();
storage->clearGarbageAfterSnapshot();
/// Destroy snapshot with lock
snapshot.reset();
LOG_TRACE(log, "Cleared garbage after snapshot");
@ -214,7 +217,7 @@ void NuKeeperStateMachine::save_logical_snp_obj(
if (obj_id == 0)
{
std::lock_guard lock(storage_lock);
NuKeeperStorageSnapshot snapshot(&storage, s.get_last_log_idx());
NuKeeperStorageSnapshot snapshot(storage.get(), s.get_last_log_idx());
cloned_buffer = snapshot_manager.serializeSnapshotToBuffer(snapshot);
}
else
@ -225,7 +228,28 @@ void NuKeeperStateMachine::save_logical_snp_obj(
nuraft::ptr<nuraft::buffer> snp_buf = s.serialize();
cloned_meta = nuraft::snapshot::deserialize(*snp_buf);
auto result_path = snapshot_manager.serializeSnapshotBufferToDisk(*cloned_buffer, s.get_last_log_idx());
/// Sometimes NuRaft can call save and create snapshots from different threads
/// at once. To avoid race conditions we serialize snapshots through snapshots_queue
/// TODO: make something better
CreateSnapshotTask snapshot_task;
std::shared_ptr<std::promise<void>> waiter = std::make_shared<std::promise<void>>();
auto future = waiter->get_future();
snapshot_task.snapshot = nullptr;
snapshot_task.create_snapshot = [this, waiter, cloned_buffer, log_idx = s.get_last_log_idx()] (NuKeeperStorageSnapshotPtr &&)
{
try
{
auto result_path = snapshot_manager.serializeSnapshotBufferToDisk(*cloned_buffer, log_idx);
LOG_DEBUG(log, "Saved snapshot {} to path {}", log_idx, result_path);
}
catch (...)
{
tryLogCurrentException(log);
}
waiter->set_value();
};
snapshots_queue.push(std::move(snapshot_task));
future.wait();
{
std::lock_guard lock(snapshots_lock);
@ -233,7 +257,6 @@ void NuKeeperStateMachine::save_logical_snp_obj(
latest_snapshot_meta = cloned_meta;
}
LOG_DEBUG(log, "Created snapshot {} with path {}", s.get_last_log_idx(), result_path);
obj_id++;
}
@ -271,7 +294,7 @@ void NuKeeperStateMachine::processReadRequest(const NuKeeperStorage::RequestForS
NuKeeperStorage::ResponsesForSessions responses;
{
std::lock_guard lock(storage_lock);
responses = storage.processRequest(request_for_session.request, request_for_session.session_id, std::nullopt);
responses = storage->processRequest(request_for_session.request, request_for_session.session_id, std::nullopt);
}
for (const auto & response : responses)
responses_queue.push(response);
@ -280,13 +303,13 @@ void NuKeeperStateMachine::processReadRequest(const NuKeeperStorage::RequestForS
std::unordered_set<int64_t> NuKeeperStateMachine::getDeadSessions()
{
std::lock_guard lock(storage_lock);
return storage.getDeadSessions();
return storage->getDeadSessions();
}
void NuKeeperStateMachine::shutdownStorage()
{
std::lock_guard lock(storage_lock);
storage.finalize();
storage->finalize();
}
}

View File

@ -52,7 +52,7 @@ public:
NuKeeperStorage & getStorage()
{
return storage;
return *storage;
}
void processReadRequest(const NuKeeperStorage::RequestForSession & request_for_session);
@ -68,7 +68,7 @@ private:
CoordinationSettingsPtr coordination_settings;
NuKeeperStorage storage;
NuKeeperStoragePtr storage;
NuKeeperSnapshotManager snapshot_manager;

View File

@ -233,7 +233,7 @@ struct NuKeeperStorageGetRequest final : public NuKeeperStorageRequest
struct NuKeeperStorageRemoveRequest final : public NuKeeperStorageRequest
{
using NuKeeperStorageRequest::NuKeeperStorageRequest;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(NuKeeperStorage::Container & container, NuKeeperStorage::Ephemerals & ephemerals, int64_t /*zxid*/, int64_t session_id) const override
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(NuKeeperStorage::Container & container, NuKeeperStorage::Ephemerals & ephemerals, int64_t /*zxid*/, int64_t /*session_id*/) const override
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperRemoveResponse & response = dynamic_cast<Coordination::ZooKeeperRemoveResponse &>(*response_ptr);
@ -257,7 +257,12 @@ struct NuKeeperStorageRemoveRequest final : public NuKeeperStorageRequest
{
auto prev_node = it->value;
if (prev_node.stat.ephemeralOwner != 0)
ephemerals[session_id].erase(request.path);
{
auto ephemerals_it = ephemerals.find(prev_node.stat.ephemeralOwner);
ephemerals_it->second.erase(request.path);
if (ephemerals_it->second.empty())
ephemerals.erase(ephemerals_it);
}
auto child_basename = getBaseName(it->key);
container.updateValue(parentPath(request.path), [&child_basename] (NuKeeperStorage::Node & parent)
@ -271,10 +276,10 @@ struct NuKeeperStorageRemoveRequest final : public NuKeeperStorageRequest
container.erase(request.path);
undo = [prev_node, &container, &ephemerals, session_id, path = request.path, child_basename]
undo = [prev_node, &container, &ephemerals, path = request.path, child_basename]
{
if (prev_node.stat.ephemeralOwner != 0)
ephemerals[session_id].emplace(path);
ephemerals[prev_node.stat.ephemeralOwner].emplace(path);
container.insert(path, prev_node);
container.updateValue(parentPath(path), [&child_basename] (NuKeeperStorage::Node & parent)
@ -377,7 +382,6 @@ struct NuKeeperStorageSetRequest final : public NuKeeperStorageRequest
{
return processWatchesImpl(zk_request->getPath(), watches, list_watches, Coordination::Event::CHANGED);
}
};
struct NuKeeperStorageListRequest final : public NuKeeperStorageRequest
@ -641,6 +645,13 @@ NuKeeperStorage::ResponsesForSessions NuKeeperStorage::processRequest(const Coor
for (const auto & ephemeral_path : it->second)
{
container.erase(ephemeral_path);
container.updateValue(parentPath(ephemeral_path), [&ephemeral_path] (NuKeeperStorage::Node & parent)
{
--parent.stat.numChildren;
++parent.stat.cversion;
parent.children.erase(getBaseName(ephemeral_path));
});
auto responses = processWatchesImpl(ephemeral_path, watches, list_watches, Coordination::Event::DELETED);
results.insert(results.end(), responses.begin(), responses.end());
}

View File

@ -131,4 +131,6 @@ public:
}
};
using NuKeeperStoragePtr = std::unique_ptr<NuKeeperStorage>;
}

View File

@ -132,6 +132,10 @@ void NuKeeperStorageDispatcher::initialize(const Poco::Util::AbstractConfigurati
coordination_settings->loadFromConfig("test_keeper_server.coordination_settings", config);
request_thread = ThreadFromGlobalPool([this] { requestThread(); });
responses_thread = ThreadFromGlobalPool([this] { responseThread(); });
snapshot_thread = ThreadFromGlobalPool([this] { snapshotThread(); });
server = std::make_unique<NuKeeperServer>(myid, coordination_settings, config, responses_queue, snapshots_queue);
try
{
@ -148,10 +152,8 @@ void NuKeeperStorageDispatcher::initialize(const Poco::Util::AbstractConfigurati
throw;
}
request_thread = ThreadFromGlobalPool([this] { requestThread(); });
responses_thread = ThreadFromGlobalPool([this] { responseThread(); });
session_cleaner_thread = ThreadFromGlobalPool([this] { sessionCleanerTask(); });
snapshot_thread = ThreadFromGlobalPool([this] { snapshotThread(); });
LOG_DEBUG(log, "Dispatcher initialized");
}

View File

@ -897,25 +897,25 @@ TEST(CoordinationTest, TestStorageSnapshotSimple)
manager.serializeSnapshotBufferToDisk(*buf, 2);
EXPECT_TRUE(fs::exists("./snapshots/snapshot_2.bin"));
DB::NuKeeperStorage restored_storage(500);
auto debuf = manager.deserializeSnapshotBufferFromDisk(2);
manager.deserializeSnapshotFromBuffer(&restored_storage, debuf);
EXPECT_EQ(restored_storage.container.size(), 3);
EXPECT_EQ(restored_storage.container.getValue("/").children.size(), 1);
EXPECT_EQ(restored_storage.container.getValue("/hello").children.size(), 1);
EXPECT_EQ(restored_storage.container.getValue("/hello/somepath").children.size(), 0);
auto [snapshot_meta, restored_storage] = manager.deserializeSnapshotFromBuffer(debuf);
EXPECT_EQ(restored_storage.container.getValue("/").data, "");
EXPECT_EQ(restored_storage.container.getValue("/hello").data, "world");
EXPECT_EQ(restored_storage.container.getValue("/hello/somepath").data, "somedata");
EXPECT_EQ(restored_storage.session_id_counter, 7);
EXPECT_EQ(restored_storage.zxid, 2);
EXPECT_EQ(restored_storage.ephemerals.size(), 2);
EXPECT_EQ(restored_storage.ephemerals[3].size(), 1);
EXPECT_EQ(restored_storage.ephemerals[1].size(), 1);
EXPECT_EQ(restored_storage.session_and_timeout.size(), 2);
EXPECT_EQ(restored_storage->container.size(), 3);
EXPECT_EQ(restored_storage->container.getValue("/").children.size(), 1);
EXPECT_EQ(restored_storage->container.getValue("/hello").children.size(), 1);
EXPECT_EQ(restored_storage->container.getValue("/hello/somepath").children.size(), 0);
EXPECT_EQ(restored_storage->container.getValue("/").data, "");
EXPECT_EQ(restored_storage->container.getValue("/hello").data, "world");
EXPECT_EQ(restored_storage->container.getValue("/hello/somepath").data, "somedata");
EXPECT_EQ(restored_storage->session_id_counter, 7);
EXPECT_EQ(restored_storage->zxid, 2);
EXPECT_EQ(restored_storage->ephemerals.size(), 2);
EXPECT_EQ(restored_storage->ephemerals[3].size(), 1);
EXPECT_EQ(restored_storage->ephemerals[1].size(), 1);
EXPECT_EQ(restored_storage->session_and_timeout.size(), 2);
}
TEST(CoordinationTest, TestStorageSnapshotMoreWrites)
@ -946,15 +946,14 @@ TEST(CoordinationTest, TestStorageSnapshotMoreWrites)
manager.serializeSnapshotBufferToDisk(*buf, 50);
EXPECT_TRUE(fs::exists("./snapshots/snapshot_50.bin"));
DB::NuKeeperStorage restored_storage(500);
auto debuf = manager.deserializeSnapshotBufferFromDisk(50);
manager.deserializeSnapshotFromBuffer(&restored_storage, debuf);
auto [meta, restored_storage] = manager.deserializeSnapshotFromBuffer(debuf);
EXPECT_EQ(restored_storage.container.size(), 51);
EXPECT_EQ(restored_storage->container.size(), 51);
for (size_t i = 0; i < 50; ++i)
{
EXPECT_EQ(restored_storage.container.getValue("/hello_" + std::to_string(i)).data, "world_" + std::to_string(i));
EXPECT_EQ(restored_storage->container.getValue("/hello_" + std::to_string(i)).data, "world_" + std::to_string(i));
}
}
@ -987,14 +986,13 @@ TEST(CoordinationTest, TestStorageSnapshotManySnapshots)
EXPECT_TRUE(fs::exists("./snapshots/snapshot_250.bin"));
DB::NuKeeperStorage restored_storage(500);
manager.restoreFromLatestSnapshot(&restored_storage);
auto [meta, restored_storage] = manager.restoreFromLatestSnapshot();
EXPECT_EQ(restored_storage.container.size(), 251);
EXPECT_EQ(restored_storage->container.size(), 251);
for (size_t i = 0; i < 250; ++i)
{
EXPECT_EQ(restored_storage.container.getValue("/hello_" + std::to_string(i)).data, "world_" + std::to_string(i));
EXPECT_EQ(restored_storage->container.getValue("/hello_" + std::to_string(i)).data, "world_" + std::to_string(i));
}
}
@ -1040,12 +1038,11 @@ TEST(CoordinationTest, TestStorageSnapshotMode)
EXPECT_FALSE(storage.container.contains("/hello_" + std::to_string(i)));
}
DB::NuKeeperStorage restored_storage(500);
manager.restoreFromLatestSnapshot(&restored_storage);
auto [meta, restored_storage] = manager.restoreFromLatestSnapshot();
for (size_t i = 0; i < 50; ++i)
{
EXPECT_EQ(restored_storage.container.getValue("/hello_" + std::to_string(i)).data, "world_" + std::to_string(i));
EXPECT_EQ(restored_storage->container.getValue("/hello_" + std::to_string(i)).data, "world_" + std::to_string(i));
}
}
@ -1071,8 +1068,7 @@ TEST(CoordinationTest, TestStorageSnapshotBroken)
plain_buf.truncate(34);
plain_buf.sync();
DB::NuKeeperStorage restored_storage(500);
EXPECT_THROW(manager.restoreFromLatestSnapshot(&restored_storage), DB::Exception);
EXPECT_THROW(manager.restoreFromLatestSnapshot(), DB::Exception);
}
nuraft::ptr<nuraft::buffer> getBufferFromZKRequest(int64_t session_id, const Coordination::ZooKeeperRequestPtr & request)
@ -1236,6 +1232,37 @@ TEST(CoordinationTest, TestStateMachineAndLogStore)
}
}
TEST(CoordinationTest, TestEphemeralNodeRemove)
{
using namespace Coordination;
using namespace DB;
ChangelogDirTest snapshots("./snapshots");
CoordinationSettingsPtr settings = std::make_shared<CoordinationSettings>();
ResponsesQueue queue;
SnapshotsQueue snapshots_queue{1};
auto state_machine = std::make_shared<NuKeeperStateMachine>(queue, snapshots_queue, "./snapshots", settings);
state_machine->init();
std::shared_ptr<ZooKeeperCreateRequest> request_c = std::make_shared<ZooKeeperCreateRequest>();
request_c->path = "/hello";
request_c->is_ephemeral = true;
auto entry_c = getLogEntryFromZKRequest(0, 1, request_c);
state_machine->commit(1, entry_c->get_buf());
const auto & storage = state_machine->getStorage();
EXPECT_EQ(storage.ephemerals.size(), 1);
std::shared_ptr<ZooKeeperRemoveRequest> request_d = std::make_shared<ZooKeeperRemoveRequest>();
request_d->path = "/hello";
/// Delete from other session
auto entry_d = getLogEntryFromZKRequest(0, 2, request_d);
state_machine->commit(2, entry_d->get_buf());
EXPECT_EQ(storage.ephemerals.size(), 0);
}
int main(int argc, char ** argv)
{
Poco::AutoPtr<Poco::ConsoleChannel> channel(new Poco::ConsoleChannel(std::cerr));

View File

@ -240,16 +240,10 @@ Poco::Timespan NuKeeperTCPHandler::receiveHandshake()
throw Exception("Unexpected protocol version: " + toString(protocol_version), ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
Coordination::read(last_zxid_seen, *in);
if (last_zxid_seen != 0)
throw Exception("Non zero last_zxid_seen is not supported", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
Coordination::read(timeout_ms, *in);
/// TODO Stop ignoring this value
Coordination::read(previous_session_id, *in);
if (previous_session_id != 0)
throw Exception("Non zero previous session id is not supported", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
Coordination::read(passwd, *in);
int8_t readonly;

13
tests/jepsen.nukeeper/.gitignore vendored Normal file
View File

@ -0,0 +1,13 @@
/target
/classes
/checkouts
profiles.clj
pom.xml
pom.xml.asc
*.jar
*.class
/.lein-*
/.nrepl-port
/.prepl-port
.hgignore
.hg/

View File

@ -0,0 +1,280 @@
Eclipse Public License - v 2.0
THE ACCOMPANYING PROGRAM IS PROVIDED UNDER THE TERMS OF THIS ECLIPSE
PUBLIC LICENSE ("AGREEMENT"). ANY USE, REPRODUCTION OR DISTRIBUTION
OF THE PROGRAM CONSTITUTES RECIPIENT'S ACCEPTANCE OF THIS AGREEMENT.
1. DEFINITIONS
"Contribution" means:
a) in the case of the initial Contributor, the initial content
Distributed under this Agreement, and
b) in the case of each subsequent Contributor:
i) changes to the Program, and
ii) additions to the Program;
where such changes and/or additions to the Program originate from
and are Distributed by that particular Contributor. A Contribution
"originates" from a Contributor if it was added to the Program by
such Contributor itself or anyone acting on such Contributor's behalf.
Contributions do not include changes or additions to the Program that
are not Modified Works.
"Contributor" means any person or entity that Distributes the Program.
"Licensed Patents" mean patent claims licensable by a Contributor which
are necessarily infringed by the use or sale of its Contribution alone
or when combined with the Program.
"Program" means the Contributions Distributed in accordance with this
Agreement.
"Recipient" means anyone who receives the Program under this Agreement
or any Secondary License (as applicable), including Contributors.
"Derivative Works" shall mean any work, whether in Source Code or other
form, that is based on (or derived from) the Program and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship.
"Modified Works" shall mean any work in Source Code or other form that
results from an addition to, deletion from, or modification of the
contents of the Program, including, for purposes of clarity any new file
in Source Code form that contains any contents of the Program. Modified
Works shall not include works that contain only declarations,
interfaces, types, classes, structures, or files of the Program solely
in each case in order to link to, bind by name, or subclass the Program
or Modified Works thereof.
"Distribute" means the acts of a) distributing or b) making available
in any manner that enables the transfer of a copy.
"Source Code" means the form of a Program preferred for making
modifications, including but not limited to software source code,
documentation source, and configuration files.
"Secondary License" means either the GNU General Public License,
Version 2.0, or any later versions of that license, including any
exceptions or additional permissions as identified by the initial
Contributor.
2. GRANT OF RIGHTS
a) Subject to the terms of this Agreement, each Contributor hereby
grants Recipient a non-exclusive, worldwide, royalty-free copyright
license to reproduce, prepare Derivative Works of, publicly display,
publicly perform, Distribute and sublicense the Contribution of such
Contributor, if any, and such Derivative Works.
b) Subject to the terms of this Agreement, each Contributor hereby
grants Recipient a non-exclusive, worldwide, royalty-free patent
license under Licensed Patents to make, use, sell, offer to sell,
import and otherwise transfer the Contribution of such Contributor,
if any, in Source Code or other form. This patent license shall
apply to the combination of the Contribution and the Program if, at
the time the Contribution is added by the Contributor, such addition
of the Contribution causes such combination to be covered by the
Licensed Patents. The patent license shall not apply to any other
combinations which include the Contribution. No hardware per se is
licensed hereunder.
c) Recipient understands that although each Contributor grants the
licenses to its Contributions set forth herein, no assurances are
provided by any Contributor that the Program does not infringe the
patent or other intellectual property rights of any other entity.
Each Contributor disclaims any liability to Recipient for claims
brought by any other entity based on infringement of intellectual
property rights or otherwise. As a condition to exercising the
rights and licenses granted hereunder, each Recipient hereby
assumes sole responsibility to secure any other intellectual
property rights needed, if any. For example, if a third party
patent license is required to allow Recipient to Distribute the
Program, it is Recipient's responsibility to acquire that license
before distributing the Program.
d) Each Contributor represents that to its knowledge it has
sufficient copyright rights in its Contribution, if any, to grant
the copyright license set forth in this Agreement.
e) Notwithstanding the terms of any Secondary License, no
Contributor makes additional grants to any Recipient (other than
those set forth in this Agreement) as a result of such Recipient's
receipt of the Program under the terms of a Secondary License
(if permitted under the terms of Section 3).
3. REQUIREMENTS
3.1 If a Contributor Distributes the Program in any form, then:
a) the Program must also be made available as Source Code, in
accordance with section 3.2, and the Contributor must accompany
the Program with a statement that the Source Code for the Program
is available under this Agreement, and informs Recipients how to
obtain it in a reasonable manner on or through a medium customarily
used for software exchange; and
b) the Contributor may Distribute the Program under a license
different than this Agreement, provided that such license:
i) effectively disclaims on behalf of all other Contributors all
warranties and conditions, express and implied, including
warranties or conditions of title and non-infringement, and
implied warranties or conditions of merchantability and fitness
for a particular purpose;
ii) effectively excludes on behalf of all other Contributors all
liability for damages, including direct, indirect, special,
incidental and consequential damages, such as lost profits;
iii) does not attempt to limit or alter the recipients' rights
in the Source Code under section 3.2; and
iv) requires any subsequent distribution of the Program by any
party to be under a license that satisfies the requirements
of this section 3.
3.2 When the Program is Distributed as Source Code:
a) it must be made available under this Agreement, or if the
Program (i) is combined with other material in a separate file or
files made available under a Secondary License, and (ii) the initial
Contributor attached to the Source Code the notice described in
Exhibit A of this Agreement, then the Program may be made available
under the terms of such Secondary Licenses, and
b) a copy of this Agreement must be included with each copy of
the Program.
3.3 Contributors may not remove or alter any copyright, patent,
trademark, attribution notices, disclaimers of warranty, or limitations
of liability ("notices") contained within the Program from any copy of
the Program which they Distribute, provided that Contributors may add
their own appropriate notices.
4. COMMERCIAL DISTRIBUTION
Commercial distributors of software may accept certain responsibilities
with respect to end users, business partners and the like. While this
license is intended to facilitate the commercial use of the Program,
the Contributor who includes the Program in a commercial product
offering should do so in a manner which does not create potential
liability for other Contributors. Therefore, if a Contributor includes
the Program in a commercial product offering, such Contributor
("Commercial Contributor") hereby agrees to defend and indemnify every
other Contributor ("Indemnified Contributor") against any losses,
damages and costs (collectively "Losses") arising from claims, lawsuits
and other legal actions brought by a third party against the Indemnified
Contributor to the extent caused by the acts or omissions of such
Commercial Contributor in connection with its distribution of the Program
in a commercial product offering. The obligations in this section do not
apply to any claims or Losses relating to any actual or alleged
intellectual property infringement. In order to qualify, an Indemnified
Contributor must: a) promptly notify the Commercial Contributor in
writing of such claim, and b) allow the Commercial Contributor to control,
and cooperate with the Commercial Contributor in, the defense and any
related settlement negotiations. The Indemnified Contributor may
participate in any such claim at its own expense.
For example, a Contributor might include the Program in a commercial
product offering, Product X. That Contributor is then a Commercial
Contributor. If that Commercial Contributor then makes performance
claims, or offers warranties related to Product X, those performance
claims and warranties are such Commercial Contributor's responsibility
alone. Under this section, the Commercial Contributor would have to
defend claims against the other Contributors related to those performance
claims and warranties, and if a court requires any other Contributor to
pay any damages as a result, the Commercial Contributor must pay
those damages.
5. NO WARRANTY
EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, AND TO THE EXTENT
PERMITTED BY APPLICABLE LAW, THE PROGRAM IS PROVIDED ON AN "AS IS"
BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER EXPRESS OR
IMPLIED INCLUDING, WITHOUT LIMITATION, ANY WARRANTIES OR CONDITIONS OF
TITLE, NON-INFRINGEMENT, MERCHANTABILITY OR FITNESS FOR A PARTICULAR
PURPOSE. Each Recipient is solely responsible for determining the
appropriateness of using and distributing the Program and assumes all
risks associated with its exercise of rights under this Agreement,
including but not limited to the risks and costs of program errors,
compliance with applicable laws, damage to or loss of data, programs
or equipment, and unavailability or interruption of operations.
6. DISCLAIMER OF LIABILITY
EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, AND TO THE EXTENT
PERMITTED BY APPLICABLE LAW, NEITHER RECIPIENT NOR ANY CONTRIBUTORS
SHALL HAVE ANY LIABILITY FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING WITHOUT LIMITATION LOST
PROFITS), HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OR DISTRIBUTION OF THE PROGRAM OR THE
EXERCISE OF ANY RIGHTS GRANTED HEREUNDER, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGES.
7. GENERAL
If any provision of this Agreement is invalid or unenforceable under
applicable law, it shall not affect the validity or enforceability of
the remainder of the terms of this Agreement, and without further
action by the parties hereto, such provision shall be reformed to the
minimum extent necessary to make such provision valid and enforceable.
If Recipient institutes patent litigation against any entity
(including a cross-claim or counterclaim in a lawsuit) alleging that the
Program itself (excluding combinations of the Program with other software
or hardware) infringes such Recipient's patent(s), then such Recipient's
rights granted under Section 2(b) shall terminate as of the date such
litigation is filed.
All Recipient's rights under this Agreement shall terminate if it
fails to comply with any of the material terms or conditions of this
Agreement and does not cure such failure in a reasonable period of
time after becoming aware of such noncompliance. If all Recipient's
rights under this Agreement terminate, Recipient agrees to cease use
and distribution of the Program as soon as reasonably practicable.
However, Recipient's obligations under this Agreement and any licenses
granted by Recipient relating to the Program shall continue and survive.
Everyone is permitted to copy and distribute copies of this Agreement,
but in order to avoid inconsistency the Agreement is copyrighted and
may only be modified in the following manner. The Agreement Steward
reserves the right to publish new versions (including revisions) of
this Agreement from time to time. No one other than the Agreement
Steward has the right to modify this Agreement. The Eclipse Foundation
is the initial Agreement Steward. The Eclipse Foundation may assign the
responsibility to serve as the Agreement Steward to a suitable separate
entity. Each new version of the Agreement will be given a distinguishing
version number. The Program (including Contributions) may always be
Distributed subject to the version of the Agreement under which it was
received. In addition, after a new version of the Agreement is published,
Contributor may elect to Distribute the Program (including its
Contributions) under the new version.
Except as expressly stated in Sections 2(a) and 2(b) above, Recipient
receives no rights or licenses to the intellectual property of any
Contributor under this Agreement, whether expressly, by implication,
estoppel or otherwise. All rights in the Program not expressly granted
under this Agreement are reserved. Nothing in this Agreement is intended
to be enforceable by any entity that is not a Contributor or Recipient.
No third-party beneficiary rights are created under this Agreement.
Exhibit A - Form of Secondary Licenses Notice
"This Source Code may also be made available under the following
Secondary Licenses when the conditions for such availability set forth
in the Eclipse Public License, v. 2.0 are satisfied: GNU General Public
License as published by the Free Software Foundation, either version 2
of the License, or (at your option) any later version, with the GNU
Classpath Exception which is available at
https://www.gnu.org/software/classpath/license.html."
Simply including a copy of this Agreement, including this Exhibit A
is not sufficient to license the Source Code under Secondary Licenses.
If it is not possible or desirable to put the notice in a particular
file, then You may include the notice in a location (such as a LICENSE
file in a relevant directory) where a recipient would be likely to
look for such a notice.
You may add additional accurate notices of copyright ownership.

View File

@ -0,0 +1,155 @@
# Jepsen tests ClickHouse Keeper
A Clojure library designed to test ZooKeeper-like implementation inside ClickHouse.
## Test scenarios (workloads)
### CAS register
CAS Register has three operations: read number, write number, compare-and-swap number. This register is simulated as a single ZooKeeper node. Read transforms to ZooKeeper's `getData` request. Write transforms to the `set` request. Compare-and-swap implemented via `getData` + compare in code + `set` new value with `version` from `getData`.
In this test, we use a linearizable checker, so Jepsen validates that history was linearizable. One of the heaviest workloads.
Strictly requires `quorum_reads` to be true.
### Set
Set has two operations: add a number to set and read all values from set. This workload is simulated on a single ZooKeeper node with a string value that represents Clojure set data structure. Add operation very similar to compare-and-swap. We read string value from ZooKeeper node with `getData`, parse it to Clojure's set, add new value to the set and try to write it with the received version.
In this test, Jepsen validates that all successfully added values can be read. Generator for this workload performs only add operations until a timeout and after that tries to read set once.
### Unique IDs
In the Unique IDs workload we have only one operation: generate a new unique number. It's implemented using ZooKeeper's sequential nodes. For each generates request client just creates a new sequential node in ZooKeeper with a fixed prefix. After that cuts the prefix off from the returned path and parses the number from the rest part.
Jepsen checks that all returned IDs were unique.
### Counter
Counter workload has two operations: read counter value and add some number to the counter. Its implementation is quite weird. We add number `N` to the counter creating `N` sequential nodes in a single ZooKeeper transaction. Counter read implemented as `getChildren` ZooKeeper request and count of all returned nodes.
Jepsen checks that counter value lies in the interval of possible value. Strictly requires `quorum_reads` to be true.
### Total queue
Simulates an unordered queue with three operations: enqueue number, dequeue, and drain. Enqueue operation uses `create` request with node name equals to number. `Dequeue` operation is more interesting. We list (`getChildren`) all nodes and remember the parent node version. After that we choose the smallest one and prepare the transaction: `check` parent node version + set an empty value to parent node + delete smalled child node. Drain operation is just `getChildren` on the parent path.
Jepsen checks that all enqueued values were dequeued or drained. Duplicates are allowed because Jepsen doesn't know the value of the unknown-status (`:info`) dequeue operation. So when we try to `dequeue` some element we should return it even if our delete transaction failed with `Connection loss` error.
### Linear queue
Same with the total queue, but without drain operation. Checks linearizability between enqueue and dequeue. Sometimes consume more than 10GB during validation even for very short histories.
## Nemesis
We use almost all standard nemeses with small changes for our storage.
### Random node killer (random-node-killer)
Sleep 5 seconds, kills random node, sleep for 5 seconds, and starts it back.
### All nodes killer (all-nodes-killer)
Kill all nodes at once, sleep for 5 seconds, and starts them back.
### Simple partitioner (simple-partitioner)
Partition one node from others using iptables. No one can see the victim and the victim cannot see anybody.
### Random node stop (random-node-hammer-time)
Send `SIGSTOP` to the random node. Sleep 5 seconds. Send `SIGCONT`.
### All nodes stop (all-nodes-hammer-time)
Send `SIGSTOP` to all nodes. Sleep 5 seconds. Send `SIGCONT`.
### Logs corruptor (logs-corruptor)
Corrupts latest log (change one random byte) in `clickhouse_path/coordination/logs`. Restarts nodes.
### Snapshots corruptor (snapshots-corruptor)
Corrupts latest snapshot (change one random byte) in `clickhouse_path/coordination/snapshots`. Restarts nodes.
### Logs and snapshots corruptor (logs-and-snapshots-corruptor)
Corrupts both the latest log and snapshot. Restarts node.
### Drop data corruptor (drop-data-corruptor)
Drop all data from `clickhouse_path/coordinator`. Restarts node.
### Bridge partitioner (bridge-partitioner)
Two nodes don't see each other but can see another node. The last node can see both.
### Blind node partitioner (blind-node-partitioner)
One of the nodes cannot see another, but they can see it.
### Blind others partitioner (blind-others-partitioner)
Two nodes don't see one node but it can see both.
## Usage
### Dependencies
- leiningen (https://leiningen.org/)
- clojure (https://clojure.org/)
- jvm
### Options for `lein run`
- `test` Run a single test.
- `test-all` Run all available tests from tests-set.
- `-w (--workload)` One of the workloads. Option for a single `test`.
- `--nemesis` One of nemeses. Option for a single `test`.
- `-q (--quorum)` Run test with quorum reads.
- `-r (--rate)` How many operations per second Jepsen will generate in a single thread.
- `-s (--snapshot-distance)` ClickHouse Keeper setting. How often we will create a new snapshot.
- `--stale-log-gap` ClickHosue Keeper setting. A leader will send a snapshot instead of a log to this node if it's committed index less than leaders - this setting value.
- `--reserved-log-items` ClickHouse Keeper setting. How many log items to keep after the snapshot.
- `--ops-per-key` Option for CAS register workload. Total ops that will be generated for a single register.
- `--lightweight-run` Run some lightweight tests without linearizability checks. Option for `tests-all` run.
- `--reuse-binary` Don't download clickhouse binary if it already exists on the node.
- `--clickhouse-source` URL to clickhouse `.deb`, `.tgz` or binary.
- `--time-limit` (in seconds) How long Jepsen will generate new operations.
- `--nodes-file` File with nodes for SSH. Newline separated.
- `--username` SSH username for nodes.
- `--password` SSH password for nodes.
- `--concurrency` How many threads Jepsen will use for concurrent requests.
- `--test-count` How many times to run a single test or how many tests to run from the tests set.
### Examples:
1. Run `Set` workload with `logs-and-snapshots-corruptor` ten times:
```sh
$ lein run test --nodes-file nodes.txt --username root --password '' --time-limit 30 --concurrency 50 -r 50 --workload set --nemesis logs-and-snapshots-corruptor --clickhouse-source 'https://clickhouse-builds.s3.yandex.net/someurl/clickhouse-common-static_21.4.1.6321_amd64.deb' -q --test-count 10 --reuse-binary
```
2. Run ten random tests from `lightweight-run` with some custom Keeper settings:
``` sh
$ lein run test-all --nodes-file nodes.txt --username root --password '' --time-limit 30 --concurrency 50 -r 50 --snapshot-distance 100 --stale-log-gap 100 --reserved-log-items 10 --lightweight-run --clickhouse-source 'someurl' -q --reuse-binary --test-count 10
```
## License
Copyright © 2021 FIXME
This program and the accompanying materials are made available under the
terms of the Eclipse Public License 2.0 which is available at
http://www.eclipse.org/legal/epl-2.0.
This Source Code may also be made available under the following Secondary
Licenses when the conditions for such availability set forth in the Eclipse
Public License, v. 2.0 are satisfied: GNU General Public License as published by
the Free Software Foundation, either version 2 of the License, or (at your
option) any later version, with the GNU Classpath Exception which is available
at https://www.gnu.org/software/classpath/license.html.

View File

@ -0,0 +1,3 @@
# Introduction to jepsen.nukeeper
TODO: write [great documentation](http://jacobian.org/writing/what-to-write/)

View File

@ -0,0 +1,13 @@
(defproject jepsen.nukeeper "0.1.0-SNAPSHOT"
:injections [(.. System (setProperty "zookeeper.request.timeout" "10000"))]
:description "A jepsen tests for ClickHouse NuKeeper"
:url "https://clickhouse.tech/"
:license {:name "EPL-2.0"
:url "https://www.eclipse.org/legal/epl-2.0/"}
:main jepsen.nukeeper.main
:plugins [[lein-cljfmt "0.7.0"]]
:dependencies [[org.clojure/clojure "1.10.1"]
[jepsen "0.2.3"]
[zookeeper-clj "0.9.4"]
[org.apache.zookeeper/zookeeper "3.6.1" :exclusions [org.slf4j/slf4j-log4j12]]]
:repl-options {:init-ns jepsen.nukeeper.main})

View File

@ -0,0 +1 @@
../../../programs/server/config.xml

View File

@ -0,0 +1,3 @@
<yandex>
<listen_host>::</listen_host>
</yandex>

View File

@ -0,0 +1,36 @@
<yandex>
<test_keeper_server>
<tcp_port>9181</tcp_port>
<server_id>{id}</server_id>
<coordination_settings>
<operation_timeout_ms>10000</operation_timeout_ms>
<session_timeout_ms>30000</session_timeout_ms>
<force_sync>false</force_sync>
<startup_timeout>120000</startup_timeout>
<raft_logs_level>trace</raft_logs_level>
<quorum_reads>{quorum_reads}</quorum_reads>
<snapshot_distance>{snapshot_distance}</snapshot_distance>
<stale_log_gap>{stale_log_gap}</stale_log_gap>
<reserved_log_items>{reserved_log_items}</reserved_log_items>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>{srv1}</hostname>
<port>9444</port>
</server>
<server>
<id>2</id>
<hostname>{srv2}</hostname>
<port>9444</port>
</server>
<server>
<id>3</id>
<hostname>{srv3}</hostname>
<port>9444</port>
</server>
</raft_configuration>
</test_keeper_server>
</yandex>

View File

@ -0,0 +1 @@
../../../programs/server/users.xml

View File

@ -0,0 +1,18 @@
(ns jepsen.nukeeper.constants)
(def common-prefix "/home/robot-clickhouse")
(def binary-name "clickhouse")
(def binary-path (str common-prefix "/" binary-name))
(def pid-file-path (str common-prefix "/clickhouse.pid"))
(def data-dir (str common-prefix "/db"))
(def logs-dir (str common-prefix "/logs"))
(def configs-dir (str common-prefix "/config"))
(def sub-configs-dir (str configs-dir "/config.d"))
(def coordination-data-dir (str data-dir "/coordination"))
(def coordination-snapshots-dir (str coordination-data-dir "/snapshots"))
(def coordination-logs-dir (str coordination-data-dir "/logs"))
(def stderr-file (str logs-dir "/stderr.log"))

View File

@ -0,0 +1,50 @@
(ns jepsen.nukeeper.counter
(:require
[clojure.tools.logging :refer :all]
[jepsen
[checker :as checker]
[client :as client]
[generator :as gen]]
[jepsen.nukeeper.utils :refer :all]
[zookeeper :as zk])
(:import (org.apache.zookeeper ZooKeeper KeeperException KeeperException$BadVersionException)))
(defn r [_ _] {:type :invoke, :f :read})
(defn add [_ _] {:type :invoke, :f :add, :value (rand-int 5)})
(defrecord CounterClient [conn nodename]
client/Client
(open! [this test node]
(assoc
(assoc this
:conn (zk-connect node 9181 30000))
:nodename node))
(setup! [this test])
(invoke! [this test op]
(case (:f op)
:read (exec-with-retries 30 (fn []
(assoc op
:type :ok
:value (count (zk-list conn "/")))))
:add (try
(do
(zk-multi-create-many-seq-nodes conn "/seq-" (:value op))
(assoc op :type :ok))
(catch Exception _ (assoc op :type :info, :error :connect-error)))))
(teardown! [_ test])
(close! [_ test]
(zk/close conn)))
(defn workload
"A generator, client, and checker for a set test."
[opts]
{:client (CounterClient. nil nil)
:checker (checker/counter)
:generator (->> (range)
(map (fn [x]
(->> (gen/mix [r add])))))
:final-generator (gen/once {:type :invoke, :f :read, :value nil})})

View File

@ -0,0 +1,128 @@
(ns jepsen.nukeeper.db
(:require [clojure.tools.logging :refer :all]
[jepsen
[control :as c]
[db :as db]
[util :as util :refer [meh]]]
[jepsen.nukeeper.constants :refer :all]
[jepsen.nukeeper.utils :refer :all]
[clojure.java.io :as io]
[jepsen.control.util :as cu]
[jepsen.os.ubuntu :as ubuntu]))
(defn get-clickhouse-sky
[version]
(c/exec :sky :get :-d common-prefix :-N :Backbone version)
(str common-prefix "/clickhouse"))
(defn get-clickhouse-url
[url]
(let [download-result (cu/wget! url)]
(do (c/exec :mv download-result common-prefix)
(str common-prefix "/" download-result))))
(defn download-clickhouse
[source]
(info "Downloading clickhouse from" source)
(cond
(clojure.string/starts-with? source "rbtorrent:") (get-clickhouse-sky source)
(clojure.string/starts-with? source "http") (get-clickhouse-url source)
:else (throw (Exception. (str "Don't know how to download clickhouse from" source)))))
(defn unpack-deb
[path]
(do
(c/exec :dpkg :-x path common-prefix)
(c/exec :rm :-f path)
(c/exec :mv (str common-prefix "/usr/bin/clickhouse") common-prefix)
(c/exec :rm :-rf (str common-prefix "/usr") (str common-prefix "/etc"))))
(defn unpack-tgz
[path]
(do
(c/exec :mkdir :-p (str common-prefix "/unpacked"))
(c/exec :tar :-zxvf path :-C (str common-prefix "/unpacked"))
(c/exec :rm :-f path)
(let [subdir (c/exec :ls (str common-prefix "/unpacked"))]
(c/exec :mv (str common-prefix "/unpacked/" subdir "/usr/bin/clickhouse") common-prefix)
(c/exec :rm :-fr (str common-prefix "/unpacked")))))
(defn chmod-binary
[path]
(c/exec :chmod :+x path))
(defn install-downloaded-clickhouse
[path]
(cond
(clojure.string/ends-with? path ".deb") (unpack-deb path)
(clojure.string/ends-with? path ".tgz") (unpack-tgz path)
(clojure.string/ends-with? path "clickhouse") (chmod-binary path)
:else (throw (Exception. (str "Don't know how to install clickhouse from path" path)))))
(defn prepare-dirs
[]
(do
(c/exec :mkdir :-p common-prefix)
(c/exec :mkdir :-p data-dir)
(c/exec :mkdir :-p logs-dir)
(c/exec :mkdir :-p configs-dir)
(c/exec :mkdir :-p sub-configs-dir)
(c/exec :touch stderr-file)
(c/exec :chown :-R :root common-prefix)))
(defn cluster-config
[test node config-template]
(let [nodes (:nodes test)
replacement-map {#"\{srv1\}" (get nodes 0)
#"\{srv2\}" (get nodes 1)
#"\{srv3\}" (get nodes 2)
#"\{id\}" (str (inc (.indexOf nodes node)))
#"\{quorum_reads\}" (str (boolean (:quorum test)))
#"\{snapshot_distance\}" (str (:snapshot-distance test))
#"\{stale_log_gap\}" (str (:stale-log-gap test))
#"\{reserved_log_items\}" (str (:reserved-log-items test))}]
(reduce #(clojure.string/replace %1 (get %2 0) (get %2 1)) config-template replacement-map)))
(defn install-configs
[test node]
(c/exec :echo (slurp (io/resource "config.xml")) :> (str configs-dir "/config.xml"))
(c/exec :echo (slurp (io/resource "users.xml")) :> (str configs-dir "/users.xml"))
(c/exec :echo (slurp (io/resource "listen.xml")) :> (str sub-configs-dir "/listen.xml"))
(c/exec :echo (cluster-config test node (slurp (io/resource "test_keeper_config.xml"))) :> (str sub-configs-dir "/test_keeper_config.xml")))
(defn db
[version reuse-binary]
(reify db/DB
(setup! [_ test node]
(c/su
(do
(info "Preparing directories")
(prepare-dirs)
(if (or (not (cu/exists? binary-path)) (not reuse-binary))
(do (info "Downloading clickhouse")
(install-downloaded-clickhouse (download-clickhouse version)))
(info "Binary already exsist on path" binary-path "skipping download"))
(info "Installing configs")
(install-configs test node)
(info "Starting server")
(start-clickhouse! node test)
(info "ClickHouse started"))))
(teardown! [_ test node]
(info node "Tearing down clickhouse")
(kill-clickhouse! node test)
(c/su
(if (not reuse-binary)
(c/exec :rm :-rf binary-path))
(c/exec :rm :-rf pid-file-path)
(c/exec :rm :-rf data-dir)
(c/exec :rm :-rf logs-dir)
(c/exec :rm :-rf configs-dir)))
db/LogFiles
(log-files [_ test node]
(c/su
(kill-clickhouse! node test)
(c/cd data-dir
(c/exec :tar :czf "coordination.tar.gz" "coordination")))
[stderr-file (str logs-dir "/clickhouse-server.log") (str data-dir "/coordination.tar.gz")])))

View File

@ -0,0 +1,159 @@
(ns jepsen.nukeeper.main
(:require [clojure.tools.logging :refer :all]
[jepsen.nukeeper.utils :refer :all]
[clojure.pprint :refer [pprint]]
[jepsen.nukeeper.set :as set]
[jepsen.nukeeper.db :refer :all]
[jepsen.nukeeper.nemesis :as custom-nemesis]
[jepsen.nukeeper.register :as register]
[jepsen.nukeeper.unique :as unique]
[jepsen.nukeeper.queue :as queue]
[jepsen.nukeeper.counter :as counter]
[jepsen.nukeeper.constants :refer :all]
[clojure.string :as str]
[jepsen
[checker :as checker]
[cli :as cli]
[client :as client]
[control :as c]
[db :as db]
[nemesis :as nemesis]
[generator :as gen]
[independent :as independent]
[tests :as tests]
[util :as util :refer [meh]]]
[jepsen.control.util :as cu]
[jepsen.os.ubuntu :as ubuntu]
[jepsen.checker.timeline :as timeline]
[clojure.java.io :as io]
[zookeeper.data :as data]
[zookeeper :as zk])
(:import (org.apache.zookeeper ZooKeeper KeeperException KeeperException$BadVersionException)
(ch.qos.logback.classic Level)
(org.slf4j Logger LoggerFactory)))
(def workloads
"A map of workload names to functions that construct workloads, given opts."
{"set" set/workload
"register" register/workload
"unique-ids" unique/workload
"counter" counter/workload
"total-queue" queue/total-workload
"linear-queue" queue/linear-workload})
(def cli-opts
"Additional command line options."
[["-w" "--workload NAME" "What workload should we run?"
:default "set"
:validate [workloads (cli/one-of workloads)]]
[nil "--nemesis NAME" "Which nemesis will poison our lives?"
:default "random-node-killer"
:validate [custom-nemesis/custom-nemesises (cli/one-of custom-nemesis/custom-nemesises)]]
["-q" "--quorum" "Use quorum reads, instead of reading from any primary."]
["-r" "--rate HZ" "Approximate number of requests per second, per thread."
:default 10
:parse-fn read-string
:validate [#(and (number? %) (pos? %)) "Must be a positive number"]]
["-s" "--snapshot-distance NUM" "Number of log entries to create snapshot"
:default 10000
:parse-fn read-string
:validate [#(and (number? %) (pos? %)) "Must be a positive number"]]
[nil "--stale-log-gap NUM" "Number of log entries to send snapshot instead of separate logs"
:default 1000
:parse-fn read-string
:validate [#(and (number? %) (pos? %)) "Must be a positive number"]]
[nil "--reserved-log-items NUM" "Number of log entries to keep after snapshot"
:default 1000
:parse-fn read-string
:validate [#(and (number? %) (pos? %)) "Must be a positive number"]]
[nil "--ops-per-key NUM" "Maximum number of operations on any given key."
:default 100
:parse-fn parse-long
:validate [pos? "Must be a positive integer."]]
[nil, "--lightweight-run" "Subset of workloads/nemesises which is simple to validate"]
[nil, "--reuse-binary" "Use already downloaded binary if it exists, don't remove it on shutdown"]
["-c" "--clickhouse-source URL" "URL for clickhouse deb or tgz package"
:default "https://clickhouse-builds.s3.yandex.net/21677/ef82333089156907a0979669d9374c2e18daabe5/clickhouse_build_check/clang-11_relwithdebuginfo_none_bundled_unsplitted_disable_False_deb/clickhouse-common-static_21.4.1.6313_amd64.deb"]])
(defn nukeeper-test
"Given an options map from the command line runner (e.g. :nodes, :ssh,
:concurrency, ...), constructs a test map."
[opts]
(info "Test opts\n" (with-out-str (pprint opts)))
(let [quorum (boolean (:quorum opts))
workload ((get workloads (:workload opts)) opts)
current-nemesis (get custom-nemesis/custom-nemesises (:nemesis opts))]
(merge tests/noop-test
opts
{:name (str "clickhouse-keeper-quorum=" quorum "-" (name (:workload opts)) "-" (name (:nemesis opts)))
:os ubuntu/os
:db (db (:clickhouse-source opts) (boolean (:reuse-binary opts)))
:pure-generators true
:client (:client workload)
:nemesis (:nemesis current-nemesis)
:checker (checker/compose
{:perf (checker/perf)
:workload (:checker workload)})
:generator (gen/phases
(->> (:generator workload)
(gen/stagger (/ (:rate opts)))
(gen/nemesis (:generator current-nemesis))
(gen/time-limit (:time-limit opts)))
(gen/log "Healing cluster")
(gen/nemesis (gen/once {:type :info, :f :stop}))
(gen/log "Waiting for recovery")
(gen/sleep 10)
(gen/clients (:final-generator workload)))})))
(def all-nemesises (keys custom-nemesis/custom-nemesises))
(def all-workloads (keys workloads))
(def lightweight-workloads ["set" "unique-ids" "counter" "total-queue"])
(def useful-nemesises ["random-node-killer"
"simple-partitioner"
"all-nodes-hammer-time"
; can lead to a very rare data loss https://github.com/eBay/NuRaft/issues/185
;"logs-and-snapshots-corruptor"
;"drop-data-corruptor"
"bridge-partitioner"
"blind-node-partitioner"
"blind-others-partitioner"])
(defn cart [colls]
(if (empty? colls)
'(())
(for [more (cart (rest colls))
x (first colls)]
(cons x more))))
(defn all-test-options
"Takes base cli options, a collection of nemeses, workloads, and a test count,
and constructs a sequence of test options."
[cli worload-nemeseis-collection]
(take (:test-count cli)
(shuffle (for [[workload nemesis] worload-nemeseis-collection]
(assoc cli
:nemesis nemesis
:workload workload
:test-count 1)))))
(defn all-tests
"Turns CLI options into a sequence of tests."
[test-fn cli]
(if (boolean (:lightweight-run cli))
(map test-fn (all-test-options cli (cart [lightweight-workloads useful-nemesises])))
(map test-fn (all-test-options cli (cart [all-workloads all-nemesises])))))
(defn -main
"Handles command line arguments. Can either run a test, or a web server for
browsing results."
[& args]
(.setLevel
(LoggerFactory/getLogger "org.apache.zookeeper") Level/OFF)
(cli/run! (merge (cli/single-test-cmd {:test-fn nukeeper-test
:opt-spec cli-opts})
(cli/test-all-cmd {:tests-fn (partial all-tests nukeeper-test)
:opt-spec cli-opts})
(cli/serve-cmd))
args))

View File

@ -0,0 +1,160 @@
(ns jepsen.nukeeper.nemesis
(:require
[clojure.tools.logging :refer :all]
[jepsen
[nemesis :as nemesis]
[control :as c]
[generator :as gen]]
[jepsen.nukeeper.constants :refer :all]
[jepsen.nukeeper.utils :refer :all]))
(defn random-node-killer-nemesis
[]
(nemesis/node-start-stopper
rand-nth
(fn start [test node] (kill-clickhouse! node test))
(fn stop [test node] (start-clickhouse! node test))))
(defn all-nodes-killer-nemesis
[]
(nemesis/node-start-stopper
identity
(fn start [test node] (kill-clickhouse! node test))
(fn stop [test node] (start-clickhouse! node test))))
(defn random-node-hammer-time-nemesis
[]
(nemesis/hammer-time "clickhouse"))
(defn all-nodes-hammer-time-nemesis
[]
(nemesis/hammer-time identity "clickhouse"))
(defn select-last-file
[path]
(last (clojure.string/split
(c/exec :find path :-type :f :-printf "%T+ %p\n" :| :grep :-v :tmp_ :| :sort :| :awk "{print $2}")
#"\n")))
(defn random-file-pos
[fname]
(let [fsize (Integer/parseInt (c/exec :du :-b fname :| :cut :-f1))]
(rand-int fsize)))
(defn corrupt-file
[fname]
(if (not (empty? fname))
(do
(info "Corrupting" fname)
(c/exec :dd "if=/dev/zero" (str "of=" fname) "bs=1" "count=1" (str "seek=" (random-file-pos fname)) "conv=notrunc"))
(info "Nothing to corrupt")))
(defn corruptor-nemesis
[path corruption-op]
(reify nemesis/Nemesis
(setup! [this test] this)
(invoke! [this test op]
(cond (= (:f op) :corrupt)
(let [nodes (list (rand-nth (:nodes test)))]
(info "Corruption on node" nodes)
(c/on-nodes test nodes
(fn [test node]
(c/su
(kill-clickhouse! node test)
(corruption-op path)
(start-clickhouse! node test))))
(assoc op :type :info, :value :corrupted))
:else (do (c/on-nodes test (:nodes test)
(fn [test node]
(c/su
(start-clickhouse! node test))))
(assoc op :type :info, :value :done))))
(teardown! [this test])))
(defn logs-corruption-nemesis
[]
(corruptor-nemesis coordination-logs-dir #(corrupt-file (select-last-file %1))))
(defn snapshots-corruption-nemesis
[]
(corruptor-nemesis coordination-snapshots-dir #(corrupt-file (select-last-file %1))))
(defn logs-and-snapshots-corruption-nemesis
[]
(corruptor-nemesis coordination-data-dir (fn [path]
(do
(corrupt-file (select-last-file (str path "/snapshots")))
(corrupt-file (select-last-file (str path "/logs")))))))
(defn drop-all-corruption-nemesis
[]
(corruptor-nemesis coordination-data-dir (fn [path]
(c/exec :rm :-fr path))))
(defn partition-bridge-nemesis
[]
(nemesis/partitioner nemesis/bridge))
(defn blind-node
[nodes]
(let [[[victim] others] (nemesis/split-one nodes)]
{victim (into #{} others)}))
(defn blind-node-partition-nemesis
[]
(nemesis/partitioner blind-node))
(defn blind-others
[nodes]
(let [[[victim] others] (nemesis/split-one nodes)]
(into {} (map (fn [node] [node #{victim}])) others)))
(defn blind-others-partition-nemesis
[]
(nemesis/partitioner blind-others))
(defn network-non-symmetric-nemesis
[]
(nemesis/partitioner nemesis/bridge))
(defn start-stop-generator
[time-corrupt time-ok]
(->>
(cycle [(gen/sleep time-ok)
{:type :info, :f :start}
(gen/sleep time-corrupt)
{:type :info, :f :stop}])))
(defn corruption-generator
[]
(->>
(cycle [(gen/sleep 5)
{:type :info, :f :corrupt}])))
(def custom-nemesises
{"random-node-killer" {:nemesis (random-node-killer-nemesis)
:generator (start-stop-generator 5 5)}
"all-nodes-killer" {:nemesis (all-nodes-killer-nemesis)
:generator (start-stop-generator 1 10)}
"simple-partitioner" {:nemesis (nemesis/partition-random-halves)
:generator (start-stop-generator 5 5)}
"random-node-hammer-time" {:nemesis (random-node-hammer-time-nemesis)
:generator (start-stop-generator 5 5)}
"all-nodes-hammer-time" {:nemesis (all-nodes-hammer-time-nemesis)
:generator (start-stop-generator 1 10)}
"logs-corruptor" {:nemesis (logs-corruption-nemesis)
:generator (corruption-generator)}
"snapshots-corruptor" {:nemesis (snapshots-corruption-nemesis)
:generator (corruption-generator)}
"logs-and-snapshots-corruptor" {:nemesis (logs-and-snapshots-corruption-nemesis)
:generator (corruption-generator)}
"drop-data-corruptor" {:nemesis (drop-all-corruption-nemesis)
:generator (corruption-generator)}
"bridge-partitioner" {:nemesis (partition-bridge-nemesis)
:generator (start-stop-generator 5 5)}
"blind-node-partitioner" {:nemesis (blind-node-partition-nemesis)
:generator (start-stop-generator 5 5)}
"blind-others-partitioner" {:nemesis (blind-others-partition-nemesis)
:generator (start-stop-generator 5 5)}})

View File

@ -0,0 +1,79 @@
(ns jepsen.nukeeper.queue
(:require
[clojure.tools.logging :refer :all]
[jepsen
[checker :as checker]
[client :as client]
[generator :as gen]]
[knossos.model :as model]
[jepsen.checker.timeline :as timeline]
[jepsen.nukeeper.utils :refer :all]
[zookeeper :as zk])
(:import (org.apache.zookeeper ZooKeeper KeeperException KeeperException$BadVersionException)))
(defn enqueue [val _ _] {:type :invoke, :f :enqueue :value val})
(defn dequeue [_ _] {:type :invoke, :f :dequeue})
(defrecord QueueClient [conn nodename]
client/Client
(open! [this test node]
(assoc
(assoc this
:conn (zk-connect node 9181 30000))
:nodename node))
(setup! [this test])
(invoke! [this test op]
(case (:f op)
:enqueue (try
(do
(zk-create-if-not-exists conn (str "/" (:value op)) "")
(assoc op :type :ok))
(catch Exception _ (assoc op :type :info, :error :connect-error)))
:dequeue
(try
(let [result (zk-multi-delete-first-child conn "/")]
(if (not (nil? result))
(assoc op :type :ok :value result)
(assoc op :type :fail :value result)))
(catch Exception _ (assoc op :type :info, :error :connect-error)))
:drain
; drain via delete is to long, just list all nodes
(exec-with-retries 30 (fn []
(zk-sync conn)
(assoc op :type :ok :value (into #{} (map #(str %1) (zk-list conn "/"))))))))
(teardown! [_ test])
(close! [_ test]
(zk/close conn)))
(defn sorted-str-range
[n]
(sort (map (fn [v] (str v)) (take n (range)))))
(defn total-workload
"A generator, client, and checker for a set test."
[opts]
{:client (QueueClient. nil nil)
:checker (checker/compose
{:total-queue (checker/total-queue)
:timeline (timeline/html)})
:generator (->> (sorted-str-range 50000)
(map (fn [x]
(rand-nth [{:type :invoke, :f :enqueue :value x}
{:type :invoke, :f :dequeue}]))))
:final-generator (gen/once {:type :invoke, :f :drain, :value nil})})
(defn linear-workload
[opts]
{:client (QueueClient. nil nil)
:checker (checker/compose
{:linear (checker/linearizable {:model (model/unordered-queue)
:algorithm :linear})
:timeline (timeline/html)})
:generator (->> (sorted-str-range 10000)
(map (fn [x]
(rand-nth [{:type :invoke, :f :enqueue :value x}
{:type :invoke, :f :dequeue}]))))})

View File

@ -0,0 +1,64 @@
(ns jepsen.nukeeper.register
(:require [jepsen
[checker :as checker]
[client :as client]
[independent :as independent]
[generator :as gen]]
[jepsen.checker.timeline :as timeline]
[knossos.model :as model]
[jepsen.nukeeper.utils :refer :all]
[zookeeper :as zk])
(:import (org.apache.zookeeper ZooKeeper KeeperException KeeperException$BadVersionException)))
(defn r [_ _] {:type :invoke, :f :read, :value nil})
(defn w [_ _] {:type :invoke, :f :write, :value (rand-int 5)})
(defn cas [_ _] {:type :invoke, :f :cas, :value [(rand-int 5) (rand-int 5)]})
(defrecord RegisterClient [conn]
client/Client
(open! [this test node]
(assoc this :conn (zk-connect node 9181 30000)))
(setup! [this test]
(zk-create-range conn 300)) ; 300 nodes to be sure
(invoke! [_ test op]
(let [[k v] (:value op)
zk-k (zk-path k)]
(case (:f op)
:read (try
(assoc op :type :ok, :value (independent/tuple k (parse-long (:data (zk-get-str conn zk-k)))))
(catch Exception _ (assoc op :type :fail, :error :connect-error)))
:write (try
(do (zk-set conn zk-k v)
(assoc op :type :ok))
(catch Exception _ (assoc op :type :info, :error :connect-error)))
:cas (try
(let [[old new] v]
(assoc op :type (if (zk-cas conn zk-k old new)
:ok
:fail)))
(catch KeeperException$BadVersionException _ (assoc op :type :fail, :error :bad-version))
(catch Exception _ (assoc op :type :info, :error :connect-error))))))
(teardown! [this test])
(close! [_ test]
(zk/close conn)))
(defn workload
"Tests linearizable reads, writes, and compare-and-set operations on
independent keys."
[opts]
{:client (RegisterClient. nil)
:checker (independent/checker
(checker/compose
{:linear (checker/linearizable {:model (model/cas-register)
:algorithm :linear})
:timeline (timeline/html)}))
:generator (independent/concurrent-generator
10
(range)
(fn [k]
(->> (gen/mix [r w cas])
(gen/limit (:ops-per-key opts)))))})

View File

@ -0,0 +1,49 @@
(ns jepsen.nukeeper.set
(:require
[clojure.tools.logging :refer :all]
[jepsen
[checker :as checker]
[client :as client]
[generator :as gen]]
[jepsen.nukeeper.utils :refer :all]
[zookeeper :as zk])
(:import (org.apache.zookeeper ZooKeeper KeeperException KeeperException$BadVersionException)))
(defrecord SetClient [k conn nodename]
client/Client
(open! [this test node]
(assoc
(assoc this
:conn (zk-connect node 9181 30000))
:nodename node))
(setup! [this test]
(zk-create-if-not-exists conn k "#{}"))
(invoke! [this test op]
(case (:f op)
:read (exec-with-retries 30 (fn []
(zk-sync conn)
(assoc op
:type :ok
:value (read-string (:data (zk-get-str conn k))))))
:add (try
(do
(zk-add-to-set conn k (:value op))
(assoc op :type :ok))
(catch KeeperException$BadVersionException _ (assoc op :type :fail, :error :bad-version))
(catch Exception _ (assoc op :type :info, :error :connect-error)))))
(teardown! [_ test])
(close! [_ test]
(zk/close conn)))
(defn workload
"A generator, client, and checker for a set test."
[opts]
{:client (SetClient. "/a-set" nil nil)
:checker (checker/set)
:generator (->> (range)
(map (fn [x] {:type :invoke, :f :add, :value x})))
:final-generator (gen/once {:type :invoke, :f :read, :value nil})})

View File

@ -0,0 +1,42 @@
(ns jepsen.nukeeper.unique
(:require
[clojure.tools.logging :refer :all]
[jepsen
[checker :as checker]
[client :as client]
[generator :as gen]]
[jepsen.nukeeper.utils :refer :all]
[zookeeper :as zk])
(:import (org.apache.zookeeper ZooKeeper KeeperException KeeperException$BadVersionException)))
(defrecord UniqueClient [conn nodename]
client/Client
(open! [this test node]
(assoc
(assoc this
:conn (zk-connect node 9181 30000))
:nodename node))
(setup! [this test])
(invoke! [this test op]
(case
:generate
(try
(let [result-path (zk-create-sequential conn "/seq-" "")]
(assoc op :type :ok :value (parse-and-get-counter result-path)))
(catch Exception _ (assoc op :type :info, :error :connect-error)))))
(teardown! [_ test])
(close! [_ test]
(zk/close conn)))
(defn workload
"A generator, client, and checker for a set test."
[opts]
{:client (UniqueClient. nil nil)
:checker (checker/unique-ids)
:generator (->>
(range)
(map (fn [_] {:type :invoke, :f :generate})))})

View File

@ -0,0 +1,180 @@
(ns jepsen.nukeeper.utils
(:require [clojure.string :as str]
[zookeeper.data :as data]
[zookeeper :as zk]
[zookeeper.internal :as zi]
[jepsen.control.util :as cu]
[jepsen.nukeeper.constants :refer :all]
[jepsen.control :as c]
[clojure.tools.logging :refer :all])
(:import (org.apache.zookeeper.data Stat)
(org.apache.zookeeper CreateMode
ZooKeeper)
(org.apache.zookeeper ZooKeeper KeeperException KeeperException$BadVersionException)))
(defn parse-long
"Parses a string to a Long. Passes through `nil` and empty strings."
[s]
(if (and s (> (count s) 0))
(Long/parseLong s)))
(defn parse-and-get-counter
[path]
(Integer/parseInt (apply str (take-last 10 (seq (str path))))))
(defn zk-range
[]
(map (fn [v] (str "/" v)) (range)))
(defn zk-path
[n]
(str "/" n))
(defn zk-connect
[host port timeout]
(zk/connect (str host ":" port) :timeout-msec timeout))
(defn zk-create-range
[conn n]
(dorun (map (fn [v] (zk/create-all conn v :persistent? true)) (take n (zk-range)))))
(defn zk-set
([conn path value]
(zk/set-data conn path (data/to-bytes (str value)) -1))
([conn path value version]
(zk/set-data conn path (data/to-bytes (str value)) version)))
(defn zk-get-str
[conn path]
(let [zk-result (zk/data conn path)]
{:data (data/to-string (:data zk-result))
:stat (:stat zk-result)}))
(defn zk-list
[conn path]
(zk/children conn path))
(defn zk-list-with-stat
[conn path]
(let [stat (new Stat)
children (seq (.getChildren conn path false stat))]
{:children children
:stat (zi/stat-to-map stat)}))
(defn zk-cas
[conn path old-value new-value]
(let [current-value (zk-get-str conn path)]
(if (= (parse-long (:data current-value)) old-value)
(do (zk-set conn path new-value (:version (:stat current-value)))
true))))
(defn zk-add-to-set
[conn path elem]
(let [current-value (zk-get-str conn path)
current-set (read-string (:data current-value))
new-set (conj current-set elem)]
(zk-set conn path (pr-str new-set) (:version (:stat current-value)))))
(defn zk-create-if-not-exists
[conn path data]
(zk/create conn path :data (data/to-bytes (str data)) :persistent? true))
(defn zk-create-sequential
[conn path-prefix data]
(zk/create conn path-prefix :data (data/to-bytes (str data)) :persistent? true :sequential? true))
(defn zk-multi-create-many-seq-nodes
[conn path-prefix num]
(let [txn (.transaction conn)]
(loop [i 0]
(cond (>= i num) (.commit txn)
:else (do (.create txn path-prefix
(data/to-bytes "")
(zi/acls :open-acl-unsafe)
CreateMode/PERSISTENT_SEQUENTIAL)
(recur (inc i)))))))
; sync call not implemented in zookeeper-clj and don't have sync version in java API
(defn zk-sync
[conn]
(zk-set conn "/" "" -1))
(defn zk-parent-path
[path]
(let [rslash_pos (str/last-index-of path "/")]
(if (> rslash_pos 0)
(subs path 0 rslash_pos)
"/")))
(defn zk-multi-delete-first-child
[conn path]
(let [{children :children stat :stat} (zk-list-with-stat conn path)
txn (.transaction conn)
first-child (first (sort children))]
(if (not (nil? first-child))
(try
(do (.check txn path (:version stat))
(.setData txn path (data/to-bytes "") -1) ; I'm just checking multitransactions
(.delete txn (str path first-child) -1)
(.commit txn)
first-child)
(catch KeeperException$BadVersionException _ nil)
; Even if we got connection loss, delete may actually be executed.
; This function is used for queue model, which strictly require
; all enqueued elements to be dequeued, but allow duplicates.
; So even in case when we not sure about delete we return first-child.
(catch Exception _ first-child))
nil)))
(defn clickhouse-alive?
[node test]
(info "Checking server alive on" node)
(try
(c/exec binary-path :client :--query "SELECT 1")
(catch Exception _ false)))
(defn wait-clickhouse-alive!
[node test & {:keys [maxtries] :or {maxtries 30}}]
(loop [i 0]
(cond (> i maxtries) false
(clickhouse-alive? node test) true
:else (do (Thread/sleep 1000) (recur (inc i))))))
(defn kill-clickhouse!
[node test]
(info "Killing server on node" node)
(c/su
(cu/stop-daemon! binary-path pid-file-path)
(c/exec :rm :-fr (str data-dir "/status"))))
(defn start-clickhouse!
[node test]
(info "Starting server on node" node)
(c/su
(cu/start-daemon!
{:pidfile pid-file-path
:logfile stderr-file
:chdir data-dir}
binary-path
:server
:--config (str configs-dir "/config.xml")
:--
:--path (str data-dir "/")
:--user_files_path (str data-dir "/user_files")
:--top_level_domains_path (str data-dir "/top_level_domains")
:--logger.log (str logs-dir "/clickhouse-server.log")
:--logger.errorlog (str logs-dir "/clickhouse-server.err.log")
:--test_keeper_server.snapshot_storage_path coordination-snapshots-dir
:--test_keeper_server.logs_storage_path coordination-logs-dir)
(wait-clickhouse-alive! node test)))
(defn exec-with-retries
[retries f & args]
(let [res (try {:value (apply f args)}
(catch Exception e
(if (zero? retries)
(throw e)
{:exception e})))]
(if (:exception res)
(do (Thread/sleep 1000) (recur (dec retries) f args))
(:value res))))

View File

@ -0,0 +1,39 @@
(ns jepsen.nukeeper-test
(:require [clojure.test :refer :all]
[jepsen.nukeeper.utils :refer :all]
[zookeeper :as zk]
[zookeeper.data :as data])
(:import (ch.qos.logback.classic Level)
(org.slf4j Logger LoggerFactory)))
(defn multicreate
[conn]
(dorun (map (fn [v] (zk/create conn v :persistent? true)) (take 10 (zk-range)))))
(defn multidelete
[conn]
(dorun (map (fn [v] (zk/delete conn v)) (take 10 (zk-range)))))
(deftest a-test
(testing "nukeeper connection"
(.setLevel
(LoggerFactory/getLogger "org.apache.zookeeper") Level/OFF)
(let [conn (zk/connect "localhost:9181" :timeout-msec 5000)]
;(println (take 10 (zk-range)))
;(multidelete conn)
;(multicreate conn)
;(zk/create-all conn "/0")
;(zk/create conn "/0")
;(println (zk/children conn "/"))
;(zk/set-data conn "/0" (data/to-bytes "777") -1)
(println (zk-parent-path "/sasds/dasda/das"))
(println (zk-parent-path "/sasds"))
(zk-multi-create-many-seq-nodes conn "/a-" 5)
(println (zk/children conn "/"))
(println (zk-list-with-stat conn "/"))
(println (zk-multi-delete-first-child conn "/"))
(println (zk-list-with-stat conn "/"))
;(Thread/sleep 5000)
;(println "VALUE" (data/to-string (:data (zk/data conn "/0"))))
;(is (= (data/to-string (:data (zk/data conn "/0"))) "777"))
(zk/close conn))))

View File

@ -21,6 +21,7 @@ if (NOT DEFINED ENABLE_UTILS OR ENABLE_UTILS)
add_subdirectory (corrector_utf8)
add_subdirectory (zookeeper-cli)
add_subdirectory (zookeeper-test)
add_subdirectory (nukeeper-data-dumper)
add_subdirectory (zookeeper-dump-tree)
add_subdirectory (zookeeper-remove-by-list)
add_subdirectory (zookeeper-create-entry-to-download-part)

View File

@ -0,0 +1,2 @@
add_executable(nukeeper-data-dumper main.cpp)
target_link_libraries(nukeeper-data-dumper PRIVATE dbms)

View File

@ -0,0 +1,87 @@
#include <Poco/ConsoleChannel.h>
#include <Poco/Logger.h>
#include <Coordination/NuKeeperStateMachine.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <Common/Exception.h>
#include <libnuraft/nuraft.hxx> // Y_IGNORE
#include <Coordination/NuKeeperLogStore.h>
#include <Coordination/Changelog.h>
#include <common/logger_useful.h>
using namespace Coordination;
using namespace DB;
void dumpMachine(std::shared_ptr<NuKeeperStateMachine> machine)
{
auto & storage = machine->getStorage();
std::queue<std::string> keys;
keys.push("/");
while (!keys.empty())
{
auto key = keys.front();
keys.pop();
std::cout << key << "\n";
auto value = storage.container.getValue(key);
std::cout << "\tStat: {version: " << value.stat.version <<
", mtime: " << value.stat.mtime <<
", emphemeralOwner: " << value.stat.ephemeralOwner <<
", czxid: " << value.stat.czxid <<
", mzxid: " << value.stat.mzxid <<
", numChildren: " << value.stat.numChildren <<
", dataLength: " << value.stat.dataLength <<
"}" << std::endl;
std::cout << "\tData: " << storage.container.getValue(key).data << std::endl;
for (const auto & child : value.children)
{
if (key == "/")
keys.push(key + child);
else
keys.push(key + "/" + child);
}
}
std::cout << std::flush;
}
int main(int argc, char *argv[])
{
if (argc != 3)
{
std::cerr << "usage: " << argv[0] << " snapshotpath logpath" << std::endl;
return 3;
}
else
{
Poco::AutoPtr<Poco::ConsoleChannel> channel(new Poco::ConsoleChannel(std::cerr));
Poco::Logger::root().setChannel(channel);
Poco::Logger::root().setLevel("trace");
}
auto * logger = &Poco::Logger::get("nukeeper-dumper");
ResponsesQueue queue;
SnapshotsQueue snapshots_queue{1};
CoordinationSettingsPtr settings = std::make_shared<CoordinationSettings>();
auto state_machine = std::make_shared<NuKeeperStateMachine>(queue, snapshots_queue, argv[1], settings);
state_machine->init();
size_t last_commited_index = state_machine->last_commit_index();
LOG_INFO(logger, "Last committed index: {}", last_commited_index);
DB::NuKeeperLogStore changelog(argv[2], 10000000, true);
changelog.init(last_commited_index, 10000000000UL); /// collect all logs
if (changelog.size() == 0)
LOG_INFO(logger, "Changelog empty");
else
LOG_INFO(logger, "Last changelog entry {}", changelog.next_slot() - 1);
for (size_t i = last_commited_index + 1; i < changelog.next_slot(); ++i)
{
if (changelog.entry_at(i)->get_val_type() == nuraft::log_val_type::app_log)
state_machine->commit(i, changelog.entry_at(i)->get_buf());
}
dumpMachine(state_machine);
return 0;
}