Allow parallel execution of distributed DDL

Add distributed_ddl.pool_size to control the maximum number of
distributed DDL tasks that can be processed in parallel.

Also:
- convert Exception constructors to fmt-like
- use sleepFor* over std::this_thread::sleep_for()
Azat Khuzhin 2020-09-03 01:35:47 +03:00
parent 70b0fe88b1
commit dd867b787f
4 changed files with 162 additions and 136 deletions
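A minimal sketch of the resulting server configuration (the value 4 is illustrative; the default of 1 preserves the previous single-worker behaviour):

    <distributed_ddl>
        <!-- Path in ZooKeeper to the queue with DDL queries -->
        <path>/clickhouse/task_queue/ddl</path>
        <!-- Run up to 4 ON CLUSTER queries simultaneously -->
        <pool_size>4</pool_size>
    </distributed_ddl>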

programs/server/Server.cpp

@@ -708,7 +708,10 @@ int Server::main(const std::vector<std::string> & /*args*/)
{
/// DDL worker should be started after all tables were loaded
String ddl_zookeeper_path = config().getString("distributed_ddl.path", "/clickhouse/task_queue/ddl/");
global_context->setDDLWorker(std::make_unique<DDLWorker>(ddl_zookeeper_path, *global_context, &config(), "distributed_ddl"));
int pool_size = config().getInt("distributed_ddl.pool_size", 1);
if (pool_size < 1)
throw Exception("distributed_ddl.pool_size should be greater then 0", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
global_context->setDDLWorker(std::make_unique<DDLWorker>(pool_size, ddl_zookeeper_path, *global_context, &config(), "distributed_ddl"));
}
std::unique_ptr<DNSCacheUpdater> dns_cache_updater;

programs/server/config.xml

@@ -615,6 +615,9 @@
<!-- Settings from this profile will be used to execute DDL queries -->
<!-- <profile>default</profile> -->
<!-- Controls how much ON CLUSTER queries can be run simultaneously. -->
<!-- <pool_size>1</pool_size> -->
</distributed_ddl>
<!-- Settings to fine tune MergeTree tables. See documentation in source code, in MergeTreeSettings.h -->

src/Interpreters/DDLWorker.cpp

@@ -22,7 +22,6 @@
#include <Access/ContextAccess.h>
#include <Common/DNSResolver.h>
#include <Common/Macros.h>
#include <common/getFQDNOrHostName.h>
#include <Common/setThreadName.h>
#include <Common/Stopwatch.h>
#include <Common/randomSeed.h>
@@ -38,10 +37,11 @@
#include <Columns/ColumnArray.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Poco/Timestamp.h>
#include <Poco/Net/NetException.h>
#include <common/sleep.h>
#include <common/getFQDNOrHostName.h>
#include <random>
#include <pcg_random.hpp>
#include <Poco/Net/NetException.h>
namespace DB
@@ -144,7 +144,7 @@ struct DDLLogEntry
rb >> "version: " >> version >> "\n";
if (version != CURRENT_VERSION)
throw Exception("Unknown DDLLogEntry format version: " + DB::toString(version), ErrorCodes::UNKNOWN_FORMAT_VERSION);
throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unknown DDLLogEntry format version: {}", version);
Strings host_id_strings;
rb >> "query: " >> escape >> query >> "\n";
@@ -308,9 +308,14 @@ static bool isSupportedAlterType(int type)
}
DDLWorker::DDLWorker(const std::string & zk_root_dir, Context & context_, const Poco::Util::AbstractConfiguration * config, const String & prefix)
: context(context_), log(&Poco::Logger::get("DDLWorker"))
DDLWorker::DDLWorker(int pool_size_, const std::string & zk_root_dir, Context & context_, const Poco::Util::AbstractConfiguration * config, const String & prefix)
: context(context_)
, log(&Poco::Logger::get("DDLWorker"))
, pool_size(pool_size_)
, worker_pool(pool_size_)
{
last_tasks.reserve(pool_size);
queue_dir = zk_root_dir;
if (queue_dir.back() == '/')
queue_dir.resize(queue_dir.size() - 1);
@@ -343,6 +348,7 @@ DDLWorker::~DDLWorker()
stop_flag = true;
queue_updated_event->set();
cleanup_event->set();
worker_pool.wait();
main_thread.join();
cleanup_thread.join();
}
@@ -364,8 +370,27 @@ DDLWorker::ZooKeeperPtr DDLWorker::getAndSetZooKeeper()
return current_zookeeper;
}
void DDLWorker::recoverZooKeeper()
{
LOG_DEBUG(log, "Recovering ZooKeeper session after: {}", getCurrentExceptionMessage(false));
while (!stop_flag)
{
try
{
getAndSetZooKeeper();
break;
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
sleepForSeconds(5);
}
}
}
bool DDLWorker::initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper)
DDLTaskPtr DDLWorker::initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper)
{
String node_data;
String entry_path = queue_dir + "/" + entry_name;
@@ -374,7 +399,7 @@ bool DDLWorker::initAndCheckTask(const String & entry_name, String & out_reason,
{
/// It is OK that the node could be deleted just now. It means that there is no current host in the node's host list.
out_reason = "The task was deleted";
return false;
return {};
}
auto task = std::make_unique<DDLTask>();
@@ -405,7 +430,7 @@ bool DDLWorker::initAndCheckTask(const String & entry_name, String & out_reason,
}
out_reason = "Incorrect task format";
return false;
return {};
}
bool host_in_hostlist = false;
@@ -433,12 +458,13 @@ bool DDLWorker::initAndCheckTask(const String & entry_name, String & out_reason,
}
}
if (host_in_hostlist)
current_task = std::move(task);
else
if (!host_in_hostlist)
{
out_reason = "There is no a local address in host list";
return {};
}
return host_in_hostlist;
return task;
}
@@ -448,10 +474,9 @@ static void filterAndSortQueueNodes(Strings & all_nodes)
std::sort(all_nodes.begin(), all_nodes.end());
}
void DDLWorker::processTasks()
void DDLWorker::scheduleTasks()
{
LOG_DEBUG(log, "Processing tasks");
LOG_DEBUG(log, "Scheduling tasks");
auto zookeeper = tryGetZooKeeper();
Strings queue_nodes = zookeeper->getChildren(queue_dir, nullptr, queue_updated_event);
@@ -459,86 +484,60 @@ void DDLWorker::processTasks()
if (queue_nodes.empty())
return;
bool server_startup = last_processed_task_name.empty();
bool server_startup = last_tasks.empty();
auto begin_node = server_startup
? queue_nodes.begin()
: std::upper_bound(queue_nodes.begin(), queue_nodes.end(), last_processed_task_name);
: std::upper_bound(queue_nodes.begin(), queue_nodes.end(), last_tasks.back());
for (auto it = begin_node; it != queue_nodes.end(); ++it)
{
String entry_name = *it;
if (current_task)
String reason;
auto task = initAndCheckTask(entry_name, reason, zookeeper);
if (!task)
{
if (current_task->entry_name == entry_name)
{
LOG_INFO(log, "Trying to process task {} again", entry_name);
}
else
{
LOG_INFO(log, "Task {} was deleted from ZooKeeper before current host committed it", current_task->entry_name);
current_task = nullptr;
}
LOG_DEBUG(log, "Will not execute task {}: {}", entry_name, reason);
saveTask(entry_name);
continue;
}
if (!current_task)
bool already_processed = zookeeper->exists(task->entry_path + "/finished/" + task->host_id_str);
if (!server_startup && !task->was_executed && already_processed)
{
String reason;
if (!initAndCheckTask(entry_name, reason, zookeeper))
{
LOG_DEBUG(log, "Will not execute task {}: {}", entry_name, reason);
last_processed_task_name = entry_name;
continue;
}
}
DDLTask & task = *current_task;
bool already_processed = zookeeper->exists(task.entry_path + "/finished/" + task.host_id_str);
if (!server_startup && !task.was_executed && already_processed)
{
throw Exception(
"Server expects that DDL task " + task.entry_name + " should be processed, but it was already processed according to ZK",
ErrorCodes::LOGICAL_ERROR);
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Server expects that DDL task {} should be processed, but it was already processed according to ZK",
entry_name);
}
if (!already_processed)
{
try
worker_pool.scheduleOrThrowOnError([this, task_ptr = task.release()]()
{
processTask(task, zookeeper);
}
catch (const Coordination::Exception & e)
{
if (server_startup && e.code == Coordination::Error::ZNONODE)
{
LOG_WARNING(log, "ZooKeeper NONODE error during startup. Ignoring entry {} ({}) : {}", task.entry_name, task.entry.query, getCurrentExceptionMessage(true));
}
else
{
throw;
}
}
catch (...)
{
LOG_WARNING(log, "An error occurred while processing task {} ({}) : {}", task.entry_name, task.entry.query, getCurrentExceptionMessage(true));
throw;
}
enqueueTask(DDLTaskPtr(task_ptr));
});
}
else
{
LOG_DEBUG(log, "Task {} ({}) has been already processed", task.entry_name, task.entry.query);
LOG_DEBUG(log, "Task {} ({}) has been already processed", entry_name, task->entry.query);
}
last_processed_task_name = task.entry_name;
current_task.reset();
saveTask(entry_name);
if (stop_flag)
break;
}
}
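One subtlety in the scheduling loop above: ThreadPool's scheduleOrThrowOnError stores its job in a std::function, which must be copyable, so the move-only DDLTaskPtr cannot be captured by move; instead the raw pointer is released into the capture and unique ownership is re-established inside the job. A self-contained sketch of that hand-off, with schedule() standing in for the pool:

    #include <functional>
    #include <memory>

    struct Task {};

    // Stand-in for worker_pool.scheduleOrThrowOnError(): like ThreadPool,
    // it accepts a std::function, which requires a copyable callable.
    void schedule(std::function<void()> job) { job(); }

    void submit(std::unique_ptr<Task> task)
    {
        // Capturing `task` by move would make the lambda move-only and
        // unstorable in std::function, so release the raw pointer instead.
        schedule([raw = task.release()]
        {
            std::unique_ptr<Task> owned(raw); // ownership restored on the worker
            // ... process(*owned) ...
        });
    }

    int main() { submit(std::make_unique<Task>()); }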
void DDLWorker::saveTask(const String & entry_name)
{
if (last_tasks.size() == pool_size)
{
last_tasks.erase(last_tasks.begin());
}
last_tasks.emplace_back(entry_name);
}
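saveTask keeps last_tasks as a sliding window of the pool_size most recently seen entry names; on the next pass, scheduleTasks resumes strictly after last_tasks.back(). A minimal model of that resume step (entry names are illustrative, but queue nodes do sort in creation order):

    #include <algorithm>
    #include <cassert>
    #include <string>
    #include <vector>

    int main()
    {
        std::vector<std::string> queue_nodes = {
            "query-0000000001", "query-0000000002", "query-0000000003"};
        std::string last_seen = "query-0000000002"; // last_tasks.back()

        // upper_bound on the sorted names yields the first strictly newer entry.
        auto begin_node = std::upper_bound(queue_nodes.begin(), queue_nodes.end(), last_seen);
        assert(*begin_node == "query-0000000003");
    }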
/// Parses query and resolves cluster and host in cluster
void DDLWorker::parseQueryAndResolveHost(DDLTask & task)
@@ -559,10 +558,9 @@ void DDLWorker::parseQueryAndResolveHost(DDLTask & task)
task.cluster_name = task.query_on_cluster->cluster;
task.cluster = context.tryGetCluster(task.cluster_name);
if (!task.cluster)
{
throw Exception("DDL task " + task.entry_name + " contains current host " + task.host_id.readableString()
+ " in cluster " + task.cluster_name + ", but there are no such cluster here.", ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION);
}
throw Exception(ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION,
"DDL task {} contains current host {} in cluster {}, but there are no such cluster here.",
task.entry_name, task.host_id.readableString(), task.cluster_name);
/// Try to find host from task host list in cluster
/// At first, try to find an exact match (host name and ports should be literally equal)
@@ -583,10 +581,9 @@ void DDLWorker::parseQueryAndResolveHost(DDLTask & task)
{
if (default_database == address.default_database)
{
throw Exception(
"There are two exactly the same ClickHouse instances " + address.readableString() + " in cluster "
+ task.cluster_name,
ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION);
throw Exception(ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION,
"There are two exactly the same ClickHouse instances {} in cluster {}",
address.readableString(), task.cluster_name);
}
else
{
@@ -600,9 +597,8 @@ void DDLWorker::parseQueryAndResolveHost(DDLTask & task)
auto * query_with_table = dynamic_cast<ASTQueryWithTableAndOutput *>(task.query.get());
if (!query_with_table || query_with_table->database.empty())
{
throw Exception(
"For a distributed DDL on circular replicated cluster its table name must be qualified by database name.",
ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION);
throw Exception(ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION,
"For a distributed DDL on circular replicated cluster its table name must be qualified by database name.");
}
if (default_database == query_with_table->database)
return;
@@ -635,8 +631,9 @@ void DDLWorker::parseQueryAndResolveHost(DDLTask & task)
{
if (found_via_resolving)
{
throw Exception("There are two the same ClickHouse instances in cluster " + task.cluster_name + " : "
+ task.address_in_cluster.readableString() + " and " + address.readableString(), ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION);
throw Exception(ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION,
"There are two the same ClickHouse instances in cluster {} : {} and {}",
task.cluster_name, task.address_in_cluster.readableString(), address.readableString());
}
else
{
@@ -651,8 +648,9 @@ void DDLWorker::parseQueryAndResolveHost(DDLTask & task)
if (!found_via_resolving)
{
throw Exception("Not found host " + task.host_id.readableString() + " in definition of cluster " + task.cluster_name,
ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION);
throw Exception(ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION,
"Not found host {} in definition of cluster {}",
task.host_id.readableString(), task.cluster_name);
}
else
{
@@ -673,7 +671,7 @@ bool DDLWorker::tryExecuteQuery(const String & query, const DDLTask & task, Exec
try
{
current_context = std::make_unique<Context>(context);
auto current_context = std::make_unique<Context>(context);
current_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
current_context->setCurrentQueryId(""); // generate random query_id
executeQuery(istr, ostr, false, *current_context, {});
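The one-word change above is also a thread-safety fix: current_context used to be a DDLWorker member (dropped from the header diff below), and with a pool of workers a shared member would be a data race; each task now builds its own context on the stack:

    // before: one std::unique_ptr<Context> current_context member shared by all tasks
    // after: a fresh, task-local context per tryExecuteQuery() invocation
    auto current_context = std::make_unique<Context>(context);
    current_context->setCurrentQueryId(""); // each task gets its own random query_id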
@@ -707,8 +705,44 @@ void DDLWorker::attachToThreadGroup()
}
void DDLWorker::processTask(DDLTask & task, const ZooKeeperPtr & zookeeper)
void DDLWorker::enqueueTask(DDLTaskPtr task_ptr)
{
auto & task = *task_ptr;
while (!stop_flag)
{
try
{
processTask(task);
return;
}
catch (const Coordination::Exception & e)
{
if (Coordination::isHardwareError(e.code))
{
recoverZooKeeper();
}
else if (e.code == Coordination::Error::ZNONODE)
{
LOG_ERROR(log, "ZooKeeper error: {}", getCurrentExceptionMessage(true));
// TODO: retry?
}
else
{
LOG_ERROR(log, "Unexpected ZooKeeper error: {}.", getCurrentExceptionMessage(true));
return;
}
}
catch (...)
{
LOG_WARNING(log, "An error occurred while processing task {} ({}) : {}", task.entry_name, task.entry.query, getCurrentExceptionMessage(true));
}
}
}
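Condensed, the retry policy of enqueueTask is: recover the session and retry on hardware (session-loss) errors, retry on ZNONODE, give up on any other Coordination error, and retry on everything else. A sketch of that decision table (the enum and function are illustrative; Coordination::isHardwareError is the real helper):

    enum class Action { RecoverAndRetry, Retry, GiveUp };

    Action onTaskError(bool is_coordination, bool is_hardware, bool is_znonode)
    {
        if (is_coordination && is_hardware)
            return Action::RecoverAndRetry; // recoverZooKeeper(), then loop again
        if (is_coordination && is_znonode)
            return Action::Retry;           // logged; entry may have been cleaned up
        if (is_coordination)
            return Action::GiveUp;          // unexpected ZooKeeper error: logged, task dropped
        return Action::Retry;               // any other exception: warn and loop again
    }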
void DDLWorker::processTask(DDLTask & task)
{
auto zookeeper = tryGetZooKeeper();
LOG_DEBUG(log, "Processing task {} ({})", task.entry_name, task.entry.query);
String dummy;
@@ -816,16 +850,17 @@ void DDLWorker::checkShardConfig(const String & table, const DDLTask & task, Sto
if (storage->supportsReplication() && !config_is_replicated_shard)
{
throw Exception("Table " + backQuote(table) + " is replicated, but shard #" + toString(task.host_shard_num + 1) +
" isn't replicated according to its cluster definition."
" Possibly <internal_replication>true</internal_replication> is forgotten in the cluster config.",
ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION);
throw Exception(ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION,
"Table {} is replicated, but shard #{} isn't replicated according to its cluster definition. "
"Possibly <internal_replication>true</internal_replication> is forgotten in the cluster config.",
backQuote(table), task.host_shard_num + 1);
}
if (!storage->supportsReplication() && config_is_replicated_shard)
{
throw Exception("Table " + backQuote(table) + " isn't replicated, but shard #" + toString(task.host_shard_num + 1) +
" is replicated according to its cluster definition", ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION);
throw Exception(ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION,
"Table {} isn't replicated, but shard #{} is replicated according to its cluster definition",
backQuote(table), task.host_shard_num + 1);
}
}
@@ -841,7 +876,7 @@ bool DDLWorker::tryExecuteQueryOnLeaderReplica(
/// If we will develop new replicated storage
if (!replicated_storage)
throw Exception("Storage type '" + storage->getName() + "' is not supported by distributed DDL", ErrorCodes::NOT_IMPLEMENTED);
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Storage type '{}' is not supported by distributed DDL", storage->getName());
/// Generate unique name for shard node, it will be used to execute the query by only single host
/// Shard node name has format 'replica_name1,replica_name2,...,replica_nameN'
@@ -1118,7 +1153,7 @@ void DDLWorker::runMainThread()
attachToThreadGroup();
cleanup_event->set();
processTasks();
scheduleTasks();
LOG_DEBUG(log, "Waiting a watch");
queue_updated_event->wait();
@@ -1127,23 +1162,7 @@
{
if (Coordination::isHardwareError(e.code))
{
LOG_DEBUG(log, "Recovering ZooKeeper session after: {}", getCurrentExceptionMessage(false));
while (!stop_flag)
{
try
{
getAndSetZooKeeper();
break;
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
using namespace std::chrono_literals;
std::this_thread::sleep_for(5s);
}
}
recoverZooKeeper();
}
else if (e.code == Coordination::Error::ZNONODE)
{
@@ -1260,28 +1279,24 @@ public:
size_t num_unfinished_hosts = waiting_hosts.size() - num_hosts_finished;
size_t num_active_hosts = current_active_hosts.size();
std::stringstream msg;
msg << "Watching task " << node_path << " is executing longer than distributed_ddl_task_timeout"
<< " (=" << timeout_seconds << ") seconds."
<< " There are " << num_unfinished_hosts << " unfinished hosts"
<< " (" << num_active_hosts << " of them are currently active)"
<< ", they are going to execute the query in background";
throw Exception(msg.str(), ErrorCodes::TIMEOUT_EXCEEDED);
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED,
"Watching task {} is executing longer than distributed_ddl_task_timeout (={}) seconds. "
"There are {} unfinished hosts ({} of them are currently active), they are going to execute the query in background",
node_path, timeout_seconds, num_unfinished_hosts, num_active_hosts);
}
if (num_hosts_finished != 0 || try_number != 0)
{
auto current_sleep_for = std::chrono::milliseconds(std::min(static_cast<size_t>(1000), 50 * (try_number + 1)));
std::this_thread::sleep_for(current_sleep_for);
sleepForMilliseconds(std::min<size_t>(1000, 50 * (try_number + 1)));
}
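This is the sleep conversion mentioned in the commit message: the sleepFor* helpers from common/sleep.h take plain integer durations, so the chrono plumbing disappears:

    // before: build a std::chrono duration, then sleep on it
    //     auto current_sleep_for = std::chrono::milliseconds(std::min(static_cast<size_t>(1000), 50 * (try_number + 1)));
    //     std::this_thread::sleep_for(current_sleep_for);
    // after: one call, plain integer milliseconds
    sleepForMilliseconds(std::min<size_t>(1000, 50 * (try_number + 1)));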
/// TODO: add shared lock
if (!zookeeper->exists(node_path))
{
throw Exception("Cannot provide query execution status. The query's node " + node_path
+ " has been deleted by the cleaner since it was finished (or its lifetime is expired)",
ErrorCodes::UNFINISHED);
throw Exception(ErrorCodes::UNFINISHED,
"Cannot provide query execution status. The query's node {} has been deleted by the cleaner since it was finished (or its lifetime is expired)",
node_path);
}
Strings new_hosts = getNewAndUpdate(getChildrenAllowNoNode(zookeeper, node_path + "/finished"));
@@ -1304,7 +1319,7 @@ public:
auto [host, port] = Cluster::Address::fromString(host_id);
if (status.code != 0 && first_exception == nullptr)
first_exception = std::make_unique<Exception>("There was an error on [" + host + ":" + toString(port) + "]: " + status.message, status.code);
first_exception = std::make_unique<Exception>(status.code, "There was an error on [{}:{}]: {}", host, port, status.message);
++num_hosts_finished;

src/Interpreters/DDLWorker.h

@@ -26,6 +26,7 @@ class ASTAlterQuery;
class AccessRightsElements;
struct DDLLogEntry;
struct DDLTask;
using DDLTaskPtr = std::unique_ptr<DDLTask>;
/// Pushes distributed DDL query to the queue
@@ -37,7 +38,7 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, const Context & conte
class DDLWorker
{
public:
DDLWorker(const std::string & zk_root_dir, Context & context_, const Poco::Util::AbstractConfiguration * config, const String & prefix);
DDLWorker(int pool_size_, const std::string & zk_root_dir, Context & context_, const Poco::Util::AbstractConfiguration * config, const String & prefix);
~DDLWorker();
/// Pushes query into DDL queue, returns path to created node
@@ -57,14 +58,19 @@ private:
ZooKeeperPtr tryGetZooKeeper() const;
/// If necessary, creates a new session and caches it.
ZooKeeperPtr getAndSetZooKeeper();
/// ZooKeeper recovery loop (runs while not stopped).
void recoverZooKeeper();
void processTasks();
void checkCurrentTasks();
void scheduleTasks();
void saveTask(const String & entry_name);
/// Reads entry and checks that the host belongs to the host list of the task
/// Returns true and sets current_task if entry parsed and the check is passed
bool initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper);
/// Returns non-empty DDLTaskPtr if entry parsed and the check is passed
DDLTaskPtr initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper);
void processTask(DDLTask & task, const ZooKeeperPtr & zookeeper);
void enqueueTask(DDLTaskPtr task);
void processTask(DDLTask & task);
/// Check that query should be executed on leader replica only
static bool taskShouldBeExecutedOnLeader(const ASTPtr ast_ddl, StoragePtr storage);
@@ -101,32 +107,31 @@ private:
void attachToThreadGroup();
private:
bool is_circular_replicated;
std::atomic<bool> is_circular_replicated = false;
Context & context;
Poco::Logger * log;
std::unique_ptr<Context> current_context;
std::string host_fqdn; /// current host domain name
std::string host_fqdn_id; /// host_name:port
std::string queue_dir; /// dir with queue of queries
/// Name of last task that was skipped or successfully executed
std::string last_processed_task_name;
mutable std::mutex zookeeper_mutex;
ZooKeeperPtr current_zookeeper;
/// Save state of executed task to avoid duplicate execution on ZK error
using DDLTaskPtr = std::unique_ptr<DDLTask>;
DDLTaskPtr current_task;
std::vector<std::string> last_tasks;
std::shared_ptr<Poco::Event> queue_updated_event = std::make_shared<Poco::Event>();
std::shared_ptr<Poco::Event> cleanup_event = std::make_shared<Poco::Event>();
std::atomic<bool> stop_flag{false};
std::atomic<bool> stop_flag = false;
ThreadFromGlobalPool main_thread;
ThreadFromGlobalPool cleanup_thread;
/// Size of the pool for query execution.
size_t pool_size = 1;
ThreadPool worker_pool;
/// Cleaning starts after new node event is received if the last cleaning wasn't made sooner than N seconds ago
Int64 cleanup_delay_period = 60; // minute (in seconds)
/// Delete node if its age is greater than that