/// Source: ClickHouse/src/Interpreters/DDLWorker.h

#pragma once
#include <Common/CurrentThread.h>
#include <Common/ThreadPool.h>
2020-11-03 13:47:26 +00:00
#include <Storages/IStorage_fwd.h>
#include <Parsers/IAST_fwd.h>
#include <Interpreters/Context.h>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
/// Forward declarations only: this header refers to these types through
/// pointers and smart pointers, so their full definitions are not needed here.
namespace zkutil
{
    class ZooKeeper;
}

namespace Poco
{
    class Logger;
    namespace Util { class AbstractConfiguration; }
}
namespace DB
{
class Context;
class ASTAlterQuery;
struct DDLLogEntry;
2020-11-27 14:04:03 +00:00
struct DDLTaskBase;
using DDLTaskPtr = std::unique_ptr<DDLTaskBase>;
2020-11-13 18:35:45 +00:00
using ZooKeeperPtr = std::shared_ptr<zkutil::ZooKeeper>;
class DDLWorker
{
public:
2020-11-13 18:35:45 +00:00
DDLWorker(int pool_size_, const std::string & zk_root_dir, const Context & context_, const Poco::Util::AbstractConfiguration * config, const String & prefix,
2020-11-27 14:04:03 +00:00
const String & logger_name = "DDLWorker");
virtual ~DDLWorker();
2017-04-21 12:39:28 +00:00
/// Pushes query into DDL queue, returns path to created node
2020-11-27 14:04:03 +00:00
virtual String enqueueQuery(DDLLogEntry & entry);
/// Host ID (name:port) for logging purposes
2017-07-28 16:14:49 +00:00
/// Note that in each task hosts are identified individually by name:port from initiator server cluster config
std::string getCommonHostID() const
{
return host_fqdn_id;
}
2020-11-13 18:35:45 +00:00
void shutdown();
2020-11-29 11:45:32 +00:00
bool isCurrentlyActive() const { return initialized && !stop_flag; }
2020-11-27 14:04:03 +00:00
protected:
/// Returns cached ZooKeeper session (possibly expired).
ZooKeeperPtr tryGetZooKeeper() const;
/// If necessary, creates a new session and caches it.
ZooKeeperPtr getAndSetZooKeeper();
/// ZooKeeper recover loop (while not stopped).
void recoverZooKeeper();
void checkCurrentTasks();
void scheduleTasks();
/// Reads entry and check that the host belongs to host list of the task
/// Returns non-empty DDLTaskPtr if entry parsed and the check is passed
2020-11-27 14:04:03 +00:00
virtual DDLTaskPtr initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper);
2017-07-28 16:14:49 +00:00
void enqueueTask(DDLTaskPtr task);
2020-11-27 14:04:03 +00:00
void processTask(DDLTaskBase & task);
/// Check that query should be executed on leader replica only
2020-03-18 00:57:00 +00:00
static bool taskShouldBeExecutedOnLeader(const ASTPtr ast_ddl, StoragePtr storage);
/// Executes query only on leader replica in case of replicated table.
/// Queries like TRUNCATE/ALTER .../OPTIMIZE have to be executed only on one node of shard.
/// Most of these queries can be executed on non-leader replica, but actually they still send
/// query via RemoteBlockOutputStream to leader, so to avoid such "2-phase" query execution we
/// execute query directly on leader.
bool tryExecuteQueryOnLeaderReplica(
2020-11-27 14:04:03 +00:00
DDLTaskBase & task,
StoragePtr storage,
const String & rewritten_query,
const String & node_path,
const ZooKeeperPtr & zookeeper);
2020-11-27 14:04:03 +00:00
bool tryExecuteQuery(const String & query, const DDLTaskBase & task, ExecutionStatus & status);
2017-04-19 14:21:27 +00:00
/// Checks and cleanups queue's nodes
void cleanupQueue(Int64 current_time_seconds, const ZooKeeperPtr & zookeeper);
/// Init task node
2020-03-18 00:57:00 +00:00
static void createStatusDirs(const std::string & node_path, const ZooKeeperPtr & zookeeper);
2017-04-25 15:21:03 +00:00
2020-11-27 14:04:03 +00:00
virtual void initialize() {}
2017-07-28 16:14:49 +00:00
void runMainThread();
void runCleanupThread();
void attachToThreadGroup();
2020-11-27 14:04:03 +00:00
protected:
Context context;
2020-05-30 21:57:37 +00:00
Poco::Logger * log;
std::string host_fqdn; /// current host domain name
std::string host_fqdn_id; /// host_name:port
std::string queue_dir; /// dir with queue of queries
mutable std::mutex zookeeper_mutex;
ZooKeeperPtr current_zookeeper;
/// Save state of executed task to avoid duplicate execution on ZK error
2020-11-27 14:04:03 +00:00
//std::vector<std::string> last_tasks;
std::optional<String> last_entry_name;
std::shared_ptr<Poco::Event> queue_updated_event = std::make_shared<Poco::Event>();
std::shared_ptr<Poco::Event> cleanup_event = std::make_shared<Poco::Event>();
2020-11-29 11:45:32 +00:00
std::atomic<bool> initialized = false;
std::atomic<bool> stop_flag = false;
ThreadFromGlobalPool main_thread;
ThreadFromGlobalPool cleanup_thread;
2017-05-31 14:01:08 +00:00
/// Size of the pool for query execution.
size_t pool_size = 1;
ThreadPool worker_pool;
2017-05-31 14:01:08 +00:00
/// Cleaning starts after new node event is received if the last cleaning wasn't made sooner than N seconds ago
Int64 cleanup_delay_period = 60; // minute (in seconds)
/// Delete node if its age is greater than that
Int64 task_max_lifetime = 7 * 24 * 60 * 60; // week (in seconds)
/// How many tasks could be in the queue
size_t max_tasks_in_queue = 1000;
2017-04-19 14:21:27 +00:00
ThreadGroupStatusPtr thread_group;
};
}