Add config-based host name selection. [#CLICKHOUSE-3128]

This commit is contained in:
Vitaliy Lyudvichenko 2017-07-26 22:31:32 +03:00 committed by alexey-milovidov
parent 3c0d0274d0
commit c65c49b50a
4 changed files with 184 additions and 87 deletions

View File

@ -5,6 +5,7 @@
#include <Common/StringUtils.h>
#include <IO/HexWriteBuffer.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Util/Application.h>
#include <openssl/sha.h>
@ -18,6 +19,7 @@ namespace ErrorCodes
extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
extern const int LOGICAL_ERROR;
extern const int SHARD_HAS_NO_CONNECTIONS;
extern const int SYNTAX_ERROR;
}
namespace
@ -113,6 +115,16 @@ String Cluster::Address::toString(const String & host_name, UInt16 port)
return escapeForFileName(host_name) + ':' + DB::toString(port);
}
/// Splits a host ID of the form "escaped_host_name:port" (the format built by toString())
/// into its host name and port parts.
///
/// host_port_string - the host ID to parse; the LAST ':' is used as the separator.
/// host_name        - receives the part before the separator, passed through unescapeForFileName().
/// port             - receives the part after the separator, parsed as UInt16.
///
/// Throws Exception with ErrorCodes::SYNTAX_ERROR if the string contains no ':'.
void Cluster::Address::fromString(const String & host_port_string, String & host_name, UInt16 & port)
{
    auto pos = host_port_string.find_last_of(':');
    if (pos == std::string::npos)
        throw Exception("Incorrect host ID format " + host_port_string, ErrorCodes::SYNTAX_ERROR);

    host_name = unescapeForFileName(host_port_string.substr(0, pos));

    /// The port is everything after the separator; substr(0, pos) would yield the host part again.
    port = parse<UInt16>(host_port_string.substr(pos + 1));
}
String Cluster::Address::toStringFull() const
{

View File

@ -57,6 +57,7 @@ public:
UInt32 replica_num;
bool is_local;
Address() = default;
Address(Poco::Util::AbstractConfiguration & config, const String & config_prefix);
Address(const String & host_port_, const String & user_, const String & password_);
@ -65,6 +66,8 @@ public:
static String toString(const String & host_name, UInt16 port);
static void fromString(const String & host_port_string, String & host_name, UInt16 & port);
/// Returns escaped user:password@resolved_host_address:resolved_host_port#default_database
String toStringFull() const;
};

View File

