From 1063b22b4c62b498d232f8acc10017663debdf21 Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 19 Jan 2021 12:40:25 +0300 Subject: [PATCH] Add write buffer from nuraft --- src/Coordination/ReadBufferFromNuraftBuffer.h | 17 +++++ .../WriteBufferFromNuraftBuffer.cpp | 66 +++++++++++++++++++ .../WriteBufferFromNuraftBuffer.h | 30 +++++++++ src/Coordination/tests/gtest_for_build.cpp | 37 +++++++++++ 4 files changed, 150 insertions(+) create mode 100644 src/Coordination/ReadBufferFromNuraftBuffer.h create mode 100644 src/Coordination/WriteBufferFromNuraftBuffer.cpp create mode 100644 src/Coordination/WriteBufferFromNuraftBuffer.h diff --git a/src/Coordination/ReadBufferFromNuraftBuffer.h b/src/Coordination/ReadBufferFromNuraftBuffer.h new file mode 100644 index 00000000000..392a97bdd8f --- /dev/null +++ b/src/Coordination/ReadBufferFromNuraftBuffer.h @@ -0,0 +1,17 @@ +#pragma once +#include + +#include + +namespace DB +{ + +class ReadBufferFromNuraftBuffer : public ReadBufferFromMemory +{ +public: + explicit ReadBufferFromNuraftBuffer(nuraft::ptr buffer) + : ReadBufferFromMemory(buffer->data_begin(), buffer->size()) + {} +}; + +} diff --git a/src/Coordination/WriteBufferFromNuraftBuffer.cpp b/src/Coordination/WriteBufferFromNuraftBuffer.cpp new file mode 100644 index 00000000000..09e1034ae8f --- /dev/null +++ b/src/Coordination/WriteBufferFromNuraftBuffer.cpp @@ -0,0 +1,66 @@ +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int CANNOT_WRITE_AFTER_END_OF_BUFFER; +} + +void WriteBufferFromNuraftBuffer::nextImpl() +{ + if (is_finished) + throw Exception("WriteBufferFromNuraftBuffer is finished", ErrorCodes::CANNOT_WRITE_AFTER_END_OF_BUFFER); + + size_t old_size = buffer->size(); + /// pos may not be equal to vector.data() + old_size, because WriteBuffer::next() can be used to flush data + size_t pos_offset = pos - reinterpret_cast(buffer->data_begin()); + nuraft::ptr new_buffer = nuraft::buffer::alloc(old_size * size_multiplier); + memcpy(new_buffer->data_begin(), buffer->data_begin(), buffer->size()); + buffer = new_buffer; + internal_buffer = Buffer(reinterpret_cast(buffer->data_begin() + pos_offset), reinterpret_cast(buffer->data_begin() + buffer->size())); + working_buffer = internal_buffer; +} + +WriteBufferFromNuraftBuffer::WriteBufferFromNuraftBuffer() + : WriteBuffer(nullptr, 0) +{ + buffer = nuraft::buffer::alloc(initial_size); + set(reinterpret_cast(buffer->data_begin()), buffer->size()); +} + +void WriteBufferFromNuraftBuffer::finalize() +{ + if (is_finished) + return; + + is_finished = true; + size_t real_size = position() - reinterpret_cast(buffer->data_begin()); + nuraft::ptr new_buffer = nuraft::buffer::alloc(real_size); + memcpy(new_buffer->data_begin(), buffer->data_begin(), real_size); + buffer = new_buffer; + + /// Prevent further writes. + set(nullptr, 0); +} + +nuraft::ptr WriteBufferFromNuraftBuffer::getBuffer() +{ + finalize(); + return buffer; +} + + WriteBufferFromNuraftBuffer::~WriteBufferFromNuraftBuffer() +{ + try + { + finalize(); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } +} + +} diff --git a/src/Coordination/WriteBufferFromNuraftBuffer.h b/src/Coordination/WriteBufferFromNuraftBuffer.h new file mode 100644 index 00000000000..47a01fbc2a4 --- /dev/null +++ b/src/Coordination/WriteBufferFromNuraftBuffer.h @@ -0,0 +1,30 @@ +#pragma once + +#include +#include + +namespace DB +{ + +class WriteBufferFromNuraftBuffer : public WriteBuffer +{ +private: + nuraft::ptr buffer; + bool is_finished = false; + + static constexpr size_t initial_size = 32; + static constexpr size_t size_multiplier = 2; + + void nextImpl() override; + +public: + WriteBufferFromNuraftBuffer(); + + void finalize() override final; + nuraft::ptr getBuffer(); + bool isFinished() const { return is_finished; } + + ~WriteBufferFromNuraftBuffer() override; +}; + +} diff --git a/src/Coordination/tests/gtest_for_build.cpp b/src/Coordination/tests/gtest_for_build.cpp index 188565de4ce..38602e48fae 100644 --- a/src/Coordination/tests/gtest_for_build.cpp +++ b/src/Coordination/tests/gtest_for_build.cpp @@ -4,6 +4,10 @@ #include #include #include +#include +#include +#include +#include #include #include #include @@ -26,6 +30,39 @@ TEST(CoordinationTest, BuildTest) EXPECT_EQ(1, 1); } +TEST(CoordinationTest, BufferSerde) +{ + Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Get); + request->xid = 3; + dynamic_cast(request.get())->path = "/path/value"; + + DB::WriteBufferFromNuraftBuffer wbuf; + request->write(wbuf); + auto nuraft_buffer = wbuf.getBuffer(); + EXPECT_EQ(nuraft_buffer->size(), 28); + + DB::ReadBufferFromNuraftBuffer rbuf(nuraft_buffer); + + int32_t length; + Coordination::read(length, rbuf); + EXPECT_EQ(length + sizeof(length), nuraft_buffer->size()); + + int32_t xid; + Coordination::read(xid, rbuf); + EXPECT_EQ(xid, request->xid); + + Coordination::OpNum opnum; + Coordination::read(opnum, rbuf); + + Coordination::ZooKeeperRequestPtr request_read = Coordination::ZooKeeperRequestFactory::instance().get(opnum); + request_read->xid = xid; + request_read->readImpl(rbuf); + + EXPECT_EQ(request_read->getOpNum(), Coordination::OpNum::Get); + EXPECT_EQ(request_read->xid, 3); + EXPECT_EQ(dynamic_cast(request_read.get())->path, "/path/value"); +} + struct SummingRaftServer { SummingRaftServer(int server_id_, const std::string & hostname_, int port_)