#include <filesystem>
#include <algorithm>
#include <cassert>
#include <unordered_set>

/// Headers inferred from the usages below; the exact original include set may differ.
#include <Access/Common/AccessRightsElement.h>
#include <Access/ContextAccess.h>
#include <Common/Macros.h>
#include <Common/OpenTelemetryTraceContext.h>
#include <Common/Stopwatch.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/logger_useful.h>
#include <Core/Settings.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Databases/DatabaseReplicated.h>
#include <Interpreters/AddDefaultDatabaseVisitor.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/Context.h>
#include <Interpreters/DDLTask.h>
#include <Interpreters/DDLWorker.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ASTQueryWithOnCluster.h>
#include <Parsers/ASTQueryWithOutput.h>
#include <Parsers/ASTQueryWithTableAndOutput.h>
#include <Parsers/queryToString.h>
#include <Processors/Sinks/EmptySink.h>
#include <QueryPipeline/QueryPipeline.h>
#include <Storages/MergeTree/ZooKeeperRetries.h>
#include <base/sleep.h>
#include <base/sort.h>

namespace fs = std::filesystem;

namespace DB
{

namespace ErrorCodes
{
    extern const int NOT_IMPLEMENTED;
    extern const int TIMEOUT_EXCEEDED;
    extern const int UNFINISHED;
    extern const int QUERY_IS_PROHIBITED;
    extern const int LOGICAL_ERROR;
}

static ZooKeeperRetriesInfo getRetriesInfo()
{
    const auto & config_ref = Context::getGlobalContextInstance()->getConfigRef();
    return ZooKeeperRetriesInfo(
        "DistributedDDL",
        &Poco::Logger::get("DDLQueryStatusSource"),
        config_ref.getInt("distributed_ddl_keeper_max_retries", 5),
        config_ref.getInt("distributed_ddl_keeper_initial_backoff_ms", 100),
        config_ref.getInt("distributed_ddl_keeper_max_backoff_ms", 5000));
}

bool isSupportedAlterTypeForOnClusterDDLQuery(int type)
{
    assert(type != ASTAlterCommand::NO_TYPE);
    static const std::unordered_set<int> unsupported_alter_types{
        /// It's dangerous, because it may duplicate data if executed on multiple replicas. We can allow it after #18978
        ASTAlterCommand::ATTACH_PARTITION,
        /// Usually followed by ATTACH PARTITION
        ASTAlterCommand::FETCH_PARTITION,
        /// Logical error
        ASTAlterCommand::NO_TYPE,
    };
    return !unsupported_alter_types.contains(type);
}

BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, ContextPtr context, const DDLQueryOnClusterParams & params)
{
    OpenTelemetry::SpanHolder span(__FUNCTION__, OpenTelemetry::PRODUCER);

    if (context->getCurrentTransaction() && context->getSettingsRef().throw_on_unsupported_query_inside_transaction)
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "ON CLUSTER queries inside transactions are not supported");

    /// Remove FORMAT and INTO OUTFILE if they exist
    ASTPtr query_ptr = query_ptr_->clone();
    ASTQueryWithOutput::resetOutputASTIfExist(*query_ptr);

    // XXX: serious design flaw since `ASTQueryWithOnCluster` is not inherited from `IAST`!
    auto * query = dynamic_cast<ASTQueryWithOnCluster *>(query_ptr.get());
    if (!query)
    {
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Distributed execution is not supported for such DDL queries");
    }

    if (!context->getSettingsRef().allow_distributed_ddl)
        throw Exception(ErrorCodes::QUERY_IS_PROHIBITED, "Distributed DDL queries are prohibited for the user");

    if (const auto * query_alter = query_ptr->as<ASTAlterQuery>())
    {
        for (const auto & command : query_alter->command_list->children)
        {
            if (!isSupportedAlterTypeForOnClusterDDLQuery(command->as<ASTAlterCommand &>().type))
                throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported type of ALTER query");
        }
    }

    ClusterPtr cluster = params.cluster;
    if (!cluster)
    {
        query->cluster = context->getMacros()->expand(query->cluster);
        cluster = context->getCluster(query->cluster);
    }

    span.addAttribute("clickhouse.cluster", query->cluster);

    if (!cluster->areDistributedDDLQueriesAllowed())
        throw Exception(ErrorCodes::QUERY_IS_PROHIBITED, "Distributed DDL queries are prohibited for the cluster");

    /// TODO: support per-cluster grant
    context->checkAccess(AccessType::CLUSTER);

    DDLWorker & ddl_worker = context->getDDLWorker();

    /// Enumerate hosts which will be used to send query.
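    /// For example, `CREATE TABLE t ON CLUSTER my_cluster ...` on a 2-shard x 2-replica cluster
    /// produces one DDLLogEntry whose host list contains all four addresses, unless
    /// params.only_shard_num / params.only_replica_num narrow the selection below.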
    auto addresses = cluster->filterAddressesByShardOrReplica(params.only_shard_num, params.only_replica_num);
    if (addresses.empty())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "No hosts defined to execute distributed DDL query");

    std::vector<HostID> hosts;
    hosts.reserve(addresses.size());
    for (const auto * address : addresses)
        hosts.emplace_back(*address);

    /// The current database in a distributed query needs to be replaced with either
    /// the local current database or a shard's default database.
    AccessRightsElements access_to_check = params.access_to_check;
    bool need_replace_current_database = std::any_of(
        access_to_check.begin(),
        access_to_check.end(),
        [](const AccessRightsElement & elem) { return elem.isEmptyDatabase(); });

    bool use_local_default_database = false;
    const String & current_database = context->getCurrentDatabase();

    if (need_replace_current_database)
    {
        Strings host_default_databases;
        for (const auto * address : addresses)
        {
            if (!address->default_database.empty())
                host_default_databases.push_back(address->default_database);
            else
                use_local_default_database = true;
        }
        ::sort(host_default_databases.begin(), host_default_databases.end());
        host_default_databases.erase(std::unique(host_default_databases.begin(), host_default_databases.end()), host_default_databases.end());
        assert(use_local_default_database || !host_default_databases.empty());

        if (use_local_default_database && !host_default_databases.empty())
            throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Mixed local default DB and shard default DB in DDL query");

        if (use_local_default_database)
        {
            access_to_check.replaceEmptyDatabase(current_database);
        }
        else
        {
            for (size_t i = 0; i != access_to_check.size();)
            {
                auto & element = access_to_check[i];
                if (element.isEmptyDatabase())
                {
                    access_to_check.insert(access_to_check.begin() + i + 1, host_default_databases.size() - 1, element);
                    for (size_t j = 0; j != host_default_databases.size(); ++j)
                        access_to_check[i + j].replaceEmptyDatabase(host_default_databases[j]);
                    i += host_default_databases.size();
                }
                else
                    ++i;
            }
        }
    }

    AddDefaultDatabaseVisitor visitor(context, current_database, !use_local_default_database);
    visitor.visitDDL(query_ptr);

    /// Check access rights, assuming that all servers have the same users config.
    context->checkAccess(access_to_check);

    DDLLogEntry entry;
    entry.hosts = std::move(hosts);
    entry.query = queryToString(query_ptr);
    entry.initiator = ddl_worker.getCommonHostID();
    entry.setSettingsIfRequired(context);
    entry.tracing_context = OpenTelemetry::CurrentContext();
    entry.initial_query_id = context->getClientInfo().initial_query_id;
    String node_path = ddl_worker.enqueueQuery(entry);

    return getDistributedDDLStatus(node_path, entry, context, /* hosts_to_wait */ nullptr);
}


class DDLQueryStatusSource final : public ISource
{
public:
    DDLQueryStatusSource(
        const String & zk_node_path, const DDLLogEntry & entry, ContextPtr context_, const Strings * hosts_to_wait);

    String getName() const override { return "DDLQueryStatus"; }
    Chunk generate() override;
    Status prepare() override;

private:
    static Strings getChildrenAllowNoNode(const std::shared_ptr<zkutil::ZooKeeper> & zookeeper, const String & node_path);

    static Block getSampleBlock(ContextPtr context_, bool hosts_to_wait);

    Strings getNewAndUpdate(const Strings & current_list_of_finished_hosts);

    std::pair<String, UInt16> parseHostAndPort(const String & host_id) const;

    Chunk generateChunkWithUnfinishedHosts() const;

    enum ReplicatedDatabaseQueryStatus
    {
        /// Query is (successfully) finished
        OK = 0,
        /// Query is not finished yet, but the replica is currently executing it
        IN_PROGRESS = 1,
        /// The replica is not available or is busy with previous queries. It will process the query asynchronously
        QUEUED = 2,
    };
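
    /// generate() polls ZooKeeper until every host in `waiting_hosts` has reported a status
    /// (or the timeout expires); the fields below carry that state between calls.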
    String node_path;
    ContextPtr context;
    Stopwatch watch;
    Poco::Logger * log;

    NameSet waiting_hosts;  /// hosts from the task host list
    NameSet finished_hosts; /// finished hosts from the host list
    NameSet ignoring_hosts; /// hosts that appeared but are not in the host list
    Strings current_active_hosts; /// hosts that were in active state at the last check
    size_t num_hosts_finished = 0;

    /// Save the first detected error and throw it at the end of execution
    std::unique_ptr<Exception> first_exception;

    Int64 timeout_seconds = 120;
    bool is_replicated_database = false;
    bool throw_on_timeout = true;
    bool timeout_exceeded = false;
};


BlockIO getDistributedDDLStatus(const String & node_path, const DDLLogEntry & entry, ContextPtr context, const Strings * hosts_to_wait)
{
    BlockIO io;
    if (context->getSettingsRef().distributed_ddl_task_timeout == 0)
        return io;

    auto source = std::make_shared<DDLQueryStatusSource>(node_path, entry, context, hosts_to_wait);
    io.pipeline = QueryPipeline(std::move(source));

    if (context->getSettingsRef().distributed_ddl_output_mode == DistributedDDLOutputMode::NONE)
        io.pipeline.complete(std::make_shared<EmptySink>(io.pipeline.getHeader()));

    return io;
}

Block DDLQueryStatusSource::getSampleBlock(ContextPtr context_, bool hosts_to_wait)
{
    auto output_mode = context_->getSettingsRef().distributed_ddl_output_mode;

    auto maybe_make_nullable = [&](const DataTypePtr & type) -> DataTypePtr
    {
        if (output_mode == DistributedDDLOutputMode::THROW || output_mode == DistributedDDLOutputMode::NONE)
            return type;
        return std::make_shared<DataTypeNullable>(type);
    };

    auto get_status_enum = []()
    {
        return std::make_shared<DataTypeEnum8>(
            DataTypeEnum8::Values
            {
                {"OK", static_cast<Int8>(OK)},
                {"IN_PROGRESS", static_cast<Int8>(IN_PROGRESS)},
                {"QUEUED", static_cast<Int8>(QUEUED)},
            });
    };

    if (hosts_to_wait)
    {
        return Block{
            {std::make_shared<DataTypeString>(), "shard"},
            {std::make_shared<DataTypeString>(), "replica"},
            {get_status_enum(), "status"},
            {std::make_shared<DataTypeUInt64>(), "num_hosts_remaining"},
            {std::make_shared<DataTypeUInt64>(), "num_hosts_active"},
        };
    }
    else
    {
        return Block{
            {std::make_shared<DataTypeString>(), "host"},
            {std::make_shared<DataTypeUInt16>(), "port"},
            {maybe_make_nullable(std::make_shared<DataTypeInt64>()), "status"},
            {maybe_make_nullable(std::make_shared<DataTypeString>()), "error"},
            {std::make_shared<DataTypeUInt64>(), "num_hosts_remaining"},
            {std::make_shared<DataTypeUInt64>(), "num_hosts_active"},
        };
    }
}

DDLQueryStatusSource::DDLQueryStatusSource(
    const String & zk_node_path, const DDLLogEntry & entry, ContextPtr context_, const Strings * hosts_to_wait)
    : ISource(getSampleBlock(context_, static_cast<bool>(hosts_to_wait)))
    , node_path(zk_node_path)
    , context(context_)
    , watch(CLOCK_MONOTONIC_COARSE)
    , log(&Poco::Logger::get("DDLQueryStatusSource"))
{
    auto output_mode = context->getSettingsRef().distributed_ddl_output_mode;
    throw_on_timeout = output_mode == DistributedDDLOutputMode::THROW || output_mode == DistributedDDLOutputMode::NONE;

    if (hosts_to_wait)
    {
        waiting_hosts = NameSet(hosts_to_wait->begin(), hosts_to_wait->end());
        is_replicated_database = true;
    }
    else
    {
        for (const HostID & host : entry.hosts)
            waiting_hosts.emplace(host.toString());
    }

    addTotalRowsApprox(waiting_hosts.size());
    timeout_seconds = context->getSettingsRef().distributed_ddl_task_timeout;
}

std::pair<String, UInt16> DDLQueryStatusSource::parseHostAndPort(const String & host_id) const
{
    String host = host_id;
    UInt16 port = 0;
    if (!is_replicated_database)
    {
        auto host_and_port = Cluster::Address::fromString(host_id);
        host = host_and_port.first;
        port = host_and_port.second;
    }
    return {host, port};
}
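
/// In the non-throwing output modes a timed-out query still returns a row per unfinished host,
/// with NULL status/error for an ordinary cluster (e.g. `host1 | 9000 | NULL | NULL | 2 | 1`;
/// host name and port are illustrative) or IN_PROGRESS/QUEUED for a Replicated database.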
Chunk DDLQueryStatusSource::generateChunkWithUnfinishedHosts() const
{
    NameSet unfinished_hosts = waiting_hosts;
    for (const auto & host_id : finished_hosts)
        unfinished_hosts.erase(host_id);

    NameSet active_hosts_set = NameSet{current_active_hosts.begin(), current_active_hosts.end()};

    /// The query is not finished on the rest of the hosts, so fill the corresponding rows with NULLs.
    MutableColumns columns = output.getHeader().cloneEmptyColumns();
    for (const String & host_id : unfinished_hosts)
    {
        size_t num = 0;
        if (is_replicated_database)
        {
            auto [shard, replica] = DatabaseReplicated::parseFullReplicaName(host_id);
            columns[num++]->insert(shard);
            columns[num++]->insert(replica);
            if (active_hosts_set.contains(host_id))
                columns[num++]->insert(IN_PROGRESS);
            else
                columns[num++]->insert(QUEUED);
        }
        else
        {
            auto [host, port] = parseHostAndPort(host_id);
            columns[num++]->insert(host);
            columns[num++]->insert(port);
            columns[num++]->insert(Field{});
            columns[num++]->insert(Field{});
        }
        columns[num++]->insert(unfinished_hosts.size());
        columns[num++]->insert(current_active_hosts.size());
    }
    return Chunk(std::move(columns), unfinished_hosts.size());
}

Chunk DDLQueryStatusSource::generate()
{
    bool all_hosts_finished = num_hosts_finished >= waiting_hosts.size();

    /// Seems like num_hosts_finished cannot be strictly greater than waiting_hosts.size()
    assert(num_hosts_finished <= waiting_hosts.size());

    if (all_hosts_finished || timeout_exceeded)
        return {};

    String node_to_wait = "finished";
    if (is_replicated_database && context->getSettingsRef().database_replicated_enforce_synchronous_settings)
        node_to_wait = "synced";

    size_t try_number = 0;

    while (true)
    {
        if (isCancelled())
            return {};

        if (timeout_seconds >= 0 && watch.elapsedSeconds() > timeout_seconds)
        {
            timeout_exceeded = true;

            size_t num_unfinished_hosts = waiting_hosts.size() - num_hosts_finished;
            size_t num_active_hosts = current_active_hosts.size();

            constexpr auto msg_format = "Watching task {} is executing longer than distributed_ddl_task_timeout (={}) seconds. "
                                        "There are {} unfinished hosts ({} of them are currently active), "
                                        "they are going to execute the query in background";
            if (throw_on_timeout)
            {
                if (!first_exception)
                    first_exception = std::make_unique<Exception>(Exception(ErrorCodes::TIMEOUT_EXCEEDED, msg_format,
                        node_path, timeout_seconds, num_unfinished_hosts, num_active_hosts));

                /// For a Replicated database also print a list of unfinished hosts. An empty block will be returned on the next iteration.
                if (is_replicated_database)
                    return generateChunkWithUnfinishedHosts();
                return {};
            }

            LOG_INFO(log, msg_format, node_path, timeout_seconds, num_unfinished_hosts, num_active_hosts);

            return generateChunkWithUnfinishedHosts();
        }

        if (num_hosts_finished != 0 || try_number != 0)
        {
            sleepForMilliseconds(std::min<size_t>(1000, 50 * (try_number + 1)));
        }

        bool node_exists = false;
        Strings tmp_hosts;
        Strings tmp_active_hosts;

        {
            auto retries_info = getRetriesInfo();
            auto retries_ctl = ZooKeeperRetriesControl("executeDDLQueryOnCluster", retries_info, context->getProcessListElement());
            retries_ctl.retryLoop([&]()
            {
                auto zookeeper = context->getZooKeeper();
                node_exists = zookeeper->exists(node_path);
                tmp_hosts = getChildrenAllowNoNode(zookeeper, fs::path(node_path) / node_to_wait);
                tmp_active_hosts = getChildrenAllowNoNode(zookeeper, fs::path(node_path) / "active");
            });
        }
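
        /// At this point `tmp_hosts` holds the children of <node_path>/<node_to_wait> ("finished" or "synced")
        /// and `tmp_active_hosts` the children of <node_path>/active; node_path typically lives under the
        /// distributed DDL queue, e.g. /clickhouse/task_queue/ddl/query-0000000123 (path shown for illustration).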
        if (!node_exists)
        {
            /// Paradoxically, this exception will be thrown even in "never_throw" mode.
            if (!first_exception)
                first_exception = std::make_unique<Exception>(Exception(ErrorCodes::UNFINISHED,
                    "Cannot provide query execution status. The query's node {} has been deleted by the cleaner"
                    " since it was finished (or its lifetime is expired)",
                    node_path));
            return {};
        }

        Strings new_hosts = getNewAndUpdate(tmp_hosts);
        ++try_number;
        if (new_hosts.empty())
            continue;

        current_active_hosts = std::move(tmp_active_hosts);

        MutableColumns columns = output.getHeader().cloneEmptyColumns();
        for (const String & host_id : new_hosts)
        {
            ExecutionStatus status(-1, "Cannot obtain error message");

            if (node_to_wait == "finished")
            {
                String status_data;
                bool finished_exists = false;

                auto retries_info = getRetriesInfo();
                auto retries_ctl = ZooKeeperRetriesControl("executeDDLQueryOnCluster", retries_info, context->getProcessListElement());
                retries_ctl.retryLoop([&]()
                {
                    finished_exists = context->getZooKeeper()->tryGet(fs::path(node_path) / "finished" / host_id, status_data);
                });
                if (finished_exists)
                    status.tryDeserializeText(status_data);
            }
            else
            {
                status = ExecutionStatus{0};
            }

            if (status.code != 0 && !first_exception
                && context->getSettingsRef().distributed_ddl_output_mode != DistributedDDLOutputMode::NEVER_THROW)
            {
                /// A Replicated database retries in case of error, so it should not write an error status.
                if (is_replicated_database)
                    throw Exception(ErrorCodes::LOGICAL_ERROR, "There was an error on {}: {} (probably it's a bug)", host_id, status.message);

                auto [host, port] = parseHostAndPort(host_id);
                first_exception = std::make_unique<Exception>(Exception(status.code,
                    "There was an error on [{}:{}]: {}", host, port, status.message));
            }

            ++num_hosts_finished;

            size_t num = 0;
            if (is_replicated_database)
            {
                if (status.code != 0)
                    throw Exception(ErrorCodes::LOGICAL_ERROR, "There was an error on {}: {} (probably it's a bug)", host_id, status.message);

                auto [shard, replica] = DatabaseReplicated::parseFullReplicaName(host_id);
                columns[num++]->insert(shard);
                columns[num++]->insert(replica);
                columns[num++]->insert(OK);
            }
            else
            {
                auto [host, port] = parseHostAndPort(host_id);
                columns[num++]->insert(host);
                columns[num++]->insert(port);
                columns[num++]->insert(status.code);
                columns[num++]->insert(status.message);
            }
            columns[num++]->insert(waiting_hosts.size() - num_hosts_finished);
            columns[num++]->insert(current_active_hosts.size());
        }

        return Chunk(std::move(columns), new_hosts.size());
    }
}

IProcessor::Status DDLQueryStatusSource::prepare()
{
    /// This method is overridden to throw the exception after all data has been read.
    /// The exception is pushed into the pipe (instead of simply being thrown) to preserve the ordering of data and exception.
    if (finished)
    {
        if (first_exception)
        {
            if (!output.canPush())
                return Status::PortFull;

            output.pushException(std::make_exception_ptr(*first_exception));
        }

        output.finish();
        return Status::Finished;
    }
    else
        return ISource::prepare();
}

Strings DDLQueryStatusSource::getChildrenAllowNoNode(const std::shared_ptr<zkutil::ZooKeeper> & zookeeper, const String & node_path)
{
    Strings res;
    Coordination::Error code = zookeeper->tryGetChildren(node_path, res);
    if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNONODE)
        throw Coordination::Exception::fromPath(code, node_path);
    return res;
}

Strings DDLQueryStatusSource::getNewAndUpdate(const Strings & current_list_of_finished_hosts)
{
    Strings diff;
    for (const String & host : current_list_of_finished_hosts)
    {
        if (!waiting_hosts.contains(host))
        {
            if (!ignoring_hosts.contains(host))
            {
                ignoring_hosts.emplace(host);
                LOG_INFO(log, "Unexpected host {} appeared in task {}", host, node_path);
            }
            continue;
        }

        if (!finished_hosts.contains(host))
        {
            diff.emplace_back(host);
            finished_hosts.emplace(host);
        }
    }

    return diff;
}

bool maybeRemoveOnCluster(const ASTPtr & query_ptr, ContextPtr context)
{
    const auto * query = dynamic_cast<const ASTQueryWithTableAndOutput *>(query_ptr.get());
    if (!query || !query->table)
        return false;

    String database_name = query->getDatabase();
    if (database_name.empty())
        database_name = context->getCurrentDatabase();

    auto * query_on_cluster = dynamic_cast<ASTQueryWithOnCluster *>(query_ptr.get());
    if (database_name != query_on_cluster->cluster)
        return false;

    auto database = DatabaseCatalog::instance().tryGetDatabase(database_name);
    if (database && database->shouldReplicateQuery(context, query_ptr))
    {
        /// It's a Replicated database and the query is replicated on the database level,
        /// so the ON CLUSTER clause is redundant.
        query_on_cluster->cluster.clear();
        return true;
    }

    return false;
}

}
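
/// A minimal sketch (not part of this translation unit) of how an interpreter for a DDL query
/// typically hands the query over to this module when the AST carries a non-empty cluster name;
/// `InterpreterSomeDDLQuery` and `getRequiredAccess()` are illustrative names, not real symbols:
///
///     BlockIO InterpreterSomeDDLQuery::execute()
///     {
///         if (!query.cluster.empty())
///         {
///             DDLQueryOnClusterParams params;
///             params.access_to_check = getRequiredAccess();
///             return executeDDLQueryOnCluster(query_ptr, getContext(), params);
///         }
///         /// ... local execution path ...
///     }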