Merge pull request #21945 from vitlibar/backup

Add new commands BACKUP and RESTORE (part 1)
This commit is contained in:
Vitaly Baranov 2021-08-24 08:28:07 +03:00 committed by GitHub
commit 355cc09004
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
76 changed files with 3524 additions and 31 deletions

28
base/common/insertAtEnd.h Normal file
View File

@ -0,0 +1,28 @@
#pragma once
#include <vector>
/// Appends a specified vector with elements of another vector.
template <typename T>
void insertAtEnd(std::vector<T> & dest, const std::vector<T> & src)
{
if (src.empty())
return;
dest.reserve(dest.size() + src.size());
dest.insert(dest.end(), src.begin(), src.end());
}
template <typename T>
void insertAtEnd(std::vector<T> & dest, std::vector<T> && src)
{
if (src.empty())
return;
if (dest.empty())
{
dest.swap(src);
return;
}
dest.reserve(dest.size() + src.size());
dest.insert(dest.end(), std::make_move_iterator(src.begin()), std::make_move_iterator(src.end()));
src.clear();
}

View File

@ -9,6 +9,7 @@
#include <cmath>
#include <cfloat>
#include <cassert>
#include <tuple>
#include <limits>
@ -39,6 +40,18 @@ static constexpr bool IntegralConcept() noexcept
return std::is_integral_v<T> || IsWideInteger<T>::value;
}
template <typename T>
class IsTupleLike
{
template <typename U>
static auto check(U * p) -> decltype(std::tuple_size<U>::value, int());
template <typename>
static void check(...);
public:
static constexpr const bool value = !std::is_void<decltype(check<T>(nullptr))>::value;
};
}
namespace std
@ -227,6 +240,19 @@ struct integer<Bits, Signed>::_impl
self.items[i] = 0;
}
template <typename TupleLike, size_t i = 0>
constexpr static void wide_integer_from_tuple_like(integer<Bits, Signed> & self, const TupleLike & tuple) noexcept
{
if constexpr (i < item_count)
{
if constexpr (i < std::tuple_size_v<TupleLike>)
self.items[i] = std::get<i>(tuple);
else
self.items[i] = 0;
wide_integer_from_tuple_like<TupleLike, i + 1>(self, tuple);
}
}
/**
* N.B. t is constructed from double, so max(t) = max(double) ~ 2^310
* the recursive call happens when t / 2^64 > 2^64, so there won't be more than 5 of them.
@ -966,6 +992,8 @@ constexpr integer<Bits, Signed>::integer(T rhs) noexcept
{
if constexpr (IsWideInteger<T>::value)
_impl::wide_integer_from_wide_integer(*this, rhs);
else if constexpr (IsTupleLike<T>::value)
_impl::wide_integer_from_tuple_like(*this, rhs);
else
_impl::wide_integer_from_builtin(*this, rhs);
}
@ -979,6 +1007,8 @@ constexpr integer<Bits, Signed>::integer(std::initializer_list<T> il) noexcept
{
if constexpr (IsWideInteger<T>::value)
_impl::wide_integer_from_wide_integer(*this, *il.begin());
else if constexpr (IsTupleLike<T>::value)
_impl::wide_integer_from_tuple_like(*this, *il.begin());
else
_impl::wide_integer_from_builtin(*this, *il.begin());
}
@ -1007,7 +1037,10 @@ template <size_t Bits, typename Signed>
template <typename T>
constexpr integer<Bits, Signed> & integer<Bits, Signed>::operator=(T rhs) noexcept
{
_impl::wide_integer_from_builtin(*this, rhs);
if constexpr (IsTupleLike<T>::value)
_impl::wide_integer_from_tuple_like(*this, rhs);
else
_impl::wide_integer_from_builtin(*this, rhs);
return *this;
}

View File

@ -736,6 +736,10 @@ if (ThreadFuzzer::instance().isEffective())
setupTmpPath(log, disk->getPath());
}
/// Storage keeping all the backups.
fs::create_directories(path / "backups");
global_context->setBackupsVolume(config().getString("backups_path", path / "backups"), config().getString("backups_policy", ""));
/** Directory with 'flags': files indicating temporary settings for the server set by system administrator.
* Flags may be cleared automatically after being applied by the server.
* Examples: do repair of local data; clone all replicated tables from replica.

View File

@ -0,0 +1,28 @@
#include <Backups/BackupEntryConcat.h>
#include <IO/ConcatReadBuffer.h>
namespace DB
{
BackupEntryConcat::BackupEntryConcat(
BackupEntryPtr first_source_,
BackupEntryPtr second_source_,
const std::optional<UInt128> & checksum_)
: first_source(std::move(first_source_))
, second_source(std::move(second_source_))
, checksum(checksum_)
{
}
UInt64 BackupEntryConcat::getSize() const
{
if (!size)
size = first_source->getSize() + second_source->getSize();
return *size;
}
std::unique_ptr<ReadBuffer> BackupEntryConcat::getReadBuffer() const
{
return std::make_unique<ConcatReadBuffer>(first_source->getReadBuffer(), second_source->getReadBuffer());
}
}

View File

@ -0,0 +1,30 @@
#pragma once
#include <Backups/IBackupEntry.h>
namespace DB
{
/// Concatenates data of two backup entries.
class BackupEntryConcat : public IBackupEntry
{
public:
/// The constructor is allowed to not set `checksum_`, in that case it will be calculated from the data.
BackupEntryConcat(
BackupEntryPtr first_source_,
BackupEntryPtr second_source_,
const std::optional<UInt128> & checksum_ = {});
UInt64 getSize() const override;
std::optional<UInt128> getChecksum() const override { return checksum; }
std::unique_ptr<ReadBuffer> getReadBuffer() const override;
private:
BackupEntryPtr first_source;
BackupEntryPtr second_source;
mutable std::optional<UInt64> size;
std::optional<UInt128> checksum;
};
}

View File

@ -0,0 +1,35 @@
#include <Backups/BackupEntryFromAppendOnlyFile.h>
#include <IO/LimitReadBuffer.h>
namespace DB
{
BackupEntryFromAppendOnlyFile::BackupEntryFromAppendOnlyFile(
const String & file_path_,
const std::optional<UInt64> & file_size_,
const std::optional<UInt128> & checksum_,
const std::shared_ptr<Poco::TemporaryFile> & temporary_file_)
: BackupEntryFromImmutableFile(file_path_, file_size_, checksum_, temporary_file_)
, limit(BackupEntryFromImmutableFile::getSize())
{
}
BackupEntryFromAppendOnlyFile::BackupEntryFromAppendOnlyFile(
const DiskPtr & disk_,
const String & file_path_,
const std::optional<UInt64> & file_size_,
const std::optional<UInt128> & checksum_,
const std::shared_ptr<TemporaryFileOnDisk> & temporary_file_)
: BackupEntryFromImmutableFile(disk_, file_path_, file_size_, checksum_, temporary_file_)
, limit(BackupEntryFromImmutableFile::getSize())
{
}
std::unique_ptr<ReadBuffer> BackupEntryFromAppendOnlyFile::getReadBuffer() const
{
auto buf = BackupEntryFromImmutableFile::getReadBuffer();
return std::make_unique<LimitReadBuffer>(std::move(buf), limit, true);
}
}

View File

@ -0,0 +1,35 @@
#pragma once
#include <Backups/BackupEntryFromImmutableFile.h>
namespace DB
{
/// Represents a file prepared to be included in a backup, assuming that until this backup entry is destroyed
/// the file can be appended with new data, but the bytes which are already in the file won't be changed.
class BackupEntryFromAppendOnlyFile : public BackupEntryFromImmutableFile
{
public:
/// The constructor is allowed to not set `file_size_` or `checksum_`, in that case it will be calculated from the data.
BackupEntryFromAppendOnlyFile(
const String & file_path_,
const std::optional<UInt64> & file_size_ = {},
const std::optional<UInt128> & checksum_ = {},
const std::shared_ptr<Poco::TemporaryFile> & temporary_file_ = {});
BackupEntryFromAppendOnlyFile(
const DiskPtr & disk_,
const String & file_path_,
const std::optional<UInt64> & file_size_ = {},
const std::optional<UInt128> & checksum_ = {},
const std::shared_ptr<TemporaryFileOnDisk> & temporary_file_ = {});
UInt64 getSize() const override { return limit; }
std::unique_ptr<ReadBuffer> getReadBuffer() const override;
private:
const UInt64 limit;
};
}

View File

@ -0,0 +1,47 @@
#include <Backups/BackupEntryFromImmutableFile.h>
#include <Disks/IDisk.h>
#include <IO/createReadBufferFromFileBase.h>
#include <Poco/File.h>
namespace DB
{
BackupEntryFromImmutableFile::BackupEntryFromImmutableFile(
const String & file_path_,
const std::optional<UInt64> & file_size_,
const std::optional<UInt128> & checksum_,
const std::shared_ptr<Poco::TemporaryFile> & temporary_file_)
: file_path(file_path_), file_size(file_size_), checksum(checksum_), temporary_file(temporary_file_)
{
}
BackupEntryFromImmutableFile::BackupEntryFromImmutableFile(
const DiskPtr & disk_,
const String & file_path_,
const std::optional<UInt64> & file_size_,
const std::optional<UInt128> & checksum_,
const std::shared_ptr<TemporaryFileOnDisk> & temporary_file_)
: disk(disk_), file_path(file_path_), file_size(file_size_), checksum(checksum_), temporary_file_on_disk(temporary_file_)
{
}
BackupEntryFromImmutableFile::~BackupEntryFromImmutableFile() = default;
UInt64 BackupEntryFromImmutableFile::getSize() const
{
std::lock_guard lock{get_file_size_mutex};
if (!file_size)
file_size = disk ? disk->getFileSize(file_path) : Poco::File(file_path).getSize();
return *file_size;
}
std::unique_ptr<ReadBuffer> BackupEntryFromImmutableFile::getReadBuffer() const
{
if (disk)
return disk->readFile(file_path);
else
return createReadBufferFromFileBase(file_path, 0, 0, 0, nullptr);
}
}

View File

@ -0,0 +1,51 @@
#pragma once
#include <Backups/IBackupEntry.h>
#include <mutex>
namespace Poco { class TemporaryFile; }
namespace DB
{
class TemporaryFileOnDisk;
class IDisk;
using DiskPtr = std::shared_ptr<IDisk>;
/// Represents a file prepared to be included in a backup, assuming that until this backup entry is destroyed the file won't be changed.
class BackupEntryFromImmutableFile : public IBackupEntry
{
public:
/// The constructor is allowed to not set `file_size_` or `checksum_`, in that case it will be calculated from the data.
BackupEntryFromImmutableFile(
const String & file_path_,
const std::optional<UInt64> & file_size_ = {},
const std::optional<UInt128> & checksum_ = {},
const std::shared_ptr<Poco::TemporaryFile> & temporary_file_ = {});
BackupEntryFromImmutableFile(
const DiskPtr & disk_,
const String & file_path_,
const std::optional<UInt64> & file_size_ = {},
const std::optional<UInt128> & checksum_ = {},
const std::shared_ptr<TemporaryFileOnDisk> & temporary_file_ = {});
~BackupEntryFromImmutableFile() override;
UInt64 getSize() const override;
std::optional<UInt128> getChecksum() const override { return checksum; }
std::unique_ptr<ReadBuffer> getReadBuffer() const override;
String getFilePath() const { return file_path; }
DiskPtr getDisk() const { return disk; }
private:
const DiskPtr disk;
const String file_path;
mutable std::optional<UInt64> file_size;
mutable std::mutex get_file_size_mutex;
const std::optional<UInt128> checksum;
const std::shared_ptr<Poco::TemporaryFile> temporary_file;
const std::shared_ptr<TemporaryFileOnDisk> temporary_file_on_disk;
};
}

View File

@ -0,0 +1,23 @@
#include <Backups/BackupEntryFromMemory.h>
#include <IO/ReadBufferFromString.h>
namespace DB
{
BackupEntryFromMemory::BackupEntryFromMemory(const void * data_, size_t size_, const std::optional<UInt128> & checksum_)
: BackupEntryFromMemory(String{reinterpret_cast<const char *>(data_), size_}, checksum_)
{
}
BackupEntryFromMemory::BackupEntryFromMemory(String data_, const std::optional<UInt128> & checksum_)
: data(std::move(data_)), checksum(checksum_)
{
}
std::unique_ptr<ReadBuffer> BackupEntryFromMemory::getReadBuffer() const
{
return std::make_unique<ReadBufferFromString>(data);
}
}

View File

@ -0,0 +1,27 @@
#pragma once
#include <Backups/IBackupEntry.h>
#include <IO/ReadBufferFromString.h>
namespace DB
{
/// Represents small preloaded data to be included in a backup.
class BackupEntryFromMemory : public IBackupEntry
{
public:
/// The constructor is allowed to not set `checksum_`, in that case it will be calculated from the data.
BackupEntryFromMemory(const void * data_, size_t size_, const std::optional<UInt128> & checksum_ = {});
BackupEntryFromMemory(String data_, const std::optional<UInt128> & checksum_ = {});
UInt64 getSize() const override { return data.size(); }
std::optional<UInt128> getChecksum() const override { return checksum; }
std::unique_ptr<ReadBuffer> getReadBuffer() const override;
private:
const String data;
const std::optional<UInt128> checksum;
};
}

View File

@ -0,0 +1,39 @@
#include <Backups/BackupEntryFromSmallFile.h>
#include <Disks/IDisk.h>
#include <IO/createReadBufferFromFileBase.h>
#include <IO/ReadHelpers.h>
namespace DB
{
namespace
{
String readFile(const String & file_path)
{
auto buf = createReadBufferFromFileBase(file_path, 0, 0, 0, nullptr);
String s;
readStringUntilEOF(s, *buf);
return s;
}
String readFile(const DiskPtr & disk, const String & file_path)
{
auto buf = disk->readFile(file_path);
String s;
readStringUntilEOF(s, *buf);
return s;
}
}
BackupEntryFromSmallFile::BackupEntryFromSmallFile(const String & file_path_, const std::optional<UInt128> & checksum_)
: BackupEntryFromMemory(readFile(file_path_), checksum_), file_path(file_path_)
{
}
BackupEntryFromSmallFile::BackupEntryFromSmallFile(
const DiskPtr & disk_, const String & file_path_, const std::optional<UInt128> & checksum_)
: BackupEntryFromMemory(readFile(disk_, file_path_), checksum_), disk(disk_), file_path(file_path_)
{
}
}

View File

@ -0,0 +1,34 @@
#pragma once
#include <Backups/BackupEntryFromMemory.h>
namespace DB
{
class IDisk;
using DiskPtr = std::shared_ptr<IDisk>;
/// Represents a file prepared to be included in a backup,
/// assuming that the file is small and can be easily loaded into memory.
class BackupEntryFromSmallFile : public BackupEntryFromMemory
{
public:
/// The constructor is allowed to not set `checksum_`, in that case it will be calculated from the data.
BackupEntryFromSmallFile(
const String & file_path_,
const std::optional<UInt128> & checksum_ = {});
BackupEntryFromSmallFile(
const DiskPtr & disk_,
const String & file_path_,
const std::optional<UInt128> & checksum_ = {});
String getFilePath() const { return file_path; }
DiskPtr getDisk() const { return disk; }
private:
const DiskPtr disk;
const String file_path;
};
}

View File

@ -0,0 +1,65 @@
#include <Backups/BackupFactory.h>
#include <Backups/BackupInDirectory.h>
#include <Interpreters/Context.h>
#include <Disks/IVolume.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BACKUP_NOT_FOUND;
extern const int BACKUP_ALREADY_EXISTS;
extern const int NOT_ENOUGH_SPACE;
extern const int LOGICAL_ERROR;
}
BackupFactory & BackupFactory::instance()
{
static BackupFactory the_instance;
return the_instance;
}
void BackupFactory::setBackupsVolume(VolumePtr backups_volume_)
{
backups_volume = backups_volume_;
}
BackupMutablePtr BackupFactory::createBackup(const String & backup_name, UInt64 estimated_backup_size, const BackupPtr & base_backup) const
{
if (!backups_volume)
throw Exception(ErrorCodes::LOGICAL_ERROR, "No backups volume");
for (const auto & disk : backups_volume->getDisks())
{
if (disk->exists(backup_name))
throw Exception(ErrorCodes::BACKUP_ALREADY_EXISTS, "Backup {} already exists", quoteString(backup_name));
}
auto reservation = backups_volume->reserve(estimated_backup_size);
if (!reservation)
throw Exception(
ErrorCodes::NOT_ENOUGH_SPACE,
"Couldn't reserve {} bytes of free space for new backup {}",
estimated_backup_size,
quoteString(backup_name));
return std::make_shared<BackupInDirectory>(IBackup::OpenMode::WRITE, reservation->getDisk(), backup_name, base_backup);
}
BackupPtr BackupFactory::openBackup(const String & backup_name, const BackupPtr & base_backup) const
{
if (!backups_volume)
throw Exception(ErrorCodes::LOGICAL_ERROR, "No backups volume");
for (const auto & disk : backups_volume->getDisks())
{
if (disk->exists(backup_name))
return std::make_shared<BackupInDirectory>(IBackup::OpenMode::READ, disk, backup_name, base_backup);
}
throw Exception(ErrorCodes::BACKUP_NOT_FOUND, "Backup {} not found", quoteString(backup_name));
}
}

View File

@ -0,0 +1,38 @@
#pragma once
#include <Core/Types.h>
#include <boost/noncopyable.hpp>
#include <memory>
namespace DB
{
class IBackup;
using BackupPtr = std::shared_ptr<const IBackup>;
using BackupMutablePtr = std::shared_ptr<IBackup>;
class Context;
using ContextMutablePtr = std::shared_ptr<Context>;
class IVolume;
using VolumePtr = std::shared_ptr<IVolume>;
/// Factory for implementations of the IBackup interface.
class BackupFactory : boost::noncopyable
{
public:
static BackupFactory & instance();
/// Must be called to initialize the backup factory.
void setBackupsVolume(VolumePtr backups_volume_);
/// Creates a new backup and open it for writing.
BackupMutablePtr createBackup(const String & backup_name, UInt64 estimated_backup_size, const BackupPtr & base_backup = {}) const;
/// Opens an existing backup for reading.
BackupPtr openBackup(const String & backup_name, const BackupPtr & base_backup = {}) const;
private:
VolumePtr backups_volume;
};
}

View File

@ -0,0 +1,454 @@
#include <Backups/BackupInDirectory.h>
#include <Backups/BackupFactory.h>
#include <Backups/BackupEntryConcat.h>
#include <Backups/BackupEntryFromImmutableFile.h>
#include <Backups/BackupEntryFromMemory.h>
#include <Backups/IBackupEntry.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/typeid_cast.h>
#include <Common/quoteString.h>
#include <Disks/DiskSelector.h>
#include <Disks/IDisk.h>
#include <IO/HashingReadBuffer.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadHelpers.h>
#include <IO/SeekableReadBuffer.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/WriteHelpers.h>
#include <IO/copyData.h>
#include <boost/range/adaptor/map.hpp>
namespace DB
{
namespace ErrorCodes
{
extern const int BACKUP_NOT_FOUND;
extern const int BACKUP_ALREADY_EXISTS;
extern const int BACKUP_VERSION_NOT_SUPPORTED;
extern const int BACKUP_DAMAGED;
extern const int NO_BASE_BACKUP;
extern const int WRONG_BASE_BACKUP;
extern const int BACKUP_ENTRY_ALREADY_EXISTS;
extern const int BACKUP_ENTRY_NOT_FOUND;
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
}
namespace
{
const UInt64 BACKUP_VERSION = 1;
}
BackupInDirectory::BackupInDirectory(OpenMode open_mode_, const DiskPtr & disk_, const String & path_, const std::shared_ptr<const IBackup> & base_backup_)
: open_mode(open_mode_), disk(disk_), path(path_), path_with_sep(path_), base_backup(base_backup_)
{
if (!path_with_sep.ends_with('/'))
path_with_sep += '/';
trimRight(path, '/');
open();
}
BackupInDirectory::~BackupInDirectory()
{
close();
}
void BackupInDirectory::open()
{
if (open_mode == OpenMode::WRITE)
{
if (disk->exists(path))
throw Exception(ErrorCodes::BACKUP_ALREADY_EXISTS, "Backup {} already exists", quoteString(path));
disk->createDirectories(path);
directory_was_created = true;
writePathToBaseBackup();
}
if (open_mode == OpenMode::READ)
{
if (!disk->isDirectory(path))
throw Exception(ErrorCodes::BACKUP_NOT_FOUND, "Backup {} not found", quoteString(path));
readContents();
readPathToBaseBackup();
}
}
void BackupInDirectory::close()
{
if (open_mode == OpenMode::WRITE)
{
if (!finalized && directory_was_created)
{
/// Creating of the backup wasn't finished correctly,
/// so the backup cannot be used and it's better to remove its files.
disk->removeRecursive(path);
}
}
}
void BackupInDirectory::writePathToBaseBackup()
{
String file_path = path_with_sep + ".base_backup";
if (!base_backup)
{
disk->removeFileIfExists(file_path);
return;
}
auto out = disk->writeFile(file_path);
writeString(base_backup->getPath(), *out);
}
void BackupInDirectory::readPathToBaseBackup()
{
if (base_backup)
return;
String file_path = path_with_sep + ".base_backup";
if (!disk->exists(file_path))
return;
auto in = disk->readFile(file_path);
String base_backup_path;
readStringUntilEOF(base_backup_path, *in);
if (base_backup_path.empty())
return;
base_backup = BackupFactory::instance().openBackup(base_backup_path);
}
void BackupInDirectory::writeContents()
{
auto out = disk->writeFile(path_with_sep + ".contents");
writeVarUInt(BACKUP_VERSION, *out);
writeVarUInt(infos.size(), *out);
for (const auto & [path_in_backup, info] : infos)
{
writeBinary(path_in_backup, *out);
writeVarUInt(info.size, *out);
if (info.size)
{
writeBinary(info.checksum, *out);
writeVarUInt(info.base_size, *out);
if (info.base_size && (info.base_size != info.size))
writeBinary(info.base_checksum, *out);
}
}
}
void BackupInDirectory::readContents()
{
auto in = disk->readFile(path_with_sep + ".contents");
UInt64 version;
readVarUInt(version, *in);
if (version != BACKUP_VERSION)
throw Exception(ErrorCodes::BACKUP_VERSION_NOT_SUPPORTED, "Backup {}: Version {} is not supported", quoteString(path), version);
size_t num_infos;
readVarUInt(num_infos, *in);
infos.clear();
for (size_t i = 0; i != num_infos; ++i)
{
String path_in_backup;
readBinary(path_in_backup, *in);
EntryInfo info;
readVarUInt(info.size, *in);
if (info.size)
{
readBinary(info.checksum, *in);
readVarUInt(info.base_size, *in);
if (info.base_size && (info.base_size != info.size))
readBinary(info.base_checksum, *in);
else if (info.base_size)
info.base_checksum = info.checksum;
}
infos.emplace(path_in_backup, info);
}
}
IBackup::OpenMode BackupInDirectory::getOpenMode() const
{
return open_mode;
}
String BackupInDirectory::getPath() const
{
return path;
}
Strings BackupInDirectory::list(const String & prefix, const String & terminator) const
{
if (!prefix.ends_with('/') && !prefix.empty())
throw Exception("prefix should end with '/'", ErrorCodes::BAD_ARGUMENTS);
std::lock_guard lock{mutex};
Strings elements;
for (auto it = infos.lower_bound(prefix); it != infos.end(); ++it)
{
const String & name = it->first;
if (!name.starts_with(prefix))
break;
size_t start_pos = prefix.length();
size_t end_pos = String::npos;
if (!terminator.empty())
end_pos = name.find(terminator, start_pos);
std::string_view new_element = std::string_view{name}.substr(start_pos, end_pos - start_pos);
if (!elements.empty() && (elements.back() == new_element))
continue;
elements.push_back(String{new_element});
}
return elements;
}
bool BackupInDirectory::exists(const String & name) const
{
std::lock_guard lock{mutex};
return infos.count(name) != 0;
}
size_t BackupInDirectory::getSize(const String & name) const
{
std::lock_guard lock{mutex};
auto it = infos.find(name);
if (it == infos.end())
throw Exception(
ErrorCodes::BACKUP_ENTRY_NOT_FOUND, "Backup {}: Entry {} not found in the backup", quoteString(path), quoteString(name));
return it->second.size;
}
UInt128 BackupInDirectory::getChecksum(const String & name) const
{
std::lock_guard lock{mutex};
auto it = infos.find(name);
if (it == infos.end())
throw Exception(
ErrorCodes::BACKUP_ENTRY_NOT_FOUND, "Backup {}: Entry {} not found in the backup", quoteString(path), quoteString(name));
return it->second.checksum;
}
BackupEntryPtr BackupInDirectory::read(const String & name) const
{
std::lock_guard lock{mutex};
auto it = infos.find(name);
if (it == infos.end())
throw Exception(
ErrorCodes::BACKUP_ENTRY_NOT_FOUND, "Backup {}: Entry {} not found in the backup", quoteString(path), quoteString(name));
const auto & info = it->second;
if (!info.size)
{
/// Entry's data is empty.
return std::make_unique<BackupEntryFromMemory>(nullptr, 0, UInt128{0, 0});
}
if (!info.base_size)
{
/// Data goes completely from this backup, the base backup isn't used.
return std::make_unique<BackupEntryFromImmutableFile>(disk, path_with_sep + name, info.size, info.checksum);
}
if (info.size < info.base_size)
{
throw Exception(
ErrorCodes::BACKUP_DAMAGED,
"Backup {}: Entry {} has its data size less than in the base backup {}: {} < {}",
quoteString(path), quoteString(name), quoteString(base_backup->getPath()), info.size, info.base_size);
}
if (!base_backup)
{
throw Exception(
ErrorCodes::NO_BASE_BACKUP,
"Backup {}: Entry {} is marked to be read from a base backup, but there is no base backup specified",
quoteString(path), quoteString(name));
}
if (!base_backup->exists(name))
{
throw Exception(
ErrorCodes::WRONG_BASE_BACKUP,
"Backup {}: Entry {} is marked to be read from a base backup, but doesn't exist there",
quoteString(path), quoteString(name));
}
auto base_entry = base_backup->read(name);
auto base_size = base_entry->getSize();
if (base_size != info.base_size)
{
throw Exception(
ErrorCodes::WRONG_BASE_BACKUP,
"Backup {}: Entry {} has unexpected size in the base backup {}: {} (expected size: {})",
quoteString(path), quoteString(name), quoteString(base_backup->getPath()), base_size, info.base_size);
}
auto base_checksum = base_entry->getChecksum();
if (base_checksum && (*base_checksum != info.base_checksum))
{
throw Exception(
ErrorCodes::WRONG_BASE_BACKUP,
"Backup {}: Entry {} has unexpected checksum in the base backup {}",
quoteString(path), quoteString(name), quoteString(base_backup->getPath()));
}
if (info.size == info.base_size)
{
/// Data goes completely from the base backup (nothing goes from this backup).
return base_entry;
}
/// The beginning of the data goes from the base backup,
/// and the ending goes from this backup.
return std::make_unique<BackupEntryConcat>(
std::move(base_entry),
std::make_unique<BackupEntryFromImmutableFile>(disk, path_with_sep + name, info.size - info.base_size),
info.checksum);
}
void BackupInDirectory::write(const String & name, BackupEntryPtr entry)
{
std::lock_guard lock{mutex};
if (open_mode != OpenMode::WRITE)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Illegal operation: Cannot write to a backup opened for reading");
if (infos.contains(name))
throw Exception(
ErrorCodes::BACKUP_ENTRY_ALREADY_EXISTS, "Backup {}: Entry {} already exists", quoteString(path), quoteString(name));
UInt64 size = entry->getSize();
std::optional<UInt128> checksum = entry->getChecksum();
/// Check if the entry's data is empty.
if (!size)
{
infos.emplace(name, EntryInfo{});
return;
}
/// Check if a entry with such name exists in the base backup.
bool base_exists = (base_backup && base_backup->exists(name));
UInt64 base_size = 0;
UInt128 base_checksum{0, 0};
if (base_exists)
{
base_size = base_backup->getSize(name);
base_checksum = base_backup->getChecksum(name);
}
std::unique_ptr<ReadBuffer> read_buffer; /// We'll set that later.
UInt64 read_pos = 0; /// Current position in read_buffer.
/// Determine whether it's possible to receive this entry's data from the base backup completely or partly.
bool use_base = false;
if (base_exists && base_size)
{
if (size == base_size)
{
/// The size is the same, we need to compare checksums to find out
/// if the entry's data has not been changed since the base backup.
if (!checksum)
{
read_buffer = entry->getReadBuffer();
HashingReadBuffer hashing_read_buffer{*read_buffer};
hashing_read_buffer.ignore(size);
read_pos = size;
checksum = hashing_read_buffer.getHash();
}
if (checksum == base_checksum)
use_base = true; /// The data has not been changed.
}
else if (size > base_size)
{
/// The size has been increased, we need to calculate a partial checksum to find out
/// if the entry's data has been only appended since the base backup.
read_buffer = entry->getReadBuffer();
HashingReadBuffer hashing_read_buffer{*read_buffer};
hashing_read_buffer.ignore(base_size);
UInt128 partial_checksum = hashing_read_buffer.getHash();
read_pos = base_size;
if (!checksum)
{
hashing_read_buffer.ignore(size - base_size);
checksum = hashing_read_buffer.getHash();
read_pos = size;
}
if (partial_checksum == base_checksum)
use_base = true; /// The data has been appended.
}
}
if (use_base && (size == base_size))
{
/// The entry's data has not been changed since the base backup.
EntryInfo info;
info.size = base_size;
info.checksum = base_checksum;
info.base_size = base_size;
info.base_checksum = base_checksum;
infos.emplace(name, info);
return;
}
{
/// Either the entry wasn't exist in the base backup
/// or the entry has data appended to the end of the data from the base backup.
/// In both those cases we have to copy data to this backup.
/// Find out where the start position to copy data is.
auto copy_pos = use_base ? base_size : 0;
/// Move the current read position to the start position to copy data.
/// If `read_buffer` is seekable it's easier, otherwise we can use ignore().
if ((read_pos > copy_pos) && !typeid_cast<SeekableReadBuffer *>(read_buffer.get()))
{
read_buffer.reset();
read_pos = 0;
}
if (!read_buffer)
read_buffer = entry->getReadBuffer();
if (read_pos != copy_pos)
{
if (auto * seekable_buffer = typeid_cast<SeekableReadBuffer *>(read_buffer.get()))
seekable_buffer->seek(copy_pos, SEEK_SET);
else if (copy_pos)
read_buffer->ignore(copy_pos - read_pos);
}
/// If we haven't received or calculated a checksum yet, calculate it now.
ReadBuffer * maybe_hashing_read_buffer = read_buffer.get();
std::optional<HashingReadBuffer> hashing_read_buffer;
if (!checksum)
maybe_hashing_read_buffer = &hashing_read_buffer.emplace(*read_buffer);
/// Copy the entry's data after `copy_pos`.
String out_file_path = path_with_sep + name;
disk->createDirectories(directoryPath(out_file_path));
auto out = disk->writeFile(out_file_path);
copyData(*maybe_hashing_read_buffer, *out, size - copy_pos);
if (hashing_read_buffer)
checksum = hashing_read_buffer->getHash();
/// Done!
EntryInfo info;
info.size = size;
info.checksum = *checksum;
if (use_base)
{
info.base_size = base_size;
info.base_checksum = base_checksum;
}
infos.emplace(name, info);
}
}
void BackupInDirectory::finalizeWriting()
{
if (open_mode != OpenMode::WRITE)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Illegal operation: Cannot write to a backup opened for reading");
writeContents();
finalized = true;
}
}

View File

@ -0,0 +1,66 @@
#pragma once
#include <Backups/IBackup.h>
#include <map>
#include <mutex>
namespace DB
{
class IDisk;
using DiskPtr = std::shared_ptr<IDisk>;
/// Represents a backup stored on a disk.
/// A backup is stored as a directory, each entry is stored as a file in that directory.
/// Also three system files are stored:
/// 1) ".base" is an XML file with information about the base backup.
/// 2) ".contents" is a binary file containing a list of all entries along with their sizes
/// and checksums and information whether the base backup should be used for each entry
/// 3) ".write_lock" is a temporary empty file which is created before writing of a backup
/// and deleted after finishing that writing.
class BackupInDirectory : public IBackup
{
public:
BackupInDirectory(OpenMode open_mode_, const DiskPtr & disk_, const String & path_, const std::shared_ptr<const IBackup> & base_backup_ = {});
~BackupInDirectory() override;
OpenMode getOpenMode() const override;
String getPath() const override;
Strings list(const String & prefix, const String & terminator) const override;
bool exists(const String & name) const override;
size_t getSize(const String & name) const override;
UInt128 getChecksum(const String & name) const override;
BackupEntryPtr read(const String & name) const override;
void write(const String & name, BackupEntryPtr entry) override;
void finalizeWriting() override;
private:
void open();
void close();
void writePathToBaseBackup();
void readPathToBaseBackup();
void writeContents();
void readContents();
struct EntryInfo
{
UInt64 size = 0;
UInt128 checksum{0, 0};
/// for incremental backups
UInt64 base_size = 0;
UInt128 base_checksum{0, 0};
};
const OpenMode open_mode;
const DiskPtr disk;
String path;
String path_with_sep;
std::shared_ptr<const IBackup> base_backup;
std::map<String, EntryInfo> infos;
bool directory_was_created = false;
bool finalized = false;
mutable std::mutex mutex;
};
}

View File

@ -0,0 +1,89 @@
#include <Backups/BackupRenamingConfig.h>
#include <Parsers/ASTBackupQuery.h>
namespace DB
{
using Kind = ASTBackupQuery::Kind;
using ElementType = ASTBackupQuery::ElementType;
void BackupRenamingConfig::setNewTableName(const DatabaseAndTableName & old_table_name, const DatabaseAndTableName & new_table_name)
{
old_to_new_table_names[old_table_name] = new_table_name;
}
void BackupRenamingConfig::setNewDatabaseName(const String & old_database_name, const String & new_database_name)
{
old_to_new_database_names[old_database_name] = new_database_name;
}
void BackupRenamingConfig::setNewTemporaryTableName(const String & old_temporary_table_name, const String & new_temporary_table_name)
{
old_to_new_temporary_table_names[old_temporary_table_name] = new_temporary_table_name;
}
void BackupRenamingConfig::setFromBackupQuery(const ASTBackupQuery & backup_query)
{
setFromBackupQueryElements(backup_query.elements);
}
void BackupRenamingConfig::setFromBackupQueryElements(const ASTBackupQuery::Elements & backup_query_elements)
{
for (const auto & element : backup_query_elements)
{
switch (element.type)
{
case ElementType::TABLE: [[fallthrough]];
case ElementType::DICTIONARY:
{
const auto & new_name = element.new_name.second.empty() ? element.name : element.new_name;
setNewTableName(element.name, new_name);
break;
}
case ASTBackupQuery::DATABASE:
{
const auto & new_name = element.new_name.first.empty() ? element.name.first : element.new_name.first;
setNewDatabaseName(element.name.first, new_name);
break;
}
case ASTBackupQuery::TEMPORARY_TABLE:
{
const auto & new_name = element.new_name.second.empty() ? element.name.second : element.new_name.second;
setNewTemporaryTableName(element.name.second, new_name);
break;
}
case ASTBackupQuery::ALL_DATABASES: break;
case ASTBackupQuery::ALL_TEMPORARY_TABLES: break;
case ASTBackupQuery::EVERYTHING: break;
}
}
}
DatabaseAndTableName BackupRenamingConfig::getNewTableName(const DatabaseAndTableName & old_table_name) const
{
auto it = old_to_new_table_names.find(old_table_name);
if (it != old_to_new_table_names.end())
return it->second;
return {getNewDatabaseName(old_table_name.first), old_table_name.second};
}
const String & BackupRenamingConfig::getNewDatabaseName(const String & old_database_name) const
{
auto it = old_to_new_database_names.find(old_database_name);
if (it != old_to_new_database_names.end())
return it->second;
return old_database_name;
}
const String & BackupRenamingConfig::getNewTemporaryTableName(const String & old_temporary_table_name) const
{
auto it = old_to_new_temporary_table_names.find(old_temporary_table_name);
if (it != old_to_new_temporary_table_names.end())
return it->second;
return old_temporary_table_name;
}
}

View File

@ -0,0 +1,39 @@
#pragma once
#include <Parsers/ASTBackupQuery.h>
#include <Core/Types.h>
#include <map>
#include <unordered_map>
namespace DB
{
using DatabaseAndTableName = std::pair<String, String>;
/// Keeps information about renamings of databases or tables being processed
/// while we're making a backup or while we're restoring from a backup.
class BackupRenamingConfig
{
public:
BackupRenamingConfig() = default;
void setNewTableName(const DatabaseAndTableName & old_table_name, const DatabaseAndTableName & new_table_name);
void setNewDatabaseName(const String & old_database_name, const String & new_database_name);
void setNewTemporaryTableName(const String & old_temporary_table_name, const String & new_temporary_table_name);
void setFromBackupQuery(const ASTBackupQuery & backup_query);
void setFromBackupQueryElements(const ASTBackupQuery::Elements & backup_query_elements);
/// Changes names according to the renaming.
DatabaseAndTableName getNewTableName(const DatabaseAndTableName & old_table_name) const;
const String & getNewDatabaseName(const String & old_database_name) const;
const String & getNewTemporaryTableName(const String & old_temporary_table_name) const;
private:
std::map<DatabaseAndTableName, DatabaseAndTableName> old_to_new_table_names;
std::unordered_map<String, String> old_to_new_database_names;
std::unordered_map<String, String> old_to_new_temporary_table_names;
};
using BackupRenamingConfigPtr = std::shared_ptr<const BackupRenamingConfig>;
}

View File

@ -0,0 +1,6 @@
#include <Backups/BackupSettings.h>
namespace DB
{
IMPLEMENT_SETTINGS_TRAITS(BackupSettingsTraits, LIST_OF_BACKUP_SETTINGS)
}

View File

@ -0,0 +1,16 @@
#pragma once
#include <Core/BaseSettings.h>
namespace DB
{
#define LIST_OF_BACKUP_SETTINGS(M) \
M(String, base_backup, "", "Name of the base backup. Only differences made after the base backup will be included in a newly created backup, so this option allows to make an incremental backup.", 0) \
DECLARE_SETTINGS_TRAITS_ALLOW_CUSTOM_SETTINGS(BackupSettingsTraits, LIST_OF_BACKUP_SETTINGS)
struct BackupSettings : public BaseSettings<BackupSettingsTraits> {};
}

830
src/Backups/BackupUtils.cpp Normal file
View File

@ -0,0 +1,830 @@
#include <Backups/BackupUtils.h>
#include <Backups/BackupEntryFromMemory.h>
#include <Backups/BackupRenamingConfig.h>
#include <Backups/IBackup.h>
#include <Backups/hasCompatibleDataToRestoreTable.h>
#include <Backups/renameInCreateQuery.h>
#include <Common/escapeForFileName.h>
#include <Databases/IDatabase.h>
#include <IO/ReadHelpers.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/formatAST.h>
#include <Storages/IStorage.h>
#include <common/insertAtEnd.h>
#include <boost/range/adaptor/reversed.hpp>
#include <filesystem>
namespace DB
{
namespace ErrorCodes
{
extern const int BACKUP_ELEMENT_DUPLICATE;
extern const int BACKUP_IS_EMPTY;
extern const int LOGICAL_ERROR;
extern const int TABLE_ALREADY_EXISTS;
extern const int CANNOT_RESTORE_TABLE;
}
namespace
{
using Kind = ASTBackupQuery::Kind;
using Element = ASTBackupQuery::Element;
using Elements = ASTBackupQuery::Elements;
using ElementType = ASTBackupQuery::ElementType;
/// Replace elements of types DICTIONARY or EVERYTHING with elements of other types.
void replaceElementTypesWithBaseElementTypes(Elements & elements)
{
for (size_t i = 0; i != elements.size(); ++i)
{
auto & element = elements[i];
switch (element.type)
{
case ElementType::DICTIONARY:
{
element.type = ElementType::TABLE;
break;
}
case ElementType::EVERYTHING:
{
element.type = ElementType::ALL_DATABASES;
auto & new_element = elements.emplace_back();
new_element.type = ElementType::ALL_TEMPORARY_TABLES;
break;
}
default:
break;
}
}
}
/// Replaces an empty database with the current database.
void replaceEmptyDatabaseWithCurrentDatabase(Elements & elements, const String & current_database)
{
for (auto & element : elements)
{
if (element.type == ElementType::TABLE)
{
if (element.name.first.empty() && !element.name.second.empty())
element.name.first = current_database;
if (element.new_name.first.empty() && !element.new_name.second.empty())
element.new_name.first = current_database;
}
}
}
/// Replaces elements of types TEMPORARY_TABLE or ALL_TEMPORARY_TABLES with elements of type TABLE or DATABASE.
void replaceTemporaryTablesWithTemporaryDatabase(Elements & elements)
{
for (size_t i = 0; i != elements.size(); ++i)
{
auto & element = elements[i];
switch (element.type)
{
case ElementType::TEMPORARY_TABLE:
{
element.type = ElementType::TABLE;
element.name.first = DatabaseCatalog::TEMPORARY_DATABASE;
if (element.new_name.first.empty() && !element.new_name.second.empty())
element.new_name.first = DatabaseCatalog::TEMPORARY_DATABASE;
break;
}
case ElementType::ALL_TEMPORARY_TABLES:
{
element.type = ElementType::DATABASE;
element.name.first = DatabaseCatalog::TEMPORARY_DATABASE;
break;
}
default:
break;
}
}
}
/// Set new names if they are not specified.
void setNewNamesIfNotSet(Elements & elements)
{
for (auto & element : elements)
{
switch (element.type)
{
case ElementType::TABLE:
{
if (element.new_name.second.empty())
element.new_name = element.name;
break;
}
case ElementType::DATABASE:
{
if (element.new_name.first.empty())
element.new_name = element.name;
break;
}
default:
break;
}
}
}
/// Removes duplications in the elements of a backup query by removing some excessive elements and by updating except_lists.
/// This function helps deduplicate elements in queries like "BACKUP ALL DATABASES, DATABASE xxx USING NAME yyy"
/// (we need a deduplication for that query because `ALL DATABASES` includes `xxx` however we don't want
/// to backup/restore the same database twice while executing the same query).
/// Also this function slightly reorders elements: it puts databases before tables and dictionaries they contain.
void deduplicateAndReorderElements(Elements & elements)
{
std::set<size_t> skip_indices; /// Indices of elements which should be removed in the end of this function.
size_t index_all_databases = static_cast<size_t>(-1); /// Index of the first element of type ALL_DATABASES or -1 if not found.
struct DatabaseInfo
{
size_t index = static_cast<size_t>(-1);
std::unordered_map<std::string_view, size_t> tables;
};
std::unordered_map<std::string_view, DatabaseInfo> databases; /// Found databases and tables.
for (size_t i = 0; i != elements.size(); ++i)
{
auto & element = elements[i];
switch (element.type)
{
case ElementType::TABLE:
{
auto & tables = databases.emplace(element.name.first, DatabaseInfo{}).first->second.tables;
auto it = tables.find(element.name.second);
if (it == tables.end())
{
tables.emplace(element.name.second, i);
}
else
{
size_t prev_index = it->second;
if ((elements[i].new_name == elements[prev_index].new_name)
&& (elements[i].partitions.empty() == elements[prev_index].partitions.empty()))
{
insertAtEnd(elements[prev_index].partitions, elements[i].partitions);
skip_indices.emplace(i);
}
else
{
throw Exception(
"Table " + backQuote(element.name.first) + "." + backQuote(element.name.second) + " was specified twice",
ErrorCodes::BACKUP_ELEMENT_DUPLICATE);
}
}
break;
}
case ElementType::DATABASE:
{
auto it = databases.find(element.name.first);
if (it == databases.end())
{
DatabaseInfo new_db_info;
new_db_info.index = i;
databases.emplace(element.name.first, new_db_info);
}
else if (it->second.index == static_cast<size_t>(-1))
{
it->second.index = i;
}
else
{
size_t prev_index = it->second.index;
if ((elements[i].new_name == elements[prev_index].new_name)
&& (elements[i].except_list == elements[prev_index].except_list))
{
skip_indices.emplace(i);
}
else
{
throw Exception("Database " + backQuote(element.name.first) + " was specified twice", ErrorCodes::BACKUP_ELEMENT_DUPLICATE);
}
}
break;
}
case ElementType::ALL_DATABASES:
{
if (index_all_databases == static_cast<size_t>(-1))
{
index_all_databases = i;
}
else
{
size_t prev_index = index_all_databases;
if (elements[i].except_list == elements[prev_index].except_list)
skip_indices.emplace(i);
else
throw Exception("The tag ALL DATABASES was specified twice", ErrorCodes::BACKUP_ELEMENT_DUPLICATE);
}
break;
}
default:
/// replaceElementTypesWithBaseElementTypes() and replaceTemporaryTablesWithTemporaryDatabase() should have removed all other element types.
throw Exception("Unexpected element type: " + std::to_string(static_cast<int>(element.type)), ErrorCodes::LOGICAL_ERROR);
}
}
if (index_all_databases != static_cast<size_t>(-1))
{
for (auto & [database_name, database] : databases)
{
elements[index_all_databases].except_list.emplace(database_name);
if (database.index == static_cast<size_t>(-1))
{
auto & new_element = elements.emplace_back();
new_element.type = ElementType::DATABASE;
new_element.name.first = database_name;
new_element.new_name = new_element.name;
database.index = elements.size() - 1;
}
}
}
for (auto & [database_name, database] : databases)
{
if (database.index == static_cast<size_t>(-1))
continue;
for (const auto & [table_name, table_index] : database.tables)
elements[database.index].except_list.emplace(table_name);
}
/// Reorder the elements: databases should be before tables and dictionaries they contain.
for (auto & [database_name, database] : databases)
{
if (database.index == static_cast<size_t>(-1))
continue;
size_t min_index = std::numeric_limits<size_t>::max();
auto min_index_it = database.tables.end();
for (auto it = database.tables.begin(); it != database.tables.end(); ++it)
{
if (min_index > it->second)
{
min_index = it->second;
min_index_it = it;
}
}
if (database.index > min_index)
{
std::swap(elements[database.index], elements[min_index]);
std::swap(database.index, min_index_it->second);
}
}
for (auto skip_index : skip_indices | boost::adaptors::reversed)
elements.erase(elements.begin() + skip_index);
}
Elements adjustElements(const Elements & elements, const String & current_database)
{
auto res = elements;
replaceElementTypesWithBaseElementTypes(res);
replaceEmptyDatabaseWithCurrentDatabase(res, current_database);
replaceTemporaryTablesWithTemporaryDatabase(res);
setNewNamesIfNotSet(res);
deduplicateAndReorderElements(res);
return res;
}
String getDataPathInBackup(const DatabaseAndTableName & table_name)
{
if (table_name.first.empty() || table_name.second.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Database name and table name must not be empty");
assert(!table_name.first.empty() && !table_name.second.empty());
return String{"data/"} + escapeForFileName(table_name.first) + "/" + escapeForFileName(table_name.second) + "/";
}
String getDataPathInBackup(const IAST & create_query)
{
const auto & create = create_query.as<const ASTCreateQuery &>();
if (create.table.empty())
return {};
if (create.temporary)
return getDataPathInBackup({DatabaseCatalog::TEMPORARY_DATABASE, create.table});
return getDataPathInBackup({create.database, create.table});
}
String getMetadataPathInBackup(const DatabaseAndTableName & table_name)
{
if (table_name.first.empty() || table_name.second.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Database name and table name must not be empty");
return String{"metadata/"} + escapeForFileName(table_name.first) + "/" + escapeForFileName(table_name.second) + ".sql";
}
String getMetadataPathInBackup(const String & database_name)
{
if (database_name.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Database name must not be empty");
return String{"metadata/"} + escapeForFileName(database_name) + ".sql";
}
String getMetadataPathInBackup(const IAST & create_query)
{
const auto & create = create_query.as<const ASTCreateQuery &>();
if (create.table.empty())
return getMetadataPathInBackup(create.database);
if (create.temporary)
return getMetadataPathInBackup({DatabaseCatalog::TEMPORARY_DATABASE, create.table});
return getMetadataPathInBackup({create.database, create.table});
}
void backupCreateQuery(const IAST & create_query, BackupEntries & backup_entries)
{
auto metadata_entry = std::make_unique<BackupEntryFromMemory>(serializeAST(create_query));
String metadata_path = getMetadataPathInBackup(create_query);
backup_entries.emplace_back(metadata_path, std::move(metadata_entry));
}
void backupTable(
const DatabaseAndTable & database_and_table,
const String & table_name,
const ASTs & partitions,
const ContextPtr & context,
const BackupRenamingConfigPtr & renaming_config,
BackupEntries & backup_entries)
{
const auto & database = database_and_table.first;
const auto & storage = database_and_table.second;
context->checkAccess(AccessType::SELECT, database->getDatabaseName(), table_name);
auto create_query = database->getCreateTableQuery(table_name, context);
ASTPtr new_create_query = renameInCreateQuery(create_query, renaming_config, context);
backupCreateQuery(*new_create_query, backup_entries);
auto data_backup = storage->backup(partitions, context);
if (!data_backup.empty())
{
String data_path = getDataPathInBackup(*new_create_query);
for (auto & [path_in_backup, backup_entry] : data_backup)
backup_entries.emplace_back(data_path + path_in_backup, std::move(backup_entry));
}
}
void backupDatabase(
const DatabasePtr & database,
const std::set<String> & except_list,
const ContextPtr & context,
const BackupRenamingConfigPtr & renaming_config,
BackupEntries & backup_entries)
{
context->checkAccess(AccessType::SHOW_TABLES, database->getDatabaseName());
auto create_query = database->getCreateDatabaseQuery();
ASTPtr new_create_query = renameInCreateQuery(create_query, renaming_config, context);
backupCreateQuery(*new_create_query, backup_entries);
for (auto it = database->getTablesIteratorForBackup(context); it->isValid(); it->next())
{
if (except_list.contains(it->name()))
continue;
backupTable({database, it->table()}, it->name(), {}, context, renaming_config, backup_entries);
}
}
void backupAllDatabases(
const std::set<String> & except_list,
const ContextPtr & context,
const BackupRenamingConfigPtr & renaming_config,
BackupEntries & backup_entries)
{
for (const auto & [database_name, database] : DatabaseCatalog::instance().getDatabases())
{
if (except_list.contains(database_name))
continue;
if (database_name == DatabaseCatalog::SYSTEM_DATABASE || database_name == DatabaseCatalog::TEMPORARY_DATABASE)
continue;
backupDatabase(database, {}, context, renaming_config, backup_entries);
}
}
void makeDatabaseIfNotExists(const String & database_name, ContextMutablePtr context)
{
if (DatabaseCatalog::instance().isDatabaseExist(database_name))
return;
/// We create and execute `create` query for the database name.
auto create_query = std::make_shared<ASTCreateQuery>();
create_query->database = database_name;
create_query->if_not_exists = true;
InterpreterCreateQuery create_interpreter{create_query, context};
create_interpreter.execute();
}
ASTPtr readCreateQueryFromBackup(const DatabaseAndTableName & table_name, const BackupPtr & backup)
{
String create_query_path = getMetadataPathInBackup(table_name);
auto read_buffer = backup->read(create_query_path)->getReadBuffer();
String create_query_str;
readStringUntilEOF(create_query_str, *read_buffer);
read_buffer.reset();
ParserCreateQuery create_parser;
return parseQuery(create_parser, create_query_str, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
}
ASTPtr readCreateQueryFromBackup(const String & database_name, const BackupPtr & backup)
{
String create_query_path = getMetadataPathInBackup(database_name);
auto read_buffer = backup->read(create_query_path)->getReadBuffer();
String create_query_str;
readStringUntilEOF(create_query_str, *read_buffer);
read_buffer.reset();
ParserCreateQuery create_parser;
return parseQuery(create_parser, create_query_str, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
}
void restoreTable(
const DatabaseAndTableName & table_name,
const ASTs & partitions,
ContextMutablePtr context,
const BackupPtr & backup,
const BackupRenamingConfigPtr & renaming_config,
RestoreObjectsTasks & restore_tasks)
{
ASTPtr create_query = readCreateQueryFromBackup(table_name, backup);
auto new_create_query = typeid_cast<std::shared_ptr<ASTCreateQuery>>(renameInCreateQuery(create_query, renaming_config, context));
restore_tasks.emplace_back([table_name, new_create_query, partitions, context, backup]() -> RestoreDataTasks
{
DatabaseAndTableName new_table_name{new_create_query->database, new_create_query->table};
if (new_create_query->temporary)
new_table_name.first = DatabaseCatalog::TEMPORARY_DATABASE;
context->checkAccess(AccessType::INSERT, new_table_name.first, new_table_name.second);
StoragePtr storage;
for (size_t try_index = 0; try_index != 10; ++try_index)
{
if (DatabaseCatalog::instance().isTableExist({new_table_name.first, new_table_name.second}, context))
{
DatabasePtr existing_database;
StoragePtr existing_storage;
std::tie(existing_database, existing_storage) = DatabaseCatalog::instance().tryGetDatabaseAndTable({new_table_name.first, new_table_name.second}, context);
if (existing_storage)
{
if (auto existing_table_create_query = existing_database->tryGetCreateTableQuery(new_table_name.second, context))
{
if (hasCompatibleDataToRestoreTable(*new_create_query, existing_table_create_query->as<ASTCreateQuery &>()))
{
storage = existing_storage;
break;
}
else
{
String error_message = (new_table_name.first == DatabaseCatalog::TEMPORARY_DATABASE)
? ("Temporary table " + backQuoteIfNeed(new_table_name.second) + " already exists")
: ("Table " + backQuoteIfNeed(new_table_name.first) + "." + backQuoteIfNeed(new_table_name.second)
+ " already exists");
throw Exception(error_message, ErrorCodes::CANNOT_RESTORE_TABLE);
}
}
}
}
makeDatabaseIfNotExists(new_table_name.first, context);
try
{
InterpreterCreateQuery create_interpreter{new_create_query, context};
create_interpreter.execute();
}
catch (Exception & e)
{
if (e.code() != ErrorCodes::TABLE_ALREADY_EXISTS)
throw;
}
}
if (!storage)
{
String error_message = (new_table_name.first == DatabaseCatalog::TEMPORARY_DATABASE)
? ("Could not create temporary table " + backQuoteIfNeed(new_table_name.second) + " for restoring")
: ("Could not create table " + backQuoteIfNeed(new_table_name.first) + "." + backQuoteIfNeed(new_table_name.second)
+ " for restoring");
throw Exception(error_message, ErrorCodes::CANNOT_RESTORE_TABLE);
}
String data_path_in_backup = getDataPathInBackup(table_name);
RestoreDataTasks restore_data_tasks = storage->restoreFromBackup(backup, data_path_in_backup, partitions, context);
/// Keep `storage` alive while we're executing `restore_data_tasks`.
for (auto & restore_data_task : restore_data_tasks)
restore_data_task = [restore_data_task, storage]() { restore_data_task(); };
return restore_data_tasks;
});
}
void restoreDatabase(const String & database_name, const std::set<String> & except_list, ContextMutablePtr context, const BackupPtr & backup, const BackupRenamingConfigPtr & renaming_config, RestoreObjectsTasks & restore_tasks)
{
ASTPtr create_query = readCreateQueryFromBackup(database_name, backup);
auto new_create_query = typeid_cast<std::shared_ptr<ASTCreateQuery>>(renameInCreateQuery(create_query, renaming_config, context));
restore_tasks.emplace_back([database_name, new_create_query, except_list, context, backup, renaming_config]() -> RestoreDataTasks
{
const String & new_database_name = new_create_query->database;
context->checkAccess(AccessType::SHOW_TABLES, new_database_name);
if (!DatabaseCatalog::instance().isDatabaseExist(new_database_name))
{
/// We create and execute `create` query for the database name.
new_create_query->if_not_exists = true;
InterpreterCreateQuery create_interpreter{new_create_query, context};
create_interpreter.execute();
}
RestoreObjectsTasks restore_objects_tasks;
Strings table_names = backup->list("metadata/" + escapeForFileName(database_name) + "/", "/");
for (const String & table_name : table_names)
{
if (except_list.contains(table_name))
continue;
restoreTable({database_name, table_name}, {}, context, backup, renaming_config, restore_objects_tasks);
}
RestoreDataTasks restore_data_tasks;
for (auto & restore_object_task : restore_objects_tasks)
insertAtEnd(restore_data_tasks, std::move(restore_object_task)());
return restore_data_tasks;
});
}
void restoreAllDatabases(const std::set<String> & except_list, ContextMutablePtr context, const BackupPtr & backup, const BackupRenamingConfigPtr & renaming_config, RestoreObjectsTasks & restore_tasks)
{
restore_tasks.emplace_back([except_list, context, backup, renaming_config]() -> RestoreDataTasks
{
Strings database_names = backup->list("metadata/", "/");
RestoreObjectsTasks restore_objects_tasks;
for (const String & database_name : database_names)
{
if (except_list.contains(database_name))
continue;
restoreDatabase(database_name, {}, context, backup, renaming_config, restore_objects_tasks);
}
RestoreDataTasks restore_data_tasks;
for (auto & restore_object_task : restore_objects_tasks)
insertAtEnd(restore_data_tasks, std::move(restore_object_task)());
return restore_data_tasks;
});
}
}
BackupEntries makeBackupEntries(const Elements & elements, const ContextPtr & context)
{
BackupEntries backup_entries;
auto elements2 = adjustElements(elements, context->getCurrentDatabase());
auto renaming_config = std::make_shared<BackupRenamingConfig>();
renaming_config->setFromBackupQueryElements(elements2);
for (const auto & element : elements2)
{
switch (element.type)
{
case ElementType::TABLE:
{
const String & database_name = element.name.first;
const String & table_name = element.name.second;
auto [database, storage] = DatabaseCatalog::instance().getDatabaseAndTable({database_name, table_name}, context);
backupTable({database, storage}, table_name, element.partitions, context, renaming_config, backup_entries);
break;
}
case ElementType::DATABASE:
{
const String & database_name = element.name.first;
auto database = DatabaseCatalog::instance().getDatabase(database_name, context);
backupDatabase(database, element.except_list, context, renaming_config, backup_entries);
break;
}
case ElementType::ALL_DATABASES:
{
backupAllDatabases(element.except_list, context, renaming_config, backup_entries);
break;
}
default:
throw Exception("Unexpected element type", ErrorCodes::LOGICAL_ERROR); /// other element types have been removed in deduplicateElements()
}
}
/// A backup cannot be empty.
if (backup_entries.empty())
throw Exception("Backup must not be empty", ErrorCodes::BACKUP_IS_EMPTY);
/// Check that all backup entries are unique.
std::sort(
backup_entries.begin(),
backup_entries.end(),
[](const std::pair<String, std::unique_ptr<IBackupEntry>> & lhs, const std::pair<String, std::unique_ptr<IBackupEntry>> & rhs)
{
return lhs.first < rhs.first;
});
auto adjacent = std::adjacent_find(backup_entries.begin(), backup_entries.end());
if (adjacent != backup_entries.end())
throw Exception("Cannot write multiple entries with the same name " + quoteString(adjacent->first), ErrorCodes::BACKUP_ELEMENT_DUPLICATE);
return backup_entries;
}
UInt64 estimateBackupSize(const BackupEntries & backup_entries, const BackupPtr & base_backup)
{
UInt64 total_size = 0;
for (const auto & [name, entry] : backup_entries)
{
UInt64 data_size = entry->getSize();
if (base_backup)
{
if (base_backup->exists(name) && (data_size == base_backup->getSize(name)))
{
auto checksum = entry->getChecksum();
if (checksum && (*checksum == base_backup->getChecksum(name)))
continue;
}
}
total_size += data_size;
}
return total_size;
}
void writeBackupEntries(BackupMutablePtr backup, BackupEntries && backup_entries, size_t num_threads)
{
if (!num_threads)
num_threads = 1;
std::vector<ThreadFromGlobalPool> threads;
size_t num_active_threads = 0;
std::mutex mutex;
std::condition_variable cond;
std::exception_ptr exception;
for (auto & name_and_entry : backup_entries)
{
auto & name = name_and_entry.first;
auto & entry = name_and_entry.second;
{
std::unique_lock lock{mutex};
if (exception)
break;
cond.wait(lock, [&] { return num_active_threads < num_threads; });
if (exception)
break;
++num_active_threads;
}
threads.emplace_back([backup, &name, &entry, &mutex, &cond, &num_active_threads, &exception]()
{
try
{
backup->write(name, std::move(entry));
}
catch (...)
{
std::lock_guard lock{mutex};
if (!exception)
exception = std::current_exception();
}
{
std::lock_guard lock{mutex};
--num_active_threads;
cond.notify_all();
}
});
}
for (auto & thread : threads)
thread.join();
backup_entries.clear();
if (exception)
{
/// We don't call finalizeWriting() if an error occurs.
/// And IBackup's implementation should remove the backup in its destructor if finalizeWriting() hasn't called before.
std::rethrow_exception(exception);
}
backup->finalizeWriting();
}
RestoreObjectsTasks makeRestoreTasks(const Elements & elements, ContextMutablePtr context, const BackupPtr & backup)
{
RestoreObjectsTasks restore_tasks;
auto elements2 = adjustElements(elements, context->getCurrentDatabase());
auto renaming_config = std::make_shared<BackupRenamingConfig>();
renaming_config->setFromBackupQueryElements(elements2);
for (const auto & element : elements2)
{
switch (element.type)
{
case ElementType::TABLE:
{
const String & database_name = element.name.first;
const String & table_name = element.name.second;
restoreTable({database_name, table_name}, element.partitions, context, backup, renaming_config, restore_tasks);
break;
}
case ElementType::DATABASE:
{
const String & database_name = element.name.first;
auto database = DatabaseCatalog::instance().getDatabase(database_name, context);
restoreDatabase(database_name, element.except_list, context, backup, renaming_config, restore_tasks);
break;
}
case ElementType::ALL_DATABASES:
{
restoreAllDatabases(element.except_list, context, backup, renaming_config, restore_tasks);
break;
}
default:
throw Exception("Unexpected element type", ErrorCodes::LOGICAL_ERROR); /// other element types have been removed in deduplicateElements()
}
}
return restore_tasks;
}
void executeRestoreTasks(RestoreObjectsTasks && restore_tasks, size_t num_threads)
{
if (!num_threads)
num_threads = 1;
RestoreDataTasks restore_data_tasks;
for (auto & restore_object_task : restore_tasks)
insertAtEnd(restore_data_tasks, std::move(restore_object_task)());
restore_tasks.clear();
std::vector<ThreadFromGlobalPool> threads;
size_t num_active_threads = 0;
std::mutex mutex;
std::condition_variable cond;
std::exception_ptr exception;
for (auto & restore_data_task : restore_data_tasks)
{
{
std::unique_lock lock{mutex};
if (exception)
break;
cond.wait(lock, [&] { return num_active_threads < num_threads; });
if (exception)
break;
++num_active_threads;
}
threads.emplace_back([&restore_data_task, &mutex, &cond, &num_active_threads, &exception]() mutable
{
try
{
restore_data_task();
restore_data_task = {};
}
catch (...)
{
std::lock_guard lock{mutex};
if (!exception)
exception = std::current_exception();
}
{
std::lock_guard lock{mutex};
--num_active_threads;
cond.notify_all();
}
});
}
for (auto & thread : threads)
thread.join();
restore_data_tasks.clear();
if (exception)
std::rethrow_exception(exception);
}
}

39
src/Backups/BackupUtils.h Normal file
View File

@ -0,0 +1,39 @@
#pragma once
#include <Parsers/ASTBackupQuery.h>
namespace DB
{
class IBackup;
using BackupPtr = std::shared_ptr<const IBackup>;
using BackupMutablePtr = std::shared_ptr<IBackup>;
class IBackupEntry;
using BackupEntryPtr = std::unique_ptr<IBackupEntry>;
using BackupEntries = std::vector<std::pair<String, BackupEntryPtr>>;
using RestoreDataTask = std::function<void()>;
using RestoreDataTasks = std::vector<RestoreDataTask>;
using RestoreObjectTask = std::function<RestoreDataTasks()>;
using RestoreObjectsTasks = std::vector<RestoreObjectTask>;
class Context;
using ContextPtr = std::shared_ptr<const Context>;
using ContextMutablePtr = std::shared_ptr<Context>;
/// Prepares backup entries.
BackupEntries makeBackupEntries(const ASTBackupQuery::Elements & elements, const ContextPtr & context);
/// Estimate total size of the backup which would be written from the specified entries.
UInt64 estimateBackupSize(const BackupEntries & backup_entries, const BackupPtr & base_backup);
/// Write backup entries to an opened backup.
void writeBackupEntries(BackupMutablePtr backup, BackupEntries && backup_entries, size_t num_threads);
/// Prepare restore tasks.
RestoreObjectsTasks makeRestoreTasks(const ASTBackupQuery::Elements & elements, ContextMutablePtr context, const BackupPtr & backup);
/// Execute restore tasks.
void executeRestoreTasks(RestoreObjectsTasks && restore_tasks, size_t num_threads);
}

View File

65
src/Backups/IBackup.h Normal file
View File

@ -0,0 +1,65 @@
#pragma once
#include <Core/Types.h>
#include <memory>
namespace DB
{
class IBackupEntry;
using BackupEntryPtr = std::unique_ptr<IBackupEntry>;
/// Represents a backup, i.e. a storage of BackupEntries which can be accessed by their names.
/// A backup can be either incremental or non-incremental. An incremental backup doesn't store
/// the data of the entries which are not changed compared to its base backup.
class IBackup
{
public:
virtual ~IBackup() = default;
enum class OpenMode
{
READ,
WRITE,
};
/// A backup can be open either in READ or WRITE mode.
virtual OpenMode getOpenMode() const = 0;
/// Returns the path to the backup.
virtual String getPath() const = 0;
/// Returns names of entries stored in the backup.
/// If `prefix` isn't empty the function will return only the names starting with
/// the prefix (but without the prefix itself).
/// If the `terminator` isn't empty the function will returns only parts of the names
/// before the terminator. For example, list("", "") returns names of all the entries
/// in the backup; and list("data/", "/") return kind of a list of folders and
/// files stored in the "data/" directory inside the backup.
virtual Strings list(const String & prefix = "", const String & terminator = "/") const = 0;
/// Checks if an entry with a specified name exists.
virtual bool exists(const String & name) const = 0;
/// Returns the size of the entry's data.
/// This function does the same as `read(name)->getSize()` but faster.
virtual size_t getSize(const String & name) const = 0;
/// Returns the checksum of the entry's data.
/// This function does the same as `read(name)->getCheckum()` but faster.
virtual UInt128 getChecksum(const String & name) const = 0;
/// Reads an entry from the backup.
virtual BackupEntryPtr read(const String & name) const = 0;
/// Puts a new entry to the backup.
virtual void write(const String & name, BackupEntryPtr entry) = 0;
/// Finalizes writing the backup, should be called after all entries have been successfully written.
virtual void finalizeWriting() = 0;
};
using BackupPtr = std::shared_ptr<const IBackup>;
using BackupMutablePtr = std::shared_ptr<IBackup>;
}

View File

@ -0,0 +1,32 @@
#pragma once
#include <Core/Types.h>
#include <memory>
#include <optional>
#include <vector>
namespace DB
{
class ReadBuffer;
/// A backup entry represents some data which should be written to the backup or has been read from the backup.
class IBackupEntry
{
public:
virtual ~IBackupEntry() = default;
/// Returns the size of the data.
virtual UInt64 getSize() const = 0;
/// Returns the checksum of the data if it's precalculated.
/// Can return nullopt which means the checksum should be calculated from the read buffer.
virtual std::optional<UInt128> getChecksum() const { return {}; }
/// Returns a read buffer for reading the data.
virtual std::unique_ptr<ReadBuffer> getReadBuffer() const = 0;
};
using BackupEntryPtr = std::unique_ptr<IBackupEntry>;
using BackupEntries = std::vector<std::pair<String, BackupEntryPtr>>;
}

View File

@ -0,0 +1,22 @@
#include <Backups/hasCompatibleDataToRestoreTable.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/formatAST.h>
namespace DB
{
bool hasCompatibleDataToRestoreTable(const ASTCreateQuery & query1, const ASTCreateQuery & query2)
{
/// TODO: Write more subtle condition here.
auto q1 = typeid_cast<std::shared_ptr<ASTCreateQuery>>(query1.clone());
auto q2 = typeid_cast<std::shared_ptr<ASTCreateQuery>>(query2.clone());
/// Remove UUIDs.
q1->uuid = UUIDHelpers::Nil;
q2->uuid = UUIDHelpers::Nil;
return serializeAST(*q1) == serializeAST(*q2);
}
}

View File

@ -0,0 +1,11 @@
#pragma once
namespace DB
{
class ASTCreateQuery;
/// Whether the data of the first table can be inserted to the second table.
bool hasCompatibleDataToRestoreTable(const ASTCreateQuery & query1, const ASTCreateQuery & query2);
}

View File

@ -0,0 +1,276 @@
#include <Backups/renameInCreateQuery.h>
#include <Backups/BackupRenamingConfig.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Interpreters/InDepthNodeVisitor.h>
#include <Interpreters/evaluateConstantExpression.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
namespace
{
class RenameInCreateQueryTransformMatcher
{
public:
struct Data
{
BackupRenamingConfigPtr renaming_config;
ContextPtr context;
};
static bool needChildVisit(ASTPtr &, const ASTPtr &) { return true; }
static void visit(ASTPtr & ast, const Data & data)
{
if (auto * create = ast->as<ASTCreateQuery>())
visitCreateQuery(*create, data);
else if (auto * expr = ast->as<ASTTableExpression>())
visitTableExpression(*expr, data);
else if (auto * function = ast->as<ASTFunction>())
visitFunction(*function, data);
else if (auto * dictionary = ast->as<ASTDictionary>())
visitDictionary(*dictionary, data);
}
private:
/// Replaces names of tables and databases used in a CREATE query, which can be either CREATE TABLE or
/// CREATE DICTIONARY or CREATE VIEW or CREATE TEMPORARY TABLE or CREATE DATABASE query.
static void visitCreateQuery(ASTCreateQuery & create, const Data & data)
{
if (create.temporary)
{
if (create.table.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Table name specified in the CREATE TEMPORARY TABLE query must not be empty");
create.table = data.renaming_config->getNewTemporaryTableName(create.table);
}
else if (create.table.empty())
{
if (create.database.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Database name specified in the CREATE DATABASE query must not be empty");
create.database = data.renaming_config->getNewDatabaseName(create.database);
}
else
{
if (create.database.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Database name specified in the CREATE TABLE query must not be empty");
std::tie(create.database, create.table) = data.renaming_config->getNewTableName({create.database, create.table});
}
create.uuid = UUIDHelpers::Nil;
if (!create.as_table.empty() && !create.as_database.empty())
std::tie(create.as_database, create.as_table) = data.renaming_config->getNewTableName({create.as_database, create.as_table});
if (!create.to_table_id.table_name.empty() && !create.to_table_id.database_name.empty())
{
auto to_table = data.renaming_config->getNewTableName({create.to_table_id.database_name, create.to_table_id.table_name});
create.to_table_id = StorageID{to_table.first, to_table.second};
}
}
/// Replaces names of a database and a table in a expression like `db`.`table`
static void visitTableExpression(ASTTableExpression & expr, const Data & data)
{
if (!expr.database_and_table_name)
return;
ASTIdentifier * id = expr.database_and_table_name->as<ASTIdentifier>();
if (!id)
return;
auto table_id = id->createTable();
if (!table_id)
return;
const String & db_name = table_id->getDatabaseName();
const String & table_name = table_id->shortName();
if (db_name.empty() || table_name.empty())
return;
String new_db_name, new_table_name;
std::tie(new_db_name, new_table_name) = data.renaming_config->getNewTableName({db_name, table_name});
if ((new_db_name == db_name) && (new_table_name == table_name))
return;
expr.database_and_table_name = std::make_shared<ASTIdentifier>(Strings{new_db_name, new_table_name});
expr.children.push_back(expr.database_and_table_name);
}
/// Replaces names of tables and databases used in arguments of a table function or a table engine.
static void visitFunction(ASTFunction & function, const Data & data)
{
if ((function.name == "merge") || (function.name == "Merge"))
{
visitFunctionMerge(function, data);
}
else if ((function.name == "remote") || (function.name == "remoteSecure") || (function.name == "cluster") ||
(function.name == "clusterAllReplicas") || (function.name == "Distributed"))
{
visitFunctionRemote(function, data);
}
}
/// Replaces a database's name passed via an argument of the function merge() or the table engine Merge.
static void visitFunctionMerge(ASTFunction & function, const Data & data)
{
if (!function.arguments)
return;
/// The first argument is a database's name and we can rename it.
/// The second argument is a regular expression and we can do nothing about it.
auto & args = function.arguments->as<ASTExpressionList &>().children;
size_t db_name_arg_index = 0;
if (args.size() <= db_name_arg_index)
return;
String db_name = evaluateConstantExpressionForDatabaseName(args[db_name_arg_index], data.context)->as<ASTLiteral &>().value.safeGet<String>();
if (db_name.empty())
return;
String new_db_name = data.renaming_config->getNewDatabaseName(db_name);
if (new_db_name == db_name)
return;
args[db_name_arg_index] = std::make_shared<ASTLiteral>(new_db_name);
}
/// Replaces names of a table and a database passed via arguments of the function remote() or cluster() or the table engine Distributed.
static void visitFunctionRemote(ASTFunction & function, const Data & data)
{
if (!function.arguments)
return;
/// The first argument is an address or cluster's name, so we skip it.
/// The second argument can be either 'db.name' or just 'db' followed by the third argument 'table'.
auto & args = function.arguments->as<ASTExpressionList &>().children;
const auto * second_arg_as_function = args[1]->as<ASTFunction>();
if (second_arg_as_function && TableFunctionFactory::instance().isTableFunctionName(second_arg_as_function->name))
return;
size_t db_name_index = 1;
if (args.size() <= db_name_index)
return;
String db_name = evaluateConstantExpressionForDatabaseName(args[db_name_index], data.context)->as<ASTLiteral &>().value.safeGet<String>();
String table_name;
size_t table_name_index = static_cast<size_t>(-1);
size_t dot = String::npos;
if (function.name != "Distributed")
dot = db_name.find('.');
if (dot != String::npos)
{
table_name = db_name.substr(dot + 1);
db_name.resize(dot);
}
else
{
table_name_index = 2;
if (args.size() <= table_name_index)
return;
table_name = evaluateConstantExpressionForDatabaseName(args[table_name_index], data.context)->as<ASTLiteral &>().value.safeGet<String>();
}
if (db_name.empty() || table_name.empty())
return;
String new_db_name, new_table_name;
std::tie(new_db_name, new_table_name) = data.renaming_config->getNewTableName({db_name, table_name});
if ((new_db_name == db_name) && (new_table_name == table_name))
return;
if (table_name_index != static_cast<size_t>(-1))
{
if (new_db_name != db_name)
args[db_name_index] = std::make_shared<ASTLiteral>(new_db_name);
if (new_table_name != table_name)
args[table_name_index] = std::make_shared<ASTLiteral>(new_table_name);
}
else
{
args[db_name_index] = std::make_shared<ASTLiteral>(new_db_name);
args.insert(args.begin() + db_name_index + 1, std::make_shared<ASTLiteral>(new_table_name));
}
}
/// Replaces names of a table and a database used in source parameters of a dictionary.
static void visitDictionary(ASTDictionary & dictionary, const Data & data)
{
if (!dictionary.source || dictionary.source->name != "clickhouse" || !dictionary.source->elements)
return;
auto & elements = dictionary.source->elements->as<ASTExpressionList &>().children;
String db_name, table_name;
size_t db_name_index = static_cast<size_t>(-1);
size_t table_name_index = static_cast<size_t>(-1);
for (size_t i = 0; i != elements.size(); ++i)
{
auto & pair = elements[i]->as<ASTPair &>();
if (pair.first == "db")
{
if (db_name_index != static_cast<size_t>(-1))
return;
db_name = pair.second->as<ASTLiteral &>().value.safeGet<String>();
db_name_index = i;
}
else if (pair.first == "table")
{
if (table_name_index != static_cast<size_t>(-1))
return;
table_name = pair.second->as<ASTLiteral &>().value.safeGet<String>();
table_name_index = i;
}
}
if (db_name.empty() || table_name.empty())
return;
String new_db_name, new_table_name;
std::tie(new_db_name, new_table_name) = data.renaming_config->getNewTableName({db_name, table_name});
if ((new_db_name == db_name) && (new_table_name == table_name))
return;
if (new_db_name != db_name)
{
auto & pair = elements[db_name_index]->as<ASTPair &>();
pair.replace(pair.second, std::make_shared<ASTLiteral>(new_db_name));
}
if (new_table_name != table_name)
{
auto & pair = elements[table_name_index]->as<ASTPair &>();
pair.replace(pair.second, std::make_shared<ASTLiteral>(new_table_name));
}
}
};
using RenameInCreateQueryTransformVisitor = InDepthNodeVisitor<RenameInCreateQueryTransformMatcher, false>;
}
ASTPtr renameInCreateQuery(const ASTPtr & ast, const BackupRenamingConfigPtr & renaming_config, const ContextPtr & context)
{
auto new_ast = ast->clone();
try
{
RenameInCreateQueryTransformVisitor::Data data{renaming_config, context};
RenameInCreateQueryTransformVisitor{data}.visit(new_ast);
return new_ast;
}
catch (...)
{
tryLogCurrentException("Backup", "Error while renaming in AST");
return ast;
}
}
}

View File

@ -0,0 +1,16 @@
#pragma once
#include <memory>
namespace DB
{
class IAST;
using ASTPtr = std::shared_ptr<IAST>;
class Context;
using ContextPtr = std::shared_ptr<const Context>;
class BackupRenamingConfig;
using BackupRenamingConfigPtr = std::shared_ptr<const BackupRenamingConfig>;
/// Changes names in AST according to the renaming settings.
ASTPtr renameInCreateQuery(const ASTPtr & ast, const BackupRenamingConfigPtr & renaming_config, const ContextPtr & context);
}

View File

@ -45,6 +45,7 @@ if (COMPILER_GCC)
endif ()
add_subdirectory (Access)
add_subdirectory (Backups)
add_subdirectory (Columns)
add_subdirectory (Common)
add_subdirectory (Core)
@ -180,6 +181,7 @@ macro(add_object_library name common_path)
endmacro()
add_object_library(clickhouse_access Access)
add_object_library(clickhouse_backups Backups)
add_object_library(clickhouse_core Core)
add_object_library(clickhouse_core_mysql Core/MySQL)
add_object_library(clickhouse_compression Compression)

View File

@ -566,6 +566,17 @@
M(595, BZIP2_STREAM_ENCODER_FAILED) \
M(596, INTERSECT_OR_EXCEPT_RESULT_STRUCTURES_MISMATCH) \
M(597, NO_SUCH_ERROR_CODE) \
M(598, BACKUP_ALREADY_EXISTS) \
M(599, BACKUP_NOT_FOUND) \
M(600, BACKUP_VERSION_NOT_SUPPORTED) \
M(601, BACKUP_DAMAGED) \
M(602, NO_BASE_BACKUP) \
M(603, WRONG_BASE_BACKUP) \
M(604, BACKUP_ENTRY_ALREADY_EXISTS) \
M(605, BACKUP_ENTRY_NOT_FOUND) \
M(606, BACKUP_IS_EMPTY) \
M(607, BACKUP_ELEMENT_DUPLICATE) \
M(608, CANNOT_RESTORE_TABLE) \
\
M(998, POSTGRESQL_CONNECTION_FAILURE) \
M(999, KEEPER_EXCEPTION) \

View File

@ -276,14 +276,30 @@ inline void trimLeft(std::string_view & str, char c = ' ')
str.remove_prefix(1);
}
inline void trimLeft(std::string & str, char c = ' ')
{
str.erase(0, str.find_first_not_of(c));
}
inline void trimRight(std::string_view & str, char c = ' ')
{
while (str.ends_with(c))
str.remove_suffix(1);
}
inline void trimRight(std::string & str, char c = ' ')
{
str.erase(str.find_last_not_of(c) + 1);
}
inline void trim(std::string_view & str, char c = ' ')
{
trimLeft(str, c);
trimRight(str, c);
}
inline void trim(std::string & str, char c = ' ')
{
trimRight(str, c);
trimLeft(str, c);
}

View File

@ -354,6 +354,8 @@ class IColumn;
M(UInt64, max_network_bandwidth_for_user, 0, "The maximum speed of data exchange over the network in bytes per second for all concurrently running user queries. Zero means unlimited.", 0)\
M(UInt64, max_network_bandwidth_for_all_users, 0, "The maximum speed of data exchange over the network in bytes per second for all concurrently running queries. Zero means unlimited.", 0) \
\
M(UInt64, max_backup_threads, 0, "The maximum number of threads to execute a BACKUP or RESTORE request. By default, it is determined automatically.", 0) \
\
M(Bool, log_profile_events, true, "Log query performance statistics into the query_log, query_thread_log and query_views_log.", 0) \
M(Bool, log_query_settings, true, "Log query settings into the query_log.", 0) \
M(Bool, log_query_threads, true, "Log query threads into system.query_thread_log table. This setting have effect only when 'log_queries' is true.", 0) \

View File

@ -403,7 +403,7 @@ void DatabaseAtomic::assertCanBeDetached(bool cleanup)
}
DatabaseTablesIteratorPtr
DatabaseAtomic::getTablesIterator(ContextPtr local_context, const IDatabase::FilterByNameFunction & filter_by_table_name)
DatabaseAtomic::getTablesIterator(ContextPtr local_context, const IDatabase::FilterByNameFunction & filter_by_table_name) const
{
auto base_iter = DatabaseWithOwnTablesBase::getTablesIterator(local_context, filter_by_table_name);
return std::make_unique<AtomicDatabaseTablesSnapshotIterator>(std::move(typeid_cast<DatabaseTablesSnapshotIterator &>(*base_iter)));

View File

@ -45,7 +45,7 @@ public:
void drop(ContextPtr /*context*/) override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;
void loadStoredObjects(ContextMutablePtr context, bool has_force_restore_data_flag, bool force_attach) override;

