ClickHouse/src/Interpreters/DDLTask.cpp
2021-05-31 17:49:02 +03:00

418 lines
15 KiB
C++

#include <Interpreters/DDLTask.h>
#include <Common/DNSResolver.h>
#include <Common/isLocalAddress.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/Operators.h>
#include <IO/ReadBufferFromString.h>
#include <Poco/Net/NetException.h>
#include <common/logger_useful.h>
#include <Parsers/ParserQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ASTQueryWithOnCluster.h>
#include <Parsers/formatAST.h>
#include <Parsers/ASTQueryWithTableAndOutput.h>
#include <Databases/DatabaseReplicated.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_FORMAT_VERSION;
extern const int UNKNOWN_TYPE_OF_QUERY;
extern const int INCONSISTENT_CLUSTER_DEFINITION;
}
HostID HostID::fromString(const String & host_port_str)
{
HostID res;
std::tie(res.host_name, res.port) = Cluster::Address::fromString(host_port_str);
return res;
}
bool HostID::isLocalAddress(UInt16 clickhouse_port) const
{
try
{
return DB::isLocalAddress(DNSResolver::instance().resolveAddress(host_name, port), clickhouse_port);
}
catch (const Poco::Net::NetException &)
{
/// Avoid "Host not found" exceptions
return false;
}
}
void DDLLogEntry::assertVersion() const
{
constexpr UInt64 max_version = 2;
if (version == 0 || max_version < version)
throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unknown DDLLogEntry format version: {}."
"Maximum supported version is {}", version, max_version);
}
void DDLLogEntry::setSettingsIfRequired(ContextPtr context)
{
version = context->getSettingsRef().distributed_ddl_entry_format_version;
if (version == 2)
settings.emplace(context->getSettingsRef().changes());
}
String DDLLogEntry::toString() const
{
WriteBufferFromOwnString wb;
wb << "version: " << version << "\n";
wb << "query: " << escape << query << "\n";
bool write_hosts = version == 1 || !hosts.empty();
if (write_hosts)
{
Strings host_id_strings(hosts.size());
std::transform(hosts.begin(), hosts.end(), host_id_strings.begin(), HostID::applyToString);
wb << "hosts: " << host_id_strings << "\n";
}
wb << "initiator: " << initiator << "\n";
bool write_settings = 1 <= version && settings && !settings->empty();
if (write_settings)
{
ASTSetQuery ast;
ast.is_standalone = false;
ast.changes = *settings;
wb << "settings: " << serializeAST(ast) << "\n";
}
return wb.str();
}
void DDLLogEntry::parse(const String & data)
{
ReadBufferFromString rb(data);
rb >> "version: " >> version >> "\n";
assertVersion();
Strings host_id_strings;
rb >> "query: " >> escape >> query >> "\n";
if (version == 1)
{
rb >> "hosts: " >> host_id_strings >> "\n";
if (!rb.eof())
rb >> "initiator: " >> initiator >> "\n";
else
initiator.clear();
}
else if (version == 2)
{
if (!rb.eof() && *rb.position() == 'h')
rb >> "hosts: " >> host_id_strings >> "\n";
if (!rb.eof() && *rb.position() == 'i')
rb >> "initiator: " >> initiator >> "\n";
if (!rb.eof() && *rb.position() == 's')
{
String settings_str;
rb >> "settings: " >> settings_str >> "\n";
ParserSetQuery parser{true};
constexpr UInt64 max_size = 4096;
constexpr UInt64 max_depth = 16;
ASTPtr settings_ast = parseQuery(parser, settings_str, max_size, max_depth);
settings.emplace(std::move(settings_ast->as<ASTSetQuery>()->changes));
}
}
assertEOF(rb);
if (!host_id_strings.empty())
{
hosts.resize(host_id_strings.size());
std::transform(host_id_strings.begin(), host_id_strings.end(), hosts.begin(), HostID::fromString);
}
}
void DDLTaskBase::parseQueryFromEntry(ContextPtr context)
{
const char * begin = entry.query.data();
const char * end = begin + entry.query.size();
ParserQuery parser_query(end);
String description;
query = parseQuery(parser_query, begin, end, description, 0, context->getSettingsRef().max_parser_depth);
}
ContextMutablePtr DDLTaskBase::makeQueryContext(ContextPtr from_context, const ZooKeeperPtr & /*zookeeper*/)
{
auto query_context = Context::createCopy(from_context);
query_context->makeQueryContext();
query_context->setCurrentQueryId(""); // generate random query_id
query_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
if (entry.settings)
query_context->applySettingsChanges(*entry.settings);
return query_context;
}
bool DDLTask::findCurrentHostID(ContextPtr global_context, Poco::Logger * log)
{
bool host_in_hostlist = false;
for (const HostID & host : entry.hosts)
{
auto maybe_secure_port = global_context->getTCPPortSecure();
/// The port is considered local if it matches TCP or TCP secure port that the server is listening.
bool is_local_port = (maybe_secure_port && host.isLocalAddress(*maybe_secure_port))
|| host.isLocalAddress(global_context->getTCPPort());
if (!is_local_port)
continue;
if (host_in_hostlist)
{
/// This check could be slow a little bit
LOG_WARNING(log, "There are two the same ClickHouse instances in task {}: {} and {}. Will use the first one only.",
entry_name, host_id.readableString(), host.readableString());
}
else
{
host_in_hostlist = true;
host_id = host;
host_id_str = host.toString();
}
}
return host_in_hostlist;
}
void DDLTask::setClusterInfo(ContextPtr context, Poco::Logger * log)
{
auto * query_on_cluster = dynamic_cast<ASTQueryWithOnCluster *>(query.get());
if (!query_on_cluster)
throw Exception("Received unknown DDL query", ErrorCodes::UNKNOWN_TYPE_OF_QUERY);
cluster_name = query_on_cluster->cluster;
cluster = context->tryGetCluster(cluster_name);
if (!cluster)
throw Exception(ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION,
"DDL task {} contains current host {} in cluster {}, but there are no such cluster here.",
entry_name, host_id.readableString(), cluster_name);
/// Try to find host from task host list in cluster
/// At the first, try find exact match (host name and ports should be literally equal)
/// If the attempt fails, try find it resolving host name of each instance
if (!tryFindHostInCluster())
{
LOG_WARNING(log, "Not found the exact match of host {} from task {} in cluster {} definition. Will try to find it using host name resolving.",
host_id.readableString(), entry_name, cluster_name);
if (!tryFindHostInClusterViaResolving(context))
throw Exception(ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION, "Not found host {} in definition of cluster {}",
host_id.readableString(), cluster_name);
LOG_INFO(log, "Resolved host {} from task {} as host {} in definition of cluster {}",
host_id.readableString(), entry_name, address_in_cluster.readableString(), cluster_name);
}
query = query_on_cluster->getRewrittenASTWithoutOnCluster(address_in_cluster.default_database);
query_on_cluster = nullptr;
}
bool DDLTask::tryFindHostInCluster()
{
const auto & shards = cluster->getShardsAddresses();
bool found_exact_match = false;
String default_database;
for (size_t shard_num = 0; shard_num < shards.size(); ++shard_num)
{
for (size_t replica_num = 0; replica_num < shards[shard_num].size(); ++replica_num)
{
const Cluster::Address & address = shards[shard_num][replica_num];
if (address.host_name == host_id.host_name && address.port == host_id.port)
{
if (found_exact_match)
{
if (default_database == address.default_database)
{
throw Exception(ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION,
"There are two exactly the same ClickHouse instances {} in cluster {}",
address.readableString(), cluster_name);
}
else
{
/* Circular replication is used.
* It is when every physical node contains
* replicas of different shards of the same table.
* To distinguish one replica from another on the same node,
* every shard is placed into separate database.
* */
is_circular_replicated = true;
auto * query_with_table = dynamic_cast<ASTQueryWithTableAndOutput *>(query.get());
if (!query_with_table || query_with_table->database.empty())
{
throw Exception(ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION,
"For a distributed DDL on circular replicated cluster its table name must be qualified by database name.");
}
if (default_database == query_with_table->database)
return true;
}
}
found_exact_match = true;
host_shard_num = shard_num;
host_replica_num = replica_num;
address_in_cluster = address;
default_database = address.default_database;
}
}
}
return found_exact_match;
}
bool DDLTask::tryFindHostInClusterViaResolving(ContextPtr context)
{
const auto & shards = cluster->getShardsAddresses();
bool found_via_resolving = false;
for (size_t shard_num = 0; shard_num < shards.size(); ++shard_num)
{
for (size_t replica_num = 0; replica_num < shards[shard_num].size(); ++replica_num)
{
const Cluster::Address & address = shards[shard_num][replica_num];
if (auto resolved = address.getResolvedAddress(); resolved
&& (isLocalAddress(*resolved, context->getTCPPort())
|| (context->getTCPPortSecure() && isLocalAddress(*resolved, *context->getTCPPortSecure()))))
{
if (found_via_resolving)
{
throw Exception(ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION,
"There are two the same ClickHouse instances in cluster {} : {} and {}",
cluster_name, address_in_cluster.readableString(), address.readableString());
}
else
{
found_via_resolving = true;
host_shard_num = shard_num;
host_replica_num = replica_num;
address_in_cluster = address;
}
}
}
}
return found_via_resolving;
}
String DDLTask::getShardID() const
{
/// Generate unique name for shard node, it will be used to execute the query by only single host
/// Shard node name has format 'replica_name1,replica_name2,...,replica_nameN'
/// Where replica_name is 'replica_config_host_name:replica_port'
auto shard_addresses = cluster->getShardsAddresses().at(host_shard_num);
Strings replica_names;
for (const Cluster::Address & address : shard_addresses)
replica_names.emplace_back(address.readableString());
std::sort(replica_names.begin(), replica_names.end());
String res;
for (auto it = replica_names.begin(); it != replica_names.end(); ++it)
res += *it + (std::next(it) != replica_names.end() ? "," : "");
return res;
}
DatabaseReplicatedTask::DatabaseReplicatedTask(const String & name, const String & path, DatabaseReplicated * database_)
: DDLTaskBase(name, path)
, database(database_)
{
host_id_str = database->getFullReplicaName();
}
String DatabaseReplicatedTask::getShardID() const
{
return database->shard_name;
}
void DatabaseReplicatedTask::parseQueryFromEntry(ContextPtr context)
{
DDLTaskBase::parseQueryFromEntry(context);
if (auto * ddl_query = dynamic_cast<ASTQueryWithTableAndOutput *>(query.get()))
{
/// Update database name with actual name of local database
assert(ddl_query->database.empty());
ddl_query->database = database->getDatabaseName();
}
}
ContextMutablePtr DatabaseReplicatedTask::makeQueryContext(ContextPtr from_context, const ZooKeeperPtr & zookeeper)
{
auto query_context = DDLTaskBase::makeQueryContext(from_context, zookeeper);
query_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
query_context->setCurrentDatabase(database->getDatabaseName());
auto txn = std::make_shared<ZooKeeperMetadataTransaction>(zookeeper, database->zookeeper_path, is_initial_query);
query_context->initZooKeeperMetadataTransaction(txn);
if (is_initial_query)
{
txn->addOp(zkutil::makeRemoveRequest(entry_path + "/try", -1));
txn->addOp(zkutil::makeCreateRequest(entry_path + "/committed", host_id_str, zkutil::CreateMode::Persistent));
txn->addOp(zkutil::makeSetRequest(database->zookeeper_path + "/max_log_ptr", toString(getLogEntryNumber(entry_name)), -1));
}
txn->addOp(zkutil::makeSetRequest(database->replica_path + "/log_ptr", toString(getLogEntryNumber(entry_name)), -1));
for (auto & op : ops)
txn->addOp(std::move(op));
ops.clear();
return query_context;
}
String DDLTaskBase::getLogEntryName(UInt32 log_entry_number)
{
/// Sequential counter in ZooKeeper is Int32.
assert(log_entry_number < std::numeric_limits<Int32>::max());
constexpr size_t seq_node_digits = 10;
String number = toString(log_entry_number);
String name = "query-" + String(seq_node_digits - number.size(), '0') + number;
return name;
}
UInt32 DDLTaskBase::getLogEntryNumber(const String & log_entry_name)
{
constexpr const char * name = "query-";
assert(startsWith(log_entry_name, name));
UInt32 num = parse<UInt32>(log_entry_name.substr(strlen(name)));
assert(num < std::numeric_limits<Int32>::max());
return num;
}
void ZooKeeperMetadataTransaction::commit()
{
assert(state == CREATED);
state = FAILED;
current_zookeeper->multi(ops);
state = COMMITTED;
}
ClusterPtr tryGetReplicatedDatabaseCluster(const String & cluster_name)
{
if (const auto * replicated_db = dynamic_cast<const DatabaseReplicated *>(DatabaseCatalog::instance().tryGetDatabase(cluster_name).get()))
return replicated_db->getCluster();
return {};
}
}