mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 01:25:21 +00:00
Merge remote-tracking branch 'origin/master' into mmap-cache
This commit is contained in:
commit
b3750b670f
2
contrib/NuRaft
vendored
2
contrib/NuRaft
vendored
@ -1 +1 @@
|
||||
Subproject commit 3d3683e77753cfe015a05fae95ddf418e19f59e1
|
||||
Subproject commit 70468326ad5d72e9497944838484c591dae054ea
|
@ -31,6 +31,8 @@ struct Settings;
|
||||
M(UInt64, rotate_log_storage_interval, 10000, "How many records will be stored in one log storage file", 0) \
|
||||
M(UInt64, snapshots_to_keep, 3, "How many compressed snapshots to keep on disk", 0) \
|
||||
M(UInt64, stale_log_gap, 10000, "When node became stale and should receive snapshots from leader", 0) \
|
||||
M(UInt64, fresh_log_gap, 200, "When node became fresh", 0) \
|
||||
M(Bool, quorum_reads, false, "Execute read requests as writes through whole RAFT consesus with similar speed", 0) \
|
||||
M(Bool, force_sync, true, " Call fsync on each change in RAFT changelog", 0)
|
||||
|
||||
DECLARE_SETTINGS_TRAITS(CoordinationSettingsTraits, LIST_OF_COORDINATION_SETTINGS)
|
||||
|
@ -30,6 +30,8 @@ NuKeeperServer::NuKeeperServer(
|
||||
, state_manager(nuraft::cs_new<NuKeeperStateManager>(server_id, "test_keeper_server", config, coordination_settings))
|
||||
, responses_queue(responses_queue_)
|
||||
{
|
||||
if (coordination_settings->quorum_reads)
|
||||
LOG_WARNING(&Poco::Logger::get("NuKeeperServer"), "Quorum reads enabled, NuKeeper will work slower.");
|
||||
}
|
||||
|
||||
void NuKeeperServer::startup()
|
||||
@ -59,6 +61,7 @@ void NuKeeperServer::startup()
|
||||
params.reserved_log_items_ = coordination_settings->reserved_log_items;
|
||||
params.snapshot_distance_ = coordination_settings->snapshot_distance;
|
||||
params.stale_log_gap_ = coordination_settings->stale_log_gap;
|
||||
params.fresh_log_gap_ = coordination_settings->fresh_log_gap;
|
||||
params.client_req_timeout_ = coordination_settings->operation_timeout_ms.totalMilliseconds();
|
||||
params.auto_forwarding_ = coordination_settings->auto_forwarding;
|
||||
params.auto_forwarding_req_timeout_ = coordination_settings->operation_timeout_ms.totalMilliseconds() * 2;
|
||||
@ -106,7 +109,7 @@ nuraft::ptr<nuraft::buffer> getZooKeeperLogEntry(int64_t session_id, const Coord
|
||||
void NuKeeperServer::putRequest(const NuKeeperStorage::RequestForSession & request_for_session)
|
||||
{
|
||||
auto [session_id, request] = request_for_session;
|
||||
if (isLeaderAlive() && request->isReadRequest())
|
||||
if (!coordination_settings->quorum_reads && isLeaderAlive() && request->isReadRequest())
|
||||
{
|
||||
state_machine->processReadRequest(request_for_session);
|
||||
}
|
||||
@ -185,6 +188,9 @@ nuraft::cb_func::ReturnCode NuKeeperServer::callbackFunc(nuraft::cb_func::Type t
|
||||
if (next_index < last_commited || next_index - last_commited <= 1)
|
||||
commited_store = true;
|
||||
|
||||
if (initialized_flag)
|
||||
return nuraft::cb_func::ReturnCode::Ok;
|
||||
|
||||
auto set_initialized = [this] ()
|
||||
{
|
||||
std::unique_lock lock(initialized_mutex);
|
||||
@ -196,10 +202,27 @@ nuraft::cb_func::ReturnCode NuKeeperServer::callbackFunc(nuraft::cb_func::Type t
|
||||
{
|
||||
case nuraft::cb_func::BecomeLeader:
|
||||
{
|
||||
if (commited_store) /// We become leader and store is empty, ready to serve requests
|
||||
/// We become leader and store is empty or we already committed it
|
||||
if (commited_store || initial_batch_committed)
|
||||
set_initialized();
|
||||
return nuraft::cb_func::ReturnCode::Ok;
|
||||
}
|
||||
case nuraft::cb_func::BecomeFollower:
|
||||
case nuraft::cb_func::GotAppendEntryReqFromLeader:
|
||||
{
|
||||
if (isLeaderAlive())
|
||||
{
|
||||
auto leader_index = raft_instance->get_leader_committed_log_idx();
|
||||
auto our_index = raft_instance->get_committed_log_idx();
|
||||
/// This may happen when we start RAFT cluster from scratch.
|
||||
/// Node first became leader, and after that some other node became leader.
|
||||
/// BecameFresh for this node will not be called because it was already fresh
|
||||
/// when it was leader.
|
||||
if (leader_index < our_index + coordination_settings->fresh_log_gap)
|
||||
set_initialized();
|
||||
}
|
||||
return nuraft::cb_func::ReturnCode::Ok;
|
||||
}
|
||||
case nuraft::cb_func::BecomeFresh:
|
||||
{
|
||||
set_initialized(); /// We are fresh follower, ready to serve requests.
|
||||
@ -209,6 +232,7 @@ nuraft::cb_func::ReturnCode NuKeeperServer::callbackFunc(nuraft::cb_func::Type t
|
||||
{
|
||||
if (isLeader()) /// We have committed our log store and we are leader, ready to serve requests.
|
||||
set_initialized();
|
||||
initial_batch_committed = true;
|
||||
return nuraft::cb_func::ReturnCode::Ok;
|
||||
}
|
||||
default: /// ignore other events
|
||||
@ -220,7 +244,7 @@ void NuKeeperServer::waitInit()
|
||||
{
|
||||
std::unique_lock lock(initialized_mutex);
|
||||
int64_t timeout = coordination_settings->startup_timeout.totalMilliseconds();
|
||||
if (!initialized_cv.wait_for(lock, std::chrono::milliseconds(timeout), [&] { return initialized_flag; }))
|
||||
if (!initialized_cv.wait_for(lock, std::chrono::milliseconds(timeout), [&] { return initialized_flag.load(); }))
|
||||
throw Exception(ErrorCodes::RAFT_ERROR, "Failed to wait RAFT initialization");
|
||||
}
|
||||
|
||||
|
@ -31,8 +31,9 @@ private:
|
||||
ResponsesQueue & responses_queue;
|
||||
|
||||
std::mutex initialized_mutex;
|
||||
bool initialized_flag = false;
|
||||
std::atomic<bool> initialized_flag = false;
|
||||
std::condition_variable initialized_cv;
|
||||
std::atomic<bool> initial_batch_committed = false;
|
||||
|
||||
nuraft::cb_func::ReturnCode callbackFunc(nuraft::cb_func::Type type, nuraft::cb_func::Param * param);
|
||||
|
||||
|
@ -241,9 +241,10 @@ NuKeeperStorageSnapshot::~NuKeeperStorageSnapshot()
|
||||
storage->disableSnapshotMode();
|
||||
}
|
||||
|
||||
NuKeeperSnapshotManager::NuKeeperSnapshotManager(const std::string & snapshots_path_, size_t snapshots_to_keep_)
|
||||
NuKeeperSnapshotManager::NuKeeperSnapshotManager(const std::string & snapshots_path_, size_t snapshots_to_keep_, size_t storage_tick_time_)
|
||||
: snapshots_path(snapshots_path_)
|
||||
, snapshots_to_keep(snapshots_to_keep_)
|
||||
, storage_tick_time(storage_tick_time_)
|
||||
{
|
||||
namespace fs = std::filesystem;
|
||||
|
||||
@ -325,22 +326,24 @@ nuraft::ptr<nuraft::buffer> NuKeeperSnapshotManager::serializeSnapshotToBuffer(c
|
||||
return writer.getBuffer();
|
||||
}
|
||||
|
||||
SnapshotMetadataPtr NuKeeperSnapshotManager::deserializeSnapshotFromBuffer(NuKeeperStorage * storage, nuraft::ptr<nuraft::buffer> buffer)
|
||||
SnapshotMetaAndStorage NuKeeperSnapshotManager::deserializeSnapshotFromBuffer(nuraft::ptr<nuraft::buffer> buffer) const
|
||||
{
|
||||
ReadBufferFromNuraftBuffer reader(buffer);
|
||||
CompressedReadBuffer compressed_reader(reader);
|
||||
return NuKeeperStorageSnapshot::deserialize(*storage, compressed_reader);
|
||||
auto storage = std::make_unique<NuKeeperStorage>(storage_tick_time);
|
||||
auto snapshot_metadata = NuKeeperStorageSnapshot::deserialize(*storage, compressed_reader);
|
||||
return std::make_pair(snapshot_metadata, std::move(storage));
|
||||
}
|
||||
|
||||
SnapshotMetadataPtr NuKeeperSnapshotManager::restoreFromLatestSnapshot(NuKeeperStorage * storage)
|
||||
SnapshotMetaAndStorage NuKeeperSnapshotManager::restoreFromLatestSnapshot()
|
||||
{
|
||||
if (existing_snapshots.empty())
|
||||
return nullptr;
|
||||
return {};
|
||||
|
||||
auto buffer = deserializeLatestSnapshotBufferFromDisk();
|
||||
if (!buffer)
|
||||
return nullptr;
|
||||
return deserializeSnapshotFromBuffer(storage, buffer);
|
||||
return {};
|
||||
return deserializeSnapshotFromBuffer(buffer);
|
||||
}
|
||||
|
||||
void NuKeeperSnapshotManager::removeOutdatedSnapshotsIfNeeded()
|
||||
|
@ -40,17 +40,20 @@ public:
|
||||
using NuKeeperStorageSnapshotPtr = std::shared_ptr<NuKeeperStorageSnapshot>;
|
||||
using CreateSnapshotCallback = std::function<void(NuKeeperStorageSnapshotPtr &&)>;
|
||||
|
||||
|
||||
using SnapshotMetaAndStorage = std::pair<SnapshotMetadataPtr, NuKeeperStoragePtr>;
|
||||
|
||||
class NuKeeperSnapshotManager
|
||||
{
|
||||
public:
|
||||
NuKeeperSnapshotManager(const std::string & snapshots_path_, size_t snapshots_to_keep_);
|
||||
NuKeeperSnapshotManager(const std::string & snapshots_path_, size_t snapshots_to_keep_, size_t storage_tick_time_ = 500);
|
||||
|
||||
SnapshotMetadataPtr restoreFromLatestSnapshot(NuKeeperStorage * storage);
|
||||
SnapshotMetaAndStorage restoreFromLatestSnapshot();
|
||||
|
||||
static nuraft::ptr<nuraft::buffer> serializeSnapshotToBuffer(const NuKeeperStorageSnapshot & snapshot);
|
||||
std::string serializeSnapshotBufferToDisk(nuraft::buffer & buffer, size_t up_to_log_idx);
|
||||
|
||||
static SnapshotMetadataPtr deserializeSnapshotFromBuffer(NuKeeperStorage * storage, nuraft::ptr<nuraft::buffer> buffer);
|
||||
SnapshotMetaAndStorage deserializeSnapshotFromBuffer(nuraft::ptr<nuraft::buffer> buffer) const;
|
||||
|
||||
nuraft::ptr<nuraft::buffer> deserializeSnapshotBufferFromDisk(size_t up_to_log_idx) const;
|
||||
nuraft::ptr<nuraft::buffer> deserializeLatestSnapshotBufferFromDisk();
|
||||
@ -74,6 +77,7 @@ private:
|
||||
const std::string snapshots_path;
|
||||
const size_t snapshots_to_keep;
|
||||
std::map<size_t, std::string> existing_snapshots;
|
||||
size_t storage_tick_time;
|
||||
};
|
||||
|
||||
struct CreateSnapshotTask
|
||||
|
@ -4,6 +4,7 @@
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <Common/ZooKeeper/ZooKeeperIO.h>
|
||||
#include <Coordination/NuKeeperSnapshotManager.h>
|
||||
#include <future>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -37,8 +38,7 @@ NuKeeperStorage::RequestForSession parseRequest(nuraft::buffer & data)
|
||||
|
||||
NuKeeperStateMachine::NuKeeperStateMachine(ResponsesQueue & responses_queue_, SnapshotsQueue & snapshots_queue_, const std::string & snapshots_path_, const CoordinationSettingsPtr & coordination_settings_)
|
||||
: coordination_settings(coordination_settings_)
|
||||
, storage(coordination_settings->dead_session_check_period_ms.totalMilliseconds())
|
||||
, snapshot_manager(snapshots_path_, coordination_settings->snapshots_to_keep)
|
||||
, snapshot_manager(snapshots_path_, coordination_settings->snapshots_to_keep, coordination_settings->dead_session_check_period_ms.totalMicroseconds())
|
||||
, responses_queue(responses_queue_)
|
||||
, snapshots_queue(snapshots_queue_)
|
||||
, last_committed_idx(0)
|
||||
@ -60,7 +60,7 @@ void NuKeeperStateMachine::init()
|
||||
try
|
||||
{
|
||||
latest_snapshot_buf = snapshot_manager.deserializeSnapshotBufferFromDisk(latest_log_index);
|
||||
latest_snapshot_meta = snapshot_manager.deserializeSnapshotFromBuffer(&storage, latest_snapshot_buf);
|
||||
std::tie(latest_snapshot_meta, storage) = snapshot_manager.deserializeSnapshotFromBuffer(latest_snapshot_buf);
|
||||
last_committed_idx = latest_snapshot_meta->get_last_log_idx();
|
||||
loaded = true;
|
||||
break;
|
||||
@ -83,6 +83,9 @@ void NuKeeperStateMachine::init()
|
||||
{
|
||||
LOG_DEBUG(log, "No existing snapshots, last committed log index {}", last_committed_idx);
|
||||
}
|
||||
|
||||
if (!storage)
|
||||
storage = std::make_unique<NuKeeperStorage>(coordination_settings->dead_session_check_period_ms.totalMilliseconds());
|
||||
}
|
||||
|
||||
nuraft::ptr<nuraft::buffer> NuKeeperStateMachine::commit(const size_t log_idx, nuraft::buffer & data)
|
||||
@ -96,7 +99,7 @@ nuraft::ptr<nuraft::buffer> NuKeeperStateMachine::commit(const size_t log_idx, n
|
||||
nuraft::buffer_serializer bs(response);
|
||||
{
|
||||
std::lock_guard lock(storage_lock);
|
||||
session_id = storage.getSessionID(session_timeout_ms);
|
||||
session_id = storage->getSessionID(session_timeout_ms);
|
||||
bs.put_i64(session_id);
|
||||
}
|
||||
LOG_DEBUG(log, "Session ID response {} with timeout {}", session_id, session_timeout_ms);
|
||||
@ -109,7 +112,7 @@ nuraft::ptr<nuraft::buffer> NuKeeperStateMachine::commit(const size_t log_idx, n
|
||||
NuKeeperStorage::ResponsesForSessions responses_for_sessions;
|
||||
{
|
||||
std::lock_guard lock(storage_lock);
|
||||
responses_for_sessions = storage.processRequest(request_for_session.request, request_for_session.session_id, log_idx);
|
||||
responses_for_sessions = storage->processRequest(request_for_session.request, request_for_session.session_id, log_idx);
|
||||
for (auto & response_for_session : responses_for_sessions)
|
||||
responses_queue.push(response_for_session);
|
||||
}
|
||||
@ -133,7 +136,7 @@ bool NuKeeperStateMachine::apply_snapshot(nuraft::snapshot & s)
|
||||
|
||||
{
|
||||
std::lock_guard lock(storage_lock);
|
||||
snapshot_manager.deserializeSnapshotFromBuffer(&storage, latest_snapshot_ptr);
|
||||
std::tie(latest_snapshot_meta, storage) = snapshot_manager.deserializeSnapshotFromBuffer(latest_snapshot_ptr);
|
||||
}
|
||||
last_committed_idx = s.get_last_log_idx();
|
||||
return true;
|
||||
@ -157,7 +160,7 @@ void NuKeeperStateMachine::create_snapshot(
|
||||
CreateSnapshotTask snapshot_task;
|
||||
{
|
||||
std::lock_guard lock(storage_lock);
|
||||
snapshot_task.snapshot = std::make_shared<NuKeeperStorageSnapshot>(&storage, snapshot_meta_copy);
|
||||
snapshot_task.snapshot = std::make_shared<NuKeeperStorageSnapshot>(storage.get(), snapshot_meta_copy);
|
||||
}
|
||||
|
||||
snapshot_task.create_snapshot = [this, when_done] (NuKeeperStorageSnapshotPtr && snapshot)
|
||||
@ -179,7 +182,7 @@ void NuKeeperStateMachine::create_snapshot(
|
||||
{
|
||||
/// Must do it with lock (clearing elements from list)
|
||||
std::lock_guard lock(storage_lock);
|
||||
storage.clearGarbageAfterSnapshot();
|
||||
storage->clearGarbageAfterSnapshot();
|
||||
/// Destroy snapshot with lock
|
||||
snapshot.reset();
|
||||
LOG_TRACE(log, "Cleared garbage after snapshot");
|
||||
@ -214,7 +217,7 @@ void NuKeeperStateMachine::save_logical_snp_obj(
|
||||
if (obj_id == 0)
|
||||
{
|
||||
std::lock_guard lock(storage_lock);
|
||||
NuKeeperStorageSnapshot snapshot(&storage, s.get_last_log_idx());
|
||||
NuKeeperStorageSnapshot snapshot(storage.get(), s.get_last_log_idx());
|
||||
cloned_buffer = snapshot_manager.serializeSnapshotToBuffer(snapshot);
|
||||
}
|
||||
else
|
||||
@ -225,7 +228,28 @@ void NuKeeperStateMachine::save_logical_snp_obj(
|
||||
nuraft::ptr<nuraft::buffer> snp_buf = s.serialize();
|
||||
cloned_meta = nuraft::snapshot::deserialize(*snp_buf);
|
||||
|
||||
auto result_path = snapshot_manager.serializeSnapshotBufferToDisk(*cloned_buffer, s.get_last_log_idx());
|
||||
/// Sometimes NuRaft can call save and create snapshots from different threads
|
||||
/// at once. To avoid race conditions we serialize snapshots through snapshots_queue
|
||||
/// TODO: make something better
|
||||
CreateSnapshotTask snapshot_task;
|
||||
std::shared_ptr<std::promise<void>> waiter = std::make_shared<std::promise<void>>();
|
||||
auto future = waiter->get_future();
|
||||
snapshot_task.snapshot = nullptr;
|
||||
snapshot_task.create_snapshot = [this, waiter, cloned_buffer, log_idx = s.get_last_log_idx()] (NuKeeperStorageSnapshotPtr &&)
|
||||
{
|
||||
try
|
||||
{
|
||||
auto result_path = snapshot_manager.serializeSnapshotBufferToDisk(*cloned_buffer, log_idx);
|
||||
LOG_DEBUG(log, "Saved snapshot {} to path {}", log_idx, result_path);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(log);
|
||||
}
|
||||
waiter->set_value();
|
||||
};
|
||||
snapshots_queue.push(std::move(snapshot_task));
|
||||
future.wait();
|
||||
|
||||
{
|
||||
std::lock_guard lock(snapshots_lock);
|
||||
@ -233,7 +257,6 @@ void NuKeeperStateMachine::save_logical_snp_obj(
|
||||
latest_snapshot_meta = cloned_meta;
|
||||
}
|
||||
|
||||
LOG_DEBUG(log, "Created snapshot {} with path {}", s.get_last_log_idx(), result_path);
|
||||
|
||||
obj_id++;
|
||||
}
|
||||
@ -271,7 +294,7 @@ void NuKeeperStateMachine::processReadRequest(const NuKeeperStorage::RequestForS
|
||||
NuKeeperStorage::ResponsesForSessions responses;
|
||||
{
|
||||
std::lock_guard lock(storage_lock);
|
||||
responses = storage.processRequest(request_for_session.request, request_for_session.session_id, std::nullopt);
|
||||
responses = storage->processRequest(request_for_session.request, request_for_session.session_id, std::nullopt);
|
||||
}
|
||||
for (const auto & response : responses)
|
||||
responses_queue.push(response);
|
||||
@ -280,13 +303,13 @@ void NuKeeperStateMachine::processReadRequest(const NuKeeperStorage::RequestForS
|
||||
std::unordered_set<int64_t> NuKeeperStateMachine::getDeadSessions()
|
||||
{
|
||||
std::lock_guard lock(storage_lock);
|
||||
return storage.getDeadSessions();
|
||||
return storage->getDeadSessions();
|
||||
}
|
||||
|
||||
void NuKeeperStateMachine::shutdownStorage()
|
||||
{
|
||||
std::lock_guard lock(storage_lock);
|
||||
storage.finalize();
|
||||
storage->finalize();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -52,7 +52,7 @@ public:
|
||||
|
||||
NuKeeperStorage & getStorage()
|
||||
{
|
||||
return storage;
|
||||
return *storage;
|
||||
}
|
||||
|
||||
void processReadRequest(const NuKeeperStorage::RequestForSession & request_for_session);
|
||||
@ -68,7 +68,7 @@ private:
|
||||
|
||||
CoordinationSettingsPtr coordination_settings;
|
||||
|
||||
NuKeeperStorage storage;
|
||||
NuKeeperStoragePtr storage;
|
||||
|
||||
NuKeeperSnapshotManager snapshot_manager;
|
||||
|
||||
|
@ -233,7 +233,7 @@ struct NuKeeperStorageGetRequest final : public NuKeeperStorageRequest
|
||||
struct NuKeeperStorageRemoveRequest final : public NuKeeperStorageRequest
|
||||
{
|
||||
using NuKeeperStorageRequest::NuKeeperStorageRequest;
|
||||
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(NuKeeperStorage::Container & container, NuKeeperStorage::Ephemerals & ephemerals, int64_t /*zxid*/, int64_t session_id) const override
|
||||
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(NuKeeperStorage::Container & container, NuKeeperStorage::Ephemerals & ephemerals, int64_t /*zxid*/, int64_t /*session_id*/) const override
|
||||
{
|
||||
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
|
||||
Coordination::ZooKeeperRemoveResponse & response = dynamic_cast<Coordination::ZooKeeperRemoveResponse &>(*response_ptr);
|
||||
@ -257,7 +257,12 @@ struct NuKeeperStorageRemoveRequest final : public NuKeeperStorageRequest
|
||||
{
|
||||
auto prev_node = it->value;
|
||||
if (prev_node.stat.ephemeralOwner != 0)
|
||||
ephemerals[session_id].erase(request.path);
|
||||
{
|
||||
auto ephemerals_it = ephemerals.find(prev_node.stat.ephemeralOwner);
|
||||
ephemerals_it->second.erase(request.path);
|
||||
if (ephemerals_it->second.empty())
|
||||
ephemerals.erase(ephemerals_it);
|
||||
}
|
||||
|
||||
auto child_basename = getBaseName(it->key);
|
||||
container.updateValue(parentPath(request.path), [&child_basename] (NuKeeperStorage::Node & parent)
|
||||
@ -271,10 +276,10 @@ struct NuKeeperStorageRemoveRequest final : public NuKeeperStorageRequest
|
||||
|
||||
container.erase(request.path);
|
||||
|
||||
undo = [prev_node, &container, &ephemerals, session_id, path = request.path, child_basename]
|
||||
undo = [prev_node, &container, &ephemerals, path = request.path, child_basename]
|
||||
{
|
||||
if (prev_node.stat.ephemeralOwner != 0)
|
||||
ephemerals[session_id].emplace(path);
|
||||
ephemerals[prev_node.stat.ephemeralOwner].emplace(path);
|
||||
|
||||
container.insert(path, prev_node);
|
||||
container.updateValue(parentPath(path), [&child_basename] (NuKeeperStorage::Node & parent)
|
||||
@ -377,7 +382,6 @@ struct NuKeeperStorageSetRequest final : public NuKeeperStorageRequest
|
||||
{
|
||||
return processWatchesImpl(zk_request->getPath(), watches, list_watches, Coordination::Event::CHANGED);
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
struct NuKeeperStorageListRequest final : public NuKeeperStorageRequest
|
||||
@ -641,6 +645,13 @@ NuKeeperStorage::ResponsesForSessions NuKeeperStorage::processRequest(const Coor
|
||||
for (const auto & ephemeral_path : it->second)
|
||||
{
|
||||
container.erase(ephemeral_path);
|
||||
container.updateValue(parentPath(ephemeral_path), [&ephemeral_path] (NuKeeperStorage::Node & parent)
|
||||
{
|
||||
--parent.stat.numChildren;
|
||||
++parent.stat.cversion;
|
||||
parent.children.erase(getBaseName(ephemeral_path));
|
||||
});
|
||||
|
||||
auto responses = processWatchesImpl(ephemeral_path, watches, list_watches, Coordination::Event::DELETED);
|
||||
results.insert(results.end(), responses.begin(), responses.end());
|
||||
}
|
||||
|
@ -131,4 +131,6 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
using NuKeeperStoragePtr = std::unique_ptr<NuKeeperStorage>;
|
||||
|
||||
}
|
||||
|
@ -132,6 +132,10 @@ void NuKeeperStorageDispatcher::initialize(const Poco::Util::AbstractConfigurati
|
||||
|
||||
coordination_settings->loadFromConfig("test_keeper_server.coordination_settings", config);
|
||||
|
||||
request_thread = ThreadFromGlobalPool([this] { requestThread(); });
|
||||
responses_thread = ThreadFromGlobalPool([this] { responseThread(); });
|
||||
snapshot_thread = ThreadFromGlobalPool([this] { snapshotThread(); });
|
||||
|
||||
server = std::make_unique<NuKeeperServer>(myid, coordination_settings, config, responses_queue, snapshots_queue);
|
||||
try
|
||||
{
|
||||
@ -148,10 +152,8 @@ void NuKeeperStorageDispatcher::initialize(const Poco::Util::AbstractConfigurati
|
||||
throw;
|
||||
}
|
||||
|
||||
request_thread = ThreadFromGlobalPool([this] { requestThread(); });
|
||||
responses_thread = ThreadFromGlobalPool([this] { responseThread(); });
|
||||
|
||||
session_cleaner_thread = ThreadFromGlobalPool([this] { sessionCleanerTask(); });
|
||||
snapshot_thread = ThreadFromGlobalPool([this] { snapshotThread(); });
|
||||
|
||||
LOG_DEBUG(log, "Dispatcher initialized");
|
||||
}
|
||||
|
@ -897,25 +897,25 @@ TEST(CoordinationTest, TestStorageSnapshotSimple)
|
||||
manager.serializeSnapshotBufferToDisk(*buf, 2);
|
||||
EXPECT_TRUE(fs::exists("./snapshots/snapshot_2.bin"));
|
||||
|
||||
DB::NuKeeperStorage restored_storage(500);
|
||||
|
||||
auto debuf = manager.deserializeSnapshotBufferFromDisk(2);
|
||||
manager.deserializeSnapshotFromBuffer(&restored_storage, debuf);
|
||||
|
||||
EXPECT_EQ(restored_storage.container.size(), 3);
|
||||
EXPECT_EQ(restored_storage.container.getValue("/").children.size(), 1);
|
||||
EXPECT_EQ(restored_storage.container.getValue("/hello").children.size(), 1);
|
||||
EXPECT_EQ(restored_storage.container.getValue("/hello/somepath").children.size(), 0);
|
||||
auto [snapshot_meta, restored_storage] = manager.deserializeSnapshotFromBuffer(debuf);
|
||||
|
||||
EXPECT_EQ(restored_storage.container.getValue("/").data, "");
|
||||
EXPECT_EQ(restored_storage.container.getValue("/hello").data, "world");
|
||||
EXPECT_EQ(restored_storage.container.getValue("/hello/somepath").data, "somedata");
|
||||
EXPECT_EQ(restored_storage.session_id_counter, 7);
|
||||
EXPECT_EQ(restored_storage.zxid, 2);
|
||||
EXPECT_EQ(restored_storage.ephemerals.size(), 2);
|
||||
EXPECT_EQ(restored_storage.ephemerals[3].size(), 1);
|
||||
EXPECT_EQ(restored_storage.ephemerals[1].size(), 1);
|
||||
EXPECT_EQ(restored_storage.session_and_timeout.size(), 2);
|
||||
EXPECT_EQ(restored_storage->container.size(), 3);
|
||||
EXPECT_EQ(restored_storage->container.getValue("/").children.size(), 1);
|
||||
EXPECT_EQ(restored_storage->container.getValue("/hello").children.size(), 1);
|
||||
EXPECT_EQ(restored_storage->container.getValue("/hello/somepath").children.size(), 0);
|
||||
|
||||
EXPECT_EQ(restored_storage->container.getValue("/").data, "");
|
||||
EXPECT_EQ(restored_storage->container.getValue("/hello").data, "world");
|
||||
EXPECT_EQ(restored_storage->container.getValue("/hello/somepath").data, "somedata");
|
||||
EXPECT_EQ(restored_storage->session_id_counter, 7);
|
||||
EXPECT_EQ(restored_storage->zxid, 2);
|
||||
EXPECT_EQ(restored_storage->ephemerals.size(), 2);
|
||||
EXPECT_EQ(restored_storage->ephemerals[3].size(), 1);
|
||||
EXPECT_EQ(restored_storage->ephemerals[1].size(), 1);
|
||||
EXPECT_EQ(restored_storage->session_and_timeout.size(), 2);
|
||||
}
|
||||
|
||||
TEST(CoordinationTest, TestStorageSnapshotMoreWrites)
|
||||
@ -946,15 +946,14 @@ TEST(CoordinationTest, TestStorageSnapshotMoreWrites)
|
||||
manager.serializeSnapshotBufferToDisk(*buf, 50);
|
||||
EXPECT_TRUE(fs::exists("./snapshots/snapshot_50.bin"));
|
||||
|
||||
DB::NuKeeperStorage restored_storage(500);
|
||||
|
||||
auto debuf = manager.deserializeSnapshotBufferFromDisk(50);
|
||||
manager.deserializeSnapshotFromBuffer(&restored_storage, debuf);
|
||||
auto [meta, restored_storage] = manager.deserializeSnapshotFromBuffer(debuf);
|
||||
|
||||
EXPECT_EQ(restored_storage.container.size(), 51);
|
||||
EXPECT_EQ(restored_storage->container.size(), 51);
|
||||
for (size_t i = 0; i < 50; ++i)
|
||||
{
|
||||
EXPECT_EQ(restored_storage.container.getValue("/hello_" + std::to_string(i)).data, "world_" + std::to_string(i));
|
||||
EXPECT_EQ(restored_storage->container.getValue("/hello_" + std::to_string(i)).data, "world_" + std::to_string(i));
|
||||
}
|
||||
}
|
||||
|
||||
@ -987,14 +986,13 @@ TEST(CoordinationTest, TestStorageSnapshotManySnapshots)
|
||||
EXPECT_TRUE(fs::exists("./snapshots/snapshot_250.bin"));
|
||||
|
||||
|
||||
DB::NuKeeperStorage restored_storage(500);
|
||||
manager.restoreFromLatestSnapshot(&restored_storage);
|
||||
auto [meta, restored_storage] = manager.restoreFromLatestSnapshot();
|
||||
|
||||
EXPECT_EQ(restored_storage.container.size(), 251);
|
||||
EXPECT_EQ(restored_storage->container.size(), 251);
|
||||
|
||||
for (size_t i = 0; i < 250; ++i)
|
||||
{
|
||||
EXPECT_EQ(restored_storage.container.getValue("/hello_" + std::to_string(i)).data, "world_" + std::to_string(i));
|
||||
EXPECT_EQ(restored_storage->container.getValue("/hello_" + std::to_string(i)).data, "world_" + std::to_string(i));
|
||||
}
|
||||
}
|
||||
|
||||
@ -1040,12 +1038,11 @@ TEST(CoordinationTest, TestStorageSnapshotMode)
|
||||
EXPECT_FALSE(storage.container.contains("/hello_" + std::to_string(i)));
|
||||
}
|
||||
|
||||
DB::NuKeeperStorage restored_storage(500);
|
||||
manager.restoreFromLatestSnapshot(&restored_storage);
|
||||
auto [meta, restored_storage] = manager.restoreFromLatestSnapshot();
|
||||
|
||||
for (size_t i = 0; i < 50; ++i)
|
||||
{
|
||||
EXPECT_EQ(restored_storage.container.getValue("/hello_" + std::to_string(i)).data, "world_" + std::to_string(i));
|
||||
EXPECT_EQ(restored_storage->container.getValue("/hello_" + std::to_string(i)).data, "world_" + std::to_string(i));
|
||||
}
|
||||
|
||||
}
|
||||
@ -1071,8 +1068,7 @@ TEST(CoordinationTest, TestStorageSnapshotBroken)
|
||||
plain_buf.truncate(34);
|
||||
plain_buf.sync();
|
||||
|
||||
DB::NuKeeperStorage restored_storage(500);
|
||||
EXPECT_THROW(manager.restoreFromLatestSnapshot(&restored_storage), DB::Exception);
|
||||
EXPECT_THROW(manager.restoreFromLatestSnapshot(), DB::Exception);
|
||||
}
|
||||
|
||||
nuraft::ptr<nuraft::buffer> getBufferFromZKRequest(int64_t session_id, const Coordination::ZooKeeperRequestPtr & request)
|
||||
@ -1236,6 +1232,37 @@ TEST(CoordinationTest, TestStateMachineAndLogStore)
|
||||
}
|
||||
}
|
||||
|
||||
TEST(CoordinationTest, TestEphemeralNodeRemove)
|
||||
{
|
||||
using namespace Coordination;
|
||||
using namespace DB;
|
||||
|
||||
ChangelogDirTest snapshots("./snapshots");
|
||||
CoordinationSettingsPtr settings = std::make_shared<CoordinationSettings>();
|
||||
|
||||
ResponsesQueue queue;
|
||||
SnapshotsQueue snapshots_queue{1};
|
||||
auto state_machine = std::make_shared<NuKeeperStateMachine>(queue, snapshots_queue, "./snapshots", settings);
|
||||
state_machine->init();
|
||||
|
||||
std::shared_ptr<ZooKeeperCreateRequest> request_c = std::make_shared<ZooKeeperCreateRequest>();
|
||||
request_c->path = "/hello";
|
||||
request_c->is_ephemeral = true;
|
||||
auto entry_c = getLogEntryFromZKRequest(0, 1, request_c);
|
||||
state_machine->commit(1, entry_c->get_buf());
|
||||
const auto & storage = state_machine->getStorage();
|
||||
|
||||
EXPECT_EQ(storage.ephemerals.size(), 1);
|
||||
std::shared_ptr<ZooKeeperRemoveRequest> request_d = std::make_shared<ZooKeeperRemoveRequest>();
|
||||
request_d->path = "/hello";
|
||||
/// Delete from other session
|
||||
auto entry_d = getLogEntryFromZKRequest(0, 2, request_d);
|
||||
state_machine->commit(2, entry_d->get_buf());
|
||||
|
||||
EXPECT_EQ(storage.ephemerals.size(), 0);
|
||||
}
|
||||
|
||||
|
||||
int main(int argc, char ** argv)
|
||||
{
|
||||
Poco::AutoPtr<Poco::ConsoleChannel> channel(new Poco::ConsoleChannel(std::cerr));
|
||||
|
@ -240,16 +240,10 @@ Poco::Timespan NuKeeperTCPHandler::receiveHandshake()
|
||||
throw Exception("Unexpected protocol version: " + toString(protocol_version), ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
|
||||
|
||||
Coordination::read(last_zxid_seen, *in);
|
||||
|
||||
if (last_zxid_seen != 0)
|
||||
throw Exception("Non zero last_zxid_seen is not supported", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
|
||||
|
||||
Coordination::read(timeout_ms, *in);
|
||||
|
||||
/// TODO Stop ignoring this value
|
||||
Coordination::read(previous_session_id, *in);
|
||||
|
||||
if (previous_session_id != 0)
|
||||
throw Exception("Non zero previous session id is not supported", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
|
||||
|
||||
Coordination::read(passwd, *in);
|
||||
|
||||
int8_t readonly;
|
||||
|
@ -4932,7 +4932,7 @@ bool StorageReplicatedMergeTree::waitForTableReplicaToProcessLogEntry(
|
||||
|
||||
const auto & stop_waiting = [&]()
|
||||
{
|
||||
bool stop_waiting_itself = waiting_itself && is_dropped;
|
||||
bool stop_waiting_itself = waiting_itself && (partial_shutdown_called || is_dropped);
|
||||
bool stop_waiting_non_active = !wait_for_non_active && !getZooKeeper()->exists(table_zookeeper_path + "/replicas/" + replica + "/is_active");
|
||||
return stop_waiting_itself || stop_waiting_non_active;
|
||||
};
|
||||
|
@ -80,7 +80,7 @@ def test_load_dictionaries(started_cluster):
|
||||
create_dict(table_name)
|
||||
dict_name = 'dict0'
|
||||
|
||||
node1.query("SYSTEM RELOAD DICTIONARIES")
|
||||
node1.query("SYSTEM RELOAD DICTIONARY {}".format(dict_name))
|
||||
assert node1.query("SELECT count() FROM `test`.`dict_table_{}`".format(table_name)).rstrip() == '10000'
|
||||
assert node1.query("SELECT dictGetUInt32('{}', 'id', toUInt64(0))".format(dict_name)) == '0\n'
|
||||
assert node1.query("SELECT dictGetUInt32('{}', 'value', toUInt64(9999))".format(dict_name)) == '9999\n'
|
||||
|
@ -141,7 +141,8 @@ def test_reload_after_loading(started_cluster):
|
||||
time.sleep(1) # see the comment above
|
||||
replace_in_file_in_container('/etc/clickhouse-server/config.d/executable.xml', '81', '82')
|
||||
replace_in_file_in_container('/etc/clickhouse-server/config.d/file.txt', '101', '102')
|
||||
query("SYSTEM RELOAD DICTIONARIES")
|
||||
query("SYSTEM RELOAD DICTIONARY 'file'")
|
||||
query("SYSTEM RELOAD DICTIONARY 'executable'")
|
||||
assert query("SELECT dictGetInt32('executable', 'a', toUInt64(7))") == "82\n"
|
||||
assert query("SELECT dictGetInt32('file', 'a', toUInt64(9))") == "102\n"
|
||||
|
||||
|
@ -97,12 +97,14 @@ def test_insecure():
|
||||
n1.query('SELECT * FROM dist_insecure')
|
||||
|
||||
def test_insecure_insert_async():
|
||||
n1.query("TRUNCATE TABLE data")
|
||||
n1.query('INSERT INTO dist_insecure SELECT * FROM numbers(2)')
|
||||
n1.query('SYSTEM FLUSH DISTRIBUTED ON CLUSTER insecure dist_insecure')
|
||||
assert int(n1.query('SELECT count() FROM dist_insecure')) == 2
|
||||
n1.query('TRUNCATE TABLE data ON CLUSTER insecure')
|
||||
|
||||
def test_insecure_insert_sync():
|
||||
n1.query("TRUNCATE TABLE data")
|
||||
n1.query('INSERT INTO dist_insecure SELECT * FROM numbers(2)', settings={'insert_distributed_sync': 1})
|
||||
assert int(n1.query('SELECT count() FROM dist_insecure')) == 2
|
||||
n1.query('TRUNCATE TABLE data ON CLUSTER secure')
|
||||
@ -111,12 +113,14 @@ def test_secure():
|
||||
n1.query('SELECT * FROM dist_secure')
|
||||
|
||||
def test_secure_insert_async():
|
||||
n1.query("TRUNCATE TABLE data")
|
||||
n1.query('INSERT INTO dist_secure SELECT * FROM numbers(2)')
|
||||
n1.query('SYSTEM FLUSH DISTRIBUTED ON CLUSTER secure dist_secure')
|
||||
assert int(n1.query('SELECT count() FROM dist_secure')) == 2
|
||||
n1.query('TRUNCATE TABLE data ON CLUSTER secure')
|
||||
|
||||
def test_secure_insert_sync():
|
||||
n1.query("TRUNCATE TABLE data")
|
||||
n1.query('INSERT INTO dist_secure SELECT * FROM numbers(2)', settings={'insert_distributed_sync': 1})
|
||||
assert int(n1.query('SELECT count() FROM dist_secure')) == 2
|
||||
n1.query('TRUNCATE TABLE data ON CLUSTER secure')
|
||||
@ -126,6 +130,7 @@ def test_secure_insert_sync():
|
||||
# Buffer() flush happens with global context, that does not have user
|
||||
# And so Context::user/ClientInfo::current_user/ClientInfo::initial_user will be empty
|
||||
def test_secure_insert_buffer_async():
|
||||
n1.query("TRUNCATE TABLE data")
|
||||
n1.query('INSERT INTO dist_secure_buffer SELECT * FROM numbers(2)')
|
||||
n1.query('SYSTEM FLUSH DISTRIBUTED ON CLUSTER secure dist_secure')
|
||||
# no Buffer flush happened
|
||||
@ -141,6 +146,7 @@ def test_secure_disagree():
|
||||
n1.query('SELECT * FROM dist_secure_disagree')
|
||||
|
||||
def test_secure_disagree_insert():
|
||||
n1.query("TRUNCATE TABLE data")
|
||||
n1.query('INSERT INTO dist_secure_disagree SELECT * FROM numbers(2)')
|
||||
with pytest.raises(QueryRuntimeException, match='.*Hash mismatch.*'):
|
||||
n1.query('SYSTEM FLUSH DISTRIBUTED ON CLUSTER secure_disagree dist_secure_disagree')
|
||||
|
@ -43,6 +43,8 @@ def start_small_cluster():
|
||||
|
||||
|
||||
def test_single_endpoint_connections_count(start_small_cluster):
|
||||
node1.query("TRUNCATE TABLE test_table")
|
||||
node2.query("SYSTEM SYNC REPLICA test_table")
|
||||
def task(count):
|
||||
print(("Inserting ten times from {}".format(count)))
|
||||
for i in range(count, count + 10):
|
||||
@ -58,9 +60,11 @@ def test_single_endpoint_connections_count(start_small_cluster):
|
||||
|
||||
|
||||
def test_keepalive_timeout(start_small_cluster):
|
||||
current_count = int(node1.query("select count() from test_table").strip())
|
||||
node1.query("TRUNCATE TABLE test_table")
|
||||
node2.query("SYSTEM SYNC REPLICA test_table")
|
||||
|
||||
node1.query("insert into test_table values ('2017-06-16', 777, 0)")
|
||||
assert_eq_with_retry(node2, "select count() from test_table", str(current_count + 1))
|
||||
assert_eq_with_retry(node2, "select count() from test_table", str(1))
|
||||
# Server keepAliveTimeout is 3 seconds, default client session timeout is 8
|
||||
# lets sleep in that interval
|
||||
time.sleep(4)
|
||||
@ -69,7 +73,7 @@ def test_keepalive_timeout(start_small_cluster):
|
||||
|
||||
time.sleep(3)
|
||||
|
||||
assert_eq_with_retry(node2, "select count() from test_table", str(current_count + 2))
|
||||
assert_eq_with_retry(node2, "select count() from test_table", str(2))
|
||||
|
||||
assert not node2.contains_in_log("No message received"), "Found 'No message received' in clickhouse-server.log"
|
||||
|
||||
|
@ -360,6 +360,7 @@ def test_bridge_dies_with_parent(started_cluster):
|
||||
|
||||
assert clickhouse_pid is None
|
||||
assert bridge_pid is None
|
||||
node1.start_clickhouse(20)
|
||||
|
||||
|
||||
def test_odbc_postgres_date_data_type(started_cluster):
|
||||
|
@ -396,6 +396,10 @@ def test_ttl_compatibility(started_cluster, node_left, node_right, num_run):
|
||||
node_right.query("OPTIMIZE TABLE test_ttl_group_by FINAL")
|
||||
node_right.query("OPTIMIZE TABLE test_ttl_where FINAL")
|
||||
|
||||
node_left.query("SYSTEM SYNC REPLICA test_ttl_delete", timeout=20)
|
||||
node_left.query("SYSTEM SYNC REPLICA test_ttl_group_by", timeout=20)
|
||||
node_left.query("SYSTEM SYNC REPLICA test_ttl_where", timeout=20)
|
||||
|
||||
assert node_left.query("SELECT id FROM test_ttl_delete ORDER BY id") == "2\n4\n"
|
||||
assert node_right.query("SELECT id FROM test_ttl_delete ORDER BY id") == "2\n4\n"
|
||||
|
||||
|
13
tests/jepsen.nukeeper/.gitignore
vendored
Normal file
13
tests/jepsen.nukeeper/.gitignore
vendored
Normal file
@ -0,0 +1,13 @@
|
||||
/target
|
||||
/classes
|
||||
/checkouts
|
||||
profiles.clj
|
||||
pom.xml
|
||||
pom.xml.asc
|
||||
*.jar
|
||||
*.class
|
||||
/.lein-*
|
||||
/.nrepl-port
|
||||
/.prepl-port
|
||||
.hgignore
|
||||
.hg/
|
280
tests/jepsen.nukeeper/LICENSE
Normal file
280
tests/jepsen.nukeeper/LICENSE
Normal file
@ -0,0 +1,280 @@
|
||||
Eclipse Public License - v 2.0
|
||||
|
||||
THE ACCOMPANYING PROGRAM IS PROVIDED UNDER THE TERMS OF THIS ECLIPSE
|
||||
PUBLIC LICENSE ("AGREEMENT"). ANY USE, REPRODUCTION OR DISTRIBUTION
|
||||
OF THE PROGRAM CONSTITUTES RECIPIENT'S ACCEPTANCE OF THIS AGREEMENT.
|
||||
|
||||
1. DEFINITIONS
|
||||
|
||||
"Contribution" means:
|
||||
|
||||
a) in the case of the initial Contributor, the initial content
|
||||
Distributed under this Agreement, and
|
||||
|
||||
b) in the case of each subsequent Contributor:
|
||||
i) changes to the Program, and
|
||||
ii) additions to the Program;
|
||||
where such changes and/or additions to the Program originate from
|
||||
and are Distributed by that particular Contributor. A Contribution
|
||||
"originates" from a Contributor if it was added to the Program by
|
||||
such Contributor itself or anyone acting on such Contributor's behalf.
|
||||
Contributions do not include changes or additions to the Program that
|
||||
are not Modified Works.
|
||||
|
||||
"Contributor" means any person or entity that Distributes the Program.
|
||||
|
||||
"Licensed Patents" mean patent claims licensable by a Contributor which
|
||||
are necessarily infringed by the use or sale of its Contribution alone
|
||||
or when combined with the Program.
|
||||
|
||||
"Program" means the Contributions Distributed in accordance with this
|
||||
Agreement.
|
||||
|
||||
"Recipient" means anyone who receives the Program under this Agreement
|
||||
or any Secondary License (as applicable), including Contributors.
|
||||
|
||||
"Derivative Works" shall mean any work, whether in Source Code or other
|
||||
form, that is based on (or derived from) the Program and for which the
|
||||
editorial revisions, annotations, elaborations, or other modifications
|
||||
represent, as a whole, an original work of authorship.
|
||||
|
||||
"Modified Works" shall mean any work in Source Code or other form that
|
||||
results from an addition to, deletion from, or modification of the
|
||||
contents of the Program, including, for purposes of clarity any new file
|
||||
in Source Code form that contains any contents of the Program. Modified
|
||||
Works shall not include works that contain only declarations,
|
||||
interfaces, types, classes, structures, or files of the Program solely
|
||||
in each case in order to link to, bind by name, or subclass the Program
|
||||
or Modified Works thereof.
|
||||
|
||||
"Distribute" means the acts of a) distributing or b) making available
|
||||
in any manner that enables the transfer of a copy.
|
||||
|
||||
"Source Code" means the form of a Program preferred for making
|
||||
modifications, including but not limited to software source code,
|
||||
documentation source, and configuration files.
|
||||
|
||||
"Secondary License" means either the GNU General Public License,
|
||||
Version 2.0, or any later versions of that license, including any
|
||||
exceptions or additional permissions as identified by the initial
|
||||
Contributor.
|
||||
|
||||
2. GRANT OF RIGHTS
|
||||
|
||||
a) Subject to the terms of this Agreement, each Contributor hereby
|
||||
grants Recipient a non-exclusive, worldwide, royalty-free copyright
|
||||
license to reproduce, prepare Derivative Works of, publicly display,
|
||||
publicly perform, Distribute and sublicense the Contribution of such
|
||||
Contributor, if any, and such Derivative Works.
|
||||
|
||||
b) Subject to the terms of this Agreement, each Contributor hereby
|
||||
grants Recipient a non-exclusive, worldwide, royalty-free patent
|
||||
license under Licensed Patents to make, use, sell, offer to sell,
|
||||
import and otherwise transfer the Contribution of such Contributor,
|
||||
if any, in Source Code or other form. This patent license shall
|
||||
apply to the combination of the Contribution and the Program if, at
|
||||
the time the Contribution is added by the Contributor, such addition
|
||||
of the Contribution causes such combination to be covered by the
|
||||
Licensed Patents. The patent license shall not apply to any other
|
||||
combinations which include the Contribution. No hardware per se is
|
||||
licensed hereunder.
|
||||
|
||||
c) Recipient understands that although each Contributor grants the
|
||||
licenses to its Contributions set forth herein, no assurances are
|
||||
provided by any Contributor that the Program does not infringe the
|
||||
patent or other intellectual property rights of any other entity.
|
||||
Each Contributor disclaims any liability to Recipient for claims
|
||||
brought by any other entity based on infringement of intellectual
|
||||
property rights or otherwise. As a condition to exercising the
|
||||
rights and licenses granted hereunder, each Recipient hereby
|
||||
assumes sole responsibility to secure any other intellectual
|
||||
property rights needed, if any. For example, if a third party
|
||||
patent license is required to allow Recipient to Distribute the
|
||||
Program, it is Recipient's responsibility to acquire that license
|
||||
before distributing the Program.
|
||||
|
||||
d) Each Contributor represents that to its knowledge it has
|
||||
sufficient copyright rights in its Contribution, if any, to grant
|
||||
the copyright license set forth in this Agreement.
|
||||
|
||||
e) Notwithstanding the terms of any Secondary License, no
|
||||
Contributor makes additional grants to any Recipient (other than
|
||||
those set forth in this Agreement) as a result of such Recipient's
|
||||
receipt of the Program under the terms of a Secondary License
|
||||
(if permitted under the terms of Section 3).
|
||||
|
||||
3. REQUIREMENTS
|
||||
|
||||
3.1 If a Contributor Distributes the Program in any form, then:
|
||||
|
||||
a) the Program must also be made available as Source Code, in
|
||||
accordance with section 3.2, and the Contributor must accompany
|
||||
the Program with a statement that the Source Code for the Program
|
||||
is available under this Agreement, and informs Recipients how to
|
||||
obtain it in a reasonable manner on or through a medium customarily
|
||||
used for software exchange; and
|
||||
|
||||
b) the Contributor may Distribute the Program under a license
|
||||
different than this Agreement, provided that such license:
|
||||
i) effectively disclaims on behalf of all other Contributors all
|
||||
warranties and conditions, express and implied, including
|
||||
warranties or conditions of title and non-infringement, and
|
||||
implied warranties or conditions of merchantability and fitness
|
||||
for a particular purpose;
|
||||
|
||||
ii) effectively excludes on behalf of all other Contributors all
|
||||
liability for damages, including direct, indirect, special,
|
||||
incidental and consequential damages, such as lost profits;
|
||||
|
||||
iii) does not attempt to limit or alter the recipients' rights
|
||||
in the Source Code under section 3.2; and
|
||||
|
||||
iv) requires any subsequent distribution of the Program by any
|
||||
party to be under a license that satisfies the requirements
|
||||
of this section 3.
|
||||
|
||||
3.2 When the Program is Distributed as Source Code:
|
||||
|
||||
a) it must be made available under this Agreement, or if the
|
||||
Program (i) is combined with other material in a separate file or
|
||||
files made available under a Secondary License, and (ii) the initial
|
||||
Contributor attached to the Source Code the notice described in
|
||||
Exhibit A of this Agreement, then the Program may be made available
|
||||
under the terms of such Secondary Licenses, and
|
||||
|
||||
b) a copy of this Agreement must be included with each copy of
|
||||
the Program.
|
||||
|
||||
3.3 Contributors may not remove or alter any copyright, patent,
|
||||
trademark, attribution notices, disclaimers of warranty, or limitations
|
||||
of liability ("notices") contained within the Program from any copy of
|
||||
the Program which they Distribute, provided that Contributors may add
|
||||
their own appropriate notices.
|
||||
|
||||
4. COMMERCIAL DISTRIBUTION
|
||||
|
||||
Commercial distributors of software may accept certain responsibilities
|
||||
with respect to end users, business partners and the like. While this
|
||||
license is intended to facilitate the commercial use of the Program,
|
||||
the Contributor who includes the Program in a commercial product
|
||||
offering should do so in a manner which does not create potential
|
||||
liability for other Contributors. Therefore, if a Contributor includes
|
||||
the Program in a commercial product offering, such Contributor
|
||||
("Commercial Contributor") hereby agrees to defend and indemnify every
|
||||
other Contributor ("Indemnified Contributor") against any losses,
|
||||
damages and costs (collectively "Losses") arising from claims, lawsuits
|
||||
and other legal actions brought by a third party against the Indemnified
|
||||
Contributor to the extent caused by the acts or omissions of such
|
||||
Commercial Contributor in connection with its distribution of the Program
|
||||
in a commercial product offering. The obligations in this section do not
|
||||
apply to any claims or Losses relating to any actual or alleged
|
||||
intellectual property infringement. In order to qualify, an Indemnified
|
||||
Contributor must: a) promptly notify the Commercial Contributor in
|
||||
writing of such claim, and b) allow the Commercial Contributor to control,
|
||||
and cooperate with the Commercial Contributor in, the defense and any
|
||||
related settlement negotiations. The Indemnified Contributor may
|
||||
participate in any such claim at its own expense.
|
||||
|
||||
For example, a Contributor might include the Program in a commercial
|
||||
product offering, Product X. That Contributor is then a Commercial
|
||||
Contributor. If that Commercial Contributor then makes performance
|
||||
claims, or offers warranties related to Product X, those performance
|
||||
claims and warranties are such Commercial Contributor's responsibility
|
||||
alone. Under this section, the Commercial Contributor would have to
|
||||
defend claims against the other Contributors related to those performance
|
||||
claims and warranties, and if a court requires any other Contributor to
|
||||
pay any damages as a result, the Commercial Contributor must pay
|
||||
those damages.
|
||||
|
||||
5. NO WARRANTY
|
||||
|
||||
EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, AND TO THE EXTENT
|
||||
PERMITTED BY APPLICABLE LAW, THE PROGRAM IS PROVIDED ON AN "AS IS"
|
||||
BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER EXPRESS OR
|
||||
IMPLIED INCLUDING, WITHOUT LIMITATION, ANY WARRANTIES OR CONDITIONS OF
|
||||
TITLE, NON-INFRINGEMENT, MERCHANTABILITY OR FITNESS FOR A PARTICULAR
|
||||
PURPOSE. Each Recipient is solely responsible for determining the
|
||||
appropriateness of using and distributing the Program and assumes all
|
||||
risks associated with its exercise of rights under this Agreement,
|
||||
including but not limited to the risks and costs of program errors,
|
||||
compliance with applicable laws, damage to or loss of data, programs
|
||||
or equipment, and unavailability or interruption of operations.
|
||||
|
||||
6. DISCLAIMER OF LIABILITY
|
||||
|
||||
EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, AND TO THE EXTENT
|
||||
PERMITTED BY APPLICABLE LAW, NEITHER RECIPIENT NOR ANY CONTRIBUTORS
|
||||
SHALL HAVE ANY LIABILITY FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
|
||||
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING WITHOUT LIMITATION LOST
|
||||
PROFITS), HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
||||
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
||||
ARISING IN ANY WAY OUT OF THE USE OR DISTRIBUTION OF THE PROGRAM OR THE
|
||||
EXERCISE OF ANY RIGHTS GRANTED HEREUNDER, EVEN IF ADVISED OF THE
|
||||
POSSIBILITY OF SUCH DAMAGES.
|
||||
|
||||
7. GENERAL
|
||||
|
||||
If any provision of this Agreement is invalid or unenforceable under
|
||||
applicable law, it shall not affect the validity or enforceability of
|
||||
the remainder of the terms of this Agreement, and without further
|
||||
action by the parties hereto, such provision shall be reformed to the
|
||||
minimum extent necessary to make such provision valid and enforceable.
|
||||
|
||||
If Recipient institutes patent litigation against any entity
|
||||
(including a cross-claim or counterclaim in a lawsuit) alleging that the
|
||||
Program itself (excluding combinations of the Program with other software
|
||||
or hardware) infringes such Recipient's patent(s), then such Recipient's
|
||||
rights granted under Section 2(b) shall terminate as of the date such
|
||||
litigation is filed.
|
||||
|
||||
All Recipient's rights under this Agreement shall terminate if it
|
||||
fails to comply with any of the material terms or conditions of this
|
||||
Agreement and does not cure such failure in a reasonable period of
|
||||
time after becoming aware of such noncompliance. If all Recipient's
|
||||
rights under this Agreement terminate, Recipient agrees to cease use
|
||||
and distribution of the Program as soon as reasonably practicable.
|
||||
However, Recipient's obligations under this Agreement and any licenses
|
||||
granted by Recipient relating to the Program shall continue and survive.
|
||||
|
||||
Everyone is permitted to copy and distribute copies of this Agreement,
|
||||
but in order to avoid inconsistency the Agreement is copyrighted and
|
||||
may only be modified in the following manner. The Agreement Steward
|
||||
reserves the right to publish new versions (including revisions) of
|
||||
this Agreement from time to time. No one other than the Agreement
|
||||
Steward has the right to modify this Agreement. The Eclipse Foundation
|
||||
is the initial Agreement Steward. The Eclipse Foundation may assign the
|
||||
responsibility to serve as the Agreement Steward to a suitable separate
|
||||
entity. Each new version of the Agreement will be given a distinguishing
|
||||
version number. The Program (including Contributions) may always be
|
||||
Distributed subject to the version of the Agreement under which it was
|
||||
received. In addition, after a new version of the Agreement is published,
|
||||
Contributor may elect to Distribute the Program (including its
|
||||
Contributions) under the new version.
|
||||
|
||||
Except as expressly stated in Sections 2(a) and 2(b) above, Recipient
|
||||
receives no rights or licenses to the intellectual property of any
|
||||
Contributor under this Agreement, whether expressly, by implication,
|
||||
estoppel or otherwise. All rights in the Program not expressly granted
|
||||
under this Agreement are reserved. Nothing in this Agreement is intended
|
||||
to be enforceable by any entity that is not a Contributor or Recipient.
|
||||
No third-party beneficiary rights are created under this Agreement.
|
||||
|
||||
Exhibit A - Form of Secondary Licenses Notice
|
||||
|
||||
"This Source Code may also be made available under the following
|
||||
Secondary Licenses when the conditions for such availability set forth
|
||||
in the Eclipse Public License, v. 2.0 are satisfied: GNU General Public
|
||||
License as published by the Free Software Foundation, either version 2
|
||||
of the License, or (at your option) any later version, with the GNU
|
||||
Classpath Exception which is available at
|
||||
https://www.gnu.org/software/classpath/license.html."
|
||||
|
||||
Simply including a copy of this Agreement, including this Exhibit A
|
||||
is not sufficient to license the Source Code under Secondary Licenses.
|
||||
|
||||
If it is not possible or desirable to put the notice in a particular
|
||||
file, then You may include the notice in a location (such as a LICENSE
|
||||
file in a relevant directory) where a recipient would be likely to
|
||||
look for such a notice.
|
||||
|
||||
You may add additional accurate notices of copyright ownership.
|
155
tests/jepsen.nukeeper/README.md
Normal file
155
tests/jepsen.nukeeper/README.md
Normal file
@ -0,0 +1,155 @@
|
||||
# Jepsen tests ClickHouse Keeper
|
||||
|
||||
A Clojure library designed to test ZooKeeper-like implementation inside ClickHouse.
|
||||
|
||||
## Test scenarios (workloads)
|
||||
|
||||
### CAS register
|
||||
|
||||
CAS Register has three operations: read number, write number, compare-and-swap number. This register is simulated as a single ZooKeeper node. Read transforms to ZooKeeper's `getData` request. Write transforms to the `set` request. Compare-and-swap implemented via `getData` + compare in code + `set` new value with `version` from `getData`.
|
||||
|
||||
In this test, we use a linearizable checker, so Jepsen validates that history was linearizable. One of the heaviest workloads.
|
||||
|
||||
Strictly requires `quorum_reads` to be true.
|
||||
|
||||
### Set
|
||||
|
||||
Set has two operations: add a number to set and read all values from set. This workload is simulated on a single ZooKeeper node with a string value that represents Clojure set data structure. Add operation very similar to compare-and-swap. We read string value from ZooKeeper node with `getData`, parse it to Clojure's set, add new value to the set and try to write it with the received version.
|
||||
|
||||
In this test, Jepsen validates that all successfully added values can be read. Generator for this workload performs only add operations until a timeout and after that tries to read set once.
|
||||
|
||||
### Unique IDs
|
||||
|
||||
In the Unique IDs workload we have only one operation: generate a new unique number. It's implemented using ZooKeeper's sequential nodes. For each generates request client just creates a new sequential node in ZooKeeper with a fixed prefix. After that cuts the prefix off from the returned path and parses the number from the rest part.
|
||||
|
||||
Jepsen checks that all returned IDs were unique.
|
||||
|
||||
### Counter
|
||||
|
||||
Counter workload has two operations: read counter value and add some number to the counter. Its implementation is quite weird. We add number `N` to the counter creating `N` sequential nodes in a single ZooKeeper transaction. Counter read implemented as `getChildren` ZooKeeper request and count of all returned nodes.
|
||||
|
||||
Jepsen checks that counter value lies in the interval of possible value. Strictly requires `quorum_reads` to be true.
|
||||
|
||||
### Total queue
|
||||
|
||||
Simulates an unordered queue with three operations: enqueue number, dequeue, and drain. Enqueue operation uses `create` request with node name equals to number. `Dequeue` operation is more interesting. We list (`getChildren`) all nodes and remember the parent node version. After that we choose the smallest one and prepare the transaction: `check` parent node version + set an empty value to parent node + delete smalled child node. Drain operation is just `getChildren` on the parent path.
|
||||
|
||||
Jepsen checks that all enqueued values were dequeued or drained. Duplicates are allowed because Jepsen doesn't know the value of the unknown-status (`:info`) dequeue operation. So when we try to `dequeue` some element we should return it even if our delete transaction failed with `Connection loss` error.
|
||||
|
||||
### Linear queue
|
||||
|
||||
Same with the total queue, but without drain operation. Checks linearizability between enqueue and dequeue. Sometimes consume more than 10GB during validation even for very short histories.
|
||||
|
||||
|
||||
## Nemesis
|
||||
|
||||
We use almost all standard nemeses with small changes for our storage.
|
||||
|
||||
### Random node killer (random-node-killer)
|
||||
|
||||
Sleep 5 seconds, kills random node, sleep for 5 seconds, and starts it back.
|
||||
|
||||
### All nodes killer (all-nodes-killer)
|
||||
|
||||
Kill all nodes at once, sleep for 5 seconds, and starts them back.
|
||||
|
||||
### Simple partitioner (simple-partitioner)
|
||||
|
||||
Partition one node from others using iptables. No one can see the victim and the victim cannot see anybody.
|
||||
|
||||
### Random node stop (random-node-hammer-time)
|
||||
|
||||
Send `SIGSTOP` to the random node. Sleep 5 seconds. Send `SIGCONT`.
|
||||
|
||||
### All nodes stop (all-nodes-hammer-time)
|
||||
|
||||
Send `SIGSTOP` to all nodes. Sleep 5 seconds. Send `SIGCONT`.
|
||||
|
||||
### Logs corruptor (logs-corruptor)
|
||||
|
||||
Corrupts latest log (change one random byte) in `clickhouse_path/coordination/logs`. Restarts nodes.
|
||||
|
||||
### Snapshots corruptor (snapshots-corruptor)
|
||||
|
||||
Corrupts latest snapshot (change one random byte) in `clickhouse_path/coordination/snapshots`. Restarts nodes.
|
||||
|
||||
### Logs and snapshots corruptor (logs-and-snapshots-corruptor)
|
||||
|
||||
Corrupts both the latest log and snapshot. Restarts node.
|
||||
|
||||
### Drop data corruptor (drop-data-corruptor)
|
||||
|
||||
Drop all data from `clickhouse_path/coordinator`. Restarts node.
|
||||
|
||||
### Bridge partitioner (bridge-partitioner)
|
||||
|
||||
Two nodes don't see each other but can see another node. The last node can see both.
|
||||
|
||||
### Blind node partitioner (blind-node-partitioner)
|
||||
|
||||
One of the nodes cannot see another, but they can see it.
|
||||
|
||||
### Blind others partitioner (blind-others-partitioner)
|
||||
|
||||
Two nodes don't see one node but it can see both.
|
||||
|
||||
## Usage
|
||||
|
||||
### Dependencies
|
||||
|
||||
- leiningen (https://leiningen.org/)
|
||||
- clojure (https://clojure.org/)
|
||||
- jvm
|
||||
|
||||
### Options for `lein run`
|
||||
|
||||
- `test` Run a single test.
|
||||
- `test-all` Run all available tests from tests-set.
|
||||
- `-w (--workload)` One of the workloads. Option for a single `test`.
|
||||
- `--nemesis` One of nemeses. Option for a single `test`.
|
||||
- `-q (--quorum)` Run test with quorum reads.
|
||||
- `-r (--rate)` How many operations per second Jepsen will generate in a single thread.
|
||||
- `-s (--snapshot-distance)` ClickHouse Keeper setting. How often we will create a new snapshot.
|
||||
- `--stale-log-gap` ClickHosue Keeper setting. A leader will send a snapshot instead of a log to this node if it's committed index less than leaders - this setting value.
|
||||
- `--reserved-log-items` ClickHouse Keeper setting. How many log items to keep after the snapshot.
|
||||
- `--ops-per-key` Option for CAS register workload. Total ops that will be generated for a single register.
|
||||
- `--lightweight-run` Run some lightweight tests without linearizability checks. Option for `tests-all` run.
|
||||
- `--reuse-binary` Don't download clickhouse binary if it already exists on the node.
|
||||
- `--clickhouse-source` URL to clickhouse `.deb`, `.tgz` or binary.
|
||||
- `--time-limit` (in seconds) How long Jepsen will generate new operations.
|
||||
- `--nodes-file` File with nodes for SSH. Newline separated.
|
||||
- `--username` SSH username for nodes.
|
||||
- `--password` SSH password for nodes.
|
||||
- `--concurrency` How many threads Jepsen will use for concurrent requests.
|
||||
- `--test-count` How many times to run a single test or how many tests to run from the tests set.
|
||||
|
||||
|
||||
### Examples:
|
||||
|
||||
1. Run `Set` workload with `logs-and-snapshots-corruptor` ten times:
|
||||
|
||||
```sh
|
||||
$ lein run test --nodes-file nodes.txt --username root --password '' --time-limit 30 --concurrency 50 -r 50 --workload set --nemesis logs-and-snapshots-corruptor --clickhouse-source 'https://clickhouse-builds.s3.yandex.net/someurl/clickhouse-common-static_21.4.1.6321_amd64.deb' -q --test-count 10 --reuse-binary
|
||||
```
|
||||
|
||||
2. Run ten random tests from `lightweight-run` with some custom Keeper settings:
|
||||
|
||||
``` sh
|
||||
$ lein run test-all --nodes-file nodes.txt --username root --password '' --time-limit 30 --concurrency 50 -r 50 --snapshot-distance 100 --stale-log-gap 100 --reserved-log-items 10 --lightweight-run --clickhouse-source 'someurl' -q --reuse-binary --test-count 10
|
||||
```
|
||||
|
||||
|
||||
## License
|
||||
|
||||
Copyright © 2021 FIXME
|
||||
|
||||
This program and the accompanying materials are made available under the
|
||||
terms of the Eclipse Public License 2.0 which is available at
|
||||
http://www.eclipse.org/legal/epl-2.0.
|
||||
|
||||
This Source Code may also be made available under the following Secondary
|
||||
Licenses when the conditions for such availability set forth in the Eclipse
|
||||
Public License, v. 2.0 are satisfied: GNU General Public License as published by
|
||||
the Free Software Foundation, either version 2 of the License, or (at your
|
||||
option) any later version, with the GNU Classpath Exception which is available
|
||||
at https://www.gnu.org/software/classpath/license.html.
|
3
tests/jepsen.nukeeper/doc/intro.md
Normal file
3
tests/jepsen.nukeeper/doc/intro.md
Normal file
@ -0,0 +1,3 @@
|
||||
# Introduction to jepsen.nukeeper
|
||||
|
||||
TODO: write [great documentation](http://jacobian.org/writing/what-to-write/)
|
13
tests/jepsen.nukeeper/project.clj
Normal file
13
tests/jepsen.nukeeper/project.clj
Normal file
@ -0,0 +1,13 @@
|
||||
(defproject jepsen.nukeeper "0.1.0-SNAPSHOT"
|
||||
:injections [(.. System (setProperty "zookeeper.request.timeout" "10000"))]
|
||||
:description "A jepsen tests for ClickHouse NuKeeper"
|
||||
:url "https://clickhouse.tech/"
|
||||
:license {:name "EPL-2.0"
|
||||
:url "https://www.eclipse.org/legal/epl-2.0/"}
|
||||
:main jepsen.nukeeper.main
|
||||
:plugins [[lein-cljfmt "0.7.0"]]
|
||||
:dependencies [[org.clojure/clojure "1.10.1"]
|
||||
[jepsen "0.2.3"]
|
||||
[zookeeper-clj "0.9.4"]
|
||||
[org.apache.zookeeper/zookeeper "3.6.1" :exclusions [org.slf4j/slf4j-log4j12]]]
|
||||
:repl-options {:init-ns jepsen.nukeeper.main})
|
1
tests/jepsen.nukeeper/resources/config.xml
Symbolic link
1
tests/jepsen.nukeeper/resources/config.xml
Symbolic link
@ -0,0 +1 @@
|
||||
../../../programs/server/config.xml
|
3
tests/jepsen.nukeeper/resources/listen.xml
Normal file
3
tests/jepsen.nukeeper/resources/listen.xml
Normal file
@ -0,0 +1,3 @@
|
||||
<yandex>
|
||||
<listen_host>::</listen_host>
|
||||
</yandex>
|
36
tests/jepsen.nukeeper/resources/test_keeper_config.xml
Normal file
36
tests/jepsen.nukeeper/resources/test_keeper_config.xml
Normal file
@ -0,0 +1,36 @@
|
||||
<yandex>
|
||||
<test_keeper_server>
|
||||
<tcp_port>9181</tcp_port>
|
||||
<server_id>{id}</server_id>
|
||||
|
||||
<coordination_settings>
|
||||
<operation_timeout_ms>10000</operation_timeout_ms>
|
||||
<session_timeout_ms>30000</session_timeout_ms>
|
||||
<force_sync>false</force_sync>
|
||||
<startup_timeout>120000</startup_timeout>
|
||||
<raft_logs_level>trace</raft_logs_level>
|
||||
<quorum_reads>{quorum_reads}</quorum_reads>
|
||||
<snapshot_distance>{snapshot_distance}</snapshot_distance>
|
||||
<stale_log_gap>{stale_log_gap}</stale_log_gap>
|
||||
<reserved_log_items>{reserved_log_items}</reserved_log_items>
|
||||
</coordination_settings>
|
||||
|
||||
<raft_configuration>
|
||||
<server>
|
||||
<id>1</id>
|
||||
<hostname>{srv1}</hostname>
|
||||
<port>9444</port>
|
||||
</server>
|
||||
<server>
|
||||
<id>2</id>
|
||||
<hostname>{srv2}</hostname>
|
||||
<port>9444</port>
|
||||
</server>
|
||||
<server>
|
||||
<id>3</id>
|
||||
<hostname>{srv3}</hostname>
|
||||
<port>9444</port>
|
||||
</server>
|
||||
</raft_configuration>
|
||||
</test_keeper_server>
|
||||
</yandex>
|
1
tests/jepsen.nukeeper/resources/users.xml
Symbolic link
1
tests/jepsen.nukeeper/resources/users.xml
Symbolic link
@ -0,0 +1 @@
|
||||
../../../programs/server/users.xml
|
18
tests/jepsen.nukeeper/src/jepsen/nukeeper/constants.clj
Normal file
18
tests/jepsen.nukeeper/src/jepsen/nukeeper/constants.clj
Normal file
@ -0,0 +1,18 @@
|
||||
(ns jepsen.nukeeper.constants)
|
||||
|
||||
(def common-prefix "/home/robot-clickhouse")
|
||||
|
||||
(def binary-name "clickhouse")
|
||||
|
||||
(def binary-path (str common-prefix "/" binary-name))
|
||||
(def pid-file-path (str common-prefix "/clickhouse.pid"))
|
||||
|
||||
(def data-dir (str common-prefix "/db"))
|
||||
(def logs-dir (str common-prefix "/logs"))
|
||||
(def configs-dir (str common-prefix "/config"))
|
||||
(def sub-configs-dir (str configs-dir "/config.d"))
|
||||
(def coordination-data-dir (str data-dir "/coordination"))
|
||||
(def coordination-snapshots-dir (str coordination-data-dir "/snapshots"))
|
||||
(def coordination-logs-dir (str coordination-data-dir "/logs"))
|
||||
|
||||
(def stderr-file (str logs-dir "/stderr.log"))
|
50
tests/jepsen.nukeeper/src/jepsen/nukeeper/counter.clj
Normal file
50
tests/jepsen.nukeeper/src/jepsen/nukeeper/counter.clj
Normal file
@ -0,0 +1,50 @@
|
||||
(ns jepsen.nukeeper.counter
|
||||
(:require
|
||||
[clojure.tools.logging :refer :all]
|
||||
[jepsen
|
||||
[checker :as checker]
|
||||
[client :as client]
|
||||
[generator :as gen]]
|
||||
[jepsen.nukeeper.utils :refer :all]
|
||||
[zookeeper :as zk])
|
||||
(:import (org.apache.zookeeper ZooKeeper KeeperException KeeperException$BadVersionException)))
|
||||
|
||||
(defn r [_ _] {:type :invoke, :f :read})
|
||||
(defn add [_ _] {:type :invoke, :f :add, :value (rand-int 5)})
|
||||
|
||||
(defrecord CounterClient [conn nodename]
|
||||
client/Client
|
||||
(open! [this test node]
|
||||
(assoc
|
||||
(assoc this
|
||||
:conn (zk-connect node 9181 30000))
|
||||
:nodename node))
|
||||
|
||||
(setup! [this test])
|
||||
|
||||
(invoke! [this test op]
|
||||
(case (:f op)
|
||||
:read (exec-with-retries 30 (fn []
|
||||
(assoc op
|
||||
:type :ok
|
||||
:value (count (zk-list conn "/")))))
|
||||
:add (try
|
||||
(do
|
||||
(zk-multi-create-many-seq-nodes conn "/seq-" (:value op))
|
||||
(assoc op :type :ok))
|
||||
(catch Exception _ (assoc op :type :info, :error :connect-error)))))
|
||||
|
||||
(teardown! [_ test])
|
||||
|
||||
(close! [_ test]
|
||||
(zk/close conn)))
|
||||
|
||||
(defn workload
|
||||
"A generator, client, and checker for a set test."
|
||||
[opts]
|
||||
{:client (CounterClient. nil nil)
|
||||
:checker (checker/counter)
|
||||
:generator (->> (range)
|
||||
(map (fn [x]
|
||||
(->> (gen/mix [r add])))))
|
||||
:final-generator (gen/once {:type :invoke, :f :read, :value nil})})
|
128
tests/jepsen.nukeeper/src/jepsen/nukeeper/db.clj
Normal file
128
tests/jepsen.nukeeper/src/jepsen/nukeeper/db.clj
Normal file
@ -0,0 +1,128 @@
|
||||
(ns jepsen.nukeeper.db
|
||||
(:require [clojure.tools.logging :refer :all]
|
||||
[jepsen
|
||||
[control :as c]
|
||||
[db :as db]
|
||||
[util :as util :refer [meh]]]
|
||||
[jepsen.nukeeper.constants :refer :all]
|
||||
[jepsen.nukeeper.utils :refer :all]
|
||||
[clojure.java.io :as io]
|
||||
[jepsen.control.util :as cu]
|
||||
[jepsen.os.ubuntu :as ubuntu]))
|
||||
|
||||
(defn get-clickhouse-sky
|
||||
[version]
|
||||
(c/exec :sky :get :-d common-prefix :-N :Backbone version)
|
||||
(str common-prefix "/clickhouse"))
|
||||
|
||||
(defn get-clickhouse-url
|
||||
[url]
|
||||
(let [download-result (cu/wget! url)]
|
||||
(do (c/exec :mv download-result common-prefix)
|
||||
(str common-prefix "/" download-result))))
|
||||
|
||||
(defn download-clickhouse
|
||||
[source]
|
||||
(info "Downloading clickhouse from" source)
|
||||
(cond
|
||||
(clojure.string/starts-with? source "rbtorrent:") (get-clickhouse-sky source)
|
||||
(clojure.string/starts-with? source "http") (get-clickhouse-url source)
|
||||
:else (throw (Exception. (str "Don't know how to download clickhouse from" source)))))
|
||||
|
||||
(defn unpack-deb
|
||||
[path]
|
||||
(do
|
||||
(c/exec :dpkg :-x path common-prefix)
|
||||
(c/exec :rm :-f path)
|
||||
(c/exec :mv (str common-prefix "/usr/bin/clickhouse") common-prefix)
|
||||
(c/exec :rm :-rf (str common-prefix "/usr") (str common-prefix "/etc"))))
|
||||
|
||||
(defn unpack-tgz
|
||||
[path]
|
||||
(do
|
||||
(c/exec :mkdir :-p (str common-prefix "/unpacked"))
|
||||
(c/exec :tar :-zxvf path :-C (str common-prefix "/unpacked"))
|
||||
(c/exec :rm :-f path)
|
||||
(let [subdir (c/exec :ls (str common-prefix "/unpacked"))]
|
||||
(c/exec :mv (str common-prefix "/unpacked/" subdir "/usr/bin/clickhouse") common-prefix)
|
||||
(c/exec :rm :-fr (str common-prefix "/unpacked")))))
|
||||
|
||||
(defn chmod-binary
|
||||
[path]
|
||||
(c/exec :chmod :+x path))
|
||||
|
||||
(defn install-downloaded-clickhouse
|
||||
[path]
|
||||
(cond
|
||||
(clojure.string/ends-with? path ".deb") (unpack-deb path)
|
||||
(clojure.string/ends-with? path ".tgz") (unpack-tgz path)
|
||||
(clojure.string/ends-with? path "clickhouse") (chmod-binary path)
|
||||
:else (throw (Exception. (str "Don't know how to install clickhouse from path" path)))))
|
||||
|
||||
(defn prepare-dirs
|
||||
[]
|
||||
(do
|
||||
(c/exec :mkdir :-p common-prefix)
|
||||
(c/exec :mkdir :-p data-dir)
|
||||
(c/exec :mkdir :-p logs-dir)
|
||||
(c/exec :mkdir :-p configs-dir)
|
||||
(c/exec :mkdir :-p sub-configs-dir)
|
||||
(c/exec :touch stderr-file)
|
||||
(c/exec :chown :-R :root common-prefix)))
|
||||
|
||||
(defn cluster-config
|
||||
[test node config-template]
|
||||
(let [nodes (:nodes test)
|
||||
replacement-map {#"\{srv1\}" (get nodes 0)
|
||||
#"\{srv2\}" (get nodes 1)
|
||||
#"\{srv3\}" (get nodes 2)
|
||||
#"\{id\}" (str (inc (.indexOf nodes node)))
|
||||
#"\{quorum_reads\}" (str (boolean (:quorum test)))
|
||||
#"\{snapshot_distance\}" (str (:snapshot-distance test))
|
||||
#"\{stale_log_gap\}" (str (:stale-log-gap test))
|
||||
#"\{reserved_log_items\}" (str (:reserved-log-items test))}]
|
||||
(reduce #(clojure.string/replace %1 (get %2 0) (get %2 1)) config-template replacement-map)))
|
||||
|
||||
(defn install-configs
|
||||
[test node]
|
||||
(c/exec :echo (slurp (io/resource "config.xml")) :> (str configs-dir "/config.xml"))
|
||||
(c/exec :echo (slurp (io/resource "users.xml")) :> (str configs-dir "/users.xml"))
|
||||
(c/exec :echo (slurp (io/resource "listen.xml")) :> (str sub-configs-dir "/listen.xml"))
|
||||
(c/exec :echo (cluster-config test node (slurp (io/resource "test_keeper_config.xml"))) :> (str sub-configs-dir "/test_keeper_config.xml")))
|
||||
|
||||
(defn db
|
||||
[version reuse-binary]
|
||||
(reify db/DB
|
||||
(setup! [_ test node]
|
||||
(c/su
|
||||
(do
|
||||
(info "Preparing directories")
|
||||
(prepare-dirs)
|
||||
(if (or (not (cu/exists? binary-path)) (not reuse-binary))
|
||||
(do (info "Downloading clickhouse")
|
||||
(install-downloaded-clickhouse (download-clickhouse version)))
|
||||
(info "Binary already exsist on path" binary-path "skipping download"))
|
||||
(info "Installing configs")
|
||||
(install-configs test node)
|
||||
(info "Starting server")
|
||||
(start-clickhouse! node test)
|
||||
(info "ClickHouse started"))))
|
||||
|
||||
(teardown! [_ test node]
|
||||
(info node "Tearing down clickhouse")
|
||||
(kill-clickhouse! node test)
|
||||
(c/su
|
||||
(if (not reuse-binary)
|
||||
(c/exec :rm :-rf binary-path))
|
||||
(c/exec :rm :-rf pid-file-path)
|
||||
(c/exec :rm :-rf data-dir)
|
||||
(c/exec :rm :-rf logs-dir)
|
||||
(c/exec :rm :-rf configs-dir)))
|
||||
|
||||
db/LogFiles
|
||||
(log-files [_ test node]
|
||||
(c/su
|
||||
(kill-clickhouse! node test)
|
||||
(c/cd data-dir
|
||||
(c/exec :tar :czf "coordination.tar.gz" "coordination")))
|
||||
[stderr-file (str logs-dir "/clickhouse-server.log") (str data-dir "/coordination.tar.gz")])))
|
159
tests/jepsen.nukeeper/src/jepsen/nukeeper/main.clj
Normal file
159
tests/jepsen.nukeeper/src/jepsen/nukeeper/main.clj
Normal file
@ -0,0 +1,159 @@
|
||||
(ns jepsen.nukeeper.main
|
||||
(:require [clojure.tools.logging :refer :all]
|
||||
[jepsen.nukeeper.utils :refer :all]
|
||||
[clojure.pprint :refer [pprint]]
|
||||
[jepsen.nukeeper.set :as set]
|
||||
[jepsen.nukeeper.db :refer :all]
|
||||
[jepsen.nukeeper.nemesis :as custom-nemesis]
|
||||
[jepsen.nukeeper.register :as register]
|
||||
[jepsen.nukeeper.unique :as unique]
|
||||
[jepsen.nukeeper.queue :as queue]
|
||||
[jepsen.nukeeper.counter :as counter]
|
||||
[jepsen.nukeeper.constants :refer :all]
|
||||
[clojure.string :as str]
|
||||
[jepsen
|
||||
[checker :as checker]
|
||||
[cli :as cli]
|
||||
[client :as client]
|
||||
[control :as c]
|
||||
[db :as db]
|
||||
[nemesis :as nemesis]
|
||||
[generator :as gen]
|
||||
[independent :as independent]
|
||||
[tests :as tests]
|
||||
[util :as util :refer [meh]]]
|
||||
[jepsen.control.util :as cu]
|
||||
[jepsen.os.ubuntu :as ubuntu]
|
||||
[jepsen.checker.timeline :as timeline]
|
||||
[clojure.java.io :as io]
|
||||
[zookeeper.data :as data]
|
||||
[zookeeper :as zk])
|
||||
(:import (org.apache.zookeeper ZooKeeper KeeperException KeeperException$BadVersionException)
|
||||
(ch.qos.logback.classic Level)
|
||||
(org.slf4j Logger LoggerFactory)))
|
||||
|
||||
(def workloads
|
||||
"A map of workload names to functions that construct workloads, given opts."
|
||||
{"set" set/workload
|
||||
"register" register/workload
|
||||
"unique-ids" unique/workload
|
||||
"counter" counter/workload
|
||||
"total-queue" queue/total-workload
|
||||
"linear-queue" queue/linear-workload})
|
||||
|
||||
(def cli-opts
|
||||
"Additional command line options."
|
||||
[["-w" "--workload NAME" "What workload should we run?"
|
||||
:default "set"
|
||||
:validate [workloads (cli/one-of workloads)]]
|
||||
[nil "--nemesis NAME" "Which nemesis will poison our lives?"
|
||||
:default "random-node-killer"
|
||||
:validate [custom-nemesis/custom-nemesises (cli/one-of custom-nemesis/custom-nemesises)]]
|
||||
["-q" "--quorum" "Use quorum reads, instead of reading from any primary."]
|
||||
["-r" "--rate HZ" "Approximate number of requests per second, per thread."
|
||||
:default 10
|
||||
:parse-fn read-string
|
||||
:validate [#(and (number? %) (pos? %)) "Must be a positive number"]]
|
||||
["-s" "--snapshot-distance NUM" "Number of log entries to create snapshot"
|
||||
:default 10000
|
||||
:parse-fn read-string
|
||||
:validate [#(and (number? %) (pos? %)) "Must be a positive number"]]
|
||||
[nil "--stale-log-gap NUM" "Number of log entries to send snapshot instead of separate logs"
|
||||
:default 1000
|
||||
:parse-fn read-string
|
||||
:validate [#(and (number? %) (pos? %)) "Must be a positive number"]]
|
||||
[nil "--reserved-log-items NUM" "Number of log entries to keep after snapshot"
|
||||
:default 1000
|
||||
:parse-fn read-string
|
||||
:validate [#(and (number? %) (pos? %)) "Must be a positive number"]]
|
||||
[nil "--ops-per-key NUM" "Maximum number of operations on any given key."
|
||||
:default 100
|
||||
:parse-fn parse-long
|
||||
:validate [pos? "Must be a positive integer."]]
|
||||
[nil, "--lightweight-run" "Subset of workloads/nemesises which is simple to validate"]
|
||||
[nil, "--reuse-binary" "Use already downloaded binary if it exists, don't remove it on shutdown"]
|
||||
["-c" "--clickhouse-source URL" "URL for clickhouse deb or tgz package"
|
||||
:default "https://clickhouse-builds.s3.yandex.net/21677/ef82333089156907a0979669d9374c2e18daabe5/clickhouse_build_check/clang-11_relwithdebuginfo_none_bundled_unsplitted_disable_False_deb/clickhouse-common-static_21.4.1.6313_amd64.deb"]])
|
||||
|
||||
(defn nukeeper-test
|
||||
"Given an options map from the command line runner (e.g. :nodes, :ssh,
|
||||
:concurrency, ...), constructs a test map."
|
||||
[opts]
|
||||
(info "Test opts\n" (with-out-str (pprint opts)))
|
||||
(let [quorum (boolean (:quorum opts))
|
||||
workload ((get workloads (:workload opts)) opts)
|
||||
current-nemesis (get custom-nemesis/custom-nemesises (:nemesis opts))]
|
||||
(merge tests/noop-test
|
||||
opts
|
||||
{:name (str "clickhouse-keeper-quorum=" quorum "-" (name (:workload opts)) "-" (name (:nemesis opts)))
|
||||
:os ubuntu/os
|
||||
:db (db (:clickhouse-source opts) (boolean (:reuse-binary opts)))
|
||||
:pure-generators true
|
||||
:client (:client workload)
|
||||
:nemesis (:nemesis current-nemesis)
|
||||
:checker (checker/compose
|
||||
{:perf (checker/perf)
|
||||
:workload (:checker workload)})
|
||||
:generator (gen/phases
|
||||
(->> (:generator workload)
|
||||
(gen/stagger (/ (:rate opts)))
|
||||
(gen/nemesis (:generator current-nemesis))
|
||||
(gen/time-limit (:time-limit opts)))
|
||||
(gen/log "Healing cluster")
|
||||
(gen/nemesis (gen/once {:type :info, :f :stop}))
|
||||
(gen/log "Waiting for recovery")
|
||||
(gen/sleep 10)
|
||||
(gen/clients (:final-generator workload)))})))
|
||||
|
||||
(def all-nemesises (keys custom-nemesis/custom-nemesises))
|
||||
|
||||
(def all-workloads (keys workloads))
|
||||
|
||||
(def lightweight-workloads ["set" "unique-ids" "counter" "total-queue"])
|
||||
|
||||
(def useful-nemesises ["random-node-killer"
|
||||
"simple-partitioner"
|
||||
"all-nodes-hammer-time"
|
||||
; can lead to a very rare data loss https://github.com/eBay/NuRaft/issues/185
|
||||
;"logs-and-snapshots-corruptor"
|
||||
;"drop-data-corruptor"
|
||||
"bridge-partitioner"
|
||||
"blind-node-partitioner"
|
||||
"blind-others-partitioner"])
|
||||
|
||||
(defn cart [colls]
|
||||
(if (empty? colls)
|
||||
'(())
|
||||
(for [more (cart (rest colls))
|
||||
x (first colls)]
|
||||
(cons x more))))
|
||||
|
||||
(defn all-test-options
|
||||
"Takes base cli options, a collection of nemeses, workloads, and a test count,
|
||||
and constructs a sequence of test options."
|
||||
[cli worload-nemeseis-collection]
|
||||
(take (:test-count cli)
|
||||
(shuffle (for [[workload nemesis] worload-nemeseis-collection]
|
||||
(assoc cli
|
||||
:nemesis nemesis
|
||||
:workload workload
|
||||
:test-count 1)))))
|
||||
(defn all-tests
|
||||
"Turns CLI options into a sequence of tests."
|
||||
[test-fn cli]
|
||||
(if (boolean (:lightweight-run cli))
|
||||
(map test-fn (all-test-options cli (cart [lightweight-workloads useful-nemesises])))
|
||||
(map test-fn (all-test-options cli (cart [all-workloads all-nemesises])))))
|
||||
|
||||
(defn -main
|
||||
"Handles command line arguments. Can either run a test, or a web server for
|
||||
browsing results."
|
||||
[& args]
|
||||
(.setLevel
|
||||
(LoggerFactory/getLogger "org.apache.zookeeper") Level/OFF)
|
||||
(cli/run! (merge (cli/single-test-cmd {:test-fn nukeeper-test
|
||||
:opt-spec cli-opts})
|
||||
(cli/test-all-cmd {:tests-fn (partial all-tests nukeeper-test)
|
||||
:opt-spec cli-opts})
|
||||
(cli/serve-cmd))
|
||||
args))
|
160
tests/jepsen.nukeeper/src/jepsen/nukeeper/nemesis.clj
Normal file
160
tests/jepsen.nukeeper/src/jepsen/nukeeper/nemesis.clj
Normal file
@ -0,0 +1,160 @@
|
||||
(ns jepsen.nukeeper.nemesis
|
||||
(:require
|
||||
[clojure.tools.logging :refer :all]
|
||||
[jepsen
|
||||
[nemesis :as nemesis]
|
||||
[control :as c]
|
||||
[generator :as gen]]
|
||||
[jepsen.nukeeper.constants :refer :all]
|
||||
[jepsen.nukeeper.utils :refer :all]))
|
||||
|
||||
(defn random-node-killer-nemesis
|
||||
[]
|
||||
(nemesis/node-start-stopper
|
||||
rand-nth
|
||||
(fn start [test node] (kill-clickhouse! node test))
|
||||
(fn stop [test node] (start-clickhouse! node test))))
|
||||
|
||||
(defn all-nodes-killer-nemesis
|
||||
[]
|
||||
(nemesis/node-start-stopper
|
||||
identity
|
||||
(fn start [test node] (kill-clickhouse! node test))
|
||||
(fn stop [test node] (start-clickhouse! node test))))
|
||||
|
||||
(defn random-node-hammer-time-nemesis
|
||||
[]
|
||||
(nemesis/hammer-time "clickhouse"))
|
||||
|
||||
(defn all-nodes-hammer-time-nemesis
|
||||
[]
|
||||
(nemesis/hammer-time identity "clickhouse"))
|
||||
|
||||
(defn select-last-file
|
||||
[path]
|
||||
(last (clojure.string/split
|
||||
(c/exec :find path :-type :f :-printf "%T+ %p\n" :| :grep :-v :tmp_ :| :sort :| :awk "{print $2}")
|
||||
#"\n")))
|
||||
|
||||
(defn random-file-pos
|
||||
[fname]
|
||||
(let [fsize (Integer/parseInt (c/exec :du :-b fname :| :cut :-f1))]
|
||||
(rand-int fsize)))
|
||||
|
||||
(defn corrupt-file
|
||||
[fname]
|
||||
(if (not (empty? fname))
|
||||
(do
|
||||
(info "Corrupting" fname)
|
||||
(c/exec :dd "if=/dev/zero" (str "of=" fname) "bs=1" "count=1" (str "seek=" (random-file-pos fname)) "conv=notrunc"))
|
||||
(info "Nothing to corrupt")))
|
||||
|
||||
(defn corruptor-nemesis
|
||||
[path corruption-op]
|
||||
(reify nemesis/Nemesis
|
||||
|
||||
(setup! [this test] this)
|
||||
|
||||
(invoke! [this test op]
|
||||
(cond (= (:f op) :corrupt)
|
||||
(let [nodes (list (rand-nth (:nodes test)))]
|
||||
(info "Corruption on node" nodes)
|
||||
(c/on-nodes test nodes
|
||||
(fn [test node]
|
||||
(c/su
|
||||
(kill-clickhouse! node test)
|
||||
(corruption-op path)
|
||||
(start-clickhouse! node test))))
|
||||
(assoc op :type :info, :value :corrupted))
|
||||
:else (do (c/on-nodes test (:nodes test)
|
||||
(fn [test node]
|
||||
(c/su
|
||||
(start-clickhouse! node test))))
|
||||
(assoc op :type :info, :value :done))))
|
||||
|
||||
(teardown! [this test])))
|
||||
|
||||
(defn logs-corruption-nemesis
|
||||
[]
|
||||
(corruptor-nemesis coordination-logs-dir #(corrupt-file (select-last-file %1))))
|
||||
|
||||
(defn snapshots-corruption-nemesis
|
||||
[]
|
||||
(corruptor-nemesis coordination-snapshots-dir #(corrupt-file (select-last-file %1))))
|
||||
|
||||
(defn logs-and-snapshots-corruption-nemesis
|
||||
[]
|
||||
(corruptor-nemesis coordination-data-dir (fn [path]
|
||||
(do
|
||||
(corrupt-file (select-last-file (str path "/snapshots")))
|
||||
(corrupt-file (select-last-file (str path "/logs")))))))
|
||||
(defn drop-all-corruption-nemesis
|
||||
[]
|
||||
(corruptor-nemesis coordination-data-dir (fn [path]
|
||||
(c/exec :rm :-fr path))))
|
||||
|
||||
(defn partition-bridge-nemesis
|
||||
[]
|
||||
(nemesis/partitioner nemesis/bridge))
|
||||
|
||||
(defn blind-node
|
||||
[nodes]
|
||||
(let [[[victim] others] (nemesis/split-one nodes)]
|
||||
{victim (into #{} others)}))
|
||||
|
||||
(defn blind-node-partition-nemesis
|
||||
[]
|
||||
(nemesis/partitioner blind-node))
|
||||
|
||||
(defn blind-others
|
||||
[nodes]
|
||||
(let [[[victim] others] (nemesis/split-one nodes)]
|
||||
(into {} (map (fn [node] [node #{victim}])) others)))
|
||||
|
||||
(defn blind-others-partition-nemesis
|
||||
[]
|
||||
(nemesis/partitioner blind-others))
|
||||
|
||||
(defn network-non-symmetric-nemesis
|
||||
[]
|
||||
(nemesis/partitioner nemesis/bridge))
|
||||
|
||||
(defn start-stop-generator
|
||||
[time-corrupt time-ok]
|
||||
(->>
|
||||
(cycle [(gen/sleep time-ok)
|
||||
{:type :info, :f :start}
|
||||
(gen/sleep time-corrupt)
|
||||
{:type :info, :f :stop}])))
|
||||
|
||||
(defn corruption-generator
|
||||
[]
|
||||
(->>
|
||||
(cycle [(gen/sleep 5)
|
||||
{:type :info, :f :corrupt}])))
|
||||
|
||||
(def custom-nemesises
|
||||
{"random-node-killer" {:nemesis (random-node-killer-nemesis)
|
||||
:generator (start-stop-generator 5 5)}
|
||||
"all-nodes-killer" {:nemesis (all-nodes-killer-nemesis)
|
||||
:generator (start-stop-generator 1 10)}
|
||||
"simple-partitioner" {:nemesis (nemesis/partition-random-halves)
|
||||
:generator (start-stop-generator 5 5)}
|
||||
"random-node-hammer-time" {:nemesis (random-node-hammer-time-nemesis)
|
||||
:generator (start-stop-generator 5 5)}
|
||||
"all-nodes-hammer-time" {:nemesis (all-nodes-hammer-time-nemesis)
|
||||
:generator (start-stop-generator 1 10)}
|
||||
"logs-corruptor" {:nemesis (logs-corruption-nemesis)
|
||||
:generator (corruption-generator)}
|
||||
"snapshots-corruptor" {:nemesis (snapshots-corruption-nemesis)
|
||||
:generator (corruption-generator)}
|
||||
"logs-and-snapshots-corruptor" {:nemesis (logs-and-snapshots-corruption-nemesis)
|
||||
:generator (corruption-generator)}
|
||||
"drop-data-corruptor" {:nemesis (drop-all-corruption-nemesis)
|
||||
:generator (corruption-generator)}
|
||||
"bridge-partitioner" {:nemesis (partition-bridge-nemesis)
|
||||
:generator (start-stop-generator 5 5)}
|
||||
"blind-node-partitioner" {:nemesis (blind-node-partition-nemesis)
|
||||
:generator (start-stop-generator 5 5)}
|
||||
"blind-others-partitioner" {:nemesis (blind-others-partition-nemesis)
|
||||
:generator (start-stop-generator 5 5)}})
|
79
tests/jepsen.nukeeper/src/jepsen/nukeeper/queue.clj
Normal file
79
tests/jepsen.nukeeper/src/jepsen/nukeeper/queue.clj
Normal file
@ -0,0 +1,79 @@
|
||||
(ns jepsen.nukeeper.queue
|
||||
(:require
|
||||
[clojure.tools.logging :refer :all]
|
||||
[jepsen
|
||||
[checker :as checker]
|
||||
[client :as client]
|
||||
[generator :as gen]]
|
||||
[knossos.model :as model]
|
||||
[jepsen.checker.timeline :as timeline]
|
||||
[jepsen.nukeeper.utils :refer :all]
|
||||
[zookeeper :as zk])
|
||||
(:import (org.apache.zookeeper ZooKeeper KeeperException KeeperException$BadVersionException)))
|
||||
|
||||
(defn enqueue [val _ _] {:type :invoke, :f :enqueue :value val})
|
||||
(defn dequeue [_ _] {:type :invoke, :f :dequeue})
|
||||
|
||||
(defrecord QueueClient [conn nodename]
|
||||
client/Client
|
||||
(open! [this test node]
|
||||
(assoc
|
||||
(assoc this
|
||||
:conn (zk-connect node 9181 30000))
|
||||
:nodename node))
|
||||
|
||||
(setup! [this test])
|
||||
|
||||
(invoke! [this test op]
|
||||
(case (:f op)
|
||||
:enqueue (try
|
||||
(do
|
||||
(zk-create-if-not-exists conn (str "/" (:value op)) "")
|
||||
(assoc op :type :ok))
|
||||
(catch Exception _ (assoc op :type :info, :error :connect-error)))
|
||||
:dequeue
|
||||
(try
|
||||
(let [result (zk-multi-delete-first-child conn "/")]
|
||||
(if (not (nil? result))
|
||||
(assoc op :type :ok :value result)
|
||||
(assoc op :type :fail :value result)))
|
||||
(catch Exception _ (assoc op :type :info, :error :connect-error)))
|
||||
:drain
|
||||
; drain via delete is to long, just list all nodes
|
||||
(exec-with-retries 30 (fn []
|
||||
(zk-sync conn)
|
||||
(assoc op :type :ok :value (into #{} (map #(str %1) (zk-list conn "/"))))))))
|
||||
|
||||
(teardown! [_ test])
|
||||
|
||||
(close! [_ test]
|
||||
(zk/close conn)))
|
||||
|
||||
(defn sorted-str-range
|
||||
[n]
|
||||
(sort (map (fn [v] (str v)) (take n (range)))))
|
||||
|
||||
(defn total-workload
|
||||
"A generator, client, and checker for a set test."
|
||||
[opts]
|
||||
{:client (QueueClient. nil nil)
|
||||
:checker (checker/compose
|
||||
{:total-queue (checker/total-queue)
|
||||
:timeline (timeline/html)})
|
||||
:generator (->> (sorted-str-range 50000)
|
||||
(map (fn [x]
|
||||
(rand-nth [{:type :invoke, :f :enqueue :value x}
|
||||
{:type :invoke, :f :dequeue}]))))
|
||||
:final-generator (gen/once {:type :invoke, :f :drain, :value nil})})
|
||||
|
||||
(defn linear-workload
|
||||
[opts]
|
||||
{:client (QueueClient. nil nil)
|
||||
:checker (checker/compose
|
||||
{:linear (checker/linearizable {:model (model/unordered-queue)
|
||||
:algorithm :linear})
|
||||
:timeline (timeline/html)})
|
||||
:generator (->> (sorted-str-range 10000)
|
||||
(map (fn [x]
|
||||
(rand-nth [{:type :invoke, :f :enqueue :value x}
|
||||
{:type :invoke, :f :dequeue}]))))})
|
64
tests/jepsen.nukeeper/src/jepsen/nukeeper/register.clj
Normal file
64
tests/jepsen.nukeeper/src/jepsen/nukeeper/register.clj
Normal file
@ -0,0 +1,64 @@
|
||||
(ns jepsen.nukeeper.register
|
||||
(:require [jepsen
|
||||
[checker :as checker]
|
||||
[client :as client]
|
||||
[independent :as independent]
|
||||
[generator :as gen]]
|
||||
[jepsen.checker.timeline :as timeline]
|
||||
[knossos.model :as model]
|
||||
[jepsen.nukeeper.utils :refer :all]
|
||||
[zookeeper :as zk])
|
||||
(:import (org.apache.zookeeper ZooKeeper KeeperException KeeperException$BadVersionException)))
|
||||
|
||||
(defn r [_ _] {:type :invoke, :f :read, :value nil})
|
||||
(defn w [_ _] {:type :invoke, :f :write, :value (rand-int 5)})
|
||||
(defn cas [_ _] {:type :invoke, :f :cas, :value [(rand-int 5) (rand-int 5)]})
|
||||
|
||||
(defrecord RegisterClient [conn]
|
||||
client/Client
|
||||
(open! [this test node]
|
||||
(assoc this :conn (zk-connect node 9181 30000)))
|
||||
|
||||
(setup! [this test]
|
||||
(zk-create-range conn 300)) ; 300 nodes to be sure
|
||||
|
||||
(invoke! [_ test op]
|
||||
(let [[k v] (:value op)
|
||||
zk-k (zk-path k)]
|
||||
(case (:f op)
|
||||
:read (try
|
||||
(assoc op :type :ok, :value (independent/tuple k (parse-long (:data (zk-get-str conn zk-k)))))
|
||||
(catch Exception _ (assoc op :type :fail, :error :connect-error)))
|
||||
:write (try
|
||||
(do (zk-set conn zk-k v)
|
||||
(assoc op :type :ok))
|
||||
(catch Exception _ (assoc op :type :info, :error :connect-error)))
|
||||
:cas (try
|
||||
(let [[old new] v]
|
||||
(assoc op :type (if (zk-cas conn zk-k old new)
|
||||
:ok
|
||||
:fail)))
|
||||
(catch KeeperException$BadVersionException _ (assoc op :type :fail, :error :bad-version))
|
||||
(catch Exception _ (assoc op :type :info, :error :connect-error))))))
|
||||
|
||||
(teardown! [this test])
|
||||
|
||||
(close! [_ test]
|
||||
(zk/close conn)))
|
||||
|
||||
(defn workload
|
||||
"Tests linearizable reads, writes, and compare-and-set operations on
|
||||
independent keys."
|
||||
[opts]
|
||||
{:client (RegisterClient. nil)
|
||||
:checker (independent/checker
|
||||
(checker/compose
|
||||
{:linear (checker/linearizable {:model (model/cas-register)
|
||||
:algorithm :linear})
|
||||
:timeline (timeline/html)}))
|
||||
:generator (independent/concurrent-generator
|
||||
10
|
||||
(range)
|
||||
(fn [k]
|
||||
(->> (gen/mix [r w cas])
|
||||
(gen/limit (:ops-per-key opts)))))})
|
49
tests/jepsen.nukeeper/src/jepsen/nukeeper/set.clj
Normal file
49
tests/jepsen.nukeeper/src/jepsen/nukeeper/set.clj
Normal file
@ -0,0 +1,49 @@
|
||||
(ns jepsen.nukeeper.set
|
||||
(:require
|
||||
[clojure.tools.logging :refer :all]
|
||||
[jepsen
|
||||
[checker :as checker]
|
||||
[client :as client]
|
||||
[generator :as gen]]
|
||||
[jepsen.nukeeper.utils :refer :all]
|
||||
[zookeeper :as zk])
|
||||
(:import (org.apache.zookeeper ZooKeeper KeeperException KeeperException$BadVersionException)))
|
||||
|
||||
(defrecord SetClient [k conn nodename]
|
||||
client/Client
|
||||
(open! [this test node]
|
||||
(assoc
|
||||
(assoc this
|
||||
:conn (zk-connect node 9181 30000))
|
||||
:nodename node))
|
||||
|
||||
(setup! [this test]
|
||||
(zk-create-if-not-exists conn k "#{}"))
|
||||
|
||||
(invoke! [this test op]
|
||||
(case (:f op)
|
||||
:read (exec-with-retries 30 (fn []
|
||||
(zk-sync conn)
|
||||
(assoc op
|
||||
:type :ok
|
||||
:value (read-string (:data (zk-get-str conn k))))))
|
||||
:add (try
|
||||
(do
|
||||
(zk-add-to-set conn k (:value op))
|
||||
(assoc op :type :ok))
|
||||
(catch KeeperException$BadVersionException _ (assoc op :type :fail, :error :bad-version))
|
||||
(catch Exception _ (assoc op :type :info, :error :connect-error)))))
|
||||
|
||||
(teardown! [_ test])
|
||||
|
||||
(close! [_ test]
|
||||
(zk/close conn)))
|
||||
|
||||
(defn workload
|
||||
"A generator, client, and checker for a set test."
|
||||
[opts]
|
||||
{:client (SetClient. "/a-set" nil nil)
|
||||
:checker (checker/set)
|
||||
:generator (->> (range)
|
||||
(map (fn [x] {:type :invoke, :f :add, :value x})))
|
||||
:final-generator (gen/once {:type :invoke, :f :read, :value nil})})
|
42
tests/jepsen.nukeeper/src/jepsen/nukeeper/unique.clj
Normal file
42
tests/jepsen.nukeeper/src/jepsen/nukeeper/unique.clj
Normal file
@ -0,0 +1,42 @@
|
||||
(ns jepsen.nukeeper.unique
|
||||
(:require
|
||||
[clojure.tools.logging :refer :all]
|
||||
[jepsen
|
||||
[checker :as checker]
|
||||
[client :as client]
|
||||
[generator :as gen]]
|
||||
[jepsen.nukeeper.utils :refer :all]
|
||||
[zookeeper :as zk])
|
||||
(:import (org.apache.zookeeper ZooKeeper KeeperException KeeperException$BadVersionException)))
|
||||
|
||||
(defrecord UniqueClient [conn nodename]
|
||||
client/Client
|
||||
(open! [this test node]
|
||||
(assoc
|
||||
(assoc this
|
||||
:conn (zk-connect node 9181 30000))
|
||||
:nodename node))
|
||||
|
||||
(setup! [this test])
|
||||
|
||||
(invoke! [this test op]
|
||||
(case
|
||||
:generate
|
||||
(try
|
||||
(let [result-path (zk-create-sequential conn "/seq-" "")]
|
||||
(assoc op :type :ok :value (parse-and-get-counter result-path)))
|
||||
(catch Exception _ (assoc op :type :info, :error :connect-error)))))
|
||||
|
||||
(teardown! [_ test])
|
||||
|
||||
(close! [_ test]
|
||||
(zk/close conn)))
|
||||
|
||||
(defn workload
|
||||
"A generator, client, and checker for a set test."
|
||||
[opts]
|
||||
{:client (UniqueClient. nil nil)
|
||||
:checker (checker/unique-ids)
|
||||
:generator (->>
|
||||
(range)
|
||||
(map (fn [_] {:type :invoke, :f :generate})))})
|
180
tests/jepsen.nukeeper/src/jepsen/nukeeper/utils.clj
Normal file
180
tests/jepsen.nukeeper/src/jepsen/nukeeper/utils.clj
Normal file
@ -0,0 +1,180 @@
|
||||
(ns jepsen.nukeeper.utils
|
||||
(:require [clojure.string :as str]
|
||||
[zookeeper.data :as data]
|
||||
[zookeeper :as zk]
|
||||
[zookeeper.internal :as zi]
|
||||
[jepsen.control.util :as cu]
|
||||
[jepsen.nukeeper.constants :refer :all]
|
||||
[jepsen.control :as c]
|
||||
[clojure.tools.logging :refer :all])
|
||||
(:import (org.apache.zookeeper.data Stat)
|
||||
(org.apache.zookeeper CreateMode
|
||||
ZooKeeper)
|
||||
(org.apache.zookeeper ZooKeeper KeeperException KeeperException$BadVersionException)))
|
||||
|
||||
(defn parse-long
|
||||
"Parses a string to a Long. Passes through `nil` and empty strings."
|
||||
[s]
|
||||
(if (and s (> (count s) 0))
|
||||
(Long/parseLong s)))
|
||||
|
||||
(defn parse-and-get-counter
|
||||
[path]
|
||||
(Integer/parseInt (apply str (take-last 10 (seq (str path))))))
|
||||
|
||||
(defn zk-range
|
||||
[]
|
||||
(map (fn [v] (str "/" v)) (range)))
|
||||
|
||||
(defn zk-path
|
||||
[n]
|
||||
(str "/" n))
|
||||
|
||||
(defn zk-connect
|
||||
[host port timeout]
|
||||
(zk/connect (str host ":" port) :timeout-msec timeout))
|
||||
|
||||
(defn zk-create-range
|
||||
[conn n]
|
||||
(dorun (map (fn [v] (zk/create-all conn v :persistent? true)) (take n (zk-range)))))
|
||||
|
||||
(defn zk-set
|
||||
([conn path value]
|
||||
(zk/set-data conn path (data/to-bytes (str value)) -1))
|
||||
([conn path value version]
|
||||
(zk/set-data conn path (data/to-bytes (str value)) version)))
|
||||
|
||||
(defn zk-get-str
|
||||
[conn path]
|
||||
(let [zk-result (zk/data conn path)]
|
||||
{:data (data/to-string (:data zk-result))
|
||||
:stat (:stat zk-result)}))
|
||||
|
||||
(defn zk-list
|
||||
[conn path]
|
||||
(zk/children conn path))
|
||||
|
||||
(defn zk-list-with-stat
|
||||
[conn path]
|
||||
(let [stat (new Stat)
|
||||
children (seq (.getChildren conn path false stat))]
|
||||
{:children children
|
||||
:stat (zi/stat-to-map stat)}))
|
||||
|
||||
(defn zk-cas
|
||||
[conn path old-value new-value]
|
||||
(let [current-value (zk-get-str conn path)]
|
||||
(if (= (parse-long (:data current-value)) old-value)
|
||||
(do (zk-set conn path new-value (:version (:stat current-value)))
|
||||
true))))
|
||||
|
||||
(defn zk-add-to-set
|
||||
[conn path elem]
|
||||
(let [current-value (zk-get-str conn path)
|
||||
current-set (read-string (:data current-value))
|
||||
new-set (conj current-set elem)]
|
||||
(zk-set conn path (pr-str new-set) (:version (:stat current-value)))))
|
||||
|
||||
(defn zk-create-if-not-exists
|
||||
[conn path data]
|
||||
(zk/create conn path :data (data/to-bytes (str data)) :persistent? true))
|
||||
|
||||
(defn zk-create-sequential
|
||||
[conn path-prefix data]
|
||||
(zk/create conn path-prefix :data (data/to-bytes (str data)) :persistent? true :sequential? true))
|
||||
|
||||
(defn zk-multi-create-many-seq-nodes
|
||||
[conn path-prefix num]
|
||||
(let [txn (.transaction conn)]
|
||||
(loop [i 0]
|
||||
(cond (>= i num) (.commit txn)
|
||||
:else (do (.create txn path-prefix
|
||||
(data/to-bytes "")
|
||||
(zi/acls :open-acl-unsafe)
|
||||
CreateMode/PERSISTENT_SEQUENTIAL)
|
||||
(recur (inc i)))))))
|
||||
|
||||
; sync call not implemented in zookeeper-clj and don't have sync version in java API
|
||||
(defn zk-sync
|
||||
[conn]
|
||||
(zk-set conn "/" "" -1))
|
||||
|
||||
(defn zk-parent-path
|
||||
[path]
|
||||
(let [rslash_pos (str/last-index-of path "/")]
|
||||
(if (> rslash_pos 0)
|
||||
(subs path 0 rslash_pos)
|
||||
"/")))
|
||||
|
||||
(defn zk-multi-delete-first-child
|
||||
[conn path]
|
||||
(let [{children :children stat :stat} (zk-list-with-stat conn path)
|
||||
txn (.transaction conn)
|
||||
first-child (first (sort children))]
|
||||
(if (not (nil? first-child))
|
||||
(try
|
||||
(do (.check txn path (:version stat))
|
||||
(.setData txn path (data/to-bytes "") -1) ; I'm just checking multitransactions
|
||||
(.delete txn (str path first-child) -1)
|
||||
(.commit txn)
|
||||
first-child)
|
||||
(catch KeeperException$BadVersionException _ nil)
|
||||
; Even if we got connection loss, delete may actually be executed.
|
||||
; This function is used for queue model, which strictly require
|
||||
; all enqueued elements to be dequeued, but allow duplicates.
|
||||
; So even in case when we not sure about delete we return first-child.
|
||||
(catch Exception _ first-child))
|
||||
nil)))
|
||||
|
||||
(defn clickhouse-alive?
|
||||
[node test]
|
||||
(info "Checking server alive on" node)
|
||||
(try
|
||||
(c/exec binary-path :client :--query "SELECT 1")
|
||||
(catch Exception _ false)))
|
||||
|
||||
(defn wait-clickhouse-alive!
|
||||
[node test & {:keys [maxtries] :or {maxtries 30}}]
|
||||
(loop [i 0]
|
||||
(cond (> i maxtries) false
|
||||
(clickhouse-alive? node test) true
|
||||
:else (do (Thread/sleep 1000) (recur (inc i))))))
|
||||
|
||||
(defn kill-clickhouse!
|
||||
[node test]
|
||||
(info "Killing server on node" node)
|
||||
(c/su
|
||||
(cu/stop-daemon! binary-path pid-file-path)
|
||||
(c/exec :rm :-fr (str data-dir "/status"))))
|
||||
|
||||
(defn start-clickhouse!
|
||||
[node test]
|
||||
(info "Starting server on node" node)
|
||||
(c/su
|
||||
(cu/start-daemon!
|
||||
{:pidfile pid-file-path
|
||||
:logfile stderr-file
|
||||
:chdir data-dir}
|
||||
binary-path
|
||||
:server
|
||||
:--config (str configs-dir "/config.xml")
|
||||
:--
|
||||
:--path (str data-dir "/")
|
||||
:--user_files_path (str data-dir "/user_files")
|
||||
:--top_level_domains_path (str data-dir "/top_level_domains")
|
||||
:--logger.log (str logs-dir "/clickhouse-server.log")
|
||||
:--logger.errorlog (str logs-dir "/clickhouse-server.err.log")
|
||||
:--test_keeper_server.snapshot_storage_path coordination-snapshots-dir
|
||||
:--test_keeper_server.logs_storage_path coordination-logs-dir)
|
||||
(wait-clickhouse-alive! node test)))
|
||||
|
||||
(defn exec-with-retries
|
||||
[retries f & args]
|
||||
(let [res (try {:value (apply f args)}
|
||||
(catch Exception e
|
||||
(if (zero? retries)
|
||||
(throw e)
|
||||
{:exception e})))]
|
||||
(if (:exception res)
|
||||
(do (Thread/sleep 1000) (recur (dec retries) f args))
|
||||
(:value res))))
|
39
tests/jepsen.nukeeper/test/jepsen/nukeeper_test.clj
Normal file
39
tests/jepsen.nukeeper/test/jepsen/nukeeper_test.clj
Normal file
@ -0,0 +1,39 @@
|
||||
(ns jepsen.nukeeper-test
|
||||
(:require [clojure.test :refer :all]
|
||||
[jepsen.nukeeper.utils :refer :all]
|
||||
[zookeeper :as zk]
|
||||
[zookeeper.data :as data])
|
||||
(:import (ch.qos.logback.classic Level)
|
||||
(org.slf4j Logger LoggerFactory)))
|
||||
|
||||
(defn multicreate
|
||||
[conn]
|
||||
(dorun (map (fn [v] (zk/create conn v :persistent? true)) (take 10 (zk-range)))))
|
||||
|
||||
(defn multidelete
|
||||
[conn]
|
||||
(dorun (map (fn [v] (zk/delete conn v)) (take 10 (zk-range)))))
|
||||
|
||||
(deftest a-test
|
||||
(testing "nukeeper connection"
|
||||
(.setLevel
|
||||
(LoggerFactory/getLogger "org.apache.zookeeper") Level/OFF)
|
||||
(let [conn (zk/connect "localhost:9181" :timeout-msec 5000)]
|
||||
;(println (take 10 (zk-range)))
|
||||
;(multidelete conn)
|
||||
;(multicreate conn)
|
||||
;(zk/create-all conn "/0")
|
||||
;(zk/create conn "/0")
|
||||
;(println (zk/children conn "/"))
|
||||
;(zk/set-data conn "/0" (data/to-bytes "777") -1)
|
||||
(println (zk-parent-path "/sasds/dasda/das"))
|
||||
(println (zk-parent-path "/sasds"))
|
||||
(zk-multi-create-many-seq-nodes conn "/a-" 5)
|
||||
(println (zk/children conn "/"))
|
||||
(println (zk-list-with-stat conn "/"))
|
||||
(println (zk-multi-delete-first-child conn "/"))
|
||||
(println (zk-list-with-stat conn "/"))
|
||||
;(Thread/sleep 5000)
|
||||
;(println "VALUE" (data/to-string (:data (zk/data conn "/0"))))
|
||||
;(is (= (data/to-string (:data (zk/data conn "/0"))) "777"))
|
||||
(zk/close conn))))
|
@ -74,7 +74,7 @@ timeout $TIMEOUT bash -c thread5 2> /dev/null &
|
||||
|
||||
wait
|
||||
|
||||
$CLICKHOUSE_CLIENT -n -q "
|
||||
DROP TABLE alter_table;
|
||||
DROP TABLE alter_table2
|
||||
"
|
||||
$CLICKHOUSE_CLIENT -n -q "DROP TABLE alter_table;" &
|
||||
$CLICKHOUSE_CLIENT -n -q "DROP TABLE alter_table2;" &
|
||||
|
||||
wait
|
||||
|
@ -583,6 +583,7 @@
|
||||
"00980_zookeeper_merge_tree_alter_settings",
|
||||
"00988_constraints_replication_zookeeper",
|
||||
"00989_parallel_parts_loading",
|
||||
"00992_system_parts_race_condition_zookeeper_long",
|
||||
"00993_system_parts_race_condition_drop_zookeeper",
|
||||
"01012_show_tables_limit",
|
||||
"01013_sync_replica_timeout_zookeeper",
|
||||
|
@ -21,6 +21,7 @@ if (NOT DEFINED ENABLE_UTILS OR ENABLE_UTILS)
|
||||
add_subdirectory (corrector_utf8)
|
||||
add_subdirectory (zookeeper-cli)
|
||||
add_subdirectory (zookeeper-test)
|
||||
add_subdirectory (nukeeper-data-dumper)
|
||||
add_subdirectory (zookeeper-dump-tree)
|
||||
add_subdirectory (zookeeper-remove-by-list)
|
||||
add_subdirectory (zookeeper-create-entry-to-download-part)
|
||||
|
2
utils/nukeeper-data-dumper/CMakeLists.txt
Normal file
2
utils/nukeeper-data-dumper/CMakeLists.txt
Normal file
@ -0,0 +1,2 @@
|
||||
add_executable(nukeeper-data-dumper main.cpp)
|
||||
target_link_libraries(nukeeper-data-dumper PRIVATE dbms)
|
87
utils/nukeeper-data-dumper/main.cpp
Normal file
87
utils/nukeeper-data-dumper/main.cpp
Normal file
@ -0,0 +1,87 @@
|
||||
#include <Poco/ConsoleChannel.h>
|
||||
#include <Poco/Logger.h>
|
||||
#include <Coordination/NuKeeperStateMachine.h>
|
||||
#include <Common/ZooKeeper/ZooKeeperCommon.h>
|
||||
#include <Common/ZooKeeper/ZooKeeperIO.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <libnuraft/nuraft.hxx> // Y_IGNORE
|
||||
#include <Coordination/NuKeeperLogStore.h>
|
||||
#include <Coordination/Changelog.h>
|
||||
#include <common/logger_useful.h>
|
||||
|
||||
using namespace Coordination;
|
||||
using namespace DB;
|
||||
|
||||
void dumpMachine(std::shared_ptr<NuKeeperStateMachine> machine)
|
||||
{
|
||||
auto & storage = machine->getStorage();
|
||||
std::queue<std::string> keys;
|
||||
keys.push("/");
|
||||
|
||||
while (!keys.empty())
|
||||
{
|
||||
auto key = keys.front();
|
||||
keys.pop();
|
||||
std::cout << key << "\n";
|
||||
auto value = storage.container.getValue(key);
|
||||
std::cout << "\tStat: {version: " << value.stat.version <<
|
||||
", mtime: " << value.stat.mtime <<
|
||||
", emphemeralOwner: " << value.stat.ephemeralOwner <<
|
||||
", czxid: " << value.stat.czxid <<
|
||||
", mzxid: " << value.stat.mzxid <<
|
||||
", numChildren: " << value.stat.numChildren <<
|
||||
", dataLength: " << value.stat.dataLength <<
|
||||
"}" << std::endl;
|
||||
std::cout << "\tData: " << storage.container.getValue(key).data << std::endl;
|
||||
|
||||
for (const auto & child : value.children)
|
||||
{
|
||||
if (key == "/")
|
||||
keys.push(key + child);
|
||||
else
|
||||
keys.push(key + "/" + child);
|
||||
}
|
||||
}
|
||||
std::cout << std::flush;
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[])
|
||||
{
|
||||
if (argc != 3)
|
||||
{
|
||||
std::cerr << "usage: " << argv[0] << " snapshotpath logpath" << std::endl;
|
||||
return 3;
|
||||
}
|
||||
else
|
||||
{
|
||||
Poco::AutoPtr<Poco::ConsoleChannel> channel(new Poco::ConsoleChannel(std::cerr));
|
||||
Poco::Logger::root().setChannel(channel);
|
||||
Poco::Logger::root().setLevel("trace");
|
||||
}
|
||||
auto * logger = &Poco::Logger::get("nukeeper-dumper");
|
||||
ResponsesQueue queue;
|
||||
SnapshotsQueue snapshots_queue{1};
|
||||
CoordinationSettingsPtr settings = std::make_shared<CoordinationSettings>();
|
||||
auto state_machine = std::make_shared<NuKeeperStateMachine>(queue, snapshots_queue, argv[1], settings);
|
||||
state_machine->init();
|
||||
size_t last_commited_index = state_machine->last_commit_index();
|
||||
|
||||
LOG_INFO(logger, "Last committed index: {}", last_commited_index);
|
||||
|
||||
DB::NuKeeperLogStore changelog(argv[2], 10000000, true);
|
||||
changelog.init(last_commited_index, 10000000000UL); /// collect all logs
|
||||
if (changelog.size() == 0)
|
||||
LOG_INFO(logger, "Changelog empty");
|
||||
else
|
||||
LOG_INFO(logger, "Last changelog entry {}", changelog.next_slot() - 1);
|
||||
|
||||
for (size_t i = last_commited_index + 1; i < changelog.next_slot(); ++i)
|
||||
{
|
||||
if (changelog.entry_at(i)->get_val_type() == nuraft::log_val_type::app_log)
|
||||
state_machine->commit(i, changelog.entry_at(i)->get_buf());
|
||||
}
|
||||
|
||||
dumpMachine(state_machine);
|
||||
|
||||
return 0;
|
||||
}
|
Loading…
Reference in New Issue
Block a user