From 007ae0e6cc5e92fc04df7590458f68a79f3432d8 Mon Sep 17 00:00:00 2001 From: Vitaly Baranov Date: Tue, 30 Aug 2022 23:36:02 +0200 Subject: [PATCH] Fix incremental backups for Log family. --- src/Backups/BackupImpl.cpp | 2 +- src/IO/ConcatSeekableReadBuffer.cpp | 5 ++ src/IO/ConcatSeekableReadBuffer.h | 2 +- src/IO/HashingReadBuffer.h | 19 +++-- .../PartMetadataManagerWithCache.cpp | 1 + .../test_backup_restore_new/test.py | 83 +++++++++++++++++++ 6 files changed, 105 insertions(+), 7 deletions(-) diff --git a/src/Backups/BackupImpl.cpp b/src/Backups/BackupImpl.cpp index f6442545f48..c9bd0be8e47 100644 --- a/src/Backups/BackupImpl.cpp +++ b/src/Backups/BackupImpl.cpp @@ -625,7 +625,7 @@ CheckBackupResult checkBaseBackupForFile(const SizeAndChecksum & base_backup_inf { /// We cannot reuse base backup because our file is smaller /// than file stored in previous backup - if (new_entry_info.size > base_backup_info.first) + if (new_entry_info.size < base_backup_info.first) return CheckBackupResult::HasNothing; if (base_backup_info.first == new_entry_info.size) diff --git a/src/IO/ConcatSeekableReadBuffer.cpp b/src/IO/ConcatSeekableReadBuffer.cpp index c5d48376e2f..0943d1eac45 100644 --- a/src/IO/ConcatSeekableReadBuffer.cpp +++ b/src/IO/ConcatSeekableReadBuffer.cpp @@ -9,6 +9,11 @@ namespace ErrorCodes extern const int ARGUMENT_OUT_OF_BOUND; } +ConcatSeekableReadBuffer::BufferInfo::BufferInfo(BufferInfo && src) noexcept + : in(std::exchange(src.in, nullptr)), own_in(std::exchange(src.own_in, false)), size(std::exchange(src.size, 0)) +{ +} + ConcatSeekableReadBuffer::BufferInfo::~BufferInfo() { if (own_in) diff --git a/src/IO/ConcatSeekableReadBuffer.h b/src/IO/ConcatSeekableReadBuffer.h index 5d7dca82524..c8c16c5d887 100644 --- a/src/IO/ConcatSeekableReadBuffer.h +++ b/src/IO/ConcatSeekableReadBuffer.h @@ -30,7 +30,7 @@ private: struct BufferInfo { BufferInfo() = default; - BufferInfo(BufferInfo &&) = default; + BufferInfo(BufferInfo && src) noexcept; ~BufferInfo(); SeekableReadBuffer * in = nullptr; bool own_in = false; diff --git a/src/IO/HashingReadBuffer.h b/src/IO/HashingReadBuffer.h index 5d42c64478c..a0a029e6f85 100644 --- a/src/IO/HashingReadBuffer.h +++ b/src/IO/HashingReadBuffer.h @@ -18,29 +18,38 @@ public: { working_buffer = in.buffer(); pos = in.position(); + hashing_begin = pos; + } - /// calculate hash from the data already read - if (!working_buffer.empty()) + uint128 getHash() + { + if (pos > hashing_begin) { - calculateHash(pos, working_buffer.end() - pos); + calculateHash(hashing_begin, pos - hashing_begin); + hashing_begin = pos; } + return IHashingBuffer::getHash(); } private: bool nextImpl() override { + if (pos > hashing_begin) + calculateHash(hashing_begin, pos - hashing_begin); + in.position() = pos; bool res = in.next(); working_buffer = in.buffer(); - pos = in.position(); // `pos` may be different from working_buffer.begin() when using sophisticated ReadBuffers. - calculateHash(pos, working_buffer.end() - pos); + pos = in.position(); + hashing_begin = pos; return res; } ReadBuffer & in; + BufferBase::Position hashing_begin; }; } diff --git a/src/Storages/MergeTree/PartMetadataManagerWithCache.cpp b/src/Storages/MergeTree/PartMetadataManagerWithCache.cpp index 9930aca2576..5a291373e6c 100644 --- a/src/Storages/MergeTree/PartMetadataManagerWithCache.cpp +++ b/src/Storages/MergeTree/PartMetadataManagerWithCache.cpp @@ -191,6 +191,7 @@ void PartMetadataManagerWithCache::getKeysAndCheckSums(Strings & keys, std::vect { ReadBufferFromString rbuf(values[i]); HashingReadBuffer hbuf(rbuf); + hbuf.ignoreAll(); checksums.push_back(hbuf.getHash()); } } diff --git a/tests/integration/test_backup_restore_new/test.py b/tests/integration/test_backup_restore_new/test.py index 2fe3bb99e45..ca0d6a632a0 100644 --- a/tests/integration/test_backup_restore_new/test.py +++ b/tests/integration/test_backup_restore_new/test.py @@ -224,6 +224,89 @@ def test_incremental_backup_after_renaming_table(): assert instance.query("SELECT count(), sum(x) FROM test.table2") == "100\t4950\n" +def test_incremental_backup_for_log_family(): + backup_name = new_backup_name() + create_and_fill_table(engine="Log") + + assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n" + instance.query(f"BACKUP TABLE test.table TO {backup_name}") + + instance.query("INSERT INTO test.table VALUES (65, 'a'), (66, 'b')") + + assert instance.query("SELECT count(), sum(x) FROM test.table") == "102\t5081\n" + + backup_name2 = new_backup_name() + instance.query(f"BACKUP TABLE test.table TO {backup_name2}") + + backup_name_inc = new_backup_name() + instance.query( + f"BACKUP TABLE test.table TO {backup_name_inc} SETTINGS base_backup = {backup_name}" + ) + + metadata_path = os.path.join( + get_path_to_backup(backup_name), "metadata/test/table.sql" + ) + + metadata_path2 = os.path.join( + get_path_to_backup(backup_name2), "metadata/test/table.sql" + ) + + metadata_path_inc = os.path.join( + get_path_to_backup(backup_name_inc), "metadata/test/table.sql" + ) + + assert os.path.isfile(metadata_path) + assert os.path.isfile(metadata_path2) + assert not os.path.isfile(metadata_path_inc) + assert os.path.getsize(metadata_path) > 0 + assert os.path.getsize(metadata_path) == os.path.getsize(metadata_path2) + + x_bin_path = os.path.join(get_path_to_backup(backup_name), "data/test/table/x.bin") + y_bin_path = os.path.join(get_path_to_backup(backup_name), "data/test/table/y.bin") + + x_bin_path2 = os.path.join( + get_path_to_backup(backup_name2), "data/test/table/x.bin" + ) + y_bin_path2 = os.path.join( + get_path_to_backup(backup_name2), "data/test/table/y.bin" + ) + + x_bin_path_inc = os.path.join( + get_path_to_backup(backup_name_inc), "data/test/table/x.bin" + ) + + y_bin_path_inc = os.path.join( + get_path_to_backup(backup_name_inc), "data/test/table/y.bin" + ) + + assert os.path.isfile(x_bin_path) + assert os.path.isfile(y_bin_path) + assert os.path.isfile(x_bin_path2) + assert os.path.isfile(y_bin_path2) + assert os.path.isfile(x_bin_path_inc) + assert os.path.isfile(y_bin_path_inc) + + x_bin_size = os.path.getsize(x_bin_path) + y_bin_size = os.path.getsize(y_bin_path) + x_bin_size2 = os.path.getsize(x_bin_path2) + y_bin_size2 = os.path.getsize(y_bin_path2) + x_bin_size_inc = os.path.getsize(x_bin_path_inc) + y_bin_size_inc = os.path.getsize(y_bin_path_inc) + + assert x_bin_size > 0 + assert y_bin_size > 0 + assert x_bin_size2 > 0 + assert y_bin_size2 > 0 + assert x_bin_size_inc > 0 + assert y_bin_size_inc > 0 + assert x_bin_size2 == x_bin_size + x_bin_size_inc + assert y_bin_size2 == y_bin_size + y_bin_size_inc + + instance.query(f"RESTORE TABLE test.table AS test.table2 FROM {backup_name_inc}") + + assert instance.query("SELECT count(), sum(x) FROM test.table2") == "102\t5081\n" + + def test_backup_not_found_or_already_exists(): backup_name = new_backup_name()