Restore parts of MergeTree in correct order.

This commit is contained in:
Vitaly Baranov 2022-05-19 14:36:27 +02:00
parent e891eba80e
commit 5cabdbd982
6 changed files with 153 additions and 192 deletions

View File

@ -4,6 +4,7 @@
#include <Backups/BackupEntryFromSmallFile.h>
#include <Backups/IBackup.h>
#include <Backups/IRestoreTask.h>
#include <Backups/RestoreSettings.h>
#include <Compression/CompressedReadBuffer.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDate.h>
@ -4065,12 +4066,7 @@ BackupEntries MergeTreeData::backupData(ContextPtr local_context, const ASTs & p
data_parts = getVisibleDataPartsVector(local_context);
else
data_parts = getVisibleDataPartsVectorInPartitions(local_context, getPartitionIDsFromQuery(partitions, local_context));
return backupDataParts(data_parts);
}
BackupEntries MergeTreeData::backupDataParts(const DataPartsVector & data_parts)
{
BackupEntries backup_entries;
std::map<DiskPtr, std::shared_ptr<TemporaryFileOnDisk>> temp_dirs;
@ -4115,22 +4111,27 @@ class MergeTreeDataRestoreTask : public IRestoreTask
public:
MergeTreeDataRestoreTask(
const std::shared_ptr<MergeTreeData> & storage_,
const std::unordered_set<String> & partition_ids_,
const BackupPtr & backup_,
const String & data_path_in_backup_,
const std::unordered_set<String> & partition_ids_,
SimpleIncrement * increment_)
const StorageRestoreSettings & restore_settings_,
const std::shared_ptr<IRestoreCoordination> & restore_coordination_)
: storage(storage_)
, partition_ids(partition_ids_)
, backup(backup_)
, data_path_in_backup(data_path_in_backup_)
, partition_ids(partition_ids_)
, increment(increment_)
, restore_settings(restore_settings_)
, restore_coordination(restore_coordination_)
{
}
RestoreTasks run() override
{
RestoreTasks restore_part_tasks;
RestoreTasks restore_tasks;
Strings part_names = backup->listFiles(data_path_in_backup);
auto attach_task_ptr = std::make_shared<std::unique_ptr<AttachPartsTask>>(std::make_unique<AttachPartsTask>(storage));
std::unordered_map<String, bool> partitions_restored_by_us;
for (const String & part_name : part_names)
{
const auto part_info = MergeTreePartInfo::tryParsePartName(part_name, storage->format_version);
@ -4140,35 +4141,51 @@ public:
if (!partition_ids.empty() && !partition_ids.contains(part_info->partition_id))
continue;
restore_part_tasks.push_back(
std::make_unique<RestorePartTask>(storage, backup, data_path_in_backup, part_name, *part_info, increment));
auto it = partitions_restored_by_us.find(part_info->partition_id);
if (it == partitions_restored_by_us.end())
{
it = partitions_restored_by_us.emplace(
part_info->partition_id,
storage->startRestoringPartition(part_info->partition_id, restore_settings, restore_coordination)).first;
}
if (!it->second)
continue; /// Other replica is already restoring this partition.
(*attach_task_ptr)->increaseNumParts(1);
restore_tasks.push_back(
std::make_unique<RestorePartTask>(storage, part_name, *part_info, backup, data_path_in_backup, attach_task_ptr));
}
return restore_part_tasks;
return restore_tasks;
}
private:
using MutableDataPartPtr = std::shared_ptr<IMergeTreeDataPart>;
using MutableDataPartsVector = std::vector<MutableDataPartPtr>;
class AttachPartsTask;
std::shared_ptr<MergeTreeData> storage;
std::unordered_set<String> partition_ids;
BackupPtr backup;
String data_path_in_backup;
std::unordered_set<String> partition_ids;
SimpleIncrement * increment;
StorageRestoreSettings restore_settings;
std::shared_ptr<IRestoreCoordination> restore_coordination;
class RestorePartTask : public IRestoreTask
{
public:
RestorePartTask(
const std::shared_ptr<MergeTreeData> & storage_,
const BackupPtr & backup_,
const String & data_path_in_backup_,
const String & part_name_,
const MergeTreePartInfo & part_info_,
SimpleIncrement * increment_)
const BackupPtr & backup_,
const String & data_path_in_backup_,
const std::shared_ptr<std::unique_ptr<AttachPartsTask>> & attach_task_ptr_)
: storage(storage_)
, backup(backup_)
, data_path_in_backup(data_path_in_backup_)
, part_name(part_name_)
, part_info(part_info_)
, increment(increment_)
, attach_task_ptr(attach_task_ptr_)
{
}
@ -4205,8 +4222,16 @@ private:
disk->removeFileIfExists(fs::path(temp_part_dir) / IMergeTreeDataPart::TXN_VERSION_METADATA_FILE_NAME);
part->version.setCreationTID(Tx::PrehistoricTID, nullptr);
part->loadColumnsChecksumsIndexes(false, true);
storage->renameTempPartAndAdd(part, NO_TRANSACTION_RAW, increment);
return {};
auto & attach_task = *attach_task_ptr;
assert(attach_task);
attach_task->addPart(part, temp_part_dir_owner);
if (!attach_task->containsAllParts())
return {};
RestoreTasks restore_tasks;
restore_tasks.emplace_back(std::move(attach_task));
return restore_tasks;
}
private:
@ -4215,17 +4240,80 @@ private:
String data_path_in_backup;
String part_name;
MergeTreePartInfo part_info;
SimpleIncrement * increment;
std::shared_ptr<std::unique_ptr<AttachPartsTask>> attach_task_ptr;
};
class AttachPartsTask : public IRestoreTask
{
public:
AttachPartsTask(const std::shared_ptr<MergeTreeData> & storage_) : storage(storage_) {}
void increaseNumParts(size_t add_num_parts) { std::lock_guard lock{mutex}; num_parts += add_num_parts; }
void addPart(MutableDataPartPtr part, std::shared_ptr<TemporaryFileOnDisk> temp_part_dir_owner)
{
std::lock_guard lock{mutex};
parts.emplace_back(part);
temp_part_dir_owners.emplace_back(temp_part_dir_owner);
}
bool containsAllParts() const
{
std::lock_guard lock{mutex};
return parts.size() == num_parts;
}
RestoreTasks run() override
{
assert(containsAllParts());
std::lock_guard lock{mutex};
/// Sort parts by min_block (because we need to preserve the order of parts).
std::sort(
parts.begin(),
parts.end(),
[](const MutableDataPartPtr & lhs, const MutableDataPartPtr & rhs) { return lhs->info.min_block < rhs->info.min_block; });
storage->attachRestoredParts(std::move(parts));
parts.clear();
temp_part_dir_owners.clear();
return {};
}
private:
std::shared_ptr<MergeTreeData> storage;
size_t num_parts = 0;
MutableDataPartsVector parts;
std::vector<std::shared_ptr<TemporaryFileOnDisk>> temp_part_dir_owners;
mutable std::mutex mutex;
};
};
RestoreTaskPtr MergeTreeData::restoreDataParts(const std::unordered_set<String> & partition_ids,
const BackupPtr & backup, const String & data_path_in_backup,
SimpleIncrement * increment)
RestoreTaskPtr MergeTreeData::restoreData(
ContextMutablePtr local_context,
const ASTs & partitions,
const BackupPtr & backup,
const String & data_path_in_backup,
const StorageRestoreSettings & restore_settings,
const std::shared_ptr<IRestoreCoordination> & restore_coordination)
{
return std::make_unique<MergeTreeDataRestoreTask>(
std::static_pointer_cast<MergeTreeData>(shared_from_this()), backup, data_path_in_backup, partition_ids, increment);
std::static_pointer_cast<MergeTreeData>(shared_from_this()),
getPartitionIDsFromQuery(partitions, local_context),
backup,
data_path_in_backup,
restore_settings,
restore_coordination);
}
bool MergeTreeData::startRestoringPartition(
const String & /* partition_id */,
const StorageRestoreSettings & /* restore_settings */,
const std::shared_ptr<IRestoreCoordination> & /* restore_coordination */) const
{
return true;
}

