Get rid of code duplication in extractZkPathFromCreateQuery().

This commit is contained in:
Vitaly Baranov 2024-04-25 19:24:36 +02:00
parent e09530ab75
commit 6e57931263
6 changed files with 272 additions and 232 deletions

View File

@ -11,7 +11,7 @@
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/formatAST.h>
#include <Storages/IStorage.h>
#include <Storages/MergeTree/extractZkPathFromCreateQuery.h>
#include <Storages/MergeTree/extractZooKeeperPathFromReplicatedTableDef.h>
#include <base/chrono_io.h>
#include <base/insertAtEnd.h>
#include <base/scope_guard.h>
@ -776,7 +776,7 @@ void BackupEntriesCollector::makeBackupEntriesForTablesDefs()
checkIsQueryCancelled();
ASTPtr new_create_query = table_info.create_table_query;
table_info.replicated_table_zk_path = tryExtractZkPathFromCreateQuery(*new_create_query, context->getGlobalContext());
table_info.replicated_table_zk_path = extractZooKeeperPathFromReplicatedTableDef(new_create_query->as<const ASTCreateQuery &>(), context);
adjustCreateQueryForBackup(new_create_query, context->getGlobalContext());
renameDatabaseAndTableNameInCreateQuery(new_create_query, renaming_map, context->getGlobalContext());

View File

@ -1,61 +0,0 @@
#include <Databases/DatabaseReplicatedHelpers.h>
#include <Databases/IDatabase.h>
#include <Interpreters/Context.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Storages/MergeTree/extractZkPathFromCreateQuery.h>
#include <Common/Macros.h>
namespace DB
{
std::optional<String> tryExtractZkPathFromCreateQuery(const IAST & create_query, const ContextPtr & global_context)
{
const auto * create = create_query.as<const ASTCreateQuery>();
if (!create || !create->storage || !create->storage->engine)
return {};
/// Check if the table engine is one of the ReplicatedMergeTree family.
const auto & ast_engine = *create->storage->engine;
if (!ast_engine.name.starts_with("Replicated") || !ast_engine.name.ends_with("MergeTree"))
return {};
/// Get the first argument.
const auto * ast_arguments = typeid_cast<ASTExpressionList *>(ast_engine.arguments.get());
if (!ast_arguments || ast_arguments->children.empty())
return {};
auto * ast_zk_path = typeid_cast<ASTLiteral *>(ast_arguments->children[0].get());
if (!ast_zk_path || (ast_zk_path->value.getType() != Field::Types::String))
return {};
String zk_path = ast_zk_path->value.safeGet<String>();
/// Expand macros.
Macros::MacroExpansionInfo info;
info.table_id.table_name = create->getTable();
info.table_id.database_name = create->getDatabase();
info.table_id.uuid = create->uuid;
auto database = DatabaseCatalog::instance().tryGetDatabase(info.table_id.database_name);
if (database && database->getEngineName() == "Replicated")
{
info.shard = getReplicatedDatabaseShardName(database);
info.replica = getReplicatedDatabaseReplicaName(database);
}
try
{
zk_path = global_context->getMacros()->expand(zk_path, info);
}
catch (...)
{
return {}; /// Couldn't expand macros.
}
return zk_path;
}
}

View File

@ -1,19 +0,0 @@
#pragma once
#include <base/types.h>
#include <memory>
#include <optional>
namespace DB
{
/// Forward declarations: only references and shared pointers to these types are used in this header.
class IAST;
class Context;
using ContextPtr = std::shared_ptr<const Context>;
/// Extracts a zookeeper path from a specified CREATE TABLE query. Returns std::nullopt if fails.
/// The function takes the first argument of the ReplicatedMergeTree table engine and expands macros in it.
/// It works like a part of what the create() function in registerStorageMergeTree.cpp does but in a simpler manner.
///
/// std::nullopt is returned (no exception is propagated for these cases) when:
///  - `create_query` is not a CREATE query or has no storage/engine clause;
///  - the engine name does not start with "Replicated" and end with "MergeTree";
///  - the engine has no arguments or the first argument is not a string literal;
///  - expanding macros in the path throws.
/// Macros are expanded using the macros obtained from `global_context`.
std::optional<String> tryExtractZkPathFromCreateQuery(const IAST & create_query, const ContextPtr & global_context);
}

View File

