Merge branch 'master' into export-logs-in-ci

Alexey Milovidov 2023-08-07 23:08:26 +02:00
commit cafe4354c5
14 changed files with 196 additions and 7 deletions

View File

@@ -84,6 +84,7 @@ The BACKUP and RESTORE statements take a list of DATABASE and TABLE names, a des
- `password` for the file on disk
- `base_backup`: the destination of the previous backup of this source. For example, `Disk('backups', '1.zip')`
- `structure_only`: if enabled, allows backing up or restoring only the CREATE statements, without the data of the tables
- `storage_policy`: storage policy for the tables being restored. See [Using Multiple Block Devices for Data Storage](../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-multiple-volumes). This setting is only applicable to the `RESTORE` command. The specified storage policy applies only to tables with an engine from the `MergeTree` family (see the example below this list).
- `s3_storage_class`: the storage class used for S3 backup. For example, `STANDARD`
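For example, a RESTORE query that overrides the storage policy recorded in the backup could look like the following sketch (the policy name `policy1` is hypothetical and must exist in the server's storage configuration; the backup destination is illustrative):

```sql
RESTORE TABLE test.table FROM Disk('backups', '1.zip') SETTINGS storage_policy = 'policy1'
```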
### Usage examples

View File

@@ -8,6 +8,7 @@
#include <boost/algorithm/string/predicate.hpp>
#include <Common/FieldVisitorConvertToNumber.h>
#include <Backups/SettingsFieldOptionalUUID.h>
#include <Backups/SettingsFieldOptionalString.h>
namespace DB
@@ -164,6 +165,7 @@ namespace
    M(Bool, allow_s3_native_copy) \
    M(Bool, internal) \
    M(String, host_id) \
    M(OptionalString, storage_policy) \
    M(OptionalUUID, restore_uuid)

View File

@@ -117,6 +117,9 @@ struct RestoreSettings
    /// The current host's ID in the format 'escaped_host_name:port'.
    String host_id;

    /// Alternative storage policy that may be specified in the SETTINGS clause of RESTORE queries
    std::optional<String> storage_policy;

    /// Internal, should not be specified by user.
    /// Cluster's hosts' IDs in the format 'escaped_host_name:port' for all shards and replicas in a cluster specified in BACKUP ON CLUSTER.
    std::vector<Strings> cluster_host_ids;

View File

@@ -322,6 +322,7 @@ void RestorerFromBackup::findTableInBackup(const QualifiedTableName & table_name
    read_buffer.reset();
    ParserCreateQuery create_parser;
    ASTPtr create_table_query = parseQuery(create_parser, create_query_str, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
    applyCustomStoragePolicy(create_table_query);
    renameDatabaseAndTableNameInCreateQuery(create_table_query, renaming_map, context->getGlobalContext());

    QualifiedTableName table_name = renaming_map.getNewTableName(table_name_in_backup);
@@ -625,6 +626,24 @@ void RestorerFromBackup::checkDatabase(const String & database_name)
    }
}
void RestorerFromBackup::applyCustomStoragePolicy(ASTPtr query_ptr)
{
    constexpr auto setting_name = "storage_policy";
    if (query_ptr && restore_settings.storage_policy.has_value())
    {
        ASTStorage * storage = query_ptr->as<ASTCreateQuery &>().storage;
        if (storage && storage->settings)
        {
            if (restore_settings.storage_policy.value().empty())
                /// it has been set to "" deliberately, so the source storage policy is erased
                storage->settings->changes.removeSetting(setting_name);
            else
                /// it has been set to a custom value, so it either overwrites the existing value or is added as a new one
                storage->settings->changes.setSetting(setting_name, restore_settings.storage_policy.value());
        }
    }
}
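In terms of RESTORE queries, the branches above correspond to three behaviors; a rough sketch (the backup destination and the `policy1` name are illustrative only):

```sql
-- Setting omitted: the table keeps the storage policy recorded in the backup's CREATE query.
RESTORE TABLE test.table FROM Disk('backups', '1.zip');

-- Empty value: the storage_policy setting is removed from the CREATE query,
-- so the restored table falls back to the default policy.
RESTORE TABLE test.table FROM Disk('backups', '1.zip') SETTINGS storage_policy = '';

-- Non-empty value: the setting in the CREATE query is overwritten, or added as a new one.
RESTORE TABLE test.table FROM Disk('backups', '1.zip') SETTINGS storage_policy = 'policy1';
```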
void RestorerFromBackup::removeUnresolvedDependencies()
{
    auto need_exclude_dependency = [this](const StorageID & table_id)

View File

@@ -95,6 +95,8 @@ private:
    void createDatabase(const String & database_name) const;
    void checkDatabase(const String & database_name);
    void applyCustomStoragePolicy(ASTPtr query_ptr);
    void removeUnresolvedDependencies();
    void createTables();
    void createTable(const QualifiedTableName & table_name);

View File

@@ -0,0 +1,29 @@
#include <Backups/SettingsFieldOptionalString.h>
#include <Common/ErrorCodes.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int CANNOT_PARSE_BACKUP_SETTINGS;
}

SettingFieldOptionalString::SettingFieldOptionalString(const Field & field)
{
    if (field.getType() == Field::Types::Null)
    {
        value = std::nullopt;
        return;
    }

    if (field.getType() == Field::Types::String)
    {
        value = field.get<const String &>();
        return;
    }

    throw Exception(ErrorCodes::CANNOT_PARSE_BACKUP_SETTINGS, "Cannot get string from {}", field);
}

}

View File

@@ -0,0 +1,20 @@
#pragma once

#include <optional>
#include <Core/SettingsFields.h>

namespace DB
{

struct SettingFieldOptionalString
{
    std::optional<String> value;

    explicit SettingFieldOptionalString(const std::optional<String> & value_) : value(value_) {}

    explicit SettingFieldOptionalString(const Field & field);

    explicit operator Field() const { return Field(value ? toString(*value) : ""); }
};

}

View File

@@ -419,7 +419,7 @@ catch (...)
    throw;
}

void DistributedAsyncInsertDirectoryQueue::processFile(const std::string & file_path)
void DistributedAsyncInsertDirectoryQueue::processFile(std::string & file_path)
{
    OpenTelemetry::TracingContextHolderPtr thread_trace_context;
@@ -459,7 +459,7 @@ void DistributedAsyncInsertDirectoryQueue::processFile(const std::string & file_
        if (isDistributedSendBroken(e.code(), e.isRemoteException()))
        {
            markAsBroken(file_path);
            current_file.clear();
            file_path.clear();
        }
        throw;
    }
@@ -473,8 +473,8 @@ void DistributedAsyncInsertDirectoryQueue::processFile(const std::string & file_
    auto dir_sync_guard = getDirectorySyncGuard(relative_path);
    markAsSend(file_path);
    current_file.clear();
    LOG_TRACE(log, "Finished processing `{}` (took {} ms)", file_path, watch.elapsedMilliseconds());
    file_path.clear();
}

struct DistributedAsyncInsertDirectoryQueue::BatchHeader

View File

@@ -100,7 +100,7 @@ private:
    void addFile(const std::string & file_path);
    void initializeFilesFromDisk();
    void processFiles();
    void processFile(const std::string & file_path);
    void processFile(std::string & file_path);
    void processFilesWithBatching();
    void markAsBroken(const std::string & file_path);
View File

@@ -8,6 +8,7 @@ from commit_status_helper import (
    get_commit,
    get_commit_filtered_statuses,
    post_commit_status,
    update_mergeable_check,
)
from get_robot_token import get_best_robot_token
from pr_info import PRInfo
@@ -18,6 +19,8 @@ def main():
    pr_info = PRInfo(need_orgs=True)
    gh = Github(get_best_robot_token(), per_page=100)
    # Update the Mergeable Check at the final step
    update_mergeable_check(gh, pr_info, CI_STATUS_NAME)
    commit = get_commit(gh, pr_info.sha)
    statuses = [
@@ -27,7 +30,8 @@
    ]
    if not statuses:
        return
    status = statuses[0]
    # Take the latest status
    status = statuses[-1]
    if status.state == "pending":
        post_commit_status(
            commit,

View File

@@ -0,0 +1,33 @@
<clickhouse>
    <storage_configuration>
        <disks>
            <one>
                <type>local</type>
                <path>/var/lib/disks/one/</path>
            </one>
            <two>
                <type>local</type>
                <path>/var/lib/disks/two/</path>
            </two>
        </disks>
        <policies>
            <policy1>
                <volumes>
                    <single>
                        <disk>one</disk>
                    </single>
                </volumes>
            </policy1>
            <policy2>
                <volumes>
                    <single>
                        <disk>two</disk>
                    </single>
                </volumes>
            </policy2>
        </policies>
    </storage_configuration>
    <backups>
        <allowed_path>/backups</allowed_path>
    </backups>
</clickhouse>
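This test configuration defines two single-disk policies (`policy1` on disk `one`, `policy2` on disk `two`) plus an allowed backups path. As a quick sanity check, one way to list the policies the server actually picked up is a sketch using the standard `system.storage_policies` table:

```sql
SELECT policy_name, volume_name, disks FROM system.storage_policies
```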

View File

@@ -0,0 +1,76 @@
import pytest

from helpers.cluster import ClickHouseCluster


backup_id_counter = 0

cluster = ClickHouseCluster(__file__)

instance = cluster.add_instance(
    "instance",
    main_configs=["configs/storage_config.xml"],
)


@pytest.fixture(scope="module", autouse=True)
def start_cluster():
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()


@pytest.fixture(autouse=True)
def cleanup_after_test():
    try:
        yield
    finally:
        instance.query("DROP DATABASE IF EXISTS test")


def new_backup_name():
    global backup_id_counter
    backup_id_counter += 1
    return f"File('/backups/{backup_id_counter}/')"


def create_table_backup(backup_name, storage_policy=None):
    instance.query("CREATE DATABASE test")
    create_query = "CREATE TABLE test.table(x UInt32) ENGINE=MergeTree ORDER BY x"
    if storage_policy is not None:
        create_query += f" SETTINGS storage_policy = '{storage_policy}'"
    instance.query(create_query)
    instance.query(f"INSERT INTO test.table SELECT number FROM numbers(10)")
    instance.query(f"BACKUP TABLE test.table TO {backup_name}")
    instance.query("DROP TABLE test.table SYNC")


def restore_table(backup_name, storage_policy=None):
    restore_query = f"RESTORE TABLE test.table FROM {backup_name}"
    if storage_policy is not None:
        restore_query += f" SETTINGS storage_policy = '{storage_policy}'"
    instance.query(restore_query)


@pytest.mark.parametrize(
    "origin_policy, restore_policy, expected_policy",
    [
        (None, "", "default"),
        (None, None, "default"),
        (None, "policy1", "policy1"),
        ("policy1", "policy1", "policy1"),
        ("policy1", "policy2", "policy2"),
        ("policy1", "", "default"),
        ("policy1", None, "policy1"),
    ],
)
def test_storage_policies(origin_policy, restore_policy, expected_policy):
    backup_name = new_backup_name()
    create_table_backup(backup_name, origin_policy)
    restore_table(backup_name, restore_policy)

    assert (
        instance.query("SELECT storage_policy FROM system.tables WHERE name='table'")
        == f"{expected_policy}\n"
    )

View File

@@ -3,8 +3,8 @@
{"operation_name":"void DB::DistributedSink::writeToLocal(const Cluster::ShardInfo &, const Block &, size_t)","cluster":"test_cluster_two_shards_localhost","shard":"2","rows":"1","bytes":"8"}
1
===2===
{"operation_name":"void DB::DistributedAsyncInsertDirectoryQueue::processFile(const std::string &)","cluster":"test_cluster_two_shards_localhost","shard":"1","rows":"1","bytes":"8"}
{"operation_name":"void DB::DistributedAsyncInsertDirectoryQueue::processFile(const std::string &)","cluster":"test_cluster_two_shards_localhost","shard":"2","rows":"1","bytes":"8"}
{"operation_name":"void DB::DistributedAsyncInsertDirectoryQueue::processFile(std::string &)","cluster":"test_cluster_two_shards_localhost","shard":"1","rows":"1","bytes":"8"}
{"operation_name":"void DB::DistributedAsyncInsertDirectoryQueue::processFile(std::string &)","cluster":"test_cluster_two_shards_localhost","shard":"2","rows":"1","bytes":"8"}
3
2
===3===