View File

@ -52,7 +52,7 @@ DatabaseDictionary::DatabaseDictionary(const String & name_, ContextPtr context_
{
}
Tables DatabaseDictionary::listTables(const FilterByNameFunction & filter_by_name)
Tables DatabaseDictionary::listTables(const FilterByNameFunction & filter_by_name) const
{
Tables tables;
auto load_results = getContext()->getExternalDictionariesLoader().getLoadResults(filter_by_name);
@ -77,7 +77,7 @@ StoragePtr DatabaseDictionary::tryGetTable(const String & table_name, ContextPtr
return createStorageDictionary(getDatabaseName(), load_result, getContext());
}
DatabaseTablesIteratorPtr DatabaseDictionary::getTablesIterator(ContextPtr, const FilterByNameFunction & filter_by_table_name)
DatabaseTablesIteratorPtr DatabaseDictionary::getTablesIterator(ContextPtr, const FilterByNameFunction & filter_by_table_name) const
{
return std::make_unique<DatabaseTablesSnapshotIterator>(listTables(filter_by_table_name), getDatabaseName());
}

View File

@ -34,7 +34,7 @@ public:
StoragePtr tryGetTable(const String & table_name, ContextPtr context) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;
bool empty() const override;
@ -50,7 +50,7 @@ protected:
private:
Poco::Logger * log;
Tables listTables(const FilterByNameFunction & filter_by_name);
Tables listTables(const FilterByNameFunction & filter_by_name) const;
};
}