@ -0,0 +1,18 @@
#pragma once
#include <base/types.h>
#include <memory>
#include <optional>
namespace DB
{
/// Forward declarations: only references and shared pointers to these types are used in this header.
class ASTCreateQuery;
class Context;
using ContextPtr = std::shared_ptr<const Context>;
/// Extracts a zookeeper path from a specified CREATE TABLE query. Returns std::nullopt if fails.
/// The function checks the table engine and if it is Replicated*MergeTree then it takes the first argument and expands macros in it.
///
/// std::nullopt is returned when the query has no storage/engine clause, when the engine is not
/// a Replicated*MergeTree one, or when extracting the path throws any exception
/// (exceptions are swallowed by the implementation and are not propagated to the caller).
/// The engine arguments stored in `create_query` are not modified: the implementation works on clones of them.
std::optional<String> extractZooKeeperPathFromReplicatedTableDef(const ASTCreateQuery & create_query, const ContextPtr & context);
}

View File

@ -1,6 +1,7 @@
#include <Databases/DatabaseReplicatedHelpers.h>
#include <Storages/MergeTree/MergeTreeIndexMinMax.h>
#include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/MergeTree/extractZooKeeperPathFromReplicatedTableDef.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
@ -122,6 +123,248 @@ static void verifySortingKey(const KeyDescription & sorting_key)
}
}
/// Tells whether the table engine is defined with the new (extended) syntax,
/// i.e. MergeTree() PRIMARY KEY ... PARTITION BY ... SETTINGS ...
/// rather than the old one, MergeTree(date, [sample_key], primary_key).
/// A non-empty list of indices or projections in the columns definition also implies the extended syntax.
static bool isExtendedStorageDef(const ASTCreateQuery & query)
{
    if (query.storage && query.storage->isExtendedStorageDefinition())
        return true;

    if (!query.columns_list)
        return false;

    if (query.columns_list->indices && !query.columns_list->indices->children.empty())
        return true;

    return query.columns_list->projections && !query.columns_list->projections->children.empty();
}
/// Replaces constant expressions among the engine arguments with their evaluated literals.
/// With the new syntax an argument can be a literal, an identifier or an array/tuple of identifiers,
/// so only function calls other than array()/tuple() are evaluated; everything else is left untouched.
/// Throws BAD_ARGUMENTS (mentioning the index of the offending argument) if an evaluation fails.
static void evaluateEngineArgs(ASTs & engine_args, const ContextPtr & context)
{
    /// Declared outside of `try` so that the handler can report which argument failed.
    size_t arg_idx = 0;
    try
    {
        while (arg_idx < engine_args.size())
        {
            auto & arg = engine_args[arg_idx];
            const auto * func = arg->as<ASTFunction>();

            /// array() and tuple() hold column identifiers, they must not be evaluated.
            const bool needs_evaluation = func && func->name != "array" && func->name != "tuple";
            if (needs_evaluation)
            {
                Field value = evaluateConstantExpression(arg, context).first;
                arg = std::make_shared<ASTLiteral>(value);
            }

            ++arg_idx;
        }
    }
    catch (Exception & e)
    {
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot evaluate engine argument {}: {} {}",
                        arg_idx, e.message(), verbose_help_message);
    }
}
/// Checks whether the engine name denotes a Replicated table engine,
/// i.e. has the form "Replicated" + <anything> + "MergeTree".
static bool isReplicated(const String & engine_name)
{
    if (!engine_name.starts_with("Replicated"))
        return false;
    return engine_name.ends_with("MergeTree");
}
/// Strips the optional "Replicated" prefix and the optional "MergeTree" suffix from an engine name
/// and returns what is left in between (e.g. "Graphite" for "ReplicatedGraphiteMergeTree").
/// The result is a view into `engine_name`, so it must not outlive that string.
static std::string_view getNamePart(const String & engine_name)
{
    static constexpr std::string_view replicated_prefix = "Replicated";
    static constexpr std::string_view merge_tree_suffix = "MergeTree";

    std::string_view middle{engine_name};

    if (middle.starts_with(replicated_prefix))
        middle.remove_prefix(replicated_prefix.size());

    if (middle.ends_with(merge_tree_suffix))
        middle.remove_suffix(merge_tree_suffix.size());

    return middle;
}
/// Extracts zookeeper path and replica name from the table engine's arguments.
/// The function can modify those arguments (that's why they're passed separately in `engine_args`) and also determines RenamingRestrictions.
/// The function assumes the table engine is Replicated.
///
/// Parameters:
///   query                 - the CREATE query; used to detect the extended storage definition and to read `attach` / `has_uuid`.
///   table_id              - table identifier passed to macro expansion; its database name is also used to look up the database.
///   engine_name           - name of the table engine; must satisfy isReplicated() (checked with chassert).
///   engine_args           - engine arguments; literals may be rewritten and two default arguments may be prepended.
///   mode                  - if it is below LoadingStrictnessLevel::ATTACH, special macros are unfolded and stored back into the arguments.
///   context               - provides client info, macros and server settings.
///   zookeeper_path        - [out] the path with macros expanded.
///   replica_name          - [out] the replica name with macros expanded.
///   renaming_restrictions - [out] ALLOW_ANY unless the expansion of the path/name touched {database}, {table} or {uuid}.
///
/// Throws BAD_ARGUMENTS if the arguments cannot be evaluated, are not string literals, or are missing
/// while defaults are not applicable; throws NO_REPLICA_NAME_GIVEN if the replica name literal is empty.
static void extractZooKeeperPathAndReplicaNameFromEngineArgs(
const ASTCreateQuery & query,
const StorageID & table_id,
const String & engine_name,
ASTs & engine_args,
LoadingStrictnessLevel mode,
const ContextPtr & context,
String & zookeeper_path,
String & replica_name,
RenamingRestrictions & renaming_restrictions)
{
chassert(isReplicated(engine_name));
/// Reset all output parameters so that they never carry stale values from the caller.
zookeeper_path = "";
replica_name = "";
renaming_restrictions = RenamingRestrictions::ALLOW_ANY;
bool is_extended_storage_def = isExtendedStorageDef(query);
if (is_extended_storage_def)
{
/// Allow expressions in engine arguments.
/// In new syntax argument can be literal or identifier or array/tuple of identifiers.
evaluateEngineArgs(engine_args, context);
}
/// Both flags require the query to be a secondary query; the database is looked up only in that case
/// (because of the short-circuit evaluation of `&&`).
bool is_on_cluster = context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY;
bool is_replicated_database = context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY &&
DatabaseCatalog::instance().getDatabase(table_id.database_name)->getEngineName() == "Replicated";
/// Allow implicit {uuid} macros only for zookeeper_path in ON CLUSTER queries
/// and if UUID was explicitly passed in CREATE TABLE (like for ATTACH)
bool allow_uuid_macro = is_on_cluster || is_replicated_database || query.attach || query.has_uuid;
/// Expands macros in `zookeeper_path` and `replica_name` (which must already be set by the caller of the lambda).
/// It works in two steps: the result of the first step is stored into the passed AST literals,
/// the result of the second step goes only to the output strings.
auto expand_macro = [&] (ASTLiteral * ast_zk_path, ASTLiteral * ast_replica_name)
{
/// Unfold {database} and {table} macro on table creation, so table can be renamed.
if (mode < LoadingStrictnessLevel::ATTACH)
{
Macros::MacroExpansionInfo info;
/// NOTE: it's not recursive
info.expand_special_macros_only = true;
info.table_id = table_id;
/// Avoid unfolding {uuid} macro on this step.
/// We did unfold it in previous versions to make moving table from Atomic to Ordinary database work correctly,
/// but now it's not allowed (and it was the only reason to unfold {uuid} macro).
info.table_id.uuid = UUIDHelpers::Nil;
zookeeper_path = context->getMacros()->expand(zookeeper_path, info);
/// The same `info` object is reused for the second expansion, so its level is reset first.
info.level = 0;
replica_name = context->getMacros()->expand(replica_name, info);
}
/// Store the (possibly partially unfolded) values back into the engine arguments.
ast_zk_path->value = zookeeper_path;
ast_replica_name->value = replica_name;
/// Expand other macros (such as {shard} and {replica}). We do not expand them on previous step
/// to make possible copying metadata files between replicas.
Macros::MacroExpansionInfo info;
info.table_id = table_id;
if (is_replicated_database)
{
auto database = DatabaseCatalog::instance().getDatabase(table_id.database_name);
info.shard = getReplicatedDatabaseShardName(database);
info.replica = getReplicatedDatabaseReplicaName(database);
}
if (!allow_uuid_macro)
info.table_id.uuid = UUIDHelpers::Nil;
zookeeper_path = context->getMacros()->expand(zookeeper_path, info);
/// The UUID is always cleared before expanding the replica name, regardless of `allow_uuid_macro`.
info.level = 0;
info.table_id.uuid = UUIDHelpers::Nil;
replica_name = context->getMacros()->expand(replica_name, info);
/// We do not allow renaming table with these macros in metadata, because zookeeper_path will be broken after RENAME TABLE.
/// NOTE: it may happen if table was created by older version of ClickHouse (< 20.10) and macros was not unfolded on table creation
/// or if one of these macros is recursively expanded from some other macro.
/// Also do not allow to move table from Atomic to Ordinary database if there's {uuid} macro
if (info.expanded_database || info.expanded_table)
renaming_restrictions = RenamingRestrictions::DO_NOT_ALLOW;
else if (info.expanded_uuid)
renaming_restrictions = RenamingRestrictions::ALLOW_PRESERVING_UUID;
};
/// The path and the replica name are expected to be the first two engine arguments.
size_t arg_num = 0;
size_t arg_cnt = engine_args.size();
bool has_arguments = (arg_num + 2 <= arg_cnt);
bool has_valid_arguments = has_arguments && engine_args[arg_num]->as<ASTLiteral>() && engine_args[arg_num + 1]->as<ASTLiteral>();
if (has_valid_arguments)
{
/// Get path and name from engine arguments
/// Both arguments are already known to be literals here; what remains to be checked is that they are strings.
auto * ast_zk_path = engine_args[arg_num]->as<ASTLiteral>();
if (ast_zk_path && ast_zk_path->value.getType() == Field::Types::String)
zookeeper_path = ast_zk_path->value.safeGet<String>();
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Path in ZooKeeper must be a string literal{}", verbose_help_message);
auto * ast_replica_name = engine_args[arg_num + 1]->as<ASTLiteral>();
if (ast_replica_name && ast_replica_name->value.getType() == Field::Types::String)
replica_name = ast_replica_name->value.safeGet<String>();
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replica name must be a string literal{}", verbose_help_message);
if (replica_name.empty())
throw Exception(ErrorCodes::NO_REPLICA_NAME_GIVEN, "No replica name in config{}", verbose_help_message);
expand_macro(ast_zk_path, ast_replica_name);
}
/// Defaults are applicable only with the extended storage definition and only if there are no arguments,
/// or the first argument is not a literal, or there is exactly one argument and the engine is a Graphite one.
else if (is_extended_storage_def
&& (arg_cnt == 0
|| !engine_args[arg_num]->as<ASTLiteral>()
|| (arg_cnt == 1 && (getNamePart(engine_name) == "Graphite"))))
{
/// Try use default values if arguments are not specified.
/// Note: {uuid} macro works for ON CLUSTER queries when database engine is Atomic.
const auto & server_settings = context->getServerSettings();
zookeeper_path = server_settings.default_replica_path;
/// TODO maybe use hostname if {replica} is not defined?
replica_name = server_settings.default_replica_name;
/// Modify query, so default values will be written to metadata
assert(arg_num == 0);
/// Rebuild the argument list as: path, replica name, then all the original arguments.
ASTs old_args;
std::swap(engine_args, old_args);
auto path_arg = std::make_shared<ASTLiteral>(zookeeper_path);
auto name_arg = std::make_shared<ASTLiteral>(replica_name);
auto * ast_zk_path = path_arg.get();
auto * ast_replica_name = name_arg.get();
/// The new literals are updated by expand_macro() before they are appended to the argument list.
expand_macro(ast_zk_path, ast_replica_name);
engine_args.emplace_back(std::move(path_arg));
engine_args.emplace_back(std::move(name_arg));
std::move(std::begin(old_args), std::end(old_args), std::back_inserter(engine_args));
}
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected two string literal arguments: zookeeper_path and replica_name");
}
/// Extracts a zookeeper path from a specified CREATE TABLE query. Returns std::nullopt if fails.
std::optional<String> extractZooKeeperPathFromReplicatedTableDef(const ASTCreateQuery & query, const ContextPtr & context)
{
try
{
if (!query.storage || !query.storage->engine)
return {};
const String & engine_name = query.storage->engine->name;
if (!isReplicated(engine_name))
return {};
StorageID table_id{query.getDatabase(), query.getTable(), query.uuid};
ASTs engine_args;
if (query.storage->engine->arguments)
engine_args = query.storage->engine->arguments->children;
for (auto & engine_arg : engine_args)
engine_arg = engine_arg->clone();
LoadingStrictnessLevel mode = LoadingStrictnessLevel::CREATE;
String zookeeper_path;
String replica_name;
RenamingRestrictions renaming_restrictions;
extractZooKeeperPathAndReplicaNameFromEngineArgs(query, table_id, engine_name, engine_args, mode, context,
zookeeper_path, replica_name, renaming_restrictions);
return zookeeper_path;
}
catch (...)
{
return {};
}
}
static StoragePtr create(const StorageFactory::Arguments & args)
{
@ -156,17 +399,12 @@ static StoragePtr create(const StorageFactory::Arguments & args)
* - Additional MergeTreeSettings in the SETTINGS clause;
*/
bool is_extended_storage_def = args.storage_def->isExtendedStorageDefinition()
|| (args.query.columns_list->indices && !args.query.columns_list->indices->children.empty())
|| (args.query.columns_list->projections && !args.query.columns_list->projections->children.empty());
bool is_extended_storage_def = isExtendedStorageDef(args.query);
const Settings & local_settings = args.getLocalContext()->getSettingsRef();
String name_part = args.engine_name.substr(0, args.engine_name.size() - strlen("MergeTree"));
bool replicated = startsWith(name_part, "Replicated");
if (replicated)
name_part = name_part.substr(strlen("Replicated"));
bool replicated = isReplicated(args.engine_name);
std::string_view name_part = getNamePart(args.engine_name);
MergeTreeData::MergingParams merging_params;
merging_params.mode = MergeTreeData::MergingParams::Ordinary;
@ -283,29 +521,7 @@ static StoragePtr create(const StorageFactory::Arguments & args)
{
/// Allow expressions in engine arguments.
/// In new syntax argument can be literal or identifier or array/tuple of identifiers.
size_t arg_idx = 0;
try
{
for (; arg_idx < engine_args.size(); ++arg_idx)
{
auto & arg = engine_args[arg_idx];
auto * arg_func = arg->as<ASTFunction>();
if (!arg_func)
continue;
/// If we got ASTFunction, let's evaluate it and replace with ASTLiteral.
/// Do not try evaluate array or tuple, because it's array or tuple of column identifiers.
if (arg_func->name == "array" || arg_func->name == "tuple")
continue;
Field value = evaluateConstantExpression(arg, args.getLocalContext()).first;
arg = std::make_shared<ASTLiteral>(value);
}
}
catch (Exception & e)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot evaluate engine argument {}: {} {}",
arg_idx, e.message(), verbose_help_message);
}
evaluateEngineArgs(engine_args, args.getLocalContext());
}
else if (args.mode <= LoadingStrictnessLevel::CREATE && !local_settings.allow_deprecated_syntax_for_merge_tree)
{
@ -314,130 +530,17 @@ static StoragePtr create(const StorageFactory::Arguments & args)
"See also `allow_deprecated_syntax_for_merge_tree` setting.");
}
/// For Replicated.
/// Extract zookeeper path and replica name from engine arguments.
String zookeeper_path;
String replica_name;
RenamingRestrictions renaming_restrictions = RenamingRestrictions::ALLOW_ANY;
bool is_on_cluster = args.getLocalContext()->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY;
bool is_replicated_database = args.getLocalContext()->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY &&
DatabaseCatalog::instance().getDatabase(args.table_id.database_name)->getEngineName() == "Replicated";
/// Allow implicit {uuid} macros only for zookeeper_path in ON CLUSTER queries
/// and if UUID was explicitly passed in CREATE TABLE (like for ATTACH)
bool allow_uuid_macro = is_on_cluster || is_replicated_database || args.query.attach || args.query.has_uuid;
auto expand_macro = [&] (ASTLiteral * ast_zk_path, ASTLiteral * ast_replica_name)
{
/// Unfold {database} and {table} macro on table creation, so table can be renamed.
if (args.mode < LoadingStrictnessLevel::ATTACH)
{
Macros::MacroExpansionInfo info;
/// NOTE: it's not recursive
info.expand_special_macros_only = true;
info.table_id = args.table_id;
/// Avoid unfolding {uuid} macro on this step.
/// We did unfold it in previous versions to make moving table from Atomic to Ordinary database work correctly,
/// but now it's not allowed (and it was the only reason to unfold {uuid} macro).
info.table_id.uuid = UUIDHelpers::Nil;
zookeeper_path = context->getMacros()->expand(zookeeper_path, info);
info.level = 0;
replica_name = context->getMacros()->expand(replica_name, info);
}
ast_zk_path->value = zookeeper_path;
ast_replica_name->value = replica_name;
/// Expand other macros (such as {shard} and {replica}). We do not expand them on previous step
/// to make possible copying metadata files between replicas.
Macros::MacroExpansionInfo info;
info.table_id = args.table_id;
if (is_replicated_database)
{
auto database = DatabaseCatalog::instance().getDatabase(args.table_id.database_name);
info.shard = getReplicatedDatabaseShardName(database);
info.replica = getReplicatedDatabaseReplicaName(database);
}
if (!allow_uuid_macro)
info.table_id.uuid = UUIDHelpers::Nil;
zookeeper_path = context->getMacros()->expand(zookeeper_path, info);
info.level = 0;
info.table_id.uuid = UUIDHelpers::Nil;
replica_name = context->getMacros()->expand(replica_name, info);
/// We do not allow renaming table with these macros in metadata, because zookeeper_path will be broken after RENAME TABLE.
/// NOTE: it may happen if table was created by older version of ClickHouse (< 20.10) and macros was not unfolded on table creation
/// or if one of these macros is recursively expanded from some other macro.
/// Also do not allow to move table from Atomic to Ordinary database if there's {uuid} macro
if (info.expanded_database || info.expanded_table)
renaming_restrictions = RenamingRestrictions::DO_NOT_ALLOW;
else if (info.expanded_uuid)
renaming_restrictions = RenamingRestrictions::ALLOW_PRESERVING_UUID;
};
if (replicated)
{
bool has_arguments = arg_num + 2 <= arg_cnt;
bool has_valid_arguments = has_arguments && engine_args[arg_num]->as<ASTLiteral>() && engine_args[arg_num + 1]->as<ASTLiteral>();
ASTLiteral * ast_zk_path;
ASTLiteral * ast_replica_name;
if (has_valid_arguments)
{
/// Get path and name from engine arguments
ast_zk_path = engine_args[arg_num]->as<ASTLiteral>();
if (ast_zk_path && ast_zk_path->value.getType() == Field::Types::String)
zookeeper_path = ast_zk_path->value.safeGet<String>();
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Path in ZooKeeper must be a string literal{}", verbose_help_message);
++arg_num;
ast_replica_name = engine_args[arg_num]->as<ASTLiteral>();
if (ast_replica_name && ast_replica_name->value.getType() == Field::Types::String)
replica_name = ast_replica_name->value.safeGet<String>();
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replica name must be a string literal{}", verbose_help_message);
if (replica_name.empty())
throw Exception(ErrorCodes::NO_REPLICA_NAME_GIVEN, "No replica name in config{}", verbose_help_message);
++arg_num;
expand_macro(ast_zk_path, ast_replica_name);
}
else if (is_extended_storage_def
&& (arg_cnt == 0
|| !engine_args[arg_num]->as<ASTLiteral>()
|| (arg_cnt == 1 && merging_params.mode == MergeTreeData::MergingParams::Graphite)))
{
/// Try use default values if arguments are not specified.
/// Note: {uuid} macro works for ON CLUSTER queries when database engine is Atomic.
const auto & server_settings = args.getContext()->getServerSettings();
zookeeper_path = server_settings.default_replica_path;
/// TODO maybe use hostname if {replica} is not defined?
replica_name = server_settings.default_replica_name;
/// Modify query, so default values will be written to metadata
assert(arg_num == 0);
ASTs old_args;
std::swap(engine_args, old_args);
auto path_arg = std::make_shared<ASTLiteral>(zookeeper_path);
auto name_arg = std::make_shared<ASTLiteral>(replica_name);
ast_zk_path = path_arg.get();
ast_replica_name = name_arg.get();
expand_macro(ast_zk_path, ast_replica_name);
engine_args.emplace_back(std::move(path_arg));
engine_args.emplace_back(std::move(name_arg));
std::move(std::begin(old_args), std::end(old_args), std::back_inserter(engine_args));
arg_num = 2;
arg_cnt += 2;
}
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected two string literal arguments: zookeeper_path and replica_name");
extractZooKeeperPathAndReplicaNameFromEngineArgs(args.query, args.table_id, args.engine_name, args.engine_args, args.mode,
args.getLocalContext(), zookeeper_path, replica_name, renaming_restrictions);
arg_cnt = engine_args.size(); /// Update `arg_cnt` here because extractZooKeeperPathAndReplicaNameFromEngineArgs() could add arguments.
arg_num = 2; /// zookeeper_path and replica_name together are always two arguments.
}
/// This merging param maybe used as part of sorting key

View File

@ -31,7 +31,6 @@
#include <Storages/ColumnsDescription.h>
#include <Storages/Freeze.h>
#include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
#include <Storages/MergeTree/extractZkPathFromCreateQuery.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/LeaderElection.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>