Fix bug which can lead to corrupted logs in compressed format

This commit is contained in:
alesapin 2022-05-04 18:08:00 +02:00
parent d4db9740f2
commit c6556da5fc
4 changed files with 145 additions and 28 deletions

View File

@ -84,7 +84,8 @@ public:
}
else if (compression_method == CompressionMethod::Zstd)
{
compressed_buffer = std::make_unique<ZstdDeflatingAppendableWriteBuffer>(std::move(file_buf), /* compression level = */ 3, /* append_to_existing_stream = */ mode == WriteMode::Append);
compressed_buffer = std::make_unique<ZstdDeflatingAppendableWriteBuffer>(
std::move(file_buf), /* compression level = */ 3, /* append_to_existing_file_ = */ mode == WriteMode::Append);
}
else
{
@ -140,7 +141,7 @@ private:
std::string filepath;
std::unique_ptr<WriteBufferFromFile> file_buf;
std::unique_ptr<WriteBufferWithOwnMemoryDecorator> compressed_buffer;
std::unique_ptr<ZstdDeflatingAppendableWriteBuffer> compressed_buffer;
uint64_t start_index;
};

View File

@ -1604,44 +1604,112 @@ TEST_P(CoordinationTest, TestStorageSnapshotDifferentCompressions)
EXPECT_EQ(restored_storage->session_and_timeout.size(), 2);
}
TEST_P(CoordinationTest, ChangelogInsertThreeTimes)
TEST_P(CoordinationTest, ChangelogInsertThreeTimesSmooth)
{
auto params = GetParam();
ChangelogDirTest test("./logs");
{
std::cerr << "================Firts time=====================\n";
DB::KeeperLogStore changelog("./logs", 100, true, params.enable_compression);
changelog.init(1, 0);
auto entry = getLogEntry("hello_world", 1000);
changelog.append(entry);
changelog.end_of_append_batch(0, 0);
EXPECT_EQ(changelog.next_slot(), 2);
}
{
std::cerr << "================Second time=====================\n";
DB::KeeperLogStore changelog("./logs", 100, true, params.enable_compression);
changelog.init(1, 0);
auto entry = getLogEntry("hello_world", 1000);
changelog.append(entry);
changelog.end_of_append_batch(0, 0);
EXPECT_EQ(changelog.next_slot(), 3);
}
{
std::cerr << "================Third time=====================\n";
DB::KeeperLogStore changelog("./logs", 100, true, params.enable_compression);
changelog.init(1, 0);
auto entry = getLogEntry("hello_world", 1000);
changelog.append(entry);
changelog.end_of_append_batch(0, 0);
EXPECT_EQ(changelog.next_slot(), 4);
}
{
std::cerr << "================Fourth time=====================\n";
DB::KeeperLogStore changelog("./logs", 100, true, params.enable_compression);
changelog.init(1, 0);
auto entry = getLogEntry("hello_world", 1000);
changelog.append(entry);
changelog.end_of_append_batch(0, 0);
EXPECT_EQ(changelog.next_slot(), 5);
}
}
TEST_P(CoordinationTest, ChangelogInsertMultipleTimesSmooth)
{
auto params = GetParam();
ChangelogDirTest test("./logs");
for (size_t i = 0; i < 36; ++i)
{
std::cerr << "================Firts time=====================\n";
DB::KeeperLogStore changelog("./logs", 100, true, params.enable_compression);
changelog.init(1, 0);
for (size_t j = 0; j < 7; ++j)
{
auto entry = getLogEntry("hello_world", 7);
changelog.append(entry);
}
changelog.end_of_append_batch(0, 0);
}
DB::KeeperLogStore changelog("./logs", 100, true, params.enable_compression);
changelog.init(1, 0);
EXPECT_EQ(changelog.next_slot(), 36 * 7 + 1);
}
TEST_P(CoordinationTest, ChangelogInsertThreeTimesHard)
{
auto params = GetParam();
ChangelogDirTest test("./logs");
std::cerr << "================Firts time=====================\n";
DB::KeeperLogStore changelog1("./logs", 100, true, params.enable_compression);
changelog1.init(1, 0);
auto entry = getLogEntry("hello_world", 1000);
changelog1.append(entry);
changelog1.end_of_append_batch(0, 0);
EXPECT_EQ(changelog1.next_slot(), 2);
std::cerr << "================Second time=====================\n";
DB::KeeperLogStore changelog2("./logs", 100, true, params.enable_compression);
changelog2.init(1, 0);
entry = getLogEntry("hello_world", 1000);
changelog2.append(entry);
changelog2.end_of_append_batch(0, 0);
EXPECT_EQ(changelog2.next_slot(), 3);
std::cerr << "================Third time=====================\n";
DB::KeeperLogStore changelog3("./logs", 100, true, params.enable_compression);
changelog3.init(1, 0);
entry = getLogEntry("hello_world", 1000);
changelog3.append(entry);
changelog3.end_of_append_batch(0, 0);
EXPECT_EQ(changelog3.next_slot(), 4);
std::cerr << "================Fourth time=====================\n";
DB::KeeperLogStore changelog4("./logs", 100, true, params.enable_compression);
changelog4.init(1, 0);
entry = getLogEntry("hello_world", 1000);
changelog4.append(entry);
changelog4.end_of_append_batch(0, 0);
EXPECT_EQ(changelog4.next_slot(), 5);
}
TEST_P(CoordinationTest, TestLogGap)
{
using namespace Coordination;

View File

@ -1,5 +1,7 @@
#include <IO/ZstdDeflatingAppendableWriteBuffer.h>
#include <Common/Exception.h>
#include <Common/logger_useful.h>
#include <IO/ReadBufferFromFile.h>
namespace DB
{
@ -10,10 +12,15 @@ namespace ErrorCodes
}
ZstdDeflatingAppendableWriteBuffer::ZstdDeflatingAppendableWriteBuffer(
std::unique_ptr<WriteBuffer> out_, int compression_level, bool append_to_existing_stream_,
size_t buf_size, char * existing_memory, size_t alignment)
: WriteBufferWithOwnMemoryDecorator(std::move(out_), buf_size, existing_memory, alignment)
, append_to_existing_stream(append_to_existing_stream_)
std::unique_ptr<WriteBufferFromFile> out_,
int compression_level,
bool append_to_existing_file_,
size_t buf_size,
char * existing_memory,
size_t alignment)
: BufferWithOwnMemory(buf_size, existing_memory, alignment)
, out(std::move(out_))
, append_to_existing_file(append_to_existing_file_)
{
cctx = ZSTD_createCCtx();
if (cctx == nullptr)
@ -31,13 +38,11 @@ void ZstdDeflatingAppendableWriteBuffer::nextImpl()
if (!offset())
return;
ZSTD_EndDirective mode = ZSTD_e_flush;
input.src = reinterpret_cast<unsigned char *>(working_buffer.begin());
input.size = offset();
input.pos = 0;
if (first_write && append_to_existing_stream)
if (first_write && append_to_existing_file && isNeedToAddEmptyBlock())
{
addEmptyBlock();
first_write = false;
@ -54,11 +59,12 @@ void ZstdDeflatingAppendableWriteBuffer::nextImpl()
output.size = out->buffer().size();
output.pos = out->offset();
size_t compression_result = ZSTD_compressStream2(cctx, &output, &input, mode);
size_t compression_result = ZSTD_compressStream2(cctx, &output, &input, ZSTD_e_flush);
if (ZSTD_isError(compression_result))
throw Exception(
ErrorCodes::ZSTD_ENCODER_FAILED, "Zstd stream encoding failed: error code: {}; zstd version: {}", ZSTD_getErrorName(compression_result), ZSTD_VERSION_STRING);
first_write = false;
out->position() = out->buffer().begin() + output.pos;
bool everything_was_compressed = (input.pos == input.size);
@ -73,6 +79,7 @@ void ZstdDeflatingAppendableWriteBuffer::nextImpl()
out->position() = out->buffer().begin();
throw;
}
}
ZstdDeflatingAppendableWriteBuffer::~ZstdDeflatingAppendableWriteBuffer()
@ -87,10 +94,22 @@ void ZstdDeflatingAppendableWriteBuffer::finalizeImpl()
/// To free cctx
finalizeZstd();
/// Nothing was written
return;
}
WriteBufferDecorator::finalizeImpl();
else
{
try
{
finalizeBefore();
out->finalize();
finalizeAfter();
}
catch (...)
{
/// Do not try to flush next time after exception.
out->position() = out->buffer().begin();
throw;
}
}
}
void ZstdDeflatingAppendableWriteBuffer::finalizeBefore()
@ -111,7 +130,7 @@ void ZstdDeflatingAppendableWriteBuffer::finalizeBefore()
while (remaining != 0)
{
if (ZSTD_isError(remaining))
throw Exception(ErrorCodes::ZSTD_ENCODER_FAILED, "zstd stream encoder end failed: error: '{}' zstd version: {}", ZSTD_getErrorName(remaining), ZSTD_VERSION_STRING);
throw Exception(ErrorCodes::ZSTD_ENCODER_FAILED, "Zstd stream encoder end failed: error: '{}' zstd version: {}", ZSTD_getErrorName(remaining), ZSTD_VERSION_STRING);
remaining = ZSTD_compressStream2(cctx, &output, &input, ZSTD_e_end);
}
@ -143,14 +162,35 @@ void ZstdDeflatingAppendableWriteBuffer::finalizeZstd()
void ZstdDeflatingAppendableWriteBuffer::addEmptyBlock()
{
/// HACK: https://github.com/facebook/zstd/issues/2090#issuecomment-620158967
static const char empty_block[3] = {0x01, 0x00, 0x00};
if (out->buffer().size() - out->offset() < sizeof(empty_block))
if (out->buffer().size() - out->offset() < ZSTD_CORRECT_TERMINATION_LAST_BLOCK.size())
out->next();
std::memcpy(out->buffer().begin() + out->offset(), empty_block, sizeof(empty_block));
std::memcpy(out->buffer().begin() + out->offset(), ZSTD_CORRECT_TERMINATION_LAST_BLOCK.data(), ZSTD_CORRECT_TERMINATION_LAST_BLOCK.size());
out->position() = out->buffer().begin() + out->offset() + sizeof(empty_block);
out->position() = out->buffer().begin() + out->offset() + ZSTD_CORRECT_TERMINATION_LAST_BLOCK.size();
}
bool ZstdDeflatingAppendableWriteBuffer::isNeedToAddEmptyBlock()
{
ReadBufferFromFile reader(out->getFileName());
auto fsize = reader.size();
if (fsize > 3)
{
std::array<char, 3> result;
reader.seek(fsize - 3, SEEK_SET);
reader.readStrict(result.data(), 3);
return result != ZSTD_CORRECT_TERMINATION_LAST_BLOCK;
}
else if (fsize > 0)
{
throw Exception(
ErrorCodes::ZSTD_ENCODER_FAILED,
"Trying to write to non-empty file '{}' with tiny size {}. It can lead to data corruption",
out->getFileName(), fsize);
}
return false;
}
}

