Review - initial round of changes

This commit is contained in:
bharatnc 2020-12-13 23:18:19 -08:00
parent f6d33c49bb
commit 2a122905f1

View File

@ -1,29 +1,133 @@
#include <algorithm>
#include <filesystem>
#include <string>
#include "StorageSystemDDLWorkerQueue.h"
#include <Columns/ColumnArray.h>
#include <Interpreters/Cluster.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSelectQuery.h>
#include <Storages/SelectQueryInfo.h>
#include <Common/ZooKeeper/ZooKeeper.h>
namespace fs = std::filesystem;
/// Execution state of a distributed DDL queue entry on a particular host.
/// The numeric values are exposed through the Enum8 `status` column of the table,
/// so they are spelled out explicitly. The enum is scoped so that the very generic
/// enumerator names do not leak into the global namespace and do not convert
/// to integers implicitly; every use goes through `static_cast<Int8>(status::...)`.
enum class status
{
    active = 0,
    finished = 1,
    unknown = 2,
    errored = 3,
};
std::vector<std::pair<String, Int8>> getStatusEnumsAndValues()
{
return std::vector<std::pair<String, Int8>>{
{"active", static_cast<Int8>(status::active)},
{"finished", static_cast<Int8>(status::finished)},
{"unknown", static_cast<Int8>(status::unknown)},
{"errored", static_cast<Int8>(status::errored)},
};
}
namespace DB
{
/// Describes the columns of the table: the name and data type of each one, in declaration order.
NamesAndTypesList StorageSystemDDLWorkerQueue::getNamesAndTypes()
{
    /// Small factories so that every column still gets its own freshly created type object.
    const auto string_type = [] { return std::make_shared<DataTypeString>(); };
    const auto string_array_type = [&string_type] { return std::make_shared<DataTypeArray>(string_type()); };

    // NOTE(review): fillData in this file appears to insert values both for the first three
    // columns (through `col_num`) and for the last six (through `i`) — confirm the intended column set.
    return {
        {"name", string_type()}, // query_<id>
        {"active", string_array_type()}, // list of host_fqdn
        {"finished", string_array_type()}, // list of host_fqdn
        {"entry", string_type()},
        {"host_name", string_type()},
        {"host_address", string_type()},
        {"port", std::make_shared<DataTypeUInt16>()},
        {"status", std::make_shared<DataTypeEnum8>(getStatusEnumsAndValues())},
        {"cluster", string_type()},
    };
}
void StorageSystemDDLWorkerQueue::fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo &) const
static bool extractPathImpl(const IAST & elem, String & res, const Context & context)
{
const auto * function = elem.as<ASTFunction>();
if (!function)
return false;
if (function->name == "and")
{
for (const auto & child : function->arguments->children)
if (extractPathImpl(*child, res, context))
return true;
return false;
}
if (function->name == "equals")
{
const auto & args = function->arguments->as<ASTExpressionList &>();
ASTPtr value;
if (args.children.size() != 2)
return false;
const ASTIdentifier * ident;
if ((ident = args.children.at(0)->as<ASTIdentifier>()))
value = args.children.at(1);
else if ((ident = args.children.at(1)->as<ASTIdentifier>()))
value = args.children.at(0);
else
return false;
if (ident->name() != "cluster")
return false;
auto evaluated = evaluateConstantExpressionAsLiteral(value, context);
const auto * literal = evaluated->as<ASTLiteral>();
if (!literal)
return false;
if (literal->value.getType() != Field::Types::String)
return false;
res = literal->value.safeGet<String>();
return true;
}
return false;
}
/** Retrieve from the query a condition of the form `cluster = 'name'`, from conjunctions in the WHERE clause.
  * Returns the string compared with `cluster`, or an empty string when the query has no WHERE clause
  * or no such condition is found.
  */