@ -30,6 +30,7 @@
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/Lock.h>
#include <Common/isLocalAddress.h>
#include <Poco/Timestamp.h>
#include <experimental/optional>
@ -47,13 +48,13 @@ namespace ErrorCodes
extern const int INCONSISTENT_CLUSTER_DEFINITION;
extern const int TIMEOUT_EXCEEDED;
extern const int UNFINISHED;
extern const int UNKNOWN_TYPE_OF_QUERY;
}
const size_t DDLWorker::node_max_lifetime_seconds = 7 * 24 * 60 * 60; // week
const size_t DDLWorker::cleanup_min_period_seconds = 60; // minute
struct DDLLogEntry
{
String query;
@ -98,21 +99,81 @@ struct DDLLogEntry
};
using ShardAndHostNum = std::experimental::optional<std::pair<size_t, size_t>>;
static ShardAndHostNum tryGetShardAndHostNum(const Cluster::AddressesWithFailover & cluster, const String & host_name, UInt16 port)
struct DDLTask
{
for (size_t shard_num = 0; shard_num < cluster.size(); ++shard_num)
DDLLogEntry entry;
String entry_name;
ASTPtr query;
ASTQueryWithOnCluster * query_on_cluster = nullptr;
String cluster_name;
ClusterPtr cluster;
Cluster::Address host_address;
String host_id_in_cluster;
size_t host_shard_num;
size_t host_replica_num;
/// Parses entry and query, extracts cluster and finds current host in the cluster
/// Return true if current host is found in the cluster
bool fillFromEntryData(const String & entry_data, const String & entry_name_, DDLWorker & worker)
{
for (size_t host_num = 0; host_num < cluster[shard_num].size(); ++host_num)
entry.parse(entry_data);
entry_name = entry_name_;
{
const Cluster::Address & address = cluster[shard_num][host_num];
if (address.host_name == host_name && address.port == port)
return std::make_pair(shard_num, host_num);
const char * begin = entry.query.data();
const char * end = begin + entry.query.size();
ParserQuery parser_query(end);
String description;
query = parseQuery(parser_query, begin, end, description);
}
if (!query || !(query_on_cluster = dynamic_cast<ASTQueryWithOnCluster *>(query.get())))
throw Exception("Recieved unknown DDL query", ErrorCodes::UNKNOWN_TYPE_OF_QUERY);
cluster_name = query_on_cluster->cluster;
cluster = worker.context.tryGetCluster(cluster_name);
if (!cluster)
{
LOG_INFO(worker.log, "Will not execute entry " << entry_name << ": there is no cluster " << cluster_name << " on current host");
return false;
}
bool found = false;
const auto & shards = cluster->getShardsAddresses();
for (size_t shard_num = 0; shard_num < shards.size(); ++shard_num)
{
for (size_t replica_num = 0; replica_num < shards[shard_num].size(); ++replica_num)
{
const Cluster::Address & address = shards[shard_num][replica_num];
if (isLocalAddress(address.resolved_address))
{
if (found)
{
LOG_WARNING(worker.log, "There are at least two the same ClickHouse instances in cluster " << cluster_name << ": "
<< host_id_in_cluster << " and " << address.toString()
<< ". Will use the first one only.");
}
else
{
host_address = address;
host_id_in_cluster = address.toString();
host_shard_num = shard_num;
host_replica_num = replica_num;
found = true;
}
}
}
}
return {};
}
return found;
}
};
static bool isSupportedAlterType(int type)
@ -136,9 +197,8 @@ DDLWorker::DDLWorker(const std::string & zk_root_dir, Context & context_)
if (queue_dir.back() == '/')
queue_dir.resize(queue_dir.size() - 1);
host_name = getFQDNOrHostName();
port = context.getTCPPort();
host_id = Cluster::Address::toString(host_name, port);
host_fqdn = getFQDNOrHostName();
host_fqdn_id = Cluster::Address::toString(host_fqdn, context.getTCPPort());
event_queue_updated = std::make_shared<Poco::Event>();
@ -181,34 +241,74 @@ void DDLWorker::processTasks()
continue;
}
DDLLogEntry node;
node.parse(node_data);
bool host_in_hostlist = std::find(node.hosts.cbegin(), node.hosts.cend(), host_id) != node.hosts.cend();
bool already_processed = zookeeper->exists(node_path + "/finished/" + host_id);
if (!server_startup && already_processed)
{
throw Exception(
"Server expects that DDL node " + node_name + " should be processed, but it was already processed according to ZK",
ErrorCodes::LOGICAL_ERROR);
}
if (host_in_hostlist && !already_processed)
{
DDLTask task;
bool found_cluster_and_host = false;
try
{
processTask(node, node_name);
found_cluster_and_host = task.fillFromEntryData(node_data, node_name, *this);
}
catch (...)
{
tryLogCurrentException(log, "An error occurred while processing node " + node_name + " (" + node.query + ")");
auto status = ExecutionStatus::fromCurrentException();
/// We even cannot parse host name and can't properly submit execution status.
/// What should we do?
}
const auto & hosts = task.entry.hosts;
if (!found_cluster_and_host)
{
bool fqdn_in_hostlist = std::find(hosts.cbegin(), hosts.cend(), host_fqdn_id) != hosts.cend();
if (fqdn_in_hostlist)
{
LOG_ERROR(log, "Not found current host in cluster " << task.cluster_name << " of task " << task.entry_name
<< ", but found host " << host_fqdn_id << " with the same FQDN in host list of the task"
<< ". Possibly inconsistent cluster definition among servers.");
}
else
{
LOG_DEBUG(log, "Skipping task " << node_data);
}
last_processed_node_name = node_name;
continue;
}
else
{
bool host_in_hostlist = std::find(hosts.cbegin(), hosts.cend(), task.host_id_in_cluster) != hosts.cend();
if (!host_in_hostlist)
{
LOG_ERROR(log, "Current host was found in cluster " << task.cluster_name
<< ", but was not found in host list of task " << task.entry_name
<< ". Possibly inconsistent cluster definition among servers.");
last_processed_node_name = node_name;
continue;
}
}
bool already_processed = zookeeper->exists(node_path + "/finished/" + task.host_id_in_cluster);
if (!server_startup && already_processed)
{
throw Exception(
"Server expects that DDL task " + node_name + " should be processed, but it was already processed according to ZK",
ErrorCodes::LOGICAL_ERROR);
}
if (!already_processed)
{
try
{
processTask(task);
}
catch (...)
{
tryLogCurrentException(log, "An error occurred while processing task " + node_name + " (" + task.entry.query + ")");
throw;
}
}
else
{
LOG_DEBUG(log, "Node " << node_name << " (" << node.query << ") will not be processed");
LOG_DEBUG(log, "Task " << node_name << " (" << task.entry.query << ") has been already processed");
}
last_processed_node_name = node_name;
@ -240,58 +340,32 @@ static bool tryExecuteQuery(const String & query, Context & context, ExecutionSt
}
void DDLWorker::processTask(const DDLLogEntry & node, const std::string & node_name)
void DDLWorker::processTask(DDLTask & task)
{
LOG_DEBUG(log, "Processing node " << node_name << " (" << node.query << ")");
LOG_DEBUG(log, "Processing entry " << task.entry_name << " (" << task.entry.query << ")");
String node_path = queue_dir + "/" + node_name;
String node_path = queue_dir + "/" + task.entry_name;
createStatusDirs(node_path);
bool should_not_execute = current_node == node_name && current_node_was_executed;
bool should_not_execute = current_node == task.entry_name && current_node_was_executed;
if (!should_not_execute)
{
current_node = node_name;
current_node = task.entry_name;
current_node_was_executed = false;
zookeeper->create(node_path + "/active/" + host_id, "", zkutil::CreateMode::Ephemeral);
zookeeper->create(node_path + "/active/" + task.host_id_in_cluster, "", zkutil::CreateMode::Ephemeral);
try
{
ASTPtr query_ast;
{
const char * begin = &node.query.front();
ParserQuery parser_query(begin + node.query.size());
String description;
query_ast = parseQuery(parser_query, begin, begin + node.query.size(), description);
}
const ASTQueryWithOnCluster * query;
if (!query_ast || !(query = dynamic_cast<const ASTQueryWithOnCluster *>(query_ast.get())))
throw Exception("Recieved unsupported DDL query", ErrorCodes::NOT_IMPLEMENTED);
String cluster_name = query->cluster;
auto cluster = context.getCluster(cluster_name);
auto shard_host_num = tryGetShardAndHostNum(cluster->getShardsAddresses(), host_name, port);
if (!shard_host_num)
{
throw Exception("Cannot find own address (" + host_id + ") in cluster " + cluster_name + " configuration",
ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION);
}
size_t shard_num = shard_host_num->first;
size_t host_num = shard_host_num->second;
const auto & host_address = cluster->getShardsAddresses().at(shard_num).at(host_num);
ASTPtr rewritten_ast = query->getRewrittenASTWithoutOnCluster(host_address.default_database);
ASTPtr rewritten_ast = task.query_on_cluster->getRewrittenASTWithoutOnCluster(task.host_address.default_database);
String rewritten_query = queryToString(rewritten_ast);
LOG_DEBUG(log, "Executing query: " << rewritten_query);
if (auto query_alter = dynamic_cast<const ASTAlterQuery *>(rewritten_ast.get()))
{
processTaskAlter(query_alter, rewritten_query, cluster, shard_num, node_path);
processTaskAlter(task, query_alter, rewritten_query, node_path);
}
else
{
@ -313,21 +387,23 @@ void DDLWorker::processTask(const DDLLogEntry & node, const std::string & node_n
/// Delete active flag and create finish flag
zkutil::Ops ops;
ops.emplace_back(std::make_unique<zkutil::Op::Remove>(node_path + "/active/" + host_id, -1));
ops.emplace_back(std::make_unique<zkutil::Op::Create>(node_path + "/finished/" + host_id,
ops.emplace_back(std::make_unique<zkutil::Op::Remove>(node_path + "/active/" + task.host_id_in_cluster, -1));
ops.emplace_back(std::make_unique<zkutil::Op::Create>(node_path + "/finished/" + task.host_id_in_cluster,
current_node_execution_status.serializeText(), zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
int code = zookeeper->tryMultiWithRetries(ops);
if (code != ZOK && code != ZNONODE)
throw zkutil::KeeperException("Cannot commit executed node " + node_name, code);
{
/// FIXME: if server fails here, the task will be executed twice. We need WAL here.
throw zkutil::KeeperException("Cannot commit executed entry " + task.entry_name, code);
}
}
void DDLWorker::processTaskAlter(
DDLTask & task,
const ASTAlterQuery * query_alter,
const String & rewritten_query,
const std::shared_ptr<Cluster> & cluster,
ssize_t shard_num,
const String & node_path)
{
String database = query_alter->database.empty() ? context.getCurrentDatabase() : query_alter->database;
@ -345,31 +421,31 @@ void DDLWorker::processTaskAlter(
execute_on_leader_replica |= param.type == ASTAlterQuery::DROP_PARTITION;
}
const auto & shard_info = cluster->getShardsInfo().at(shard_num);
const auto & shard_info = task.cluster->getShardsInfo().at(task.host_shard_num);
bool config_is_replicated_shard = shard_info.hasInternalReplication();
if (execute_once_on_replica && !config_is_replicated_shard)
{
throw Exception("Table " + query_alter->table + " is replicated, but shard #" + toString(shard_num + 1) +
throw Exception("Table " + query_alter->table + " is replicated, but shard #" + toString(task.host_shard_num + 1) +
" isn't replicated according to its cluster definition", ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION);
}
else if (!execute_once_on_replica && config_is_replicated_shard)
{
throw Exception("Table " + query_alter->table + " isn't replicated, but shard #" + toString(shard_num + 1) +
throw Exception("Table " + query_alter->table + " isn't replicated, but shard #" + toString(task.host_shard_num + 1) +
" replicated according to its cluster definition", ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION);
}
if (execute_once_on_replica)
{
/// The following code may perform ALTER twice if
/// current secver aquires lock, executes replicated alter,
/// The following code can perform ALTER twice if
/// the current server acquires the lock, executes the replicated alter,
/// loses the ZooKeeper connection and doesn't have time to create the /executed node; a second server then executes the replicated alter again.
/// To avoid this problem, the alter() method of replicated tables should be changed to take the DDL query id tag into account.
if (!context.getSettingsRef().distributed_ddl_allow_replicated_alter)
throw Exception("Distributed DDL alters don't work properly yet", ErrorCodes::NOT_IMPLEMENTED);
Strings replica_names;
for (const auto & address : cluster->getShardsAddresses().at(shard_num))
for (const auto & address : task.cluster->getShardsAddresses().at(task.host_shard_num))
replica_names.emplace_back(address.toString());
std::sort(replica_names.begin(), replica_names.end());
@ -386,8 +462,8 @@ void DDLWorker::processTaskAlter(
auto zookeeper_holder = std::make_shared<zkutil::ZooKeeperHolder>();
zookeeper_holder->initFromInstance(zookeeper);
zkutil::Lock lock(zookeeper_holder, shard_path, "lock", host_id);
std::mt19937 rng(std::hash<String>{}(host_id) + reinterpret_cast<intptr_t>(&rng));
zkutil::Lock lock(zookeeper_holder, shard_path, "lock", task.host_id_in_cluster);
std::mt19937 rng(std::hash<String>{}(task.host_id_in_cluster) + reinterpret_cast<intptr_t>(&rng));
for (int num_tries = 0; num_tries < 10; ++num_tries)
{
@ -406,7 +482,7 @@ void DDLWorker::processTaskAlter(
/// TODO: it is ok to recieve exception "host is not leader"
}
zookeeper->create(is_executed_path, host_id, zkutil::CreateMode::Persistent);
zookeeper->create(is_executed_path, task.host_id_in_cluster, zkutil::CreateMode::Persistent);
lock.unlock();
alter_executed_by_replica = true;
break;
@ -455,7 +531,7 @@ void DDLWorker::cleanupQueue(const Strings * node_names_to_check)
if (!zookeeper->tryGet(node_path, data, &stat))
continue;
/// TODO: Add shared lock to avoid rare race counditions.
/// TODO: Add shared lock to avoid rare race conditions.
size_t zookeeper_time_seconds = stat.mtime / zookeeper_time_resolution;
if (zookeeper_time_seconds + node_max_lifetime_seconds < current_time_seconds)
@ -603,7 +679,7 @@ public:
if (!zookeeper->exists(node_path))
{
throw Exception("Cannot provide query execution status. The query's node " + node_path
+ " had been deleted by cleaner since it was finished (or its lifetime is expired)",
+ " had been deleted by the cleaner since it was finished (or its lifetime is expired)",
ErrorCodes::UNFINISHED);
}
@ -615,15 +691,19 @@ public:
Strings cur_active_hosts = getChildrenAllowNoNode(zookeeper, node_path + "/active");
res = sample.cloneEmpty();
for (const String & host : new_hosts)
for (const String & host_id : new_hosts)
{
ExecutionStatus status(1, "Cannot obtain error message");
{
String status_data;
if (zookeeper->tryGet(node_path + "/finished/" + host, status_data))
if (zookeeper->tryGet(node_path + "/finished/" + host_id, status_data))
status.deserializeText(status_data);
}
String host;
UInt16 port;
Cluster::Address::fromString(host_id, host, port);
res.getByName("host").column->insert(host);
res.getByName("status").column->insert(static_cast<UInt64>(status.code));
res.getByName("error").column->insert(status.message);
@ -698,7 +778,7 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, Context & context)
DDLLogEntry entry;
entry.query = queryToString(query_ptr);
entry.initiator = ddl_worker.getHostName();
entry.initiator = ddl_worker.getCommonHostID();
Cluster::AddressesWithFailover shards = cluster->getShardsAddresses();
for (const auto & shard : shards)

View File

@ -15,6 +15,7 @@ namespace DB
class ASTAlterQuery;
struct DDLLogEntry;
struct DDLTask;
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, Context & context);
@ -29,21 +30,22 @@ public:
/// Pushes query into DDL queue, returns path to created node
String enqueueQuery(DDLLogEntry & entry);
std::string getHostName() const
/// Host ID (name:port) for logging purposes
/// Note that in each entry hosts are identified by name:port from cluster config
std::string getCommonHostID() const
{
return host_id;
return host_fqdn_id;
}
private:
void processTasks();
void processTask(const DDLLogEntry & node, const std::string & node_path);
void processTask(DDLTask & task);
void processTaskAlter(
DDLTask & task,
const ASTAlterQuery * query_alter,
const String & rewritten_query,
const std::shared_ptr<Cluster> & cluster,
ssize_t shard_num,
const String & node_path);
/// Checks and cleanups queue's nodes
@ -58,9 +60,8 @@ private:
Context & context;
Logger * log = &Logger::get("DDLWorker");
std::string host_id; /// host_name:port
std::string host_name;
UInt16 port;
std::string host_fqdn; /// current host domain name
std::string host_fqdn_id; /// host_name:port
std::string queue_dir; /// dir with queue of queries
std::string master_dir; /// dir with queries was initiated by the server
@ -87,6 +88,7 @@ private:
static const size_t cleanup_min_period_seconds;
friend class DDLQueryStatusInputSream;
friend class DDLTask;
};