View File

@ -719,14 +719,9 @@ public:
/// Prepares entries to backup data of the storage.
BackupEntries backupData(ContextPtr context, const ASTs & partitions) override;
static BackupEntries backupDataParts(const DataPartsVector & data_parts);
/// Extract data from the backup and put it to the storage.
RestoreTaskPtr restoreDataParts(
const std::unordered_set<String> & partition_ids,
const BackupPtr & backup,
const String & data_path_in_backup,
SimpleIncrement * increment);
RestoreTaskPtr restoreData(ContextMutablePtr context, const ASTs & partitions, const BackupPtr & backup, const String & data_path_in_backup, const StorageRestoreSettings & restore_settings, const std::shared_ptr<IRestoreCoordination> & restore_coordination) override;
/// Moves partition to specified Disk
void movePartitionToDisk(const ASTPtr & partition, const String & name, bool moving_part, ContextPtr context);
@ -1007,6 +1002,7 @@ protected:
friend class MergeTask;
friend class IPartMetadataManager;
friend class IMergedBlockOutputStream; // for access to log
friend class MergeTreeDataRestoreTask;
bool require_part_metadata;
@ -1234,6 +1230,13 @@ protected:
/// Moves part to specified space, used in ALTER ... MOVE ... queries
bool movePartsToSpace(const DataPartsVector & parts, SpacePtr space);
/// Starts restoring a partition, if the function returns false the partition will be skipped.
/// Overriden by the replicated storage to skip partitions in case other replicas are already restoring them.
virtual bool startRestoringPartition(const String & partition_id, const StorageRestoreSettings & restore_settings, const std::shared_ptr<IRestoreCoordination> & restore_coordination) const;
/// Attaches restored parts to the storage.
virtual void attachRestoredParts(MutableDataPartsVector && parts) = 0;
static void incrementInsertedPartsProfileEvent(MergeTreeDataPartType type);
static void incrementMergedPartsProfileEvent(MergeTreeDataPartType type);

