diff --git a/src/Storages/System/StorageSystemDDLWorkerQueue.cpp b/src/Storages/System/StorageSystemDDLWorkerQueue.cpp index 4464334be4f..495c2dcb454 100644 --- a/src/Storages/System/StorageSystemDDLWorkerQueue.cpp +++ b/src/Storages/System/StorageSystemDDLWorkerQueue.cpp @@ -5,7 +5,6 @@ #include "StorageSystemDDLWorkerQueue.h" #include -#include #include #include @@ -16,13 +15,14 @@ #include #include #include -#include #include -#include -#include #include #include +#include +#include +#include + namespace fs = std::filesystem; @@ -102,75 +102,8 @@ NamesAndTypesList StorageSystemDDLWorkerQueue::getNamesAndTypes() }; } - -static bool extractPathImpl(const IAST & elem, String & res, const Context & context) +void StorageSystemDDLWorkerQueue::fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo &) const { - const auto * function = elem.as(); - if (!function) - return false; - - if (function->name == "and") - { - for (const auto & child : function->arguments->children) - if (extractPathImpl(*child, res, context)) - return true; - return false; - } - - if (function->name == "equals") - { - const auto & args = function->arguments->as(); - ASTPtr value; - - if (args.children.size() != 2) - return false; - - const ASTIdentifier * ident; - if ((ident = args.children.at(0)->as())) - value = args.children.at(1); - else if ((ident = args.children.at(1)->as())) - value = args.children.at(0); - else - return false; - - if (ident->name() != "cluster") - return false; - - auto evaluated = evaluateConstantExpressionAsLiteral(value, context); - const auto * literal = evaluated->as(); - if (!literal) - return false; - - if (literal->value.getType() != Field::Types::String) - return false; - - res = literal->value.safeGet(); - return true; - } - return false; -} - - -/** Retrieve from the query a condition of the form `path = 'path'`, from conjunctions in the WHERE clause. - */ -static String extractPath(const ASTPtr & query, const Context & context) -{ - const auto & select = query->as(); - if (!select.where()) - return ""; - String res; - return extractPathImpl(*select.where(), res, context) ? res : ""; -} - - -void StorageSystemDDLWorkerQueue::fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const -{ - String cluster_name = extractPath(query_info.query, context); - if (cluster_name.empty()) - throw Exception( - "SELECT from system.ddl_worker_queue table must contain condition like cluster = 'cluster' in WHERE clause.", - ErrorCodes::BAD_ARGUMENTS); - zkutil::ZooKeeperPtr zookeeper = context.getZooKeeper(); Coordination::Error zk_exception_code = Coordination::Error::ZOK; String ddl_zookeeper_path = config.getString("distributed_ddl.path", "/clickhouse/task_queue/ddl/"); @@ -189,107 +122,120 @@ void StorageSystemDDLWorkerQueue::fillData(MutableColumns & res_columns, const C if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNONODE) zk_exception_code = code; - const auto & cluster = context.tryGetCluster(cluster_name); - const auto & shards_info = cluster->getShardsInfo(); - const auto & addresses_with_failover = cluster->getShardsAddresses(); - - if (cluster == nullptr) - throw Exception("No cluster with the name: " + cluster_name + " was found.", ErrorCodes::BAD_ARGUMENTS); - - for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index) + const auto & clusters = context.getClusters(); + for (const auto & name_and_cluster : clusters.getContainer()) { - const auto & shard_addresses = addresses_with_failover[shard_index]; - const auto & shard_info = shards_info[shard_index]; - const auto pool_status = shard_info.pool->getStatus(); - for (size_t replica_index = 0; replica_index < shard_addresses.size(); ++replica_index) + const ClusterPtr & cluster = name_and_cluster.second; + const auto & shards_info = cluster->getShardsInfo(); + const auto & addresses_with_failover = cluster->getShardsAddresses(); + for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index) { - // iterate through queries - /* Dir contents of every query will be similar to - [zk: localhost:2181(CONNECTED) 53] ls /clickhouse/task_queue/ddl/query-0000000004 - [active, finished] - */ - std::vector> futures; - futures.reserve(queries.size()); - for (const String & q : queries) + const auto & shard_addresses = addresses_with_failover[shard_index]; + const auto & shard_info = shards_info[shard_index]; + const auto pool_status = shard_info.pool->getStatus(); + for (size_t replica_index = 0; replica_index < shard_addresses.size(); ++replica_index) { - futures.push_back(zookeeper->asyncTryGet(fs::path(ddl_zookeeper_path) / q)); - } - for (size_t query_id = 0; query_id < queries.size(); query_id++) - { - Int64 query_finish_time = 0; - size_t i = 0; - res_columns[i++]->insert(queries[query_id]); // entry - const auto & address = shard_addresses[replica_index]; - res_columns[i++]->insert(address.host_name); // host_name - auto resolved = address.getResolvedAddress(); - res_columns[i++]->insert(resolved ? resolved->host().toString() : String()); // host_address - res_columns[i++]->insert(address.port); // port - ddl_query_path = fs::path(ddl_zookeeper_path) / queries[query_id]; + /* Dir contents of every query will be similar to + [zk: localhost:2181(CONNECTED) 53] ls /clickhouse/task_queue/ddl/query-0000000004 + [active, finished] + */ + std::vector> futures; + futures.reserve(queries.size()); + for (const String & q : queries) + { + futures.push_back(zookeeper->asyncTryGet(fs::path(ddl_zookeeper_path) / q)); + } + for (size_t query_id = 0; query_id < queries.size(); query_id++) + { + Int64 query_finish_time = 0; + size_t i = 0; + res_columns[i++]->insert(queries[query_id]); // entry + const auto & address = shard_addresses[replica_index]; + res_columns[i++]->insert(address.host_name); + auto resolved = address.getResolvedAddress(); + res_columns[i++]->insert(resolved ? resolved->host().toString() : String()); // host_address + res_columns[i++]->insert(address.port); + ddl_query_path = fs::path(ddl_zookeeper_path) / queries[query_id]; - zkutil::Strings active_nodes; - zkutil::Strings finished_nodes; + zkutil::Strings active_nodes; + zkutil::Strings finished_nodes; - // on error just skip and continue. + // on error just skip and continue. - code = zookeeper->tryGetChildren(fs::path(ddl_query_path) / "active", active_nodes); - if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNONODE) - zk_exception_code = code; + code = zookeeper->tryGetChildren(fs::path(ddl_query_path) / "active", active_nodes); + if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNONODE) + zk_exception_code = code; - code = zookeeper->tryGetChildren(fs::path(ddl_query_path) / "finished", finished_nodes); - if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNONODE) - zk_exception_code = code; + code = zookeeper->tryGetChildren(fs::path(ddl_query_path) / "finished", finished_nodes); + if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNONODE) + zk_exception_code = code; - /* status: + /* status: * active: If the hostname:port entry is present under active path. * finished: If the hostname:port entry is present under the finished path. * errored: If the hostname:port entry is present under the finished path but the error count is not 0. * unknown: If the above cases don't hold true, then status is unknown. */ - if (std::find(active_nodes.begin(), active_nodes.end(), address.toString()) != active_nodes.end()) - { - res_columns[i++]->insert(static_cast(Status::active)); - } - else if (std::find(finished_nodes.begin(), finished_nodes.end(), address.toString()) != finished_nodes.end()) - { - if (pool_status[replica_index].error_count != 0) + if (std::find(active_nodes.begin(), active_nodes.end(), address.toString()) != active_nodes.end()) { - res_columns[i++]->insert(static_cast(Status::errored)); + res_columns[i++]->insert(static_cast(Status::active)); + } + else if (std::find(finished_nodes.begin(), finished_nodes.end(), address.toString()) != finished_nodes.end()) + { + if (pool_status[replica_index].error_count != 0) + { + res_columns[i++]->insert(static_cast(Status::errored)); + } + else + { + res_columns[i++]->insert(static_cast(Status::finished)); + } + // regardless of the status finished or errored, the node host_name:port entry was found under the /finished path + // & should be able to get the contents of the znode at /finished path. + auto res_fn = zookeeper->asyncTryGet(fs::path(ddl_query_path) / "finished"); + auto stat_fn = res_fn.get().stat; + query_finish_time = stat_fn.mtime; } else { - res_columns[i++]->insert(static_cast(Status::finished)); + res_columns[i++]->insert(static_cast(Status::unknown)); } - // regardless of the status finished or errored, the node host_name:port entry was found under the /finished path - // & should be able to get the contents of the znode at /finished path. - auto res_fn = zookeeper->asyncTryGet(fs::path(ddl_query_path) / "finished"); - auto stat_fn = res_fn.get().stat; - query_finish_time = stat_fn.mtime; + + Coordination::GetResponse res; + if (!futures.empty()) + res = futures[query_id].get(); + + auto query_start_time = res.stat.mtime; + + DDLLogEntry entry; + entry.parse(res.data); + String cluster_name = clusterNameFromDDLQuery(context, entry); + + res_columns[i++]->insert(cluster_name); + res_columns[i++]->insert(entry.query); + res_columns[i++]->insert(entry.initiator); + res_columns[i++]->insert(UInt64(query_start_time / 1000)); + res_columns[i++]->insert(UInt64(query_finish_time / 1000)); + res_columns[i++]->insert(UInt64(query_finish_time - query_start_time)); + res_columns[i++]->insert(static_cast(zk_exception_code)); } - else - { - res_columns[i++]->insert(static_cast(Status::unknown)); - } - - // This is the original cluster_name from the query (WHERE cluster='cluster_name') - res_columns[i++]->insert(cluster_name); - - Coordination::GetResponse res; - if (!futures.empty()) - res = futures[query_id].get(); - - auto query_start_time = res.stat.mtime; - - DDLLogEntry entry; - entry.parse(res.data); - - res_columns[i++]->insert(entry.query); // query - res_columns[i++]->insert(entry.initiator); // initiator - res_columns[i++]->insert(UInt64(query_start_time / 1000)); // query_start_time - res_columns[i++]->insert(UInt64(query_finish_time / 1000)); // query_finish_time - res_columns[i++]->insert(UInt64(query_finish_time - query_start_time)); // query_duration_ms - res_columns[i++]->insert(static_cast(zk_exception_code)); // query_duration_ms } } } } +String StorageSystemDDLWorkerQueue::clusterNameFromDDLQuery(const Context & context, const DDLLogEntry & entry) const +{ + const char * begin = entry.query.data(); + const char * end = begin + entry.query.size(); + ASTPtr query; + ASTQueryWithOnCluster * query_on_cluster; + String cluster_name = ""; + ParserQuery parser_query(end); + String description; + query = parseQuery(parser_query, begin, end, description, 0, context.getSettingsRef().max_parser_depth); + if (query && (query_on_cluster = dynamic_cast(query.get()))) + cluster_name = query_on_cluster->cluster; + return cluster_name; +} + } diff --git a/src/Storages/System/StorageSystemDDLWorkerQueue.h b/src/Storages/System/StorageSystemDDLWorkerQueue.h index a348796e688..92da69e5fd8 100644 --- a/src/Storages/System/StorageSystemDDLWorkerQueue.h +++ b/src/Storages/System/StorageSystemDDLWorkerQueue.h @@ -3,6 +3,7 @@ #include #include +#include #include #include @@ -29,5 +30,6 @@ public: std::string getName() const override { return "SystemDDLWorkerQueue"; } static NamesAndTypesList getNamesAndTypes(); + String clusterNameFromDDLQuery(const Context & context, const DB::DDLLogEntry & entry) const; }; }