Use shard and replica name from Replicated database arguments (#31488)

* fix another issue

* use shard and replica name from Replicated database

* fix

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
This commit is contained in:
tavplubix 2021-11-23 12:41:54 +03:00 committed by GitHub
parent 2bef313f75
commit 7a43a87f5b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 81 additions and 6 deletions

View File

@ -178,6 +178,12 @@ AggregateFunctionPtr AggregateFunctionFactory::getImpl(
/// uniqCombinedIfMergeIf is useful in cases when the underlying
/// storage stores AggregateFunction(uniqCombinedIf) and in SELECT you
/// need to filter aggregation result based on another column.
#if defined(UNBUNDLED)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wstringop-overread"
#endif
if (!combinator->supportsNesting() && nested_name.ends_with(combinator_name))
{
throw Exception(ErrorCodes::ILLEGAL_AGGREGATION,
@ -185,6 +191,10 @@ AggregateFunctionPtr AggregateFunctionFactory::getImpl(
combinator_name);
}
#if defined(UNBUNDLED)
#pragma GCC diagnostic pop
#endif
DataTypes nested_types = combinator->transformArguments(argument_types);
Array nested_parameters = combinator->transformParameters(parameters);

View File

@ -106,6 +106,16 @@ String Macros::expand(const String & s,
res += toString(info.table_id.uuid);
info.expanded_uuid = true;
}
else if (info.shard && macro_name == "shard")
{
res += *info.shard;
info.expanded_uuid = true;
}
else if (info.replica && macro_name == "replica")
{
res += *info.replica;
info.expanded_uuid = true;
}
else if (info.ignore_unknown || info.expand_special_macros_only)
{
if (info.expand_special_macros_only)

View File

@ -34,6 +34,8 @@ public:
StorageID table_id = StorageID::createEmpty();
bool ignore_unknown = false;
bool expand_special_macros_only = false;
std::optional<String> shard;
std::optional<String> replica;
/// Information about macro expansion
size_t level = 0;

View File

@ -8,6 +8,7 @@
#include <Databases/DatabaseOrdinary.h>
#include <Databases/DatabaseReplicated.h>
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
@ -30,7 +31,6 @@
#if USE_MYSQL || USE_LIBPQXX
#include <Common/parseRemoteDescription.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Common/parseAddress.h>
#endif
@ -258,7 +258,9 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
if (!engine->arguments || engine->arguments->children.size() != 3)
throw Exception("Replicated database requires 3 arguments: zookeeper path, shard name and replica name", ErrorCodes::BAD_ARGUMENTS);
const auto & arguments = engine->arguments->children;
auto & arguments = engine->arguments->children;
for (auto & engine_arg : arguments)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, context);
String zookeeper_path = safeGetLiteralValue<String>(arguments[0], "Replicated");
String shard_name = safeGetLiteralValue<String>(arguments[1], "Replicated");

View File

@ -50,6 +50,8 @@ public:
void stopReplication() override;
String getShardName() const { return shard_name; }
String getReplicaName() const { return replica_name; }
String getFullReplicaName() const;
static std::pair<String, String> parseFullReplicaName(const String & name);

View File

@ -0,0 +1,17 @@
#include <Databases/DatabaseReplicatedHelpers.h>
#include <Databases/DatabaseReplicated.h>
namespace DB
{
String getReplicatedDatabaseShardName(const DatabasePtr & database)
{
return assert_cast<const DatabaseReplicated *>(database.get())->getShardName();
}
String getReplicatedDatabaseReplicaName(const DatabasePtr & database)
{
return assert_cast<const DatabaseReplicated *>(database.get())->getReplicaName();
}
}

View File

@ -0,0 +1,14 @@
#pragma once
#include <Core/Types.h>
#include <memory>
namespace DB
{
class IDatabase;
using DatabasePtr = std::shared_ptr<IDatabase>;
String getReplicatedDatabaseShardName(const DatabasePtr & database);
String getReplicatedDatabaseReplicaName(const DatabasePtr & database);
}

View File

@ -36,8 +36,19 @@ void mergeDependenciesGraphs(DependenciesInfos & main_dependencies_info, const D
if (maybe_existing_info.dependencies.empty())
maybe_existing_info.dependencies = dependencies;
else if (maybe_existing_info.dependencies != dependencies)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Have different dependencies for {}: {} and {}, it's a bug",
table, fmt::join(maybe_existing_info.dependencies, ", "), fmt::join(dependencies, ", "));
{
/// Can happen on DatabaseReplicated recovery
LOG_WARNING(&Poco::Logger::get("TablesLoader"), "Replacing outdated dependencies ({}) of {} with: {}",
fmt::join(maybe_existing_info.dependencies, ", "),
table,
fmt::join(dependencies, ", "));
for (const auto & old_dependency : maybe_existing_info.dependencies)
{
[[maybe_unused]] bool removed = main_dependencies_info[old_dependency].dependent_database_objects.erase(table);
assert(removed);
}
maybe_existing_info.dependencies = dependencies;
}
}
}
}

