Merge pull request #20585 from ClickHouse/persistent_nukeeper_log_storage

Persistent coordination log storage
This commit is contained in:
alesapin 2021-02-25 10:06:03 +03:00 committed by GitHub
commit 8f81dce32f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
43 changed files with 2513 additions and 497 deletions

2
contrib/NuRaft vendored

@ -1 +1 @@
Subproject commit 7adf7ae33e7d5c307342431b577c8ab1025ee793
Subproject commit 9a0d78de4b90546368d954b6434f0e9a823e8d80

View File

@ -70,6 +70,7 @@ function start_server
--path "$FASTTEST_DATA"
--user_files_path "$FASTTEST_DATA/user_files"
--top_level_domains_path "$FASTTEST_DATA/top_level_domains"
--test_keeper_server.log_storage_path "$FASTTEST_DATA/coordination"
)
clickhouse-server "${opts[@]}" &>> "$FASTTEST_OUTPUT/server.log" &
server_pid=$!
@ -375,7 +376,7 @@ function run_tests
stop_server ||:
# Clean the data so that there is no interference from the previous test run.
rm -rf "$FASTTEST_DATA"/{{meta,}data,user_files} ||:
rm -rf "$FASTTEST_DATA"/{{meta,}data,user_files,coordination} ||:
start_server

View File

