Add write buffer from nuraft

This commit is contained in:
alesapin 2021-01-19 12:40:25 +03:00
parent 6883143994
commit 1063b22b4c
4 changed files with 150 additions and 0 deletions

View File

@ -0,0 +1,17 @@
#pragma once
#include <IO/ReadBufferFromMemory.h>
#include <libnuraft/nuraft.hxx>
namespace DB
{
class ReadBufferFromNuraftBuffer : public ReadBufferFromMemory
{
public:
explicit ReadBufferFromNuraftBuffer(nuraft::ptr<nuraft::buffer> buffer)
: ReadBufferFromMemory(buffer->data_begin(), buffer->size())
{}
};
}

View File

@ -0,0 +1,66 @@
#include <Coordination/WriteBufferFromNuraftBuffer.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_WRITE_AFTER_END_OF_BUFFER;
}
void WriteBufferFromNuraftBuffer::nextImpl()
{
if (is_finished)
throw Exception("WriteBufferFromNuraftBuffer is finished", ErrorCodes::CANNOT_WRITE_AFTER_END_OF_BUFFER);
size_t old_size = buffer->size();
/// pos may not be equal to vector.data() + old_size, because WriteBuffer::next() can be used to flush data
size_t pos_offset = pos - reinterpret_cast<Position>(buffer->data_begin());
nuraft::ptr<nuraft::buffer> new_buffer = nuraft::buffer::alloc(old_size * size_multiplier);
memcpy(new_buffer->data_begin(), buffer->data_begin(), buffer->size());
buffer = new_buffer;
internal_buffer = Buffer(reinterpret_cast<Position>(buffer->data_begin() + pos_offset), reinterpret_cast<Position>(buffer->data_begin() + buffer->size()));
working_buffer = internal_buffer;
}
WriteBufferFromNuraftBuffer::WriteBufferFromNuraftBuffer()
: WriteBuffer(nullptr, 0)
{
buffer = nuraft::buffer::alloc(initial_size);
set(reinterpret_cast<Position>(buffer->data_begin()), buffer->size());
}
void WriteBufferFromNuraftBuffer::finalize()
{
if (is_finished)
return;
is_finished = true;
size_t real_size = position() - reinterpret_cast<Position>(buffer->data_begin());
nuraft::ptr<nuraft::buffer> new_buffer = nuraft::buffer::alloc(real_size);
memcpy(new_buffer->data_begin(), buffer->data_begin(), real_size);
buffer = new_buffer;
/// Prevent further writes.
set(nullptr, 0);
}
nuraft::ptr<nuraft::buffer> WriteBufferFromNuraftBuffer::getBuffer()
{
finalize();
return buffer;
}
WriteBufferFromNuraftBuffer::~WriteBufferFromNuraftBuffer()
{
try
{
finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}

View File

@ -0,0 +1,30 @@
#pragma once
#include <IO/WriteBuffer.h>
#include <libnuraft/nuraft.hxx>
namespace DB
{
class WriteBufferFromNuraftBuffer : public WriteBuffer
{
private:
nuraft::ptr<nuraft::buffer> buffer;
bool is_finished = false;
static constexpr size_t initial_size = 32;
static constexpr size_t size_multiplier = 2;
void nextImpl() override;
public:
WriteBufferFromNuraftBuffer();
void finalize() override final;
nuraft::ptr<nuraft::buffer> getBuffer();
bool isFinished() const { return is_finished; }
~WriteBufferFromNuraftBuffer() override;
};
}

View File

@ -4,6 +4,10 @@
#include <Coordination/InMemoryStateManager.h>
#include <Coordination/SummingStateMachine.h>
#include <Coordination/LoggerWrapper.h>
#include <Coordination/WriteBufferFromNuraftBuffer.h>
#include <Coordination/ReadBufferFromNuraftBuffer.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <Common/Exception.h>
#include <libnuraft/nuraft.hxx>
#include <thread>
@ -26,6 +30,39 @@ TEST(CoordinationTest, BuildTest)
EXPECT_EQ(1, 1);
}
TEST(CoordinationTest, BufferSerde)
{
Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Get);
request->xid = 3;
dynamic_cast<Coordination::ZooKeeperGetRequest *>(request.get())->path = "/path/value";
DB::WriteBufferFromNuraftBuffer wbuf;
request->write(wbuf);
auto nuraft_buffer = wbuf.getBuffer();
EXPECT_EQ(nuraft_buffer->size(), 28);
DB::ReadBufferFromNuraftBuffer rbuf(nuraft_buffer);
int32_t length;
Coordination::read(length, rbuf);
EXPECT_EQ(length + sizeof(length), nuraft_buffer->size());
int32_t xid;
Coordination::read(xid, rbuf);
EXPECT_EQ(xid, request->xid);
Coordination::OpNum opnum;
Coordination::read(opnum, rbuf);
Coordination::ZooKeeperRequestPtr request_read = Coordination::ZooKeeperRequestFactory::instance().get(opnum);
request_read->xid = xid;
request_read->readImpl(rbuf);
EXPECT_EQ(request_read->getOpNum(), Coordination::OpNum::Get);
EXPECT_EQ(request_read->xid, 3);
EXPECT_EQ(dynamic_cast<Coordination::ZooKeeperGetRequest *>(request_read.get())->path, "/path/value");
}
struct SummingRaftServer
{
SummingRaftServer(int server_id_, const std::string & hostname_, int port_)