extract cluster_name from DDLQuery

This commit is contained in:
bharatnc 2020-12-30 02:13:26 -08:00
parent f0ea07b493
commit 6f0009ff52
2 changed files with 100 additions and 152 deletions

View File

@ -5,7 +5,6 @@
#include "StorageSystemDDLWorkerQueue.h"
#include <Columns/ColumnArray.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/DDLWorker.h>
#include <DataTypes/DataTypeArray.h>
@ -16,13 +15,14 @@
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSelectQuery.h>
#include <Storages/SelectQueryInfo.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Parsers/ASTQueryWithOnCluster.h>
#include <Parsers/ParserQuery.h>
#include <Parsers/parseQuery.h>
namespace fs = std::filesystem;
@ -102,75 +102,8 @@ NamesAndTypesList StorageSystemDDLWorkerQueue::getNamesAndTypes()
};
}
static bool extractPathImpl(const IAST & elem, String & res, const Context & context)
void StorageSystemDDLWorkerQueue::fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo &) const
{
const auto * function = elem.as<ASTFunction>();
if (!function)
return false;
if (function->name == "and")
{
for (const auto & child : function->arguments->children)
if (extractPathImpl(*child, res, context))
return true;
return false;
}
if (function->name == "equals")
{
const auto & args = function->arguments->as<ASTExpressionList &>();
ASTPtr value;
if (args.children.size() != 2)
return false;
const ASTIdentifier * ident;
if ((ident = args.children.at(0)->as<ASTIdentifier>()))
value = args.children.at(1);
else if ((ident = args.children.at(1)->as<ASTIdentifier>()))
value = args.children.at(0);
else
return false;
if (ident->name() != "cluster")
return false;
auto evaluated = evaluateConstantExpressionAsLiteral(value, context);
const auto * literal = evaluated->as<ASTLiteral>();
if (!literal)
return false;
if (literal->value.getType() != Field::Types::String)
return false;
res = literal->value.safeGet<String>();
return true;
}
return false;
}
/** Retrieve from the query a condition of the form `path = 'path'`, from conjunctions in the WHERE clause.
*/
static String extractPath(const ASTPtr & query, const Context & context)
{
const auto & select = query->as<ASTSelectQuery &>();
if (!select.where())
return "";
String res;
return extractPathImpl(*select.where(), res, context) ? res : "";
}
void StorageSystemDDLWorkerQueue::fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const
{
String cluster_name = extractPath(query_info.query, context);
if (cluster_name.empty())
throw Exception(
"SELECT from system.ddl_worker_queue table must contain condition like cluster = 'cluster' in WHERE clause.",
ErrorCodes::BAD_ARGUMENTS);
zkutil::ZooKeeperPtr zookeeper = context.getZooKeeper();
Coordination::Error zk_exception_code = Coordination::Error::ZOK;
String ddl_zookeeper_path = config.getString("distributed_ddl.path", "/clickhouse/task_queue/ddl/");
@ -189,107 +122,120 @@ void StorageSystemDDLWorkerQueue::fillData(MutableColumns & res_columns, const C
if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNONODE)
zk_exception_code = code;
const auto & cluster = context.tryGetCluster(cluster_name);
const auto & shards_info = cluster->getShardsInfo();
const auto & addresses_with_failover = cluster->getShardsAddresses();
if (cluster == nullptr)
throw Exception("No cluster with the name: " + cluster_name + " was found.", ErrorCodes::BAD_ARGUMENTS);
for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index)
const auto & clusters = context.getClusters();
for (const auto & name_and_cluster : clusters.getContainer())
{
const auto & shard_addresses = addresses_with_failover[shard_index];
const auto & shard_info = shards_info[shard_index];
const auto pool_status = shard_info.pool->getStatus();
for (size_t replica_index = 0; replica_index < shard_addresses.size(); ++replica_index)
const ClusterPtr & cluster = name_and_cluster.second;
const auto & shards_info = cluster->getShardsInfo();
const auto & addresses_with_failover = cluster->getShardsAddresses();
for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index)
{
// iterate through queries
/* Dir contents of every query will be similar to
[zk: localhost:2181(CONNECTED) 53] ls /clickhouse/task_queue/ddl/query-0000000004
[active, finished]
*/
std::vector<std::future<Coordination::GetResponse>> futures;
futures.reserve(queries.size());
for (const String & q : queries)
const auto & shard_addresses = addresses_with_failover[shard_index];
const auto & shard_info = shards_info[shard_index];
const auto pool_status = shard_info.pool->getStatus();
for (size_t replica_index = 0; replica_index < shard_addresses.size(); ++replica_index)
{
futures.push_back(zookeeper->asyncTryGet(fs::path(ddl_zookeeper_path) / q));
}
for (size_t query_id = 0; query_id < queries.size(); query_id++)
{
Int64 query_finish_time = 0;
size_t i = 0;
res_columns[i++]->insert(queries[query_id]); // entry
const auto & address = shard_addresses[replica_index];
res_columns[i++]->insert(address.host_name); // host_name
auto resolved = address.getResolvedAddress();
res_columns[i++]->insert(resolved ? resolved->host().toString() : String()); // host_address
res_columns[i++]->insert(address.port); // port
ddl_query_path = fs::path(ddl_zookeeper_path) / queries[query_id];
/* Dir contents of every query will be similar to
[zk: localhost:2181(CONNECTED) 53] ls /clickhouse/task_queue/ddl/query-0000000004
[active, finished]
*/
std::vector<std::future<Coordination::GetResponse>> futures;
futures.reserve(queries.size());
for (const String & q : queries)
{
futures.push_back(zookeeper->asyncTryGet(fs::path(ddl_zookeeper_path) / q));
}
for (size_t query_id = 0; query_id < queries.size(); query_id++)
{
Int64 query_finish_time = 0;
size_t i = 0;
res_columns[i++]->insert(queries[query_id]); // entry
const auto & address = shard_addresses[replica_index];
res_columns[i++]->insert(address.host_name);
auto resolved = address.getResolvedAddress();
res_columns[i++]->insert(resolved ? resolved->host().toString() : String()); // host_address
res_columns[i++]->insert(address.port);
ddl_query_path = fs::path(ddl_zookeeper_path) / queries[query_id];
zkutil::Strings active_nodes;
zkutil::Strings finished_nodes;
zkutil::Strings active_nodes;
zkutil::Strings finished_nodes;
// on error just skip and continue.
// on error just skip and continue.
code = zookeeper->tryGetChildren(fs::path(ddl_query_path) / "active", active_nodes);
if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNONODE)
zk_exception_code = code;
code = zookeeper->tryGetChildren(fs::path(ddl_query_path) / "active", active_nodes);
if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNONODE)
zk_exception_code = code;
code = zookeeper->tryGetChildren(fs::path(ddl_query_path) / "finished", finished_nodes);
if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNONODE)
zk_exception_code = code;
code = zookeeper->tryGetChildren(fs::path(ddl_query_path) / "finished", finished_nodes);
if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNONODE)
zk_exception_code = code;
/* status:
/* status:
* active: If the hostname:port entry is present under active path.
* finished: If the hostname:port entry is present under the finished path.
* errored: If the hostname:port entry is present under the finished path but the error count is not 0.
* unknown: If the above cases don't hold true, then status is unknown.
*/
if (std::find(active_nodes.begin(), active_nodes.end(), address.toString()) != active_nodes.end())
{
res_columns[i++]->insert(static_cast<Int8>(Status::active));
}
else if (std::find(finished_nodes.begin(), finished_nodes.end(), address.toString()) != finished_nodes.end())
{
if (pool_status[replica_index].error_count != 0)
if (std::find(active_nodes.begin(), active_nodes.end(), address.toString()) != active_nodes.end())
{
res_columns[i++]->insert(static_cast<Int8>(Status::errored));
res_columns[i++]->insert(static_cast<Int8>(Status::active));
}
else if (std::find(finished_nodes.begin(), finished_nodes.end(), address.toString()) != finished_nodes.end())
{
if (pool_status[replica_index].error_count != 0)
{
res_columns[i++]->insert(static_cast<Int8>(Status::errored));
}
else
{
res_columns[i++]->insert(static_cast<Int8>(Status::finished));
}
// regardless of the status finished or errored, the node host_name:port entry was found under the /finished path
// & should be able to get the contents of the znode at /finished path.
auto res_fn = zookeeper->asyncTryGet(fs::path(ddl_query_path) / "finished");
auto stat_fn = res_fn.get().stat;
query_finish_time = stat_fn.mtime;
}
else
{
res_columns[i++]->insert(static_cast<Int8>(Status::finished));
res_columns[i++]->insert(static_cast<Int8>(Status::unknown));
}
// regardless of the status finished or errored, the node host_name:port entry was found under the /finished path
// & should be able to get the contents of the znode at /finished path.
auto res_fn = zookeeper->asyncTryGet(fs::path(ddl_query_path) / "finished");
auto stat_fn = res_fn.get().stat;
query_finish_time = stat_fn.mtime;
Coordination::GetResponse res;
if (!futures.empty())
res = futures[query_id].get();
auto query_start_time = res.stat.mtime;
DDLLogEntry entry;
entry.parse(res.data);
String cluster_name = clusterNameFromDDLQuery(context, entry);
res_columns[i++]->insert(cluster_name);
res_columns[i++]->insert(entry.query);
res_columns[i++]->insert(entry.initiator);
res_columns[i++]->insert(UInt64(query_start_time / 1000));
res_columns[i++]->insert(UInt64(query_finish_time / 1000));
res_columns[i++]->insert(UInt64(query_finish_time - query_start_time));
res_columns[i++]->insert(static_cast<Int8>(zk_exception_code));
}
else
{
res_columns[i++]->insert(static_cast<Int8>(Status::unknown));
}
// This is the original cluster_name from the query (WHERE cluster='cluster_name')
res_columns[i++]->insert(cluster_name);
Coordination::GetResponse res;
if (!futures.empty())
res = futures[query_id].get();
auto query_start_time = res.stat.mtime;
DDLLogEntry entry;
entry.parse(res.data);
res_columns[i++]->insert(entry.query); // query
res_columns[i++]->insert(entry.initiator); // initiator
res_columns[i++]->insert(UInt64(query_start_time / 1000)); // query_start_time
res_columns[i++]->insert(UInt64(query_finish_time / 1000)); // query_finish_time
res_columns[i++]->insert(UInt64(query_finish_time - query_start_time)); // query_duration_ms
res_columns[i++]->insert(static_cast<Int8>(zk_exception_code)); // query_duration_ms
}
}
}
}
String StorageSystemDDLWorkerQueue::clusterNameFromDDLQuery(const Context & context, const DDLLogEntry & entry) const
{
const char * begin = entry.query.data();
const char * end = begin + entry.query.size();
ASTPtr query;
ASTQueryWithOnCluster * query_on_cluster;
String cluster_name = "";
ParserQuery parser_query(end);
String description;
query = parseQuery(parser_query, begin, end, description, 0, context.getSettingsRef().max_parser_depth);
if (query && (query_on_cluster = dynamic_cast<ASTQueryWithOnCluster *>(query.get())))
cluster_name = query_on_cluster->cluster;
return cluster_name;
}
}

View File

@ -3,6 +3,7 @@
#include <Poco/Util/Application.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <Interpreters/DDLWorker.h>
#include <Storages/System/IStorageSystemOneBlock.h>
#include <ext/shared_ptr_helper.h>
@ -29,5 +30,6 @@ public:
std::string getName() const override { return "SystemDDLWorkerQueue"; }
static NamesAndTypesList getNamesAndTypes();
String clusterNameFromDDLQuery(const Context & context, const DB::DDLLogEntry & entry) const;
};
}