View File

@ -3,7 +3,6 @@
#include <optional>
#include <base/sort.h>
#include <Backups/IRestoreTask.h>
#include <Databases/IDatabase.h>
#include <Common/escapeForFileName.h>
#include <Common/typeid_cast.h>
@ -1783,9 +1782,10 @@ CheckResults StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_
}
RestoreTaskPtr StorageMergeTree::restoreData(ContextMutablePtr local_context, const ASTs & partitions, const BackupPtr & backup, const String & data_path_in_backup, const StorageRestoreSettings &, const std::shared_ptr<IRestoreCoordination> &)
void StorageMergeTree::attachRestoredParts(MutableDataPartsVector && parts)
{
return restoreDataParts(getPartitionIDsFromQuery(partitions, local_context), backup, data_path_in_backup, &increment);
for (auto part : parts)
renameTempPartAndAdd(part, NO_TRANSACTION_RAW, &increment);
}

View File

@ -105,8 +105,6 @@ public:
CheckResults checkData(const ASTPtr & query, ContextPtr context) override;
RestoreTaskPtr restoreData(ContextMutablePtr context, const ASTs & partitions, const BackupPtr & backup, const String & data_path_in_backup, const StorageRestoreSettings & restore_settings, const std::shared_ptr<IRestoreCoordination> & restore_coordination) override;
bool scheduleDataProcessingJob(BackgroundJobsAssignee & assignee) override;
MergeTreeDeduplicationLog * getDeduplicationLog() { return deduplication_log.get(); }
@ -255,6 +253,9 @@ private:
void startBackgroundMovesIfNeeded() override;
/// Attaches restored parts to the storage.
void attachRestoredParts(MutableDataPartsVector && parts) override;
std::unique_ptr<MergeTreeSettings> getDefaultSettings() const override;
friend class MergeTreeSink;

View File