View File

@ -143,7 +143,7 @@ StoragePtr DatabaseLazy::tryGetTable(const String & table_name) const
return loadTable(table_name);
}
DatabaseTablesIteratorPtr DatabaseLazy::getTablesIterator(ContextPtr, const FilterByNameFunction & filter_by_table_name)
DatabaseTablesIteratorPtr DatabaseLazy::getTablesIterator(ContextPtr, const FilterByNameFunction & filter_by_table_name) const
{
std::lock_guard lock(mutex);
Strings filtered_tables;
@ -304,7 +304,7 @@ void DatabaseLazy::clearExpiredTables() const
}
DatabaseLazyIterator::DatabaseLazyIterator(DatabaseLazy & database_, Strings && table_names_)
DatabaseLazyIterator::DatabaseLazyIterator(const DatabaseLazy & database_, Strings && table_names_)
: IDatabaseTablesIterator(database_.database_name)
, database(database_)
, table_names(std::move(table_names_))

View File

@ -64,7 +64,7 @@ public:
bool empty() const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;
void attachTable(const String & table_name, const StoragePtr & table, const String & relative_table_path) override;
@ -119,7 +119,7 @@ class DatabaseLazyIterator final : public IDatabaseTablesIterator
{
public:
DatabaseLazyIterator(
DatabaseLazy & database_,
const DatabaseLazy & database_,
Strings && table_names_);
void next() override;

View File

@ -11,7 +11,6 @@ namespace DB
{
class Context;
std::pair<String, StoragePtr> createTableFromAST(
ASTCreateQuery ast_create_query,
const String & database_name,

View File

@ -1,6 +1,7 @@
#include <Databases/DatabasesCommon.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/formatAST.h>
#include <Storages/StorageDictionary.h>
@ -40,7 +41,7 @@ StoragePtr DatabaseWithOwnTablesBase::tryGetTable(const String & table_name, Con
return {};
}
DatabaseTablesIteratorPtr DatabaseWithOwnTablesBase::getTablesIterator(ContextPtr, const FilterByNameFunction & filter_by_table_name)
DatabaseTablesIteratorPtr DatabaseWithOwnTablesBase::getTablesIterator(ContextPtr, const FilterByNameFunction & filter_by_table_name) const
{
std::lock_guard lock(mutex);
if (!filter_by_table_name)

View File

@ -29,7 +29,7 @@ public:
StoragePtr detachTable(const String & table_name) override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;
void shutdown() override;

View File

@ -137,7 +137,7 @@ public:
/// Get an iterator that allows you to pass through all the tables.
/// It is possible to have "hidden" tables that are not visible when passing through, but are visible if you get them by name using the functions above.
virtual DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name = {}) = 0;
virtual DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name = {}) const = 0;
/// Is the database empty.
virtual bool empty() const = 0;
@ -240,6 +240,12 @@ public:
throw Exception(getEngineName() + ": RENAME DATABASE is not supported", ErrorCodes::NOT_IMPLEMENTED);
}
/// Whether the contained tables should be written to a backup.
virtual DatabaseTablesIteratorPtr getTablesIteratorForBackup(ContextPtr context) const
{
return getTablesIterator(context); /// By default we backup each table.
}
/// Returns path for persistent data storage if the database supports it, empty string otherwise
virtual String getDataPath() const { return {}; }

View File

@ -186,7 +186,7 @@ StoragePtr DatabaseMaterializedMySQL<Base>::tryGetTable(const String & name, Con
template <typename Base>
DatabaseTablesIteratorPtr
DatabaseMaterializedMySQL<Base>::getTablesIterator(ContextPtr context_, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name)
DatabaseMaterializedMySQL<Base>::getTablesIterator(ContextPtr context_, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const
{
if (!MaterializedMySQLSyncThread::isMySQLSyncThread())
{

View File

@ -61,7 +61,7 @@ public:
StoragePtr tryGetTable(const String & name, ContextPtr context_) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context_, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context_, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const override;
void assertCalledFromSyncThreadOrDrop(const char * method) const;

View File

@ -84,7 +84,7 @@ bool DatabaseMySQL::empty() const
return true;
}
DatabaseTablesIteratorPtr DatabaseMySQL::getTablesIterator(ContextPtr local_context, const FilterByNameFunction & filter_by_table_name)
DatabaseTablesIteratorPtr DatabaseMySQL::getTablesIterator(ContextPtr local_context, const FilterByNameFunction & filter_by_table_name) const
{
Tables tables;
std::lock_guard<std::mutex> lock(mutex);

View File

@ -57,7 +57,7 @@ public:
bool empty() const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;
ASTPtr getCreateDatabaseQuery() const override;

View File

@ -203,7 +203,7 @@ void DatabaseMaterializedPostgreSQL::drop(ContextPtr local_context)
DatabaseTablesIteratorPtr DatabaseMaterializedPostgreSQL::getTablesIterator(
ContextPtr local_context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name)
ContextPtr local_context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const
{
/// Modify context into nested_context and pass query to Atomic database.
return DatabaseAtomic::getTablesIterator(StorageMaterializedPostgreSQL::makeNestedTableContext(local_context), filter_by_table_name);

View File

@ -46,7 +46,7 @@ public:
void loadStoredObjects(ContextMutablePtr, bool, bool force_attach) override;
DatabaseTablesIteratorPtr getTablesIterator(
ContextPtr context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) override;
ContextPtr context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const override;
StoragePtr tryGetTable(const String & name, ContextPtr context) const override;

View File

@ -88,7 +88,7 @@ bool DatabasePostgreSQL::empty() const
}
DatabaseTablesIteratorPtr DatabasePostgreSQL::getTablesIterator(ContextPtr local_context, const FilterByNameFunction & /* filter_by_table_name */)
DatabaseTablesIteratorPtr DatabasePostgreSQL::getTablesIterator(ContextPtr local_context, const FilterByNameFunction & /* filter_by_table_name */) const
{
std::lock_guard<std::mutex> lock(mutex);

View File

@ -50,7 +50,7 @@ public:
void loadStoredObjects(ContextMutablePtr, bool, bool force_attach) override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;
bool isTableExist(const String & name, ContextPtr context) const override;
StoragePtr tryGetTable(const String & name, ContextPtr context) const override;

View File

@ -44,7 +44,7 @@ bool DatabaseSQLite::empty() const
}
DatabaseTablesIteratorPtr DatabaseSQLite::getTablesIterator(ContextPtr local_context, const IDatabase::FilterByNameFunction &)
DatabaseTablesIteratorPtr DatabaseSQLite::getTablesIterator(ContextPtr local_context, const IDatabase::FilterByNameFunction &) const
{
std::lock_guard<std::mutex> lock(mutex);

View File

@ -34,7 +34,7 @@ public:
StoragePtr tryGetTable(const String & name, ContextPtr context) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;
bool empty() const override;

View File

@ -0,0 +1,27 @@
#include <Disks/TemporaryFileOnDisk.h>
#include <Disks/IDisk.h>
#include <Poco/TemporaryFile.h>
namespace DB
{
TemporaryFileOnDisk::TemporaryFileOnDisk(const DiskPtr & disk_, const String & prefix_)
: disk(disk_)
{
String dummy_prefix = "a/";
filepath = Poco::TemporaryFile::tempName(dummy_prefix);
dummy_prefix += "tmp";
assert(filepath.starts_with(dummy_prefix));
filepath.replace(0, dummy_prefix.length(), prefix_);
}
TemporaryFileOnDisk::~TemporaryFileOnDisk()
{
#if 1
if (disk && !filepath.empty())
disk->removeRecursive(filepath);
#endif
}
}

View File

@ -0,0 +1,29 @@
#pragma once
#include <Core/Types.h>
#include <memory>
namespace DB
{
class IDisk;
using DiskPtr = std::shared_ptr<IDisk>;
/// This class helps with the handling of temporary files or directories.
/// A unique name for the temporary file or directory is automatically chosen based on a specified prefix.
/// Optionally can create a directory in the constructor.
/// The destructor always removes the temporary file or directory with all contained files.
class TemporaryFileOnDisk
{
public:
TemporaryFileOnDisk(const DiskPtr & disk_, const String & prefix_ = "tmp");
~TemporaryFileOnDisk();
DiskPtr getDisk() const { return disk; }
const String & getPath() const { return filepath; }
private:
DiskPtr disk;
String filepath;
};
}

View File

@ -17,6 +17,7 @@ public:
protected:
ReadBuffers buffers;
bool own_buffers = false;
ReadBuffers::iterator current;
bool nextImpl() override
@ -61,7 +62,34 @@ public:
assert(!buffers.empty());
}
ConcatReadBuffer(ReadBuffer & buf1, ReadBuffer & buf2) : ConcatReadBuffer({&buf1, &buf2}) {}
ConcatReadBuffer(ReadBuffer & buf1, ReadBuffer & buf2) : ConcatReadBuffer(ReadBuffers{&buf1, &buf2}) {}
ConcatReadBuffer(std::vector<std::unique_ptr<ReadBuffer>> buffers_) : ReadBuffer(nullptr, 0)
{
own_buffers = true;
buffers.reserve(buffers_.size());
for (auto & buffer : buffers_)
buffers.emplace_back(buffer.release());
current = buffers.begin();
}
ConcatReadBuffer(std::unique_ptr<ReadBuffer> buf1, std::unique_ptr<ReadBuffer> buf2) : ReadBuffer(nullptr, 0)
{
own_buffers = true;
buffers.reserve(2);
buffers.emplace_back(buf1.release());
buffers.emplace_back(buf2.release());
current = buffers.begin();
}
~ConcatReadBuffer() override
{
if (own_buffers)
{
for (auto * buffer : buffers)
delete buffer;
}
}
};
}

View File

@ -46,6 +46,7 @@
#include <Access/SettingsConstraintsAndProfileIDs.h>
#include <Access/ExternalAuthenticators.h>
#include <Access/GSSAcceptor.h>
#include <Backups/BackupFactory.h>
#include <Dictionaries/Embedded/GeoDictionariesLoader.h>
#include <Interpreters/EmbeddedDictionaries.h>
#include <Interpreters/ExternalDictionariesLoader.h>
@ -165,6 +166,8 @@ struct ContextSharedPart
String tmp_path; /// Path to the temporary files that occur when processing the request.
mutable VolumePtr tmp_volume; /// Volume for the the temporary files that occur when processing the request.
mutable VolumePtr backups_volume; /// Volume for all the backups.
mutable std::optional<EmbeddedDictionaries> embedded_dictionaries; /// Metrica's dictionaries. Have lazy initialization.
mutable std::optional<ExternalDictionariesLoader> external_dictionaries_loader;
mutable std::optional<ExternalModelsLoader> external_models_loader;
@ -520,6 +523,35 @@ VolumePtr Context::setTemporaryStorage(const String & path, const String & polic
return shared->tmp_volume;
}
void Context::setBackupsVolume(const String & path, const String & policy_name)
{
std::lock_guard lock(shared->storage_policies_mutex);
if (policy_name.empty())
{
String path_with_separator = path;
if (!path_with_separator.ends_with('/'))
path_with_separator += '/';
auto disk = std::make_shared<DiskLocal>("_backups_default", path_with_separator, 0);
shared->backups_volume = std::make_shared<SingleDiskVolume>("_backups_default", disk, 0);
}
else
{
StoragePolicyPtr policy = getStoragePolicySelector(lock)->get(policy_name);
if (policy->getVolumes().size() != 1)
throw Exception("Policy " + policy_name + " is used for backups, such policy should have exactly one volume",
ErrorCodes::NO_ELEMENTS_IN_CONFIG);
shared->backups_volume = policy->getVolume(0);
}
BackupFactory::instance().setBackupsVolume(shared->backups_volume);
}
VolumePtr Context::getBackupsVolume() const
{
std::lock_guard lock(shared->storage_policies_mutex);
return shared->backups_volume;
}
void Context::setFlagsPath(const String & path)
{
auto lock = getLock();

View File

@ -340,6 +340,9 @@ public:
VolumePtr setTemporaryStorage(const String & path, const String & policy_name = "");
void setBackupsVolume(const String & path, const String & policy_name = "");
VolumePtr getBackupsVolume() const;
using ConfigurationPtr = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;
/// Global application configuration settings.

View File

@ -0,0 +1,64 @@
#include <Interpreters/InterpreterBackupQuery.h>
#include <Backups/BackupFactory.h>
#include <Backups/BackupSettings.h>
#include <Backups/BackupUtils.h>
#include <Backups/IBackup.h>
#include <Backups/IBackupEntry.h>
#include <Parsers/ASTSetQuery.h>
#include <Interpreters/Context.h>
namespace DB
{
namespace
{
BackupSettings getBackupSettings(const ASTBackupQuery & query)
{
BackupSettings settings;
if (query.settings)
settings.applyChanges(query.settings->as<const ASTSetQuery &>().changes);
return settings;
}
BackupPtr getBaseBackup(const BackupSettings & settings)
{
const String & base_backup_name = settings.base_backup;
if (base_backup_name.empty())
return nullptr;
return BackupFactory::instance().openBackup(base_backup_name);
}
void executeBackup(const ASTBackupQuery & query, const ContextPtr & context)
{
auto settings = getBackupSettings(query);
auto base_backup = getBaseBackup(settings);
auto backup_entries = makeBackupEntries(query.elements, context);
UInt64 estimated_backup_size = estimateBackupSize(backup_entries, base_backup);
auto backup = BackupFactory::instance().createBackup(query.backup_name, estimated_backup_size, base_backup);
writeBackupEntries(backup, std::move(backup_entries), context->getSettingsRef().max_backup_threads);
}
void executeRestore(const ASTBackupQuery & query, ContextMutablePtr context)
{
auto settings = getBackupSettings(query);
auto base_backup = getBaseBackup(settings);
auto backup = BackupFactory::instance().openBackup(query.backup_name, base_backup);
auto restore_tasks = makeRestoreTasks(query.elements, context, backup);
executeRestoreTasks(std::move(restore_tasks), context->getSettingsRef().max_backup_threads);
}
}
BlockIO InterpreterBackupQuery::execute()
{
const auto & query = query_ptr->as<const ASTBackupQuery &>();
if (query.kind == ASTBackupQuery::BACKUP)
executeBackup(query, context);
else if (query.kind == ASTBackupQuery::RESTORE)
executeRestore(query, context);
return {};
}
}

View File

@ -0,0 +1,20 @@
#pragma once
#include <Interpreters/IInterpreter.h>
#include <Parsers/IAST_fwd.h>
namespace DB
{
class InterpreterBackupQuery : public IInterpreter
{
public:
InterpreterBackupQuery(const ASTPtr & query_ptr_, ContextMutablePtr context_) : query_ptr(query_ptr_), context(context_) {}
BlockIO execute() override;
private:
ASTPtr query_ptr;
ContextMutablePtr context;
};
}

View File

@ -1,4 +1,5 @@
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ASTBackupQuery.h>
#include <Parsers/ASTCheckQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTCreateQuotaQuery.h>
@ -33,6 +34,7 @@
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterAlterQuery.h>
#include <Interpreters/InterpreterBackupQuery.h>
#include <Interpreters/InterpreterCheckQuery.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/InterpreterCreateQuotaQuery.h>
@ -270,6 +272,10 @@ std::unique_ptr<IInterpreter> InterpreterFactory::get(ASTPtr & query, ContextMut
{
return std::make_unique<InterpreterExternalDDLQuery>(query, context);
}
else if (query->as<ASTBackupQuery>())
{
return std::make_unique<InterpreterBackupQuery>(query, context);
}
else
{
throw Exception("Unknown type of query: " + query->getID(), ErrorCodes::UNKNOWN_TYPE_OF_QUERY);

View File

@ -0,0 +1,130 @@
#include <Parsers/ASTBackupQuery.h>
#include <IO/Operators.h>
#include <Common/quoteString.h>
namespace DB
{
namespace
{
using Kind = ASTBackupQuery::Kind;
using Element = ASTBackupQuery::Element;
using ElementType = ASTBackupQuery::ElementType;
void formatName(const DatabaseAndTableName & name, ElementType type, const IAST::FormatSettings & format)
{
switch (type)
{
case ElementType::TABLE: [[fallthrough]];
case ElementType::DICTIONARY:
{
format.ostr << " ";
if (!name.first.empty())
format.ostr << backQuoteIfNeed(name.first) << ".";
format.ostr << backQuoteIfNeed(name.second);
break;
}
case ElementType::DATABASE:
{
format.ostr << " " << backQuoteIfNeed(name.first);
break;
}
case ElementType::TEMPORARY_TABLE:
{
format.ostr << " " << backQuoteIfNeed(name.second);
break;
}
default:
break;
}
}
void formatPartitions(const ASTs & partitions, const IAST::FormatSettings & format)
{
if (partitions.empty())
return;
format.ostr << (format.hilite ? IAST::hilite_keyword : "") << " " << ((partitions.size() == 1) ? "PARTITION" : "PARTITIONS") << " "
<< (format.hilite ? IAST::hilite_none : "");
bool need_comma = false;
for (const auto & partition : partitions)
{
if (std::exchange(need_comma, true))
format.ostr << ",";
format.ostr << " ";
partition->format(format);
}
}
void formatElement(const Element & element, Kind kind, const IAST::FormatSettings & format)
{
format.ostr << (format.hilite ? IAST::hilite_keyword : "") << " ";
switch (element.type)
{
case ElementType::TABLE: format.ostr << "TABLE"; break;
case ElementType::DICTIONARY: format.ostr << "DICTIONARY"; break;
case ElementType::DATABASE: format.ostr << "DATABASE"; break;
case ElementType::ALL_DATABASES: format.ostr << "ALL DATABASES"; break;
case ElementType::TEMPORARY_TABLE: format.ostr << "TEMPORARY TABLE"; break;
case ElementType::ALL_TEMPORARY_TABLES: format.ostr << "ALL TEMPORARY TABLES"; break;
case ElementType::EVERYTHING: format.ostr << "EVERYTHING"; break;
}
format.ostr << (format.hilite ? IAST::hilite_none : "");
formatName(element.name, element.type, format);
bool under_another_name = !element.new_name.first.empty() || !element.new_name.second.empty();
if (under_another_name)
{
format.ostr << (format.hilite ? IAST::hilite_keyword : "") << " " << ((kind == Kind::BACKUP) ? "AS" : "INTO")
<< (format.hilite ? IAST::hilite_none : "");
formatName(element.new_name, element.type, format);
}
formatPartitions(element.partitions, format);
}
void formatElements(const std::vector<Element> & elements, Kind kind, const IAST::FormatSettings & format)
{
bool need_comma = false;
for (const auto & element : elements)
{
if (std::exchange(need_comma, true))
format.ostr << ",";
formatElement(element, kind, format);
}
}
void formatSettings(const IAST & settings, const IAST::FormatSettings & format)
{
format.ostr << (format.hilite ? IAST::hilite_keyword : "") << " SETTINGS " << (format.hilite ? IAST::hilite_none : "");
settings.format(format);
}
}
String ASTBackupQuery::getID(char) const
{
return (kind == Kind::BACKUP) ? "BackupQuery" : "RestoreQuery";
}
ASTPtr ASTBackupQuery::clone() const
{
return std::make_shared<ASTBackupQuery>(*this);
}
void ASTBackupQuery::formatImpl(const FormatSettings & format, FormatState &, FormatStateStacked) const
{
format.ostr << (format.hilite ? hilite_keyword : "") << ((kind == Kind::BACKUP) ? "BACKUP" : "RESTORE")
<< (format.hilite ? hilite_none : "");
formatElements(elements, kind, format);
if (settings)
formatSettings(*settings, format);
format.ostr << (format.hilite ? hilite_keyword : "") << ((kind == Kind::BACKUP) ? " TO" : " FROM") << (format.hilite ? hilite_none : "");
format.ostr << " " << quoteString(backup_name);
}
}

View File

@ -0,0 +1,87 @@
#pragma once
#include <Parsers/IAST.h>
namespace DB
{
using Strings = std::vector<String>;
using DatabaseAndTableName = std::pair<String, String>;
/** BACKUP { TABLE [db.]table_name [AS [db.]table_name_in_backup] [PARTITION[S] partition_expr [,...]] |
* DICTIONARY [db.]dictionary_name [AS [db.]dictionary_name_in_backup] |
* DATABASE database_name [AS database_name_in_backup] |
* ALL DATABASES |
* TEMPORARY TABLE table_name [AS table_name_in_backup]
* ALL TEMPORARY TABLES |
* EVERYTHING } [,...]
* TO 'backup_name'
* SETTINGS base_backup='base_backup_name'
*
* RESTORE { TABLE [db.]table_name_in_backup [INTO [db.]table_name] [PARTITION[S] partition_expr [,...]] |
* DICTIONARY [db.]dictionary_name_in_backup [INTO [db.]dictionary_name] |
* DATABASE database_name_in_backup [INTO database_name] |
* ALL DATABASES |
* TEMPORARY TABLE table_name_in_backup [INTO table_name] |
* ALL TEMPORARY TABLES |
* EVERYTHING } [,...]
* FROM 'backup_name'
*
* Notes:
* RESTORE doesn't drop any data, it either creates a table or appends an existing table with restored data.
* This behaviour can cause data duplication.
* If appending isn't possible because the existing table has incompatible format then RESTORE will throw an exception.
*
* The "UNDER NAME" clause is useful to backup or restore under another name.
* For the BACKUP command this clause allows to set the name which an object will have inside the backup.
* And for the RESTORE command this clause allows to set the name which an object will have after RESTORE has finished.
*
* "ALL DATABASES" means all databases except the system database and the internal database containing temporary tables.
* "EVERYTHING" works exactly as "ALL DATABASES, ALL TEMPORARY TABLES"
*
* The "WITH BASE" clause allows to set a base backup. Only differences made after the base backup will be
* included in a newly created backup, so this option allows to make an incremental backup.
*/
class ASTBackupQuery : public IAST
{
public:
enum Kind
{
BACKUP,
RESTORE,
};
Kind kind = Kind::BACKUP;
enum ElementType
{
TABLE,
DICTIONARY,
DATABASE,
ALL_DATABASES,
TEMPORARY_TABLE,
ALL_TEMPORARY_TABLES,
EVERYTHING,
};
struct Element
{
ElementType type;
DatabaseAndTableName name;
DatabaseAndTableName new_name;
ASTs partitions;
std::set<String> except_list;
};
using Elements = std::vector<Element>;
Elements elements;
String backup_name;
ASTPtr settings;
String getID(char) const override;
ASTPtr clone() const override;
void formatImpl(const FormatSettings & format, FormatState &, FormatStateStacked) const override;
};
}

View File

@ -0,0 +1,204 @@
#include <Parsers/ParserBackupQuery.h>
#include <Parsers/ASTBackupQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/CommonParsers.h>
#include <Parsers/ExpressionElementParsers.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/ParserPartition.h>
#include <Parsers/ParserSetQuery.h>
#include <Parsers/parseDatabaseAndTableName.h>
namespace DB
{
namespace
{
using Kind = ASTBackupQuery::Kind;
using Element = ASTBackupQuery::Element;
using ElementType = ASTBackupQuery::ElementType;
bool parseName(IParser::Pos & pos, Expected & expected, ElementType type, DatabaseAndTableName & name)
{
switch (type)
{
case ElementType::TABLE: [[fallthrough]];
case ElementType::DICTIONARY:
{
return parseDatabaseAndTableName(pos, expected, name.first, name.second);
}
case ElementType::DATABASE:
{
ASTPtr ast;
if (!ParserIdentifier{}.parse(pos, ast, expected))
return false;
name.first = getIdentifierName(ast);
name.second.clear();
return true;
}
case ElementType::TEMPORARY_TABLE:
{
ASTPtr ast;
if (!ParserIdentifier{}.parse(pos, ast, expected))
return false;
name.second = getIdentifierName(ast);
name.first.clear();
return true;
}
default:
return true;
}
}
bool parsePartitions(IParser::Pos & pos, Expected & expected, ASTs & partitions)
{
if (!ParserKeyword{"PARTITION"}.ignore(pos, expected) && !ParserKeyword{"PARTITIONS"}.ignore(pos, expected))
return false;
ASTs result;
auto parse_list_element = [&]
{
ASTPtr ast;
if (!ParserPartition{}.parse(pos, ast, expected))
return false;
result.emplace_back(ast);
return true;
};
if (!ParserList::parseUtil(pos, expected, parse_list_element, false))
return false;
partitions = std::move(result);
return true;
}
bool parseElement(IParser::Pos & pos, Expected & expected, Element & entry)
{
return IParserBase::wrapParseImpl(pos, [&]
{
ElementType type;
if (ParserKeyword{"TABLE"}.ignore(pos, expected))
type = ElementType::TABLE;
else if (ParserKeyword{"DICTIONARY"}.ignore(pos, expected))
type = ElementType::DICTIONARY;
else if (ParserKeyword{"DATABASE"}.ignore(pos, expected))
type = ElementType::DATABASE;
else if (ParserKeyword{"ALL DATABASES"}.ignore(pos, expected))
type = ElementType::ALL_DATABASES;
else if (ParserKeyword{"TEMPORARY TABLE"}.ignore(pos, expected))
type = ElementType::TEMPORARY_TABLE;
else if (ParserKeyword{"ALL TEMPORARY TABLES"}.ignore(pos, expected))
type = ElementType::ALL_TEMPORARY_TABLES;
else if (ParserKeyword{"EVERYTHING"}.ignore(pos, expected))
type = ElementType::EVERYTHING;
else
return false;
DatabaseAndTableName name;
if (!parseName(pos, expected, type, name))
return false;
ASTs partitions;
if (type == ElementType::TABLE)
parsePartitions(pos, expected, partitions);
DatabaseAndTableName new_name;
if (ParserKeyword{"AS"}.ignore(pos, expected) || ParserKeyword{"INTO"}.ignore(pos, expected))
{
if (!parseName(pos, expected, type, new_name))
return false;
}
if ((type == ElementType::TABLE) && partitions.empty())
parsePartitions(pos, expected, partitions);
entry.type = type;
entry.name = std::move(name);
entry.new_name = std::move(new_name);
entry.partitions = std::move(partitions);
return true;
});
}
bool parseElements(IParser::Pos & pos, Expected & expected, std::vector<Element> & elements)
{
return IParserBase::wrapParseImpl(pos, [&]
{
std::vector<Element> result;
auto parse_element = [&]
{
Element element;
if (parseElement(pos, expected, element))
{
result.emplace_back(std::move(element));
return true;
}
return false;
};
if (!ParserList::parseUtil(pos, expected, parse_element, false))
return false;
elements = std::move(result);
return true;
});
}
bool parseSettings(IParser::Pos & pos, Expected & expected, ASTPtr & settings)
{
return IParserBase::wrapParseImpl(pos, [&]
{
if (!ParserKeyword{"SETTINGS"}.ignore(pos, expected))
return false;
ASTPtr result;
if (!ParserSetQuery{true}.parse(pos, result, expected))
return false;
settings = std::move(result);
return true;
});
}
}
bool ParserBackupQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
Kind kind;
if (ParserKeyword{"BACKUP"}.ignore(pos, expected))
kind = Kind::BACKUP;
else if (ParserKeyword{"RESTORE"}.ignore(pos, expected))
kind = Kind::RESTORE;
else
return false;
std::vector<Element> elements;
if (!parseElements(pos, expected, elements))
return false;
if (!ParserKeyword{(kind == Kind::BACKUP) ? "TO" : "FROM"}.ignore(pos, expected))
return false;
ASTPtr ast;
if (!ParserStringLiteral{}.parse(pos, ast, expected))
return false;
String backup_name = ast->as<ASTLiteral &>().value.safeGet<String>();
ASTPtr settings;
parseSettings(pos, expected, settings);
auto query = std::make_shared<ASTBackupQuery>();
node = query;
query->kind = kind;
query->elements = std::move(elements);
query->backup_name = std::move(backup_name);
query->settings = std::move(settings);
return true;
}
}

View File

@ -0,0 +1,34 @@
#pragma once
#include <Parsers/IParserBase.h>
namespace DB
{
/** Parses queries like
* BACKUP { TABLE [db.]table_name [AS [db.]table_name_in_backup] [PARTITION[S] partition_expr [,...]] |
* DICTIONARY [db.]dictionary_name [AS [db.]dictionary_name_in_backup] |
* DATABASE database_name [AS database_name_in_backup] |
* ALL DATABASES |
* TEMPORARY TABLE table_name [AS table_name_in_backup]
* ALL TEMPORARY TABLES |
* EVERYTHING } [,...]
* TO 'backup_name'
* [SETTINGS base_backup = 'base_backup_name']
*
* RESTORE { TABLE [db.]table_name_in_backup [INTO [db.]table_name] [PARTITION[S] partition_expr [,...]] |
* DICTIONARY [db.]dictionary_name_in_backup [INTO [db.]dictionary_name] |
* DATABASE database_name_in_backup [INTO database_name] |
* ALL DATABASES |
* TEMPORARY TABLE table_name_in_backup [INTO table_name] |
* ALL TEMPORARY TABLES |
* EVERYTHING } [,...]
* FROM 'backup_name'
*/
class ParserBackupQuery : public IParserBase
{
protected:
const char * getName() const override { return "BACKUP or RESTORE query"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
}

View File

@ -1,4 +1,5 @@
#include <Parsers/ParserAlterQuery.h>
#include <Parsers/ParserBackupQuery.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/ParserCreateQuotaQuery.h>
#include <Parsers/ParserCreateRoleQuery.h>
@ -40,6 +41,7 @@ bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserGrantQuery grant_p;
ParserSetRoleQuery set_role_p;
ParserExternalDDLQuery external_ddl_p;
ParserBackupQuery backup_p;
bool res = query_with_output_p.parse(pos, node, expected)
|| insert_p.parse(pos, node, expected)
@ -54,7 +56,8 @@ bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
|| create_settings_profile_p.parse(pos, node, expected)
|| drop_access_entity_p.parse(pos, node, expected)
|| grant_p.parse(pos, node, expected)
|| external_ddl_p.parse(pos, node, expected);
|| external_ddl_p.parse(pos, node, expected)
|| backup_p.parse(pos, node, expected);
return res;
}

View File

@ -201,6 +201,16 @@ NameDependencies IStorage::getDependentViewsByColumn(ContextPtr context) const
return name_deps;
}
BackupEntries IStorage::backup(const ASTs &, ContextPtr) const
{
throw Exception("Table engine " + getName() + " doesn't support backups", ErrorCodes::NOT_IMPLEMENTED);
}
RestoreDataTasks IStorage::restoreFromBackup(const BackupPtr &, const String &, const ASTs &, ContextMutablePtr)
{
throw Exception("Table engine " + getName() + " doesn't support restoring", ErrorCodes::NOT_IMPLEMENTED);
}
std::string PrewhereInfo::dump() const
{
WriteBufferFromOwnString ss;

View File

@ -65,6 +65,13 @@ class EnabledQuota;
struct SelectQueryInfo;
using NameDependencies = std::unordered_map<String, std::vector<String>>;
using DatabaseAndTableName = std::pair<String, String>;
class IBackup;
using BackupPtr = std::shared_ptr<const IBackup>;
class IBackupEntry;
using BackupEntries = std::vector<std::pair<String, std::unique_ptr<IBackupEntry>>>;
using RestoreDataTasks = std::vector<std::function<void()>>;
struct ColumnSize
{
@ -188,6 +195,12 @@ public:
NameDependencies getDependentViewsByColumn(ContextPtr context) const;
/// Prepares entries to backup data of the storage.
virtual BackupEntries backup(const ASTs & partitions, ContextPtr context) const;
/// Extract data from the backup and put it to the storage.
virtual RestoreDataTasks restoreFromBackup(const BackupPtr & backup, const String & data_path_in_backup, const ASTs & partitions, ContextMutablePtr context);
protected:
/// Returns whether the column is virtual - by default all columns are real.
/// Initially reserved virtual column name may be shadowed by real column.

View File

@ -1,3 +1,8 @@
#include <Storages/MergeTree/MergeTreeData.h>
#include <Backups/BackupEntryFromImmutableFile.h>
#include <Backups/BackupEntryFromSmallFile.h>
#include <Backups/IBackup.h>
#include <Compression/CompressedReadBuffer.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypeArray.h>
@ -9,6 +14,7 @@
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/NestedUtils.h>
#include <Disks/TemporaryFileOnDisk.h>
#include <Formats/FormatFactory.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>
@ -32,7 +38,6 @@
#include <Parsers/queryToString.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Storages/AlterCommands.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeDataPartCompact.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/MergeTree/MergeTreeDataPartWide.h>
@ -56,6 +61,7 @@
#include <boost/range/adaptor/filtered.hpp>
#include <boost/algorithm/string/join.hpp>
#include <common/insertAtEnd.h>
#include <common/scope_guard_safe.h>
#include <algorithm>
@ -2837,7 +2843,7 @@ MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String &
return getActiveContainingPart(part_info);
}
MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVectorInPartition(MergeTreeData::DataPartState state, const String & partition_id)
MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVectorInPartition(MergeTreeData::DataPartState state, const String & partition_id) const
{
DataPartStateAndPartitionID state_with_partition{state, partition_id};
@ -2847,6 +2853,22 @@ MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVectorInPartition(Merg
data_parts_by_state_and_info.upper_bound(state_with_partition));
}
MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVectorInPartitions(MergeTreeData::DataPartState state, const std::unordered_set<String> & partition_ids) const
{
auto lock = lockParts();
DataPartsVector res;
for (const auto & partition_id : partition_ids)
{
DataPartStateAndPartitionID state_with_partition{state, partition_id};
insertAtEnd(
res,
DataPartsVector(
data_parts_by_state_and_info.lower_bound(state_with_partition),
data_parts_by_state_and_info.upper_bound(state_with_partition)));
}
return res;
}
MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const MergeTreePartInfo & part_info, const MergeTreeData::DataPartStates & valid_states)
{
auto lock = lockParts();
@ -3208,6 +3230,121 @@ Pipe MergeTreeData::alterPartition(
return {};
}
BackupEntries MergeTreeData::backup(const ASTs & partitions, ContextPtr local_context) const
{
DataPartsVector data_parts;
if (partitions.empty())
data_parts = getDataPartsVector();
else
data_parts = getDataPartsVectorInPartitions(MergeTreeDataPartState::Committed, getPartitionIDsFromQuery(partitions, local_context));
return backupDataParts(data_parts);
}
BackupEntries MergeTreeData::backupDataParts(const DataPartsVector & data_parts)
{
BackupEntries backup_entries;
std::map<DiskPtr, std::shared_ptr<TemporaryFileOnDisk>> temp_dirs;
for (const auto & part : data_parts)
{
auto disk = part->volume->getDisk();
auto temp_dir_it = temp_dirs.find(disk);
if (temp_dir_it == temp_dirs.end())
temp_dir_it = temp_dirs.emplace(disk, std::make_shared<TemporaryFileOnDisk>(disk, "tmp_backup_")).first;
auto temp_dir_owner = temp_dir_it->second;
fs::path temp_dir = temp_dir_owner->getPath();
fs::path part_dir = part->getFullRelativePath();
fs::path temp_part_dir = temp_dir / part->relative_path;
disk->createDirectories(temp_part_dir);
for (const auto & [filepath, checksum] : part->checksums.files)
{
String relative_filepath = fs::path(part->relative_path) / filepath;
String hardlink_filepath = temp_part_dir / filepath;
disk->createHardLink(part_dir / filepath, hardlink_filepath);
UInt128 file_hash{checksum.file_hash.first, checksum.file_hash.second};
backup_entries.emplace_back(
relative_filepath,
std::make_unique<BackupEntryFromImmutableFile>(disk, hardlink_filepath, checksum.file_size, file_hash, temp_dir_owner));
}
for (const auto & filepath : part->getFileNamesWithoutChecksums())
{
String relative_filepath = fs::path(part->relative_path) / filepath;
backup_entries.emplace_back(relative_filepath, std::make_unique<BackupEntryFromSmallFile>(disk, part_dir / filepath));
}
}
return backup_entries;
}
RestoreDataTasks MergeTreeData::restoreDataPartsFromBackup(const BackupPtr & backup, const String & data_path_in_backup,
const std::unordered_set<String> & partition_ids,
SimpleIncrement * increment)
{
RestoreDataTasks restore_tasks;
Strings part_names = backup->list(data_path_in_backup);
for (const String & part_name : part_names)
{
MergeTreePartInfo part_info;
if (!MergeTreePartInfo::tryParsePartName(part_name, &part_info, format_version))
continue;
if (!partition_ids.empty() && !partition_ids.contains(part_info.partition_id))
continue;
UInt64 total_size_of_part = 0;
Strings filenames = backup->list(data_path_in_backup + part_name + "/", "");
for (const String & filename : filenames)
total_size_of_part += backup->getSize(data_path_in_backup + part_name + "/" + filename);
std::shared_ptr<IReservation> reservation = getStoragePolicy()->reserveAndCheck(total_size_of_part);
auto restore_task = [this,
backup,
data_path_in_backup,
part_name,
part_info = std::move(part_info),
filenames = std::move(filenames),
reservation,
increment]()
{
auto disk = reservation->getDisk();
auto temp_part_dir_owner = std::make_shared<TemporaryFileOnDisk>(disk, relative_data_path + "restoring_" + part_name + "_");
String temp_part_dir = temp_part_dir_owner->getPath();
disk->createDirectories(temp_part_dir);
assert(temp_part_dir.starts_with(relative_data_path));
String relative_temp_part_dir = temp_part_dir.substr(relative_data_path.size());
for (const String & filename : filenames)
{
auto backup_entry = backup->read(data_path_in_backup + part_name + "/" + filename);
auto read_buffer = backup_entry->getReadBuffer();
auto write_buffer = disk->writeFile(temp_part_dir + "/" + filename);
copyData(*read_buffer, *write_buffer);
}
auto single_disk_volume = std::make_shared<SingleDiskVolume>(disk->getName(), disk, 0);
auto part = createPart(part_name, part_info, single_disk_volume, relative_temp_part_dir);
part->loadColumnsChecksumsIndexes(false, true);
renameTempPartAndAdd(part, increment);
};
restore_tasks.emplace_back(std::move(restore_task));
}
return restore_tasks;
}
String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, ContextPtr local_context) const
{
const auto & partition_ast = ast->as<ASTPartition &>();
@ -3303,6 +3440,15 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, ContextPtr loc
return partition_id;
}
std::unordered_set<String> MergeTreeData::getPartitionIDsFromQuery(const ASTs & asts, ContextPtr local_context) const
{
std::unordered_set<String> partition_ids;
for (const auto & ast : asts)
partition_ids.emplace(getPartitionIDFromQuery(ast, local_context));
return partition_ids;
}
MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector(
const DataPartStates & affordable_states, DataPartStateVector * out_states, bool require_projection_parts) const
{

View File

@ -436,7 +436,8 @@ public:
void swapActivePart(MergeTreeData::DataPartPtr part_copy);
/// Returns all parts in specified partition
DataPartsVector getDataPartsVectorInPartition(DataPartState state, const String & partition_id);
DataPartsVector getDataPartsVectorInPartition(DataPartState state, const String & partition_id) const;
DataPartsVector getDataPartsVectorInPartitions(DataPartState state, const std::unordered_set<String> & partition_ids) const;
/// Returns the part with the given name and state or nullptr if no such part.
DataPartPtr getPartIfExists(const String & part_name, const DataPartStates & valid_states);
@ -606,6 +607,17 @@ public:
ContextPtr context,
TableLockHolder & table_lock_holder);
/// Prepares entries to backup data of the storage.
BackupEntries backup(const ASTs & partitions, ContextPtr context) const override;
static BackupEntries backupDataParts(const DataPartsVector & data_parts);
/// Extract data from the backup and put it to the storage.
RestoreDataTasks restoreDataPartsFromBackup(
const BackupPtr & backup,
const String & data_path_in_backup,
const std::unordered_set<String> & partition_ids,
SimpleIncrement * increment);
/// Moves partition to specified Disk
void movePartitionToDisk(const ASTPtr & partition, const String & name, bool moving_part, ContextPtr context);
@ -636,6 +648,7 @@ public:
/// For ATTACH/DETACH/DROP PARTITION.
String getPartitionIDFromQuery(const ASTPtr & ast, ContextPtr context) const;
std::unordered_set<String> getPartitionIDsFromQuery(const ASTs & asts, ContextPtr context) const;
/// Extracts MergeTreeData of other *MergeTree* storage
/// and checks that their structure suitable for ALTER TABLE ATTACH PARTITION FROM

View File

@ -8,6 +8,7 @@
#include <Interpreters/PartLog.h>
#include <Interpreters/MutationsInterpreter.h>
#include <Interpreters/Context.h>
#include <IO/copyData.h>
#include <Parsers/ASTCheckQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
@ -1623,6 +1624,12 @@ CheckResults StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_
}
RestoreDataTasks StorageMergeTree::restoreFromBackup(const BackupPtr & backup, const String & data_path_in_backup, const ASTs & partitions, ContextMutablePtr local_context)
{
return restoreDataPartsFromBackup(backup, data_path_in_backup, getPartitionIDsFromQuery(partitions, local_context), &increment);
}
MutationCommands StorageMergeTree::getFirstAlterMutationCommandsForPart(const DataPartPtr & part) const
{
std::lock_guard lock(currently_processing_in_background_mutex);

View File

@ -94,6 +94,8 @@ public:
CheckResults checkData(const ASTPtr & query, ContextPtr context) override;
RestoreDataTasks restoreFromBackup(const BackupPtr & backup, const String & data_path_in_backup, const ASTs & partitions, ContextMutablePtr context) override;
bool scheduleDataProcessingJob(IBackgroundJobExecutor & executor) override;
MergeTreeDeduplicationLog * getDeduplicationLog() { return deduplication_log.get(); }

View File

@ -0,0 +1,120 @@
import pytest
import re
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance')
def create_and_fill_table():
instance.query("CREATE DATABASE test")
instance.query("CREATE TABLE test.table(x UInt32, y String) ENGINE=MergeTree ORDER BY y PARTITION BY x%10")
instance.query("INSERT INTO test.table SELECT number, toString(number) FROM numbers(100)")
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
@pytest.fixture(autouse=True)
def cleanup_after_test():
try:
yield
finally:
instance.query("DROP DATABASE IF EXISTS test")
backup_id_counter = 0
def new_backup_name():
global backup_id_counter
backup_id_counter += 1
return f"test-backup-{backup_id_counter}"
def test_restore_table():
backup_name = new_backup_name()
create_and_fill_table()
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
instance.query(f"BACKUP TABLE test.table TO '{backup_name}'")
instance.query("DROP TABLE test.table")
assert instance.query("EXISTS test.table") == "0\n"
instance.query(f"RESTORE TABLE test.table FROM '{backup_name}'")
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
def test_restore_table_into_existing_table():
backup_name = new_backup_name()
create_and_fill_table()
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
instance.query(f"BACKUP TABLE test.table TO '{backup_name}'")
instance.query(f"RESTORE TABLE test.table INTO test.table FROM '{backup_name}'")
assert instance.query("SELECT count(), sum(x) FROM test.table") == "200\t9900\n"
instance.query(f"RESTORE TABLE test.table INTO test.table FROM '{backup_name}'")
assert instance.query("SELECT count(), sum(x) FROM test.table") == "300\t14850\n"
def test_restore_table_under_another_name():
backup_name = new_backup_name()
create_and_fill_table()
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
instance.query(f"BACKUP TABLE test.table TO '{backup_name}'")
assert instance.query("EXISTS test.table2") == "0\n"
instance.query(f"RESTORE TABLE test.table INTO test.table2 FROM '{backup_name}'")
assert instance.query("SELECT count(), sum(x) FROM test.table2") == "100\t4950\n"
def test_backup_table_under_another_name():
backup_name = new_backup_name()
create_and_fill_table()
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
instance.query(f"BACKUP TABLE test.table AS test.table2 TO '{backup_name}'")
assert instance.query("EXISTS test.table2") == "0\n"
instance.query(f"RESTORE TABLE test.table2 FROM '{backup_name}'")
assert instance.query("SELECT count(), sum(x) FROM test.table2") == "100\t4950\n"
def test_incremental_backup():
backup_name = new_backup_name()
incremental_backup_name = new_backup_name()
create_and_fill_table()
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
instance.query(f"BACKUP TABLE test.table TO '{backup_name}'")
instance.query("INSERT INTO test.table VALUES (65, 'a'), (66, 'b')")
assert instance.query("SELECT count(), sum(x) FROM test.table") == "102\t5081\n"
instance.query(f"BACKUP TABLE test.table TO '{incremental_backup_name}' SETTINGS base_backup = '{backup_name}'")
instance.query(f"RESTORE TABLE test.table AS test.table2 FROM '{incremental_backup_name}'")
assert instance.query("SELECT count(), sum(x) FROM test.table2") == "102\t5081\n"
def test_backup_not_found_or_already_exists():
backup_name = new_backup_name()
expected_error = "Backup .* not found"
assert re.search(expected_error, instance.query_and_get_error(f"RESTORE TABLE test.table AS test.table2 FROM '{backup_name}'"))
create_and_fill_table()
instance.query(f"BACKUP TABLE test.table TO '{backup_name}'")
expected_error = "Backup .* already exists"
assert re.search(expected_error, instance.query_and_get_error(f"BACKUP TABLE test.table TO '{backup_name}'"))

View File

@ -1,5 +1,5 @@
SELECT errorCodeToName(toUInt32(-1));
SELECT errorCodeToName(-1);
SELECT errorCodeToName(600); /* gap in error codes */
SELECT errorCodeToName(950); /* gap in error codes */
SELECT errorCodeToName(0);
SELECT errorCodeToName(1);