#pragma once #include #include #include #include #include #include #include #include #include #include namespace DB { class ASTAlterQuery; struct DDLLogEntry; struct DDLTask; /// Pushes distributed DDL query to the queue BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, const Context & context, const NameSet & query_databases); class DDLWorker { public: DDLWorker(const std::string & zk_root_dir, Context & context_, const Poco::Util::AbstractConfiguration * config, const String & prefix); ~DDLWorker(); /// Pushes query into DDL queue, returns path to created node String enqueueQuery(DDLLogEntry & entry); /// Host ID (name:port) for logging purposes /// Note that in each task hosts are identified individually by name:port from initiator server cluster config std::string getCommonHostID() const { return host_fqdn_id; } private: void processTasks(); /// Reads entry and check that the host belongs to host list of the task /// Returns true and sets current_task if entry parsed and the check is passed bool initAndCheckTask(const String & entry_name, String & out_reason); void processTask(DDLTask & task); void processTaskAlter( DDLTask & task, const ASTAlterQuery * ast_alter, const String & rewritten_query, const String & node_path); void parseQueryAndResolveHost(DDLTask & task); bool tryExecuteQuery(const String & query, const DDLTask & task, ExecutionStatus & status); /// Checks and cleanups queue's nodes void cleanupQueue(); /// Init task node void createStatusDirs(const std::string & node_name); void run(); void attachToThreadGroup(); private: Context & context; Logger * log; std::unique_ptr current_context; std::string host_fqdn; /// current host domain name std::string host_fqdn_id; /// host_name:port std::string queue_dir; /// dir with queue of queries /// Name of last task that was skipped or successfully executed std::string last_processed_task_name; std::shared_ptr zookeeper; /// Save state of executed task to avoid duplicate execution on ZK error using DDLTaskPtr = std::unique_ptr; DDLTaskPtr current_task; std::shared_ptr event_queue_updated; std::atomic stop_flag{false}; std::thread thread; Int64 last_cleanup_time_seconds = 0; /// Cleaning starts after new node event is received if the last cleaning wasn't made sooner than N seconds ago Int64 cleanup_delay_period = 60; // minute (in seconds) /// Delete node if its age is greater than that Int64 task_max_lifetime = 7 * 24 * 60 * 60; // week (in seconds) /// How many tasks could be in the queue size_t max_tasks_in_queue = 1000; ThreadGroupStatusPtr thread_group; std::mutex thread_group_mutex; friend class DDLQueryStatusInputSream; friend struct DDLTask; }; }