@ -71,12 +71,8 @@
#include <Interpreters/SelectQueryOptions.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Backups/IBackup.h>
#include <Backups/IBackupEntry.h>
#include <Backups/IRestoreTask.h>
#include <Backups/IRestoreCoordination.h>
#include <Backups/RestoreSettings.h>
#include <Disks/TemporaryFileOnDisk.h>
#include <Poco/DirectoryIterator.h>
@ -8227,139 +8223,25 @@ void StorageReplicatedMergeTree::createAndStoreFreezeMetadata(DiskPtr disk, Data
}
class ReplicatedMergeTreeRestoreTask : public IRestoreTask
bool StorageReplicatedMergeTree::startRestoringPartition(
const String & partition_id,
const StorageRestoreSettings & restore_settings,
const std::shared_ptr<IRestoreCoordination> & restore_coordination) const
{
public:
ReplicatedMergeTreeRestoreTask(
const std::shared_ptr<StorageReplicatedMergeTree> & storage_,
const std::unordered_set<String> & partition_ids_,
const BackupPtr & backup_,
const StorageRestoreSettings & restore_settings_,
const std::shared_ptr<IRestoreCoordination> & restore_coordination_)
: storage(storage_)
, partition_ids(partition_ids_)
, backup(backup_)
, restore_settings(restore_settings_)
, restore_coordination(restore_coordination_)
{
}
RestoreTasks run() override
{
RestoreTasks restore_part_tasks;
String full_zk_path = storage->getZooKeeperName() + storage->getZooKeeperPath();
String data_path_in_backup = restore_coordination->getReplicatedTableDataPath(full_zk_path);
auto storage_id = storage->getStorageID();
DatabaseAndTableName table_name = {storage_id.database_name, storage_id.table_name};
std::unordered_map<String, bool> partitions_restored_by_us;
Strings part_names = backup->listFiles(data_path_in_backup);
auto metadata_snapshot = storage->getInMemoryMetadataPtr();
auto sink = std::make_shared<ReplicatedMergeTreeSink>(*storage, metadata_snapshot, 0, 0, 0, false, false, storage->getContext(), /*is_attach*/true);
for (const String & part_name : part_names)
{
const auto part_info = MergeTreePartInfo::tryParsePartName(part_name, storage->format_version);
if (!part_info)
continue;
if (!partition_ids.empty() && !partition_ids.contains(part_info->partition_id))
continue;
auto it = partitions_restored_by_us.find(part_info->partition_id);
if (it == partitions_restored_by_us.end())
{
it = partitions_restored_by_us.emplace(
part_info->partition_id,
restore_coordination->startInsertingDataToPartitionInReplicatedTable(
restore_settings.host_id, table_name, full_zk_path, part_info->partition_id)).first;
}
if (!it->second)
continue; /// Other replica is already restoring this partition.
restore_part_tasks.push_back(
std::make_unique<RestorePartTask>(storage, sink, part_name, *part_info, backup, data_path_in_backup));
}
return restore_part_tasks;
}
private:
std::shared_ptr<StorageReplicatedMergeTree> storage;
std::unordered_set<String> partition_ids;
BackupPtr backup;
StorageRestoreSettings restore_settings;
std::shared_ptr<IRestoreCoordination> restore_coordination;
class RestorePartTask : public IRestoreTask
{
public:
RestorePartTask(
const std::shared_ptr<StorageReplicatedMergeTree> & storage_,
const std::shared_ptr<ReplicatedMergeTreeSink> & sink_,
const String & part_name_,
const MergeTreePartInfo & part_info_,
const BackupPtr & backup_,
const String & data_path_in_backup_)
: storage(storage_)
, sink(sink_)
, part_name(part_name_)
, part_info(part_info_)
, backup(backup_)
, data_path_in_backup(data_path_in_backup_)
{
}
RestoreTasks run() override
{
UInt64 total_size_of_part = 0;
Strings filenames = backup->listFiles(data_path_in_backup + part_name + "/", "");
for (const String & filename : filenames)
total_size_of_part += backup->getFileSize(data_path_in_backup + part_name + "/" + filename);
std::shared_ptr<IReservation> reservation = storage->getStoragePolicy()->reserveAndCheck(total_size_of_part);
auto disk = reservation->getDisk();
String relative_data_path = storage->getRelativeDataPath();
auto temp_part_dir_owner = std::make_shared<TemporaryFileOnDisk>(disk, relative_data_path + "restoring_" + part_name + "_");
String temp_part_dir = temp_part_dir_owner->getPath();
disk->createDirectories(temp_part_dir);
assert(temp_part_dir.starts_with(relative_data_path));
String relative_temp_part_dir = temp_part_dir.substr(relative_data_path.size());
for (const String & filename : filenames)
{
auto backup_entry = backup->readFile(fs::path(data_path_in_backup) / part_name / filename);
auto read_buffer = backup_entry->getReadBuffer();
auto write_buffer = disk->writeFile(fs::path(temp_part_dir) / filename);
copyData(*read_buffer, *write_buffer);
reservation->update(reservation->getSize() - backup_entry->getSize());
}
auto single_disk_volume = std::make_shared<SingleDiskVolume>(disk->getName(), disk, 0);
auto part = storage->createPart(part_name, part_info, single_disk_volume, relative_temp_part_dir);
/// TODO Transactions: Decide what to do with version metadata (if any). Let's just remove it for now.
disk->removeFileIfExists(fs::path(temp_part_dir) / IMergeTreeDataPart::TXN_VERSION_METADATA_FILE_NAME);
part->version.setCreationTID(Tx::PrehistoricTID, nullptr);
part->loadColumnsChecksumsIndexes(false, true);
sink->writeExistingPart(part);
return {};
}
private:
std::shared_ptr<StorageReplicatedMergeTree> storage;
std::shared_ptr<ReplicatedMergeTreeSink> sink;
String part_name;
MergeTreePartInfo part_info;
BackupPtr backup;
String data_path_in_backup;
};
};
String full_zk_path = getZooKeeperName() + getZooKeeperPath();
auto storage_id = getStorageID();
DatabaseAndTableName table_name = {storage_id.database_name, storage_id.table_name};
return restore_coordination->startInsertingDataToPartitionInReplicatedTable(
restore_settings.host_id, table_name, full_zk_path, partition_id);
}
void StorageReplicatedMergeTree::attachRestoredParts(MutableDataPartsVector && parts)
{
auto metadata_snapshot = getInMemoryMetadataPtr();
auto sink = std::make_shared<ReplicatedMergeTreeSink>(*this, metadata_snapshot, 0, 0, 0, false, false, getContext(), /*is_attach*/true);
for (auto part : parts)
sink->writeExistingPart(part);
}
#if 0
PartsTemporaryRename renamed_parts(*this, "detached/");
@ -8387,21 +8269,4 @@ for (size_t i = 0; i < loaded_parts.size(); ++i)
}
#endif
RestoreTaskPtr StorageReplicatedMergeTree::restoreData(
ContextMutablePtr local_context,
const ASTs & partitions,
const BackupPtr & backup,
const String & /* data_path_in_backup */,
const StorageRestoreSettings & restore_settings,
const std::shared_ptr<IRestoreCoordination> & restore_coordination)
{
return std::make_unique<ReplicatedMergeTreeRestoreTask>(
std::static_pointer_cast<StorageReplicatedMergeTree>(shared_from_this()),
getPartitionIDsFromQuery(partitions, local_context),
backup,
restore_settings,
restore_coordination);
}
}

