ClickHouse/src/Interpreters/DDLTask.cpp

501 lines
18 KiB
C++
Raw Normal View History

2020-11-03 13:47:26 +00:00
#include <Interpreters/DDLTask.h>
2022-01-30 19:49:48 +00:00
#include <base/sort.h>
2020-11-03 13:47:26 +00:00
#include <Common/DNSResolver.h>
#include <Common/isLocalAddress.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/Operators.h>
#include <IO/ReadBufferFromString.h>
#include <Poco/Net/NetException.h>
2022-04-27 15:05:45 +00:00
#include <Common/logger_useful.h>
2020-11-27 14:04:03 +00:00
#include <Parsers/ASTQueryWithOnCluster.h>
#include <Parsers/ParserQuery.h>
#include <Parsers/formatAST.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>
2020-11-27 14:04:03 +00:00
#include <Parsers/ASTQueryWithTableAndOutput.h>
#include <Databases/DatabaseReplicated.h>
#include <Parsers/maskSensitiveInfoInQueryForLogging.h>
2020-11-03 13:47:26 +00:00
2022-01-30 19:49:48 +00:00
2020-11-03 13:47:26 +00:00
namespace DB
{
/// Error codes referenced by the definitions in this file.
namespace ErrorCodes
{
    extern const int DNS_ERROR;
    extern const int INCONSISTENT_CLUSTER_DEFINITION;
    extern const int LOGICAL_ERROR;
    extern const int UNKNOWN_FORMAT_VERSION;
    extern const int UNKNOWN_TYPE_OF_QUERY;
}
/// Builds a HostID from its textual form.
/// The string is split into host name and port by Cluster::Address::fromString.
HostID HostID::fromString(const String & host_port_str)
{
    auto [parsed_host, parsed_port] = Cluster::Address::fromString(host_port_str);

    HostID result;
    result.host_name = std::move(parsed_host);
    result.port = parsed_port;
    return result;
}
/// Resolves this host (host_name, port) through DNSResolver and reports whether
/// DB::isLocalAddress considers the resolved address local for `clickhouse_port`.
/// A Poco::Net::NetException raised on the way is swallowed and reported as "not local".
bool HostID::isLocalAddress(UInt16 clickhouse_port) const
{
    try
    {
        const auto resolved_address = DNSResolver::instance().resolveAddress(host_name, port);
        return DB::isLocalAddress(resolved_address, clickhouse_port);
    }
    catch (const Poco::Net::NetException &)
    {
        /// Avoid "Host not found" exceptions: an unresolvable host is simply not local.
        return false;
    }
}
void DDLLogEntry::assertVersion() const
{
if (version == 0
2022-09-23 03:36:06 +00:00
/// NORMALIZE_CREATE_ON_INITIATOR_VERSION does not change the entry format, it uses versioin 2, so there shouldn't be such version
2022-09-23 03:32:21 +00:00
|| version == NORMALIZE_CREATE_ON_INITIATOR_VERSION
2022-09-23 03:36:06 +00:00
|| version > DDL_ENTRY_FORMAT_MAX_VERSION)
throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unknown DDLLogEntry format version: {}."
2022-09-23 03:36:06 +00:00
"Maximum supported version is {}", version, DDL_ENTRY_FORMAT_MAX_VERSION);
}
/// Picks the entry format version from the `distributed_ddl_entry_format_version` setting of `context`
/// and, when the resulting version is at least SETTINGS_IN_ZK_VERSION, stores the changed settings of
/// `context` in the entry.
/// Throws UNKNOWN_FORMAT_VERSION if the configured version is outside (0, DDL_ENTRY_FORMAT_MAX_VERSION].
void DDLLogEntry::setSettingsIfRequired(ContextPtr context)
{
    version = context->getSettingsRef().distributed_ddl_entry_format_version;
    if (version <= 0 || version > DDL_ENTRY_FORMAT_MAX_VERSION)
        throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION,
            "Unknown distributed_ddl_entry_format_version: {}. Maximum supported version is {}.",
            version, DDL_ENTRY_FORMAT_MAX_VERSION);

    /// NORMALIZE_CREATE_ON_INITIATOR_VERSION does not affect entry format in ZooKeeper,
    /// so such entries are written with SETTINGS_IN_ZK_VERSION instead.
    if (version == NORMALIZE_CREATE_ON_INITIATOR_VERSION)
        version = SETTINGS_IN_ZK_VERSION;

    if (version >= SETTINGS_IN_ZK_VERSION)
        settings.emplace(context->getSettingsRef().changes());
}
2020-11-03 13:47:26 +00:00
/// Serializes the entry as a sequence of "key: value" lines:
/// version, query, optional hosts, initiator, optional settings, optional tracing context.
String DDLLogEntry::toString() const
{
    WriteBufferFromOwnString out;

    out << "version: " << version << "\n";
    out << "query: " << escape << query << "\n";

    /// The oldest format always carries the hosts line; other versions only when there are hosts.
    if (version == OLDEST_VERSION || !hosts.empty())
    {
        Strings serialized_hosts(hosts.size());
        for (size_t i = 0; i < hosts.size(); ++i)
            serialized_hosts[i] = HostID::applyToString(hosts[i]);
        out << "hosts: " << serialized_hosts << "\n";
    }

    out << "initiator: " << initiator << "\n";

    /// Settings are written only for formats that support them and only when there is something to write.
    if (version >= SETTINGS_IN_ZK_VERSION && settings && !settings->empty())
    {
        ASTSetQuery set_query;
        set_query.is_standalone = false;
        set_query.changes = *settings;
        out << "settings: " << serializeAST(set_query) << "\n";
    }

    if (version >= OPENTELEMETRY_ENABLED_VERSION)
        out << "tracing: " << tracing_context;

    return out.str();
}
void DDLLogEntry::parse(const String & data)
{
ReadBufferFromString rb(data);
rb >> "version: " >> version >> "\n";
assertVersion();
2020-11-03 13:47:26 +00:00
Strings host_id_strings;
rb >> "query: " >> escape >> query >> "\n";
2022-09-23 03:32:21 +00:00
if (version == OLDEST_VERSION)
{
rb >> "hosts: " >> host_id_strings >> "\n";
if (!rb.eof())
rb >> "initiator: " >> initiator >> "\n";
else
initiator.clear();
}
2022-09-23 03:32:21 +00:00
else if (version >= SETTINGS_IN_ZK_VERSION)
{
if (!rb.eof() && *rb.position() == 'h')
rb >> "hosts: " >> host_id_strings >> "\n";
if (!rb.eof() && *rb.position() == 'i')
rb >> "initiator: " >> initiator >> "\n";
if (!rb.eof() && *rb.position() == 's')
{
String settings_str;
rb >> "settings: " >> settings_str >> "\n";
ParserSetQuery parser{true};
constexpr UInt64 max_size = 4096;
constexpr UInt64 max_depth = 16;
ASTPtr settings_ast = parseQuery(parser, settings_str, max_size, max_depth);
settings.emplace(std::move(settings_ast->as<ASTSetQuery>()->changes));
}
}
2020-11-03 13:47:26 +00:00
2022-09-23 03:32:21 +00:00
if (version >= OPENTELEMETRY_ENABLED_VERSION)
{
if (!rb.eof() && *rb.position() == 't')
rb >> "tracing: " >> this->tracing_context;
}
assertEOF(rb);
2022-09-15 10:00:08 +00:00
if (!host_id_strings.empty())
{
hosts.resize(host_id_strings.size());
std::transform(host_id_strings.begin(), host_id_strings.end(), hosts.begin(), HostID::fromString);
}
2020-11-03 13:47:26 +00:00
}
/// Parses the text stored in `entry.query` and stores the resulting AST in `query`.
/// Parser limits and options are taken from the settings of `context`.
void DDLTaskBase::parseQueryFromEntry(ContextPtr context)
{
    const auto & settings = context->getSettingsRef();

    const char * query_begin = entry.query.data();
    const char * query_end = query_begin + entry.query.size();

    ParserQuery parser_query(query_end, settings.allow_settings_after_format_in_insert);
    String description;
    query = parseQuery(parser_query, query_begin, query_end, description, 0, settings.max_parser_depth);
}
/// Refreshes the textual forms of the current `query` AST:
/// `query_str` receives the AST converted back to a string, and `query_for_logging`
/// receives the result of maskSensitiveInfoInQueryForLogging() applied to it.
/// The context argument is not used here.
void DDLTaskBase::formatRewrittenQuery(ContextPtr)
{
/// Convert rewritten AST back to string.
query_str = queryToString(*query);
query_for_logging = maskSensitiveInfoInQueryForLogging(query_str, query);
}
2021-05-31 14:49:02 +00:00
/// Creates a query context for executing this task: a copy of `from_context` turned into a query context,
/// with an empty query id, marked as a secondary query and with the settings stored in the entry (if any) applied.
/// The ZooKeeper argument is not used by the base implementation.
ContextMutablePtr DDLTaskBase::makeQueryContext(ContextPtr from_context, const ZooKeeperPtr & /*zookeeper*/)
{
    auto task_context = Context::createCopy(from_context);
    task_context->makeQueryContext();
    task_context->setCurrentQueryId(""); // generate random query_id

    auto & client_info = task_context->getClientInfo();
    client_info.query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;

    if (entry.settings)
        task_context->applySettingsChanges(*entry.settings);

    return task_context;
}
/// Looks through `entry.hosts` for a host that is local to this server and remembers the first one
/// in `host_id` / `host_id_str`. Further local hosts only produce a warning.
/// Exceptions with code DNS_ERROR are postponed: the first one is rethrown only if no local host was found.
/// Returns true if a local host was found in the list.
bool DDLTask::findCurrentHostID(ContextPtr global_context, Poco::Logger * log)
{
    bool found_local_host = false;
    std::exception_ptr first_dns_failure = nullptr;

    for (const HostID & candidate : entry.hosts)
    {
        auto secure_port = global_context->getTCPPortSecure();

        bool candidate_is_local = false;
        try
        {
            /// The port is considered local if it matches TCP or TCP secure port that the server is listening.
            candidate_is_local = (secure_port && candidate.isLocalAddress(*secure_port))
                || candidate.isLocalAddress(global_context->getTCPPort());
        }
        catch (const Exception & e)
        {
            if (e.code() != ErrorCodes::DNS_ERROR)
                throw;

            /// Ignore unknown hosts (in case DNS record was removed).
            /// The exception is rethrown later if the local host is not found in the list.
            if (!first_dns_failure)
                first_dns_failure = std::current_exception();
            continue;
        }

        if (!candidate_is_local)
            continue;

        if (!found_local_host)
        {
            found_local_host = true;
            host_id = candidate;
            host_id_str = candidate.toString();
        }
        else
        {
            /// This check could be slow a little bit
            LOG_WARNING(log, "There are two the same ClickHouse instances in task {}: {} and {}. Will use the first one only.",
                entry_name, host_id.readableString(), candidate.readableString());
        }
    }

    /// We don't know for sure if we should process task or not
    if (!found_local_host && first_dns_failure)
        std::rethrow_exception(first_dns_failure);

    return found_local_host;
}
/// Resolves the cluster named in the ON CLUSTER clause of `query`, locates the current host in it
/// and replaces `query` with the AST rewritten without ON CLUSTER.
/// Throws UNKNOWN_TYPE_OF_QUERY if `query` is not an ON CLUSTER query, and
/// INCONSISTENT_CLUSTER_DEFINITION if the cluster is unknown or the host cannot be found in it.
void DDLTask::setClusterInfo(ContextPtr context, Poco::Logger * log)
{
    auto * query_on_cluster = dynamic_cast<ASTQueryWithOnCluster *>(query.get());
    if (!query_on_cluster)
        throw Exception(ErrorCodes::UNKNOWN_TYPE_OF_QUERY, "Received unknown DDL query");

    cluster_name = query_on_cluster->cluster;
    cluster = context->tryGetCluster(cluster_name);

    if (!cluster)
        throw Exception(ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION,
            "DDL task {} contains current host {} in cluster {}, but there is no such cluster here.",
            entry_name, host_id.readableString(), cluster_name);

    /// Try to find host from task host list in cluster
    /// At the first, try find exact match (host name and ports should be literally equal)
    /// If the attempt fails, try find it resolving host name of each instance
    if (!tryFindHostInCluster())
    {
        LOG_WARNING(log, "Not found the exact match of host {} from task {} in cluster {} definition. Will try to find it using host name resolving.",
            host_id.readableString(), entry_name, cluster_name);

        if (!tryFindHostInClusterViaResolving(context))
            throw Exception(ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION, "Not found host {} in definition of cluster {}",
                host_id.readableString(), cluster_name);

        LOG_INFO(log, "Resolved host {} from task {} as host {} in definition of cluster {}",
            host_id.readableString(), entry_name, address_in_cluster.readableString(), cluster_name);
    }

    /// Rewrite AST without ON CLUSTER.
    WithoutOnClusterASTRewriteParams params;
    params.default_database = address_in_cluster.default_database;
    params.host_id = address_in_cluster.toString();
    query = query_on_cluster->getRewrittenASTWithoutOnCluster(params);

    /// `query` has just been reassigned, so do not keep a pointer obtained from its previous value.
    query_on_cluster = nullptr;
}
bool DDLTask::tryFindHostInCluster()
{
const auto & shards = cluster->getShardsAddresses();
bool found_exact_match = false;
String default_database;
for (size_t shard_num = 0; shard_num < shards.size(); ++shard_num)
{
for (size_t replica_num = 0; replica_num < shards[shard_num].size(); ++replica_num)
{
const Cluster::Address & address = shards[shard_num][replica_num];
if (address.host_name == host_id.host_name && address.port == host_id.port)
{
if (found_exact_match)
{
if (default_database == address.default_database)
{
throw Exception(ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION,
"There are two exactly the same ClickHouse instances {} in cluster {}",
address.readableString(), cluster_name);
}
else
{
/* Circular replication is used.
* It is when every physical node contains
* replicas of different shards of the same table.
* To distinguish one replica from another on the same node,
* every shard is placed into separate database.
* */
is_circular_replicated = true;
auto * query_with_table = dynamic_cast<ASTQueryWithTableAndOutput *>(query.get());
/// For other DDLs like CREATE USER, there is no database name and should be executed successfully.
if (query_with_table)
2020-11-27 14:04:03 +00:00
{
if (!query_with_table->database)
throw Exception(ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION,
"For a distributed DDL on circular replicated cluster its table name must be qualified by database name.");
if (default_database == query_with_table->getDatabase())
return true;
2020-11-27 14:04:03 +00:00
}
}
}
found_exact_match = true;
host_shard_num = shard_num;
host_replica_num = replica_num;
address_in_cluster = address;
default_database = address.default_database;
}
}
}
return found_exact_match;
}
/// Searches the cluster definition for the single address that resolves to this server:
/// its resolved address must be local for the TCP port or, if one is configured, the secure TCP port of `context`.
/// On success, remembers its position and address in `host_shard_num`, `host_replica_num` and `address_in_cluster`.
/// Throws INCONSISTENT_CLUSTER_DEFINITION if more than one address qualifies.
bool DDLTask::tryFindHostInClusterViaResolving(ContextPtr context)
{
    const auto & all_shards = cluster->getShardsAddresses();
    bool resolved_match_found = false;

    for (size_t shard_idx = 0; shard_idx < all_shards.size(); ++shard_idx)
    {
        const auto & replicas = all_shards[shard_idx];
        for (size_t replica_idx = 0; replica_idx < replicas.size(); ++replica_idx)
        {
            const Cluster::Address & candidate = replicas[replica_idx];

            auto resolved = candidate.getResolvedAddress();
            if (!resolved)
                continue;

            /// The secure port is consulted only when the plain TCP port did not match.
            bool candidate_is_local = isLocalAddress(*resolved, context->getTCPPort());
            if (!candidate_is_local && context->getTCPPortSecure())
                candidate_is_local = isLocalAddress(*resolved, *context->getTCPPortSecure());

            if (!candidate_is_local)
                continue;

            if (resolved_match_found)
                throw Exception(ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION,
                    "There are two the same ClickHouse instances in cluster {} : {} and {}",
                    cluster_name, address_in_cluster.readableString(), candidate.readableString());

            resolved_match_found = true;
            host_shard_num = shard_idx;
            host_replica_num = replica_idx;
            address_in_cluster = candidate;
        }
    }

    return resolved_match_found;
}
/// Generate unique name for shard node, it will be used to execute the query by only single host.
/// Shard node name has format 'replica_name1,replica_name2,...,replica_nameN'
/// where replica_name is the readableString() of a replica address; the names are sorted.
/// The shard is the one at index `host_shard_num`; `.at()` throws if that index is out of range.
String DDLTask::getShardID() const
{
    /// Bind by reference: the addresses are only read, there is no need to copy the whole shard.
    const auto & all_shards = cluster->getShardsAddresses();
    const auto & shard_addresses = all_shards.at(host_shard_num);

    Strings replica_names;
    replica_names.reserve(shard_addresses.size());
    for (const Cluster::Address & address : shard_addresses)
        replica_names.emplace_back(address.readableString());

    ::sort(replica_names.begin(), replica_names.end());

    /// Join with ',' by appending in place instead of building a temporary string per element.
    String res;
    for (auto it = replica_names.begin(); it != replica_names.end(); ++it)
    {
        if (it != replica_names.begin())
            res += ',';
        res += *it;
    }
    return res;
}
/// Creates a task bound to a Replicated database.
/// `name` and `path` are forwarded to DDLTaskBase; `database_` is stored as a non-owning pointer
/// and is dereferenced right away, so it must not be null.
DatabaseReplicatedTask::DatabaseReplicatedTask(const String & name, const String & path, DatabaseReplicated * database_)
: DDLTaskBase(name, path)
, database(database_)
{
/// The host id of such a task is the full replica name reported by the database.
host_id_str = database->getFullReplicaName();
}
/// Returns the shard identifier of this task, which is the `shard_name` of the owning database.
String DatabaseReplicatedTask::getShardID() const
{
return database->shard_name;
}
/// Parses the query from the entry (see DDLTaskBase::parseQueryFromEntry), then, for queries that carry
/// a table and database, sets the database to the local database's name, and finally refreshes the
/// textual forms of the query.
void DatabaseReplicatedTask::parseQueryFromEntry(ContextPtr context)
{
    DDLTaskBase::parseQueryFromEntry(context);

    auto * query_with_table = dynamic_cast<ASTQueryWithTableAndOutput *>(query.get());
    if (query_with_table != nullptr)
    {
        /// Update database name with actual name of local database.
        /// The entry is expected to come without a database name.
        chassert(!query_with_table->database);
        query_with_table->setDatabase(database->getDatabaseName());
    }

    formatRewrittenQuery(context);
}
2021-05-31 14:49:02 +00:00
/// Builds the query context for executing this task inside a Replicated database.
/// On top of the base context it marks the query as internal to a replicated database, switches the current
/// database, and attaches a ZooKeeperMetadataTransaction pre-filled with the operations of this task.
/// The pending `ops` are moved into the transaction and the list is emptied.
ContextMutablePtr DatabaseReplicatedTask::makeQueryContext(ContextPtr from_context, const ZooKeeperPtr & zookeeper)
{
    auto task_context = DDLTaskBase::makeQueryContext(from_context, zookeeper);

    auto & client_info = task_context->getClientInfo();
    client_info.query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
    client_info.is_replicated_database_internal = true;
    task_context->setCurrentDatabase(database->getDatabaseName());

    auto metadata_txn = std::make_shared<ZooKeeperMetadataTransaction>(zookeeper, database->zookeeper_path, is_initial_query, entry_path);
    task_context->initZooKeeperMetadataTransaction(metadata_txn);

    if (is_initial_query)
    {
        /// Operations added only for the initial query: drop the "try" node, create "committed"
        /// with this host's id and set "max_log_ptr" to the number of this entry.
        metadata_txn->addOp(zkutil::makeRemoveRequest(entry_path + "/try", -1));
        metadata_txn->addOp(zkutil::makeCreateRequest(entry_path + "/committed", host_id_str, zkutil::CreateMode::Persistent));
        metadata_txn->addOp(zkutil::makeSetRequest(database->zookeeper_path + "/max_log_ptr", toString(getLogEntryNumber(entry_name)), -1));
    }

    metadata_txn->addOp(getOpToUpdateLogPointer());

    for (auto & pending_op : ops)
        metadata_txn->addOp(std::move(pending_op));
    ops.clear();

    return task_context;
}
2022-05-06 16:37:20 +00:00
/// Builds the ZooKeeper "set" request that writes the number of this log entry
/// into the "log_ptr" node under the replica path of the database.
Coordination::RequestPtr DatabaseReplicatedTask::getOpToUpdateLogPointer()
{
    const String log_ptr_path = database->replica_path + "/log_ptr";
    const String entry_number = toString(getLogEntryNumber(entry_name));
    return zkutil::makeSetRequest(log_ptr_path, entry_number, -1);
}
2022-07-27 17:15:00 +00:00
void DatabaseReplicatedTask::createSyncedNodeIfNeed(const ZooKeeperPtr & zookeeper)
{
assert(!completely_processed);
if (!entry.settings)
return;
Field value;
if (!entry.settings->tryGet("database_replicated_enforce_synchronous_settings", value))
return;
/// Bool type is really weird, sometimes it's Bool and sometimes it's UInt64...
assert(value.getType() == Field::Types::Bool || value.getType() == Field::Types::UInt64);
if (!value.get<UInt64>())
return;
zookeeper->createIfNotExists(getSyncedNodePath(), "");
}
2021-02-09 15:14:20 +00:00
/// Returns the sequential node name for the given entry number, built with the "query-" prefix.
String DDLTaskBase::getLogEntryName(UInt32 log_entry_number)
{
    constexpr const char * prefix = "query-";
    return zkutil::getSequentialNodeName(prefix, log_entry_number);
}
2021-02-09 15:14:20 +00:00
/// Extracts the entry number from a log entry name, i.e. parses what follows the "query-" prefix.
/// The prefix and the upper bound of the number are checked with assert() only.
UInt32 DDLTaskBase::getLogEntryNumber(const String & log_entry_name)
{
    constexpr const char * prefix = "query-";
    assert(startsWith(log_entry_name, prefix));

    const String number_part = log_entry_name.substr(strlen(prefix));
    UInt32 entry_number = parse<UInt32>(number_part);
    assert(entry_number < std::numeric_limits<Int32>::max());
    return entry_number;
}
2021-02-19 23:41:58 +00:00
void ZooKeeperMetadataTransaction::commit()
2020-11-27 14:04:03 +00:00
{
2021-07-02 16:39:55 +00:00
if (state != CREATED)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Incorrect state ({}), it's a bug", state);
2020-12-04 20:12:32 +00:00
state = FAILED;
2020-11-29 11:45:32 +00:00
current_zookeeper->multi(ops);
2021-02-08 19:36:17 +00:00
state = COMMITTED;
2020-11-27 14:04:03 +00:00
}
/// Looks up a database named `cluster_name` in the DatabaseCatalog and, if it is a DatabaseReplicated,
/// returns the result of its tryGetCluster(). Returns an empty ClusterPtr otherwise.
ClusterPtr tryGetReplicatedDatabaseCluster(const String & cluster_name)
{
    /// Keep the handle returned by the catalog in a named local: as a temporary it would be destroyed
    /// before the raw pointer obtained from it is used (presumably it is a shared owner of the
    /// database - confirm against DatabaseCatalog::tryGetDatabase).
    const auto database = DatabaseCatalog::instance().tryGetDatabase(cluster_name);

    const auto * replicated_db = dynamic_cast<const DatabaseReplicated *>(database.get());
    if (!replicated_db)
        return {};

    return replicated_db->tryGetCluster();
}
2020-11-03 13:47:26 +00:00
}