Fix incremental backups for Log family.

Vitaly Baranov 2022-08-30 23:36:02 +02:00
parent 0ebcbecb92
commit 007ae0e6cc
6 changed files with 105 additions and 7 deletions

src/Backups/BackupImpl.cpp

@@ -625,7 +625,7 @@ CheckBackupResult checkBaseBackupForFile(const SizeAndChecksum & base_backup_inf
 {
     /// We cannot reuse base backup because our file is smaller
     /// than file stored in previous backup
-    if (new_entry_info.size > base_backup_info.first)
+    if (new_entry_info.size < base_backup_info.first)
         return CheckBackupResult::HasNothing;
 
     if (base_backup_info.first == new_entry_info.size)
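
The one-character fix above is the heart of the commit: with the old `>` comparison, a file that had grown since the base backup was taken (which is exactly what an INSERT into a Log-family table produces, since its data files are append-only) was classified as having nothing in common with the base backup, so incremental backups stored it in full. Below is a minimal, self-contained sketch of the decision this function makes; the HasPrefix/HasFull enum values and the simplified SizeAndChecksum/FileInfo types are assumptions for illustration, not the exact ClickHouse definitions:

#include <cstdint>
#include <utility>

/// Simplified stand-ins; in the real code the checksum is 128-bit and
/// checksum verification happens around this size check.
enum class CheckBackupResult
{
    HasNothing, /// base backup is of no use for this file
    HasPrefix,  /// base backup holds a prefix of this file
    HasFull,    /// base backup holds this file in full
};

using SizeAndChecksum = std::pair<uint64_t, uint64_t>;

struct FileInfo { uint64_t size = 0; };

CheckBackupResult checkBaseBackupForFile(const SizeAndChecksum & base_backup_info, const FileInfo & new_entry_info)
{
    /// New file is smaller than the copy in the base backup: the base copy
    /// cannot be a prefix of it. (The old `>` instead rejected the opposite,
    /// common case: a file that has grown since the base backup was taken.)
    if (new_entry_info.size < base_backup_info.first)
        return CheckBackupResult::HasNothing;

    /// Same size: the base backup may already hold the whole file.
    if (base_backup_info.first == new_entry_info.size)
        return CheckBackupResult::HasFull;

    /// New file is larger: the base copy can serve as a prefix, and only the
    /// appended tail needs to be stored in the incremental backup.
    return CheckBackupResult::HasPrefix;
}

With `<`, only a file that shrank is rejected outright; an unchanged file hits the equal-size branch, and a grown file takes the prefix path, which is what lets the incremental backup store just the appended tail — the property the new test below asserts via x_bin_size2 == x_bin_size + x_bin_size_inc.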

src/IO/ConcatSeekableReadBuffer.cpp

@@ -9,6 +9,11 @@ namespace ErrorCodes
     extern const int ARGUMENT_OUT_OF_BOUND;
 }
 
+ConcatSeekableReadBuffer::BufferInfo::BufferInfo(BufferInfo && src) noexcept
+    : in(std::exchange(src.in, nullptr)), own_in(std::exchange(src.own_in, false)), size(std::exchange(src.size, 0))
+{
+}
+
 ConcatSeekableReadBuffer::BufferInfo::~BufferInfo()
 {
     if (own_in)
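
The two ConcatSeekableReadBuffer hunks replace a defaulted move constructor with a hand-written one. A defaulted move copies the raw `in` pointer and the `own_in` flag verbatim, so after a move (for example, when a std::vector<BufferInfo> reallocates) both the source and the destination believe they own the same SeekableReadBuffer, and the destructor shown above would delete it twice. The hand-written version transfers ownership with std::exchange instead. A minimal sketch of the same idiom with hypothetical stand-in types (Resource/Holder are not ClickHouse names):

#include <utility>

struct Resource {};

struct Holder
{
    Resource * res = nullptr;
    bool owns = false;

    Holder() = default;

    /// Transfer ownership: the moved-from Holder is left empty and non-owning,
    /// so its destructor becomes a no-op.
    Holder(Holder && src) noexcept
        : res(std::exchange(src.res, nullptr)), owns(std::exchange(src.owns, false))
    {
    }

    ~Holder()
    {
        if (owns)
            delete res;
    }
};

int main()
{
    Holder a;
    a.res = new Resource;
    a.owns = true;

    Holder b = std::move(a); /// with `= default`, both a and b would still "own" res
    /// b's destructor frees the Resource exactly once; a's destructor does nothing.
}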

src/IO/ConcatSeekableReadBuffer.h

@@ -30,7 +30,7 @@ private:
     struct BufferInfo
     {
         BufferInfo() = default;
-        BufferInfo(BufferInfo &&) = default;
+        BufferInfo(BufferInfo && src) noexcept;
         ~BufferInfo();
         SeekableReadBuffer * in = nullptr;
         bool own_in = false;

src/IO/HashingReadBuffer.h

@@ -18,29 +18,38 @@ public:
     {
         working_buffer = in.buffer();
         pos = in.position();
+        hashing_begin = pos;
+    }
 
-        /// calculate hash from the data already read
-        if (!working_buffer.empty())
+    uint128 getHash()
+    {
+        if (pos > hashing_begin)
         {
-            calculateHash(pos, working_buffer.end() - pos);
+            calculateHash(hashing_begin, pos - hashing_begin);
+            hashing_begin = pos;
         }
+        return IHashingBuffer<ReadBuffer>::getHash();
     }
 
 private:
     bool nextImpl() override
     {
+        if (pos > hashing_begin)
+            calculateHash(hashing_begin, pos - hashing_begin);
+
         in.position() = pos;
         bool res = in.next();
         working_buffer = in.buffer();
-        pos = in.position();
-        // `pos` may be different from working_buffer.begin() when using sophisticated ReadBuffers.
-        calculateHash(pos, working_buffer.end() - pos);
+        pos = in.position();
+        hashing_begin = pos;
 
         return res;
     }
 
     ReadBuffer & in;
+    BufferBase::Position hashing_begin;
 };
 
 }
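
The HashingReadBuffer change switches from eager to deferred hashing. Previously, the constructor and nextImpl() hashed each whole internal buffer as soon as it became available, so the hash covered bytes the consumer had not actually read. Now hashing_begin tracks the first unhashed byte, and data is folded into the hash only up to pos — either when the next buffer is fetched or when getHash() is called. That makes "the hash of the first N bytes read" well defined, which is what incremental backups need in order to compare the prefix of a grown file against the checksum stored in the base backup. A toy, self-contained illustration of the pattern, using FNV-1a as a stand-in for the real hash and invented names throughout:

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <string>

struct LazyHashingReader
{
    const char * data = nullptr;            // underlying, already-buffered bytes
    std::size_t size = 0;
    std::size_t pos = 0;                    // how far the consumer has read
    std::size_t hashing_begin = 0;          // first byte not yet folded into the hash
    uint64_t hash = 1469598103934665603ULL; // FNV-1a offset basis

    LazyHashingReader(const char * data_, std::size_t size_) : data(data_), size(size_) {}

    void read(std::size_t n) { pos = std::min(size, pos + n); }

    /// Hash lazily: fold in only the consumed-but-unhashed range [hashing_begin, pos).
    uint64_t getHash()
    {
        for (; hashing_begin < pos; ++hashing_begin)
        {
            hash ^= static_cast<unsigned char>(data[hashing_begin]);
            hash *= 1099511628211ULL; // FNV-1a prime
        }
        return hash;
    }
};

int main()
{
    std::string s = "0123456789";

    LazyHashingReader whole(s.data(), s.size());
    whole.read(4); // consume only "0123"; six more bytes are buffered but unread

    LazyHashingReader prefix(s.data(), 4);
    prefix.read(4);

    /// Prints 1: hashing four consumed bytes of a ten-byte buffer matches
    /// hashing a four-byte buffer outright; an eager scheme would have folded
    /// in all ten prefetched bytes.
    std::cout << (whole.getHash() == prefix.getHash()) << '\n';
}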

src/Storages/MergeTree/PartMetadataManagerWithCache.cpp

@@ -191,6 +191,7 @@ void PartMetadataManagerWithCache::getKeysAndCheckSums(Strings & keys, std::vect
     {
         ReadBufferFromString rbuf(values[i]);
         HashingReadBuffer hbuf(rbuf);
+        hbuf.ignoreAll();
         checksums.push_back(hbuf.getHash());
     }
 }
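
This one-line change is the flip side of the lazy hashing above: getHash() now covers only the bytes that have been consumed, and a freshly constructed HashingReadBuffer has consumed nothing, so a caller that wants the checksum of a whole in-memory value must drain the buffer first; ignoreAll() reads and discards everything that is left. A compact sketch of the required call sequence with hypothetical stand-in types (only the ignoreAll/getHash names mirror the real calls in the hunk):

#include <cstddef>
#include <cstdint>
#include <string>

struct HashingReader
{
    const std::string & value;
    std::size_t pos = 0;
    std::size_t hashing_begin = 0;
    uint64_t hash = 1469598103934665603ULL;

    explicit HashingReader(const std::string & value_) : value(value_) {}

    /// Consume (and thus eventually hash) everything that is left.
    void ignoreAll() { pos = value.size(); }

    uint64_t getHash()
    {
        for (; hashing_begin < pos; ++hashing_begin) // lazily fold in consumed bytes
        {
            hash ^= static_cast<unsigned char>(value[hashing_begin]);
            hash *= 1099511628211ULL;
        }
        return hash;
    }
};

uint64_t checksumOf(const std::string & value)
{
    HashingReader hbuf(value);
    hbuf.ignoreAll(); /// without this, getHash() would cover zero bytes
    return hbuf.getHash();
}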

tests/integration/test_backup_restore_new/test.py

@@ -224,6 +224,89 @@ def test_incremental_backup_after_renaming_table():
     assert instance.query("SELECT count(), sum(x) FROM test.table2") == "100\t4950\n"
 
 
+def test_incremental_backup_for_log_family():
+    backup_name = new_backup_name()
+    create_and_fill_table(engine="Log")
+
+    assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
+    instance.query(f"BACKUP TABLE test.table TO {backup_name}")
+
+    instance.query("INSERT INTO test.table VALUES (65, 'a'), (66, 'b')")
+    assert instance.query("SELECT count(), sum(x) FROM test.table") == "102\t5081\n"
+
+    backup_name2 = new_backup_name()
+    instance.query(f"BACKUP TABLE test.table TO {backup_name2}")
+
+    backup_name_inc = new_backup_name()
+    instance.query(
+        f"BACKUP TABLE test.table TO {backup_name_inc} SETTINGS base_backup = {backup_name}"
+    )
+
+    metadata_path = os.path.join(
+        get_path_to_backup(backup_name), "metadata/test/table.sql"
+    )
+    metadata_path2 = os.path.join(
+        get_path_to_backup(backup_name2), "metadata/test/table.sql"
+    )
+    metadata_path_inc = os.path.join(
+        get_path_to_backup(backup_name_inc), "metadata/test/table.sql"
+    )
+
+    assert os.path.isfile(metadata_path)
+    assert os.path.isfile(metadata_path2)
+    assert not os.path.isfile(metadata_path_inc)
+    assert os.path.getsize(metadata_path) > 0
+    assert os.path.getsize(metadata_path) == os.path.getsize(metadata_path2)
+
+    x_bin_path = os.path.join(get_path_to_backup(backup_name), "data/test/table/x.bin")
+    y_bin_path = os.path.join(get_path_to_backup(backup_name), "data/test/table/y.bin")
+
+    x_bin_path2 = os.path.join(
+        get_path_to_backup(backup_name2), "data/test/table/x.bin"
+    )
+    y_bin_path2 = os.path.join(
+        get_path_to_backup(backup_name2), "data/test/table/y.bin"
+    )
+
+    x_bin_path_inc = os.path.join(
+        get_path_to_backup(backup_name_inc), "data/test/table/x.bin"
+    )
+    y_bin_path_inc = os.path.join(
+        get_path_to_backup(backup_name_inc), "data/test/table/y.bin"
+    )
+
+    assert os.path.isfile(x_bin_path)
+    assert os.path.isfile(y_bin_path)
+    assert os.path.isfile(x_bin_path2)
+    assert os.path.isfile(y_bin_path2)
+    assert os.path.isfile(x_bin_path_inc)
+    assert os.path.isfile(y_bin_path_inc)
+
+    x_bin_size = os.path.getsize(x_bin_path)
+    y_bin_size = os.path.getsize(y_bin_path)
+
+    x_bin_size2 = os.path.getsize(x_bin_path2)
+    y_bin_size2 = os.path.getsize(y_bin_path2)
+
+    x_bin_size_inc = os.path.getsize(x_bin_path_inc)
+    y_bin_size_inc = os.path.getsize(y_bin_path_inc)
+
+    assert x_bin_size > 0
+    assert y_bin_size > 0
+    assert x_bin_size2 > 0
+    assert y_bin_size2 > 0
+    assert x_bin_size_inc > 0
+    assert y_bin_size_inc > 0
+
+    assert x_bin_size2 == x_bin_size + x_bin_size_inc
+    assert y_bin_size2 == y_bin_size + y_bin_size_inc
+
+    instance.query(f"RESTORE TABLE test.table AS test.table2 FROM {backup_name_inc}")
+    assert instance.query("SELECT count(), sum(x) FROM test.table2") == "102\t5081\n"
+
+
 def test_backup_not_found_or_already_exists():
     backup_name = new_backup_name()