View File

@ -230,9 +230,6 @@ public:
static bool removeTableNodesFromZooKeeper(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path,
const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock, Poco::Logger * logger);
/// Extract data from the backup and put it to the storage.
RestoreTaskPtr restoreData(ContextMutablePtr local_context, const ASTs & partitions, const BackupPtr & backup, const String & data_path_in_backup, const StorageRestoreSettings & restore_settings, const std::shared_ptr<IRestoreCoordination> & restore_coordination) override;
/// Schedules job to execute in background pool (merge, mutate, drop range and so on)
bool scheduleDataProcessingJob(BackgroundJobsAssignee & assignee) override;
@ -780,6 +777,13 @@ private:
void startBackgroundMovesIfNeeded() override;
/// Starts restoring a partition, if the function returns false the partition will be skipped.
/// We need to skip partitions in case other replicas are already restoring them.
bool startRestoringPartition(const String & partition_id, const StorageRestoreSettings & restore_settings, const std::shared_ptr<IRestoreCoordination> & restore_coordination) const override;
/// Attaches restored parts to the storage.
void attachRestoredParts(MutableDataPartsVector && parts) override;
std::unique_ptr<MergeTreeSettings> getDefaultSettings() const override;
PartitionBlockNumbersHolder allocateBlockNumbersInAffectedPartitions(