Merge remote-tracking branch 'origin' into integration--7

This commit is contained in:
Yatsishin Ilya 2021-11-11 10:41:50 +03:00
commit 14c14dcd00
61 changed files with 1400 additions and 726 deletions

View File

@ -108,6 +108,11 @@ public:
LocalDate toDate() const { return LocalDate(m_year, m_month, m_day); }
LocalDateTime toStartOfDate() const { return LocalDateTime(m_year, m_month, m_day, 0, 0, 0); }
time_t to_time_t(const DateLUTImpl & time_zone = DateLUT::instance()) const
{
return time_zone.makeDateTime(m_year, m_month, m_day, m_hour, m_minute, m_second);
}
std::string toString() const
{
std::string s{"0000-00-00 00:00:00"};

View File

@ -6,7 +6,7 @@ Columns:
- `policy_name` ([String](../../sql-reference/data-types/string.md)) — Name of the storage policy.
- `volume_name` ([String](../../sql-reference/data-types/string.md)) — Volume name defined in the storage policy.
- `volume_priority` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Volume order number in the configuration.
- `volume_priority` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Volume order number in the configuration; the data fills the volumes according to this priority, i.e. during inserts and merges data is written to the volumes with a lower priority number first (taking into account other rules: TTL, `max_data_part_size`, `move_factor`).
- `disks` ([Array(String)](../../sql-reference/data-types/array.md)) — Disk names, defined in the storage policy.
- `max_data_part_size` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Maximum size of a data part that can be stored on volume disks (0 — no limit).
- `move_factor` ([Float64](../../sql-reference/data-types/float.md)) — Ratio of free disk space. When the ratio exceeds the value of the configuration parameter, ClickHouse starts to move data to the next volume in order.

View File

@ -6,7 +6,7 @@
- `policy_name` ([String](../../sql-reference/data-types/string.md)) — Name of the storage policy.
- `volume_name` ([String](../../sql-reference/data-types/string.md)) — Name of the volume contained in the storage policy.
- `volume_priority` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Volume order number in the configuration.
- `volume_priority` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Volume order number in the configuration; the data fills the volumes according to this priority, i.e. during inserts and merges data is written to the volumes with a lower priority number first (taking into account other rules: TTL, `max_data_part_size`, `move_factor`).
- `disks` ([Array(String)](../../sql-reference/data-types/array.md)) — Disk names contained in the storage policy.
- `max_data_part_size` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Maximum size of a data part that can be stored on the volume disks (0 — no limit).
- `move_factor` — Ratio of available free space on the volume; when less space remains, data starts moving to the next volume, if there is one (0.1 by default).

View File

@ -703,10 +703,6 @@ if (ThreadFuzzer::instance().isEffective())
setupTmpPath(log, disk->getPath());
}
/// Storage keeping all the backups.
fs::create_directories(path / "backups");
global_context->setBackupsVolume(config().getString("backups_path", path / "backups"), config().getString("backups_policy", ""));
/** Directory with 'flags': files indicating temporary settings for the server set by system administrator.
* Flags may be cleared automatically after being applied by the server.
* Examples: do repair of local data; clone all replicated tables from replica.

View File

@ -0,0 +1,31 @@
#pragma once
#include <Backups/IBackupEntry.h>
namespace DB
{
/// Represents a backup entry whose data is returned by a callback.
class BackupEntryFromCallback : public IBackupEntry
{
public:
using ReadBufferCreator = std::function<std::unique_ptr<ReadBuffer>()>;
/// The constructor is allowed to not set `checksum_`; in that case it will be calculated from the data.
BackupEntryFromCallback(const ReadBufferCreator & callback_, size_t size_, const std::optional<UInt128> & checksum_ = {})
: callback(callback_), size(size_), checksum(checksum_)
{
}
UInt64 getSize() const override { return size; }
std::optional<UInt128> getChecksum() const override { return checksum; }
std::unique_ptr<ReadBuffer> getReadBuffer() const override { return callback(); }
private:
const ReadBufferCreator callback;
const size_t size;
const std::optional<UInt128> checksum;
};
}
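A usage sketch, not part of the commit: an entry whose data is produced lazily by a callback. ReadBufferFromMemory and the payload are illustrative; the omitted checksum will be calculated from the data, as the comment above states.
String payload = "example data";
auto creator = [payload]() -> std::unique_ptr<ReadBuffer>
{
    /// Each call returns a fresh buffer over the same bytes.
    return std::make_unique<ReadBufferFromMemory>(payload.data(), payload.size());
};
BackupEntryPtr entry = std::make_unique<BackupEntryFromCallback>(creator, payload.size());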

View File

@ -1,65 +1,41 @@
#include <Backups/BackupFactory.h>
#include <Backups/BackupInDirectory.h>
#include <Interpreters/Context.h>
#include <Disks/IVolume.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BACKUP_NOT_FOUND;
extern const int BACKUP_ALREADY_EXISTS;
extern const int NOT_ENOUGH_SPACE;
extern const int BACKUP_ENGINE_NOT_FOUND;
extern const int LOGICAL_ERROR;
}
BackupFactory & BackupFactory::instance()
{
static BackupFactory the_instance;
return the_instance;
}
void BackupFactory::setBackupsVolume(VolumePtr backups_volume_)
BackupMutablePtr BackupFactory::createBackup(const CreateParams & params) const
{
backups_volume = backups_volume_;
const String & engine_name = params.backup_info.backup_engine_name;
auto it = creators.find(engine_name);
if (it == creators.end())
throw Exception(ErrorCodes::BACKUP_ENGINE_NOT_FOUND, "Backup engine {} not found", engine_name);
return (it->second)(params);
}
BackupMutablePtr BackupFactory::createBackup(const String & backup_name, UInt64 estimated_backup_size, const BackupPtr & base_backup) const
void BackupFactory::registerBackupEngine(const String & engine_name, const CreatorFn & creator_fn)
{
if (!backups_volume)
throw Exception(ErrorCodes::LOGICAL_ERROR, "No backups volume");
for (const auto & disk : backups_volume->getDisks())
{
if (disk->exists(backup_name))
throw Exception(ErrorCodes::BACKUP_ALREADY_EXISTS, "Backup {} already exists", quoteString(backup_name));
}
auto reservation = backups_volume->reserve(estimated_backup_size);
if (!reservation)
throw Exception(
ErrorCodes::NOT_ENOUGH_SPACE,
"Couldn't reserve {} bytes of free space for new backup {}",
estimated_backup_size,
quoteString(backup_name));
return std::make_shared<BackupInDirectory>(IBackup::OpenMode::WRITE, reservation->getDisk(), backup_name, base_backup);
if (creators.contains(engine_name))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Backup engine {} was registered twice", engine_name);
creators[engine_name] = creator_fn;
}
BackupPtr BackupFactory::openBackup(const String & backup_name, const BackupPtr & base_backup) const
void registerBackupEngines(BackupFactory & factory);
BackupFactory::BackupFactory()
{
if (!backups_volume)
throw Exception(ErrorCodes::LOGICAL_ERROR, "No backups volume");
for (const auto & disk : backups_volume->getDisks())
{
if (disk->exists(backup_name))
return std::make_shared<BackupInDirectory>(IBackup::OpenMode::READ, disk, backup_name, base_backup);
}
throw Exception(ErrorCodes::BACKUP_NOT_FOUND, "Backup {} not found", quoteString(backup_name));
registerBackupEngines(*this);
}
}
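A sketch of how an engine plugs into the new registry; the engine name "Example" and the path are hypothetical (the real registrations live in BackupInDirectory.cpp further down).
void registerBackupEngineExample(BackupFactory & factory)
{
    factory.registerBackupEngine("Example", [](const BackupFactory::CreateParams & params) -> BackupMutablePtr
    {
        /// A real engine would validate params.backup_info.args here.
        return std::make_shared<BackupInDirectory>(
            params.backup_info.toString(), params.open_mode,
            /* disk_ = */ nullptr, "/tmp/example_backups/b1/",
            params.context, params.base_backup_info);
    });
}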

View File

@ -1,38 +1,46 @@
#pragma once
#include <Backups/IBackup.h>
#include <Backups/BackupInfo.h>
#include <Core/Types.h>
#include <Parsers/IAST_fwd.h>
#include <boost/noncopyable.hpp>
#include <memory>
#include <optional>
#include <unordered_map>
namespace DB
{
class IBackup;
using BackupPtr = std::shared_ptr<const IBackup>;
using BackupMutablePtr = std::shared_ptr<IBackup>;
class Context;
using ContextMutablePtr = std::shared_ptr<Context>;
class IVolume;
using VolumePtr = std::shared_ptr<IVolume>;
using ContextPtr = std::shared_ptr<const Context>;
/// Factory for implementations of the IBackup interface.
class BackupFactory : boost::noncopyable
{
public:
using OpenMode = IBackup::OpenMode;
struct CreateParams
{
OpenMode open_mode = OpenMode::WRITE;
BackupInfo backup_info;
std::optional<BackupInfo> base_backup_info;
ContextPtr context;
};
static BackupFactory & instance();
/// Must be called to initialize the backup factory.
void setBackupsVolume(VolumePtr backups_volume_);
/// Creates a new backup or opens it.
BackupMutablePtr createBackup(const CreateParams & params) const;
/// Creates a new backup and open it for writing.
BackupMutablePtr createBackup(const String & backup_name, UInt64 estimated_backup_size, const BackupPtr & base_backup = {}) const;
/// Opens an existing backup for reading.
BackupPtr openBackup(const String & backup_name, const BackupPtr & base_backup = {}) const;
using CreatorFn = std::function<BackupMutablePtr(const CreateParams & params)>;
void registerBackupEngine(const String & engine_name, const CreatorFn & creator_fn);
private:
VolumePtr backups_volume;
BackupFactory();
std::unordered_map<String, CreatorFn> creators;
};
}
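A hedged sketch of the intended call sequence; `context` stands for any valid ContextPtr and the backup descriptor is illustrative.
BackupFactory::CreateParams params;
params.open_mode = IBackup::OpenMode::WRITE;
params.backup_info = BackupInfo::fromString("File('/backups/b1/')");
params.context = context;
BackupMutablePtr backup = BackupFactory::instance().createBackup(params);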

src/Backups/BackupImpl.cpp Normal file
View File

@ -0,0 +1,476 @@
#include <Backups/BackupImpl.h>
#include <Backups/BackupFactory.h>
#include <Backups/BackupEntryConcat.h>
#include <Backups/BackupEntryFromCallback.h>
#include <Backups/BackupEntryFromMemory.h>
#include <Backups/IBackupEntry.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/hex.h>
#include <Common/typeid_cast.h>
#include <Common/quoteString.h>
#include <IO/HashingReadBuffer.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadHelpers.h>
#include <IO/SeekableReadBuffer.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/WriteHelpers.h>
#include <IO/copyData.h>
#include <Poco/Util/XMLConfiguration.h>
#include <boost/range/adaptor/map.hpp>
namespace DB
{
namespace ErrorCodes
{
extern const int BACKUP_NOT_FOUND;
extern const int BACKUP_ALREADY_EXISTS;
extern const int BACKUP_VERSION_NOT_SUPPORTED;
extern const int BACKUP_DAMAGED;
extern const int NO_BASE_BACKUP;
extern const int WRONG_BASE_BACKUP;
extern const int BACKUP_ENTRY_ALREADY_EXISTS;
extern const int BACKUP_ENTRY_NOT_FOUND;
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
}
namespace
{
const UInt64 BACKUP_VERSION = 1;
UInt128 unhexChecksum(const String & checksum)
{
if (checksum.size() != sizeof(UInt128) * 2)
throw Exception(ErrorCodes::BACKUP_DAMAGED, "Unexpected size of checksum: {}, must be {}", checksum.size(), sizeof(UInt128) * 2);
return unhexUInt<UInt128>(checksum.data());
}
}
BackupImpl::BackupImpl(const String & backup_name_, OpenMode open_mode_, const ContextPtr & context_, const std::optional<BackupInfo> & base_backup_info_)
: backup_name(backup_name_), open_mode(open_mode_), context(context_), base_backup_info(base_backup_info_)
{
}
BackupImpl::~BackupImpl() = default;
void BackupImpl::open()
{
if (open_mode == OpenMode::WRITE)
{
if (backupExists())
throw Exception(ErrorCodes::BACKUP_ALREADY_EXISTS, "Backup {} already exists", getName());
timestamp = std::time(nullptr);
uuid = UUIDHelpers::generateV4();
startWriting();
writing_started = true;
}
if (open_mode == OpenMode::READ)
{
if (!backupExists())
throw Exception(ErrorCodes::BACKUP_NOT_FOUND, "Backup {} not found", getName());
readBackupMetadata();
}
if (base_backup_info)
{
BackupFactory::CreateParams params;
params.backup_info = *base_backup_info;
params.open_mode = OpenMode::READ;
params.context = context;
base_backup = BackupFactory::instance().createBackup(params);
if (open_mode == OpenMode::WRITE)
base_backup_uuid = base_backup->getUUID();
else if (base_backup_uuid != base_backup->getUUID())
throw Exception(ErrorCodes::WRONG_BASE_BACKUP, "Backup {}: The base backup {} has different UUID ({} != {})",
getName(), base_backup->getName(), toString(base_backup->getUUID()), (base_backup_uuid ? toString(*base_backup_uuid) : ""));
}
}
void BackupImpl::close()
{
if (open_mode == OpenMode::WRITE)
{
if (writing_started && !writing_finalized)
{
/// Creation of the backup wasn't finished correctly,
/// so the backup cannot be used and it's better to remove its files.
removeAllFilesAfterFailure();
}
}
}
void BackupImpl::writeBackupMetadata()
{
Poco::AutoPtr<Poco::Util::XMLConfiguration> config{new Poco::Util::XMLConfiguration()};
config->setUInt("version", BACKUP_VERSION);
config->setString("timestamp", toString(LocalDateTime{timestamp}));
config->setString("uuid", toString(uuid));
if (base_backup_info)
config->setString("base_backup", base_backup_info->toString());
if (base_backup_uuid)
config->setString("base_backup_uuid", toString(*base_backup_uuid));
size_t index = 0;
for (const auto & [name, info] : file_infos)
{
String prefix = index ? "contents.file[" + std::to_string(index) + "]." : "contents.file.";
config->setString(prefix + "name", name);
config->setUInt(prefix + "size", info.size);
if (info.size)
{
config->setString(prefix + "checksum", getHexUIntLowercase(info.checksum));
if (info.base_size)
{
config->setUInt(prefix + "base_size", info.base_size);
if (info.base_size != info.size)
config->setString(prefix + "base_checksum", getHexUIntLowercase(info.base_checksum));
}
}
++index;
}
std::ostringstream stream; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
config->save(stream);
String str = stream.str();
auto out = addFileImpl(".backup");
out->write(str.data(), str.size());
}
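Given writeBackupMetadata() above, the ".backup" entry is an XML document roughly like the following; the values are illustrative and the root element name comes from Poco::Util::XMLConfiguration's defaults.
<config>
    <version>1</version>
    <timestamp>2021-11-11 10:41:50</timestamp>
    <uuid>...</uuid>
    <contents>
        <file>
            <name>data/file1.bin</name>
            <size>100</size>
            <checksum>...</checksum>
            <base_size>80</base_size>
            <base_checksum>...</base_checksum>
        </file>
    </contents>
</config>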
void BackupImpl::readBackupMetadata()
{
auto in = readFileImpl(".backup");
String str;
readStringUntilEOF(str, *in);
std::istringstream stream(std::move(str)); // STYLE_CHECK_ALLOW_STD_STRING_STREAM
Poco::AutoPtr<Poco::Util::XMLConfiguration> config{new Poco::Util::XMLConfiguration()};
config->load(stream);
UInt64 version = config->getUInt("version");
if (version != BACKUP_VERSION)
throw Exception(ErrorCodes::BACKUP_VERSION_NOT_SUPPORTED, "Backup {}: Version {} is not supported", getName(), version);
timestamp = parse<LocalDateTime>(config->getString("timestamp")).to_time_t();
uuid = parse<UUID>(config->getString("uuid"));
if (config->has("base_backup") && !base_backup_info)
base_backup_info = BackupInfo::fromString(config->getString("base_backup"));
if (config->has("base_backup_uuid") && !base_backup_uuid)
base_backup_uuid = parse<UUID>(config->getString("base_backup_uuid"));
file_infos.clear();
Poco::Util::AbstractConfiguration::Keys keys;
config->keys("contents", keys);
for (const auto & key : keys)
{
if ((key == "file") || key.starts_with("file["))
{
String prefix = "contents." + key + ".";
String name = config->getString(prefix + "name");
FileInfo & info = file_infos.emplace(name, FileInfo{}).first->second;
info.size = config->getUInt(prefix + "size");
if (info.size)
{
info.checksum = unhexChecksum(config->getString(prefix + "checksum"));
if (config->has(prefix + "base_size"))
{
info.base_size = config->getUInt(prefix + "base_size");
if (info.base_size == info.size)
info.base_checksum = info.checksum;
else
info.base_checksum = unhexChecksum(config->getString(prefix + "base_checksum"));
}
}
}
}
}
Strings BackupImpl::listFiles(const String & prefix, const String & terminator) const
{
if (!prefix.ends_with('/') && !prefix.empty())
throw Exception("prefix should end with '/'", ErrorCodes::BAD_ARGUMENTS);
std::lock_guard lock{mutex};
Strings elements;
for (auto it = file_infos.lower_bound(prefix); it != file_infos.end(); ++it)
{
const String & name = it->first;
if (!name.starts_with(prefix))
break;
size_t start_pos = prefix.length();
size_t end_pos = String::npos;
if (!terminator.empty())
end_pos = name.find(terminator, start_pos);
std::string_view new_element = std::string_view{name}.substr(start_pos, end_pos - start_pos);
if (!elements.empty() && (elements.back() == new_element))
continue;
elements.push_back(String{new_element});
}
return elements;
}
bool BackupImpl::fileExists(const String & file_name) const
{
std::lock_guard lock{mutex};
return file_infos.count(file_name) != 0;
}
size_t BackupImpl::getFileSize(const String & file_name) const
{
std::lock_guard lock{mutex};
auto it = file_infos.find(file_name);
if (it == file_infos.end())
throw Exception(
ErrorCodes::BACKUP_ENTRY_NOT_FOUND, "Backup {}: Entry {} not found in the backup", getName(), quoteString(file_name));
return it->second.size;
}
UInt128 BackupImpl::getFileChecksum(const String & file_name) const
{
std::lock_guard lock{mutex};
auto it = file_infos.find(file_name);
if (it == file_infos.end())
throw Exception(
ErrorCodes::BACKUP_ENTRY_NOT_FOUND, "Backup {}: Entry {} not found in the backup", getName(), quoteString(file_name));
return it->second.checksum;
}
BackupEntryPtr BackupImpl::readFile(const String & file_name) const
{
std::lock_guard lock{mutex};
auto it = file_infos.find(file_name);
if (it == file_infos.end())
throw Exception(
ErrorCodes::BACKUP_ENTRY_NOT_FOUND, "Backup {}: Entry {} not found in the backup", getName(), quoteString(file_name));
const auto & info = it->second;
if (!info.size)
{
/// Entry's data is empty.
return std::make_unique<BackupEntryFromMemory>(nullptr, 0, UInt128{0, 0});
}
auto read_callback = [backup = std::static_pointer_cast<const BackupImpl>(shared_from_this()), file_name]()
{
return backup->readFileImpl(file_name);
};
if (!info.base_size)
{
/// Data goes completely from this backup, the base backup isn't used.
return std::make_unique<BackupEntryFromCallback>(read_callback, info.size, info.checksum);
}
if (info.size < info.base_size)
{
throw Exception(
ErrorCodes::BACKUP_DAMAGED,
"Backup {}: Entry {} has its data size less than in the base backup {}: {} < {}",
getName(), quoteString(file_name), base_backup->getName(), info.size, info.base_size);
}
if (!base_backup)
{
throw Exception(
ErrorCodes::NO_BASE_BACKUP,
"Backup {}: Entry {} is marked to be read from a base backup, but there is no base backup specified",
getName(), quoteString(file_name));
}
if (!base_backup->fileExists(file_name))
{
throw Exception(
ErrorCodes::WRONG_BASE_BACKUP,
"Backup {}: Entry {} is marked to be read from a base backup, but doesn't exist there",
getName(), quoteString(file_name));
}
auto base_entry = base_backup->readFile(file_name);
auto base_size = base_entry->getSize();
if (base_size != info.base_size)
{
throw Exception(
ErrorCodes::WRONG_BASE_BACKUP,
"Backup {}: Entry {} has unexpected size in the base backup {}: {} (expected size: {})",
getName(), quoteString(file_name), base_backup->getName(), base_size, info.base_size);
}
auto base_checksum = base_entry->getChecksum();
if (base_checksum && (*base_checksum != info.base_checksum))
{
throw Exception(
ErrorCodes::WRONG_BASE_BACKUP,
"Backup {}: Entry {} has unexpected checksum in the base backup {}",
getName(), quoteString(file_name), base_backup->getName());
}
if (info.size == info.base_size)
{
/// Data goes completely from the base backup (nothing goes from this backup).
return base_entry;
}
/// The beginning of the data goes from the base backup,
/// and the ending goes from this backup.
return std::make_unique<BackupEntryConcat>(
std::move(base_entry),
std::make_unique<BackupEntryFromCallback>(read_callback, info.size - info.base_size),
info.checksum);
}
void BackupImpl::addFile(const String & file_name, BackupEntryPtr entry)
{
std::lock_guard lock{mutex};
if (open_mode != OpenMode::WRITE)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Illegal operation: Cannot write to a backup opened for reading");
if (file_infos.contains(file_name))
throw Exception(
ErrorCodes::BACKUP_ENTRY_ALREADY_EXISTS, "Backup {}: Entry {} already exists", getName(), quoteString(file_name));
UInt64 size = entry->getSize();
std::optional<UInt128> checksum = entry->getChecksum();
/// Check if the entry's data is empty.
if (!size)
{
file_infos.emplace(file_name, FileInfo{});
return;
}
/// Check if an entry with the same name exists in the base backup.
bool base_exists = (base_backup && base_backup->fileExists(file_name));
UInt64 base_size = 0;
UInt128 base_checksum{0, 0};
if (base_exists)
{
base_size = base_backup->getFileSize(file_name);
base_checksum = base_backup->getFileChecksum(file_name);
}
std::unique_ptr<ReadBuffer> read_buffer; /// We'll set that later.
UInt64 read_pos = 0; /// Current position in read_buffer.
/// Determine whether it's possible to receive this entry's data from the base backup completely or partly.
bool use_base = false;
if (base_exists && base_size)
{
if (size == base_size)
{
/// The size is the same, we need to compare checksums to find out
/// if the entry's data has not been changed since the base backup.
if (!checksum)
{
read_buffer = entry->getReadBuffer();
HashingReadBuffer hashing_read_buffer{*read_buffer};
hashing_read_buffer.ignore(size);
read_pos = size;
checksum = hashing_read_buffer.getHash();
}
if (checksum == base_checksum)
use_base = true; /// The data has not been changed.
}
else if (size > base_size)
{
/// The size has been increased, we need to calculate a partial checksum to find out
/// if the entry's data has been only appended since the base backup.
read_buffer = entry->getReadBuffer();
HashingReadBuffer hashing_read_buffer{*read_buffer};
hashing_read_buffer.ignore(base_size);
UInt128 partial_checksum = hashing_read_buffer.getHash();
read_pos = base_size;
if (!checksum)
{
hashing_read_buffer.ignore(size - base_size);
checksum = hashing_read_buffer.getHash();
read_pos = size;
}
if (partial_checksum == base_checksum)
use_base = true; /// The data has been appended.
}
}
if (use_base && (size == base_size))
{
/// The entry's data has not been changed since the base backup.
FileInfo info;
info.size = base_size;
info.checksum = base_checksum;
info.base_size = base_size;
info.base_checksum = base_checksum;
file_infos.emplace(file_name, info);
return;
}
{
/// Either the entry didn't exist in the base backup
/// or the entry has data appended to the end of the data from the base backup.
/// In both those cases we have to copy data to this backup.
/// Find out where the start position to copy data is.
auto copy_pos = use_base ? base_size : 0;
/// Move the current read position to the start position to copy data.
/// If `read_buffer` is seekable it's easier, otherwise we can use ignore().
if (auto * seekable_buffer = dynamic_cast<SeekableReadBuffer *>(read_buffer.get()))
{
if (read_pos != copy_pos)
seekable_buffer->seek(copy_pos, SEEK_SET);
}
else
{
if (read_pos > copy_pos)
{
read_buffer.reset();
read_pos = 0;
}
if (!read_buffer)
read_buffer = entry->getReadBuffer();
if (read_pos < copy_pos)
read_buffer->ignore(copy_pos - read_pos);
}
/// If we haven't received or calculated a checksum yet, calculate it now.
ReadBuffer * maybe_hashing_read_buffer = read_buffer.get();
std::optional<HashingReadBuffer> hashing_read_buffer;
if (!checksum)
maybe_hashing_read_buffer = &hashing_read_buffer.emplace(*read_buffer);
/// Copy the entry's data after `copy_pos`.
auto out = addFileImpl(file_name);
copyData(*maybe_hashing_read_buffer, *out);
if (hashing_read_buffer)
checksum = hashing_read_buffer->getHash();
/// Done!
FileInfo info;
info.size = size;
info.checksum = *checksum;
if (use_base)
{
info.base_size = base_size;
info.base_checksum = base_checksum;
}
file_infos.emplace(file_name, info);
}
}
void BackupImpl::finalizeWriting()
{
if (open_mode != OpenMode::WRITE)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Illegal operation: Cannot write to a backup opened for reading");
writeBackupMetadata();
writing_finalized = true;
}
}
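Concretely, addFile() above means that for an entry of size 100 whose counterpart in the base backup had size 80: if the first 80 bytes hash to the base checksum, only the trailing 20 bytes are copied into this backup and base_size/base_checksum are recorded; if the sizes are equal and the full checksums match, no data is copied at all and the entry is stored purely as a reference to the base backup.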

src/Backups/BackupImpl.h Normal file
View File

@ -0,0 +1,94 @@
#pragma once
#include <Backups/IBackup.h>
#include <Backups/BackupInfo.h>
#include <map>
#include <mutex>
namespace DB
{
class Context;
using ContextPtr = std::shared_ptr<const Context>;
/// Base implementation of IBackup.
/// Along with passed files it also stores backup metadata - a single file named ".backup" in XML format
/// which contains a list of all files in the backup with their sizes and checksums and information
/// whether the base backup should be used for each entry.
class BackupImpl : public IBackup
{
public:
BackupImpl(
const String & backup_name_,
OpenMode open_mode_,
const ContextPtr & context_,
const std::optional<BackupInfo> & base_backup_info_ = {});
~BackupImpl() override;
const String & getName() const override { return backup_name; }
OpenMode getOpenMode() const override { return open_mode; }
time_t getTimestamp() const override { return timestamp; }
UUID getUUID() const override { return uuid; }
Strings listFiles(const String & prefix, const String & terminator) const override;
bool fileExists(const String & file_name) const override;
size_t getFileSize(const String & file_name) const override;
UInt128 getFileChecksum(const String & file_name) const override;
BackupEntryPtr readFile(const String & file_name) const override;
void addFile(const String & file_name, BackupEntryPtr entry) override;
void finalizeWriting() override;
protected:
/// Should be called in the constructor of a derived class.
void open();
/// Should be called in the destructor of a derived class.
void close();
/// Read a file from the backup.
/// Low level: the function doesn't check base backup or checksums.
virtual std::unique_ptr<ReadBuffer> readFileImpl(const String & file_name) const = 0;
/// Add a file to the backup.
/// Low level: the function doesn't check base backup or checksums.
virtual std::unique_ptr<WriteBuffer> addFileImpl(const String & file_name) = 0;
/// Checks if this backup exists.
virtual bool backupExists() const = 0;
/// Starts writing of this backup, only used if `open_mode == OpenMode::WRITE`.
/// After calling this function `backupExists()` should return true.
virtual void startWriting() = 0;
/// Removes all the backup files, called if something goes wrong while we're writing the backup.
/// This function is called by `close()` if `startWriting()` was called and `finalizeWriting()` wasn't.
virtual void removeAllFilesAfterFailure() = 0;
private:
void writeBackupMetadata();
void readBackupMetadata();
struct FileInfo
{
UInt64 size = 0;
UInt128 checksum{0, 0};
/// for incremental backups
UInt64 base_size = 0;
UInt128 base_checksum{0, 0};
};
const String backup_name;
const OpenMode open_mode;
UUID uuid;
time_t timestamp = 0;
ContextPtr context;
std::optional<BackupInfo> base_backup_info;
std::shared_ptr<const IBackup> base_backup;
std::optional<UUID> base_backup_uuid;
std::map<String, FileInfo> file_infos;
bool writing_started = false;
bool writing_finalized = false;
mutable std::mutex mutex;
};
}
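Judging from the comments above, a concrete subclass implements the five pure virtuals and calls open() from its own constructor and close() from its destructor; BackupInDirectory in the next file follows exactly that pattern.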

View File

@ -1,454 +1,160 @@
#include <Backups/BackupInDirectory.h>
#include <Backups/BackupFactory.h>
#include <Backups/BackupEntryConcat.h>
#include <Backups/BackupEntryFromImmutableFile.h>
#include <Backups/BackupEntryFromMemory.h>
#include <Backups/IBackupEntry.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/typeid_cast.h>
#include <Common/quoteString.h>
#include <Disks/DiskSelector.h>
#include <Disks/IDisk.h>
#include <IO/HashingReadBuffer.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadHelpers.h>
#include <IO/SeekableReadBuffer.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/WriteHelpers.h>
#include <IO/copyData.h>
#include <boost/range/adaptor/map.hpp>
#include <Disks/DiskLocal.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BACKUP_NOT_FOUND;
extern const int BACKUP_ALREADY_EXISTS;
extern const int BACKUP_VERSION_NOT_SUPPORTED;
extern const int BACKUP_DAMAGED;
extern const int NO_BASE_BACKUP;
extern const int WRONG_BASE_BACKUP;
extern const int BACKUP_ENTRY_ALREADY_EXISTS;
extern const int BACKUP_ENTRY_NOT_FOUND;
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
namespace
{
const UInt64 BACKUP_VERSION = 1;
/// Checks multiple keys "key", "key[1]", "key[2]", and so on in the configuration
/// and finds out if any of them has a matching value.
bool findConfigKeyWithMatchingValue(const Poco::Util::AbstractConfiguration & config, const String & key, const std::function<bool(const String & value)> & match_function)
{
String current_key = key;
size_t counter = 0;
while (config.has(current_key))
{
if (match_function(config.getString(current_key)))
return true;
current_key = key + "[" + std::to_string(++counter) + "]";
}
return false;
}
bool isDiskAllowed(const String & disk_name, const Poco::Util::AbstractConfiguration & config)
{
return findConfigKeyWithMatchingValue(config, "backups.allowed_disk", [&](const String & value) { return value == disk_name; });
}
bool isPathAllowed(const String & path, const Poco::Util::AbstractConfiguration & config)
{
return findConfigKeyWithMatchingValue(config, "backups.allowed_path", [&](const String & value) { return path.starts_with(value); });
}
}
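For illustration, isDiskAllowed() and isPathAllowed() above match server configuration entries like the following (the names are hypothetical); Poco exposes repeated tags as backups.allowed_disk, backups.allowed_disk[1], and so on, which is what findConfigKeyWithMatchingValue() iterates over.
<backups>
    <allowed_disk>backups_disk</allowed_disk>
    <allowed_path>/backups/</allowed_path>
</backups>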
BackupInDirectory::BackupInDirectory(OpenMode open_mode_, const DiskPtr & disk_, const String & path_, const std::shared_ptr<const IBackup> & base_backup_)
: open_mode(open_mode_), disk(disk_), path(path_), path_with_sep(path_), base_backup(base_backup_)
BackupInDirectory::BackupInDirectory(
const String & backup_name_,
OpenMode open_mode_,
const DiskPtr & disk_,
const String & path_,
const ContextPtr & context_,
const std::optional<BackupInfo> & base_backup_info_)
: BackupImpl(backup_name_, open_mode_, context_, base_backup_info_)
, disk(disk_), path(path_)
{
if (!path_with_sep.ends_with('/'))
path_with_sep += '/';
trimRight(path, '/');
/// Path to backup must end with '/'
if (path.back() != '/')
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Backup {}: Path to backup must end with '/', but {} doesn't.", getName(), quoteString(path));
dir_path = fs::path(path).parent_path(); /// get path without terminating slash
/// If `disk` is not specified, we create an internal instance of `DiskLocal` here.
if (!disk)
{
auto fspath = fs::path{dir_path};
if (!fspath.has_filename())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Backup {}: Path to a backup must be a directory path.", getName(), quoteString(path));
path = fspath.filename() / "";
dir_path = fs::path(path).parent_path(); /// get path without terminating slash
String disk_path = fspath.remove_filename();
disk = std::make_shared<DiskLocal>(disk_path, disk_path, 0);
}
open();
}
BackupInDirectory::~BackupInDirectory()
{
close();
}
void BackupInDirectory::open()
bool BackupInDirectory::backupExists() const
{
if (open_mode == OpenMode::WRITE)
{
if (disk->exists(path))
throw Exception(ErrorCodes::BACKUP_ALREADY_EXISTS, "Backup {} already exists", quoteString(path));
disk->createDirectories(path);
directory_was_created = true;
writePathToBaseBackup();
}
if (open_mode == OpenMode::READ)
{
if (!disk->isDirectory(path))
throw Exception(ErrorCodes::BACKUP_NOT_FOUND, "Backup {} not found", quoteString(path));
readContents();
readPathToBaseBackup();
}
return disk->isDirectory(dir_path);
}
void BackupInDirectory::close()
void BackupInDirectory::startWriting()
{
if (open_mode == OpenMode::WRITE)
disk->createDirectories(dir_path);
}
void BackupInDirectory::removeAllFilesAfterFailure()
{
if (disk->isDirectory(dir_path))
disk->removeRecursive(dir_path);
}
std::unique_ptr<ReadBuffer> BackupInDirectory::readFileImpl(const String & file_name) const
{
String file_path = path + file_name;
return disk->readFile(file_path);
}
std::unique_ptr<WriteBuffer> BackupInDirectory::addFileImpl(const String & file_name)
{
String file_path = path + file_name;
disk->createDirectories(fs::path(file_path).parent_path());
return disk->writeFile(file_path);
}
void registerBackupEngineFile(BackupFactory & factory)
{
auto creator_fn = [](const BackupFactory::CreateParams & params)
{
if (!finalized && directory_was_created)
String backup_name = params.backup_info.toString();
const String & engine_name = params.backup_info.backup_engine_name;
const auto & args = params.backup_info.args;
DiskPtr disk;
String path;
if (engine_name == "File")
{
/// Creation of the backup wasn't finished correctly,
/// so the backup cannot be used and it's better to remove its files.
disk->removeRecursive(path);
}
}
}
void BackupInDirectory::writePathToBaseBackup()
{
String file_path = path_with_sep + ".base_backup";
if (!base_backup)
{
disk->removeFileIfExists(file_path);
return;
}
auto out = disk->writeFile(file_path);
writeString(base_backup->getPath(), *out);
}
void BackupInDirectory::readPathToBaseBackup()
{
if (base_backup)
return;
String file_path = path_with_sep + ".base_backup";
if (!disk->exists(file_path))
return;
auto in = disk->readFile(file_path);
String base_backup_path;
readStringUntilEOF(base_backup_path, *in);
if (base_backup_path.empty())
return;
base_backup = BackupFactory::instance().openBackup(base_backup_path);
}
void BackupInDirectory::writeContents()
{
auto out = disk->writeFile(path_with_sep + ".contents");
writeVarUInt(BACKUP_VERSION, *out);
writeVarUInt(infos.size(), *out);
for (const auto & [path_in_backup, info] : infos)
{
writeBinary(path_in_backup, *out);
writeVarUInt(info.size, *out);
if (info.size)
{
writeBinary(info.checksum, *out);
writeVarUInt(info.base_size, *out);
if (info.base_size && (info.base_size != info.size))
writeBinary(info.base_checksum, *out);
}
}
}
void BackupInDirectory::readContents()
{
auto in = disk->readFile(path_with_sep + ".contents");
UInt64 version;
readVarUInt(version, *in);
if (version != BACKUP_VERSION)
throw Exception(ErrorCodes::BACKUP_VERSION_NOT_SUPPORTED, "Backup {}: Version {} is not supported", quoteString(path), version);
size_t num_infos;
readVarUInt(num_infos, *in);
infos.clear();
for (size_t i = 0; i != num_infos; ++i)
{
String path_in_backup;
readBinary(path_in_backup, *in);
EntryInfo info;
readVarUInt(info.size, *in);
if (info.size)
{
readBinary(info.checksum, *in);
readVarUInt(info.base_size, *in);
if (info.base_size && (info.base_size != info.size))
readBinary(info.base_checksum, *in);
else if (info.base_size)
info.base_checksum = info.checksum;
}
infos.emplace(path_in_backup, info);
}
}
IBackup::OpenMode BackupInDirectory::getOpenMode() const
{
return open_mode;
}
String BackupInDirectory::getPath() const
{
return path;
}
Strings BackupInDirectory::list(const String & prefix, const String & terminator) const
{
if (!prefix.ends_with('/') && !prefix.empty())
throw Exception("prefix should end with '/'", ErrorCodes::BAD_ARGUMENTS);
std::lock_guard lock{mutex};
Strings elements;
for (auto it = infos.lower_bound(prefix); it != infos.end(); ++it)
{
const String & name = it->first;
if (!name.starts_with(prefix))
break;
size_t start_pos = prefix.length();
size_t end_pos = String::npos;
if (!terminator.empty())
end_pos = name.find(terminator, start_pos);
std::string_view new_element = std::string_view{name}.substr(start_pos, end_pos - start_pos);
if (!elements.empty() && (elements.back() == new_element))
continue;
elements.push_back(String{new_element});
}
return elements;
}
bool BackupInDirectory::exists(const String & name) const
{
std::lock_guard lock{mutex};
return infos.count(name) != 0;
}
size_t BackupInDirectory::getSize(const String & name) const
{
std::lock_guard lock{mutex};
auto it = infos.find(name);
if (it == infos.end())
throw Exception(
ErrorCodes::BACKUP_ENTRY_NOT_FOUND, "Backup {}: Entry {} not found in the backup", quoteString(path), quoteString(name));
return it->second.size;
}
UInt128 BackupInDirectory::getChecksum(const String & name) const
{
std::lock_guard lock{mutex};
auto it = infos.find(name);
if (it == infos.end())
throw Exception(
ErrorCodes::BACKUP_ENTRY_NOT_FOUND, "Backup {}: Entry {} not found in the backup", quoteString(path), quoteString(name));
return it->second.checksum;
}
BackupEntryPtr BackupInDirectory::read(const String & name) const
{
std::lock_guard lock{mutex};
auto it = infos.find(name);
if (it == infos.end())
throw Exception(
ErrorCodes::BACKUP_ENTRY_NOT_FOUND, "Backup {}: Entry {} not found in the backup", quoteString(path), quoteString(name));
const auto & info = it->second;
if (!info.size)
{
/// Entry's data is empty.
return std::make_unique<BackupEntryFromMemory>(nullptr, 0, UInt128{0, 0});
}
if (!info.base_size)
{
/// Data goes completely from this backup, the base backup isn't used.
return std::make_unique<BackupEntryFromImmutableFile>(disk, path_with_sep + name, info.size, info.checksum);
}
if (info.size < info.base_size)
{
throw Exception(
ErrorCodes::BACKUP_DAMAGED,
"Backup {}: Entry {} has its data size less than in the base backup {}: {} < {}",
quoteString(path), quoteString(name), quoteString(base_backup->getPath()), info.size, info.base_size);
}
if (!base_backup)
{
throw Exception(
ErrorCodes::NO_BASE_BACKUP,
"Backup {}: Entry {} is marked to be read from a base backup, but there is no base backup specified",
quoteString(path), quoteString(name));
}
if (!base_backup->exists(name))
{
throw Exception(
ErrorCodes::WRONG_BASE_BACKUP,
"Backup {}: Entry {} is marked to be read from a base backup, but doesn't exist there",
quoteString(path), quoteString(name));
}
auto base_entry = base_backup->read(name);
auto base_size = base_entry->getSize();
if (base_size != info.base_size)
{
throw Exception(
ErrorCodes::WRONG_BASE_BACKUP,
"Backup {}: Entry {} has unexpected size in the base backup {}: {} (expected size: {})",
quoteString(path), quoteString(name), quoteString(base_backup->getPath()), base_size, info.base_size);
}
auto base_checksum = base_entry->getChecksum();
if (base_checksum && (*base_checksum != info.base_checksum))
{
throw Exception(
ErrorCodes::WRONG_BASE_BACKUP,
"Backup {}: Entry {} has unexpected checksum in the base backup {}",
quoteString(path), quoteString(name), quoteString(base_backup->getPath()));
}
if (info.size == info.base_size)
{
/// Data goes completely from the base backup (nothing goes from this backup).
return base_entry;
}
/// The beginning of the data goes from the base backup,
/// and the ending goes from this backup.
return std::make_unique<BackupEntryConcat>(
std::move(base_entry),
std::make_unique<BackupEntryFromImmutableFile>(disk, path_with_sep + name, info.size - info.base_size),
info.checksum);
}
void BackupInDirectory::write(const String & name, BackupEntryPtr entry)
{
std::lock_guard lock{mutex};
if (open_mode != OpenMode::WRITE)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Illegal operation: Cannot write to a backup opened for reading");
if (infos.contains(name))
throw Exception(
ErrorCodes::BACKUP_ENTRY_ALREADY_EXISTS, "Backup {}: Entry {} already exists", quoteString(path), quoteString(name));
UInt64 size = entry->getSize();
std::optional<UInt128> checksum = entry->getChecksum();
/// Check if the entry's data is empty.
if (!size)
{
infos.emplace(name, EntryInfo{});
return;
}
/// Check if an entry with the same name exists in the base backup.
bool base_exists = (base_backup && base_backup->exists(name));
UInt64 base_size = 0;
UInt128 base_checksum{0, 0};
if (base_exists)
{
base_size = base_backup->getSize(name);
base_checksum = base_backup->getChecksum(name);
}
std::unique_ptr<ReadBuffer> read_buffer; /// We'll set that later.
UInt64 read_pos = 0; /// Current position in read_buffer.
/// Determine whether it's possible to receive this entry's data from the base backup completely or partly.
bool use_base = false;
if (base_exists && base_size)
{
if (size == base_size)
{
/// The size is the same, we need to compare checksums to find out
/// if the entry's data has not been changed since the base backup.
if (!checksum)
if (args.size() != 1)
{
read_buffer = entry->getReadBuffer();
HashingReadBuffer hashing_read_buffer{*read_buffer};
hashing_read_buffer.ignore(size);
read_pos = size;
checksum = hashing_read_buffer.getHash();
throw Exception(
"Backup engine 'File' requires 1 argument (path)",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
}
if (checksum == base_checksum)
use_base = true; /// The data has not been changed.
path = args[0].safeGet<String>();
if (!isPathAllowed(path, params.context->getConfigRef()))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Path {} is not allowed for backups", path);
}
else if (size > base_size)
else if (engine_name == "Disk")
{
/// The size has been increased, we need to calculate a partial checksum to find out
/// if the entry's data has been only appended since the base backup.
read_buffer = entry->getReadBuffer();
HashingReadBuffer hashing_read_buffer{*read_buffer};
hashing_read_buffer.ignore(base_size);
UInt128 partial_checksum = hashing_read_buffer.getHash();
read_pos = base_size;
if (!checksum)
if (args.size() != 2)
{
hashing_read_buffer.ignore(size - base_size);
checksum = hashing_read_buffer.getHash();
read_pos = size;
throw Exception(
"Backup engine 'Disk' requires 2 arguments (disk_name, path)",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
}
if (partial_checksum == base_checksum)
use_base = true; /// The data has been appended.
}
}
if (use_base && (size == base_size))
{
/// The entry's data has not been changed since the base backup.
EntryInfo info;
info.size = base_size;
info.checksum = base_checksum;
info.base_size = base_size;
info.base_checksum = base_checksum;
infos.emplace(name, info);
return;
}
String disk_name = args[0].safeGet<String>();
disk = params.context->getDisk(disk_name);
path = args[1].safeGet<String>();
{
/// Either the entry didn't exist in the base backup
/// or the entry has data appended to the end of the data from the base backup.
/// In both those cases we have to copy data to this backup.
/// Find out where the start position to copy data is.
auto copy_pos = use_base ? base_size : 0;
/// Move the current read position to the start position to copy data.
/// If `read_buffer` is seekable it's easier, otherwise we can use ignore().
if ((read_pos > copy_pos) && !typeid_cast<SeekableReadBuffer *>(read_buffer.get()))
{
read_buffer.reset();
read_pos = 0;
if (!isDiskAllowed(disk_name, params.context->getConfigRef()))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Disk {} is not allowed for backups", disk_name);
}
if (!read_buffer)
read_buffer = entry->getReadBuffer();
return std::make_shared<BackupInDirectory>(backup_name, params.open_mode, disk, path, params.context, params.base_backup_info);
};
if (read_pos != copy_pos)
{
if (auto * seekable_buffer = typeid_cast<SeekableReadBuffer *>(read_buffer.get()))
seekable_buffer->seek(copy_pos, SEEK_SET);
else if (copy_pos)
read_buffer->ignore(copy_pos - read_pos);
}
/// If we haven't received or calculated a checksum yet, calculate it now.
ReadBuffer * maybe_hashing_read_buffer = read_buffer.get();
std::optional<HashingReadBuffer> hashing_read_buffer;
if (!checksum)
maybe_hashing_read_buffer = &hashing_read_buffer.emplace(*read_buffer);
/// Copy the entry's data after `copy_pos`.
String out_file_path = path_with_sep + name;
disk->createDirectories(directoryPath(out_file_path));
auto out = disk->writeFile(out_file_path);
copyData(*maybe_hashing_read_buffer, *out, size - copy_pos);
if (hashing_read_buffer)
checksum = hashing_read_buffer->getHash();
/// Done!
EntryInfo info;
info.size = size;
info.checksum = *checksum;
if (use_base)
{
info.base_size = base_size;
info.base_checksum = base_checksum;
}
infos.emplace(name, info);
}
}
void BackupInDirectory::finalizeWriting()
{
if (open_mode != OpenMode::WRITE)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Illegal operation: Cannot write to a backup opened for reading");
writeContents();
finalized = true;
factory.registerBackupEngine("File", creator_fn);
factory.registerBackupEngine("Disk", creator_fn);
}
}
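For reference, the two registered engines accept descriptors of the form File('/some/path/') and Disk('disk_name', 'path/'); the specific names here are illustrative, and the argument counts follow the checks in creator_fn above.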

View File

@ -1,8 +1,6 @@
#pragma once
#include <Backups/IBackup.h>
#include <map>
#include <mutex>
#include <Backups/BackupImpl.h>
namespace DB
@ -12,55 +10,29 @@ using DiskPtr = std::shared_ptr<IDisk>;
/// Represents a backup stored on a disk.
/// A backup is stored as a directory, each entry is stored as a file in that directory.
/// Also three system files are stored:
/// 1) ".base" is an XML file with information about the base backup.
/// 2) ".contents" is a binary file containing a list of all entries along with their sizes
/// and checksums and information whether the base backup should be used for each entry
/// 3) ".write_lock" is a temporary empty file which is created before writing of a backup
/// and deleted after finishing that writing.
class BackupInDirectory : public IBackup
class BackupInDirectory : public BackupImpl
{
public:
BackupInDirectory(OpenMode open_mode_, const DiskPtr & disk_, const String & path_, const std::shared_ptr<const IBackup> & base_backup_ = {});
/// `disk_` is allowed to be nullptr, which means `path_` is a path in the local filesystem.
BackupInDirectory(
const String & backup_name_,
OpenMode open_mode_,
const DiskPtr & disk_,
const String & path_,
const ContextPtr & context_,
const std::optional<BackupInfo> & base_backup_info_ = {});
~BackupInDirectory() override;
OpenMode getOpenMode() const override;
String getPath() const override;
Strings list(const String & prefix, const String & terminator) const override;
bool exists(const String & name) const override;
size_t getSize(const String & name) const override;
UInt128 getChecksum(const String & name) const override;
BackupEntryPtr read(const String & name) const override;
void write(const String & name, BackupEntryPtr entry) override;
void finalizeWriting() override;
private:
void open();
void close();
void writePathToBaseBackup();
void readPathToBaseBackup();
void writeContents();
void readContents();
bool backupExists() const override;
void startWriting() override;
void removeAllFilesAfterFailure() override;
std::unique_ptr<ReadBuffer> readFileImpl(const String & file_name) const override;
std::unique_ptr<WriteBuffer> addFileImpl(const String & file_name) override;
struct EntryInfo
{
UInt64 size = 0;
UInt128 checksum{0, 0};
/// for incremental backups
UInt64 base_size = 0;
UInt128 base_checksum{0, 0};
};
const OpenMode open_mode;
const DiskPtr disk;
DiskPtr disk;
String path;
String path_with_sep;
std::shared_ptr<const IBackup> base_backup;
std::map<String, EntryInfo> infos;
bool directory_was_created = false;
bool finalized = false;
mutable std::mutex mutex;
String dir_path; /// `path` without terminating slash
};
}

View File

@ -0,0 +1,70 @@
#include <Backups/BackupInfo.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ExpressionElementParsers.h>
#include <Parsers/formatAST.h>
#include <Parsers/parseQuery.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
String BackupInfo::toString() const
{
auto func = std::make_shared<ASTFunction>();
func->name = backup_engine_name;
func->no_empty_args = true;
auto list = std::make_shared<ASTExpressionList>();
func->arguments = list;
func->children.push_back(list);
list->children.reserve(args.size());
for (const auto & arg : args)
list->children.push_back(std::make_shared<ASTLiteral>(arg));
return serializeAST(*func);
}
BackupInfo BackupInfo::fromString(const String & str)
{
ParserIdentifierWithOptionalParameters parser;
ASTPtr ast = parseQuery(parser, str, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
return fromAST(*ast);
}
BackupInfo BackupInfo::fromAST(const IAST & ast)
{
const auto * func = ast.as<const ASTFunction>();
if (!func)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected function, got {}", serializeAST(ast));
BackupInfo res;
res.backup_engine_name = func->name;
if (func->arguments)
{
const auto * list = func->arguments->as<const ASTExpressionList>();
if (!list)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected list, got {}", serializeAST(*func->arguments));
res.args.reserve(list->children.size());
for (const auto & elem : list->children)
{
const auto * lit = elem->as<const ASTLiteral>();
if (!lit)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected literal, got {}", serializeAST(*elem));
res.args.push_back(lit->value);
}
}
return res;
}
}
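A round-trip sketch of the two functions above; the engine name and arguments are illustrative.
BackupInfo info = BackupInfo::fromString("Disk('backups_disk', 'b1/')");
/// info.backup_engine_name == "Disk"; info.args == { "backups_disk", "b1/" }
String str = info.toString();  /// serializes back to "Disk('backups_disk', 'b1/')"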

src/Backups/BackupInfo.h Normal file
View File

@ -0,0 +1,21 @@
#pragma once
#include <Core/Field.h>
namespace DB
{
class IAST;
/// Information about a backup.
struct BackupInfo
{
String backup_engine_name;
std::vector<Field> args;
String toString() const;
static BackupInfo fromString(const String & str);
static BackupInfo fromAST(const IAST & ast);
};
}

View File

@ -7,7 +7,7 @@ namespace DB
{
#define LIST_OF_BACKUP_SETTINGS(M) \
M(String, base_backup, "", "Name of the base backup. Only differences made after the base backup will be included in a newly created backup, so this option allows to make an incremental backup.", 0) \
M(Bool, dummy, false, "", 0) \
DECLARE_SETTINGS_TRAITS_ALLOW_CUSTOM_SETTINGS(BackupSettingsTraits, LIST_OF_BACKUP_SETTINGS)

View File

@ -18,6 +18,8 @@
#include <boost/range/adaptor/reversed.hpp>
#include <filesystem>
namespace fs = std::filesystem;
namespace DB
{
@ -426,7 +428,7 @@ namespace
ASTPtr readCreateQueryFromBackup(const DatabaseAndTableName & table_name, const BackupPtr & backup)
{
String create_query_path = getMetadataPathInBackup(table_name);
auto read_buffer = backup->read(create_query_path)->getReadBuffer();
auto read_buffer = backup->readFile(create_query_path)->getReadBuffer();
String create_query_str;
readStringUntilEOF(create_query_str, *read_buffer);
read_buffer.reset();
@ -437,7 +439,7 @@ namespace
ASTPtr readCreateQueryFromBackup(const String & database_name, const BackupPtr & backup)
{
String create_query_path = getMetadataPathInBackup(database_name);
auto read_buffer = backup->read(create_query_path)->getReadBuffer();
auto read_buffer = backup->readFile(create_query_path)->getReadBuffer();
String create_query_str;
readStringUntilEOF(create_query_str, *read_buffer);
read_buffer.reset();
@ -546,9 +548,10 @@ namespace
}
RestoreObjectsTasks restore_objects_tasks;
Strings table_names = backup->list("metadata/" + escapeForFileName(database_name) + "/", "/");
for (const String & table_name : table_names)
Strings table_metadata_filenames = backup->listFiles("metadata/" + escapeForFileName(database_name) + "/", "/");
for (const String & table_metadata_filename : table_metadata_filenames)
{
String table_name = unescapeForFileName(fs::path{table_metadata_filename}.stem());
if (except_list.contains(table_name))
continue;
restoreTable({database_name, table_name}, {}, context, backup, renaming_config, restore_objects_tasks);
@ -565,10 +568,11 @@ namespace
{
restore_tasks.emplace_back([except_list, context, backup, renaming_config]() -> RestoreDataTasks
{
Strings database_names = backup->list("metadata/", "/");
RestoreObjectsTasks restore_objects_tasks;
for (const String & database_name : database_names)
Strings database_metadata_filenames = backup->listFiles("metadata/", "/");
for (const String & database_metadata_filename : database_metadata_filenames)
{
String database_name = unescapeForFileName(fs::path{database_metadata_filename}.stem());
if (except_list.contains(database_name))
continue;
restoreDatabase(database_name, {}, context, backup, renaming_config, restore_objects_tasks);
@ -650,10 +654,10 @@ UInt64 estimateBackupSize(const BackupEntries & backup_entries, const BackupPtr
UInt64 data_size = entry->getSize();
if (base_backup)
{
if (base_backup->exists(name) && (data_size == base_backup->getSize(name)))
if (base_backup->fileExists(name) && (data_size == base_backup->getFileSize(name)))
{
auto checksum = entry->getChecksum();
if (checksum && (*checksum == base_backup->getChecksum(name)))
if (checksum && (*checksum == base_backup->getFileChecksum(name)))
continue;
}
}
@ -664,7 +668,7 @@ UInt64 estimateBackupSize(const BackupEntries & backup_entries, const BackupPtr
void writeBackupEntries(BackupMutablePtr backup, BackupEntries && backup_entries, size_t num_threads)
{
if (!num_threads)
if (!num_threads || !backup->supportsWritingInMultipleThreads())
num_threads = 1;
std::vector<ThreadFromGlobalPool> threads;
size_t num_active_threads = 0;
@ -691,7 +695,7 @@ void writeBackupEntries(BackupMutablePtr backup, BackupEntries && backup_entries
{
try
{
backup->write(name, std::move(entry));
backup->addFile(name, std::move(entry));
}
catch (...)
{
@ -747,7 +751,6 @@ RestoreObjectsTasks makeRestoreTasks(const Elements & elements, ContextMutablePt
case ElementType::DATABASE:
{
const String & database_name = element.name.first;
auto database = DatabaseCatalog::instance().getDatabase(database_name, context);
restoreDatabase(database_name, element.except_list, context, backup, renaming_config, restore_tasks);
break;
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <Core/Types.h>
#include <Common/TypePromotion.h>
#include <memory>
@ -12,11 +13,15 @@ using BackupEntryPtr = std::unique_ptr<IBackupEntry>;
/// Represents a backup, i.e. a storage of BackupEntries which can be accessed by their names.
/// A backup can be either incremental or non-incremental. An incremental backup doesn't store
/// the data of the entries which are not changed compared to its base backup.
class IBackup
class IBackup : public std::enable_shared_from_this<IBackup>, public TypePromotion<IBackup>
{
public:
IBackup() {}
virtual ~IBackup() = default;
/// Name of the backup.
virtual const String & getName() const = 0;
enum class OpenMode
{
READ,
@ -26,8 +31,11 @@ public:
/// A backup can be open either in READ or WRITE mode.
virtual OpenMode getOpenMode() const = 0;
/// Returns the path to the backup.
virtual String getPath() const = 0;
/// Returns the time point when this backup was created.
virtual time_t getTimestamp() const = 0;
/// Returns UUID of the backup.
virtual UUID getUUID() const = 0;
/// Returns names of entries stored in the backup.
/// If `prefix` isn't empty the function will return only the names starting with
@ -36,24 +44,27 @@ public:
/// before the terminator. For example, listFiles("", "") returns the names of all the entries
/// in the backup; and listFiles("data/", "/") returns something like a list of folders and
/// files stored in the "data/" directory inside the backup.
virtual Strings list(const String & prefix = "", const String & terminator = "/") const = 0;
virtual Strings listFiles(const String & prefix = "", const String & terminator = "/") const = 0;
/// Checks if an entry with a specified name exists.
virtual bool exists(const String & name) const = 0;
virtual bool fileExists(const String & file_name) const = 0;
/// Returns the size of the entry's data.
/// This function does the same as `read(name)->getSize()` but faster.
virtual size_t getSize(const String & name) const = 0;
/// This function does the same as `readFile(file_name)->getSize()` but faster.
virtual size_t getFileSize(const String & file_name) const = 0;
/// Returns the checksum of the entry's data.
/// This function does the same as `read(name)->getChecksum()` but faster.
virtual UInt128 getChecksum(const String & name) const = 0;
/// This function does the same as `readFile(file_name)->getChecksum()` but faster.
virtual UInt128 getFileChecksum(const String & file_name) const = 0;
/// Reads an entry from the backup.
virtual BackupEntryPtr read(const String & name) const = 0;
virtual BackupEntryPtr readFile(const String & file_name) const = 0;
/// Puts a new entry to the backup.
virtual void write(const String & name, BackupEntryPtr entry) = 0;
virtual void addFile(const String & file_name, BackupEntryPtr entry) = 0;
/// Whether it's possible to add new entries to the backup in multiple threads.
virtual bool supportsWritingInMultipleThreads() const { return true; }
/// Finalizes writing the backup, should be called after all entries have been successfully written.
virtual void finalizeWriting() = 0;

View File

@ -0,0 +1,14 @@
namespace DB
{
class BackupFactory;
void registerBackupEngineFile(BackupFactory &);
void registerBackupEngines(BackupFactory & factory)
{
registerBackupEngineFile(factory);
}
}

View File

@ -21,6 +21,7 @@
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ASTSubquery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ASTSelectIntersectExceptQuery.h>
#include <Parsers/ASTUseQuery.h>
#include <Parsers/ASTWindowDefinition.h>
#include <Parsers/ParserQuery.h>
@ -447,6 +448,11 @@ void QueryFuzzer::fuzz(ASTPtr & ast)
{
fuzz(with_union->list_of_selects);
}
else if (auto * with_intersect_except = typeid_cast<ASTSelectIntersectExceptQuery *>(ast.get()))
{
auto selects = with_intersect_except->getListOfSelects();
fuzz(selects);
}
else if (auto * tables = typeid_cast<ASTTablesInSelectQuery *>(ast.get()))
{
fuzz(tables->children);

View File

@ -594,6 +594,7 @@
M(624, BAD_FILE_TYPE) \
M(625, IO_SETUP_ERROR) \
M(626, CANNOT_SKIP_UNKNOWN_FIELD) \
M(627, BACKUP_ENGINE_NOT_FOUND) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

View File

@ -116,3 +116,26 @@ inline UInt16 unhex4(const char * data)
+ static_cast<UInt16>(unhex(data[2])) * 0x10
+ static_cast<UInt16>(unhex(data[3]));
}
template <typename TUInt>
TUInt unhexUInt(const char * data)
{
TUInt res = 0;
if constexpr ((sizeof(TUInt) <= 8) || ((sizeof(TUInt) % 8) != 0))
{
for (size_t i = 0; i < sizeof(TUInt) * 2; ++i, ++data)
{
res <<= 4;
res += unhex(*data);
}
}
else
{
for (size_t i = 0; i < sizeof(TUInt) / 8; ++i, data += 16)
{
res <<= 64;
res += unhexUInt<UInt64>(data);
}
}
return res;
}
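A small usage sketch: this template is what unhexChecksum() in BackupImpl.cpp relies on to parse a 32-character hex string into a UInt128 (the literal is illustrative).
UInt128 checksum = unhexUInt<UInt128>("000102030405060708090a0b0c0d0e0f");
/// For UInt128 (16 bytes), the else-branch assembles the value 64 bits at a time.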

View File

@ -508,7 +508,7 @@ class IColumn;
M(ShortCircuitFunctionEvaluation, short_circuit_function_evaluation, ShortCircuitFunctionEvaluation::ENABLE, "Setting for short-circuit function evaluation configuration. Possible values: 'enable' - use short-circuit function evaluation for functions that are suitable for it, 'disable' - disable short-circuit function evaluation, 'force_enable' - use short-circuit function evaluation for all functions.", 0) \
\
M(String, local_filesystem_read_method, "pread", "Method of reading data from local filesystem, one of: read, pread, mmap, pread_threadpool.", 0) \
M(String, remote_filesystem_read_method, "read", "Method of reading data from remote filesystem, one of: read, read_threadpool.", 0) \
M(String, remote_filesystem_read_method, "read", "Method of reading data from remote filesystem, one of: read, threadpool.", 0) \
M(Bool, local_filesystem_read_prefetch, false, "Should use prefetching when reading data from local filesystem.", 0) \
M(Bool, remote_filesystem_read_prefetch, true, "Should use prefetching when reading data from remote filesystem.", 0) \
M(Int64, read_priority, 0, "Priority to read data from local filesystem. Only supported for 'pread_threadpool' method.", 0) \

View File

@ -166,7 +166,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskWebServer::readFile(const String & p
RemoteMetadata meta(path, remote_path);
meta.remote_fs_objects.emplace_back(std::make_pair(remote_path, iter->second.size));
bool threadpool_read = read_settings.remote_fs_method == RemoteFSReadMethod::read_threadpool;
bool threadpool_read = read_settings.remote_fs_method == RemoteFSReadMethod::threadpool;
auto web_impl = std::make_unique<ReadBufferFromWebServerGather>(path, url, meta, getContext(), threadpool_read, read_settings);

View File

@ -77,7 +77,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskHDFS::readFile(const String & path,
auto hdfs_impl = std::make_unique<ReadBufferFromHDFSGather>(path, config, remote_fs_root_path, metadata, read_settings.remote_fs_buffer_size);
if (read_settings.remote_fs_method == RemoteFSReadMethod::read_threadpool)
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{
auto reader = getThreadPoolReader();
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(reader, read_settings, std::move(hdfs_impl));

View File

@ -230,7 +230,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, co
LOG_TRACE(log, "Read from file by path: {}. Existing S3 objects: {}",
backQuote(metadata_path + path), metadata.remote_fs_objects.size());
bool threadpool_read = read_settings.remote_fs_method == RemoteFSReadMethod::read_threadpool;
bool threadpool_read = read_settings.remote_fs_method == RemoteFSReadMethod::threadpool;
auto s3_impl = std::make_unique<ReadBufferFromS3Gather>(
path,

View File

@ -170,7 +170,7 @@ std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
req.SetKey(key);
/**
* If remote_filesystem_read_method = 'read_threadpool', then for MergeTree family tables
* If remote_filesystem_read_method = 'threadpool', then for MergeTree family tables
* exact byte ranges to read are always passed here.
*/
if (read_until_position)

View File

@ -46,7 +46,7 @@ enum class LocalFSReadMethod
enum class RemoteFSReadMethod
{
read,
read_threadpool,
threadpool,
};
class MMappedFileCache;

View File

@ -8,6 +8,7 @@
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSubquery.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTSelectIntersectExceptQuery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTCreateQuery.h>
@ -86,7 +87,12 @@ private:
void visit(ASTSelectWithUnionQuery & select, ASTPtr &) const
{
for (auto & child : select.list_of_selects->children)
tryVisit<ASTSelectQuery>(child);
{
if (child->as<ASTSelectQuery>())
tryVisit<ASTSelectQuery>(child);
else if (child->as<ASTSelectIntersectExceptQuery>())
tryVisit<ASTSelectIntersectExceptQuery>(child);
}
}
void visit(ASTSelectQuery & select, ASTPtr &) const
@ -97,6 +103,19 @@ private:
visitChildren(select);
}
void visit(ASTSelectIntersectExceptQuery & select, ASTPtr &) const
{
for (auto & child : select.getListOfSelects())
{
if (child->as<ASTSelectQuery>())
tryVisit<ASTSelectQuery>(child);
else if (child->as<ASTSelectIntersectExceptQuery>())
tryVisit<ASTSelectIntersectExceptQuery>(child);
else if (child->as<ASTSelectWithUnionQuery>())
tryVisit<ASTSelectWithUnionQuery>(child);
}
}
void visit(ASTTablesInSelectQuery & tables, ASTPtr &) const
{
for (auto & child : tables.children)

View File

@ -1,6 +1,7 @@
#include <Interpreters/ApplyWithGlobalVisitor.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTSelectIntersectExceptQuery.h>
#include <Parsers/ASTWithAlias.h>
namespace DB
@ -40,6 +41,31 @@ void ApplyWithGlobalVisitor::visit(
{
visit(*node_select, exprs, with_expression_list);
}
else if (ASTSelectIntersectExceptQuery * node_intersect_except = select->as<ASTSelectIntersectExceptQuery>())
{
visit(*node_intersect_except, exprs, with_expression_list);
}
}
}
void ApplyWithGlobalVisitor::visit(
ASTSelectIntersectExceptQuery & selects, const std::map<String, ASTPtr> & exprs, const ASTPtr & with_expression_list)
{
auto selects_list = selects.getListOfSelects();
for (auto & select : selects_list)
{
if (ASTSelectWithUnionQuery * node_union = select->as<ASTSelectWithUnionQuery>())
{
visit(*node_union, exprs, with_expression_list);
}
else if (ASTSelectQuery * node_select = select->as<ASTSelectQuery>())
{
visit(*node_select, exprs, with_expression_list);
}
else if (ASTSelectIntersectExceptQuery * node_intersect_except = select->as<ASTSelectIntersectExceptQuery>())
{
visit(*node_intersect_except, exprs, with_expression_list);
}
}
}
@ -47,7 +73,7 @@ void ApplyWithGlobalVisitor::visit(ASTPtr & ast)
{
if (ASTSelectWithUnionQuery * node_union = ast->as<ASTSelectWithUnionQuery>())
{
if (auto * first_select = node_union->list_of_selects->children[0]->as<ASTSelectQuery>())
if (auto * first_select = typeid_cast<ASTSelectQuery *>(node_union->list_of_selects->children[0].get()))
{
ASTPtr with_expression_list = first_select->with();
if (with_expression_list)
@ -64,6 +90,8 @@ void ApplyWithGlobalVisitor::visit(ASTPtr & ast)
visit(*union_child, exprs, with_expression_list);
else if (auto * select_child = (*it)->as<ASTSelectQuery>())
visit(*select_child, exprs, with_expression_list);
else if (auto * intersect_except_child = (*it)->as<ASTSelectIntersectExceptQuery>())
visit(*intersect_except_child, exprs, with_expression_list);
}
}
}

View File

@ -8,6 +8,7 @@ namespace DB
class ASTSelectWithUnionQuery;
class ASTSelectQuery;
class ASTSelectIntersectExceptQuery;
/// Pull out the WITH statement from the first child of ASTSelectWithUnion query if any.
class ApplyWithGlobalVisitor
@ -18,6 +19,7 @@ public:
private:
static void visit(ASTSelectWithUnionQuery & selects, const std::map<String, ASTPtr> & exprs, const ASTPtr & with_expression_list);
static void visit(ASTSelectQuery & select, const std::map<String, ASTPtr> & exprs, const ASTPtr & with_expression_list);
static void visit(ASTSelectIntersectExceptQuery & select, const std::map<String, ASTPtr> & exprs, const ASTPtr & with_expression_list);
};
}

View File

@ -572,35 +572,6 @@ VolumePtr Context::setTemporaryStorage(const String & path, const String & polic
return shared->tmp_volume;
}
void Context::setBackupsVolume(const String & path, const String & policy_name)
{
std::lock_guard lock(shared->storage_policies_mutex);
if (policy_name.empty())
{
String path_with_separator = path;
if (!path_with_separator.ends_with('/'))
path_with_separator += '/';
auto disk = std::make_shared<DiskLocal>("_backups_default", path_with_separator, 0);
shared->backups_volume = std::make_shared<SingleDiskVolume>("_backups_default", disk, 0);
}
else
{
StoragePolicyPtr policy = getStoragePolicySelector(lock)->get(policy_name);
if (policy->getVolumes().size() != 1)
throw Exception("Policy " + policy_name + " is used for backups, such policy should have exactly one volume",
ErrorCodes::NO_ELEMENTS_IN_CONFIG);
shared->backups_volume = policy->getVolume(0);
}
BackupFactory::instance().setBackupsVolume(shared->backups_volume);
}
VolumePtr Context::getBackupsVolume() const
{
std::lock_guard lock(shared->storage_policies_mutex);
return shared->backups_volume;
}
void Context::setFlagsPath(const String & path)
{
auto lock = getLock();

View File

@ -354,9 +354,6 @@ public:
VolumePtr setTemporaryStorage(const String & path, const String & policy_name = "");
void setBackupsVolume(const String & path, const String & policy_name = "");
VolumePtr getBackupsVolume() const;
using ConfigurationPtr = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;
/// Global application configuration settings.

View File

@ -191,13 +191,25 @@ private:
ASTTableJoin * table_join = node.table_join->as<ASTTableJoin>();
if (table_join->locality != ASTTableJoin::Locality::Global)
{
if (auto & subquery = node.table_expression->as<ASTTableExpression>()->subquery)
if (auto * table = node.table_expression->as<ASTTableExpression>())
{
std::vector<ASTPtr> renamed;
NonGlobalTableVisitor::Data table_data(data.getContext(), data.checker, renamed, nullptr, table_join);
NonGlobalTableVisitor(table_data).visit(subquery);
if (!renamed.empty()) //-V547
data.renamed_tables.emplace_back(subquery, std::move(renamed));
if (auto & subquery = table->subquery)
{
std::vector<ASTPtr> renamed;
NonGlobalTableVisitor::Data table_data(data.getContext(), data.checker, renamed, nullptr, table_join);
NonGlobalTableVisitor(table_data).visit(subquery);
if (!renamed.empty()) //-V547
data.renamed_tables.emplace_back(subquery, std::move(renamed));
}
else if (table->database_and_table_name)
{
auto tb = node.table_expression;
std::vector<ASTPtr> renamed;
NonGlobalTableVisitor::Data table_data{data.getContext(), data.checker, renamed, nullptr, table_join};
NonGlobalTableVisitor(table_data).visit(tb);
if (!renamed.empty()) //-V547
data.renamed_tables.emplace_back(tb, std::move(renamed));
}
}
}
}

View File

@ -12,40 +12,39 @@ namespace DB
{
namespace
{
BackupSettings getBackupSettings(const ASTBackupQuery & query)
BackupMutablePtr createBackup(const ASTBackupQuery & query, const ContextPtr & context)
{
BackupSettings settings;
BackupFactory::CreateParams params;
params.open_mode = (query.kind == ASTBackupQuery::BACKUP) ? IBackup::OpenMode::WRITE : IBackup::OpenMode::READ;
params.context = context;
params.backup_info = BackupInfo::fromAST(*query.backup_name);
if (query.base_backup_name)
params.base_backup_info = BackupInfo::fromAST(*query.base_backup_name);
return BackupFactory::instance().createBackup(params);
}
#if 0
void getBackupSettings(const ASTBackupQuery & query, BackupSettings & settings, std::optional<BaseBackupInfo> & base_backup)
{
settings = {};
if (query.settings)
settings.applyChanges(query.settings->as<const ASTSetQuery &>().changes);
return settings;
}
BackupPtr getBaseBackup(const BackupSettings & settings)
{
const String & base_backup_name = settings.base_backup;
if (base_backup_name.empty())
return nullptr;
return BackupFactory::instance().openBackup(base_backup_name);
}
#endif
void executeBackup(const ASTBackupQuery & query, const ContextPtr & context)
{
auto settings = getBackupSettings(query);
auto base_backup = getBaseBackup(settings);
BackupMutablePtr backup = createBackup(query, context);
auto backup_entries = makeBackupEntries(query.elements, context);
UInt64 estimated_backup_size = estimateBackupSize(backup_entries, base_backup);
auto backup = BackupFactory::instance().createBackup(query.backup_name, estimated_backup_size, base_backup);
writeBackupEntries(backup, std::move(backup_entries), context->getSettingsRef().max_backup_threads);
}
void executeRestore(const ASTBackupQuery & query, ContextMutablePtr context)
{
auto settings = getBackupSettings(query);
auto base_backup = getBaseBackup(settings);
auto backup = BackupFactory::instance().openBackup(query.backup_name, base_backup);
BackupPtr backup = createBackup(query, context);
auto restore_tasks = makeRestoreTasks(query.elements, context, backup);
executeRestoreTasks(std::move(restore_tasks), context->getSettingsRef().max_backup_threads);
}

View File

@ -56,7 +56,7 @@ InterpreterSelectIntersectExceptQuery::InterpreterSelectIntersectExceptQuery(
ASTSelectIntersectExceptQuery * ast = query_ptr->as<ASTSelectIntersectExceptQuery>();
final_operator = ast->final_operator;
const auto & children = ast->children;
const auto & children = ast->getListOfSelects();
size_t num_children = children.size();
/// AST must have been changed by the visitor.

View File

@ -878,52 +878,44 @@ static bool hasWithTotalsInAnySubqueryInFromClause(const ASTSelectQuery & query)
return true;
/** NOTE You can also check that the table in the subquery is distributed, and that it only looks at one shard.
* In other cases, totals will be computed on the initiating server of the query, and it is not necessary to read the data to the end.
*/
if (auto query_table = extractTableExpression(query, 0))
{
if (const auto * ast_union = query_table->as<ASTSelectWithUnionQuery>())
{
for (const auto & elem : ast_union->list_of_selects->children)
/** NOTE
* 1. For ASTSelectWithUnionQuery, after normalization the height of the AST tree below each union child node is at most 2.
* 2. For ASTSelectIntersectExceptQuery, after normalization the tree can be arbitrarily deep when intersect or except
* nodes are present (each intersect/except adds a level), but the number of children of those nodes is always 2.
*/
std::function<bool(ASTPtr)> traverse_recursively = [&](ASTPtr child_ast) -> bool
{
/// After normalization for union child node the height of the AST tree is at most 2.
if (const auto * child_union = elem->as<ASTSelectWithUnionQuery>())
if (const auto * select_child = child_ast->as<ASTSelectQuery>())
{
for (const auto & child_elem : child_union->list_of_selects->children)
if (hasWithTotalsInAnySubqueryInFromClause(child_elem->as<ASTSelectQuery &>()))
if (hasWithTotalsInAnySubqueryInFromClause(*select_child))
return true;
}
else if (const auto * union_child = child_ast->as<ASTSelectWithUnionQuery>())
{
for (const auto & subchild : union_child->list_of_selects->children)
if (traverse_recursively(subchild))
return true;
}
/// After normalization in case there are intersect or except nodes, the height of
/// the AST tree can have any depth (each intersect/except adds a level), but the
/// number of children in those nodes is always 2.
else if (elem->as<ASTSelectIntersectExceptQuery>())
else if (const auto * intersect_child = child_ast->as<ASTSelectIntersectExceptQuery>())
{
std::function<bool(ASTPtr)> traverse_recursively = [&](ASTPtr child_ast) -> bool
{
if (const auto * child = child_ast->as <ASTSelectQuery>())
return hasWithTotalsInAnySubqueryInFromClause(child->as<ASTSelectQuery &>());
if (const auto * child = child_ast->as<ASTSelectWithUnionQuery>())
for (const auto & subchild : child->list_of_selects->children)
if (traverse_recursively(subchild))
return true;
if (const auto * child = child_ast->as<ASTSelectIntersectExceptQuery>())
for (const auto & subchild : child->children)
if (traverse_recursively(subchild))
return true;
return false;
};
if (traverse_recursively(elem))
return true;
auto selects = intersect_child->getListOfSelects();
for (const auto & subchild : selects)
if (traverse_recursively(subchild))
return true;
}
else
{
if (hasWithTotalsInAnySubqueryInFromClause(elem->as<ASTSelectQuery &>()))
return true;
}
}
return false;
};
for (const auto & elem : ast_union->list_of_selects->children)
if (traverse_recursively(elem))
return true;
}
}

View File

@ -86,7 +86,9 @@ InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
if (num_children == 1 && settings_limit_offset_needed)
{
const ASTPtr first_select_ast = ast->list_of_selects->children.at(0);
ASTSelectQuery * select_query = first_select_ast->as<ASTSelectQuery>();
ASTSelectQuery * select_query = dynamic_cast<ASTSelectQuery *>(first_select_ast.get());
if (!select_query)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Invalid type in list_of_selects: {}", first_select_ast->getID());
if (!select_query->withFill() && !select_query->limit_with_ties)
{

View File

@ -6,6 +6,7 @@
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTColumnsMatcher.h>
#include <Parsers/ASTQualifiedAsterisk.h>
#include <Parsers/ASTSelectIntersectExceptQuery.h>
#include <Interpreters/IdentifierSemantic.h>
#include <Interpreters/getTableExpressions.h>
#include <Interpreters/InterpreterSelectQuery.h>
@ -37,13 +38,44 @@ void PredicateRewriteVisitorData::visit(ASTSelectWithUnionQuery & union_select_q
for (size_t index = 0; index < internal_select_list.size(); ++index)
{
if (auto * child_union = internal_select_list[index]->as<ASTSelectWithUnionQuery>())
visit(*child_union, internal_select_list[index]);
else
{
if (index == 0)
visitFirstInternalSelect(*internal_select_list[0]->as<ASTSelectQuery>(), internal_select_list[0]);
else
visitOtherInternalSelect(*internal_select_list[index]->as<ASTSelectQuery>(), internal_select_list[index]);
visit(*child_union, internal_select_list[index]);
}
else if (auto * child_select = internal_select_list[index]->as<ASTSelectQuery>())
{
visitInternalSelect(index, *child_select, internal_select_list[index]);
}
else if (auto * child_intersect_except = internal_select_list[index]->as<ASTSelectIntersectExceptQuery>())
{
visit(*child_intersect_except, internal_select_list[index]);
}
}
}
void PredicateRewriteVisitorData::visitInternalSelect(size_t index, ASTSelectQuery & select_node, ASTPtr & node)
{
if (index == 0)
visitFirstInternalSelect(select_node, node);
else
visitOtherInternalSelect(select_node, node);
}
void PredicateRewriteVisitorData::visit(ASTSelectIntersectExceptQuery & intersect_except_query, ASTPtr &)
{
auto internal_select_list = intersect_except_query.getListOfSelects();
for (size_t index = 0; index < internal_select_list.size(); ++index)
{
if (auto * union_node = internal_select_list[index]->as<ASTSelectWithUnionQuery>())
{
visit(*union_node, internal_select_list[index]);
}
else if (auto * select_node = internal_select_list[index]->as<ASTSelectQuery>())
{
visitInternalSelect(index, *select_node, internal_select_list[index]);
}
else if (auto * intersect_node = internal_select_list[index]->as<ASTSelectIntersectExceptQuery>())
{
visit(*intersect_node, internal_select_list[index]);
}
}
}

View File

@ -10,6 +10,8 @@
namespace DB
{
class ASTSelectIntersectExceptQuery;
class PredicateRewriteVisitorData : WithContext
{
public:
@ -40,7 +42,11 @@ private:
void visitOtherInternalSelect(ASTSelectQuery & select_query, ASTPtr &);
void visit(ASTSelectIntersectExceptQuery & intersect_except_query, ASTPtr &);
bool rewriteSubquery(ASTSelectQuery & subquery, const Names & inner_columns);
void visitInternalSelect(size_t index, ASTSelectQuery & select_node, ASTPtr & node);
};
using PredicateRewriteMatcher = OneTypeMatcher<PredicateRewriteVisitorData, PredicateRewriteVisitorData::needChild>;

View File

@ -3,6 +3,7 @@
#include <Interpreters/Context.h>
#include <Interpreters/DatabaseAndTableWithAlias.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTSelectIntersectExceptQuery.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTKillQueryQuery.h>
#include <Parsers/queryNormalization.h>

View File

@ -94,10 +94,25 @@ namespace
}
}
void formatSettings(const IAST & settings, const IAST::FormatSettings & format)
void formatSettings(const ASTPtr & settings, const ASTPtr & base_backup_name, const IAST::FormatSettings & format)
{
if (!settings && !base_backup_name)
return;
format.ostr << (format.hilite ? IAST::hilite_keyword : "") << " SETTINGS " << (format.hilite ? IAST::hilite_none : "");
settings.format(format);
bool empty = true;
if (base_backup_name)
{
format.ostr << "base_backup = ";
base_backup_name->format(format);
empty = false;
}
if (settings)
{
if (!empty)
format.ostr << ", ";
settings->format(format);
}
}
}
@ -120,11 +135,11 @@ void ASTBackupQuery::formatImpl(const FormatSettings & format, FormatState &, Fo
formatElements(elements, kind, format);
if (settings)
formatSettings(*settings, format);
format.ostr << (format.hilite ? hilite_keyword : "") << ((kind == Kind::BACKUP) ? " TO " : " FROM ") << (format.hilite ? hilite_none : "");
backup_name->format(format);
format.ostr << (format.hilite ? hilite_keyword : "") << ((kind == Kind::BACKUP) ? " TO" : " FROM") << (format.hilite ? hilite_none : "");
format.ostr << " " << quoteString(backup_name);
if (settings || base_backup_name)
formatSettings(settings, base_backup_name, format);
}
}

View File

@ -16,8 +16,9 @@ using DatabaseAndTableName = std::pair<String, String>;
* TEMPORARY TABLE table_name [AS table_name_in_backup]
* ALL TEMPORARY TABLES |
* EVERYTHING } [,...]
* TO 'backup_name'
* SETTINGS base_backup='base_backup_name'
* TO { File('path/') |
*      Disk('disk_name', 'path/') }
* [SETTINGS base_backup = {File(...) | Disk(...)}]
*
* RESTORE { TABLE [db.]table_name_in_backup [INTO [db.]table_name] [PARTITION[S] partition_expr [,...]] |
* DICTIONARY [db.]dictionary_name_in_backup [INTO [db.]dictionary_name] |
@ -26,7 +27,7 @@ using DatabaseAndTableName = std::pair<String, String>;
* TEMPORARY TABLE table_name_in_backup [INTO table_name] |
* ALL TEMPORARY TABLES |
* EVERYTHING } [,...]
* FROM 'backup_name'
* FROM {File(...) | Disk(...)}
*
* Notes:
* RESTORE doesn't drop any data: it either creates a table or appends restored data to an existing table.
@ -76,7 +77,11 @@ public:
using Elements = std::vector<Element>;
Elements elements;
String backup_name;
ASTPtr backup_name;
/// Base backup. Only differences made after the base backup will be included in a newly created backup,
/// so this setting allows creating an incremental backup.
ASTPtr base_backup_name;
ASTPtr settings;

View File

@ -15,12 +15,10 @@ ASTPtr ASTSelectIntersectExceptQuery::clone() const
res->children.push_back(child->clone());
res->final_operator = final_operator;
cloneOutputOptions(*res);
return res;
}
void ASTSelectIntersectExceptQuery::formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
void ASTSelectIntersectExceptQuery::formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
{
std::string indent_str = settings.one_line ? "" : std::string(4 * frame.indent, ' ');
@ -38,4 +36,21 @@ void ASTSelectIntersectExceptQuery::formatQueryImpl(const FormatSettings & setti
}
}
ASTs ASTSelectIntersectExceptQuery::getListOfSelects() const
{
/**
* Because of normalization the actual number of selects is always 2
* (each intersect/except node has exactly two children); this invariant is checked in InterpreterSelectIntersectExceptQuery.
*/
ASTs selects;
for (const auto & child : children)
{
if (typeid_cast<ASTSelectQuery *>(child.get())
|| typeid_cast<ASTSelectWithUnionQuery *>(child.get())
|| typeid_cast<ASTSelectIntersectExceptQuery *>(child.get()))
selects.push_back(child);
}
return selects;
}
}

View File

@ -1,22 +1,18 @@
#pragma once
#include <Parsers/ASTQueryWithOutput.h>
#include <Parsers/ASTSelectQuery.h>
namespace DB
{
class ASTSelectIntersectExceptQuery : public ASTQueryWithOutput
class ASTSelectIntersectExceptQuery : public ASTSelectQuery
{
public:
String getID(char) const override { return "SelectIntersectExceptQuery"; }
ASTPtr clone() const override;
void formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
const char * getQueryKindString() const override { return "SelectIntersectExcept"; }
enum class Operator
{
UNKNOWN,
@ -24,6 +20,12 @@ public:
EXCEPT
};
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
const char * getQueryKindString() const override { return "SelectIntersectExcept"; }
ASTs getListOfSelects() const;
/// Final operator after applying visitor.
Operator final_operator = Operator::UNKNOWN;
};

View File

@ -148,18 +148,47 @@ namespace
});
}
bool parseSettings(IParser::Pos & pos, Expected & expected, ASTPtr & settings)
bool parseBackupName(IParser::Pos & pos, Expected & expected, ASTPtr & backup_name)
{
return ParserIdentifierWithOptionalParameters{}.parse(pos, backup_name, expected);
}
bool parseBaseBackupSetting(IParser::Pos & pos, Expected & expected, ASTPtr & base_backup_name)
{
return IParserBase::wrapParseImpl(pos, [&]
{
return ParserKeyword{"base_backup"}.ignore(pos, expected)
&& ParserToken(TokenType::Equals).ignore(pos, expected)
&& parseBackupName(pos, expected, base_backup_name);
});
}
bool parseSettings(IParser::Pos & pos, Expected & expected, ASTPtr & settings, ASTPtr & base_backup_name)
{
return IParserBase::wrapParseImpl(pos, [&]
{
if (!ParserKeyword{"SETTINGS"}.ignore(pos, expected))
return false;
ASTPtr result;
if (!ParserSetQuery{true}.parse(pos, result, expected))
ASTPtr res_settings;
ASTPtr res_base_backup_name;
auto parse_setting = [&]
{
if (!res_settings && ParserSetQuery{true}.parse(pos, res_settings, expected))
return true;
if (!res_base_backup_name && parseBaseBackupSetting(pos, expected, res_base_backup_name))
return true;
return false;
};
if (!ParserList::parseUtil(pos, expected, parse_setting, false))
return false;
settings = std::move(result);
settings = std::move(res_settings);
base_backup_name = std::move(res_base_backup_name);
return true;
});
}
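Concretely, the SETTINGS clause may now carry the special base_backup entry alongside ordinary settings, and the backup name itself is an engine expression rather than a string literal. The integration test further down exercises statements of the form:

    BACKUP TABLE test.table TO Disk('backups', '2/') SETTINGS base_backup = Disk('backups', '1/')
    RESTORE TABLE test.table FROM File('/backups/file/')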
@ -182,13 +211,14 @@ bool ParserBackupQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
if (!ParserKeyword{(kind == Kind::BACKUP) ? "TO" : "FROM"}.ignore(pos, expected))
return false;
ASTPtr ast;
if (!ParserStringLiteral{}.parse(pos, ast, expected))
ASTPtr backup_name;
if (!parseBackupName(pos, expected, backup_name))
return false;
String backup_name = ast->as<ASTLiteral &>().value.safeGet<String>();
ASTPtr settings;
parseSettings(pos, expected, settings);
ASTPtr base_backup_name;
parseSettings(pos, expected, settings, base_backup_name);
auto query = std::make_shared<ASTBackupQuery>();
node = query;
@ -196,6 +226,7 @@ bool ParserBackupQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
query->kind = kind;
query->elements = std::move(elements);
query->backup_name = std::move(backup_name);
query->base_backup_name = std::move(base_backup_name);
query->settings = std::move(settings);
return true;

View File

@ -13,8 +13,9 @@ namespace DB
* TEMPORARY TABLE table_name [AS table_name_in_backup]
* ALL TEMPORARY TABLES |
* EVERYTHING } [,...]
* TO 'backup_name'
* [SETTINGS base_backup = 'base_backup_name']
* TO { File('path/') |
*      Disk('disk_name', 'path/') }
* [SETTINGS base_backup = {File(...) | Disk(...)}]
*
* RESTORE { TABLE [db.]table_name_in_backup [INTO [db.]table_name] [PARTITION[S] partition_expr [,...]] |
* DICTIONARY [db.]dictionary_name_in_backup [INTO [db.]dictionary_name] |
@ -23,7 +24,7 @@ namespace DB
* TEMPORARY TABLE table_name_in_backup [INTO table_name] |
* ALL TEMPORARY TABLES |
* EVERYTHING } [,...]
* FROM 'backup_name'
* FROM {File(...) | Disk(...)}
*/
class ParserBackupQuery : public IParserBase
{

View File

@ -3558,7 +3558,7 @@ RestoreDataTasks MergeTreeData::restoreDataPartsFromBackup(const BackupPtr & bac
{
RestoreDataTasks restore_tasks;
Strings part_names = backup->list(data_path_in_backup);
Strings part_names = backup->listFiles(data_path_in_backup);
for (const String & part_name : part_names)
{
const auto part_info = MergeTreePartInfo::tryParsePartName(part_name, format_version);
@ -3570,9 +3570,9 @@ RestoreDataTasks MergeTreeData::restoreDataPartsFromBackup(const BackupPtr & bac
continue;
UInt64 total_size_of_part = 0;
Strings filenames = backup->list(data_path_in_backup + part_name + "/", "");
Strings filenames = backup->listFiles(data_path_in_backup + part_name + "/", "");
for (const String & filename : filenames)
total_size_of_part += backup->getSize(data_path_in_backup + part_name + "/" + filename);
total_size_of_part += backup->getFileSize(data_path_in_backup + part_name + "/" + filename);
std::shared_ptr<IReservation> reservation = getStoragePolicy()->reserveAndCheck(total_size_of_part);
@ -3596,7 +3596,7 @@ RestoreDataTasks MergeTreeData::restoreDataPartsFromBackup(const BackupPtr & bac
for (const String & filename : filenames)
{
auto backup_entry = backup->read(data_path_in_backup + part_name + "/" + filename);
auto backup_entry = backup->readFile(data_path_in_backup + part_name + "/" + filename);
auto read_buffer = backup_entry->getReadBuffer();
auto write_buffer = disk->writeFile(temp_part_dir + "/" + filename);
copyData(*read_buffer, *write_buffer);

View File

@ -978,7 +978,7 @@ RestoreDataTasks StorageLog::restoreFromBackup(const BackupPtr & backup, const S
for (const auto & data_file : data_files)
{
String file_path_in_backup = data_path_in_backup + fileName(data_file.path);
auto backup_entry = backup->read(file_path_in_backup);
auto backup_entry = backup->readFile(file_path_in_backup);
auto in = backup_entry->getReadBuffer();
auto out = disk->writeFile(data_file.path, max_compress_block_size, WriteMode::Append);
copyData(*in, *out);
@ -989,7 +989,7 @@ RestoreDataTasks StorageLog::restoreFromBackup(const BackupPtr & backup, const S
/// Append marks.
size_t num_extra_marks = 0;
String file_path_in_backup = data_path_in_backup + fileName(marks_file_path);
size_t file_size = backup->getSize(file_path_in_backup);
size_t file_size = backup->getFileSize(file_path_in_backup);
if (file_size % (num_data_files * sizeof(Mark)) != 0)
throw Exception("Size of marks file is inconsistent", ErrorCodes::SIZES_OF_MARKS_FILES_ARE_INCONSISTENT);
@ -1009,7 +1009,7 @@ RestoreDataTasks StorageLog::restoreFromBackup(const BackupPtr & backup, const S
old_num_rows[i] = num_marks ? data_files[i].marks[num_marks - 1].rows : 0;
}
auto backup_entry = backup->read(file_path_in_backup);
auto backup_entry = backup->readFile(file_path_in_backup);
auto marks_rb = backup_entry->getReadBuffer();
for (size_t i = 0; i != num_extra_marks; ++i)

View File

@ -576,18 +576,17 @@ RestoreDataTasks StorageStripeLog::restoreFromBackup(const BackupPtr & backup, c
auto old_data_size = file_checker.getFileSize(data_file_path);
{
String file_path_in_backup = data_path_in_backup + fileName(data_file_path);
auto backup_entry = backup->read(file_path_in_backup);
auto backup_entry = backup->readFile(file_path_in_backup);
auto in = backup_entry->getReadBuffer();
auto out = disk->writeFile(data_file_path, max_compress_block_size, WriteMode::Append);
copyData(*in, *out);
}
/// Append the index.
String index_path_in_backup = data_path_in_backup + fileName(index_file_path);
if (backup->exists(index_path_in_backup))
{
String index_path_in_backup = data_path_in_backup + fileName(index_file_path);
IndexForNativeFormat extra_indices;
auto backup_entry = backup->read(index_path_in_backup);
auto backup_entry = backup->readFile(index_path_in_backup);
auto index_in = backup_entry->getReadBuffer();
CompressedReadBuffer index_compressed_in{*index_in};
extra_indices.read(index_compressed_in);

View File

@ -765,7 +765,7 @@ class ClickHouseCluster:
with_kerberized_hdfs=False, with_mongo=False, with_mongo_secure=False, with_nginx=False,
with_redis=False, with_minio=False, with_cassandra=False, with_jdbc_bridge=False,
hostname=None, env_variables=None, image="clickhouse/integration-test", tag=None,
stay_alive=False, ipv4_address=None, ipv6_address=None, with_installed_binary=False, tmpfs=None,
stay_alive=False, ipv4_address=None, ipv6_address=None, with_installed_binary=False, external_dirs=None, tmpfs=None,
zookeeper_docker_compose_path=None, minio_certs_dir=None, use_keeper=True,
main_config_name="config.xml", users_config_name="users.xml", copy_common_configs=True, config_root_name="clickhouse"):
@ -839,6 +839,7 @@ class ClickHouseCluster:
main_config_name=main_config_name,
users_config_name=users_config_name,
copy_common_configs=copy_common_configs,
external_dirs=external_dirs,
tmpfs=tmpfs or [],
config_root_name=config_root_name)
@ -1795,6 +1796,7 @@ services:
{binary_volume}
{odbc_bridge_volume}
{library_bridge_volume}
{external_dirs_volumes}
{odbc_ini_path}
{keytab_path}
{krb5_conf}
@ -1837,7 +1839,7 @@ class ClickHouseInstance:
main_config_name="config.xml", users_config_name="users.xml", copy_common_configs=True,
hostname=None, env_variables=None,
image="clickhouse/integration-test", tag="latest",
stay_alive=False, ipv4_address=None, ipv6_address=None, with_installed_binary=False, tmpfs=None, config_root_name="clickhouse"):
stay_alive=False, ipv4_address=None, ipv6_address=None, with_installed_binary=False, external_dirs=None, tmpfs=None, config_root_name="clickhouse"):
self.name = name
self.base_cmd = cluster.base_cmd
@ -1845,6 +1847,7 @@ class ClickHouseInstance:
self.cluster = cluster
self.hostname = hostname if hostname is not None else self.name
self.external_dirs = external_dirs
self.tmpfs = tmpfs or []
self.base_config_dir = p.abspath(p.join(base_path, base_config_dir)) if base_config_dir else None
self.custom_main_config_paths = [p.abspath(p.join(base_path, c)) for c in custom_main_configs]
@ -2574,6 +2577,14 @@ class ClickHouseInstance:
odbc_bridge_volume = "- " + self.odbc_bridge_bin_path + ":/usr/share/clickhouse-odbc-bridge_fresh"
library_bridge_volume = "- " + self.library_bridge_bin_path + ":/usr/share/clickhouse-library-bridge_fresh"
external_dirs_volumes = ""
if self.external_dirs:
for external_dir in self.external_dirs:
external_dir_abs_path = p.abspath(p.join(self.path, external_dir.lstrip('/')))
logging.info(f'external_dir_abs_path={external_dir_abs_path}')
os.makedirs(external_dir_abs_path, exist_ok=True)
external_dirs_volumes += "- " + external_dir_abs_path + ":" + external_dir + "\n"
with open(self.docker_compose_path, 'w') as docker_compose:
docker_compose.write(DOCKER_COMPOSE_TEMPLATE.format(
@ -2587,6 +2598,7 @@ class ClickHouseInstance:
instance_config_dir=instance_config_dir,
config_d_dir=self.config_d_dir,
db_dir=db_dir,
external_dirs_volumes=external_dirs_volumes,
tmpfs=str(self.tmpfs),
logs_dir=logs_dir,
depends_on=str(depends_on),

View File

@ -0,0 +1,15 @@
<?xml version="1.0"?>
<clickhouse>
<storage_configuration>
<disks>
<backups>
<type>local</type>
<path>/backups/</path>
</backups>
</disks>
</storage_configuration>
<backups>
<allowed_disk>backups</allowed_disk>
<allowed_path>/backups/</allowed_path>
</backups>
</clickhouse>

View File

@ -3,8 +3,7 @@ import re
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance')
instance = cluster.add_instance('instance', main_configs=["configs/backups_disk.xml"], external_dirs=["/backups/"])
def create_and_fill_table(engine="MergeTree"):
if engine == "MergeTree":
@ -35,8 +34,7 @@ backup_id_counter = 0
def new_backup_name():
global backup_id_counter
backup_id_counter += 1
return f"test-backup-{backup_id_counter}"
return f"Disk('backups', '{backup_id_counter}/')"
@pytest.mark.parametrize("engine", ["MergeTree", "Log", "TinyLog", "StripeLog"])
@ -45,12 +43,12 @@ def test_restore_table(engine):
create_and_fill_table(engine=engine)
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
instance.query(f"BACKUP TABLE test.table TO '{backup_name}'")
instance.query(f"BACKUP TABLE test.table TO {backup_name}")
instance.query("DROP TABLE test.table")
assert instance.query("EXISTS test.table") == "0\n"
instance.query(f"RESTORE TABLE test.table FROM '{backup_name}'")
instance.query(f"RESTORE TABLE test.table FROM {backup_name}")
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
@ -60,12 +58,12 @@ def test_restore_table_into_existing_table(engine):
create_and_fill_table(engine=engine)
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
instance.query(f"BACKUP TABLE test.table TO '{backup_name}'")
instance.query(f"BACKUP TABLE test.table TO {backup_name}")
instance.query(f"RESTORE TABLE test.table INTO test.table FROM '{backup_name}'")
instance.query(f"RESTORE TABLE test.table INTO test.table FROM {backup_name}")
assert instance.query("SELECT count(), sum(x) FROM test.table") == "200\t9900\n"
instance.query(f"RESTORE TABLE test.table INTO test.table FROM '{backup_name}'")
instance.query(f"RESTORE TABLE test.table INTO test.table FROM {backup_name}")
assert instance.query("SELECT count(), sum(x) FROM test.table") == "300\t14850\n"
@ -74,11 +72,11 @@ def test_restore_table_under_another_name():
create_and_fill_table()
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
instance.query(f"BACKUP TABLE test.table TO '{backup_name}'")
instance.query(f"BACKUP TABLE test.table TO {backup_name}")
assert instance.query("EXISTS test.table2") == "0\n"
instance.query(f"RESTORE TABLE test.table INTO test.table2 FROM '{backup_name}'")
instance.query(f"RESTORE TABLE test.table INTO test.table2 FROM {backup_name}")
assert instance.query("SELECT count(), sum(x) FROM test.table2") == "100\t4950\n"
@ -87,11 +85,11 @@ def test_backup_table_under_another_name():
create_and_fill_table()
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
instance.query(f"BACKUP TABLE test.table AS test.table2 TO '{backup_name}'")
instance.query(f"BACKUP TABLE test.table AS test.table2 TO {backup_name}")
assert instance.query("EXISTS test.table2") == "0\n"
instance.query(f"RESTORE TABLE test.table2 FROM '{backup_name}'")
instance.query(f"RESTORE TABLE test.table2 FROM {backup_name}")
assert instance.query("SELECT count(), sum(x) FROM test.table2") == "100\t4950\n"
@ -101,14 +99,14 @@ def test_incremental_backup():
create_and_fill_table()
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
instance.query(f"BACKUP TABLE test.table TO '{backup_name}'")
instance.query(f"BACKUP TABLE test.table TO {backup_name}")
instance.query("INSERT INTO test.table VALUES (65, 'a'), (66, 'b')")
assert instance.query("SELECT count(), sum(x) FROM test.table") == "102\t5081\n"
instance.query(f"BACKUP TABLE test.table TO '{incremental_backup_name}' SETTINGS base_backup = '{backup_name}'")
instance.query(f"BACKUP TABLE test.table TO {incremental_backup_name} SETTINGS base_backup = {backup_name}")
instance.query(f"RESTORE TABLE test.table AS test.table2 FROM '{incremental_backup_name}'")
instance.query(f"RESTORE TABLE test.table AS test.table2 FROM {incremental_backup_name}")
assert instance.query("SELECT count(), sum(x) FROM test.table2") == "102\t5081\n"
@ -116,10 +114,36 @@ def test_backup_not_found_or_already_exists():
backup_name = new_backup_name()
expected_error = "Backup .* not found"
assert re.search(expected_error, instance.query_and_get_error(f"RESTORE TABLE test.table AS test.table2 FROM '{backup_name}'"))
assert re.search(expected_error, instance.query_and_get_error(f"RESTORE TABLE test.table AS test.table2 FROM {backup_name}"))
create_and_fill_table()
instance.query(f"BACKUP TABLE test.table TO '{backup_name}'")
instance.query(f"BACKUP TABLE test.table TO {backup_name}")
expected_error = "Backup .* already exists"
assert re.search(expected_error, instance.query_and_get_error(f"BACKUP TABLE test.table TO '{backup_name}'"))
assert re.search(expected_error, instance.query_and_get_error(f"BACKUP TABLE test.table TO {backup_name}"))
def test_file_engine():
backup_name = f"File('/backups/file/')"
create_and_fill_table()
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
instance.query(f"BACKUP TABLE test.table TO {backup_name}")
instance.query("DROP TABLE test.table")
assert instance.query("EXISTS test.table") == "0\n"
instance.query(f"RESTORE TABLE test.table FROM {backup_name}")
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
def test_database():
backup_name = new_backup_name()
create_and_fill_table()
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
instance.query(f"BACKUP DATABASE test TO {backup_name}")
instance.query("DROP DATABASE test")
instance.query(f"RESTORE DATABASE test FROM {backup_name}")
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"

View File

@ -1,7 +1,7 @@
<yandex>
<profiles>
<default>
<remote_filesystem_read_method>read_threadpool</remote_filesystem_read_method>
<remote_filesystem_read_method>threadpool</remote_filesystem_read_method>
</default>
</profiles>
</yandex>

View File

@ -1,7 +1,7 @@
<yandex>
<profiles>
<default>
<remote_filesystem_read_method>read_threadpool</remote_filesystem_read_method>
<remote_filesystem_read_method>threadpool</remote_filesystem_read_method>
</default>
</profiles>
</yandex>

View File

@ -2,6 +2,7 @@ import os
import pytest
from helpers.cluster import ClickHouseCluster
from pyhdfs import HdfsClient
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', with_hdfs=True)
@ -238,11 +239,21 @@ def test_virtual_columns(started_cluster):
def test_read_files_with_spaces(started_cluster):
hdfs_api = started_cluster.hdfs_api
hdfs_api.write_data("/test test test 1.txt", "1\n")
hdfs_api.write_data("/test test test 2.txt", "2\n")
hdfs_api.write_data("/test test test 3.txt", "3\n")
node1.query("create table test (id UInt32) ENGINE = HDFS('hdfs://hdfs1:9000/test*', 'TSV')")
fs = HdfsClient(hosts=started_cluster.hdfs_ip)
dir = '/test_spaces'
exists = fs.exists(dir)
if exists:
fs.delete(dir, recursive=True)
fs.mkdirs(dir)
hdfs_api.write_data(f"{dir}/test test test 1.txt", "1\n")
hdfs_api.write_data(f"{dir}/test test test 2.txt", "2\n")
hdfs_api.write_data(f"{dir}/test test test 3.txt", "3\n")
node1.query(f"create table test (id UInt32) ENGINE = HDFS('hdfs://hdfs1:9000/{dir}/test*', 'TSV')")
assert node1.query("select * from test order by id") == "1\n2\n3\n"
fs.delete(dir, recursive=True)
def test_truncate_table(started_cluster):

View File

@ -42,7 +42,7 @@ GLOBAL ALL INNER JOIN
(
SELECT id
FROM t1_distr AS d1
ALL INNER JOIN t2_distr AS d2 ON id = d2.id
GLOBAL ALL INNER JOIN t2_distr AS d2 ON id = d2.id
WHERE id > 0
ORDER BY id ASC
) AS s0 USING (id)

View File

@ -134,3 +134,8 @@ UNION ALL
SELECT 1
EXCEPT
SELECT 4
set limit=1;
select 1 intersect select 1;
1
(((select 1) intersect select 1));
1

View File

@ -48,3 +48,7 @@ select 1 intersect select count() from (select 1 except select 2 intersect selec
explain syntax select 1 intersect select 1;
explain syntax select 1 except select 1;
explain syntax select 1 union all select 2 except (select 2 except select 1 union all select 1) except select 4;
set limit=1;
select 1 intersect select 1;
(((select 1) intersect select 1));

View File

@ -1,3 +1,2 @@
2000000
2000000
2000000
100000
100000

View File

@ -18,7 +18,7 @@ rm -rf ${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME:?}/*
chmod 777 ${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/
for i in {1..200}
for i in {1..10}
do
${CLICKHOUSE_CLIENT} --query "insert into function file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/test$i.csv', 'CSV', 'k UInt32, v UInt32') select number, number from numbers(10000);"
done
@ -28,14 +28,7 @@ ${CLICKHOUSE_CLIENT} --query "create table file_log(k UInt32, v UInt32) engine=F
${CLICKHOUSE_CLIENT} --query "select count() from file_log "
for i in {201..400}
do
${CLICKHOUSE_CLIENT} --query "insert into function file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/test$i.csv', 'CSV', 'k UInt32, v UInt32') select number, number from numbers(10000);"
done
${CLICKHOUSE_CLIENT} --query "select count() from file_log "
for i in {401..600}
for i in {11..20}
do
${CLICKHOUSE_CLIENT} --query "insert into function file('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/test$i.csv', 'CSV', 'k UInt32, v UInt32') select number, number from numbers(10000);"
done

View File

@ -0,0 +1,36 @@
1
2
3
1
2
3
SELECT a
FROM t1_all AS t1
ALL INNER JOIN test_02115.t2_local AS t2 ON a = t2.a
1
2
3
1
2
3
1
2
3
1
2
3
SELECT a
FROM t1_all AS t1
GLOBAL ALL INNER JOIN t2_all AS t2 ON a = t2.a
1
1
2
2
3
3
1
1
2
2
3
3

View File

@ -0,0 +1,33 @@
-- Tags: global, no-parallel
CREATE DATABASE IF NOT EXISTS test_02115;
USE test_02115;
DROP TABLE IF EXISTS t1_local;
DROP TABLE IF EXISTS t2_local;
DROP TABLE IF EXISTS t1_all;
DROP TABLE IF EXISTS t2_all;
create table t1_local(a Int32) engine=MergeTree() order by a;
create table t2_local as t1_local;
create table t1_all as t1_local engine Distributed(test_cluster_two_shards_localhost, test_02115, t1_local, rand());
create table t2_all as t2_local engine Distributed(test_cluster_two_shards_localhost, test_02115, t2_local, rand());
insert into t1_local values(1), (2), (3);
insert into t2_local values(1), (2), (3);
set distributed_product_mode = 'local';
select * from t1_all t1 where t1.a in (select t2.a from t2_all t2);
explain syntax select t1.* from t1_all t1 join t2_all t2 on t1.a = t2.a;
select t1.* from t1_all t1 join t2_all t2 on t1.a = t2.a;
set distributed_product_mode = 'global';
select * from t1_all t1 where t1.a in (select t2.a from t2_all t2);
explain syntax select t1.* from t1_all t1 join t2_all t2 on t1.a = t2.a;
select t1.* from t1_all t1 join t2_all t2 on t1.a = t2.a;
DROP TABLE t1_local;
DROP TABLE t2_local;
DROP TABLE t1_all;
DROP TABLE t2_all;
DROP DATABASE test_02115;