diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index 4e3f550f67d..fe7f011d4d0 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -736,6 +736,10 @@ if (ThreadFuzzer::instance().isEffective()) setupTmpPath(log, disk->getPath()); } + /// Storage keeping all the backups. + fs::create_directories(path / "backups"); + global_context->setBackupsVolume(config().getString("backups_path", path / "backups"), config().getString("backups_policy", "")); + /** Directory with 'flags': files indicating temporary settings for the server set by system administrator. * Flags may be cleared automatically after being applied by the server. * Examples: do repair of local data; clone all replicated tables from replica. diff --git a/src/Backups/BackupEntryConcat.cpp b/src/Backups/BackupEntryConcat.cpp new file mode 100644 index 00000000000..c6234f68fb2 --- /dev/null +++ b/src/Backups/BackupEntryConcat.cpp @@ -0,0 +1,28 @@ +#include +#include + + +namespace DB +{ +BackupEntryConcat::BackupEntryConcat( + BackupEntryPtr first_source_, + BackupEntryPtr second_source_, + const std::optional & checksum_) + : first_source(std::move(first_source_)) + , second_source(std::move(second_source_)) + , checksum(checksum_) +{ +} + +UInt64 BackupEntryConcat::getSize() const +{ + if (!size) + size = first_source->getSize() + second_source->getSize(); + return *size; +} + +std::unique_ptr BackupEntryConcat::getReadBuffer() const +{ + return std::make_unique(first_source->getReadBuffer(), second_source->getReadBuffer()); +} +} diff --git a/src/Backups/BackupEntryConcat.h b/src/Backups/BackupEntryConcat.h new file mode 100644 index 00000000000..cb38fb9b163 --- /dev/null +++ b/src/Backups/BackupEntryConcat.h @@ -0,0 +1,30 @@ +#pragma once + +#include + + +namespace DB +{ + +/// Concatenates data of two backup entries. +class BackupEntryConcat : public IBackupEntry +{ +public: + /// The constructor is allowed to not set `checksum_`, in that case it will be calculated from the data. 
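+    /// A sketch of the intended use, taken from BackupInDirectory::read() below: when an entry has only
+    /// grown since the base backup, its full data can be reassembled as
+    ///     std::make_unique<BackupEntryConcat>(std::move(base_entry), std::move(appended_part), checksum);
+    /// where `base_entry` and `appended_part` are illustrative names for the two source entries.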
+ BackupEntryConcat( + BackupEntryPtr first_source_, + BackupEntryPtr second_source_, + const std::optional & checksum_ = {}); + + UInt64 getSize() const override; + std::optional getChecksum() const override { return checksum; } + std::unique_ptr getReadBuffer() const override; + +private: + BackupEntryPtr first_source; + BackupEntryPtr second_source; + mutable std::optional size; + std::optional checksum; +}; + +} diff --git a/src/Backups/BackupEntryFromAppendOnlyFile.cpp b/src/Backups/BackupEntryFromAppendOnlyFile.cpp new file mode 100644 index 00000000000..32d5713952f --- /dev/null +++ b/src/Backups/BackupEntryFromAppendOnlyFile.cpp @@ -0,0 +1,35 @@ +#include +#include + + +namespace DB +{ + +BackupEntryFromAppendOnlyFile::BackupEntryFromAppendOnlyFile( + const String & file_path_, + const std::optional & file_size_, + const std::optional & checksum_, + const std::shared_ptr & temporary_file_) + : BackupEntryFromImmutableFile(file_path_, file_size_, checksum_, temporary_file_) + , limit(BackupEntryFromImmutableFile::getSize()) +{ +} + +BackupEntryFromAppendOnlyFile::BackupEntryFromAppendOnlyFile( + const DiskPtr & disk_, + const String & file_path_, + const std::optional & file_size_, + const std::optional & checksum_, + const std::shared_ptr & temporary_file_) + : BackupEntryFromImmutableFile(disk_, file_path_, file_size_, checksum_, temporary_file_) + , limit(BackupEntryFromImmutableFile::getSize()) +{ +} + +std::unique_ptr BackupEntryFromAppendOnlyFile::getReadBuffer() const +{ + auto buf = BackupEntryFromImmutableFile::getReadBuffer(); + return std::make_unique(std::move(buf), limit, true); +} + +} diff --git a/src/Backups/BackupEntryFromAppendOnlyFile.h b/src/Backups/BackupEntryFromAppendOnlyFile.h new file mode 100644 index 00000000000..2bc0c6a68a0 --- /dev/null +++ b/src/Backups/BackupEntryFromAppendOnlyFile.h @@ -0,0 +1,35 @@ +#pragma once + +#include + + +namespace DB +{ + +/// Represents a file prepared to be included in a backup, assuming that until this backup entry is destroyed +/// the file can be appended with new data, but the bytes which are already in the file won't be changed. +class BackupEntryFromAppendOnlyFile : public BackupEntryFromImmutableFile +{ +public: + /// The constructor is allowed to not set `file_size_` or `checksum_`, in that case it will be calculated from the data. 
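+    /// Illustrative use (the file name is assumed): snapshot a file that other code may still append to:
+    ///     auto entry = std::make_unique<BackupEntryFromAppendOnlyFile>(disk, "store/data.bin");
+    /// getSize() is fixed to the file size observed at construction time and getReadBuffer() is limited
+    /// to that size, so bytes appended afterwards are not included in the backup.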
+ BackupEntryFromAppendOnlyFile( + const String & file_path_, + const std::optional & file_size_ = {}, + const std::optional & checksum_ = {}, + const std::shared_ptr & temporary_file_ = {}); + + BackupEntryFromAppendOnlyFile( + const DiskPtr & disk_, + const String & file_path_, + const std::optional & file_size_ = {}, + const std::optional & checksum_ = {}, + const std::shared_ptr & temporary_file_ = {}); + + UInt64 getSize() const override { return limit; } + std::unique_ptr getReadBuffer() const override; + +private: + const UInt64 limit; +}; + +} diff --git a/src/Backups/BackupEntryFromImmutableFile.cpp b/src/Backups/BackupEntryFromImmutableFile.cpp new file mode 100644 index 00000000000..41e8e913642 --- /dev/null +++ b/src/Backups/BackupEntryFromImmutableFile.cpp @@ -0,0 +1,47 @@ +#include +#include +#include +#include + + +namespace DB +{ + +BackupEntryFromImmutableFile::BackupEntryFromImmutableFile( + const String & file_path_, + const std::optional & file_size_, + const std::optional & checksum_, + const std::shared_ptr & temporary_file_) + : file_path(file_path_), file_size(file_size_), checksum(checksum_), temporary_file(temporary_file_) +{ +} + +BackupEntryFromImmutableFile::BackupEntryFromImmutableFile( + const DiskPtr & disk_, + const String & file_path_, + const std::optional & file_size_, + const std::optional & checksum_, + const std::shared_ptr & temporary_file_) + : disk(disk_), file_path(file_path_), file_size(file_size_), checksum(checksum_), temporary_file_on_disk(temporary_file_) +{ +} + +BackupEntryFromImmutableFile::~BackupEntryFromImmutableFile() = default; + +UInt64 BackupEntryFromImmutableFile::getSize() const +{ + std::lock_guard lock{get_file_size_mutex}; + if (!file_size) + file_size = disk ? disk->getFileSize(file_path) : Poco::File(file_path).getSize(); + return *file_size; +} + +std::unique_ptr BackupEntryFromImmutableFile::getReadBuffer() const +{ + if (disk) + return disk->readFile(file_path); + else + return createReadBufferFromFileBase(file_path, 0, 0, 0, nullptr); +} + +} diff --git a/src/Backups/BackupEntryFromImmutableFile.h b/src/Backups/BackupEntryFromImmutableFile.h new file mode 100644 index 00000000000..3be261c49ce --- /dev/null +++ b/src/Backups/BackupEntryFromImmutableFile.h @@ -0,0 +1,51 @@ +#pragma once + +#include +#include + +namespace Poco { class TemporaryFile; } + +namespace DB +{ +class TemporaryFileOnDisk; +class IDisk; +using DiskPtr = std::shared_ptr; + +/// Represents a file prepared to be included in a backup, assuming that until this backup entry is destroyed the file won't be changed. +class BackupEntryFromImmutableFile : public IBackupEntry +{ +public: + /// The constructor is allowed to not set `file_size_` or `checksum_`, in that case it will be calculated from the data. 
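+    /// Unlike BackupEntryFromAppendOnlyFile, the size here may be computed lazily on the first call
+    /// to getSize() (under a mutex), because the file is assumed not to change while the entry exists.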
+ BackupEntryFromImmutableFile( + const String & file_path_, + const std::optional & file_size_ = {}, + const std::optional & checksum_ = {}, + const std::shared_ptr & temporary_file_ = {}); + + BackupEntryFromImmutableFile( + const DiskPtr & disk_, + const String & file_path_, + const std::optional & file_size_ = {}, + const std::optional & checksum_ = {}, + const std::shared_ptr & temporary_file_ = {}); + + ~BackupEntryFromImmutableFile() override; + + UInt64 getSize() const override; + std::optional getChecksum() const override { return checksum; } + std::unique_ptr getReadBuffer() const override; + + String getFilePath() const { return file_path; } + DiskPtr getDisk() const { return disk; } + +private: + const DiskPtr disk; + const String file_path; + mutable std::optional file_size; + mutable std::mutex get_file_size_mutex; + const std::optional checksum; + const std::shared_ptr temporary_file; + const std::shared_ptr temporary_file_on_disk; +}; + +} diff --git a/src/Backups/BackupEntryFromMemory.cpp b/src/Backups/BackupEntryFromMemory.cpp new file mode 100644 index 00000000000..96493e7962e --- /dev/null +++ b/src/Backups/BackupEntryFromMemory.cpp @@ -0,0 +1,23 @@ +#include +#include + + +namespace DB +{ + +BackupEntryFromMemory::BackupEntryFromMemory(const void * data_, size_t size_, const std::optional & checksum_) + : BackupEntryFromMemory(String{reinterpret_cast(data_), size_}, checksum_) +{ +} + +BackupEntryFromMemory::BackupEntryFromMemory(String data_, const std::optional & checksum_) + : data(std::move(data_)), checksum(checksum_) +{ +} + +std::unique_ptr BackupEntryFromMemory::getReadBuffer() const +{ + return std::make_unique(data); +} + +} diff --git a/src/Backups/BackupEntryFromMemory.h b/src/Backups/BackupEntryFromMemory.h new file mode 100644 index 00000000000..3756c2ebfea --- /dev/null +++ b/src/Backups/BackupEntryFromMemory.h @@ -0,0 +1,27 @@ +#pragma once + +#include +#include + + +namespace DB +{ + +/// Represents small preloaded data to be included in a backup. +class BackupEntryFromMemory : public IBackupEntry +{ +public: + /// The constructor is allowed to not set `checksum_`, in that case it will be calculated from the data. 
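+    /// Typical use in this patch is storing serialized metadata, e.g. backupCreateQuery() in BackupUtils.cpp
+    /// does, in essence:
+    ///     backup_entries.emplace_back(metadata_path, std::make_unique<BackupEntryFromMemory>(serializeAST(create_query)));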
+ BackupEntryFromMemory(const void * data_, size_t size_, const std::optional & checksum_ = {}); + BackupEntryFromMemory(String data_, const std::optional & checksum_ = {}); + + UInt64 getSize() const override { return data.size(); } + std::optional getChecksum() const override { return checksum; } + std::unique_ptr getReadBuffer() const override; + +private: + const String data; + const std::optional checksum; +}; + +} diff --git a/src/Backups/BackupEntryFromSmallFile.cpp b/src/Backups/BackupEntryFromSmallFile.cpp new file mode 100644 index 00000000000..cbd517744f5 --- /dev/null +++ b/src/Backups/BackupEntryFromSmallFile.cpp @@ -0,0 +1,39 @@ +#include +#include +#include +#include + + +namespace DB +{ +namespace +{ + String readFile(const String & file_path) + { + auto buf = createReadBufferFromFileBase(file_path, 0, 0, 0, nullptr); + String s; + readStringUntilEOF(s, *buf); + return s; + } + + String readFile(const DiskPtr & disk, const String & file_path) + { + auto buf = disk->readFile(file_path); + String s; + readStringUntilEOF(s, *buf); + return s; + } +} + + +BackupEntryFromSmallFile::BackupEntryFromSmallFile(const String & file_path_, const std::optional & checksum_) + : BackupEntryFromMemory(readFile(file_path_), checksum_), file_path(file_path_) +{ +} + +BackupEntryFromSmallFile::BackupEntryFromSmallFile( + const DiskPtr & disk_, const String & file_path_, const std::optional & checksum_) + : BackupEntryFromMemory(readFile(disk_, file_path_), checksum_), disk(disk_), file_path(file_path_) +{ +} +} diff --git a/src/Backups/BackupEntryFromSmallFile.h b/src/Backups/BackupEntryFromSmallFile.h new file mode 100644 index 00000000000..ab2ba2ea042 --- /dev/null +++ b/src/Backups/BackupEntryFromSmallFile.h @@ -0,0 +1,34 @@ +#pragma once + +#include + + +namespace DB +{ +class IDisk; +using DiskPtr = std::shared_ptr; + +/// Represents a file prepared to be included in a backup, +/// assuming that the file is small and can be easily loaded into memory. +class BackupEntryFromSmallFile : public BackupEntryFromMemory +{ +public: + /// The constructor is allowed to not set `checksum_`, in that case it will be calculated from the data. 
+ BackupEntryFromSmallFile( + const String & file_path_, + const std::optional & checksum_ = {}); + + BackupEntryFromSmallFile( + const DiskPtr & disk_, + const String & file_path_, + const std::optional & checksum_ = {}); + + String getFilePath() const { return file_path; } + DiskPtr getDisk() const { return disk; } + +private: + const DiskPtr disk; + const String file_path; +}; + +} diff --git a/src/Backups/BackupFactory.cpp b/src/Backups/BackupFactory.cpp new file mode 100644 index 00000000000..41377300375 --- /dev/null +++ b/src/Backups/BackupFactory.cpp @@ -0,0 +1,65 @@ +#include +#include +#include +#include + + +namespace DB +{ +namespace ErrorCodes +{ + extern const int BACKUP_NOT_FOUND; + extern const int BACKUP_ALREADY_EXISTS; + extern const int NOT_ENOUGH_SPACE; + extern const int LOGICAL_ERROR; +} + + +BackupFactory & BackupFactory::instance() +{ + static BackupFactory the_instance; + return the_instance; +} + +void BackupFactory::setBackupsVolume(VolumePtr backups_volume_) +{ + backups_volume = backups_volume_; +} + +BackupMutablePtr BackupFactory::createBackup(const String & backup_name, UInt64 estimated_backup_size, const BackupPtr & base_backup) const +{ + if (!backups_volume) + throw Exception(ErrorCodes::LOGICAL_ERROR, "No backups volume"); + + for (const auto & disk : backups_volume->getDisks()) + { + if (disk->exists(backup_name)) + throw Exception(ErrorCodes::BACKUP_ALREADY_EXISTS, "Backup {} already exists", quoteString(backup_name)); + } + + auto reservation = backups_volume->reserve(estimated_backup_size); + if (!reservation) + throw Exception( + ErrorCodes::NOT_ENOUGH_SPACE, + "Couldn't reserve {} bytes of free space for new backup {}", + estimated_backup_size, + quoteString(backup_name)); + + return std::make_shared(IBackup::OpenMode::WRITE, reservation->getDisk(), backup_name, base_backup); +} + +BackupPtr BackupFactory::openBackup(const String & backup_name, const BackupPtr & base_backup) const +{ + if (!backups_volume) + throw Exception(ErrorCodes::LOGICAL_ERROR, "No backups volume"); + + for (const auto & disk : backups_volume->getDisks()) + { + if (disk->exists(backup_name)) + return std::make_shared(IBackup::OpenMode::READ, disk, backup_name, base_backup); + } + + throw Exception(ErrorCodes::BACKUP_NOT_FOUND, "Backup {} not found", quoteString(backup_name)); +} + +} diff --git a/src/Backups/BackupFactory.h b/src/Backups/BackupFactory.h new file mode 100644 index 00000000000..8360d66728f --- /dev/null +++ b/src/Backups/BackupFactory.h @@ -0,0 +1,38 @@ +#pragma once + +#include +#include +#include + + +namespace DB +{ +class IBackup; +using BackupPtr = std::shared_ptr; +using BackupMutablePtr = std::shared_ptr; +class Context; +using ContextMutablePtr = std::shared_ptr; +class IVolume; +using VolumePtr = std::shared_ptr; + + +/// Factory for implementations of the IBackup interface. +class BackupFactory : boost::noncopyable +{ +public: + static BackupFactory & instance(); + + /// Must be called to initialize the backup factory. + void setBackupsVolume(VolumePtr backups_volume_); + + /// Creates a new backup and open it for writing. + BackupMutablePtr createBackup(const String & backup_name, UInt64 estimated_backup_size, const BackupPtr & base_backup = {}) const; + + /// Opens an existing backup for reading. 
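+    /// The backup is looked up on every disk of the backups volume; BACKUP_NOT_FOUND is thrown if it is missing.
+    /// A typical write path looks roughly like this (a sketch using the helpers from BackupUtils.h):
+    ///     auto entries = makeBackupEntries(elements, context);
+    ///     auto backup = BackupFactory::instance().createBackup(name, estimateBackupSize(entries, base_backup), base_backup);
+    ///     writeBackupEntries(backup, std::move(entries), num_threads);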
+ BackupPtr openBackup(const String & backup_name, const BackupPtr & base_backup = {}) const; + +private: + VolumePtr backups_volume; +}; + +} diff --git a/src/Backups/BackupInDirectory.cpp b/src/Backups/BackupInDirectory.cpp new file mode 100644 index 00000000000..2a59e9e06dc --- /dev/null +++ b/src/Backups/BackupInDirectory.cpp @@ -0,0 +1,454 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ +namespace ErrorCodes +{ + extern const int BACKUP_NOT_FOUND; + extern const int BACKUP_ALREADY_EXISTS; + extern const int BACKUP_VERSION_NOT_SUPPORTED; + extern const int BACKUP_DAMAGED; + extern const int NO_BASE_BACKUP; + extern const int WRONG_BASE_BACKUP; + extern const int BACKUP_ENTRY_ALREADY_EXISTS; + extern const int BACKUP_ENTRY_NOT_FOUND; + extern const int BAD_ARGUMENTS; + extern const int LOGICAL_ERROR; +} + +namespace +{ + const UInt64 BACKUP_VERSION = 1; +} + +BackupInDirectory::BackupInDirectory(OpenMode open_mode_, const DiskPtr & disk_, const String & path_, const std::shared_ptr & base_backup_) + : open_mode(open_mode_), disk(disk_), path(path_), path_with_sep(path_), base_backup(base_backup_) +{ + if (!path_with_sep.ends_with('/')) + path_with_sep += '/'; + trimRight(path, '/'); + open(); +} + +BackupInDirectory::~BackupInDirectory() +{ + close(); +} + +void BackupInDirectory::open() +{ + if (open_mode == OpenMode::WRITE) + { + if (disk->exists(path)) + throw Exception(ErrorCodes::BACKUP_ALREADY_EXISTS, "Backup {} already exists", quoteString(path)); + disk->createDirectories(path); + directory_was_created = true; + writePathToBaseBackup(); + } + + if (open_mode == OpenMode::READ) + { + if (!disk->isDirectory(path)) + throw Exception(ErrorCodes::BACKUP_NOT_FOUND, "Backup {} not found", quoteString(path)); + readContents(); + readPathToBaseBackup(); + } +} + +void BackupInDirectory::close() +{ + if (open_mode == OpenMode::WRITE) + { + if (!finalized && directory_was_created) + { + /// Creating of the backup wasn't finished correctly, + /// so the backup cannot be used and it's better to remove its files. 
+ disk->removeRecursive(path); + } + } +} + +void BackupInDirectory::writePathToBaseBackup() +{ + String file_path = path_with_sep + ".base_backup"; + if (!base_backup) + { + disk->removeFileIfExists(file_path); + return; + } + auto out = disk->writeFile(file_path); + writeString(base_backup->getPath(), *out); +} + +void BackupInDirectory::readPathToBaseBackup() +{ + if (base_backup) + return; + String file_path = path_with_sep + ".base_backup"; + if (!disk->exists(file_path)) + return; + auto in = disk->readFile(file_path); + String base_backup_path; + readStringUntilEOF(base_backup_path, *in); + if (base_backup_path.empty()) + return; + base_backup = BackupFactory::instance().openBackup(base_backup_path); +} + +void BackupInDirectory::writeContents() +{ + auto out = disk->writeFile(path_with_sep + ".contents"); + writeVarUInt(BACKUP_VERSION, *out); + + writeVarUInt(infos.size(), *out); + for (const auto & [path_in_backup, info] : infos) + { + writeBinary(path_in_backup, *out); + writeVarUInt(info.size, *out); + if (info.size) + { + writeBinary(info.checksum, *out); + writeVarUInt(info.base_size, *out); + if (info.base_size && (info.base_size != info.size)) + writeBinary(info.base_checksum, *out); + } + } +} + +void BackupInDirectory::readContents() +{ + auto in = disk->readFile(path_with_sep + ".contents"); + UInt64 version; + readVarUInt(version, *in); + if (version != BACKUP_VERSION) + throw Exception(ErrorCodes::BACKUP_VERSION_NOT_SUPPORTED, "Backup {}: Version {} is not supported", quoteString(path), version); + + size_t num_infos; + readVarUInt(num_infos, *in); + infos.clear(); + for (size_t i = 0; i != num_infos; ++i) + { + String path_in_backup; + readBinary(path_in_backup, *in); + EntryInfo info; + readVarUInt(info.size, *in); + if (info.size) + { + readBinary(info.checksum, *in); + readVarUInt(info.base_size, *in); + if (info.base_size && (info.base_size != info.size)) + readBinary(info.base_checksum, *in); + else if (info.base_size) + info.base_checksum = info.checksum; + } + infos.emplace(path_in_backup, info); + } +} + +IBackup::OpenMode BackupInDirectory::getOpenMode() const +{ + return open_mode; +} + +String BackupInDirectory::getPath() const +{ + return path; +} + +Strings BackupInDirectory::list(const String & prefix, const String & terminator) const +{ + if (!prefix.ends_with('/') && !prefix.empty()) + throw Exception("prefix should end with '/'", ErrorCodes::BAD_ARGUMENTS); + std::lock_guard lock{mutex}; + Strings elements; + for (auto it = infos.lower_bound(prefix); it != infos.end(); ++it) + { + const String & name = it->first; + if (!name.starts_with(prefix)) + break; + size_t start_pos = prefix.length(); + size_t end_pos = String::npos; + if (!terminator.empty()) + end_pos = name.find(terminator, start_pos); + std::string_view new_element = std::string_view{name}.substr(start_pos, end_pos - start_pos); + if (!elements.empty() && (elements.back() == new_element)) + continue; + elements.push_back(String{new_element}); + } + return elements; +} + +bool BackupInDirectory::exists(const String & name) const +{ + std::lock_guard lock{mutex}; + return infos.count(name) != 0; +} + +size_t BackupInDirectory::getSize(const String & name) const +{ + std::lock_guard lock{mutex}; + auto it = infos.find(name); + if (it == infos.end()) + throw Exception( + ErrorCodes::BACKUP_ENTRY_NOT_FOUND, "Backup {}: Entry {} not found in the backup", quoteString(path), quoteString(name)); + return it->second.size; +} + +UInt128 BackupInDirectory::getChecksum(const String & name) const +{ + 
std::lock_guard lock{mutex}; + auto it = infos.find(name); + if (it == infos.end()) + throw Exception( + ErrorCodes::BACKUP_ENTRY_NOT_FOUND, "Backup {}: Entry {} not found in the backup", quoteString(path), quoteString(name)); + return it->second.checksum; +} + + +BackupEntryPtr BackupInDirectory::read(const String & name) const +{ + std::lock_guard lock{mutex}; + auto it = infos.find(name); + if (it == infos.end()) + throw Exception( + ErrorCodes::BACKUP_ENTRY_NOT_FOUND, "Backup {}: Entry {} not found in the backup", quoteString(path), quoteString(name)); + + const auto & info = it->second; + if (!info.size) + { + /// Entry's data is empty. + return std::make_unique(nullptr, 0, UInt128{0, 0}); + } + + if (!info.base_size) + { + /// Data goes completely from this backup, the base backup isn't used. + return std::make_unique(disk, path_with_sep + name, info.size, info.checksum); + } + + if (info.size < info.base_size) + { + throw Exception( + ErrorCodes::BACKUP_DAMAGED, + "Backup {}: Entry {} has its data size less than in the base backup {}: {} < {}", + quoteString(path), quoteString(name), quoteString(base_backup->getPath()), info.size, info.base_size); + } + + if (!base_backup) + { + throw Exception( + ErrorCodes::NO_BASE_BACKUP, + "Backup {}: Entry {} is marked to be read from a base backup, but there is no base backup specified", + quoteString(path), quoteString(name)); + } + + if (!base_backup->exists(name)) + { + throw Exception( + ErrorCodes::WRONG_BASE_BACKUP, + "Backup {}: Entry {} is marked to be read from a base backup, but doesn't exist there", + quoteString(path), quoteString(name)); + } + + auto base_entry = base_backup->read(name); + auto base_size = base_entry->getSize(); + if (base_size != info.base_size) + { + throw Exception( + ErrorCodes::WRONG_BASE_BACKUP, + "Backup {}: Entry {} has unexpected size in the base backup {}: {} (expected size: {})", + quoteString(path), quoteString(name), quoteString(base_backup->getPath()), base_size, info.base_size); + } + + auto base_checksum = base_entry->getChecksum(); + if (base_checksum && (*base_checksum != info.base_checksum)) + { + throw Exception( + ErrorCodes::WRONG_BASE_BACKUP, + "Backup {}: Entry {} has unexpected checksum in the base backup {}", + quoteString(path), quoteString(name), quoteString(base_backup->getPath())); + } + + if (info.size == info.base_size) + { + /// Data goes completely from the base backup (nothing goes from this backup). + return base_entry; + } + + /// The beginning of the data goes from the base backup, + /// and the ending goes from this backup. + return std::make_unique( + std::move(base_entry), + std::make_unique(disk, path_with_sep + name, info.size - info.base_size), + info.checksum); +} + + +void BackupInDirectory::write(const String & name, BackupEntryPtr entry) +{ + std::lock_guard lock{mutex}; + if (open_mode != OpenMode::WRITE) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Illegal operation: Cannot write to a backup opened for reading"); + + if (infos.contains(name)) + throw Exception( + ErrorCodes::BACKUP_ENTRY_ALREADY_EXISTS, "Backup {}: Entry {} already exists", quoteString(path), quoteString(name)); + + UInt64 size = entry->getSize(); + std::optional checksum = entry->getChecksum(); + + /// Check if the entry's data is empty. + if (!size) + { + infos.emplace(name, EntryInfo{}); + return; + } + + /// Check if a entry with such name exists in the base backup. 
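+    /// The incremental logic below works as follows: if the entry's size and checksum match the base backup,
+    /// nothing is copied and the entry just references the base; if the data has only been appended
+    /// (the first base_size bytes still hash to base_checksum), only the tail is copied;
+    /// otherwise the whole entry is copied into this backup.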
+ bool base_exists = (base_backup && base_backup->exists(name)); + UInt64 base_size = 0; + UInt128 base_checksum{0, 0}; + if (base_exists) + { + base_size = base_backup->getSize(name); + base_checksum = base_backup->getChecksum(name); + } + + std::unique_ptr read_buffer; /// We'll set that later. + UInt64 read_pos = 0; /// Current position in read_buffer. + + /// Determine whether it's possible to receive this entry's data from the base backup completely or partly. + bool use_base = false; + if (base_exists && base_size) + { + if (size == base_size) + { + /// The size is the same, we need to compare checksums to find out + /// if the entry's data has not been changed since the base backup. + if (!checksum) + { + read_buffer = entry->getReadBuffer(); + HashingReadBuffer hashing_read_buffer{*read_buffer}; + hashing_read_buffer.ignore(size); + read_pos = size; + checksum = hashing_read_buffer.getHash(); + } + if (checksum == base_checksum) + use_base = true; /// The data has not been changed. + } + else if (size > base_size) + { + /// The size has been increased, we need to calculate a partial checksum to find out + /// if the entry's data has been only appended since the base backup. + read_buffer = entry->getReadBuffer(); + HashingReadBuffer hashing_read_buffer{*read_buffer}; + hashing_read_buffer.ignore(base_size); + UInt128 partial_checksum = hashing_read_buffer.getHash(); + read_pos = base_size; + if (!checksum) + { + hashing_read_buffer.ignore(size - base_size); + checksum = hashing_read_buffer.getHash(); + read_pos = size; + } + if (partial_checksum == base_checksum) + use_base = true; /// The data has been appended. + } + } + + if (use_base && (size == base_size)) + { + /// The entry's data has not been changed since the base backup. + EntryInfo info; + info.size = base_size; + info.checksum = base_checksum; + info.base_size = base_size; + info.base_checksum = base_checksum; + infos.emplace(name, info); + return; + } + + { + /// Either the entry wasn't exist in the base backup + /// or the entry has data appended to the end of the data from the base backup. + /// In both those cases we have to copy data to this backup. + + /// Find out where the start position to copy data is. + auto copy_pos = use_base ? base_size : 0; + + /// Move the current read position to the start position to copy data. + /// If `read_buffer` is seekable it's easier, otherwise we can use ignore(). + if ((read_pos > copy_pos) && !typeid_cast(read_buffer.get())) + { + read_buffer.reset(); + read_pos = 0; + } + + if (!read_buffer) + read_buffer = entry->getReadBuffer(); + + if (read_pos != copy_pos) + { + if (auto * seekable_buffer = typeid_cast(read_buffer.get())) + seekable_buffer->seek(copy_pos, SEEK_SET); + else if (copy_pos) + read_buffer->ignore(copy_pos - read_pos); + } + + /// If we haven't received or calculated a checksum yet, calculate it now. + ReadBuffer * maybe_hashing_read_buffer = read_buffer.get(); + std::optional hashing_read_buffer; + if (!checksum) + maybe_hashing_read_buffer = &hashing_read_buffer.emplace(*read_buffer); + + /// Copy the entry's data after `copy_pos`. + String out_file_path = path_with_sep + name; + disk->createDirectories(directoryPath(out_file_path)); + auto out = disk->writeFile(out_file_path); + + copyData(*maybe_hashing_read_buffer, *out, size - copy_pos); + + if (hashing_read_buffer) + checksum = hashing_read_buffer->getHash(); + + /// Done! 
+ EntryInfo info; + info.size = size; + info.checksum = *checksum; + if (use_base) + { + info.base_size = base_size; + info.base_checksum = base_checksum; + } + infos.emplace(name, info); + } +} + +void BackupInDirectory::finalizeWriting() +{ + if (open_mode != OpenMode::WRITE) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Illegal operation: Cannot write to a backup opened for reading"); + writeContents(); + finalized = true; +} + +} diff --git a/src/Backups/BackupInDirectory.h b/src/Backups/BackupInDirectory.h new file mode 100644 index 00000000000..ceaf4e34ccb --- /dev/null +++ b/src/Backups/BackupInDirectory.h @@ -0,0 +1,66 @@ +#pragma once + +#include +#include +#include + + +namespace DB +{ +class IDisk; +using DiskPtr = std::shared_ptr; + +/// Represents a backup stored on a disk. +/// A backup is stored as a directory, each entry is stored as a file in that directory. +/// Also three system files are stored: +/// 1) ".base" is an XML file with information about the base backup. +/// 2) ".contents" is a binary file containing a list of all entries along with their sizes +/// and checksums and information whether the base backup should be used for each entry +/// 3) ".write_lock" is a temporary empty file which is created before writing of a backup +/// and deleted after finishing that writing. +class BackupInDirectory : public IBackup +{ +public: + BackupInDirectory(OpenMode open_mode_, const DiskPtr & disk_, const String & path_, const std::shared_ptr & base_backup_ = {}); + ~BackupInDirectory() override; + + OpenMode getOpenMode() const override; + String getPath() const override; + Strings list(const String & prefix, const String & terminator) const override; + bool exists(const String & name) const override; + size_t getSize(const String & name) const override; + UInt128 getChecksum(const String & name) const override; + BackupEntryPtr read(const String & name) const override; + void write(const String & name, BackupEntryPtr entry) override; + void finalizeWriting() override; + +private: + void open(); + void close(); + void writePathToBaseBackup(); + void readPathToBaseBackup(); + void writeContents(); + void readContents(); + + struct EntryInfo + { + UInt64 size = 0; + UInt128 checksum{0, 0}; + + /// for incremental backups + UInt64 base_size = 0; + UInt128 base_checksum{0, 0}; + }; + + const OpenMode open_mode; + const DiskPtr disk; + String path; + String path_with_sep; + std::shared_ptr base_backup; + std::map infos; + bool directory_was_created = false; + bool finalized = false; + mutable std::mutex mutex; +}; + +} diff --git a/src/Backups/BackupRenamingConfig.cpp b/src/Backups/BackupRenamingConfig.cpp new file mode 100644 index 00000000000..ff510d82a32 --- /dev/null +++ b/src/Backups/BackupRenamingConfig.cpp @@ -0,0 +1,89 @@ +#include +#include + + +namespace DB +{ +using Kind = ASTBackupQuery::Kind; +using ElementType = ASTBackupQuery::ElementType; + +void BackupRenamingConfig::setNewTableName(const DatabaseAndTableName & old_table_name, const DatabaseAndTableName & new_table_name) +{ + old_to_new_table_names[old_table_name] = new_table_name; +} + +void BackupRenamingConfig::setNewDatabaseName(const String & old_database_name, const String & new_database_name) +{ + old_to_new_database_names[old_database_name] = new_database_name; +} + +void BackupRenamingConfig::setNewTemporaryTableName(const String & old_temporary_table_name, const String & new_temporary_table_name) +{ + old_to_new_temporary_table_names[old_temporary_table_name] = new_temporary_table_name; +} + +void 
BackupRenamingConfig::setFromBackupQuery(const ASTBackupQuery & backup_query) +{ + setFromBackupQueryElements(backup_query.elements); +} + +void BackupRenamingConfig::setFromBackupQueryElements(const ASTBackupQuery::Elements & backup_query_elements) +{ + for (const auto & element : backup_query_elements) + { + switch (element.type) + { + case ElementType::TABLE: [[fallthrough]]; + case ElementType::DICTIONARY: + { + const auto & new_name = element.new_name.second.empty() ? element.name : element.new_name; + setNewTableName(element.name, new_name); + break; + } + + case ASTBackupQuery::DATABASE: + { + const auto & new_name = element.new_name.first.empty() ? element.name.first : element.new_name.first; + setNewDatabaseName(element.name.first, new_name); + break; + } + + case ASTBackupQuery::TEMPORARY_TABLE: + { + const auto & new_name = element.new_name.second.empty() ? element.name.second : element.new_name.second; + setNewTemporaryTableName(element.name.second, new_name); + break; + } + + case ASTBackupQuery::ALL_DATABASES: break; + case ASTBackupQuery::ALL_TEMPORARY_TABLES: break; + case ASTBackupQuery::EVERYTHING: break; + } + } +} + +DatabaseAndTableName BackupRenamingConfig::getNewTableName(const DatabaseAndTableName & old_table_name) const +{ + auto it = old_to_new_table_names.find(old_table_name); + if (it != old_to_new_table_names.end()) + return it->second; + return {getNewDatabaseName(old_table_name.first), old_table_name.second}; +} + +const String & BackupRenamingConfig::getNewDatabaseName(const String & old_database_name) const +{ + auto it = old_to_new_database_names.find(old_database_name); + if (it != old_to_new_database_names.end()) + return it->second; + return old_database_name; +} + +const String & BackupRenamingConfig::getNewTemporaryTableName(const String & old_temporary_table_name) const +{ + auto it = old_to_new_temporary_table_names.find(old_temporary_table_name); + if (it != old_to_new_temporary_table_names.end()) + return it->second; + return old_temporary_table_name; +} + +} diff --git a/src/Backups/BackupRenamingConfig.h b/src/Backups/BackupRenamingConfig.h new file mode 100644 index 00000000000..740781c9c9f --- /dev/null +++ b/src/Backups/BackupRenamingConfig.h @@ -0,0 +1,39 @@ +#pragma once + +#include +#include +#include +#include + + +namespace DB +{ +using DatabaseAndTableName = std::pair; + +/// Keeps information about renamings of databases or tables being processed +/// while we're making a backup or while we're restoring from a backup. +class BackupRenamingConfig +{ +public: + BackupRenamingConfig() = default; + + void setNewTableName(const DatabaseAndTableName & old_table_name, const DatabaseAndTableName & new_table_name); + void setNewDatabaseName(const String & old_database_name, const String & new_database_name); + void setNewTemporaryTableName(const String & old_temporary_table_name, const String & new_temporary_table_name); + void setFromBackupQuery(const ASTBackupQuery & backup_query); + void setFromBackupQueryElements(const ASTBackupQuery::Elements & backup_query_elements); + + /// Changes names according to the renaming. 
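+    /// For example (illustrative names): after setNewDatabaseName("db1", "db2"),
+    /// getNewTableName({"db1", "events"}) returns {"db2", "events"}, unless a more specific renaming
+    /// for that particular table was registered with setNewTableName().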
+ DatabaseAndTableName getNewTableName(const DatabaseAndTableName & old_table_name) const; + const String & getNewDatabaseName(const String & old_database_name) const; + const String & getNewTemporaryTableName(const String & old_temporary_table_name) const; + +private: + std::map old_to_new_table_names; + std::unordered_map old_to_new_database_names; + std::unordered_map old_to_new_temporary_table_names; +}; + +using BackupRenamingConfigPtr = std::shared_ptr; + +} diff --git a/src/Backups/BackupSettings.cpp b/src/Backups/BackupSettings.cpp new file mode 100644 index 00000000000..f383330f246 --- /dev/null +++ b/src/Backups/BackupSettings.cpp @@ -0,0 +1,6 @@ +#include + +namespace DB +{ +IMPLEMENT_SETTINGS_TRAITS(BackupSettingsTraits, LIST_OF_BACKUP_SETTINGS) +} diff --git a/src/Backups/BackupSettings.h b/src/Backups/BackupSettings.h new file mode 100644 index 00000000000..dc8a679415c --- /dev/null +++ b/src/Backups/BackupSettings.h @@ -0,0 +1,16 @@ +#pragma once + +#include + + +namespace DB +{ + +#define LIST_OF_BACKUP_SETTINGS(M) \ + M(String, base_backup, "", "Name of the base backup. Only differences made after the base backup will be included in a newly created backup, so this option allows to make an incremental backup.", 0) \ + +DECLARE_SETTINGS_TRAITS_ALLOW_CUSTOM_SETTINGS(BackupSettingsTraits, LIST_OF_BACKUP_SETTINGS) + +struct BackupSettings : public BaseSettings {}; + +} diff --git a/src/Backups/BackupUtils.cpp b/src/Backups/BackupUtils.cpp new file mode 100644 index 00000000000..974ae252829 --- /dev/null +++ b/src/Backups/BackupUtils.cpp @@ -0,0 +1,830 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ +namespace ErrorCodes +{ + extern const int BACKUP_ELEMENT_DUPLICATE; + extern const int BACKUP_IS_EMPTY; + extern const int LOGICAL_ERROR; + extern const int TABLE_ALREADY_EXISTS; + extern const int CANNOT_RESTORE_TABLE; +} + +namespace +{ + using Kind = ASTBackupQuery::Kind; + using Element = ASTBackupQuery::Element; + using Elements = ASTBackupQuery::Elements; + using ElementType = ASTBackupQuery::ElementType; + + /// Replace elements of types DICTIONARY or EVERYTHING with elements of other types. + void replaceElementTypesWithBaseElementTypes(Elements & elements) + { + for (size_t i = 0; i != elements.size(); ++i) + { + auto & element = elements[i]; + switch (element.type) + { + case ElementType::DICTIONARY: + { + element.type = ElementType::TABLE; + break; + } + + case ElementType::EVERYTHING: + { + element.type = ElementType::ALL_DATABASES; + auto & new_element = elements.emplace_back(); + new_element.type = ElementType::ALL_TEMPORARY_TABLES; + break; + } + + default: + break; + } + } + } + + /// Replaces an empty database with the current database. + void replaceEmptyDatabaseWithCurrentDatabase(Elements & elements, const String & current_database) + { + for (auto & element : elements) + { + if (element.type == ElementType::TABLE) + { + if (element.name.first.empty() && !element.name.second.empty()) + element.name.first = current_database; + if (element.new_name.first.empty() && !element.new_name.second.empty()) + element.new_name.first = current_database; + } + } + } + + /// Replaces elements of types TEMPORARY_TABLE or ALL_TEMPORARY_TABLES with elements of type TABLE or DATABASE. 
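+    /// E.g. a TEMPORARY_TABLE element for table `t` becomes a TABLE element for DatabaseCatalog::TEMPORARY_DATABASE.`t`,
+    /// so the code further below only has to handle TABLE, DATABASE and ALL_DATABASES.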
+ void replaceTemporaryTablesWithTemporaryDatabase(Elements & elements) + { + for (size_t i = 0; i != elements.size(); ++i) + { + auto & element = elements[i]; + switch (element.type) + { + case ElementType::TEMPORARY_TABLE: + { + element.type = ElementType::TABLE; + element.name.first = DatabaseCatalog::TEMPORARY_DATABASE; + if (element.new_name.first.empty() && !element.new_name.second.empty()) + element.new_name.first = DatabaseCatalog::TEMPORARY_DATABASE; + break; + } + + case ElementType::ALL_TEMPORARY_TABLES: + { + element.type = ElementType::DATABASE; + element.name.first = DatabaseCatalog::TEMPORARY_DATABASE; + break; + } + + default: + break; + } + } + } + + /// Set new names if they are not specified. + void setNewNamesIfNotSet(Elements & elements) + { + for (auto & element : elements) + { + switch (element.type) + { + case ElementType::TABLE: + { + if (element.new_name.second.empty()) + element.new_name = element.name; + break; + } + + case ElementType::DATABASE: + { + if (element.new_name.first.empty()) + element.new_name = element.name; + break; + } + + default: + break; + } + } + } + + /// Removes duplications in the elements of a backup query by removing some excessive elements and by updating except_lists. + /// This function helps deduplicate elements in queries like "BACKUP ALL DATABASES, DATABASE xxx USING NAME yyy" + /// (we need a deduplication for that query because `ALL DATABASES` includes `xxx` however we don't want + /// to backup/restore the same database twice while executing the same query). + /// Also this function slightly reorders elements: it puts databases before tables and dictionaries they contain. + void deduplicateAndReorderElements(Elements & elements) + { + std::set skip_indices; /// Indices of elements which should be removed in the end of this function. + size_t index_all_databases = static_cast(-1); /// Index of the first element of type ALL_DATABASES or -1 if not found. + + struct DatabaseInfo + { + size_t index = static_cast(-1); + std::unordered_map tables; + }; + std::unordered_map databases; /// Found databases and tables. + + for (size_t i = 0; i != elements.size(); ++i) + { + auto & element = elements[i]; + switch (element.type) + { + case ElementType::TABLE: + { + auto & tables = databases.emplace(element.name.first, DatabaseInfo{}).first->second.tables; + auto it = tables.find(element.name.second); + if (it == tables.end()) + { + tables.emplace(element.name.second, i); + } + else + { + size_t prev_index = it->second; + if ((elements[i].new_name == elements[prev_index].new_name) + && (elements[i].partitions.empty() == elements[prev_index].partitions.empty())) + { + insertAtEnd(elements[prev_index].partitions, elements[i].partitions); + skip_indices.emplace(i); + } + else + { + throw Exception( + "Table " + backQuote(element.name.first) + "." 
+ backQuote(element.name.second) + " was specified twice", + ErrorCodes::BACKUP_ELEMENT_DUPLICATE); + } + } + break; + } + + case ElementType::DATABASE: + { + auto it = databases.find(element.name.first); + if (it == databases.end()) + { + DatabaseInfo new_db_info; + new_db_info.index = i; + databases.emplace(element.name.first, new_db_info); + } + else if (it->second.index == static_cast(-1)) + { + it->second.index = i; + } + else + { + size_t prev_index = it->second.index; + if ((elements[i].new_name == elements[prev_index].new_name) + && (elements[i].except_list == elements[prev_index].except_list)) + { + skip_indices.emplace(i); + } + else + { + throw Exception("Database " + backQuote(element.name.first) + " was specified twice", ErrorCodes::BACKUP_ELEMENT_DUPLICATE); + } + + } + break; + } + + case ElementType::ALL_DATABASES: + { + if (index_all_databases == static_cast(-1)) + { + index_all_databases = i; + } + else + { + size_t prev_index = index_all_databases; + if (elements[i].except_list == elements[prev_index].except_list) + skip_indices.emplace(i); + else + throw Exception("The tag ALL DATABASES was specified twice", ErrorCodes::BACKUP_ELEMENT_DUPLICATE); + } + break; + } + + default: + /// replaceElementTypesWithBaseElementTypes() and replaceTemporaryTablesWithTemporaryDatabase() should have removed all other element types. + throw Exception("Unexpected element type: " + std::to_string(static_cast(element.type)), ErrorCodes::LOGICAL_ERROR); + } + } + + if (index_all_databases != static_cast(-1)) + { + for (auto & [database_name, database] : databases) + { + elements[index_all_databases].except_list.emplace(database_name); + if (database.index == static_cast(-1)) + { + auto & new_element = elements.emplace_back(); + new_element.type = ElementType::DATABASE; + new_element.name.first = database_name; + new_element.new_name = new_element.name; + database.index = elements.size() - 1; + } + } + } + + for (auto & [database_name, database] : databases) + { + if (database.index == static_cast(-1)) + continue; + for (const auto & [table_name, table_index] : database.tables) + elements[database.index].except_list.emplace(table_name); + } + + /// Reorder the elements: databases should be before tables and dictionaries they contain. 
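+        /// E.g. for a query listing "TABLE db.t, DATABASE db" (illustrative) the DATABASE element is swapped
+        /// in front of the TABLE element, so the database's metadata is backed up before the tables inside it.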
+ for (auto & [database_name, database] : databases) + { + if (database.index == static_cast(-1)) + continue; + size_t min_index = std::numeric_limits::max(); + auto min_index_it = database.tables.end(); + for (auto it = database.tables.begin(); it != database.tables.end(); ++it) + { + if (min_index > it->second) + { + min_index = it->second; + min_index_it = it; + } + } + if (database.index > min_index) + { + std::swap(elements[database.index], elements[min_index]); + std::swap(database.index, min_index_it->second); + } + } + + for (auto skip_index : skip_indices | boost::adaptors::reversed) + elements.erase(elements.begin() + skip_index); + } + + Elements adjustElements(const Elements & elements, const String & current_database) + { + auto res = elements; + replaceElementTypesWithBaseElementTypes(res); + replaceEmptyDatabaseWithCurrentDatabase(res, current_database); + replaceTemporaryTablesWithTemporaryDatabase(res); + setNewNamesIfNotSet(res); + deduplicateAndReorderElements(res); + return res; + } + + String getDataPathInBackup(const DatabaseAndTableName & table_name) + { + if (table_name.first.empty() || table_name.second.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Database name and table name must not be empty"); + assert(!table_name.first.empty() && !table_name.second.empty()); + return String{"data/"} + escapeForFileName(table_name.first) + "/" + escapeForFileName(table_name.second) + "/"; + } + + String getDataPathInBackup(const IAST & create_query) + { + const auto & create = create_query.as(); + if (create.table.empty()) + return {}; + if (create.temporary) + return getDataPathInBackup({DatabaseCatalog::TEMPORARY_DATABASE, create.table}); + return getDataPathInBackup({create.database, create.table}); + } + + String getMetadataPathInBackup(const DatabaseAndTableName & table_name) + { + if (table_name.first.empty() || table_name.second.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Database name and table name must not be empty"); + return String{"metadata/"} + escapeForFileName(table_name.first) + "/" + escapeForFileName(table_name.second) + ".sql"; + } + + String getMetadataPathInBackup(const String & database_name) + { + if (database_name.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Database name must not be empty"); + return String{"metadata/"} + escapeForFileName(database_name) + ".sql"; + } + + String getMetadataPathInBackup(const IAST & create_query) + { + const auto & create = create_query.as(); + if (create.table.empty()) + return getMetadataPathInBackup(create.database); + if (create.temporary) + return getMetadataPathInBackup({DatabaseCatalog::TEMPORARY_DATABASE, create.table}); + return getMetadataPathInBackup({create.database, create.table}); + } + + void backupCreateQuery(const IAST & create_query, BackupEntries & backup_entries) + { + auto metadata_entry = std::make_unique(serializeAST(create_query)); + String metadata_path = getMetadataPathInBackup(create_query); + backup_entries.emplace_back(metadata_path, std::move(metadata_entry)); + } + + void backupTable( + const DatabaseAndTable & database_and_table, + const String & table_name, + const ASTs & partitions, + const ContextPtr & context, + const BackupRenamingConfigPtr & renaming_config, + BackupEntries & backup_entries) + { + const auto & database = database_and_table.first; + const auto & storage = database_and_table.second; + context->checkAccess(AccessType::SELECT, database->getDatabaseName(), table_name); + + auto create_query = database->getCreateTableQuery(table_name, 
context); + ASTPtr new_create_query = renameInCreateQuery(create_query, renaming_config, context); + backupCreateQuery(*new_create_query, backup_entries); + + auto data_backup = storage->backup(partitions, context); + if (!data_backup.empty()) + { + String data_path = getDataPathInBackup(*new_create_query); + for (auto & [path_in_backup, backup_entry] : data_backup) + backup_entries.emplace_back(data_path + path_in_backup, std::move(backup_entry)); + } + } + + void backupDatabase( + const DatabasePtr & database, + const std::set & except_list, + const ContextPtr & context, + const BackupRenamingConfigPtr & renaming_config, + BackupEntries & backup_entries) + { + context->checkAccess(AccessType::SHOW_TABLES, database->getDatabaseName()); + + auto create_query = database->getCreateDatabaseQuery(); + ASTPtr new_create_query = renameInCreateQuery(create_query, renaming_config, context); + backupCreateQuery(*new_create_query, backup_entries); + + for (auto it = database->getTablesIteratorForBackup(context); it->isValid(); it->next()) + { + if (except_list.contains(it->name())) + continue; + backupTable({database, it->table()}, it->name(), {}, context, renaming_config, backup_entries); + } + } + + void backupAllDatabases( + const std::set & except_list, + const ContextPtr & context, + const BackupRenamingConfigPtr & renaming_config, + BackupEntries & backup_entries) + { + for (const auto & [database_name, database] : DatabaseCatalog::instance().getDatabases()) + { + if (except_list.contains(database_name)) + continue; + if (database_name == DatabaseCatalog::SYSTEM_DATABASE || database_name == DatabaseCatalog::TEMPORARY_DATABASE) + continue; + backupDatabase(database, {}, context, renaming_config, backup_entries); + } + } + + void makeDatabaseIfNotExists(const String & database_name, ContextMutablePtr context) + { + if (DatabaseCatalog::instance().isDatabaseExist(database_name)) + return; + + /// We create and execute `create` query for the database name. 
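+        /// This is equivalent to executing "CREATE DATABASE IF NOT EXISTS <database_name>".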
+ auto create_query = std::make_shared(); + create_query->database = database_name; + create_query->if_not_exists = true; + InterpreterCreateQuery create_interpreter{create_query, context}; + create_interpreter.execute(); + } + + ASTPtr readCreateQueryFromBackup(const DatabaseAndTableName & table_name, const BackupPtr & backup) + { + String create_query_path = getMetadataPathInBackup(table_name); + auto read_buffer = backup->read(create_query_path)->getReadBuffer(); + String create_query_str; + readStringUntilEOF(create_query_str, *read_buffer); + read_buffer.reset(); + ParserCreateQuery create_parser; + return parseQuery(create_parser, create_query_str, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH); + } + + ASTPtr readCreateQueryFromBackup(const String & database_name, const BackupPtr & backup) + { + String create_query_path = getMetadataPathInBackup(database_name); + auto read_buffer = backup->read(create_query_path)->getReadBuffer(); + String create_query_str; + readStringUntilEOF(create_query_str, *read_buffer); + read_buffer.reset(); + ParserCreateQuery create_parser; + return parseQuery(create_parser, create_query_str, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH); + } + + void restoreTable( + const DatabaseAndTableName & table_name, + const ASTs & partitions, + ContextMutablePtr context, + const BackupPtr & backup, + const BackupRenamingConfigPtr & renaming_config, + RestoreObjectsTasks & restore_tasks) + { + ASTPtr create_query = readCreateQueryFromBackup(table_name, backup); + auto new_create_query = typeid_cast>(renameInCreateQuery(create_query, renaming_config, context)); + + restore_tasks.emplace_back([table_name, new_create_query, partitions, context, backup]() -> RestoreDataTasks + { + DatabaseAndTableName new_table_name{new_create_query->database, new_create_query->table}; + if (new_create_query->temporary) + new_table_name.first = DatabaseCatalog::TEMPORARY_DATABASE; + + context->checkAccess(AccessType::INSERT, new_table_name.first, new_table_name.second); + + StoragePtr storage; + for (size_t try_index = 0; try_index != 10; ++try_index) + { + if (DatabaseCatalog::instance().isTableExist({new_table_name.first, new_table_name.second}, context)) + { + DatabasePtr existing_database; + StoragePtr existing_storage; + std::tie(existing_database, existing_storage) = DatabaseCatalog::instance().tryGetDatabaseAndTable({new_table_name.first, new_table_name.second}, context); + if (existing_storage) + { + if (auto existing_table_create_query = existing_database->tryGetCreateTableQuery(new_table_name.second, context)) + { + if (hasCompatibleDataToRestoreTable(*new_create_query, existing_table_create_query->as())) + { + storage = existing_storage; + break; + } + else + { + String error_message = (new_table_name.first == DatabaseCatalog::TEMPORARY_DATABASE) + ? ("Temporary table " + backQuoteIfNeed(new_table_name.second) + " already exists") + : ("Table " + backQuoteIfNeed(new_table_name.first) + "." + backQuoteIfNeed(new_table_name.second) + + " already exists"); + throw Exception(error_message, ErrorCodes::CANNOT_RESTORE_TABLE); + } + } + } + } + + makeDatabaseIfNotExists(new_table_name.first, context); + + try + { + InterpreterCreateQuery create_interpreter{new_create_query, context}; + create_interpreter.execute(); + } + catch (Exception & e) + { + if (e.code() != ErrorCodes::TABLE_ALREADY_EXISTS) + throw; + } + } + + if (!storage) + { + String error_message = (new_table_name.first == DatabaseCatalog::TEMPORARY_DATABASE) + ? 
("Could not create temporary table " + backQuoteIfNeed(new_table_name.second) + " for restoring") + : ("Could not create table " + backQuoteIfNeed(new_table_name.first) + "." + backQuoteIfNeed(new_table_name.second) + + " for restoring"); + throw Exception(error_message, ErrorCodes::CANNOT_RESTORE_TABLE); + } + + String data_path_in_backup = getDataPathInBackup(table_name); + RestoreDataTasks restore_data_tasks = storage->restoreFromBackup(backup, data_path_in_backup, partitions, context); + + /// Keep `storage` alive while we're executing `restore_data_tasks`. + for (auto & restore_data_task : restore_data_tasks) + restore_data_task = [restore_data_task, storage]() { restore_data_task(); }; + + return restore_data_tasks; + }); + } + + void restoreDatabase(const String & database_name, const std::set & except_list, ContextMutablePtr context, const BackupPtr & backup, const BackupRenamingConfigPtr & renaming_config, RestoreObjectsTasks & restore_tasks) + { + ASTPtr create_query = readCreateQueryFromBackup(database_name, backup); + auto new_create_query = typeid_cast>(renameInCreateQuery(create_query, renaming_config, context)); + + restore_tasks.emplace_back([database_name, new_create_query, except_list, context, backup, renaming_config]() -> RestoreDataTasks + { + const String & new_database_name = new_create_query->database; + context->checkAccess(AccessType::SHOW_TABLES, new_database_name); + + if (!DatabaseCatalog::instance().isDatabaseExist(new_database_name)) + { + /// We create and execute `create` query for the database name. + new_create_query->if_not_exists = true; + InterpreterCreateQuery create_interpreter{new_create_query, context}; + create_interpreter.execute(); + } + + RestoreObjectsTasks restore_objects_tasks; + Strings table_names = backup->list("metadata/" + escapeForFileName(database_name) + "/", "/"); + for (const String & table_name : table_names) + { + if (except_list.contains(table_name)) + continue; + restoreTable({database_name, table_name}, {}, context, backup, renaming_config, restore_objects_tasks); + } + + RestoreDataTasks restore_data_tasks; + for (auto & restore_object_task : restore_objects_tasks) + insertAtEnd(restore_data_tasks, std::move(restore_object_task)()); + return restore_data_tasks; + }); + } + + void restoreAllDatabases(const std::set & except_list, ContextMutablePtr context, const BackupPtr & backup, const BackupRenamingConfigPtr & renaming_config, RestoreObjectsTasks & restore_tasks) + { + restore_tasks.emplace_back([except_list, context, backup, renaming_config]() -> RestoreDataTasks + { + Strings database_names = backup->list("metadata/", "/"); + RestoreObjectsTasks restore_objects_tasks; + for (const String & database_name : database_names) + { + if (except_list.contains(database_name)) + continue; + restoreDatabase(database_name, {}, context, backup, renaming_config, restore_objects_tasks); + } + + RestoreDataTasks restore_data_tasks; + for (auto & restore_object_task : restore_objects_tasks) + insertAtEnd(restore_data_tasks, std::move(restore_object_task)()); + return restore_data_tasks; + }); + } +} + + +BackupEntries makeBackupEntries(const Elements & elements, const ContextPtr & context) +{ + BackupEntries backup_entries; + + auto elements2 = adjustElements(elements, context->getCurrentDatabase()); + auto renaming_config = std::make_shared(); + renaming_config->setFromBackupQueryElements(elements2); + + for (const auto & element : elements2) + { + switch (element.type) + { + case ElementType::TABLE: + { + const String & database_name = 
element.name.first; + const String & table_name = element.name.second; + auto [database, storage] = DatabaseCatalog::instance().getDatabaseAndTable({database_name, table_name}, context); + backupTable({database, storage}, table_name, element.partitions, context, renaming_config, backup_entries); + break; + } + + case ElementType::DATABASE: + { + const String & database_name = element.name.first; + auto database = DatabaseCatalog::instance().getDatabase(database_name, context); + backupDatabase(database, element.except_list, context, renaming_config, backup_entries); + break; + } + + case ElementType::ALL_DATABASES: + { + backupAllDatabases(element.except_list, context, renaming_config, backup_entries); + break; + } + + default: + throw Exception("Unexpected element type", ErrorCodes::LOGICAL_ERROR); /// other element types have been removed in deduplicateElements() + } + } + + /// A backup cannot be empty. + if (backup_entries.empty()) + throw Exception("Backup must not be empty", ErrorCodes::BACKUP_IS_EMPTY); + + /// Check that all backup entries are unique. + std::sort( + backup_entries.begin(), + backup_entries.end(), + [](const std::pair> & lhs, const std::pair> & rhs) + { + return lhs.first < rhs.first; + }); + auto adjacent = std::adjacent_find(backup_entries.begin(), backup_entries.end()); + if (adjacent != backup_entries.end()) + throw Exception("Cannot write multiple entries with the same name " + quoteString(adjacent->first), ErrorCodes::BACKUP_ELEMENT_DUPLICATE); + + return backup_entries; +} + +UInt64 estimateBackupSize(const BackupEntries & backup_entries, const BackupPtr & base_backup) +{ + UInt64 total_size = 0; + for (const auto & [name, entry] : backup_entries) + { + UInt64 data_size = entry->getSize(); + if (base_backup) + { + if (base_backup->exists(name) && (data_size == base_backup->getSize(name))) + { + auto checksum = entry->getChecksum(); + if (checksum && (*checksum == base_backup->getChecksum(name))) + continue; + } + } + total_size += data_size; + } + return total_size; +} + +void writeBackupEntries(BackupMutablePtr backup, BackupEntries && backup_entries, size_t num_threads) +{ + if (!num_threads) + num_threads = 1; + std::vector threads; + size_t num_active_threads = 0; + std::mutex mutex; + std::condition_variable cond; + std::exception_ptr exception; + + for (auto & name_and_entry : backup_entries) + { + auto & name = name_and_entry.first; + auto & entry = name_and_entry.second; + + { + std::unique_lock lock{mutex}; + if (exception) + break; + cond.wait(lock, [&] { return num_active_threads < num_threads; }); + if (exception) + break; + ++num_active_threads; + } + + threads.emplace_back([backup, &name, &entry, &mutex, &cond, &num_active_threads, &exception]() + { + try + { + backup->write(name, std::move(entry)); + } + catch (...) + { + std::lock_guard lock{mutex}; + if (!exception) + exception = std::current_exception(); + } + + { + std::lock_guard lock{mutex}; + --num_active_threads; + cond.notify_all(); + } + }); + } + + for (auto & thread : threads) + thread.join(); + + backup_entries.clear(); + + if (exception) + { + /// We don't call finalizeWriting() if an error occurs. + /// And IBackup's implementation should remove the backup in its destructor if finalizeWriting() hasn't called before. 
+ std::rethrow_exception(exception); + } + + backup->finalizeWriting(); +} + + +RestoreObjectsTasks makeRestoreTasks(const Elements & elements, ContextMutablePtr context, const BackupPtr & backup) +{ + RestoreObjectsTasks restore_tasks; + + auto elements2 = adjustElements(elements, context->getCurrentDatabase()); + auto renaming_config = std::make_shared(); + renaming_config->setFromBackupQueryElements(elements2); + + for (const auto & element : elements2) + { + switch (element.type) + { + case ElementType::TABLE: + { + const String & database_name = element.name.first; + const String & table_name = element.name.second; + restoreTable({database_name, table_name}, element.partitions, context, backup, renaming_config, restore_tasks); + break; + } + + case ElementType::DATABASE: + { + const String & database_name = element.name.first; + auto database = DatabaseCatalog::instance().getDatabase(database_name, context); + restoreDatabase(database_name, element.except_list, context, backup, renaming_config, restore_tasks); + break; + } + + case ElementType::ALL_DATABASES: + { + restoreAllDatabases(element.except_list, context, backup, renaming_config, restore_tasks); + break; + } + + default: + throw Exception("Unexpected element type", ErrorCodes::LOGICAL_ERROR); /// other element types have been removed in deduplicateElements() + } + } + + return restore_tasks; +} + + +void executeRestoreTasks(RestoreObjectsTasks && restore_tasks, size_t num_threads) +{ + if (!num_threads) + num_threads = 1; + + RestoreDataTasks restore_data_tasks; + for (auto & restore_object_task : restore_tasks) + insertAtEnd(restore_data_tasks, std::move(restore_object_task)()); + restore_tasks.clear(); + + std::vector threads; + size_t num_active_threads = 0; + std::mutex mutex; + std::condition_variable cond; + std::exception_ptr exception; + + for (auto & restore_data_task : restore_data_tasks) + { + { + std::unique_lock lock{mutex}; + if (exception) + break; + cond.wait(lock, [&] { return num_active_threads < num_threads; }); + if (exception) + break; + ++num_active_threads; + } + + threads.emplace_back([&restore_data_task, &mutex, &cond, &num_active_threads, &exception]() mutable + { + try + { + restore_data_task(); + restore_data_task = {}; + } + catch (...) + { + std::lock_guard lock{mutex}; + if (!exception) + exception = std::current_exception(); + } + + { + std::lock_guard lock{mutex}; + --num_active_threads; + cond.notify_all(); + } + }); + } + + for (auto & thread : threads) + thread.join(); + + restore_data_tasks.clear(); + + if (exception) + std::rethrow_exception(exception); +} + +} diff --git a/src/Backups/BackupUtils.h b/src/Backups/BackupUtils.h new file mode 100644 index 00000000000..70f080cf6e9 --- /dev/null +++ b/src/Backups/BackupUtils.h @@ -0,0 +1,39 @@ +#pragma once + +#include + + +namespace DB +{ + +class IBackup; +using BackupPtr = std::shared_ptr; +using BackupMutablePtr = std::shared_ptr; +class IBackupEntry; +using BackupEntryPtr = std::unique_ptr; +using BackupEntries = std::vector>; +using RestoreDataTask = std::function; +using RestoreDataTasks = std::vector; +using RestoreObjectTask = std::function; +using RestoreObjectsTasks = std::vector; +class Context; +using ContextPtr = std::shared_ptr; +using ContextMutablePtr = std::shared_ptr; + + +/// Prepares backup entries. +BackupEntries makeBackupEntries(const ASTBackupQuery::Elements & elements, const ContextPtr & context); + +/// Estimate total size of the backup which would be written from the specified entries. 
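+/// Entries which are already stored in the base backup (i.e. with the same size and checksum) are not counted.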
+UInt64 estimateBackupSize(const BackupEntries & backup_entries, const BackupPtr & base_backup); + +/// Write backup entries to an opened backup. +void writeBackupEntries(BackupMutablePtr backup, BackupEntries && backup_entries, size_t num_threads); + +/// Prepare restore tasks. +RestoreObjectsTasks makeRestoreTasks(const ASTBackupQuery::Elements & elements, ContextMutablePtr context, const BackupPtr & backup); + +/// Execute restore tasks. +void executeRestoreTasks(RestoreObjectsTasks && restore_tasks, size_t num_threads); + +} diff --git a/src/Backups/CMakeLists.txt b/src/Backups/CMakeLists.txt new file mode 100644 index 00000000000..e69de29bb2d diff --git a/src/Backups/IBackup.h b/src/Backups/IBackup.h new file mode 100644 index 00000000000..f568f209f4b --- /dev/null +++ b/src/Backups/IBackup.h @@ -0,0 +1,65 @@ +#pragma once + +#include +#include + + +namespace DB +{ +class IBackupEntry; +using BackupEntryPtr = std::unique_ptr; + +/// Represents a backup, i.e. a storage of BackupEntries which can be accessed by their names. +/// A backup can be either incremental or non-incremental. An incremental backup doesn't store +/// the data of the entries which are not changed compared to its base backup. +class IBackup +{ +public: + virtual ~IBackup() = default; + + enum class OpenMode + { + READ, + WRITE, + }; + + /// A backup can be opened either in READ or WRITE mode. + virtual OpenMode getOpenMode() const = 0; + + /// Returns the path to the backup. + virtual String getPath() const = 0; + + /// Returns names of entries stored in the backup. + /// If `prefix` isn't empty the function will return only the names starting with + /// the prefix (but without the prefix itself). + /// If the `terminator` isn't empty the function will return only parts of the names + /// before the terminator. For example, list("", "") returns names of all the entries + /// in the backup; and list("data/", "/") returns a list of the folders and + /// files stored in the "data/" directory inside the backup. + virtual Strings list(const String & prefix = "", const String & terminator = "/") const = 0; + + /// Checks if an entry with the specified name exists. + virtual bool exists(const String & name) const = 0; + + /// Returns the size of the entry's data. + /// This function does the same as `read(name)->getSize()` but faster. + virtual size_t getSize(const String & name) const = 0; + + /// Returns the checksum of the entry's data. + /// This function does the same as `read(name)->getChecksum()` but faster. + virtual UInt128 getChecksum(const String & name) const = 0; + + /// Reads an entry from the backup. + virtual BackupEntryPtr read(const String & name) const = 0; + + /// Puts a new entry into the backup. + virtual void write(const String & name, BackupEntryPtr entry) = 0; + + /// Finalizes writing the backup; should be called after all entries have been successfully written. + virtual void finalizeWriting() = 0; +}; + +using BackupPtr = std::shared_ptr; +using BackupMutablePtr = std::shared_ptr; + +} diff --git a/src/Backups/IBackupEntry.h b/src/Backups/IBackupEntry.h new file mode 100644 index 00000000000..719e03ae6f5 --- /dev/null +++ b/src/Backups/IBackupEntry.h @@ -0,0 +1,32 @@ +#pragma once + +#include +#include +#include +#include + +namespace DB +{ +class ReadBuffer; + +/// A backup entry represents some data which should be written to the backup or has been read from the backup. +class IBackupEntry +{ +public: + virtual ~IBackupEntry() = default; + + /// Returns the size of the data.
+ virtual UInt64 getSize() const = 0; + + /// Returns the checksum of the data if it's precalculated. + /// Can return nullopt which means the checksum should be calculated from the read buffer. + virtual std::optional getChecksum() const { return {}; } + + /// Returns a read buffer for reading the data. + virtual std::unique_ptr getReadBuffer() const = 0; +}; + +using BackupEntryPtr = std::unique_ptr; +using BackupEntries = std::vector>; + +} diff --git a/src/Backups/hasCompatibleDataToRestoreTable.cpp b/src/Backups/hasCompatibleDataToRestoreTable.cpp new file mode 100644 index 00000000000..9c11d371bb0 --- /dev/null +++ b/src/Backups/hasCompatibleDataToRestoreTable.cpp @@ -0,0 +1,22 @@ +#include +#include +#include + + +namespace DB +{ + +bool hasCompatibleDataToRestoreTable(const ASTCreateQuery & query1, const ASTCreateQuery & query2) +{ + /// TODO: Write more subtle condition here. + auto q1 = typeid_cast>(query1.clone()); + auto q2 = typeid_cast>(query2.clone()); + + /// Remove UUIDs. + q1->uuid = UUIDHelpers::Nil; + q2->uuid = UUIDHelpers::Nil; + + return serializeAST(*q1) == serializeAST(*q2); +} + +} diff --git a/src/Backups/hasCompatibleDataToRestoreTable.h b/src/Backups/hasCompatibleDataToRestoreTable.h new file mode 100644 index 00000000000..92aab8b4579 --- /dev/null +++ b/src/Backups/hasCompatibleDataToRestoreTable.h @@ -0,0 +1,11 @@ +#pragma once + + +namespace DB +{ +class ASTCreateQuery; + +/// Whether the data of the first table can be inserted to the second table. +bool hasCompatibleDataToRestoreTable(const ASTCreateQuery & query1, const ASTCreateQuery & query2); + +} diff --git a/src/Backups/renameInCreateQuery.cpp b/src/Backups/renameInCreateQuery.cpp new file mode 100644 index 00000000000..a36995654ee --- /dev/null +++ b/src/Backups/renameInCreateQuery.cpp @@ -0,0 +1,276 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + +namespace +{ + class RenameInCreateQueryTransformMatcher + { + public: + struct Data + { + BackupRenamingConfigPtr renaming_config; + ContextPtr context; + }; + + static bool needChildVisit(ASTPtr &, const ASTPtr &) { return true; } + + static void visit(ASTPtr & ast, const Data & data) + { + if (auto * create = ast->as()) + visitCreateQuery(*create, data); + else if (auto * expr = ast->as()) + visitTableExpression(*expr, data); + else if (auto * function = ast->as()) + visitFunction(*function, data); + else if (auto * dictionary = ast->as()) + visitDictionary(*dictionary, data); + } + + private: + /// Replaces names of tables and databases used in a CREATE query, which can be either CREATE TABLE or + /// CREATE DICTIONARY or CREATE VIEW or CREATE TEMPORARY TABLE or CREATE DATABASE query. 
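+ /// E.g. if the renaming config maps `db1` to `db2`, then "CREATE TABLE db1.table1 ..." becomes "CREATE TABLE db2.table1 ..." (illustrative example).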
+ static void visitCreateQuery(ASTCreateQuery & create, const Data & data) + { + if (create.temporary) + { + if (create.table.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Table name specified in the CREATE TEMPORARY TABLE query must not be empty"); + create.table = data.renaming_config->getNewTemporaryTableName(create.table); + } + else if (create.table.empty()) + { + if (create.database.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Database name specified in the CREATE DATABASE query must not be empty"); + create.database = data.renaming_config->getNewDatabaseName(create.database); + } + else + { + if (create.database.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Database name specified in the CREATE TABLE query must not be empty"); + std::tie(create.database, create.table) = data.renaming_config->getNewTableName({create.database, create.table}); + } + + create.uuid = UUIDHelpers::Nil; + + if (!create.as_table.empty() && !create.as_database.empty()) + std::tie(create.as_database, create.as_table) = data.renaming_config->getNewTableName({create.as_database, create.as_table}); + + if (!create.to_table_id.table_name.empty() && !create.to_table_id.database_name.empty()) + { + auto to_table = data.renaming_config->getNewTableName({create.to_table_id.database_name, create.to_table_id.table_name}); + create.to_table_id = StorageID{to_table.first, to_table.second}; + } + } + + /// Replaces names of a database and a table in a expression like `db`.`table` + static void visitTableExpression(ASTTableExpression & expr, const Data & data) + { + if (!expr.database_and_table_name) + return; + + ASTIdentifier * id = expr.database_and_table_name->as(); + if (!id) + return; + + auto table_id = id->createTable(); + if (!table_id) + return; + + const String & db_name = table_id->getDatabaseName(); + const String & table_name = table_id->shortName(); + if (db_name.empty() || table_name.empty()) + return; + + String new_db_name, new_table_name; + std::tie(new_db_name, new_table_name) = data.renaming_config->getNewTableName({db_name, table_name}); + if ((new_db_name == db_name) && (new_table_name == table_name)) + return; + + expr.database_and_table_name = std::make_shared(Strings{new_db_name, new_table_name}); + expr.children.push_back(expr.database_and_table_name); + } + + /// Replaces names of tables and databases used in arguments of a table function or a table engine. + static void visitFunction(ASTFunction & function, const Data & data) + { + if ((function.name == "merge") || (function.name == "Merge")) + { + visitFunctionMerge(function, data); + } + else if ((function.name == "remote") || (function.name == "remoteSecure") || (function.name == "cluster") || + (function.name == "clusterAllReplicas") || (function.name == "Distributed")) + { + visitFunctionRemote(function, data); + } + } + + /// Replaces a database's name passed via an argument of the function merge() or the table engine Merge. + static void visitFunctionMerge(ASTFunction & function, const Data & data) + { + if (!function.arguments) + return; + + /// The first argument is a database's name and we can rename it. + /// The second argument is a regular expression and we can do nothing about it. 
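+ /// E.g. merge('db1', '^table_.*$') becomes merge('db2', '^table_.*$') if the renaming config maps `db1` to `db2`; the regular expression is left untouched (illustrative example).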
+ auto & args = function.arguments->as().children; + size_t db_name_arg_index = 0; + if (args.size() <= db_name_arg_index) + return; + + String db_name = evaluateConstantExpressionForDatabaseName(args[db_name_arg_index], data.context)->as().value.safeGet(); + if (db_name.empty()) + return; + + String new_db_name = data.renaming_config->getNewDatabaseName(db_name); + if (new_db_name == db_name) + return; + args[db_name_arg_index] = std::make_shared(new_db_name); + } + + /// Replaces names of a table and a database passed via arguments of the function remote() or cluster() or the table engine Distributed. + static void visitFunctionRemote(ASTFunction & function, const Data & data) + { + if (!function.arguments) + return; + + /// The first argument is an address or cluster's name, so we skip it. + /// The second argument can be either 'db.name' or just 'db' followed by the third argument 'table'. + auto & args = function.arguments->as().children; + + const auto * second_arg_as_function = args[1]->as(); + if (second_arg_as_function && TableFunctionFactory::instance().isTableFunctionName(second_arg_as_function->name)) + return; + + size_t db_name_index = 1; + if (args.size() <= db_name_index) + return; + + String db_name = evaluateConstantExpressionForDatabaseName(args[db_name_index], data.context)->as().value.safeGet(); + + String table_name; + size_t table_name_index = static_cast(-1); + size_t dot = String::npos; + if (function.name != "Distributed") + dot = db_name.find('.'); + if (dot != String::npos) + { + table_name = db_name.substr(dot + 1); + db_name.resize(dot); + } + else + { + table_name_index = 2; + if (args.size() <= table_name_index) + return; + table_name = evaluateConstantExpressionForDatabaseName(args[table_name_index], data.context)->as().value.safeGet(); + } + + if (db_name.empty() || table_name.empty()) + return; + + String new_db_name, new_table_name; + std::tie(new_db_name, new_table_name) = data.renaming_config->getNewTableName({db_name, table_name}); + if ((new_db_name == db_name) && (new_table_name == table_name)) + return; + + if (table_name_index != static_cast(-1)) + { + if (new_db_name != db_name) + args[db_name_index] = std::make_shared(new_db_name); + if (new_table_name != table_name) + args[table_name_index] = std::make_shared(new_table_name); + } + else + { + args[db_name_index] = std::make_shared(new_db_name); + args.insert(args.begin() + db_name_index + 1, std::make_shared(new_table_name)); + } + } + + /// Replaces names of a table and a database used in source parameters of a dictionary. 
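+ /// E.g. for a dictionary defined with SOURCE(CLICKHOUSE(... db 'db1' table 'table1' ...)) the 'db' and 'table' parameters are replaced according to the renaming config (illustrative example).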
+ static void visitDictionary(ASTDictionary & dictionary, const Data & data) + { + if (!dictionary.source || dictionary.source->name != "clickhouse" || !dictionary.source->elements) + return; + + auto & elements = dictionary.source->elements->as().children; + String db_name, table_name; + size_t db_name_index = static_cast(-1); + size_t table_name_index = static_cast(-1); + + for (size_t i = 0; i != elements.size(); ++i) + { + auto & pair = elements[i]->as(); + if (pair.first == "db") + { + if (db_name_index != static_cast(-1)) + return; + db_name = pair.second->as().value.safeGet(); + db_name_index = i; + } + else if (pair.first == "table") + { + if (table_name_index != static_cast(-1)) + return; + table_name = pair.second->as().value.safeGet(); + table_name_index = i; + } + } + + if (db_name.empty() || table_name.empty()) + return; + + String new_db_name, new_table_name; + std::tie(new_db_name, new_table_name) = data.renaming_config->getNewTableName({db_name, table_name}); + if ((new_db_name == db_name) && (new_table_name == table_name)) + return; + + if (new_db_name != db_name) + { + auto & pair = elements[db_name_index]->as(); + pair.replace(pair.second, std::make_shared(new_db_name)); + } + if (new_table_name != table_name) + { + auto & pair = elements[table_name_index]->as(); + pair.replace(pair.second, std::make_shared(new_table_name)); + } + } + }; + + using RenameInCreateQueryTransformVisitor = InDepthNodeVisitor; +} + + +ASTPtr renameInCreateQuery(const ASTPtr & ast, const BackupRenamingConfigPtr & renaming_config, const ContextPtr & context) +{ + auto new_ast = ast->clone(); + try + { + RenameInCreateQueryTransformVisitor::Data data{renaming_config, context}; + RenameInCreateQueryTransformVisitor{data}.visit(new_ast); + return new_ast; + } + catch (...) + { + tryLogCurrentException("Backup", "Error while renaming in AST"); + return ast; + } +} + +} diff --git a/src/Backups/renameInCreateQuery.h b/src/Backups/renameInCreateQuery.h new file mode 100644 index 00000000000..9c62d07e5c6 --- /dev/null +++ b/src/Backups/renameInCreateQuery.h @@ -0,0 +1,16 @@ +#pragma once + +#include + +namespace DB +{ +class IAST; +using ASTPtr = std::shared_ptr; +class Context; +using ContextPtr = std::shared_ptr; +class BackupRenamingConfig; +using BackupRenamingConfigPtr = std::shared_ptr; + +/// Changes names in AST according to the renaming settings. 
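+/// If renaming fails for any reason, the error is logged and the original AST is returned unchanged.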
+ASTPtr renameInCreateQuery(const ASTPtr & ast, const BackupRenamingConfigPtr & renaming_config, const ContextPtr & context); +} diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 796c9eb4d2c..e9c97f5dc1f 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -45,6 +45,7 @@ if (COMPILER_GCC) endif () add_subdirectory (Access) +add_subdirectory (Backups) add_subdirectory (Columns) add_subdirectory (Common) add_subdirectory (Core) @@ -180,6 +181,7 @@ macro(add_object_library name common_path) endmacro() add_object_library(clickhouse_access Access) +add_object_library(clickhouse_backups Backups) add_object_library(clickhouse_core Core) add_object_library(clickhouse_core_mysql Core/MySQL) add_object_library(clickhouse_compression Compression) diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index fd14bddb64c..49fcb3b222f 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -566,6 +566,17 @@ M(595, BZIP2_STREAM_ENCODER_FAILED) \ M(596, INTERSECT_OR_EXCEPT_RESULT_STRUCTURES_MISMATCH) \ M(597, NO_SUCH_ERROR_CODE) \ + M(598, BACKUP_ALREADY_EXISTS) \ + M(599, BACKUP_NOT_FOUND) \ + M(600, BACKUP_VERSION_NOT_SUPPORTED) \ + M(601, BACKUP_DAMAGED) \ + M(602, NO_BASE_BACKUP) \ + M(603, WRONG_BASE_BACKUP) \ + M(604, BACKUP_ENTRY_ALREADY_EXISTS) \ + M(605, BACKUP_ENTRY_NOT_FOUND) \ + M(606, BACKUP_IS_EMPTY) \ + M(607, BACKUP_ELEMENT_DUPLICATE) \ + M(608, CANNOT_RESTORE_TABLE) \ \ M(998, POSTGRESQL_CONNECTION_FAILURE) \ M(999, KEEPER_EXCEPTION) \ diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 507daaf794d..0f7bc89fc25 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -354,6 +354,8 @@ class IColumn; M(UInt64, max_network_bandwidth_for_user, 0, "The maximum speed of data exchange over the network in bytes per second for all concurrently running user queries. Zero means unlimited.", 0)\ M(UInt64, max_network_bandwidth_for_all_users, 0, "The maximum speed of data exchange over the network in bytes per second for all concurrently running queries. Zero means unlimited.", 0) \ \ + M(UInt64, max_backup_threads, 0, "The maximum number of threads to execute a BACKUP or RESTORE request. By default, it is determined automatically.", 0) \ + \ M(Bool, log_profile_events, true, "Log query performance statistics into the query_log, query_thread_log and query_views_log.", 0) \ M(Bool, log_query_settings, true, "Log query settings into the query_log.", 0) \ M(Bool, log_query_threads, true, "Log query threads into system.query_thread_log table. This setting have effect only when 'log_queries' is true.", 0) \ diff --git a/src/Databases/IDatabase.h b/src/Databases/IDatabase.h index c2a8737da9c..387c6882eab 100644 --- a/src/Databases/IDatabase.h +++ b/src/Databases/IDatabase.h @@ -240,6 +240,12 @@ public: throw Exception(getEngineName() + ": RENAME DATABASE is not supported", ErrorCodes::NOT_IMPLEMENTED); } + /// Whether the contained tables should be written to a backup. + virtual DatabaseTablesIteratorPtr getTablesIteratorForBackup(ContextPtr context) const + { + return getTablesIterator(context); /// By default we backup each table. 
+ } + /// Returns path for persistent data storage if the database supports it, empty string otherwise virtual String getDataPath() const { return {}; } diff --git a/src/Disks/TemporaryFileOnDisk.cpp b/src/Disks/TemporaryFileOnDisk.cpp new file mode 100644 index 00000000000..90e51dcc369 --- /dev/null +++ b/src/Disks/TemporaryFileOnDisk.cpp @@ -0,0 +1,27 @@ +#include +#include +#include + + +namespace DB +{ + +TemporaryFileOnDisk::TemporaryFileOnDisk(const DiskPtr & disk_, const String & prefix_) + : disk(disk_) +{ + String dummy_prefix = "a/"; + filepath = Poco::TemporaryFile::tempName(dummy_prefix); + dummy_prefix += "tmp"; + assert(filepath.starts_with(dummy_prefix)); + filepath.replace(0, dummy_prefix.length(), prefix_); +} + +TemporaryFileOnDisk::~TemporaryFileOnDisk() +{ +#if 1 + if (disk && !filepath.empty()) + disk->removeRecursive(filepath); +#endif +} + +} diff --git a/src/Disks/TemporaryFileOnDisk.h b/src/Disks/TemporaryFileOnDisk.h new file mode 100644 index 00000000000..c854a600146 --- /dev/null +++ b/src/Disks/TemporaryFileOnDisk.h @@ -0,0 +1,29 @@ +#pragma once + +#include +#include + +namespace DB +{ +class IDisk; +using DiskPtr = std::shared_ptr; + +/// This class helps with the handling of temporary files or directories. +/// A unique name for the temporary file or directory is automatically chosen based on a specified prefix. +/// Optionally can create a directory in the constructor. +/// The destructor always removes the temporary file or directory with all contained files. +class TemporaryFileOnDisk +{ +public: + TemporaryFileOnDisk(const DiskPtr & disk_, const String & prefix_ = "tmp"); + ~TemporaryFileOnDisk(); + + DiskPtr getDisk() const { return disk; } + const String & getPath() const { return filepath; } + +private: + DiskPtr disk; + String filepath; +}; + +} diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 8bc2b4d76a3..f59d50dbdeb 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -46,6 +46,7 @@ #include #include #include +#include #include #include #include @@ -165,6 +166,8 @@ struct ContextSharedPart String tmp_path; /// Path to the temporary files that occur when processing the request. mutable VolumePtr tmp_volume; /// Volume for the the temporary files that occur when processing the request. + mutable VolumePtr backups_volume; /// Volume for all the backups. + mutable std::optional embedded_dictionaries; /// Metrica's dictionaries. Have lazy initialization. 
mutable std::optional external_dictionaries_loader; mutable std::optional external_models_loader; @@ -520,6 +523,35 @@ VolumePtr Context::setTemporaryStorage(const String & path, const String & polic return shared->tmp_volume; } +void Context::setBackupsVolume(const String & path, const String & policy_name) +{ + std::lock_guard lock(shared->storage_policies_mutex); + if (policy_name.empty()) + { + String path_with_separator = path; + if (!path_with_separator.ends_with('/')) + path_with_separator += '/'; + auto disk = std::make_shared("_backups_default", path_with_separator, 0); + shared->backups_volume = std::make_shared("_backups_default", disk, 0); + } + else + { + StoragePolicyPtr policy = getStoragePolicySelector(lock)->get(policy_name); + if (policy->getVolumes().size() != 1) + throw Exception("Policy " + policy_name + " is used for backups, such policy should have exactly one volume", + ErrorCodes::NO_ELEMENTS_IN_CONFIG); + shared->backups_volume = policy->getVolume(0); + } + + BackupFactory::instance().setBackupsVolume(shared->backups_volume); +} + +VolumePtr Context::getBackupsVolume() const +{ + std::lock_guard lock(shared->storage_policies_mutex); + return shared->backups_volume; +} + void Context::setFlagsPath(const String & path) { auto lock = getLock(); diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 0cac9df8516..9527b87ed39 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -340,6 +340,9 @@ public: VolumePtr setTemporaryStorage(const String & path, const String & policy_name = ""); + void setBackupsVolume(const String & path, const String & policy_name = ""); + VolumePtr getBackupsVolume() const; + using ConfigurationPtr = Poco::AutoPtr; /// Global application configuration settings. 
diff --git a/src/Interpreters/InterpreterBackupQuery.cpp b/src/Interpreters/InterpreterBackupQuery.cpp new file mode 100644 index 00000000000..11e0b2e40ef --- /dev/null +++ b/src/Interpreters/InterpreterBackupQuery.cpp @@ -0,0 +1,64 @@ +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ +namespace +{ + BackupSettings getBackupSettings(const ASTBackupQuery & query) + { + BackupSettings settings; + if (query.settings) + settings.applyChanges(query.settings->as().changes); + return settings; + } + + BackupPtr getBaseBackup(const BackupSettings & settings) + { + const String & base_backup_name = settings.base_backup; + if (base_backup_name.empty()) + return nullptr; + return BackupFactory::instance().openBackup(base_backup_name); + } + + void executeBackup(const ASTBackupQuery & query, const ContextPtr & context) + { + auto settings = getBackupSettings(query); + auto base_backup = getBaseBackup(settings); + + auto backup_entries = makeBackupEntries(query.elements, context); + UInt64 estimated_backup_size = estimateBackupSize(backup_entries, base_backup); + + auto backup = BackupFactory::instance().createBackup(query.backup_name, estimated_backup_size, base_backup); + writeBackupEntries(backup, std::move(backup_entries), context->getSettingsRef().max_backup_threads); + } + + void executeRestore(const ASTBackupQuery & query, ContextMutablePtr context) + { + auto settings = getBackupSettings(query); + auto base_backup = getBaseBackup(settings); + + auto backup = BackupFactory::instance().openBackup(query.backup_name, base_backup); + auto restore_tasks = makeRestoreTasks(query.elements, context, backup); + executeRestoreTasks(std::move(restore_tasks), context->getSettingsRef().max_backup_threads); + } +} + +BlockIO InterpreterBackupQuery::execute() +{ + const auto & query = query_ptr->as(); + if (query.kind == ASTBackupQuery::BACKUP) + executeBackup(query, context); + else if (query.kind == ASTBackupQuery::RESTORE) + executeRestore(query, context); + return {}; +} + +} diff --git a/src/Interpreters/InterpreterBackupQuery.h b/src/Interpreters/InterpreterBackupQuery.h new file mode 100644 index 00000000000..1c4b579acc7 --- /dev/null +++ b/src/Interpreters/InterpreterBackupQuery.h @@ -0,0 +1,20 @@ +#pragma once + +#include +#include + + +namespace DB +{ +class InterpreterBackupQuery : public IInterpreter +{ +public: + InterpreterBackupQuery(const ASTPtr & query_ptr_, ContextMutablePtr context_) : query_ptr(query_ptr_), context(context_) {} + + BlockIO execute() override; + +private: + ASTPtr query_ptr; + ContextMutablePtr context; +}; +} diff --git a/src/Interpreters/InterpreterFactory.cpp b/src/Interpreters/InterpreterFactory.cpp index e5b381b4d08..a50a6279873 100644 --- a/src/Interpreters/InterpreterFactory.cpp +++ b/src/Interpreters/InterpreterFactory.cpp @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -33,6 +34,7 @@ #include #include +#include #include #include #include @@ -270,6 +272,10 @@ std::unique_ptr InterpreterFactory::get(ASTPtr & query, ContextMut { return std::make_unique(query, context); } + else if (query->as()) + { + return std::make_unique(query, context); + } else { throw Exception("Unknown type of query: " + query->getID(), ErrorCodes::UNKNOWN_TYPE_OF_QUERY); diff --git a/src/Parsers/ASTBackupQuery.cpp b/src/Parsers/ASTBackupQuery.cpp new file mode 100644 index 00000000000..e3a6287c72a --- /dev/null +++ b/src/Parsers/ASTBackupQuery.cpp @@ -0,0 +1,130 @@ +#include +#include +#include + + +namespace DB +{ +namespace 
+{ + using Kind = ASTBackupQuery::Kind; + using Element = ASTBackupQuery::Element; + using ElementType = ASTBackupQuery::ElementType; + + void formatName(const DatabaseAndTableName & name, ElementType type, const IAST::FormatSettings & format) + { + switch (type) + { + case ElementType::TABLE: [[fallthrough]]; + case ElementType::DICTIONARY: + { + format.ostr << " "; + if (!name.first.empty()) + format.ostr << backQuoteIfNeed(name.first) << "."; + format.ostr << backQuoteIfNeed(name.second); + break; + } + case ElementType::DATABASE: + { + format.ostr << " " << backQuoteIfNeed(name.first); + break; + } + case ElementType::TEMPORARY_TABLE: + { + format.ostr << " " << backQuoteIfNeed(name.second); + break; + } + default: + break; + } + } + + void formatPartitions(const ASTs & partitions, const IAST::FormatSettings & format) + { + if (partitions.empty()) + return; + format.ostr << (format.hilite ? IAST::hilite_keyword : "") << " " << ((partitions.size() == 1) ? "PARTITION" : "PARTITIONS") << " " + << (format.hilite ? IAST::hilite_none : ""); + bool need_comma = false; + for (const auto & partition : partitions) + { + if (std::exchange(need_comma, true)) + format.ostr << ","; + format.ostr << " "; + partition->format(format); + } + } + + void formatElement(const Element & element, Kind kind, const IAST::FormatSettings & format) + { + format.ostr << (format.hilite ? IAST::hilite_keyword : "") << " "; + switch (element.type) + { + case ElementType::TABLE: format.ostr << "TABLE"; break; + case ElementType::DICTIONARY: format.ostr << "DICTIONARY"; break; + case ElementType::DATABASE: format.ostr << "DATABASE"; break; + case ElementType::ALL_DATABASES: format.ostr << "ALL DATABASES"; break; + case ElementType::TEMPORARY_TABLE: format.ostr << "TEMPORARY TABLE"; break; + case ElementType::ALL_TEMPORARY_TABLES: format.ostr << "ALL TEMPORARY TABLES"; break; + case ElementType::EVERYTHING: format.ostr << "EVERYTHING"; break; + } + format.ostr << (format.hilite ? IAST::hilite_none : ""); + + formatName(element.name, element.type, format); + + bool under_another_name = !element.new_name.first.empty() || !element.new_name.second.empty(); + if (under_another_name) + { + format.ostr << (format.hilite ? IAST::hilite_keyword : "") << " " << ((kind == Kind::BACKUP) ? "AS" : "INTO") + << (format.hilite ? IAST::hilite_none : ""); + formatName(element.new_name, element.type, format); + } + + formatPartitions(element.partitions, format); + } + + void formatElements(const std::vector & elements, Kind kind, const IAST::FormatSettings & format) + { + bool need_comma = false; + for (const auto & element : elements) + { + if (std::exchange(need_comma, true)) + format.ostr << ","; + formatElement(element, kind, format); + } + } + + void formatSettings(const IAST & settings, const IAST::FormatSettings & format) + { + format.ostr << (format.hilite ? IAST::hilite_keyword : "") << " SETTINGS " << (format.hilite ? IAST::hilite_none : ""); + settings.format(format); + } +} + +String ASTBackupQuery::getID(char) const +{ + return (kind == Kind::BACKUP) ? "BackupQuery" : "RestoreQuery"; +} + + +ASTPtr ASTBackupQuery::clone() const +{ + return std::make_shared(*this); +} + + +void ASTBackupQuery::formatImpl(const FormatSettings & format, FormatState &, FormatStateStacked) const +{ + format.ostr << (format.hilite ? hilite_keyword : "") << ((kind == Kind::BACKUP) ? "BACKUP" : "RESTORE") + << (format.hilite ? 
hilite_none : ""); + + formatElements(elements, kind, format); + + if (settings) + formatSettings(*settings, format); + + format.ostr << (format.hilite ? hilite_keyword : "") << ((kind == Kind::BACKUP) ? " TO" : " FROM") << (format.hilite ? hilite_none : ""); + format.ostr << " " << quoteString(backup_name); +} + +} diff --git a/src/Parsers/ASTBackupQuery.h b/src/Parsers/ASTBackupQuery.h new file mode 100644 index 00000000000..8bcef0c804d --- /dev/null +++ b/src/Parsers/ASTBackupQuery.h @@ -0,0 +1,87 @@ +#pragma once + +#include + + +namespace DB +{ +using Strings = std::vector; +using DatabaseAndTableName = std::pair; + + +/** BACKUP { TABLE [db.]table_name [AS [db.]table_name_in_backup] [PARTITION[S] partition_expr [,...]] | + * DICTIONARY [db.]dictionary_name [AS [db.]dictionary_name_in_backup] | + * DATABASE database_name [AS database_name_in_backup] | + * ALL DATABASES | + * TEMPORARY TABLE table_name [AS table_name_in_backup] | + * ALL TEMPORARY TABLES | + * EVERYTHING } [,...] + * TO 'backup_name' + * SETTINGS base_backup='base_backup_name' + * + * RESTORE { TABLE [db.]table_name_in_backup [INTO [db.]table_name] [PARTITION[S] partition_expr [,...]] | + * DICTIONARY [db.]dictionary_name_in_backup [INTO [db.]dictionary_name] | + * DATABASE database_name_in_backup [INTO database_name] | + * ALL DATABASES | + * TEMPORARY TABLE table_name_in_backup [INTO table_name] | + * ALL TEMPORARY TABLES | + * EVERYTHING } [,...] + * FROM 'backup_name' + * + * Notes: + * RESTORE doesn't drop any data: it either creates a table or appends restored data to an existing table. + * This behaviour can cause data duplication. + * If appending isn't possible because the existing table has an incompatible format, RESTORE throws an exception. + * + * The AS/INTO clause is useful for backing up or restoring under another name. + * For the BACKUP command this clause sets the name which an object will have inside the backup. + * And for the RESTORE command this clause sets the name which an object will have after RESTORE has finished. + * + * "ALL DATABASES" means all databases except the system database and the internal database containing temporary tables. + * "EVERYTHING" works exactly as "ALL DATABASES, ALL TEMPORARY TABLES". + * + * The SETTINGS base_backup = 'base_backup_name' clause sets a base backup. Only the differences made after the base backup will be + * included in a newly created backup, so this setting allows making an incremental backup.
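+ *
+ * Examples (illustrative):
+ *     BACKUP TABLE db.table TO 'backup1';
+ *     BACKUP TABLE db.table TO 'backup2' SETTINGS base_backup = 'backup1';
+ *     RESTORE TABLE db.table INTO db.table2 FROM 'backup2';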
+ */ +class ASTBackupQuery : public IAST +{ +public: + enum Kind + { + BACKUP, + RESTORE, + }; + Kind kind = Kind::BACKUP; + + enum ElementType + { + TABLE, + DICTIONARY, + DATABASE, + ALL_DATABASES, + TEMPORARY_TABLE, + ALL_TEMPORARY_TABLES, + EVERYTHING, + }; + + struct Element + { + ElementType type; + DatabaseAndTableName name; + DatabaseAndTableName new_name; + ASTs partitions; + std::set except_list; + }; + + using Elements = std::vector; + Elements elements; + + String backup_name; + + ASTPtr settings; + + String getID(char) const override; + ASTPtr clone() const override; + void formatImpl(const FormatSettings & format, FormatState &, FormatStateStacked) const override; +}; +} diff --git a/src/Parsers/ParserBackupQuery.cpp b/src/Parsers/ParserBackupQuery.cpp new file mode 100644 index 00000000000..f9a54f053af --- /dev/null +++ b/src/Parsers/ParserBackupQuery.cpp @@ -0,0 +1,204 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + +namespace +{ + using Kind = ASTBackupQuery::Kind; + using Element = ASTBackupQuery::Element; + using ElementType = ASTBackupQuery::ElementType; + + bool parseName(IParser::Pos & pos, Expected & expected, ElementType type, DatabaseAndTableName & name) + { + switch (type) + { + case ElementType::TABLE: [[fallthrough]]; + case ElementType::DICTIONARY: + { + return parseDatabaseAndTableName(pos, expected, name.first, name.second); + } + + case ElementType::DATABASE: + { + ASTPtr ast; + if (!ParserIdentifier{}.parse(pos, ast, expected)) + return false; + name.first = getIdentifierName(ast); + name.second.clear(); + return true; + } + + case ElementType::TEMPORARY_TABLE: + { + ASTPtr ast; + if (!ParserIdentifier{}.parse(pos, ast, expected)) + return false; + name.second = getIdentifierName(ast); + name.first.clear(); + return true; + } + + default: + return true; + } + } + + bool parsePartitions(IParser::Pos & pos, Expected & expected, ASTs & partitions) + { + if (!ParserKeyword{"PARTITION"}.ignore(pos, expected) && !ParserKeyword{"PARTITIONS"}.ignore(pos, expected)) + return false; + + ASTs result; + auto parse_list_element = [&] + { + ASTPtr ast; + if (!ParserPartition{}.parse(pos, ast, expected)) + return false; + result.emplace_back(ast); + return true; + }; + if (!ParserList::parseUtil(pos, expected, parse_list_element, false)) + return false; + + partitions = std::move(result); + return true; + } + + bool parseElement(IParser::Pos & pos, Expected & expected, Element & entry) + { + return IParserBase::wrapParseImpl(pos, [&] + { + ElementType type; + if (ParserKeyword{"TABLE"}.ignore(pos, expected)) + type = ElementType::TABLE; + else if (ParserKeyword{"DICTIONARY"}.ignore(pos, expected)) + type = ElementType::DICTIONARY; + else if (ParserKeyword{"DATABASE"}.ignore(pos, expected)) + type = ElementType::DATABASE; + else if (ParserKeyword{"ALL DATABASES"}.ignore(pos, expected)) + type = ElementType::ALL_DATABASES; + else if (ParserKeyword{"TEMPORARY TABLE"}.ignore(pos, expected)) + type = ElementType::TEMPORARY_TABLE; + else if (ParserKeyword{"ALL TEMPORARY TABLES"}.ignore(pos, expected)) + type = ElementType::ALL_TEMPORARY_TABLES; + else if (ParserKeyword{"EVERYTHING"}.ignore(pos, expected)) + type = ElementType::EVERYTHING; + else + return false; + + DatabaseAndTableName name; + if (!parseName(pos, expected, type, name)) + return false; + + ASTs partitions; + if (type == ElementType::TABLE) + parsePartitions(pos, expected, partitions); + + DatabaseAndTableName new_name; + if 
(ParserKeyword{"AS"}.ignore(pos, expected) || ParserKeyword{"INTO"}.ignore(pos, expected)) + { + if (!parseName(pos, expected, type, new_name)) + return false; + } + + if ((type == ElementType::TABLE) && partitions.empty()) + parsePartitions(pos, expected, partitions); + + entry.type = type; + entry.name = std::move(name); + entry.new_name = std::move(new_name); + entry.partitions = std::move(partitions); + return true; + }); + } + + bool parseElements(IParser::Pos & pos, Expected & expected, std::vector & elements) + { + return IParserBase::wrapParseImpl(pos, [&] + { + std::vector result; + + auto parse_element = [&] + { + Element element; + if (parseElement(pos, expected, element)) + { + result.emplace_back(std::move(element)); + return true; + } + return false; + }; + + if (!ParserList::parseUtil(pos, expected, parse_element, false)) + return false; + + elements = std::move(result); + return true; + }); + } + + bool parseSettings(IParser::Pos & pos, Expected & expected, ASTPtr & settings) + { + return IParserBase::wrapParseImpl(pos, [&] + { + if (!ParserKeyword{"SETTINGS"}.ignore(pos, expected)) + return false; + + ASTPtr result; + if (!ParserSetQuery{true}.parse(pos, result, expected)) + return false; + + settings = std::move(result); + return true; + }); + } +} + + +bool ParserBackupQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) +{ + Kind kind; + if (ParserKeyword{"BACKUP"}.ignore(pos, expected)) + kind = Kind::BACKUP; + else if (ParserKeyword{"RESTORE"}.ignore(pos, expected)) + kind = Kind::RESTORE; + else + return false; + + std::vector elements; + if (!parseElements(pos, expected, elements)) + return false; + + if (!ParserKeyword{(kind == Kind::BACKUP) ? "TO" : "FROM"}.ignore(pos, expected)) + return false; + ASTPtr ast; + if (!ParserStringLiteral{}.parse(pos, ast, expected)) + return false; + String backup_name = ast->as().value.safeGet(); + + ASTPtr settings; + parseSettings(pos, expected, settings); + + auto query = std::make_shared(); + node = query; + + query->kind = kind; + query->elements = std::move(elements); + query->backup_name = std::move(backup_name); + query->settings = std::move(settings); + + return true; +} + +} diff --git a/src/Parsers/ParserBackupQuery.h b/src/Parsers/ParserBackupQuery.h new file mode 100644 index 00000000000..45a551cfd00 --- /dev/null +++ b/src/Parsers/ParserBackupQuery.h @@ -0,0 +1,34 @@ +#pragma once + +#include + + +namespace DB +{ +/** Parses queries like + * BACKUP { TABLE [db.]table_name [AS [db.]table_name_in_backup] [PARTITION[S] partition_expr [,...]] | + * DICTIONARY [db.]dictionary_name [AS [db.]dictionary_name_in_backup] | + * DATABASE database_name [AS database_name_in_backup] | + * ALL DATABASES | + * TEMPORARY TABLE table_name [AS table_name_in_backup] + * ALL TEMPORARY TABLES | + * EVERYTHING } [,...] + * TO 'backup_name' + * [SETTINGS base_backup = 'base_backup_name'] + * + * RESTORE { TABLE [db.]table_name_in_backup [INTO [db.]table_name] [PARTITION[S] partition_expr [,...]] | + * DICTIONARY [db.]dictionary_name_in_backup [INTO [db.]dictionary_name] | + * DATABASE database_name_in_backup [INTO database_name] | + * ALL DATABASES | + * TEMPORARY TABLE table_name_in_backup [INTO table_name] | + * ALL TEMPORARY TABLES | + * EVERYTHING } [,...] 
+ * FROM 'backup_name' + */ +class ParserBackupQuery : public IParserBase +{ +protected: + const char * getName() const override { return "BACKUP or RESTORE query"; } + bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override; +}; +} diff --git a/src/Parsers/ParserQuery.cpp b/src/Parsers/ParserQuery.cpp index 4550bdc8a75..3cc6b530d7c 100644 --- a/src/Parsers/ParserQuery.cpp +++ b/src/Parsers/ParserQuery.cpp @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -40,6 +41,7 @@ bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) ParserGrantQuery grant_p; ParserSetRoleQuery set_role_p; ParserExternalDDLQuery external_ddl_p; + ParserBackupQuery backup_p; bool res = query_with_output_p.parse(pos, node, expected) || insert_p.parse(pos, node, expected) @@ -54,7 +56,8 @@ bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) || create_settings_profile_p.parse(pos, node, expected) || drop_access_entity_p.parse(pos, node, expected) || grant_p.parse(pos, node, expected) - || external_ddl_p.parse(pos, node, expected); + || external_ddl_p.parse(pos, node, expected) + || backup_p.parse(pos, node, expected); return res; } diff --git a/src/Storages/IStorage.cpp b/src/Storages/IStorage.cpp index 0e9f82a9802..cd48786b6c1 100644 --- a/src/Storages/IStorage.cpp +++ b/src/Storages/IStorage.cpp @@ -201,6 +201,16 @@ NameDependencies IStorage::getDependentViewsByColumn(ContextPtr context) const return name_deps; } +BackupEntries IStorage::backup(const ASTs &, ContextPtr) const +{ + throw Exception("Table engine " + getName() + " doesn't support backups", ErrorCodes::NOT_IMPLEMENTED); +} + +RestoreDataTasks IStorage::restoreFromBackup(const BackupPtr &, const String &, const ASTs &, ContextMutablePtr) +{ + throw Exception("Table engine " + getName() + " doesn't support restoring", ErrorCodes::NOT_IMPLEMENTED); +} + std::string PrewhereInfo::dump() const { WriteBufferFromOwnString ss; diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index 85bfbfb1f84..b1d8fab5750 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -65,6 +65,13 @@ class EnabledQuota; struct SelectQueryInfo; using NameDependencies = std::unordered_map>; +using DatabaseAndTableName = std::pair; + +class IBackup; +using BackupPtr = std::shared_ptr; +class IBackupEntry; +using BackupEntries = std::vector>>; +using RestoreDataTasks = std::vector>; struct ColumnSize { @@ -188,6 +195,12 @@ public: NameDependencies getDependentViewsByColumn(ContextPtr context) const; + /// Prepares entries to backup data of the storage. + virtual BackupEntries backup(const ASTs & partitions, ContextPtr context) const; + + /// Extract data from the backup and put it to the storage. + virtual RestoreDataTasks restoreFromBackup(const BackupPtr & backup, const String & data_path_in_backup, const ASTs & partitions, ContextMutablePtr context); + protected: /// Returns whether the column is virtual - by default all columns are real. /// Initially reserved virtual column name may be shadowed by real column. 
diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 7b07f4aba76..764f5d7adf7 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -1,3 +1,8 @@ +#include + +#include +#include +#include #include #include #include @@ -9,6 +14,7 @@ #include #include #include +#include #include #include #include @@ -32,7 +38,6 @@ #include #include #include -#include #include #include #include @@ -56,6 +61,7 @@ #include #include +#include #include #include @@ -2837,7 +2843,7 @@ MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String & return getActiveContainingPart(part_info); } -MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVectorInPartition(MergeTreeData::DataPartState state, const String & partition_id) +MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVectorInPartition(MergeTreeData::DataPartState state, const String & partition_id) const { DataPartStateAndPartitionID state_with_partition{state, partition_id}; @@ -2847,6 +2853,22 @@ MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVectorInPartition(Merg data_parts_by_state_and_info.upper_bound(state_with_partition)); } +MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVectorInPartitions(MergeTreeData::DataPartState state, const std::unordered_set & partition_ids) const +{ + auto lock = lockParts(); + DataPartsVector res; + for (const auto & partition_id : partition_ids) + { + DataPartStateAndPartitionID state_with_partition{state, partition_id}; + insertAtEnd( + res, + DataPartsVector( + data_parts_by_state_and_info.lower_bound(state_with_partition), + data_parts_by_state_and_info.upper_bound(state_with_partition))); + } + return res; +} + MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const MergeTreePartInfo & part_info, const MergeTreeData::DataPartStates & valid_states) { auto lock = lockParts(); @@ -3208,6 +3230,121 @@ Pipe MergeTreeData::alterPartition( return {}; } + +BackupEntries MergeTreeData::backup(const ASTs & partitions, ContextPtr local_context) const +{ + DataPartsVector data_parts; + if (partitions.empty()) + data_parts = getDataPartsVector(); + else + data_parts = getDataPartsVectorInPartitions(MergeTreeDataPartState::Committed, getPartitionIDsFromQuery(partitions, local_context)); + return backupDataParts(data_parts); +} + + +BackupEntries MergeTreeData::backupDataParts(const DataPartsVector & data_parts) +{ + BackupEntries backup_entries; + std::map> temp_dirs; + + for (const auto & part : data_parts) + { + auto disk = part->volume->getDisk(); + + auto temp_dir_it = temp_dirs.find(disk); + if (temp_dir_it == temp_dirs.end()) + temp_dir_it = temp_dirs.emplace(disk, std::make_shared(disk, "tmp_backup_")).first; + auto temp_dir_owner = temp_dir_it->second; + fs::path temp_dir = temp_dir_owner->getPath(); + + fs::path part_dir = part->getFullRelativePath(); + fs::path temp_part_dir = temp_dir / part->relative_path; + disk->createDirectories(temp_part_dir); + + for (const auto & [filepath, checksum] : part->checksums.files) + { + String relative_filepath = fs::path(part->relative_path) / filepath; + String hardlink_filepath = temp_part_dir / filepath; + disk->createHardLink(part_dir / filepath, hardlink_filepath); + UInt128 file_hash{checksum.file_hash.first, checksum.file_hash.second}; + backup_entries.emplace_back( + relative_filepath, + std::make_unique(disk, hardlink_filepath, checksum.file_size, file_hash, temp_dir_owner)); + } + + for (const auto & filepath : 
part->getFileNamesWithoutChecksums()) + { + String relative_filepath = fs::path(part->relative_path) / filepath; + backup_entries.emplace_back(relative_filepath, std::make_unique(disk, part_dir / filepath)); + } + } + + return backup_entries; +} + + +RestoreDataTasks MergeTreeData::restoreDataPartsFromBackup(const BackupPtr & backup, const String & data_path_in_backup, + const std::unordered_set & partition_ids, + SimpleIncrement * increment) +{ + RestoreDataTasks restore_tasks; + + Strings part_names = backup->list(data_path_in_backup); + for (const String & part_name : part_names) + { + MergeTreePartInfo part_info; + if (!MergeTreePartInfo::tryParsePartName(part_name, &part_info, format_version)) + continue; + + if (!partition_ids.empty() && !partition_ids.contains(part_info.partition_id)) + continue; + + UInt64 total_size_of_part = 0; + Strings filenames = backup->list(data_path_in_backup + part_name + "/", ""); + for (const String & filename : filenames) + total_size_of_part += backup->getSize(data_path_in_backup + part_name + "/" + filename); + + std::shared_ptr reservation = getStoragePolicy()->reserveAndCheck(total_size_of_part); + + auto restore_task = [this, + backup, + data_path_in_backup, + part_name, + part_info = std::move(part_info), + filenames = std::move(filenames), + reservation, + increment]() + { + auto disk = reservation->getDisk(); + + auto temp_part_dir_owner = std::make_shared(disk, relative_data_path + "restoring_" + part_name + "_"); + String temp_part_dir = temp_part_dir_owner->getPath(); + disk->createDirectories(temp_part_dir); + + assert(temp_part_dir.starts_with(relative_data_path)); + String relative_temp_part_dir = temp_part_dir.substr(relative_data_path.size()); + + for (const String & filename : filenames) + { + auto backup_entry = backup->read(data_path_in_backup + part_name + "/" + filename); + auto read_buffer = backup_entry->getReadBuffer(); + auto write_buffer = disk->writeFile(temp_part_dir + "/" + filename); + copyData(*read_buffer, *write_buffer); + } + + auto single_disk_volume = std::make_shared(disk->getName(), disk, 0); + auto part = createPart(part_name, part_info, single_disk_volume, relative_temp_part_dir); + part->loadColumnsChecksumsIndexes(false, true); + renameTempPartAndAdd(part, increment); + }; + + restore_tasks.emplace_back(std::move(restore_task)); + } + + return restore_tasks; +} + + String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, ContextPtr local_context) const { const auto & partition_ast = ast->as(); @@ -3303,6 +3440,15 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, ContextPtr loc return partition_id; } +std::unordered_set MergeTreeData::getPartitionIDsFromQuery(const ASTs & asts, ContextPtr local_context) const +{ + std::unordered_set partition_ids; + for (const auto & ast : asts) + partition_ids.emplace(getPartitionIDFromQuery(ast, local_context)); + return partition_ids; +} + + MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector( const DataPartStates & affordable_states, DataPartStateVector * out_states, bool require_projection_parts) const { diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index 02d1f5e264e..05d1b45a557 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -436,7 +436,8 @@ public: void swapActivePart(MergeTreeData::DataPartPtr part_copy); /// Returns all parts in specified partition - DataPartsVector getDataPartsVectorInPartition(DataPartState state, const String & 
partition_id); + DataPartsVector getDataPartsVectorInPartition(DataPartState state, const String & partition_id) const; + DataPartsVector getDataPartsVectorInPartitions(DataPartState state, const std::unordered_set & partition_ids) const; /// Returns the part with the given name and state or nullptr if no such part. DataPartPtr getPartIfExists(const String & part_name, const DataPartStates & valid_states); @@ -606,6 +607,17 @@ public: ContextPtr context, TableLockHolder & table_lock_holder); + /// Prepares entries to backup data of the storage. + BackupEntries backup(const ASTs & partitions, ContextPtr context) const override; + static BackupEntries backupDataParts(const DataPartsVector & data_parts); + + /// Extract data from the backup and put it to the storage. + RestoreDataTasks restoreDataPartsFromBackup( + const BackupPtr & backup, + const String & data_path_in_backup, + const std::unordered_set & partition_ids, + SimpleIncrement * increment); + /// Moves partition to specified Disk void movePartitionToDisk(const ASTPtr & partition, const String & name, bool moving_part, ContextPtr context); @@ -636,6 +648,7 @@ public: /// For ATTACH/DETACH/DROP PARTITION. String getPartitionIDFromQuery(const ASTPtr & ast, ContextPtr context) const; + std::unordered_set getPartitionIDsFromQuery(const ASTs & asts, ContextPtr context) const; /// Extracts MergeTreeData of other *MergeTree* storage /// and checks that their structure suitable for ALTER TABLE ATTACH PARTITION FROM diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 32c2c76dd10..880a729cb2c 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -1623,6 +1624,12 @@ CheckResults StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_ } +RestoreDataTasks StorageMergeTree::restoreFromBackup(const BackupPtr & backup, const String & data_path_in_backup, const ASTs & partitions, ContextMutablePtr local_context) +{ + return restoreDataPartsFromBackup(backup, data_path_in_backup, getPartitionIDsFromQuery(partitions, local_context), &increment); +} + + MutationCommands StorageMergeTree::getFirstAlterMutationCommandsForPart(const DataPartPtr & part) const { std::lock_guard lock(currently_processing_in_background_mutex); diff --git a/src/Storages/StorageMergeTree.h b/src/Storages/StorageMergeTree.h index 681475f7a49..e2dbc217135 100644 --- a/src/Storages/StorageMergeTree.h +++ b/src/Storages/StorageMergeTree.h @@ -94,6 +94,8 @@ public: CheckResults checkData(const ASTPtr & query, ContextPtr context) override; + RestoreDataTasks restoreFromBackup(const BackupPtr & backup, const String & data_path_in_backup, const ASTs & partitions, ContextMutablePtr context) override; + bool scheduleDataProcessingJob(IBackgroundJobExecutor & executor) override; MergeTreeDeduplicationLog * getDeduplicationLog() { return deduplication_log.get(); } diff --git a/tests/integration/test_backup_restore_new/__init__.py b/tests/integration/test_backup_restore_new/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_backup_restore_new/test.py b/tests/integration/test_backup_restore_new/test.py new file mode 100644 index 00000000000..472ecaf608b --- /dev/null +++ b/tests/integration/test_backup_restore_new/test.py @@ -0,0 +1,120 @@ +import pytest +import re +from helpers.cluster import ClickHouseCluster + +cluster = ClickHouseCluster(__file__) +instance = 
cluster.add_instance('instance') + + +def create_and_fill_table(): + instance.query("CREATE DATABASE test") + instance.query("CREATE TABLE test.table(x UInt32, y String) ENGINE=MergeTree ORDER BY y PARTITION BY x%10") + instance.query("INSERT INTO test.table SELECT number, toString(number) FROM numbers(100)") + + +@pytest.fixture(scope="module", autouse=True) +def start_cluster(): + try: + cluster.start() + yield cluster + finally: + cluster.shutdown() + + +@pytest.fixture(autouse=True) +def cleanup_after_test(): + try: + yield + finally: + instance.query("DROP DATABASE IF EXISTS test") + + +backup_id_counter = 0 +def new_backup_name(): + global backup_id_counter + backup_id_counter += 1 + return f"test-backup-{backup_id_counter}" + + +def test_restore_table(): + backup_name = new_backup_name() + create_and_fill_table() + + assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n" + instance.query(f"BACKUP TABLE test.table TO '{backup_name}'") + + instance.query("DROP TABLE test.table") + assert instance.query("EXISTS test.table") == "0\n" + + instance.query(f"RESTORE TABLE test.table FROM '{backup_name}'") + assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n" + + +def test_restore_table_into_existing_table(): + backup_name = new_backup_name() + create_and_fill_table() + + assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n" + instance.query(f"BACKUP TABLE test.table TO '{backup_name}'") + + instance.query(f"RESTORE TABLE test.table INTO test.table FROM '{backup_name}'") + assert instance.query("SELECT count(), sum(x) FROM test.table") == "200\t9900\n" + + instance.query(f"RESTORE TABLE test.table INTO test.table FROM '{backup_name}'") + assert instance.query("SELECT count(), sum(x) FROM test.table") == "300\t14850\n" + + +def test_restore_table_under_another_name(): + backup_name = new_backup_name() + create_and_fill_table() + + assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n" + instance.query(f"BACKUP TABLE test.table TO '{backup_name}'") + + assert instance.query("EXISTS test.table2") == "0\n" + + instance.query(f"RESTORE TABLE test.table INTO test.table2 FROM '{backup_name}'") + assert instance.query("SELECT count(), sum(x) FROM test.table2") == "100\t4950\n" + + +def test_backup_table_under_another_name(): + backup_name = new_backup_name() + create_and_fill_table() + + assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n" + instance.query(f"BACKUP TABLE test.table AS test.table2 TO '{backup_name}'") + + assert instance.query("EXISTS test.table2") == "0\n" + + instance.query(f"RESTORE TABLE test.table2 FROM '{backup_name}'") + assert instance.query("SELECT count(), sum(x) FROM test.table2") == "100\t4950\n" + + +def test_incremental_backup(): + backup_name = new_backup_name() + incremental_backup_name = new_backup_name() + create_and_fill_table() + + assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n" + instance.query(f"BACKUP TABLE test.table TO '{backup_name}'") + + instance.query("INSERT INTO test.table VALUES (65, 'a'), (66, 'b')") + + assert instance.query("SELECT count(), sum(x) FROM test.table") == "102\t5081\n" + instance.query(f"BACKUP TABLE test.table TO '{incremental_backup_name}' SETTINGS base_backup = '{backup_name}'") + + instance.query(f"RESTORE TABLE test.table AS test.table2 FROM '{incremental_backup_name}'") + assert instance.query("SELECT count(), sum(x) FROM test.table2") == "102\t5081\n" + + +def 
test_backup_not_found_or_already_exists(): + backup_name = new_backup_name() + + expected_error = "Backup .* not found" + assert re.search(expected_error, instance.query_and_get_error(f"RESTORE TABLE test.table AS test.table2 FROM '{backup_name}'")) + + create_and_fill_table() + instance.query(f"BACKUP TABLE test.table TO '{backup_name}'") + + expected_error = "Backup .* already exists" + assert re.search(expected_error, instance.query_and_get_error(f"BACKUP TABLE test.table TO '{backup_name}'")) diff --git a/tests/queries/0_stateless/01544_errorCodeToName.sql b/tests/queries/0_stateless/01544_errorCodeToName.sql index aa32270f00b..3a925bcea90 100644 --- a/tests/queries/0_stateless/01544_errorCodeToName.sql +++ b/tests/queries/0_stateless/01544_errorCodeToName.sql @@ -1,5 +1,5 @@ SELECT errorCodeToName(toUInt32(-1)); SELECT errorCodeToName(-1); -SELECT errorCodeToName(600); /* gap in error codes */ +SELECT errorCodeToName(950); /* gap in error codes */ SELECT errorCodeToName(0); SELECT errorCodeToName(1);