View File

@ -22,6 +22,7 @@
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Databases/DatabaseReplicatedHelpers.h>
namespace DB
{
@ -541,6 +542,12 @@ static StoragePtr create(const StorageFactory::Arguments & args)
/// to make possible copying metadata files between replicas.
Macros::MacroExpansionInfo info;
info.table_id = args.table_id;
if (is_replicated_database)
{
auto database = DatabaseCatalog::instance().getDatabase(args.table_id.database_name);
info.shard = getReplicatedDatabaseShardName(database);
info.replica = getReplicatedDatabaseReplicaName(database);
}
if (!allow_uuid_macro)
info.table_id.uuid = UUIDHelpers::Nil;
zookeeper_path = args.getContext()->getMacros()->expand(zookeeper_path, info);

View File

@ -16,7 +16,7 @@ main_node = cluster.add_instance('main_node', main_configs=['configs/config.xml'
dummy_node = cluster.add_instance('dummy_node', main_configs=['configs/config.xml'], user_configs=['configs/settings.xml'], with_zookeeper=True, stay_alive=True, macros={"shard": 1, "replica": 2})
competing_node = cluster.add_instance('competing_node', main_configs=['configs/config.xml'], user_configs=['configs/settings.xml'], with_zookeeper=True, macros={"shard": 1, "replica": 3})
snapshotting_node = cluster.add_instance('snapshotting_node', main_configs=['configs/config.xml'], user_configs=['configs/settings.xml'], with_zookeeper=True, macros={"shard": 2, "replica": 1})
snapshot_recovering_node = cluster.add_instance('snapshot_recovering_node', main_configs=['configs/config.xml'], user_configs=['configs/settings.xml'], with_zookeeper=True, macros={"shard": 2, "replica": 2})
snapshot_recovering_node = cluster.add_instance('snapshot_recovering_node', main_configs=['configs/config.xml'], user_configs=['configs/settings.xml'], with_zookeeper=True)
all_nodes = [main_node, dummy_node, competing_node, snapshotting_node, snapshot_recovering_node]
@ -37,7 +37,7 @@ def started_cluster():
cluster.shutdown()
def test_create_replicated_table(started_cluster):
main_node.query("CREATE DATABASE testdb ENGINE = Replicated('/clickhouse/databases/test1', 'shard1', 'replica1');")
main_node.query("CREATE DATABASE testdb ENGINE = Replicated('/clickhouse/databases/test1', 'shard1', 'replica' || '1');")
dummy_node.query("CREATE DATABASE testdb ENGINE = Replicated('/clickhouse/databases/test1', 'shard1', 'replica2');")
assert "Explicit zookeeper_path and replica_name are specified" in \
main_node.query_and_get_error("CREATE TABLE testdb.replicated_table (d Date, k UInt64, i32 Int32) "