add implicit uuid macro

This commit is contained in:
Alexander Tokmakov 2020-07-10 12:19:32 +03:00
parent 77289146ab
commit 04c3e7cab6
7 changed files with 115 additions and 36 deletions

View File

@ -22,7 +22,11 @@ Macros::Macros(const Poco::Util::AbstractConfiguration & config, const String &
}
}
String Macros::expand(const String & s, size_t level, const String & database_name, const String & table_name) const
String Macros::expand(const String & s,
size_t level,
const String & database_name,
const String & table_name,
const UUID & uuid) const
{
if (s.find('{') == String::npos)
return s;
@ -64,10 +68,12 @@ String Macros::expand(const String & s, size_t level, const String & database_na
res += database_name;
else if (macro_name == "table" && !table_name.empty())
res += table_name;
else if (macro_name == "uuid" && uuid != UUIDHelpers::Nil)
res += toString(uuid);
else
throw Exception("No macro '" + macro_name +
"' in config while processing substitutions in '" + s + "' at "
+ toString(begin), ErrorCodes::SYNTAX_ERROR);
"' in config while processing substitutions in '" + s + "' at '"
+ toString(begin) + "' or macro is not supported here", ErrorCodes::SYNTAX_ERROR);
pos = end + 1;
}
@ -82,9 +88,9 @@ String Macros::getValue(const String & key) const
throw Exception("No macro " + key + " in config", ErrorCodes::SYNTAX_ERROR);
}
String Macros::expand(const String & s, const String & database_name, const String & table_name) const
String Macros::expand(const String & s, const StorageID & table_id, bool allow_uuid) const
{
return expand(s, 0, database_name, table_name);
return expand(s, 0, table_id.database_name, table_id.table_name, allow_uuid ? table_id.uuid : UUIDHelpers::Nil);
}
Names Macros::expand(const Names & source_names, size_t level) const

View File

