Ability to backup-restore metadata files for DiskS3 (fixes and tests)

Pavel Kovalenko 2021-01-11 20:37:08 +03:00
parent 2848b32af1
commit 0856b2c514
13 changed files with 269 additions and 71 deletions

View File

@@ -108,7 +108,7 @@ DiskCacheWrapper::readFile(const String & path, size_t buf_size, size_t estimate
if (!cache_file_predicate(path))
return DiskDecorator::readFile(path, buf_size, estimated_size, aio_threshold, mmap_threshold);
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Read file {} from cache", backQuote(path));
LOG_DEBUG(&Poco::Logger::get("DiskCache"), "Read file {} from cache", backQuote(path));
if (cache_disk->exists(path))
return cache_disk->readFile(path, buf_size, estimated_size, aio_threshold, mmap_threshold);
@@ -122,11 +122,11 @@ DiskCacheWrapper::readFile(const String & path, size_t buf_size, size_t estimate
{
/// This thread will responsible for file downloading to cache.
metadata->status = DOWNLOADING;
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "File {} doesn't exist in cache. Will download it", backQuote(path));
LOG_DEBUG(&Poco::Logger::get("DiskCache"), "File {} doesn't exist in cache. Will download it", backQuote(path));
}
else if (metadata->status == DOWNLOADING)
{
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Waiting for file {} download to cache", backQuote(path));
LOG_DEBUG(&Poco::Logger::get("DiskCache"), "Waiting for file {} download to cache", backQuote(path));
metadata->condition.wait(lock, [metadata] { return metadata->status == DOWNLOADED || metadata->status == ERROR; });
}
}
@@ -151,11 +151,11 @@ DiskCacheWrapper::readFile(const String & path, size_t buf_size, size_t estimate
}
cache_disk->moveFile(tmp_path, path);
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "File {} downloaded to cache", backQuote(path));
LOG_DEBUG(&Poco::Logger::get("DiskCache"), "File {} downloaded to cache", backQuote(path));
}
catch (...)
{
tryLogCurrentException("DiskS3", "Failed to download file + " + backQuote(path) + " to cache");
tryLogCurrentException("DiskCache", "Failed to download file + " + backQuote(path) + " to cache");
result_status = ERROR;
}
}
@@ -180,7 +180,7 @@ DiskCacheWrapper::writeFile(const String & path, size_t buf_size, WriteMode mode
if (!cache_file_predicate(path))
return DiskDecorator::writeFile(path, buf_size, mode, estimated_size, aio_threshold);
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Write file {} to cache", backQuote(path));
LOG_DEBUG(&Poco::Logger::get("DiskCache"), "Write file {} to cache", backQuote(path));
auto dir_path = directoryPath(path);
if (!cache_disk->exists(dir_path))

View File

@@ -195,6 +195,9 @@ public:
/// Returns executor to perform asynchronous operations.
virtual Executor & getExecutor() { return *executor; }
/// Invoked when partitions freeze is invoked.
virtual void onFreeze(const String &) { }
private:
std::unique_ptr<Executor> executor;
};

View File

@@ -924,19 +924,24 @@ void DiskS3::startup()
/// Find last revision.
UInt64 l = 0, r = LATEST_REVISION;
while (r - l > 1)
while (l < r)
{
auto revision = (r - l) >> 1;
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Check revision in bounds {}-{}", l, r);
auto revision = l + (r - l + 1) / 2;
auto revision_str = revisionToString(revision);
/// Check that file or operation with such revision exists.
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Check object with revision {}", revision);
/// Check file or operation with such revision exists.
if (checkObjectExists(s3_root_path + "r" + revision_str)
|| checkObjectExists(s3_root_path + "operations/r" + revision_str))
l = revision;
else
r = revision;
r = revision - 1;
}
revision_counter = l;
LOG_INFO(&Poco::Logger::get("DiskS3"), "Found last revision number {}", revision_counter);
LOG_INFO(&Poco::Logger::get("DiskS3"), "Found last revision number {} for disk {}", revision_counter, name);
}
bool DiskS3::checkObjectExists(const String & prefix)
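A note on the revision search above: the rewritten loop is an upper-bound binary search that finds the greatest revision for which an object (data or operation) still exists in S3, relying on revisions being allocated contiguously. A minimal sketch of the same invariant, with a generic `object_exists` predicate standing in for the two `checkObjectExists` calls (names are illustrative, not the actual DiskS3 code):

```cpp
#include <cstdint>
#include <functional>

/// Find the greatest revision r in (0, latest_revision] for which object_exists(r) is true,
/// assuming object_exists is monotone (true for all revisions up to the answer, false after).
/// Returns 0 (UNKNOWN_REVISION) if no revision exists at all.
uint64_t findLastRevision(uint64_t latest_revision, const std::function<bool(uint64_t)> & object_exists)
{
    uint64_t l = 0, r = latest_revision;
    while (l < r)
    {
        /// Round the midpoint up: with l = mid on success, rounding down could loop forever when r - l == 1.
        uint64_t mid = l + (r - l + 1) / 2;
        if (object_exists(mid))
            l = mid;        /// The answer is at least `mid`.
        else
            r = mid - 1;    /// The answer is strictly below `mid`.
    }
    return l;
}
```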
@@ -969,7 +974,7 @@ void DiskS3::listObjects(const String & source_bucket, const String & source_pat
Aws::S3::Model::ListObjectsV2Request request;
request.SetBucket(source_bucket);
request.SetPrefix(source_path);
request.SetMaxKeys(1000);
request.SetMaxKeys(list_object_keys_size);
Aws::S3::Model::ListObjectsV2Outcome outcome;
do
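The listing loop is cut off by this excerpt, but the shape it takes is ordinary ListObjectsV2 pagination: request at most `list_object_keys_size` keys, hand each page to a callback, and follow the continuation token until the listing is no longer truncated. A hedged sketch of that pattern (the helper name and callback convention are illustrative, not the exact DiskS3 code):

```cpp
#include <aws/s3/S3Client.h>
#include <aws/s3/model/ListObjectsV2Request.h>

/// Page through a key prefix, fetching at most `max_keys` keys per request.
/// A small max_keys (e.g. list_object_keys_size = 1 in the test config) forces many pages
/// and exercises restore parallelism.
template <typename Callback>
void listPrefix(Aws::S3::S3Client & client, const Aws::String & bucket,
                const Aws::String & prefix, int max_keys, Callback && callback)
{
    Aws::S3::Model::ListObjectsV2Request request;
    request.SetBucket(bucket);
    request.SetPrefix(prefix);
    request.SetMaxKeys(max_keys);

    Aws::S3::Model::ListObjectsV2Outcome outcome;
    do
    {
        outcome = client.ListObjectsV2(request);
        if (!outcome.IsSuccess())
            break;      /// The real code throws; a sketch just stops.
        if (!callback(outcome.GetResult()))
            return;     /// The callback may stop early (e.g. revision already too new).
        request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
    } while (outcome.GetResult().GetIsTruncated());
}
```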
@@ -1000,13 +1005,13 @@ void DiskS3::copyObject(const String & src_bucket, const String & src_key, const
struct DiskS3::RestoreInformation
{
UInt64 revision = LATEST_REVISION;
String bucket;
String path;
String source_bucket;
String source_path;
};
void DiskS3::readRestoreInformation(DiskS3::RestoreInformation & restore_information)
{
ReadBufferFromFile buffer(metadata_path + restore_file, 512);
ReadBufferFromFile buffer(metadata_path + restore_file_name, 512);
buffer.next();
/// Empty file - just restore all metadata.
@@ -1021,13 +1026,13 @@ void DiskS3::readRestoreInformation(DiskS3::RestoreInformation & restore_informa
if (!buffer.hasPendingData())
return;
readText(restore_information.bucket, buffer);
readText(restore_information.source_bucket, buffer);
assertChar('\n', buffer);
if (!buffer.hasPendingData())
return;
readText(restore_information.path, buffer);
readText(restore_information.source_path, buffer);
assertChar('\n', buffer);
if (buffer.hasPendingData())
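Taken together with the hunk below, the parser implies a very small on-disk format for the `restore` trigger file: up to three newline-terminated lines, each optional, read in order as revision, source bucket, source path. An empty file (what the integration test at the bottom creates with `touch`) means "restore everything from the configured bucket and path at the latest revision". A sketch of producing such a file outside ClickHouse (plain C++; the layout is inferred from the parser, so treat it as an assumption):

```cpp
#include <cstdint>
#include <fstream>
#include <string>

/// Assumed layout of {metadata_path}/restore, inferred from readRestoreInformation():
///   line 1: revision to restore up to (0 means latest),
///   line 2: source bucket (optional, defaults to the disk's bucket),
///   line 3: source path  (optional, defaults to the disk's s3_root_path).
void writeRestoreFile(const std::string & metadata_path, uint64_t revision,
                      const std::string & source_bucket = {}, const std::string & source_path = {})
{
    std::ofstream out(metadata_path + "restore");
    out << revision << '\n';
    if (!source_bucket.empty())
        out << source_bucket << '\n';
    if (!source_path.empty())
        out << source_path << '\n';
}
```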
@@ -1041,35 +1046,42 @@ void DiskS3::readRestoreInformation(DiskS3::RestoreInformation & restore_informa
void DiskS3::restore()
{
if (!exists(restore_file))
if (!exists(restore_file_name))
return;
try
{
RestoreInformation information;
information.bucket = bucket;
information.path = s3_root_path;
information.source_bucket = bucket;
information.source_path = s3_root_path;
readRestoreInformation(information);
if (information.revision == 0)
information.revision = LATEST_REVISION;
if (!information.source_path.ends_with('/'))
information.source_path += '/';
if (information.bucket == bucket)
if (information.source_bucket == bucket)
{
/// In this case we need to additionally cleanup S3 from objects with later revision.
/// Will be simply just restore to different path.
if (information.path == s3_root_path && information.revision != LATEST_REVISION)
if (information.source_path == s3_root_path && information.revision != LATEST_REVISION)
throw Exception("Restoring to the same bucket and path is allowed if revision is latest (0)", ErrorCodes::BAD_ARGUMENTS);
/// This case complicates S3 cleanup in case of unsuccessful restore.
if (information.path != s3_root_path && (information.path.starts_with(s3_root_path) || s3_root_path.starts_with(information.path)))
throw Exception("Restoring to the same bucket is allowed only if restore paths are same or not prefixes of each other", ErrorCodes::BAD_ARGUMENTS);
if (information.source_path != s3_root_path && s3_root_path.starts_with(information.source_path))
throw Exception("Restoring to the same bucket is allowed only if source path is not a sub-path of configured path in S3 disk", ErrorCodes::BAD_ARGUMENTS);
}
///TODO: Cleanup FS and bucket if previous restore was failed.
restoreFiles(information.bucket, information.path, information.revision);
restoreFileOperations(information.bucket, information.path, information.revision);
restoreFiles(information.source_bucket, information.source_path, information.revision);
restoreFileOperations(information.source_bucket, information.source_path, information.revision);
Poco::File restore_file(metadata_path + restore_file_name);
restore_file.remove();
LOG_INFO(&Poco::Logger::get("DiskS3"), "Restore disk {} finished", name);
}
catch (const Exception & e)
{
@@ -1093,7 +1105,7 @@ void DiskS3::restoreFiles(const String & source_bucket, const String & source_pa
if (key.find("/operations/") != String::npos)
continue;
auto [revision, _] = extractRevisionAndOperationFromKey(key);
const auto [revision, _] = extractRevisionAndOperationFromKey(key);
/// Filter early if it's possible to get revision from key.
if (revision > target_revision)
continue;
@@ -1129,11 +1141,11 @@ void DiskS3::processRestoreFiles(const String & source_bucket, const String & so
auto head_result = headObject(source_bucket, key);
auto object_metadata = head_result.GetMetadata();
/// If object has 'path' in metadata then restore it.
/// Restore file if object has 'path' in metadata.
auto path_entry = object_metadata.find("path");
if (path_entry == object_metadata.end())
{
LOG_WARNING(&Poco::Logger::get("DiskS3"), "Skip key {} because it doesn't have 'path' key in metadata", key);
LOG_WARNING(&Poco::Logger::get("DiskS3"), "Skip key {} because it doesn't have 'path' in metadata", key);
continue;
}
@@ -1141,17 +1153,16 @@ void DiskS3::processRestoreFiles(const String & source_bucket, const String & so
createDirectories(directoryPath(path));
auto metadata = createMeta(path);
auto relative_key = shrinkKey(source_path, key);
metadata.addObject(relative_key, head_result.GetContentLength());
/// Copy object if we restore to different bucket / path.
if (bucket != source_bucket || s3_root_path != source_path)
copyObject(source_bucket, key, bucket, s3_root_path + relative_key);
metadata.addObject(relative_key, head_result.GetContentLength());
metadata.save();
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Restored {} file", path);
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Restored file {}", path);
}
}
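The 'path' lookup above only works if the objects were uploaded with that metadata in the first place, which is what `send_metadata` enables on the write path. A hedged sketch of the producer side, using the same AWS SDK the disk uses (the helper and the direct PutObject call are assumptions for illustration; DiskS3 actually streams uploads through its own write buffer):

```cpp
#include <memory>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/PutObjectRequest.h>

/// Sketch (assumption): each uploaded object carries the local metadata file path as
/// S3 user metadata under the 'path' key, which is exactly what processRestoreFiles()
/// reads back via HeadObject during restore.
bool putObjectWithPathMetadata(Aws::S3::S3Client & client, const Aws::String & bucket,
                               const Aws::String & key, const Aws::String & local_metadata_path,
                               const std::shared_ptr<Aws::IOStream> & body)
{
    Aws::S3::Model::PutObjectRequest request;
    request.SetBucket(bucket);
    request.SetKey(key);
    request.AddMetadata("path", local_metadata_path); /// The entry restore looks up.
    request.SetBody(body);
    return client.PutObject(request).IsSuccess();
}
```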
@@ -1159,7 +1170,7 @@ void DiskS3::restoreFileOperations(const String & source_bucket, const String &
{
LOG_INFO(&Poco::Logger::get("DiskS3"), "Starting restore file operations for disk {}", name);
/// Enable record file operations if we restore to different bucket / path.
/// Enable recording file operations if we restore to different bucket / path.
send_metadata = bucket != source_bucket || s3_root_path != source_path;
listObjects(source_bucket, source_path + "operations/", [this, &source_bucket, &target_revision](auto list_result)
@@ -1171,15 +1182,15 @@ void DiskS3::restoreFileOperations(const String & source_bucket, const String &
{
const String & key = row.GetKey();
auto [revision, operation] = extractRevisionAndOperationFromKey(key);
if (revision == 0)
const auto [revision, operation] = extractRevisionAndOperationFromKey(key);
if (revision == UNKNOWN_REVISION)
{
LOG_WARNING(&Poco::Logger::get("DiskS3"), "Skip key {} with unknown revision", revision);
LOG_WARNING(&Poco::Logger::get("DiskS3"), "Skip key {} with unknown revision", key);
continue;
}
/// Stop processing when get revision more than required.
/// S3 ensures that keys will be listed in ascending UTF-8 bytes order.
/// S3 ensures that keys will be listed in ascending UTF-8 bytes order (revision order).
/// We can stop processing if revision of the object is already more than required.
if (revision > target_revision)
return false;
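The early exit above is only correct because lexicographic key order coincides with numeric revision order, which in turn requires revisionToString() to zero-pad revisions to a fixed width. The padding itself lies outside this hunk, so the following is an assumption-level sketch of what such a helper looks like:

```cpp
#include <cstdint>
#include <string>

/// Sketch (assumption): zero-pad the revision so that string comparison of keys such as
/// "operations/r<revision>-<operation>" orders the same way as the numeric revision,
/// letting restoreFileOperations() stop at the first key whose revision is too new.
std::string revisionToPaddedString(uint64_t revision)
{
    static constexpr size_t width = 19; /// Enough digits for any revision below LATEST_REVISION (2^63).
    std::string str = std::to_string(revision);
    return std::string(width - str.size(), '0') + str;
}
```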
@@ -1220,7 +1231,7 @@ void DiskS3::restoreFileOperations(const String & source_bucket, const String &
std::tuple<UInt64, String> DiskS3::extractRevisionAndOperationFromKey(const String & key)
{
UInt64 revision = 0;
UInt64 revision = UNKNOWN_REVISION;
String operation;
re2::RE2::FullMatch(key, key_regexp, &revision, &operation);
@@ -1249,4 +1260,10 @@ String DiskS3::revisionToString(UInt64 revision)
return revision_str;
}
void DiskS3::onFreeze(const String & path)
{
WriteBufferFromFile revision_file_buf(metadata_path + path + "revision.txt", 32);
writeIntText(revision_counter.load(), revision_file_buf);
}
}
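onFreeze() ties the backup story together: every FREEZE now leaves a revision.txt inside the backup directory on the S3 disk, recording the revision counter at freeze time. A hedged sketch of how a backup/restore procedure could consume it, reusing the restore-file layout assumed earlier (paths and helper names are illustrative):

```cpp
#include <cstdint>
#include <fstream>
#include <string>

/// Sketch (assumption): read the revision recorded by DiskS3::onFreeze() during
/// ALTER TABLE ... FREEZE and turn it into a `restore` file, so that the next server
/// start restores the disk's metadata up to exactly that revision.
void prepareRestoreFromFreeze(const std::string & freeze_backup_path,  /// e.g. /var/lib/clickhouse/disks/s3/shadow/backup1/
                              const std::string & disk_metadata_path)  /// e.g. /var/lib/clickhouse/disks/s3/
{
    uint64_t revision = 0; /// 0 is treated as "latest" by restore().
    std::ifstream(freeze_backup_path + "revision.txt") >> revision;
    std::ofstream(disk_metadata_path + "restore") << revision << '\n';
}
```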

View File

@@ -126,6 +126,8 @@ public:
/// Restore S3 metadata files on file system.
void restore();
void onFreeze(const String & path) override;
private:
bool tryReserve(UInt64 bytes);
@@ -172,9 +174,10 @@ private:
std::atomic<UInt64> revision_counter;
static constexpr UInt64 LATEST_REVISION = (static_cast<UInt64>(1)) << 63;
static constexpr UInt64 UNKNOWN_REVISION = 0;
/// File contains restore information
const String restore_file = "restore";
/// File at path {metadata_path}/restore indicates that metadata restore is needed and contains restore information
const String restore_file_name = "restore";
/// The number of keys listed in one request (1000 is max value).
int list_object_keys_size;

View File

@@ -150,7 +150,7 @@ void registerDiskS3(DiskFactory & factory)
context.getSettingsRef().s3_min_upload_part_size,
context.getSettingsRef().s3_max_single_part_upload_size,
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
config.getBool(config_prefix + ".send_object_metadata", false),
config.getBool(config_prefix + ".send_metadata", false),
config.getInt(config_prefix + ".thread_pool_size", 16),
config.getInt(config_prefix + ".list_object_keys_size", 1000));

View File

@@ -3604,6 +3604,10 @@ PartitionCommandsResultInfo MergeTreeData::freezePartitionsByMatcher(MatcherFn m
const auto data_parts = getDataParts();
String backup_name = (!with_name.empty() ? escapeForFileName(with_name) : toString(increment));
String backup_path = shadow_path + backup_name + "/";
for (const auto & disk : getStoragePolicy()->getDisks())
disk->onFreeze(backup_path);
PartitionCommandsResultInfo result;
@@ -3613,12 +3617,10 @@ PartitionCommandsResultInfo MergeTreeData::freezePartitionsByMatcher(MatcherFn m
if (!matcher(part))
continue;
part->volume->getDisk()->createDirectories(shadow_path);
String backup_path = shadow_path + backup_name + "/";
LOG_DEBUG(log, "Freezing part {} snapshot will be placed at {}", part->name, backup_path);
part->volume->getDisk()->createDirectories(shadow_path);
String backup_part_path = backup_path + relative_data_path + part->relative_path;
if (auto part_in_memory = asInMemoryPart(part))
part_in_memory->flushToDisk(backup_path + relative_data_path, part->relative_path, metadata_snapshot);

View File

@@ -1040,32 +1040,25 @@ class ClickHouseInstance:
return self.http_query(sql=sql, data=data, params=params, user=user, password=password,
expect_fail_and_get_error=True)
def kill_clickhouse(self, stop_start_wait_sec=5):
pid = self.get_process_pid("clickhouse")
if not pid:
raise Exception("No clickhouse found")
self.exec_in_container(["bash", "-c", "kill -9 {}".format(pid)], user='root')
time.sleep(stop_start_wait_sec)
def restore_clickhouse(self, retries=100):
pid = self.get_process_pid("clickhouse")
if pid:
raise Exception("ClickHouse has already started")
self.exec_in_container(["bash", "-c", "{} --daemon".format(CLICKHOUSE_START_COMMAND)], user=str(os.getuid()))
from helpers.test_tools import assert_eq_with_retry
# wait start
assert_eq_with_retry(self, "select 1", "1", retry_count=retries)
def restart_clickhouse(self, stop_start_wait_sec=5, kill=False):
def stop_clickhouse(self, start_wait_sec=5, kill=False):
if not self.stay_alive:
raise Exception("clickhouse can be restarted only with stay_alive=True instance")
raise Exception("clickhouse can be stopped only with stay_alive=True instance")
self.exec_in_container(["bash", "-c", "pkill {} clickhouse".format("-9" if kill else "")], user='root')
time.sleep(stop_start_wait_sec)
time.sleep(start_wait_sec)
def start_clickhouse(self, stop_wait_sec=5):
if not self.stay_alive:
raise Exception("clickhouse can be started again only with stay_alive=True instance")
self.exec_in_container(["bash", "-c", "{} --daemon".format(CLICKHOUSE_START_COMMAND)], user=str(os.getuid()))
# wait start
from helpers.test_tools import assert_eq_with_retry
assert_eq_with_retry(self, "select 1", "1", retry_count=int(stop_start_wait_sec / 0.5), sleep_time=0.5)
assert_eq_with_retry(self, "select 1", "1", retry_count=int(stop_wait_sec / 0.5), sleep_time=0.5)
def restart_clickhouse(self, stop_start_wait_sec=5, kill=False):
self.stop_clickhouse(stop_start_wait_sec, kill)
self.start_clickhouse(stop_start_wait_sec)
def exec_in_container(self, cmd, detach=False, nothrow=False, **kwargs):
container_id = self.get_docker_handle().id
@@ -1085,9 +1078,7 @@ class ClickHouseInstance:
return self.cluster.copy_file_to_container(container_id, local_path, dest_path)
def get_process_pid(self, process_name):
output = self.exec_in_container(["bash", "-c",
"ps ax | grep '{}' | grep -v 'grep' | grep -v 'bash -c' | awk '{{print $1}}'".format(
process_name)])
output = self.exec_in_container(["pidof", "{}".format(process_name)])
if output:
try:
pid = int(output.split('\n')[0].strip())
@@ -1403,7 +1394,7 @@ class ClickHouseKiller(object):
self.clickhouse_node = clickhouse_node
def __enter__(self):
self.clickhouse_node.kill_clickhouse()
self.clickhouse_node.stop_clickhouse()
def __exit__(self, exc_type, exc_val, exc_tb):
self.clickhouse_node.restore_clickhouse()
self.clickhouse_node.start_clickhouse()

View File

@@ -0,0 +1,5 @@
<yandex>
<background_processing_pool_thread_sleep_seconds>0.5</background_processing_pool_thread_sleep_seconds>
<background_processing_pool_task_sleep_seconds_when_no_work_min>0.5</background_processing_pool_task_sleep_seconds_when_no_work_min>
<background_processing_pool_task_sleep_seconds_when_no_work_max>0.5</background_processing_pool_task_sleep_seconds_when_no_work_max>
</yandex>

View File

@@ -0,0 +1,12 @@
<yandex>
<shutdown_wait_unfinished>3</shutdown_wait_unfinished>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/log.log</log>
<errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
</logger>
</yandex>

View File

@@ -0,0 +1,34 @@
<yandex>
<storage_configuration>
<disks>
<s3>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<send_metadata>true</send_metadata>
<list_object_keys_size>1</list_object_keys_size> <!-- To effectively test restore parallelism -->
</s3>
<hdd>
<type>local</type>
<path>/</path>
</hdd>
</disks>
<policies>
<s3>
<volumes>
<main>
<disk>s3</disk>
</main>
<external>
<disk>hdd</disk>
</external>
</volumes>
</s3>
</policies>
</storage_configuration>
<merge_tree>
<min_bytes_for_wide_part>0</min_bytes_for_wide_part>
</merge_tree>
</yandex>

View File

@@ -0,0 +1,5 @@
<yandex>
<profiles>
<default/>
</profiles>
</yandex>

View File

@@ -0,0 +1,20 @@
<?xml version="1.0"?>
<yandex>
<tcp_port>9000</tcp_port>
<listen_host>127.0.0.1</listen_host>
<openSSL>
<client>
<cacheSessions>true</cacheSessions>
<verificationMode>none</verificationMode>
<invalidCertificateHandler>
<name>AcceptCertificateHandler</name>
</invalidCertificateHandler>
</client>
</openSSL>
<max_concurrent_queries>500</max_concurrent_queries>
<mark_cache_size>5368709120</mark_cache_size>
<path>./clickhouse/</path>
<users_config>users.xml</users_config>
</yandex>

View File

@@ -0,0 +1,106 @@
import logging
import random
import string
import time
import pytest
from helpers.cluster import ClickHouseCluster
logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance("node", main_configs=["configs/config.d/storage_conf.xml",
"configs/config.d/bg_processing_pool_conf.xml",
"configs/config.d/log_conf.xml"], user_configs=[], with_minio=True, stay_alive=True)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
yield cluster
finally:
cluster.shutdown()
def random_string(length):
letters = string.ascii_letters
return ''.join(random.choice(letters) for i in range(length))
def generate_values(date_str, count, sign=1):
data = [[date_str, sign * (i + 1), random_string(10)] for i in range(count)]
data.sort(key=lambda tup: tup[1])
return ",".join(["('{}',{},'{}')".format(x, y, z) for x, y, z in data])
def create_table(cluster, table_name, additional_settings=None):
node = cluster.instances["node"]
create_table_statement = """
CREATE TABLE {} (
dt Date,
id Int64,
data String,
INDEX min_max (id) TYPE minmax GRANULARITY 3
) ENGINE=MergeTree()
PARTITION BY dt
ORDER BY (dt, id)
SETTINGS
storage_policy='s3',
old_parts_lifetime=600,
index_granularity=512
""".format(table_name)
if additional_settings:
create_table_statement += ","
create_table_statement += additional_settings
node.query(create_table_statement)
@pytest.fixture(autouse=True)
def drop_table(cluster):
yield
node = cluster.instances["node"]
minio = cluster.minio_client
node.query("DROP TABLE IF EXISTS s3_test NO DELAY")
for obj in list(minio.list_objects(cluster.minio_bucket, 'data/')):
minio.remove_object(cluster.minio_bucket, obj.object_name)
# Restore to the same bucket and path with latest revision.
def test_simple_full_restore(cluster):
create_table(cluster, "s3_test")
node = cluster.instances["node"]
node.query("INSERT INTO s3_test VALUES {}".format(generate_values('2020-01-03', 4096)))
node.query("INSERT INTO s3_test VALUES {}".format(generate_values('2020-01-04', 4096, -1)))
node.query("INSERT INTO s3_test VALUES {}".format(generate_values('2020-01-05', 4096)))
node.query("INSERT INTO s3_test VALUES {}".format(generate_values('2020-01-05', 4096, -1)))
# To ensure parts have merged
node.query("OPTIMIZE TABLE s3_test")
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "({})".format(4096 * 4)
assert node.query("SELECT sum(id) FROM s3_test FORMAT Values") == "({})".format(0)
node.stop_clickhouse()
node.exec_in_container(['bash', '-c', 'rm -r /var/lib/clickhouse/disks/s3/*'], user='root')
node.start_clickhouse()
# All data is removed.
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "({})".format(0)
node.stop_clickhouse()
node.exec_in_container(['bash', '-c', 'touch /var/lib/clickhouse/disks/s3/restore'], user='root')
node.start_clickhouse()
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "({})".format(4096 * 4)
assert node.query("SELECT sum(id) FROM s3_test FORMAT Values") == "({})".format(0)