static String extractPath(const ASTPtr & query, const Context & context)
{
    const auto & select_query = query->as<ASTSelectQuery &>();

    const auto where_expression = select_query.where();
    if (!where_expression)
        return String();

    String cluster_name;
    if (!extractPathImpl(*where_expression, cluster_name, context))
        return String();

    return cluster_name;
}
void StorageSystemDDLWorkerQueue::fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const
{
String cluster_name = extractPath(query_info.query, context);
if (cluster_name.empty())
throw Exception(
"SELECT from system.ddl_worker_queue table must contain condition like cluster = 'cluster' in WHERE clause.",
ErrorCodes::BAD_ARGUMENTS);
zkutil::ZooKeeperPtr zookeeper = context.getZooKeeper();
String ddl_zookeeper_path = config.getString("distributed_ddl.path", "/clickhouse/task_queue/ddl/");
@ -35,40 +139,60 @@ void StorageSystemDDLWorkerQueue::fillData(MutableColumns & res_columns, const C
*/
zkutil::Strings queries = zookeeper->getChildren(ddl_zookeeper_path);
// iterate through queries
/* Dir contents of every query will be similar to
[zk: localhost:2181(CONNECTED) 53] ls /clickhouse/task_queue/ddl/query-0000000004
[active, finished]
*/
for (const String & q : queries)
const auto & cluster = context.tryGetCluster(cluster_name);
const auto & shards_info = cluster->getShardsInfo();
const auto & addresses_with_failover = cluster->getShardsAddresses();
if (cluster == nullptr)
throw Exception("No cluster with the name: " + cluster_name + " was found.", ErrorCodes::BAD_ARGUMENTS);
for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index)
{
/* Fetch all nodes under active & finished.
[zk: localhost:2181(CONNECTED) 54] ls /clickhouse/task_queue/ddl/query-0000000004/active
[]
[zk: localhost:2181(CONNECTED) 55] ls /clickhouse/task_queue/ddl/query-0000000004/finished
[clickhouse01:9000, clickhouse02:9000, clickhouse03:9000, clickhouse04:9000]
*/
size_t col_num = 0;
ddl_query_path = ddl_zookeeper_path + "/" + q;
zkutil::Strings active_nodes = zookeeper->getChildren(ddl_query_path + "/active");
zkutil::Strings finished_nodes = zookeeper->getChildren(ddl_query_path + "/finished");
res_columns[col_num++]->insert(q); // name - query_<id>
const auto & shard_addresses = addresses_with_failover[shard_index];
const auto & shard_info = shards_info[shard_index];
const auto pool_status = shard_info.pool->getStatus();
for (size_t replica_index = 0; replica_index < shard_addresses.size(); ++replica_index)
{
Array active_nodes_array;
active_nodes_array.reserve(active_nodes.size());
for (const String & active_node : active_nodes)
active_nodes_array.emplace_back(active_node);
res_columns[col_num++]->insert(active_nodes_array);
}
// iterate through queries
/* Dir contents of every query will be similar to
[zk: localhost:2181(CONNECTED) 53] ls /clickhouse/task_queue/ddl/query-0000000004
[active, finished]
*/
for (size_t query_id = 0; query_id < queries.size(); query_id++)
{
size_t i = 0;
res_columns[i++]->insert(queries[query_id]); // entry
const auto & address = shard_addresses[replica_index];
res_columns[i++]->insert(address.host_name); // host_name
auto resolved = address.getResolvedAddress();
res_columns[i++]->insert(resolved ? resolved->host().toString() : String()); // host_address
res_columns[i++]->insert(address.port); // port
ddl_query_path = fs::path(ddl_zookeeper_path) / queries[query_id];
zkutil::Strings active_nodes = zookeeper->getChildren(fs::path(ddl_query_path) / "active");
zkutil::Strings finished_nodes = zookeeper->getChildren(fs::path(ddl_query_path) / "finished");
{
Array finished_nodes_array;
finished_nodes_array.reserve(active_nodes.size());
for (const String & finished_node : finished_nodes)
finished_nodes_array.emplace_back(finished_node);
res_columns[col_num++]->insert(finished_nodes_array);
// status
if (std::find(active_nodes.begin(), active_nodes.end(), address.toString()) != active_nodes.end())
{
res_columns[i++]->insert(static_cast<Int8>(status::active));
}
else if (std::find(finished_nodes.begin(), finished_nodes.end(), address.toString()) != finished_nodes.end())
{
if (pool_status[replica_index].error_count != 0)
{
res_columns[i++]->insert(static_cast<Int8>(status::errored));
}
else
{
res_columns[i++]->insert(static_cast<Int8>(status::finished));
}
}
else
{
res_columns[i++]->insert(static_cast<Int8>(status::unknown));
}
res_columns[i++]->insert(cluster_name); // cluster_name from the query.
}
}
}
}