Support BACKUP & RESTORE for log family.

Vitaly Baranov 2021-10-26 12:48:31 +03:00
parent 9d967e9883
commit a1f29d31ea
11 changed files with 331 additions and 12 deletions
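
In user-facing terms, the commit makes BACKUP TABLE / RESTORE TABLE work for Log-family tables (Log, TinyLog, StripeLog). A minimal sketch of the flow, written in the style of the integration test changed below; the RESTORE statement is assumed as the counterpart of the BACKUP statement that test issues:

# Sketch only, not part of the commit: assumes the ClickHouseCluster harness
# used by the integration tests, with one running server instance.
from helpers.cluster import ClickHouseCluster

cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance')

def backup_then_restore(engine="StripeLog"):
    # Any Log-family engine now works here: Log, TinyLog or StripeLog.
    instance.query("CREATE DATABASE IF NOT EXISTS test")
    instance.query(f"CREATE TABLE test.table(x UInt32, y String) ENGINE={engine}")
    instance.query("INSERT INTO test.table SELECT number, toString(number) FROM numbers(100)")
    instance.query("BACKUP TABLE test.table TO 'test-backup-1'")
    instance.query("DROP TABLE test.table")
    instance.query("RESTORE TABLE test.table FROM 'test-backup-1'")
    assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"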

src/Common/FileChecker.cpp

@@ -38,6 +38,11 @@ void FileChecker::setPath(const String & file_info_path_)
     files_info_path = file_info_path_;
 }
 
+String FileChecker::getPath() const
+{
+    return files_info_path;
+}
+
 void FileChecker::update(const String & full_file_path)
 {
     bool exists = disk->exists(full_file_path);

src/Common/FileChecker.h

@@ -13,7 +13,9 @@ class FileChecker
 {
 public:
     FileChecker(DiskPtr disk_, const String & file_info_path_);
 
     void setPath(const String & file_info_path_);
+    String getPath() const;
+
     void update(const String & full_file_path);
     void setEmpty(const String & full_file_path);

src/Storages/IStorage.cpp

@@ -218,7 +218,7 @@ bool IStorage::isStaticStorage() const
     return false;
 }
 
-BackupEntries IStorage::backup(const ASTs &, ContextPtr) const
+BackupEntries IStorage::backup(const ASTs &, ContextPtr)
 {
     throw Exception("Table engine " + getName() + " doesn't support backups", ErrorCodes::NOT_IMPLEMENTED);
 }

src/Storages/IStorage.h

@@ -205,7 +205,7 @@ public:
     NameDependencies getDependentViewsByColumn(ContextPtr context) const;
 
     /// Prepares entries to backup data of the storage.
-    virtual BackupEntries backup(const ASTs & partitions, ContextPtr context) const;
+    virtual BackupEntries backup(const ASTs & partitions, ContextPtr context);
 
     /// Extract data from the backup and put it to the storage.
     virtual RestoreDataTasks restoreFromBackup(const BackupPtr & backup, const String & data_path_in_backup, const ASTs & partitions, ContextMutablePtr context);

src/Storages/MergeTree/MergeTreeData.cpp

@@ -3500,7 +3500,7 @@ Pipe MergeTreeData::alterPartition(
 }
 
-BackupEntries MergeTreeData::backup(const ASTs & partitions, ContextPtr local_context) const
+BackupEntries MergeTreeData::backup(const ASTs & partitions, ContextPtr local_context)
 {
     DataPartsVector data_parts;
     if (partitions.empty())
@@ -3522,7 +3522,7 @@ BackupEntries MergeTreeData::backupDataParts(const DataPartsVector & data_parts)
         auto temp_dir_it = temp_dirs.find(disk);
         if (temp_dir_it == temp_dirs.end())
-            temp_dir_it = temp_dirs.emplace(disk, std::make_shared<TemporaryFileOnDisk>(disk, "tmp_backup_")).first;
+            temp_dir_it = temp_dirs.emplace(disk, std::make_shared<TemporaryFileOnDisk>(disk, "tmp/backup_")).first;
         auto temp_dir_owner = temp_dir_it->second;
         fs::path temp_dir = temp_dir_owner->getPath();

src/Storages/MergeTree/MergeTreeData.h

@@ -630,7 +630,7 @@ public:
         TableLockHolder & table_lock_holder);
 
     /// Prepares entries to backup data of the storage.
-    BackupEntries backup(const ASTs & partitions, ContextPtr context) const override;
+    BackupEntries backup(const ASTs & partitions, ContextPtr context) override;
 
     static BackupEntries backupDataParts(const DataPartsVector & data_parts);
 
     /// Extract data from the backup and put it to the storage.

src/Storages/StorageLog.cpp

@@ -12,6 +12,7 @@
 #include <IO/ReadHelpers.h>
 #include <IO/WriteBufferFromFileBase.h>
 #include <IO/WriteHelpers.h>
+#include <IO/copyData.h>
 
 #include <Compression/CompressedReadBuffer.h>
 #include <Compression/CompressedWriteBuffer.h>
@@ -27,6 +28,11 @@
 #include <QueryPipeline/Pipe.h>
 #include <Processors/Sinks/SinkToStorage.h>
 
+#include <Backups/BackupEntryFromImmutableFile.h>
+#include <Backups/BackupEntryFromMemory.h>
+#include <Backups/BackupEntryFromSmallFile.h>
+#include <Backups/IBackup.h>
+#include <Disks/TemporaryFileOnDisk.h>
 
 #include <cassert>
 #include <chrono>
@@ -46,6 +52,7 @@ namespace ErrorCodes
     extern const int SIZES_OF_MARKS_FILES_ARE_INCONSISTENT;
     extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
     extern const int INCORRECT_FILE_NAME;
+    extern const int NOT_IMPLEMENTED;
 }
 
 /// NOTE: The lock `StorageLog::rwlock` is NOT kept locked while reading,
@@ -879,6 +886,162 @@ IStorage::ColumnSizeByName StorageLog::getColumnSizes() const
 }
 
+BackupEntries StorageLog::backup(const ASTs & partitions, ContextPtr context)
+{
+    if (!partitions.empty())
+        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Table engine {} doesn't support partitions", getName());
+
+    auto lock_timeout = getLockTimeout(context);
+    loadMarks(lock_timeout);
+
+    ReadLock lock{rwlock, lock_timeout};
+    if (!lock)
+        throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
+
+    if (!num_data_files || !file_checker.getFileSize(data_files[INDEX_WITH_REAL_ROW_COUNT].path))
+        return {};
+
+    auto temp_dir_owner = std::make_shared<TemporaryFileOnDisk>(disk, "tmp/backup_");
+    auto temp_dir = temp_dir_owner->getPath();
+    disk->createDirectories(temp_dir);
+
+    BackupEntries backup_entries;
+
+    /// *.bin
+    for (const auto & data_file : data_files)
+    {
+        /// We make a copy of the data file because it can be changed later in write() or in truncate().
+        String data_file_name = fileName(data_file.path);
+        String temp_file_path = temp_dir + "/" + data_file_name;
+        disk->copy(data_file.path, disk, temp_file_path);
+        backup_entries.emplace_back(
+            data_file_name,
+            std::make_unique<BackupEntryFromImmutableFile>(
+                disk, temp_file_path, file_checker.getFileSize(data_file.path), std::nullopt, temp_dir_owner));
+    }
+
+    /// __marks.mrk
+    if (use_marks_file)
+    {
+        /// We make a copy of the data file because it can be changed later in write() or in truncate().
+        String marks_file_name = fileName(marks_file_path);
+        String temp_file_path = temp_dir + "/" + marks_file_name;
+        disk->copy(marks_file_path, disk, temp_file_path);
+        backup_entries.emplace_back(
+            marks_file_name,
+            std::make_unique<BackupEntryFromImmutableFile>(
+                disk, temp_file_path, file_checker.getFileSize(marks_file_path), std::nullopt, temp_dir_owner));
+    }
+
+    /// sizes.json
+    String files_info_path = file_checker.getPath();
+    backup_entries.emplace_back(fileName(files_info_path), std::make_unique<BackupEntryFromSmallFile>(disk, files_info_path));
+
+    /// columns.txt
+    backup_entries.emplace_back(
+        "columns.txt", std::make_unique<BackupEntryFromMemory>(getInMemoryMetadata().getColumns().getAllPhysical().toString()));
+
+    /// count.txt
+    if (use_marks_file)
+    {
+        size_t num_rows = data_files[INDEX_WITH_REAL_ROW_COUNT].marks.empty() ? 0 : data_files[INDEX_WITH_REAL_ROW_COUNT].marks.back().rows;
+        backup_entries.emplace_back("count.txt", std::make_unique<BackupEntryFromMemory>(toString(num_rows)));
+    }
+
+    return backup_entries;
+}
+
+RestoreDataTasks StorageLog::restoreFromBackup(const BackupPtr & backup, const String & data_path_in_backup, const ASTs & partitions, ContextMutablePtr context)
+{
+    if (!partitions.empty())
+        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Table engine {} doesn't support partitions", getName());
+
+    auto restore_task = [this, backup, data_path_in_backup, context]()
+    {
+        auto lock_timeout = getLockTimeout(context);
+        WriteLock lock{rwlock, lock_timeout};
+        if (!lock)
+            throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
+
+        if (!num_data_files)
+            return;
+
+        /// Load the marks if not loaded yet. We have to do that now because we're going to update these marks.
+        loadMarks(lock);
+
+        /// If there were no files, save zero file sizes to be able to rollback in case of error.
+        saveFileSizes(lock);
+
+        try
+        {
+            /// Append data files.
+            for (const auto & data_file : data_files)
+            {
+                String file_path_in_backup = data_path_in_backup + fileName(data_file.path);
+                auto backup_entry = backup->read(file_path_in_backup);
+                auto in = backup_entry->getReadBuffer();
+                auto out = disk->writeFile(data_file.path, max_compress_block_size, WriteMode::Append);
+                copyData(*in, *out);
+            }
+
+            if (use_marks_file)
+            {
+                /// Append marks.
+                size_t num_extra_marks = 0;
+                String file_path_in_backup = data_path_in_backup + fileName(marks_file_path);
+                size_t file_size = backup->getSize(file_path_in_backup);
+                if (file_size % (num_data_files * sizeof(Mark)) != 0)
+                    throw Exception("Size of marks file is inconsistent", ErrorCodes::SIZES_OF_MARKS_FILES_ARE_INCONSISTENT);
+
+                num_extra_marks = file_size / (num_data_files * sizeof(Mark));
+
+                size_t num_marks = data_files[0].marks.size();
+                for (auto & data_file : data_files)
+                    data_file.marks.reserve(num_marks + num_extra_marks);
+
+                std::vector<size_t> old_data_sizes;
+                std::vector<size_t> old_num_rows;
+                old_data_sizes.resize(num_data_files);
+                old_num_rows.resize(num_data_files);
+                for (size_t i = 0; i != num_data_files; ++i)
+                {
+                    old_data_sizes[i] = file_checker.getFileSize(data_files[i].path);
+                    old_num_rows[i] = num_marks ? data_files[i].marks[num_marks - 1].rows : 0;
+                }
+
+                auto backup_entry = backup->read(file_path_in_backup);
+                auto marks_rb = backup_entry->getReadBuffer();
+
+                for (size_t i = 0; i != num_extra_marks; ++i)
+                {
+                    for (size_t j = 0; j != num_data_files; ++j)
+                    {
+                        Mark mark;
+                        mark.read(*marks_rb);
+                        mark.rows += old_num_rows[j];     /// Adjust the number of rows.
+                        mark.offset += old_data_sizes[j]; /// Adjust the offset.
+                        data_files[j].marks.push_back(mark);
+                    }
+                }
+            }
+
+            /// Finish writing.
+            saveMarks(lock);
+            saveFileSizes(lock);
+        }
+        catch (...)
+        {
+            /// Rollback partial writes.
+            file_checker.repair();
+            removeUnsavedMarks(lock);
+            throw;
+        }
+    };
+    return {restore_task};
+}
+
 void registerStorageLog(StorageFactory & factory)
 {
     StorageFactory::StorageFeatures features{
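
The subtle step in StorageLog::restoreFromBackup above is the mark fix-up. A Log mark stores a cumulative row count and a byte offset into its data file, so each mark read from the backup must be shifted by the table's pre-restore totals before it is appended; the file_size % (num_data_files * sizeof(Mark)) check works because the marks file holds one fixed-size record per (mark, data file) pair. A small sketch of the arithmetic, with hypothetical numbers:

# Hypothetical numbers illustrating the mark fix-up in restoreFromBackup.
old_num_rows = 100     # cumulative rows recorded by the last pre-restore mark
old_data_size = 4096   # pre-restore size of this .bin file, in bytes

# (rows, offset) pairs as they were stored in the backup's marks file.
backup_marks = [(50, 0), (80, 2048)]

# Shift each restored mark so it stays cumulative for the combined file.
appended = [(rows + old_num_rows, offset + old_data_size)
            for rows, offset in backup_marks]
assert appended == [(150, 4096), (180, 6144)]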

src/Storages/StorageLog.h

@@ -51,6 +51,9 @@
     bool supportsSubcolumns() const override { return true; }
     ColumnSizeByName getColumnSizes() const override;
 
+    BackupEntries backup(const ASTs & partitions, ContextPtr context) override;
+    RestoreDataTasks restoreFromBackup(const BackupPtr & backup, const String & data_path_in_backup, const ASTs & partitions, ContextMutablePtr context) override;
+
 protected:
     /** Attach the table with the appropriate name, along the appropriate path (with / at the end),
       * (the correctness of names and paths is not verified)

src/Storages/StorageStripeLog.cpp

@@ -9,10 +9,12 @@
 #include <Common/Exception.h>
 
 #include <IO/WriteBufferFromFileBase.h>
 #include <Compression/CompressedReadBuffer.h>
+#include <Compression/CompressedReadBufferFromFile.h>
 #include <Compression/CompressedWriteBuffer.h>
 #include <IO/ReadHelpers.h>
 #include <IO/WriteHelpers.h>
+#include <IO/copyData.h>
 
 #include <Formats/NativeReader.h>
 #include <Formats/NativeWriter.h>
@@ -33,6 +35,13 @@
 #include <Processors/Sinks/SinkToStorage.h>
 #include <QueryPipeline/Pipe.h>
 
+#include <Backups/BackupEntryFromImmutableFile.h>
+#include <Backups/BackupEntryFromMemory.h>
+#include <Backups/BackupEntryFromSmallFile.h>
+#include <Backups/IBackup.h>
+#include <Disks/TemporaryFileOnDisk.h>
+#include <base/insertAtEnd.h>
+
 #include <cassert>
@@ -44,6 +53,7 @@ namespace ErrorCodes
     extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
     extern const int INCORRECT_FILE_NAME;
     extern const int TIMEOUT_EXCEEDED;
+    extern const int NOT_IMPLEMENTED;
 }
@@ -481,6 +491,134 @@ void StorageStripeLog::saveFileSizes(const WriteLock & /* already locked for writing */)
 }
 
+BackupEntries StorageStripeLog::backup(const ASTs & partitions, ContextPtr context)
+{
+    if (!partitions.empty())
+        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Table engine {} doesn't support partitions", getName());
+
+    auto lock_timeout = getLockTimeout(context);
+    loadIndices(lock_timeout);
+
+    ReadLock lock{rwlock, lock_timeout};
+    if (!lock)
+        throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
+
+    if (!file_checker.getFileSize(data_file_path))
+        return {};
+
+    auto temp_dir_owner = std::make_shared<TemporaryFileOnDisk>(disk, "tmp/backup_");
+    auto temp_dir = temp_dir_owner->getPath();
+    disk->createDirectories(temp_dir);
+
+    BackupEntries backup_entries;
+
+    /// data.bin
+    {
+        /// We make a copy of the data file because it can be changed later in write() or in truncate().
+        String data_file_name = fileName(data_file_path);
+        String temp_file_path = temp_dir + "/" + data_file_name;
+        disk->copy(data_file_path, disk, temp_file_path);
+        backup_entries.emplace_back(
+            data_file_name,
+            std::make_unique<BackupEntryFromImmutableFile>(
+                disk, temp_file_path, file_checker.getFileSize(data_file_path), std::nullopt, temp_dir_owner));
+    }
+
+    /// index.mrk
+    {
+        /// We make a copy of the data file because it can be changed later in write() or in truncate().
+        String index_file_name = fileName(index_file_path);
+        String temp_file_path = temp_dir + "/" + index_file_name;
+        disk->copy(index_file_path, disk, temp_file_path);
+        backup_entries.emplace_back(
+            index_file_name,
+            std::make_unique<BackupEntryFromImmutableFile>(
+                disk, temp_file_path, file_checker.getFileSize(index_file_path), std::nullopt, temp_dir_owner));
+    }
+
+    /// sizes.json
+    String files_info_path = file_checker.getPath();
+    backup_entries.emplace_back(fileName(files_info_path), std::make_unique<BackupEntryFromSmallFile>(disk, files_info_path));
+
+    /// columns.txt
+    backup_entries.emplace_back(
+        "columns.txt", std::make_unique<BackupEntryFromMemory>(getInMemoryMetadata().getColumns().getAllPhysical().toString()));
+
+    /// count.txt
+    size_t num_rows = 0;
+    for (const auto & block : indices.blocks)
+        num_rows += block.num_rows;
+    backup_entries.emplace_back("count.txt", std::make_unique<BackupEntryFromMemory>(toString(num_rows)));
+
+    return backup_entries;
+}
+
+RestoreDataTasks StorageStripeLog::restoreFromBackup(const BackupPtr & backup, const String & data_path_in_backup, const ASTs & partitions, ContextMutablePtr context)
+{
+    if (!partitions.empty())
+        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Table engine {} doesn't support partitions", getName());
+
+    auto restore_task = [this, backup, data_path_in_backup, context]()
+    {
+        WriteLock lock{rwlock, getLockTimeout(context)};
+        if (!lock)
+            throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
+
+        /// Load the indices if not loaded yet. We have to do that now because we're going to update these indices.
+        loadIndices(lock);
+
+        /// If there were no files, save zero file sizes to be able to rollback in case of error.
+        saveFileSizes(lock);
+
+        try
+        {
+            /// Append the data file.
+            auto old_data_size = file_checker.getFileSize(data_file_path);
+            {
+                String file_path_in_backup = data_path_in_backup + fileName(data_file_path);
+                auto backup_entry = backup->read(file_path_in_backup);
+                auto in = backup_entry->getReadBuffer();
+                auto out = disk->writeFile(data_file_path, max_compress_block_size, WriteMode::Append);
+                copyData(*in, *out);
+            }
+
+            /// Append the index.
+            String index_path_in_backup = data_path_in_backup + fileName(index_file_path);
+            if (backup->exists(index_path_in_backup))
+            {
+                IndexForNativeFormat extra_indices;
+                auto backup_entry = backup->read(index_path_in_backup);
+                auto index_in = backup_entry->getReadBuffer();
+                CompressedReadBuffer index_compressed_in{*index_in};
+                extra_indices.read(index_compressed_in);
+
+                /// Adjust the offsets.
+                for (auto & block : extra_indices.blocks)
+                {
+                    for (auto & column : block.columns)
+                        column.location.offset_in_compressed_file += old_data_size;
+                }
+
+                insertAtEnd(indices.blocks, std::move(extra_indices.blocks));
+            }
+
+            /// Finish writing.
+            saveIndices(lock);
+            saveFileSizes(lock);
+        }
+        catch (...)
+        {
+            /// Rollback partial writes.
+            file_checker.repair();
+            removeUnsavedIndices(lock);
+            throw;
+        }
+    };
+    return {restore_task};
+}
+
 void registerStorageStripeLog(StorageFactory & factory)
 {
     StorageFactory::StorageFeatures features{
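
Both backup() implementations above rely on the same lifetime trick: the file copies live in a TemporaryFileOnDisk whose shared_ptr (temp_dir_owner) is handed to every BackupEntryFromImmutableFile, so the temporary directory is removed only after the last backup entry has been consumed. A rough Python analogy of that shared-ownership pattern, not ClickHouse code:

# Rough analogy: the directory disappears only when the last object
# referencing its owner is gone, like the shared_ptr temp_dir_owner above.
import shutil
import tempfile

class TempDirOwner:
    def __init__(self):
        self.path = tempfile.mkdtemp(prefix="backup_")
    def __del__(self):
        shutil.rmtree(self.path, ignore_errors=True)

class BackupEntry:
    def __init__(self, file_path, owner):
        self.file_path = file_path
        self.owner = owner  # keeps the temp dir alive while the entry exists

owner = TempDirOwner()
entries = [BackupEntry(f"{owner.path}/data.bin", owner)]
del owner  # the directory survives: 'entries' still references the owner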

src/Storages/StorageStripeLog.h

@@ -50,6 +50,9 @@
     void truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder&) override;
 
+    BackupEntries backup(const ASTs & partitions, ContextPtr context) override;
+    RestoreDataTasks restoreFromBackup(const BackupPtr & backup, const String & data_path_in_backup, const ASTs & partitions, ContextMutablePtr context) override;
+
 protected:
     StorageStripeLog(
         DiskPtr disk_,
@@ -92,7 +95,7 @@ private:
     const size_t max_compress_block_size;
 
-    std::shared_timed_mutex rwlock;
+    mutable std::shared_timed_mutex rwlock;
 
     Poco::Logger * log;
 };

tests/integration/test_backup_restore_new/test.py

@@ -6,9 +6,11 @@ cluster = ClickHouseCluster(__file__)
 instance = cluster.add_instance('instance')
 
-def create_and_fill_table():
+def create_and_fill_table(engine="MergeTree"):
+    if engine == "MergeTree":
+        engine = "MergeTree ORDER BY y PARTITION BY x%10"
     instance.query("CREATE DATABASE test")
-    instance.query("CREATE TABLE test.table(x UInt32, y String) ENGINE=MergeTree ORDER BY y PARTITION BY x%10")
+    instance.query(f"CREATE TABLE test.table(x UInt32, y String) ENGINE={engine}")
     instance.query("INSERT INTO test.table SELECT number, toString(number) FROM numbers(100)")
@@ -36,9 +38,11 @@ def new_backup_name():
     return f"test-backup-{backup_id_counter}"
 
-def test_restore_table():
+@pytest.mark.parametrize("engine", ["MergeTree", "Log", "TinyLog", "StripeLog"])
+def test_restore_table(engine):
     backup_name = new_backup_name()
-    create_and_fill_table()
+    create_and_fill_table(engine=engine)
 
     assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
     instance.query(f"BACKUP TABLE test.table TO '{backup_name}'")
@@ -50,9 +54,10 @@ def test_restore_table():
     assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
 
-def test_restore_table_into_existing_table():
+@pytest.mark.parametrize("engine", ["MergeTree", "Log", "TinyLog", "StripeLog"])
+def test_restore_table_into_existing_table(engine):
     backup_name = new_backup_name()
-    create_and_fill_table()
+    create_and_fill_table(engine=engine)
 
     assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
     instance.query(f"BACKUP TABLE test.table TO '{backup_name}'")