@ -0,0 +1,557 @@
#include <Coordination/Changelog.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromFile.h>
#include <filesystem>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/join.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <Common/Exception.h>
#include <Common/SipHash.h>
#include <common/logger_useful.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CHECKSUM_DOESNT_MATCH;
extern const int CORRUPTED_DATA;
extern const int UNKNOWN_FORMAT_VERSION;
extern const int LOGICAL_ERROR;
}
namespace
{
constexpr auto DEFAULT_PREFIX = "changelog";
std::string formatChangelogPath(const std::string & prefix, const ChangelogFileDescription & name)
{
std::filesystem::path path(prefix);
path /= std::filesystem::path(name.prefix + "_" + std::to_string(name.from_log_index) + "_" + std::to_string(name.to_log_index) + ".bin");
return path;
}
ChangelogFileDescription getChangelogFileDescription(const std::string & path_str)
{
std::filesystem::path path(path_str);
std::string filename = path.stem();
Strings filename_parts;
boost::split(filename_parts, filename, boost::is_any_of("_"));
if (filename_parts.size() < 3)
throw Exception(ErrorCodes::CORRUPTED_DATA, "Invalid changelog {}", path_str);
ChangelogFileDescription result;
result.prefix = filename_parts[0];
result.from_log_index = parse<size_t>(filename_parts[1]);
result.to_log_index = parse<size_t>(filename_parts[2]);
result.path = path_str;
return result;
}
LogEntryPtr makeClone(const LogEntryPtr & entry)
{
return cs_new<nuraft::log_entry>(entry->get_term(), nuraft::buffer::clone(entry->get_buf()), entry->get_val_type());
}
Checksum computeRecordChecksum(const ChangelogRecord & record)
{
SipHash hash;
hash.update(record.header.version);
hash.update(record.header.index);
hash.update(record.header.term);
hash.update(record.header.value_type);
hash.update(record.header.blob_size);
if (record.header.blob_size != 0)
hash.update(reinterpret_cast<char *>(record.blob->data_begin()), record.blob->size());
return hash.get64();
}
}
class ChangelogWriter
{
public:
ChangelogWriter(const std::string & filepath_, WriteMode mode, size_t start_index_)
: filepath(filepath_)
, plain_buf(filepath, DBMS_DEFAULT_BUFFER_SIZE, mode == WriteMode::Rewrite ? -1 : (O_APPEND | O_CREAT | O_WRONLY))
, start_index(start_index_)
{}
off_t appendRecord(ChangelogRecord && record, bool sync)
{
off_t result = plain_buf.count();
writeIntBinary(computeRecordChecksum(record), plain_buf);
writeIntBinary(record.header.version, plain_buf);
writeIntBinary(record.header.index, plain_buf);
writeIntBinary(record.header.term, plain_buf);
writeIntBinary(record.header.value_type, plain_buf);
writeIntBinary(record.header.blob_size, plain_buf);
if (record.header.blob_size != 0)
plain_buf.write(reinterpret_cast<char *>(record.blob->data_begin()), record.blob->size());
entries_written++;
if (sync)
plain_buf.sync();
return result;
}
void truncateToLength(off_t new_length)
{
flush();
plain_buf.truncate(new_length);
plain_buf.seek(new_length, SEEK_SET);
}
void flush()
{
plain_buf.sync();
}
size_t getEntriesWritten() const
{
return entries_written;
}
void setEntriesWritten(size_t entries_written_)
{
entries_written = entries_written_;
}
size_t getStartIndex() const
{
return start_index;
}
void setStartIndex(size_t start_index_)
{
start_index = start_index_;
}
private:
std::string filepath;
WriteBufferFromFile plain_buf;
size_t entries_written = 0;
size_t start_index;
};
struct ChangelogReadResult
{
size_t entries_read;
off_t last_position;
bool error;
};
class ChangelogReader
{
public:
explicit ChangelogReader(const std::string & filepath_)
: filepath(filepath_)
, read_buf(filepath)
{}
ChangelogReadResult readChangelog(IndexToLogEntry & logs, size_t start_log_index, IndexToOffset & index_to_offset, Poco::Logger * log)
{
size_t previous_index = 0;
ChangelogReadResult result{};
try
{
while (!read_buf.eof())
{
result.last_position = read_buf.count();
Checksum record_checksum;
readIntBinary(record_checksum, read_buf);
/// Initialization is required, otherwise checksums may fail
ChangelogRecord record;
readIntBinary(record.header.version, read_buf);
readIntBinary(record.header.index, read_buf);
readIntBinary(record.header.term, read_buf);
readIntBinary(record.header.value_type, read_buf);
readIntBinary(record.header.blob_size, read_buf);
if (record.header.version > CURRENT_CHANGELOG_VERSION)
throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unsupported changelog version {} on path {}", record.header.version, filepath);
if (record.header.blob_size != 0)
{
auto buffer = nuraft::buffer::alloc(record.header.blob_size);
auto * buffer_begin = reinterpret_cast<char *>(buffer->data_begin());
read_buf.readStrict(buffer_begin, record.header.blob_size);
record.blob = buffer;
}
else
record.blob = nullptr;
if (previous_index != 0 && previous_index + 1 != record.header.index)
throw Exception(ErrorCodes::CORRUPTED_DATA, "Previous log entry {}, next log entry {}, seems like some entries skipped", previous_index, record.header.index);
previous_index = record.header.index;
Checksum checksum = computeRecordChecksum(record);
if (checksum != record_checksum)
{
throw Exception(ErrorCodes::CHECKSUM_DOESNT_MATCH,
"Checksums doesn't match for log {} (version {}), index {}, blob_size {}",
filepath, record.header.version, record.header.index, record.header.blob_size);
}
if (logs.count(record.header.index) != 0)
throw Exception(ErrorCodes::CORRUPTED_DATA, "Duplicated index id {} in log {}", record.header.index, filepath);
result.entries_read += 1;
if (record.header.index < start_log_index)
continue;
auto log_entry = nuraft::cs_new<nuraft::log_entry>(record.header.term, record.blob, record.header.value_type);
logs.emplace(record.header.index, log_entry);
index_to_offset[record.header.index] = result.last_position;
if (result.entries_read % 50000 == 0)
LOG_TRACE(log, "Reading changelog from path {}, entries {}", filepath, result.entries_read);
}
}
catch (const Exception & ex)
{
if (ex.code() == ErrorCodes::UNKNOWN_FORMAT_VERSION)
throw ex;
result.error = true;
LOG_WARNING(log, "Cannot completely read changelog on path {}, error: {}", filepath, ex.message());
}
catch (...)
{
result.error = true;
tryLogCurrentException(log);
}
LOG_TRACE(log, "Totally read from changelog {} {} entries", filepath, result.entries_read);
return result;
}
private:
std::string filepath;
ReadBufferFromFile read_buf;
};
Changelog::Changelog(const std::string & changelogs_dir_, size_t rotate_interval_, Poco::Logger * log_)
: changelogs_dir(changelogs_dir_)
, rotate_interval(rotate_interval_)
, log(log_)
{
namespace fs = std::filesystem;
if (!fs::exists(changelogs_dir))
fs::create_directories(changelogs_dir);
for (const auto & p : fs::directory_iterator(changelogs_dir))
{
auto file_description = getChangelogFileDescription(p.path());
existing_changelogs[file_description.from_log_index] = file_description;
}
}
void Changelog::readChangelogAndInitWriter(size_t from_log_index)
{
start_index = from_log_index == 0 ? 1 : from_log_index;
size_t total_read = 0;
size_t entries_in_last = 0;
size_t incomplete_log_index = 0;
ChangelogReadResult result{};
bool started = false;
for (const auto & [changelog_start_index, changelog_description] : existing_changelogs)
{
entries_in_last = changelog_description.to_log_index - changelog_description.from_log_index + 1;
if (changelog_description.to_log_index >= from_log_index)
{
if (!started)
{
if (changelog_description.from_log_index > start_index)
throw Exception(ErrorCodes::CORRUPTED_DATA, "Cannot read changelog from index {}, smallest available index {}", start_index, changelog_description.from_log_index);
started = true;
}
ChangelogReader reader(changelog_description.path);
result = reader.readChangelog(logs, from_log_index, index_to_start_pos, log);
total_read += result.entries_read;
/// May happen after truncate, crash or simply unfinished log
if (result.entries_read < entries_in_last)
{
incomplete_log_index = changelog_start_index;
break;
}
}
}
if (!started && start_index != 1)
throw Exception(ErrorCodes::CORRUPTED_DATA, "Required to read data from {}, but we don't have any active changelogs", from_log_index);
if (incomplete_log_index != 0)
{
/// All subsequent logs shouldn't exist. But they may exist if we crashed after writeAt started. Remove them.
for (auto itr = existing_changelogs.upper_bound(incomplete_log_index); itr != existing_changelogs.end();)
{
LOG_WARNING(log, "Removing changelog {}, because it's goes after broken changelog entry", itr->second.path);
std::filesystem::remove(itr->second.path);
itr = existing_changelogs.erase(itr);
}
/// Continue to write into existing log
if (!existing_changelogs.empty())
{
auto description = existing_changelogs.rbegin()->second;
LOG_TRACE(log, "Continue to write into {}", description.path);
current_writer = std::make_unique<ChangelogWriter>(description.path, WriteMode::Append, description.from_log_index);
current_writer->setEntriesWritten(result.entries_read);
/// Truncate all broken entries from log
if (result.error)
{
LOG_WARNING(log, "Read finished with error, truncating all broken log entries");
current_writer->truncateToLength(result.last_position);
}
}
}
/// Start new log if we don't initialize writer from previous log
if (!current_writer)
rotate(start_index + total_read);
}
void Changelog::rotate(size_t new_start_log_index)
{
//// doesn't exist on init
if (current_writer)
current_writer->flush();
ChangelogFileDescription new_description;
new_description.prefix = DEFAULT_PREFIX;
new_description.from_log_index = new_start_log_index;
new_description.to_log_index = new_start_log_index + rotate_interval - 1;
new_description.path = formatChangelogPath(changelogs_dir, new_description);
LOG_TRACE(log, "Starting new changelog {}", new_description.path);
existing_changelogs[new_start_log_index] = new_description;
current_writer = std::make_unique<ChangelogWriter>(new_description.path, WriteMode::Rewrite, new_start_log_index);
}
ChangelogRecord Changelog::buildRecord(size_t index, const LogEntryPtr & log_entry)
{
ChangelogRecord record;
record.header.version = ChangelogVersion::V0;
record.header.index = index;
record.header.term = log_entry->get_term();
record.header.value_type = log_entry->get_val_type();
auto buffer = log_entry->get_buf_ptr();
if (buffer)
record.header.blob_size = buffer->size();
else
record.header.blob_size = 0;
record.blob = buffer;
return record;
}
void Changelog::appendEntry(size_t index, const LogEntryPtr & log_entry, bool force_sync)
{
if (!current_writer)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Changelog must be initialized before appending records");
if (logs.empty())
start_index = index;
if (current_writer->getEntriesWritten() == rotate_interval)
rotate(index);
auto offset = current_writer->appendRecord(buildRecord(index, log_entry), force_sync);
if (!index_to_start_pos.try_emplace(index, offset).second)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Record with index {} already exists", index);
logs[index] = makeClone(log_entry);
}
void Changelog::writeAt(size_t index, const LogEntryPtr & log_entry, bool force_sync)
{
if (index_to_start_pos.count(index) == 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot write at index {} because changelog doesn't contain it", index);
bool go_to_previous_file = index < current_writer->getStartIndex();
if (go_to_previous_file)
{
auto index_changelog = existing_changelogs.lower_bound(index);
ChangelogFileDescription description;
if (index_changelog->first == index)
description = index_changelog->second;
else
description = std::prev(index_changelog)->second;
current_writer = std::make_unique<ChangelogWriter>(description.path, WriteMode::Append, index_changelog->first);
current_writer->setEntriesWritten(description.to_log_index - description.from_log_index + 1);
}
auto entries_written = current_writer->getEntriesWritten();
current_writer->truncateToLength(index_to_start_pos[index]);
if (go_to_previous_file)
{
/// Remove all subsequent files
auto to_remove_itr = existing_changelogs.upper_bound(index);
for (auto itr = to_remove_itr; itr != existing_changelogs.end();)
{
std::filesystem::remove(itr->second.path);
itr = existing_changelogs.erase(itr);
}
}
/// Remove redundant logs from memory
for (size_t i = index; ; ++i)
{
auto log_itr = logs.find(i);
if (log_itr == logs.end())
break;
logs.erase(log_itr);
index_to_start_pos.erase(i);
entries_written--;
}
current_writer->setEntriesWritten(entries_written);
appendEntry(index, log_entry, force_sync);
}
void Changelog::compact(size_t up_to_log_index)
{
for (auto itr = existing_changelogs.begin(); itr != existing_changelogs.end();)
{
/// Remove all completely outdated changelog files
if (itr->second.to_log_index <= up_to_log_index)
{
LOG_INFO(log, "Removing changelog {} because of compaction", itr->second.path);
std::erase_if(index_to_start_pos, [right_index = itr->second.to_log_index] (const auto & item) { return item.first <= right_index; });
std::filesystem::remove(itr->second.path);
itr = existing_changelogs.erase(itr);
}
else /// Files are ordered, so all subsequent should exist
break;
}
start_index = up_to_log_index + 1;
std::erase_if(logs, [up_to_log_index] (const auto & item) { return item.first <= up_to_log_index; });
}
LogEntryPtr Changelog::getLastEntry() const
{
static LogEntryPtr fake_entry = nuraft::cs_new<nuraft::log_entry>(0, nuraft::buffer::alloc(sizeof(size_t)));
size_t next_index = getNextEntryIndex() - 1;
auto entry = logs.find(next_index);
if (entry == logs.end())
return fake_entry;
return entry->second;
}
LogEntriesPtr Changelog::getLogEntriesBetween(size_t start, size_t end)
{
LogEntriesPtr ret = nuraft::cs_new<std::vector<nuraft::ptr<nuraft::log_entry>>>();
ret->resize(end - start);
size_t result_pos = 0;
for (size_t i = start; i < end; ++i)
{
(*ret)[result_pos] = entryAt(i);
result_pos++;
}
return ret;
}
LogEntryPtr Changelog::entryAt(size_t index)
{
nuraft::ptr<nuraft::log_entry> src = nullptr;
auto entry = logs.find(index);
if (entry == logs.end())
return nullptr;
src = entry->second;
return src;
}
nuraft::ptr<nuraft::buffer> Changelog::serializeEntriesToBuffer(size_t index, int32_t count)
{
std::vector<nuraft::ptr<nuraft::buffer>> returned_logs;
size_t size_total = 0;
for (size_t i = index; i < index + count; ++i)
{
auto entry = logs.find(i);
if (entry == logs.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Don't have log entry {}", i);
nuraft::ptr<nuraft::buffer> buf = entry->second->serialize();
size_total += buf->size();
returned_logs.push_back(buf);
}
nuraft::ptr<nuraft::buffer> buf_out = nuraft::buffer::alloc(sizeof(int32_t) + count * sizeof(int32_t) + size_total);
buf_out->pos(0);
buf_out->put(static_cast<int32_t>(count));
for (auto & entry : returned_logs)
{
nuraft::ptr<nuraft::buffer> & bb = entry;
buf_out->put(static_cast<int32_t>(bb->size()));
buf_out->put(*bb);
}
return buf_out;
}
void Changelog::applyEntriesFromBuffer(size_t index, nuraft::buffer & buffer, bool force_sync)
{
buffer.pos(0);
int num_logs = buffer.get_int();
for (int i = 0; i < num_logs; ++i)
{
size_t cur_index = index + i;
int buf_size = buffer.get_int();
nuraft::ptr<nuraft::buffer> buf_local = nuraft::buffer::alloc(buf_size);
buffer.get(buf_local);
LogEntryPtr log_entry = nuraft::log_entry::deserialize(*buf_local);
if (i == 0 && logs.count(cur_index))
writeAt(cur_index, log_entry, force_sync);
else
appendEntry(cur_index, log_entry, force_sync);
}
}
void Changelog::flush()
{
current_writer->flush();
}
Changelog::~Changelog()
{
try
{
if (current_writer)
current_writer->flush();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}

View File

@ -0,0 +1,136 @@
#pragma once
#include <libnuraft/nuraft.hxx> // Y_IGNORE
#include <city.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/HashingWriteBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Disks/IDisk.h>
namespace DB
{
using Checksum = UInt64;
using LogEntryPtr = nuraft::ptr<nuraft::log_entry>;
using LogEntries = std::vector<LogEntryPtr>;
using LogEntriesPtr = nuraft::ptr<LogEntries>;
using BufferPtr = nuraft::ptr<nuraft::buffer>;
using IndexToOffset = std::unordered_map<size_t, off_t>;
using IndexToLogEntry = std::unordered_map<size_t, LogEntryPtr>;
enum class ChangelogVersion : uint8_t
{
V0 = 0,
};
static constexpr auto CURRENT_CHANGELOG_VERSION = ChangelogVersion::V0;
struct ChangelogRecordHeader
{
ChangelogVersion version = CURRENT_CHANGELOG_VERSION;
size_t index; /// entry log number
size_t term;
nuraft::log_val_type value_type;
size_t blob_size;
};
/// Changelog record on disk
struct ChangelogRecord
{
ChangelogRecordHeader header;
nuraft::ptr<nuraft::buffer> blob;
};
/// changelog_fromindex_toindex.bin
/// [fromindex, toindex] <- inclusive
struct ChangelogFileDescription
{
std::string prefix;
size_t from_log_index;
size_t to_log_index;
std::string path;
};
class ChangelogWriter;
/// Simplest changelog with files rotation.
/// No compression, no metadata, just entries with headers one by one
/// Able to read broken files/entries and discard them.
class Changelog
{
public:
Changelog(const std::string & changelogs_dir_, size_t rotate_interval_, Poco::Logger * log_);
/// Read changelog from files on changelogs_dir_ skipping all entries before from_log_index
/// Truncate broken entries, remove files after broken entries.
void readChangelogAndInitWriter(size_t from_log_index);
/// Add entry to log with index. Call fsync if force_sync true.
void appendEntry(size_t index, const LogEntryPtr & log_entry, bool force_sync);
/// Write entry at index and truncate all subsequent entries.
void writeAt(size_t index, const LogEntryPtr & log_entry, bool force_sync);
/// Remove log files with to_log_index <= up_to_log_index.
void compact(size_t up_to_log_index);
size_t getNextEntryIndex() const
{
return start_index + logs.size();
}
size_t getStartIndex() const
{
return start_index;
}
/// Last entry in log, or fake entry with term 0 if log is empty
LogEntryPtr getLastEntry() const;
/// Return log entries between [start, end)
LogEntriesPtr getLogEntriesBetween(size_t start_index, size_t end_index);
/// Return entry at position index
LogEntryPtr entryAt(size_t index);
/// Serialize entries from index into buffer
BufferPtr serializeEntriesToBuffer(size_t index, int32_t count);
/// Apply entries from buffer overriding existing entries
void applyEntriesFromBuffer(size_t index, nuraft::buffer & buffer, bool force_sync);
/// Fsync log to disk
void flush();
size_t size() const
{
return logs.size();
}
/// Fsync log to disk
~Changelog();
private:
/// Pack log_entry into changelog record
static ChangelogRecord buildRecord(size_t index, const LogEntryPtr & log_entry);
/// Starts new file [new_start_log_index, new_start_log_index + rotate_interval]
void rotate(size_t new_start_log_index);
private:
const std::string changelogs_dir;
const size_t rotate_interval;
Poco::Logger * log;
std::map<size_t, ChangelogFileDescription> existing_changelogs;
std::unique_ptr<ChangelogWriter> current_writer;
IndexToOffset index_to_start_pos;
IndexToLogEntry logs;
size_t start_index = 0;
};
}

View File

@ -22,13 +22,15 @@ struct Settings;
M(Milliseconds, heart_beat_interval_ms, 500, "Heartbeat interval between quorum nodes", 0) \
M(Milliseconds, election_timeout_lower_bound_ms, 1000, "Lower bound of election timer (avoid too often leader elections)", 0) \
M(Milliseconds, election_timeout_upper_bound_ms, 2000, "Lower bound of election timer (avoid too often leader elections)", 0) \
M(UInt64, reserved_log_items, 5000, "How many log items to store (don't remove during compaction)", 0) \
M(UInt64, snapshot_distance, 5000, "How many log items we have to collect to write new snapshot", 0) \
M(UInt64, reserved_log_items, 50000, "How many log items to store (don't remove during compaction)", 0) \
M(UInt64, snapshot_distance, 100000, "How many log items we have to collect to write new snapshot", 0) \
M(UInt64, max_stored_snapshots, 3, "How many snapshots we want to store", 0) \
M(Bool, auto_forwarding, true, "Allow to forward write requests from followers to leader", 0) \
M(Milliseconds, shutdown_timeout, 5000, "How many time we will until RAFT shutdown", 0) \
M(Milliseconds, startup_timeout, 30000, "How many time we will until RAFT to start", 0) \
M(LogsLevel, raft_logs_level, LogsLevel::information, "Log internal RAFT logs into main server log level. Valid values: 'trace', 'debug', 'information', 'warning', 'error', 'fatal', 'none'", 0)
M(LogsLevel, raft_logs_level, LogsLevel::information, "Log internal RAFT logs into main server log level. Valid values: 'trace', 'debug', 'information', 'warning', 'error', 'fatal', 'none'", 0) \
M(UInt64, rotate_log_storage_interval, 100000, "How many records will be stored in one log storage file", 0) \
M(Bool, force_sync, true, " Call fsync on each change in RAFT changelog", 0)
DECLARE_SETTINGS_TRAITS(CoordinationSettingsTraits, LIST_OF_COORDINATION_SETTINGS)

View File

@ -72,12 +72,12 @@ nuraft::ptr<std::vector<nuraft::ptr<nuraft::log_entry>>> InMemoryLogStore::log_e
ret->resize(end - start);
size_t cc = 0;
for (size_t ii = start; ii < end; ++ii)
for (size_t i = start; i < end; ++i)
{
nuraft::ptr<nuraft::log_entry> src = nullptr;
{
std::lock_guard<std::mutex> l(logs_lock);
auto entry = logs.find(ii);
auto entry = logs.find(i);
if (entry == logs.end())
{
entry = logs.find(0);
@ -152,9 +152,9 @@ void InMemoryLogStore::apply_pack(size_t index, nuraft::buffer & pack)
pack.pos(0);
Int32 num_logs = pack.get_int();
for (Int32 ii = 0; ii < num_logs; ++ii)
for (Int32 i = 0; i < num_logs; ++i)
{
size_t cur_idx = index + ii;
size_t cur_idx = index + i;
Int32 buf_size = pack.get_int();
nuraft::ptr<nuraft::buffer> buf_local = nuraft::buffer::alloc(buf_size);

View File

@ -9,12 +9,26 @@ namespace DB
class LoggerWrapper : public nuraft::logger
{
private:
static inline const std::unordered_map<LogsLevel, Poco::Message::Priority> LEVELS =
{
{LogsLevel::trace, Poco::Message::Priority::PRIO_TRACE},
{LogsLevel::debug, Poco::Message::Priority::PRIO_DEBUG},
{LogsLevel::information, Poco::Message::PRIO_INFORMATION},
{LogsLevel::warning, Poco::Message::PRIO_WARNING},
{LogsLevel::error, Poco::Message::PRIO_ERROR},
{LogsLevel::fatal, Poco::Message::PRIO_FATAL}
};
static inline const int LEVEL_MAX = static_cast<int>(LogsLevel::trace);
static inline const int LEVEL_MIN = static_cast<int>(LogsLevel::none);
public:
LoggerWrapper(const std::string & name, LogsLevel level_)
: log(&Poco::Logger::get(name))
, level(static_cast<int>(level_))
, level(level_)
{
log->setLevel(level);
log->setLevel(static_cast<int>(LEVELS.at(level)));
}
void put_details(
@ -24,24 +38,26 @@ public:
size_t /* line_number */,
const std::string & msg) override
{
LOG_IMPL(log, static_cast<DB::LogsLevel>(level_), static_cast<Poco::Message::Priority>(level_), msg);
LogsLevel db_level = static_cast<LogsLevel>(level_);
LOG_IMPL(log, db_level, LEVELS.at(db_level), msg);
}
void set_level(int level_) override
{
level_ = std::min(6, std::max(1, level_));
log->setLevel(level_);
level = level_;
level_ = std::min(LEVEL_MAX, std::max(LEVEL_MIN, level_));
level = static_cast<LogsLevel>(level_);
log->setLevel(static_cast<int>(LEVELS.at(level)));
}
int get_level() override
{
return level;
LogsLevel lvl = level;
return static_cast<int>(lvl);
}
private:
Poco::Logger * log;
std::atomic<int> level;
std::atomic<LogsLevel> level;
};
}

View File

@ -0,0 +1,105 @@
#include <Coordination/NuKeeperLogStore.h>
namespace DB
{
NuKeeperLogStore::NuKeeperLogStore(const std::string & changelogs_path, size_t rotate_interval_, bool force_sync_)
: log(&Poco::Logger::get("NuKeeperLogStore"))
, changelog(changelogs_path, rotate_interval_, log)
, force_sync(force_sync_)
{
}
size_t NuKeeperLogStore::start_index() const
{
std::lock_guard lock(changelog_lock);
return changelog.getStartIndex();
}
void NuKeeperLogStore::init(size_t from_log_idx)
{
std::lock_guard lock(changelog_lock);
changelog.readChangelogAndInitWriter(from_log_idx);
}
size_t NuKeeperLogStore::next_slot() const
{
std::lock_guard lock(changelog_lock);
return changelog.getNextEntryIndex();
}
nuraft::ptr<nuraft::log_entry> NuKeeperLogStore::last_entry() const
{
std::lock_guard lock(changelog_lock);
return changelog.getLastEntry();
}
size_t NuKeeperLogStore::append(nuraft::ptr<nuraft::log_entry> & entry)
{
std::lock_guard lock(changelog_lock);
size_t idx = changelog.getNextEntryIndex();
changelog.appendEntry(idx, entry, force_sync);
return idx;
}
void NuKeeperLogStore::write_at(size_t index, nuraft::ptr<nuraft::log_entry> & entry)
{
std::lock_guard lock(changelog_lock);
changelog.writeAt(index, entry, force_sync);
}
nuraft::ptr<std::vector<nuraft::ptr<nuraft::log_entry>>> NuKeeperLogStore::log_entries(size_t start, size_t end)
{
std::lock_guard lock(changelog_lock);
return changelog.getLogEntriesBetween(start, end);
}
nuraft::ptr<nuraft::log_entry> NuKeeperLogStore::entry_at(size_t index)
{
std::lock_guard lock(changelog_lock);
return changelog.entryAt(index);
}
size_t NuKeeperLogStore::term_at(size_t index)
{
std::lock_guard lock(changelog_lock);
auto entry = changelog.entryAt(index);
if (entry)
return entry->get_term();
return 0;
}
nuraft::ptr<nuraft::buffer> NuKeeperLogStore::pack(size_t index, int32_t cnt)
{
std::lock_guard lock(changelog_lock);
return changelog.serializeEntriesToBuffer(index, cnt);
}
bool NuKeeperLogStore::compact(size_t last_log_index)
{
std::lock_guard lock(changelog_lock);
changelog.compact(last_log_index);
return true;
}
bool NuKeeperLogStore::flush()
{
std::lock_guard lock(changelog_lock);
changelog.flush();
return true;
}
void NuKeeperLogStore::apply_pack(size_t index, nuraft::buffer & pack)
{
std::lock_guard lock(changelog_lock);
changelog.applyEntriesFromBuffer(index, pack, force_sync);
}
size_t NuKeeperLogStore::size() const
{
std::lock_guard lock(changelog_lock);
return changelog.size();
}
}

View File

@ -0,0 +1,52 @@
#pragma once
#include <libnuraft/log_store.hxx> // Y_IGNORE
#include <map>
#include <mutex>
#include <Core/Types.h>
#include <Coordination/Changelog.h>
#include <common/logger_useful.h>
namespace DB
{
class NuKeeperLogStore : public nuraft::log_store
{
public:
NuKeeperLogStore(const std::string & changelogs_path, size_t rotate_interval_, bool force_sync_);
void init(size_t from_log_idx);
size_t start_index() const override;
size_t next_slot() const override;
nuraft::ptr<nuraft::log_entry> last_entry() const override;
size_t append(nuraft::ptr<nuraft::log_entry> & entry) override;
void write_at(size_t index, nuraft::ptr<nuraft::log_entry> & entry) override;
nuraft::ptr<std::vector<nuraft::ptr<nuraft::log_entry>>> log_entries(size_t start, size_t end) override;
nuraft::ptr<nuraft::log_entry> entry_at(size_t index) override;
size_t term_at(size_t index) override;
nuraft::ptr<nuraft::buffer> pack(size_t index, int32_t cnt) override;
void apply_pack(size_t index, nuraft::buffer & pack) override;
bool compact(size_t last_log_index) override;
bool flush() override;
size_t size() const;
private:
mutable std::mutex changelog_lock;
Poco::Logger * log;
Changelog changelog;
bool force_sync;
};
}

View File

@ -1,7 +1,7 @@
#include <Coordination/NuKeeperServer.h>
#include <Coordination/LoggerWrapper.h>
#include <Coordination/NuKeeperStateMachine.h>
#include <Coordination/InMemoryStateManager.h>
#include <Coordination/NuKeeperStateManager.h>
#include <Coordination/WriteBufferFromNuraftBuffer.h>
#include <Coordination/ReadBufferFromNuraftBuffer.h>
#include <IO/ReadHelpers.h>
@ -26,13 +26,16 @@ NuKeeperServer::NuKeeperServer(
: server_id(server_id_)
, coordination_settings(coordination_settings_)
, state_machine(nuraft::cs_new<NuKeeperStateMachine>(responses_queue_, coordination_settings))
, state_manager(nuraft::cs_new<InMemoryStateManager>(server_id, "test_keeper_server.raft_configuration", config))
, state_manager(nuraft::cs_new<NuKeeperStateManager>(server_id, "test_keeper_server", config, coordination_settings))
, responses_queue(responses_queue_)
{
}
void NuKeeperServer::startup()
{
state_manager->loadLogStore(state_machine->last_commit_index());
nuraft::raft_params params;
params.heart_beat_interval_ = coordination_settings->heart_beat_interval_ms.totalMilliseconds();
params.election_timeout_lower_bound_ = coordination_settings->election_timeout_lower_bound_ms.totalMilliseconds();
@ -64,6 +67,7 @@ void NuKeeperServer::startup()
void NuKeeperServer::shutdown()
{
state_machine->shutdownStorage();
state_manager->flushLogStore();
if (!launcher.shutdown(coordination_settings->shutdown_timeout.totalSeconds()))
LOG_WARNING(&Poco::Logger::get("NuKeeperServer"), "Failed to shutdown RAFT server in {} seconds", 5);
}
@ -157,7 +161,7 @@ bool NuKeeperServer::isLeaderAlive() const
nuraft::cb_func::ReturnCode NuKeeperServer::callbackFunc(nuraft::cb_func::Type type, nuraft::cb_func::Param * /* param */)
{
if (type == nuraft::cb_func::Type::BecomeFresh || type == nuraft::cb_func::Type::BecomeLeader)
if ((type == nuraft::cb_func::InitialBatchCommited && isLeader()) || type == nuraft::cb_func::BecomeFresh)
{
std::unique_lock lock(initialized_mutex);
initialized_flag = true;

View File

@ -2,7 +2,7 @@
#include <libnuraft/nuraft.hxx> // Y_IGNORE
#include <Coordination/InMemoryLogStore.h>
#include <Coordination/InMemoryStateManager.h>
#include <Coordination/NuKeeperStateManager.h>
#include <Coordination/NuKeeperStateMachine.h>
#include <Coordination/NuKeeperStorage.h>
#include <Coordination/CoordinationSettings.h>
@ -20,7 +20,7 @@ private:
nuraft::ptr<NuKeeperStateMachine> state_machine;
nuraft::ptr<InMemoryStateManager> state_manager;
nuraft::ptr<NuKeeperStateManager> state_manager;
nuraft::raft_launcher launcher;

View File

@ -46,7 +46,7 @@ NuKeeperStateMachine::NuKeeperStateMachine(ResponsesQueue & responses_queue_, co
, storage(coordination_settings->dead_session_check_period_ms.totalMilliseconds())
, responses_queue(responses_queue_)
, last_committed_idx(0)
, log(&Poco::Logger::get("NuRaftStateMachine"))
, log(&Poco::Logger::get("NuKeeperStateMachine"))
{
LOG_DEBUG(log, "Created nukeeper state machine");
}

View File

@ -1,4 +1,4 @@
#include <Coordination/InMemoryStateManager.h>
#include <Coordination/NuKeeperStateManager.h>
#include <Common/Exception.h>
namespace DB
@ -9,30 +9,34 @@ namespace ErrorCodes
extern const int RAFT_ERROR;
}
InMemoryStateManager::InMemoryStateManager(int server_id_, const std::string & host, int port)
NuKeeperStateManager::NuKeeperStateManager(int server_id_, const std::string & host, int port, const std::string & logs_path)
: my_server_id(server_id_)
, my_port(port)
, log_store(nuraft::cs_new<InMemoryLogStore>())
, log_store(nuraft::cs_new<NuKeeperLogStore>(logs_path, 5000, false))
, cluster_config(nuraft::cs_new<nuraft::cluster_config>())
{
auto peer_config = nuraft::cs_new<nuraft::srv_config>(my_server_id, host + ":" + std::to_string(port));
cluster_config->get_servers().push_back(peer_config);
}
InMemoryStateManager::InMemoryStateManager(
NuKeeperStateManager::NuKeeperStateManager(
int my_server_id_,
const std::string & config_prefix,
const Poco::Util::AbstractConfiguration & config)
const Poco::Util::AbstractConfiguration & config,
const CoordinationSettingsPtr & coordination_settings)
: my_server_id(my_server_id_)
, log_store(nuraft::cs_new<InMemoryLogStore>())
, log_store(nuraft::cs_new<NuKeeperLogStore>(
config.getString(config_prefix + ".log_storage_path"),
coordination_settings->rotate_log_storage_interval, coordination_settings->force_sync))
, cluster_config(nuraft::cs_new<nuraft::cluster_config>())
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);
config.keys(config_prefix + ".raft_configuration", keys);
for (const auto & server_key : keys)
{
std::string full_prefix = config_prefix + "." + server_key;
std::string full_prefix = config_prefix + ".raft_configuration." + server_key;
int server_id = config.getInt(full_prefix + ".id");
std::string hostname = config.getString(full_prefix + ".hostname");
int port = config.getInt(full_prefix + ".port");
@ -53,13 +57,23 @@ InMemoryStateManager::InMemoryStateManager(
cluster_config->get_servers().push_back(peer_config);
}
if (!my_server_config)
throw Exception(ErrorCodes::RAFT_ERROR, "Our server id {} not found in raft_configuration section");
throw Exception(ErrorCodes::RAFT_ERROR, "Our server id {} not found in raft_configuration section", my_server_id);
if (start_as_follower_servers.size() == cluster_config->get_servers().size())
throw Exception(ErrorCodes::RAFT_ERROR, "At least one of servers should be able to start as leader (without <start_as_follower>)");
}
void InMemoryStateManager::save_config(const nuraft::cluster_config & config)
void NuKeeperStateManager::loadLogStore(size_t start_log_index)
{
log_store->init(start_log_index);
}
void NuKeeperStateManager::flushLogStore()
{
log_store->flush();
}
void NuKeeperStateManager::save_config(const nuraft::cluster_config & config)
{
// Just keep in memory in this example.
// Need to write to disk here, if want to make it durable.
@ -67,7 +81,7 @@ void InMemoryStateManager::save_config(const nuraft::cluster_config & config)
cluster_config = nuraft::cluster_config::deserialize(*buf);
}
void InMemoryStateManager::save_state(const nuraft::srv_state & state)
void NuKeeperStateManager::save_state(const nuraft::srv_state & state)
{
// Just keep in memory in this example.
// Need to write to disk here, if want to make it durable.

View File

@ -2,25 +2,32 @@
#include <Core/Types.h>
#include <string>
#include <Coordination/InMemoryLogStore.h>
#include <Coordination/NuKeeperLogStore.h>
#include <Coordination/CoordinationSettings.h>
#include <libnuraft/nuraft.hxx> // Y_IGNORE
#include <Poco/Util/AbstractConfiguration.h>
namespace DB
{
class InMemoryStateManager : public nuraft::state_mgr
class NuKeeperStateManager : public nuraft::state_mgr
{
public:
InMemoryStateManager(
NuKeeperStateManager(
int server_id_,
const std::string & config_prefix,
const Poco::Util::AbstractConfiguration & config);
const Poco::Util::AbstractConfiguration & config,
const CoordinationSettingsPtr & coordination_settings);
InMemoryStateManager(
NuKeeperStateManager(
int server_id_,
const std::string & host,
int port);
int port,
const std::string & logs_path);
void loadLogStore(size_t start_log_index);
void flushLogStore();
nuraft::ptr<nuraft::cluster_config> load_config() override { return cluster_config; }
@ -49,7 +56,7 @@ private:
int my_server_id;
int my_port;
std::unordered_set<int> start_as_follower_servers;
nuraft::ptr<InMemoryLogStore> log_store;
nuraft::ptr<NuKeeperLogStore> log_store;
nuraft::ptr<nuraft::srv_config> my_server_config;
nuraft::ptr<nuraft::cluster_config> cluster_config;
nuraft::ptr<nuraft::srv_state> server_state;

View File

@ -25,10 +25,10 @@ static String parentPath(const String & path)
return "/";
}
static String baseName(const String & path)
static std::string getBaseName(const String & path)
{
auto rslash_pos = path.rfind('/');
return path.substr(rslash_pos + 1);
size_t basename_start = path.rfind('/');
return std::string{&path[basename_start + 1], path.length() - basename_start - 1};
}
static NuKeeperStorage::ResponsesForSessions processWatchesImpl(const String & path, NuKeeperStorage::Watches & watches, NuKeeperStorage::Watches & list_watches, Coordination::Event event_type)
@ -167,14 +167,17 @@ struct NuKeeperStorageCreateRequest final : public NuKeeperStorageRequest
/// Increment sequential number even if node is not sequential
++it->second.seq_num;
response.path_created = path_created;
container.emplace(path_created, std::move(created_node));
auto child_path = getBaseName(path_created);
it->second.children.insert(child_path);
if (request.is_ephemeral)
ephemerals[session_id].emplace(path_created);
undo = [&container, &ephemerals, session_id, path_created, is_ephemeral = request.is_ephemeral, parent_path = it->first]
undo = [&container, &ephemerals, session_id, path_created, is_ephemeral = request.is_ephemeral, parent_path = it->first, child_path]
{
container.erase(path_created);
if (is_ephemeral)
@ -183,6 +186,7 @@ struct NuKeeperStorageCreateRequest final : public NuKeeperStorageRequest
--undo_parent.stat.cversion;
--undo_parent.stat.numChildren;
--undo_parent.seq_num;
undo_parent.children.erase(child_path);
};
++it->second.stat.cversion;
@ -250,13 +254,16 @@ struct NuKeeperStorageRemoveRequest final : public NuKeeperStorageRequest
if (prev_node.is_ephemeral)
ephemerals[session_id].erase(request.path);
container.erase(it);
auto child_basename = getBaseName(it->first);
auto & parent = container.at(parentPath(request.path));
--parent.stat.numChildren;
++parent.stat.cversion;
parent.children.erase(child_basename);
response.error = Coordination::Error::ZOK;
undo = [prev_node, &container, &ephemerals, session_id, path = request.path]
container.erase(it);
undo = [prev_node, &container, &ephemerals, session_id, path = request.path, child_basename]
{
if (prev_node.is_ephemeral)
ephemerals[session_id].emplace(path);
@ -265,6 +272,7 @@ struct NuKeeperStorageRemoveRequest final : public NuKeeperStorageRequest
auto & undo_parent = container.at(parentPath(path));
++undo_parent.stat.numChildren;
--undo_parent.stat.cversion;
undo_parent.children.insert(child_basename);
};
}
@ -370,17 +378,9 @@ struct NuKeeperStorageListRequest final : public NuKeeperStorageRequest
if (path_prefix.empty())
throw DB::Exception("Logical error: path cannot be empty", ErrorCodes::LOGICAL_ERROR);
if (path_prefix.back() != '/')
path_prefix += '/';
response.names.insert(response.names.end(), it->second.children.begin(), it->second.children.end());
/// Fairly inefficient.
for (auto child_it = container.upper_bound(path_prefix);
child_it != container.end() && startsWith(child_it->first, path_prefix);
++child_it)
{
if (parentPath(child_it->first) == request.path)
response.names.emplace_back(baseName(child_it->first));
}
std::sort(response.names.begin(), response.names.end());
response.stat = it->second.stat;
response.error = Coordination::Error::ZOK;

View File

@ -16,6 +16,7 @@ using namespace DB;
struct NuKeeperStorageRequest;
using NuKeeperStorageRequestPtr = std::shared_ptr<NuKeeperStorageRequest>;
using ResponseCallback = std::function<void(const Coordination::ZooKeeperResponsePtr &)>;
using ChildrenSet = std::unordered_set<std::string>;
class NuKeeperStorage
{
@ -30,6 +31,7 @@ public:
bool is_sequental = false;
Coordination::Stat stat{};
int32_t seq_num = 0;
ChildrenSet children{};
};
struct ResponseForSession
@ -48,9 +50,9 @@ public:
using RequestsForSessions = std::vector<RequestForSession>;
using Container = std::map<std::string, Node>;
using Ephemerals = std::unordered_map<int64_t, std::unordered_set<String>>;
using SessionAndWatcher = std::unordered_map<int64_t, std::unordered_set<String>>;
using Container = std::unordered_map<std::string, Node>;
using Ephemerals = std::unordered_map<int64_t, std::unordered_set<std::string>>;
using SessionAndWatcher = std::unordered_map<int64_t, std::unordered_set<std::string>>;
using SessionAndTimeout = std::unordered_map<int64_t, long>;
using SessionIDs = std::vector<int64_t>;

View File

@ -59,13 +59,16 @@ void NuKeeperStorageSerializer::deserialize(NuKeeperStorage & storage, ReadBuffe
size_t container_size;
Coordination::read(container_size, in);
while (storage.container.size() < container_size)
size_t current_size = 0;
while (current_size < container_size)
{
std::string path;
Coordination::read(path, in);
NuKeeperStorage::Node node;
readNode(node, in);
storage.container[path] = node;
current_size++;
}
size_t ephemerals_size;
Coordination::read(ephemerals_size, in);

View File

@ -6,9 +6,10 @@
#endif
#if USE_NURAFT
#include <Poco/ConsoleChannel.h>
#include <Poco/Logger.h>
#include <Coordination/InMemoryLogStore.h>
#include <Coordination/InMemoryStateManager.h>
#include <Coordination/NuKeeperStateManager.h>
#include <Coordination/NuKeeperStorageSerializer.h>
#include <Coordination/SummingStateMachine.h>
#include <Coordination/NuKeeperStateMachine.h>
@ -20,9 +21,35 @@
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <Common/Exception.h>
#include <common/logger_useful.h>
#include <libnuraft/nuraft.hxx> // Y_IGNORE
#include <thread>
#include <Coordination/NuKeeperLogStore.h>
#include <Coordination/Changelog.h>
#include <filesystem>
namespace fs = std::filesystem;
struct ChangelogDirTest
{
std::string path;
bool drop;
explicit ChangelogDirTest(std::string path_, bool drop_ = true)
: path(path_)
, drop(drop_)
{
if (fs::exists(path))
{
EXPECT_TRUE(false) << "Path " << path << " already exists, remove it to run test";
}
fs::create_directory(path);
}
~ChangelogDirTest()
{
if (fs::exists(path) && drop)
fs::remove_all(path);
}
};
TEST(CoordinationTest, BuildTest)
{
@ -67,14 +94,15 @@ TEST(CoordinationTest, BufferSerde)
template <typename StateMachine>
struct SimpliestRaftServer
{
SimpliestRaftServer(int server_id_, const std::string & hostname_, int port_)
SimpliestRaftServer(int server_id_, const std::string & hostname_, int port_, const std::string & logs_path)
: server_id(server_id_)
, hostname(hostname_)
, port(port_)
, endpoint(hostname + ":" + std::to_string(port))
, state_machine(nuraft::cs_new<StateMachine>())
, state_manager(nuraft::cs_new<DB::InMemoryStateManager>(server_id, hostname, port))
, state_manager(nuraft::cs_new<DB::NuKeeperStateManager>(server_id, hostname, port, logs_path))
{
state_manager->loadLogStore(1);
nuraft::raft_params params;
params.heart_beat_interval_ = 100;
params.election_timeout_lower_bound_ = 200;
@ -90,10 +118,10 @@ struct SimpliestRaftServer
if (!raft_instance)
{
std::cerr << "Failed to initialize launcher (see the message "
"in the log file)." << std::endl;
std::cerr << "Failed to initialize launcher" << std::endl;
exit(-1);
}
std::cout << "init Raft instance " << server_id;
for (size_t ii = 0; ii < 20; ++ii)
{
@ -123,7 +151,7 @@ struct SimpliestRaftServer
nuraft::ptr<StateMachine> state_machine;
// State manager.
nuraft::ptr<nuraft::state_mgr> state_manager;
nuraft::ptr<DB::NuKeeperStateManager> state_manager;
// Raft launcher.
nuraft::raft_launcher launcher;
@ -134,11 +162,10 @@ struct SimpliestRaftServer
using SummingRaftServer = SimpliestRaftServer<DB::SummingStateMachine>;
nuraft::ptr<nuraft::buffer> getLogEntry(int64_t number)
nuraft::ptr<nuraft::buffer> getBuffer(int64_t number)
{
nuraft::ptr<nuraft::buffer> ret = nuraft::buffer::alloc(sizeof(number));
nuraft::buffer_serializer bs(ret);
// WARNING: We don't consider endian-safety in this example.
bs.put_raw(&number, sizeof(number));
return ret;
}
@ -146,12 +173,13 @@ nuraft::ptr<nuraft::buffer> getLogEntry(int64_t number)
TEST(CoordinationTest, TestSummingRaft1)
{
SummingRaftServer s1(1, "localhost", 44444);
ChangelogDirTest test("./logs");
SummingRaftServer s1(1, "localhost", 44444, "./logs");
/// Single node is leader
EXPECT_EQ(s1.raft_instance->get_leader(), 1);
auto entry1 = getLogEntry(143);
auto entry1 = getBuffer(143);
auto ret = s1.raft_instance->append_entries({entry1});
EXPECT_TRUE(ret->get_accepted()) << "failed to replicate: entry 1" << ret->get_result_code();
EXPECT_EQ(ret->get_result_code(), nuraft::cmd_result_code::OK) << "failed to replicate: entry 1" << ret->get_result_code();
@ -169,17 +197,23 @@ TEST(CoordinationTest, TestSummingRaft1)
TEST(CoordinationTest, TestSummingRaft3)
{
SummingRaftServer s1(1, "localhost", 44444);
SummingRaftServer s2(2, "localhost", 44445);
SummingRaftServer s3(3, "localhost", 44446);
ChangelogDirTest test1("./logs1");
SummingRaftServer s1(1, "localhost", 44444, "./logs1");
ChangelogDirTest test2("./logs2");
SummingRaftServer s2(2, "localhost", 44445, "./logs2");
ChangelogDirTest test3("./logs3");
SummingRaftServer s3(3, "localhost", 44446, "./logs3");
nuraft::srv_config first_config(1, "localhost:44444");
nuraft::srv_config first_config(1, 0, "localhost:44444", "", false, 0);
auto ret1 = s2.raft_instance->add_srv(first_config);
if (!ret1->get_accepted())
while (!ret1->get_accepted())
{
std::cout << "failed to add server: "
<< ret1->get_result_str() << std::endl;
EXPECT_TRUE(false);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
ret1 = s2.raft_instance->add_srv(first_config);
}
while (s1.raft_instance->get_leader() != 2)
@ -188,13 +222,15 @@ TEST(CoordinationTest, TestSummingRaft3)
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
nuraft::srv_config third_config(3, "localhost:44446");
nuraft::srv_config third_config(3, 0, "localhost:44446", "", false, 0);
auto ret3 = s2.raft_instance->add_srv(third_config);
if (!ret3->get_accepted())
{
std::cout << "failed to add server: "
<< ret3->get_result_str() << std::endl;
EXPECT_TRUE(false);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
ret3 = s2.raft_instance->add_srv(third_config);
}
while (s3.raft_instance->get_leader() != 2)
@ -209,10 +245,13 @@ TEST(CoordinationTest, TestSummingRaft3)
EXPECT_EQ(s3.raft_instance->get_leader(), 2);
std::cerr << "Starting to add entries\n";
auto entry = getLogEntry(1);
auto entry = getBuffer(1);
auto ret = s2.raft_instance->append_entries({entry});
EXPECT_TRUE(ret->get_accepted()) << "failed to replicate: entry 1" << ret->get_result_code();
EXPECT_EQ(ret->get_result_code(), nuraft::cmd_result_code::OK) << "failed to replicate: entry 1" << ret->get_result_code();
while (!ret->get_accepted() || ret->get_result_code() != nuraft::cmd_result_code::OK)
{
std::cerr << ret->get_accepted() << "failed to replicate: entry 1" << ret->get_result_code() << std::endl;
ret = s2.raft_instance->append_entries({entry});
}
while (s1.state_machine->getValue() != 1)
{
@ -236,7 +275,7 @@ TEST(CoordinationTest, TestSummingRaft3)
EXPECT_EQ(s2.state_machine->getValue(), 1);
EXPECT_EQ(s3.state_machine->getValue(), 1);
auto non_leader_entry = getLogEntry(3);
auto non_leader_entry = getBuffer(3);
auto ret_non_leader1 = s1.raft_instance->append_entries({non_leader_entry});
EXPECT_FALSE(ret_non_leader1->get_accepted());
@ -245,10 +284,13 @@ TEST(CoordinationTest, TestSummingRaft3)
EXPECT_FALSE(ret_non_leader3->get_accepted());
auto leader_entry = getLogEntry(77);
auto leader_entry = getBuffer(77);
auto ret_leader = s2.raft_instance->append_entries({leader_entry});
EXPECT_TRUE(ret_leader->get_accepted()) << "failed to replicate: entry 78" << ret_leader->get_result_code();
EXPECT_EQ(ret_leader->get_result_code(), nuraft::cmd_result_code::OK) << "failed to replicate: entry 78" << ret_leader->get_result_code();
while (!ret_leader->get_accepted() || ret_leader->get_result_code() != nuraft::cmd_result_code::OK)
{
std::cerr << "failed to replicate: entry 78" << ret_leader->get_result_code() << std::endl;
ret_leader = s2.raft_instance->append_entries({leader_entry});
}
while (s1.state_machine->getValue() != 78)
{
@ -333,4 +375,586 @@ TEST(CoordinationTest, TestStorageSerialization)
EXPECT_EQ(new_storage.ephemerals[1].size(), 1);
}
DB::LogEntryPtr getLogEntry(const std::string & s, size_t term)
{
DB::WriteBufferFromNuraftBuffer bufwriter;
writeText(s, bufwriter);
return nuraft::cs_new<nuraft::log_entry>(term, bufwriter.getBuffer());
}
TEST(CoordinationTest, ChangelogTestSimple)
{
ChangelogDirTest test("./logs");
DB::NuKeeperLogStore changelog("./logs", 5, true);
changelog.init(1);
auto entry = getLogEntry("hello world", 77);
changelog.append(entry);
EXPECT_EQ(changelog.next_slot(), 2);
EXPECT_EQ(changelog.start_index(), 1);
EXPECT_EQ(changelog.last_entry()->get_term(), 77);
EXPECT_EQ(changelog.entry_at(1)->get_term(), 77);
EXPECT_EQ(changelog.log_entries(1, 2)->size(), 1);
}
TEST(CoordinationTest, ChangelogTestFile)
{
ChangelogDirTest test("./logs");
DB::NuKeeperLogStore changelog("./logs", 5, true);
changelog.init(1);
auto entry = getLogEntry("hello world", 77);
changelog.append(entry);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
for (const auto & p : fs::directory_iterator("./logs"))
EXPECT_EQ(p.path(), "./logs/changelog_1_5.bin");
changelog.append(entry);
changelog.append(entry);
changelog.append(entry);
changelog.append(entry);
changelog.append(entry);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin"));
}
TEST(CoordinationTest, ChangelogReadWrite)
{
ChangelogDirTest test("./logs");
DB::NuKeeperLogStore changelog("./logs", 1000, true);
changelog.init(1);
for (size_t i = 0; i < 10; ++i)
{
auto entry = getLogEntry("hello world", i * 10);
changelog.append(entry);
}
EXPECT_EQ(changelog.size(), 10);
DB::NuKeeperLogStore changelog_reader("./logs", 1000, true);
changelog_reader.init(1);
EXPECT_EQ(changelog_reader.size(), 10);
EXPECT_EQ(changelog_reader.last_entry()->get_term(), changelog.last_entry()->get_term());
EXPECT_EQ(changelog_reader.start_index(), changelog.start_index());
EXPECT_EQ(changelog_reader.next_slot(), changelog.next_slot());
for (size_t i = 0; i < 10; ++i)
EXPECT_EQ(changelog_reader.entry_at(i + 1)->get_term(), changelog.entry_at(i + 1)->get_term());
auto entries_from_range_read = changelog_reader.log_entries(1, 11);
auto entries_from_range = changelog.log_entries(1, 11);
EXPECT_EQ(entries_from_range_read->size(), entries_from_range->size());
EXPECT_EQ(10, entries_from_range->size());
}
TEST(CoordinationTest, ChangelogWriteAt)
{
ChangelogDirTest test("./logs");
DB::NuKeeperLogStore changelog("./logs", 1000, true);
changelog.init(1);
for (size_t i = 0; i < 10; ++i)
{
auto entry = getLogEntry("hello world", i * 10);
changelog.append(entry);
}
EXPECT_EQ(changelog.size(), 10);
auto entry = getLogEntry("writer", 77);
changelog.write_at(7, entry);
EXPECT_EQ(changelog.size(), 7);
EXPECT_EQ(changelog.last_entry()->get_term(), 77);
EXPECT_EQ(changelog.entry_at(7)->get_term(), 77);
EXPECT_EQ(changelog.next_slot(), 8);
DB::NuKeeperLogStore changelog_reader("./logs", 1000, true);
changelog_reader.init(1);
EXPECT_EQ(changelog_reader.size(), changelog.size());
EXPECT_EQ(changelog_reader.last_entry()->get_term(), changelog.last_entry()->get_term());
EXPECT_EQ(changelog_reader.start_index(), changelog.start_index());
EXPECT_EQ(changelog_reader.next_slot(), changelog.next_slot());
}
TEST(CoordinationTest, ChangelogTestAppendAfterRead)
{
ChangelogDirTest test("./logs");
DB::NuKeeperLogStore changelog("./logs", 5, true);
changelog.init(1);
for (size_t i = 0; i < 7; ++i)
{
auto entry = getLogEntry("hello world", i * 10);
changelog.append(entry);
}
EXPECT_EQ(changelog.size(), 7);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin"));
DB::NuKeeperLogStore changelog_reader("./logs", 5, true);
changelog_reader.init(1);
EXPECT_EQ(changelog_reader.size(), 7);
for (size_t i = 7; i < 10; ++i)
{
auto entry = getLogEntry("hello world", i * 10);
changelog_reader.append(entry);
}
EXPECT_EQ(changelog_reader.size(), 10);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin"));
size_t logs_count = 0;
for (const auto & _ [[maybe_unused]]: fs::directory_iterator("./logs"))
logs_count++;
EXPECT_EQ(logs_count, 2);
auto entry = getLogEntry("someentry", 77);
changelog_reader.append(entry);
EXPECT_EQ(changelog_reader.size(), 11);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin"));
logs_count = 0;
for (const auto & _ [[maybe_unused]]: fs::directory_iterator("./logs"))
logs_count++;
EXPECT_EQ(logs_count, 3);
}
TEST(CoordinationTest, ChangelogTestCompaction)
{
ChangelogDirTest test("./logs");
DB::NuKeeperLogStore changelog("./logs", 5, true);
changelog.init(1);
for (size_t i = 0; i < 3; ++i)
{
auto entry = getLogEntry("hello world", i * 10);
changelog.append(entry);
}
EXPECT_EQ(changelog.size(), 3);
changelog.compact(2);
EXPECT_EQ(changelog.size(), 1);
EXPECT_EQ(changelog.start_index(), 3);
EXPECT_EQ(changelog.next_slot(), 4);
EXPECT_EQ(changelog.last_entry()->get_term(), 20);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
auto e1 = getLogEntry("hello world", 30);
changelog.append(e1);
auto e2 = getLogEntry("hello world", 40);
changelog.append(e2);
auto e3 = getLogEntry("hello world", 50);
changelog.append(e3);
auto e4 = getLogEntry("hello world", 60);
changelog.append(e4);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin"));
changelog.compact(6);
EXPECT_FALSE(fs::exists("./logs/changelog_1_5.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin"));
EXPECT_EQ(changelog.size(), 1);
EXPECT_EQ(changelog.start_index(), 7);
EXPECT_EQ(changelog.next_slot(), 8);
EXPECT_EQ(changelog.last_entry()->get_term(), 60);
/// And we able to read it
DB::NuKeeperLogStore changelog_reader("./logs", 5, true);
changelog_reader.init(7);
EXPECT_EQ(changelog_reader.size(), 1);
EXPECT_EQ(changelog_reader.start_index(), 7);
EXPECT_EQ(changelog_reader.next_slot(), 8);
EXPECT_EQ(changelog_reader.last_entry()->get_term(), 60);
}
TEST(CoordinationTest, ChangelogTestBatchOperations)
{
ChangelogDirTest test("./logs");
DB::NuKeeperLogStore changelog("./logs", 100, true);
changelog.init(1);
for (size_t i = 0; i < 10; ++i)
{
auto entry = getLogEntry(std::to_string(i) + "_hello_world", i * 10);
changelog.append(entry);
}
EXPECT_EQ(changelog.size(), 10);
auto entries = changelog.pack(1, 5);
DB::NuKeeperLogStore apply_changelog("./logs", 100, true);
apply_changelog.init(1);
for (size_t i = 0; i < 10; ++i)
{
EXPECT_EQ(apply_changelog.entry_at(i + 1)->get_term(), i * 10);
}
EXPECT_EQ(apply_changelog.size(), 10);
apply_changelog.apply_pack(8, *entries);
EXPECT_EQ(apply_changelog.size(), 12);
EXPECT_EQ(apply_changelog.start_index(), 1);
EXPECT_EQ(apply_changelog.next_slot(), 13);
for (size_t i = 0; i < 7; ++i)
{
EXPECT_EQ(apply_changelog.entry_at(i + 1)->get_term(), i * 10);
}
EXPECT_EQ(apply_changelog.entry_at(8)->get_term(), 0);
EXPECT_EQ(apply_changelog.entry_at(9)->get_term(), 10);
EXPECT_EQ(apply_changelog.entry_at(10)->get_term(), 20);
EXPECT_EQ(apply_changelog.entry_at(11)->get_term(), 30);
EXPECT_EQ(apply_changelog.entry_at(12)->get_term(), 40);
}
TEST(CoordinationTest, ChangelogTestBatchOperationsEmpty)
{
ChangelogDirTest test("./logs");
DB::NuKeeperLogStore changelog("./logs", 100, true);
changelog.init(1);
for (size_t i = 0; i < 10; ++i)
{
auto entry = getLogEntry(std::to_string(i) + "_hello_world", i * 10);
changelog.append(entry);
}
EXPECT_EQ(changelog.size(), 10);
auto entries = changelog.pack(5, 5);
ChangelogDirTest test1("./logs1");
DB::NuKeeperLogStore changelog_new("./logs1", 100, true);
changelog_new.init(1);
EXPECT_EQ(changelog_new.size(), 0);
changelog_new.apply_pack(5, *entries);
EXPECT_EQ(changelog_new.size(), 5);
EXPECT_EQ(changelog_new.start_index(), 5);
EXPECT_EQ(changelog_new.next_slot(), 10);
for (size_t i = 4; i < 9; ++i)
EXPECT_EQ(changelog_new.entry_at(i + 1)->get_term(), i * 10);
auto e = getLogEntry("hello_world", 110);
changelog_new.append(e);
EXPECT_EQ(changelog_new.size(), 6);
EXPECT_EQ(changelog_new.start_index(), 5);
EXPECT_EQ(changelog_new.next_slot(), 11);
DB::NuKeeperLogStore changelog_reader("./logs1", 100, true);
changelog_reader.init(5);
}
TEST(CoordinationTest, ChangelogTestWriteAtPreviousFile)
{
ChangelogDirTest test("./logs");
DB::NuKeeperLogStore changelog("./logs", 5, true);
changelog.init(1);
for (size_t i = 0; i < 33; ++i)
{
auto entry = getLogEntry(std::to_string(i) + "_hello_world", i * 10);
changelog.append(entry);
}
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_16_20.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_21_25.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_26_30.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_31_35.bin"));
EXPECT_EQ(changelog.size(), 33);
auto e1 = getLogEntry("helloworld", 5555);
changelog.write_at(7, e1);
EXPECT_EQ(changelog.size(), 7);
EXPECT_EQ(changelog.start_index(), 1);
EXPECT_EQ(changelog.next_slot(), 8);
EXPECT_EQ(changelog.last_entry()->get_term(), 5555);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin"));
EXPECT_FALSE(fs::exists("./logs/changelog_11_15.bin"));
EXPECT_FALSE(fs::exists("./logs/changelog_16_20.bin"));
EXPECT_FALSE(fs::exists("./logs/changelog_21_25.bin"));
EXPECT_FALSE(fs::exists("./logs/changelog_26_30.bin"));
EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin"));
DB::NuKeeperLogStore changelog_read("./logs", 5, true);
changelog_read.init(1);
EXPECT_EQ(changelog_read.size(), 7);
EXPECT_EQ(changelog_read.start_index(), 1);
EXPECT_EQ(changelog_read.next_slot(), 8);
EXPECT_EQ(changelog_read.last_entry()->get_term(), 5555);
}
TEST(CoordinationTest, ChangelogTestWriteAtFileBorder)
{
ChangelogDirTest test("./logs");
DB::NuKeeperLogStore changelog("./logs", 5, true);
changelog.init(1);
for (size_t i = 0; i < 33; ++i)
{
auto entry = getLogEntry(std::to_string(i) + "_hello_world", i * 10);
changelog.append(entry);
}
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_16_20.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_21_25.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_26_30.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_31_35.bin"));
EXPECT_EQ(changelog.size(), 33);
auto e1 = getLogEntry("helloworld", 5555);
changelog.write_at(11, e1);
EXPECT_EQ(changelog.size(), 11);
EXPECT_EQ(changelog.start_index(), 1);
EXPECT_EQ(changelog.next_slot(), 12);
EXPECT_EQ(changelog.last_entry()->get_term(), 5555);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin"));
EXPECT_FALSE(fs::exists("./logs/changelog_16_20.bin"));
EXPECT_FALSE(fs::exists("./logs/changelog_21_25.bin"));
EXPECT_FALSE(fs::exists("./logs/changelog_26_30.bin"));
EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin"));
DB::NuKeeperLogStore changelog_read("./logs", 5, true);
changelog_read.init(1);
EXPECT_EQ(changelog_read.size(), 11);
EXPECT_EQ(changelog_read.start_index(), 1);
EXPECT_EQ(changelog_read.next_slot(), 12);
EXPECT_EQ(changelog_read.last_entry()->get_term(), 5555);
}
TEST(CoordinationTest, ChangelogTestWriteAtAllFiles)
{
ChangelogDirTest test("./logs");
DB::NuKeeperLogStore changelog("./logs", 5, true);
changelog.init(1);
for (size_t i = 0; i < 33; ++i)
{
auto entry = getLogEntry(std::to_string(i) + "_hello_world", i * 10);
changelog.append(entry);
}
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_16_20.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_21_25.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_26_30.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_31_35.bin"));
EXPECT_EQ(changelog.size(), 33);
auto e1 = getLogEntry("helloworld", 5555);
changelog.write_at(1, e1);
EXPECT_EQ(changelog.size(), 1);
EXPECT_EQ(changelog.start_index(), 1);
EXPECT_EQ(changelog.next_slot(), 2);
EXPECT_EQ(changelog.last_entry()->get_term(), 5555);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
EXPECT_FALSE(fs::exists("./logs/changelog_6_10.bin"));
EXPECT_FALSE(fs::exists("./logs/changelog_11_15.bin"));
EXPECT_FALSE(fs::exists("./logs/changelog_16_20.bin"));
EXPECT_FALSE(fs::exists("./logs/changelog_21_25.bin"));
EXPECT_FALSE(fs::exists("./logs/changelog_26_30.bin"));
EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin"));
}
TEST(CoordinationTest, ChangelogTestStartNewLogAfterRead)
{
ChangelogDirTest test("./logs");
DB::NuKeeperLogStore changelog("./logs", 5, true);
changelog.init(1);
for (size_t i = 0; i < 35; ++i)
{
auto entry = getLogEntry(std::to_string(i) + "_hello_world", i * 10);
changelog.append(entry);
}
EXPECT_EQ(changelog.size(), 35);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_16_20.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_21_25.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_26_30.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_31_35.bin"));
EXPECT_FALSE(fs::exists("./logs/changelog_36_40.bin"));
DB::NuKeeperLogStore changelog_reader("./logs", 5, true);
changelog_reader.init(1);
auto entry = getLogEntry("36_hello_world", 360);
changelog_reader.append(entry);
EXPECT_EQ(changelog_reader.size(), 36);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_16_20.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_21_25.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_26_30.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_31_35.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_36_40.bin"));
}
TEST(CoordinationTest, ChangelogTestReadAfterBrokenTruncate)
{
ChangelogDirTest test("./logs");
DB::NuKeeperLogStore changelog("./logs", 5, true);
changelog.init(1);
for (size_t i = 0; i < 35; ++i)
{
auto entry = getLogEntry(std::to_string(i) + "_hello_world", i * 10);
changelog.append(entry);
}
EXPECT_EQ(changelog.size(), 35);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_16_20.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_21_25.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_26_30.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_31_35.bin"));
DB::WriteBufferFromFile plain_buf("./logs/changelog_11_15.bin", DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
plain_buf.truncate(0);
DB::NuKeeperLogStore changelog_reader("./logs", 5, true);
changelog_reader.init(1);
EXPECT_EQ(changelog_reader.size(), 10);
EXPECT_EQ(changelog_reader.last_entry()->get_term(), 90);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin"));
EXPECT_FALSE(fs::exists("./logs/changelog_16_20.bin"));
EXPECT_FALSE(fs::exists("./logs/changelog_21_25.bin"));
EXPECT_FALSE(fs::exists("./logs/changelog_26_30.bin"));
EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin"));
auto entry = getLogEntry("h", 7777);
changelog_reader.append(entry);
EXPECT_EQ(changelog_reader.size(), 11);
EXPECT_EQ(changelog_reader.last_entry()->get_term(), 7777);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_11_15.bin"));
EXPECT_FALSE(fs::exists("./logs/changelog_16_20.bin"));
EXPECT_FALSE(fs::exists("./logs/changelog_21_25.bin"));
EXPECT_FALSE(fs::exists("./logs/changelog_26_30.bin"));
EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin"));
DB::NuKeeperLogStore changelog_reader2("./logs", 5, true);
changelog_reader2.init(1);
EXPECT_EQ(changelog_reader2.size(), 11);
EXPECT_EQ(changelog_reader2.last_entry()->get_term(), 7777);
}
TEST(CoordinationTest, ChangelogTestReadAfterBrokenTruncate2)
{
ChangelogDirTest test("./logs");
DB::NuKeeperLogStore changelog("./logs", 20, true);
changelog.init(1);
for (size_t i = 0; i < 35; ++i)
{
auto entry = getLogEntry(std::to_string(i) + "_hello_world", (i + 44) * 10);
changelog.append(entry);
}
EXPECT_TRUE(fs::exists("./logs/changelog_1_20.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_21_40.bin"));
DB::WriteBufferFromFile plain_buf("./logs/changelog_1_20.bin", DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
plain_buf.truncate(140);
DB::NuKeeperLogStore changelog_reader("./logs", 20, true);
changelog_reader.init(1);
EXPECT_EQ(changelog_reader.size(), 2);
EXPECT_EQ(changelog_reader.last_entry()->get_term(), 450);
EXPECT_TRUE(fs::exists("./logs/changelog_1_20.bin"));
EXPECT_FALSE(fs::exists("./logs/changelog_21_40.bin"));
auto entry = getLogEntry("hello_world", 7777);
changelog_reader.append(entry);
EXPECT_EQ(changelog_reader.size(), 3);
EXPECT_EQ(changelog_reader.last_entry()->get_term(), 7777);
DB::NuKeeperLogStore changelog_reader2("./logs", 20, true);
changelog_reader2.init(1);
EXPECT_EQ(changelog_reader2.size(), 3);
EXPECT_EQ(changelog_reader2.last_entry()->get_term(), 7777);
}
TEST(CoordinationTest, ChangelogTestLostFiles)
{
ChangelogDirTest test("./logs");
DB::NuKeeperLogStore changelog("./logs", 20, true);
changelog.init(1);
for (size_t i = 0; i < 35; ++i)
{
auto entry = getLogEntry(std::to_string(i) + "_hello_world", (i + 44) * 10);
changelog.append(entry);
}
EXPECT_TRUE(fs::exists("./logs/changelog_1_20.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_21_40.bin"));
fs::remove("./logs/changelog_1_20.bin");
DB::NuKeeperLogStore changelog_reader("./logs", 20, true);
EXPECT_THROW(changelog_reader.init(5), DB::Exception);
fs::remove("./logs/changelog_21_40.bin");
EXPECT_THROW(changelog_reader.init(3), DB::Exception);
}
int main(int argc, char ** argv)
{
Poco::AutoPtr<Poco::ConsoleChannel> channel(new Poco::ConsoleChannel(std::cerr));
Poco::Logger::root().setChannel(channel);
Poco::Logger::root().setLevel("trace");
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
#endif

View File

@ -40,7 +40,7 @@ namespace ErrorCodes
struct PollResult
{
size_t ready_responses_count{0};
size_t responses_count{0};
bool has_requests{false};
bool error{false};
};
@ -70,14 +70,14 @@ struct SocketInterruptablePollWrapper
if (epollfd < 0)
throwFromErrno("Cannot epoll_create", ErrorCodes::SYSTEM_ERROR);
socket_event.events = EPOLLIN | EPOLLERR;
socket_event.events = EPOLLIN | EPOLLERR | EPOLLPRI;
socket_event.data.fd = sockfd;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, sockfd, &socket_event) < 0)
{
::close(epollfd);
throwFromErrno("Cannot insert socket into epoll queue", ErrorCodes::SYSTEM_ERROR);
}
pipe_event.events = EPOLLIN | EPOLLERR;
pipe_event.events = EPOLLIN | EPOLLERR | EPOLLPRI;
pipe_event.data.fd = pipe.fds_rw[0];
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, pipe.fds_rw[0], &pipe_event) < 0)
{
@ -92,13 +92,24 @@ struct SocketInterruptablePollWrapper
return pipe.fds_rw[1];
}
PollResult poll(Poco::Timespan remaining_time)
PollResult poll(Poco::Timespan remaining_time, const std::shared_ptr<ReadBufferFromPocoSocket> & in)
{
bool socket_ready = false;
bool fd_ready = false;
if (in->available() != 0)
socket_ready = true;
if (response_in.available() != 0)
fd_ready = true;
int rc = 0;
if (!fd_ready)
{
std::array<int, 2> outputs = {-1, -1};
#if defined(POCO_HAVE_FD_EPOLL)
int rc;
epoll_event evout[2];
memset(evout, 0, sizeof(evout));
evout[0].data.fd = evout[1].data.fd = -1;
do
{
Poco::Timestamp start;
@ -115,10 +126,13 @@ struct SocketInterruptablePollWrapper
}
while (rc < 0 && errno == EINTR);
if (rc >= 1 && evout[0].events & EPOLLIN)
outputs[0] = evout[0].data.fd;
if (rc == 2 && evout[1].events & EPOLLIN)
outputs[1] = evout[1].data.fd;
for (int i = 0; i < rc; ++i)
{
if (evout[i].data.fd == sockfd)
socket_ready = true;
if (evout[i].data.fd == pipe.fds_rw[0])
fd_ready = true;
}
#else
pollfd poll_buf[2];
poll_buf[0].fd = sockfd;
@ -126,7 +140,6 @@ struct SocketInterruptablePollWrapper
poll_buf[1].fd = pipe.fds_rw[0];
poll_buf[1].events = POLLIN;
int rc;
do
{
Poco::Timestamp start;
@ -142,47 +155,29 @@ struct SocketInterruptablePollWrapper
}
}
while (rc < 0 && errno == POCO_EINTR);
if (rc >= 1 && poll_buf[0].revents & POLLIN)
outputs[0] = sockfd;
socket_ready = true;
if (rc == 2 && poll_buf[1].revents & POLLIN)
outputs[1] = pipe.fds_rw[0];
fd_ready = true;
#endif
}
PollResult result{};
if (rc < 0)
{
result.error = true;
return result;
}
else if (rc == 0)
{
return result;
}
else
{
for (auto fd : outputs)
{
if (fd != -1)
{
if (fd == sockfd)
result.has_requests = true;
else
result.has_requests = socket_ready;
if (fd_ready)
{
UInt8 dummy;
do
{
/// All ready responses stored in responses queue,
/// but we have to count amount of ready responses in pipe
/// and process them only. Otherwise states of response_in
/// and response queue will be inconsistent and race condition is possible.
readIntBinary(dummy, response_in);
result.ready_responses_count++;
}
while (response_in.available());
}
}
}
result.responses_count = 1;
auto available = response_in.available();
response_in.ignore(available);
result.responses_count += available;
}
if (rc < 0)
result.error = true;
return result;
}
@ -339,10 +334,8 @@ void NuKeeperTCPHandler::runImpl()
{
using namespace std::chrono_literals;
PollResult result = poll_wrapper->poll(session_timeout);
PollResult result = poll_wrapper->poll(session_timeout, in);
if (result.has_requests && !close_received)
{
do
{
auto [received_op, received_xid] = receiveRequest();
@ -351,7 +344,6 @@ void NuKeeperTCPHandler::runImpl()
LOG_DEBUG(log, "Received close event with xid {} for session id #{}", received_xid, session_id);
close_xid = received_xid;
close_received = true;
break;
}
else if (received_op == Coordination::OpNum::Heartbeat)
{
@ -359,23 +351,23 @@ void NuKeeperTCPHandler::runImpl()
session_stopwatch.restart();
}
}
while (in->available());
}
/// Process exact amount of responses from pipe
/// otherwise state of responses queue and signaling pipe
/// became inconsistent and race condition is possible.
while (result.ready_responses_count != 0)
while (result.responses_count != 0)
{
Coordination::ZooKeeperResponsePtr response;
if (!responses->tryPop(response))
throw Exception(ErrorCodes::LOGICAL_ERROR, "We must have at least {} ready responses, but queue is empty. It's a bug.", result.ready_responses_count);
throw Exception(ErrorCodes::LOGICAL_ERROR, "We must have ready response, but queue is empty. It's a bug.");
if (response->xid == close_xid)
{
LOG_DEBUG(log, "Session #{} successfully closed", session_id);
return;
}
response->write(*out);
if (response->error == Coordination::Error::ZSESSIONEXPIRED)
{
@ -383,7 +375,8 @@ void NuKeeperTCPHandler::runImpl()
nu_keeper_storage_dispatcher->finishSession(session_id);
return;
}
result.ready_responses_count--;
result.responses_count--;
}
if (result.error)

View File

@ -2,12 +2,15 @@
<test_keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<coordination_settings>
<operation_timeout_ms>10000</operation_timeout_ms>
<session_timeout_ms>30000</session_timeout_ms>
<snapshot_distance>0</snapshot_distance>
<reserved_log_items>0</reserved_log_items>
<force_sync>false</force_sync>
<startup_timeout>60000</startup_timeout>
</coordination_settings>
<raft_configuration>

View File

@ -2,11 +2,13 @@
<test_keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
<force_sync>false</force_sync>
</coordination_settings>
<raft_configuration>

View File

@ -0,0 +1,8 @@
<yandex>
<zookeeper>
<node index="1">
<host>node1</host>
<port>9181</port>
</node>
</zookeeper>
</yandex>

View File

@ -8,27 +8,18 @@ from multiprocessing.dummy import Pool
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance('node', main_configs=['configs/enable_test_keeper.xml', 'configs/logs_conf.xml'], with_zookeeper=True)
from kazoo.client import KazooClient, KazooState
_genuine_zk_instance = None
_fake_zk_instance = None
from kazoo.client import KazooClient, KazooState, KeeperState
def get_genuine_zk():
global _genuine_zk_instance
if not _genuine_zk_instance:
print("Zoo1", cluster.get_instance_ip("zoo1"))
_genuine_zk_instance = cluster.get_kazoo_client('zoo1')
return _genuine_zk_instance
return cluster.get_kazoo_client('zoo1')
def get_fake_zk():
global _fake_zk_instance
if not _fake_zk_instance:
print("node", cluster.get_instance_ip("node"))
_fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip("node") + ":9181", timeout=30.0)
def reset_last_zxid_listener(state):
print("Fake zk callback called for state", state)
global _fake_zk_instance
nonlocal _fake_zk_instance
if state != KazooState.CONNECTED:
_fake_zk_instance._reset()
@ -44,6 +35,15 @@ def create_random_path(prefix="", depth=1):
return prefix
return create_random_path(os.path.join(prefix, random_string(3)), depth - 1)
def stop_zk(zk):
try:
if zk:
zk.stop()
zk.close()
except:
pass
@pytest.fixture(scope="module")
def started_cluster():
try:
@ -53,15 +53,10 @@ def started_cluster():
finally:
cluster.shutdown()
if _genuine_zk_instance:
_genuine_zk_instance.stop()
_genuine_zk_instance.close()
if _fake_zk_instance:
_fake_zk_instance.stop()
_fake_zk_instance.close()
def test_simple_commands(started_cluster):
try:
genuine_zk = get_genuine_zk()
fake_zk = get_fake_zk()
@ -75,9 +70,13 @@ def test_simple_commands(started_cluster):
assert zk.exists("/test_simple_commands/somenode1")
print(zk.get("/test_simple_commands/somenode1"))
assert zk.get("/test_simple_commands/somenode1")[0] == b"world"
finally:
for zk in [genuine_zk, fake_zk]:
stop_zk(zk)
def test_sequential_nodes(started_cluster):
try:
genuine_zk = get_genuine_zk()
fake_zk = get_fake_zk()
genuine_zk.create("/test_sequential_nodes")
@ -91,6 +90,9 @@ def test_sequential_nodes(started_cluster):
genuine_childs = list(sorted(genuine_zk.get_children("/test_sequential_nodes")))
fake_childs = list(sorted(fake_zk.get_children("/test_sequential_nodes")))
assert genuine_childs == fake_childs
finally:
for zk in [genuine_zk, fake_zk]:
stop_zk(zk)
def assert_eq_stats(stat1, stat2):
@ -102,6 +104,7 @@ def assert_eq_stats(stat1, stat2):
assert stat1.numChildren == stat2.numChildren
def test_stats(started_cluster):
try:
genuine_zk = get_genuine_zk()
fake_zk = get_fake_zk()
genuine_zk.create("/test_stats_nodes")
@ -139,8 +142,12 @@ def test_stats(started_cluster):
print(genuine_stats)
print(fake_stats)
assert_eq_stats(genuine_stats, fake_stats)
finally:
for zk in [genuine_zk, fake_zk]:
stop_zk(zk)
def test_watchers(started_cluster):
try:
genuine_zk = get_genuine_zk()
fake_zk = get_fake_zk()
genuine_zk.create("/test_data_watches")
@ -196,8 +203,12 @@ def test_watchers(started_cluster):
print("Genuine children", genuine_children)
print("Fake children", fake_children)
assert genuine_children == fake_children
finally:
for zk in [genuine_zk, fake_zk]:
stop_zk(zk)
def test_multitransactions(started_cluster):
try:
genuine_zk = get_genuine_zk()
fake_zk = get_fake_zk()
for zk in [genuine_zk, fake_zk]:
@ -225,7 +236,9 @@ def test_multitransactions(started_cluster):
assert zk.exists('/test_multitransactions/q') is None
assert zk.exists('/test_multitransactions/a') is None
assert zk.exists('/test_multitransactions/x') is None
finally:
for zk in [genuine_zk, fake_zk]:
stop_zk(zk)
def exists(zk, path):
result = zk.exists(path)
@ -278,13 +291,13 @@ class Request(object):
arg_str = ', '.join([str(k) + "=" + str(v) for k, v in self.arguments.items()])
return "ZKRequest name {} with arguments {}".format(self.name, arg_str)
def generate_requests(iters=1):
def generate_requests(prefix="/", iters=1):
requests = []
existing_paths = []
for i in range(iters):
for _ in range(100):
rand_length = random.randint(0, 10)
path = "/"
path = prefix
for j in range(1, rand_length):
path = create_random_path(path, 1)
existing_paths.append(path)
@ -322,9 +335,13 @@ def generate_requests(iters=1):
def test_random_requests(started_cluster):
requests = generate_requests(10)
try:
requests = generate_requests("/test_random_requests", 10)
print("Generated", len(requests), "requests")
genuine_zk = get_genuine_zk()
fake_zk = get_fake_zk()
genuine_zk.create("/test_random_requests")
fake_zk.create("/test_random_requests")
for i, request in enumerate(requests):
genuine_throw = False
fake_throw = False
@ -333,20 +350,28 @@ def test_random_requests(started_cluster):
try:
genuine_result = request.callback(genuine_zk)
except Exception as ex:
print("i", i, "request", request)
print("Genuine exception", str(ex))
genuine_throw = True
try:
fake_result = request.callback(fake_zk)
except Exception as ex:
print("i", i, "request", request)
print("Fake exception", str(ex))
fake_throw = True
assert fake_throw == genuine_throw, "Fake throw genuine not or vise versa"
assert fake_throw == genuine_throw, "Fake throw genuine not or vise versa request {}"
assert fake_result == genuine_result, "Zookeeper results differ"
root_children_genuine = [elem for elem in list(sorted(genuine_zk.get_children("/"))) if elem not in ('clickhouse', 'zookeeper')]
root_children_fake = [elem for elem in list(sorted(fake_zk.get_children("/"))) if elem not in ('clickhouse', 'zookeeper')]
root_children_genuine = [elem for elem in list(sorted(genuine_zk.get_children("/test_random_requests"))) if elem not in ('clickhouse', 'zookeeper')]
root_children_fake = [elem for elem in list(sorted(fake_zk.get_children("/test_random_requests"))) if elem not in ('clickhouse', 'zookeeper')]
assert root_children_fake == root_children_genuine
finally:
for zk in [genuine_zk, fake_zk]:
stop_zk(zk)
def test_end_of_session(started_cluster):
fake_zk1 = None
fake_zk2 = None
genuine_zk1 = None
@ -401,13 +426,8 @@ def test_end_of_session(started_cluster):
assert fake_ephemeral_event == genuine_ephemeral_event
finally:
try:
for zk in [fake_zk1, fake_zk2, genuine_zk1, genuine_zk2]:
if zk:
zk.stop()
zk.close()
except:
pass
stop_zk(zk)
def test_end_of_watches_session(started_cluster):
fake_zk1 = None
@ -442,15 +462,11 @@ def test_end_of_watches_session(started_cluster):
assert dummy_set == 2
finally:
try:
for zk in [fake_zk1, fake_zk2]:
if zk:
zk.stop()
zk.close()
except:
pass
stop_zk(zk)
def test_concurrent_watches(started_cluster):
try:
fake_zk = get_fake_zk()
fake_zk.restart()
global_path = "/test_concurrent_watches_0"
@ -530,3 +546,5 @@ def test_concurrent_watches(started_cluster):
print("Diff", list(set(all_paths_created) - set(all_paths_triggered)))
assert dumb_watch_triggered_counter == watches_must_be_triggered
finally:
stop_zk(fake_zk)

View File

@ -2,6 +2,7 @@
<test_keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>

View File

@ -2,6 +2,7 @@
<test_keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>

View File

@ -2,6 +2,7 @@
<test_keeper_server>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>

View File

@ -6,6 +6,7 @@ import os
import time
from multiprocessing.dummy import Pool
from helpers.network import PartitionManager
from helpers.test_tools import assert_eq_with_retry
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/enable_test_keeper1.xml', 'configs/log_conf.xml', 'configs/use_test_keeper.xml'], stay_alive=True)
@ -14,6 +15,18 @@ node3 = cluster.add_instance('node3', main_configs=['configs/enable_test_keeper3
from kazoo.client import KazooClient, KazooState
"""
In this test, we blockade RAFT leader and check that the whole system is
able to recover. It's not a good test because we use ClickHouse's replicated
tables to check connectivity, but they may require special operations (or a long
wait) after session expiration. We don't use kazoo, because this client pretends
to be very smart: SUSPEND sessions, try to recover them, and so on. The test
will be even less predictable than with ClickHouse tables.
TODO find (or write) not so smart python client.
TODO remove this when jepsen tests will be written.
"""
@pytest.fixture(scope="module")
def started_cluster():
try:
@ -55,7 +68,6 @@ def get_fake_zk(nodename, timeout=30.0):
_fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout)
def reset_listener(state):
nonlocal _fake_zk_instance
print("Fake zk callback called for state", state)
if state != KazooState.CONNECTED:
_fake_zk_instance._reset()
@ -67,19 +79,25 @@ def get_fake_zk(nodename, timeout=30.0):
# in extremely rare case it can take more than 5 minutes in debug build with sanitizer
@pytest.mark.timeout(600)
def test_blocade_leader(started_cluster):
for i in range(100):
wait_nodes()
try:
for i, node in enumerate([node1, node2, node3]):
node.query("CREATE DATABASE IF NOT EXISTS ordinary ENGINE=Ordinary")
node.query("CREATE TABLE ordinary.t1 (value UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/t1', '{}') ORDER BY tuple()".format(i + 1))
node.query("CREATE TABLE IF NOT EXISTS ordinary.t1 (value UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/t1', '{}') ORDER BY tuple()".format(i + 1))
break
except Exception as ex:
print("Got exception from node", smaller_exception(ex))
time.sleep(0.1)
node2.query("INSERT INTO ordinary.t1 SELECT number FROM numbers(10)")
node1.query("SYSTEM SYNC REPLICA ordinary.t1", timeout=10)
node3.query("SYSTEM SYNC REPLICA ordinary.t1", timeout=10)
assert node1.query("SELECT COUNT() FROM ordinary.t1") == "10\n"
assert node2.query("SELECT COUNT() FROM ordinary.t1") == "10\n"
assert node3.query("SELECT COUNT() FROM ordinary.t1") == "10\n"
assert_eq_with_retry(node1, "SELECT COUNT() FROM ordinary.t1", "10")
assert_eq_with_retry(node2, "SELECT COUNT() FROM ordinary.t1", "10")
assert_eq_with_retry(node3, "SELECT COUNT() FROM ordinary.t1", "10")
with PartitionManager() as pm:
pm.partition_instances(node2, node1)
@ -87,7 +105,7 @@ def test_blocade_leader(started_cluster):
for i in range(100):
try:
node2.query("SYSTEM RESTART REPLICA ordinary.t1")
restart_replica_for_sure(node2, "ordinary.t1", "/clickhouse/t1/replicas/2")
node2.query("INSERT INTO ordinary.t1 SELECT rand() FROM numbers(100)")
break
except Exception as ex:
@ -104,7 +122,7 @@ def test_blocade_leader(started_cluster):
for i in range(100):
try:
node3.query("SYSTEM RESTART REPLICA ordinary.t1")
restart_replica_for_sure(node3, "ordinary.t1", "/clickhouse/t1/replicas/3")
node3.query("INSERT INTO ordinary.t1 SELECT rand() FROM numbers(100)")
break
except Exception as ex:
@ -122,7 +140,7 @@ def test_blocade_leader(started_cluster):
for n, node in enumerate([node1, node2, node3]):
for i in range(100):
try:
node.query("SYSTEM RESTART REPLICA ordinary.t1")
restart_replica_for_sure(node, "ordinary.t1", "/clickhouse/t1/replicas/{}".format(n + 1))
break
except Exception as ex:
try:
@ -150,7 +168,7 @@ def test_blocade_leader(started_cluster):
for n, node in enumerate([node1, node2, node3]):
for i in range(100):
try:
node.query("SYSTEM RESTART REPLICA ordinary.t1")
restart_replica_for_sure(node, "ordinary.t1", "/clickhouse/t1/replicas/{}".format(n + 1))
node.query("SYSTEM SYNC REPLICA ordinary.t1", timeout=10)
break
except Exception as ex:
@ -170,9 +188,9 @@ def test_blocade_leader(started_cluster):
for num, node in enumerate([node1, node2, node3]):
dump_zk(node, '/clickhouse/t1', '/clickhouse/t1/replicas/{}'.format(num + 1))
assert node1.query("SELECT COUNT() FROM ordinary.t1") == "310\n"
assert node2.query("SELECT COUNT() FROM ordinary.t1") == "310\n"
assert node3.query("SELECT COUNT() FROM ordinary.t1") == "310\n"
assert_eq_with_retry(node1, "SELECT COUNT() FROM ordinary.t1", "310")
assert_eq_with_retry(node2, "SELECT COUNT() FROM ordinary.t1", "310")
assert_eq_with_retry(node3, "SELECT COUNT() FROM ordinary.t1", "310")
def dump_zk(node, zk_path, replica_path):
@ -188,22 +206,47 @@ def dump_zk(node, zk_path, replica_path):
print("Parts")
print(node.query("SELECT name FROM system.zookeeper WHERE path = '{}/parts' FORMAT Vertical".format(replica_path)))
def restart_replica_for_sure(node, table_name, zk_replica_path):
fake_zk = None
try:
node.query("DETACH TABLE {}".format(table_name))
fake_zk = get_fake_zk(node.name)
if fake_zk.exists(zk_replica_path + "/is_active") is not None:
fake_zk.delete(zk_replica_path + "/is_active")
node.query("ATTACH TABLE {}".format(table_name))
except Exception as ex:
print("Exception", ex)
raise ex
finally:
if fake_zk:
fake_zk.stop()
fake_zk.close()
# in extremely rare case it can take more than 5 minutes in debug build with sanitizer
@pytest.mark.timeout(600)
def test_blocade_leader_twice(started_cluster):
for i in range(100):
wait_nodes()
try:
for i, node in enumerate([node1, node2, node3]):
node.query("CREATE DATABASE IF NOT EXISTS ordinary ENGINE=Ordinary")
node.query("CREATE TABLE ordinary.t2 (value UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/t2', '{}') ORDER BY tuple()".format(i + 1))
node.query("CREATE TABLE IF NOT EXISTS ordinary.t2 (value UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/t2', '{}') ORDER BY tuple()".format(i + 1))
break
except Exception as ex:
print("Got exception from node", smaller_exception(ex))
time.sleep(0.1)
node2.query("INSERT INTO ordinary.t2 SELECT number FROM numbers(10)")
node1.query("SYSTEM SYNC REPLICA ordinary.t2", timeout=10)
node3.query("SYSTEM SYNC REPLICA ordinary.t2", timeout=10)
assert node1.query("SELECT COUNT() FROM ordinary.t2") == "10\n"
assert node2.query("SELECT COUNT() FROM ordinary.t2") == "10\n"
assert node3.query("SELECT COUNT() FROM ordinary.t2") == "10\n"
assert_eq_with_retry(node1, "SELECT COUNT() FROM ordinary.t2", "10")
assert_eq_with_retry(node2, "SELECT COUNT() FROM ordinary.t2", "10")
assert_eq_with_retry(node3, "SELECT COUNT() FROM ordinary.t2", "10")
with PartitionManager() as pm:
pm.partition_instances(node2, node1)
@ -211,7 +254,7 @@ def test_blocade_leader_twice(started_cluster):
for i in range(100):
try:
node2.query("SYSTEM RESTART REPLICA ordinary.t2")
restart_replica_for_sure(node2, "ordinary.t2", "/clickhouse/t2/replicas/2")
node2.query("INSERT INTO ordinary.t2 SELECT rand() FROM numbers(100)")
break
except Exception as ex:
@ -228,7 +271,8 @@ def test_blocade_leader_twice(started_cluster):
for i in range(100):
try:
node3.query("SYSTEM RESTART REPLICA ordinary.t2")
restart_replica_for_sure(node3, "ordinary.t2", "/clickhouse/t2/replicas/3")
node3.query("SYSTEM SYNC REPLICA ordinary.t2", timeout=10)
node3.query("INSERT INTO ordinary.t2 SELECT rand() FROM numbers(100)")
break
except Exception as ex:
@ -243,6 +287,10 @@ def test_blocade_leader_twice(started_cluster):
dump_zk(node, '/clickhouse/t2', '/clickhouse/t2/replicas/{}'.format(num + 1))
assert False, "Cannot reconnect for node3"
node2.query("SYSTEM SYNC REPLICA ordinary.t2", timeout=10)
assert_eq_with_retry(node2, "SELECT COUNT() FROM ordinary.t2", "210")
assert_eq_with_retry(node3, "SELECT COUNT() FROM ordinary.t2", "210")
# Total network partition
pm.partition_instances(node3, node2)
@ -261,11 +309,10 @@ def test_blocade_leader_twice(started_cluster):
except Exception as ex:
time.sleep(0.5)
for n, node in enumerate([node1, node2, node3]):
for i in range(100):
try:
node.query("SYSTEM RESTART REPLICA ordinary.t2")
restart_replica_for_sure(node, "ordinary.t2", "/clickhouse/t2/replicas/{}".format(n + 1))
break
except Exception as ex:
try:
@ -293,13 +340,15 @@ def test_blocade_leader_twice(started_cluster):
dump_zk(node, '/clickhouse/t2', '/clickhouse/t2/replicas/{}'.format(num + 1))
assert False, "Cannot reconnect for node{}".format(n + 1)
for n, node in enumerate([node1, node2, node3]):
for i in range(100):
all_done = True
for n, node in enumerate([node1, node2, node3]):
try:
node.query("SYSTEM RESTART REPLICA ordinary.t2")
restart_replica_for_sure(node, "ordinary.t2", "/clickhouse/t2/replicas/{}".format(n + 1))
node.query("SYSTEM SYNC REPLICA ordinary.t2", timeout=10)
break
except Exception as ex:
all_done = False
try:
node.query("ATTACH TABLE ordinary.t2")
except Exception as attach_ex:
@ -307,15 +356,18 @@ def test_blocade_leader_twice(started_cluster):
print("Got exception node{}".format(n + 1), smaller_exception(ex))
time.sleep(0.5)
if all_done:
break
else:
for num, node in enumerate([node1, node2, node3]):
dump_zk(node, '/clickhouse/t2', '/clickhouse/t2/replicas/{}'.format(num + 1))
assert False, "Cannot reconnect for node{}".format(n + 1)
assert False, "Cannot reconnect in i {} retries".format(i)
assert node1.query("SELECT COUNT() FROM ordinary.t2") == "510\n"
assert_eq_with_retry(node1, "SELECT COUNT() FROM ordinary.t2", "510")
if node2.query("SELECT COUNT() FROM ordinary.t2") != "510\n":
for num, node in enumerate([node1, node2, node3]):
dump_zk(node, '/clickhouse/t2', '/clickhouse/t2/replicas/{}'.format(num + 1))
assert node2.query("SELECT COUNT() FROM ordinary.t2") == "510\n"
assert node3.query("SELECT COUNT() FROM ordinary.t2") == "510\n"
assert_eq_with_retry(node2, "SELECT COUNT() FROM ordinary.t2", "510")
assert_eq_with_retry(node3, "SELECT COUNT() FROM ordinary.t2", "510")

View File

@ -2,6 +2,7 @@
<test_keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>

View File

@ -2,6 +2,7 @@
<test_keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>

View File

@ -2,6 +2,7 @@
<test_keeper_server>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>

View File

@ -6,6 +6,7 @@ import os
import time
from multiprocessing.dummy import Pool
from helpers.network import PartitionManager
from helpers.test_tools import assert_eq_with_retry
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/enable_test_keeper1.xml', 'configs/log_conf.xml', 'configs/use_test_keeper.xml'], stay_alive=True)
@ -234,6 +235,6 @@ def test_simple_replicated_table(started_cluster):
node1.query("SYSTEM SYNC REPLICA t", timeout=10)
node3.query("SYSTEM SYNC REPLICA t", timeout=10)
assert node1.query("SELECT COUNT() FROM t") == "10\n"
assert node2.query("SELECT COUNT() FROM t") == "10\n"
assert node3.query("SELECT COUNT() FROM t") == "10\n"
assert_eq_with_retry(node1, "SELECT COUNT() FROM t", "10")
assert_eq_with_retry(node2, "SELECT COUNT() FROM t", "10")
assert_eq_with_retry(node3, "SELECT COUNT() FROM t", "10")

View File

@ -0,0 +1 @@
#!/usr/bin/env python3

View File

@ -0,0 +1,21 @@
<yandex>
<test_keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>localhost</hostname>
<port>44444</port>
</server>
</raft_configuration>
</test_keeper_server>
</yandex>

View File

@ -0,0 +1,12 @@
<yandex>
<shutdown_wait_unfinished>3</shutdown_wait_unfinished>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/log.log</log>
<errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
</logger>
</yandex>

View File

@ -0,0 +1,8 @@
<yandex>
<zookeeper>
<node index="1">
<host>node</host>
<port>9181</port>
</node>
</zookeeper>
</yandex>

View File

@ -0,0 +1,124 @@
#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
import random
import string
import os
import time
from kazoo.client import KazooClient, KazooState
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance('node', main_configs=['configs/enable_test_keeper.xml', 'configs/logs_conf.xml', 'configs/use_test_keeper.xml'], stay_alive=True)
def random_string(length):
return ''.join(random.choices(string.ascii_lowercase + string.digits, k=length))
def create_random_path(prefix="", depth=1):
if depth == 0:
return prefix
return create_random_path(os.path.join(prefix, random_string(3)), depth - 1)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def get_connection_zk(nodename, timeout=30.0):
_fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout)
def reset_listener(state):
nonlocal _fake_zk_instance
print("Fake zk callback called for state", state)
if state != KazooState.CONNECTED:
_fake_zk_instance._reset()
_fake_zk_instance.add_listener(reset_listener)
_fake_zk_instance.start()
return _fake_zk_instance
def test_state_after_restart(started_cluster):
try:
node_zk = None
node_zk2 = None
node_zk = get_connection_zk("node")
node_zk.create("/test_state_after_restart", b"somevalue")
strs = []
for i in range(100):
strs.append(random_string(123).encode())
node_zk.create("/test_state_after_restart/node" + str(i), strs[i])
for i in range(100):
if i % 7 == 0:
node_zk.delete("/test_state_after_restart/node" + str(i))
node.restart_clickhouse(kill=True)
node_zk2 = get_connection_zk("node")
assert node_zk2.get("/test_state_after_restart")[0] == b"somevalue"
for i in range(100):
if i % 7 == 0:
assert node_zk2.exists("/test_state_after_restart/node" + str(i)) is None
else:
assert len(node_zk2.get("/test_state_after_restart/node" + str(i))[0]) == 123
assert node_zk2.get("/test_state_after_restart/node" + str(i))[0] == strs[i]
finally:
try:
if node_zk is not None:
node_zk.stop()
node_zk.close()
if node_zk2 is not None:
node_zk2.stop()
node_zk2.close()
except:
pass
# http://zookeeper-user.578899.n2.nabble.com/Why-are-ephemeral-nodes-written-to-disk-tp7583403p7583418.html
def test_ephemeral_after_restart(started_cluster):
try:
node_zk = None
node_zk2 = None
node_zk = get_connection_zk("node")
node_zk.create("/test_ephemeral_after_restart", b"somevalue")
strs = []
for i in range(100):
strs.append(random_string(123).encode())
node_zk.create("/test_ephemeral_after_restart/node" + str(i), strs[i], ephemeral=True)
for i in range(100):
if i % 7 == 0:
node_zk.delete("/test_ephemeral_after_restart/node" + str(i))
node.restart_clickhouse(kill=True)
node_zk2 = get_connection_zk("node")
assert node_zk2.get("/test_ephemeral_after_restart")[0] == b"somevalue"
for i in range(100):
if i % 7 == 0:
assert node_zk2.exists("/test_ephemeral_after_restart/node" + str(i)) is None
else:
assert len(node_zk2.get("/test_ephemeral_after_restart/node" + str(i))[0]) == 123
assert node_zk2.get("/test_ephemeral_after_restart/node" + str(i))[0] == strs[i]
finally:
try:
if node_zk is not None:
node_zk.stop()
node_zk.close()
if node_zk2 is not None:
node_zk2.stop()
node_zk2.close()
except:
pass

View File

@ -0,0 +1 @@
#!/usr/bin/env python3

View File

@ -0,0 +1,39 @@
<yandex>
<test_keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
<can_become_leader>true</can_become_leader>
<priority>3</priority>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
<can_become_leader>true</can_become_leader>
<start_as_follower>true</start_as_follower>
<priority>2</priority>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>44444</port>
<can_become_leader>true</can_become_leader>
<start_as_follower>true</start_as_follower>
<priority>1</priority>
</server>
</raft_configuration>
</test_keeper_server>
</yandex>

View File

@ -0,0 +1,39 @@
<yandex>
<test_keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
<can_become_leader>true</can_become_leader>
<priority>3</priority>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
<can_become_leader>true</can_become_leader>
<start_as_follower>true</start_as_follower>
<priority>2</priority>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>44444</port>
<can_become_leader>true</can_become_leader>
<start_as_follower>true</start_as_follower>
<priority>1</priority>
</server>
</raft_configuration>
</test_keeper_server>
</yandex>

View File

@ -0,0 +1,39 @@
<yandex>
<test_keeper_server>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
<can_become_leader>true</can_become_leader>
<priority>3</priority>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
<can_become_leader>true</can_become_leader>
<start_as_follower>true</start_as_follower>
<priority>2</priority>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>44444</port>
<can_become_leader>true</can_become_leader>
<start_as_follower>true</start_as_follower>
<priority>1</priority>
</server>
</raft_configuration>
</test_keeper_server>
</yandex>

View File

@ -0,0 +1,12 @@
<yandex>
<shutdown_wait_unfinished>3</shutdown_wait_unfinished>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/log.log</log>
<errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
</logger>
</yandex>

View File

@ -0,0 +1,16 @@
<yandex>
<zookeeper>
<node index="1">
<host>node1</host>
<port>9181</port>
</node>
<node index="2">
<host>node2</host>
<port>9181</port>
</node>
<node index="3">
<host>node3</host>
<port>9181</port>
</node>
</zookeeper>
</yandex>

View File

@ -0,0 +1,98 @@
#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
import random
import string
import os
import time
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/enable_test_keeper1.xml', 'configs/log_conf.xml', 'configs/use_test_keeper.xml'], stay_alive=True)
node2 = cluster.add_instance('node2', main_configs=['configs/enable_test_keeper2.xml', 'configs/log_conf.xml', 'configs/use_test_keeper.xml'], stay_alive=True)
node3 = cluster.add_instance('node3', main_configs=['configs/enable_test_keeper3.xml', 'configs/log_conf.xml', 'configs/use_test_keeper.xml'], stay_alive=True)
from kazoo.client import KazooClient, KazooState
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def get_fake_zk(nodename, timeout=30.0):
_fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout)
def reset_listener(state):
nonlocal _fake_zk_instance
print("Fake zk callback called for state", state)
if state != KazooState.CONNECTED:
_fake_zk_instance._reset()
_fake_zk_instance.add_listener(reset_listener)
_fake_zk_instance.start()
return _fake_zk_instance
def stop_zk(zk):
try:
if zk:
zk.stop()
zk.close()
except:
pass
def test_restart_multinode(started_cluster):
try:
node1_zk = node2_zk = node3_zk = None
node1_zk = get_fake_zk("node1")
node2_zk = get_fake_zk("node2")
node3_zk = get_fake_zk("node3")
for i in range(100):
node1_zk.create("/test_read_write_multinode_node" + str(i), ("somedata" + str(i)).encode())
for i in range(100):
if i % 10 == 0:
node1_zk.delete("/test_read_write_multinode_node" + str(i))
node2_zk.sync("/test_read_write_multinode_node0")
node3_zk.sync("/test_read_write_multinode_node0")
for i in range(100):
if i % 10 != 0:
assert node2_zk.get("/test_read_write_multinode_node" + str(i))[0] == ("somedata" + str(i)).encode()
assert node3_zk.get("/test_read_write_multinode_node" + str(i))[0] == ("somedata" + str(i)).encode()
else:
assert node2_zk.exists("/test_read_write_multinode_node" + str(i)) is None
assert node3_zk.exists("/test_read_write_multinode_node" + str(i)) is None
finally:
for zk in [node1_zk, node2_zk, node3_zk]:
stop_zk(zk)
node1.restart_clickhouse(kill=True)
node2.restart_clickhouse(kill=True)
node3.restart_clickhouse(kill=True)
for i in range(100):
try:
node1_zk = get_fake_zk("node1")
node2_zk = get_fake_zk("node2")
node3_zk = get_fake_zk("node3")
for i in range(100):
if i % 10 != 0:
assert node1_zk.get("/test_read_write_multinode_node" + str(i))[0] == ("somedata" + str(i)).encode()
assert node2_zk.get("/test_read_write_multinode_node" + str(i))[0] == ("somedata" + str(i)).encode()
assert node3_zk.get("/test_read_write_multinode_node" + str(i))[0] == ("somedata" + str(i)).encode()
else:
assert node1_zk.exists("/test_read_write_multinode_node" + str(i)) is None
assert node2_zk.exists("/test_read_write_multinode_node" + str(i)) is None
assert node3_zk.exists("/test_read_write_multinode_node" + str(i)) is None
break
except Exception as ex:
print("Got exception as ex", ex)
finally:
for zk in [node1_zk, node2_zk, node3_zk]:
stop_zk(zk)