#pragma once #include #include #include namespace Poco { class Logger; } namespace zkutil { class ZooKeeper; } namespace DB { class ASTQueryWithOnCluster; using ZooKeeperPtr = std::shared_ptr; class DatabaseReplicated; struct MetadataTransaction; using MetadataTransactionPtr = std::shared_ptr; struct HostID { String host_name; UInt16 port; HostID() = default; explicit HostID(const Cluster::Address & address) : host_name(address.host_name), port(address.port) {} static HostID fromString(const String & host_port_str); String toString() const { return Cluster::Address::toString(host_name, port); } String readableString() const { return host_name + ":" + DB::toString(port); } bool isLocalAddress(UInt16 clickhouse_port) const; static String applyToString(const HostID & host_id) { return host_id.toString(); } }; struct DDLLogEntry { String query; std::vector hosts; String initiator; // optional static constexpr int CURRENT_VERSION = 1; String toString() const; void parse(const String & data); }; struct DDLTaskBase { const String entry_name; const String entry_path; DDLLogEntry entry; String host_id_str; ASTPtr query; bool is_initial_query = false; bool is_circular_replicated = false; bool execute_on_leader = false; Coordination::Requests ops; ExecutionStatus execution_status; bool was_executed = false; std::atomic_bool completely_processed = false; DDLTaskBase(const String & name, const String & path) : entry_name(name), entry_path(path) {} DDLTaskBase(const DDLTaskBase &) = delete; virtual ~DDLTaskBase() = default; void parseQueryFromEntry(const Context & context); virtual String getShardID() const = 0; virtual std::unique_ptr makeQueryContext(Context & from_context); inline String getActiveNodePath() const { return entry_path + "/active/" + host_id_str; } inline String getFinishedNodePath() const { return entry_path + "/finished/" + host_id_str; } inline String getShardNodePath() const { return entry_path + "/shards/" + getShardID(); } }; struct DDLTask : public DDLTaskBase { DDLTask(const String & name, const String & path) : DDLTaskBase(name, path) {} bool findCurrentHostID(const Context & global_context, Poco::Logger * log); void setClusterInfo(const Context & context, Poco::Logger * log); String getShardID() const override; private: bool tryFindHostInCluster(); bool tryFindHostInClusterViaResolving(const Context & context); HostID host_id; String cluster_name; ClusterPtr cluster; Cluster::Address address_in_cluster; size_t host_shard_num; size_t host_replica_num; }; struct DatabaseReplicatedTask : public DDLTaskBase { DatabaseReplicatedTask(const String & name, const String & path, DatabaseReplicated * database_); String getShardID() const override; std::unique_ptr makeQueryContext(Context & from_context) override; static String getLogEntryName(UInt32 log_entry_number); static UInt32 getLogEntryNumber(const String & log_entry_name); DatabaseReplicated * database; }; struct MetadataTransaction { enum State { CREATED, COMMITED, FAILED }; State state = CREATED; ZooKeeperPtr current_zookeeper; String zookeeper_path; bool is_initial_query; Coordination::Requests ops; void addOps(Coordination::Requests & other_ops) { std::move(ops.begin(), ops.end(), std::back_inserter(other_ops)); ops.clear(); } void commit(); ~MetadataTransaction() { assert(state != CREATED || std::uncaught_exception()); } }; }