Add support for zookeeper retries to executeDDLQueryOnCluster().

This commit is contained in:
Vitaly Baranov 2024-10-29 20:20:08 +01:00
parent 8fea878834
commit 982b67fb22
7 changed files with 42 additions and 9 deletions

View File

@ -15,14 +15,15 @@ namespace ErrorCodes
/// Parameters controlling how an operation is retried: the number of retries
/// and the bounds of the backoff delay (in milliseconds) between attempts.
struct ZooKeeperRetriesInfo
{
/// Leaves every field at its in-class default value.
ZooKeeperRetriesInfo() = default;
/// Stores the given limits. The initial backoff is clamped so that it never exceeds `max_backoff_ms_`.
ZooKeeperRetriesInfo(UInt64 max_retries_, UInt64 initial_backoff_ms_, UInt64 max_backoff_ms_)
: max_retries(max_retries_), initial_backoff_ms(std::min(initial_backoff_ms_, max_backoff_ms_)), max_backoff_ms(max_backoff_ms_)
{
}
/// NOTE(review): the three uninitialized declarations below repeat the names of the three
/// initialized declarations that follow them. This looks like the removed and added lines of a
/// diff shown together; presumably only the initialized set belongs in the real header - confirm.
UInt64 max_retries;
UInt64 initial_backoff_ms;
UInt64 max_backoff_ms;
UInt64 max_retries = 0; /// "max_retries = 0" means only one attempt.
UInt64 initial_backoff_ms = 100; /// Delay before the first retry, in milliseconds.
UInt64 max_backoff_ms = 5000; /// Upper bound for the delay between retries, in milliseconds.
};
class ZooKeeperRetriesControl
@ -220,6 +221,7 @@ private:
return false;
}
/// Check if the query was cancelled.
if (process_list_element)
process_list_element->checkTimeLimit();
@ -228,6 +230,10 @@ private:
sleepForMilliseconds(current_backoff_ms);
current_backoff_ms = std::min(current_backoff_ms * 2, retries_info.max_backoff_ms);
/// Check if the query was cancelled again after sleeping.
if (process_list_element)
process_list_element->checkTimeLimit();
return true;
}

View File

@ -199,13 +199,12 @@ void DatabaseReplicatedDDLWorker::initializeReplication()
active_node_holder = zkutil::EphemeralNodeHolder::existing(active_path, *active_node_holder_zookeeper);
}
String DatabaseReplicatedDDLWorker::enqueueQuery(DDLLogEntry & entry)
String DatabaseReplicatedDDLWorker::enqueueQuery(DDLLogEntry & entry, const ZooKeeperRetriesInfo &, QueryStatusPtr)
{
    /// The retries info and the process list element are left unnamed and are not used here:
    /// the entry is enqueued in a single call through the current ZooKeeper session.
    auto current_zookeeper = getAndSetZooKeeper();
    return enqueueQueryImpl(current_zookeeper, entry, database);
}
bool DatabaseReplicatedDDLWorker::waitForReplicaToProcessAllEntries(UInt64 timeout_ms)
{
auto zookeeper = getAndSetZooKeeper();

View File

@ -24,7 +24,7 @@ class DatabaseReplicatedDDLWorker : public DDLWorker
public:
DatabaseReplicatedDDLWorker(DatabaseReplicated * db, ContextPtr context_);
String enqueueQuery(DDLLogEntry & entry) override;
String enqueueQuery(DDLLogEntry & entry, const ZooKeeperRetriesInfo &, QueryStatusPtr) override;
String tryEnqueueAndExecuteEntry(DDLLogEntry & entry, ContextPtr query_context);

View File

@ -26,6 +26,7 @@
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/ZooKeeperLock.h>
#include <Common/ZooKeeper/ZooKeeperRetries.h>
#include <Common/isLocalAddress.h>
#include <Common/logger_useful.h>
#include <Common/randomSeed.h>
@ -1053,7 +1054,25 @@ void DDLWorker::createStatusDirs(const std::string & node_path, const ZooKeeperP
}
String DDLWorker::enqueueQuery(DDLLogEntry & entry)
/// Pushes the query into the DDL queue and returns the path produced by enqueueQueryAttempt().
/// When `retries_info.max_retries` is zero exactly one direct attempt is made; otherwise the
/// attempt is executed inside ZooKeeperRetriesControl::retryLoop(), and `process_list_element`
/// is handed to the retries control.
String DDLWorker::enqueueQuery(DDLLogEntry & entry, const ZooKeeperRetriesInfo & retries_info, QueryStatusPtr process_list_element)
{
    /// No retries requested: perform the single attempt without the retries machinery.
    if (retries_info.max_retries == 0)
        return enqueueQueryAttempt(entry);

    String created_node_path;
    ZooKeeperRetriesControl retries_ctl{"DDLWorker::enqueueQuery", log, retries_info, process_list_element};

    /// The path is assigned on every attempt, so the value returned is the one from the last attempt executed.
    auto attempt = [&]
    {
        created_node_path = enqueueQueryAttempt(entry);
    };
    retries_ctl.retryLoop(attempt);

    return created_node_path;
}
String DDLWorker::enqueueQueryAttempt(DDLLogEntry & entry)
{
if (entry.hosts.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty host list in a distributed DDL task");

View File

@ -48,6 +48,9 @@ struct DDLTaskBase;
using DDLTaskPtr = std::unique_ptr<DDLTaskBase>;
using ZooKeeperPtr = std::shared_ptr<zkutil::ZooKeeper>;
class AccessRightsElements;
struct ZooKeeperRetriesInfo;
class QueryStatus;
using QueryStatusPtr = std::shared_ptr<QueryStatus>;
class DDLWorker
{
@ -65,7 +68,7 @@ public:
virtual ~DDLWorker();
/// Pushes query into DDL queue, returns path to created node
virtual String enqueueQuery(DDLLogEntry & entry);
virtual String enqueueQuery(DDLLogEntry & entry, const ZooKeeperRetriesInfo & retries_info, QueryStatusPtr process_list_element);
/// Host ID (name:port) for logging purposes
/// Note that in each task hosts are identified individually by name:port from initiator server cluster config
@ -120,6 +123,9 @@ protected:
mutable std::shared_mutex mtx;
};
/// Pushes query into DDL queue, returns path to created node
String enqueueQueryAttempt(DDLLogEntry & entry);
/// Iterates through queue tasks in ZooKeeper, runs execution of new tasks
void scheduleTasks(bool reinitialized);

View File

@ -189,7 +189,7 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, ContextPtr context,
entry.setSettingsIfRequired(context);
entry.tracing_context = OpenTelemetry::CurrentContext();
entry.initial_query_id = context->getClientInfo().initial_query_id;
String node_path = ddl_worker.enqueueQuery(entry);
String node_path = ddl_worker.enqueueQuery(entry, params.retries_info, context->getProcessListElement());
return getDDLOnClusterStatus(node_path, ddl_worker.getReplicasDir(), entry, context);
}

View File

@ -37,6 +37,9 @@ struct DDLQueryOnClusterParams
/// Privileges which the current user should have to execute a query.
AccessRightsElements access_to_check;
/// Use retries when creating nodes "query-0000000000", "query-0000000001", "query-0000000002" in ZooKeeper.
ZooKeeperRetriesInfo retries_info;
};
/// Pushes distributed DDL query to the queue.