View File

@ -4,6 +4,7 @@
#include <IO/CompressionMethod.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteBufferDecorator.h>
#include <IO/WriteBufferFromFile.h>
#include <zstd.h>
@ -20,13 +21,16 @@ namespace DB
/// said that there is no risks of compatibility issues https://github.com/facebook/zstd/issues/2090#issuecomment-620158967.
/// 2) Doesn't support internal ZSTD check-summing, because ZSTD checksums written at the end of frame (frame epilogue).
///
class ZstdDeflatingAppendableWriteBuffer : public WriteBufferWithOwnMemoryDecorator
class ZstdDeflatingAppendableWriteBuffer : public BufferWithOwnMemory<WriteBuffer>
{
public:
using ZSTDLastBlock = const std::array<char, 3>;
static inline constexpr ZSTDLastBlock ZSTD_CORRECT_TERMINATION_LAST_BLOCK = {0x01, 0x00, 0x00};
ZstdDeflatingAppendableWriteBuffer(
std::unique_ptr<WriteBuffer> out_,
std::unique_ptr<WriteBufferFromFile> out_,
int compression_level,
bool append_to_existing_stream_, /// if true then out mustn't be empty
bool append_to_existing_file_,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
size_t alignment = 0);
@ -39,6 +43,8 @@ public:
out->sync();
}
WriteBuffer * getNestedBuffer() { return out.get(); }
private:
/// NOTE: will fill compressed data to the out.working_buffer, but will not call out.next method until the buffer is full
void nextImpl() override;
@ -50,15 +56,17 @@ private:
/// After the first call to this function, subsequent calls will have no effect and
/// an attempt to write to this buffer will result in exception.
void finalizeImpl() override;
void finalizeBefore() override;
void finalizeAfter() override;
void finalizeBefore();
void finalizeAfter();
void finalizeZstd();
bool isNeedToAddEmptyBlock();
/// Adding zstd empty block to out.working_buffer
void addEmptyBlock();
/// We appending data to existing stream so on the first nextImpl call we
/// will append empty block.
bool append_to_existing_stream;
std::unique_ptr<WriteBufferFromFile> out;
bool append_to_existing_file = false;
ZSTD_CCtx * cctx;
ZSTD_inBuffer input;
ZSTD_outBuffer output;