Implement RESTORE for replicated tables.

Vitaly Baranov 2022-04-13 15:26:17 +02:00
parent 6c3333b50b
commit acd28d8a1d
6 changed files with 299 additions and 0 deletions
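
At the SQL level, the new restore path is exercised roughly like this (a minimal sketch; node1/node2 and the 'backups' disk are the ones defined in the integration test added below):

    node1.query("BACKUP TABLE tbl TO Disk('backups', '1.zip')")
    # ... drop the table and make the backup file visible to the restoring node ...
    node2.query("RESTORE TABLE tbl FROM Disk('backups', '1.zip')")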


@@ -68,6 +68,11 @@
#include <Interpreters/SelectQueryOptions.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Backups/IBackup.h>
#include <Backups/IBackupEntry.h>
#include <Backups/IRestoreTask.h>
#include <Disks/TemporaryFileOnDisk.h>
#include <Poco/DirectoryIterator.h>
#include <base/scope_guard.h>
@@ -8195,4 +8200,160 @@ void StorageReplicatedMergeTree::createAndStoreFreezeMetadata(DiskPtr disk, Data
}

class ReplicatedMergeTreeRestoreTask : public IRestoreTask
{
public:
    ReplicatedMergeTreeRestoreTask(
        const ContextPtr & query_context_,
        const std::shared_ptr<StorageReplicatedMergeTree> & storage_,
        const std::unordered_set<String> & partition_ids_,
        const BackupPtr & backup_,
        const String & data_path_in_backup_)
        : query_context(query_context_)
        , storage(storage_)
        , partition_ids(partition_ids_)
        , backup(backup_)
        , data_path_in_backup(data_path_in_backup_)
    {
    }
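
    /// Enumerates the parts stored in the backup and creates one RestorePartTask
    /// for each part that belongs to the requested partitions (all partitions if none were specified).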
    RestoreTasks run() override
    {
        RestoreTasks restore_part_tasks;
        Strings part_names = backup->listFiles(data_path_in_backup);

        auto metadata_snapshot = storage->getInMemoryMetadataPtr();
        auto sink = std::make_shared<ReplicatedMergeTreeSink>(*storage, metadata_snapshot, 0, 0, 0, false, false, query_context, /*is_attach*/true);

        for (const String & part_name : part_names)
        {
            const auto part_info = MergeTreePartInfo::tryParsePartName(part_name, storage->format_version);
            if (!part_info)
                continue;

            if (!partition_ids.empty() && !partition_ids.contains(part_info->partition_id))
                continue;

            restore_part_tasks.push_back(
                std::make_unique<RestorePartTask>(storage, sink, part_name, *part_info, backup, data_path_in_backup));
        }
        return restore_part_tasks;
    }

private:
    ContextPtr query_context;
    std::shared_ptr<StorageReplicatedMergeTree> storage;
    std::unordered_set<String> partition_ids;
    BackupPtr backup;
    String data_path_in_backup;
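
    /// Restores a single data part: copies its files from the backup into a temporary
    /// directory on a reserved disk and attaches the part through ReplicatedMergeTreeSink.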
    class RestorePartTask : public IRestoreTask
    {
    public:
        RestorePartTask(
            const std::shared_ptr<StorageReplicatedMergeTree> & storage_,
            const std::shared_ptr<ReplicatedMergeTreeSink> & sink_,
            const String & part_name_,
            const MergeTreePartInfo & part_info_,
            const BackupPtr & backup_,
            const String & data_path_in_backup_)
            : storage(storage_)
            , sink(sink_)
            , part_name(part_name_)
            , part_info(part_info_)
            , backup(backup_)
            , data_path_in_backup(data_path_in_backup_)
        {
        }

        RestoreTasks run() override
        {
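            /// Calculate the total size of the part's files to reserve enough space on one of the disks.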
            UInt64 total_size_of_part = 0;
            Strings filenames = backup->listFiles(data_path_in_backup + part_name + "/", "");
            for (const String & filename : filenames)
                total_size_of_part += backup->getFileSize(data_path_in_backup + part_name + "/" + filename);

            std::shared_ptr<IReservation> reservation = storage->getStoragePolicy()->reserveAndCheck(total_size_of_part);
            auto disk = reservation->getDisk();
            String relative_data_path = storage->getRelativeDataPath();
            auto temp_part_dir_owner = std::make_shared<TemporaryFileOnDisk>(disk, relative_data_path + "restoring_" + part_name + "_");
            String temp_part_dir = temp_part_dir_owner->getPath();
            disk->createDirectories(temp_part_dir);

            assert(temp_part_dir.starts_with(relative_data_path));
            String relative_temp_part_dir = temp_part_dir.substr(relative_data_path.size());
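
            /// Copy the part's files from the backup into the temporary directory,
            /// shrinking the reservation as the data is written.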
            for (const String & filename : filenames)
            {
                auto backup_entry = backup->readFile(fs::path(data_path_in_backup) / part_name / filename);
                auto read_buffer = backup_entry->getReadBuffer();
                auto write_buffer = disk->writeFile(fs::path(temp_part_dir) / filename);
                copyData(*read_buffer, *write_buffer);
                reservation->update(reservation->getSize() - backup_entry->getSize());
            }
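
            /// Create the part object from the temporary directory, load its metadata and commit it through the sink.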
            auto single_disk_volume = std::make_shared<SingleDiskVolume>(disk->getName(), disk, 0);
            auto part = storage->createPart(part_name, part_info, single_disk_volume, relative_temp_part_dir);

            /// TODO Transactions: Decide what to do with version metadata (if any). Let's just remove it for now.
            disk->removeFileIfExists(fs::path(temp_part_dir) / IMergeTreeDataPart::TXN_VERSION_METADATA_FILE_NAME);
            part->version.setCreationTID(Tx::PrehistoricTID, nullptr);

            part->loadColumnsChecksumsIndexes(false, true);
            sink->writeExistingPart(part);
            return {};
        }

    private:
        std::shared_ptr<StorageReplicatedMergeTree> storage;
        std::shared_ptr<ReplicatedMergeTreeSink> sink;
        String part_name;
        MergeTreePartInfo part_info;
        BackupPtr backup;
        String data_path_in_backup;
    };
};

#if 0
    PartsTemporaryRename renamed_parts(*this, "detached/");
    MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, query_context, renamed_parts);

    /// TODO Allow to use quorum here.
    ReplicatedMergeTreeSink output(*this, metadata_snapshot, 0, 0, 0, false, false, query_context,
        /*is_attach*/true);

    for (size_t i = 0; i < loaded_parts.size(); ++i)
    {
        const String old_name = loaded_parts[i]->name;
        output.writeExistingPart(loaded_parts[i]);
        renamed_parts.old_and_new_names[i].old_name.clear();
        LOG_DEBUG(log, "Attached part {} as {}", old_name, loaded_parts[i]->name);

        results.push_back(PartitionCommandResultInfo{
            .partition_id = loaded_parts[i]->info.partition_id,
            .part_name = loaded_parts[i]->name,
            .old_part_name = old_name,
        });
    }
#endif

RestoreTaskPtr StorageReplicatedMergeTree::restoreData(
    ContextMutablePtr local_context,
    const ASTs & partitions,
    const BackupPtr & backup,
    const String & data_path_in_backup,
    const StorageRestoreSettings &)
{
    return std::make_unique<ReplicatedMergeTreeRestoreTask>(
        local_context,
        std::static_pointer_cast<StorageReplicatedMergeTree>(shared_from_this()),
        getPartitionIDsFromQuery(partitions, local_context),
        backup,
        data_path_in_backup);
}

}


@@ -225,6 +225,9 @@ public:
    static bool removeTableNodesFromZooKeeper(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path,
        const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock, Poco::Logger * logger);

    /// Extract data from the backup and put it to the storage.
    RestoreTaskPtr restoreData(ContextMutablePtr local_context, const ASTs & partitions, const BackupPtr & backup, const String & data_path_in_backup, const StorageRestoreSettings & restore_settings) override;

    /// Schedules job to execute in background pool (merge, mutate, drop range and so on)
    bool scheduleDataProcessingJob(BackgroundJobsAssignee & assignee) override;


@@ -0,0 +1,14 @@
<?xml version="1.0"?>
<clickhouse>
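    <!-- A local disk named 'backups'; the <backups>/<allowed_disk> setting below lets BACKUP/RESTORE write to it. -->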
    <storage_configuration>
        <disks>
            <backups>
                <type>local</type>
                <path>/backups/</path>
            </backups>
        </disks>
    </storage_configuration>

    <backups>
        <allowed_disk>backups</allowed_disk>
    </backups>
</clickhouse>


@@ -0,0 +1,16 @@
<clickhouse>
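    <!-- One shard with two replicas; the test runs ON CLUSTER 'cluster' queries against this cluster. -->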
    <remote_servers>
        <cluster>
            <shard>
                <replica>
                    <host>node1</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>node2</host>
                    <port>9000</port>
                </replica>
            </shard>
        </cluster>
    </remote_servers>
</clickhouse>


@@ -0,0 +1,105 @@
import pytest
import os.path
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV

cluster = ClickHouseCluster(__file__)

node1 = cluster.add_instance(
    "node1",
    main_configs=["configs/remote_servers.xml", "configs/backups_disk.xml"],
    external_dirs=["/backups/"],
    macros={"replica": "node1"},
    with_zookeeper=True,
)

node2 = cluster.add_instance(
    "node2",
    main_configs=["configs/remote_servers.xml", "configs/backups_disk.xml"],
    external_dirs=["/backups/"],
    macros={"replica": "node2"},
    with_zookeeper=True,
)


@pytest.fixture(scope="module", autouse=True)
def start_cluster():
    try:
        cluster.start()
        yield cluster
        node1.query("DROP TABLE IF EXISTS tbl ON CLUSTER 'cluster' NO DELAY")
    finally:
        cluster.shutdown()


def create_table(instance=None):
    on_cluster_clause = "" if instance else "ON CLUSTER 'cluster'"
    instance_to_execute = instance if instance else node1
    instance_to_execute.query(
        "CREATE TABLE tbl " + on_cluster_clause + " ("
        "x UInt8, y String"
        ") ENGINE=ReplicatedMergeTree('/clickhouse/tables/tbl/', '{replica}')"
        "ORDER BY x"
    )


def drop_table(instance=None):
    on_cluster_clause = "" if instance else "ON CLUSTER 'cluster'"
    instance_to_execute = instance if instance else node1
    instance_to_execute.query(f"DROP TABLE tbl {on_cluster_clause} NO DELAY")


def insert_data(instance=None):
    instance1_to_execute = instance if instance else node1
    instance2_to_execute = instance if instance else node2
    instance1_to_execute.query("INSERT INTO tbl VALUES (1, 'Don''t')")
    instance2_to_execute.query("INSERT INTO tbl VALUES (2, 'count')")
    instance1_to_execute.query("INSERT INTO tbl SETTINGS async_insert=true VALUES (3, 'your')")
    instance2_to_execute.query("INSERT INTO tbl SETTINGS async_insert=true VALUES (4, 'chickens')")


backup_id_counter = 0


def new_backup_name():
    global backup_id_counter
    backup_id_counter += 1
    return f"Disk('backups', '{backup_id_counter}.zip')"


def get_path_to_backup(instance, backup_name):
    return os.path.join(
        instance.path,
        "backups",
        backup_name.removeprefix("Disk('backups', '").removesuffix("')"),
    )


def test_backup_and_restore():
    create_table()
    insert_data()

    backup_name = new_backup_name()

    # Make backup on node 1.
    node1.query(f"BACKUP TABLE tbl TO {backup_name}")

    # Drop table on both nodes.
    drop_table()

    # Restore from backup on node2.
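    # The backup file exists only on node1's local 'backups' disk, so hard-link it
    # into node2's backups directory before restoring there.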
    os.link(
        get_path_to_backup(node1, backup_name), get_path_to_backup(node2, backup_name)
    )
    node2.query(f"RESTORE TABLE tbl FROM {backup_name}")

    assert node2.query("SELECT * FROM tbl ORDER BY x") == TSV(
        [[1, "Don\\'t"], [2, "count"], [3, "your"], [4, "chickens"]]
    )

    # Data should be replicated to node1.
    create_table(node1)
    assert node1.query("SELECT * FROM tbl ORDER BY x") == TSV(
        [[1, "Don\\'t"], [2, "count"], [3, "your"], [4, "chickens"]]
    )