Merge pull request #22860 from ClickHouse/fix_coordination_build_macos

Fix Coordination MacOS build
This commit is contained in:
alexey-milovidov 2021-04-09 09:48:44 +03:00 committed by GitHub
commit bcb8da031d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 161 additions and 161 deletions

View File

@ -11,7 +11,7 @@ if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/NuRaft/CMakeLists.txt")
return()
endif ()
if (NOT OS_FREEBSD AND NOT OS_DARWIN)
if (NOT OS_FREEBSD)
set (USE_NURAFT 1)
set (NURAFT_LIBRARY nuraft)

View File

@ -44,8 +44,8 @@ ChangelogFileDescription getChangelogFileDescription(const std::string & path_st
ChangelogFileDescription result;
result.prefix = filename_parts[0];
result.from_log_index = parse<size_t>(filename_parts[1]);
result.to_log_index = parse<size_t>(filename_parts[2]);
result.from_log_index = parse<uint64_t>(filename_parts[1]);
result.to_log_index = parse<uint64_t>(filename_parts[2]);
result.path = path_str;
return result;
}
@ -73,7 +73,7 @@ Checksum computeRecordChecksum(const ChangelogRecord & record)
class ChangelogWriter
{
public:
ChangelogWriter(const std::string & filepath_, WriteMode mode, size_t start_index_)
ChangelogWriter(const std::string & filepath_, WriteMode mode, uint64_t start_index_)
: filepath(filepath_)
, plain_buf(filepath, DBMS_DEFAULT_BUFFER_SIZE, mode == WriteMode::Rewrite ? -1 : (O_APPEND | O_CREAT | O_WRONLY))
, start_index(start_index_)
@ -115,22 +115,22 @@ public:
plain_buf.sync();
}
size_t getEntriesWritten() const
uint64_t getEntriesWritten() const
{
return entries_written;
}
void setEntriesWritten(size_t entries_written_)
void setEntriesWritten(uint64_t entries_written_)
{
entries_written = entries_written_;
}
size_t getStartIndex() const
uint64_t getStartIndex() const
{
return start_index;
}
void setStartIndex(size_t start_index_)
void setStartIndex(uint64_t start_index_)
{
start_index = start_index_;
}
@ -138,14 +138,14 @@ public:
private:
std::string filepath;
WriteBufferFromFile plain_buf;
size_t entries_written = 0;
size_t start_index;
uint64_t entries_written = 0;
uint64_t start_index;
};
struct ChangelogReadResult
{
size_t entries_read;
size_t first_read_index;
uint64_t entries_read;
uint64_t first_read_index;
off_t last_position;
bool error;
};
@ -158,9 +158,9 @@ public:
, read_buf(filepath)
{}
ChangelogReadResult readChangelog(IndexToLogEntry & logs, size_t start_log_index, IndexToOffset & index_to_offset, Poco::Logger * log)
ChangelogReadResult readChangelog(IndexToLogEntry & logs, uint64_t start_log_index, IndexToOffset & index_to_offset, Poco::Logger * log)
{
size_t previous_index = 0;
uint64_t previous_index = 0;
ChangelogReadResult result{};
try
{
@ -247,7 +247,7 @@ private:
ReadBufferFromFile read_buf;
};
Changelog::Changelog(const std::string & changelogs_dir_, size_t rotate_interval_, Poco::Logger * log_)
Changelog::Changelog(const std::string & changelogs_dir_, uint64_t rotate_interval_, Poco::Logger * log_)
: changelogs_dir(changelogs_dir_)
, rotate_interval(rotate_interval_)
, log(log_)
@ -263,15 +263,15 @@ Changelog::Changelog(const std::string & changelogs_dir_, size_t rotate_interval
}
}
void Changelog::readChangelogAndInitWriter(size_t last_commited_log_index, size_t logs_to_keep)
void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uint64_t logs_to_keep)
{
size_t total_read = 0;
size_t entries_in_last = 0;
size_t incomplete_log_index = 0;
uint64_t total_read = 0;
uint64_t entries_in_last = 0;
uint64_t incomplete_log_index = 0;
ChangelogReadResult result{};
size_t first_read_index = 0;
uint64_t first_read_index = 0;
size_t start_to_read_from = last_commited_log_index;
uint64_t start_to_read_from = last_commited_log_index;
if (start_to_read_from > logs_to_keep)
start_to_read_from -= logs_to_keep;
else
@ -355,7 +355,7 @@ void Changelog::readChangelogAndInitWriter(size_t last_commited_log_index, size_
rotate(start_index + total_read);
}
void Changelog::rotate(size_t new_start_log_index)
void Changelog::rotate(uint64_t new_start_log_index)
{
ChangelogFileDescription new_description;
new_description.prefix = DEFAULT_PREFIX;
@ -369,7 +369,7 @@ void Changelog::rotate(size_t new_start_log_index)
current_writer = std::make_unique<ChangelogWriter>(new_description.path, WriteMode::Rewrite, new_start_log_index);
}
ChangelogRecord Changelog::buildRecord(size_t index, const LogEntryPtr & log_entry)
ChangelogRecord Changelog::buildRecord(uint64_t index, const LogEntryPtr & log_entry)
{
ChangelogRecord record;
record.header.version = ChangelogVersion::V0;
@ -387,7 +387,7 @@ ChangelogRecord Changelog::buildRecord(size_t index, const LogEntryPtr & log_ent
return record;
}
void Changelog::appendEntry(size_t index, const LogEntryPtr & log_entry, bool force_sync)
void Changelog::appendEntry(uint64_t index, const LogEntryPtr & log_entry, bool force_sync)
{
if (!current_writer)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Changelog must be initialized before appending records");
@ -405,7 +405,7 @@ void Changelog::appendEntry(size_t index, const LogEntryPtr & log_entry, bool fo
logs[index] = makeClone(log_entry);
}
void Changelog::writeAt(size_t index, const LogEntryPtr & log_entry, bool force_sync)
void Changelog::writeAt(uint64_t index, const LogEntryPtr & log_entry, bool force_sync)
{
if (index_to_start_pos.count(index) == 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot write at index {} because changelog doesn't contain it", index);
@ -439,7 +439,7 @@ void Changelog::writeAt(size_t index, const LogEntryPtr & log_entry, bool force_
}
/// Remove redundant logs from memory
for (size_t i = index; ; ++i)
for (uint64_t i = index; ; ++i)
{
auto log_itr = logs.find(i);
if (log_itr == logs.end())
@ -454,7 +454,7 @@ void Changelog::writeAt(size_t index, const LogEntryPtr & log_entry, bool force_
appendEntry(index, log_entry, force_sync);
}
void Changelog::compact(size_t up_to_log_index)
void Changelog::compact(uint64_t up_to_log_index)
{
for (auto itr = existing_changelogs.begin(); itr != existing_changelogs.end();)
{
@ -476,9 +476,9 @@ void Changelog::compact(size_t up_to_log_index)
LogEntryPtr Changelog::getLastEntry() const
{
static LogEntryPtr fake_entry = nuraft::cs_new<nuraft::log_entry>(0, nuraft::buffer::alloc(sizeof(size_t)));
static LogEntryPtr fake_entry = nuraft::cs_new<nuraft::log_entry>(0, nuraft::buffer::alloc(sizeof(uint64_t)));
size_t next_index = getNextEntryIndex() - 1;
uint64_t next_index = getNextEntryIndex() - 1;
auto entry = logs.find(next_index);
if (entry == logs.end())
return fake_entry;
@ -486,13 +486,13 @@ LogEntryPtr Changelog::getLastEntry() const
return entry->second;
}
LogEntriesPtr Changelog::getLogEntriesBetween(size_t start, size_t end)
LogEntriesPtr Changelog::getLogEntriesBetween(uint64_t start, uint64_t end)
{
LogEntriesPtr ret = nuraft::cs_new<std::vector<nuraft::ptr<nuraft::log_entry>>>();
ret->resize(end - start);
size_t result_pos = 0;
for (size_t i = start; i < end; ++i)
uint64_t result_pos = 0;
for (uint64_t i = start; i < end; ++i)
{
(*ret)[result_pos] = entryAt(i);
result_pos++;
@ -500,7 +500,7 @@ LogEntriesPtr Changelog::getLogEntriesBetween(size_t start, size_t end)
return ret;
}
LogEntryPtr Changelog::entryAt(size_t index)
LogEntryPtr Changelog::entryAt(uint64_t index)
{
nuraft::ptr<nuraft::log_entry> src = nullptr;
auto entry = logs.find(index);
@ -511,12 +511,12 @@ LogEntryPtr Changelog::entryAt(size_t index)
return src;
}
nuraft::ptr<nuraft::buffer> Changelog::serializeEntriesToBuffer(size_t index, int32_t count)
nuraft::ptr<nuraft::buffer> Changelog::serializeEntriesToBuffer(uint64_t index, int32_t count)
{
std::vector<nuraft::ptr<nuraft::buffer>> returned_logs;
size_t size_total = 0;
for (size_t i = index; i < index + count; ++i)
uint64_t size_total = 0;
for (uint64_t i = index; i < index + count; ++i)
{
auto entry = logs.find(i);
if (entry == logs.end())
@ -540,14 +540,14 @@ nuraft::ptr<nuraft::buffer> Changelog::serializeEntriesToBuffer(size_t index, in
return buf_out;
}
void Changelog::applyEntriesFromBuffer(size_t index, nuraft::buffer & buffer, bool force_sync)
void Changelog::applyEntriesFromBuffer(uint64_t index, nuraft::buffer & buffer, bool force_sync)
{
buffer.pos(0);
int num_logs = buffer.get_int();
for (int i = 0; i < num_logs; ++i)
{
size_t cur_index = index + i;
uint64_t cur_index = index + i;
int buf_size = buffer.get_int();
nuraft::ptr<nuraft::buffer> buf_local = nuraft::buffer::alloc(buf_size);

View File

@ -17,8 +17,8 @@ using LogEntries = std::vector<LogEntryPtr>;
using LogEntriesPtr = nuraft::ptr<LogEntries>;
using BufferPtr = nuraft::ptr<nuraft::buffer>;
using IndexToOffset = std::unordered_map<size_t, off_t>;
using IndexToLogEntry = std::unordered_map<size_t, LogEntryPtr>;
using IndexToOffset = std::unordered_map<uint64_t, off_t>;
using IndexToLogEntry = std::unordered_map<uint64_t, LogEntryPtr>;
enum class ChangelogVersion : uint8_t
{
@ -30,10 +30,10 @@ static constexpr auto CURRENT_CHANGELOG_VERSION = ChangelogVersion::V0;
struct ChangelogRecordHeader
{
ChangelogVersion version = CURRENT_CHANGELOG_VERSION;
size_t index; /// entry log number
size_t term;
uint64_t index; /// entry log number
uint64_t term;
nuraft::log_val_type value_type;
size_t blob_size;
uint64_t blob_size;
};
/// Changelog record on disk
@ -48,8 +48,8 @@ struct ChangelogRecord
struct ChangelogFileDescription
{
std::string prefix;
size_t from_log_index;
size_t to_log_index;
uint64_t from_log_index;
uint64_t to_log_index;
std::string path;
};
@ -63,27 +63,27 @@ class Changelog
{
public:
Changelog(const std::string & changelogs_dir_, size_t rotate_interval_, Poco::Logger * log_);
Changelog(const std::string & changelogs_dir_, uint64_t rotate_interval_, Poco::Logger * log_);
/// Read changelog from files on changelogs_dir_ skipping all entries before from_log_index
/// Truncate broken entries, remove files after broken entries.
void readChangelogAndInitWriter(size_t last_commited_log_index, size_t logs_to_keep);
void readChangelogAndInitWriter(uint64_t last_commited_log_index, uint64_t logs_to_keep);
/// Add entry to log with index. Call fsync if force_sync true.
void appendEntry(size_t index, const LogEntryPtr & log_entry, bool force_sync);
void appendEntry(uint64_t index, const LogEntryPtr & log_entry, bool force_sync);
/// Write entry at index and truncate all subsequent entries.
void writeAt(size_t index, const LogEntryPtr & log_entry, bool force_sync);
void writeAt(uint64_t index, const LogEntryPtr & log_entry, bool force_sync);
/// Remove log files with to_log_index <= up_to_log_index.
void compact(size_t up_to_log_index);
void compact(uint64_t up_to_log_index);
size_t getNextEntryIndex() const
uint64_t getNextEntryIndex() const
{
return start_index + logs.size();
}
size_t getStartIndex() const
uint64_t getStartIndex() const
{
return start_index;
}
@ -92,21 +92,21 @@ public:
LogEntryPtr getLastEntry() const;
/// Return log entries between [start, end)
LogEntriesPtr getLogEntriesBetween(size_t start_index, size_t end_index);
LogEntriesPtr getLogEntriesBetween(uint64_t start_index, uint64_t end_index);
/// Return entry at position index
LogEntryPtr entryAt(size_t index);
LogEntryPtr entryAt(uint64_t index);
/// Serialize entries from index into buffer
BufferPtr serializeEntriesToBuffer(size_t index, int32_t count);
BufferPtr serializeEntriesToBuffer(uint64_t index, int32_t count);
/// Apply entries from buffer overriding existing entries
void applyEntriesFromBuffer(size_t index, nuraft::buffer & buffer, bool force_sync);
void applyEntriesFromBuffer(uint64_t index, nuraft::buffer & buffer, bool force_sync);
/// Fsync log to disk
void flush();
size_t size() const
uint64_t size() const
{
return logs.size();
}
@ -116,21 +116,21 @@ public:
private:
/// Pack log_entry into changelog record
static ChangelogRecord buildRecord(size_t index, const LogEntryPtr & log_entry);
static ChangelogRecord buildRecord(uint64_t index, const LogEntryPtr & log_entry);
/// Starts new file [new_start_log_index, new_start_log_index + rotate_interval]
void rotate(size_t new_start_log_index);
void rotate(uint64_t new_start_log_index);
private:
const std::string changelogs_dir;
const size_t rotate_interval;
const uint64_t rotate_interval;
Poco::Logger * log;
std::map<size_t, ChangelogFileDescription> existing_changelogs;
std::map<uint64_t, ChangelogFileDescription> existing_changelogs;
std::unique_ptr<ChangelogWriter> current_writer;
IndexToOffset index_to_start_pos;
IndexToLogEntry logs;
size_t start_index = 0;
uint64_t start_index = 0;
};
}

View File

@ -16,16 +16,16 @@ ptr<log_entry> makeClone(const ptr<log_entry> & entry)
InMemoryLogStore::InMemoryLogStore()
: start_idx(1)
{
nuraft::ptr<nuraft::buffer> buf = nuraft::buffer::alloc(sizeof(size_t));
nuraft::ptr<nuraft::buffer> buf = nuraft::buffer::alloc(sizeof(uint64_t));
logs[0] = nuraft::cs_new<nuraft::log_entry>(0, buf);
}
size_t InMemoryLogStore::start_index() const
uint64_t InMemoryLogStore::start_index() const
{
return start_idx;
}
size_t InMemoryLogStore::next_slot() const
uint64_t InMemoryLogStore::next_slot() const
{
std::lock_guard<std::mutex> l(logs_lock);
// Exclude the dummy entry.
@ -34,7 +34,7 @@ size_t InMemoryLogStore::next_slot() const
nuraft::ptr<nuraft::log_entry> InMemoryLogStore::last_entry() const
{
size_t next_idx = next_slot();
uint64_t next_idx = next_slot();
std::lock_guard<std::mutex> lock(logs_lock);
auto entry = logs.find(next_idx - 1);
if (entry == logs.end())
@ -43,17 +43,17 @@ nuraft::ptr<nuraft::log_entry> InMemoryLogStore::last_entry() const
return makeClone(entry->second);
}
size_t InMemoryLogStore::append(nuraft::ptr<nuraft::log_entry> & entry)
uint64_t InMemoryLogStore::append(nuraft::ptr<nuraft::log_entry> & entry)
{
ptr<log_entry> clone = makeClone(entry);
std::lock_guard<std::mutex> l(logs_lock);
size_t idx = start_idx + logs.size() - 1;
uint64_t idx = start_idx + logs.size() - 1;
logs[idx] = clone;
return idx;
}
void InMemoryLogStore::write_at(size_t index, nuraft::ptr<nuraft::log_entry> & entry)
void InMemoryLogStore::write_at(uint64_t index, nuraft::ptr<nuraft::log_entry> & entry)
{
nuraft::ptr<log_entry> clone = makeClone(entry);
@ -65,14 +65,14 @@ void InMemoryLogStore::write_at(size_t index, nuraft::ptr<nuraft::log_entry> & e
logs[index] = clone;
}
nuraft::ptr<std::vector<nuraft::ptr<nuraft::log_entry>>> InMemoryLogStore::log_entries(size_t start, size_t end)
nuraft::ptr<std::vector<nuraft::ptr<nuraft::log_entry>>> InMemoryLogStore::log_entries(uint64_t start, uint64_t end)
{
nuraft::ptr<std::vector<nuraft::ptr<nuraft::log_entry>>> ret =
nuraft::cs_new<std::vector<nuraft::ptr<nuraft::log_entry>>>();
ret->resize(end - start);
size_t cc = 0;
for (size_t i = start; i < end; ++i)
uint64_t cc = 0;
for (uint64_t i = start; i < end; ++i)
{
nuraft::ptr<nuraft::log_entry> src = nullptr;
{
@ -90,7 +90,7 @@ nuraft::ptr<std::vector<nuraft::ptr<nuraft::log_entry>>> InMemoryLogStore::log_e
return ret;
}
nuraft::ptr<nuraft::log_entry> InMemoryLogStore::entry_at(size_t index)
nuraft::ptr<nuraft::log_entry> InMemoryLogStore::entry_at(uint64_t index)
{
nuraft::ptr<nuraft::log_entry> src = nullptr;
{
@ -103,9 +103,9 @@ nuraft::ptr<nuraft::log_entry> InMemoryLogStore::entry_at(size_t index)
return makeClone(src);
}
size_t InMemoryLogStore::term_at(size_t index)
uint64_t InMemoryLogStore::term_at(uint64_t index)
{
size_t term = 0;
uint64_t term = 0;
{
std::lock_guard<std::mutex> l(logs_lock);
auto entry = logs.find(index);
@ -116,12 +116,12 @@ size_t InMemoryLogStore::term_at(size_t index)
return term;
}
nuraft::ptr<nuraft::buffer> InMemoryLogStore::pack(size_t index, Int32 cnt)
nuraft::ptr<nuraft::buffer> InMemoryLogStore::pack(uint64_t index, Int32 cnt)
{
std::vector<nuraft::ptr<nuraft::buffer>> returned_logs;
size_t size_total = 0;
for (size_t ii = index; ii < index + cnt; ++ii)
uint64_t uint64_total = 0;
for (uint64_t ii = index; ii < index + cnt; ++ii)
{
ptr<log_entry> le = nullptr;
{
@ -130,11 +130,11 @@ nuraft::ptr<nuraft::buffer> InMemoryLogStore::pack(size_t index, Int32 cnt)
}
assert(le.get());
nuraft::ptr<nuraft::buffer> buf = le->serialize();
size_total += buf->size();
uint64_total += buf->size();
returned_logs.push_back(buf);
}
nuraft::ptr<buffer> buf_out = nuraft::buffer::alloc(sizeof(int32) + cnt * sizeof(int32) + size_total);
nuraft::ptr<buffer> buf_out = nuraft::buffer::alloc(sizeof(int32) + cnt * sizeof(int32) + uint64_total);
buf_out->pos(0);
buf_out->put(static_cast<Int32>(cnt));
@ -147,14 +147,14 @@ nuraft::ptr<nuraft::buffer> InMemoryLogStore::pack(size_t index, Int32 cnt)
return buf_out;
}
void InMemoryLogStore::apply_pack(size_t index, nuraft::buffer & pack)
void InMemoryLogStore::apply_pack(uint64_t index, nuraft::buffer & pack)
{
pack.pos(0);
Int32 num_logs = pack.get_int();
for (Int32 i = 0; i < num_logs; ++i)
{
size_t cur_idx = index + i;
uint64_t cur_idx = index + i;
Int32 buf_size = pack.get_int();
nuraft::ptr<nuraft::buffer> buf_local = nuraft::buffer::alloc(buf_size);
@ -177,10 +177,10 @@ void InMemoryLogStore::apply_pack(size_t index, nuraft::buffer & pack)
}
}
bool InMemoryLogStore::compact(size_t last_log_index)
bool InMemoryLogStore::compact(uint64_t last_log_index)
{
std::lock_guard<std::mutex> l(logs_lock);
for (size_t ii = start_idx; ii <= last_log_index; ++ii)
for (uint64_t ii = start_idx; ii <= last_log_index; ++ii)
{
auto entry = logs.find(ii);
if (entry != logs.end())

View File

@ -14,34 +14,34 @@ class InMemoryLogStore : public nuraft::log_store
public:
InMemoryLogStore();
size_t start_index() const override;
uint64_t start_index() const override;
size_t next_slot() const override;
uint64_t next_slot() const override;
nuraft::ptr<nuraft::log_entry> last_entry() const override;
size_t append(nuraft::ptr<nuraft::log_entry> & entry) override;
uint64_t append(nuraft::ptr<nuraft::log_entry> & entry) override;
void write_at(size_t index, nuraft::ptr<nuraft::log_entry> & entry) override;
void write_at(uint64_t index, nuraft::ptr<nuraft::log_entry> & entry) override;
nuraft::ptr<std::vector<nuraft::ptr<nuraft::log_entry>>> log_entries(size_t start, size_t end) override;
nuraft::ptr<std::vector<nuraft::ptr<nuraft::log_entry>>> log_entries(uint64_t start, uint64_t end) override;
nuraft::ptr<nuraft::log_entry> entry_at(size_t index) override;
nuraft::ptr<nuraft::log_entry> entry_at(uint64_t index) override;
size_t term_at(size_t index) override;
uint64_t term_at(uint64_t index) override;
nuraft::ptr<nuraft::buffer> pack(size_t index, Int32 cnt) override;
nuraft::ptr<nuraft::buffer> pack(uint64_t index, Int32 cnt) override;
void apply_pack(size_t index, nuraft::buffer & pack) override;
void apply_pack(uint64_t index, nuraft::buffer & pack) override;
bool compact(size_t last_log_index) override;
bool compact(uint64_t last_log_index) override;
bool flush() override { return true; }
private:
std::map<size_t, nuraft::ptr<nuraft::log_entry>> logs;
std::map<uint64_t, nuraft::ptr<nuraft::log_entry>> logs;
mutable std::mutex logs_lock;
std::atomic<size_t> start_idx;
std::atomic<uint64_t> start_idx;
};
}

View File

@ -3,26 +3,26 @@
namespace DB
{
KeeperLogStore::KeeperLogStore(const std::string & changelogs_path, size_t rotate_interval_, bool force_sync_)
KeeperLogStore::KeeperLogStore(const std::string & changelogs_path, uint64_t rotate_interval_, bool force_sync_)
: log(&Poco::Logger::get("KeeperLogStore"))
, changelog(changelogs_path, rotate_interval_, log)
, force_sync(force_sync_)
{
}
size_t KeeperLogStore::start_index() const
uint64_t KeeperLogStore::start_index() const
{
std::lock_guard lock(changelog_lock);
return changelog.getStartIndex();
}
void KeeperLogStore::init(size_t last_commited_log_index, size_t logs_to_keep)
void KeeperLogStore::init(uint64_t last_commited_log_index, uint64_t logs_to_keep)
{
std::lock_guard lock(changelog_lock);
changelog.readChangelogAndInitWriter(last_commited_log_index, logs_to_keep);
}
size_t KeeperLogStore::next_slot() const
uint64_t KeeperLogStore::next_slot() const
{
std::lock_guard lock(changelog_lock);
return changelog.getNextEntryIndex();
@ -34,34 +34,34 @@ nuraft::ptr<nuraft::log_entry> KeeperLogStore::last_entry() const
return changelog.getLastEntry();
}
size_t KeeperLogStore::append(nuraft::ptr<nuraft::log_entry> & entry)
uint64_t KeeperLogStore::append(nuraft::ptr<nuraft::log_entry> & entry)
{
std::lock_guard lock(changelog_lock);
size_t idx = changelog.getNextEntryIndex();
uint64_t idx = changelog.getNextEntryIndex();
changelog.appendEntry(idx, entry, force_sync);
return idx;
}
void KeeperLogStore::write_at(size_t index, nuraft::ptr<nuraft::log_entry> & entry)
void KeeperLogStore::write_at(uint64_t index, nuraft::ptr<nuraft::log_entry> & entry)
{
std::lock_guard lock(changelog_lock);
changelog.writeAt(index, entry, force_sync);
}
nuraft::ptr<std::vector<nuraft::ptr<nuraft::log_entry>>> KeeperLogStore::log_entries(size_t start, size_t end)
nuraft::ptr<std::vector<nuraft::ptr<nuraft::log_entry>>> KeeperLogStore::log_entries(uint64_t start, uint64_t end)
{
std::lock_guard lock(changelog_lock);
return changelog.getLogEntriesBetween(start, end);
}
nuraft::ptr<nuraft::log_entry> KeeperLogStore::entry_at(size_t index)
nuraft::ptr<nuraft::log_entry> KeeperLogStore::entry_at(uint64_t index)
{
std::lock_guard lock(changelog_lock);
return changelog.entryAt(index);
}
size_t KeeperLogStore::term_at(size_t index)
uint64_t KeeperLogStore::term_at(uint64_t index)
{
std::lock_guard lock(changelog_lock);
auto entry = changelog.entryAt(index);
@ -70,13 +70,13 @@ size_t KeeperLogStore::term_at(size_t index)
return 0;
}
nuraft::ptr<nuraft::buffer> KeeperLogStore::pack(size_t index, int32_t cnt)
nuraft::ptr<nuraft::buffer> KeeperLogStore::pack(uint64_t index, int32_t cnt)
{
std::lock_guard lock(changelog_lock);
return changelog.serializeEntriesToBuffer(index, cnt);
}
bool KeeperLogStore::compact(size_t last_log_index)
bool KeeperLogStore::compact(uint64_t last_log_index)
{
std::lock_guard lock(changelog_lock);
changelog.compact(last_log_index);
@ -90,13 +90,13 @@ bool KeeperLogStore::flush()
return true;
}
void KeeperLogStore::apply_pack(size_t index, nuraft::buffer & pack)
void KeeperLogStore::apply_pack(uint64_t index, nuraft::buffer & pack)
{
std::lock_guard lock(changelog_lock);
changelog.applyEntriesFromBuffer(index, pack, force_sync);
}
size_t KeeperLogStore::size() const
uint64_t KeeperLogStore::size() const
{
std::lock_guard lock(changelog_lock);
return changelog.size();

View File

@ -12,35 +12,35 @@ namespace DB
class KeeperLogStore : public nuraft::log_store
{
public:
KeeperLogStore(const std::string & changelogs_path, size_t rotate_interval_, bool force_sync_);
KeeperLogStore(const std::string & changelogs_path, uint64_t rotate_interval_, bool force_sync_);
void init(size_t last_commited_log_index, size_t logs_to_keep);
void init(uint64_t last_commited_log_index, uint64_t logs_to_keep);
size_t start_index() const override;
uint64_t start_index() const override;
size_t next_slot() const override;
uint64_t next_slot() const override;
nuraft::ptr<nuraft::log_entry> last_entry() const override;
size_t append(nuraft::ptr<nuraft::log_entry> & entry) override;
uint64_t append(nuraft::ptr<nuraft::log_entry> & entry) override;
void write_at(size_t index, nuraft::ptr<nuraft::log_entry> & entry) override;
void write_at(uint64_t index, nuraft::ptr<nuraft::log_entry> & entry) override;
nuraft::ptr<std::vector<nuraft::ptr<nuraft::log_entry>>> log_entries(size_t start, size_t end) override;
nuraft::ptr<std::vector<nuraft::ptr<nuraft::log_entry>>> log_entries(uint64_t start, uint64_t end) override;
nuraft::ptr<nuraft::log_entry> entry_at(size_t index) override;
nuraft::ptr<nuraft::log_entry> entry_at(uint64_t index) override;
size_t term_at(size_t index) override;
uint64_t term_at(uint64_t index) override;
nuraft::ptr<nuraft::buffer> pack(size_t index, int32_t cnt) override;
nuraft::ptr<nuraft::buffer> pack(uint64_t index, int32_t cnt) override;
void apply_pack(size_t index, nuraft::buffer & pack) override;
void apply_pack(uint64_t index, nuraft::buffer & pack) override;
bool compact(size_t last_log_index) override;
bool compact(uint64_t last_log_index) override;
bool flush() override;
size_t size() const;
uint64_t size() const;
private:
mutable std::mutex changelog_lock;

View File

@ -23,16 +23,16 @@ namespace ErrorCodes
namespace
{
size_t getSnapshotPathUpToLogIdx(const String & snapshot_path)
uint64_t getSnapshotPathUpToLogIdx(const String & snapshot_path)
{
std::filesystem::path path(snapshot_path);
std::string filename = path.stem();
Strings name_parts;
splitInto<'_'>(name_parts, filename);
return parse<size_t>(name_parts[1]);
return parse<uint64_t>(name_parts[1]);
}
std::string getSnapshotFileName(size_t up_to_log_idx)
std::string getSnapshotFileName(uint64_t up_to_log_idx)
{
return std::string{"snapshot_"} + std::to_string(up_to_log_idx) + ".bin";
}
@ -214,7 +214,7 @@ SnapshotMetadataPtr KeeperStorageSnapshot::deserialize(KeeperStorage & storage,
return result;
}
KeeperStorageSnapshot::KeeperStorageSnapshot(KeeperStorage * storage_, size_t up_to_log_idx_)
KeeperStorageSnapshot::KeeperStorageSnapshot(KeeperStorage * storage_, uint64_t up_to_log_idx_)
: storage(storage_)
, snapshot_meta(std::make_shared<SnapshotMetadata>(up_to_log_idx_, 0, std::make_shared<nuraft::cluster_config>()))
, session_id(storage->session_id_counter)
@ -266,7 +266,7 @@ KeeperSnapshotManager::KeeperSnapshotManager(const std::string & snapshots_path_
}
std::string KeeperSnapshotManager::serializeSnapshotBufferToDisk(nuraft::buffer & buffer, size_t up_to_log_idx)
std::string KeeperSnapshotManager::serializeSnapshotBufferToDisk(nuraft::buffer & buffer, uint64_t up_to_log_idx)
{
ReadBufferFromNuraftBuffer reader(buffer);
@ -307,7 +307,7 @@ nuraft::ptr<nuraft::buffer> KeeperSnapshotManager::deserializeLatestSnapshotBuff
return nullptr;
}
nuraft::ptr<nuraft::buffer> KeeperSnapshotManager::deserializeSnapshotBufferFromDisk(size_t up_to_log_idx) const
nuraft::ptr<nuraft::buffer> KeeperSnapshotManager::deserializeSnapshotBufferFromDisk(uint64_t up_to_log_idx) const
{
const std::string & snapshot_path = existing_snapshots.at(up_to_log_idx);
WriteBufferFromNuraftBuffer writer;
@ -352,7 +352,7 @@ void KeeperSnapshotManager::removeOutdatedSnapshotsIfNeeded()
removeSnapshot(existing_snapshots.begin()->first);
}
void KeeperSnapshotManager::removeSnapshot(size_t log_idx)
void KeeperSnapshotManager::removeSnapshot(uint64_t log_idx)
{
auto itr = existing_snapshots.find(log_idx);
if (itr == existing_snapshots.end())

View File

@ -18,7 +18,7 @@ enum SnapshotVersion : uint8_t
struct KeeperStorageSnapshot
{
public:
KeeperStorageSnapshot(KeeperStorage * storage_, size_t up_to_log_idx_);
KeeperStorageSnapshot(KeeperStorage * storage_, uint64_t up_to_log_idx_);
KeeperStorageSnapshot(KeeperStorage * storage_, const SnapshotMetadataPtr & snapshot_meta_);
~KeeperStorageSnapshot();
@ -51,14 +51,14 @@ public:
SnapshotMetaAndStorage restoreFromLatestSnapshot();
static nuraft::ptr<nuraft::buffer> serializeSnapshotToBuffer(const KeeperStorageSnapshot & snapshot);
std::string serializeSnapshotBufferToDisk(nuraft::buffer & buffer, size_t up_to_log_idx);
std::string serializeSnapshotBufferToDisk(nuraft::buffer & buffer, uint64_t up_to_log_idx);
SnapshotMetaAndStorage deserializeSnapshotFromBuffer(nuraft::ptr<nuraft::buffer> buffer) const;
nuraft::ptr<nuraft::buffer> deserializeSnapshotBufferFromDisk(size_t up_to_log_idx) const;
nuraft::ptr<nuraft::buffer> deserializeSnapshotBufferFromDisk(uint64_t up_to_log_idx) const;
nuraft::ptr<nuraft::buffer> deserializeLatestSnapshotBufferFromDisk();
void removeSnapshot(size_t log_idx);
void removeSnapshot(uint64_t log_idx);
size_t totalSnapshots() const
{
@ -76,7 +76,7 @@ private:
void removeOutdatedSnapshotsIfNeeded();
const std::string snapshots_path;
const size_t snapshots_to_keep;
std::map<size_t, std::string> existing_snapshots;
std::map<uint64_t, std::string> existing_snapshots;
size_t storage_tick_time;
};

View File

@ -54,7 +54,7 @@ void KeeperStateMachine::init()
bool has_snapshots = snapshot_manager.totalSnapshots() != 0;
while (snapshot_manager.totalSnapshots() != 0)
{
size_t latest_log_index = snapshot_manager.getLatestSnapshotIndex();
uint64_t latest_log_index = snapshot_manager.getLatestSnapshotIndex();
LOG_DEBUG(log, "Trying to load state machine from snapshot up to log index {}", latest_log_index);
try
@ -88,7 +88,7 @@ void KeeperStateMachine::init()
storage = std::make_unique<KeeperStorage>(coordination_settings->dead_session_check_period_ms.totalMilliseconds());
}
nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const size_t log_idx, nuraft::buffer & data)
nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, nuraft::buffer & data)
{
if (data.size() == sizeof(int64_t))
{
@ -205,7 +205,7 @@ void KeeperStateMachine::create_snapshot(
void KeeperStateMachine::save_logical_snp_obj(
nuraft::snapshot & s,
size_t & obj_id,
uint64_t & obj_id,
nuraft::buffer & data,
bool /*is_first_obj*/,
bool /*is_last_obj*/)
@ -246,7 +246,7 @@ void KeeperStateMachine::save_logical_snp_obj(
int KeeperStateMachine::read_logical_snp_obj(
nuraft::snapshot & s,
void* & /*user_snp_ctx*/,
ulong obj_id,
uint64_t obj_id,
nuraft::ptr<nuraft::buffer> & data_out,
bool & is_last_obj)
{

View File

@ -20,13 +20,13 @@ public:
void init();
nuraft::ptr<nuraft::buffer> pre_commit(const size_t /*log_idx*/, nuraft::buffer & /*data*/) override { return nullptr; }
nuraft::ptr<nuraft::buffer> pre_commit(const uint64_t /*log_idx*/, nuraft::buffer & /*data*/) override { return nullptr; }
nuraft::ptr<nuraft::buffer> commit(const size_t log_idx, nuraft::buffer & data) override;
nuraft::ptr<nuraft::buffer> commit(const uint64_t log_idx, nuraft::buffer & data) override;
void rollback(const size_t /*log_idx*/, nuraft::buffer & /*data*/) override {}
void rollback(const uint64_t /*log_idx*/, nuraft::buffer & /*data*/) override {}
size_t last_commit_index() override { return last_committed_idx; }
uint64_t last_commit_index() override { return last_committed_idx; }
bool apply_snapshot(nuraft::snapshot & s) override;
@ -38,7 +38,7 @@ public:
void save_logical_snp_obj(
nuraft::snapshot & s,
size_t & obj_id,
uint64_t & obj_id,
nuraft::buffer & data,
bool is_first_obj,
bool is_last_obj) override;
@ -46,7 +46,7 @@ public:
int read_logical_snp_obj(
nuraft::snapshot & s,
void* & user_snp_ctx,
ulong obj_id,
uint64_t obj_id,
nuraft::ptr<nuraft::buffer> & data_out,
bool & is_last_obj) override;
@ -82,7 +82,7 @@ private:
std::mutex storage_lock;
/// Last committed Raft log number.
std::atomic<size_t> last_committed_idx;
std::atomic<uint64_t> last_committed_idx;
Poco::Logger * log;
};

View File

@ -64,7 +64,7 @@ KeeperStateManager::KeeperStateManager(
throw Exception(ErrorCodes::RAFT_ERROR, "At least one of servers should be able to start as leader (without <start_as_follower>)");
}
void KeeperStateManager::loadLogStore(size_t last_commited_index, size_t logs_to_keep)
void KeeperStateManager::loadLogStore(uint64_t last_commited_index, uint64_t logs_to_keep)
{
log_store->init(last_commited_index, logs_to_keep);
}

View File

@ -25,7 +25,7 @@ public:
int port,
const std::string & logs_path);
void loadLogStore(size_t last_commited_index, size_t logs_to_keep);
void loadLogStore(uint64_t last_commited_index, uint64_t logs_to_keep);
void flushLogStore();
@ -54,12 +54,12 @@ public:
nuraft::ptr<KeeperLogStore> getLogStore() const { return log_store; }
size_t getTotalServers() const { return total_servers; }
uint64_t getTotalServers() const { return total_servers; }
private:
int my_server_id;
int my_port;
size_t total_servers{0};
uint64_t total_servers{0};
std::unordered_set<int> start_as_follower_servers;
nuraft::ptr<KeeperLogStore> log_store;
nuraft::ptr<nuraft::srv_config> my_server_config;

View File

@ -21,7 +21,7 @@ SummingStateMachine::SummingStateMachine()
{
}
nuraft::ptr<nuraft::buffer> SummingStateMachine::commit(const size_t log_idx, nuraft::buffer & data)
nuraft::ptr<nuraft::buffer> SummingStateMachine::commit(const uint64_t log_idx, nuraft::buffer & data)
{
int64_t value_to_add = deserializeValue(data);
@ -84,7 +84,7 @@ void SummingStateMachine::createSnapshotInternal(nuraft::snapshot & s)
void SummingStateMachine::save_logical_snp_obj(
nuraft::snapshot & s,
size_t & obj_id,
uint64_t & obj_id,
nuraft::buffer & data,
bool /*is_first_obj*/,
bool /*is_last_obj*/)
@ -112,7 +112,7 @@ void SummingStateMachine::save_logical_snp_obj(
int SummingStateMachine::read_logical_snp_obj(
nuraft::snapshot & s,
void* & /*user_snp_ctx*/,
size_t obj_id,
uint64_t obj_id,
nuraft::ptr<nuraft::buffer> & data_out,
bool & is_last_obj)
{
@ -142,7 +142,7 @@ int SummingStateMachine::read_logical_snp_obj(
else
{
// Object ID > 0: second object, put actual value.
data_out = nuraft::buffer::alloc(sizeof(size_t));
data_out = nuraft::buffer::alloc(sizeof(uint64_t));
nuraft::buffer_serializer bs(data_out);
bs.put_u64(ctx->value);
is_last_obj = true;

View File

@ -15,13 +15,13 @@ class SummingStateMachine : public nuraft::state_machine
public:
SummingStateMachine();
nuraft::ptr<nuraft::buffer> pre_commit(const size_t /*log_idx*/, nuraft::buffer & /*data*/) override { return nullptr; }
nuraft::ptr<nuraft::buffer> pre_commit(const uint64_t /*log_idx*/, nuraft::buffer & /*data*/) override { return nullptr; }
nuraft::ptr<nuraft::buffer> commit(const size_t log_idx, nuraft::buffer & data) override;
nuraft::ptr<nuraft::buffer> commit(const uint64_t log_idx, nuraft::buffer & data) override;
void rollback(const size_t /*log_idx*/, nuraft::buffer & /*data*/) override {}
void rollback(const uint64_t /*log_idx*/, nuraft::buffer & /*data*/) override {}
size_t last_commit_index() override { return last_committed_idx; }
uint64_t last_commit_index() override { return last_committed_idx; }
bool apply_snapshot(nuraft::snapshot & s) override;
@ -33,7 +33,7 @@ public:
void save_logical_snp_obj(
nuraft::snapshot & s,
size_t & obj_id,
uint64_t & obj_id,
nuraft::buffer & data,
bool is_first_obj,
bool is_last_obj) override;
@ -41,7 +41,7 @@ public:
int read_logical_snp_obj(
nuraft::snapshot & s,
void* & user_snp_ctx,
size_t obj_id,
uint64_t obj_id,
nuraft::ptr<nuraft::buffer> & data_out,
bool & is_last_obj) override;

View File

@ -1085,7 +1085,7 @@ nuraft::ptr<nuraft::log_entry> getLogEntryFromZKRequest(size_t term, int64_t ses
return nuraft::cs_new<nuraft::log_entry>(term, buffer);
}
void testLogAndStateMachine(Coordination::CoordinationSettingsPtr settings, size_t total_logs)
void testLogAndStateMachine(Coordination::CoordinationSettingsPtr settings, uint64_t total_logs)
{
using namespace Coordination;
using namespace DB;