Implement BACKUP & RESTORE for the Memory table engine.

This commit is contained in:
Vitaly Baranov 2022-01-25 00:41:13 +07:00 committed by Vitaly Baranov
parent ef57a87394
commit e72a343994
6 changed files with 204 additions and 16 deletions

View File

@ -1,13 +1,16 @@
#include <base/JSON.h>
#include <Common/FileChecker.h>
#include <Common/escapeForFileName.h>
#include <Disks/IDisk.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <Common/escapeForFileName.h>
#include <IO/createReadBufferFromFileBase.h>
#include <base/JSON.h>
#include <Common/FileChecker.h>
namespace fs = std::filesystem;
namespace DB
{
@ -19,6 +22,10 @@ namespace ErrorCodes
}
FileChecker::FileChecker(const String & file_info_path_) : FileChecker(nullptr, file_info_path_)
{
}
FileChecker::FileChecker(DiskPtr disk_, const String & file_info_path_) : disk(std::move(disk_))
{
setPath(file_info_path_);
@ -45,8 +52,8 @@ String FileChecker::getPath() const
void FileChecker::update(const String & full_file_path)
{
bool exists = disk->exists(full_file_path);
auto real_size = exists ? disk->getFileSize(full_file_path) : 0; /// No race condition assuming no one else is working with these files.
bool exists = fileReallyExists(full_file_path);
auto real_size = exists ? getRealFileSize(full_file_path) : 0; /// No race condition assuming no one else is working with these files.
map[fileName(full_file_path)] = real_size;
}
@ -74,8 +81,8 @@ CheckResults FileChecker::check() const
{
const String & name = name_size.first;
String path = parentPath(files_info_path) + name;
bool exists = disk->exists(path);
auto real_size = exists ? disk->getFileSize(path) : 0; /// No race condition assuming no one else is working with these files.
bool exists = fileReallyExists(path);
auto real_size = exists ? getRealFileSize(path) : 0; /// No race condition assuming no one else is working with these files.
if (real_size != name_size.second)
{
@ -99,8 +106,8 @@ void FileChecker::repair()
const String & name = name_size.first;
size_t expected_size = name_size.second;
String path = parentPath(files_info_path) + name;
bool exists = disk->exists(path);
auto real_size = exists ? disk->getFileSize(path) : 0; /// No race condition assuming no one else is working with these files.
bool exists = fileReallyExists(path);
auto real_size = exists ? getRealFileSize(path) : 0; /// No race condition assuming no one else is working with these files.
if (real_size < expected_size)
throw Exception(ErrorCodes::UNEXPECTED_END_OF_FILE, "Size of {} is less than expected. Size is {} but should be {}.",
@ -119,7 +126,7 @@ void FileChecker::save() const
std::string tmp_files_info_path = parentPath(files_info_path) + "tmp_" + fileName(files_info_path);
{
std::unique_ptr<WriteBuffer> out = disk->writeFile(tmp_files_info_path);
std::unique_ptr<WriteBuffer> out = disk ? disk->writeFile(tmp_files_info_path) : std::make_unique<WriteBufferFromFile>(tmp_files_info_path);
/// The JSON structure is this complex only for compatibility with the old format.
writeCString("{\"clickhouse\":{", *out);
@ -141,17 +148,20 @@ void FileChecker::save() const
out->next();
}
disk->replaceFile(tmp_files_info_path, files_info_path);
if (disk)
disk->replaceFile(tmp_files_info_path, files_info_path);
else
fs::rename(tmp_files_info_path, files_info_path);
}
void FileChecker::load()
{
map.clear();
if (!disk->exists(files_info_path))
if (!fileReallyExists(files_info_path))
return;
std::unique_ptr<ReadBuffer> in = disk->readFile(files_info_path);
std::unique_ptr<ReadBuffer> in = disk ? disk->readFile(files_info_path) : createReadBufferFromFileBase(files_info_path, {});
WriteBufferFromOwnString out;
/// The JSON library does not support whitespace, so we strip it first. This is inefficient.
@ -169,4 +179,14 @@ void FileChecker::load()
map[unescapeForFileName(file.getName())] = file.getValue()["size"].toUInt();
}
/// Tells whether `path_` exists: asks the disk when one is set,
/// otherwise queries the local filesystem directly.
bool FileChecker::fileReallyExists(const String & path_) const
{
    if (disk)
        return disk->exists(path_);

    return fs::exists(path_);
}
/// Returns the current size of `path_`: taken from the disk when one is set,
/// otherwise from the local filesystem.
size_t FileChecker::getRealFileSize(const String & path_) const
{
    if (disk)
        return disk->getFileSize(path_);

    return fs::file_size(path_);
}
}

View File

@ -2,16 +2,19 @@
#include <base/logger_useful.h>
#include <Storages/CheckResults.h>
#include <Disks/IDisk.h>
namespace DB
{
class IDisk;
using DiskPtr = std::shared_ptr<IDisk>;
/// Stores the sizes of all columns, and can check whether the columns are corrupted.
class FileChecker
{
public:
FileChecker(const String & file_info_path_);
FileChecker(DiskPtr disk_, const String & file_info_path_);
void setPath(const String & file_info_path_);
@ -36,6 +39,9 @@ public:
private:
void load();
bool fileReallyExists(const String & path_) const;
size_t getRealFileSize(const String & path_) const;
const DiskPtr disk;
const Poco::Logger * log = &Poco::Logger::get("FileChecker");

View File

@ -17,6 +17,17 @@
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Parsers/ASTCreateQuery.h>
#include <Common/FileChecker.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedReadBufferFromFile.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Backups/BackupEntryFromImmutableFile.h>
#include <Backups/IBackup.h>
#include <Backups/IRestoreTask.h>
#include <IO/copyData.h>
#include <IO/createReadBufferFromFileBase.h>
#include <Poco/TemporaryFile.h>
namespace DB
{
@ -364,6 +375,152 @@ void StorageMemory::truncate(
total_size_rows.store(0, std::memory_order_relaxed);
}
/// Builds the backup entries for this table.
/// The current blocks are serialized into a temporary directory in the StripeLog format
/// (data.bin + index.mrk + sizes.json), and each resulting file becomes one backup entry.
/// Every entry is given a reference to the temporary-file owner.
/// Throws NOT_IMPLEMENTED if any partitions are specified.
BackupEntries StorageMemory::backup(ContextPtr context, const ASTs & partitions)
{
    if (!partitions.empty())
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Table engine {} doesn't support partitions", getName());

    /// Names of the files as they appear in the backup.
    constexpr char data_file_name[] = "data.bin";
    constexpr char index_file_name[] = "index.mrk";
    constexpr char sizes_file_name[] = "sizes.json";

    /// Take the current set of blocks once; everything below works on it.
    auto blocks_snapshot = data.get();

    /// Temporary directory which receives the serialized files.
    auto temp_holder = std::make_shared<Poco::TemporaryFile>();
    const String temp_dir = temp_holder->path();
    fs::create_directories(temp_dir);

    const auto path_in_temp_dir = [&temp_dir](const char * file_name) { return temp_dir + "/" + file_name; };
    const String data_file_path = path_in_temp_dir(data_file_name);
    const String index_file_path = path_in_temp_dir(index_file_name);
    const String sizes_file_path = path_in_temp_dir(sizes_file_name);

    /// data.bin: all blocks in the native format, compressed. The index is filled while writing.
    /// The scope ends before the file sizes are queried, so the buffers are destroyed by then.
    IndexForNativeFormat index;
    {
        WriteBufferFromFile file_out(data_file_path);
        CompressedWriteBuffer compressed_out{
            file_out, CompressionCodecFactory::instance().getDefaultCodec(), context->getSettingsRef().max_compress_block_size};
        NativeWriter writer{compressed_out, 0, getInMemoryMetadataPtr()->getSampleBlock(), false, &index};
        for (const auto & current_block : *blocks_snapshot)
            writer.write(current_block);
    }

    /// index.mrk: the index collected above, compressed.
    {
        WriteBufferFromFile file_out(index_file_path);
        CompressedWriteBuffer compressed_out{file_out};
        index.write(compressed_out);
    }

    /// sizes.json: sizes of the two files written above.
    FileChecker sizes_checker{sizes_file_path};
    sizes_checker.update(data_file_path);
    sizes_checker.update(index_file_path);
    sizes_checker.save();

    /// Compose the result.
    BackupEntries entries;

    entries.emplace_back(
        data_file_name,
        std::make_unique<BackupEntryFromImmutableFile>(
            data_file_path, sizes_checker.getFileSize(data_file_path), std::nullopt, temp_holder));

    entries.emplace_back(
        index_file_name,
        std::make_unique<BackupEntryFromImmutableFile>(
            index_file_path, sizes_checker.getFileSize(index_file_path), std::nullopt, temp_holder));

    entries.emplace_back(
        sizes_file_name,
        std::make_unique<BackupEntryFromImmutableFile>(
            sizes_file_path, std::nullopt, std::nullopt, temp_holder));

    return entries;
}
/// Restore task for the Memory table engine.
/// Reads `index.mrk` and `data.bin` (StripeLog format, both compressed) from the backup
/// and appends the decoded blocks to the blocks already held by the storage.
class MemoryRestoreTask : public IRestoreTask
{
public:
    /// storage_             - the table which receives the restored blocks.
    /// backup_              - the backup to read the files from.
    /// data_path_in_backup_ - prefix which is prepended verbatim to "index.mrk" and "data.bin".
    /// context_             - stored, but not used by run().
    MemoryRestoreTask(
        std::shared_ptr<StorageMemory> storage_, const BackupPtr & backup_, const String & data_path_in_backup_, ContextMutablePtr context_)
        : storage(storage_), backup(backup_), data_path_in_backup(data_path_in_backup_), context(context_)
    {
    }

    /// Performs the restore. Always returns an empty list of follow-up tasks.
    RestoreTasks run() override
    {
        /// Our data are in the StripeLog format.

        /// Reading index.mrk
        IndexForNativeFormat index;
        {
            String index_file_path = data_path_in_backup + "index.mrk";
            auto backup_entry = backup->readFile(index_file_path);
            auto in = backup_entry->getReadBuffer();
            CompressedReadBuffer compressed_in{*in};
            index.read(compressed_in);
        }

        /// Reading data.bin
        Blocks new_blocks;
        size_t new_bytes = 0;
        size_t new_rows = 0;
        {
            String data_file_path = data_path_in_backup + "data.bin";
            auto backup_entry = backup->readFile(data_file_path);
            std::unique_ptr<ReadBuffer> in = backup_entry->getReadBuffer();

            /// CompressedReadBufferFromFile requires a ReadBufferFromFileBase.
            /// The check must be a dynamic_cast: typeid_cast matches only the exact dynamic type,
            /// and ReadBufferFromFileBase is a base class, so typeid_cast would never succeed
            /// and the data would be copied to a temporary file on every restore.
            std::optional<Poco::TemporaryFile> temp_data_copy;
            if (!dynamic_cast<ReadBufferFromFileBase *>(in.get()))
            {
                /// Not a file-based buffer: dump it to a temporary file and read from there.
                temp_data_copy.emplace();
                auto temp_data_copy_out = std::make_unique<WriteBufferFromFile>(temp_data_copy->path());
                copyData(*in, *temp_data_copy_out);
                temp_data_copy_out.reset(); /// Destroy the writer before the file is opened for reading.
                in = createReadBufferFromFileBase(temp_data_copy->path(), {});
            }

            /// At this point `in` is known to point to a ReadBufferFromFileBase.
            std::unique_ptr<ReadBufferFromFileBase> in_from_file{static_cast<ReadBufferFromFileBase *>(in.release())};
            CompressedReadBufferFromFile compressed_in{std::move(in_from_file)};
            NativeReader block_in{compressed_in, 0, index.blocks.begin(), index.blocks.end()};

            while (auto block = block_in.read())
            {
                new_bytes += block.bytes();
                new_rows += block.rows();
                new_blocks.push_back(std::move(block));
            }
        }

        /// Append old blocks with the new ones.
        /// NOTE(review): no lock is taken in this block around the get()/set() pair and the counter
        /// updates - confirm whether concurrent writers have to be excluded here or by the caller.
        auto old_blocks = storage->data.get();
        Blocks old_and_new_blocks = *old_blocks;
        old_and_new_blocks.insert(old_and_new_blocks.end(), std::make_move_iterator(new_blocks.begin()), std::make_move_iterator(new_blocks.end()));

        /// Finish restoring.
        storage->data.set(std::make_unique<Blocks>(std::move(old_and_new_blocks)));
        storage->total_size_bytes += new_bytes;
        storage->total_size_rows += new_rows;

        return {};
    }

private:
    std::shared_ptr<StorageMemory> storage;
    BackupPtr backup;
    String data_path_in_backup;
    ContextMutablePtr context;
};
/// Creates the task which restores this table's data from `backup`.
/// Throws NOT_IMPLEMENTED if any partitions are specified. The restore settings are not used.
RestoreTaskPtr StorageMemory::restoreFromBackup(ContextMutablePtr context, const ASTs & partitions, const BackupPtr & backup, const String & data_path_in_backup, const StorageRestoreSettings &)
{
    const bool partitions_requested = !partitions.empty();
    if (partitions_requested)
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Table engine {} doesn't support partitions", getName());

    /// The task holds a shared pointer to this storage.
    auto self = typeid_cast<std::shared_ptr<StorageMemory>>(shared_from_this());
    return std::make_unique<MemoryRestoreTask>(self, backup, data_path_in_backup, context);
}
std::optional<UInt64> StorageMemory::totalRows(const Settings &) const
{
/// All modifications of these counters are done under mutex which automatically guarantees synchronization/consistency

View File

@ -22,6 +22,7 @@ namespace DB
class StorageMemory final : public shared_ptr_helper<StorageMemory>, public IStorage
{
friend class MemorySink;
friend class MemoryRestoreTask;
friend struct shared_ptr_helper<StorageMemory>;
public:
@ -65,6 +66,9 @@ public:
void truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &) override;
BackupEntries backup(ContextPtr context, const ASTs & partitions) override;
RestoreTaskPtr restoreFromBackup(ContextMutablePtr context, const ASTs & partitions, const BackupPtr & backup, const String & data_path_in_backup, const StorageRestoreSettings & restore_settings) override;
std::optional<UInt64> totalRows(const Settings &) const override;
std::optional<UInt64> totalBytes(const Settings &) const override;

View File

@ -10,6 +10,7 @@
#include <Formats/IndexForNativeFormat.h>
#include <Common/FileChecker.h>
#include <Common/escapeForFileName.h>
#include <Disks/IDisk.h>
namespace DB

View File

@ -37,7 +37,7 @@ def new_backup_name():
return f"Disk('backups', '{backup_id_counter}/')"
@pytest.mark.parametrize("engine", ["MergeTree", "Log", "TinyLog", "StripeLog"])
@pytest.mark.parametrize("engine", ["MergeTree", "Log", "TinyLog", "StripeLog", "Memory"])
def test_restore_table(engine):
backup_name = new_backup_name()
create_and_fill_table(engine=engine)
@ -52,7 +52,7 @@ def test_restore_table(engine):
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
@pytest.mark.parametrize("engine", ["MergeTree", "Log", "TinyLog", "StripeLog"])
@pytest.mark.parametrize("engine", ["MergeTree", "Log", "TinyLog", "StripeLog", "Memory"])
def test_restore_table_into_existing_table(engine):
backup_name = new_backup_name()
create_and_fill_table(engine=engine)