@ -2,6 +2,7 @@
#include <Core/Types.h>
#include <Core/Names.h>
#include <Interpreters/StorageID.h>
#include <map>
@ -30,9 +31,13 @@ public:
* If {database} and {table} macros aren`t defined explicitly, expand them as database_name and table_name respectively.
* level - the level of recursion.
*/
String expand(const String & s, size_t level = 0, const String & database_name = "", const String & table_name = "") const;
String expand(const String & s,
size_t level = 0,
const String & database_name = "",
const String & table_name = "",
const UUID & uuid = UUIDHelpers::Nil) const;
String expand(const String & s, const String & database_name, const String & table_name) const;
String expand(const String & s, const StorageID & table_id, bool allow_uuid) const;
/** Apply expand for the list.

View File

@ -129,7 +129,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
bool old_style_database = context.getSettingsRef().default_database_engine.value == DefaultDatabaseEngine::Ordinary;
auto engine = std::make_shared<ASTFunction>();
auto storage = std::make_shared<ASTStorage>();
engine->name = !old_style_database ? "Ordinary" : "Atomic"; //FIXME
engine->name = old_style_database ? "Ordinary" : "Atomic";
storage->set(storage->engine, engine);
create.set(create.storage, storage);
}
@ -161,9 +161,12 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
}
else
{
if (create.uuid != UUIDHelpers::Nil)
bool is_on_cluster = context.getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY;
if (create.uuid != UUIDHelpers::Nil && !is_on_cluster)
throw Exception("Ordinary database engine does not support UUID", ErrorCodes::INCORRECT_QUERY);
/// Ignore UUID if it's ON CLUSTER query
create.uuid = UUIDHelpers::Nil;
metadata_path = metadata_path / "metadata" / database_name_escaped;
}
@ -672,8 +675,12 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
}
else
{
if (create.uuid != UUIDHelpers::Nil)
bool is_on_cluster = context.getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY;
if (create.uuid != UUIDHelpers::Nil && !is_on_cluster)
throw Exception("Table UUID specified, but engine of database " + create.database + " is not Atomic", ErrorCodes::INCORRECT_QUERY);
/// Ignore UUID if it's ON CLUSTER query
create.uuid = UUIDHelpers::Nil;
}
/** If the request specifies IF NOT EXISTS, we allow concurrent CREATE queries (which do nothing).
@ -778,6 +785,8 @@ BlockIO InterpreterCreateQuery::fillTableIfNeeded(const ASTCreateQuery & create)
BlockIO InterpreterCreateQuery::createDictionary(ASTCreateQuery & create)
{
create.uuid = UUIDHelpers::Nil; //FIXME
String dictionary_name = create.table;
create.database = context.resolveDatabase(create.database);
@ -816,8 +825,10 @@ BlockIO InterpreterCreateQuery::execute()
auto & create = query_ptr->as<ASTCreateQuery &>();
if (!create.cluster.empty())
{
/// NOTE: if it's CREATE query and create.database is DatabaseAtomic, different UUIDs will be generated on all servers.
/// However, it allows to use UUID as replica name.
/// For CREATE query generate UUID on initiator, so it will be the same on all hosts.
/// It will be ignored if database does not support UUIDs.
if (!create.attach && create.uuid == UUIDHelpers::Nil)
create.uuid = UUIDHelpers::generateV4();
return executeDDLQueryOnCluster(query_ptr, context, getRequiredAccess());
}

View File

@ -7,6 +7,7 @@
#include <Common/typeid_cast.h>
#include <Common/OptimizedRegularExpression.h>
#include <Common/Macros.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
@ -324,8 +325,16 @@ static StoragePtr create(const StorageFactory::Arguments & args)
if (replicated)
{
add_mandatory_param("path in ZooKeeper");
add_mandatory_param("replica name");
if (is_extended_storage_def)
{
add_optional_param("path in ZooKeeper");
add_optional_param("replica name");
}
else
{
add_mandatory_param("path in ZooKeeper");
add_mandatory_param("replica name");
}
}
if (!is_extended_storage_def)
@ -394,28 +403,50 @@ static StoragePtr create(const StorageFactory::Arguments & args)
if (replicated)
{
const auto * ast = engine_args[arg_num]->as<ASTLiteral>();
if (ast && ast->value.getType() == Field::Types::String)
zookeeper_path = safeGet<String>(ast->value);
else
throw Exception(
"Path in ZooKeeper must be a string literal" + getMergeTreeVerboseHelp(is_extended_storage_def),
ErrorCodes::BAD_ARGUMENTS);
++arg_num;
bool has_arguments = arg_num + 2 <= arg_cnt
&& engine_args[arg_num]->as<ASTLiteral>()
&& engine_args[arg_num + 1]->as<ASTLiteral>();
ast = engine_args[arg_num]->as<ASTLiteral>();
if (ast && ast->value.getType() == Field::Types::String)
replica_name = safeGet<String>(ast->value);
else
throw Exception(
"Replica name must be a string literal" + getMergeTreeVerboseHelp(is_extended_storage_def),
ErrorCodes::BAD_ARGUMENTS);
if (has_arguments)
{
const auto * ast = engine_args[arg_num]->as<ASTLiteral>();
if (ast && ast->value.getType() == Field::Types::String)
zookeeper_path = safeGet<String>(ast->value);
else
throw Exception(
"Path in ZooKeeper must be a string literal" + getMergeTreeVerboseHelp(is_extended_storage_def),
ErrorCodes::BAD_ARGUMENTS);
++arg_num;
if (replica_name.empty())
throw Exception(
"No replica name in config" + getMergeTreeVerboseHelp(is_extended_storage_def),
ErrorCodes::NO_REPLICA_NAME_GIVEN);
++arg_num;
ast = engine_args[arg_num]->as<ASTLiteral>();
if (ast && ast->value.getType() == Field::Types::String)
replica_name = safeGet<String>(ast->value);
else
throw Exception(
"Replica name must be a string literal" + getMergeTreeVerboseHelp(is_extended_storage_def),
ErrorCodes::BAD_ARGUMENTS);
if (replica_name.empty())
throw Exception(
"No replica name in config" + getMergeTreeVerboseHelp(is_extended_storage_def),
ErrorCodes::NO_REPLICA_NAME_GIVEN);
++arg_num;
}
else if (is_extended_storage_def)
{
/// Try use default values if arguments are not specified.
/// It works for ON CLUSTER queries when database engine is Atomic and there are {shard} and {replica} in config.
zookeeper_path = "/clickhouse/tables/{uuid}/{shard}";
replica_name = "{replica}"; /// TODO maybe use hostname if {replica} is not defined?
}
else
throw Exception("Expected zookeper_path and replica_name arguments", ErrorCodes::LOGICAL_ERROR);
/// Allow implicit {uuid} macros only for zookeeper_path in ON CLUSTER queries
bool is_on_cluster = args.local_context.getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY;
bool allow_uuid_macro = is_on_cluster || args.query.attach;
zookeeper_path = args.context.getMacros()->expand(zookeeper_path, args.table_id, allow_uuid_macro);
replica_name = args.context.getMacros()->expand(replica_name, args.table_id, false);
}
/// This merging param maybe used as part of sorting key

View File

@ -175,8 +175,8 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
true, /// require_part_metadata
attach,
[this] (const std::string & name) { enqueuePartForCheck(name); })
, zookeeper_path(global_context.getMacros()->expand(zookeeper_path_, table_id_.database_name, table_id_.table_name))
, replica_name(global_context.getMacros()->expand(replica_name_, table_id_.database_name, table_id_.table_name))
, zookeeper_path(zookeeper_path_)
, replica_name(replica_name_)
, reader(*this)
, writer(*this)
, merger_mutator(*this, global_context.getBackgroundPool().getNumberOfThreads())

View File

@ -1,5 +1,7 @@
<yandex>
<macros>
<test>Hello, world!</test>
<shard>s1</shard>
<replica>r1</replica>
</macros>
</yandex>

View File

@ -295,6 +295,30 @@ def test_socket_timeout(test_cluster):
for i in range(0, 100):
instance.query("select hostName() as host, count() from cluster('cluster', 'system', 'settings') group by host")
def test_replicated_without_arguments(test_cluster):
def insert_and_check(i):
for name in ['ch1', 'ch2', 'ch3', 'ch4']:
test_cluster.instances[name].query("INSERT INTO test_atomic.rmt VALUES (?, hostName())".replace('?', str(i)))
for name in ['ch1', 'ch2', 'ch3', 'ch4']:
test_cluster.instances[name].query("SYSTEM SYNC REPLICA test_atomic.rmt")
assert instance.query("SELECT * FROM cluster('cluster', 'test_atomic', 'rmt') ORDER BY s") == TSV("?\tch1\n?\tch2\n?\tch3\n?\tch4\n".replace('?', str(i)))
instance = test_cluster.instances['ch1']
test_cluster.ddl_check_query(instance, "CREATE DATABASE test_atomic ON CLUSTER cluster ENGINE=Atomic")
test_cluster.ddl_check_query(instance, "CREATE TABLE test_atomic.rmt ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree ORDER BY n")
assert instance.query("SELECT count(DISTINCT uuid) FROM cluster('cluster', 'system', 'databases') WHERE name='test_atomic'") == "1\n"
assert instance.query("SELECT count(DISTINCT uuid) FROM cluster('cluster', 'system', 'tables') WHERE database='test_atomic' AND name='rmt'") == "1\n"
insert_and_check(1)
test_cluster.ddl_check_query(instance, "DROP TABLE test_atomic.rmt ON CLUSTER cluster")
test_cluster.ddl_check_query(instance, "CREATE TABLE test_atomic.rmt ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree ORDER BY n")
insert_and_check(2)
test_cluster.ddl_check_query(instance, "RENAME TABLE test_atomic.rmt TO test_atomic.rmt_renamed ON CLUSTER cluster")
test_cluster.ddl_check_query(instance, "CREATE TABLE test_atomic.rmt ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree ORDER BY n")
insert_and_check(3)
test_cluster.ddl_check_query(instance, "EXCHANGE TABLES test_atomic.rmt AND test_atomic.rmt_renamed ON CLUSTER cluster")
assert instance.query("SELECT DISTINCT n FROM cluster('cluster', 'test_atomic', 'rmt')") == "2\n"
assert instance.query("SELECT DISTINCT n FROM cluster('cluster', 'test_atomic', 'rmt_renamed')") == "3\n"
if __name__ == '__main__':
with contextmanager(test_cluster)() as ctx_cluster:
for name, instance in ctx_cluster.instances.items():